jklingsporn

Posted on • Originally published at jensonjava.wordpress.com on

Complete a Stream of CompletableFutures

Sometimes I need to run several asynchronous tasks and then start another asynchronous task once all of them have completed. As always in such situations, I searched Stack Overflow for a how-to, and the top-rated answer suggests the following solution:

// toList() is assumed to be a static import of java.util.stream.Collectors.toList
static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture[com.size()]))
            .thenApply(v -> com.stream()
                    .map(CompletableFuture::join)
                    .collect(toList())
            );
}
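
To see the helper in action, here is a minimal, self-contained sketch. The class name SequenceDemo and the three sample tasks are made up for illustration; only sequence(...) comes from the answer quoted above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical demo class; only sequence(...) is taken from the text above.
class SequenceDemo {

    // Turns a list of futures into one future holding all results, in list order.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                // allOf has completed at this point, so join() returns immediately.
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));

        // join() blocks the main thread until all three tasks are done.
        List<Integer> results = sequence(futures).join();
        System.out.println(results); // prints [1, 2, 3]
    }
}
```

The results come back in the order of the input list, not in the order in which the tasks happen to finish.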

That solution works well. However, if you often deal with Streams, a more functional approach would be neat. So I started coding a Collector that does this operation for me in one go. I won’t go into detail about how a Collector works, but this blog post helped me a lot in understanding it.

Finally, I ended up with this solution, which I’ve uploaded to GitHub:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class CompletableFutureCollector<X, T extends CompletableFuture<X>> implements Collector<T, List<T>, CompletableFuture<List<X>>> {

    private CompletableFutureCollector(){
    }

    public static <X, T extends CompletableFuture<X>> Collector<T, List<T>, CompletableFuture<List<X>>> allOf(){
        return new CompletableFutureCollector<>();
    }

    // Creates the mutable container that gathers the futures.
    @Override
    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    // Adds one future of the stream to the container.
    @Override
    public BiConsumer<List<T>, T> accumulator() {
        return List::add;
    }

    // Merges two partial containers (only needed for parallel streams).
    @Override
    public BinaryOperator<List<T>> combiner() {
        return (left, right) -> { left.addAll(right); return left; };
    }

    // Waits for all futures via allOf, then joins the already completed futures.
    @Override
    public Function<List<T>, CompletableFuture<List<X>>> finisher() {
        return ls -> CompletableFuture.allOf(ls.toArray(new CompletableFuture[ls.size()]))
                .thenApply(v -> ls
                        .stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

And here is how you would use it:

public static void main(String[] args) {
    CompletableFuture<List<Integer>> collect = Stream.of(1, 2, 3)
            .map(CompletableFuture::completedFuture)
            .collect(CompletableFutureCollector.allOf());
}
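
If you want to try this without a dedicated class, the same four building blocks can be passed to Collector.of. This is only a sketch of that alternative, not the code from the gist: the class name CollectorOfDemo is made up, and the generics are simplified to a single type parameter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the original gist.
class CollectorOfDemo {

    // Same idea as CompletableFutureCollector.allOf(), built with Collector.of.
    static <X> Collector<CompletableFuture<X>, List<CompletableFuture<X>>, CompletableFuture<List<X>>> allOf() {
        return Collector.of(
                ArrayList::new,                                         // supplier
                List::add,                                              // accumulator
                (left, right) -> { left.addAll(right); return left; },  // combiner
                futures -> CompletableFuture                            // finisher
                        .allOf(futures.toArray(new CompletableFuture[0]))
                        .thenApply(v -> futures.stream()
                                .map(CompletableFuture::join)
                                .collect(Collectors.toList())));
    }

    public static void main(String[] args) {
        CompletableFuture<List<Integer>> all = Stream.of(1, 2, 3)
                .map(CompletableFuture::completedFuture)
                .collect(allOf());

        // The futures are already completed, so join() returns right away.
        System.out.println(all.join()); // prints [1, 2, 3]
    }
}
```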

Happy coding!


Update

Obviously it was late yesterday D: A Collector-based solution was already hidden in another answer with fewer upvotes. It suggests using Collectors.collectingAndThen together with the sequence method above. In my opinion this is cleaner than writing the Collector yourself, as I did (DRY principle). The final solution is posted below. It contains a second Collector factory method that can be used if you’re not interested in the results, or if the CompletableFutures to collect are of type Void.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class CompletableFutureCollector2 {

    private CompletableFutureCollector2(){
    }

    /**
     * Collects a stream of {@code CompletableFuture<X>} into a single {@code CompletableFuture<List<X>>}.
     * @param <X> the computed result type
     * @param <T> some CompletableFuture
     * @return a Collector producing a {@code CompletableFuture<List<X>>} that is complete when all collected CompletableFutures are complete.
     */
    public static <X, T extends CompletableFuture<X>> Collector<T, ?, CompletableFuture<List<X>>> collectResult(){
        return Collectors.collectingAndThen(Collectors.toList(), joinResult());
    }

    /**
     * Collects a stream of {@code CompletableFuture<?>} into a single {@code CompletableFuture<Void>}.
     * Use this function if you are not interested in the collected results or the collected CompletableFutures are of
     * type Void.
     * @param <T> some CompletableFuture
     * @return a Collector producing a {@code CompletableFuture<Void>} that is complete when all collected CompletableFutures are complete.
     */
    public static <T extends CompletableFuture<?>> Collector<T, ?, CompletableFuture<Void>> allComplete(){
        // Refers to the private allOf(List) helper of this class.
        return Collectors.collectingAndThen(Collectors.toList(), CompletableFutureCollector2::allOf);
    }

    private static <X, T extends CompletableFuture<X>> Function<List<T>, CompletableFuture<List<X>>> joinResult() {
        return ls -> allOf(ls)
                .thenApply(v -> ls
                        .stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    private static <T extends CompletableFuture<?>> CompletableFuture<Void> allOf(List<T> ls) {
        return CompletableFuture.allOf(ls.toArray(new CompletableFuture[ls.size()]));
    }
}
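
To compare the two factory methods side by side, here is a small runnable sketch. It is an assumption-laden stand-in rather than the class above: CollectingAndThenDemo is a made-up name, the generics are simplified so the block stands alone, and the sample tasks are invented.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class; the factories mirror collectResult() and allComplete()
// from the text, with simplified generics.
class CollectingAndThenDemo {

    // Collects futures into one future that holds all results in encounter order.
    static <X> Collector<CompletableFuture<X>, ?, CompletableFuture<List<X>>> collectResult() {
        return Collectors.collectingAndThen(
                Collectors.<CompletableFuture<X>>toList(),
                futures -> allOf(futures).thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList())));
    }

    // Collects futures into one future that only signals completion.
    static Collector<CompletableFuture<?>, ?, CompletableFuture<Void>> allComplete() {
        return Collectors.collectingAndThen(
                Collectors.<CompletableFuture<?>>toList(),
                futures -> allOf(futures));
    }

    private static CompletableFuture<Void> allOf(List<? extends CompletableFuture<?>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> results = Stream.of("a", "b", "c")
                .map(s -> CompletableFuture.supplyAsync(() -> s.toUpperCase()))
                .collect(collectResult());
        System.out.println(results.join()); // prints [A, B, C]

        CompletableFuture<Void> done = Stream.of(1, 2)
                .map(i -> CompletableFuture.runAsync(() -> { }))
                .collect(allComplete());
        done.join(); // blocks until both tasks have finished
        System.out.println(done.isDone()); // prints true
    }
}
```

Use the first factory when the combined values are needed afterwards, and the second when only the completion signal matters.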

Top comments (1)

Evan Oman

Very cool!
