ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

pato-mqtt示例代码

2022-05-11 12:02:24  阅读:135  来源: 互联网

标签:示例 self mqtt client def message id pato


# -*- coding: utf-8 -*-

"""
@author: Mr_zhang
@software: PyCharm
@file: publish.py
@time: 2022/5/7 21:30
"""

import json
import time

from paho.mqtt import client as mqtt_client


class Publish:
    """消息发布者"""

    message = None

    def __init__(self, client_id, host, port, keepalive=60):
        self.client_id = client_id
        self.host = host
        self.port = port
        self.keepalive = keepalive
        self.username = "admin"
        self.password = "admin"

    @staticmethod
    def on_connect(client, userdata, flags, rc):
        """当代理响应连接请求时调用"""
        print("on_connect: ", client, rc)

    @staticmethod
    def on_disconnect(client, userdata, rc):
        """当与代理断开连接时调用"""
        print("on_disconnect: ", client, rc)

    def on_message(self, client, userdata, message):
        """当收到关于客户订阅的主题的消息时调用"""
        print("on_message: ", client, message)
        _message = json.loads(message.payload.decode())
        self.message = _message
        client.disconnect()

    @staticmethod
    def on_publish(client, userdata, mid):
        """当使用使用publish()发送的消息已经传输到代理时被调用"""
        print("on_publish: ", client, userdata, mid)

    @staticmethod
    def on_subscribe(client, userdata, mid, granted_qos):
        """当代理响应订阅请求时被调用"""
        print("on_subscribe: ", client)

    @staticmethod
    def on_unsubscribe(client, userdata, mid):
        """当代理响应取消订阅请求时调用"""
        print("on_unsubscribe: ", client)

    @staticmethod
    def on_log(client, userdata, level, buf):
        """当客户端有日志信息时调用"""
        print("on_log: ", client)

    def connect_mqtt(self):
        """连接mqtt服务器"""
        client = mqtt_client.Client(
            self.client_id, protocol=mqtt_client.MQTTv311, transport="tcp"
        )
        client.username_pw_set(self.username, self.password)
        client.on_connect = self.on_connect
        client.on_disconnect = self.on_disconnect
        client.on_message = self.on_message
        client.on_publish = self.on_publish
        client.on_subscribe = self.on_subscribe
        client.on_unsubscribe = self.on_unsubscribe
        # client.on_log = self.on_log
        client.connect(host=self.host, port=self.port, keepalive=self.keepalive)
        return client

    @staticmethod
    def publish(client, topic, message):
        """发布消息"""
        result = client.publish(topic, payload=json.dumps(message), qos=1)
        status = result[0]
        if status == 0:
            print(f"send {message} to {topic}")
        else:
            client.loop_stop()
            print(f"failed to send message to {topic}")

    def send(self, topic, message, callback=True):
        """
        主程序运行
        :param topic: 订阅主题
        :param message: 消息
        :param callback: 是否需要返回值,默认需要
        :return:
        """
        client = self.connect_mqtt()
        self.publish(client=client, topic=topic, message=message)
        if callback:
            client.subscribe(self.client_id)
            client.loop_forever()
        else:
            client.loop_start()


if __name__ == "__main__":
    _client_id = "mqtt-tcp-pub-{id}".format(id=time.time() * 100000)
    data = {"client_id": _client_id, "data": {"k": "v"}, "callback": True}

    # pub = Publish(client_id=_client_id, host="127.0.0.1", port=1883, keepalive=60)
    # pub.send(topic="TEST", message=data, callback=True)
    # print(pub.message)
# -*- coding: utf-8 -*-

"""
@author: Mr_zhang
@software: PyCharm
@file: subscribe.py
@time: 2022/5/7 21:30
"""

import json
import time

from paho.mqtt import client as mqtt_client


class Subscribe:
    """消息订阅者"""

    def __init__(self, client_id, host, port, keepalive=60):
        self.client_id = client_id
        self.host = host
        self.port = port
        self.keepalive = keepalive
        self.topic = None
        self.username = "admin"
        self.password = "admin"

    @staticmethod
    def on_connect(client, userdata, flags, rc):
        """当代理响应连接请求时调用"""
        print("on_connect: ", client, rc)

    @staticmethod
    def on_disconnect(client, userdata, rc):
        """当与代理断开连接时调用"""
        print("on_disconnect: ", client, rc)

    @staticmethod
    def on_message(client, userdata, message):
        """当收到关于客户订阅的主题的消息时调用"""
        print("on_message: ", client)
        _message = json.loads(message.payload.decode())
        callback = _message.get("callback")
        print(_message)
        data = _message.get("data")
        # 这里执行单独业务逻辑,如果是HTTP请求,建议设置超时时间
        if callback:
            # 是否需要返回值
            client_id = _message.get("client_id")
            client.publish(
                client_id,
                payload=json.dumps({"client_id": client_id, "message": "自定义消息体返回."}),
                qos=1,
            )

    @staticmethod
    def on_publish(client, userdata, mid):
        """当使用使用publish()发送的消息已经传输到代理时被调用"""
        print("on_publish: ", client, userdata, mid)

    @staticmethod
    def on_subscribe(client, userdata, mid, granted_qos):
        """当代理响应订阅请求时被调用"""
        print("on_subscribe: ", client)

    @staticmethod
    def on_unsubscribe(client, userdata, mid):
        """当代理响应取消订阅请求时调用"""
        print("on_unsubscribe: ", client)

    @staticmethod
    def on_log(client, userdata, level, buf):
        """当客户端有日志信息时调用"""
        print("on_log: ", client)

    def connect_mqtt(self):
        """连接mqtt服务器"""
        client = mqtt_client.Client(
            self.client_id, protocol=mqtt_client.MQTTv311, transport="tcp"
        )
        client.username_pw_set(self.username, self.password)
        client.on_connect = self.on_connect
        client.on_disconnect = self.on_disconnect
        client.on_message = self.on_message
        client.on_publish = self.on_publish
        client.on_subscribe = self.on_subscribe
        client.on_unsubscribe = self.on_unsubscribe
        # client.on_log = self.on_log
        client.connect(host=self.host, port=self.port, keepalive=self.keepalive)
        return client

    @staticmethod
    def subscribe(client, topic):
        """发布消息"""
        client.subscribe(topic, qos=1)

    def receive(self, topic):
        """主程序运行"""
        self.topic = topic
        client = self.connect_mqtt()
        self.subscribe(client, topic)
        client.loop_forever()


if __name__ == "__main__":
    _client_id = "mqtt-tcp-sub-{id}".format(id=time.time() * 100000)
    # pub = Subscribe(
    #     client_id=_client_id, host="127.0.0.1", port=1883, keepalive=60
    # )
    # pub.receive(topic="TEST")

生产者跟消费者的角色有时候可以互相转换。生产者亦可作为消费者。

mqtt模拟HTTP请求。可获取状态码信息

标签:示例,self,mqtt,client,def,message,id,pato
来源: https://www.cnblogs.com/52-qq/p/16257381.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有