redis实现消息队列的普通订阅及stream订阅实现

环境说明

项目中使用的是springboot2.4.3版本
redis服务安装的是6.x版本
5.x版本也可以,因为redis是在5.x版本开始支持stream数据格式的。老的版本肯定是不行的

各类之间的关系图说明
普通方式的类关系说明
redis发布:订阅消息主要类说明.png
Stream方式的类关系说明
redis的stream发布:订阅消息主要类说明的.png
配置类代码

注意 配置类中有用到线程池,在这里就不展示线程池的配置代码了,可以自行百度配置一个线程池


import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;

import javax.annotation.Resource;
import java.time.Duration;
import java.util.concurrent.Executor;

/**
 * redis实现消息队列相关配置
 *
 * @author LengYouNuan
 * @create 2021-08-19 上午11:28
 */
@Configuration
@EnableCaching
@Slf4j
public class RedisMessageConfig {

    @Value("${redis-message.topic.aliMsg}")
    private String aliMsgTopic;

    /**
     * 注入TaskExecutorConfig 中配置的线程池
     */
    @Resource
    private Executor taskExecutor;

    /**
     * redis消息监听器容器
     * 可以注册多个消息监听器,每个监听器可以检测多个主题
     *
     * @param redisConnectionFactory redis连接工厂
     * @param listenerAdapter        消息监听器
     * @return
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,
                                                                       MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        // 订阅多个频道
        redisMessageListenerContainer.addMessageListener(listenerAdapter, new PatternTopic(aliMsgTopic));
        //redisMessageListenerContainer.addMessageListener(listenerAdapter, new PatternTopic("xxxxxxx"));

        //序列化对象 发布的时候需要设置序列化;订阅方也需要设置同样的序列化
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        redisMessageListenerContainer.setTopicSerializer(seria);
        return redisMessageListenerContainer;
    }


    //表示监听一个频道
    @Bean
    public MessageListenerAdapter listenerAdapter(MessageConsumer messageConsumer) {
        //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“MessageConsumer ”
        return new MessageListenerAdapter(messageConsumer, "getMessage");
    }

    /**
     * stream监听容器配置
     *
     * @param redisConnectionFactory redis连接工厂
     * @param streamListener         消息监听器,消息的实际消费者
     * @return
     */
    @Bean
    public StreamMessageListenerContainer streamMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, RedisStreamMessageListener streamListener) {
        //构建StreamMessageListenerContainerOptions
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                //超时时间
                .pollTimeout(Duration.ofSeconds(2))
                .batchSize(10)
                .targetType(String.class)
                //执行时用的线程池
                .executor(taskExecutor)
                //还可以设置序列化方式,这里不做设置,使用默认的方式
                .build();

        //创建StreamMessageListenerContainer
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer
                .create(redisConnectionFactory, options);

        //指定消费最新的消息
        StreamOffset<String> offset = StreamOffset.create(aliMsgTopic, ReadOffset.lastConsumed());

        //创建消费者 指定消费者组和消费者名字(注意,这里使用到用户组时,发送消息时必须有用户组,不然会报错,消息消费不成功)
        Consumer consumer = Consumer.from("group-1", "consumer-1");

        StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest = StreamMessageListenerContainer.StreamReadRequest.builder(offset)
                .errorHandler((error) -> log.error(error.getMessage()))
                .cancelOnError(e -> false)
                .consumer(consumer)
                //关闭自动ack确认
                .autoAcknowledge(false)
                .build();
        //指定消费者对象
        container.register(streamReadRequest, streamListener);
        container.start();
        return container;
    }
}

普通形式的监听类(也就是消息的消费者)
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.stereotype.Component;

/**
 * redis消息订阅者
 *
 * @author LengYouNuan
 * @create 2021-08-19 上午10:41
 */
