概述
ConcurrentHashMap,通过这个名字,可以知道Concurrent是并发的,HashMap是我们常用的一种用来存放键值对的数据结构,所以ConcurrentHashMap就是一种用来解决高并发的HashMap。这个是JDK在1.5之后提供的一种数据结构。我们知道Java中其实已经有了HashTable这个线程安全的Map,但是他是通过对整个table使用synchronized来保证线程安全的,当有多个线程对HashTable进行读写的时候,每次只能有一个线程会获取到HashTable的对象锁,别的只能处于等待状态,所以在高并发的情况下,它的效率是比较低的。
当然JDK还提供了另外一种方式,那就是Collections.synchronizedMap(hashMap),这个底层也是通过synchronized对要高并发的数据结构来实现线程安全的。
通过上面的分析可以知道,ConcurrentHashMap为了解决synchronized在高并发时的低效问题,ConcurrentHashMap之所以比HashTable高效,是因为他并不是对整个HashMap上锁,而且在JDK1.8跟之前的JDK1.6,1.7的实现都不一样,下面进行对比一下
- JDK 1.6:ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。
- JDK 1.8:放弃了Segment的概念,而是直接用Node数组+链表+红黑树的数据结构来实现,并发控制使用Synchronized和CAS来操作,整个看起来就像是优化过且线程安全的HashMap。
继承关系
通过继承关系,可以发现,ConcurrentHashMap并没有继承HashMap,而是单独实现了ConcurrentMap这个接口,其余的跟HashMap是一样的
成员变量
//table的最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
//table的默认容量16
private static final int DEFAULT_CAPACITY = 16;
//默认的并发度为16
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//负载因子
private static final float LOAD_FACTOR = 0.75f;
//链表转化为红黑树的阈值
static final int TREEIFY_THRESHOLD = 8;
//红黑树转链表的阈值
static final int UNTREEIFY_THRESHOLD = 6;
//红黑树的最小容量
static final int MIN_TREEIFY_CAPACITY = 64;
//存放Node的数组
transient volatile Node<K,V>[] table;
private transient volatile Node<K,V>[] nextTable;
private transient volatile long baseCount;
//sizeCtl作为table是否在初始化或者在resize的标志,如果为负数说明正在初始化或者扩容,需要table进行CRUD操作的线程只能等待,稍后进行重试,否则就可以直接进行CRUD操作
//正数则表明table没有初始化
private transient volatile int sizeCtl;
//该节点是否已经移动到新数组中去
static final int MOVED = -1;
//是否是TREEBIN节点
static final int TREEBIN = -2;
除此之外,还有几个类需要解释一下
基本原理
TreeNode
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; //根节点
TreeNode<K,V> left;//左节点
TreeNode<K,V> right;//右节点
TreeNode<K,V> prev; //上个节点的指针
boolean red;//默认节点为黑
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
//查找相应的树节点
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
//此处省略一万行代码......
}
跟一般的树节点都差不多,在前面分析二叉树的时候也差不多是这种数据结构,所以还是比较好理解的,注释说Nodes for use in TreeBins,也就是说这个TreeNode是用来服务TreeBin的,那么继续看Treebins
TreeBins
这个通过注释可以知道,是用来包装TreeNode的,然后作为红黑树的树节点
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;//TreeNode的根节点
volatile Thread waiter;//waiter线程
volatile int lockState;//当前节点的锁状态
// values for lockState
static final int WRITER = 1; // 获得写数据的锁状态
static final int WAITER = 2; // 等待写数据的锁状态
static final int READER = 4; // 添加数据时的锁状态
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;//头结点赋值
TreeNode<K,V> r = null;
//递归遍历传过来的Node,从根节点开始
for (TreeNode<K,V> x = b, next; x != null; x = next)
{
next = (TreeNode<K,V>)x.next;//拿到下一个节点
x.left = x.right = null;//将左右节点置空,因为需要重新赋值
//判断r节点是否为空
if (r == null) {
x.parent = null;//根节点置空
x.red = false;//黑子树
r = x;//x赋值为r节点
}
else {
K k = x.key;//拿到key
int h = x.hash;//计算hash值
Class<?> kc = null;//K对应的泛型实体类
//从根节点r开始遍历
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;//将r赋值给根节点
assert checkInvariants(root);
}
}
ForwardingNode
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}
ForwardingNode有两个功能,一个是用来指向nextTable节点,便于将原有的table数组复制到新的table中,另外一个是用来占位空节点,告诉其他节点这个节点已经被处理过了,自动遍历后面的节点。
CAS操作
tabAt
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
获得在i索引位置上的Node节点
casTabAt
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
- 1.利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少
- 2.在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改
setTabAt
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
利用volatile方法设置节点位置的值
核心方法
构造方法
之前分析过HashMap的源码,所以看这些还是比较轻松的,ConcurrentHashMap采用的是懒加载,也就是存入第一对KeyValue的时候才会去初始化table,所以前面四个构造方法只是做了一些参数配置,只有最后一个构造方法,才会去真正的初始化,下面重点看一下最后一个方法
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
public void putAll(Map<? extends K, ? extends V> m) {
tryPresize(m.size());
for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
putVal(e.getKey(), e.getValue(), false);
}
最后调用了putAll方法,这个会在后面的put方法里面再重点讲解。
transfer方法
transfer实际上执行的就是table数组的扩容操作,
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//创建一个容量是原来2倍的table数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
//创建一个ForwardingNode,指向新table,并且用于填充已经移动的Node位置
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//如果所有的节点都已经完成复制工作
//就把nextTable赋值给table 清空临时对象nextTable
nextTable = null;
table = nextTab;
// 扩容至现在容量的0.75倍
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
//用ForwardingNode来填充Null节点
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
//如果当前节点移动过,将次标志设置为true
//这样后续线程访问便可以直接跳过此节点
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
//普通Node节点
int runBit = fh & n;//通过runBit将链表打散,便于高效遍历
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
//TreeBin节点,
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
//通过h & n的值将TreeBin的节点进行打散
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果扩容后已经不再需要tree的结构 反向转换为链表结构
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true 返回到上面的while循环中 就可以执行i--操作
advance = true;
}
}
}
}
}
}
- 1.根据当前数组长度n,新建一个两倍长度的数组nextTable;
- 2.初始化ForwardingNode节点,其中保存了新数组nextTable的引用,在处理完每个槽位的节点之后当做占位节点,表示该槽位已经处理过了
- 3.通过for自循环处理每个槽位中的链表元素,默认advace为真,通过CAS设置transferIndex属性值
- 4.如果槽位中没有节点,则通过CAS插入在第二步中初始化的ForwardingNode节点,用于告诉其它线程该槽位已经处理过了
- 5.如果当前Node已经被线程A处理了,那么线程B处理到这个节点时,取到该节点的hash值应该为MOVED,值为-1,则直接跳过,继续处理下一个Node
- 6.处理不为null的节点,如果是链表结构,先定义两个变量节点ln和hn,通过CAS把ln链表设置到新数组的i位置,hn链表设置到i+n的位置
- 7.如果该槽位是红黑树结构,同样将节点分成两类,通过CAS插入到nextTable中
helpTransfer
如果一个线程在进行扩容的时候中断了,但是此时nextTable已经创建了,那么他就会调用helpTransfer方法,帮助扩容
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
这是一个协助扩容的方法。这个方法被调用的时候,当前ConcurrentHashMap一定已经有了nextTable对象,首先拿到这个nextTable对象,调用上面讲到的transfer方法来进行扩容
put方法
首先调用了put方法
public V put(K key, V value) {
return putVal(key, value, false);
}
紧接着调用了putVal方法,下面来分析下一年putVal方法
final V putVal(K key, V value, boolean onlyIfAbsent) {
//key和value都不能为空
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());//计算hash值
int binCount = 0;//链表长度
//开启死循环
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//table为null或者table的长度为0,初始化table
tab = initTable();
//根据hash值计算table的索引
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果索引处为null,则直接在此处进行插入
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
//通过CAS进行插入,插入成功之后跳出循环
break;
}
else if ((fh = f.hash) == MOVED)
//当遇到表ForwardNode时,需要对表进行整合
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//采用同步代码块进行上锁
synchronized (f) {
if (tabAt(tab, i) == f) {
//普通节点
if (fh >= 0) {
binCount = 1;
//遍历所有节点
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果hash值跟key值都相同,则进行值替换
if (e.hash == hash && ((ek = e.key) == key
||(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null)
{
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
//树节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//判断链表长度,如果链表长度大于阈值,进行树节点转换
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);//改变table中的元素的个数
return null;
}
下面分解一下步骤:
- 1.计算记录的key的hashCode,然后通过hashcode计算table的index位置
- 2.获取table[index],为null直接采用casTabAt插入一条新的记录
- 3.不为null,则锁住根节点,然后通过节点的hashcode值来区分是链表还是红黑树
- 4.遍历链表或者红黑树,找到相应的节点,进行更新操作
- 5.如果是更新操作,则不需要进行transfer操作,否则判断一下更新之后的size有没有超过阈值,主要是判断链表的节点数跟table的数组长度
如何保证线程安全
当对table中的某个节点进行写操作的时候,如果为null,直接进行插入,注意如果不为null,则用syncronized关键字锁住当前节点,然后进行写操作。为什么为空的时候插入不需要上锁,因为ConcurrentHashMap采用的是CAS的乐观锁机制,如果有线程在进行修改,就会在外面等待,然后过一会儿再过来查询一下,这样可以保证高效并发。
get方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());//计算hash值
//根据hash值定位元素
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
//如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
//树节点
return (p = e.find(h, key)) != null ? p.val : null;
//通过寻址法进行寻找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
- 1.通过计算key的hash值进而计算key在table中的index
- 2.拿到table[index],为null,直接返回null
- 3.如果table[index]不为null,通过Node的hash值来判断是链表还是红黑树
- 4.遍历到相应的value之后直接返回,否则返回null
如何保证线程安全
首先当多个线程同时读取的时候,ConcurrentHashMap给每个Node节点加了volatile关键字修饰来保证可见性。
总结
看了很多数据结构,感觉ConcurrentHashMap是最复杂的,里面涉及到了很多的CAS操作,以及硬件层的线程安全操作,分析了好几天,才弄懂,下面总结一下c的JDK1.7跟1.8的区别。
JDK 1.7
JDK1.8
对比分析
- 1.7采用链表,1.8使用红黑树来替换之前的链表,提高了查找的效率。
- 1.7版本锁的粒度是基于Segment的,采用的是分段锁,粒度是每个数组1.8的实现降低锁的粒度到每个Node节点,
- 1.7采用的Segment继承自ReentrantLock,自带锁,1.8的数据结构采用数组+链表+红黑树,使用了大量 的CAS操作,提升了高并发时的性能,丢弃了分段锁的概念。
- 1.7采用的是ReentrantLock进行同步,1.8 采用了synchronized进行同步,其实一开始没理解,因为ReentrantLock比synchronized更加灵活,后来思考了一下觉得是因为锁的粒度进一步降低,在粗粒度加锁中ReentrantLock可能通过Condition来控制各个低粒度的边界,更加的灵活,而在低粒度中,Condition的优势不大。
参考资料
http://www.importnew.com/22007.html
http://www.jianshu.com/p/f6730d5784ad
http://blog.csdn.net/mine_song/article/details/70140596