深入理解kafka生产者

深入理解kafka生产者

一、前言

​ kafka是目前最流行的消息队列之一,但很多同学使用 Kafka 多年,对其生产者(Producer)的使用停留在 API 调用层面,一旦遇到消息发送超时、顺序混乱、性能瓶颈等问题,往往不知从何下手。

​ 本文将深入剖析生产者的实现细节。详细拆解拦截器(Interceptor)-> 序列化器(Serializer)-> 分区器(Partitioner)-> 消息累加器(RecordAccumulator)-> Sender线程 -> NetworkClient 的全链路流程。

二、生产者概述与核心架构

1.1 生产者角色

在Kafka生态中,生产者是数据的入口。它的主要职责是接受业务产生的数据,按照既定规则发送到指定的Topic分区。

1.2 整体架构图

在深入源码前,我们先在脑海中构建生产者的整体模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
[业务线程] 
→ send()
→ ProducerInterceptors(拦截器链)
→ KeySerializer/ValueSerializer(序列化)
→ Partitioner(分区器)
→ RecordAccumulator(累加器/双端队列)
└──────────────────┐

[Sender线程] (后台I/O线程)
→ Batch拆分
→ NetworkClient
→ Selector (基于Java NIO)
→ Broker

kafka生产端流程图

核心设计思想

  1. 主线程与Sender线程分离send() 方法仅负责追加消息到累加器,立即返回Future。真正的网络I/O由后台单线程Sender负责。
  2. 批量发送:不是来一条发一条,而是积攒一批(Batch),极大提高吞吐量,降低网络开销。
  3. 异步回调:通过FutureCallback机制,既不阻塞主线程,又能感知发送结果。

三、生产者初始化

我们从new KafkaProducer<>(props)开始,看看生产者是如何初始化的。

2.1 ProducerConfig:生产者参数

所有的生产者配置都在org.apache.kafka.clients.producer.ProducerConfig中定义。这是一个常量类,每个配置项都对应一个静态常量。

例如,我们最熟悉的几个参数源码定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// ProducerConfig.java (Kafka 3.2.0)
public class ProducerConfig extends AbstractConfig {
// ...
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String ACKS_CONFIG = "acks";
public static final String RETRIES_CONFIG = "retries";
public static final String BATCH_SIZE_CONFIG = "batch.size";
public static final String LINGER_MS_CONFIG = "linger.ms";
public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
// ... 还有更多,大约有50+个配置项
}

ProducerConfig内部使用了ConfigDef来定义每个参数的类型、默认值、重要性及校验逻辑。

2.2 KafkaProducer构造函数剖析

我们来看KafkaProducer的构造过程(为了清晰,省略部分非核心代码和事务相关逻辑):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// KafkaProducer.java
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer,
ProducerMetadata metadata, KafkaClient kafkaClient) {
try {
// 1. 记录基本配置
this.producerConfig = config;
this.time = time;
// 获取客户端ID,如果没配则自动生成 "producer-" + 随机数
String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;

// 2. 初始化拦截器链(关键步骤1)
List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

// 3. 初始化序列化器(关键步骤2)
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
this.keySerializer.configure(config.originals(Collections.singletonMap(KEY_SERIALIZER_CLASS_CONFIG, false)), true);
} else { /*...*/ }
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
this.valueSerializer.configure(config.originals(Collections.singletonMap(VALUE_SERIALIZER_CLASS_CONFIG, false)), false);
} else { /*...*/ }

// 4. 初始化分区器(关键步骤3)
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
this.partitioner.configure(new ProducerConfig(partitionProps));

// 5. 初始化元数据管理器 (Metadata)
this.metadata = metadata != null ? metadata : new ProducerMetadata(retryBackoffMs,
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
true, true, clusterResourceListeners);
this.metadata.bootstrap(addresses);

// 6. 核心:消息累加器 RecordAccumulator(关键步骤4)
this.accumulator = new RecordAccumulator(config, logContext);

// 7. 初始化 NetworkClient 和 Sender 线程(关键步骤5)
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time);
NetworkClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
this.metrics, time, "producer", channelBuilder, logContext),
this.metadata,
clientId,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
config.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
config.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
time,
true,
new ApiVersions(),
this.metrics,
new ImmutableMap.Builder<String, String>().build(),
logContext);

this.sender = new Sender(logContext,
client,
this.metadata,
this.accumulator,
... 省略若干参数 ...);

// 8. 启动Sender线程
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
this.senderThread = new KafkaThread(ioThreadName, this.sender, true);
this.senderThread.start(); // 后台线程启动

// ...
} catch (Throwable t) { /*异常处理*/ }
}

构造阶段小结
在构造KafkaProducer时,完成了所有组件的组装,并启动了一个名为 kafka-producer-network-thread 的后台线程。这意味着一旦Producer实例化完成,它就已经准备好在后台接收并发送消息了。

四、消息发送主流程——send() 方法

当业务代码调用producer.send(record, callback)时,源码的执行路径依次经过:拦截器 -> 序列化 -> 分区器 -> 累加器

4.1 入口:send()doSend()

1
2
3
4
5
6
7
8
// KafkaProducer.java
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// 第一步:先经过拦截器处理 (OnSend)
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
// 第二步:执行真正的发送逻辑
return doSend(interceptedRecord, callback);
}

interceptors.onSend(record)会遍历我们在配置中设置的所有拦截器链,每个拦截器都可以修改ProducerRecord的内容(如添加时间戳、修改消息体等)。

4.2 核心方法:doSend 源码解析

4.2.1 Step1:等待元数据更新

1
2
3
4
5
6
7
8
9
10
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// 等待元数据更新,保证我们知道该topic有哪些分区,leader在哪个broker
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
// ...
}
}

waitOnMetadata 是阻塞的。如果本地Metadata缓存中没有该Topic的信息,它会发送METADATA请求给Broker,直到获取到或超时。这解释了为什么发送第一条消息有时会稍微慢一点——因为需要拉取元数据。

4.2.2 Step2:序列化Key和Value

1
2
3
4
5
6
7
8
9
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) { /*...*/ }

byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) { /*...*/ }

这里使用的是我们在构造时初始化的序列化器。如果你用的是StringSerializer,它会将String转为UTF-8字节数组。

4.2.3 Step3:分区计算

1
2
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);

调用了partition()方法,它内部调用了partitioner.partition()。我们来看默认分区器DefaultPartitioner的逻辑。

深入 DefaultPartitioner 源码
1
2
3
4
5
6
7
8
9
10
11
12
// DefaultPartitioner.java
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
// 情况1:没有指定key -> 使用轮询(Round Robin)或粘性分区
return stickyPartitionCache.partition(topic, cluster);
} else {
// 情况2:指定了key -> 对key的hash取模 (murmur2算法)
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}

**关于粘性分区 (Sticky Partitioner)**:
Kafka 2.4+ 引入了粘性分区策略。当没有key时,不再严格轮询,而是先“粘”在一个分区上,直到该分区的batch满了或linger时间到了,才切换下一个。这能进一步减少发送的请求数量 。

4.2.4 Step4:验证消息大小

1
2
3
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);

检查消息大小是否超过 max.request.sizebuffer.memory,避免OOM。

4.2.5 Step5:追加到累加器 (RecordAccumulator)

1
2
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);

这里,消息正式从主线程移交给了RecordAccumulator。该方法内部非常复杂,我们后面详细分析。

4.2.6 Step6:唤醒Sender线程

1
2
3
4
if (result.batchIsFull || result.newBatchCreated) {
this.sender.wakeup();
}
return result.future;

如果追加后batch满了,或者因为这是该分区的第一个batch而被创建,则唤醒可能正在阻塞的Sender线程,让它赶紧干活。

五、拦截器链的使用

​ 在 Kafka 生产者的消息发送流程中,拦截器(Interceptor)是一个强大的扩展点,允许用户在消息发送前、发送完成后(Broker 确认后)以及发生错误时插入自定义逻辑。本文将深入 org.apache.kafka.clients.producer.ProducerInterceptor 接口的源码设计,剖析其在 KafkaProducer 内部的工作机制,并探讨使用拦截器的注意事项与最佳实践。

5.1 ProducerInterceptor 接口定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package org.apache.kafka.clients.producer;

public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {

/**
* 在消息被序列化并分配分区之前调用。
* 可以修改消息内容(topic, partition, key, value, headers, timestamp)。
*/
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

/**
* 当消息发送到 Broker 并收到确认后(或发送失败时)调用。
* 该方法在 producer 回调触发之前执行。
*/
void onAcknowledgement(RecordMetadata metadata, Exception exception);

/**
* 关闭拦截器,用于释放资源。
*/
void close();

/**
* 继承自 Configurable,用于传递配置。
*/
void configure(Map<String, ?> configs);
}

