前言
在阿里巴巴Java开发手册中有如下参考,在JDK8中推荐使用LongAdder替代AtomicLong,故通过本文记录LongAdder类实现原理。
本文主要参考https://www.cnblogs.com/wang-meng/p/12892695.html
AtomicLong
AtomicLong是juc包下的原子类,对数据进行原子操作来保证并发情况下数据的安全性,主要原理是通过CAS+自旋,避免了使用synchronize进行锁操作。不过在高并发环境下会出现大量失败并不断自旋,导致成为性能瓶颈。
LongAdder
LongAdder结构
如上图所示,LongAdder类继承自Striped64,LongAdder的真实值是base的值与Cell数组中的原始的value累加;cellsBusy表示初始化cells或者扩容cells需要获取锁, 0:表示无锁状态 1:表示其他线程已经持有了锁。
LongAdder设计思想
LongAdder的设计思想就是分散热点,上文中提到AtomicLong的性能瓶颈在于大量自旋,将value值的新增操作分散到一个数组中(Cell数组,惰性加载),各个线程对不同的value值进行CAS操作,就可以减小自旋失败概率。流程图如下所示:
LongAdder源码分析
LongAdder流程主要在于添加操作,故分析添加操作源码如下所示
public void increment() {
add(1L);
}
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
添加操作流程主要在longAccumulate
函数中,longAccumulate
源码如下所示:
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
// cells数组未初始化
// case1
// ...
// ...
// ...
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// cells数组已初始化,但是当前线程对应的cell数据为空
// case2
// ...
// ...
// ...
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
// cells数组已经初始化, 当前线程对应的cell数据为空,且CAS操作+1失败
break; // Fall back on using base
}
}
暂时将代码中多个if中的逻辑代码省略,先对该流程有个整体的认识。
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
getProbe()
方法用于获取当前线程的hash值,通过上述方法可以初始化当前线程变量threadLocalRandomProbe的值,该变量会用于计算当前线程会被分配到cells数组的哪个槽中。
首先关注case1中的流程:
if ((as = cells) != null && (n = as.length) > 0) {
// 当前线程hash后指向数据位置元素是否为空
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 当前Cell存在,执行CAS设置
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 当前Cell数组元素大于CPU个数
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 是否存在冲突
else if (!collide)
collide = true;
// 扩容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
case1的核心是cells数组的扩容操作,也就是当前cells的元素个数小于当前机器CPU个数且当前多个线程访问了cells的同一个元素导致冲突使得其中一个线程CAS失败方式的扩容操作。
扩容操作是将cellsBusy设置为1,然后将Cell扩容为原来的两倍。
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
case2中的流程:
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
case2主要是初始化cells数组,并通过casCellsBusy()
将cellsBusy标识改为1,表示cells数组已经扩容