Kafka生产者
前言
Github:https://github.com/HealerJean
1、生产者描述
生产者的核心问题:
1、是否每个消息都很重要?
2、是否允许丢失一小部分消息?
3、偶尔出现重复消息是否可以接受?
4、是否有严格的延迟和吞吐量要求?
1.1、生产者发送消息的过程
①、我们从创建一个ProducerRecord
对象开始,ProducerRecord
对象需要包含目标主题和要发送的内容。我们还可以指定键或分区。在发送 ProducerRecord
对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
②、接下来,数据被传给分区器。如果之前在 ProducerRecord
对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据 ProducerRecord
对象的键来选择一个分区。选好分区以后,生产者就知道该往哪个主题和 分区发送这条记录了。
③、紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker
上。
④、服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka
,就返回一个 RecordMetaData
对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入 失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还 是失败,就返回错误信息。
2、创建一个生产者
2.1、生产者属性设置
private static final String BROKER_LIST="localhost:9092";
private static final String TOPIC="HLJ_TOPIC_JAVA";
private static KafkaProducer<String,String> producer = null;
private static Properties initConfig(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
return properties;
}
static{
Properties configs = initConfig();
producer = new KafkaProducer<>(configs);
}
2.1.1、bootstrap.servers
指定
broker
的地址清单,表示Kafka
集群, 如果集群中有多台物理服务器 ,则服务器地址之间用逗号分。清单里不需要包含所有的
broker
地址,生产者会从给定的broker
里查找到其他broker
的信息。不过建议至少要 提供两个broker
的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。
2.1.2、key.serializer
broker
希望接收到的消息的键和值都是字节数组。 生产者接口允许使用参数化类型,因此可以把Java
对象作为键和值发送给 broker。这样的代码具有良好的可读性,不过生 产者需要知道如何把这些 Java 对象转换成字节数组。
key.serializer
必须被设置为一 个实现了org.apache.kafka.common.serialization.Serializer
接口的类. 生产者会使 用这个类把键对象序列化成字节数组。如果你只使用常见的几种 Java 对象类型,那么就没必要实现自己的序列化器注意:
key. serializer
是必须设置的,就算你打算只发送值内容。
2.1.3、value.serializer
与
key.serializer
一样,value.serializer
指定的类会将值序列化。
2.1.4、acks
acks
参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。 这个参数对消息丢失的可能性有重要影响。该参数有如下选项。
2.1.4.1、acks=0
即无需确认,只要生产者把消息发出去即认为已成功把消息写入kafka
① KafkaProducer
意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入 Kafka,不需要等待任何确认收到的消息,没有任何保障可以保证此种情况下server
已经成功接收到数据,同时重试配置也不会发生作用(因为KafkaProducer
并不知道此次发送是否失败)。
② 该情况,当数据已经发送出去,还在半路,此时leader
挂了,但是producer
还是认为消息发送成功了,这个时候就会导致这条消息丢失;
③ 数据可靠性是最低的,传输效率也是最高的,配合发送方式为生成并忘记效率很高
2.1.4.2、acks=1
(默认)
只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功 响应
① KafkaProducer
把消息发送出去,至少要等待leader
已经成功将数据写入本地log
,但是并没有等待所有follower
是否成功写入。该情况下,如果follower
没有成功备份数据,而此时leader
响应成功后,刚好又挂掉了,就会导致消息丢失
② KafkaProducer
把消息发送出去,如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来), 生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。
2.1.4.3、acks=all
只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自 服务器的成功响应。这种模式是最安全的 ,不过,它的 延迟比
acks=1
时更高,因为我们要等待不只一个服务器节点接收消息。
ISR
1、
Partition Leader
会维护一个与其基本保持同步的Replica
列表,该列表称为ISR
(in-sync Replica
),每个Partition
都会有一个ISR
,而且是由leader
动态维护2、如果一个
flower
比一个leader
落后太多,或者超过一定时间未发起数据复制请求,则leader
将其重ISR
中移除3、当
ISR
中所有Replica
都向Leader发
送ACK
时,leader
才commit
acks=all
,这个意思就是说,Partition Leader
接收到消息之后,还必须要求ISR
列表里跟Leader
保持同步的那些Follower
都要把消息同步过去,才能认为这条消息是写入成功了。
@:如果说Partition Leader
刚接收到了消息,但是结果Follower
没有收到消息,此时Leader
宕机了,那么客户端会感知到这个消息没发送成功,他会重试再次发送消息过去。
@:此时可能Partition 2
的Follower
变成Leader
了,此时ISR
列表里只有最新的这个Follower
转变成的Leader
了,那么只要这个新的Leader
接收消息就算成功了。如果ISR
列表不仅仅只是这个新的leader
,则需要ISR
列表所有的follower
都成功同步消息,才算发送成功
@:还有一种情况,比如3副本,producer
发送消息过后,leader
成功接受到消息,follower1
也成功同步了leader
的数据,但是follower2
还未同步leader
数据,而此时leader
挂了,follower1
变成了新的leader
,producer
会重新发送消息,此时就会存在重复数据,如果是follower2
变成了新的leader
,就不会存在重复数据
问题:
1、ack=all
可能出现消息重复
答案:如上
2、acks=all
就代表数据一定不会丢失吗?
答案:不是
1、单机:如果partition
只有一个副本,也就是只有一个leader
,没有其他follower
,那么虽然他已经完成接受消息,但是接受消息过后宕机了,一样会导致数据丢失。
2、集群:假设现在往 Kafka
发送消息,分区的首领刚好崩溃,新的首领 正在选举当中,Kafka
会向生产者返回“首领不可用”的响应。在这个时候,如果生产者没能正确处理这个错误,也没有重试发送消息直到发送成功,那么消息也有可能丢失(重试次数/时间设置错误)。
①:所以acks=all
,必须跟 ISR
列表离至少有2
个以上的副本配合使用,也就是起码有一个leader
和一个follower
才行,这样才能保证发送一条消息,一定是2
个以上的副本都同步到数据才算是成功,此时任何一个副本宕机,都不会导致数据的丢失
②:要提高数据的可靠性,在设置acks=-1/all
的同时,也要设置min.insync.replicas
(默认值是1,可以在broker
或者topic
层面进行设置)这个参数的配合;min.insync.replicas
设置的是ISR
中的最小副本数是多少,当且仅当acks=all
时,这个参数才会生效。
③:重试次数和重试时间间隔要控制好。
2.1.5、buffer.memory
(默认32M)
该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。
如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,
send()
方法调用要么被阻塞,要么抛出异常,取决于如何设置max.block.ms
参数,表示在抛出异常之前可以阻塞一段时间)
2.1.6、compression.type
(默认none
)
默认情况下,消息发送时不会被压缩(
MT: none
)。该参数可以设置为
snappy
、gzip
或lz4
,它指定了 消息被发送给broker
之前使用哪一种压缩算法进行压缩。使用压缩可以降低网络传输开销和 存储开销,而这往往是向Kafka
发送消息的瓶颈所在1、
snappy
压缩算法由CPU
,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网 络带宽,可以使用这种算法。2、
gzip
压缩算法一般会占用较多的 CPU,但会提供更高的压缩 比,所以如果网络带宽比较有限,可以使用这种算法。
2.1.7、retries
retries
参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会 放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms
,不过可以通过retry.backoff.ms
参数来改变这个时间间隔。1、建议在设置重试次数和重试时间间隔之前, 先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间), 让总的重试时间比
Kafka
集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。2、不过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。
3、一般情 况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。 你只需要处理那些不可重试的错误或重试次数超出上限的情况。
2.1.8、batch.size
当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指 定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。
当批次被填满, 批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送(达到发送时间)。所以就算把批次大小设置得很大, 也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。
2.1.9、linger.ms
该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。
KafkaProducer
会在 批次填满或linger.ms
达到上限时把批次发送出去。默认情况下,只要有可用的线程,生 产者就会把消息发送出去,就算批次里只有一个消息。把
linger.ms
设置成比 0 大的数, 让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延 迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。
2.1.10、client.id
该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。
2.1.12、max.in.flight.requests.per.connection
该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。
把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
2.1.12.1、顺序保证
Kafka
可以保证同一个分区里的消息是有序的。也就是说,如果生产者按照 一定的顺序发送消息,broker
就会按照这个顺序把它们写入分区,消费者也 会按照同样的顺序读取它们。在某些情况下,顺序是非常重要的。例如,往 一个账户存入 100 元再取出来,这个与先取钱再存钱是截然不同的!不过, 有些场景对顺序不是很敏感。
如果把 retries
设为非零整数,同时把 max.in.flight.requests.per.connection
设为比 1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入 成功,broker
会重试写入第一个批次。如果此时第一个批次也写入成功,那 么两个批次的顺序就反过来了。
一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是 很关键的,所以不建议把 retries
设为 0。可以把 max.in.flight.requests. per.connection
设为 1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给 broker
。不过这样会严重影响生产者的吞吐量,所以只有在 对消息的顺序有严格要求的情况下才能这么做。
2.1.13、timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms
timeout.ms
指定了broker
等待同步副本返回消息确认的时间,与asks
的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么broker
就会返回 一个错误。
request.timeout.ms
指定了生产者在发送数据时等待服务器返回响应的时间(如果在超时超时之前没有收到响应,客户端将在需要时重新发送请求,或者在重试耗尽时失败请求。此值应大于replica.lag.time.max.ms
(分区副本失效时间)(broker
配置),以减少由于不必要的生产者重试而导致消息重复的可能性(我的理解:虽然超时,但是有可能已经发送成功了)。)
metadata.fetch.timeout.ms
指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器 返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。
2.1.14、max.block.ms
该参数指定了在调用
KafkaProducer.send()
方法或使用KafkaProducer.partitionsFor()
方法获取元数据时生产者的阻塞 时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在 塞时间达到
max.block.ms
时,生产者会抛出超时异常。用户提供的序列化器或分区器中的阻塞将不计入此超时时间 。
2.1.15、 max.request.size
该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。
例如,假设这个值为
1MB
,那么可以发送的单个最大消息为1MB
,或者生产者可以在单个请求里发送一个批次,该批次包含了1000
个消息,每 个消息大小为1KB
。另外,
broker
对可接收的消息最大值也有自己的限制(message.max. bytes
),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。
2.1.16、receive.buffer.bytes
和 send.buffer.bytes
这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1, 就使用操作系统的默认值。
如果生产者或消费者与 broker 处于不同的数据中心,那么可以 适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
2.2、发送消息的方式
2.2.1、发送并忘记
我们把消息发送给服务器(我的理解是:提交给线程去处理了),但并不关心它是否正常到达,本质上是一种异步的方式,只是它不会获取消息发送的返回结果,这种方式的吞吐量是最高的,但是无法保证消息的可靠性:
大多数情况下,消息会正常到 达,因为
Kafka
是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候 也会丢失一些消息。
注释1:我们使用生产者的 send()
方法发送 ProducerRecord
对象。从生产者的架构图里可以看 到,消息先是被放进缓冲区,然后使用单独的线程发送到服务器端。
注释2:send()
方法会返 回一个包含 RecordMetadata
的 Future
对象,不过因为我们会忽略返回值,所以无法知 道消息是否发送成功。如果不关心发送结果,那么可以使用这种发送方式。比如,记录不太重要的应用程序日志。
注释3:我们可以忽略发送消息时可能发生的错误或在服务器端可能发生的错误,但在发送消息之前,生产者还是有可能发生其他的异常。这些异常有可能是 SerializationException
(说明序列化消息失败)、BufferExhaustedException
或 TimeoutException
(说明缓冲区已满),又或者是 InterruptException
(说明发送线程被中断)。
/**
* 1、发送并忘记
*/
@Test
public void sendAndForget(){
String key = "Precision_Products";
String value = "sendAndForget Msg"
//注释1:
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
try {
// 注释1
producer.send(record);
} catch (Exception e) {
// 注释3
log.info("消息发送失败", e);
}
}
2.2.2、同步发送
我们使用
send()
方法发送消息,它会返回一个Future
对象,通过调用get()
方法进行等待,判断道消息是否发送成功。
注释1:在这里,producer.send()
方法先返回一个 Future
对象,然后调用 Future
对象的 get()
方法等待 Kafka
响应。如果服务器返回错误,get()
方法会抛出异常。如果没有发生错误,我们会得到一个 RecordMetadata
对象,可以用它获取消息的偏移量
注释2:如果在发送数据之前或者在发送过程中发生了任何错误,比如 broker
返回了一个不允许重发消息的异常或者已经超过了重发的次数,那么就会抛出重试异常。另一类错误无法通过重试解决,比如“消息太大”异常。对于这类错误,KafkaProducer
不会进行任何重试,直接抛出异常。我们只是简单地把 异常信息打印出来。
@Test
public void sendSynchronize(){
String key = "Precision_Products";
String value = "sendSynchronize Msg" ;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
try {
// 注释1
RecordMetadata recordMetadata = producer.send(record).get();
log.info("消息发送成功,返回结果【{}】",recordMetadata);
} catch (Exception e) {
// 注释2
log.error("消息发送失败", e);
}
}
// 打印日志
// 2021-02-18 18:44:34 INFO -[ ]- 消息发送成功,返回结果【{"offset":0,"timestamp":1613645074360,"serializedKeySize":18,"serializedValueSize":25,"topicPartition":{"hash":-1133698772,"partition":0,"topic":"HLJ_TOPIC_JAVA"}}】 com.healerjean.proj.kafka.java.Producer.main[68]
// 2021-02-18 18:45:14 INFO -[ ]- 消息发送成功,返回结果【{"offset":1,"timestamp":1613645114891,"serializedKeySize":18,"serializedValueSize":25,"topicPartition":{"hash":-1133698772,"partition":0,"topic":"HLJ_TOPIC_JAVA"}}】 com.healerjean.proj.kafka.java.Producer.main[68]
2.2.3、异步发送
我们调用
send()
方法,并指定一个回调函数,消息以异步的方式发送,通过回调函数返回消息发送成功/失败
/**
* 3、异步发送
*/
@Test
public void sendAsync(){
String key = "Precision_Products";
String value = "sendAsync Msg" ;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
producer.send(record, (metadata, exception) -> {
log.info("消息发送成功,返回结果【{}】",metadata);
if(exception != null){
exception.printStackTrace();
}
});
}
3、序列化器
不建议使用序列化器,建议使用json或其他基本数据类型
4、分区
在之前的例子里,
ProducerRecord
对象包含了目标主题、键和值。,ProducerRecord
对象可以只包含目标主题和值,键可以设置为默认的null
,不过大多数应用程序会用到键。键有两个用途
1、可以作为消息的附加信息,
2、用来决定消息该被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。
ProducerRecord<Integer, String> record = new ProducerRecord<>("Topic", "key", "value");
ProducerRecord<Integer, String> record = new ProducerRecord<>("Topic", "value");
@:如果键值为 null,并且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用 的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。
@:如果键不为空,并且使用了默认的分区器,那么 Kafka
会对键进行散列(使用 Kafka
自己 的散列算法),然后根据散列值把消息映射到特定的分区上。
问题1:如果根据键分到的分区不可用,会发生什么呢?
答案:这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题所有的分区,而不仅仅是可用的分区。这也意味着,如果写入数据的分区是不可用的,那么就会发生错误。但这种情况很少发生。
问题2:分区数量改变了,根据键会分到哪个分区呢?
答案:只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。举个例子,在 分区数量保持不变的情况下,可以保证用户 045189 的记录总是被写到分区 34。在从分区读取数据时,可以进行各种优化。不过,一旦主题增加了新的分区,这些就无法保证 了——旧数据仍然留在分区 34,但新的记录可能被写到其他分区上。
综上所述:如果要使用键来映射分区,那么最好在创建主题的时候就把分区规划好,而且永远不要增加新分区。
4.1、实现自定义分区策略
实现一个用于审计功能的分区策略:假设我们有两类消息,其中一类消息的key为audit,用于审计,放在最后一个分区中,其他消息在剩下的分区中随机分配。
我的理解:实现有些场景下消费者能力很强的情况/重点观察的消费者的消费情况
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
import java.util.Random;
/***
*
* 实现一个自定义分区策略:
* key含有audit的一部分消息发送到最后一个分区上,其他消息在其他分区随机分配
*/
public class PartitionerImpl implements Partitioner {
private Random random;
public void configure(Map<String, ?> configs) {
//做必要的初始化工作
random = new Random();
}
//分区策略
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String keyObj = (String) key;
//根据topic获取分区信息
List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
//获取分区数量
int partitionCount = partitionInfoList.size();
int auditPartition = partitionCount - 1;
return keyObj == null || "".equals(keyObj) || !keyObj.contains("audit") ? random.nextInt(partitionCount - 1) : auditPartition;
}
public void close() {
//清理工作
}
}
5、延迟消息的实现原理
延迟队列存储的是对应的延迟消息,所谓“延迟消息”是指当消息被发送之后,并不想让消费者马上拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
基于消息的延迟:指为每条消息设置不一样的延迟时间,那么每当队列中有新消息进入的时候就会重新根据延迟时间排序,固然这也会对性能形成极大的影响。
基于队列的延迟: 设置不一样延迟级别的队列,好比5s
、10s
、30s
、1min
、5mins
、10mins
等,每一个队列中消息的延迟时间都是相同的,这样免去了延迟排序所要承受的性能之苦,经过必定的扫描策略(好比定时)便可投递超时的消息。
5.1、时间轮
Kafka
中存在大量的延迟操作,比如延迟生产、延迟拉取以及延迟删除等。Kafka
并没有使用JDK自带的Timer
或者DelayQueue
来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer
)。JDK
的Timer
和DelayQueue
插入和删除操作的平均时间复杂度为O(nlog(n)
),并不能满足Kafka
的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)
。时间轮的应用并非Kafka
独有,其应用场景还有很多,在Netty
、Akka
、Quartz
、Zookeeper
等组件中都存在时间轮的踪影。
5.2、时间轮结构
Kafka
中的时间轮(TimingWheel
)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList
)。TimerTaskList
是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry
),其中封装了真正的定时任务TimerTask
。
基本时间跨度
tickMs
:时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs
)。时间格个数
wheelSize
:时间轮的时间格个数是固定的,可用wheelSize
来表示总体时间跨度(
interval
):可以通过公式tickMs
×wheelSize
计算得出。时间轮还有一个表盘指针(
currentTime
),用来表示时间轮当前所处的时间,currentTime
是tickMs
的整数倍。currentTime
可以将整个时间轮划分为到期部分和未到期部分,currentTime
当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList
的所有任务。
5.2.1、具体过程
时间轮状态:若时间轮的
tickMs
=1ms
,wheelSize
=20
,那么可以计算得出interval
为20ms
。初始情况下表盘指针currentTime
指向时间格0
,任务开始:
1、此时有一个定时为
2ms
的任务插入进来会存放到时间格为2
的TimerTaskList
中。2、随着时间的不断推移,指针
currentTime
不断向前推进,过了2ms
之后,当到达时间格2
时,就需要将时间格2
所对应的TimeTaskList
中的任务做相应的到期操作。3、此时若又有一个定时为
8ms
的任务插入进来,则会存放到时间格10
中,currentTime
再过8ms
后会指向时间格10
。4、如果同时有一个定时为
19ms
的任务插入进来怎么办?新来的TimerTaskEntry
会复用原来的TimerTaskList
,所以它会插入到原本已经到期的时间格1
中。5、总之,整个时间轮的总体跨度是不变的,随着指针
currentTime
的不断推进,当前时间轮所能处理的时间段也在不断后移,总体时间范围在currentTime
和currentTime + interval
之间。
5.3、层级时间轮
接上,如果此时有个定时为
350ms
的任务该如何处理?直接扩充wheelSize
的大小么?Kafka
中不乏几万甚至几十万毫秒的定时任务,这个wheelSize
的扩充没有底线,就算将所有的定时任务的到期时间都设定一个上限,比如
100
万毫秒,那么这个wheelSize
为100
万毫秒的时间轮不仅占用很大的内存空间,而且效率也会拉低。Kafka
为此引入了层级时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。参考上图,复用之前的案例,
⬤ 第一层的时间轮
tickMs
=1ms
,wheelSize
=20
,interval
=20ms
。⬤ 第二层的时间轮的
tickMs
为第一层时间轮的interval
,即为20ms
。⬤ 每一层时间轮的
wheelSize
是固定的,都是20
,那么第二层的时间轮的总体时间跨度interval
为400ms
。以此类推,这个400ms
也是第三层的tickMs
的大小,第三层的时间轮的总体时间跨度为8000ms
。
5.3.1、具体过程
⬤ 对于之前所说的
350ms
的定时任务,显然第一层时间轮不能满足条件,所以就升级到第二层时间轮中,最终被插入到第二层时间轮中时间格17
所对应的TimerTaskList
中。⬤ 如果此时又有一个定时为
450ms
的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入到第三层时间轮中时间格1的TimerTaskList
中。⬤ 注意到在到期时间在[
400ms
,800ms
) 区间的多个任务(比如446ms
、455ms
以及473ms
的定时任务)都会被放入到第三层时间轮的时间格1中,时间 格1
对应的TimerTaskList
的超时时间为400ms
。⬤ 随着时间的流逝,当次
TimerTaskList
到期之时,原本定时为450ms
的任务还剩下50ms
的时间,还不能执行这个任务的到期操作。这里就有一个时间轮降级的操作,会将这个剩余时间为50ms
的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为[40ms,60ms
)的时间格中。再经历了40ms
之后,此时这个任务又被“察觉”到,不过还剩余10ms
,还是不能立即执行到期操作。所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为[10ms,11ms
)的时间格中,之后再经历10ms
后,此任务真正到期,最终执行相应的到期操作。
5.4、Kafka
时间轮
在
Kafka
中第一层时间轮的参数同上面的案例一样:tickMs
=1ms
,wheelSize
=20
,interval
=20ms
,各个层级的wheelSize
也固定为20
,所以各个层级的tickMs
和interval
也可以相应的推算出来。Kafka
在具体实现时间轮TimingWheel
时还有一些小细节:
1、TimingWheel
在创建的时候以当前系统时间为第一层时间轮的起始时间( startMs
),这里的当前系统时间并没有简单的调用System.currentTimeMillis()
,而是调用了Time.SYSTEM.hiResClockMs
,这是因为currentTimeMillis()
方法的时间精度依赖于操作系统的具体实现,有些操作系统下并不能达到毫秒级的精度,而 Time.SYSTEM.hiResClockMs
实质上是采用了 System.nanoTime()/1_000_000
来将精度调整到毫秒级。也有其他的某些骚操作可以实现毫秒级的精度,但是笔者并不推荐,System.nanoTime()/1_000_000
是最有效的方法。(如对此有想法,可在留言区探讨。)
2、TimingWheel
中的每个双向环形链表 TimerTaskList
都会有一个哨兵节点(sentinel
),引入哨兵节点可以简化边界条件。哨兵节点也称为哑元节点(dummy node
),它是一个附加的链表节点,该节点作为第一个节点,它的值域中并不存储任何东西,只是为了操作的方便而引入的。如果一个链表有哨兵节点的话,那么线性表的第一个元素应该是链表的第二个节点。
3、除了第一层时间轮,其余高层时间轮的起始时间(startMs)
都设置为创建此层时间轮时前面第一轮的currentTime
。每一层的currentTime
都必须是 tickMs
的整数倍,如果不满足则会将 currentTime
修剪为tickMs
的整数倍(向下取整),以此与时间轮中的时间格的到期时间范围对应起来。修剪方法为:currentTime = startMs - (startMs % tickMs)
。currentTime
会随着时间推移而推荐,但是不会改变为tickMs
的整数倍的既定事实。若某一时刻的时间为timeMs
,那么此时时间轮的currentTime
= timeMs
- (timeMs
% tickMs
),时间每推进一次,每个层级的时间轮的 currentTime
都会依据此公式推进。
4、Kafka
中的定时器只需持有 TimingWheel
的第一层时间轮的引用,并不会直接持有其他高层的时间轮,但是每一层时间轮都会有一个引用(overflowWheel
)指向更高一层的应用,以此层级调用而可以实现定时器间接持有各个层级时间轮的引用。
5.5、Kafka
中到底是怎么推进时间
从动画中可以注意到, 随着时间推进, 时间轮的指针循环往复地定格在每一个时间格上, 每一次都要判断当前定格的时间格里是不是有任务存在,其中有很多时间格都是没有任务的, 指针定格在这种空的时间格中, 就是一次”空推进”,比如说, 插入一个延时时间400s的任务, 指针就要执行399次”空推进”,无故空耗机器的性能资源, 这是一种浪费!
那么Kafka是怎么解决这个问题的呢?这就要从延迟队列DelayQueue开始讲起了!时间轮搭配延迟队列DelayQueue,(Timer、DelayQueue 和 ScheduledThreadPool,它们都是基于优先队列实现的,O(logn) 的时间复杂度在任务数多的情况下频繁的入队出队对性能来说有损耗。因此适合于任务数不多的情况)
答案:这里采用
DelayQueue
来辅助以少量空间换时间,从而做到了“精准推进”。Kafka中的定时器真可谓是“知人善用”,用TimingWheel
做最擅长的任务添加和删除操作,而用DelayQueue
做最擅长的时间推进工作,相辅相成。
1、而对于时间轮来说,它只需要往delayQueue
里面扔各种槽即可,比如我们的定时任务长短不一
2、而这个槽到期后,也就是被我们从delayQueue
中 poll
出来后,我们只需要将槽中的所有任务循环一次,重新加到新的槽中(添加失败则直接执行)即可。
3、Bucket
的设计让多个任务“合并”,使得同一个 bucket
的多次插入只需要在 delayQueue
中入队一次,同时减少了 delayQueue
中元素数量,堆的深度也减小,delayqueue
的插入和弹出操作开销也更小