kafka实现无消息丢失与精确一次语义(exactly once)处理

在很多的流处理框架的介绍中,都会说kafka是一个可靠的数据源,并且推荐使用Kafka当作数据源来进行使用。这是因为与其他消息引擎系统相比,kafka提供了可靠的数据保存及备份机制。并且通过消费者位移这一概念,可以让消费者在因某些原因宕机而重启后,可以轻易得回到宕机前的位置。

但其实kafka的可靠性也只能说是相对的,在整条数据链条中,总有可以让数据出现丢失的情况,今天就来讨论如何避免kafka数据丢失,以及实现精确一致处理的语义。

kafka无消息丢失处理

在讨论如何实现kafka无消息丢失的时候,首先要先清楚大部分情况下消息丢失是在什么情况下发生的。为什么是大部分,因为总有一些非常特殊的情况会被人忽略,而我们只需要关注普遍的情况就足够了。接下来我们来讨论如何较为普遍的数据丢失情况。

1.1 生产者丢失

前面介绍Kafka分区和副本的时候,有提到过一个producer客户端有一个acks的配置,这个配置为0的时候,producer是发送之后不管的,这个时候就很有可能因为网络等原因造成数据丢失,所以应该尽量避免。但是将ack设置为1就没问题了吗,那也不一定,因为有可能在leader副本接收到数据,但还没同步给其他副本的时候就挂掉了,这时候数据也是丢失了。并且这种时候是客户端以为消息发送成功,但kafka丢失了数据。

要达到最严格的无消息丢失配置,应该是要将acks的参数设置为-1(也就是all),并且将min.insync.replicas配置项调高到大于1,这部分内容在上一篇副本机制有介绍详细解析kafka之kafka分区和副本

同时还需要使用带有回调的producer api,来发送数据。注意这里讨论的都是异步发送消息,同步发送不在讨论范围。

public class send{
    ......
    public static void main(){
        ...
        /*
        *  第一个参数是 ProducerRecord 类型的对象,封装了目标 Topic,消息的 kv
        *  第二个参数是一个 CallBack 对象,当生产者接收到 Kafka 发来的 ACK 确认消息的时候,
        *  会调用此 CallBack 对象的 onCompletion() 方法,实现回调功能
        */
         producer.send(new ProducerRecord<>(topic, messageNo, messageStr),
                        new DemoCallBack(startTime, messageNo, messageStr));
        ...
    }
    ......
}

class DemoCallBack implements Callback {
    /* 开始发送消息的时间戳 */
    private final long startTime;
    private final int key;
    private final String message;

