DEV Community

nk sk


Stream Gatherers in Java: A Deep Dive

Java’s Stream API has been powerful since Java 8, with operations like map, filter, flatMap and limit. But some intermediate operations are hard or impossible to express by combining the existing ones. To fill that gap, Java introduced Stream Gatherers: a preview feature in JDK 22 and JDK 23, finalized in JDK 24. (OpenJDK)


What are Gatherers?

  • A Gatherer is an intermediate operation: it transforms a stream of input elements into a stream of output elements. Importantly, it gives you more control than map, filter, or flatMap. You can do one-to-one, one-to-many, many-to-one, or many-to-many transformations. (OpenJDK)
  • It also allows you to maintain internal mutable state during the transformation — e.g. tracking previously seen elements, counting items, accumulating, etc. (dev.java)
  • Gatherers can also short-circuit (stop processing early) and, if you provide a combiner, be evaluated in parallel. (OpenJDK)
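To make the transformation shapes and the private state concrete, here is a minimal sketch. GathererShapes and its methods are illustrative names, not JDK API, and the code assumes JDK 24+ (or JDK 22/23 with preview enabled). duplicateEach() is one-to-many; sumPairs() is many-to-one and keeps a pending element in its mutable state:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class GathererShapes {

    // One-to-many: every input element is pushed twice.
    static <T> Gatherer<T, ?, T> duplicateEach() {
        return Gatherer.of((unused, element, downstream) ->
            downstream.push(element) && downstream.push(element));
    }

    // Many-to-one: sums consecutive pairs; a trailing unpaired element is emitted as-is.
    // The state is a list holding at most one pending element.
    static Gatherer<Integer, List<Integer>, Integer> sumPairs() {
        return Gatherer.ofSequential(
            ArrayList::new,
            (pending, element, downstream) -> {
                if (pending.isEmpty()) {
                    pending.add(element);            // wait for the second half of the pair
                    return true;
                }
                int sum = pending.remove(0) + element;
                return downstream.push(sum);
            },
            (pending, downstream) -> {
                if (!pending.isEmpty()) {
                    downstream.push(pending.get(0)); // flush the leftover element
                }
            });
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(1, 2, 3).gather(GathererShapes.<Integer>duplicateEach()).toList());
        // [1, 1, 2, 2, 3, 3]
        System.out.println(Stream.of(1, 2, 3, 4, 5).gather(sumPairs()).toList());
        // [3, 7, 5]
    }
}
```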

The API: Components of a Gatherer

A Gatherer<T, A, R> has three type parameters:

  • T: type of input elements
  • A: mutable state type (private) used during processing
  • R: type of output elements from the gatherer operation (Oracle Documentation)

A gatherer is defined by four functions. Only integrator() is mandatory; the other three have defaults:

| Function | Purpose |
|---|---|
| initializer() | Supplier that creates the initial private state (A). Called once, or once per parallel segment. Stateless gatherers can keep the default. (dev.java) |
| integrator() | Processes incoming elements (T). It receives the state A, the current input element, and a Downstream<R> to which it can push output elements. It returns a boolean: true to continue processing, false to stop (short-circuit). (Oracle Documentation) |
| combiner() | Used when the stream is parallel: merges two state objects (A) into one. Only needed for parallel evaluation; without it the gatherer runs sequentially. (OpenJDK) |
| finisher() | Called after all input elements have been processed. Given the final state and the downstream, it can emit additional output (e.g. a summary or a flushed buffer). Optional; the default does nothing. (dev.java) |
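The following sketch implements all four functions explicitly for a hypothetical CountingGatherer that emits a single element: the number of inputs it saw. Because it supplies a combiner, it can be evaluated in parallel; each segment counts on its own, and the finisher runs once on the merged state. (Stream.count() does this job in practice; the point is only to show where each function plugs in.)

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Emits exactly one element: the number of input elements (illustrative only).
class CountingGatherer<T> implements Gatherer<T, long[], Long> {

    @Override
    public Supplier<long[]> initializer() {
        return () -> new long[1];                  // a fresh counter per (sub)stream
    }

    @Override
    public Gatherer.Integrator<long[], T, Long> integrator() {
        return (count, element, downstream) -> {
            count[0]++;                            // consume the element, emit nothing yet
            return true;                           // never short-circuits
        };
    }

    @Override
    public BinaryOperator<long[]> combiner() {
        return (left, right) -> {                 // merge the counts of two parallel segments
            left[0] += right[0];
            return left;
        };
    }

    @Override
    public BiConsumer<long[], Gatherer.Downstream<? super Long>> finisher() {
        return (count, downstream) -> downstream.push(count[0]);  // emit the total once
    }

    public static void main(String[] args) {
        List<Long> sequential = Stream.of("a", "b", "c")
                .gather(new CountingGatherer<String>()).toList();
        List<Long> parallel = IntStream.range(0, 1000).boxed().parallel()
                .gather(new CountingGatherer<Integer>()).toList();
        System.out.println(sequential + " " + parallel);   // [3] [1000]
    }
}
```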

In addition, the Gatherer interface offers static factory methods (Gatherer.of(...) and Gatherer.ofSequential(...)) for building gatherers from lambdas, and the Gatherers class provides ready-made ones: Gatherers.fold(...), Gatherers.scan(...), Gatherers.windowFixed(...), Gatherers.windowSliding(...) and Gatherers.mapConcurrent(...). (OpenJDK)
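Building a gatherer from lambdas with a static factory avoids implementing the interface. A minimal sketch, assuming a hypothetical runs() helper that groups consecutive equal elements: it uses Gatherer.ofSequential(initializer, integrator, finisher), and its finisher flushes the last buffered run. With no combiner, it always runs sequentially.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class Runs {

    // Groups consecutive equal elements: a, a, b, a -> [a, a], [b], [a].
    // List.copyOf requires non-null elements.
    static <T> Gatherer<T, ?, List<T>> runs() {
        Gatherer<T, List<T>, List<T>> gatherer = Gatherer.ofSequential(
            ArrayList::new,                                  // state: the current run
            (run, element, downstream) -> {
                if (!run.isEmpty() && !Objects.equals(run.get(0), element)) {
                    // element starts a new run: emit a copy of the finished one
                    boolean more = downstream.push(List.copyOf(run));
                    run.clear();
                    run.add(element);
                    return more;
                }
                run.add(element);
                return true;
            },
            (run, downstream) -> {
                if (!run.isEmpty()) {
                    downstream.push(List.copyOf(run));       // flush the last run
                }
            });
        return gatherer;
    }

    public static void main(String[] args) {
        System.out.println(Stream.of("a", "a", "b", "a", "c", "c")
                .gather(Runs.<String>runs()).toList());
        // [[a, a], [b], [a], [c, c]]
    }
}
```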


When to Use Gatherers

Use a Gatherer when:

  • You need an intermediate operation not provided by default, e.g.:

    • Custom windowing (fixed-size or sliding)
    • Distinct by some custom key
    • Short-circuiting based on conditions not covered by built-in methods
    • Incremental operations (prefix sums, cumulative transformations)
    • Combining mapping, filtering, buffer/accumulation logic in one step for performance/readability
  • You want better performance by fusing several operations into one instead of chaining many stream operations.

  • You use parallel streams and need custom logic for combining partial results.
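As an illustration of fusing mapping and filtering into one step, this sketch (parseValidInts() is a hypothetical helper) parses strings and drops the invalid ones in a single stateless gatherer, where a plain map/filter chain would have to validate and parse separately:

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class ParseInts {

    // Map + filter in one stateless step: parse each string, drop the ones that fail.
    static Gatherer<String, ?, Integer> parseValidInts() {
        return Gatherer.of((unused, s, downstream) -> {
            int value;
            try {
                value = Integer.parseInt(s.trim());
            } catch (NumberFormatException e) {
                return true;                       // not a number: skip it, keep going
            }
            return downstream.push(value);         // valid number: emit it
        });
    }

    public static void main(String[] args) {
        List<Integer> numbers = Stream.of("1", " 2", "x", "30", "")
                .gather(parseValidInts())
                .toList();
        System.out.println(numbers);               // [1, 2, 30]
    }
}
```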


Example Code Snippets

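Below are some examples showing how to define and use Gatherers. (Note: on JDK 24 and later, Gatherers are a standard API. On JDK 22 or 23 they are a preview feature, so you need to enable preview features when compiling and running, e.g. javac --release 23 --enable-preview and java --enable-preview.) (Oracle Documentation) Some snippets are fragments and assume the usual imports.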


1. Basic One-to-One: A custom map

Suppose you want to map strings to uppercase using a gatherer (map would do in practice; this is just for demonstration).

import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class GathererExamples {
    public static void main(String[] args) {
        Stream<String> s = Stream.of("one", "two", "three");

        Stream<String> upper = s.gather(
            // Stateless integrator: it takes (state, element, downstream);
            // the state parameter is unused and has type Void
            Gatherer.<String, String>of((unused, str, downstream) ->
                downstream.push(str.toUpperCase()))
        );

        upper.forEach(System.out::println);  // prints ONE, TWO, THREE (one per line)
    }
}

Here:

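  • We don’t need an initializer, since no mutable state is required (the gatherer is stateless); the unused state parameter has type Void.
  • Gatherer.of(...) takes only an integrator and uses the default initializer and finisher. The resulting gatherer is stateless and parallelizable. (dev.java)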
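The same idea generalizes to a reusable, map-like gatherer. This is a sketch with a hypothetical mapping(...) helper; comparing it with Stream.map simply checks that both produce the same list:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class Mapping {

    // Generic map-like gatherer; Gatherer.of(...) makes it stateless and parallelizable.
    static <T, R> Gatherer<T, ?, R> mapping(Function<? super T, ? extends R> mapper) {
        return Gatherer.of((unused, element, downstream) -> downstream.push(mapper.apply(element)));
    }

    public static void main(String[] args) {
        List<String> upper = Stream.of("one", "two", "three")
                .gather(Mapping.<String, String>mapping(String::toUpperCase))
                .toList();
        List<Integer> viaGather = IntStream.rangeClosed(1, 5).boxed()
                .gather(Mapping.<Integer, Integer>mapping(i -> i * i))
                .toList();
        List<Integer> viaMap = IntStream.rangeClosed(1, 5).boxed().map(i -> i * i).toList();
        System.out.println(upper);                      // [ONE, TWO, THREE]
        System.out.println(viaGather.equals(viaMap));   // true
    }
}
```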

2. Filtering

Filter strings that have length > 3:

Stream<String> s = Stream.of("one", "four", "six", "twelve");

Stream<String> filtered = s.gather(
    Gatherer.<String, String>of((unused, str, downstream) -> {
        if (str.length() > 3) {
            return downstream.push(str);
        }
        return true;  // drop it, but continue with the next element
    })
);

filtered.forEach(System.out::println);  
// Output: "four", "twelve"

3. Distinct by a key

E.g. distinct by string length:

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Keeps the first element for each key produced by keyExtractor (here: the length).
public class DistinctByLengthGatherer implements Gatherer<String, Set<Integer>, String> {

    private final Function<String, Integer> keyExtractor;

    public DistinctByLengthGatherer(Function<String, Integer> keyExtractor) {
        this.keyExtractor = keyExtractor;
    }

    @Override
    public Supplier<Set<Integer>> initializer() {
        return HashSet::new;  // keys seen so far
    }

    @Override
    public Gatherer.Integrator<Set<Integer>, String, String> integrator() {
        return (seen, str, downstream) -> {
            Integer key = keyExtractor.apply(str);
            if (seen.add(key)) {
                return downstream.push(str);  // first element with this key
            }
            return true;  // duplicate key: skip it, keep processing
        };
    }

    // No combiner() override: merging the "seen" sets of two parallel segments cannot
    // retract duplicates that each segment has already pushed, so the default combiner
    // (sequential evaluation) is the correct choice. The default finisher does nothing,
    // which is all we need here.
}

// Usage:
List<String> list = List.of("apple", "pear", "banana", "berry", "kiwi", "plum");
Stream<String> distinctByLen = list.stream()
    .gather(new DistinctByLengthGatherer(String::length));

distinctByLen.forEach(System.out::println);
// Output: apple, pear, banana  (only the first string of each length is kept)
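The same behavior can be written more compactly with a factory method and made generic over the key type. A sketch, assuming a hypothetical distinctBy(...) helper built on Gatherer.ofSequential, so it is never evaluated in parallel:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Gatherer;

class DistinctBy {

    // Keeps the first element seen for each key. Built with ofSequential (no combiner),
    // so it is evaluated sequentially even in a parallel stream.
    static <T, K> Gatherer<T, ?, T> distinctBy(Function<? super T, ? extends K> keyExtractor) {
        Gatherer<T, Set<K>, T> gatherer = Gatherer.ofSequential(
            () -> new HashSet<K>(),                    // keys seen so far
            (seen, element, downstream) -> {
                if (seen.add(keyExtractor.apply(element))) {
                    return downstream.push(element);   // first element with this key
                }
                return true;                           // duplicate key: skip it
            });
        return gatherer;
    }

    public static void main(String[] args) {
        List<String> fruits = List.of("apple", "pear", "banana", "berry", "kiwi", "plum");
        System.out.println(fruits.stream()
                .gather(DistinctBy.<String, Integer>distinctBy(String::length)).toList());
        // [apple, pear, banana]
        System.out.println(fruits.stream()
                .gather(DistinctBy.<String, Character>distinctBy(s -> s.charAt(0))).toList());
        // [apple, pear, banana, kiwi]
    }
}
```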

4. Sliding Window

Suppose you want windows of 3 elements, sliding by 1:

import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

List<Integer> data = List.of(1, 2, 3, 4, 5);

Stream<List<Integer>> windows = data.stream()
    .gather(Gatherers.windowSliding(3));

windows.forEach(System.out::println);
// Output:
// [1, 2, 3]
// [2, 3, 4]
// [3, 4, 5]
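For comparison, Gatherers.windowFixed produces non-overlapping windows whose last window may be shorter, and windowSliding emits a single, shorter window when the input has fewer elements than the window size. A small sketch (FixedWindows and its methods are illustrative names):

```java
import java.util.List;
import java.util.stream.Gatherers;

class FixedWindows {

    // Non-overlapping windows of `size` elements
    static List<List<Integer>> fixed(List<Integer> data, int size) {
        return data.stream().gather(Gatherers.windowFixed(size)).toList();
    }

    // Overlapping windows of `size` elements, each shifted by one
    static List<List<Integer>> sliding(List<Integer> data, int size) {
        return data.stream().gather(Gatherers.windowSliding(size)).toList();
    }

    public static void main(String[] args) {
        System.out.println(fixed(List.of(1, 2, 3, 4, 5), 2));   // [[1, 2], [3, 4], [5]]
        System.out.println(sliding(List.of(1, 2), 3));          // [[1, 2]]
    }
}
```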

5. Short-Circuit / Terminate early

Here’s an example that emits the first integer ≥ some limit and stops further processing. If no element reaches the limit, it emits the largest integer seen instead:

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

record BiggestInt(int limit) implements Gatherer<Integer, List<Integer>, Integer> {
    @Override
    public Supplier<List<Integer>> initializer() {
        return () -> new ArrayList<>(1);  // holds at most one element: the max so far
    }

    @Override
    public Gatherer.Integrator<List<Integer>, Integer, Integer> integrator() {
        return (maxList, element, downstream) -> {
            if (maxList.isEmpty() || element > maxList.get(0)) {
                maxList.clear();
                maxList.add(element);
            }
            if (element >= limit) {
                downstream.push(element);
                return false;  // stop further processing
            }
            return true;  // continue
        };
    }

    @Override
    public BinaryOperator<List<Integer>> combiner() {
        return (left, right) -> {
            if (left.isEmpty()) return right;
            if (right.isEmpty()) return left;
            return left.get(0) >= right.get(0) ? left : right;
        };
    }

    @Override
    public BiConsumer<List<Integer>, Gatherer.Downstream<? super Integer>> finisher() {
        return (state, downstream) -> {
            // The finisher may also run after a short-circuit, so only emit the max
            // if the limit was never reached (otherwise it was already pushed)
            if (!state.isEmpty() && state.get(0) < limit) {
                downstream.push(state.get(0));
            }
        };
    }
}

// Usage:

Stream<Integer> st = Stream.of(5, 4, 2, 1, 6, 12, 8, 9);
Optional<Integer> result = st.gather(new BiggestInt(11))
    .findFirst();

System.out.println(result.orElseThrow());  // prints 12
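Short-circuiting is what lets a gatherer work on an infinite stream. Here is a sketch of a hypothetical takeUntilInclusive(...) helper which, unlike Stream.takeWhile, also emits the element that matched:

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class TakeUntil {

    // Emits elements up to and including the first one that matches `stop`, then returns
    // false so that no further elements are pulled from upstream.
    static <T> Gatherer<T, ?, T> takeUntilInclusive(Predicate<? super T> stop) {
        return Gatherer.ofSequential((unused, element, downstream) ->
            downstream.push(element) && !stop.test(element));
    }

    public static void main(String[] args) {
        // Stream.iterate(...) is infinite; the gatherer's short-circuit ends it.
        List<Integer> powers = Stream.iterate(1, x -> x * 2)
                .gather(TakeUntil.<Integer>takeUntilInclusive(x -> x > 100))
                .toList();
        System.out.println(powers);   // [1, 2, 4, 8, 16, 32, 64, 128]
    }
}
```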

Built-in Gatherers

Java provides several built-in gatherers via the Gatherers class. The most common ones:

| Gatherer | Description / Use-case |
|---|---|
| fold(...) | Many-to-one accumulation: combines all elements into one result. Like reduce, but as an intermediate operation. (OpenJDK) |
| scan(...) | Cumulative result: emits the intermediate accumulation after each input, e.g. prefix sums. (OpenJDK) |
| mapConcurrent(...) | Maps elements concurrently using virtual threads, with at most maxConcurrency mappings in flight, while preserving encounter order. Useful when the mapping is expensive or blocking. (dev.java) |
| windowFixed(size) | Splits the input into non-overlapping windows (lists) of size elements; the last window may be shorter. (dev.java) |
| windowSliding(size) | Overlapping windows of size elements, each shifted by one element. (dev.java) |
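A short sketch of three of these built-ins in use (BuiltIns and its methods are illustrative names); the values in the comments follow from the descriptions above:

```java
import java.util.List;
import java.util.stream.Gatherers;

class BuiltIns {

    // fold: many-to-one, emits a single result once the input is exhausted
    static List<Integer> sum(List<Integer> xs) {
        return xs.stream().gather(Gatherers.fold(() -> 0, (acc, x) -> acc + x)).toList();
    }

    // scan: emits the running accumulation after every element (prefix sums)
    static List<Integer> prefixSums(List<Integer> xs) {
        return xs.stream().gather(Gatherers.scan(() -> 0, (acc, x) -> acc + x)).toList();
    }

    // mapConcurrent: at most 2 mapper calls in flight at once (on virtual threads);
    // results keep the encounter order of the input
    static List<String> upper(List<String> xs) {
        return xs.stream().gather(Gatherers.mapConcurrent(2, s -> s.toUpperCase())).toList();
    }

    public static void main(String[] args) {
        System.out.println(sum(List.of(1, 2, 3, 4)));          // [10]
        System.out.println(prefixSums(List.of(1, 2, 3, 4)));   // [1, 3, 6, 10]
        System.out.println(upper(List.of("a", "b", "c")));     // [A, B, C]
    }
}
```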

Parallelism, Short-Circuit, and Performance Considerations

  • If you supply a combiner, the gatherer can be evaluated in parallel. Without one (i.e. with the default combiner), the gatherer is evaluated sequentially even on a parallel stream. (OpenJDK)
  • Short-circuit: your integrator can return false to stop further processing. This is useful with infinite streams, or when you only need elements up to some condition. (OpenJDK)
  • Be careful with mutable state: in parallel, each segment gets its own state object, so the state itself does not need to be thread-safe, but your combiner must merge states correctly.
  • Watch out for memory usage, especially with gatherers that buffer many elements in their state (e.g. windowing or sorting).

Sample Use Case: Processing Logs in Windows, Distinct & Early Stop

Let’s build a more real-world example that combines features:

Suppose you have a stream of log entries, say Stream<LogEntry>, where each has a timestamp, message, severity. You want to:

  1. Process logs in sliding windows of 5 entries
  2. Within each window: drop duplicate messages (by content)
  3. Stop processing once you see a critical error

Here’s a sketch:

import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

record LogEntry(LocalDateTime time, String severity, String message) {}

public class ProcessLogs {

    // Hypothetical stand-in for a real (possibly infinite or real-time) log source
    static Stream<LogEntry> getLogStream() {
        LocalDateTime now = LocalDateTime.now();
        return Stream.of(
            new LogEntry(now, "INFO", "service started"),
            new LogEntry(now, "WARN", "slow response"),
            new LogEntry(now, "WARN", "slow response"),
            new LogEntry(now, "CRITICAL", "database down"),
            new LogEntry(now, "INFO", "cache warmed"),
            new LogEntry(now, "INFO", "request handled"));
    }

    public static void main(String[] args) {
        // 2. Drop duplicate messages inside each window. A fresh Set per window keeps
        //    de-duplication local to that window, so the gatherer itself is stateless.
        Gatherer<List<LogEntry>, Void, List<LogEntry>> distinctByMessage =
            Gatherer.of((unused, window, downstream) -> {
                Set<String> seen = new HashSet<>();
                List<LogEntry> filtered = window.stream()
                    .filter(le -> seen.add(le.message()))
                    .toList();
                return downstream.push(filtered);
            });

        // 3. Pass windows through, but stop after the first one containing a CRITICAL entry
        Gatherer<List<LogEntry>, Void, List<LogEntry>> stopOnCritical =
            Gatherer.ofSequential((unused, window, downstream) -> {
                boolean critical = window.stream()
                    .anyMatch(le -> "CRITICAL".equals(le.severity()));
                return downstream.push(window) && !critical;  // false = stop processing
            });

        // A stream can only be consumed once, so everything goes into a single pipeline
        getLogStream()
            .gather(Gatherers.windowSliding(5))  // 1. sliding windows of 5 entries
            .gather(distinctByMessage)
            .gather(stopOnCritical)
            .forEach(window -> System.out.println("Window: " + window));
    }
}

This shows how several gatherers chain together: windowing, per-window filtering, and short-circuiting.


Limitations and Things to Watch Out For

  • Preview in JDK 22 and 23: on those releases you must enable preview features, and the API could still change there. Since JDK 24 the API is final. (Oracle Documentation)
  • Readability: very powerful, but custom gatherers can become complex, especially when mixing state, combiner, finisher. For simple tasks, using map/filter/flatMap/etc is still more readable.
  • Debugging: because of more moving parts, errors in state management or combiner logic can lead to subtle bugs.
  • Resource usage: buffer-based gatherers (e.g. windowing, sorting) may use extra memory.
  • Parallel behavior: a gatherer needs a correct combiner to run in parallel; without one it is evaluated sequentially even in a parallel stream, losing the performance benefit.

Status & Examples from the Wild

  • The feature was introduced by JEP 461: Stream Gatherers (Preview) in JDK 22, previewed again by JEP 473 in JDK 23, and finalized by JEP 485 in JDK 24. (OpenJDK)
  • API: java.util.stream.Gatherer, java.util.stream.Gatherers, and the Stream.gather(Gatherer) method. (Oracle Documentation)
  • Articles / tutorials exist (Baeldung, SoftwareMill, etc.). (Baeldung on Kotlin)

Summary

Stream Gatherers are a major extension to the Stream API, aimed at giving you the flexibility to define custom intermediate operations that are:

  • more expressive than map/filter/flatMap
  • capable of maintaining state, short-circuiting, and buffering/flushing
  • parallelization-aware (if you supply a combiner)
  • useful for operations like sliding windows, distinct by custom key, cumulative operations, early stopping etc.

If your use-case is simple, built-in stream operations are fine. But for complex data‐processing pipelines, Gatherers can help you encapsulate and reuse logic cleanly.


