java多线程与高并发(九)线程池与源码解读

1.回顾

之前说的Executor作用是把线程的定义和执行分开,主要是用来做线程的执行接口,在他下面还有一个控制着线程生命周期的ExecutorService,然后才是各种各样的ThreadPoolExecutor,把线程池作为一个执行的单元,给他单独出一个类,下面是他的七个参数
corePoolSize 核心线程数
maxmumPoolSize 最大线程数
keepAliveTime 生存时间
TimeUnit 生存时间的单位
BlockingQueue 任务队列
**ThreadFactory **线程工厂
RejectStrategy 拒绝策略(Abort 抛异常 Discard扔掉 不抛异常 DiscardOldest 扔掉排队时间最久的,CallerRuns 调用处理者处理服务)

2.jdk自带线程池

今天我们来看看JDK给我们提供了一些默认线程池的实现,默认的常用的有哪些,然后来读读ThreadPoolExecutor源码
所有的线程池都是继承ExecutorService的,所以Executors是对线程执行的工具类,可以看做是线程的工厂,产生各种各样的线程池

2.1.SingleThreadPool

先来看第一个SingleThreadPool ,看这个名字就觉得只有一个线程,这个一个线程的线程池可以保证扔进去的任务是顺序执行的

package com.learn.thread.eight;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestSingleThreadPool {
    private static ExecutorService service = Executors.newSingleThreadExecutor();


    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            final int j = i;
            // 顺序执行的
            service.execute(() -> {
                System.out.println(j + " " +Thread.currentThread().getName());
            });
            
        }
    }
}

SingleThreadPool.png

2.2.CacheThreadPool

我们来看第二种,看他源码实际上是跟之前的SingleThreadPool一样,底层是还是ThreadPoolExecutor


CachePool.png

没有核心线程数,最大线程可以很Intger的最大值个,如果60秒没人理他,自动被回收
任务队列用的是SynchronousQueue 不是用来存数据的,用来传递消息的,如果任务没有被执行,就会被阻塞
用的是默认线程工厂
没有指定拒绝策略,用默认拒绝策略
可以看出CachePool的特点,就是你来一个任务我启动一个线程。启动线程的逻辑如下
如果线程没有被回收,就去看当前线程池的线程是不是有空闲的线程,如果有就执行让它去执行任务。如果没有,就自己new 一个线程去执行,原因是队列是SynchronousQueue ,它必须保证它的大小为0,所以你来一个任务必须有一个线程去执行,不然别的线程提交任务就统统阻塞了
来看这个小程序,首先将线程池service打印出来,最后又打印一遍线程池services,然后任务是睡眠500毫秒

package com.learn.thread.eight;

import com.sun.tools.internal.ws.wsdl.document.soap.SOAPUse;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author zglx
 */
public class TestCachePool {
    private static ExecutorService service = Executors.newCachedThreadPool();