接口要点:

  • 泛型 <K, V> 与生产者保持一致,确保 key/value 类型匹配。
  • **onSend**:消息发送前的预处理,可修改 ProducerRecord 的几乎所有字段。
  • **onAcknowledgement**:消息确认时的回调,无论成功或失败都会调用(但失败时 metadata 可能为 null)。执行顺序在用户提供的 Callback 之前。
  • **configure**:从生产者配置中提取拦截器专属配置,在初始化时调用。
  • **close**:生产者关闭时清理资源。

5.2 拦截器的配置与初始化

5.2.1 配置方式

通过生产者参数 interceptor.classes 指定一个或多个拦截器类(用逗号分隔):

1
2
3
4
Properties props = new Properties();
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
"com.example.CustomInterceptor1,com.example.CustomInterceptor2");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

ProducerConfig 中对应的常量:

1
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";

5.2.2 拦截器链的构建

KafkaProducer 构造函数中,拦截器链通过 ProducerConfig.getConfiguredInstances 方法实例化:

1
2
3
4
// KafkaProducer.java 构造函数片段
List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
  • getConfiguredInstances 会利用反射创建每个拦截器实例,并自动调用其 configure 方法(因为 ProducerInterceptor 实现了 Configurable)。
  • 实例化顺序与配置中类的顺序一致。
  • 使用 ProducerInterceptors 包装器统一管理拦截器链,对外提供批量调用能力。

5.2.3 ProducerInterceptors 包装类

ProducerInterceptors 是 Kafka 内部用于封装拦截器链的辅助类,它提供了与 ProducerInterceptor 同名的方法,但内部遍历所有拦截器依次调用。其核心代码结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// ProducerInterceptors.java (Kafka 内部类)
public class ProducerInterceptors<K, V> implements AutoCloseable {
private final List<ProducerInterceptor<K, V>> interceptors;

public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
this.interceptors = interceptors;
}

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
ProducerRecord<K, V> interceptorRecord = record;
for (ProducerInterceptor<K, V> interceptor : interceptors) {
try {
interceptorRecord = interceptor.onSend(interceptorRecord);
} catch (Exception e) {
// 捕获单个拦截器的异常,避免影响后续拦截器
log.error("Error executing interceptor onSend", e);
}
// 如果拦截器返回 null,则使用原 record 继续传递
if (interceptorRecord == null) {
interceptorRecord = record;
}
}
return interceptorRecord;
}

public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
for (ProducerInterceptor<K, V> interceptor : interceptors) {
try {
interceptor.onAcknowledgement(metadata, exception);
} catch (Exception e) {
log.error("Error executing interceptor onAcknowledgement", e);
}
}
}

@Override
public void close() {
for (ProducerInterceptor<K, V> interceptor : interceptors) {
try {
interceptor.close();
} catch (Exception e) {
log.error("Error closing interceptor", e);
}
}
}
}

关键设计

  • 异常隔离:单个拦截器的异常不会影响其他拦截器执行,仅记录日志。
  • onSend 链式传递:上一个拦截器的返回结果作为下一个的输入,允许拦截器修改记录。
  • 空保护:如果某拦截器返回 null,则用原始 record 继续传递,避免下游 NPE。

5.3 拦截器调用时机源码剖析

5.3.1 onSend 调用点

KafkaProducer.send() 方法中,第一行就是调用拦截器的 onSend

1
2
3
4
5
6
// KafkaProducer.java
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}

注意onSend 的返回值将被用于后续的序列化、分区计算和累加器追加。因此,拦截器可以实现:

  • 修改消息的 topic、分区、key、value、headers、timestamp。
  • 完全替换消息(返回新对象)。
  • 甚至返回 null(但生产者会还原为原记录,不推荐)。

5.3.2 onAcknowledgement 调用点

onAcknowledgement 的调用发生在消息发送结果确定之后,但在用户提供的 Callback 之前。具体位置在 KafkaProducer 内部类 Sender 处理响应时。

调用栈

  1. Sender 线程从 NetworkClient 获取响应。
  2. 对于每个完成的 ProduceRequest,调用 RecordBatch.done()
  3. RecordBatch.done() 中会触发所有挂起的回调(包括用户 callback 和拦截器回调)。
  4. 最终通过 KafkaProducerinterceptors.onAcknowledgement(metadata, exception) 调用拦截器链。

关键代码位置(RecordBatchcompleteFutureAndFireCallbacks 方法中):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// RecordBatch.java (内部类)
void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
// ... 先设置 Future
// 然后触发用户 callback
for (Thunk thunk : thunks) {
try {
if (exception == null) {
RecordMetadata metadata = new RecordMetadata(offsetCounter, ...);
thunk.callback.onCompletion(metadata, null);
} else {
thunk.callback.onCompletion(null, exception);
}
} catch (Exception e) { ... }
}
// 最后调用拦截器的 onAcknowledgement
if (thunks.size() > 0 && this.producerInterceptors != null) {
this.producerInterceptors.onAcknowledgement(metadata, exception);
}
}
  • 调用时机:在用户 Callback 之后
  • 异常传递:如果发送失败,exception 不为 nullmetadata 可能为 null

5.3.3 close 调用点

KafkaProducer.close() 方法中,会关闭拦截器链:

1
2
3
4
5
6
// KafkaProducer.java
public void close(long timeout, TimeUnit timeUnit) {
// ... 关闭 sender 线程、累加器等
if (interceptors != null)
interceptors.close();
}

5.4 典型应用场景与代码示例

5.4.1 应用场景

  • 分布式追踪:在消息 header 中注入 Trace ID,实现跨系统链路追踪。
  • 消息审计:记录发送的消息内容、时间戳、目标主题等。
  • 消息修饰:统一添加业务公共字段到 value 或 header 中。
  • 监控告警:统计发送成功率、延迟,或在失败时记录异常。
  • 数据脱敏:在发送前对敏感字段进行加密或脱敏。

5.4.2 示例:Trace ID 拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class TracingInterceptor implements ProducerInterceptor<String, String> {

private String applicationName;

@Override
public void configure(Map<String, ?> configs) {
this.applicationName = (String) configs.get("application.name");
}

@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 从 MDC 或 ThreadLocal 获取 Trace ID
String traceId = MDC.get("traceId");
if (traceId == null) {
traceId = UUID.randomUUID().toString();
}
// 创建新的 Headers
List<Header> headers = new ArrayList<>();
if (record.headers() != null) {
for (Header h : record.headers()) {
headers.add(h);
}
}
headers.add(new RecordHeader("traceId", traceId.getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("appName", applicationName.getBytes(StandardCharsets.UTF_8)));

// 返回新 record(或修改原 record,由于 ProducerRecord 是不可变的,需要新建)
return new ProducerRecord<>(
record.topic(),
record.partition(),
record.timestamp(),
record.key(),
record.value(),
headers
);
}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 可在此记录发送结果到日志或指标系统
}

@Override
public void close() {}
}

5.5 拦截器使用注意

5.5.1 修改 ProducerRecord 的潜在风险

拦截器在 onSend 中可以修改 ProducerRecord 的任何字段,但需要谨慎:

  • topic:修改 topic 可能导致消息路由到错误的主题。
  • partition:修改 partition 会覆盖分区器逻辑,需确保该分区存在。
  • key/value:类型必须与序列化器兼容(因为是修改在序列化之前)。
  • headers:可以添加或删除 header,用于传递追踪信息。
  • timestamp:修改时间戳可能影响日志保留策略或基于时间的处理。

5.5.2 异常处理与性能影响

  • 异常被吞没:拦截器抛出的异常只会被记录日志,不会中断主流程。这意味着拦截器不能用于关键校验(如需校验应返回特殊标记或提前抛出运行时异常,但生产者会捕获并继续)。
  • 阻塞风险onSendonAcknowledgement 在生产者主线程(onSend)或 Sender 线程(onAcknowledgement)中执行,因此必须避免阻塞操作,否则会严重影响吞吐量。
  • 性能开销:每个消息都会经过所有拦截器,链越长性能损耗越大,建议仅用于必要的监控和轻量级修饰。

5.5.3 线程安全性

ProducerInterceptor 的实现必须保证线程安全,因为:

  • onSend 可能被多个生产线程并发调用(如果多个线程共享同一个 KafkaProducer 实例)。
  • onAcknowledgement 在单个 Sender 线程中顺序调用,但不同分区的回调可能交错,且与 onSend 并发执行。

因此,拦截器内部应避免使用共享可变状态,或通过同步机制保护。

六、序列化器(Serializer)

​ 在Kafka生产者的消息发送流程中,序列化器(Serializer)扮演着将业务对象转换为字节数组的关键角色。Kafka本身并不关心消息的具体格式,它只存储和传输字节。

​ 因此,正确的序列化与反序列化是保证消息生产与消费一致性的基础。

6.1 序列化器概述

​ Kafka生产者发送的每条消息由keyvalue组成,它们可以是任意Java对象。在通过网络传输及存储到磁盘之前,这些对象必须被转换为字节数组。序列化器正是负责这一转换的组件。

