JUC并发编程随记二——CompletableFuture
创始人
2025-05-30 21:51:32

1、CompletableFuture

1.1、简介

JDK8设计出CompletableFuture,提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
在这里插入图片描述
(1)CompletionStage

  • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成后可能会触发另外一个阶段。
  • 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x->square(x)).thenAccept(x->System.out.print(x)).thenRnu(()->System.out.println())
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是有多个阶段一起触发。
    代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数。
    (2)CompletableFuture
  • 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式计算处理结果,也提供了转换和组合CompletableFuture的方法。
  • 它可能代表一个明确完成的Future,也可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。
  • 它实现类Future和CompletionStage接口。

1.2、CompletableFuture四大静态方法

1.2.1、runAsync(Runnable()):无返回值两种

/*** Returns a new CompletableFuture that is asynchronously completed* by a task running in the {@link ForkJoinPool#commonPool()} after* it runs the given action.** @param runnable the action to run before completing the* returned CompletableFuture* @return the new CompletableFuture*/public static CompletableFuture runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}/*** Returns a new CompletableFuture that is asynchronously completed* by a task running in the given executor after it runs the given* action.** @param runnable the action to run before completing the* returned CompletableFuture* @param executor the executor to use for asynchronous execution* @return the new CompletableFuture*/public static CompletableFuture runAsync(Runnable runnable,Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);}

(1)runAsync(Runnable runnable)

/*** @Description: 1、runAsync(Runnable()):无返回值* @Author: yangyb* @Date:2022/9/25 16:48* Version: 1.0**/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// 不建议直接new使用// CompletableFuture completableFuture = new CompletableFuture();// 推荐使用四大静态方法// 1、runAsync(Runnable()):无返回值CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName());try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(completableFuture.get());}}

在这里插入图片描述
没指定Executor的方法,直接使用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码。
如果指定,则使用我们自定义的或者特别指定的线程执行异步代码。
(2)unAsync(Runnable runnable,Executor executor)

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @Description: 1、runAsync(Runnable()):无返回值,指定Executors参数* @Author: yangyb* @Date:2022/9/25 16:48* Version: 1.0**/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// 不建议直接new使用// CompletableFuture completableFuture = new CompletableFuture();// 推荐使用四大静态方法// 1、runAsync(Runnable()):无返回值ExecutorService threadPool = Executors.newFixedThreadPool(3);CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName());try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}},threadPool);System.out.println(completableFuture.get());threadPool.shutdown();}}

在这里插入图片描述

1.2.2、supplyAsync(Supplier supplier)有返回值两种

/*** Returns a new CompletableFuture that is asynchronously completed* by a task running in the {@link ForkJoinPool#commonPool()} with* the value obtained by calling the given Supplier.** @param supplier a function returning the value to be used* to complete the returned CompletableFuture* @param  the function's return type* @return the new CompletableFuture*/public static  CompletableFuture supplyAsync(Supplier supplier) {return asyncSupplyStage(asyncPool, supplier);}/*** Returns a new CompletableFuture that is asynchronously completed* by a task running in the given executor with the value obtained* by calling the given Supplier.** @param supplier a function returning the value to be used* to complete the returned CompletableFuture* @param executor the executor to use for asynchronous execution* @param  the function's return type* @return the new CompletableFuture*/public static  CompletableFuture supplyAsync(Supplier supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);}

(1)supplyAsync(Supplier supplier)

/*** @Description: 1、supplyAsync(Supplier supplier):有返回值* @Author: yangyb* @Date:2022/9/25 16:48* Version: 1.0**/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "Hello supplyAsync";});System.out.println(completableFuture.get());}}

在这里插入图片描述
(2)supplyAsync(Supplier supplier,Executor executor)

import java.util.concurrent.*;/*** @Description: 1、supplyAsync(Supplier supplier,Executor executor):有返回值* @Author: yangyb* @Date:2022/9/25 16:48* Version: 1.0**/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(3);CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName());try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "Hello supplyAsync";},threadPool);System.out.println(completableFuture.get());threadPool.shutdown();}}

在这里插入图片描述

1.3、通用异步编程

从JAVA8开始引入了CompletableFuture,它是future的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回 调对象的回调方法。

