java8 stream 并行

本文讨论:

  • 串行和并行的执行效率区别
  • 并行带来的一些问题
  • 什么时候使用并行

ps:主要参考资料来自Effective Java 第三版

先来看一个关于性能提升的例子(来自effective java),是用来计算小于n的素数个数,机器是一个6核cpu(i5-8400)。
肉眼可见的性能得到了提升。

@Test
public void test() throws InterruptedException {
    long start = System.currentTimeMillis();
    pi(10000000);
    long end = System.currentTimeMillis();
    System.out.println("串行时间:" + (end - start) / 1000);

    start = System.currentTimeMillis();
    piParallel(10000000);
    end = System.currentTimeMillis();
    System.out.println("并行时间:" + (end - start) / 1000);
}
static long pi(long n) {
    return LongStream.rangeClosed(2, n).mapToObj(BigInteger::valueOf).filter(i -> i.isProbablePrime(50)).count();
}
static long piParallel(long n) {
    return LongStream.rangeClosed(2, n).parallel().mapToObj(BigInteger::valueOf).filter(i -> i.isProbablePrime(50)).count();
}
// console 
串行时间:31
并行时间:6

并行处理看起来非常美好,但是,真的如此吗?先看一个官方给出的反面教材,下面的一段代码,每次执行都会得到不同的结果
https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html
线程安全集合

Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8};
List<Integer> listOfIntegers = new ArrayList<>(Arrays.asList(intArray));
List<Integer> parallelStorage = Collections.synchronizedList(new ArrayList<>());
listOfIntegers.parallelStream()
        // Don't do this! It uses a stateful lambda expression.
        .map(e -> {
            parallelStorage.add(e);
            return e;
        })
        .forEachOrdered(e -> System.out.print(e + " "));
System.out.println();
parallelStorage.stream().forEachOrdered(e -> System.out.print(e + " "));
// console
1 2 3 4 5 6 7 8 
4 2 8 3 7 1 5 6 
// console 
1 2 3 4 5 6 7 8 
6 5 1 4 2 3 7 8 

线程不安全集合

Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8};
List<Integer> listOfIntegers = new ArrayList<>(Arrays.asList(intArray));
List<Integer> parallelStorage = Lists.newArrayList();
listOfIntegers.parallelStream()
        // Don't do this! It uses a stateful lambda expression.
        .map(e -> {
            parallelStorage.add(e);
            return e;
        })
        .forEachOrdered(e -> System.out.print(e + " "));
System.out.println();
parallelStorage.stream().forEachOrdered(e -> System.out.print(e + " "));
// console 
1 2 3 4 5 6 7 8 
4 8 7 1 5 2 
// console
1 2 3 4 5 6 7 8 
2 8 3 5 1 6 4 7 
// console
1 2 3 4 5 6 7 8 
null 4 6 1 5 8 7 2 

从结果来看,有时候元素会变少,有时候甚至还出现了null。原因也非常简单:ArrayList不是线程安全的

使用并行流引出的第一个问题:

  • 结果不确定性

官方文档有直接给出建议:

The lambda expression e -> { parallelStorage.add(e); return e; } is a stateful lambda expression. Its result can vary every time the code is run.

有状态的lambda表达式每次运行都会出现不同的结果。所以我们应该避免使用有状态的代码。
我们再看下面一个例子(也是来自effective java)
先来一个串行计算,和预计一样,很好的完了代码,并打印出了相应结果

@Test
public void test() {
    primes().map(p -> BigInteger.valueOf(2).pow(p.intValueExact()).subtract(ONE))
            .filter(mersenne -> mersenne.isProbablePrime(50))
            .limit(10)
            .forEach(System.out::println);
}
static Stream<BigInteger> primes() {
    return Stream.iterate(BigInteger.valueOf(2), BigInteger::nextProbablePrime);
}
// console
3
7
31
127
8191
131071
524287
2147483647
2305843009213693951
618970019642690137449562111

再把他改成并行的试试,程序不动了

这里发生了什么?简而言之,流类库不知道如何并行化此管道并且启发式失败(heuristics fail)。 即使在最好的情况下,如果源来自 Stream.iterate 方法,或者使用中间操作 limit 方法,并行化管道也不太可能提高其性能

简而言之limit()方法是个stateful中间操作,并行处理的时候并不知道该如何并行

@Test
public void test() {
    primes().map(p -> BigInteger.valueOf(2).pow(p.intValueExact()).subtract(ONE))
            .filter(mersenne -> mersenne.isProbablePrime(50))
            .limit(10)
            .forEach(System.out::println);
}

再看一些其他的例子,对比一下串行,并行,和传统写法的性能差别

List<Long> list = Lists.newArrayList();
for (long i = 0; i < 1000000; i++) {
    list.add(i);
}

long start = System.currentTimeMillis();
System.out.println("串行:" + list.stream().filter(n -> n % 3 == 0).mapToLong(n -> n).sum());
long end = System.currentTimeMillis();
System.out.println("串行时间:" + (end - start) + "ms");

start = System.currentTimeMillis();
System.out.println("并行:" + list.stream().parallel().filter(n -> n % 3 == 0).mapToLong(n -> n).sum());
end = System.currentTimeMillis();
System.out.println("并行时间:" + (end - start) + "ms");

start = System.currentTimeMillis();
long sum = 0;
for (long l : list) {
    if (l % 3 == 0) {
        sum += l;
    }
}
System.out.println("传统:" + sum);
end = System.currentTimeMillis();
System.out.println("传统时间:" + (end - start) + "ms");
// console
串行:166666833333
串行时间:49ms
并行:166666833333
并行时间:23ms
传统:166666833333
传统时间:9ms

最后贴几个effective java里面关于并行流建议

  • 通常,并行性带来的性能收益在 ArrayList、HashMap、HashSet 和 ConcurrentHashMap 实例、数组、int 类型范围和 long 类型的范围的流上最好
  • 并行化一个流不仅会导致糟糕的性能,包括活性失败(liveness failures);它会导致不正确的结果和不可预知的行为 (安全故障)
  • 在适当的情况下,只需向流管道添加一个 parallel 方法调用,就可以实现处理器内核数量的近似线性加速
    最重要的一个建议就是

总之,甚至不要尝试并行化流管道,除非你有充分的理由相信它将保持计算的正确性并提高其速度。不恰当地并行化流的代价可能是程序失败或性能灾难。如果您认为并行性是合理的,那么请确保您的代码在并行运行时保持正确,并在实际情况下进行仔细的性能度量。如果您的代码是正确的,并且这些实验证实了您对性能提高的怀疑,那么并且只有这样才能在生产代码中并行化流。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容