最近项目中要实现消息推送功能,用推送消息的方式在商品下单的时候通知货源发布者,后端实现是 ==python== ,框架是 ==flask== 。
闲话滞后,先直接按步骤实现吧!
参考:
Celery and the Flask Application Factory: (Celery and the Flask Application Factory)
- 安装 ==Celery== 框架:
pip install celery
- 在项目的 ==app== 应用的 ==init== 中初始化 ==celery==:
from celery import Celery
from app.config import base
celery = Celery(__name__, broker=base.CELERY_BROKER_URL)
- ==celery==的配置,在 ==base== 里面:
# celery
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL') or 'redis://:@127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND') or 'redis://:@127.0.0.1:6379/0'
使用的是 ==redis== 的缓存。
- 由于 ==celery== 执行需要用到应用程序 ==app== 的上下文,所以上面我们的初始化会无法执行具体的操作,需要我们在启动 ==celery== 之后将 ==app.app_context().push()==。
import os
from app import celery, create_app
app = create_app(os.getenv('FLASK_CONFIG') or 'default')
app.app_context().push()
- 启动 ==celery== :
celery worker -A celery_worker.celery --loglevel=info # 启动 celery -A celery应用 --loglevel=info 显示运行日
- 那么,==celery== 的具体工作是怎么执行的呢?不要着急,下面就来开始:
直接上代码:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from flask import request, current_app
from app.myVerify import getargs
from app.common import success, error
from app import celery
import urllib
import httplib
import time
# 发送消息通知
@celery.task
def send_notifications(channel, content):
with current_app.app_context():
msg = {'appkey': current_app.config['COMMON_KEY'],
'channel': channel,
'content': content
}
requrl = current_app.config['REQURL']
test_data_urlencode = urllib.urlencode(msg)
header_data = {"Host": current_app.config['REQURL_HOST']}
conn = httplib.HTTPConnection(current_app.config['REQURL_HOST'])
conn.request(method="POST", url=requrl, body=test_data_urlencode, headers=header_data)
response = conn.getresponse()
res = response.read()
print ('发送推送消息成功')
先从上面步骤2中的初始化引入 ==celery==
然后在具体的方法中添加 ==@celery.task==