造轮子很高大上,其实理解一个轮子也是一件麻烦事
往往想理解一个库,打开源代码,我们就会在层层的函数/方法调用栈中迷失...
大的项目比如tornado、flask的源码乍一下手难度太高>_<,不妨从一个小项目开始
因为业务需要,公司的爬虫项目需要搭建一个代理池,确保池中的代理必须是可用状态
面向博客编程后,发现很多代理池轮子
综合评比下来,选用了这个方案:https://yukunweb.com/2018/4/build-free-asynchronous-proxy-pool/
轮子不大,但涉及到的技术概念还蛮多的,该篇博客是代理池实现详情说明文档
整套程序模块化设计,分为以下部分:
- 获取ip部分
从网上的免费代理网站抓取代理,设计成可扩展模式,如果有新的免费代理网站,可以添加新的抓取规则,在模块中添加方法即可 - 数据库封装
项目使用mongo数据库,在pymongo上又封装了一层,这样可以根据对应代理项目中的curd需求做更好的定制 - 调度部分
整个程序的核心调度设计,代理入库、定时检测代理的有效性、池子中代理的数量等等、 - api
设计对外的api,可以方便的调用代理,其他机器也可以通过网络调用池中代理
程序结构:
ProxyPool \
Api \
__init__.py
api.py
Spider \
__init__.py
get_proxy.py
Db \
__init__.py
db.py
Schedule \
__init__.py
adder.py
tester.py
schedule.py
config.py
run.py
代理获取
作者选择的免费代理:幻代理,66代理,快代理,西刺代理
然后分析这些代理网站的网页结构,设计规则抓下来并保存
抓取部分代码:
import requests
from lxml import etree
from requests.exceptions import ConnectionError
def parse_url(url):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:54.0) Gecko/20100101 Firefox/54.0'
}
try:
resp = requests.get(url, headers=headers)
if resp.status_code == 200:
return resp.text
return None
except ConnectionError:
print('Error.')
return None
def proxy_xici():
url = 'http://www.xicidaili.com/'
resp = parse_url(url)
html = etree.HTML(resp)
ips = html.xpath('//*[@id="ip_list"]/tr/td[2]/text()')
ports = html.xpath('//*[@id="ip_list"]/tr/td[3]/text()')
for ip, port in zip(ips, ports):
proxy = ip + ':' + port
yield proxy
parse_url
是一个网页下载函数
proxy_xici
是西刺代理的抓取规则,用xpath解析并使用yield返回组装好的ip+port格式的代理,注意此函数是一个生成器,需要用遍历的方式获取返回内容
这些抓取ip的方法用类来实现,并且做成了可扩展模式
具体就是所有的提取ip的方法统一以proxy_
作方法名开头,并把这些方法保存在元类中(其实保存的是方法名的字符串格式数据),创建类的时候自动检查以proxy_
开头的方法,然后做一些处理
如果有新的免费代理网站,添加新的方法按照约定的规则命名即可
具体实现:
# Spider/get_proxy.py
class ProxyMetaclass(type):
"""
元类,在ProxyGetter类中加入
__CrawlFunc__和__CrawlFuncCount__两个属性
分别表示爬虫函数和爬虫函数的数量
"""
def __new__(cls, name, bases, attrs):
count = 0
attrs['__CrawlFunc__'] = []
for k in attrs.keys():
if k.startswith('proxy_'):
attrs['__CrawlFunc__'].append(k)
count += 1
attrs['__CrawlFuncCount__'] = count
return type.__new__(cls, name, bases, attrs)
class ProxyGetter(object, metaclass=ProxyMetaclass):
def proxy_ip66(self):
pass
def proxy_xici(self):
pass
def proxy_kuai(self):
pass
def proxy_ihuan(self):
pass
上篇文章刚刚介绍了元类,如果你认真理解的话,这里的代码就不难懂了
如果你理解的更透彻,甚至可以发现这个实现中有一个元类属性是不需要添加的,那就是后话了,下篇文章我们会对这个元类稍作修改使代码更简洁
从上面的代码看,作者在attr中添加了两个属性,一个__CrawlFuncCount__
是保存当前类中获取代理的方法数量
另一个__CrawlFunc__
是ip代理爬虫方法名的string列表,具体实现是约定如果类中属性以proxy_开头就表明是代理爬虫方法,保存到属性中
注意new方法中使用的是type返回:
return type.__new__(cls, name, bases, attrs)
现学现卖,根据我的上篇文章,其实这样写会更好:
return super(ProxyMetaclass, cls).__new__(cls, name, bases, attrs)
同时类中还提供了一个方法,通过爬虫方法名来调用方法获取代理
class ProxyGetter(object, metaclass=ProxyMetaclass):
...
def get_raw_proxies(self, callback):
proxies = []
for proxy in eval("self.{}()".format(callback)):
print('Getting', proxy, 'from', callback)
proxies.append(proxy)
return proxies
关于eval的语法,作者也有解释:
>>> m = 5
>>> n = 3
>>> eval('m') + eval('n')
8
数据库
持久化用的是mongo数据库,封装了一套简洁api
mongo是非关系型文档类型数据库,数据在库中的体现形式非常类似于json格式
在linux中安装好mongo,终端shell中输入mongo启动客户端
使用use db
命令指定库,这一点和mysql一致
但是查看库中某张表的命令是不同的:db.table.find()
> use proxy
switched to db proxy
> db.proxy.find()
{ "_id" : ObjectId("5b9f43666cb73e293b904801"), "proxy" : "112.195.206.72:4226", "num" : 1024, "http/s" : true }
{ "_id" : ObjectId("5b9f45956cb73e293b90480e"), "proxy" : "119.114.123.98:8946", "num" : 1028, "http/s" : false }
{ "_id" : ObjectId("5b9f45966cb73e293b904811"), "proxy" : "116.7.187.99:4223", "num" : 1029, "http/s" : true }
{ "_id" : ObjectId("5b9f49c76cb73e293b904820"), "proxy" : "182.113.33.83:4221", "num" : 1033, "http/s" : true }
{ "_id" : ObjectId("5b9f502c6cb73e293b904822"), "proxy" : "116.149.202.69:6436", "num" : 1034, "http/s" : false }
{ "_id" : ObjectId("5b9f502e6cb73e293b904825"), "proxy" : "182.105.201.64:4162", "num" : 1035, "http/s" : true }
{ "_id" : ObjectId("5b9f50406cb73e293b90482c"), "proxy" : "106.46.136.83:4237", "num" : 1036, "http/s" : false }
{ "_id" : ObjectId("5b9f50426cb73e293b90482f"), "proxy" : "112.83.93.63:2589", "num" : 1037, "http/s" : true }
>
表中的数据类似json格式的键值对,其中必须要有_id
字段,如果插入数据的时候未指定id,则mongo会自动插入id
python中和mongo交互,使用的是pymongo库,封装了对数据库的操作
作者在此之上又封装了一层:
# db.py
from pymongo import MongoClient, ASCENDING
# 我们把配置文件放入 config.py 文件内
from config import NAME, HOST, PORT
class MongodbClient(object):
def __init__(self, table=TABLE, host=HOST, port=PORT):
self.table = table
self.client = MongoClient(host, port)
self.db = self.client.NAME
@property
def get_counts(self):
"""获取表里的数据总数"""
return self.db[self.table].count()
def change_table(self, table):
"""切换数据库表"""
self.table = table
def set_num(self):
"""给每条数据设置唯一自增num"""
nums = []
datas = self.get_all()
if datas:
for data in datas:
nums.append(data['num'])
return max(nums)
return 0
def get_new(self):
"""获取最新一条数据"""
data = self.get_all()[-1] if self.get_all() else None
return data
def get_data(self, num):
"""获取指定num的数据"""
datas = self.get_all()
if datas:
data = [i for i in datas if i['num']==num][0]
return data
return None
def get_all(self):
"""获取全部数据"""
# 判断表里是否有数据
if self.get_counts != 0:
# 先排序
self.sort()
datas = [i for i in self.db[self.table].find()]
return datas
return None
def put(self, proxy):
"""
放置代理到数据库
"""
num = self.proxy_num() + 1
if self.db[self.table].find_one({'proxy': proxy}):
self.delete(proxy)
self.db[self.table].insert({'proxy': proxy, 'num': num})
else:
self.db[self.table].insert({'proxy': proxy, 'num': num})
def delete(self, data):
"""删除数据"""
self.db[self.table].remove(data)
def clear(self):
"""清空表"""
self.client.drop_database(self.table)
def sort(self):
"""按num键排序"""
self.db[self.table].find().sort('num', ASCENDING)
在init中初始化数据库、host、端口
put方法向库中插入数据
delete方法根据数据内容删除对应的数据
值得一提的是,插入的每条数据都有一个自增的字段num
这个自己是设置的,sort方法就是根据num
来排序
调度器
添加类:判断数据库中的代理数量是否达到最大阈值,进行启动爬虫和停止爬虫;
测试类:对爬取到的代理进行检测,将有效的代理放入数据库;
任务类:定时对数据库中的代理进行检测,用于启动整个调度器。
代理测试:
代理测试的网址可以选取一个页面很小网址,这样可以占用很小的流浪,响应的速度也会有略微提升
作者提出如果用百度作为测试网址,会出现不管代理是否成功,都会有响应200的情况,我测了一下,的确是
作者选择http://2017.ip138.com/ic.asp这个测试ip的站点作为检测站。是否好用未知,我用的http检测站点是:http://mini.eastday.com/assets/v1/js/search_word.js
作者用的是aiohttp这个异步请求库,是一个基于协程异步实现的库,推荐在python3.5以上环境使用
# Schedule/tester.py
import asyncio
import aiohttp
from Db.db import MongodbClient
# 将配置信息放入配置文件config.py
from config import TEST_URL
class ProxyTester(object):
test_url = TEST_URL
def __init__(self):
self._raw_proxies = None
def set_raw_proxies(self, proxies):
# 供外部添加需要测试的代理
self._raw_proxies = proxies
self._conn = MongodbClient()
async def test_single_proxy(self, proxy):
"""
测试一个代理,如果有效,将他放入usable-proxies
"""
try:
async with aiohttp.ClientSession() as session:
try:
if isinstance(proxy, bytes):
proxy = proxy.decode('utf-8')
real_proxy = 'http://' + proxy
print('Testing', proxy)
async with session.get(self.test_url, proxy=real_proxy, timeout=10) as response:
if response.status == 200:
# 请求成功,放入数据库
self._conn.put(proxy)
print('Valid proxy', proxy)
except Exception as e:
print(e)
except Exception as e:
print(e)
def test(self):
"""
异步测试所有代理
"""
print('Tester is working...')
try:
loop = asyncio.get_event_loop()
tasks = [self.test_single_proxy(proxy) for proxy in self._raw_proxies]
loop.run_until_complete(asyncio.wait(tasks))
except ValueError:
print('Async Error')
关于aiohttp
的用法,大家可以参考这篇文章。也可以直接参考中文文档
使用aiohttp的好处是,当爬虫发出请求等待网路io的时候不会一直阻塞等待,而可以继续其他的任务,比如说请求一百个网页,假设每个网页io耗时90毫秒,如果使用吧requets阻塞请求的话,耗时大概是:100*90毫秒
如果使用aiohttp,总耗时大概是90毫秒多一点
可以发现任务越多的话,耗时的差距也非常大
代理测的入口是test方法:
创建事件循环,用列表生成器产生任务tasks,异步任务注册到事件循环中
test_single_proxy是测试单个代理的方法,注意方法前的async 关键字
async 、await关键字只能在python3.5以上的环境中使用,让我们可以像写同步任务一样去写异步任务,有了async 关键字以后,方法/函数就不是普通方法/函数了,需要注册到事件循环loop = asyncio.get_event_loop()
里执行
添加代理
在循环中实现,先检测库中代理数量,如果低于设定的阈值,就调用获取代理类中的方法获取代理检测并入库,如果达到阈值则break循环
在获取代理类中通过遍历CrawlFunc列表拿到获取代理的方法,调用类中的get_raw_proxies拿到代理,然后通过测试类中的set_raw_proxies设置好要测试的代理,在事件循环注册测试入库
# Schedule/adder.py
from Db.db import MongodbClient
from Spider.get_proxy import ProxyGetter
from .tester import ProxyTester
class PoolAdder(object):
"""
启动爬虫,添加代理到数据库中
"""
def __init__(self, threshold):
self._threshold = threshold
self._conn = MongodbClient()
self._tester = ProxyTester()
self._crawler = ProxyGetter()
def is_over_threshold(self):
"""
判断数据库中代理数量是否达到设定阈值
"""
return True if self._conn.get_nums >= self._threshold else False
def add_to_pool(self):
"""
补充代理
"""
print('PoolAdder is working...')
proxy_count = 0
while not self.is_over_threshold():
# 迭代所有的爬虫,元类给ProxyGetter的两个方法
# __CrawlFuncCount__是爬虫数量,__CrawlFunc__是爬虫方法
for callback_label in range(self._crawler.__CrawlFuncCount__):
callback = self._crawler.__CrawlFunc__[callback_label]
# 调用ProxyGetter()方法进行抓取代理
raw_proxies = self._crawler.get_raw_proxies(callback)
# 调用方法测试爬取到的代理
self._tester.set_raw_proxies(raw_proxies)
self._tester.test()
proxy_count += len(raw_proxies)
if self.is_over_threshold():
print('Proxy is enough, waiting to be used...')
break
if proxy_count == 0:
print('The proxy source is exhausted.')
调度器定时实现
调度器中两个定时任务,用进程来实现(协程配合进程比较简单些),一个是定时取部分代理调用测试类检测,一个是定时检测数据库中的代理是否低于最低阈值,调用添加类添加。
# Schedulr/schedule.py
import time
from multiprocessing import Process
from ProxyPool.db import MongodbClient
from .tester import ProxyTester
from .adder import PoolAdder
from config import VALID_CHECK_CYCLE, POOL_LEN_CHECK_CYCLE \
POOL_LOWER_THRESHOLD, POOL_UPPER_THRESHOLD
class Schedule(object):
@staticmethod
def valid_proxy(cycle=VALID_CHECK_CYCLE):
"""
从数据库中拿到一半代理进行检查
"""
conn = MongodbClient()
tester = ProxyTester()
while True:
print('Refreshing ip...')
# 调用数据库,从左边开始拿到一半代理
count = int(0.5 * conn.get_nums)
if count == 0:
print('Waiting for adding...')
time.sleep(cycle)
continue
raw_proxies = conn.get(count)
tester.set_raw_proxies(raw_proxies)
tester.test()
time.sleep(cycle)
@staticmethod
def check_pool(lower_threshold=POOL_LOWER_THRESHOLD,
upper_threshold=POOL_UPPER_THRESHOLD,
cycle=POOL_LEN_CHECK_CYCLE):
"""
如果代理数量少于最低阈值,添加代理
"""
conn = MongodbClient()
adder = PoolAdder(upper_threshold)
while True:
if conn.get_nums < lower_threshold:
adder.add_to_pool()
time.sleep(cycle)
def run(self):
print('Ip Processing running...')
valid_process = Process(target=Schedule.valid_proxy)
check_process = Process(target=Schedule.check_pool)
valid_process.start()
check_process.start()
阈值和两个调度周期在config中:
# config.py
# Pool 的低阈值和高阈值
POOL_LOWER_THRESHOLD = 10
POOL_UPPER_THRESHOLD = 40
# 两个调度进程的周期
VALID_CHECK_CYCLE = 600
POOL_LEN_CHECK_CYCLE = 20
valid_proxy周期的从数据库中左侧(为什么是左侧,详情查看db.py源码,在db中实现了)获取一半的代理然后检测
check_pool方法周期的检测数据库中的代理数量,少于阈值则调用add_to_pool方法补充
API
接口部分用flask实现,比较简单,就不列举出来了
以下是程序入口文件:
# run.py
from Api.api import app
from Schedule.schedule import Schedule
def main():
# 任务类的两个周期进程就是整个调度器
s = Schedule()
s.run()
app.run()
if __name__ == '__main__':
main()
项目地址
作者的项目github
感兴趣的童鞋赶紧下载下来好好调试一下吧