SpringBoot--实战开发--整合RabbitMQ(六十三)

一、RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 下面将重点介绍RabbitMQ中的一些基础概念,了解了这些概念,是使用好RabbitMQ的基础。

Direct Exchange
Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。
Topic Exchange
Topic Exchange 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。
topic 和 direct 类似, 只是匹配上支持了"模式", 在"点分"的 routing_key 形式中, 可以使用两个通配符:
*表示一个词.

表示零个或多个词.

Headers Exchange
headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型.
在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.
Fanout Exchange
Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略。

二、Maven依赖

<!--RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

三、简单配置

  1. 配置文件
# rabbitmq 地址
spring.rabbitmq.host=192.168.77.132
# rabbitmq 端口
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/

  1. 配置类
@Configuration
@Slf4j
public class RabbitmqConf {
    /**
     * 消息交换机的名字
     * */
    private static final String DIRECT_EXCHANGE = "DirectExchange";   // 直连交换机
    private static final String TOPIC_EXCHANGE = "TopicExchange";     // 主题交换机
    private static final String FANOUT_EXCHANGE ="FanoutExchange" ;   // 扇形交换机
    private static final String HEADERS_EXCHANGE ="HeadersExchange" ; // 头部交换机

    /**
     * 队列的名字
     * */
    private static final String DIRECT_QUEUE = "DirectQueue";
    private static final String TOPIC_QUEUE = "TopicQueue";
    private static final String FANOUT_QUEUE = "FanoutQueue";
    private static final String HEADERS_QUEUE = "HeadersQueue";

    /**
     * key
     * */
    private static final String DIRECT_KEY = "DirectKey";

    private static final String TOPIC_KEY = "Topic.#";

    /**
     * 1.队列名字
     * 2.durable="true" 是否持久化 rabbitmq重启的时候不需要创建新的队列
     * 3.auto-delete    表示消息队列没有在使用时将被自动删除 默认是false
     * 4.exclusive      表示该消息队列是否只在当前connection生效,默认是false
     */
    @Bean
    public Queue dirctQueue() {
        return new Queue(DIRECT_QUEUE,true,false,false);
    }

    @Bean
    public Queue topicQueue() {
        return new Queue(TOPIC_QUEUE,true,false,false);
    }


    @Bean
    public Queue fanoutQueue() {
        return new Queue(FANOUT_QUEUE,true,false,false);
    }

    @Bean
    public Queue headersQueue() {
        return new Queue(HEADERS_QUEUE,true,false,false);
    }
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(DIRECT_EXCHANGE,true,false);
    }

    /**
     * 1.交换机名字
     * 2.durable="true" 是否持久化 rabbitmq重启的时候不需要创建新的交换机
     * 3.autoDelete    当所有消费客户端连接断开后,是否自动删除队列
     */
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(TOPIC_EXCHANGE,true,false);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE,true,false);
    }

    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange(HEADERS_EXCHANGE,true,false);
    }

    /**
     * 将direct队列和交换机进行绑定
     */
    @Bean
    public Binding bindingDirect() {
        return BindingBuilder.bind(dirctQueue()).to(directExchange()).with(DIRECT_KEY);
    }

    @Bean
    public Binding bindingTopic() {
        return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(TOPIC_KEY);
    }


    @Bean
    public Binding bindingFanout() {
        return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding headersBinding(){
        Map<String,Object> map = new HashMap<>();
        map.put("headers1","value1");
        map.put("headers2","value2");
        return BindingBuilder.bind(headersQueue()).to(headersExchange()).whereAll(map).match();
    }

    /**
     * 定义消息转换实例  转化成 JSON 传输  传输实体就可以不用实现序列化
     * */
    @Bean
    public MessageConverter integrationEventMessageConverter() {
        return  new Jackson2JsonMessageConverter();
    }
}

  1. 控制器
