MQTT 物联网通信:发布/订阅与 QoS 机制

FreeGuideOnline 最新 2026-06-15

MQTT 协议物联网通信:发布/订阅与 QoS 机制

MQTT(Message Queuing Telemetry Transport)是一种极其轻量、开放、简单易用的发布/订阅消息传输协议,专为低带宽、高延迟、资源受限的物联网场景设计。它让设备在极少带宽下也能稳定通信,成为智能家居、工业传感、车联网等领域的事实标准。本教程将从零开始,带你系统掌握 MQTT 的发布/订阅模型与三种服务质量(QoS)机制,并结合代码实例快速上手。

为什么物联网需要 MQTT?

在物联网的世界中,设备往往算力弱、电量有限、网络不可靠。传统的 HTTP 协议建立在请求/响应模型之上,每次通信都要消耗较大的连接开销,且不支持实时推送。MQTT 则完全不同:

  • 极低协议头:最小头部仅 2 字节,比 HTTP 轻数十倍,节省流量和电力。
  • 双向推送:服务器可以主动向设备发消息,无需设备轮询。
  • 异步解耦:发布者和订阅者互不知晓,通过中间代理(Broker)通信,系统扩展更灵活。
  • 适应不可靠网络:内置多层服务质量控制,网络中断后仍能恢复到正常通信。
  • 连接状态感知:遗嘱消息、心跳保活等特性,让离线设备能被立刻发现。

核心概念:发布/订阅模型

MQTT 颠覆了传统“客户端-服务器”直连思路,采用**发布/订阅(Publish/Subscribe)**模式,所有通信围绕一个中心 Broker(代理服务器) 展开。

三大角色

  • 发布者(Publisher):产生消息的客户端,如温度传感器、门禁读卡器。
  • 订阅者(Subscriber):消费消息的客户端,如监控后台、手机 APP。
  • 代理(Broker):接收所有消息并将其路由给感兴趣的订阅者。开源常用实现有 Mosquitto、EMQX。

主题(Topic)—— 消息的地址

MQTT 中没有“发送给谁”的概念,消息被发布到 主题(Topic),订阅者再根据主题筛选接收。主题是一种 UTF-8 字符串,通过 / 分层组织,典型示例如下:

home/kitchen/temperature
office/+/light
sensor/#
  • 单层通配符 +:匹配一层任意名字。订阅 home/+/temperature 可同时收到 home/kitchen/temperaturehome/bedroom/temperature
  • 多层通配符 #:匹配当前层级及所有后代层级。office/# 可匹配 office/lightoffice/desk/temperature 等全部子主题。

⚠️ 注意:# 必须放在主题的最后,且只能单独使用,例如 sensor/# 正确,sensor/#/status 非法。

这种发布/订阅设计让系统极易扩展:新增一个存储数据的订阅者,无需修改任何发布者代码,只需要让它订阅相同的主题即可。

服务质量(QoS):确保消息可靠到达

MQTT 支持三种服务质量等级,从“发完即忘”到“保证仅一次到达”。QoS 值是发布者和订阅者之间的契约,Broker 会根据两者声明进行复杂但精妙的下行适配。

QoS 等级 名称 行为 典型场景
0 至多一次 消息仅发送一次,不要求确认,可能丢失 不重要的周期性传感器读数
1 至少一次 保证至少收到一次,但可能重复 大多数遥测数据、告警消息
2 精确一次 确保消息不丢、不重,仅到达一次 计费信息、关键控制指令

QoS 0:发完即忘

发布者将消息发送给 Broker,不等待任何确认,Broker 也不会保存消息。如果网络断开或订阅者离线,消息直接丢失。

优点:速度最快,开销最小。
缺点:无法保证可靠。

QoS 1:确认交付

发布者发送消息后,必须收到 Broker 的 PUBACK 确认报文。若未收到,发布者会重传该消息,因此可能造成接收端收到重复消息(订阅者需要幂等处理)。

流程

  1. Publisher → Broker:PUBLISH(含 PacketId)
  2. Broker 存储消息,转发给订阅者,同时回给 Publisher:PUBACK(相同 PacketId)
  3. 若 Publisher 未收到 PUBACK,重发同一 PUBLISH

QoS 2:仅一次到达——四步握手

QoS 2 是最强保证,通过两次握手实现消息去重和精确一次交付。它使用 PUBLISHPUBRECPUBRELPUBCOMP 四个步骤。

流程概览

  1. Publisher → Broker:PUBLISH(PacketId + QoS 2)
  2. Broker 存储消息,向 Publisher 回发:PUBREC(表示收到,但暂不释放消息)
  3. Publisher → Broker:PUBREL(通知 Broker 可以释放 Message ID,并完成交付)
  4. Broker → Publisher:PUBCOMP(流程结束,PacketId 可复用)

对订阅者一侧,Broker 也会采用类似机制确保“恰好一次”投递。QoS 2 的开销最大,仅在计费、固件更新等苛刻场景使用。

发布与订阅的 QoS 降级规则

