可以实现消息队列的工具有很多,例如:
ZeroMQ、Posix、SquirrelMQ、Redis、QDBM、Tokyo Tyrant、HTTPSQS等(linux平台下)
各自具备各自的特性,不在展开讨论。
其中redis 的订阅的发布,在spring boot是如何实现的,我们来看看代码:
依赖:
1.
<parent><groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.11.RELEASE</version> <relativePath/> <!-- lookup parent from repository --></parent>
2.
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency></dependencies>
配置:
spring: application: name: example profiles: test redis: host: localhost port: 6379 password: database: 0 lettuce: pool: 10 min-idle: 10 max-active: 20 max-wait: 10000
注册监听器:
监听消息:
发送信息:
redisTemplate.convertAndSend(channel,message)
这样下来,感觉似乎很完美,但是,无法保证消息真的/一定被消费了,例如客户端关机,宕机,或重启,都会导致数据丢失。因为,这些信息一旦发出,就没有历史记录。专业点就是,不可持久化 和没有ack确认机制。
那么 redis 5.x版本 的stream数据结构很好的解决了这个问题,不但可以发布订阅,还可以能持久化,最重要的是 有ack机制,极大程度上保证了数据的一致性。
好了,尝试搞起来!
进入官网:spring.io
选择spring data
打开 spring data redis 的文档
最新版本 2020-7-20发布的 2.3.2版本。其对应的srping cloud Hoxton SR6,spring boot 2.3.2.RELEASE
依赖和配置和之前一样,
注意的是:
意思就是说,暂时只支持 lettuce 连接池,
下面看看如何实现。
先实现监听器接口:StreamListener
只有一个方法,onMessage(V var) ===>你可以 lombok实现也行。
我们下面做个测试接收下消息。
然后 注册 监听器:
发送消息:
上面的意思是有两种方式,一个是传入字节数组,另一个则是 Map, 也可以说是任何对象。
翻开 StringRecord 源码:
看官方提供的第二个构造 接受的是 string -> json
再看 StreamRecords.string(data),其参数为 Map<String,String>
最后测试下:
简单的从web端push一条消息。
看看打印结果:
1. 这个明显可能是 redis版本问题,更新redis 版本 5x以上
2. 可能是 StreamRecords.string(data)参数问题,换成:Collections.singletonMap( k,v) 的SingletonMap 问题就解决了:
究竟是什么原因呢?
细看
Record ,StreamRecords 自行研究这两个的源码吧····
另外 :
那好,我们来尝试,最后一个策略,并且在消费完消息后 加上ack确认机制:
结果:
分组 group 可以在客户端创建 ,也可以程序启动时初始化:
key 就是 上面的stream , group 自定义取名字:我这里 是mygroup
至于ReadOffset 参数,自行研究源码即可,不难!读取数据的策略而已。
最后别忘记了在注册监听器加上 group;