并发编程:CompletableFuture异步编程没有那么难

网站建设3年前发布
58 0 0

20230306104226e97aca547dcef40c90257408a61b9dbb66f5dc212,根大家好,我是七哥,今天给大家分享一个非常强大的工具类:CompletableFuture,如果你平时也会遇到用多线程优化业务逻辑的场景,那么今天这篇文章我建议你读完,相信一定会让你在重构相关代码时得心应手,写出让人称赞的好代码,不过使用CompletableFuture的前提是JDK需要1.8以上哦~,那我们下面进入今天的正文。,在Java开发的web项目中,我们经常会遇到接口响应耗时过长,或者定时任务处理过慢,那在Java中最常见的解决方法就是并行了,想必大家也都不陌生了。,今天的分享主要带大家从一个实际的串行场景出发,如何一步步优化,同时也会分享在Java中实现并行处理的多种方式,以及它们之间的区别和优缺点,通过对比总结更加深入的了解并且使用Java中并发编程的相关技术。,现在我们有一个查询carrier下所有Load的接口,它需要查询Loads信息、Instruction信息、Stops信息、Actions信息后然后组装数据。,这段代码会有什么问题?其实算是一段比较正常的代码,但是在某一个carrier下数据量比较大时,sql查询是相对较慢的,那有没有办法优化一下呢?,202303061042278624679170acfd19d3c474b8385a929f36442a127,当前这个请求耗时总计就是12s。上面实现中查询Load、Instruction、Stop、Action等信息是串行的,那串行的系统要做性能优化很常见的就是利用多线程并行了。,20230306104227725b3e25726795b684b1528ff36a6276d92165283,这种相互之间没有影响的任务,利用并行处理后耗时就可以优化为4s。,因为我们都是需要获取任务的返回值的,所以大家肯定想到是用 Future+Callable来做。,ThreadPoolExecutor提供了3个submit方法支持我们需要获取任务执行结果的需求。,简单介绍下这三个submit方法:,这三个方法的返回值都是Future接口,Future 提供了5个方法:,2023030610435185f1f38697c3dabe25d2913ff21fe3a6d15141651,分别是取消任务的方法cancel()、判断任务是否已取消的方法 isCancelled()、判断任务是否已结束的方法 isDone()以及2 个获得任务执行结果的 get() 和get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。,需要注意的是:这两个 get()方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。,我们再介绍下FutureTask工具类,这是一个实实在在的工具类,有两个构造函数,和上面类似,一看就明白了。,这个类实现了Runnable 和 Future接口,可以理解就是将任务和结果结合起来了,变成一个可以有响应结果的任务进行提交,本质上FutureTask里面封装的还是一个Callable接口,它实现可以有返回值就是因为它的run方法里面调用了Callable的call()方法,将结果赋值给result,然后返回。,下面我们看下如何优化我们上面的查询接口,实现并行查询:,那你可能会想到,如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,该怎么处理呢?,这种问题基本上也都可以用 Future来解决,但是需要将对应的 FutureTask传入到当前任务中,然后调用get()方法即可。,比如,我们创建了两个 FutureTask——ft1和 ft2,ft1 需要等待 ft2 执行完毕后才能做最后的数据处理,所以 ft1 内部需要引用 ft2,并在执行数据处理前,调用 ft2 的 get()方法实现等待。,通过这上面的的例子,我们明显的发现 Future 实现异步编程时的一些不足之处:,我们很难表述Future结果之间的依赖性,从文字描述上这很简单。比如,下面文字描述的关系,如果用Future去实现时还是很复杂的。,比如:“当长时间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并”。,在JDK8中引入了CompletableFuture,对Future进行了改进,可以在定义CompletableFuture时传入回调对象,任务在完成或者异常时,自动回调,再也不需要每次主动通过Future 去询问结果了,我们接着往下看。,Java 在 1.8 版本提供了CompletableFuture 来支持异步编程,CompletableFuture 类实现了CompletionStage 和 Future,接口,提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过 完成时回调 的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。,为了体会到CompletableFuture 异步编程的优势,我们还是先用 CompletableFuture 重新实现前面的程序。,通过上面的代码我们可以发现 CompletableFuture有以下优势:,CompletableFuture 提供了四个静态方法来创建一个异步操作:,这四个方法区别在于:,ForkJoinPool是JDK7提供的,叫做分支/合并框架。可以通过将一个任务递归分成很多分子任务,形成不同的流,进行并行执行,同时还伴随着强大的工作窃取算法,极大的提高效率,这个不属于今天我们讨论的点,感兴趣的话可以后面再聊。,注意️:如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。,问题:为什么supplyAsync方法接收一个 Supplier 函数式接口类型参数而不是一个 Callable 类型的参数呢?,看了接口定义,我们发现它们其实都是一个不接受任何参数类型的函数式接口,在实践中它们做的是相同的事情(定义一个业务逻辑去处理然后有返回值),但在原则上它们的目的是做不同的事情:,另外 CompletableFuture 类还实现了 CompletionStage 接口,这个接口就比较关键了,之所以能实现响应式编程,都是通过这个接口提供的方法。,下面介绍下 CompletionStage 接口,看字面意思可以理解为“完成动作的一个阶段”,官方注释文档:CompletionStage 是一个可能执行异步计算的“阶段”,这个阶段会在另一个 CompletionStage 完成时调用去执行动作或者计算,一个 CompletionStage 会以正常完成或者中断的形式“完成”,并且它的“完成”会触发其他依赖的 CompletionStage 。CompletionStage 接口的方法一般都返回新的CompletionStage,因此构成了链式的调用。,这个看完还是有点懵逼的,不清楚什么是 CompletionStage?,在Java中什么是 CompletionStage ?,一个Function、Comsumer、Supplier 或者 Runnable 都会被描述为一个CompletionStage。,2023030610422888fa45d25399b413244504ab2c9ab2c71617cb975,通过接口的继承关系,我们可以发现这里的异步操作到底什么时候结束、结果如何获取,都可以通过 Future接口来解决。,另外 CompletableFuture 类还实现了 CompletionStage 接口,这个接口就比较关键了,之所以能实现响应式编程,都是通过这个接口提供的方法。,下面介绍下 CompletionStage 接口,看字面意思可以理解为“完成动作的一个阶段”,官方注释文档:CompletionStage 是一个可能执行异步计算的“阶段”,这个阶段会在另一个 CompletionStage 完成时调用去执行动作或者计算,一个 CompletionStage 会以正常完成或者中断的形式“完成”,并且它的“完成”会触发其他依赖的 CompletionStage 。CompletionStage 接口的方法一般都返回新的CompletionStage,因此构成了链式的调用。,这个看完还是有点懵逼的,不清楚什么是 CompletionStage?,在Java中什么是 CompletionStage ?,一个Function、Comsumer、Supplier 或者 Runnable 都会被描述为一个CompletionStage。,但是 CompletionStage 这里面一共有40多个方法,我们该如何理解呢?,CompletionStage 接口可以清晰的描述任务之间的关系,可以分为 顺序串行、并行、汇聚关系以及异常处理。,CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。,thenApply() 和 thenCompose() 的区别?thenApply 转换的是泛型中的类型,是同一个CompletableFuture,thenCompose 用来连接两个CompletableFuture,是生成一个新的 CompletableFuture。他们都是让 CompletableFuture 可以对返回的结果进行后续操作,就像 Stream 一样进行 map 和 flatMap 的转换。,这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。,可以看一下 thenApply() 方法是如何使用的。首先通过 supplyAsync() 启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。,CompletableFuture 中 thenApply 如何实现?,CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别是源自 fn、consumer、action 这三个核心参数不同。,Async后缀的方法表示,前面的 CompletionStage 执行完成,在执行后续操作时会提交到线程池处理,否则就还是使用同一个处理线程完成CompletableFuture的所有任务。,这三种方法意思都是等两个 CompletionStage 都完成了计算才会执行下一步的操作,区别在于参数接口类型不一样。,CompletableFuture 中 thenAcceptBoth 如何实现?talk is cheap!,OR的关系,表示谁运行快就用谁的结果执行下一步操作。,同样也是有Async后缀的表示,当前面的 CompletionStage 执行完成,在执行后续操作时会提交到线程池处理。applyToEither、acceptEither、runAfterEither 三个方法的区别还是来自于不同的接口参数类型:Function、Consumer、Runnable。,CompletableFuture 中 applyToEither 如何实现?,在Java编程中,异常处理当然是必不可少的一环,那你可能会想到如果在使用 CompletableFuture 进行异步链式编程时,如果出现异常该怎么处理呢?,首先上面我们提到的 fn、consumer、action 它们的核心方法是不允许抛出可检查异常的,但是却无法限制它们抛出运行时异常。在同步方法中,我们可以使用 try-catch{} 来捕获并处理异常,但在异步编程里面异常该如何处理 ?CompletionStage 接口给我们提供的方案非常简单,比 try-catch{} 还要简单。,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。,handle 也是执行任务完成时对结果的处理,whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。,当上一个的 CompletableFuture 的值计算完成或者抛出异常的时候,会触发 handle 方法中定义的函数,结果由 BiFunction 参数计算而得,因此这组方法兼有 whenComplete 和转换的两个功能。,注意到了吗?这里使用了两个不同的Stream流水线,是否可以在同一个处理流的流水线上一个接一个地放置多个map操作。,这其实是有原因的。考虑流操作之间的延迟特性,如果你在单一流水线中处理流,不同的请求只能以同步、顺序执行的方式才会成功。因此,每个创建CompletableFuture对象只能在前一个操作结束之后执行查询指定服务请求的动作、通知join方法返回结果。,再来看一个例子:,我们的系统提供的运费价格是以美元计价的,但是你希望以人民币(RMB)的方式提供给你的客户。你可以用异步的方式向计费中心查询指定Load的价格,同时从远程的汇率服务那里查到人民币和美元之间的汇率。当二者都结束时,再将这两个结果结合起来,用返回的商品价格乘以当时的汇率,得到以人民币计价的商品价格。,通过上述例子,可以看到相对于采用Java 8之前提供的Future实现,CompletableFuture版本实现所具备的巨大优势。CompletableFuture利用Lambda表达式以声明式的API提供了一种机制,能够用最有效的方式,非常容易地将多个以同步或异步方式执行复杂操作的任务结合到一起。,为了更直观地感受一下使用CompletableFuture在代码可读性上带来的巨大提升,下面尝试仅使用Java 7中提供的特性,重新实现上述例子的功能。,这里我们思考这样一个问题:并行使用流还是CompletableFuture?,对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在 CompletableFuture 内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。同时也可以提供更多描述任务之间关系的接口,我们不需要为之编写更多的代码。,这里对使用这些API的建议如下:,今天大家学到了哪些知识呢?,不足之处:今天只是对于并发编程中的工具类使用和相关原理做了分享,在实际开发过程中可能需要考虑到更多的通用性,封装通过调用模版方法,不要每一个地方都写一堆类似的代码。,通过今天的分享,希望大家可以在平时开发工作中遇到合适的场景时尝试使用 CompletableFuture 提供的API,优化程序性能、提高开发效率。,那如果都看到了这里,一定要帮七哥给个点赞支持哦~

© 版权声明

相关文章