个人随笔
目录
并发(十)、ReentrantLock加锁解锁流程源码分析
2021-04-30 15:35:24

1、初始化ReentrantLock锁

  1. ReentrantLock lock = new ReentrantLock(true);

我们可以查看ReentrantLock的构造方法

  1. public ReentrantLock(boolean fair) {
  2. sync = fair ? new FairSync() : new NonfairSync();
  3. }

因为fair传的是true,所以这里新建的是FairSync公平锁。

2、加锁

ReentrantLock的加锁调用的是如下方法

  1. lock.lock();

跟踪进去

  1. @Override
  2. public void lock() {
  3. aboutToAcquire(this);
  4. try {
  5. super.lock();
  6. } finally {
  7. lockStateChanged(this);
  8. }
  9. }

这里先不看 aboutToAcquire(this);和 lockStateChanged(this);先,直接先找主线剧情!super.lock();

  1. public void lock() {
  2. sync.lock();
  3. }

我们知道在初始化阶段实例化的是公平锁FairSync,所以继续走FairSync#lock方法

  1. final void lock() {
  2. acquire(1);
  3. }

然后调用AbstractQueuedSynchronizer#acquire方法获得锁

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

这里我们先看ReentrantLock#tryAcquire(arg),arg为1,单词字面意思为尝试获得,也就是调用这个方法尝试获得锁。

  1. protected final boolean tryAcquire(int acquires) {
  2. //1、获取当前线程
  3. final Thread current = Thread.currentThread();
  4. //2、获取状态值
  5. int c = getState();
  6. if (c == 0) {
  7. if (!hasQueuedPredecessors() &&
  8. compareAndSetState(0, acquires)) {
  9. setExclusiveOwnerThread(current);
  10. return true;
  11. }
  12. }
  13. else if (current == getExclusiveOwnerThread()) {
  14. int nextc = c + acquires;
  15. if (nextc < 0)
  16. throw new Error("Maximum lock count exceeded");
  17. setState(nextc);
  18. return true;
  19. }
  20. return false;
  21. }

假设现在是第一个线程进来,那么此时c的值为0,也就是AbstractQueuedSynchronizer#state=0
那么会进入到下面这段逻辑:

  1. if (!hasQueuedPredecessors() &&
  2. compareAndSetState(0, acquires)) {
  3. setExclusiveOwnerThread(current);
  4. return true;
  5. }

我们先看也就是AbstractQueuedSynchronizer#hasQueuedPredecessors(),通过字面意思,我们可以大概推测作用为判断队列中有没有等带获取锁的线程,代码如下:

  1. public final boolean hasQueuedPredecessors() {
  2. // The correctness of this depends on head being initialized
  3. // before tail and on head.next being accurate if the current
  4. // thread is first in queue.
  5. Node t = tail; // Read fields in reverse initialization order
  6. Node h = head;
  7. Node s;
  8. return h != t &&
  9. ((s = h.next) == null || s.thread != Thread.currentThread());
  10. }

上面假设这里是第一个线程,所以此时tail和head都是null,所以h != t则为false,那么下面这段逻辑

  1. return h != t &&
  2. ((s = h.next) == null || s.thread != Thread.currentThread());

返回的是false。这个方法返回false,表明!hasQueuedPredecessors()为true,那么会执行 compareAndSetState(0, acquires)方法,该方法执行逻辑如下:

  1. protected final boolean compareAndSetState(int expect, int update) {
  2. // See below for intrinsics setup to support this
  3. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  4. }

可以看到是调用了魔术类Unsafe的原子交换方法compareAndSwapInt,具体交换逻辑这里不提,涉及到CAS和一些底层的原子操作逻辑。

如果交换成功则返回true,那么ReentrantLock#tryAcquire方法将会返回true.也就是这段逻辑

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

!tryAcquire(arg)将为false,线程继续执行,拿到了锁。假设此时第二个线程进来,那么因为c此时已经等于1,所以tryAcquire(arg)会返回false,那么!tryAcquire(arg)将为true那么将会进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg)判断逻辑,我们先看addWaiter(Node.EXCLUSIVE):

  1. private Node addWaiter(Node mode) {
  2. //初始化一个节点
  3. Node node = new Node(Thread.currentThread(), mode);
  4. // Try the fast path of enq; backup to full enq on failure
  5. Node pred = tail;
  6. if (pred != null) {
  7. node.prev = pred;
  8. if (compareAndSetTail(pred, node)) {
  9. pred.next = node;
  10. return node;
  11. }
  12. }
  13. enq(node);
  14. return node;
  15. }

