import paho.mqtt.client as mqtt
from datetime import datetime
import asyncio
class MQTTClient:
    """TLS-secured paho-mqtt client that forwards messages to an asyncio handler.

    The paho network loop runs in a worker thread (see ``start_async``);
    incoming messages are handed to an optional coroutine handler by
    scheduling it on the asyncio loop recorded in ``event_loop``.
    """

    # Loop on which the custom handler coroutine is scheduled. It stays None
    # until ``start_async`` records the running loop (or a caller assigns one),
    # so ``on_message`` can test it safely at any time.
    event_loop = None

    def __init__(self, broker, port, username, password, ca_cert, topics):
        """Store the connection settings and configure the paho client.

        Args:
            broker: Host passed to ``client.connect``.
            port: Port passed to ``client.connect``.
            username: User name passed to ``username_pw_set``.
            password: Password passed to ``username_pw_set``.
            ca_cert: Value passed to ``tls_set`` as ``ca_certs``.
            topics: Iterable of topics subscribed to on a successful connect.
        """
        self.broker = broker
        self.port = port
        self.username = username
        self.password = password
        self.ca_cert = ca_cert
        self.topics = topics
        self.client = mqtt.Client()
        # Configure credentials, TLS and callbacks on the paho client.
        self.client.username_pw_set(self.username, self.password)
        self.client.tls_set(ca_certs=self.ca_cert)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.custom_message_handler = None  # optional coroutine handler
        self.event_loop = None  # recorded by start_async()

    def set_message_handler(self, handler):
        """Register the handler invoked for every received message.

        Args:
            handler: Callable taking ``(topic, payload)`` and returning a
                coroutine; it is scheduled with
                ``asyncio.run_coroutine_threadsafe``.
        """
        self.custom_message_handler = handler

    def on_connect(self, client, userdata, flags, rc):
        """Connect callback: subscribe to every configured topic when rc is 0."""
        if rc == 0:
            print("SSL连接成功")
            for topic in self.topics:
                client.subscribe(topic)
                print(f"已订阅主题: {topic}")
        else:
            print(f"连接失败,返回码: {rc}")

    def on_message(self, client, userdata, msg):
        """Message callback: log the message and dispatch it to the handler.

        The handler is only scheduled when both a handler and an event loop
        are available; otherwise the message is just printed.
        """
        current_time = datetime.now()
        # errors="replace" keeps a non-UTF-8 payload from raising inside the
        # callback; undecodable bytes become U+FFFD instead.
        payload = msg.payload.decode(errors="replace")
        print(f"收到消息: {msg.topic} -> {payload} 时间: {current_time}")
        if self.custom_message_handler and self.event_loop:
            # This callback is invoked by the paho loop, which start_async()
            # runs in an executor thread, so hand the coroutine over to the
            # asyncio loop in a thread-safe way.
            asyncio.run_coroutine_threadsafe(
                self.custom_message_handler(msg.topic, payload),
                self.event_loop
            )

    def connect(self):
        """Connect to the configured broker with a 60 second keepalive."""
        self.client.connect(self.broker, self.port, keepalive=60)

    async def start_async(self):
        """Connect and run the paho network loop without blocking asyncio.

        Records the running event loop in ``event_loop`` so that
        ``on_message`` can schedule the custom handler on it, then runs
        ``client.loop_forever`` in the default executor and awaits it.
        """
        # Called from within a coroutine, this returns the running loop.
        loop = asyncio.get_event_loop()
        # Record the loop before any message can arrive; without this
        # on_message has nowhere to schedule the handler.
        self.event_loop = loop
        self.connect()  # make sure we are connected to the MQTT broker
        print("Starting MQTT client loop...")
        # Run the blocking paho loop in a worker thread.
        await loop.run_in_executor(None, self.client.loop_forever)
|