Condition

使用

任何一个类都有父类 Object 提供的等待/通知的监视器方法:waitnotify 等等

Condition 提供类似的等待/通知方法,awaitsignal

1
2
3
4
5
6
7
8
9
10
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

lock.lock();
try {
condition.await();
// condition.signal();
} finaly {
lock.unlock();
}

对于 locknewCondition,是 new 了一个 ConditionObject

1
2
3
final ConditionObject newCondition() {
return new ConditionObject();
}

数据结构 & 原理

这个 ConditionObject 定义在 AQS

ConditionObject 同样维护了一个队列,叫 条件队列( condition queue )

它和锁的 同步队列( sync queue )一样,都是由 AQS 定义的 Node 节点组成,不同的是:

  • 在同步队列中
    • Node 节点的连接是用 prevnext,是双向队列,nextWaiter 属性为 null
    • waitStatusCANCELLEDSIGNAL
  • 在条件队列中
    • Node 节点的连接是用 nextWaiter,是单向队列,prevnext 都为 null
    • waitStatus 只有 CONDITION( - 2 )一个,它表示线程处于正常的等待状态

每创建一个 Condtion 对象就会 对应一个条件队列,每一个调用了 Condtion 对象的 await 方法的线程都会被包装成 Node 扔进一个条件队列中

每个条件队列都是独立的,互相不影响

sync queue 是等待锁的队列,当一个线程被包装成 Node 加到该队列中时,必然是没有获取到锁;当处于该队列中的节点获取到了锁,它将从该队列中移除( 事实上移除操作是将获取到锁的节点设为新的 dummy head并将 thread 属性置为 null )

condition queue 是等待在 特定条件 下的队列,因为调用 await 方法时,必然是已经获得了 lock 锁,所以在进入 condition queue 之前 已经获取了锁;在被包装成 Node 扔进条件队列中后,线程将 释放 锁,然后挂起;当处于该队列中的线程被 signal 方法唤醒后,由于队列中的节点在之前挂起的时候已经释放了锁,被唤醒的线程和普通线程一样需要去争锁,如果没有抢到,则同样要被加到等待锁的 sync queue 中,此时节点就从 condition queue 被转移到 sync queue 中。因此,条件队列在出队时,线程并不持有锁

所以事实上,这两个队列的锁状态正好相反:

  • condition queue:入队时已经持有了锁 -> 在队列中释放锁 -> 离开队列时没有锁 -> 转移到 sync queue
  • sync queue:入队时没有锁 -> 在队列中争锁 -> 离开队列时获得了锁

唤醒的线程加到 sync queue 时,Node 是被一个一个转移过去的,哪怕调用的是 signalAll 也是一个一个转移过去的,而不是将整个 condition queue 接在 sync queue 的末尾

signal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public final void signal() {
// 先看此线程是否持有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 把第一个 Node 移到 sync queue 并唤醒
Node first = firstWaiter;
if (first != null)
// 执行
doSignal(first);
}

