什么叫消息队列
消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
为何用消息队列
从上面的描述中可以看出消息队列是一种应用间的异步协作机制,那什么时候需要使用 MQ 呢?
以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。
以上是用于业务解耦的情况,其它常见场景包括最终一致性、广播、错峰流控等等。
RabbitMQ 特点
(1)可靠性(Reliability):RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
(2)灵活的路由(Flexible Routing):在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
(3)消息集群(Clustering):多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
(4)高可用(Highly Available Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
(5)多种协议(Multi-protocol):RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
(6)多语言客户端(Many Clients):RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
(7)管理界面(Management UI):RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
(8)跟踪机制(Tracing):如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
(9)插件机制(Plugin System):RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
RabbitMQ 中的概念模型
消息模型
所有 MQ 产品从模型抽象上来说都是一样的过程:消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。
RabbitMQ 基本概念
上面只是最简单抽象的描述,具体到 RabbitMQ 则有更详细的概念需要解释。上面介绍过 RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念:
(1)Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
(2)Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序
(3)Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列
(4)Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表
(5)Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走
(6)Connection:网络连接,比如一个TCP连接
(7)Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
(8)Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
(9)Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是/ (每个virtual host本质上都是一个RabbitMQ Server,拥有它自己的queue,exchagne,和bings rule等等。这保证了你可以在多个不同的application中使用RabbitMQ。)
(10)Broker:表示消息队列服务器实体。
AMQP 中的消息路由
AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
Exchange 类型
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
(1)direct
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
如果 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange
Direct exchange的路由算法非常简单:通过binding key的完全匹配,可以通过下图来说明
exchange X和两个queue绑定在一起。Q1的binding key是orange。Q2的binding key是black和green。当P publish key是orange时,exchange会把它放到Q1。如果是black或者green那么就会到Q2。其余的Message都会被丢弃。
(2)fanout
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
(3)topic
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。
基本示例
发送端 producer
import pika
# 建立一个实例
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost',5672)# 默认端口5672,可不写)
# 声明一个管道,在管道里发消息
channel = connection.channel()
# 在管道里声明queue
channel.queue_declare(queue='hello’)
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello',# queue名字
body='Hello World!')# 消息内容
print(" [x] Sent 'Hello World!'")
connection.close(). # 队列关闭
接收端 consumer
import pika
import time
# 建立实例
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost’))
# 声明管道
channel = connection.channel()
# 为什么又声明了一个‘hello’队列?# 如果确定已经声明了,可以不声明。但是你不知道那个机器先运行,所以要声明两次。
channel.queue_declare(queue='hello’)
def callback(ch, method, properties, body):
# 四个参数为标准格式
print(ch, method, properties)# 打印看一下是什么# 管道内存对象 内容相关信息 后面讲
print(" [x] Received %r"% body)
time.sleep(15)
ch.basic_ack(delivery_tag = method.delivery_tag)# 告诉生成者,消息处理完成
channel.basic_consume(# 消费消息
callback,# 如果收到消息,就调用callback函数来处理消息
queue='hello',# 你要从那个队列里收消息#
#no_ack=True # 写的话,如果接收消息,机器宕机消息就丢了# 一般不写。宕机则生产者检测到发给其他消费者)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()# 开始消费消息
RabbitMQ 消息分发轮询
(1)上面的只是一个生产者、一个消费者,能不能一个生产者多个消费者呢?可以上面的例子,多启动几个消费者consumer,看一下消息的接收情况。采用轮询机制;把消息依次分发
(2)假如消费者处理消息需要15秒,如果当机了,那这个消息处理明显还没处理完,怎么处理?(可以模拟消费端断了,分别注释和不注释 no_ack=True 看一下)你没给我回复确认,就代表消息没处理完。
(3)上面的效果消费端断了就转到另外一个消费端去了,但是生产者怎么知道消费端断了呢?因为生产者和消费者是通过socket连接的,socket断了,就说明消费端断开了。
(4)上面的模式只是依次分发,实际情况是机器配置不一样。怎么设置类似权重的操作?RabbitMQ怎么办呢,RabbitMQ做了简单的处理就能实现公平的分发。就是RabbitMQ给消费者发消息的时候检测下消费者里的消息数量,如果超过指定值(比如1条),就不给你发了。
只需要在消费者端,channel.basic_consume前加上就可以了。
channel.basic_qos(prefetch_count=1)# 类似权重,按能力分发,如果有一个消息,就不在给你发 channel.basic_consume(# 消费消息
RabbitMQ 消息持久化(durable、properties)
rabbitmqctl list_queues# 查看当前queue数量及queue里消息数量
消息持久化
如果队列里还有消息,RabbitMQ 服务端宕机了呢?消息还在不在?把RabbitMQ服务重启,看一下消息在不在。上面的情况下,宕机了,消息就久了,下面看看如何把消息持久化。
每次声明队列的时候,都加上durable,注意每个队列都得写,客户端、服务端声明的时候都得写。
# 在管道里声明
queuechannel.queue_declare(queue='hello2', durable=True)
测试结果发现,只是把队列持久化了,但是队列里的消息没了。durable的作用只是把队列持久化。离消息持久话还差一步:发送端发送消息时,加上properties
properties=pika.BasicProperties(
delivery_mode=2,# 消息持久化)
发送端 producer
importpika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost',5672))# 默认端口5672,可不写
channel = connection.channel()
#声明queue
channel.queue_declare(queue='hello2', durable=True)# 若声明过,则换一个名字
#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello2',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2,# make message persistent)
)
print(" [x] Sent 'Hello World!'")
connection.close()
接收端 consumer
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello2', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r"% body)
time.sleep(10)
ch.basic_ack(delivery_tag = method.delivery_tag)# 告诉生产者,消息处理完成
channel.basic_qos(prefetch_count=1)# 类似权重,按能力分发,如果有一个消息,就不在给你发channel.basic_consume(# 消费消息
callback,# 如果收到消息,就调用callback
queue='hello2’,
# no_ack=True # 一般不写,处理完接收处理结果。宕机则发给其他消费者)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
RabbitMQ消息队列(三):任务分发机制
当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load。试想一下,对于web application来说,在一个很多的HTTP request里是没有时间来处理复杂的运算的,只能通过后台的一些工作线程来完成。接下来我们分布讲解。
默认情况下,RabbitMQ 会顺序的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin。这种分发还有问题,接着向下读吧。
每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。因为我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。
如果一个Consumer异常退出了,它处理的数据能够被另外的Consumer处理,这样数据在这种情况下就不会丢失了。为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack。而应该是在处理完数据后发送ack。(在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了,如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失)
Message durability消息持久化
为了保证在RabbitMQ退出或者crash了数据仍没有丢失,需要将queue和Message都要持久化。queue的持久化需要在声明时指定durable=True:
channel.queue_declare(queue='hello', durable=True)
需要持久化Message,即在Publish的时候指定一个properties:
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode =2,# make message persistent
))
(RabbitMQ需要时间去把这些信息存到磁盘上,这个time window虽然短,但是它的确还是有。在这个时间窗口内如果数据没有保存,数据还会丢失。还有另一个原因就是RabbitMQ并不是为每个Message都做fsync:它可能仅仅是把它保存到Cache里,还没来得及保存到物理磁盘上。
因此这个持久化还是有问题。但是对于大多数应用来说,这已经足够了。当然为了保持一致性,你可以把每次的publish放到一个transaction中。这个transaction的实现需要user defined codes)
Fair dispatch 公平分发
默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。当然n是取余后的。它不管Consumer是否还有unacked Message,只是按照这个默认机制进行分发。那么如果有个Consumer工作比较重,那么就会导致有的Consumer基本没事可做,有的Consumer却是毫无休息的机会。那么,RabbitMQ是如何处理这种问题呢?
通过basic.qos方法设置prefetch_count=1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。 设置方法如下:
channel.basic_qos(prefetch_count=1)
一般情况下,我们只在消息分发的时候会去声明channel.exchange_declare。作为好的习惯,在producer和consumer中分别声明一次以保证所要使用的exchange存在
channel.exchange_declare(exchange='logs',
type='fanout')