Kafka可靠的数据传递
前言
Github:https://github.com/HealerJean
1、可靠性保证
在讨论可靠性时,我们一般会使用保证这个词,它是指确保系统在各种不同的环境下能够发生一致的行为。 ACID 大概是大家最熟悉的一个例子,它是关系型数据库普遍支持的标准可靠性保证。
ACID 指的是原子性、一致性、隔离性和持久性。如果一个供应商说他们的数据库遵循 ACID 规范,其实就是在说他们的数据库支持与事务相关的行为。
了解系统的保证机制对于构建可靠的应用程序来说至关重要,这也是能够在不同条 件下解释系统行为的前提。那么 Kafka 可以在哪些方面作出保证呢?
⬤ Kafka
可以保证分区消息的顺序。如果使用同一个生产者往同一个分区写入消息,而且 消息 B 在消息 A 之后写入,那么 Kafka 可以保证消息 B 的偏移量比消息 A 的偏移量大, 而且消费者会先读取消息 A 再读取消息 B。
⬤ 只有当消息被写入分区的所有同步副本时(但不一定要写入磁盘),它才被认为是“已提交”的。生产者可以选择接收不同类型的确认,比如在消息被完全提交时的确认,或者在消息被写入首领副本时的确认,或者在消息被发送到网络时的确认。
⬤ 只要还有一个副本是活跃的,那么已经提交的消息就不会丢失。
⬤ 消费者只能读取已经提交的消息。
这些基本的保证机制可以用来构建可靠的系统,但仅仅依赖它们是无法保证系统完全可靠 的。
构建一个可靠的系统需要作出一些权衡,Kafka
管理员和开发者可以在配置参数上作 出权衡,从而得到他们想要达到的可靠性。这种权衡一般是指消息存储的可靠性和一致性的重要程度与可用性、高吞吐量、低延迟和硬件成本的重要程度之间的权衡。
下面将介绍 Kafka 的复制机制,并探讨 Kafka 是如何实现可靠性的,最后介绍一些重要的配置参数。
2、复制
Kafka
的分区的多副本和复制机制架构是Kafka
可靠性保证的核心。把消息写入多个副本可以使Kafka
在发生崩溃时仍能保证消息的持久性。(我们已经在第 5 章深入解释了 Kafka 的复制机制,现在重新回顾一下主要内容。)
Kafka
的主题被分为多个分区,分区是基本的数据块。分区存储在单个磁盘上,Kafka
可以 保证分区里的事件是有序的,分区可以在线(可用),也可以离线(不可用)。每个分区可 以有多个副本,其中一个副本是首领。所有的事件都直接发送给首领副本,或者直接从首领副本读取事件。其他副本只需要与首领保持同步,并及时复制最新的事件。当首领副本 不可用时,其中一个同步副本将成为新首领。分区首领是同步副本,而对于跟随者副本来说,它需要满足以下条件才能被认为是同步的。
1、与 Zookeeper
之间有一个活跃的会话,也就是说,它在过去的 6s(可配置)内向 Zookeeper
发送过心跳。
2、在过去的 10s 内(可配置)从首领那里获取过消息。
3、在过去的 10s 内从首领那里获取过最新的消息。光从首领那里获取消息是不够的,它还必须是几乎零延迟的。
如果跟随者副本不能满足以上任何一点,比如与 Zookeeper
断开连接,或者不再获取新消 息,或者获取消息滞后了 10s 以上,那么它就被认为是不同步的。一个不同步的副本通过 与 Zookeeper
重新建立连接,并从首领那里获取最新消息,可以重新变成同步的。这个过 程在网络出现临时问题并很快得到修复的情况下会很快完成,但如果 broker
发生崩溃就需 要较长的时间。
2.1、非同步副本
如果一个或多个副本在同步和非同步状态之间快速切换,说明集群内部出现了问题,通常是 Java 不恰当的垃圾回收配置导致的。不恰当的垃圾回收配置 会造成几秒钟的停顿,从而让
broker
与Zookeeper
之间断开连接,最后变成 不同步的,进而发生状态切换。
⬤ 一个滞后的同步副本会导致生产者和消费者变慢,因为在消息被认为已提交之前,客户端会等待所有同步副本接收消息。而如果一个副本不再同步了,我们就不再关心它是否已经收到消息。
⬤ 虽然非同步副本同样滞后,但它并不会对性能产生任何影响。但是,更少的同步副本意味着更低的有效复制系数,在发生宕机时丢失数据的风险更大。我们将在下一节讲解在实际项目中这将意味着什么。
3、broker
配置
broker
有 3 个配置参数会影响Kafka
消息存储的可靠性。与其他配置参数一样,它们可以 应用在broker
级别,用于控制所有主题的行为,也可以应用在主题级别,用于控制个别主 题的行为。在主题级别控制可靠性,意味着
Kafka
集群可以同时拥有可靠的主题和非可靠的主题。例 如,在银行里,管理员可能把整个集群设置为可靠的,但把其中的一个主题设置为非可靠 的,用于保存来自客户的投诉,因为这些消息是允许丢失的。
让我们来逐个介绍这些配置参数,看看它们如何影响消息存储的可靠性,以及 Kafka
在哪 些方面作出了权衡。
3.1、复制系数(主题)
主题级别的配置参数是
replication.factor
,而在broker
级别则可以通过default.replication.factor
来配置自动创建的主题。在这本书里,我们假设主题的复制系数都是 3,也就是说每个分区总共会被 3 个不同的
broker
复制 3 次。这样的假设是合理的,因为Kafka
的默认复制系数就是 3——不过用户 可以修改它。即使是在主题创建之后,也可以通过新增或移除副本来改变复制系数。
如果复制系数为 N
,那么在 N-1
个 broker
失效的情况下,仍然能够从主题读取数据或向主 题写入数据。所以,更高的复制系数会带来更高的可用性、可靠性和更少的故障。另一方 面,复制系数 N
需要至少 N
个 broker
,而且会有 N
个数据副本,也就是说它们会占用 N
倍的磁盘空间。我们一般会在可用性和存储硬件之间作出权衡。
3.1.1、如何确定一个主题需要几个副本
那么该如何确定一个主题需要几个副本呢?这要看主题的重要程度,以及你愿意付出多少 成本来换取可用性。有时候这与你的偏执程度也有点关系。
⬤ 一个broker
:如果因 broker
重启导致的主题不可用是可接受的(这在集群里是很正常的行为),那么把复制系数设为 1
就可以了。在作出这个权衡的时候,要确保这样不会对你的组织和用户造 成影响,因为你在节省了硬件成本的同时也降低了可用性。
⬤ 2个broker
:复制系数为 2
意味着可以容忍 1
个 broker
发生失效,看起来已经足够了。 不过要记住,有时候 1 个 broker
发生失效会导致集群不稳定(通常是旧版的 Kafka),迫使你重启另一个 broker
——集群控制器。也就是 说,如果将复制系数设为 2
,就有可能因为重启等问题导致集群不可用。所以这是一个两难的选择。
基于以上几点原因,我们建议在要求可用性的场景里把复制系数设为 3
。在大多数情况下, 这已经足够安全了——不过我们也见过有些银行使用 5 个副本,以防不测。
副本的分布也很重要。默认情况下,Kafka
会确保分区的每个副本被放在不同的 broker
上。 不过,有时候这样仍然不够安全。如果这些 broker
处于同一个机架上,一旦机架的交换机发生故障,分区就会不可用,这时候把复制系数设为多少都不管用。
为了避免机架级别 的故障,我们建议把 broker
分布在多个不同的机架上,并使用 broker.rack
参数来为每个 broker
配置所在机架的名字。如果配置了机架名字,Kafka
会保证分区的副本被分布在多 个机架上,从而获得更高的可用性。我们已经在第 5 章介绍了如何在 broker
和机架上分布 副本,如果你对此感兴趣,可以参考第 5 章的内容。
3.2、不完全的首领选举(broker
级别)
unclean.leader.election
只能在broker
级别(实际上是在集群范围内)进行配置,它的默认值是true
。我们之前提到过,当分区首领不可用时,一个同步副本会被选为新首领。如果在选举过程中没有丢失数据,也就是说提交的数据同时存在于所有的同步副本上,那么这个选举就是 “完全”的。
但如果在首领不可用时其他副本都是不同步的,我们该怎么办呢? 这种情况会在以下两种场景里出现。
第一种场景:分区有 3
个副本,其中的两个跟随者副本不可用(比如有两个 broker
发生崩溃)。这个 时候,如果生产者继续往首领写入数据,所有消息都会得到确认并被提交(因为此时首领是唯一的同步副本)。现在我们假设首领也不可用了(又一个broker
发生崩溃),这 个时候,如果之前的一个跟随者重新启动,它就成为了分区的唯一不同步副本。
第二种场景:分区有 3 个副本,因为网络问题导致两个跟随者副本复制消息滞后,所以尽管它们还 复制消息,但已经不同步了。首领作为唯一的同步副本继续接收消息。这个时候,如果首领变为不可用,另外两个副本就再也无法变成同步的了。
对于这两种场景,我们要作出一个两难的选择。
①:如果不同步的副本不能被提升为新首领,那么分区在旧首领(最后一个同步副本)恢复 之前是不可用的。有时候这种状态会持续数小时(比如更换内存芯片)。
②:如果不同步的副本可以被提升为新首领,那么在这个副本变为不同步之后写入旧首领的 消息会全部丢失,导致数据不一致。为什么会这样呢?假设在副本 0 和副本 1 不可用时, 偏移量 100~200 的消息被写入副本 2(首领)。现在副本 2 变为不可用的,而副本 0 变 为可用的。副本 0 只包含偏移量 0~100 的消息,不包含偏移量 100~200 的消息。如果我 们允许副本 0 成为新首领,生产者就可以继续写入数据,消费者可以继续读取数据。于是, 新首领就有了偏移量 100~200 的新消息。这样,部分消费者会读取到偏移量 100~200 的 旧消息(从副本2读取的),部分消费者会读取到偏移量 100~200 的新消息(从副本0读取的),还有部分消费者读取的是二者 的混合。这样会导致非常不好的结果,比如生成不准确的报表。
另外,副本 2 可能会重 新变为可用,并成为新首领的跟随者。这个时候,它会把比当前首领旧的消息全部删除, 而这些消息对于所有消费者来说都是不可用的。
简而言之,如果我们允许不同步的副本成为首领,那么就要承担丢失数据和出现数据不一 致的风险。如果不允许它们成为首领,那么就要接受较低的可用性,因为我们必须等待原 先的首领恢复到可用状态。
如果把 unclean.leader.election.enable
设为 true,就是允许不同步的副本成为首领(也 就是“不完全的选举”),那么我们将面临丢失消息的风险。如果把这个参数设为 false
, 就要等待原先的首领重新上线,从而降低了可用性。
使用场景:
**1、我们经常看到一些对数据质量和数据 一致性要求较高的系统会禁用这种不完全的首领选举(把这个参数设为 false
)。银行系统 是这方面最好的例子,大部分银行系统宁愿选择在几分钟甚至几个小时内不处理信用卡支付事务,也不会冒险处理错误的消息。 **
2、不过在对可用性要求较高的系统里,比如实时点击 流分析系统,一般会启用不完全的首领选举。
3.3、最少同步副本(主题和broker
级别)
在主题级别和
broker
级别上,这个参数都叫min.insync.replicas
。当且仅当acks=all
时,这个参数才会生效。
我们知道,尽管为一个主题配置了 3 个副本,还是会出现只有一个同步副本的情况。如果 这个同步副本变为不可用,我们必须在可用性和一致性之间作出选择——这是一个两难的 选择。根据 Kafka
对可靠性保证的定义,消息只有在被写入到所有同步副本之后才被认为 是已提交的。但如果这里的“所有副本”只包含一个同步副本,那么在这个副本变为不可用时,数据就会丢失。
如果要确保已提交的数据被写入不止一个副本,就需要把最少同步副本数量设置为大一点 的值。对于一个包含 3 个副本的主题,如果 min.insync.replicas
被设为 2,那么至少要存在两个同步副本才能向分区写入数据。
如果 3
个副本都是同步的,或者其中一个副本变为不可用,不会有什么问题。不过,如果有两个副本变为不可用,那么 broker
就会停止接受生产者的请求。尝试发送数据的生产者会收到 NotEnoughReplicasException
异常。消费者仍然可以继续读取已有的数据。
实际 上,如果使用这样的配置,那么当只剩下一个同步副本时,它就变成只读了(变成了分区副本首领),这是为了避免在发生不完全选举时数据的写入和读取出现非预期的行为。为了从只读状态中恢复,必须让两个不可用分区中的一个重新变为可用的(比如重启 broker
),并等待它变为同步的。
4、在可靠的系统里使用生产者
即使我们尽可能把
broker
配置得很可靠,但如果没有对生产者进行可靠性方面的配置,整 个系统仍然有可能出现突发性的数据丢失。
请看以下两个例子。
1、为 broker
配置了 3 个副本,并且禁用了不完全首领选举,这样应该可以保证万无一失。 我们把生产者发送消息的 acks
设为 1(只要首领接收到消息就可以认为消息写入成功)。 生产者发送一个消息给首领,首领成功写入,但跟随者副本还没有接收到这个消息。首 领向生产者发送了一个响应,告诉它“消息写入成功”,然后它崩溃了,而此时消息还 没有被其他副本复制过去。另外两个副本此时仍然被认为是同步的(毕竟判定一个副本 不同步需要一小段时间),而且其中的一个副本成了新的首领。因为消息还没有被写入 这个副本,所以就丢失了,但发送消息的客户端却认为消息已成功写入。因为消费者看 不到丢失的消息,所以此时的系统仍然是一致的(因为副本没有收到这个消息,所以消 息不算已提交),但从生产者角度来看,它丢失了一个消息。
2、为 broker
配置了 3 个副本,并且禁用了不完全首领选举。我们接受了之前的教训,把 生产者的 acks
设为 all
。假设现在往 Kafka 发送消息,分区的首领刚好崩溃,新的首领 正在选举当中,Kafka
会向生产者返回“首领不可用”的响应。
在这个时候,如果生产 者没能正确处理这个错误,也没有重试发送消息直到发送成功,那么消息也有可能丢失。 这算不上是 broker
的可靠性问题,因为 broker
并没有收到这个消息。这也不是一致性 问题,因为消费者并没有读到这个消息。问题在于如果生产者没能正确处理这些错误, 弄丢消息的是它们自己。
那么,我们该如何避免这些悲剧性的后果呢?从上面两个例子可以看出,每个使用 Kafka
的开发人员都要注意两件事情。
1、根据可靠性需求配置恰当的 acks
值。
2、在参数配置和代码里正确处理错误。
4.1、发送确认
详细可以看
kafka
生产者那篇文章有详细介绍
4.1.1、acks=0
即无需确认,只要生产者把消息发出去即认为已成功把消息写入kafka
① KafkaProducer
意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入 Kafka
,不需要等待任何确认收到的消息,没有任何保障可以保证此种情况下server
已经成功接收到数据,同时重试配置也不会发生作用(因为KafkaProducer
并不知道此次发送是否失败)。
② 该情况,当数据已经发送出去,还在半路,此时leader
挂了,但是producer
还是认为消息发送成功了,这个时候就会导致这条消息丢失;
③ 数据可靠性是最低的,传输效率也是最高的,配合发送方式为生成并忘记效率很高
4.1.2、acks=1
(默认)
只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功 响应
① KafkaProducer
把消息发送出去,至少要等待leader
已经成功将数据写入本地log,但是并没有等待所有follower
是否成功写入。该情况下,如果follower
没有成功备份数据,而此时leader
响应成功后,刚好又挂掉了,就会导致消息丢失
② KafkaProducer
把消息发送出去,如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来), 生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。
4.1.3、acks=all
只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自 服务器的成功响应。这种模式是最安全的 ,不过,它的 延迟比
acks=1
时更高,因为我们要等待不只一个服务器节点接收消息。
4.2、配置生产者的重试参数
生产者需要处理的错误包括两部分: 一部分是生产者可以自动处理的错误,还有一部分是需要开发者手动处理的错误。
如果
broker
返回的错误可以通过重试来解决,那么生产者会自动处理这些错误。生产者向broker
发送消息时,broker
可以返回一个成功响应码或者一个错误响应码。错误响应码可以 分为两种,一种是在重试之后可以解决的,还有一种是无法通过重试解决的。
1、如果
broker
返回的是LEADER_NOT_AVAILABLE
错误,生产者可以尝试重新发送消息。也许在这个时 候一个新的首领被选举出来了,那么这次发送就会成功。也就是说,LEADER_NOT_AVAILABLE
是一个可重试错误。2、另一方面,如果
broker
返回的是INVALID_CONFIG
错误,即使通过重试 也无法改变配置选项,所以这样的重试是没有意义的。这种错误是不可重试错误。
问题1:一般情况下,如果你的目标是不丢失任何消息,那么最好让生产者在遇到可重试错误时 能够保持重试。为什么要这样?
因为像首领选举或网络连接这类问题都可以在几秒钟之 内得到解决,如果让生产者保持重试,你就不需要额外去处理这些问题了。
问题2:为生产者配置多少重试次数比较好?”
这个要看你在生产者放弃重试并抛出异常之 后想做些什么。
1、如果你想直接丢弃消息(不影响业务的场景),多次重试造成的延迟已经失去发送消息的意义;
2、如果你想抓住异常并再多重试几次,那么就可以把重试次数设置得多一 点,让生产者继续重试
3、重试一直失败,此时,如果你想把消息保存到某个地方然后回过头来再继续处理,那就可以停止重试。
问题3:重试可能会导致消息重复,重复了之后怎么办
为什么会重复:生产者因为网络问题没有收到 broker
的确认,但实际上消息已经写入 成功,生产者会认为网络出现了临时故障,就重试发送该消息(因为它不知道消息已经写 入成功)。在这种情况下,broker
会收到两个相同的消息。重试和恰当的错误处理可以保证每个消息“至少被保存一次”
重复了之后怎么办:现实中的很多应用程序在消息里加入唯一标识符,用于检测重复消息,消费者在读取消息时可以对它们进行清理。还要一些应用程序可以做到消息的“幂等”,也就 是说,即使出现了重复消息,也不会对处理结果的正确性造成负面影响。例如,消息“这个账号里有 110 美元”就是幂等的,因为即使多次发送这样的消息,产生的结果都是一样 的。不过消息“往这个账号里增加 10 美元”就不是幂等的。
4.3、开发者手动处理的额外错误
使用生产者内置的重试机制可以在不造成消息丢失的情况下轻松地处理大部分错误,不过对于开发人员来说,仍然需要处理其他类型的错误,包括:
1、不可重试的 broker
错误,例如消息大小错误、认证错误等;
2、在消息发送之前发生的错误,例如序列化错误;
3、 在生产者达到重试次数上限时或者在消息占用的内存达到上限时发生的错误。
5、在可靠的系统里使用消费者
只有那些被提交到
Kafka
的数据,也就是那些已经被写入所有同步副本的数据,对消费者是可用的,这意味着消费者得到的消息已经具备了一致性。 消费者唯一要做的是跟踪哪些消息是已经读取过的,哪些是还没有读取过的。
这是在读取消息时不丢失消息的关键。在从分区读取数据时,消费者会获取一批事件,检查这批事件里最大的偏移量,然后从这个偏移量开始读取另外一批事件。这样可以保证消费者总能以正确的顺序获取新数据,不 会错过任何事件。
如果一个消费者退出,另一个消费者需要知道从什么地方开始继续处理,它需要知道前一个 消费者在退出前处理的最后一个偏移量是多少。所谓的“另一个”消费者,也可能就是它自己重启之后重新回来工作。这也就是为什么消费者要“提交”它们的偏移量。它们把当前读取的偏移量保存起来,在退出之后,同一个群组里的其他消费者就可以接手它们的工作。
如果消费者提交了偏移量却未能处理完消息,那么就有可能造成消息丢失,这也是消费者丢失消息的主要原因。在这种情况下,如果其他消费者接手了工作,那些没有被处理完的消息就 会被忽略,永远得不到处理。这就是为什么我们非常重视偏移量提交的时间点和提交的方式。
5.1、消费者的可靠性配置
为了保证消费者行为的可靠性,需要注意以下 4 个非常重要的配置参数。
5.1.1、 group.id
这个参数在第 4 章已经详细解释过了,如果两个消费者具有相同的
group.id
,并且订阅了同一个主题,那么每个消费者会分到主题分区的一个子集,也就是 说它们只能读到所有消息的一个子集(不过群组会读取主题所有的消息)。如果你希望消费者可以看到主题的所有消息,那么需要为它们设置唯一的
group.id
。
5.1.2、 auto.offset.reset
该属性指定了消费者在读取一个没有偏移量的分区((比如消费者第 1 次 启动时))或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。
这个参数有3种配置。
一种是 earliest
,如果选择了这种配置,消费者会 从分区的开始位置读取数据,不管偏移量是否有效,这样会导致消费者读取大量的重复数 据,但可以保证最少的数据丢失。
一种是 latest
,如果选择了这种配置,消费者会从分区 的末尾开始读取数据,这样可以减少重复处理消息,但很有可能会错过一些消息。
一种是 none
:启动消费者时,该消费者所消费的主题的分区没有被消费过,就会抛异常
5.1.3、enable.auto.commit
这是一个非常重要的配置参数,你可以让消费者基于任务 调度自动提交偏移量,也可以在代码里手动提交偏移量。自动提交的一个最大好处是,在 实现消费者逻辑时可以少考虑一些问题。
优点:如果你在消费者轮询操作里处理所有的数据,那么自动提交可以保证只提交已经处理过的偏移量(如果忘了消费者轮询是什么,请回顾一 下第 4 章的内容)。
**缺点,1、无法控制重复处理消息:假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再 均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后 了 3s,所以在这 3s 内到达的消息会被重复处理 **
2、消息可能丢失(个人原因):比如消费者在自动提交偏移量之前停止处理消息,再或者如果把消息交给另外一个后台线程去处理,自动提交机制可能会在消息还没有处理完毕就提交偏移量。专业会导致消息丢失
5.1.3、auto.commit.interval.ms
auto.commit.interval.ms
与第 3 个参数有直接的联系。如果选择了自动提交偏移量,可以通过该参数配置提交的频度,默认值是每 5 秒钟提交一次。一般来说,频繁提交会增加额外的开销,但也会降低重复处理消息的概率。
5.2、显式提交偏移量
如果选择了自动提交偏移量,就不需要关心显式提交的问题。不过如果希望能够更多地控 制偏移量提交的时间点,那么就要仔细想想该如何提交偏移量了——要么是为了减少重复 处理消息,要么是因为把消息处理逻辑放在了轮询之外。
3.5.1、确保对提交的偏移量心里有数
在轮询过程中提交偏移量有一个不好的地方,就是提交的偏移量有可能是读取到的最新偏移量,而不是处理过的最新偏移量(消息还没处理就提交)。要记住,在处理完消息后再提交偏移量是非常关键 的——否则会导致消费者错过消息。我们已经在第 4 章给出了示例。
5.2.2、总是在处理完事件后再提交偏移量(自动/手动)
如果所有的处理都是在轮询里完成,并且不需要在轮询之间维护状态(比如为了实现聚合操作),那么可以使用自动提交,或者在轮询结束时进行手动提交。
5.2.3、提交评率是性能和重复消息数量之间的权衡
即使是在最简单的场景里,比如所有的处理都在轮询里完成,并且不需要在轮询之间维护状态,
1、你仍然可以在一个循环里多次提交偏移量(甚至可以在每处理完一个事件之后),
2、或者多个循环里只提交一次(与生产者的
acks=all
配置有点类似),这完全取决于你在性 能和重复处理消息之间作出的权衡。
3.5.4、再均衡
在设计应用程序时要注意处理消费者的再均衡问题。我们在第 4 章举了几个例子,一般要在分区被撤销之前提交偏移量,并在分配到新分区时清理之前的状态。
3.5.5、消费者可能需要重试
有时候,在进行轮询之后,有些消息不会被完全处理,你想稍后再来处理。例如,假设 要把
Kafka
的数据写到数据库里,不过那个时候数据库不可用。如果消息重要,那就不断重试,既然数据库不可用,后面的消息也是一样的问题。
3.5.6、消费者可能需要维护状态
TODOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO
有时候你希望在多个轮询之间维护状态, 例如,你想计算消息的移动平均数,希望在首次 轮询之后计算平均数,然后在后续的轮询中更新这个结果。
如果进程重启,你不仅需要从 上一个偏移量开始处理数据,还要恢复移动平均数。有一种办法是在提交偏移量的同时把 最近计算的平均数写到一个“结果”主题上。消费者线程在重新启动之后,它就可以拿到 最近的平均数并接着计算。不过这并不能完全地解决问题,因为 Kafka 并没有提供事务支 持。消费者有可能在写入平均数之后来不及提交偏移量就崩溃了,或者反过来也一样。这 是一个很复杂的问题,你不应该尝试自己去解决这个问题,建议尝试一下 KafkaStreams 这 个类库,它为聚合、连接、时间窗和其他复杂的分析提供了高级的 DSL API。
3.5.7、长时间处理消息,记得发送心跳
有时候处理数据需要很长时间:你可能会从发生阻塞的外部系统获取信息,或者把数据 写到外部系统,或者进行一个非常复杂的计算。要记住,暂停轮询的时间不能超过几秒 钟。即使不想获取更多的数据,也要保持轮询,这样客户端才能往 broker 发送心跳。
在这种情况下,一种常见的做法是使用一个线程池来处理数据,因为使用多个线程可以进 行并行处理,从而加快处理速度。在把数据移交给线程池去处理之后,你就可以暂停消费者,然后保持轮询,但不获取新数据,直到工作线程处理完成。在工作线程处理完成 之后,可以让消费者继续获取新数据。因为消费者一直保持轮询,心跳会正常发送,就 不会发生再均衡。
3.5.8、仅一次传递(消息幂等/业务幂等)
有些应用程序不仅仅需要“至少一次”(at-least-once)语义(意味着没有数据丢失),还 需要“仅一次”(exactly-once)语义。尽管
Kafka
现在还不能完全支持仅一次语义,消费者还是有一些办法可以保证Kafka
里的每个消息只被写到外部系统一次(但不会处理向Kafka
写入数据时可能出现的重复数据)。
实现仅一次处理最简单且最常用的办法是把结果写到一个支持唯一键的系统里,比如键值 存储引擎、关系型数据库、·ElasticSearch
或其他数据存储引擎。
在这种情况下,要么消息 本身包含一个唯一键(通常都是这样),要么使用主题、分区和偏移量的组合来创建唯一 键——它们的组合可以唯一标识一个 Kafka
记录。(记得设置一个超时时间,比如在15秒内保证幂等)
如果你把消息和一个唯一键写入系统, 然后碰巧又读到一个相同的消息,只要把原先的键值覆盖掉即可。数据存储引擎会覆盖已 经存在的键值对,就像没有出现过重复数据一样。这个模式被叫作幂等性写入,它是一种 很常见也很有用的模式。