    public DemoCallBack(long startTime, int key, String message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    /**
     * 生产者成功发送消息,收到 Kafka 服务端发来的 ACK 确认消息后,会调用此回调函数
     * @param metadata 生产者发送的消息的元数据,如果发送过程中出现异常,此参数为 null
     * @param exception 发送过程中出现的异常,如果发送成功为 null
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.printf("message: (%d, %s) send to partition %d, offset: %d, in %d\n",
                    key, message, metadata.partition(), metadata.offset(), elapsedTime);
        } else {
            exception.printStackTrace();
        }
    }
}

更详细的代码可以参考这里:Kafka生产者分析——KafkaProducer

我们之前提到过,producer发送到kafka broker的时候,是有多种可能会失败的,而回调函数能准确告诉你是否确认发送成功,当然这依托于acks和min.insync.replicas的配置。而当数据发送丢失的时候,就可以进行手动重发或其他操作,从而确保生产者发送成功。

1.2 kafka内部丢失

有些时候,kafka内部因为一些不大好的配置,可能会出现一些极为隐蔽的数据丢失情况,那么我们分别讨论下大致都有哪几种情况。

首先是replication.factor配置参数,这个配置决定了副本的数量,默认是1。注意这个参数不能超过broker的数量。说这个参数其实是因为如果使用默认的1,或者不在创建topic的时候指定副本数量(也就是副本数为1),那么当一台机器出现磁盘损坏等情况,那么数据也就从kafka里面丢失了。所以replication.factor这个参数最好是配置大于1,比如说3

接下来要说的还是和副本相关的,也是上一篇副本中提到的unclean.leader.election.enable 参数,这个参数是在主副本挂掉,然后在ISR集合中没有副本可以成为leader的时候,要不要让进度比较慢的副本成为leader的。不用多说,让进度比较慢的副本成为leader,肯定是要丢数据的。虽然可能会提高一些可用性,但如果你的业务场景丢失数据更加不能忍受,那还是将unclean.leader.election.enable设置为false吧

1.3 消费者丢失

消费者丢失的情况,其实跟消费者位移处理不当有关。消费者位移提交有一个参数,enable.auto.commit,默认是true,决定是否要让消费者自动提交位移。如果开启,那么consumer每次都是先提交位移,再进行消费,比如先跟broker说这5个数据我消费好了,然后才开始慢慢消费这5个数据。

这样处理的话,好处是简单,坏处就是漏消费数据,比如你说要消费5个数据,消费了2个自己就挂了。那下次该consumer重启后,在broker的记录中这个consumer是已经消费了5个的。

所以最好的做法就是将enable.auto.commit设置为false,改为手动提交位移,在每次消费完之后再手动提交位移信息。当然这样又有可能会重复消费数据,毕竟exactly once处理一直是一个问题呀(/摊手)。遗憾的是kafka目前没有保证consumer幂等消费的措施,如果确实需要保证consumer的幂等,可以对每条消息维持一个全局的id,每次消费进行去重,当然耗费这么多的资源来实现exactly once的消费到底值不值,那就得看具体业务了。

1.4 无消息丢失小结

那么到这里先来总结下无消息丢失的主要配置吧:

  • producer的acks设置位-1,同时min.insync.replicas设置大于1。并且使用带有回调的producer api发生消息。
  • 默认副本数replication.factor设置为大于1,或者创建topic的时候指定大于1的副本数。
  • unclean.leader.election.enable 设置为false,防止定期副本leader重选举
  • 消费者端,自动提交位移enable.auto.commit设置为false。在消费完后手动提交位移。

那么接下来就来说说kafka实现精确一次(exactly once)处理的方法吧。

实现精确一次(exactly once)处理

在分布式环境下,要实现消息一致与精确一次(exactly once)语义处理是很难的。精确一次处理意味着一个消息只处理一次,造成一次的效果,不能多也不能少。

那么kafka如何能够实现这样的效果呢?在介绍之前,我们先来介绍其他两个语义,至多一次(at most once)和至少一次(at least once)。

最多一次和至少一次

最多一次就是保证一条消息只发送一次,这个其实最简单,异步发送一次然后不管就可以,缺点是容易丢数据,所以一般不采用。

至少一次语义是kafka默认提供的语义,它保证每条消息都能至少接收并处理一次,缺点是可能有重复数据。

前面有介绍过acks机制,当设置producer客户端的acks是1的时候,broker接收到消息就会跟producer确认。但producer发送一条消息后,可能因为网络原因消息超时未达,这时候producer客户端会选择重发,broker回应接收到消息,但很可能最开始发送的消息延迟到达,就会造成消息重复接收。

那么针对这些情况,要如何实现精确一次处理的语义呢?

幂等的producer

要介绍幂等的producer之前,得先了解一下幂等这个词是什么意思。幂等这个词最早起源于函数式编程,意思是一个函数无论执行多少次都会返回一样的结果。比如说让一个数加1就不是幂等的,而让一个数取整就是幂等的。因为这个特性所以幂等的函数适用于并发的场景下。

但幂等在分布式系统中含义又做了进一步的延申,比如在kafka中,幂等性意味着一个消息无论重复多少次,都会被当作一个消息来持久化处理。

kafka的producer默认是支持最少一次语义,也就是说不是幂等的,这样在一些比如支付等要求精确数据的场景会出现问题,在0.11.0后,kafka提供了让producer支持幂等的配置操作。即:

props.put("enable.idempotence", ture)

在创建producer客户端的时候,添加这一行配置,producer就变成幂等的了。注意开启幂等性的时候,acks就自动是“all”了,如果这时候手动将ackss设置为0,那么会报错。

而底层实现其实也很简单,就是对每条消息生成一个id值,broker会根据这个id值进行去重,从而实现幂等,这样一来就能够实现精确一次的语义了。

但是!幂等的producery也并非万能。有两个主要是缺陷:

  • 幂等性的producer仅做到单分区上的幂等性,即单分区消息不重复,多分区无法保证幂等性。
  • 只能保持单会话的幂等性,无法实现跨会话的幂等性,也就是说如果producer挂掉再重启,无法保证两个会话间的幂等(新会话可能会重发)。因为broker端无法获取之前的状态信息,所以无法实现跨会话的幂等。

事务的producer

当遇到上述幂等性的缺陷无法解决的时候,可以考虑使用事务了。事务可以支持多分区的数据完整性,原子性。并且支持跨会话的exactly once处理语义,也就是说如果producer宕机重启,依旧能保证数据只处理一次。

开启事务也很简单,首先需要开启幂等性,即设置enable.idempotence为true。然后对producer发送代码做一些小小的修改。

//初始化事务
producer.initTransactions();
try {
    //开启一个事务
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    //提交
    producer.commitTransaction();
} catch (KafkaException e) {
    //出现异常的时候,终止事务
    producer.abortTransaction();
}

但无论开启幂等还是事务的特性,都会对性能有一定影响,这是必然的。所以kafka默认也并没有开启这两个特性,而是交由开发者根据自身业务特点进行处理。

以上~


推荐阅读:
分布式系统一致性问题与Raft算法(上)
Scala函数式编程(五) 函数式的错误处理
大数据存储的进化史 --从 RAID 到 Hadoop Hdfs

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容