前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

1、生产者描述

生产者的核心问题:

1、是否每个消息都很重要?

2、是否允许丢失一小部分消息?

3、偶尔出现重复消息是否可以接受?

4、是否有严格的延迟和吞吐量要求?

image-20210218170538774

1.1、生产者发送消息的过程

①、我们从创建一个ProducerRecord 对象开始,ProducerRecord 对象需要包含目标主题和要发送的内容。我们还可以指定键或分区。在发送 ProducerRecord 对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输

②、接下来,数据被传给分区器。如果之前在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据 ProducerRecord 对象的键来选择一个分区。选好分区以后,生产者就知道该往哪个主题和 分区发送这条记录了。

③、紧接着,这条记录被添加到一个记录批次里这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。

④、服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象它包含了主题和分区信息,以及记录在分区里的偏移量如果写入 失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还 是失败,就返回错误信息

2、创建一个生产者

2.1、生产者属性设置

private static final String BROKER_LIST="localhost:9092";
private static final String TOPIC="HLJ_TOPIC_JAVA";
private static KafkaProducer<String,String> producer = null;

private static Properties initConfig(){
  Properties properties = new Properties();
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  return properties;
}

static{
  Properties configs = initConfig();
  producer = new KafkaProducer<>(configs);
}

2.1.1、bootstrap.servers

指定 broker 的地址清单,表示 Kafka 集群, 如果集群中有多台物理服务器 ,则服务器地址之间用逗号分。

清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要 提供两个 broker 的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。

2.1.2、key.serializer

broker 希望接收到的消息的键和值都是字节数组。 生产者接口允许使用参数化类型,因此可以把 Java 对象作为键和值发送给 broker。这样的代码具有良好的可读性,不过生 产者需要知道如何把这些 Java 对象转换成字节数组。

key.serializer 必须被设置为一 个实现了 org.apache.kafka.common.serialization.Serializer 接口的类. 生产者会使 用这个类把键对象序列化成字节数组。如果你只使用常见的几种 Java 对象类型,那么就没必要实现自己的序列化器

注意:key. serializer 是必须设置的,就算你打算只发送值内容。

2.1.3、value.serializer

key.serializer 一样,value.serializer 指定的类会将值序列化。

2.1.4、acks

acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。 这个参数对消息丢失的可能性有重要影响。该参数有如下选项。

2.1.4.1、acks=0

即无需确认,只要生产者把消息发出去即认为已成功把消息写入kafka

KafkaProducer意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入 Kafka,不需要等待任何确认收到的消息,没有任何保障可以保证此种情况下server已经成功接收到数据,同时重试配置也不会发生作用(因为KafkaProducer并不知道此次发送是否失败)

② 该情况,当数据已经发送出去,还在半路,此时leader挂了,但是producer还是认为消息发送成功了,这个时候就会导致这条消息丢失;

数据可靠性是最低的,传输效率也是最高的,配合发送方式为生成并忘记效率很高

2.1.4.2、acks=1(默认)

只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功 响应

KafkaProducer把消息发送出去,至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。该情况下,如果follower没有成功备份数据,而此时leader响应成功后,刚好又挂掉了,就会导致消息丢失

KafkaProducer把消息发送出去,如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来), 生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。

2.1.4.3、acks=all

只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自 服务器的成功响应。这种模式是最安全的 ,不过,它的 延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。

ISR

1、Partition Leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护

2、如果一个flower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除

3、当ISR中所有Replica都向Leader发送ACK时,leadercommit

acks=all,这个意思就是说,Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了。

@:如果说Partition Leader刚接收到了消息,但是结果Follower没有收到消息,此时Leader宕机了,那么客户端会感知到这个消息没发送成功,他会重试再次发送消息过去。

@:此时可能Partition 2Follower变成Leader了,此时ISR列表里只有最新的这个Follower转变成的Leader了,那么只要这个新的Leader接收消息就算成功了。如果ISR列表不仅仅只是这个新的leader,则需要ISR列表所有的follower都成功同步消息,才算发送成功

