ForkJoinPool

  1. 为什么使用fork join框架?

ForkJoinPool是ExecutorSerice的一个补充,而不是替代品

并不适合所有场景;
特别适合用于“分而治之”,递归计算的算法;

JAVA8中CompeleteFuture、并发流等都是基于ForkJoinPool实现;

本文将讲述什么样的场景合适、什么样的场景不合适,并且深入剖析为啥某些场景不合适。

  1. fork join框架使用简单实例
    使用RecursiveTask实现一个累加的功能,使用分而治之的思想,实现分段求和后汇总
public class SumTask extends RecursiveTask<Integer> {

    private Integer start = 0;
    private Integer end = 0;

    public SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {

        if (end - start < 100) {
            //小于100时直接返回结果
            int sumResult = 0;
            for (int i = start; i <= end; i++) {
                sumResult += i;
            }
            return sumResult;
        } else {
            //大于一百时进行分割
            int middle = (end + start) / 2;
            SumTask leftSum = new SumTask(this.start, middle);
            SumTask rightSum = new SumTask(middle, this.end);
            leftSum.fork();
            rightSum.fork();
            return leftSum.join() + rightSum.join();
        }
    }

    public static void main(String[] args) {
        SumTask sumTask = new SumTask(1, 999999);
        sumTask.fork();
        System.out.print("result:" + sumTask.join());
    }
} 

默认线程池有固定的线程数,会根据可用的availableProcessors来计算线程数量;

/**
     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
     * java.lang.Runtime#availableProcessors}, using the {@linkplain
     * #defaultForkJoinWorkerThreadFactory default thread factory},
     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }

如果想使用自定义线程池,比如100个线程的线程池,可以如下:
当然无论你初始化多少线程,都只会有和CPU数量相等的几个线程运行,大部分场景下并无异于这个框架的加速;

public static void main(String[] args) {
        ForkJoinPool forkJoinPool=new ForkJoinPool(100);
        SumTask sumTask = new SumTask(1, 999999);
        forkJoinPool.submit(sumTask);
        System.out.print("result:" + sumTask.join());
    }

我们来看看fork的时候做了些什么

 /**
     * Arranges to asynchronously execute this task in the pool the
     * current task is running in, if applicable, or using the {@link
     * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}.  While
     * it is not necessarily enforced, it is a usage error to fork a
     * task more than once unless it has completed and been
     * reinitialized.  Subsequent modifications to the state of this
     * task or any data it operates on are not necessarily
     * consistently observable by any thread other than the one
     * executing it unless preceded by a call to {@link #join} or
     * related methods, or a call to {@link #isDone} returning {@code
     * true}.
     *
     * @return {@code this}, to simplify usage
     */
    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

当前线程不是个ForkJoinWorkerThread的时候,则加入到ForkJoinPool线程池(基于ExecutorService实现);
如果当前线程已经是个ForkJoinWorkerThread了,则把这个任务加入到当前线程的workQueue;

大概的图

这是和普通线程池不同的地方,task并不是交给线程池中的queue,而是放到线程本地的workQueue

我们来看看ForkJoinPool中的task是如何运行的
a. 线程以LIFO先进后出方式从本地队列获取任务,执行,直到自己的队列为空;
b. 查看其他ForkJoinWorkerThread是否有未执行task,有的话通过work−stealing窃取,窃取方式为FIFO先进先出,减少竞争;优先看曾今从自己那里窃取任务的thread,如果有的话;
c. 任务运行完成时,返回结果;

  1. java8并发流
    java8的并发流使用的也是ForkJoinPool
myList.parallelStream.map(obj -> longRunningOperation())

默认使用公用线程池,使用独立线程池的话如下

ForkJoinPool forkJoinPool = new ForkJoinPool(3);  
forkJoinPool.submit(() -> {  
    firstRange.parallelStream().forEach((number) -> {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) { }
    });
});
 
ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);  
forkJoinPool2.submit(() -> {  
    secondRange.parallelStream().forEach((number) -> {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
        }
    });
});

4.为什么不适合执行有block比如有io的任务
线程block的时候,线程池会调度线程池队列中的其他未线程运行,这是ExecutorService的机制;
但是ForkJoinPool中的ForkJoinWorkerThread工作机制是不停执行本地workQueue中的task,task是一个个取的,顺序执行,没有塞回去的动作,并不会因为某个task引起block后而换个task继续执行;

参考:
Java Tip: When to use ForkJoinPool vs ExecutorService
http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/
https://www.jianshu.com/p/bd825cb89e00
https://www.cnblogs.com/richaaaard/p/6601445.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 193,968评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,682评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,254评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,074评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,964评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,055评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,484评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,170评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,433评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,512评论 2 308
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,296评论 1 325
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,184评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,545评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,150评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,437评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,630评论 2 335