下载延迟消息插件
Warning:请检查插件版本与你的RabbitMQ版本是否兼容
下载完后,将插件上传到服务器
我上传的地址是: /usr/etc/rabbitmq_plugins
在Docker环境下,安装延迟消息插件
进入容器找到plugins目录
> docker exec -it rabbitmq bash
## 可以看到,plugins就是存放 mq 插件的地方了
> ls
退出容器(exit)回到宿主机,将插件复制到容器的plugins目录下
> cd /usr/etc/rabbitmq_plugins
> docker cp rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins
回到容器的plugins目录,查看其中是否有rabbitmq_delayed_message_exchange插件
激活插件
> rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启RabbitMQ
> docker restart rabbitmq
进入RabbitMQ管理界面查看插件是否成功生效
OK,完成以上工作,就可以编写Java代码发送延迟消息了。
SpringBoot中发送延迟消息
Config
package com.xjm.mid.compent.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration declaring the exchange, queue and binding used to
 * deliver delayed messages through the RabbitMQ delayed-message exchange.
 *
 * @author jaymin
 * 2020/09/22
 */
@Configuration
public class RabbitMQDelayedMessageConfig {

    /** Name of the delayed-message exchange. */
    public static final String DELAY_EXCHANGE = "jaymin.delay.exchange";

    /** Name of the queue that receives messages once their delay has elapsed. */
    public static final String DELAY_QUEUE = "jaymin.delay.queue";

    /** Routing key binding {@link #DELAY_QUEUE} to {@link #DELAY_EXCHANGE}. */
    public static final String DELAY_ROUTING_KEY = "jaymin.delay.routingKey";

    /**
     * Declares a custom exchange of type {@code x-delayed-message}.
     *
     * @return a non-durable, non-auto-delete exchange whose
     *         {@code x-delayed-type} argument is {@code direct}
     */
    @Bean
    public CustomExchange delayMessageExchange() {
        final boolean durable = false;
        final boolean autoDelete = false;

        Map<String, Object> exchangeArguments = new HashMap<>();
        exchangeArguments.put("x-delayed-type", "direct");

        return new CustomExchange(
                DELAY_EXCHANGE, "x-delayed-message", durable, autoDelete, exchangeArguments);
    }

    /**
     * Declares the queue that consumers listen on.
     *
     * @return a non-durable, non-exclusive, non-auto-delete queue named {@link #DELAY_QUEUE}
     */
    @Bean
    public Queue delayMessageQueue() {
        final boolean durable = false;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(DELAY_QUEUE, durable, exclusive, autoDelete);
    }

    /**
     * Binds the delay queue to the delay exchange using {@link #DELAY_ROUTING_KEY}.
     *
     * @return the binding, with no extra arguments
     */
    @Bean
    public Binding bindingDelayExchangeAndQueue() {
        Queue queue = delayMessageQueue();
        CustomExchange exchange = delayMessageExchange();
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with(DELAY_ROUTING_KEY)
                .noargs();
    }
}
Client
package com.xjm.mid.compent.rabbitmq.web;
import com.xjm.mid.compent.rabbitmq.config.RabbitMQDelayedMessageConfig;
import com.xjm.mid.compent.rabbitmq.model.Letter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
/**
 * REST endpoints that publish {@link Letter} messages to the delayed-message
 * exchange declared in {@link RabbitMQDelayedMessageConfig}.
 *
 * @author jaymin
 * 2020/09/22
 */
@RestController
@RequestMapping("/message/delayed")
@Slf4j
public class DelayedMessageClient {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes a letter with a 10-second (10000 ms) delay.
     */
    @PostMapping("/ten")
    public void sendDelayedMessage1() {
        sendDelayedLetter("您的10S外卖到了!", 10000);
    }

    /**
     * Publishes a letter with a 5-second (5000 ms) delay.
     */
    @PostMapping("/five")
    public void sendDelayedMessage2() {
        sendDelayedLetter("您的5S外卖到了!", 5000);
    }

    /**
     * Builds a letter, publishes it with the given delay, and logs the send
     * time together with the delay in whole seconds.
     *
     * @param context     text stored in the letter's context field
     * @param delayMillis delay in milliseconds set on the message properties
     */
    private void sendDelayedLetter(String context, int delayMillis) {
        Letter letter = new Letter();
        letter.setRecipient("福尔摩斯");
        letter.setContext(context);

        rabbitTemplate.convertAndSend(
                RabbitMQDelayedMessageConfig.DELAY_EXCHANGE,
                RabbitMQDelayedMessageConfig.DELAY_ROUTING_KEY,
                letter,
                message -> {
                    // This is a delivery delay, not an expiration/TTL: the message
                    // is not discarded when the interval elapses.
                    message.getMessageProperties().setDelay(delayMillis);
                    return message;
                });

        // Log text kept as-is; the second placeholder is the delay in seconds.
        log.info("[发送时间] - [{}]-[过期时间]-[{}]", LocalDateTime.now(), delayMillis / 1000);
    }
}
Listener
package com.xjm.mid.compent.rabbitmq.web;
import com.rabbitmq.client.Channel;
import com.xjm.mid.compent.rabbitmq.config.RabbitMQDelayedMessageConfig;
import com.xjm.mid.compent.rabbitmq.model.Letter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalDateTime;
/**
 * Consumer for {@link RabbitMQDelayedMessageConfig#DELAY_QUEUE}; logs each
 * received {@link Letter} together with the time it was consumed.
 *
 * @author jaymin
 * 2020/09/22
 */
@Component
@Slf4j
public class DelayMessageListener {

    /**
     * Logs the consumption time and the received letter.
     *
     * <p>Only the method-level {@code @RabbitListener} is needed here;
     * {@code @RabbitHandler} applies to methods of a class that carries a
     * class-level {@code @RabbitListener}.
     *
     * @param channel the AMQP channel (unused by this method)
     * @param message the raw AMQP message (unused by this method)
     * @param letter  the converted message payload
     */
    @RabbitListener(queues = {RabbitMQDelayedMessageConfig.DELAY_QUEUE})
    public void receiveMessage(Channel channel, Message message, Letter letter) {
        // Pass the object itself so the logger formats it; a null payload is
        // logged as "null" instead of throwing.
        log.info("[listenerDelayQueue 监听的消息] - [消费时间] - [{}] - [{}]", LocalDateTime.now(), letter);
    }
}
Result
rabbitmq 的延时插件支持的最大延时是 4 byte(32 位无符号整数)所能表示的毫秒数,即 2^32-1 ms,大概 49 天。如果你的延时时间很长,建议配合定时任务进行处理。