@:还有一种情况,比如3副本,producer发送消息过后,leader成功接受到消息,follower1也成功同步了leader的数据,但是follower2还未同步leader数据,而此时leader挂了,follower1变成了新的leaderproducer会重新发送消息,此时就会存在重复数据,如果是follower2变成了新的leader,就不会存在重复数据

问题:

1、ack=all 可能出现消息重复

答案:如上

2、acks=all 就代表数据一定不会丢失吗?

答案:不是

1、单机:如果partition只有一个副本,也就是只有一个leader,没有其他follower,那么虽然他已经完成接受消息,但是接受消息过后宕机了,一样会导致数据丢失。

2、集群:假设现在往 Kafka 发送消息,分区的首领刚好崩溃,新的首领 正在选举当中,Kafka 会向生产者返回“首领不可用”的响应。在这个时候,如果生产者没能正确处理这个错误,也没有重试发送消息直到发送成功,那么消息也有可能丢失(重试次数/时间设置错误)

①:所以acks=all,必须跟ISR列表离至少有2个以上的副本配合使用,也就是起码有一个leader和一个follower才行,这样才能保证发送一条消息,一定是2个以上的副本都同步到数据才算是成功,此时任何一个副本宕机,都不会导致数据的丢失

②:要提高数据的可靠性,在设置acks=-1/all的同时,也要设置min.insync.replicas(默认值是1,可以在broker或者topic层面进行设置)这个参数的配合;min.insync.replicas设置的是ISR中的最小副本数是多少,当且仅当acks=all时,这个参数才会生效。

③:重试次数和重试时间间隔要控制好。

2.1.5、buffer.memory(默认32M)

该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。

如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候, send() 方法调用要么被阻塞,要么抛出异常,取决于如何设置 max.block.ms参数,表示在抛出异常之前可以阻塞一段时间)

2.1.6、compression.type(默认none)

默认情况下,消息发送时不会被压缩。

该参数可以设置为 snappygziplz4,它指定了 消息被发送给 broker 之前使用哪一种压缩算法进行压缩。使用压缩可以降低网络传输开销和 存储开销,而这往往是向 Kafka 发送消息的瓶颈所在

1、snappy 压缩算法由 Google 发明, 它占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网 络带宽,可以使用这种算法。

2、gzip 压缩算法一般会占用较多的 CPU,但会提供更高的压缩 比,所以如果网络带宽比较有限,可以使用这种算法。

2.1.7、retries

retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会 放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过 retry.backoff.ms 参数来改变这个时间间隔。

1、建议在设置重试次数和重试时间间隔之前, 先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间), 让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。

2、不过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。

3、一般情 况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。 你只需要处理那些不可重试的错误或重试次数超出上限的情况。

2.1.8、batch.size

当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指 定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。

当批次被填满, 批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送(达到发送时间)。所以就算把批次大小设置得很大, 也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频 繁地发送消息,会增加一些额外的开销。

2.1.9、linger.ms

该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在 批次填满或 linger.ms 达到上限时把批次发送出去。

默认情况下,只要有可用的线程,生 产者就会把消息发送出去,就算批次里只有一个消息。把 linger.ms 设置成比 0 大的数, 让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。

虽然这样会增加延 迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。

2.1.10、client.id

该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。

2.1.12、max.in.flight.requests.per.connection

该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。

把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试

2.1.12.1、顺序保证

Kafka 可以保证同一个分区里的消息是有序的。也就是说,如果生产者按照 一定的顺序发送消息,broker 就会按照这个顺序把它们写入分区,消费者也 会按照同样的顺序读取它们。在某些情况下,顺序是非常重要的。例如,往 一个账户存入 100 元再取出来,这个与先取钱再存钱是截然不同的!不过, 有些场景对顺序不是很敏感。

如果把 retries 设为非零整数,同时把 max.in.flight.requests.per.connection 设为比 1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入 成功,broker 会重试写入第一个批次。如果此时第一个批次也写入成功,那 么两个批次的顺序就反过来了。

