1 MQTT协议概念:
1.1 MQTT特点:
- 基于Publish/Subscribe(发布订阅)模式的物联网通信协议
- 简单易实现
- 支持Qos(服务质量)
- 报文精简
- 基于TCP/IP
发布订阅模式:
客户端只需要订阅这个主题,当有其他客户端向这个服务端发布消息时,这个客户端就可以收到这个消息
请求响应模式
请求响应模式: 客户端向服务端发送请求,服务端收到请求后,向客户端返回响应
1.2 MQTT简介
MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上。
1.3 MQTT协议设计规范
- 精简,不添加可有可无的功能;
- 发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递,解耦Client/Server模式,带来的好处在于不必预先知道对方的存在(ip/port), 不必同时运行
- 允许用户动态创建主题(不需要预先创建主题),零运维成本;
- 把传输量降到最低以提高传输效率
- 把低带宽、高延迟、不稳定的网络等因素考虑在内;
- 支持连续的会话保持和控制(心跳协议)
- 理解客户端计算能力可能很低
- 提供服务质量( quality of service level: QoS)管理:
- 不强求传输数据的类型与格式,保持灵活性(指的使应用层业务数据)
主题的概念
1.4 MQTT协议主要特性
- 开放消息协议,简单易实现。
- 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
- 对负载(协议携带的应用数据)内容屏蔽的消息传输。
- 基于TCP/IP网络连接,提供有序,无损,双向连接。主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了。由于基于不同的连接方式,优缺点自然也就各有不同了。
- 消息服务质量(QoS)支持,可靠传输保证;有三种消息发布服务质量:
QoSO:“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。QoS1:“至少—次”,确保消息到达,但消息重复可能会发生。QoS2:“只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。 - 1字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量。
这就是为什么在介绍里说它非常适合"在物联网领域,传感器与服务器的通信,信息的收集,要知道嵌入式设备的运算能力和带宽都相对薄弱,使用这种协议来传递消息再适合不过了。 - 在线状态感知:使用Last Will和Testament特性通知有关各方客户端异常中断的机制。
Last Will:即遗言机制,用于通知同一主题下的其他设备,发送遗言的设备已经断开了连接。
Testament:遗嘱机制,功能类似于Last Will。
1.5 MQTT应用领域
- 物联网M2M通信,物联网大数据采集
- Android消息推送,WEB消息推送
- 移动即时消息,例如Facebook Messenger
- 智能硬件、智能家具、智能电器
- 车联网通信,电动车站桩采集
- 智慧城市、远程医疗、远程教育
- 电力、石油与能源等行业市场
2 MQTT协议原理
2.1 MQTT协议实现方式
实现MQTT协议需要客户端和服务器端通讯完成, 在通讯过程中, MQTT协议中有三种身份: 发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。 其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为: 主题(Topic) 和 负载(payload)两部分:
- Topic: 可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload)\
- payload: 可以理解为消息的内容,是指订阅者具体要使用的内容
2.2 网络传输与应用消息
MQTT会构建底层网络传输: 它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。
当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关联
2.3 MQTT客户端
一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。客户端可以
- 发布其他客户端可能会订阅的信息
- 订阅其他客户端发布的消息
- 退定或删除应用程序的消息
- 断开与服务器的连接
2.4 MQTT服务器端
MQTT服务器以称为"消息代理"(Broker), 可以是一个应用程序或一台设备,它是位于消息发布者和订阅者之间,它可以:
- 接受来自客户的网络连接
- 接受客户发布的应用信息
- 处理来自客户端的订阅和退订请求
- 向订阅的客户转发应用程序消息
2.5 发布/订阅、主题、会话
MQTT是基于发布(Publish)/订阅(Subscribe)模式来进行通信及数据交换的,与HTTP的请求(Request)/应答(Response)的模式有本质的不同
订阅者(Subscriber)会向 消息服务器(Broker)订阅一个主题(Topic)。成功订阅后,消息服务器会将该主题下的消息转发给所有订阅者
发布者(Publisher)只能向主题名发布消息,订阅者(Subscriber)则可以通过订阅主题过滤器来通配多个主题名称
会话(Session)
每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。
2.6 MQTT协议中的方法
MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:
- CONNECT: 客户端连接到服务器
- CONNACK: 连接确认
- PUBLISH: 发布消息
- PUBACK: 发布消息确认
- PUBREC: 发布的消息已接收
- PUBREL: 发布的消息已释放
- PUBCOMP: 发布完成
- SUBSCRIBE: 订阅请求
- SUBACK: 订阅确认
- UNSUBSCRIBE: 取消订阅
- UNSUBACK: 取消订阅确认
- PINGREQ: 客户端发送心跳
- PINGRESP: 服务端心跳响应
- DISCONNECT: 断开连接
- AUTH: 认证
3 下载安装
EMQX: https://gitcode.com/open-source-toolkit/2c66f
MQTTX: MQTTX:全功能 MQTT 客户端工具
emqx: 解压到 无中文或 空格 目录下 ,进入 bin目录 cmd ,命令 emqx start
浏览器 127.0.0.1:18083 账号/密码: admin public
按装 MQTTX 可模拟连接查看信息
# MQTT 服务器地址(如本地或公共服务器)
MQTT_BROKER = "192.168.2.2" # 测试服务器
MQTT_PORT = 1883 # 默认端口
MQTT_TOPIC = "work01" # 订阅/发布的主题
# 连接回调
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
client.subscribe(MQTT_TOPIC) # 订阅主题
else:
print(f"Failed to connect, return code {rc}")
# 消息接收回调
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.hex()}` from `{msg.topic}` topic")
# 断线自动重连
def on_disconnect(client, userdata, rc):
print("Disconnected, trying to reconnect...")
client.reconnect()
# 创建 MQTT 客户端
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
# 连接服务器
# client.username_pw_set("your_username", "your_password") # 带用户名/密码的连接,如果 MQTT 服务器需要认证
# 使用 TLS/SSL 加密连接
# client.tls_set() # 默认使用 CA 证书
# client.connect("mqtt.eclipseprojects.io", 8883, 60) # 8883 是 SSL 端口
client.connect('192.168.2.2', 1883, 60)
# 断线自动重连
client.on_disconnect = on_disconnect
# client.loop_forever() # 保持连接(阻塞式)
client.loop_start() # 后台线程维持连接(非阻塞式)
# 使用 will_set 设置遗嘱消息
# client.will_set(MQTT_TOPIC, "Client disconnected!", qos=1)
try:
while True:
# 发布消息
# client.publish(MQTT_TOPIC, "Hello MQTT!")
client.publish(MQTT_TOPIC, b'\xFF\x01\x00\x00\x55\xAA')
time.sleep(3)
except KeyboardInterrupt:
print("Exiting...")
client.disconnect()
连接MQTT