KafkaProducer Sender 线程详解(含详细的执行流程图),java开发面试基础题

编程入门 行业动态 更新时间:2024-10-10 15:25:43

KafkaProducer Sender 线程详解(含详细的执行<a href=https://www.elefans.com/category/jswz/34/1768100.html style=流程图),java开发面试基础题"/>

KafkaProducer Sender 线程详解(含详细的执行流程图),java开发面试基础题

  • TransactionManager transactionManager

事务处理器。

  • Map< TopicPartition, List< ProducerBatch>> inFlightBatches

正在执行发送相关的消息批次。

1.2 run 方法详解

Sender#run

public void run() {

log.debug(“Starting Kafka producer I/O thread.”);

while (running) {

try {

runOnce(); // @1

} catch (Exception e) {

log.error("Uncaught error in kafka producer I/O thread: ", e);

}

}

log.debug(“Beginning shutdown of Kafka producer I/O thread, sending remaining records.”);

while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) { // @2

try {

runOnce();

} catch (Exception e) {

log.error("Uncaught error in kafka producer I/O thread: ", e);

}

}

if (forceClose) { // @3

log.debug(“Aborting incomplete batches due to forced shutdown”);

this.accumulator.abortIncompleteBatches();

}

try {

this.client.close(); // @4

} catch (Exception e) {

log.error(“Failed to close network client”, e);

}

log.debug(“Shutdown of Kafka producer I/O thread has completed.”);

}

代码@1:Sender 线程在运行状态下主要的业务处理方法,将消息缓存区中的消息向 broker 发送。

代码@2:如果主动关闭 Sender 线程,如果不是强制关闭,则如果缓存区还有消息待发送,再次调用 runOnce 方法将剩余的消息发送完毕后再退出。

代码@3:如果强制关闭 Sender 线程,则拒绝未完成提交的消息。

代码@4:关闭 Kafka Client 即网络通信对象。

接下来将分别探讨其上述方法的实现细节。

1.2.1 runOnce 详解

Sender#runOnce

void runOnce() {

// 此处省略与事务消息相关的逻辑

long currentTimeMs = time.milliseconds();

long pollTimeout = sendProducerData(currentTimeMs); // @1

client.poll(pollTimeout, currentTimeMs); // @2

}

本文不关注事务消息的实现原理,故省略了该部分的代码。

代码@1:调用 sendProducerData 方法发送消息。

代码@2:调用这个方法的作用?

接下来分别对上述两个方法进行深入探究。

1.1.2.1 sendProducerData

接下来将详细分析其实现步骤。

Sender#sendProducerData

Cluster cluster = metadata.fetch();

// get the list of partitions with data ready to send

RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

Step1:首先根据当前时间,根据缓存队列中的数据判断哪些 topic 的 哪些分区已经达到发送条件。达到可发送的条件将在 2.1.1.1 节详细分析。

Sender#sendProducerData

if (!result.unknownLeaderTopics.isEmpty()) {

for (String topic : result.unknownLeaderTopics)

this.metadata.add(topic);

log.debug(“Requesting metadata update due to unknown leader topics from the batched records: {}”,

result.unknownLeaderTopics);

this.metadata.requestUpdate();

}

Step2:如果在待发送的消息未找到其路由信息,则需要首先去 broker 服务器拉取对应的路由信息(分区的 leader 节点信息)。

Sender#sendProducerData</

更多推荐

KafkaProducer Sender 线程详解(含详细的执行流程图),java开发面试基础题

本文发布于:2024-02-06 20:40:25,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1751184.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:流程图   线程   详解   基础   详细

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!