// 这里只移动第一个 Node
private void doSignal(Node first) {
do {
// 修改队列第一个节点 firstWaiter
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 把之前的第一个节点 first 独立出来
first.nextWaiter = null;
// transferForSignal 移动 first,成功就结束循环了
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {

// 修改移动的节点的 waitStatus
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

// enq 自旋 + CAS 加到 sync queue 尾部
// 成功后返回前一个节点
Node p = enq(node);
// 拿到前一个节点的 waitStatus
int ws = p.waitStatus;
// 设置前一个节点的 waitStatus 为 SIGNAL,相对于设闹钟,然后唤醒 unpark 尾节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

await

对于中断模式 interruptMode,有:

  • THROW_IE( - 1 )
    • 表示退出 await 方法时需要抛出 InterruptedException,这种模式对应于中断发生在 signal 之前
  • 0
    • 表示整个过程中一直没有中断发生
  • REINTERRUPT( 1 )
    • 表示退出 await 时只需要再自我中断,这种模式对应于中断发生在 signal 之后,即中断来的太晚了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 构造 Node 节点,把节点加到条件队列
Node node = addConditionWaiter();
// 释放所有锁,包括多次重入的锁,savedState 是释放之前的锁状态
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果没有出现在 sync queue,说明已经 await,还没被 signal
// 此时应该把线程挂起,等待 signal
while (!isOnSyncQueue(node)) {
// 挂起当前线程,此时线程已经挂起,没有继续执行
LockSupport.park(this);
// 当执行到这里,有两种情况
// 1.当前线程被 signal
// 2.当前线程中断
// checkInterruptWhileWaiting 检查是否有中断
// 如果在 signal 之前被中断,则返回 THROW_IE
// 在 signal 之后,则返回 REINTERRUPT
// 如果没有被中断,则返回 0
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 唤醒后就得按 sync queue 争锁的方式了,抢到锁就返回,抢不到锁就继续被挂起
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 获取到锁后,调用 unlinkCancelledWaiters 将自己从条件队列中移除
// 该方法还会顺便移除其他取消等待的锁
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
// 处理中断
reportInterruptAfterWait(interruptMode);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 如果尾节点是 cancel,则先遍历队列,清除所有 cancel 的节点
if (t != null && t.waitStatus != Node.CONDITION) {
// 该方法将从头节点开始遍历整个队列,剔除其中 waitStatus 不为 Node.CONDTION 的节点
unlinkCancelledWaiters();
// 此时拿到 不是 cancel 的尾节点
t = lastWaiter;
}
// 构造新 Node,设为 CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 如果空,那 node 就是头,不空,node 就是尾
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 处理中断
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
// 抛出
if (interruptMode == THROW_IE)
throw new InterruptedException();
// 自己决定
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}

signal 唤醒 or 中断唤醒

signal 唤醒为正常唤醒

中断唤醒为不正常唤醒,分为 signal 之前中断 和 signal 之后中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 检查是否有中断
// 如果在 signal 之前被中断,则返回 THROW_IE
// 在 signal 之后被中断,则返回 REINTERRUPT
// 如果没有被中断,则返回 0
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
// 如果 signal 之前发生中断,就加入 sync queue,返回 true
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
// 如果之前状态不是 CONDITION,表明是 signal 之后发生中断
// 看看有没有加入 sync queue
// 到这里只需要等待线程成功进入 sync queue 即可
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

对于在 signal 之前中断唤醒的流程:

  1. 线程因为中断,从挂起的地方被唤醒
  2. 通过transferAfterCancelledWait 确认了线程的 waitStatus 值为 CONDITION,说明并没有 signal 发生过
  3. 然后我们修改线程的 waitStatus 为 0,并通过 enq 将其添加到 sync queue
  4. 接下来线程将在 sync queue 中以阻塞的方式获取,如果获取不到锁,将会被再次挂起
  5. 线程在sync queue中获取到锁后,将调用 unlinkCancelledWaiters 方法将自己从条件队列中移除,该方法还会顺便移除其他取消等待的锁
  6. 最后 reportInterruptAfterWait 抛出 InterruptedException

对于在 signal 之后中断:

忽略中断,推迟至 await() 返回时再发生

总结

  1. 进入 await 时必须是已经持有了锁
  2. 离开 await 时同样必须是已经持有了锁
  3. 调用 await 会使得当前线程被封装成 Node 扔进条件队列,然后释放所持有的锁
  4. 释放锁后,当前线程将在 condition queue 中被挂起,等待 signal 或者中断
  5. 线程被唤醒后会将会离开 condition queue 进入 sync queue 中进行抢锁
  6. 若在线程抢到锁之前发生过中断,则根据中断发生在 signal 之前还是之后记录中断模式
  7. 线程在抢到锁后进行善后工作( 离开 condition queue,处理中断异常 )
  8. 线程已经持有了锁,从 await 方法返回

注意中断

中断和 signal 所起到的作用都是将线程从 condition queue 中移除,加入到 sync queue 中去争锁

所不同的是,signal 方法被认为是正常唤醒线程,中断方法被认为是非正常唤醒线程

如果中断发生在 signal 之前,则我们在最终返回时,应当抛出 InterruptedException

如果中断发生在 signal 之后,我们就认为线程本身已经被正常唤醒了,这个中断来的太晚了,我们直接忽略它,并在 await 返回时再自我中断一下,这种做法相当于将中断推迟至 await() 返回时再发生

在await()方法返回后,如果是因为中断被唤醒,则await()方法需要抛出InterruptedException异常,表示是它是被非正常唤醒的

只要它是在signal之后发生的,我们就认为它来的太晚了,我们将忽略这个中断。因此,从await()方法返回的时候,我们只会将当前线程重新中断一下,而不会抛出中断异常

awaitUninterruptibly

中断属于将一个等待中的线程非正常唤醒,可能即使线程被唤醒后,也抢到了锁,但是却发现当前的等待条件并没有满足,则还是得把线程挂起

awaitUninterruptibly 即不希望 await 方法被中断

awaitNanos

如果设定的超时时间还没到,就将线程挂起

超过等待的时间了,就将线程从 condtion queue 转移到 sync queue

当设定的超时时间很短时( 小于 spinForTimeoutThreshold ),就简单的自旋,而不是将线程挂起,以减少挂起线程和唤醒线程所带来的时间消耗

await(long time, TimeUnit unit)

awaitNanos(long nanosTimeout) 的返回值是剩余的超时时间

如果该值大于 0,说明超时时间还没到,则说明该返回是由 signal 行为导致的

await(long time, TimeUnit unit) 的返回值就是 transferAfterCancelledWait(node) 的值

  • 如果调用该方法时,node 还没有被 signal 过则返回 true,node 已经被 signal 过了,则返回 false

因此当 await(long time, TimeUnit unit) 方法返回 true,则说明在超时时间到之前就已经发生过 signal 了,该方法的返回是由 signal 方法导致的而不是超时时间

await(long time, TimeUnit unit) 等价于 awaitNanos(unit.toNanos(time)) > 0

参考文章