ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

最全RabbitMQ教程3-高级特性

2021-02-03 09:57:04  阅读:111  来源: 互联网

标签:教程 消费 队列 最全 RabbitMQ 死信 消息 路由


文章目录


​ RabbitMQ作为当今互联网最为成熟的MQ产品之一,他的高级特性是相当丰富的,如果需要全面了解,最好的参考资料当然是官方文档了。 所有脱离官方文档的教学全是耍流氓

​ 具体可以访问官方文档: https://www.rabbitmq.com/documentation.html ,在右侧的Section模块有最为详细的RocketMQ特性说明。
在这里插入图片描述

​ 关于客户端的编程使用,可以参考其中的Client Documentation部分,而大部分的高级特性都在其中的Server Documentation部分。这整理抽出几个在日常开发中比较重要的部分来帮大家总结一下,减少大家读官方文档的时间。

一、Headers路由

​ 在官网的体验示例中,还有一种路由策略并没有提及,那就是Headers路由。其实官网之所以没有过多介绍,就是因为这种策略在实际中用得比较少,但是在某些比较特殊的业务场景,还是挺好用的。

​ 官网示例中的集中路由策略, direct,fanout,topic等这些Exchange,都是以routingkey为关键字来进行消息路由的,但是这些Exchange有一个普遍的局限就是都是只支持一个字符串的形式,而不支持其他形式。Headers类型的Exchange就是一种忽略routingKey的路由方式。他通过Headers来进行消息路由。这个headers是一个键值对,发送者可以在发送的时候定义一些键值对,接受者也可以在绑定时定义自己的键值对。当键值对匹配时,对应的消费者就能接收到消息。匹配的方式有两种,一种是all,表示需要所有的键值对都满足才行。另一种是any,表示只要满足其中一个键值就可以了。而这个值,可以是List、Boolean等多个类型。

​ 关于Headers路由的示例,首先在Web管理页面上,可以看到默认创建了一个amqp.headers这样的Exchange交换机,这个就是Headers类型的路由交换机。然后关于Headers路由的示例,可以查看示例代码。BasicDemo和SpringBootDemo中均有详细的使用示例。

在这里插入图片描述

​ 例如我们收集应用日志时,如果需要实现按Log4j那种向上收集的方式,就可以使用这种Headers路由策略。

日志等级分为 debug - info - warning - error四个级别。而针对四个日志级别,按四个队列进行分开收集,收集时,每个队列对应一个日志级别,然后收集该日志级别以上级别的所有日志(包含当前日志级别)。像这种场景,就比较适合使用Headers路由机制。

二、分组消费模式

概述

​ 我们回顾下RabbitMQ的消费模式,Exchange与Queue之间的消息路由都是通过RoutingKey关键字来进行的,不同类型的Exchange对RoutingKey进行不同的处理。那有没有不通过RoutingKey来进行路由的策略呢?

这个问题其实是很常见的,一个产品的业务模型设计得再完美,也会有照顾不到的场景。例如ShardingSphere分库分表时,默认都是基于SQL语句来进行分库分表,但是为了保证产品灵活性,也提供了hint强制路由策略,脱离了SQL的限制。

​ 在RabbitMQ产品当中,确实没有这样的路由策略,但是在SpringCloudStream框架对RabbitMQ进行封装时,提供了一个这种策略,即分区消费策略。

​ 这种策略很类似于kafka的分组消费策略。 我们回忆一下,在kafka中的分组策略,是不同的group,都会消费到同样的一份message副本,而在同一个group中,只会有一个消费者消费到一个message。这种分组消费策略,严格来说,在Rabbit中是不存在的。RabbitMQ是通过不同类型的exchange来实现不同的消费策略的。这虽然与kafka的这一套完全不同,但是在SpringCloudStream针对RabbitMQ的实现中,可以很容易的看到kafka这种分组策略的影子。

​ 当有多个消费者实例消费同一个bingding时,Spring Cloud Stream同样是希望将这种分组策略,移植到RabbitMQ中来的。就是在不同的group中,会同样消费同一个Message,而在同一个group中,只会有一个消费者消息到一个Message。

测试

​ 要使用分组消费策略,需要在生产者和消费者两端都进行分组配置。

​ 1、生产者端 核心配置

#指定参与消息分区的消费端节点数量
spring.cloud.stream.bindings.output.producer.partition-count=2
#只有消费端分区ID为1的消费端能接收到消息
spring.cloud.stream.bindings.output.producer.partition-key-expression=1

​ 2、消费者端启动两个实例,组成一个消费者组

