Introduction
A stream is an abstraction of data in programming. The Node.js Stream API has been around for a long time and serves as a uniform API for reading and writing asynchronous data. It is mostly used internally by other APIs such as fs and http.
For example, fs.createReadStream is often used to read a large file. Another example is http.ServerResponse, which implements the Stream API so that the server can respond with large data. Streams are mainly used for large data, but conceptually a stream can represent data of infinite length.
There is another abstraction, this time of loops, called a generator (introduced in ES2015), which is similar to a stream. A generator returns an iterator whose items you can loop over, and it can also represent data of infinite length. ES2018 introduced async generators and async iterators, which can handle asynchronous data. Async generators are supported in Node.js v10.
In this post, we will learn how to implement a synchronous counter with a pull-based stream and with a generator. We will also implement an asynchronous counter with a push-based stream and with an async generator, in order to compare the Stream API and async generators.
Prerequisites
Before continuing, readers will need to have Node.js installed and have a basic understanding of streams.
Implement a stream for the synchronous counter
In general, you would just use a stream provided by a library; in other words, you consume a stream. For the purpose of study, we will instead provide a stream ourselves. The documentation describes how to implement streams. Let us first make an infinite counter as a readable stream. Create a file and name it “stream-sync-counter.js”.
// stream-sync-counter.js
const { Readable, Writable } = require('stream');

const createCounterReader = () => {
  let count = 0;
  return new Readable({
    objectMode: true,
    read() {
      count += 1;
      console.log('reading:', count);
      this.push(count);
    },
  });
};

const counterReader = createCounterReader();
This is a pull-based stream, which means read() is called for new values whenever the internal buffer falls below a certain threshold (the highWaterMark). We used “object mode”, so each item is a single number rather than a chunk of bytes.
Now, let’s define a writable stream to consume this counter.
// stream-sync-counter.js (continued)
const logWriter = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    console.log('writing:', chunk);
    done();
  },
});
The logWriter stream we defined above does nothing except output numbers to the console.
Now, we connect these streams, an operation also known as “piping.”
// stream-sync-counter.js (continued)
counterReader.pipe(logWriter);
If you run this code, you will see numbers counting up infinitely.
$ node stream-sync-counter.js
reading: 1
reading: 2
writing: 1
reading: 3
writing: 2
reading: 4
writing: 3
reading: 5
writing: 4
reading: 6
writing: 5
...
Note that the readable stream reads several items at once to fill its buffer and then waits until some items are consumed. A readable stream works as follows: 1) it reads items and stores them in the buffer, 2) it waits until items are consumed, 3) once enough items are consumed that the buffer drops “below a certain amount”, it goes back to step 1. To better see how the buffer works, you can put timeouts in your writable stream.
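To make the buffer described above a little more tangible, here is a minimal sketch that inspects it directly. It assumes a hypothetical reader with a no-op read(), so nothing is produced automatically, and an arbitrary highWaterMark of 4. readableHighWaterMark and readableLength are standard Readable properties; the variable names are only illustrative.

```javascript
const { Readable } = require('stream');

// Hypothetical reader: read() does nothing, so we fill the buffer by hand.
const reader = new Readable({
  objectMode: true,
  highWaterMark: 4, // arbitrary threshold for this sketch, counted in objects
  read() {},
});

// A second reader that keeps the object-mode default threshold.
const defaultReader = new Readable({ objectMode: true, read() {} });

// Step 1: items are stored in the internal buffer.
reader.push(1);
reader.push(2);
reader.push(3);
const lengthAfterPush = reader.readableLength; // number of buffered items

// Step 2: consuming an item removes it from the buffer.
// In object mode, read() returns exactly one object.
const firstItem = reader.read();
const lengthAfterRead = reader.readableLength;

console.log('thresholds:', defaultReader.readableHighWaterMark, reader.readableHighWaterMark);
console.log('buffered:', lengthAfterPush, 'read:', firstItem, 'left:', lengthAfterRead);
```

In object mode the threshold counts objects rather than bytes, which is why "a certain amount" in the description above is a number of items.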
// modify the function in stream-sync-counter.js
const logWriter = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    setTimeout(() => {
      console.log('writing:', chunk);
      done();
    }, 100);
  },
});
If you run this, you will see an interesting output:
$ node stream-sync-counter.js
reading: 1
reading: 2
reading: 3
...
reading: 31
reading: 32
writing: 1
writing: 2
writing: 3
...
writing: 14
writing: 15
writing: 16
reading: 33
reading: 34
reading: 35
...
reading: 46
reading: 47
reading: 48
writing: 17
...
Implement a generator for the synchronous counter
Generators were introduced in ES2015. A generator is a general abstraction of loops that lets you implement a loop as a function. It is a special kind of function that returns an iterator.
The following code generates an infinite counter. Create a file and name it “generator-sync-counter.js”.
// generator-sync-counter.js
function* counterGenerator() {
  let count = 0;
  while (true) {
    count += 1;
    console.log('reading:', count);
    yield count;
  }
}

const counterIterator = counterGenerator();
Now, let’s create a function to run this iterator and output numbers to the console.
// generator-sync-counter.js (continued)
const logIterator = (iterator) => {
  for (const item of iterator) {
    console.log('writing:', item);
  }
};
This is just a for-of loop. Since ES2015, you can loop over an iterator with a for-of loop. All that remains is to invoke the function.
// generator-sync-counter.js (continued)
logIterator(counterIterator);
The result will look something like this:
$ node generator-sync-counter.js
reading: 1
writing: 1
reading: 2
writing: 2
reading: 3
writing: 3
reading: 4
writing: 4
reading: 5
writing: 5
reading: 6
writing: 6
This is slightly different from the behavior of streams and is more intuitive because there’s no buffer.
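The absence of a buffer can be checked by pulling from the iterator by hand instead of with for-of. The following is a small sketch of the same counter, with the console.log replaced by a hypothetical produced array (not part of the original code) that records when each value is actually computed.

```javascript
// Records every value at the moment the generator computes it.
const produced = [];

function* counterGenerator() {
  let count = 0;
  while (true) {
    count += 1;
    produced.push(count);
    yield count;
  }
}

const iterator = counterGenerator();

// Creating the iterator does not run the generator body yet.
const producedBeforePull = produced.length;

// Each next() call runs the body up to the next yield, and no further.
const first = iterator.next();
const second = iterator.next();

console.log(producedBeforePull); // 0
console.log(first, second);      // { value: 1, done: false } { value: 2, done: false }
console.log(produced);           // [ 1, 2 ]
```

Nothing is computed ahead of the consumer: two pulls produce exactly two values, whereas the stream had already read several items before the first write.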
You can also add timeouts like this:
// modify the function in generator-sync-counter.js
const logIterator = async (iterator) => {
  for (const item of iterator) {
    await new Promise(r => setTimeout(r, 100));
    console.log('writing:', item);
  }
};
If you run it, you should get the same result.
We’ve now created a synchronous infinite counter with both a stream and a generator. From the consumer’s point of view the two work the same way, but the internal behavior is slightly different because the stream buffers items.
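Because the counter never ends, it can be handy to consume only a finite prefix while experimenting. One possible way to do that, sketched here with a hypothetical take helper that is not part of the original code, is to wrap the iterator in another generator that stops after a given number of items.

```javascript
function* counterGenerator() {
  let count = 0;
  while (true) {
    count += 1;
    yield count;
  }
}

// Hypothetical helper: yields at most `limit` items from any iterable.
function* take(limit, iterable) {
  if (limit <= 0) return;
  let taken = 0;
  for (const item of iterable) {
    yield item;
    taken += 1;
    // Returning from inside for-of also closes the source iterator,
    // so the counter is never asked for an extra item.
    if (taken >= limit) return;
  }
}

const firstFive = [...take(5, counterGenerator())];
console.log(firstFive); // [ 1, 2, 3, 4, 5 ]
```

The helper itself is lazy, so it composes with the infinite counter without ever materializing more than the requested items.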
Implement a stream for an asynchronous counter
Next, we will create an asynchronous counter, starting with a stream. Asynchronous here means that the counter counts up every second. To create such a stream, we use setInterval. Create a file and name it “stream-async-counter.js”.
// stream-async-counter.js
const { Readable, Writable } = require('stream');

const createCounterReader = (delay) => {
  let counter = 0;
  const reader = new Readable({
    objectMode: true,
    read() {},
  });
  setInterval(() => {
    counter += 1;
    console.log('reading:', counter);
    reader.push(counter);
  }, delay);
  return reader;
};

const counterReader = createCounterReader(1000);
This is a so-called push-based stream. As you might guess, it keeps pushing data into the buffer, which grows indefinitely unless you consume the data at least as fast as it is pushed.
We use the logWriter without timeouts because items are pushed from the readable stream, which controls the timing.
// stream-async-counter.js (continued)
const logWriter = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    console.log('writing:', chunk);
    done();
  },
});

counterReader.pipe(logWriter);
If we run this, we should see the following result with delays.
$ node stream-async-counter.js
reading: 1
writing: 1
reading: 2
writing: 2
reading: 3
writing: 3
reading: 4
writing: 4
reading: 5
writing: 5
...
This is slightly different from the result of the pull-based stream because now each item is consumed before a new item is added to the buffer.
To see that data is pushed regardless of whether it is consumed, you could change the logWriter as follows.
// modify the function in stream-async-counter.js
const logWriter = new Writable({
  objectMode: true,
  write: (chunk, _, done) => {
    setTimeout(() => {
      console.log('writing:', chunk);
      done();
    }, 5 * 1000);
  },
});
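The effect of pushing without regard to consumption can also be observed without timers. The sketch below assumes a hypothetical reader with a no-op read() and an arbitrary highWaterMark of 3, and pushes synchronously instead of from setInterval. It relies on the documented return value of push(), which is false once the buffer has reached the threshold.

```javascript
const { Readable } = require('stream');

// Hypothetical push-based reader with a small threshold for illustration.
const reader = new Readable({
  objectMode: true,
  highWaterMark: 3,
  read() {},
});

let count = 0;

// A producer that respects the signal stops as soon as push() returns false.
let keepGoing = true;
while (keepGoing) {
  count += 1;
  keepGoing = reader.push(count);
}
const pushedBeforeFull = count;
const bufferedWhenFull = reader.readableLength;

// A producer that ignores the signal (like the setInterval counter above)
// keeps growing the buffer; highWaterMark is a threshold, not a hard limit.
for (let i = 0; i < 10; i += 1) {
  count += 1;
  reader.push(count);
}
const bufferedAfterIgnoring = reader.readableLength;

console.log(pushedBeforeFull, bufferedWhenFull, bufferedAfterIgnoring); // 3 3 13
```

A push-based source that wants to avoid unbounded buffering could pause when push() returns false and resume when read() is called again, though that is beyond what the counter in this article does.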
Use an async generator for an asynchronous counter
The for-await-of statement is a new feature in ES2018. It allows handling promises in iterators. Using an async generator, we can define an asynchronous infinite counter similar to the one in the previous section. Create a file named “generator-async-counter.js”:
// generator-async-counter.js
async function* counterGenerator(delay) {
  let counter = 0;
  while (true) {
    await new Promise(r => setTimeout(r, delay));
    counter += 1;
    console.log('reading:', counter);
    yield counter;
  }
}

const counterIterator = counterGenerator(1000);
Notice that in the code shown above, we use a Promise to wait for a second.
To loop this iterator, we use the for-await-of statement.
// generator-async-counter.js (continued)
const logIterator = async (iterator) => {
  for await (const item of iterator) {
    console.log('writing:', item);
  }
};

logIterator(counterIterator);
The result is just as expected.
$ node generator-async-counter.js
reading: 1
writing: 1
reading: 2
writing: 2
reading: 3
writing: 3
reading: 4
writing: 4
reading: 5
writing: 5
...
Unlike the push-based stream, the async generator only generates a new item upon a pull. To confirm that, you could modify logIterator as follows.
// modify the function in generator-async-counter.js
const logIterator = async (iterator) => {
  for await (const item of iterator) {
    console.log('writing:', item);
    await new Promise(r => setTimeout(r, 5 * 1000));
  }
};
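The same pull-only behavior can be checked in a shorter run by calling next() manually. This is a sketch under a few assumptions: the delay is shortened to 10 ms, the console.log is replaced by a hypothetical produced array, and demo is just an illustrative wrapper so that await can be used.

```javascript
// Records every value at the moment the async generator computes it.
const produced = [];

async function* counterGenerator(delay) {
  let counter = 0;
  while (true) {
    await new Promise((resolve) => setTimeout(resolve, delay));
    counter += 1;
    produced.push(counter);
    yield counter;
  }
}

const demo = (async () => {
  const iterator = counterGenerator(10);

  // First pull: resolves after the delay with the first value.
  const first = await iterator.next();

  // Stay idle for several delays without pulling; nothing new is produced.
  await new Promise((resolve) => setTimeout(resolve, 50));
  const producedWhileIdle = produced.slice();

  // Second pull: only now is the next value computed.
  const second = await iterator.next();

  // Finish the generator so it does not wait for further pulls.
  await iterator.return();

  return { first, producedWhileIdle, second, produced: produced.slice() };
})();

demo.then((result) => console.log(result));
```

While the consumer is idle the generator is suspended at its yield, which is the opposite of the setInterval-based stream that keeps pushing whether or not anyone is reading.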
Conclusion
In this article, we implemented four infinite counters and saw that streams and generators behave similarly in this example but are fundamentally different. A stream has more control over the data source, whereas a generator gives more control over the loop. We also saw a difference in behavior: a stream has a buffer, but a generator generally doesn’t. There are many other differences that we didn’t cover in this article. Readers who want to learn more may want to check the documentation.
The post Comparing the Stream API and (async) generators in Node.js v10 appeared first on LogRocket Blog.