ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

rabbitMQ的几种工作模式及代码demo(二)-----订阅模式之广播fanout交换机

2021-11-15 18:31:07  阅读:160  来源: 互联网

标签:fanout String exchange 队列 demo 模式 交换机 channel


  1. 订阅模式
    • 订阅模式相比于点对点模式,多了一个交换机exchange角色。
    • 生产者会将消息发送给exchange
    • exchange一端接收消息。一端处理消息。例如,交给特定的队列、发送给所有的队列、或者将消息丢弃。
    • exchange有三种类型:Fanout广播、Direct定向、Topic通配符。
      1. Fanout:广播,将消息交给所有绑定到交换机的队列
      2. Direct:定向,把消息交给符合指定routing key 的队列
      3. Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
    • exchange只负责转发消息,不负责存储消息。所以如果没有任何队列和交换机绑定,或者没有符合路由规则的队列,那么消息就会被丢弃。
  2. Publish/Subscribe发布与订阅模式
    • 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
      image
    • 广播交换机-生产者代码
    public class Producer {
        public static void main(String[] args) throws Exception {
    
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            /*
           exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
           参数:
            1. exchange:交换机名称
            2. type:交换机类型
                DIRECT("direct"),:定向
                FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                TOPIC("topic"),通配符的方式
                HEADERS("headers");参数匹配
            3. durable:是否持久化
            4. autoDelete:自动删除
            5. internal:内部使用。 一般false
            6. arguments:参数
            */
            String exchangeName = "test_fanout";
            //5. 创建交换机
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
            //6. 创建队列
            String queue1Name = "test_fanout_queue1";
            String queue2Name = "test_fanout_queue2";
            channel.queueDeclare(queue1Name,true,false,false,null);
            channel.queueDeclare(queue2Name,true,false,false,null);
            //7. 绑定队列和交换机
            /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为""
             */
            channel.queueBind(queue1Name,exchangeName,"");
            channel.queueBind(queue2Name,exchangeName,"");
    
            String body = "日志信息:张三调用了findAll方法...日志级别:info...";
            //8. 发送消息
            channel.basicPublish(exchangeName,"",null,body.getBytes());
    
            //9. 释放资源
            channel.close();
            connection.close();
        }
    }
    
    • 广播交换机-对应队列1的消费者代码
    public class Consumer1 {
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            String queue1Name = "test_fanout_queue1";
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body:"+new String(body));
                    System.out.println("将日志信息打印到控制台.....");
                }
            };
            channel.basicConsume(queue1Name,true,consumer);
        }
    }
    
    • 广播交换机-对应队列2的消费者代码
        public class Consumer2 {
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            String queue2Name = "test_fanout_queue2";
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body:"+new String(body));
                    System.out.println("将日志信息打印到控制台.....");
                }
            };
            channel.basicConsume(queue2Name,true,consumer);
        }
    }
    

标签:fanout,String,exchange,队列,demo,模式,交换机,channel
来源: https://www.cnblogs.com/rbwbear/p/15557669.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有