2 Celery Tasks

task是一个类,扮演了两方面的角色:任务被调用时来发送消息、工作进程在收到消息时操作。task的名称则是标志,这样工作进程可以找到对应的函数。

task只有在收到工作进程的ack后才会从队列中移除,如果工作进程被杀了,任务会被重新分配给其他工作进程。task任务不应该造成副作用,可多次进入。但是工作进程是无法分辨task任务是否为等幂的,所以默认的行为是在执行前先ack该消息,已经被启动的任务是不会被再次执行的(可能会造成任务丢失)。

如果确定任务是等幂的,可以设置acks_late选项。(会在任务执行之后再发送ack,所以如果工作进程在执行过程中崩溃了,该任务可能会被执行多次)。

注意:如果执行该任务的子线程被终止了,如调用sys.exit或被kill,还是会发ack,(可以设置task_reject_on_worker_lost在这种情况不发ack)

原因:

  1. We don’t want to rerun tasks that forces the kernel to send a SIGSEGV (segmentation fault) or similar signals to the process.

  2. 我们认为系统管理员既然手动把它杀掉了那么久不会想让他自动启动

  3. 分配了太多内存的任务可能会被系统杀掉,再次运行也是一样

  4. 一个总是失败的任务即使再启动还是会失败,并可造成循环

警告:如果一个任务永久阻塞,它可能会影响工作进程来做其他工作,所以如果进行io最好加上timeout,或者使用Time limits(时间到了会直接kill掉,最后的办法)。如果有的工作的确是需要很长时间,可以在启动celery时使用 -Ofair 选项。

@task() 可接受的参数

装饰器说明

如果使用了多个装饰器那么需要task装饰器在最后即在最上面,一般情况使用的是从celeryapp中引入的app作为的装饰器:app.task(),如果是django那种在app中定义的task则需要使用@shared_task

bind

@app.task(bind=True)
def add(self, x, y):  # self指向的是自身 task
    logger.info(self.request.id)

同时可以调用self.retry来重试

base

import celery

class MyTask(celery.Task):

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('{0!r} failed: {1!r}'.format(task_id, exc))

@task(base=MyTask)
def add(x, y):
    raise KeyError()

name

不指定task名称则如前所述:模块名称.函数名称

@app.task(name='sum-of-two-numbers')
def add(x, y):
    return x + y

django中,如果在installed_apps中注册的名称是

INSTALLED_APPS = ['project.myapp']

那么就得这样引入:

from project.myapp.tasks import mytask

或者干脆在定义task时指定清楚:

@task(name='project.tasks.add')
def add(x, y):
return x + y

直接自定义task的查找:

from celery import Celery

class MyCelery(Celery):

    def gen_task_name(self, name, module):
        if module.endswith('.tasks'):
            module = module[:-6]
        return super(MyCelery, self).gen_task_name(name, module)

app = MyCelery('main')

Task Request

id: 任务id
group: 任务的群组
chord: 所属chord的id
correlation_id: 自定义的id可用于去重

args: 序列参数
kwargs: 命名参数

origin: 发送任务的主机
retries: 重试次数

is_eager: 设为true则在本地执行而不是使用工作进程执行

eta:
expires:

hostname: 执行该任务的工作进程所在的主机
delivery_info: 额外的分发信息,是包含该任务被分发时的exchange and routing,用于[app.Task.retry()]来重发该任务到队列中。具体的值取决于使用的消息队列。

reply-to: 发送回复到哪个队列 (用于 RPC 结果存储backend)

called_directly: 不是被工作进程执行时设置为true

timelimit: 任务的时间限制(soft, hard)

callbacks: 执行成功后要被调用的一系列函数(signature)
errback: 执行失败后要调用的一系列函数(signature)
utc: 如果启用了UTC则为true

headers: Mapping of message headers sent with this task message (may be None).

root_id: 工作流中的第一个任务的id
parent_id: 调用该任务的任务id

chain: Reversed list of tasks that form a chain (if any). The last item in this list will be the next task to succeed the current task. If using version one of the task protocol the chain tasks will be in request.callbacks instead.

logging

celery内置了 celery.task的logger,可以从其继承来使用其任务名称和任务id:

from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)

Celery已经把标准输出和标准错误重定向到了logging 系统中,可以使用[worker_redirect_stdouts]来禁用重定向。

重定向标准io到指定的logger:

import sys

logger = get_task_logger(__name__)

@app.task(bind=True)
def add(self, x, y):
    old_outs = sys.stdout, sys.stderr
    rlevel = self.app.conf.worker_redirect_stdouts_level
    try:
        self.app.log.redirect_stdouts_to_logger(logger, rlevel)
        print('Adding {0} + {1}'.format(x, y))
        return x + y
    finally:
        sys.stdout, sys.stderr = old_outs

参数检查

@app.task(typing=False) 禁用检查

过滤敏感信息

 add.apply_async((2, 3), argsrepr='(<secret-x>, <secret-y>)')

