流程图)"/>
KafkaProducer Sender 线程详解(含详细的执行流程图)
log.debug(“Aborting incomplete batches due to forced shutdown”);
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close(); // @4
} catch (Exception e) {
log.error(“Failed to close network client”, e);
}
log.debug(“Shutdown of Kafka producer I/O thread has completed.”);
}
代码@1:Sender 线程在运行状态下主要的业务处理方法,将消息缓存区中的消息向 broker 发送。
代码@2:如果主动关闭 Sender 线程,如果不是强制关闭,则如果缓存区还有消息待发送,再次调用 runOnce 方法将剩余的消息发送完毕后再退出。
代码@3:如果强制关闭 Sender 线程,则拒绝未完成提交的消息。
代码@4:关闭 Kafka Client 即网络通信对象。
接下来将分别探讨其上述方法的实现细节。
1.2.1 runOnce 详解
Sender#runOnce
void runOnce() {
// 此处省略与事务消息相关的逻辑
long currentTimeMs = time.milliseconds();
long pollTimeout = sendProducerData(currentTimeMs); // @1
client.poll(pollTimeout, currentTimeMs); // @2
}
本文不关注事务消息的实现原理,故省略了该部分的代码。
代码@1:调用 sendProducerData 方法发送消息。
代码@2:调用这个方法的作用?
接下来分别对上述两个方法进行深入探究。
1.1.2.1 sendProducerData
接下来将详细分析其实现步骤。
Sender#sendProducerData
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
Step1:首先根据当前时间,根据缓存队列中的数据判断哪些 topic 的 哪些分区已经达到发送条件。达到可发送的条件将在 2.1.1.1 节详细分析。
Sender#sendProducerData
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
log.debug(“Requesting metadata update due to unknown leader topics from the batched records: {}”,
result.unknownLeaderTopics);
this.metadata.requestUpdate();
}
Step2:如果在待发送的消息未找到其路由信息,则需要首先去 broker 服务器拉取对应的路由信息(分区的 leader 节点信息)。
Sender#sendProducerData
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
Step3:移除在网络层面没有准备好的分区,并且计算在接下来多久的时间间隔内,该分区都将处于未准备状态。
1、在网络环节没有准备好的标准如下:
-
分区没有未完成的更新元素数据请求(metadata)。
-
当前生产者与对端 broker 已建立连接并完成了 TCP 的三次握手。
-
如果启用 SSL、ACL 等机制,相关状态都已就绪。
-
该分区对应的连接正在处理中的请求数时是否超过设定值,默认为 5,可通过属性 max.in.flight.requests.per.connection 来设置。
2、client pollDelayMs 预估分区在接下来多久的时间间隔内都将处于未转变好状态(not ready),其标准如下:
-
如果已与对端的 TCP 连接已创建好,并处于已连接状态,此时如果没有触发限流,则返回0,如果有触发限流,则返回限流等待时间。
-
如果还位于对端建立 TCP 连接,则返回 Long.MAX_VALUE,因为连接建立好后,会
更多推荐
KafkaProducer Sender 线程详解(含详细的执行流程图)
发布评论