Python并发之异步I/O(async,await)
背景
Python有很长一段的异步编程历史,特别是twisted
,gevent
和一些无堆栈的Python项目。
异步编程因为一些好的原因在这些年来越来越受关注。尽管相对于传统的线性风格更难一点,但是却是值得的:因为异步编程有更高的效率。
举个例子:在Http请求方面,Python异步协程可以提交请求然后去做队列中其他等待的任务,让它慢慢请求,而不是传统的一直等它请求到完成为止,这样的话会浪费更多的时间与资源。总之异步编程能让你的代码在处于等待资源状态时处理其他任务。
在Python3.4中,asyncio
产生了。而在Python3.5中,有加入了对async def
和await
新的语法支持,让我们看一看它们怎么工作的。
协程
在Python中,一个异步的函数我们通常叫它协程。之前我们在讲解yield
的时候也已经讲过yield
语法在协程中的基本使用了,这次同样是协程,但却是不同的语法。
#在Python 3.4中, 创建一个协程我们用asyncio.coroutine装饰器:
async def double(x):
return x * 2
# 这是个协程对象
>>> double(6)
>>><coroutine object double at 0x115b59d58>
# 既然是协程,我们像之前yield协程那样,预激活一下(注意这里用next(double(6)预激活会报错)
>>> double(6).send(None)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration: 12
# 好像差不多。
或者是这样的:
async def ping_server(ip):
# ping code here...
# 用await语法代替"yield from"
async def ping_local():
return await ping_server('192.168.1.1')
建议使用Python3.5最新主流语法:async def
,await
asyncio的几个重要结构
我这里都用英文展示,原生,易理解。怎么翻译都绕口。
event loop:
An event loop essentially manages and distributes the execution of different tasks. It registers them and handles distributing the flow of control between them.
Coroutines:
Coroutines are special functions that work similarly Python generators that on await they release the flow of control back to the event loop. A coroutine needs to be scheduled to run using the event loop, to do this we create a Task, which is a type of Future.
Futures:
Futures are objects that represent the result of a task that may or may not have been executed. This result may be an exception.
理解基本架构很重要,理解协程,future,task的概念重中之重,至于什么内部实现,真的挺复杂的,无需关心。
在这边的时候我卡了很长时间:future
与Task
的区别是什么????
future在多线程说过,future表示终将发生的事情,而确定某件事会发生的唯一方式是执行的时间已经排定。(你不排定它,它就是个协程),那怎么排定?
BaseEventLoop.create_task(...)
或者 asyncio.ensure_future
方法接收一个协程,排定它的运行时间,然后返回一个asyncio.Task
实例——也是 asyncio.Future 类的实例,因为 Task 是Future 的子类,用于包装协程。这与调用 Executor.submit(...) 方法创建Future
实例是一个道理
这句话我读了好多遍,意思是不是说future跟task是同一样东西。对于event loop来说,一个包装了协程的future,就是循环中的一个task?我是这么理解的。
我们无法确定future啥时完成结束,但是总归结束(无论报错还是返回值)的,因为我们已经给它排定了时间。
例子1
少废话,直接看例子。
import asyncio
import time
start = time.time()
def tic():
return 'at %1.1f seconds' % (time.time() - start)
async def gr1():
# Busy waits for a second, but we don't want to stick around...
print('gr1 started work: {}'.format(tic()))
# 暂停两秒,但不阻塞时间循环,下同
await asyncio.sleep(2)
print('gr1 ended work: {}'.format(tic()))
async def gr2():
# Busy waits for a second, but we don't want to stick around...
print('gr2 started work: {}'.format(tic()))
await asyncio.sleep(2)
print('gr2 Ended work: {}'.format(tic()))
async def gr3():
print("Let's do some stuff while the coroutines are blocked, {}".format(tic()))
await asyncio.sleep(1)
print("Done!")
# 事件循环
ioloop = asyncio.get_event_loop()
# tasks中也可以使用asyncio.ensure_future(gr1())..
tasks = [
ioloop.create_task(gr1()),
ioloop.create_task(gr2()),
ioloop.create_task(gr3())
]
ioloop.run_until_complete(asyncio.wait(tasks))
ioloop.close()
output:
gr1 started work: at 0.0 seconds
gr2 started work: at 0.0 seconds
Let's do some stuff while the coroutines are blocked, at 0.0 seconds
Done!
gr2 Ended work: at 2.0 seconds
gr1 ended work: at 2.0 seconds
asyncio.wait(...)
协程的参数是一个由future或协程构成的可迭代对象;wait 会分别
把各个协程包装进一个 Task 对象。最终的结果是,wait 处理的所有对象都通过某种方式变成 Future 类的实例。wait 是协程函数,因此返回的是一个协程或生成器对象。ioloop.run_until_complete
方法的参数是一个future或协程。如果是协程,run_until_complete
方法与 wait 函数一样,把协程包装进一个 Task 对象中。在 asyncio 包中,future和协程关系紧密,因为可以使用 yield from 从asyncio.Future 对象中产出结果。这意味着,如果 foo 是协程函数(调用后返回协程对象),抑或是返回Future 或 Task 实例的普通函数,那么可以这样写:res = yield from foo()。这是 asyncio 包的 API 中很多地方可以互换协程与期物的原因之一。 例如上面的例子中tasks可以改写成协程列表:
tasks = [gr1(), gr(2), gr(3)]
详细的各个类说明,类方法,传参,以及方法返回的是什么类型都可以在官方文档上仔细研读,多读几遍,方有体会。
好好体会event loop中运作图.....如下所示:
例子2
例子1只是用来讲述语法,实际用途中还是得看这个例子。上节多线程与多进程的例子,获取有效网址。
import asyncio
import time
import aiohttp
import async_timeout
msg = "http://www.nationalgeographic.com.cn/photography/photo_of_the_day/{}.html"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6'
}
urls = [msg.format(i) for i in range(4500, 5057)]
async def fetch(session, url):
with async_timeout.timeout(10):
async with session.get(url) as response:
return response.status
async def main(url):
async with aiohttp.ClientSession() as session:
status = await fetch(session, url)
return status
if __name__ == '__main__':
start = time.time()
loop = asyncio.get_event_loop()
tasks = [main(url) for url in urls]
# 返回一个列表,内容为各个tasks的返回值
status_list = loop.run_until_complete(asyncio.gather(*tasks))
print(len([status for status in status_list if status==200]))
end = time.time()
print("cost time:", end - start)
任重而道远
- 在封闭的代码块中使用一些新的关键字就能实现异步功能
- 我对于这一块还是处于小白状态,掌握不是很全面
- 多Google,多Google,多Google.....
参考资料
http://stackabuse.com/python-async-await-tutorial/
https://hackernoon.com/asyncio-for-the-working-python-developer-5c468e6e2e8e
https://docs.python.org/3/library/asyncio-task.html
http://quietlyamused.org/blog/2015/10/02/async-python/