As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!
I remember the first time I hit the limits of Java’s Collectors.toList() and Collectors.groupingBy(). I was building a real‑time dashboard for a trading system. The standard collectors could not handle my custom statistics, my multi‑level grouping, or my thread‑safe accumulations. That day I learned to write my own collectors. And it changed the way I look at streams.
Let’s walk through five patterns for writing custom collectors that encapsulate aggregation logic into reusable, composable components. I will keep this as simple as I can, as if I am explaining it to my younger self.
A collector in Java is just a box with four jobs. First, it needs a factory that creates an empty container – that is the supplier. Then it needs a way to drop one element into that container – the accumulator. When streams run in parallel, it needs a method to merge two containers into one – the combiner. And finally, a finisher that transforms the container into the final result you want.
The Java Collector interface captures these four functions plus a set of characteristics that tell the stream how to behave. If you understand these pieces, you can build a collector for almost any aggregation.
I will show you a simple statistics collector that calculates count, sum, minimum, and maximum from a stream of numbers. This pattern is the foundation for everything else.
public class StatisticsCollector<T> implements Collector<T, StatisticsCollector.Accumulator, Statistics> {
static class Accumulator {
long count;
double sum;
double min = Double.MAX_VALUE;
double max = Double.MIN_VALUE;
}
@Override
public Supplier<Accumulator> supplier() {
return Accumulator::new;
}
@Override
public BiConsumer<Accumulator, T> accumulator() {
return (acc, value) -> {
double num = ((Number) value).doubleValue();
acc.count++;
acc.sum += num;
acc.min = Math.min(acc.min, num);
acc.max = Math.max(acc.max, num);
};
}
@Override
public BinaryOperator<Accumulator> combiner() {
return (left, right) -> {
left.count += right.count;
left.sum += right.sum;
left.min = Math.min(left.min, right.min);
left.max = Math.max(left.max, right.max);
return left;
};
}
@Override
public Function<Accumulator, Statistics> finisher() {
return acc -> new Statistics(acc.count, acc.sum, acc.min, acc.max);
}
@Override
public Set<Characteristics> characteristics() {
return Set.of(Characteristics.CONCURRENT, Characteristics.UNORDERED);
}
public static <T extends Number> Collector<T, ?, Statistics> toStatistics() {
return new StatisticsCollector<>();
}
}
Notice the CONCURRENT and UNORDERED characteristics. CONCURRENT tells the stream that the accumulator can run safely from multiple threads on the same container – but only if the stream is also unordered. UNORDERED means the order of elements does not matter for the result. Together they allow the stream to skip merging and use a single container for parallel processing. This can be a huge performance win.
But you must be careful. If your accumulator modifies shared state without proper synchronization, you will get corrupted results. In this example, the operations are simple additions and comparisons, which are safe because the accumulator runs on one container at a time (the supplier creates a new container per thread, but with CONCURRENT the supplier might be called only once – check the Java documentation). For truly concurrent collectors, I use atomic classes like LongAdder and AtomicReference. Let me show you that next.
Thread‑Safe Collectors for Parallel Streams
When I write collectors for high‑throughput systems, I use concurrent building blocks. The pattern stays the same, but the accumulator uses thread‑safe objects.
public class ConcurrentStatisticsCollector<T extends Number>
implements Collector<T, ConcurrentStatisticsCollector.ConcurrentAccumulator, Statistics> {
static class ConcurrentAccumulator {
final LongAdder count = new LongAdder();
final DoubleAdder sum = new DoubleAdder();
final AtomicReference<Double> min = new AtomicReference<>(Double.MAX_VALUE);
final AtomicReference<Double> max = new AtomicReference<>(Double.MIN_VALUE);
}
@Override
public Supplier<ConcurrentAccumulator> supplier() {
return ConcurrentAccumulator::new;
}
@Override
public BiConsumer<ConcurrentAccumulator, T> accumulator() {
return (acc, value) -> {
double num = value.doubleValue();
acc.count.increment();
acc.sum.add(num);
acc.min.updateAndGet(current -> Math.min(current, num));
acc.max.updateAndGet(current -> Math.max(current, num));
};
}
@Override
public BinaryOperator<ConcurrentAccumulator> combiner() {
return (left, right) -> {
left.count.add(right.count.sum());
left.sum.add(right.sum.sum());
left.min.updateAndGet(current -> Math.min(current, right.min.get()));
left.max.updateAndGet(current -> Math.max(current, right.max.get()));
return left;
};
}
@Override
public Function<ConcurrentAccumulator, Statistics> finisher() {
return acc -> new Statistics(
acc.count.sum(),
acc.sum.sum(),
acc.min.get(),
acc.max.get()
);
}
@Override
public Set<Characteristics> characteristics() {
return Set.of(Characteristics.CONCURRENT, Characteristics.UNORDERED);
}
}
The combiner still merges two accumulators, but it does so by adding the values from both. This is safe because each accumulator was built in its own thread. The CONCURRENT characteristic allows the stream to avoid merging if it knows only one thread will ever call the accumulator – but with atomic fields you are prepared for either case.
I use this pattern when I process millions of events per second. The overhead of atomic operations is small compared to the cost of merging intermediate results.
Multi‑Level Grouping
Grouping by one key is easy. Grouping by two keys and keeping a list of values inside each leaf – that is a custom collector. I needed this when I had to organise sales data by region and then by product category, while preserving the order of individual transactions.
Here is a cascading grouping collector that builds a nested map.
public class CascadingGroupingCollector<T, K1, K2, V>
implements Collector<T, Map<K1, Map<K2, List<V>>>, Map<K1, Map<K2, List<V>>>> {
private final Function<? super T, ? extends K1> classifier1;
private final Function<? super T, ? extends K2> classifier2;
public CascadingGroupingCollector(
Function<? super T, ? extends K1> classifier1,
Function<? super T, ? extends K2> classifier2) {
this.classifier1 = classifier1;
this.classifier2 = classifier2;
}
@Override
public Supplier<Map<K1, Map<K2, List<V>>>> supplier() {
return HashMap::new;
}
@Override
public BiConsumer<Map<K1, Map<K2, List<V>>>, T> accumulator() {
return (map, item) -> {
K1 key1 = classifier1.apply(item);
K2 key2 = classifier2.apply(item);
map.computeIfAbsent(key1, k -> new HashMap<>())
.computeIfAbsent(key2, k -> new ArrayList<>())
.add((V) item);
};
}
@Override
public BinaryOperator<Map<K1, Map<K2, List<V>>>> combiner() {
return (left, right) -> {
right.forEach((key1, innerMap) -> {
left.merge(key1, innerMap, (existing, incoming) -> {
incoming.forEach((key2, list) ->
existing.merge(key2, list, (l1, l2) -> {
l1.addAll(l2);
return l1;
}));
return existing;
});
});
return left;
};
}
@Override
public Function<Map<K1, Map<K2, List<V>>>, Map<K1, Map<K2, List<V>>>> finisher() {
return Function.identity();
}
@Override
public Set<Characteristics> characteristics() {
return Set.of(Characteristics.UNORDERED);
}
}
Notice that the finisher returns the map as‑is. That is because the intermediate container is also the final result. This works only if the characteristics do not include IDENTITY_FINISH. If you mark a collector as IDENTITY_FINISH, the stream can skip calling the finisher entirely. Here I deliberately left it out to avoid confusion. In real code, you could add IDENTITY_FINISH and remove the finisher override.
The combiner merges nested maps recursively. It uses merge to handle both levels of keys. This pattern scales to three or more levels – just add another layer of maps and another merge loop.
Top‑N Collector
Sometimes you do not need all the elements – only the ten largest, or the five most recent. The standard toList() gives you everything. A custom top‑N collector keeps only the best items using a bounded TreeSet.
public class TopNCollector<T> implements Collector<T, TreeSet<T>, List<T>> {
private final Comparator<? super T> comparator;
private final int n;
public TopNCollector(Comparator<? super T> comparator, int n) {
this.comparator = comparator;
this.n = n;
}
@Override
public Supplier<TreeSet<T>> supplier() {
return () -> new TreeSet<>(comparator);
}
@Override
public BiConsumer<TreeSet<T>, T> accumulator() {
return (set, item) -> {
set.add(item);
if (set.size() > n) {
set.pollLast();
}
};
}
@Override
public BinaryOperator<TreeSet<T>> combiner() {
return (left, right) -> {
for (T item : right) {
left.add(item);
if (left.size() > n) {
left.pollLast();
}
}
return left;
};
}
@Override
public Function<TreeSet<T>, List<T>> finisher() {
return set -> {
List<T> list = new ArrayList<>(set);
Collections.reverse(list);
return list;
};
}
@Override
public Set<Characteristics> characteristics() {
return Set.of(Characteristics.UNORDERED);
}
public static <T extends Comparable<? super T>> Collector<T, ?, List<T>> topN(int n) {
return new TopNCollector<>(Comparator.naturalOrder(), n);
}
public static <T> Collector<T, ?, List<T>> topN(Comparator<? super T> comparator, int n) {
return new TopNCollector<>(comparator, n);
}
}
The magic happens in the accumulator. It adds the element, then immediately removes the worst one if the set exceeds the limit. This keeps memory constant regardless of stream size. The combiner does the same when merging two sets from parallel threads.
I used this collector when I needed to show the top‑10 trending products from a live feed. Instead of storing every product, I kept only the ten with the highest score. The stream never had to hold more than ten plus a few duplicates.
The finisher converts the TreeSet into a list in descending order (because I reversed it). If you want ascending order, skip the reverse.
Composing Collectors with Teeing
Java 12 introduced Collectors.teeing to send stream elements to two collectors and merge their results. But its capacity is limited to two. If you need three or four aggregations from a single pass, you can write your own teeing collector that handles any number of branches.
Here is a simplified version that works for two downstream collectors. You can extend it to more by using a list of collectors and a merger that receives a list of results.
public class TeeingCollector<T, R1, R2, R> implements Collector<T, TeeingCollector.Accumulator<R1, R2>, R> {
static class Accumulator<R1, R2> {
R1 leftResult;
R2 rightResult;
}
private final Collector<T, ?, R1> left;
private final Collector<T, ?, R2> right;
private final BiFunction<? super R1, ? super R2, R> merger;
public TeeingCollector(
Collector<T, ?, R1> left,
Collector<T, ?, R2> right,
BiFunction<? super R1, ? super R2, R> merger) {
this.left = left;
this.right = right;
this.merger = merger;
}
@Override
@SuppressWarnings("unchecked")
public Supplier<Accumulator<R1, R2>> supplier() {
return () -> {
Accumulator<R1, R2> acc = new Accumulator<>();
acc.leftResult = ((Supplier<R1>) left.supplier()).get();
acc.rightResult = ((Supplier<R2>) right.supplier()).get();
return acc;
};
}
// ... full implementation would include accumulator, combiner, finisher
}
The code above is a skeleton. A complete implementation would store the intermediate containers from each downstream collector. The accumulator would apply both downstream accumulators to each element. The combiner would merge both intermediate containers separately. The finisher would apply the downstream finishers and then the merger.
I built a three‑branch teeing collector for a report that needed counts, sums, and averages from the same stream. Instead of iterating three times, I wrote one collector that ran all three in parallel.
Personal Touches and Practical Advice
When I started writing custom collectors, I made two mistakes. First, I forgot to set the characteristics correctly. If your collector does not have UNORDERED but your stream is ordered, the stream may create more intermediate containers than necessary. Second, I wrote accumulators that were not thread‑safe for parallel streams, leading to rare bugs that only appeared under heavy load.
Test your collectors with both sequential and parallel streams. Use parallelStream() and verify that the result matches. I usually create a small test dataset, then compare the output of a known correct method with the custom collector.
Do not be afraid to inline small collectors with Collector.of(). For quick one‑offs, the factory method is easier than writing a full class. But for reusable logic, writing a dedicated class makes it testable and readable.
Collector<Transaction, ?, Map<Integer, List<Transaction>>> byAmountBucket =
Collector.of(
HashMap::new,
(map, tx) -> map.computeIfAbsent(tx.amount() / 1000, k -> new ArrayList<>()).add(tx),
(left, right) -> {
right.forEach((k, v) -> left.merge(k, v, (l1, l2) -> { l1.addAll(l2); return l1; }));
return left;
}
);
This inline collector groups transactions into buckets of thousands. No new class needed.
Summary of the Five Patterns
- Statistics Collector – Accumulates multiple numeric metrics in one pass.
- Concurrent Collector – Uses atomic classes for safe parallel accumulation.
- Cascading Grouping Collector – Builds nested maps with multiple keys.
- Top‑N Collector – Keeps only a bounded number of elements using a sorted set.
- Teeing Collector – Feeds the stream to multiple downstream collectors and merges.
Each pattern follows the same contract: supplier, accumulator, combiner, finisher. Once you internalise that, you can design collectors for any aggregation problem.
Custom collectors have saved me countless hours of manual iteration and temporary collections. They turn messy loops into clean, declarative pipeline code. And because they implement the Collector interface, they work automatically with parallel streams, giving you free performance gains when your data grows.
The next time you find yourself writing a for loop that reduces a stream into a complex structure, stop and ask: can I package this logic into a collector? If the answer is yes, you will end up with code that is easier to read, easier to test, and easier to reuse. I promise you, it is worth the effort.
📘 Checkout my latest ebook for free on my channel!
Be sure to like, share, comment, and subscribe to the channel!
101 Books
101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as $4—making quality knowledge accessible to everyone.
Check out our book Golang Clean Code available on Amazon.
Stay tuned for updates and exciting news. When shopping for books, search for Aarav Joshi to find more of our titles. Use the provided link to enjoy special discounts!
Our Creations
Be sure to check out our creations:
Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | Java Elite Dev | Golang Elite Dev | Python Elite Dev | JS Elite Dev | JS Schools
We are on Medium
Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva
Top comments (0)