【Java】CompletableFuture 使用教程


一、前言

本篇文章主要介绍 CompletableFuture 的用法和使用案例,基于 Java 8进行测试。

二、介绍

CompletableFuture 是 Java 8 新引入的类,提供了一些并发编程的 api,供开发者更好的实现多线程编程。

三、常用方法

3.1、runAsync()、supplyAsync()

runAsync() 和 supplyAsync() 都是用来执行异步操作的。不同的是,runAsync() 没有返回值 supplyAsync() 有返回值。

/**
* 无返回值
*/
public static void runAsync() throws Exception {
  CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  });

  future.get();
}

/**
* 有返回值
*/
public static void supplyAsync() throws Exception {
  CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
    try {
      Thread.sleep(1000);

      //假设通过某个逻辑获取了结果 1000
      return 1000L;
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  });

  Long result = future.get();
  System.out.println(result);
}

3.2、allOf()、anyOf()

allof() 和 anyof() 方法都可以组合任意多个的 CompletableFuture,不同的是 allof() 是与的关系,而 anyof() 是或的关系。

/**
 * allof() 方法测试
 */
private void allOfTest(){
    CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
        System.out.println("任务1开始执行");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务1执行结束");
    });

    CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
        System.out.println("任务2开始执行");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务2执行结束");
    });

    //等待所有任务执行完毕
    CompletableFuture.allOf(future1, future2).join();
}
/**
 * anyof() 方法测试
 */
private void anyOfTest() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务1开始执行");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务1执行结束");
        return "任务1执行结果";
    });

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务2开始执行");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务2执行结束");
        return "任务2执行结果";
    });

    //等待任意一个任务执行完毕
    CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
    System.out.println(anyFuture.get());
}

3.3、join()、get()

join() 和 get() 方法有很多相似之处,都可以等待线程执行完毕,返回线程执行的结果。不同之处在于,join()方法抛出的是未经检查的异常(可以在代码中不处理异常),get() 方法抛出的经过检查的异常(必须在代码中处理异常情况)。

public static void main(String[] args) {
    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
        return 1 / 0;
    });
    CompletableFuture.allOf(f1).join();
    //get() 方法必须处理异常
    //CompletableFuture.allOf(f1).get();
    System.out.println("CompletableFuture Test");
}

3.4、thenApply()

thenAppley() 方法是将一个 CompletableFuture 作为入参,给另一个 CompletableFuture 任务使用的方法。只所以不用 x = future1.join,然后 future2 再对 x 进行操作。是因为如果有多个 future,就会出现大量的重复性代码。

public static void main(String[] args) {
    //CompletableFuture 的 thenApply 方法 测试
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> 3)
            .thenAccept(x -> System.out.println("正确结果是: " + x));

    future.join();
}

3.5、thenCombine()

thenCombine() 方法是合并线程操作的方法,第一个参数标识合并哪个异步任务,第二个表示要做什么操作。

public static void main(String[] args) {
    //CompletableFuture 的 thenCombine 方法 测试
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "result1");

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "result2");

    CompletableFuture<String> future3 = future1.thenCombine(future2, (r1, r2) -> r1 + "---" + r2);

    System.out.println(future3.join());

}

四、使用案例

4.1、异步

public static void main(String[] args) {
    log.debug("执行业务逻辑");

    //异步保存操作记录等操作
    CompletableFuture.runAsync(() -> {
        log.debug("保存了操作记录");
    });
}

4.2、多线程获取数据

public static void main(String[] args) {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        //获取结果 1
        return "result1";
    });

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        //获取结果 2
        return "result2";
    });

    //等待两个线程都执行完
    CompletableFuture.allOf(future1, future2).join();
    
    //模拟对多线程获取的数据进行处理
    System.out.println(future1.join() + future2.join());
}

