ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

rocketmq-spring的consumer设置消费失败最大重试次数

2021-06-18 23:05:48  阅读:251  来源: 互联网

标签:topic RocketMQPushConsumerLifecycleListener spring 重试 break new consumer rocketm


说明

rocketmq-spring的consumer的相关属性配置有两种方式:

  1. 在配置文件中进行配置
  2. 类上使用@RocketMQMessageListener注解配置相关属性

关于注解中的属性可以查看:org.apache.rocketmq.spring.annotation.RocketMQMessageListener,而在文件中可以配置的属性只有如下几个(并不遵守spring boot自动配置规范,所以在idea中不会有相关提示)

说明如下:

配置项

说明

rocketmq.name-serverrocketmq的name server地址,格式:`主机:端口;主机:端口`,多个地址以英文分号分隔
rocketmq.consumer.secret-key ACL的secret-key属性
rocketmq.consumer.access-keyACL的access-key属性
rocketmq.consumer.customized-trace-topic
自定义消费轨迹topic,不使用忽略
rocketmq.access-channe
枚举类型,值为:【LOCAL, CLOUD】,值为CLOUD表示设置接入阿里云。忽略。

如果想要设置最大重试次数等一些相关初始化参数配置,很明显是不支持的。

同时,看一下构造consumer的源码,可以看到只配置了固定的几个属性:

    private void initRocketMQPushConsumer() throws MQClientException {

        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
            this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
        boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
        if (Objects.nonNull(rpcHook)) {
            consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
                enableMsgTrace, this.applicationContext.getEnvironment().
                resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
            consumer.setVipChannelEnabled(false);
            consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));
        } else {
            log.debug("Access-key or secret-key not configure in " + this + ".");
            consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
                this.applicationContext.getEnvironment().
                    resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
        }

        String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
        if (customizedNameServer != null) {
            consumer.setNamesrvAddr(customizedNameServer);
        } else {
            consumer.setNamesrvAddr(nameServer);
        }
        if (accessChannel != null) {
            consumer.setAccessChannel(accessChannel);
        }
        consumer.setConsumeThreadMax(consumeThreadMax);
        if (consumeThreadMax < consumer.getConsumeThreadMin()) {
            consumer.setConsumeThreadMin(consumeThreadMax);
        }
        consumer.setConsumeTimeout(consumeTimeout);

        switch (messageModel) {
            case BROADCASTING:
                consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
                break;
            case CLUSTERING:
                consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
                break;
            default:
                throw new IllegalArgumentException("Property 'messageModel' was wrong.");
        }

        switch (selectorType) {
            case TAG:
                consumer.subscribe(topic, selectorExpression);
                break;
            case SQL92:
                consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
                break;
            default:
                throw new IllegalArgumentException("Property 'selectorType' was wrong.");
        }

        switch (consumeMode) {
            case ORDERLY:
                consumer.setMessageListener(new DefaultMessageListenerOrderly());
                break;
            case CONCURRENTLY:
                consumer.setMessageListener(new DefaultMessageListenerConcurrently());
                break;
            default:
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
        }

        if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
        } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
            ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
        }

    }

但是看代码的最后几行,rocketMQListener如果实现了RocketMQPushConsumerLifecycleListener接口,则会调用RocketMQPushConsumerLifecycleListener的prepareStart(consumer)方法,很明显,可以在这里设置consuemr的参数。

说明:rocketMQListener就是类上带有RocketMQMessageListener的bean。

解决方案

    @RocketMQMessageListener(topic = "test_topic", consumerGroup = "test_topic_consumer", selectorExpression = "*")
    class StringConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {

        @Override
        public void onMessage(String message) {
            LOGGER.info("receive message: {}", message);
        }

        @Override public void prepareStart(DefaultMQPushConsumer consumer) {
            // 设置最大重试次数
            consumer.setMaxReconsumeTimes(5);
            // 如下,设置其它consumer相关属性
            consumer.setPullBatchSize(16);
        }
    }

末语

我是在翻源码的才想到这个解决方案,我想既然提供有这个接口进行自定义配置,官方文档应该会有示例说明,然后翻了下github,是有类似的使用方式的,源码上还有其它示例,如果有其它问题,建议还是先看官方示例是否提供了相关解决方案。github地址:https://github.com/apache/rocketmq-spring/tree/master/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer

 

标签:topic,RocketMQPushConsumerLifecycleListener,spring,重试,break,new,consumer,rocketm
来源: https://blog.csdn.net/x763795151/article/details/118023942

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有