前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

1、kafka简介

1.1、基本概念

1.1.1、消息和批次

**消息:Kafka 的数据单元被称为消息。如果你在使用 Kafka 之前已经有数据库使用经验,那么可 以把消息看成是数据库里的一个“数据行”或一条“记录”。 **

当消息以一种可控的方式写入不同的分区时,会用到键。最简单的例子就是为键生成一个一致性散列值,然后使用散列值对主题分区数进行取模,为消息选取分区。这样可以保证具有相同键的消息总是被写到相同的分区上。

批次:为了提高效率,消息被分批次写入 Kafka。批次就是一组消息,这些消息属于同一个主题和分区如果每一个消息都单独穿行于网络,会导致大量的网络开销,把消息分成批次传 输可以减少网络开销**。

不过,这要在时间延迟和吞吐量之间作出权衡: 批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。批次数据会被压缩,这样可以提升 数据的传输和存储能力,但要做更多的计算处理。

1.1.2、主题topic和分区Partition

Kafka 的消息通过主题进行分类。主题就好比数据库的表,或者文件系统里的文件夹。

主题可以被分为若干个分区,一个分区就是一个提交日志。消息以追加的方式写入分区,然后以先入先出的顺序读取。

要注意,由于一个主题一般包含几个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序

下图主题有 4 个分区,消息被追加写入每个分区的尾部。Kafka 通过分区来实现数据冗余和伸缩性。分区可以分布在不同的服务器上,也就是说,一个主题可以横跨多个服务器,以此来提供比 单个服务器更强大的性能。

image-20210218115810723

1.1.2.1、Topic

Topic:发布到Kafka 的每条消息都有一个类别, 这个类别就被称为 Topic

物理上: 不同 Topic 的消息分开存储;

逻辑上,虽然一个 Topic 的消息被保存在一个或多个 Broker上 , 但用户只需指定消息的 Topic 即可生产或消费数据 , 而不必关心数据存于何处

1.1.2.1、 Partition

@:PartitionKafka 中比较特色的部分,一个 Topic 可以分为多个 Partition,每个 Partition 是一个有序的队列,Partition 中的每条消息都存在一个有序的偏移量(Offest) ,同一个 Consumer Group 中,只有一个 Consumer 实例可消费某个 Partition 的消息。

@:partion 可以看作一个有序的队列,里面的数据是储存在硬盘中的,追加式的partition的作用就是提供分布式的扩展,一个topic可以有许多partions,多个partition可以并行处理数据,所以可以处理相当量的数据。

1.1.3、生产者和消费者

Kafka 的客户端就是 Kafka 系统的用户,它们被分为两种基本类型:生产者和消费者

1.1.3.1、Producer

Producer:消息和数据的生产者,可 以理解为向 Kafka发消息的客户端。

生产者创建消息。一般情况下,一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区

不过,在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区

1.1.3.2、Consumer

Consumer:消息和数据的消费者 ,可以理解为从Kafka 取消息的 客户端。

消费者读取消息。订阅一个或多个主题,并按照消息生成的顺序读取它们(指的是同一个分区)消费者通过检查消息的偏移量来区分已经读取过的消息

消息的偏移量是另一种元数据,它是一个不断递增的整数值,在创建消息时,Kafka 会把它添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在 Zookeeper Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失。

消费者是消费者群组的一部分,也就是说,会有一个或多个消费者共同读取一个主题。群组保证每个分区只能被一个消费者使用

下图所示的群组中,有 3 个消费者同时读取一 个主题。其中的两个消费者各自读取一个分区,另外一个消费者读取其他两个分区。消费 者与分区之间的映射通常被称为消费者对分区的所有权关系

通过这种方式,消费者可以消费包含大量消息的主题。而且,如果一个消费者失效,群组里的其他消费者可以接管失效消费者的工作。

image-20210218141416131

1.1.3.3、Consumer Group