​ 消费者1 核心配置

#启动消费分区
spring.cloud.stream.bindings.input.consumer.partitioned=true
#参与分区的消费端节点个数
spring.cloud.stream.bindings.input.consumer.instance-count=2
#设置该实例的消费端分区ID
spring.cloud.stream.bindings.input.consumer.instance-index=1

​ 消费者2 核心配置

#启动消费分区
spring.cloud.stream.bindings.input.consumer.partitioned=true
#参与分区的消费端节点个数
spring.cloud.stream.bindings.input.consumer.instance-count=2
#设置该实例的消费端分区ID
spring.cloud.stream.bindings.input.consumer.instance-index=0

​ 这样就完成了一个分组消费的配置。两个消费者实例会组成一个消费者组。而生产者发送的消息,只会被消费者1 消费到(生产者的partition-key-expression 和 消费者的 instance-index 匹配)。

原理

​ 实际上,在跟踪查看RabbitMQ的实现时,就会发现,Spring Cloud Stream在增加了消费者端的分区设置后,会对每个有效的分区创建一个单独的queue,这个队列的队列名是在原有队列名后面加上一个索引值。而发送者端的消息,会最终发送到这个带索引值的队列上,而不是原队列上。这样就完成了分区消费。
在这里插入图片描述

​ 我们的示例中,分组表达式是直接指定的,这样其实是丧失了灵活性的。实际开发中,可以将这个分组表达式放到消息的header当中,在发送消息时指定,这样就更有灵活性了。

​ 例如:将生产者端的分组表达式配置为header[‘partitonkey’]

#生产者端设置
spring.cloud.stream.bindings.output.producer.partition-key-expression=header['partitionkey']

​ 这样,就可以在发送消息时,给消息指定一个header属性,来控制控制分组消费的结果。

Message message = MessageBuilder.withPayload(str).setHeader("partitionKey", 0).build();
source.output().send(message);

​ 分组消费策略是在原有路由策略上的一个补充,在实际生产中也是经常会用到的一种策略。

三、懒队列 Lazy Queue

文档地址: https://www.rabbitmq.com/lazy-queues.html

​ RabbitMQ从3.6.0版本开始,就引入了懒队列的概念。懒队列会尽可能早的将消息内容保存到硬盘当中,并且只有在用户请求到时,才临时从硬盘加载到RAM内存当中。

​ 懒队列的设计目标是为了支持非常长的队列(数百万级别)。队列可能会因为一些原因变得非常长-也就是数据堆积。

  • 消费者服务宕机了

  • 有一个突然的消息高峰,生产者生产消息超过消费者

  • 消费者消费太慢了

​ 默认情况下,RabbitMQ接收到消息时,会保存到内存以便使用,同时把消息写到硬盘。但是,消息写入硬盘的过程中,是会阻塞队列的。RabbitMQ虽然针对写入硬盘速度做了很多算法优化,但是在长队列中,依然表现不是很理想,所以就有了懒队列的出现。

​ 懒队列会尝试尽可能早的把消息写到硬盘中。这意味着在正常操作的大多数情况下,RAM中要保存的消息要少得多。当然,这是以增加磁盘IO为代价的。

​ 声明懒队列有两种方式:

  • 给队列指定参数

在这里插入图片描述

在代码中可以通过x-queue-mode参数指定

Map<String, Object> args = new HashMap<String, Object>();
  args.put("x-queue-mode", "lazy");
  channel.queueDeclare("myqueue", false, false, false, args);
  • 设定一个策略,在策略中指定queue-mode 为 lazy。
	rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"default"}' --apply-to queues

​ 要注意的是,当一个队列被声明为懒队列,那即使队列被设定为不持久化,消息依然会写入到硬盘中。并且,在镜像集群中,大量的消息也会被同步到当前节点的镜像节点当中,并写入硬盘。这会给集群资源造成很大的负担。

​ 最后一句话总结:懒队列适合消息量大且长期有堆积的队列,可以减少内存使用,加快消费速度。但是这是以大量消耗集群的网络及磁盘IO为代价的

四、死信队列

文档地址: https://www.rabbitmq.com/dlx.html

​ 死信队列是RabbitMQ中非常重要的一个特性。简单理解,他是RabbitMQ对于未能正常消费的消息进行的一种补救机制。死信队列也是一个普通的队列,同样可以在队列上声明消费者,继续对消息进行消费处理。

