ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

gmqtt自定义插件验证随笔

2022-07-09 20:01:22  阅读:170  来源: 互联网

标签:插件 return nil 自定义 gmqtt storage Storage server


==故事背景==

工业物联场景经常会用到mqtt协议,调研了开源组件gmqtt,尝试做一些改造实验。

 

==改造内容==

增加写入数据至influxdb的处理

 

==改造过程==

【下载代码】

github路径:https://github.com/DrmagicE/gmqtt

下载代码zip包,生成自己的代码环境。

 

【置换包名】

下载下来的源码,所有的import默认都是github路径,先修改为自己本地路径,全局置换import路径

置换前:github.com/DrmagicE/gmqtt

置换后:golang-demo/gmqtt

 

【编译运行】

弄两个bat文件用户快速编译及运行代码

 

1、编译用的bat文件

路径:gmqtt\cmd\gmqttd

文件名:build_linux.bat

SET CGO_ENABLED=0
SET GOOS=linux
SET GOARCH=amd64
go build .

 

2、本地运行用bat文件

路径:gmqtt\cmd\gmqttd

文件名:run.bat

SET CGO_ENABLED=0
SET GOOS=windows
SET GOARCH=amd64
go build .

.\gmqttd.exe start -c default_config.yml

 

先执行.\build_linux.bat,确保环境正确编译

在执行.\run.bat,确定可以本地运行代码

如果正确运行,从控制台可以看到启动了1883的tcp服务端口

 

【验证环境】

去网上找一个MQTT的客户端工具,尝试与本地的MQTT服务建立连接,并发送及订阅消息。

我常用的是MQTTBOX,我的百度网盘地址:

链接:https://pan.baidu.com/s/1eMDETbub2Cg4jGEqLEC7sA
提取码:0eor

 

工具下载安装之后,配置连接本地服务:

 

如果正确连接之后,工具会提示正确连接,代码控制台会提示已经建立连接的日志

 

后台提示已经收到一个新的连接

 

尝试随便弄一个topic,发送并订阅 

 

下图为已经可以正确发送及订阅的截图。

 

至此,一个本地可以正常运行的gmqtt代码已经准备完毕,下面开始尝试做手术。 

 

【生成代码】

代码名称:storage

生成命令:gmqctl gen plugin -c -n storage -o D:\work\10_Git\golang -H OnMsgArrived 

参考博客:https://www.cnblogs.com/quchunhui/p/16461256.html

 

因为我们暂时只测试写入数据,所以钩子函数只用了OnMsgArrived 。

因为需要有Influxdb的连接配置,所以指定了-c生成配置文件。

代码正确生成之后,复制到plugin/storage目录下,如下图

 

生成之后的代码一样修改import的路径,确保不会报红。

  

 

【修改配置文件】

(官方提供的go generate命令我没能验证通过,于是全部通过手动的方式引入的我自己写storage插件,后续再补充验证)

生成之后的插件代码,如果需要生效,需要修改以下配置:

1、gmqtt/plugin_imports.yml

 

2、gmqtt/cmd/gmqttd/plugins.go

 

3、gmqtt/cmd/gmqttd/default_config.yml

配置插件导入顺序及influxdb连接相关配置

 

【编写插件逻辑代码】

直接上代码,大概就是加载配置文件,然后处理收到消息的钩子函数,向influxdb中写入数据这样。

1、config.go

package storage

import "fmt"

// Config is the configuration for the storage plugin.
type Config struct {
    Addr     string `yaml:"addr"`
    OrgId    string `yaml:"orgId"`
    BucketId string `yaml:"bucketId"`
    Token    string `yaml:"token"`
}

// Validate validates the configuration, and return an error if it is invalid.
func (c *Config) Validate() error {
    return nil
}

// DefaultConfig is the default configuration.
var DefaultConfig = Config{}

func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
    type cfg Config
    var v = &struct {
        Storage cfg `yaml:"storage"`
    }{
        Storage: cfg(DefaultConfig),
    }
    if err := unmarshal(v); err != nil {
        return err
    }
    empty := cfg(Config{})
    if v.Storage == empty {
        v.Storage = cfg(DefaultConfig)
    }
    fmt.Println("storage:", v)
    *c = Config(v.Storage)
    return nil

}

 

