
EronAlves1996


Deep dive into Java Streams implementation - Creating Streams

Streams and lambdas were a great addition to the Java language in Java 8, but from then until today I haven't found any good article or content about how Streams specifically are implemented.

We're going to read and explore all the infrastructure code that makes Java streams possible.

For this study, we're going to use JDK 21, which is open source, and you can find all the source here

The basic stream example

Stream.of(1, 2, 3, 4)
  .forEach(System.out::println);

The first snippet shows how simple it is to declare a stream and then invoke the forEach method on it. This method applies a side effect to every element of the stream. If you are a bit curious about how Stream is implemented, the first thing you will do is jump to its implementation, only to find that Stream is in fact an interface:

public interface Stream<T> extends BaseStream<T, Stream<T>> {
// ...
}

This is because the library authors intended to hide all the implementation details, leaving us with only some methods and no need to worry about how they are implemented. However, we can explore all the implementations with the help of our IDE, using the find implementations tool.

It reveals that we have 15 implementations, besides anonymous, package-private, static and abstract classes.


To study the inner workings of code, I don't like to take a very systematic approach, looking at the topology of its classes and their chain. I like to go where the execution path goes, so let's follow it.
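As a first step along that path, a small sketch can ask a stream which class it really is at runtime. The class name StreamRuntimeClassDemo and the helper runtimeClassOf are made up for this example, and the reported class is an implementation detail of the JDK in use, not part of the public API.

```java
import java.util.stream.Stream;

final class StreamRuntimeClassDemo {
    // Returns the name of the concrete class hiding behind a Stream reference.
    static String runtimeClassOf(Stream<?> stream) {
        return stream.getClass().getName();
    }

    public static void main(String[] args) {
        // On OpenJDK 21 this prints java.util.stream.ReferencePipeline$Head;
        // other JDK vendors or versions are free to use a different class.
        System.out.println(runtimeClassOf(Stream.of(1, 2, 3, 4)));
    }
}
```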

Creating streams

As we know, there are several ways to make streams, but the direct way is to use the static methods, like of, empty, iterate and generate, or to convert collections and arrays into a stream.

Looking at the static methods of the Stream interface, we have the following ways to build a stream:

Builder

We can get a stream builder by invoking Stream.builder(). With this builder, we can add elements by invoking the methods accept or add:

private static Stream<String> buildStreamBuilder() {
  Builder<String> builder = Stream.builder();
  builder.accept("José");
  return builder
      .add("Maria")
      .add("Nonato")
      .build();
}

The builder method returns a Stream.Builder implementation. Stream.Builder is another interface, which extends the Consumer interface. The implementation is the class Streams.StreamBuilderImpl:

    public static<T> Builder<T> builder() {
        return new Streams.StreamBuilderImpl<>();
    }

The Streams class is an aggregation of private and package-private classes and utilities that are used for creating and manipulating streams.

When using a builder, the preferred way to add elements is through its fluent interface, which is provided by the add method. After adding the elements, we invoke the build method.

The add method will invoke the accept method:

        public Stream.Builder<T> add(T t) {
            accept(t);
            return this;
        }

The accept method does the following things:

  1. Keeps track of the first element added to the stream
  2. Keeps track of the number of elements of the stream
  3. Adds the elements to a SpinedBuffer once we have 2 or more of them

        public void accept(T t) {
            if (count == 0) {
                first = t;
                count++;
            }
            else if (count > 0) {
                if (buffer == null) {
                    buffer = new SpinedBuffer<>();
                    buffer.accept(first);
                    count++;
                }

                buffer.accept(t);
            }
            else {
                throw new IllegalStateException();
            }
        }

A SpinedBuffer is an ordered collection of items, where items can be added but not removed. It goes through two phases:

  1. A building phase, where items can be added, but not removed
  2. A traversal phase, where the buffer can no longer be modified, but the items can be traversed and accessed

When we are done with the Stream.Builder, we can call the build method on it, which finally instantiates our stream using the StreamSupport.stream method. As called here, it accepts a Spliterator and a flag that indicates whether the stream to be built is parallel or not.

What happens in fact is that our StreamBuilderImpl is a Spliterator too, because it extends AbstractStreamBuilderImpl, which implements Spliterator.

    private abstract static class AbstractStreamBuilderImpl<T, S extends Spliterator<T>> implements Spliterator<T> {
// ...
}

This abstract class is a very nice use of the skeletal implementation pattern on top of Spliterator.

The StreamBuilderImpl will be used as the spliterator only if the stream has 0 or 1 elements. If it has 2 or more, the build method will use the spliterator from the SpinedBuffer.

        public Stream<T> build() {
            int c = count;
            if (c >= 0) {
                // Switch count to negative value signalling the builder is built
                count = -count - 1;
                // Use this spliterator if 0 or 1 elements, otherwise use
                // the spliterator of the spined buffer
                return (c < 2) ? StreamSupport.stream(this, false) : StreamSupport.stream(buffer.spliterator(), false);
            }

            throw new IllegalStateException();
        }
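The negative count in build is what makes a builder single-use: once it is built, both accept and build fall into the IllegalStateException branch. The sketch below observes that behaviour from the outside, using only the public Stream.Builder API. The class and method names are hypothetical, chosen just for this example.

```java
import java.util.stream.Stream;

final class BuilderLifecycleDemo {
    // Adding an element after build() should be rejected by the builder.
    static boolean rejectsAddAfterBuild() {
        Stream.Builder<String> builder = Stream.builder();
        builder.add("Maria");
        builder.build();
        try {
            builder.add("Nonato");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    // Calling build() a second time should be rejected as well.
    static boolean rejectsSecondBuild() {
        Stream.Builder<String> builder = Stream.builder();
        builder.add("Maria").add("Nonato");
        builder.build();
        try {
            builder.build();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsAddAfterBuild());
        System.out.println(rejectsSecondBuild());
    }
}
```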

Empty Stream

A Stream can also be created as an empty stream. It's meant to be used as a substitute for null. This stream is created from an empty spliterator:

    public static<T> Stream<T> empty() {
        return StreamSupport.stream(Spliterators.<T>emptySpliterator(), false);
    }
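To illustrate the "substitute for null" idea, here is a minimal sketch of a method that hands back Stream.empty() when it has nothing to offer, so callers can keep chaining without a null check. The method wordsOf and its splitting rule are assumptions made for the example, not something taken from the JDK.

```java
import java.util.Arrays;
import java.util.stream.Stream;

final class EmptyStreamDemo {
    // Hypothetical helper: splits a text into words, never returning null.
    static Stream<String> wordsOf(String text) {
        if (text == null || text.trim().isEmpty()) {
            return Stream.empty();
        }
        return Arrays.stream(text.trim().split("\\s+"));
    }

    public static void main(String[] args) {
        System.out.println(wordsOf(null).count());
        System.out.println(wordsOf("deep dive into streams").count());
    }
}
```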

Single Element Stream

We can use the static method Stream.of to create a stream consisting of a single element. This stream is created with StreamBuilderImpl, which is the foundation for a Stream.Builder.

However, we have already seen that StreamBuilderImpl is a spliterator too. So, the instance is passed directly to StreamSupport.stream.

    public static<T> Stream<T> of(T t) {
        return StreamSupport.stream(new Streams.StreamBuilderImpl<>(t), false);
    }

Single Nullable Element Stream

This is the Single Element Stream with a null-safety guarantee. It only checks whether the element is null. If it is, it returns an empty stream; otherwise, it returns a Single Element Stream:

    public static<T> Stream<T> ofNullable(T t) {
        return t == null ? Stream.empty()
                         : StreamSupport.stream(new Streams.StreamBuilderImpl<>(t), false);
    }
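One place where this null check pays off is inside flatMap, when a lookup may return null. The following is a small sketch under that assumption. The emails map, the user names and the method emailsOf are all invented for the example.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class OfNullableDemo {
    // Hypothetical lookup: users without a known e-mail are silently skipped.
    static List<String> emailsOf(Map<String, String> emails, List<String> users) {
        return users.stream()
                // Map.get returns null for unknown keys; ofNullable turns that into an empty stream
                .flatMap(user -> Stream.ofNullable(emails.get(user)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> emails = Map.of("maria", "maria@example.com");
        System.out.println(emailsOf(emails, List.of("maria", "nonato")));
    }
}
```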

From array

Streams can be created from arrays. There are two ways to do this:

  1. Using the Arrays.stream method;
  2. Using the Stream.of method passing an array (or the arguments as varargs).

The Stream.of method will call the Arrays.stream method. So, for all practical purposes, they take the same path.

    public static<T> Stream<T> of(T... values) {
        return Arrays.stream(values);
    }
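Since Stream.of has both a single-element and a varargs overload, it is worth checking which one the compiler picks for an array. The sketch below (class and method names are made up) suggests that an object array goes to the varargs overload, while a primitive array can only match of(T t), because T cannot be int, and so it becomes a single element.

```java
import java.util.stream.Stream;

final class StreamOfOverloadsDemo {
    // String[] matches of(T... values): the stream holds the array's elements.
    static long elementsFromObjectArray() {
        String[] names = { "a", "b", "c" };
        return Stream.of(names).count();
    }

    // int[] only matches of(T t) with T = int[]: the stream holds the array itself.
    static long elementsFromPrimitiveArray() {
        int[] numbers = { 1, 2, 3 };
        return Stream.of(numbers).count();
    }

    public static void main(String[] args) {
        System.out.println(elementsFromObjectArray());    // 3
        System.out.println(elementsFromPrimitiveArray()); // 1
    }
}
```

For primitive arrays, Arrays.stream(int[]) is the usual way to get one element per value.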

When we open the Arrays class, we see that stream creation follows a process that looks like C++ iterators:

    public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
        return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
    }

The stream is created by specifying where it should start and where it should end. The process is made directly in the spliterator method, and we can repeat this process ourselves, because the method is public.

So, let's say we want to process only the three middle elements of an array that has 9 items. We can do this by creating our own spliterator and passing it to the StreamSupport.stream method.

  private static Stream<String> iteratePartially() {
    var strings = new String[] { "a", "b", "c", "d", "e", "f", "g", "h", "i" };
    return StreamSupport.stream(Arrays.spliterator(strings, 3, 6), false);
  }

Note that the range is start inclusive and end exclusive, so the only elements processed in this stream are "d", "e" and "f".
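As a sanity check, the hand-made spliterator can be compared with the public three-argument Arrays.stream overload, which should yield the same range. The class name ArrayRangeDemo and its methods are only illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

final class ArrayRangeDemo {
    static final String[] LETTERS = { "a", "b", "c", "d", "e", "f", "g", "h", "i" };

    // Builds the stream by hand from a ranged spliterator (indexes 3, 4 and 5).
    static List<String> viaSpliterator() {
        return StreamSupport.stream(Arrays.spliterator(LETTERS, 3, 6), false)
                .collect(Collectors.toList());
    }

    // Lets Arrays.stream do the same thing with the same bounds.
    static List<String> viaArraysStream() {
        return Arrays.stream(LETTERS, 3, 6).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(viaSpliterator());  // [d, e, f]
        System.out.println(viaArraysStream()); // [d, e, f]
    }
}
```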

Stream from computations

Streams are lazy by design. So, a stream can be generated by a computation, and we have two methods that create such streams for us.

iterate

The Stream.iterate method produces a stream by repeatedly calling a function with some value as its argument, where the value is either the initial value or the previous result.

If we look at how the spliterator is built, we find that the generated stream is potentially infinite, because its tryAdvance method only ever returns true.

Spoiler: the return value of the tryAdvance method on a spliterator tells the stream whether an item was consumed from the source. Following this logic, the stream only ends if the spliterator returns false when the tryAdvance method is called.
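To make the spoiler concrete, here is a minimal sketch of a hand-written spliterator that does return false at some point, so its stream ends on its own. The countdown source and all names in it are invented for this example. Only Spliterators.AbstractSpliterator and StreamSupport.stream come from the JDK.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

final class CountdownSpliteratorDemo {
    // Hypothetical source: emits start, start - 1, ..., 1 and then stops.
    static Spliterator<Integer> countdownFrom(int start) {
        return new Spliterators.AbstractSpliterator<Integer>(Long.MAX_VALUE, Spliterator.ORDERED) {
            int next = start;

            @Override
            public boolean tryAdvance(Consumer<? super Integer> action) {
                if (next <= 0) {
                    return false; // nothing left: this is what ends the stream
                }
                action.accept(next--);
                return true;      // one item was handed to the stream
            }
        };
    }

    static List<Integer> countdown(int start) {
        return StreamSupport.stream(countdownFrom(start), false)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(countdown(3)); // [3, 2, 1]
    }
}
```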

    public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {
        Objects.requireNonNull(f);
        Spliterator<T> spliterator = new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE,
               Spliterator.ORDERED | Spliterator.IMMUTABLE) {
            T prev;
            boolean started;

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                Objects.requireNonNull(action);
                T t;
                if (started)
                    t = f.apply(prev);
                else {
                    t = seed;
                    started = true;
                }
                action.accept(prev = t);
                return true;
            }
        };
        return StreamSupport.stream(spliterator, false);
    }
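Because this spliterator never returns false, the caller has to bound the stream with a short-circuiting operation such as limit. A minimal sketch, with made-up names:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class InfiniteIterateDemo {
    // Takes only the first howMany values of an otherwise endless sequence.
    static List<Integer> firstPowersOfTwo(int howMany) {
        return Stream.iterate(1, n -> n * 2)
                .limit(howMany) // without this, collect would never finish
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstPowersOfTwo(5)); // [1, 2, 4, 8, 16]
    }
}
```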

The Stream.iterate method has an overload. This variation gives us more control to signal to the stream that the computation is over, by passing a Predicate as the second argument, which indicates whether the iteration has more items or not. This logic is passed to the spliterator:

    public static<T> Stream<T> iterate(T seed, Predicate<? super T> hasNext, UnaryOperator<T> next) {
        Objects.requireNonNull(next);
        Objects.requireNonNull(hasNext);
        Spliterator<T> spliterator = new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE,
               Spliterator.ORDERED | Spliterator.IMMUTABLE) {
            T prev;
            boolean started, finished;

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                Objects.requireNonNull(action);
                if (finished)
                    return false;
                T t;
                if (started)
                    t = next.apply(prev);
                else {
                    t = seed;
                    started = true;
                }
                if (!hasNext.test(t)) {
                    prev = null;
                    finished = true;
                    return false;
                }
                action.accept(prev = t);
                return true;
            }

            @Override
            public void forEachRemaining(Consumer<? super T> action) {
                Objects.requireNonNull(action);
                if (finished)
                    return;
                finished = true;
                T t = started ? next.apply(prev) : seed;
                prev = null;
                while (hasNext.test(t)) {
                    action.accept(t);
                    t = next.apply(t);
                }
            }
        };
        return StreamSupport.stream(spliterator, false);
    }
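From the caller's side, this overload reads much like a classic for loop: seed, condition, step. The sketch below uses hypothetical names to show that the stream ends by itself, and that the predicate is also applied to the seed.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class BoundedIterateDemo {
    // Roughly: for (int n = 1; n <= max; n = n * 2)
    static List<Integer> powersOfTwoUpTo(int max) {
        return Stream.iterate(1, n -> n <= max, n -> n * 2)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(powersOfTwoUpTo(100)); // [1, 2, 4, 8, 16, 32, 64]
        System.out.println(powersOfTwoUpTo(0));   // [] because the seed already fails hasNext
    }
}
```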

generate

The Stream.generate method is very similar to the iterate method, but in this case it accepts a Supplier and not a UnaryOperator. That way, we can supply constant or random values, and the stream will be created by the logic provided in the Supplier.

Looking at the code, and at the name of the class where the spliterator is located, we can assume that the created stream is potentially infinite:

    public static<T> Stream<T> generate(Supplier<? extends T> s) {
        Objects.requireNonNull(s);
        return StreamSupport.stream(
                new StreamSpliterators.InfiniteSupplyingSpliterator.OfRef<>(Long.MAX_VALUE, s), false);
    }
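As with the two-argument iterate, a generated stream needs something like limit to become finite. Here is a short sketch with a constant supplier and a stateful one. The names are invented, and the counter example assumes a sequential stream.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class GenerateDemo {
    // A constant supplier: the same value, as many times as we ask for.
    static List<String> constant(String value, int size) {
        return Stream.generate(() -> value)
                .limit(size)
                .collect(Collectors.toList());
    }

    // A stateful supplier: each call hands out the next number.
    static List<Integer> counter(int size) {
        AtomicInteger next = new AtomicInteger();
        return Stream.generate(next::getAndIncrement)
                .limit(size)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(constant("x", 3)); // [x, x, x]
        System.out.println(counter(3));       // [0, 1, 2]
    }
}
```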

Stream of streams

Finally, we can create a stream from two other streams, by concatenating them.

The concat method takes two streams and uses a ConcatSpliterator to supply the values from one stream and then the other, sequentially.

By looking at the code, we can conclude that if one of them is parallel, the resulting stream will be parallel too.

    public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b) {
        Objects.requireNonNull(a);
        Objects.requireNonNull(b);

        @SuppressWarnings("unchecked")
        Spliterator<T> split = new Streams.ConcatSpliterator.OfRef<>(
                (Spliterator<T>) a.spliterator(), (Spliterator<T>) b.spliterator());
        Stream<T> stream = StreamSupport.stream(split, a.isParallel() || b.isParallel());
        return stream.onClose(Streams.composedClose(a, b));
    }
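Both observations, the sequential supply of values and the a.isParallel() || b.isParallel() rule, can be checked with a tiny sketch. The class and method names are made up for the example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ConcatDemo {
    // Elements of the first stream come before the elements of the second.
    static List<Integer> concatenated() {
        return Stream.concat(Stream.of(1, 2), Stream.of(3, 4))
                .collect(Collectors.toList());
    }

    // One parallel input is enough to make the concatenation parallel.
    static boolean parallelIfEitherIsParallel() {
        Stream<Integer> sequential = Stream.of(1, 2);
        Stream<Integer> parallel = Stream.of(3, 4).parallel();
        return Stream.concat(sequential, parallel).isParallel();
    }

    public static void main(String[] args) {
        System.out.println(concatenated());               // [1, 2, 3, 4]
        System.out.println(parallelIfEitherIsParallel()); // true
    }
}
```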

We should note that streams have an onClose method. It means that they are AutoCloseable resources, which matters for resource management, because we can create streams from I/O resources too.
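To see the close handlers at work, the sketch below registers one on each input stream and closes only the concatenated stream with try-with-resources. The log list and all names are assumptions for this example. The handlers stand in for what would normally release an I/O resource.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

final class OnCloseDemo {
    // Returns what happened, in order: the computed sum and the close events.
    static List<String> closeLog() {
        List<String> log = new ArrayList<>();
        Stream<Integer> a = Stream.of(1, 2).onClose(() -> log.add("a closed"));
        Stream<Integer> b = Stream.of(3).onClose(() -> log.add("b closed"));

        // Closing the concatenated stream also runs the handlers of a and b.
        try (Stream<Integer> both = Stream.concat(a, b)) {
            log.add("sum=" + both.mapToInt(Integer::intValue).sum());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(closeLog());
    }
}
```

Note that a terminal operation alone does not run these handlers. They only fire when close is called, which is why try-with-resources is a good fit here.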

What's next

This post is getting too long.
I will split it up and talk about other aspects of the streams implementation in the next posts. We still have to talk about:

  1. Spliterators
  2. StreamSupport class
  3. Pipelines
  4. Operation stages
  5. Stream evaluation

And many more subjects that we'll discover in the next posts.
