JDK8设计出CompletableFuture,提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
(1)CompletionStage
1.2.1、runAsync(Runnable()):无返回值两种
/*** Returns a new CompletableFuture that is asynchronously completed* by a task running in the {@link ForkJoinPool#commonPool()} after* it runs the given action.** @param runnable the action to run before completing the* returned CompletableFuture* @return the new CompletableFuture*/public static CompletableFuture runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}/*** Returns a new CompletableFuture that is asynchronously completed* by a task running in the given executor after it runs the given* action.** @param runnable the action to run before completing the* returned CompletableFuture* @param executor the executor to use for asynchronous execution* @return the new CompletableFuture*/public static CompletableFuture runAsync(Runnable runnable,Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);}
(1)runAsync(Runnable runnable)
/*** @Description: 1、runAsync(Runnable()):无返回值* @Author: yangyb* @Date:2022/9/25 16:48* Version: 1.0**/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// 不建议直接new使用// CompletableFuture completableFuture = new CompletableFuture();// 推荐使用四大静态方法// 1、runAsync(Runnable()):无返回值CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName());try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(completableFuture.get());}}
没指定Executor的方法,直接使用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码。
如果指定,则使用我们自定义的或者特别指定的线程执行异步代码。
(2)unAsync(Runnable runnable,Executor executor)
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @Description: 1、runAsync(Runnable()):无返回值,指定Executors参数* @Author: yangyb* @Date:2022/9/25 16:48* Version: 1.0**/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// 不建议直接new使用// CompletableFuture completableFuture = new CompletableFuture();// 推荐使用四大静态方法// 1、runAsync(Runnable()):无返回值ExecutorService threadPool = Executors.newFixedThreadPool(3);CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName());try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}},threadPool);System.out.println(completableFuture.get());threadPool.shutdown();}}
1.2.2、supplyAsync(Supplier supplier)有返回值两种
/*** Returns a new CompletableFuture that is asynchronously completed* by a task running in the {@link ForkJoinPool#commonPool()} with* the value obtained by calling the given Supplier.** @param supplier a function returning the value to be used* to complete the returned CompletableFuture* @param the function's return type* @return the new CompletableFuture*/public static CompletableFuture supplyAsync(Supplier supplier) {return asyncSupplyStage(asyncPool, supplier);}/*** Returns a new CompletableFuture that is asynchronously completed* by a task running in the given executor with the value obtained* by calling the given Supplier.** @param supplier a function returning the value to be used* to complete the returned CompletableFuture* @param executor the executor to use for asynchronous execution* @param the function's return type* @return the new CompletableFuture*/public static CompletableFuture supplyAsync(Supplier supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);}
(1)supplyAsync(Supplier supplier)
/*** @Description: 1、supplyAsync(Supplier supplier):有返回值* @Author: yangyb* @Date:2022/9/25 16:48* Version: 1.0**/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "Hello supplyAsync";});System.out.println(completableFuture.get());}}
(2)supplyAsync(Supplier supplier,Executor executor)
import java.util.concurrent.*;/*** @Description: 1、supplyAsync(Supplier supplier,Executor executor):有返回值* @Author: yangyb* @Date:2022/9/25 16:48* Version: 1.0**/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(3);CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "Hello supplyAsync";},threadPool);System.out.println(completableFuture.get());threadPool.shutdown();}}
从JAVA8开始引入了CompletableFuture,它是future的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回 调对象的回调方法。
/*** @Description: 1、supplyAsync(Supplier supplier,Executor executor):有返回值* @Author: yangyb* @Date:2022/9/25 16:48* Version: 1.0**/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(3);try{CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName()+"--come in");int result = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("1秒后的结果是:"+result);return result;},threadPool).whenComplete((v,e)->{if(e==null){System.out.println("--计算完成,更新系统:"+v);}}).exceptionally(e->{e.printStackTrace();System.out.println("异常情况:"+e.getCause()+"\t"+e.getMessage());return null;});System.out.println(Thread.currentThread().getName()+"线程先去忙其它任务");}catch (Exception e){e.printStackTrace();}finally {threadPool.shutdown();}}}
CompletableFuture的优点:
(1)异步任务结束时,会自动回调某个对象的方法。
(2)主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行。
(3)异步任务出错时,会自动回调某个对象的方法。
package com.company.juc;import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;/*** @Description: TODO* @Author: yangyb* @Date:2022/9/26 22:09* Version: 1.0**/
public class CompletableFutureMallDemo {static List list= Arrays.asList(new NetMall("jd"),new NetMall("dangdang"),new NetMall("taobao"),new NetMall("tianmao"));public static List getPrice(List list,String productName){return list.stream().map(netMall -> String.format(productName + " in %s price is %.2f", netMall.getNetMallName(), netMall.calcPrice(productName))).collect(Collectors.toList());}public static List getPriceByCompletableFuture(List list,String productName){return list.stream().map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + "in %s price is %.2f", netMall.getNetMallName(), netMall.calcPrice(productName)))).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());}public static void main(String[] args){long startTime = System.currentTimeMillis();List list1 = getPrice(list, "mysql");list1.forEach(System.out::println);long endTime =System.currentTimeMillis();System.out.println("-------costTime: "+(endTime-startTime)+" 毫秒");long startTime2 = System.currentTimeMillis();List list2 = getPriceByCompletableFuture(list, "mysql");list2.forEach(System.out::println);long endTime2 =System.currentTimeMillis();System.out.println("-------costTime: "+(endTime2-startTime2)+" 毫秒");}
}class NetMall{public String netMallName;public double calcPrice(String productName){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return ThreadLocalRandom.current().nextDouble()*2+productName.charAt(0);}public NetMall(String netMallName) {this.netMallName = netMallName;}public String getNetMallName() {return netMallName;}}
。。。。。。。。。。
。。。。。。。。。。
/*** @Description: * @Author: yangyb* @Date:2022/9/27 21:25* Version: 1.0**/
public class CompletableFutureAPI3Demo {public static void main(String[] args){// thenRun(Runnable runnable),任务A执行完B,并且B不需要A的结果System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenRun(()->{}).join());// thenAccept(Consumer action),任务A执行完B,B需要A的结果,但是任务B无返回值System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenAccept(System.out::println).join());// thenAccept,任务A执行完B,B需要A的结果,同时任务B有返回值System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenApply((r)-> r+"resultB").join());}
}
import java.util.concurrent.CompletableFuture;/*** @Description: thenAccept,接收任务的处理结果,并消费处理,无返回结果* @Author: yangyb* @Date:2022/9/27 21:25* Version: 1.0**/
public class CompletableFutureAPI3Demo {public static void main(String[] args){CompletableFuture.supplyAsync(()->{return 1;}).thenApply(f->{return f+2;}).thenApply(f->{return f+3;}).thenAccept(System.out::println);}
}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;/*** @Description: TODO* @Author: yangyb* @Date:2022/9/27 22:16* Version: 1.0**/
public class CompletableFutureFastDemo {public static void main(String[] args){CompletableFuture playA = CompletableFuture.supplyAsync(() -> {System.out.println("A come in");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return "playA";});CompletableFuture playB = CompletableFuture.supplyAsync(() -> {System.out.println("B come in");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "playB";});CompletableFuture result = playA.applyToEither(playB, f -> {return f + "is winner";});System.out.println(Thread.currentThread().getName()+"\t"+"------: "+result.join());}
}
import java.util.concurrent.CompletableFuture;/*** @Description: thenCombine* @Author: yangyb* @Date:2022/9/27 22:32* Version: 1.0**/
public class CompletableFutureCombineDemo {public static void main(String[] args){CompletableFuture completableFutureA = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName()+"\t---启动");return 10;});CompletableFuture completableFutureB = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName()+"\t---启动");return 13;});CompletableFuture result = completableFutureA.thenCombine(completableFutureB, (x, y) -> {System.out.println("--------结果合并");return x + y;});System.out.println(result.join());}
}
1、没有传入自定义线程池,都用默认线程池ForkJoinPool;
2、传入了一个自定义线程池,如果你执行第一个任务的时候,传入了一个自定义线程池:
调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
3、备注
有可能处理太快,系统优化切换原则,直接使用main线程处理。
其它如: thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是同理。