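I kept running into code like the following in concurrent programs without fully understanding join, so before getting to thread pools, here is a short note on the join method: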
p1.start();
p2.start();
p1.join();
p2.join();
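First, look at the Javadoc of the join method. Its summary reads: "Waits for this thread to die."
So join can be understood as follows: when the main thread calls p1.join(), the main thread blocks until p1 has finished running (waits for this thread to die), and only then continues. It is the calling thread that waits, not p1. In the snippet above, the main thread waits first for p1 and then for p2. Because both threads were started before either join, p1 and p2 still run concurrently.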
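A minimal runnable sketch of the start-then-join pattern above, assuming a hypothetical JoinDemo class whose two workers simply append a message to a shared list. Because the main thread joins both workers before adding its own entry, "main continues" is always last, while the order of the two worker entries can vary.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class illustrating the p1/p2 start-then-join pattern
class JoinDemo {
    static List<String> runWorkers() {
        // Synchronized list so that both workers can append to it safely
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        Thread p1 = new Thread(() -> events.add("p1 done"));
        Thread p2 = new Thread(() -> events.add("p2 done"));
        p1.start();
        p2.start(); // p1 and p2 now run concurrently
        try {
            p1.join(); // the calling thread blocks until p1 terminates
            p2.join(); // ...and then until p2 terminates
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("interrupted while joining", e);
        }
        // Both workers have terminated, so this entry is always the last one
        events.add("main continues");
        return events;
    }

    public static void main(String[] args) {
        // The order of "p1 done" and "p2 done" may vary between runs
        System.out.println(runWorkers());
    }
}
```

Removing the two join calls would let "main continues" appear before either worker's entry.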
Back to today's topic: thread pools. Let's start with the Java source of the top-level interface, Executor:
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
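- The interface has a single method, execute, which accepts a Runnable task;
- Compared with the earlier new Thread(Runnable r).start() approach, it hides the details of how a task is executed and how threads are created and used: the task may run in a new thread, in a pooled thread, or in the calling thread itself, depending on the concrete Executor implementation.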
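To make the "depends on the implementation" point concrete, here is a small sketch of two Executor implementations modeled on the examples in the Executor Javadoc (DirectExecutor runs the task in the calling thread, ThreadPerTaskExecutor starts a new thread per task). The ExecutorDemo class and its runnerName helper are hypothetical, added only to report which thread actually ran the task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

// Runs each task synchronously in the thread that calls execute()
class DirectExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}

// Starts a brand-new thread for every task
class ThreadPerTaskExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        new Thread(command).start();
    }
}

// Hypothetical helper class for the demo
class ExecutorDemo {
    // Returns the name of the thread that actually ran the task
    static String runnerName(Executor executor) {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await(); // wait until the task has run, wherever it ran
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name.get();
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        System.out.println(caller + " -> " + runnerName(new DirectExecutor()));        // same thread
        System.out.println(caller + " -> " + runnerName(new ThreadPerTaskExecutor())); // a new thread
    }
}
```

The caller code is identical in both cases; only the Executor passed in decides where the task runs.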
The ExecutorService interface extends Executor and adds more methods, including one that submits a task and immediately returns a Future object:
<T> Future<T> submit(Callable<T> task);
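Future's get method waits for the computation to finish and then returns its result. The overload below waits at most the given time and throws a TimeoutException if the result is not ready by then: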
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
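A minimal sketch of submit plus the timed get, assuming a hypothetical FutureDemo.compute helper whose Callable just sleeps for taskMillis and then returns "result". If the result is not ready within timeoutMillis, the helper catches TimeoutException and returns "timed out" instead.

```java
import java.util.concurrent.*;

// Hypothetical demo class; compute() and its return strings are illustrative only
class FutureDemo {
    // Submits a Callable that takes about taskMillis ms, then waits at most
    // timeoutMillis ms for its result via Future.get(timeout, unit)
    static String compute(long taskMillis, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // submit() returns a Future immediately; the Callable runs on the pool thread
            Future<String> future = pool.submit(() -> {
                Thread.sleep(taskMillis); // simulate a slow computation
                return "result";
            });
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out"; // the result was not ready within timeoutMillis
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // The Callable threw; its exception is available as the cause
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow(); // interrupts the task if it is still running
        }
    }

    public static void main(String[] args) {
        System.out.println(compute(10, 2000)); // result
        System.out.println(compute(2000, 50)); // timed out
    }
}
```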
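Executor and ExecutorService are both interfaces. The Executors and ThreadPoolExecutor classes discussed next are what we use to create thread pools: Executors provides factory methods for various preconfigured thread pools, and ThreadPoolExecutor, the implementation most of those factory methods return, lets us configure a custom thread pool.
First, the factory methods in Executors that create preconfigured thread pools: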
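- newFixedThreadPool: creates a pool with a fixed number of reusable threads. Its blocking queue (LinkedBlockingQueue) is unbounded, so a large backlog of tasks can cause an out-of-memory error.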
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
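- newSingleThreadExecutor(): a pool with exactly one worker thread, so tasks execute sequentially; it also uses an unbounded LinkedBlockingQueue.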
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
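- newCachedThreadPool(): creates new threads as needed and reuses previously created threads when they are idle; a thread that stays idle for more than 60 seconds is terminated.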
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
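A small sketch that checks two of the claims above for newFixedThreadPool: threads are reused (100 tasks on a 2-thread pool are run by at most 2 distinct threads), and the work queue is unbounded. The FixedPoolDemo class and its methods are hypothetical, and the cast in queueCapacity relies on newFixedThreadPool returning a ThreadPoolExecutor, as in the source shown above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; method names are illustrative only
class FixedPoolDemo {
    // Runs `tasks` tiny tasks on a fixed pool of `threads` threads and returns
    // the distinct names of the worker threads that executed them
    static Set<String> workerNames(int threads, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }

    // Remaining capacity of the work queue of a fresh fixed pool.
    // Assumes newFixedThreadPool returns a ThreadPoolExecutor (implementation detail).
    static int queueCapacity(int threads) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads);
        try {
            return pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerNames(2, 100)); // at most 2 distinct names
        System.out.println(queueCapacity(2) == Integer.MAX_VALUE); // true
    }
}
```

A remaining capacity of Integer.MAX_VALUE is what "unbounded" means in practice for the default LinkedBlockingQueue.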
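As you can see, Executors offers ready-made thread pools for different scenarios, but they carry risks: the fixed and single-thread pools queue tasks without bound, and the cached pool can grow to Integer.MAX_VALUE threads, so under heavy load either kind can exhaust memory. These pools may also simply not fit our needs. In that case we can call the ThreadPoolExecutor constructor directly and build a pool that fits, controlling its parameters ourselves (the full constructor takes seven):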
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
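- corePoolSize: the number of threads kept in the pool even when they are idle (unless allowCoreThreadTimeOut is set);
- maximumPoolSize: the maximum number of threads allowed in the pool;
- keepAliveTime: when the pool has more threads than corePoolSize, an idle thread that waits longer than keepAliveTime for a new task is terminated;
- unit: the time unit of keepAliveTime;
- workQueue: the queue that holds tasks waiting to be executed;
- threadFactory: the factory used to create new threads; it wraps a Runnable task in a new Thread and returns it;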
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
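- RejectedExecutionHandler handler: the rejection policy applied when a task cannot be accepted (for example, when the queue is full and the pool already has maximumPoolSize threads):
AbortPolicy: rejects the task by throwing a RejectedExecutionException (this is ThreadPoolExecutor's default handler)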
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler
CallerRunsPolicy: runs the rejected task in the thread that called execute
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler
DiscardOldestPolicy: discards the oldest unhandled task and then retries execute
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler
DiscardPolicy: silently discards the rejected task
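A sketch that puts all seven constructor parameters together and makes rejection observable. The sizes (1 core thread, at most 2 threads, a queue of 1) and the CustomPoolDemo class are hypothetical choices for illustration. Each task blocks until released, so the first task gets the core thread, the second waits in the queue, the third triggers a non-core thread, and every further task is rejected by AbortPolicy.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the sizes below are chosen only to make rejection easy to observe
class CustomPoolDemo {
    static ThreadPoolExecutor newPool(RejectedExecutionHandler handler) {
        AtomicInteger counter = new AtomicInteger();
        // ThreadFactory: wraps each Runnable in a new, descriptively named Thread
        ThreadFactory factory = r -> new Thread(r, "worker-" + counter.incrementAndGet());
        return new ThreadPoolExecutor(
                1,                            // corePoolSize
                2,                            // maximumPoolSize
                30L, TimeUnit.SECONDS,        // keepAliveTime + unit for the non-core thread
                new ArrayBlockingQueue<>(1),  // bounded workQueue holding one waiting task
                factory,                      // threadFactory
                handler);                     // rejection policy
    }

    // Submits `tasks` tasks that block until released and returns how many were rejected.
    // Capacity is maximumPoolSize (2 running) + queue size (1 waiting) = 3 tasks.
    static int countRejected(int tasks) {
        ThreadPoolExecutor pool = newPool(new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep this worker busy until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // AbortPolicy throws when threads and queue are both full
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(countRejected(5)); // 2
    }
}
```

Note the order in which ThreadPoolExecutor grows: it fills the core threads first, then the queue, and only creates threads beyond corePoolSize once the queue is full. With an unbounded queue, maximumPoolSize is therefore never reached. CallerRunsPolicy is deliberately not used in this demo: with tasks that block on the latch, running one in the caller would block the submitting loop forever.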
Enough theory; it is better to try it in practice. Below is a word count implemented with a thread pool created by Executors:
package com.github.hcsp.multithread;

import java.io.File;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

public class MultiThreadWordCount1 {
    // Count the occurrences of each word in the given files concurrently, using threadNum threads
    // Uses a preconfigured thread pool from Executors
    public static Map<String, Integer> count(int threadNum, List<File> files) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(threadNum);
        try {
            // Submit one task per file; each submit returns a Future immediately
            List<Future<Map<String, Integer>>> futures = new ArrayList<>();
            for (File file : files) {
                futures.add(threadPool.submit(new WordCount(file)));
            }
            // Merge the partial results; future.get() blocks until that file has been counted
            Map<String, Integer> res = new HashMap<>();
            for (Future<Map<String, Integer>> future : futures) {
                for (Map.Entry<String, Integer> entry : future.get().entrySet()) {
                    res.merge(entry.getKey(), entry.getValue(), Integer::sum);
                }
            }
            return res;
        } finally {
            // Release the pool; otherwise its non-daemon worker threads keep the JVM alive
            threadPool.shutdown();
        }
    }

    // Counts the words of a single file
    static class WordCount implements Callable<Map<String, Integer>> {
        private final File file;

        WordCount(File file) {
            this.file = file;
        }

        @Override
        public Map<String, Integer> call() throws Exception {
            Map<String, Integer> res = new HashMap<>();
            for (String line : Files.readAllLines(file.toPath())) {
                for (String word : line.trim().split("\\s+")) {
                    // Skip the empty token produced by blank lines
                    if (!word.isEmpty()) {
                        res.merge(word, 1, Integer::sum);
                    }
                }
            }
            return res;
        }
    }
}
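Following the advice above, the same word count can also be built on a hand-configured ThreadPoolExecutor. The sketch below is a hypothetical variant, not the original author's code: it uses a bounded ArrayBlockingQueue (sized 2 × threadNum, an arbitrary choice) with CallerRunsPolicy so overflow tasks run in the caller instead of piling up, and it submits all tasks at once with invokeAll. To stay self-contained it takes each "file" as an in-memory String rather than a File.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

// Hypothetical variant of count(); each String stands in for one file's content
class MultiThreadWordCountSketch {
    static Map<String, Integer> count(int threadNum, List<String> texts) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threadNum, threadNum,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(threadNum * 2),     // bounded, unlike newFixedThreadPool
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()); // overflow runs in the caller
        try {
            List<Callable<Map<String, Integer>>> tasks = new ArrayList<>();
            for (String text : texts) {
                tasks.add(() -> countWords(text));
            }
            Map<String, Integer> total = new HashMap<>();
            // invokeAll blocks until every task has completed
            for (Future<Map<String, Integer>> f : pool.invokeAll(tasks)) {
                f.get().forEach((word, n) -> total.merge(word, n, Integer::sum));
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // Counts whitespace-separated words, skipping the empty token of blank input
    static Map<String, Integer> countWords(String text) {
        Map<String, Integer> res = new HashMap<>();
        for (String word : text.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                res.merge(word, 1, Integer::sum);
            }
        }
        return res;
    }

    public static void main(String[] args) {
        System.out.println(count(2, List.of("a b a", "b c", "", "  a  ")));
    }
}
```

CallerRunsPolicy also acts as simple back-pressure: while the caller is busy running an overflow task, it cannot submit more.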