五、所有方法

  1. runAsync(Runnable runnable):以异步的方式执行指定的Runnable任务。
  2. supplyAsync(Supplier<U> supplier):以异步的方式执行指定的Supplier任务,并返回一个CompletableFuture对象,它的结果类型是U。
  3. thenApply(Function<? super T,? extends U> fn):在CompletableFuture对象上应用给定的Function函数,返回一个新的CompletableFuture对象,它的结果类型是U。
  4. thenAccept(Consumer<? super T> action):在CompletableFuture对象上应用给定的Consumer函数,返回一个新的CompletableFuture对象,它的结果类型是Void。
  5. thenRun(Runnable action):在CompletableFuture对象上应用给定的Runnable函数,返回一个新的CompletableFuture对象,它的结果类型是Void。
  6. thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn):将当前CompletableFuture对象和另一个CompletableFuture对象的结果组合起来,返回一个新的CompletableFuture对象,它的结果类型是V。
  7. thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> action):将当前CompletableFuture对象和另一个CompletableFuture对象的结果应用给定的BiConsumer函数,返回一个新的CompletableFuture对象,它的结果类型是Void。
  8. runAfterBoth(CompletableFuture<?> other, Runnable action):在当前CompletableFuture对象和另一个CompletableFuture对象都完成后执行指定的Runnable函数,返回一个新的CompletableFuture对象,它的结果类型是Void。
  9. applyToEither(CompletableFuture<? extends T> other, Function<? super T, U> fn):将当前CompletableFuture对象和另一个CompletableFuture对象的结果中的任意一个应用给定的Function函数,返回一个新的CompletableFuture对象,它的结果类型是U。
  10. acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> action):将当前CompletableFuture对象和另一个CompletableFuture对象的结果中的任意一个应用给定的Consumer函数,返回一个新的CompletableFuture对象,它的结果类型是Void。
  11. runAfterEither(CompletableFuture<?> other, Runnable action):在当前CompletableFuture对象和另一个CompletableFuture对象中的任意一个完成后执行指定的Runnable函数,返回一个新的CompletableFuture对象,它的结果类型是Void。
  12. thenCompose(Function<? super T,? extends CompletionStage<U>> fn):将给定的Function函数应用于当前CompletableFuture对象的结果,并返回一个新的CompletableFuture对象,它的结果类型是U。
  13. exceptionally(Function<Throwable, ? extends T> fn):在当前CompletableFuture对象发生异常时应用给定的Function函数,返回一个新的CompletableFuture对象,它的结果类型是T。
  14. whenComplete(BiConsumer<? super T, ? super Throwable> action):在当前CompletableFuture对象完成时应用给定的BiConsumer函数,返回一个新的CompletableFuture对象,它的结果类型是T。
  15. completeExceptionally(Throwable ex):以异常的方式完成当前CompletableFuture对象,返回一个新的CompletableFuture对象,它的结果类型是T。

六、附录

6.1、自定义线程池的配置类

CompletableFuture 默认的线程池为 ForkJoinPool.commonPool(),该线程池的线程数等于 cpu核心数 - 1。commonPool 设计应用在 计算密集型场景中,如果项目偏 io 场景,那么该线程池将不符合。并且,如果在双核的服务器上,commonPool 的线程池为1,相当于执行单线程操作了。因此,实际开发中常常需要自定义线程池。

6.1.1、自定义线程池的配置类

@Configuration
@EnableAsync
@Component
public class ThreadPoolConfig {

    @Bean
    public Executor threadPoolExecutor() {
        //cpu的核心数,包括超线程出来的cpu核心数
        int num = Runtime.getRuntime().availableProcessors();

        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

        //核心线程数
        taskExecutor.setCorePoolSize(num);

        //最大线程数
        taskExecutor.setMaxPoolSize(num + num / 2);

        //阻塞队列任务
        taskExecutor.setQueueCapacity(num * 2);

        //线程名的前缀
        taskExecutor.setThreadNamePrefix("taskExecutor->");

        //线程池的拒绝策略
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        //初始化线程池
        taskExecutor.initialize();

        return taskExecutor;
    }
}

6.1.2、CompletableFuture 使用自定义线程池

@Resource
private ThreadPoolTaskExecutor threadPoolExecutor;

CompletableFuture.runAsync(() -> {
        //执行某业务逻辑
}, threadPoolExecutor);

6.2、使用 CompletableFuture 的框架推荐


文章作者: xiucai
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 xiucai !
  目录