RabbitMQ的六种模式
简单模式Simple、工作模式Work、发布订阅模式Publish/Subscribe、路由模式Routing、通配符模式Topics、远程调用模式RPC
https://www.rabbitmq.com/getstarted.html
一 简单模式(Simple / HelloWorld 单生产单消费)
简单的发送与接收,没有特别的处理。
Pom.xml
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
RabbitMQ连接(公共的连接方法,其他模式共用此方法)
public class ConnectionUtil {
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
return connection;
}
}
生产者:
public class Simple {
private static final String QUEUENAME = "MQ38";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
//创建一个频道
Channel channel = connection.createChannel();
//声明一个消息队列
channel.queueDeclare(QUEUENAME, false, false, false, null);
//发送消息
for (int i = 0; i < 100; i++) {
String msg = "i love laohan";
channel.basicPublish("", QUEUENAME, null, msg.getBytes());
}
System.out.println("send message success!");
connection.close();//finally
}
}
消费者;
public class SimpleCustomer {
private static final String QUEUENAME = "MQ38";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
//创建一个频道
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUENAME, false, false, false, null);
//准备开始接受消息
//声明了一个消息的接收者
Consumer defaultConsumer = new DefaultConsumer(channel){
//接受数据的
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("消费者接受到数据 "+msg);
}
};
//接受消息
channel.basicConsume(QUEUENAME, true, defaultConsumer);
}
}
此代码建立1个生产者2个消费者的话,直接回发生轮询,1:1的概念。而且极有可能讲负载负载在某一台机器上,而另外的电脑处于闲置状态。所以必须使用ack和qos设置。
生产者和消费者已经没有关系了。
二 工作模式(Work queues单发送多接收)
一个生产者端,多个消费者端。示例中为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送消息确认。
此处代码基本跟简单模式一致,只是多了个消费者。让人能够提高速度。
一个消费者需要10秒才能完成。二个消费者5秒就能完成。
会存在如下几个问题
1:消息一次性拉取,消费者宕机,会造成数据丢失。
2:多个消费者不均衡,其中一个消费者闲的没事干。另外一个消费者累的要死。
如何解决上述的问题:
ACK消息确认:
//接受消息
//第二个参数,就是是否自动确认
//true 自动确认,接受到了,哪怕我还没有来得及处理,我就确认了
//老潘拿到8000个馍,还没有吃,他就确认了,我收到8000个馍了。
//false 手动确认
channel.basicConsume(QUEUENAME, false, defaultConsumer);
消费者也接收到这个消息了,但是你没有手工确认,所以消息全部在unacked里面。等待确认。如果超时或者宕机都无法确认消息。那么
消息会重新进入消息队列,等待其他消费者进行消费。
//接受消息
//第二个参数,就是是否自动确认
//true 自动确认,接受到了,哪怕我还没有来得及处理,我就确认了
//老潘拿到8000个馍,还没有吃,他就确认了,我收到8000个馍了。
//false 手动确认
channel.basicConsume(QUEUENAME, false, defaultConsumer);
//开始确认消息
//envelope.getDeliveryTag() 当前需要确认消息的下标
//true 当前下标前的消息全部确认
//false 仅仅只确认当前下标的消息
channel.basicAck(envelope.getDeliveryTag(), false);//确认消息了 开发中 确认一定在最后
这样就不会丢失数据了。
QOS限流:
消费者一次性拉了所有的消息。尽管没有Ack,消息会重新进入消息队列。
channel.basicQos(1);//消费者限流1。一次拉取一个消息,至到你消费完再拉取下一个消息
channel.basicConsume(QUEUENAME, false, defaultConsumer);
最终两个消费者几乎同时完成任务。能干的就多干点,不能干的就少干点。
durable
boolean durable = true;//代表持久化//消息就是持久化放入磁盘中的
channel.queueDeclare(QUEUENAME, durable, false, false, null);
x-max-length
Map map = new HashMap();
//秒杀 只有20个产品 100万个请求 没必要接受这么多的数据
map.put("x-max-length", 20);//队列的最大长度20,其他的消息你不处理就丢失,你处理可以进入死信
channel.queueDeclare(QUEUENAME, durable, false, false, map);
x-expire
Map map = new HashMap();
//秒杀 只有20个产品 100万个请求 没必要接受这么多的数据
map.put("x-max-length", 20);//队列的最大长度20,其他的消息你不处理就丢失,你处理可以进入死信
map.put("x-expires", 10000);// 过期时间
channel.queueDeclare(QUEUENAME, durable, false, false, map);
三 发布、订阅模式(Publish/Subscribe)
使用场景:发布、订阅模式,生产者端发送消息,多个消费者同时接收所有的消息
工作模式中,只有一个队列。消息队列只有一个,即便有多个消费者。那么消费者默认也是按照轮询的方式,从队列中取数据。
这两个消费者的功能是同一个行为,目的只是为了提高速度。
发布者订阅模式:Exchange(交换机)。无意识的行为。能够把消息广播到各种绑定到交换机的队列中去。这两个队列中保持了完整的相同的消息。分别交给不同的消费者去处理。
场景:
下订单:
1:订单数据入库
2:记录日志
3::发送短信,告诉用户下单成功
扇出
生产者:
public class Sender {
private static final String EXECHANGENAME = "EXCHANGE38";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
//声明交换机
//交换类型是fanout扇出 无意识的广播
//此处根本就没有生命是那个队列来接受
channel.exchangeDeclare(EXECHANGENAME, "fanout");
for (int i = 0; i < 100; i++) {
String msg = "hello"+i;
channel.basicPublish(EXECHANGENAME, "", null, msg.getBytes());
}
System.out.println("send sucess!");
connection.close();
}
}
消费者
public class Customer1 {
private static final String EXECHANGENAME = "EXCHANGE38";
private static final String QUEUENAME = "QUEUE38";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
//声明交换机
//交换类型是fanout扇出 无意识的广播
//此处根本就没有生命是那个队列来接受
channel.exchangeDeclare(EXECHANGENAME, "fanout");
//声明队列
channel.queueDeclare(QUEUENAME, false, false, false, null);
//把消息队列和交换机进行绑定
channel.queueBind(QUEUENAME,EXECHANGENAME, "");
//第三个参数只有在路由模式下才有效,第三个参数无效
//接受数据了
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("消费者1插入订单 "+msg);
}
};
channel.basicConsume(QUEUENAME, true,defaultConsumer );
}
}
四:路由模式(Routing)
生产者按routing key发送消息,不同的消费者端按不同的routing key接收消息。
比发布订阅模式区别是,更多更灵活的接受队列中的数据,而不是接受全部数据。
场景:
Log4J
info debug warn error fatal
SLF4J
trace info debug warn error
off all
yml
log:
level:
root: error #所有的日志的基本为error
com:
example:
debug #这个包下的所有日志级别为debug
日志:
一般级别有 info debug warn error fatal
如果是普通的info debug 的日志信息,慢慢存储和处理就行了。甚至没人管。
昨天早上8点,服务器报了一个错。是一个内部的错。上线3个月,第一次,唯一一次报。跟你的代码没有关系。
项目经理任务:去解决这个错。只能看日志。
如果还是发布者订阅模式,意味着每个人都接受到了完整的日志。希望发生Error日志的时候,直接打电话。
能够根据路由键,来选择性的接受数据。
直接交换
发布者订阅模式向所有消费者广播所有消息。我们希望扩展它以允许根据消息的严重性过滤消息。例如,我们可能需要一个程序将日志消息写入磁盘以仅接收严重错误,而不是在警告或信息日志消息上浪费磁盘空间。
Fanout交换只能进行无意识的广播。
直接交换。直接交换背后的路由算法很简单 - 消息进入队列,其 绑定密钥与消息的路由密钥完全匹配。
此设置中,我们可以看到直接交换X与两个绑定到它的队列。第一个队列绑定橙色绑定,第二个绑定有两个绑定,一个绑定密钥为黑色,另一个绑定为绿色。
在这样的设置中,使用路由密钥orange发布到交换机的消息 将被路由到队列Q1。路由键为黑色 或绿色的消息将转到Q2。所有其他消息将被丢弃。
生产者
public class Sender {
private static final String EXECHANGENAME = "EXCHANGE38";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
// channel.queueDelete("QUEUE38");
// channel.queueDelete("QUEUE382");
// channel.exchangeDelete(EXECHANGENAME);
channel.exchangeDeclare(EXECHANGENAME, "direct");
String msg1 = "this is a info";
//发送这个消息的时候路由键 info
channel.basicPublish(EXECHANGENAME, "info", MessageProperties.TEXT_PLAIN, msg1.getBytes());
String msg2 = "this is a debug";
//发送这个消息的时候路由键 info
channel.basicPublish(EXECHANGENAME, "debug", MessageProperties.TEXT_PLAIN, msg2.getBytes());
String msg3 = "this is a fatal";
//发送这个消息的时候路由键 info
channel.basicPublish(EXECHANGENAME, "fatal", MessageProperties.TEXT_PLAIN, msg3.getBytes());
}
}
消费者1:
public class Customer1 {
private static final String EXECHANGENAME = "EXCHANGE38";
private static final String QUEUE1 = "QUEUE1";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXECHANGENAME, "direct");
//申请队列
channel.queueDeclare(QUEUE1, false, false, false, null);
//队列和交换机进行绑定 通过路由键进行绑定
channel.queueBind(QUEUE1, EXECHANGENAME, "info");
channel.queueBind(QUEUE1, EXECHANGENAME, "debug");
channel.basicConsume(QUEUE1, true, new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("消费者1"+msg);
}
});
}
}
消费者2:
public class Customer2 {
private static final String EXECHANGENAME = "EXCHANGE38";
private static final String QUEUE1 = "QUEUE2";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXECHANGENAME, "direct");
//申请队列
channel.queueDeclare(QUEUE1, false, false, false, null);
//队列和交换机进行绑定 通过路由键进行绑定
channel.queueBind(QUEUE1, EXECHANGENAME, "fatal");
channel.basicConsume(QUEUE1, true, new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("消费者2"+msg);
}
});
}
}
五 通配符(或主题)模式(Topics ,按topic发送接收)
将虚拟广播的fanout交换,更换为带路由的 direct的交换方式,可以选择性的接受消息。但它仍然具有局限性,不能基于多个标准进行路由。
通过更为复杂的 topic 交换,可以检测消息的来源。
生产者端不只按固定的routing key发送消息,而是按字符串“匹配”发送,消费者端同样如此。
与之前的路由模式相比,它将信息的传输类型的key更加细化,以“key1.key2.keyN…”的模式来指定信息传输的key的大类型和大类型下面的小类型,让消费者端可以更加精细的确认自己想要获取的信息类型。而在消费者端,不用精确的指定具体到哪一个大类型下的小类型的key,而是可以使用类似正则表达式(但与正则表达式规则完全不同)的通配符在指定一定范围或符合某一个字符串匹配规则的key,来获取想要的信息。“通配符交换机”(Topic Exchange)将路由键和某模式进行匹配。此时队列需要绑定在一个模式上。符号“#”匹配一个或多个词,符号“”仅匹配一个词。
发送到topic交换的消息不能具有任意的 routing_key - 它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些功能。一些有效的路由键示例:“ stock.usd.nyse ”,“ nyse.vmw”,“ quick.orange.rabbit ”。路由密钥中可以包含任意数量的单词,最多可达255个字节。
绑定密钥也必须采用相同的形式。topic交换背后的逻辑 类似于direct交换- 使用特定路由密钥发送的消息将被传递到与匹配的绑定密钥绑定的所有队列。但是绑定键有两个重要的特殊情况
· (星号)可以替代一个单词。
· #(hash)可以替换零个或多个单词
场景:
京东,211限时达。本地仓库出的货。早上买,下午到。
你现在下了一个订单
----》京东总部的数据库 接受任何数据
----》插入西安仓库的数据库 只接受西安的数据
----》插入上海仓库的数据库 只接受上海的数据
生产者
public class Sender {
private static final String EXECHANGENAME = "EXCHANGE38";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXECHANGENAME, "topic");
String msg1 = "this is a order xian";
channel.basicPublish(EXECHANGENAME, "order.xian", MessageProperties.TEXT_PLAIN, msg1.getBytes());
msg1 = "this is a order shanghai";
channel.basicPublish(EXECHANGENAME, "order.shanghai", MessageProperties.TEXT_PLAIN, msg1.getBytes());
msg1 = "this is a rollback xian";
channel.basicPublish(EXECHANGENAME, "rollback.xian", MessageProperties.TEXT_PLAIN, msg1.getBytes());
msg1 = "this is a rollback shanghai";
channel.basicPublish(EXECHANGENAME, "rollback.shanghai", MessageProperties.TEXT_PLAIN, msg1.getBytes());
}
}
消费者1:
public class Customer1 {
private static final String EXECHANGENAME = "EXCHANGE38";
private static final String QUEUE1 = "QUEUE1";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXECHANGENAME, "topic");
//申请队列
channel.queueDeclare(QUEUE1, false, false, false, null);
//队列和交换机进行绑定 通过路由键进行绑定
channel.queueBind(QUEUE1, EXECHANGENAME, "*.xian");
channel.basicConsume(QUEUE1, true, new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("西安仓库"+msg);
}
});
}
}
消费者2:
public class Customer3 {
private static final String EXECHANGENAME = "EXCHANGE38";
private static final String QUEUE1 = "QUEUE3";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXECHANGENAME, "topic");
//申请队列
channel.queueDeclare(QUEUE1, false, false, false, null);
//队列和交换机进行绑定 通过路由键进行绑定
channel.queueBind(QUEUE1, EXECHANGENAME, "*.*");
channel.basicConsume(QUEUE1, true, new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("京东总部"+msg);
}
});
}
}
1、交换机exchange的type为topic
2、发送消息的routing key不是固定的单词,而是匹配字符串,如"order.#",匹配一个单词,#匹配0个或多个单词。因此如“order.#”能够匹配到“order.jd.buy”,但是“order.* ”只会匹配到“order.jd”
六:远程过程调用(RPC)
有关RPC的说明
尽管RPC在计算中是一种非常常见的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是远程的RPC时,会出现问题。这样的混淆导致系统不可预测,并增加了调试的不必要的复杂性。错误使用RPC可以导致不可维护的意大利面条代码,而不是简化软件。
考虑到这一点,请考虑以下建议:
· 确保明显哪个函数调用是本地的,哪个是远程的。
· 记录您的系统。使组件之间的依赖关系变得清晰。
· 处理错误案例。当RPC服务器长时间停机时,客户端应该如何反应?
如有疑问,请避免使用RPC。如果可以,您应该使用异步管道 - 而不是类似RPC的阻塞,将结果异步推送到下一个计算阶段。
我们的RPC将这样工作:
· 对于RPC请求,客户端发送带有两个属性的消息: replyTo(设置为仅为请求创建的匿名独占队列)和correlationId(设置为每个请求的唯一值)。
· 请求被发送到rpc_queue队列。
· RPC worker(aka:server)正在等待该队列上的请求。当出现请求时,它会执行该作业,并使用来自replyTo字段的队列将带有结果的消息发送回客户端。
· 客户端等待回复队列上的数据。出现消息时,它会检查correlationId属性。如果它与请求中的值匹配,则将响应返回给应用程序。
发出客户请求:
RPCClient fibonacciRpc = new RPCClient();
System.out.println(“[x] Requesting fib(30)”);
字符串响应= fibonacciRpc.call(“30”);
System.out.println(“[。] Got'” + response + “'”);
fibonacciRpc.close();
消息补偿 消息幂等
main方法没有默认的消息补偿
在消费者消费产生异常的时候,消息会重新进入消息队列,等待新的消费者。
Rabbitmq 默认情况下 如果消费者程序出现异常情况 会自动实现补偿机制 也就是 重试机制:
==========================
建立Boot测试环境:
1:yml
server:
port: 8000
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
2:pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3:@Configuration
public class RabbitMqConfig {
@Bean
public Queue getQueue() {
return new Queue("laohanqueue");
}
}
4:@Resource
private AmqpTemplate amqpTemplate;
@RequestMapping("/send")
public String send() {
String message = "message is "+new Date();
amqpTemplate.convertAndSend("laohanqueue",message);
return message;
}
5:@Component
@RabbitListener(queues="laohanqueue")
public class Reveice {
@RabbitHandler
public void receive(String msg) {
System.out.println("您有新的短消息"+msg);
int i = 9/0;
}
}
==========================
测试结果:
当生产者投递消息后:
消费者会不停的进行打印: 消息一直没有被消费
原因 Rabbitmq 默认情况下 如果消费者程序出现异常情况 会自动实现补偿机制 也就是 重试机制
@RabbitListener底层使用AOP进行拦截,如果程序没有抛出异常,自动提交事务。
如果Aop使用异常通知 拦截获取异常信息的话 ,
自动实现补偿机制,该消息会一直缓存在Rabbitmq服务器端进行重放,一直重试到不抛出异常为准。
可以修改重试策略
一般来说默认5s重试一次,
消费者配置:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
listener:
simple:
retry:
####开启消费者重试
enabled: true
####最大重试次数(默认无数次)
max-attempts: 5
####重试间隔次数
initial-interval: 3000ms
重试机制都是间隔性的 每次都是一个线程 单线程重试
效果: 重试5次 不行就放弃了。
3 设置死信队列
为了避免消息异常造成的死循环,也可以将requeue(上文配置参数)设置为false,在消息处理失败之后,不会重回队列。
设置死信队列的交换机可路由key。在消息处理失败的情况下会重回队列,我们可以监听死信队列来做异常处理。
boolean requeue = true;
channel.basicNack(deliveryTag, false, requeue);
=================================================================
消费者如果保证消息幂等性,不被重复消费
消息幂等性,其实就是保证同一个消息不被消费者重复消费两次。
当消费者消费完消息之后,通常会发送一个ack应答确认信息给生产者,
但是这中间有可能因为网络中断等原因,导致生产者未能收到确认消息,
由此这条消息将会被 重复发送给其他消费者进行消费,
实际上这条消息已经被消费过了
解决办法:
消费者端实现幂等性,意味着我们的消息永远不会消费多次,即使我们收到了多条一样的消息。通常有两种方式来避免消费重复消费:
消息全局ID或者写个唯一标识(如时间戳、UUID等) :每次消费消息之前根据消息id去判断该消息是否已消费过,如果已经消费过,则不处理这条消息,否则正常消费消息,并且进行入库操作。(消息全局ID作为数据库表的主键,防止重复)
利用Redis的setnx 命令:给消息分配一个全局ID,只要消费过该消息,将 < id,message>以K-V键值对形式写入redis,消费者开始消费前,先去redis中查询有没消费记录即可。
代码如下:
1:pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2:application.yml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
listener:
simple:
retry:
####开启消费者重试
enabled: true
####最大重试次数(默认无数次)
max-attempts: 5
####重试间隔次数
initial-interval: 3000ms
3:@Configuration
public class RabbitMqConfig {
@Bean
public Queue getQueue() {
return new Queue("laohanqueue");
}
}
4: @Resource
private AmqpTemplate amqpTemplate;
@RequestMapping("/send")
public String send() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding("utf-8");
Message message = new Message("hello,world!".getBytes(), messageProperties);
amqpTemplate.convertAndSend("laohanqueue",message);
return "ok";
}
5:@Component
@RabbitListener(queues="laohanqueue")
public class Reveice {
@Resource
private RedisTemplate redisTemplate;
@RabbitHandler
public void receive(String value,Message message, Channel channel) throws Exception {
System.out.println("=====>"+value+" "+message);
String messageid = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8"); //消息内容获取之
System.out.println("您有新的短消息"+msg+" "+messageid);
if(messageid==null||redisTemplate.opsForValue().get(messageid)!=null){
System.out.println("已经消费过了,不要重复进行消费");
return;
}else{
System.out.println("处理完业务数据");
redisTemplate.opsForValue().set(messageid, "ok");
}
int i = 9/0;
}
}
则即便发生网络故障,即便强制报错,则改消息也只会被处理1次,而且消息补偿也只有一次,不会造成
5次消息补偿。