ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

利用Redisson实现分布式延时任务调度功能

2020-11-23 16:56:44  阅读:1187  来源: 互联网

标签:orderId 执行 Redisson 延时 任务 taskId 定时 任务调度


 定时任务

定时任务是在编码世界中经常遇到的问题,比如定时备份数据库、定时刷新缓存等,可以通过Linux定时任务完成,也可以通过框架如Spring完成,但是在分布式场景中传统单机可以完成功能就不太行了,所以需要借助其他工具来实现任务调度的功能

 场景:在一些订单场景中,用户下单后会锁定一些资源,然后用户非正常退出(没有触发取消订单操作),导致订单资源占用无法释放的问题。

借助工具:redisson分布式服务中的分布式调度任务服务(Scheduler Service)

代码

关单任务

定时执行具体任务,主要实现关单,释放相关资源(优惠券等),设置相关状态标志位

注意:Runnable、Callable接口二选一,必须实现序列化字段,因为任务最终要被序列化存储在Redis中

@Slf4j
@Data
public class CloseOrderTask implements Runnable, Serializable {
    private static final long serialVersionUID = -8193920383968460660L;

    /**
     * 订单id
     */
    private String orderId;

    @Override
    @SneakyThrows
    public void run() {
        // 获取SpringBean,因为此对象不能为Spring所管理,所以需要通过工具获取SpringBean
        RedissonClient redissonClient = SpringUtils.getBean(RedissonClient.class);
        OrderService orderService = SpringUtils.getBean(OrderService.class);
        // 任务必须加锁,因为同一任务可能被多个实例所执行
        RLock lock = redissonClient.getLock(ProductProperties.ORDER_TASK_LOCK + orderId);
        boolean lockFlag = false;
        try {
            //尝试获取锁
            lockFlag = lock.tryLock(0L, TimeUnit.SECONDS);
            if (!lockFlag){ return; }
            //获取到锁,正常执行关单操作
            log.info("get lock order:{} ", orderId);
            OrderDTO order = orderService.getOrder(orderId);
            if(orderDetails.getStatus() == OrderStatus.NEW){
                // 自动关单操作
                orderService.autoCloseOrder(orderDetails);
            }
        }catch (Exception e){
            //TODO 异常情况应添加邮件通知
        }finally {
            if(lockFlag) {
                lock.unlock();
            }
        }

    }
}

关单订单任务调度器

注册一个任务调度器的Bean

注意:Bean的destory方法必须重写,否则在进行关闭Spring容器时,任务调度中心会被关闭,再次启动后不会唤醒

    /**
     * 关单定时任务
     */
    @Bean(destroyMethod = "")
    public RScheduledExecutorService rScheduledExecutorService(@Autowired RedissonClient redissonClient){
        WorkerOptions workerOptions = WorkerOptions.defaults().workers(CPU_NUM + 1);
        ExecutorOptions executorOptions = ExecutorOptions.defaults()
                .taskRetryInterval(10 * 60, TimeUnit.SECONDS);
        RScheduledExecutorService executorService = redissonClient
                .getExecutorService(ProductProperties.CLOSE_ORDER_TASK_EXECUTOR, executorOptions);
        executorService.registerWorkers(workerOptions);
        return executorService;
    }

服务层

例子实现了两个方法,开启一个定时任务和取消一个定时任务。

因为定时任务的取消时通过taskId取消的,所以在提交任务或获取taskId,并对orderId和taskId做了一下映射,在取消订单的时候就比较容易了

@Slf4j
@Service
public class CloseOrderServiceImpl implements CloseOrderService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private RScheduledExecutorService executorService;

    @Override
    public void submitScheduleCloseTask(String orderId) {
        CloseOrderTask closeOrderTask = new CloseOrderTask();
        closeOrderTask.setOrderId(orderId);
        RScheduledFuture<?> schedule = executorService.schedule(closeOrderTask, ProductProperties.ORDER_TTL_MIN, TimeUnit.MINUTES);
        redisTemplate.opsForValue().set(ProductProperties.ORDER_TASK_MAPPING + orderId, schedule.getTaskId(), ProductProperties.ORDER_TTL_MIN, TimeUnit.MINUTES);
        log.info("submit automatic close task. orderId: {}, taskId : {}", orderId, schedule.getTaskId());
    }

    @Override
    public void cancelScheduleCloseTask(String orderId) {
        String taskId = (String) redisTemplate.opsForValue().get(ProductProperties.ORDER_TASK_MAPPING + orderId);
        log.info("cancel automatic close task. orderId: {}, taskId : {}", orderId, taskId);
        if (!StringUtils.isEmptyStr(taskId)){
            executorService.cancelTask(taskId);
        }

    }
}

具体使用地点

在用户进行下单操作时可以提交一个定时任务

在用户取消订单时可以取消orderId对应的定时任务

原理分析

数据结构

提交一个任务后,Redis中会添加4个键值对

  • {任务队列名:包路径}:counter

    • string类型

    • 记录当前有多少任务待执行

  • {任务队列名:包路径}:shceduler

    • zset类型

    • 通过score排名,任务队列。通过value关联tasks具体任务

  • {任务队列名:包路径}:retry-interval

    • string类型

    • 任务重试时间,默认5000

  • {任务队列名:包路径}:tasks

    • Hash类型

    • 所有具体任务,通过key与shceduler关联执行顺序

    • value序列化的任务

执行流程

提交一个延时任务调度任务,会在:scheduler中产生两条数据,分别是任务下一次执行时间和任务下一次执行时间+重试时间

Spring在注册ExecutorService时指定了工人(worker)的数量,会在本地起线程来执行这些待执行的任务。

问题探究

1、任务过期,客户端挂了,然后过一度时间重启,任务是否还会执行。

客户端重启后,过期的任务都会被拿到客户端里面进行消费。

存在同时启动多个客户端,是否会发生任务抢占问题,内部是否有锁机制?答案是会的

2、同一个任务是否会被多个客户端执行

 通过两台服务器,每台提交10个任务,多轮测试没有发生同一个任务被多次执行的情况。

加上线程执行sleep后同样进行测试,发现同一任务被不同客户端消费,所以需要有锁机制

3、任务(schedule 单次)执行完毕后,Redis是否会删除任务队列中的任务

任务执行完毕后,会删除tasks、scheduler中的任务,同时counter-1

4、任务队列的过期策略

所有任务的过期时间都是-1(永不过期),保证任务不会丢

5、任务执行异常情况

能够正常捕捉到异常,并进行处理,同时任务会从Redis中删除

标签:orderId,执行,Redisson,延时,任务,taskId,定时,任务调度
来源: https://blog.csdn.net/cong____cong/article/details/109999330

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有