ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

按 CompletableFuture 完成顺序实现 Streaming Future

2021-05-30 10:56:07  阅读:278  来源: 互联网

标签:indexedFutures return futures Streaming future CompletableFuture java Future


Java 8 给引入了 `CompletableFuture` 和 Stream API 这样的工具。让我们尝试把它们结合起来,创建一个 Stream 在 future 完成时返回一组 `CompletableFutures` 集合。


在 [parallel-collectors][1] V1.0.0 开发中也使用了这种方法。


[1]:https://github.com/pivovarit/parallel-collectors


把 CompletableFuture 转成 Steam


基本上,我们要做的就是设计一种方案,把一组 future 集合转换成由任务返回值组成的 Steam:


```java
Collection<CompletableFuture<T>> -> Stream<T>
```


在 Java 的世界里,这可以通过使用 `static` 方法实现:


```java
public static <T> Stream<T> inCompletionOrder(Collection<CompletableFuture<T>> futures) {
   // ...
}
```


要创建自定义 `Stream`,需要自己实现一个 `java.util.Spliterator`:


```java
final class CompletionOrderSpliterator<T>
 implements Spliterator<T> { ... }
```


下面是 `static` 方法的具体实现:


```java
public static <T> Stream<T> completionOrder(Collection<CompletableFuture<T>> futures) {
   return StreamSupport.stream(
     new CompletionOrderSpliterator<>(futures), false);
}
```


这部分相对简单,现在让我们实现 `CompletionOrderSpliterator`。


实现 CompletionOrderSpliterator


要实现自定义 `Spliterator`,需要完成下列方法:


```java
final class CompletionOrderSpliterator<T> implements Spliterator<T> {
   CompletionOrderSpliterator(Collection<CompletableFuture<T>> futures) {
       // TODO
   }
   @Override
   public boolean tryAdvance(Consumer<? super T> action) {
       // TODO
   }
   @Override
   public Spliterator<T> trySplit() {
       // TODO
   }
   @Override
   public long estimateSize() {
       // TODO
   }
   @Override
   public int characteristics() {
       // TODO
   }
}
```


当然,构造函数也要实现。


"最直接的解决方法:拷贝传入的集合,等待 future 完成,把完成的 future 从集合里移除,把结果传给 `Spliterator`。"


使用 `CompletableFuture#anyOf` 可等待 future 完成,并且默认实现了正确的异常处理。


然而,还有一个问题略显复杂。


如果仔细查看 `CompletableFuture#anyOf` 方法,会发现它不是很实用,因为要求传入多个 `CompletableFutures<?>` 然后返回一个 `CompletableFuture< Object>` 对象,但这不是主要问题,只是稍有不便。


真正的问题在于,方法返回的 `CompletableFuture<Object>` 对象并不是第一个完成的 future,而是当有任何一个 future 完成时新建的 `CompletableFuture` 实例。


这种方案把"等待 future 完成,然后从列表移除"变复杂了。"我们不能依赖引用想等性,所以要么在 `CompletableFuture#anyof` 触发后执行线性扫描,要么试着想出更好的办法。"


> 译注:"Reference Equality 引用相等性"是对象相等性的一部分,在两个被比较的引用都指向同一个对象的情况下,通过使用 `==` 而不是进一步进行对象比较。


一种简单的解决方案:


```java
private T takeNextCompleted() {
   anyOf(futureQueue.toArray(new CompletableFuture[0])).join();
   CompletableFuture<T> next = null;
   for (CompletableFuture<T> future : futureQueue) {
       if (future.isDone()) {
           next = future;
           break;
       }
   }
   futureQueue.remove(next);
   return next.join();
}
```


上面的代码中,执行线性扫描并记录了 `index`,确保移出操作时间复杂度为常量。尽管已经知道数组大小,为什么还要向 `CompletableFuture[]` 传 0?


[2]:https://shipilev.net/blog/2016/arrays-wisdom-ancients/


从实用角度看,这个方案应该是足够好了,"通常没有人会处理1万~2万大小的 future 集合",而且硬件支持的线程数量有上限。受堆栈大小等多种因素影响,实际支持的线程数量会有所差别。不过,一旦“[Loom 项目][3]”投入使用,这种情况可能会有改善。


> 译注:Loom 项目提供一个轻量级用户态的纤程,简化并发编程并且更为高效。


[3]:https://openjdk.java.net/projects/loom/


尽管如此,2万次迭代最乐观的情况下会访问2万个节点(即总是第一个完成的 future),至多访问[2亿个节点][4]节点。


[4]:https://en.wikipedia.org/wiki/Arithmetic_progression#Sum


如果无法依赖 `CompletableFuture` 引用相等性或者 hashcode 还可以做怎样的改进?


可以为 future 分配 id,将它们与对象 future 一起存储到 map 中,这样 future 可以通过关联的 index 标记自己。


所以,让我们把 future 存到 map 中:


```java
private final Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> indexedFutures;
```


现在,可以从一个单调递增序列中手动指定 id,并让 future 返回时带上 id:


