最近突然对安卓消息推送的原理感兴趣,找了不少资料,实现了一个包括服务端和客户端的简单Demo。
在具体实现的时候踩了不少坑,这里做一下笔记,防止以后忘记。
安卓消息推送的实现方案有下面几种:
- MQTT协议实现
- XMPP协议实现
- C2DM云端推送功能(google官方提供,系统内置,但是国内用不了......)
- 中国统一推送(工信部牵头成立,但是目前只是开了几次会议,并没有什么实际的接口出来,不过以后应该会是中国境内的首选方案)
我这里选择了MQTT协议去实现。
MQTT协议
MQTT是一个客户端服务端架构的发布/订阅模式的消息传输协议。它的设计思想是轻巧、开放、简单、规范,因此易于实现。这些特点使得它对很多场景来说都是很好的选择,包括受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT),这些场景要求很小的代码封装或者网络带宽非常昂贵。
本协议运行在TCP/IP,或其它提供了有序、可靠、双向连接的网络连接上。它有以下特点:
使用发布/订阅消息模式,提供了一对多的消息分发和应用之间的解耦。
消息传输不需要知道负载内容。
提供三种等级的服务质量:.
“最多一次”,尽操作环境所能提供的最大努力分发消息。消息可能会丢失。例如,这个等级可用于环境传感器数据,单次的数据丢失没关系,因为不久之后会再次发送。
“至少一次”,保证消息可以到达,但是可能会重复。
“仅一次”,保证消息只到达一次。例如,这个等级可用在一个计费系统中,这里如果消息重复或丢失会导致不正确的收费。
很小的传输消耗和协议数据交换,最大限度减少网络流量
异常连接断开发生时,能通知到相关各方。
上面这一段话是从网友翻译的MQTT中文文档直接复制的。有兴趣的同学可以直接访问MQTT协议中文版查看具体的协议细节。
MQTT原理
MQTT协议原理的原理简单说来就是客户端与服务端通过心跳包来保持连接。客户接收端向服务端订阅消息,客户发布端向服务端发布消息。服务端再将消息分发给订阅了该消息的客户接收端。
原理图如下:
实现库的选择
因为MQTT协议中文版上面已经有了整个QMTT的协议细节,所以理论上如果你够厉害的话,完全可以自己从零开始实现服务端和客户端。
但是从实际项目中,我还是倾向选择官方提供或者第三方开源的项目直接使用。
其实官方已经给我们提供了一些推荐实现:
https://github.com/mqtt/mqtt.github.io/wiki/software?id=software
MQTT 服务器搭建
我这边选择使用apache-apollo这个开源的MQTT服务器。
网上有不少的博客都有讲它的配置方法的,但是我按着做之后都出现了一些问题。
1、安装jdk
首先需要去安装jdk:
sudo apt-get install default-jdk
2、下载apache-apollo
然后到官网下载最新的软件。我这边使用的是腾讯云的ubuntu服务器,所以就下载了linux的版本。
下载完之后解压到/opt目录下(其实任意目录均可,只不过我用linux习惯放这里):
/opt/apache-apollo-1.7.1
3、创建项目
然后进入任意目录使用下面命令创建一个项目(官方管它叫broker):
/opt/apache-apollo-1.7.1/bin/apollo create mybroker
它会在当前目录创建一个mybroker目录,里面就是你的项目。
4、编辑admin ip配置
可以编辑mybroker/etc/apollo.xml进行一些配置。
admin后台会默认被绑定到127.0.0.1,这样你是不能通过你电脑的浏览器去访问服务器的admin后台的:
<web_admin bind="http://127.0.0.1:61680"/>
<web_admin bind="https://127.0.0.1:61681"/>
我们将它改成0.0.0.0:
<web_admin bind="http://0.0.0.0:61680"/>
<web_admin bind="https://0.0.0.0:61681"/>
注意这里的61680和61681端口,之后需要访问该端口去登陆admin后台
5、启动MQTT服务
你可以进到mybroker/bin/目录中使用下面两种方式中的一种去启动服务:
- 当前进程阻塞启动:
./apollo-broker run
- 启动后台服务:
./apollo-broker-service start
然后你就可以在你的电脑打开浏览器输入网址访问MQTT后台了:
- 如果你的服务是跑在阿里云、腾讯云这样的服务器上:
http://服务器ip:61680
- 如果你的服务就是跑在你自己的电脑上:
http://0.0.0.0:61680
它的需要输入账号密码才能登陆。默认账号是admin、密码是password
Python paho-mqtt
我们可以用python写一个客户端去验证搭建的mqtt服务器是否可用。
首先需要下载paho-mqtt:
pip install paho-mqtt
Python paho-mqtt 简单收发端Demo
接收端代码:
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("topic/test")
def on_message(client, userdata, msg):
print("on_message "+msg.topic+" "+str(msg.payload))
client = mqtt.Client(client_id="", clean_session=True, userdata=None, protocol=mqtt.MQTTv31, transport="tcp")
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_message = on_message
client.connect("www.islinjw.cn", 61613, 60)
client.loop_forever()
发送端代码:
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client = mqtt.Client(client_id="", clean_session=True, userdata=None, protocol=mqtt.MQTTv31, transport="tcp")
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.connect("www.islinjw.cn", 61613, 60)
client.publish("topic/test", "hello world")
我们先运行接收端,再运行发送端,就可以在接收端看到"hello world"的打印
常见错误
这里需要注意的是一定要配置协议版本为MQTTv31,网上的demo代码都没有配置,没有的话python这就会报错:
[Errno 104] Connection reset by peer
服务器则会报空指针,我们可以在mybroker/log/stacktrace.log中看到:
java.lang.NullPointerException
at org.apache.activemq.apollo.mqtt.MqttProtocolHandler.on_mqtt_connect(MqttProtocolHandler.java:443)
at org.apache.activemq.apollo.mqtt.MqttProtocolHandler$9.call(MqttProtocolHandler.java:410)
at org.apache.activemq.apollo.util.UnitFn1.apply(Scala2JavaHelper.scala:41)
at org.apache.activemq.apollo.mqtt.MqttProtocolHandler.on_transport_command(MqttProtocolHandler.java:377)
at org.apache.activemq.apollo.broker.BrokerConnection.on_transport_command(Connection.scala:144)
at org.apache.activemq.apollo.broker.Connection$$anon$1.onTransportCommand(Connection.scala:71)
at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:709)
at org.fusesource.hawtdispatch.transport.TcpTransport$9.run(TcpTransport.java:770)
at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
而如果没有设置账号密码的话就收到result code 4:
Connected with result code 4
我们可以从官方文档看到result code 4代表用户名或者密码错误:
0: Connection successful
1: Connection refused - incorrect protocol version
2: Connection refused - invalid client identifier
3: Connection refused - server unavailable
4: Connection refused - bad username or password
5: Connection refused - not authorised 6-255: Currently unused.
权限配置
我们设想一下,如果没有账户系统,那么只要知道服务器的ip和端口,就能随便发送消息了,这样谁都能给你的应用推送消息,十分危险。
所以mqtt是需要用账户密码去建权的,有些账户只能发送,有些账户只能接收,而有些账户全部都能做。
创建用户
我们可以编辑mybroker/etc/users.properties添加user1和user2:
admin=password
user1=123456
user2=654321
等号的左边是账户,右边是密码。所以我们也能在这里改admin的密码
创建用户组
创建完用用户,我们还需要编辑mybroker/etc/groups.properties给用户指定用户组:
admins=admin
groupsend=user1
grouprecv=user2
# 还可以用下面的方式将多个用户指定到一个用户组
# groupdemo = user1|user2
设置用户组权限
最后我们就能在mybroker/etc/apollo.xml设置用户组权限了:
<access_rule allow="admins" action="*"/>
<access_rule allow="*" action="connect" kind="connector"/>
<access_rule allow="groupsend" action="connect create send"/>
<access_rule allow="grouprecv" action="connect receive"/>
可以给一个用户组设置多个权限,多个权限之间用空格分割。从官方文档可以看到权限有下面的类别:
- admin : use of the administrative web interface
- monitor : read only use of the administrative web interface
- config : use of the administrative web interface to access and change the broker configuration.
- connect : allows connections to the connector or virtual host
- create : allows creation
- destroy : allows destruction
- send : allows the user to send to the destination
- receive : allows the user to send to do non-destructive reads from the destination
- consume : allows the user to do destructive reads against a destination
- * : All actions
配置好之后我们的user1就只能发送消息,user2就只能接收消息了。
安卓端实现
官方推荐的qatja-android我看了一下,它的实现太挫了,所以在github上搜索了下,发现了个不错的库mqtt-client。
添加依赖:
compile 'org.fusesource.mqtt-client:mqtt-client:1.14'
因为代码比较简单,所以我就直接贴上来了:
public class MainActivity extends AppCompatActivity {
public static final String TAG = "MainActivity";
public static final String TOPIC = "topic/test";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
try {
testMqtt();
} catch (URISyntaxException e) {
Log.e(TAG, "testMqtt failed", e);
}
}
private void testMqtt() throws URISyntaxException {
MQTT mqtt = new MQTT();
mqtt.setHost("www.islinjw.cn", 61613);
mqtt.setVersion("3.1");
mqtt.setUserName("admin");
mqtt.setPassword("password");
final CallbackConnection connection = mqtt.callbackConnection();
//设置监听
connection.listener(new ExtendedListener() {
@Override
public void onPublish(UTF8Buffer topic, Buffer body, Callback<Callback<Void>> ack) {
Log.d(TAG, "onPublish " + topic.toString() + " " + body.toString());
NotificationManager notifyManager
= (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE);
NotificationCompat.Builder builder = new NotificationCompat
.Builder(MainActivity.this)
.setSmallIcon(R.mipmap.ic_launcher)
.setContentTitle(topic.toString())
.setContentText(body.ascii().toString());
notifyManager.notify(1, builder.build());
}
@Override
public void onConnected() {
Log.d(TAG, "onConnected");
}
@Override
public void onDisconnected() {
Log.d(TAG, "onDisconnected");
}
@Override
public void onPublish(UTF8Buffer topic, Buffer body, Runnable ack) {
Log.d(TAG, "onPublish " + topic.toString() + " " + body);
ack.run();
}
@Override
public void onFailure(Throwable value) {
Log.d(TAG, "onFailure");
}
});
//连接服务器
connection.connect(new Callback<Void>() {
public void onFailure(Throwable value) {
Log.d(TAG, "connect failure");
}
public void onSuccess(Void v) {
//订阅消息
Topic[] topics = {new Topic(TOPIC, QoS.AT_LEAST_ONCE)};
connection.subscribe(topics, new Callback<byte[]>() {
public void onSuccess(byte[] qoses) {
Log.d(TAG, "subscribe success");
}
public void onFailure(Throwable value) {
Log.e(TAG, "subscribe failure", value);
connection.disconnect(null); //断开连接
}
});
//发布一个消息
byte[] payload = "hello world".getBytes();
connection.publish(TOPIC, payload, QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
public void onSuccess(Void v) {
Log.d(TAG, "publish success");
}
public void onFailure(Throwable value) {
Log.e(TAG, "publish failure", value);
connection.disconnect(null); //断开连接
}
});
// //断开连接
// connection.disconnect(new Callback<Void>() {
// public void onSuccess(Void v) {
// Log.d(TAG, "disconnect success");
// }
//
// public void onFailure(Throwable value) {
// Log.d(TAG, "disconnect failure");
// // disconnects是不会失败的,也就是这里永远不会被调到
// }
// });
}
});
}
}
然后我们就可以用发送端的py脚本将消息推送给安卓客户端了。
主题名和主题通配符
发布的消息都有一个主题名,例如我们之前作为例子的"topic/test"。客户端向服务端订阅感兴趣的主题。当有某主题的消息被发布的时候,服务端就会将消息分发给订阅了该主题的客户端。
主题名可以用分割符"/"如果存在的话就会将主题分割为多个主题层级。
有了主题层级的概念之后我们就可以用主题通配符去过滤不同的主题。
我这里只做简单介绍,详细的信息可以参考文档
多层通配符
数字标志("#")是用于匹配主题中任意层级的通配符。多层通配符表示它的父级和任意数量的子层级。多层通配符必须位于它自己的层级或者跟在主题层级分隔符后面。不管哪种情况,它都必须是主题过滤器的最后一个字符。
例如,如果客户端订阅主题 "sport/tennis/player1/#",它会收到使用下列主题名发布的消息:
- "sport/tennis/player1"
- "sport/tennis/player1/ranking"
- "sport/tennis/player1/score/wimbledon"
- "sport/#"也匹配单独的 "sport" ,因为 # 包括它的父级。
- "#"也是有效的,会收到所有的应用消息。
多层通配符用法举例:
- "sport/tennis/#"也是有效的。
- "sport/tennis#"是无效的。
- "sport/tennis/#/ranking"是无效的。
单层通配符
加号 ("+") 是只能用于单个主题层级匹配的通配符。在主题过滤器的任意层级都可以使用单层通配符,包括第一个和最后一个层级。然而它必须占据过滤器的整个层级。可以在主题过滤器中的多个层级中使用它,也可以和多层通配符一起使用。
例如, "sport/tennis/+" 匹配 "sport/tennis/player1" 和 "sport/tennis/player2",但是不匹配 "sport/tennis/player1/ranking" 。同时,由于单层通配符只能匹配一个层级, "sport/+" 不匹配 "sport" 但是却匹配 "sport/"。
单层通配符的一些使用情况:
- "+" 是有效的。
- "+/tennis/#" 是有效的。
- "sport+" 是无效的。
- "sport/+/player1" 也是有效的。
- "/finance" 匹配 "+/+" 和 "/+" ,但是不匹配 "+"。
开始通配符
美元符号("$") 用于匹配起始主题,如"$SYS/" 被广泛用作包含服务器特定信息或控制接口的主题的前缀。
开始通配符的一些用法:
- 订阅 "#" 的客户端不会收到任何发布到以 "$" 开头主题的消息。
- 订阅 "+/monitor/Clients" 的客户端不会收到任何发布到 "$SYS/monitor/Clients" 的消息。
- 订阅 "$SYS/#" 的客户端会收到发布到以 “$SYS/” 开头主题的消息。
- 订阅 "$SYS/monitor/+" 的客户端会收到发布到 "$SYS/monitor/Clients" 主题的消息。
- 如果客户端想同时接受以 "$SYS/" 开头主题的消息和不以 "$" 开头主题的消息,它需要同时订阅 "#" 和 "$SYS/#"。