在 MQTT 中,发布消息所使用的 QoS 和订阅时声明的 QoS 可能不同,实际投递给订阅者的 QoS 是两者中的最小值

  • 发布者用 QoS 2 发送消息,但订阅者只请求了 QoS 1,那么订阅者最多只能收到 QoS 1 的消息。
  • 发布者用 QoS 0 发送,即使订阅者请求 QoS 2,实际也按 QoS 0 投递。

这层设计让系统在可靠性成本和网络约束之间自动平衡。

高级特性:持久会话与遗嘱消息

MQTT 不仅是发布/订阅协议,还内建了异常处理与状态保持机制,帮助物联网系统从容应对意外离线。

持久会话(Clean Session)

客户端连接 Broker 时可设置 Clean Session 标志:

  • Clean Session = True(默认):断开后 Broker 丢弃所有未完成的消息和订阅关系。重连后像全新客户端。
  • Clean Session = False:Broker 会保留客户端的订阅主题、未确认的 QoS 1/2 消息。重连后这些数据会自动恢复,设备可继续收到离线期间积压的消息。

持久会话对低功耗传感器尤为关键——它们定期休眠,醒来后仍可获取断连期间的重要指令。

遗嘱消息(Last Will and Testament, LWT)

设备连接时可预定义一个“遗嘱消息”。一旦连接异常断开(没有发送正确的 DISCONNECT 包),Broker 就自动将这条消息发布到指定主题,通知其他客户端该设备已下线。

例如,一个门锁传感器连接时设定:

遗嘱主题:home/door/status
遗嘱消息:offline
遗嘱 QoS:1
保留标志:true

如果传感器突然断电,订阅了 home/door/status 的监控系统会立刻收到 offline,从而触发警报。这避免了轮询心跳的延迟,实现了实时状态感知。

动手实践:用 Python 与公共 Broker 通信

1. 选择 Broker 与客户端库

你可以使用 Mosquitto 搭建私有 Broker,也可以直接接入公共测试 Broker(如 test.mosquitto.orgbroker.emqx.io 均在 1883 端口提供 MQTT 服务,1884 为 WebSocket)。客户端库推荐 paho-mqtt,安装:

pip install paho-mqtt

2. 编写一个基本的发布/订阅程序

以下代码创建一个订阅者和发布者,模拟温度传感器上报,并实时接收。

订阅者代码 subscriber.py

import paho.mqtt.client as mqtt

BROKER = "test.mosquitto.org"
PORT = 1883
TOPIC = "mytutorial/temperature"

def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe(TOPIC, qos=1)

def on_message(client, userdata, msg):
    print(f"{msg.topic} -> {msg.payload.decode()}")

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, PORT, 60)
client.loop_forever()

发布者代码 publisher.py

import paho.mqtt.client as mqtt
import time
import random

BROKER = "test.mosquitto.org"
PORT = 1883
TOPIC = "mytutorial/temperature"

client = mqtt.Client()
client.connect(BROKER, PORT, 60)

while True:
    temp = round(random.uniform(20.0, 30.0), 2)
    client.publish(TOPIC, payload=f"{temp}", qos=1)
    print(f"Published: {temp}")
    time.sleep(2)

运行两个脚本,你将在订阅者终端看到源源不断的温度数据。

3. 体验 QoS 与 LWT

在上面的例子中,断开订阅者网络,重连后你将看到 QoS 1 消息被 Broker 暂存并补发(如果启用了持久会话且订阅时 QoS 1)。此外,可以给连接添加遗嘱消息:

client.will_set("mytutorial/status", payload="offline", qos=1, retain=True)

启动另一个客户端订阅 mytutorial/status,然后强制关闭发布者(不要正常断开),你会发现 Broker 自动发布了离线通知。

安全加固:TLS/SSL 加密

生产环境中绝不能明文传输 MQTT 消息,尤其是包含凭证或敏感数据时。主流 Broker 都支持 TLS 加密。在客户端只需指定 TLS 配置:

client.tls_set(ca_certs="./ca.crt",
               certfile="./client.crt",
               keyfile="./client.key")
client.connect(BROKER, 8883)   # 默认 MQTTS 端口

同时使用用户名/密码认证,并配合访问控制列表(ACL)限制主题读写权限,构建完整的物联网安全框架。

进阶学习路径

掌握发布/订阅与 QoS 后,你已入门 MQTT 核心。可继续探索:

  • 保留消息(Retained Message):新订阅者可立即获取最新状态。
  • 共享订阅:在多个消费者间负载均衡。
  • MQTT 5.0 新特性:会话过期、用户属性、流程改进等。
  • 集成到实际场景:结合 InfluxDB + Grafana 构建传感器监控仪表盘;使用 Node-RED 拖拽式连接硬件到云服务。

现在你已经拥有了一份可立刻运行的 MQTT 通信方案,无论是开发个人智能家居,还是构建工业数据采集系统,这套轻量协议都将为你提供坚如磐石的通信骨架。