SpringCloud Stream整合RocketMQ

前言

1.rocketmq 安装可参考:https://www.jianshu.com/p/f3713adfa3dd
2.启动好nameserv 和 broker
3.官方RocketMQ+springcloud stream 例子 https://github.com/alibaba/spring-cloud-alibaba/blob/2021.x/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md

  1. 本文将说明普通消息发送/消费、广播消息发送/消费、延时消息发送消费三种模式

项目环境/依赖:

    <properties>
        <spring-boot-version>2.3.12.RELEASE</spring-boot-version>
        <spring-cloud-version>Hoxton.SR12</spring-cloud-version>
        <spring-cloud-alibaba-version>2.2.7.RELEASE</spring-cloud-alibaba-version>
        <rocketmq.version>2021.1</rocketmq.version>
    </properties>
    !-- Environment START-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-dependencies</artifactId>
        <version>${spring-boot-version}</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-alibaba-dependencies</artifactId>
        <version>${spring-cloud-alibaba-version}</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>${spring-cloud-version}</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-stream-rocketmq -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        <version>${rocketmq.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-acl</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.4</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-acl -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-acl</artifactId>
        <version>4.9.4</version>
    </dependency>

依赖说明:spring-cloud-starter-stream-rocketmq 排除了rocketmq-client、rocketmq-acl依赖是因为我想换成新一点的依赖,不排除也是可以的。

1.普通消息发送

新建模块A用于消息发送
创建一个controller用户测试消息发送

@RestController
public class RocketMqSendMsgController {

    @Autowired
    private StreamBridge streamBridge;

    @PostMapping(value = "/cluster")
    public void sendClusterMsg(@RequestParam("message") String message) {
        Message<BaseMessage<String>> msg = new GenericMessage<>(new BaseMessage<>(CLUSTER_MESSAGE_OUTPUT,"",message));
        boolean result = streamBridge.send(CLUSTER_MESSAGE_OUTPUT, msg);
        System.out.println(Thread.currentThread().getName() + " 消息集群发送: " + msg.getPayload().getData());
    }
}

yml配置

server:
  port: 10004
spring:
  application:
    name: search-server
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876
      bindings:
        cluster-out-0:
          destination: cluster

配置说明
1.配置name-server服务地址,必须要配置
2.cluster-out-0 :channel 通道名称 默认的一个规则吧 发送消息就是 -out- 这样子

  1. destination: cluster :topic为cluster

附上代码中用到的常量类

package com.ly.tuliy.commons.base.mq;

/**
 * 类说明: mq 常量类
 *
 * @author wqf
 * @date 2022/9/7 9:30
 */
public class MessageConstant {

    //生产者-集群消息主题
    public static String CLUSTER_MESSAGE_OUTPUT="cluster-out-0";
    //生产者-广播消息主题
    public static String BROADCAST_MESSAGE_OUTPUT="broadcast-out-0";
    //生产者-延时消息主题
    public static String DELAYED_MESSAGE_OUTPUT="delayed-out-0";


    //消费者-集群消息主题
    public static String CLUSTER_MESSAGE_INPUT="cluster-in-0";
    //消费者-广播消息主题
    public static String BROADCAST_MESSAGE_INPUT="broadcast-in-0";
    //消费者-延时消息主题
    public static String DELAYED_MESSAGE_INPUT="delayed-in-0";

}

import java.io.Serializable;
import java.util.Map;

/**
 * @Author: wqf
 * @Date: 2022/09/09
 * @Description: mq 发送消息的内容体基础内容
 */
@ToString
public class BaseMessage<T> implements Serializable {
    /**
     * 消息主题
     */
    private String topic;
    /**
     * 消息标签
     */
    private String tag;
    /**
     * 消息内容
     */
    private T data;
    /**
     *
     */
    private Map<String, Object> header;


    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }

    public Map<String, Object> getHeader() {
        return header;
    }

    public void setHeader(Map<String, Object> header) {
        this.header = header;
    }

    public BaseMessage(String topic, String tag, T data, Map<String, Object> header) {
        this.topic = topic;
        this.tag = tag;
        this.data = data;
        this.header = header;
    }

    public BaseMessage(String topic, String tag, T data) {
        this.topic = topic;
        this.tag = tag;
        this.data = data;
    }

    public BaseMessage(String topic,  T data) {
        this.topic = topic;
        this.data = data;
    }

    public BaseMessage() {
    }
}

新建模块B用于消息消费
创建一个类接收消息

/**
 * @Author: wqf
 * @Date: 2022/09/09
 * @Description:
 */
@RestController
public class RocketMqReceiveMsgController {

    @Autowired
    private StreamBridge streamBridge;

