前言
前文 Spring Cloud Stream 进阶配置——高可用(一)——失败重试 介绍了 失败重试 机制如何保障消息被正确消费,对于短暂性故障,消费失败后重试,可以得到有效解决;但是如果是诸如程序问题导致消费失败的情况,短时间内(未修复bug之前),当重试次数消耗完之后,消息则会被丢弃。
对于无关紧要的消息,丢了也就丢了,但如果是类似账单这种敏感数据,一旦丢了,老板就要找你谈话了。
针对上面所述场景,rabbitmq
有对应的方案,即 死信队列。
死信队列
何为死信
在开始了解死信队列之前,我们需要知道什么死信,从字面看就是“死掉了的信息(消息)”,不过这是相对于队列来说的,在消息所在的队列看来,没有意义、没有价值的消息,就应该丢弃,任其消亡。那么问题来了,队列是怎么界定这类消息的?这里,不得不先说明一下哪类消息属于死信,有如下几种情况:
- 消费者通过
basic.reject
拒绝确认消息; - 消费者使用
basic.nack
否定确认消息,并参数requeue
设置为false
; - 消息在队列中停留时间超过
ttl
,即未能被及时消费; - 消息体过大,超过队列所允许的消息体大小;
对于前2种情况,都是消费者主动放弃消息,而后面2种,则因为队列的自我保护机制被队列无情地丢弃。不过,这几种情况都有一个共同点,如果再保留这些死信,很大可能会影响整个队列的正常工作,因为这些都属于消费者不疼,队列不爱的消息,所以只好选择从队列踢掉。
死信的归宿
而死信队列则是死信的归宿,也可以将它比做死信的回收站(下文会揭秘为什么)。死信队列其实也是一个普通队列,可以被消费者订阅,当消息成为死信后,会被投递到与原队列 “绑定” 的队列,该队列就是死信队列。
死信交换机
我们都知道,发布者在发布消息后,需要经过消息交换机,根据特定的路由,才能被正确投递到目标队列。而死信要被投递到死信队列,那肯定还需要一个消息交换机,该交换机为 Dead Letters Exchanges
(DLX
),即 死信交换机。当然,死信交换机也是一个普通的消息交换机,可以通过正常的声明方式去创建。
思考
这里先抛出一个问题,既然死信队列、死信交换机都是普通的队列、消息交换机,在可视化界面怎么去区分死信队列与其他队列、死信交换机与其他交换机。
声明死信队列
ps: 这里的主题是如何在
SpringCloud Stream
中使用死信队列,其他声明方式(如原生SDK)不在本文讨论范围。
Spring Cloud Stream
声明死信队列非常简单,简单到只需要一个配置就能搞定,这里不得不说 Spring Boot
和 Spring Cloud
的设计思想是真厉害。
“开启” 死信队列的相关配置为:spring.cloud.stream.bindings.<channelName>.consumer.autoBindDlq
,该配置的作用为:是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
上面的 开启 用了双引号,为什么呢?配置 autoBindDlq
翻译一下就能大概猜到原因了,因为这个开关,当为 true
时,开启的是 自动声明死信队列,并将其绑定到死信交换机。所以,我们也是可以自己手动创建的。
示例
以下代码可在 源码 查看。
配置
spring:
cloud:
stream:
bindings:
packetUplinkOutput:
destination: packetUplinkDlxTopic
content-type: application/json
binder: rabbit
packetUplinkInput:
destination: packetUplinkDlxTopic
content-type: application/json
group: ${spring.application.name}.dlx
binder: rabbit
rabbit:
bindings:
packetUplinkInput:
consumer:
ttl: 20000 # 默认不做限制,即无限。消息在队列中最大的存活时间。当消息滞留超过ttl时,会被当成消费失败消息,即会被转发到死信队列或丢弃.
# DLQ相关
autoBindDlq: true # 是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
ps: 上文提到有几种情况,消息会变成死信,而上面使用的配置是通过设置队列的
ttl
,即消息在队列中存活的最大时间为 20s。因为这是制造死信最简单粗暴的方法。
代码
消息模型
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PacketModel {
/**
* 设备 eui
*/
private String devEui;
/**
* 数据
*/
private String data;
// 省略其他字段
}
测试用例
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("dlq")
@EnableBinding({ScasDlqTest.MessageSink.class, ScasDlqTest.MessageSource.class})
public class ScasDlqTest {
@Autowired
private PacketUplinkProducer packetUplinkProducer;
private Random random = new Random();
private List<String> devEuis = new ArrayList<>(10);
@PostConstruct
private void initDevEuis() {
devEuis.add("10001");
devEuis.add("10002");
devEuis.add("10003");
devEuis.add("10004");
devEuis.add("10005");
devEuis.add("10006");
devEuis.add("10007");
devEuis.add("10008");
devEuis.add("10009");
devEuis.add("10010");
}
/**
*
*/
@Test
public void test() throws InterruptedException {
for (int i = 0; i < 100; i++) {
String devEui = getDevEuis();
packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
}
Thread.sleep(1000000);
}
private String getDevEuis() {
return devEuis.get(random.nextInt(10));
}
@Component
public static class PacketUplinkProducer {
@Autowired
private MessageSource messageSource;
public void publish(PacketModel model) {
log.info("发布上行数据包消息. model: [{}].", model);
messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
}
}
@Component
public static class PacketUplinkHandler {
@StreamListener("packetUplinkInput")
public void handle(PacketModel model) throws InterruptedException {
Thread.sleep(1000);
log.info("消费上行数据包消息. model: [{}].", model);
}
}
public interface MessageSink {
@Input("packetUplinkInput")
SubscribableChannel packetUplinkInput();
}
public interface MessageSource {
@Output("packetUplinkOutput")
MessageChannel packetUplinkOutput();
}
}
运行测试用例
运行测试用例后,访问 Rabbitmq可视化页面 可以看到类似下图的页面:
因为目标队列的消费者 1s 才消费一条消息,而队列的 ttl
只有 20s,所以差不多 20s 后,再刷新页面,可以看到:
可以看到,队列中的待消费消息为80条,而我们一共发布了100条,消费力为1条/s,20s后,未消费的消息全部进入死信队列,所以80条对得上。
验证死信被丢弃
为了验证只有创建死信队列并绑定到死信交换机,死信才不会被丢弃,可以将 autoBindDlq
改成 false
,再跑一次,20s 后,看目标队列是不是没有消息。不过,需要先把目标队列删除,不然会出现如下错误:
2019-08-19 17:37:17.545 ERROR 26146 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'packetUplinkDlxTopic.scas-data-collection.dlx' in vhost '/': received none but current is the value 'DLX' of type 'longstr', class-id=50, method-id=10)
2019-08-19 17:37:18.552 ERROR 26146 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'packetUplinkDlxTopic.scas-data-collection.dlx' in vhost '/': received none but current is the value 'DLX' of type 'longstr', class-id=50, method-id=10)
2019-08-19 17:37:20.557 ERROR 26146 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'packetUplinkDlxTopic.scas-data-collection.dlx' in vhost '/': received none but current is the value 'DLX' of type 'longstr', class-id=50, method-id=10)
2019-08-19 17:37:24.566 ERROR 26146 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'packetUplinkDlxTopic.scas-data-collection.dlx' in vhost '/': received none but current is the value 'DLX' of type 'longstr', class-id=50, method-id=10)
2019-08-19 17:37:29.575 ERROR 26146 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'packetUplinkDlxTopic.scas-data-collection.dlx' in vhost '/': received none but current is the value 'DLX' of type 'longstr', class-id=50, method-id=10)
...
删除队列
验证结果
重新跑一次测试用例,20s后,可以看到:
如果没有声明死信队列,那么死信一旦产生,就会直接被丢弃,也找不回来了。
如何使用死信队列
死信在进入死信队列后,如果没有类似重新消费的逻辑,那跟被直接丢弃没啥区别,甚至还占用磁盘空间。下面介绍2种体现死信队列价值的操作与实现。
1. 重新发布到目标队列
在每个队列的详情页中,有一个 Move Messages
分栏,如下图所示:
上图中圈中的提示,是因为没有启动
rabbitmq_shovel
、 rabbitmq_shovel_management
这2个插件,启动命令为: rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
。
如果插件已启动,看到的界面如下:
进入死信队列的详情页,将目标队列 packetUplinkDlxTopic.scas-data-collection.dlx
拷贝到 Move Messages
表单的 Destination queue
输入框中,然后点击 Move Messages
按钮。可以看到如下:
死信队列的所有消息全部被重新投递到目标队列,看到这里,可以确定的是:通过 Move Messages
功能,是可以将死信重新投递到原队列,而且也可以被正常重新消费。不过可以预见的是,再过20s,又有60条消息变成死信。
ps: 揭秘一下上文的埋点——死信队列比作死信的回收站。其实,看到这里,大家应该大致能理解这句话了,消息在变成死信,这在队列看来,就是我不要这些消息了,可以把它们丢了,所以就进入死信队列这一回收站,而在特定时机,比如机器、环境稳定了,又可以重新发布到原来的队列,即对应回收站的恢复文件功能。所以将死信队列比作死信的回收站,在这种情况下还是可以理解的。
2. 定义死信队列的消费逻辑
上文提到,死信队列其实是一个普通的队列,那么我们直接订阅该死信队列是不是就可以正常消费死信了?答案是肯定的。接下来,使用 spring-rabbitmq
的注解 @RabbitListener
定义死信队列的处理逻辑,代码如下(直接追加在测试用例类即可):
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("dlq")
@EnableBinding({ScasDlqTest.MessageSink.class, ScasDlqTest.MessageSource.class})
public class ScasDlqTest {
// 省略其他代码
/**
* 原队列名称
*/
private static final String ORIGINAL_QUEUE = "packetUplinkDlxTopic.scas-data-collection.dlx";
/**
* 死信队列名称. 由于没有自定义, 所以根据 spring cloud stream 死信队列名称生成规则, 在原队列名称后追加 '.dlq'.
*/
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
/**
* 死信队列交换机. 默认为: {@link RabbitCommonProperties#DEAD_LETTER_EXCHANGE}, 值为 "DLX".
*/
private static final String DLX = RabbitCommonProperties.DEAD_LETTER_EXCHANGE;
/**
* 死信交换机将死信路由到死信队列的 routing-key. 由于没有自定义, 所以根据 spring cloud stream 死信队列名称生成规则,
* routing-key为原队列的名称.
*/
private static final String routingKey = "packetUplinkDlxTopic.scas-data-collection";
/**
* 死信队列的处理逻辑
* @param failedMessage
*/
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(DLQ)
, exchange = @Exchange(DLX)
, key = routingKey
),
concurrency = "1-5"
)
public void handleDlq(Message failedMessage) throws InterruptedException {
Thread.sleep(10);
log.info("进入 [上行数据包队列] 的死信队列. 完整消息: {};", failedMessage);
log.info("body: {}", (PacketModel) JSON.parseObject(failedMessage.getBody(), PacketModel.class));
}
}
代码中各个变量都说的很清楚,这里就不赘述了,直接重新启动测试用例(可以考虑先把死信队列删掉,因为里边还有之前遗留的死信),20s 后,可以看到控制台打印如下:
消费完成后,2个队列中也都没有堆积的消息,如下:
当然,上面示例中,只是加死信打印出来,而实战中,则需要根据具体业务自定义死信处理逻辑,比如,发送邮件、序列化到数据库等。
这2种方案的区别
- 第一种需要手动人工去操作;而第二种是全自动的,只要有死信,就能立即被消费;
- 基于上面一点,可以引出开发成本上的区别。第一种基本不用额外的编程;而第二种则需要定义对应死信队列的监听器,才能自定义消费逻辑;
- 再基于上面一点,可以引出功能、扩展性上的区别。第一种基本没有其他扩展能力;而第二种,因为收到的死信消息体,不仅包含了原消息,还携带了成为死信的原因,比如上面的例子,在日志打印的完整消息中,可以看到
x-first-death-reason=expired
,即原因是消息过期了,那我们则可以根据不同的原因再结合具体业务,定制处理逻辑;
死信队列其他配置
spring:
cloud:
stream:
rabbit:
bindings:
packetUplinkInput:
consumer:
# DLQ相关
autoBindDlq: true # 是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
deadLetterQueueName: 'packetUplinkDlxTopic.scas-data-collection.dlx.dlq' # 默认prefix + destination + group + .dlq。DLQ的名称。
deadLetterExchange: 'DLX' # 默认prefix + DLX。DLX的名称
deadLetterRoutingKey: 'packetUplinkDlxTopic.scas-data-collection.dlx' # 默认destination + group
dlqExpires: 30000 # 队列所有 customer 下线, 且在过期时间段内 queue 没有被重新声明, 多久之后队列会被销毁, 注意, 不管队列内有没有消息. 默认不设置.
dlqLazy: false # 是否声明为惰性队列(Lazy Queue).默认false
dlqMaxLength: 100000 # 队列中消息数量的最大限制. 默认不限制
dlqMaxLengthBytes: 100000000 # 队列所有消息总字节的最大限制. 默认不限制
dlqMaxPriority: 255 # 队列的消息可以设置的最大优先级. 默认不设置
dlqTtl: 1000000 # 队列的消息的过期时间. 默认不限制
republishToDlq: true # 默认false。当为true时,死信队列接收到的消息的headers会更加丰富,多了异常信息和堆栈跟踪。
republishDeliveryMode: DeliveryMode.PERSISTENT # 默认DeliveryMode.PERSISTENT(持久化)。当republishToDlq为true时,转发的消息的delivery mode
ps:
如果需要验证republishToDlq
配置的作用,可运行测试用例类 `ScasRepublishToDlqTest,既可看到结果,控制台打印结果类似如下:
总结
消息被队列丢弃后,会变成死信,如果队列不声明死信队列,那么这些消息将被永久丢弃,而如果声明死信队列,则死信会进入死信,死信可以被重新投递回原队列,也可以采用订阅死信队列的方式自定义处理逻辑,因为死信队列其实也是一个普通队列。又因为死信队列是一个普通队列,消费过程中肯定也会产生死信,那么死信队列产生的死信,有该何去何从?所以有了死信队列的死信队列,后续文章继续说明。
扩展
1. 鼠标悬停标签查看队列的header
2. 如何看出队列是否声明了死信队列
当队列声明了死信队列,会有上图圈中的2个标签。
DLX: 代表死信会被投递到的死信交换机。悬停该标签,可以看到
x-dead-letter-exchange: DLX
,其中 DLX
就是交换机名称;DLK: 代表死信被投递到死信交换机后,会根据什么路由准确投递到死信队列;悬停该标签,可以看到
x-dead-letter-routing-key: packetUplinkDlxTopic.scas-data-collection.dlx
;
3. 建议队列尽可能声明死信队列
死信队列是个好东西,当队列声明了死信队列,可以很大程度上避免消息丢失的情况,所以建议队列都添加 autoBindDlq
配置。可以使用全局默认配置:spring.cloud.stream.default.consumer.autoBindDlq: true
,这样所有队列都会应用该配置。
推荐阅读
Spring Cloud Stream 进阶配置——高吞吐量(一)——多消费者
Spring Cloud Stream 进阶配置——高吞吐量(二)——弹性消费者数量
Spring Cloud Stream 进阶配置——高吞吐量(三)——批量预取消息(prefetch)
Spring Cloud Stream 进阶配置——高可用(一)——失败重试
相关链接
Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)
完!