python爬虫之celery分布式任务(踩坑)

一. celery和RabbitMQ简单介绍

  • Celery是一个基于Python开发的分布式异步消息队列,可以轻松实现任务的异步处理。它的基本工作就是管理分配任务到不同的服务器,并且取得结果。
  • RabbitMQ是一个由Erlang语言开发的AMQP的开源实现。AMQP即Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制

二.分布式任务

  • 注:提前安装好RabbitMQ/redis
    安装RabbitMQ
  • 启动RabbitMQ:systemctl start rabbitmq-server
  • 重启RabbitMQ:systemctl restart rabbitmq-server
  • 关闭RabbitMQ:systemctl stop rabbitmq-server
  • 查看RabbitMQ状态:systemctl status rabbitmq-server
(1)项目结构
项目结构
  • celery_test.py:项目主程序,内容如下:
from __future__ import absolute_import
from celery import Celery
app = Celery(include=['tasks'])
app.config_from_object('celeryconfig')
if __name__ == '__main__':
    app.start()

  • (1) "from future import absolute_import"是拒绝隐式引入,因为celery.py的名字和celery的包名冲突,需要使用这条语句让程序正确地运行
  • (2)app是Celery类的实例,创建的时候添加了tasks这个模块,也就是包含了tasks.py这个文件。
  • (3)把Celery配置存放进celeryconfig.py文件,使用app.config_from_object加载配置。
  • celeryconfig为配置文件内容去下:
# -*- coding: UTF-8 -*-
from __future__ import absolute_import, unicode_literals
from datetime import timedelta
from kombu import Queue, Exchange
from celery.schedules import crontab
BROKER_URL='amqp://guest:guest@localhost:5672//'
# CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把任务结果存在了Redis
#默认celery与broker的连接池连接数
BROKER_POOL_LIMIT = 10

CELERY_ACKS_LATE = True
CELERY_IGNORE_RESULT = True
CELERY_DISABLE_RATE_LIMITS = True
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 86400}
WORKER_MAX_MEMORY_PER_CHILD = 600
CELERYD_MAX_TASKS_PER_CHILD = 1
CELERY_TASK_SERIALIZER = 'json'
#CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = 'Asia/Shanghai'
TIME_ZONE = 'Asia/Shanghai'
# 配置队列
CELERY_QUEUES = {
        Queue('default', Exchange('default'),routing_key='default'),
        Queue('spider_001', Exchange('spider_001'), routing_key='spider_001'),
        Queue('spider_002', Exchange('spider_002'), routing_key='spider_002'),
        Queue('spider_003', Exchange('spider_003'), routing_key='spider_003'),
}
#队列路由
CELERY_ROUTES = {
    'tasks.daily_spider_001': {'queue': 'spider_001', 'routing_key': 'spider_001'},
    'tasks.daily_spider_002': {'queue': 'spider_002', 'routing_key': 'spider_002'},
    'tasks.daily_spider_003': {'queue': 'spider_003', 'routing_key': 'spider_003'}
}

# 调度任务/定时任务
CELERYBEAT_SCHEDULE = {
    'daily_spider_001': {
        'task': 'tasks.daily_spider_001',
        'schedule': timedelta(seconds=10), #每10秒执行一次
        # 'args': (16, 16)
    },
    'daily_spider_002': {
        'task': 'tasks.daily_spider_002',
        'schedule': timedelta(seconds=11), #每11秒执行一次
    },
    'daily_spider_003': {
        'task': 'tasks.daily_spider_003',
        'schedule': timedelta(seconds=12), #每11秒执行一次
    },
}
  • 在celeryconfig.py文件中,首先设置了Broker(RabbitMQ)的URL,接下来定义了三个Message Queue,并且指明了Queue对应的Exchange(当使用Redis作为Broker时,Exchange的名字必须和Queue的名字一样)以及routing_key的值。
    CELERY_QUEUES中的routing_key与CELERY_ROUTES中的routing_key是一一对应的关系 (),

  • 任务调度:Celery的Beat进程自动生成任务
    CELERYBEAT_SCHEDULE 为设置定时任务

  • tasks.py内容如下:定义三个不同功能的函数

from __future__ import absolute_import
from celery_test import app

@app.task
def daily_spider_001():
    return 1 + 2


@app.task
def daily_spider_002():
    return 2 + 2


@app.task
def daily_spider_003():
    return 3 + 2

程序启动:

  • 启动beat程序:
celery beat -A celery_test
beat启动效果图
  • 启动Worker进程:
celery -A celery_test worker -l info
任务启动
  • Beat和Worker进程同时启动命令
celery -B -A celery_test worker -l info
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,524评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,869评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,813评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,210评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,085评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,117评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,533评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,219评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,487评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,582评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,362评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,218评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,589评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,899评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,176评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,503评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,707评论 2 335