​ 回顾生产者的消息发送流程:

1
业务线程 → 拦截器(onSend) → 序列化器(key+value) → 分区器 → 累加器 → Sender线程

​ 序列化发生在拦截器之后、分区器之前。这意味着拦截器修改后的ProducerRecord会被送入序列化器,而分区器在计算分区时可能需要用到序列化后的key字节(例如基于key哈希的分区策略)。因此,序列化器的输出直接影响分区结果。

6.2 Serializer接口源码分析

序列化器接口位于org.apache.kafka.common.serialization包中,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package org.apache.kafka.common.serialization;

import java.io.Closeable;
import java.util.Map;

public interface Serializer<T> extends Closeable {

/**
* 配置此序列化器。
* @param configs 配置键值对,通常来自生产者配置
* @param isKey 是否为key的序列化器(true表示key,false表示value)
*/
default void configure(Map<String, ?> configs, boolean isKey) {
// 默认空实现
}

/**
* 将给定对象转换为字节数组。
* @param topic 主题名称(可能用于基于主题的序列化策略)
* @param data 待序列化的对象
* @return 序列化后的字节数组
*/
byte[] serialize(String topic, T data);

/**
* 将给定对象转换为字节数组,并可以访问消息头(headers)。
* 此方法是Kafka 2.0.0新增的默认方法,向后兼容。
* @param topic 主题名称
* @param headers 消息头(可包含附加信息)
* @param data 待序列化的对象
* @return 序列化后的字节数组
*/
default byte[] serialize(String topic, Headers headers, T data) {
return serialize(topic, data);
}

/**
* 关闭序列化器,释放资源。
*/
@Override
default void close() {
// 默认空实现
}
}

接口要点

  • **泛型T**:指定待序列化的对象类型。
  • **configure**:初始化方法,在实例化后调用。isKey参数指示当前序列化器用于key还是value,这在某些需要区分行为的序列化器中很有用(例如,key可能需要压缩,value不需要)。
  • serialize重载:早期版本只有serialize(String topic, T data);从2.0开始引入带Headers的方法,允许序列化器访问或修改消息头。默认实现委托给无headers版本,保持兼容。
  • 线程安全性:接口未强制要求线程安全,但KafkaProducer会并发调用serialize方法(多个生产线程共享同一序列化器实例)。因此,自定义序列化器必须保证线程安全

6.3 内置序列化器实现

Kafka提供了多种内置序列化器,覆盖常见数据类型。它们都位于org.apache.kafka.common.serialization包中。

6.3.1 常用内置序列化器

序列化器类 说明 序列化后的格式
ByteArraySerializer 处理byte[]类型 直接返回原数组
ByteBufferSerializer 处理ByteBuffer 转换为字节数组
BytesSerializer 处理org.apache.kafka.common.utils.Bytes 返回其内部数组
StringSerializer 处理String 使用指定的字符集编码(默认UTF-8)
IntegerSerializer 处理Integer 4字节大端序
LongSerializer 处理Long 8字节大端序
DoubleSerializer 处理Double 8字节双精度浮点数(IEEE 754)
FloatSerializer 处理Float 4字节单精度浮点数
ShortSerializer 处理Short 2字节大端序
VoidSerializer 处理Void 始终返回null,用于无key/value的情况

6.3.2 StringSerializer源码解析

以最常用的StringSerializer为例,深入其实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package org.apache.kafka.common.serialization;

public class StringSerializer implements Serializer<String> {
private String encoding = StandardCharsets.UTF_8.name();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// 从配置中获取字符集编码
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue != null && encodingValue instanceof String)
encoding = (String) encodingValue;
}

@Override
public byte[] serialize(String topic, String data) {
if (data == null)
return null;
else
return data.getBytes(Charset.forName(encoding));
}

@Override
public byte[] serialize(String topic, Headers headers, String data) {
return serialize(topic, data);
}

@Override
public void close() {
// 无需关闭
}
}

要点

  • 编码配置:支持通过key.serializer.encodingvalue.serializer.encoding指定字符集,也可以通过通用配置serializer.encoding指定。默认UTF-8。
  • 空处理:如果数据为null,返回null。在Kafka中,允许key或value为null,但需注意有些消费者可能无法处理null值。
  • 线程安全encoding字段在configure中设置后只读,后续serialize调用无状态,因此线程安全。

6.3.3 ByteArraySerializer源码解析

ByteArraySerializer极为简单:

1
2
3
4
5
6
public class ByteArraySerializer implements Serializer<byte[]> {
@Override
public byte[] serialize(String topic, byte[] data) {
return data;
}
}

它直接返回传入的字节数组,不进行任何转换。

6.4 序列化器在KafkaProducer中的初始化

KafkaProducer构造函数中,序列化器的初始化是一个关键步骤。让我们再次回顾相关代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// KafkaProducer.java
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer,
ProducerMetadata metadata, KafkaClient kafkaClient) {
try {
// ... 省略其他初始化

// 3. 初始化序列化器
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
this.keySerializer.configure(config.originals(Collections.singletonMap(KEY_SERIALIZER_CLASS_CONFIG, false)), true);
} else {
this.keySerializer = keySerializer;
// 如果用户传入了序列化器实例,则仍需调用configure,但需注意是否已配置过
// 实际代码中会检查并可能重新配置,此处简化
}

if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
this.valueSerializer.configure(config.originals(Collections.singletonMap(VALUE_SERIALIZER_CLASS_CONFIG, false)), false);
} else {
this.valueSerializer = valueSerializer;
}

// ... 继续其他初始化
}
}

初始化流程

  1. 实例化:通过config.getConfiguredInstance反射创建序列化器对象。该方法会调用无参构造器。
  2. 配置:调用configure方法,传入生产者配置的副本(通过config.originals()获取所有原始配置),并传入isKey标志。注意这里传入了Collections.singletonMap(...),实际上config.originals()返回所有配置,然后再额外放入了KEY_SERIALIZER_CLASS_CONFIGVALUE_SERIALIZER_CLASS_CONFIG作为提示?源码细节略有不同,但核心是传递完整配置,并指明是key还是value。
  3. 用户直接提供序列化器:如果构造KafkaProducer时传入了现成的序列化器实例,则不再反射创建,但仍需调用其configure方法(如果尚未配置),以确保配置生效。

注意:序列化器在生产者整个生命周期中只会被初始化一次,之后被所有发送线程共享。

6.5 序列化调用流程源码追踪

当调用producer.send(record)时,在doSend方法中,序列化发生在分区计算之前。关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// KafkaProducer.doSend() 片段
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + keySerializer.getClass().getSimpleName() + " specified in key.serializer", cce);
}

byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + valueSerializer.getClass().getSimpleName() + " specified in value.serializer", cce);
}

调用细节

  • serialize方法的选择:这里调用的是带Headers的重载方法,以便序列化器可以访问消息头。如果自定义序列化器未覆盖此方法,将回退到无headers版本(因接口提供默认实现)。
  • 异常处理:捕获ClassCastException(通常因为序列化器泛型与实际类型不匹配),包装为SerializationException抛出,中断发送流程。
  • null值处理:如果record.key()null,且序列化器的serialize方法返回null,则后续累加器会将key视为空。但需注意,分区器在计算分区时如果key为null,将采用粘性分区策略。

序列化后

  • serializedKeyserializedValue作为字节数组,连同其他信息(topic, partition, timestamp, headers)一起被封装,传递给分区器partition()方法。
  • 分区器可能使用serializedKey来计算目标分区(例如基于key哈希)。

6.6 自定义序列化器的实现与注意事项

当内置序列化器无法满足需求时(如需要序列化POJO、使用Avro/Protobuf等),可以自定义实现Serializer接口。

6.6.1 自定义序列化器示例:JSON序列化器

假设我们有一个简单的User类:

1
2
3
4
5
public class User {
private String name;
private int age;
// getters/setters 省略
}

自定义JSON序列化器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class JsonSerializer<T> implements Serializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// 可在此配置ObjectMapper的特性,例如日期格式
}

@Override
public byte[] serialize(String topic, T data) {
if (data == null)
return null;
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing JSON message", e);
}
}

@Override
public void close() {
}
}

使用时:

1
2
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());

但上述实现存在类型擦除问题:JsonSerializer<User>在运行时无法获知泛型类型,导致无法正确反序列化。通常自定义序列化器会与特定的类绑定,或者通过配置传递类型信息。更好的做法是使用Kafka提供的JsonSerializer(在kafka-json-serializer模块中),它通过type配置指定目标类。

七、Partitioner(分区器)

​ 在Kafka生产者的消息发送流程中,分区器(Partitioner)负责决定每条消息应该被发送到目标主题的哪一个分区。这一决策直接影响消息的分布均衡性、顺序性保证以及后续消费者的并行处理能力。

7.1. 分区器概述

