ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RocketMQ

2022-02-06 18:31:32  阅读:210  来源: 互联网

标签:消费者 队列 RabbitMQ 死信 MQ 消息 RocketMQ


一.MQ的相关概念

什么是MQ?

  • Message Queue,消息队列。简单的来说,就是一个先进先出的队列,用来发送消息(信息)。

为什么要用 MQ?

  • 流量消峰:在电商系统中,比如双11下单太多,来不及处理,生产者就把下单的信息先放在MQ中,后来慢慢交给消费者进行消费。
  • 异步任务:有些服务的调用,比如A调用B,A还有别的事做,不能一直阻塞等待B的结果。就可以使用异步,A调用的信息交给MQ,A继续执行自己的业务。
  • 应用解耦:比如分布式系统中,各个系统相互调用。比如下图的库存系统出错,就会导致整个订单系统都出错,这样很不好,就是用MQ进行解耦。比如日志处理。

MQ的选择?

  • Kafka:大吞吐量,专门为大数据而生。大数据领域的实时计算以及日志采集被大规模使用。只支持简单的MQ功能,大型项目建议使用。
  • RabbitMQ:结合Erlang语言本身的并发优势,并发性强,性能好,社区活跃。适合中小型项目。
  • RocketMQ:阿里巴巴开源框架,天生为金融互联网领域而生,可靠性非常高,尤其对于订单扣款业务。在双11经过多次考验。
二.RabbitMQ工作原理

  • prodeucer:生产者,用来发送消息
  • Consumer:消费者,用来处理消息
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接
    • Channel:如果每次访问MQ都建立Connection是很浪费性能的,Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。
      • 每次连接都会创建一个Channel,通过Channel进行一系列操作(创建queue,发消息)
  • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker,就是Rabbit的实例。
    • Exchange(交换机):message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。
    • Queue(队列):消息最终被送到这里等待 consumer 取走

Exchange说明:

类型:

  • 直接(direct):通过Binding(exchange 和 queue 之间的虚拟连接进行连接,binding 中可以包含 routing key)进行绑定

  • 扇出(fanout):相当于广播

  • 主题(topic):找到匹配的一组,比如abc.orange.def就匹配到Q1

  • 标题(headers) :忽略
三.队列

死信队列:

  • 顾名思义就是无法被消费的消息(由于特定的原因导致 queue 中的某些消息无法被消费)。
  • 解释图:设置死信队列和死信交换机,过期的消息发送给死信交换机,死信交换机再发给死信队列,由特定的消费者进行处理。
  • TTL:消息/队列最大的存活时间。
    • 每一个消息可以单独设置TTL,每一个队列也可以设置TTL。

 

延迟队列:

  • 延迟队列是死信队列的一种形式。看上图,如果我们不设置C1,那么所有的消息只能在队列中等待到过期,进入死信队列,马上被消费,相当于延迟消费。
  • 使用场景:
    • 订单在十分钟之内未支付则自动取消
    • 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
    • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议...
  • 存在问题:不同的延迟时长,就需要增加不同的延迟队列。如下图:如果再增加一个1小时后执行的事务,就需要再加一个1小时的延迟队列。
    • 进阶1:使用一个延迟队列,不要设置队列的TTL,设置每一个消息的TTL。这样不同的消息就会在不同的时间点后进行消费。
      • 新问题:RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
    • 进阶2:使用延迟插件可以解决这个问题。  
优先队列:队列者设置为优先级队列,发消息的时候也设置优先级消息。这样才能保证真正的优先   镜像队列:在Rabbit集群中,如何保证队列同步到其他的节点,就使用镜像队列。实现:随便找一个节点添加一个策略policy就可以了。

四.消息可靠性怎么保证?

要保证消息不丢失,需要三方面都进行保证:生产者(发布确认机制,事务机制),消费者(消息应答机制,死信队列),MQ(持久化,集群)