charge.s(account, card='1234 5678 1234 5678').set(
    kwargsrepr=repr({'card': '**** **** **** 5678'})
).delay()

这样会改变实参在log和监控events中的显示

Retrying

如果调用了retry它会发送一个新的消息,使用相同的任务id,会确保消息被送到和原来的任务相同的队列中。

@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)

在调用了retry后会引发Retry异常,这将终止其后的代码执行,可以在调用时附带throw=False不抛出异常,exc带的是异常信息。(任务可带有max_retries)

被重试的任务是有一个默认的延迟时间:3分钟,可以设置:

@app.task(bind=True, default_retry_delay=30 * 60) 

另外如果使用了countdown则会覆盖该选项。

也可以设置在遇到一些异常时自动重试:

@app.task(autoretry_for=(FailWhaleError,), retry_kwargs={'max_retries': 5})

State

状态

内置状态

PENDING 等待运行

STARTED(默认没有 通过[app.Task.track_started]启用)

SUCCESS

FAILURE

RETRY

REVOKED

自定义状态

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

结果保存

SQLAlchemy/Django ORM, Memcached, RabbitMQ/QPid (rpc), Redis

RPC Result Backend (RabbitMQ/QPid)

实际上并不保存,只是作为消息发送,只能被获取一次,且只能是启动他的客户端来获得。一般适用于需要实时获得结果的情况。消息在消息队列重启后可能会丢失,可设置消息队列的结果持久(result_persistent)。

Database Result Backend

把结果保存在数据库中

  1. 存取速度慢
  2. 一些数据库使用的默认事务隔离级别不适合

MySQL的默认事务隔离级别是REPEATABLE-READ:当前的事务不会管其他事务的改变,直到当前事务提交后。应该切换到READ-COMMITTED

可序列化的异常

Python要求异常必须支持pickle的序列化操作,最简单的方法是调用Exception.init

# OK:
class HttpError(Exception):

    def __init__(self, status_code):
        self.status_code = status_code
        Exception.__init__(self, status_code)  # <-- REQUIRED

所以任何的异常类有 *args的, 必须使用Exception.init(self, *args),但是并没有处理keyword arguments的方法,只能当成普通的位置参数传入。

[Semipredicates]

有一些异常可以用来通知工作进程来如何对待返回的task。

Ignore

任务可抛出Ignore来通知工作进程忽略该任务,这样就不会记录该任务的状态,但是还是发送了ack并从队列中移除了。这种情况你可能是想实现取消的功能或你想手动存储任务的结果。

from celery.exceptions import Ignore

@app.task(bind=True)
def some_task(self):
    if redis.ismember('tasks.revoked', self.request.id):
        raise Ignore()

from celery import states
from celery.exceptions import Ignore

准备手动记录结果

@app.task(bind=True)
def get_tweets(self, user):
    timeline = twitter.get_timeline(user)
    if not self.request.called_directly:
        self.update_state(state=states.SUCCESS, meta=timeline)
    raise Ignore()

Reject

这个异常需要配合acks_late选项来使用,同时只有一些中间人才实现了一些有用的东西:RabbitMQ支持[Dead Letter Exchanges],这样一个队列可以使用Dead Letter Exchanges来接收被拒收的消息。拒收可以用于任务的重新安排,但是注意这样可能导致消息的无限循环。

# 不进行重分发的例子
import errno
from celery.exceptions import Reject

@app.task(bind=True, acks_late=True)
def render_scene(self, path):
    file = get_file(path)
    try:
        renderer.render_scene(file)
    except MemoryError as exc:
        raise Reject(exc, requeue=False)
    except OSError as exc:
        if exc.errno == errno.ENOMEM:
            raise Reject(exc, requeue=False)

    # For any other error we retry after 10 seconds.
    except Exception as exc:
        raise self.retry(exc, countdown=10)

# 进行重分发的例子
from celery.exceptions import Reject

@app.task(bind=True, acks_late=True)
def requeues(self):
    if not self.request.delivery_info['redelivered']:
        raise Reject('no reason', requeue=True)
    print('received two times')

Retry

自定义task

首先,装饰器的工作:

@app.task
def add(x, y):
    return x + y

class _AddTask(app.Task):

    def run(self, x, y):
        return x + y
add = app.tasks[_AddTask.name]

同时,task不是在每次请求都实例化,而是在全局中注册,即init在一个进程中之后执行一次,可以保持状态。

from celery import Task

class NaiveAuthenticateServer(Task):

    def __init__(self):
        self.users = {'george': 'password'}

    def run(self, username, password):
        try:
            return self.users[username] == password
        except KeyError:
            return False

这样的,对于一个进程上的请求,它们之间是保存状态的users,另可用于简单的数据库连接池实现:

from celery import Task

class DatabaseTask(Task):
    _db = None

    @property
    def db(self):
        if self._db is None:
            self._db = Database.connect()
        return self._db

