一篇文章让您了解MQTT

什么是MQTT

​ MQTT是基于二进制消息的发布/订阅编程模式的消息协议,最早由IBM提出的,如今已经成为OASIS规范。由于规范很简单,非常适合需要低功耗和网络带宽有限的IoT场景。

MQTT使用场景

与XMPP相比有什么特点

​ 同MQTT类似的是XMPP协议,他们的特点点见下表:

MQTT XMPP
基于协议层 TCP TCP,也可以基于HTTP
体积 小巧 庞大
适用场景 物联网 聊天
省流 省流量 费流量
省电 省电 费电
成熟度 不成熟 成熟

发布/订阅模式

与请求/回答这种同步模式不同,发布/订阅模式解耦了发布消息的客户(发布者)与订阅消息的客户(订阅者)之间的关系,这意味着发布者和订阅者之间并不需要直接建立联系。打个比方,你打电话给朋友,一直要等到朋友接电话了才能够开始交流,是一个典型的同步请求/回答的场景;而给一个好友邮件列表发电子邮件就不一样,你发好电子邮件该干嘛干嘛,好友们到有空了去查看邮件就是了,是一个典型的异步发布/订阅的场景。

熟悉编程的同学一定非常熟悉这种设计模式了,因为它带来了这些好处:

  • 发布者与订阅者不必了解彼此,只要认识同一个消息代理即可。

  • 发布者和订阅者不需要交互,发布者无需等待订阅者确认而导致锁定。

  • 发布者和订阅者不需要同时在线,可以自由选择时间来消费消息。

主题

MQTT是通过主题对消息进行分类的,本质上就是一个UTF-8的字符串,不过可以通过反斜杠表示多个层级关系。主题并不需要创建,直接使用就是了。

主题还可以通过通配符进行过滤。其中,+可以过滤一个层级,而#只能出现在主题最后表示过滤任意级别的层级。

举个例子:

  • building-b/floor-5:代表B楼5层的设备。

  • +/floor-5:代表任何一个楼的5层的设备。

  • building-b/#:代表B楼所有的设备。

注意,MQTT允许使用通配符订阅主题,但是并不允许使用通配符广播。

协议介绍

​ MQTT的通信协议并不复杂,最核心的部分,我认为是他的16种控制类型,如下表:

名字 报文流动方向 描述
Reserved 0 禁止 保留
CONNECT 1 客户端到服务端 客户端请求连接服务端
CONNACK 2 服务端到客户端 连接报文确认
PUBLISH 3 两个方向都允许 发布消息
PUBACK 4 两个方向都允许 QoS 1消息发布收到确认
PUBREC 5 两个方向都允许 发布收到(保证交付第一步)
PUBREL 6 两个方向都允许 发布释放(保证交付第二步)
PUBCOMP 7 两个方向都允许 QoS 2消息发布完成(保证交互第三步)
SUBSCRIBE 8 客户端到服务端 客户端订阅请求
SUBACK 9 服务端到客户端 订阅请求报文确认
UNSUBSCRIBE 10 客户端到服务端 客户端取消订阅请求
UNSUBACK 11 服务端到客户端 取消订阅报文确认
PINGREQ 12 客户端到服务端 心跳请求
PINGRESP 13 服务端到客户端 心跳响应
DISCONNECT 14 客户端到服务端 客户端断开连接
Reserved 15 禁止 保留

​ 我看MQTT协议内容的时候,发现有趣的一点就是他的字符长度是可变的,可以用ASCII,也可以用UTF-8,这个在我接触的其他协议里面是没有的,这样子的好处,显而易见的就是能减少传输流量。

什么是QoS

​ Qos的全称是服务质量(Quality of Service)。MQTT支持三种QoS,分别是0、1、2。级别越高,交互越复杂,越能保证正确性和到达率,但是开销也更大。他们的交互流程图如下:

  • QoS 0: 尽力而为。消息发送者会想尽办法发送消息,但是遇到意外并不会重试。
image
  • QoS 1: 至少一次。消息接收者如果没有知会或者知会本身丢失,消息发送者会再次发送以保证消息接收者至少会收到一次,当然可能造成重复消息。
image
  • QoS 2: 恰好一次。保证这种语义肯定会减少并发或者增加延时,不过丢失或者重复消息是不可接受的时候,级别2是最合适的。


    image

​ 服务质量是个老话题了。级别2所提供的不重不丢很多情况下是最理想的,不过往返多次的确认一定对并发和延迟带来影响。级别1提供的至少一次语义在日志处理这种场景下是完全OK的,所以像Kafka这类的系统利用这一特点减少确认从而大大提高了并发。级别0适合鸡肋数据场景,食之无味弃之可惜,就这么着吧。

相关阅读

Paho UML

paho uml.png

说明:

  1. 上面三个类(MqttAndroidClient, MqttService, MqttConnection)是存在于android.paho库中,也就是说,这三个类是paho基于android的封装,而其余的类都是封装在java.paho中,处于底层库。

  2. 中间的纵轴线从上往下,MqttService,MqttConnection, MqttAsyncClient, ClientComms,这四个类都具备connect, publish, subscribe等开放给上层的基础方法,因为上层的调用都是通过这四个类传递到底层。

  3. Paho运行的核心在于CommsCallback, CommsSender和CommsReceiver三个线程,分别用来做触发回调、发送消息和接收消息三个动作。

  4. ClientState和CommsTokenStore维护着一套复杂的队列,三个线程通过使用这两个类,来实现线程间的同步和消息的排队。

  5. ClientComms的功能除了基础方法接口之外,还维护底层三个线程的状态,这些状态主要包括:

    1. connected

    2. connecting

    3. disconnected

    4. disconnecting

    5. closed

    6. resting

  6. MqttAsyncClient的功能除了基础方法接口之外,主要负责连接的重连,通过下面介绍的流程分析可以看出来,mqtt的重连机制就是在MqttAsynClient发起的。

  7. MqttConnection是一个独立的mqtt连接,用于维护当前连接的所有状态。

