手机上形形色色的app会给我们推送各种消息,那么一条消息的推送是如何实现的呢?下面让我从某个app的python后端开发的角度来解析一下。
一、背景
- 推送的消息包括两大类:运营人员手动编辑、推送的公告、活动等,与用户行为(比如交易)相关的通知,这部分的消息是在代码执行过程中自动生成、推送。
- 涉及到三个服务器:A是生产服务器,部署Django,负责与app交互,采用前后端分离开发,能拿到用户数据;B是公共接口服务器,部署的Flask,拿不到用户数据,但储存消息数据;C是提供给运营人员使用的服务器,部署Flask,通过指令访问B的数据库。
- 在具体的消息推送步骤上采用第三方的小米推送服务
二、 小米推送的实现
小米推送服务的具体文档可以自行查阅
- 它定义了一套完整的消息格式,同时也支持“透传”模式,这就方便开发者自己约定消息的内容,我们这次就采用透传;
- 它同时支持渠道订阅和指定alias两种推送方式,前者一对多、后者一对一,两者的接口不一样;
- 在开发前需要先注册app的包名,拿到secret存储起来,执行推送时需要使用与包名对应的secret,否则不会成功;
- IOS与Android的参数是不一样;
假设包名为com.theapp.ios、com.theapp.android,topic表示订阅主题,msg表示消息内容,username用来生成alias(别名),secret配置在字典MIPUSH_SECRETS中,则主要的实现代码如下
import hashlib
import json
import requests
def mipush(pkg, msg, username=None, topic=None):
secret = MIPUSH_SECRETS[pkg]
if 'ios' in pkg:
data = {
'extra.content-available': 1,
'extra.payload': json.dumps(msg),
'time_to_live': 3600 * 1000,
}
else:
data = {
'pass_through': 1,
'payload': json.dumps(msg),
'time_to_live': 3600 * 1000,
}
if username:
api_url = "https://api.xmpush.xiaomi.com/v2/message/alias"
user_code = hashlib.md5(username.encode('utf-8')).hexdigest()
data['alias'] = user_code
else:
api_url = "https://api.xmpush.xiaomi.com/v2/message/topic"
data['topic'] = topic
resp = requests.post(api_url, headers={"Authorization": "key=%s" % secret}, data=data)
return resp
三、消息模型
建立消息的数据模型时需要考虑的问题有:
- (1. 与app约定好透传消息的内容格式
- (2. 考虑内容格式与app版本的解耦性
- (3. H5页面中消息列表及内容的展示
- (4. 运营消息与用户消息推送流程的差异以及消息存储与未读处理的取舍
针对(1)和(2),我们已约定公共消息体加上特定类别的方法来处理,即消息体中包括所有可能用到的字段,然后再消息体外部再给出一个消息类别的字段,用来和app约定弹窗中的格式;当app更新时,如果不想支持以前的或者想识别新的消息类别,就可以依据该字段做过滤处理。
同时针对(3)所展示都是消息体的具体内容,可以一并塞进去,但同时要通过另一个字段来与H5约定消息内容的展示种类。
得到消息体模型如下:
from mongoengine import *
class MsgBody(EmbeddedDocument):
title = StringField(required=True) # 标题
content = StringField(required=True) # 内容
img = StringField() # 图片
url= StringField() # 点击跳转
remark = DictField() # 其他扩展
针对(4),有两种方案:
一、对两个业务流程定义不同数据模型,各自开发一套接口,推送和存储都是独立的。好处在于运营消息就可以与用户的id解耦,一条运营消息可以对应所有的用户,而用户消息则只能对应的用户相关;推送时,一个采用订阅方式,一个采用别名方式。不利之处则是,不太方便处理运营消息的未读状态。
PS:此处的推送订阅也是考虑过解耦的,也就是app运行后,自动向推送服务器订阅开发者定义好的主题。
当用户不想弹窗,关闭推送时,并没有在推送服务器上取消订阅,只是在app上修改了标记;
当app接收到推送,发现标记为关闭时,不进行弹窗操作,对用户没影响,但实际也推送了。
这样想更换第三方推送服务时,比较方便。
二、两者数据模型统一,只是通过字段区别,公共接口只需要开发一套,都是用别名推送方式。好处在于节省代码(或者工作量),实现了兼容性,运营消息也可以统计未读了。不好的地方在于,一条运营消息可能需要为每个用户存一条记录,当用户量大的时候,这个开销有点大。
方案一的实现如下:
class OperateMsg(DynamicDocument):
meta = {'db_alias': 'test',
'indexes': [略]}
msg_tag = StringField(required=True) # 与H5约定的标签
msg_body = EmbeddedDocumentField(MsgBody)
msg_type = StringField(required=True) # 与app约定的消息类别,app根据此字段来决定是否识别消息、从msg_body 读取哪些字段
enable = BooleanField(default=False) # 用户可见
class UserMsg(DynamicDocument):
meta = {'db_alias': 'test',
'indexes': [略]}
user_id = IntField(required=True) # 用户id
msg_tag = StringField(required=True)
msg_type = StringField(required=True)
msg_body = EmbeddedDocumentField(MsgBody)
unread = BooleanField(default=True) # 未读标记
方案二的实现如下:
class Msg(DynamicDocument):
meta = {'db_alias': 'test',
'indexes': [略]}
user_id = IntField(required=True) # 用户id
msg_tag = StringField(required=True) # 与H5约定的标签
msg_body = EmbeddedDocumentField(MsgBody)
msg_type = StringField(required=True) # 与app约定的消息类别,app根据此字段来决定是否识别消息、从msg_body 读取哪些字段
enable = BooleanField(default=False) # 用户可见
unread = BooleanField(default=True) # 未读标记
最终我决定使用方案一
四、接口设计
约定A、B、C的映射地址为分别为https://test_A.com、https://test_B.com、https://test_C.com
为了A和C能访问到B的公共接口,在B上的接口要进行允许跨域访问的处理
1. server B上的公共接口
1) 运营消息
主要是需要提供创建、查询、更新、发布四个操作接口来支持运营的发布流程(创建消息、查询检查一下、更新enable=True上线、执行推送);其次,还要考虑提供给H5页面中的消息列表和概况展示的需求。
接口设计如下,具体的业务实现、接口访问的安全性处理以及允许跨域访问的处理就不给出了:
import flask
oms_api = flask.blueprints.Blueprint('oms_api', __name__)
@oms_api.route('/create', methods=['POST'])
def oms_create():
pass
@oms_api.route('/update', methods=['POST'])
def oms_update():
pass
@oms_api.route('/query', methods=['POST'])
def oms_query():
pass
@oms_api.route('/push', methods=['POST'])
def oms_push():
# mipush(pkg, msg, topic) # 订阅推送
pass
@oms_api.route('/list', methods=['POST'])
def oms_list():
pass
@oms_api.route('/survey', methods=['POST'])
def oms_survey():
pass
2)用户消息
由于用户消息是程序创建和推送,所以二者应该合并为一个接口,同时还要支持H5未读查询、已读标记、概况和列表展示需求。
import flask
ums_api= flask.blueprints.Blueprint('ums_api', __name__)
@ums_api.route('/new', methods=['POST'])
def create_and_push_ums():
# mipush(pkg, msg, username): # 别名推送
pass
@ums_api.route('/read', methods=['POST'])
def read_ums():
pass
@ums_api.route('/unread_count', methods=['POST'])
def unread_count():
pass
@ums_api.route('/survey', methods=['POST'])
def ums_survey():
pass
@pri_ums_api.route('/list', methods=['POST'])
def ums_list():
pass
假设消息的模块名为message,上述接口代码在message/views/api.py中实现,为了外部能访问到这些接口,还需要在Flask项目根目录下的app.py中进行蓝图(blueprint)的注册才算是完成了路由地址的映射(即接口的url地址)
from message.views.api import oms_api, ums_api
app.register_blueprint(oms_api, url_prefix='/oms')
app.register_blueprint(ums_api, url_prefix='/ums')
这样运营消息接口的url地址就是https://test_B.com/oms/ **
相应用户消息接口的url地址就是https://test_B.com/ums/ **
2. server C上的操作
也就是给运营人员提供创建、更新、查询、发布四个操作的接口,不过需要用到request来进行网络请求,以创建为例:
import request
def create_oms(**data):
resp = requests.post('https://test_B.com/oms/create', json=data)
3. server A
在A上的接口分两种,一种是与H5交互的接口,包括通过mst_tag查询具体消息列表内容、查询整体消息概况(oms+ums)、查询ums未读、标记ums已读;还有一种接口就是ums的创建推送接口,这个只需要封装成功能函数提供调用即可,有必要的话需要异步处理(比如作为生产者丢给AWS去消费)。
这些接口都是和C上的一样,首先封装一下对B上公共接口的调用,同时自身接口提供调用。
不过由于server A上部署的是Django,接口url的路由实现就与Flask的实现不太一样,以未读消息统计接口为例吧(视觉效果就是逼死处女座的那个图标右上角的未读小红点或者统计数字)
同样假设模块名为message,接口在message/views/api.py中实现,未读统计接口的名称为ums_unread_count_api。
首先在message目录下的api_urls.py中进行映射
from django.conf.urls import url
import message.views.api
urlpatterns = [
url(r'unread_count$', message.views.api.ums_unread_count_api),
# To Add
]
然后在Django根目录下的urls.py中进行操作
from django.conf.urls import include, url
urlpatterns = [
url(r'^api/msg/', include('message.api_urls')),
# To Add
]
就可以通过完整的接口地址 https://test_A.com/api/msg/unread_count 来获取数据了