redis06消息队列

任务队列

在web开发的场景中撰写常规的代码,一般是在一个视图层(view)将所有业务逻辑完成之后直接返回数据(可以是一个渲染后的html也可以是一组json数据)给用户的浏览器;业务比较简单且单一的逻辑处理响应时间通常都比较短,但涉及到发邮件、数据爬取、密集计算、复杂业务这种情况耗时较长的操作,如果不采取任何措施的话,那么在用户这边最直观的感受就是提交了请求后页面就僵在那里了(如果时间大于10秒钟,可能就会认为出问题了)。

redis的任务队列(列表)就是这种耗时等待比较长场景的解决办法之一,视图层是一个负责塞消息的对象(Producer角色),然后在单独写一个独立的进程来读取这个列表(Consumer)去完成后续的工作;那么视图层就可以直接返回一个页面给用户,告诉用户这个事情的状态是正在处理中,当处理完成之后将状态改成已完成,并提供一个链接让用户可以去查看任务明细结果。

将一个耗时操作拆开要从好几个方面进行考虑,用户的等待体验、将相同功能相同作用的代码抽象出来解耦、简化逻辑层的代码复杂度。

 

非堵塞读取(rpop)

queue_rpop/producer.py

import redis

# 连接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)

# 清空所有键
for number, key in enumerate(r.keys()):
    r.delete(key)

# 准备数据
r.rpush('queue:send_mail', *['3330356463@qq.com', 'zhengtong0898@aliyun.com'])

queue_rpop/consumer1.py

import redis
from datetime import datetime as dt

# 连接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)


while True:
    email = r.rpop('queue:send_mail')
    print('[{} {} INFO]: {}'.format(
        dt.now(),
        'queue:send_mail',
        email
    ))

queue_rpop/consumer2.py

import redis
import time
from datetime import datetime as dt

# 连接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)


def send_mail(email):
    print('[{} {} INFO]: send mail to {}'.format(
        dt.now(),
        'queue:send_mail',
        email
    ))

if __name__ == '__main__':

    while True:
        mail = r.rpop('queue:send_mail')
        if not mail:

            print('[{} {} INFO]: {}'.format(
                dt.now(),
                'queue:send_mail',
                mail
            ))

            time.sleep(1)
            continue
        send_mail(mail)

运行

# 运行consumer1.py
python consumer1.py
# 显示结果,这种方式对CPU消耗非常大,整体的IO消耗也很高。
[2017-04-26 18:59:31.822200 queue:send_mail INFO]: None
[2017-04-26 18:59:31.822700 queue:send_mail INFO]: None
[2017-04-26 18:59:31.822700 queue:send_mail INFO]: None
[2017-04-26 18:59:31.823200 queue:send_mail INFO]: None
[2017-04-26 18:59:31.823200 queue:send_mail INFO]: None
[2017-04-26 18:59:31.823700 queue:send_mail INFO]: None
[2017-04-26 18:59:31.824200 queue:send_mail INFO]: None
...
...
...


# 运行consumer2.py (为了降低没有必要的系统消耗)
python consumer2.py

# 再开一个窗口运行 producer.py
python producer.py

# 显示结果
[2017-04-26 19:15:29.695200 queue:send_mail INFO]: None
[2017-04-26 19:15:30.695700 queue:send_mail INFO]: None
[2017-04-26 19:15:31.695700 queue:send_mail INFO]: None
[2017-04-26 19:15:32.696700 queue:send_mail INFO]: None
[2017-04-26 19:15:33.697200 queue:send_mail INFO]: send mail to 3330356463@qq.com
[2017-04-26 19:15:33.697700 queue:send_mail INFO]: send mail to zhengtong0898@aliyun.com
[2017-04-26 19:15:33.698200 queue:send_mail INFO]: None
[2017-04-26 19:15:34.698700 queue:send_mail INFO]: None
[2017-04-26 19:15:35.699200 queue:send_mail INFO]: None

 

堵塞读取(brpop)

redis内部并没有采用非堵塞的socket模型,因此初步判断它是一个堵塞请求,它的等待机制是由redis内部来负责响应的。堵塞让资源消耗更少,逻辑撰写更容易。

queue_brpop/consumer.py

import redis
from datetime import datetime as dt

# 连接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)


def send_mail(email):
    print('[{} {} INFO]: send mail to {}'.format(
        dt.now(),
        'queue:send_mail',
        email
    ))

if __name__ == '__main__':

    while True:
        queue_name, mail = r.brpop('queue:send_mail')
        send_mail(mail)

运行

# 运行queue_brpop/consumer.py
python queue_brpop/consumer.py

# 运行queue_rpop/producer.py

# 显示结果(没有多余的输出,挺好!)
[2017-04-26 19:31:23.947200 queue:send_mail INFO]: send mail to 3330356463@qq.com
[2017-04-26 19:31:23.947700 queue:send_mail INFO]: send mail to zhengtong0898@aliyun.com
优先级(lpush、blpop)

最普遍和常见的新增列表列表元素是在当前列表后面追加元素,取值的话则是从左到右读取,这种行为在消息队列中被称为先进先出(FIFO);那也会有一些场景要求后进先出(LIFO),这就需要利用lpush(从左边开始插入元素)和blpop(从左边开始读取)。

quque_priority/producer.py

import redis

# 连接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)

# 清空所有键
for number, key in enumerate(r.keys()):
    r.delete(key)

# 准备数据
r.lpush('queue:send_mail', '3330356463@qq.com')
r.lpush('queue:send_mail', 'zhengtong0898@aliyun.com')
r.lpush('queue:send_mail', 'zhengtong0898@a.com')
r.lpush('queue:send_mail', 'zhengtong0898@b.com')
r.lpush('queue:send_mail', 'zhengtong0898@c.com')
r.lpush('queue:send_mail', 'zhengtong0898@d.com')
r.lpush('queue:send_mail', 'zhengtong0898@e.com')

queue_priority/consumer.py

import redis
from datetime import datetime as dt

# 连接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)


def send_mail(email):
    print('[{} {} INFO]: send mail to {}'.format(
        dt.now(),
        'queue:send_mail',
        email
    ))

if __name__ == '__main__':

    while True:
        queue_name, mail = r.blpop('queue:send_mail')
        send_mail(mail)

运行

# 运行producer.py,先将数据塞到redis列表中(每次都是从左边插入)
python queue_priority/producer.py

# 运行consumer.py,消费数据(每次都是从左边读取)
python queue_priority/consumer.py

# 显示结果,仔细看就可以看得出来,确实是后进先出(LIFO)
[2017-04-26 20:29:18.735700 queue:send_mail INFO]: send mail to zhengtong0898@e.com
[2017-04-26 20:29:18.735700 queue:send_mail INFO]: send mail to zhengtong0898@d.com
[2017-04-26 20:29:18.736200 queue:send_mail INFO]: send mail to zhengtong0898@c.com
[2017-04-26 20:29:18.736700 queue:send_mail INFO]: send mail to zhengtong0898@b.com
[2017-04-26 20:29:18.736700 queue:send_mail INFO]: send mail to zhengtong0898@a.com
[2017-04-26 20:29:18.737200 queue:send_mail INFO]: send mail to zhengtong0898@aliyun.com
[2017-04-26 20:29:18.737700 queue:send_mail INFO]: send mail to 3330356463@qq.com

 
 

发布/订阅

消息队列是将消息放在列表中,由一个或多个consumer去消费列表中不同的元素;而发布/订阅是将消息放在一个频道中,有一个或多个consumer去消费频道中相同的元素;若该频道中没有任何consumer,这次producer生产了一条消息放入到频道中,这条消息将会自动消失,因此发布/订阅模式要求consumer必须时刻运行,否则producer发送的任何消息都是无效的。

