深入理解kafka生产者
一、前言
kafka是目前最流行的消息队列之一,但很多同学使用 Kafka 多年,对其生产者(Producer)的使用停留在 API 调用层面,一旦遇到消息发送超时、顺序混乱、性能瓶颈等问题,往往不知从何下手。
本文将深入剖析生产者的实现细节。详细拆解拦截器(Interceptor)-> 序列化器(Serializer)-> 分区器(Partitioner)-> 消息累加器(RecordAccumulator)-> Sender线程 -> NetworkClient 的全链路流程。
二、生产者概述与核心架构
1.1 生产者角色
在Kafka生态中,生产者是数据的入口。它的主要职责是接受业务产生的数据,按照既定规则发送到指定的Topic分区。
1.2 整体架构图
在深入源码前,我们先在脑海中构建生产者的整体模块:
1 2 3 4 5 6 7 8 9 10 11 12 13
| [业务线程] → send() → ProducerInterceptors(拦截器链) → KeySerializer/ValueSerializer(序列化) → Partitioner(分区器) → RecordAccumulator(累加器/双端队列) └──────────────────┐ ↓ [Sender线程] (后台I/O线程) → Batch拆分 → NetworkClient → Selector (基于Java NIO) → Broker
|

核心设计思想:
- 主线程与Sender线程分离:
send() 方法仅负责追加消息到累加器,立即返回Future。真正的网络I/O由后台单线程Sender负责。
- 批量发送:不是来一条发一条,而是积攒一批(Batch),极大提高吞吐量,降低网络开销。
- 异步回调:通过
Future和Callback机制,既不阻塞主线程,又能感知发送结果。
三、生产者初始化
我们从new KafkaProducer<>(props)开始,看看生产者是如何初始化的。
2.1 ProducerConfig:生产者参数
所有的生产者配置都在org.apache.kafka.clients.producer.ProducerConfig中定义。这是一个常量类,每个配置项都对应一个静态常量。
例如,我们最熟悉的几个参数源码定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class ProducerConfig extends AbstractConfig { public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers"; public static final String ACKS_CONFIG = "acks"; public static final String RETRIES_CONFIG = "retries"; public static final String BATCH_SIZE_CONFIG = "batch.size"; public static final String LINGER_MS_CONFIG = "linger.ms"; public static final String BUFFER_MEMORY_CONFIG = "buffer.memory"; public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer"; public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"; public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class"; public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes"; }
|
ProducerConfig内部使用了ConfigDef来定义每个参数的类型、默认值、重要性及校验逻辑。
2.2 KafkaProducer构造函数剖析
我们来看KafkaProducer的构造过程(为了清晰,省略部分非核心代码和事务相关逻辑):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient) { try { this.producerConfig = config; this.time = time; String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0) clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); this.clientId = clientId;
List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances( ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class); this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
if (keySerializer == null) { this.keySerializer = config.getConfiguredInstance(KEY_SERIALIZER_CLASS_CONFIG, Serializer.class); this.keySerializer.configure(config.originals(Collections.singletonMap(KEY_SERIALIZER_CLASS_CONFIG, false)), true); } else { } if (valueSerializer == null) { this.valueSerializer = config.getConfiguredInstance(VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class); this.valueSerializer.configure(config.originals(Collections.singletonMap(VALUE_SERIALIZER_CLASS_CONFIG, false)), false); } else { }
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); this.partitioner.configure(new ProducerConfig(partitionProps));
this.metadata = metadata != null ? metadata : new ProducerMetadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, true, clusterResourceListeners); this.metadata.bootstrap(addresses);
this.accumulator = new RecordAccumulator(config, logContext);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time); NetworkClient client = kafkaClient != null ? kafkaClient : new NetworkClient( new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder, logContext), this.metadata, clientId, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG), config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), config.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG), config.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG), time, true, new ApiVersions(), this.metrics, new ImmutableMap.Builder<String, String>().build(), logContext);
this.sender = new Sender(logContext, client, this.metadata, this.accumulator, ... 省略若干参数 ...); String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : ""); this.senderThread = new KafkaThread(ioThreadName, this.sender, true); this.senderThread.start();
} catch (Throwable t) { } }
|
构造阶段小结:
在构造KafkaProducer时,完成了所有组件的组装,并启动了一个名为 kafka-producer-network-thread 的后台线程。这意味着一旦Producer实例化完成,它就已经准备好在后台接收并发送消息了。
四、消息发送主流程——send() 方法
当业务代码调用producer.send(record, callback)时,源码的执行路径依次经过:拦截器 -> 序列化 -> 分区器 -> 累加器。
4.1 入口:send() 与 doSend()
1 2 3 4 5 6 7 8
| @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
|
interceptors.onSend(record)会遍历我们在配置中设置的所有拦截器链,每个拦截器都可以修改ProducerRecord的内容(如添加时间戳、修改消息体等)。
4.2 核心方法:doSend 源码解析
4.2.1 Step1:等待元数据更新
1 2 3 4 5 6 7 8 9 10
| private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { TopicPartition tp = null; try { ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs); Cluster cluster = clusterAndWaitTime.cluster; } }
|
waitOnMetadata 是阻塞的。如果本地Metadata缓存中没有该Topic的信息,它会发送METADATA请求给Broker,直到获取到或超时。这解释了为什么发送第一条消息有时会稍微慢一点——因为需要拉取元数据。
4.2.2 Step2:序列化Key和Value
1 2 3 4 5 6 7 8 9
| byte[] serializedKey; try { serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { }
byte[] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { }
|
这里使用的是我们在构造时初始化的序列化器。如果你用的是StringSerializer,它会将String转为UTF-8字节数组。
4.2.3 Step3:分区计算
1 2
| int partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition);
|
调用了partition()方法,它内部调用了partitioner.partition()。我们来看默认分区器DefaultPartitioner的逻辑。
深入 DefaultPartitioner 源码
1 2 3 4 5 6 7 8 9 10 11 12
| public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { return stickyPartitionCache.partition(topic, cluster); } else { return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
|
**关于粘性分区 (Sticky Partitioner)**:
Kafka 2.4+ 引入了粘性分区策略。当没有key时,不再严格轮询,而是先“粘”在一个分区上,直到该分区的batch满了或linger时间到了,才切换下一个。这能进一步减少发送的请求数量 。
4.2.4 Step4:验证消息大小
1 2 3
| int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers); ensureValidRecordSize(serializedSize);
|
检查消息大小是否超过 max.request.size 和 buffer.memory,避免OOM。
4.2.5 Step5:追加到累加器 (RecordAccumulator)
1 2
| RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);
|
这里,消息正式从主线程移交给了RecordAccumulator。该方法内部非常复杂,我们后面详细分析。
4.2.6 Step6:唤醒Sender线程
1 2 3 4
| if (result.batchIsFull || result.newBatchCreated) { this.sender.wakeup(); } return result.future;
|
如果追加后batch满了,或者因为这是该分区的第一个batch而被创建,则唤醒可能正在阻塞的Sender线程,让它赶紧干活。
五、拦截器链的使用
在 Kafka 生产者的消息发送流程中,拦截器(Interceptor)是一个强大的扩展点,允许用户在消息发送前、发送完成后(Broker 确认后)以及发生错误时插入自定义逻辑。本文将深入 org.apache.kafka.clients.producer.ProducerInterceptor 接口的源码设计,剖析其在 KafkaProducer 内部的工作机制,并探讨使用拦截器的注意事项与最佳实践。
5.1 ProducerInterceptor 接口定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package org.apache.kafka.clients.producer;
public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
void onAcknowledgement(RecordMetadata metadata, Exception exception);
void close();
void configure(Map<String, ?> configs); }
|
接口要点:
- 泛型
<K, V> 与生产者保持一致,确保 key/value 类型匹配。
- **
onSend**:消息发送前的预处理,可修改 ProducerRecord 的几乎所有字段。
- **
onAcknowledgement**:消息确认时的回调,无论成功或失败都会调用(但失败时 metadata 可能为 null)。执行顺序在用户提供的 Callback 之前。
- **
configure**:从生产者配置中提取拦截器专属配置,在初始化时调用。
- **
close**:生产者关闭时清理资源。
5.2 拦截器的配置与初始化
5.2.1 配置方式
通过生产者参数 interceptor.classes 指定一个或多个拦截器类(用逗号分隔):
1 2 3 4
| Properties props = new Properties(); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.example.CustomInterceptor1,com.example.CustomInterceptor2"); KafkaProducer<String, String> producer = new KafkaProducer<>(props);
|
ProducerConfig 中对应的常量:
1
| public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
|
5.2.2 拦截器链的构建
在 KafkaProducer 构造函数中,拦截器链通过 ProducerConfig.getConfiguredInstances 方法实例化:
1 2 3 4
| List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances( ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class); this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
|
getConfiguredInstances 会利用反射创建每个拦截器实例,并自动调用其 configure 方法(因为 ProducerInterceptor 实现了 Configurable)。
- 实例化顺序与配置中类的顺序一致。
- 使用
ProducerInterceptors 包装器统一管理拦截器链,对外提供批量调用能力。
5.2.3 ProducerInterceptors 包装类
ProducerInterceptors 是 Kafka 内部用于封装拦截器链的辅助类,它提供了与 ProducerInterceptor 同名的方法,但内部遍历所有拦截器依次调用。其核心代码结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public class ProducerInterceptors<K, V> implements AutoCloseable { private final List<ProducerInterceptor<K, V>> interceptors;
public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) { this.interceptors = interceptors; }
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) { ProducerRecord<K, V> interceptorRecord = record; for (ProducerInterceptor<K, V> interceptor : interceptors) { try { interceptorRecord = interceptor.onSend(interceptorRecord); } catch (Exception e) { log.error("Error executing interceptor onSend", e); } if (interceptorRecord == null) { interceptorRecord = record; } } return interceptorRecord; }
public void onAcknowledgement(RecordMetadata metadata, Exception exception) { for (ProducerInterceptor<K, V> interceptor : interceptors) { try { interceptor.onAcknowledgement(metadata, exception); } catch (Exception e) { log.error("Error executing interceptor onAcknowledgement", e); } } }
@Override public void close() { for (ProducerInterceptor<K, V> interceptor : interceptors) { try { interceptor.close(); } catch (Exception e) { log.error("Error closing interceptor", e); } } } }
|
关键设计:
- 异常隔离:单个拦截器的异常不会影响其他拦截器执行,仅记录日志。
onSend 链式传递:上一个拦截器的返回结果作为下一个的输入,允许拦截器修改记录。
- 空保护:如果某拦截器返回
null,则用原始 record 继续传递,避免下游 NPE。
5.3 拦截器调用时机源码剖析
5.3.1 onSend 调用点
在 KafkaProducer.send() 方法中,第一行就是调用拦截器的 onSend:
1 2 3 4 5 6
| @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
|
注意:onSend 的返回值将被用于后续的序列化、分区计算和累加器追加。因此,拦截器可以实现:
- 修改消息的 topic、分区、key、value、headers、timestamp。
- 完全替换消息(返回新对象)。
- 甚至返回
null(但生产者会还原为原记录,不推荐)。
5.3.2 onAcknowledgement 调用点
onAcknowledgement 的调用发生在消息发送结果确定之后,但在用户提供的 Callback 之前。具体位置在 KafkaProducer 内部类 Sender 处理响应时。
调用栈:
Sender 线程从 NetworkClient 获取响应。
- 对于每个完成的
ProduceRequest,调用 RecordBatch.done()。
RecordBatch.done() 中会触发所有挂起的回调(包括用户 callback 和拦截器回调)。
- 最终通过
KafkaProducer 的 interceptors.onAcknowledgement(metadata, exception) 调用拦截器链。
关键代码位置(RecordBatch 的 completeFutureAndFireCallbacks 方法中):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) { for (Thunk thunk : thunks) { try { if (exception == null) { RecordMetadata metadata = new RecordMetadata(offsetCounter, ...); thunk.callback.onCompletion(metadata, null); } else { thunk.callback.onCompletion(null, exception); } } catch (Exception e) { ... } } if (thunks.size() > 0 && this.producerInterceptors != null) { this.producerInterceptors.onAcknowledgement(metadata, exception); } }
|
- 调用时机:在用户
Callback 之后
- 异常传递:如果发送失败,
exception 不为 null,metadata 可能为 null。
5.3.3 close 调用点
在 KafkaProducer.close() 方法中,会关闭拦截器链:
1 2 3 4 5 6
| public void close(long timeout, TimeUnit timeUnit) { if (interceptors != null) interceptors.close(); }
|
5.4 典型应用场景与代码示例
5.4.1 应用场景
- 分布式追踪:在消息 header 中注入 Trace ID,实现跨系统链路追踪。
- 消息审计:记录发送的消息内容、时间戳、目标主题等。
- 消息修饰:统一添加业务公共字段到 value 或 header 中。
- 监控告警:统计发送成功率、延迟,或在失败时记录异常。
- 数据脱敏:在发送前对敏感字段进行加密或脱敏。
5.4.2 示例:Trace ID 拦截器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public class TracingInterceptor implements ProducerInterceptor<String, String> {
private String applicationName;
@Override public void configure(Map<String, ?> configs) { this.applicationName = (String) configs.get("application.name"); }
@Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { String traceId = MDC.get("traceId"); if (traceId == null) { traceId = UUID.randomUUID().toString(); } List<Header> headers = new ArrayList<>(); if (record.headers() != null) { for (Header h : record.headers()) { headers.add(h); } } headers.add(new RecordHeader("traceId", traceId.getBytes(StandardCharsets.UTF_8))); headers.add(new RecordHeader("appName", applicationName.getBytes(StandardCharsets.UTF_8)));
return new ProducerRecord<>( record.topic(), record.partition(), record.timestamp(), record.key(), record.value(), headers ); }
@Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }
@Override public void close() {} }
|
5.5 拦截器使用注意
5.5.1 修改 ProducerRecord 的潜在风险
拦截器在 onSend 中可以修改 ProducerRecord 的任何字段,但需要谨慎:
- topic:修改 topic 可能导致消息路由到错误的主题。
- partition:修改 partition 会覆盖分区器逻辑,需确保该分区存在。
- key/value:类型必须与序列化器兼容(因为是修改在序列化之前)。
- headers:可以添加或删除 header,用于传递追踪信息。
- timestamp:修改时间戳可能影响日志保留策略或基于时间的处理。
5.5.2 异常处理与性能影响
- 异常被吞没:拦截器抛出的异常只会被记录日志,不会中断主流程。这意味着拦截器不能用于关键校验(如需校验应返回特殊标记或提前抛出运行时异常,但生产者会捕获并继续)。
- 阻塞风险:
onSend 和 onAcknowledgement 在生产者主线程(onSend)或 Sender 线程(onAcknowledgement)中执行,因此必须避免阻塞操作,否则会严重影响吞吐量。
- 性能开销:每个消息都会经过所有拦截器,链越长性能损耗越大,建议仅用于必要的监控和轻量级修饰。
5.5.3 线程安全性
ProducerInterceptor 的实现必须保证线程安全,因为:
onSend 可能被多个生产线程并发调用(如果多个线程共享同一个 KafkaProducer 实例)。
onAcknowledgement 在单个 Sender 线程中顺序调用,但不同分区的回调可能交错,且与 onSend 并发执行。
因此,拦截器内部应避免使用共享可变状态,或通过同步机制保护。
六、序列化器(Serializer)
在Kafka生产者的消息发送流程中,序列化器(Serializer)扮演着将业务对象转换为字节数组的关键角色。Kafka本身并不关心消息的具体格式,它只存储和传输字节。
因此,正确的序列化与反序列化是保证消息生产与消费一致性的基础。
6.1 序列化器概述
Kafka生产者发送的每条消息由key和value组成,它们可以是任意Java对象。在通过网络传输及存储到磁盘之前,这些对象必须被转换为字节数组。序列化器正是负责这一转换的组件。
回顾生产者的消息发送流程:
1
| 业务线程 → 拦截器(onSend) → 序列化器(key+value) → 分区器 → 累加器 → Sender线程
|
序列化发生在拦截器之后、分区器之前。这意味着拦截器修改后的ProducerRecord会被送入序列化器,而分区器在计算分区时可能需要用到序列化后的key字节(例如基于key哈希的分区策略)。因此,序列化器的输出直接影响分区结果。
6.2 Serializer接口源码分析
序列化器接口位于org.apache.kafka.common.serialization包中,定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| package org.apache.kafka.common.serialization;
import java.io.Closeable; import java.util.Map;
public interface Serializer<T> extends Closeable {
default void configure(Map<String, ?> configs, boolean isKey) { }
byte[] serialize(String topic, T data);
default byte[] serialize(String topic, Headers headers, T data) { return serialize(topic, data); }
@Override default void close() { } }
|
接口要点:
- **泛型
T**:指定待序列化的对象类型。
- **
configure**:初始化方法,在实例化后调用。isKey参数指示当前序列化器用于key还是value,这在某些需要区分行为的序列化器中很有用(例如,key可能需要压缩,value不需要)。
serialize重载:早期版本只有serialize(String topic, T data);从2.0开始引入带Headers的方法,允许序列化器访问或修改消息头。默认实现委托给无headers版本,保持兼容。
- 线程安全性:接口未强制要求线程安全,但KafkaProducer会并发调用
serialize方法(多个生产线程共享同一序列化器实例)。因此,自定义序列化器必须保证线程安全。
6.3 内置序列化器实现
Kafka提供了多种内置序列化器,覆盖常见数据类型。它们都位于org.apache.kafka.common.serialization包中。
6.3.1 常用内置序列化器
| 序列化器类 |
说明 |
序列化后的格式 |
ByteArraySerializer |
处理byte[]类型 |
直接返回原数组 |
ByteBufferSerializer |
处理ByteBuffer |
转换为字节数组 |
BytesSerializer |
处理org.apache.kafka.common.utils.Bytes |
返回其内部数组 |
StringSerializer |
处理String |
使用指定的字符集编码(默认UTF-8) |
IntegerSerializer |
处理Integer |
4字节大端序 |
LongSerializer |
处理Long |
8字节大端序 |
DoubleSerializer |
处理Double |
8字节双精度浮点数(IEEE 754) |
FloatSerializer |
处理Float |
4字节单精度浮点数 |
ShortSerializer |
处理Short |
2字节大端序 |
VoidSerializer |
处理Void |
始终返回null,用于无key/value的情况 |
6.3.2 StringSerializer源码解析
以最常用的StringSerializer为例,深入其实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package org.apache.kafka.common.serialization;
public class StringSerializer implements Serializer<String> { private String encoding = StandardCharsets.UTF_8.name();
@Override public void configure(Map<String, ?> configs, boolean isKey) { String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding"; Object encodingValue = configs.get(propertyName); if (encodingValue == null) encodingValue = configs.get("serializer.encoding"); if (encodingValue != null && encodingValue instanceof String) encoding = (String) encodingValue; }
@Override public byte[] serialize(String topic, String data) { if (data == null) return null; else return data.getBytes(Charset.forName(encoding)); }
@Override public byte[] serialize(String topic, Headers headers, String data) { return serialize(topic, data); }
@Override public void close() { } }
|
要点:
- 编码配置:支持通过
key.serializer.encoding或value.serializer.encoding指定字符集,也可以通过通用配置serializer.encoding指定。默认UTF-8。
- 空处理:如果数据为
null,返回null。在Kafka中,允许key或value为null,但需注意有些消费者可能无法处理null值。
- 线程安全:
encoding字段在configure中设置后只读,后续serialize调用无状态,因此线程安全。
6.3.3 ByteArraySerializer源码解析
ByteArraySerializer极为简单:
1 2 3 4 5 6
| public class ByteArraySerializer implements Serializer<byte[]> { @Override public byte[] serialize(String topic, byte[] data) { return data; } }
|
它直接返回传入的字节数组,不进行任何转换。
6.4 序列化器在KafkaProducer中的初始化
在KafkaProducer构造函数中,序列化器的初始化是一个关键步骤。让我们再次回顾相关代码片段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient) { try {
if (keySerializer == null) { this.keySerializer = config.getConfiguredInstance(KEY_SERIALIZER_CLASS_CONFIG, Serializer.class); this.keySerializer.configure(config.originals(Collections.singletonMap(KEY_SERIALIZER_CLASS_CONFIG, false)), true); } else { this.keySerializer = keySerializer; }
if (valueSerializer == null) { this.valueSerializer = config.getConfiguredInstance(VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class); this.valueSerializer.configure(config.originals(Collections.singletonMap(VALUE_SERIALIZER_CLASS_CONFIG, false)), false); } else { this.valueSerializer = valueSerializer; }
} }
|
初始化流程:
- 实例化:通过
config.getConfiguredInstance反射创建序列化器对象。该方法会调用无参构造器。
- 配置:调用
configure方法,传入生产者配置的副本(通过config.originals()获取所有原始配置),并传入isKey标志。注意这里传入了Collections.singletonMap(...),实际上config.originals()返回所有配置,然后再额外放入了KEY_SERIALIZER_CLASS_CONFIG或VALUE_SERIALIZER_CLASS_CONFIG作为提示?源码细节略有不同,但核心是传递完整配置,并指明是key还是value。
- 用户直接提供序列化器:如果构造
KafkaProducer时传入了现成的序列化器实例,则不再反射创建,但仍需调用其configure方法(如果尚未配置),以确保配置生效。
注意:序列化器在生产者整个生命周期中只会被初始化一次,之后被所有发送线程共享。
6.5 序列化调用流程源码追踪
当调用producer.send(record)时,在doSend方法中,序列化发生在分区计算之前。关键代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| byte[] serializedKey; try { serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + keySerializer.getClass().getSimpleName() + " specified in key.serializer", cce); }
byte[] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + valueSerializer.getClass().getSimpleName() + " specified in value.serializer", cce); }
|
调用细节:
serialize方法的选择:这里调用的是带Headers的重载方法,以便序列化器可以访问消息头。如果自定义序列化器未覆盖此方法,将回退到无headers版本(因接口提供默认实现)。
- 异常处理:捕获
ClassCastException(通常因为序列化器泛型与实际类型不匹配),包装为SerializationException抛出,中断发送流程。
null值处理:如果record.key()为null,且序列化器的serialize方法返回null,则后续累加器会将key视为空。但需注意,分区器在计算分区时如果key为null,将采用粘性分区策略。
序列化后:
serializedKey和serializedValue作为字节数组,连同其他信息(topic, partition, timestamp, headers)一起被封装,传递给分区器partition()方法。
- 分区器可能使用
serializedKey来计算目标分区(例如基于key哈希)。
6.6 自定义序列化器的实现与注意事项
当内置序列化器无法满足需求时(如需要序列化POJO、使用Avro/Protobuf等),可以自定义实现Serializer接口。
6.6.1 自定义序列化器示例:JSON序列化器
假设我们有一个简单的User类:
1 2 3 4 5
| public class User { private String name; private int age; }
|
自定义JSON序列化器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
public class JsonSerializer<T> implements Serializer<T> { private final ObjectMapper objectMapper = new ObjectMapper();
@Override public void configure(Map<String, ?> configs, boolean isKey) { }
@Override public byte[] serialize(String topic, T data) { if (data == null) return null; try { return objectMapper.writeValueAsBytes(data); } catch (Exception e) { throw new SerializationException("Error serializing JSON message", e); } }
@Override public void close() { } }
|
使用时:
1 2
| props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
|
但上述实现存在类型擦除问题:JsonSerializer<User>在运行时无法获知泛型类型,导致无法正确反序列化。通常自定义序列化器会与特定的类绑定,或者通过配置传递类型信息。更好的做法是使用Kafka提供的JsonSerializer(在kafka-json-serializer模块中),它通过type配置指定目标类。
七、Partitioner(分区器)
在Kafka生产者的消息发送流程中,分区器(Partitioner)负责决定每条消息应该被发送到目标主题的哪一个分区。这一决策直接影响消息的分布均衡性、顺序性保证以及后续消费者的并行处理能力。
7.1. 分区器概述
分区器位于序列化之后、消息追加到累加器之前。它接收序列化后的key字节数组、value字节数组以及主题信息,输出一个整数分区号。这个分区号决定了消息最终被存放在哪个分区的队列中。
回顾生产者的核心处理流程:
1
| 业务线程 → 拦截器(onSend) → 序列化器(key+value) → 分区器 → 累加器 → Sender线程
|
关键点:
- 分区器的输入包含序列化后的key,因此分区策略可以基于key的二进制内容(例如哈希取模)。
- 如果用户未指定分区(
ProducerRecord构造时未传partition参数),生产者必须调用分区器计算分区。
- 分区器的输出直接决定
RecordAccumulator中batches映射的TopicPartition键。
7.2 Partitioner接口源码分析
分区器接口位于org.apache.kafka.clients.producer.Partitioner,定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Configurable; import java.io.Closeable;
public interface Partitioner extends Configurable, Closeable {
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
void close();
default void onNewBatch(String topic, Cluster cluster, int prevPartition) { } }
|
接口要点:
- **继承自
Configurable**:分区器可以通过configure(Map<String, ?>)方法接收生产者配置,实现参数化。
partition方法:核心方法,所有参数都是只读的,实现必须线程安全。
topic:主题名称。
key/value:原始对象,可能为null。
keyBytes/valueBytes:序列化后的字节数组,可能为null。分区器通常使用keyBytes进行哈希,因为直接使用字节更高效且不受序列化器实现影响。
cluster:集群元数据,包含PartitionInfo列表,可用于获取分区数量、副本分布等信息。
- **
onNewBatch**:Kafka 2.4.0引入,用于支持粘性分区策略。当生产者为一个主题-分区创建新的批次(batch)时,会回调此方法,让分区器有机会切换到下一个分区(粘性分区切换点)。
- 线程安全:
partition方法可能被多个生产线程并发调用,因此实现必须是线程安全的。
7.3 默认分区器(DefaultPartitioner)源码深度解析
Kafka默认使用的分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner(旧版本为DefaultPartitioner,位于同一包下,但内部实现已迁移至internals)。其核心逻辑分为两种情况:key不为null与key为null。
7.3.1 源码概览
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package org.apache.kafka.clients.producer.internals;
public class DefaultPartitioner implements Partitioner { private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
@Override public void configure(Map<String, ?> configs) {}
@Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { if (keyBytes == null) { return stickyPartitionCache.partition(topic, cluster); } return Utils.toPositive(Utils.murmur2(keyBytes)) % cluster.partitionCountForTopic(topic); }
@Override public void close() {}
@Override public void onNewBatch(String topic, Cluster cluster, int prevPartition) { stickyPartitionCache.nextPartition(topic, cluster, prevPartition); } }
|
7.3.2 key不为null:一致性哈希(Murmur2)
当消息指定了key(即keyBytes != null),DefaultPartitioner采用Murmur2哈希算法对key字节进行散列,然后对分区数取模,得到目标分区。
1
| Utils.toPositive(Utils.murmur2(keyBytes)) % cluster.partitionCountForTopic(topic);
|
- **
Utils.murmur2**:实现MurmurHash2算法(非加密哈希),速度快且分布均匀。
- **
toPositive**:将哈希值转换为非负数(因为murmur2可能返回负数),通过0x7FFFFFFF & hash实现。
- 分区数:从
cluster元数据获取,确保分区数动态变化时仍正确取模。
特点:
- 相同的key总是被路由到相同的分区(前提是分区数不变),保证消息顺序性(同一key的消息按序到达同一分区)。
- 如果分区数变更,映射关系会变化,可能导致部分消息进入不同分区,影响key的顺序性。
MurmurHash2算法:
MurmurHash2是一种非加密型的哈希函数,主要用于快速哈希计算,特别是在需要高性能的场景中,如数据库和缓存系统中。
MurmurHash2算法的基本步骤:
- 初始化:选择一个常量种子(seed),通常是一个32位或64位的值。
- 混合:对输入数据进行多次混合操作,通常包括以下几个步骤:
- 将输入数据分成多个部分(如32位或64位块)。
- 对每个块应用一系列操作,包括乘法、旋转和异或等。
- 混合操作通常包括将当前块与之前的块进行某种形式的合并。
- 最终处理:经过多次混合后,对最终结果进行最终处理,如添加额外的混合步骤或使用特定的最终输出函数。
- 输出:生成一个固定长度的哈希值,通常为32位或64位。
7.3.3 key为null:粘性分区(Sticky Partitioning)
当key为null时,DefaultPartitioner不再使用简单的轮询(Round Robin),而是采用粘性分区策略,由StickyPartitionCache管理。
1. 为什么引入粘性分区?
在旧版本(2.4之前),key为null时使用轮询策略:每条消息依次分配到不同分区。这会导致小消息频繁创建小批次,增加网络请求数量,降低吞吐量。粘性分区通过让消息暂时“粘”在同一个分区上,直到该分区的批次满了或达到linger.ms,才切换到下一个分区,从而增大批次大小、减少请求次数。
2. StickyPartitionCache源码分析
StickyPartitionCache维护了每个主题当前粘性的分区索引。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| class StickyPartitionCache { private final ConcurrentMap<String, Integer> indexCache; private final AtomicInteger transitionCounter;
public StickyPartitionCache() { this.indexCache = new ConcurrentHashMap<>(); }
public int partition(String topic, Cluster cluster) { Integer part = indexCache.get(topic); if (part == null) { return nextPartition(topic, cluster, -1); } return part; }
public int nextPartition(String topic, Cluster cluster, int prevPartition) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int newPart = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitions.size(); indexCache.put(topic, newPart); return newPart; } }
|
工作流程:
- 首次发送:
partition方法从缓存获取当前粘性分区,如果不存在(首次),则调用nextPartition随机选择一个分区作为粘性分区。
- 消息追加:消息被追加到该分区的
ProducerBatch中。
- 切换触发:当该分区的批次满了,或者创建新批次时(例如
linger.ms到期),Sender线程在处理过程中会调用onNewBatch回调,通知分区器当前批次已结束。DefaultPartitioner.onNewBatch委托给stickyPartitionCache.nextPartition,为下一个批次选择新的粘性分区(随机选择)。
- 后续发送:新消息将进入新选择的粘性分区,重复上述过程。
优势:
- 减少了请求次数:一批消息集中在一个分区,更容易填满批次。
- 负载均衡:切换时机由批次满或超时决定,而非固定轮询,能更好地适应消息量突增的场景。
注意事项:
- 粘性分区不保证消息在分区间的绝对均衡,但在长期运行和大数据量下接近均衡。
- 通过
partitioner.adaptive.partitioning.enable配置(Kafka 3.0+)可以进一步优化,使粘性分区根据broker负载自适应调整。
7.3.4 分区器的初始化与配置
1. 配置方式
通过生产者参数partitioner.class指定自定义分区器类:
1
| props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.CustomPartitioner");
|
如果未指定,默认使用org.apache.kafka.clients.producer.internals.DefaultPartitioner。
2. 分区器初始化源码
在KafkaProducer构造函数中,分区器的初始化如下:
1 2 3
| this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); this.partitioner.configure(new ProducerConfig(partitionProps));
|
getConfiguredInstance通过反射创建分区器实例(要求有无参构造器)。
- 调用
configure方法,传入一个只包含分区器相关配置的ProducerConfig(实际传递的是原始配置的子集,但通常传递所有配置)。注意:configure的调用时机在分区器实例化后立即执行。
分区器相关配置:
| 配置项 |
描述 |
默认值 |
partitioner.class |
分区器全限定类名 |
org.apache.kafka.clients.producer.internals.DefaultPartitioner |
partitioner.ignore.keys |
如果为true,即使key不为null,也忽略key使用分区策略(粘性分区) |
false (Kafka 3.0+引入) |
partitioner.adaptive.partitioning.enable |
启用自适应分区,根据broker负载动态调整粘性分区选择 |
true (Kafka 3.0+默认) |
partitioner.availability.timeout.ms |
在自适应分区中,认为一个分区不可用的超时时间 |
默认值由broker端配置决定 |
7.3.5 分区调用时机与消息流转
1 调用点:partition()何时执行
在KafkaProducer.doSend()方法中,分区调用紧接在序列化之后:
1 2 3 4 5 6
| byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
int partition = partition(record, serializedKey, serializedValue, cluster); TopicPartition tp = new TopicPartition(record.topic(), partition);
|
partition方法内部调用partitioner.partition(...)。
2 用户指定分区的优先级
如果用户在构造ProducerRecord时已经指定了分区号(record.partition() != null),则不会调用分区器,直接使用用户指定的分区。这在partition()方法中体现:
1 2 3 4 5 6 7 8 9
| private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); if (partition != null) { return partition; } return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }
|
3 分区结果如何影响累加器
分区号确定后,与主题一起构成TopicPartition对象。RecordAccumulator使用它作为键,定位到对应的双端队列Deque<ProducerBatch>:
1 2
| Deque<ProducerBatch> dq = getOrCreateDeque(tp);
|
后续的消息将被追加到该队列的尾部batch中。因此,分区器直接决定了消息进入哪个物理队列。
7.3.6 自定义分区器
1 何时需要自定义分区器?
- 基于业务规则的分区:例如按照用户ID的特定范围划分分区,实现数据局部性。
- 负载均衡优化:根据broker负载或分区大小动态选择分区,避免某些分区过热。
- 地理位置路由:将数据发送到离数据源最近的broker所在的分区。
- 复合键路由:使用key的多个字段组合计算分区。
2 自定义分区器示例
需求:根据用户所在地区(从key中解析)将消息发送到特定分区,例如地区”US”对应分区0,”EU”对应分区1,其他随机。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public class RegionPartitioner implements Partitioner {
private Map<String, Integer> regionMapping;
@Override public void configure(Map<String, ?> configs) { regionMapping = new HashMap<>(); String mapping = (String) configs.get("region.mapping"); if (mapping != null) { for (String entry : mapping.split(",")) { String[] kv = entry.split(":"); regionMapping.put(kv[0], Integer.parseInt(kv[1])); } } }
@Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { if (key == null) { return Utils.toPositive(ThreadLocalRandom.current().nextInt()) % cluster.partitionCountForTopic(topic); } String region = extractRegionFromKey(key.toString()); Integer assignedPartition = regionMapping.get(region); if (assignedPartition != null && assignedPartition < cluster.partitionCountForTopic(topic)) { return assignedPartition; } return Utils.toPositive(Utils.murmur2(keyBytes)) % cluster.partitionCountForTopic(topic); }
private String extractRegionFromKey(String key) { return key.split("-")[0]; }
@Override public void close() {}
@Override public void onNewBatch(String topic, Cluster cluster, int prevPartition) { } }
|
3 注意事项
- 线程安全:
partition方法可能被多线程并发调用,应避免使用共享可变状态(如非线程安全的Map)。可以使用ConcurrentHashMap,或确保状态在configure中初始化后只读。
- 性能:
partition方法在业务线程中执行,应尽量轻量,避免阻塞操作(如网络I/O)。
- 正确处理
null key/value:必须考虑key或value为null的场景。
- 分区范围:返回的分区号必须在0到
cluster.partitionCountForTopic(topic)-1之间,否则生产者会抛出异常。
- 与粘性分区兼容:如果希望自定义分区器也支持粘性行为,应正确实现
onNewBatch方法,并管理内部状态。如果不打算支持,可留空,但生产者仍会在创建新批次时调用它(需保证线程安全)。
- 配置传递:通过
configure方法可以接收自定义参数,便于动态调整分区策略。
八、RecordAccumulator(累加器)
在Kafka生产者的架构中,RecordAccumulator(消息累加器)是连接业务主线程与后台Sender线程的桥梁,也是实现高性能异步发送的关键组件。它负责暂存待发送的消息,以批次(Batch)形式组织,并按照分区维度管理,同时通过内存池机制减少GC压力。
8.1累加器概述
回顾生产者的核心处理流程:
1
| 业务线程 → 拦截器 → 序列化器 → 分区器 → RecordAccumulator → Sender线程 → NetworkClient → Broker
|
- 业务线程调用
send()方法,经过拦截器、序列化、分区计算后,将消息交给RecordAccumulator。
RecordAccumulator将消息追加到对应分区的双端队列(Deque<ProducerBatch>)的尾部批次中。
- 如果追加导致批次满或创建了新批次,则唤醒后台
Sender线程。
- Sender线程循环地从
RecordAccumulator中拉取准备好的批次,按节点分组,通过NetworkClient发送给Broker,并处理响应。
RecordAccumulator的设计目标是:
- 高吞吐:批量发送,减少网络请求次数。
- 低延迟:支持
linger.ms配置,在延迟和吞吐之间权衡。
- 内存控制:通过
buffer.memory限制总内存使用,避免OOM。
- 线程安全:支持多个生产者线程并发追加,同时后台线程读取发送。
8.2 核心数据结构:batches映射与双端队列
RecordAccumulator内部最核心的数据结构是:
1
| private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
|
为什么使用双端队列(Deque)?
- 读写分离:主线程(多个生产者线程)从尾部追加新的消息批次或追加到尾部批次;Sender线程从头部取出批次进行发送。这种设计最大程度减少了锁竞争,只需要对单个队列的头部或尾部进行同步,而不需要锁整个映射。
- 实现方式:
Deque的具体实现是ArrayDeque,在Kafka源码中通过synchronized块对队列实例加锁,保证线程安全。
队列中的元素:ProducerBatch,代表一个消息批次。它包含多个消息(ProducerRecord),是网络发送的最小单位。
8.3 内存管理:BufferPool
为了避免频繁创建和释放ByteBuffer导致的GC开销,Kafka实现了BufferPool(缓冲区池),用于复用ByteBuffer。
8.3.1 BufferPool的结构
1 2 3 4 5 6 7 8 9 10 11 12 13
| private final BufferPool free;
public final class BufferPool { private final long totalMemory; private final int poolableSize; private final Deque<ByteBuffer> free; private final Deque<Condition> waiters; private long nonPooledAvailableMemory; private final ReentrantLock lock; }
|
- **
poolableSize**:默认等于batch.size(16KB)。小于等于此大小的ByteBuffer会被池化,大于此大小的每次直接分配,用完后归还时不会进入池子(因为大缓冲区复用价值低,且会增加池子管理复杂度)。
free队列:存放空闲的、大小为poolableSize的ByteBuffer。
waiters队列:当内存不足时,申请内存的线程会阻塞在Condition上,等待其他线程释放内存后唤醒。
- **
nonPooledAvailableMemory**:除了池化缓冲区之外,还可以使用的非池化内存总量(从totalMemory中减去池化缓冲区占用的内存)。
BufferPool将内存视为三个部分:
- 已被分配出去的ByteBuffer。它们的大小可能各异,可以是或不是poolableSize
- 维持在
free空闲链表的ByteBuffer(free会维持它们的引用)。每一个的大小都是poolableSize。
- 申请这个大小的ByteBuffer时,从
free中取出即可
- 归还这个大小的ByteBuffer时,放回
free
- 未分配空闲内存,这块内存在JVM中,是空闲的。用一个数字nonPooledAvailableMemory代表。它的回收是交给gc的。
- 申请不是poolableSize大小的ByteBuffer时,调用
ByteBuffer.allocate(size)
- 归还不是poolableSize大小的ByteBuffer时,调用者解除对该ByteBuffer的引用,然后nonPooledAvailableMemory增加这个大小即可,其回收交给gc
8.3.2 BufferPool的核心方法:allocate()
当需要创建一个新的ProducerBatch时,RecordAccumulator会调用BufferPool.allocate(int size, long maxTimeToBlock)申请内存。其逻辑如下(简化版):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| public ByteBuffer allocate(int size, long maxTimeToBlock) throws InterruptedException { if (size > this.totalMemory) { throw new IllegalArgumentException("大小超过总内存"); } this.lock.lock(); try { if (size == poolableSize && !free.isEmpty()) { return free.pollFirst(); }
int freeListSize = free.size() * poolableSize; long availableMemory = freeListSize + nonPooledAvailableMemory;
if (size <= availableMemory) { freeUp(size); nonPooledAvailableMemory -= size; } else { int accumulated = 0; long startTime = time.nanoseconds(); Condition moreMemory = lock.newCondition(); waiters.addLast(moreMemory); while (accumulated < size) { long remaining = maxTimeToBlock - (time.nanoseconds() - startTime); if (remaining <= 0) { waiters.remove(moreMemory); throw new TimeoutException("超时"); } moreMemory.await(remaining, TimeUnit.NANOSECONDS); availableMemory = free.size() * poolableSize + nonPooledAvailableMemory; if (availableMemory >= size) { freeUp(size); nonPooledAvailableMemory -= size; accumulated = size; } } waiters.remove(moreMemory); } } finally { lock.unlock(); }
if (size == poolableSize) { return ByteBuffer.allocate(size); } else { return ByteBuffer.allocate(size); } }
|
要点:
- 优先使用池化缓冲区:相同大小直接复用。
- 内存紧张时阻塞:生产者线程会被挂起,直到其他线程释放内存(例如Sender发送完批次后调用
deallocate)或超时。
- 条件队列:多个等待者排队,释放内存时唤醒第一个等待者(公平)。
8.3.3 内存释放:deallocate()
当批次发送完成(无论成功或失败),其占用的ByteBuffer会归还给BufferPool:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public void deallocate(ByteBuffer buffer, int size) { lock.lock(); try { if (size == poolableSize) { free.add(buffer); } else { nonPooledAvailableMemory += size; } Condition moreMemory = waiters.pollFirst(); if (moreMemory != null) moreMemory.signal(); } finally { lock.unlock(); } }
|
设计思想:
- 池化缓冲区复用:减少GC和内存分配开销。
- 内存总量控制:通过
buffer.memory硬限制,生产者不会无限制使用内存。
- 背压机制:当内存不足时,
send()方法阻塞,实现自然限流。
8.4 消息追加流程:append()方法深度解析
RecordAccumulator.append()是生产者线程调用的核心方法,负责将一条消息放入合适的批次中。其源码(简化版)如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException { Deque<ProducerBatch> dq = getOrCreateDeque(tp); synchronized (dq) { ProducerBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()); if (future != null) { return new RecordAppendResult(future, dq.size() > 1 || last.isFull(), false); } } }
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(...)); ByteBuffer buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) { ProducerBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()); if (future != null) { free.deallocate(buffer); return new RecordAppendResult(future, dq.size() > 1 || last.isFull(), false); } }
MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, baseOffset); ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds()); FutureRecordMetadata future = batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()); dq.addLast(batch); return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true); } }
|
8.4.1 核心逻辑拆解
- 步骤2(快速路径):在锁内尝试追加到尾部批次。
last.tryAppend()会检查批次是否有足够空间容纳新消息,如果空间不足返回null。这里锁的范围仅限于单个队列,避免长时间持有锁。
- 步骤4(内存申请):在无锁状态下申请内存,可能阻塞。这避免了在持锁期间等待内存,提高了并发度。
- 步骤5(双检锁):重新获取锁,再次检查尾部批次,防止在申请内存期间其他线程已经创建了新批次并追加成功。如果发现已有批次可用,则归还刚申请的
ByteBuffer,避免内存浪费。
- 步骤6(新建批次):使用申请的
ByteBuffer构建MemoryRecordsBuilder和ProducerBatch,然后将消息追加进去。注意,新建的批次刚创建时只有一条消息,可能未满。
- 返回结果:
RecordAppendResult包含:
future:用于异步等待结果。
batchIsFull:当前批次是否已满(可用于决定是否唤醒Sender)。
newBatchCreated:是否新建了批次(同样可用于唤醒Sender)。
4.2.2 ProducerBatch.tryAppend()内部
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) { if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) { return null; } long offset = recordsBuilder.append(timestamp, key, value, headers); FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, timestamp, offset, timestamp, now); thunks.add(new Thunk(callback, future)); this.recordCount++; return future; }
|
MemoryRecordsBuilder负责将消息按照Kafka协议格式写入ByteBuffer。当批次写满或关闭时,recordsBuilder会构建出最终的MemoryRecords(包含CRC、消息集合等)。
8.5 批次管理:ProducerBatch的内部结构
ProducerBatch是RecordAccumulator中存储消息的基本单位,其核心字段:
1 2 3 4 5 6 7 8 9 10 11 12
| public final class ProducerBatch { private final TopicPartition topicPartition; private final MemoryRecordsBuilder recordsBuilder; private final long createdMs; private final List<Thunk> thunks; private int recordCount; private int maxRecordSize; private long lastAttemptMs; private boolean retry; private final ProduceRequestResult produceFuture; }
|
关键点:
- **
recordsBuilder**:持有ByteBuffer,不断追加消息。
thunks列表:每条消息对应的回调(用户提供的Callback)和FutureRecordMetadata的引用。当批次完成(成功或失败)时,遍历thunks触发回调并完成Future。
- **
produceFuture**:一个内部类,实现了Future<RecordMetadata>,但实际是复合的future,等待批次中所有消息的元数据。
批次的状态:
- 未满:可以继续追加消息。
- 已满:
recordsBuilder已无空间(由hasRoomFor判断),或者主动关闭(例如强制发送)。
- 关闭(closed):不再接受新消息,等待发送或已发送。
8.6 准备发送:accumulator.ready()方法
Sender线程定期调用accumulator.ready(Cluster cluster, long now)来获取当前哪些分区已经准备好可以发送了。该方法返回ReadyCheckResult,包含一组准备好发送的节点(Set<Node>)以及下次检查的等待时间。
8.6.1 判断准备发送的条件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public ReadyCheckResult ready(Cluster cluster, long nowMs) { Set<Node> readyNodes = new HashSet<>(); long nextReadyCheckDelayMs = Long.MAX_VALUE;
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) { TopicPartition tp = entry.getKey(); Deque<ProducerBatch> dq = entry.getValue();
synchronized (dq) { ProducerBatch batch = dq.peekFirst(); if (batch != null) { long waitedTimeMs = nowMs - batch.lastAttemptMs; boolean backingOff = batch.inRetry() && waitedTimeMs >= retryBackoffMs; boolean full = batch.isFull(); boolean expired = waitedTimeMs >= lingerMs && !batch.isClosed(); boolean sendable = full || expired || (batch.recordsBuilder().isClosed() && !backingOff); if (sendable && !backingOff) { Node leader = cluster.leaderFor(tp); if (leader != null) { readyNodes.add(leader); } } else { long timeLeftMs = Math.max(lingerMs - waitedTimeMs, 0); nextReadyCheckDelayMs = Math.min(nextReadyCheckDelayMs, timeLeftMs); } } } } return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist); }
|
准备发送的条件(sendable):
- 批次已满:
batch.isFull()。
- 等待超时:
waitedTimeMs >= lingerMs,且批次未关闭。这实现了linger.ms延迟发送的语义。
- 批次已强制关闭:例如生产者关闭时。
- 不在退避期:如果批次发送失败需要重试,会进入退避状态(
backingOff),此时即使时间到了也不发送,直到退避结束。
8.6.2 为什么检查头部批次
队列是FIFO的,头部批次是最早创建的,也是应该最先发送的。如果头部批次还没准备好(例如linger.ms未到),那么队列中后面的批次即使准备好了也不会发送,因为要保持顺序(同一分区内消息必须有序发送)。这是为了保证分区内的顺序性。
8.7 数据拉取:accumulator.drain()方法
一旦确定了哪些节点有数据可发,Sender线程会调用accumulator.drain(Cluster cluster, Set<Node> nodes, int maxSize, long nowMs)从累加器中拉取待发送的批次,并按节点分组返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long nowMs) { Map<Integer, List<ProducerBatch>> batches = new HashMap<>(); for (Node node : nodes) { List<ProducerBatch> readyBatches = new ArrayList<>(); List<PartitionInfo> partitions = cluster.partitionsForNode(node.id()); long remaining = maxSize; for (PartitionInfo part : partitions) { TopicPartition tp = new TopicPartition(part.topic(), part.partition()); Deque<ProducerBatch> dq = getDeque(tp); if (dq == null) continue; synchronized (dq) { ProducerBatch batch = dq.peekFirst(); if (batch != null) { boolean backingOff = batch.inRetry() && (nowMs - batch.lastAttemptMs) < retryBackoffMs; if (!backingOff) { long batchSize = batch.estimatedSizeInBytes(); if (batchSize <= remaining) { dq.pollFirst(); readyBatches.add(batch); remaining -= batchSize; } } } } } if (!readyBatches.isEmpty()) { batches.put(node.id(), readyBatches); } } return batches; }
|
drain的关键点:
- 按节点分组:为了批量发送给同一个broker,减少网络连接开销。
- 请求大小限制:
maxSize对应max.request.size,避免单个请求过大。
- 每个分区只取头部批次:确保分区内的消息顺序发送。
- 幂等性/事务特殊处理:如果启用了幂等性或事务,
drain方法还涉及ProducerId和序列号的分配,保证每个批次有唯一的序列号范围。
九、Sender线程
在Kafka生产者的架构中,Sender线程是连接业务线程与网络层的核心枢纽。它负责从RecordAccumulator中拉取准备好的消息批次,通过NetworkClient发送给Broker,并处理响应与回调。理解Sender线程的工作机制,是掌握Kafka生产者高性能、可靠性和顺序性保证的关键。
9.1 Sender线程概述
Sender是一个实现了Runnable接口的类,在KafkaProducer构造函数中被实例化并作为后台线程启动:
1 2 3 4 5 6 7 8 9
| this.sender = new Sender(logContext, client, this.metadata, this.accumulator, ... ); this.senderThread = new KafkaThread(ioThreadName, this.sender, true); this.senderThread.start();
|
- 线程名称:
kafka-producer-network-thread | <clientId>,便于问题定位。
- 生命周期:随生产者启动而运行,随生产者关闭而停止。
- 核心职责:
- 轮询
RecordAccumulator,获取已准备好发送的批次。
- 将这些批次按照目标Broker分组,构建
ProduceRequest。
- 通过
NetworkClient执行网络I/O,发送请求并接收响应。
- 处理发送结果(成功/失败),触发用户回调,释放内存。
- 管理元数据更新、重试退避等。
9.2 Sender主循环:run()与runOnce()
9.2.1 run()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public void run() { log.debug("Starting Kafka producer I/O thread.");
while (running) { try { runOnce(); } catch (Exception e) { log.error("Uncaught error in producer I/O thread: ", e); } }
while (!forceClose && this.accumulator.hasUndrained()) { try { runOnce(); } catch (Exception e) { log.error("Uncaught error in producer I/O thread: ", e); } }
this.client.close(); log.debug("Shutdown of producer I/O thread completed."); }
|
- 正常运行时:无限循环调用
runOnce(),每次执行一轮发送流程。
- 关闭时的额外处理:如果生产者关闭但累加器中仍有未发送的数据(
hasUndrained()),且不是强制关闭(forceClose为false),会继续尝试发送,直到耗尽数据或超时。
9.2.2 runOnce()核心逻辑
1 2 3 4 5
| void runOnce() { long currentTimeMs = time.milliseconds(); long pollTimeout = sendProducerData(currentTimeMs); client.poll(pollTimeout, currentTimeMs); }
|
runOnce()分为两大步:
- **
sendProducerData**:从累加器获取数据,构建请求,并将请求放入NetworkClient的发送缓冲区。
- **
client.poll**:执行实际的网络I/O(发送请求、接收响应),并触发回调。
9.3 步骤一:获取准备发送的节点(accumulator.ready)
sendProducerData首先调用accumulator.ready(),确定哪些Broker节点上有可发送的数据。
1 2
| RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
|
9.3.1 ready()方法源码
RecordAccumulator.ready()遍历所有分区的队列,检查每个队列头部的批次是否满足发送条件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public ReadyCheckResult ready(Cluster cluster, long nowMs) { Set<Node> readyNodes = new HashSet<>(); long nextReadyCheckDelayMs = Long.MAX_VALUE;
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) { TopicPartition tp = entry.getKey(); Deque<ProducerBatch> dq = entry.getValue();
synchronized (dq) { ProducerBatch batch = dq.peekFirst(); if (batch != null) { long waitedTimeMs = nowMs - batch.lastAttemptMs; boolean backingOff = batch.inRetry() && waitedTimeMs < retryBackoffMs; boolean full = batch.isFull(); boolean expired = waitedTimeMs >= lingerMs && !batch.isClosed(); boolean sendable = full || expired || (batch.recordsBuilder().isClosed() && !backingOff);
if (sendable && !backingOff) { Node leader = cluster.leaderFor(tp); if (leader != null) { readyNodes.add(leader); } } else { long timeLeftMs = Math.max(lingerMs - waitedTimeMs, 0); nextReadyCheckDelayMs = Math.min(nextReadyCheckDelayMs, timeLeftMs); } } } } return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist); }
|
判断条件详解:
- 批次已满(full):批次达到了
batch.size,应立即发送,避免等待浪费吞吐。
- 超时(expired):批次等待时间超过
linger.ms,即使未满也触发发送,满足低延迟需求。
- 批次已关闭(closed):例如生产者正在关闭,需要发送剩余数据。
- 退避(backingOff):如果批次之前发送失败正在重试退避期(
retryBackoffMs内),则暂时不发送。
- leader未知:如果分区的leader副本不存在(如主题正在创建中),则标记为未知leader,后续可能需要唤醒元数据更新。
为什么只检查头部批次?
每个分区的队列是FIFO的,头部批次是最早加入的。只有头部批次被发送后,后面的批次才能被处理。这保证了分区内的消息顺序(如果未开启幂等性且max.in.flight.requests.per.connection>1,仍可能出现乱序,但Kafka通过其他机制保证)。头部批次的状态决定了该分区当前是否可发送。
返回值:
readyNodes:当前有数据可发送的Broker节点集合。
nextReadyCheckDelayMs:下一次检查时,还需要等待的最短时间(用于设置poll的超时)。
9.4 步骤二:从累加器拉取批次(accumulator.drain)
得到readyNodes后,Sender调用accumulator.drain(),从这些节点对应的分区中拉取实际要发送的批次。
1
| Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
|
9.4.1 drain()方法源码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long nowMs) { Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) { List<ProducerBatch> readyBatches = new ArrayList<>(); List<PartitionInfo> partitions = cluster.partitionsForNode(node.id());
long remaining = maxSize; for (PartitionInfo part : partitions) { TopicPartition tp = new TopicPartition(part.topic(), part.partition()); Deque<ProducerBatch> dq = getDeque(tp); if (dq == null) continue;
synchronized (dq) { ProducerBatch batch = dq.peekFirst(); if (batch != null) { boolean backingOff = batch.inRetry() && (nowMs - batch.lastAttemptMs) < retryBackoffMs; if (!backingOff) { long batchSize = batch.estimatedSizeInBytes(); if (batchSize <= remaining) { dq.pollFirst(); readyBatches.add(batch); remaining -= batchSize; } } } } } if (!readyBatches.isEmpty()) { batches.put(node.id(), readyBatches); } } return batches; }
|
流程:
- 节点遍历:对每个有数据可发的节点,收集其所有分区(leader位于该节点)。
- 分区遍历:按分区顺序处理(实际上是无序的,但每个分区内部只取头部)。
- 请求大小限制:累积的批次总大小不能超过
max.request.size,防止单个请求过大。
- 退避重检:再次检查退避状态,因为从
ready到drain之间可能有微小的时间差。
- 移除批次:一旦批次被取出,就从队列中移除,所有权转移给
Sender。如果发送成功或失败,后续会通过回调释放资源。
注意事项:
- 每个分区最多只取出一个批次(头部),因为只有头部批次发送后,下一个批次才能成为新的头部。这确保了分区内顺序发送。
- 如果分区较多,
drain可能会返回多个批次,这些批次将合并到一个ProduceRequest中发送给同一个Broker。
9.5 步骤三:发送ProduceRequest
获取批次列表后,Sender需要为每个节点构建ClientRequest并发送。这通过sendProduceRequests方法完成。
9.5.1 sendProduceRequests
1 2 3 4 5 6 7 8
| private void sendProduceRequests(Map<Integer, List<ProducerBatch>> batches, long now) { for (Map.Entry<Integer, List<ProducerBatch>> entry : batches.entrySet()) { int nodeId = entry.getKey(); List<ProducerBatch> batchList = entry.getValue(); sendProduceRequest(nodeId, batchList, now); } }
|
9.5.2 sendProduceRequest:构建并发送请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| private void sendProduceRequest(int nodeId, List<ProducerBatch> batches, long now) { Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(); for (ProducerBatch batch : batches) { TopicPartition tp = batch.topicPartition; MemoryRecords records = batch.records(); produceRecordsByPartition.put(tp, records); }
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(batches.get(0).magic, acks, timeout, produceRecordsByPartition, transactionalId);
RequestCompletionHandler callback = new RequestCompletionHandler() { public void onComplete(ClientResponse response) { handleProduceResponse(response, batches, now); } };
ClientRequest clientRequest = client.newClientRequest(Integer.toString(nodeId), requestBuilder, now, true, callback); client.send(clientRequest, now); }
|
关键点:
- 构建请求体:将多个分区的批次打包成一个
ProduceRequest。MemoryRecords是已经序列化并压缩好的消息集合。
- 回调处理:
RequestCompletionHandler在请求完成时被调用,由NetworkClient触发。
- 发送:通过
client.send将请求加入NetworkClient的发送队列,实际网络发送在client.poll中执行。
9.6 步骤四:处理网络响应(client.poll)
sendProducerData返回后,runOnce调用client.poll(pollTimeout, now)。poll方法是网络层的核心,负责:
- 发送已排队的请求。
- 接收已完成的响应。
- 处理连接、断开、超时等事件。
9.6.1 NetworkClient.poll() 简要流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public List<ClientResponse> poll(long timeout, long now) { maybeUpdateMetadata(now);
List<ClientResponse> responses = new ArrayList<>(); handleCompletedSends(responses, now); handleCompletedReceives(responses, now); handleDisconnections(responses, now); handleConnections(responses, now); handleTimoutRequests(responses, now);
this.selector.poll(Utils.min(timeout, nextTimeToUpdateMs, ...));
return responses; }
|
9.6.2 处理Produce响应:handleProduceResponse
Sender.handleProduceResponse负责解析Broker返回的结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| private void handleProduceResponse(ClientResponse response, List<ProducerBatch> batches, long now) { if (response.wasDisconnected()) { for (ProducerBatch batch : batches) { completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), now); } } else if (response.versionMismatch() != null) { failBatch(batches, response.versionMismatch()); } else if (response.hasResponse()) { ProduceResponse produceResponse = (ProduceResponse) response.responseBody(); for (ProducerBatch batch : batches) { TopicPartition tp = batch.topicPartition; ProduceResponse.PartitionResponse partResp = produceResponse.responses().get(tp); completeBatch(batch, partResp, now); } } else { for (ProducerBatch batch : batches) { completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.REQUEST_TIMED_OUT), now); } } }
|
9.6.3 completeBatch:完成批次
completeBatch是处理单个批次发送结果的核心方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long now) { Errors error = response.error; if (error == Errors.NONE) { batch.done(response.baseOffset, response.logAppendTime, null); this.accumulator.deallocate(batch); } else if (canRetry(batch, error)) { this.accumulator.reenqueue(batch, now); } else { batch.done(null, null, error.exception()); this.accumulator.deallocate(batch); } }
|
重试判断:canRetry检查错误是否可重试(如LEADER_NOT_AVAILABLE、NETWORK_EXCEPTION等)以及重试次数是否未耗尽(retries配置)。如果可重试,调用accumulator.reenqueue将批次放回队列头部,等待下次发送。
reenqueue的作用:将批次重新放回对应分区队列的头部,并重置lastAttemptMs,进入退避状态。这样,该批次会再次成为头部,在退避期结束后被重新发送。
9.7 顺序性保证与流量控制
9.7.1 分区内顺序
Kafka保证单个分区内消息的顺序,前提是:
- 幂等性关闭:如果
max.in.flight.requests.per.connection=1,则每次只能有一个未确认的请求,自然保证顺序。
- 幂等性开启:即使
max.in.flight.requests.per.connection>1(最大为5),Kafka通过序列号机制保证顺序和去重。Sender在发送批次时会分配递增的序列号,Broker端校验序列号连续性,乱序的请求会被拒绝。
在Sender中,顺序性主要通过drain的“每次只取头部批次”以及幂等性下的序列号分配来保障。
9.7.2 流量控制:max.in.flight.requests.per.connection
该参数限制每个Broker连接上未确认请求的最大数量。在Sender中,当构建请求时,NetworkClient会检查该连接当前是否已达到上限,如果达到,后续请求将排队,不发送。这间接限制了drain出来的批次被立即发送的能力。
1 2 3 4 5 6 7 8 9
| private void send(ClientRequest request, long now) { String nodeId = request.destination(); if (connectionStates.canSendRequests(nodeId) && inFlightRequests.canSendMore(nodeId)) { doSend(request, now); } else { this.pendingCompletion.complete(response); } }
|
9.8 幂等性与事务的支持
9.8.1 幂等性
当enable.idempotence=true时,Sender在发送请求前需要为每个批次分配ProducerId和序列号。这些信息由TransactionManager管理。
1 2 3 4 5 6 7 8 9 10
| if (transactionManager != null && transactionManager.isIdempotent()) { requestBuilder.setProducerId(transactionManager.producerId()); requestBuilder.setProducerEpoch(transactionManager.producerEpoch()); for (ProducerBatch batch : batches) { requestBuilder.setBaseSequence(tp, batch.baseSequence()); } }
|
baseSequence:该批次中第一条消息的序列号。
- 序列号在
RecordAccumulator追加消息时由TransactionManager分配,每个分区独立递增。
9.8.2 事务
事务生产者涉及TransactionManager与TransactionCoordinator的交互。Sender在事务开启、提交、中止阶段会发送特殊的EndTxnRequest。此外,事务内的普通ProduceRequest会带上transactionalId,Broker端会进行事务验证。
在Sender.runOnce中,如果事务状态需要(如准备提交),也会调用transactionManager相关方法。
9.10 Sender线程与主线程的协作
- 唤醒机制:当主线程通过
accumulator.append新建批次或批次变满时,会调用sender.wakeup(),中断client.poll的阻塞,使Sender立即检查新数据。
- 共享数据:
RecordAccumulator是主线程和Sender线程共享的,通过细粒度锁(队列锁)和并发容器(ConcurrentMap)保证安全。
- 回调执行:
Sender在completeBatch中调用batch.done(),该方法遍历thunks列表,依次执行用户提供的Callback。这些回调在Sender线程中执行,因此不应包含耗时操作,以免阻塞I/O。
10 总结
经过对Kafka生产者源码的逐层剖析,我们从KafkaProducer的构造到Sender线程的循环,深入理解了每个组件的职责与协作方式。