先说点闲话,这个问题的发生是因为当年我的垃圾代码导致,嗯,垃圾到我自己不想看。刚毕业的年轻人总是想探索一下未知,于是一知半解之下就上了MQ,但是话说回来,这种自讨苦吃是要的,不然真的会一直垃圾下去。
问题的发生
项目中的消息推送使用MQ做了异步处理,有一天消息推送突然中断了,排查了好久,日志也没有,这个也是,嗯,知识太少。好久之后,我想不会是MQ出问题了吧,我一看MQ,果然,这里不是原图,Unacknowledged状态的有8,Ready的有好多,再打开消费端的配置一看,
- 问题代码
@Component
public class MsgQueueListener extends MessageListenerAdapter {
private static Logger logger = LoggerFactory.getLogger(MsgQueueListener.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MessageSendFacadeService messageSendFacadeService;
@Autowired
private TaskExecutor taskExecutor;
@Autowired
private RedisUtil redisUtil;
@Override
public void onMessage(final Message message, final Channel channel) throws IOException {
final WechatTemMessageDTO dto = (WechatTemMessageDTO) rabbitTemplate.getMessageConverter()
.fromMessage(message);
try {
//业务代码处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (AnswerException e) {
if (getRetryCount(message.getMessageProperties() < 3) {
dto.setErrMsg(e.getErrorMsg());
//重试次数小于3 ,投递到重试队列
rabbitTemplate.convertAndSend("*", "*",
dto, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties properties = new MessageProperties();
properties.setHeader("x-orig-routing-key", "*");
return message;
}
});
} else {
dto.setErrMsg(e.getErrorMsg());
rabbitTemplate.convertAndSend("*failed", "*failed", dto,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties properties = new MessageProperties();
properties.setHeader("x-orig-routing-key", "*");
return message;
}
});
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
问题出在catch块,
- catch只捕获了业务异常
对于非业务异常,显然无法捕获,导致消费者没有ack,这段代码主要的原因再调用业务代码的时候,我尽可能的将业务代码捕获的异常转换为了业务异常,但是产生了遗漏,而且这里这样做也有很大的坏处,没有将异常分类,来决定失败消息如何处理,下面再细说。 - catch块中可能又会抛出异常
catch块中抛出的异常显然导致消费者没有ack,也没有finally进行处理,所以消费者慢慢的阻塞。
解决办法应该是在finally语句中来执行这些操作,消费者从队列中取出消息后,无非是三种处理结果:1、处理成功,这种时候应该用basicAck确认消息;2、可重试的处理失败,这时候应该用basicNack将消息重新入列或者丢入死信队列3、不可重试的处理失败,这时候应该使用basicNack将消息丢弃或者丢入失败队列进行相应的业务操作
- 正确示例
enum ProcessResult{
//这里只举几个简单的例子
SUCCESS, // 处理成功
RETRY, // 可以重试的错误
FAIL, // 无法重试的错误
}
@Override
public void onMessage(Message message, Channel channel) throws Exception {
WechatMessageDTO messageDTO = null;
try {
messageDTO = rabbitMQService.getMessageBody(message);
}
catch (Exception e) {
logger.error("MQ 数据转换异常",e);
} finally {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
ProcessResult result=null;
try {
result = messageService.processMsg(messageDTO);
} catch(UserDefineException e){
logger.error("消费者处理失败:(消息ID:" + messageDTO.getId() + ")", e);
//根据业务异常类型进行处理
result = ;
}catch (Exception e) {
logger.error("消费者处理失败:(消息ID:" + messageDTO.getId() + ")", e);
result = ;
} finally {
postProcessByResult(result);
}
}
容易出问题的点
- 自动ack机制导致消息丢失以及客户端崩溃
MQ只要确认消息发送成功,无须等待应答就会丢弃消息,虽然自动ack机制可以防止队列阻塞的问题,但是无法得知消费者的处理情况。自动ack没有qos控制,只要客户端队列不为空,则不断推送消息,可能导致消费者假死或者崩溃。
qos是rabbitMQ一种消费限流的手段,上面提到的prefetch属性指定每个消费者最大的unacked messages数目。消费者每次最多可以取prefetch条消息缓存在客户端,Java客户端内部维护了一个BlockingQueue用来缓存从queue获取的message,默认值会设为Integer.MAX_VALUE,如果不设置qos可能会导致队列不断膨胀,最终OOM;Spring amqp提供了类似的功能,队列的大小是prefetch的大小,默认是1,关于prefetch的设置可以参考Some queuing theory: throughput, latency and bandwidth
假设有客户端两个消费者线程,prefetch都是10,意味着每个消费者线程每次会从queue中预抓取 10 条消息到本地缓存着等待消费。这里对于MQ来说,客户端只是一个消费者,他们之间建立的Connection(包含多个channel,通常每个消费者线程使用一个)的unacked数变为20,但是对于客户端来说,可能是多个消费者线程,每个channel的unack数量达到prefetch预设值,并且达到最大的最大消费者线程数。便会停止投递新的message到该消费者中直到它发出ack。关于这部分大家可以看我以前写的去了解RabbitMQ连接池,Rabbit将会停止投递新的message到该消费者中直到它发出ack。 - nack机制导致的死循环
消息处理失败时使用Nack,等下一次重新消费,导致队列中Ready状态的消息暴增, - 启用ack机制但是没有启动qos
如我上面发生的问题,如果没有qos,消息处理发生异常后,无法ack,队列的Unacked消息数暴涨,导致MQ响应越来越慢,甚至崩溃。
一些消息可靠性保证措施
对于生产者:
- 发送确认
实现ReturnCallback接口来得到消息发送失败的原因 - 发送失败返回
实现ConfirmCallback接口来确认是否正确到达exchange,当使用confirm时候,如果channel或者connection失败,生产者应该重新发送所有没有来得及提交的数据。但是服务器broker可能已经发送确认数据到生产者了,因此消费者要具有幂等性。
对于消费者: - 通过redelivedred来确认消息是否重复发送