传输对象的方法有:
- 使用序列化工具
- Convertor方法
自定义转换器:
@Bean(name = "userRabbitTemplate")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
template.setExchange("FacadeReservationDelayExchange");
template.setRoutingKey("FacadeReservationThirdDelayQueue");
return template;
}
一般延迟队列的方式有两种:
1、在配置类中增加 "x-message-ttl"
配置
2、在发送的时候对 Message
的 Properties
属性,增加过期时间
//第一种方式
@Bean
Queue messageFirstTtlQueue() {
return QueueBuilder
.durable("FacadeReservationFirstDelayQueue")
// 配置到期后转发的交换
.withArgument("x-dead-letter-exchange", "FacadeReservationDirectExchange")
// 配置到期后转发的路由键
.withArgument("x-dead-letter-routing-key", "FacadeReservationDirectQueue")
.withArgument("x-message-ttl", 10000)
.build();
}
//第二种方式
rabbitMqTemplate.convertAndSend(exchange, routerKey, messageContent, message -> {
// 设置延迟毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
});
下面是主要的代码部分:
一、RabbitMQ
的交换机和队列的配置类
主要的流程是:
1、定义交换机Exchange
2、定义队列Queue
3、绑定关系Exchange
和Queue
如果需要对队列
增加自定配置,可以自定义 SimpleMessageListenerContainer
。
@Configuration
public class MessageRabbitMQConfiguration {
/**
* 消息中心实际消费队列交换配置
*/
@Bean
DirectExchange messageDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange("FacadeReservationDirectExchange")
.durable(true)
.build();
}
/**
* 消息中心延迟消费交换配置
*/
@Bean
DirectExchange messageFirstTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange("FacadeReservationFirstDelayExchange")
.durable(true)
.build();
}
/**
* 消息中心延迟消费交换配置
*/
@Bean
DirectExchange messageSecondTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange("FacadeReservationSecondDelayExchange")
.durable(true)
.build();
}
/**
* 消息中心延迟消费交换配置
*/
@Bean
DirectExchange messageThirdTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange("FacadeReservationThirdDelayExchange")
.durable(true)
.build();
}
/**
* 消息中心实际消费队列配置
*/
@Bean
public Queue messageQueue() {
return new Queue("FacadeReservationDirectQueue", true);
}
/**
* 消息中心TTL第一级延迟队列
*/
@Bean
Queue messageFirstTtlQueue() {
return QueueBuilder
.durable("FacadeReservationFirstDelayQueue")
// 配置到期后转发的交换
.withArgument("x-dead-letter-exchange", "FacadeReservationDirectExchange")
// 配置到期后转发的路由键
.withArgument("x-dead-letter-routing-key", "FacadeReservationDirectQueue")
.withArgument("x-message-ttl", 10000)
.build();
}
/**
* 消息中心TTL第二级延迟队列
* 往第一级延迟队列发
*/
@Bean
Queue messageSecondTtlQueue() {
return QueueBuilder
.durable("FacadeReservationSecondDelayQueue")
// 配置到期后转发的交换
.withArgument("x-dead-letter-exchange", "FacadeReservationFirstDelayExchange")
// 配置到期后转发的路由键
.withArgument("x-dead-letter-routing-key", "FacadeReservationFirstDelayQueue")
.withArgument("x-message-ttl", 10000)
.build();
}
/**
* 消息中心TTL第三级延迟队列
* 往第二级延迟队列发
*/
@Bean
Queue messageThirdTtlQueue() {
return QueueBuilder
.durable("FacadeReservationThirdDelayQueue")
// 配置到期后转发的交换
.withArgument("x-dead-letter-exchange", "FacadeReservationSecondDelayExchange")
// 配置到期后转发的路由键
.withArgument("x-dead-letter-routing-key", "FacadeReservationSecondDelayQueue")
.withArgument("x-message-ttl", 10000)
.build();
}
/**
* 消息中心实际消息交换与队列绑定
*
* @param messageDirect 消息中心交换配置
* @param messageQueue 消息中心队列
*/
@Bean
Binding messageBinding(DirectExchange messageDirect, Queue messageQueue) {
return BindingBuilder
.bind(messageQueue)
.to(messageDirect)
.with("FacadeReservationDirectQueue");
}
/**
* 消息中心TTL绑定实际消息中心实际消费交换机
*/
@Bean
public Binding messageFirstTtlBinding(Queue messageFirstTtlQueue, DirectExchange messageFirstTtlDirect) {
return BindingBuilder
.bind(messageFirstTtlQueue)
.to(messageFirstTtlDirect)
.with("FacadeReservationFirstDelayQueue");
}
@Bean
public Binding messageSecondTtlBinding(Queue messageSecondTtlQueue, DirectExchange messageSecondTtlDirect) {
return BindingBuilder
.bind(messageSecondTtlQueue)
.to(messageSecondTtlDirect)
.with("FacadeReservationSecondDelayQueue");
}
@Bean
public Binding messageThirdTtlBinding(Queue messageThirdTtlQueue, DirectExchange messageThirdTtlDirect) {
return BindingBuilder
.bind(messageThirdTtlQueue)
.to(messageThirdTtlDirect)
.with("FacadeReservationThirdDelayQueue");
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setConcurrentConsumers(10);
// 监听的队列
container.setQueueNames("FacadeReservationDirectQueue");
// NONE 代表自动确认
container.setAcknowledgeMode(AcknowledgeMode.NONE);
//消息监听处理
container.setMessageListener((MessageListener) message -> {
System.out.println("====接收到消息=====");
System.out.println(new String(message.getBody()));
});
return container;
}
@Bean(name = "userRabbitTemplate")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
template.setExchange("FacadeReservationDelayExchange");
template.setRoutingKey("FacadeReservationThirdDelayQueue");
return template;
}
}
关于自定义配置中使用的 Message
监听类,主要有两种:
1、
MessageListener
自动Ack时可以使用
2、ChannelAwareMessageListener
需要手动Ack时
@Service
public class MyMessageListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
channel.basicAck(deliveryTag,false);
}catch (Exception e){
}
System.out.println("MyMessageListener收到消息: " + message);
}
}
二、配置 application.properties
spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
#手动ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
三、发送类
RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback
是用于确认消息是否收到。
@Component
@Slf4j
public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitMqTemplate;
/**
* 发送延迟消息
*
* @param messageContent 消息内容
* @param exchange 队列交换
* @param routerKey 队列交换绑定的路由键
* @param delayTimes 延迟时长,单位:毫秒
*/
public void sendMessage(Object messageContent, String exchange, String routerKey, final long delayTimes) {
if (!StringUtils.isEmpty(exchange)) {
log.info("延迟:{}毫秒写入消息队列:{},消息内容:{}", delayTimes, routerKey, JSON.toJSONString(messageContent));
// 执行发送消息到指定队列
rabbitMqTemplate.convertAndSend(exchange, routerKey, messageContent, message -> {
// 设置延迟毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
});
} else {
log.error("未找到队列消息:{},所属的交换机", exchange);
}
}
/**
* 发送延迟消息
*
* @param messageContent 消息内容
* @param exchange 队列交换
* @param routerKey 队列交换绑定的路由键
*/
public void sendMessageOnly(Object messageContent, String exchange, String routerKey) {
if (!StringUtils.isEmpty(exchange)) {
log.info("写入消息队列:{},消息内容:{}", routerKey, JSON.toJSONString(messageContent));
// 执行发送消息到指定队列
rabbitMqTemplate.convertAndSend(exchange, routerKey, messageContent);
} else {
log.error("未找到队列消息:{},所属的交换机", exchange);
}
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println(" 回调id:" + correlationData);
if (b) {
System.out.println("消息成功消费");
} else {
System.out.println("消息消费失败:" + s);
}
}
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println(message.getMessageProperties() + " 发送失败");
}
}
四、测试类
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class MessageSenderTest {
/**
* 消息队列提供者
*/
@Autowired
private MessageSender messageProvider;
/**
* 测试延迟消息消费
*/
@Test
public void testLazy() throws Exception{
// 测试延迟10秒
for (int i = 0; i < 20; i++) {
messageProvider.sendMessageOnly("测试延迟消费,写入时间:" + new Date(),
"FacadeReservationThirdDelayExchange",
"FacadeReservationThirdDelayQueue");
Thread.sleep(1000);
}
}
}