一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是 很关键的,所以不建议把 retries 设为 0。可以把 max.in.flight.requests. per.connection 设为 1,这样在生产者尝试发送第一批消息时,就不会有其 他的消息发送给 broker。不过这样会严重影响生产者的吞吐量,所以只有在 对消息的顺序有严格要求的情况下才能这么做。

2.1.13、timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms

timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么 broker 就会返回 一个错误。

request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间(如果在超时超时之前没有收到响应,客户端将在需要时重新发送请求,或者在重试耗尽时失败请求。此值应大于replica.lag.time.max.ms(broker配置),以减少由于不必要的生产者重试而导致消息重复的可能性(我的理解:虽然超时,但是有可能已经发送成功了)。)

metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器 返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。

2.1.14、max.block.ms

该参数指定了在调用 KafkaProducer.send() 方法或使用 KafkaProducer.partitionsFor()方法获取元数据时生产者的阻塞 时间。

当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在 塞时间达到 max.block.ms 时,生产者会抛出超时异常。用户提供的序列化器或分区器中的阻塞将不计入此超时时间 。

2.1.15、 max.request.size

该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。

例如,假设这个值为 1MB,那么可以发送的单个最大消息为 1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每 个消息大小为 1KB。

另外,broker 对可接收的消息最大值也有自己的限制(message.max. bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。

2.1.16、receive.buffer.bytessend.buffer.bytes

这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1, 就使用操作系统的默认值。

如果生产者或消费者与 broker 处于不同的数据中心,那么可以 适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

2.2、发送消息的方式

2.2.1、发送并忘记

我们把消息发送给服务器(我的理解是:提交给线程去处理了),但并不关心它是否正常到达,本质上是一种异步的方式,只是它不会获取消息发送的返回结果,这种方式的吞吐量是最高的,但是无法保证消息的可靠性:

大多数情况下,消息会正常到 达,因为 Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候 也会丢失一些消息。

注释1:我们使用生产者的 send() 方法发送 ProducerRecord 对象。从生产者的架构图里可以看 到,消息先是被放进缓冲区,然后使用单独的线程发送到服务器端。

注释2:send() 方法会返 回一个包含 RecordMetadataFuture 对象,不过因为我们会忽略返回值,所以无法知 道消息是否发送成功。如果不关心发送结果,那么可以使用这种发送方式。比如,记录不太重要的应用程序日志。

注释3:我们可以忽略发送消息时可能发生的错误或在服务器端可能发生的错误,但在发送消息之前,生产者还是有可能发生其他的异常。这些异常有可能是 SerializationException (说明序列化消息失败)、BufferExhaustedExceptionTimeoutException(说明缓冲区已满),又或者是 InterruptException(说明发送线程被中断)。

/**
* 1、发送并忘记
*/
@Test
public void sendAndForget(){
  String key = "Precision_Products";
  String value = "sendAndForget Msg"
  //注释1:  
  ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
  try {
    // 注释1
    producer.send(record);
  } catch (Exception e) {
    // 注释3
    log.info("消息发送失败", e);
  }
}

2.2.2、同步发送

我们使用 send()方法发送消息,它会返回一个 Future 对象,通过调用get() 方法进行等待,判断道消息是否发送成功。

注释1:在这里,producer.send() 方法先返回一个 Future 对象,然后调用 Future 对象的 get() 方法等待 Kafka 响应。如果服务器返回错误,get() 方法会抛出异常。如果没有发生错误,我们会得到一个 RecordMetadata 对象,可以用它获取消息的偏移量

注释2:如果在发送数据之前或者在发送过程中发生了任何错误,比如 broker 返回了一个不允许重发消息的异常或者已经超过了重发的次数,那么就会抛出重试异常。另一类错误无法通过重试解决,比如“消息太大”异常。对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常。我们只是简单地把 异常信息打印出来。

@Test
public void sendSynchronize(){
  String key = "Precision_Products";
  String value = "sendSynchronize Msg" ;
  ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
  try {
    // 注释1
    RecordMetadata recordMetadata = producer.send(record).get();
    log.info("消息发送成功,返回结果【{}】",recordMetadata);
  } catch (Exception e) {
    // 注释2
    log.error("消息发送失败", e);
  }
}

// 打印日志
// 2021-02-18 18:44:34 INFO  -[                                ]- 消息发送成功,返回结果【{"offset":0,"timestamp":1613645074360,"serializedKeySize":18,"serializedValueSize":25,"topicPartition":{"hash":-1133698772,"partition":0,"topic":"HLJ_TOPIC_JAVA"}}】 com.healerjean.proj.kafka.java.Producer.main[68]
// 2021-02-18 18:45:14 INFO  -[                                ]- 消息发送成功,返回结果【{"offset":1,"timestamp":1613645114891,"serializedKeySize":18,"serializedValueSize":25,"topicPartition":{"hash":-1133698772,"partition":0,"topic":"HLJ_TOPIC_JAVA"}}】 com.healerjean.proj.kafka.java.Producer.main[68]

2.2.3、异步发送

我们调用 send() 方法,并指定一个回调函数,消息以异步的方式发送,通过回调函数返回消息发送成功/失败

/**
* 3、异步发送
*/
@Test
public void sendAsync(){
  String key = "Precision_Products";
  String value = "sendAsync Msg" ;
  ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
  producer.send(record, (metadata, exception) -> {
    log.info("消息发送成功,返回结果【{}】",metadata);
    if(exception != null){
      exception.printStackTrace();
    }
  });
}

3、序列化器

不建议使用序列化器,建议使用json或其他基本数据类型

4、分区

在之前的例子里,ProducerRecord 对象包含了目标主题、键和值。,ProducerRecord 对象可以只包含目标主题和值,键可以设置为默认的 null,不过大多数应用程序会用到键。

键有两个用途

1、:可以作为消息的附加信息,

2、用来决定消息该被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。


ProducerRecord<Integer, String> record = new ProducerRecord<>("Topic", "key", "value");
ProducerRecord<Integer, String> record = new ProducerRecord<>("Topic", "value");

@:如果键值为 null,并且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用 的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。

如果键不为空,并且使用了默认的分区器,那么 Kafka 会对键进行散列(使用 Kafka 自己 的散列算法),然后根据散列值把消息映射到特定的分区上。

这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题所有的分区,而不仅仅是可用的分区。这也意味着,如果写入 数据的分区是不可用的,那么就会发生错误。但这种情况很少发生。

只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。举个例子,在 分区数量保持不变的情况下,可以保证用户 045189 的记录总是被写到分区 34。在从分区读取数据时,可以进行各种优化。不过,一旦主题增加了新的分区,这些就无法保证 了——旧数据仍然留在分区 34,但新的记录可能被写到其他分区上。

综上所述:如果要使用键来映射分区,那么最好在创建主题的时候就把分区规划好,而且永远不要增加新分区。

4.1、实现自定义分区策略

实现一个用于审计功能的分区策略:假设我们有两类消息,其中一类消息的key为audit,用于审计,放在最后一个分区中,其他消息在剩下的分区中随机分配。

我的理解:实现有些场景下消费者能力很强的情况/重点观察的消费者的消费情况

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;
import java.util.Random;


/***
 *
 * 实现一个自定义分区策略:
 * key含有audit的一部分消息发送到最后一个分区上,其他消息在其他分区随机分配
 */
public class PartitionerImpl implements Partitioner {


    private Random random;

    public void configure(Map<String, ?> configs) {
        //做必要的初始化工作
        random = new Random();
    }

    //分区策略
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        String keyObj = (String) key;
        //根据topic获取分区信息
        List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
        //获取分区数量
        int partitionCount = partitionInfoList.size();
        int auditPartition = partitionCount - 1;
        return keyObj == null || "".equals(keyObj) || !keyObj.contains("audit") ? random.nextInt(partitionCount - 1) : auditPartition;
    }

    public void close() {
        //清理工作
    }
}

ContactAuthor