前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

1、消费者和消费者群组

1.1、消费者组的出现

假设我们有一个应用程序需要从一个 Kafka 主题读取消息并验证这些消息,然后再把它们保存起来。应用程序需要创建一个消费者对象,订阅主题并开始接收消息,然后验证消息 并保存结果。

过了一阵子,生产者往主题写入消息的速度超过了应用程序验证数据的速 度,这个时候该怎么办?

如果只使用单个消费者处理消息,应用程序会远跟不上消息生成 的速度。显然,此时很有必要对消费者进行横向伸缩。就像多个生产者可以向相同的主题 写入消息一样,我们也可以使用多个消费者从同一个主题读取消息,对消息进行分流。

Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。

1.1.1、4 个分区 1 个消费者

假设主题 T14 个分区,我们创建了消费者 C1,它是群组 G1 里唯一的消费者,我们用 它订阅主题 T1。消费者 C1 将收到主题 T1 全部 4 个分区的消息,

结果:1 个消费者收到 4 个分区的消息

image-20210222110927283

1.1.2、4 个分区 2 个消费者

如果在群组 G1 里新增一个消费者 C2,那么每个消费者将分别从两个分区接收消息。我们 假设消费者 C1 接收分区 0 和分区 2 的消息,消费者 C2 接收分区 1 和分区 3 的消息,

image-20210222111046873

1.1.3、4 个分区 4 个消费者

如果群组 G1 有 4 个消费者,那么每个消费者可以分配到一个分区,

image-20210222111114881

1.1.4、4个分区 4 个以上消费者

如果我们往群组里添加更多的消费者,超过主题的分区数量,那么有一部分消费者就会被 闲置,不会接收到任何消息

image-20210222111140322

1.1.5、4个分区两个消费者组

image-20210222112019401

1.2、消费者群组和分区再均衡

再均衡:分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡

再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者), 不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用

当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。

群组里的消费者共同读取主题的分区。

1、一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息。

2、当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。

消费者崩溃挂掉之后的再均衡

消费者通过向被指派为群组协调器的 broker 发送心跳 来维持它们和群组的从属关系以及它们对分区的所有权关系。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳

1、只要消费者以正常的时间 间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。

2、如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。

如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。

在清理消费者 时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。

3、在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配

1.2.1、分配分区是怎样的一个过程

当消费者要加入群组时,它会向群组协调器发送一个 JoinGroup 请求。第一 个加入群组的消费者将成为“群主”。

群主从协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的), 并负责给每一个消费者分配分区。它使用一个实现了 PartitionAssignor 接口的类来决定哪些分区应该被分配给哪个消费者。

Kafka 内置了两种分配策略。分配 完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。

2、创建 Kafka 消费者

2.1、消费者属性设置

    private static final String TOPIC="HLJ_TOPIC_JAVA";
    private static final String BROKER_LIST="localhost:9092";
    private static final String GROUP_ID="HTEST_GROUP";

    private static KafkaConsumer<String,String> kafkaConsumer = null;

    static {
        Properties properties = initConfig();
        kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Collections.singletonList(TOPIC));
    }

    private static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        return properties;
    }

2.1.1、bootstrap.servers

指定 broker 的地址清单,表示 Kafka 集群, 如果集群中有多台物理服务器 ,则服务器地址之间用逗号分。

清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要 提供两个 broker 的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。

2.1.2、group.id

表示消费者的分组 ID,group.id 不是必需的,但是一般不会这么干的

2.1.3、key.deserializervalue.deserializer

