流式编程基础
如下代码我们首先创建了一个 list,然后从 list 上获取流对象,并使用 foreach 进行遍历:
@Test
public void StreamTest() throws IOException {
// 1.创建list
ArrayList<String> arrayList = new ArrayList<>();
for (int i = 0; i < 100; ++i) {
arrayList.add(i + "");
}
arrayList.stream().forEach(e -> System.out.println(Thread.currentThread().getName() + " " + e));
}
运行上面代码,输出为:
main 0
main 1
main 2
main 3
main 4
...
上面打印元素使用的 main 线程顺序进行的,大家都知道我们可以把流转换为并行流,代码如下:
@Test
public void parallelStreamTest() throws IOException {
// 1.创建list
ArrayList<String> arrayList = new ArrayList<>();
for (int i = 0; i < 100; ++i) {
arrayList.add(i + "");
}
arrayList.parallelStream().forEach(e -> System.out.println(Thread.currentThread().getName() + " " + e));
}
运行上面代码输出如下:
ForkJoinPool.commonPool-worker-6 94
main 73
main 74
ForkJoinPool.commonPool-worker-6 95
ForkJoinPool.commonPool-worker-1 39
ForkJoinPool.commonPool-worker-3 69
ForkJoinPool.commonPool-worker-3 70
...
上面代码则是使用 ForkJoinPool 的 common 线程池与 main 线程并行输出的,另外我们知道貌似无法对流式的并行处理的线程池进行定制,其内部使用的是整个 JVM 内唯一的 common 线程池。 真的是这样吗?
猜执行结果
上面我们介绍了流式编程的并行流,下面请看下面代码输出时候,打印的线程名称是什么:
@Test
public void forkPoolTest() {
//代码示例1
ForkJoinPool pool = new ForkJoinPool(3);
// 1.创建list
ArrayList<String> arrayList = new ArrayList<>();
for (int i = 0; i < 100; ++i) {
arrayList.add(i + "");
}
pool.submit(() -> arrayList.parallelStream().forEach(e -> {
System.out.println(Thread.currentThread().getName() + " " + e);
})).join();
System.out.println("Main is over");
}
阅读上面代码,我们可以看到 main 线程向 forkjoin 线程池里面添加了一个任务,然后阻塞等待任务的完成,然后打印输出 Main is over。
那么这与不提交任务到线程池,而是直接执行,如下代码,看起来没啥区别:
@Test
public void parallelStreamTest() throws IOException {
// 1.创建list
ArrayList<String> arrayList = new ArrayList<>();
for (int i = 0; i < 100; ++i) {
arrayList.add(i + "");
}
arrayList.parallelStream().forEach(e -> System.out.println(Thread.currentThread().getName() + " " + e));
}
如上代码,我们也是在 main 线程等待打印任务执行完毕后,在输出 Main is over。
其实不然,在原来的代码示例 1 中,我们创建了一个名称为 pool 的线程池,然后向其中提交了一个任务。 pool 中提交任务后,会创建一个 ForkJoinWorkerThread 类型的线程,来执行我们提交的任务,也就是执行:
arrayList.stream().forEach(e -> System.out.println(Thread.currentThread().getName() + " " + e));
运行上面代码,按理说是 ForkJoinPool 中的 common 线程池线程并行,执行打印输出。但是运行后,你会发现打印任务的线程却是我们自己创建的 pool 中的线程,也就是我们使用自己创建的 pool 替代了并行流默认的 ForkJoinPool 中的 common 线程池。
究其原因是当我们调用并行流的 forEach 方法时候,会调用到 ForkJoinTask 的 fork 方法进行子任务切分:
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
由于调用 forEach 的是我们自己创建的 pool 里面的线程(其是 ForkJoinWorkerThread 类型的),所以会把切分的任务添加到我们调用线程所在的队列里面,而不是添加到了 common 线程池里面。