kafka
kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。Kafka就是一种发布-订阅模式。将消息保存在磁盘中,以顺序读写方式访问磁盘,避免随机读写导致性能瓶颈。
kafka对外使用topic的概念,生产者往topic里写消息,消费者从读消息。为了做到水平扩展,一个topic实际是由多个partition组成的,遇到瓶颈时,可以通过增加partition的数量来进行横向扩容。单个parition内是保证消息有序。
1.Producer
生产者即数据的发布者,创建一条记录,记录中一个要指定对应的topic和value,key和partition可选。 先序列化,然后按照topic和partition,放进对应的发送队列中。kafka produce都是批量请求,会积攒一批,然后一起发送,不是调send()就进行立刻进行网络发包。
该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。1)key有填: 按照key进行哈希,相同key去一个partition。(如果扩展了partition的数量那么就不能保证了);2)key没填:round-robin来选partition。
2.Topic
一个Topic可以认为是一类消息,每个topic将被分成多个partition(区),每个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它是唯一标记一条消息。它唯一的标记一条消息。kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中几乎不允许对消息进行“随机读写”。
3.Partition(leader, follower)
当存在多副本的情况下,会尽量把多个副本,分配到不同的broker上。kafka会为partition选出一个leader,之后所有该partition的请求,实际操作的都是leader,然后再同步到其他的follower。follower只是单调的和leader跟进,同步消息即可。当一个broker歇菜后,所有leader在该broker上的partition都会重新选举,选出一个leader。
leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从“in sync replicas”(ISR)同步列表中删除.当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它。
一个partition内消息是有序的,但不保证多个partition之间的数据有序。同一topic下的不同分区包含的消息不同。
多副本同步:生产者生产消息的时候,通过request.required.acks参数来设置数据的可靠性。
4.Broker
Kafka 集群包含一个或多个服务器,服务器节点称为broker。负责消息的读取和存储,接受生产者发过来的消息,分配offser,保存到磁盘中。
broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
文件组织:
kafka的数据,实际上是以文件的形式存储在文件系统的。topic下有partition,partition下有segment,segment是实际的一个个文件,topic和partition都是抽象概念。
在目录/${topicName}-{$partitionid}/下,存储着实际的log文件(即segment),还有对应的索引文件。
每个segment文件大小相等,文件名以这个segment中最小的offset命名,文件扩展名是.log;segment对应的索引的文件名字一样,扩展名是.index。有两个index文件,一个是offset index用于按offset去查message,一个是time index用于按照时间去查,其实这里可以优化合到一起,下面只说offset index。
5.Controller(broker)
关于partition的分配,还有leader的选举,总得有个执行者。在kafka中,这个执行者就叫controller。kafka使用zookeeper在broker中选出一个controller,用于partition分配和leader选举。
partition的分配
·将所有Broker(假设共n个Broker)和待分配的Partition排序
·将第i个Partition分配到第(i mod n)个Broker上 (这个就是leader)
·将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
leader容灾
controller会在Zookeeper的/brokers/ids节点上注册Watch,一旦有broker宕机,它就能知道。当broker宕机后,controller就会给受到影响的partition选出新leader。controller从zk的/brokers/topics/[topic]/partitions/[partition]/state中,读取对应partition的ISR(in-sync replica已同步的副本)列表(跟leader有一定时差),选一个出来做leader。选出leader后,更新zk,然后发送LeaderAndISRRequest给受影响的broker,让它们改变知道这事。为什么这里不是使用zk通知,而是直接给broker发送rpc请求,我的理解可能是这样做zk有性能问题吧。
如果ISR列表是空,那么会根据配置,随便选一个replica做leader,或者干脆这个partition就是歇菜。如果ISR列表的有机器,但是也歇菜了,那么还可以等ISR的机器活过来。
6.消费(coordinator)
订阅topic是以一个消费组来订阅的,一个消费组里面可以有多个消费者(可以向consumer group中增加新的 consumer,来水平扩展消费能力。)。同一个消费组中的两个消费者,不会同时消费一个partition。换句话来说,就是一个partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。因此,如果消费组内的消费者如果比partition多的话,那么就会有个别消费者一直空闲。
生产过程中broker要分配partition,消费过程这里,也要分配partition给消费者。类似broker中选了一个controller出来,消费也要从broker中选一个coordinator,用于分配partition。
7.zookeeper
kafka集群几乎不需要维护任何consumer和producer状态信息,这些信息有zookeeper保存;因此producer和consumer的客户端实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响。
Zookeeper主要用于集群中不同节点之间的通信。进行leader检测、分布式同步、配置管理、识别新节点何时离开或连接(通过心跳机制检查每个节点的连接)、集群节点实时状态等。
当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息。每个consumer客户端被创建时,会向zookeeper注册自己的信息;此作用主要是为了"负载均衡".
1) Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每个partition leader建立socket连接并发送消息.
2) Broker端使用zookeeper用来注册broker信息,已经监测partitionleader存活性.
3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息.
8.kafka支持3种消息投递语义
`At most once:最多一次,消息可能会丢失,但不会重复。先获取数据,再commit offset,最后进行业务处理。
`At least once:最少一次,消息不会丢失,可能会重复。先获取数据,再进行业务处理,业务处理成功后commit offset。(在业务中,常常都是使用At least once的模型)
`Exactly once:只且一次,消息不丢失不重复,只且消费一次(0.11中实现,仅限于下游也是kafka)
9.其他面试题目:
1.Kafka与传统消息系统之间的区别:
传统的消息传递方法包括两种:
·排队:在队列中,一组用户可以从服务器中读取消息,每条消息都发送给其中一个人。
·发布-订阅:在这个模型中,消息被广播给所有的用户
优势:
1)快速
2)持久跟容错性:消息是持久性的,并在集群中进行复制,以防止数据丢失。
3)可伸缩性:是一个分布式系统,以集群凡是群星,可以灵活伸缩。
2.Kafka服务器能接收到的最大信息是多少?
Kafka服务器可以接收到的消息的最大大小是1000000字节。
3.Kafka的零拷贝技术
传统读取文件发送到网络的过程,会复制4次。
(1)操作系统将数据从磁盘文件中读取到内核空间的页面缓存;(2)应用程序将数据从内核空间读入用户空间缓冲区;(3)应用程序将读到数据写回内核空间并放入socket缓冲区;(4)操作系统将数据从socket缓冲区复制到网卡接口,此时数据才能通过网络发送。
Kafka,保存在磁盘上的数据格式与从生产者发送过来或者发送给消费者的消息格式是一样的,因为使用了相同的消息格式进行磁盘存储和网络传输,Kafka可以使用零复制技术将消息直接发送给消费者。“零拷贝技术”只用将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面缓存直接发送到网络中(发送给不同的订阅者时,都可以使用同一个页面缓存),避免了重复复制操作。
消息体包括键、值、偏移量、消息大小、校验和CRC、消息版本号、压缩算法(Snappy、GZip或LZ4)和时间戳。时间戳是可配置的,可以是生产者发送消息的时间,也可以是消息到达broker的时间。