首发地址:微信 Mars Android Sample 源码分析
注:为了避免看得懵逼(当然可能是我写得不好~),建议最好先下载代码,运行完看到效果图后,再结合项目代码看“详细分析”部分……
零、前言
Mars 是微信官方开源的跨平台跨业务的终端基础组件,具有高质量网络连接模块(长短连接、智能心跳机制)、高性能日志模块和网络监测组件等。而整个 Android Sample 是基于 Mars 开发的一个 demo,包含提供了以下功能:
- 基于TCP长连接的聊天室体验。
- 数据通信成功率、耗时、流量的展示。
- 网络状况检测结果展示。
先下载 Mars 代码:https://github.com/Tencent/mars
一、本地运行 Server 端
具体如何运行 Server 端,参照官方wiki:Mars Sample 使用说明
二、修改 Android Sample
下面说下Android 端该如何修改源码连接到本地服务器
- 1.全局搜索
marsopen.cn
,修改替换为localhost - 2.在保证
app/build.gradle
下useLocalMarsWrapper = true
的情况下,在 wrapper module 下修改com.tencent.mars.sample.wrapper.service. MarsServiceStub.java
的 dns 解析的地址为本地主机的 IP 地址 (wiki 上没写清楚)
@Override
public String[] onNewDns(String host) {
// No default new dns support
return new String[]{"192.168.24.193"}; //替换为本地主机的 IP 地址
}
三、运行后聊天效果
四、整体概述
由项目结构可知 sample 分为两部分,app 和 wrapper(当然也可以用依赖的方式来使用),数据格式使用的是google开源的 protobuf,它具有高效、数据量小的特性(目前版本为 proto3,Android 开发推荐使用 protobuf-lite 版本,使用方法)
目前 gradle 接入支持两种方式:mars-core 和 mars-wrapper。只是做个 sample 的话建议可以使用 mars-wrapper, 但是如果在实际 App 中使用 mars,建议使用 mars-core 或本地编译。
怎么理解这句话?两种接入有什么不同?
从 sample 中 mars-wrapper(下面简称为 wrapper) 的源码可以看出 wrapper 只是对 mars-core 进行了再次封装,wrapper 库本质还是依赖 mars-core 库的,所以微信团队不建议实际App 中接入 wrapper ( 它只是微信官方提供对 mars-core 使用的一种方式,不过具有很好的参考价值)
- 另外,有 wrapper 的 manifest 可知,IM 服务模块是运行在独立的进程的,所以同时会涉及到 AIDL 多进程通信方面技术。
<application>
<service
android:name=".service.MarsServiceNative"
android:process=":marsservice" />
<!--注册一个网络切换的广播接收器(源码上实现的)-->
<receiver
android:name="com.tencent.mars.BaseEvent$ConnectionReceiver"
android:process=":marsservice" />
</application>
五、详细分析
1. 先从 ConversationActivity 获取初始的会话列表的请求开始
private NanoMarsTaskWrapper<Main.ConversationListRequest, Main.ConversationListResponse> taskGetConvList = null;
private void updateConversationTopics() {
if (taskGetConvList != null) {
MarsServiceProxy.cancel(taskGetConvList);
}
textView.setVisibility(View.INVISIBLE);
progressBar.setVisibility(View.VISIBLE);
swipeRefreshLayout.setRefreshing(true);
taskGetConvList = new NanoMarsTaskWrapper<Main.ConversationListRequest, Main.ConversationListResponse>(
new Main.ConversationListRequest(),
new Main.ConversationListResponse()
) {
private List<Conversation> dataList = new LinkedList<>();
@Override
public void onPreEncode(Main.ConversationListRequest req) {
req.type = conversationFilterType;
req.accessToken = ""; // TODO:
}
@Override
public void onPostDecode(Main.ConversationListResponse response) {
}
@Override
public void onTaskEnd(int errType, int errCode) {
runOnUiThread(new Runnable() {
@Override
public void run() {
if (response != null) {
for (Main.Conversation conv : response.list) {
dataList.add(new Conversation(conv.name, conv.topic, conv.notice));
}
}
if (!dataList.isEmpty()) {
progressBar.setVisibility(View.INVISIBLE);
conversationListAdapter.list.clear();
conversationListAdapter.list.addAll(dataList);
conversationListAdapter.notifyDataSetChanged();
swipeRefreshLayout.setRefreshing(false);
} else {
Log.i(TAG, "getconvlist: empty response list");
progressBar.setVisibility(View.INVISIBLE);
textView.setVisibility(View.VISIBLE);
}
}
});
}
};
MarsServiceProxy.send(taskGetConvList.setHttpRequest(CONVERSATION_HOST, "/mars/getconvlist"));
}
执行步骤:
- 创建一个 NanoMarsTaskWrapper 对象,里面主要包含 onPreEncode,onPostDecode 和onTaskEnd 等方法,分别是编码传输前,接收数据解码后和任务结束后的回调;
- 设置 NanoMarsTaskWrapper 的 http 地址
- 通过 MarsServiceProxy.send 方法执行发送请求;
初步了解执行步骤后,再详细了解 MarServiceProxy 和 NanoMarTaskWrapper 的实现,它们为什么会有这样的功能。
2. NanoMarTaskWrapper
顾名思义,NanoMarTaskWrapper 是一个任务的包装器
public abstract class NanoMarsTaskWrapper<T extends MessageNano, R extends MessageNano> extends AbstractTaskWrapper {
private static final String TAG = "Mars.Sample.NanoMarsTaskWrapper";
protected T request;
protected R response;
public NanoMarsTaskWrapper(T req, R resp) {
super();
this.request = req;
this.response = resp;
}
@Override
public byte[] req2buf() {
try {
onPreEncode(request);
final byte[] flatArray = new byte[request.getSerializedSize()];
final CodedOutputByteBufferNano output = CodedOutputByteBufferNano.newInstance(flatArray);
request.writeTo(output);
Log.d(TAG, "encoded request to buffer, [%s]", MemoryDump.dumpHex(flatArray));
return flatArray;
} catch (Exception e) {
e.printStackTrace();
}
return new byte[0];
}
@Override
public int buf2resp(byte[] buf) {
try {
Log.d(TAG, "decode response buffer, [%s]", MemoryDump.dumpHex(buf));
response = MessageNano.mergeFrom(response, buf);
onPostDecode(response);
return StnLogic.RESP_FAIL_HANDLE_NORMAL;
} catch (Exception e) {
Log.e(TAG, "%s", e);
}
return StnLogic.RESP_FAIL_HANDLE_TASK_END;
}
public abstract void onPreEncode(T request);
public abstract void onPostDecode(R response);
}
- 1)继承自AbstractTaskWrapper
- 2)创建时会同时创建继承自 MessageNano(protobuf 的消息数据类) 的 request 和 response 数据模型;
- 3)处理编解码的两个方法,req2buf 和 buf2resp,也就是字节流数组和 protobuf 对象的转换,涉及到
MessageNano.writeTo
和MessageNano.mergeFrom
的使用,这方面的具体实现不需要我们去了解,Google 的 protobuf 已经帮我们生成代码完成好了
再看看 AbstractTaskWrapper 是怎样实现的
public abstract class AbstractTaskWrapper extends MarsTaskWrapper.Stub {
private Bundle properties = new Bundle();
public AbstractTaskWrapper() {
// Reflects task properties 通过自身(继承该类的类)标注的注解,用反射获取任务的配置信息
final TaskProperty taskProperty = this.getClass().getAnnotation(TaskProperty.class);
if (taskProperty != null) {
setHttpRequest(taskProperty.host(), taskProperty.path());
setShortChannelSupport(taskProperty.shortChannelSupport());
setLongChannelSupport(taskProperty.longChannelSupport());
setCmdID(taskProperty.cmdID());
}
}
@Override
public Bundle getProperties() {
return properties;
}
@Override
public abstract void onTaskEnd(int errType, int errCode);
public AbstractTaskWrapper setHttpRequest(String host, String path) {
properties.putString(MarsTaskProperty.OPTIONS_HOST, ("".equals(host) ? null : host));
properties.putString(MarsTaskProperty.OPTIONS_CGI_PATH, path);
return this;
}
public AbstractTaskWrapper setShortChannelSupport(boolean support) {
properties.putBoolean(MarsTaskProperty.OPTIONS_CHANNEL_SHORT_SUPPORT, support);
return this;
}
public AbstractTaskWrapper setLongChannelSupport(boolean support) {
properties.putBoolean(MarsTaskProperty.OPTIONS_CHANNEL_LONG_SUPPORT, support);
return this;
}
public AbstractTaskWrapper setCmdID(int cmdID) {
properties.putInt(MarsTaskProperty.OPTIONS_CMD_ID, cmdID);
return this;
}
@Override
public String toString() {
return "AbsMarsTask: " + BundleFormat.toString(properties);
}
}
抽象的 AbstractTaskWrapper 继承自 MarTaskWrapper.Stub (MarsTaskWrapper.aidl 生成的代码),它主要通过注解类 TaskProperty 设置了任务的配置信息 properties(如主机名、url路径、是否支持长短链接和指令 id),
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Inherited
public @interface TaskProperty {
String host() default "";
String path();
boolean shortChannelSupport() default true;
boolean longChannelSupport() default false;
int cmdID() default -1;
}
再回到刚开始的发送消息的 MarsServiceProxy 类
3. MarsServiceProxy
public class MarsServiceProxy implements ServiceConnection {
//……
private MarsService service = null;
public static MarsServiceProxy inst;
private LinkedBlockingQueue<MarsTaskWrapper> queue = new LinkedBlockingQueue<>();
private MarsServiceProxy() {
worker = new Worker();
worker.start();
}
public static void init(Context context, Looper looper, String packageName) {
if (inst != null) {
// TODO: Already initialized
return;
}
gContext = context.getApplicationContext();
gPackageName = (packageName == null ? context.getPackageName() : packageName);
gClassName = SERVICE_DEFAULT_CLASSNAME;
inst = new MarsServiceProxy();
}
@Override
public void onServiceConnected(ComponentName componentName, IBinder binder) {
Log.d(TAG, "remote mars service connected");
try {
service = MarsService.Stub.asInterface(binder);
service.registerPushMessageFilter(filter);
service.setAccountInfo(accountInfo.uin, accountInfo.userName);
} catch (Exception e) {
service = null;
}
}
//……
private static class Worker extends Thread {
@Override
public void run() {
while (true) {
inst.continueProcessTaskWrappers();
try {
Thread.sleep(50);
} catch (InterruptedException e) {
//
}
}
}
}
}
MarsServiceProxy 只是一个单例的 ServiceConnection,不过它应该具有代理某种 Service的功能(ServiceConnection会和 Service绑定),由定义的变量和onServiceConnected中的调用可知,它关联了 MarsService,但是 MarsService 只是一个 AIDL ,不是真正的服务,这个稍后再说
MarsServiceProxy 是在SampleApplication 的 onCreate调用的时候初始化的
// NOTE: MarsServiceProxy is for client/caller
// Initialize MarsServiceProxy for local client, can be moved to other place
MarsServiceProxy.init(this, getMainLooper(), null);
当调用send 方法发送任务时,
public static void send(MarsTaskWrapper marsTaskWrapper) {
inst.queue.offer(marsTaskWrapper);
}
queue 是一个 LinkedBlockingQueue<MarsTaskWrapper>线程安全的队列,缓存了所有的MarsTaskWrapper 任务。
那将任务放进队列后,什么时候执行呢?由上面代码可以看到 MarsServiceProxy 创建时会启动一个 Worker 线程,线程会每隔50ms 执行调用inst.continueProcessTaskWrappers();
方法
private void continueProcessTaskWrappers() {
try {
if (service == null) { //服务为 null时,重新开启
Log.d(TAG, "try to bind remote mars service, packageName: %s, className: %s", gPackageName, gClassName);
Intent i = new Intent().setClassName(gPackageName, gClassName);
gContext.startService(i);
if (!gContext.bindService(i, inst, Service.BIND_AUTO_CREATE)) { //注意,此处有个 Service 绑定的判断
Log.e(TAG, "remote mars service bind failed");
}
// Waiting for service connected
return;
}
MarsTaskWrapper taskWrapper = queue.take(); //取出队列中的 MarsTask
if (taskWrapper == null) { //任务为空,跳出方法的执行
// Stop, no more task
return;
}
try {
Log.d(TAG, "sending task = %s", taskWrapper);
final String cgiPath = taskWrapper.getProperties().getString(MarsTaskProperty.OPTIONS_CGI_PATH);
final Integer globalCmdID = GLOBAL_CMD_ID_MAP.get(cgiPath);
if (globalCmdID != null) {
taskWrapper.getProperties().putInt(MarsTaskProperty.OPTIONS_CMD_ID, globalCmdID);
Log.i(TAG, "overwrite cmdID with global cmdID Map: %s -> %d", cgiPath, globalCmdID);
}
final int taskID = service.send(taskWrapper, taskWrapper.getProperties());
// NOTE: Save taskID to taskWrapper here
taskWrapper.getProperties().putInt(MarsTaskProperty.OPTIONS_TASK_ID, taskID);
} catch (Exception e) { // RemoteExceptionHandler
e.printStackTrace();
}
} catch (Exception e) {
//
}
}
这个方法里面会从任务队列取出一个任务,然后最终会通过 MarsService.send 方法发送消息任务,并保存任务 id
另外这个方法开始还有启动服务的判断,由gClassName = SERVICE_DEFAULT_CLASSNAME = "com.tencent.mars.sample.wrapper.service.MarsServiceNative"
知道,启动的服务是MarsServiceNative,那 MarsServiceNative 是怎样和 MarsServiceProxy 关联起来的呢?
再看 MarsServiceProxy 的 onServiceConnected 方法,MarsService 的初始化是通过MarsService.Stub.asInterface(binder)
关联了 MarsServiceProxy 的 IBinder
@Override
public void onServiceConnected(ComponentName componentName, IBinder binder) {
Log.d(TAG, "remote mars service connected");
try {
service = MarsService.Stub.asInterface(binder);
service.registerPushMessageFilter(filter);
service.setAccountInfo(accountInfo.uin, accountInfo.userName);
} catch (Exception e) {
service = null;
}
}
那哪个类实现了 MarsService 的方法呢?查看 MarsServiceNative 源码就能明白了,它同时实现 MarsService,间接和 MarsServiceProxy 形成强关联
public class MarsServiceNative extends Service implements MarsService {
private static final String TAG = "Mars.Sample.MarsServiceNative";
private MarsServiceStub stub;
//mars服务配置工厂
private static MarsServiceProfileFactory gFactory = new MarsServiceProfileFactory() {
@Override
public MarsServiceProfile createMarsServiceProfile() {
return new DebugMarsServiceProfile();
}
};
//……
@Override
public IBinder asBinder() {
return stub;
}
/**
* 创建运行 Mars 的服务时就初始化 STN (建议在程序启动时或者使用网络之前调用)
*/
@Override
public void onCreate() {
super.onCreate();
final MarsServiceProfile profile = gFactory.createMarsServiceProfile();
stub = new MarsServiceStub(this, profile);
// set callback
AppLogic.setCallBack(stub);
StnLogic.setCallBack(stub);
SdtLogic.setCallBack(stub);
// Initialize the Mars PlatformComm
Mars.init(getApplicationContext(), new Handler(Looper.getMainLooper()));
// Initialize the Mars
StnLogic.setLonglinkSvrAddr(profile.longLinkHost(), profile.longLinkPorts());
StnLogic.setShortlinkSvrAddr(profile.shortLinkPort());
StnLogic.setClientVersion(profile.productID());
Mars.onCreate(true);
StnLogic.makesureLongLinkConnected();
//
Log.d(TAG, "mars service native created");
}
@Override
public IBinder onBind(Intent intent) {
return stub;
}
}
MarsServiceNative 是使用 mars-core 的关键类,同时它也是一个 Service 类,运行在独立进程中,主要负责了以下功能:
- 创建配置信息类 MarsServiceProfile,并在 StnLogic 设置相关信息;
- 实例化一个 MarsServiceStub;
- 设置了 AppLogic, StnLogic, SdtLogic 的回调;
- 初始化 Mars;
- 确定 StnLogic 的长连接
StnLogic.makesureLongLinkConnected();
既然 MarsServiceNative 设置了 AppLogic, StnLogic, SdtLogic 的回调,那再看看 MarsServiceStub 是如何实现它们接口的
4. MarsServiceStub
先看发送消息的 send 方法
public class MarsServiceStub extends MarsService.Stub implements StnLogic.ICallBack, SdtLogic.ICallBack, AppLogic.ICallBack {
private static final String TAG = "Mars.Sample.MarsServiceStub";
private final MarsServiceProfile profile;
private AppLogic.AccountInfo accountInfo = new AppLogic.AccountInfo();
//……
private ConcurrentLinkedQueue<MarsPushMessageFilter> filters = new ConcurrentLinkedQueue<>();
private int clientVersion = 200;
public MarsServiceStub(Context context, MarsServiceProfile profile) {
this.context = context;
this.profile = profile;
}
private static final int FIXED_HEADER_SKIP = 4 + 2 + 2 + 4 + 4;
private static Map<Integer, MarsTaskWrapper> TASK_ID_TO_WRAPPER = new ConcurrentHashMap<>();
@Override
public int send(final MarsTaskWrapper taskWrapper, Bundle taskProperties) throws RemoteException {
final StnLogic.Task _task = new StnLogic.Task(StnLogic.Task.EShort, 0, "", null); //初始化为
// Set host & cgi path
final String host = taskProperties.getString(MarsTaskProperty.OPTIONS_HOST);
final String cgiPath = taskProperties.getString(MarsTaskProperty.OPTIONS_CGI_PATH);
_task.shortLinkHostList = new ArrayList<>();
_task.shortLinkHostList.add(host);
_task.cgi = cgiPath;
final boolean shortSupport = taskProperties.getBoolean(MarsTaskProperty.OPTIONS_CHANNEL_SHORT_SUPPORT, true);
final boolean longSupport = taskProperties.getBoolean(MarsTaskProperty.OPTIONS_CHANNEL_LONG_SUPPORT, false);
if (shortSupport && longSupport) {
_task.channelSelect = StnLogic.Task.EBoth;
} else if (shortSupport) {
_task.channelSelect = StnLogic.Task.EShort;
} else if (longSupport) {
_task.channelSelect = StnLogic.Task.ELong;
} else {
Log.e(TAG, "invalid channel strategy");
throw new RemoteException("Invalid Channel Strategy");
}
// Set cmdID if necessary
int cmdID = taskProperties.getInt(MarsTaskProperty.OPTIONS_CMD_ID, -1);
if (cmdID != -1) {
_task.cmdID = cmdID;
}
TASK_ID_TO_WRAPPER.put(_task.taskID, taskWrapper);
// Send 发送任务
Log.i(TAG, "now start task with id %d", _task.taskID);
StnLogic.startTask(_task);
if (StnLogic.hasTask(_task.taskID)) {
Log.i(TAG, "stn task started with id %d", _task.taskID);
} else {
Log.e(TAG, "stn task start failed with id %d", _task.taskID);
}
return _task.taskID;
}
//……
}
- 1)创建一个 StnLogic.Task,并通过 Bundle 传过来的数据(主机名、路径、长短连接和 cmdId),设置 task;
- 2)保存 taskID 和 MarsTaskWrapper 的映射关系;
- 3)调用 StnLogic.startTask(_task) 启动任务执行,最后返回 taskID;
具体的逻辑实现还是在 mars-core 和底层的 C++ 源码中,这个以后研究到底层源码再说。
在 mars-core 大概看下 StnLogic.ICallBack 接口有哪些方法:
/**
* Created by caoshaokun on 16/2/1.
*
* APP使用信令通道必须实现该接口
* 接口用于信令通道处理完后回调上层
*/
public interface ICallBack {
/**
* SDK要求上层做认证操作(可能新发起一个AUTH CGI)
* @return
*/
boolean makesureAuthed();
/**
* SDK要求上层做域名解析.上层可以实现传统DNS解析,或者自己实现的域名/IP映射
* @param host
* @return
*/
String[] onNewDns(final String host);
/**
* 收到SVR PUSH下来的消息
* @param cmdid
* @param data
*/
void onPush(final int cmdid, final byte[] data);
/**
* SDK要求上层对TASK组包
* @param taskID 任务标识
* @param userContext
* @param reqBuffer 组包的BUFFER
* @param errCode 组包的错误码
* @return
*/
boolean req2Buf(final int taskID, Object userContext, ByteArrayOutputStream reqBuffer, int[] errCode, int channelSelect);
/**
* SDK要求上层对TASK解包
* @param taskID 任务标识
* @param userContext
* @param respBuffer 要解包的BUFFER
* @param errCode 解包的错误码
* @return int
*/
int buf2Resp(final int taskID, Object userContext, final byte[] respBuffer, int[] errCode, int channelSelect);
/**
* 任务结束回调
* @param taskID 任务标识
* @param userContext
* @param errType 错误类型
* @param errCode 错误码
* @return
*/
int onTaskEnd(final int taskID, Object userContext, final int errType, final int errCode);
/**
* 流量统计
* @param send
* @param recv
*/
void trafficData(final int send, final int recv);
/**
* 连接状态通知
* @param status 综合状态,即长连+短连的状态
* @param longlinkstatus 仅长连的状态
*/
void reportConnectInfo(int status, int longlinkstatus);
/**
* SDK要求上层生成长链接数据校验包,在长链接连接上之后使用,用于验证SVR身份
* @param identifyReqBuf 校验包数据内容
* @param hashCodeBuffer 校验包的HASH
* @param reqRespCmdID 数据校验的CMD ID
* @return ECHECK_NOW(需要校验), ECHECK_NEVER(不校验), ECHECK_NEXT(下一次再询问)
*/
int getLongLinkIdentifyCheckBuffer(ByteArrayOutputStream identifyReqBuf, ByteArrayOutputStream hashCodeBuffer, int[] reqRespCmdID);
/**
* SDK要求上层解连接校验回包.
* @param buffer SVR回复的连接校验包
* @param hashCodeBuffer CLIENT请求的连接校验包的HASH值
* @return
*/
boolean onLongLinkIdentifyResp(final byte[] buffer, final byte[] hashCodeBuffer);
/**
* 请求做sync
*/
void requestDoSync();
String[] requestNetCheckShortLinkHosts();
/**
* 是否登录
* @return true 登录 false 未登录
*/
boolean isLogoned();
void reportTaskProfile(String taskString);
}
总结一下:
1.NanoMarsTaskWrapper:涉及编码前、解码后和任务结束后的回调,还有字节流数组和 protobuf 对象的转换等;
2.MarsServiceProxy:是一个 ServiceConnection,涉及消息的发送和取消等,作为一个 API 的功能,本质是 MarsServiceNative 代理服务类;
3.MarsServiceNative 是 wrapper 的核心类,里面涉及 Mars 的初始化,设置了 AppLogic, StnLogic 和 SdtLogic 的回调等;
4.而 MarsServiceStub 实现了AppLogic, StnLogic 和 SdtLogic 的回调接口,维护了和 mars-core 的调用等;
其余分析,可以到我 fork 的 mars 的地址GitHub - navyifanr/mars: (添加注释分析)下载。
参考资料:
GitHub - Tencent/mars: Mars is a cross-platform network component developed by WeChat.
微信开源mars源码分析1—上层samples分析 - ameise_w - SegmentFault