ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

redisson延迟队列解决延迟任务

2022-10-01 21:29:00  阅读:48  来源: 互联网

标签:


场景:

针对客户端提交的合成视频任务,按照提交时间延迟60秒进行执行


方案甄选:

1、使用redis的Java扩展库redisson提供的RBlocingQueue + RDelayDeque来实现

2、使用类似kafka时间轮的方式实现延迟队列

经过在本地测试发现,第一种实现起来工作量要小一些,易于操作,也易于理解;第二种借鉴kafka的时间轮方式,执行延迟任务时间更加精确,但是操作成本要高一些;如果对执行任务时间精度没有太高要求,可以直接选择第一种redisson的方式;


代码示例:

向RDelayedDueue队列存入要进行延迟处理的任务对象:

@Slf4j
@Component
public class SynthesisResultConsumer<T> {

    @Autowired
    private SynthesisResultComponent synthesisResultComponent;

    @Autowired
    private RDelayedQueue<SynthesisResultQryDto> delayedQueue;


    private static final int RETRY_MAX = 60;

    @Async
    public void accept(SynthesisResultQryDto qryDto) {
        qryDto.setRetryNum(qryDto.getRetryNum() + 1);
        SynthesisResultDto resultDto = synthesisResultComponent.getSynthesisResult(qryDto.getVideoId());
        //合成结束 更新返回结果
        if (resultDto.getIsEnd()) {
            synthesisResultComponent.updateSynthesisResult(resultDto);
            log.info("accept 查询合成视频结果 结果已出,videoId:{}", qryDto.getVideoId());
        } else if (qryDto.getRetryNum() >= RETRY_MAX) {
            log.error("accept 查询合成视频结果 超过60次,请检查是否存异常,videoId:{}", qryDto.getVideoId());
        } else {
            log.warn("accept 查询合成视频结果 本次查询未得到结果加入队列等待下次执行,videoId:{}", qryDto.getVideoId());
            delayedQueue.offer(qryDto, 60, TimeUnit.SECONDS);
        }
    }


}

从RBlockingQueue阻塞队列中取出到期的任务执行:

@Slf4j
@Component
public class SynthesisResultDelayJob {

    @Autowired
    private SynthesisResultConsumer<SynthesisResultQryDto> synthesisResultConsumer;

    @Autowired
    private RBlockingDeque<SynthesisResultQryDto> blockingDeque;

    @PostConstruct
    public void listen() {
        new Thread(() -> {
            while (true) {
                try {
                    log.info("listen 本次监听时间:{}", DateUtil.date2String(new Date(), "yyyy-MM-dd HH:mm:ss"));
                    SynthesisResultQryDto dto = blockingDeque.take();
                    log.info("listen 从队列中获取需要查询结果的合成视频任务信息:{}", JSON.toJSONString(dto));
                    synthesisResultConsumer.accept(dto);
                } catch (InterruptedException e) {
                    log.error("listen InterruptedException,error msg:{}", ExceptionUtils.getMessage(e));
                }
            }
        }).start();
    }

}

配置类:

@Configuration
public class SynthesisResultDelayConfig {
    @Autowired
    private RedissonClient redissonClient;

    @Bean("blockQueue")
    public RBlockingDeque<SynthesisResultQryDto> getBlockQueue() {
        RBlockingDeque<SynthesisResultQryDto> blockingDeque =
                redissonClient.getBlockingDeque("prompter:synthesis:task:result");
        return blockingDeque;
    }


    @Bean("delayedQueue")
    public RDelayedQueue<SynthesisResultQryDto> getDelayQueue() {
        RBlockingDeque<SynthesisResultQryDto> blockingDeque = getBlockQueue();
        RDelayedQueue<SynthesisResultQryDto> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        return delayedQueue;
    }

}

依赖:

<dependency>
     <groupId>org.redisson</groupId>
     <artifactId>redisson-spring-boot-starter</artifactId>
     <version>3.11.4</version>
</dependency>

上面的示例可以看出,这里的延迟队列使用了两个队列,一个是RBlockingQueue,这个队列是用来取延迟任务的;一个是RDelayedDeque,这个队列是用来存延迟任务的;offer方法中第二个参数表示任务延迟时间;


总结:

横向比较kafka时间轮的实现方案和redisson的延迟队列的实现方案,可以充分理解不同方案的优劣和使用场景,选择最终的方案要综合考虑实现成本和使用场景,最终选定最优的方案


 

标签:
来源:

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有