LongAdder源码分析

一、原理介绍

关于LongAdder这个类可能很多朋友不太熟悉,我简单的对LongAdder介绍一下。

LongAdder被设计出来是为了用于高并发下的自增,说到自增操作,为了保证线程安全,可能很多朋友用过AtomicInteger或者AtomicLong这两个类,那为什么不用这两个类来完成自增而要用LongAdder呢?这是因为AtomicInteger和AtomicLong的底层都是直接使用CAS来修改类中的value值,假如有N个线程同时修改,每次只能有一个线程修改成功,另外N-1个线程会修改失败然后进行自旋,问题的关键就在这里,自旋是需要消耗CPU时间片的,所以当并发量较大时,自旋所占用CPU的时间就会很长。

为了解决AtomicInteger和AtomicLong的缺点才有了LongAdder类。LongAdder的思想是化整为零,如果在没有线程竞争的情况下进行自增,那么与AtomicInteger和AtomicLong没有什么区别,直接累加即可。如果在高并发的环境中,LongAdder会创建一个cell数组,给每个线程分配一个桶位,一个线程对应一个cell,让每个线程在各自的桶位进行累加,最后把cell数组中的所有值加起来就是最终结果。

二、Striped64类

进入LongAdder源码中可以看见LongAdder继承了Striped64,先来看看Striped64中的核心属性。

Cell是一个内部类,类中有value值和cas修改value的方法,关于@sun.misc.Contended注解可以看一下https://www.cnblogs.com/eycuii/p/11525164.html

@sun.misc.Contended 
static final class Cell {   // 这里的cell就是用来让每个线程来累加的对象
    // 实际累加的变量
    volatile long value;
    Cell(long x) { value = x; }
    // 封装cas操作来修改value
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    private static final sun.misc.Unsafe UNSAFE;
    // value在内存中的偏移量
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

/** 
 * 获得当前物理机的CPU数量,会被用于限制cell[]的长度
 */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/**
 * cell数组,数组长度一定为2的整数次幂
 */
transient volatile Cell[] cells;

/**
 * base主要在没有争用时使用,通过 CAS 更新
 */
transient volatile long base;

/**
 * cellBusy就是一把锁,需要拿到锁才能进行初始化或者扩容
 * 0表示为加锁,1表示已加锁
 */
transient volatile int cellsBusy;

// cas修改base
final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

// 加锁,初始化或者扩容时锁住cell[]
final boolean casCellsBusy() {
    return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}

// 获得线程的探针hash值
static final int getProbe() {
    return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

// 设置线程的探针hash值,其实就是rehash操作
static final int advanceProbe(int probe) {
    probe ^= probe << 13;
    probe ^= probe >>> 17;
    probe ^= probe << 5;
    UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
    return probe;
}

三、add()

add方法是LongAdder的最基本方法,其他的方法均以add来展开,我会逐行分析,请看我的注释。

/**
 * 进行累加
 * x:需要累加的值
 */
public void add(long x) {
    /**
     * as:cell[],用于多线程的累加
     * b:base值
     * v:cell进行cas操作时的期望值
     * m:cell[]长度 - 1
     * a:当前线程对应的cell
     */
    Cell[] as; long b, v; int m; Cell a;
    /**
     * case1:cell[]非空(产生竞争时才会初始化,非空表示已经初始化了),
     *      如果已初始化,那么线程需要把x累加到对应的cell中。
     * case2:前置条件:cell[]未初始化,casBase是Striped64封装的cas操作修改base的一个方法,
     *      case1不满足说明之前未产生线程竞争,这里直接尝试修改base,若修改成功说明没有竞争,
     *      若失败说明修改base产生了竞争,需要创建cell[]进入数组累加。
     */
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // true表示产生竞争,false表示产生了竞争
        boolean uncontended = true;
        /**
          * case1:cell[]是否初始化,true表示未初始化,直接进入longAccumulate()
          * case2:cell[]长度是否为0,防止某些异常情况出现cell[]不为null但是长度为0
          * case3:线程桶位中cell对象是否被初始化,getProbe()表示获取线程的探测hash值,
          *       可以理解为hashCode一样的东西,hash & m的作用就是计算线程对应的桶位
          *      (这个原理在HashMap中讲过),true表示cell未初始化。
          * case4:前置条件:cell[]和线程对应的cell对象已初始化。尝试修改cell中的value,
          *       若修改成功说明没有竞争,若修改失败说明修改value产生了竞争。
          */ 
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            // 进入的条件:cell[]未初始化 || 
            //           线程对应的cell未初始化 || 
            //           修改value失败(有竞争)
            longAccumulate(x, null, uncontended);
    }
}

四、longAccumulate()

longAccumulate()方法中非常复杂,有大量的if/else,在看的时候一定要慢慢捋清楚,最好配合IDEA把if/else块缩起来一个一个看,阅读if/else内部逻辑时需要明白进入这个if/else块的条件前置条件,我大致列举一下逻辑顺序:

