一、什么是Celery?
Celery 是一个异步任务队列。你可以使用它在你的应用上下文之外执行任务。总的想法就是你的应用程序可能需要执行任何消耗资源的任务都可以交给任务队列,让你的应用程序自由和快速地响应客户端请求。
使用 Celery 运行后台任务并不像在线程中这样做那么简单。但是好处多多,Celery 具有分布式架构,使你的应用易于扩展。一个 Celery 安装有三个核心组件:
- Celery 客户端: 用于发布后台作业。当与 Flask 一起工作的时候,客户端与 Flask 应用一起运行。
- Celery workers: 这些是运行后台作业的进程。Celery 支持本地和远程的 workers,因此你就可以在 Flask 服务器上启动一个单独的 worker,随后随着你的应用需求的增加而新增更多的 workers。
- 消息代理: 客户端通过消息队列和 workers 进行通信,Celery 支持多种方式来实现这些队列。最常用的代理就是 RabbitMQ 和 Redis。
二、简单应用
通过一个简单例子,了解Flask的定时任务,项目名称:celery_demo,目录结构如下:
|---app
| |---__init__.py
| |---celery_manage
| | | ---- __init__.py
| | | ---- views.py
|---config.py
|---celery_worker.py
1. 安装依赖包
pip install flask-celery-helper
另:Flask官网上的flask-celery包可能因为长期未维护的原因,在应用中与python的celery包存在兼容问题,所以选择了这个依赖包。
2. 编辑配置文件,config.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
from datetime import timedelta
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
SECRET_KEY = os.environ.get('SECRET_KEY') or 'hard to guess string'
# 定时任务配置
CELERY_BROKER_URL = 'redis://localhost:6379',
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
# CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULE = {
# 定义任务名称:import_data
# 执行规则:每10秒运行一次
'import_data': {
'task': 'import_data',
'schedule': timedelta(seconds=10)
},
}
@staticmethod
def init_app(app):
pass
class DevelopmentConfig(Config):
DEBUG = True
SQLALCHEMY_DATABASE_URI = os.environ.get('TEST_DATABASE_URL') or \
'sqlite:///' + os.path.join(basedir, 'data-dev.sqlite')
class TestingConfig(Config):
TESTING = True
SQLALCHEMY_DATABASE_URI = os.environ.get('TEST_DATABASE_URL') or \
'sqlite:///' + os.path.join(basedir, 'data-test.sqlite')
class ProductionConfig(Config):
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
'sqlite:///' + os.path.join(basedir, 'data.sqlite')
config = {
'development': DevelopmentConfig,
'testing': TestingConfig,
'production': ProductionConfig,
'default': DevelopmentConfig
}
3. 创建celery管理的蓝本,app/celery_manage/__init__.py
from flask import Blueprint
celery_manage = Blueprint('celery_manage', __name__)
from . import views
4. 初始化celery并注册蓝本到工厂函数,app/__init__.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
from flask import Flask
from config import config
from flask_celery import Celery
# 创建Celery实例
celery = Celery()
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])
config[config_name].init_app(app)
celery.init_app(app)
#注册celery管理蓝本
from .celery_manage import celery_manage as celery_manage_blueprint
app.register_blueprint(celery_manage_blueprint)
return app
5. 添加定时任务,app/celery_manage/views.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
from app import celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
# 定时导入
@celery.task(name="import_data")
def import_data():
print "定时任务:每10秒执行一次"
# 记录日志
logger.info(u"导入成功")
注:任务名称为配置文件config.py中定义的名称,即“import_data”
6. 编辑启动应用的接口文件,celery_worker.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
from app import create_app, celery
import os
application = create_app(os.getenv('FLASK_CONFIG') or 'default')
application.app_context().push()
if __name__ == '__main__':
application.run()
7. 运行celery,在终端输入:
celery worker -l INFO -c 100 -A celery_worker.celery --beat
说明:
- 参数-l INFO可以详细输出任务信息。
- celery默认会开启4个线程来处理任务,参数-c可以开启更多任务线程。
- 参数--beat:执行定时任务时, Celery会通过celerybeat进程来完成。Celerybeat会保持运行, 一旦到了某一定时任务需要执行时, Celerybeat便将其加入到queue中,适用于周期性任务。