配置类
package com.changgou.order.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ declarations for the delayed order-cancellation flow.
 *
 * <p>Declares a durable queue named {@code relay_queue} whose dead-lettered messages are
 * republished to the {@code cancel_order_pay_exchange} direct exchange with an empty routing
 * key, and binds {@code cancel_order_queue} to that exchange with the same empty routing key.
 * This class does not set any message or queue TTL; expiry has to be configured by whoever
 * publishes to {@code relay_queue}.
 *
 * <p>Author: gzy. Created: 2019/9/5 10:50.
 */
@Configuration
public class RabbitMQConfig {

    /** Name of the queue that holds messages until they are dead-lettered. */
    public static final String RELAY_QUEUE = "relay_queue";

    /** Name of the direct exchange that receives messages dead-lettered from {@link #RELAY_QUEUE}. */
    public static final String CANCEL_ORDER_PAY_EXCHANGE = "cancel_order_pay_exchange";

    /** Name of the queue bound to {@link #CANCEL_ORDER_PAY_EXCHANGE}. */
    public static final String CANCEL_ORDER_QUEUE = "cancel_order_queue";

    /**
     * Routing key shared by the dead-letter argument and the binding. Both must use the same
     * value for a direct exchange to deliver dead-lettered messages to the bound queue.
     */
    private static final String DEAD_LETTER_ROUTING_KEY = "";

    /**
     * Declares the durable {@link #RELAY_QUEUE} with dead-letter arguments pointing at
     * {@link #CANCEL_ORDER_PAY_EXCHANGE}.
     *
     * @return the queue definition carrying the {@code x-dead-letter-exchange} and
     *         {@code x-dead-letter-routing-key} arguments
     */
    @Bean
    public Queue relayQueue() {
        QueueBuilder relay = QueueBuilder.durable(RELAY_QUEUE);
        relay = relay.withArgument("x-dead-letter-exchange", CANCEL_ORDER_PAY_EXCHANGE);
        relay = relay.withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        return relay.build();
    }

    /**
     * Declares the queue that receives the messages dead-lettered from {@link #RELAY_QUEUE}.
     *
     * @return a queue named {@link #CANCEL_ORDER_QUEUE}
     */
    @Bean
    public Queue cancelOrderQueue() {
        Queue cancelQueue = new Queue(CANCEL_ORDER_QUEUE);
        return cancelQueue;
    }

    /**
     * Declares the direct exchange used as the dead-letter target.
     *
     * @return a direct exchange named {@link #CANCEL_ORDER_PAY_EXCHANGE}
     */
    @Bean
    public DirectExchange directExchange() {
        DirectExchange deadLetterExchange = new DirectExchange(CANCEL_ORDER_PAY_EXCHANGE);
        return deadLetterExchange;
    }

    /**
     * Binds {@link #cancelOrderQueue()} to {@link #directExchange()} using the empty routing key.
     *
     * @return the binding between the cancel-order queue and the dead-letter exchange
     */
    @Bean
    public Binding binding() {
        // NOTE(review): relies on Spring proxying @Configuration classes so that these calls
        // return the container-managed beans rather than fresh instances - confirm the
        // default proxy mode is in effect.
        Queue destination = cancelOrderQueue();
        DirectExchange source = directExchange();
        return BindingBuilder.bind(destination).to(source).with(DEAD_LETTER_ROUTING_KEY);
    }
}
向延迟队列(relay_queue)发送消息,并将消息的过期时间设为5秒;消息过期后成为死信,由死信交换机转发到取消订单队列
// Post-processor that stamps the outgoing message with an expiration of "5000"
// (RabbitMQ interprets the per-message expiration as milliseconds, i.e. 5 seconds).
MessagePostProcessor fiveSecondExpiry = new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message outgoing) throws AmqpException {
        outgoing.getMessageProperties().setExpiration("5000");
        return outgoing;
    }
};
// Publish the order id using RELAY_QUEUE as the routing key.
// NOTE(review): assuming rabbitTemplate is a Spring AMQP template, the (Object) cast keeps
// this call on the (routingKey, message, postProcessor) overload; without it a String id
// would resolve to (exchange, routingKey, message) instead - confirm before removing.
rabbitTemplate.convertAndSend(RabbitMQConfig.RELAY_QUEUE, (Object) order.getId(), fiveSecondExpiry);
接收消息
/**
 * Handles a message delivered from {@code RabbitMQConfig.CANCEL_ORDER_QUEUE} by printing
 * the received order id to standard output.
 *
 * <p>NOTE(review): the original comment stated that the message arrives after 5 minutes.
 * The actual delay depends on the expiration set by the publisher, which is not visible
 * from this method - confirm the intended value.
 *
 * @param orderId the message payload, treated as the id of the order
 */
@RabbitListener(queues = RabbitMQConfig.CANCEL_ORDER_QUEUE)
public void cancelOrder(String orderId) {
    String notice = "接收到订单ID:" + orderId;
    System.out.println(notice);
}