ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

springboot 集成websocket 实现集群消息推送

2021-08-05 19:30:38  阅读:191  来源: 互联网

标签:websocket springboot springframework annotation org import 推送 public amqp


1.简介

在异步接口调用的场景中,异步任务的处理结果会写入 RabbitMQ;为了实现高可用,部署时会同时启动多个微服务实例。本文要实现的效果是:无论哪个实例消费了 MQ 消息,都能把该消息推送到连接在所有实例上的前端。

2.配置

2.1pom.xml

<!-- Spring AMQP starter: the RabbitMQ configuration and listener classes in this article import org.springframework.amqp types. -->
<!-- NOTE(review): the Java code below also imports org.springframework.data.redis, com.alibaba.fastjson and lombok, none of which is declared in this fragment; presumably they are declared elsewhere in the real pom. -->
<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<!-- PageHelper starter. NOTE(review): nothing in the code shown in this article references it. -->
		<dependency>
			<groupId>com.github.pagehelper</groupId>
			<artifactId>pagehelper-spring-boot-starter</artifactId>
			<version>1.2.9</version>
		</dependency>
		<!-- WebSocket starter: used by WebSocketConfig (STOMP endpoint and message broker). -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-websocket</artifactId>
		</dependency>

		<!-- Thymeleaf starter: the demo page uses th:src attributes. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-thymeleaf</artifactId>
		</dependency>

2.2application.yml

# Connection settings for the RabbitMQ broker and the Redis server used by the examples in this article.
# NOTE(review): credentials are written in plain text here; confirm they are placeholders.
spring:
  rabbitmq:
    host: 10.1.1.6
    # NOTE(review): RabbitMQ's default AMQP port is 5672; confirm this broker really listens on 5678.
    port: 5678
    username: lys
    password: lys
#    publisher-confirms: true
    publisher-returns: true
    # The following settings are needed on the consumer side
    listener:
      simple:
        # Manual mode: the listener code acknowledges or rejects each delivery itself.
        acknowledge-mode: manual
    template:
      mandatory: true
  redis:
    host: 10.6.6.6
    port: 32252
    # NOTE(review): presumably milliseconds; confirm the unit for the Spring Boot version in use.
    timeout: 5001
    password: lys
    

3.mq消息

3.1TopicRabbitConfig

package com.lys.config.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the RabbitMQ topology used by this example: one fanout exchange,
 * one queue, and the binding that connects them.
 *
 * <p>Despite the class name, the exchange declared here is a
 * {@link FanoutExchange}, not a topic exchange.
 *
 * <p>Created 2021/5/24 15:51.
 *
 * @author liuysh
 */
@Configuration
public class TopicRabbitConfig {

    /** Name of the queue declared by {@link #myQueue()}. */
    public static final String TOPIC_QUEUE = "myTopic";

    /** Name of the exchange declared by {@link #fanoutExchange()}. */
    private static final String EXCHANGE_NAME = "fanout_exchange";

    /**
     * Declares the fanout exchange as durable and not auto-deleted.
     *
     * @return the exchange definition
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new FanoutExchange(EXCHANGE_NAME, durable, autoDelete);
    }

    /**
     * Declares the queue as durable, non-exclusive and not auto-deleted.
     *
     * @return the queue definition
     */
    @Bean
    public Queue myQueue() {
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(TOPIC_QUEUE, durable, exclusive, autoDelete);
    }

    /**
     * Binds the queue to the fanout exchange.
     *
     * @return the binding definition
     */
    @Bean
    public Binding bind() {
        final Queue queue = myQueue();
        final FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }
}

3.2 MessageListenerConfig

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Registers a {@link SimpleMessageListenerContainer} bean limited to a single
 * consumer and switched to manual acknowledgement.
 *
 * <p>NOTE(review): this class gives the container neither queue names nor a
 * message listener. Confirm how (or whether) it is expected to consume
 * anything; the listener shown in this article is registered through
 * {@code @RabbitListener} instead.
 *
 * <p>Created 2021/5/24 16:39.
 *
 * @author liuysh
 */
@Configuration
public class MessageListenerConfig {

    /** Both the initial and the maximum number of concurrent consumers. */
    private static final int CONSUMER_COUNT = 1;

    private final CachingConnectionFactory connectionFactory;

