Sometimes I have to run several asynchronous tasks and then run another asynchronous task once all of them have completed. As always in such situations, I searched Stack Overflow for a how-to, and the top-rated answer suggests the following solution:
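// Assumes: import static java.util.stream.Collectors.toList;
static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    // allOf completes once every future in the list has completed
    return CompletableFuture.allOf(com.toArray(new CompletableFuture[com.size()]))
            // at this point join() no longer blocks; it only reads each result
            .thenApply(v -> com.stream()
                    .map(CompletableFuture::join)
                    .collect(toList())
            );
}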
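As a usage illustration, here is a minimal sketch of how the sequence method above could be chained with a follow-up asynchronous task via thenCompose. The class name SequenceDemo and the sumAsync helper are made up for this example, and the three input tasks are stand-ins for real work.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class SequenceDemo {

    // Same idea as the sequence method above, repeated so the sketch compiles on its own.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    // Hypothetical follow-up task: sums the collected results asynchronously.
    static CompletableFuture<Integer> sumAsync(List<Integer> values) {
        return CompletableFuture.supplyAsync(
                () -> values.stream().mapToInt(Integer::intValue).sum());
    }

    public static void main(String[] args) {
        // Stand-ins for real asynchronous work.
        List<CompletableFuture<Integer>> tasks = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));

        // The follow-up task starts only after all three tasks have completed.
        int total = sequence(tasks).thenCompose(SequenceDemo::sumAsync).join();
        System.out.println(total); // prints 6
    }
}
```

thenCompose is used instead of thenApply because the follow-up step itself returns a CompletableFuture; thenApply would produce a nested future.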
That solution works well. However, if you often deal with Streams, a more functional approach would be neat. So I started coding a Collector that does this operation for me in one go. I won’t go into detail about how a Collector works, but this blog post helped me a lot in understanding it.
Finally I ended up with this solution, which I’ve uploaded to GitHub:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class CompletableFutureCollector<X, T extends CompletableFuture<X>> implements Collector<T, List<T>, CompletableFuture<List<X>>> {

    private CompletableFutureCollector() {
    }

    public static <X, T extends CompletableFuture<X>> Collector<T, List<T>, CompletableFuture<List<X>>> allOf() {
        return new CompletableFutureCollector<>();
    }

    @Override
    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    @Override
    public BiConsumer<List<T>, T> accumulator() {
        return List::add;
    }

    @Override
    public BinaryOperator<List<T>> combiner() {
        return (left, right) -> { left.addAll(right); return left; };
    }

    @Override
    public Function<List<T>, CompletableFuture<List<X>>> finisher() {
        return ls -> CompletableFuture.allOf(ls.toArray(new CompletableFuture[ls.size()]))
                .thenApply(v -> ls
                        .stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}
And here is how you would use it:
public static void main(String[] args) {
    CompletableFuture<List<Integer>> collect = Stream.of(1, 2, 3)
            .map(CompletableFuture::completedFuture)
            .collect(CompletableFutureCollector.allOf());
}
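For comparison, the four functions a Collector is built from (supplier, accumulator, combiner, finisher) can also be handed to the standard Collector.of factory instead of implementing the interface by hand. The following is only a sketch of that variant; the class name CollectorOfDemo is made up, and the supplyAsync tasks are placeholders for real asynchronous work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CollectorOfDemo {

    // Hypothetical compact variant of the collector, assembled with Collector.of.
    static <X> Collector<CompletableFuture<X>, List<CompletableFuture<X>>, CompletableFuture<List<X>>> allOf() {
        return Collector.of(
                ArrayList::new,                                         // supplier: new accumulation list
                List::add,                                              // accumulator: add one future
                (left, right) -> { left.addAll(right); return left; },  // combiner: merge partial lists
                futures -> CompletableFuture                            // finisher: wait for all, then join
                        .allOf(futures.toArray(new CompletableFuture[0]))
                        .thenApply(v -> futures.stream()
                                .map(CompletableFuture::join)
                                .collect(Collectors.toList())));
    }

    public static void main(String[] args) {
        CompletableFuture<List<Integer>> all = Stream.of(1, 2, 3)
                .map(i -> CompletableFuture.supplyAsync(() -> i * 10))
                .collect(allOf());

        System.out.println(all.join()); // prints [10, 20, 30]
    }
}
```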
Happy coding!
Update
Obviously it was late yesterday D: The solution I posted was hidden in another answer with fewer upvotes. It suggests using Collectors.collectingAndThen together with the sequence method above. In my opinion this is cleaner than my approach of writing the Collector on your own (DRY principle). The final solution is posted below; it contains another Collector factory method that can be used if you’re not interested in the results or if the CompletableFutures to collect are of type Void.
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class CompletableFutureCollector2 {

    private CompletableFutureCollector2() {
    }

    /**
     * Collects a stream of {@code CompletableFuture<X>} into a single {@code CompletableFuture<List<X>>}.
     * @param <X> the computed result type
     * @param <T> some CompletableFuture
     * @return a Collector producing a {@code CompletableFuture<List<X>>} that is complete when all collected CompletableFutures are complete.
     */
    public static <X, T extends CompletableFuture<X>> Collector<T, ?, CompletableFuture<List<X>>> collectResult() {
        return Collectors.collectingAndThen(Collectors.toList(), joinResult());
    }

    /**
     * Collects a stream of {@code CompletableFuture<?>} into a single {@code CompletableFuture<Void>}.
     * Use this function if you are not interested in the collected results or the collected CompletableFutures are of
     * type Void.
     * @param <T> some CompletableFuture
     * @return a Collector producing a {@code CompletableFuture<Void>} that is complete when all collected CompletableFutures are complete.
     */
    public static <T extends CompletableFuture<?>> Collector<T, ?, CompletableFuture<Void>> allComplete() {
        // Refers to the private allOf(List) helper of this class
        return Collectors.collectingAndThen(Collectors.toList(), CompletableFutureCollector2::allOf);
    }

    private static <X, T extends CompletableFuture<X>> Function<List<T>, CompletableFuture<List<X>>> joinResult() {
        return ls -> allOf(ls)
                .thenApply(v -> ls
                        .stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    private static <T extends CompletableFuture<?>> CompletableFuture<Void> allOf(List<T> ls) {
        return CompletableFuture.allOf(ls.toArray(new CompletableFuture[ls.size()]));
    }
}
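To show how the two factory methods might be used, here is a small self-contained sketch. So that it compiles on its own, it contains condensed stand-ins for collectResult() and allComplete() with simplified type parameters; the class name CollectorUsageDemo, the upper-casing task and the counter are all made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CollectorUsageDemo {

    // Condensed stand-in for collectResult(): gathers the futures, then joins their results.
    static <X> Collector<CompletableFuture<X>, ?, CompletableFuture<List<X>>> collectResult() {
        return Collectors.collectingAndThen(Collectors.toList(),
                ls -> CompletableFuture.allOf(ls.toArray(new CompletableFuture[0]))
                        .thenApply(v -> ls.stream()
                                .map(CompletableFuture::join)
                                .collect(Collectors.toList())));
    }

    // Condensed stand-in for allComplete(): only signals completion, results are ignored.
    static Collector<CompletableFuture<?>, ?, CompletableFuture<Void>> allComplete() {
        return Collectors.collectingAndThen(Collectors.toList(),
                ls -> CompletableFuture.allOf(ls.toArray(new CompletableFuture[0])));
    }

    public static void main(String[] args) {
        // Futures that carry results: collect them into one future holding a list.
        List<String> words = Stream.of("a", "b", "c")
                .map(s -> CompletableFuture.supplyAsync(() -> s.toUpperCase()))
                .collect(collectResult())
                .join();
        System.out.println(words); // prints [A, B, C]

        // Futures of type Void: we only care that all of them have finished.
        AtomicInteger counter = new AtomicInteger();
        Stream.of(1, 2, 3)
                .map(i -> CompletableFuture.runAsync(counter::incrementAndGet))
                .collect(allComplete())
                .join();
        System.out.println(counter.get()); // prints 3
    }
}
```

The result list keeps the encounter order of the stream, because the futures are first gathered with Collectors.toList() and only joined afterwards.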