标签:示例 self mqtt client def message id pato
# -*- coding: utf-8 -*- """ @author: Mr_zhang @software: PyCharm @file: publish.py @time: 2022/5/7 21:30 """ import json import time from paho.mqtt import client as mqtt_client class Publish: """消息发布者""" message = None def __init__(self, client_id, host, port, keepalive=60): self.client_id = client_id self.host = host self.port = port self.keepalive = keepalive self.username = "admin" self.password = "admin" @staticmethod def on_connect(client, userdata, flags, rc): """当代理响应连接请求时调用""" print("on_connect: ", client, rc) @staticmethod def on_disconnect(client, userdata, rc): """当与代理断开连接时调用""" print("on_disconnect: ", client, rc) def on_message(self, client, userdata, message): """当收到关于客户订阅的主题的消息时调用""" print("on_message: ", client, message) _message = json.loads(message.payload.decode()) self.message = _message client.disconnect() @staticmethod def on_publish(client, userdata, mid): """当使用使用publish()发送的消息已经传输到代理时被调用""" print("on_publish: ", client, userdata, mid) @staticmethod def on_subscribe(client, userdata, mid, granted_qos): """当代理响应订阅请求时被调用""" print("on_subscribe: ", client) @staticmethod def on_unsubscribe(client, userdata, mid): """当代理响应取消订阅请求时调用""" print("on_unsubscribe: ", client) @staticmethod def on_log(client, userdata, level, buf): """当客户端有日志信息时调用""" print("on_log: ", client) def connect_mqtt(self): """连接mqtt服务器""" client = mqtt_client.Client( self.client_id, protocol=mqtt_client.MQTTv311, transport="tcp" ) client.username_pw_set(self.username, self.password) client.on_connect = self.on_connect client.on_disconnect = self.on_disconnect client.on_message = self.on_message client.on_publish = self.on_publish client.on_subscribe = self.on_subscribe client.on_unsubscribe = self.on_unsubscribe # client.on_log = self.on_log client.connect(host=self.host, port=self.port, keepalive=self.keepalive) return client @staticmethod def publish(client, topic, message): """发布消息""" result = client.publish(topic, payload=json.dumps(message), qos=1) status = result[0] if status == 0: print(f"send {message} to {topic}") else: client.loop_stop() print(f"failed to send message to {topic}") def send(self, topic, message, callback=True): """ 主程序运行 :param topic: 订阅主题 :param message: 消息 :param callback: 是否需要返回值,默认需要 :return: """ client = self.connect_mqtt() self.publish(client=client, topic=topic, message=message) if callback: client.subscribe(self.client_id) client.loop_forever() else: client.loop_start() if __name__ == "__main__": _client_id = "mqtt-tcp-pub-{id}".format(id=time.time() * 100000) data = {"client_id": _client_id, "data": {"k": "v"}, "callback": True} # pub = Publish(client_id=_client_id, host="127.0.0.1", port=1883, keepalive=60) # pub.send(topic="TEST", message=data, callback=True) # print(pub.message)
# -*- coding: utf-8 -*- """ @author: Mr_zhang @software: PyCharm @file: subscribe.py @time: 2022/5/7 21:30 """ import json import time from paho.mqtt import client as mqtt_client class Subscribe: """消息订阅者""" def __init__(self, client_id, host, port, keepalive=60): self.client_id = client_id self.host = host self.port = port self.keepalive = keepalive self.topic = None self.username = "admin" self.password = "admin" @staticmethod def on_connect(client, userdata, flags, rc): """当代理响应连接请求时调用""" print("on_connect: ", client, rc) @staticmethod def on_disconnect(client, userdata, rc): """当与代理断开连接时调用""" print("on_disconnect: ", client, rc) @staticmethod def on_message(client, userdata, message): """当收到关于客户订阅的主题的消息时调用""" print("on_message: ", client) _message = json.loads(message.payload.decode()) callback = _message.get("callback") print(_message) data = _message.get("data") # 这里执行单独业务逻辑,如果是HTTP请求,建议设置超时时间 if callback: # 是否需要返回值 client_id = _message.get("client_id") client.publish( client_id, payload=json.dumps({"client_id": client_id, "message": "自定义消息体返回."}), qos=1, ) @staticmethod def on_publish(client, userdata, mid): """当使用使用publish()发送的消息已经传输到代理时被调用""" print("on_publish: ", client, userdata, mid) @staticmethod def on_subscribe(client, userdata, mid, granted_qos): """当代理响应订阅请求时被调用""" print("on_subscribe: ", client) @staticmethod def on_unsubscribe(client, userdata, mid): """当代理响应取消订阅请求时调用""" print("on_unsubscribe: ", client) @staticmethod def on_log(client, userdata, level, buf): """当客户端有日志信息时调用""" print("on_log: ", client) def connect_mqtt(self): """连接mqtt服务器""" client = mqtt_client.Client( self.client_id, protocol=mqtt_client.MQTTv311, transport="tcp" ) client.username_pw_set(self.username, self.password) client.on_connect = self.on_connect client.on_disconnect = self.on_disconnect client.on_message = self.on_message client.on_publish = self.on_publish client.on_subscribe = self.on_subscribe client.on_unsubscribe = self.on_unsubscribe # client.on_log = self.on_log client.connect(host=self.host, port=self.port, keepalive=self.keepalive) return client @staticmethod def subscribe(client, topic): """发布消息""" client.subscribe(topic, qos=1) def receive(self, topic): """主程序运行""" self.topic = topic client = self.connect_mqtt() self.subscribe(client, topic) client.loop_forever() if __name__ == "__main__": _client_id = "mqtt-tcp-sub-{id}".format(id=time.time() * 100000) # pub = Subscribe( # client_id=_client_id, host="127.0.0.1", port=1883, keepalive=60 # ) # pub.receive(topic="TEST")
生产者跟消费者的角色有时候可以互相转换。生产者亦可作为消费者。
mqtt模拟HTTP请求。可获取状态码信息
标签:示例,self,mqtt,client,def,message,id,pato 来源: https://www.cnblogs.com/52-qq/p/16257381.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。