ExecutorService的invokeAll方法无法抛出异常
最近在线上发现一个多线程查询有问题,始终查询不到数据,但是查看线上日志又未发现报错日志,经过排查发现是由于invokeAll方法将抛出的异常进行二次转换成ExecutionException,但是又没有对该异常进行处理导致
invokeAll方法介绍(不想看的可以直接跳过)
invokeAll方法接收一个Callable集合,并返回一个Future对象的列表,它可以安排多个任务在线程池中执行,然后等待这些任务都执行完成。invokeAll方法会阻塞当前线程,直到所有任务都完成,或者超时(如果提供),或者线程被中断,此时它将抛出InterruptedException异常。它允许任务之间存在依赖关系,即一个任务可以依赖于其他任务的结果。它还允许客户端获取每个任务的执行结果,这是通过返回的Future对象列表来实现的
验证
public class InvokeAllMethodTest {
public static void main(String[] args) {
// 自定义线程池
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory(), new AbortPolicy());
// 创建要执行的任务
List<Callable<Integer>> futures = Lists.newArrayList();
futures.add(() -> {
// 执行的业务逻辑出现异常
if (true) {
throw new RuntimeException("发生业务逻辑异常了");
}
return 1;
});
try {
threadPoolExecutor.invokeAll(futures);
System.out.println("=============运行结束===============");
} catch (InterruptedException e) {
System.out.println("=========发生中断异常了============");
throw new RuntimeException(e);
}catch (Exception e){
System.out.println("=========发生其他异常了============");
throw new RuntimeException(e);
}
}
}
运行结果截图
可以看出代码中已经明确的抛出了指定异常,但是在调用invokeAll方法后却并未将异常抛出
解释
进入AbstractExecutorService类的invokeAll方法
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
// 创建一个任务集合
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
// 任务是否完成的标识
boolean done = false;
try {
for (Callable<T> t : tasks) {
// 把Callable线程封装成了FutureTask
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
// 执行任务
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
/* 获取线程执行的结果,
这里也能保证所有的线程都执行完才会返回
这里的get方法也就是为什么异常被吞的真正的罪魁祸首,将我们抛出的异常转成了ExecutionException 异常
前面讲到f已经是FutureTask类型了,所以直接找到他的get方法
*/
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
下面通过debug查看源码的执行过程:
解决方案
如果想要批量执行任务,这里推荐使用CompletableFuture ,这里不做详细介绍,具体可以参考博客:CompletableFuture用法详解
改造后的代码:
public class InvokeAllMethodTest {
public static void main(String[] args) {
// 自定义线程池
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory(), new AbortPolicy());
// 创建要执行的任务
List<CompletableFuture<Integer>> tasks = Lists.newArrayList();
tasks.add(CompletableFuture.supplyAsync(()->{
System.out.println("===========任务1正常执行===========");
return 1;
}));
tasks.add(CompletableFuture.supplyAsync(()->{
// 执行的业务逻辑出现异常
if (true) {
throw new RuntimeException("任务2发生业务逻辑异常了");
}
return 1;
}));
// 执行,由于allOf方法的入参是可变参数,因此将参数通过toArray转成数组,通过join执行
CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
}
}
执行结果: