Python多线程(四):生产者消费者问题

上一篇:

生产者消费者问题是多线程中一个很经典并发协作的问题,这个问题主要包含两类线程,一个是生产者用于生产数据,另一个是消费者用于消费数据,两者操作同一个数据共享区域,这种模型在编程中非常常见,比如爬虫,生产者负责爬取链接,消费者负责解析链接所指向的网页内容。这种模型需要满足下面的两个特征:

  • 消费者在数据共享区域为空时阻塞,直到共享区域出现新数据。
  • 生产者在数据共享区域满时阻塞,直到数据共享区出现空位。

下面是一个简单的例子:

import threading
import time
import random
MAX_BUFF_LEN = 5

buff = []
lock = threading.Lock()

class Producer(threading.Thread):
    def run(self):
        global buff
        while True:
            lock.acquire()
            if len(buff) < MAX_BUFF_LEN:
                # 如果共享区域未满,生产数据
                num = random.uniform(0, 5)
                buff.append(num)
                print('生产者向共享区域加入%f' % num)
                lock.release()
            time.sleep(random.uniform(0, 10))

class Consumer(threading.Thread):
    def run(self):
        global buff
        while True:
            lock.acquire()
            if buff:
                # 如果共享区非空,消费数据
                num = buff.pop(0)
                print('消费者消费掉%f' %num)
                lock.release()
            time.sleep(random.uniform(0, 10))

producer = Producer()
consumer = Consumer()
producer.setDaemon(True)
consumer.setDaemon(True)
try:
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
except KeyboardInterrupt:
    print('程序强制结束!')

程序运行结果如下:

生产者向共享区域加入1.653411
消费者消费掉1.653411
生产者向共享区域加入2.176285
生产者向共享区域加入4.727504
生产者向共享区域加入3.053323
消费者消费掉2.176285
生产者向共享区域加入0.951072
消费者消费掉4.727504
^C程序强制结束!

在程序中设置两个进程为守护进程,并捕捉KeyboardInterrupt错误,一旦捕捉到就结束主线程,同时结束两个子线程。上面是一个生产者消费者模型的一个简单实现,通过共享变量的方式使两个线程互相通信来达成一致。共享变量是线程间通信的常用方法,只要记得在对共享变量进行操作时加锁,程序就不会有问题。

但是上面的代码也有问题,在于这种代码通过无限对共享变量访问的方式进行判断空还是满,这样也降低了效率。因为其中一个程序在明明知道buff满了或者空了的情况下还要进行无意义的循环,由于GIL机制,它会和其他线程争夺执行权。如果某一方在判断buff满了或者空了的情况下主动阻塞,直到另外一方通知它,它才恢复,这样就能最大化的效率。

Python中threading中的Condition类就是来帮助我们完成这件事的。它的waitnotify方法能够阻塞和通知一个线程,下面还是通过例子来了解一下:

import threading
import time
import random
MAX_BUFF_LEN = 5

buff = []
condition = threading.Condition()

class Producer(threading.Thread):
    def run(self):
        global buff
        while True:
            condition.acquire()
            if len(buff) < MAX_BUFF_LEN:
                # 如果共享区域未满,生产数据
                num = random.uniform(0, 5)
                buff.append(num)
                print('生产者向共享区域加入%f' % num)
                condition.notify()
            else:
                # 如果共享区满,停止生产
                print('共享区满,生产者阻塞!')
                condition.wait()
            condition.release()
            time.sleep(random.uniform(0, 10))

class Consumer(threading.Thread):
    def run(self):
        global buff
        while True:
            condition.acquire()
            if buff:
                # 如果共享区非空,消费数据
                num = buff.pop(0)
                print('消费者消费掉%f' %num)
                condition.notify()
            else:
                # 如果共享去空,停止消费
                print('共享区空,消费者阻塞!')
                condition.wait()
            condition.release()
            time.sleep(random.uniform(0, 10))

producer = Producer()
consumer = Consumer()
producer.setDaemon(True)
consumer.setDaemon(True)
try:
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
except KeyboardInterrupt:
    print('程序强制结束!')

程序结果:

生产者向共享区域加入0.040350
消费者消费掉0.040350
共享区空,消费者阻塞!
生产者向共享区域加入3.266167
消费者消费掉3.266167
生产者向共享区域加入3.468917
^C程序强制结束!

上面的代码中,acquire方法实际上是获得锁,wait方法将线程阻塞,实际上是将锁释放。当一个线程调用notify方法时,另一个线程就被唤醒,但是这时候这个线程并没有调用wait或者release方法释放锁,因此另一个线程虽然醒过来了但是还是没有执行,直到这个线程将锁释放。

在使用共享变量的时候,需要时刻注意是否线程安全,非常不方便。好在是Python中提供了一个Queue类,它是线程安全的,有了它我们可以把注意力放在如何实现代码逻辑上,而不是过多的注意到线程安全上。在Python2.7中该模块名为Queue,而在Python3.6中该模块名为queue。使用Queue类改进的代码如下:

import threading
import time
import random
from queue import Queue

MAX_BUFF_LEN = 5

buff = Queue(MAX_BUFF_LEN)
condition = threading.Condition()

class Producer(threading.Thread):
    def run(self):
        global buff
        while True:
            num = random.uniform(0, 5)
            buff.put(num)
            print('生产者向共享区域加入%f' % num)
            time.sleep(random.uniform(0, 10))

class Consumer(threading.Thread):
    def run(self):
        global buff
        while True:
            num = buff.get()
            print('消费者消费掉%f' %num)
            time.sleep(random.uniform(0, 10))

producer = Producer()
consumer = Consumer()
producer.setDaemon(True)
consumer.setDaemon(True)
try:
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
except KeyboardInterrupt:
    print('程序强制结束!')

Queue是一个FIFO队列,它的get方法和put方法分别是入队和出队,在入队和出队时获取了锁以保证线程安全,如果队列空或者满,默认情况下get方法和put方法自动阻塞。阻塞和唤醒的方式实质上是调用了Condition类的waitnotify方法。Queue类比较简单,推荐大家直接查看源码或者官方文档。

这里还有一篇写得非常好的博客,推荐大家去看看:Producer-consumer problem in Python

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,045评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,114评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,120评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,902评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,828评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,132评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,590评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,258评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,408评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,335评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,385评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,068评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,660评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,747评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,967评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,406评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,970评论 2 341

推荐阅读更多精彩内容

  • 进程和线程 进程 所有运行中的任务通常对应一个进程,当一个程序进入内存运行时,即变成一个进程.进程是处于运行过程中...
    胜浩_ae28阅读 5,084评论 0 23
  •   一个任务通常就是一个程序,每个运行中的程序就是一个进程。当一个程序运行时,内部可能包含了多个顺序执行流,每个顺...
    OmaiMoon阅读 1,661评论 0 12
  • 进程和线程 进程 所有运行中的任务通常对应一个进程,当一个程序进入内存运行时,即变成一个进程.进程是处于运行过程中...
    小徐andorid阅读 2,796评论 3 53
  • 本镜像采用官方原版app制作,集成Clover 4391,支持UEFI启动安装;如果卡+++请替换Drivers6...
    daliansky阅读 12,273评论 2 0
  • 大家好,我是侯俊玲,说实话看到这个主题时,我想了很久,我的梦想是什么?我好像是一只温水里的青蛙,跳不出这个圈子,消...
    侯俊玲阅读 419评论 0 0