ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMq 学习总结 - amqp

2021-06-11 11:00:31  阅读:174  来源: 互联网

标签:总结 amqp exchange 队列 RabbitMq 交换机 消息 msg String


背景
RabbitMQ 作为最常用的消息中间件,这里做一下相关学习记录

2.2 消息队列的使用场景

  1. 解耦,两个应用服务之间消息传输使用队列
  2. 保证有序性,利用消息队列先进先出的特点保证,事务处理的有序性
  3. 跨系统的异步通信
  4. 异步处理

2.3 基础应用 SpringBoot + Spring AMQP(内部整合了RabbitMQ)

  1. Docker 安装 服务端安装
    拉取镜像(management 含有web管理页面)

docker pull rabbitmq:3.7.7-management

  1. 创建 启动容器
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin df80af9ca0c9
参数说明:
 -d 容器后台运行  --name 自定义容器名称 -p 指定服务端口映射 (5672:应用访问接口,15672: web管理页面访问接口)  -v 映射目录或文件 –hostname (主机名) -e 指定环境变量
  1. docker ps 查看容器状态
    在这里插入图片描述

  2. 访问管理地址

http://服务器地址:15672
  1. 添加 依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. Application.yml 配置
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: " df80af9ca0c9"
  1. 创建 RabbitMQConfig配置类
    借鉴热联项目使用的配置类

  2. 实际使用示例
    生产者
    在这里插入图片描述

消费者
使用@RabbitListener 可以标注在方法上单独使用,也可以标注在类上,配合 @RabbitHandler 注解一起使用
在这里插入图片描述

2.4 基本原理
官网 https://www.rabbitmq.com/getstarted.html
在这里插入图片描述

首先了解一下 RabbitMq 常用关键字

  1. producer:消息生产者,发送消息的一方,发送消息到RabbitMQ,消息包括消息体(即自定义的消息)和附加信息(如交换器名称、RoutingKey和一些自定义的属性)
  2. consumer:消息消费者,消费消息的一方,接收RabbitMQ的消息,它无需知道生产者是谁;
  3. vhost 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。虚拟主机共享相同身份认证和加密环境(类似数据库中一个库的概念)
    虚拟主机的作用在于进行权限管控,rabbitmq默认有一个虚拟主机"/"。可以使用rabbitmqctl
  4. add_vhost命令添加虚拟主机,然后使用rabbitmqctl
    set_permissions命令设置指定用户在指定虚拟主机下的权限,以此达到权限管控的目的。
  5. Channel: 消息通道: 在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
  6. Exchange
    交换机:用于消息分发,它负责接收消息并转发到与之绑定的队列,exchange不存储消息,如果一个exchange没有binding任何Queue,那么当它会丢弃生产者发送过来的消息,在启用ACK机制后,如果exchange找不到队列,则会返回错误。一个exchange可以和多个Queue进行绑定。
  7. 交换机的四种类型:
    Direct 路由模式 : 先匹配再投送 即在绑定时设定一个 routing_key,消息的routing_key 匹配时,
    才会被交换器投送到绑定的队列中去。direct是rabbitmq的默认交换机类型。
    Topic 通配符模式 :此时routing_key 支持模糊匹配 “#”匹配多个词 “*”匹配一个词
    Fanout 发布订阅模式 :转发消息到所有绑定队列,忽略routing_key。
  8. Headers 设置header attribute参数类型的交换机。相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型,忽略routing_key。在队列与交换器绑定时,会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列。在绑定Queue与Exchange时指定一组键值对,当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配。如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。
    匹配规则x-match有下列两种类型:
    x-match = all :表示所有的键值对都匹配才能接受到消息
    x-match = any:表示只要有键值对匹配就能接受到消息

以下是配置类实例:

@Configuration
public class RabbitMQConfig {

    private static final String topicExchangeName = "topic-exchange";
    private static final String fanoutExchange = "fanout-exchange";
    private static final String headersExchange = "headers-exchange";

    private static final String queueName = "cord";

    //声明队列
    @Bean
    public Queue queue() {
        //Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
        return new Queue("cord", false, true, true);
//第二个参数为 true 配置队列的持久化
    }