/*** @Description: 1、supplyAsync(Supplier supplier,Executor executor):有返回值* @Author: yangyb* @Date:2022/9/25 16:48* Version: 1.0**/
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(3);try{CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName()+"--come in");int result = ThreadLocalRandom.current().nextInt(10);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("1秒后的结果是:"+result);return result;},threadPool).whenComplete((v,e)->{if(e==null){System.out.println("--计算完成,更新系统:"+v);}}).exceptionally(e->{e.printStackTrace();System.out.println("异常情况:"+e.getCause()+"\t"+e.getMessage());return null;});System.out.println(Thread.currentThread().getName()+"线程先去忙其它任务");}catch (Exception e){e.printStackTrace();}finally {threadPool.shutdown();}}}

在这里插入图片描述
CompletableFuture的优点:
(1)异步任务结束时,会自动回调某个对象的方法。
(2)主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行。
(3)异步任务出错时,会自动回调某个对象的方法。

1.4、链式语法和join方法介绍

在这里插入图片描述

1.5、电商比价案列

package com.company.juc;import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;/*** @Description: TODO* @Author: yangyb* @Date:2022/9/26 22:09* Version: 1.0**/
public class CompletableFutureMallDemo {static List list= Arrays.asList(new NetMall("jd"),new NetMall("dangdang"),new NetMall("taobao"),new NetMall("tianmao"));public static List getPrice(List list,String productName){return list.stream().map(netMall -> String.format(productName + " in %s price is %.2f", netMall.getNetMallName(), netMall.calcPrice(productName))).collect(Collectors.toList());}public static List getPriceByCompletableFuture(List list,String productName){return list.stream().map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + "in %s price is %.2f", netMall.getNetMallName(), netMall.calcPrice(productName)))).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());}public static void main(String[] args){long startTime = System.currentTimeMillis();List list1 = getPrice(list, "mysql");list1.forEach(System.out::println);long endTime =System.currentTimeMillis();System.out.println("-------costTime: "+(endTime-startTime)+" 毫秒");long startTime2 = System.currentTimeMillis();List list2 = getPriceByCompletableFuture(list, "mysql");list2.forEach(System.out::println);long endTime2 =System.currentTimeMillis();System.out.println("-------costTime: "+(endTime2-startTime2)+" 毫秒");}
}class NetMall{public String netMallName;public double calcPrice(String productName){try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return ThreadLocalRandom.current().nextDouble()*2+productName.charAt(0);}public NetMall(String netMallName) {this.netMallName = netMallName;}public String getNetMallName() {return netMallName;}}

在这里插入图片描述

2、CompletableFuture常用方法

2.1、获得结果和触发计算

。。。。。。。。。。

2.2、对计算结果进行处理

。。。。。。。。。。

2.3、对计算结果进行消费

在这里插入图片描述

/*** @Description: * @Author: yangyb* @Date:2022/9/27 21:25* Version: 1.0**/
public class CompletableFutureAPI3Demo {public static void main(String[] args){// thenRun(Runnable runnable),任务A执行完B,并且B不需要A的结果System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenRun(()->{}).join());// thenAccept(Consumer action),任务A执行完B,B需要A的结果,但是任务B无返回值System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenAccept(System.out::println).join());// thenAccept,任务A执行完B,B需要A的结果,同时任务B有返回值System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenApply((r)-> r+"resultB").join());}
}

在这里插入图片描述

import java.util.concurrent.CompletableFuture;/*** @Description: thenAccept,接收任务的处理结果,并消费处理,无返回结果* @Author: yangyb* @Date:2022/9/27 21:25* Version: 1.0**/
public class CompletableFutureAPI3Demo {public static void main(String[] args){CompletableFuture.supplyAsync(()->{return 1;}).thenApply(f->{return f+2;}).thenApply(f->{return f+3;}).thenAccept(System.out::println);}
}

在这里插入图片描述

2.4、对计算速度进行选择

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;/*** @Description: TODO* @Author: yangyb* @Date:2022/9/27 22:16* Version: 1.0**/
public class CompletableFutureFastDemo {public static void main(String[] args){CompletableFuture playA = CompletableFuture.supplyAsync(() -> {System.out.println("A come in");try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return "playA";});CompletableFuture playB = CompletableFuture.supplyAsync(() -> {System.out.println("B come in");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "playB";});CompletableFuture result = playA.applyToEither(playB, f -> {return f + "is winner";});System.out.println(Thread.currentThread().getName()+"\t"+"------: "+result.join());}
}

在这里插入图片描述

2.5、对计算结果进行合并

import java.util.concurrent.CompletableFuture;/*** @Description: thenCombine* @Author: yangyb* @Date:2022/9/27 22:32* Version: 1.0**/
public class CompletableFutureCombineDemo {public static void main(String[] args){CompletableFuture completableFutureA = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName()+"\t---启动");return 10;});CompletableFuture completableFutureB = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName()+"\t---启动");return 13;});CompletableFuture result = completableFutureA.thenCombine(completableFutureB, (x, y) -> {System.out.println("--------结果合并");return x + y;});System.out.println(result.join());}
}

