博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java中CompletableFuture使用总结与示例
阅读量:3898 次
发布时间:2019-05-23

本文共 29452 字,大约阅读时间需要 98 分钟。

目录


参考文档:

———————————————————————————

一、Java中的异步计算

  1. 异步计算很难推理。通常我们希望将任何计算视为一系列步骤。但是在异步计算的情况下,表示为回调的动作往往分散在代码中或者深深地嵌套在彼此内部。当我们需要处理其中一个步骤中可能发生的错误时,情况变得更糟。

  2. Future接口是Java 5中添加作为异步计算的结果,但它没有任何方法,这些计算组合或处理可能出现的错误。

  3. 在Java 8中,引入了CompletableFuture类。与Future接口一起,它还实现了CompletionStage接口。此接口定义了可与其他步骤组合的异步计算步骤的契约。

  4. CompletableFuture同时是一个构建块和一个框架,具有大约50种不同的组合,兼容,执行异步计算步骤和处理错误的方法。

1.1 使用CompletableFuture作为简单的Future

​ 首先,CompletableFuture类实现Future接口,因此您可以将其用作Future实现,但具有额外的完成逻辑。

​ 例如,您可以使用no-arg构造函数创建此类的实例,以表示Future的某些结果,将其交给使用者,并在将来的某个时间使用complete方法完成。消费者可以使用get方法来阻止当前线程,直到提供此结果。

​ 在下面的示例中,我们有一个创建CompletableFuture实例的方法,然后在另一个线程中旋转一些计算并立即返回Future

​ 计算完成后,该方法通过将结果提供给完整方法来完成Future:

public Future
calculateAsync() throws InterruptedException {
CompletableFuture
completableFuture = new CompletableFuture<>(); Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500); completableFuture.complete("Hello"); return null; }); return completableFuture;}

​ 为了分离计算,我们使用了“Java中的线程池简介”一文中描述的Executor API ,但是这种创建和完成CompletableFuture的方法可以与任何并发机制或API(包括原始线程)一起使用。

请注意,该calculateAsync方法返回一个未来的实例。

​ 我们只是调用方法,接收Future实例并在我们准备阻塞结果时调用它的get方法。

另请注意,get方法抛出一些已检查的异常,即ExecutionException(封装计算期间发生的异常)和InterruptedException(表示执行方法的线程被中断的异常):

Future
completableFuture = calculateAsync();// ... String result = completableFuture.get();assertEquals("Hello", result);

​ 如果您已经知道计算的结果,则可以将static completedFuture方法与表示此计算结果的参数一起使用。然后,Future的get方法永远不会阻塞,而是立即返回此结果。

Future
completableFuture = CompletableFuture.completedFuture("Hello");// ...String result = completableFuture.get();assertEquals("Hello", result);

作为替代方案,您可能希望取消Future的执行。

假设我们没有设法找到结果并决定完全取消异步执行。这可以通过Future的取消方法完成。此方法接收布尔参数mayInterruptIfRunning,但在CompletableFuture的情况下,它没有任何效果,因为中断不用于控制CompletableFuture的处理。

这是异步方法的修改版本:

public Future
calculateAsyncWithCancellation() throws InterruptedException {
CompletableFuture
completableFuture = new CompletableFuture<>(); Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500); completableFuture.cancel(false); return null; }); return completableFuture;}

当我们使用Future.get()方法阻塞结果时,如果取消将来取消,它将抛出CancellationException:

Future
future = calculateAsyncWithCancellation();future.get(); // CancellationException

二、CompletableFuture 使用详解

2.1 runAsync 和 supplyAsync方法

CompletableFuture 提供了四个静态方法来创建一个异步操作

public static CompletableFuture
runAsync(Runnable runnable)public static CompletableFuture
runAsync(Runnable runnable, Executor executor)public static
CompletableFuture supplyAsync(Supplier supplier)public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

没有指定Executor的方法会使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

  • runAsync方法不支持返回值。
  • supplyAsync可以支持返回值。

示例

