
Alex Chen


Node.js Streams: The Practical Guide


Streams are one of Node's most powerful features: they let you process data piece by piece instead of loading it all into memory. Here's how to use them in practice.

What Are Streams?

No Streams (loads everything into memory):
[File on Disk] → [Read ALL into RAM] → [Process ALL in RAM] → [Write ALL to Output]

With Streams (process piece by piece):
[File on Disk] → [Chunk 1] → [Process] → [Output]
              → [Chunk 2] → [Process] → [Output]
              → [Chunk 3] → [Process] → [Output]
              → ... (constant low memory usage)
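In code, that chunk-by-chunk flow looks like this. A minimal sketch (the file name is a placeholder); readable streams are async iterable, so you can consume them with for await...of:

const { createReadStream } = require('fs');

// The file is read in chunks (64 KB by default for fs streams), never all at once
const stream = createReadStream('big-file.txt');

let bytes = 0;
for await (const chunk of stream) {
  bytes += chunk.length; // process each chunk, then let it be garbage-collected
}
console.log(`Read ${bytes} bytes with constant memory usage`);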

Types of Streams

// Readable — data comes OUT
const fs = require('fs');
const readable = fs.createReadStream('big-file.txt');

// Writable — data goes IN
const writable = fs.createWriteStream('output.txt');

// Duplex — both directions (like a TCP socket)
const net = require('net');
const socket = net.connect(443, 'example.com'); // readable AND writable

// Transform — modify data as it passes through (like zlib)
const zlib = require('zlib');
const gzip = zlib.createGzip();

Basic Usage: File Copy

// ❌ Bad for large files (loads entire file into memory)
const fs = require('fs');
const data = fs.readFileSync('large-file.txt');
fs.writeFileSync('copy.txt', data);

// ✅ Good: Uses streams (constant memory, regardless of file size!)
const { createReadStream, createWriteStream } = require('fs');
const readStream = createReadStream('large-file.txt');
const writeStream = createWriteStream('copy.txt');

readStream.pipe(writeStream); // That's it!

Pipe vs pipeline

const { createReadStream, createWriteStream } = require('fs');
const { pipeline } = require('stream/promises'); // Modern way
const zlib = require('zlib');

// pipe() — old school: errors aren't forwarded, so a failure can leave streams open
createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(createWriteStream('output.txt.gz'));

// pipeline() — handles errors and cleanup properly, returns a Promise
// (top-level await like this needs an ES module or an async wrapper)
await pipeline(
  createReadStream('input.txt'),
  zlib.createGzip(),
  createWriteStream('output.txt.gz')
);
console.log('File compressed!');
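To see why pipeline() is worth it, here's roughly what you have to wire up yourself to get equivalent error handling with pipe(). A sketch, with placeholder file names:

const { createReadStream, createWriteStream } = require('fs');
const zlib = require('zlib');

// With pipe() you must attach an error listener to EVERY stream in the chain
// and destroy the others yourself, or a failure leaks file descriptors.
const source = createReadStream('input.txt');
const gzip = zlib.createGzip();
const dest = createWriteStream('output.txt.gz');

const cleanup = (err) => {
  console.error('Stream failed:', err);
  source.destroy();
  gzip.destroy();
  dest.destroy();
};

source.on('error', cleanup);
gzip.on('error', cleanup);
dest.on('error', cleanup);

source.pipe(gzip).pipe(dest);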

Transform Stream (Custom Processing)

const { Transform } = require('stream');

// Uppercase transform stream
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});
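
// Usage sketch: a Transform instance slots between a readable and a writable.
// (Assumes the pipeline / createReadStream / createWriteStream imports from
// the earlier examples; 'notes.txt' is just a placeholder file name.)
await pipeline(
  createReadStream('notes.txt'),
  upperCaseTransform,
  createWriteStream('notes-upper.txt')
);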

// CSV-to-JSON transform
class CsvToJsonTransform extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.headers = null;
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep incomplete line in buffer

    lines.forEach(line => {
      if (!line) return;

      if (!this.headers) {
        this.headers = line.split(',');
        return;
      }

      const values = line.split(',');
      const obj = {};
      this.headers.forEach((h, i) => obj[h.trim()] = values[i]?.trim());
      this.push(obj);
    });

    callback();
  }

  _flush(callback) {
    // Emit the final row if the file doesn't end with a newline
    if (this.buffer && this.headers) {
      const values = this.buffer.split(',');
      const obj = {};
      this.headers.forEach((h, i) => obj[h.trim()] = values[i]?.trim());
      this.push(obj);
    }
    callback();
  }
}

// Usage: consume the transformed rows at the end of the pipeline
await pipeline(
  createReadStream('data.csv'),
  new CsvToJsonTransform(),
  async (source) => {
    for await (const row of source) {
      console.log(row); // now we get JSON objects! Could write to a DB, API, etc.
    }
  }
);

Practical Examples

Process Large JSON File

const { createReadStream } = require('fs');
const { pipeline } = require('stream/promises');
const { parser } = require('stream-json'); // npm install stream-json
const { streamArray } = require('stream-json/streamers/StreamArray');

let userCount = 0;

// Assumes huge-data.json is one big top-level JSON array of objects
await pipeline(
  createReadStream('huge-data.json'),
  parser(),
  streamArray(), // emits { key, value } for each array element
  // Process each item without loading the whole file
  async (source) => {
    for await (const item of source) {
      if (item.value?.type === 'user') {
        userCount++;
        // Process each user individually (saveToDatabase is your own function)
        await saveToDatabase(item.value);
      }
    }
  }
);

console.log(`Processed ${userCount} users`);

HTTP Request/Response Streaming

const http = require('http');
const fs = require('fs');

// Serve large files without loading into memory
const server = http.createServer((req, res) => {
  if (req.url === '/download') {
    const filePath = '/path/to/large-file.zip';

    res.setHeader('Content-Type', 'application/zip');
    res.setHeader('Content-Disposition', 'attachment; filename=file.zip');

    const readStream = fs.createReadStream(filePath);
    readStream.pipe(res); // Stream directly to response

    // Handle errors (if headers were already sent, this just ends the response)
    readStream.on('error', err => {
      console.error('Read error:', err);
      res.statusCode = 500;
      res.end('Download failed');
    });
  } else {
    res.statusCode = 404;
    res.end('Not found');
  }
});

server.listen(3000);

Compress HTTP Responses

const http = require('http');
const zlib = require('zlib');

// largeObject stands in for whatever big JSON-serializable payload you're sending
const largeObject = { /* ... */ };

const server = http.createServer((req, res) => {
  // Check if the client accepts gzip
  const acceptEncoding = req.headers['accept-encoding'] || '';

  if (acceptEncoding.includes('gzip')) {
    res.writeHead(200, {
      'Content-Type': 'application/json',
      'Content-Encoding': 'gzip'
    });

    // Compress and send in one stream
    const gzip = zlib.createGzip();
    gzip.pipe(res);
    gzip.end(JSON.stringify(largeObject));
  } else {
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify(largeObject));
  }
});

server.listen(3000);

Merge Multiple Files

const { pipeline } = require('stream/promises');
const { createReadStream, createWriteStream } = require('fs');

async function mergeFiles(outputPath, ...inputPaths) {
  const writeStream = createWriteStream(outputPath);

  for (const inputPath of inputPaths) {
    // end: false keeps writeStream open so the next file can append to it
    await pipeline(createReadStream(inputPath), writeStream, { end: false });
  }

  writeStream.end();
}

// Combine part1.txt + part2.txt + part3.txt → merged.txt
await mergeFiles('merged.txt', 'part1.txt', 'part2.txt', 'part3.txt');

Stream Events Reference

const readStream = createReadStream('file.txt');

// Important events:
readStream.on('data', (chunk) => {});     // Received data chunk
readStream.on('end', () => {});            // No more data
readStream.on('error', (err) => {});       // Something went wrong
readStream.on('close', () => {});          // Stream closed
readStream.on('ready', () => {});          // Ready to be read

// Flow control (for backpressure):
readStream.pause();   // Stop emitting data
readStream.resume();  // Resume emitting data

const writeStream = createWriteStream('out.txt');
writeStream.on('drain', () => {});         // Safe to write again (was full)
writeStream.on('finish', () => {});        // All data written
writeStream.on('error', (err) => {});      // Write error
writeStream.on('pipe', (src) => {});       // Source piped in
writeStream.on('unpipe', (src) => {});     // Source unpiped
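Here's a minimal sketch of what that flow control looks like when you handle backpressure by hand instead of using pipe(): write() returns false when the destination's buffer is full, so you pause the source until drain fires. File names are placeholders:

const { createReadStream, createWriteStream } = require('fs');

const source = createReadStream('big-input.txt');
const dest = createWriteStream('big-output.txt');

source.on('data', (chunk) => {
  // write() returns false when the internal buffer is full
  const canKeepWriting = dest.write(chunk);
  if (!canKeepWriting) {
    source.pause();                           // stop reading until the buffer drains
    dest.once('drain', () => source.resume()); // then pick up where we left off
  }
});

source.on('end', () => dest.end());
source.on('error', (err) => { console.error(err); dest.destroy(err); });
dest.on('error', (err) => { console.error(err); source.destroy(err); });

This is essentially what pipe() and pipeline() do for you internally.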

Quick Decision Guide

Scenario → Use
Read/write large files → createReadStream / createWriteStream
Chain operations → pipeline() or .pipe()
Modify data in transit → Transform stream
Compress/decompress → zlib streams (createGzip, createGunzip)
HTTP file download → readStream.pipe(response)
Parse large JSON/XML → stream-json or a custom Transform
Merge multiple sources → sequential pipeline() calls
Backpressure handling → drain event + write() return value

What's your favorite use case for streams? Share it below!

Follow @armorbreak for more Node.js content.
