在springboot项目里使用redis做队列的,教程网上很多不赘述了,使用过程中会遇到短时间不操作(五分钟左右),redis连接就断开了,再使用redis时会报连接超时,此次操作会失败,然后几秒内会重连,重新连上之后又可以正常往redis写东西了,但是注册的监听器失效了,也就是说往队列写的消息无法消费了
监听器的配置:
@Configuration
public class SubscriberConfig {
/**
* 创建连接工厂
*
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("actionQueue"));
return container;
}
@Autowired
Receiver receiver;
/**
* 绑定消息监听者和接收监听的方法
*
* @return
*/
@Bean
public MessageListenerAdapter listenerAdapter() {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
在网上找redis的重连或者心跳机制没有太多信息
通过spring schedual自己写个定时器,10秒执行一次,手动心跳,解决了问题
@Scheduled(cron = "0/10 * * * * *")
public void timer() {
redisTemplate.opsForValue().get("heartbeat");
}
kafkastream -》redis推数据,-》spring boot中订阅数据后=》websocket推到前端
@Configuration
@EnableCaching
public class RedisConfig {
public static StringRedisTemplate stringRedisTemplate;
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
//相当于xml中的bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//订阅了一个叫chat 的通道
container.addMessageListener(listenerAdapter, new PatternTopic("vin_*"));
//这个container 可以添加多个 messageListener
return container;
}
/**
* 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
* @param receiver
* @return
*/
@Bean
MessageListenerAdapter listenerAdapter(MessageReceiver receiver) {
//这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
//也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
return new MessageListenerAdapter(receiver, "receiveMessage");
}
/**redis 读取内容的template */
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
stringRedisTemplate=new StringRedisTemplate(connectionFactory);
return stringRedisTemplate;
}
/*定时心跳*/
@Scheduled(cron = "0/10 * * * * *")
public void timer() {
stringRedisTemplate.opsForValue().get("heartbeat");
}
}
@Component
public class MessageReceiver {
static final Logger log = LoggerFactory.getLogger(MessageReceiver.class);
/**接收消息的方法*/
public void receiveMessage(String message){
transformMsg(message);
}
public void transformMsg(String record){
try{
JSONObject msg = JSON.parseObject(record);
String vin = msg.getString("vid");
if (WebSocketServerSubLogin.getVinSet().contains(vin)) {
WebSocketServerSubLogin.sendInfo(record, vin);
}
}catch (Exception e){
log.error("",e.getMessage());
}
}
}