In the previous article in the series, we had a look at generator functions that let you generate values in a way that can be consumed by a `for...of` loop. We used this to create a sequence of natural numbers without having to allocate the entire number set in memory, lazily producing each value when the consumer requests one.
In the same vein, Streams allow for incremental reading of a large chunk of data (or files!). Instead of forcing us to allocate memory for the data, the decoded content and any subsequent results, it allows us to process data as a series of steps on individual slices.
A Gentle Introduction to Streams
Streams come in three variations:

- `ReadableStream`: a stream that can be read from,
- `WritableStream`: a stream that can be written to,
- `TransformStream`: a stream that can take the output of a readable or transform stream and output a transformed result. It is both readable and writable.
The one that will be useful most of the time is `ReadableStream`. This is also the stream type provided by the `File` and `Response` objects as an alternative to the more common `text()` and `json()` methods, which return a decoded string asynchronously.
As an example, we'll build an application that can count the number of lines of text in a file dropped onto the web page. First, let's look at the core of the problem — how do we read a file using streams?
Consuming a ReadableStream
Let's start with a function that takes a `File` object, reads it from start to end, counts the occurrences of line feeds (empty lines also count as a line), and returns the final count:
```typescript
const countLines = async (file: File): Promise<number> => {
  let count = 1;
  // Decode the raw byte stream into a stream of strings.
  const stream = file.stream().pipeThrough(new TextDecoderStream());
  const reader = stream.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // Count the line feeds in this chunk.
      const lineBreaks = value.match(/\n/g)?.length ?? 0;
      count += lineBreaks;
    }
  } finally {
    reader.releaseLock();
  }
  return count;
};
```
The first thing to note is that files are 8-bit binary streams (`Uint8Array`) which must first be decoded into the UTF-16 strings that JavaScript uses for string representation. To do this, we pipe the file's `ReadableStream<Uint8Array>` through a transform stream called `TextDecoderStream`, which converts the byte stream into a stream of strings that we can consume using the reader (an instance of `ReadableStreamDefaultReader`).
A reader provides the `read()` method, which returns a `Promise` that resolves to the next chunk in the stream. We can use an infinite loop to read the next chunk and check whether the stream has been exhausted by testing the `done` property on the resolved value. If `true`, we simply break out of the loop.
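To make the shape of those resolved values concrete, here's a minimal, self-contained sketch; the two-chunk stream here is hypothetical and not part of the line counter:

```typescript
// A hand-made two-chunk stream to illustrate what read() resolves to.
const demoStream = new ReadableStream<string>({
  start(controller) {
    controller.enqueue("first\n");
    controller.enqueue("second\n");
    controller.close(); // subsequent reads resolve with { done: true }
  },
});

const demoReader = demoStream.getReader();
const first = await demoReader.read();  // { done: false, value: "first\n" }
const second = await demoReader.read(); // { done: false, value: "second\n" }
const end = await demoReader.read();    // { done: true, value: undefined }
```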
Now that we have a function that can count lines in files, let's build a quick web application to demonstrate its functionality. I've decided to use Vite, Preact, and `@krofdrakula/drop` to build the web application shell that uses the above function to display line counts for any file (or multiple files) dropped onto the page:
Since the files are read locally and not sent over the wire, you can drop files of any size (even hundreds of MB!) and the browser will handle them just fine. They don't even have to be text files, though the line count won't make much sense in that case.
Creating Your Own ReadableStream
At this point you may be wondering how one can create a `ReadableStream` in the first place. A `ReadableStream` is created using the `ReadableStream` constructor, which takes a configuration object with the following optional methods:
- `pull(controller)`: whenever the stream is read, this method is called in order to produce the next chunk of data. The `controller` object is used to send values to the consumer of the stream.
- `start(controller)`: called when the stream is constructed; it can optionally send values via the `controller` object.
- `cancel(reason?)`: called when the stream's `cancel()` method is called, with an optional reason. Used to clean up any open handles to resources or similar.
Let's take the example of natural numbers from the previous article and write a `ReadableStream` that emits all natural numbers up to the given maximum:
```typescript
const createNaturalNumberStream = (max: number = Infinity) => {
  let i = 1;
  return new ReadableStream<number>({
    pull(controller) {
      if (i <= max) {
        controller.enqueue(i);
        i++;
      } else {
        controller.close();
      }
    },
  });
};
```
Reading this stream will keep emitting numbers in sequence up to, and including, `max`.
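To see it in action, here's a hedged sketch that drains a bounded natural-number stream into an array using `pipeTo()` and a `WritableStream` sink; the stream factory is repeated so the example is self-contained:

```typescript
const createNaturalNumberStream = (max: number = Infinity) => {
  let i = 1;
  return new ReadableStream<number>({
    pull(controller) {
      if (i <= max) {
        controller.enqueue(i);
        i++;
      } else {
        controller.close();
      }
    },
  });
};

// Collect the emitted values through a WritableStream sink.
const collected: number[] = [];
await createNaturalNumberStream(5).pipeTo(
  new WritableStream<number>({
    write(chunk) {
      collected.push(chunk);
    },
  })
);
// collected is now [1, 2, 3, 4, 5]
```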
Transforming Streams Using TransformStream
As previously mentioned, `TextDecoderStream` takes a stream of bytes and converts them to a stream of UTF-16 strings suitable for JavaScript. Streams do not have to provide data in byte arrays or strings, however; they can emit any kind of JavaScript value.
Constructing a `TransformStream` works similarly to `ReadableStream`, but with a different set of optional methods:
- `transform(chunk, controller)`: called whenever a chunk is read from the transform stream. Use `controller` to emit values to the reader.
- `start(controller)`: called when the transform stream is constructed.
- `flush(controller)`: called when the source has been fully consumed and the stream is about to be closed.
To demonstrate the API, let's create a mapping stream that will take a value from the input stream and apply a mapping function to the emitted value. The only method we need to implement is the `transform` function, which takes a chunk and enqueues the mapped value:
```typescript
const createMapStream = <I, O>(
  mapper: (value: I) => O
): TransformStream<I, O> =>
  new TransformStream({
    transform(value, controller) {
      controller.enqueue(mapper(value));
    },
  });
```
Using this transformer is simply a matter of piping any compatible readable stream through it:
```typescript
const evenNumberStream = createNaturalNumberStream().pipeThrough(
  createMapStream((n: number) => n * 2)
);
```
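Because `pipeThrough()` returns another readable stream, transforms compose. Here's a self-contained sketch (with `createMapStream` repeated) that chains two mappers over a small hand-made source stream and drains the result:

```typescript
const createMapStream = <I, O>(
  mapper: (value: I) => O
): TransformStream<I, O> =>
  new TransformStream({
    transform(value, controller) {
      controller.enqueue(mapper(value));
    },
  });

// A small finite source stream for demonstration purposes.
const source = new ReadableStream<number>({
  start(controller) {
    [1, 2, 3].forEach((n) => controller.enqueue(n));
    controller.close();
  },
});

// Double each number, then format it as a label.
const labelled = source
  .pipeThrough(createMapStream((n: number) => n * 2))
  .pipeThrough(createMapStream((n: number) => `#${n}`));

const labels: string[] = [];
const labelReader = labelled.getReader();
while (true) {
  const { done, value } = await labelReader.read();
  if (done) break;
  labels.push(value);
}
// labels is now ["#2", "#4", "#6"]
```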
Other Streams
- `CompressionStream`: compresses the content of a `Uint8Array` stream into a new `Uint8Array` stream
- `DecompressionStream`: does the opposite of the above
- `FileSystemWritableFileStream`: allows writing to an open file system file handle
- `WebSocketStream`: an open standard proposal to allow writing to a web socket as a `WritableStream`
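As a quick taste of the compression streams, here's a hedged sketch that round-trips a string through gzip, assuming a runtime that provides `CompressionStream`, `DecompressionStream`, `Blob`, and `Response` as globals (modern browsers and Node 18+ do):

```typescript
const roundTrip = async (text: string): Promise<string> => {
  // Blob#stream() gives us a ReadableStream<Uint8Array> over the bytes.
  const bytes = new Blob([text]).stream();
  const compressed = bytes.pipeThrough(new CompressionStream("gzip"));
  const decompressed = compressed.pipeThrough(new DecompressionStream("gzip"));
  // Response is a convenient way to collect a byte stream back into text.
  return await new Response(decompressed).text();
};

const original = "streams all the way down";
const restored = await roundTrip(original);
// restored === original
```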
Converting `ReadableStream`s into Asynchronous Iterables
Admittedly, streams are not as straightforward to consume as a `for...of` loop, but there is good news and bad news.
The good news is that the `ReadableStream` specification implements `Symbol.asyncIterator`, which allows you to read a `ReadableStream` using a `for await...of` loop, e.g.:
```typescript
for await (const number of evenNumberStream) {
  console.log(number);
}
```
The bad news is that it currently isn't supported in all JavaScript engines. However, we can leverage what we learned in the previous article about generator functions and create a utility function that takes a stream and returns an `AsyncGenerator` yielding values from the input stream:
```typescript
const streamToIterable = async function* <T>(
  stream: ReadableStream<T>
): AsyncGenerator<T> {
  const reader = stream.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) return;
      yield value;
    }
  } finally {
    reader.releaseLock();
  }
};
```
This makes it possible to consume any stream as an async iterable:
```typescript
for await (const number of streamToIterable(evenNumberStream))
  console.log(number);
```
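Putting the pieces together, here's a self-contained sketch (with `streamToIterable` repeated and a small hand-made source stream) that doubles each value while collecting it with `for await...of`:

```typescript
const streamToIterable = async function* <T>(
  stream: ReadableStream<T>
): AsyncGenerator<T> {
  const reader = stream.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) return;
      yield value;
    }
  } finally {
    reader.releaseLock();
  }
};

// A small finite source stream standing in for evenNumberStream.
const numbers = new ReadableStream<number>({
  start(controller) {
    [1, 2, 3].forEach((n) => controller.enqueue(n));
    controller.close();
  },
});

const doubled: number[] = [];
for await (const n of streamToIterable(numbers)) {
  doubled.push(n * 2);
}
// doubled is now [2, 4, 6]
```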
Applied Streamology
In the next installment, we'll have a look at applying our knowledge of streams and generators to read large files in the browser while keeping memory consumption low and the UI responsive while data is being processed.
Comments
Thank you very much for your detailed description.
Often we have to break out parts of the document and process them separately. Let's say you have to process a bracket in one piece. What is your recommendation for applying this kind of processing tool?
It really depends on the scope of your task. What do you mean by processing a bracket? Do you mean like modifying a specific fragment of a JSON file without having to decode, modify and stringify everything?
Think of a Markdown processor that reads text and outputs HTML. There might be a need to handle a block inside a bracket. That's easy if the block is small, but what if you put the whole text into brackets?
In that case, I would create a new transform stream that would first parse the block-level elements (paragraphs, code blocks, etc.) and have that stream output an object like `{ type: 'code_block', stream: readableStream }`. I was going to demonstrate this on a much simpler example of converting a string stream into a stream of individual lines in a future article, but the process is very similar. You can create a duplicated readable stream by using `tee()` or by creating a new readable stream to pass down the pipeline. The idea is that the final consumer would consume a series of readable streams of strings and block types, and parse those blocks incrementally using each block's provided readable stream. It's a bit hard to explain in more detail in a single comment, but that's the gist of it.
I may add a cut-down example in my next article of how one might do this.
I solved this by first splitting the source into an array of strings. This causes some trouble when extracting code blocks (like multi-line brackets), but it makes string handling fast, as most operations run on a single line of text.
Bracket handling was mainly done manually, see here for more details.
Very interesting.