在这里插入图片描述

3、CompletableFuture之线程池运行选择

  • 1、没有传入自定义线程池,都用默认线程池ForkJoinPool;

  • 2、传入了一个自定义线程池,如果你执行第一个任务的时候,传入了一个自定义线程池:
    调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
    调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池

  • 3、备注
    有可能处理太快,系统优化切换原则,直接使用main线程处理。
    其它如: thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是同理。

相关内容

热门资讯

电压放大器在钢筋剥离损伤识别试...   实验名称:钢筋剥离损伤识别试验  研究方向:无损检测  测试目的&#...
MOCO论文前几段精读 MoCo MoCo是CVPR 2020的最佳论文提名,算是视觉领域里,使...
【lua初级篇】基础知识和开发... 文章介绍 文章介绍 简述 工具安装配置和下载 快速看基础知识 一些常用的关键字一览 数据类型 tab...
Yuv422、Nv12转C#B... 1.1、Nv12转Bitmapint w = 1920;int h = 1080;i...
Linux互斥量和信号量的区别... 互斥量和信号量的区别 1.互斥量用于线程的互斥: 互斥:加锁解锁,是指某...
Git 和 GitHub 超入... 1.解决行结束符问题 需要在你的仓库中添加一个.gitattributes文件,标记正...
基于C++的AI五子棋游戏项目... 项目资源下载 基于C++的AI五子棋游戏项目源码压缩包下载地址基于C+...
#浅聊 webSocket (... 如果可以实现记得点赞分享,谢谢老铁~ 一,什么是webso...
Java SE API kno... Java SE API know how 字符串 紧凑字符串 java8 无论字符串的编码ÿ...
常用的VB函数 数学函数函数说明示例Sin(N)返回自变量N的正弦值Sin(0)=0 N为弧度Cos(N)返...
C++ 机房预约系统(五):管... 7.3 显示功能 功能描述: 显示学生信息或教师信息 功能实现: voi...
PIC单片机的一些问题 error 1347 can't find 0x16 words (0x16 withtotal) ...
完美日记母公司再度携手中国妇基... 撰稿 | 多客 来源 | 贝多财经 当春时节,梦想花开。和煦的三月暖阳,...
GDPU C语言 天码行空3 1. 分段函数 #includeint main(){double x,y;scanf("%lf",...
【瑞萨 MCU】开发环境搭建之... e2 studio e2 studio(简称为 e2 或 e2s)是瑞萨...
C语言内联汇编 之前我们介绍了一种C语言与汇编代码混合编程方式,就是两个文件分开编写,分...
Linux 网络编程学习笔记—... 一、TCP 服务的特点 传输层协议主要有 TCP 协议和 UDP 协议,前者相对于后者...
KubeSphere All ... KubeSphere All in one安装配置手册 1. 初始化 1.1 配置apt源 # vi...
学习软件测试怎么能缺少练手的软... 你好,我是凡哥。 最近收到许多自学自动化测试的小伙伴私信,学习了理论知识...
【面试题】浅谈css加载是否会... 大厂面试题分享 面试题库前后端面试题库 (面试必备) 推荐:...
直播带货系统开发的关键点、代码... 时下,直播的热度依然不减,而它的产物之一:直播带货系统&#...
一文读懂强化学习! 一.了解强化学习1.1基本概念强化学习是考虑智能体(Agent)与环境&...
Spring Cloud之一:... 目录 环境 Eureka工程的创建步骤 系列目录(持续更新。。。) S...
golang实现守护进程(2) 前言golang实现守护进程,包含功能:1. 守护进程只创建一次2. 平...
url 格式详解 统一资源定位系统(uniform resource locator; url ...
elasticsearch7.... elasticsearch版本:7.17.3 目标:实现对类型为text...
SpringBoot 加载系统... 开发环境: IDEA 2022.1.4+ MyBatis         代码参考:spri...
交换机概念和知识和命令 目录 一、华为交换机基础学习的一些重要概念和知识 二、交换机常用命令大全 三、不常用的交换机命令 ...
什么是 JavaScript ... 本文首发自「慕课网」,想了解更多IT干货内容,程序员圈内热闻࿰...
【C++】C++11——lam... 文章目录一、Lambda表达式引入二、Lambda表达式语法三、Lambda表达式交换两个值四、La...