ReentrantLock

ReentrantLock

先看AQS

可重入锁、独占锁!!!

ReentrantLock 实现了 Lock 接口,实现 lock 等方法,lockSync 实现,而 Sync 有抽象方法 lock ,由其子类实现

ReentrantLock 的抽象子类 Sync 继承了 AQS,子类公平锁 FairSync 和非公平锁 NonfairSync 继承 Sync

如果是同一个线程拿锁,就直接叠加状态 state

有两个构造,默认实现非公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

public void lock() {
sync.lock();
}

static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 这里会到 AQS 的 acquire,然后到下面的 tryAcquire
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
// tryAcquire 到这里的 nonfairTryAcquire
return nonfairTryAcquire(acquires);
}
}

为什么默认是非公平锁?

  • 在恢复一个被挂起的线程与该线程真正运行之间存在着严重的延迟,这时候锁是空闲的,这段时间是浪费的
  • 非公平锁是插队,但是如果插不了也要继续排队的( 在 lock 方法和 acquire 均有体现 )

Sync 继承了 AQS,那么需要实现独占锁相对应的 tryAcquire 方法,来获取锁( 读写锁就需要相对应的共享锁了 )

nonfairTryAcquire 和 tryAcquire

Sync 实现了非公平锁获取锁的逻辑,公平锁自己会实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
abstract static class Sync extends AbstractQueuedSynchronizer {
// 非公平锁 获取锁的实现
// 公平锁会自己实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 直接插队
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 可重入
// 如果还是当前线程来拿锁,就直接叠加
// 在释放的时候也要一个一个释放
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
// 非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

// 对比 FairSync 的 lock
// 这里直接尝试 CAS 获取锁
// 失败了才到 AQS acquire 里面获取锁,获取失败了排队( AQS acquireQueued )
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// 公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

// 公平锁获取失败会乖乖排队
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 看前面有没有节点
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 可重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

响应中断

关键:doAcquireInterruptibly

lock 是不响应中断的

要响应中断,要用 lockInterruptibly 方法,有中断就会抛出 InterruptedException

  • 无论是公平还是非公平,都是同一个
1
2
3
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

调用 AQSacquireInterruptibly

1
2
3
4
5
6
7
8
9
10
11
12
// AQS
public final void acquireInterruptibly(int arg) throws InterruptedException {
// 先检查该线程是不是已经中断
// 如果中断,就直接抛异常
if (Thread.interrupted())
throw new InterruptedException();
// 如果没中断过,就继续尝试用 tryAcquire 获取锁,成功啥事没有
// 不成功就要跟之前的 lock 一样扔进队列,只是这一次是有响应中断的,即有中断,就抛异常
if (!tryAcquire(arg))
// 扔队列,和 acquireQueued 差不多,只是有响应中断
doAcquireInterruptibly(arg);
}

doAcquireInterruptibly

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// AQS
// 有响应中断,对比下面的 acquireQueued
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return; // 这里不一样
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 不一样,如果检测到中断,会抛异常,而且进入 finally
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1
2
3
4
5
6
// AQS
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// 如果有中断,就返回 true,在 doAcquireInterruptibly 里就会抛异常
return Thread.interrupted();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// AQS
// 对比 acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 只有前驱是头节点才能尝试获取同步状态
if (p == head && tryAcquire(arg)) {
// 获取同步状态成功
// 设置头节点
// 这里都不需要 CAS,因为已经拿到锁了
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果获取同步状态失败,会将前驱节点设置为 SIGNAL 状态,然后阻塞
// shouldParkAfterFailedAcquire 返回 true 时才会执行 parkAndCheckInterrupt
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; // 不响应中断,只是记录
}
} finally {
if (failed)
cancelAcquire(node);
}
}

cancelAcquire

cancelAcquire 取消了当前节点的排队,还会同时将当前节点之前的那些已经 CANCEL 掉的节点移出队列

在并发条件下,新的节点可能已经入队了,成为了新的尾节点,会将当前节点即尾节点的 waitStatus 修改成了 SIGNAL,而在这时,我们发起了中断,又将这个 waitStatus 修改成 CANCELLED,它的闹钟就莫得了,所以在当前节点在出队之前,要负责唤醒后面新加入的节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// AQS
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
// 忽略
if (node == null)
return;

node.thread = null;

// Skip cancelled predecessors
// 由当前节点向前遍历,跳过那些 Cancel 的节点
// 从当前节点向前开始查找,找到第一个 waitStatus > 0 的 Node, 该节点为 pred
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// predNext 即是 pred 节点的下一个节点
// 到这里可知,pred 节点是没有被 Cancel 的节点,但是 pred 节点往后,一直到当前节点 Node 都处于被 Cancel 的状态
Node predNext = pred.next;

// 把当前 Node 节点设为 Cancel
node.waitStatus = Node.CANCELLED;

// 如果当前节点是尾节点,则将之前找到的节点 pred 重新设置成尾节点,并将 pred 节点的 next 属性由 predNext 修改成 Null
// 这一段本质上是将 pred 节点后面的节点全部移出队列,因为它们都被 Cancel 掉了
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 当前节点已经不是尾节点了,或者说设置新的尾节点失败了
// 在并发条件下,其他线程可能已经入队了,成为了新的尾节点
// 虽然我们之前已经将当前节点的 waitStatus 设为了 CANCELLED
// 但在 lock 方法中,新的节点入队后会让前面的节点为自己设闹钟,即将前面节点的 status 设为 SIGNAL 以唤醒自己
// 所以,在当前节点的后继节点入队后,可能将当前节点的 waitStatus 修改成了 SIGNAL
// 而在这时,我们发起了中断,又将这个 waitStatus 修改成 CANCELLED
// 它的闹钟就莫得了
// 所以在当前节点在出队之前,要负责唤醒后继节点
int ws;
// pred 不是头节点,thread 不为 null
// waitStatus 为 SIGNAL 或者是小于等于 0 但是被我们成功的设置成 SIGNAL
// 保证了 pred 确实是一个正在正常等待锁的线程,并且它是 SIGNAL 的
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next); // 把 pred 的后继改为新加入的那个
} else {
// 如果不满足,直接唤醒 node 的 next,即新加入的节点
unparkSuccessor(node);
}

node.next = node; // help GC
}
}

tryLock

无论是公平还是非公平,tryLock 都是实现了非公平的尝试

仅仅是用于检查锁在当前调用的时候是不是可获得的

1
2
3
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

带超时的 tryLock

1
2
3
4
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

AQS 的独占式超时获取

unlock

1
2
3
public void unlock() {
sync.release(1);
}

newCondition

1
2
3
public Condition newCondition() {
return sync.newCondition();
}

直接 new 一个 AQS 的 ConditionObject:

1
2
3
final ConditionObject newCondition() {
return new ConditionObject();
}

参考文章