Since Java 8, Streams and lambdas have been a great addition to the Java language, but from then until today, I haven't found any good article or content about how Streams specifically are implemented.
We are going to read and explore all the infrastructure code that makes Java streams possible.
For this study, we are going to use JDK 21, which is open source; you can find all of its source code here
The basic stream example
```java
Stream.of(1, 2, 3, 4)
        .forEach(System.out::println);
```
This first snippet shows how simple it is to declare a stream and then invoke the `forEach` method on it. This method applies a side effect to every element of the stream. If you are curious about how `Stream` is implemented, the first thing you will probably do is jump to its implementation, only to find that `Stream` is, in fact, an interface:
```java
public interface Stream<T> extends BaseStream<T, Stream<T>> {
    // ...
}
```
This is because the library authors intended to hide all the implementation details, leaving us with only a set of methods and no need to worry about how they are implemented. However, we can explore the implementations with the help of our IDE's find implementations tool.
It reveals 15 implementations, a mix of anonymous, package-private, static and abstract classes.
When studying the inner workings of code, I don't like to be very systematic, mapping out the topology of its classes and inheritance chains. I prefer to go where the execution path goes, so let's follow it.
Creating streams
As we know, there are several ways to create streams. The most direct ones are the static factory methods, like `of`, `empty`, `iterate` and `generate`, and converting collections and arrays into streams.
Looking at the static methods of the `Stream` interface, we find the following ways to build a stream:
Builder
We can create a stream builder by invoking `Stream.builder()`. With this builder, we can add elements by invoking the `accept` or `add` methods:
```java
private static Stream<String> buildStreamBuilder() {
    Stream.Builder<String> builder = Stream.builder();
    builder.accept("José");
    return builder
            .add("Maria")
            .add("Nonato")
            .build();
}
```
`Stream.builder()` returns an implementation of `Stream.Builder`, another interface, which extends the `Consumer` interface. The implementation is the class `Streams.StreamBuilderImpl`:
```java
public static<T> Builder<T> builder() {
    return new Streams.StreamBuilderImpl<>();
}
```
The `Streams` class groups private and package-private classes and utilities used for creating and manipulating streams.
When using a builder, the preferred way to add elements is through its fluent interface. This mechanism is provided by the `add` method, and once we are done adding elements, we invoke the `build` method.
The `add` method simply invokes `accept` and returns the builder itself:
```java
public Stream.Builder<T> add(T t) {
    accept(t);
    return this;
}
```
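The `accept` method does the following:
- Keeps track of the first element added to the stream
- Keeps track of how many elements were added, but only up to two: `count` is 0, 1 or 2, where 2 means "two or more"
- Once a second element arrives, creates a `SpinedBuffer`, copies the first element into it and adds every subsequent element to it
- Throws an `IllegalStateException` if the builder has already been built, which is signalled by a negative `count`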
```java
public void accept(T t) {
    if (count == 0) {
        first = t;
        count++;
    }
    else if (count > 0) {
        if (buffer == null) {
            buffer = new SpinedBuffer<>();
            buffer.accept(first);
            count++;
        }
        buffer.accept(t);
    }
    else {
        throw new IllegalStateException();
    }
}
```
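A `SpinedBuffer` is an ordered collection of elements: elements can be added to it, but never removed. Instead of a single array that must be copied whenever it grows (as in `ArrayList`), it stores elements in a "spine" of several arrays, so growing never copies the existing elements. It goes through two phases:
- A building phase, where elements are added
- A traversal phase, where the buffer can no longer be modified, but its elements can be traversed in order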
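To make the idea of a spine concrete, here is a minimal sketch of an append-only buffer that keeps its elements in a list of chunks, each twice as large as the previous one. `MiniSpinedBuffer` and all of its members are hypothetical, written only for illustration; the JDK's `SpinedBuffer` is package-private and differs in many details (initial sizes, primitive specializations, spliterators).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified spined buffer: NOT the JDK's SpinedBuffer.
final class MiniSpinedBuffer<T> implements Consumer<T> {
    private static final int FIRST_CHUNK_SIZE = 4;

    // The "spine": a list of chunks. Full chunks are never copied or resized.
    private final List<Object[]> spine = new ArrayList<>();
    private int sizeInLastChunk = 0;
    private long count = 0;
    private boolean traversing = false;

    // Building phase: append an element, allocating a new chunk when the last one is full.
    @Override
    public void accept(T t) {
        if (traversing) {
            throw new IllegalStateException("buffer is in the traversal phase");
        }
        if (spine.isEmpty() || sizeInLastChunk == spine.get(spine.size() - 1).length) {
            int capacity = spine.isEmpty()
                    ? FIRST_CHUNK_SIZE
                    : 2 * spine.get(spine.size() - 1).length;
            spine.add(new Object[capacity]);
            sizeInLastChunk = 0;
        }
        spine.get(spine.size() - 1)[sizeInLastChunk++] = t;
        count++;
    }

    long count() { return count; }

    int chunkCount() { return spine.size(); }

    // Traversal phase: visit the elements in insertion order; no more additions allowed.
    @SuppressWarnings("unchecked")
    void forEach(Consumer<? super T> action) {
        traversing = true;
        for (int c = 0; c < spine.size(); c++) {
            Object[] chunk = spine.get(c);
            int limit = (c == spine.size() - 1) ? sizeInLastChunk : chunk.length;
            for (int i = 0; i < limit; i++) {
                action.accept((T) chunk[i]);
            }
        }
    }

    public static void main(String[] args) {
        MiniSpinedBuffer<Integer> buffer = new MiniSpinedBuffer<>();
        for (int i = 0; i < 10; i++) {
            buffer.accept(i);
        }
        // 10 elements fit in a chunk of 4 plus a chunk of 8
        System.out.println(buffer.count() + " elements in " + buffer.chunkCount() + " chunks");
        List<Integer> seen = new ArrayList<>();
        buffer.forEach(seen::add);
        System.out.println(seen); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

The doubling chunk size keeps the number of chunks logarithmic in the number of elements, while an append never moves elements that are already stored.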
When we are done with the `Stream.Builder`, we call its `build` method, which finally instantiates our stream using `StreamSupport.stream`. The overload used here accepts a `Spliterator` and a flag that indicates whether the stream to be built is parallel.
In fact, our `StreamBuilderImpl` is a `Spliterator` too, because it extends `AbstractStreamBuilderImpl`, which implements `Spliterator`:
```java
private abstract static class AbstractStreamBuilderImpl<T, S extends Spliterator<T>> implements Spliterator<T> {
    // ...
}
```
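This abstract class is a neat application of the skeletal implementation pattern on top of `Spliterator`: it provides the parts of the `Spliterator` contract shared by the builders, leaving the element-specific parts to its subclasses.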
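`StreamBuilderImpl` is used as the spliterator only if the stream has 0 or 1 elements. If it has 2 or more, the `build` method uses the spliterator of the `SpinedBuffer` instead. Before choosing, `build` switches `count` to a negative value (0 becomes -1, 1 becomes -2 and 2 becomes -3), which marks the builder as built; that is why calling `build` again, or `accept` after building, throws an `IllegalStateException`: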
```java
public Stream<T> build() {
    int c = count;
    if (c >= 0) {
        // Switch count to negative value signalling the builder is built
        count = -count - 1;
        // Use this spliterator if 0 or 1 elements, otherwise use
        // the spliterator of the spined buffer
        return (c < 2)
                ? StreamSupport.stream(this, false)
                : StreamSupport.stream(buffer.spliterator(), false);
    }
    throw new IllegalStateException();
}
```
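The internal `count` is not visible through the public API, but its effect is: once a builder is built, it refuses any further use. The sketch below observes this through the documented `IllegalStateException` of `Stream.Builder.add` and `build`; the class `BuilderLifecycle` and its method names are hypothetical, chosen for the example.

```java
import java.util.List;
import java.util.stream.Stream;

final class BuilderLifecycle {
    // Builds a three-element stream, mixing accept and the fluent add.
    static List<String> buildAndCollect() {
        Stream.Builder<String> builder = Stream.builder();
        builder.accept("José");
        return builder.add("Maria").add("Nonato").build().toList();
    }

    // After build(), the builder is in the built state, so add must throw.
    static boolean addAfterBuildThrows() {
        Stream.Builder<String> builder = Stream.builder();
        builder.add("José").build();
        try {
            builder.add("Maria");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // A second call to build() is rejected the same way.
    static boolean buildTwiceThrows() {
        Stream.Builder<String> builder = Stream.builder();
        builder.build();
        try {
            builder.build();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(buildAndCollect());     // [José, Maria, Nonato]
        System.out.println(addAfterBuildThrows()); // true
        System.out.println(buildTwiceThrows());    // true
    }
}
```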
Empty Stream
A stream can also be created empty. It is meant to be returned instead of `null` when there is nothing to return. This stream is created from an empty spliterator:
```java
public static<T> Stream<T> empty() {
    return StreamSupport.stream(Spliterators.<T>emptySpliterator(), false);
}
```
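As a small sketch of the "instead of `null`" idea, the hypothetical lookup method `nicknames` below returns `Stream.empty()` when it finds nothing, so callers can chain operations without a null check. It also pokes at `Spliterators.emptySpliterator()` directly to show that its `tryAdvance` never feeds the action. The class name, the method names and the lookup table are assumptions made for the example.

```java
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;

final class EmptyStreamDemo {
    // Hypothetical lookup table, only for this example.
    private static final Map<String, List<String>> NICKNAMES = Map.of("Maria", List.of("Mari"));

    // Returns an empty stream instead of null, so callers never need a null check.
    static Stream<String> nicknames(String name) {
        List<String> found = NICKNAMES.get(name);
        return found == null ? Stream.empty() : found.stream();
    }

    // The empty spliterator has no elements: tryAdvance returns false without calling the action.
    static boolean emptySpliteratorAdvances() {
        Spliterator<String> spliterator = Spliterators.emptySpliterator();
        return spliterator.tryAdvance(s -> { throw new AssertionError("unreachable"); });
    }

    public static void main(String[] args) {
        System.out.println(nicknames("Nonato").count());  // 0, and no NullPointerException
        System.out.println(nicknames("Maria").toList());  // [Mari]
        System.out.println(emptySpliteratorAdvances());   // false
    }
}
```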
Single Element Stream
We can use the static method `Stream.of` with a single argument to create a stream consisting of a single element. This stream is created with `StreamBuilderImpl`, the same class that backs `Stream.builder()`. Since we have already seen that `StreamBuilderImpl` is also a spliterator, the instance is passed directly to `StreamSupport.stream`:
```java
public static<T> Stream<T> of(T t) {
    return StreamSupport.stream(new Streams.StreamBuilderImpl<>(t), false);
}
```
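The same trick works with any spliterator we write ourselves. Below is a minimal sketch of a one-element spliterator handed to `StreamSupport.stream`, mirroring what `Stream.of(t)` does with `StreamBuilderImpl`. `SingleElementSpliterator` and `streamOf` are hypothetical names; this is a simplified illustration, not the JDK's class.

```java
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical class for illustration; it is not the JDK's StreamBuilderImpl.
final class SingleElementSpliterator<T> implements Spliterator<T> {
    private final T element;
    private boolean consumed = false;

    SingleElementSpliterator(T element) {
        this.element = element;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        if (consumed) {
            return false; // no more elements: the source is exhausted
        }
        consumed = true;
        action.accept(element);
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        return null; // a single element cannot be split for parallel processing
    }

    @Override
    public long estimateSize() {
        return consumed ? 0 : 1;
    }

    @Override
    public int characteristics() {
        return SIZED | SUBSIZED | ORDERED | IMMUTABLE;
    }

    // Wraps the spliterator in a sequential stream, like Stream.of(t) does with StreamBuilderImpl.
    static <T> Stream<T> streamOf(T t) {
        return StreamSupport.stream(new SingleElementSpliterator<>(t), false);
    }

    public static void main(String[] args) {
        System.out.println(streamOf("Maria").map(String::toUpperCase).toList()); // [MARIA]
    }
}
```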
Single Nullable Element Stream
This is the single element stream with a null-safety guarantee. It only checks whether the element is `null`: if it is, it returns an empty stream; otherwise, it returns a single element stream:
```java
public static<T> Stream<T> ofNullable(T t) {
    return t == null ? Stream.empty()
                     : StreamSupport.stream(new Streams.StreamBuilderImpl<>(t), false);
}
```
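A common use of `Stream.ofNullable` is inside `flatMap`: each value becomes a stream of zero or one elements, so `null`s simply disappear from the result. This is a small sketch; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

final class OfNullableDemo {
    // Turns each value into a 0- or 1-element stream and flattens the result,
    // dropping nulls without an explicit filter.
    static List<String> withoutNulls(List<String> values) {
        return values.stream()
                .flatMap(Stream::ofNullable)
                .toList();
    }

    public static void main(String[] args) {
        // Arrays.asList accepts null elements (List.of would not)
        List<String> names = Arrays.asList("José", null, "Maria", null);
        System.out.println(withoutNulls(names)); // [José, Maria]
    }
}
```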
From arrays
Streams can be created from arrays. There are two ways to do this:
- Using the `Arrays.stream` method;
- Using the `Stream.of` method, passing an array (or the elements as varargs).
The varargs `Stream.of` method simply calls `Arrays.stream`, so, for all purposes, both take the same path:
```java
public static<T> Stream<T> of(T... values) {
    return Arrays.stream(values);
}
```
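For object arrays, both entry points therefore produce the same stream. One pitfall is worth a sketch, though: with a primitive array such as `int[]`, `Stream.of` cannot infer `T` as `int`, so it picks the single-element overload and produces a `Stream<int[]>` with one element, while `Arrays.stream(int[])` returns an `IntStream` over the values. The class and method names below are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

final class ArrayStreams {
    static List<String> viaStreamOf(String[] names) {
        return Stream.of(names).toList();
    }

    static List<String> viaArraysStream(String[] names) {
        return Arrays.stream(names).toList();
    }

    // Pitfall: T is inferred as int[], so the stream has ONE element (the whole array).
    static long streamOfPrimitiveArrayCount(int[] numbers) {
        return Stream.of(numbers).count();
    }

    // Arrays.stream(int[]) returns an IntStream over the individual values.
    static long arraysStreamPrimitiveCount(int[] numbers) {
        return Arrays.stream(numbers).count();
    }

    public static void main(String[] args) {
        String[] names = { "José", "Maria", "Nonato" };
        System.out.println(viaStreamOf(names).equals(viaArraysStream(names))); // true
        int[] numbers = { 1, 2, 3 };
        System.out.println(streamOfPrimitiveArrayCount(numbers)); // 1
        System.out.println(arraysStreamPrimitiveCount(numbers));  // 3
    }
}
```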
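When we open the `Arrays` class, we see that `Arrays.stream(array)` delegates to an overload that takes a range, calling it as `stream(array, 0, array.length)`. The range is given as a start and an end position, which looks like a pair of C++ iterators: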
```java
public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
    return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}
```
The stream is created by specifying where it should start and where it should end. This happens in the `Arrays.spliterator` method, and since that method is public, we can repeat the process ourselves.
Say we want to process only the three middle elements of an array with nine items. We can do this by creating the spliterator ourselves and passing it to the `StreamSupport.stream` method:
```java
private static Stream<String> iteratePartially() {
    var strings = new String[] { "a", "b", "c", "d", "e", "f", "g", "h", "i" };
    return StreamSupport.stream(Arrays.spliterator(strings, 3, 6), false);
}
```
Note that the range is start-inclusive and end-exclusive, so the only elements processed by this stream are "d", "e" and "f".
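Building the range by hand and using the three-argument `Arrays.stream(array, start, end)` overload give the same elements, since the latter does exactly that internally. The sketch also shows that the range spliterator knows its size up front (`6 - 3 = 3`). The class `ArrayRanges` and its methods are hypothetical names for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.StreamSupport;

final class ArrayRanges {
    static final String[] LETTERS = { "a", "b", "c", "d", "e", "f", "g", "h", "i" };

    // The range built by hand, through StreamSupport.
    static List<String> viaSpliterator() {
        return StreamSupport.stream(Arrays.spliterator(LETTERS, 3, 6), false).toList();
    }

    // The same range through the Arrays.stream(array, start, end) overload.
    static List<String> viaArraysStream() {
        return Arrays.stream(LETTERS, 3, 6).toList();
    }

    // An array range spliterator knows its exact size: end - start.
    static long rangeSize() {
        Spliterator<String> spliterator = Arrays.spliterator(LETTERS, 3, 6);
        return spliterator.estimateSize();
    }

    public static void main(String[] args) {
        System.out.println(viaSpliterator());  // [d, e, f]
        System.out.println(viaArraysStream()); // [d, e, f]
        System.out.println(rangeSize());       // 3
    }
}
```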
Streams from computations
Streams are lazy by design, so a stream can also be generated by a computation instead of being read from an existing source. Two methods create such streams for us: `iterate` and `generate`.
iterate
The `Stream.iterate` method produces a stream by repeatedly applying a function, first to an initial value (the seed) and then to the previous result.
If we look at how its spliterator is built, we find that the generated stream is infinite, because its `tryAdvance` method always returns `true`.
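Spoiler: a spliterator's `tryAdvance` method passes the next element to the given action and returns `true`, or returns `false` when there are no more elements. Following this logic, the stream's source is only exhausted when its spliterator returns `false` from `tryAdvance`, so an infinite stream needs a short-circuiting operation, such as `limit`, to finish.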
```java
public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {
    Objects.requireNonNull(f);
    Spliterator<T> spliterator = new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE,
            Spliterator.ORDERED | Spliterator.IMMUTABLE) {
        T prev;
        boolean started;

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            Objects.requireNonNull(action);
            T t;
            if (started)
                t = f.apply(prev);
            else {
                t = seed;
                started = true;
            }
            action.accept(prev = t);
            return true;
        }
    };
    return StreamSupport.stream(spliterator, false);
}
```
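We can drive that spliterator ourselves to see the mechanism at work: `Stream.spliterator()` is a terminal operation that hands us the underlying spliterator, and each `tryAdvance` call computes exactly one more element. The sketch below pulls the first powers of two this way and compares the result with the usual `limit`; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;

final class IterateByHand {
    // Pulls the first n elements of an infinite Stream.iterate by calling
    // tryAdvance directly, instead of using limit(n).
    static List<Integer> firstPowersOfTwo(int n) {
        Spliterator<Integer> spliterator = Stream.iterate(1, x -> x * 2).spliterator();
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            // This source never reports exhaustion, so we decide when to stop.
            spliterator.tryAdvance(out::add);
        }
        return out;
    }

    // The same result with the short-circuiting limit operation.
    static List<Integer> firstPowersOfTwoWithLimit(int n) {
        return Stream.iterate(1, x -> x * 2).limit(n).toList();
    }

    public static void main(String[] args) {
        System.out.println(firstPowersOfTwo(5));          // [1, 2, 4, 8, 16]
        System.out.println(firstPowersOfTwoWithLimit(5)); // [1, 2, 4, 8, 16]
    }
}
```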
The `Stream.iterate` method has an overload, added in Java 9. This variation gives us more control to signal to the stream that the computation is over: we pass a `Predicate` as the second argument, indicating whether the iteration has more elements. This logic is passed to the spliterator, which also overrides `forEachRemaining` with a plain loop:
```java
public static<T> Stream<T> iterate(T seed, Predicate<? super T> hasNext, UnaryOperator<T> next) {
    Objects.requireNonNull(next);
    Objects.requireNonNull(hasNext);
    Spliterator<T> spliterator = new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE,
            Spliterator.ORDERED | Spliterator.IMMUTABLE) {
        T prev;
        boolean started, finished;

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            Objects.requireNonNull(action);
            if (finished)
                return false;
            T t;
            if (started)
                t = next.apply(prev);
            else {
                t = seed;
                started = true;
            }
            if (!hasNext.test(t)) {
                prev = null;
                finished = true;
                return false;
            }
            action.accept(prev = t);
            return true;
        }

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            Objects.requireNonNull(action);
            if (finished)
                return;
            finished = true;
            T t = started ? next.apply(prev) : seed;
            prev = null;
            while (hasNext.test(t)) {
                action.accept(t);
                t = next.apply(t);
            }
        }
    };
    return StreamSupport.stream(spliterator, false);
}
```
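With the predicate, `Stream.iterate` reads like a `for` loop: seed, condition, step. Note in the code above that the seed itself is tested too, so a seed that fails `hasNext` yields an empty stream. A short sketch comparing it with the equivalent loop (class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

final class BoundedIterate {
    // Equivalent to: for (int i = 1; i <= max; i = i * 3)
    static List<Integer> powersOfThreeUpTo(int max) {
        return Stream.iterate(1, i -> i <= max, i -> i * 3).toList();
    }

    // The imperative loop, for comparison.
    static List<Integer> powersOfThreeLoop(int max) {
        List<Integer> out = new ArrayList<>();
        for (int i = 1; i <= max; i = i * 3) {
            out.add(i);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(powersOfThreeUpTo(100)); // [1, 3, 9, 27, 81]
        System.out.println(powersOfThreeLoop(100)); // [1, 3, 9, 27, 81]
        // The seed fails hasNext, so nothing is emitted.
        System.out.println(Stream.iterate(10, i -> i < 5, i -> i + 1).count()); // 0
    }
}
```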
generate
The `Stream.generate` method is very similar to `iterate`, but it accepts a `Supplier` instead of a `UnaryOperator`. Each element is obtained by calling the `Supplier`, which does not receive the previous element, so it fits constant or random values, or any other logic we put inside the `Supplier`.
Looking at the code, and at the name of the spliterator class it uses, `InfiniteSupplyingSpliterator`, we can tell that the created stream is infinite:
```java
public static<T> Stream<T> generate(Supplier<? extends T> s) {
    Objects.requireNonNull(s);
    return StreamSupport.stream(
            new StreamSpliterators.InfiniteSupplyingSpliterator.OfRef<>(Long.MAX_VALUE, s), false);
}
```
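Because the generated stream is infinite, it needs a short-circuiting operation like `limit`. And because the `Supplier` never sees the previous element, any state has to live outside it, for example in an `AtomicInteger`. A minimal sketch, with hypothetical class and method names:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

final class GenerateDemo {
    // A constant Supplier: every element is the same value.
    static List<String> pings(int n) {
        return Stream.generate(() -> "ping").limit(n).toList();
    }

    // A stateful Supplier: the counter lives outside the lambda.
    // generate() is unordered, so in a parallel stream don't rely on which values are taken.
    static List<Integer> counter(int n) {
        AtomicInteger next = new AtomicInteger();
        return Stream.generate(next::getAndIncrement).limit(n).toList();
    }

    public static void main(String[] args) {
        System.out.println(pings(3));   // [ping, ping, ping]
        System.out.println(counter(3)); // [0, 1, 2] in this sequential pipeline
    }
}
```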
Concatenating streams
Finally, we can create a stream from two other streams by concatenating them.
The `concat` method takes two streams and uses a `ConcatSpliterator` that supplies all the values of the first stream and then all the values of the second.
Looking at the code, we can conclude that if either of them is parallel, the resulting stream is parallel too.
```java
public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b) {
    Objects.requireNonNull(a);
    Objects.requireNonNull(b);

    @SuppressWarnings("unchecked")
    Spliterator<T> split = new Streams.ConcatSpliterator.OfRef<>(
            (Spliterator<T>) a.spliterator(), (Spliterator<T>) b.spliterator());
    Stream<T> stream = StreamSupport.stream(split, a.isParallel() || b.isParallel());
    return stream.onClose(Streams.composedClose(a, b));
}
```
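Note that the concatenated stream registers a handler with `onClose` that closes both source streams. Streams can be closed because `BaseStream` extends `AutoCloseable` (not `Closeable`), which makes them usable in try-with-resources. This matters for resource management, because streams can also be created from I/O resources, such as the lines of a file.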
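The sketch below puts both observations to work: closing the concatenated stream in try-with-resources runs the close handlers of both inputs, first `a` and then `b`, after the elements have been consumed, and a single parallel input makes the result parallel. `ConcatCloseDemo` and its methods are hypothetical names for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

final class ConcatCloseDemo {
    // Records, in order, the elements and the close handlers that ran.
    static List<String> elementsThenCloses() {
        List<String> log = new ArrayList<>();
        Stream<String> a = Stream.of("a1", "a2").onClose(() -> log.add("closed a"));
        Stream<String> b = Stream.of("b1").onClose(() -> log.add("closed b"));
        try (Stream<String> both = Stream.concat(a, b)) {
            log.addAll(both.toList());
        } // closing `both` runs the handler registered by concat: close a, then close b
        return log;
    }

    // If either input is parallel, the concatenated stream is parallel.
    static boolean concatIsParallel() {
        return Stream.concat(Stream.of(1).parallel(), Stream.of(2)).isParallel();
    }

    public static void main(String[] args) {
        System.out.println(elementsThenCloses()); // [a1, a2, b1, closed a, closed b]
        System.out.println(concatIsParallel());   // true
    }
}
```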
What's next
This post is getting too long.
I will split it up and cover other aspects of the stream implementation in the next posts. We still have to talk about:
- Spliterators
- StreamSupport class
- Pipelines
- Operation stages
- Stream evaluation
And many more subjects that we'll discover along the way.