Spring Kafka:Retry Topic、DLT 的使用与原理

1. 背景

原生 Kafka 是不支持 Retry Topic 和 DLT (Dead Letter Topic,死信队列)。但是 Spring Kafka 在客户端实现了这两个功能。

2. 版本

spring-kafka 2.7.14(2.7.x 以下版本不支持 Retry Topic)

3. 默认重试策略

默认情况下,spring-kafka 在消费逻辑抛出异常时,会快速重试 10 次(无间隔时间),如果重试完成后,依旧消费失败,spring-kafka 会 commit 这条记录。

默认重试的实现原理是:重置当前 consumer offset,感兴趣的同学可以在 SeekUtils#doSeeks debug 一下

可以通过自定义 SeekToCurrentErrorHandler 来控制消费失败后的处理逻辑。例如:添加重试间隔,重试完成后依旧失败的消息发送到 DLT

3.1. 自定义 SeekToCurrentErrorHandler

    @Bean
    public ErrorHandler errorHandler(KafkaTemplate<Object, Object> kafkaTemplate) {
        ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
        // 设置重试间隔 10秒 次数为 3次
        BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
        // 创建 SeekToCurrentErrorHandler 对象
        return new SeekToCurrentErrorHandler(recoverer, backOff);
    }

添加上述代码后,消费逻辑抛出异常后,会间隔 10s 重试 3 次,重试后依旧失败,会将消息发送到 DLT

关于默认重试策略,Kafka 的 TopicPartition 只会分配给一个消费者,而消费者对于某条消息的重试,会占用消费线程,影响整个 TopicPartition 的消费速度。如果使用 Retry Topic 功能,不会占用消费线程,会有专门的 retry 线程订阅 Retry Topic 执行重试消费。

4. Retry Topic + DLT 使用

可以通过注解和全局配置的方式开启 Retry Topic 功能

4.1. @RetryableTopic

使用注解的方式启用 Retry Topic,在 @KafkaListener 方法上添加 @RetryableTopic 即可

@Slf4j
@Component
public class SimpleConsumer {

    @RetryableTopic()
    @KafkaListener(topics = "test_topic", groupId = "demo01-consumer-group-1")
    public void onMessage(MessageWrapper message) {
        log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
        throw new RuntimeException("test kafka exception");
    }

}

此时 Retry Topic 功能已经启用了。当消费逻辑抛出异常时,spring-kafka 会先将消息发送到 Retry Topic,随后在 Main Topic(对应上文的test_topic)中 commit 这条消息。会有专门的线程订阅 Retry Topic,不会影响正常消费

默认重试 3 次,间隔为 1s,如果在重试结束后,还没有成功被消费,该消息会被发送到 DLT 中

默认情况,消息被发送到死信队列后,会输出一条日志。

2022-08-09 16:05:03.920  INFO 4048 --- [ner#3-dlt-0-C-1] o.s.k.retrytopic.RetryTopicConfigurer    : Received message in dlt listener: test_topic-dlt-0@233

上述的日志输出是默认的死信订阅逻辑,用户可以在类中添加 @DltHandler 方法自定义死信消费逻辑

    @DltHandler
    public void processMessage(MessageWrapper message) {
        log.info("dlt {}", message);
    }

至此,你的 Kafka 就拥有了类似 RocketMQ 的消息重试能力,但是配置方面还需要调整一下。

4.2. 定制 @RetryableTopic

可以自定义重试次数,延迟时间,死信策略等等,同时大部分参数还支持使用 Spring EL 表达式读取配置,这里简单列举下,更多的配置读者可以自行探索

    @RetryableTopic(
            attempts = "${kafka.retry.attempts}",
            backoff = @Backoff(delayExpression = "${kafka.retry.delay}", multiplierExpression = "${kafka.retry.multiplier}"),
            fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC
    )
    @KafkaListener(topics = "test_topic", groupId = "demo01-consumer-group-1")
    public void onMessage(MessageWrapper message) {
        log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
        throw new RuntimeException("test kafka exception");
    }

解释一下上述的配置

  • attempts:重试次数
  • @Backoff delayExpression:消费延迟时间
  • @Backoff multiplierExpression:乘数。举个例子,第一次delay = 10s,如果 multiplier = 2,则下次 delay = 20s,以此类推,但是会有一个 maxDelay 作为延迟时间上限
  • fixedDelayTopicStrategy:可选策略包括:每次重试发送到单独的 Topic、只使用一个重试 Topic

fixedDelayTopicStrategy 这个参数还是挺重要的,具体应该怎么选呢,我们稍后再说

4.3. RetryTopicConfiguration

    @Bean
    public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .maxAttempts(4)
                .fixedBackOff(15000)
                .includeTopic("test_topic")
                .create(template);
    }

使用这个方式配置项基本和注解一样,如果你有多个需要配置重试的消费者,使用 RetryTopicConfiguration 的方式要比注解方式更简单

5. 源码解析

5.1. 延迟重试怎么实现的

延迟重试这个功能应该分为两步

  1. 将需要重试的消息发送到 Retry Topic
  2. Retry Topic 的订阅者延迟消费

非常遗憾的是,Kafka 并没有延迟消息这样的功能,所以这个延迟消费也是 spring-kafka 自己实现的,不得不说这个组件真的下了很多功夫

