本文参考石杉的架构笔记公众号, 总结了一下RabbitMQ的核心知识, 非常感谢
一、消息中间件选型
1. ActiveMQ:
- 优点: ActiveMQ是老牌的消息中间件,国内很多公司过去运用的还是非常广泛的,功能很强大。
- 缺点: 没法确认ActiveMQ可以支撑互联网公司的高并发、高负载以及高吞吐的复杂场景,在国内互联网公司落地较少。而且使用较多的是一些传统企业,用ActiveMQ做异步调用和系统解耦。
2. RabbitMQ:
- 优点: 可以支撑高并发、高吞吐、性能很高,同时有非常完善便捷的后台管理界面可以使用。另外,他还支持集群化、高可用部署架构、消息高可靠支持,功能较为完善。而且经过调研,国内各大互联网公司落地大规模RabbitMQ集群支撑自身业务的case较多,国内各种中小型互联网公司使用RabbitMQ的实践也比较多。除此之外,RabbitMQ的开源社区很活跃,较高频率的迭代版本,来修复发现的bug以及进行各种优化,因此,综合对比后,RabbitMQ是一个不错的消息中间件选择。
- 缺点: 基于erlang语言开发的,所以导致较为难以分析里面的源码,也较难进行深层次的源码定制和改造,毕竟需要较为扎实的erlang语言功底才可以。
3. RocketMQ:
- 优点: 阿里开源的,经过阿里的生产环境的超高并发、高吞吐的考验,性能卓越,同时还支持分布式事务等特殊场景。而且RocketMQ是基于Java语言开发的,适合深入阅读源码,有需要可以站在源码层面解决线上生产问题,包括源码的二次开发和改造。
- 缺点: 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码
4. Kafka:
- 优点: 专为超高吞吐量的实时日志采集、实时数据同步、实时数据计算等场景来设计。在大数据领域中配合实时计算技术(比如Spark Streaming、Storm、Flink)使用的较多。
- 缺点: 相对于以上几种中间件来说,功能较少,在传统的MQ中间件使用场景中较少采用。
二、消息中间件的常见使用场景
- 复杂系统的解耦: 多个系统间通过中间件进行数据交互, 避免牵一发而动全身, 减少耦合, 提升系统稳定性与可扩展性
- 复杂链路的异步调用: 某些业务场景可以通过异步执行减少同步调用的时间, 从而大大提高系统响应时间而不影响核心逻辑
- 瞬时高峰的削峰处理: 流量高峰期, 可以将请求积压在MQ中, 服务器不用一下处理所有请求从而导致系统崩溃, 高峰期后, 消费者可以慢慢消费
三、系统架构引入消息中间件后会有哪些缺点
- 系统可用性降低: 引入MQ,系统多了一个依赖。依赖如果出现问题,就会导致系统可用性降低。一旦引入中间件,就必须考虑这个中间件是如何部署的,如何保证高可用性
- 系统稳定性降低: 引入MQ, 可能由于网络故障、中间件故障、消费者异常等原因导致各种各样乱七八糟的问题产生, 从而使系统稳定性下降
- 分布式一致性问题: 多系统协同处理一个业务, 不能保证所有系统都正常处理, 有可能出现系统数据不一致的情况, 所以此时又需要使用可靠消息最终一致性的分布式事务方案来保障数据一致性。
四、消息发送确认
生产者发送消息, 先发送消息到Exchange, 然后Exchange再路由到Queue, 这中间就需要确认两个事情
- 确认消息是否成功发送到Exchange
- 确认消息是否从Exchange成功路由到Queue
spring提供了两个回调函数来处理这两种消息发送确认
1. 确认消息是否成功发送到Exchange
有2种方式, 一种是重量级的事务消息机制。采用类事务的机制把消息投递到MQ,可以保证消息不丢失,但是性能极差,经过测试性能会呈现几百倍的下降。
所以说现在一般是不会用这种过于重量级的机制,而是会用轻量级的confirm机制。
另一种方式是confirm机制, 跟手动ack机制类似, 生产者将消息投递到RabbitMQ, 且将消息持久化到硬盘后, RabbitMQ会通过一个回调方法将confirm信息回传给生产端, 这样, 如果生产端的服务接收到了这个confirm消息,就知道是已经持久化到磁盘了。否则如果没有接收到confirm消息,那么就说明这条消息可能半路丢失了,此时你就可以重新投递消息到MQ去,确保消息不会丢失。
1.1 通过AMQP的事务机制可以保证消息发送确认
事务机制主要是通过对channel的设置实现
channel.txSelect();// 声明启动事务模式
channel.txComment();// 提交事务
channel.txRollback();// 回滚事务
1.2 使用confirm确认机制
实现ConfirmCallback并重写confirm回调方法, 消息发送到Broker后触发回调, 可以确认消息是否成功发送到Exchange
application.properties
:
# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true
回调:
// 消息是否成功发送到Exchange
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息成功发送到Exchange");
} else {
log.info("消息发送到Exchange失败: cause: {}", correlationData, cause);
}
});
2. 确认消息是否从Exchange成功路由到Queue
实现ReturnCallback并重写returnedMessage回调方法, 可以确认消息从EXchange路由到Queue失败, 注意: 这里的回调是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
application.properties
:
# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 触发returnedMessage回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
spring.rabbitmq.template.mandatory=true
回调:
// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
});
五、消息接收确认
消息怎样才算消费成功?
RabbitMQ默认自动确认(ack)消息被正确消费, 即消息投递到消费者后就自动确认消息被处理完毕, 并且会将该消息删除, 即使消费者意外宕机, 或者抛出异常, 如果消费者接收到消息, 还没处理完成就down掉或者抛出异常, 那么, 这条消息就丢失了
分析一下问题出在哪, 问题出在RabbitMQ只管消息投递出去, 而不管消息是否被正确处理就自动删除消息, 所以, 只要将自动ack修改为手动ack, 消费成功才通知RabbitMQ可以删除该消息即可, 如果消费者宕机, 消费失败, 由于RabbitMQ并未收到ack通知, 且感知到该消费者状态异常(如抛出异常), 就会将该消息重新推送给其他消费者, 让其他消费者继续执行, 这样就保证消费者挂掉但消息不会丢失
消息确认模式有:
- AcknowledgeMode.NONE:自动确认
- AcknowledgeMode.AUTO:根据情况确认
- AcknowledgeMode.MANUAL:手动确认
默认情况下消息消费者是自动ack(确认)消息的, 如果要手动ack(确认), 则需要修改确认模式为manual
application.properties
:
# 设置手动确认(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消费消息并手动确认:
@Component
@Slf4j
public class LogUserConsumer {
@Autowired
UserLogService userLogService;
@RabbitListener(queues = "log.user.queue")
public void logUserConsumer(Message message, Channel channel, @Header (AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
log.info("收到消息: {}", message.toString());
userLogService.insert(MessageHelper.msgToObj(message, UserLog.class));
} catch (Exception e){
log.error("logUserConsumer error", e);
channel.basicNack(tag, false, true);
} finally {
channel.basicAck(tag, false);
}
}
}
重点在
channel.basicAck(tag, false)
方法, 第一个参数deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel第二个参数multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
六、消息持久化
消息被投递到RabbitMQ的内存中, 还没投递到消费者实例之前宕机了, 消息不就丢失了?
可以进行消息持久化, 将Exchange、queue和message都持久化到硬盘, 这样, RabbitMQ重启时, 会把持久化的Exchange、queue和message从硬盘重新加载出来, 重新投递消息
1.1 Exchange的持久化, 声明交换机时指定持久化参数为true
即可
@Bean
public DirectExchange logUserExchange() {
return new DirectExchange("log.user.exchange", true, false);
}
第二个参数durable
: 是否持久化, 第三个参数autoDelete
: 当所有绑定队列都不再使用时, 是否自动删除交换器, true: 删除, false: 不删除
1.2 queue的持久化, 声明队列时指定持久化参数为true
即可
@Bean
public Queue logUserQueue() {
return new Queue("log.user.queue.name", true);
}
第二个参数durable
, 是否持久化
1.3 message的持久化, 是通过配置deliveryMode
实现的, 生产者投递时, 指定deliveryMode
为MessageDeliveryMode.PERSISTENT
即可实现消息的持久化, 投递和消费都需要通过Message
对象进行交互, 为了不每次都写配置转换的代码, 我们写一个消息帮助类MessageHelper
:
public class MessageHelper {
public static Message objToMsg(Object obj) {
if (null == obj) {
return null;
}
Message message = MessageBuilder.withBody(JsonUtil.objToStr(obj).getBytes()).build();
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 消息持久化
message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);
return message;
}
public static <T> T msgToObj(Message message, Class<T> clazz) {
if (null == message || null == clazz) {
return null;
}
String str = new String(message.getBody());
T obj = JsonUtil.strToObj(str, clazz);
return obj;
}
}
消息投递时:
rabbitTemplate.convertAndSend("log.user.exchange.name", "log.user.routing.key.name", MessageHelper.objToMsg(userLog));
消息消费时(参考五、消息接收确认
):
UserLog userLog = MessageHelper.msgToObj(message, UserLog.class);
如果不需要消息持久化, 则不需要通过Message进行转换, 可以直接通过字符串或者对象投递和消费
七、unack消息的积压问题
什么叫unack消息的积压问题, 简单来说就是消费者处理能力有限, 无法一下将MQ投递过来的所有消息消费完, 如果MQ推送消息过多, 比如可能有几千上万条消息积压在某个消费者实例内存中, 此时这些积压的消息就处于unack状态, 如果一直积压, 就有可能导致消费者服务实例内存溢出、内存消耗过大、甚至内存泄露
所以, RabbitMQ是必须要考虑一下消费者服务的处理能力的。
如何解决?
RabbitMQ基于一个prefetch count来控制这个unack message的数量。
你可以通过 “channel.basicQos(10)” 这个方法来设置当前channel的prefetch count。也可以通过配置文件设置: spring.rabbitmq.listener.simple.prefetch=10
举个例子,比如你要是设置为10的话,那么意味着当前这个channel里,unack message的数量不能超过10个,以此来避免消费者服务实例积压unack message过多。
这样的话,就意味着RabbitMQ正在投递到channel过程中的unack message,以及消费者服务在处理中的unack message,以及异步ack之后还没完成ack的unack message,所有这些message加起来,一个channel也不能超过10个。
如果你要简单粗浅的理解的话,也大致可以理解为这个prefetch count就代表了一个消费者服务同时最多可以获取多少个message来处理。
prefetch就是预抓取的意思,就意味着你的消费者服务实例预抓取多少条message过来处理,但是最多只能同时处理这么多消息。
如果一个channel里的unack message超过了prefetch count指定的数量,此时RabbitMQ就会停止给这个channel投递消息了,必须要等待已经投递过去的消息被ack了,此时才能继续投递下一个消息。
设置多大合理?
RabbitMQ官方给出的建议是prefetch count一般设置在100 - 300之间。也就是一个消费者服务最多接收到100 - 300个message来处理,允许处于unack状态。
这个状态下可以兼顾吞吐量也很高,同时也不容易造成内存溢出的问题。
八、总结
- 配置汇总
# rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 触发returnedMessage回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
spring.rabbitmq.template.mandatory=true
# 设置手动确认(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100
Github
完整项目请查看我的GitHub, 通过RabbitMQ实现了用户登录日志的记录
https://github.com/wangzaiplus/springboot/tree/wxw另外, 关于windows安装RabbitMQ详细教程, 请参考:
https://www.jianshu.com/p/c7726ba4b046关于Linux centos7安装RabbitMQ详细教程, 请参考:
https://www.jianshu.com/p/ee9f7594212b