协程一例:用aiohttp代替requests写异步爬虫

这篇文章不规范也不完整,重新整理的更详细规范的介绍见这里
非常不建议阅读下文。

网上aiohttp做爬虫的资料太少,官网文档是英文的看起来麻烦,所以自己部分半带翻译式的总结下

通过requests获取html的函数基本上是这样

import requests


def func(url: str) ->str:
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
    cookies = {'Cookie': ''}
    # 这里暂时懒得用session, verify参数忽略https网页的ssl验证
    r = requests.get(url, headers=headers, timeout=10, cookies=cookies, verify=False)
    r.encoding = r.apparent_encoding  # 自动识别网页编码避免中文乱码,但会拖慢程序
    return r.text  # 或r.content


func('www.sina.com')

aiohttp改写

import asyncio

import aiohttp


async def html(url: str) ->str:
    code = 'utf-8'
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
    async with aiohttp.ClientSession() as session:
        # 老版本aiohttp没有verify参数,如果报错卸载重装最新版本
        async with session.get(url, headers=headers, timeout=10, verify_ssl=False) as r:
            # text()函数相当于requests中的r.text,r.read()相当于requests中的r.content
            return await r.text()


loop = asyncio.get_event_loop()
loop.run_until_complete(html('www.sina.com'))
# 对需要ssl验证的网页,需要250ms左右等待底层连接关闭
loop.run_until_complete(asyncio.sleep(0.25))
loop.close()

基本上的改写如上,协程本身的概念不是重点,优越性单线程开销小啥的也不说了,这里只讲几个坑/注意事项。参考文档

  • 如果要返回text和content:
# requests
return r.text, r.content
# aiohttp
return await r.text(), await r.read()  # 不要漏后面的await,每个coroutine都要接await
  • r.text()报编码错误
return await r.text(errors='ignore')  # 直接忽略那些错误,默认是strict严格模式导致出现错误时会直接抛异常终止程序。

这里注意到,r.encoding = r.apparent_encoding的原理是什么?为什么aiohttp没有类似代码?
首先,看一下r.apparent_encoding的源码

image.png

可以看出,写法其实就是

import chardet  # 有requests模块的话已经安装了这个


code = chardet.detect(content)['encoding']

换句话说,套用到aiohttp的代码中,本来应该这么写

import asyncio

import aiohttp
import chardet


async def html(url: str) ->str:
    code = 'utf-8'
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
    async with aiohttp.ClientSession() as session:
        # 老版本aiohttp没有verify参数,如果报错卸载重装最新版本
        async with session.get(url, headers=headers, timeout=10, verify_ssl=False) as r:
            content = await r.read()
            code = chardet.detect(content)['encoding']
            # text()函数相当于requests中的r.text,不带参数则自动识别网页编码,同样会拖慢程序。r.read()相当于requests中的r.content
            return await r.text(encoding=code, errors='ignore')

不过实际上,r.text()encoding=None(默认参数)的时候已经包含了这一步,所以其实无需操心什么chardet,出现编码错误先ignore再单个网页具体分析,或者就不管算了。
这部分见文档

If encoding is None content encoding is autocalculated using Content-Type HTTP header and chardet tool if the header is not provided by server.
cchardet is used with fallback to chardet if cchardet is not available.

  • 超时异常处理
    捕捉就好了...基本上碰到的有这些异常
    asyncio.TimeoutError
    aiohttp.client_exceptions.ServerDisconnectedError
    aiohttp.client_exceptions.InvalidURL
    aiohttp.client_exceptions.ClientConnectorError

文档所写

import async_timeout

with async_timeout.timeout(0.001):
    async with session.get('https://github.com') as r:
        await r.text()

用了with还是会抛timeout异常...这时要把时间设的稍微长一点比如10s,以及捕捉timeout异常。此外,这种写法会避免concurrent.futures._base.CancelledError异常。这个异常意思是超时的场合还没完成的任务会被事件循环取消掉。

The event loop will ensure to cancel the waiting task when that timeout is reached and the task hasn't completed yet.

下面是两段作用完全一样的代码(有比较多的简化只保证正常运行),对比aiohttp和多线程
作用是读取网页内容的标题和正文

aiohttp

import asyncio

import aiohttp
# pip install readability-lxml以安装
from readability import Document


def title_summary(content: bytes, url: str):
    doc = Document(content, url)
    print(doc.short_title(), doc.summary())


async def read_one(id_: int, url: str):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(
                    url, headers=headers, timeout=1, verify_ssl=False) as r:
                await asyncio.sleep(1 + random())
                content, text = await r.read(), await r.text(
                    encoding=None, errors='ignore')
                if text:
                    title_summary(content, url)
        except:
            pass


