AQS
要点
- 独占式
- 共享式
- 独占式超时
简述
队列同步器 AbstractQueueSynchronizer
是用来构建锁或者其他同步组件的基础框架。
它使用了 state
表示同步状态,volatile 修饰
1 | /** |
通过内置的 FIFO 双向队列 来放还没获取到锁的线程( 节点 ),由 [内部类 Node](#内部类 Node) 组成
主要的使用方式是继承。子类继承 AQS
并实现抽象方法来管理同步状态。同步状态的管理需要同步器提供的三个方法:getState()
、setState(int newState)
、compareAndSetState(int expect, int update)
。这些方法能够保证状态的改变是安全的。
继承同步器需要重写指定的方法,然后同步器有固定的模板方法,这些模板方法会调用重写的方法,实现同步
AQS
可以支持独占式获取同步状态,也可以共享式获取。以实现不同类型的同步组件:ReentrantLock( 可重入锁、独占锁 )
、ReentrantReadWriteLock( 可重入读写锁、共享锁 )
、CountDownLatch
…
AQS
继承了 AbstractOwnableSynchronizer
,能得知是哪个线程持有锁:
1 | /** |
内部类 Node
1 | // 此节点所属线程 |
Node 状态
Node
作为队列的节点,有下面的四种状态,由 waitStatus
来决定( 默认为 0 )
1 | // 节点取消、放弃在同步队列中竞争 |
理解 SIGNAL
- 如果当前节点为
SIGNAL
,表示它的后继节点已经挂起,或者即将被挂起,因此如果一个节点是SIGNAL
,在释放时,还要有一个额外的操作:唤醒它的后继节点 - 一般是新加入的节点给前一个节点设置
SIGNAL
,而新加入的节点,即队列的尾节点,状态为 0 - 而当新加入的节点要挂起( park、BLOCKED )时,要看前驱是不是
SIGNAL
,要设置它,相当于让前驱给自己订了一个闹钟,如果前驱释放,让它记得叫醒自己
同步队列
当前线程获取同步状态失败时,AQS
会将当前线程以及等待状态等信息构成一个 Node
节点,并加入同步队列尾部,同时阻塞当前线程。当头节点释放状态时,会把后续节点唤醒,再次尝试获取同步状态
同步器提供了基于 CAS
的设置尾节点的方法:compareAndSetTail(Node expect, Node update)
首节点的线程释放同步状态时,会唤醒后继结点,后继结点获取同步状态成功时会将自己设置为首节点
而队列的首节点的 thread
变量是 null,不代表任何线程!!!
因为 exclusiveOwnerThread
已经记录了,也可以认为是当前持有锁的线程,不参与排队,因为它已经获得了同步状态了,方便 gc
1 | private void setHead(Node node) { |
队列的初始化是延时的,发生在 enq
方法
独占式状态获取与释放
获取
acquire
使用 acquire
方法,且对中断不敏感( 线程获取状态失败进入队列,线程中断,线程不会从队列移除 )
1 | public final void acquire(int arg) { |
tryAcquire
首先使用 tryAcqure( 由继承 AQS 的子类实现 )
获取同步状态
tryAcqure
的作用简单来说就是把同步状态state
从 0 修改为 非 0
addWaiter
如果获取锁失败,就构造 Node
节点,并使用 addWaiter
尝试加入队列尾部
成功就返回
不成功就 enq
死循环继续
- 因为可能多个线程尝试
CAS
设置尾节点,如果被其他线程抢先,CAS
就失败了,就得进入enq
重新拿到尾节点,重新CAS
设置尾节点 enq
的作用除了死循环 + CAS
设置尾节点,还有初始化队列
1 | private Node addWaiter(Node mode) { |
enq
使用 enq + 死循环
保证节点能顺利添加到尾部( CAS
)
如果队列为空,还要初始化队列
- 这里没有用新传进来的
Node
,而是 new 了一个空的Node
!!!!然后重新循环
尾节点的设置不是原子操作,可能导致“尾分叉”
1 | private Node enq(final Node node) { |
尾分叉
这里的三步并不是一个原子操作,第一步很容易成功;而第二步由于是一个 CAS
操作,在并发条件下有可能失败,第三步只有在第二步成功的条件下才执行
这里的 CAS
保证了同一时刻只有一个节点能成为尾节点,其他节点将失败,失败后将回到 for 循环中继续重试
所以,当有大量的线程在同时入队的时候,同一时刻,只有一个线程能完整地完成这三步,而其他线程只能完成第一步,于是就出现了尾分叉:
而在下一轮的循环中,它们的 prev
属性会重新指向新的尾节点,继续尝试新的 CAS
操作,最终,所有节点都会通过自旋不断的尝试入队,直到成功为止
acquireQueued
把新的 Node
设置为尾节点后,acquireQueued
会让节点一直 自旋
等待获取同步( 等待是因为前驱不是头节点的情况,等待是不会尝试获取同步状态的 )( 并会 阻塞( BLOCKED ) 节点的线程!!! )
BLOCKED
是指线程正在等待获取锁
只有前驱节点是 头节点 才能尝试获取同步状态
成功当然好,荣升头节点
失败了说明锁在头节点那里,人家还没用完呢,那就只能继续 BLOCKED ( park )
,设置前驱为 SIGNAL
( 如果前驱不是的话 ),等待锁释放( shouldParkAfterFailedAcquire
和 parkAndCheckInterrupt
)
shouldParkAfterFailedAcquire
是使用CAS
将前驱节点状态设置成SIGNAL(-1)
CAS
设置失败则说明shouldParkAfterFailedAcquire
返回false
,然后会在acquireQueued
中死循环继续重试- 直到
CAS
设置SIGNAL
成功shouldParkAfterFailedAcquire
返回true
时,才会执行parkAndCheckInterrupt
来阻塞当前节点( 让前驱设置了闹钟,它会在释放的时候唤醒自己 )
1 | final boolean acquireQueued(final Node node, int arg) { |
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
1 | private final boolean parkAndCheckInterrupt() { |
对获取过程的总结
- 调用
acquire
,接着会调用tryAcquire
尝试获取锁,即设置state
为 1- 如果失败,就使用
addWaiter
构造新的Node
节点- 如果是第一次,则直接调用
enq
初始化队列,即设置一个属性为空的头节点,接着在enq
里面继续下一个循环来CAS
设置尾节点为之前构造的Node
节点 - 如果不是第一次,尝试
CAS
插入同步队列尾部,失败也到 enq死循环 + CAS
- 直到成功设置尾节点,
addWaiter
结束,回到外层的acquireQueued
- 如果是第一次,则直接调用
acquireQueued
会一直自旋,如果前驱节点为头节点,就尝试获取锁(tryAcquire
)- 如果成功拿到锁,就要把自己设置为头节点,这时候就不需要
CAS
了,因为已经拿到锁了 - 如果失败,就进入
shouldParkAfterFailedAcquire
看看前驱节点是不是SIGNAL
- 是就返回
true
,再执行parkAndCheckInterrupt
继续阻塞自己,返回acquireQueued
- 如果不是,就尝试
CAS
设置前驱节点为SIGNAL
,但成功是返回false
,返回到acquireQueued
继续循环- 如果不是
SIGNAL
,而且等于 0,就意味着是CANCELLED
,即前驱节点是放弃竞争的,就往前找前驱节点的“替代者”,找到了返回到acquireQueued
继续循环
- 如果不是
- 是就返回
- 如果成功拿到锁,就要把自己设置为头节点,这时候就不需要
- 如果失败,就使用
独占式释放
使用 release
释放同步状态,会唤醒它的后续节点
tryRelease
释放同步状态- 使用
unparkSuccessor
唤醒后续节点( 一个 ),会使用LockSupport
来唤醒( unpark 方法 )
1 | public final boolean release(int arg) { |
1 | private void unparkSuccessor(Node node) { |
if (s == null || s.waitStatus > 0) 代表:
后继节点不存在,或者后继节点取消了排队( CANCELLED )
之所以从后往前遍历是因为之前提到的“尾分叉”
我们是处于多线程并发的条件下的,如果一个节点的
next
属性为null
,并不能保证它就是尾节点( 可能是因为新加的尾节点还没来得及执行pred.next = node
),但是一个节点如果能入队,则它的prev
属性一定是有值的,所以反向查找一定是最精确的
共享式状态获取与释放
以读写为例,同一时刻能多个读,不能写;同一时刻有一个写,不能读
共享状态下,获取与释放都会唤醒后续节点!!!
获取
使用 acquireShared
可以共享式获取同步状态
1 | public final void acquireShared(int arg) { |
tryAcquireShared
获取成功返回值 >= 0
尝试失败,在 doAcquireShared
中,自旋,且前驱为头节点,尝试获取同步状态,阻塞啥的,跟独占式几乎一模一样
不同的是:
tryAcquireShared
获取成功后,就能唤醒后继节点开始获取同步状态了,而不是像独占式那样,需要头节点释放,才能唤醒后续节点- 头节点的设置问题:
setHeadAndPropagate
1 | private void doAcquireShared(int arg) { |
setHeadAndPropagate
在 setHeadAndPropagate
中,除了 setHead
之外,还有一个 doReleaseShared
来唤醒后续节点
这里唤醒了,等下头节点释放又唤醒一次
等等在 释放 那里再看
1 | private void setHeadAndPropagate(Node node, int propagate) { |
1 | private void setHead(Node node) { |
释放
使用 releaseShared
释放同步状态
1 | public final boolean releaseShared(int arg) { |
它和独占式的区别在于 tryReleaseShared
必须确保同步状态线程安全释放,一般是通过 循环 + CAS
保证的,因为释放同步状态的线程可能有多个
doReleaseShared
上面说的,是说会有多个线程调用 doReleaseShared
,而且每个线程会有两次调用
- 一次是成为头节点时调用
- 一次是头节点释放时调用
1 | private void doReleaseShared() { |
多个线程意味着头节点会变,而循环退出的条件是:
1 | if (h == head) // loop if head changed |
意思就是:如果头节点仍然是同一个节点,就退出,否则继续循环
为什么这样做?
因为是多个线程,线程 1 成为头节点,会调用 doReleaseShared1,唤醒线程 2
线程 2成功成为头节点,接着又调用 doReleaseShared2,唤醒线程 3
此时 doReleaseShared1 还没结束,但是头节点已经变了,所以继续循环
循环开始,拿到头节点:Node h = head
这时候线程 3成功成为头节点,doReleaseShared3…….
然后就有大量线程在执行 doReleaseShared
注意现在是共享式,就是要效率,就是要共享
独占式超时获取
在指定时间内获取同步状态,能响应中断
如果 nanosTimeout
小于等于 spinForTimeoutThreshold
( 1000 纳秒 ),就快速进入自旋。
独占式超时获取和之前的独占式获取( acquire
)很像,acquire 在未获取的时候,会使当前线程一直处于等待状态,而 doAcquireNanos
会使当前线程等待 nanosTimeout
纳秒,如果线程还没有获取到同步状态,就从等待逻辑中自动返回
1 | public final boolean tryAcquireNanos(int arg, long nanosTimeout) |
1 | private boolean doAcquireNanos(int arg, long nanosTimeout) |