CompletableFuture 异步获取结果并且等待所有异步任务完成



Future的注意事项:

当 for 循环批量获取Future的结果时容易 block,get 方法调用时应使用 timeout 限制

Future 生命周期不能后退。一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来

Future的局限性:

从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:

并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;

无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;

无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;

没有异常处理:Future接口中没有关于异常处理的方法;





CompletableFuture是对Future的扩展和增强。

CompletableFuture实现了Future接口,并在此基础上进行了丰富的扩展,完美弥补了Future的局限性,同时CompletableFuture实现了对任务编排的能力。

借助这项能力,可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。

而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。



public class CompletableFutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        useCompletableFuture_complicated();
    }


    public static void useCompletableFuture_complicated() throws ExecutionException, InterruptedException, TimeoutException {
        // 这个方法时描述一般地使用CompletableFuture实现异步操作,即复杂的使用CompletableFuture实现异步操作

        // 假设我们有一个Person名字List
        List personNameList = new ArrayList<>();

        // 为了方便测试,我们要构造大量的数据add到personNameList,用for循环,名字就是1, 2, 3, ...

        // 这里添加1000个名字到personNameList
        for (int i = 0; i < 10; i++) {
            personNameList.add(String.valueOf(i));
        }

        // 假设我们要做的业务是personNameList里的每个人都说一句Hello World, 但是我们不关心他们说这句话的顺序,而且我们希望这个业务能够较快速的完成,所以采用异步就是比较合适的

        // 先创建两个活动线程的线程池
        ExecutorService executor = Executors.newFixedThreadPool(8);
        List> cfs = new ArrayList<>();

        List cfsValue = new ArrayList<>();
        System.out.println("时间:"+DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_FORMAT));
        // 开始我们的业务处理  使用JDK 1.8的特性,stream()和Lambda表达式: (参数) -> {表达式}
        for (String personName : personNameList) {
            CompletableFuture cf = CompletableFuture.supplyAsync(new Supplier() {
                @SneakyThrows
                @Override
                public String get() {
                    // 模拟业务逻辑,say hello world
//                    System.out.println(personName + ": Hello World!");
                    Thread.sleep(3000);
                    return personName+"  task finished!";
                }
            }, executor).whenCompleteAsync((result, e) -> {
                //执行线程执行完以后的操作。
                cfsValue.add(result);
            });

            cfs.add(cf);

        }
        System.out.println("时间:"+DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_FORMAT));
        System.out.println("CompletableFuture.allOf(cfs.toArray(new CompletableFuture[cfsValue.size()])).join()");
        

        //所有任务执行完才放行
        CompletableFuture.allOf(cfs.toArray(new CompletableFuture[cfsValue.size()])).join();

        System.out.println("时间:"+DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_FORMAT));

        System.out.println(JSON.toJSON(cfsValue));
        System.out.println("时间:"+DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MS_FORMAT));


        /*for(CompletableFuture cf:cfs){
            try {
                System.out.println("---------------获取结果------------get:   "+cf.get(4,TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }*/





        /*personNameList.stream().forEach(

                name -> CompletableFuture.supplyAsync((Supplier) () -> {
                    // 封装了业务逻辑
                    // 模拟业务逻辑,say hello world
                    System.out.println(name + ":JDK 1.8的特性,stream()和Lambda表达式   Hello World!");
                    return "success";
                }, executor).exceptionally(e -> {
                    System.out.println(e);
                    return "false";
                })

        );*/


        // 关闭线程池executor
        // 说明一下executor必须要显示关闭(它的方法里有介绍),不然线程池会一直等待任务,会导致main方法一直运行
        // 还有就是关闭executor,不会导致之前提交的异步任务被打断或者取消。即之前提交的任务依然会执行到底,只是不会再接收新的任务
        executor.shutdown();

        /* 那么关闭线程池之后,我们怎么确定我们的任务是否都完成了呢,可以使用executor.isTerminated()命令
        // 可以看看isTerminated这个方法的说明,简单的说就是调用isTerminated()方法之前没有调用shutdown()方法的话,那么isTerminated()方法返回的永远是false。
        // 所以isTerminated()方法返回true的情况就是在调用isTerminated()方法之前要先调用shutdown()方法,且所有的任务都完成了。
        // 其实调用isTerminated()的目的就是我们对异步任务的结果是care, 我们需要等待异步任务的结果以便我们做下一步的动作。
        // 如果我们不关心异步任务的结果的话,完全可以不用调用isTerminated()。
        */
        while (!executor.isTerminated()) {
            System.out.println("no terminated");
            try {
                System.out.println("我要休眠一下");
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


        System.out.println(">>>>>>>>>>>>>>>>>>>> 结束了  Hello World!");

    }

}
发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章