ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

AMQP-RabbitMQ/3/发布订阅模式

2019-06-08 08:52:18  阅读:270  来源: 互联网

标签:INFO 订阅 AMQP ps RabbitMQ mq rabbit import com


3. 发布订阅模式 Publish/Subscribe - 全集监听fanout

一次向多个消费者发送消息

  • 图示
    发布订阅模式

# 个人理解

  • 生产者定义Exchange,同时将Exchange的类型定义为fanout,并向该Exchange发送消息。
  • 消费者定义队列Queue,并将队列与该交换机进行绑定。之后交换机付负责将消息全量推送给每一个与之绑定的Queue

RabbitMQ中消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列。实际上,生产者通常甚至不知道消息是否会被传递到任何队列。
相反,生产者只能向Exchange发送消息。Exchange所做的工作非常简单。一方面,它接收来自生产者的消息,另一方面将它们推送到队列。Exchange必须确切知道如何处理它收到的消息。它应该附加到特定队列吗?它应该附加到多个队列吗?或者它应该被丢弃。其规则由交换类型定义 。

有几种交换类型可供选择:direct,topic,headers andfanout

  • fanout: 将它接收到的消息广播到所有绑定到它的消息队列上。(忽略路由键routingKey)

  • 生产者 - 发布者

package com.futao.springmvcdemo.mq.rabbit.ps;

import com.futao.springmvcdemo.mq.rabbit.ExchangeTypeEnum;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 发布订阅-发布者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class Publisher {
    @SneakyThrows
    public static void main(String[] args) {
        @Cleanup
        Connection connection = RabbitMqConnectionTools.getConnection();
        @Cleanup
        Channel channel = connection.createChannel();
        //定义交换器类型
        channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getExchangeName(), BuiltinExchangeType.FANOUT);
        String msg = "Hello RabbitMq!";
        for (int i = 0; i < 20; i++) {
            channel.basicPublish(ExchangeTypeEnum.FANOUT.getExchangeName(), "", null, (msg + i).getBytes());
            log.info("Send msg:[{}] success", (msg + i));
        }
    }
}
  • 消费者1 - 订阅者1
package com.futao.springmvcdemo.mq.rabbit.ps;

import com.futao.springmvcdemo.mq.rabbit.ExchangeTypeEnum;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 发布订阅-订阅者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class SubscriberOne {
    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = RabbitMqConnectionTools.getChannel();
        //开启持久化队列
        boolean durable = true;
        channel.queueDeclare(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_ONE.getQueueName(), durable, false, false, null);
        //定义交换器类型
        channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getExchangeName(), BuiltinExchangeType.FANOUT);
        //将消息队列与Exchange交换器绑定
        channel.queueBind(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_ONE.getQueueName(), ExchangeTypeEnum.FANOUT.getExchangeName(), "");
        //告诉rabbitmq一次只发送一条消息,并且在前一个消息未被处理或者消费之前,不继续发送下一个消息
        channel.basicQos(1);
        log.info("Waiting for message...");
        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            log.info("收到消息:[{}],tag:[{}]", new String(message.getBody()), consumerTag);
            //acknowledgment应答
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            try {
                Thread.sleep(1000);
            } catch (Exception e) {

            }
        });
        //关闭自动应答
        boolean autoAck = false;
        channel.basicConsume(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_ONE.getQueueName(), autoAck, deliverCallback, consumerTag -> {
        });
    }
}
  • 消费者2 - 订阅者2
package com.futao.springmvcdemo.mq.rabbit.ps;

import com.futao.springmvcdemo.mq.rabbit.ExchangeTypeEnum;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 发布订阅-订阅者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class SubscriberTwo {
    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = RabbitMqConnectionTools.getChannel();
        //开启持久化队列
        boolean durable = true;
        channel.queueDeclare(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_TWO.getQueueName(), durable, false, false, null);
        //定义交换器类型为fanout
        channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getExchangeName(), BuiltinExchangeType.FANOUT);
        //将消息队列与Exchange交换器进行绑定
        channel.queueBind(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_TWO.getQueueName(), ExchangeTypeEnum.FANOUT.getExchangeName(), "");
        //告诉rabbitmq一次只发送一条消息,并且在前一个消息未被处理或者消费之前,不继续发送下一个消息
        channel.basicQos(1);
        log.info("Waiting for message...");
        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            log.info("收到消息:[{}],tag:[{}]", new String(message.getBody()), consumerTag);
            //acknowledgment应答
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            try {
                Thread.sleep(2000);
            } catch (Exception e) {

            }
        });
        //关闭自动应答
        boolean autoAck = false;
        channel.basicConsume(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_TWO.getQueueName(), autoAck, deliverCallback, consumerTag -> {
        });
    }
}
  • 注意
    • 没有在发布者定义队列,而是定义了交换器Exchange。发布者将消息发送到Exchange,而不是Queue
    • 在订阅者端,每个订阅者定义了自己的消息队列,并且将自己的消息队列与Exchange进行绑定。则在每次发布者向相应的Exchange发送消息的时候,Exchange会将消息发送至订阅了该Exchange的队列。(即:每个订阅者收到的消息都是一样的)
  • 测试结果
>>> 订阅者1
[main] INFO mq.rabbit.ps.SubscriberOne - Waiting for message...
[pool-1-thread-4] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!0],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-5] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!1],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-6] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!2],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-7] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!3],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-8] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!4],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-9] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!5],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-10] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!17],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-22] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!18],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-23] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!19],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]


>>> 订阅者2
[main] INFO mq.rabbit.ps.SubscriberTwo - Waiting for message...
[pool-1-thread-4] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!0],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-5] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!1],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-6] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!2],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-7] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!3],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-8] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!4],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-9] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!5],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-10] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!17],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-22] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!18],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-23] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!19],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]

标签:INFO,订阅,AMQP,ps,RabbitMQ,mq,rabbit,import,com
来源: https://blog.csdn.net/futao__/article/details/91283704

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有