Kafka消费者
前言
Github:https://github.com/HealerJean
1、消费者和消费者群组
1.1、消费者组的出现
假设我们有一个应用程序需要从一个
Kafka
主题读取消息并验证这些消息,然后再把它们保存起来。应用程序需要创建一个消费者对象,订阅主题并开始接收消息,然后验证消息 并保存结果。过了一阵子,生产者往主题写入消息的速度超过了应用程序验证数据的速 度,这个时候该怎么办?
如果只使用单个消费者处理消息,应用程序会远跟不上消息生成 的速度。显然,此时很有必要对消费者进行横向伸缩。就像多个生产者可以向相同的主题 写入消息一样,我们也可以使用多个消费者从同一个主题读取消息,对消息进行分流。
Kafka
消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。
1.1.1、4个分区1个消费者
假设主题 T1 有 4 个分区,我们创建了消费者 C1,它是群组 G1 里唯一的消费者,我们用 它订阅主题 T1。消费者 C1 将收到主题 T1 全部 4 个分区的消息,
结果:1 个消费者收到 4 个分区的消息
1.1.2、4个分区2个消费者
如果在群组 G1 里新增一个消费者 C2,那么每个消费者将分别从两个分区接收消息。我们 假设消费者 C1 接收分区 0 和分区 2 的消息,消费者 C2 接收分区 1 和分区 3 的消息,
1.1.3、4个分区4个消费者
如果群组 G1 有 4 个消费者,那么每个消费者可以分配到一个分区,
1.1.4、4个分区4个以上消费者
如果我们往群组里添加更多的消费者,超过主题的分区数量,那么有一部分消费者就会被 闲置,不会接收到任何消息,
1.1.5、4个分区两个消费者组
1.2、消费者群组和分区再均衡
再均衡:分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡
再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者), 不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。
当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
群组里的消费者共同读取主题的分区。
1、一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息。
2、当一个消费者被关闭或发生崩溃 时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。
消费者崩溃挂掉之后的再均衡
消费者通过向被指派为群组协调器的
broker
发送心跳 来维持它们和群组的从属关系以及它们对分区的所有权关系。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。1、只要消费者以正常的时间 间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。
2、如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。
如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。
在清理消费者 时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。
3、在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配
1.2.1、分配分区是怎样的一个过程
当消费者要加入群组时,它会向群组协调器发送一个
JoinGroup
请求。第一 个加入群组的消费者将成为“群主”。群主从协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的), 并负责给每一个消费者分配分区。它使用一个实现了
PartitionAssignor
接口的类来决定哪些分区应该被分配给哪个消费者。
Kafka
内置了两种分配策略。分配 完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。
2、创建Kafka消费者
2.1、消费者属性设置
private static final String TOPIC="HLJ_TOPIC_JAVA";
private static final String BROKER_LIST="localhost:9092";
private static final String GROUP_ID="HTEST_GROUP";
private static KafkaConsumer<String,String> kafkaConsumer = null;
static {
Properties properties = initConfig();
kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Collections.singletonList(TOPIC));
}
private static Properties initConfig(){
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
return properties;
}
2.1.1、bootstrap.servers
指定
broker
的地址清单,表示Kafka
集群, 如果集群中有多台物理服务器 ,则服务器地址之间用逗号分。清单里不需要包含所有的
broker
地址,生产者会从给定的broker
里查找到其他broker
的信息。不过建议至少要 提供两个broker
的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。
2.1.2、group.id
表示消费者的分组 ID,
group.id
不是必需的,但是一般不会这么干的
2.1.3、key.deserializer
和value.deserializer
key.deserializer
和value.deserializer
与生产者的 `serializer 定义也很类似,不过它们不是使 用指定的类把 Java 对象转成字节数组,而是使用指定的类把字节数组转成 Java 对象。
2.2、消息轮询
消息轮询是消费者 API 的核心,通过一个简单的轮询向服务器请求数据。
一旦消费者订阅 了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据, 开发者只需要使用一组简单的 API 来处理从分区返回的数据。消费者代码的主要部分如下 所示:
分配的分区。如果发生了再均衡,整个过程也是 在轮询期间进行的。当然,心跳也是从轮询里发送出去的。所以,我们要确保在轮询期间 所做的任何处理工作都应该尽快完成。
注释1:这是一个无限循环。消费者实际上是一个长期运行的应用程序,它通过持续轮询向 Kafka 请求数据(消费者必须持续对 Kafka 进行轮询,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者)。稍后我们会介绍如何退出循环,并关闭消费者。 注释2:传给 poll() 方法的参数是一个超时时间,用于控制 poll() 方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。如果该参数被设为 0,poll() 会立即返回,否则 它会在指定的毫秒数内一直等待 broker 返回数据。poll() 方法有一个超时参数,它指定了方法在多久之后可以返回, 不管有没有可用的数据都要返回。超时时间的设置取决于应用程序对响应速度的要求, 比如要在多长时间内把控制权归还给执行轮询的线程。
⚫ 注释3:poll() 方法返回一个记录列表。每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。我们一般会遍历这个列表,逐 条处理这些记录。 注释4:在退出应用程序之前使用 close() 方法关闭消费者。网络连接和 socket 也会随之关闭, 并立即触发一次再均衡,而不是等待群组协调器发现它不再发送心跳并认定它已死亡, 因为那样需要更长的时间,导致整个群组在一段时间内无法读取消息。
@Test
public void consumer(){
try {
//注释1
while (true) {
//注释2
ConsumerRecords<String, String> records = consumer.poll(100);
//注释3
for (ConsumerRecord<String, String> record : records) {
log.info("topic=[{}], partition = [{}], offset = [{}], key = [{}], value =[{}]", record.partition(), record.offset(), record.key(), record.value());
}
}
} catch (Exception e) {
log.error("消费者处理数据失败", e);
} finally {
//注解4
consumer.close();
}
}
⚫