tornado是异步的,通过使用非阻塞的io方式,可以处理c10k问题,可以处理长连接、 websockets。
安装
py2.7 py3.3+
concurrent.futures
推荐使用的线程池 py3自带,用于ThreadResolver
pycurl
用于tornado.curl_httpclient
Twisted
用于tornado.platform.twisted
pycares
非阻塞DNS解析
monotonic Monotime
在时钟一直改变的情况 在py3.3之后就不需要了
异步和非阻塞IO
实时的web需要每用户的长连接(大多数情况是空闲的),如果这样的话,传统的同步web服务器的解决方案是每个用户一个线程,这样耗费较高。tornado则使用单线程的事件循环,所以应用代码都应该是异步的、非阻塞的,因为同时只能执行一个操作。
阻塞
是等待返回(等待网络IO,磁盘IO,锁),当然实际上任何函数调用都是要被阻塞的(因为要使用CPU)。
tornado的httpclient的DNS解析默认是阻塞的,但是如果使用了ThreadResolver或tornado.curl_httpclient(配合libcurl)则不阻塞了。tornado关注的阻塞主要是在网络IO时的阻塞。
异步
异步函数是在它执行完之前就返回了,这样一般会造成一些工作在后台继续工作,直到达成某些条件。相比通常的同步函数则是在所有的东西都完成后才返回。有很多种异步的风格:
回调参数模式
返回一个占用符 (Future, Promise, Deferred)
分配到队列中
注册回调 (e.g. POSIX signals)
没有什么透明的方法使得同步函数异步,(gevent只是使用了轻量线程切换,并不是真正的异步)
同步函数
from tornado.httpclient import HTTPClient
def synchronous_fetch(url):
http_client = HTTPClient()
response = http_client.fetch(url)
return response.body
异步函数
使用callback argument的:
from tornado.httpclient import AsyncHTTPClient
# 调用时需要给一个回调函数
def asynchronous_fetch(url, callback):
http_client = AsyncHTTPClient()
# 注册async回调函数 在其中调用给定的回调函数
def handle_response(response):
callback(response.body)
http_client.fetch(url, callback=handle_response) # callback
使用Future的:
from tornado.concurrent import Future
def async_fetch_future(url):
http_client = AsyncHTTPClient()
my_future = Future()
fetch_future = http_client.fetch(url)
fetch_future.add_done_callback(
lambda f: my_future.set_result(f.result()))
return my_future
直接使用future的方式较为复杂,但是有优点:
错误处理可以更直观,因为Future.result方法可以直接raise异常
方便协程使用
协程版本(生成器)
from tornado import gen
@gen.coroutine
def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = yield http_client.fetch(url)
raise gen.Return(response.body) # 为了python2中生成器不能返回值所以这部是抛出异常来被捕获作为返回值 python3.3及更高会直接return
协程
推荐使用协程版本来写异步代码,在协程中通过使用yield来暂停并恢复执行(相比gevent,使用的是显式的切换并作为异步函数调用)
python3.5 async await
可以使用async来替换@gen.coroutine,await来替换yield,这样会更快
async def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = await http_client.fetch(url)
return response.body
yield可以抛出Futures的list,但是await需要使用tornado.gen.multi,可以通过:
async def f():
executor = concurrent.futures.ThreadPoolExecutor()
await tornado.gen.convert_yielded(executor.submit(g))
但是原生的协程是不需要绑定到特定框架的
工作机制
包含yield的就是一个生成器,所有的生成器都是异步的,当被调用时返回一个生成器而不是运行并直到完成。装饰器@gen.coroutine则通过yield来和生成器交流,返回给调用者一个Future。
装饰器的内部循环:
def run(self):
future = self.gen.send(self.next)
def callback(f):
self.next = f.result()
self.run()
future.add_done_callback(callback)
装饰器收到生成器的Future,等待future完成(非阻塞),之后发送回给生成器作为yield的值。大多数的异步代码不涉及Future的直接使用,除了是立即发送Future给yield的值(通过异步函数返回的Future)
调用
协程不是按常规方式引发异常的,所有引发的异常都在Future里面直到被yield,所以需要注意使用的方式:
@gen.coroutine
def divide(x, y):
return x / y
def bad_call():
divide(1, 0) # 常规来说是触发除零错误 但是并不会
@gen.coroutine
def good_call():
yield divide(1, 0)
调用协程的必须自己也是一个协程,并使用yield语句来调用。在开发中,如果想重载基类的一个方法,需要查看文档来看它是否可用协程(文档中会说明该方法可以为协程或可以返回Future)。
如果不想要结果,可以使用IOLoop.spawn_callback来让IOLoop负责调用。如抛出了异常IOLoop会捕获并打印堆栈信息。
IOLoop.current().spawn_callback(divide, 1, 0) # 在IOloop启动的情况下添加一个调用
IOLoop.current().run_sync(lambda: divide(1, 0)) # 启动IOloop 运行 然后结束
协程模式
应对使用callback的
@gen.coroutine
def call_task():
yield gen.Task(some_function, other_args)
# some_function(other_args, callback=callback)
这样可按some_function的定义模式(需要callback)来正常调用,但是会返回一个Future这样可在协程中使用。
调用阻塞函数
thread_pool = ThreadPoolExecutor(4)
@gen.coroutine
def call_blocking():
yield thread_pool.submit(blocking_func, args)
并行
如果返回list或dict形式的Futures的,会等待所有的Future完成(并行的):
@gen.coroutine
def parallel_fetch(url1, url2):
resp1, resp2 = yield [http_client.fetch(url1),
http_client.fetch(url2)]
@gen.coroutine
def parallel_fetch_many(urls):
responses = yield [http_client.fetch(url) for url in urls]
# responses is a list of HTTPResponses in the same order
@gen.coroutine
def parallel_fetch_dict(urls):
responses = yield {url: http_client.fetch(url)
for url in urls}
# responses is a dict {url: HTTPResponse}
插入
有时想先保存一个Future而不是立即yield出
@gen.coroutine
def get(self):
fetch_future = self.fetch_next_chunk() # 先保存的
while True:
chunk = yield fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = self.fetch_next_chunk()
yield self.flush()
如果是使用了async def的形式,需要fetch_future = tornado.gen.convert_yielded(self.fetch_next_chunk())这样来调用
循环
import motor
db = motor.MotorClient().test
@gen.coroutine
def loop_example(collection):
cursor = db.collection.find()
while (yield cursor.fetch_next):
doc = cursor.next_object()
定时调用的
@gen.coroutine
def minute_loop():
while True:
yield do_something()
yield gen.sleep(60)
# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)
但是注意,前面的函数是每60+N秒执行一次,(N是执行do_something的时间),
@gen.coroutine
def minute_loop2():
while True:
nxt = gen.sleep(60) # Start the clock.
yield do_something() # Run while the clock is ticking.
yield nxt # Wait for the timer to run out.
这种方式会先启动时钟,然后执行do_something,然后在睡眠剩余的时间,这样就是的确是每60s执行一次。