1.kafka的框架设计
Kafka是分布式发布-订阅消息系统,它最初是由LinkedIn公司开发的,之后成为Apache项目的一部分,Kafka是一个分布式,可划分的,冗余备份的持久性的日志服务,它主要用于处理流式数据。
为什么要使用 kafka,为什么要使用消息队列
- 缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
- 解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。
- 冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。
- 健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。
- 异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
- broker:Kafka 服务器,负责消息存储和转发。broker 是消息的代理,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉取指定Topic的消息,然后进行业务处理,broker在中间起到一个代理保存消息的中转站。
- topic:消息类别,Kafka 按照 topic 来分类消息
- partition:为了实现扩展性,提高并发能力,一个topic以多个partition的方式,分布到多个broker上,每个partition是一个有序的队列。一个topic的每个partition都有若干副本(reoplic),一个Leader和若干个Follower,生产者发送数据的对象,消费者消费数据的对象,都是leader。Follower负责实时从leader同步数据,保持和leader的数据同步,leader发送故障时,某个Follower会成为新的leader。
- offset:消息在日志中的位置,可以理解是消息在 partition 上的偏移量,也是代表该消息的唯一序号
- Producer:消息生产者
- Consumer:消息消费者
- Consumer Group:消费者分组,每个 Consumer 必须属于一个 group,消费者组内的消费者负责消费不同分区的的数据,提高消费能力,逻辑上的一个订阅者。在 Kafka 中,消费者组是一个由多个消费者实例 构成的组。多个实例共同订阅若干个主题,实现共同消费。同一个组下的每个实例都配置有 相同的组 ID,被分配不同的订阅分区。当某个实例挂掉的时候,其他实例会自动地承担起 它负责消费的分区。
- Zookeeper:保存着集群 broker、topic、partition 等 meta 数据;另外,还负责 broker 故障发现,partition leader 选举,负载均衡等功能
2.kafka什么情况下出现消息丢失及解决方案
消息发送
- ack=0,不重试
producer发送消息完,不管结果了,如果发送失败消息丢失
2.ack=1,leader crash
priducer发送消息完,只等leader写完就返回,leader crash,follower没来得及同步导致消息丢失
3.unclean.leader.election.enable = true
允许选举ISR以外的副本作为leader,会导致消息丢失,默认为false,producer发送完异步消息,只等leader写入就返回,leader crash后,这时ISR(In-Sync Replicas)中没有follower,leader从OSR(Out-Sync Replicas)中选举,因为OSR中本来落后于leader导致消息丢失
解决方案
1.配置ack=all/-1,tries > 1,unclean.leader.election.enable = false
producer发送完消息,等待follower同步完成再返回,如果异常则重试,副本数量可能影响吞吐量
不允许选举ISR的副本作为leader
2.配置min.insync.replicas>1
副本指定必须写操作成功的最小副本数量,如果不能满足这个最小值,则生产者引发一个异常(NotEnoughReplicash或者NotEnoughReplicashAfterAppend)
消费
先commit再处理消息,如果处理消息的时候异常了,但offset已经提交了,这条消息对于消费者来说丢失了
broker的刷盘
减少刷盘的间隔
kafka如何保证不重复消费又不丢失数据
1.必须要求至少一个 Follower 在 ISR 列表里。
2.第二条,每次写入数据的时候,要求 Leader 写入成功以外,至少一个 ISR 里的 Follower 也写成功。
3.pull和push的区别
pull模式
- 根据Consumer消费能力进行拉取
- 可以批量拉取,也可以单条拉取
- 可以设置不同的提交方式,实现不同的传输语义
缺点:如果kafka没有数据,会导致Consumer空循环,消耗资源
解决:通过设置参数,Consumer拉取数据为空或者没有达到一定数量时候进行阻塞
push模式
缺点:速率固定,忽略了consumer的消费能力,可能导致拒绝服务或者网络阻塞等情况
3.kafka中zookeeper的作用
1.Broker注册 Broker是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来 /brokers/ids
2.Topic注册 在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护 /borkers/topics
3.生产者负载均衡 由于同一个Topic消息会被分区并将其分布在多个Broker上,因此,生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。
4.消费者负载均衡 与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。
5.分区与消费者 的关系 在Kafka中,规定了每个消息分区 只能被同组的一个消费者进行消费,因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id]就是一个 消息分区 的标识,节点内容就是该 消息分区 上 消费者的Consumer ID。
6.消息消费进度Offset 记录 在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
节点内容就是Offset的值。
7.消费者注册 消费者服务器在初始化启动时加入消费者分组的步骤如下
注册到消费者分组。每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消费者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。
- 对 消费者分组 中的 消费者 的变化注册监听
- 对Broker服务器变化注册监听
-
进行消费者负载均衡
4.kafka高性能的原因
Kafka不基于内存,而是硬盘存储,因此消息堆积能力更强
1.顺序写磁盘(相比磁盘的随机写快很多)。如果你是追加文件末尾按照顺序的方式来写数据的话,那么这种磁盘顺序写的性能基本上可以跟写内存的性能本身也是差不多的。
2.利用Page Cache(页高速缓冲存储器,简称页高缓)空中接力的方式来实现高效读写,操作系统本身有一层缓存,叫做page cache,是在内存里的缓存,我们也可以称之为os cache,意思就是操作系统自己管理的缓存。原理就是Page Cache可以把磁盘中的数据缓存到内存中,把对磁盘的访问改为对内存的访问。
3.零拷贝 零拷贝技术是一种避免CPU将数据从一块存储拷贝到另一块存储的技术。Kafka使用零拷贝技术将数据直接从磁盘复制到网卡设备缓冲区中,而不需要经过应用程序的转发。
通常应用程序将磁盘上的数据传送至网卡需要经过4步:
-调用read(),将数据从磁盘复制到内核模式的缓冲区;
-CPU会将数据从内核模式复制到用户模式下的缓冲区;
-调用write(),将数据从用户模式下复制到内核模式下的Socket缓冲区;
-将数据从内核模式的Socket缓冲区复制到网卡设备。
上面的步骤中,第2、3步将数据从内核模式经过用户模式再绕回内核模式,浪费了两次复制过程。采用零拷贝技术,Kafka可以直接请求内核把磁盘中的数据复制到Socket缓冲区,而不用再经过用户模式
5.kafka的rebalance机制
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有consumer如何达成一致,来分配订阅 Topic 的每个分区。
Rebalance 的触发条件有3个
- 组成员个数发生变化。例如有新的 consumer 实例加入该消费组或者离开组。
- 订阅的 Topic 个数发生变化。
- 订阅 Topic 的分区数发生变化。
Rebalance 发生时,Group 下所有 consumer 实例都会协调在一起共同参与,kafka 能够保证尽量达到最公平的分配。但是 Rebalance 过程对 consumer group 会造成比较严重的影响。在 Rebalance 的过程中 consumer group 下的所有消费者实例都会停止工作,等待 Rebalance 过程完成。
GroupCoordinator(协调者):协调消费者组完成消费者Rebalance的重要组件,每一个broker都会启动一个GroupCoodinator,Kafka 按照消费者组的名称将其分配给对应的GroupCoodinator进行管理;每一个GroupCoodinator只负责管理一部分消费者组,而非集群中全部的消费者组。通常是partition的leader节点的broker
- Coordinator通过心跳返回通知consumer 进行rebalance
- consumer通知Coordinator加入组,Coordinator选举产生leader consumer
- leader consumer从Coordinator获得所有的consumer,发送syncGroup(分配信息)给到Coordinator
- Coordinator通过心跳机制将syncGroup发送到consumer
- 完成rebalance
leader consumer监控topic变化,通知Coordinator进行rebalance
如果C1消费消息超时,出入rebalance,重新分配后该消息被其他消费者消费,此时C1消费完成提交offset,导致错误
解决:Coordinator每次进行rebalance,会标记一个generation给consumer,每次rebalance该generation会+1,consumer提交offset时,会对比generation,不一致则拒绝提交。
6.Kafka中的ISR、AR又代表什么?ISR的伸缩又指什么
ISR:In-Sync Replicas 副本同步队列
AR:Assigned Replicas 所有副本
ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。
7.Kafka中位移(offset)的作用
在 Kafka 中,每个 主题分区下的每条消息都被赋予了一个唯一的 ID 数值,用于标识它在分区中的位置。这个 ID 数值,就被称为位移,或者叫偏移量。一旦消息被写入到分区日志,它的位移值将不能被修改。
auto.offset.reset:消费规则,默认earliest 。
earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
自动提交偏移量:
Kafka中偏移量的自动提交是由参数enable_auto_commit和auto_commit_interval_ms控制的,当enable_auto_commit=True时,Kafka在消费的过程中会以频率为auto_commit_interval_ms向Kafka自带的topic(__consumer_offsets)进行偏移量提交,具体提交到哪个Partation是以算法:partation=hash(group_id)%50来计算的。手动提交偏移量:
鉴于Kafka自动提交offset的不灵活性和不精确性(只能是按指定频率的提交),Kafka提供了手动提交offset策略。手动提交能对偏移量更加灵活精准地控制,以保证消息不被重复消费以及消息不被丢失。
对于手动提交offset主要有3种方式:1.同步提交 2.异步提交+回调函数 3.异步+同步 组合的方式提交
8.Kafka中的领导者副本(Leader Replica)和追随者副本 (Follower Replica)的区别
Kafka 副本当前分为领导者副本和追随者副本。只有Leader副本才能 对外提供读写服务,响应Clients端的请求。Follower 副本只是采用拉(PULL)的方 式,被动地同步Leader副本中的数据,并且在Leader副本所在的 Broker 宕机后,随时准备应聘 Leader 副本。
9.分区 Leader 选举策略有几种?
kafka在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比Zookeeper Queue的方式更高效)通知需为此作出响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。
分区的 Leader 副本选举对用户是完全透明的,它是由 Controller 独立完成的。你需要回答的是,在哪些场景下,需要执行分区 Leader 选举。每一种场景对应于一种选举策略。当前,Kafka 有 4 种分区 Leader 选举策略。
- OfflinePartition Leader 选举:每当有分区上线时,就需要执行 Leader 选举。所谓的分区上线,可能是创建了新分区,也可能是之前的下线分区重新上线。这是最常见的分区 Leader 选举场景。
- ReassignPartition Leader 选举:当你手动运行kafka-reassign-partitions命令,或者是调用 Admin的 alterPartitionReassignments 方法执行分区副本重分配时,可能触发此类选举。假设原来的 AR 是[1,2,3],Leader 是 1,当执行副本重分配后,副本集 合 AR 被设置成[4,5,6],显然,Leader 必须要变更,此时会发生 Reassign Partition Leader 选举。
- PreferredReplicaPartition Leader 选举:当你手动运行 kafka-preferred-replica- election 命令,或自动触发了 Preferred Leader 选举时,该类策略被激活。所谓的 Preferred Leader,指的是 AR 中的第一个副本。比如 AR 是[3,2,1],那么, Preferred Leader 就是 3。
- ControlledShutdownPartition Leader 选举:当 Broker 正常关闭时,该 Broker 上 的所有 Leader 副本都会下线,因此,需要为受影响的分区执行相应的 Leader 选举。
10.kafka的集群特性
集群 partition 备份 Kafka 支持设置针对每个 partition 备份,可以将 partition 备份到不同的 broker 上,其中 leader partition 负责读写,其他 follower 仅负责同步,当 leader 挂掉后会从 follower 中选取新的 leader 。
消息消费顺序 一个 partition 同一时刻在一个 consumer group 中只能有一个 consumer 实例在消费,从而保证了消费顺序。consumer group 中的 consumer 实例的数量不能比一个 topic 中的 partition 的数量多,否则,多出来的 consumer 无法消费到消息。Kafka 的消息在单个 partition 上是可以保证顺序的,但是在整体上无法保证顺序消费
消息消费模式 关于消费模式,Kafka 通过 消费组的概念可以灵活设置。如常见的 队列模式 即 所有的 consumer 在同一个 consumer group 下。发布订阅模式 则设置多个 consumer group 进行消费即可
11.kafka消息的确认机制
acks:消息的确认机制,默认值是0。
acks=0:如果设置为0,生产者不会等待kafka的响应。
acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。
acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。