
Mario

Originally published at mariokandut.com

How to connect streams with pipeline?

Streams are a built-in feature in Node.js and represent an asynchronous flow of data. They are also a way to handle reading and/or writing files. A Node.js stream can help process files larger than the free memory of your computer, since it processes the data in small chunks.

Streams in Node.js

This is the fourth article of a series about streams in Node.js. It explains what pipeline does in Node.js, and how to connect streams using pipeline.

How to connect streams with pipeline

The pipeline method is a function of the stream module that pipes between streams and generators. It forwards errors, cleans up properly, and invokes a callback when the pipeline is complete. It was added in Node.js v10 to improve the experience of piping streams.

It takes any number of streams as arguments, and a callback function as its last argument. If an error occurs anywhere in the pipeline, the pipeline ends and the callback is invoked with the error. If the pipeline ends successfully, the callback is invoked without an error. Hence, we have a way to see when the pipeline has completed.
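
Since pipeline can also pipe through generators, here is a minimal sketch of that, assuming a reasonably recent Node.js release (generator support arrived later than pipeline itself). The helper name shout and the in-memory sink are hypothetical and only used for illustration.

```javascript
const { pipeline, Readable, Writable } = require('stream');

// Hypothetical helper: pipes words through an async generator stage
// and hands the collected result to the done callback.
function shout(words, done) {
  const chunks = [];

  // In-memory destination that collects everything it receives
  const sink = new Writable({
    write(chunk, encoding, callback) {
      chunks.push(chunk.toString());
      callback();
    },
  });

  pipeline(
    Readable.from(words),
    // The generator receives the previous stage as an async iterable
    async function* (source) {
      for await (const chunk of source) {
        yield String(chunk).toUpperCase();
      }
    },
    sink,
    // Last argument: invoked once, with an error or without one
    err => done(err, chunks),
  );
}

shout(['hello ', 'streams'], (err, result) => {
  if (err) {
    console.log('Pipeline failed with an error:', err);
  } else {
    console.log('Pipeline ended successfully:', result.join(''));
  }
});
```

The generator takes the place of a transform stream here, and the final callback still reports success or failure for the whole chain.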

Let's look at a code example. First we are going to create a sample file, then we are going to create a pipeline, with readable, PassThrough and writable streams.

Create a file.

touch create-sample.js

Add code to create a sample file with lorem ipsum.

const fs = require('fs');

fs.writeFileSync(
  'input.txt',
  "Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book. It has survived not only five centuries, but also the leap into electronic typesetting, remaining essentially unchanged. It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing software like Aldus PageMaker including versions of Lorem Ipsum.",
  { encoding: 'utf8' },
);

Create a file.

touch streams-pipeline.js

Add sample code.

const { PassThrough, pipeline } = require('stream');
const fs = require('fs');

const input = fs.createReadStream('input.txt');
const out = fs.createWriteStream('output.txt');

const passThrough = new PassThrough();

console.log('Starting pipeline...');
pipeline(input, passThrough, out, err => {
  if (err) {
    console.log('Pipeline failed with an error:', err);
  } else {
    console.log('Pipeline ended successfully');
  }
});

Run the code with node streams-pipeline.js from the terminal. The code will log Starting pipeline... when the pipeline starts and Pipeline ended successfully when the pipeline is done.

Now let's emit an error and see if the error handling is triggered. Add this line at the end of the code and run it again.

passThrough.emit('error', new Error('Oh no!'));

The code will log Starting pipeline... when the pipeline starts. Then passThrough emits the error, the pipeline ends, and the callback logs Pipeline failed with an error: Error: Oh no!.

One of the big benefits of pipeline is that the streams get destroyed when an error occurs, and internal resources get released (the memory which was used for the streams gets freed up). This cleanup step prevents memory leaks, which can occur when a stream has ended but has not released the memory it was using. When using the pipe method, you are responsible for destroying streams yourself when an error occurs.
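
To make the difference concrete, here is a minimal sketch of the manual cleanup that pipe would require. The helper name pipeWithCleanup is hypothetical and not part of Node.js; it only illustrates the work that pipeline takes over.

```javascript
const { PassThrough } = require('stream');

// Hypothetical helper: connects streams with pipe() and cleans up by hand
function pipeWithCleanup(streams, done) {
  let finished = false;

  const finish = err => {
    if (finished) return; // report only once
    finished = true;
    if (err) {
      // pipe() does not do this for us: destroy every stream ourselves
      streams.forEach(stream => stream.destroy());
    }
    done(err);
  };

  // Errors are not forwarded by pipe(), so listen on every stream
  streams.forEach(stream => stream.on('error', finish));
  streams[streams.length - 1].on('finish', () => finish());

  // pipe() returns its destination, so reduce chains the streams in order
  streams.reduce((source, destination) => source.pipe(destination));
}

const source = new PassThrough();
const middle = new PassThrough();
const sink = new PassThrough();

pipeWithCleanup([source, middle, sink], err => {
  console.log('Failed with:', err.message);
  console.log(
    'All destroyed:',
    [source, middle, sink].every(stream => stream.destroyed),
  );
});

middle.emit('error', new Error('Oh no!'));
```

All of this bookkeeping is what a single pipeline call replaces.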

Using pipeline simplifies error handling and stream cleanup. The method makes combining streams more readable and maintainable.

Transform stream with pipeline

Let's make a more powerful stream and create our own transform stream to alter data as it is streamed from the source to the destination.

Let's implement a simple transform with the pipeline method, which transforms all strings that pass through to upper case. For input and output we are going to use process.stdin and process.stdout.

Create a file.

touch transform-it.js

Copy code.

const { Transform, pipeline } = require('stream');

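const upperCaseTransform = new Transform({
  // Called for every chunk; by default the chunk is a Buffer
  transform(chunk, encoding, callback) {
    // First argument is an error (or null), second is the transformed data
    callback(null, chunk.toString().toUpperCase());
  },
});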

pipeline(process.stdin, upperCaseTransform, process.stdout, err => {
  if (err) {
    console.log('Pipeline encountered an error:', err);
  } else {
    console.log('Pipeline ended');
  }
});

Run the file with node transform-it.js and type your name in lower case. You will see that it gets transformed to upper case. You can exit the stream with ctrl+c.

What happened in the code? We created a Transform stream using the constructor from the stream module. We are required to implement a transform method on our transform stream. This transform function receives a chunk of data that passes through the transform stream, the encoding of the chunk, and a callback function, which we can use to return the transformed data or an error. We also convert the chunk data to a string, because by default the data chunk is a Buffer.
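
The error path of the callback can be sketched as follows. This is only an illustration under assumptions: the names strictUpperCase and run, and the rule that whitespace-only chunks are rejected, are hypothetical and not part of the example above.

```javascript
const { Transform, Readable, Writable, pipeline } = require('stream');

// Hypothetical transform: upper-cases text, but rejects whitespace-only chunks
const strictUpperCase = () =>
  new Transform({
    transform(chunk, encoding, callback) {
      const text = chunk.toString();
      if (text.trim() === '') {
        // First argument: report an error, which ends the pipeline
        callback(new Error('Empty chunk received'));
        return;
      }
      // Second argument: hand the transformed data downstream
      callback(null, text.toUpperCase());
    },
  });

// Hypothetical helper: runs the transform over in-memory chunks
function run(inputChunks, done) {
  const output = [];
  const sink = new Writable({
    write(chunk, encoding, callback) {
      output.push(chunk.toString());
      callback();
    },
  });

  pipeline(Readable.from(inputChunks), strictUpperCase(), sink, err =>
    done(err, output),
  );
}

run(['mario', '   '], (err, output) => {
  if (err) {
    console.log('Pipeline encountered an error:', err.message);
  } else {
    console.log('Pipeline ended:', output.join(''));
  }
});
```

An error passed to the transform callback surfaces in the pipeline callback, so there is a single place to handle failures.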

Transform streams can be very powerful for creating pipelines to alter or process streaming data, and they are much more composable than listening to stream events like .on('data') and then altering the data.
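
For comparison, here is a rough sketch of the event-based approach. The helper name upperCaseWithEvents is hypothetical; it collects the result in memory instead of writing to a destination stream.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: upper-cases a stream by listening to its events
function upperCaseWithEvents(source, done) {
  const output = [];

  // Every concern needs its own listener
  source.on('data', chunk => output.push(chunk.toString().toUpperCase()));
  source.on('error', err => done(err));
  source.on('end', () => done(null, output));
}

upperCaseWithEvents(Readable.from(['mario ', 'kandut']), (err, output) => {
  if (err) {
    console.log('Stream failed with an error:', err);
  } else {
    console.log(output.join(''));
  }
});
```

The logic is tied to one particular source here, whereas a Transform stream can be dropped into any pipeline.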

TL;DR

  • Using pipeline simplifies error handling and stream cleanup.
  • The pipeline method makes combining streams more readable and maintainable.
  • One of the big benefits of pipeline is that the streams get destroyed when an error occurs, and internal resources get released (the memory which was used for the streams gets freed up).

Thanks for reading. If you have any questions, use the comment function or send me a message @mariokandut.

If you want to know more about Node, have a look at these Node Tutorials.

References (and Big thanks):

HeyNode, Node.js - Streams, MDN - Streams
