
pipeline-pipe: Fun Way to Get Batch Done with Node Stream

Soichi Takamura ・ 4 min read

Node Stream is a great way to process iterables. With the APIs that landed in Node v10, it has become even more powerful.

I'd like to share an npm package, pipeline-pipe, which fits perfectly with the latest native Node Stream APIs.

TL;DR

  • Readable.from + pipeline are the way to get your batch tasks done
  • Use pipeline-pipe for parallel execution
  • Forget about using Promise.all

How do you process iterable?

Processing data is our daily job. It often starts from an iterable, such as an array of RESTful API responses or CSV records, to be processed one by one, and then stores the results in a database or elsewhere. You may have used a for loop for such tasks.

const sources = [2, 3, 4];

// (run inside an async function, or with top-level await)
for (const e of sources) {
  const processed = await doSomething(e);
  const result = await storeSomewhere(processed);
  console.log(result);
}

But with Node Stream, it becomes more flexible and efficient.

await pipeline(
    Readable.from(sources),
    pipe(doSomething),    // ← in parallel, with limit
    pipe(storeSomewhere), // ←
    pipe(console.log),
);

I'll explain how this works step by step in the rest of the article.

Native Node Stream syntax

Let's start by rewriting the above with native Node Stream.

const { pipeline, Readable, Transform, Writable } = require('stream');

const sources = [2, 3, 4];

pipeline(
    Readable.from(sources),
    new Transform({
      objectMode: true,
      transform(e, _, callback) {
        // Pass the rejection to callback too, so a failure doesn't hang the stream
        doSomething(e).then(processed => callback(undefined, processed), callback);
      },
    }),
    new Writable({
      objectMode: true,
      write(processed, _, callback) {
        storeSomewhere(processed).then(result => {
          console.log(result);
          callback(undefined);
        }, callback);
      },
    }),
    (err) => console.log('all done')
);

A bit wordy, but it works perfectly. pipeline(), which landed in Node v10.0.0, notifies us of the end of the stream via its callback and cleans up the stream instances by destroying them. Readable.from(), which was backported to Node v10.17.0, creates a readable stream from an iterable such as an Array or a Map.

In case you're wondering how to get the index of an array element in Node Stream, here's a tip: Readable.from() handles this nicely.

// Use index of the array
pipeline(
    Readable.from(sources.entries()),
    new Transform({
      objectMode: true,
      transform([index, e], _, callback) {
        doSomething(e).then(
          processed => callback(undefined, [index, processed])
        );
      },
    }),
    new Writable({
      objectMode: true,
      write([index, processed], _, callback) {
        storeSomewhere(processed).then(result => {
          console.log(result);
          callback(undefined);
        });
      },
    }),
    (err) => console.log('all done')
);

To shorten the execution time

We can improve on this. Suppose doSomething() and storeSomewhere() each take 1 second to execute. Since one stream instance handles one piece of data at a time, the whole process ends up taking 4 seconds.

This is solved by parallel-transform by @mafintosh, the author of pipeline and pump. It runs the transform in parallel, so the whole process drops to 2 seconds, twice as fast 🚀! The code would look like this. Unfortunately, however, it doesn't currently work properly.

const parallel = require('parallel-transform');

pipeline(
    Readable.from([2, 3, 4]),
    parallel(10, (e, callback) => {
      doSomething(e).then(processed => callback(undefined, processed));
    }),
    parallel(10, (processed, callback) => {
      storeSomewhere(processed)
      .then(result => {
        console.log(result);
        callback(undefined);
      })
    }),
    (err) => console.log('all done'),
);

The reason it goes wrong is that it fires the pipeline callback before finishing, due to an unresolved bug. So, with respect, I published a scoped package, @piglovesyou/parallel-transform, that fixes it; thanks to @Tappi for the PR.

Introduction of pipeline-pipe

Besides the bug fix, there is one more thing I'd like to improve: the syntax. By getting rid of callback calls and accepting Promises, Node Stream becomes a good fit for the asynchronous paradigm.

const { pipeline, Readable } = require('stream');
const pipe = require('pipeline-pipe');

pipeline(
    Readable.from([2, 3, 4]),
    pipe(async (e) => await doSomething(e)),
    pipe(async (processed) => {
      const result = await storeSomewhere(processed);
      console.log(result);
    }),
    (err) => console.log('all done'),
);

It can be even shorter.

pipeline(
    Readable.from([2, 3, 4]),
    pipe(doSomething),
    pipe(storeSomewhere),
    pipe(console.log),
    (err) => console.log('all done'),
);

Plus, feel free to use a promisified version of pipeline(), exported by pipeline-pipe as a utility function.

const { pipe, pipeline } = require('pipeline-pipe');

// ...

  await pipeline(
      Readable.from([2, 3, 4]),
      pipe(doSomething),
      pipe(storeSomewhere),
      pipe(console.log),
  );
  console.log('all done');

Here are GIFs showing how parallel execution speeds up completing 50 tasks that take 100 ms each. While the sequential for-await version takes 5,000+ ms, the parallel pipeline-pipe version takes only 500+ ms (10 times faster).

[GIF comparison: for-await (sequential) vs pipeline-pipe (parallel)]

Why you shouldn't use Promise.all

Promise.all(sources.map(...)) might be your first option for shortening the execution time, but I don't recommend it, for these reasons:

  • The length of the source is often uncontrollable; it can grow to 1,000+ items
  • Execution steps usually contain asynchronous jobs
  • With RESTful API calls, that can mean 1,000+ simultaneous connections
  • That load can burden the API server and become a factor in your execution failing

To achieve fast and stable execution, we want to limit parallelism. And for its Promise-friendly syntax, pipeline-pipe is a good option for all kinds of batch processing.
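For comparison, if all you need is a one-off replacement for Promise.all with a concurrency cap, the idea can be sketched with plain Promises. This is a hypothetical helper (mapWithLimit is not part of pipeline-pipe), just to show what "limited parallelism" means:

```javascript
// Run fn over items with at most `limit` promises in flight at once
async function mapWithLimit(items, limit, fn) {
  const results = new Array(items.length);
  let next = 0;
  async function worker() {
    while (next < items.length) {
      const i = next++; // claim the next index synchronously
      results[i] = await fn(items[i]);
    }
  }
  const workers = Array.from(
    { length: Math.min(limit, items.length) },
    () => worker(),
  );
  await Promise.all(workers);
  return results;
}

mapWithLimit([2, 3, 4], 2, async (n) => n * 10)
  .then((out) => console.log(out)); // [ 20, 30, 40 ]
```

pipeline-pipe gives you the same cap while also keeping the streaming semantics (backpressure, early failure via pipeline) that a hand-rolled mapper lacks.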

I hope you'll try pipeline-pipe for your next batching project and give me feedback😁
