Java8中新增新特性异步编程之CompletableFuture

上一篇 / 下一篇  2023-07-19 13:28:16

  Future基本应用
  Future是从JDK1.5开始有的,目的是获取异步任务执行的结果,通常情况会结合ExecutorService及Callable一起使用。
  1. Future结合Callable使用
  单任务执行
  private static class Task implements Callable<String> {
    @Override
    public String call() throws Exception {
      TimeUnit.SECONDS.sleep(3) ;
      return "success";
    }
      
  }
  public static void main(String[] args) throws Exception {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
    Future<String> future = executor.submit(new Task()) ;
    String result = future.get() ;
    System.out.println("执行结果:" + result) ;
  }
  当执行到future.get()方法的时候会阻塞,等待3s后继续执行。
  多个任务同时执行
  private static class Task implements Callable<String> {
    private int sleep ;
    public Task(int sleep) {
      this.sleep = sleep ;
    }
      
    @Override
    public String call() throws Exception {
      TimeUnit.SECONDS.sleep(this.sleep) ;
      return "success";
    }
  }
  public static void main(String[] args) throws Exception {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
    Future<String> future1 = executor.submit(new Task(3)) ;
    Future<String> future2 = executor.submit(new Task(2)) ;
    Future<String> future3 = executor.submit(new Task(1)) ;
    String result1 = future1.get() ;
    String result2 = future2.get() ;
    String result3 = future3.get() ;
    System.out.println("result1:" + result1 + "\t" + "result2:" + result2 + "\t" + "result3:" + result3) ;
  }
  以上代码执行的3个任务分别用时3,2,1s。future1用时最长。
  从运行的结果看到即便future2, future3执行时间短也必须等待future1执行完后才会继续,虽然你可以倒过来获取结果,但是在实际项目中的应用你应该是不能确认每个任务执行需要多长时间,谁先执行完就先获取谁。
  虽然这种同步阻塞的方式在有些场景下还是很有必要的。但由于它的同步阻塞导致了当前线程不能干其它的事必须一致等待。
  CompletionService解决Future的缺点
  CompletionService是一边生产新的任务,一边处理已经完成的任务。简单地说就是CompletionService不管任务执行先后顺序,谁先执行完就处理谁。
  private static class Task implements Callable<String> {
    private int time;
    private String name ;
    public Task(int time, String name) {
      this.time = time ;
      this.name = name ;
    }
    @Override
    public String call() throws Exception {
      TimeUnit.SECONDS.sleep(this.time) ;
      return name ;
    }
      
  }
  public static void main(String[] args) throws Exception {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
    CompletionService<String> cs = new ExecutorCompletionService<>(pool) ;
    cs.submit(new Task(3, "name" + 3)) ;
    cs.submit(new Task(1, "name" + 1)) ;
    cs.submit(new Task(2, "name" + 2)) ;
    for (int i = 0; i < 3; i++) {
      System.out.println(cs.take().get()) ;
    }
  }
  通过执行结果发现,任务的结果获取是以谁先执行完处理谁与任务的执行先后没有关系。
  CompletableFuture异步编程
  CompletableFuture通过如下4个静态方法来执行异步任务
  2. 简单异步任务链式调用执行
  ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
  CompletableFuture.runAsync(() -> {
    try {
      TimeUnit.SECONDS.sleep(3) ;
      System.out.println(Thread.currentThread().getName() + ", 1 任务执行完成") ;
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }, executor).thenRun(() -> {
    try {
      TimeUnit.SECONDS.sleep(2) ;
      System.out.println(Thread.currentThread().getName() + ", 2 任务执行完成") ;
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }) ;
  System.out.println("主线程:" + Thread.currentThread().getName()) ;
  executor.shutdown() ;
  执行结果:
  3. 获取上一步任务执行结果及任务完成处理
  CompletableFuture.supplyAsync(() -> {
    try {
      TimeUnit.SECONDS.sleep(3) ;
      System.out.println(Thread.currentThread().getName() + ", 1 任务执行完成") ;
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return "1" ;
  }, executor).thenApply(res -> {
    System.out.println("获取到上一步任务执行结果:" + res) ;
    try {
      TimeUnit.SECONDS.sleep(2) ;
      System.out.println(Thread.currentThread().getName() + ", 2 任务执行完成") ;
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return "2" ;
  }).whenComplete((res, tx) -> {
    System.out.println("获取到结果:" + res) ;
    if (tx != null) {
      System.err.println("发生错误了:" + tx.getMessage()) ;
    }
    executor.shutdown();
  }) ;
  System.out.println("主线程:" + Thread.currentThread().getName()) ;
  执行结果:
  这里如果任务执行的时候发生了异常那么在whenComplete方法中的res 会为空,tx为发生异常的对象。没有异常时res有执行的机构,tx异常对象为空。
  4. 异步任务异常处理
  CompletableFuture.supplyAsync(() -> {
    try {
      TimeUnit.SECONDS.sleep(3) ;
      System.out.println(Thread.currentThread().getName() + ", 1 任务执行完成") ;
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return "1" ;
  }, executor).thenApply(res -> {
    System.out.println("获取到上一步任务执行结果:" + res) ;
    try {
      TimeUnit.SECONDS.sleep(2) ;
      System.out.println(Thread.currentThread().getName() + ", 2 任务执行完成") ;
      System.out.println(1 / 0) ;
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return "2" ;
  }).exceptionally(tx -> {
    System.out.println(Thread.currentThread().getName() + ", 任务执行发生了异常") ;
    return "error" ;
  }).whenComplete((res, tx) -> {
    System.out.println("获取到结果:" + res) ;
    if (tx != null) {
      System.err.println("发生错误了:" + tx.getMessage()) ;
    }
    executor.shutdown();
  }) ;
  System.out.println("主线程:" + Thread.currentThread().getName()) ;
  这里我们人为的制造异常 1 / 0 。
  执行结果:
  根据执行结果当发生异常时进入exceptionally方法,最终进入whenComplete方法此时 tx异常对象是发生异常的异常对象。
  5. 所有任务完成才算完成任务
  CompletableFuture.allOf
  CompletableFuture<Double> calc1 = CompletableFuture.supplyAsync(() -> {
    try {
      TimeUnit.SECONDS.sleep(2) ;
      System.out.println(Thread.currentThread().getName() + ", calc1任务执行完成") ;
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return 10D ;
  }, executor) ;
      
  CompletableFuture<Double> calc2 = CompletableFuture.supplyAsync(() -> {
    try {
      TimeUnit.SECONDS.sleep(5) ;
      System.out.println(Thread.currentThread().getName() + ", calc2任务执行完成") ;
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return 20D ;
  }, executor) ;
  // 当任何一个任务发生异常,这里的tx都不会为null
  CompletableFuture.allOf(calc1, calc2).whenComplete((res, tx) -> {
    System.out.println("获取到结果:" + res + ", " + tx) ;
    try {
      System.out.println(calc1.get()) ;
      System.out.println(calc2.get()) ;
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }
    executor.shutdown();
  }) ;
  执行结果:
  在这里whenComplete中的res是没有结果的,要获取数据我们的分别调用get方法获取。
  6. handle方法对结果处理
  CompletableFuture.supplyAsync(() -> {
    try {
      TimeUnit.SECONDS.sleep(2) ;
      System.out.println(Thread.currentThread().getName() + ", 1 任务执行完成") ;
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return "0" ;
  }, executor).handle((res, tx) -> {
    // 处理结果数据
    return res + "1" ;
  }).whenComplete((res, tx) -> {
    System.out.println("获取到结果:" + res) ;
    if (tx != null) {
      System.err.println("发生错误了:" + tx.getMessage()) ;
    }
    executor.shutdown();
  }) ;
  执行结果:
  正确
  发生异常时:
  当发生异常时handle方法中的res是没有值的,tx异常对象为发生异常的异常对象。
  7. 合并异步任务
  将两个异步任务完成后合并处理
  CompletableFuture.thenCombine
  CompletableFuture<Double> task1 = CompletableFuture.supplyAsync(() -> {
    try {
      TimeUnit.SECONDS.sleep(2) ;
      System.out.println(Thread.currentThread().getName() + ", 任务1执行完成") ;
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return 10d ;
  }, executor) ;
  CompletableFuture<Double> task2 = CompletableFuture.supplyAsync(() -> {
    try {
      TimeUnit.SECONDS.sleep(2) ;
      System.out.println(Thread.currentThread().getName() + ", 任务2执行完成") ;
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return 20d ;
  }, executor) ;
  task1.thenCombine(task2, (t1, t2) -> {
    System.out.println(Thread.currentThread().getName() + ", 合并任务完成") ;
    return t1 + "," + t2 ;
  }).whenComplete((res, tx) -> {
    System.out.println("获取到结果:" + res) ;
    if (tx != null) {
      System.err.println("发生错误了:" + tx.getMessage()) ;
    }
    executor.shutdown();
  }) ;
  执行结果:
  8. 异步任务谁快谁就进入下一步的执行
  CompletableFuture.applyToEither
  两个异步任务谁先执行完谁就继续执行后续的操作。
  CompletableFuture<Double> task1 = CompletableFuture.supplyAsync(() -> {
    try {
      TimeUnit.SECONDS.sleep(2) ;
      System.out.println(Thread.currentThread().getName() + ", 任务1执行完成") ;
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return 10d ;
  }, executor) ;
  CompletableFuture<Double> task2 = CompletableFuture.supplyAsync(() -> {
    try {
      TimeUnit.SECONDS.sleep(2) ;
      System.out.println(Thread.currentThread().getName() + ", 任务2执行完成") ;
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return 20d ;
  }, executor) ;
  task1.applyToEither(task2, res -> {
    return res ;
  }).whenComplete((res, tx) -> {
    System.out.println("获取到结果:" + res) ;
    if (tx != null) {
      System.err.println("发生错误了:" + tx.getMessage()) ;
    }
    executor.shutdown();
  }) ;
  执行结果:
  9. 两个异步任务都执行完了才继续执行
  只有两个任务都执行完成了后才会继续。
  CompletableFuture.runAfterBoth
  CompletableFuture<Double> task1 = CompletableFuture.supplyAsync(() -> {
    try {
      TimeUnit.SECONDS.sleep(2) ;
      System.out.println(Thread.currentThread().getName() + ", 任务1执行完成") ;
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return 10d ;
  }, executor) ;
  CompletableFuture<Double> task2 = CompletableFuture.supplyAsync(() -> {
    try {
      TimeUnit.SECONDS.sleep(2) ;
      System.out.println(Thread.currentThread().getName() + ", 任务2执行完成") ;
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return 20d ;
  }, executor) ;
  task1.runAfterBoth(task2, () -> {
    System.out.println("任务都执行完成了...") ;
  }).whenComplete((res, tx) -> {
    System.out.println("获取到结果:" + res) ;
    if (tx != null) {
      System.err.println("发生错误了:" + tx.getMessage()) ;
    }
    executor.shutdown();
  }) ;
  执行结果:
  10. 等待所有任务执行完成
  CompletableFuture.anyOf
  CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
    sleep(1000) ;
    System.out.println("我是任务1") ;
    return "Task1" ;
  }, executor) ;
  CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
    sleep(3000) ;
    System.out.println("我是任务2") ;
    System.out.println(1 / 0) ;
    return "Task2" ;
  }, executor) ;
  // 任意一个任务执行完成就算完成
  // 当任务执行发生异常后,th才不会为null
  CompletableFuture.anyOf(task1, task2).whenCompleteAsync((v, th) -> {
    System.out.println("v = " + v) ;
    System.out.println("th = " + th) ;
  }, executor) ;
  执行结果:
  11. 接收上一个任务的执行结果
  CompletableFuture.supplyAsync(() -> {
    sleep(2000) ;
    System.out.println("第一个任务执行完成...") ;
    // System.out.println(1 / 0) ;
    return new Random().nextInt(10000) ;
  }, executor).thenAcceptAsync(res -> { // 接收上一个任务的执行结果
    System.out.println("任务执行结果:" + res) ;
  }, executor) ;
  执行结果:

TAG: 软件开发 Java java

 

评分:0

我来说两句

Open Toolbar