AQS

要点

简述

队列同步器 AbstractQueueSynchronizer 是用来构建锁或者其他同步组件的基础框架。

它使用了 state 表示同步状态,volatile 修饰

1
2
3
4
/**
* The synchronization state.
*/
private volatile int state;

通过内置的 FIFO 双向队列 来放还没获取到锁的线程( 节点 ),由 [内部类 Node](#内部类 Node) 组成

主要的使用方式是继承。子类继承 AQS 并实现抽象方法来管理同步状态。同步状态的管理需要同步器提供的三个方法:getState()setState(int newState)compareAndSetState(int expect, int update)。这些方法能够保证状态的改变是安全的。

继承同步器需要重写指定的方法,然后同步器有固定的模板方法,这些模板方法会调用重写的方法,实现同步

AQS 可以支持独占式获取同步状态,也可以共享式获取。以实现不同类型的同步组件:ReentrantLock( 可重入锁、独占锁 )ReentrantReadWriteLock( 可重入读写锁、共享锁 )CountDownLatch

AQS 继承了 AbstractOwnableSynchronizer ,能得知是哪个线程持有锁:

1
2
3
4
/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;

内部类 Node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 此节点所属线程
volatile Thread thread;
// 此节点前驱
volatile Node prev;
// 后驱
volatile Node next;

// 此节点为共享模式
static final Node SHARED = new Node();
// 此节点为独占模式
static final Node EXCLUSIVE = null;
// new Node() 和 null 只是用来区分而已,new Node() 并不会串联节点

// 用于 Condition 和 共享式
Node nextWaiter;

Node 状态

Node 作为队列的节点,有下面的四种状态,由 waitStatus 来决定( 默认为 0 )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 节点取消、放弃在同步队列中竞争
static final int CANCELLED = 1;
// 此节点的后继节点的线程处于等待状态
// 如果当前节点释放同步状态,则会通知后继节点,使得后继节点的线程能够尝试获取状态
// 一般是新加入的节点给前一个节点设置 SIGNAL(新加入的节点,即队列的尾节点,状态为 0)
// 如果当前节点为 SIGNAL,表示它的后继节点已经挂起,或者即将被挂起,因此如果一个节点是 SIGNAL
// 在释放时,还要有一个额外的操作:唤醒它的后继节点
static final int SIGNAL = -1;
// 该节点当前在条件等待队列中( 这是 Condition 的东西了 )
static final int CONDITION = -2;
// 表示下一次共享式同步状态获取将会无条件传播下去,大量线程在 doReleaseShared
// 在 doReleaseShared 中设置
static final int PROPAGATE = -3;

volatile int waitStatus;

理解 SIGNAL

  • 如果当前节点为 SIGNAL,表示它的后继节点已经挂起,或者即将被挂起,因此如果一个节点是 SIGNAL,在释放时,还要有一个额外的操作:唤醒它的后继节点
  • 一般是新加入的节点给前一个节点设置 SIGNAL,而新加入的节点,即队列的尾节点,状态为 0
  • 而当新加入的节点要挂起( park、BLOCKED )时,要看前驱是不是 SIGNAL,要设置它,相当于让前驱给自己订了一个闹钟,如果前驱释放,让它记得叫醒自己

同步队列

当前线程获取同步状态失败时,AQS 会将当前线程以及等待状态等信息构成一个 Node 节点,并加入同步队列尾部,同时阻塞当前线程。当头节点释放状态时,会把后续节点唤醒,再次尝试获取同步状态

同步器提供了基于 CAS 的设置尾节点的方法:compareAndSetTail(Node expect, Node update)

首节点的线程释放同步状态时,会唤醒后继结点,后继结点获取同步状态成功时会将自己设置为首节点

而队列的首节点的 thread 变量是 null,不代表任何线程!!!

因为 exclusiveOwnerThread 已经记录了,也可以认为是当前持有锁的线程,不参与排队,因为它已经获得了同步状态了,方便 gc

1
2
3
4
5
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

队列的初始化是延时的,发生在 enq 方法

独占式状态获取与释放

获取

acquire

使用 acquire 方法,且对中断不敏感( 线程获取状态失败进入队列,线程中断,线程不会从队列移除 )

1
2
3
4
5
6
7
8
public final void acquire(int arg) {
// tryAcqure 获取同步状态,失败就加入同步队列尾部
if (!tryAcquire(arg) &&
// EXCLUSIVE: 独占式,意味着 nextWaiter 就为 null 了
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 因为不响应中断,所以在全部搞完,出去之前,再让线程自身中断( 线程自己判断 )
selfInterrupt();
}

tryAcquire

首先使用 tryAcqure( 由继承 AQS 的子类实现 ) 获取同步状态

  • tryAcqure 的作用简单来说就是把同步状态 state 从 0 修改为 非 0

addWaiter

如果获取锁失败,就构造 Node 节点,并使用 addWaiter 尝试加入队列尾部

成功就返回

不成功就 enq 死循环继续

  • 因为可能多个线程尝试 CAS 设置尾节点,如果被其他线程抢先,CAS 就失败了,就得进入 enq 重新拿到尾节点,重新 CAS 设置尾节点
  • enq 的作用除了 死循环 + CAS 设置尾节点,还有初始化队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private Node addWaiter(Node mode) {
// new 一个 Node 节点,传入的是 EXCLUSIVE
// 构造里面的 nextWaiter 被设置为 null,可知每个独占式的节点的 nextWaiter 一定为 null
Node node = new Node(Thread.currentThread(), mode);
// 尾节点不为空
if (pred != null) {
// 设置新的 Node 节点的前驱节点为之前的尾节点
node.prev = pred;
// CAS 设置新的 Node 节点为尾结点,成功就返回,不成功就 enq 死循环保证能设置
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// “死循环” 保证节点能顺利添加到尾部( CAS ),enq 也用于初始化
// 尾节点如果为空,即还没初始化 -> enq
enq(node);
return node;
}

enq

使用 enq + 死循环 保证节点能顺利添加到尾部( CAS )

如果队列为空,还要初始化队列

  • 这里没有用新传进来的 Node,而是 new 了一个空的 Node!!!!然后重新循环

尾节点的设置不是原子操作,可能导致“尾分叉”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 队列空,初始化
if (t == null) { // Must initialize
// 设置一个空的 Node,head 和 tail 指向他,下一个循环再添加节点
if (compareAndSetHead(new Node()))
tail = head;
} else { // 尾节点的设置不是原子操作,可能导致“尾分叉”
node.prev = t; // 1
if (compareAndSetTail(t, node)) { // 2
t.next = node; // 3
return t;
}
}
}
}
尾分叉

这里的三步并不是一个原子操作,第一步很容易成功;而第二步由于是一个 CAS 操作,在并发条件下有可能失败,第三步只有在第二步成功的条件下才执行

这里的 CAS 保证了同一时刻只有一个节点能成为尾节点,其他节点将失败,失败后将回到 for 循环中继续重试

所以,当有大量的线程在同时入队的时候,同一时刻,只有一个线程能完整地完成这三步,而其他线程只能完成第一步,于是就出现了尾分叉:

image-20201216225131244

而在下一轮的循环中,它们的 prev 属性会重新指向新的尾节点,继续尝试新的 CAS 操作,最终,所有节点都会通过自旋不断的尝试入队,直到成功为止

acquireQueued

把新的 Node 设置为尾节点后,acquireQueued 会让节点一直 自旋 等待获取同步( 等待是因为前驱不是头节点的情况,等待是不会尝试获取同步状态的 )( 并会 阻塞( BLOCKED ) 节点的线程!!! )

BLOCKED 是指线程正在等待获取锁

只有前驱节点是 头节点 才能尝试获取同步状态

成功当然好,荣升头节点

失败了说明锁在头节点那里,人家还没用完呢,那就只能继续 BLOCKED ( park ),设置前驱为 SIGNAL ( 如果前驱不是的话 ),等待锁释放( shouldParkAfterFailedAcquireparkAndCheckInterrupt )

  • shouldParkAfterFailedAcquire 是使用 CAS 将前驱节点状态设置成 SIGNAL(-1)
  • CAS 设置失败则说明 shouldParkAfterFailedAcquire 返回 false,然后会在 acquireQueued 中死循环继续重试
  • 直到 CAS 设置 SIGNAL 成功 shouldParkAfterFailedAcquire 返回 true 时,才会执行 parkAndCheckInterrupt 来阻塞当前节点( 让前驱设置了闹钟,它会在释放的时候唤醒自己 )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 只有前驱是头节点才能尝试获取同步状态
if (p == head && tryAcquire(arg)) {
// 获取同步状态成功
// 设置头节点
// 这里都不需要 CAS,因为已经拿到锁了
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果获取同步状态失败,会将前驱节点设置为 SIGNAL 状态,然后阻塞
// shouldParkAfterFailedAcquire 返回 true 时才会执行 parkAndCheckInterrupt
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // -1
/*
* 前驱状态是 SIGNAL 了,就返回 true,执行 parkAndCheckInterrupt 来阻塞,等头节点唤醒
*/
return true;
// 大于 0 只能是 CANCELLED 了,即这个节点已经取消竞争了,跳过它,往前
if (ws > 0) {
// 往前拿节点,直到节点的小于等于 0
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 拿到最近的有效的前驱节点,使之后驱为本节点 node,跳出到 acquireQueued 重新循环
pred.next = node;
} else {
/*
* 这时候前驱的 watiStatus 肯定是 0 或者 PROPAGATE
* 要设置前驱为 SIGNAL,给自己设闹钟!!让它释放的时候记得唤醒自己
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
1
2
3
4
5
private final boolean parkAndCheckInterrupt() {
// 挂起、阻塞自己
LockSupport.park(this);
return Thread.interrupted();
}

对获取过程的总结

  • 调用 acquire,接着会调用 tryAcquire 尝试获取锁,即设置 state 为 1
    • 如果失败,就使用 addWaiter 构造新的 Node 节点
      • 如果是第一次,则直接调用 enq 初始化队列,即设置一个属性为空的头节点,接着在 enq 里面继续下一个循环来 CAS 设置尾节点为之前构造的 Node 节点
      • 如果不是第一次,尝试 CAS 插入同步队列尾部,失败也到 enq 死循环 + CAS
      • 直到成功设置尾节点,addWaiter 结束,回到外层的 acquireQueued
    • acquireQueued 会一直自旋,如果前驱节点为头节点,就尝试获取锁( tryAcquire )
      • 如果成功拿到锁,就要把自己设置为头节点,这时候就不需要 CAS 了,因为已经拿到锁了
      • 如果失败,就进入 shouldParkAfterFailedAcquire 看看前驱节点是不是 SIGNAL
        • 是就返回 true,再执行 parkAndCheckInterrupt 继续阻塞自己,返回 acquireQueued
        • 如果不是,就尝试 CAS 设置前驱节点为 SIGNAL,但成功是返回 false,返回到 acquireQueued 继续循环
          • 如果不是 SIGNAL,而且等于 0,就意味着是 CANCELLED,即前驱节点是放弃竞争的,就往前找前驱节点的“替代者”,找到了返回到 acquireQueued 继续循环

独占式释放

使用 release 释放同步状态,会唤醒它的后续节点

  • tryRelease 释放同步状态
  • 使用 unparkSuccessor 唤醒后续节点( 一个 ),会使用 LockSupport 来唤醒( unpark 方法 )
1
2
3
4
5
6
7
8
9
10
11
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 拿到头节点,判空和看看状态是不是头
if (h != null && h.waitStatus != 0)
// 释放
unparkSuccessor(h);
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 设置 waitStatus 为 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;
// 后继节点不存在,或者后继节点取消了排队
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前遍历
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 这里使用 LockSupport 来唤醒后续节点( 只是一个 ),让它尝试获取同步状态
if (s != null)
LockSupport.unpark(s.thread);
}

if (s == null || s.waitStatus > 0) 代表:

后继节点不存在,或者后继节点取消了排队( CANCELLED )

之所以从后往前遍历是因为之前提到的“尾分叉

我们是处于多线程并发的条件下的,如果一个节点的 next 属性为 null,并不能保证它就是尾节点( 可能是因为新加的尾节点还没来得及执行 pred.next = node ),但是一个节点如果能入队,则它的 prev 属性一定是有值的,所以反向查找一定是最精确的

共享式状态获取与释放

以读写为例,同一时刻能多个读,不能写;同一时刻有一个写,不能读

共享状态下,获取与释放都会唤醒后续节点!!!

获取

使用 acquireShared 可以共享式获取同步状态

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

tryAcquireShared 获取成功返回值 >= 0

尝试失败,在 doAcquireShared 中,自旋,且前驱为头节点,尝试获取同步状态,阻塞啥的,跟独占式几乎一模一样

不同的是:

  • tryAcquireShared 获取成功后,就能唤醒后继节点开始获取同步状态了,而不是像独占式那样,需要头节点释放,才能唤醒后续节点
  • 头节点的设置问题:setHeadAndPropagate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void doAcquireShared(int arg) {
// 节点以 SHARED 共享状态放入同步队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 只有前驱节点是头节点,才能尝试获取同步状态
if (p == head) {
int r = tryAcquireShared(arg);
// 获取成功,设置头节点为自己,balabala...然后退出
if (r >= 0) {
// 注意!!!
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

setHeadAndPropagate

setHeadAndPropagate 中,除了 setHead 之外,还有一个 doReleaseShared 来唤醒后续节点

这里唤醒了,等下头节点释放又唤醒一次

等等在 释放 那里再看

1
2
3
4
5
6
7
8
9
10
11
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 唤醒后续节点
doReleaseShared();
}
}
1
2
3
4
5
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

释放

使用 releaseShared 释放同步状态

1
2
3
4
5
6
7
8
9
public final boolean releaseShared(int arg) {
// tryReleaseShared 释放
if (tryReleaseShared(arg)) {
// 释放成功的话,就 doReleaseShared 唤醒后续节点
doReleaseShared();
return true;
}
return false;
}

它和独占式的区别在于 tryReleaseShared 必须确保同步状态线程安全释放,一般是通过 循环 + CAS 保证的,因为释放同步状态的线程可能有多个

doReleaseShared

上面说的,是说会有多个线程调用 doReleaseShared,而且每个线程会有两次调用

  • 一次是成为头节点时调用
  • 一次是头节点释放时调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// SIGNAL 说明后继节点需要释放,尝试设为 0
if (ws == Node.SIGNAL) {
// CAS
// 保证大量的 doReleaseShared 方法在同时执行时
// 只有一个线程 unpark 成功
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 为 0 的情况可能是上面的情况,但是上面的情况不会进入这个条件了
// 那为 0 只能意味着这是同步队列的最后一个节点成为了头节点
// 因为进入队列的新节点的状态是 0
// 而 compareAndSetWaitStatus(h, 0, Node.PROPAGATE) 失败的情况是
// 执行到这儿的瞬间,有新节点进入
// 新节点进入队列时要把前驱节点设为 SIGNAL,这时候本节点就不是 0 了
// 就会 continue,继续循环
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

多个线程意味着头节点会变,而循环退出的条件是:

1
2
if (h == head)                   // loop if head changed
break;

意思就是:如果头节点仍然是同一个节点,就退出,否则继续循环

为什么这样做?

因为是多个线程,线程 1 成为头节点,会调用 doReleaseShared1,唤醒线程 2

线程 2成功成为头节点,接着又调用 doReleaseShared2,唤醒线程 3

此时 doReleaseShared1 还没结束,但是头节点已经变了,所以继续循环

循环开始,拿到头节点:Node h = head

这时候线程 3成功成为头节点,doReleaseShared3…….

然后就有大量线程在执行 doReleaseShared

注意现在是共享式,就是要效率,就是要共享


独占式超时获取

在指定时间内获取同步状态,能响应中断

如果 nanosTimeout 小于等于 spinForTimeoutThreshold ( 1000 纳秒 ),就快速进入自旋。

独占式超时获取和之前的独占式获取( acquire )很像,acquire 在未获取的时候,会使当前线程一直处于等待状态,而 doAcquireNanos 会使当前线程等待 nanosTimeout 纳秒,如果线程还没有获取到同步状态,就从等待逻辑中自动返回

1
2
3
4
5
6
7
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 多了时间检测
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

AQS14.jpg

参考文章