```java
private static <T> Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> toIndexedFutures(List<CompletableFuture<T>> futures) {
   Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> map
     = new HashMap<>(futures.size(), 1); // 因为知道集合大小和预期的冲突计数 (0), 可以提前指定 HashMap 大小
   int seq = 0;
   for (CompletableFuture<T> future : futures) {
       int index = seq++;
       map.put(
         index,
         future.thenApply(
           value -> new AbstractMap.SimpleEntry<>(index, value)));
   }
   return map;
}
```


现在,可以高效地找到并处理下一个完成的 future:等待 future,读取序列号,根据序列号从剩余序列中移除:


```java
private T nextCompleted() {
   return anyOf(indexedFutures.values()
     .toArray(new CompletableFuture[0]))
       .thenApply(result -> ((Map.Entry<Integer, T>) result))
       .thenApply(result -> {
           indexedFutures.remove(result.getKey());
           return result.getValue();
       }).join();
}
```


`tryAdvance()` 的实现很简单:


```java
@Override
public boolean tryAdvance(Consumer<? super T> action) {
   if (!indexedFutures.isEmpty()) {
       action.accept(nextCompleted());
       return true;
   } else {
       return false;
   }
}
```


最困难的部分已经解决,现在需要实现剩下的三个方法:


```java
@Override
public Spliterator<T> trySplit() {
   return null; // 不支持 split
}
@Override
public long estimateSize() {
   return indexedFutures.size(); // 提前知道集合的大小
}
@Override
public int characteristics() {
   return
     SIZED       // 知道前面的大小
     & IMMUTABLE // 输入的集合可安全地修改
     & NONNULL;  // 输入的集合不支持 null
}
```


到这里代码已经完成。


示例演示


可以加入随机处理延迟快速验证代码是否正确:


```java
public static void main(String[] args) {
   ExecutorService executorService = Executors.newFixedThreadPool(10);
   List<CompletableFuture<Integer>> futures = Stream
     .iterate(0, i -> i + 1)
     .limit(100)
     .map(i -> CompletableFuture.supplyAsync(
       withRandomDelay(i), executorService))
     .collect(Collectors.toList());
   completionOrder(futures)
     .forEach(System.out::println);
}

private static Supplier<Integer> withRandomDelay(Integer i) {
   return () -> {
       try {
           Thread.sleep(ThreadLocalRandom.current()
             .nextInt(10000));
       } catch (InterruptedException e) {
           // 无耻地留白了, 请不要在生产环境中这么做
       }
       return i;
   };
}
```


可以看到,结果没有按照原来的顺序返回:


Streaming Future 的原始顺序


```shell
6
5
2
4
1
11
8
12
3
```



按原始顺序 Streaming Future


假如要求只保持原来的顺序该怎么处理?


幸运的是,可以像下面这样实现,无需添加特别的实现:


```java
public static <T> Stream<T> originalOrder(
 Collection<CompletableFuture<T>> futures) {
   return futures.stream().map(CompletableFuture::join);
}
```


完整示例


```java
package com.pivovarit.collectors;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import static java.util.concurrent.CompletableFuture.anyOf;
/**
* @author Grzegorz Piwowarek
*/
final class CompletionOrderSpliterator<T> implements Spliterator<T> {
   private final Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> indexedFutures;
   CompletionOrderSpliterator(Collection<CompletableFuture<T>> futures) {
       indexedFutures = toIndexedFutures(futures);
   }
   @Override
   public boolean tryAdvance(Consumer<? super T> action) {
       if (!indexedFutures.isEmpty()) {
           action.accept(nextCompleted());
           return true;
       } else {
           return false;
       }
   }
   private T nextCompleted() {
       return anyOf(indexedFutures.values().toArray(new CompletableFuture[0]))
         .thenApply(result -> ((Map.Entry<Integer, T>) result))
         .thenApply(result -> {
             indexedFutures.remove(result.getKey());
             return result.getValue();
         }).join();
   }
   @Override
   public Spliterator<T> trySplit() {
       return null;
   }
   @Override
   public long estimateSize() {
       return indexedFutures.size();
   }
   @Override
   public int characteristics() {
       return SIZED & IMMUTABLE & NONNULL;
   }
   private static <T> Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> toIndexedFutures(Collection<CompletableFuture<T>> futures) {
       Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> map = new HashMap<>(futures.size(), 1);
       int counter = 0;
       for (CompletableFuture<T> f : futures) {
           int index = counter++;
           map.put(index, f.thenApply(value -> new AbstractMap.SimpleEntry<>(index, value)));
       }
       return map;
   }
}
```


本文完整的源代码也可以[在 GitHub 上找到][5]。


[5]:https://github.com/pivovarit/articles/blob/master/java-completion-order-spliterator/src/main/java/com/pivovarit/stream/CompletionOrderSpliterator.java


标签:indexedFutures,return,futures,Streaming,future,CompletableFuture,java,Future
来源: https://blog.51cto.com/u_15127686/2832732

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有