一、实现分布式锁的要求
- 确保互斥:在同一时刻,必须保证锁至多只能被一个客户端持有。
- 不能死锁:在一个客户端在持有锁的期间崩溃而没有主动解锁情况下,也能保证后续其他客户端能加锁。
- 避免活锁:在获取锁失败的情况下,反复进行重试操作,占用Cpu资源,影响性能。
- 实现更多锁特性:锁中断、锁重入、锁超时等。
- 确保客户端只能解锁自己持有的锁。
二、redission加锁、锁等待、解锁
1、加锁
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}
KEYS[1]为getName();
ARGV[1]为internalLockLeaseTime;
ARGV[2]为getLockName(threadId)。
org.redisson.RedissonLock类tryLockInnerAsync通过eval命令执行Lua代码完成加锁操作。KEYS[1]为锁在redis中的key,key对应value为map结构,ARGV[1]为锁超时时间,ARGV[2]为锁value中的key。ARGV[2]由UUID+threadId组成,用来标记锁被谁持有。
(1) 第一个If判断key是否存在,不存在完成加锁操作
- redis.call(‘hset’, KEYS[1], ARGV[2], 1);创建key[1] map中添加key:ARGV[2] ,value:1
- redis.call(‘pexpire’, KEYS[1], ARGV[1]);设置key[1]过期时间,避免发生死锁。
eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令。可避免第一条命令执行成功第二条命令执行失败导致死锁。
(2)第二个if判断key存在且当前线程已经持有锁, 重入:
- redis.call(‘hexists’, KEYS[1],ARGV[2]);判断redis中锁的标记值是否与当前请求的标记值相同,相同代表该线程已经获取锁。
- redis.call(‘hincrby’, KEYS[1], ARGV[2],1);记录同一线程持有锁之后累计加锁次数实现锁重入。
(3)最后的return表示key存在被其他线程获取的锁, 等待:
- redis.call(‘pexpire’, KEYS[1], ARGV[1]); 重制锁超时时间。
2、锁等待
@Overridepublic void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {long threadId = Thread.currentThread().getId();//步骤1、这里调用加锁操作,若没有返回null表名有其它线程占用锁,获得锁超时时间,执行锁等待逻辑Long ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return;}//步骤2、步骤一中加锁操作失败,订阅消息,利用redis的pubsub提供一个通知机制来减少不断的重试,避免发生活锁。RFuture<RedissonLockEntry> future = subscribe(threadId);commandExecutor.syncSubscription(future);try {while (true) {ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {break;}//步骤3、getLath()获取RedissionLockEntry实例latch变量,由于permits为0,所以调用acquire()方法后线程阻塞。// waiting for messageif (ttl >= 0) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().acquire();}}} finally {unsubscribe(future, threadId);}// get(lockAsync(leaseTime, unit));}
其实很多这种循环阻塞的逻辑都不会使用空轮询,如果用空轮询的话太耗费CPU的性能,比如我们有定时去查询数据表中是否有数据,会使用while无线循环,但是配合Thread.sleep来防止空轮询,还有的会借助java的阻塞队列来防止空轮询,这里猜测阻塞队列是借助操作系统内核的中断来唤醒的,这个具体得去研究系统底层实现,这里就不多说,而redisssion是通过消息订阅来阻塞的,也防止了空轮询。总之,尽量避免使用空轮询。
3、解锁
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));}
KEYS[1]为getName();
KEYS[2]为getChannelName();
ARGV[1]为LockPubSub.unlockMessage;
ARGV[2]为internalLockLeaseTime;
ARGV[3]为getLockName(threadId);
- (1)第一个if判断锁对应key是否存在,不存在则publish消息,将获取锁被阻塞的线程恢复重新获取锁;
- (2)第二个if判断锁对应key存在,value中是否存在当前要释放锁的标示,不存在返回nil,确保锁只能被持有的线程释放。
- (3)对应key存在,value中存在当前要释放锁的标示,将锁标示对应值-1,第三个if判断锁标示对应的值是否大于0,大于0,表示有锁重入情况发生,重新设置锁过期时间。
- (4)对应key存在,value中存在当前要释放锁的标示,将锁标示对应值-1后等于0,调用del操作释放锁,并publish消息,将获取锁被阻塞的线程恢复重新获取锁;
订阅者将会收到如下消息。
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {...@Overrideprotected void onMessage(RedissonLockEntry value, Long message) {if (message.equals(unlockMessage)) {Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute != null) {runnableToExecute.run();}value.getLatch().release();} else if (message.equals(readUnlockMessage)) {while (true) {Runnable runnableToExecute = value.getListeners().poll();if (runnableToExecute == null) {break;}runnableToExecute.run();}//订阅者接收到publish消息后,执行release操作,调用acquire被阻塞的线程将继续执行获取锁操作。value.getLatch().release(value.getLatch().getQueueLength());}}...}
这里就跟步骤2的所等待逻辑互相呼应了
...RFuture<RedissonLockEntry> future = subscribe(threadId);...getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);...
4、watch dog自动延期机制
redisson默认超时时间是30s,假设我们的业务处理逻辑超过了30秒怎么办?redisson是这样实现的,当客户端
一获得锁,则会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。
源码如下:



看门狗因为是新启动一个后台线程在运行,会对性能有一定的影响。
三、redisson实现分布式锁是否满足要求
1、确保互斥
要求:在同一时刻,必须保证锁至多只能被一个客户端持有。
redisson实现:redisson用lua脚本来保证原子操作,判断key是否存在,存在者锁等待,所以是满足于该要求的。
2、不能死锁
要求:在一个客户端在持有锁的期间崩溃而没有主动解锁情况下,也能保证后续其他客户端能加锁。
redisson实现:redisson对锁设置了超时时间,再客户端奔溃的情况下,超时后会自动被清除,所以不会死锁,满足要求。
3、避免活锁
要求:在获取锁失败的情况下,反复进行重试操作,占用Cpu资源,影响性能。
redisson实现:redisson通过消息订阅机制来防止空轮询,满足要求。
4、实现更多锁特性:锁中断、锁重入、锁超时等。
要求:确保客户端只能解锁自己持有的锁。
redisson实现:redisson通过map的结构来实现锁的重入,同一先线程进入只不过value+1,满足要求并且还有看门狗防止客户端还存在的情况下锁超时。
