Kafka第一次
前言
Github:https://github.com/HealerJean
1、kafka
简介
1.1、基本概念
1.1.1、消息和批次
**消息:
Kafka
的数据单元被称为消息。如果你在使用Kafka
之前已经有数据库使用经验,那么可 以把消息看成是数据库里的一个“数据行”或一条“记录”。 **当消息以一种可控的方式写入不同的分区时,会用到键。最简单的例子就是为键生成一个一致性散列值,然后使用散列值对主题分区数进行取模,为消息选取分区。这样可以保证具有相同键的消息总是被写到相同的分区上。
批次:为了提高效率,消息被分批次写入
Kafka
。批次就是一组消息,这些消息属于同一个主题和分区。如果每一个消息都单独穿行于网络,会导致大量的网络开销,把消息分成批次传 输可以减少网络开销**。不过,这要在时间延迟和吞吐量之间作出权衡: 批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。批次数据会被压缩,这样可以提升 数据的传输和存储能力,但要做更多的计算处理。
1.1.2、主题topic
和分区Partition
Kafka
的消息通过主题进行分类。主题就好比数据库的表,或者文件系统里的文件夹。主题可以被分为若干个分区,一个分区就是一个提交日志。消息以追加的方式写入分区,然后以先入先出的顺序读取。
要注意,由于一个主题一般包含几个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。
下图主题有 4 个分区,消息被追加写入每个分区的尾部。
Kafka
通过分区来实现数据冗余和伸缩性。分区可以分布在不同的服务器上,也就是说,一个主题可以横跨多个服务器,以此来提供比 单个服务器更强大的性能。
1.1.2.1、Topic
Topic
:发布到Kafka
的每条消息都有一个类别, 这个类别就被称为Topic
物理上: 不同
Topic
的消息分开存储;逻辑上,虽然一个
Topic
的消息被保存在一个或多个Broker
上 , 但用户只需指定消息的Topic
即可生产或消费数据 , 而不必关心数据存于何处
1.1.2.1、 Partition
@:
Partition
是Kafka
中比较特色的部分,一个Topic
可以分为多个Partition
,每个Partition
是一个有序的队列,Partition
中的每条消息都存在一个有序的偏移量(Offest
) ,同一个Consumer Group
中,只有一个Consumer
实例可消费某个Partition
的消息。@:
partion
可以看作一个有序的队列,里面的数据是储存在硬盘中的,追加式的。partition
的作用就是提供分布式的扩展,一个topic
可以有许多partions
,多个partition
可以并行处理数据,所以可以处理相当量的数据。
1.1.3、生产者和消费者
Kafka
的客户端就是Kafka
系统的用户,它们被分为两种基本类型:生产者和消费者
1.1.3.1、Producer
Producer
:消息和数据的生产者,可 以理解为向Kafka
发消息的客户端。生产者创建消息。一般情况下,一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。
不过,在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。
1.1.3.2、Consumer
Consumer
:消息和数据的消费者 ,可以理解为从Kafka
取消息的 客户端。消费者读取消息。订阅一个或多个主题,并按照消息生成的顺序读取它们(指的是同一个分区)。消费者通过检查消息的偏移量来区分已经读取过的消息。
消息的偏移量是另一种元数据,它是一个不断递增的整数值,在创建消息时,
Kafka
会把它添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在Zookeeper
或Kafka
上,如果消费者关闭或重启,它的读取状态不会丢失。
消费者是消费者群组的一部分,也就是说,会有一个或多个消费者共同读取一个主题。群组保证每个分区只能被一个消费者使用。
下图所示的群组中,有 3 个消费者同时读取一 个主题。其中的两个消费者各自读取一个分区,另外一个消费者读取其他两个分区。消费 者与分区之间的映射通常被称为消费者对分区的所有权关系。
通过这种方式,消费者可以消费包含大量消息的主题。而且,如果一个消费者失效,群组里的其他消费者可以接管失效消费者的工作。
1.1.3.3、Consumer Group
Consumer Group (消费者组〉
:每个消费者都属于一个特定的消费者组(可为每个消费者指定组名, 若不指定组名,则属于默认的组) 这是Kafka
用来实现一个Topic
消息 的广播(发送给所有的消费者〉 和单播(发送给任意一个消费者) 的手段。一个
Topic
可以有多个消费者组。Topic
的消息会被复制(不是真的复制, 是概念上的)到所有的 消费者组中,但每个消费者组只会把消息发送给该组中的 一个消费者。如果要实现广 播, 只要每个消费者都有一个独立的消费者组就可以了;
如果要实现单播,只要所有的消费者都在同 一 个消费者组中就行。
使用消费者组还可以对消费者进行自由分组, 而不需要多次发送消息到不同的 Topic。
1.1.4、Broker
和集群
@:每个
Broker
即一个Kafka
服务实例,多个Broker
构成一个Kafka
集群,生产者发布的消息将保存在Broker
中,消费者将从Broker
中拉取消息进行消费。@:
Broker
集群中,会有一个leader
(controller leader
),负责管理整个集群中分区和副本的状态和选举partition leader
一个独立的 Kafka
服务器被称为 broker
。broker
接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker
为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。根据特定的硬件及其性能特征,单个 broker
可以轻松处理数千个分区以及每秒百万级的消息量。
broker
是集群的组成部分。每个集群都有一个 broker
同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。控制器负责管理工作,包括将分区分配给 broker
和监控 broker
,以及分区领导的选举。
在集群中,一个分区从属于一个 broker
,该 broker
被称为分区的首领leader
。一个分区可以分配给多个 broker
,这个时候会发生分区复制(见下图)。这种复制机制为分区提供 了消息冗余,如果有一个 broker
失效,其他 broker
可以接管领导权。不过,相关的消费者和生产者都要重新连接到新的首领。
Kafka
的副本机制跟其他分布式系统不太一样,只有partition
的leader
才会进行读写操作,folower
仅进行复制,客户端是感知不到的,追随者副本是不对外提供服务的。也就是说,任何一个追随者副本都不能响应消费者和生产者的读写请求。所有的请求必须有领导者副本来处理,或者说,所有的读写请求都必须发往领导者副本所在的 Broker ,由该 Broker 负责处理。追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写到自己的提交日志中,从而实现与领导者副本的同步
1.1.4,1、保留消息
保留消息(在一定期限内)是
Kafka
的一个重要特性。Kafka
broker
默认的消息保留策略 是这样的:要么保留一段时间(比如 7 天),要么保留到消息达到一定大小的字节数(比 如1GB
)。当消息数量达到这些上限时,旧消息就会过期并被删除,所以在任何时刻,可用消息的总量都不会超过配置参数所指定的大小。
主题可以配置自己的保留策略,可以将消息保留到不再使用它们为止。
例如,用于跟踪用户活动的数据可能需要保留几天,而应 用程序的度量指标可能只需要保留几个小时。 也可以通过配置把主题当作紧凑型日志,只有最后一个带有特定键的消息会被保留下来。这种情况对于变更日志类型的数据来说比较适 用,因为人们只关心最后时刻发生的那个变更。
1.2、典型的kafka
集群
一个典型的 Kafka
集群中包含
1、若干生产者:可以是前端浏览器发起的页面访问或服务器日志等、
2、若干 Broker
:Kafka
支持水平扩展, 一般 Broker
数量越多集群吞吐率越大 、
3、若干消费者组
4、以及一个 ZooKeeper
集群。Kafka
通过 ZooKeeper
管理集群配置、选举 leader
, 以及当消费者组发生变化时进行 Rebalance
(再均衡)。生产者使用推模式将消息发布到 Broker
, 消费者使用拉模式从 Broker
订阅并消费消息。
2、broke
的配置
2.1、常规配置
2.1.1、broker.id
每个
broker
都需要有一个标识符,使用broker.id
来表示。它的默认值是 0,也可以被设置 成其他任意整数。这个值在整个Kafka
集群里必须是唯一的。这个值可以任意选定,如果出于维护的需要,建议把它们设置成与机器名具有相关性的整数,这样在进行维护时,将 ID 号映射到机器名就没那么麻烦了。例如,如果机器名包含唯一性的数字(比如 host1.example.com、host2.example.com),那么用这些数字 来设置
broker.id
就再好不过了。
2.1.2、port
如果使用配置样本来启动 Kafka,它会监听 9092 端口。修改 port 配置参数可以把它设置 成其他任意可用的端口。
2.1.3、zookeeper.connect
zookeeper.connect=localhost:2181
用于保存
broker
元 数 据 的Zookeeper
地 址 是 通 过zookeeper.connect
来 指 定 的。localhost:2181
表示这个Zookeeper
是运行在本地的 2181 端口上。该配置参数是用冒号分 隔的一组hostname:port/path
列表,每一部分的含义如下:@:
hostname
是Zookeeper
服务器的机器名或 IP 地址;@:
port
是Zookeeper
的客户端连接端口;@:
/path
是可选的Zookeeper
路径,作为Kafka
集群的chroot
环境。如果不指定,默认使用根路径。如果指定的chroot
路径不存在,broker
会在启动的时候创建它。最好是 在配置文件里指定一组
Zookeeper
服务器,用分号把它们隔开。一旦有一个Zookeeper
服务器宕机,broker
可以连接到Zookeeper
群组的另一个节点上。
为什么使用 chroot 路径?
Zookeeper
群组可以共享给其他应用程序,默认Kafka
会使用ZooKeeper
默认的/路径,这样有关Kafka
的ZooKeeper
配置就会散落在根路径下面,如果你有其他的应用也在使用ZooKeeper
集群,查看ZooKeeper
中数据可能会不直观,所以强烈建议指定一个chroot
路径,直接在zookeeper.connect
配置项中指定:
zookeeper.connect=h1:2181,h2:2181,h3:2181/brokers
2.1.4、log.dirs
log.dirs=/tmp/kafka-logs
Kafka
把所有消息都保存在磁盘上,存放这些日志片段的目录是通过log.dirs
指定的。它是 一组用逗号分隔的本地文件系统路径。如果指定了多个路径,那么broker
会根据“最少使用”原则,把同一个分区的日志片段保存到同一个路径下。要注意,
broker
会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。
2.1.5、 num.recovery.threads.per.data.dir
num.recovery.threads.per.data.dir=1
对于如下 3 种情况,
Kafka
会使用可配置的线程池来处理日志片段:
@: 服务器正常启动,用于打开每个分区的日志片段;
@: 服务器崩溃后重启,用于检查和截短每个分区的日志片段;
@: 服务器正常关闭,用于关闭日志片段。
默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到并行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩溃,在进行恢复时使用并行操作可能会省下数小时的时间。
设置 此参数时需要注意,所配置的数字对应的是 log.dirs
指定的单个日志目录。也就是说,
如 果 num.recovery.threads.per.data.dir
被设为 8,并且 log.dir
指定了 3 个路径,那么总 共需要 24 个线程。
2.1.6、auto.create.topics.enable
默认情况下,Kafka
会在如下几种情形下自动创建主题:
@: 当一个生产者开始往主题写入消息时;
@: 当一个消费者开始从主题读取消息时;
@: 当任意一个客户端向主题发送元数据请求时。
很多时候,这些行为都是非预期的,如果一个主题不先被创建, 根本无法知道它是否已经存在,一般建议设置为为flase。
2.2、主题的默认配置
2.2.1、num.partitions
num.partitions
参数指定了新创建的主题将包含多少个分区。如果启用了主题自动创建功能(auto.create.topics.enable
该功能默认是启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。所以,如果要让一个主题的分区个数少于
num.partitions
指定的值,需要手动创建该主题
2.2.1.1、为什么不支持减少分区?
Kafka
分区数据不支持减少是由很多原因的,比如减少的分区其数据放到哪里去?是删除,还是保留?删除的话,那么这些没消费的消息不就丢了。如果保留这些消息如何放到其他分区里面?追加到其他分区后面的话那么就破坏了 Kafka
单个分区的有序性。如果要保证删除分区数据插入到其他分区保证有序性,那么实现起来逻辑就会非常复杂。
2.2.1.2、为什么分区不能太多?
1、客户端/服务器端需要使用的内存就越多
Kafka0.8.2
之后,在客户端producer
有个参数batch.size
(批次大小),默认是16KB
。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。 假设你有10000
个分区,按照默认设置,这部分缓存需要占用约157MB的内存。
而consumer
端呢?我们抛开获取数据所需的内存不说,只说线程的开销。如果还是假设有10000
个分区,同时consumer
线程数要匹配分区数(大部分情况下是最佳的消费吞吐量配置)的话,那么在consumer client
就要创建10000
个线程,也需要创建大约10000
个Socket
去获取分区数据。这里面的线程切换的开销本身已经不容小觑了。
服务器端的开销也不小,如果阅读Kafka
源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controller
,FetcherManage
r等,因此分区数越多,这种缓存的成本就越大。
2、降低高可用性
Kafka
通过副本(replica)
机制来保证高可用。具体做法就是为每个分区保存若干个副本。每个副本保存在不同的broker
上。期中的一个副本充当leader
副本,负责处理producer
和consumer
请求。其他副本充当follower
角色
由Kafka controller
负责保证与leader
的同步。如果leader
所在的broker
挂掉了,contorller
会检测到然后在zookeeper
的帮助下重选出新的leader
——这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区,10
个broker
,也就是说平均每个broker
上有1000
个分区。此时这个broker
挂掉了,那么zookeeper
和controller
需要立即对这1000
个分区进行leader
选举。比起很少的分区leader
选举而言,这必然要花更长的时间,并且通常不是线性累加的。如果这个broker
还同时是controller
情况就更糟了。
3、文件句柄的开销
每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每个broker都保存这两个文件句柄(file handler)。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。
2.2.1.3、如何选定分区数量?
为主题选定分区数量并不是一件可有可无的事情,在进行数量选择时,需要 考虑如下几个因素。
@:主题需要达到多大的吞吐量?例如,是希望每秒钟写入100KB还是1GB?
@:从单个分区读取数据的最大吞吐量是多少?每个分区一般都会有一个消费者,如果你知道消费者将数据写入数据库的速度不会超过每秒 50MB,那么你也该知道,从一个分区读取数据的吞吐量不需要超过每秒 50MB。
@:可以通过类似的方法估算生产者向单个分区写入数据的吞吐量,不过生产者的速度一般比消费者快得多,所以最好为生产者多估算一些吞吐量。
很显然,综合考虑以上几个因素,你需要很多分区,但不能太多。
比如:首先如果估算出主题的吞吐量和消费者吞吐量,可以用主题吞吐量除以消费者吞吐量算出分区的个数。 也就是说, 如果每秒钟要从主题上写入和读取 1GB 的数据,并且每个消费者每秒钟可以处理 50MB 的数据,那么至少需要 20 个分区。这样就可以让 20 个消费者同时读取这些分区,从而达到每秒钟 1GB 的吞吐量。
2.2.1.4、Consumer
个数与分区数有什么关系
topic
下的一个分区只能被同一个consumer group
下的一个consumer
线程来消费。如果你的分区数是N,那么最好线程数也保持为N,这样通常能够达到最大的吞吐量。超过N的配置只是浪费系统资源,因为多出的线程不会被分配到任何分区,少于N的配置则是分区设置不合理。
2.2.2、 log.retention.ms
Kafka
通常根据时间来决定数据可以被保留多久。默认使用log.retention.hours
参数来配 置时间,默认值为168
小时,也就是一周。除此以外,还有其他两个参数log.retention. minutes
和log.retention.ms
。这 3 个参数的作用是一样的,都是决定消息多久以后会被删 除,不过还是推荐使用log.retention.ms
。如果指定了不止一个参数,Kafka 会优先使用具有最小值的那个参数
根据时间保留数据和最后修改时间:
根据时间保留数据是通过检查磁盘上日志片段文件的最后修改时间来实现 的。一般来说,最后修改时间指的就是日志片段的关闭时间,也就是文件里 最后一个消息的时间戳。不过,如果使用管理工具在服务器间移动分区,最后修改时间就不准确了。时间误差可能导致这些分区过多地保留数据。在第 9 章讨论分区移动时会提到更多这方面的内容。
2.2.3、 log.retention.bytes
另一种方式是通过保留的消息字节数来判断消息是否过期。它的值通过参数
log. retention.bytes
来指定,作用在每一个分区上。也就是说,如果有一个包含 8 个分区的主 题,并且
log.retention.bytes
被设为 1GB,那么这个主题最多可以保留 8GB 的数据。所以,当主题的分区个数增加时,整个主题可以保留的数据也随之增加。
2.2.4、log.segment.bytes
以上的设置都作用在日志片段上,而不是作用在单个消息上。当消息到达
broker
时,它们被追加到分区的当前日志片段上。当日志片段大小达到log.segment.bytes
指定的上限 (默认是 1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就会越频繁地关闭和分配新文件,从而降低磁盘写入的整体效率。 如果主题的消息量不大,那么如何调整这个参数的大小就变得尤为重要。
例如,如果一个主题每天只接收 100MB 的消息,而
log.segment.bytes
使用默认设置,那么需要 10 天时间才能填满一个日志片段。因为在日志片段被关闭之前消息是不会过期的,所以如果log. retention.ms
被设为 604 800 000(也就是 1 周),那么日志片段最多需要 17 天才会过期。 这是因为关闭日志片段需要 10 天的时间,而根据配置的过期时间,还需要再保留7天时间(要等到日志片段里的最后一个消息过期才能被删除)。
使用时间戳获取偏移量
日志片段的大小会影响使用时间戳获取偏移量。在使用时间戳获取日志偏移量时,Kafka 会检查分区里最后修改时间大于指定时间戳的日志片段(已经被关闭的),该日志片段的前一个文件的最后修改时间小于指定时间戳。然后,Kafka 返回该日志片段(也就是文件名)开头的偏移量。对于使用时间戳获取偏移量的操作来说,日志片段越小,结果越准确。
2.2.5、log.segment.ms
另一个可以控制日志片段关闭时间的参数是 log.segment.ms
,它指定了多长时间之后日志片段会被关闭。就像 log.retention.bytes
和 log.retention.ms
这两个参数一样,log. segment.bytes
和 log.retention.ms
这两个参数之间也不存在互斥问题。日志片段会在大小或时间达到上限时被关闭,就看哪个条件先得到满足。默认情况下,log.segment.ms
没 有设定值,所以只根据大小来关闭日志片段。
基于时间的日志片段对磁盘性能的影响
在使用基于时间的日志片段时,要着重考虑并行关闭多个日志片段对磁盘性能的影响。如果多个分区的日志片段永远不能达到大小的上限,就会发生这 种情况,因为 broker 在启动之后就开始计算日志片段的过期时间,对于那些数据量小的分区来说,日志片段的关闭操作总是同时发生。
2.2.6、message.max.bytes
broker
通过设置message.max.bytes
参数来限制单个消息的大小,默认值是1 000 000
,也 就是 1MB。如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到broker
返回的错误信息。跟其他与字节相关的配置参数一样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于message.max.bytes
指定的值,消息的实际大小 可以远大于这个值。这个值对性能有显著的影响。值越大,那么负责处理网络连接和请求的线程就需要花越多 的时间来处理这些请求。它还会增加磁盘写入块的大小,从而影响 IO 吞吐量。
问题:
1、kafka
主题消息为什么不能保证消息的有序性
由于一个主题一般包含几个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序
2、分区的好处
1、一个
topic
主题有多个分区,分区可以在不同的机器上,Kafka
通过分区来实现数据冗余和伸缩性。分区可以分布在不同的服务器上,也就是说,一个主题可以横跨多个服务器,以此来提供比 单个服务器更强大的性能。2、多个
partition
分区可以并行处理数据,所以可以处理相当量的数据3、可以通过消息键值和分区器实现消息的有序,这样可以指定消息再同一个分区中,将来被同一个消息消费
3、为什么选择kafka
1、多个生产者
2、多个消费者:
Kafka
支持多个消费者从一个单独的消息流上读取数据,而且消费者之间互不影响。这与其他队列系统不同,其他队列系统的消息一旦被一个客户端读 取,其他客户端就无法再读取它。另外,多个消费者可以组成一个群组,它们共享一个消息流,但是能够保证整个群组对每个给定的消息只处理一次3、基于磁盘存储,持久化:
kafka
允许消费者非实时地读取消息,这要归功于Kafk
a 的数据 保留特性。消息被提交到磁盘,根据设置的保留规则进行保存。每个主题可以设置单独的保留规则,以便满足不同消费者的需求,各个主题可以保留不同数量的消息。消费者可能 会因为处理速度慢或突发的流量高峰导致无法及时读取消息,而持久化数据可以保证数据不会丢失。消费者可以在进行应用程序维护时离线一小段时间,而无需担心消息丢失或堵 塞在生产者端。消费者可以被关闭,但消息会继续保留在 Kafka 里。消费者可以从上次中 断的地方继续处理消息。4、伸缩性:
broker
集群扩展十分方便
疑问:
1、zookeeper
的作用是什么
Kafka
使用Zookeeper
保存集群的元数据信息和消费者信息