​ 分区器位于序列化之后消息追加到累加器之前。它接收序列化后的key字节数组、value字节数组以及主题信息,输出一个整数分区号。这个分区号决定了消息最终被存放在哪个分区的队列中。

​ 回顾生产者的核心处理流程:

1
业务线程 → 拦截器(onSend) → 序列化器(key+value) → 分区器 → 累加器 → Sender线程

关键点

  • 分区器的输入包含序列化后的key,因此分区策略可以基于key的二进制内容(例如哈希取模)。
  • 如果用户未指定分区(ProducerRecord构造时未传partition参数),生产者必须调用分区器计算分区。
  • 分区器的输出直接决定RecordAccumulatorbatches映射的TopicPartition键。

7.2 Partitioner接口源码分析

分区器接口位于org.apache.kafka.clients.producer.Partitioner,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package org.apache.kafka.clients.producer;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import java.io.Closeable;

public interface Partitioner extends Configurable, Closeable {

/**
* 计算给定消息的分区。
* @param topic 主题
* @param key 消息key(可能为null)
* @param keyBytes 序列化后的key字节(可能为null)
* @param value 消息value(可能为null)
* @param valueBytes 序列化后的value字节(可能为null)
* @param cluster 集群元数据(包含主题分区信息、broker节点等)
* @return 分区号(int类型,必须在0到分区数-1之间)
*/
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

/**
* 当分区器不再使用时调用,用于清理资源。
*/
void close();

/**
* 当分区变化时(例如新分区被添加)可能调用此方法,通知分区器更新内部状态。
* 默认实现为空。
*/
default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
}
}

接口要点

  • **继承自Configurable**:分区器可以通过configure(Map<String, ?>)方法接收生产者配置,实现参数化。
  • partition方法:核心方法,所有参数都是只读的,实现必须线程安全。
    • topic:主题名称。
    • key/value:原始对象,可能为null
    • keyBytes/valueBytes:序列化后的字节数组,可能为null。分区器通常使用keyBytes进行哈希,因为直接使用字节更高效且不受序列化器实现影响。
    • cluster:集群元数据,包含PartitionInfo列表,可用于获取分区数量、副本分布等信息。
  • **onNewBatch**:Kafka 2.4.0引入,用于支持粘性分区策略。当生产者为一个主题-分区创建新的批次(batch)时,会回调此方法,让分区器有机会切换到下一个分区(粘性分区切换点)。
  • 线程安全partition方法可能被多个生产线程并发调用,因此实现必须是线程安全的。

7.3 默认分区器(DefaultPartitioner)源码深度解析

​ Kafka默认使用的分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner(旧版本为DefaultPartitioner,位于同一包下,但内部实现已迁移至internals)。其核心逻辑分为两种情况:key不为nullkey为null

7.3.1 源码概览

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package org.apache.kafka.clients.producer.internals;

public class DefaultPartitioner implements Partitioner {
private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

@Override
public void configure(Map<String, ?> configs) {}

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
// 情况1:key为null -> 使用粘性分区缓存
return stickyPartitionCache.partition(topic, cluster);
}
// 情况2:key不为null -> 对keyBytes哈希取模
return Utils.toPositive(Utils.murmur2(keyBytes)) % cluster.partitionCountForTopic(topic);
}

@Override
public void close() {}

@Override
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
}
}

7.3.2 key不为null:一致性哈希(Murmur2)

当消息指定了key(即keyBytes != null),DefaultPartitioner采用Murmur2哈希算法对key字节进行散列,然后对分区数取模,得到目标分区。

1
Utils.toPositive(Utils.murmur2(keyBytes)) % cluster.partitionCountForTopic(topic);
  • **Utils.murmur2**:实现MurmurHash2算法(非加密哈希),速度快且分布均匀。
  • **toPositive**:将哈希值转换为非负数(因为murmur2可能返回负数),通过0x7FFFFFFF & hash实现。
  • 分区数:从cluster元数据获取,确保分区数动态变化时仍正确取模。

特点

  • 相同的key总是被路由到相同的分区(前提是分区数不变),保证消息顺序性(同一key的消息按序到达同一分区)。
  • 如果分区数变更,映射关系会变化,可能导致部分消息进入不同分区,影响key的顺序性。

MurmurHash2算法:

​ MurmurHash2是一种非加密型的哈希函数,主要用于快速哈希计算,特别是在需要高性能的场景中,如数据库和缓存系统中。

MurmurHash2算法的基本步骤:

  1. 初始化‌:选择一个常量种子(seed),通常是一个32位或64位的值。
  2. 混合‌:对输入数据进行多次混合操作,通常包括以下几个步骤:
    • 将输入数据分成多个部分(如32位或64位块)。
    • 对每个块应用一系列操作,包括乘法、旋转和异或等。
    • 混合操作通常包括将当前块与之前的块进行某种形式的合并。
  3. 最终处理‌:经过多次混合后,对最终结果进行最终处理,如添加额外的混合步骤或使用特定的最终输出函数。
  4. 输出‌:生成一个固定长度的哈希值,通常为32位或64位。

7.3.3 key为null:粘性分区(Sticky Partitioning)

当key为null时,DefaultPartitioner不再使用简单的轮询(Round Robin),而是采用粘性分区策略,由StickyPartitionCache管理。

1. 为什么引入粘性分区?

在旧版本(2.4之前),key为null时使用轮询策略:每条消息依次分配到不同分区。这会导致小消息频繁创建小批次,增加网络请求数量,降低吞吐量。粘性分区通过让消息暂时“粘”在同一个分区上,直到该分区的批次满了或达到linger.ms,才切换到下一个分区,从而增大批次大小、减少请求次数

2. StickyPartitionCache源码分析

StickyPartitionCache维护了每个主题当前粘性的分区索引。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// StickyPartitionCache.java
class StickyPartitionCache {
private final ConcurrentMap<String, Integer> indexCache;
private final AtomicInteger transitionCounter; // 用于调试/指标

public StickyPartitionCache() {
this.indexCache = new ConcurrentHashMap<>();
}

public int partition(String topic, Cluster cluster) {
Integer part = indexCache.get(topic);
if (part == null) {
// 首次为该主题选择分区,随机选择一个
return nextPartition(topic, cluster, -1);
}
return part;
}

public int nextPartition(String topic, Cluster cluster, int prevPartition) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int newPart = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitions.size();
// 避免新分区与旧分区相同(如果可能),但简单随机也可接受
indexCache.put(topic, newPart);
return newPart;
}
}

工作流程

  1. 首次发送partition方法从缓存获取当前粘性分区,如果不存在(首次),则调用nextPartition随机选择一个分区作为粘性分区。
  2. 消息追加:消息被追加到该分区的ProducerBatch中。
  3. 切换触发:当该分区的批次满了,或者创建新批次时(例如linger.ms到期),Sender线程在处理过程中会调用onNewBatch回调,通知分区器当前批次已结束。DefaultPartitioner.onNewBatch委托给stickyPartitionCache.nextPartition,为下一个批次选择新的粘性分区(随机选择)。
  4. 后续发送:新消息将进入新选择的粘性分区,重复上述过程。

优势

  • 减少了请求次数:一批消息集中在一个分区,更容易填满批次。
  • 负载均衡:切换时机由批次满或超时决定,而非固定轮询,能更好地适应消息量突增的场景。

注意事项

  • 粘性分区不保证消息在分区间的绝对均衡,但在长期运行和大数据量下接近均衡。
  • 通过partitioner.adaptive.partitioning.enable配置(Kafka 3.0+)可以进一步优化,使粘性分区根据broker负载自适应调整。

7.3.4 分区器的初始化与配置

1. 配置方式

通过生产者参数partitioner.class指定自定义分区器类:

1
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.CustomPartitioner");

如果未指定,默认使用org.apache.kafka.clients.producer.internals.DefaultPartitioner

2. 分区器初始化源码

KafkaProducer构造函数中,分区器的初始化如下:

1
2
3
// KafkaProducer.java 构造函数片段
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
this.partitioner.configure(new ProducerConfig(partitionProps));
  • getConfiguredInstance通过反射创建分区器实例(要求有无参构造器)。
  • 调用configure方法,传入一个只包含分区器相关配置的ProducerConfig(实际传递的是原始配置的子集,但通常传递所有配置)。注意:configure的调用时机在分区器实例化后立即执行。

分区器相关配置

配置项 描述 默认值
partitioner.class 分区器全限定类名 org.apache.kafka.clients.producer.internals.DefaultPartitioner
partitioner.ignore.keys 如果为true,即使key不为null,也忽略key使用分区策略(粘性分区) false (Kafka 3.0+引入)
partitioner.adaptive.partitioning.enable 启用自适应分区,根据broker负载动态调整粘性分区选择 true (Kafka 3.0+默认)
partitioner.availability.timeout.ms 在自适应分区中,认为一个分区不可用的超时时间 默认值由broker端配置决定

