import logging
from six.moves import queue as Queue

logger = logging.getLogger("result")


class ResultWorker(object):
    """
    do with result
    override this if needed.
    """

    def __init__(self, resultdb, inqueue):
        self.resultdb = resultdb
        self.inqueue = inqueue
        self._quit = False

    def on_result(self, task, result):
        '''Called every result'''
        if not result:
            return
        if 'taskid' in task and 'project' in task and 'url' in task:
            # %.30r truncates the repr of the result to 30 characters
            logger.info('result %s:%s %s -> %.30r' % (
                task['project'], task['taskid'], task['url'], result))
            return self.resultdb.save(
                project=task['project'],
                taskid=task['taskid'],
                url=task['url'],
                result=result
            )
        else:
            logger.warning('result UNKNOW -> %.30r' % result)
            return

    def quit(self):
        self._quit = True

    def run(self):
        '''Run loop'''
        logger.info("result_worker starting...")

        while not self._quit:
            try:
                task, result = self.inqueue.get(timeout=1)
                self.on_result(task, result)
            except Queue.Empty:
                # timeout expired with nothing queued; loop so quit() is noticed
                continue
            except KeyboardInterrupt:
                break
            except AssertionError as e:
                logger.error(e)
                continue
            except Exception as e:
                logger.exception(e)
                continue

        logger.info("result_worker exiting...")
Invocation
@cli.command()
@click.option('--result-cls', default='pyspider.result.ResultWorker', callback=load_cls,
              help='ResultWorker class to be used.')
@click.pass_context
def result_worker(ctx, result_cls, get_object=False):
    """
    Run result worker.
    """
    g = ctx.obj
    ResultWorker = load_cls(None, None, result_cls)

    result_worker = ResultWorker(resultdb=g.resultdb, inqueue=g.processor2result)

    g.instances.append(result_worker)
    if g.get('testing_mode') or get_object:
        return result_worker

    result_worker.run()
This is where ResultWorker's run method gets called: the CLI resolves the worker class from --result-cls (pyspider.result.ResultWorker by default), constructs it with the result database and the processor2result queue, and starts the loop.
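The load_cls callback turns that dotted string into an actual class. pyspider's real helper lives in pyspider.libs.utils; a simplified sketch of what it boils down to (not pyspider's exact code):

import importlib

def load_object(name):
    """Resolve a dotted path like 'pyspider.result.ResultWorker'
    into the object it names (simplified sketch)."""
    if '.' not in name:
        raise ValueError('load_object needs a module.object path')
    module_name, object_name = name.rsplit('.', 1)
    module = importlib.import_module(module_name)
    return getattr(module, object_name)

This is also why you can swap in your own worker with --result-cls=mypackage.MyResultWorker (a hypothetical path) without touching pyspider itself.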
run
The run method really does just one thing:
task, result = self.inqueue.get(timeout=1)
self.on_result(task, result)
run pulls a (task, result) pair off the input queue, using a one-second timeout so the loop can periodically check whether quit() was called, and hands the pair to on_result().
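You can watch this loop work without the rest of pyspider by wiring the worker to an ordinary queue and a stub result database (a throwaway sketch; FakeResultDB is invented here):

import threading
import time
from six.moves import queue as Queue

class FakeResultDB(object):
    """Stub standing in for pyspider's resultdb in this sketch."""
    def save(self, project, taskid, url, result):
        print('save %s %s %s -> %r' % (project, taskid, url, result))

inqueue = Queue.Queue()
worker = ResultWorker(resultdb=FakeResultDB(), inqueue=inqueue)

t = threading.Thread(target=worker.run)
t.start()
inqueue.put(({'project': 'baidu', 'taskid': 'demo', 'url': 'http://e.baidu.com/'},
             {'title': 'demo result'}))
time.sleep(2)   # give the loop time to dequeue and print
worker.quit()   # run() exits after at most one more timeout cycle
t.join()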
What a task looks like
{'type': 1,
 'group': None,
 'project_updatetime': 1507644612.9016058,
 'project': 'baidu',
 'process': {'callback': 'detail_page'},
 'lastcrawltime': None,
 'taskid': '7e243bf3e4e3950fbc6eb09076c18fe0',
 'schedule': {'priority': 2},
 'project_md5sum': '9ddb79b9257976f959bf6a64cb3918b8',
 'url': 'http://e.baidu.com/?refer=888',
 'fetch': {},
 'track': {},
 'status': 1}
Let's pick out a few of the fields:
- project, process, url, project_md5sum, lastcrawltime, taskid: these mean exactly what their names suggest.
- process is {'callback': 'detail_page'}: the callback is detail_page every time, meaning the data the result module dequeues is always a final scraped result rather than an intermediate page; presumably this is arranged by the scheduler.
result
{'url': 'http://e.baidu.com/?refer=888', 'title': '百度推广官方网站 | 让有需求的客户找到你'}
This is simply whatever the spider's callback returns. For example, my test script's detail_page (note how its @config(priority=2) matches the 'schedule': {'priority': 2} seen in the task above):
@config(priority=2)
def detail_page(self, response):
return {
"url": response.url,
"title": response.doc('title').text(),
}
on_result
return self.resultdb.save(
project=task['project'],
taskid=task['taskid'],
url=task['url'],
result=result
)
The result is stored in the database. The database layer (resultdb) is outside the scope of this walkthrough; if you are not interested in its concrete implementation, you can safely skip it.
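Since on_result() is the only hook and --result-cls lets you substitute the class, redirecting results elsewhere is just a matter of overriding it. A minimal sketch (the class name and output path are invented for illustration):

import json

class JsonLinesResultWorker(ResultWorker):
    """Hypothetical override: append each result to a JSON-lines file
    instead of writing it to resultdb."""

    def on_result(self, task, result):
        if not result:
            return
        # 'results.jsonl' is an arbitrary path chosen for this sketch
        with open('results.jsonl', 'a') as f:
            f.write(json.dumps({
                'project': task.get('project'),
                'taskid': task.get('taskid'),
                'url': task.get('url'),
                'result': result,
            }) + '\n')

Started with pyspider result_worker --result-cls=mypackage.JsonLinesResultWorker (again, a hypothetical module path), the rest of the pipeline stays untouched.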