What is Spring Cloud Stream?
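An application talks to a Spring Cloud Stream binder through inputs and outputs. The bindings between them are set up through configuration, and the binder takes care of talking to the message middleware. To work in a message-driven style, we therefore only need to understand how to interact with Spring Cloud Stream itself.
At the time of writing, only RabbitMQ and Kafka are supported.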
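The division of labour described above can be sketched in plain Java. This is a toy model only, not the Spring Cloud Stream API: `InMemoryBroker`, `ToyBinder` and `BinderSketch` are hypothetical names invented for illustration, and the "broker" is just an in-memory list of subscribers. It shows that the application code only touches an output and an input, while the binder is the single piece that knows about the middleware.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the message middleware (RabbitMQ or Kafka in a real setup).
final class InMemoryBroker {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String payload) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(payload);
        }
    }
}

// Hypothetical binder: the only class that talks to the broker directly.
final class ToyBinder {
    private final InMemoryBroker broker;

    ToyBinder(InMemoryBroker broker) {
        this.broker = broker;
    }

    // Output side: the application hands payloads to the returned Consumer.
    Consumer<String> bindOutput() {
        return broker::publish;
    }

    // Input side: the application registers a handler for incoming payloads.
    void bindInput(Consumer<String> handler) {
        broker.subscribe(handler);
    }
}

final class BinderSketch {
    // Application code: it uses only the output and the input, never the broker.
    static List<String> run() {
        ToyBinder binder = new ToyBinder(new InMemoryBroker());
        List<String> received = new ArrayList<>();
        binder.bindInput(received::add);

        Consumer<String> output = binder.bindOutput();
        output.accept("hello");
        output.accept("world");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Swapping the middleware in this model means replacing `InMemoryBroker` and `ToyBinder` only; `run()` stays the same, which is the point the paragraph above makes about binders.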
pom.xml
RabbitMQ:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
Kafka:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
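Sending and receiving
public interface StreamClient {
    String INPUT = "myMessage";
    String INPUT2 = "myMessage2";

    // Channel the application consumes from.
    @Input(StreamClient.INPUT)
    SubscribableChannel input();

    // Channel the application publishes to. It reuses the INPUT name; some
    // Spring Cloud Stream versions may reject a name shared by input and output.
    @Output(StreamClient.INPUT)
    MessageChannel output();

    // StreamReceiver also uses INPUT2, which needs its own bindings (not shown).
}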
Sending
// streamClient is an injected StreamClient instance.
streamClient.output().send(MessageBuilder.withPayload("now " + new Date()).build());
Receiving
@Slf4j
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver {

    // After a message from INPUT is consumed, send the return value on to INPUT2.
    @StreamListener(StreamClient.INPUT)
    @SendTo(StreamClient.INPUT2)
    public String process(Object message) {
        log.info("StreamReceiver:{}", message);
        return "receiver.";
    }

    // Receives whatever process() forwarded to INPUT2.
    @StreamListener(StreamClient.INPUT2)
    public void process2(Object message) {
        log.info("StreamReceiver2:{}", message);
    }
}
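The message flow through the receiver (handle a message on myMessage, forward the return value to myMessage2, handle it again there) can be traced with a small plain-Java model. This is a sketch under assumptions, not Spring code: `ToyChannels` and `SendToSketch` are hypothetical names, channels are just map entries, and delivery is synchronous.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory channel registry, used only to trace the flow.
final class ToyChannels {
    private static final class Listener {
        final Function<String, String> handler;
        final String replyTo; // null means the return value is not forwarded

        Listener(Function<String, String> handler, String replyTo) {
            this.handler = handler;
            this.replyTo = replyTo;
        }
    }

    private final Map<String, List<Listener>> listeners = new HashMap<>();

    // Plays the role of a listener method, optionally with a forwarding target.
    void listen(String channel, String replyTo, Function<String, String> handler) {
        listeners.computeIfAbsent(channel, k -> new ArrayList<>())
                 .add(new Listener(handler, replyTo));
    }

    // Delivers the payload and forwards any non-null result to replyTo.
    void send(String channel, String payload) {
        List<Listener> targets = listeners.getOrDefault(channel, Collections.emptyList());
        for (Listener listener : targets) {
            String result = listener.handler.apply(payload);
            if (listener.replyTo != null && result != null) {
                send(listener.replyTo, result);
            }
        }
    }
}

final class SendToSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyChannels channels = new ToyChannels();

        // Mirrors process(): log the message, then forward "receiver." to myMessage2.
        channels.listen("myMessage", "myMessage2", message -> {
            log.add("StreamReceiver:" + message);
            return "receiver.";
        });
        // Mirrors process2(): log only, nothing is forwarded.
        channels.listen("myMessage2", null, message -> {
            log.add("StreamReceiver2:" + message);
            return null;
        });

        channels.send("myMessage", "now");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model one message sent to myMessage produces two log lines: the first from the myMessage listener, the second from the myMessage2 listener with the payload "receiver.".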
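With multiple instances running, let only one instance consume each message by putting them in the same consumer group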
spring:
  cloud:
    stream:
      bindings:
        myMessage:
          group: order
          content-type: application/json
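The effect of the group setting can be illustrated with a plain-Java toy model. It is a sketch under stated assumptions, not how a binder is implemented: `ToyDestination` and `GroupSketch` are hypothetical names, the "audit" group is invented for contrast, and round-robin selection inside a group is an assumption (in a real setup the middleware decides which instance receives a message).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical destination: each group gets every message, but only one
// instance inside a group receives any given message.
final class ToyDestination {
    // group name -> inboxes of the instances that joined that group
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // group name -> number of messages delivered so far (drives round-robin)
    private final Map<String, Integer> delivered = new LinkedHashMap<>();

    // Registers one application instance in a group and returns its inbox.
    List<String> join(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
        return inbox;
    }

    void publish(String payload) {
        for (Map.Entry<String, List<List<String>>> entry : groups.entrySet()) {
            List<List<String>> members = entry.getValue();
            // Assumption: pick the next member of the group in round-robin order.
            int index = delivered.merge(entry.getKey(), 1, Integer::sum) - 1;
            members.get(index % members.size()).add(payload);
        }
    }
}

final class GroupSketch {
    static List<List<String>> run() {
        ToyDestination myMessage = new ToyDestination();
        List<String> orderA = myMessage.join("order"); // instance 1, group "order"
        List<String> orderB = myMessage.join("order"); // instance 2, group "order"
        List<String> audit = myMessage.join("audit");  // hypothetical second group

        myMessage.publish("m1");
        myMessage.publish("m2");
        return Arrays.asList(orderA, orderB, audit);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Here the two "order" instances split the messages between them, while the single "audit" instance sees both, which is the behaviour the group property is meant to give when the same service runs as several instances.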