一、原理介绍
关于LongAdder这个类可能很多朋友不太熟悉,我简单的对LongAdder介绍一下。
LongAdder被设计出来是为了用于高并发下的自增,说到自增操作,为了保证线程安全,可能很多朋友用过AtomicInteger或者AtomicLong这两个类,那为什么不用这两个类来完成自增而要用LongAdder呢?这是因为AtomicInteger和AtomicLong的底层都是直接使用CAS来修改类中的value值,假如有N个线程同时修改,每次只能有一个线程修改成功,另外N-1个线程会修改失败然后进行自旋,问题的关键就在这里,自旋是需要消耗CPU时间片的,所以当并发量较大时,自旋所占用CPU的时间就会很长。
为了解决AtomicInteger和AtomicLong的缺点才有了LongAdder类。LongAdder的思想是化整为零,如果在没有线程竞争的情况下进行自增,那么与AtomicInteger和AtomicLong没有什么区别,直接累加即可。如果在高并发的环境中,LongAdder会创建一个cell数组,给每个线程分配一个桶位,一个线程对应一个cell,让每个线程在各自的桶位进行累加,最后把cell数组中的所有值加起来就是最终结果。
二、Striped64类
进入LongAdder源码中可以看见LongAdder继承了Striped64,先来看看Striped64中的核心属性。
Cell是一个内部类,类中有value值和cas修改value的方法,关于@sun.misc.Contended注解可以看一下https://www.cnblogs.com/eycuii/p/11525164.html。
@sun.misc.Contended
static final class Cell { // 这里的cell就是用来让每个线程来累加的对象
// 实际累加的变量
volatile long value;
Cell(long x) { value = x; }
// 封装cas操作来修改value
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
private static final sun.misc.Unsafe UNSAFE;
// value在内存中的偏移量
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/**
* 获得当前物理机的CPU数量,会被用于限制cell[]的长度
*/
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* cell数组,数组长度一定为2的整数次幂
*/
transient volatile Cell[] cells;
/**
* base主要在没有争用时使用,通过 CAS 更新
*/
transient volatile long base;
/**
* cellBusy就是一把锁,需要拿到锁才能进行初始化或者扩容
* 0表示为加锁,1表示已加锁
*/
transient volatile int cellsBusy;
// cas修改base
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
// 加锁,初始化或者扩容时锁住cell[]
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
// 获得线程的探针hash值
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
// 设置线程的探针hash值,其实就是rehash操作
static final int advanceProbe(int probe) {
probe ^= probe << 13;
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
三、add()
add方法是LongAdder的最基本方法,其他的方法均以add来展开,我会逐行分析,请看我的注释。
/**
* 进行累加
* x:需要累加的值
*/
public void add(long x) {
/**
* as:cell[],用于多线程的累加
* b:base值
* v:cell进行cas操作时的期望值
* m:cell[]长度 - 1
* a:当前线程对应的cell
*/
Cell[] as; long b, v; int m; Cell a;
/**
* case1:cell[]非空(产生竞争时才会初始化,非空表示已经初始化了),
* 如果已初始化,那么线程需要把x累加到对应的cell中。
* case2:前置条件:cell[]未初始化,casBase是Striped64封装的cas操作修改base的一个方法,
* case1不满足说明之前未产生线程竞争,这里直接尝试修改base,若修改成功说明没有竞争,
* 若失败说明修改base产生了竞争,需要创建cell[]进入数组累加。
*/
if ((as = cells) != null || !casBase(b = base, b + x)) {
// true表示产生竞争,false表示产生了竞争
boolean uncontended = true;
/**
* case1:cell[]是否初始化,true表示未初始化,直接进入longAccumulate()
* case2:cell[]长度是否为0,防止某些异常情况出现cell[]不为null但是长度为0
* case3:线程桶位中cell对象是否被初始化,getProbe()表示获取线程的探测hash值,
* 可以理解为hashCode一样的东西,hash & m的作用就是计算线程对应的桶位
* (这个原理在HashMap中讲过),true表示cell未初始化。
* case4:前置条件:cell[]和线程对应的cell对象已初始化。尝试修改cell中的value,
* 若修改成功说明没有竞争,若修改失败说明修改value产生了竞争。
*/
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// 进入的条件:cell[]未初始化 ||
// 线程对应的cell未初始化 ||
// 修改value失败(有竞争)
longAccumulate(x, null, uncontended);
}
}
四、longAccumulate()
longAccumulate()方法中非常复杂,有大量的if/else,在看的时候一定要慢慢捋清楚,最好配合IDEA把if/else块缩起来一个一个看,阅读if/else内部逻辑时需要明白进入这个if/else块的条件和前置条件,我大致列举一下逻辑顺序:
- cell[]已初始化,线程对应的cell未初始化
- 创建cell对象
- 尝试加锁,加锁成功 -> 关联cell对象至桶位,加锁失败 -> 进行rehash -> 自旋
- cell[]已初始化,线程对应的cell已初始化
- 判断是否有竞争,有 -> rehash之后自旋 -> cas修改对应的cell,无 -> cas修改对应的cell
- 修改成功 -> 退出,修改失败 -> 扩容(因为失败的次数太多,所以考虑扩容)
- 判断当前cell[]长度是否大于等于 CPU核心数 && 检查当前是否有其他线程已经完成了扩容操作
- 是 -> 本次循环无法扩容 -> rehash -> 自旋,否 -> 修改扩容意向为true -> 自旋(扩容之前再次尝试修改cell,修改失败则扩容)
- 扩容结束 -> 自旋
- cell[]未初始化
- 直接进行初始化(需要加锁以及检查是否已被其他线程初始化)
整体上来看是按照这三个大块的逻辑来走,其中内部又会有很多细节,比如扩容和初始化需要double check,以及扩容前的重新自旋一次等等。代码的逻辑设计的很巧妙,也很严谨,一定要慢慢读懂。
/**
* 进入的前置条件: cell[]未初始化 || 线程对应的cell未初始化 || 修改value失败(有竞争)
* x:需要累加的值
* fn:函数式接口,可以对方法进行扩展,这里没有用上不用管
* wasUncontended:是否产生竞争,true:没有竞争,false:有竞争
* 根据上面的分析,只有修改value失败时为false
*/
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 当前线程探针hash
int h;
// h为0表示线程的探针hash为初始化,则需要先进行初始化。
if ((h = getProbe()) == 0) {
// 初始化线程中的探针hash
ThreadLocalRandom.current();
h = getProbe();
/**
* 当getProbe()为0的时候,刚刚在a = as[getProbe() & m]会计算出as[0],
* 所以0号桶位产生的竞争是因为线程未初始化探针hash导致在0号桶位产生竞争,
* 正常情况下线程可能对应别的桶位,所以不是真正的竞争,这里才会重新设置true
* 表示没有发送竞争。
*/
wasUncontended = true;
}
// collide表示扩容意向,true:可能扩容,false:一定不会扩容
boolean collide = false;
// 自旋
for (;;) {
/**
* as:当前的cell[]
* a:当前线程对应的cell
* n:cell[]长度
* v:cas修改时的期望值
*/
Cell[] as; Cell a; int n; long v;
// 判断cell[]是否初始化
if ((as = cells) != null && (n = as.length) > 0) {
// 前置条件:cell[]已经初始化
// 拿到当前线程对应的cell判断是否初始化
if ((a = as[(n - 1) & h]) == null) {
// 前置条件:当前线程对应的cell未初始化
// 判断当前是否加锁
if (cellsBusy == 0) {
// 前置条件:未加锁
// 创建线程对应的cell对象并赋值,PS:这里还没有关联到具体的桶位
Cell r = new Cell(x);
/**
* 需要获取到锁才能进行初始化
* case1:判断是否加锁(防止有其他线程在这个时候加锁了)
* case2:尝试加锁
*/
if (cellsBusy == 0 && casCellsBusy()) {
// 前置条件:当前线程加锁成功
// 用于表示是否初始化完毕,true:完成,false未完成
boolean created = false;
// 进行初始化操作
try {
/**
* rs:当前cell[]
* m:cell[]长度
* j:线程对应桶位下标
*/
Cell[] rs; int m, j;
/**
* case1、case2:判断cell[]是否初始化
* case3:判断线程对应的cell是否初始化
* 这里之所以再次判断,是因为可能存在线程安全问题,
* 例如线程1执行刚完初始化,线程2刚好执行到这个if语句,
* 如果线程1和线程2的对应的桶位相同,则线程2会覆盖线程1
* 创建的cell对象。
*/
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 让桶位指向刚刚创建的cell
rs[j] = r;
// 表示创建成功
created = true;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 创建成功,退出自旋
if (created)
break;
// 未创建成功则再次自旋
continue;
}
}
// 前置条件:线程对应cell未初始化 && 未获得锁
// 因为有其他线程锁住了cell[],所以无法创建cell对象
// 在下面进行rehash然后自旋,
// 这样有可能重新分配到一个已经初始化的cell对象
collide = false;
}
// 前置条件:cell[]已初始化 && 线程对应的cell已初始化
// cas竞争失败走到这里
else if (!wasUncontended)
// 设置为非竞争状态,在下面进行rehash然后自旋
wasUncontended = true;
// 前置条件:cell[]已初始化 && 线程对应的cell已初始化
// cas修改线程对应的cell,修改成功直接退出,失败则继续自旋
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// case1:数组长度大于等于CPU核心数
// case2:cell[]已经发生扩容,不再是之前的对象了
else if (n >= NCPU || cells != as)
// 当case1或case2为true进入
// case1表示n >= NCPU,则一定不会发生扩容
// case2表示cells刚刚完成了一次扩容,所以设置为false
collide = false;
// 下面是扩容逻辑,所以这里需要判断是否能够扩容
// 当不能扩容时在下面进行rehash然后自旋
else if (!collide)
collide = true;
// 判断是否加锁并且尝试获得锁
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 再次检查,防止进入之前有其他线程进行了扩容导致cells和as不同
if (cells == as) {
// 创建新的数组, n << 1等效于n*2
Cell[] rs = new Cell[n << 1];
// 迁移元素到新数组
for (int i = 0; i < n; ++i)
rs[i] = as[i];
// 把成员变量cells指向新的数组
// 其实在上面的几次判断cells == as就是因为这里
cells = rs;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 扩容完成之后设置为false
collide = false;
// 重新走一遍循环,
continue;
}
// rehash操作,重新再分配一次hash和桶位
// 能够进行rehash的情况:
// 产生竞争(wasUncontended == false) ||
h = advanceProbe(h);
}
// 前置条件:cell[]未初始化
// 判断是否加锁以及确定as和cells所指向同一个对象
// 获得锁后执行初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 是否初始化完成
boolean init = false;
try {
// 再次检查之后开始初始化cell[]和线程对应的cell
if (cells == as) {
// 初始化cell[]长度为2
Cell[] rs = new Cell[2];
// 初始化线程对应的cell并赋值x
rs[h & 1] = new Cell(x);
// 让cells指向新数组
cells = rs;
// 初始化完成
init = true;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 初始化完成后退出自旋
if (init)
break;
}
// 前置条件:cell[]未初始化 && (未获取到锁 || cells != as)
// 线程走到这里大概率是因为有其他线程正在初始化cell[]或者已经初始化完成
// 所以这里直接对base尝试累加
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
}
}