publish_subscribe/producer.py

import redis
from datetime import datetime as dt

# 连接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)

r.publish('channel1.1', 'hi')
r.publish('channel1.1', 'my')
r.publish('channel1.1', 'name')
r.publish('channel1.1', 'is')
r.publish('channel1.1', 'publisher')


publish_subscribe/consumer.py

import redis
import time
from datetime import datetime as dt

# 连接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)
p = r.pubsub()
p.subscribe('channel1.1')    # 支持同时订阅多个频道: p.subscribe('channel1.1', 'channel1.2')

while True:
    msg = p.get_message(ignore_subscribe_messages=True)
    if not msg:
        time.sleep(1)
        print('sleep 1')
        continue
    print('debug: ', msg)


运行

# 打开两个窗口运行两次 consumer.py, 都订阅'channel1.1'频道.
python publish_subscribe/consumer.py # 窗口一
python publish_subscribe/consumer.py # 窗口二

# 运行producer.py, 将消息塞到'channel1.1'频道.
python publish_subscribe/producer.py


# 显示结果(两边都同时存在)
[2017-04-27 13:01:17.712700 publish/subscribe INFO]: None
[2017-04-27 13:01:18.712700 publish/subscribe INFO]: None
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: None
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: {'data': 'hi', 'type': 'message', 'pattern': None, 'channel': 'channel1.1'}
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: {'data': 'my', 'type': 'message', 'pattern': None, 'channel': 'channel1.1'}
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: {'data': 'name', 'type': 'message', 'pattern': None, 'channel': 'channel1.1'}
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: {'data': 'is', 'type': 'message', 'pattern': None, 'channel': 'channel1.1'}
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: {'data': 'publisher', 'type': 'message', 'pattern': None, 'channel': 'channel1.1'}
[2017-04-27 13:01:20.713200 publish/subscribe INFO]: None
[2017-04-27 13:01:21.714200 publish/subscribe INFO]: None

主题订阅

除了订阅多个频道之外,redis也支持consumer通过glob通配符(psubscribe)来订阅不同主题的频道。

publish_psubscribe/producer.py

import redis
from datetime import datetime as dt

# 连接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)

r.publish('channel1.1', 'hi')
r.publish('channel1.2', 'my')
r.publish('channel1.3', 'name')
r.publish('channel1.4', 'is')
r.publish('channel1.5', 'publisher')

publish_psubscribe/consumer.py

import redis
import time
from datetime import datetime as dt

# 连接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)
p = r.pubsub()
p.psubscribe('channel1.*')

while True:
    msg = p.get_message(ignore_subscribe_messages=True)
    if not msg:
        time.sleep(1)
        msg = None

    print('[{} {} INFO]: {}'.format(
        dt.now(),
        'publish/psubscribe',
        msg
    ))


运行

# 运行consumer.py
python publish_psubscribe/consumer.py

# 运行producer.py
publish_psubscribe/producer.py

# 显示结果
[2017-04-27 13:31:00.683700 publish/psubscribe INFO]: None
[2017-04-27 13:31:01.683700 publish/psubscribe INFO]: None
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: None
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: {'data': 'hi', 'type': 'pmessage', 'pattern': 'channel1.*', 'channel': 'channel1.1'}
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: {'data': 'my', 'type': 'pmessage', 'pattern': 'channel1.*', 'channel': 'channel1.2'}
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: {'data': 'name', 'type': 'pmessage', 'pattern': 'channel1.*', 'channel': 'channel1.3'}
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: {'data': 'is', 'type': 'pmessage', 'pattern': 'channel1.*', 'channel': 'channel1.4'}
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: {'data': 'publisher', 'type': 'pmessage', 'pattern': 'channel1.*', 'channel': 'channel1.5'}
[2017-04-27 13:31:03.683700 publish/psubscribe INFO]: None
[2017-04-27 13:31:04.683700 publish/psubscribe INFO]: None

参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容