SpringBoot 2.0与RocketMQ客户端集成原理解读与示例(二)

原创:阿里巴巴 辽天RocketMQ官微12月10日


本文主要介绍rocketmq-spring-boot支持的高级用例,包括发送顺序消息,异步发送,消费过滤以及事务消息发送。该项目git地址:https://github.com/apache/rocketmq-spring

  文章主要内容包括以下几个方面:

  1 前言

  2 API和注解列表

  3 消息发送端

  4 消息消费端

  5 发送事务消息

  5.1 定义回查实现类

  5.2 RocketMQTemplate发送事务消息

  1 前言

  首先在这里向大家报告一个好消息,rocketmq-spring-boot项目经过6个多月的孵化(孵化项目repo: https://github.com/apache/rocketmq-externals),在今年12月初正式毕业。今后的维护和增强将在新的release仓库中进行,具体的地址是:https://github.com/apache/rocketmq-spring

  我们把原来单一的project模块按照Spring Boot的规范划分成了四个子模块:

  rocketmq-spring-boot-parent (父pom文件,定义相关的依赖管理和Plugin,供其它几个模块引用)

  rocketmq-spring-boot (定义auto-configuration实现,具体RocketMQ相关的自动配置和Bean创建代码都集中在这里)

  rocketmq-spring-starter (将rocketmq-spring-boot和其它的依赖打包生成全量的依赖,用户引用它即可完成所有rocketmq-spring的客户端操作)

  rocketmq-spring-samples (使用示例,展示如何使用spring-boot方式发送和消费消息)

  相对于孵化器版本,本次代码进行了较大的调整。目前已经支持spring-boot 2.0,推荐使用孵化器版本的用户尽快切换到新release的版本。请参考samples来体验spring-boot方式发送和消费消息的编码和使用方式:

  https://github.com/apache/rocketmq-spring/tree/master/rocketmq-spring-boot-samples。

  下面具体的介绍一下rocketmq-spring-boot的一些使用细节。

  2 API和注解列表

  编写代码时需要按消息发送者(Producer)和消息消费者(Consumer)分别进行代码编写,会使用到如下的API或注解:

  API

  类型

  发送端/消费端

  说明

  RocketMQTemplate

  API

  发送端

  发送端负责发送消息的API,它与@Resource注解一同使用,进行对象的声明和实例化。

  RocketMQTransactionListener

  注解

  发送端

  修饰事务回查Listener

  TransactionListener

  API

  消费端

  事务回查Listener接口,实现本地事务执行方法和本地事务回查方法

  RocketMQMessageListener

  注解

  消费端

  修饰消息消费Listener

  RocketMQListener<T>

  API

  消费端

  消息消费Listener接口,实现消费消息的处理逻辑的方法

  RocketMQPushConsumerLife

  cycleListener

  API

  消费端

  该接口用来实现Push方式消费时,消费者的信息配置,如消费时间和消费位点方式

  注:关于上述API或注解的使用方式,我们提供了如何使用Spring-Boot发送和消费RocketMQ消息的例子,可以直接参考sample的源码(https://github.com/apache/rocketmq-externals/tree/master/samples/rocketmq-spring-boot-starter-sample)。下面的文档是对示例的简单说明。

  3 消息发送端

  在使用RocketMQTemplate编写客户端时,需要执行如下的步骤:

  1. 定义Spring-Boot的 application.properties (注:如果全部使用默认的配置,可以不定义这个文件)

  ## application.properties

  spring.rocketmq.nameServer=127.0.0.1:9876

  #你可以根据自己的name-server信息进行修改

  spring.rocketmq.producer.group=my-group

  ## 其他的配置信息spring.rocketmq.producer.retryTimesWhenSendAsyncFailed=0

  spring.rocketmq.producer.sendMessageTimeout=300000

  spring.rocketmq.producer.compressMessageBodyOverHowmuch=4096

  spring.rocketmq.producer.maxMessageSize=4194304

  spring.rocketmq.producer.retryAnotherBrokerWhenNotStoreOk=false

  spring.rocketmq.producer.retryTimesWhenSendFailed=2

  2. 声明RocketMQTemplate,并根据发送方式的不同选择合适的方法进行消息发送。

  3. 如果是异步发送还需要在异步调用方法中,设置回调的Callback对象。

  import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;

  ...

  @SpringBootApplication

  public class ProducerApplication implements CommandLineRunner{

  @Resource

  private RocketMQTemplate rocketMQTemplate;

  public static void main(String[] args){

  SpringApplication.run(ProducerApplication.class, args);

  }

  public void run(String... args) throws Exception {

  // 以同步的方式发送消息,构造器构造对象消息给指定的topic

  sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World! I'm from spring message").build());

  System.out.printf("string-topic syncSend2 sendResult=%s %n", sendResult); // 异步方式发送用户定义对象类型的消息,并实现回调接口SendCallback

  rocketMQTemplate.asyncSend(orderPaidTopic, new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {

  // 实现消息发送成功的后续处理

  public void onSuccess(SendResult var1) {

  System.out.printf("async onSucess SendResult=%s %n", var1);

  }

  // 实现消息发送失败的后续处理

  public void onException(Throwable var1) {

  System.out.printf("async onException Throwable=%s %n", var1);

  }

  });

  // 指定topic的同时,设置tag值,以便消费端可以根据tag值进行选择性消费

  rocketMQTemplate.convertAndSend(msgExtTopic + ":tag0", "I'm from tag0");

  // tag0 will not be consumer-selected

  rocketMQTemplate.convertAndSend(msgExtTopic + ":tag1", "I'm from tag1");

  }

  @Data

  @AllArgsConstructor

  public class OrderPaidEvent implements Serializable{

  private String orderId;

  private BigDecimal paidMoney;

  }

  }

  4 消息消费端

  在消息消费端,只需要根据发送消息的类型实现RocketMQListener<T>并将它声明成Spring @Service和@RocketMQMessageListener,同时在相应的onMessage()方法里对拉取到的消息做处理。在@RocketMQMessageListener注解中可以定义如下具体的消费属性:

  @Target({ElementType.TYPE})

  @Retention(RetentionPolicy.RUNTIME)

  @Documentedpublic @interface RocketMQMessageListener {

  String consumerGroup();// 指定consumerGroup

  String topic();// 指定消费的topic

  SelectorType selectorType() default SelectorType.TAG; // 指定消费过滤方式: TAG, SQL92

  String selectorExpress() default "*"; // 根据过滤方式,定义选择表达式

  ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY; // 消费方式:并发,顺序

  MessageModel messageModel() default MessageModel.CLUSTERING; // 消费模式: 集群, 广播

  int consumeThreadMax() default 64; //消费的并发线程数

  }

  以下消费端代码,根据指定tag过滤消费信息并声明消费起点的Push方式消费

  import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

  import org.apache.rocketmq.common.UtilAll;

  import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

  import org.apache.rocketmq.common.message.MessageExt;

  import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;

  import org.apache.rocketmq.spring.starter.core.RocketMQListener;

  import org.apache.rocketmq.spring.starter.core.RocketMQPushConsumerLifecycleListener;

  import org.springframework.stereotype.Service;

  // 消费监听标签里定义了消费相关的属性,包括:主题,选择表达式,消费组

  @Service

  @RocketMQMessageListener(topic = "message-ext-topic", selectorExpress = "tag1", consumerGroup = "${spring.application.name}-message-ext-consumer")

  public class MessageExtConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {

  @Override

  // 实现消息的消费处理

  public void onMessage(MessageExt message) {

  System.out.printf("------- MessageExtConsumer received message, msgId:%s, body:%s %n ", message.getMsgId(), new String(message.getBody()));

  }

  // 设置从当前时间点开始消费消息

  public void prepareStart(DefaultMQPushConsumer consumer) {

  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));

  }

  }

  5 发送消息事务

  对于事务消息与普通消息发送只在消息发布端区别,在消费端编写代码是没有区别的,所以这里只介绍消息发送端。发送事务消息需要在发送端做如下的编程处理:

  5.1 定义回查实现类

  实现RocketMQLocalTransactionListener接口,并将它使用注解@RocketMQTransactionListener来声明:

  @RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME)

  public class TransactionListenerImpl implements RocketMQLocalTransactionListener{

  // 实现执行本地事务的逻辑,并返回本地事务执行状态

  public LocalTransactionState executeLocalTransaction(Message msg, Object arg){

  // 实现执行本地事务的逻辑

  ...

  // 可以根据具体的本地事务的执行情况返回 RocketMQLocalTransactionState.COMMIT, ROLLBACK 或UNKNOWN 三种状态

  return ...

  }

  // 实现本地事务回查的逻辑,并返回本地事务执行状态

  public LocalTransactionState checkLocalTransaction(MessageExt msg) {

  // 实现本地事务回查的逻辑

  ...

  // 可以根据具体的本地事务的执行情况返回 RocketMQLocalTransactionState.COMMIT, ROLLBACK 或UNKNOWN 三种状态

  return ...

  }

  }

  @RocketMQTransactionListener注解用来定义TransactionListener的可配置属性:

  @Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})

  @Documented@Component

  public @interface RocketMQTransactionListener {

  /**

  * 定义事务消息发送者组名

  * Declare the txProducerGroup that is used to relate callback event to the listener, rocketMQTemplate must send a

  * transactional message with the declared txProducerGroup.

  * <p>

  * <p>It is suggested to use the default txProducerGroup if your system only needs to define a TransactionListener class.

  */

  String txProducerGroup() defaultRocketMQConfigUtils.ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME;

  // 以下定义事务回查ExecutorService的属性信息

  /**

  * Set ExecutorService params -- corePoolSize

  */

  int corePoolSize() default 1;

  /**

  * Set ExecutorService params -- maximumPoolSize

  */

  int maximumPoolSize() default 1;

  /**

  * Set ExecutorService params -- keepAliveTime

  */

  long keepAliveTime() default 1000 * 60; //60ms

  /**

  * Set ExecutorService params -- blockingQueueSize

  */

  int blockingQueueSize() default 2000;}

5.2 在RocketMQTemplate中使用特定的方法来发送事务消息

  // 构造Spring Message请求消息

  Message msg = MessageBuilder.withPayload("Hello RocketMQ " + i).

  setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build();

  // 指定在@RocketMQTransactionListener中声明的txProducerGroup, 使用这个事务发布者组来发送事务消息和回查状态

  SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME, msg, null);

  如果社区的朋友有兴趣,欢迎大家提建议或者PR来改进和加强spring-boot代码。在源代码库的README(https://github.com/apache/rocketmq-spring/blob/master/README_zh_CN.md)部分,我们整理了一部分的FAQ的问题,如果有更多的疑问请给我们留言或Email,我们会整理到网站,来让大家更方便的使用。

  大家有疑问也可以留言,会为大家一一作答。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容