Celery 进阶学习

Celery 进阶学习

参考链接: Celery 4.1.0 documentation

初始文件

安装部署celery相关的pip包,参考文档或Celery 部署小记

另外,本文使用ipython作为控制台的交互式解释器,pip install ipython

tasks.py例1

from celery import Celery


class CeleryConfig():
    broker_url = 'redis://localhost'
    result_backend = 'redis://localhost'
    timezone = 'Asia/Shanghai'


app = Celery()
app.config_from_object(CeleryConfig)


@app.task
def add(a, b):
    return a + b

以上文件可以正常通过以下命令启动

celery -A tasks worker --loglevel=info

例1中使用类的方式来加载配置,其他方式有:

Configuration

  • app.conf.timezone = 'Asia/Shanghai'app.conf.update(option1=True, option2='xxx', ...)
  • app.config_from_object(param)方法,参数可以是模块名的字符串形式、模块对象实体、配置的类或对象等
  • app.config_from_envvar(param)方法,参数是系统的环境变量名,而这个变量对应的值是模块的字符串。如:os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

抽象任务(类)

所有task都必须使用@app.task装饰器来装饰,经过装饰器之后,这些任务会继承Task类。可以通过继承Task类,来创建一个抽象类,供task装饰

tasks.py例2

from celery import Celery


# 抽象tasks
from celery import Task
class DebugTask(Task):
    # 在调用之前打印一行字
    def __call__(self, *args, **kwargs):
        print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
        return super(DebugTask, self).__call__(*args, **kwargs)


class CeleryConfig():
    broker_url = 'redis://localhost'
    result_backend = 'redis://localhost'
    timezone = 'Asia/Shanghai'


app = Celery()
app.config_from_object(CeleryConfig)


@app.task(base=DebugTask)
def add(a, b):
    return a + b

ipython调试

In [1]: from tasks import add

In [2]: add.delay(2, 3)
Out[2]: <AsyncResult: d9e63190-0591-403d-a5be-8b59893fcb2d>

celery输出

[2017-11-20 15:52:05,660: INFO/MainProcess] Received task: tasks.add[d9e63190-0591-403d-a5be-8b59893fcb2d]  
[2017-11-20 15:52:05,662: WARNING/ForkPoolWorker-4] TASK STARTING: tasks.add[d9e63190-0591-403d-a5be-8b59893fcb2d]
[2017-11-20 15:52:05,666: INFO/ForkPoolWorker-4] Task tasks.add[d9e63190-0591-403d-a5be-8b59893fcb2d] succeeded in 0.00480927500029793s: 5

acks_late选项

task在经过worker确认(acknowledge)之后,才会从worker的任务队列中移除。并且worker维护的任务队列可以保留相当大量的队列信息,即使这个worker被杀掉,任务信息仍然可以转移到其他的worker

无限期阻塞的任务

由于网络传输等问题,导致任务无限期阻塞,会阻止此worker实例执行其他工作,解决方案是:

  • I/O任务:确保添加超时(可配合retry)。例如:使用requests
connect_timeout, read_timeout = 5.0, 30.0
response = requests.get(URL, timeout=(connect_timeout, read_timeout))

Prefork池预取设置

prefork池默认将异步发送尽可能多的任务到进程中(进程预取任务)。对于延时短的任务,这样会加快速度,但是如果是高延时的任务,该进程后面的任务会长期处于等待。

默认设置: worker会发送任务给缓冲区可写的进程,例子如下

-> send task T1 to process A
# A executes T1
-> send task T2 to process B
# B executes T2
<- T2 complete sent by process B

-> send task T3 to process A
# A still executing T1, T3 stuck in local buffer and won't start until
# T1 returns, and other queued tasks won't be sent to idle processes
<- T1 complete sent by process A
# A executes T3

使用-Ofair选项可以关闭预取设置,此时,worker会发送任务给真正可用于工作的进程,例子如下

-> send task T1 to process A
# A executes T1
-> send task T2 to process B
# B executes T2
<- T2 complete sent by process B

-> send T3 to process B
# B executes T3

<- T3 complete sent by process B
<- T1 complete sent by process A

task options

@app.task(options...)

link

logging

worker会自动建立log,当然你也可以自定义log,例子如下

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def add(x, y):
    logger.info('Adding {0} + {1}'.format(x, y))
    return x + y

参数检测(typing)

>>> @app.task
... def add(x, y):
...     return x + y

# Calling the task with two arguments works:
>>> add.delay(8, 8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>

# Calling the task with only one argument fails:
>>> add.delay(8)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/app/task.py", line 376, in delay
    return self.apply_async(args, kwargs)
  File "celery/app/task.py", line 485, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() takes exactly 2 arguments (1 given)

# typing 属性
>>> @app.task(typing=False)
... def add(x, y):
...     return x + y

# Works locally, but the worker reciving the task will raise an error.
>>> add.delay(8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>

隐藏敏感信息(避免进入log)

v4.0之后,且task_protocol为2或以上才有效(该值在4.0之后默认为2)

可以使用argsreprkwargsrepr调用参数来覆盖敏感信息,例子如下

>>> add.apply_async((2, 3), argsrepr='(<secret-x>, <secret-y>)')

>>> charge.s(account, card='1234 5678 1234 5678').set(
...     kwargsrepr=repr({'card': '**** **** **** 5678'})
... ).delay()

但实际上,只要可以从broker中读取数据,仍然可以获得这些“敏感信息”,所以如果需要高度保密的数据,要使用其他方法存储(加密等)

重试(Retrying)

当任务执行出现错误情况,可以通过设置retry来解决可恢复的错误。celeryretry机制会确保由相同的队列去执行此task-id的原始任务。简单的例子如下

@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)

特别注意,retryraise出来的,所以,即使后面有代码,也不会执行。另外,这个异常会被worker视为需要重试,以便在启用result backend时,可以存储正确的状态(RETRY)

重试的各种选项和应用:

以下是@app.task的参数

  • max_retries: 当超过该参数设置的重试次数,会终止并报错。默认是3次,设置为None表示不重试
  • default_retry_delay: 默认重试间隔时间。默认是180s,可以在retry调用中使用countdown参数覆盖
  • autoretry_for: 对特定的异常自动重试,如autoretry_for=(Exception,),仅适用于v4.0
  • retry_kwargs: 后面接字典类型,如retry_kwargs={'max_retries': 5}

以下三个选项是v4.1版本支持的

  • retry_backoff: 可以是布尔型或者数字,如果设置为True,则遵循“指数退避”,即第一次重试在1s后,第二次2s,第三次4s,第n次(2^(n-1))s。 如果设置为数字m,则表示基数为m,即第一次3s,第二次6s,第n次(3×2^(n-1))s。 如果设置为False,则表示不延迟
  • retry_backoff_max: 如果retry_backoff打开,这个选项决定了两次自动重试之间最大的延时,超过了这个延时,则不再重试。默认600s (这里理解可能有问题?需要具体测试: If retry_backoff is enabled, this option will set a maximum delay in seconds between task autoretries. By default, this option is set to 600, which is 10 minutes.)
  • retry_jitter: 布尔型,jitter(抖动)。抖动用于将随机性引入指数退避延迟,以防队列中过多的任务同时被执行。如果设置为True,则retry_backoff计算的延迟值将被视为最大值,而实际延迟值将是介于0和最大值之间的随机数。该设置默认为True

task方法选项列表

类似@app.task(option1=xx, option2=yy),括号内的参数即选项
http://docs.celeryproject.org/en/master/userguide/tasks.html#general

一些选项(部分参考文档即可):

  • max_retries: 只有在任务调用self.retry或者任务使用autoretry_for参数进行装饰时才适用
  • throws: 值是tuple类型,里面的“异常”不会被视为错误而导致失败或重发,即使发生,也是成功的
  • default_retry_delay: 重试间隔时间
  • rate_limit: 关联task_default_rate_limit这个设置,表示某个时间段执行的任务数目,如:"100/m"表示一分钟最多100条。默认无限制
  • time_limit / soft_time_limit: 任务完成的时间限制。前者是经过时限之后worker被杀死,然后用新worker代替,后者是经过时限之后抛出SoftTimeLimitExceeded异常,供开发者处理。默认没有时限。
  • ignore_result / store_errors_even_if_ignored
  • name
  • request
  • serializer
  • compression
  • backend
  • ack_late
  • track_started

状态

link

Handlers

在任务返回、失败、重试、成功、超时等事件发生的时候,触发特定的方法:after_return, on_failure, on_retry, on_success, on_timeout ...

可用于状态转移的监控,如发邮件提醒等

一个自定义请求的例子如下

import logging
from celery.worker.request import Request

logger = logging.getLogger('my.package')

class MyRequest(Request):
    'A minimal custom request to log failures and hard time limits.'

    def on_timeout(self, soft, timeout):
        super(MyRequest, self).on_timeout(soft, timeout)
        if not soft:
           logger.warning(
               'A hard timeout was enforced for task %s',
               self.task.name
           )

    def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
        super(Request, self).on_failure(
            exc_info,
            send_failed_event=send_failed_event,
            return_ok=return_ok
        )
        logger.warning(
            'Failure detected for task %s',
            self.task.name
        )

class MyTask(Task):
    Request = MyRequest  # you can use a FQN 'my.package:MyRequest'

@app.task(base=MyTask)
def some_longrunning_task():
    # use your imagination

最佳实践

Tips and Best Practices

Optimizing

  1. 忽略不需要的结果,ignore_result=True
  2. 尽量避免使用同步子任务(task调用需要依赖其他task执行的结果,这样会造成互相等待,陷入死锁) Avoid launching synchronous subtasks
  3. 设置broker_pool_limit,默认为10,可以根据使用连接的线程的数目调整 link
  4. worker_prefetch_multiplier表示一次prefetch多少条消息乘以并发进程数,默认值为4(每个进程4个消息)。对于长时间的任务,可以把这个值设置为1,其实就相当于关闭预取;对于短时任务,可以设置大一些,比如64,128等;对于长短不一的任务,可以通过Routing Tasks,即分队列的方式执行
  5. 针对长任务,许多人希望的是让当前执行的任务数与保留待确认的任务数目相同,且都等于当前并发数(如-c 10,此时在执行的任务是10个,等待的任务数也是10个)。满足这样的要求选项:task_acks_late = Trueworker_prefetch_multiplier = 1
  6. 在默认的prefork模式下,进程池中的进程可能处于空闲或忙碌的状态。-O是优化选项,如果是default,进程是预取来自worker中的任务的,可能造成长时间的等待;如果是fair,进程只在有空闲的时候,才会去取任务执行。设置fair对于耗时长的任务来说比较有利
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容