1.基本介绍
期物(Future)是concurrent.futures模块和asyncio包的重要组件。
python3.4之后标准库中有两个名为Future的类:concurrent.futures.Future和asyncio.Future.
这两个类的作用相同:类的实例都表示可能已经完成活着尚未完成的延迟计算。与JS库中的Promise对象,Tornado框架中的Future类类似。
通常我们自己不应该创建期物,而只能由并发框架实例化。
2.concurrent.futures.ThreadPoolExecutor中的方法介绍
- Executor.submit()方法的参数是一个可调用的对象,调用这个方法后会为传入的可调用对象排期,并返回一个期物。
我们的代码中不应该改变期物的状态,并发框架在期物表示的延迟计算结束后会改变期物的状态,而我们无法控制计算何时结束。
这两种期物都有.done()方法。这个方法不阻塞,返回值是布尔值,指明期物连接的可调用对象是否已经执行。我们通常不会询问期物是否运行结束,而是会等待通知。因此,两个Future类都有.add_done_callback()方法。这个方法只有一个参数,类型是可调用的对象,期物运行结束后会调用指定的可调用对象。
此外,还有.result()方法,在期物运行结束后调用的话,这个方法在两个Future类中的 作用相同:返回可调用对象的结果,或者重新抛出执行可调用的对象时抛出的异常。
可是如果期物没有运行结束,.result方法在两个Future类中的行为相差很大。
对concurrent.futures.Future实例来说,调用f.result()方法会阻塞调用方所在的线程,直到有结果可返回。此时,result方法可以接收可选的timeout参数,如果在指定的时间内期物没有运行完毕,会抛出TimeoutError异常。
asyncio.Future.result方法不支持设定超时时间,在那个库中获取期物的结果最好使用yield from结构,不过对于concurrent.futures.Future实例不能这么做。 - Exceptor.map方法返回值是一个迭代器,迭代器的
__next__
方法调用各个期物的result方法,因此我们得到的时各个期物的结果,而非期物本身。该函数还有一个特性:这个函数返回结果的顺序与调用开始的顺序一致。如果第一个调用生产结果用时10s.而其他调用只用1s。代码会阻塞10s,获取map方法返回的生成器产出的第一个结果。在此之后,获取后续结果时不会阻塞,因为后续的调用已经结束。如果必须等到获取所有结果后再处理,这种行为没有问题,不过通过更可取的方式是,不管提交的顺序,只要有结果就获取。为此,要把Executor.submit方法和futures.as_completed函数结合使用。因为这个组合比executor.map更灵活,因为submit方法能处理不同的可调用对象和参数,而executor.map只能处理参数不同的同一个可调用对象。此外传给futures.as_completed函数的期物集合可以来自多个Executor实例。
concurrent.futures.as_completed函数的参数是一个期物列表,返回值是一个迭代器。在期物运行结束后产出期物。
def download_many(cc_list):
cc_list = cc_list[:5]
with futures.ThreadPoolExecutor(max_workers=3) as executor:
to_do = []
for cc in sorted(cc_list):
future = executor.submit(download_one, cc)
to_do.append(future)
msg = 'Scheduled for {}: {}'
print(msg.format(cc, future))
results = []
for future in futures.as_completed(to_do):
res = future.result()
msg = '{} result: {!r}'
print(msg.format(future, res))
results.append(res)
return len(results)
这个例子中的future.result方法不会阻塞,因为future对象是有as_completed方法产生的。
3.asyncio库
在asyncio包中,BaseEventLoop.create_task(...)方法接收一个协程,排定他的运行时间,然后返回一个asyncio.Task实例(也是asyncio.Future类的实例),因为Task是Future的子类,用于包装协程。这与Executor.submit(...)方法创建concurrent.futures.Future实例是一个道理。
因为asyncio.Future类的目的是与yield from一起使用,所以通常不需用使用以下方法:
- 无需调用my_future.add_done_callback(...)因为可以直接把想在期物运行结束后执行的操作放在协程中yield from my_future表达式的后面,这是协程的一大优势,协程是可以暂停和恢复的函数。
- 无需调用my_future.result().因为yield from 从期物中产出的值就是结果。
asyncio.async(coro_or_future, *, loop=None)
这个函数统一了协程和期物:第一个参数可以是二者中的任何一个。如果是 Future或 Task 对象,那就原封不动地返回。如果是协程,那么 async 函数会调用loop.create_task(...) 方法创建 Task 对象。loop= 关键字参数是可选的,用于传入事件循环;如果没有传入,那么 async 函数会通过调用 asyncio.get_event_loop() 函数获取循环对象.
不过,在asyncio 中,基本的流程是一样的:在一个单线程程序中使用主循环依次激活队列里的协程。各个协程向前执行几步,然后把控制权让给主循环,主循环再激活队列里的下一个协程。
import asyncio
import aiohttp
import time
import os
import sys
POP20_CC = ('CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'
def save_flag(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, 'wb') as fp:
fp.write(img)
def show(text):
print(text, end=' ')
sys.stdout.flush()
@asyncio.coroutine
def get_flag(cc):
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
resp = yield from aiohttp.request('GET', url)
image = yield from resp.read()
return image
@asyncio.coroutine
def download_one(cc):
image = yield from get_flag(cc)
show(cc)
save_flag(image, cc.lower() + '.gif')
return cc
def download_many(cc_list):
loop = asyncio.get_event_loop()
to_do = [download_one(cc) for cc in sorted(cc_list)]
wait_coro = asyncio.wait(to_do)
res, _ = loop.run_until_complete(wait_coro)
loop.close()
return len(res)
def main(download_many):
t0 = time.time()
count = download_many(POP20_CC)
elapsed = time.time() - t0
msg = '\n{} flags downloaded in {:.2f}s'
print(msg.format(count, elapsed))
if __name__ == '__main__':
main(download_many)
asyncio.wait(...)
协程的参数是一个由期物或者协程构成的可迭代对象。wait会分别把各个协程包装进入一个Task对象。最后的结果是,wait处理的所有对象都通过某种方法变成Future实例。wait是协程函数,因此返回的是一个协程或者生成器对象。为了驱动协程,我们把协程传给loop.run_until_complete(...)方法。