DEV Community

Alex Chen

Node.js Streams: The Practical Guide (2026)

Streams are one of Node.js's most powerful features. Here's how to actually use them.

Why Streams Matter

Without streams:
  read entire file into memory → process → write entire result
  Memory usage: 100MB for a 100MB file
  Problem: What if the file is 10GB?

With streams:
  read chunk by chunk → process each chunk → write chunk by chunk
  Memory usage: roughly one chunk at a time (~64KB by default for file streams), regardless of file size
  Result: Process files larger than RAM
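
To make the comparison concrete, here is a minimal sketch that walks a file chunk by chunk and records the largest chunk it ever held. The helper names (`countBytes`, `demo`) and the 1 MiB temp file are assumptions made for illustration; the 64 KiB ceiling comes from the default `highWaterMark` of `fs.createReadStream()`.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Count the bytes in a file while holding only one chunk at a time.
async function countBytes(filePath) {
  let total = 0;
  let largestChunk = 0;

  for await (const chunk of fs.createReadStream(filePath)) {
    total += chunk.length;
    largestChunk = Math.max(largestChunk, chunk.length);
  }

  return { total, largestChunk };
}

// Hypothetical demo: create a 1 MiB temp file, measure it, then clean up.
async function demo() {
  const file = path.join(os.tmpdir(), `streams-demo-${process.pid}.bin`);
  fs.writeFileSync(file, Buffer.alloc(1024 * 1024));
  try {
    return await countBytes(file);
  } finally {
    fs.unlinkSync(file);
  }
}
```

Calling `demo().then(console.log)` should report the full file size as `total`, while `largestChunk` stays at or below the chunk size no matter how large the file is.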

The 4 Stream Types

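| Type      | Used For                       | Example                                   |
|-----------|--------------------------------|-------------------------------------------|
| Readable  | Reading data                   | fs.createReadStream(), HTTP requests      |
| Writable  | Writing data                   | fs.createWriteStream(), HTTP responses    |
| Duplex    | Both reading & writing         | TCP sockets, WebSocket                    |
| Transform | Modify data between read/write | zlib, CSV parsing                         |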
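
As a rough illustration of the four types, the sketch below builds a tiny in-memory source, transform, and sink and connects them. All helper names (`makeSource`, `makeUpperCase`, `makeSink`, `run`) are made up for this example; the two `instanceof` checks just confirm that TCP sockets and Transform streams are both Duplex streams.

```javascript
const { Readable, Writable, Duplex, Transform } = require('stream');
const { pipeline } = require('stream/promises');
const net = require('net');

// Readable: a source. Readable.from() wraps any iterable.
function makeSource() {
  return Readable.from(['a', 'b', 'c']);
}

// Transform: a Duplex whose output is computed from its input.
function makeUpperCase() {
  return new Transform({
    objectMode: true,
    transform(chunk, _enc, callback) {
      callback(null, String(chunk).toUpperCase());
    }
  });
}

// Writable: a sink. This one collects chunks into an array.
function makeSink(received) {
  return new Writable({
    objectMode: true,
    write(chunk, _enc, callback) {
      received.push(chunk);
      callback();
    }
  });
}

// Connect source -> transform -> sink and return what the sink received.
async function run() {
  const received = [];
  await pipeline(makeSource(), makeUpperCase(), makeSink(received));
  return received;
}

// Duplex: readable and writable at once. TCP sockets are the classic case,
// and every Transform is also a Duplex.
const socketIsDuplex = net.Socket.prototype instanceof Duplex;
const transformIsDuplex = makeUpperCase() instanceof Duplex;
```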

Basic Example: File Copy

const fs = require('fs');

// ❌ BAD — Loads everything into memory
const data = fs.readFileSync('large-file.txt');
fs.writeFileSync('copy.txt', data);

// ✅ GOOD — Streams in chunks (64KB memory)
const readStream = fs.createReadStream('large-file.txt');
const writeStream = fs.createWriteStream('copy.txt');

readStream.pipe(writeStream);

// That's it. One line.

pipe() — The Magic Method

readableStream.pipe(writableStream);

What it does:

  1. Listens for data events from readable
  2. Writes each chunk to writable
  3. Handles backpressure (slows down if writable can't keep up)
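  4. Ends writable when readable ends (it does not forward errors; see Error Handling in Streams below)

const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');

// Chaining pipes (Unix philosophy in JavaScript)
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())           // Compress
  .pipe(crypto.createCipheriv(...))  // Encrypt (arguments elided: algorithm, key, iv)
  .pipe(fs.createWriteStream('output.gz.enc'));  // Write

// Data flows through each transform in the order written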
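
The chain above leaves the cipher arguments out. One way to fill them in is sketched below, purely as an assumption for illustration: AES-256-CBC, a caller-supplied 32-byte key, and a random 16-byte IV that is returned to the caller. The function names are hypothetical, and CBC on its own does not authenticate the data, so treat this as a demonstration of chaining rather than a recommended encryption scheme.

```javascript
const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');
const { pipeline } = require('stream/promises');

// Assumption for illustration: AES-256-CBC with a caller-supplied 32-byte key.
// The random IV is returned so the caller can store it next to the output.
async function compressAndEncrypt(inputPath, outputPath, key) {
  const iv = crypto.randomBytes(16);
  await pipeline(
    fs.createReadStream(inputPath),
    zlib.createGzip(),                              // Compress
    crypto.createCipheriv('aes-256-cbc', key, iv),  // Encrypt
    fs.createWriteStream(outputPath)                // Write
  );
  return iv;
}

// The reverse chain: decrypt first, then decompress.
async function decryptAndDecompress(inputPath, outputPath, key, iv) {
  await pipeline(
    fs.createReadStream(inputPath),
    crypto.createDecipheriv('aes-256-cbc', key, iv),
    zlib.createGunzip(),
    fs.createWriteStream(outputPath)
  );
}
```

This version uses `pipeline()` instead of chained `.pipe()` calls so that a failure in any stage rejects the promise and destroys the other streams.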

Real-World Use Cases

1. Processing Large Files (CSV Example)

const fs = require('fs');
const { pipeline } = require('stream/promises'); // Node 15+
const { Transform } = require('stream');

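// A transform that parses CSV rows into objects.
// Note: splitting on ',' does not handle quoted fields; use a CSV library for those.
class CSVParse extends Transform {
  constructor(options) {
    // Bytes in, objects out
    super({ ...options, readableObjectMode: true });
    this.buffer = '';
    this.header = null;
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split(/\r?\n/);

    // Keep last incomplete line in buffer
    this.buffer = lines.pop();

    for (const line of lines) this._parseLine(line);
    callback();
  }

  _flush(callback) {
    // Emit the final row when the file does not end with a newline
    this._parseLine(this.buffer);
    this.buffer = '';
    callback();
  }

  _parseLine(line) {
    if (!line.trim()) return;

    if (!this.header) {
      this.header = line.split(',');
      return;
    }

    const values = line.split(',');
    const row = {};
    this.header.forEach((h, i) => { row[h] = values[i]; });
    this.push(row); // Output object mode
  }
}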

async function processLargeCSV(inputPath) {
  const results = [];

  await pipeline(
    fs.createReadStream(inputPath),
    new CSVParse(),
    new Transform({
      objectMode: true,
      transform(row, _enc, callback) {
        // Process each row independently
        if (Number(row.price) > 1000) {
          results.push(row);
        }
        callback();
      }
    })
  );

  console.log(`Found ${results.length} expensive items`);
  return results;
}

2. HTTP Request Streaming

const https = require('https');
const fs = require('fs');
const { pipeline } = require('stream/promises');

// Download large file with progress
function download(url, destPath) {
  return new Promise((resolve, reject) => {
    https.get(url, (response) => {
      const { statusCode, headers } = response;

      if (statusCode >= 300 && statusCode < 400 && headers.location) {
        // Handle redirect. Location may be relative, so resolve it against the current URL.
        response.resume(); // Discard the body so the socket is released
        const nextUrl = new URL(headers.location, url).href;
        download(nextUrl, destPath).then(resolve, reject);
        return;
      }

      if (statusCode !== 200) {
        response.resume();
        reject(new Error(`Status: ${statusCode}`));
        return;
      }

      const totalSize = parseInt(headers['content-length'], 10);
      let downloadedBytes = 0;

      response.on('data', (chunk) => {
        downloadedBytes += chunk.length;

        if (totalSize) {
          const percent = ((downloadedBytes / totalSize) * 100).toFixed(1);
          process.stdout.write(`\rDownloaded: ${percent}% (${downloadedBytes}/${totalSize})`);
        }
      });

      // Open the file only once there is a body to write.
      // pipeline() forwards errors from either stream and cleans both up.
      pipeline(response, fs.createWriteStream(destPath))
        .then(() => {
          console.log('\nDownload complete!');
          resolve();
        })
        .catch(reject);
    }).on('error', reject);
  });
}

// CommonJS has no top-level await, so handle the promise explicitly
download('https://example.com/large-file.zip', './file.zip')
  .catch((err) => console.error('Download failed:', err.message));

3. API Response Streaming

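const express = require('express');
const { Readable } = require('stream');
const { pipeline } = require('stream/promises');
const app = express();

// Stream a collection as NDJSON (one JSON object per line)
app.get('/api/users-stream', (_req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');

  // Simulate streaming from database
  const users = [
    { id: 1, name: 'Alice' },
    { id: 2, name: 'Bob' },
    // ... thousands more
  ];

  // NDJSON has no surrounding brackets and no commas between records:
  // each line is a complete JSON document.
  function* toLines(rows) {
    for (const row of rows) yield JSON.stringify(row) + '\n';
  }

  // pipeline() respects backpressure and ends the response when done
  pipeline(Readable.from(toLines(users)), res).catch((err) => {
    console.error('Response stream failed:', err.message);
  });
});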

// Client can parse incrementally:
// Each line arrives as soon as it's written
// No need to wait for full response
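
On the consuming side, incremental parsing might look like the sketch below. It assumes the body is available as a Node.js Readable (for instance the response object passed to an `http.get()` callback); the name `parseNdjson` is hypothetical. `readline` takes care of lines that are split across network chunks.

```javascript
const readline = require('readline');

// Yield one parsed object per NDJSON line from any Readable stream.
async function* parseNdjson(readable) {
  const lines = readline.createInterface({ input: readable, crlfDelay: Infinity });

  for await (const line of lines) {
    if (line.trim()) yield JSON.parse(line); // Skip blank lines
  }
}
```

A caller can then write `for await (const user of parseNdjson(response)) { ... }` and handle each record as soon as its line is complete.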

4. Log Aggregation (Multiple Sources → One File)

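const fs = require('fs');
const net = require('net');
const { pipeline } = require('stream');

// splitStream(), timestampTransform() and formatTransform() are placeholders
// for Transform factories you define yourself.

// Server that accepts log lines from multiple sources
const server = net.createServer((socket) => {
  // pipeline() instead of .pipe(): a dropped connection must not crash the server
  pipeline(
    socket,
    splitStream(),         // Split by newline
    timestampTransform(),  // Add timestamp
    formatTransform(),     // Format nicely
    fs.createWriteStream('all-logs.log', { flags: 'a' }), // Append
    (err) => {
      if (err) console.error('Log connection failed:', err.message);
    }
  );
});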

server.listen(4000, () => console.log('Log aggregator on :4000'));
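
The log aggregator above calls `splitStream()`, `timestampTransform()` and `formatTransform()` without defining them. What follows is one possible sketch of those helpers, not the author's implementation: the `{ time, message }` entry shape, the output format, and the optional `now` parameter are all assumptions chosen for illustration.

```javascript
const { Transform } = require('stream');
const { StringDecoder } = require('string_decoder');

// Hypothetical helper: re-chunk incoming bytes into one string per line.
function splitStream() {
  const decoder = new StringDecoder('utf8'); // Safe for multi-byte characters
  let buffered = '';

  return new Transform({
    readableObjectMode: true, // Bytes in, one string per line out
    transform(chunk, _enc, callback) {
      buffered += decoder.write(chunk);
      const lines = buffered.split(/\r?\n/);
      buffered = lines.pop(); // Keep the incomplete last line
      for (const line of lines) {
        if (line) this.push(line);
      }
      callback();
    },
    flush(callback) {
      buffered += decoder.end();
      if (buffered) this.push(buffered); // Final line without a newline
      callback();
    }
  });
}

// Hypothetical helper: wrap each line in an object with an ISO timestamp.
function timestampTransform(now = () => new Date()) {
  return new Transform({
    objectMode: true,
    transform(line, _enc, callback) {
      callback(null, { time: now().toISOString(), message: line });
    }
  });
}

// Hypothetical helper: turn each entry back into a line of text.
function formatTransform() {
  return new Transform({
    writableObjectMode: true, // Objects in, text out
    transform(entry, _enc, callback) {
      callback(null, `[${entry.time}] ${entry.message}\n`);
    }
  });
}
```

Passing the clock in as `now` is only there to make the transform easy to test; in the server it can be called with no arguments.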

Error Handling in Streams

const fs = require('fs');

const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');

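// ❌ pipe() does not forward errors. An 'error' event with no listener
// is thrown and crashes the process, and the other stream is left open.
readStream.pipe(writeStream);

// ✅ Handle errors on every stream and clean up the other side
// (an alternative to the line above, not an addition to it)
readStream
  .on('error', (err) => {
    console.error('Read error:', err.message);
    writeStream.destroy(); // Don't leave writer hanging
  })
  .pipe(writeStream)
  .on('error', (err) => {
    console.error('Write error:', err.message);
    readStream.destroy(); // Stop reading if the writer failed
  })
  .on('finish', () => {
    console.log('Done!');
  });

// Or use pipeline (promise version available since Node 15):
const { pipeline } = require('stream/promises');
const { PassThrough } = require('stream');

async function copyWithPipeline() {
  const myTransform = new PassThrough(); // Stand-in for your own Transform

  try {
    await pipeline(
      fs.createReadStream('input.txt'),
      myTransform,
      fs.createWriteStream('output.txt')
    );
    console.log('Pipeline completed successfully');
  } catch (err) {
    console.error('Pipeline failed:', err.message);
  }
}

copyWithPipeline();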

Backpressure Explained

// Backpressure = when writable is slower than readable
// Good news: pipe() handles this automatically!

// Manual backpressure:
function manualBackpressure(readable, writable) {
  readable.on('data', (chunk) => {
    const canWrite = writable.write(chunk);

    if (!canWrite) {
      // Pause readable until writable drains
      readable.pause();
      writable.once('drain', () => {
        readable.resume();
      });
    }
  });

  // Finish the writable once the source is exhausted
  readable.on('end', () => writable.end());
}

// But seriously, just use .pipe() or pipeline()
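
Backpressure also matters when the data comes from your own loop rather than from a readable stream, since there is nothing to `pipe()`. A minimal sketch of that case, assuming a hypothetical helper named `writeAll`:

```javascript
const { once } = require('events');
const { finished } = require('stream/promises');

// Write every chunk to `writable`, waiting for 'drain' whenever it asks us to.
async function writeAll(writable, chunks) {
  for (const chunk of chunks) {
    // write() returns false once the internal buffer reaches highWaterMark
    if (!writable.write(chunk)) {
      await once(writable, 'drain');
    }
  }

  writable.end();
  await finished(writable); // Resolves after everything has been flushed
}
```

Because `events.once()` rejects if the stream emits `'error'` while waiting, a failing destination surfaces as a rejected promise instead of a hang.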

Creating Your Own Transform

const { Transform } = require('stream');

// Uppercase transform
class UpperCase extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

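// JSON Lines parser (handles lines split across chunks)
class JSONParser extends Transform {
  constructor(options) {
    // Bytes in, objects out
    super({ ...options, readableObjectMode: true });
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep the incomplete last line

    try {
      for (const line of lines) {
        if (line.trim()) this.push(JSON.parse(line));
      }
      callback();
    } catch (err) {
      callback(err); // Malformed JSON: fail the stream instead of hiding it
    }
  }

  _flush(callback) {
    // Parse the last line if the file does not end with a newline
    try {
      if (this.buffer.trim()) this.push(JSON.parse(this.buffer));
      callback();
    } catch (err) {
      callback(err);
    }
  }
}

// Usage
const fs = require('fs');

fs.createReadStream('data.jsonl')  // One JSON object per line
  .pipe(new JSONParser())
  .on('data', (obj) => {
    console.log('Parsed:', obj);
  })
  .on('error', (err) => {
    console.error('Parse error:', err.message);
  });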

Stream vs Buffer: When to Use Which

Use BUFFER when:
→ Small data (< 10MB)
→ Need random access
→ Data must be fully loaded before processing
→ Example: Config files, API responses under 1MB

Use STREAM when:
→ Large or unknown size data
→ Processing can start before all data arrives
→ Memory efficiency matters
→ Example: File processing, API proxies, log aggregation, real-time data

Quick Reference

// Create streams
fs.createReadStream(path, { highWaterMark: 64 * 1024 })  // Chunk size
fs.createWriteStream(path, { flags: 'a' })            // Append mode
new stream.PassThrough()                               // Identity transform
new stream.Readable({ read(_size) { this.push(data); } })  // Custom readable
new stream.Writable({ write(chunk, _enc, cb) { cb(); } })  // Custom writable

// Events
stream.on('data', (chunk) => {})     // New chunk available
stream.on('end', () => {})            // No more data
stream.on('error', (err) => {})       // Something went wrong
stream.on('finish', () => {})         // Write complete
stream.on('close', () => {})          // Stream destroyed
stream.on('drain', () => {})          // Safe to write again (backpressure)

// Methods
stream.pause()                       // Pause reading
stream.resume()                      // Resume reading
stream.destroy()                     // Close and cleanup
stream.pipe(destination)             // Connect streams
stream.unpipe(destination)           // Disconnect

// Async iteration (Node 18+) — cleanest API!
for await (const chunk of readableStream) {
  processChunk(chunk);
}
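
Async iteration also combines with `pipeline()`, which accepts an async generator function as a stage. The sketch below uses one in place of a Transform class; the helper name `upperCaseAll` is made up for illustration.

```javascript
const { pipeline } = require('stream/promises');

// Hypothetical helper: upper-case everything flowing from source to destination.
async function upperCaseAll(source, destination) {
  await pipeline(
    source,
    // The generator receives the previous stage as an async iterable
    async function* (chunks) {
      for await (const chunk of chunks) {
        yield chunk.toString().toUpperCase();
      }
    },
    destination
  );
}
```

For simple per-chunk logic this tends to be shorter than subclassing Transform, while still getting the error handling and cleanup that `pipeline()` provides.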

Are you using streams enough? Or still loading everything into memory?

Follow @armorbreak for more practical Node.js guides.
