1.回顾
之前说的Executor作用是把线程的定义和执行分开,主要是用来做线程的执行接口,在他下面还有一个控制着线程生命周期的ExecutorService,然后才是各种各样的ThreadPoolExecutor,把线程池作为一个执行的单元,给他单独出一个类,下面是他的七个参数
corePoolSize 核心线程数
maxmumPoolSize 最大线程数
keepAliveTime 生存时间
TimeUnit 生存时间的单位
BlockingQueue 任务队列
**ThreadFactory **线程工厂
RejectStrategy 拒绝策略(Abort 抛异常 Discard扔掉 不抛异常 DiscardOldest 扔掉排队时间最久的,CallerRuns 调用处理者处理服务)
2.jdk自带线程池
今天我们来看看JDK给我们提供了一些默认线程池的实现,默认的常用的有哪些,然后来读读ThreadPoolExecutor源码
所有的线程池都是继承ExecutorService的,所以Executors是对线程执行的工具类,可以看做是线程的工厂,产生各种各样的线程池
2.1.SingleThreadPool
先来看第一个SingleThreadPool ,看这个名字就觉得只有一个线程,这个一个线程的线程池可以保证扔进去的任务是顺序执行的
package com.learn.thread.eight;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestSingleThreadPool {
private static ExecutorService service = Executors.newSingleThreadExecutor();
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
final int j = i;
// 顺序执行的
service.execute(() -> {
System.out.println(j + " " +Thread.currentThread().getName());
});
}
}
}
2.2.CacheThreadPool
我们来看第二种,看他源码实际上是跟之前的SingleThreadPool一样,底层是还是ThreadPoolExecutor
没有核心线程数,最大线程可以很Intger的最大值个,如果60秒没人理他,自动被回收
任务队列用的是SynchronousQueue 不是用来存数据的,用来传递消息的,如果任务没有被执行,就会被阻塞
用的是默认线程工厂
没有指定拒绝策略,用默认拒绝策略
可以看出CachePool的特点,就是你来一个任务我启动一个线程。启动线程的逻辑如下
如果线程没有被回收,就去看当前线程池的线程是不是有空闲的线程,如果有就执行让它去执行任务。如果没有,就自己new 一个线程去执行,原因是队列是SynchronousQueue ,它必须保证它的大小为0,所以你来一个任务必须有一个线程去执行,不然别的线程提交任务就统统阻塞了
来看这个小程序,首先将线程池service打印出来,最后又打印一遍线程池services,然后任务是睡眠500毫秒
package com.learn.thread.eight;
import com.sun.tools.internal.ws.wsdl.document.soap.SOAPUse;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author zglx
*/
public class TestCachePool {
private static ExecutorService service = Executors.newCachedThreadPool();
public static void main(String[] args) throws InterruptedException {
System.out.println(service);
for (int i = 0; i < 2; i++) {
// 顺序执行的
Thread.sleep(100);
service.execute(() -> {
try {
Thread.sleep(0);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println(service);
Thread.sleep(800);
System.out.println(service);
}
}
注意线程存活的时间是60S,所以第一个线程被复用了
2.3.FixedThreadPool
fixed是固定的含义,就是固定一个线程数。
fixedThreadPool指定一个参数,到底有多少线程,核心线程数和最大线程数是固定的,所以不存在回收之说,但是这里用的是LinkedBlockingQueue,这里一定要小心,因为是不建议使用的,因为是会造成内存泄漏
但是用fixedThreadPool有一个好处,可以进行并行的计算
并行与并发 并发是指任务的提交、并行是指任务执行,并行是并发的子集,并行是多个cpu可以同时进行处理,并发是多个任务同时过来。
我们看下面一个程序,用多线程计算1-200000中的质数,可以对这个区间进行分组
package com.learn.thread.eight;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class TestFixedThreadPool {
public static void main(String[] args) throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
List<Integer> list = getPrime(1,200000);
list.forEach(System.out::println);
long end = System.currentTimeMillis();
System.out.println("time " + (end - start));
start = System.currentTimeMillis();
int num = 4;
ExecutorService service = Executors.newFixedThreadPool(num);
Future<List<Integer>> future1 = service.submit(new Mytask(1, 80000));
Future<List<Integer>> future2 = service.submit(new Mytask(80001, 160000));
Future<List<Integer>> future3 = service.submit(new Mytask(160001, 200000));
future2.get().addAll(future3.get());
future1.get().addAll(future2.get());
future1.get().forEach(System.out::println);
end = System.currentTimeMillis();
System.out.println("time " + (end - start));
}
public static List<Integer> getPrime(Integer start, Integer end) {
List<Integer> list = new ArrayList<>(100);
for (Integer i = start; i <= end; i++) {
if (isPrime(i)) {
list.add(i);
}
}
return list;
}
public static boolean isPrime(Integer num) {
for (int i = 2; i <= num/2; i++) {
if (num % i == 0) {
return false;
}
}
return true;
}
static class Mytask implements Callable<List<Integer>> {
int start;
int end;
public Mytask() {
}
public Mytask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
public List<Integer> call() throws Exception {
List<Integer> result = getPrime(start, end);
return result;
}
}
}
2.4.cacahe vs fixed
什么时候用cache 什么时候用fixed ,这得看你的业务量,如果线程池的线程太多,他们就会竞争稀缺的处理器和内存资源,浪费大量的时候在上下文切换,反之如果线程太少,处理器就可能无法充分利用。
建议:线程池大小与处理器的利用率之比可以使用公式来进行估算
线程池 = 你有多少个CPU乘以cpu期望利用率 乘以 (1+W/C)W除C是等待时间与计算时间的比率
表达式为
Nthread = Ncpu * Ucpu * (1+W/C)
如果你的任务不确定是否平稳,但是要保证任务来的时候有线程去执行,那我们就可以用cache,当然你要保证这个任务不会堆积。
假如你大概估了线程的值,这个值完全可以处理任务,我可以直接New 一个线程来执行,那就用fixed,但是阿里不建议这么使用
2.5.ScheduledThreadPool
定时任务线程池,就是一个定时器任务,隔一段时间后执行,这个就是我们专门用来执行定时任务的一个线程池。
这里super调用的是ThreadPoolExecutor 本质上还是ThreadPoolExecutor,它最大线程数也是Integer的最大值,用的队列是DelayedWorkQueue。
它有一些好用的方法,比如scheduleAtFixedRate间隔多长时间在一个固定频率执行一次这个任务,可用通过这样灵活的时间配置。
第一个参数是Delay,第一次执行任务在此之后多长时间
第二个参数period间隔多长时间
第三个参数是时间单位
package com.learn.thread.eight;
import com.learn.thread.five.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TestScheduledPool {
private static ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
public static void main(String[] args) {
// 5秒打印一次线程名字
service.scheduleAtFixedRate(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}, 0, 5, TimeUnit.SECONDS);
}
}
假如有一个闹钟服务,假如有十亿人订阅了这个闹钟,意味着,每天早上会有10亿的并发量,你怎么优化
思想就是把这个定时的任务分发到很多个边缘的服务器上,假如说你有一台服务器,你也是要用到多线程去消费,总之就是一个分而治之的思想
SingleThreadPool 只有一个线程的线程池
FixedThreadPool 固定多少个线程
CacheThreadPool 有弹性的线程池,有一个启动一个,只要没有空闲的就启动一个新的来执行
ScheduleThreadPool 定时任务来执行线程池
package com.learn.thread.eight;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
/**
* 自定义线程池的拒绝策略
*/
public class TestRejectedHandler {
private static ThreadFactory factory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread("t1");
}
};
private static ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),factory, new MyHandler());
static class MyHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("asdasd");
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
3.ThreadPoolExecutor
3.1.常用变量的解释
// AtomicInteger是int类型,是32位。高3位代表线程状态,低29位代表目前线程池有多少个线程数量,这里把两个值合二为一就是算了执行效率更高一些,因为都需要线程同步,而同步一个值往往比同步一个值容易的多
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer.size为32,所以COUNT_BITS为29
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY 线程允许的最大线程数,1左移29位,然后减1,就是2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// (线程有五种状态按大小排序为 RUNNING -> SHUTDOWN -> STOP -> TIDYING -> TERMINATED)
// 正常运行的
private static final int RUNNING = -1 << COUNT_BITS;
// 调用shutdown方法进去了shutdown状态
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 调用shutdown马上停止
private static final int STOP = 1 << COUNT_BITS;
// 调用了shutdown然后这个线程也执行完了,现在正在处理的过程叫做TIDYING
private static final int TIDYING = 2 << COUNT_BITS;
// 整个线程全部结束
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取线程池的状态 通过按位与操作,低29位将全部变成0
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取线程的数量,通过按位与操作,高3位全部变成0
private static int workerCountOf(int c) { return c & CAPACITY; }
// 根据线程池状态和线程池的线程数量生成ct1值
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 线程池状态小于xx
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 线程池状态大于等于xx
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
3.2.构造方法
// 构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 基本类型参数校验
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// 空指针校验
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// 安全管理器
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
// 核心线程数
this.corePoolSize = corePoolSize;
// 最大线程池数
this.maximumPoolSize = maximumPoolSize;
//
this.workQueue = workQueue;
// 根据传入参数unit和keepAliveTime 将存活时间转换为纳秒存到变量keepAliveTime中
this.keepAliveTime = unit.toNanos(keepAliveTime);
// 线程工厂
this.threadFactory = threadFactory;
// 策略
this.handler = handler;
}
3.3. 提交执行task的过程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1.判断线程池活着的那些线程数是不是小于核心线程数,如果小于就addWorker添加一个线程。
if (workerCountOf(c) < corePoolSize) {
// addWorker创建线程,第二个参数表示是否创建核心线程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 这里处理当前线程数超过核心线程数的逻辑
// 2.先判断当前线程池状态,如果是在运行中,就把任务丢到队列里边去,否则拒绝任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 这里需要双重判断线程池的状态,因为这阻塞的过程中有可能线程池的状态被改变了,
// 如果不是Running状态,说明线程池执行了shutdown操作,就删除此任务,并且拒绝
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果工作状态的线程为0,说明没有线程了或者核心线程数设置为0了,就添加非核心线程数
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.如果线程不是运行状态,那就任务进入队列,这里有3点需要注意
// 线程池不是运行状态,addWorker内部会判断线程池的状态
// 第二个参数表示是否创建核心线程
// addWorker返回false说明任务执行失败,需要拒绝任务
else if (!addWorker(command, false))
reject(command);
}
3.4. addWorker源码分析
addWorker涉及到了很多细节,如果要读懂每一个细节完全不必要,但是思想理解就行了,addWorker的意思就是添加线程,线程要存到容器里,往里头添加线程任务的时候肯定是多个线程同时往里面扔的,所以一定要同步,但是追求效率,一般都是用自旋或者lock
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 两层死循环就为了做一个事情,添加一个woker的数量加1
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判断线程池状态满足以下条件,就返回
// 1、线程池状态大于SHUTDOWN
// 2、线程池状态等于SHUTDOWN并且firstTask执行任务不为null,直接返回false
// 3、线程池状态等于SHUTDOWN,且队列为空,直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 嵌套死循环
for (;;) {
int wc = workerCountOf(c);
// 如果当前线程超过最大允许线程数,或者根据core状态,大于核心线程或者最大线程数,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试把当前执行线程数+1,如果+1成功,打破循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 重新获取当前线程池状态
c = ctl.get();
// 如果当前执行线程数不等于之前读取的数量,说明有别的线程加!成功了
if (runStateOf(c) != rs)
// 重试
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 下面的逻辑是创建一个线程去执行任务
// 是否执行任务状态,true执行成功
boolean workerStarted = false;
// 判断是否假如addWorker状态 add加入成功
boolean workerAdded = false;
Worker w = null;
try {
// 创建一个Worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// worker的添加必须是串行的,因此必须加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 重新检查当前线程池状态,查询当前执行任务的数量
int rs = runStateOf(ctl.get());
// 如果rs小于SHUTDOWN,或者rs为SHUTDOWN并且firstTask为null
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// workder已经调用过了start方法,则不再创建worker,抛出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// worker创建并添加workers成功
workers.add(w);
// 更新largestPoolSize变量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
// 启动worker线程
if (workerAdded) {
// 这里会调用worker的run方法,
// 实际上是调用执行runWorker方法
t.start();
workerStarted = true;
}
}
} finally {
// worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行), 需要进行shutdown相关操作
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
3.5.线程池worker任务单元
来看看worker是个什么东西,看源码是一个静态内部类Worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
// 1. 线程
final Thread thread;
// 2. 任务
Runnable firstTask;
// 3. 执行次数
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// 线程池执行的核心方法
public void run() {
runWorker(this);
}
}
3.6.核心线程执行逻辑runWorker
这里有一个有意思的地方可以看到Worker 是实现了Runnable,它自己也有一个Runnbale和Thread,你就把Worker当成一个工人,工人有任务(Runnbale)和执行能力(Thread),但是你得保证每一个工人执行的任务是自己的,并且自己执行完了以后,completedTasks要加1
所以当多线程添加任务的时候,把当前线程复制一份给Thread和任务下发给工人的Runnbale,然后让工人去lock,工人的lock 其实就是之前学过ReentrantLock的acquire方法,加入链表,等待执行,这样子就完成了整个串行。
final void runWorker(Worker w) {
// 1. 这里是用来自己实现方法的beforeExecute,自己实现内容
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 这里调用了unlock,其实也就调用了release(1),执行过程中允许线程被中断
w.unlock(); // allow interrupts
// 判断是否继续自旋
boolean completedAbruptly = true;
try {
// 这里判断任务是否为存在或者队列中的任务不为空,注意如果从队列取就会造成阻塞
while (task != null || (task = getTask()) != null) {
// CAS加锁
w.lock();
// 判断当前线程池状态是否为停止,如果停止了,则中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 这里用来扩展功能,但是没用到
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 这个工人开始执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// gc回收
task = null;
// 任务加1
w.completedTasks++;
// 释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 自旋操作退出,说明线程池结束
processWorkerExit(w, completedAbruptly);
}
}
4.woker类总结
这个woker是一个Thread 也是一个Runnable 同样也是一个AQS,先来说说为什么要有Runnable和Thread
4.1.为什么要有Runnable?
这里其实是用来记录任务的,因为Woker 里边有很多状态需要根据当前任务去记录,并且这个东西必须要在Thread中执行
4.2.为什么要有Thread?
因为线程池有很多Woker去竞争,所以干脆就把Woker设计成AQS,一个线程处理一个当前任务,而不是说有其他worker执行了不是自己的任务
4.3.submit和execute的区别是什么
方法定义的位置不同,execute是在Executor执行器中,而submit是在ExecutorService执行服务中的
参数接收不同,execute接收的是Runnbale ,submit接收的是Callable
作用不用,execute只是单纯的执行任务,submit可以把任务的结果带出来。
4.4.线程池大概执行流程
核心线程数不够,启动核心线程
核心线程满了,加入队列
核心线程和队列都满了,addWorker 加入非核心线程
4.5.addWorker 做的事情
count 加1
真正的加入任务并且start
WorkStealingPool
WorkStealingPool 是另外一种线程,核心非常简单,之前讲的ThreadPoolExecutor线程是去一个任务的队列里取任务。而WorkStealingPool是每一个线程都有自己的任务队列,当一个线程执行完以后,会在别的线程任务队列中偷任务
跟原来只有一个队列的线程池相比,如果有某一个线程被占用了很长时间,然后任务队列又特别的重,那其他线程只能空着,没办法帮到任务重的线程
源码
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
WorkStealingPool本质上是一个ForkJoinPool
package com.learn.thread.eight;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class TestWorkStealingPool {
private static final ExecutorService service = Executors.newWorkStealingPool();
public static void main(String[] args) throws IOException {
System.out.println(Runtime.getRuntime().availableProcessors());
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
System.in.read();
}
static class R implements Runnable {
int time;
R(int time) {
this.time = time;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(time + " " + Thread.currentThread().getName());
}
}
}
5.ForkJoinPool
ForkJoinPool 适合把大任务切分成一个个小任务去执行,小任务如果还是大,再切,不一定是两个,可以是多个,但是最终的结果就是多个小任务结果的汇总。这个过程就叫做join ,所以这种线程池叫做ForkJoinPool。
既然是可以分割的任务,那怎么定义任务呢,之前线程池执行的任务就是Runnable。在这里,我们一般实现ForkJoinPool的时候需要定义成特定他的类型,这个类型又必须是可以分叉的任务,这个任务就叫做ForkJoinTask,但是实际上这个ForkJoinTask又比较原始,我们可以用RecursiveAction,这里边又有两种。
第一个RecursiveAction递归,称为不带返回值的任务,因为我们可以把大任务分割成小任务,小任务又可以分成小任务,一直到我满意的条件为止,这其中就带有递归的过程。等会来看看一个例子,所以我要对一百万个数进行求和,单线程肯定很慢。
第二个RecursiveTask,叫做带返回值的子任务
package com.learn.thread.eight;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
public class TestForkJoinPool {
private static int[] nums = new int[1000000];
// 按5万个一组,进行分组
private static int MAX_SIZE = 500000;
static Random random = new Random();
static {
for (int i = 0; i < 1000000; i++) {
nums[i] = random.nextInt(100);
}
System.out.println("-----" + Arrays.stream(nums).sum());
}
public static void main(String[] args) throws IOException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 无返回参数子任务
AddTask addTask = new AddTask(0, nums.length);
// forkJoinPool.execute(addTask);
AddTaskRet addTaskRet = new AddTaskRet(0, nums.length);
forkJoinPool.invoke(addTaskRet);
System.out.println(addTaskRet.join());
System.in.read();
}
/**
* 不带返回值的ForkJoinPool
*/
static class AddTask extends RecursiveAction {
private int start;
private int end;
public AddTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
// 如果是在分组内,开始计算,否则再分组
if (end - start <= MAX_SIZE) {
long sum = 0L;
for (int i = start; i < end; i++) {
sum += nums[i];
}
System.out.println("from start" + start + "to end " + end + "sum =" + sum);
} else {
int middle = start + (end - start) / 2;
AddTask addTask = new AddTask(start, middle);
AddTask addTask1 = new AddTask(middle, end);
addTask.fork();
addTask1.fork();
}
}
}
/**
* 不带返回值的ForkJoinPool
*/
static class AddTaskRet extends RecursiveTask<Long> {
private int start;
private int end;
public AddTaskRet(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// 如果是在分组内,开始计算,否则再分组
if (end - start <= MAX_SIZE) {
long sum = 0L;
for (int i = start; i < end; i++) {
sum += nums[i];
}
return sum;
}
int middle = start + (end - start) / 2;
AddTaskRet addTask = new AddTaskRet(start, middle);
AddTaskRet addTask1 = new AddTaskRet(middle, end);
addTask.fork();
addTask1.fork();
return addTask.join() + addTask1.join();
}
}
}
6.parallelStream
java8 有一个并行流,底层就是ForkJoinPool算法来实现的。
你可以把集合里面的内容想像成一个个河流往外流,在流经过某个地方的时候处理一下。
举例:我们new 了一个1000000 数据的集合,然后判断这些数是不是质数,foreach是lamdba表达式的一个流式处理,还是相当于一个遍历循环。
但是parallelStream并行流是并行的来处理这个任务切分成一个个子任务,所以跟foreach会有一个时间上的差距。所以在互相之间线程不需要同步的时候,你可以用这种并行流来处理效率会高一些
package com.learn.thread.eight;
import com.google.common.collect.RangeMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class TestParallelStream {
public static boolean isPrime(int num) {
for (int i = 2; i < num/2; i++) {
if (num % i ==0) {
return false;
}
}
return true;
}
public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
Random random = new Random();
for (int i = 0; i < 1000000; i++) {
list.add(random.nextInt(1000000));
}
long start = System.currentTimeMillis();
list.forEach(TestParallelStream::isPrime);
long end = System.currentTimeMillis();
System.out.println("ends" + (end - start));
start = System.currentTimeMillis();
list.parallelStream().forEach(TestParallelStream::isPrime);
end = System.currentTimeMillis();
System.out.println("ends" + (end - start));
}
}
总结
线程池有两种ThreadPoolService\ForkJoinPool
区别在于ThreadPoolService多个线程共享一个任务队列,下面各个每个线程都有自己的任务队列