CompletableFuture在Java里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过 回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。CompletableFuture实现了Future, CompletionStage接口,实现了Future接口就可以兼容现在有线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture类
为什么使用CompletableFuture,不使用Future?
Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。
虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?
在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。
1,创建异步方法
CompletableFuture 提供了四个静态方法来创建一个异步操作。
static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
- runAsync方法不支持返回值。
- supplyAsync可以支持返回值。
示例:
public class ThreadTest2 {
public static ExecutorService service = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main..........start");
// CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// System.out.println("当前线程:" + Thread.currentThread().getId());
// int i = 10 / 2;
// System.out.println("运行结果:" + i);
// }, service);
// CompletableFuture<Integer> future = C ompletableFuture.supplyAsync(() -> {
// System.out.println("当前线程:" + Thread.currentThread().getId());
// int i = 10 / 0;
// System.out.println("运行结果:" + i);
// return i;
// }, service).whenCompleteAsync((res,exc)->{
// System.out.println("异步任务完成了。。。。结果是:"+res+",异常是:"+exc);
// }).exceptionally(throwable -> {
// return 10;
// });
// CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// System.out.println("当前线程:" + Thread.currentThread().getId());
// int i = 10 / 1;
// System.out.println("运行结果:" + i);
// return i;
// }, service).handle((res,thr)->{
// if(res != null){
// return res*2;
// }
// if(thr != null){
// return 0;
// }
// return 0;
// });
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 1;
System.out.println("运行结果:" + i);
return i;
}, service).thenApplyAsync(res->{
System.out.println("线程2启动了......."+res*10);
return res*2;
},service);
System.out.println("main................end..."+future.get());
}
public static class Runable01 implements Runnable{
@Override
public void run() {
System.out.println("当前线程:"+Thread.currentThread().getId());
int i = 10/2;
System.out.println("运行结果:"+i);
}
}
}
2,计算完成时回调方法
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);
whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。BiConsumer<? super T,? super Throwable>可以定义处理业务
whenComplete 和 whenCompleteAsync 的区别:
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
示例:
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {
@Override
public Object get() {
System.out.println(Thread.currentThread().getName() + "\t completableFuture");
int i = 10 / 0;
return 1024;
}
}).whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(Object o, Throwable throwable) {
System.out.println("-------o=" + o.toString());
System.out.println("-------throwable=" + throwable);
}
}).exceptionally(new Function<Throwable, Object>() {
@Override
public Object apply(Throwable throwable) {
System.out.println("throwable=" + throwable);
return 6666;
}
});
System.out.println(future.get());
}
}
3,handle 方法
handle 是执行任务完成时对结果的处理。
handle 是在任务完成后再执行,还可以处理异常的任务。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
示例:
public class ThreadTest2 {
public static ExecutorService service = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main..........start");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 1;
System.out.println("运行结果:" + i);
return i;
}, service).handle((res,thr)->{
if(res != null){
return res*2;
}
if(thr != null){
return 0;
}
return 0;
});
System.out.println("main................end..."+future.get());
}
public static class Runable01 implements Runnable{
@Override
public void run() {
System.out.println("当前线程:"+Thread.currentThread().getId());
int i = 10/2;
System.out.println("运行结果:"+i);
}
}
}
4,线程串行化方法
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。
thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作
带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
示例:
public class ThreadTest2 {
public static ExecutorService service = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main..........start");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 1;
System.out.println("运行结果:" + i);
return i;
}, service).thenApplyAsync(res->{
System.out.println("线程2启动了......."+res*10);
return res*2;
},service);
System.out.println("main................end..."+future.get());
}
public static class Runable01 implements Runnable{
@Override
public void run() {
System.out.println("当前线程:"+Thread.currentThread().getId());
int i = 10/2;
System.out.println("运行结果:"+i);
}
}
}
5,两任务组合 - 都要完成
两个任务必须都完成,触发该任务。
-
thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值
-
thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。
-
runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务。
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor);
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor);
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor);
示例:
public class ThreadTest3 {
public static ExecutorService service = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main..........start");
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("线程1:" + Thread.currentThread().getId());
int i = 10 / 1;
System.out.println("线程1结束");
return i;
}, service);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("线程2:" + Thread.currentThread().getId());
int i = 10 / 1;
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程2结束:");
return "hello";
}, service);
future01.runAfterBothAsync(future02,()->{
System.out.println("任务3开始...");
},service);
future01.thenAcceptBothAsync(future02,(f1,f2)->{
System.out.println("任务3开始...之前的结果:"+f1+"--->"+f2);
},service);
System.out.println("main................end...");
}
public static class Runable01 implements Runnable{
@Override
public void run() {
System.out.println("当前线程:"+Thread.currentThread().getId());
int i = 10/2;
System.out.println("运行结果:"+i);
}
}
}
6,两任务组合 - 一个完成
当两个任务中,任意一个future任务完成的时候,执行任务。
-
applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
-
acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
-
runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor);
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor);
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action,
Executor executor);
示例:
public class ThreadTest3 {
public static ExecutorService service = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main..........start");
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
System.out.println("线程1:" + Thread.currentThread().getId());
int i = 10 / 1;
System.out.println("线程1结束");
return i;
}, service);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
System.out.println("线程2:" + Thread.currentThread().getId());
int i = 10 / 1;
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程2结束:");
return "hello";
}, service);
future01.runAfterEitherAsync(future02,()->{
System.out.println("任务3开始...");
});
System.out.println("main................end...");
}
public static class Runable01 implements Runnable{
@Override
public void run() {
System.out.println("当前线程:"+Thread.currentThread().getId());
int i = 10/2;
System.out.println("运行结果:"+i);
}
}
}
7.多任务组合
allOf:等待所有任务完成
anyOf:只要有一个任务完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
示例:
public class Thread4 {
public static ExecutorService service = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main..........start");
CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品页面");
return "hello.jsp";
}, service);
CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("查询商品属性");
return "黑色+256G";
}, service);
CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品介绍");
return "华为";
}, service);
// CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);
//
// System.out.println("结果:"+futureImg.get()+"----"+futureAttr.get()+"--------"+futureDesc.get());
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc);
System.out.println("结果:"+anyOf.get());
System.out.println("main..........end");
}
public static class Runable01 implements Runnable{
@Override
public void run() {
System.out.println("当前线程:"+Thread.currentThread().getId());
int i = 10/2;
System.out.println("运行结果:"+i);
}
}
}