广告位联系
返回顶部
分享到

使用Python与MQTT实现异步通信功能

python 来源:互联网 作者:佚名 发布时间:2024-12-20 21:54:30 人浏览
摘要

什么是MQTT协议? MQTT是一种轻量级的发布/订阅消息传输协议,设计用于低带宽和高延迟的网络环境,非常适合物联网设备之间的通信。其主要特点包括: 发布/订阅模型:支持多对多的消息传

什么是MQTT协议?

MQTT是一种轻量级的发布/订阅消息传输协议,设计用于低带宽和高延迟的网络环境,非常适合物联网设备之间的通信。其主要特点包括:

  • 发布/订阅模型:支持多对多的消息传递。
  • 轻量级设计:较低的网络开销。
  • 支持QoS等级:提供不同的消息传递可靠性。

项目背景

本文的示例代码实现了一个基于Python的MQTT客户端。以下功能涵盖在代码中:

  • 通过SSL安全连接到MQTT代理。
  • 支持动态订阅多个主题。
  • 异步处理消息,提高性能和扩展性。
  • 提供自定义消息处理功能。

核心代码解析

以下是代码中的主要功能与模块解析:

MQTT 客户端类

1

2

3

4

5

6

7

class MQTTClient:

    def __init__(self, broker, port, username, password, ca_cert, topics):

        self.client = mqtt.Client()

        self.client.username_pw_set(self.username, self.password)

        self.client.tls_set(ca_certs=self.ca_cert)

        self.client.on_connect = self.on_connect

        self.client.on_message = self.on_message

  • tls_set:启用SSL/TLS以确保通信安全。

  • 主题订阅:在连接成功时,自动订阅指定的主题。

自定义消息处理

1

2

def set_message_handler(self, handler):

    self.custom_message_handler = handler

用户可通过该方法传入自定义的回调函数,从而根据业务逻辑处理消息。

异步启动客户端

1

2

3

async def start_async(self):

    self.connect()

    await asyncio.get_event_loop().run_in_executor(None, self.client.loop_forever)

通过异步事件循环确保消息的高效处理,同时避免阻塞主线程。

示例代码集成

在主文件main.py中,定义了如下流程:

  • 初始化MQTT客户端并传入必要的参数。
  • 注册一个自定义的消息处理函数。
  • 利用asyncio实现消息处理和其他任务的并发执行。

1

2

3

4

5

async def on_mqtt_message(topic, payload):

    print(f"Custom handler: {topic} -> {payload}")

 

mqtt_client.set_message_handler(on_mqtt_message)

await mqtt_client.start_async()

使用指南

安装依赖

确保安装了paho-mqtt库:

1

pip install paho-mqtt

配置MQTT代理

更新代码中的代理地址、端口、用户名、密码和证书路径。

运行程序

使用以下命令运行程序:

1

python main.py

总结

快速搭建一个基于MQTT协议的实时通信系统。这种架构不仅适用于物联网场景,也可以在各种需要实时数据推送的应用中发挥作用,例如聊天应用和实时监控系统。

示例代码

mqtt.py

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

import paho.mqtt.client as mqtt

from datetime import datetime

import asyncio

 

class MQTTClient:

    def __init__(self, broker, port, username, password, ca_cert, topics):

        """

        初始化 MQTT 客户端

        """

        self.broker = broker

        self.port = port

        self.username = username

        self.password = password

        self.ca_cert = ca_cert

        self.topics = topics

        self.client = mqtt.Client()

 

        # 配置 MQTT 客户端

        self.client.username_pw_set(self.username, self.password)

        self.client.tls_set(ca_certs=self.ca_cert)

        self.client.on_connect = self.on_connect

        self.client.on_message = self.on_message

 

        self.custom_message_handler = None  # 自定义消息处理器

 

    def set_message_handler(self, handler):

        """

        设置自定义消息处理回调函数

        """

        self.custom_message_handler = handler

 

    def on_connect(self, client, userdata, flags, rc):

        """

        连接成功时的回调

        """

        if rc == 0:

            print("SSL连接成功")

            for topic in self.topics:

                client.subscribe(topic)

                print(f"已订阅主题: {topic}")

        else:

            print(f"连接失败,返回码: {rc}")

 

    def on_message(self, client, userdata, msg):

        """

        收到消息时的回调

        """

        current_time = datetime.now()

        payload = msg.payload.decode()

        print(f"收到消息: {msg.topic} -> {payload} 时间: {current_time}")

 

        if self.custom_message_handler and self.event_loop:

            asyncio.run_coroutine_threadsafe(

                self.custom_message_handler(msg.topic, payload),

                self.event_loop

            )

 

    def connect(self):

        """

        连接到 MQTT 服务器

        """

        self.client.connect(self.broker, self.port, keepalive=60)

 

    async def start_async(self):

        """

        异步运行 MQTT 客户端

        """

        self.connect()  # 确保连接到 MQTT 服务器

        print("Starting MQTT client loop...")

 

        # 异步运行 MQTT 客户端的事件循环

        loop = asyncio.get_event_loop()

        await loop.run_in_executor(None, self.client.loop_forever)

