SpringBoot 使用 redis实现 生产者/消费者模式 消息队列

概述

今天主要是来说一下如何在Springboot中使用redis实现一个 生产者/消费者模式的队列,
首先解释下几个问题。

什么是生产者/消费者模式?

消息队列一般是有两种场景

1、种是发布者订阅者模式
2、种是生产者消费者模式

生产者消费者模式 :生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息;即对于每个消息只能被最多一个消费者拥有。

通俗一点说就是比如找对象相亲,一堆男生坐在一个房间里,那么这么多男生就是一个个的消费者,而女生会一个一个进入到房间,而每一个女生就是生产者(绝对没有歧视的意思),点那个一个女生进入房间之后,那就是谁长得帅,会说话就能和这个女生成对,结果就是这个女生有且只能和一个男生成对。

图片如下


生产者消费者图片

所以各位程序员们赶快去找女/男朋友吧

发布者订阅者模式:发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息;即正常情况下每个消费者收到的消息应该都是一样的。

这个理解起来就很简单了,发布者就是我,订阅者就是所有关注我的人。当我发布新的文章的时候,那么你们就都可以收到我的推送。并且看到的是同一篇文章。这就是发布者订阅者模式。

发布者订阅者模式图片
什么是redis消息队列?

要理解什么是redis消息队列我们就要先理解什么是消息队列。首先我们要理解什么是消息队列。所谓的消息队列就是一个存放消息的容器,消息们在一个队列中遵循这先进先出的原则。通常用于分布式各个服务之间传递消息。或者因为消息队列是有序并且是一个一个出的。所以用于处理部分的并发业务场景。

这里我们就大概介绍一下消息队列的概念,更详细的大家可以看下这篇文章们,我认为讲的很好新手也能看懂,消息队列其实很简单

实现消息队列框架有很多,如 rebbitMQ,ActiveMQ,等等等等。。但是由于redis的特性,所以也可以作为一种轻量型的消息队列。

因为在springboot中redis是有提供订阅者/发布者模式的消息队列,但是没有提供生产者消费者模式的。所以我们就要自己来写了。

正式开始撸代码

关键方法

讲了前面的那么多现在我们就正式开始我们的主题。

SpringBoot 使用 redis实现 生产者/消费者模式 消息队列

要写这个队列我们先要了解redis的两个方法

1、leftPush:在缓存列表的头部放入一个元素。

2、rightPop:取出缓存列表的最后一个元素。

知道这两个方法之后。我们就可以大概知道我们实现的思路了。

1)、生产者将消息使用leftpush放入缓存列表中。

2)、然后使用不停的循环使用rightPop取出缓存列表的最后一条数据。这样就达到了基本的要求。

在这里插入图片描述

流程图

首先我花了一下大概的代码结构,由于本人不懂什么UML什么图表,所以尽了最大的力气花了一幅图,但愿大家能看懂

大概解释下流程:

  1. 首先我们定义N个消费者(redisQueueConsumer)并且放入消费者容器(RedisQueueConsumerContainer);
  2. 然后程序初始化的时候从容器中获取所有消费者然后为每一个消费者分配一个监听器 (RedisQueueListener) 并且创建一个线程循环进行监听;
  3. 我们在程序中通过生产者(RedisQueueProducer)发送消息的时候监听器监听到消息把消息分到消费者。消费者在进行消费。


    在这里插入图片描述

OK,接下来我们来实现代码。LET‘S GO!!!

消息体 RedisQueueMessage

// 这是使用了 lombok插件
@Getter
@Setter
@ToString
public class RedisQueueMessage {

    // 消息
    private String content;

    public RedisQueueMessage() {
    }
    // 定义一个带参数的构造方法方便初始化
    public RedisQueueMessage(String content) {
        this.content = content;
    }
}
生产者 RedisQueueProducer
@Component
public class RedisQueueProducer {

    @Autowired
    IRedisService redisService;

    /**
     * 发送消息
     *
     * @param queueName    队列名称
     * @param redisQueueMessage 消息
     */
    public void sendMessage(String queueName, RedisQueueMessage redisQueueMessage) {
        redisService.lLSet(queueName, redisQueueMessage);
    }
}
消费者接口 RedisQueueConsumer
/**
 * redis队列消费者接口
 * @DateTime: 2020/2/27 4:45 下午
 */
public interface RedisQueueConsumer {

     // 获取队列名称
    String getQueueName();
     // 获取监听器返回的消息
    void getMessage(RedisQueueMessage redisQueueMessage);
     // 获取监听器返回的错误消息
    void error(String error);
}

存放消费者监听器线程的单例线程池 MyRedisThreadPool

/**
 * @Author:buding
 * @DateTime: 2020/2/27 3:53 下午
 */
@Slf4j
public class MyRedisThreadPool {
    // 线程池创建类
    private ThreadPoolExecutor mexecutor;

    private static MyRedisThreadPool instance;

    // 获取java虚拟机最大可用
    private final int availableProcessor = Runtime.getRuntime().availableProcessors();


    public MyRedisThreadPool(int coreNum) {
        if (mexecutor == null) {
            // 用单例模式创建线程池,保留2个核心线程,最多线程为CPU个数的2n+1的两倍.
            mexecutor = new ThreadPoolExecutor(coreNum == 0 ? 3 : coreNum, coreNum, 0L,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingDeque<Runnable>(),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());

        }

    }


