使用python实现MQTT通信

MQTT 是一种基于发布/订阅模式的 轻量级物联网消息传输协议,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

MQTT特点

1. 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。该协议需要客户端和服务端,而协议中主要有三种身份:发布者(Publisher)、代理(Broker,服务器)、订阅者(Subscriber)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,而消息发布者可以同时是订阅者,实现了生产者与消费者的脱耦;
2. 对负载内容屏蔽的消息传输;
3. 使用 TCP/IP 提供网络连接;
4. 有三种消息发布服务质量:
a) “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
b) “至少一次”,确保消息到达,但消息重复可能会发生。
c) “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5. 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
6. 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
发布者发送消息到代理服务器,服务器转发消息到订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

MQTT广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。下面我们在Python 项目中使用 paho-mqtt 客户端库 ,实现客户端与 MQTT 服务器的连接、订阅、收发消息等功能,
由于目前没有物联网设备,就在linux虚拟机中获取服务器cpu,磁盘,内存等信息来模拟获取物联网设备信息的消息发布与订阅功能。

搭建开发环境 创建/root/mymqtt 为项目根目录

首先搭建MQTT代理服务器,我们使用EMQX来做MQTT代理服务器。

  1. 下载 emqx-centos7-4.2.7-x86_64.zip 文件到mymqtt目录中
    wget https://www.emqx.cn/downloads/broker/v4.2.7/emqx-centos7-4.2.7-x86_64.zip
  2. 安装
    unzip emqx-centos7-4.2.7-x86_64.zip
  3. 启动MQTT代理服务器
    ./emqx/bin/emqx start
  4. 验证代理服务器是否正常运行

    ps aux | grep emqx
    image.png
    可以看到EMQX已经在运行了,MQTT代理服务器搭建成功。

搭建python环境,在mymqtt目录下创建mypy目录为python项目根目录
image.png
  1. 使用pyenv 搭建python虚拟环境,并安装依赖包psutil和paho-mqtt
    image.png
  2. 消息发布代码
文件名:mypub.py


#!/usr/bin/env python
#coding:utf-8

import time
import json
import psutil
import random
from paho.mqtt import client as mqtt_client

broker = '127.0.0.1'  # mqtt代理服务器地址
port = 1883
keepalive = 60     # 与代理通信之间允许的最长时间段(以秒为单位)              
topic = "/python/mqtt"  # 消息主题
client_id = f'python-mqtt-pub-{random.randint(0, 1000)}'  # 客户端id不能重复

def to_M(n):
    '''将B转换为M'''
    u = 1024 * 1024
    m = round(n / u, 2)
    return m

def get_info():
    '''获取系统硬件信息:cpu利用率,cpu个数,系统负载,内存信息等'''
    cpu_percent = psutil.cpu_percent(interval=1)
    cpu_count = psutil.cpu_count()
    sys_loadavg = [round(x / psutil.cpu_count() * 100, 2) for x in psutil.getloadavg()]
    mem = psutil.virtual_memory()
    mem_total, men_free = to_M(mem.total), to_M(mem.free)
    mem_percent = mem.percent
    info = {
        'cpu_percent': cpu_percent,
        'cpu_count' : cpu_count,
        'sys_loadavg': sys_loadavg,
        'mem_total': mem_total,
        'mem_percent': mem_percent,
        'men_free': men_free
    }
    # mqtt只能传输字符串数据
    return json.dumps(info)

def connect_mqtt():
    '''连接mqtt代理服务器'''
    def on_connect(client, userdata, flags, rc):
        '''连接回调函数'''
        # 响应状态码为0表示连接成功
        if rc == 0:
            print("Connected to MQTT OK!")
        else:
            print("Failed to connect, return code %d\n", rc)
    # 连接mqtt代理服务器,并获取连接引用
    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port, keepalive)
    return client

def publish(client):
    '''发布消息'''
    while True:
        '''每隔4秒发布一次服务器信息'''
        time.sleep(4)
        msg = get_info()
        result = client.publish(topic, msg)
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")

def run():
    '''运行发布者'''
    client = connect_mqtt()
    # 运行一个线程来自动调用loop()处理网络事件, 非阻塞
    client.loop_start()
    publish(client)

if __name__ == '__main__':
    run()

  1. 消息订阅代码
文件名:mysub.py


#!/usr/bin/env python
#coding:utf-8

import random
from paho.mqtt import client as mqtt_client


broker = '127.0.0.1'  # mqtt代理服务器地址
port = 1883
keepalive = 60     # 与代理通信之间允许的最长时间段(以秒为单位)              
topic = "/python/mqtt"  # 消息主题
client_id = f'python-mqtt-sub-{random.randint(0, 1000)}'  # 客户端id不能重复

def connect_mqtt():
    '''连接mqtt代理服务器'''
    def on_connect(client, userdata, flags, rc):
        '''连接回调函数'''
        # 响应状态码为0表示连接成功
        if rc == 0:
            print("Connected to MQTT OK!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port, keepalive )
    return client

def subscribe(client: mqtt_client):
    '''订阅主题并接收消息'''
    def on_message(client, userdata, msg):
        '''订阅消息回调函数'''
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
    # 订阅指定消息主题
    client.subscribe(topic)
    client.on_message = on_message


def run():
    # 运行订阅者
    client = connect_mqtt()
    subscribe(client)
    #  运行一个线程来自动调用loop()处理网络事件, 阻塞模式
    client.loop_forever()


if __name__ == '__main__':
    run()
  1. 命令行进入python虚拟环境,启动发布者
    image.png

    可以看到发布者启动并运行成功

  2. 打开第二个命令行窗口,进入虚拟环境,启动订阅者
    image.png

    可以看到订阅者启动成功,并接受到了发布者发布的消息。

至此,我们完成了使用 paho-mqtt 客户端连接到 本地MQTT 服务器并实现了测试客户端与 MQTT 服务器的连接、消息发布和订阅。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,271评论 5 466
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,725评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,252评论 0 328
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,634评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,549评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,985评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,471评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,128评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,257评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,233评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,235评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,940评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,528评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,623评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,858评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,245评论 2 344
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,790评论 2 339

推荐阅读更多精彩内容