接下来聊聊延迟重试的实现原理

5.1.1. 延迟消息标识

消息发送到 Retry Topic 这个步骤,感兴趣的同学可以 debug 一下 SeekToCurrentErrorHandler#handle 这里就不详细说了

每个需要被重试的消息,都会被添加 retry_topic-backoff-timestamp 这个 header,这个值代表这个消息的期望执行时间

开启了重试功能的 KafkaListener,在执行消费逻辑前,会先执行KafkaBackoffAwareMessageListenerAdapter#onMessage,该方法会先对消息进行检查

KafkaBackoffAwareMessageListenerAdapter#onMessage

这部分逻辑是:

  1. 首先检查 consumerRecord 是否包含 retry_topic-backoff-timestamp,如果有则进入步骤2
  2. 现在时间是否达到了期望执行时间,if ( nowTime > executeTime ) 该方法什么也不做,程序会立刻执行消费逻辑
  3. 未达到期望执行时间,准备暂停消费者对当前 TopicPartition 的消费,但是并不是在这里完成的,这个方法内部只是记录了一下需要暂停的 TopicPartition(这个数据存储在 KafkaMessageListenerContainer 的 pauseRequestedPartitions 中),并在 PartitionPausingBackoffManager 中存储了 BackOffContext,随后抛出一个异常打断消费流程

5.1.2. 暂停分区

只要 Kafka 消费线程还在运行,就会无限调用 KafkaMessageListenerContainer#pollAndInvoke

KafkaMessageListenerContainer#pollAndInvoke

pollAndInvoke 中 pausePartitionsIfNecessary 方法会根据 KafkaMessageListenerContainer 中存储的 pauseRequestedPartitions 暂停 partition,使用的方法是 Kafka Client 的 consumer.pause

调用 consumer.pause 之后,之后调用 consumer.poll 不会返回任何数据,直到调用 resume 恢复消费。该方法不会造成 Rebalance

5.1.3. 恢复分区

有了上面暂停消费的逻辑,还得有对应的恢复消费才能实现“延迟消费”,下面来看下恢复消费的逻辑

KafkaMessageListenerContainer#checkIdlePartition

KafkaMessageListenerContainer#checkIdlePartition 方法会不断地检查 partition 是否空闲(长时间未拉取到消息)。如果符合了空闲 partition 的标准,则发送事件 ListenerContainerPartitionIdleEvent

PartitionPausingBackoffManager

PartitionPausingBackoffManager 监听该事件,并尝试查找该 TopicPartition 是否存在 BackOffContext。存在则代表该分区被暂停,如果时间条件满足,从 KafkaMessageListenerContainer 的 pauseRequestedPartitions 删除该分区

KafkaMessageListenerContainer#resumePartitionsIfNecessary

最后 KafkaMessageListenerContainer#resumePartitionsIfNecessary 会将“已被 Kafka Consumer 暂停”但是“不存在于 KafkaMessageListenerContainer 的 pauseRequestedPartitions 的分区”恢复消费(通过 consumer.resume

5.1.4. 小结

画一张图来总结一下 Retry Topic 的执行流程

这里补充说明一下

  • 其实 MAIN_TOPIC 和 RETRY TOPIC 执行的代码是完全相同的,上图只是为了更好的让大家理解 Retry Topic 的流程
  • 本身 Kafka 消费流程是一个无限循环

5.2. 关于 Retry Topic 策略

下面详细说说 Topic 策略这个事

5.2.1. FixedDelayStrategy.MULTIPLE_TOPICS

test_topic 为例,此时我 attempts = 3, delay=10, multiplier=2,会额外创建以下三个 Topic

  • test_topic-retry-0
  • test_topic-retry-1
  • test_topic-dlt

第一次消费失败,会发送到 test_topic-retry-0,消息延迟为 10s
第二次消费失败,会发送到 test_topic-retry-1,消息延迟为 20s
第三次消费失败,会发送到 test_topic-dlt

此时每个 Retry Topic 中的消息延迟时间是相同的,在消费时间可控的情况下,消息延迟的时间不会有过大的偏差

该策略的缺点就是,使用了过多的 Topic,但是可以实现重试时间指数级上升

5.2.2. FixedDelayStrategy.SINGLE_TOPIC

延迟时间固定的情况适合使用 SINGLE_TOPIC 策略,该策略下只有一个 Retry Topic。如果 SINGLE_TOPIC 延迟时间指数级增长的话,很可能出现的问题是,第一条消息第三次重试延迟时间为 30s,第二条消息第一次重试延迟时间为 10s,两条消息被分配到同一分区,这二条消息被迫在 40s 之后才能重试

补充:如何使用多个 retry 线程

默认情况下,Main Topic,每个 Retry Topic,DLT 分别有 1 个消费线程,默认情况下 Retry 和 DLT 会使用 KafkaListener 提供的 ContainerFactory 初始化。

例如我把 KafkaListener concurrency 设置为 4。此时 Retry Topic,每个 Retry Topic,DLT 分别有 4 个消费线程

也可以自定义 Retry Topic 消费者使用的 ContainerFactory

spring-kafka 相关 demo

https://github.com/TavenYin/taven-springboot-learning/tree/master/springboot-kafka

参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335