如果使用RabbitMQ,可以安装:
pip install librabbitmq
using the pyamqp:// or librabbitmq:// prefixes.
默认会启用连接池
默认会把任务保存到磁盘来确保会被执行,如果有一些任务可以丢失则:
from kombu import Exchange, Queue
task_queues = (
Queue('celery', routing_key='celery'),
Queue('transient', Exchange('transient', delivery_mode=1),
routing_key='transient', durable=False),
)
或
task_routes = {
'proj.tasks.add': {'queue': 'celery', 'delivery_mode': 'transient'}
}
把任务发到该队列
task.apply_async(args, queue='transient')
prefetch
通过 [worker_prefetch_multiplier] * 并发限制 设置它来设置一个工作进程可以攒的任务数量(已经ack)。
如果处理的长任务多则把该值调小,如果处理的是短任务可调大,长短不一则最好分开他们,路由到不同的队列中去。
一次一个任务
如果启用了late ack则可以使用 -c 来设置可以攒的没有ack的任务
task_acks_late = True
worker_prefetch_multiplier = 1
公平分发
celery -A proj worker -l info -Ofair