这篇文章介绍了CompletableFuture 类的功能和一些使用实例。在我们介绍开始之前,先来了解一下这个类的背景。在JAVA中,一个异步任务的调用可以使用Threads。然而,为了获得最佳性能,需要仔细规划业务流程中的各个步骤的编排,这对于不了解JAVA整个并发体系的人来说,非常容易出错。如果JAVA提供了一个即用的容器来连接一系列任务,并且能为任务的运行提供并发性但是却不用编写复杂的多线程代码呢?CompletableFuture就是这样一个别致的小东西。
创建CompletableFuture对象。
我们可以直接new一个对象出来,也可以使用CompletableFuture为我们提供的静态方法。
注意:这种方法直接new出来的CompletableFuture对象是无法运行的,因为他并没有处于一个“完成”状态,也就是说你调用get()方法是会被阻塞的。
CompletableFuture futrue = new CompletableFuture();
推荐下面这种方法,使用CompletableFuture的静态方法completedFuture(U value)。直接会拿到一个“完成”状态的对象,当用get()方法拿值时,你拿到的值也就是value。
String expectedValue = "the expected value";
CompletableFuture<String> alreadyCompleted = CompletableFuture.completedFuture(expectedValue);
assertThat(alreadyCompleted.get(), is(expectedValue));
开始运行我们的第一个Task
- runAsync(Runnable)
- runAsync(Runnable,Executor)
- supplyAsync(Supplier<U>)
- supplyAsync(Supplier<U>,Executor)
这里有2种方法来初始化我们的异步任务——使用runAsync()或者supplyAsnync()。先上一段代码:
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
System.out.printf("[%s] I am Cool\n", Thread.currentThread().getName());
});
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.printf("[%s] Am Awesome\n", Thread.currentThread().getName());
return null;
});
打印结果:
[ForkJoinPool.commonPool-worker-3] I am Cool
[ForkJoinPool.commonPool-worker-3] Am Awesome
从上面的例子中可以看出,有2种初始化CompletableFuture对象并运行我们的异步任务的方法。使用runAsync()和supplyAsync()。可以很容易的看出来两者之间的差别,supplyAsync()有返回值,这个返回值可以用于被下一个任务链结点所消费,后面我们会讲到。除此之外,上述方法还提供了重载方法,当我们传入Executor时,该Task会使用传入的Executor去执行,否则默认去执行任务的线程池就是fork-join thread pool,关于该线程池,暂不赘述。
PS:我个人觉得supplyAsync方法中传入Callable比传入Supplier更合适。它们俩都是函数式接口,但是Callable和异步任务的联系更紧,并且可以抛出非运行时异常。
构造任务链
上述方法中我们只是异步的去执行了一个任务,如果我们想拿到这个任务的执行结果,并执行后面的任务呢?或者当该任务运行抛出异常时我们想来处理这些异常时呢(后面讲)?
CompletableFuture为我们提供了几十种方法来构造任务链,这些任务链的构造过程类似于Stream.map()方法,下面将结合实例详细讲解。
-
thenAccept(Consumer<T>)
该方法是非静态方法,是用来消费上一个任务运行之后的结点的返回值的。可以将该方法中的Consumer参数视为上一个任务在完成时要调用的回调函数。
Tip:该任务有两种方式来执行。第一种是如果当前线程调用thenAccept方法时,上一个任务还没执行完成时(这并不影响任务链执行的顺序性,因为这些任务都被存放在一个容器中),这时候调用此任务的线程就是上一个执行上任务的线程(ForkJoinPool-Thread);第二种是如果当前线程调用该方法时,上一个任务执行完成,这时候调用到了thenAccept方法,那么此任务会被调用thenAccept方法的线程(Main)所调用
请看下面的例子:
while (true) {
{
CompletableFuture cf = CompletableFuture.supplyAsync(() -> "I am Cool").thenAccept(msg ->
System.out.printf("[%s] %s and am also Awesome\n", Thread.currentThread().getName(), msg));
try {
cf.get();
} catch (Exception ex) {
ex.printStackTrace(System.err);
}
}
}
可以看到有多个运行结果:
[ForkJoinPool.commonPool-worker-2] I am Cool and am also Awesome
[main] I am Cool and am also Awesome
-
thenApply(Function<T, U>)
该方法接收一个Function参数,T类型是上一个结点传入,需要被消费的值,U类型是生成的,会被输出的值。此Function会在上一个任务执行结束时被调用。调用该Function的线程使用规则请参考上述的Tip。以下是调用该线程的实例:
CompletableFuture cf = CompletableFuture.supplyAsync(() -> "I'm Awesome")
/*这里的msg就是上一个异步任务的返回结果:I'm Awesome*/
.thenApply(msg -> String.format("%s and am Super COOL !!!", msg))
.thenAccept(msg -> System.out.printf("%s\n", msg));
输出结果:
I'm Awesome and am Super COOL !!!
对比于thenApply,thenAccept更适用于作为任务链的结尾。
-
thenCombine(CompletionStage<U>, BiFunction<T, U, R>)
该方法能组合2个彼此独立的异步任务的输出。该方法接收2个参数,一个CompletionStage引用和一个BiFunction,类型T和类型U就是2个异步任务的输入,类型R为输出值。BiFunction会在上一个任务和传入第一个参数(CompletionStage)完成时被调用。下面给出实例代码:
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture cf = CompletableFuture.supplyAsync(() -> "I'm Stunning", executor)
.thenCombineAsync(CompletableFuture.supplyAsync(() -> "am New !!!"),
(s1, s2) -> String.format("%s AND %s", s1, s2), executor)
.thenAcceptAsync(msg ->
System.out.printf("[%s] %s\n", Thread.currentThread().getName(), msg), executor);
输出结果:
[pool-1-thread-3] I'm Stunning AND am New !!!
注意这里用的是自定义线程池。当线程池在任务执行结束之前shutdown则会抛出rejectedExecution。
-
thenCompose(Function<T, CompletionStage<U>>)
该方法接收一个Function,也是用来消费上一个任务结点的,不过返回的是CompletionStage<U>,当这个CompletionStage<U>执行结束时,它会返回一个类型U的值。请看下面代码:
CompletableFuture cf = CompletableFuture.supplyAsync(() -> "I'm Smart")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " & am NIMBLE !!!"))
.thenAccept(msg ->
System.out.printf("[%s] %s\n", Thread.currentThread().getName(), msg));
输出结果:
I'm Smart & am NIMBLE !!!
thenCompose VS thenApply
这两个方法都是接收一个参数并且返回的是CompletableFuture对象。它们的不同之处在于Function的返回值,thenCompose返回的是CompletableFuture,是一个你自己已经包装好的对象;而thenApply返回的是值,它底层会将这个值包装成对象返回给你。这就类似于Optianal中faltMap()和map()之间的区别。如果你想链接一个已经存在的返回CompletableFuture的方法,thenCompose是一个更好的选择,如下:
CompletableFuture<Integer> computeAnother(Integer i){
return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);
-
thenAcceptBoth(CompletionStage<U>, BiConsumer<T, U>)
该方法作用如其名,该方法的功能可以为2个不相关的并行执行的异步任务都执行完成时,提供回调。第一个传入的参数CompletionStage<U>就是其中一个需要执行的异步任务,BiConsumer<T, U>这个参数就是当2个异步任务都执行完成时执行的回调函数。类型T是上一个任务的返回值,U就是传入的任务完成时的返回值。下面请看代码:
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> "I am Fast");
CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> "am Nimble !!!");
CompletableFuture cf3 = cf1.thenAcceptBoth(cf2, (s1, s2) ->
System.out.printf("[%s] %s and %s\n", Thread.currentThread().getName(), s1, s2));
输出结果:
[main] I am Fast and am Nimble !!!
-
acceptEither(CompletionStage<T>, Consumer<T>)
该方法接收2个参数:一个CompletionStage的引用和一个Consumer函数式接口。该回调函数会在上一个任务执行完成或第一个传入的参数执行完成时被调用。返回值为CompletableFuture<Void>。下面请看代码:
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
randomDelay();
return "I am Awesome";
});
CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
randomDelay();
return "I am Cool";
});
CompletableFuture cf3 = cf1.acceptEither(cf2, msg ->
System.out.printf("[%s] %s and am NIMBLE !!!\n", Thread.currentThread().getName(), msg));
可能的输出结果:
[ForkJoinPool.commonPool-worker-9] I am Awesome and am NIMBLE !!!
或者
[ForkJoinPool.commonPool-worker-2] I am Cool and am NIMBLE !!!
- applyToEither(CompletionStage<T>, Function<T, U>)该方法和上述方法类型,两个异步任务的其中一个执行完成时会调用Function,不过返回值为CompletableFuture<U>。
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
randomDelay();
return "I am Awesome";
});
CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> {
randomDelay();
return "I am Bold";
});
CompletableFuture cf3 = cf1.applyToEither(cf2, msg -> String.format("%s and am Cool !!!", msg))
.thenAccept(msg -> System.out.printf("[%s] %s\n", Thread.currentThread().getName(), msg));
可能出现以下结果
[ForkJoinPool.commonPool-worker-9] I am Awesome and am Cool !!!
或者
[ForkJoinPool.commonPool-worker-2] I am Bold and am Cool !!!
值得注意的是:以上两种"Either"方法在执行完成时,另一个还没执行完的任务会继续执行。
还有2个静态方法和Either,Both类方法大同小异
- CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) 会执行cfs中所有的异步任务。
- CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 返回cfs中第一个执行完成的任务,其他任务都会继续执行。此处便不再赘述。
下面来考一考大家,请看下面一道代码题,输出该任务链的最终值:
Function<String,CompletableFuture<String>> upperCaseFunction = s -> CompletableFuture.completedFuture(s.toUpperCase());
CompletableFuture<String> stage1 = CompletableFuture.completedFuture("the quick ");
CompletableFuture<String> stage2 = CompletableFuture.completedFuture("brown fox ");
CompletableFuture<String> stage3 = stage1.thenCombine(stage2,(s1,s2) -> s1+s2);
CompletableFuture<String> stage4 = stage3.thenCompose(upperCaseFunction);
//simulatedTask第一个参数为执行时间,第二个参数为返回值。
CompletableFuture<String> stage5 = CompletableFuture.supplyAsync(simulatedTask(2,"jumped over"));
CompletableFuture<String> stage6 = stage4.thenCombineAsync(stage5,(s1,s2)-> s1+s2,service);
CompletableFuture<String> stage6_sub_1_slow = CompletableFuture.supplyAsync(simulatedTask(4,"fell into"));
CompletableFuture<String> stage7 = stage6.applyToEitherAsync(stage6_sub_1_slow,String::toUpperCase,service);
CompletableFuture<String> stage8 = CompletableFuture.supplyAsync(simulatedTask(3," the lazy dog"),service);
CompletableFuture<String> finalStage = stage7.thenCombineAsync(stage8,(s1,s2)-> s1+s2,service);
答案会放在文末。
至此,我们讲述了绝大大部分构造任务链的方法,这些方法能让我们不断地向后传递不同的返回值,并且保证了任务链的顺序性。
任务链完成时的回调和异常处理
在讲本节内容之前我们先来看2个方法。
- T get() throws InterruptedException, ExecutionException
-
T join()
这两个方法都是从CompletableFuture里面取值的,调用时会阻塞当前线程。可以很容易看出join没有抛出非运行时异常,不过它会抛出2个运行时异常:当这个任务被取消时会抛出CancellationException;当任务执行时抛出异常时,会抛出CompletionException。当我们需要从任务中拿值时,推荐join()方法。
我们知道上面两种方法在调用时都会阻塞当前线程,如果想继续向下运行,则必须中断或取消任务,但是如果我们想正常的结束任务,或者在拿值时如果值不存在,则抛出自定义的异常或返回默认值呢?CompletableFuture中为我们提供以下解决方法。
-
T getNow(T valueIfAbsent)
该方法不阻塞,如果任务尚未完成,则返回默认值。 -
boolean complete(T)
如果任务未完成,则在拿取返回值时返回T。如果此次调用将CompletableFuture转化为“完成”状态,则返回true,否则返回false。意思是如果通过get()或join()拿到的是类型T的值,则返回true,否则返回false。
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
/*循环*/
infiniteLoop();
return "I am Awesome";
});
System.out.println("complete:"+cf1.complete("Default"));
System.out.println("isDone:"+cf1.isDone());
System.out.println("result:"+cf1.join());
输出结果:
complete:true
isDone:true
result:Default
-
boolean completeExceptionally(Throwable)
如果任务未完成,则在拿值时抛出Throwable异常,该异常可以自定义。如果此次调用将CompletableFuture转化为“完成”状态,则返回true,否则返回false。如果你想进一步传递某个异常,可以使用该方法。
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
/*循环*/
infiniteLoop();
return "I am Awesome";
});
System.out.println("complete:"+cf1.completeExceptionally(new RuntimeException("completeExceptionally")));
System.out.println("isDone:"+cf1.isDone());
System.out.println("isCompletedExceptionally:"+cf1.isCompletedExceptionally());
System.out.println("result:"+cf1.join());
输出结果:
complete:true
isDone:true
Exception in thread "main"
isCompletedExceptionally:true
java.util.concurrent.CompletionException: java.lang.RuntimeException: completeExceptionally
at java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)
at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1934)
at CompletableFutureSample.main(CompletableFutureSample.java:27)
Caused by: java.lang.RuntimeException: completeExceptionally
at CompletableFutureSample.main(CompletableFutureSample.java:24)
上述方法在构建我们不想等待太多时间的健壮系统时很有用。
obtrudeValue(T value)
obtrudeException(Throwable ex)
使用这两个方法可以强制地将值设置或者将异常抛出,无论该之前任务是否完成。这两个方法类似于complete和completeExceptionally,但是complete的功能是如果任务未完成才返回设定的值。在某些场景下,你有可能想放弃该任务的执行,发出一个失败的信号。请谨慎使用这两个方法,因为他们会覆盖前一个Future的值(或异常)。CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> )
该方法的作用是:当前面的任务出了异常时,就会返回T值;否则还是返回原先前面任务应该返回的值。请看下面的代码:
CompletableFuture<String> cf1 = CompletableFuture.complete(user).thenApply((user) -> {
return user.getName();
});
cf1.exceptionally(ex -> ex.getMessage())
/*如果抛出了异常,这里传递给下一个结点的值是ex.getMessage()*/
.thenAccept(System.out::println);
/*如果使用get或join拿里面的值的话,如果任务有异常,会抛出CompletionException异常的*/
System.out.println("isDone:" + cf1.isDone());
System.out.println("isCompletedExceptionally:" + cf1.isCompletedExceptionally());
输出结果:
/*user为空时的打印结果*/
java.lang.NullPointerException
isDone:true
isCompletedExceptionally:true
/*正常的打印结果*/
Jack
isDone:true
isCompletedExceptionally:false
-
handle(BiFunction<T, Throwable, U>)
这个方法有点类似于thenApply和exceptionally的结合,如果上一个任务出了异常,则传入的T类型为null,Throwable为抛出的异常;如果正常运行,则T类型为运行后的返回值,Throwable为null。
下面用handle来还原上个例子的代码的功能:
CompletableFuture<String> cf1 = CompletableFuture.complete(user).thenApply((user) -> {
return user.getName();
}).handle((v,ex)->{
if(v==null)
return "user is null";
return v;
}).thenAccept(System.out::println);
/*如果使用get或join拿里面的值的话,如果任务有异常,会抛出CompletionException异常的*/
System.out.println("isDone:" + cf1.isDone());
/*使用handle时,无论出不出异常,该值都为false。*/
System.out.println("isCompletedExceptionally:" + cf1.isCompletedExceptionally());
/*user为空时的打印结果*/
user is null!
isDone:true
isCompletedExceptionally:false
/*正常的打印结果*/
Jack
isDone:true
isCompletedExceptionally:false
-
whenComplete(BiConsumer<T, Throwable>)
该方法有点像thenAccept(Consumer<T>) ,只不过增加了一个异常处理的功能。传入的参数类似于handle,如果上一个任务出了异常,则传入的T类型为null,Throwable为抛出的异常;如果正常运行,则T类型为之前任务运行后的返回值,Throwable为null。下面上代码:
CompletableFuture<String> cf1 = CompletableFuture.complete(user).thenApply((user) -> {
return user.getName();
}).handle((v,ex)->{
if(v==null)
System.out.println("user is null");
else
System.out.println(v);
}).thenAccept(System.out::println);
输出结果和上例相同,为了简洁就不展示了。
Async Methods
CompletableFuture API为绝大部分的方法提供了2个额外的方法变体,它们后缀名都是“Async”。这些异步方法都会使用线程池来执行,如果传入的线程池为null,则还会使用默认的fork/join pool来执行任务,这可以更有效率地提高你任务的并发性。
总结
这篇文章我们总结了CompletableFuture API的功能,从创建对象,到构造任务链,最后再到异常的处理。
答案
THE QUICK BROWN FOX JUMPED OVER the lazy dog