    //声明Topic交换机
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange(topicExchangeName);
    }

    //将队列与Topic交换机进行绑定,并指定路由键
    @Bean
    Binding topicBinding(Queue queue, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue).to(topicExchange).with("org.cord.#");
    }

    //声明fanout交换机
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(fanoutExchange);
    }

    //将队列与fanout交换机进行绑定
    @Bean
    Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    //声明Headers交换机
    @Bean
    HeadersExchange headersExchange() {
        return new HeadersExchange(headersExchange);
    }

    //将队列与headers交换机进行绑定
    @Bean
    Binding headersBinding(Queue queue, HeadersExchange headersExchange) {
        Map<String, Object> map = new HashMap<>();
        map.put("First","A");
        map.put("Fourth","D");
        //whereAny表示部分匹配,whereAll表示全部匹配
//        return BindingBuilder.bind(queue).to(headersExchange).whereAll(map).match();
        return BindingBuilder.bind(queue).to(headersExchange).whereAny(map).match();
    }
}

Producer.java

@Component
public class Producer {

    @Autowired
    private AmqpTemplate template;

    @Autowired
    private AmqpAdmin admin;

    /**
     * @param routingKey 路由关键字
     * @param msg 消息体
     */
    public void sendDirectMsg(String routingKey, String msg) {
        template.convertAndSend(routingKey, msg);
    }

    /**
     * @param routingKey 路由关键字
     * @param msg 消息体
     * @param exchange 交换机
     */
    public void sendExchangeMsg(String exchange, String routingKey, String msg) {
        template.convertAndSend(exchange, routingKey, msg);
    }

    /**
     * @param map 消息headers属性
     * @param exchange 交换机
     * @param msg 消息体
     */
    public void sendHeadersMsg(String exchange, String msg, Map<String, Object> map) {
        template.convertAndSend(exchange, null, msg, message -> {
            message.getMessageProperties().getHeaders().putAll(map);
            return message;
        });
    }
}

测试用例
RabbitmqTest.java

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = CordApplication.class)
public class RabbitmqTest {

    @Autowired
    private Producer producer;

    //Direct
    @Test
    public void sendDirectMsg() {
        producer.sendDirectMsg("cord", String.valueOf(System.currentTimeMillis()));
    }

    //Topic
    @Test
    public void sendtopicMsg() {
        producer.sendExchangeMsg("topic-exchange","org.cord.test", "hello world");
    }

    //Fanout
    @Test
    public void sendFanoutMsg() {
        producer.sendExchangeMsg("fanout-exchange", "abcdefg", String.valueOf(System.currentTimeMillis()));
    }

    //Headers
    @Test
    public void sendHeadersMsg() {
        Map<String, Object> map = new HashMap<>();
        map.put("First","A");
        producer.sendHeadersMsg("headers-exchange", "hello word", map);
    }
}

2.5 ACK 消息确认
消息被消费者接受都需要做消息确认,消息通过 ACK 确认是否被正确接收,每个 Message 都要被确认(acknowledged),可以手动去 ACK 或自动 ACK。
消息确认模式有:
AcknowledgeMode.NONE:自动确认
AcknowledgeMode.AUTO:根据情况确认
AcknowledgeMode.MANUAL:手动确认。

默认模式为 自动确认,自动确认模式会在消息发送给消费者后立刻确认,也就是如果消费者后续代码报错,消息也会被消耗,不会保留。

手动确认
常用API
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
ack表示确认消息。multiple:false只确认该delivery_tag的消息,true确认该delivery_tag的所有消息

channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false);
Reject表示拒绝消息。requeue:false表示被拒绝的消息是丢弃;true表示重回队列

channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,false);
nack表示拒绝消息。multiple表示拒绝指定了delivery_tag的所有未确认的消息,requeue表示不是重回队列

2.6 常见问题

  1. 观察者模式和发布/订阅模式的区别
    观察者模式的定义:对象间的一种一对多的组合关系,以便一个对象的状态发生变化时,所有依赖于它的对象都得到通知。
    发布/订阅模式
    在观察者模式中的Subject就像一个发布者(Publisher),而观察者(Observer)完全可以看作一个订阅者(Subscriber)。subject通知观察者时,就像一个发布者通知他的订阅者。这也就是为什么很多书和文章使用“发布-订阅”概念来解释观察者设计模式。但是这里还有另外一个流行的模式叫做发布-订阅设计模式。它的概念和观察者模式非常类似。最大的区别是:
    在发布-订阅模式,消息的发送方,叫做发布者(publishers),消息不会直接发送给特定的接收者(订阅者)。

  2. 如何保证RabbitMQ不被重复消费?
    正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除;
    但是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。
    比如:可以在写入消息队列的数据做唯一标示,消费消息时,根据唯一标识判断是否消费过;

标签:总结,amqp,exchange,队列,RabbitMq,交换机,消息,msg,String
来源: https://blog.csdn.net/weixin_38227192/article/details/117808222

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有