一起来学Java8(八)——ForkJoin

一起来学Java8(七)——Stream中我们了解了reduce的用法,其中并行流的底层是使用了分支/合并框架

分支/合并框架的核心思想是把一个大的任务拆分成多个子任务,然后把每个子任务的执行结果整合起来,返回一个最终结果。

ForkJoinPool

分支/合并框架的核心类是java.util.concurrent.ForkJoinPool,从名称中可以看到它是一个线程池,线程数量是默认处理器数量,可以通过下面这句话来改变线程数:

System.setProperty("jaav.util.concurrent.ForkJoinPool.common.parallelism", "8");

RecursiveTask<T>

前面说到了ForkJoinPool类是一个线程池,那么RecursiveTask的做用就是生成一个任务,然后把这个任务放到ForkJoinPool当中去。

RecursiveTask类是一个抽象类,并且有一个抽象方法

protected abstract V compute();

这个方法的主要功能是拆分任务逻辑,直到无法拆分时返回子任务的执行结果。

下面从一个简单的例子来说明各个类的使用方式。

这个例子演示将一个数组内的所有数字相加,得到一个总和。思想是将数组进行对半拆分,得到两个子数组,然后再对两个子数组进行拆分,以此类推,直到数组长度小于等于10的时候不再拆分。

@AllArgsConstructor
    static class NumberAddTask extends RecursiveTask<Long> {

        // 存放数字
        private long[] numbers;
        // 计算的起始位置
        private int startIndex;
        // 计算的结束位置
        private int endIndex;

        @Override
        protected Long compute() {
            int len = endIndex - startIndex;
            // 数组长度小于10,无法拆分,开始运算
            if (len <= 10) {
                return execute();
            }
            // 拆分左边的子任务
            NumberAddTask leftTask = new NumberAddTask(numbers, startIndex, startIndex + len / 2);
            // 将子任务加入到ForkJoinPool中去
            leftTask.fork();
            // 创建右边的任务
            NumberAddTask rightTask = new NumberAddTask(numbers, startIndex + len / 2, endIndex);
            // 执行右边的任务
            Long rightSum = rightTask.compute();
            // 读取左边的子任务结果,这里会阻塞
            Long leftSum = leftTask.join();
            // 合并结果
            return leftSum + rightSum;
        }

        private long execute() {
            long sum = 0;
            for (int i = startIndex; i < endIndex; i++) {
                sum += numbers[i];
            }
            return sum;
        }
    }

运行代码:

long startTime = System.currentTimeMillis();
// 生成一个数组,存放1,2,3,4....
long[] numbers = LongStream.rangeClosed(1, 1000000).toArray();
// 创建一个任务,起始位置是0,结束位置是数组的长度
NumberAddTask numberAddTask = new NumberAddTask(numbers, 0, numbers.length);
// 将任务加入到线程池中运行,得到总和
Long sum = new ForkJoinPool().invoke(numberAddTask);
long time = System.currentTimeMillis() - startTime;
System.out.println("sum:" + sum + ", 耗时:" + time + "毫秒");

打印:

sum:500000500000, 耗时:63毫秒

Spliterator

Spliterator是Java8新增的一个接口,从名字上可解读出两种意思:split,iterate,即该接口提供分割迭代的功能。Spliterator接口需要配合Stream一起使用。

使用方式:

// 创建顺序执行的Stream
Stream stream = StreamSupport.stream(Spliterator, false);

// 创建并行的Stream
Stream stream = StreamSupport.stream(Spliterator, true);

这两行代码即为Collection.stream()方法的默认实现。

Spliterator接口声明了4个抽象方法,需要开发者自己实现。

public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action);

    Spliterator<T> trySplit();

    long estimateSize();

    int characteristics();
}

boolean tryAdvance(Consumer<? super T> action):方法用于遍历获取元素,然后通过Consumer来执行,如果取到元素返回true,否则返回false

Spliterator<T> trySplit():执行分割操作,如果集合还能再继续分割,则返回一个新的Spliterator,如果不能继续分割则返回null

long estimateSize():用来返回剩余元素数量

int characteristics():指定集合一些特性。

