前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

1、消费者和消费者群组

1.1、消费者组的出现

假设我们有一个应用程序需要从一个 Kafka 主题读取消息并验证这些消息,然后再把它们保存起来。应用程序需要创建一个消费者对象,订阅主题并开始接收消息,然后验证消息 并保存结果。

过了一阵子,生产者往主题写入消息的速度超过了应用程序验证数据的速 度,这个时候该怎么办?

如果只使用单个消费者处理消息,应用程序会远跟不上消息生成 的速度。显然,此时很有必要对消费者进行横向伸缩。就像多个生产者可以向相同的主题 写入消息一样,我们也可以使用多个消费者从同一个主题读取消息,对消息进行分流。

Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。

1.1.1、4个分区1个消费者

假设主题 T1 有 4 个分区,我们创建了消费者 C1,它是群组 G1 里唯一的消费者,我们用 它订阅主题 T1。消费者 C1 将收到主题 T1 全部 4 个分区的消息,

结果:1 个消费者收到 4 个分区的消息

image-20210222110927283

1.1.2、4个分区2个消费者

如果在群组 G1 里新增一个消费者 C2,那么每个消费者将分别从两个分区接收消息。我们 假设消费者 C1 接收分区 0 和分区 2 的消息,消费者 C2 接收分区 1 和分区 3 的消息,

image-20210222111046873

1.1.3、4个分区4个消费者

如果群组 G1 有 4 个消费者,那么每个消费者可以分配到一个分区,

image-20210222111114881

1.1.4、4个分区4个以上消费者

如果我们往群组里添加更多的消费者,超过主题的分区数量,那么有一部分消费者就会被 闲置,不会接收到任何消息

image-20210222111140322

1.1.5、4个分区两个消费者组

image-20210222112019401

1.2、消费者群组和分区再均衡

再均衡:分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡

再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者), 不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用

当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。

群组里的消费者共同读取主题的分区。

1、一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息。

2、当一个消费者被关闭或发生崩溃 时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。

消费者崩溃挂掉之后的再均衡

消费者通过向被指派为群组协调器的 broker发送心跳 来维持它们和群组的从属关系以及它们对分区的所有权关系。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳

1、只要消费者以正常的时间 间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。

2、如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。

如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。

在清理消费者 时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。

3、在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配

1.2.1、分配分区是怎样的一个过程

当消费者要加入群组时,它会向群组协调器发送一个 JoinGroup 请求。第一 个加入群组的消费者将成为“群主”。

群主从协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的), 并负责给每一个消费者分配分区。它使用一个实现了 PartitionAssignor 接口的类来决定哪些分区应该被分配给哪个消费者。

Kafka 内置了两种分配策略。分配 完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。

2、创建Kafka消费者

2.1、消费者属性设置

    private static final String TOPIC="HLJ_TOPIC_JAVA";
    private static final String BROKER_LIST="localhost:9092";
    private static final String GROUP_ID="HTEST_GROUP";

    private static KafkaConsumer<String,String> kafkaConsumer = null;

    static {
        Properties properties = initConfig();
        kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Collections.singletonList(TOPIC));
    }

    private static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        return properties;
    }

2.1.1、bootstrap.servers

指定 broker 的地址清单,表示 Kafka 集群, 如果集群中有多台物理服务器 ,则服务器地址之间用逗号分。

清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要 提供两个 broker 的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。

2.1.2、group.id

表示消费者的分组 ID,group.id 不是必需的,但是一般不会这么干的

2.1.3、key.deserializervalue.deserializer

key.deserializer value.deserializer 与生产者的 `serializer 定义也很类似,不过它们不是使 用指定的类把 Java 对象转成字节数组,而是使用指定的类把字节数组转成 Java 对象。

2.2、消息轮询

消息轮询是消费者 API 的核心,通过一个简单的轮询向服务器请求数据

一旦消费者订阅 了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据, 开发者只需要使用一组简单的 API 来处理从分区返回的数据。消费者代码的主要部分如下 所示:

分配的分区。如果发生了再均衡,整个过程也是 在轮询期间进行的。当然,心跳也是从轮询里发送出去的。所以,我们要确保在轮询期间 所做的任何处理工作都应该尽快完成。

注释1:这是一个无限循环。消费者实际上是一个长期运行的应用程序,它通过持续轮询向 Kafka 请求数据(消费者必须持续对 Kafka 进行轮询,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者)。稍后我们会介绍如何退出循环,并关闭消费者。 注释2:传给 poll() 方法的参数是一个超时时间,用于控制 poll() 方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。如果该参数被设为 0,poll() 会立即返回,否则 它会在指定的毫秒数内一直等待 broker 返回数据。poll() 方法有一个超时参数,它指定了方法在多久之后可以返回, 不管有没有可用的数据都要返回。超时时间的设置取决于应用程序对响应速度的要求, 比如要在多长时间内把控制权归还给执行轮询的线程。

⚫ 注释3:poll() 方法返回一个记录列表。每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。我们一般会遍历这个列表,逐 条处理这些记录。 注释4:在退出应用程序之前使用 close() 方法关闭消费者。网络连接和 socket 也会随之关闭, 并立即触发一次再均衡,而不是等待群组协调器发现它不再发送心跳并认定它已死亡, 因为那样需要更长的时间,导致整个群组在一段时间内无法读取消息。



@Test
public void consumer(){
  try {
    //注释1
    while (true) {
      //注释2
      ConsumerRecords<String, String> records = consumer.poll(100);
      //注释3
      for (ConsumerRecord<String, String> record : records) {
        log.info("topic=[{}], partition = [{}], offset = [{}], key = [{}], value =[{}]", record.partition(), record.offset(), record.key(), record.value());
      }
    }
  } catch (Exception e) {
    log.error("消费者处理数据失败", e);
  } finally {
    //注解4
    consumer.close();
  }
}

ContactAuthor