    /**
     * 函数式编辑接收消息
     */
    @Bean
    public Consumer<String> cluster() {
        return message -> {
            System.out.println("接收的集群消息为:" + message);
        };
    }

yml配置

server:
  port: 10005 #${random.int[10000,19999]} # 随机端口,方便启动多个消费者
spring:
  application:
    name: seckill-server
  cloud:
    stream:
      function:
        #消费者端配置
        definition: cluster
      rocketmq:
        binder:
          name-server: localhost:9876
      bindings:
        cluster-in-0:
          destination: cluster
          group: cluster-group

配置说明:
1.definition: cluster 消费者端配置,这里配置的cluster 必须和我们接收消息类中的方法名称一致


image.png

2.cluster-in-0:也是默认的规则 -in- 标识接收消息
3.group:消费组名称配置 ,这个一定要配,名称命名没有要求

测试:
用postman在生产者端(A)发送消息,消费端(B)能正常接收到消息。将消费端B多启动几个端口,创建多消费者环境,此时我们发送消息可以观测到消息将随即被几个消费者消费,一个消息只会被消费一次

出现的问题: 消息接收不到或者是报错,请先检查下主题是否创建(rocketmq 控制台看看),或者启动broker时修改配置为自动创建主题。

2.广播消息发送

生产者(A)controller添加测试接口

    @PostMapping(value = "/broadcast")
    public void sendBroadcastMsg(@RequestParam("message") String message) {
        Message<BaseMessage<String>> msg = new GenericMessage<>(new BaseMessage<>(BROADCAST_MESSAGE_OUTPUT,"",message));
        boolean result = streamBridge.send(BROADCAST_MESSAGE_OUTPUT, msg);
        System.out.println(Thread.currentThread().getName() + " 消息广播发送: " + msg.getPayload().getData());
    }

消费者端(B)添加以下配置

    /**
     * 函数式编辑接收消息
     */
    @Bean
    public Consumer<String> broadcast() {
        return message -> {
            System.out.println("接收的广播消息为:" + message);
        };
    }
server:
  port: 10005 #${random.int[10000,19999]} # 随机端口,方便启动多个消费者
spring:
  application:
    name: seckill-server
  cloud:
    stream:
      function:
        #消费者端配置
        definition: cluster;broadcast
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          broadcast-in-0:
            consumer:
              #配置是否开启广播消息 默认为false
              broadcasting: true
      bindings:
        cluster-in-0:
          destination: cluster
          group: cluster-group
        broadcast-in-0:
          destination: broadcast
          group: broadcast-group

配置说明:
1.consumer.broadcasting: true 该配置默认是false,true表示开启广播消费

测试:
启动多个消费者,发送消息时,每个消费者都能接收到每条生产者的消息

3.延时消息发送

生产者(A)controller添加测试接口

    @PostMapping(value = "/delayed")
    public void sendDelayedMsg(@RequestParam("message") String message) {
        String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

        for (int i = 0; i < 100; i++) {
            String key = "KEY" + i;
            Map<String, Object> headers = new HashMap<>();
            headers.put(MessageConst.PROPERTY_KEYS, key);
            headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
            // 设置延时等级1~10
            headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 4);
            BaseMessage<String> baseMessage = new BaseMessage<>(MessageConstant.DELAYED_MESSAGE_OUTPUT, message);
            baseMessage.setHeader(headers);
            Message<BaseMessage<String>> msg = new GenericMessage<>(baseMessage, headers);
            streamBridge.send(MessageConstant.DELAYED_MESSAGE_OUTPUT, msg);
            System.out.println(Thread.currentThread().getName() + " 延时消息: " + msg.getPayload().getData());
        }
    }

参数说明:
messageDelayLevel :延时有18个等级(我试了前4个等级),每个等级延时时间如代码

yml添加配置

server:
  port: 10004
spring:
  application:
    name: search-server
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          delayed-out-0:
            producer:
              group: delayed-group
              sync: true
      bindings:
        cluster-out-0:
          destination: cluster
        broadcast-out-0:
          destination: broadcast
        delayed-out-0:
          destination: delayed

配置说明:
bindings.delayed-out-0.producer.sync=true 该项配置只在生产端配置,表示消息发送通道delayed-out-0开启消息异步发送,一定要有,不然延时消息没效果

消费者端(B)添加以下配置

    /**
     * 函数式编辑接收消息
     */
    @Bean
    public Consumer<String> delayed() {
        return message -> {
            System.out.println("接收的延时消息为:" + message);
        };
    }
server:
  port: 10005 #${random.int[10000,19999]} # 随机端口,方便启动多个消费者
spring:
  application:
    name: seckill-server
  cloud:
    stream:
      function:
        #消费者端配置
        definition: cluster;broadcast;delayed
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          broadcast-in-0:
            consumer:
              #配置是否开启广播消息 默认为false
              broadcasting: true
      bindings:
        cluster-in-0:
          destination: cluster
          group: cluster-group
        broadcast-in-0:
          destination: broadcast
          group: broadcast-group
        delayed-in-0:
          destination: delayed
          group: delayed-group
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,841评论 5 472
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,415评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,904评论 0 333
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,051评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,055评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,255评论 1 278
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,729评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,377评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,517评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,420评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,467评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,144评论 3 317
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,735评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,812评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,029评论 1 256
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,528评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,126评论 2 341

推荐阅读更多精彩内容