一、
执行过程如下:
redis.call('hset', KEYS[1], ARGV[2], 1),往 Redis 中写入数据,形成 hash 结构,如Lock{id + ":" + threadId : 1}。
ARGV[2]:id + ":" + threadId,即锁的小 key。若是自己的,则执行redis.call('hincrby', KEYS[1], ARGV[2], 1),将锁的 value 加 1,并执行redis.call('pexpire', KEYS[1], ARGV[1])设置过期时间。
"if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);"
若当前锁存在,第一个条件不满足,接着判断redis.call('hexists', KEYS[1], ARGV[2]) == 1,通过大 key 和小 key 判断当前锁是否属于自己。
查看源码会发现,会判断当前方法的返回值是否为null。若某个节点挂了,只要有一个节点拿不到锁,都不算加锁成功,保证了加锁的可靠性。以锁失效时间为 30s,10s 后触发任务进行续约,将锁续约成 30s,若操作成功,会递归调用自己,重新设置任务,实现不停续约。基于setnx实现的分布式锁问题
重入问题:获得锁的线程应能再次进入相同锁的代码块,可重入锁能防止死锁。
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); }});return ttlRemainingFuture;
续约逻辑是通过commandExecutor.getConnectionManager().newTimeout()方法实现的,该方法表示在一定时间后执行特定任务。
五、若以上两个条件都不满足,则抢锁失败,返回锁的失效时间。若线程出现宕机,则不会续约,等到时间后自然释放锁。而synchronized和Lock锁都是可重入的。
二、
ARGV[1]:锁失效时间。
主从一致性:若 Redis 提供主从集群,向集群写数据时,主机异步同步数据给从机,若同步前主机宕机,会出现死锁问题。@Configurationpublic class RedissonConfig { @Bean public RedissonClient redissonClient(){ // 配置 Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.150.101:6379") .setPassword("123321"); // 创建RedissonClient对象 return Redisson.create(config); }}
配置 Redisson 客户端:进行 Redisson 客户端的配置。
若返回值为null,代表当前线程已抢锁完毕或可重入完毕;若以上两个条件都不满足,则进入第三个条件,返回锁的失效时间。
@Resourceprivate RedissonClient redissonClient;@Overridepublic Result seckillVoucher(Long voucherId) { // 1.查询优惠券 SeckillVoucher voucher = seckillVoucherService.getById(voucherId); // 2.判断秒杀是否开始 if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀尚未开始!"); } // 3.判断秒杀是否已经结束 if (voucher.getEndTime().isBefore(LocalDateTime.now())) { // 尚未开始 return Result.fail("秒杀已经结束!"); } // 4.判断库存是否充足 if (voucher.getStock() < 1) { // 库存不足 return Result.fail("库存不足!"); } Long userId = UserHolder.getUser().getId(); //创建锁对象 这个代码不用了,因为我们现在要使用分布式锁 //SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate); RLock lock = redissonClient.getLock("lock:order:" + userId); //获取锁对象 boolean isLock = lock.tryLock(); //加锁失败 if (!isLock) { return Result.fail("不允许重复下单"); } try { //获取代理对象(事务) IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { //释放锁 lock.unlock(); } }
三、commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()ttlRemainingFuture.onComplete((ttlRemaining, e)
这句话相当于对抢锁进行监听,抢锁完毕后会调用特定方法开启一个线程进行续约逻辑,即看门狗线程。以主从为例,写命令在主机上,主机会将数据同步给从机,但在主机还未将数据写入从机时宕机,哨兵会选举一个 slave 变成 master,此时新的 master 中没有锁信息,锁就丢失了。例如有 3 个锁,时间就是 4500ms,在这时间内所有锁加锁成功才算加锁成功,若有线程加锁失败,则会再次重试。
long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquiredif (ttl == null) { return;}
接下来根据lock方法的重载情况进行处理。
不可重试:目前的分布式锁只能尝试一次,合理的情况是线程在获得锁失败后应能再次尝试。大 key 表示锁是否存在,小 key 表示当前锁被哪个线程持有。若为null,对应前两个条件,退出抢锁逻辑;若返回值不是null,即走第三个分支,在源码处会进行while(true)的自旋抢锁。例如在HashTable中,方法用synchronized修饰,若在一个方法内调用另一个方法,不可重入会导致死锁。