1.fork/join示例
package com.zskj.concert.test.forkJoin;
import java.util.concurrent.RecursiveTask;
/**
* @author cw
* @date 2019/8/29
*/
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
private final long[] numbers;
private final int start;
private int end;
public static final long THRESHOLD = 10L;
public ForkJoinSumCalculator(long[] numbers) {
this(numbers,0,numbers.length);
}
public ForkJoinSumCalculator(long numbers[], int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
//负责求和
@Override
protected Long compute() {
int length =end-start;
if(length <=THRESHOLD){
return computeSequentaily();
}
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
leftTask.fork();
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
Long rightResult = rightTask.compute();
Long leftResult = leftTask.join();
return leftResult+rightResult;
}
//计算求和
private Long computeSequentaily() {
long sum = 0;
for (int i = 0; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
//java8调用示例
public static long forkJoinSum(long n){
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinSumCalculator task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
2使用分支/合并的最佳做法
- 对于一个任务调用join方法会阻塞调用方,直到该任务做出结果
- 不应该在ResucursiveTask内部使用ForkJoinPool的invoke方法。应该始终调用compute或者fork方法,只有顺序代码才应该用invoke来启动计算。
- 不应该认为并行流以及分支/合框架在多个处理器上使用比顺序计算快。
3.工作窃取技术
- 问题:理想情况下,划分并行任务时,应该让每个任务用完全相同的时间完成,让所有的CPU内核同样繁忙。但是实际中,每个子任务所花的时间并不相同,
- 分支/合并框架用一种称为工作窃取的技术来解决问题。
- 工作原理:每个线程都为分配给他的任务保存一个双向队列,每完成一个任务,就从队列头上取出下一个任务开始执行。有点可能早早完成,但是并没有闲下来。而是随机选取一个别的线程,从队列的尾部拿取一个任务,直到所有的线程都结。
- 一般来说,这种工作窃取算法用于池中的工作线程之间的重新分配和平衡任务。