文章

AQS:同步状态获取与释放

AQS:同步状态获取与释放

此篇博客所有源码均来自JDK 1.8

在前面提到过,AQS 是构建 Java 同步组件的基础,我们期待它能够成为实现大部分同步需求的基础。

AQS 的设计模式采用的模板方法模式,子类通过继承的方式,实现它的抽象方法来管理同步状态。对于子类而言,它并没有太多的活要做,AQS 已经提供了大量的模板方法来实现同步,主要是分为三类:

独占式获取和释放同步状态

共享式获取和释放同步状态

查询同步队列中的等待线程情况。

自定义子类使用 AQS 提供的模板方法,就可以实现自己的同步语义

1. 独占式

独占式,同一时刻,仅有一个线程持有同步状态

1.1 独占式同步状态获取

老艿艿:「1.1 独占式同步状态获取」整个小节,是本文最难的部分。请一定保持耐心。

#acquire(int arg)方法,为 AQS 提供的模板方法。该方法为独占式获取同步状态,但是该方法对中断不敏感。也就是说,由于线程获取同步状态失败而加入到 CLH 同步队列中,后续对该线程进行中断操作时,线程不会从 CLH 同步队列中移除。代码如下:

1
2
3
4
5
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

第 2 行:调用#tryAcquire(int arg)方法,去尝试获取同步状态,获取成功则设置锁状态并返回 true ,否则获取失败,返回 false 。若获取成功,#acquire(int arg)方法,直接返回,不用线程阻塞,自旋直到获得同步状态成功。

#tryAcquire(int arg)方法,需要自定义同步组件自己实现,该方法必须要保证线程安全的获取同步状态。代码如下:

1
2
3
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

直接抛出 UnsupportedOperationException 异常。

第 3 行:如果#tryAcquire(int arg)方法返回 false ,即获取同步状态失败,则调用#addWaiter(Node mode)方法,将当前线程加入到 CLH 同步队列尾部。并且,mode方法参数为Node.EXCLUSIVE,表示独占模式。

第 3 行:调用boolean #acquireQueued(Node node, int arg)方法,自旋直到获得同步状态成功。详细解析,见「1.1.1 acquireQueued」中。另外,该方法的返回值类型为boolean,当返回 true 时,表示在这个过程中,发生过线程中断。但是呢,这个方法又会清理线程中断的标识,所以在种情况下,需要调用【第 4 行】的#selfInterrupt()方法,恢复线程中断的标识,代码如下:

1
2
3
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

1.1.1 acquireQueued

boolean #acquireQueued(Node node, int arg)方法,为一个自旋的过程,也就是说,当前线程(Node)进入同步队列后,就会进入一个自旋的过程,每个节点都会自省地观察,当条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去。

流程图如下:

acquireQueued流程图

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
final boolean acquireQueued(final Node node, int arg) {
    // 记录是否获取同步状态成功
    boolean failed = true;
    try {
        // 记录过程中,是否发生线程中断
        boolean interrupted = false;
        /*
        * 自旋过程,其实就是一个死循环而已
        */
        for (;;) {
            // 当前线程的前驱节点
            final Node p = node.predecessor();
            // 当前线程的前驱节点是头结点,且同步状态成功
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 获取失败,线程等待–具体后面介绍
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 获取同步状态发生异常,取消获取。
        if (failed)
            cancelAcquire(node);
    }
}

第 3 行:failed变量,记录是否获取同步状态成功。

第 6 行:interrupted变量,记录获取过程中,是否发生线程中断

========== 第 7 至 24 行:”死”循环,自旋直到获得同步状态成功。==========

第 12 行:调用Node#predecessor()方法,获得当前线程的前一个节点p。

第 14 行:p == head代码块,若满足,则表示当前线程的前一个节点为节点,因为head是最后一个获得同步状态成功的节点,此时调用#tryAcquire(int arg)方法,尝试获得同步状态。 在#acquire(int arg)方法的【第 2 行】,也调用了这个方法。

第 15 至 18 行:当前节点( 线程 )获取同步状态成功

第 15 行:设置当前节点( 线程 )为的head。

第 16 行:设置节点p不再指向下一个节点,让它自身更快的被 GC 。

第 17 行:标记failed = false,表示获取同步状态成功。

第 18 行:返回记录获取过程中,是否发生线程中断

第 20 至 24 行:获取失败,线程等待唤醒,从而进行下一次的同步状态获取的尝试。详细解析,见《【死磕 Java 并发】—– J.U.C 之 AQS:阻塞和唤醒线程》。详细解析,见「1.1.2 shouldParkAfterFailedAcquire」

第 21 行:调用#shouldParkAfterFailedAcquire(Node pre, Node node)方法,判断获取失败后,是否当前线程需要阻塞等待。

========== 第 26 至 29 行:获取同步状态的过程中,发生异常,取消获取。==========

第 28 行:调用#cancelAcquire(Node node)方法,取消获取同步状态。详细解析,见「1.1.3 cancelAcquire」

1.1.2 shouldParkAfterFailedAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获得前一个节点的等待状态
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) // Node.SIGNAL
        /*
        * This node has already set status asking a release
        * to signal it, so it can safely park.
        */
        return true;
    if (ws > 0) { // Node.CANCEL
        /*
        * Predecessor was cancelled. Skip over predecessors and
        * indicate retry.
        */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 0 或者 Node.PROPAGATE
        /*
        * waitStatus must be 0 or PROPAGATE. Indicate that we
        * need a signal, but don't park yet. Caller will need to
        * retry to make sure it cannot acquire before parking.
        */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

pred和node方法参数,传入时,要求前者必须是后者的前一个节点。

第 3 行:获得前一个节点(pre)的等待状态。下面会根据这个状态有三种情况的处理。

第 4 至 9 行:等待状态为Node.SIGNAL时,表示pred的下一个节点node的线程需要阻塞等待。在pred的线程释放同步状态时,会对node的线程进行唤醒通知。所以,【第 9 行】返回 true ,表明当前线程可以被park安全的阻塞等待。

第 19 至 26 行:等待状态为0或者Node.PROPAGATE时,通过CAS设置,将状态修改为Node.SIGNAL,即下一次重新执行#shouldParkAfterFailedAcquire(Node pred, Node node)方法时,满足【第 4 至 9 行】的条件。

但是,对于本次执行,【第 27 行】返回 false 。

另外,等待状态不会为Node.CONDITION,因为它用在 ConditonObject 中。

第 10 至 18 行:等待状态为NODE.CANCELLED时,则表明该线程的前一个节点已经等待超时或者被中断了,则需要从 CLH 队列中将该前一个节点删除掉,循环回溯,直到前一个节点状态<= 0。

对于本次执行,【第 27 行】返回 false ,需要下一次再重新执行#shouldParkAfterFailedAcquire(Node pred, Node node)方法,看看满足哪个条件。

整个过程如下图:

shouldParkAfterFailedAcquire流程图

1.1.3 cancelAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
        ((ws = pred.waitStatus) == Node.SIGNAL ||
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL)) &&
        pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
            compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

第 2 至 4 行:忽略,若传入参数node为空。

第 6 行:将节点的等待线程置

第 9 行:获得node节点的一个节点pred。

第 10 至 11 行: 逻辑同#shouldParkAfterFailedAcquire(Node pred, Node node)的【第 15 至 17 行】。

第 16 行:获得pred的一个节点predNext。在这个变量上,有很”复杂”的英文,我们来理解下:predNext从表面上看,和node是等价的。

但是实际上,存在多线程并发的情况,所以在【第 25 行】或者【第 36 行】中,我们调用#compareAndSetNext(…)方法,使用CAS的方式,设置pred的一个节点。

如果设置失败,说明当前线程和其它线程竞争失败,不需要做其它逻辑,因为pred的一个节点已经被其它线程设置成功

第 21 行:设置node节点的为取消的等待状态Node.CANCELLED。在这个变量上,有很”复杂”的英文,我们再来理解下:

这里可以使用直接写,而不是 CAS 。

在这个操作之后,其它 Node 节点可以忽略node。

Before, we are free of interference from other threads.TODO 9000 芋艿,如何理解。

下面开始开始修改pred的的下一个节点,一共分成种情况。

