KafkaProducer Sender 线程详解(含详细的执行流程图)

编程入门 行业动态 更新时间:2024-10-15 04:21:53

KafkaProducer Sender 线程详解(含详细的执行<a href=https://www.elefans.com/category/jswz/34/1768100.html style=流程图)"/>

KafkaProducer Sender 线程详解(含详细的执行流程图)

log.debug(“Aborting incomplete batches due to forced shutdown”);

this.accumulator.abortIncompleteBatches();

}

try {

this.client.close(); // @4

} catch (Exception e) {

log.error(“Failed to close network client”, e);

}

log.debug(“Shutdown of Kafka producer I/O thread has completed.”);

}

代码@1:Sender 线程在运行状态下主要的业务处理方法,将消息缓存区中的消息向 broker 发送。

代码@2:如果主动关闭 Sender 线程,如果不是强制关闭,则如果缓存区还有消息待发送,再次调用 runOnce 方法将剩余的消息发送完毕后再退出。

代码@3:如果强制关闭 Sender 线程,则拒绝未完成提交的消息。

代码@4:关闭 Kafka Client 即网络通信对象。

接下来将分别探讨其上述方法的实现细节。

1.2.1 runOnce 详解

Sender#runOnce

void runOnce() {

// 此处省略与事务消息相关的逻辑

long currentTimeMs = time.milliseconds();

long pollTimeout = sendProducerData(currentTimeMs); // @1

client.poll(pollTimeout, currentTimeMs); // @2

}

本文不关注事务消息的实现原理,故省略了该部分的代码。

代码@1:调用 sendProducerData 方法发送消息。

代码@2:调用这个方法的作用?

接下来分别对上述两个方法进行深入探究。

1.1.2.1 sendProducerData

接下来将详细分析其实现步骤。

Sender#sendProducerData

Cluster cluster = metadata.fetch();

// get the list of partitions with data ready to send

RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

Step1:首先根据当前时间,根据缓存队列中的数据判断哪些 topic 的 哪些分区已经达到发送条件。达到可发送的条件将在 2.1.1.1 节详细分析。

Sender#sendProducerData

if (!result.unknownLeaderTopics.isEmpty()) {

for (String topic : result.unknownLeaderTopics)

this.metadata.add(topic);

log.debug(“Requesting metadata update due to unknown leader topics from the batched records: {}”,

result.unknownLeaderTopics);

this.metadata.requestUpdate();

}

Step2:如果在待发送的消息未找到其路由信息,则需要首先去 broker 服务器拉取对应的路由信息(分区的 leader 节点信息)。

Sender#sendProducerData

long notReadyTimeout = Long.MAX_VALUE;

while (iter.hasNext()) {

Node node = iter.next();

if (!this.client.ready(node, now)) {

iter.remove();

notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));

}

}

Step3:移除在网络层面没有准备好的分区,并且计算在接下来多久的时间间隔内,该分区都将处于未准备状态。

1、在网络环节没有准备好的标准如下:

  • 分区没有未完成的更新元素数据请求(metadata)。

  • 当前生产者与对端 broker 已建立连接并完成了 TCP 的三次握手。

  • 如果启用 SSL、ACL 等机制,相关状态都已就绪。

  • 该分区对应的连接正在处理中的请求数时是否超过设定值,默认为 5,可通过属性 max.in.flight.requests.per.connection 来设置。

2、client pollDelayMs 预估分区在接下来多久的时间间隔内都将处于未转变好状态(not ready),其标准如下:

  • 如果已与对端的 TCP 连接已创建好,并处于已连接状态,此时如果没有触发限流,则返回0,如果有触发限流,则返回限流等待时间。

  • 如果还位于对端建立 TCP 连接,则返回 Long.MAX_VALUE,因为连接建立好后,会

更多推荐

KafkaProducer Sender 线程详解(含详细的执行流程图)

本文发布于:2024-02-06 20:40:33,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1751384.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:流程图   线程   详解   详细   KafkaProducer

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!