最近项目中遇到了定时任务,django的框架,用到了celery,mysql作为数据库存储job数据,rabbit MQ作为消息队列,还有oslo.service等openstack组件,现总结下
- 首先在task_manager_api项目中,创建任务时,create_update_trigger是在数据库celery_period_task表中插入一条数据
- 然后在task_manager项目中scheduler/manager.py
通过添加装饰器@periodic_task.periodic_task产生一个周期任务,spacing表示周期运行任务时间间隔
schedule_task函数实现的是从数据库表中获取数据,然后调celery.send_task函数,发送任务
from oslo_service import periodic_task
@periodic_task.periodic_task(spacing=CONF.scheduler_task_interval)
def schedule_task():
jobs = objects.job.JobList.get_all_with_status(context,
[contants.PENDING,
contants.RUNNING])
for job in jobs:
self.schedule_job(context, job.uuid, job=job)
# self.schedule_job中是self.app.send_task
#send_task可以发送未被注册的异步任务,没有被celery.task装饰的任务
- 大概研究了一下装饰器periodic_task,在oslo_service/periodic_task.py中,
def periodic_task 主要作用是增加一个属性f._periodic_task = True,在_PeriodicTasksMeta元类中,会判断函数是否有这个属性,如果有的话,会加入到_periodic_tasks列表中
class _PeriodicTasksMeta(type):
def _add_periodic_task(cls, task):
....
cls._periodic_tasks.append((name, task))
return True
def __init__(cls, names, bases, dict_):
"""Metaclass that allows us to collect decorated periodic tasks."""
super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
...
for value in cls.__dict__.values():
if getattr(value, '_periodic_task', False):
cls._add_periodic_task(value)
@six.add_metaclass(_PeriodicTasksMeta)
class PeriodicTasks(object):
def run_periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
idle_for = DEFAULT_INTERVAL
for task_name, task in self._periodic_tasks:
....
try:
task(self, context)
- run_periodic_tasks函数是task_manager/service.py
Service类中的start函数中被触发
periodic = loopingcall.FixedIntervalLoopingCall(
self.periodic_tasks)
loopingcall是查看task是否执行完成