消息队列之RabbitMQ《二》

上一篇,我们实现了一个简单的消息队列:
1、生产者发送消息到rabbitMQ Server,rabbitMQ Server将我们把消息队列保存,然后消费者从队列依次取出。
2、我们通过消息确认机制保证消息已被消费者处理。

但是异常也有可能发生在rabbitMQ Server端。如果server异常挂掉,队列里面还有消息未处理完,那我们再次重启server的时候,消息队列里的消息就全没了,显然,这是不能接受的。另外,上一篇,我们是直接指定将消息发送到某个队列,这样我们首先就要创建队列,并给队列命名。producer和consumer要统一队列名。如果每一个新的连接都维持一个新的队列,指定队列名就不太好使了。

简单的消息队列,一个消息只能被一个消费者获取并完成处理。如果一个消息要发送给多个消费者呢。接下来,我们看一种新的模式,即Publish/Subscribe 发布订阅模式。

pub/sub .png

  • 发布订阅者模式下,生产者将消息发送到exchange,exchange,根据路由routing_key发送到相应的队列(fanout类型除外,它会广播到可接收的所有队列)
  • 消费者可以自己声明一个队列,并将队列与exchange进行绑定,绑定时可指定routing_key。
  • 这样,通过exchange和routing_key我们就实现了自由地获取自己想要的信息。不再受queue的约束。一个消息可以发送到多个queue,消费者也可以自定义queue来获取消息
  • Exchange的类别分为4种:direct、topic、header、fanout

exchange_type:fanout
在调用exchange_declare声明exchange时,将exchange_type设置为fanout,我们就得到了一个fanout的exchange。它会将消息发到所有绑定到该exchange上的队列,无需routing_key.

生产者emit_log.py

import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

#声明一个exchange,并指定类型为fanout
#fanout为广播类型,即会发到所有的队列
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = ''.join(sys.argv[1:]) or 'info:Hello World'

#指定exchange,fanout无需指定routing_key
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

print('[X] Sent %r' % message)

connection.close()

receive_log.py

import pika

connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

#声明一个exchange
channel.exchange_declare(exchange='logs',exchange_type='fanout')

#我们可以用queue=''声明一个临时的队列,名称由server生成或者server从已有队列中选一个给我们使用
#将exclusive设置为True,只允许当前连接可访问。因为是一个临时队列,在当前连接断开后,该队列就会被删除掉
result = channel.queue_declare(queue='',exclusive=True)
#获取队列的名称
queue_name = result.method.queue
#将交换机与队列相绑定
channel.queue_bind(exchange='logs',queue=queue_name)

#回调
def callback(ch,method,properties,body):
    print("[x] %r" % body)

print('[*] Waiting for logs.To exit press CTRL+C')
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True)
#启动消费线程
channel.start_consuming()

exchange_type:direct
上面的fanout的类型是一种广播形式。但是在一些情况下,我们希望exchange的消息是分类发到不同的队列中的,比如有一个名为device的exchange,我们希望灯light、开关switches等不同的设备的消息会发到不同的的队列中。此时我们就需要用到routing_key。

type=direct.png

把上面的代码稍微改一下:
发送部分emit_log_direct.py:

#声明一个名为device的exchange,类型是direct
channel.exchange_declare(exchange='device',
                         exchange_type='direct')

#获取命令行的第2个参数为routing_key
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'

message = ''.join(sys.argv[2:]) or "Hello World!"
#发送的时候指定exchange,routing_key.
channel.basic_publish(
    exchange='direct_logs',routing_key=severity,body=message
)

接收部分receive_logs_direct.py:

severities = sys.argv[1:]

if not severities:
    sys.stderr.write('Usage:%s[info] [warning] [error]\n' %sys.argv[0])
    sys.exit(1)
    
for severity in severities:
    channel.queue_bind(
        exchange='direct_logs',queue=queue_name,routing_key=severity
    )

终端执行,我们启动接收的worker,只接收light的消息

(mq) huanghuan@huanghuandeMacBook-Pro rabbitMQ % python receive_logs_direct.py light
[*] Waiting for logs.To exit press CTRL+C
[X] 'light':b'Hello World!'

分别设置routing_key为light和switch

(mq) huanghuan@huanghuandeMacBook-Pro rabbitMQ % python emit_log_direct.py light
[X] Sent 'light':'Hello World!'
(mq) huanghuan@huanghuandeMacBook-Pro rabbitMQ % python emit_log_direct.py switch
[X] Sent 'switch':'Hello World!'

上面的结果可以看到,消费者之收到了light的消息,并没有收到switch的消息。switch的消息,exchange根据路由无法找到队列,就被丢弃了。
exchange_type:topic
direct的路由都是完全匹配的,有些情况下,一些消息可能有一个大的主题,然后只要是这个主题下,都要。比如有一个news的exchange,不管wechat.news,还是weibo.news,我这边都需要。
把direct的代码稍改一下:

channel.exchange_declare(exchange='news',exchange_type='topic')

然后运行:
python receive_logs_topic.py "*.news"
*匹配任意字符串

(mq) huanghuan@huanghuandeMacBook-Pro rabbitMQ % python emit_log_topic.py wechat.news wechat
[x] sent 'wechat.news':'wechat'
(mq) huanghuan@huanghuandeMacBook-Pro rabbitMQ % python emit_log_topic.py weibo.news weibo  
[x] sent 'weibo.news':'weibo'
(mq) huanghuan@huanghuandeMacBook-Pro rabbitMQ % 

receive:

(mq) huanghuan@huanghuandeMacBook-Pro rabbitMQ % python receive_logs_topic.py "*.news"
[*] Waiting for logs.To exit press CTRL+C
[x] 'wechat.news':b'wechat'
[x] 'weibo.news':b'weibo'

这样,不论是weixin的news还是weibo的news,全都获取。

durable
上面我们只是介绍了另一种队列的使用方式,但是重启server,消息丢失的问题还未解决。
怎样保证rabbitMQ Server重启之后,消息不被丢失呢?很简单,只需两步设置:

  • 声明队列的时候,设置durable参数为True,表示在server重启中要“活下来”,不要被删除掉。
channel.queue_declare(queue='task_queue',durable=True)
  • 调用basic_publish发送消息时,设置属性delivery_mode为pika.spec.PERSISTENT_DELIVERY_MODE
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(
                            delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE,#make message persistent)
)

按照上面两步设置,即使rabbitMQ Server因意外重启,未处理的消息也不会丢失,在Server重启后又可以继续处理。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,670评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,928评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,926评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,238评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,112评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,138评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,545评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,232评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,496评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,596评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,369评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,226评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,600评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,906评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,185评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,516评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,721评论 2 335

推荐阅读更多精彩内容