Márton Papp

Pipeline API 🔥 - the best way to handle stream errors that nobody tells you about

... except the documentation on the millionth page, without any context, buried deeper than a superfluous dependency in your node_modules directory.

A little background

Streams are cruel and unpredictable, but usually you can copy-paste top-rated answers from Stack Overflow for years without fully understanding them - a very important skill that most of us have mastered during our careers.

But one day you will be asked to transform and upload huge amounts of data from a database table to Google Storage and you will probably write something like this:

// this is bad, please do not do this!
async streamFromDbToGcloudBucket(incomingStream) {
  const file = ... // a Google Cloud Storage File object, details elided


  return new Promise((resolve, reject) => {
    incomingStream
      .pipe(file.createWriteStream())
      .on('error', function(err) {
        reject(err);
      })
      .on('finish', function() {
        resolve();
      });
  });
}

Wrapped in a promise, piping the incoming stream to a gCloud file - pretty neat, huh? After months in production, things started to go south: we got inactivity alerts because the files were sometimes not uploaded hourly as expected.

The ugly

During debugging I stumbled upon the following lines in the storage lib from Google:

fs.createReadStream(pathString)
  .on('error', callback!)
  .pipe(newFile.createWriteStream(options))
  .on('error', callback!)

What? You need multiple .on('error', callback) handlers in the same chain? Am I stupid for not knowing this? As it turns out, you need to attach an error handler to every stream, because pipe does not propagate errors the way you would expect. It also means that you have to repeat this for every pipe you use - see the sketch below.
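
To make that concrete, here is a minimal sketch of what "an error handler on every stream" looks like with plain pipe. The file names and the gzip transform are made up for illustration, not taken from the actual production code:

const fs = require('fs');
const zlib = require('zlib');

function handleError(err) {
  console.error('Stream failed:', err);
}

// Every stream needs its own handler: .pipe() returns the destination
// stream, but it does not forward 'error' events from the source.
const source = fs.createReadStream('input.txt');
const gzip = zlib.createGzip();
const destination = fs.createWriteStream('input.txt.gz');

source.on('error', handleError);      // read errors (e.g. missing file)
gzip.on('error', handleError);        // transform errors
destination.on('error', handleError); // write errors (e.g. disk full)

source.pipe(gzip).pipe(destination);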

Pipeline to the rescue

Fortunately, Node 10 introduced the Pipeline API to alleviate this kind of problem. Instead of using pipe, you can use pipeline(...streams, callback). It does pretty much the same thing, except that callback is called when the pipeline is fully done or when an error occurs at any point - and in that case it also takes care of cleaning up the streams. Let's see how it works:

const { pipeline } = require('stream');

pipeline(
  readableStream,
  writableStream,
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  }
);
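
Applied to the upload scenario from the beginning, a rough sketch could look like the following - streamFromDbToGcloudBucket and file are the same placeholders as before, so treat this as an illustration rather than the exact production code:

const { pipeline } = require('stream');

// One callback catches errors from any stream in the chain,
// so no per-stream .on('error') handlers are needed.
function streamFromDbToGcloudBucket(incomingStream, file) {
  return new Promise((resolve, reject) => {
    pipeline(
      incomingStream,           // rows coming from the database
      file.createWriteStream(), // upload stream from the Storage library
      (err) => (err ? reject(err) : resolve())
    );
  });
}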

One more thing

You might notice that it's not wrapped in a promise. The good news is that pipeline is promisify-able (is this even a word?) as well, so you can write this:

const util = require('util');
const stream = require('stream');

const pipeline = util.promisify(stream.pipeline);

await pipeline(
  readableStream,
  writableStream
);

... and wrap it in a try-catch block.
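
For instance, a minimal sketch of how that could look - the run wrapper and its stream arguments are just placeholders, and on Node 15+ you could also use the built-in promise API from 'stream/promises':

const util = require('util');
const stream = require('stream');

const pipeline = util.promisify(stream.pipeline);

async function run(readableStream, writableStream) {
  try {
    await pipeline(readableStream, writableStream);
    console.log('Pipeline succeeded.');
  } catch (err) {
    console.error('Pipeline failed.', err);
  }
}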

In any case, I hope you find the above useful, and as this is my first article ever, your likes and feedback are much appreciated!
