kafka之浅谈如何去保证数据不重复消费

kafka之浅谈如何去保证数据不重复消费

一。背景:上游数据流,将数据推入kafka中,作为消费者,消费数据并进行处理,对于交易数据,非常敏感,不能出现重复,在消费这一过程中,如何去保证我们不会去重复消费数据。

二。导致数据重复消费的原因一般有:

1.数据消费处理成功(落地入库,或者各种处理成功),向kafka中提交偏移量时,由于宕机,或者断网之类的失败了,这时候其实相对与系统来说,这笔数据已经是处理过了,就会出现重复数据。

2.一般是有新的消费者加入之类的,发生了再均衡,导致数据重发消费。

三。项目使用真实使用

方案:

1.关闭自动提交,设置enable-auto-commit为 false,采用手动Offset提交

2.提交方式,简单提一下两种提交方式的却别

consumer.commitAsync();//异步提交

异步提交:异步提交,非阻塞,有更好的并发性能,但是同时也是还会提交失败,导致数据重复。

consumer.commitSync();//同步提交

同步提交:同步提交,是阻塞的,在提交失败时,会一直阻塞,直到超时,可回调。

这里我们采用异步+同步提交,异步提交失败后,采用同步提交,一直阻塞,达到超时时间还未提交,执行同步提交回调,可记录提交失败的数据

简单示例:

public void commitOffset() {

try {

consumer.commitAsync();//异步提交

} catch (CommitFailedException e) {

consumer.commitSync();

}

}

3.每次使用map跟踪记录偏移量,并将偏移量维护到数据库中,可作为唯一标识字段等。

部分代码

msgList =consumer.poll(Duration.ofMillis(100));

if (msgList !=null && !msgList.isEmpty()) {

for (ConsumerRecord record :msgList) {

currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() +1));

4.在订阅数据时,重写ConsumerRebalandeListener再分区监听器,即继承ConsumerRebalanceListener接口,

 #consumer.subscribe(TOPIC_LIST, new SaveOffsetOnRebalance(consumer));

5.当发生再均衡前后也要记录提交偏移量,根据具体的使用场景,也可以选着将offset维护在库里面,比如,在对消费的数据进行落地时这一场景,有可能会出现入库成功,提交失败,或者提交成功,入库失败等,这一类场景最安全的就是落地维护offset,但是也确实带来了一定的复杂度,对库的读写性能要求也比较高,redis就比较合适,这里不深挖了。

重写方法:onPartitionsRevoked,该方法会在再均衡开始之前和消费者停止读取消息之后被调用。在这个方法里,我们需要做的是,把map中跟踪的偏移量进行提交。

简单例子:

public void onPartitionsRevoked(Collection collection) {

consumer.commitSync();

if(logger.isInfoEnabled()){

logger.info("*- in ralance:onPartitionsRevoked");

}

}

重写方法:onPartitionsAssigned,该方法会在重新分配partition之后和消费者开始读取消息之前被调用。这里我们需要做的是,读取我们库中维护的offset,并使用seek设置开始offset位置。

@Override

public void onPartitionsAssigned(Collection collection) {

//rebalance之后 获取新的分区,获取最新的偏移量,设置拉取分量

for (TopicPartition partition : collection) {

        if(logger.isInfoEnabled()){

            logger.info("*- partition:" + partition.partition());

        }

        //获取消费偏移量,实现原理是向协调者发送获取请求

        OffsetAndMetadata offset = consumer.committed(partition);

        //设置本地拉取分量,下次拉取消息以这个偏移量为准

        consumer.seek(partition,getOffsetFromDB(partition));

    }

这里主要体现思路,具体的方案,一切从实际业务出发,不然就是耍流氓,但是大多思路不变,变动的是细节

热爱生活的码小子wmxiang:

工作之余,记录自己平时的一些小毛病,以及问题排查和解决方案思路。记录自己的经验和不足之处,笔记中可能会有很多不足之处,欢迎各位留言指正讨论。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容