    public static MyRedisThreadPool getInstance(int coreNum) {
        if (instance == null) {
            instance = new MyRedisThreadPool(coreNum);
            log.info("MyRedisThreadPool线程池已经开启");
        }
        return instance;
    }

    public void executor(Runnable runnable) {
        if (null == runnable) {
            return;
        }
        mexecutor.execute(runnable);
    }

    public void getInfo() {
        if (null == mexecutor) {
            return;
        }
        int queueSize = mexecutor.getQueue().size();
        log.info("当前排队线程数:" + queueSize);

        int activeCount = mexecutor.getActiveCount();
        log.info("当前活动线程数:" + activeCount);

        long completedTaskCount = mexecutor.getCompletedTaskCount();
        log.info("执行完成线程数:" + completedTaskCount);

        long taskCount = mexecutor.getTaskCount();
        log.info("总线程数:" + taskCount);
    }

    public void destroy() {
        if (mexecutor != null) {
            // 终止线程池
            mexecutor.shutdown();
        }
    }
}
消费者容器 RedisQueueConsumerContainer

```java
/**
 * @Author: buding
 * redis对列消费者者容器
 * @DateTime: 2020/2/27 5:22 下午
 */
@Slf4j
public class RedisQueueConsumerContainer {

   //存放消费者的map  key=消费者 queueName value=消费者对象 
    Map<String, RedisQueueConsumer> consumerMap = new HashMap<>();

    // 容器是否初始化完毕的标示
    static Boolean isRun = false;
    MyRedisThreadPool myRedisThreadPool;

    public void addConsumer(RedisQueueConsumer consumer) {
        if (consumer.getQueueName() == null) {
            log.error("【添加redis队列失败】:{}", "队列名称为null");
        } else if (null == consumerMap.get(consumer.getQueueName())) {
            consumerMap.put(consumer.getQueueName(), consumer);
            log.error("【添加redis队列成功】:{}", consumer.getQueueName());
        }
    }

    public void destroy() {
        log.info("redis消费者容器销毁");
        myRedisThreadPool.destroy();
    }

    public void init() {
        log.info("redis消费者容器初始化开始");
        isRun = true;
        myRedisThreadPool = MyRedisThreadPool.getInstance(consumerMap.size());
        consumerMap.forEach((k, v) -> {
            myRedisThreadPool.executor(new RedisQueueListener(v));
        });
    }
}
消息监听器 RedisQueueListener

/**
 * @Author: 于丁
 * 消息队列的监听器
 * @DateTime: 2020/2/27 4:48 下午
 */
@Slf4j
public class RedisQueueListener implements Runnable {

    private final Long WAITTIME = 30L;
    private RedisQueueConsumer redisQueueConsumer;

    IRedisService redisService;

    public RedisQueueListener(RedisQueueConsumer redisQueueConsumer) {
        this.redisQueueConsumer = redisQueueConsumer;
    }

    @Override
    public void run() {

        redisService = SpringContextHolder.getBean(IRedisService.class);
        log.info("redis监听器开始监听:{}", redisQueueConsumer.getQueueName());
        while (RedisQueueConsumerContainer.isRun) {
            try {
                Object object = redisService.lRPop(redisQueueConsumer.getQueueName(), WAITTIME);
                if (object != null) {
                    redisQueueConsumer.getMessage((RedisQueueMessage) object);
                }
            } catch (Exception e) {
                e.printStackTrace();
                log.info("redis监听器错误:{}", redisQueueConsumer.getQueueName());
            }
        }

    }
}

测试

实例化一个消费者

/**
 * @Author: 
 * @DateTime: 2020/2/27 5:31 下午
 */
@Slf4j
@Component
public class OrderConsumer implements RedisQueueConsumer {


    @Override
    public String getQueueName() {
        return "orderConsumer";
    }

    @Override
    public void getMessage(RedisQueueMessage redisQueueMessage) {
      log.info("接收到了消息:"+redisQueueMessage.toString());
    }

    @Override
    public void error(String error) {

    }
}
启动配置

/**
 * @Author: 
 * redis消费者/生产者模式配置
 * @DateTime: 2020/1/15 2:59 下午
 */
@Slf4j
@Configuration
public class RedisPCQueueConfig {

    // 初始化完毕后调取 init
    @Bean(initMethod = "init", destroyMethod = "destroy")
    public RedisQueueConsumerContainer redisQueueConsumerContainer() {
        log.info("redis队列开始加载");
        RedisQueueConsumerContainer redisQueueConsumerContainer = new RedisQueueConsumerContainer();
        // 添加消费者 OrderConsumer 到消费者容器
        redisQueueConsumerContainer.addConsumer(new OrderConsumer());
        log.info("redis队列开始加载成功");
        return redisQueueConsumerContainer;
    }
}

测试接口

/**
 * @Author: 于丁
 * @DateTime: 2019/12/23 1:58 下午
 */
@RestController
@Slf4j
@RequestMapping("/test")
public class TestController {

    @Autowired
    OrderConsumer orderConsumer;
    @Autowired
    RedisQueueProducer redisQueueProducer;

    @IgnoreAuth
    @GetMapping("test")
    @DouyinApi
    public Response test() {
        redisQueueProducer.sendMessage(orderConsumer.getQueueName(), new RedisQueueMessage("测试消息"));
        return Response.success();
    }
}
测试结果
在这里插入图片描述

这里我们可以看到正常可以收到消息。

我们的使用 redis实现 生产者/消费者模式 消息队列到这里就结束了,本人也是一个小白,如果有什么不对的欢迎大家指出来。
谢谢🙏

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342