一、基本概念
1. 发布与订阅消息系统
数据(消息)的发送者(发布者)不会直接把消息发送给接收者。发布者以某种方式对消息进行分类,接收者(订阅者)订阅它们,以便接收特定类型的消息。发布与订阅系统一般会有一个broker,即发布消息的中心点。
2. 消息和批次
Kafka的数据单元被称为消息,由字节数组组成,消息有一个可选的元数据,也就是键。当需要控制消息写入特定的分区时,可以指定消息的键,最简单的例子是为键生成一个一致性散列值,然后使用散列值对主题分区数进行取模,为消息选取分区,这样可以保证相同键的消息总是被写到相同的分区上。
为了提高效率,消息被分批次写入到kafka。批次就是一组消息,这些消息同属于一个主题和分区。批次消息可以减少网络开销,也可以被压缩。
3. 主题和分区
kafka的消息通过主题进行分类。主题可以被分为若干个分区,一个分区就是一个提交日志(Commit Log)。消息以追加的方式写入分区,然后以先入先出(FIFO)的顺序读取。由于一个主题一般包含多个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。
Kafka通过分区来实现数据冗余和伸缩性。分区可以分布在不同的物理服务器上。
4. 生产者和消费者
生产者创建消息。一个消息会被发布到一个指定的主题上。生产者在默认情况下(不指定消息的键)吧消息均衡地发布到主题的所有分区上。也可以通过消息键和分区器来实现将消息直接写到指定的分区,分区器为键生成一个散列值,并将其映射到指定的分区上。
消费者读取消息。消费者订阅一个或多个主题,并按照消息生成的顺序读取它们。消费者通过检查消息的偏移量来区分已经读过的消息。
偏移量是消息的元数据,是一个不断递增的整数值,在创建消息时,kafka会把它添加到消息里。在同一个分区里,每个消息的偏移量都是唯一的。消费者把每个分区的消息偏移量保存在Zookeeper或kafka上。(节点路径:/consumers/{group_id}/offsets/{topic}/{broker_id}-{partition_id}
)
消费者是消费者群组的一部分,若干个消费者共同读取一个主题。消费者组保证每个分区只能被一个消费者使用。如果一个消费者失效,群组里的其他消费者可以接管失效消费者的工作。消费者组里的消费者平均地读取固定的分区,多于分区数量的消费者将会被闲置。
5. broker和集群
一个独立的Kafka服务器被称为broker。broker接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。
broker是集群的组成部分。每个集群都有一个broker同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。控制器负责管理工作,包括将分区分配给broker和监控broker。
在集群中,一个分区从属与一个broker,该broker被称为分区的首领。一个分区可以分配给多个broker,这时候会发生分区复制。也就是说,一个主题的同一分区会存在于集群的所有broker上,其中一个活跃可用的分区作为首领,其余的作为副本。如果有一个broker失效,其他broker可以接管领导权。
首领副本负责所有客户端读写操作(包括生产者和消费者),跟随者副本仅仅从首领副本同步数据。当首领副本出现故障是,跟随者副本中的一个副本会被选择为新的首领副本。
因为每个分区的副本中只有首领副本接收读写,所以每个服务端都会作为某些分区的首领副本,以及另外一些分区的跟随者副本,这样Kafka集群的所有服务端整体上对客户端是负载均衡的
6. 消息模型
推送模型(Push)
基于推送模型的消息系统,由消息代理(broker)记录消费者的消息状态。消息代理在将消息推送到消费者后,将这条消息标记为已消费,但这种方式无法很好地保证消息的处理语义
拉取模型(Pull)
拉取模型由消费者自己记录消费状态,每个消费者互相独立地顺序读取每个分区的消息。消费者能拉取的最大上限通过最高水位(watermark)控制,生产者最新写入的消息如果还没有达到备份数量,对消费者是不可见的
二、Kafka的设计与实现
1. 文件系统的持久化与数据传输效率
- 预读
提前将一个比较大的磁盘块读入内存 - 后写
将若干小的逻辑写操作合并成一个大的物理写操作 - 磁盘缓存
在内存中尽量报错尽可能多的数据,并在需要时将这些数据刷新到磁盘 - 消息分组
用批量的方式一次发送一个消息组 - 压缩消息集
-
零拷贝
使用零拷贝技术只需将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面缓存中直接发送到网络(发送给不同的使用者时,都可以重复使用同一个页面缓存),避免了重复的复制操作
2. 生产者与消费者
后面详细叙述
3. 副本机制与容错处理
Kafka的副本机制会在多个broker上对每个主题分区的日志进行复制。副本的单位是主题的分区,每个主题的每个分区都有一个首领副本以及任意个跟随者副本。
- 首领副本
所有的读写请求总是被路由到分区的首领副本上 - 跟随者副本
跟随者副本会和首领副本保持数据同步,在首领副本失效时替换为首领副本 - 节点存活
节点的存活定义的条件:1. 节点必须和Zookeeper保持会话;2. 如果这个节点是某个分区的跟随者副本,它必须对分区首领副本的写操作进行复制,并且复制的进度不能落后太多 - ISR
满足上述两个条件被称为in-sync(正在同步中)。每个分区的首领副本会跟踪in-sync的跟随者副本节点(In Sync Replicas,即ISR)。如果一个跟随者副本挂掉、没有响应或落后太多,首领副本就会将其从同步副本中移除。反之,如果跟随者副本重新赶上首领副本,他就会加入到首领副本的同步集合中 - 确认提交
一条消息只有被ISR集合中所有副本都保存到本地的日志文件中,才会被认为是成功提交了。任何时刻,只要ISR至少有一个副本是存活的,Kafka就可以保证“一条消息一旦被提交,就不会丢失”。只有已经提交的消息才能被消费者消费。
三、Kafka生产者——向Kafka写入数据
1. kafka发送消息的主要步骤
ProducerRecord
需要包含目标主题和发送的内容,还可以指定键或分区。在发送ProducerRecord
对象时,生产者要先把键和值对象序列化成字节数组。
接下来,数据被传给分区器。如果ProducerRecord
对象里指定了分区,直接返回指定的分区,如果没有则分区器会根据键和分区值来确定分区。然后这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责发送批次消息到响应的broker上。
broker收到消息后会返回一个响应。如果消息成功写入到Kafka,就返回一个RecordMetaData
对象,包含了主题、分区信息和分区中的偏移量。如果失败则返回错误,生产者收到错误之后会尝试重新发送消息。
2. 发送消息的模式
- 发送并忘记
把消息发送给broker,但并不关心它是否被正常送达,不能保证消息发送的可靠性 - 同步发送
使用send()
方法发送消息,它会返回一个Future
对象,调用 get() 方法进行等待,来判断消息是否成功发送 - 异步发送
使用send()
方法并制定回调函数,服务器在返回响应时调用该函数
四、Kafka消费者——从Kafka读取数据
1. 消费者和消费者群组
Kafka消费者从属于消费者群组,一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。消费者群组里的消费者总是平均地读取固定的分区。多余分区数量的消费者将会闲置,不会收到任何消息。
一个主题可以被多个消费者群组读取,这些消费者群组之间不会相互影响;一个消费者群组也可以订阅多个主题。
2. 消费者群组和分区再均衡
再均衡的含义:分区的所有权从一个消费者转移到另一个消费者
分区会再均衡的情况:
- 消费者加入群组
- 消费者离开群组
- 主题分区发生变化
消费者通过向北指派为群组协调器的broker(不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系一级它们对分区的所有权关系。只要消费者以正常的频率发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。
消费者会在轮询消息或提交已读取偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器就会认为它已经死亡,就会触发一次再均衡。
3. 消息轮询
消息轮询是消费者API的核心,通过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据。
4. 提交和偏移量
poll()
方法总是返回由生产者写入到Kafka但还没有被消费者读取过的记录(偏移量)。更新分区当前位置(偏移量)的操作被称为提交。
消费者向_consumer_offset
的特殊主体发送消息,消息是包含每个分区的偏移量。如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区。此时消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的位置继续处理。
提交的方式:
- 自动提交
如果enable.auto.commit
设置为true
,每过auto.commit.internal.ms
的时间,消费者会自动把从poll()
方法接收到的最大偏移量提交上去。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是则提交上一次轮询获取到的偏移量 - 提交当前偏移量
把enable.auto.commit
设为false
,让消费者决定何时提交偏移量。使用commitSync()
会提交由poll()
获取的最新偏移量,提交成功后马上返回,若失败则抛出异常。手动提交再broker对提交请求作出回应之前,应用程序会一直阻塞 - 异步提交
在成功提交或碰到无法恢复的错误之前,commitSync()
会一直重试,但是commitAsync
不会,之所以不会是因为它收到服务器响应的时候,可能有一个更大的偏移量已经提交成功 - 同步和异步组合提交
- 提交指定的偏移量
每个分区都一个有序、不可变的记录序列,新的消息会不断追加到提交日志(commit log)。分区中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号,叫做偏移量(offset),这个偏移量可以唯一确定当前分区的任意一条消息
5. 再均衡监听器
ConsumerRebalanceListener
,用于监听再均衡事件并处理
6. 从指定偏移量处开始处理记录
- 从分区的起始位置开始读取消息
seekToBeginning(Collection<TopicPartition> tp)
- 从分区的末尾开始读取消息
seekToEnd(Collection<TopicPartition> tp)
7. 退出
如果确定要退出循环,需要通过另一个线程调用Consumer#wakeUp()
方法;如果循环运行在主线程里,可以在Runtime#addShutdownHook(Thread)
里调用该方法。Consumer#wakeUp()
是消费者唯一一个可以从其他线程里安全调用的方法,该方法被调用可以退出poll()
,并抛出WakeupException
异常,如果线程没有等待轮询,那么异常将在下一次调用poll()
时抛出
在退出线程之前有必要调用Consumer#close()
,该方法会提交任何没有提交的内容,并向群组协调器发送消息告知其自己要离开群组,接下来就会触发再均衡,而不需要等待会话超时
8. 序列化与反序列化
生产者要用序列化器把对象转换成字节数组再发送给Kafka,消费者需要用反序列化器把从Kafka接收到的字节数组转换成Java对象
四、深入Kafka
1. 集群成员关系
Kafka使用Zookeeper来维护集群成员的信息。每个broker都有一个唯一标识符,可以在配置文件指定,也可以自动生成。不能启动另一个存在相同ID的broker
在broker启动的时候,它通过创建临时节点把自己的ID注册到Zookeeper。Kafka组件订阅Zookeeper的/broker/ids
路径(broker在Zookeeper上的注册路径),当有broker加入或退出集群时,这些组件就会被通知。
当broker停机、出现网络分区或长时间垃圾回收停顿时,broker会从Zookeeper上断开连接,此时broker在启动时创建的临时节点会自动从Zookeeper上被移除。监听borker列表的Kafka组件会被告知该broker已被移除
2. 控制器
控制器的产生
控制器是一个broker,除了具有普通broker的功能之外,还负责分区首领的选举。集群里第一个启动的broker通过在Zookeeper里创建一个路径为/controller
的临时节点让自己成为控制器。其他broker在启动的时候也会尝试创建这个节点并失败,并在控制器节点上创建Zookeeper Watch对象用来接收控制器变更通知
如果控制器被关闭或者与Zookeeper断开连接,/controller
节点会被删除。集群中的其他broker通过Watch对象得到控制器节点断开的通知,并尝试让自己成为新的控制器,非控制器broker重复上述过程
每个新选出的控制器通过Zookeeper的条件递增操作获得一个全新的值更大的controller epoch,其他broker在知道当前controller epoch之后,会忽略含有旧epoch的消息
分区首领的产生
当控制器发现broker离开集群(观察相关Zookeeper路径),它就知道,那些首领在这个broker上的分区需要一个新的首领。控制器遍历这些分区,并确定谁应该成为新首领(分区副本列表的下一个副本),然后向所有包含新首领或现有跟随者的broker发送请求,该请求消息包含了谁是新首领以及谁是分区跟随者的信息。随后,新首领爱是处理来自生产者和消费者的请求,而跟随者开始从首领那里复制消息
3. 复制
复制功能是Kafka架构的核心,因为它可以在个别节点失效时仍能保证Kafka的可用性和持久性
Kafka使用主题来组织数据,每个主题被分为若干个内容不同的分区,每个分区有多个内容相同的副本
副本有两种类型:
- 首领副本
每个分区都有一个首领副本,所有的生产者的写请求和消费者的读请求都会在首领副本上操作。首领的另一个任务是了解哪个跟随者副本是跟自己保持一致的。 - 跟随者副本
首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,唯一的任务就是从首领复制消息,保持与首领一致的状态。持续请求得到的最新消息副本被称为同步的副本(ISR)。在首领发生失效时,只有同步副本才有可能成为新首领
如果跟随者在指定时间内没有请求任何消息,或者虽然在请求消息,但是没有请求最新的消息,那么它就不是同步的。如果一个副本无法与首领保持一致,在首领发生失效时,它不能成为新首领
4. 处理请求
Kafka broker处理请求的过程如图
- Acceptor线程
负责监听端口并创建连接,并将连接交给Processor线程处理 - Processor线程
负责从客户端获取请求消息,并放入请求队列,然后从响应队列获取响应消息,把它们发送给客户端。线程数量可以配置 - IO线程
负责处理请求,并产生响应
主要请求类型
- 生产请求
- 消费请求
- 元数据请求
- 其他类型
生产请求和获取请求都必须发送给分区的首领副本。如果broker收到一个指定分区的请求,而该分区的首领不在此broker,那么broker会响应“非分区首领”的错误。Kafka客户端负责把生产请求和获取请求发送到正确的broker上
元数据请求包含了客户端感兴趣的主题列表,服务端的响应信息里指明了这些主题包含的分区、每个分区都有哪些副本,以及哪个副本是首领
5. 物理存储
Kafka的基本存储单元是分区,在配置Kafka时,管理员指定了一个用于存储分区的目录清单log.dirs
。在创建主题时,Kafka首先会决定如何在broker间分配分区,分区要达到以下目标:
- 在broker间平均地分布分区副本
- 确保每个分区的每个副本分布在不同的broker上
- 如果为broker指定了机架信息,那么尽可能地把每个分区的副本分配到不同机架的broker上
五、可靠数据传递
1. Kafka的可靠性保证
- 顺序保证:
保证分区消息的顺序。如果使用同一个生产者往同一个分区写入消息,而且消息B在消息A之后写入,那么Kafka可以保证消息B的偏移量比消息A的偏移量大,而且消费者会先读取消息A后读取消息B - 提交确认:
只有当消息被写入到分区的所有同步副本时(但不一定要写入磁盘),它才被认为是“提交”的。生产者可以选择接受不同类型的确认:消息被完全提交时的确认、消息写入首领副本时的确认、消息被发送到网络时的确认 - 消息不丢:
只要还有一个副本是活跃的,那么已经提交的消息就不会丢失 - 提交可见:
消费者只能读取已经提交的消息
2. 复制
Kafka的主题被分为多个分区,分区是基本的数据块。分区存储在单个磁盘上,Kafka可以保证分区里的事件是有序的。分区可以在线,也可以离线(不可用)。
每个分区可以有多个副本,其中一个是首领。所有的消息都是直接发送给首领副本,或者直接从首领副本读取消息。其他分区只需要与首领副本保持同步,并及时复制最新的消息。当首领副本不可用时,分区其他任一同步副本将成为新首领。
同步副本需要满足以下条件:
- 与Zookeeper之间有一个活跃的会话,在过去6s(可配置)内向其发送过心跳
- 过去10s内(可配置)从首领那里获取过消息
- 在过去10s内从首领那里获取过最新的消息
3. broker配置
3.1 复制系数
如果复制系数为N,那么在N - 1个broker失效的情况下,仍然能够从主题读取数据或向主题写入数据。所以,更高的复制洗漱会带来更高的可用性、可靠性和更少的故障
3.2 不完全的首领选举
3.3 最少同步副本
4. 在可靠的系统里使用生产者
4.1 发送确认
-
acks = 0
如果生产者能够通过网络把消息发送出去,就认为消息已成功写入Kafka -
acks = 1
首领在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)是会返回确认或错误响应 -
acks = all
首领在返回确认或错误响应之前,会等待素有同步副本都收到消息
4.2 配置生产者的重试参数
4.3 额外的错误处理
5. 在可靠的系统里使用消费者
5.1 消费者的可靠配置
-
group.id
如果连个消费者具有相同的group.id
,并且订阅了同一个主题,那么每个消费者会分到主题分区的一个子集。如果你希望消费者可以看到主题的所有消息,那么需要它们设置唯一的group.id
-
auto.offset.reset
=ealiest | latest
指定了再没有偏移量可提交时,或者请求的偏移量在broker上不存在时,消费者的行为
earliest
:消费者会动分区的开始位置读取数据,不管偏移量是否有效,这样会导致消费者读取大量的重复数据,但可以保证最少的数据丢失
latest
:消费者会从分区的末尾开始读取数据,这样可以减少重复处理消息,单很有可能丢失数据 -
enable.auto.commit
自动提交偏移量,异步处理消息可能导致提交错误的偏移量 -
auto.commit.interval.ms
自动提交偏移量的频率
5.2 显式提交偏移量
- 总是在处理完事件后再提交偏移量
- 提交频度是性能和重复处理消息数量之间的权衡
- 确保对提交的偏移量心里有数
- 再均衡
- 消费者可能需要重试
- 消费者可能需要维护状态
- 长时间处理
- 仅一次传递
Q&A
- 推送消息给消费者和消费者拉取消息各自优缺点
broker主动地推送消息给下游的消费者,由broker控制数据传输的速率,但是broker对下游消费者能否及时处理消息不得而知。如果数据的消费速率低于生产速率,消费者就会处于符合状态,那么发送给消费者的消息就会堆积得越来越多。而且,推送方式页难以应付不同类型的消费者,因为不同消费者的消费速率不一定都相同,broker需要调整不同消费者的传输速率,并让每个消费者充分利用系统的资源。这种方式实现起来比较困难。
消费者从broker主动拉取数据,broker是无状态的,它不需要标记哪些消息时被消费者处理过,也不需要保证一条消息只会被一个消费者处理。而且,不同的消费者可以按照自己最大的处理能力来拉取数据,及时有时候某个消费者的处理速度稍微落后,它也不会影响其他的消费者,并且在这个消费者恢复处理速度后,仍然可以追赶之前落后的数据。
再有就是,推送方式比较难保证消费者正常消费消息状态一致性,需要保存每条消息的多种状态;而拉取方式只需要为每个有序分区记录一个偏移量,定时将分区的消费进度保存成检查点(checkpoint)文件,不需要记录消息的任何状态,而且有需要时,消费者可以回退到某个旧的偏移量位置,重新处理数据