Kafka生产者
前言
Github:https://github.com/HealerJean
一、生产者描述
生产者的核心问题:
1、是否每个消息都很重要?
2、是否允许丢失一小部分消息?
3、偶尔出现重复消息是否可以接受?
4、是否有严格的延迟和吞吐量要求?

1、生产者发送消息的过程
①、我们从创建一个ProducerRecord 对象开始,ProducerRecord 对象需要包含目标主题和要发送的内容。我们还可以指定键或分区。在发送 ProducerRecord 对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
②、接下来,数据被传给分区器。如果之前在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据 ProducerRecord 对象的键来选择一个分区。选好分区以后,生产者就知道该往哪个主题和 分区发送这条记录了。
③、紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。
④、服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入 失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还 是失败,就返回错误信息。
2、创建一个生产者
private static final String BROKER_LIST="localhost:9092";
private static final String TOPIC="HLJ_TOPIC_JAVA";
private static KafkaProducer<String,String> producer = null;
private static Properties initConfig(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
return properties;
}
static{
Properties configs = initConfig();
producer = new KafkaProducer<>(configs);
}
1)bootstrap.servers
指定
broker的地址清单,表示Kafka集群, 如果集群中有多台物理服务器 ,则服务器地址之间用逗号分。清单里不需要包含所有的
broker地址,生产者会从给定的broker里查找到其他broker的信息。不过建议至少要 提供两个broker的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。
2)key.serializer
broker希望接收到的消息的键和值都是字节数组。 生产者接口允许使用参数化类型,因此可以把Java对象作为键和值发送给 broker。这样的代码具有良好的可读性,不过生 产者需要知道如何把这些 Java 对象转换成字节数组。
key.serializer必须被设置为一 个实现了org.apache.kafka.common.serialization.Serializer接口的类. 生产者会使 用这个类把键对象序列化成字节数组。如果你只使用常见的几种 Java 对象类型,那么就没必要实现自己的序列化器注意:
key. serializer是必须设置的,就算你打算只发送值内容。
3)value.serializer
与
key.serializer一样,value.serializer指定的类会将值序列化。
4)acks
acks参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。 这个参数对消息丢失的可能性有重要影响。该参数有如下选项。
| 配置值 | 含义 | 数据可靠性 | 延迟 | 是否支持重试 |
|---|---|---|---|---|
0 |
不等待任何确认,发完即认为成功 | 最低(可能丢失) | 最低 | 否 |
1 |
Leader 写入本地 log 即返回成功 |
中等(Leader 宕机且未同步到 Follower 会丢) |
中等 | 是 |
all |
所有 ISR 中的副本 都写入成功 |
最高(配合 min.insync.replicas ≥ 2) |
最高 | 是 |
| 参数 | 作用 |
|---|---|
acks=all |
必须等所有 ISR 中的副本 都确认收到消息,才认为写入成功 |
min.insync.replicas |
一个分区的 ISR 列表至少要有 N 个副本,否则拒绝写入(即使 acks=all) |
a、关于 acks=all 是否能保证 不丢消息?
答案:不能 100% 保证,但可以做到“极高可靠性”——前提是配置得当。
情况 1:单副本 Topic
- 如果一个
Partition只有一个副本(即replication.factor=1),那么ISR中只有Leader。 - 即使
acks=all,也只需 Leader 写入成功 → 若 Leader 宕机且磁盘损坏,数据永久丢失。 - 解决方案:
replication.factor ≥ 3(生产环境推荐)
情况 2:ISR 缩减为 1(只剩 Leader)
-
假设原本 3 副本,但两个
Follower因网络或性能问题被踢出 ISR,ISR = [Leader] -
此时
acks=all实际等价于acks=1 -
若
Leader在写入后立即宕机,数据丢失 -
解决方案:设置
min.insync.replicas = 2(或更高)- 当
ISR副本数 < 2 时,Producer会收到NotEnoughReplicasException,拒绝写入,从而避免“看似成功实则危险”的情况
- 当
-
关键组合:只有当 至少 2 个副本成功写入,
Producer才认为成功;若不满足,直接报错,由Producer重试或告警。
acks = all
replication.factor >= 3
min.insync.replicas >= 2
情况 3:Producer 未正确处理异常
Kafka返回错误,如果Producer没有重试机制 或 重试次数不足,消息仍会丢失。- 解决方案:
- 启用
retries = Integer.MAX_VALUE - 设置合理的
retry.backoff.ms(如100ms) - 使用幂等
Producer(enable.idempotence=true)避免重复
- 启用
b、acks=all 会导致重复吗?
答案:会!但这是“可接受的代价”,属于 At-Least-Once 语义 的典型表现。
- 重复场景举例: 分布式系统中“可靠性 vs 精确一次”的经典权衡
Producer发送消息MLeader和部分Follower成功写入,但响应在返回途中丢失(网络分区)Producer超时,触发重试,再次发送 M- 新
Leader(原Follower)接收 M → 消息重复
-
如何解决重复?
-
业务层去重:使用唯一
ID+Redis/DB记录已处理消息 -
启用
Kafka幂等Producer(enable.idempotence=true)- 自动去重(基于
producer id + sequence number) - 保证 单
Partition的Exactly-Once写入
- 自动去重(基于
-
c、ISR 机制是什么
ISR的健康程度直接决定acks=all的实际安全性
ISR是 动态维护 的同步副本集合Follower落后太多(由replica.lag.time.max.ms控制,默认 30s)会被移出 ISR- 只有
ISR中的副本才有资格参与选举新Leader acks=all实际是 “等待ISR中所有副本ACK”
5)buffer.memory(默认32M)
该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。
如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,
send()方法调用要么被阻塞,要么抛出异常,取决于如何设置max.block.ms参数,表示在抛出异常之前可以阻塞一段时间)
6)compression.type(默认none)
默认情况下,消息发送时不会被压缩(
MT: none)。该参数可以设置为
snappy、gzip或lz4,它指定了 消息被发送给broker之前使用哪一种压缩算法进行压缩。使用压缩可以降低网络传输开销和 存储开销,而这往往是向Kafka发送消息的瓶颈所在1、
snappy压缩算法由CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网 络带宽,可以使用这种算法。2、
gzip压缩算法一般会占用较多的 CPU,但会提供更高的压缩 比,所以如果网络带宽比较有限,可以使用这种算法。
7)retries
retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会 放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms,不过可以通过retry.backoff.ms参数来改变这个时间间隔。
1、建议在设置重试次数和重试时间间隔之前, 先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间), 让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。
2、不过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。
3、一般情 况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。 你只需要处理那些不可重试的错误或重试次数超出上限的情况。
8)batch.size
当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指 定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。
当批次被填满, 批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送(达到发送时间)。所以就算把批次大小设置得很大, 也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。
9)linger.ms
该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。
KafkaProducer会在 批次填满或linger.ms达到上限时把批次发送出去。默认情况下,只要有可用的线程,生 产者就会把消息发送出去,就算批次里只有一个消息。把
linger.ms设置成比 0 大的数, 让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延 迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。
10)client.id
该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。
11)max.in.flight.requests.per.connection :顺序保证
- 含义:每个
broker连接上,Producer在未收到响应前最多可以发送多少个“未确认”的请求(即“飞行中”的请求数)。 - 默认值:
5(Kafka 2.x+) - 影响:
- 值越大 → 并发度高 → 吞吐量高,但可能破坏顺序性(在重试时)
- 值为 1 → 串行发送 → 严格保序,但吞吐下降
a、为什么 max.in.flight > 1 + retries > 0 会导致乱序?
| 配置组合 | 是否保序? | 是否防重复? | 说明 |
|---|---|---|---|
max.in.flight=1, retries>0 |
是 | 否(除非业务去重) | 经典保序方案,但可能重复 |
max.in.flight=5, retries>0, 无幂等 |
否 | 否 | 可能乱序 + 重复 |
enable.idempotence=true |
是(单分区) | 是(单分区) | 推荐! 允许 max.in.flight ≤ 5 |
| 跨分区消息 | 不保证 | — | Kafka 只保证分区内有序 |
b、为什么 max.in.flight > 1 且 retries > 0 会导致乱序?
A:因为多个请求并发发送,失败的请求被重试时,可能晚于后续成功的请求写入,导致物理顺序与发送顺序不一致。
假设:
max.in.flight.requests.per.connection = 2retries = 3- 发送两条消息:
M1→M2(同一分区)
执行流程:Broker 中消息顺序为 **M2, M1 —— **顺序颠倒!
Producer发送M1(批次1)- 紧接着发送
M2(批次2)——因为允许 2 个in-flight M1发送失败(如Leader宕机),M2成功写入Producer重试M1M1最终也写入成功
b、如何保证严格顺序?
方案 1:max.in.flight.requests.per.connection = 1
- 每次只发一个请求,等
ACK后再发下一个 - 即使重试,也是按原顺序重试
- 天然保序
- 缺点:吞吐量显著下降(尤其网络延迟高时),
方案 2:启用 幂等 Producer(Idempotent Producer)(Kafka 0.11+)
- 设置:
enable.idempotence = true - 原理:
Kafka引入**Producer ID (PID)+Sequence Number机制,Broker会拒绝旧序列号的消息,从而避免乱序和重复。 - 自动设置: 这是现代
Kafka推荐的做法:既保序,又不牺牲太多吞吐max.in.flight.requests.per.connection ≤ 5(Kafka 2.1+ 放宽到 5)acks = allretries = MAX
- 在
max.in.flight ≤ 5的前提下,仍能保证单分区严格有序 + 无重复
c、PID + Sequence Number 是怎么工作的
| 问题 | 回答 |
|---|---|
Kafka 的 PID + Seq 能识别 1 年前的重复消息吗? |
不能 |
| 为什么不能? | PID 是会话级的,重启就变;Broker 不持久化旧 PID 的序列状态 |
| 幂等机制到底防什么? | 防 单次 Producer 运行期间因重试导致的重复/乱序(通常是秒级) |
| 如何防“历史数据”重复? | 必须靠 *业务唯一 ID + 消费端去重 |
enable.idempotence = true 到底 解决的是什么 |
防网络重试重复,不能解决消息本身重发 |
Kafka 从 0.11 版本 开始引入 幂等生产者(Idempotent Producer),其核心机制是:
- 每个 Producer 启动时,Broker 会分配一个唯一的 Producer ID
(PID`) - 对于该 Producer 发往 每个分区(
Partition) 的消息,都会带一个 单调递增的Sequence Number(从 0 开始) Broker为每个(PID, Partition)维护一个 高水位序列号(Last Sequence)
当 Broker 收到一条消息时,会检查:
- 如果
seq == lastSeq + 1→ 正常接收- 如果
seq <= lastSeq→ 重复消息,直接丢弃- 如果
seq > lastSeq + 1→ 乱序或缺失,抛出异常(Out of Order Sequence)
这样就实现了:同一 Producer 对同一分区的消息,不会重复、不会乱序。
12)timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms
timeout.ms指定了broker等待同步副本返回消息确认的时间,与asks的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么broker就会返回 一个错误。
request.timeout.ms指定了生产者在发送数据时等待服务器返回响应的时间(如果在超时超时之前没有收到响应,客户端将在需要时重新发送请求,或者在重试耗尽时失败请求。此值应大于replica.lag.time.max.ms(分区副本失效时间)(broker配置),以减少由于不必要的生产者重试而导致消息重复的可能性(我的理解:虽然超时,但是有可能已经发送成功了)。)
metadata.fetch.timeout.ms指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器 返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。
13)max.block.ms
该参数指定了在调用
KafkaProducer.send()方法或使用KafkaProducer.partitionsFor()方法获取元数据时生产者的阻塞 时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在 塞时间达到
max.block.ms时,生产者会抛出超时异常。用户提供的序列化器或分区器中的阻塞将不计入此超时时间 。
14) max.request.size
该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。
例如,假设这个值为
1MB,那么可以发送的单个最大消息为1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了1000个消息,每 个消息大小为1KB。另外,
broker对可接收的消息最大值也有自己的限制(message.max. bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。
15)receive.buffer.bytes 和 send.buffer.bytes
这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1, 就使用操作系统的默认值。
如果生产者或消费者与 broker 处于不同的数据中心,那么可以 适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
3、发送消息的方式
| 方式 | 调用方式 | 是否阻塞 | 能否获取结果 | 能否处理异常 | 吞吐量 | 可靠性 | 适用场景 |
|---|---|---|---|---|---|---|---|
| 发送并忘记 | send(record) |
不阻塞 | 不能 | 仅捕获发送前异常 | 最高 | 最低 | 可容忍丢失的数据 |
| 同步发送 | send(record).get() |
阻塞 | 能(RecordMetadata) |
能(捕获所有异常) | 较低 | 高 | 对可靠性要求高、且能接受低吞吐的场景 |
| 异步发送 | send(record, callback) |
不阻塞 | 能(回调) | 能(回调处理 ) | 高 | 高 | 推荐! 兼顾吞吐与可靠性 |
- 要高可靠?→
acks=all+ 必须用同步或异步获取结果 -
要高吞吐?→ 可降低
acks,但仍建议用异步回调监控失败率 acks= 你要求快递公司做到什么程度才算“送达成功”?acks=0:投进邮筒就算成功(不管是否寄出)acks=1:快递员签收就算成功acks=all:收件人 + 备份联系人都签收才算成功
- 发送方式 = 你如何知道快递结果?
- 发送并忘记:寄完就走,不查物流
- 同步发送:站在快递站等到系统显示“已签收”
- 异步发送:留下手机号,快递公司短信通知你结果
1)发送并忘记
优点:
- 极简代码
- 最大吞吐(无等待、无回调开销)
风险:
- 无法知道消息是否真正写入 Kafka
- 即使配置了
retries > 0,如果最终重试失败,你也收不到通知 - 只能捕获客户端本地异常(如序列化失败、缓冲区满),无法捕获 Broker 端错误(如 Leader 不可用、磁盘满)
// 记录用户点击流、系统日志、埋点数据等
producer.send(new ProducerRecord<>("user-clicks", userId, clickEvent));
// 不关心是否成功,丢了也无所谓
2)同步发送
-
优点:
-
强一致性反馈:明确知道成功/失败
-
可获取
offset、timestamp、partition等元数据
-
-
缺点:
-
严重降低吞吐:每条消息都要等网络往返(RTT)
-
容易成为性能瓶颈(尤其高并发时)
-
-
适用场景:
-
极少数对顺序+可靠性有极端要求的场景
-
测试或调试时验证消息是否写入
-
@Test
public void sendSynchronize(){
String key = "Precision_Products";
String value = "sendSynchronize Msg" ;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
try {
// 注释1
RecordMetadata recordMetadata = producer.send(record).get();
log.info("消息发送成功,返回结果【{}】",recordMetadata);
} catch (Exception e) {
// 注释2
log.error("消息发送失败", e);
}
}
// 打印日志
// 2021-02-18 18:44:34 INFO -[ ]- 消息发送成功,返回结果【{"offset":0,"timestamp":1613645074360,"serializedKeySize":18,"serializedValueSize":25,"topicPartition":{"hash":-1133698772,"partition":0,"topic":"HLJ_TOPIC_JAVA"}}】 com.healerjean.proj.kafka.java.Producer.main[68]
// 2021-02-18 18:45:14 INFO -[ ]- 消息发送成功,返回结果【{"offset":1,"timestamp":1613645114891,"serializedKeySize":18,"serializedValueSize":25,"topicPartition":{"hash":-1133698772,"partition":0,"topic":"HLJ_TOPIC_JAVA"}}】 com.healerjean.proj.kafka.java.Producer.main[68]
3)异步发送
-
优点:
-
高吞吐:消息批量发送、后台 I/O 线程处理
-
可靠反馈:通过回调知道每条消息的结果
-
灵活异常处理:可在回调中重试、记录日志、告警等
-
-
关键注意:
-
回调函数在
Kafka内部I/O线程中执行 → 不要做耗时操作(如 DB 写入、复杂计算),否则会阻塞整个 Producer 的发送线程! -
正确做法:在回调中只做轻量操作(如打日志、计数、放入队列由其他线程处理)
-
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 轻量处理:记录错误、增加失败计数器、触发告警
log.error("消息发送失败, key: {}, error: {}", record.key(), exception.getMessage());
metrics.incrementFailedCount();
} else {
log.debug("消息发送成功, offset: {}", metadata.offset());
}
});
二、序列化器
不建议使用序列化器,建议使用
json或其他基本数据类型
三、分区
在之前的例子里,
ProducerRecord对象包含了目标主题、键和值。,ProducerRecord对象可以只包含目标主题和值,键可以设置为默认的null,不过大多数应用程序会用到键。键有两个用途
1、可以作为消息的附加信息,
2、用来决定消息该被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。
1、Kafka 分区的核心逻辑
1)键(Key)的作用
- 语义信息:如用户 ID、订单号,供消费者做业务处理
- 分区路由:相同
key→ 相同partition(保证分区内有序)
2)默认分区器(DefaultPartitioner)行为
| Key 情况 | 分区策略 |
|---|---|
key == null |
轮询(Round Robin) 到所有可用分区(注意:不是随机!Kafka 2.4+ 是 sticky 轮询,避免抖动) |
key != null |
Math.abs(Utils.murmur2(keyBytes)) % numPartitions → 固定映射到某一分区 |
ProducerRecord<Integer, String> record = new ProducerRecord<>("Topic", "key", "value");
ProducerRecord<Integer, String> record = new ProducerRecord<>("Topic", "value");
2、实现自定义分区策略
实现一个用于审计功能的分区策略:假设我们有两类消息,其中一类消息的
key为audit,用于审计,放在最后一个分区中,其他消息在剩下的分区中随机分配。我的理解:实现有些场景下消费者能力很强的情况/重点观察的消费者的消费情况
在 Producer 配置中指定:
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, AuditPartitioner.class.getName());
public class AuditPartitioner implements Partitioner {
private final Random random = new Random();
@Override
public void configure(Map<String, ?> configs) {}
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.availablePartitionsForTopic(topic);
int partitionCount = partitions.size();
if (partitionCount <= 1) {
return 0;
}
// 处理 null key
if (key == null) {
return random.nextInt(partitionCount - 1); // 非审计分区
}
String keyStr = key.toString();
int auditPartition = partitionCount - 1;
if (keyStr.contains("audit")) {
return auditPartition;
} else {
// 随机分配到 [0, partitionCount-2]
return random.nextInt(partitionCount - 1);
}
}
@Override
public void close() {}
}
3、FQA
a、分区数量改变后,key 的映射会变吗?
答案:只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。举个例子,在 分区数量保持不变的情况下,可以保证用户 045189 的记录总是被写到分区 34。在从分区读取数据时,可以进行各种优化。不过,一旦主题增加了新的分区,这些就无法保证 了——旧数据仍然留在分区 34,但新的记录可能被写到其他分区上。
综上所述:
- 分区数增加 →
numPartitions变大 →hash(key) % new_num结果可能不同 - 后果:
- 新消息可能进入新分区
- 旧消息仍在原分区
- 同一个
key的消息分散在多个分区 → 消费者无法保证全局有序!
-
应对策略:
-
初始规划足够分区数(如预估 3~5 年流量,设 32/64/128 分区)
-
避免频繁扩分区
-
若必须扩分区,接受“新旧数据分区不一致”的事实,业务层兼容
-
b、根据 key 映射到的分区不可用,会发生什么?
答案:Producer 会尝试向该分区发送消息,但因 Leader 不可用而失败;随后根据配置进行重试(如 retries > 0),直到成功或达到重试上限。期间不会将消息重定向到其他分区——即使其他分区可用。
最终结果
- 成功:新
Leader上线,消息写入 → 保证key的分区一致性 & 顺序性 - 失败:重试耗尽或长时间无
Leader→ 抛出异常,由应用处理(如记录日志、告警
c、为什么不能自动换分区?
答案:这是 Kafka 刻意设计 的核心原则:“相同 key 的消息必须始终进入同一分区” ——这是实现 分区内有序消费 和 状态聚合(如 Kafka Streams) 的基础。
| 问题 | 回答 |
|---|---|
| 分区不可用时,会发到其他分区吗? | 不会! 必须发到原分区 |
| Producer 会怎么办? | 重试原分区,直到成功或失败 |
| 是否影响可靠性? | 副本健康 + 配置合理(acks=all, min.insync.replicas=2),最终可恢复,不丢数据 |
| 应用需要做什么? | 处理发送异常(尤其在重试耗尽时),但不要手动改 key 或分区逻辑 |
d、如何估算合适的分区数
四、延迟消息的实现原理
延迟队列存储的是对应的延迟消息,所谓“延迟消息”是指当消息被发送之后,并不想让消费者马上拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
基于消息的延迟:指为每条消息设置不一样的延迟时间,那么每当队列中有新消息进入的时候就会重新根据延迟时间排序,固然这也会对性能形成极大的影响。
基于队列的延迟: 设置不一样延迟级别的队列,好比5s、10s、30s、1min、5mins、10mins等,每一个队列中消息的延迟时间都是相同的,这样免去了延迟排序所要承受的性能之苦,经过必定的扫描策略(好比定时)便可投递超时的消息。
1、时间轮
Kafka中存在大量的延迟操作,比如延迟生产、延迟拉取以及延迟删除等。Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(nlog(n)),并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。时间轮的应用并非Kafka独有,其应用场景还有很多,在Netty、Akka、Quartz、Zookeeper等组件中都存在时间轮的踪影。
1)时间轮结构
Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务TimerTask。

基本时间跨度
tickMs:时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间格个数
wheelSize:时间轮的时间格个数是固定的,可用wheelSize来表示总体时间跨度(
interval):可以通过公式tickMs×wheelSize计算得出。时间轮还有一个表盘指针(
currentTime),用来表示时间轮当前所处的时间,currentTime是tickMs的整数倍。currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList的所有任务。
2)具体过程
时间轮状态:若时间轮的
tickMs=1ms,wheelSize=20,那么可以计算得出interval为20ms。初始情况下表盘指针currentTime指向时间格0,任务开始:
1、此时有一个定时为
2ms的任务插入进来会存放到时间格为2的TimerTaskList中。2、随着时间的不断推移,指针
currentTime不断向前推进,过了2ms之后,当到达时间格2时,就需要将时间格2所对应的TimeTaskList中的任务做相应的到期操作。3、此时若又有一个定时为
8ms的任务插入进来,则会存放到时间格10中,currentTime再过8ms后会指向时间格10。4、如果同时有一个定时为
19ms的任务插入进来怎么办?新来的TimerTaskEntry会复用原来的TimerTaskList,所以它会插入到原本已经到期的时间格1中。5、总之,整个时间轮的总体跨度是不变的,随着指针
currentTime的不断推进,当前时间轮所能处理的时间段也在不断后移,总体时间范围在currentTime和currentTime + interval之间。
3)层级时间轮
接上,如果此时有个定时为
350ms的任务该如何处理?直接扩充wheelSize的大小么?Kafka中不乏几万甚至几十万毫秒的定时任务,这个wheelSize的扩充没有底线,就算将所有的定时任务的到期时间都设定一个上限,比如
100万毫秒,那么这个wheelSize为100万毫秒的时间轮不仅占用很大的内存空间,而且效率也会拉低。Kafka为此引入了层级时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。参考上图,复用之前的案例,
⬤ 第一层的时间轮
tickMs=1ms,wheelSize=20,interval=20ms。⬤ 第二层的时间轮的
tickMs为第一层时间轮的interval,即为20ms。⬤ 每一层时间轮的
wheelSize是固定的,都是20,那么第二层的时间轮的总体时间跨度interval为400ms。以此类推,这个400ms也是第三层的tickMs的大小,第三层的时间轮的总体时间跨度为8000ms。

4)具体过程
⬤ 对于之前所说的
350ms的定时任务,显然第一层时间轮不能满足条件,所以就升级到第二层时间轮中,最终被插入到第二层时间轮中时间格17所对应的TimerTaskList中。⬤ 如果此时又有一个定时为
450ms的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入到第三层时间轮中时间格1的TimerTaskList中。⬤ 注意到在到期时间在[
400ms,800ms) 区间的多个任务(比如446ms、455ms以及473ms的定时任务)都会被放入到第三层时间轮的时间格1中,时间 格1对应的TimerTaskList的超时时间为400ms。⬤ 随着时间的流逝,当次
TimerTaskList到期之时,原本定时为450ms的任务还剩下50ms的时间,还不能执行这个任务的到期操作。这里就有一个时间轮降级的操作,会将这个剩余时间为50ms的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为[40ms,60ms)的时间格中。再经历了40ms之后,此时这个任务又被“察觉”到,不过还剩余10ms,还是不能立即执行到期操作。所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为[10ms,11ms)的时间格中,之后再经历10ms后,此任务真正到期,最终执行相应的到期操作。
2、Kafka 时间轮
在
Kafka中第一层时间轮的参数同上面的案例一样:tickMs=1ms,wheelSize=20,interval=20ms,各个层级的wheelSize也固定为20,所以各个层级的tickMs和interval也可以相应的推算出来。Kafka在具体实现时间轮TimingWheel时还有一些小细节:
1、TimingWheel 在创建的时候以当前系统时间为第一层时间轮的起始时间( startMs),这里的当前系统时间并没有简单的调用System.currentTimeMillis(),而是调用了Time.SYSTEM.hiResClockMs,这是因为currentTimeMillis() 方法的时间精度依赖于操作系统的具体实现,有些操作系统下并不能达到毫秒级的精度,而 Time.SYSTEM.hiResClockMs 实质上是采用了 System.nanoTime()/1_000_000来将精度调整到毫秒级。也有其他的某些骚操作可以实现毫秒级的精度,但是笔者并不推荐,System.nanoTime()/1_000_000是最有效的方法。(如对此有想法,可在留言区探讨。)
2、TimingWheel 中的每个双向环形链表 TimerTaskList 都会有一个哨兵节点(sentinel),引入哨兵节点可以简化边界条件。哨兵节点也称为哑元节点(dummy node),它是一个附加的链表节点,该节点作为第一个节点,它的值域中并不存储任何东西,只是为了操作的方便而引入的。如果一个链表有哨兵节点的话,那么线性表的第一个元素应该是链表的第二个节点。
3、除了第一层时间轮,其余高层时间轮的起始时间(startMs)都设置为创建此层时间轮时前面第一轮的currentTime。每一层的currentTime都必须是 tickMs的整数倍,如果不满足则会将 currentTime修剪为tickMs的整数倍(向下取整),以此与时间轮中的时间格的到期时间范围对应起来。修剪方法为:currentTime = startMs - (startMs % tickMs)。currentTime会随着时间推移而推荐,但是不会改变为tickMs的整数倍的既定事实。若某一时刻的时间为timeMs,那么此时时间轮的currentTime = timeMs - (timeMs % tickMs),时间每推进一次,每个层级的时间轮的 currentTime都会依据此公式推进。
4、Kafka 中的定时器只需持有 TimingWheel 的第一层时间轮的引用,并不会直接持有其他高层的时间轮,但是每一层时间轮都会有一个引用(overflowWheel)指向更高一层的应用,以此层级调用而可以实现定时器间接持有各个层级时间轮的引用。
3、Kafka中到底是怎么推进时间
从动画中可以注意到, 随着时间推进, 时间轮的指针循环往复地定格在每一个时间格上, 每一次都要判断当前定格的时间格里是不是有任务存在,其中有很多时间格都是没有任务的, 指针定格在这种空的时间格中, 就是一次”空推进”,比如说, 插入一个延时时间400s的任务, 指针就要执行399次”空推进”,无故空耗机器的性能资源, 这是一种浪费!
那么Kafka是怎么解决这个问题的呢?这就要从延迟队列DelayQueue开始讲起了!时间轮搭配延迟队列DelayQueue,(Timer、DelayQueue 和 ScheduledThreadPool,它们都是基于优先队列实现的,O(logn) 的时间复杂度在任务数多的情况下频繁的入队出队对性能来说有损耗。因此适合于任务数不多的情况)
答案:这里采用
DelayQueue来辅助以少量空间换时间,从而做到了“精准推进”。Kafka中的定时器真可谓是“知人善用”,用TimingWheel做最擅长的任务添加和删除操作,而用DelayQueue做最擅长的时间推进工作,相辅相成。
1、而对于时间轮来说,它只需要往delayQueue里面扔各种槽即可,比如我们的定时任务长短不一
2、而这个槽到期后,也就是被我们从delayQueue 中 poll 出来后,我们只需要将槽中的所有任务循环一次,重新加到新的槽中(添加失败则直接执行)即可。
3、Bucket 的设计让多个任务“合并”,使得同一个 bucket 的多次插入只需要在 delayQueue 中入队一次,同时减少了 delayQueue 中元素数量,堆的深度也减小,delayqueue 的插入和弹出操作开销也更小