7.3.5 分区调用时机与消息流转

1 调用点:partition()何时执行

KafkaProducer.doSend()方法中,分区调用紧接在序列化之后:

1
2
3
4
5
6
// KafkaProducer.doSend() 片段
byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());

int partition = partition(record, serializedKey, serializedValue, cluster);
TopicPartition tp = new TopicPartition(record.topic(), partition);

partition方法内部调用partitioner.partition(...)

2 用户指定分区的优先级

如果用户在构造ProducerRecord时已经指定了分区号(record.partition() != null),则不会调用分区器,直接使用用户指定的分区。这在partition()方法中体现:

1
2
3
4
5
6
7
8
9
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
if (partition != null) {
// 用户指定了分区,直接返回
return partition;
}
// 否则调用分区器
return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
3 分区结果如何影响累加器

分区号确定后,与主题一起构成TopicPartition对象。RecordAccumulator使用它作为键,定位到对应的双端队列Deque<ProducerBatch>

1
2
// RecordAccumulator.append() 内部
Deque<ProducerBatch> dq = getOrCreateDeque(tp); // tp即TopicPartition

后续的消息将被追加到该队列的尾部batch中。因此,分区器直接决定了消息进入哪个物理队列。

7.3.6 自定义分区器

1 何时需要自定义分区器?
  • 基于业务规则的分区:例如按照用户ID的特定范围划分分区,实现数据局部性。
  • 负载均衡优化:根据broker负载或分区大小动态选择分区,避免某些分区过热。
  • 地理位置路由:将数据发送到离数据源最近的broker所在的分区。
  • 复合键路由:使用key的多个字段组合计算分区。
2 自定义分区器示例

需求:根据用户所在地区(从key中解析)将消息发送到特定分区,例如地区”US”对应分区0,”EU”对应分区1,其他随机。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class RegionPartitioner implements Partitioner {

private Map<String, Integer> regionMapping;

@Override
public void configure(Map<String, ?> configs) {
// 从配置中加载映射关系,例如 region.mapping=US:0,EU:1,CN:2
regionMapping = new HashMap<>();
String mapping = (String) configs.get("region.mapping");
if (mapping != null) {
for (String entry : mapping.split(",")) {
String[] kv = entry.split(":");
regionMapping.put(kv[0], Integer.parseInt(kv[1]));
}
}
}

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (key == null) {
// 无key时随机分区(或使用粘性)
return Utils.toPositive(ThreadLocalRandom.current().nextInt()) % cluster.partitionCountForTopic(topic);
}
String region = extractRegionFromKey(key.toString()); // 假设key包含地区前缀
Integer assignedPartition = regionMapping.get(region);
if (assignedPartition != null && assignedPartition < cluster.partitionCountForTopic(topic)) {
return assignedPartition;
}
// 默认使用key哈希
return Utils.toPositive(Utils.murmur2(keyBytes)) % cluster.partitionCountForTopic(topic);
}

private String extractRegionFromKey(String key) {
// 解析逻辑,例如 "US-12345" 返回 "US"
return key.split("-")[0];
}

@Override
public void close() {}

@Override
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
// 可以留空,或者实现自定义的粘性逻辑
}
}
3 注意事项
  1. 线程安全partition方法可能被多线程并发调用,应避免使用共享可变状态(如非线程安全的Map)。可以使用ConcurrentHashMap,或确保状态在configure中初始化后只读。
  2. 性能partition方法在业务线程中执行,应尽量轻量,避免阻塞操作(如网络I/O)。
  3. 正确处理null key/value:必须考虑key或value为null的场景。
  4. 分区范围:返回的分区号必须在0到cluster.partitionCountForTopic(topic)-1之间,否则生产者会抛出异常。
  5. 与粘性分区兼容:如果希望自定义分区器也支持粘性行为,应正确实现onNewBatch方法,并管理内部状态。如果不打算支持,可留空,但生产者仍会在创建新批次时调用它(需保证线程安全)。
  6. 配置传递:通过configure方法可以接收自定义参数,便于动态调整分区策略。

八、RecordAccumulator(累加器)

​ 在Kafka生产者的架构中,RecordAccumulator(消息累加器)是连接业务主线程与后台Sender线程的桥梁,也是实现高性能异步发送的关键组件。它负责暂存待发送的消息,以批次(Batch)形式组织,并按照分区维度管理,同时通过内存池机制减少GC压力。

8.1累加器概述

回顾生产者的核心处理流程:

1
业务线程 → 拦截器 → 序列化器 → 分区器 → RecordAccumulator → Sender线程 → NetworkClient → Broker
  • 业务线程调用send()方法,经过拦截器、序列化、分区计算后,将消息交给RecordAccumulator
  • RecordAccumulator将消息追加到对应分区的双端队列(Deque<ProducerBatch>)的尾部批次中。
  • 如果追加导致批次满或创建了新批次,则唤醒后台Sender线程。
  • Sender线程循环地从RecordAccumulator中拉取准备好的批次,按节点分组,通过NetworkClient发送给Broker,并处理响应。

RecordAccumulator的设计目标是:

  • 高吞吐:批量发送,减少网络请求次数。
  • 低延迟:支持linger.ms配置,在延迟和吞吐之间权衡。
  • 内存控制:通过buffer.memory限制总内存使用,避免OOM。
  • 线程安全:支持多个生产者线程并发追加,同时后台线程读取发送。

8.2 核心数据结构:batches映射与双端队列

RecordAccumulator内部最核心的数据结构是:

1
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

为什么使用双端队列(Deque)?

  • 读写分离:主线程(多个生产者线程)从尾部追加新的消息批次或追加到尾部批次;Sender线程从头部取出批次进行发送。这种设计最大程度减少了锁竞争,只需要对单个队列的头部或尾部进行同步,而不需要锁整个映射。
  • 实现方式Deque的具体实现是ArrayDeque,在Kafka源码中通过synchronized块对队列实例加锁,保证线程安全。

队列中的元素ProducerBatch,代表一个消息批次。它包含多个消息(ProducerRecord),是网络发送的最小单位。

8.3 内存管理:BufferPool

为了避免频繁创建和释放ByteBuffer导致的GC开销,Kafka实现了BufferPool(缓冲区池),用于复用ByteBuffer

8.3.1 BufferPool的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
// RecordAccumulator中持有的BufferPool
private final BufferPool free;

// BufferPool的核心字段
public final class BufferPool {
private final long totalMemory; // 总内存上限,即buffer.memory
private final int poolableSize; // 可池化的每个缓冲区大小,即batch.size
private final Deque<ByteBuffer> free; // 空闲的ByteBuffer队列
private final Deque<Condition> waiters; // 等待内存的线程条件队列
private long nonPooledAvailableMemory; // 非池化可用内存(当请求大小超过poolableSize时使用)
private final ReentrantLock lock; // 可重入锁,用于保证所有操作的线程安全。BufferPool 被设计为在多线程环境下安全使用。
// ...
}
  • **poolableSize**:默认等于batch.size(16KB)。小于等于此大小的ByteBuffer会被池化,大于此大小的每次直接分配,用完后归还时不会进入池子(因为大缓冲区复用价值低,且会增加池子管理复杂度)。
  • free队列:存放空闲的、大小为poolableSizeByteBuffer
  • waiters队列:当内存不足时,申请内存的线程会阻塞在Condition上,等待其他线程释放内存后唤醒。
  • **nonPooledAvailableMemory**:除了池化缓冲区之外,还可以使用的非池化内存总量(从totalMemory中减去池化缓冲区占用的内存)。

BufferPool将内存视为三个部分:

  • 已被分配出去的ByteBuffer。它们的大小可能各异,可以是或不是poolableSize
  • 维持在free空闲链表的ByteBuffer(free会维持它们的引用)。每一个的大小都是poolableSize。
  • 申请这个大小的ByteBuffer时,从free中取出即可
  • 归还这个大小的ByteBuffer时,放回free
  • 未分配空闲内存,这块内存在JVM中,是空闲的。用一个数字nonPooledAvailableMemory代表。它的回收是交给gc的。
  • 申请不是poolableSize大小的ByteBuffer时,调用ByteBuffer.allocate(size)
  • 归还不是poolableSize大小的ByteBuffer时,调用者解除对该ByteBuffer的引用,然后nonPooledAvailableMemory增加这个大小即可,其回收交给gc

8.3.2 BufferPool的核心方法:allocate()