@RestController
@Slf4j
public class RabbitmqController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @Description: 发送消息
     * 直连交换机
     * 1.交换机
     * 2.key
     * 3.消息
     * 4.消息ID
     * rabbitTemplate.send(message);
     * 发消息;参数对象为org.springframework.amqp.core.Message
     * rabbitTemplate.convertAndSend(message);
     * 转换并发送消息;将参数对象转换为org.springframework.amqp.core.Message后发送,消费者不能有返回值
     * rabbitTemplate.convertSendAndReceive(message)
     * 转换并发送消息,且等待消息者返回响应消息.消费者可以有返回值
     */
    @GetMapping("/directSend")
    public void directSend() {
        String message = "direct 发送消息";
        rabbitTemplate.convertAndSend("DirectExchange", "DirectKey",
                message, new CorrelationData(UUID.randomUUID().toString()));
    }

    @GetMapping("/topicSend")
    public void topicSend() {
        String message = "topic 发送消息";
        rabbitTemplate.convertAndSend("TopicExchange", "Topic.Key",
                message, new CorrelationData(UUID.randomUUID().toString()));
    }

    @GetMapping("/fanoutSend")
    public void fanoutSend() {
        String message = "fanout 发送消息";
        rabbitTemplate.convertAndSend("FanoutExchange", "", message, new CorrelationData(UUID.randomUUID().toString()));
    }

    @GetMapping("/headersSend")
    public void headersSend() {
        Goods goods = new Goods();
        goods.setId(1L);
        goods.setName("手机");
        goods.setPrice(new BigDecimal(2000.36));
        String json = JSON.toJSONString(goods);
        MessageProperties properties = new MessageProperties();
        properties.setHeader("headers1", "value1");
        properties.setHeader("headers2", "value2");
        properties.setContentType("application/json");
        Message message = new Message(json.getBytes(), properties);
        rabbitTemplate.convertAndSend("HeadersExchange", "", message, new CorrelationData(UUID.randomUUID().toString()));
    }

    /**
     * @Description: 消费消息
     */
    @RabbitListener(queues = "DirectQueue")
    @RabbitHandler
    public void directMessage(String message) {
        log.info("DirectConsumer {} directMessage :" + message);
    }

    @RabbitListener(queues = "TopicQueue")
    @RabbitHandler
    public void topicMessage(String message) {
        log.info("TopicConsumer {} topicMessage :" + message);
    }

    @RabbitListener(queues = "FanoutQueue")
    @RabbitHandler
    public void fanoutMessage(String message) {
        log.info("FanoutConsumer {} fanoutMessage :" + message);
    }

    @RabbitListener(queues = "HeadersQueue")
    @RabbitHandler
    public void headersMessage(Message message) {
        log.info("HeadersConsumer {} headersMessage :" + message);
    }
}

  1. 测试
    直连队列:
    http://localhost:8081/directSend
消费记录
直连队列

主题队列:
http://localhost:8081/topicSend

消费记录
主题队列

扇形队列:
http://localhost:8081/fanoutSend

消费记录

扇形队列

头部队列:
http://localhost:8081/headersSend

消费记录

头部队列

注:头部发送一定要为正确的JSON格式数据。

四、开启消息发送交换机确认、重试、同步返回

  1. 配置文件
# rabbitmq 地址
spring.rabbitmq.host=192.168.77.132
# rabbitmq 端口
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/

# 开启发送确认
spring.rabbitmq.publisher-confirms=true
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
# 消费者端的重试
spring.rabbitmq.listener.direct.retry.enabled=true
spring.rabbitmq.listener.simple.retry.enabled=true
# 超时时间 时间配置应该诸如: D H M S 分别为:天 小时 分钟 秒(用于同步,为10秒)
spring.rabbitmq.template.reply-timeout=10S
# 设置为true的时候RabbitTemplate(生产端)能够实现重试
spring.rabbitmq.template.retry.enabled=true
# 设置为true 才会触发returnCallback回调方法的执行。
spring.rabbitmq.template.mandatory=true
#消息确认机制 --- 是否开启手ack动确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual

ACK确认模式:

AcknowledgeMode.NONE :不确认

  1. 默认所有消息消费成功,会不断的向消费者推送消息
  2. 因为rabbitMq认为所有消息都被消费成功,所以队列中不在存有消息,消息存在丢失的危险

AcknowledgeMode.AUTO:自动确认

  1. 由spring-rabbit依据消息处理逻辑是否抛出异常自动发送ack(无异常)或nack(异常)到server端。 存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息,如果消息已经被处理,但后续代码抛出异常,使用 Spring 进行管理的话消费端业务逻辑会进行回滚,这也同样造成了实际意义的消息丢失;
  2. 使用自动确认模式时,需要考虑的另一件事是消费者过载

AcknowledgeMode.MANUAL:手动确认

  1. 手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者;
  2. 手动确认模式可以使用 prefetch,限制通道上未完成的(“正在进行中的”)发送的数量;
#消息确认机制 --- 是否开启手ack动确认模式
spring.rabbitmq.listener.direct.acknowledge-mode=manual
#消息确认机制 --- 是否开启手ack动确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  1. 业务类
    RabbitSender.java
@Service
@Slf4j
public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 实现消息发送到RabbitMQ交换器后接收ack回调,如果消息发送确认失败就进行重试.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("消息发送成功,消息ID:{}", correlationData.getId());
        } else {
            log.info("消息发送失败,消息ID:{}", correlationData.getId());
        }
    }

    /**
     * 实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调.
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息发送失败,replyCode:{}, replyText:{},exchange:{},routingKey:{},消息体:{}", replyCode, replyText, exchange, routingKey, new String(message.getBody()));
    }

    /**
     * convertAndSend 异步,消息是否发送成功用ConfirmCallback和ReturnCallback回调函数类确认。
     * 发送MQ消息
     */
    public void sendMessage(String exchangeName, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message, new CorrelationData(UUID.randomUUID().toString()));
    }

    /**
     * sendMessageAndReturn 当发送消息过后,该方法会一直阻塞在哪里等待返回结果,直到请求超时,
     * 配置spring.rabbitmq.template.reply-timeout来配置超时时间。
     * 发送MQ消息并返回结果
     * 实现发送消息之后可以同步接收返回信息。
     */
    public Object sendMessageAndReturn(String exchangeName, String routingKey, Object message) {
        return rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, message, new CorrelationData(UUID.randomUUID().toString()));
    }
}

  1. 控制器
@RestController
@Slf4j
public class RabbitmqController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private RabbitSender rabbitSender;

    /**
     * @Description: 发送消息
     * 1.交换机
     * 2.key
     * 3.消息
     * 4消息ID
     * rabbitTemplate.send(message);
     * 发消息;参数对象为org.springframework.amqp.core.Message
     * rabbitTemplate.convertAndSend(message);
     * 转换并发送消息;将参数对象转换为org.springframework.amqp.core.Message
     * 后发送,消费者不能有返回值
     * rabbitTemplate.convertSendAndReceive(message)
     * 转换并发送消息,且等待消息者返回响应消息.消费者可以有返回值
     */
    @GetMapping("/directSend")
    public void directSend() {
        String message = "direct 发送消息";
//        rabbitSender.sendMessage("DirectExchange", "DirectKey", message);
        // TODO 发送消息接受返回,如果无返回,则一直等待,直到超时
        Object object = rabbitSender.sendMessageAndReturn("DirectExchange", "DirectKey", message);
        log.info("生产者接受到返回:"+object.toString());
    }

    @GetMapping("/topicSend")
    public void topicSend() {
        String message = "topic 发送消息";
        rabbitSender.sendMessage("TopicExchange", "Topic.Key", message);
    }

    @GetMapping("/fanoutSend")
    public void fanoutSend() {
        String message = "fanout 发送消息";
        rabbitSender.sendMessage("FanoutExchange", "", message);
    }

    @GetMapping("/headersSend")
    public void headersSend() {
        String msg = "headers 发送消息";
        MessageProperties properties = new MessageProperties();
        properties.setHeader("headers1", "value1");
        properties.setHeader("headers2", "value2");
        Message message = new Message(msg.getBytes(), properties);
        rabbitSender.sendMessage("HeadersExchange", "", message);
    }

    /**
     * @Description: 消费消息-无返回
     */
