存储概述
Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响。
每个主题⼜可以分为⼀个或多个分区。
每个分区各⾃存在⼀个记录消息数据的日志文件。
图中,创建了⼀个 tp_demo_01 主题,其存在6个 Parition,对应的每个Parition下存在⼀个 [Topic-Parition] 命名的消息⽇志⽂件。在理想情况下,数据流量分摊到各个 Parition 中,实现了负载均衡的效果。在分区日志文件中,你会发现很多类型的⽂件,比如: .index、.timestamp、.log、.snapshot 等。
其中,文件名⼀致的⽂件集合就称为 LogSement。
LogSegment
- 分区日志⽂件中包含很多的 LogSegment
- Kafka 日志追加是顺序写⼊的
- LogSegment 可以减小日志文件的⼤小
- 进行日志删除的时候和数据查找的时候可以快速定位。
-
ActiveLogSegment 是活跃的日志分段,拥有⽂件拥有写⼊权限,其余的 LogSegment 只有只读的权限。
⽇志⽂件存在多种后缀⽂件,重点需要关注 .index、.timestamp、.log 三种类型。
类别作用
每个 LogSegment 都有⼀个基准偏移量,表示当前 LogSegment 中第⼀条消息的 offset。
偏移量是⼀个 64 位的⻓整形数,固定是20位数字,⻓度未达到,⽤ 0 进⾏填补,索引文件和日志文件都由该作为文件名命名规则(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。
如果日志文件名为 00000000000000000121.log ,则当前日志文件的⼀条数据偏移量就是 121(偏移量从 0 开始)。
日志与索引文件
配置项默认值说明
偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。
时间戳索引文件则根据时间戳查找对应的偏移量。
Kafka 中的索引文件是以稀疏索引的方式构造消息的索引,并不保证每⼀个消息在索引文件中都有对应的索引项。
每当写入⼀定量的消息时,偏移量索引文件和时间戳索引文件分别增加⼀个偏移量索引项和时间戳索引项。
通过修改 log.index.interval.bytes 的值,改变索引项的密度。
切分⽂件
当满⾜如下⼏个条件中的其中之⼀,就会触发文件的切分:
- 当前⽇志分段⽂件的⼤⼩超过了 broker 端参数 log.segment.bytes 配置的值。 log.segment.bytes 参数的默认值为 1073741824,即 1GB。
- 当前日志分段中消息的最⼤时间戳与当前系统的时间戳的差值⼤于 log.roll.ms 或 log.roll.hours 参数配置的值。如果同时配置了 log.roll.ms 和 log.roll.hours 参数,那么 log.roll.ms 的优先级高。默认情况下,只配置了 log.roll.hours 参数,其值为168,即 7 天。
- 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数 log.index.size.max.bytes 配置的值。 log.index.size.max.bytes 的默认值为 10485760,即 10MB。
- 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于 Integer.MAX_VALUE ,即要追加的消息的偏移量不能转变为相对偏移量。
为什么是 Integer.MAX_VALUE ?
1024 * 1024 * 1024=1073741824
在偏移量索引文件中,每个索引项共占⽤ 8 个字节,并分为两部分。
相对偏移量和物理地址。
相对偏移量:表示消息相对与基准偏移量的偏移量,占 4 个字节
物理地址:消息在日志分段文件中对应的物理位置,也占 4 个字节
4 个字节刚好对应 Integer.MAX_VALUE ,如果⼤于 Integer.MAX_VALUE ,则不能⽤ 4 个字节进行表示了。
索引文件切分过程
索引文件会根据 log.index.size.max.bytes 值进行预先分配空间,即文件创建的时候就是最大值
当真正的进行索引文件切分的时候,才会将其裁剪到实际数据大小的文件。
这⼀点是跟日志文件有所区别的地方。其意义降低了代码逻辑的复杂性。
索引
偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。时间戳索引文件则根据时间戳查找对应的偏移量。
文件:
查看⼀个topic分区目录下的内容,发现有log、index和timeindex三个⽂件:
- log文件名是以文件中第⼀条message的offset来命名的,实际offset长度是64位,但是这里只使用了20位,应付生产是足够的。
- ⼀组index+log+timeindex⽂件的名字是⼀样的,并且log文件默认写满1G后,会进⾏log rolling形成⼀个新的组合来记录消息,这个是通过broker端 log.segment.bytes =1073741824指定的。
-
index和timeindex在刚使⽤时会分配10M的大小,当进行 log rolling 后,它会修剪为实际的大小。
创建主题:
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic
tp_demo_05 --partitions 1 --replication-factor 1 --config segment.bytes=104857600
创建消息⽂件:
[root@node1 ~]# for i in `seq 10000000`; do echo "hello lagou $i" >> nmm.txt; done
将⽂本消息⽣产到主题中:
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_demo_05<nmm.txt
查看存储⽂件:
如果想查看这些文件,可以使⽤kafka提供的shell来完成,几个关键信息如下:
(1)offset是逐渐增加的整数,每个offset对应⼀个消息的偏移量。
(2)position:消息批字节数,用于计算物理地址。
(3)CreateTime:时间戳。
(4)magic:2代表这个消息类型是V2,如果是0则代表是V0类型,1代表V1类型。
(5)compresscodec:None说明没有指定压缩类型,kafka目前提供了4种可选择,0-None、1-GZIP、2-snappy、3-lz4。
(6)crc:对所有字段进行校验后的crc值。
[root@node1 tp_demo_05-0]# kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log | head
Dumping 00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 716 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 0
CreateTime: 1596513421661 isvalid: true size: 16380 magic: 2 compresscodec: NONE crc:
2973274901
baseOffset: 717 lastOffset: 1410 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 16380
CreateTime: 1596513421715 isvalid: true size: 16371 magic: 2 compresscodec: NONE crc:
1439993110
baseOffset: 1411 lastOffset: 2092 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 32751
CreateTime: 1596513421747 isvalid: true size: 16365 magic: 2 compresscodec: NONE crc:
3528903590
baseOffset: 2093 lastOffset: 2774 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 49116
CreateTime: 1596513421791 isvalid: true size: 16365 magic: 2 compresscodec: NONE crc:
763876977
baseOffset: 2775 lastOffset: 3456 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 65481
CreateTime: 1596513421795 isvalid: true size: 16365 magic: 2 compresscodec: NONE crc:
2218198476
baseOffset: 3457 lastOffset: 4138 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 81846
CreateTime: 1596513421798 isvalid: true size: 16365 magic: 2 compresscodec: NONE crc:
4018065070
baseOffset: 4139 lastOffset: 4820 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 98211
CreateTime: 1596513421802 isvalid: true size: 16365 magic: 2 compresscodec: NONE crc:
3073882858
baseOffset: 4821 lastOffset: 5502 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 114576
CreateTime: 1596513421819 isvalid: true size: 16365 magic: 2 compresscodec: NONE crc:
207330377
[root@node1 tp_demo_05-0]#
关于消息偏移量:
消息存储
- 消息内容保存在log⽇志⽂件中。
- 消息封装为Record,追加到log⽇志⽂件末尾,采⽤的是顺序写模式。
-
⼀个topic的不同分区,可认为是queue,顺序写⼊接收到的消息。
消费者有offset。下图中,消费者A消费的offset是9,消费者B消费的offset是11,不同的消费者offset是交给⼀个内部公共topic来记录的。
(3)时间戳索引⽂件,它的作用是可以让用户查询某个时间段内的消息,它⼀条数据的结构是时间戳(8byte) +相对offset(4byte),如果要使⽤这个索引文件,首先需要通过时间范围,找到对应的相对offset,然后再去对应的index文件找到position信息,然后才能遍历log文件,它也是需要使⽤上⾯说的index文件的。
但是由于producer⽣产消息可以指定消息的时间戳,这可能将导致消息的时间戳不一定有先后顺序,因此尽量不要生产消息时指定时间戳。
偏移量
- 位置索引保存在index文件中
- log日志默认每写⼊4K(log.index.interval.bytes设定的),会写⼊⼀条索引信息到index文件中,因此索引⽂件是稀疏索引,它不会为每条日志都建⽴索引信息。
- log文件中的日志,是顺序写⼊的,由message+实际offset+position组成
-
索引文件的数据结构则是由相对offset(4byte)+position(4byte)组成,由于保存的是相对第⼀个消息的相对offset,只需要4byte就可以了,可以节省空间,在实际查找后还需要计算回实际的offset,这对用户是透明的。
稀疏索引,索引密度不高,但是offset有序,⼆分查找的时间复杂度为O(lgN),如果从头遍历时间复杂度是O(N)。
示意图如下:
偏移量索引由相对偏移量和物理地址组成。
可以通过如下命令解析 .index ⽂件
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index --print-data-log | head
注意:offset 与 position 没有直接关系,因为会删除数据和清理⽇志。
[root@node1 tp_demo_05-0]# kafka-run-class.sh kafka.tools.DumpLogSegments --files
00000000000003925423.log --print-data-log | head
Dumping 00000000000003925423.log
Starting offset: 3925423
baseOffset: 3925423 lastOffset: 3926028 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 0
CreateTime: 1596513434779 isvalid: true size: 16359 magic: 2 compresscodec: NONE crc:
4049330741
baseOffset: 3926029 lastOffset: 3926634 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 16359
CreateTime: 1596513434786 isvalid: true size: 16359 magic: 2 compresscodec: NONE crc:
2290699169
baseOffset: 3926635 lastOffset: 3927240 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 32718
CreateTime: 1596513434787 isvalid: true size: 16359 magic: 2 compresscodec: NONE crc:
368995405
baseOffset: 3927241 lastOffset: 3927846 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 49077
CreateTime: 1596513434788 isvalid: true size: 16359 magic: 2 compresscodec: NONE crc:
143415655
baseOffset: 3927847 lastOffset: 3928452 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 65436
CreateTime: 1596513434789 isvalid: true size: 16359 magic: 2 compresscodec: NONE crc:
572340120
baseOffset: 3928453 lastOffset: 3929058 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 81795
CreateTime: 1596513434790 isvalid: true size: 16359 magic: 2 compresscodec: NONE crc:
1029643347
baseOffset: 3929059 lastOffset: 3929664 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 98154
CreateTime: 1596513434791 isvalid: true size: 16359 magic: 2 compresscodec: NONE crc:
2163818250
baseOffset: 3929665 lastOffset: 3930270 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 114513
CreateTime: 1596513434792 isvalid: true size: 16359 magic: 2 compresscodec: NONE crc:
3747213735
[root@node1 tp_demo_05-0]#
在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引文件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。在 Kafka 0.11.0.0 以后,消息元数据中存在若⼲的时间戳信息。如果 broker 端参数log.message.timestamp.type 设置为 LogAppendTIme ,那么时间戳必定能保持单调增⻓。反之如果是CreateTime 则⽆法保证顺序。
注意:timestamp文件中的 offset 与 index ⽂件中的 relativeOffset 不是⼀⼀对应的。因为数据的写⼊是各自追加。
思考:如何查看偏移量为23的消息?
Kafka 中存在⼀个 ConcurrentSkipListMap 来保存在每个日志分段,通过跳跃表方式,定位到在00000000000000000000.index ,通过二分法在偏移量索引文件中找到不⼤于 23 的最⼤索引项,即 offset 20 那栏,然后从⽇志分段⽂件中的物理位置为320 开始顺序查找偏移量为 23 的消息。
时间戳
在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引⽂件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。在 Kafka 0.11.0.0 以后,消息信息中存在若⼲的时间戳信息。
如果 broker 端参数log.message.timestamp.type 设置为 LogAppendTIme ,那么时间戳必定能保持单调增长。反之如果是CreateTime 则无法保证顺序。
通过时间戳方式进行查找消息,需要通过查找时间戳索引和偏移量索引两个文件。
时间戳索引索引格式:前⼋个字节表示时间戳,后四个字节表示偏移量。
思考:查找时间戳为 1557554753430 开始的消息?
- 查找该时间戳应该在哪个日志分段中。将1557554753430和每个日志分段中最⼤时间戳largestTimeStamp逐⼀对比,直到找到不小于1557554753430所对应的日志分段。日志分段中的largestTimeStamp的计算是:先查询该日志分段所对应时间戳索引文件,找到最后⼀条索引项,若最后⼀条索引项的时间戳字段值大于0,则取该值,否则取该日志分段的最近修改时间。
- 查找该日志分段的偏移量索引文件,查找该偏移量对应的物理地址。
- 日志文件中从 320 的物理位置开始查找不小于 1557554753430 数据。
注意:timestamp⽂件中的 offset 与 index 文件中的 relativeOffset 不是一一对应的,因为数据的写入是各自追加。
清理
Kafka 提供两种⽇志清理策略:
⽇志删除:按照⼀定的删除策略,将不满⾜条件的数据进⾏数据删除
⽇志压缩:针对每个消息的 Key 进⾏整合,对于有相同 Key 的不同 Value 值,只保留最后⼀个版本。
Kafka 提供 log.cleanup.policy 参数进⾏相应配置,默认值: delete ,还可以选择 compact 。
主题级别的配置项是 cleanup.policy 。
日志删除
基于时间
⽇志删除任务会根据 log.retention.hours/log.retention.minutes/log.retention.ms 设定⽇志保留的
时间节点。如果超过该设定值,就需要进⾏删除。默认是 7 天, log.retention.ms 优先级最⾼。
Kafka 依据⽇志分段中最⼤的时间戳进⾏定位。
⾸先要查询该⽇志分段所对应的时间戳索引⽂件,查找时间戳索引⽂件中最后⼀条索引项,若最后⼀条索引项的时间戳字段值⼤于 0,则取该值,否则取最近修改时间。
为什么不直接选最近修改时间呢?
因为日志文件可以有意⽆意的被修改,并不能真实的反应日志分段的最⼤时间信息。
删除过程
- 从⽇志对象中所维护⽇志分段的跳跃表中移除待删除的日志分段,保证没有线程对这些⽇志分段进⾏读取操
作。 - 这些⽇志分段所有⽂件添加 上 .delete 后缀。
- 交由⼀个以 "delete-file" 命名的延迟任务来删除这些 .delete 为后缀的⽂件。延迟执⾏时间可以通过
file.delete.delay.ms 进⾏设置
如果活跃的⽇志分段中也存在需要删除的数据时?
Kafka 会先切分出⼀个新的⽇志分段作为活跃⽇志分段,该⽇志分段不删除,删除原来的⽇志分段。
先腾出地⽅,再删除。
基于⽇志⼤⼩
⽇志删除任务会检查当前⽇志的⼤⼩是否超过设定值。设定项为 log.retention.bytes ,单个⽇志分段的⼤⼩由 log.segment.bytes 进⾏设定。
删除过程 - 计算需要被删除的⽇志总⼤⼩ (当前⽇志⽂件⼤⼩(所有分段)减去retention值)。
- 从⽇志⽂件第⼀个 LogSegment 开始查找可删除的⽇志分段的⽂件集合。
-
执⾏删除。
基于偏移量
根据⽇志分段的下⼀个⽇志分段的起始偏移量是否⼤于等于⽇志⽂件的起始偏移量,若是,则可以删除此⽇志分段。
注意:⽇志⽂件的起始偏移量并不⼀定等于第⼀个⽇志分段的基准偏移量,存在数据删除,可能与之相等的那条数据已经被删除了。
删除过程
- 从头开始遍历每个⽇志分段,⽇志分段1的下⼀个⽇志分段的起始偏移量为21,⼩于logStartOffset,将⽇志
分段1加⼊到删除队列中 - ⽇志分段 2 的下⼀个⽇志分段的起始偏移量为35,⼩于 logStartOffset,将 ⽇志分段 2 加⼊到删除队列中
- ⽇志分段 3 的下⼀个⽇志分段的起始偏移量为57,⼩于logStartOffset,将⽇志分段3加⼊删除集合中
- ⽇志分段4的下⼀个⽇志分段的其实偏移量为71,⼤于logStartOffset,则不进⾏删除。
⽇志压缩策略
1. 概念
⽇志压缩是Kafka的⼀种机制,可以提供较为细粒度的记录保留,⽽不是基于粗粒度的基于时间的保留。
对于具有相同的Key,⽽数据不同,只保留最后⼀条数据,前⾯的数据在合适的情况下删除。
2. 应⽤场景
⽇志压缩特性,就实时计算来说,可以在异常容灾⽅⾯有很好的应⽤途径。⽐如,我们在Spark、Flink中做实时
计算时,需要⻓期在内存⾥⾯维护⼀些数据,这些数据可能是通过聚合了⼀天或者⼀周的⽇志得到的,这些数据⼀旦
由于异常因素(内存、⽹络、磁盘等)崩溃了,从头开始计算需要很⻓的时间。⼀个⽐较有效可⾏的⽅式就是定时将
内存⾥的数据备份到外部存储介质中,当崩溃出现时,再从外部存储介质中恢复并继续计算。
使⽤⽇志压缩来替代这些外部存储有哪些优势及好处呢?这⾥为⼤家列举并总结了⼏点:
Kafka即是数据源⼜是存储⼯具,可以简化技术栈,降低维护成本
使⽤外部存储介质的话,需要将存储的Key记录下来,恢复的时候再使⽤这些Key将数据取回,实现起来有⼀定的⼯程难度和复杂度。使⽤Kafka的⽇志压缩特性,只需要把数据写进Kafka,等异常出现恢复任务时再读
回到内存就可以了
Kafka对于磁盘的读写做了⼤量的优化⼯作,⽐如磁盘顺序读写。相对于外部存储介质没有索引查询等⼯作
量的负担,可以实现⾼性能。同时,Kafka的⽇志压缩机制可以充分利⽤廉价的磁盘,不⽤依赖昂贵的内存
来处理,在性能相似的情况下,实现⾮常⾼的性价⽐(这个观点仅仅针对于异常处理和容灾的场景来说)
⽇志压缩⽅式的实现细节
主题的 cleanup.policy 需要设置为compact。
Kafka的后台线程会定时将Topic遍历两次:
- 记录每个key的hash值最后⼀次出现的偏移量
-
第⼆次检查每个offset对应的Key是否在后⾯的⽇志中出现过,如果出现了就删除对应的⽇志。
⽇志压缩允许删除,除最后⼀个key之外,删除先前出现的所有该key对应的记录。在⼀段时间后从⽇志中清理,
以释放空间。
注意:⽇志压缩与key有关,确保每个消息的key不为null。
压缩是在Kafka后台通过定时重新打开Segment来完成的,Segment的压缩细节如下图所示:
⽇志压缩可以确保:
- 任何保持在⽇志头部以内的使⽤者都将看到所写的每条消息,这些消息将具有顺序偏移量。可以使⽤Topic
的min.compaction.lag.ms属性来保证消息在被压缩之前必须经过的最短时间。也就是说,它为每个消息在
(未压缩)头部停留的时间提供了⼀个下限。可以使⽤Topic的max.compaction.lag.ms属性来保证从收到
消息到消息符合压缩条件之间的最⼤延时
消息始终保持顺序,压缩永远不会重新排序消息,只是删除⼀些⽽已
消息的偏移量永远不会改变,它是⽇志中位置的永久标识符
从⽇志开始的任何使⽤者将⾄少看到所有记录的最终状态,按记录的顺序写⼊。另外,如果使⽤者在⽐
Topic的log.cleaner.delete.retention.ms短的时间内到达⽇志的头部,则会看到已删除记录的所有delete标
记。保留时间默认是24⼩时。
默认情况下,启动⽇志清理器,若需要启动特定Topic的⽇志清理,请添加特定的属性。配置⽇志清理器,这⾥为
⼤家总结了以下⼏点: - log.cleanup.policy 设置为 compact ,Broker的配置,影响集群中所有的Topic。
- log.cleaner.min.compaction.lag.ms ,⽤于防⽌对更新超过最⼩消息进⾏压缩,如果没有设置,除最
后⼀个Segment之外,所有Segment都有资格进⾏压缩
log.cleaner.max.compaction.lag.ms ,⽤于防⽌低⽣产速率的⽇志在⽆限制的时间内不压缩。
Kafka的⽇志压缩原理并不复杂,就是定时把所有的⽇志读取两遍,写⼀遍,⽽CPU的速度超过磁盘完全不是问
题,只要⽇志的量对应的读取两遍和写⼊⼀遍的时间在可接受的范围内,那么它的性能就是可以接受的。