当需要创建一个新的ProducerBatch时,RecordAccumulator会调用BufferPool.allocate(int size, long maxTimeToBlock)申请内存。其逻辑如下(简化版):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public ByteBuffer allocate(int size, long maxTimeToBlock) throws InterruptedException {
if (size > this.totalMemory) {
throw new IllegalArgumentException("大小超过总内存");
}
this.lock.lock();
try {
// 情况1:请求大小等于poolableSize且有free缓冲区,直接返回
if (size == poolableSize && !free.isEmpty()) {
return free.pollFirst();
}

// 计算需要从nonPooledAvailableMemory中分配多少
int freeListSize = free.size() * poolableSize;
long availableMemory = freeListSize + nonPooledAvailableMemory;

if (size <= availableMemory) {
// 内存充足,尝试从free中释放一些非池化内存,或者直接分配
freeUp(size); // 如果free中有缓冲区但大小不匹配,可能会将其释放为nonPooledAvailableMemory
nonPooledAvailableMemory -= size;
} else {
// 内存不足,需要阻塞等待
int accumulated = 0;
long startTime = time.nanoseconds();
Condition moreMemory = lock.newCondition();
waiters.addLast(moreMemory);
while (accumulated < size) {
long remaining = maxTimeToBlock - (time.nanoseconds() - startTime);
if (remaining <= 0) {
waiters.remove(moreMemory);
throw new TimeoutException("超时");
}
moreMemory.await(remaining, TimeUnit.NANOSECONDS);
// 被唤醒后重新检查可用内存
availableMemory = free.size() * poolableSize + nonPooledAvailableMemory;
if (availableMemory >= size) {
// 可以分配了
freeUp(size);
nonPooledAvailableMemory -= size;
accumulated = size;
}
}
waiters.remove(moreMemory);
}
} finally {
lock.unlock();
}

// 分配新的ByteBuffer(堆外或堆内取决于配置)
if (size == poolableSize) {
// 如果是池化大小,通常分配直接内存或堆内存,并记录
return ByteBuffer.allocate(size); // 实际Kafka使用allocateDirect或allocate,根据配置
} else {
return ByteBuffer.allocate(size);
}
}

要点

  • 优先使用池化缓冲区:相同大小直接复用。
  • 内存紧张时阻塞:生产者线程会被挂起,直到其他线程释放内存(例如Sender发送完批次后调用deallocate)或超时。
  • 条件队列:多个等待者排队,释放内存时唤醒第一个等待者(公平)。

8.3.3 内存释放:deallocate()

当批次发送完成(无论成功或失败),其占用的ByteBuffer会归还给BufferPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void deallocate(ByteBuffer buffer, int size) {
lock.lock();
try {
if (size == poolableSize) {
// 池化大小,放入free队列
free.add(buffer);
} else {
// 非池化大小,增加nonPooledAvailableMemory,不保留buffer(让其GC)
nonPooledAvailableMemory += size;
}
// 唤醒一个等待内存的线程
Condition moreMemory = waiters.pollFirst();
if (moreMemory != null)
moreMemory.signal();
} finally {
lock.unlock();
}
}

设计思想

  • 池化缓冲区复用:减少GC和内存分配开销。
  • 内存总量控制:通过buffer.memory硬限制,生产者不会无限制使用内存。
  • 背压机制:当内存不足时,send()方法阻塞,实现自然限流。

8.4 消息追加流程:append()方法深度解析

RecordAccumulator.append()是生产者线程调用的核心方法,负责将一条消息放入合适的批次中。其源码(简化版)如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// 1. 获取或创建该分区的双端队列
Deque<ProducerBatch> dq = getOrCreateDeque(tp);

// 2. 对队列加锁,尝试追加到尾部批次
synchronized (dq) {
ProducerBatch last = dq.peekLast();
if (last != null) {
// 尝试将当前消息追加到尾部批次
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
if (future != null) {
// 追加成功
return new RecordAppendResult(future, dq.size() > 1 || last.isFull(), false);
}
}
}

// 3. 如果没有合适的批次(队列为空或尾部批次已满/不允许追加),需要新建批次
// 计算需要的内存大小(批次头部开销 + 消息大小)
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(...));

// 4. 从BufferPool申请内存(可能阻塞)
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);

// 5. 再次对队列加锁(双检锁模式),因为可能在申请内存期间有其他线程创建了批次
synchronized (dq) {
// 再次检查尾部批次,因为释放锁期间可能被其他线程追加
ProducerBatch last = dq.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
if (future != null) {
// 其他线程已经创建了新批次并成功追加,我们刚申请的buffer就不需要了,归还
free.deallocate(buffer);
return new RecordAppendResult(future, dq.size() > 1 || last.isFull(), false);
}
}

// 6. 创建新的ProducerBatch
MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, baseOffset);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());

// 将当前消息追加到新批次
FutureRecordMetadata future = batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());

// 将批次加入队列尾部
dq.addLast(batch);

// 7. 返回结果,标记是否新批次创建成功以及队列状态
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
}

8.4.1 核心逻辑拆解

  • 步骤2(快速路径):在锁内尝试追加到尾部批次。last.tryAppend()会检查批次是否有足够空间容纳新消息,如果空间不足返回null。这里锁的范围仅限于单个队列,避免长时间持有锁。
  • 步骤4(内存申请):在无锁状态下申请内存,可能阻塞。这避免了在持锁期间等待内存,提高了并发度。
  • 步骤5(双检锁):重新获取锁,再次检查尾部批次,防止在申请内存期间其他线程已经创建了新批次并追加成功。如果发现已有批次可用,则归还刚申请的ByteBuffer,避免内存浪费。
  • 步骤6(新建批次):使用申请的ByteBuffer构建MemoryRecordsBuilderProducerBatch,然后将消息追加进去。注意,新建的批次刚创建时只有一条消息,可能未满。
  • 返回结果RecordAppendResult包含:
    • future:用于异步等待结果。
    • batchIsFull:当前批次是否已满(可用于决定是否唤醒Sender)。
    • newBatchCreated:是否新建了批次(同样可用于唤醒Sender)。

4.2.2 ProducerBatch.tryAppend()内部

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
// 检查是否有足够空间
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
return null;
}
// 追加消息到MemoryRecordsBuilder
long offset = recordsBuilder.append(timestamp, key, value, headers);
// 创建Future并记录回调
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, offset, timestamp, now);
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}

MemoryRecordsBuilder负责将消息按照Kafka协议格式写入ByteBuffer。当批次写满或关闭时,recordsBuilder会构建出最终的MemoryRecords(包含CRC、消息集合等)。

8.5 批次管理:ProducerBatch的内部结构

ProducerBatchRecordAccumulator中存储消息的基本单位,其核心字段:

1
2
3
4
5
6
7
8
9
10
11
12
public final class ProducerBatch {
private final TopicPartition topicPartition;
private final MemoryRecordsBuilder recordsBuilder; // 负责构建消息集
private final long createdMs; // 批次创建时间(用于计算linger)
private final List<Thunk> thunks; // 挂起的回调(每条消息对应一个Thunk)
private int recordCount; // 当前批次包含的消息数量
private int maxRecordSize; // 最大消息大小(用于判断是否超限)
private long lastAttemptMs; // 最后尝试发送时间(用于重试)
private boolean retry; // 是否为重试批次
private final ProduceRequestResult produceFuture; // 用于通知结果
// ...
}

关键点

  • **recordsBuilder**:持有ByteBuffer,不断追加消息。
  • thunks列表:每条消息对应的回调(用户提供的Callback)和FutureRecordMetadata的引用。当批次完成(成功或失败)时,遍历thunks触发回调并完成Future
  • **produceFuture**:一个内部类,实现了Future<RecordMetadata>,但实际是复合的future,等待批次中所有消息的元数据。

批次的状态

  • 未满:可以继续追加消息。
  • 已满recordsBuilder已无空间(由hasRoomFor判断),或者主动关闭(例如强制发送)。
  • 关闭(closed):不再接受新消息,等待发送或已发送。

8.6 准备发送:accumulator.ready()方法

Sender线程定期调用accumulator.ready(Cluster cluster, long now)来获取当前哪些分区已经准备好可以发送了。该方法返回ReadyCheckResult,包含一组准备好发送的节点(Set<Node>)以及下次检查的等待时间。

8.6.1 判断准备发送的条件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;

// 检查所有分区的队列
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
TopicPartition tp = entry.getKey();
Deque<ProducerBatch> dq = entry.getValue();