========== 第一种 ==========

第 24 行:如果node是节点,调用#compareAndSetTail(…)方法,CAS设置pred为节点。

第 25 行:若上述操作成功,调用#compareAndSetNext(…)方法,CAS设置pred的一个节点为空(null)。

========== 第二种 ==========

第 30 行:pred非节点。

第 31 至 32 行:pred的等待状态为Node.SIGNAL,或者可被CAS为Node.SIGNAL。

第 33 行:pred的线程非空

TODO 9001 芋艿,如何理解。目前能想象到的,一开始 30 行为非头节点,在 33 的时候,结果成为头节点,线程已经为空了。

第 34 至 36 行:若node的一个节点next的等待状态非Node.CANCELLED,则调用#compareAndSetNext(…)方法,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
        doReleaseShared();
}

第 2 行:记录原来节点h。

第 3 行:调用#setHead(Node node)方法,设置node为节点。

第 20 行:propagate > 0代码块,说明同步状态还能被其他线程获取。

第 20 至 21 行:判断原来的或者节点,等待状态为Node.PROPAGATE或者Node.SIGNAL时,可以继续向下唤醒

第 23 行:调用Node#isShared()方法,判断一个节点为共享式获取同步状态。

第 24 行:调用#doReleaseShared()方法,唤醒后续的共享式获取同步状态的节点。详细解析,见「2.1.2 setHeadAndPropagate」

2.2 共享式获取响应中断

#acquireSharedInterruptibly(int arg)方法,代码如下:

1
2
3
4
5
6
7
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

「1.2 独占式获取响应中断」类似,就不重复解析了。

2.3 共享式超时获取

#tryAcquireSharedNanos(int arg, long nanosTimeout)方法,代码如下:

1
2
3
4
5
6
7
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
    if (Thread.interrupted())
    throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
    doAcquireSharedNanos(arg, nanosTimeout);
}

「1.3 独占式超时获取」类似,就不重复解析了。

2.4 共享式同步状态释放

当线程获取同步状态后,执行完相应逻辑后,就需要释放同步状态。AQS 提供了#releaseShared(int arg)方法,释放同步状态。代码如下:

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

第 2 行:调用#tryReleaseShared(int arg)方法,去尝试释放同步状态,释放成功则设置锁状态并返回 true ,否则获取失败,返回 false 。同时,它们分别对应【第 3 至 5】和【第 6 行】的逻辑。

#tryReleaseShared(int arg)方法,需要自定义同步组件自己实现,该方法必须要保证线程安全的释放同步状态。代码如下:

1
2
3
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

直接抛出 UnsupportedOperationException 异常。

第 3 行:调用#doReleaseShared()方法,唤醒后续的共享式获取同步状态的节点。

2.4.1 doReleaseShared

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void doReleaseShared() {
    /*
    * Ensure that a release propagates, even if there are other
    * in-progress acquires/releases. This proceeds in the usual
    * way of trying to unparkSuccessor of head if it needs
    * signal. But if it does not, status is set to PROPAGATE to
    * ensure that upon release, propagation continues.
    * Additionally, we must loop in case a new node is added
    * while we are doing this. Also, unlike other uses of
    * unparkSuccessor, we need to know if CAS to reset status
    * fails, if so rechecking.
    */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                continue; // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
            !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
            continue; // loop on failed CAS
        }
        if (h == head) // loop if head changed
        break;
    }
}

TODO 9003 doReleaseShared 的详细逻辑。可参考博客:http://zhanjindong.com/2015/03/15/java-concurrent-package-aqs-AbstractQueuedSynchronizer

参考资料

666. 彩蛋

如果你对 Java 并发感兴趣,欢迎加入我的知识星球一起交流。

AQS 的整体设计非常精巧,所以很多地方设计的很难理解。即使老艿艿基于小明哥的博客,并且 Google 了很多博客,发现自己还是无法透彻理解。同时,大量的博客,也没写清楚很多细节。膜拜大神 Doug Lea 的同时,不得不说有点苦闷,无法很干净的理解和描述 AQS 的实现。

Measure

Measure

本文由作者按照 CC BY 4.0 进行授权