    public static void main(String[] args) throws InterruptedException {
        System.out.println(service);
        for (int i = 0; i < 2; i++) {
            // 顺序执行的
            Thread.sleep(100);
            service.execute(() -> {
                try {
                    Thread.sleep(0);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(service);
        Thread.sleep(800);
        System.out.println(service);
    }


}

注意线程存活的时间是60S,所以第一个线程被复用了

2.3.FixedThreadPool

fixed是固定的含义,就是固定一个线程数。


FixedThreadPool.png

fixedThreadPool指定一个参数,到底有多少线程,核心线程数和最大线程数是固定的,所以不存在回收之说,但是这里用的是LinkedBlockingQueue,这里一定要小心,因为是不建议使用的,因为是会造成内存泄漏

但是用fixedThreadPool有一个好处,可以进行并行的计算
并行与并发 并发是指任务的提交、并行是指任务执行,并行是并发的子集,并行是多个cpu可以同时进行处理,并发是多个任务同时过来。

我们看下面一个程序,用多线程计算1-200000中的质数,可以对这个区间进行分组

package com.learn.thread.eight;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class TestFixedThreadPool {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        List<Integer> list = getPrime(1,200000);
        list.forEach(System.out::println);
        long end = System.currentTimeMillis();
        System.out.println("time " + (end - start));

        start = System.currentTimeMillis();
        int num = 4;
        ExecutorService service = Executors.newFixedThreadPool(num);
        Future<List<Integer>> future1 = service.submit(new Mytask(1, 80000));
        Future<List<Integer>> future2 = service.submit(new Mytask(80001, 160000));
        Future<List<Integer>> future3 = service.submit(new Mytask(160001, 200000));
        future2.get().addAll(future3.get());
        future1.get().addAll(future2.get());
        future1.get().forEach(System.out::println);
        end = System.currentTimeMillis();
        System.out.println("time " + (end - start));
    }

    public static List<Integer> getPrime(Integer start, Integer end) {
        List<Integer> list = new ArrayList<>(100);
        for (Integer i = start; i <= end; i++) {
            if (isPrime(i)) {
                list.add(i);
            }
        }

        return list;
    }

    public static boolean isPrime(Integer num) {
        for (int i = 2; i <= num/2; i++) {
            if (num % i == 0) {
                return false;
            }
        }
        return true;
    }

    static class Mytask implements Callable<List<Integer>> {
        int start;
        int end;
        public Mytask() {
        }

        public Mytask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public List<Integer> call() throws Exception {
            List<Integer> result = getPrime(start, end);
            return result;
        }
    }
}

2.4.cacahe vs fixed

什么时候用cache 什么时候用fixed ,这得看你的业务量,如果线程池的线程太多,他们就会竞争稀缺的处理器和内存资源,浪费大量的时候在上下文切换,反之如果线程太少,处理器就可能无法充分利用。
建议:线程池大小与处理器的利用率之比可以使用公式来进行估算

线程池 = 你有多少个CPU乘以cpu期望利用率 乘以 (1+W/C)W除C是等待时间与计算时间的比率
表达式为
Nthread = Ncpu * Ucpu * (1+W/C)

如果你的任务不确定是否平稳,但是要保证任务来的时候有线程去执行,那我们就可以用cache,当然你要保证这个任务不会堆积。

假如你大概估了线程的值,这个值完全可以处理任务,我可以直接New 一个线程来执行,那就用fixed,但是阿里不建议这么使用

2.5.ScheduledThreadPool

定时任务线程池,就是一个定时器任务,隔一段时间后执行,这个就是我们专门用来执行定时任务的一个线程池。


ScheduledThreadPool.png

这里super调用的是ThreadPoolExecutor 本质上还是ThreadPoolExecutor,它最大线程数也是Integer的最大值,用的队列是DelayedWorkQueue。
它有一些好用的方法,比如scheduleAtFixedRate间隔多长时间在一个固定频率执行一次这个任务,可用通过这样灵活的时间配置。
第一个参数是Delay,第一次执行任务在此之后多长时间
第二个参数period间隔多长时间
第三个参数是时间单位

package com.learn.thread.eight;

import com.learn.thread.five.ExecutorService;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TestScheduledPool {
    private static ScheduledExecutorService service = Executors.newScheduledThreadPool(10);

    public static void main(String[] args) {
        // 5秒打印一次线程名字
        service.scheduleAtFixedRate(() -> {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        }, 0, 5, TimeUnit.SECONDS);
    }
}

假如有一个闹钟服务,假如有十亿人订阅了这个闹钟,意味着,每天早上会有10亿的并发量,你怎么优化
思想就是把这个定时的任务分发到很多个边缘的服务器上,假如说你有一台服务器,你也是要用到多线程去消费,总之就是一个分而治之的思想

SingleThreadPool 只有一个线程的线程池
FixedThreadPool 固定多少个线程
CacheThreadPool 有弹性的线程池,有一个启动一个,只要没有空闲的就启动一个新的来执行
ScheduleThreadPool 定时任务来执行线程池

package com.learn.thread.eight;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

/**
 * 自定义线程池的拒绝策略
 */
public class TestRejectedHandler {
    private static ThreadFactory factory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread("t1");
        }
    };
    private static ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),factory, new MyHandler());

    static class MyHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("asdasd");

        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            executorService.execute(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }

}

3.ThreadPoolExecutor

3.1.常用变量的解释

// AtomicInteger是int类型,是32位。高3位代表线程状态,低29位代表目前线程池有多少个线程数量,这里把两个值合二为一就是算了执行效率更高一些,因为都需要线程同步,而同步一个值往往比同步一个值容易的多

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // Integer.size为32,所以COUNT_BITS为29
    private static final int COUNT_BITS = Integer.SIZE - 3;

    // CAPACITY 线程允许的最大线程数,1左移29位,然后减1,就是2^29-1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // (线程有五种状态按大小排序为  RUNNING -> SHUTDOWN  -> STOP -> TIDYING   -> TERMINATED)
    // 正常运行的
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 调用shutdown方法进去了shutdown状态
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 调用shutdown马上停止
    private static final int STOP       =  1 << COUNT_BITS;
    // 调用了shutdown然后这个线程也执行完了,现在正在处理的过程叫做TIDYING
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 整个线程全部结束
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 获取线程池的状态 通过按位与操作,低29位将全部变成0
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 获取线程的数量,通过按位与操作,高3位全部变成0
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 根据线程池状态和线程池的线程数量生成ct1值
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    // 线程池状态小于xx
private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    // 线程池状态大于等于xx
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

3.2.构造方法

    // 构造方法
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // 基本类型参数校验
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        // 空指针校验
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        // 安全管理器
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        // 核心线程数
        this.corePoolSize = corePoolSize;
        // 最大线程池数
        this.maximumPoolSize = maximumPoolSize;
        // 
        this.workQueue = workQueue;
        // 根据传入参数unit和keepAliveTime 将存活时间转换为纳秒存到变量keepAliveTime中
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        // 线程工厂
        this.threadFactory = threadFactory;
        // 策略
        this.handler = handler;
    }

3.3. 提交执行task的过程

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 1.判断线程池活着的那些线程数是不是小于核心线程数,如果小于就addWorker添加一个线程。
        if (workerCountOf(c) < corePoolSize) {
            // addWorker创建线程,第二个参数表示是否创建核心线程
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 这里处理当前线程数超过核心线程数的逻辑
        // 2.先判断当前线程池状态,如果是在运行中,就把任务丢到队列里边去,否则拒绝任务
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 这里需要双重判断线程池的状态,因为这阻塞的过程中有可能线程池的状态被改变了,
            // 如果不是Running状态,说明线程池执行了shutdown操作,就删除此任务,并且拒绝
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果工作状态的线程为0,说明没有线程了或者核心线程数设置为0了,就添加非核心线程数
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 3.如果线程不是运行状态,那就任务进入队列,这里有3点需要注意
        // 线程池不是运行状态,addWorker内部会判断线程池的状态
        // 第二个参数表示是否创建核心线程
        // addWorker返回false说明任务执行失败,需要拒绝任务
        else if (!addWorker(command, false))
            reject(command);
    }

3.4. addWorker源码分析

addWorker涉及到了很多细节,如果要读懂每一个细节完全不必要,但是思想理解就行了,addWorker的意思就是添加线程,线程要存到容器里,往里头添加线程任务的时候肯定是多个线程同时往里面扔的,所以一定要同步,但是追求效率,一般都是用自旋或者lock

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 两层死循环就为了做一个事情,添加一个woker的数量加1
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 判断线程池状态满足以下条件,就返回
            // 1、线程池状态大于SHUTDOWN
            // 2、线程池状态等于SHUTDOWN并且firstTask执行任务不为null,直接返回false
            // 3、线程池状态等于SHUTDOWN,且队列为空,直接返回false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            // 嵌套死循环
            for (;;) {
        
                int wc = workerCountOf(c);
                // 如果当前线程超过最大允许线程数,或者根据core状态,大于核心线程或者最大线程数,返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 尝试把当前执行线程数+1,如果+1成功,打破循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 重新获取当前线程池状态
                c = ctl.get();  
                // 如果当前执行线程数不等于之前读取的数量,说明有别的线程加!成功了
                if (runStateOf(c) != rs)
                // 重试
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 下面的逻辑是创建一个线程去执行任务
        // 是否执行任务状态,true执行成功
        boolean workerStarted = false;
        // 判断是否假如addWorker状态 add加入成功
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 创建一个Worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // worker的添加必须是串行的,因此必须加锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    // 重新检查当前线程池状态,查询当前执行任务的数量
                    int rs = runStateOf(ctl.get());
                    // 如果rs小于SHUTDOWN,或者rs为SHUTDOWN并且firstTask为null
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // workder已经调用过了start方法,则不再创建worker,抛出异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // worker创建并添加workers成功
                        workers.add(w);
                        // 更新largestPoolSize变量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // 释放锁
                    mainLock.unlock();
                }
                // 启动worker线程
                if (workerAdded) {
                    // 这里会调用worker的run方法,
                    // 实际上是调用执行runWorker方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行), 需要进行shutdown相关操作
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

3.5.线程池worker任务单元

来看看worker是个什么东西,看源码是一个静态内部类Worker

  private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        // 1. 线程
        final Thread thread;
        // 2. 任务
        Runnable firstTask;
        // 3. 执行次数
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    
        // 线程池执行的核心方法
          public void run() {
            runWorker(this);
        }
        }

