本文将深入剖析 Java7 中的 HashMap 和 ConcurrentHashMap 的源码,解析 HashMap 线程不安全的原理以及解决方案,最后以测试用例加以验证。
1 Java7 HashMap
HashMap 的数据结构:
从上图中可以看出,HashMap 底层就是一个数组结构,数组中的每一项又是一个链表。
通过查看 JDK 中的 HashMap 源码,可以看到其构造函数有一行代码:
public HashMap(int initialCapacity, float loadFactor) {
...
table = new Entry[capacity];
...
}
即创建了一个大小为 capacity 的 Entry 数组,而 Entry 的结构如下:
static class Entry<K,V> implements Map.Entry<K,V> {
final K key;
V value;
Entry<K,V> next;
final int hash;
……
}
可以看到,Entry 是一个 static class,其中包含了 key 和 value ,也就是键值对,另外还包含了一个 next 的 Entry 指针。
- capacity:当前数组容量,始终保持 2^n,可以扩容,扩容后数组大小为当前的 2 倍。默认初始容量为 16。
- loadFactor:负载因子,默认为 0.75。
- threshold:扩容的阈值,等于 capacity * loadFactor
1.1 put过程分析
public V put(K key, V value) {
// 当插入第一个元素的时候,需要先初始化数组大小
if (table == EMPTY_TABLE) {
inflateTable(threshold);
}
// 如果 key 为 null,则这个 entry 放到 table[0] 中
if (key == null)
return putForNullKey(value);
// key 的 hash 值
int hash = hash(key);
// 找到对应的数组下标
int i = indexFor(hash, table.length);
// 遍历一下对应下标处的链表,看是否有重复的 key 已经存在,
// 如果有,直接覆盖,put 方法返回旧值就结束了
for (Entry<K,V> e = table[i]; e != null; e = e.next) {
Object k;
if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}
modCount++;
// 不存在重复的 key,将此 entry 添加到链表中
addEntry(hash, key, value, i);
return null;
}
这里对一些方法做深入解析。
- 数组初始化
private void inflateTable(int toSize) {
// 保证数组大小一定是 2^n
int capacity = roundUpToPowerOf2(toSize);
// 计算扩容阈值
threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
// 初始化数组
table = new Entry[capacity];
initHashSeedAsNeeded(capacity);
}
- 找到对应的数组下标
static int indexFor(int hash, int length) {
// 作用等价于取模运算,但这种方式效率更高
return hash & (length-1);
}
因为HashMap的底层数组长度总是 2^n,当 length 为 2 的 n 次方时,hash & (length-1) 就相当于对length取模,而且速度比直接取模要快的多。
- 添加节点到链表中
void addEntry(int hash, K key, V value, int bucketIndex) {
// 如果当前 HashMap 大小已经达到了阈值,并且新值要插入的数组位置已经有元素了,那么要扩容
if ((size >= threshold) && (null != table[bucketIndex])) {
// 扩容
resize(2 * table.length);
// 重新计算 hash 值
hash = (null != key) ? hash(key) : 0;
// 计算扩容后的新的下标
bucketIndex = indexFor(hash, table.length);
}
createEntry(hash, key, value, bucketIndex);
}
// 永远都是在链表的表头添加新元素
void createEntry(int hash, K key, V value, int bucketIndex) {
// 获取指定 bucketIndex 索引处的 Entry
Entry<K,V> e = table[bucketIndex];
// 将新创建的 Entry 放入 bucketIndex 索引处,并让新的 Entry 指向原来的 Entry
table[bucketIndex] = new Entry<>(hash, key, value, e);
size++;
}
当系统决定存储 HashMap 中的 key-value 对时,完全没有考虑 Entry 中的 value,仅仅只是根据 key 来计算并决定每个 Entry 的存储位置。我们完全可以把 Map 集合中的 value 当成 key 的附属,当系统决定了 key 的存储位置之后,value 随之保存在那里即可。
- 数组扩容
随着 HashMap 中元素的数量越来越多,发生碰撞的概率将越来越大,所产生的子链长度就会越来越长,这样势必会影响 HashMap 的存取速度。为了保证 HashMap 的效率,系统必须要在某个临界点进行扩容处理,该临界点 threshold。而在 HashMap 数组扩容之后,最消耗性能的点就出现了:原数组中的数据必须重新计算其在新数组中的位置,并放进去,这就是 resize。
void resize(int newCapacity) {
Entry[] oldTable = table;
int oldCapacity = oldTable.length;
// 若 oldCapacity 已达到最大值,直接将 threshold 设为 Integer.MAX_VALUE
if (oldCapacity == MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return; // 直接返回
}
// 否则,创建一个更大的数组
Entry[] newTable = new Entry[newCapacity];
//将每条Entry重新哈希到新的数组中
transfer(newTable, initHashSeedAsNeeded(newCapacity));
table = newTable;
// 重新设定 threshold
threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}
1.2 get过程分析
public V get(Object key) {
// key 为 null 的话,会被放到 table[0],所以只要遍历下 table[0] 处的链表就可以了
if (key == null)
return getForNullKey();
// key 非 null 的情况,详见下文
Entry<K,V> entry = getEntry(key);
return null == entry ? null : entry.getValue();
}
final Entry<K,V> getEntry(Object key) {
// The number of key-value mappings contained in this map.
if (size == 0) {
return null;
}
// 根据该 key 的 hashCode 值计算它的 hash 码
int hash = (key == null) ? 0 : hash(key);
// 确定数组下标,然后从头开始遍历链表,直到找到为止
for (Entry<K,V> e = table[indexFor(hash, table.length)];
e != null;
e = e.next) {
Object k;
//若搜索的key与查找的key相同,则返回相对应的value
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
}
return null;
}
2 Java7 ConcurrentHashMap
ConcurrentHashMap 的成员变量中,包含了一个 Segment 数组 final Segment<K,V>[] segments;
,而 Segment 是ConcurrentHashMap 的内部类。
然后在 Segment 这个类中,包含了一个 HashEntry 的数组transient volatile HashEntry<K,V>[] table
,而 HashEntry 也是 ConcurrentHashMap 的内部类。
HashEntry 中,包含了 key 和 value 以及 next 指针(类似于 HashMap 中的 Entry),所以 HashEntry 可以构成一个链表。
2.1 成员变量及构造函数
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
...
//初始的容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
//初始的加载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//初始的并发等级,表示当前更新线程的估计数
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
//最小的segment数量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//最大的segment数量
static final int MAX_SEGMENTS = 1 << 16;
//
static final int RETRIES_BEFORE_LOCK = 2;
// segments 的掩码值, key 的散列码的高位用来选择具体的 segment
final int segmentMask;
// 偏移量
final int segmentShift;
final Segment<K,V>[] segments;
...
// 创建一个带有指定初始容量、加载因子和并发级别的新的空映射
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// 寻找最佳匹配参数(不小于给定参数的最接近的 2^n)
int sshift = 0; // 用来记录向左按位移动的次数
int ssize = 1; // 用来记录Segment数组的大小
// 计算并行级别 ssize,因为要保持并行级别是 2^n
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 若为默认值,concurrencyLevel 为 16,sshift 为 4
// 那么计算出 segmentShift 为 28,segmentMask 为 15
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// 记录每个 Segment 上要放置多少个元素
int c = initialCapacity / ssize;
// 假如有余数,则Segment数量加1
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
当用 new ConcurrentHashMap() 无参构造函数进行初始化的,那么初始化完成后:
- Segment 数组长度为 16,不可以扩容
- Segment[i] 的默认大小为 2,负载因子是 0.75,得出初始阈值为 1.5,也就是以后插入第一个元素不会触发扩容,插入第二个会进行第一次扩容
- 这里初始化了 segment[0],其他位置还是 null,至于为什么要初始化 segment[0],后面的代码会介绍
- 当前 segmentShift 的值为 32 – 4 = 28,segmentMask 为 16 – 1 = 15,姑且把它们简单翻译为移位数和掩码,这两个值马上就会用到
2.2 put过程分析
根据 hash 值很快就能找到相应的 Segment,之后就是 Segment 内部的 put 操作。
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
// 根据 hash 值找到 Segment 数组中的位置 j
// hash 是 32 位,无符号右移 segmentShift(28) 位,剩下低 4 位,
// 然后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的最后 4 位,也就是槽的数组下标
int j = (hash >>> segmentShift) & segmentMask;
// 刚刚说了,初始化的时候初始化了 segment[0],但是其他位置还是 null,
// ensureSegment(j) 对 segment[j] 进行初始化
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// 插入新值到 槽 s 中
return s.put(key, hash, value, false);
}
Segment 内部是由 数组+链表 组成的。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 先获取该 segment 的独占锁
// 每一个Segment进行put时,都会加锁
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
// segment 内部的数组
HashEntry<K,V>[] tab = table;
// 利用 hash 值,求应该放置的数组下标
int index = (tab.length - 1) & hash;
// 数组该位置处的链表的表头
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
// 如果链头不为 null
if (e != null) {
K k;
//如果在该链中找到相同的key,则用新值替换旧值,并退出循环
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
//如果没有和key相同的,一直遍历到链尾,链尾的next为null,进入到else
e = e.next;
}
else {
// node 到底是不是 null,这个要看获取锁的过程,不过和这里都没有关系。
// 如果不为 null,那就直接将它设置为链表表头;如果是null,初始化并设置为链表表头。
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 如果超过了该 segment 的阈值,这个 segment 需要扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
// 没有达到阈值,将 node 放到数组 tab 的 index 位置,
// 其实就是将新的节点设置成原链表的表头
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 解锁
unlock();
}
return oldValue;
}
2.3 初始化Segment
ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其他槽来说,在插入第一个值的时候进行初始化。
这里需要考虑并发,因为很可能会有多个线程同时进来初始化同一个槽 segment[k],不过只要有一个成功了就可以。
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 这里看到为什么之前要初始化 segment[0] 了,
// 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k]
// 为什么要用“当前”,因为 segment[0] 可能早就扩容过了
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
// 初始化 segment[k] 内部的数组
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck Segment[k] 是否被其它线程初始化了
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
// 使用 while 循环,内部用 CAS,当前线程成功设值或其他线程成功设值后,退出
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
2.4 get过程分析
比较简单,先找到 Segment 数组的位置,然后找到 HashEntry 数组的位置,最后顺着链表查找即可。
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
3 线程不安全
3.1 哈希碰撞
多个线程同时使用 put() 方法添加元素,若存在两个或多个 put() 的 key 发生了碰撞,那么有可能其中一个线程的数据被覆盖。
3.2 扩容
当数据要插入 HashMap 时,都会检查容量有没有超过设定的 thredhold,如果超过,则需要扩容。而多线程会导致扩容后的链表形成环形数据结构,一旦形成环形数据结构,Entry 的 next 的节点永远不为 null,就会在获取 Entry 时产生死循环。
例子可见文章《HashMap多线程死循环问题》。
不过要注意,其使用的 Java 版本既不是 7,也不是 8。在 Java7 中方法 addEntry() 添加节点到链表中是先扩容后再添加,而例子中的源码是:
void addEntry(int hash, K key, V value, int bucketIndex) {
Entry<K,V> e = table[bucketIndex];
// 先添加节点
table[bucketIndex] = new Entry<K,V>(hash, key, value, e);
// 然后扩容
if (size++ >= threshold)
resize(2 * table.length);
}
所以在 Java7 中此例子无效。而在 Java8 中,通过确保建新链与旧链的顺序是相同的,即可避免产生死循环。
4 HashMap遍历方式
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class HashMapTest {
private final static Map<Integer, Object> map = new HashMap<Integer, Object>(10000);
private static final Object PRESENT = new Object();
public static void main(String args[]) {
long startTime;
long endTime;
long totalTime;
for (int i = 0; i < 7500; i++) {
map.put(i, PRESENT);
}
// 方法一
startTime = System.nanoTime();
Iterator iter1 = map.entrySet().iterator();
while (iter1.hasNext()) {
Map.Entry<Integer, Object> entry = (Map.Entry) iter1.next();
Integer key = entry.getKey();
Object val = entry.getValue();
}
endTime = System.nanoTime();
totalTime = endTime - startTime;
System.out.println("methor1 pays " + totalTime + " ms");
// 方法二
startTime = System.nanoTime();
Iterator iter2 = map.keySet().iterator();
while (iter2.hasNext()) {
Object key = iter2.next();
Object val = map.get(key);
}
endTime = System.nanoTime();
totalTime = endTime - startTime;
System.out.println("methor2 pays " + totalTime + " ms");
}
}
运行结果:
5 性能对比
线程安全的使用 HashMap 有三种方式,分别为 Hashtable、SynchronizedMap()、ConcurrentHashMap。
Hashtable
使用 synchronized 来保证线程安全,几乎所有的 public 的方法都是 synchronized 的,而有些方法也是在内部通过 synchronized 代码块来实现。
synchronizedMap()
通过创建一个线程安全的 Map 对象,并把它作为一个封装的对象来返回。
ConcurrentHashMap
支持多线程对 Map 做读操作,并且不需要任何的 blocking 。这得益于 CHM 将 Map 分割成了不同的部分,在执行更新操作时只锁住一部分。根据默认的并发级别, Map 被分割成 16 个部分,并且由不同的锁控制。这意味着,同时最多可以有 16个 写线程操作 Map 。试想一下,由只能一个线程进入变成同时可由 16 个写线程同时进入(读线程几乎不受限制),性能的提升是显而易见的。但由于一些更新操作,如 put(), remove(), putAll(), clear()只锁住操作的部分,所以在检索操作不能保证返回的是最新的结果。
在迭代遍历 CHM 时, keySet 返回的 iterator 是弱一致和 fail-safe 的,可能不会返回某些最近的改变,并且在遍历过程中,如果已经遍历的数组上的内容变化了,不会抛出 ConcurrentModificationExceptoin 的异常。
什么时候使用 ConcurrentHashMap ?
CHM 适用于读者数量超过写者时,当写者数量大于等于读者时,CHM 的性能是低于 Hashtable 和 synchronizedMap 的。这是因为当锁住了整个 Map 时,读操作要等待对同一部分执行写操作的线程结束。
CHM 适用于做 cache ,在程序启动时初始化,之后可以被多个请求线程访问。
CHM 是Hashtable一个很好的替代,但要记住, CHM 的比 Hashrable 的同步性稍弱。
测试示例
详情见文章《HashMap Vs. ConcurrentHashMap Vs. SynchronizedMap – How a HashMap can be Synchronized in Java》
其中的测试用例使用了面向抽象编程的思想,值得借鉴!
6 拓展:Java8 HashMap & ConcurrentHashMap
Java8 对 HashMap 和 ConcurrentHashMap 做了一些修改:
- 二者均利用了红黑树,所以其数据结构由 数组 + 链表 + 红黑树 组成。我们知道,链表上的数据需要一个一个比较下去才能找到我们需要的,时间复杂度取决于链表的长度,为 O(n)。为了降低这一部分的开销,在 Java8 中,当链表中的元素超过了 8 个以后,会将链表转换为红黑树,这个时候时间复杂度就降为了 O(logN)
- Java8 中 ConcurrentHashMap 摒弃 Java7 中的 Segment 的概念,使用了另一种方式实现保证线程安全。