main.py

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

import asyncio

from mqtt import MQTTClient

 

# MQTT 配置

MQTT_BROKER = "你的服务器地址"

MQTT_PORT = 8883  # 使用 SSL 的端口

MQTT_USERNAME = "用户名"

MQTT_PASSWORD = "密码"

CA_CERT = "./emqxsl-ca.crt"  # CA 证书路径

TOPICS = ["clients/disconnect", "uhome/esp32"]  # 订阅的主题列表

 

 

async def main():

    loop = asyncio.get_running_loop()

 

    mqtt_client = MQTTClient(

        broker=MQTT_BROKER,

        port=MQTT_PORT,

        username=MQTT_USERNAME,

        password=MQTT_PASSWORD,

        ca_cert=CA_CERT,

        topics=TOPICS

    )

 

    async def on_mqtt_message(topic, payload):

        print(f"Custom handler: {topic} -> {payload}")

 

    mqtt_client.set_message_handler(on_mqtt_message)

    mqtt_client.event_loop = loop  # 将事件循环传递给 MQTT 客户端

    await mqtt_client.start_async()

 

 

    await asyncio.gather(websocket_task, periodic_task)

 

 

if __name__ == "__main__":

    asyncio.run(main())


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 :
相关文章
  • Python一行代码实现打开各种类型的文件
    在处理大量文件时,手动一个个打开是不是很麻烦?或者你正在开发一个自动化工具,需要能够自动打开某些文件。这时候,Python的os.star
  • 使用Python与MQTT实现异步通信功能
    什么是MQTT协议? MQTT是一种轻量级的发布/订阅消息传输协议,设计用于低带宽和高延迟的网络环境,非常适合物联网设备之间的通信。其主
  • Python使用vars轻松获取对象属性
    vars 是 Python 内置函数之一,它主要用于返回对象的 __dict__ 属性,该属性是一个字典,包含了对象的所有属性和属性值。在调试和检查对象状
  • Python实现自动化批量调整Word样式
    处理大量的Word文档是一个常见的任务,尤其是需要批量修改文档的样式时,手动操作既费时又容易出错。幸运的是,Python提供了丰富的库,
  • Python实现批量提取Excel数据
    在数据处理和分析的过程中,Excel 是一种广泛使用的数据存储格式。使用 Python 可以高效地从多个 Excel 文件中提取数据,进行汇总和分析。
  • Python在PDF中添加或删除超链接的操作

    Python在PDF中添加或删除超链接的操作
    PDF文件现已成为文档存储和分发的首选格式。然而,PDF文件的静态特性有时会限制其交互性。超链接是提高PDF文件互动性和用户体验的关键
  • 从基础到进阶带你玩转Python中的JSON
    在Python中处理JSON数据是日常开发中的常见任务之一。JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,在Web开发、数据存储等多个
  • 基于Python开发一个Instant Messaging(IM)聊天工具
    在现代社会中,即时通讯工具已经成为人们日常沟通的重要工具。开发一个IM聊天工具不仅能够提高我们的编程技能,还能让我们更好地理解
  • 使用Python和Flask编写一个留言簿
    在本文中,我们将通过创建一个简单的留言簿应用来入门Flask。这个项目将帮助我们理解Flask的基本概念和功能,如路由、模板、表单处理等
  • python makedirs() 递归创建目录介绍
    在 Python 中,os.makedirs() 函数用于递归地创建目录。也就是说,它不仅会创建指定的目录,还会创建任何必要的父目录。这个函数在处理需要
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计