KafkaProducer Sender 线程详解(含详细的执行流程图)
2021-03-12 19:29
标签:active 实现原理 移除 res last 集合 发送消息 启动 error 温馨提示:本文基于 Kafka 2.2.1 版本。如果觉得源码阅读比较枯燥,本文的中间有 Sender 线程的工作流程图。 上文 《源码分析 Kafka 消息发送流程》 已经详细介绍了 KafkaProducer send 方法的流程,该方法只是将消息追加到 KafKaProducer 的缓存中,并未真正的向 broker 发送消息,本文将来探讨 Kafka 的 Sender 线程。 在 KafkaProducer 中会启动一个单独的线程,其名称为 “kafka-producer-network-thread | clientID”,其中 clientID 为生产者的 id 。 我们先来看一下其各个属性的含义: Sender#run 代码@1:Sender 线程在运行状态下主要的业务处理方法,将消息缓存区中的消息向 broker 发送。 代码@2:如果主动关闭 Sender 线程,如果不是强制关闭,则如果缓存区还有消息待发送,再次调用 runOnce 方法将剩余的消息发送完毕后再退出。 代码@3:如果强制关闭 Sender 线程,则拒绝未完成提交的消息。 代码@4:关闭 Kafka Client 即网络通信对象。 接下来将分别探讨其上述方法的实现细节。 Sender#runOnce 本文不关注事务消息的实现原理,故省略了该部分的代码。 代码@1:调用 sendProducerData 方法发送消息。 接下来分别对上述两个方法进行深入探究。 接下来将详细分析其实现步骤。 Step1:首先根据当前时间,根据缓存队列中的数据判断哪些 topic 的 哪些分区已经达到发送条件。达到可发送的条件将在 2.1.1.1 节详细分析。 Sender#sendProducerData Step2:如果在待发送的消息未找到其路由信息,则需要首先去 broker 服务器拉取对应的路由信息(分区的 leader 节点信息)。 Sender#sendProducerData Step3:移除在网络层面没有准备好的分区,并且计算在接下来多久的时间间隔内,该分区都将处于未准备状态。 1、在网络环节没有准备好的标准如下: 2、client pollDelayMs 预估分区在接下来多久的时间间隔内都将处于未转变好状态(not ready),其标准如下: Step4:根据已准备的分区,从缓存区中抽取待发送的消息批次( ProducerBatch ),并且按照 nodeId:List 组织,注意,抽取后的 ProducerBatch 将不能再追加消息了,就算还有剩余空间可用,具体抽取将在下文在详细介绍。 Sender#sendProducerData Step5:将抽取的 ProducerBatch 加入到 inFlightBatches 数据结构,该属性的声明如下:Map Sender#sendProducerData Step6:从 inflightBatches 与 batches 中查找已过期的消息批次( ProducerBatch ),判断是否过期的标准是系统当前时间与 ProducerBatch 创建时间之差是否超过120s,过期时间可以通过参数 delivery.timeout.ms 设置。 Sender#sendProducerData Step7:处理已超时的消息批次,通知该批消息发送失败,即通过设置 KafkaProducer#send 方法返回的凭证中的 FutureRecordMetadata 中的 ProduceRequestResult result,使之调用其 get 方法不会阻塞。 Sender#sendProducerData Step8:收集统计指标,本文不打算详细分析,但后续会专门对 Kafka 的 Metrics 设计进行一个深入的探讨与学习。 Sender#sendProducerData Step9:设置下一次的发送延时,待补充详细分析。 Sender#sendProducerData Step10:该步骤按照 brokerId 分别构建发送请求,即每一个 broker 会将多个 ProducerBatch 一起封装成一个请求进行发送,同一时间,每一个 与 broker 连接只会只能发送一个请求,注意,这里只是构建请求,并最终会通过 NetworkClient#send 方法,将该批数据设置到 NetworkClient 的待发送数据中,此时并没有触发真正的网络调用。 sendProducerData 方法就介绍到这里了,既然这里还没有进行真正的网络请求,那在什么时候触发呢? 我们继续回到 runOnce 方法。 本文并不会详细深入探讨其网络实现部分,Kafka 的 网络通讯后续我会专门详细的介绍,在这里先点出其关键点。 代码@1:尝试更新云数据。 代码@2:触发真正的网络通讯,该方法中会通过收到调用 NIO 中的 Selector#select() 方法,对通道的读写就绪事件进行处理,当写事件就绪后,就会将通道中的消息发送到远端的 broker。 代码@3:然后会消息发送,消息接收、断开连接、API版本,超时等结果进行收集。 代码@4:并依次对结果进行唤醒,此时会将响应结果设置到 KafkaProducer#send 方法返回的凭证中,从而唤醒发送客户端,完成一次完整的消息发送流程。 Sender 发送线程的流程就介绍到这里了,接下来首先给出一张流程图,然后对上述流程中一些关键的方法再补充深入探讨一下。 根据上面的源码分析得出上述流程图,图中对重点步骤也详细标注了其关键点。下面我们对上述流程图中 Sender 线程依赖的相关类的核心方法进行解读,以便加深 Sender 线程的理解。 该方法主要就是根据缓存区中的消息,判断哪些分区已经达到发送条件。 RecordAccumulator#ready 代码@1:对生产者缓存区 ConcurrentHashMap 代码@2:从生产者元数据缓存中尝试查找分区(TopicPartition) 的 leader 信息,如果不存在,当将该 topic 添加到 unknownLeaderTopics (代码@3),稍后会发送元数据更新请求去 broker 端查找分区的路由信息。 代码@4:如果不在 readyNodes 中就需要判断是否满足条件,isMuted 与顺序消息有关,本文暂时不关注,在后面的顺序消息部分会重点探讨。 代码@5:这里就是判断是否准备好的条件,先一个一个来解读局部变量的含义。 RecordAccumulator#drain 代码@1:我们首先来介绍该方法的参数: 代码@2:遍历所有节点,调用 drainBatchesForOneNode 方法抽取数据,组装成 Map 接下来重点来看一下 drainBatchesForOneNode。 代码@1:根据 brokerId 获取该 broker 上的所有主分区。 代码@4:根据 topic + 分区号从生产者发送缓存区中获取已累积的双端Queue。 代码@5:从双端队列的头部获取一个元素。(消息追加时是追加到队列尾部)。 代码@6:如果当前批次是重试,并且还未到阻塞时间,则跳过该分区。 代码@7:如果当前已抽取的消息总大小 加上新的消息已超过 maxRequestSize,则结束抽取。 代码@8:将当前批次加入到已准备集合中,并关闭该批次,即不在允许向该批次中追加消息。 关于消息发送就介绍到这里,NetworkClient 的 poll 方法内部会调用 Selector 执行就绪事件的选择,并将抽取的消息通过网络发送到 Broker 服务器,关于网络后面的具体实现,将在后续文章中单独介绍。 如果文章对您有所帮助的话,麻烦帮忙点【在看】,谢谢您的认可与支持。 中间件兴趣圈已经陆续发表了源码分析Dubbo、ElasticJob、Mybatis、RocketMQ系列,RocketMQ实战与案例分析、ElasticSearch使用指南等。 KafkaProducer Sender 线程详解(含详细的执行流程图) 标签:active 实现原理 移除 res last 集合 发送消息 启动 error 原文地址:https://blog.51cto.com/15023237/25589111、Sender 线程详解
1.1 类图
kafka 网络通信客户端,主要封装与 broker 的网络通信。
消息记录累积器,消息追加的入口(RecordAccumulator 的 append 方法)。
元数据管理器,即 topic 的路由分区信息。
是否需要保证消息的顺序性。
调用 send 方法发送的最大请求大小,包括 key、消息体序列化后的消息总大小不能超过该值。通过参数 max.request.size 来设置。
用来定义消息“已提交”的条件(标准),就是 Broker 端向客户端承偌已提交的条件,可选值如下0、-1、1.
重试次数。
时间工具类。
该线程状态,为 true 表示运行中。
是否强制关闭,此时会忽略正在发送中的消息。
消息发送相关的统计指标收集器。
请求的超时时间。
请求失败时在重试之前等待的时间。
API版本信息。
事务处理器。
正在执行发送相关的消息批次。1.2 run 方法详解
public void run() {
log.debug("Starting Kafka producer I/O thread.");
while (running) {
try {
runOnce(); // @1
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) { // @2
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
if (forceClose) { // @3
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close(); // @4
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
1.2.1 runOnce 详解
void runOnce() {
// 此处省略与事务消息相关的逻辑
long currentTimeMs = time.milliseconds();
long pollTimeout = sendProducerData(currentTimeMs); // @1
client.poll(pollTimeout, currentTimeMs); // @2
}
代码@2:调用这个方法的作用?1.1.2.1 sendProducerData
Sender#sendProducerDataCluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);
this.metadata.requestUpdate();
}
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
Sender#sendProducerData// create produce requests
Map
addToInflightBatches(batches);
public void addToInflightBatches(Map
accumulator.resetNextBatchExpiryTime();
List
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
}
}
sensors.updateProduceRequestMetrics(batches);
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
pollTimeout = 0;
}
sendProduceRequests(batches, now);
private void sendProduceRequests(Map
1.2.1.2 NetworkClient 的 poll 方法
public List
1.2.2 run 方法流程图
由于在讲解 Sender 发送流程中,大部分都是调用 RecordAccumulator 方法来实现其特定逻辑,故接下来重点对上述涉及到 RecordAccumulator 的方法进行一个详细剖析,加强对 Sender 流程的理解。2、RecordAccumulator 核心方法详解
2.1 RecordAccumulator 的 ready 方法详解
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set
该 ProducerBatch 已等待的时长,等于当前时间戳 与 ProducerBatch 的 lastAttemptMs 之差,在 ProducerBatch 创建时或需要重试时会将当前的时间赋值给lastAttemptMs。
当发生异常时发起重试之前的等待时间,默认为 100ms,可通过属性 retry.backoff.ms 配置。
该批次当前已重试的次数。
后台发送是否关闭,即如果需要重试并且等待时间小于 retryBackoffMs ,则 backingOff = true,也意味着该批次未准备好。
send 线程发送消息需要的等待时间,如果 backingOff 为 true,表示该批次是在重试,并且等待时间小于系统设置的需要等待时间,这种情况下 timeToWaitMs = retryBackoffMs 。否则需要等待的时间为 lingerMs。
是否过期,等于已经等待的时间是否大于需要等待的时间,如果把发送看成定时发送的话,expired 为 true 表示定时器已到达触发点,即需要执行。
当前生产者缓存已不够,创建新的 ProducerBatch 时阻塞在申请缓存空间的线程大于0,此时应立即将缓存区中的消息立即发送到服务器。2.2 RecordAccumulator 的 drain方法详解
public Map
集群信息。
已准备好的节点集合。
一次请求最大的字节数。
当前时间。
RecordAccumulator#drainBatchesForOneNodeprivate List
代码@2:初始化 start。这里首先来阐述一下 start 与 drainIndex 。
代码@3:循环从缓存区抽取对应分区中累积的数据。
文章标题:KafkaProducer Sender 线程详解(含详细的执行流程图)
文章链接:http://soscw.com/index.php/essay/63789.html