Kafka高性能高吞吐的原因:
kafka是作为消息中间件、流处理平台、消息引擎性能吞吐量均是是目前市面第一,得益于它的消息读写的方式
一、延时批量发送消息
Kafka不会一有消息就会进行发送,而延迟到固定的时间,生产者可以将多个发送到同一个分区的消息放到同一个批次内积累起来,当批次大小填满(通过batch.size设置批次大小)或者是到达了超时时间(通过linger.ms设置超时时间)就会进行发送,通过批量的发送减小了发送的次数,有效的减少网络消耗,这也是kafka高吞吐的主要原因。
二、页缓存
kafka的写入和读取都是先通过操作页缓存,页缓存操作的是内存,写时kafka只会将消息写入页缓存,然后由操作系统决定什么时候将数据写入磁盘,读消息时kafka也是先到页缓存中捞数据,命中则直接返回,大部分情况kafka都能从也缓存中读取到数据,所以kafka的读取和写入都是操作内存速度非常快,得益于JVM对象的朝生夕死,大部分的情况下,kafka只需要6G的JVM内存就可以完成正常的工作,其他的内存可以分配给kafka的页缓存
三、磁盘顺序读写
机械磁盘顺序读写比内存的随机读写和固态硬盘的顺序读写还要快,所以kafka写入消息的操作是采用的追加写入(append)的方式进行,即只能在日志文件的末尾追加写入新的消息,并且不能修改已经写入的消息,这就是kafka的对磁盘的顺序访问,可以很轻松的实现每秒几十万消息的写入量,这是kafka高性能的重要原因四、sendfile零拷贝
kafka读取消息先是从缓存页中读取消息,如果命中就直接将消息经缓存页交由网络的socket缓存中进行发送,不必经过用户态和内核态的切换,节省了内核缓冲区与用户态应用程序缓冲区之间的数据拷贝的消耗,减少了CPU开销,这是kafka高吞吐高性能的主要原因五、IO模型
Kafka 客户端底层使用了 Java的 selector,selector 在 Linux 上的实现机制是 epoll
六、分区机制
kafka通过将Topic进行拆分,分成一个个partition,partition的leader通常会均匀的分配到不同的broker上,这样将消息的读写压力分配给了集群的各个broker上,充分的利用了集群的资源
Kafka负载均衡和故障转移:
kafka通过将Topic进行拆分,分成一个个partition,通常还会对partition创建replica(replica数量不能超过broker的数量-1),每个partition leader和他们的replica会组成一个ISR,ISR中有leader和follower的角色,follower作为leader的备份负责将partition leader复制备份,leader负责处理客户端请求,ISR里的每个成员会均匀的落到broker中,ISR中的每个成员的id是所在当前broker的编号,也就是说同一个ISR的成员不能在同一个broker中,这样就避免了一个broker挂了,其他broker上的follower会顶替挂掉的broker上的leader,所以ISR的成员要分布在不同的broker中。Kafka还会将同一个Tocpic下的partition中作为leader的partition均匀的分配到不同的broker中,这样可以使一个broker不至于负载过热,使kafka充分的利用集群的资源,实现了负载均衡
而kafka是不需要保存集群状态的,kafka依赖Zookeeper协调服务来获取集群的状态,每个broker会通过临时会话机制将自己的元数据保存在Zookeeper的临时节点中,通过通过心跳来检查broker的状态,通过zookeeper的通知集群可以相互通信共享状态,当集群中的某个broker挂了,zookeeper可以通知broker controller进行新的partition leader的选举,这样就达到了故障转移的效果
生产者 Producer:
kafka的消息是由key,value,timestamp组成,其中value是消息体,保存实际的消息数据。timestamp是消息发送的时间戳。
key是消息键,它主要起到了路由的作用,在Apache kafka producer中提供了默认的分区器,如果指定了固定的key值就会取key的哈希值来决定路由到哪个partition中,如果没有指定key则会默认使用轮询的方式均匀的将消息发送到partition中,当然我们可以自定义分区器
代码演示
导入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.0</version>
</dependency>
先定义好参数
private final static Properties properties = new Properties();
static{
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.220.128:9092");
properties.put(ProducerConfig.ACKS_CONFIG,"all");
properties.put(ProducerConfig.RETRIES_CONFIG,"3");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
}
异步发送,发送后就不管发送的结果如何,不推荐
/*
Producer异步发送演示
*/
public static void producerSend(){
// Producer的主对象
Producer<String,String> producer = new KafkaProducer<>(properties);
// 消息对象 - ProducerRecoder
for(int i=0;i<10;i++){
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
producer.send(record);
}
// 所有的通道打开都需要关闭
producer.close();
}
实际上kafka的send方法是个异步的过程,好在它提供了结果回调这种非阻塞式的方法,通过onCompletion回调可以知道发送的结果
在onCompletion方法中的参数RecordMetadata 和Exception不会同时为null,如果Exception为null或者RecordMetadata 不为null代表消息发送成功,如果RecordMetadata为null或者Exception不为null代表消息发送失败
/*
Producer异步发送带回调函数
*/
public static void producerSendWithCallback(){
// Producer的主对象
Producer<String,String> producer = new KafkaProducer<>(properties);
// 消息对象 - ProducerRecoder
for(int i=0;i<10;i++){
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e==null){
System.out.println("消息发送成功");
System.out.println("partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset());
}else {
System.out.println("消息发送失败,执行失败处理,一般存库或者重发");
}
}
});
}
// 所有的通道打开都需要关闭
producer.close();
}
kafka的send方法通过future可以实现异步阻塞的方式
/*
Producer异步阻塞发送
*/
public static void producerSyncSend() throws ExecutionException, InterruptedException {
// Producer的主对象
Producer<String,String> producer = new KafkaProducer<>(properties);
// 消息对象 - ProducerRecoder
for(int i=0;i<10;i++){
String key = "key-"+i;
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,key,"value-"+i);
Future<RecordMetadata> send = producer.send(record);
RecordMetadata recordMetadata = send.get();
System.out.println(key + "partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset());
}
// 所有的通道打开都需要关闭
producer.close();
}
失败重试
(前段时间我遇到一个弱智面试官,我说kafka可以支持错误重试,他就把我pass掉了,说kafka是不支持重试的,我反驳了他还不接受,我想一锤子敲死他,不懂还当什么面试官,真是出来祸害社会)
无论发送方式是异步还是同步的,都可能会发送失败错误,错误包括了可重试错误和不可重试错误,对于可重试的错误,如果还在kafka可以重新发送的次数范围内就会继续发送,如果重新发送的次数在配置的次数还没有成功就会在回调函数nCompletion的Exception打印错误信息,这时需要我们手动处理,一般可以进行存
库记录发送失败的信息,重试配置是
properties.put(ProducerConfig.RETRIES_CONFIG,"3");
下面是处理重试错误的代码
//先把重试次数设置为3
properties.put(ProducerConfig.RETRIES_CONFIG,"3")
/*
Producer异步发送带回调函数
*/
public static void producerSendWithCallback(){
// Producer的主对象
Producer<String,String> producer = new KafkaProducer<>(properties);
// 消息对象 - ProducerRecoder
for(int i=0;i<10;i++){
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e==null){
System.out.println("消息发送成功");
System.out.println("partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset());
}else {
if (e instanceof RetriableException){
System.out.println("可重试错误");
}else {
System.out.println("不可重试错误");
}
System.out.println("消息发送失败,执行失败处理,一般存库或者重发");
}
}
});
}
// 所有的通道打开都需要关闭
producer.close();
}
自定义分区器
在Apache kafka producer中提供了默认的分区器,如果指定了固定的key值就会取key的哈希值来决定路由到哪个partition中,如果没有指定key则会默认使用轮询的方式均匀的将消息发送到partition中,当然我们可以自定义分区器
代码演示
需要继承Partitioner
public class SamplePartition implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String keyStr = key + "";
String keyInt = keyStr.substring(4);
int i = Integer.parseInt(keyInt);
return i%2;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
定义好我们的分区器后,需要给其添加到配置属性中,这样我们就可以按照我们的分区器进行消息路由了
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.example.kafka.producer.SamplePartition");
生产者的可靠性
生产者的幂等性
生产者的事务
生产者数据乱序问题
生产者的参数
batch.size:batch.size 是 producer 最重要的参数之一 ! producer 会将发往同一分区的多条消息封装进一个 batch 中。当 batch 满了的时候, producer 会发送 batch 中的所有消息 。不过, producer 并不总是等待 batch 满了才发送消息,很有可能当 batch 还有很多空闲空间时 但是超过了linger.ms设置的时间producer 就发送该 batch 。
linger.ms: linger.ms 参数就是控制消息发送延时行为的 。 该参数默认值是 0 ,表示消息需要被立即 发送,无须关心 b atch 是否己被填满
ack:
0: 表示 producer 从来不等待来自 broker 的确认信息。这个选择提供了最小的时延但同时风险最大
1:表示获得 leader已经接收了数据的确认信息,延迟一般。kafka默认的配置,这个方案比较适中
-1:producer 会获得所有同步 replicas 都收到数据的确认。同时时延最大
retries:重试次数,默认为0,设置该参数为一个大于 0 的值。 只不过在考虑 retries 的设置时,有两点需要着重注意 。
1、重试可能造成消息的乱序:比如由于瞬时的网络抖动使得 broker 端己成功写入 消息但没有成功发送响应给 producer ,因此 producer 会认为消息发送失败,从而开启 重试机制
2、重试可能造成消息的重复发送:
buffer.memory:producer 端用于缓存消息的缓冲区大小,单位是字节,默认值是 33554432,即 32MB 。由于采用了异步发送消息的设计架构, Java 版本 producer 启动时会首先 创建一块 内存缓冲区用于保存待发送的消息,然后由另 一个专属线程负责从缓冲区中读取消息 执行真正的发送 。这部分 内存空间的大小即是由 buffer.memory 参数指定的。若 producer 向 缓 冲区写消息的速度超过了专属 I/O 线程发送消息的速度,那么必然造成该缓冲区空间的不断增 大。此时 producer 会停止手头的工作等待 1/0 线程追上来,若一段时间之后 1/0 线程还是无法 追上 producer 的进度,那么 producer 就会抛出异常并期望用户介入进行处理。
compression.type:设置 producer 端是否压缩消息,默认值是 none ,即不压缩消息 。和
任何系统相同的是, Kafka 的 producer 端引入压缩后可以显著地降低网络 νo 传输开销从而提 升整体吞吐量,但也会增加 producer 端机器的 CPU 开销 。另 外,如果 broker 端的压缩参数设 置得与 producer 不同, broker 端在写入消息时也会额外使用 CPU 资源对消息进行对应的解压 缩-重压缩操作。
max.request.size:该参数用于控制 producer 发送请求的大小 。 实际上该参数控制的是
producer 端能够发送的最大消息大小。默认的 1048576 字节太小 了 , 通常无法满足 企业级消息的大小要求 。
request.timeout.ms:当 producer 发送请求给 broker 后 , broker 需要在规定 的时 间范围 内 将处理结果返还给producer 。默认是 30 秒 。 这就是说,如果 broker 在 30 秒内都 没有给 produc er 发送响应,那么 produc er 就会认为该请求超时了,并在回调函数中显式地抛出 TimeoutException 异常交由用户处理。
消费者 consumer
消费者使用一个消费者组名(即 group.id )来标记自己, topic 的每条消息 都只会被发送到每个订阅它的消费者组的一个消费者实例上。
①一个 consumer group 可能有若干个 consumer 实例( 一个 group 只有一个实例也是允许的);
②对于同一个 group 而言, topic 的每条 消息只能被发送到 group 下的一个 consumer 实例上;
③topic 消息可以被发送到多个 group 中 ;
消费者流程
Offset
Kafka中有两个Offset,一个是partition的Offset,一个是consumer的offset
partition的Offset:partition 上的每条消息都会被分配一个唯一 的序列号。该序列号是从 0 开始顺序递增的整数。位移信息可以唯一定位到某 partition 下的一条消息。
consumer的offset:每个 consumer 实例都会为它消费的分区维护属于自己的位置信息来记录当前 消费了多少条消息 。在以前是把offset保存在了zookeeper中,而consumer 把offset提交到 Kafka 的一个内部 topic (__consumer_offset),所以现在consumer可以不用连接zookeeper
负载
kafka分配分区的策略有三种range,round-robin,sticky,默认使用range策略给consumer分配分区,通过分区对组的分配可以达到负载均衡的效果
消费者代码演示
定义参数
private static final Properties props = new Properties();
static{
//连接哪个kafka服务器
props.setProperty("bootstrap.servers", "192.168.220.128:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
}
public static void simpleConsumer(){
KafkaConsumer<String,String> consumer = new KafkaConsumer(props);
// 消费订阅哪一个Topic或者几个Topic
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
//每一万毫秒拉取一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
for (ConsumerRecord<String, String> record : records)
System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
record.partition(),record.offset(), record.key(), record.value());
}
}
提交offset
默认情况下消费者会每隔5秒进行自动提交offset,如果想处理完业务再提交,可以设置手动提交,手动提交也是避免消息未消费是有效手动
设置手动提交
props.setProperty("enable.auto.commit", "false");
private static void commitedOffset() {
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
// 消费订阅哪一个Topic或者几个Topic
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
for (ConsumerRecord<String, String> record : records) {
// 想把数据保存到数据库,成功就成功,不成功...
// TODO record 2 db
System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
record.partition(), record.offset(), record.key(), record.value());
// 如果失败,则回滚, 不要提交offset
}
if (records.isEmpty()){
System.out.println("消费失败");
}
// 手动通知offset提交
consumer.commitAsync();
}
}
如果提升消费速度
1、增加分区和消费者,使分区和消费者能一对一。
ISR同步机制
每个partition 都有一个ISR列表,ISR中follower会向leader发送同步请求,leader副本会向follower副本推送数据,follower通过写入推送的数据完成数据的同步,由于网络等问题follower的数据可能会落后leader。
在旧版本的kafka中是通过判断follower的数据落后leader到达一定的量就将其剔除,默认是4000条,这造成的问题是:假设落后4000条消息就剔除,当某个流量高峰期,每次生产者都发送5000条,那么leader每次都会剔除follower,就算是follower本身没有任何问题
新版本的kafka是通过落后时间去剔除,默认是10秒,follower无论落后多少只要在10秒内赶上leader即可,如果持续落后10秒就会被剔除
HW(高水印)和LEO(日志末端位移)
在ISR通过HW和LEO去比较同步情况,在ISR中每个成员都要去维护自己的HW和LEO
- LEO:在leader和follower都代表了自身的所有消息的位移,在leader中生产者每发送一条消息LEO就会加1,对于follower来说LEO代表每成功写入一条leader推送过来的数据就会将其LEO加1
- HW:在leader中HW的值等于ISR成员里最慢的那个follower的复制量,也就是等于最慢的那个follower的LEO,也是消费者可消费消息的最大范围,通常可以将HW理解为“已备份的”或者是“已提交的”,它可以起到限制消费者消息的消息最大位移的作用。而在follower中,最落后的follower的HW值和自身的LEO值相等并且等于leader的HW,如果follower不是最落后的,其HW值是小于自身的LEO值并且等于leader的HW