简介
asyncio是python3.4之后的协程模块,是python实现并发重要的包,这个包使用时间循环驱动实现并发。
- event_loop:时间循环,开启之后,可以将协程注册进来。
- task:一个协程对象就是一个可以挂起的函数,任务是对协程的进一步封装,其中包含了任务的各种状态
- future: 期物,代表将来执行或没有执行的任务的结果。task可以说是future的子类。
asyncio
先通过一个简单的例子看asyncio的基本用法与作用:
import asyncio
import itertools
import sys
import time
@asyncio.coroutine
def spin():
for i in itertools.cycle('|/-\\'):
write, flush = sys.stdout.write, sys.stdout.flush
write(i)
flush()
write('\x08'*len(i))
try:
yield from asyncio.sleep(1)
except asyncio.CancelledError:
break
@asyncio.coroutine
def slow_f():
yield from asyncio.sleep(3)
return 3
@asyncio.coroutine
def sup():
spiner = asyncio.async(spin())
print("spiner:",spiner)
r = yield from slow_f()
spiner.cancel()
return r
def main():
loop = asyncio.get_event_loop()
r = loop.run_until_complete(sup())
loop.close()
print("r:",r)
main()
输出结果:
spiner: <Task pending coro=<spin() running at c:/Users/DELL/Desktop/ssj/search/descrip.py:7>>
r: 3 # 运行期间会有动画指针
- 协程可以使用@asyncio.coroutine装饰器装饰,asyncio.sleep可以避免时间循环阻塞。
- asyncio.async包装的协程,不阻塞,立即返回一个Task对象
- slow_f三秒后,控制器返回到sup上,spiner.cancel()取消Task,任务结束。
- get_event_loop获取时间循环
- run_until_complete在时间循环中,载入任务,驱动协程运行完毕。
asyncio.Task 是asyncio.Future的子类,用于包装协程。asyncio.Future 类的 .result() 方法没有参数,因此不能指定超时时间。此外,如果调用 .result() 方法时期物还没运行完毕,那么 .result() 方法不会阻塞去等待结果,而是抛出 asyncio.InvalidStateError 异常。获取asyncio.Future 对象的结果通常使用 yield from,从中产出结果。使用 yield from 处理期物,等待期物运行完毕这一步无需我们关心,而且不会阻塞事件循环,因为在 asyncio 包中,yield from 的作用是把控制权还给事件循环。
在 asyncio 包中,期物和协程关系紧密,因为可以使用 yield from 从asyncio.Future 对象中产出结果。如,若foo是协程函数,那么可以这样写:res = yield from foo()。
python3.5之后,协程采用新语法,采用了关键字async/await:
async def hello():
print("Hello world!")
r = await asyncio.sleep(1)
print("Hello again!")
把@asyncio.coroutine替换为async;把yield from替换为await。
- await
- result = await future 或者 result = yield from future:阻塞直到future完成,然后会返回future的结果,或者取消future的话抛出异常CancelledError
- result = await coroutine 或者 result = yield from coroutine:直到协程产生结果或者抛出异常才往下执行。
调用一个协程,不会执行这个协程的代码,会立即返回,当await coroutine 或者 yield from coroutine才会执行代码。或者通过时间循环调度,
ensure_future 和 create_task都可以包装协程,返回一个task对象
- ensure_future(coro_or_future, *, loop=None) 返回task对象,若参数是future,直接返回
- create_task返回task对象。
- run_until_complete
import asyncio
import datetime
async def display_date(loop):
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)
loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
print("stop")
loop.close()
输出结果:
2019-08-25 15:56:04.353596
2019-08-25 15:56:05.354924
2019-08-25 15:56:06.362594
2019-08-25 15:56:07.362925
2019-08-25 15:56:08.363157
stop
display_date(loop)协程运行完毕,run_until_complete才会返回,否则阻塞。与之相似功能的call_soon却是不阻塞的。
- call_soon
import asyncio
def hello_world(loop):
print('Hello World')
loop.stop()
loop = asyncio.get_event_loop()
# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)
# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()
call_soon(callback),会把callback放到一个先进先出的队列,每个callback会被执行一次。
call_soon注册的协程任务之后,立即返回,不阻塞,配合run_forever使用,run_forever会一直循环,直到loop.stop()。
-
call_soon_threadsafe
类似于call_soon,但是是线程安全的。涉及到多线程时会使用到。 -
call_later
call_later(delay, callback, *args) 延时delay后执行callback,返回一个asyncio.Handle对象,可以通过cancel取消。类似于这个方法的还有call_at等
import asyncio
import datetime
def display_date(end_time, loop):
print(datetime.datetime.now())
if (loop.time() + 1.0) < end_time:
h = loop.call_later(1, display_date, end_time, loop)
if (loop.time() + 3.0) > end_time:
h.cancel()
loop.stop()
else:
loop.stop()
loop = asyncio.get_event_loop()
# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)
# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()
输出结果:
2019-08-25 16:33:03.879382
2019-08-25 16:33:04.882574
2019-08-25 16:33:05.886996
- add_signal_handler
另外当使用run_forever时,可以通过信号终止
import asyncio
import functools
import os
import signal
def ask_exit(signame):
print("got signal %s: exit" % signame)
loop.stop()
loop = asyncio.get_event_loop()
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame),
functools.partial(ask_exit, signame))
print("Event loop running forever, press Ctrl+C to interrupt.")
print("pid %s: send SIGINT or SIGTERM to exit." % os.getpid())
try:
loop.run_forever()
finally:
loop.close()
- 链式协程
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
print_sum要等到compute完成之后才会继续执行。
下面是执行链:
event_loop事件循环运行之后,将task注册进来, await compute(x, y)后,控制权走到compute里,遇到 await asyncio.sleep(1.0)后,控制权返回给loop,loop判断情况后,等待1s,控制权给compute,compute完成后,控制权给print_sum。