ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Thingsboard MQTT权限校验源码解读

2021-12-24 16:34:40  阅读:188  来源: 互联网

标签:函数 队列 mqtt 信息 MQTT 源码 send Thingsboard transport


第一次读源码,理解不到位,请多批评

1、接收MQTT连接请求

首先找到MQTT的模块,./common/transport/mqtt,我们可以看到该模块是一个使用Netty封装的mqttServer,通过读取配置文件来初始化这个mqttServer

1. MqttTransportService

@Service("MqttTransportService")
@ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true' && '${transport.mqtt.enabled}'=='true')")
@Slf4j
public class MqttTransportService {

    @Value("${transport.mqtt.bind_address}")
    private String host;
    @Value("${transport.mqtt.bind_port}")
    private Integer port;

    @Value("${transport.mqtt.netty.leak_detector_level}")
    private String leakDetectorLevel;
    @Value("${transport.mqtt.netty.boss_group_thread_count}")
    private Integer bossGroupThreadCount;
    @Value("${transport.mqtt.netty.worker_group_thread_count}")
    private Integer workerGroupThreadCount;
    @Value("${transport.mqtt.netty.so_keep_alive}")
    private boolean keepAlive;

    @Autowired
    private MqttTransportContext context;

    private Channel serverChannel;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    @PostConstruct
    public void init() throws Exception {
        log.info("Setting resource leak detector level to {}", leakDetectorLevel);
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));

        log.info("Starting MQTT transport...");
        bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
        workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new MqttTransportServerInitializer(context))
                .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);

        serverChannel = b.bind(host, port).sync().channel();
        log.info("Mqtt transport started!");
    }

    @PreDestroy
    public void shutdown() throws InterruptedException {
        log.info("Stopping MQTT transport!");
        try {
            serverChannel.close().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
        log.info("MQTT transport stopped!");
    }
}

代码中这里初始化了一个handler,来具体处理接收到的消息。(好像是netty的使用方式)
在这里插入图片描述

2. MqttTransportHandler

Degbug启动Thingsboard,并让MqttClinet发送连接请求。发现消息是从channelRead函数读入。
在这里插入图片描述
进入processMqttMsgh函数在这里插入图片描述
看到此时header中的消息类型是CONNECT
在这里插入图片描述
进入processConnect函数
我们这里使用的authToken的方式进行连接的,也就是在设备中定义的token。该token作为用户名传入。不需要密码。
在这里插入图片描述
进入processAuthTokenConnect函数
在本函数中,消息被封装成了ValidateBasicMqttCredRequestMsg类型的消息,封装完成之后,调用transportService的process函数,进行消息的传输,将消息传到core模块进行权限校验。
我们看到在process函数中传入了传输类型MQTT,封装好的登录信息,以及校验完成的回调函数。
在这里插入图片描述

3. 总结

至此,MQTT登录消息就处理完毕了,封装好的消息将被传输至core模块进行处理。

2、消息传输准备

1. DefaultTransportService

在process函数中,消息被进一步封装起来,封装成了一个TbProtoQueueMsg<TransportApiRequestMsg>类型的信息,并增加了一个UUID。在这里插入图片描述
消息被传输到了doProcess函数
我们先将细节屏蔽看看结构。
在这里插入图片描述
第一步是下面的代码
在这里插入图片描述
第一个参数是函数的原返回值,第二个参数是转换函数(也就是如何将原返回值进行转化的函数),第三个参数是Executor(详见Guava)
也就是第一步,将send函数的返回结果从ListenableFuture<Response>转换成了ListenableFuture<ValidateDeviceCredentialsResponse>类型的结果
第二步是
在这里插入图片描述
点进去发现实际上是为上一步转换来的response注册listener,也就是注册回调函数。
在这里插入图片描述
而这回调函数,就是之前process函数中传入的回调。
在这里插入图片描述
综上,doProcess函数将返回值转换成合适的格式,并设置回调函数。下面我们进入transportApiRequestTemplate.send(protoMsg)函数

2. DefaultTbQueueRequestTemplate的send函数

(TbQueueRequestTemplate实现类)
在这里插入图片描述

  1. 判断pending状态的请求数量是否超出限制
    在这里插入图片描述
  2. 生成requestId并组好header信息
    可以看到requestId被存放在了请求头
    在这里插入图片描述
  3. pendingRequests存储
    在这里插入图片描述
    pendingRequests以requestId为key,以一个包含了一个可手动添加future值的future句柄和一个过期时间。这个future句柄目前是没有任何设置的。这个函数被返回出去,在DefaultTransportService的doProcess函数中转换了格式并设置了回调函数。
  4. 真正的发送消息
    在这里插入图片描述

3. 总结

消息传输准备阶段,我们从里向外看,DefaultTbQueueRequestTemplate的send函数生成了一个可手动设置值的future操作句柄,并这次请求的requestId为key,包含操作句柄future和本次请求的过期时间的对象为value存储在pendingRequests中(pendingRequests可以保证requestId唯一)。而后send函数将这个future句柄返回,在上层的调用函数中被转换格式并设置了对应的回调函数。

3、消息传输

1. responseTemplate.send()

