ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Thingsboard源码分析(三)遥测数据获取

2022-02-23 12:01:28  阅读:373  来源: 互联网

标签:currentUser return 遥测 ts public 源码 EntityId Thingsboard entityId


获取遥测数据

TelemetryController

  • 首先找到入口,比如获取最新遥测数据方法getLatestTelemetry,/DEVICE/deviceId/values/timeseries 在 TelemetryController 中:

    @RestController
    @TbCoreComponent
    // TbUrlConstants.TELEMETRY_URL_PREFIX=="/api/plugins/telemetry"
    @RequestMapping(TbUrlConstants.TELEMETRY_URL_PREFIX)
    @Slf4j
    public class TelemetryController extends BaseController {
    
        // 这个组件负责获取设备遥测数据
        @Autowired
        private TimeseriesService tsService;
    
        // 这个组件时为了验证当前用户是否有权限去执行当前操作
        @Autowired
        private AccessValidator accessValidator;
    
        @Value("${transport.json.max_string_value_length:0}")
        private int maxStringValueLength;
    
        private ExecutorService executor;
        
        @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
        @RequestMapping(value = "/{entityType}/{entityId}/values/timeseries", method = RequestMethod.GET)
        @ResponseBody
        public DeferredResult<ResponseEntity> getLatestTimeseries(
                @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
                @RequestParam(name = "keys", required = false) String keysStr,
                @RequestParam(name = "useStrictDataTypes", required = false, defaultValue = "false") Boolean useStrictDataTypes) throws ThingsboardException {
            SecurityUser user = getCurrentUser();
    
            return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr,
                    (result, tenantId, entityId) -> getLatestTimeseriesValuesCallback(result, user, entityId, keysStr, useStrictDataTypes));
        }
        
        private void getLatestTimeseriesValuesCallback(@Nullable DeferredResult<ResponseEntity> result, SecurityUser user, EntityId entityId, String keys, Boolean useStrictDataTypes) {
            ListenableFuture<List<TsKvEntry>> future;
            if (StringUtils.isEmpty(keys)) {
                // 如果我们不传入键名列表,则返回所有遥测数据。
                future = tsService.findAllLatest(user.getTenantId(), entityId);
            } else {
                // 如果传入键名列表,则只查找相关的遥测数据。比如查经纬度要将longitude和latitude传入这个接口
                future = tsService.findLatest(user.getTenantId(), entityId, toKeysList(keys));
            }
            Futures.addCallback(future, getTsKvListCallback(result, useStrictDataTypes), MoreExecutors.directExecutor());
        }
        
    }
    

AccessValidator

  • 这个类主要是用于判断当前访问者是否有访问遥测数据的权限

  • validateEntityAndCallback 重载方法很多,一定要注意区分。这里使用的是第一个方法

    @Component
    public class AccessValidator {
        ........
        // 这是第一个 validateEntityAndCallback 方法,这一步加入了出错处理方法
        public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, Operation operation, String entityType, String entityIdStr,
                                                                        ThreeConsumer<DeferredResult<ResponseEntity>, TenantId, EntityId> onSuccess) throws ThingsboardException {
            // 调用了第二个 validateEntityAndCallback 方法
            return validateEntityAndCallback(currentUser, operation, entityType, entityIdStr, onSuccess,
                   // 加入出错处理方法                          
             	   (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
        }
        
        // 这是第二个 validateEntityAndCallback 方法,这一步根据id和类型(DEVICE)产生EntityId,以便找到这个设备
        public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, Operation operation, String entityType, String entityIdStr,
                                                                        ThreeConsumer<DeferredResult<ResponseEntity>, TenantId, EntityId> onSuccess,
                                                                        BiConsumer<DeferredResult<ResponseEntity>, Throwable> onFailure) throws ThingsboardException {
            // 调用第四个 validateEntityAndCallback 方法
            return validateEntityAndCallback(currentUser, operation,                                    
                   // 根据类型(DEVICE)和id产生EntityId实例,以便查找设备                          
                   EntityIdFactory.getByTypeAndId(entityType, entityIdStr),onSuccess, onFailure);
        }
    	
        // 这是第三个 validateEntityAndCallback 方法,没有被调用
        public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, Operation operation, EntityId entityId,
                                                                        ThreeConsumer<DeferredResult<ResponseEntity>, TenantId, EntityId> onSuccess) throws ThingsboardException {
            return validateEntityAndCallback(currentUser, operation, entityId, onSuccess, (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
        }
    
        // 这是第四个 validateEntityAndCallback 方法
        public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, Operation operation, EntityId entityId,
                                                                        ThreeConsumer<DeferredResult<ResponseEntity>, TenantId, EntityId> onSuccess,
                                                                        BiConsumer<DeferredResult<ResponseEntity>, Throwable> onFailure) throws ThingsboardException {
    
            final DeferredResult<ResponseEntity> response = new DeferredResult<>();
    		// 调用validate方法,根据当前EntityType来确定使用哪个方法
            validate(currentUser, operation, entityId, new HttpValidationCallback(response,
                    new FutureCallback<DeferredResult<ResponseEntity>>() {
                        @Override
                        public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
                            try {
                                onSuccess.accept(response, currentUser.getTenantId(), entityId);
                            } catch (Exception e) {
                                onFailure(e);
                            }
                        }
    
                        @Override
                        public void onFailure(Throwable t) {
                            onFailure.accept(response, t);
                        }
                    }));
    
            return response;
        }
        
        public void validate(SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback<ValidationResult> callback) {
            switch (entityId.getEntityType()) {
                case DEVICE:
                    // EntityType=='DEVICE',调用validateDevice方法,直接返回响应结果到http response
                    validateDevice(currentUser, operation, entityId, callback);
                    return;
                case DEVICE_PROFILE:
                    validateDeviceProfile(currentUser, operation, entityId, callback);
                    return;
                /*。。。。。。。。。。。。。。*/
                default:
                    //TODO: add support of other entities
                    throw new IllegalStateException("Not Implemented!");
            }
        }
        
        private void validateDevice(final SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback<ValidationResult> callback) {
            // 系统管理员没有查看设备遥测数据的权限
            if (currentUser.isSystemAdmin()) {
                callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
            } else {
                // 根据TenantId和DeviceId来查找Device实例
                ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(currentUser.getTenantId(), new DeviceId(entityId.getId()));
                Futures.addCallback(deviceFuture, getCallback(callback, device -> {
                    if (device == null) {
                        return ValidationResult.entityNotFound(DEVICE_WITH_REQUESTED_ID_NOT_FOUND);
                    } else {
                        try {
                            // 查找权限列表判断当前用户有无读取设备遥测数据的权限。如果没有权限将直接报错,然后出错处理
                            accessControlService.checkPermission(currentUser, Resource.DEVICE, operation, entityId, device);
                        } catch (ThingsboardException e) {
                            return ValidationResult.accessDenied(e.getMessage());
                        }
                        return ValidationResult.ok(device);
                    }
                }), executor);
            }
        }
        
        public interface ThreeConsumer<A, B, C> {
            void accept(A a, B b, C c);
        }
    }
    
    
    

