ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

18第四章:03_延时消息

2022-08-14 17:30:09  阅读:129  来源: 互联网

标签:03 18 Broker 投递 消息 延时 等级 延迟


一、延时消息

当消息写入到 Broker 后,在指定的时长后才可被消费处理的消息,称为延时消息。

采用 RocketMQ 的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交易中超时未支付关闭订单的场景,12306 平台订票超时未支付取消订票的场景。

在电商平台中,订单创建时会发送一条延迟消息。这条消息将会在 30 分钟后投递给后台业务系统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成支付。如果未完 成,则取消订单,将商品再次放回到库存;如果完成支付,则忽略。

在 12306 平台中,车票预订成功后就会发送一条延迟消息。这条消息将会在 45 分钟后投递给后台业务系统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成支付。如果未完成,则取消预订,将车票再次放回到票池;如果完成支付,则忽略。

二、延时等级

延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。

延时等级定义在 RocketMQ 服务端的 MessageStoreConfig 类中的如下变量中:

即,若指定的延时等级为 3,则表示延迟时长为 10s,即延迟等级是从 1 开始计数的。

当然,如果需要自定义的延时等级,可以通过在 broker 加载的配置中新增如下配置(例如下面增加了 1 天这个等级 1d)。配置文件在 RocketMQ 安装目录下的 conf 目录中。

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d

三、延时消息实现原理

具体实现方案是:

1、修改消息

Producer 将消息发送到 Broker 后,Broker 会首先将消息写入到 commitlog 文件,然后需要将其分发到相应的 consumequeue。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;若有则需要经历一个复杂的过程:

  • 修改消息的 Topic 为 SCHEDULE_TOPIC_XXXX
  • 根据延时等级,在 consumequeue 目录中 SCHEDULE_TOPIC_XXXX 主题下创建出相应的 queueId 目录与 consumequeue 文件(如果没有这些目录与文件的话)。

延迟等级 delayLevel 与 queueId 的对应关系为 queueId = delayLevel -1

需要注意,在创建 queueId 目录时,并不是一次性地将所有延迟等级对应的目录全部创建完毕, 而是用到哪个延迟等级创建哪个目录

  • 修改消息索引单元内容。索引单元中的 Message Tag HashCode 部分原本存放的是消息的 Tag 的 Hash 值。现修改为消息的投递时间。投递时间是指该消息被重新修改为原 Topic 后再次被写入到 commitlog 中的时间。投递时间 = 消息存储时间 + 延时等级时间。消息存储时间指的是消息被发送到 Broker 时的时间戳。

  • 将消息索引写入到 SCHEDULE_TOPIC_XXXX 主题下相应的 consumequeue 中

SCHEDULE_TOPIC_XXXX 目录中各个延时等级 Queue 中的消息是如何排序的?

是按照消息投递时间排序的。一个 Broker 中同一等级的所有延时消息会被写入到 consumequeue 目录中 SCHEDULE_TOPIC_XXXX 目录下相同 Queue 中。即一个 Queue 中消息投递时间的延迟等级时间是相同的。那么投递时间就取决于于消息存储时间了。即按照消息被发送到 Broker 的时间进行排序的。

2、投递延时消息

Broker 内部有⼀个延迟消息服务类 ScheuleMessageService,其会消费 SCHEDULE_TOPIC_XXXX 中的消息,即按照每条消息的投递时间,将延时消息投递到⽬标 Topic 中。不过,在投递之前会从 commitlog 中将原来写入的消息再次读出,并将其原来的延时等级设置为 0,即原消息变为了一条不延迟的普通消息。然后再次将消息投递到目标 Topic 中。

ScheuleMessageService 在 Broker 启动时,会创建并启动一个定时器 TImer,用于执行相应的定时 任务。系统会根据延时等级的个数,定义相应数量的 TimerTask,每个 TimerTask 负责一个延迟等级消息的消费与投递。每个 TimerTask 都会检测相应 Queue 队列的第一条消息是否到期。若第 一条消息未到期,则后面的所有消息更不会到期(消息是按照投递时间排序的);若第一条消 息到期了,则将该消息投递到目标 Topic,即消费该消息。

3、将消息重新写入 commitlog

延迟消息服务类 ScheuleMessageService 将延迟消息再次发送给了 commitlog,并再次形成新的消息索引条目,分发到相应 Queue。

这其实就是一次普通消息发送。只不过这次的消息 Producer 是延迟消息服务类 ScheuleMessageService

四、代码举例

定义 DelayProducer 类

public class DelayProducer {
 public static void main(String[] args) throws Exception {
  DefaultMQProducer producer = new DefaultMQProducer("pg");
  producer.setNamesrvAddr("rocketmqOS:9876");
  producer.start();
  for (int i = 0; i < 10; i++) {
   byte[] body = ("Hi," + i).getBytes();
   Message msg = new Message("TopicB", "someTag", body);
   // 指定消息延迟等级为3级,即延迟10s
   // msg.setDelayTimeLevel(3);
   SendResult sendResult = producer.send(msg);
   // 输出消息被发送的时间
   System.out.print(new SimpleDateFormat("mm:ss").format(new Date()));
   System.out.println(" ," + sendResult);
  }
  producer.shutdown();
 }
}

定义 OtherConsumer 类

public class OtherConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        consumer.setNamesrvAddr("rocketmqOS:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicB", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus
    consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
     for (MessageExt msg : msgs) {
      // 输出消息被消费的时间
      System.out.print(new SimpleDateFormat("mm:ss").format(new Date()));
      System.out.println(" ," + msg);
     }
     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
  });
        consumer.start();
        System.out.println("Consumer Started");
    }
}

标签:03,18,Broker,投递,消息,延时,等级,延迟
来源: https://www.cnblogs.com/niujifei/p/16585846.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有