ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ生产者消费者模型(二)

2022-01-23 08:00:53  阅读:204  来源: 互联网

标签:java Exchange 生产者 模型 rabbitmq client RabbitMQ com impl


       作为主流的MQ消息队列中间件,RabbitMQ也是具备了生产者消费者的模型,那么也就是说

生产者把消息发送后,消费者来作为接收具体的消息。本文章主要详细的概述RabbitMQ的生产者

投递和消费者监听。

一、消息传递流程

        下面主要详细的总结下RabbitMQ消息队列服务器消息彻底的整体流程,具体汇总如下:

  • 生产者只负责把消息投递到Exchange,这个过程不需要刻意的关注Queue
  • 而由Exchange把消息传递给Queue
  • 作为消费者的程序来负责监听Queue的消息
  • 为了保障消息传递的准确性以及及时性,Exchange与Queue会存在一定的绑定关系就是路由Key

二、MQ投递

        依据RabbitMQ的架构模型,在生产者模型和消费者模型中,其实生产者和消费者并不知道

对方的存在,这是异步通信的特性。作为生产者,它只需要把消息投递到Exchange,在这个过程

中生产者并不需要关注Queue,事实上生产者也是无法关注到Queue的,那么消息是如何让消费者

来监听并且接收的了?这就是说会在Exchange和Queue之间建立一种映射关系,而这层关系就不是

生产者所需要关注的了。作为消费者也不需要刻意的关注Exchange,而只需要监听Queue。

2.1、引入RabbitMQ的jar

        要使用RabbitMQ的前提是需要引入RabbitMQ的jar,那么就需要在pom.xml文件里面新增RabbitMQ

的服务端和客户端,具体如下:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </dependency>

2.2、生产者投递步骤

       生产者把消息需要投递给Exchange,那么它的步骤具体总结如下:

ConnectionFactory类负责获取连接工厂

Connection类的对象获取一个连接

Channel创建数据通道信道,可以发送和接收消息

 下面具体是完整的生产者投递的代码,具体如下:

package com.example.rabbitmq.quickstart;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer
{
    public static void main(String[] args) throws  Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();

        //配置连接mq的地址信息
        connectionFactory.setHost("101.**.***.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        //连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //通过connection来创建Channel
        Channel channel = connection.createChannel();

        //通过channel来发送具体的数据信息
        String msg = "Hello RabbitMQ";
        channel.basicPublish("saas", "", null, msg.getBytes());
        //发送消息成功后,关闭具体的连接
        channel.close();
        connection.close();
    }
}

在如上中,我们可以看到我们首先需要连接到RabbitMQ的服务器,然后在发送消息message的时候我们需要

指定具体的Exchange,因为对于生产者来说,它只关注的是把消息投递给Exchange。

2.3、消费者监听

        生产者把消息投递到Exchange,那么作为消费者就需要来监听具体的消息了。监听的整个过程首先也是

需要建立RabbitMQ的服务器,这部分涉及到的代码具体如下:

package com.example.rabbitmq.quickstart;

import com.rabbitmq.client.*;

public class Consumer
{
    //定义exchange
    private static final String EXCHANGE = "saas";
    //定义队列
    private  static  final String queueName="saas";

    public static void main(String[] args) throws  Exception
    {
        try{
            //创建连接工厂
            ConnectionFactory connectionFactory=new ConnectionFactory();
            //配置连接mq的地址信息
            connectionFactory.setHost("101.**.***.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
            connectionFactory.setVirtualHost("/");

            //连接工厂创建连接
            Connection connection=connectionFactory.newConnection();

            //通过connection来创建Channel
            Channel channel=connection.createChannel();

            //设置exchange类型为fanout
            channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.FANOUT);

        /*
         定义一个队列
         * 一个队列来接收数据后,消费端才可以从队列里面来接收具体的数据
         * param1:队列名称
         * param2:是否持久化
         * param3:队列是否独占此连接
         * param4:队列不再使用时是否自动删除此队列
         * param5:队列参数
         * */
            channel.queueDeclare(queueName,true,false,false,null);

            channel.queueBind(queueName,EXCHANGE,"");

            //创建一个消费者来消费数据
            DefaultConsumer consumer=new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(
                        String consumerTag,
                        com.rabbitmq.client.Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte [] body) throws  java.io.IOException
                {
                    String message=new String(body);
                    System.out.println("接收到的消息为:"+message);
                };
            };
            // 监听队列,从队列中获取数据
            System.out.println("消费者程序启动成功,准备接收生产者的数据:\n");
            channel.basicConsume(queueName,consumer);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

在如上中,我们看到Exchange与生产端的Exchange名字是一样的,那么只有这样才能够建立绑定关系,

再说的更加简单点来说,生产者把消息给到Exchange,然后Exchange与Queue之间有一个层映射关系,

那么只有这样消费者监听队列才能够收取message的消息。

2.4、绑定关系

        刚才说到Exchange与Queue之间的绑定关系,下面就针对这部分具体的演示下。我们先启动消费者

的程序,启动成功后,就会自动的创建Exchange和Queue,就可以从Exchange的绑定以及Queue的绑定

中能够获取到对应的绑定关系。

2.4.1、Exchange绑定关系

         下面的图是消费者的程序启动后创建的Exchange,以及它的绑定关系,具体如下:

2.4.2、消费者绑定关系

          在Exchange的绑定关系中,点击To里面saas,就会自动的跳转到Queue,具体如下所示:

2.5、406错误避免

        很多初学者在学习RabbitMQ的时候,总是提前创建好Exchange和Queue,这样结果导致消费者的程序报很多

的错误,具体错误如下:

java.io.IOException
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
	at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:783)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:252)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:242)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:222)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:227)
	at com.example.rabbitmq.quickstart.Consumer.main(Consumer.java:31)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'saas' in vhost '/': received 'fanout' but current is 'direct', class-id=40, method-id=10)
	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
	at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
	... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'saas' in vhost '/': received 'fanout' but current is 'direct', class-id=40, method-id=10)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
	at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)
	at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
	at java.lang.Thread.run(Thread.java:748)

其实遇到该问题,最简单解决问题的方式就是删除自己创建的Exchange和Queue。删除后,再次执行消费者的

程序,它会自动创建Exchange和Queue,而且也就不会再报一系列的具体问题了。解决了如上的问题后,再次

执行生产者的程序,就可以看到生产者发送的消息就能够被消费者这边监听到。感谢您的阅读,下个文章主要

介绍Exchange详解。

标签:java,Exchange,生产者,模型,rabbitmq,client,RabbitMQ,com,impl
来源: https://www.cnblogs.com/weke/p/15835674.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有