连接MQTT

1 MQTT协议概念:

1.1 MQTT特点:

  1. 基于Publish/Subscribe(发布订阅)模式的物联网通信协议
  2. 简单易实现
  3. 支持Qos(服务质量)
  4. 报文精简
  5. 基于TCP/IP
发布订阅模式:

客户端只需要订阅这个主题,当有其他客户端向这个服务端发布消息时,这个客户端就可以收到这个消息

请求响应模式

请求响应模式: 客户端向服务端发送请求,服务端收到请求后,向客户端返回响应

1.2 MQTT简介

MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上。

1.3 MQTT协议设计规范

  1. 精简,不添加可有可无的功能;
  2. 发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递,解耦Client/Server模式,带来的好处在于不必预先知道对方的存在(ip/port), 不必同时运行
  3. 允许用户动态创建主题(不需要预先创建主题),零运维成本;
  4. 把传输量降到最低以提高传输效率
  5. 把低带宽、高延迟、不稳定的网络等因素考虑在内;
  6. 支持连续的会话保持和控制(心跳协议)
  7. 理解客户端计算能力可能很低
  8. 提供服务质量( quality of service level: QoS)管理:
  9. 不强求传输数据的类型与格式,保持灵活性(指的使应用层业务数据)
主题的概念

1.4 MQTT协议主要特性

  1. 开放消息协议,简单易实现。
  2. 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
  3. 对负载(协议携带的应用数据)内容屏蔽的消息传输。
  4. 基于TCP/IP网络连接,提供有序,无损,双向连接。主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了。由于基于不同的连接方式,优缺点自然也就各有不同了。
  5. 消息服务质量(QoS)支持,可靠传输保证;有三种消息发布服务质量:
    QoSO:“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。QoS1:“至少—次”,确保消息到达,但消息重复可能会发生。QoS2:“只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。
  6. 1字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量。
    这就是为什么在介绍里说它非常适合"在物联网领域,传感器与服务器的通信,信息的收集,要知道嵌入式设备的运算能力和带宽都相对薄弱,使用这种协议来传递消息再适合不过了。
  7. 在线状态感知:使用Last Will和Testament特性通知有关各方客户端异常中断的机制。
    Last Will:即遗言机制,用于通知同一主题下的其他设备,发送遗言的设备已经断开了连接。
    Testament:遗嘱机制,功能类似于Last Will。

1.5 MQTT应用领域

  • 物联网M2M通信,物联网大数据采集
  • Android消息推送,WEB消息推送
  • 移动即时消息,例如Facebook Messenger
  • 智能硬件、智能家具、智能电器
  • 车联网通信,电动车站桩采集
  • 智慧城市、远程医疗、远程教育
  • 电力、石油与能源等行业市场

2 MQTT协议原理

2.1 MQTT协议实现方式

实现MQTT协议需要客户端和服务器端通讯完成, 在通讯过程中, MQTT协议中有三种身份: 发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。 其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为: 主题(Topic) 和 负载(payload)两部分:

  1. Topic: 可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload)\
  2. payload: 可以理解为消息的内容,是指订阅者具体要使用的内容

2.2 网络传输与应用消息

MQTT会构建底层网络传输: 它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。

当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关联

2.3 MQTT客户端

一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。客户端可以

  1. 发布其他客户端可能会订阅的信息
  2. 订阅其他客户端发布的消息
  3. 退定或删除应用程序的消息
  4. 断开与服务器的连接

2.4 MQTT服务器端

MQTT服务器以称为"消息代理"(Broker), 可以是一个应用程序或一台设备,它是位于消息发布者和订阅者之间,它可以:

  1. 接受来自客户的网络连接
  2. 接受客户发布的应用信息
  3. 处理来自客户端的订阅和退订请求
  4. 向订阅的客户转发应用程序消息

2.5 发布/订阅、主题、会话

MQTT是基于发布(Publish)/订阅(Subscribe)模式来进行通信及数据交换的,与HTTP的请求(Request)/应答(Response)的模式有本质的不同

订阅者(Subscriber)会向 消息服务器(Broker)订阅一个主题(Topic)。成功订阅后,消息服务器会将该主题下的消息转发给所有订阅者

发布者(Publisher)只能向主题名发布消息,订阅者(Subscriber)则可以通过订阅主题过滤器来通配多个主题名称

会话(Session)

每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

2.6 MQTT协议中的方法

MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:

  1. CONNECT: 客户端连接到服务器
  2. CONNACK: 连接确认
  3. PUBLISH: 发布消息
  4. PUBACK: 发布消息确认
  5. PUBREC: 发布的消息已接收
  6. PUBREL: 发布的消息已释放
  7. PUBCOMP: 发布完成
  8. SUBSCRIBE: 订阅请求
  9. SUBACK: 订阅确认
  10. UNSUBSCRIBE: 取消订阅
  11. UNSUBACK: 取消订阅确认
  12. PINGREQ: 客户端发送心跳
  13. PINGRESP: 服务端心跳响应
  14. DISCONNECT: 断开连接
  15. AUTH: 认证

3 下载安装


EMQX:  https://gitcode.com/open-source-toolkit/2c66f
MQTTX:  MQTTX:全功能 MQTT 客户端工具

emqx: 解压到  无中文或 空格 目录下 ,进入 bin目录 cmd   ,命令   emqx start
浏览器    127.0.0.1:18083         账号/密码: admin      public

按装 MQTTX 可模拟连接查看信息


# MQTT 服务器地址(如本地或公共服务器)
MQTT_BROKER = "192.168.2.2" # 测试服务器
MQTT_PORT = 1883 # 默认端口
MQTT_TOPIC = "work01" # 订阅/发布的主题

# 连接回调
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
client.subscribe(MQTT_TOPIC) # 订阅主题
else:
print(f"Failed to connect, return code {rc}")

# 消息接收回调
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.hex()}` from `{msg.topic}` topic")

# 断线自动重连
def on_disconnect(client, userdata, rc):
print("Disconnected, trying to reconnect...")
client.reconnect()




# 创建 MQTT 客户端
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

# 连接服务器
# client.username_pw_set("your_username", "your_password") # 带用户名/密码的连接,如果 MQTT 服务器需要认证
# 使用 TLS/SSL 加密连接
# client.tls_set() # 默认使用 CA 证书
# client.connect("mqtt.eclipseprojects.io", 8883, 60) # 8883 是 SSL 端口
client.connect('192.168.2.2', 1883, 60)

# 断线自动重连
client.on_disconnect = on_disconnect

# client.loop_forever() # 保持连接(阻塞式)
client.loop_start() # 后台线程维持连接(非阻塞式)

# 使用 will_set 设置遗嘱消息
# client.will_set(MQTT_TOPIC, "Client disconnected!", qos=1)

try:
while True:
# 发布消息
# client.publish(MQTT_TOPIC, "Hello MQTT!")
client.publish(MQTT_TOPIC, b'\xFF\x01\x00\x00\x55\xAA')
time.sleep(3)
except KeyboardInterrupt:
print("Exiting...")
client.disconnect()

python
周润源 2025年6月1日
分析这篇文章

存档
登录 留下评论
ubuntu启动python服务