The basic API
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
// The later examples additionally use java.util.Collection, java.util.HashMap,
// java.util.Map, org.apache.kafka.clients.consumer.OffsetAndMetadata /
// OffsetCommitCallback / ConsumerRebalanceListener,
// and org.apache.kafka.common.TopicPartition.

public static void commonDemo() {
    final Properties properties = new Properties() {{
        put("bootstrap.servers", "localhost:9092");
        put("group.id", "test");
        put("enable.auto.commit", "true");      // commit offsets automatically
        put("auto.commit.interval.ms", "1000"); // ... every second
        put("session.timeout.ms", "30000");
        put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        put("auto.offset.reset", "earliest");
    }};
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Arrays.asList("test"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        try {
            Thread.sleep(200); // pretend each batch takes a while to process
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
    }
}
This is a very common, simple consumer. But is it really problem-free?
It has the following problems, all stemming from auto-commit:
1. Offsets are committed on a timer, every auto.commit.interval.ms (5 seconds by default; 1 second in the config above). If the consumer dies between two commits, everything processed since the last commit is consumed again on restart. For example, if offsets up to 2000 were committed and the consumer crashes after processing up to 2080, the restart resumes at 2000 and records 2000 to 2080 are processed twice.
2. When a new consumer joins the group, a rebalance is triggered and every consumer resumes from the last committed offset, so the same window of messages is replayed.
3. Auto-commit gives you no feedback: the offset is committed, but you don't know which messages were actually processed, whether processing succeeded, or whether the commit itself succeeded.
Let's revise the code to address these issues:
public static void syncDemo() {
    final Properties properties = new Properties() {{
        put("bootstrap.servers", "localhost:9092");
        put("group.id", "test");
        put("enable.auto.commit", "false"); // auto-commit off: we commit ourselves
        put("session.timeout.ms", "30000");
        put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        put("auto.offset.reset", "earliest");
    }};
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Arrays.asList("test"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        try {
            // block until the offsets of the records returned by the last poll()
            // are committed (retries automatically on recoverable errors)
            consumer.commitSync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
This commits synchronously. The drawback: commitSync() blocks until the broker responds to the commit request, so throughput takes a hit.
public static void asyncDemo() {
    final Properties properties = new Properties() {{
        put("bootstrap.servers", "localhost:9092");
        put("group.id", "test");
        put("enable.auto.commit", "false");
        put("session.timeout.ms", "30000");
        put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        put("auto.offset.reset", "earliest");
    }};
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Arrays.asList("test"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        // fire off the commit request and keep consuming;
        // if the commit fails, just log it instead of retrying
        consumer.commitAsync(new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
                if (e != null) {
                    e.printStackTrace();
                }
            }
        });
    }
}
The key property of async commits: where commitSync retries a failed commit until it succeeds (or hits a non-retriable error), commitAsync does not retry at all. The reason is simple: retrying an async commit could reorder commits. Suppose a commit of offset 2000 hits a network problem and keeps retrying while a later commit of offset 3000 succeeds. If the 2000 retry then lands, the committed position moves backwards, and a rebalance at that moment would replay everything from 2000 to 3000, i.e. duplicate consumption.
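A common way to get retries back without that reordering risk is to tag every commit attempt with a monotonically increasing sequence number, and retry in the callback only if no newer commit has been attempted since. Below is a minimal sketch of that idea, not code from the examples above; the names commitSeq and retryingCommit are made up for illustration, and it assumes the consumer is driven from a single thread (which is also how commitAsync callbacks are invoked):

private static long commitSeq = 0; // bumped on every commit attempt

private static void retryingCommit(final KafkaConsumer<String, String> consumer,
                                   final Map<TopicPartition, OffsetAndMetadata> offsets) {
    final long seqAtSend = ++commitSeq;
    consumer.commitAsync(offsets, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed, Exception e) {
            if (e != null && seqAtSend == commitSeq) {
                // no newer commit has been attempted since this one was sent,
                // so a retry cannot move the committed offset backwards
                retryingCommit(consumer, offsets);
            }
        }
    });
}

In the 2000-vs-3000 scenario above, the failed commit of 2000 would see that commitSeq has already moved on (the 3000 attempt bumped it) and simply give up.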
public static void syncAndAsyncDemo() {
    final Properties properties = new Properties() {{
        put("bootstrap.servers", "localhost:9092");
        put("group.id", "test");
        put("enable.auto.commit", "false");
        put("session.timeout.ms", "30000");
        put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        put("auto.offset.reset", "earliest");
    }};
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Arrays.asList("test"));
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
            consumer.commitAsync(); // fast, non-blocking commit on the happy path
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            consumer.commitSync(); // last chance: block until the final commit succeeds
        } finally {
            consumer.close();
        }
    }
}
Here sync and async commits are combined: the fast, non-blocking commitAsync() while everything is running, and a blocking commitSync() as the last word before shutdown. But one problem remains: both calls commit an entire poll() batch at once, so an error can strike halfway through a batch, after some records were processed but before their offsets were committed. The fix is to commit specific offsets as we go.
public static void personalDemo() {
    Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    int count = 0;
    final Properties properties = new Properties() {{
        put("bootstrap.servers", "localhost:9092");
        put("group.id", "test");
        put("enable.auto.commit", "false");
        put("session.timeout.ms", "30000");
        put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        put("auto.offset.reset", "earliest");
    }};
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Arrays.asList("test"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
            // track, per partition, the offset of the NEXT record to consume
            currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1, "no meta"));
            count++;
            if (count % 1000 == 0) {
                // commit exactly the offsets we tracked, not the whole batch
                consumer.commitAsync(currentOffsets, null);
            }
        }
    }
}
This commits offsets explicitly every 1000 records instead of once per batch. Note the record.offset() + 1: the committed value must be the offset of the next message to read, not of the last message processed, otherwise the last processed record would be consumed once more after a restart or rebalance.
public static void rebalanceListenDemo() {
    final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    final Properties properties = new Properties() {{
        put("bootstrap.servers", "localhost:9092");
        put("group.id", "test");
        put("enable.auto.commit", "false");
        put("session.timeout.ms", "30000");
        put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        put("auto.offset.reset", "earliest");
    }};
    final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

    class HandleRebalance implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // we are about to lose these partitions: commit what we have
            // processed so far, so the next owner starts where we stopped
            System.out.println("partition rebalance, committing offsets " + currentOffsets);
            consumer.commitSync(currentOffsets);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        }
    }

    try {
        consumer.subscribe(Arrays.asList("test"), new HandleRebalance());
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1, "no meta"));
            }
            consumer.commitAsync(currentOffsets, null);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            consumer.commitSync(currentOffsets);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
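Note where the commit happens in the listener: onPartitionsRevoked runs after the consumer has stopped fetching but before the partitions are handed to their new owners, so committing there means whoever inherits a partition starts exactly where this consumer stopped. Waiting until onPartitionsAssigned would be too late, because by then this consumer may no longer own the partitions whose offsets it is trying to commit.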