DEV Community

Soichi Takamura

pipeline-pipe: Fun Way to Get Your Batch Done with Node Stream

Node Stream is a great way to process iterables. pipeline(), which landed in Node v10, makes it even more flexible and powerful.

Here I'd like to share pipeline-pipe, a set of utilities that work perfectly with Node Stream, such as pipe(), which makes your transformer capable of running in parallel.

const { pipeline, Readable } = require('stream');
const pipe = require('pipeline-pipe');

pipeline(
    Readable.from([1, 2, 3]),
    pipe(async postId => await getPost(postId), 16),
    pipe(json => parseHTML(json.postBody).document.title),
    pipe(title => title.includes('important') ? title : null),
    pipe(async title => await storeInDB(title), 4),
    (err) => console.info('All done!')
);

TL;DR

  • Readable.from + pipeline is a good choice to finish your batch tasks
  • Use pipeline-pipe for parallel execution
  • Use Node Stream, not Promise.all

How do you process iterables?

Processing data is our daily job. It often starts from an iterable, such as an array of RESTful API responses or CSV records, to be processed one by one, then stores the results in a database or somewhere else. You may have used a for loop for such tasks.

const sources = [2, 3, 4];

for (let e of sources) {
  const processed = await doSomething(e);
  const result = await storeSomewhere(processed);
  console.log(result);
}

But with Node Stream, it becomes more flexible and efficient.

await pipeline(
    Readable.from(sources),
    pipe(doSomething),    // ← in parallel, with limit
    pipe(storeSomewhere), // ←
    pipe(console.log),
);

I'll explain how this is possible, step by step, in the rest of the article.

Native Node Stream syntax

Let's start by rewriting the above with Node native stream.

const { pipeline, Readable, Transform, Writable } = require('stream');

const sources = [2, 3, 4];

pipeline(
    Readable.from(sources),
    new Transform({
      objectMode: true,
      transform(e, _, callback) {
        doSomething(e).then(processed => callback(undefined, processed), callback);
      },
    }),
    new Writable({
      objectMode: true,
      write(processed, _, callback) {
        storeSomewhere(processed).then(result => {
          console.log(result);
          callback(undefined);
        }, callback);
      },
    }),
    (err) => console.log('all done')
);

A bit wordy, but it works perfectly. pipeline(), which landed in Node v10.0.0, signals the end of the stream through its callback and cleans up the stream instances by destroying them. Readable.from(), which was backported to Node v10.17.0, creates a readable stream from an iterable such as an Array or a Map.

In case you're wondering how to get the index of an array element in Node Stream, here's a tip: Readable.from() handles it gracefully.

// Use index of the array
pipeline(
    Readable.from(sources.entries()),
    new Transform({
      objectMode: true,
      transform([index, e], _, callback) {
        doSomething(e).then(
          processed => callback(undefined, [index, processed]),
          callback
        );
      },
    }),
    new Writable({
      objectMode: true,
      write([index, processed], _, callback) {
        storeSomewhere(processed).then(result => {
          console.log(result);
          callback(undefined);
        }, callback);
      },
    }),
    (err) => console.log('all done')
);

To shorten the execution time

We can improve it. Suppose both doSomething() and storeSomewhere() take 1 second each. Since one stream instance handles one piece of data at a time, the whole process ends up taking 4 seconds.

This is what parallel-transform by @mafintosh, the author of pipeline and pump, solves. It runs the transform in parallel, so the whole process drops to 2 seconds; 2 times faster 🚀! The code would look like this. Unfortunately, however, it doesn't work properly today.

const parallel = require('parallel-transform');

pipeline(
    Readable.from([2, 3, 4]),
    parallel(10, (e, callback) => {
      doSomething(e).then(processed => callback(undefined, processed));
    }),
    parallel(10, (processed, callback) => {
      storeSomewhere(processed)
      .then(result => {
        console.log(result);
        callback(undefined);
      })
    }),
    (err) => console.log('all done'),
);

The reason it goes wrong is that, due to an unresolved bug, it fires the pipeline callback before the stream has finished. So, with respect, I published a scoped package, @piglovesyou/parallel-transform, that fixes it. Thanks to @Tappi's PR.

Introduction of pipeline-pipe

Besides the bug fix, there is one more thing I'd like to improve: the syntax. By dropping callback calls and accepting Promises instead, Node Stream becomes a good fit for the asynchronous paradigm.

const { pipeline, Readable } = require('stream');
const pipe = require('pipeline-pipe');

pipeline(
    Readable.from([2, 3, 4]),
    pipe(async (e) => await doSomething(e)),
    pipe(async (processed) => {
      const result = await storeSomewhere(processed);
      console.log(result);
    }),
    (err) => console.log('all done'),
);

It can be even shorter.

pipeline(
    Readable.from([2, 3, 4]),
    pipe(doSomething),
    pipe(storeSomewhere),
    pipe(console.log),
    (err) => console.log('all done'),
);

Plus, feel free to use a promisified version of pipeline(), exported by pipeline-pipe as a utility function.

const { pipe, pipeline } = require('pipeline-pipe');

// ...

  await pipeline(
      Readable.from([2, 3, 4]),
      pipe(doSomething),
      pipe(storeSomewhere),
      pipe(console.log),
  );
  console.log('all done');

Here are GIFs showing how much efficiency parallel execution brings to completing 50 tasks that take 100ms each. While the sequential for-await example takes 5,000ms+, the parallel pipeline-pipe one takes only 500ms+ (10 times faster).
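That comparison can be reproduced in a rough, scaled-down form with core JavaScript only (10 tasks of 50ms each; Promise.all here is just to illustrate the speedup from parallelism, and the next section explains why you'd cap it in real batches):

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const task = () => sleep(50); // stand-in for one 50ms unit of work

async function compare() {
  let t = Date.now();
  for (let i = 0; i < 10; i++) await task(); // sequential: ~500ms
  const sequential = Date.now() - t;

  t = Date.now();
  await Promise.all(Array.from({ length: 10 }, task)); // parallel: ~50ms
  const parallel = Date.now() - t;

  return { sequential, parallel };
}

compare().then((result) => console.log(result));
```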

(GIFs: for-await vs. pipeline-pipe)

Why you shouldn't use Promise.all

Promise.all(sources.map(...)) might be your first choice to shorten the execution time, but I don't recommend it, for these reasons:

  • The length of the source is often uncontrollable; it can grow to 1,000+
  • Execution units usually contain asynchronous jobs
  • When those are RESTful API calls, 1,000+ simultaneous connections could open
  • That can be a burden on the API server and a cause of your execution failing

To achieve fast and stable execution, we want a limit on parallelism. And with its Promise-friendly syntax, pipeline-pipe is a great option for all kinds of batch processing.
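To make "a limit on parallelism" concrete, here is a minimal hand-rolled sketch in core JavaScript. mapWithLimit is a made-up helper: it runs a fixed pool of workers (Promise.all over the small pool, not over the whole source), which is essentially the cap that pipe(fn, n) gives you, with stream backpressure on top:

```javascript
// Run fn over items with at most `limit` calls in flight at once.
async function mapWithLimit(items, limit, fn) {
  const results = new Array(items.length);
  let next = 0;
  async function worker() {
    while (next < items.length) {
      const i = next++; // claim the next unprocessed index
      results[i] = await fn(items[i], i);
    }
  }
  const pool = Array.from(
    { length: Math.min(limit, items.length) },
    () => worker(),
  );
  await Promise.all(pool); // awaits the small pool, not 1,000+ promises
  return results;
}

// At most 2 "requests" are in flight at any moment:
mapWithLimit([2, 3, 4, 5], 2, async (n) => n * 10).then(console.log); // [ 20, 30, 40, 50 ]
```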

I hope you'll try pipeline-pipe for your next batching project and give me feedback😁