从DefaultTbQueueRequestTemplate的send函数中的responseTemplate.send()看起。
在这里插入图片描述
可以看到有三个入参

  1. 应该是topic等相关信息
    在这里插入图片描述
    可以看到消息将会被发到一个tb_transport.api.requests的topic中。

  2. reuqest
    就是前面封装后的登录信息

  3. 队列回调
    在这里插入图片描述

应该是看是否成功发送消息到指定topic的,成功做什么事,失败做什么事。可以看到失败的时候,pendingRequests删掉了对应requestId的信息,并直接给future设置了失败信息。

2. InMemoryTbQueueProducer.send

在这里插入图片描述
可以看到信息传输可以通过很多种方式,我们这里先选择内存队列来看。
在这里插入图片描述
可以看到send函数将前面传入的request放入了topicName对应的队列中。topicName和其对应的队列存在一个单例的ConcurrentHashMap中。
在这里插入图片描述
这样就相当于将消息放到了topicName对应的队列中了。

3. 总结

responseTemplate.send()有很多中实现,其结果就是将封装好的登录信息送到对应topic的队列中(可能是内存的也可能是消息中间件或其他形式)等待校验逻辑的消费。当传送成功时,会返回成功信息。失败时将会直接给future句柄设置为失败。

4、消息的消费

前文说明了使用内存队列消息的生产过程,接下来说明一下消息的消费过程。
消息消费的入口在DefaultTbQueueResponseTemplate的init函数中。

1. DefaultTbQueueResponseTemplate的init函数

init函数中有一个while循环,当stopped不为true的时候,while循环将一直运行
在这里插入图片描述

2. 获取所有的登录request

在这里插入图片描述
requestTemplate是一个下面类型的接口。
在这里插入图片描述
调用其poll(轮询)函数。对于我们的内存队列,其有单独的实现类。
在这里插入图片描述
其中关键一步就是
在这里插入图片描述
根据topic的名称,从单例的storage中取出该topic下的所有之前消息生产中存放的请求。

3. request的处理

  1. 解析header信息,获取超时时间requestId和responseTopic
    在这里插入图片描述
  2. 对于不超时的请求进入下面的处理步骤
    在这里插入图片描述
    AsyncCallbackTemplate的withCallbackAndTimeout函数定义如下
    在这里插入图片描述
    使用Futures.withTimeout生成了一个会超时的future句柄,如果超时将自被中断或者取消。在这里插入图片描述
    之后调用了AsyncCallbackTemplate.withCallback前面用过。withCallback里给这个会超时的future添加了withTimeout传入的回调函数。
    下面我们详细看看这三个参数的来龙去脉。

4. 登录信息校验

在这里插入图片描述
这里的handler是transportApiService,在核心服务初始化的时候被设置的。
在这里插入图片描述

在DefaultTransportApiService是其实现类。其handle函数如下。
在这里插入图片描述
我们找到我们对应的类型
在这里插入图片描述
进入validateCredentials函数
在这里插入图片描述
终于我们看到了校验的逻辑,在这里使用userName也就是我们传入的auth_token,经过业务校验获取到相关设备信息。
校验成功之后,将返回一个包含设备信息的future句柄。

5. 成功回调

在这里插入图片描述
这一块是成功后的回调。回调主要是要将设备信息发送回去。
我们可以看到,response的header里设置了REQUEST_ID_HEADER,这个request_id是之前消息发送过来的request中设置的。
在这里插入图片描述
responseTopic也是requestHeader中带的,是mqtt模块接收core信息的topic。
response就是handle里面返回的信息。

6. 总结

DefaultTbQueueResponseTemplate的init函数将不停地获取对应topic的队列中的request信息,并将所有的request进行处理校验,之后,将带有requestId和设备信息的response重新发送到内存队列中responseTopic对应的队列里。等待进一步处理。

5、校验成功后设备信息的返回

responseTemplate.send()函数,同样我们使用内存队列。
在这里插入图片描述
与消息发送过来的方式相同,将对应的responseTopic的队列中,放置刚获得的设备信息。

6、获取设备信息

TbQueueRequestTemplat接口中同样有一个init方法

其中有一个while循环,不停地获取responseTopic对应队列的信息,这些response信息就是设备相关信息

在这里插入图片描述
对于每个response信息,我们获取其header中的requestId,并通过requestId获取pendingRequests中的对应的ResponseMetaData<Response> 类型的expectedResponse信息,并将其中的future句柄的内容设置为response中的设备信息。
再看看最开始的回调信息
在这里插入图片描述
当成功获取到response之后,调用onValidateDeviceResponse进行后续的工作

7、总结

总的来说大致的流程是这样的,mqtt连接发送到mqtt-server,mqtt-server通过消息队列,将连接请求相关信息发送至core进行权限校验和认证,core进行校验之后,如果成功将设备信息发送到消息队列,消息消费后被预先注册的成功回调函数处理。否则被失败回调函数处理。

参考资料

https://www.baeldung.com/guava-futures-listenablefuture
https://www.jianshu.com/p/33ac5d394f68

标签:函数,队列,mqtt,信息,MQTT,源码,send,Thingsboard,transport
来源: https://blog.csdn.net/qq_43583902/article/details/122122868

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有