DEV Community

nk sk

📖 Blog 1: *Getting Started with Java Stream Gatherers* (Beginner-Friendly)

What is a Gatherer?

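A Gatherer (java.util.stream.Gatherer, standard since JDK 24 via JEP 485) is an intermediate operation applied with Stream.gather(...). It works like map or filter but is far more flexible: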

  • You can emit zero, one, or many outputs per input.
  • You can keep mutable state while processing.
  • You can stop early when needed.
  • You can even define how it should work in parallel streams.

Think of it as a pluggable custom operator for streams.
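To make "zero, one, or many outputs per input" concrete, here is a minimal sketch of a stateless gatherer built with Gatherer.of. It needs JDK 24 or later. The names RepeatByValue and repeatByValue are hypothetical and chosen only for illustration. Each number n is emitted n times, so 0 produces no output, 1 produces one, and 3 produces three.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class RepeatByValue {
    // Hypothetical gatherer: emits each number n exactly n times (nothing for n <= 0).
    static Gatherer<Integer, ?, Integer> repeatByValue() {
        return Gatherer.of((state, n, downstream) -> {
            for (int i = 0; i < n; i++) {
                // push returns false when downstream wants no more elements
                if (!downstream.push(n)) {
                    return false;
                }
            }
            return true; // keep consuming input
        });
    }

    public static void main(String[] args) {
        List<Integer> out = Stream.of(0, 1, 3, 2).gather(repeatByValue()).toList();
        System.out.println(out); // [1, 3, 3, 3, 2, 2]
    }
}
```

Because this gatherer keeps no state, Gatherer.of marks it as safe for parallel streams.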


Built-in Gatherers

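The java.util.stream.Gatherers class comes with a few ready-made gatherers:

  • Gatherers.windowFixed(n) – splits the stream into consecutive lists of n elements (the last list may be shorter).
  • Gatherers.windowSliding(n) – emits overlapping lists of n consecutive elements.
  • Gatherers.scan(initial, scanner) – emits each running result, such as prefix sums; initial is a Supplier of the starting value.
  • Gatherers.fold(initial, folder) – accumulates the whole stream into a single result, emitted at the end.
  • Gatherers.mapConcurrent(maxConcurrency, mapper) – runs mapper on virtual threads with at most maxConcurrency calls in flight, preserving encounter order.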
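The worked examples in this post cover windowSliding, scan, and mapConcurrent. The sketch below shows the other two side by side. The class and method names are only illustrative. It shows that windowFixed leaves a shorter final window and that fold emits exactly one result.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class WindowFixedAndFold {
    // windowFixed: consecutive, non-overlapping lists; the last one may be shorter
    static List<List<Integer>> pairs() {
        return Stream.of(1, 2, 3, 4, 5).gather(Gatherers.windowFixed(2)).toList();
    }

    // fold: a single result for the whole stream, emitted once the input ends
    static List<Integer> total() {
        return Stream.of(1, 2, 3, 4, 5).gather(Gatherers.fold(() -> 0, Integer::sum)).toList();
    }

    public static void main(String[] args) {
        System.out.println(pairs()); // [[1, 2], [3, 4], [5]]
        System.out.println(total()); // [15]
    }
}
```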

Example 1: Sliding Window

import java.util.List;
import java.util.stream.Gatherers;

public class SlidingWindowExample {
    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3, 4, 5);

        data.stream()
            .gather(Gatherers.windowSliding(3))
            .forEach(System.out::println);

        // Output:
        // [1, 2, 3]
        // [2, 3, 4]
        // [3, 4, 5]
    }
}
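A common use of windowSliding(2) is to compare each element with the one before it. This sketch computes differences between consecutive readings. The class name, the deltas helper, and the data are made up for illustration. The filter guards against inputs shorter than the window: in that case windowSliding emits a single, shorter window.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class PairwiseDeltas {
    static List<Integer> deltas(List<Integer> values) {
        return values.stream()
                .gather(Gatherers.windowSliding(2))      // [a, b], [b, c], ...
                .filter(pair -> pair.size() == 2)        // skip the short window of a 1-element input
                .map(pair -> pair.get(1) - pair.get(0))  // later minus earlier
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(deltas(List.of(3, 7, 12, 10))); // [4, 5, -2]
    }
}
```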

Example 2: Prefix Sums

import java.util.List;
import java.util.stream.Gatherers;

public class PrefixSumExample {
    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4);

        numbers.stream()
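               .gather(Gatherers.scan(() -> 0, Integer::sum)) // the initial value comes from a Supplier
               .forEach(System.out::println);

        // Output: 1, 3, 6, 10 (scan emits each running total, not the initial value)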
    }
}

Example 3: Concurrent Mapping

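import java.util.stream.Gatherers;
import java.util.stream.IntStream;

public class ConcurrentMapExample {
    public static void main(String[] args) {
        var data = IntStream.range(1, 10).boxed(); // Stream<Integer> of 1..9

        // At most 4 mapper calls run at the same time, each on a virtual thread.
        data.gather(Gatherers.mapConcurrent(4, x -> {
            System.out.println("Processing " + x + " in " + Thread.currentThread());
            return x * x;
        })).forEach(System.out::println);

        // The "Processing" lines may appear in any order, but the squares
        // are emitted in encounter order: 1, 4, 9, ..., 81.
    }
}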
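You can check that mapConcurrent keeps encounter order, even when calls finish out of order, by simulating uneven latency. This is only a sketch. The sleep times stand in for hypothetical I/O calls, and slowSquare is a made-up helper: smaller inputs sleep longer, so they tend to finish last.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class OrderedConcurrentMap {
    // Hypothetical slow operation: input 1 sleeps 250 ms, input 5 sleeps 50 ms.
    static int slowSquare(int x) {
        try {
            Thread.sleep(50L * (6 - x));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return x * x;
    }

    static List<Integer> squares() {
        return Stream.of(1, 2, 3, 4, 5)
                .gather(Gatherers.mapConcurrent(5, OrderedConcurrentMap::slowSquare))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(squares()); // [1, 4, 9, 16, 25], in encounter order
    }
}
```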

When to Use Gatherers

  • When built-in stream operations aren’t enough.
  • For sliding windows, distinct-by, custom filters, or cumulative operations.
  • When fusing several steps (say, a map followed by a filter) into a single operation makes the pipeline simpler or avoids extra intermediate work.
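As an example of fusing steps, one gatherer can do the work of a map followed by a filter. It parses each string and silently drops the ones that are not integers. The helper name parseInts is hypothetical. Plain map/filter or Stream.mapMulti can give the same result, so treat this as an illustration of the shape rather than the only way to do it.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class ParseInts {
    // Hypothetical fused "map + filter": emits 0 or 1 Integer per input string.
    static Gatherer<String, ?, Integer> parseInts() {
        return Gatherer.of((state, s, downstream) -> {
            try {
                return downstream.push(Integer.parseInt(s.strip()));
            } catch (NumberFormatException e) {
                return true; // not a number: emit nothing, keep going
            }
        });
    }

    public static void main(String[] args) {
        List<Integer> out = Stream.of("1", " 42 ", "x", "-7", "")
                .gather(parseInts())
                .toList();
        System.out.println(out); // [1, 42, -7]
    }
}
```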

Takeaway: Gatherers are like custom operators for streams. Start with built-in ones, and once you’re comfortable, move on to writing your own.



📖 Blog 2: Mastering Stream Gatherers in Java (Advanced Guide)


Introduction

In the beginner post, we explored built-in gatherers like windowSliding and scan.

Now let’s dive deeper: how do we build custom gatherers?
This is where Gatherers become truly powerful — you can define your own intermediate operations with state, short-circuiting, and parallel behavior.


Anatomy of a Gatherer

A Gatherer<T, A, R> has three type parameters:

  • T: input type
  • A: mutable state type
  • R: output type

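It is built from four functions, of which only the integrator is required:

  1. initializer() – creates the state object (the default means "no state").
  2. integrator() – processes each element, possibly emitting output(s), and returns false to stop early.
  3. combiner() – merges two states in parallel streams (the default means the gatherer is evaluated sequentially only).
  4. finisher() – runs after the last element and can emit remaining results from the state.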
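Here is a minimal sketch that puts all four functions together in a parallel-capable gatherer that emits the maximum element. The names MaxGatherer, State, and maxOf are illustrative and not part of the JDK. In real code Stream.max does this job, so the point is only to show where each function fits.

```java
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class MaxGatherer {
    // Mutable state: the largest value seen so far (null until the first element).
    static final class State {
        Integer max;
    }

    // Hypothetical gatherer: emits the largest element once, or nothing for an empty stream.
    static Gatherer<Integer, State, Integer> maxOf() {
        return Gatherer.of(
            State::new,                              // initializer: one fresh state per segment
            (state, n, downstream) -> {              // integrator: update state, emit nothing yet
                if (state.max == null || n > state.max) {
                    state.max = n;
                }
                return true;                         // never stops early
            },
            (left, right) -> {                       // combiner: merge two parallel segments
                if (left.max == null) {
                    return right;
                }
                if (right.max != null && right.max > left.max) {
                    left.max = right.max;
                }
                return left;
            },
            (state, downstream) -> {                 // finisher: emit the result at the end
                if (state.max != null) {
                    downstream.push(state.max);
                }
            }
        );
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(3, 9, 4, 7).gather(maxOf()).toList());            // [9]
        System.out.println(Stream.of(3, 9, 4, 7).parallel().gather(maxOf()).toList()); // [9]
        System.out.println(Stream.<Integer>empty().gather(maxOf()).toList());          // []
    }
}
```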

Example 1: Distinct by Key

A custom distinct-by operation (similar to PostgreSQL's DISTINCT ON) that keeps only the first word of each length:

import java.util.*;
import java.util.stream.*;

public class DistinctByKey {
    public static void main(String[] args) {
        List<String> words = List.of("apple", "pear", "banana", "berry", "plum");

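        Stream<String> distinct = words.stream().gather(
            // Sequential on purpose: whether a word is emitted depends on every word
            // before it, and merging per-thread states later could not undo emissions.
            Gatherer.ofSequential(
                HashSet<Integer>::new, // state: word lengths seen so far
                (seen, str, downstream) -> {
                    if (seen.add(str.length())) {
                        return downstream.push(str); // first word with this length
                    }
                    return true; // length already seen: skip the word, keep going
                }
            )
        );

        distinct.forEach(System.out::println);
        // apple, pear, banana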
    }
}
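The same idea can be wrapped in a reusable factory that takes any key function. This is a sketch. The name distinctBy is hypothetical, not a JDK method. It keeps the first element for each key, in encounter order.

```java
import java.util.HashSet;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherer;

public class DistinctBy {
    // Hypothetical factory: keeps the first element seen for each key.
    static <T, K> Gatherer<T, ?, T> distinctBy(Function<? super T, ? extends K> keyFn) {
        return Gatherer.ofSequential(
            HashSet<K>::new,                             // state: keys seen so far
            (seen, element, downstream) -> {
                if (seen.add(keyFn.apply(element))) {
                    return downstream.push(element);     // first element with this key
                }
                return true;                             // duplicate key: skip it
            }
        );
    }

    public static void main(String[] args) {
        List<String> fruits = List.of("apple", "avocado", "banana", "blueberry", "cherry");
        // Key: first letter
        System.out.println(fruits.stream().gather(distinctBy(w -> w.charAt(0))).toList());
        // [apple, banana, cherry]
    }
}
```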

Example 2: Short-Circuiting

Emit numbers until the first one ≥ 100 (which is still emitted), then stop processing.

import java.util.stream.*;

public class StopOnCondition {
    public static void main(String[] args) {
        Stream.of(5, 20, 80, 101, 200)
              .gather(Gatherer.ofSequential( // stateless, so no initializer is needed
                  (state, num, downstream) -> {
                      if (num >= 100) {
                          downstream.push(num);
                          return false; // stop!
                      }
                      return downstream.push(num); // continue while downstream accepts
                  }
              ))
              .forEach(System.out::println);

        // Output: 5, 20, 80, 101
    }
}
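This stopping rule can also be generalized into a reusable factory that honors downstream demand. In the sketch below, takeUntilInclusive is a hypothetical name. It emits elements up to and including the first one that matches the predicate. It also stops if downstream.push(...) reports that no more elements are wanted, which is expected once a later limit(...) is satisfied.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class TakeUntilInclusive {
    // Hypothetical factory: emits elements up to and including the first one matching stop.
    static <T> Gatherer<T, ?, T> takeUntilInclusive(Predicate<? super T> stop) {
        return Gatherer.ofSequential((state, element, downstream) -> {
            boolean downstreamWantsMore = downstream.push(element);
            return downstreamWantsMore && !stop.test(element);
        });
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(5, 20, 80, 101, 200)
                .gather(takeUntilInclusive(n -> n >= 100))
                .toList()); // [5, 20, 80, 101]

        System.out.println(Stream.of(5, 20, 80, 101, 200)
                .gather(takeUntilInclusive(n -> n >= 100))
                .limit(2)
                .toList()); // [5, 20]
    }
}
```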

Example 3: Stateful Transformation

Emit the running average of all numbers seen so far.

import java.util.*;
import java.util.stream.*;

public class RunningAverage {
    public static void main(String[] args) {
        Stream<Integer> numbers = Stream.of(10, 20, 30, 40);

        numbers.gather(Gatherer.ofSequential( // sequential: the average depends on encounter order
            () -> new int[2], // state: [sum, count]
            (state, n, downstream) -> {
                state[0] += n;
                state[1]++;
                double avg = (double) state[0] / state[1];
                downstream.push(avg);
                return true;
            }
        )).forEach(System.out::println);

        // Output: 10.0, 15.0, 20.0, 25.0
    }
}
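The running average above covers every element seen so far. For a moving average over only the last k values, the built-in windowSliding can handle the windowing. This sketch assumes a window of 3 and uses a hypothetical movingAverage helper.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class MovingAverage {
    // With fewer than k values, windowSliding emits one shorter window,
    // so the result is then the plain average of all values.
    static List<Double> movingAverage(List<Integer> values, int k) {
        return values.stream()
                .gather(Gatherers.windowSliding(k))
                .map(window -> window.stream()
                        .mapToInt(Integer::intValue)
                        .average()
                        .orElseThrow())
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(movingAverage(List.of(10, 20, 30, 40), 3)); // [20.0, 30.0]
    }
}
```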

Best Practices

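  • Keep state minimal to avoid large memory footprints.
  • Supply a combiner (the four-argument Gatherer.of) only when partial states can be merged correctly; otherwise build the gatherer with Gatherer.ofSequential.
  • Return false from the integrator to terminate early, and pass along the result of downstream.push(...), which is false once downstream wants no more elements.
  • Chain gatherers for complex pipelines (e.g. windowing → filtering → stopping), either with successive gather(...) calls or by composing them with Gatherer.andThen.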
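Here is a sketch of a windowing → filtering → stopping chain composed with Gatherer.andThen. The windowing stage is the built-in windowFixed. The other two stages, fullWindowsOnly and untilSumExceeds, are hypothetical helpers written for this example. They drop incomplete windows and stop after the first window whose sum exceeds a limit.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;

public class ChainedGatherers {
    // Hypothetical stage: drop windows shorter than the requested size.
    static Gatherer<List<Integer>, ?, List<Integer>> fullWindowsOnly(int size) {
        return Gatherer.of((state, window, downstream) ->
                window.size() < size || downstream.push(window)); // too short: skip, keep going
    }

    // Hypothetical stage: emit windows up to and including one whose sum exceeds limit.
    static Gatherer<List<Integer>, ?, List<Integer>> untilSumExceeds(int limit) {
        return Gatherer.ofSequential((state, window, downstream) -> {
            int sum = window.stream().mapToInt(Integer::intValue).sum();
            return downstream.push(window) && sum <= limit;
        });
    }

    static List<List<Integer>> run(List<Integer> input) {
        // windowing -> filtering -> stopping, composed into one gatherer
        Gatherer<Integer, ?, List<Integer>> pipeline = Gatherers.<Integer>windowFixed(2)
                .andThen(fullWindowsOnly(2))
                .andThen(untilSumExceeds(10));
        return input.stream().gather(pipeline).toList();
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9))); // [[1, 2], [3, 4], [5, 6]]
        System.out.println(run(List.of(1, 2, 3)));                   // [[1, 2]]
    }
}
```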

When to Use Custom Gatherers

  • Analytics: rolling averages, moving sums.
  • Event processing: stop on condition, detect sequences.
  • Custom grouping: distinct by key, bucketization.
  • Performance: fuse multiple passes into one.
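For detecting sequences and bucketization, a common building block is grouping consecutive equal elements into runs. This sketch uses a hypothetical groupRuns helper. It also shows a finisher at work: the last run is only complete once the input ends, so the finisher flushes it there (unless downstream is already rejecting elements).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class GroupRuns {
    // Hypothetical gatherer: groups consecutive equal elements into lists ("runs").
    static <T> Gatherer<T, ?, List<T>> groupRuns() {
        return Gatherer.ofSequential(
            () -> new ArrayList<T>(),                          // state: the current run
            (run, element, downstream) -> {
                if (!run.isEmpty() && !Objects.equals(run.get(0), element)) {
                    boolean more = downstream.push(new ArrayList<>(run)); // emit a copy of the finished run
                    run.clear();
                    run.add(element);
                    return more;
                }
                run.add(element);                              // same value: extend the run
                return true;
            },
            (run, downstream) -> {                             // finisher: flush the last run
                if (!run.isEmpty() && !downstream.isRejecting()) {
                    downstream.push(new ArrayList<>(run));
                }
            }
        );
    }

    public static void main(String[] args) {
        System.out.println(Stream.of("a", "a", "b", "c", "c", "c", "a")
                .gather(groupRuns())
                .toList()); // [[a, a], [b], [c, c, c], [a]]
    }
}
```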

Summary

  • Gatherers make the Stream API extensible.
  • Start with built-in gatherers (scan, windowSliding).
  • Build your own for custom, stateful, or short-circuit operations.
  • Think of Gatherers as the “power-user mode” of streams.

With Gatherers, your stream pipelines can evolve from basic data flows into rich, expressive, and efficient data transformations.