3.6.核心线程执行逻辑runWorker

这里有一个有意思的地方可以看到Worker 是实现了Runnable,它自己也有一个Runnbale和Thread,你就把Worker当成一个工人,工人有任务(Runnbale)和执行能力(Thread),但是你得保证每一个工人执行的任务是自己的,并且自己执行完了以后,completedTasks要加1
所以当多线程添加任务的时候,把当前线程复制一份给Thread和任务下发给工人的Runnbale,然后让工人去lock,工人的lock 其实就是之前学过ReentrantLock的acquire方法,加入链表,等待执行,这样子就完成了整个串行。

final void runWorker(Worker w) {
        // 1. 这里是用来自己实现方法的beforeExecute,自己实现内容
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 这里调用了unlock,其实也就调用了release(1),执行过程中允许线程被中断
        w.unlock(); // allow interrupts
        // 判断是否继续自旋
        boolean completedAbruptly = true;
        try {
            // 这里判断任务是否为存在或者队列中的任务不为空,注意如果从队列取就会造成阻塞
            while (task != null || (task = getTask()) != null) {
            // CAS加锁
                w.lock();
                // 判断当前线程池状态是否为停止,如果停止了,则中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                // 这里用来扩展功能,但是没用到
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 这个工人开始执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    // gc回收
                    task = null;
                    // 任务加1
                    w.completedTasks++;
                    // 释放锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 自旋操作退出,说明线程池结束
            processWorkerExit(w, completedAbruptly);
        }
    }

4.woker类总结

这个woker是一个Thread 也是一个Runnable 同样也是一个AQS,先来说说为什么要有Runnable和Thread

4.1.为什么要有Runnable?

这里其实是用来记录任务的,因为Woker 里边有很多状态需要根据当前任务去记录,并且这个东西必须要在Thread中执行

4.2.为什么要有Thread?

因为线程池有很多Woker去竞争,所以干脆就把Woker设计成AQS,一个线程处理一个当前任务,而不是说有其他worker执行了不是自己的任务

4.3.submit和execute的区别是什么

方法定义的位置不同,execute是在Executor执行器中,而submit是在ExecutorService执行服务中的
参数接收不同,execute接收的是Runnbale ,submit接收的是Callable
作用不用,execute只是单纯的执行任务,submit可以把任务的结果带出来。

4.4.线程池大概执行流程

核心线程数不够,启动核心线程
核心线程满了,加入队列
核心线程和队列都满了,addWorker 加入非核心线程

4.5.addWorker 做的事情

count 加1
真正的加入任务并且start
WorkStealingPool
WorkStealingPool 是另外一种线程,核心非常简单,之前讲的ThreadPoolExecutor线程是去一个任务的队列里取任务。而WorkStealingPool是每一个线程都有自己的任务队列,当一个线程执行完以后,会在别的线程任务队列中偷任务

跟原来只有一个队列的线程池相比,如果有某一个线程被占用了很长时间,然后任务队列又特别的重,那其他线程只能空着,没办法帮到任务重的线程

源码

    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

WorkStealingPool本质上是一个ForkJoinPool

package com.learn.thread.eight;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TestWorkStealingPool {
    private static final ExecutorService service = Executors.newWorkStealingPool();

    public static void main(String[] args) throws IOException {
        System.out.println(Runtime.getRuntime().availableProcessors());
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        System.in.read();
    }

    static class R implements Runnable {
        int time;

        R(int time) {
            this.time = time;
        }
        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(time + " " + Thread.currentThread().getName());
        }
    }
}

image.png

5.ForkJoinPool

