What is a Gatherer?
A Gatherer is like map or filter, but far more flexible:
- You can emit zero, one, or many outputs per input.
- You can keep mutable state while processing.
- You can stop early when needed.
- You can even define how it should work in parallel streams.
Think of it as a pluggable custom operator for streams.
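To make the list above concrete, here is a minimal sketch of a custom gatherer that emits zero, one, or many outputs per input and stops early when asked to. The class name EmitManySketch and the helper repeatByValue() are made up for illustration; they are not part of the JDK. It assumes Java 24 or later, where the Gatherer API is final.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class EmitManySketch {

    // Hypothetical gatherer: emits each number n exactly n times,
    // so 0 yields nothing, 1 yields one output, and 3 yields three.
    static Gatherer<Integer, ?, Integer> repeatByValue() {
        return Gatherer.<Integer, Integer>ofSequential((unused, n, downstream) -> {
            for (int i = 0; i < n; i++) {
                if (!downstream.push(n)) {
                    return false; // downstream wants no more elements: stop early
                }
            }
            return true; // keep consuming input
        });
    }

    static List<Integer> demo() {
        return Stream.of(0, 1, 3).gather(repeatByValue()).toList();
    }

    static List<Integer> demoLimited() {
        // limit(2) makes downstream reject further pushes after two elements
        return Stream.of(0, 1, 3).gather(repeatByValue()).limit(2).toList();
    }

    public static void main(String[] args) {
        System.out.println(demo());        // [1, 3, 3, 3]
        System.out.println(demoLimited()); // [1, 3]
    }
}
```

Don't worry about the details yet: the second post walks through how such a gatherer is put together.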
Built-in Gatherers
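Since Java 24 (preview in Java 22 and 23), the JDK ships a few ready-made gatherers in java.util.stream.Gatherers:
- Gatherers.windowFixed(n) – split the stream into consecutive lists of n elements (the last list may be shorter).
- Gatherers.windowSliding(n) – sliding window of n elements.
- Gatherers.scan(initial, scanner) – running accumulation such as prefix sums; initial is a Supplier of the starting value.
- Gatherers.fold(initial, folder) – accumulate the whole stream into a single result, emitted at the end.
- Gatherers.mapConcurrent(maxConcurrency, mapper) – concurrent mapping on virtual threads with a concurrency limit; encounter order is preserved.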
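The examples in this post cover windowSliding, scan, and mapConcurrent, so here is a short sketch of the remaining two, windowFixed and fold. The class and method names (BuiltInGatherersSketch, chunks, joined) are placeholders chosen for this example.

```java
import java.util.List;
import java.util.stream.Gatherers;

public class BuiltInGatherersSketch {

    // Splits the input into consecutive lists of at most `size` elements.
    static List<List<Integer>> chunks(List<Integer> input, int size) {
        return input.stream()
                .gather(Gatherers.windowFixed(size))
                .toList();
    }

    // Folds all elements into one string; the single result is emitted
    // only after the last input element has been processed.
    static List<String> joined(List<Integer> input) {
        return input.stream()
                .gather(Gatherers.fold(() -> "", (acc, n) -> acc + n))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(chunks(List.of(1, 2, 3, 4, 5), 2)); // [[1, 2], [3, 4], [5]]
        System.out.println(joined(List.of(1, 2, 3)));          // [123]
    }
}
```

Note that fold is still an intermediate operation: it yields a stream with exactly one element, which you can keep processing.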
Example 1: Sliding Window
import java.util.List;
import java.util.stream.Gatherers;

public class SlidingWindowExample {
    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3, 4, 5);
        data.stream()
            .gather(Gatherers.windowSliding(3))
            .forEach(System.out::println);
        // Output:
        // [1, 2, 3]
        // [2, 3, 4]
        // [3, 4, 5]
    }
}
Example 2: Prefix Sums
import java.util.List;
import java.util.stream.Gatherers;

public class PrefixSumExample {
    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4);
        numbers.stream()
            // scan takes a Supplier for the initial value, not the value itself
            .gather(Gatherers.scan(() -> 0, Integer::sum))
            .forEach(System.out::println);
        // Output: 1, 3, 6, 10 (the initial value 0 is not emitted)
    }
}
Example 3: Concurrent Mapping
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

public class ConcurrentMapExample {
    public static void main(String[] args) {
        var data = IntStream.range(1, 10).boxed(); // 1..9
        // Up to 4 mapper calls run at once, each on its own virtual thread.
        data.gather(Gatherers.mapConcurrent(4, x -> {
            System.out.println("Processing " + x + " in " + Thread.currentThread());
            return x * x;
        })).forEach(System.out::println);
        // The squares come out in encounter order; the "Processing" lines may interleave.
    }
}
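One detail worth seeing in action: mapConcurrent preserves the stream's encounter order even when later elements finish first. The sketch below simulates that with a made-up slowSquare helper whose sleep times are arbitrary; the class name is also just for illustration.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class OrderedConcurrentMapSketch {

    // Simulates a slow call; smaller inputs sleep longer, so they finish last.
    static int slowSquare(int x) {
        try {
            Thread.sleep(50L * (5 - x));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return x * x;
    }

    static List<Integer> squares() {
        return Stream.of(1, 2, 3, 4)
                .gather(Gatherers.mapConcurrent(4, OrderedConcurrentMapSketch::slowSquare))
                .toList();
    }

    public static void main(String[] args) {
        // Results follow input order, not completion order.
        System.out.println(squares()); // [1, 4, 9, 16]
    }
}
```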
When to Use Gatherers
- When built-in stream operations aren’t enough.
- For sliding windows, distinct-by, custom filters, or cumulative operations.
- When you want better performance by fusing multiple steps into one.
✅ Takeaway: Gatherers are like custom operators for streams. Start with built-in ones, and once you’re comfortable, move on to writing your own.
📖 Blog 2: Mastering Stream Gatherers in Java (Advanced Guide)
Introduction
In the beginner post, we explored built-in gatherers like windowSliding and scan.
Now let’s dive deeper: how do we build custom gatherers?
This is where Gatherers become truly powerful — you can define your own intermediate operations with state, short-circuiting, and parallel behavior.
Anatomy of a Gatherer
A Gatherer<T, A, R> has three type parameters:
- T: input type
- A: mutable state type
- R: output type
It is built from up to four functions; only the integrator is mandatory, the others have defaults:
- initializer() – create state.
- integrator() – process each element, possibly emitting output(s).
- combiner() – merge states in parallel streams.
- finisher() – finalize processing, emit remaining results.
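Here is a minimal sketch that supplies all four functions through Gatherer.of, using a deliberately simple job: summing the input and emitting the total once at the end. The names FourPartGathererSketch, sum(), and total() are invented for this example. Because a combiner is provided, the same gatherer can also be used on a parallel stream.

```java
import java.util.List;
import java.util.stream.Gatherer;

public class FourPartGathererSketch {

    // Hypothetical gatherer that sums its input and emits the total once, at the end.
    static Gatherer<Integer, long[], Long> sum() {
        return Gatherer.<Integer, long[], Long>of(
                () -> new long[1],              // initializer: fresh state (one per parallel segment)
                (state, n, downstream) -> {     // integrator: fold one element into the state
                    state[0] += n;
                    return true;                // nothing emitted yet, keep going
                },
                (left, right) -> {              // combiner: merge two partial states
                    left[0] += right[0];
                    return left;
                },
                (state, downstream) -> downstream.push(state[0]) // finisher: emit the total
        );
    }

    static List<Long> total(List<Integer> input, boolean parallel) {
        var stream = parallel ? input.parallelStream() : input.stream();
        return stream.gather(sum()).toList();
    }

    public static void main(String[] args) {
        System.out.println(total(List.of(1, 2, 3, 4), false)); // [10]
        System.out.println(total(List.of(1, 2, 3, 4), true));  // [10]
    }
}
```

A combiner like this only makes sense when partial states can be merged without caring about what has already been emitted, which is why the sketch emits nothing until the finisher runs.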
Example 1: Distinct by Key
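Custom distinctBy operation (like SQL DISTINCT ON), here keeping the first word of each length:
import java.util.*;
import java.util.stream.*;

public class DistinctByKey {
    public static void main(String[] args) {
        List<String> words = List.of("apple", "pear", "banana", "berry", "plum");
        // "First seen wins" depends on encounter order, so the gatherer is built
        // with Gatherer.ofSequential (Gatherer.of has no three-argument overload).
        Stream<String> distinct = words.stream().gather(
            Gatherer.<String, Set<Integer>, String>ofSequential(
                HashSet::new, // state: lengths seen so far
                (seen, str, downstream) -> {
                    if (seen.add(str.length())) {
                        return downstream.push(str); // false once downstream wants no more
                    }
                    return true; // continue
                }
            )
        );
        distinct.forEach(System.out::println);
        // Output: apple, pear, banana
        // ("berry" and "plum" repeat the lengths 5 and 4)
    }
}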
Example 2: Short-Circuiting
Stop processing once we see a number ≥ 100.
import java.util.stream.*;

public class StopOnCondition {
    public static void main(String[] args) {
        Stream.of(5, 20, 80, 101, 200)
            .gather(Gatherer.<Integer, Integer>ofSequential(
                // stateless gatherer: the first lambda parameter is unused
                (unused, num, downstream) -> {
                    // emit every element, including the one that reaches the limit
                    boolean wantsMore = downstream.push(num);
                    return wantsMore && num < 100; // returning false stops the stream
                }
            ))
            .forEach(System.out::println);
        // Output: 5, 20, 80, 101
    }
}
Example 3: Stateful Transformation
Emit running average over a stream of numbers.
import java.util.stream.*;

public class RunningAverage {
    public static void main(String[] args) {
        Stream<Integer> numbers = Stream.of(10, 20, 30, 40);
        // A running average depends on encounter order, hence ofSequential.
        numbers.gather(Gatherer.<Integer, int[], Double>ofSequential(
            () -> new int[2], // state: [sum, count]
            (state, n, downstream) -> {
                state[0] += n;
                state[1]++;
                double avg = (double) state[0] / state[1];
                return downstream.push(avg); // stop if downstream wants no more
            }
        )).forEach(System.out::println);
        // Output: 10.0, 15.0, 20.0, 25.0
    }
}
Best Practices
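- Keep state minimal: avoid large memory footprints.
- Supply a combiner (through Gatherer.of) if you want parallel stream support; gatherers created with Gatherer.ofSequential are always evaluated sequentially.
- Return false in integrator for early termination, and pass on the result of downstream.push(...) so the gatherer stops once downstream wants no more elements.
- Chain gatherers for complex pipelines (e.g. windowing → filtering → stopping).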
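As a rough illustration of the last point, the sketch below chains a built-in windowing gatherer, an ordinary filter, and a small custom stopping gatherer. The helper stopAfterExceeding and the class name are hypothetical, and the even-sums filter is just an arbitrary condition for the demo.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;

public class ChainedGatherersSketch {

    // Hypothetical helper: passes elements through until one exceeds `limit`,
    // emits that element too, then stops the stream.
    static Gatherer<Integer, ?, Integer> stopAfterExceeding(int limit) {
        return Gatherer.<Integer, Integer>ofSequential(
                (unused, n, downstream) -> downstream.push(n) && n <= limit);
    }

    static List<Integer> evenMovingSumsUntil(List<Integer> input, int limit) {
        return input.stream()
                .gather(Gatherers.windowSliding(3))                      // windowing
                .map(w -> w.stream().mapToInt(Integer::intValue).sum())  // sum of each window
                .filter(sum -> sum % 2 == 0)                             // filtering
                .gather(stopAfterExceeding(limit))                       // stopping
                .toList();
    }

    public static void main(String[] args) {
        // Window sums are 6, 9, 12, 15, 18, 21; the even ones are 6, 12, 18;
        // 12 is the first to exceed 10, so the stream stops there.
        System.out.println(evenMovingSumsUntil(List.of(1, 2, 3, 4, 5, 6, 7, 8), 10)); // [6, 12]
    }
}
```

Two gatherers can also be composed into a single one with Gatherer.andThen, which is handy when you want to hand the combination around as one reusable operator.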
When to Use Custom Gatherers
- Analytics: rolling averages, moving sums.
- Event processing: stop on condition, detect sequences.
- Custom grouping: distinct by key, bucketization.
- Performance: fuse multiple passes into one.
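For the "detect sequences" and "bucketization" cases, a finisher is usually needed to flush whatever is still buffered when the input ends. The following is one possible sketch that groups adjacent equal elements into runs; ConsecutiveRunsSketch and runs() are illustrative names, not an existing API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class ConsecutiveRunsSketch {

    // Groups adjacent equal elements into lists, e.g. a, a, b -> [a, a], [b].
    static <T> Gatherer<T, ?, List<T>> runs() {
        return Gatherer.<T, List<T>, List<T>>ofSequential(
                ArrayList::new, // state: the run currently being collected
                (run, element, downstream) -> {
                    if (!run.isEmpty() && !Objects.equals(run.get(0), element)) {
                        List<T> finished = new ArrayList<>(run);
                        run.clear();
                        run.add(element);
                        return downstream.push(finished); // emit the completed run
                    }
                    run.add(element);
                    return true;
                },
                (run, downstream) -> {
                    if (!run.isEmpty()) {
                        List<T> last = new ArrayList<>(run);
                        downstream.push(last); // flush the final run at end of input
                    }
                });
    }

    static List<List<String>> demo() {
        return Stream.of("a", "a", "b", "c", "c")
                .gather(ConsecutiveRunsSketch.<String>runs())
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[a, a], [b], [c, c]]
    }
}
```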
Summary
- Gatherers make the Stream API extensible.
- Start with built-in gatherers (scan, windowSliding).
- Build your own for custom, stateful, or short-circuit operations.
- Think of Gatherers as the “power-user mode” of streams.
With Gatherers, your stream pipelines can evolve from basic data flows into rich, expressive, and efficient data transformations.