上一章讲解了RabbitMQ的一些基础概念,包括:RabbitMQ概念、生产者(producer)、消费者(consumer)、信道(channel)、队列(queue)、交换器(exchange)(direct、fanout、topic)、绑定(binding)、路由键(routing key)、持久化(durable)等,本章开始写第一个HelloWorld程序,话不多说,直接上代码。
1. 项目搭建配置
SpringBoot项目搭建
略Maven引入RabbitMQ jar
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置RabbitMQ
# RabbitMQ配置:IP、端口、用户名、密码、vhost
spring.rabbitmq.host=192.168.89.168
spring.rabbitmq.port=5672
spring.rabbitmq.username=fzb
spring.rabbitmq.password=fzb2019
spring.rabbitmq.virtual-host=fzb_host
- 生成User表
CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`name` varchar(255) DEFAULT NULL COMMENT '姓名',
`age` int(11) DEFAULT NULL COMMENT '年龄',
`birthday` timestamp NULL DEFAULT NULL COMMENT '生日',
`salary` decimal(10,2) DEFAULT NULL COMMENT '年薪',
`create_date` timestamp NULL DEFAULT NULL COMMENT '创建时间',
`update_date` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
INSERT INTO `user` VALUES ('1', '张三', '18', '2008-02-29 15:47:42', '5000000.00', '2008-02-29 15:47:42', '2019-04-12 14:35:24');
INSERT INTO `user` VALUES ('2', '李四', '17', '2008-02-29 15:47:42', '5000000.00', '2019-03-01 15:48:09', '2019-04-12 14:35:29');
INSERT INTO `user` VALUES ('3', '王五', '3', '2018-02-28 15:49:15', '50000000.00', '2019-03-04 09:38:09', '2019-04-12 14:35:16');
2. 基于代码消息队列示例
- 新建配置 MQConfig.java类,@Component 把该类注册成组件, @Bean 创建交换器、队列及他们的绑定关系。
- 声明Exchange(交换器名称,durable,autoDelete)
- 声明Queue(队列名称,durable,autoDelete)
- 绑定:BindingBuilder绑定队列到交换器,并设置路由键。
package com.fzb.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
/**
* @Description MQ配置信息
*
* 基于代码的绑定交换器、队列、路由键设置
* 1. 声明Exchange(交换器名称,durable,autoDelete)
* 2. 声明Queue(队列名称,durable,autoDelete)
* 3. 绑定:BindingBuilder绑定队列到交换器,并设置路由键
* @Author jxb
* @Date 2019-03-10 10:25:30
*/
@Component
public class MQConfig {
/**
* @Description 创建1:1 类型交换器(direct)
* new DirectExchange(String,boolean,boolean)
* new FanoutExchange(String,boolean,boolean)
* new TopicExchange(String,boolean,boolean)
* 1. 交换器名称
* 2. durable 是否持久化 默认true
* 3. autoDelete 是否自动删除 默认false
* @Author jxb
* @Date 2019-03-02 14:26:59
*/
@Bean
private DirectExchange directExchange() {
return new DirectExchange("direct.exchange");
}
/**
* @Description 创建队列
* new Queue(String,boolean,boolean,boolean)
* 1. 队列名称
* 2. durable 是否持久化 默认true
* 3. exclusive 排他队列,第一个链接消费后自动删除 默认 false
* 4. autoDelete 是否自动删除 默认false
* @Author jxb
* @Date 2019-03-02 14:12:31
*/
@Bean
private Queue directQueue() {
return new Queue("direct.queue");
}
/**
* @Description 绑定队列、交换器、路由键
* @Author jxb
* @Date 2019-03-04 16:43:08
*/
@Bean
private Binding bindingDirect() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with("HelloWorld");
}
}
- 新建生产者 MQProducer.java类,@RestController 注解为一个控制类,@RequestMapping("mqProducer")
设置访问路径。 - @Autowired 注入RabbitTemplate工具类(SpringBoot集成RabbitMQ自带的)
- @Autowired 注入userService访问数据库(需连接数据库访问User表)
package com.fzb.rabbitmq.producer;
import com.fzb.user.bean.User;
import com.fzb.user.service.UserService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
/**
* @Description 生产者
* @Author jxb
* @Date 2019-03-09 09:43:47
*/
@RestController
@RequestMapping("mqProducer")
public class MQProducer {
@Autowired
public RabbitTemplate rabbitTemplate;
@Autowired
public UserService userService;
/**
* @Description direct 1:1 类型 交换器队列 生产
* @Author jxb
* @Date 2019-03-09 09:56:45
*/
@RequestMapping(value = "/directMQ", method = {RequestMethod.GET})
public List<User> directMQ() {
List<User> users = userService.getUserList(null);
for (User user : users) {
CorrelationData correlationData = new CorrelationData(String.valueOf(user.getId()));
rabbitTemplate.convertAndSend("direct.exchange", "HelloWorld", user, correlationData);
}
return users;
}
}
- 新建消费者 MQConsumer.java类,@Component 注册为组件
package com.fzb.rabbitmq.consumer;
import com.fzb.user.bean.User;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* @Description 消费者
* @Author jxb
* @Date 2019-03-09 09:43:47
*/
@Component
public class MQConsumer {
/**
* @Description direct 1:1 类型 交换器队列 消费
* @Author jxb
* @Date 2019-03-09 09:58:12
*/
@RabbitListener(queues = "direct.queue")
public void getDirectMessage(User user) throws Exception {
// 模拟执行任务
Thread.sleep(1000);
System.out.println("--jxb--MQConsumer--getDirectMessage:" + user.toString());
}
/**
* @Description 配合楼上的队列,消费同一个队列,均匀分配到两个消费者
* @Author jxb
* @Date 2019-03-02 14:53:28
*/
@RabbitListener(queues = "direct.queue")
public void getDirectMessageCopy(User user) throws Exception {
// 模拟执行任务
Thread.sleep(1000);
System.out.println("--jxb--MQConsumer--getDirectMessageCopy:" + user.toString());
}
}
至此一个基础的SpringBoot集成RabbitMQ的direct类型交换器队列创建完成。推荐基于注解试编程,更直观也符合SpringBoot约定大于配置的思想,所以本文会着重介绍基于注解讲解。
3. 基于注解消息队列示例
1. direct类型
- 注释掉 MQConfig.java的 @Component 注解
- 生产者不做修改
- 消费者
package com.fzb.rabbitmq.consumer;
import com.fzb.user.bean.User;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* 基于注解的绑定交换器、队列、路由键设置
* 1. Queue配置:value=队列名称、durable=是否持久化(默认true)、exclusive=排他队列只在当前connection可用(默认false)、autoDelete=如无消息是否自动删除(默认false)
* 2. Exchange配置:value=交换器名称、type=类型(默认direct)、durable=是否持久化(默认true)、autoDelete=如无消息是否自动删除(默认false)
* 3. QueueBinding配置:key=路由键(string数组,支持* # 匹配),*必须匹配一个单词,#匹配0个或N个单词,用.分隔
* 4. RabbitListener配置: bindings=Queue配置+Exchange配置+QueueBinding配置
* 注:如果代码创建交换器等且配置绑定关系,注解只需监听队列即可,如:@RabbitListener(queues = "direct.queue")
*
* @Description 消费者
* @Author jxb
* @Date 2019-03-09 09:43:47
*/
@Component
public class MQConsumer {
/**
* @Description direct 1:1 类型 交换器队列 消费
* @Author jxb
* @Date 2019-03-09 09:58:12
*/
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "direct.queue"), exchange = @Exchange(value = "direct.exchange"), key = "HelloWorld")})
public void getDirectMessage(User user) throws Exception {
// 模拟执行任务
Thread.sleep(1000);
System.out.println("--jxb--MQConsumer--getDirectMessageCopy:" + user.toString());
}
/**
* @Description 配合楼上的队列,消费同一个队列,均匀分配到两个消费者
* @Author jxb
* @Date 2019-03-02 14:53:28
*/
@RabbitListener(queues = "direct.queue")
public void getDirectMessageCopy(User user) throws Exception {
// 模拟执行任务
Thread.sleep(1000);
System.out.println("--jxb--MQConsumer--getDirectMessageCopy:" + user.toString());
}
}
2. fanout类型
- 生产者,1:N模式,所以不需要设置路由键,即使设置也会忽略
/**
* @Description fanout 1:n 类型 交换器队列 生产
* @Author jxb
* @Date 2019-03-09 09:56:45
*/
@RequestMapping(value = "/fanoutMQ", method = {RequestMethod.GET})
public List<User> fanoutMQ() {
List<User> users = userService.getUserList(null);
for (User user : users) {
rabbitTemplate.convertAndSend("fanout.exchange", "", user.getName());
}
return users;
}
- 消费者,定义了三个消费者,可以根据同一条消息做出不同的动作
/**
* @Description fanout 1:n 类型 交换器队列 消费(3个)
* @Author jxb
* @Date 2019-03-09 09:58:12
*/
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "fanout.queue.01"), exchange = @Exchange(value = "fanout.exchange", type = "fanout"))})
public void getFanoutMessage01(String message) throws InterruptedException {
// 模拟执行任务
Thread.sleep(1000);
System.out.println("--jxb--MQConsumer--getFanoutMessage01:" + "短信通知:您好," + message + "!感谢您成为FZB会员");
}
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "fanout.queue.02"), exchange = @Exchange(value = "fanout.exchange", type = "fanout"))})
public void getFanoutMessage02(String message) throws InterruptedException {
// 模拟执行任务
Thread.sleep(1000);
System.out.println("--jxb--MQConsumer--getFanoutMessage02:" + "增加积分:您好," + message + "!您的当前积分为100");
}
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "fanout.queue.03"), exchange = @Exchange(value = "fanout.exchange", type = "fanout"))})
public void getFanoutMessage03(String message) throws InterruptedException {
// 模拟执行任务
Thread.sleep(1000);
System.out.println("--jxb--MQConsumer--getFanoutMessage03:" + "通知好友:您好,您的朋友" + message + "已成为FZB会员,赶快一起互动吧");
}
3. topic类型
- 生产者,三种不同路由键的topic交换器,会根据规则路由到不同的队列
/**
* @Description topic n:1 类型 交换器队列 生产(3个)
* @Author jxb
* @Date 2019-03-09 09:56:45
*/
@RequestMapping(value = "/topicMQ01", method = {RequestMethod.GET})
public List<User> topicMQ01() {
List<User> users = userService.getUserList(null);
for (User user : users) {
rabbitTemplate.convertAndSend("topic.exchange", "jd.reg.msg", user.getName());
}
return users;
}
@RequestMapping(value = "/topicMQ02", method = {RequestMethod.GET})
public List<User> topicMQ02() {
List<User> users = userService.getUserList(null);
for (User user : users) {
rabbitTemplate.convertAndSend("topic.exchange", "tm.reg.msg", user.getName());
}
return users;
}
@RequestMapping(value = "/topicMQ03", method = {RequestMethod.GET})
public List<User> topicMQ03() {
List<User> users = userService.getUserList(null);
for (User user : users) {
rabbitTemplate.convertAndSend("topic.exchange", "super.fzb.reg.msg", user.getName());
}
return users;
}
- 消费者,模糊匹配规则:“.”把路由键分成了几部分,“*”匹配一个词,“#”匹配0个或N个。
/**
* @Description topic n:1 类型 交换器队列 消费(普通会员注册提醒)
* @Author jxb
* @Date 2019-03-02 14:55:16
*/
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "topic.queue.01"), exchange = @Exchange(value = "topic.exchange", type = "topic"), key = {"*.reg.msg"})})
public void getTopicMessage01(String message) throws InterruptedException {
// 模拟执行任务
Thread.sleep(1000);
System.out.println("--jxb--MQConsumer--getTopicMessage01:" + "短信通知:您好," + message + "!感谢您成为FZB会员");
}
/**
* @Description topic n:1 类型 交换器队列 消费(超级会员注册提醒)
* @Author jxb
* @Date 2019-03-02 14:55:16
*/
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "topic.queue.02"), exchange = @Exchange(value = "topic.exchange", type = "topic"), key = {"*.*.reg.msg.#"})})
public void getTopicMessage02(String message) throws InterruptedException {
// 模拟执行任务
Thread.sleep(1000);
System.out.println("--jxb--MQConsumer--getTopicMessage02:" + "短信通知:您好," + message + "!感谢您成为FZB超级会员,祝您玩的开心");
}
direct、fanout、topic介绍完成,运行结果,自行检验。
生活好苦,但你好甜