1、有以下三种情况会将一个正常的消息转化成死信

  • 消息被消费者确认拒绝。消费者把requeue参数设置为true(false),并且在消费后,向RabbitMQ返回拒绝。channel.basicReject或者channel.basicNack。
  • 消息达到预设的TTL时限还一直没有被消费。
  • 消息由于队列已经达到最长长度限制而被丢掉

TTL即最长存活时间 Time-To-Live 。消息在队列中保存时间超过这个TTL,即会被认为死亡。死亡的消息会被丢入死信队列,如果没有配置死信队列的话,RabbitMQ会保证死了的消息不会再次被投递,并且在未来版本中,会主动删除掉这些死掉的消息。

设置TTL有两种方式,一是通过配置策略指定,另一种是给队列单独声明TTL

策略配置方式 - Web管理平台配置 或者 使用指令配置 60000为毫秒单位

rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues

在声明队列时指定 - 同样可以在Web管理平台配置,也可以在代码中配置:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

2、声明死信队列同样有两种方式:配置策略 和 给队列单独指定

策略配置方式:

rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues

代码配置方式:

channel.exchangeDeclare("some.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);

3、关于参数x-dead-letter-routing-key

​ 死信在转移到死信队列时,他的Routing key 也会保存下来。但是如果配置了x-dead-letter-routing-key这个参数的话,routingkey就会被替换为配置的这个值。

​ 另外,死信在转移到死信队列的过程中,是没有经过消息发送者确认的,所以并不能保证消息的安全性

4、死信的Header消息

​ 消息被作为死信转移到死信队列后,会在Header当中增加一些消息。在官网的详细介绍中,可以看到很多内容,比如时间、原因(rejected,expired,maxlen)、队列等。然后header中还会加上第一次成为死信的三个属性,并且这三个属性在以后的传递过程中都不会更改。

  • x-first-death-reason

  • x-first-death-queue

  • x-first-death-exchange

    5、死信队列同样具有FIFO先进先出的顺序

​ 最先进入死信队列的消息,会最先被消费。

RabbitMQ中,是不存在延迟队列的功能的,而通常如果要用到延迟队列,就会采用TTL+死信队列的方式来处理。

五、备份与恢复

文档地址: https://www.rabbitmq.com/backup.html

​ RabbitMQ有一个data目录会保存分配到该节点上的所有消息。我们的实验环境中,默认是在/var/lib/rabbitmq/mnesia目录下 这个目录里面的备份分为两个部分,一个是元数据(定义结构的数据),一个是消息存储目录。

对于元数据,可以在Web管理页面通过json文件直接导出或导入。

在这里插入图片描述

而对于消息,可以手动进行备份恢复

​ 其实对于消息,由于MQ的特性,是不建议进行备份恢复的。而RabbitMQ如果要进行数据备份恢复,也非常简单。

​ 首先,要保证要恢复的RabbitMQ中已经有了全部的元数据,这个可以通过上一步的json文件来恢复。

​ 然后,备份过程必须要先停止应用。如果是针对镜像集群,还需要把整个集群全部停止。

​ 最后,在RabbitMQ的数据目录中,有按virtual hosts组织的文件夹。你只需要按照虚拟主机,将整个文件夹复制到新的服务中即可。持久化消息和非持久化消息都会一起备份。 我们实验环境的默认目录是/var/lib/rabbitmq/mnesia/rabbit@worker2/msg_stores/vhosts

六、消费优先级与流量控制

文档地址: https://www.rabbitmq.com/consumer-priority.html

​ 关于消费队列的优先级,关键是x-priority 这个参数,可以指定队列的优先级。默认情况下,RabbitMQ会根据round-robin策略,把消息均匀的给不同的消费者进行处理,但是有了优先级之后,RabbitMQ会保证优先级高的队列先进行消费,而同一优先级的对接,还是会使用round-robin轮询策略来进行分配。

​ 与之对应的是RabbitMQ的流量控制配置,关键是channeld basicQos(int prefetch_size, int prefetch_count, boolean global)。 这个方法中,prefetch_count设置了当前消费者节点最多保持的未答复的消息个数, prefetch_size设置了当前消费节点最多保持的未答复的消息大小,然后global参数为true则表示该配置针对当前channel的所有队列,而默认情况下是false,表示该配置只针对当前消费者队列。最常用的方式就是只设定一个prefetch_count参数。

​ 这个参数实际上就是配置当前消费节点的消息吞吐量,可以通过给不同的消费节点配置不同的prefetch_count,再结合消费优先级的配置来实现流量控制策略。

标签:教程,消费,队列,最全,RabbitMQ,死信,消息,路由
来源: https://blog.csdn.net/roykingw/article/details/113584551

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有