首页app软件java链表队列 java链式队列

java链表队列 java链式队列

圆圆2025-08-02 23:00:27次浏览条评论

Java CompletableFuture 链式顺序执行与结果列表收集教程本教程详细探讨了如何在Java中使用CompletableFuture实现序列异步任务的顺序执行,把每个任务的结果放到一个列表中。文章收集了两种主要策略:一种是利用外部列表累积结果,另一种是采用更函数式的方式在CompletionStage链中传递并更新结果列表。通过深入解析thenCompose、thenAccept和thenApply等核心方法,并提供示例代码,有帮助开发者、优雅地处理需要严格顺序执行的异步流程。

在现代java开发中,completablefuture是处理高效异步操作的强工具。然而,当遇到一系列需要严格顺序执行的异步任务,并且将每个任务的结果收集起来时,开发者可能会遇到挑战。尤其当每个异步任务本身应用程序就返回一个完成阶段时,如何正确地链式调用避免并无线程阻塞或异步问题,是实现异步需要的关键。

核心问题解析

假设我们有一个运行业务处理函数process(int a),它返回一个CompletionStage:import java.time.LocalDateTime;import java.util.ArrayList;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.concurrent.CompletionStage;import java.util.stream.Collectors;import java.util.stream.IntStream;public class CompletableFutureSequential { private CompletionStagelt;Integergt; process(int a) { return CompletableFuture.supplyAsync(() -gt; { System.err.printf(quot;s dispatch d\nquot;, LocalDateTime.now(), a); // 模拟长时间运行的业务流程 try { Thread.sleep(10); // 增加延迟观察顺序性 } 以 catch (InterruptedException e) { Thread.currentThread().interrupt(); } return a 10; }).whenCompleteAsync((e, t) -gt; { if (t != null) System.err.printf(quot;!!!处理 'd' 时出错 !!!\nquot;, a); System.err.printf(quot;s 完成 d\nquot;, LocalDateTime.now(), e); }); } // ... (后续解决方案代码将放在这里)}登录后复制

我们的目标是按顺序执行一系列过程,把它们的整数结果收集到一个列表中。

常见误区与问题:

在 thenApplyAsync 内部使用 join():

立即学习“Java免费成功学习笔记(研究)”;//第一次尝试(但效率低下)Listlt;Integergt; arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());CompletionStagelt;Listlt;Integergt;gt; resultStage1 = CompletableFuture.completedFuture(new ArrayListlt;gt;());for (Integer element : arr) { resultStage1 = resultStage1.thenApplyAsync((ret) -gt; { // 在异步中拒绝等待另一个CompletableFuture完成Integer a = process(element).toCompletableFuture().join(); ret.add(a); return ret; });}Listlt;Integergt;computeResult1 = resultStage1.toCompletableFuture().join();//这种方法虽然能实现顺序执行,但是`join()`的使用意味着在`thenApplyAsync`的执行线程中会发生阻塞,//导致一个stage的执行可能占用两个线程资源(一个用于`thenApplyAsync`,另一个用于`process`内部的`supplyAsync`,//且`thenApplyAsync`的线程会等待`process`完成),效率不高且不符合异步编程的最佳。登录后复制

这种方式虽然实现了顺序性,但 join() 是一个阻塞操作。在 thenApplyAsync 的回调中调用 join() 会导致该回调直到某个线程被阻塞,process(element)完成。这违背了异步编程的非阻塞原则,而且实践中可能导致线程池资源被低效利用。

使用 thenCombineAsync 进行链式调用://第二次尝试(失败,因为是有点执行)Listlt;Integergt; arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());CompletionStagelt;Listlt;Integergt;gt; resultStage2 = CompletableFuture.completedFuture(new ArrayListlt;gt;());for (Integer element : arr) { // thenCombineAsync 会尝试完成两个CompletionStage resultStage2 = resultStage2.thenCombineAsync(process(element), (array, ret) -gt; { array.add(ret); return array; });}// resultStage2.toCompletableFuture().join();//这个方法会导致`process(element)` 几乎同时被调度执行,//因为 `thenCombineAsync` 的设计目的是在两个CompletionStage都完成后,将它们的结果合并。//这与我们要求的“顺序执行”相悖。登录后复制

thenCombineAsync的作用是等待两个独立的CompletionStage都完成后,再将它们的结果合并。这意味着进程(元素)会在循环迭代时被立即触发,而不是等待前一个进程完成。因此,它无法保证任务的顺序执行。正确的解决方案:利用 thenCompose 实现顺序链式调用

thenCompose 是 CompletionStage 中用于顺序执行异步操作的关键方法。它接收一个函数,该函数会返回一个新的 CompletionStage。当当前的 CompletionStage 完成后,thenCompose 会使用其结果来触发并等待这个新的 CompletionStage方案一:使用外部列表累积结果

这种方法通过一个外部的列表来收集结果。我们初始化一个表示“前一个阶段已完成”的 CompletionStage,然后循环激发新的流程任务链接到它后面。

public class CompletableFutureSequential { // ... (处理方法同上) public static void main(String[] args) { CompletableFutureSequential app = new CompletableFutureSequential(); Listlt;Integergt; arr = IntStream.range(1, 10).boxed().collect(Collectors.toList()); System.out.println(quot;--- 方案:使用外部列表累计结果---quot;);CompletionStagelt;Voidgt;loopStage = CompletableFuture.completedFuture(null);final Listlt;Integergt;resultList = new ArrayListlt;gt;(); // 外部列表 for (Integer element : arr) { LoopStage = LoopStage // 当loopStage完成后,执行 process(element) .thenCompose(v -gt; app.process(element)) // 当process(element)完成后,将其结果添加到resultList .thenAccept(resultList::add); } // 阻止完成所有等待任务 LoopStage.toCompletableFuture().join(); System.out.println(quot;一方案结果: quot; resultList); // 预期输出:[11, 12, 13, 14, 15, 16, 17, 18, 19] }}登录后复制

原理分析:CompletableFuture.completedFuture(null)创建了一个立即完成的CompletionStage,作为链的起点。在循环中,loopStage = LoopStage.thenCompose(...)完成了每次迭代都将新的流程任务链接到前一个任务的完成。thenCompose之后的作用是:当前一个CompletionStage(即loopStage的前一个状态)完成后,才执行v -gt; app.process(element),并等待app.process(element)返回的CompletionStage 完成。thenAccept(resultList::add) 在 process(element) 完成并产生结果后,将其结果添加到外部的 resultList 中。

thenAccept 不会改变 CompletionStage 的结果类型,它返回一个 CompletionStage,这与 LoopStage 的类型兼容,使得链式调用可以继续。最终,loopStage.toCompletableFuture().join() 会阻塞当前线程,直到整个链上的所有异步任务都按顺序执行完毕,并且所有结果都被添加到 resultList 中。方案二:在 CompletionStage 中链中提交列表

此类方法更加函数式,将结果列表作为CompletionStage的结果在链中提交和更新。

public class CompletableFutureSequential { // ... (流程方法同上) public static void main(String[] args) { // ... (方案一代码,简洁以聚焦方案二) System.out.println(quot;\n--- 方案二:在CompletionStage链中传递列表 ---quot;); Listlt;Integergt; arr = IntStream.range(1, 10).boxed().collect(Collectors.toList()); CompletionStagelt;Listlt;Integergt;gt; listStage = CompletableFuture.completedFuture(new ArrayListlt;gt;()); // 初始列表作为结果 for (Integer element : arr) { listStage = listStage // 当 listStage (包含当前列表)完成后,执行 process(element) .thenCompose(list -gt; app.process(element) // 当 process(element) 完成后,将结果添加到建立的列表中 .thenAccept(list::add) // 关键:将更新后的列表作为下一个 CompletionStage 的结果返回 .thenApply(v -gt; list) ); } Listlt;Integergt; resultList2 = listStage.toCompletableFuture().join(); System.out.println(quot;方案二结果: quot; resultList2); // 预期输出:[11, 12, 13, 14, 15, 16, 17, 18, 19] }}登录后复制

原理分析:CompletableFuture.completedFuture(new ArrayList())创建了一个初始的CompletionStage,其结果是一个空的ArrayList。这个列表将作为状态在链中传递。在循环中,listStage = listStage.thenCompose(list -gt; ...):list参数在CompletionStage之前 的结果(即当前累积的列表)。app.process(element) 异步执行下一个任务。

.thenAccept(list::add):当process(element)完成后,将其结果添加到list中。注意,thenAccept返回的是CompletionStage。.thenApply(v -gt;list):是关键一步。由于thenAccept返回CompletionStage,为了让整个thenCompose块的结果仍然是CompletionStagegt;,我们需要使用thenApply将更新后的list重新包装成CompletionStage这样,更新后面的列表就可以传递给下一个 thenCompose 调用。最终,listStage.toCompletableFuture().join() 阻止并获取最终完成的 CompletionStage 中包含的完整结果列表。注意事项与最佳实践

线程管理:thenCompose 和 thenAccept(不带异步后缀)默认会尝试与前一个阶段相同的线程或默认的ForkJoinPool.commonPool() 中执行。如果process方法本身已经通过supplyAsync或其他方式将计算offload 对于单独的线程池,那么链式操作的执行线程通常不会成为瓶颈。如果需要显式控制后续操作的执行线程,可以使用 thenComposeAsync 和 thenAcceptAsync 并指定 Executor。

错误处理:在链式调用中,任何一个 CompletionStage 发生异常,都会导致整个链条的后续操作被跳过,异常会传递到最终的 CompletionStage。可以异常使用(ex -gt;defaultValue)处理来异常并提供一个值默认,或者使用handle((result, ex) -gt;...)来统一处理正常结果和异常。

阻塞操作:join() 与 get():晚上中使用了 join() 来阻塞主线程以获取最终结果。在实际生产环境中,应避免避免在主线程中阻塞。如果可能,应将最终的 CompletionStage返回或进行异步处理,例如将其结果提交给另一个异步任务或使用回调函数。join() 会触发CompletionException(非受检异常),而 get() 会触发ExecutionException 和 InterruptedException(受检异常),需要捕获处理。

选择方案:方案一(外部列表)相对简单观察,适用于对外部状态进行操作的场景。方案二(链中传递列表)更符合函数式编程的思想,将状态封装在异步流程中,避免了对外部可变状态的直接依赖(虽然列表本身是可变的,但每次提交都是同引用)。在更复杂的场景下,这种模式可能更容易管理和测试。

通过理解和应用 thenCompose,开发者可以有效地构建复杂、顺序执行的异步任务流,同时保持代码的清晰性和响应性。

以上就是Java CompletableFuture链式顺序执行与结果列表收集教程的详细内容,更多请关注乐哥常识网相关文章!

Java Compl
Kotlin Spring 开发:Flow 与 Suspend 的选择与应用 spring for kotlin
相关内容
发表评论

游客 回复需填写必要信息