characteristics特性列表如下:

特性 含义
ORDERD 集合中的的元素有顺序概念
DISTINCT 对任意一对遍历过的元素x,y,x.equals(y)返回false
SORTED 遍历的元素按照一个预定义的顺序排序
SIZED 集合元素大小可确定的
NONNULL 保证遍历元素没有null
IMMUTABLE 集合元素不能修改
CONCURRENT 集合可被其它线程同时修改,无需同步
SUBSIZED 该Spliterator和所有从它拆分出来的子Spliterator都有SIZED特性

characteristics()使用方式如下:

@Override
public int characteristics() {
    return Spliterator.SIZED | Spliterator.SUBSIZED;
}

下面我们来实现一个自定义的分割迭代器

static class LongSpliterator implements Spliterator<Long> {

        private long[] array;
        private int index;
        private int end;

        public LongSpliterator(long[] array, int index, int end) {
            this.array = array;
            this.index = index;
            this.end = end;
        }

        @Override
        public boolean tryAdvance(Consumer<? super Long> action) {
            if (index >= 0 && index < end) {
                // 取出元素
                Long l = array[index++];
                // 执行
                action.accept(l);
                return true;
            }
            return false;
        }

        /**
         * 尝试分割集合。
         * 切割规则:数组一分为2,留后半段,将前半段再一分为2。
         * @return 返回null结束分割
         */
        @Override
        public Spliterator<Long> trySplit() {
            int start = 0;
            // 取中间
            int middle = (start + end) >>> 1;
            if (start < middle) {
                return null;
            }
            // 当前index变成中间值,即当前类的操作范围是:middle ~ end
            index = middle;
            // 将前半段再一分为2
            return new LongSpliterator(array, start, middle);
        }

        @Override
        public long estimateSize() {
            return end - index;
        }

        @Override
        public int characteristics() {
            return Spliterator.SIZED | Spliterator.SUBSIZED;
        }
    }

该分割迭代器中的集合是一个long数组,分割规则是将数组一分为2,留后半段,将前半段再一分为2。

现在来使用这个分割迭代器,计算从1加到1000000,测试用例如下:

    public void testDo() {
        // 创建一个数组
        long[] numbers = LongStream.rangeClosed(1, 1000000).toArray();
        long startTime = System.currentTimeMillis();
        LongSpliterator spliterator = new LongSpliterator(numbers, 0, numbers.length);
        // 申明一个并行的Stream
        Stream<Long> stream = StreamSupport.stream(spliterator, true);
        // 计算从1加到1000000,结果应该是:500000500000
        Long sum = stream.reduce((n1, n2) -> n1 + n2).orElse(0L);
        long time = System.currentTimeMillis() - startTime;
        System.out.println("sum:" + sum + ", 耗时:" + time + "毫秒");
    }

打印:

sum:500000500000, 耗时:32毫秒

定期分享技术干货,一起学习,一起进步!微信公众号:猿敲月下码

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,045评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,114评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,120评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,902评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,828评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,132评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,590评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,258评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,408评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,335评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,385评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,068评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,660评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,747评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,967评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,406评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,970评论 2 341

推荐阅读更多精彩内容

  • 在一起来学Java8(七)——Stream中我们了解了reduce的用法,其中并行流的底层是使用了分支/合并框架。...
    眉目清俊阅读 226评论 0 0
  • 在Java 7之前,并行处理数据集合非常麻烦。第一,你得明确地把包含数据的数据结构分成若干子部分。第二,你要给每个...
    夏与清风阅读 859评论 0 1
  • 本文主要总结了《Java8实战》,适用于学习 Java8 的同学,也可以作为一个 API 手册文档适用,平时使用时...
    _晓__阅读 1,754评论 2 7
  • Java8 in action 没有共享的可变数据,将方法和函数即代码传递给其他方法的能力就是我们平常所说的函数式...
    铁牛很铁阅读 1,201评论 1 2
  • 点赞+收藏 就学会系列,文章收录在 GitHub JavaEgg ,N线互联网开发必备技能兵器谱 Java8早在2...
    JavaKeeper_海星阅读 299评论 0 0