一、前言
本篇文章主要介绍 CompletableFuture 的用法和使用案例,基于 Java 8进行测试。
二、介绍
CompletableFuture 是 Java 8 新引入的类,提供了一些并发编程的 api,供开发者更好的实现多线程编程。
三、常用方法
3.1、runAsync()、supplyAsync()
runAsync() 和 supplyAsync() 都是用来执行异步操作的。不同的是,runAsync() 没有返回值 supplyAsync() 有返回值。
/**
* 无返回值
*/
public static void runAsync() throws Exception {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
future.get();
}
/**
* 有返回值
*/
public static void supplyAsync() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
//假设通过某个逻辑获取了结果 1000
return 1000L;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
Long result = future.get();
System.out.println(result);
}
3.2、allOf()、anyOf()
allof() 和 anyof() 方法都可以组合任意多个的 CompletableFuture,不同的是 allof() 是与的关系,而 anyof() 是或的关系。
/**
* allof() 方法测试
*/
private void allOfTest(){
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println("任务1开始执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1执行结束");
});
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
System.out.println("任务2开始执行");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2执行结束");
});
//等待所有任务执行完毕
CompletableFuture.allOf(future1, future2).join();
}
/**
* anyof() 方法测试
*/
private void anyOfTest() throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1开始执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1执行结束");
return "任务1执行结果";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2开始执行");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2执行结束");
return "任务2执行结果";
});
//等待任意一个任务执行完毕
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
System.out.println(anyFuture.get());
}
3.3、join()、get()
join() 和 get() 方法有很多相似之处,都可以等待线程执行完毕,返回线程执行的结果。不同之处在于,join()方法抛出的是未经检查的异常(可以在代码中不处理异常),get() 方法抛出的经过检查的异常(必须在代码中处理异常情况)。
public static void main(String[] args) {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
return 1 / 0;
});
CompletableFuture.allOf(f1).join();
//get() 方法必须处理异常
//CompletableFuture.allOf(f1).get();
System.out.println("CompletableFuture Test");
}
3.4、thenApply()
thenAppley() 方法是将一个 CompletableFuture 作为入参,给另一个 CompletableFuture 任务使用的方法。只所以不用 x = future1.join,然后 future2 再对 x 进行操作。是因为如果有多个 future,就会出现大量的重复性代码。
public static void main(String[] args) {
//CompletableFuture 的 thenApply 方法 测试
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> 3)
.thenAccept(x -> System.out.println("正确结果是: " + x));
future.join();
}
3.5、thenCombine()
thenCombine() 方法是合并线程操作的方法,第一个参数标识合并哪个异步任务,第二个表示要做什么操作。
public static void main(String[] args) {
//CompletableFuture 的 thenCombine 方法 测试
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "result1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "result2");
CompletableFuture<String> future3 = future1.thenCombine(future2, (r1, r2) -> r1 + "---" + r2);
System.out.println(future3.join());
}
四、使用案例
4.1、异步
public static void main(String[] args) {
log.debug("执行业务逻辑");
//异步保存操作记录等操作
CompletableFuture.runAsync(() -> {
log.debug("保存了操作记录");
});
}
4.2、多线程获取数据
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
//获取结果 1
return "result1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
//获取结果 2
return "result2";
});
//等待两个线程都执行完
CompletableFuture.allOf(future1, future2).join();
//模拟对多线程获取的数据进行处理
System.out.println(future1.join() + future2.join());
}
五、所有方法
runAsync(Runnable runnable)
:以异步的方式执行指定的Runnable任务。-
supplyAsync(Supplier<U> supplier)
:以异步的方式执行指定的Supplier任务,并返回一个CompletableFuture对象,它的结果类型是U。 thenApply(Function<? super T,? extends U> fn)
:在CompletableFuture对象上应用给定的Function函数,返回一个新的CompletableFuture对象,它的结果类型是U。thenAccept(Consumer<? super T> action)
:在CompletableFuture对象上应用给定的Consumer函数,返回一个新的CompletableFuture对象,它的结果类型是Void。thenRun(Runnable action)
:在CompletableFuture对象上应用给定的Runnable函数,返回一个新的CompletableFuture对象,它的结果类型是Void。thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
:将当前CompletableFuture对象和另一个CompletableFuture对象的结果组合起来,返回一个新的CompletableFuture对象,它的结果类型是V。thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> action)
:将当前CompletableFuture对象和另一个CompletableFuture对象的结果应用给定的BiConsumer函数,返回一个新的CompletableFuture对象,它的结果类型是Void。runAfterBoth(CompletableFuture<?> other, Runnable action)
:在当前CompletableFuture对象和另一个CompletableFuture对象都完成后执行指定的Runnable函数,返回一个新的CompletableFuture对象,它的结果类型是Void。applyToEither(CompletableFuture<? extends T> other, Function<? super T, U> fn)
:将当前CompletableFuture对象和另一个CompletableFuture对象的结果中的任意一个应用给定的Function函数,返回一个新的CompletableFuture对象,它的结果类型是U。acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> action)
:将当前CompletableFuture对象和另一个CompletableFuture对象的结果中的任意一个应用给定的Consumer函数,返回一个新的CompletableFuture对象,它的结果类型是Void。runAfterEither(CompletableFuture<?> other, Runnable action)
:在当前CompletableFuture对象和另一个CompletableFuture对象中的任意一个完成后执行指定的Runnable函数,返回一个新的CompletableFuture对象,它的结果类型是Void。thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
:将给定的Function函数应用于当前CompletableFuture对象的结果,并返回一个新的CompletableFuture对象,它的结果类型是U。exceptionally(Function<Throwable, ? extends T> fn)
:在当前CompletableFuture对象发生异常时应用给定的Function函数,返回一个新的CompletableFuture对象,它的结果类型是T。whenComplete(BiConsumer<? super T, ? super Throwable> action)
:在当前CompletableFuture对象完成时应用给定的BiConsumer函数,返回一个新的CompletableFuture对象,它的结果类型是T。completeExceptionally(Throwable ex)
:以异常的方式完成当前CompletableFuture对象,返回一个新的CompletableFuture对象,它的结果类型是T。
六、附录
6.1、自定义线程池的配置类
CompletableFuture 默认的线程池为 ForkJoinPool.commonPool()
,该线程池的线程数等于 cpu核心数 - 1。commonPool 设计应用在 计算密集型场景中,如果项目偏 io 场景,那么该线程池将不符合。并且,如果在双核的服务器上,commonPool 的线程池为1,相当于执行单线程操作了。因此,实际开发中常常需要自定义线程池。
6.1.1、自定义线程池的配置类
@Configuration
@EnableAsync
@Component
public class ThreadPoolConfig {
@Bean
public Executor threadPoolExecutor() {
//cpu的核心数,包括超线程出来的cpu核心数
int num = Runtime.getRuntime().availableProcessors();
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//核心线程数
taskExecutor.setCorePoolSize(num);
//最大线程数
taskExecutor.setMaxPoolSize(num + num / 2);
//阻塞队列任务
taskExecutor.setQueueCapacity(num * 2);
//线程名的前缀
taskExecutor.setThreadNamePrefix("taskExecutor->");
//线程池的拒绝策略
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化线程池
taskExecutor.initialize();
return taskExecutor;
}
}
6.1.2、CompletableFuture 使用自定义线程池
@Resource
private ThreadPoolTaskExecutor threadPoolExecutor;
CompletableFuture.runAsync(() -> {
//执行某业务逻辑
}, threadPoolExecutor);