概述
今天主要是来说一下如何在Springboot中使用redis实现一个 生产者/消费者模式的队列,
首先解释下几个问题。
什么是生产者/消费者模式?
消息队列一般是有两种场景
1、种是发布者订阅者模式
2、种是生产者消费者模式
生产者消费者模式 :生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息;即对于每个消息只能被最多一个消费者拥有。
通俗一点说就是比如找对象相亲,一堆男生坐在一个房间里,那么这么多男生就是一个个的消费者,而女生会一个一个进入到房间,而每一个女生就是生产者(绝对没有歧视的意思),点那个一个女生进入房间之后,那就是谁长得帅,会说话就能和这个女生成对,结果就是这个女生有且只能和一个男生成对。
图片如下
所以各位程序员们赶快去找女/男朋友吧
发布者订阅者模式:发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息;即正常情况下每个消费者收到的消息应该都是一样的。
这个理解起来就很简单了,发布者就是我,订阅者就是所有关注我的人。当我发布新的文章的时候,那么你们就都可以收到我的推送。并且看到的是同一篇文章。这就是发布者订阅者模式。
什么是redis消息队列?
要理解什么是redis消息队列我们就要先理解什么是消息队列。首先我们要理解什么是消息队列。所谓的消息队列就是一个存放消息的容器,消息们在一个队列中遵循这先进先出的原则。通常用于分布式各个服务之间传递消息。或者因为消息队列是有序并且是一个一个出的。所以用于处理部分的并发业务场景。
这里我们就大概介绍一下消息队列的概念,更详细的大家可以看下这篇文章们,我认为讲的很好新手也能看懂,消息队列其实很简单
实现消息队列框架有很多,如 rebbitMQ,ActiveMQ,等等等等。。但是由于redis的特性,所以也可以作为一种轻量型的消息队列。
因为在springboot中redis是有提供订阅者/发布者模式的消息队列,但是没有提供生产者消费者模式的。所以我们就要自己来写了。
正式开始撸代码
关键方法
讲了前面的那么多现在我们就正式开始我们的主题。
SpringBoot 使用 redis实现 生产者/消费者模式 消息队列
要写这个队列我们先要了解redis的两个方法
1、leftPush:在缓存列表的头部放入一个元素。
2、rightPop:取出缓存列表的最后一个元素。
知道这两个方法之后。我们就可以大概知道我们实现的思路了。
1)、生产者将消息使用leftpush放入缓存列表中。
2)、然后使用不停的循环使用rightPop取出缓存列表的最后一条数据。这样就达到了基本的要求。
流程图
首先我花了一下大概的代码结构,由于本人不懂什么UML什么图表,所以尽了最大的力气花了一幅图,但愿大家能看懂
大概解释下流程:
- 首先我们定义N个消费者(redisQueueConsumer)并且放入消费者容器(RedisQueueConsumerContainer);
- 然后程序初始化的时候从容器中获取所有消费者然后为每一个消费者分配一个监听器 (RedisQueueListener) 并且创建一个线程循环进行监听;
-
我们在程序中通过生产者(RedisQueueProducer)发送消息的时候监听器监听到消息把消息分到消费者。消费者在进行消费。
OK,接下来我们来实现代码。LET‘S GO!!!
消息体 RedisQueueMessage
// 这是使用了 lombok插件
@Getter
@Setter
@ToString
public class RedisQueueMessage {
// 消息
private String content;
public RedisQueueMessage() {
}
// 定义一个带参数的构造方法方便初始化
public RedisQueueMessage(String content) {
this.content = content;
}
}
生产者 RedisQueueProducer
@Component
public class RedisQueueProducer {
@Autowired
IRedisService redisService;
/**
* 发送消息
*
* @param queueName 队列名称
* @param redisQueueMessage 消息
*/
public void sendMessage(String queueName, RedisQueueMessage redisQueueMessage) {
redisService.lLSet(queueName, redisQueueMessage);
}
}
消费者接口 RedisQueueConsumer
/**
* redis队列消费者接口
* @DateTime: 2020/2/27 4:45 下午
*/
public interface RedisQueueConsumer {
// 获取队列名称
String getQueueName();
// 获取监听器返回的消息
void getMessage(RedisQueueMessage redisQueueMessage);
// 获取监听器返回的错误消息
void error(String error);
}
存放消费者监听器线程的单例线程池 MyRedisThreadPool
/**
* @Author:buding
* @DateTime: 2020/2/27 3:53 下午
*/
@Slf4j
public class MyRedisThreadPool {
// 线程池创建类
private ThreadPoolExecutor mexecutor;
private static MyRedisThreadPool instance;
// 获取java虚拟机最大可用
private final int availableProcessor = Runtime.getRuntime().availableProcessors();
public MyRedisThreadPool(int coreNum) {
if (mexecutor == null) {
// 用单例模式创建线程池,保留2个核心线程,最多线程为CPU个数的2n+1的两倍.
mexecutor = new ThreadPoolExecutor(coreNum == 0 ? 3 : coreNum, coreNum, 0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
}
}
public static MyRedisThreadPool getInstance(int coreNum) {
if (instance == null) {
instance = new MyRedisThreadPool(coreNum);
log.info("MyRedisThreadPool线程池已经开启");
}
return instance;
}
public void executor(Runnable runnable) {
if (null == runnable) {
return;
}
mexecutor.execute(runnable);
}
public void getInfo() {
if (null == mexecutor) {
return;
}
int queueSize = mexecutor.getQueue().size();
log.info("当前排队线程数:" + queueSize);
int activeCount = mexecutor.getActiveCount();
log.info("当前活动线程数:" + activeCount);
long completedTaskCount = mexecutor.getCompletedTaskCount();
log.info("执行完成线程数:" + completedTaskCount);
long taskCount = mexecutor.getTaskCount();
log.info("总线程数:" + taskCount);
}
public void destroy() {
if (mexecutor != null) {
// 终止线程池
mexecutor.shutdown();
}
}
}
消费者容器 RedisQueueConsumerContainer
```java
/**
* @Author: buding
* redis对列消费者者容器
* @DateTime: 2020/2/27 5:22 下午
*/
@Slf4j
public class RedisQueueConsumerContainer {
//存放消费者的map key=消费者 queueName value=消费者对象
Map<String, RedisQueueConsumer> consumerMap = new HashMap<>();
// 容器是否初始化完毕的标示
static Boolean isRun = false;
MyRedisThreadPool myRedisThreadPool;
public void addConsumer(RedisQueueConsumer consumer) {
if (consumer.getQueueName() == null) {
log.error("【添加redis队列失败】:{}", "队列名称为null");
} else if (null == consumerMap.get(consumer.getQueueName())) {
consumerMap.put(consumer.getQueueName(), consumer);
log.error("【添加redis队列成功】:{}", consumer.getQueueName());
}
}
public void destroy() {
log.info("redis消费者容器销毁");
myRedisThreadPool.destroy();
}
public void init() {
log.info("redis消费者容器初始化开始");
isRun = true;
myRedisThreadPool = MyRedisThreadPool.getInstance(consumerMap.size());
consumerMap.forEach((k, v) -> {
myRedisThreadPool.executor(new RedisQueueListener(v));
});
}
}
消息监听器 RedisQueueListener
/**
* @Author: 于丁
* 消息队列的监听器
* @DateTime: 2020/2/27 4:48 下午
*/
@Slf4j
public class RedisQueueListener implements Runnable {
private final Long WAITTIME = 30L;
private RedisQueueConsumer redisQueueConsumer;
IRedisService redisService;
public RedisQueueListener(RedisQueueConsumer redisQueueConsumer) {
this.redisQueueConsumer = redisQueueConsumer;
}
@Override
public void run() {
redisService = SpringContextHolder.getBean(IRedisService.class);
log.info("redis监听器开始监听:{}", redisQueueConsumer.getQueueName());
while (RedisQueueConsumerContainer.isRun) {
try {
Object object = redisService.lRPop(redisQueueConsumer.getQueueName(), WAITTIME);
if (object != null) {
redisQueueConsumer.getMessage((RedisQueueMessage) object);
}
} catch (Exception e) {
e.printStackTrace();
log.info("redis监听器错误:{}", redisQueueConsumer.getQueueName());
}
}
}
}
测试
实例化一个消费者
/**
* @Author:
* @DateTime: 2020/2/27 5:31 下午
*/
@Slf4j
@Component
public class OrderConsumer implements RedisQueueConsumer {
@Override
public String getQueueName() {
return "orderConsumer";
}
@Override
public void getMessage(RedisQueueMessage redisQueueMessage) {
log.info("接收到了消息:"+redisQueueMessage.toString());
}
@Override
public void error(String error) {
}
}
启动配置
/**
* @Author:
* redis消费者/生产者模式配置
* @DateTime: 2020/1/15 2:59 下午
*/
@Slf4j
@Configuration
public class RedisPCQueueConfig {
// 初始化完毕后调取 init
@Bean(initMethod = "init", destroyMethod = "destroy")
public RedisQueueConsumerContainer redisQueueConsumerContainer() {
log.info("redis队列开始加载");
RedisQueueConsumerContainer redisQueueConsumerContainer = new RedisQueueConsumerContainer();
// 添加消费者 OrderConsumer 到消费者容器
redisQueueConsumerContainer.addConsumer(new OrderConsumer());
log.info("redis队列开始加载成功");
return redisQueueConsumerContainer;
}
}
测试接口
/**
* @Author: 于丁
* @DateTime: 2019/12/23 1:58 下午
*/
@RestController
@Slf4j
@RequestMapping("/test")
public class TestController {
@Autowired
OrderConsumer orderConsumer;
@Autowired
RedisQueueProducer redisQueueProducer;
@IgnoreAuth
@GetMapping("test")
@DouyinApi
public Response test() {
redisQueueProducer.sendMessage(orderConsumer.getQueueName(), new RedisQueueMessage("测试消息"));
return Response.success();
}
}
测试结果
这里我们可以看到正常可以收到消息。
我们的使用 redis实现 生产者/消费者模式 消息队列到这里就结束了,本人也是一个小白,如果有什么不对的欢迎大家指出来。
谢谢🙏