作者: 一字马胡
转载标志 【2017-11-03】
更新日志
日期 | 更新内容 | 备注 |
---|---|---|
2017-11-03 | 添加转载标志 | 持续更新 |
导入
ConcurrentHashMap是HashMap的线程安全版本的实现版本,关于HashMap的分析总结,可以参考文章Java 8 HashMap源码分析。本文将基于java 8中的Java 8 实现来分析ConcurrentHashMap,与之前版本的ConcurrentHashMap实现来看,java 8中做了较大调整,本文仅分析java 8的实现,java 8 之前的实现暂不做分析。为了更好的导入本文,首先展示一下ConcurrentHashMap的结构,请看下面的图片:
和HashMap类似,ConcurrentHashMap使用了一个table来存储Node,ConcurrentHashMap同样使用记录的key的hashCode来寻找记录的存储index,而处理哈希冲突的方式与HashMap也是类似的,冲突的记录将被存储在同一个位置上,形成一条链表,当链表的长度大于8的时候会将链表转化为一棵红黑树,从而将查找的复杂度从O(N)降到了O(lgN)。下文中将详细分析ConcurrentHashMap的实现,以及ConcurrentHashMap是如何保证在并发环境下的线程安全的。
ConcurrentHashMap详解
哈希桶Table初始化
首先来看一下table是如何初始化的,初始化table的工作将发生在进行put操作时,如果发现table还没有被初始化,那么就会调用方法initTable来进行table的初始化,下面展示了初始化table的具体流程代码:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
这里面有一个比较关键的地方,就是sizeCtl这个变量,下面是它的定义:
/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
*/
private transient volatile int sizeCtl;
sizeCtl是一个用于同步多个线程的共享变量,如果它的当前值为负数,则说明table正在被某个线程初始化或者扩容,所以,如果某个线程想要初始化table或者对table扩容,需要去竞争sizeCtl这个共享变量,获得变量的线程才有许可去进行接下来的操作,没能获得的线程将会一直自旋来尝试获得这个共享变量,所以获得sizeCtl这个变量的线程在完成工作之后需要设置回来,使得其他的线程可以走出自旋进行接下来的操作。而在initTable方法中我们可以看到,当线程发现sizeCtl小于0的时候,他就会让出CPU时间,而稍后再进行尝试,当发现sizeCtl不再小于0的时候,就会通过调用方法compareAndSwapInt来讲sizeCtl共享变量变为-1,以告诉其他试图获得sizeCtl变量的线程,目前正在由本线程在享用该变量,在我完成我的任务之前你可以先休息一会,等会再来试试吧,我完成工作之后会释放掉的。而其他的线程在发现sizeCtl小于0的时候就会理解这种交流,他们会让出cpu时间,等待下次调度再来尝试获取sizeCtl来进行自己的工作。在完成初始化table的任务之后,线程需要将sizeCtl设置成可以使得其他线程获得变量的状态,这其中还有一个地方需要注意,就是在某个线程通过U.compareAndSwapInt方法设置了sizeCtl之前和之后进行了两次check,来检测table是否被初始化过了,这种检测是必须的,因为在并发环境下,可能前一个线程正在初始化table但是还没有成功初始化,也就是table依然还为null,而有一个线程发现table为null他就会进行竞争sizeCtl以进行table初始化,但是当前线程在完成初始化之后,那个试图初始化table的线程获得了sizeCtl,但是此时table已经被初始化了,所以,如果没有再次判断的话,可能会将之后进行put操作的线程的更新覆盖掉,这是极为不安全的行为。
ConcurrentHashMap查询记录方法详解
上面分析了ConcurrentHashMap的table初始化,现在来看一下ConcurrentHashMap的查询操作的实现细节,在ConcurrentHashMap中查询一条记录首先需要知道这条记录存储的table的位置(可以成为卡槽,每个卡槽中都会有一个链表或者一棵红黑树),该位置上可能为null,如果为null,说明想要查询的记录还不存在于ConcurrentHashMap中,否则,就在该位置上的链表或者红黑树中查找记录,下面来详细分析一下ConcurrentHashMap的get方法的实现细节:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
首先,计算出记录的key的hashCode,然后通过使用(hashCode & (length - 1))的计算方法来获得该记录在table中的index,然后判断该位置上是否为null,如果为null,则返回null,否则,如果该位置上的第一个元素(链表头节点或者红黑树的根节点)与我们先要查找的记录匹配,则直接返回这个节点的值,否则,如果该节点的hashCode小于0,则说明该位置上是一颗红黑树,至于为什么hashCode值小于0就代表是一颗红黑树而不是链表了,这就要看下面的代码了:
static final int TREEBIN = -2; // hash for roots of trees
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
......
}
而TREEBIN的值为-2,也就是小于0成立,根据他的说明,TREEBIN想要代表的是一颗红黑树的根节点,所以在判断到table的某个位置上的第一个节点的hashCode值小于0的时候,就可以判断为该位置上是一棵红黑树,继续回到get方法,如果是红黑树,则通过调用Node的find方法来查找到节点,而这个Node的find方法在子类中被重写,所以会直接调用子类的find方法来进行查找。还有一种情况是table的index位置上为一条链表,那么就通过链表的查找方法来进行记录查找。最后需要注意的是,ConcurrentHashMap是一种线程安全的HashMap,但是我们没有发现在get方法的过程中使用任何与锁等效的组件来做线程同步,为什么呢?对于读来说,允许多个线程一起读是很正常的,而且在Node的实现上,ConcurrentHashMap做了一些手脚:
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
....
}
/**
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
*/
transient volatile Node<K,V>[] table;
我们发现table数组是被volatile关键字修饰的,这就代表我们不需要担心table数组的线程可见性问题,也就没有必要再加锁来实现并发了。
ConcurrentHashMap插入记录方法详解
上文中分析了ConcurrentHashMap的查询方法,下面来分析一下ConcurrentHashMap的插入操作时如何完成的。需要注意的一点是,在进行put操作的时候,我们可能会发现table数组还没有初始化的情况,或者发现table中容纳的记录数量超过了阈值的情况,前者我们需要进行table的初始化,而后者需要我们对table进行扩容操作。初始化table的过程我们在上文中已经进行了分析,下面只分析table的扩容操作。首先来考虑put一个记录需要的过程,第一,我们需要计算这个记录的key的hashCode,并且根据hashCode来计算它在table数组中应该存储的index,然后将他存放到对应位置里面的链表或者红黑树中去,并且在某些情况下要进行链表转换红黑树的操作,以及table扩容操作等。还有一件重要的事情就是变更table的size,这一点在后文中还要专门分析到。下面首先展示了put操作涉及的流程:
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
首先,计算记录的key的hashCode,然后计算table的index位置,然后获取该index的值,如果该位置还为null,说明该位置上还没有记录,则通过调用casTabAt方法来讲该新的记录插入到table的index位置上去,否则,通过synchronized关键字对table的index位置加锁,需要注意的是,当前线程只会锁住table的index位置,其他位置上没有锁住,所以此时其他线程可以安全的获得其他的table位置来进行操作。这也就提高了ConcurrentHashMap的并发度。然后判断table的index位置上的第一个节点的hashCode值,这个节点要么是链表的头节点,要么是红黑树的根节点,如果hashCode值小于0,那么就是一颗红黑树,至于为什么是这样,上文中已经提到,如果不小于0,那么就还是一条链表,如果是一条链表,那么就寻找是否已经有记录的key和当前想要插入的记录是一致的,如果一致,那么这次put的效果就是replace,否则,将该记录添加到链表中去。如果是一颗红黑树,那么就通过调用putTreeVal方法来进行插入操作。在插入操作完成之后,需要判断本次操作是否是更新操作,如果是更新操作,则不会造成size的变化,否则,如果本次put操作时一次添加操作,那么就需要进行更新size的操作,而size的更新涉及到并发环境,所以较为复杂,并且table的扩容操作也会在更新size的时候发生,如果在更新size之后发现table中的记录数量达到了阈值,就需要进行扩容操作,这也是较为复杂的一步。还有一点需要说明的是,ConcurrentHashMap和HashMap的区别还有一点,就是HashMap允许一个key和value为null,而ConcurrentHashMap则不允许key和value为null,如果发现key或者value为null,则会抛出NPE,这一点需要特别注意,而这也说明,在ConcurrentHashMap中可以通过使用get操作来测试是否具有某个记录,因为只要get方法返回null,就说明table中必然不存在一个记录和当前查询的匹配,而在HashMap中,get操作返回null有可能是我们查询的记录的value就是null,所以不能使用get方法来测试某个记录是否存在于table中。
ConcurrentHashMap记录数量更新
上面分析put操作的时候提到,在完成一次put操作之后,需要更新table中的记录数量,并且在更新之后如果发现超出了阈值,那么就需要进行table扩容操作,下面来具体分析一下这一过程的前后文。更新记录数量的操作通过调用方法addCount来完成,下面是该方法的细节:
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
ConcurrentHashMap维护baseCount来表示当前的记录数量,这在后面获取记录数量的size方法中会用到,而在put操作和remove操作的时候回通过调用方法addCount来更新它。如果CounterCell数组为空,则通过调用方法fullAddCount来初始化数组counterCells,因为本部分内容过于复杂,目前不适合分析,点到为止。在更新table中记录数量的时候,还要考虑一种情况,记录的数量达到了阈值,那么就需要进行扩容操作,这部分的代码也过于复杂,并且ConcurrentHashMap的扩容操作的条件貌似和HashMap是不一样的,它的说法是“如果table过小,并且没有被扩容,那么就需要进行扩容”,扩容需要使用transfer方法来将久的记录迁移到新的table中去。目前,我们需要了解的是,ConcurrentHashMap会在我们进行更新table的记录数量的时候可能进行扩容操作,而前提是“table过小,并且还没有被扩容”,这部分的代码将在未来某个适宜的时刻在进行分析总结。
ConcurrentHashMap移除记录操作
现在来分析一下ConcurrentHashMap是如何进行记录的移除操作的。下面首先展示了remove方法的调用代码:
public V remove(Object key) {
return replaceNode(key, null, null);
}
final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}
删除操作属于写类型的操作,所以在进行删除的时候需要对table中的index位置加锁,ConcurrentHashMap使用synchronized关键字将table中的index位置锁住,然后进行删除,remove方法调用了replaceNode方法来进行实际的操作,而删除操作的步骤首先依然是计算记录的hashCode,然后根据hashCode来计算table中的index值,然后根据table中的index位置上是一条链表还是一棵红黑树来使用不同的方法来删除这个记录,删除记录的操作需要进行记录数量的更新(调用addCount方法进行)。
ConcurrentHashMap的size方法详解
最后,来分析一下ConcurrentHashMap是怎么获得table中的记录数量的,ConcurrentHashMap通过size方法来获得记录数量,下面展示了size方法的细节:
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
ConcurrentHashMap的记录数量需要结合baseCount和counterCells数组来得到,通过累计两者的数量即可获得当前ConcurrentHashMap中的记录总量。
java7中的ConcurrentHashMap实现
上文中分析了java8中的ConcurrentHashMap实现,现在来分析一下java7中的实现。java7中的实现在底层使用了数组+链表的方法,下面展示了java7中ConcurrentHashMap的数据结构:
整个结构是一个segment数组,segment数组的大小决定了ConcurrentHashMap的并发度,默认是16,为什么这么说呢?是因为在java7的ConcurrentHashMap实现上,使用了所谓分段锁的方法,而所谓分段锁就是将记录分段存储,不同段的访问互相不影响,某个线程想要访问某一个段的时候就需要对该段上锁,而其他线程不能访问在有其他线程加锁的段,这和对整体加锁的方法相比是一种伟大的进步,对于java7中的具体操作分析就不在此进行了,如有需要,就在其他的文章中再进行分析总结。
本文根据jdk 8 中的源码来分析了ConcurrentHashMap的实现细节,但是因为ConcurrentHashMap的实现过于复杂,本文仅分析了非常表面的内容,分析ConcurrentHashMap的源码的意义在于可以明白为什么ConcurrentHashMap是线程安全的,以及为实现线程安全,ConcurrentHashMap相比于HashMap增加了哪些内容,本文通过分析ConcurrentHashMap的实现,比较HashMap的实现,可以明显的发现它们的实现复杂度上是不在一个等级上的,HashMap更多的是数据结构上的玩弄,而ConcurrentHashMap则更多的需要考虑如何高效的实现并发环境下的线程安全,简单来说,ConcurrentHashMap不仅实现了HashMap支持的所有功能,并且保持了和HashMap一样的高效的前提下,还实现了线程安全,分析源码可以发现ConcurrentHashMap是基于CAS来实现线程安全的,CAS是一种轻量级的锁,它不会阻塞线程,而是会等待直到获得变量,然后进行业务操作,这和锁需要阻塞线程来实现线程安全来比较,是一种很大的改良,因为基于锁的同步控制需要让线程获得锁,而获得锁之前是要阻塞等待的,它需要另外的线程来唤醒它让他再次去竞争锁,关于这部分的内容可以参考文章Java同步框架AbstractQueuedSynchronizer,AQS是java中实现锁的底层支持,也是为程序员实现线程同步的基础框架,基于AQS实现自己的线程同步器十分简便。介于ConcurrentHashMap的复杂性,本文就当做是对ConcurrentHashMap分析的开篇吧,未来会不定时对ConcurrentHashMap的分析总结进行补充。