ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Spring整合Mqtt简单使用

2022-05-24 19:01:20  阅读:162  来源: 互联网

标签:Spring eclipse springframework Mqtt client 整合 org import new


前言

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅模式的轻量级通讯协议,构建于TCP/IP协议之上,
优点是低开销,低宽带占用,适用于物联网、小型设备等弱网环境。

Linux下安装Mqtt服务器

使用Docker安装

docker pull emqx/emqx

这是一个开源的MQTT协议实现,支持MQTT5.0版本。

docker run -d --name mqtt -p 1883:1883 emqx/emqx

创建容器实例,MQTT默认端口号1883

配置账号密码

MQTT协议支持多种认证方式,如固定账号密码,查询MySQL,查询Redis等。具体可以查看EMQX认证
这里我们使用EMQX内置Mnesia数据库存储账号密码。进入容器交互

docker exec -it ba087715dd9b /bin/bash

修改/etc/plugins/emqx_auth_mnesia.conf配置文件,配置账号密码

auth.user.1.username = test1
auth.user.1.password = 123456

启用emqx_auth_mnesia插件

emqx_ctl plugins load emqx_auth_mnesia

关闭匿名访问,修改/etc/emqx.conf配置文件

allow_anonymous = false

重启容器

docker restart ba087715dd9b

桌面客户端连接

MQTTX-下载地址,效果图如下

Java客户端

添加maven依赖

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.2.2</version>
</dependency>

发布主题

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class TestMqttPublish {

  public static void main(String[] args) throws MqttException {
    MqttClient mqttClient = createMqttClient();
    MqttMessage message = new MqttMessage("this is a message".getBytes());
    // 服务质量 
    message.setQos(2);
    // 发布消息
    mqttClient.publish("first_topic", message);
    // 断开连接
    mqttClient.disconnect();
    mqttClient.close();
  }

  private static MqttClient createMqttClient() throws MqttException {
    // 服务器地址
    String broker = "tcp://xxx:1883";
    String clientId = "emqx_test";//每个客户端必须唯一,可以用随机值
    MemoryPersistence persistence = new MemoryPersistence();
    MqttClient client = new MqttClient(broker, clientId, persistence);
    // 配置账号密码
    MqttConnectOptions connOpts = new MqttConnectOptions();
    connOpts.setUserName("test1");
    connOpts.setPassword("123456".toCharArray());
    connOpts.setCleanSession(true);
    // 建立连接
    client.connect(connOpts);
    return client;
  }

}

关于服务质量QOS,有3种取值

  • 0:至多一次,消息可能会丢失或重复
  • 1:至少一次,消息确保到达,但可能重复
  • 2:只有一次,确保消息到达一次

订阅主题

