怎么确保produce(生产者不丢数据)
答:将ack机制设置为all,但是效率会降低。ack机制流程图如下:
讲一讲kafka的ack三种机制
request.required.acks有三个值:0,1,-1
0表示生产者不会等待broker的ack,这个延迟最低但是存储的保证最弱,当server挂掉的时候数据就会丢失
1表示生产者会等待ack值leader副本确认接收此消息后发送ack,但是如果leader挂掉后它不确保是否复制完成新leader也会丢失数据。
-1表示all,服务端会等待所有follower的副本收到数据后,才会收到leader发送出的ack,保证数据不丢失。
kafka与传统MQ消息系统之间有三个关键区别:
1.kafka持久化日志,这些日志可以重复读取和无限期保留
2.kafka是一个分布式系统,以集群的方式运行,可以灵活伸缩,内部采用复制数据提升了容错性和高可用性
3.kafka支持实时流式处理
kafka可以脱离zookper单独使用吗,为什么
不能,因为kafka使用zookper管理和协调kafka的节点服务器。
kafka有几种数据保留策略
两种保留策略:按照过期时间保留 。按照存储消息大小保留
kafka同时设置了7天和10G清除数据,到第五天消息达到10G,这个时候kafka将如何处理:
kafka会将数据清理,时间个大小不论哪个满足条件,都会清空数据。
什么情况会导致kafka变慢:
cpu瓶颈,磁盘读写瓶颈,网络瓶颈
使用kafka集群时需要注意什么:
集群的数量不是越多越好,最好不要超过7个。
因为节点越多,消息复制时间越长,整个群组吞吐量就降低。
集群数量最好是单数,因为超过一半故障的集群就不能用了,设置为单数容错性更高。
关于零拷贝:
1、应用程序中调用read()
方法,这里会涉及到一次上下文切换(用户态->内核态),底层采用DMA(direct memory access)读取磁盘的文件,并把内容存储到内核地址空间的读取缓存区。
2、由于应用程序无法读取内核地址空间的数据,如果应用程序要操作这些数据,必须把这些内容从读取缓冲区拷贝到用户缓冲区。这个时候,read()
调用返回,且引发一次上下文切换(内核态->用户态),现在数据已经被拷贝到了用户地址空间缓冲区,这时,如果有需要,应用程序可以操作修改这些内容。
3、我们最终目的是把这个文件内容通过Socket传到另一个服务中,调用Socket的send()
方法,这里又涉及到一次上下文切换(用户态->内核态),同时,文件内容被进行第三次拷贝,被再次拷贝到内核地址空间缓冲区,但是这次的缓冲区与目标套接字相关联,与读取缓冲区没有半点关系。
4、send()
调用返回,引发第四次的上下文切换,同时进行第四次的数据拷贝,通过DMA把数据从目标套接字相关的缓存区传到协议引擎进行发送。
"在整个过程中,过程1和4是由DMA负责,并不会消耗CPU,只有过程2和3的拷贝需要CPU参与,整明白了?"
"我消化一下..."
半小时后...
"狼哥,这个过程,感觉好几次的数据拷贝都是多余的,很影响性能啊"
"对,所以才有了零拷贝技术"
"具体咋实现?"
"慢慢来,如果在应用程序中,不需要操作内容,过程2和3就是多余的,如果可以直接把内核态读取缓存冲区数据直接拷贝到套接字相关的缓存区,是不是可以达到优化的目的?"
这种实现,可以有以下几点改进:
- 上下文切换的次数从四次减少到了两次
- 数据拷贝次数从四次减少到了三次(其中DMA copy 2次,CPU copy 1次)
"怎么实现?"
"在Java中,正好FileChannel的transferTo() 方法可以实现这个过程,该方法将数据从文件通道传输到给定的可写字节通道, 上面的file.read()
和 socket.send()
调用动作可以替换为 transferTo()
调用"
public void transferTo(long position, long count, WritableByteChannel target);
在 UNIX 和各种 Linux 系统中,此调用被传递到 sendfile()
系统调用中,最终实现将数据从一个文件描述符传输到了另一个文件描述符。
"确实改善了很多,但还没达到零拷贝的要求,还有其它黑技术吗?"
"对的,如果底层网络接口卡支持收集操作的话,就可以进一步的优化。"
"怎么优化?"
在 Linux 内核 2.4 及后期版本中,针对套接字缓冲区描述符做了相应调整,DMA自带了收集功能,对于用户方面,用法还是一样的,但是内部操作已经发生了改变:
- 第一步,transferTo() 方法引发 DMA 将文件内容拷贝到内核读取缓冲区。
- 第二步,把包含数据位置和长度信息的描述符追加到套接字缓冲区,避免了内容整体的拷贝,DMA 引擎直接把数据从内核缓冲区传到协议引擎,从而消除了最后一次 CPU参与的拷贝动作。
Kafka中的HW、LEO、LSO等分别代表什么?
1.这些其实跟ISR有着紧密的关系,ISR(In-Sync Replicas )简单说,就是为了 保证数据的一致性使用了ISR机制。是一个副本的列表,所有与leader副本保持一定程度同步的副本(包括leader)组成ISR.
2.HW (High Watermark)俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。
下图表示一个日志文件,这个日志文件中只有9条消息,第一条消息的offset(LogStartOffset)为0,最有一条消息的offset为8,offset为9的消息使用虚线表示的,代表下一条待写入的消息。日志文件的 HW 为6,表示消费者只能拉取offset在 0 到 5 之间的消息,offset为6的消息对消费者而言是不可见的。
LEO (Log End Offset),标识当前日志文件中下一条待写入的消息的offset。上图中offset为9的位置即为当前日志文件的 LEO,LEO 的大小相当于当前日志分区中最后一条消息的offset值加1.分区 ISR 集合中的每个副本都会维护自身的 LEO ,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息。
LSO 特指LastStableOffset。它具体与kafka的事物有关。
kafka有哪些情形会造成重复消费?或丢失信息?
先处理后提交offset,会造成重读消费
先提交offset后处理,会造成数据丢失
kafka避免重复消费,或丢失信息
- 丢包问题
1.1 问题描述
所谓丢包一般是指发送方发送的数据未到达接收方. 常见的丢包可能发生在发送端, 网络,接收端.
例如,消息推送服务,每天早上,手机上各终端都会给用户推送消息,这时候流量剧增,可能会出现kafka发送数据过快,导致服务器网卡爆满,或者磁盘处于繁忙状态,可能会出现丢包现象。
1.2 问题解决
解决方案:
对kafka进行限速,平滑流量
启用重试机制,重试间隔时间设置长一些
Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功。
检测方法:使用重放机制,查看问题所在。
2.重发问题
2.1 问题描述
重发问题:当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。
问题场景:
1.设置offset为自动提交,正在消费数据,kill消费者线程;
2.设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费;
3.消费kafka与业务逻辑在一个线程中处理,可能出现消费程序业务处理逻辑阻塞超时,导致一个周期内,offset还未提交;继而重复消费,但是业务逻辑可能采用发送kafka或者其他无法回滚的方式;
2.2 问题分析
底层根本原因:已经消费了数据,但是offset没提交。
配置问题:设置了offset自动提交
重复消费最常见的情况:re-balance问题,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-balance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
2.3 问题解决
解决办法:至少成功发送一次+去重操作(幂等性)
2.3.1 如何保证至少成功发送一次
保证不丢失消息:
生产者(ack=all 代表至少成功发送一次)
消费者 (offset手动提交,业务逻辑成功处理后,提交offset)
2.3.2 去重操作(幂等性)
去重问题:消息可以使用唯一id标识
保证不重复消费:落表(主键或者唯一索引的方式,避免重复数据)
业务逻辑处理(选择唯一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)