ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

rocketmq核心源码分析第二十三篇一顺序消息

2022-02-08 16:03:51  阅读:129  来源: 互联网

标签:顺序 第二十三 select 源码 消息 msg new null rocketmq


文章目录

顺序消息

使用demo

  • 通过MessageQueueSelector对mqs进行选择
  • 一般按业务维度保障分区顺序
defaultMQProducer.send(msg,new MessageQueueSelector(){
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    	通常用法: msg的业务唯一键相同的消息发送到同一队列
    	保障分区顺序性
        int i = (int)arg % mqs.size();
        return mqs.get(i);
    }
},16/* 为select方法的arg参数*/ );

源码分析

  • 顺序消息通过sendSelectImpl实现发送
  • 获取topic对应的TopicPublishInfo
  • 获取topic消息队列集合
  • 根据selector选择指定的消息队列
  • 消息发送
private SendResult sendSelectImpl(
        Message msg,
        MessageQueueSelector selector,
        Object arg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);
    获取topic发布信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        try {
            获取topic消息队列集合
            List<MessageQueue> messageQueueList =
                    mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
            Message userMessage = MessageAccessor.cloneMessage(msg);
            String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
            userMessage.setTopic(userTopic);
            根据selector选择指定的消息队列
            mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
        } catch (Throwable e) {
            throw new MQClientException("select message queue throwed exception.", e);
        }

        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTime) {
            throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
        }
        消息发送
        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
        } else {
            throw new MQClientException("select message queue return null.", null);
        }
    }
    validateNameServerSetting();
    throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

总结

  • 顺序消息是指将同一业务键的消息发往同一消息队列MessageQueue
  • 只保障分区顺序性
  • 消费端不管是并发消费还是顺序消费都是按照MessageQueue的维度进行拉取,但并发消费由于多线程干扰[所以顺序消费最好是采用ConsumeMessageOrderlyService]

标签:顺序,第二十三,select,源码,消息,msg,new,null,rocketmq
来源: https://blog.csdn.net/qq_35529969/article/details/122825491

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有