//    @RabbitListener(queues = "DirectQueue")
//    @RabbitHandler
    public void directMessage(Channel channel, Message message) throws IOException {
        log.info("DirectConsumer {} directMessage :" + new String(message.getBody(),"utf-8"));
        // 业务处理成功后调用,消息会被确认消费
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        // 业务处理失败后调用
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
        //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    }

    /**
     * @Description: 消费消息-有返回(同步)
     * 用于方法:convertSendAndReceive
     */
    @RabbitListener(queues = "DirectQueue")
    @RabbitHandler
    public String directMessage(String msg,Channel channel, Message message) throws IOException {
        log.info("DirectConsumer {} directMessage :" + msg);
        // TODO 一定要有确认,否则会Unacked
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        return msg;
    }

    @RabbitListener(queues = "TopicQueue")
    @RabbitHandler
    public void topicMessage(String message) {
        log.info("TopicConsumer {} topicMessage :" + message);

    }

    @RabbitListener(queues = "FanoutQueue")
    @RabbitHandler
    public void fanoutMessage(String message) {
        log.info("FanoutConsumer {} fanoutMessage :" + message);
    }

    @RabbitListener(queues = "HeadersQueue")
    @RabbitHandler
    public void headersMessage(Message message) {
        log.info("HeadersConsumer {} headersMessage :" + message);
    }
}

附:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
// 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// ack返回false,并重新回到队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// 拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

五、常用配置

spring.rabbitmq.host=192.168.77.132
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
  # 开启发送确认
spring.rabbitmq.publisher-confirms=true
  # 开启发送失败退回
spring.rabbitmq.publisher-returns=true
# 开启ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 消费者端的重试
spring.rabbitmq.listener.direct.retry.enabled=true
spring.rabbitmq.listener.simple.retry.enabled=true
# 消费者的最小数量
spring.rabbitmq.listener.simple.concurrency=10
# 消费者的最大数量
spring.rabbitmq.listener.simple.max-concurrency=20
# 在单个请求中处理的消息个数,他应该大于等于事务数量
spring.rabbitmq.listener.simple.prefetch=5
# 启动时自动启动容器
spring.rabbitmq.listener.simple.auto-startup=true
# 投递失败时是否重新排队
spring.rabbitmq.listener.simple.default-requeue-rejected=true
# 超时时间 10 秒
spring.rabbitmq.template.reply-timeout=10S
# 设置为true的时候RabbitTemplate(生产端)能够实现重试
spring.rabbitmq.template.retry.enabled=true
# 第一次与第二次发布消息的时间间隔 1秒
spring.rabbitmq.template.retry.initial-interval=1S
# 尝试发布消息的最大数量 3
spring.rabbitmq.template.retry.max-attempts=3
# 尝试发布消息的最大时间间隔 10000
spring.rabbitmq.template.retry.max-interval=10000S
# 上一次尝试时间间隔的乘数 1.0
# 等待间隔 的倍数。如果为2  第一次 乘以2 等1秒, 第二次 乘以2 等2秒 ,第三次 乘以2 等4秒
spring.rabbitmq.template.retry.multiplier=1.0

六、常见问题:

  1. Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
    参数有问题,不能解析成json对象
  2. replyCode: 312 replyText: NO_ROUTE
    routingKey是一个不存在的key
    由于生产端设置的是一个错误的路由key,所以消费端没有任何打印。如果我们将 Mandatory 属性设置为false,对于不可达的消息会被Broker直接删除,那么生产端就不会进行任何打印了。如果我们的路由key设置为正确的,那么消费端能够正确消费,生产端也不会进行任何打印。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,681评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,710评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,623评论 0 334
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,202评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,232评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,368评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,795评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,461评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,647评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,476评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,525评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,226评论 3 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,785评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,857评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,090评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,647评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,215评论 2 341

推荐阅读更多精彩内容