ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

SpringBoot 第二十二篇 之 使用RabbitMQ

2019-08-16 20:40:58  阅读:364  来源: 互联网

标签:第二十二 java SpringBoot RabbitMQ springframework rabbit org public amqp


1. 简介:

    RabbitMQ是一个在AMQP基础上可复用的企业消息系统。

    AMQP(Advanced(预先) Message Queuing Protocol(协议)) 是一个提供统一消息服务的应用层标准协议,基于此协议的        客户端与消息中间件可传递消息,

     并不受客户端中间件不同产品,不同开发语言的限制。

    RabbitMQ 遵循AMQP协议,用erlang语言开发,用来通过协议在完全不同应用之间共享数据,

    或者将作业排队以便让分布式服务器进行处理。

2. 组件说明

1. Broker: 消息队列服务器实体

2. Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

3. Queue:  消息队列的载体,每个消息都会被投入到一个,或多个队列。

4. Binding: 绑定,它的作用就是把exchange和队列按照一定的规则绑定起来。

5. Routing Key : 路由关键字,exchange 根据这个关键字进行消息投递,

6. channel: 消息通道,在客户端的每个链接里,可建立多个channel,每个channel

     代表一个绘画任务。

7. vhost: 虚拟主机,一个broker中可开设多个vhost,用作不同用户的权限分离。

8. producer: 消息生产者。

9. consumer: 消息消费者。

 

10. 消息队列的使用过程:

    (1)生产者链接到服务器,声明一个exchange,并设置交换机类型等属性。

    (2)消费者 连接到消息队列服务器,打开一个channel

    (3)消费者 声明一个queue,并设置相关属性。

    (4)消费者 使用routing key, 绑定 exchange 和 queue。

    (5)生产者 投递消息到exchange

    (6)exchange 接收到消息后,根据消息的key和已经设置的binding,进行消息路由,将消息

             投递到一个或多个队列。

总结:

   使用rabbitmq 可以解耦应用程序,使用消息在应用程序中进行传递,

   将一些无需及时返回且耗时的操作提取出来,进行异步处理,可节省服务器相应时间,

    提高系统吞吐量。

2. 安装

    安装rabbitmq前,先要安装erlang 并配置好环境变量(erlang_home, path 都要设)。

    2.1.  监测erlang 是否安装好的方法是:

     cmd到erlang安装目录的bin文件夹下:

       erl -version

    2.2  监测rabbitmq是否安装好的方法是:

        cmd到安装目录的sbin目录下,运行

        rabbitmqctl status

 

3. springboot 使用 rabbitmq 示例:

     Rabbit MQ 服务器 会根据路由键将消息从交换路由到消息队列中,如何处理投递多个队列的情况?

   这里不同类型的交换机起到了重要作用。分别是 fanout,direct,topic, 每种类型实现了不同的路由算法。

 

     3.1 使用 Fanout  Exchange 交换机

         当使用Fanout Exchange 时,无需处理路由键,很像子广播,

          每一台与该交换机绑定的队列都会获得一份消息。

          Fanout 交换机处理消息是最快的。

       示例:

          

          1 所需依赖:

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

       2  加载交换机,队列,到容器,并绑定。

/**
 * 初始化Fanout交换机和队列,并将其绑定
 * */
@Configuration
public class FanoutConfig {

    // 初始化创建交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(RabbitConstants.FANOUT_EXCHANGE);
    }

    // 初始化创建队列
    @Bean
    public Queue getQueue1(){
        return  new Queue(RabbitConstants.F_QUEUE1,true);
    }
    @Bean
    public Queue getQueue2(){
        return  new Queue(RabbitConstants.F_QUEUE2,true);
    }
    @Bean
    public Queue getQueue3(){
        return  new Queue(RabbitConstants.F_QUEUE3,true);
    }

    // 将交换机和 和队列绑定
    @Bean
    public Binding binding1(){
        return BindingBuilder.bind(getQueue1()).to(fanoutExchange());
    }
    @Bean
    public Binding binding2(){
        return BindingBuilder.bind(getQueue2()).to(fanoutExchange());
    }
    @Bean
    public Binding binding3(){
        return BindingBuilder.bind(getQueue3()).to(fanoutExchange());
    }
}

  3. 生产者

package com.chenyun.web;

import com.chenyun.config.RabbitConstants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class FanoutPublisherController implements RabbitTemplate.ConfirmCallback {

    private String exchange_name = "test_fanout_exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/fanout/test")
    public String send(String msg){

        System.out.println("发送消息:【"+ msg+"】");
        // 设置回调函数
        rabbitTemplate.setConfirmCallback(this);
        // 发送消息,
        rabbitTemplate.convertAndSend(RabbitConstants.FANOUT_EXCHANGE,"",msg);

        return msg;

    }


    // 回调确认方法
    @Override
    public void confirm(CorrelationData correlationData, boolean ask, String cause) {
        if(ask) {
            System.out.println("消费成功!");
        }else {
            System.out.println("消费失败!:" + cause);
        }
    }
}

    4  消费者

package com.chenyun.web;