@app.task(base=DatabaseTask)
def process_rows():
    for row in process_rows.db.table.all():
        process_row(row)

Handlers

after_return(self, status, retval, task_id, args, kwargs, einfo)
在任务返回后调用

Parameters:
status – Current task state.
retval – Task return value/exception.
task_id – Unique id of the task.
args – Original arguments for the task that returned.
kwargs – Original keyword arguments for the task that returned.

Keyword Arguments:

einfo – ExceptionInfo
instance, containing the traceback (if any).

函数返回值被忽略

on_failure (self, exc, task_id, args, kwargs, einfo)
在任务失败时调用

Parameters:
exc – The exception raised by the task.
task_id – Unique id of the failed task.
args – Original arguments for the task that failed.
kwargs – Original keyword arguments for the task that failed.

Keyword Arguments:

einfo – ExceptionInfo
instance, containing the traceback.

函数返回值被忽略

on_retry(self, exc, task_id, args, kwargs, einfo)
当任务被重试时调用

Parameters:
exc – The exception sent to retry
task_id – Unique id of the retried task.
args – Original arguments for the retried task.
kwargs – Original keyword arguments for the retried task.

Keyword Arguments:

einfo – ExceptionInfo
instance, containing the traceback.

函数返回值被忽略

on_success(self, retval, task_id, args, kwargs)
在任务执行成功时调用

Parameters:
retval – The return value of the task.
task_id – Unique id of the executed task.
args – Original arguments for the executed task.
kwargs – Original keyword arguments for the executed task.

函数返回值被忽略

怎么工作的

所有的任务都是在注册表中,可以使用:

from proj.celery import app
app.tasks

{'celery.chord_unlock':
    <@task: celery.chord_unlock>,
 'celery.backend_cleanup':
    <@task: celery.backend_cleanup>,
 'celery.chord':
    <@task: celery.chord>}

这样显示的是celery内置的,自定义的任务是在他们被引入时注册,默认的loader是要引入所有在imports设置中列出的。

[app.task()]装饰器则负责注册你的任务。

发送任务时,发送的只是个名字,工作进程收到后再从注册表中查找代码。

优化

如果不需要结果就指定ignore_result设置,可在全局和任务级别设置。

不要启动同步的子任务,使用工作流来处理。

一般把任务分成很多小任务要比使用长任务更好。

数据要和工作进程部署在一起。

任务来做保证。

对于django,不要把model作为参数传递,而是要在任务中重新抓取。

事务

from django.db import transaction

@transaction.commit_on_success
def create_article(request):
    article = Article.objects.create()
    expand_abbreviations.delay(article.pk)

这样会造成问题:因为是异步启动了task,但是可能task运行的时候这边还没提交呢。

@transaction.commit_manually
def create_article(request):
    try:
        article = Article.objects.create()
    except:
        transaction.rollback()
        raise
    else:
        transaction.commit()
        expand_abbreviations.delay(article.pk)

当然了,django现在是自动提交,不用管这个问题了。但是django可以使用ATOMIC_REQUESTS,这样又有该问题了,可以使用@transaction.non_atomic_requests装饰器,这样又变成自动提交了。

例子

from django import forms
from django.http import HttpResponseRedirect
from django.template.context import RequestContext
from django.shortcuts import get_object_or_404, render_to_response

from blog import tasks
from blog.models import Comment


class CommentForm(forms.ModelForm):

    class Meta:
        model = Comment


def add_comment(request, slug, template_name='comments/create.html'):
    post = get_object_or_404(Entry, slug=slug)
    remote_addr = request.META.get('REMOTE_ADDR')

    if request.method == 'post':
        form = CommentForm(request.POST, request.FILES)
        if form.is_valid():
            comment = form.save()
            # Check spam asynchronously.
            tasks.spam_filter.delay(comment_id=comment.id,
                                    remote_addr=remote_addr)
            return HttpResponseRedirect(post.get_absolute_url())
    else:
        form = CommentForm()

    context = RequestContext(request, {'form': form})
    return render_to_response(template_name, context_instance=context)


from celery import Celery
from akismet import Akismet

from django.core.exceptions import ImproperlyConfigured
from django.contrib.sites.models import Site

from blog.models import Comment


app = Celery(broker='amqp://')


@app.task
def spam_filter(comment_id, remote_addr=None):
    logger = spam_filter.get_logger()
    logger.info('Running spam filter for comment %s', comment_id)

    comment = Comment.objects.get(pk=comment_id)
    current_domain = Site.objects.get_current().domain
    akismet = Akismet(settings.AKISMET_KEY, 'http://{0}'.format(domain))
    if not akismet.verify_key():
        raise ImproperlyConfigured('Invalid AKISMET_KEY')


    is_spam = akismet.comment_check(user_ip=remote_addr,
                        comment_content=comment.comment,
                        comment_author=comment.name,
                        comment_author_email=comment.email_address)
    if is_spam:
        comment.is_spam = True
        comment.save()

    return is_spam
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容