ForkJoinPool 适合把大任务切分成一个个小任务去执行,小任务如果还是大,再切,不一定是两个,可以是多个,但是最终的结果就是多个小任务结果的汇总。这个过程就叫做join ,所以这种线程池叫做ForkJoinPool。
既然是可以分割的任务,那怎么定义任务呢,之前线程池执行的任务就是Runnable。在这里,我们一般实现ForkJoinPool的时候需要定义成特定他的类型,这个类型又必须是可以分叉的任务,这个任务就叫做ForkJoinTask,但是实际上这个ForkJoinTask又比较原始,我们可以用RecursiveAction,这里边又有两种。
第一个RecursiveAction递归,称为不带返回值的任务,因为我们可以把大任务分割成小任务,小任务又可以分成小任务,一直到我满意的条件为止,这其中就带有递归的过程。等会来看看一个例子,所以我要对一百万个数进行求和,单线程肯定很慢。
第二个RecursiveTask,叫做带返回值的子任务

package com.learn.thread.eight;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class TestForkJoinPool {
    private static int[] nums = new int[1000000];
    // 按5万个一组,进行分组
    private static int MAX_SIZE = 500000;

    static Random random = new Random();

    static {
        for (int i = 0; i < 1000000; i++) {
            nums[i] = random.nextInt(100);
        }
        System.out.println("-----" + Arrays.stream(nums).sum());
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 无返回参数子任务
        AddTask addTask = new AddTask(0, nums.length);
//      forkJoinPool.execute(addTask);

        AddTaskRet addTaskRet = new AddTaskRet(0, nums.length);
        forkJoinPool.invoke(addTaskRet);
        System.out.println(addTaskRet.join());
        System.in.read();
    }

    /**
     * 不带返回值的ForkJoinPool
     */
    static class AddTask extends RecursiveAction {
        private int start;

        private int end;

        public AddTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            // 如果是在分组内,开始计算,否则再分组
            if (end - start <= MAX_SIZE) {
                long sum = 0L;
                for (int i = start; i < end; i++) {
                    sum += nums[i];
                }
                System.out.println("from start" + start + "to end " + end + "sum =" + sum);
            } else {
                int middle = start + (end - start) / 2;
                AddTask addTask = new AddTask(start, middle);
                AddTask addTask1 = new AddTask(middle, end);
                addTask.fork();
                addTask1.fork();
            }
        }
    }

    /**
     * 不带返回值的ForkJoinPool
     */
    static class AddTaskRet extends RecursiveTask<Long> {
        private int start;

        private int end;

        public AddTaskRet(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            // 如果是在分组内,开始计算,否则再分组
            if (end - start <= MAX_SIZE) {
                long sum = 0L;
                for (int i = start; i < end; i++) {
                    sum += nums[i];
                }
                return sum;
            }
            int middle = start + (end - start) / 2;
            AddTaskRet addTask = new AddTaskRet(start, middle);
            AddTaskRet addTask1 = new AddTaskRet(middle, end);
            addTask.fork();
            addTask1.fork();
            return addTask.join() + addTask1.join();
        }
    }
}

6.parallelStream

java8 有一个并行流,底层就是ForkJoinPool算法来实现的。
你可以把集合里面的内容想像成一个个河流往外流,在流经过某个地方的时候处理一下。
举例:我们new 了一个1000000 数据的集合,然后判断这些数是不是质数,foreach是lamdba表达式的一个流式处理,还是相当于一个遍历循环。
但是parallelStream并行流是并行的来处理这个任务切分成一个个子任务,所以跟foreach会有一个时间上的差距。所以在互相之间线程不需要同步的时候,你可以用这种并行流来处理效率会高一些

package com.learn.thread.eight;

import com.google.common.collect.RangeMap;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class TestParallelStream {

    public static boolean isPrime(int num) {
        for (int i = 2; i < num/2; i++) {
            if (num % i ==0) {
                return false;
            }
        }
        return true;
    }
    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        Random random = new Random();
        for (int i = 0; i < 1000000; i++) {
            list.add(random.nextInt(1000000));
        }

        long start = System.currentTimeMillis();
        list.forEach(TestParallelStream::isPrime);
        long end = System.currentTimeMillis();
        System.out.println("ends" + (end - start));

        start = System.currentTimeMillis();
        list.parallelStream().forEach(TestParallelStream::isPrime);
        end = System.currentTimeMillis();
        System.out.println("ends" + (end - start));
    }
}

总结
线程池有两种ThreadPoolService\ForkJoinPool
区别在于ThreadPoolService多个线程共享一个任务队列,下面各个每个线程都有自己的任务队列

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容