github上面的rabbitMq的测试代码:https://github.com/wuzhong290/rabbitmq.git
要想保住RabbitMQ消息不丢失,需要从下面几个方面进行完善。
一、消息持久化
要想做到消息持久化,必须满足以下三个条件,缺一不可。
1、Exchange 设置持久化:durable:true
new TopicExchange("amq.topic")创建的Exchange就是持久化的。
org.springframework.amqp.core.AbstractExchange构造方法如下:
2、Queue 设置持久化
new Queue(queueName)创建的Queue就是持久化的。
org.springframework.amqp.core.Queue构造方法如下:
3、Message持久化发送
发送消息设置发送模式deliveryMode=2代表持久化消息
org.springframework.amqp.rabbit.core.RabbitTemplate默认情况下发送模式为deliveryMode=2
org.springframework.amqp.core.MessageProperties的默认发送模式:
二、ACK确认机制
1、消息发送确认
ConfirmCallback 只确认消息是否正确到达 Exchange 中。
ReturnCallback 消息没有正确到达队列时触发回调,如果正确到达队列不执行。
github上面的代码:https://github.com/wuzhong290/rabbitmq.git
执行com.demo.message.TestReturnCallback方法testReturnCallback能够看效果。
2、消息接收确认
默认情况下消息消费者是自动 ack (确认)消息的,需要设置为手动确认,原因是:自动确认会在消息发送给消费者后立即确认,这样存在丢失消息的可能。
github上面的代码:https://github.com/wuzhong290/rabbitmq.git
执行com.demo.message.TestManualAcks方法testMQ能够看效果。
com.demo.message.example.NoticeHandlerACK演示了手动确认消费者的实现。
3、参考资料
参考https://www.jianshu.com/p/2c5eebfd0e95
上面文章的消息发送确认ReturnCallback部分需要完善一下:
return-callback="returnCallback" mandatory="true"同时设置了才有效。和rabbit:connection-factory 的publisher-returns="true"没有关系。
三、设置集群镜像模式
参考https://www.jianshu.com/p/dd706e9ee80d 文章中的《6、HA 镜像模式队列设置》
四、消息补偿机制
在消息发送、接受时记录DB日志,定时轮训DB日志,查明哪些发送消息没有成功消费,启动重新发送消息机制。