发布确认:生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了。如果超时没有收到消息,或者收到未收到的消息,进行重发或者警报管理员。

  • 单个发布确认:是一种同步确认发布的方式,发布一个消息之后只有它被确认发布,后续的消息才能继续发布。channel.waitForConfirms()
    • 缺点:效率低
  • 批量发布确认:也是一种同步确认发布的方式,先发布一批消息然后一起确认,可以极大地提高吞吐量
    • 缺点:一旦出现问题但很难推断出是那 消息出现了问题。
  • 异步发布确认:生产者只负责不断的发就行,确认的方面由MQ自己决定,收到未收到都会回复一个消息(回调函数)。不会造成生产者等待。用下图的俩个回调方法实现
    • 每一个Channel中的消息都有自己的编号,MQ只回复对应的编号就行

事务机制:RabbitMQ 客户端中与事务机制相关的方法有三个

  • channel.txSelect  用于将当前的信道设置成事务模式。
  • channel.txCommit  用于提交事务 。

  • channel.txRollback 用于事务回滚,如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,通过txRollback来回滚。

 消息应答:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。如果没收到ack,消息自动重新入队。

  • 自动应答:默认的,消费者一收到消息就马上进行应答。
    • 缺点:可能消费者后续处理出错,就没有办法了。
  • 手动应答:可以在消费者处理完逻辑之后,再进行应答,就保证了消息的可靠。实现:Channel.basicAck()
    • 手动应答还可以设置批量应答:比如发送消息1,2,3,4,5。批量应答是只要收到消息5,只应答5,前面的几个消息就不需要应答了。


 持久化:需要将队列和消息都进行持久化。

  •  刷盘的实现:刷盘的最终实现都是使用NIO中的 MappedByteBuffer.force() 将映射区的数据写入到磁盘
    • 同步刷盘:在Broker把消息写到CommitLog映射区后,就会等待写入完成。
    • 异步刷盘:只是唤醒对应的线程,不保证执行的时机,流程如图所示。

五.如何解决消息重复,消息积压,消息有序性?

消息重复:采用幂等性接口。

消息积压:

  • 因为生产者的生产速度与消费者的消费速度不匹配。有可能是因为消息消费失败反复重试造成的,也有可能就是消费者消费能力弱,渐渐地消息就积压了。
    • 解决方案:
      • 定位消费慢的原因,如果有bug,调bug。或者进行数据库层面的调优。
      • 多增加几个队列和消费者。

消息有序性

  • 有一些消息需要保证顺序,比如下订单,要先锁库存,才能生成订单信息
    • 解决方案:
      • 使用CompletableFuture工具类进行异步编排。
      • 全局有序:只能由一个生产者往Broker发送消息,并且一个Broker内部只能有一个队列。消费者也必须是单线程消费这个队列。这样的消息就是全局有序的!
      • 部分有序:消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者。

.其他小问题

vhost 是什么?起什么作用?

  • vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server
  • 其内部均含有独立的 queueexchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。
  • 从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。

消息基于什么传输?

  • 由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ使用信道的方式来传输数据。
  • 信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制。

消息如何分发?

  • 轮询:默认
  • 不公平分发:比如消费者1的消费能力高,消费者2的消费能力低。就多给消费者1发消息。实现:channel.basicQos(1);

什么是元数据?元数据分为哪些类型?包括哪些内容?与 cluster 相关的元数据有哪些?元数据是如何保存的?元数据在 cluster 中是如何分布的?

  • 在非 cluster 模式下,元数据主要分为 Queue 元数据(queue 名字和属性等)、Exchange元数据(exchange 名字、类型和属性等)、Binding 元数据(存放路由关系的查找表)、Vhost元数据(vhost 范围内针对前三者的名字空间约束和安全属性设置)。
  • 在 cluster 模式下,还包括 cluster 中 node 位置信息和 node 关系信息。
  • 元数据按照 erlang node 的类型确定是仅保存于 RAM 中,还是同时保存在 RAM 和 disk 上。元数据在 cluster 中是全 node 分布的。

RabbitMQ的集群模式和集群节点类型?

 

 寄语:年轻人,你的职责是平整土地,而非焦虑时光,你做三四月的事,在七八月自有答案   ---余世存《时间之书》

标签:消费者,队列,RabbitMQ,死信,MQ,消息,RocketMQ
来源: https://www.cnblogs.com/monkey-xuan/p/15865509.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有