  • cell[]已初始化,线程对应的cell未初始化
    • 创建cell对象
    • 尝试加锁,加锁成功 -> 关联cell对象至桶位,加锁失败 -> 进行rehash -> 自旋
  • cell[]已初始化,线程对应的cell已初始化
    • 判断是否有竞争,有 -> rehash之后自旋 -> cas修改对应的cell,无 -> cas修改对应的cell
    • 修改成功 -> 退出,修改失败 -> 扩容(因为失败的次数太多,所以考虑扩容)
    • 判断当前cell[]长度是否大于等于 CPU核心数 && 检查当前是否有其他线程已经完成了扩容操作
    • 是 -> 本次循环无法扩容 -> rehash -> 自旋,否 -> 修改扩容意向为true -> 自旋(扩容之前再次尝试修改cell,修改失败则扩容)
    • 扩容结束 -> 自旋
  • cell[]未初始化
    • 直接进行初始化(需要加锁以及检查是否已被其他线程初始化)

整体上来看是按照这三个大块的逻辑来走,其中内部又会有很多细节,比如扩容和初始化需要double check,以及扩容前的重新自旋一次等等。代码的逻辑设计的很巧妙,也很严谨,一定要慢慢读懂。

/**
 * 进入的前置条件: cell[]未初始化 || 线程对应的cell未初始化 || 修改value失败(有竞争)
 * x:需要累加的值
 * fn:函数式接口,可以对方法进行扩展,这里没有用上不用管
 * wasUncontended:是否产生竞争,true:没有竞争,false:有竞争
 *                 根据上面的分析,只有修改value失败时为false
 */
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    // 当前线程探针hash
    int h;
    // h为0表示线程的探针hash为初始化,则需要先进行初始化。
    if ((h = getProbe()) == 0) {
        // 初始化线程中的探针hash
        ThreadLocalRandom.current();
        h = getProbe();
        /**
         * 当getProbe()为0的时候,刚刚在a = as[getProbe() & m]会计算出as[0],
         * 所以0号桶位产生的竞争是因为线程未初始化探针hash导致在0号桶位产生竞争,
         * 正常情况下线程可能对应别的桶位,所以不是真正的竞争,这里才会重新设置true
         * 表示没有发送竞争。 
         */
        wasUncontended = true;
    }
    // collide表示扩容意向,true:可能扩容,false:一定不会扩容
    boolean collide = false;
    // 自旋
    for (;;) {
        /**
         * as:当前的cell[]
         * a:当前线程对应的cell
         * n:cell[]长度
         * v:cas修改时的期望值
         */
        Cell[] as; Cell a; int n; long v;
        // 判断cell[]是否初始化
        if ((as = cells) != null && (n = as.length) > 0) {
            // 前置条件:cell[]已经初始化
            // 拿到当前线程对应的cell判断是否初始化
            if ((a = as[(n - 1) & h]) == null) {
                // 前置条件:当前线程对应的cell未初始化
                // 判断当前是否加锁
                if (cellsBusy == 0) {
                    // 前置条件:未加锁
                    // 创建线程对应的cell对象并赋值,PS:这里还没有关联到具体的桶位
                    Cell r = new Cell(x);
                    /**
                     * 需要获取到锁才能进行初始化
                     * case1:判断是否加锁(防止有其他线程在这个时候加锁了)
                     * case2:尝试加锁
                     */
                    if (cellsBusy == 0 && casCellsBusy()) {
                        // 前置条件:当前线程加锁成功
                        // 用于表示是否初始化完毕,true:完成,false未完成
                        boolean created = false;
                        // 进行初始化操作
                        try {
                            /**
                             * rs:当前cell[]
                             * m:cell[]长度
                             * j:线程对应桶位下标
                             */
                            Cell[] rs; int m, j;
                            /**
                             * case1、case2:判断cell[]是否初始化
                             * case3:判断线程对应的cell是否初始化
                             * 这里之所以再次判断,是因为可能存在线程安全问题,
                             * 例如线程1执行刚完初始化,线程2刚好执行到这个if语句,
                             * 如果线程1和线程2的对应的桶位相同,则线程2会覆盖线程1
                             * 创建的cell对象。
                             */
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                // 让桶位指向刚刚创建的cell
                                rs[j] = r;
                                // 表示创建成功
                                created = true;
                            }
                        } finally {
                            // 释放锁
                            cellsBusy = 0;
                        }
                        // 创建成功,退出自旋
                        if (created)
                            break;
                        // 未创建成功则再次自旋
                        continue;
                    }
                }
                // 前置条件:线程对应cell未初始化 && 未获得锁
                // 因为有其他线程锁住了cell[],所以无法创建cell对象
                // 在下面进行rehash然后自旋,
                // 这样有可能重新分配到一个已经初始化的cell对象
                collide = false;
            }
            
            // 前置条件:cell[]已初始化 && 线程对应的cell已初始化
            // cas竞争失败走到这里
            else if (!wasUncontended)
                // 设置为非竞争状态,在下面进行rehash然后自旋
                wasUncontended = true;
            // 前置条件:cell[]已初始化 && 线程对应的cell已初始化
            // cas修改线程对应的cell,修改成功直接退出,失败则继续自旋
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            // case1:数组长度大于等于CPU核心数
            // case2:cell[]已经发生扩容,不再是之前的对象了
            else if (n >= NCPU || cells != as)
                // 当case1或case2为true进入
                // case1表示n >= NCPU,则一定不会发生扩容
                // case2表示cells刚刚完成了一次扩容,所以设置为false
                collide = false;
            // 下面是扩容逻辑,所以这里需要判断是否能够扩容
            // 当不能扩容时在下面进行rehash然后自旋
            else if (!collide)
                collide = true;
            // 判断是否加锁并且尝试获得锁
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    // 再次检查,防止进入之前有其他线程进行了扩容导致cells和as不同
                    if (cells == as) {
                        // 创建新的数组, n << 1等效于n*2
                        Cell[] rs = new Cell[n << 1];
                        // 迁移元素到新数组
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        // 把成员变量cells指向新的数组
                        // 其实在上面的几次判断cells == as就是因为这里
                        cells = rs;
                    }
                } finally {
                    // 释放锁
                    cellsBusy = 0;
                }
                // 扩容完成之后设置为false
                collide = false;
                // 重新走一遍循环,
                continue;
            }
            // rehash操作,重新再分配一次hash和桶位
            // 能够进行rehash的情况:
            // 产生竞争(wasUncontended == false) || 
            h = advanceProbe(h);
        }
        // 前置条件:cell[]未初始化
        // 判断是否加锁以及确定as和cells所指向同一个对象
        // 获得锁后执行初始化
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            // 是否初始化完成
            boolean init = false;
            try {
                // 再次检查之后开始初始化cell[]和线程对应的cell
                if (cells == as) {
                    // 初始化cell[]长度为2
                    Cell[] rs = new Cell[2];
                    // 初始化线程对应的cell并赋值x
                    rs[h & 1] = new Cell(x);
                    // 让cells指向新数组
                    cells = rs;
                    // 初始化完成
                    init = true;
                }
            } finally {
                // 释放锁
                cellsBusy = 0;
            }
            // 初始化完成后退出自旋
            if (init)
                break;
        }
        // 前置条件:cell[]未初始化 && (未获取到锁 || cells != as)
        // 线程走到这里大概率是因为有其他线程正在初始化cell[]或者已经初始化完成
        // 所以这里直接对base尝试累加
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容