// 检查队列头部批次(最老的批次)
synchronized (dq) {
ProducerBatch batch = dq.peekFirst();
if (batch != null) {
long waitedTimeMs = nowMs - batch.lastAttemptMs; // 上次尝试发送到现在的时间
boolean backingOff = batch.inRetry() && waitedTimeMs >= retryBackoffMs;
boolean full = batch.isFull(); // 批次已满
boolean expired = waitedTimeMs >= lingerMs && !batch.isClosed(); // linger超时且未关闭
boolean sendable = full || expired || (batch.recordsBuilder().isClosed() && !backingOff);

if (sendable && !backingOff) {
// 找到该分区的leader节点
Node leader = cluster.leaderFor(tp);
if (leader != null) {
readyNodes.add(leader);
}
} else {
// 如果没准备好,计算还需要等待多久
long timeLeftMs = Math.max(lingerMs - waitedTimeMs, 0);
nextReadyCheckDelayMs = Math.min(nextReadyCheckDelayMs, timeLeftMs);
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
}

准备发送的条件sendable):

  1. 批次已满batch.isFull()
  2. 等待超时waitedTimeMs >= lingerMs,且批次未关闭。这实现了linger.ms延迟发送的语义。
  3. 批次已强制关闭:例如生产者关闭时。
  4. 不在退避期:如果批次发送失败需要重试,会进入退避状态(backingOff),此时即使时间到了也不发送,直到退避结束。

8.6.2 为什么检查头部批次

队列是FIFO的,头部批次是最早创建的,也是应该最先发送的。如果头部批次还没准备好(例如linger.ms未到),那么队列中后面的批次即使准备好了也不会发送,因为要保持顺序(同一分区内消息必须有序发送)。这是为了保证分区内的顺序性。

8.7 数据拉取:accumulator.drain()方法

​ 一旦确定了哪些节点有数据可发,Sender线程会调用accumulator.drain(Cluster cluster, Set<Node> nodes, int maxSize, long nowMs)从累加器中拉取待发送的批次,并按节点分组返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long nowMs) {
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();

for (Node node : nodes) {
List<ProducerBatch> readyBatches = new ArrayList<>();
// 获取该节点上所有分区的leader副本
List<PartitionInfo> partitions = cluster.partitionsForNode(node.id());

// 为了保持顺序,我们需要从每个分区的头部批次开始取,但受限于maxSize(请求大小上限)
long remaining = maxSize;
for (PartitionInfo part : partitions) {
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
Deque<ProducerBatch> dq = getDeque(tp);
if (dq == null) continue;

synchronized (dq) {
ProducerBatch batch = dq.peekFirst();
if (batch != null) {
// 检查batch是否应该被发送(再次确认,因为ready之后可能状态变化)
boolean backingOff = batch.inRetry() && (nowMs - batch.lastAttemptMs) < retryBackoffMs;
if (!backingOff) {
// 估算该batch的大小(近似)
long batchSize = batch.estimatedSizeInBytes();
if (batchSize <= remaining) {
// 从队列头部移除batch
dq.pollFirst();
// 可能还有更多batch在头部?但为了简单,每次只取一个,因为同一分区队列头部未发送前不能取后面的
readyBatches.add(batch);
remaining -= batchSize;
}
}
}
}
}
if (!readyBatches.isEmpty()) {
batches.put(node.id(), readyBatches);
}
}
return batches;
}

drain的关键点

  • 按节点分组:为了批量发送给同一个broker,减少网络连接开销。
  • 请求大小限制maxSize对应max.request.size,避免单个请求过大。
  • 每个分区只取头部批次:确保分区内的消息顺序发送。
  • 幂等性/事务特殊处理:如果启用了幂等性或事务,drain方法还涉及ProducerId和序列号的分配,保证每个批次有唯一的序列号范围。

九、Sender线程

​ 在Kafka生产者的架构中,Sender线程是连接业务线程与网络层的核心枢纽。它负责从RecordAccumulator中拉取准备好的消息批次,通过NetworkClient发送给Broker,并处理响应与回调。理解Sender线程的工作机制,是掌握Kafka生产者高性能、可靠性和顺序性保证的关键。

9.1 Sender线程概述

Sender是一个实现了Runnable接口的类,在KafkaProducer构造函数中被实例化并作为后台线程启动:

1
2
3
4
5
6
7
8
9
// KafkaProducer.java
this.sender = new Sender(logContext,
client,
this.metadata,
this.accumulator,
... // 省略若干参数
);
this.senderThread = new KafkaThread(ioThreadName, this.sender, true);
this.senderThread.start();
  • 线程名称kafka-producer-network-thread | <clientId>,便于问题定位。
  • 生命周期:随生产者启动而运行,随生产者关闭而停止。
  • 核心职责
    • 轮询RecordAccumulator,获取已准备好发送的批次。
    • 将这些批次按照目标Broker分组,构建ProduceRequest
    • 通过NetworkClient执行网络I/O,发送请求并接收响应。
    • 处理发送结果(成功/失败),触发用户回调,释放内存。
    • 管理元数据更新、重试退避等。

9.2 Sender主循环:run()与runOnce()

9.2.1 run()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Sender.java
public void run() {
log.debug("Starting Kafka producer I/O thread.");

// 主循环,直到生产者关闭
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in producer I/O thread: ", e);
}
}

// 线程退出前的清理
while (!forceClose && this.accumulator.hasUndrained()) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in producer I/O thread: ", e);
}
}

// 关闭NetworkClient
this.client.close();
log.debug("Shutdown of producer I/O thread completed.");
}
  • 正常运行时:无限循环调用runOnce(),每次执行一轮发送流程。
  • 关闭时的额外处理:如果生产者关闭但累加器中仍有未发送的数据(hasUndrained()),且不是强制关闭(forceClose为false),会继续尝试发送,直到耗尽数据或超时。

9.2.2 runOnce()核心逻辑

1
2
3
4
5
void runOnce() {
long currentTimeMs = time.milliseconds();
long pollTimeout = sendProducerData(currentTimeMs); // 核心:准备并发送数据
client.poll(pollTimeout, currentTimeMs); // 执行网络I/O,处理响应
}

runOnce()分为两大步:

  • **sendProducerData**:从累加器获取数据,构建请求,并将请求放入NetworkClient的发送缓冲区。
  • **client.poll**:执行实际的网络I/O(发送请求、接收响应),并触发回调。

9.3 步骤一:获取准备发送的节点(accumulator.ready)

sendProducerData首先调用accumulator.ready(),确定哪些Broker节点上有可发送的数据。

1
2
// Sender.java sendProducerData方法
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

9.3.1 ready()方法源码

RecordAccumulator.ready()遍历所有分区的队列,检查每个队列头部的批次是否满足发送条件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;

for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
TopicPartition tp = entry.getKey();
Deque<ProducerBatch> dq = entry.getValue();

synchronized (dq) {
ProducerBatch batch = dq.peekFirst();
if (batch != null) {
long waitedTimeMs = nowMs - batch.lastAttemptMs;
boolean backingOff = batch.inRetry() && waitedTimeMs < retryBackoffMs;
boolean full = batch.isFull();
boolean expired = waitedTimeMs >= lingerMs && !batch.isClosed();
boolean sendable = full || expired || (batch.recordsBuilder().isClosed() && !backingOff);

if (sendable && !backingOff) {
Node leader = cluster.leaderFor(tp);
if (leader != null) {
readyNodes.add(leader);
}
} else {
long timeLeftMs = Math.max(lingerMs - waitedTimeMs, 0);
nextReadyCheckDelayMs = Math.min(nextReadyCheckDelayMs, timeLeftMs);
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
}

判断条件详解

  • 批次已满(full):批次达到了batch.size,应立即发送,避免等待浪费吞吐。
  • 超时(expired):批次等待时间超过linger.ms,即使未满也触发发送,满足低延迟需求。
  • 批次已关闭(closed):例如生产者正在关闭,需要发送剩余数据。
  • 退避(backingOff):如果批次之前发送失败正在重试退避期(retryBackoffMs内),则暂时不发送。
  • leader未知:如果分区的leader副本不存在(如主题正在创建中),则标记为未知leader,后续可能需要唤醒元数据更新。

为什么只检查头部批次?
每个分区的队列是FIFO的,头部批次是最早加入的。只有头部批次被发送后,后面的批次才能被处理。这保证了分区内的消息顺序(如果未开启幂等性且max.in.flight.requests.per.connection>1,仍可能出现乱序,但Kafka通过其他机制保证)。头部批次的状态决定了该分区当前是否可发送。

返回值

  • readyNodes:当前有数据可发送的Broker节点集合。
  • nextReadyCheckDelayMs:下一次检查时,还需要等待的最短时间(用于设置poll的超时)。

9.4 步骤二:从累加器拉取批次(accumulator.drain)

得到readyNodes后,Sender调用accumulator.drain(),从这些节点对应的分区中拉取实际要发送的批次。

1
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);

9.4.1 drain()方法源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long nowMs) {
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();

for (Node node : nodes) {
List<ProducerBatch> readyBatches = new ArrayList<>();
List<PartitionInfo> partitions = cluster.partitionsForNode(node.id()); // 获取该节点上所有分区

long remaining = maxSize;
for (PartitionInfo part : partitions) {
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
Deque<ProducerBatch> dq = getDeque(tp);
if (dq == null) continue;

synchronized (dq) {
ProducerBatch batch = dq.peekFirst();
if (batch != null) {
// 再次检查退避(因为可能在ready后进入退避)
boolean backingOff = batch.inRetry() && (nowMs - batch.lastAttemptMs) < retryBackoffMs;
if (!backingOff) {
long batchSize = batch.estimatedSizeInBytes();
if (batchSize <= remaining) {
// 从队列头部移除
dq.pollFirst();
readyBatches.add(batch);
remaining -= batchSize;
}
}
}
}
}
if (!readyBatches.isEmpty()) {
batches.put(node.id(), readyBatches);
}
}
return batches;
}