//无返回值public static void runAsync() throws Exception {
CompletableFuture
future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {
} System.out.println("run end ..."); }); future.get();}//有返回值public static void supplyAsync() throws Exception {
CompletableFuture
future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {
} System.out.println("run end ..."); return System.currentTimeMillis(); }); long time = future.get(); System.out.println("time = "+time);}

2.2 计算结果完成时的回调方法

CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

public CompletableFuture
whenComplete(BiConsumer
action)public CompletableFuture
whenCompleteAsync(BiConsumer
action)public CompletableFuture
whenCompleteAsync(BiConsumer
action, Executor executor)public CompletableFuture
exceptionally(Function
fn)

可以看到Action的类型是BiConsumer<? super T,? super Throwable>它可以处理正常的计算结果,或者异常情况。

whenComplete 和 whenCompleteAsync 的区别:

whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

示例

public static void whenComplete() throws Exception {
CompletableFuture
future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) {
} if(new Random().nextInt()%2>=0) {
int i = 12/0; } System.out.println("run end ..."); }); future.whenComplete(new BiConsumer
() {
@Override public void accept(Void t, Throwable action) {
System.out.println("执行完成!"); } }); future.exceptionally(new Function
() {
@Override public Void apply(Throwable t) {
System.out.println("执行失败!"+t.getMessage()); return null; } }); TimeUnit.SECONDS.sleep(2);}

2.3 thenApply 方法

当一个线程依赖另一个线程时,可以使用thenApply方法来把这两个线程串行化。

public  CompletableFuture thenApply(Function
fn)public CompletableFuture thenApplyAsync(Function
fn)public CompletableFuture thenApplyAsync(Function
fn, Executor executor)

Function<? super T,? extends U>

T:上一个任务返回结果的类型
U:当前任务的返回值类型

实例1

处理计算结果的最通用方法是将其提供给函数。该thenApply方法正是这么做的:接受一个函数实例,用它来处理结果,并返回一个未来的保存函数的返回值:

CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture
future = completableFuture.thenApply(s -> s + " World");assertEquals("Hello World", future.get());

示例2

private static void thenApply() throws Exception {
CompletableFuture
future = CompletableFuture.supplyAsync(new Supplier
() {
@Override public Long get() {
long result = new Random().nextInt(100); System.out.println("result1="+result); return result; } }).thenApply(new Function
() {
@Override public Long apply(Long t) {
long result = t*5; System.out.println("result2="+result); return result; } }); long result = future.get(); System.out.println(result);}

第二个任务依赖第一个任务的结果。

2.4 handle 方法

handle 是执行任务完成时对结果的处理。

handle方法和thenApply方法处理方式基本一样。不同的是 handle是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。

public  CompletionStage handle(BiFunction
fn);public CompletionStage handleAsync(BiFunction
fn);public CompletionStage handleAsync(BiFunction
fn,Executor executor);

示例

public static void handle() throws Exception{
CompletableFuture
future = CompletableFuture.supplyAsync(new Supplier
() {
@Override public Integer get() {
int i= 10/0; return new Random().nextInt(10); } }).handle(new BiFunction
() {
@Override public Integer apply(Integer param, Throwable throwable) {
int result = -1; if(throwable==null){
result = param * 2; }else{
System.out.println(throwable.getMessage()); } return result; } }); System.out.println(future.get());}

从示例中可以看出,在 handle 中可以根据任务是否有异常来进行做相应的后续处理操作。而 thenApply 方法,如果上个任务出现错误,则不会执行 thenApply 方法。

2.5 thenAccept 消费处理结果

接收任务的处理结果,并消费处理,无返回结果。

public CompletionStage
thenAccept(Consumer
action);public CompletionStage
thenAcceptAsync(Consumer
action);public CompletionStage
thenAcceptAsync(Consumer
action,Executor executor);

示例1

如果您不需要在Future链中返回值,则可以使用Consumer功能接口的实例。它的单个方法接受一个参数并返回void。

在CompletableFuture中有一个用于此用例的方法- thenAccept方法接收Consumer并将计算结果传递给它。最后的future.get()调用返回Void类型的实例。

CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture
future = completableFuture.thenAccept(s -> System.out.println("Computation returned: " + s));future.get();

最后,如果您既不需要计算的值也不想在链的末尾返回一些值,那么您可以将Runnable lambda 传递给thenRun方法。在下面的示例中,在调用future.get()方法之后,我们只需在控制台中打印一行:

CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture
future = completableFuture.thenRun(() -> System.out.println("Computation finished."));future.get();

示例2

public static void thenAccept() throws Exception{
CompletableFuture
future = CompletableFuture.supplyAsync(new Supplier
() {
@Override public Integer get() {
return new Random().nextInt(10); } }).thenAccept(integer -> {
System.out.println(integer); }); future.get();}

从示例代码中可以看出,该方法只是消费执行完成的任务,并可以根据上面的任务返回的结果进行处理。并没有后续的输错操作。

2.6 thenRun 方法

跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenAccept 。

public CompletionStage
thenRun(Runnable action);public CompletionStage
thenRunAsync(Runnable action);public CompletionStage
thenRunAsync(Runnable action,Executor executor);

示例1

如果您既不需要计算的值也不想在链的末尾返回一些值,那么您可以将Runnable lambda 传递给thenRun方法。在下面的示例中,在调用future.get()方法之后,我们只需在控制台中打印一行:

CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture
future = completableFuture.thenRun(() -> System.out.println("Computation finished."));future.get();

作者:淡定_蜗牛

链接:https://www.jianshu.com/p/2086154ae5cb
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

示例

public static void thenRun() throws Exception{
CompletableFuture
future = CompletableFuture.supplyAsync(new Supplier
() {
@Override public Integer get() {
return new Random().nextInt(10); } }).thenRun(() -> {
System.out.println("thenRun ..."); }); future.get();}

该方法同 thenAccept 方法类似。不同的是上个任务处理完成后,并不会把计算的结果传给 thenRun 方法。只是处理玩任务后,执行 thenAccept 的后续操作。

2.7 thenCombine 合并任务

thenCombine 会把 两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。

public 
CompletionStage
thenCombine(CompletionStage
other,BiFunction
fn);public
CompletionStage
thenCombineAsync(CompletionStage
other,BiFunction
fn);public
CompletionStage
thenCombineAsync(CompletionStage
other,BiFunction
fn,Executor executor);

示例

private static void thenCombine() throws Exception {
CompletableFuture
future1 = CompletableFuture.supplyAsync(new Supplier
() {
@Override public String get() {
return "hello"; } }); CompletableFuture
future2 = CompletableFuture.supplyAsync(new Supplier
() {
@Override public String get() {
return "hello"; } }); CompletableFuture
result = future1.thenCombine(future2, new BiFunction
() { @Override public String apply(String t, String u) { return t+" "+u; } }); System.out.println(result.get());}

2.8 thenAcceptBoth

当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗。

public  CompletionStage
thenAcceptBoth(CompletionStage
other,BiConsumer
action);public
CompletionStage
thenAcceptBothAsync(CompletionStage
other,BiConsumer
action);public
CompletionStage
thenAcceptBothAsync(CompletionStage
other,BiConsumer
action, Executor executor);

示例

private static void thenAcceptBoth() throws Exception {
CompletableFuture
f1 = CompletableFuture.supplyAsync(new Supplier
() {
@Override public Integer get() {
int t = new Random().nextInt(3); try {
TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) {
e.printStackTrace(); } System.out.println("f1="+t); return t; } }); CompletableFuture
f2 = CompletableFuture.supplyAsync(new Supplier
() {
@Override public Integer get() {
int t = new Random().nextInt(3); try {
TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) {
e.printStackTrace(); } System.out.println("f2="+t); return t; } }); f1.thenAcceptBoth(f2, new BiConsumer
() {
@Override public void accept(Integer t, Integer u) {
System.out.println("f1="+t+";f2="+u+";"); } });}

2.9 applyToEither 方法

两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作。

