请先阅读之前的内容:
- Spring Cloud 学习笔记 - No.1 服务注册发现
- Spring Cloud 学习笔记 - No.2 服务消费 Ribbon & Feign
- Spring Cloud 学习笔记 - No.3 分布式配置 Config
- Spring Cloud 学习笔记 - No.4 断路器 Hystrix
- Spring Cloud 学习笔记 - No.5 服务网关 Zuul
- Spring Cloud 学习笔记 - No.6 通过 Swagger2 构建 API 文档
Spring Cloud Stream
Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架,为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅、消费组以及消息分区这三个核心概念。
简单的说,Spring Cloud Stream 本质上就是整合了 Spring Boot 和 Spring Integration,实现了一套轻量级的消息驱动的微服务框架。
通过使用 Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。目前为止 Spring Cloud Stream 只支持下面两个消息中间件的自动化配置:
构建一个 Spring Cloud Stream 消费者
我们利用之前创建的 eureka-consumer
项目。
首先在 pom.xml
中添加如下的依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
其中 spring-cloud-starter-stream-rabbit
是 Spring Cloud Stream 对 RabbitMQ 支持的封装,其中包含了对 RabbitMQ 的自动化配置等内容。
随后创建用于接收来自 RabbitMQ 消息的消费者 SinkReceiver
:
@EnableBinding(Sink.class)
public class SinkReceiver {
private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);
@StreamListener(Sink.INPUT)
public void receive(Object payload) {
logger.info("Received: " + payload);
}
}
-
@EnableBinding
注解用来指定一个或多个定义了@Input
或@Output
注解的接口,以此实现对消息通道(Channel)的绑定。- 绑定了
Sink
接口,该接口是 Spring Cloud Stream 中默认实现的对输入消息通道绑定的定义 - Spring Cloud Stream 还默认实现了绑定输出消息通道的
Source
接口 - 还有结合了
Sink
和Source
的Processor
接口
- 绑定了
-
@StreamListener
注解用来将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。
重启项目,从日志中可以看到声明了一个名为 input.anonymous.cWlqMyH9Tm--INXERE6nhQ
的队列,并通过 RabbitMessageChannelBinder
将自己绑定为它的消费者。
c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.cWlqMyH9Tm--INXERE6nhQ, bound to: input
这些信息我们也能在 RabbitMQ 的控制台中发现它们:
点击进去,通过 Publish Message 功能来发送一条消息到该队列中:
从下面的日志可以看出 SinkReceiver
读取了消息队列中的内容,由于我们没有对消息进行序列化,所以输出的只是该对象的引用:
[m--INXERE6nhQ-1] com.example.SinkReceiver : Received: [B@beb7ce8
在上面的操作中,我们并没有手动去配置 RabbitMQ 的信息,比如 IP,端口等等,这是基于 Spring Boot 的设计理念,提供了对 RabbitMQ 默认的自动化配置。当然,我们可以手动在 application.properties
文件中去配置,例如:
spring.cloud.stream.bindings.input.destination=my_destination
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
编写消费消息的单元测试用例
@RunWith(SpringRunner.class)
@EnableBinding(value = {SinkReceiverTests.SinkSender.class})
public class SinkReceiverTests {
@Autowired
private SinkSender sinkSender;
@Test
public void sinkSenderTester() {
sinkSender.output().send(MessageBuilder.withPayload("Testing Message").build());
}
public interface SinkSender {
String OUTPUT = "input";
@Output(SinkSender.OUTPUT)
MessageChannel output();
}
}
在上面的单元测试中,我们通过 @Output(SinkSender.OUTPUT)
定义了一个输出通过,而该输出通道的名称为 input
,与前文中的 Sink
中定义的消费通道同名,所以这里的单元测试与前文的消费者程序组成了一对生产者与消费者。
运行该单元测试,日志可以看出 SinkReceiver
读取了消息队列中的内容:
[m--INXERE6nhQ-1] com.example.SinkReceiver : Received: [B@89040a9
Spring Cloud Stream 应用模型
图片引自:https://docs.spring.io/spring-cloud-stream/docs/Fishtown.BUILD-SNAPSHOT/reference/htmlsingle/
绑定器
Spring Cloud Stream 构建的应用程序与消息中间件之间是通过绑定器: Binder 相关联的,绑定器对于应用程序而言起到了隔离作用,它使得不同消息中间件的实现细节对应用程序来说是透明的。
当我们需要升级消息中间件,或是更换其他消息中间件产品时,我们要做的就是更换它们对应的 Binder 绑定器而不需要修改任何Spring Boot的应用逻辑。
所以对于每一个 Spring Cloud Stream 的应用程序来说,它不需要知晓消息中间件的通信细节,它只需要知道 Binder 对应用程序提供的概念去实现即可,而这个概念就是消息通道:Channel。
发布-订阅模式
消息会通过共享的 Topic 主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。
这里所提到的 Topic 主题是 Spring Cloud Stream 中的一个抽象概念,用来代表发布共享消息给消费者的地方。
在不同的消息中间件中,Topic 可能对应着不同的概念,比如:在 RabbitMQ 中的它对应了 Exchange、而在 Kakfa 中则对应了 Kafka 中的 Topic。
在上面的例子中,应用启动的时候,在 RabbitMQ 的 Exchange 中也创建了一个名为 input
的 Exchange交换器。例如我们分别以 3001 和 3002 两个端口启动 eureka-consumer
项目。
可以看到 Queues 中有两个 Queue:
可以看到 Channels 中有两个 Channel:
可以看出 Exchanges 中只有一个名称为 input
的 Exchange,即 Topic 主题。但是点进去,可以看出这个名称为 input
的 Exchange 有绑定了两个消息队列:
如果我们通过 Exchange 页面的 Publish Message 来发布消息,可以发现两个启动的应用程序都输出了消息内容。
图片引自 http://blog.didispace.com/spring-cloud-starter-dalston-7-2/
相对于点对点队列实现的消息通信来说,Spring Cloud Stream 采用的发布-订阅模式可以有效的降低消息生产者与消费者之间的耦合,当我们需要对同一类消息增加一种处理方式时,只需要增加一个应用程序并将输入通道绑定到既有的 Topic 中就可以实现功能的扩展,而不需要改变原来已经实现的任何内容。
消费组
很多情况下,消息生产者发送消息给某个具体微服务时,只希望被消费一次,但是上面我们启动两个应用(3001 和 3002 两个端口),这个消息出现了被重复消费两次的情况。
为了解决这个问题,在 Spring Cloud Stream 中提供了消费组的概念。
如果在同一个主题上的应用需要启动多个实例的时候,我们可以通过spring.cloud.stream.bindings.<channelName>.group
属性为应用指定一个组名,这样这个应用的多个实例在接收到消息的时候,只会有一个成员真正的收到消息并进行处理。
例如,我们在 eureka-consumer
项目的配置中增加:
spring.cloud.stream.bindings.input.group=eureka-consumer-input-group
重启两个端口的实例,随后通过 Exchange 页面的 Publish Message 来发布消息,可以发现只有一个启动的应用程序都输出了消息内容。并且有时候是 3001 端口的实例处理,有时候是 3002 端口的实例处理。
也就是说,对于同一条消息,它多次到达之后可能是由不同的实例进行消费的。
消息分区
在上面的实验中可以看到,消费组并无法控制消息具体被哪个实例消费。但是对于一些业务场景,就需要对于一些具有相同特征的消息每次都可以被同一个消费实例处理。比如:一些用于监控服务,为了统计某段时间内消息生产者发送的报告内容,监控服务需要在自身内容聚合这些数据,那么消息生产者可以为消息增加一个固有的特征 ID 来进行分区,使得拥有这些 ID 的消息每次都能被发送到一个特定的实例上实现累计统计的效果,否则这些数据就会分散到各个不同的节点导致监控结果不一致的情况。
而消息分区概念的引入就是为了解决这样的问题:当生产者将消息数据发送给多个消费者实例时,保证拥有共同特征的消息数据始终是由同一个消费者实例接收和处理。
例如,我们在 eureka-consumer
项目的配置中增加:
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0
-
spring.cloud.stream.bindings.input.consumer.partitioned
:通过该参数开启消费者分区功能; -
spring.cloud.stream.instanceCount
:该参数指定了当前消费者的总实例数量; -
spring.cloud.stream.instanceIndex
:该参数设置当前实例的索引号,从0开始,最大值为spring.cloud.stream.instanceCount
- 1。
Spring Cloud Stream VS Spring Cloud Bus
我们在 Spring Cloud 学习笔记 - No.3 分布式配置 Config 中使用了 Spring Cloud Bus(结合了 RabbitMQ),那么 Stream 和 Bus 的区别是什么?
-
Spring Cloud Stream 构建消息驱动微服务
- building highly scalable event-driven microservices connected with shared messaging systems.
-
Spring Cloud Bus 广播(例如配置统一管理)和监控
- Spring Cloud Bus links nodes of a distributed system with a lightweight message broker. This can then be used to broadcast state changes (e.g. configuration changes) or other management instructions.
RabbitMQ 负载均衡
在上面的例子中,我们始终只有一个 RabbitMQ 实例。在生产环境中,我们可能需要多个 RabbitMQ 实例来实现高并发和高可用。
参见:
引用:
程序猿DD Spring Cloud基础教程
Spring Cloud构建微服务架构:消息驱动的微服务(入门)【Dalston版】
Spring Cloud构建微服务架构:消息驱动的微服务(核心概念)【Dalston版】
Spring Cloud Dalston中文文档