Alex Chen

Node.js Streams: The Practical Guide

Streams are a big part of how a single Node process can serve 10k+ concurrent connections without exhausting memory. Here's how they work.

What Are Streams?

Without Streams (buffering):
File (100MB) → Read ALL into memory → Process → Write ALL to output
Memory usage: 100MB+ 💀

With Streams:
File → Read chunk by chunk → Process each chunk → Write each chunk
Memory usage: ~64KB per chunk ✅
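
To feel the difference, here are both approaches to copying a file; this is a minimal sketch, with bigfile.txt standing in for any large file:

const fs = require('fs');

// Buffered copy: the entire file is held in memory at once
fs.readFile('bigfile.txt', (err, data) => {
  if (err) throw err;
  fs.writeFile('copy-buffered.txt', data, (err) => {
    if (err) throw err;
  });
});

// Streamed copy: only one ~64KB chunk is in memory at a time
fs.createReadStream('bigfile.txt')
  .pipe(fs.createWriteStream('copy-streamed.txt'));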

Four Types of Streams

// 1. Readable — Data comes OUT
const fs = require('fs');
const readable = fs.createReadStream('bigfile.txt');

// 2. Writable — Data goes IN
const writable = fs.createWriteStream('output.txt');

// 3. Duplex — Both directions (TCP socket)
const net = require('net');
const socket = new net.Socket(); // Readable AND Writable

// 4. Transform — Modify data in transit
const { Transform } = require('stream');
const uppercase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
});
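
The uppercase transform drops straight into a pipe chain; for example, as a tiny stdin-to-stdout filter:

// echo "hello" | node uppercase.js  ->  HELLO
process.stdin.pipe(uppercase).pipe(process.stdout);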

Basic Usage

Reading a File

const fs = require('fs');

const readable = fs.createReadStream('large-file.txt', {
  encoding: 'utf8',    // Auto-convert chunks to strings
  highWaterMark: 64 * 1024 // Chunk size: 64KB (default)
});

readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
  // Process chunk here
});

readable.on('end', () => {
  console.log('Done reading');
});

readable.on('error', (err) => {
  console.error('Error:', err);
});

Writing a File

const fs = require('fs');
const writable = fs.createWriteStream('output.txt');

writable.write('Hello, ');
writable.write('world!\n');
writable.end(); // Close the stream

writable.on('finish', () => {
  console.log('All writes completed');
});

writable.on('error', (err) => {
  console.error('Error:', err);
});
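
One detail worth knowing: write() returns false once the stream's internal buffer (highWaterMark) fills up, and a well-behaved producer pauses until 'drain' fires. A minimal sketch of that loop (pipe, covered next, does this for you):

const fs = require('fs');
const writable = fs.createWriteStream('output.txt');

let i = 0;
function writeRows() {
  let ok = true;
  while (i < 1e6 && ok) {
    ok = writable.write(`row ${i++}\n`); // false = internal buffer is full
  }
  if (i < 1e6) {
    writable.once('drain', writeRows); // resume once the buffer empties
  } else {
    writable.end();
  }
}
writeRows();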

Piping (The Magic)

const fs = require('fs');
const zlib = require('zlib');

// One-liner that handles everything!
fs.createReadStream('large-file.txt')
  .pipe(zlib.createGzip())           // Compress on the fly
  .pipe(fs.createWriteStream('file.txt.gz'));

// No memory issues regardless of file size!
// Back-pressure handled automatically!

// HTTP response streaming:
const http = require('http');
http.createServer((req, res) => {
  fs.createReadStream('video.mp4')
    .pipe(res); // Stream video to browser without loading it all into RAM
}).listen(3000);

Pipe vs pipeline

// pipe (older; an error in one stream doesn't clean up the others)
readable.pipe(writable);

// pipeline (newer, recommended): forwards errors and destroys all streams on failure
const { pipeline } = require('stream/promises');

async function processFile() {
  try {
    await pipeline(
      fs.createReadStream('input.txt'),
      zlib.createGzip(),
      fs.createWriteStream('output.gz')
    );
    console.log('Pipeline succeeded');
  } catch (err) {
    console.error('Pipeline failed:', err);
  }
}
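
Outside an async context, the callback variant from require('stream') takes the error handler as its final argument:

const { pipeline } = require('stream');

pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('output.gz'),
  (err) => {
    if (err) console.error('Pipeline failed:', err);
    else console.log('Pipeline succeeded');
  }
);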

Custom Transform Streams

const { Transform } = require('stream');

// CSV to JSON converter
class CsvToJson extends Transform {
  constructor(options) {
    // Buffers/strings in, JS objects out
    super({ ...options, readableObjectMode: true });
    this.headers = null;
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep incomplete line in buffer

    for (const line of lines) {
      this._parseLine(line);
    }

    callback();
  }

  _flush(callback) {
    // Don't drop a final row that lacks a trailing newline
    this._parseLine(this.buffer);
    callback();
  }

  _parseLine(line) {
    if (!line.trim()) return;

    if (!this.headers) {
      this.headers = line.split(',');
      return;
    }

    const values = line.split(',');
    const obj = {};
    this.headers.forEach((h, i) => obj[h.trim()] = values[i]?.trim());
    this.push(obj); // Output as JS objects
  }
}

// Usage:
fs.createReadStream('data.csv')
  .pipe(new CsvToJson())
  .on('data', (row) => {
    console.log(row); // { name: "Alex", age: "30", city: "NYC" }
  });

Practical Examples

Log Rotator (Read + Write + Rotate)

const fs = require('fs');
const path = require('path');
const { Writable } = require('stream');

function createLogRotator(maxSizeBytes = 5 * 1024 * 1024) {
  let currentSize = 0;
  let fileNum = 1;

  function getOutputPath() {
    return path.join(__dirname, `logs/app-${fileNum}.log`);
  }

  const logStream = new Writable({
    write(chunk, encoding, callback) {
      currentSize += chunk.length;

      if (currentSize > maxSizeBytes) {
        fileNum++;
        currentSize = 0;
        // Could emit event to switch file here
      }

      // Sync append keeps the demo simple; prefer async fs.appendFile in production
      fs.appendFileSync(getOutputPath(), chunk);
      callback();
    }
  });

  return logStream;
}

// stdin is readable, so log lines piped into this process get rotated
process.stdin.pipe(createLogRotator());

API Response Stream

const http = require('http');

http.createServer(async (req, res) => {
  res.writeHead(200, {
    'Content-Type': 'application/json',
    'Transfer-Encoding': 'chunked' // Streaming!
  });

  // Stream database results as they come
  const dbStream = getDatabaseCursor(); // Hypothetical cursor

  res.write('[\n');
  let first = true;

  for await (const row of dbStream) {
    if (!first) res.write(',\n');
    first = false;
    res.write(JSON.stringify(row));
  }

  res.write('\n]');
  res.end();

}).listen(3000);

// Client receives data incrementally instead of waiting for all results!
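
A tidier sketch of the same endpoint feeds an async generator into pipeline, so back-pressure and error cleanup come for free (getDatabaseCursor is still the hypothetical cursor from above):

const { pipeline } = require('stream/promises');

async function* toJsonArray(rows) {
  yield '[\n';
  let first = true;
  for await (const row of rows) {
    yield (first ? '' : ',\n') + JSON.stringify(row);
    first = false;
  }
  yield '\n]';
}

http.createServer(async (req, res) => {
  res.writeHead(200, { 'Content-Type': 'application/json' });
  await pipeline(toJsonArray(getDatabaseCursor()), res);
}).listen(3000);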

Real-time File Watcher

const fs = require('fs');
const readline = require('readline');

function tailFile(filePath) {
  const stream = fs.createReadStream(filePath, {
    start: 0,
    encoding: 'utf8'
  });

  const rl = readline.createInterface({
    input: stream,
    crlfDelay: Infinity
  });

  rl.on('line', (line) => {
    // Processes each existing line as it streams off disk (this reads the
    // file once; it does not follow new writes, see the sketch below)
    const logEntry = JSON.parse(line); // Assumes one JSON object per line
    if (logEntry.level === 'error') {
      alertTeam(logEntry); // Hypothetical alerting hook
    }
  });
}

tailFile('/var/log/app.log');
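
For a true tail that follows new writes, one rough approach (a sketch, not production code) is to poll with fs.watchFile and stream only the appended bytes:

const fs = require('fs');

function tailAppends(filePath, onChunk) {
  let offset = fs.statSync(filePath).size; // start at the current end of file

  fs.watchFile(filePath, { interval: 500 }, (curr) => {
    if (curr.size <= offset) return; // unchanged (or truncated)
    fs.createReadStream(filePath, {
      start: offset,
      end: curr.size - 1,
      encoding: 'utf8'
    }).on('data', onChunk);
    offset = curr.size;
  });
}

tailAppends('/var/log/app.log', (chunk) => process.stdout.write(chunk));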

Memory Comparison

// ❌ Buffering (don't do this with large files!)
const fs = require('fs');
const data = fs.readFileSync('huge-file.json'); // Loads ENTIRE file into RAM
const parsed = JSON.parse(data);               // Duplicates in memory
const result = parsed.filter(item => item.active); // Another copy!
// Memory: 3x the file size

// ✅ Streaming (constant memory!)
const { Transform } = require('stream');
const JSONStream = require('JSONStream'); // npm install JSONStream

fs.createReadStream('huge-file.json')
  .pipe(JSONStream.parse('*'))     // Parse item by item
  .pipe(new Transform({            // Filter without buffering everything
    objectMode: true,              // Items are parsed objects, not buffers
    transform(item, enc, cb) {
      if (item.active) cb(null, item);
      else cb(); // Skip
    }
  }));
// Memory: roughly one chunk/item at a time, regardless of file size!

Quick Reference

Method                     Use Case
.pipe(dest)                Connect streams
pipeline(...)              Error-safe piping
Readable.from(iterable)    Create a readable from an array or async iterator
new Transform()            Modify data in-flight
objectMode: true           Pass objects instead of buffers
highWaterMark              Control buffer size
.on('data')                Handle incoming chunks
.on('end')                 Stream finished
.on('error')               Handle errors
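
One entry worth calling out: Readable.from turns any iterable or async iterable into a readable stream:

const { Readable } = require('stream');

async function* generate() {
  yield 'hello, ';
  yield 'streams!\n';
}

Readable.from(generate()).pipe(process.stdout);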

What's your favorite use case for Node.js streams?

Follow @armorbreak for more Node.js content.