import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class TestMqttSubscribe {

  public static void main(String[] args) throws MqttException, InterruptedException {
    MqttClient mqttClient = createMqttClient();
    // 设置消息回调处理
    mqttClient.setCallback(new MyHandler());
    // 订阅消息
    mqttClient.subscribe("first_topic");
    TimeUnit.SECONDS.sleep(10);
    // 断开连接
    mqttClient.disconnect();
    mqttClient.close();
  }

  private static MqttClient createMqttClient() throws MqttException {
    // 服务器地址
    String broker = "tcp://xxx:1883";
    String clientId = "emqx_test2";//每个客户端必须唯一,可以用随机值
    MemoryPersistence persistence = new MemoryPersistence();
    MqttClient client = new MqttClient(broker, clientId, persistence);
    // 配置账号密码
    MqttConnectOptions connOpts = new MqttConnectOptions();
    connOpts.setUserName("test1");
    connOpts.setPassword("123456".toCharArray());
    connOpts.setCleanSession(true);
    // 建立连接
    client.connect(connOpts);
    return client;
  }

  static class MyHandler implements MqttCallback {

    /**
     * 连接异常断开
     */
    @Override
    public void connectionLost(Throwable cause) {
      cause.printStackTrace();
    }

    /**
     * 消息到达
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
      String msg = new String(message.getPayload());
      System.out.println(String.format("客户端接收到消息,主题为:%s,内容为:%s", topic, msg));
    }

    /**
     * 消息传输完成
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
      System.out.println("消息传输完成");
    }
  }

}

注意,消息订阅的客户端和消息发布的客户端的clientId必须不一样。

MQTT5新特性

MQTT5增加了共享订阅的功能,相当于订阅端的负载均衡功能,在5.0之前,如果有多个客户端订阅了同一个主题,那么这多个客户端都会接收到此消息。这种情况下,只能由订阅者自行处理去重(防止多次消费)。
共享订阅要求我们的主题格式必须为$share/{group}/{filter}

  • $share: 固定前缀,表明这是一个共享订阅
  • {group} : 群组名,是一个不包含 "/", "+" 以及 "#" 的字符串。订阅会话通过使用相同的{group}表示共享同一个订阅,匹配该订阅的消息每次只会发布给其中一个客户端。
    例如,假设订阅者s1,s2,s3属于群组g1,订阅者s4,s5属于群组g2。那么当 EMQX 向这个主题发布消息msg1的时候,s1,s2,s3中只有一个会收到 msg1,s4,s5中只有一个会收到 msg1
                                       [s1]
           msg1                      /
[emqx]  ------>  "$share/g1/topic"    - [s2] got msg1
         |                           \
         |                             [s3]
         | msg1
          ---->  "$share/g2/topic"   --  [s4]
                                     \
                                      [s5] got msg1

  • {filter}: 即非共享订阅中的主题过滤器

订阅主题代码为

mqttClient.subscribe("$share/mqtt/first_topic"); //如果非共享主题为/server/first_topic,那么共享主题为$share/mqtt//server/first_topic

发布主题代码为

mqttClient.publish("first_topic", message);

如果想要使用更多MQTT5新特性,需要使用下面的maven依赖

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
  <version>1.2.5</version>
</dependency>

更多新特性介绍,可以查看MQTT 5.0

Spring整合Mqtt

添加maven依赖

<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
  <version>5.1.6.RELEASE</version>
</dependency>

代码实现

import java.util.UUID;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
@IntegrationComponentScan
public class MqttClientConfig {

  /**
   * 连接工厂,配置账号密码等信息
   */
  @Bean
  public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    mqttConnectOptions.setUserName("test1");
    mqttConnectOptions.setPassword("123456".toCharArray());
    mqttConnectOptions.setServerURIs(new String[]{"tcp://xxx:1883"});
    mqttConnectOptions.setKeepAliveInterval(2);
    mqttConnectOptions.setAutomaticReconnect(true);
    factory.setConnectionOptions(mqttConnectOptions);
    return factory;
  }


  private String createClientId() {
    return UUID.randomUUID().toString();
  }

  /**
   * 配置client,发布.
   */
  @Bean
  @ServiceActivator(inputChannel = "mqttOutboundChannel")
  public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
        createClientId(), mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultQos(2);
    messageHandler.setDefaultRetained(false); //不保留消息
    return messageHandler;
  }

  @Bean
  public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
  }

  //接收通道
  @Bean
  public MessageChannel mqttInputChannel() {
    return new DirectChannel();
  }

  /**
   * 配置client,监听的topic.
   */
  @Bean
  public MessageProducer inbound() {
    String[] topics = {"$share/mqtt/first_topic"};
    MqttPahoMessageDrivenChannelAdapter adapter =
        new MqttPahoMessageDrivenChannelAdapter(createClientId(),
            mqttClientFactory(), topics);
    adapter.setTaskScheduler(new ThreadPoolTaskScheduler());
    adapter.setCompletionTimeout(3_000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(2);
    adapter.setOutputChannel(mqttInputChannel());
    return adapter;
  }

  /**
   * 消息处理器
   */
  @Bean
  @ServiceActivator(inputChannel = "mqttInputChannel")
  public MessageHandler handler() {
    return (message -> {
      String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
      String payload = message.getPayload().toString();
      System.out.println("消息主题:" + topic);
      System.out.println("消息内容:" + payload);
    });
  }
}

底层也是使用的org.eclipse.paho.client.mqttv3依赖。接下来配置消息网关

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

  void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}

后续直接依赖此网关对象就可以了,Spring底层使用GatewayProxyFactoryBean来实例化此Bean。SpringBoot项目中配置上述两个类就可以使用了。

参考

MQTT 入门介绍
EMQX文档

标签:Spring,eclipse,springframework,Mqtt,client,整合,org,import,new
来源: https://www.cnblogs.com/strongmore/p/16297164.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有