RabbitMQ使用的一些问题

先说点闲话,这个问题的发生是因为当年我的垃圾代码导致,嗯,垃圾到我自己不想看。刚毕业的年轻人总是想探索一下未知,于是一知半解之下就上了MQ,但是话说回来,这种自讨苦吃是要的,不然真的会一直垃圾下去。

问题的发生

项目中的消息推送使用MQ做了异步处理,有一天消息推送突然中断了,排查了好久,日志也没有,这个也是,嗯,知识太少。好久之后,我想不会是MQ出问题了吧,我一看MQ,果然,这里不是原图,Unacknowledged状态的有8,Ready的有好多,再打开消费端的配置一看,

MQ管理

消费端配置
,qos设置正好为8,通过以上排查基本可以确定队列堵塞是由于消费者线程取走了消息,但是既没有ACK,也没有NACK,这样的消息个数到达Qos设置的值后,队列就会堵塞。

  • 问题代码
@Component
public class MsgQueueListener extends MessageListenerAdapter {

  private static Logger logger = LoggerFactory.getLogger(MsgQueueListener.class);
  @Autowired
  private RabbitTemplate rabbitTemplate;
  @Autowired
  private MessageSendFacadeService messageSendFacadeService;
  @Autowired
  private TaskExecutor taskExecutor;
  @Autowired
  private RedisUtil redisUtil;

  @Override
  public void onMessage(final Message message, final Channel channel) throws IOException {
    final WechatTemMessageDTO dto = (WechatTemMessageDTO) rabbitTemplate.getMessageConverter()
        .fromMessage(message);
    try {
     //业务代码处理
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (AnswerException e) {
      if (getRetryCount(message.getMessageProperties() < 3) {
        dto.setErrMsg(e.getErrorMsg());
        //重试次数小于3 ,投递到重试队列
        rabbitTemplate.convertAndSend("*", "*",
            dto, new MessagePostProcessor() {
              @Override
              public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties properties = new MessageProperties();
                properties.setHeader("x-orig-routing-key", "*");
                return message;
              }
            });
      } else {
        dto.setErrMsg(e.getErrorMsg());
        rabbitTemplate.convertAndSend("*failed", "*failed", dto,
            new MessagePostProcessor() {
              @Override
              public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties properties = new MessageProperties();
                properties.setHeader("x-orig-routing-key", "*");
                return message;
              }
            });
      }
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
  }

问题出在catch块,

  1. catch只捕获了业务异常
    对于非业务异常,显然无法捕获,导致消费者没有ack,这段代码主要的原因再调用业务代码的时候,我尽可能的将业务代码捕获的异常转换为了业务异常,但是产生了遗漏,而且这里这样做也有很大的坏处,没有将异常分类,来决定失败消息如何处理,下面再细说。
  2. catch块中可能又会抛出异常
    catch块中抛出的异常显然导致消费者没有ack,也没有finally进行处理,所以消费者慢慢的阻塞。
    解决办法应该是在finally语句中来执行这些操作,消费者从队列中取出消息后,无非是三种处理结果:1、处理成功,这种时候应该用basicAck确认消息;2、可重试的处理失败,这时候应该用basicNack将消息重新入列或者丢入死信队列3、不可重试的处理失败,这时候应该使用basicNack将消息丢弃或者丢入失败队列进行相应的业务操作
  • 正确示例
enum ProcessResult{
//这里只举几个简单的例子
  SUCCESS,  // 处理成功
  RETRY,   // 可以重试的错误
  FAIL,  // 无法重试的错误
}
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        WechatMessageDTO messageDTO = null;
        try {
            messageDTO = rabbitMQService.getMessageBody(message);
        }
        catch (Exception e) {
            logger.error("MQ 数据转换异常",e);
        } finally {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
        ProcessResult result=null;
        try {
            result = messageService.processMsg(messageDTO);
        } catch(UserDefineException e){
          logger.error("消费者处理失败:(消息ID:" + messageDTO.getId() + ")", e);
          //根据业务异常类型进行处理
          result = ;
        }catch (Exception e) {
            logger.error("消费者处理失败:(消息ID:" + messageDTO.getId() + ")", e);
            result = ;
        } finally {
            postProcessByResult(result);
        }
    }

容易出问题的点

  • 自动ack机制导致消息丢失以及客户端崩溃
    MQ只要确认消息发送成功,无须等待应答就会丢弃消息,虽然自动ack机制可以防止队列阻塞的问题,但是无法得知消费者的处理情况。自动ack没有qos控制,只要客户端队列不为空,则不断推送消息,可能导致消费者假死或者崩溃。
    qos是rabbitMQ一种消费限流的手段,上面提到的prefetch属性指定每个消费者最大的unacked messages数目。消费者每次最多可以取prefetch条消息缓存在客户端,Java客户端内部维护了一个BlockingQueue用来缓存从queue获取的message,默认值会设为Integer.MAX_VALUE,如果不设置qos可能会导致队列不断膨胀,最终OOM;Spring amqp提供了类似的功能,队列的大小是prefetch的大小,默认是1,关于prefetch的设置可以参考Some queuing theory: throughput, latency and bandwidth
    假设有客户端两个消费者线程,prefetch都是10,意味着每个消费者线程每次会从queue中预抓取 10 条消息到本地缓存着等待消费。这里对于MQ来说,客户端只是一个消费者,他们之间建立的Connection(包含多个channel,通常每个消费者线程使用一个)的unacked数变为20,但是对于客户端来说,可能是多个消费者线程,每个channel的unack数量达到prefetch预设值,并且达到最大的最大消费者线程数。便会停止投递新的message到该消费者中直到它发出ack。关于这部分大家可以看我以前写的去了解RabbitMQ连接池,Rabbit将会停止投递新的message到该消费者中直到它发出ack。
  • nack机制导致的死循环
    消息处理失败时使用Nack,等下一次重新消费,导致队列中Ready状态的消息暴增,
  • 启用ack机制但是没有启动qos
    如我上面发生的问题,如果没有qos,消息处理发生异常后,无法ack,队列的Unacked消息数暴涨,导致MQ响应越来越慢,甚至崩溃。

一些消息可靠性保证措施

对于生产者:

  • 发送确认
    实现ReturnCallback接口来得到消息发送失败的原因
  • 发送失败返回
    实现ConfirmCallback接口来确认是否正确到达exchange,当使用confirm时候,如果channel或者connection失败,生产者应该重新发送所有没有来得及提交的数据。但是服务器broker可能已经发送确认数据到生产者了,因此消费者要具有幂等性。
    对于消费者:
  • 通过redelivedred来确认消息是否重复发送
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容

  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,335评论 0 1
  • 应用场景 异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种: 1.串行的方式 2.并行的...
    lijun_m阅读 1,805评论 0 3
  • RabbitMQ的学习笔记 关于RabbitMQ的几个角色如下: 关于名词的通俗解析: 首先我们肯定知道Rabbi...
    ChinaXieShuai阅读 1,444评论 0 2
  • 【六项精进打卡】 2019.4.9日 姓名:陈岗 企业名称:上海孚因流体动力设备股份有限公司 打卡第349天 【知...
    我心依旧_79e2阅读 148评论 0 0
  • 今天FOX放出了DC美剧《哥谭》最终季的终极版预告片,而所有的重要角色全都登场了。 看起来每个角色在最终季里都会有...
    DC中文网阅读 1,500评论 0 0