浅析LongAdder

浅析LongAdder

前言

上文中分析了AtomicLong以及Unsafe,本文将为大家带来LongAdder的分析.LongAdder之前在guava以及hystrix等中出现,但是目前已经出现在jdk8标准库中了,作者是著名的Doug lea大师。

基本分析

先看看LongAdder的java doc的描述:

One or more variables that together maintain an initially zero
{@code long} sum. When updates (method {@link #add}) are contended
across threads, the set of variables may grow dynamically to reduce
contention. Method {@link #sum} (or, equivalently, {@link
longValue}) returns the current total combined across the
variables maintaining the sum.
<p> This class is usually preferable to {@link AtomicLong} when
multiple threads update a common sum that is used for purposes such
as collecting statistics, not for fine-grained synchronization
control. Under low update contention, the two classes have similar
characteristics. But under high contention, expected throughput of
this class is significantly higher, at the expense of higher space
consumption.
<p>This class extends {@link Number}, but does <em>not</em> define
methods such as {@code hashCode} and {@code compareTo} because
instances are expected to be mutated, and so are not useful as
collection keys.
jsr166e note: This class is targeted to be placed in
java.util.concurrent.atomic

翻译过来就是说:
LongAdder中会维护一个或多个变量,这些变量共同组成一个long型的“和”。当多个线程同时更新(特指“add”)值时,为了减少竞争,可能会动态地增加这组变量的数量。“sum”方法(等效于longValue方法)返回这组变量的“和”值。
当我们的场景是为了统计技术,而不是为了更细粒度的同步控制时,并且是在多线程更新的场景时,LongAdder类比AtomicLong更好用。 在小并发的环境下,论更新的效率,两者都差不多。但是高并发的场景下,LongAdder有着明显更高的吞吐量,但是有着更高的空间复杂度。

从上面的java doc来看,LongAdder有两大方法,add和sum。其更适合使用在多线程统计计数的场景下,在这个限定的场景下比AtomicLong要高效一些,下面我们来分析下为啥在这种场景下LongAdder会更高效。

add方法

public void add(long x) {
        Cell[] as; long b, v; HashCode hc; Cell a; int n;
        //首先判断cells是否还没被初始化,并且尝试对value值进行cas操作
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            //查看当前线程中HashCode中存储的随机值
            int h = (hc = threadHashCode.get()).code;
            //此处有多个判断条件,依次是
            //1.cell[]数组还未初始化
            //2.cell[]数组虽然初始化了但是数组长度为0
            //3.该线程所对应的cell为null,其中要注意的是,当n为2的n次幂时,((n - 1) & h)等效于h%n
            //4.尝试对该线程对应的cell单元进行cas更新(加上x)
            if (as == null || (n = as.length) < 1 ||
                (a = as[(n - 1) & h]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                //在以上条件都失效的情况下,重试update
                retryUpdate(x, hc, uncontended);
        }
    }

    //一个ThreadLocal类
    static final class ThreadHashCode extends ThreadLocal<HashCode> {
        public HashCode initialValue() { return new HashCode(); }
    }

    
    static final ThreadHashCode threadHashCode = new ThreadHashCode();


    //每个HashCode在初始化时会产生并保存一个非0的随机数
    static final class HashCode {
        static final Random rng = new Random();
        int code;
        HashCode() {
            int h = rng.nextInt(); // Avoid zero to allow xorShift rehash
            code = (h == 0) ? 1 : h;
        }
    }
    
    //尝试使用casBase对value值进行update,baseOffset是value相对于LongAdder对象初始位置的内存偏移量
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, baseOffset, cmp, val);
    }

add方法的注释在其中,让我们再看看重要的retryUpdate方法。retryUpdate在上述四个条件都失败的情况下尝试再次update,我们猜测在四个条件都失败的情况下在retryUpdate中肯定都对应四个条件失败的处理方法,并且update一定要成功,所以肯定有相应的循环+cas的方式出现。

