ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

redisson MultiLock原理及分布式锁的应用

2021-12-28 23:58:14  阅读:267  来源: 互联网

标签:加锁 leaseTime lock xxx 默认 MultiLock redisson 分布式


一、前言

基于 Redis 的 Redisson 分布式联锁 RedissonMultiLock 对象可以将多个 RLock 对象关联为一个联锁,每个 RLock 对象实例可以来自于不同的 Redisson 实例。当然,这是官网的介绍,具体是什么?一起看看联锁 MultiLock 使用以及源码吧!

二、MultiLock 使用

在这里插入图片描述

按照官方文档的说法,这里 Redisson 客户端可以不是同一个。当然,一般工作中也不会说不用一个客户端吧。

三、加锁

源码入口:org.redisson.RedissonMultiLock#lock(),默认超时时间 leaseTime 没有设置,所以为 -1。

  public void lock(long leaseTime, TimeUnit unit) {
        try {
            lockInterruptibly(leaseTime, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

在这里插入图片描述
这块方法太长,咱们拆分进行阅读。

  1. 基础等待时间 baseWaitTime = 锁数量 * 1500,在这里就是 4500 毫秒;
  2. leaseTime == -1 所以 waitTime = baseWaitTime,也就是 4500;
  3. while (true) 调用 tryLock 加锁,直到成功。

调用 tryLock 方法,其中参数 waitTime = 4500,leaseTime = -1,unit = MILLISECONDS

下面看一下 tryLock 里面有什么逻辑?

在这里插入图片描述
leaseTime != -1 不满足,这部分直接跳过。

在这里插入图片描述
waitTime != -1 条件满足,remainTime = 4500,lockWaitTime = 4500。

在这里插入图片描述
所以,failedLocksLimit() 这个方法直接返回 0,就是必须全部加锁成功。

在这里插入图片描述

这里才是重点:遍历所有的锁,依次加锁。加锁逻辑就和可重入锁加锁并无区别了。所以 Lua 脚本就不进行分析了。

在这里插入图片描述

上面就是 tryLock 加锁之后的结果。

加锁成功,则将成功的锁放进 acquiredLocks 集合中;加锁失败,需要判断 failedLocksLimit,因为这里是 0,所以会直接对成功加锁集合 acquiredLocks 中的所有锁执行锁释放,同时清空成功集合,恢复迭代器。
在这里插入图片描述

每次加锁之后,会更新锁剩余时间 remainTime,如果 remainTime 小于等于 0 了,则说明加锁超时,直接返回 false。这样就会执行外部的 while (true) 逻辑,然后重新再走一遍 RedissonMultiLock#tryLock

  • 总结
    根据理解,画图如下:总体而言,就是将 key1、key2、key3 …… keyN 放到一个 List 集合中,然后迭代循环加锁,直到所有的都成功。

在这里插入图片描述

  • lock和tryLock的区别
  1. tryLock() 它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false
  2. tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true
  3. tryLock(long waitTime, long leaseTime, TimeUnit unit)在2的基础上,如果获取到锁,锁的最长持有时间为leaseTime

四、锁释放

看完加锁逻辑,锁释放就更容易理解了。
在这里插入图片描述
直接遍历释放锁即可,lock.unlockAsync() 是调用的 RedissonBaseLock#unlockAsync() 方法。

五、使用MultiLock实现分布式锁

  • 建立一个三主三从的redis集群,参考文章
    在这里插入图片描述

  • 创建springboot项目

  • redissonCluster.yml

clusterServersConfig:
  # 连接空闲超时,单位:毫秒 默认10000
  idleConnectionTimeout: 10000
  pingConnectionInterval: 1000
  # 同任何节点建立连接时的等待超时。时间单位是毫秒 默认10000
  connectTimeout: 10000
  # 等待节点回复命令的时间。该时间从命令发送成功时开始计时。默认3000
  timeout: 3000
  # 命令失败重试次数
  retryAttempts: 3
  # 命令重试发送时间间隔,单位:毫秒
  retryInterval: 1500
  # 重新连接时间间隔,单位:毫秒
  failedSlaveReconnectionInterval: 3000
  # 执行失败最大次数
  #failedAttempts: 3
  # 密码
  password:
  # 单个连接最大订阅数量
  subscriptionsPerConnection: 5
  clientName: null
  # loadBalancer 负载均衡算法类的选择
  loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
  # 主节点最小空闲连接数 默认32
  masterConnectionMinimumIdleSize: 32
  # 主节点连接池大小 默认64
  masterConnectionPoolSize: 64
  # 订阅操作的负载均衡模式
  subscriptionMode: SLAVE
  # 只在从服务器读取
  readMode: SLAVE
  # 集群地址
  nodeAddresses:
    - "redis://xxx.xxx.xxx.xxx:9001"
    - "redis://xxx.xxx.xxx.xxx:9002"
    - "redis://xxx.xxx.xxx.xxx:9003"
  # 对Redis集群节点状态扫描的时间间隔。单位是毫秒。默认1000
  scanInterval: 1000
  #这个线程池数量被所有RTopic对象监听器,RRemoteService调用者和RExecutorService任务共同共享。默认2
threads: 0
#这个线程池数量是在一个Redisson实例内,被其创建的所有分布式数据类型和服务,以及底层客户端所一同共享的线程池里保存的线程数量。默认2
nettyThreads: 0
# 编码方式 默认org.redisson.codec.JsonJacksonCodec
codec: !<org.redisson.codec.JsonJacksonCodec> {}
#传输模式
transportMode: NIO
# 分布式锁自动过期时间,防止死锁,单位毫秒,默认30s,每1/3的lockWatchdogTimeout时间,如果没执行玩业务,会自动给锁续约
lockWatchdogTimeout: 30000
# 通过该参数来修改是否按订阅发布消息的接收顺序出来消息,如果选否将对消息实行并行处理,该参数只适用于订阅发布消息的情况, 默认true
keepPubSubOrder: true
# 用来指定高性能引擎的行为。由于该变量值的选用与使用场景息息相关(NORMAL除外)我们建议对每个参数值都进行尝试。
#
#该参数仅限于Redisson PRO版本。
#performanceMode: HIGHER_THROUGHPUT
@Configuration
public class RedissonHttpSessionConfig  {
    //服务停用后调用shutdown方法
    @Bean(destroyMethod="shutdown")
    public RedissonClient getRedissonClient() throws IOException {
        ResourceLoader loader = new DefaultResourceLoader();
        Resource resource = loader.getResource("redissonCluster.yml");
        Config config = Config.fromYAML(resource.getInputStream());
        return Redisson.create(config);
    }
}
@Component
public class RedissonMultiLockInit {
    private final ArrayList<RLock> rLockList=new ArrayList<>();
    @Autowired
    RedissonClient redissonClient;
    public  RedissonMultiLock initLock(String... locksName){
        for (String lockName : locksName) {
           rLockList.add(redissonClient.getLock(lockName));
        }
        RLock[] rLocks = rLockList.toArray(new RLock[0]);
        return new RedissonMultiLock(rLocks);
    }
    public List<RLock> getRLocks(){
        return rLockList;
    }
}
@Controller
@RequestMapping("/lock")
public class LockController {
    @Autowired
    RedissonMultiLockInit redissonMultiLockInit;
    @Autowired
    UserMapper userMapper;
    @Autowired
    PlatformTransactionManager transactionManager;
    
    @GetMapping("/get/{waitTime}/{leaseTime}")
    @ResponseBody
    public String getLock(@PathVariable long waitTime, @PathVariable long leaseTime) throws InterruptedException {
        String[] strings={"test1","test2","test3"};
        RedissonMultiLock lock = redissonMultiLockInit.initLock(strings);
        //手动开启事务管理,@Transitional无法控制redis的分布式锁
        //创建事务定义对象
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        //设置是否只读,false支持事务
        def.setReadOnly(false);
        //设置事务隔离级别,可以重复读mysql默认级别
        def.setIsolationLevel(TransactionDefinition.ISOLATION_REPEATABLE_READ);
        //设置事务传播行为
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        //配置事务管理器
        TransactionStatus status = transactionManager.getTransaction(def);
        if (lock.tryLock(waitTime,leaseTime, TimeUnit.SECONDS)){
            System.out.println(Thread.currentThread().getName()+" waiting time is "+waitTime+"s " +
                    "leaseTime is "+leaseTime+"s "+
                    "execute time is "+(leaseTime+10)+" s" );
            try {
                userMapper.updateById(new User(1L,23,"beijing","myname2"));
                //模拟执行超时释放锁
                Thread.sleep((leaseTime+10)*1000);
                List<RLock> rLocks = redissonMultiLockInit.getRLocks();
                //判断是否仍然持有所有锁,防止锁过期
                if(rLocks.stream().allMatch(RLock::isLocked)){
                    //提交业务
                    transactionManager.commit(status);
                    //提交业务后再释放分布式锁
                    lock.unlock();
                    return "unlock success,transition success";
                }
                else {
                    //回滚业务
                    transactionManager.rollback(status);
                    return "lock is expired,transition fail";
                }
            } catch (Exception e) {
                e.printStackTrace();
                return "transition error";
            }
        }
        else {
            return Thread.currentThread().getName()+" can't get the lock,because the waiting time isn't enough. Waiting time is "+waitTime+"s, " +
                    "leaseTime is "+leaseTime+"s ";
        }
    }
}
  • 注意:这里有一个很经典的@Transitional和分布式锁同时使用的问题,所以为了解决该问题,我们手动开启事务,并确保在事务提交后,再释放分布式锁,关于这个问题,可以参考这篇文章
  • 测试
  1. http://localhost:8090/lock/get/6/-1,表示最多有6s的等待获取锁的时间,并且业务的执行时间可以无续约(启用看门狗机制),那么这次业务是一定会成功的
    在这里插入图片描述

  2. http://localhost:8090/lock/get/6/9,表示最多有6s的等待获取锁的时间,并且最多有9s的业务执行时间,超时就会释放分布式锁,业务失败,由于我在controller中写死了业务超过了9s,所以这次业务肯定失败。
    在这里插入图片描述

  3. http://localhost:8090/lock/get/6/-1http://localhost:8090/lock/get/2/-1

由于第一次业务要花费9s的业务执行时间,那么第二次业务无法在2s的时间内获取到分布式锁,会退出此次业务。
在这里插入图片描述

参考文章

标签:加锁,leaseTime,lock,xxx,默认,MultiLock,redisson,分布式
来源: https://blog.csdn.net/cristianoxm/article/details/122118079

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有