Node Stream is a great way to process iterables. pipeline(), which landed in Node v10, makes it even more flexible and powerful.
Here I'd like to share pipeline-pipe, a set of utilities that work perfectly with Node Stream, such as pipe(), which makes your transformer capable of running in parallel.
const { pipeline, Readable } = require('stream');
const pipe = require('pipeline-pipe');
pipeline(
  Readable.from([1, 2, 3]),
  pipe(async postId => await getPost(postId), 16),            // run up to 16 in parallel
  pipe(json => parseHTML(json.postBody).document.title),
  pipe(title => title.includes('important') ? title : null),  // keep only "important" titles
  pipe(async title => await storeInDB(title), 4),             // run up to 4 in parallel
  (err) => console.info('All done!')
);
TL;DR
- Readable.from + pipeline is a good combination for finishing your batch tasks
- Use pipeline-pipe for parallel execution
- Use Node Stream, not Promise.all
How do you process an iterable?
Processing data is our daily job. It often starts from an iterable, such as an array of RESTful API responses or CSV records, to be processed one by one, with the results stored in a database or somewhere else. You may have used a for loop for such tasks.
const sources = [2, 3, 4];
// Inside an async function, since each step is awaited
for (const e of sources) {
  const processed = await doSomething(e);
  const result = await storeSomewhere(processed);
  console.log(result);
}
But with Node Stream, it becomes more flexible and efficient.
await pipeline(
  Readable.from(sources),
  pipe(doSomething), // ← in parallel, with limit
  pipe(storeSomewhere), // ←
  pipe(console.log),
);
I'm going to explain how this is possible, step by step, in the rest of the article.
Native Node Stream syntax
Let's start by rewriting the above with Node native stream.
const { pipeline, Readable, Transform, Writable } = require('stream');
const sources = [2, 3, 4];
pipeline(
  Readable.from(sources),
  new Transform({
    objectMode: true,
    transform(e, _, callback) {
      doSomething(e).then(processed => callback(undefined, processed));
    },
  }),
  new Writable({
    objectMode: true,
    write(processed, _, callback) {
      storeSomewhere(processed).then(result => {
        console.log(result);
        callback(undefined);
      });
    },
  }),
  (err) => console.log('all done')
);
A bit wordy, but it works perfectly. pipeline(), which landed in Node v10.0.0, tells us via its callback when the stream has ended and sweeps up the stream instances by destroying them. Readable.from(), which was backported to Node v10.17.0, creates a readable stream from an iterable such as an Array or a Map.
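As a side note, Readable.from() accepts any iterable or async iterable, not just an Array. Here is a minimal sketch, with a made-up generator as the source:
const { Readable } = require('stream');

// A made-up generator for illustration; Readable.from() also accepts async iterables
function* generatePostIds() {
  yield 1;
  yield 2;
  yield 3;
}

// The resulting readable stream is itself async iterable
(async () => {
  for await (const postId of Readable.from(generatePostIds())) {
    console.log(postId); // 1, 2, 3
  }
})();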
In case you're wondering how to get the index of an array element in Node Stream, I'll leave this here as a tip. Readable.from() is well designed for it.
// Use index of the array
pipeline(
  Readable.from(sources.entries()),
  new Transform({
    objectMode: true,
    transform([index, e], _, callback) {
      doSomething(e).then(
        processed => callback(undefined, [index, processed])
      );
    },
  }),
  new Writable({
    objectMode: true,
    write([index, processed], _, callback) {
      storeSomewhere(processed).then(result => {
        console.log(result);
        callback(undefined);
      });
    },
  }),
  (err) => console.log('all done')
);
To shorten the execution time
We can improve it. Suppose doSomething() and storeSomewhere() each take 1 second to execute. The whole process then ends up taking about 4 seconds, since each stream instance handles only one piece of data at a time: the two stages overlap as a pipeline, so it's 1 second of head start plus 3 × 1 second for the writes.
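If you want to see that number yourself, here are hypothetical mocks for doSomething() and storeSomewhere() (made up for illustration); plug them into the pipeline above and the final callback fires after roughly 4 seconds:
// Hypothetical mocks, each taking 1 second, to reproduce the 4-second total
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));
const doSomething = async e => { await delay(1000); return e * 10; };
const storeSomewhere = async processed => { await delay(1000); return `stored ${processed}`; };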
This is solved by parallel-transform by @mafintosh, the author of pipeline and pump. It runs the transforms in parallel while processing data, so the whole process takes about 2 seconds. That's 2 times faster🚀! The code would look like the one below. Unfortunately, however, it doesn't work properly today.
const parallel = require('parallel-transform');
pipeline(
  Readable.from([2, 3, 4]),
  parallel(10, (e, callback) => { // run up to 10 transforms in parallel
    doSomething(e).then(processed => callback(undefined, processed));
  }),
  parallel(10, (processed, callback) => {
    storeSomewhere(processed).then(result => {
      console.log(result);
      callback(undefined);
    });
  }),
  (err) => console.log('all done'),
);
The reason it goes wrong is that it emits the pipeline callback before finishing, because of an unresolved bug. So, with respect to the original, I published a scoped package, @piglovesyou/parallel-transform, that fixes it. Thanks to @Tappi for the PR.
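If you want to keep the callback-style parallel-transform API, swapping the require should be all you need; I'm assuming here that the scoped fork keeps the same signature:
// Assumed drop-in for parallel-transform: same API, with the early-callback fix
const parallel = require('@piglovesyou/parallel-transform');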
Introduction of pipeline-pipe
Besides the bug fix, there is one more thing I'd like to improve: the syntax. By getting rid of callback calls and accepting a Promise instead, Node Stream becomes a good fit for the asynchronous paradigm.
const { pipeline, Readable } = require('stream');
const pipe = require('pipeline-pipe');
pipeline(
  Readable.from([2, 3, 4]),
  pipe(async (e) => await doSomething(e)),
  pipe(async (processed) => {
    const result = await storeSomewhere(processed);
    console.log(result);
  }),
  (err) => console.log('all done'),
);
It can be even shorter.
pipeline(
  Readable.from([2, 3, 4]),
  pipe(doSomething),
  pipe(storeSomewhere),
  pipe(console.log),
  (err) => console.log('all done'),
);
Plus, feel free to use a promisified version of pipeline()
, exported by pipeline-pipe as a utility function.
const { pipe, pipeline } = require('pipeline-pipe');
// ...
await pipeline(
  Readable.from([2, 3, 4]),
  pipe(doSomething),
  pipe(storeSomewhere),
  pipe(console.log),
);
console.log('all done');
Here are GIFs showing how much efficiency parallel execution brings when completing 50 tasks that take 100 ms each. While the sequential for-await example takes 5,000+ ms, the parallel pipeline-pipe one takes only 500+ ms, 10 times faster.
(GIFs: for-await vs. pipeline-pipe)
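If you'd like to reproduce the comparison, here is a minimal sketch under those assumptions: a hypothetical 100 ms task, 50 items, and a parallelism of 10 for pipe():
const { Readable } = require('stream');
const { pipe, pipeline } = require('pipeline-pipe');

const items = Array.from({ length: 50 }, (_, i) => i);
const task = () => new Promise(resolve => setTimeout(resolve, 100)); // hypothetical 100 ms job

(async () => {
  console.time('for-await');
  for (const item of items) await task(item); // sequential: roughly 5,000 ms
  console.timeEnd('for-await');

  console.time('pipeline-pipe');
  await pipeline(
    Readable.from(items),
    pipe(task, 10), // up to 10 tasks in flight: roughly 500 ms
  );
  console.timeEnd('pipeline-pipe');
})();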
Why you shouldn't use Promise.all
Promise.all(sources.map(...))
would be your first option to shorten the execution time, but I don't really recommend it, for these reasons:
- The length of the source is often uncontrollable; it can reach 1,000+
- Execution processes usually contain asynchronous jobs
- When those are RESTful API calls, 1,000+ simultaneous connections could open
- That could be a burden on the API server and could be a factor in making your execution fail
To achieve fast and stable execution, we want to limit the parallelism. And with its Promise-friendly syntax, pipeline-pipe would be your option for all kinds of batch executions.
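To make the contrast concrete, here is a minimal sketch; postIds, getPost() and storeInDB() are hypothetical stubs standing in for the real API calls from the first example:
const { Readable } = require('stream');
const { pipe, pipeline } = require('pipeline-pipe');

// Hypothetical stubs for illustration
const postIds = Array.from({ length: 1000 }, (_, i) => i);
const getPost = async postId => ({ postId, postBody: '...' });
const storeInDB = async post => { /* pretend to write */ };

(async () => {
  // Unbounded: Promise.all fires every request at once, so 1,000 connections open simultaneously
  await Promise.all(postIds.map(postId => getPost(postId)));

  // Bounded: at most 16 fetches and 4 writes are in flight at any moment
  await pipeline(
    Readable.from(postIds),
    pipe(postId => getPost(postId), 16),
    pipe(post => storeInDB(post), 4),
  );
})();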
I hope you'll try pipeline-pipe for your next batching project and give me feedback😁