DEV Community

Ayako yk
Ayako yk

Posted on

Understanding Streams in Node.js: From Piping to Backpressure

Handling large amounts of data efficiently is essential in Node.js. Without proper management, performance can suffer, and memory usage may grow out of control.

In this blog post, I'll walk you through some of the core mechanisms that Node.js provides to handle data streams effectively:

  1. What are Streams?
  2. Type of Streams
  3. Piping
  4. Backpressure

What are Streams?
Node.js can handle large datasets, such as reading and writing files or fetching data over the network. However, when trying to load all data into memory at once, performance drops, and memory exhaustion becomes a risk.

To solve this, Node.js provides streams, which process data in smaller, manageable chunks.

By working with chunks, streams reduce memory usage and improve performance. All streams in Node.js inherit from the EventEmitter class, which allows developers to respond to events like data (when new data arrives) and end (when the stream finishes).
This event-driven approach helps manage the flow of data smoothly and efficiently.

Type of Streams
Readable
Readable is a class, specifically part of a stream module in Node.js.
Here's a basic example of a readable stream from the Node.js documentation:

class MyStream extends Readable { 
  #count = 0; 
  _read(size) { 
    this.push(':-)'); 
    if (++this.#count === 5) { 
      this.push(null); 
    } 
  }}
const stream = new MyStream();stream.on('data', chunk => {   console.log(chunk.toString());
});
Enter fullscreen mode Exit fullscreen mode

This is a custom class, MyStream, that extends the Readable stream. The _read(size) method is part of the internal contract of the Readable stream and is called automatically whenever the stream needs more data.

Streams have an internal buffer (similar to a queue), and data is pushed into this buffer using this.push(data). Pushing null signals the end of the stream.

Writable
Writable is also a class, and it's useful for tasks such as creating files, uploading data, or outputting data to the console or a network.

Here's an example from the Node.js documentation. It defines a writable stream that converts incoming data to uppercase before writing it to standard output.

const { Writable } = require('node:stream');
const { once } = require('node:events');

class MyStream extends Writable {
  constructor() {
    super({ highWaterMark: 10 /* 10 bytes */ });
  }
  _write(data, encode, cb) {
    process.stdout.write(data.toString().toUpperCase() + '\n', cb);
  }
}

async function main() {
  const stream = new MyStream();

  for (let i = 0; i < 10; i++) {
    const waitDrain = !stream.write('hello');

    if (waitDrain) {
      console.log('>> wait drain');
      await once(stream, 'drain');
    }
  }

  stream.end('world');
}

// Call the async function
main().catch(console.error);
Enter fullscreen mode Exit fullscreen mode

The highWaterMark option sets a limit for the internal buffer size -- in this example, it is limited to 10 bytes. This means that once the buffer exceeds 10 bytes (roughly the size of two 'HELLO' strings), the stream will apply backpressure --- a concept that will be explained later in this blog.

The _write() method is similar to _read() in a readable stream. It is required when creating a custom writable stream and is called automatically by Node.js whenever .write(data) is used.

The for loop demonstrates how to handle backpressure. Specifically, this line:

const waitDrain = !stream.write('hello'); 
Enter fullscreen mode Exit fullscreen mode

assigns true to waitDrain when the buffer is full (i.e., when .write() returns false). If that's the case, the code waits for the drain event before continuing. The drain event is emitted when the stream is ready to receive more data.

Duplex
Duplex is also a class, and a duplex stream implements both Readable and Writable. When creating a custom class that extends from Duplex, you need to define both _read() and _write() methods.

The Node.js documentation shares an example using socket because it is a common real-world use case for a duplex stream.

const net = require('node:net');

// Create a TCP server
const server = net.createServer(socket => {
  socket.write('Hello from server!\n');

  socket.on('data', data => {
    console.log(`Client says: ${data.toString()}`);
  });

  // Handle client disconnection
  socket.on('end', () => {
    console.log('Client disconnected');
  });
});

// Start the server on port 8080
server.listen(8080, () => {
  console.log('Server listening on port 8080');
});
Enter fullscreen mode Exit fullscreen mode

This opens a TCP socket on port 8080, sends Hello from server! to any connecting client, and logs any data received from the client.

Transform
A Transform stream is a special type of Duplex stream. It has all the methods that Duplex provides, but it's symmetric, meaning the output is produced by processing the input. The key method that sets it apart is _transform(chunk, encoding, callback).

const { Transform } = require('node:stream');

const upper = new Transform({ 
  transform: function (data, enc, cb) { 
    this.push(data.toString().toUpperCase()); 
    cb(); 
  },
});
Enter fullscreen mode Exit fullscreen mode

The transform function defines how the output should be generated. In this case, any input written to the stream will be converted to uppercase and sent to the readable side as output.

Piping
Piping is a mechanism that sends the output of one data stream into another. This is powerful because it allows data to flow seamlessly from a source to a destination.

pipe()
The pipe() method connects a readable stream to a writable or transform stream, allowing data to flow automatically between them. While it seems simple, it does not handle errors automatically, so error handling must be done manually.

readableStream.pipe(writableStream);
Enter fullscreen mode Exit fullscreen mode

pipeline()
The pipeline() method is a safer and more robust alternative. It automatically handles errors and resource cleanup, making it the recommended way.

pipeline(readableStream, writableStream, err => {
  if (err) {
      console.error('Pipeline failed.', err);
  } else {
    console.log('Pipeline succeeded.');
  }
});
Enter fullscreen mode Exit fullscreen mode

Backpressure
There is a common issue in stream-based data handling where the receiving end, or writable stream, can't keep up with incoming data. This can happen if the writable stream is performing complex operations or if the incoming data is too large or too fast. This mismatch causes memory issues. That's where backpressure comes in. Backpressure is a flow-control mechanism that prevents such problems by temporarily pausing the flow of data until the consumer is ready again.

Let's look at the same writable stream example again:

const { Writable } = require('node:stream');
const { once } = require('node:events');

class MyStream extends Writable { 
  constructor() { 
    super({ highWaterMark: 10 /* 10 bytes */ }); 
  } 
  _write(data, encode, cb) {  
    process.stdout.write(data.toString().toUpperCase() + '\n', cb); 
  }
}
async function main() { 
  const stream = new MyStream(); 
  for (let i = 0; i < 10; i++) { 
    const waitDrain = !stream.write('hello'); 
    if (waitDrain) { 
      console.log('>> wait drain'); 
      await once(stream, 'drain'); 
    } 
  } 
  stream.end('world');
}
// Call the async function
main().catch(console.error);
Enter fullscreen mode Exit fullscreen mode

The highWaterMark is an option that defines the maximum number of bytes allowed in the internal buffer. If the buffer size exceeds this value or the writable stream is busy (i.e., the receiver or consumer is queuing chunks of incoming data for later consumption), the .write() method will return false.

When .write() returns false, a backpressure system takes effect. It pauses the flow of data from the readable stream and waits until the consumer is ready again. Once the buffer has been emptied and the stream is ready to receive more data, a drain event is emitted. At that point, the flow resumes.

Node.js provides powerful tools for working with large volumes of data. Many of these concepts --- like streams, pipes, and backpressure --- are inspired by real-world water systems, making them intuitive to grasp. With these mechanisms in mind, we're better equipped to write efficient, scalable code that can handle a variety of data flow scenarios.

Top comments (0)