public  CompletionStage applyToEither(CompletionStage
other,Function
fn);public CompletionStage applyToEitherAsync(CompletionStage
other,Function
fn);public CompletionStage applyToEitherAsync(CompletionStage
other,Function
fn,Executor executor);

示例

private static void applyToEither() throws Exception {
CompletableFuture
f1 = CompletableFuture.supplyAsync(new Supplier
() {
@Override public Integer get() {
int t = new Random().nextInt(3); try {
TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) {
e.printStackTrace(); } System.out.println("f1="+t); return t; } }); CompletableFuture
f2 = CompletableFuture.supplyAsync(new Supplier
() {
@Override public Integer get() {
int t = new Random().nextInt(3); try {
TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) {
e.printStackTrace(); } System.out.println("f2="+t); return t; } }); CompletableFuture
result = f1.applyToEither(f2, new Function
() { @Override public Integer apply(Integer t) { System.out.println(t); return t * 2; } }); System.out.println(result.get());}

2.10 acceptEither 方法

两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。

public CompletionStage
acceptEither(CompletionStage
other,Consumer
action);public CompletionStage
acceptEitherAsync(CompletionStage
other,Consumer
action);public CompletionStage
acceptEitherAsync(CompletionStage
other,Consumer
action,Executor executor);

示例

private static void acceptEither() throws Exception {
CompletableFuture
f1 = CompletableFuture.supplyAsync(new Supplier
() {
@Override public Integer get() {
int t = new Random().nextInt(3); try {
TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) {
e.printStackTrace(); } System.out.println("f1="+t); return t; } }); CompletableFuture
f2 = CompletableFuture.supplyAsync(new Supplier
() {
@Override public Integer get() {
int t = new Random().nextInt(3); try {
TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) {
e.printStackTrace(); } System.out.println("f2="+t); return t; } }); f1.acceptEither(f2, new Consumer
() {
@Override public void accept(Integer t) {
System.out.println(t); } });}

2.11 runAfterEither 方法

两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)

public CompletionStage
runAfterEither(CompletionStage
other,Runnable action);public CompletionStage
runAfterEitherAsync(CompletionStage
other,Runnable action);public CompletionStage
runAfterEitherAsync(CompletionStage
other,Runnable action,Executor executor);

示例

private static void runAfterEither() throws Exception {
CompletableFuture
f1 = CompletableFuture.supplyAsync(new Supplier
() {
@Override public Integer get() {
int t = new Random().nextInt(3); try {
TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) {
e.printStackTrace(); } System.out.println("f1="+t); return t; } }); CompletableFuture
f2 = CompletableFuture.supplyAsync(new Supplier
() {
@Override public Integer get() {
int t = new Random().nextInt(3); try {
TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) {
e.printStackTrace(); } System.out.println("f2="+t); return t; } }); f1.runAfterEither(f2, new Runnable() {
@Override public void run() {
System.out.println("上面有一个已经完成了。"); } });}

2.12 runAfterBoth

两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable)

public CompletionStage
runAfterBoth(CompletionStage
other,Runnable action);public CompletionStage
runAfterBothAsync(CompletionStage
other,Runnable action);public CompletionStage
runAfterBothAsync(CompletionStage
other,Runnable action,Executor executor);

示例

private static void runAfterBoth() throws Exception {
CompletableFuture
f1 = CompletableFuture.supplyAsync(new Supplier
() {
@Override public Integer get() {
int t = new Random().nextInt(3); try {
TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) {
e.printStackTrace(); } System.out.println("f1="+t); return t; } }); CompletableFuture
f2 = CompletableFuture.supplyAsync(new Supplier
() {
@Override public Integer get() {
int t = new Random().nextInt(3); try {
TimeUnit.SECONDS.sleep(t); } catch (InterruptedException e) {
e.printStackTrace(); } System.out.println("f2="+t); return t; } }); f1.runAfterBoth(f2, new Runnable() {
@Override public void run() {
System.out.println("上面两个任务都执行完成了。"); } });}

2.13 thenCompose 方法

thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。

