task是一个类,扮演了两方面的角色:任务被调用时来发送消息、工作进程在收到消息时操作。task的名称则是标志,这样工作进程可以找到对应的函数。
task只有在收到工作进程的ack后才会从队列中移除,如果工作进程被杀了,任务会被重新分配给其他工作进程。task任务不应该造成副作用,可多次进入。但是工作进程是无法分辨task任务是否为等幂的,所以默认的行为是在执行前先ack该消息,已经被启动的任务是不会被再次执行的(可能会造成任务丢失)。
如果确定任务是等幂的,可以设置acks_late选项。(会在任务执行之后再发送ack,所以如果工作进程在执行过程中崩溃了,该任务可能会被执行多次)。
注意:如果执行该任务的子线程被终止了,如调用sys.exit或被kill,还是会发ack,(可以设置task_reject_on_worker_lost在这种情况不发ack)
原因:
We don’t want to rerun tasks that forces the kernel to send a SIGSEGV (segmentation fault) or similar signals to the process.
我们认为系统管理员既然手动把它杀掉了那么久不会想让他自动启动
分配了太多内存的任务可能会被系统杀掉,再次运行也是一样
一个总是失败的任务即使再启动还是会失败,并可造成循环
警告:如果一个任务永久阻塞,它可能会影响工作进程来做其他工作,所以如果进行io最好加上timeout,或者使用Time limits(时间到了会直接kill掉,最后的办法)。如果有的工作的确是需要很长时间,可以在启动celery时使用 -Ofair 选项。
@task() 可接受的参数
装饰器说明
如果使用了多个装饰器那么需要task装饰器在最后即在最上面,一般情况使用的是从celeryapp中引入的app作为的装饰器:app.task(),如果是django那种在app中定义的task则需要使用@shared_task
bind
@app.task(bind=True)
def add(self, x, y): # self指向的是自身 task
logger.info(self.request.id)
同时可以调用self.retry来重试
base
import celery
class MyTask(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
@task(base=MyTask)
def add(x, y):
raise KeyError()
name
不指定task名称则如前所述:模块名称.函数名称
@app.task(name='sum-of-two-numbers')
def add(x, y):
return x + y
django中,如果在installed_apps中注册的名称是
INSTALLED_APPS = ['project.myapp']
那么就得这样引入:
from project.myapp.tasks import mytask
或者干脆在定义task时指定清楚:
@task(name='project.tasks.add')
def add(x, y):
return x + y
直接自定义task的查找:
from celery import Celery
class MyCelery(Celery):
def gen_task_name(self, name, module):
if module.endswith('.tasks'):
module = module[:-6]
return super(MyCelery, self).gen_task_name(name, module)
app = MyCelery('main')
Task Request
id: 任务id
group: 任务的群组
chord: 所属chord的id
correlation_id: 自定义的id可用于去重
args: 序列参数
kwargs: 命名参数
origin: 发送任务的主机
retries: 重试次数
is_eager: 设为true则在本地执行而不是使用工作进程执行
eta:
expires:
hostname: 执行该任务的工作进程所在的主机
delivery_info: 额外的分发信息,是包含该任务被分发时的exchange and routing,用于[app.Task.retry()]来重发该任务到队列中。具体的值取决于使用的消息队列。
reply-to: 发送回复到哪个队列 (用于 RPC 结果存储backend)
called_directly: 不是被工作进程执行时设置为true
timelimit: 任务的时间限制(soft, hard)
callbacks: 执行成功后要被调用的一系列函数(signature)
errback: 执行失败后要调用的一系列函数(signature)
utc: 如果启用了UTC则为true
headers: Mapping of message headers sent with this task message (may be None).
root_id: 工作流中的第一个任务的id
parent_id: 调用该任务的任务id
chain: Reversed list of tasks that form a chain (if any). The last item in this list will be the next task to succeed the current task. If using version one of the task protocol the chain tasks will be in request.callbacks instead.
logging
celery内置了 celery.task的logger,可以从其继承来使用其任务名称和任务id:
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
Celery已经把标准输出和标准错误重定向到了logging 系统中,可以使用[worker_redirect_stdouts]来禁用重定向。
重定向标准io到指定的logger:
import sys
logger = get_task_logger(__name__)
@app.task(bind=True)
def add(self, x, y):
old_outs = sys.stdout, sys.stderr
rlevel = self.app.conf.worker_redirect_stdouts_level
try:
self.app.log.redirect_stdouts_to_logger(logger, rlevel)
print('Adding {0} + {1}'.format(x, y))
return x + y
finally:
sys.stdout, sys.stderr = old_outs
参数检查
@app.task(typing=False) 禁用检查
过滤敏感信息
add.apply_async((2, 3), argsrepr='(<secret-x>, <secret-y>)')
charge.s(account, card='1234 5678 1234 5678').set(
kwargsrepr=repr({'card': '**** **** **** 5678'})
).delay()
这样会改变实参在log和监控events中的显示
Retrying
如果调用了retry它会发送一个新的消息,使用相同的任务id,会确保消息被送到和原来的任务相同的队列中。
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
try:
twitter = Twitter(oauth)
twitter.update_status(tweet)
except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
raise self.retry(exc=exc)
在调用了retry后会引发Retry异常,这将终止其后的代码执行,可以在调用时附带throw=False不抛出异常,exc带的是异常信息。(任务可带有max_retries)
被重试的任务是有一个默认的延迟时间:3分钟,可以设置:
@app.task(bind=True, default_retry_delay=30 * 60)
另外如果使用了countdown则会覆盖该选项。
也可以设置在遇到一些异常时自动重试:
@app.task(autoretry_for=(FailWhaleError,), retry_kwargs={'max_retries': 5})
State
状态
内置状态
PENDING 等待运行
STARTED(默认没有 通过[app.Task.track_started]启用)
SUCCESS
FAILURE
RETRY
REVOKED
自定义状态
@app.task(bind=True)
def upload_files(self, filenames):
for i, file in enumerate(filenames):
if not self.request.called_directly:
self.update_state(state='PROGRESS',
meta={'current': i, 'total': len(filenames)})
结果保存
SQLAlchemy/Django ORM, Memcached, RabbitMQ/QPid (rpc), Redis
RPC Result Backend (RabbitMQ/QPid)
实际上并不保存,只是作为消息发送,只能被获取一次,且只能是启动他的客户端来获得。一般适用于需要实时获得结果的情况。消息在消息队列重启后可能会丢失,可设置消息队列的结果持久(result_persistent)。
Database Result Backend
把结果保存在数据库中
- 存取速度慢
- 一些数据库使用的默认事务隔离级别不适合
MySQL的默认事务隔离级别是REPEATABLE-READ:当前的事务不会管其他事务的改变,直到当前事务提交后。应该切换到READ-COMMITTED
可序列化的异常
Python要求异常必须支持pickle的序列化操作,最简单的方法是调用Exception.init:
# OK:
class HttpError(Exception):
def __init__(self, status_code):
self.status_code = status_code
Exception.__init__(self, status_code) # <-- REQUIRED
所以任何的异常类有 *args的, 必须使用Exception.init(self, *args),但是并没有处理keyword arguments的方法,只能当成普通的位置参数传入。
[Semipredicates]
有一些异常可以用来通知工作进程来如何对待返回的task。
Ignore
任务可抛出Ignore来通知工作进程忽略该任务,这样就不会记录该任务的状态,但是还是发送了ack并从队列中移除了。这种情况你可能是想实现取消的功能或你想手动存储任务的结果。
from celery.exceptions import Ignore
@app.task(bind=True)
def some_task(self):
if redis.ismember('tasks.revoked', self.request.id):
raise Ignore()
from celery import states
from celery.exceptions import Ignore
准备手动记录结果
@app.task(bind=True)
def get_tweets(self, user):
timeline = twitter.get_timeline(user)
if not self.request.called_directly:
self.update_state(state=states.SUCCESS, meta=timeline)
raise Ignore()
Reject
这个异常需要配合acks_late选项来使用,同时只有一些中间人才实现了一些有用的东西:RabbitMQ支持[Dead Letter Exchanges],这样一个队列可以使用Dead Letter Exchanges来接收被拒收的消息。拒收可以用于任务的重新安排,但是注意这样可能导致消息的无限循环。
# 不进行重分发的例子
import errno
from celery.exceptions import Reject
@app.task(bind=True, acks_late=True)
def render_scene(self, path):
file = get_file(path)
try:
renderer.render_scene(file)
except MemoryError as exc:
raise Reject(exc, requeue=False)
except OSError as exc:
if exc.errno == errno.ENOMEM:
raise Reject(exc, requeue=False)
# For any other error we retry after 10 seconds.
except Exception as exc:
raise self.retry(exc, countdown=10)
# 进行重分发的例子
from celery.exceptions import Reject
@app.task(bind=True, acks_late=True)
def requeues(self):
if not self.request.delivery_info['redelivered']:
raise Reject('no reason', requeue=True)
print('received two times')
Retry
自定义task
首先,装饰器的工作:
@app.task
def add(x, y):
return x + y
class _AddTask(app.Task):
def run(self, x, y):
return x + y
add = app.tasks[_AddTask.name]
同时,task不是在每次请求都实例化,而是在全局中注册,即init在一个进程中之后执行一次,可以保持状态。
from celery import Task
class NaiveAuthenticateServer(Task):
def __init__(self):
self.users = {'george': 'password'}
def run(self, username, password):
try:
return self.users[username] == password
except KeyError:
return False
这样的,对于一个进程上的请求,它们之间是保存状态的users,另可用于简单的数据库连接池实现:
from celery import Task
class DatabaseTask(Task):
_db = None
@property
def db(self):
if self._db is None:
self._db = Database.connect()
return self._db
@app.task(base=DatabaseTask)
def process_rows():
for row in process_rows.db.table.all():
process_row(row)
Handlers
after_return(self, status, retval, task_id, args, kwargs, einfo)
在任务返回后调用
Parameters:
status – Current task state.
retval – Task return value/exception.
task_id – Unique id of the task.
args – Original arguments for the task that returned.
kwargs – Original keyword arguments for the task that returned.
Keyword Arguments:
einfo – ExceptionInfo
instance, containing the traceback (if any).
函数返回值被忽略
on_failure (self, exc, task_id, args, kwargs, einfo)
在任务失败时调用
Parameters:
exc – The exception raised by the task.
task_id – Unique id of the failed task.
args – Original arguments for the task that failed.
kwargs – Original keyword arguments for the task that failed.
Keyword Arguments:
einfo – ExceptionInfo
instance, containing the traceback.
函数返回值被忽略
on_retry(self, exc, task_id, args, kwargs, einfo)
当任务被重试时调用
Parameters:
exc – The exception sent to retry
task_id – Unique id of the retried task.
args – Original arguments for the retried task.
kwargs – Original keyword arguments for the retried task.
Keyword Arguments:
einfo – ExceptionInfo
instance, containing the traceback.
函数返回值被忽略
on_success(self, retval, task_id, args, kwargs)
在任务执行成功时调用
Parameters:
retval – The return value of the task.
task_id – Unique id of the executed task.
args – Original arguments for the executed task.
kwargs – Original keyword arguments for the executed task.
函数返回值被忽略
怎么工作的
所有的任务都是在注册表中,可以使用:
from proj.celery import app
app.tasks
{'celery.chord_unlock':
<@task: celery.chord_unlock>,
'celery.backend_cleanup':
<@task: celery.backend_cleanup>,
'celery.chord':
<@task: celery.chord>}
这样显示的是celery内置的,自定义的任务是在他们被引入时注册,默认的loader是要引入所有在imports设置中列出的。
[app.task()]装饰器则负责注册你的任务。
发送任务时,发送的只是个名字,工作进程收到后再从注册表中查找代码。
优化
如果不需要结果就指定ignore_result设置,可在全局和任务级别设置。
不要启动同步的子任务,使用工作流来处理。
一般把任务分成很多小任务要比使用长任务更好。
数据要和工作进程部署在一起。
任务来做保证。
对于django,不要把model作为参数传递,而是要在任务中重新抓取。
事务
from django.db import transaction
@transaction.commit_on_success
def create_article(request):
article = Article.objects.create()
expand_abbreviations.delay(article.pk)
这样会造成问题:因为是异步启动了task,但是可能task运行的时候这边还没提交呢。
@transaction.commit_manually
def create_article(request):
try:
article = Article.objects.create()
except:
transaction.rollback()
raise
else:
transaction.commit()
expand_abbreviations.delay(article.pk)
当然了,django现在是自动提交,不用管这个问题了。但是django可以使用ATOMIC_REQUESTS,这样又有该问题了,可以使用@transaction.non_atomic_requests装饰器,这样又变成自动提交了。
例子
from django import forms
from django.http import HttpResponseRedirect
from django.template.context import RequestContext
from django.shortcuts import get_object_or_404, render_to_response
from blog import tasks
from blog.models import Comment
class CommentForm(forms.ModelForm):
class Meta:
model = Comment
def add_comment(request, slug, template_name='comments/create.html'):
post = get_object_or_404(Entry, slug=slug)
remote_addr = request.META.get('REMOTE_ADDR')
if request.method == 'post':
form = CommentForm(request.POST, request.FILES)
if form.is_valid():
comment = form.save()
# Check spam asynchronously.
tasks.spam_filter.delay(comment_id=comment.id,
remote_addr=remote_addr)
return HttpResponseRedirect(post.get_absolute_url())
else:
form = CommentForm()
context = RequestContext(request, {'form': form})
return render_to_response(template_name, context_instance=context)
from celery import Celery
from akismet import Akismet
from django.core.exceptions import ImproperlyConfigured
from django.contrib.sites.models import Site
from blog.models import Comment
app = Celery(broker='amqp://')
@app.task
def spam_filter(comment_id, remote_addr=None):
logger = spam_filter.get_logger()
logger.info('Running spam filter for comment %s', comment_id)
comment = Comment.objects.get(pk=comment_id)
current_domain = Site.objects.get_current().domain
akismet = Akismet(settings.AKISMET_KEY, 'http://{0}'.format(domain))
if not akismet.verify_key():
raise ImproperlyConfigured('Invalid AKISMET_KEY')
is_spam = akismet.comment_check(user_ip=remote_addr,
comment_content=comment.comment,
comment_author=comment.name,
comment_author_email=comment.email_address)
if is_spam:
comment.is_spam = True
comment.save()
return is_spam