什么是MQ?
MQ全称为Message Queue, 消息队列(MQ)是应用程序“对”应用程序的通信方法。
MQ:生产者者往消息队列中写消息,消费可以读取队列中的消息。
简单队列
1、生产者、队列、消费者
队列是RabbitMQ的内部对象,用于存储消息。生产者(下图中的P)生产消息并投递到队列中,消费者(下图中的C)可以从队列中获取消息并消费。
生产者:生产消息发送到消息队列中
package com.zhy.rabbit._01;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send{
//队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws java.io.IOException
{
/**
* 创建连接连接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
//设置MabbitMQ所在主机ip或者主机名
factory.setHost("localhost");
//创建一个连接
Connection connection = factory.newConnection();
//创建一个频道
Channel channel = connection.createChannel();
//指定一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送的消息
String message = "hello world!";
//往队列中发出一条消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭频道和连接
channel.close();
connection.close();
}
}
消费者:消费消息从消息队列中取消息
package com.zhy.rabbit._01;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Recv
{
//队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException{
//打开连接和创建频道,与发送端一样
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//创建队列消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消费队列
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true)
{
//nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
事件监听方式
工作者(消费者)Worker.java
public class Customer {
private final static String QUEUE_NAME = "rabbitMQ.test";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ地址
factory.setHost("localhost");
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明要关注的队列
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
System.out.println("Customer Waiting Received messages");
//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行事件触发回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Customer Received '" + message + "'");
}
};
//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, true, consumer);
}
Round-robin(轮询分发)
使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。
修改一下NewTask,使用for循环模拟多次发送消息的过程:
for (int i = 0; i < 5; i++) {
// 发送的消息
String message = "Hello World"+Strings.repeat(".", i);
// 往队列中发出一条消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
我们先启动1个生产者实例,2个工作者实例,看一下如何执行:
WorkQueue工作队列
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
工作队列(又名:任务队列)的主要任务是为了避免立即做一个资源密集型的却又必须等待完成的任务。相反的,我们进行任务调度:将任务封装为消息并发给队列。在后台运行的工作者(consumer)将其取出,然后最终执行。当你运行多个工作者(consumer),队列中的任务被工作进行共享执行。
准备
使用Thread.Sleep()方法来模拟耗时。采用小数点的数量来表示任务的复杂性。每一个点将住哪用1s的“工作”。例如,Hello... 处理完需要3s的时间。
发送端(生产者):NewTask.java
public class NewTask {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException {
/**
* 创建连接连接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
// 设置MabbitMQ所在主机ip或者主机名
factory.setHost("127.0.0.1");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个频道
Channel channel = connection.createChannel();
// 指定一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 发送的消息
String message = "Hello World...";
// 往队列中发出一条消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
// 关闭频道和连接
channel.close();
connection.close();
}
}
工作者(消费者)Worker.java
public class Worker {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打开连接和创建频道,与发送端一样
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 创建队列消费者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
System.out.println(" [x] Proccessing... at " +new Date().toLocaleString());
try {
for (char ch: message.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
} finally {
System.out.println(" [x] Done! at " +new Date().toLocaleString());
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
运行结果如下:
任务分发机制
正主来了。。。下面开始介绍各种任务分发机制。
Round-robin(轮询分发)
使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。
修改一下NewTask,使用for循环模拟多次发送消息的过程:
for (int i = 0; i < 5; i++) {
// 发送的消息
String message = "Hello World"+Strings.repeat(".", i);
// 往队列中发出一条消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
我们先启动1个生产者实例,2个工作者实例,看一下如何执行:
Fair dispatch(公平分发)
您可能已经注意到,任务分发仍然没有完全按照我们想要的那样。比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。
这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将第n条消息发给第n个消费者。
为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
注:如果所有的工作者都处于繁忙状态,你的队列有可能被填充满。你可能会观察队列的使用情况,然后增加工作者,或者使用别的什么策略。
还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。这些内容会在下篇博文中讲述。
整体代码如下:生产者NewTask.java
public class NewTask {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException {
/**
* 创建连接连接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
// 设置MabbitMQ所在主机ip或者主机名
factory.setHost("127.0.0.1");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个频道
Channel channel = connection.createChannel();
// 指定一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
int prefetchCount = 1;
//限制发给同一个消费者不得超过1条消息
channel.basicQos(prefetchCount);
for (int i = 0; i < 5; i++) {
// 发送的消息
String message = "Hello World"+Strings.repeat(".",5-i)+(5-i);
// 往队列中发出一条消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
// 关闭频道和连接
channel.close();
connection.close();
}
}
消费者Worker.java
public class Worker {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打开连接和创建频道,与发送端一样
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);//保证一次只分发一个
// 创建队列消费者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
for (char ch: message.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
} finally {
System.out.println(" [x] Done! at " +new Date().toLocaleString());
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
能者多劳,消息分发;
3.消息应答
3.消息持久化
当已经存在的消息队列不能持久化,如果想持久化,需要删除当前队列,从新定义。
订阅模式
我们之前学习的都是一个消息只能被一个消费者消费,那么如果我想发一个消息 能被多个消费者消费,这时候怎么办? 这时候我们就得用到了消息中的发布订阅模型
- 在前面的教程中,我们创建了一个工作队列,都是一个任务只交给一个消费者。
- 这次我们做 将消息发送给多个消费者。这种模式叫做“发布/订阅”。
列:
类似微信订阅号 发布文章消息 就可以广播给所有的接收者。(订阅者)
那么咱们来看一下图,我们学过前两种有一些不一样,work 模式 是不是同一个队列 多个消费者,而 ps 这种模式呢,是一个队列对应一个消费者,发布/订阅模式还多了一个 X(交换机 转发器) ,这时候我们要获取消息 就需要队列绑定到交换机上,交换机把消息发送到队列 , 消费者才能获取队列的消息
- 发布/订阅模式解读:
1、1 个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机(转发器)
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
产者
后台注册 ->邮件->短信
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明exchange 交换机 转发器
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //fanout 分裂
// 消息内容
String message = "Hello PB";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
那么先看一下控制台 是不是有这个交换机
但是这个发送的消息到哪了呢? 消息丢失了!!!因为交换机没有存储消息的能力,在 rabbitmq 中只有队列存储消息的能力.因为这时还没有队列,所以就会丢失;
小结:消息发送到了一个没有绑定队列的交换机时,消息就会丢失!
那么我们再来写消费者
消费者 1
邮件发送系统
public class Recv {
private final static String QUEUE_NAME = "test_queue_fanout_email";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//------------下面逻辑和work模式一样-----
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
// 消息到达 触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[1] Recv msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[1] done ");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
消费者 2
类似短信发送系统
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_fanout_2";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一时刻服务器只会发一条消息给消费者
// 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
// 消息到达 触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[2] Recv msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done ");
// 手动回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
测试
一个消息 可以被多个消费者获