public  CompletableFuture thenCompose(Function
> fn);public CompletableFuture thenComposeAsync(Function
> fn) ;public CompletableFuture thenComposeAsync(Function
> fn, Executor executor) ;

该thenCompose()方法类似于thenApply()在都返回一个新的完成阶段。但是,thenCompose()使用前一个阶段作为参数。它会直接使结果变平并返回Future,而不是我们在thenApply()中观察到的嵌套未来:

CompletableFuture
computeAnother(Integer i){
return CompletableFuture.supplyAsync(() -> 10 + i);}CompletableFuture
finalResult = compute().thenCompose(this::computeAnother);

因此,如果想要链接CompletableFuture 方法,那么最好使用thenCompose()。

另请注意,这两种方法之间的差异类似于map()和flatMap()之间的差异。

示例

private static void thenCompose() throws Exception {
CompletableFuture
f = CompletableFuture.supplyAsync(new Supplier
() {
@Override public Integer get() {
int t = new Random().nextInt(3); System.out.println("t1="+t); return t; } }).thenCompose(new Function
>() {
@Override public CompletionStage
apply(Integer param) {
return CompletableFuture.supplyAsync(new Supplier
() {
@Override public Integer get() {
int t = param *2; System.out.println("t2="+t); return t; } }); } }); System.out.println("thenCompose result : "+f.get());}

2.14 thenApply与thenComposse的区别

CompletableFuture API 的最佳部分是能够在一系列计算步骤中组合CompletableFuture实例。

这种链接的结果本身就是CompletableFuture,允许进一步链接和组合。这种方法在函数式语言中无处不在,通常被称为monadic设计模式。

在下面的示例中,我们使用thenCompose方法按顺序链接两个Futures。

请注意,此方法采用返回CompletableFuture实例的函数。该函数的参数是先前计算步骤的结果。这允许我们在下一个CompletableFuture的lambda中使用这个值:

thenCompose

CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> "Hello").thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));assertEquals("Hello World", completableFuture.get());

该thenCompose方法连同thenApply实现一元图案的基本构建块。它们与Java 8中可用的Stream和Optional类的map和flatMap方法密切相关。

两个方法都接收一个函数并将其应用于计算结果,但thenCompose(flatMap)方法接收一个函数,该函数返回相同类型的另一个对象。此功能结构允许将这些类的实例组合为构建块。

如果要执行两个独立的Futures并对其结果执行某些操作,请使用接受Future的thenCombine方法和具有两个参数的Function来处理两个结果:

thenCombine

CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> "Hello").thenCombine(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> s1 + s2));assertEquals("Hello World", completableFuture.get());

更简单的情况是,当您想要使用两个期货结果时,但不需要将任何结果值传递给Future链。该thenAcceptBoth方法是有帮助:

thenAcceptBoth

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello").thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),(s1, s2) -> System.out.println(s1 + s2));

thenApply与thenCompose之间的区别

在前面的部分中,我们展示了关于thenApply()和thenCompose()的示例。这两个API都有助于链接不同的CompletableFuture调用,但这两个函数的使用是不同的。

2.15 completeExceptionally处理错误

对于异步计算步骤链中的错误处理,必须以类似的方式调整throw / catch惯用法。

CompletableFuture类允许您在特殊的句柄方法中处理它,而不是在语法块中捕获异常。此方法接收两个参数:计算结果(如果成功完成)和抛出异常(如果某些计算步骤未正常完成)。

在下面的示例中,我们使用handle方法在问候语的异步计算完成时提供默认值,因为没有提供名称:

String name = null;// ...CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> { if (name == null) { throw new RuntimeException("Computation error!"); } return "Hello, " + name; })}).handle((s, t) -> s != null ? s : "Hello, Stranger!");assertEquals("Hello, Stranger!", completableFuture.get());

作为替代方案,假设我们想要使用值手动完成Future,如第一个示例中所示,但也可以使用异常来完成它。该completeExceptionally方法旨在用于这一点。以下示例中的completableFuture.get()方法抛出ExecutionException,并将RuntimeException作为其原因:

CompletableFuture
completableFuture = new CompletableFuture<>();// ...completableFuture.completeExceptionally( new RuntimeException("Calculation failed!"));// ...completableFuture.get(); // ExecutionException

在上面的示例中,我们可以使用handle方法异步处理异常,但是使用get方法,我们可以使用更典型的同步异常处理方法。

2.16 并行运行多个Future

当我们需要并行执行多个Futures时,我们通常希望等待所有它们执行,然后处理它们的组合结果。

CompletableFuture.allOf静态方法允许等待所有的完成结果作为一个新的对象提供:

CompletableFuture
future1 = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture
future2 = CompletableFuture.supplyAsync(() -> "Beautiful");CompletableFuture
future3 = CompletableFuture.supplyAsync(() -> "World");CompletableFuture
combinedFuture = CompletableFuture.allOf(future1, future2, future3);// ...combinedFuture.get();assertTrue(future1.isDone());assertTrue(future2.isDone());assertTrue(future3.isDone());

请注意,CompletableFuture.allOf()的返回类型是CompletableFuture <Void>。这种方法的局限性在于它不会返回所有期货的综合结果。相反,您必须手动从Futures获取结果。幸运的是,CompletableFuture.join()方法和Java 8 Streams API使它变得简单:

String combined = Stream.of(future1, future2, future3).map(CompletableFuture::join).collect(Collectors.joining(" "));assertEquals("Hello Beautiful World", combined);

CompletableFuture.join()方法类似于GET方法,但它抛出一个未经检查的异常的情况下,在未来没有正常完成。这使得它可以在Stream.map()方法中用作方法引用。


三、CompletableFuture API

模块

软件包

3.1 方法概览

3.1.1 主动完成计算

public boolean complete(T value)

立即完成计算,并把结果设置为传的值,返回是否设置成功

如果 CompletableFuture 没有关联任何的Callback、异步任务等,如果调用get方法,那会一直阻塞下去,可以使用complete方法主动完成计算。

public T get()

该方法时阻塞方法,会等待计算结果完成

public T get(long timeout, TimeUnit unit)

有时间限制的阻塞方法

public T getNow(T valueIfAbsent)

立即获取方法结果,如果没有计算结束则返回传的值

public T join()

和 get() 方法类似也是主动阻塞线程,等待计算结果。和get() 方法有细微的差别

举例如下:

public static void completableFutureGet() throws  Exception{
CompletableFuture
completableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000); } catch (InterruptedException e) {
e.printStackTrace(); } return 100 * 10; }); System.out.println(completableFuture.get()); System.out.println(completableFuture.get(1000, TimeUnit.MICROSECONDS)); System.out.println(completableFuture.join()); System.out.println(completableFuture.getNow(1)); }

3.1.2 创建异步任务

public static CompletableFuture completedFuture(U value)

创建一个有初始值的CompletableFuture

public static CompletableFuture runAsync(Runnable runnable)public static CompletableFuture runAsync(Runnable runnable, Executor executor)public static CompletableFuture supplyAsync(Supplier supplier)public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

以上四个方法中,以 Async 结尾并且没有 Executor 参数的,会默认使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。 以run开头的,因为以 Runable 类型为参数所以没有返回值。

示例:

public static void completableFutureSupplyAsync()throws Exception {
CompletableFuture
future1 = CompletableFuture.runAsync(() -> System.out.println("runAsync")); CompletableFuture
future2 = CompletableFuture.supplyAsync(() -> "supplyAsync"); System.out.println(future1.get()); System.out.println(future2.get());}

3.1.3 计算完成时对结果的处理 whenComplete/exceptionally/handle

CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方法:

  • public CompletableFuture whenComplete(BiConsumer
    action)public CompletableFuture whenCompleteAsync(BiConsumer
    action)public CompletableFuture whenCompleteAsync(BiConsumer
    action, Executor executor)

    参数类型为BiConsumer<? super T, ? super Throwable>会获取上一步计算的计算结果和异常信息。以Async结尾的方法可能会使用其它的线程去执行,如果使用相同的线程池,也可能会被同一个线程选中执行,以下皆相同。

    public static void completableFutureWhenComplete()throws Exception {
    CompletableFuture
    future = CompletableFuture.supplyAsync(() -> {
    try {
    Thread.sleep(2000); } catch (InterruptedException e) {
    e.printStackTrace(); } return 20; }).whenCompleteAsync((v, e) -> {
    System.out.println(v); System.out.println(e); }); System.out.println(future.get()); }
  • public CompletableFuture exceptionally(Function
    fn)

    该方法是对异常情况的处理,当函数异常时应该的返回值。

    public static void completableFutureExceptionally()throws Exception {        CompletableFuture
    future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return 10 / 0; }).whenCompleteAsync((v, e) -> { System.out.println(v); System.out.println(e); }).exceptionally((e) -> { System.out.println(e.getMessage()); return 30; }); System.out.println(future.get()); }
  • public CompletableFuture handle(BiFunction
    fn)public CompletableFuture handleAsync(BiFunction
    fn)public CompletableFuture handleAsync(BiFunction
    fn, Executor executor)

    handle 方法和whenComplete方法类似,只不过接收的是一个 BiFunction<? super T,Throwable,? extends U> fn 类型的参数,因此有 whenComplete 方法和 转换的功能 (thenApply)

    public static void completableFutureHand()throws Exception {   CompletableFuture
    future = CompletableFuture .supplyAsync(() -> 10 / 0) .handle((t, e) -> { System.out.println(e.getMessage()); return 10; }); System.out.println(future.get());}

3.1.4 结果转换

CompletableFuture 由于有回调,可以不必等待一个计算完成而阻塞着调用线程,可以在一个结果计算完成之后紧接着执行某个Action。我们可以将这些操作串联起来。

public CompletableFuture thenApply(Function
fn)public CompletableFuture thenApplyAsync(Function
fn)public CompletableFuture thenApplyAsync(Function
fn, Executor executor)

示例:

public CompletableFuture thenApply(Function
fn)public CompletableFuture thenApplyAsync(Function
fn)public CompletableFuture thenApplyAsync(Function
fn, Executor executor) public static void completableFutureThenApply()throws Exception {
CompletableFuture
future = CompletableFuture .supplyAsync(() -> 1) .thenApply((a) -> {
System.out.println(a);//1 return a * 10; }).thenApply((a) -> {
System.out.println(a);//10 return a + 10; }).thenApply((a) -> {
System.out.println(a);//20 return a - 5; }); System.out.println(future.get());//15 }}

这些方法不是马上执行的,也不会阻塞,而是前一个执行完成后继续执行下一个。

和 handle 方法的区别是,handle 会处理正常计算值和异常,不会抛出异常。而 thenApply 只会处理正常计算值,有异常则抛出。

从上面API可以看出,我们的并发编程在jdk1.8变的更加简单了。以上是部分CompletableFuture API用法。

3.2 API 文档:

点击查看:

你可能感兴趣的文章
linux定时任务的设置
查看>>
MySQL 5.7 完全傻瓜安装教程 图文
查看>>
Hibernate框架概述&SSH框架工作原理以及流程
查看>>
Aapche POI txt 导入excel
查看>>
C语言 ## __VA_ARGS__ 宏
查看>>
C++项目中的extern "C" {}
查看>>
(转)C++中extern “C”含义深层探索
查看>>
【日常小记】linux中强大且常用命令:find、grep
查看>>
Linux多线程编程(不限Linux)
查看>>
C/C++内存泄漏及检测
查看>>
C中的继承和多态
查看>>
linux修改ssh端口和禁止root远程登陆设置
查看>>
What really happens when you navigate to a URL
查看>>
偶遇with ties
查看>>
linux 编译指定库、头文件的路径问题
查看>>
使用gdb调试运行时的程序小技巧
查看>>
linux后端服务程序之信号处理
查看>>
Padding也要小心
查看>>
linux异步IO编程实例分析
查看>>
小组开发环境搭建: apache+ftp+cvs+samba
查看>>