其实
asyncio
的学习一点也不快乐
一、python 的多线程和多进程
要想理解 asyncio
的异步编程,需要简单了解一下 python
的多线程和多进程知识
-
1、多线程
python
有GIL
机制,因此,python
的多线程虽然是操作系统的原生线程,但无法完成真正的并行运行,而仅仅在线程处于睡眠或者等待I/O
时,才会发挥真正的多线程作用。
-
1.1、睡眠
time.sleep()
threading.Lock
- 线程模块其他同步对象
-
1.2、I/O
aiohttp
open
-
1.3、释放
GIL
- 所谓释放
GIL
是指当前线程执行一定长度字节码或者一段时间后,释放GIL
,由系统将GIL
分配给其他线程,当前线程进入等待状态 -
py2
解释器每执行1000
字节码释放GIL
-
py3
解释器每执行15ms
释放GIL
- 所谓释放
-
1.4、
GIL
全局解释器锁- 同一进程同一时间只有一个线程在执行字节码,但睡眠线程或者
I/O
操作相关线程不受GIL
锁限制,允许并发执行。(GIL
保证同一时刻只有一个线程对共享资源进行存取,省去线程间资源锁的开销)
- 同一进程同一时间只有一个线程在执行字节码,但睡眠线程或者
-
1.5、
GIL
原理/* s.connect((host, port)) method */ static PyObject * sock_connect(PySocketSockObject *s, PyObject *addro) { sock_addr_t addrbuf; int addrlen; int res; /* convert (host, port) tuple to C address */ getsockaddrarg(s, addro, SAS2SA(&addrbuf), &addrlen); Py_BEGIN_ALLOW_THREADS res = connect(s->sock_fd, addr, addrlen); Py_END_ALLOW_THREADS /* error handling and so on .... */ }
-
Py_BEGIN_ALLOW_THREADS
放弃GIL
-
Py_END_ALLOW_THREADS
重新获取GIL
,一个线程会在这个位置阻塞,等待另一个线程释放锁;一旦出现这个情况,等待的线程会抢夺回锁,并恢复字节码的执行 - 简而言之:允许有N个线程在网络
I/O
堵塞,或等待重新获取GIL
,但只有一个线程运行字节码
-
-
1.6、示例
- 睡眠阻塞
import time from threading import Thread from datetime import datetime def write(i): print('{} start write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), i)) time.sleep(4) print('{} end write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), i)) def fun(): print('start ...') for i in range(3): Thread(target=write, args=(i,), daemon=False).start() print('end ...') # 输出结果 start ... 2018-02-09 23:58:25 start write --> 0 2018-02-09 23:58:25 start write --> 1 2018-02-09 23:58:25 start write --> 2 end ... 2018-02-09 23:58:29 end write --> 0 2018-02-09 23:58:29 end write --> 1 2018-02-09 23:58:29 end write --> 2
- CPU 阻塞
import time from threading import Thread from datetime import datetime def write(n): print('{} start write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), n)) l, sum_ = list(range(100000000)), 0 for i in l: sum_ += i print('{} end write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), n)) def fun(): print('start ...') for i in range(3): Thread(target=write, args=(i,), daemon=False).start() print('end ...') # 输出结果 start ... 2018-02-10 00:13:55 start write --> 0 2018-02-10 00:13:58 start write --> 1 2018-02-10 00:14:02 start write --> 2 end ... 2018-02-10 00:14:27 end write --> 0 2018-02-10 00:14:32 end write --> 1 2018-02-10 00:14:35 end write --> 2
- 总结
- 对于睡眠操作或者
I/O
操作,多线程的作用非常明显,明显减少所消耗总时间; - 对于
CPU
计算型操作,多线程操作反而因为多线程间获取GIL
而增加总的消耗时间。
- 对于睡眠操作或者
-
2、
python
多进程
python
多进程即其他语言中的多进程概念,不再累述
二、异步编程思想
- 1、协程
coroutine
- 2、任务
Task
- 3、事件循环
loop
1、Task 对象主要包含 协程(coro)和轮询对象(loop)2个属性;
2、Loop 对象使用队列和堆数据结构存放Handle对象(绑定了回调函数,如:task的_step方法等)。队列中存放的是可以立即执行的任务,堆中存放的是一定时间后要执行的任务。对于yield from asyncio.sleep() 的任务则是添加到堆中,到达指定时间后执行。
# 简单的调用示例
import asyncio
@asyncio.coroutine
def coro_fun():
yield from range(10)
loop = asyncio.get_event_loop()
loop.run_until_complete(coro_fun())
# or
tasks = [asyncio.ensure_future(coro_fun())]
loop.run_until_complete(asyncio.wait(tasks))
三、什么是协程
进程或线程间的创建依赖于系统底层进程或线程库,其运行也依赖于系统的任务调度系统,在任务切换时,
cpu
需要进行上下文切换。
协程是运行在单线程上,协程间的切换是在语言层级实现的,依赖于对应协程库。
- 在单线程执行过程中,如果涉及
sleep
,网络IO
操作时,线程会阻塞住等待任务完成; - 但如果使用协程,轮询对象(
loop
)在轮询事件时,会分别处理就绪对象_ready
和调度对象_scheduled
以及select
监听对象。每次进行轮询时,会筛选出调度对象中满足执行条件的对象以及select
监听到可读或可写的对象,添加到就绪对象中,由loop
对象进行循环调度。_ready
-
_ready
+= 满足执行条件的_scheduled
对象 -
_ready
+=select
监听到的可读或可写对象 -
loop
遍历执行_ready
中的对象
四、什么是期物
期物对象的设计初衷是,期物用来追踪任务或者协程的运行状态。一般使用中,期物用来追踪
_ready
,_scheduled
,select
监听的对象,在各对象执行完成后设置期物对象状态为FINISHED
,并将设置_loop
轮询对象状态为close
的函数注册到loop
对象的_ready
队列中,由loop
对象轮询完成。
五、源代码分析
- 关于 _ code _.co_flags
# 每个函数或方法都有 __code__ 魔法方法 以及其对应的 co_flags 值 # 在 Cpython 中, 1、生成器函数的标识符为 CO_GENERATOR 即 0x20, 2、协程函数的标识符为 CO_COROUTINE 即 0x180 3、CO_ITERABLE_COROUTINE 即 0x100 # 通过对函数对象的 __code__.co_flags 与 对应的标识符做位与运算,如果是真值,则表明函数对象属于生成器函数或协程函数 def gen_fun(): yield from range(10) >>> gen_fun.__code__.co_flags # 99 >>> 99 & 0x20 # 32, True >>> 99 & 0x180 # 0, False async def asy_fun(): await sleep(4) >>> asy_fun.__code__.co_flags # 227 >>> 99 & 0x20 # 32, True >>> 227 & 0x180 # 128, True
- 关于类型判断
from collections import Iterator, Awaitable # 判断迭代器 和 Awaitable 对象 class A: def __iter__(self): return iter([1,2,3,4,5]) def __await__(self): return iter([1,2,3,4,5]) a = A() >>> isinstance(a, Iterator) # True >>> isinstance(a, Awaitable) # True # 判断是否为协程等 import inspect async def asy_fun(): await a >>> inspect.iscoroutine(asy_fun()) # True
- @asyncio.coroutine
def coroutine(func): # 将一个生成器标记为协程,如果在destroyed前没有调用,则会记录错误 # 这个方法是使用 inspect.iscoroutinefunction 方法判断是否为协程方法,使用 types.coroutine 装饰的生成器,或 async def 语法定义的函数都会返回 True if _inspect_iscoroutinefunction(func): return func # 使用 co_flags 判断是否为生成器 if inspect.isgeneratorfunction(func): coro = func else: @functools.wraps(func) def coro(*args, **kw): res = func(*args, **kw) # 判断 res 是否为期物,生成器 或 协程包装类 实例 if isinstance(res, futures.Future) or inspect.isgenerator(res) or \ isinstance(res, CoroWrapper): res = yield from res elif _AwaitableABC is not None: # py 3.5 才会有 Awaitable 类 try: # 如果有 __await__属性,__await__属性只会返回一个不是协程的迭代器 await_meth = res.__await__ except AttributeError: pass else: # 如果是 Awaitable 对象 if isinstance(res, _AwaitableABC): # 使用 yield from 处理其迭代器 res = yield from await_meth() return res # 使用 types.coroutine 包装 coro(注意,多层 @types.coroutine 装饰不会影响,会直接return装饰的值) if not _DEBUG: if _types_coroutine is None: wrapper = coro else: wrapper = _types_coroutine(coro) else: @functools.wraps(func) def wrapper(*args, **kwds): # 使用协程包装器处理 w = CoroWrapper(coro(*args, **kwds), func=func) if w._source_traceback: del w._source_traceback[-1] # 如果是 py 3.5 则包装增加 协程 对象的属性,否则包装为 生成器 对象的属性 w.__name__ = getattr(func, '__name__', None) w.__qualname__ = getattr(func, '__qualname__', None) return w # 用以别处使用 asyncio.iscoroutinefunction() 判断为 True 的作用 wrapper._is_coroutine = True # For iscoroutinefunction(). return wrapper
- @types.coroutine
def coroutine(func): # 将一个普通的生成器函数转化为协程 if not callable(func): raise TypeError('types.coroutine() expects a callable') if (func.__class__ is FunctionType and getattr(func, '__code__', None).__class__ is CodeType): # 获取函数的 co_flags co_flags = func.__code__.co_flags # 检查是否为协程函数 if co_flags & 0x180: return func # 检查是否为生成器函数,此步主要作用是将生成器的 co_flags 同 0x100 做位或运算,将其标识变更为协程标识 if co_flags & 0x20: # TODO: Implement this in C. co = func.__code__ func.__code__ = CodeType( co.co_argcount, co.co_kwonlyargcount, co.co_nlocals, co.co_stacksize, co.co_flags | 0x100, # 0x100 == CO_ITERABLE_COROUTINE co.co_code, co.co_consts, co.co_names, co.co_varnames, co.co_filename, co.co_name, co.co_firstlineno, co.co_lnotab, co.co_freevars, co.co_cellvars) return func # 用以支持类似生成器的对象 @_functools.wraps(func) def wrapped(*args, **kwargs): coro = func(*args, **kwargs) # 协程或 co_flags 大于 256 的生成器对象,直接返回 if (coro.__class__ is CoroutineType or coro.__class__ is GeneratorType and coro.gi_code.co_flags & 0x100): return coro if (isinstance(coro, _collections_abc.Generator) and not isinstance(coro, _collections_abc.Coroutine)): # 实现了生成器抽象类的方法,使用生成器包装器处理成生成器 return _GeneratorWrapper(coro) # 协程抽象类实例或其他对象 return coro return wrapped