CompletableFuture
前言
Github:https://github.com/HealerJean
一、CompletableFuture
1、CompletableFuture.runAsync
/**
* 1、无返回值的异步任务 CompletableFuture.runAsync
*/
@Test
public void test1() throws Exception {
ExecutorService executer = Executors.newFixedThreadPool(10);
//1.无返回值的异步任务 runAsync()
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
log.info("currentThread:{}", Thread.currentThread().getName());
}, executer);
Thread.sleep(5000L);
}
2、CompletableFuture.supplyAsync
| 方法 | 结果 | 返回 | 异常捕获 | 说明 |
|---|---|---|---|---|
whenComplete |
有 | 无 | 有 | 第一个参数是结果,第二个参数是异常 |
exceptionally |
无 | 有 | 有 | 只有一个参数是异常类型,他可以感知异常 |
handle |
有 | 有 | 有 | 既可以感知异常,也可以返回默认数据,是 whenComplete 和 exceptionally的结合 |
| 维度 | thenRunAsync |
whenComplete |
|---|---|---|
| 用途 | 执行独立后续任务 | 处理结果或异常(不修改结果) |
| 线程行为 | 默认异步执行(新线程) | 默认同步执行(前序任务线程) |
| 异常传递 | 前序异常会跳过回调 | 无论是否异常,回调均执行 |
| 返回值影响 | 无返回值,不影响前序结果 | 不修改前序结果 |
1)completableFuture.whenComplete
第一个参数是结果,第二个参数是异常
| 维度 | thenApply |
whenComplete |
|---|---|---|
| 触发条件 | 仅在前序任务成功完成时触发 | 无论任务成功或失败均触发 |
| 输入参数 | 接受前序任务的结果(T) |
接受结果(T)和异常(Throwable) |
| 返回值 | 返回新的 CompletableFuture<U> |
返回与原任务相同结果的 CompletableFuture<T> |
| 异常处理 | 不处理异常,异常会传播 | 可访问异常,但需显式处理以避免传播 |
| 典型场景 | 数据转换、链式流水线操作 | 日志记录、资源清理、异常监控 |
/**
* 有返回结果的异步执行 CompletableFuture.supplyAsync
* 1、completableFuture.whenComplete
* 第一个参数是结果
* 第二个参数是异常,他可以感知异常,无法返回默认数据
* */
@Test
public void test_whenComplete() throws Exception{
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
log.info("currentThread:{}", Thread.currentThread().getId());
int i = 1 / 0;
return "HealerJean";
}, executor);
log.info("==============");
CompletableFuture<String> completableFuture2 = completableFuture.whenComplete((r, e) -> {
log.info("[completableFuture.whenComplete] result:{} ", r, e);
});
Thread.sleep(5000L);
}
2)completableFuture.exceptionally
只有一个参数是异常类型,他可以感知异常,同时返回默认数据
/**
* 有返回结果的异步执行 CompletableFuture.supplyAsync
* 2、completableFuture.exceptionally
* 只有一个参数是异常类型,他可以感知异常,同时返回默认数据
*/
@Test
public void test_exceptionally() throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
log.info("currentThread:{}", Thread.currentThread().getId());
int i = 1 / 0;
return "HealerJean";
}, executor);
CompletableFuture<String> exceptionally = completableFuture.exceptionally(e -> {
log.error("[completableFuture.exceptionally] error", e);
return "exceptionally";
});
Thread.sleep(5000L);
}
3)completableFuture.handle
既可以感知异常,也可以返回默认数据,是
whenComplete和exceptionally的结合
/**
* 有返回结果的异步执行 CompletableFuture.supplyAsync
* 3、completableFuture.handle
* 既可以感知异常,也可以返回默认数据,是whenComplete和exceptionally的结合
*/
@Test
public void test2() throws Exception {
ExecutorService service = Executors.newFixedThreadPool(10);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
log.info("currentThread:{}", Thread.currentThread().getId());
int i = 1 / 0;
return "HealerJean";
}, service);
log.info("==============");
CompletableFuture<String> handle = completableFuture.handle((r, e) -> {
if (r != null) {
log.error("[completableFuture.handle] result {}", r);
return r;
}
if (e != null) {
log.error("[completableFuture.handle] error", e);
return "error";
}
return "";
});
Thread.sleep(10000L);
}
3、join、get
| 维度 | join() |
get() |
|---|---|---|
| 异常类型 | 仅抛出非受检异常(CompletionException) |
抛出受检异常(InterruptedException, ExecutionException) |
| 代码简洁性 | 无需 try-catch 包裹,代码更简洁 |
需显式处理受检异常,代码冗长 |
| 适用场景 | 推荐在 CompletableFuture 链式调用中使用 |
需精确控制异常时使用(如强制中断任务) |
二、线程任务
1、串行任务
1)thenRunAsync() 异步,无入参,无返回值
thenRunAsync()无入参,无返回值 、线程池执行
/**
* 串行任务
* 1、thenRunAsync() 无入参,无返回值(线程池执行)
*/
@Test
public void test_thenRunAsync() throws InterruptedException {
log.info("[test] currentThread:{}", Thread.currentThread().getId());
// currentThread:1
long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
log.info("[completableFuture.supplyAsync] currentThread:{}", Thread.currentThread().getId());
// completableFuture.supplyAsync] currentThread:11
return "HealerJean";
}, executor);
// 1、thenRunAsync() 无入参,无返回值(线程池执行)
completableFuture.thenRunAsync(() -> {
long cost = System.currentTimeMillis() - start;
log.info("[completableFuture.thenRunAsync] currentThread:{}, cost:{}", Thread.currentThread().getId(), cost);
// [completableFuture.thenRunAsync] currentThread:12, cost:84
}, executor);
Thread.sleep(5000L);
}
2)thenAccept() 同步 有入参,无返回值
thenAccept()有入参,无返回值,不传线程池
/**
* 串行任务
* 2、thenAccept() 有入参,无返回值,不传线程池(main线程)
*/
@Test
public void test_thenAccept(){
log.info("[test] currentThread:{}", Thread.currentThread().getId());
// currentThread:1
long start = System.currentTimeMillis();
ExecutorService service = Executors.newFixedThreadPool(10);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
log.info("[completableFuture.supplyAsync] currentThread:{}", Thread.currentThread().getId());
// [completableFuture.supplyAsync] currentThread:11
return "HealerJean";
}, service);
// 2、thenAccept() 有入参,无返回值,不传线程池(main线程)
completableFuture.thenAccept((r) -> {
long cost = System.currentTimeMillis() - start;
log.info("[completableFuture.thenAccept] currentThread:{}, result:{}, cost:{}", Thread.currentThread().getId(), r, cost);
// [completableFuture.thenAccept] currentThread:1, result:HealerJean, cost:76
});
}
3)thenApply() 同步 有入参,有返回值
thenApply()有入参,有返回值,不传线程池 (Main线程)
/**
* 线程串行化
* 3、thenApply() 有入参,有返回值,不传线程池(main线程)
*/
@Test
public void test3() throws ExecutionException, InterruptedException {
log.info("[test] currentThread:{}", Thread.currentThread().getId());
// currentThread:1
long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
log.info("[completableFuture.supplyAsync] currentThread:{}", Thread.currentThread().getId());
// [completableFuture.supplyAsync] currentThread:11
return "HealerJean";
}, executor);
// 3、thenApply() 有入参,有返回值,不传线程池(main线程)
completableFuture.thenApply((r) -> {
long cost = System.currentTimeMillis() - start;
log.info("[completableFuture.thenApply] currentThread:{}, result:{}, cost:{}", Thread.currentThread().getId(), r, cost);
// [completableFuture.thenApply] currentThread:1, result:HealerJean, cost:74
return "thenAccept";
});
Thread.sleep(5000L);
}
2、异步任务
1)两个异步任务都完成,第三个任务才开始执行
/**
* 4、异步,两任务组合 :两个异步任务都完成,第三个任务才开始执行
*/
@Test
public void test4(){
ExecutorService service = Executors.newFixedThreadPool(10);
//定义两个任务
//任务一
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
log.info("[completableFuture.supplyAsync] task1 currentThread:{}", Thread.currentThread().getId());
return "task_1";
}, service);
//任务二
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
log.info("[completableFuture.supplyAsync] task2 currentThread:{}", Thread.currentThread().getId());
return "task_2";
}, service);
// 1、runAfterBothAsync:无传入值、无返回值
task1.runAfterBothAsync(task2, () -> {
log.info("[completableFuture.runAfterBothAsync] task3 currentThread:{}", Thread.currentThread().getId());
}, service);
// 2、thenAcceptBothAsync:有传入值、无返回值
task1.thenAcceptBothAsync(task2, (x, y) -> {
log.info("[completableFuture.thenAcceptBothAsync] task3 currentThread:{}, result1:{}, result2:{}", Thread.currentThread().getId(), x, y);
}, service);
// 2、thenCombineAsync:有传入值、有返回值
task1.thenCombineAsync(task2, (x, y) -> {
log.info("[completableFuture.thenCombineAsync] task3 currentThread:{}, result1:{}, result2:{}", Thread.currentThread().getId(), x, y);
return "task3";
}, service);
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
21:43:27.652 [pool-1-thread-1] INFO com.hlj.threadpool.d04_多接口返回.TestMain - [completableFuture.supplyAsync] task1 currentThread:11
21:43:27.652 [pool-1-thread-2] INFO com.hlj.threadpool.d04_多接口返回.TestMain - [completableFuture.supplyAsync] task2 currentThread:12
21:43:27.657 [pool-1-thread-3] INFO com.hlj.threadpool.d04_多接口返回.TestMain - [completableFuture.thenCombineAsync] task3 currentThread:13, result1:task_1, result2:task_2
21:43:27.657 [pool-1-thread-4] INFO com.hlj.threadpool.d04_多接口返回.TestMain - [completableFuture.thenAcceptBothAsync] task3 currentThread:14, result1:task_1, result2:task_2
21:43:27.657 [pool-1-thread-5] INFO com.hlj.threadpool.d04_多接口返回.TestMain - [completableFuture.runAfterBothAsync] task3 currentThread:15
2)当一个任务失败后迅速返回
异常捕获,有一个失败就结束任务
@Test
public void test4_3() {
ExecutorService service = Executors.newFixedThreadPool(10);
long start = System.currentTimeMillis();
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
try {
int i = 1 / 0;
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("[completableFuture.supplyAsync] task1 currentThread:{}", Thread.currentThread().getId());
return "task_1";
}, service);
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("[completableFuture.supplyAsync] task2 currentThread:{}", Thread.currentThread().getId());
return "task_2";
}, service);
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("[completableFuture.supplyAsync] task3 currentThread:{}", Thread.currentThread().getId());
return "task_3";
}, service);
CompletableFuture[] completableFutures = new CompletableFuture[]{task1, task2, task3};
try {
CompletableFuture.allOf(completableFutures).join();
} catch (Exception e) {
log.info("[CompletableFuture.allOf] error", e);
}
Long cost = System.currentTimeMillis() - start;
log.info("[test] task finish cost:{}", cost);
}
3)等待所有任务结束
循环使用
get方法等待任务结束耗时比allOf方法多
/**
* 5、等待所有任务都结束
*/
@Test
public void test5() throws ExecutionException, InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(10);
long start = System.currentTimeMillis();
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
log.info("[completableFuture.supplyAsync] task1 currentThread:{}", Thread.currentThread().getId());
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task_1";
}, service);
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
log.info("[completableFuture.supplyAsync] task2 currentThread:{}", Thread.currentThread().getId());
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task_2";
}, service);
CompletableFuture<String> task3= CompletableFuture.supplyAsync(() -> {
log.info("[completableFuture.supplyAsync] task3 currentThread:{}", Thread.currentThread().getId());
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task_3";
}, service);
// 第一种方式:CompletableFuture.allOf
// CompletableFuture[] completableFutures = new CompletableFuture[]{task1,task2, task3};
// CompletableFuture.allOf(completableFutures).join();
// Long cost = System.currentTimeMillis() - start;
// log.info("[test] task finish cost:{}", cost);
// 第二种方式:completableFuture.get()
for (CompletableFuture completableFuture : completableFutures){
Object result = completableFuture.get();
Long cost = System.currentTimeMillis() - start;
log.info("[task] cost:{}, result:{}", cost, result);
}
// 第三种方式:CompletableFuture.allOf 顺便获取结果
CompletableFuture[] completableFutures = new CompletableFuture[]{task1, task2, task3};
CompletableFuture<String> stringCompletableFuture = CompletableFuture.allOf(completableFutures).thenApply(f -> {
String task1Res = null;
try {
task1Res = task1.get();
} catch (Exception e) {
throw new RuntimeException("");
}
String task2Res = null;
try {
task2Res = task2.get();
} catch (Exception e) {
throw new RuntimeException("");
}
String task3Res = null;
try {
task3Res = task3.get();
} catch (Exception e) {
throw new RuntimeException("");
}
return task1Res + task2Res + task3Res;
});
String join = stringCompletableFuture.join();
log.info("join:{}", join);
Long cost = System.currentTimeMillis() - start;
log.info("[test] task finish cost:{}", cost);
}
public ResultDTO dynamicRpcQuery(Query query) {
List<QueryType> queryTypes = query.getQueryTypes();
String id = query.getId();
String pin = query.getPin();
List<CompletableFuture<?>> futures = new ArrayList<>();
// 模拟 RPC 调用 1
CompletableFuture<String> future1 = queryTypes.contains(QueryType.TYPE1) ?
CompletableFuture.supplyAsync(() -> rpcService.getData1(id), THREAD_POOL) : null;
Optional.ofNullable(future1).ifPresent(futures::add);
// 模拟 RPC 调用 2
CompletableFuture<String> future2 = queryTypes.contains(QueryType.TYPE2) ?
CompletableFuture.supplyAsync(() -> rpcService.getData2(id), THREAD_POOL) : null;
Optional.ofNullable(future2).ifPresent(futures::add);
// 模拟 RPC 调用 3
CompletableFuture<String> future3 = queryTypes.contains(QueryType.TYPE3) ?
CompletableFuture.supplyAsync(() -> rpcService.getData3(pin), THREAD_POOL) : null;
Optional.ofNullable(future3).ifPresent(futures::add);
ResultDTO result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(f -> {
ResultDTO dto = new ResultDTO();
Optional.ofNullable(future1).ifPresent(item -> dto.setData1(item.join()));
Optional.ofNullable(future2).ifPresent(item -> dto.setData2(item.join()));
Optional.ofNullable(future3).ifPresent(item -> dto.setData3(item.join()));
return dto;
}).exceptionally(e -> {
log.log(Level.SEVERE, "id: " + id, e);
return new ResultDTO();
}).join();
result.setId(id);
result.setPin(pin);
return result;
}
4)输出最先完成的n个任务
@Test
public void test6() throws Exception {
ExecutorService service = Executors.newFixedThreadPool(10);
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
log.info("[completableFuture.supplyAsync] task1 currentThread:{}", Thread.currentThread().getId());
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task_1";
}, service);
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
log.info("[completableFuture.supplyAsync] task2 currentThread:{}", Thread.currentThread().getId());
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task_2";
}, service);
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
log.info("[completableFuture.supplyAsync] task3 currentThread:{}", Thread.currentThread().getId());
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task_3";
}, service);
int n = 2;
CountDownLatch latch = new CountDownLatch(n);
List<CompletableFuture<String>> results = Lists.newArrayList();
CompletableFuture<String>[] completableFutures = new CompletableFuture[]{task1, task2, task3};
// 对每个future添加完成时的回调
for (CompletableFuture<String> future : completableFutures) {
future.whenComplete((r, e) -> {
synchronized (results) {
if (results.size() < n) {
latch.countDown();
results.add(future);
}
}
});
}
latch.await();
for (CompletableFuture<String> future: results){
log.info("[test] task finish result:{}", future.get());
}
Thread.sleep(5000L);
}
三、超时
1、异步: CompletableFutureTimeout
java8中CompletableFuture异步处理超时的方法⬤
Java8 的CompletableFuture并没有timeout机制,虽然可以在get的时候指定timeout,是一个同步堵塞的操作。怎样让timeout也是异步的呢?Java8 内有内建的机制支持,一般的实现方案是启动一个ScheduledThreadpoolExecutor线程在timeout时间后直接调用CompletableFuture.completeExceptionally(newTimeoutException()),然后用acceptEither()或者applyToEither看是先计算完成还是先超时:⬤ 在
java9 引入了orTimeout和completeOnTimeOut两个方法支持 异步timeout机制:内部实现上跟我们上面的实现方案是一模一样的,只是现在不需要自己实现了。
1)CompletableFutureTimeout
注意:异常会被吃掉,返回超时的默认值。我就猜想,如果代码无法保证,既然有超时这种默认值,那么本身的业务异常应该毫无意义
@Slf4j
public class CompletableFutureTimeout {
static final class Delayer {
static ScheduledFuture<?> delay(Runnable command, long delay,
TimeUnit unit) {
return delayer.schedule(command, delay, unit);
}
static final class DaemonThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CompletableFutureDelayScheduler");
return t;
}
}
static final ScheduledThreadPoolExecutor delayer;
// 注意,这里使用一个线程就可以搞定 因为这个线程并不真的执行请求 而是仅仅抛出一个异常
static {
(delayer = new ScheduledThreadPoolExecutor(
1, new CompletableFutureTimeout.Delayer.DaemonThreadFactory())).
setRemoveOnCancelPolicy(true);
}
}
static final class FutureTimeOutException extends RuntimeException {
public FutureTimeOutException() {
}
}
static class ExceptionUtils {
public static Throwable extractRealException(Throwable throwable) {
//这里判断异常类型是否为CompletionException、ExecutionException,如果是则进行提取,否则直接返回。
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
if (throwable.getCause() != null) {
return throwable.getCause();
}
}
return throwable;
}
}
/**
* 哪个先完成 就apply哪一个结果 这是一个关键的API,exceptionally出现异常后返回默认值
* @param future future
* @param t 异常返回默认结果
* @param to 超时返回默认结果
* @param timeout 超时时间
* @param unit 时间单位
* @return
* @param <T>
*/
public static <T> CompletableFuture<T> completeOnTimeout( CompletableFuture<T> future,T t, T to, long timeout, TimeUnit unit) {
final CompletableFuture<T> timeoutFuture = timeoutAfter(timeout, unit);
return future.applyToEither(timeoutFuture, Function.identity()).exceptionally(throwable -> {
if (ExceptionUtils.extractRealException(throwable) instanceof FutureTimeOutException){
return to;
}
return t;
});
}
private static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
CompletableFuture<T> result = new CompletableFuture<T>();
CompletableFutureTimeout.Delayer.delayer.schedule(() -> result.completeExceptionally(new FutureTimeOutException()), timeout, unit);
return result;
}
}
2)测试
@Test
public void test() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
log.info("task1 start ");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("task1 end ");
return "task1 success";
});
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
log.info("task2 start ");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("task2 end ");
return 100;
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
log.info("task3 start ");
int i = 1/0;
try {
Thread.sleep(3500);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("task3 end ");
return "task3 success";
});
CompletableFuture<String> completableFuture1 = CompletableFutureTimeout.completeOnTimeout(task1, "Exception", "TimeOutException", 2, TimeUnit.SECONDS);
CompletableFuture<Integer> completableFuture2 = CompletableFutureTimeout.completeOnTimeout(task2, 0, -1,1, TimeUnit.SECONDS);
CompletableFuture<String> completableFuture3 = CompletableFutureTimeout.completeOnTimeout(task3, "Exception","TimeOutException", 2, TimeUnit.SECONDS);
String result1 = completableFuture1.get();
Integer result2 = completableFuture2.get();
String result3 = completableFuture3.get();
Long cost = System.currentTimeMillis() - start;
log.info("result1: {}, cost:{}", result1, cost);
log.info("result2: {}, cost:{}", result2, cost);
log.info("result3: {}, cost:{}", result3, cost);
Thread.sleep(500000L);
}
20:36:13.418 [ForkJoinPool.commonPool-worker-11] INFO com.healerjean.proj.d04_多接口返回.TestMain - task3 start
20:36:13.418 [ForkJoinPool.commonPool-worker-2] INFO com.healerjean.proj.d04_多接口返回.TestMain - task2 start
20:36:13.418 [ForkJoinPool.commonPool-worker-9] INFO com.healerjean.proj.d04_多接口返回.TestMain - task1 start
20:36:13.437 [ForkJoinPool.commonPool-worker-9] INFO com.healerjean.proj.d04_多接口返回.TestMain - task1 end
20:36:14.431 [main] INFO com.healerjean.proj.d04_多接口返回.TestMain - result1: TimeOutException, cost:1121
20:36:14.435 [main] INFO com.healerjean.proj.d04_多接口返回.TestMain - result2: -1, cost:1121
20:36:14.435 [main] INFO com.healerjean.proj.d04_多接口返回.TestMain - result3: Exception, cost:1121
20:36:16.429 [ForkJoinPool.commonPool-worker-2] INFO com.healerjean.proj.d04_多接口返回.TestMain - task2 end
2、同步:completableFuture
public MarginEnum.InsuredFailEnum allCheckFutureResult(CompletableFuture<MarginCheckContextBO>[] allCheckFuture) {
for (CompletableFuture<MarginCheckContextBO> completableFuture : allCheckFuture) {
if (Objects.isNull(completableFuture)) {
continue;
}
try {
MarginCheckContextBO marginCheckContextBo = completableFuture.get();
if (MarginEnum.InsuredFailEnum.SUCCESSFUL != marginCheckContextBo.getInsuredFailEnum()) {
return marginCheckContextBo.getInsuredFailEnum();
}
} catch (Exception e) {
log.error("[MarginService#vendorInsureCheck] ERROR", e);
throw new AppRunException(e.getMessage());
}
}
return MarginEnum.InsuredFailEnum.SUCCESSFUL;
}
三、原理
转子:https://mp.weixin.qq.com/s/GQGidprakfticYnbVYVYGQ
1、CompletableFuture 的背景和定义加载
1)CompletableFuture 解决的问题
CompletableFuture是由Java 8引入的,在Java8之前我们一般通过Future实现异步。⬤
Future用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法,Java 8之前若要设置回调一般会使用guava的ListenableFuture,回调的引入又会导致臭名昭著的回调地狱(下面的例子会通过ListenableFuture的使用来具体进行展示)。⬤
CompletableFuture对Future进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排,同时一定程度解决了回调地狱的问题。
下面将举例来说明,我们通过 ListenableFuture、CompletableFuture来实现异步的差异。假设有三个操作 step1、step2、step3存在依赖关系,其中 step3 的执行依赖step1和step2的结果。
a、Future ( ListenableFuture ):
ExecutorService executor = Executors.newFixedThreadPool(5);
ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(executor);
ListenableFuture<String> future1 = guavaExecutor.submit(() -> {
//step 1
System.out.println("执行step 1");
return "step1 result";
});
ListenableFuture<String> future2 = guavaExecutor.submit(() -> {
//step 2
System.out.println("执行step 2");
return "step2 result";
});
ListenableFuture<List<String>> future1And2 = Futures.allAsList(future1, future2);
Futures.addCallback(future1And2, new FutureCallback<List<String>>() {
@Override
public void onSuccess(List<String> result) {
System.out.println(result);
ListenableFuture<String> future3 = guavaExecutor.submit(() -> {
System.out.println("执行step 3");
return "step3 result";
});
Futures.addCallback(future3, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println(result);
}
@Override
public void onFailure(Throwable t) {
}
}, guavaExecutor);
}
@Override
public void onFailure(Throwable t) {
}}, guavaExecutor);
b、CompletableFuture 的实现如下:
显然,
CompletableFuture的实现更为简洁,可读性更好。
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行step 1");
return "step1 result";
}, executor);
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行step 2");
return "step2 result";
});
cf1.thenCombine(cf2, (result1, result2) -> {
System.out.println(result1 + " , " + result2);
System.out.println("执行step 3");
return "step3 result";
}).thenAccept(result3 -> System.out.println(result3));
2、CompletableFuture 的定义
CompletableFuture实现了两个接口(如上图所示):``Future、CompletionStage`。⬤
Future表示异步计算的结果,⬤
CompletionStage用于表示异步执行过程中的一个步骤(Stage),这个步骤可能是由另外一个CompletionStage触发的,随着当前步骤的完成,也可能会触发其他一系列CompletionStage的执行。从而我们可以根据实际业务对这些步骤进行多样化的编排组合,CompletionStage接口正是定义了这样的能力,我们可以通过其提供的thenAppy、thenCompose等函数式编程方法来组合编排这些步骤。
3、CompletableFuture 的使用
下面我们通过一个例子来讲解
CompletableFuture如何使用,使用CompletableFuture也是构建依赖树的过程。一个CompletableFuture的完成会触发另外一系列依赖它的CompletableFuture的执行:
如图所示,这里描绘的是一个业务接口的流程,其中包括 CF1 \ CF2 \ CF3 \ CF4 \ CF5 共5个步骤,并描绘了这些步骤之间的依赖关系,每个步骤可以是一次 RPC 调用、一次数据库操作或者是一次本地方法调用等,在使用 CompletableFuture 进行异步化编程时,图中的每个步骤都会产生一个 CompletableFuture 对象,最终结果也会用一个 CompletableFuture 来进行表示。
根据 CompletableFuture依赖数量,可以分为以下几类:零依赖、一元依赖、二元依赖和多元依赖。

1)零依赖:CompletableFuture 的创建
我们先看下如何不依赖其他
CompletableFuture来创建新的CompletableFuture:

如上图红色链路所示,接口接收到请求后,首先发起两个异步调用 CF1、CF2,主要有三种方式:
ExecutorService executor = Executors.newFixedThreadPool(5);
//1、使用runAsync或supplyAsync发起异步调用
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
return "result1";
}, executor);
//2、CompletableFuture.completedFuture()直接创建一个已完成状态的CompletableFuture
CompletableFuture<String> cf2 = CompletableFuture.completedFuture("result2");
//3、先初始化一个未完成的CompletableFuture,然后通过complete()、completeExceptionally(),完成该CompletableFuture
CompletableFuture<String> cf = new CompletableFuture<>();
cf.complete("success");
2)一元依赖:依赖一个CF
CF3,CF5分别依赖于CF1和CF2,这种对于单个CompletableFuture的依赖可以通过thenApply、thenAccept、thenCompose等方法来实现,代码如下所示:

CompletableFuture<String> cf3 = cf1.thenApply(result1 -> {
//result1为CF1的结果
//......
return "result3";
});
CompletableFuture<String> cf5 = cf2.thenApply(result2 -> {
//result2为CF2的结果
//......
return "result5";
});
3)二元依赖:依赖两个 CF
CF4同时依赖于两个CF1和CF2,这种二元依赖可以通过thenCombine等回调来实现,如下代码所示:

CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (result1, result2) -> {
//result1和result2分别为cf1和cf2的结果
return "result4";
});
4)多元依赖:依赖多个 CF
如上图红色链路所示,整个流程的结束依赖于三个步骤CF3、CF4、CF5,这种多元依赖可以通过
allOf或anyOf方法来实现,⬤ 多个依赖全部完成时使用
allOf⬤ 当多个依赖中的任意一个完成即可时使用
anyOf

CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5);
CompletableFuture<String> result = cf6.thenApply(v -> {
//这里的join并不会阻塞,因为传给thenApply的函数是在CF3、CF4、CF5全部完成时,才会执行 。
result3 = cf3.join();
result4 = cf4.join();
result5 = cf5.join();
//根据result3、result4、result5组装最终result;
return "result";
});
4、CompletbleFuture 原理
CompletableFuture中包含两个字段:result和stack。result用于存储当前CF的结果,stack(Completion)表示当前CF完成后需要触发的依赖动作(DependencyActions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈[Treiber stack]的形式存储,stack表示栈顶元素。

这种方式类似“观察者模式”,依赖动作(Dependency Action)都封装在一个单独 Completion子类中。下面是 Completion 类关系结构图。CompletableFuture中的每个方法都对应了图中的一个 Completion 的子类,Completion 本身是观察者的基类。
UniCompletion继承了Completion,是一元依赖的基类,例如thenApply的实现类UniApply就继承自UniCompletion。BiCompletion继承了UniCompletion,是二元依赖的基类,同时也是多元依赖的基类。例如thenCombine的实现类BiRelay就继承自BiCompletion。
1)设计思想
按照类似“观察者模式”的设计思想,原理分析可以从“观察者”和“被观察者”两个方面着手。由于回调种类多,但结构差异不大,所以这里单以一元依赖中的
thenApply为例,不再枚举全部回调类型。如下图所示:

2)被观察者
1、每个 CompletableFuture· 都可以被看作一个被观察者,其内部有一个 Completion类型的链表成员变量 stack,用来存储注册到其中的所有观察者。当被观察者执行完成后会弹栈 stack属性,依次通知注册到其中的观察者。上面例子中步骤 fn2就是作为观察者被封装在 UniApply 中。
3、被观察者 CF 中的 result 属性,用来存储返回结果数据。这里可能是一次 RPC 调用的返回值,也可能是任意对象,在上面的例子中对应步骤 fn1 的执行结果。
3)观察者
CompletableFuture支持很多回调方法,例如thenAccept、thenApply、exceptionally等,这些方法接收一个函数类型的参数f,生成一个Completion类型的对象(即观察者),并将入参函数f赋值给Completion的成员变量fn,然后检查当前CF是否已处于完成状态(即result != null),如果已完成直接触发fn,否则将观察者Completion加入到CF的观察者链stack中,再次尝试触发,如果被观察者未执行完则其执行完毕之后通知触发。
1、观察者中的 dep 属性:指向其对应的 CompletableFuture,在上面的例子中 dep 指向 CF2。
2、观察者中的 src 属性:指向其依赖的 CompletableFuture,在上面的例子中 src 指向 CF1。 观察者Completion中的 fn 属性:用来存储具体的等待被回调的函数。这里需要注意的是不同的回调方法(thenAccept、thenApply、exceptionally等)接收的函数类型也不同,即fn的类型有很多种,在上面的例子中fn指向fn2。
5整体流程
1)一元依赖
这里仍然以 thenApply 为例来说明一元依赖的流程:
1、将观察者 Completion 注册到 CF1,此时 CF1将 Completion 压栈。
2、当CF1 的操作运行完成时,会将结果赋值给 CF1 中的 result 属性。
3、依次弹栈,通知观察者尝试运行。

A、FAQ
Q1:在观察者注册之前,如果 CF 已经执行完成,并且已经发出通知,那么这时观察者由于错过了通知是不是将永远不会被触发呢 ?
A1:不会。在注册时检查依赖的CF是否已经完成。如果未完成(即 result == null)则将观察者入栈,如果已完成(result != null)则直接触发观察者操作。
Q2:在”入栈“前会有”result == null“的判断,这两个操作为非原子操作,CompletableFufure 的实现也没有对两个操作进行加锁,完成时间在这两个操作之间,观察者仍然得不到通知,是不是仍然无法触发?
A2:不会。入栈之后再次检查CF是否完成,如果完成则触发。
Q3:当依赖多个CF时,观察者会被压入所有依赖的CF的栈中,每个CF完成的时候都会进行,那么会不会导致一个操作被多次执行呢 ?如下图所示,即当CF1、CF2同时完成时,如何避免CF3被多次触发。
A3:CompletableFuture 的实现是这样解决该问题的:观察者在执行之前会先通过CAS操作设置一个状态位,将status由0改为1。如果观察者已经执行过了,那么CAS操作将会失败,取消执行。
通过对以上3个问题的分析可以看出,CompletableFuture在处理并行问题时,全程无加锁操作,极大地提高了程序的执行效率。我们将并行问题考虑纳入之后,可以得到完善的整体流程图如下所示:

2)二元依赖
我们以thenCombine为例来说明二元依赖:
thenCombine操作表示依赖两个CompletableFuture。其观察者实现类为BiApply,如上图所示,BiApply通过src和snd两个属性关联被依赖的两个CF,fn属性的类型为BiFunction。与单个依赖不同的是,在依赖的CF未完成的情况下,thenCombine会尝试将BiApply压入这两个被依赖的CF的栈中,每个被依赖的CF完成时都会尝试触发观察者BiApply,BiApply会检查两个依赖是否都完成,如果完成则开始执行。这里为了解决重复触发的问题,同样用的是上一章节提到的CAS操作,执行时会先通过CAS设置状态位,避免重复触发。

3)多元依赖
依赖多个
CompletableFuture的回调方法包括allOf、anyOf,区别在于allOf观察者实现类为BiRelay,需要所有被依赖的CF完成后才会执行回调;而anyOf观察者实现类为OrRelay,任意一个被依赖的CF完成后就会触发。二者的实现方式都是将多个被依赖的CF构建成一棵平衡二叉树,执行结果层层通知,直到根节点,触发回调监听。
四、常见问题
1、执行在哪个线程上
要合理治理线程资源,最基本的前提条件就是要在写代码时,清楚地知道每一行代码都将执行在哪个线程上。下面我们看一下
CompletableFuture的执行线程情况。
CompletableFuture实现了CompletionStage接口,通过丰富的回调方法,支持各种组合操作,每种组合场景都有同步和异步两种方法。
⬤ 同步方法(即不带 Async后缀的方法)有两种情况。
1、如果注册时被依赖的操作已经执行完成,则直接由当前线程执行。
2、如果注册时被依赖的操作还未执行完,则由回调线程执行。
⬤ 异步方法(即带 Async 后缀的方法):可以选择是否传递线程池参数 Executor运行在指定线程池中;当不传递Executor时,会使用 ForkJoinPool 中的共用线程池 CommonPool(CommonPool的大小是 CPU 核数 - 1 ,如果是IO 密集的应用,线程数可能成为瓶颈)。
ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync 执行线程:" + Thread.currentThread().getName());
//业务操作
return "";
}, threadPool1);
//1、此时,如果future1中的业务操作已经执行完毕并返回,则该thenApply直接由当前main线程执行;否则,将会由执行以上业务操作的threadPool1中的线程执行。
future1.thenApply(value -> {
System.out.println("thenApply 执行线程:" + Thread.currentThread().getName());
return value + "1";
});
//使用 ForkJoinPool中的共用线程池 CommonPool,但是一定要注意传线程池,具体看下面
future1.thenApplyAsync(value -> {
//do something
return value + "1";
});
//使用指定线程池
future1.thenApplyAsync(value -> {
//do something
return value + "1";
}, threadPool1);
2、线程池须知
1) 异步回调要传线程池
前面提到,异步回调方法可以选择是否传递线程池参数
Executor,这里我们建议强制传线程池,且根据实际情况做线程池隔离。当不传递线程池时,会使用
ForkJoinPool中的公共线程池CommonPool,这里所有调用将共用该线程池,核心线程数=处理器数量-1(单核核心线程数为1),所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰。
2)线程池循环引用会导致死锁
doGet方法第三行通过supplyAsync向threadPool1请求线程,并且内部子任务又向threadPool1请求线程。threadPool1大小为10,当同一时刻有10个请求到达,则threadPool1被打满,子任务请求线程时进入阻塞队列排队,但是父任务的完成又依赖于子任务,这时由于子任务得不到线程,父任务无法完成。主线程执行cf1.join()进入阻塞状态,并且永远无法恢复。
为了修复该问题,需要将父任务与子任务做线程池隔离,两个任务请求不同的线程池,避免循环依赖导致的阻塞。
public Object doGet() {
ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(100));
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
//do sth
return CompletableFuture.supplyAsync(() -> {
System.out.println("child");
return "child";
}, threadPool1).join();//子任务
}, threadPool1);
return cf1.join();
}
3)异步 RPC 调用注意不要阻塞IO线程池
服务异步化后很多步骤都会依赖于异步
RPC调用的结果,这时需要特别注意一点,如果是使用基于NIO(比如Netty)的异步RPC,则返回结果是由IO线程负责设置的,即回调方法由IO线程触发,CompletableFuture同步回调(如thenApply、thenAccept等无Async后缀的方法)如果依赖的异步RPC调用的返回结果,那么这些同步回调将运行在IO线程上,而整个服务只有一个IO线程池,这时需要保证同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行完成前,IO线程将一直被占用,影响整个服务的响应。
3、异常问题
1)异常处理
由于异步执行的任务在其他线程上执行,而异常信息存储在线程栈中,因此当前线程除非阻塞等待返回结果,无法通过
try\catch捕获异常。CompletableFuture提供了异常捕获回调exceptionally,相当于同步调用中的try\catch。使用方法如下所示:
@Autowired
private WmOrderAdditionInfoThriftService wmOrderAdditionInfoThriftService;//内部接口
public CompletableFuture<Integer> getCancelTypeAsync(long orderId) {
//业务方法,内部会发起异步rpc调用
CompletableFuture<WmOrderOpRemarkResult> remarkResultFuture =
wmOrderAdditionInfoThriftService.findOrderCancelledRemarkByOrderIdAsync(orderId);
return remarkResultFuture
.exceptionally(err -> {//通过exceptionally 捕获异常,打印日志并返回默认值
log.error("WmOrderRemarkService.getCancelTypeAsync Exception orderId={}", orderId, err);
return 0;
});
}
2)真正的异常
有一点需要注意,
CompletableFuture在回调方法中对异常进行了包装。大部分异常会封装成CompletionException后抛出,真正的异常存储在cause属性中,因此如果调用链中经过了回调方法处理那么就需要用Throwable.getCause()方法提取真正的异常。但是,有些情况下会直接返回真正的异常
@Autowired
private WmOrderAdditionInfoThriftService wmOrderAdditionInfoThriftService;//内部接口
public CompletableFuture<Integer> getCancelTypeAsync(long orderId) {
CompletableFuture<WmOrderOpRemarkResult> remarkResultFuture = wmOrderAdditionInfoThriftService.findOrderCancelledRemarkByOrderIdAsync(orderId);//业务方法,内部会发起异步rpc调用
return remarkResultFuture
.thenApply(result -> {//这里增加了一个回调方法thenApply,如果发生异常thenApply内部会通过new CompletionException(throwable) 对异常进行包装
//这里是一些业务操作
})
.exceptionally(err -> {//通过exceptionally 捕获异常,这里的err已经被thenApply包装过,因此需要通过Throwable.getCause()提取异常
log.error("WmOrderRemarkService.getCancelTypeAsync Exception orderId={}", orderId, ExceptionUtils.extractRealException(err));
return 0;
});
}
上面代码中用到了一个自定义的工具类
ExceptionUtils,用于CompletableFuture的异常提取,在使用CompletableFuture做异步编程时,可以直接使用该工具类处理异常。实现代码如下:
public class ExceptionUtils {
public static Throwable extractRealException(Throwable throwable) {
//这里判断异常类型是否为CompletionException、ExecutionException,如果是则进行提取,否则直接返回。
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
if (throwable.getCause() != null) {
return throwable.getCause();
}
}
return throwable;
}
}
4、CompletableFuture 和 线程池
1、直接使用线程池适用于简单的并发任务执行场景,通过直接管理线程和任务,可以更好地控制资源的分配和任务的执行。
2、CompletableFuture + 线程池则更适用于复杂的异步编程场景,通过 CompletableFuture 提供的强大异步编程能力,可以更方便地实现任务的组合、结果的非阻塞等待和异常处理等。