final void retryUpdate(long x, HashCode hc, boolean wasUncontended) {
        int h = hc.code;
        boolean collide = false;                // True if last slot nonempty
        //我们猜测的for循环
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            //这个if分支处理上述四个条件中的3和4,此时cells数组已经初始化了并且长度大于0
            if ((as = cells) != null && (n = as.length) > 0) {
                //该分支处理四个条件中的3分支,线程对应的cell为null
                if ((a = as[(n - 1) & h]) == null) {
                    //如果busy锁没被占有
                    if (busy == 0) {            // Try to attach new Cell
                        //新建一个cell
                        Cell r = new Cell(x);   // Optimistically create
                        //double check busy,并且尝试锁busy(乐观锁)
                        if (busy == 0 && casBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    //再次确认线程hashcode所对应的cell为null,将新建的cell赋值
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                            //解锁
                                busy = 0;
                            }
                            if (created)
                                break;
                            //如果失败,再次尝试
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                //处理四个条件中的条件4,置为true后交给循环重试
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //尝试给线程对应的cell update
                else if (a.cas(v = a.value, fn(v, x)))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                //在以上办法都不管用的情况下尝试扩大cell
                else if (busy == 0 && casBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                        //扩大一倍,将前N个拷贝过去
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        busy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                //rehash下,走到这一步基本是因为多个线程的竞争太激烈了,所以在扩展cell后rehash h,等待下次循环处理好这次更新
                h ^= h << 13;                   // Rehash
                h ^= h >>> 17;
                h ^= h << 5;
            }
            //主要针对上述四个条件中的1.2,此时cells还未进行第一次初始化,其中casBusy的理解参照下面busy的      注释,如果casBusy能成功才进入这个分支
            else if (busy == 0 && cells == as && casBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        //创建数量为2的cell数组,2很重要,因为每次都是n<<1进行扩大一倍的,所以n永远是2的幂
                        Cell[] rs = new Cell[2];
                        //需要注意的是h&1 = h%2,将线程对应的cell初始值设置为x
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                //释放busy锁
                    busy = 0;
                }
                if (init)
                    break;
            }
            //busy锁不成功或者忙,则再重试一次casBase对value直接累加
            else if (casBase(v = base, fn(v, x)))
                break;                          // Fall back on using base
        }
        hc.code = h;                            // Record index for next time
    }
    
    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
     通过cas实现的自旋锁,用于扩大或者初始化cells
     */
    transient volatile int busy;

从以上分析来看,retryUpdate非常的复杂,所做的努力就是为了尽量减少多个线程更新同一个值value,能用简单的方式解决的绝对不采用开销更大的方法(resize cell也是走投无路的时候)

回过头来总结分析下LongAdder减少冲突的方法以及在求和场景下比AtomicLong更高效的原因

  • 首先和AtomicLong一样,都会先采用cas方式更新值
  • 在初次cas方式失败的情况下(通常证明多个线程同时想更新这个值),尝试将这个值分隔成多个cell(sum的时候求和就好),让这些竞争的线程只管更新自己所属的cell(因为在rehash之前,每个线程中存储的hashcode不会变,所以每次都应该会找到同一个cell),这样就将竞争压力分散了

sum方法

public long sum() {
        long sum = base;
        Cell[] as = cells;
        if (as != null) {
            int n = as.length;
            for (int i = 0; i < n; ++i) {
                Cell a = as[i];
                if (a != null)
                    sum += a.value;
            }
        }
        return sum;
    }

sum方法就简单多了,将cell数组中的value求和就好

AtomicLong可否可以被LongAdder替代

有了传说中更高效的LongAdder,那AtomicLong可否不使用了呢?当然不是!

答案就在LongAdder的java doc中,从我们翻译的那段可以看出,LongAdder适合的场景是统计求和计数的场景,而且LongAdder基本只提供了add方法,而AtomicLong还具有cas方法(要使用cas,在不直接使用unsafe之外只能借助AtomicXXX了)

LongAdder有啥用

从java doc中可以看出,其适用于统计计数的场景,例如计算qps这种场景。在高并发场景下,qps这个值会被多个线程频繁更新的,所以LongAdder很适合。HystrixRollingNumber就是用了它,下篇文章介绍它

总结

本文简单分析了下LongAdder,下篇文章介绍HystrixRollingNumber

留个悬念

  static final class Cell {
        volatile long p0, p1, p2, p3, p4, p5, p6;
        volatile long value;
        volatile long q0, q1, q2, q3, q4, q5, q6;
        Cell(long x) { value = x; }

        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }

    }

Cell单元为啥要这么设计?volatile long p0, p1, p2, p3, p4, p5, p6; volatile long q0, q1, q2, q3, q4, q5, q6;这些看起来没用的p,q可以删掉吗?

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容