一、. rebalance
-
FIND_COORDINATOR
确定消费者所属的GroupCoordinator所在的broker,如果消费者已经保存了GroupCordinator信息,可以进入下一个阶段,否则需要查找_consumer_offset对应的分区的leader副本所对应的broker,
具体查找 Group Coordinator 的方式是先根据消费组 groupid 的晗希值计算_consumer_offsets
中的分区编号,具体算法如代码清单
以此broker作为GroupCordinator角色,又扮演分区分配和组内消费者位移的角色。
-
JOIN_GROUP
在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的
消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理响应。- 选举消费组的 leader
GroupCoordinator 需要为消费组内的消费者选举出一个消费组的 leader ,这个选举的算法也
很简单,分两种情况分析。如果消费组内还没有 leader ,那么第1 个加入消费组的消费者即为
消费组的 leader 。如果某一时刻 leader 消费者由于某些原因退出了消费组, 那么会重新选举
个新的 leader ,这个重新选举 leader 的过程又更“随意”了,相关代码如下
//scala code.
private val members = new mutable .HashMap[String, MemberMetadata]
var leaderid = members.keys.head
在 GroupCoordinator 中消费者的信息是以 HashMap 的形式存储的,
其中 key 为消费者的member id ,而 value 是消费者相关的元数据信息。 leaderld 表示 leader
消费者的 member id ,它的取值为 HashMap 中的第一个键值对的 key ,这种选举的方式基本
上和随机无异总体上来说,消费组的 leader 选举过程是很随意的。 - 选举分区分配某咯
每个消费者都可以设置自己的分区分配策略,对消费组而言需要从各个消费者呈报上来的
各个分配策略中选举一个彼此都“信服”的策略来进行整体上的分区分配 这个分区分配的选
举并非由 leader 费者决定,而是根据消费组内的各个消费者投票来决定的。这里所说的 “根
据组内的各个消费者投票来决定” Group Coordinator 还要再与各个消费者进行进一步交
互,而是根据各个消费者呈报的分配策略来实施。最终选举的分配策略基本上可以看作被各个
消费者支持的最多的策略,具体的选举过程如下:
(1 )收集各个消费者支持的所有分配策略,组成候选集 candidates
(2 )每个消费者从候选集 candidates 找出第 1个自身支持的策略,为这个策略投上一票。
(3 )计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略。
在此之后, Kafka 服务端就要发送 JoinGroupResponse 响应给各个消费者, leader 消费者和
其他普通消费者收到的响应内容不同。
- 选举消费组的 leader
-
SYNC_GROUP
同步阶段,每个消费者会 GroupCoordinator 发送 SyncGroupRequest 请求来同步分配方案。
只有 leader消费者发送的 SyncGroupRequest 请求中才包含具体的分区分配方案
服务端在收到消费者发送的 SyncGroupRequest 请求之后会交 GroupCoordinator 来负责具
体的逻辑处理。 GroupCoordinator 同样会先对 SyncGroupRequest 请求做合法性校验,在此之后会将从 leader 消费者发送过来的分配方案提取出来,连同整个消费组的元数据信息一起存入Kafka _consumer_offsets 主题中 最后发送响应给各个消费者 以提供给各个消费者各自所属
的分配方案。
HEARTBEAT
消费者通过向 GroupCoordinator 发送心跳来维持它们与消费组的从属关系,以及它们对分
区的所有权关系。只要消费者以正常的时间间隔发送 就被认为是活跃的 ,说明它还在
取分区中的消息。心跳线程是一个独立的线程,可以在轮询消息的空档发送心跳。如果消费者停
发送心跳的时间足够长,则整个会话就被判定为过期, GroupCoordinator 会认为这个消费者
己经死亡,就会触发一次再均衡行为,消费者的心跳间隔时间由参数 heartbeat.interval.ms
指定,默认值为 3000,这个参数必须比 session.timeout.ms 参数设定的值要小,
般情况下 heartbeat.interval.ms的配置值不能超过 session.timeout.ms 配置值的
1/3 。这个参数可以调整得更低,以控制正常重新平衡的预期时间
如果消费者发生崩溃,并停止读取消息 那么 GroupCoordinator 会等待一 段时间
确认这个消费者死亡之后才会触发再均衡。在这一小段时间内, 死掉的消费者井不会读取分区
里的消息。这个一小段时间由 session.timeout.ms 参数控制。
还有一个参数 max.poll.interval.ms ,它用来指定使用消费者组管理时 poll () 方法调
用之间的最大延迟,就是消费者在获取更多消息之前可以空闲的时间量的上限。如果超时
时间期满之前 poll ()没有调用, 消费者被视为失败,并且分组将重新平衡, 便将分区重新分
配给别的成员。
除了被动退出消费组,还可 以使 LeaveGroupRquest 请求主动退出消费组,比如客户端
调用了 unsubscrible() 方法取消对某些主题的订阅
二、. 分区分配方案,Kafka的分区,是怎么分区的
-
rangeAssignor 分配策略
rangeAssignor 策略会将消费组内所有订阅这个主题的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。
假设消费组内有 2个消费者 C1,C0,都订阅了主题 t0和t1 ,并且每个主题都有 4个分区
么订阅的所有分区可以标识为 t0p0 、t0p1、 t0p2、 t0p3 、t1p0 、tlp1 、t1p2 、tlp3 。最终的分配结果为
消费者 CO: t0p0 、t0p1、 t0p2、 t0p3
消货者 Cl :t1p0 、tlp1 、t1p2 、tlp3
这样分配得很均匀,那么这个分配策略能够一直保持这种良好的特性吗?我 不妨再来看
另一种情况。假设上面例子中2 个主题都只有 3个分区,那么订阅的所有分区可以标识为: t0p0 、t0p1、 t0p2、t1p0 、tlp1 、t1p2 。最终的分配结果为
消费者 CO:t0p0 、t0p1、 t0p2、t1p0
消费者 Cl:tlp1 、t1p2
明显这样的分配并不均匀,如果将类似的情形扩大, 有可能出现部分消费者
过载的情况。
- RoundRobinAssignor 分配策略
RoundRobinAssignor 分配策略的原理是将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐 将分区依次分配给每个消费者。
如果同一个消费组内所有的消费者的订阅信息都是相同的,那RoundRobinAssignor 分配策略分区分配会是均匀的。如果同一个消费组内的消费者订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能导致分区分配得不均匀。 - StickyAssignor 分配策略
三、 Kafka怎么保证负载均衡
消费者组,消费者个数可以设置
主题下面的分区,分区个数可以设置,这样会写到不同文件,减少io的瓶颈
四.、Kafka为什么高性能
零拷贝:https://www.cnblogs.com/shijingxiang/articles/12203167.html、分区有序+索引:https://www.jianshu.com/p/604c8799a662、磁盘缓存、批量读写
五、 Kafka的消费者组
在 Kafka 中,消费者组是一个或多个消费者实例构成的组。多个实例共同订阅若干个主题,实现共同消费。同一个组下的每个实例都配置有相同的组 ID,被分配不同的订阅分区。当某个实例挂掉的时候,其他实例会自动地承担起它负责消费的分区。此时,如果你擅长位移值原理,就不妨再提一下消费者组的位移提交机制;如果你擅长 Kafka Broker,可以提一下消费者组与 Broker 之间的交互;如果你擅长与消费者组完全不相关的 Producer,那么就可以这么说:“消费者组要消费的数据完全来自于 Producer 端生产的消息,我对 Producer 还是比较熟悉的。”使用这个策略的话,面试官可能会被你的话术所影响,偏离他之前想问的知识路径。
六、分区 Leader 选举策略有几种?
分区的 Leader 副本选举对用户是完全透明的,它是由 Controller 独立完成的。你需要回答的是,在哪些场景下,需要执行分区 Leader 选举。每一种场景对应于一种选举策略。当前,Kafka 有 4 种分区 Leader 选举策略。OfflinePartition Leader 选举:每当有分区上线时,就需要执行 Leader 选举。所谓的分区上线,可能是创建了新分区,也可能是之前的下线分区重新上线。这是最常见的分区 Leader 选举场景。ReassignPartition Leader 选举:当你手动运行 kafka-reassign-partitions 命令,或者是调用 Admin 的 alterPartitionReassignments 方法执行分区副本重分配时,可能触发此类选举。假设原来的 AR 是[1,2,3],Leader 是 1,当执行副本重分配后,副本集合 AR 被设置成[4,5,6],显然,Leader 必须要变更,此时会发生 Reassign Partition Leader 选举。PreferredReplicaPartition Leader 选举:当你手动运行 kafka-preferred-replica-election 命令,或自动触发了 Preferred Leader 选举时,该类策略被激活。所谓的 Preferred Leader,指的是 AR 中的第一个副本。比如 AR 是[3,2,1],那么,Preferred Leader 就是 3。ControlledShutdownPartition Leader 选举:当 Broker 正常关闭时,该 Broker 上的所有 Leader 副本都会下线,因此,需要为受影响的分区执行相应的 Leader 选举。这 4 类选举策略的大致思想是类似的,即从 AR 中挑选首个在 ISR 中的副本,作为新 Leader。当然,个别策略有些微小差异。不过,回答到这种程度,应该足以应付面试官了。毕竟,微小差别对选举 Leader 这件事的影响很小。
七、.Kafka 的哪些场景中使用了零拷贝(Zero Copy)?
Zero Copy 是特别容易被问到的高阶题目。在 Kafka 中,体现 Zero Copy 使用场景的地方有两处:基于 mmap 的索引和日志文件读写所用的 TransportLayer。先说第一个。索引都是基于 MappedByteBuffer 的,也就是让用户态和内核态共享内核态的数据缓冲区,此时,数据不需要复制到用户态空间。不过,mmap 虽然避免了不必要的拷贝,但不一定就能保证很高的性能。在不同的操作系统下,mmap 的创建和销毁成本可能是不一样的。很高的创建和销毁开销会抵消 Zero Copy 带来的性能优势。由于这种不确定性,在 Kafka 中,只有索引应用了 mmap,最核心的日志并未使用 mmap 机制。再说第二个。TransportLayer 是 Kafka 传输层的接口。它的某个实现类使用了 FileChannel 的 transferTo 方法。该方法底层使用 sendfile 实现了 Zero Copy。对 Kafka 而言,如果 I/O 通道使用普通的 PLAINTEXT,那么,Kafka 就可以利用 Zero Copy 特性,直接将页缓存中的数据发送到网卡的 Buffer 中,避免中间的多次拷贝。相反,如果 I/O 通道启用了 SSL,那么,Kafka 便无法利用 Zero Copy 特性了。
八、Kafka 为什么不支持读写分离?
这道题目考察的是你对 Leader/Follower 模型的思考。Leader/Follower 模型并没有规定 Follower 副本不可以对外提供读服务。很多框架都是允许这么做的,只是 Kafka 最初为了避免不一致性的问题,而采用了让 Leader 统一提供服务的方式。不过,在开始回答这道题时,你可以率先亮出观点:自 Kafka 2.4 之后,Kafka 提供了有限度的读写分离,也就是说,Follower 副本能够对外提供读服务。说完这些之后,你可以再给出之前的版本不支持读写分离的理由。场景不适用。读写分离适用于那种读负载很大,而写操作相对不频繁的场景,可 Kafka 不属于这样的场景。同步机制。Kafka 采用 PULL 方式实现 Follower 的同步,因此,Follower 与 Leader 存在不一致性窗口。如果允许读 Follower 副本,就势必要处理消息滞后(Lagging)的问题。
九. Java Consumer 为什么采用单线程来获取消息?
在回答之前,如果先把这句话说出来,一定会加分:Java Consumer 是双线程的设计。一个线程是用户主线程,负责获取消息;另一个线程是心跳线程,负责向 Kafka 汇报消费者存活情况。将心跳单独放入专属的线程,能够有效地规避因消息处理速度慢而被视为下线的“假死”情况。单线程获取消息的设计能够避免阻塞式的消息获取方式。单线程轮询方式容易实现异步非阻塞式,这样便于将消费者扩展成支持实时流处理的操作算子。因为很多实时流处理操作算子都不能是阻塞式的。另外一个可能的好处是,可以简化代码的开发。多线程交互的代码是非常容易出错的。
十、简述 Follower 副本消息同步的完整流程
首先,Follower 发送 FETCH 请求给 Leader。接着,Leader 会读取底层日志文件中的消息数据,再更新它内存中的 Follower 副本的 LEO 值,更新为 FETCH 请求中的 fetchOffset 值。最后,尝试更新分区高水位值。Follower 接收到 FETCH 响应之后,会把消息写入到底层日志,接着更新 LEO 和 HW 值。Leader 和 Follower 的 HW 值更新时机是不同的,Follower 的 HW 更新永远落后于 Leader 的 HW。这种时间上的错配是造成各种不一致的原因。
十一、 Kafka丢失消息
为什么会丢消息?怎么防止丢消息?怎么发现丢消息?丢消息补偿
1、Kafka消息丢失的情况:
(1)auto.commit.enable=true,消费端自动提交offersets设置为true,当消费者拉到消息之后,还没有处理完 commit interval 提交间隔就到了,提交了offersets。这时consummer又挂了,重启后,从下一个offersets开始消费,之前的消息丢失了。
(2)网络负载高、磁盘很忙,写入失败,又没有设置消息重试,导致数据丢失。
(3)磁盘坏了已落盘数据丢失。
(4)单 批 数 据 的 长 度 超 过 限 制 会 丢 失 数 据 , 报kafka.common.Mess3.ageSizeTooLargeException异常
2、Kafka避免消息丢失的解决方案:
(1)设置auto.commit.enable=false,每次处理完手动提交。确保消息真的被消费并处理完成。
(2)kafka 一定要配置上消息重试的机制,并且重试的时间间隔一定要长一些,默认 1 秒钟不符合生产环境(网络中断时间有可能超过 1秒)。
(3)配置多个副本,保证数据的完整性。
(4)合理设置flush间隔。kafka 的数据一开始就是存储在 PageCache 上的,定期 flush 到磁盘上的,也就是说,不是每个消息都被存储在磁盘了,如果出现断电或者机器故障等,PageCache 上的数据就丢。可以通过 log.flush.interval.messages 和 log.flush.interval.ms 来 4.配置 flush 间隔,interval大丢的数据多些,小会影响性能但在 0.本,可以通过 replica机制保证数据不丢,代价就是需要更多资源,尤其是磁盘资源,kafka 当前支持 GZip 和 Snappy压缩,来缓解这个问题 是否使用 replica 取决于在可靠性和资源代价之间的 balance。
十二、 Kafka重复消费怎么处理,怎么去重,为什么会产生重复消息?
producer发出一条消息,broke落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后producer收到一个可恢复的Exception重试消息导致消息重复
1、取消自动自动提交
每次消费完或者程序退出时手动提交。这可能也没法保证一条重复。
2、下游做幂等
一般的解决方案是让下游做幂等或者尽量每消费一条消息都记录offset,对于少数严格的场景可能需要把offset或唯一ID
- Kafka怎么保证消息有序
分区有序