2、hooks.go

package storage

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    influxdb2 "github.com/influxdata/influxdb-client-go/v2"
    "golang-demo/gmqtt/server"
    "time"
)

func (s *Storage) HookWrapper() server.HookWrapper {
    return server.HookWrapper{
        OnMsgArrivedWrapper: s.OnMsgArrivedWrapper,
    }
}

func (s *Storage) OnMsgArrivedWrapper(pre server.OnMsgArrived) server.OnMsgArrived {
    return func(ctx context.Context, client server.Client, req *server.MsgArrivedRequest) error {
        err := pre(ctx, client, req)
        if err != nil {
            return err
        }
        if req.Message == nil {
            return errors.New("message is nil")
        }
        if req.Message.Topic != "/qch/data" {
            fmt.Println("not target topic, ", req.Message.Topic)
            return nil
        }

        // 数据样例:
        // {
        //    "deviceId":"qch_test_device",
        //    "pointId":"AR_TEST1",
        //    "pointValue":342242
        //}
        var data map[string]interface{}
        err = json.Unmarshal(req.Message.Payload, &data)
        if err != nil {
            return errors.New("message is json")
        }
        fmt.Println("arrived message:", data)

        measurement := data["deviceId"].(string)
        fmt.Println("measurement=", measurement)
        pointId := data["pointId"].(string)
        fmt.Println("pointId=", pointId)
        pointValue := data["pointValue"]
        fmt.Println("pointValue=", pointValue)
        point := influxdb2.NewPointWithMeasurement(measurement).AddField(pointId, pointValue).SetTime(time.Now())

        writeAPI := s.Client.WriteAPI(s.OrgId, s.BucketId)
        writeAPI.WritePoint(point)
        writeAPI.Flush()
        return nil
    }
}

 

3、storage.go

package storage

import (
    "fmt"
    influxdb2 "github.com/influxdata/influxdb-client-go/v2"
    "go.uber.org/zap"
    "golang-demo/gmqtt/config"
    "golang-demo/gmqtt/server"
)

var _ server.Plugin = (*Storage)(nil)

const Name = "storage"

func init() {
    server.RegisterPlugin(Name, New)
    config.RegisterDefaultPluginConfig(Name, &DefaultConfig)
}

func New(config config.Config) (server.Plugin, error) {
    log = server.LoggerWithField(zap.String("plugin", Name))
    cfg := config.Plugins[Name].(*Config)
    fmt.Println("storage conf:", cfg)
    cf := &Storage{
        Addr:     cfg.Addr,
        OrgId:    cfg.OrgId,
        BucketId: cfg.BucketId,
        Token:    cfg.Token,
    }
    cf.Client = influxdb2.NewClientWithOptions(cf.Addr, cf.Token, influxdb2.DefaultOptions().SetUseGZip(true).SetBatchSize(5200))
    return cf, nil
}

var log *zap.Logger

type Storage struct {
    Addr     string
    OrgId    string
    BucketId string
    Token    string
    Client   influxdb2.Client
}

func (s *Storage) Load(service server.Server) error {
    log = server.LoggerWithField(zap.String("plugin", Name))
    return nil
}

func (s *Storage) Unload() error {
    return nil
}

func (s *Storage) Name() string {
    return Name
}

 

==验证结果==

执行gmqtt/cmd/gmqttd下面的run.bat文件,程序正常启动之后,通过MQTTBox工具发送json数据。

数据样例:

{
    "deviceId":"qch_test_device",
    "pointId":"AR_TEST1",
    "pointValue":342242
}

 

MQTTBox发送及订阅截图:

 

观察控制台日志,正常会提示收到信息并正确解析的日志。

 

确认influxdb是否正确写入

 

至此基于gmqtt就完成了一个最简单的接收数据,并写入到influxdb的样例程序。

中间还有一些原理不是很明白的地方,以及部分命令还没有验证完成,后续逐渐补充。

 

--End--

 

标签:插件,return,nil,自定义,gmqtt,storage,Storage,server
来源: https://www.cnblogs.com/quchunhui/p/16461599.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有