    /**
     * @param connectionFactory connection factory handed to the listener container
     */
    @Autowired
    public MessageListenerConfig(CachingConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    /**
     * Builds the listener container on top of the injected connection factory.
     *
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer listenerContainer =
                new SimpleMessageListenerContainer(connectionFactory);
        listenerContainer.setConcurrentConsumers(CONSUMER_COUNT);
        listenerContainer.setMaxConcurrentConsumers(CONSUMER_COUNT);
        // Acknowledgement is automatic by default; switch to manual acknowledgement here.
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return listenerContainer;
    }
}

3.3 FanoutListenerFirst


import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Date;

/**
 * @Auther: liuysh
 * @Date: 2021/5/24 16:03
 * @Description:
 */
@Slf4j
@Component
public class FanoutListenerFirst implements ChannelAwareMessageListener {



    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @Override
    @RabbitListener(queues = TopicRabbitConfig.TOPIC_CDB_RDS_CONSOLE_QUEUE)
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {

            String json=new String(message.getBody());


           stringRedisTemplate.convertAndSend("myRedisTopic",json);
            
           //为true表示确认之前的所有消息  false表示只来处理着当前的消息
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("topic模式 监听者 one  处理消息时显示异常,异常是:{},现拒绝消费当前消息且不再放回队列",e);
            //为true会重新放回队列
             channel.basicReject(deliveryTag, false);
            // 为了防止存在有异常的message堆积,故异常的也进行消费
            // channel.basicAck(deliveryTag, false);


        }
    }

    


}

4.redis 订阅消费通信

4.1RedisCacheConfig

import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
 * Redis pub/sub wiring: subscribes {@link RedisReceiver} to the
 * {@code myRedisTopic} topic and exposes a {@link StringRedisTemplate}.
 *
 * <p>Created 2021/8/2 15:29.
 *
 * @author liuysh
 */
@Configuration
@EnableCaching
public class RedisCacheConfig {

    /** Topic the listener container subscribes to. */
    private static final String SUBSCRIBED_TOPIC = "myRedisTopic";

    /** Name of the {@link RedisReceiver} method the adapter delegates to. */
    private static final String RECEIVER_METHOD = "receiveMessage";

    /**
     * Creates the listener container and registers the adapter on the topic.
     * Further listeners for other topics can be registered the same way.
     *
     * @param connectionFactory Redis connection factory used by the container
     * @param listenerAdapter   adapter that delivers messages to the receiver
     * @return the configured container
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {
        final PatternTopic topic = new PatternTopic(SUBSCRIBED_TOPIC);
        final RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.addMessageListener(listenerAdapter, topic);
        return listenerContainer;
    }

    /**
     * Wraps the receiver in an adapter that invokes its handler method by name.
     *
     * @param receiver the bean that handles incoming messages
     * @return adapter delegating to {@code receiveMessage}
     */
    @Bean
    MessageListenerAdapter listenerAdapter(RedisReceiver receiver) {
        return new MessageListenerAdapter(receiver, RECEIVER_METHOD);
    }

    /**
     * @param connectionFactory Redis connection factory backing the template
     * @return a string-based Redis template
     */
    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
}

4.2 RedisReceiver

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

/**
 * Receives payloads delivered by the Redis listener adapter and forwards them
 * to WebSocket subscribers through the messaging template.
 *
 * <p>NOTE(review): the demo page subscribes to
 * {@code /topic/instanceStatus/<instanceId>} when an id is entered, while this
 * class always sends to {@code WsConstant.WS_TOPIC}, whose value is not shown
 * here. Confirm the two destinations actually match.
 *
 * <p>Created 2021/8/2 15:29.
 *
 * @author liuysh
 */
@Component
@Slf4j
public class RedisReceiver {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    /**
     * Logs the payload and broadcasts it unchanged.
     *
     * @param message payload received from Redis, expected to be JSON text
     */
    public void receiveMessage(String message) {
        log.info("redis 订阅者收到的消息:" + message);

        // The parse result is not used. Presumably the call is kept so that malformed
        // JSON fails here, before anything is forwarded; confirm.
        JSON.parseObject(message);

        simpMessagingTemplate.convertAndSend(WsConstant.WS_TOPIC, message);
    }
}

5.WebSocketConfig

package com.sugon.cloud.config;

import com.sugon.cloud.common.bean.RdsWsConstant;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * STOMP-over-WebSocket configuration: exposes one SockJS-enabled endpoint and
 * enables the simple in-memory broker for the instance-status destinations.
 *
 * <p>Created 2021/7/10 13:37.
 *
 * @author liuysh
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /** Path clients connect to. */
    private static final String STOMP_ENDPOINT = "/ws_endpoint";