@Component
public class MessageConsumer {
    public void getMessage(String object) {
        //使用和发布时一样的序列化方式,此处设置的消息格式是String类型,实际中可以根据业务需要设置消息格式
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(String.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        String mes = (String) seria.deserialize(object.getBytes());
        //TODO:拿到消息之后执行业务操作
        //System.out.println("消费:客户端消费了消息==》:" + mes);
    }
}
stream方式的监听类(消息的实际消费者)
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

/**
 * redis的stream消息监听
 *
 * @author LengYouNuan
 * @create 2021-08-19 下午3:10
 */
@Component
@Slf4j
public class RedisStreamMessageListener implements StreamListener<String, ObjectRecord<String, String>> {

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @Override
    public void onMessage(ObjectRecord<String, String> message) {

        // 消息ID
        RecordId messageId = message.getId();

        // 消息的key和value
        String stream = message.getStream();
        String string = message.getValue();
        log.info("========stream监听器监听到消息========它们分别是。messageId={}, stream={}, body={}", messageId,
                stream, string);


        // 通过RedisTemplate手动确认消息,确认之后消息会从队列中消失,如果不确认,可能存在重复消费
        Long acknowledge = this.stringRedisTemplate.opsForStream().acknowledge("group-1", message);
        if (acknowledge > 0) {
            System.out.println("acknowledge的值是:" + acknowledge);
        }

    }
}
最后测试类
@Resource
    private RedisTemplate redisTemplate;

    @Value("${redis-message.topic.aliMsg}")
    private String aliMsgTopic;

    @Test
    public void test1() {
        for (int i = 0; i < 50; i++) {
            redisTemplate.convertAndSend(aliMsgTopic, "主题" + aliMsgTopic + "====》发送消息" + i);
        }
    }

    /**
     * 无用户组和ack确认
     */
    @Test
    public void testStream() {
        Map<String, String> msg = new HashMap<>();
        msg.put("aaa", "消息aaaaaaa===》");
        MapRecord mapRecord = MapRecord.create(aliMsgTopic, msg);
        StringRecord stringRecord = StringRecord.of(mapRecord).withStreamKey(aliMsgTopic);
        redisTemplate.opsForStream().add(stringRecord);
    }

    /**
     * 有用户组,无ack确认
     */
    @Test
    public void testConsumerStream() {
        Map<String, String> msg = new HashMap<>();
        msg.put("aaa", "消息aaaaaaa===》");
        MapRecord mapRecord = MapRecord.create(aliMsgTopic, msg);
        StringRecord stringRecord = StringRecord.of(mapRecord).withStreamKey(aliMsgTopic);
        StreamOperations streamOperations = redisTemplate.opsForStream();



        Map<String, String> msg1 = new HashMap<>();
        msg1.put("aaa1", "消息aaaaaaa11111===》");
        MapRecord mapRecord1 = MapRecord.create(aliMsgTopic, msg1);
        StringRecord stringRecord1 = StringRecord.of(mapRecord1).withStreamKey(aliMsgTopic);


        Map<String, String> msg2 = new HashMap<>();
        msg2.put("aaa2", "消息aaaaaaa222222222===》");
        MapRecord mapRecord2 = MapRecord.create(aliMsgTopic, msg2);
        StringRecord stringRecord2 = StringRecord.of(mapRecord2).withStreamKey(aliMsgTopic);


        //注意,当用户组已经存在时,执行创建同名用户组会报错,测试时调用了销毁用户组方法,但是没有效果,业务中用到用户组的可能性不大,如果真的用到了,建议在程序初始化时创建用户组
//        String group = streamOperations.createGroup(aliMsgTopic, "group-1");
//        org.springframework.data.redis.connection.stream.Consumer consumer=
//                org.springframework.data.redis.connection.stream.Consumer.from(group, "consumer-1");
        streamOperations.add(stringRecord1);
        streamOperations.add(stringRecord);
        streamOperations.add(stringRecord2);
    }

    /**
     * 有用户组,有ack确认,ack确认实际在消费者端做,ack确认必须有用户组存在
     */
    @Test
    public void testACKStream() {
        Map<String, String> msg = new HashMap<>();
        msg.put("aaa", "消息aaaaa333333a===》");
        MapRecord mapRecord = MapRecord.create(aliMsgTopic, msg);
        StringRecord stringRecord = StringRecord.of(mapRecord).withStreamKey(aliMsgTopic);
        StreamOperations streamOperations = redisTemplate.opsForStream();
        //注意,当用户组已经存在时,执行创建同名用户组会报错,测试时调用了销毁用户组方法,但是没有效果,业务中用到用户组的可能性不大,如果真的用到了,建议在程序初始化时创建用户组
//        String group = streamOperations.createGroup(aliMsgTopic, "group-1");
//        org.springframework.data.redis.connection.stream.Consumer consumer=
//                org.springframework.data.redis.connection.stream.Consumer.from(group, "consumer-1");
        streamOperations.add(stringRecord);
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,529评论 5 475
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,015评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,409评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,385评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,387评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,466评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,880评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,528评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,727评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,528评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,602评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,302评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,873评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,890评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,132评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,777评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,310评论 2 342

推荐阅读更多精彩内容