def read_many(links: list):
    loop = asyncio.get_event_loop()
    to_do = [read_one(id_, url) for id_, url in links]
    loop.run_until_complete(asyncio.wait(to_do))
    # 或loop.run_until_complete(asyncio.gather(*to_do))这两行代码作用似乎没啥区别
    loop.close()


def main():
    links = [...]  # 要跑的所有链接列表
    read_many(links)


if __name__ == '__main__':
    main()

多线程

from concurrent import futures


import requests
from readability import Document


def title_summary(content: bytes, url: str):
    doc = Document(content, url)
    print(doc.short_title(), doc.summary())


def read_one(url: str):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
        try:
            r = requests.get(url, headers=headers, timeout=1, verify=False)
            r.encoding = r.apparent_encoding
            content, text = r.content, await r.text
            if text:
                title_summary(content, url)
        except:
            pass


def read_many(links: list) ->int:
    workers = min(100, len(links))  # 线程数
    with futures.ThreadPoolExecutor(workers) as e:
        res = e.map(read_one, links)
    return len(list(res))


def main():
    links = [...]
    read_many(links)


if __name__ == '__main__':
    main()

基本上,协程和线程的使用就是这样。但是,如果,任务数以千计时,asyncio可能会报错:ValueError: too many file descriptors in select()
这是因为asyncio内部调用select,这个打开文件数是有限度的,这部分需要复习深入理解计算机系统一书。
这个场合不能这样写,有可能用到回调,其实也可以不用

def read_many(links: list):
    loop = asyncio.get_event_loop()
    to_do = [read_one(id_, url) for id_, url in links]
    loop.run_until_complete(asyncio.wait(to_do))
    # 或loop.run_until_complete(asyncio.gather(*to_do))这两行代码作用似乎没啥区别
    loop.close()

以上代码这样改

def read_many(links: list):
    loop = asyncio.get_event_loop()
    for id_, url in links:
        task = asyncio.ensure_future(read_one(id_, url))
        loop.run_until_complete(task)
    loop.close()

即可。

这样改完不再是并发而是顺序执行,正确的写法见文章开头链接的回调部分。

如果要用回调的话,比较麻烦,不少地方要修改,见下,主要是参数传递上要多多注意。
其实没有必要用回调,虽然拆开写似乎更规范,而且可以在需要请求其他页面时重用,但是受限很多。

import asyncio

import aiohttp
# pip install readability-lxml以安装
from readability import Document


def title_summary(fut):
    res = fut.result()  # 回调中调用result()才是上个函数的真实返回值
    if res:
        content, url = res
        doc = Document(content, url)
        print(doc.short_title(), doc.summary())


async def read_one(id_: int, url: str):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(
                    url, headers=headers, timeout=1, verify_ssl=False) as r:
                await asyncio.sleep(1 + random())
                return await r.read(), await r.text(encoding=None, errors='ignore')
        except:
            pass


def read_many(links: list):
    loop = asyncio.get_event_loop()
    for id_, url in links:
        task = asyncio.ensure_future(read_one(id_, url))
        # 注意参数问题,这里不能传递多个参数,要么用functool的partial,要么干脆传递元组解包,也可以用lambda,官方比较推荐functool这里就不写了
        task.add_done_callback(title_summary)
        loop.run_until_complete(task)
    loop.close()


def main():
    links = [...]  # 要跑的所有链接列表
    read_many(links)


if __name__ == '__main__':
    main()
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,924评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,781评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,813评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,264评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,273评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,383评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,800评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,482评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,673评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,497评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,545评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,240评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,802评论 3 304
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,866评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,101评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,673评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,245评论 2 341

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,573评论 18 139
  • 原文:http://www.jianshu.com/p/4e048726b613 引言 随着node.js的盛行,...
    jacke121阅读 2,065评论 1 3
  • 电影院 图书馆 故事长 票根短 借来的书 没读完 要么赖着不还 要么买新版替换 可电影院 的电影啊 即使加场 也会...
    段童阅读 429评论 0 2
  • 如果你会ios,那么对于cocoapods你一定不会陌生,cocoapods是一个方便管理你项目中的一些使用到的第...
    无名lxl阅读 271评论 0 1
  • 李存勖是后唐的建立者,在前半生(38年),他用热血与勇气打造了一个国家;后半生(称帝后3年),他用乐器和吝啬摧毁了...
    梁木纯阅读 478评论 6 4