key.deserializer value.deserializer 与生产者的 `serializer 定义也很类似,不过它们不是使 用指定的类把 Java 对象转成字节数组,而是使用指定的类把字节数组转成 Java 对象。

2.1.4、fetch.min.bytes

该属性指定了消费者从服务器获取记录的最小字节数。

broker 在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题不是很活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。

如果没有很多可用数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。

2.1.5、fetch.max.wait.ms(默认500ms)

我们通过fetch.min.bytes 告诉Kafka,等到有足够的数据时才把它返回给消费者。而feth.max.wait.ms 则用于指定broker 的等待时间,默认是 500ms。 如果没有足够的数据流入Kafka,消费者获取最小数据量的要求就得不到满足,最终导致500ms 的延迟。如果要降低潜在的延迟,可以把该参数值设置得小一些。

如果fetch.max.wait.ms 被设为100ms,并且fetch.min.bytes 被设为1MB,那么Kafka 在收到消费者的请求后,要么返回1MB 数据,要么在100ms 后返回所有可用的数据,就看哪个条件先得到满足。

2.1.6、max.partition.fetch.bytes

该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是1MB,也就是说,KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过max.partition.fetch.bytes 指定的字节。如果一个主题有20 个分区和 5 个消费者,那么每个消费者需要至少 4MB 的可用内存来接收记录。 同时,在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区区。

KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.partition.fetch.bytes 指定的字节

注意:

max.partition.fetch.bytes 的值必须比broker 能够接收的最大消息的字节数(通过max.message.size 属性配置)大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。因为此值设置的过小,大消息压根就不会接收

在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用poll() 方法来避免会话过期和发生分区再均衡,如果单次调用poll() 返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况,可以把max.partition.fetch.bytes 值改小,或者延长会话过期时间

2.1.7、session.timeout.ms

该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是3s

如果消费者没有在session.timeout.ms 指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。

该属性与heartbeat.interval.ms 紧密相关。heartbeat.interval.ms 指定了poll() 方法向协调器发送心跳的频率

session.timeout.ms 则指定了消费者可以多久不发送心跳。所以,一般需要同时修改这两个属性,heartbeat.interval.ms 必须比session.timeout.ms 小,一般是session.timeout.ms 的三分之一。 如果session.timeout.ms 是3s,那么heartbeat.interval.ms 应该是1s。

session.timeout.ms 值设得比默认值小, 可以更快地检测和恢复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设置得大一些,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。

2.1.8、auto.offset.reset(默认latest

该属性指定了消费者在读取一个没有偏移量的分区((比如消费者第 1 次 启动时))或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。

1、如果CURRENT-OFFSET不是为unknown(消费者以前消费过数据,提交过offset),重启消费者时earliestlatestnone都是会从CURRENT-OFFSET一直消费到LOG-END-OFFSET

2、如果CURRENT-OFFSETunknown,重启消费者时earliestlatestnone才会展现出他们各自的不同

earliest :会从该分区当前最开始的 offset 消息开始消费(即从头消费),如果最开始的消息offset0,那么消费者的offset就会被更新为0

latest : 只消费当前消费者启动完成后生产者新生产的数据。旧数据不会再消费

none :启动消费者时,该消费者所消费的主题的分区没有被消费过,就会抛异常

2.1.9、enable.auto.commit(默认true)

该属性指定了消费者是否自动提交偏移量,默认值是true

⬤ 为了尽量避免出现重复数据和数据丢失,设为false,由自己控制何时提交偏移量。

⬤ 如果把它设为true,还可以通过配置auto.commit.interval.ms属性来控制提交的频率。

2.1.10、partition.assignment.strategy

我们知道,分区会被分配给群组里的消费者。PartitionAssignor 根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。Kafka 有两个默认的分配策略。

假设消费者 C1 和消费者 C2 同时订阅了主题 T1 和主题 T2,并且每个主题有 3 个分区。

Range :该策略会把主题的若干个连续的分区分配给消费者。

因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了Range 策略,而且分区数量无法被消费者数量整除,就会出现这种情况。

消费者 分区
C1 T1(0, 1) T2 (0, 1)
C2 T1(2) T2(2)

RoundRobin该策略把主题的所有分区逐个分配给消费者

一般来说,如果所有消费者都订阅相同的主题(这种情况很常见),RoundRobin 策略会给所有消费者分配相同数量的分区(或最多就差一个分区)。

消费者 分区
C1 T1(0 ,2) T2 (1)
C2 T1(1) T2(0, 2)

自定义分区策略partition.assignment.strategy 属性的值就是自定义类的名字。

2.1.11、client.id

该属性可以是任意字符串,broker 用它来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额里。

2.1.12、max.poll.records (默认值500)

kafka 消费指定每次最大消费消息数量, 可以帮你控制在轮询里需要处理的数据量(一般设置为1)。

2.1.13、receive.buffer.bytessend.buffer.bytes

socket 在读写数据时用到的TCP 缓冲区也可以设置大小。如果它们被设为-1,就使用操作系统的默认值。

如果生产者或消费者与broker 处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

2.2、消息轮询

消息轮询是消费者 API 的核心,通过一个简单的轮询( while 死循环)向服务器请求数据

一旦消费者订阅 了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据

⬤ 轮询不只是获取数据那么简单。在第一次调用新消费者的 poll() 方法时,它会负责查找 GroupCoordinator,然后加入群组,接受分配的分区。

⬤ 如果发生了再均衡,整个过程也是在轮询期间进行的。当然,心跳也是从轮询里发送出去的。所以,我们要确保在轮询期间所做的任何处理工作都应该尽快完成

注释1:这是一个无限循环。消费者实际上是一个长期运行的应用程序,它通过持续轮询向 Kafka 请求数据(消费者必须持续对 Kafka 进行轮询,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者)。

注释2:传给 poll() 方法的参数是一个超时时间,用于控制 poll() 方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。 如果该参数被设为 0,poll() 会立即返回,否则 它会在指定的毫秒数内一直等待 broker 返回数据。poll() 方法有一个超时参数,它指定了方法在多久之后可以返回, 不管有没有可用的数据都要返回。超时时间的设置取决于应用程序对响应速度的要求, 比如要在多长时间内把控制权归还给执行轮询的线程

注释3:poll() 方法返回一个记录列表。每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。我们一般会遍历这个列表,逐条处理这些记录。

注释4:在退出应用程序之前使用 close() 方法关闭消费者。网络连接和 socket 也会随之关闭, 并立即触发一次再均衡,而不是等待群组协调器发现它不再发送心跳并认定它已死亡, 因为那样需要更长的时间,导致整个群组在一段时间内无法读取消息

@Test
public void consumer(){
  try {
    //注释1
    while (true) {
      //注释2
      ConsumerRecords<String, String> records = consumer.poll(100);
      //注释3
      for (ConsumerRecord<String, String> record : records) {
        log.info("topic=[{}], partition = [{}], offset = [{}], key = [{}], value =[{}]", record.partition(), record.offset(), record.key(), record.value());
      }
    }
  } catch (Exception e) {
    log.error("消费者处理数据失败", e);
  } finally {
    //注解4
    consumer.close();
  }
}

2.2.1、消费者线程安全(顺序消费)

在同一个群组里,我们无法让一个线程运行多个消费者,也无法让多个线程安全地共享一个消费者。

按照规则,一个消费者使用一个线程。如果要在同一个消费者群组里运行多个消费者,需要让每个消费者运行在自己的线程里。最好是把消费者的逻辑封装在自己的对象里,然后使用Java的ExecutorService 启动多个线程,使每个消费者运行在自己的线程上。

2.2.1.1、方案1(不现实):每个线程维护一个 KafkaConsumer

消费者程序启动多个线程,每个线程维护专属的KafkaConsumer,负责完整的消息获取、消息处理流程。

image-20210223211213194

优点:方便实现,无线程交互开销,易于维护

缺点:从消费消费模型可看出每个 KafkaConsumer 会负责固定的分区,因此无法提升单个分区的消费能力,如果一个主题分区数量很多,只能通过增加 KafkaConsumer 实例提高消费能力,这样一来线程数量过多,导致项目 Socket 连接开销巨大,项目中一般不用该线程模型去消费。

public class KafkaConsumerThread  implements Runnable{

  private KafkaConsumer<String,String> consumer;
  private AtomicBoolean closed = new AtomicBoolean(false);
  public KafkaConsumerThread(){

  }
  // 构造方法 生成自己的consumer
  public KafkaConsumerThread(Properties props) {
    this.consumer = new KafkaConsumer<>(props);
  }

  @Override
  public void run() {
    try {
      // 消费同一主题
      consumer.subscribe(Collections.singletonList("six-topic"));
      // 线程名称
      String threadName = Thread.currentThread().getName();
      while (!closed.get()){
        ConsumerRecords<String, String> records = consumer.poll(3000);
        for (ConsumerRecord<String, String> record : records) {
          System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",threadName,record.topic(),record.partition(),record.offset(),record.key(),record.value());
        }
      }
    }catch (WakeupException e){
      e.printStackTrace();
    }finally {
      consumer.close();
    }
  }

  /**
     * 关闭消费
     */
  public void shutdown(){
    closed.set(true);
    // wakeup 可以安全地从外部线程来中断活动操作
    consumer.wakeup();
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "XXXXXXX:9093");
    props.put("group.id", "thread-1");//消费者组,只要group.id相同,就属于同一个消费者组
    props.put("enable.auto.commit", "true");//自动提交offset
    props.put("auto.offset.reset", "earliest");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("session.timeout.ms", "30000");
    props.put("max.poll.records",6);
    // 运行三个线程,消费同一个topic 这个topic的分区必须大于等于3 否则会有消费者消费不到数据
    for (int i = 0; i < 3 ; i++) {
      new Thread(new KafkaConsumerThread(props),"Thread"+i).start();
    }
  }
}

2.2.1.2、方案2 (采纳):单 KafkaConsumer 实例 + 多 worker 线程

比如说我们建了一个 topic,有三个 partition。生产者在写的时候,其实可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的

消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是 ok 的,没有错乱。接着,我们在消费者里可能会搞多个线程来并发处理消息

因为如果消费者是单线程消费处理,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。而多个线程并发跑的话,顺序可能就乱掉了。

image-20210706193125453

优点:

KafkaConsumer 实例与消息消费逻辑解耦后,我们不需要创建多个 KafkaConsumer 实例就可进行多线程消费,还可根据消费的负载情况动态调整 worker 线程,具有很强的独立扩展性

缺点:

1、 跨多个处理器的顺序保证需要特别注意,因为线程是独立的执行,后来的消息可能比遭到的消息先处理,这仅仅是因为线程执行的运气。如果对排序没有问题,这就不是个问题(下面就是解决方案)。

2、手动提交变得更困难,因为它需要协调所有的线程以确保处理对该分区的处理完成(涉及到异常事件的时候偏移量的提交)。

解决:写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。

image-20210706193324432

public class KafkaConsumerThread2 implements Runnable {
  // 存储消息 先进先出队列
  private LinkedBlockingQueue<ConsumerRecords<String,String>> list;
  private AtomicBoolean closed = new AtomicBoolean(false);

  public KafkaConsumerThread2() {
  }

  public KafkaConsumerThread2(LinkedBlockingQueue<ConsumerRecords<String, String>> list) {
    this.list = list;
  }

  @Override
  public void run() {
    // 线程名称
    String threadName = Thread.currentThread().getName();
    // 处理消息
    while (!closed.get()){
      try {
        ConsumerRecords<String, String> records = list.take();
        System.out.println("消息数量"+records.count());
        if (records.isEmpty()){
          System.out.printf("队列为空,不消费数据,Thread-name= %s\n",threadName);
        }else {
          for (ConsumerRecord<String, String> record : records) {
            Thread.sleep(3000);
            System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",threadName,record.topic(),record.partition(),record.offset(),record.key(),record.value());
          }
        }
      }catch (InterruptedException e){
        e.printStackTrace();
      }
    }
  }

  public static void main(String[] args) {
    LinkedBlockingQueue<ConsumerRecords<String, String>> list = new LinkedBlockingQueue<>();
    Properties props = new Properties();
    props.put("bootstrap.servers", "10.33.68.68:9093");
    props.put("group.id", "thread-5");//消费者组,只要group.id相同,就属于同一个消费者组
    props.put("enable.auto.commit", "true");//自动提交offset
    props.put("auto.offset.reset", "earliest");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("session.timeout.ms", "30000");
    props.put("max.poll.records",5);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // 消费同一主题
    consumer.subscribe(Collections.singletonList("six-topic"));
    // 开启三个线程处理队列中的消息
    for (int i = 0; i <3 ; i++) {
      new Thread(new KafkaConsumerThread2(list),"thread-"+i).start();
    }
    while (true){
      ConsumerRecords<String, String> records = consumer.poll(1000);
      try {
        list.put(records);
        //Thread.sleep(3000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}


public class ConsumerDealThread implements Runnable{
  private ConsumerRecord record;

  public ConsumerDealThread(ConsumerRecord record) {
    this.record = record;
  }

  public void run() {
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",Thread.currentThread().getName(),record.topic(),record.partition(),record.offset(),record.key(),record.value());
  }
}


public class KafkaConsumerThread3 {
  public static void main(String[] args) {
    LinkedBlockingQueue<ConsumerRecords<String, String>> list = new LinkedBlockingQueue<>();
    Properties props = new Properties();
    props.put("bootstrap.servers", "10.33.68.68:9093");
    props.put("group.id", "thread-18");//消费者组,只要group.id相同,就属于同一个消费者组
    props.put("enable.auto.commit", "true");//自动提交offset
    props.put("auto.offset.reset", "earliest");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("session.timeout.ms", "30000");
    props.put("max.poll.records",5);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // 消费同一主题
    consumer.subscribe(Collections.singletonList("six-topic"));
    ExecutorService executor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.MILLISECONDS,
                                                      new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
    while (true){
      ConsumerRecords<String, String> records = consumer.poll(1000);
      try {
        for (ConsumerRecord<String, String> record : records) {
          executor.submit(new ConsumerDealThread(record));
        }
      } catch (Exception e) {
        e.printStackTrace();
        consumer.wakeup();
        executor.shutdown();
        try {
          if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
            System.out.println("超时,未关闭线程池");
          }
        } catch (InterruptedException e2) {
          e.printStackTrace();
        }
      }
      BlockingQueue<Runnable> queue = ((ThreadPoolExecutor) executor).getQueue();
      System.out.println("队列数量:"+queue.size());
    }
  }
}


2.2.2、订阅主题

订阅主题 创建好消费者之后,下一步可以开始订阅主题了。subscribe() 方法接受一个主题列表作为参数,使用起来很简单:

1、我们也可以在调用 subscribe() 方法时传入一个正则表达式。正则表达式可以匹配多个主 题,如果有人创建了新的主题,并且主题的名字与正则表达式匹配,那么会立即触发一次 再均衡,消费者就可以读取新添加的主题。

2、如果应用程序需要读取多个主题,并且可以处 理不同类型的数据,那么这种订阅方式就很管用。在 Kafka 和其他系统之间复制数据时, 使用正则表达式的方式订阅多个主题是很常见的做法。

consumer.subscribe(Collections.singletonList("customerCountries"));

3、提交和偏移量

Kafka 不会像其他 JMS 队列那样需要得到消费者的确认,这是 Kafka 的一个独特之处。消费者可以使用 Kafka 来追踪消息在分区里的位置(偏移量)。

每次调用 poll() 方法,它总是返回由生产者写入 Kafka 但还没有被消费者读取过的记录, 我们因此可以追踪到哪些记录是被群组里的哪个消费者读取的。我们把更新分区当前位置的操作叫作提交。

3.1、消费者是如何提交偏移量的呢?

新版本的 Kafka ,消费者往一个叫作 _consumer_offset 的特殊主题发送 消息,消息里包含每个分区的偏移量。 也支持将 偏移量 信息存储在 Zookeeper 中,通过设置属性 offsets.storage 控制,offsets.storage 属性可选值有 kafkazookeeper。消费者也可以不使用 Kafka 提供的偏移量存储方案,可自定义存储方式,

☼ 如果消费者一直处于运行状态,那么偏移量就没有什么用处。

☼ 不过,如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前分区的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方 继续处理。

3.1.1、提交的偏移量小于客户端处理的最后一个消息的偏移量

如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的 消息就会被重复处理

image-20210224173150625

3.1.2、提交的偏移量大于客户端处理的最后一个消息的偏移量

如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失

image-20210224173420933

3.2、提交偏移量

3.2.1、自动提交(容易产生重复消息)

最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设为 true,那 么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。

与消费者里的其他东西一样,自动提交也是在轮询里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那 么就会提交从上一次轮询返回的偏移量。

优点:如果你在消费者轮询操作里处理所有的数据,那么自动提交可以保证只提交已经处理过的偏移量

**缺点,1、无法控制重复处理消息:假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再 均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后 了 3s,所以在这 3s 内到达的消息会被重复处理 **

2、消息可能丢失(个人原因):比如消费者在自动提交偏移量之前停止处理消息,再或者如果把消息交给另外一个后台线程去处理,自动提交机制可能会在消息还没有处理完毕就提交偏移量。专业会导致消息丢失

3.2.1.1、这种方式的问题

自动提交虽然方便,不过并没有为开发者留有余地来避免重复处理消息。

可以通过修改提交时间间隔来更频繁地 提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无法完全避免的

在使用自动提交时,每次调用轮询方法都会把上一次调用返回的偏移量提交上去,它并不知道具体哪些消息已经被真实处理了,所以在再次调用之前最好确保所有当前调用返回的消息 都已经处理完毕(在调用 close() 方法之前也会进行自动提交)。

3.2.2、手动提交

大部分开发者通过控制偏移量提交时间来消除丢失消息的可能性(减少自动提交时间),并在发生再均衡时减少重复消息的数量消费者API 提供了另一种提交偏移量的方式,开发者可以在必要的时候提交当前偏移量,而不是基于时间间隔

auto.commit.offset 设为false,让应用程序决定何时提交偏移量。使用 commitSync() 提交偏移量最简单也最可靠。这个API 会提交由poll() 方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。

如果发生了再均衡(我的理解,当前消费者宕机),从最近一批消息到发生再均衡之间的所有消息都将被重复处理。

注释5:处理完当前批次的消息,在轮询更多的消息之前,调用commitSync() 方法提交当前批次最新的偏移量。

注释6:只要没有发生不可恢复的错误,commitSync() 方法会一直尝试直至提交成功。如果提交失败,我们也只能把异常记录到错误日志里。

@Test
public void consumer(){
  try {
    //注释1
    while (true) {
      //注释2
      ConsumerRecords<String, String> records = consumer.poll(100);
      //注释3
      for (ConsumerRecord<String, String> record : records) {
        log.info("topic=[{}], partition = [{}], offset = [{}], key = [{}], value =[{}]", 
                 record.topic(), record.partition(), record.offset(), record.key(), record.value());
      }
      try {
        consumer.commitSync();  //注释5
      } catch (CommitFailedException e) {
        log.error("commit failed", e)  //注释6
      }
    }
  } catch (Exception e) {
    log.error("消费者处理数据失败", e);
  } finally {
    //注解4
    consumer.close();
  }
}

3.2.3、异步提交

手动提交有一个不足之处,在 broker 对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。以同步提交的方式,我们可以通过降低提交频率(处理很多任务之后再提交)来提升吞吐量,但如果发生了再均衡,会增加重复消息的数量。所以出现了异步提交

⬤ 在成功提交或碰到无法恢复的错误之前,commitSync() 会一直重试,但是 commitAsync() 不会,这也是 commitAsync() 不好的一个地方。

⬤ 为什么不重试呢?是因为在它收到服务器响应的时候,可能有一个更大的偏移量已经提交成功。 假设我们发出一个请求用 于提交偏移量 2000,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000

如果 commitAsync() 重新尝试提交偏移量 2000,它有可能在偏移量 3000 之后提交成功。这个时 候如果发生再均衡,就会出现重复消息。(我个人理解,这种异步的方式如果3000网络快,先提交,2000网络慢后提交,也会导致重复消息)

注释1:异步提交,没有回调

注释2:异步提交,有回调

@Test
public void commitAsync(){
  try {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        log.info("topic=[{}], partition = [{}], offset = [{}], key = [{}], value =[{}]", 
                 record.topic(), record.partition(), record.offset(), record.key(), record.value());
      }
      //注释1
      // consumer.commitAsync();

      //注释2
      consumer.commitAsync((offsets, exception) ->{
        if (exception != null){
          log.error("Commit failed for offsets {}", offsets, exception);
        }
      });
    }

  } catch (Exception e) {
    log.error("消费者处理数据失败", e);
  } finally {
    consumer.close();
  }
}

3.2.3.1.、重试异步重试

commitAsync() 也支持回调,在 broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标, 如果你要用它来进行重试,一定要注意提交的顺序。

我们可以使用一个单调递增的序列号来维护异步提交的顺序。在每次提交偏移量之后或在回调里提交偏移量时递增序列号。

1、在进行重试前,先检查回调的序列号和即将提交的偏移量是否相等,如果相等,说明没有新的提交,那 么可以安全地进行重试。

2、如果序列号比较大,说明有一个新的提交已经发送出去了,应该停止重试。

3.2.4、同步和异步组合提交

一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败 是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。

因此,在消费者关闭前一般会组合使用 commitAsync() commitSync()。它们的工作原理 如下(后面讲到再均衡监听器时,我们会讨论如何在发生再均衡前提交偏移量):

注释1:异步提交,如果一切正常,我们使用 commitAsync() 方法来提交。这样速度更快,而且即使这次提 交失败,下一次提交很可能会成功。

注释2:关闭消费者前提交,如果直接关闭消费者,就没有所谓的 “下一次提交”了。使用 commitSync() 方法会一 直重试,直到提交成功或发生无法恢复的错误。(什么时候关闭消费者,看自己的业务,也看自己业务需要不需要关闭消费者前提交)

@Test
public void commitSyncAndAsync(){
  try {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        log.info("topic=[{}], partition = [{}], offset = [{}], key = [{}], value =[{}]", 
                 record.topic(), record.partition(), record.offset(), record.key(), record.value());
      }
      //注释1
      consumer.commitAsync();
    }
  } catch (Exception e) {
    log.error("消费者处理数据失败", e);
  } finally {
    try {
      //注释2
      consumer.commitSync();
    }finally {
      consumer.close();
    }
  }
}

3.2.5、提交特定的偏移量

调用 commitSync()commitAsync() 来实现提交偏移量的频率与处理消息批次的频率是一样的。

但如果想要更频繁地提交该怎么办?

如果 poll() 方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息(避免自己突然挂了,但是偏移量还没提交),想要在批次中间提交偏移量该怎么办?

1、这种情况无法通过调用 commitSync()commitAsync() 来实现,因为它们只会提交最后一个偏移量,而此时该批次里的消息还没有处理完。

2、消费者 API 允许在调用 commitSync()commitAsync() 方法时传进去希望提交的分区和偏移量的 map

3、假设你处理了半个批次的消息,最后一个来自主题“customers” 分区 3 的消息的偏移量是 5000,你可以调用 commitSync() 方法来提交它。

4、不过,因为消费者可能不只读取一个分区,你需要跟踪所有分区的偏移量,所以在这个层面上控制偏移量的提交会让代码变复杂

注释1:在读取每条记录之后,使用期望处理的下一个消息的偏移量更新 map 里的偏移量。下一次就从这里开始读取消息

注释2:传入主题和分区,以及偏移量信息就可以主动提交偏移量

注释3:我们决定每处理 1000 条记录就提交一次偏移量。在实际应用中,你可以根据时间或记录的内容进行提交。

注释4:这里使用的是异步提交

@Test
public void customOffsetsCommit() {
  Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
  int count = 1;
  try {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        log.info("topic=[{}], partition = [{}], offset = [{}], key = [{}], value =[{}]", 
                 record.topic(), record.partition(), record.offset(), record.key(), record.value());
        // 注释1
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1, "no metadata");
        // 注释2
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), offsetAndMetadata);
        // 注释3
        if (count++ % 1000 == 0) {
          // 注释4
          consumer.commitAsync(currentOffsets, null);
        }
      }
    }
  } catch (Exception e) {
    log.error("消费者处理数据失败", e);
  } finally {
    consumer.close();
  }
}

4、再均衡监听器

kafka 在有新消费者加入或者撤出时,会触发rebalance操作,

subscibe 订阅主题的时候,我们可以编写回掉函数,在触发 rebalance 操作之前和触发成功之后,提交相应偏移量和获取拉取偏移量

4.1、消费者添加再均衡监听器

//消费者添加再均衡监听器
consumer.subscribe(Collections.singletonList(TOPIC), new RebalanceListener(consumer));

ConsumerRebalanceListener 有两个需要实现的方法。

1、public void onPartitionsRevoked(Collection<TopicPartition> partitions)方法会在 再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接 管分区的消费者就知道该从哪里开始读取了。

该方法会在再均衡开始之前和消费者停止读取之后被调用。如果在这个方法中提交偏移量,则下一个消费者就可以获得读取的偏移量。

使用:在失去分区所有权之前通过 onPartitionsRevoked() 方法来提交偏移

解释:如果发生再均衡,我们要在即将失去分区所有权时提交偏移量

要注意,提交的是最近处理过的偏移量,而不是批次中还在处理的最后一个偏移量。因为分区有可能在我们还在处理消息的时候被撤回。

我们要提交所有分区的偏移量,而不只是那些即将失去所有权的分区的偏移量。调用 commitSync() 方法,确保在再均衡发生之前提交偏移量。

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  System.out.println("分区再均衡,提交当前偏移量:" + currentOffsets);
  consumer.commitSync(currentOffsets);
}

2、public void onPartitionsAssigned(Collection<TopicPartition> partitions)方法会在 重新分配分区之后和消费者开始读取消息之前被调用。(这个之后介绍)

4.2、示例代码

@Service
@Slf4j
public class RebalanceListenerConsumer {

  private static final String TOPIC = "HLJ_TOPIC_JAVA";
  private static final String BROKER_LIST = "localhost:9092";
  private static final String GROUP_ID = "HTEST_GROUP";
  //记录当前偏移量
  private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
  private static KafkaConsumer<String, String> consumer = null;

  static {
    Properties properties = initConfig();
    consumer = new KafkaConsumer<>(properties);
    //消费者添加再均衡监听器
    consumer.subscribe(Collections.singletonList(TOPIC), new RebalanceListener(consumer));
  }

  private static Properties initConfig() {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return properties;
  }

  /**
   * 再均衡监听器
   */
  @Slf4j
  public static class RebalanceListener implements ConsumerRebalanceListener {


    private KafkaConsumer<String, String> consumer;

    /**
     * 初始化方法,传入consumer对象,否则无法调用外部的consumer对象,必须传入
     */
    public RebalanceListener(KafkaConsumer<String, String> consumer) {
      this.consumer = consumer;
    }


    /**
     * 该方法会在再均衡之后和消费者读取之前被调用
     * 使用:在获得新分区后开始读取消息,不需要做其他事情。(最好是消费的时候保存偏移量到数据库中,然后从数据库中获取偏移量的位置)
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
      long committedOffset = -1;
      for (TopicPartition topicPartition : partitions) {
        // 获取该分区已经提交的偏移量
        committedOffset = consumer.committed(topicPartition).offset();
        System.out.println("重新分配分区,提交的偏移量:" + committedOffset);
        // 重置偏移量到上一次提交的偏移量的下一个位置处开始消费
        consumer.seek(topicPartition, committedOffset + 1);
        
        //从数据库中获取偏移量,前提是要在每次消费之后保存到数据库中
        //consumer.seek(partition, getOffsetFromDB(partition));
      }
    }


    /**
      * 该方法会在再均衡开始之前和消费者停止读取之后被调用。如果在这个方法中提交偏移量,则下一个消费者就可以获得读取的偏移量。
      * 使用:在失去分区所有权之前通过 onPartitionsRevoked() 方法来提交偏移量
      * 解释:如果发生再均衡,我们要在即将失去分区所有权时提交偏移量。
      * 要注意,提交的是最近处理过的偏移量,而不是批次中还在处理的最后一个偏移量。因为分区有可能在我们还在处理消息的时候被撤回。
      *        我们要提交所有分区的偏移量,而不只是那些即将失去所有权的分区的偏移量——因为提交的偏移量是已经处理过的,所以不会有什么问题。调用 commitSync() 方法,确保在再均衡发生之前提交偏移量。
         */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
      System.out.println("分区再均衡,提交当前偏移量:" + currentOffsets);
      consumer.commitSync(currentOffsets);
    }

  }


  @Test
  public void consumer() {
    try {
      int count = 1;
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
         log.info("topic=[{}], partition = [{}], offset = [{}], key = [{}], value =[{}]", 
                 record.topic(), record.partition(), record.offset(), record.key(), record.value());
          OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1, "no metadata");
          currentOffsets.put(new TopicPartition(record.topic(), record.partition()), offsetAndMetadata);
          if (count++ % 1000 == 0) {
            // 注释4
            consumer.commitAsync(currentOffsets, null);
          }
        }
      }
    } catch (Exception e) {
      log.error("消费者处理数据失败", e);
    } finally {
      try {
        //注释2
        consumer.commitSync();
      } finally {
        consumer.close();
      }
    }
  }

}


5、从特定偏移量处开始处理记录

我们知道了如何使用 poll() 方法从各个分区的最新偏移量处开始处理消息。 不过,有时候我们也需要从特定的偏移量处开始读取消息。

1、从分区开始的起始位置读消息

public void seekToBeginning(Collection<TopicPartition> partitions)  
  
  
//使用  
consumer.seekToBeginning(partitions);

2、从分区末尾的起始位置读消息

public void seekToEnd(Collection<TopicPartition> partitions)   
  
//使用  
consumer.seekToEnd(partitions);

3、从指定的位置查找偏移量

Kafka 也为我们提供了用于查找特定偏移量的 API。它有很多用途,比如向后回退几个消息或者向前跳过几个消息(对时间比较敏感的应用程序在处理滞后的情况下希望能 够向前跳过若干个消息)。如果消息回溯(替换topic可能会用到)

在使用 Kafka 以外的系统来存储偏移量时,它将给我们带来更 大的惊喜。比如数据库。

注释1:订阅主题之后,开始启动消费者,我们调用一次 poll() 方法,让消费者加入到消费者 群组里,并获取分配到的分区,

注释2:注释3完成之后,马上调用 seek() 方法定位分区的偏移量。要记住, seek()方法只更新我们正在使用的位置,在下一次调用 poll() 时就可以获得正确的消 息。如果 seek() 发生错误(比如偏移量不存在),poll() 就会抛出异常

注释3:使用一个虚构的方法来从数据库获取偏移量,在分配到新分区的时候,使用 seek() 方法定位到那些记录。

注释4:使用一个虚构的方法来提交数据库事务。大致想法是这样的:在处理完记录之后,将记录和偏移量插入数据库,然后在即将失去分区所有权之前提交事务,确保成功保存了这 些信息

注释5:另一个虚构的方法,这次要更新的是数据库里用于保存偏移量的表。假设更新记录的速 度非常快,所以每条记录都需要更新一次数据库,但提交的速度比较慢,所以只在每个批次末尾提交一次。这里可以通过很多种方式进行优化

@Service
@Slf4j
public class SaveOffsetsOnRebalanceConsumer {

    private static final String TOPIC = "HLJ_TOPIC_JAVA";
    private static final String BROKER_LIST = "localhost:9092";
    private static final String GROUP_ID = "HTEST_GROUP";
    //记录当前偏移量
    private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    private static KafkaConsumer<String, String> consumer = null;

    static {
        Properties properties = initConfig();
        consumer = new KafkaConsumer<>(properties);
        //消费者添加再均衡监听器
        consumer.subscribe(Collections.singletonList(TOPIC), new SaveOffsetsOnRebalance(consumer));
        // 注释1
        consumer.poll(0);
        for (TopicPartition partition: consumer.assignment()){
            //注释2:
            // consumer.seek(partition, getOffsetFromDB(partition));
        }

    }

    private static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return properties;
    }

    /**
     * 再均衡监听器
     */
    @Slf4j
    public static class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {


        private KafkaConsumer<String, String> consumer;

        /**
         * 初始化方法,传入consumer对象,否则无法调用外部的consumer对象,必须传入
         */
        public SaveOffsetsOnRebalance(KafkaConsumer<String, String> consumer) {
            this.consumer = consumer;
        }


        /**
         * 该方法会在再均衡之后和消费者读取之前被调用
         * 使用:在获得新分区后开始读取消息,不需要做其他事情。
         */
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            for(TopicPartition partition: partitions){
                //注释3:
                // consumer.seek(partition, getOffsetFromDB(partition));
            }
        }

        /**
         * 该方法会在再均衡开始之前和消费者停止读取之后被调用。如果在这个方法中提交偏移量,则下一个消费者就可以获得读取的偏移量。
         * 使用:在失去分区所有权之前通过 onPartitionsRevoked() 方法来提交偏移量
         * 解释:如果发生再均衡,我们要在即将失去分区所有权时提交偏移量。
         * 要注意,提交的是最近处理过的偏移量,而不是批次中还在处理的最后一个偏移量。因为分区有可能在我们还在处理消息的时候被撤回。
         *        我们要提交所有分区的偏移量,而不只是那些即将失去所有权的分区的偏移量——因为提交的偏移量是已经处理过的,所以不会有什么问题。调用 commitSync() 方法,确保在再均衡发生之前提交偏移量。
         */
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            //注释4:
            // commitDBTransaction();
        }

    }


    @Test
    public void consumer() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                log.info("topic=[{}], partition = [{}], offset = [{}], key = [{}], value =[{}]", 
                 record.topic(), record.partition(), record.offset(), record.key(), record.value());
                // processRecord(record);
                //保存记录结果
                // storeRecordInDB(record);
                //保存位移
                //storeOffsetInDB(record.topic(), record.partition(), record.offset());
            }
          		//注释5
            //提交数据库事务,保存消费的记录以及位移
            // commitDBTransaction();
        }
    }

}


6、消费者退出

轮询的时候,消费者进入了一个无线循环。但是千万不要担心消费者无法退出,如果确定要退出循环,需要通过另一个线程调用 consumer.wakeup() 方法。

1、如果循环运行在主线程里,可以在 ShutdownHook 里调用该方法。要记住,consumer.wakeup() 是消费者 唯一一个可以从其他线程里安全调用的方法。

2、调用 consumer.wakeup() 可以退出 poll(), 并抛出 WakeupException 异常,或者如果调用 consumer.wakeup() 时线程没有等待轮询,那 么异常将在下一轮调用 poll() 时抛出。我们不需要处理 WakeupException,因为它只是用 于跳出循环的一种方式。

3、不过,在退出线程之前调用 consumer.close() 是很有必要的,它会提交任何还没有提交的东西,并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡,而不需要等待会话超时。

注释1: ShutdownHook 运行在单独的线程里,所以退出循环最安全的方式只能是调用 wakeup() 方法。

注释2:在另一个线程里调用 wakeup() 方法,导致 poll() 抛出 WakeupException。你可能想捕获 异常以确保应用不会意外终止,但实际上这不是必需的。

注释3: 在退出之前,确保彻底关闭了消费者,关闭消费者会马上通知群组协调器进行一次分区再均衡,而不需要通过心跳判断

@Test
public void test() {
  final Thread mainThread = Thread.currentThread();
  //注释1:
  Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    log.info("Starting exit...");
    consumer.wakeup();
    try {
      // 主线程继续执行,以便可以关闭consumer
      mainThread.join();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }));

  try {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(1000);
      for (ConsumerRecord<String, String> record : records) {
        log.info("topic=[{}], partition = [{}], offset = [{}], key = [{}], value =[{}]", record.topic(), record.partition(), record.offset(), record.key(), record.value());
      }
      consumer.commitAsync();
    }
  } catch (WakeupException e) {
    //注释2:
    log.info("消费者退出", e);
  } finally {
    //注释3
    consumer.close();
    log.info("Closed consumer and we are done");
  }
}

6.1、解释shutdownHook

shutdownHook是一种特殊的结构,它允许开发人员插入JVM关闭时执行的一段代码。这种情况在我们需要做特殊清理操作的情况下很有用

6.1.1、执行时机

Application 正常退出,在退出时执行特定的业务逻辑,或者关闭资源等操作

虚拟机非正常退出,比如用户按下ctrl+COOM宕机、操作系统关闭(kill pid)等。在退出时执行必要的挽救措施

6.1.2、使用方法

1、应用程序无法保证shutdownHook总是运行的

如JVM由于某些内部错误而崩溃,或(Unix / Linux中的kill -9)或TerminateProcess(Windows)),那么应用程序需要立即终止而不会甚至等待任何清理活动。除了上述之外,还可以通过调用Runime.halt()方法来终止JVM,而阻止shutdownHook运行。

2、shutdownHook可以在完成前强行停止

虽然shutdownHook开始执行,但是在操作系统关闭的情况下,任然可以在完成之前终止它。在这种情况下,一旦SIGTERM被提供,O/S等待一个进程终止指定的时间。如果进程在该时间限制内没有终止,则O/S通过发出SIGTERM(或Windows中的对等方)强制终止进程。所以有可能这是在shutdownHook中途执行时发生的。

因此,建议谨慎地编写shutdownHook,确保它们快速完成,并且不会造成死锁等情况。另外特别注意的是,不应该执行长时间计算或等待用户I/O操作在钩子。

3、可以有多个shutdownHook,但其执行顺序无法保证

7、消息积压&重试

kafka的存储结构为消息逐条入队,对已经进入队列的消息,进行顺序存储,并且只能顺序消费,当遇到某条消息无法消费时(消息体格式错误,业务不兼容等),会对该消息进行重复推送(表现为重复推送),虽然能达到重试的功能,但如果该消息长时间无法被消费,就会导致整个队列后续消息无法消费。

7.1、优化Kafka

重试队列功能的作用:由于JMQ的消费逻辑为按照进入队列的顺序逐个消费,当遇到某个”坏消息”导致业务方无法消费时,会导致整个队列的消息被全量阻塞,无法消费。

7.1.1、重试队列

为解决此问题,可以考虑提供一种功能,将无法消费的消息移到”重试队列”,从而是消费者能够消费队列里的后续消息,不至于大量积压。在尝试将”坏消息”放入重试队列前,会先进行本地重试,本地重试(默认重试2次,加上最初的一次消费,共消费3次)仍失败后,才会尝试将消息放入重试队列

image-20220418132322760

7.1.2、风险

进入重试队列的消息会尽量按照重试策略推送消息,但是无法保证及时推送

ContactAuthor