import com.chenyun.config.RabbitConstants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutCustomer {

    // 定义监听字符串队列
    @RabbitListener(queues = {RabbitConstants.F_QUEUE1})
    public void receiveMsg1(String msg){
        System.out.println("F_QUEUE1 收到消息:"+msg);

    }

    // 定义监听字符串队列
    @RabbitListener(queues = {RabbitConstants.F_QUEUE2})
    public void receiveMsg2(String msg){
        System.out.println("F_QUEUE2 收到消息:"+msg);
    }

    // 定义监听字符串队列
    @RabbitListener(queues = {RabbitConstants.F_QUEUE3})
    public void receiveMsg3(String msg){
        System.out.println("F_QUEUE3 收到消息:"+msg);
    }

}

    源码地址: https://github.com/sss996/springboot-rabbitmq 

   3.2 使用Topic Exchange

       topic exchange 将路由键 和 某个模式进行匹配。

       此时队列需要绑定在一个模式上。

       通过“#” 匹配一个或多个词,

       通过“*”匹配一个词。因此 “fin.#” 能匹配到“fin.aa.bb",

       但是fin.* 只能匹配到fin.aa

       代码示例:

         1. 初始化交换机,队列,并绑定

            

package com.chenyun.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * 初始化Fanout交换机和队列,并将其绑定
 * */
@Configuration
public class TopicConfig {

    // 初始化创建交换机
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(RabbitConstants.TOPIC_EXCHANGE);
    }

    // 初始化创建队列
    @Bean
    public Queue getQueue1(){
        return  new Queue(RabbitConstants.T_QUEUE1,true);
    }
    @Bean
    public Queue getQueue2(){
        return  new Queue(RabbitConstants.T_QUEUE2,true);
    }

    // 将交换机和 和队列绑定
    @Bean
    public Binding binding1(){
        return BindingBuilder.bind(getQueue1()).to(topicExchange()).with("insert.#");
    }
    @Bean
    public Binding binding2(){
        return BindingBuilder.bind(getQueue2()).to(topicExchange()).with("insert.user");
    }

}

     3.2.2  生产者

package com.chenyun.web;

import com.chenyun.config.RabbitConstants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TopicPublisherController implements RabbitTemplate.ConfirmCallback {

    private String exchange_name = "test_fanout_exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/topic/test")
    public String send(String msg,String routingKey){

        System.out.println("发送消息:【"+ msg+"】");
        // 设置回调函数
        rabbitTemplate.setConfirmCallback(this);
        // 发送消息,
        rabbitTemplate.convertAndSend(RabbitConstants.TOPIC_EXCHANGE,"insert.user",msg);

        return msg;

    }

    // 回调确认方法
    @Override
    public void confirm(CorrelationData correlationData, boolean ask, String cause) {
        if(ask) {
            System.out.println("消费成功!");
        }else {
            System.out.println("消费失败!:" + cause);
        }
    }
}

  3.2.3  消费者

package com.chenyun.web;

import com.chenyun.config.RabbitConstants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicCustomer {

    // 定义监听字符串队列T
    @RabbitListener(queues = {RabbitConstants.T_QUEUE1})
    public void receiveMsg1(String msg){
        System.out.println("T_QUEUE1 收到消息:"+msg);

    }

    // 定义监听字符串队列
    @RabbitListener(queues = {RabbitConstants.T_QUEUE2})
    public void receiveMsg2(String msg){
        System.out.println("T_QUEUE2 收到消息:"+msg);
    }

}

       源码地址: https://github.com/sss996/springboot-rabbitmq  

   3.3  使用 Direct Exchange 

        Direct Exchange .需要将一个队列绑定到交换机上,要求该消息与路由键完全匹配

        用法和topic 基本相同不在重复。

          源码地址: https://github.com/sss996/springboot-rabbitmq 

        

4.   借鉴的博客  https://blog.csdn.net/u013045552/column/info/17329

5.   踩过的坑

     在浏览器中输入localhost:15672 可以打开http客户端。

注意:server端口号是 5672 ,sprnigboot项目里注意不能绑定15672,应该是5672 否则会报如下错误。

java.net.SocketException: Socket Closed
	at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_151]
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[na:1.8.0_151]
	at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[na:1.8.0_151]
	at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_151]
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[na:1.8.0_151]
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265) ~[na:1.8.0_151]
	at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288) ~[na:1.8.0_151]
	at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91) ~[amqp-client-5.1.2.jar:5.1.2]
	at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164) ~[amqp-client-5.1.2.jar:5.1.2]
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:580) ~[amqp-client-5.1.2.jar:5.1.2]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]

2019-07-30 20:06:02.943 ERROR 13320 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).

org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
	at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:62) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:484) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:626) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:240) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1797) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:345) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:995) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: java.net.ConnectException: Connection refused: connect
	at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) ~[na:1.8.0_151]
	at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85) ~[na:1.8.0_151]
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[na:1.8.0_151]
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[na:1.8.0_151]
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[na:1.8.0_151]
	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) ~[na:1.8.0_151]
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[na:1.8.0_151]
	at java.net.Socket.connect(Socket.java:589) ~[na:1.8.0_151]
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60) ~[amqp-client-5.1.2.jar:5.1.2]
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:955) ~[amqp-client-5.1.2.jar:5.1.2]
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:907) ~[amqp-client-5.1.2.jar:5.1.2]
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:847) ~[amqp-client-5.1.2.jar:5.1.2]
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:457) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	... 9 common frames omitted

 

    

标签:第二十二,java,SpringBoot,RabbitMQ,springframework,rabbit,org,public,amqp
来源: https://blog.csdn.net/sss996/article/details/97673244

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有