在 Java8 中,新增了 CompletableFuture 异步编程工具类。它与我们之前使用的 Future 接口有何区别呢?本文将对这个重要的 CompletableFuture 类进行分析学习。
Future 接口(FutureTask 实现类),定义了操作异步任务执行的一些方法。例如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等方法。
Modifier and Type | Method | Description |
boolean | cancel(boolean mayInterruptIfRunning) | 尝试取消执行此任务。 |
V | get() | 等待计算完成,然后检索其结果。 |
V | get(long timeout, TimeUnit unit) | 如果需要等待最多在给定的时间计算完成,然后检索其结果(如果可用)。 |
boolean | isCancelled() | 如果此任务在正常完成之前被取消,则返回 true 。 |
boolean | isDone() | 返回 true 如果任务已完成。 |
我们通常使用 Future 接口进行异步编程时,获取结构都需要阻塞获取。主线程中会阻塞,其实更好的结果是让它执行完结果后通知我们获取。
由关系图可以看到,FutureTask 除了实现 Future 接口外,还实现了 Runnable 接口,因此,FutureTask 可以交给 Executor 执行。也可以由调用线程执行执行 FutureTask.run() 方法。
入门案例:
public class FutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask stringFutureTask = new FutureTask<>(new MyThread2());
Thread thread = new Thread(stringFutureTask);
thread.start();
String s = stringFutureTask.get();
System.out.println(s);
}
}
class MyThread2 implements Callable{
@Override
public String call() throws Exception {
System.out.println("come in");
return "hello";
}
}
// come in
// hello
复制代码
future + 线程池异步多线程任务配合,能显著提高程序的执行效率。
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
FutureTask stringFutureTask = new FutureTask(()->{
TimeUnit.MICROSECONDS.sleep(500);
return "aaa";
});
threadPool.submit(stringFutureTask);
FutureTask stringFutureTask2 = new FutureTask(()->{
TimeUnit.MICROSECONDS.sleep(300);
return "bbb";
});
threadPool.submit(stringFutureTask2);
TimeUnit.MICROSECONDS.sleep(300);
threadPool.shutdown();
}
复制代码
注意:
如果不想阻塞主线程,异步获取结果,通常会以轮询的方式获取结果,不会造成主线程的阻塞,但是轮询的方式会耗费 CPU 资源,调用 isDone() 进行轮询。
public class CompletableFutureDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask<>(() -> {
System.out.println("-----come in FutureTask");
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
return ""+ ThreadLocalRandom.current().nextInt(100);
});
new Thread(futureTask,"t1").start();
System.out.println(Thread.currentThread().getName()+" "+"线程完成任务");
/**
* 用于阻塞式获取结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果
*/
while (true){
if(futureTask.isDone()){
System.out.println(futureTask.get());
break;
}
}
}
}
复制代码
总结:
虽然,Future 接口提供了异步任务的能力,但是对于结果的获取非常不方便,阻塞的方式违背了异步编程的初衷,轮询的方式又会耗费无谓的 CPU 资源,且不能及时的得到计算结果,为什么不能使用观察者设计模式当计算结果完成及时通知监听者呢?
很多语言,比如 Node.js 采用回调的方式实现异步编程。 Java 的一些框架,比如 Netty,也提供了通用的扩展 Future,而作为正统的 Java 类库,Java8 新增了一个包含 50 个方法左右的类 ==> CompletableFuture。提供了非常强大的 Future 扩展功能,提供了函数式编程的能力,可以通过回调的方式处理计算结果。
它实现了 future 和 completionStage 两个接口。 提供了观察者模式,可以让任务执行完成后通知监听的一方。
代表异步计算过程的某一阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似 Linux 系统的管道分隔符传参数。 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
CompletionStage 的接口方法可以从多种角度进行分类,从最宏观的横向划分,CompletionStage 的接口主要分为三类:
CompletionStage 接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。 异步执行的,默认线程池是 ForkJoinPool.commonPool() ,但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池 。
CompletableFuture 提供了四个静态方法来创建一个异步操作:
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
public static CompletableFuture supplyAsync(Supplier supplier)
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
复制代码
public class CompletableFutureDemo2{
public static void main(String[] args) throws ExecutionException, InterruptedException{
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(1); }
catch (InterruptedException e) { e.printStackTrace(); }
return 533;
});
//去掉注释上面计算没有完成,返回444
//开启注释上面计算完成,返回计算结果
try { TimeUnit.SECONDS.sleep(2); }
catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(completableFuture.getNow(444));
}
}
public class CompletableFutureDemo2{
public static void main(String[] args) throws ExecutionException, InterruptedException{
System.out.println(CompletableFuture.supplyAsync(() -> "abc")
.thenApply(r -> r + "123").join());
}
}
复制代码
打断 get 方法立即获得返回括号值。
// 是否打断get方法立即返回括号值
public boolean complete(T value)
public class CompletableFutureDemo4{
public static void main(String[] args) throws ExecutionException, InterruptedException{
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
return 533;
});
//注释掉暂停线程,get还没有算完只能返回complete方法设置的444;暂停2秒钟线程,异步线程能够计算完成返回get
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
//当调用CompletableFuture.get()被阻塞的时候,complete方法就是结束阻塞并get()获取设置的complete里面的值.
System.out.println(completableFuture.complete(444)+" "+completableFuture.get());
}
}
复制代码
计算结果存在依赖关系,这两个线程串行化,由于依赖关系,当前步骤有异常就叫停,主线程如果结束,默认线程池就会关闭。
public class CompletableFutureDemo4{
public static void main(String[] args) throws ExecutionException, InterruptedException{
//当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("111");
return 1024;
}).thenApply(f -> {
System.out.println("222");
return f + 1;
}).thenApply(f -> {
//int age = 10/0; // 异常情况:那步出错就停在那步。
System.out.println("333");
return f + 1;
}).whenCompleteAsync((v,e) -> {
System.out.println("*****v: "+v);
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
System.out.println("-----主线程结束,END");
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try { TimeUnit.SECONDS.sleep(2); }
catch (InterruptedException e) { e.printStackTrace(); }
}
}
复制代码
有异常也可以往下一步走,根据带的异常可以进一步处理。
// 有异常也可以往下一步走,根据带的异常参数可以进一步处理
public class CompletableFutureDemo4{
public static void main(String[] args) throws ExecutionException, InterruptedException{
//当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,
// 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("111");
return 1024;
}).handle((f,e) -> {
int age = 10/0;
System.out.println("222");
return f + 1;
}).handle((f,e) -> {
System.out.println("333");
return f + 1;
}).whenCompleteAsync((v,e) -> {
System.out.println("*****v: "+v);
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
System.out.println("-----主线程结束,END");
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
复制代码
与结果处理和结果转换犀利函数返回一个新的 CompletableFuture 不同,结果消费系列函数只对结果执行 Action,而不返回新的计算值。 根据对结果的处理方式,结果消费函数又可以分为三大类:
观察该函数的参数类型可知,它们是函数式接口 Consumer,这个接口就只有输入,没有返回值。 常用方法:
public CompletionStage thenAccept(Consumer<? super T> action);
public CompletionStage thenAcceptAsync(Consumer<? super T> action);
复制代码
任务A 执行完执行 B,B 需要 A 的结果,但是任务 B 无返回值。
CompletableFuture future = CompletableFuture
.supplyAsync(() -> {
int number = new Random().nextInt(10);
System.out.println("第一次运算:" + number);
return number;
}).thenAccept(number ->
System.out.println("第二次运算:" + number * 5));
复制代码
这个函数的作用是,当两个 CompletionStage 都正常完成计算的时候,就会执行提供的 action 消费两个异步的结果。
具体使用:
CompletableFuture futrue1 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int number = new Random().nextInt(3) + 1;
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1结果:" + number);
return number;
}
});
CompletableFuture future2 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int number = new Random().nextInt(3) + 1;
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2结果:" + number);
return number;
}
});
futrue1.thenAcceptBoth(future2, new BiConsumer() {
@Override
public void accept(Integer x, Integer y) {
System.out.println("最终结果:" + (x + y));
}
});
复制代码
thenRun 也是对线程任务结果的一种消费函数,与 thenAccept 不同的是,thenRun 会在上一阶段 Runable。
任务A 执行完 B,并且 B 不需要 A 的结果。
常用方法:
public CompletionStage thenRun(Runnable action);
public CompletionStage thenRunAsync(Runnable action);
复制代码
具体使用:
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(10);
System.out.println("第一阶段:" + number);
return number;
}).thenRun(() ->
System.out.println("thenRun 执行"));
复制代码
线程交互指将两个线程任务获取结果的速度相对比较,按一定的规则进行下一步处理。
一句话:谁快用谁。
两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
常用方法:
public CompletionStage applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public CompletionStage applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
复制代码
具体使用:
public class CompletableFutureDemo5{
public static void main(String[] args) throws ExecutionException, InterruptedException{
CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " " + "---come in ");
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
return 10;
});
CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " " + "---come in ");
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
return 20;
});
CompletableFuture thenCombineResult = completableFuture1.applyToEither(completableFuture2,f -> {
System.out.println(Thread.currentThread().getName() + " " + "---come in ");
return f + 1;
});
System.out.println(Thread.currentThread().getName() + " " + thenCombineResult.get());
}
}
复制代码
两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
常用方法:
public CompletionStage acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
复制代码
具体使用:
CompletableFuture future1 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int number = new Random().nextInt(10) + 1;
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第一阶段:" + number);
return number;
}
});
CompletableFuture future2 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int number = new Random().nextInt(10) + 1;
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第二阶段:" + number);
return number;
}
});
future1.acceptEither(future2, new Consumer() {
@Override
public void accept(Integer number) {
System.out.println("最快结果:" + number);
}
});
复制代码
两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返
回值。
常用方法:
public CompletionStage runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage runAfterEitherAsync(CompletionStage<?> other,Runnable action);
复制代码
具体使用:
CompletableFuture future1 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int number = new Random().nextInt(5);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1结果:" + number);
return number;
}
});
CompletableFuture future2 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int number = new Random().nextInt(5);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2结果:" + number);
return number;
}
});
future1.runAfterEither(future2, new Runnable() {
@Override
public void run() {
System.out.println("已经有一个任务完成了");
}
}).join();
复制代码
CompletableFuture future1 = CompletableFuture
.supplyAsync(new Supplier() {
@Override
public Integer get() {
int number = new Random().nextInt(10);
System.out.println("任务1结果:" + number);
return number;
}
});
CompletableFuture future2 = CompletableFuture
.supplyAsync(new Supplier() {
@Override
public Integer get() {
int number = new Random().nextInt(10);
System.out.println("任务2结果:" + number);
return number;
}
});
CompletableFuture result = future1
.thenCombine(future2, new BiFunction() {
@Override
public Integer apply(Integer x, Integer y) {
return x + y;
}
});
System.out.println("组合后结果:" + result.get());
复制代码
Random random = new Random();
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(random.nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(random.nextInt(1));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
});
CompletableFuture
public static void main(String[] args) {
List futures = Arrays.asList(CompletableFuture.completedFuture("hello"),
CompletableFuture.completedFuture(" world!"),
CompletableFuture.completedFuture(" hello"),
CompletableFuture.completedFuture("java!"));
final CompletableFuture allCompleted = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
allCompleted.thenRun(() -> {
futures.stream().forEach(future -> {
try {
System.out.println("get future at:"+System.currentTimeMillis()+", result:"+future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
});
}
复制代码
测试结果:
get future at:1568892339473, result:hello
get future at:1568892339473, result: world!
get future at:1568892339473, result: hello
get future at:1568892339473, result:java!
复制代码
场景:例如查询商品详情页面的逻辑比较复杂
// 1. 获取sku的基本信息 0.5s
// 2. 获取sku的图片信息 0.5s
// 3. 获取sku的促销信息 TODO 1s
// 4. 获取spu的所有销售属性 1s
// 5. 获取规格参数组及组下的规格参数 TODO 1.5s
// 6. spu详情 TODO 1s
.........
复制代码
那么一步步的执行下去,用户需要6.5秒后才能看到详情页的内容,很显然不能接受的,如果有多个线程同时完成这6步操作,也许只需要 1.5s 即可完成内容。
@Override
public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
SkuItemVo skuItemVo = new SkuItemVo();
CompletableFuture infoFuture = CompletableFuture.supplyAsync(() -> {
//1、sku基本信息的获取 pms_sku_info
SkuInfoEntity info = this.getById(skuId);
skuItemVo.setInfo(info);
return info;
}, executor);
CompletableFuture saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {
//3、获取spu的销售属性组合
List saleAttrVos = skuSaleAttrValueService.getSaleAttrBySpuId(res.getSpuId());
skuItemVo.setSaleAttr(saleAttrVos);
}, executor);
CompletableFuture descFuture = infoFuture.thenAcceptAsync((res) -> {
//4、获取spu的介绍 pms_spu_info_desc
SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
skuItemVo.setDesc(spuInfoDescEntity);
}, executor);
CompletableFuture baseAttrFuture = infoFuture.thenAcceptAsync((res) -> {
//5、获取spu的规格参数信息
List attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());
skuItemVo.setGroupAttrs(attrGroupVos);
}, executor);
//2、sku的图片信息 pms_sku_images
CompletableFuture imageFuture = CompletableFuture.runAsync(() -> {
List imagesEntities = skuImagesService.getImagesBySkuId(skuId);
skuItemVo.setImages(imagesEntities);
}, executor);
CompletableFuture.allOf(saleAttrFuture,descFuture,baseAttrFuture,imageFuture,seckillFuture).get();
return skuItemVo;
}
复制代码
留言与评论(共有 0 条评论) “” |