    /**
     * Origin wildcard.
     * NOTE(review): this accepts connections from every origin, and newer Spring
     * versions may require allowed-origin patterns instead of "*" when SockJS is
     * enabled. Confirm against the Spring version in use.
     */
    private static final String ANY_ORIGIN = "*";

    /** Destination prefix handled by the simple broker. */
    private static final String BROKER_DESTINATION_PREFIX = "/topic/instanceStatus";

    /**
     * Registers the STOMP endpoint with SockJS support.
     *
     * @param registry registry the endpoint is added to
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint(STOMP_ENDPOINT)
                .setAllowedOrigins(ANY_ORIGIN)
                .withSockJS();
    }

    /**
     * Enables the simple broker for broadcast-style destinations.
     *
     * @param registry broker registry to configure
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker(BROKER_DESTINATION_PREFIX);
    }
}

6.前端代码

<!DOCTYPE html>
<html lang="zh-CN" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8" />
    <title>Spring Boot+WebSocket+广播式</title>

</head>
<body onload="disconnect()">
<noscript><h2 style="color: #ff0000">貌似你的浏览器不支持websocket</h2></noscript>
<div>
    <div>
        <label>输入实例instanceId</label><input type="text" id="name" value="1" />
        <button id="connect" onclick="connect();">连接</button>
        <button id="disconnect" disabled="disabled" onclick="disconnect();">断开连接</button>
    </div>
    接收的数据:
    <div id="conversationDiv">

        <p id="response"></p>
    </div>
</div>
<script th:src="@{sockjs.min.js}"></script>
<script th:src="@{stomp.min.js}"></script>
<script th:src="@{jquery.js}"></script>
<script type="text/javascript">
    // STOMP client created by connect(); null until the first connection attempt.
    var stompClient = null;
    // NOTE(review): never assigned in the code shown; connect() declares its own local variable with the same name.
    var instanceId=null;



    /**
     * Switches the page between its connected and disconnected states:
     * toggles the two buttons, shows or hides the message area, and clears
     * previously displayed messages.
     *
     * @param {boolean} connected true once the STOMP connection is established
     */
    function setConnected(connected) {
        document.getElementById('connect').disabled = connected;
        document.getElementById('disconnect').disabled = !connected;
        document.getElementById('conversationDiv').style.visibility = connected ? 'visible' : 'hidden';
        // .html() without an argument only reads the content; pass '' to actually clear it.
        $('#response').html('');
    }
	
    /**
     * Opens a SockJS connection to "/ws_endpoint", runs STOMP over it, and
     * once connected subscribes to the instance-status destination:
     * '/topic/instanceStatus/<id>' when an id was entered, otherwise
     * '/topic/instanceStatus'. Every received frame body is shown on the page.
     */
    function connect() {
        var enteredId = $('#name').val();
        var sock = new SockJS('/ws_endpoint'); // SockJS endpoint named "/ws_endpoint"
        stompClient = Stomp.over(sock); // WebSocket client speaking the STOMP sub-protocol

        // Displays one received frame, prefixed with the current time and the entered id.
        function onMessage(frame) {
            showResponse(Date() + "::" + enteredId + ":::" + frame.body);
        }

        // Runs after the server has accepted the STOMP connection.
        function onConnected(frame) {
            setConnected(true);
            console.log('Connected: ' + frame);
            var destination = '/topic/instanceStatus';
            if (enteredId.length > 0) {
                destination = destination + '/' + enteredId;
            }
            stompClient.subscribe(destination, onMessage);
        }

        stompClient.connect({}, onConnected);
    }


    /**
     * Closes the STOMP connection if a client exists, then puts the page back
     * into its disconnected state.
     */
    function disconnect() {
        var client = stompClient;
        if (client != null) {
            client.disconnect();
        }
        setConnected(false);
        console.log("Disconnected");
    }



    /**
     * Appends one received message to the response area.
     *
     * The message is inserted as text rather than markup, so content pushed by
     * the server (or typed into the id field) cannot inject HTML into the page.
     *
     * @param {string} message text to display
     */
    function showResponse(message) {
        var line = $('<b></b>').text("Received: " + message);
        $('#response').append(line).append("<br/>");
    }
</script>
</body>
</html>

标签:websocket,springboot,springframework,annotation,org,import,推送,public,amqp
来源: https://blog.csdn.net/liuyunshengsir/article/details/119424910

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有