modec传的是Node.EXCLUSIVE,从字面意思可以知道,是个排它锁。此时因才第二个线程进来,所以此时tail=null,pred=tail=null,所以会进入到enq(node)方法。

  1. private Node enq(final Node node) {
  2. for (;;) {
  3. Node t = tail;
  4. //如果尾节点为空
  5. if (t == null) { // Must initialize
  6. //CAS初始化头节点
  7. if (compareAndSetHead(new Node()))
  8. //尾节的指向头节点
  9. tail = head;
  10. } else {
  11. //当前节点的前驱指向尾节点,也就是把当前节点加在最后。
  12. node.prev = t;
  13. //通过CAS把node设置到tail
  14. if (compareAndSetTail(t, node)) {
  15. t.next = node;
  16. return t;
  17. }
  18. }
  19. }
  20. }

在AQS中,我们进程会看到死循环for和CAS操作,也就是自旋CAS来实现原子操作,第一个if里面的逻辑是相当于初始化队列,也就是初始化一个节点Node,head和tail引用(指针)都指向它。else逻辑中的意思是队列不为空,则通过CAS把node设置到tail。这段逻辑可能有点难想象,这里我画一个图。

  1. node.prev = t

  1. compareAndSetTail(t, node)

  1. t.next = node;

而如果pred不为空,则走这段逻辑

  1. if (pred != null) {
  2. node.prev = pred;
  3. if (compareAndSetTail(pred, node)) {
  4. pred.next = node;
  5. return node;
  6. }
  7. }

按上图的模式分析,结果是一样的,就是把新节点加到最后。

然后addWaiter就会返回新的Node。
然后我们来看方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg));此时addWaiter(Node.EXCLUSIVE)返回node,所以就变为了acquireQueued(node, arg));逻辑如下

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. //
  7. final Node p = node.predecessor();
  8. if (p == head && tryAcquire(arg)) {
  9. setHead(node);
  10. p.next = null; // help GC
  11. failed = false;
  12. return interrupted;
  13. }
  14. if (shouldParkAfterFailedAcquire(p, node) &&
  15. parkAndCheckInterrupt())
  16. interrupted = true;
  17. }
  18. } finally {
  19. if (failed)
  20. cancelAcquire(node);
  21. }
  22. }

这里又是自旋CAS,我们先看addWaiter

  1. final Node predecessor() throws NullPointerException {
  2. Node p = prev;
  3. if (p == null)
  4. throw new NullPointerException();
  5. else
  6. return p;
  7. }

在上面,我们知道p为head,那么这里又重新尝试tryAcquire(arg)获取锁,淡然这里还是返回false,因为第一个线程还在持有锁,那么会走到下面这个方法shouldParkAfterFailedAcquire(p, node)

  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. int ws = pred.waitStatus;
  3. if (ws == Node.SIGNAL)
  4. /*
  5. * This node has already set status asking a release
  6. * to signal it, so it can safely park.
  7. */
  8. return true;
  9. if (ws > 0) {
  10. /*
  11. * Predecessor was cancelled. Skip over predecessors and
  12. * indicate retry.
  13. */
  14. do {
  15. node.prev = pred = pred.prev;
  16. } while (pred.waitStatus > 0);
  17. pred.next = node;
  18. } else {
  19. /*
  20. * waitStatus must be 0 or PROPAGATE. Indicate that we
  21. * need a signal, but don't park yet. Caller will need to
  22. * retry to make sure it cannot acquire before parking.
  23. */
  24. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  25. }
  26. return false;
  27. }

我们知道pred此时为head,而head只是单纯的new Node,所以waitStatus为0,那么会走到compareAndSetWaitStatus(pred, ws, Node.SIGNAL);waitStatus设置为 Node.SIGNAL然后return false;也就是shouldParkAfterFailedAcquire(p, node)为false,那么继续自旋下一个循环,再进入上面的方法后ws就为Node.SIGNAL,所以return true,此时shouldParkAfterFailedAcquire(p, node)为true.那么逻辑:

  1. if (shouldParkAfterFailedAcquire(p, node) &&
  2. parkAndCheckInterrupt())
  3. interrupted = true;

就会走到第二个判断 parkAndCheckInterrupt()通过名字就大概知道,应该是阻塞线程。

  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this);
  3. return Thread.interrupted();
  4. }
  1. public static void park(Object blocker) {
  2. Thread t = Thread.currentThread();
  3. setBlocker(t, blocker);
  4. UNSAFE.park(false, 0L);
  5. setBlocker(t, null);
  6. }

