目录
源码解析
- JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是HashEntry(首节点)
- JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为已经使用synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,由于粒度的降低,实现的复杂度也增加了
是否允许空, 是否允许重复数据, 是否有序, 是否线程安全
关 注 点 | 结论 |
---|---|
是否允许空 | put时key, value不允许为空 |
是否允许重复数据 | 重复时会更新数据 |
是否有序 | 无序 |
是否线程安全 | 线程安全 |
value的Comparator 排序比较
public static void main(String[] args) {
Map<String, String> map = new ConcurrentHashMap<>();
map.put("a", "dd");
map.put("c", "bb");
map.put("d", "aa");
map.put("b", "cc");
List<Map.Entry<String, String>> list = new ArrayList<>(map.entrySet());
Collections.sort(list, (Map.Entry<String, String> o1,
Map.Entry<String, String> o2) -> o1.getValue().compareTo(o2.getValue()));
list.stream().forEach((mapping) -> System.out.println(mapping.getKey() + ":" + mapping.getValue()));
}
初始化
- 调用默认构造函数时创建一个带有默认初始容量 (16)、加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射。
- 调用指定大小的构造方法会调用tableSizeFor指定大小Node数组大小,最后会是大于c的最小的2的幂。默认加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射。
// 要求获得的是大于c的最小的2的幂 也就是获取某个位为1并且左边右边都是1
private static final int tableSizeFor(int c) {
// 那11举例子
int n = c - 1;
// 1010 |= 0101 此时为1111已经符合要求了
// 1111 |= 0011 还是1111
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
// 此时n值为左边都是0 右边都是1 +1之后就变成 10000...多个0就是2的n次幂
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
- 契机,第一次put的时候会进行初始化
/**
* 初始化数组table,sizeCtl可以在初始化数组时指定
* 如果volatile sizeCtl小于0,说明别的数组正在进行初始化,则让出执行权
* 如果sizeCtl大于0的话,则初始化一个大小为sizeCtl的数组
* 否则的话初始化一个默认大小(16)的数组
* 然后设置sizeCtl的值为数组长度的3/4
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// sizeCtl小于0,则进行线程让步等待
if ((sc = sizeCtl) < 0)
Thread.yield();
// cas比较sizeCtl的值与sc是否相等,相等则用-1替换
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 双重检测机制,类似单例模式的写法
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY; /
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//初始化后,sizeCtl长度为数组长度的3/4
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
get
- get操作全程不需要加锁是因为Node的成员val是用volatile修饰的和数组用volatile修饰没有关系。数组用volatile修饰(保证指向的地址改变了,其他线程能即使获取到)主要是保证在数组扩容的时候保证可见性。
/*
* 相比put方法,get就很单纯了,支持并发操作,
* 当key为null的时候回抛出NullPointerException的异常
* get操作通过首先计算key的hash值来确定该元素放在数组的哪个位置
* 然后遍历该位置的所有节点
* 如果不存在的话返回null
*/
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算key的hash值
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) { // 表不为空并且表的长度大于0并且key所在的桶不为空
if ((eh = e.hash) == h) { // 表中的元素的hash值与key的hash值相等
if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 键相等
// 返回值
return e.val;
}
else if (eh < 0) // 结点hash值小于0, 可能在扩容
// 在桶(链表/红黑树)中查找
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) { // 对于结点hash值大于0的情况
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
put
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 键或值为空,抛出异常
if (key == null || value == null) throw new NullPointerException();
// 键的hash值经过计算获得hash值
int hash = spread(key.hashCode());
// 用来计算在这个节点总共有多少个元素,用来控制扩容或者转移为树
int binCount = 0;
// 无限循环, 如果多个线程走初始化流程,一个线程初始化完之后,会走第二个for循环。初始化线程初始化完后走第二个for循环
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 表为空或者表的长度为0
if (tab == null || (n = tab.length) == 0)
// 初始化表
tab = initTable();
// 表不为空并且表的长度大于0,并且该桶为空 (n - 1) & hash是size要为2 n次方原因,tabAt是获取volatile的table
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 比较并且交换值CAS,如tab的第i项为空则用新生成的node替换
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
// 该结点的hash值为MOVED
else if ((fh = f.hash) == MOVED)
// 帮助结点的扩容(在扩容的过程中)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 加锁同步
synchronized (f) {
// 找到table表下标为i的节点
if (tabAt(tab, i) == f) {
// 该table表中该结点的hash值大于0
if (fh >= 0) {
// binCount赋值为1
binCount = 1;
// 无限循环
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { // 结点的hash值相等并且key也相等
// 保存该结点的val值
oldVal = e.val;
if (!onlyIfAbsent) // 进行判断
// 将指定的value保存至结点,即进行了结点值的更新
e.val = value;
break;
}
// 保存当前结点
Node<K,V> pred = e;
if ((e = e.next) == null) { // 当前结点的下一个结点为空,即为最后一个结点
// 新生一个结点并且赋值给next域
pred.next = new Node<K,V>(hash, key,
value, null);
// 退出循环
break;
}
}
}
else if (f instanceof TreeBin) { // 结点为红黑树结点类型
Node<K,V> p;
// binCount赋值为2
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) { // 将hash、key、value放入红黑树
// 保存结点的val
oldVal = p.val;
if (!onlyIfAbsent) // 判断
// 赋值结点value值
p.val = value;
}
}
}
}
if (binCount != 0) { // binCount不为0
if (binCount >= TREEIFY_THRESHOLD) // 如果binCount大于等于转化为红黑树的阈值
// 进行转化
treeifyBin(tab, i);
if (oldVal != null) // 旧值不为空
// 返回旧值
return oldVal;
break;
}
}
}
// 增加binCount的数量
addCount(1L, binCount);
return null;
}
- 总结步骤
- 判断存储的key、value是否为空,若为空,则抛出异常,否则,进入步骤②
- 计算key的hash值,随后进入无限循环,该无限循环可以确保成功插入数据,若table表为空或者长度为0,则初始化table表,否则,进入步骤③
- 根据key的hash值取出table表中的结点元素,若取出的结点为空(该桶为空),则使用CAS将key、value、hash值生成的结点放入桶中。否则,进入步骤4. 若该结点的的hash值为MOVED,则对该桶中的结点进行转移,否则,进入步骤⑤
- 对桶中的第一个结点(即table表中的结点)进行加锁,对该桶进行遍历,桶中的结点的hash值与key值与给定的hash值和key值相等,则根据标识选择是否进行更新操作(用给定的value值替换该结点的value值),若遍历完桶仍没有找到hash值与key值和指定的hash值与key值相等的结点,则直接新生一个结点并赋值为之前最后一个结点的下一个结点。进入步骤⑥
- 若binCount值达到红黑树转化的阈值,则将桶中的结构转化为红黑树存储,最后,增加binCount的值。
put时helperTransfer
- 此函数用于在扩容时将table表中的结点转移到nextTable中。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// table表不为空并且结点类型是ForwardingNode类型,并且结点的nextTable不为空
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
// 条件判断
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0) //
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 将table的结点转移到nextTab中
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
扩容
- 扩容涉及的点比较多,所以这里分多个步骤说明扩容的原理
扩容时相关参数
nextTable: 扩容期间,将table数组中的元素 迁移到 nextTable, 每次扩容2倍
-
sizeCtl含义
transferIndex: 扩容索引,表示已经分配给扩容线程的table数组索引位置。主要用来协调多个线程,并发安全地获取迁移任务 。transferIndex是通过cas设置的
//cas无锁算法设置 transferIndex = transferIndex - stride
if (U.compareAndSwapInt(this, TRANSFERINDEX,
nextIndex,nextBound = (nextIndex > stride
? nextIndex - stride : 0))) {
}
- ForwardingNode节点: 标记作用,表示其他线程正在扩容,并且此节点已经扩容完毕。关联了nextTable,扩容期间可以通过find方法,访问已经迁移到了nextTable中的数据。
扩容时机
- 当前容量超过阈值0.75n(n 为 length)
- 当链表中元素个数超过默认设定(8个),当数组的大小还未超过64的时候,此时进行数组的扩容,如果超过则将链表转化成红黑树
- 当发现其他线程扩容时,帮其扩容
扩容原理分析(以长度为32分析)
-
线程执行put操作,发现容量已经达到扩容阈值,需要进行扩容操作,此时transferindex=tab.length=32
-
扩容线程A 以cas的方式修改transferindex=32-16=16 ,然后按照降序迁移table[31]--table[16]这个区间的hash桶
迁移hash桶时,会将桶内的链表或者红黑树,按照一定算法,拆分成2份,将其插入nextTable[i]和nextTable[i+n](n是table数组的长度)。 迁移完毕的hash桶,会被设置成ForwardingNode节点,以此告知访问此桶的其他线程,此节点已经迁移完毕
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
...//省略无关代码 同步方法迁移
synchronized (f) {
//将node链表,分成2个新的node链表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//将新node链表赋给nextTab
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
}
...//省略无关代码
}
此时线程2访问到了ForwardingNode节点,如果线程2执行的put或remove等写操作,那么就会先帮其扩容。如果线程2执行的是get等读方法,则会调用ForwardingNode的find方法,去nextTable里面查找相关元素。
-
线程2加入扩容,线程2会自旋等待线程1处理完毕之前的桶,负责15-0的桶迁移
如果准备加入扩容的线程,发现以下情况,放弃扩容,直接返回。
- 发现transferIndex=0,即所有node均已分配
- 发现扩容线程已经达到最大扩容线程数
tranfer源码解析
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//计算需要迁移多少个hash桶(MIN_TRANSFER_STRIDE该值作为下限,以避免扩容线程过多)
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
//扩容一倍
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//该标识用于控制是否继续处理下一个桶,为 true 则表示已经处理完当前桶,可以继续迁移下一个桶的数据
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
//1 逆序迁移已经获取到的hash桶集合,如果迁移完毕,则更新transferIndex,获取下一批待迁移的hash桶
//2 如果transferIndex=0,表示所以hash桶均被分配,将i置为-1,准备退出transfer方法
//3 如果是helperTransfer,则自旋等待上一个线程处理完毕
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//更新待迁移的hash桶索引
while (advance) {
int nextIndex, nextBound;
//更新迁移索引i。
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
//transferIndex<=0表示已经没有需要迁移的hash桶,将i置为-1,线程准备退出
i = -1;
advance = false;
}
//当迁移完bound这个桶后,尝试更新transferIndex,,获取下一批待迁移的hash桶
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//退出transfer
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//最后一个迁移的线程,recheck后,做收尾工作,然后退出
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
/**
第一个扩容的线程,执行transfer方法之前,会设置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
后续帮其扩容的线程,执行transfer方法之前,会设置 sizeCtl = sizeCtl+1
每一个退出transfer的方法的线程,退出之前,会设置 sizeCtl = sizeCtl-1
那么最后一个线程退出时:
必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT
*/
//不相等,说明不到最后一个线程,直接退出transfer方法
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
//最后退出的线程要重新check下是否全部迁移完毕
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
//迁移node节点
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//链表迁移
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
//将node链表,分成2个新的node链表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//将新node链表赋给nextTab
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//红黑树迁移
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
- 多线程无锁扩容的关键就是通过CAS设置sizeCtl与transferIndex变量,协调多个线程对table数组中的node进行迁移。
获取size
- 在 JDK1.7 中,第一种方案他会使用不加锁的模式去尝试多次计算 ConcurrentHashMap 的 size,最多三次,比较前后两次计算的结果,结果一致就认为当前没有元素加入,计算的结果是准确的。 第二种方案是如果第一种方案不符合,他就会给每个 Segment 加上锁,然后计算 ConcurrentHashMap 的 size 返回。
- jdk 1.8中size, mappingCount都是计算size,但是mappingCount返回long计算的限制更小一些,不怕超过int的长度, 但是核心都是sumCount函数的调用
- 看下sumCount代码
/**
* 代码比较简单,就是在baseCount的基础上,循环counterCells累加
* 在调用addCount时,如果 counterCells == null, 则对 baseCount 做 CAS 自增操作
* 如果并发导致 baseCount CAS 失败了使用 counterCells
* 如果counterCells CAS 失败了,在 fullAddCount 方法中,会继续死循环操作,直到成功
**/
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}