ReentrantLock
先看AQS
可重入锁、独占锁!!!
ReentrantLock
实现了 Lock
接口,实现 lock
等方法,lock
由 Sync
实现,而 Sync
有抽象方法 lock
,由其子类实现
ReentrantLock
的抽象子类 Sync
继承了 AQS
,子类公平锁 FairSync
和非公平锁 NonfairSync
继承 Sync
如果是同一个线程拿锁,就直接叠加状态 state
有两个构造,默认实现非公平锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public ReentrantLock() { sync = new NonfairSync(); }
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
public void lock() { sync.lock(); }
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L;
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
|
为什么默认是非公平锁?
- 在恢复一个被挂起的线程与该线程真正运行之间存在着严重的延迟,这时候锁是空闲的,这段时间是浪费的
- 非公平锁是插队,但是如果插不了也要继续排队的( 在 lock 方法和 acquire 均有体现 )
Sync
继承了 AQS
,那么需要实现独占锁相对应的 tryAcquire
方法,来获取锁( 读写锁就需要相对应的共享锁了 )
nonfairTryAcquire 和 tryAcquire
Sync
实现了非公平锁获取锁的逻辑,公平锁自己会实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| abstract static class Sync extends AbstractQueuedSynchronizer { final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L;
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L;
final void lock() { acquire(1); }
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
|
响应中断
关键:doAcquireInterruptibly
lock
是不响应中断的
要响应中断,要用 lockInterruptibly
方法,有中断就会抛出 InterruptedException
1 2 3
| public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
|
调用 AQS
的 acquireInterruptibly
1 2 3 4 5 6 7 8 9 10 11 12
| public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
|
doAcquireInterruptibly
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
|
1 2 3 4 5 6
| private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
|
cancelAcquire
cancelAcquire
取消了当前节点的排队,还会同时将当前节点之前的那些已经 CANCEL 掉的节点移出队列
在并发条件下,新的节点可能已经入队了,成为了新的尾节点,会将当前节点即尾节点的 waitStatus 修改成了 SIGNAL,而在这时,我们发起了中断,又将这个 waitStatus 修改成 CANCELLED,它的闹钟就莫得了,所以在当前节点在出队之前,要负责唤醒后面新加入的节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| private void cancelAcquire(Node node) { if (node == null) return;
node.thread = null;
Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); }
node.next = node; } }
|
tryLock
无论是公平还是非公平,tryLock
都是实现了非公平的尝试
仅仅是用于检查锁在当前调用的时候是不是可获得的
1 2 3
| public boolean tryLock() { return sync.nonfairTryAcquire(1); }
|
带超时的 tryLock
1 2 3 4
| public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
|
看 AQS 的独占式超时获取
unlock
1 2 3
| public void unlock() { sync.release(1); }
|
newCondition
1 2 3
| public Condition newCondition() { return sync.newCondition(); }
|
直接 new 一个 AQS 的 ConditionObject:
1 2 3
| final ConditionObject newCondition() { return new ConditionObject(); }
|
参考文章