TimeseriesService

  • 这个类用于查找最新遥测数据
@Service
@Slf4j
public class BaseTimeseriesService implements TimeseriesService {
    
    // 这个组件负责查找所有最新遥测数据
    @Autowired
    private TimeseriesLatestDao timeseriesLatestDao;
    
    @Override
    public ListenableFuture<List<TsKvEntry>> findLatest(TenantId tenantId, EntityId entityId, Collection<String> keys) {
        validate(entityId);
        List<ListenableFuture<TsKvEntry>> futures = Lists.newArrayListWithExpectedSize(keys.size());
        keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key));
        keys.forEach(key -> futures.add(timeseriesLatestDao.findLatest(tenantId, entityId, key)));
        return Futures.allAsList(futures);
    }

    @Override
    public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) {
        //校验当前deviceId是否合法
        validate(entityId);
        // 根据tenantId和deviceId返回结果
        return timeseriesLatestDao.findAllLatest(tenantId, entityId);
    }
}

TimeseriesLatestDao

  • 时序查找遥测数据的Dao层接口
@Slf4j
@Component
@SqlTsLatestAnyDaopublic 
class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao implements TimeseriesLatestDao {    		@Autowired    
    private SearchTsKvLatestRepository searchTsKvLatestRepository;
    @Override    
    public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) {
        return getFindAllLatestFuture(entityId);    
    }        
    protected ListenableFuture<List<TsKvEntry>> getFindAllLatestFuture(EntityId entityId) {        
        return Futures.immediateFuture(DaoUtil.convertDataList(Lists.newArrayList(
            searchTsKvLatestRepository.findAllByEntityId(entityId.getId()))));
    }
}

SearchTsKvLatestDao

  • 时序键值查找遥测数据的接口,在这里可以找到sql语句
@SqlTsLatestAnyDao@Repositorypublic class SearchTsKvLatestRepository {
    public static final String FIND_ALL_BY_ENTITY_ID = "findAllByEntityId"; 
    public static final String FIND_ALL_BY_ENTITY_ID_QUERY = "SELECT ts_kv_latest.entity_id AS entityId, ts_kv_latest.key AS key, ts_kv_dictionary.key AS strKey, ts_kv_latest.str_v AS strValue," +            " ts_kv_latest.bool_v AS boolValue, ts_kv_latest.long_v AS longValue, ts_kv_latest.dbl_v AS doubleValue, ts_kv_latest.json_v AS jsonValue, ts_kv_latest.ts AS ts FROM ts_kv_latest " +            "INNER JOIN ts_kv_dictionary ON ts_kv_latest.key = ts_kv_dictionary.key_id WHERE ts_kv_latest.entity_id = cast(:id AS uuid)";    
    @PersistenceContext   
    private EntityManager entityManager;   
    public List<TsKvLatestEntity> findAllByEntityId(UUID entityId) { 
        return entityManager.createNamedQuery(FIND_ALL_BY_ENTITY_ID, 
               TsKvLatestEntity.class.setParameter("id",entityId).getResultList());    
    }
}

EntityManager

  • 进行并发查找的类,里面有很多加锁、刷新数据、查找的方法

Futures

  • 异步响应组件。

标签:currentUser,return,遥测,ts,public,源码,EntityId,Thingsboard,entityId
来源: https://blog.csdn.net/qq_46331393/article/details/123086108

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有