Python 基于协程的异步 IO

转载请注明出处


异步 IO

参考链接
我们使用的是 Python 3 内置的模块 asyncio


概念说明

  • event_loop 事件循环:程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数

  • coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
    例如:

    # 这样就是注册了一个协程
    async def test1():
        pass
        return 'ok'
    
    coroutine = test1()
    
  • task 任务:一个协程对象就是一个原生可以挂起的函数,task 则是对协程进一步封装,其中包含了任务的各种状态

  • future: 代表将来执行或没有执行的任务的结果。它和 task 上没有本质上的区别

  • async/await 关键字:Python3.5 用于定义协程的关键字,async 定义一个协程,await 用于挂起阻塞的异步调用接口。


定义一个简单的协程

import asyncio

# 这里定义了一个协程函数
async def test():
    pass
    return 'ok'

# 这里是提前写好的回调函数
def test(future):
    print(future)


# 这里是返回了一个协程对象
coroutine = test()

# 把协程对象封装成 task 
task = asyncio.ensure_future(coroutine)

# 创建消息循环
loop = asyncio.get_event_loop()

# 设置回调函数,我们通过回调函数,当任务执行完毕,就会把结果传到回调函数中
task.add_done_callback(test)

# 把 task 加入到消息队列中去运行,这一步一定要到最后
# 这是一个阻塞式的操作
loop.run_until_complete(task)

await 关键字

await就类似于 yield from

使用 async 可以定义协程对象,使用 await 可以针对耗时的操作进行挂起,就像生成器里的 yield 一样,函数让出控制权。协程遇到 await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行 。

这里我们用内置的 asyncio.sleep(1) ,模拟网络请求

import asyncio


async def test():
    # 这里就是使用 await 关键字,遇到这个,事件循环就会挂起该协程,然后执行其他协程。
    # 直到其他的协程也挂起或者执行完毕,才继续执行下面的代码,异步执行的,就是说总时间不是叠加,而是算最长的
    await asyncio.sleep(1)
    return 'ok'


def a(future):
    print(future.result())


coroutine = test()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
task.add_done_callback(a)
loop.run_until_complete(task)

并发操作(多个协程)

import asyncio
import time

start = time.clock()


async def test(a):
    await asyncio.sleep(a)
    return 'ok'


coroutine1 = test(1)
coroutine2 = test(1)
coroutine3 = test(1)
coroutine4 = test(1)

# 这里就是多个任务。
tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3),
    asyncio.ensure_future(coroutine4),
]

loop = asyncio.get_event_loop()
# loop.run_until_complete(asyncio.gather(*tasks)) 
# 两个方式都可以,实现并发的时候,要把协程像下面一样并列的执行,
# 如果有协程嵌套,那么子协程如果要并发,可以使用 await asyncio.gather(*tasks)
results = loop.run_until_complete(asyncio.wait(tasks))
end = time.clock()
print(end - start)

# 可以通过这个来获取每一个,协程的返回值,Python 官方不推荐这样做,不推荐使用返回值,而是直接在协程中完成任务
for item in results[0]:
    print(item.result())


run_until_complete 和 run_forever

我们一直通过 run_until_complete 来运行 loop ,等到 future 完成,run_until_complete 也就返回了。

import asyncio


async def do_some(n):
    print('等待1秒')
    await asyncio.sleep(n)
    print('完成')
    return 'ok'


loop = asyncio.get_event_loop()

coroutine = do_some(1)

task = asyncio.ensure_future(coroutine)

# 等所有的 task 都执行完了,这个函数就会结束,程序退出。
res = loop.run_until_complete(task)
print(res.result())

现在改用 run_forever

import asyncio

async def do_some(n):
    print('等待1秒')
    await asyncio.sleep(1)
    print('完成')


coro = do_some(1)
asyncio.ensure_future(coro)
loop = asyncio.get_event_loop()
# 一秒后结果输出,但是,线程一直被阻塞,程序没有退出。
loop.run_forever()

使用run_forever(),future 结束,但是程序并不会退出。run_forever会一直运行,知道 loop.stop()被调用,但是你不能像下面这样调用

loop.run_forever()
loop.stop()

run_forever()不返回,stop永远也不会被调用,所以我们只能在协程中调用 stop,如下

async def do_some(loop):
    print('Waiting 3 second')
    await asyncio.sleep(3)
    print('Done')
    loop.stop()

这样并非没有问题,假设有多个协程在 loop 里面运行

import asyncio

async def do_some(loop, n):
    print('等待{}秒'.format(n))
    await asyncio.sleep(n)
    print('{} 完成'.format(n))
    
    # 第二个协程还没有结束,loop 就停止了,被先完成的那个协程给 stop 
    loop.stop()


loop = asyncio.get_event_loop()
coro1 = do_some(loop, 1)
coro2 = do_some(loop, 2)
asyncio.ensure_future(coro1)
asyncio.ensure_future(coro2)

loop.run_forever()

要解决这个问题,那我们就要使用 gather把多个协程合并为一个 future ,并添加回调。然后在回调中停止。

asyncio.gather 和 asyncio.ensure_future 可以把新的任务加入到事件循环当中

import asyncio
import functools


async def do_some(n):
    print('等待{}秒'.format(n))
    await asyncio.sleep(n)
    print('{} 完成'.format(n))

# 这个 future 是我们执行后的结果集
def done_callback(loop, future):
    loop.stop()
    pass


loop = asyncio.get_event_loop()
futures = asyncio.gather(do_some(1), do_some(3))
# 这里式使用了偏函数,当然我们也可以使用闭包
futures.add_done_callback(functools.partial(done_callback, loop))
loop.run_forever()

--------------------------------------------
# 使用闭包

def done_callback(loop):
    def inner(future):
        loop.stop()
    return inner

loop = asyncio.get_event_loop()
futures = asyncio.gather(do_some(1), do_some(3))
futures.add_done_callback(done_callback(loop))
loop.run_forever()
---------------------------------------------

这样就可以啦,其实我们这里就是手动的实现了 run_until_complete,而且 run_until_complete内部也是调用了run_forever


Close Loop ?

以上示例都没有调用 loop.close,好像也没有什么问题。所以到底要不要调 loop.close 呢? 简单来说,loop 只要不关闭,就还可以再运行。:

loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()

但是如果关闭了,就不能再运行了:

loop.run_until_complete(do_some_work(loop, 1))
loop.close()
# 此处异常
loop.run_until_complete(do_some_work(loop, 3))  
建议调用 loop.close,以彻底清理 loop 对象防止误用。

生产者消费者

import asyncio
from asyncio import Queue


async def producer(queue):
    print('开始生成数据')
    for i in range(100):
        await queue.put(i)
        print('已经生产数据 {}'.format(i))

        if queue.qsize() >= 10:
            await queue.join()


async def comsumer(queue):
    print('开始消费数据')
    while True:
        item = await queue.get()
        print('消费了数据 {}'.format(item))
        queue.task_done()

loop = asyncio.get_event_loop()
queue = Queue()
loop.run_until_complete(asyncio.gather(producer(queue), comsumer(queue)))

协程 API

loop = asyncio.get_event_loop()  # 获取事件循环
loop = run_until_complete()  # 开启循环
asyncio.ensure_future()  # 把协程加入到循环,直到调用 loop.run_forever() 才会执行
asyncio.gather()  # 把更多的协程加入到循环
asyncio.Semaphore(5) # 限制同时运行的协程数量

Semaphore 例子

import asyncio
import aiohttp


# Semaphore 只要放在协程里面就可以,用 async with 修饰,windows 限制了 Semap 的数量,Linux 没有限制。

class AsyncRequest(object):
    sem = asyncio.Semaphore(500)
    @classmethod
    async def request(cls, url=None, method='GET', cookies=None, params=None, data=None, headers=None, encode='utf8'):
        async with cls.sem:
            async with aiohttp.ClientSession(cookies=cookies) as session:
                if method == 'GET':
                    async with session.get(url, params=params, headers=headers) as response:
                        return {
                            'text': await response.text(encoding=encode),
                            'url': response.url,
                            'headers': response.headers,
                            'read': await response.read(),
                            'status_code': response.status,
                            'cookies': response.cookies
                        }
                elif method == 'POST':
                    async with session.post(url, data=data, headers=headers) as response:
                        return {
                            'text': await response.text(encoding=encode),
                            'url': response.url,
                            'headers': response.headers,
                            'read': await response.read(),
                            'status_code': response.status,
                            'cookies': response.cookies
                        }

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,049评论 5 473
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,478评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,109评论 0 333
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,097评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,115评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,280评论 1 279
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,748评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,398评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,553评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,440评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,487评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,176评论 3 317
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,750评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,821评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,049评论 1 257
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,559评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,150评论 2 341

推荐阅读更多精彩内容