流程分析

因为整个库的代码量比较大,我这里就不把代码贴出来了,这是我通过阅读代码整理出来的所有关键方法的流程,包括connect, publish 和reconnect,希望能对读者您有帮助。

  • connect()
  1. MqttAndroidClient.bindService()

  2. MqttService.connect()

    1. MqttService.createConnection is not exists
  3. MqttConnection.connect()

    1. if option.isCleanSession(), then discard old data (we set cleanSession true)

    2. callback action connect to activity when start connect fail, result fail or success

    3. if MqttAsyncClient created

      1. if isConnecting, return

      2. if is connected, doAfterConnectSuccess

      3. else MqttAsyncClient.connect

    4. new MqttAsyncClient and connect

  4. MqttAsyncClient.connect()

    1. judge if need connect by ClientComms

      1. if connected, throw exception

      2. if connecting, throw exception

      3. if disconnected, throw exception

      4. if closed, throw exception

    2. ClientComms.setReconnectCallback if options set automaticReconnect

    3. new MqttToken as userToken

    4. new ConnectActionListener and set comms, userToken

  5. ConnectActionListener.connect()

    1. persistence.open

    2. comms.connect

  6. ClientComms.connect()

    1. if not disconnected or is closing, throw exception

    2. tokenStore.open(), just set closeResponse null

    3. ConnectBG.run

  7. ConnectBG.run()

    1. CommsReceiver.start()

      1. runningSemaphore.acquire(), ensure just one receiver thread is running

      2. while in.available && message instance of MqttAck

      3. tokenStore.getToken

        1. if token != null, synchronized token and clientState.notifyReceivedAck

        2. else if instance of MqttPubRec MqttPubComp MqttPubAck, log itthese signal only receive after send

        3. else throw exception

    2. CommsSender.start()

      1. runningSemaphore.acquire(), ensure just one sender thread is running

      2. await clientState.pendingMessages.removeElementAt(0)

      3. out.write(message)

    3. CommsCallback.start()

      1. runningSemaphore.acquire(), ensure just one callback thread is running

      2. workAvailable.wait()

      3. check if exists complete token, if true then handleActionComplete

        1. if token.isComplete, notifyComplete

        2. token.internalTok.notifyComplete

        3. if token.isNotified

          1. if mqttCallback != null and is DeliveryToken, notify deliveryComplete

          2. if token.getActionCallback != null, notify onSuccess or onFail

        4. token.setNotifiyed(true)

      4. check if exists MqttPublish, if true then handleMessage(MqttPublish)this happens when register topic, not exists in DataCollector

    4. internalSend

      1. clientState.send()
  8. clientState.send()

    1. token.setMessageId()

    2. if MqttPublish

      1. tokenStore.saveToken

      2. pedingMessages.addElement

      3. queueLock.notifyAll()

    3. if MqttConnect

      1. tokenStore.saveToken

      2. pedingFlows.insertElementAt(0)

      3. queueLock.notifyAll()

  • publish()
  1. MqttAndroidClient.publish()

  2. MqttService.publish()

  3. MqttConnection.publish()

    1. if MqttAsyncClient.isConnected

      1. MqttAsyncClient.publish

      2. storeSendDetails

    2. else, callbackToActivity error

  4. MqttAsyncClient.publish()

    1. new MqttPublish

    2. comms.sendNoWait

  5. ClientComms.sendNoWait()

    1. if connected or trying to connect or trying to disconnect

      1. if disconnectedBuffer is not empty, put message to thatafter reconnect, it will send disconnectBuffer message first, before send real-time message

      2. internalSend()

        1. token.setClient

        2. clientState.send

    2. else if disconnectedBuffer is not null, put message to that

    3. else throw exception

  6. ClientState.send()go to connect

  • Reconnect automatic
  1. ClientComms.shutdownConnection()

  2. CommsCallback.connectionLost()

  3. MqttAsyncClient.MqttReconnectCallback.connectinLost()

    1. startReconnectCycle

    2. run ReconnectTask after 1s

    3. attempReconnect()

  4. MqttAsyncClient.connect()go to connect .4

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容

  • 导语 翻译自iot in five days 8.2 MQTT是什么?MQTT(formerly MQ Telem...
    happy1993阅读 6,512评论 1 14
  • 一:前言 最近在了解MQTT协议相关的内容,内容有点多,特此把MQTT协议,以及其从服务端到客户端的流程整理出来...
    子夏的不语阅读 69,689评论 9 92
  • MQTT Protocol MQTT协议特性 一句话总结:MQTT是一个简单,轻量的消息发布/订阅协议。 MQTT...
    福克斯记阅读 7,176评论 0 9
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,497评论 18 139
  • 水,是自然的恩赐,却也是自然的惩罚。众多的疾病中,水的比例失调,是最难调理的健康问题。如同自然界的水灾,一直是人类...
    一只小佳子阅读 388评论 0 0