ConcurrentHashMap

Changes in 1.8 compared with 1.7

In 1.7, ConcurrentHashMap used segmented locking: each Segment extends ReentrantLock.

In theory it supports up to ConcurrencyLevel (the number of Segments) concurrent writer threads.

While one thread holds the lock on a Segment, the other Segments are unaffected.

Like HashMap, though, a lookup that has to traverse a long linked list is inefficient.

In 1.8, ConcurrentHashMap drops the Segment-based locking and instead uses CAS plus synchronized (on a bin's head node) to guarantee thread safety, as sketched below.
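
A minimal sketch of the difference, using hypothetical stand-in classes rather than the real JDK internals (an AtomicReferenceArray plays the role of the Unsafe-based tabAt/casTabAt helpers): in 1.7 a write locks the whole Segment that owns the bucket, while in 1.8 an empty bin is claimed with a CAS and a non-empty bin is locked only on its head node.

// Hypothetical sketch only: these are not the real JDK classes.
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;

class LockingSketch {

    // 1.7 style: the Segment owning the bucket is itself a ReentrantLock,
    // so every bucket inside that Segment shares one lock during writes.
    static class SegmentSketch extends ReentrantLock {
        Object[] buckets = new Object[16];
        void put(int index, Object node) {
            lock();
            try {
                buckets[index] = node; // simplified: just drop the node into its bucket
            } finally {
                unlock();
            }
        }
    }

    // 1.8 style: no Segments; an empty bin is claimed with CAS and a
    // non-empty bin is locked only on its own head node.
    static void put18Style(AtomicReferenceArray<Object> table, int i, Object newNode) {
        Object head = table.get(i);
        if (head == null) {
            table.compareAndSet(i, null, newNode); // CAS into the empty slot
        } else {
            synchronized (head) {                  // lock only this bin's head
                // append to the list / insert into the tree rooted at head
            }
        }
    }
}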

Must-know fields

Same as HashMap:

  • The default table size is 16
  • The load factor is also 0.75f
  • When a bin's list reaches the treeify threshold (8) and the table length is at least 64, the bin is converted to a red-black tree; if the table length is below 64, the table is resized instead
  • When a red-black tree bin shrinks to 6 or fewer nodes (checked during a resize), it is converted back to a linked list (the corresponding JDK 8 constants are quoted below)
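
For reference, these thresholds correspond to the following constants in the JDK 8 source (comments paraphrased):

private static final int DEFAULT_CAPACITY = 16;   // default table size
private static final float LOAD_FACTOR = 0.75f;   // load factor
static final int TREEIFY_THRESHOLD = 8;           // a list this long may be treeified
static final int UNTREEIFY_THRESHOLD = 6;         // a tree with this many nodes or fewer goes back to a list
static final int MIN_TREEIFY_CAPACITY = 64;       // below this table length, resize instead of treeify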
static final int MIN_TREEIFY_CAPACITY = 64;        // minimum table capacity for treeification; below 64, resize instead
private static final int MIN_TRANSFER_STRIDE = 16; // minimum stride (buckets per task) when splitting the table for a resize

private static int RESIZE_STAMP_BITS = 16;
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // maximum number of threads that may help resize
static final int NCPU = Runtime.getRuntime().availableProcessors(); // number of CPUs
private transient volatile Node<K,V>[] nextTable; // the transition table used during a resize

private transient volatile int sizeCtl; // the most important state variable (negative while initializing/resizing, otherwise the next resize threshold)
private transient volatile int transferIndex; // resize progress indicator
private transient volatile long baseCount; // counter: base count
private transient volatile int cellsBusy; // counter: spinlock flag guarding counterCells
private transient volatile CounterCell[] counterCells; // counter: cells for concurrent accumulation
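
A note on sizeCtl: it is negative while the table is being initialized (-1) or resized (a stamped negative value); otherwise it holds the initial capacity before the table exists and the next resize threshold afterwards. The stamp that RESIZE_STAMP_BITS and MAX_RESIZERS refer to is computed by resizeStamp, which in the JDK 8 source is roughly the following (quoted from memory):

// Returns a stamp for resizing a table of size n; when shifted left by
// RESIZE_STAMP_SHIFT (32 - RESIZE_STAMP_BITS) it becomes negative, which is how
// sizeCtl encodes "a resize of a table of this size is in progress".
static final int resizeStamp(int n) {
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}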

Insertion

  • Compute the hash from the key's hashCode (see the spread sketch below)
  • Check whether the table still needs to be initialized
  • f is the Node at the slot this key hashes to; if it is null, the slot is free, so write the new node with CAS, retrying the loop until it succeeds
  • If the slot's hash == MOVED == -1, a resize is in progress, so help with the transfer first
  • Otherwise lock the bin's head node with synchronized and write the entry into the list or tree
  • If the bin length then reaches TREEIFY_THRESHOLD, convert the bin to a red-black tree
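
The "compute the hash" step uses spread, which, like HashMap, XORs the high 16 bits into the low 16 bits; in the JDK 8 source it is roughly the following (HASH_BITS masks off the sign bit, so negative hashes stay reserved for special nodes such as MOVED = -1):

static final int HASH_BITS = 0x7fffffff; // usable bits of a normal node hash

static final int spread(int h) {
    // fold the high 16 bits into the low 16 bits, then clear the sign bit
    return (h ^ (h >>> 16)) & HASH_BITS;
}
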
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // Same as HashMap: XOR the high and low 16 bits to keep the high-bit information
    int hash = spread(key.hashCode());
    // length of the list in this bin
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // If the table is empty it needs initialization; since this is a for loop, we re-enter the loop afterwards
        if (tab == null || (n = tab.length) == 0)
            // initialize
            tab = initTable();
        // Table is non-empty, so insert.
        // If this slot is empty, insert directly with CAS; on success, break
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;
        }
        // If the slot is occupied and its node is being moved, a resize is in progress
        else if ((fh = f.hash) == MOVED)
            // help with the resize; see the "Resizing" section below
            tab = helpTransfer(tab, f);
        // Otherwise handle the bin as either a linked list or a red-black tree
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // a head-node hash >= 0 means this bin is a plain linked list
                    if (fh >= 0) {
                        // binCount tracks the length of the list
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // if an equal key already exists, onlyIfAbsent decides whether to replace the value
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // if the end of the list is reached without a match, append a new node at the tail
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // if the head node is a TreeBin, insert via the red-black tree path
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // if this bin's length reaches TREEIFY_THRESHOLD (8), call treeifyBin:
            //   table length >= 64 -> convert the bin to a red-black tree
            //   table length < 64  -> resize the table instead
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // increment the counter (addCount also checks whether a resize is needed)
    addCount(1L, binCount);
    return null;
}
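
A short usage example of what onlyIfAbsent controls: put always replaces an existing value, while putIfAbsent (which calls putVal with onlyIfAbsent = true) keeps the existing one. The expected output is noted in the comments.

import java.util.concurrent.ConcurrentHashMap;

public class PutValDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        System.out.println(map.put("k", 1));          // null (no previous value)
        System.out.println(map.put("k", 2));          // 1    (replaced, old value returned)
        System.out.println(map.putIfAbsent("k", 3));  // 2    (not replaced, existing value returned)
        System.out.println(map.get("k"));             // 2
    }
}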

Retrieval

  • Locate the bucket using the computed hash; if the entry sits right at the bucket head, return its value directly
  • If the head node is a red-black tree bin, look the value up through the tree
  • Otherwise traverse the linked list to find the value (the JDK 8 get method is quoted below for reference)
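
For reference, these three cases are exactly the structure of the JDK 8 get method, quoted here from memory (so treat the exact shape as approximate). A negative head-node hash marks a special node (a TreeBin or ForwardingNode), and its own find method performs the lookup:

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // hit directly on the bucket head
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // negative hash: TreeBin / ForwardingNode, delegate to its find()
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // otherwise walk the linked list
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}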

Resizing

During insertion, when a bin's list reaches the treeify threshold (8), the treeifyBin method is executed:

  • If the table length is at least 64, the bin is converted to a red-black tree
  • If the table length is below 64, the table is resized instead (tryPresize)
private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            // table length below 64: resize; note the argument has already been doubled here!
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            // convert the bin to a red-black tree (omitted)
        }
    }
}

tryPresize -> transfer

// the argument has already been doubled at the call site
private final void tryPresize(int size) {
    // if the doubled size is at least half of MAXIMUM_CAPACITY, c is simply MAXIMUM_CAPACITY;
    // otherwise c = size plus half of itself plus 1, rounded up to the next power of two
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        // table not created yet: initialize it here, much like initTable
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2); // 0.75 * n, the next resize threshold
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        // already large enough, or at the maximum capacity: nothing to do
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                Node<K,V>[] nt;
                // resize already finishing, or helper limit reached, or no tasks left
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // join an in-progress resize
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // start a new resize: set sizeCtl to the (negative) stamped value
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}
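
The tableSizeFor call above rounds its argument up to the next power of two (capped at MAXIMUM_CAPACITY); in the JDK 8 source it is roughly:

// Rounds c up to the next power of two, e.g. tableSizeFor(25) == 32.
private static final int tableSizeFor(int c) {
    int n = c - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}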

The resize splits the work into tasks so that multiple threads can migrate buckets at the same time, which speeds it up; the element count (size) is maintained with a striped-counter scheme.

  • Nodes are first moved to the transition table nextTable; once every bucket has been moved, nextTable replaces table
  • Before moving, the table is divided into fixed-length segments; threads claim these tasks in reverse order (from the highest index downwards), and sizeCtl is set to mark that a resize is in progress
  • Once a bucket has been fully moved (or an empty bucket is encountered), the slot is marked with a ForwardingNode that points to nextTable
  • If another thread later hits a ForwardingNode while operating on the table, it first helps with the resize by claiming a segment of its own, and resumes its original operation once the resize completes (see the helpTransfer sketch below)
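
For reference, the helpTransfer path from the last bullet looks roughly like this in JDK 8 (quoted from memory): a thread only joins while a resize of this exact table is still in progress, and it joins by CAS-incrementing sizeCtl before calling transfer.

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // only help if f really is a ForwardingNode and the new table still exists
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            // resize already finishing, or helper limit reached, or no tasks left
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // register as one more resizer, then move buckets
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}
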
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // each thread claims at least MIN_TRANSFER_STRIDE (16) buckets per task
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            // the new table is twice the old size
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n; // tasks are claimed from the high end (index n) downwards
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // claim the next stride of buckets: [nextBound, nextIndex - 1]
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // no more buckets for this thread to claim
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                // last thread out: publish nextTab as table, threshold = 0.75 * (2n)
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // not the last helper: just leave; the last one rechecks the whole table
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // empty bucket: mark it as moved with a ForwardingNode
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // split the bin into a "low" part (stays at index i)
                    // and a "high" part (moves to index i + n)
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        // lastRun marks the tail run of nodes that all land in the
                        // same new bin, so that run can be reused without copying
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        // mark the old bin with the ForwardingNode so other threads know it moved
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // small trees fall back to lists (UNTREEIFY_THRESHOLD = 6)
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}
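
Finally, the counter fields listed at the top (baseCount, cellsBusy, counterCells) are how the element count is kept without one contended counter: addCount first tries to CAS baseCount and, under contention, spills the increment into a per-thread CounterCell; size() then sums everything through sumCount, which in JDK 8 is roughly:

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        // add up every cell that contended threads spilled their increments into
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}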
