前言
ConcurrentHashMap
顾名思义就是同步的HashMap,也就是线程安全的HashMap,所以本篇介绍的ConcurrentHashMap和HashMap有着很重要的关系,所以建议之前没有了解过HashMap的可以先看看这篇关于HashMap的原理分析《HashMap从认识到源码分析》,本篇继续以JDK1.8
版本的源码进行分析,最后在介绍完ConcurrentHashMap之后会对ConcurrentHashMap、Hashtable和HashMap做一个比较和总结。
ConcurrentHashMap
我们先看一下ConcurrentHashMap
实现了哪些接口、继承了哪些类,对ConcurrentHashMap
有一个整体认知。ConcurrentHashMap
继承AbstractMap
接口,这个和HashMap
一样,然后实现了ConcurrentMap
接口,这个和HashMap
不一样,HashMap
是直接实现的Map
接口。
再细看ConcurrentHashMap
的结构,这里列举几个重要的成员变量table
、nextTable
、baseCount
、sizeCtl
、transferIndex
、cellsBusy
- table:数据类型是Node数组,这里的Node和HashMap的Node一样都是内部类且实现了
Map.Entry
接口 - nextTable:哈希表扩容时生成的数据,数组为扩容前的2倍
- sizeCtl:多个线程的共享变量,是操作的控制标识符,它的作用不仅包括
threshold
的作用,在不同的地方有不同的值也有不同的用途-1
代表正在初始化-N
代表有N-1
个线程正在进行扩容操作0
代表hash表还没有被初始化- 正数表示下一次进行扩容的容量大小
- ForwardingNode:一个特殊的Node节点,Hash地址为-1,存储着nextTable的引用,只有table发生扩用的时候,ForwardingNode才会发挥作用,作为一个占位符放在table中表示当前节点为null或者已被移动
ConcurrentHashMap
和HashMap
一样都是采用拉链法处理哈希冲突,且都为了防止单链表过长影响查询效率,所以当链表长度超过某一个值时候将用红黑树代替链表进行存储,采用了数组+链表+红黑树的结构
所以从结构上看HashMap
和ConcurrentHashMap
还是很相似的,只是ConcurrentHashMap
在某些操作上采用了CAS
+synchronized
来保证并发情况下的安全。
说到ConcurrentHashMap
处理并发情况下的线程安全问题,这不得不提到Hashtable
,因为Hashtable
也是线程安全的,那ConcurrentHashMap
和Hashtable
有什么区别或者有什么高明之处嘛?以至于官方都推荐使用ConcurrentHashMap
来代替Hashtable
- 线程安全的实现:
Hashtable
采用对象锁(synchronized修饰对象方法)来保证线程安全,也就是一个Hashtable
对象只有一把锁,如果线程1拿了对象A的锁进行有synchronized
修饰的put
方法,其他线程是无法操作对象A中有synchronized
修饰的方法的(如get
方法、remove
方法等),竞争激烈所以效率低下。而ConcurrentHashMap
采用CAS
+synchronized
来保证并发安全性,且synchronized
关键字不是用在方法上而是用在了具体的对象上,实现了更小粒度的锁,等会源码分析的时候在细说这个SUN大师们的鬼斧神工 - 数据结构的实现:
Hashtable
采用的是数组 + 链表,当链表过长会影响查询效率,而ConcurrentHashMap
采用数组 + 链表 + 红黑树,当链表长度超过某一个值,则将链表转成红黑树,提高查询效率。
构造函数
ConcurrentHashMap
的构造函数有5个,从数量上看就和HashMap
、Hashtable
(4个)的不同,多出的那个构造函数是public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)
,即除了传入容量大小、负载因子之外还多传入了一个整型的concurrencyLevel
,这个整型是我们预先估计的并发量,比如我们估计并发是30
,那么就可以传入30
。
其他的4个构造函数的参数和HashMap
的一样,而具体的初始化过程却又不相同,HashMap
和Hashtable
传入的容量大小和负载因子都是为了计算出初始阈值(threshold),而ConcurrentHashMap
传入的容量大小和负载因子是为了计算出sizeCtl用于初始化table
,这个sizeCtl即table数组的大小,不同的构造函数计算sizeCtl方法都不一样。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55//无参构造函数,什么也不做,table的初始化放在了第一次插入数据时,默认容量大小是16和HashMap的一样,默认sizeCtl为0
public ConcurrentHashMap() {
}
//传入容量大小的构造函数。
public ConcurrentHashMap(int initialCapacity) {
//如果传入的容量大小小于0 则抛出异常。
if (initialCapacity < 0)
throw new IllegalArgumentException();
//如果传入的容量大小大于允许的最大容量值 则cap取允许的容量最大值 否则cap =
//((传入的容量大小 + 传入的容量大小无符号右移1位 + 1)的结果向上取最近的2幂次方),
//即如果传入的容量大小是12 则 cap = 32(12 + (12 >>> 1) + 1=19
//向上取2的幂次方即32),这里为啥一定要是2的幂次方,原因和HashMap的threshold一样,都是为
//了让位运算和取模运算的结果一样。
//MAXIMUM_CAPACITY即允许的最大容量值 为2^30。
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
//tableSizeFor这个函数即实现了将一个整数取2的幂次方。
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
//将上面计算出的cap 赋值给sizeCtl,注意此时sizeCtl为正数,代表进行扩容的容量大小。
this.sizeCtl = cap;
}
//包含指定Map的构造函数。
//置sizeCtl为默认容量大小 即16。
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
//传入容量大小和负载因子的构造函数。
//默认并发数大小是1。
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
//传入容量大小、负载因子和并发数大小的构造函数
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//如果传入的容量大小 小于 传入的并发数大小,
//则容量大小取并发数大小,这样做的原因是确保每一个Node只会分配给一个线程,而一个线程则
//可以分配到多个Node,比如当容量大小为64,并发数大
//小为16时,则每个线程分配到4个Node。
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
//size = 1.0 + (long)initialCapacity / loadFactor 这里计算方法和上面的构造函数不一样。
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
//如果size大于允许的最大容量值则 sizeCtl = 允许的最大容量值 否则 sizeCtl =
//size取2的幂次方。
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
put方法
- 判断键值是否为
null
,为null
抛出异常。 - 调用
spread()
方法计算key的hashCode()获得哈希地址,这个HashMap相似。 - 如果当前table为空,则初始化table,需要注意的是这里并没有加
synchronized
,也就是允许多个线程去尝试初始化table,但是在初始化函数里面使用了CAS
保证只有一个线程去执行初始化过程。 - 使用 容量大小-1 & 哈希地址 计算出待插入键值的下标,如果该下标上的bucket为
null
,则直接调用实现CAS
原子性操作的casTabAt()
方法将节点插入到table中,如果插入成功则完成put操作,结束返回。插入失败(被别的线程抢先插入了)则继续往下执行。 - 如果该下标上的节点(头节点)的哈希地址为-1,代表需要扩容,该线程执行
helpTransfer()
方法协助扩容。 - 如果该下标上的bucket不为空,且又不需要扩容,则进入到bucket中,同时锁住这个bucket,注意只是锁住该下标上的bucket而已,其他的bucket并未加锁,其他线程仍然可以操作其他未上锁的bucket,这个就是ConcurrentHashMap为什么高效的原因之一。
- 进入到bucket里面,首先判断这个bucket存储的是红黑树(哈希地址小于0,原因后面分析)还是链表。
- 如果是链表,则遍历链表看看是否有哈希地址和键key相同的节点,有的话则根据传入的参数进行覆盖或者不覆盖,没有找到相同的节点的话则将新增的节点插入到链表尾部。如果是红黑树,则将节点插入。到这里结束加锁。
- 最后判断该bucket上的链表长度是否大于链表转红黑树的阈值(8),大于则调用
treeifyBin()
方法将链表转成红黑树,以免链表过长影响效率。 - 调用
addCount()
方法,作用是将ConcurrentHashMap的键值对数量+1,还有另一个作用是检查ConcurrentHashMap是否需要扩容。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
//不允许键值为null,这点与线程安全的Hashtable保持一致,和HashMap不同。
if (key == null || value == null) throw new NullPointerException();
//取键key的hashCode()和HashMap、Hashtable都一样,然后再执行spread()方法计算得到哈希地
//址,这个spread()方法和HashMap的hash()方法一样,都是将hashCode()做无符号右移16位,只不
//过spread()加多了 &0x7fffffff,让结果为正数。
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果table数组为空或者长度为0(未初始化),则调用initTable()初始化table,初始化函数
//下面介绍。
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//调用实现了CAS原子性操作的tabAt方法
//tabAt方法的第一个参数是Node数组的引用,第二个参数在Node数组的下标,实现的是在Nod
//e数组中查找指定下标的Node,如果找到则返回该Node节点(链表头节点),否则返回null,
//这里的i = (n - 1)&hash即是计算待插入的节点在table的下标,即table容量-1的结果和哈
//希地址做与运算,和HashMap的算法一样。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果该下标上并没有节点(即链表为空),则直接调用实现了CAS原子性操作的
//casTable()方法,
//casTable()方法的第一个参数是Node数组的引用,第二个参数是待操作的下标,第三
//个参数是期望值,第四个参数是待操作的Node节点,实现的是将Node数组下标为参数二
//的节点替换成参数四的节点,如果期望值和实际值不符返回false,否则参数四的节点成
//功替换上去,返回ture,即插入成功。注意这里:如果插入成功了则跳出for循环,插入
//失败的话(其他线程抢先插入了),那么会执行到下面的代码。
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果该下标上的节点的哈希地址为-1(即链表的头节点为ForwardingNode节点),则表示
//table需要扩容,值得注意的是ConcurrentHashMap初始化和扩容不是用同一个方法,而
//HashMap和Hashtable都是用同一个方法,当前线程会去协助扩容,扩容过程后面介绍。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//如果该下标上的节点既不是空也不是需要扩容,则表示这个链表可以插入值,将进入到链表
//中,将新节点插入或者覆盖旧值。
else {
V oldVal = null;
//通过关键字synchroized对该下标上的节点加锁(相当于锁住锁住
//该下标上的链表),其他下标上的节点并没有加锁,所以其他线程
//可以安全的获得其他下标上的链表进行操作,也正是因为这个所
//以提高了ConcurrentHashMap的效率,提高了并发度。
synchronized (f) {
if (tabAt(tab, i) == f) {
//如果该下标上的节点的哈希地址大于等于0,则表示这是
//个链表。
if (fh >= 0) {
binCount = 1;
//遍历链表。
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果哈希地址、键key相同 或者 键key不为空
//且键key相同,则表示存在键key和待插入的键
//key相同,则执行更新值value的操作。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//如果找到了链表的最后一个节点都没有找到相
//同键Key的,则是插入操作,将插入的键值新建
//个节点并且添加到链表尾部,这个和HashMap一
//样都是插入到尾部。
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果该下标上的节点的哈希地址小于0 且为树节点
//则将带插入键值新增到红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//如果插入的结果不为null,则表示为替换
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash,
key,value)) != null){
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//判断链表的长度是否大于等于链表的阈值(8),大于则将链表转成
//红黑树,提高效率。这点和HashMap一样。
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
get方法
- 调用
spread()
方法计算key的hashCode()获得哈希地址。 - 计算出键key所在的下标,算法是(n - 1) & h,如果table不为空,且下标上的bucket不为空,则到bucket中查找。
- 如果bucket的头节点的哈希地址小于0,则代表这个bucket存储的是红黑树,否则是链表。
- 到红黑树或者链表中查找,找到则返回该键key的值,找不到则返回null。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//运用键key的hashCode()计算出哈希地址
int h = spread(key.hashCode());
//如果table不为空 且 table长度大于0 且 计算出的下标上bucket不为空,
//则代表这个bucket存在,进入到bucket中查找,
//其中(n - 1) & h为计算出键key相对应的数组下标的算法。
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果哈希地址、键key相同则表示查找到,返回value,这里查找到的是头节点。
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果bucket头节点的哈希地址小于0,则代表bucket为红黑树,在红黑树中查找。
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//如果bucket头节点的哈希地址不小于0,则代表bucket为链表,遍历链表,在链表中查找。
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
remove方法
- 调用
spread()
方法计算出键key的哈希地址。 - 计算出键key所在的数组下标,如果table为空或者bucket为空,则返回
null
。 - 判断当前table是否正在扩容,如果在扩容则调用helpTransfer方法协助扩容。
- 如果table和bucket都不为空,table也不处于在扩容状态,则锁住当前bucket,对bucket进行操作。
- 根据bucket的头结点判断bucket是链表还是红黑树。
- 在链表或者红黑树中移除哈希地址、键key相同的节点。
- 调用
addCount
方法,将当前table存储的键值对数量-1。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85public V remove(Object key) {
return replaceNode(key, null, null);
}
final V replaceNode(Object key, V value, Object cv) {
//计算需要移除的键key的哈希地址。
int hash = spread(key.hashCode());
//遍历table。
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//table为空,或者键key所在的bucket为空,则跳出循环返回。
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
//如果当前table正在扩容,则调用helpTransfer方法,去协助扩容。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
//将键key所在的bucket加锁。
synchronized (f) {
if (tabAt(tab, i) == f) {
//bucket头节点的哈希地址大于等于0,为链表。
if (fh >= 0) {
validated = true;
//遍历链表。
for (Node<K,V> e = f, pred = null;;) {
K ek;
//找到哈希地址、键key相同的节点,进行移除。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
//如果bucket的头节点小于0,即为红黑树。
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
//找到节点,并且移除。
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
//调用addCount方法,将当前ConcurrentHashMap存储的键值对数量-1。
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}
initTable初始化方法
table
的初始化主要由initTable()方法实现的,initTable()方法初始化一个合适大小的数组,然后设置sizeCtl。 我们知道ConcurrentHashMap
是线程安全的,即支持多线程的,那么一开始很多个线程同时执行put()
方法,而table
又没初始化,那么就会很多个线程会去执行initTable()方法尝试初始化table,而put
方法和initTable
方法都是没有加锁的(synchronize),那SUN的大师们是怎么保证线程安全的呢? 通过源码可以看得出,table的初始化只能由一个线程完成,但是每个线程都可以争抢去初始化table。
- 判断table是否为
null
,即需不需要首次初始化,如果某个线程进到这个方法后,其他线程已经将table初始化好了,那么该线程结束该方法返回。 - 如果table为
null
,进入到while循环,如果sizeCtl
小于0(其他线程正在对table初始化),那么该线程调用Thread.yield()
挂起该线程,让出CPU时间,该线程也从运行态转成就绪态,等该线程从就绪态转成运行态的时候,别的线程已经table初始化好了,那么该线程结束while循环,结束初始化方法返回。如果从就绪态转成运行态后,table仍然为null
,则继续while循环。 - 如果table为
null
且sizeCtl
不小于0,则调用实现CAS
原子性操作的compareAndSwap()
方法将sizeCtl设置成-1,告诉别的线程我正在初始化table,这样别的线程无法对table进行初始化。如果设置成功,则再次判断table是否为空,不为空则初始化table,容量大小为默认的容量大小(16),或者为sizeCtl。其中sizeCtl的初始化是在构造函数中进行的,sizeCtl = ((传入的容量大小 + 传入的容量大小无符号右移1位 + 1)的结果向上取最近的2幂次方)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//如果table为null或者长度为0, //则一直循环试图初始化table(如果某一时刻别的线程将table初始化好了,那table不为null,该//线程就结束while循环)。
while ((tab = table) == null || tab.length == 0) {
//如果sizeCtl小于0,
//即有其他线程正在初始化或者扩容,执行Thread.yield()将当前线程挂起,让出CPU时间,
//该线程从运行态转成就绪态。
//如果该线程从就绪态转成运行态了,此时table可能已被别的线程初始化完成,table不为
//null,该线程结束while循环。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//如果此时sizeCtl不小于0,即没有别的线程在做table初始化和扩容操作,
//那么该线程就会调用Unsafe的CAS操作compareAndSwapInt尝试将sizeCtl的值修改成
//-1(sizeCtl=-1表示table正在初始化,别的线程如果也进入了initTable方法则会执行
//Thread.yield()将它的线程挂起 让出CPU时间),
//如果compareAndSwapInt将sizeCtl=-1设置成功 则进入if里面,否则继续while循环。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//再次确认当前table为null即还未初始化,这个判断不能少。
if ((tab = table) == null || tab.length == 0) {
//如果sc(sizeCtl)大于0,则n=sc,否则n=默认的容量大
小16,
//这里的sc=sizeCtl=0,即如果在构造函数没有指定容量
大小,
//否则使用了有参数的构造函数,sc=sizeCtl=指定的容量大小。
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//创建指定容量的Node数组(table)。
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//计算阈值,n - (n >>> 2) = 0.75n当ConcurrentHashMap储存的键值对数量
//大于这个阈值,就会发生扩容。
//这里的0.75相当于HashMap的默认负载因子,可以发现HashMap、Hashtable如果
//使用传入了负载因子的构造函数初始化的话,那么每次扩容,新阈值都是=新容
//量 * 负载因子,而ConcurrentHashMap不管使用的哪一种构造函数初始化,
//新阈值都是=新容量 * 0.75。
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
transfer扩容方法
transfer()
方法为ConcurrentHashMap
扩容操作的核心方法。由于ConcurrentHashMap
支持多线程扩容,而且也没有进行加锁,所以实现会变得有点儿复杂。整个扩容操作分为两步:
- 构建一个nextTable,其大小为原来大小的两倍,这个步骤是在单线程环境下完成的
- 将原来table里面的内容复制到nextTable中,这个步骤是允许多线程操作的,所以性能得到提升,减少了扩容的时间消耗。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168//协助扩容方法
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//如果当前table不为null 且 f为ForwardingNode节点 且 //新的table即nextTable存在的情况下才能协助扩容,该方法的作用是让线程参与扩容的复制。
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//更新sizeCtl的值,+1,代表新增一个线程参与扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
//扩容的方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//根据服务器CPU数量来决定每个线程负责的bucket数量,避免因为扩容的线程过多反而影响性能。
//如果CPU数量为1,则stride=1,否则将需要迁移的bucket数量(table大小)除以CPU数量,平分给
//各个线程,但是如果每个线程负责的bucket数量小于限制的最小是(16)的话,则强制给每个线程
//分配16个bucket数。
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//如果nextTable还未初始化,则初始化nextTable,这个初始化和iniTable初始化一样,只能由
//一个线程完成。
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
//分配任务和控制当前线程的任务进度,这部分是transfer()的核心逻辑,描述了如何与其他线
//程协同工作。
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
//迁移过程(对当前指向的bucket),这部分的逻辑与HashMap类似,拿旧数组的容量当做一
//个掩码,然后与节点的hash进行与操作,可以得出该节点的新增有效位,如果新增有效位为
//0就放入一个链表A,如果为1就放入另一个链表B,链表A在新数组中的位置不变(跟在旧数
//组的索引一致),链表B在新数组中的位置为原索引加上旧数组容量。
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
addCount、sumCount方法
addCount()
做的工作是更新table的size,也就是table存储的键值对数量,在使用put()
和remove()
方法的时候都会在执行成功之后调用addCount()
来更新table的size。对于ConcurrentHashMap
来说,它到底有储存有多少个键值对,谁也不知道,因为他是支持并发的,储存的数量无时无刻都在变化着,所以说ConcurrentHashMap
也只是统计一个大概的值,为了统计出这个值也是大费周章才统计出来的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//如果计算盒子不是空,或者修改baseCount的值+x失败,则放弃对baseCount的修改。
//这里的大概意思就是首先尝试直接修改baseCount,达到计数的目的,如果修改baseCount失败(
//多个线程同时修改,则失败)
//则使用CounterCell数组来达到计数的目的。
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
//如果计数盒子是空的 或者随机取余一个数组为空 或者修改这个槽位的变量失败,
//即表示出现了并发,则执行fullAddCount()方法进行死循环插入,同时返回,
//否则代表修改这个槽位的变量成功了,继续往下执行,不进入if。
//每个线程都会通过ThreadLocalRandom.getProbe() & m寻址找到属于它的CounterCell,
//然后进行计数。ThreadLocalRandom是一个线程私有的伪随机数生成器,
//每个线程的probe都是不同的。CounterCell数组的大小永远是一个2的n次方,初始容量
//为2,每次扩容的新容量都是之前容量乘以二,处于性能考虑,它的最大容量上限是机器
//的CPU数量,所以说CounterCell数组的碰撞冲突是很严重的。
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//并发过大,使用CAS修改CounterCell失败时候执行fullAddCount,
fullAddCount(x, uncontended);
return;
}
//如果上面对盒子的赋值成功,且check<=1,则直接返回,否则调用sumConut()方法计算
if (check <= 1)
return;
s = sumCount();
}
//如果check>=0,则检查是否需要扩容。
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
size、mappingCount方法
size
和mappingCount
方法都是用来统计table的size的,这两者不同的地方在size
返回的是一个int
类型,即可以表示size的范围是[-2^31,2^31-1],超过这个范围就返回int能表示的最大值,mappingCount
返回的是一个long
类型,即可以表示size的范围是[-2^63,2^63-1]。
这两个方法都是调用的sumCount()方法实现统计。1
2
3
4
5
6
7
8
9
10
11public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
HashMap、Hashtable、ConcurrentHashMap三者对比
\ | HashMap | Hashtable | ConcurrentHashMap |
---|---|---|---|
是否线程安全 | 否 | 是 | 是 |
线程安全采用的方式 | 采用synchronized 类锁,效率低 |
采用CAS + synchronized ,锁住的只有当前操作的bucket,不影响其他线程对其他bucket的操作,效率高 |
|
数据结构 | 数组+链表+红黑树(链表长度超过8则转红黑树) | 数组+链表 | 数组+链表+红黑树(链表长度超过8则转红黑树) |
是否允许null 键值 |
是 | 否 | 否 |
哈希地址算法 | (key的hashCode)^(key的hashCode无符号右移16位) | key的hashCode | ( (key的hashCode)^(key的hashCode无符号右移16位) )&0x7fffffff |
定位算法 | 哈希地址&(容量大小-1) | (哈希地址&0x7fffffff)%容量大小 | 哈希地址&(容量大小-1) |
扩容算法 | 当键值对数量大于阈值,则容量扩容到原来的2倍 | 当键值对数量大于等于阈值,则容量扩容到原来的2倍+1 | 当键值对数量大于等于sizeCtl,单线程创建新哈希表,多线程复制bucket到新哈希表,容量扩容到原来的2倍 |
链表插入 | 将新节点插入到链表尾部 | 将新节点插入到链表头部 | 将新节点插入到链表尾部 |
继承的类 | 继承abstractMap 抽象类 |
继承Dictionary 抽象类 |
继承abstractMap 抽象类 |
实现的接口 | 实现Map 接口 |
实现Map 接口 |
实现ConcurrentMap 接口 |
默认容量大小 | 16 | 11 | 16 |
默认负载因子 | 0.75 | 0.75 | 0.75 |
统计size方式 | 直接返回成员变量size |
直接返回成员变量count |
遍历CounterCell 数组的值进行累加,最后加上baseCount 的值即为size |
参考
【死磕Java并发】—–J.U.C之Java并发容器:ConcurrentHashMap
Map 大家族的那点事儿 ( 7 ) :ConcurrentHashMap
Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析
Java 8 ConcurrentHashMap源码分析