果然最后调用了UNSAFE.park(false, 0L);该方法会调用操作系统api来阻塞线程。那么线程就阻塞在了这段代码,等待被唤醒后继续下一次循环tryAcquire(arg)尝试获取锁,如果获取到锁则方法返回,程序继续执行,否在继续阻塞!

好了,加锁的源码逻辑到这里就结束了,这里只抽取了主线,大概总结下。

1、线程尝试获取锁
2、获取锁失败则加入等待队列
3、调用Unsafe.park阻塞

关键点是自旋CAS(for(;;)+CAS)。

3、解锁

看完了加锁的源码,我们来看下解锁的源码,解锁首先从下面的代码开始

  1. lock.unlock();

然后跟踪进去:

  1. public void unlock() {
  2. sync.release(1);
  3. }

这些通过加锁后,源码跟踪比较简单,再跟踪进去

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h);
  6. return true;
  7. }
  8. return false;
  9. }

我们先看tryRelease方法,尝试去释放

  1. protected final boolean tryRelease(int releases) {
  2. int c = getState() - releases;
  3. if (Thread.currentThread() != getExclusiveOwnerThread())
  4. throw new IllegalMonitorStateException();
  5. boolean free = false;
  6. if (c == 0) {
  7. free = true;
  8. setExclusiveOwnerThread(null);
  9. }
  10. setState(c);
  11. return free;
  12. }

很明显现在是持有锁的线程去释放,c为0,那么free就为true,则该方法返回tue,若该方法返回ture,则接下来将执行这段逻辑

  1. Node h = head;
  2. if (h != null && h.waitStatus != 0)
  3. unparkSuccessor(h);
  4. return true;

在加锁的后面部分我们把waitStatus设置为Node.SIGNAL,而SIGNAL对应的值是-1,所以会执行逻辑unparkSuccessor(h);

  1. private void unparkSuccessor(Node node) {
  2. /*
  3. * If status is negative (i.e., possibly needing signal) try
  4. * to clear in anticipation of signalling. It is OK if this
  5. * fails or if status is changed by waiting thread.
  6. */
  7. int ws = node.waitStatus;
  8. if (ws < 0)
  9. compareAndSetWaitStatus(node, ws, 0);
  10. /*
  11. * Thread to unpark is held in successor, which is normally
  12. * just the next node. But if cancelled or apparently null,
  13. * traverse backwards from tail to find the actual
  14. * non-cancelled successor.
  15. */
  16. Node s = node.next;
  17. if (s == null || s.waitStatus > 0) {
  18. s = null;
  19. for (Node t = tail; t != null && t != node; t = t.prev)
  20. if (t.waitStatus <= 0)
  21. s = t;
  22. }
  23. if (s != null)
  24. LockSupport.unpark(s.thread);
  25. }

这个方法传的是head头节点,所以开始head头节点的waitStatus为-1小于0,所以自旋重新设置为0,这个相当于初始化为0,好让第二个线程继续走前面走过的逻辑。然后node.next;获取下一个节点,这个节点从上面的分析来说是不为空的,所以最后会执行下面的逻辑:

  1. LockSupport.unpark(s.thread);
  1. public static void unpark(Thread thread) {
  2. if (thread != null)
  3. UNSAFE.unpark(thread);
  4. }

这里是调用Unsafe魔术类重新唤醒线程。到这里就解锁成功了,唤醒线程后,我们可以回到线程阻塞的逻辑:

  1. ```
  2. final boolean acquireQueued(final Node node, int arg) {
  3. boolean failed = true;
  4. try {
  5. boolean interrupted = false;
  6. for (;;) {
  7. //
  8. final Node p = node.predecessor();
  9. if (p == head && tryAcquire(arg)) {
  10. setHead(node);
  11. p.next = null; // help GC
  12. failed = false;
  13. return interrupted;
  14. }
  15. if (shouldParkAfterFailedAcquire(p, node) &&
  16. parkAndCheckInterrupt())
  17. interrupted = true;
  18. }
  19. } finally {
  20. if (failed)
  21. cancelAcquire(node);
  22. }
  23. }

可以知道之前在这个方法被阻塞parkAndCheckInterrupt(),现在被唤醒了,继续执行for循环,重新尝试获得锁,如果获取失败又会被阻塞。

到这里解锁的源码页走了一波!

4、总结

a、加锁,通过自旋CAS获取锁,获取锁失败调用Unsafe.park阻塞等待

b、解锁,公平锁直接调用Unsafe.unpark唤醒head.next节点的线程

 16

啊!这个可能是世界上最丑的留言输入框功能~


当然,也是最丑的留言列表

有疑问发邮件到 : suibibk@qq.com 侵权立删
Copyright : 个人随笔   备案号 : 粤ICP备18099399号