Java’s Stream API has been very powerful since Java 8, with methods like `map`, `filter`, `flatMap`, `limit`, etc. But many scenarios call for custom intermediate operations that are more complex than what’s easily expressed with the existing ones. To fill that gap, Java introduced Stream Gatherers, previewed in JDK 22 and 23 (JEP 461, JEP 473) and finalized in JDK 24 (JEP 485). (OpenJDK)
What are Gatherers?
- A Gatherer is an intermediate operation: it transforms a stream of input elements into a stream of output elements. Importantly, it gives you more control than `map`, `filter`, or `flatMap`: you can do one-to-one, one-to-many, many-to-one, or many-to-many transformations. (OpenJDK)
- It also allows you to maintain internal mutable state during the transformation, e.g. tracking previously seen elements, counting items, accumulating, etc. (dev.java)
- Another key feature is short-circuiting (you can stop processing early), plus parallelization support if you provide a combiner. (OpenJDK)
The API: Components of a Gatherer
A `Gatherer<T, A, R>` has three type parameters:
- `T`: the type of input elements
- `A`: the (private) mutable state type used during processing
- `R`: the type of output elements from the gatherer operation (Oracle Documentation)
And four main functions/methods you may need to provide:

Function | Purpose |
---|---|
`initializer()` | Supplier that creates the initial private state (`A`). Called once per evaluation (or once per subdivision in parallel). (dev.java) |
`integrator()` | Where incoming elements (`T`) are processed. It receives the state `A`, the current input element, and a `Downstream<R>` to which it can push output elements. The integrator returns a boolean: `true` to continue processing, `false` to stop (short-circuit). (Oracle Documentation) |
`combiner()` | Used when the stream is parallel: combines two state objects (`A`) into one. Only needed if you want parallel evaluation. (OpenJDK) |
`finisher()` | Called after all input elements have been processed. Given the final state and the downstream, it can emit additional output if needed (e.g. produce a summary, flush buffers). Optional in some cases. (dev.java) |
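To make the four pieces concrete, here is a minimal sketch of a many-to-one gatherer that counts its elements and emits the total from the finisher. The class and method names (`CountGatherer`, `counting`) are made up for this illustration; the `Gatherer.of(initializer, integrator, combiner, finisher)` factory is the real JDK API.

```java
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class CountGatherer {
    // Wires up all four functions: initializer, integrator, combiner, finisher.
    static Gatherer<Object, long[], Long> counting() {
        return Gatherer.of(
            () -> new long[1],                 // initializer: fresh mutable state
            (state, element, downstream) -> {  // integrator: called once per element
                state[0]++;
                return true;                   // keep consuming input
            },
            (left, right) -> {                 // combiner: merge states from parallel chunks
                left[0] += right[0];
                return left;
            },
            (state, downstream) ->             // finisher: emit the single result
                downstream.push(state[0])
        );
    }

    public static void main(String[] args) {
        long n = Stream.of("a", "b", "c").gather(counting()).findFirst().orElseThrow();
        System.out.println(n); // prints 3
    }
}
```

Because a combiner is supplied, this gatherer can also evaluate in parallel; the per-chunk counts are merged before the finisher runs.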
Additionally, the `Gatherers` class provides static factory methods for common gatherers: `Gatherers.fold(...)`, `Gatherers.scan(...)`, `windowFixed(...)`, `windowSliding(...)`, `mapConcurrent(...)`. (OpenJDK)
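As a quick taste of two of these factories, the sketch below uses `scan` for prefix sums and `fold` for a many-to-one concatenation:

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class BuiltInFactories {
    public static void main(String[] args) {
        // scan: emits the running accumulation after every input element
        List<Integer> prefixSums = Stream.of(1, 2, 3, 4)
            .gather(Gatherers.scan(() -> 0, Integer::sum))
            .toList();
        System.out.println(prefixSums); // [1, 3, 6, 10]

        // fold: accumulates everything, then emits a single result downstream
        List<String> folded = Stream.of("a", "b", "c")
            .gather(Gatherers.fold(() -> "", String::concat))
            .toList();
        System.out.println(folded); // [abc]
    }
}
```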
When to Use Gatherers
Use a Gatherer when:
- You need an intermediate operation not provided by default, e.g.:
  - Custom windows (fixed or sliding)
  - Distinct by some custom key
  - Short-circuiting based on conditions not covered by built-in methods
  - Incremental operations (prefix sums, cumulative transformations)
  - Combining mapping, filtering, and buffering/accumulation logic in one step for performance or readability
- You want better performance by fusing multiple operations into one gatherer rather than chaining many stream ops.
- You want to use parallel streams and need custom combining logic.
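For instance, the "fuse mapping and filtering" point might look like the following sketch, which keeps strings longer than 3 characters and emits their lengths in a single pass (the class and helper names are made up for the example):

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class FusedMapFilter {
    static Gatherer<String, Void, Integer> longLengths() {
        return Gatherer.of((Void state, String s, Gatherer.Downstream<? super Integer> down) -> {
            if (s.length() > 3) {
                return down.push(s.length()); // filter + map in one step
            }
            return true; // drop short strings, keep processing
        });
    }

    public static void main(String[] args) {
        List<Integer> result = Stream.of("one", "three", "go", "seven")
            .gather(longLengths())
            .toList();
        System.out.println(result); // [5, 5]
    }
}
```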
Example Code Snippets
Below are some examples showing how to define and use Gatherers. (Note: on JDK 22 and 23 this was a preview feature, so you had to compile/run with --enable-preview; since JDK 24 it is final and no flag is needed.) (Oracle Documentation)
1. Basic One-to-One: A custom map
Suppose you want to map strings to uppercase, but using gatherer just for demonstration.
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class GathererExamples {
    public static void main(String[] args) {
        Stream<String> s = Stream.of("one", "two", "three");
        Stream<String> upper = s.gather(
            // Stateless integrator: the state type is Void, so the first argument is unused
            Gatherer.of((Void state, String str, Gatherer.Downstream<? super String> downstream) ->
                downstream.push(str.toUpperCase()))
        );
        upper.forEach(System.out::println);
    }
}
Here:
- We don’t need an `initializer`, since no mutable state is required (stateless).
- We used the `Gatherer.of(...)` overload that takes only an integrator. Note that an integrator always receives three arguments (state, element, downstream); with a `Void` state the first argument is simply ignored. Default combiner/finisher behavior applies. (dev.java)
2. Filtering
Filter strings that have length > 3:
Stream<String> s = Stream.of("one", "four", "six", "twelve");
Stream<String> filtered = s.gather(
    Gatherer.of((Void state, String str, Gatherer.Downstream<? super String> downstream) -> {
        if (str.length() > 3) {
            return downstream.push(str);
        } else {
            // drop it, keep processing
            return true;
        }
    })
);
filtered.forEach(System.out::println);
// Output: "four", "twelve"
3. Distinct by a key
E.g. distinct by string length:
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

public class DistinctByLengthGatherer implements Gatherer<String, Set<Integer>, String> {
    private final Function<String, Integer> keyExtractor;

    public DistinctByLengthGatherer(Function<String, Integer> keyExtractor) {
        this.keyExtractor = keyExtractor;
    }

    @Override
    public Supplier<Set<Integer>> initializer() {
        return HashSet::new;
    }

    @Override
    public Gatherer.Integrator<Set<Integer>, String, String> integrator() {
        return (seen, str, downstream) -> {
            Integer key = keyExtractor.apply(str);
            if (seen.add(key)) {
                return downstream.push(str);
            } else {
                return true; // continue, but don't emit the duplicate
            }
        };
    }

    @Override
    public BinaryOperator<Set<Integer>> combiner() {
        // Caution: states are merged only after each chunk has already emitted elements,
        // so in parallel runs duplicates from different chunks can slip through.
        // For strict distinct semantics, make this gatherer sequential instead.
        return (left, right) -> {
            left.addAll(right);
            return left;
        };
    }

    @Override
    public BiConsumer<Set<Integer>, Gatherer.Downstream<? super String>> finisher() {
        return (state, downstream) -> {
            // nothing extra to emit
        };
    }
}
// Usage:
List<String> list = List.of("apple", "pear", "banana", "berry", "kiwi", "plum");
Stream<String> distinctByLen = list.stream()
.gather(new DistinctByLengthGatherer(String::length));
distinctByLen.forEach(System.out::println);
// Output: apple, pear, banana (only the first string of each length is kept)
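If you don’t need parallelism, the same idea can be written much more compactly with the `Gatherer.ofSequential(initializer, integrator)` factory. The generic `distinctBy` helper below is a hypothetical convenience written for this article, not part of the JDK:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Gatherer;

public class DistinctBy {
    // Sequential only, so no combiner is needed
    static <T, K> Gatherer<T, ?, T> distinctBy(Function<? super T, ? extends K> key) {
        return Gatherer.ofSequential(
            HashSet::new,
            (Set<K> seen, T element, Gatherer.Downstream<? super T> down) ->
                // duplicates return true (keep going, emit nothing); new keys are pushed
                !seen.add(key.apply(element)) || down.push(element));
    }

    public static void main(String[] args) {
        List<String> out = List.of("apple", "pear", "banana", "berry", "kiwi")
            .stream()
            .gather(distinctBy(String::length))
            .toList();
        System.out.println(out); // [apple, pear, banana]
    }
}
```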
4. Sliding Window
Suppose you want windows of 3 elements, sliding by 1:
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;
List<Integer> data = List.of(1,2,3,4,5);
Stream<List<Integer>> windows = data.stream()
.gather(Gatherers.windowSliding(3));
windows.forEach(w -> System.out.println(w));
// Output:
// [1,2,3]
// [2,3,4]
// [3,4,5]
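For comparison, `Gatherers.windowFixed` produces non-overlapping windows, and the final window may be shorter when the element count is not a multiple of the window size:

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class FixedWindows {
    public static void main(String[] args) {
        List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5)
            .gather(Gatherers.windowFixed(2)) // non-overlapping windows of 2
            .toList();
        System.out.println(windows); // [[1, 2], [3, 4], [5]]
    }
}
```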
5. Short-Circuit / Terminate early
Here’s an example of finding the first integer ≥ some limit, stopping further processing:
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

record BiggestInt(int limit) implements Gatherer<Integer, List<Integer>, Integer> {
    @Override
    public Supplier<List<Integer>> initializer() {
        return () -> new ArrayList<>(1);
    }

    @Override
    public Gatherer.Integrator<List<Integer>, Integer, Integer> integrator() {
        return (maxList, element, downstream) -> {
            if (maxList.isEmpty() || element > maxList.get(0)) {
                maxList.clear();
                maxList.add(element);
            }
            if (element >= limit) {
                downstream.push(element);
                maxList.clear(); // so the finisher doesn't emit it a second time
                return false;    // stop further processing
            }
            return true; // continue
        };
    }

    @Override
    public BinaryOperator<List<Integer>> combiner() {
        return (left, right) -> {
            if (left.isEmpty()) return right;
            if (right.isEmpty()) return left;
            return left.get(0) >= right.get(0) ? left : right;
        };
    }

    @Override
    public BiConsumer<List<Integer>, Gatherer.Downstream<? super Integer>> finisher() {
        return (state, downstream) -> {
            // If the limit was never reached, emit the maximum seen so far
            if (!state.isEmpty()) {
                downstream.push(state.get(0));
            }
        };
    }
}
// Usage:
Stream<Integer> st = Stream.of(5, 4, 2, 1, 6, 12, 8, 9);
Optional<Integer> result = st.gather(new BiggestInt(11)).findFirst();
System.out.println(result.orElseThrow()); // prints 12
Built-in Gatherers
Java provides several built-in gatherers via the `Gatherers` class. Some of the common ones:

Gatherer | Description / Use-case |
---|---|
`fold(...)` | Many-to-one accumulation: combine all elements into one result. Like `reduce`, but as an intermediate operation. (OpenJDK) |
`scan(...)` | Cumulative result: emit the intermediate accumulation at each input, e.g. prefix sums. (OpenJDK) |
`mapConcurrent(...)` | Maps each element concurrently (via virtual threads), bounded by `maxConcurrency`. Useful when the mapping is expensive. (dev.java) |
`windowFixed(size)` | Breaks the input stream into fixed-size windows (lists). (dev.java) |
`windowSliding(size)` | Sliding windows (overlapping windows that move by one element). (dev.java) |
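A minimal `mapConcurrent` sketch follows. The mapper here is trivial; in practice it would be a blocking call (an HTTP request, say), with up to `maxConcurrency` of them in flight on virtual threads while encounter order is preserved:

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class ConcurrentMapping {
    public static void main(String[] args) {
        List<String> results = Stream.of("a", "b", "c")
            .gather(Gatherers.mapConcurrent(4, String::toUpperCase)) // up to 4 concurrent mappers
            .toList();
        System.out.println(results); // [A, B, C]
    }
}
```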
Parallelism, Short-Circuit, and Performance Considerations
- If you supply a combiner, the gatherer can work in parallel. If not, even on a parallel stream the gatherer runs sequentially. (OpenJDK)
- Short-circuit: your integrator can return `false` to stop further processing. This is useful with infinite streams, or when you only need elements until some condition is met. (OpenJDK)
- Be careful with mutable state: since state is per sub-stream (in parallel), the state object itself doesn't need to be thread-safe, but the way you combine states must be correct.
- Watch out for memory usage, especially with buffering gatherers or finishers that hold many elements (windowing, sorting, etc.).
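To illustrate the short-circuiting point on an infinite stream, this sketch stops pulling elements as soon as it sees a value above 100:

```java
import java.util.Optional;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class ShortCircuitDemo {
    public static void main(String[] args) {
        // Infinite stream of powers of two; the gatherer cuts it off at the first value > 100
        Optional<Integer> first = Stream.iterate(1, n -> n * 2)
            .gather(Gatherer.of((Void state, Integer n, Gatherer.Downstream<? super Integer> down) -> {
                if (n > 100) {
                    down.push(n);
                    return false; // short-circuit: no more elements are pulled
                }
                return true;
            }))
            .findFirst();
        System.out.println(first.orElseThrow()); // prints 128
    }
}
```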
Sample Use Case: Processing Logs in Windows, Distinct & Early Stop
Let’s build a more real-world example that combines features:
Suppose you have a stream of log entries, say `Stream<LogEntry>`, where each entry has a timestamp, severity, and message. You want to:
- Process logs in sliding windows of 5 entries
- Drop duplicate messages (by content) across windows
- Stop processing once you see a critical error
Here’s a sketch:
import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

record LogEntry(LocalDateTime time, String severity, String message) {}

public class ProcessLogs {
    public static void main(String[] args) {
        Stream<LogEntry> logs = getLogStream(); // maybe infinite or real-time

        // 2. Drop duplicate messages; the "seen" set persists across windows.
        //    Sequential, so no combiner is needed.
        Gatherer<List<LogEntry>, Set<String>, List<LogEntry>> distinctByMessage =
            Gatherer.ofSequential(
                HashSet::new,
                (seen, window, downstream) -> {
                    List<LogEntry> filtered = window.stream()
                        .filter(le -> seen.add(le.message()))
                        .toList();
                    return downstream.push(filtered);
                });

        // 3. Emit only the window containing a critical entry, then stop
        Gatherer<List<LogEntry>, Void, List<LogEntry>> stopOnCritical =
            Gatherer.of((Void state, List<LogEntry> window,
                         Gatherer.Downstream<? super List<LogEntry>> downstream) -> {
                for (LogEntry le : window) {
                    if ("CRITICAL".equals(le.severity())) {
                        downstream.push(window);
                        return false; // stop further processing
                    }
                }
                return true;
            });

        // Chain them: 1. windowing, 2. dedupe, 3. early stop
        logs.gather(Gatherers.windowSliding(5))
            .gather(distinctByMessage)
            .gather(stopOnCritical)
            .forEach(window -> System.out.println("Window with critical: " + window));
    }

    static Stream<LogEntry> getLogStream() {
        // Stand-in source; a real application would read from a file or queue
        return Stream.of(
            new LogEntry(LocalDateTime.now(), "INFO", "started"),
            new LogEntry(LocalDateTime.now(), "INFO", "started"),
            new LogEntry(LocalDateTime.now(), "WARN", "slow response"),
            new LogEntry(LocalDateTime.now(), "CRITICAL", "out of memory"),
            new LogEntry(LocalDateTime.now(), "INFO", "recovered"));
    }
}
That shows chaining of gatherers, windowing, filtering, and short-circuiting.
Limitations and Things to Watch Out For
- Gatherers were a preview feature in JDK 22 and 23 (requiring --enable-preview); they are final as of JDK 24. (Oracle Documentation)
- Readability: very powerful, but custom gatherers can become complex, especially when mixing state, combiner, and finisher. For simple tasks, plain `map`/`filter`/`flatMap` is still more readable.
- Debugging: with more moving parts, errors in state management or combiner logic can lead to subtle bugs.
- Resource usage: buffer-based gatherers (e.g. windowing, sorting) may use extra memory.
- Parallel behavior: you need a correct combiner; if you omit it, the gatherer executes sequentially even in parallel streams, potentially losing performance benefits.
Status & Examples from the Wild
- The feature was introduced by JEP 461: Stream Gatherers (Preview), re-previewed in JEP 473, and finalized in JEP 485 (JDK 24). (OpenJDK)
- API: `java.util.stream.Gatherer`, `java.util.stream.Gatherers`, and the `Stream.gather(Gatherer)` method. (Oracle Documentation)
- Articles and tutorials exist (Baeldung, SoftwareMill, etc.).
Summary
Stream Gatherers are a major extension to the Stream API, aimed at giving you the flexibility to define custom intermediate operations that are:
- more expressive than `map`/`filter`/`flatMap`
- capable of maintaining state, short-circuiting, buffering/flushing, etc.
- parallelization-aware (if you supply a combiner)
- useful for operations like sliding windows, distinct by custom key, cumulative operations, early stopping, etc.
If your use case is simple, built-in stream operations are fine. But for complex data-processing pipelines, Gatherers can help you encapsulate and reuse logic cleanly.