Consumer Group (消费者组〉:每个消费者都属于一个特定的消费者组(可为每个消费者指定组名, 若不指定组名,则属于默认的组) 这是 Kafka 用来实现一个 Topic 消息 的广播(发送给所有的消费者〉 和单播(发送给任意一个消费者) 的手段。

一个 Topic 可以有多个消费者组。 Topic 的消息会被复制(不是真的复制, 是概念上的)到所有的 消费者组中,但每个消费者组只会把消息发送给该组中的 一个消费者。

如果要实现广 播, 只要每个消费者都有一个独立的消费者组就可以了;

如果要实现单播,只要所有的消费者都在同 一 个消费者组中就行。

使用消费者组还可以对消费者进行自由分组, 而不需要多次发送消息到不同的 Topic。

1.1.4、Broker和集群

@:每个 Broker 即一个 Kafka 服务实例,多个 Broker 构成一个 Kafka 集群,生产者发布的消息将保存在 Broker 中,消费者将从 Broker 中拉取消息进行消费。

@:Broker集群中,会有一个leadercontroller leader),负责管理整个集群中分区和副本的状态和选举partition leader

一个独立的 Kafka 服务器被称为 brokerbroker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。根据特定的硬件及其性能特征,单个 broker 可以轻松处理数千个分区以及每秒百万级的消息量。

broker 是集群的组成部分。每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。控制器负责管理工作,包括将分区分配给 broker 和监控 broker,以及分区领导的选举。

在集群中,一个分区从属于一个 broker,该 broker 被称为分区的首领leader。一个分区可以分配给多个 broker,这个时候会发生分区复制(见下图)。这种复制机制为分区提供 了消息冗余,如果有一个 broker 失效,其他 broker 可以接管领导权。不过,相关的消费者和生产者都要重新连接到新的首领。

只有partitionleader才会进行读写操作,folower仅进行复制,客户端是感知不到的。

image-20210218144004099

1.1.4,1、保留消息

保留消息(在一定期限内)是 Kafka 的一个重要特性。Kafka broker 默认的消息保留策略 是这样的:要么保留一段时间(比如 7 天),要么保留到消息达到一定大小的字节数(比 如 1GB)。当消息数量达到这些上限时,旧消息就会过期并被删除,所以在任何时刻,可用消息的总量都不会超过配置参数所指定的大小。

主题可以配置自己的保留策略,可以将消息保留到不再使用它们为止

例如,用于跟踪用户活动的数据可能需要保留几天,而应 用程序的度量指标可能只需要保留几个小时。 也可以通过配置把主题当作紧凑型日志,只有最后一个带有特定键的消息会被保留下来。这种情况对于变更日志类型的数据来说比较适 用,因为人们只关心最后时刻发生的那个变更

1.2、典型的kafka集群

一个典型的 Kafka 集群中包含

1、若干生产者:可以是前端浏览器发起的页面访问或服务器日志等、

2、若干 Broker Kafka支持水平扩展, 一般 Broker数量越多集群吞吐率越大 、

3、若干消费者组

4、以及一个 ZooKeeper集群。Kafka通过 ZooKeeper管理集群配置、选举 leader, 以及当消费者组发生变化时进行 Rebalance (再均衡)。生产者使用推模式将消息发布到 Broker, 消费者使用拉模式从 Broker订阅并消费消息。

image-20210131171058822

2、broke的配置

2.1、常规配置

2.1.1、broker.id

每个 broker 都需要有一个标识符,使用 broker.id 来表示。它的默认值是 0,也可以被设置 成其他任意整数。这个值在整个 Kafka 集群里必须是唯一的。

这个值可以任意选定,如果出于维护的需要,建议把它们设置成与机器名具有相关性的整数,这样在进行维护时,将 ID 号映射到机器名就没那么麻烦了。例如,如果机器名包含唯一性的数字(比如 host1.example.com、host2.example.com),那么用这些数字 来设置 broker.id 就再好不过了。

2.1.2、port

如果使用配置样本来启动 Kafka,它会监听 9092 端口。修改 port 配置参数可以把它设置 成其他任意可用的端口。

2.1.3、zookeeper.connect

zookeeper.connect=localhost:2181

用于保存 broker 元 数 据 的 Zookeeper 地 址 是 通 过 zookeeper.connect 来 指 定 的。 localhost:2181 表示这个 Zookeeper 是运行在本地的 2181 端口上。该配置参数是用冒号分 隔的一组 hostname:port/path 列表,每一部分的含义如下:

@:hostnameZookeeper 服务器的机器名或 IP 地址;

@:port Zookeeper 的客户端连接端口;

@:/path 是可选的 Zookeeper 路径,作为 Kafka 集群的 chroot 环境。如果不指定,默认使用根路径。如果指定的 chroot 路径不存在,broker 会在启动的时候创建它。

最好是 在配置文件里指定一组 Zookeeper 服务器,用分号把它们隔开。一旦有一个 Zookeeper 服务器宕机,broker 可以连接到 Zookeeper 群组的另一个节点上。

为什么使用 chroot 路径?

Zookeeper 群组可以共享给其他应用程序,默认Kafka会使用ZooKeeper默认的/路径,这样有关KafkaZooKeeper配置就会散落在根路径下面,如果你有其他的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,所以强烈建议指定一个chroot路径,直接在zookeeper.connect配置项中指定:

zookeeper.connect=h1:2181,h2:2181,h3:2181/brokers

2.1.4、log.dirs

log.dirs=/tmp/kafka-logs

Kafka 把所有消息都保存在磁盘上,存放这些日志片段的目录是通过 log.dirs 指定的。它是 一组用逗号分隔的本地文件系统路径。如果指定了多个路径,那么 broker 会根据“最少使用”原则,把同一个分区的日志片段保存到同一个路径下

要注意,broker 会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。

2.1.5、 num.recovery.threads.per.data.dir

num.recovery.threads.per.data.dir=1

对于如下 3 种情况,Kafka 会使用可配置的线程池来处理日志片段:

@: 服务器正常启动,用于打开每个分区的日志片段;

@: 服务器崩溃后重启,用于检查和截短每个分区的日志片段;

@: 服务器正常关闭,用于关闭日志片段。

默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到并行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩溃,在进行恢复时使用并行操作可能会省下数小时的时间。

设置 此参数时需要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,

如 果 num.recovery.threads.per.data.dir 被设为 8,并且 log.dir 指定了 3 个路径,那么总 共需要 24 个线程。

2.1.6、auto.create.topics.enable

默认情况下,Kafka 会在如下几种情形下自动创建主题:

@: 当一个生产者开始往主题写入消息时;

@: 当一个消费者开始从主题读取消息时;

@: 当任意一个客户端向主题发送元数据请求时。

很多时候,这些行为都是非预期的,如果一个主题不先被创建, 根本无法知道它是否已经存在,一般建议设置为为flase。

2.2、主题的默认配置

2.2.1、num.partitions

num.partitions 参数指定了新创建的主题将包含多少个分区。如果启用了主题自动创建功能(auto.create.topics.enable该功能默认是启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。

要注意,我们可以增加主题分区的个数,但不能减少分区的个数。所以,如果要让一个主题的分区个数少于 num.partitions 指定的值,需要手动创建该主题

2.2.1.1、为什么不支持减少分区?

Kafka 分区数据不支持减少是由很多原因的,比如减少的分区其数据放到哪里去?是删除,还是保留?删除的话,那么这些没消费的消息不就丢了。如果保留这些消息如何放到其他分区里面?追加到其他分区后面的话那么就破坏了 Kafka 单个分区的有序性。如果要保证删除分区数据插入到其他分区保证有序性,那么实现起来逻辑就会非常复杂。

2.2.1.2、为什么分区不能太多?

1、客户端/服务器端需要使用的内存就越多

Kafka0.8.2之后,在客户端producer有个参数batch.size(批次大小),默认是16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。 假设你有10000个分区,按照默认设置,这部分缓存需要占用约157MB的内存。

consumer端呢?我们抛开获取数据所需的内存不说,只说线程的开销。如果还是假设有10000个分区,同时consumer线程数要匹配分区数(大部分情况下是最佳的消费吞吐量配置)的话,那么在consumer client就要创建10000个线程,也需要创建大约10000Socket去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。

服务器端的开销也不小,如果阅读Kafka源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controllerFetcherManager等,因此分区数越多,这种缓存的成本就越大。

2、降低高可用性

Kafka通过副本(replica)机制来保证高可用。具体做法就是为每个分区保存若干个副本。每个副本保存在不同的broker上。期中的一个副本充当leader 副本,负责处理producerconsumer请求。其他副本充当follower角色

Kafka controller负责保证与leader的同步。如果leader所在的broker挂掉了,contorller会检测到然后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区,10broker,也就是说平均每个broker上有1000个分区。此时这个broker挂掉了,那么zookeepercontroller需要立即对这1000个分区进行leader选举。比起很少的分区leader选举而言,这必然要花更长的时间,并且通常不是线性累加的。如果这个broker还同时是controller情况就更糟了。

3、文件句柄的开销

每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄(file handler)。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。

2.2.1.3、如何选定分区数量?

为主题选定分区数量并不是一件可有可无的事情,在进行数量选择时,需要 考虑如下几个因素。

@:主题需要达到多大的吞吐量?例如,是希望每秒钟写入100KB还是1GB?

@:从单个分区读取数据的最大吞吐量是多少?每个分区一般都会有一个消费者,如果你知道消费者将数据写入数据库的速度不会超过每秒 50MB,那么你也该知道,从一个分区读取数据的吞吐量不需要超过每秒 50MB。

@:可以通过类似的方法估算生产者向单个分区写入数据的吞吐量,不过生产者的速度一般比消费者快得多,所以最好为生产者多估算一些吞吐量。

很显然,综合考虑以上几个因素,你需要很多分区,但不能太多。

比如:首先如果估算出主题的吞吐量和消费者吞吐量,可以用主题吞吐量除以消费者吞吐量算出分区的个数。 也就是说, 如果每秒钟要从主题上写入和读取 1GB 的数据,并且每个消费者每秒钟可以处理 50MB 的数据,那么至少需要 20 个分区。这样就可以让 20 个消费者同时读取这些分区,从而达到每秒钟 1GB 的吞吐量。

2.2.1.4、Consumer个数与分区数有什么关系

topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费。如果你的分区数是N,那么最好线程数也保持为N,这样通常能够达到最大的吞吐量。超过N的配置只是浪费系统资源,因为多出的线程不会被分配到任何分区,少于N的配置则是分区设置不合理。

2.2.2、 log.retention.ms

Kafka 通常根据时间来决定数据可以被保留多久。默认使用 log.retention.hours 参数来配 置时间,默认值为 168 小时,也就是一周。除此以外,还有其他两个参数 log.retention. minutes log.retention.ms。这 3 个参数的作用是一样的,都是决定消息多久以后会被删 除,不过还是推荐使用 log.retention.ms。如果指定了不止一个参数,Kafka 会优先使用具有最小值的那个参数

根据时间保留数据和最后修改时间:

根据时间保留数据是通过检查磁盘上日志片段文件的最后修改时间来实现 的。一般来说,最后修改时间指的就是日志片段的关闭时间,也就是文件里 最后一个消息的时间戳。不过,如果使用管理工具在服务器间移动分区,最后修改时间就不准确了。时间误差可能导致这些分区过多地保留数据。在第 9 章讨论分区移动时会提到更多这方面的内容。

2.2.3、 log.retention.bytes

另一种方式是通过保留的消息字节数来判断消息是否过期。它的值通过参数 log. retention.bytes 来指定,作用在每一个分区上。

也就是说,如果有一个包含 8 个分区的主 题,并且 log.retention.bytes 被设为 1GB,那么这个主题最多可以保留 8GB 的数据。所以,当主题的分区个数增加时,整个主题可以保留的数据也随之增加。

2.2.4、log.segment.bytes

以上的设置都作用在日志片段上,而不是作用在单个消息上。当消息到达 broker 时,它们被追加到分区的当前日志片段上。当日志片段大小达到 log.segment.bytes 指定的上限 (默认是 1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。

如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就会越频繁地关闭和分配新文件,从而降低磁盘写入的整体效率。 如果主题的消息量不大,那么如何调整这个参数的大小就变得尤为重要。

例如,如果一个主题每天只接收 100MB 的消息,而 log.segment.bytes 使用默认设置,那么需要 10 天时间才能填满一个日志片段。因为在日志片段被关闭之前消息是不会过期的,所以如果 log. retention.ms 被设为 604 800 000(也就是 1 周),那么日志片段最多需要 17 天才会过期。 这是因为关闭日志片段需要 10 天的时间,而根据配置的过期时间,还需要再保留7天时间(要等到日志片段里的最后一个消息过期才能被删除)。

使用时间戳获取偏移量

日志片段的大小会影响使用时间戳获取偏移量。在使用时间戳获取日志偏移量时,Kafka 会检查分区里最后修改时间大于指定时间戳的日志片段(已经被关闭的),该日志片段的前一个文件的最后修改时间小于指定时间戳。然后,Kafka 返回该日志片段(也就是文件名)开头的偏移量。对于使用时间戳获取偏移量的操作来说,日志片段越小,结果越准确。

2.2.5、log.segment.ms

另一个可以控制日志片段关闭时间的参数是 log.segment.ms,它指定了多长时间之后日志片段会被关闭。就像 log.retention.byteslog.retention.ms 这两个参数一样,log. segment.byteslog.retention.ms 这两个参数之间也不存在互斥问题。日志片段会在大小或时间达到上限时被关闭,就看哪个条件先得到满足。默认情况下,log.segment.ms 没 有设定值,所以只根据大小来关闭日志片段。

基于时间的日志片段对磁盘性能的影响

在使用基于时间的日志片段时,要着重考虑并行关闭多个日志片段对磁盘性能的影响。如果多个分区的日志片段永远不能达到大小的上限,就会发生这 种情况,因为 broker 在启动之后就开始计算日志片段的过期时间,对于那些数据量小的分区来说,日志片段的关闭操作总是同时发生。

2.2.6、message.max.bytes

broker 通过设置 message.max.bytes 参数来限制单个消息的大小,默认值是 1 000 000,也 就是 1MB。如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到 broker 返回的错误信息。跟其他与字节相关的配置参数一样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于 message.max.bytes 指定的值,消息的实际大小 可以远大于这个值。

这个值对性能有显著的影响。值越大,那么负责处理网络连接和请求的线程就需要花越多 的时间来处理这些请求。它还会增加磁盘写入块的大小,从而影响 IO 吞吐量。

问题:

1、kafka主题消息为什么不能保证消息的有序性

由于一个主题一般包含几个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序

2、分区的好处

1、一个topic主题有多个分区,分区可以在不同的机器上,Kafka 通过分区来实现数据冗余和伸缩性。分区可以分布在不同的服务器上,也就是说,一个主题可以横跨多个服务器,以此来提供比 单个服务器更强大的性能。

2、多个 partition分区可以并行处理数据,所以可以处理相当量的数据

3、可以通过消息键值和分区器实现消息的有序,这样可以指定消息再同一个分区中,将来被同一个消息消费

3、为什么选择kafka

1、多个生产者

2、多个消费者:Kafka 支持多个消费者从一个单独的消息流上读取数据,而且消费者之间互不影响。这与其他队列系统不同,其他队列系统的消息一旦被一个客户端读 取,其他客户端就无法再读取它。另外,多个消费者可以组成一个群组,它们共享一个消息流,但是能够保证整个群组对每个给定的消息只处理一次

3、基于磁盘存储,持久化:kafka允许消费者非实时地读取消息,这要归功于 Kafka 的数据 保留特性。消息被提交到磁盘,根据设置的保留规则进行保存。每个主题可以设置单独的保留规则,以便满足不同消费者的需求,各个主题可以保留不同数量的消息。消费者可能 会因为处理速度慢或突发的流量高峰导致无法及时读取消息,而持久化数据可以保证数据不会丢失。消费者可以在进行应用程序维护时离线一小段时间,而无需担心消息丢失或堵 塞在生产者端。消费者可以被关闭,但消息会继续保留在 Kafka 里。消费者可以从上次中 断的地方继续处理消息。

4、伸缩性:broker集群扩展十分方便

疑问:

1、下面这句话是什么意思

使用消费者组还可以对消费者进行自由分组, 而不需要多次发送消息到不同的 Topic。

2、zookeeper 的作用是什么

Kafka 使用 Zookeeper 保存集群的元数据信息和消费者信息

image-20210218114111853

ContactAuthor