流程:

  • 节点遍历:对每个有数据可发的节点,收集其所有分区(leader位于该节点)。
  • 分区遍历:按分区顺序处理(实际上是无序的,但每个分区内部只取头部)。
  • 请求大小限制:累积的批次总大小不能超过max.request.size,防止单个请求过大。
  • 退避重检:再次检查退避状态,因为从readydrain之间可能有微小的时间差。
  • 移除批次:一旦批次被取出,就从队列中移除,所有权转移给Sender。如果发送成功或失败,后续会通过回调释放资源。

注意事项

  • 每个分区最多只取出一个批次(头部),因为只有头部批次发送后,下一个批次才能成为新的头部。这确保了分区内顺序发送。
  • 如果分区较多,drain可能会返回多个批次,这些批次将合并到一个ProduceRequest中发送给同一个Broker。

9.5 步骤三:发送ProduceRequest

获取批次列表后,Sender需要为每个节点构建ClientRequest并发送。这通过sendProduceRequests方法完成。

9.5.1 sendProduceRequests

1
2
3
4
5
6
7
8
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> batches, long now) {
for (Map.Entry<Integer, List<ProducerBatch>> entry : batches.entrySet()) {
int nodeId = entry.getKey();
List<ProducerBatch> batchList = entry.getValue();
// 为每个节点发送请求
sendProduceRequest(nodeId, batchList, now);
}
}

9.5.2 sendProduceRequest:构建并发送请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void sendProduceRequest(int nodeId, List<ProducerBatch> batches, long now) {
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>();
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
// 将批次转换为MemoryRecords(已完成构建)
MemoryRecords records = batch.records();
produceRecordsByPartition.put(tp, records);
}

// 创建ProduceRequest.Builder
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(batches.get(0).magic,
acks, timeout, produceRecordsByPartition, transactionalId);

// 回调:请求完成后(收到响应或超时)的处理
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, batches, now); // 处理响应
}
};

// 构建ClientRequest并发送到NetworkClient
ClientRequest clientRequest = client.newClientRequest(Integer.toString(nodeId), requestBuilder,
now, true, callback);
client.send(clientRequest, now);
}

关键点

  • 构建请求体:将多个分区的批次打包成一个ProduceRequestMemoryRecords是已经序列化并压缩好的消息集合。
  • 回调处理RequestCompletionHandler在请求完成时被调用,由NetworkClient触发。
  • 发送:通过client.send将请求加入NetworkClient的发送队列,实际网络发送在client.poll中执行。

9.6 步骤四:处理网络响应(client.poll)

sendProducerData返回后,runOnce调用client.poll(pollTimeout, now)poll方法是网络层的核心,负责:

  • 发送已排队的请求。
  • 接收已完成的响应。
  • 处理连接、断开、超时等事件。

9.6.1 NetworkClient.poll() 简要流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// NetworkClient.java
public List<ClientResponse> poll(long timeout, long now) {
// 1. 更新元数据(如果有必要)
maybeUpdateMetadata(now);

// 2. 处理已完成发送的请求(从Selector获取响应)
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, now);
handleCompletedReceives(responses, now);
handleDisconnections(responses, now);
handleConnections(responses, now);
handleTimoutRequests(responses, now);

// 3. 实际调用Selector的poll
this.selector.poll(Utils.min(timeout, nextTimeToUpdateMs, ...));

return responses;
}

9.6.2 处理Produce响应:handleProduceResponse

Sender.handleProduceResponse负责解析Broker返回的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void handleProduceResponse(ClientResponse response, List<ProducerBatch> batches, long now) {
if (response.wasDisconnected()) {
// 连接断开,所有批次都需要重试
for (ProducerBatch batch : batches) {
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), now);
}
} else if (response.versionMismatch() != null) {
// 版本不匹配,视为不可重试错误
failBatch(batches, response.versionMismatch());
} else if (response.hasResponse()) {
// 正常响应
ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
for (ProducerBatch batch : batches) {
// 根据分区解析结果
TopicPartition tp = batch.topicPartition;
ProduceResponse.PartitionResponse partResp = produceResponse.responses().get(tp);
completeBatch(batch, partResp, now);
}
} else {
// 没有响应(如请求超时),重试
for (ProducerBatch batch : batches) {
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.REQUEST_TIMED_OUT), now);
}
}
}

9.6.3 completeBatch:完成批次

completeBatch是处理单个批次发送结果的核心方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long now) {
Errors error = response.error;
if (error == Errors.NONE) {
// 成功:记录元数据,触发回调,释放内存
batch.done(response.baseOffset, response.logAppendTime, null);
this.accumulator.deallocate(batch);
} else if (canRetry(batch, error)) {
// 可重试错误:重新入队
this.accumulator.reenqueue(batch, now);
} else {
// 不可重试错误:标记失败
batch.done(null, null, error.exception());
this.accumulator.deallocate(batch);
}
}

重试判断canRetry检查错误是否可重试(如LEADER_NOT_AVAILABLENETWORK_EXCEPTION等)以及重试次数是否未耗尽(retries配置)。如果可重试,调用accumulator.reenqueue将批次放回队列头部,等待下次发送。

reenqueue的作用:将批次重新放回对应分区队列的头部,并重置lastAttemptMs,进入退避状态。这样,该批次会再次成为头部,在退避期结束后被重新发送。

9.7 顺序性保证与流量控制

9.7.1 分区内顺序

Kafka保证单个分区内消息的顺序,前提是:

  • 幂等性关闭:如果max.in.flight.requests.per.connection=1,则每次只能有一个未确认的请求,自然保证顺序。
  • 幂等性开启:即使max.in.flight.requests.per.connection>1(最大为5),Kafka通过序列号机制保证顺序和去重。Sender在发送批次时会分配递增的序列号,Broker端校验序列号连续性,乱序的请求会被拒绝。

Sender中,顺序性主要通过drain的“每次只取头部批次”以及幂等性下的序列号分配来保障。

9.7.2 流量控制:max.in.flight.requests.per.connection

该参数限制每个Broker连接上未确认请求的最大数量。在Sender中,当构建请求时,NetworkClient会检查该连接当前是否已达到上限,如果达到,后续请求将排队,不发送。这间接限制了drain出来的批次被立即发送的能力。

1
2
3
4
5
6
7
8
9
// NetworkClient.java
private void send(ClientRequest request, long now) {
String nodeId = request.destination();
if (connectionStates.canSendRequests(nodeId) && inFlightRequests.canSendMore(nodeId)) {
doSend(request, now);
} else {
this.pendingCompletion.complete(response);
}
}

9.8 幂等性与事务的支持

9.8.1 幂等性

enable.idempotence=true时,Sender在发送请求前需要为每个批次分配ProducerId和序列号。这些信息由TransactionManager管理。

1
2
3
4
5
6
7
8
9
10
// Sender.sendProduceRequest
if (transactionManager != null && transactionManager.isIdempotent()) {
// 设置PID、epoch和序列号到请求Builder中
requestBuilder.setProducerId(transactionManager.producerId());
requestBuilder.setProducerEpoch(transactionManager.producerEpoch());
// 每个分区第一个批次的序列号
for (ProducerBatch batch : batches) {
requestBuilder.setBaseSequence(tp, batch.baseSequence());
}
}
  • baseSequence:该批次中第一条消息的序列号。
  • 序列号在RecordAccumulator追加消息时由TransactionManager分配,每个分区独立递增。

9.8.2 事务

​ 事务生产者涉及TransactionManagerTransactionCoordinator的交互。Sender在事务开启、提交、中止阶段会发送特殊的EndTxnRequest。此外,事务内的普通ProduceRequest会带上transactionalId,Broker端会进行事务验证。

​ 在Sender.runOnce中,如果事务状态需要(如准备提交),也会调用transactionManager相关方法。

9.10 Sender线程与主线程的协作

  • 唤醒机制:当主线程通过accumulator.append新建批次或批次变满时,会调用sender.wakeup(),中断client.poll的阻塞,使Sender立即检查新数据。
  • 共享数据RecordAccumulator是主线程和Sender线程共享的,通过细粒度锁(队列锁)和并发容器(ConcurrentMap)保证安全。
  • 回调执行SendercompleteBatch中调用batch.done(),该方法遍历thunks列表,依次执行用户提供的Callback。这些回调在Sender线程中执行,因此不应包含耗时操作,以免阻塞I/O。

10 总结

经过对Kafka生产者源码的逐层剖析,我们从KafkaProducer的构造到Sender线程的循环,深入理解了每个组件的职责与协作方式。


深入理解kafka生产者
https://johnjoyjzw.github.io/2025/08/15/深入理解kafka生产者/
Author
JiangZW
Posted on
August 15, 2025
Licensed under