2.ReetrantLock:多线程下的加锁过程

2.ReetrantLock:多线程下的加锁过程

Posted by ZhaoLe on June 24, 2019

一个简单的demo

通过一个Demo来展示在多线程模式下ReetrantLock加锁和释放锁的过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//Demo
private static ReentrantLock lock = new ReentrantLock();

public static void main(String[] args) {
    Thread thread1 = new Thread(lockTask(),"线程一号");
    thread1.start();
    Thread thread2 = new Thread(lockTask(),"线程二号");
    thread2.start();
}


public static Runnable lockTask() {
    return () -> {
        while (true) {
            System.out.println(Thread.currentThread().getName() + "准备获取锁。。。");
            lock.lock();
            System.out.println(Thread.currentThread().getName() + "获取锁成功。。。");
            threadSleep();

            lock.unlock();
            System.out.println(Thread.currentThread().getName() + "的锁已经释放。。。");
        }
    };
}

private static void threadSleep() {
    try {
        Thread.sleep(5000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

然后点击运行,可以看下控制台打印的结果,如果线程1先获取锁,那线程2线程就要等它先释放后才能持有。

Lock

第一个线程获取锁的逻辑已经在【1.ReetrantLock:单线程下的加锁过程】解释过了,现在主要说线程2在线程1没有释放锁的情况下发生了什么。

类调用:NonfairSync#lock->AbstractQueuedSynchronizer#acquire

1
2
3
4
5
6
//AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  • tryAcquire(arg)具体实现是在ReentrantLock#nonfairTryAcquire

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    
      //ReentrantLock.java
      final boolean nonfairTryAcquire(int acquires) {
          final Thread current = Thread.currentThread();
          int c = getState();
          if (c == 0) {
              if (compareAndSetState(0, acquires)) {
                  setExclusiveOwnerThread(current);
                  return true;
              }
          }
          else if (current == getExclusiveOwnerThread()) {
              int nextc = c + acquires;
              if (nextc < 0) // overflow
                  throw new Error("Maximum lock count exceeded");
              setState(nextc);
              return true;
          }
          return false;
      }
    
    • 【5-10】第一个线程已经处于占有锁还未释放的状态 所以state已经是1 ,这个方法并不会进去
    • 【11-17】当前线程二在访问并且线程一已经持有锁,所以这个判断方法也不会进去。
    • 【18】nonfairTryAcquire返回false
  • acquireQueued(addWaiter(Node.EXCLUSIVE), arg)把这个方法拆分成两个说。

    • addWaiter(Node.EXCLUSIVE), arg)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      
          //AbstractQueuedSynchronizer.java
          private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
      

      新增个为EXCLUSIVE类型的Node节点,waitStatus为0,第一次进来这个链表结构还没有初始化,都是null,所以会直接进入到enq(node)

      • 【5-11】如果不是第一次进来。此时已经有链表。这个方法会把新来的Node放到链表之后排队。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      
        //AbstractQueuedSynchronizer.java
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
      

      这个方法主要做两件事,一个是初始化链表,二是将新的node节点插入到链表,使用的CAS的方式保证入队列方式的一致性。

      • 【3】算是个自旋,如果第一次进来会做初始化链表+挂node节点。
      • 【5】第一次进方法,链表没有初始化,这时候tail是null,要走一步初始化。
      • 【6-7】这个方法使用CAS方式将新建的Node节点赋值到head节点上,再将head节点赋值给tail,此时tail = head
      • 【9-13】初始化成功后会进入这个方法里面,将传入的Node节点设置为tail节点的后置,构成一个双向链表。
    • acquireQueued

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      
          //AbstractQueuedSynchronizer.java
         final boolean acquireQueued(final Node node, int arg) {
             boolean failed = true;
             try {
                 boolean interrupted = false;
                 for (;;) {
                     final Node p = node.predecessor();
                     if (p == head && tryAcquire(arg)) {
                         setHead(node);
                         p.next = null; // help GC
                         failed = false;
                         return interrupted;
                     }
                     if (shouldParkAfterFailedAcquire(p, node) &&
                         parkAndCheckInterrupt())
                         interrupted = true;
                 }
             } finally {
                 if (failed)
                     cancelAcquire(node);
             }
         }
      
      • 【7-13】参数Node是上一步刚组建好的的链表,首先从Node中获取上一个节点,判断是否是head节点(如果知道当前节点的上个节点是头节点,就说明当前的node链表中只有一个等待的线程)。然后在 tryAcquire(arg),由于在我们例子中,到这一步的时候线程一还是没有释放掉锁,所以这个判断里的代码是走不到的。这个判断里面的方法意思就是如果当前线程已经获取到锁,我们需要按照FIFO原则清除链表上的节点。

        1
        2
        3
        4
        5
        6
        
        //AbstractQueuedSynchronizer.java
        private void setHead(Node node) {
            head = node;
            node.thread = null;
            node.prev = null;
        }
        
      • 【14-16】如果获取不到锁就会进这个方法,这里是实现排队阻塞等待的功能。这里还是分成两个方法来说明。

        • shouldParkAfterFailedAcquire(p, node)

          这里面要先说几个waitStatus状态。 SIGNAL: waitStatus的状态值,-1,表明后面的线程需要当前线程唤醒。 CANCELLED: waitStatus的状态值,1,标记这个线程已经取消了。

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          
              private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
                  int ws = pred.waitStatus;
                  if (ws == Node.SIGNAL)
                       return true;
                  if (ws > 0) {
                      do {
                           node.prev = pred = pred.prev;
                      } while (pred.waitStatus > 0);
                           pred.next = node;
                  } else {
                      compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
                  }
                   return false;
              }
          
          • 【3-4】如果上个节点的waitStatusSIGNAL说明这个线程可以直接被挂起
          • 【6-9】暂时没有调用。
          • 【11】改变waitStatusNode.SIGNAL
        • parkAndCheckInterrupt()调用UNSAFE.park挂起线程。

          1
          2
          3
          4
          5
          6
          
           public static void park(Object blocker) {
              Thread t = Thread.currentThread();
              setBlocker(t, blocker);
              UNSAFE.park(false, 0L);
              setBlocker(t, null);
          }
          

unLock

1
2
3
4
5
6
7
8
9
10
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

跟单线程的比多了对链表中下一个节点线的程进行唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 //AbstractQueuedSynchronizer.java
 private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
  • 【3】获取节点的waitStatus状态。
  • 【4-5】将node节点状态初始化为0
  • 【8-12】没怎么看明白,而且debug模式也没进入。
  • 【14-15】唤醒线程去获取锁。