Node.js Streams: The Practical Guide

Alex Chen

Processing 10GB files? Don't load them into memory. Use streams.

What Are Streams?

Without streams (loads everything into memory):
  File (10GB) → [RAM: 10GB] → Process → [RAM: 10GB] → Output
  ❌ Memory explosion!

With streams (process chunk by chunk):
  File (10GB) → [64KB chunk] → Process → [64KB chunk] → Output
  ✅ Constant memory usage!
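
You can watch this on your own machine: print the process's resident memory while a large file streams through. A minimal sketch (the file path is a placeholder; point it at any large file):

const fs = require('fs');

const stream = fs.createReadStream('big-file.txt');

stream.on('data', () => {
  // RSS stays roughly flat no matter how big the file is
  const mb = (process.memoryUsage().rss / 1024 / 1024).toFixed(1);
  process.stdout.write(`\rRSS: ${mb} MB`);
});

stream.on('end', () => console.log('\nDone.'));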

Four Types of Streams

const fs = require('fs');
const net = require('net');
const { Transform } = require('stream');

// 1. Readable — data flows OUT
const readable = fs.createReadStream('big-file.txt');

// 2. Writable — data flows IN
const writable = fs.createWriteStream('output.txt');

// 3. Duplex — both in and out (like a TCP socket)
const duplex = new net.Socket();

// 4. Transform — modify data as it passes through
const transform = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

Reading Files

const fs = require('fs');

// ❌ Don't do this for large files:
// const data = fs.readFileSync('huge.csv', 'utf8'); // Loads ALL into RAM

// ✅ Stream instead:
const readStream = fs.createReadStream('huge.csv', {
  highWaterMark: 64 * 1024, // 64KB chunks (default)
  encoding: 'utf8',
});

let lineCount = 0;

readStream.on('data', (chunk) => {
  // Count newlines only: a line can be split across two chunks,
  // so counting split() pieces per chunk would over-count
  lineCount += chunk.split('\n').length - 1;
});

readStream.on('end', () => {
  console.log(`Total lines: ${lineCount}`);
});

readStream.on('error', (err) => {
  console.error('Read error:', err);
});
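
The same count also works with for await...of, since readable streams are async iterables in modern Node. A minimal alternative sketch:

const fs = require('fs');

async function countLines(path) {
  let lineCount = 0;
  // Each iteration yields one chunk, same as the 'data' event
  for await (const chunk of fs.createReadStream(path, { encoding: 'utf8' })) {
    lineCount += chunk.split('\n').length - 1;
  }
  return lineCount;
}

countLines('huge.csv').then((n) => console.log(`Total lines: ${n}`));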

Writing Files

const writeStream = fs.createWriteStream('output.txt');

writeStream.write('Hello, ');
writeStream.write('World!\n');
writeStream.end('Done.');

// Events
writeStream.on('finish', () => console.log('All data written'));
writeStream.on('error', (err) => console.error('Write error:', err));

// Backpressure handling (separate snippet; you can't write() after end())
const canWriteMore = writeStream.write('Some data...');
if (!canWriteMore) {
  // Buffer is full! Wait for drain event
  writeStream.once('drain', () => {
    writeStream.write('More data');
  });
}
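
Inside a loop, that one-off drain handler gets awkward. A minimal sketch of a write loop that awaits 'drain' whenever write() returns false (the function name and output path are made up for illustration):

const fs = require('fs');

async function writeManyLines(path, count) {
  const out = fs.createWriteStream(path);

  for (let i = 0; i < count; i++) {
    const ok = out.write(`line ${i}\n`);
    if (!ok) {
      // Buffer is full: pause the loop until the stream drains
      await new Promise((resolve) => out.once('drain', resolve));
    }
  }

  out.end();
}

writeManyLines('big-output.txt', 1_000_000);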

Pipe — The Magic Operator

// Pipe connects readable → writable automatically!
// Handles backpressure for you!

// Copy a file
fs.createReadStream('input.txt').pipe(fs.createWriteStream('output.txt'));

// Compress a file
const zlib = require('zlib');
fs.createReadStream('log.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('log.txt.gz'));

// HTTP response streaming
const http = require('http');
const server = http.createServer((req, res) => {
  fs.createReadStream('video.mp4').pipe(res);
});
server.listen(3000);
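
One caveat: pipe() handles backpressure, but it does not forward errors between streams. Each stage in a chain needs its own handler (or use pipeline, shown later). A sketch on the compression example:

const fs = require('fs');
const zlib = require('zlib');

// pipe() does not propagate errors, so attach a handler to each stage
fs.createReadStream('log.txt')
  .on('error', (err) => console.error('Read failed:', err))
  .pipe(zlib.createGzip())
  .on('error', (err) => console.error('Gzip failed:', err))
  .pipe(fs.createWriteStream('log.txt.gz'))
  .on('error', (err) => console.error('Write failed:', err));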

Transform Streams

const { Transform } = require('stream');

// CSV → JSON converter
class CsvToJson extends Transform {
  constructor(options) {
    super({ ...options, readableObjectMode: true });
    this.buffer = '';
    this.headers = null;
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();

    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep incomplete line

    for (const line of lines) {
      if (!line.trim()) continue;

      const values = line.split(',');

      if (!this.headers) {
        this.headers = values;
        continue;
      }

      const obj = {};
      this.headers.forEach((h, i) => obj[h.trim()] = values[i]?.trim());
      this.push(obj);
    }

    callback();
  }

  _flush(callback) {
    // Emit the last buffered line when input ends (it may lack a trailing \n)
    const line = this.buffer.trim();
    if (line && this.headers) {
      const values = line.split(',');
      const obj = {};
      this.headers.forEach((h, i) => (obj[h.trim()] = values[i]?.trim()));
      this.push(obj);
    }
    callback();
  }
}

// Usage
fs.createReadStream('users.csv')
  .pipe(new CsvToJson())
  .on('data', (user) => console.log(user));

Practical Examples

Process Large CSV File

const fs = require('fs');
const { pipeline } = require('stream/promises');
const { Transform } = require('stream');

async function processLargeCsv(inputFile, outputFile) {
  const filter = new Transform({
    objectMode: true,
    transform(row, encoding, callback) {
      if (Number(row.price) > 100) {
        this.push(JSON.stringify(row) + '\n');
      }
      callback();
    }
  });

  await pipeline(
    fs.createReadStream(inputFile),
    new CsvToJson(),     // From previous example
    filter,              // Only expensive items
    fs.createWriteStream(outputFile)
  );

  console.log('Processing complete!');
}
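
Calling it (file names here are placeholders):

processLargeCsv('products.csv', 'expensive-items.jsonl')
  .catch((err) => console.error('Processing failed:', err));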

HTTP File Upload Stream

const http = require('http');
const fs = require('fs');

const server = http.createServer((req, res) => {
  if (req.method === 'POST' && req.url === '/upload') {
    const fileStream = fs.createWriteStream('uploaded.dat');

    req.pipe(fileStream);

    fileStream.on('finish', () => {
      const stats = fs.statSync('uploaded.dat');
      res.writeHead(200, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify({ size: stats.size, message: 'Upload complete' }));
    });

    fileStream.on('error', () => {
      res.writeHead(500);
      res.end('Upload failed');
    });
  }
});

server.listen(3000);
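
The same route reads cleaner with the promise-based pipeline covered below, which centralizes the error handling (a sketch; the output filename is a placeholder):

const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream/promises');

const server = http.createServer(async (req, res) => {
  if (req.method === 'POST' && req.url === '/upload') {
    try {
      // pipeline destroys both streams and rejects if anything fails
      await pipeline(req, fs.createWriteStream('uploaded.dat'));
      res.writeHead(200, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify({ message: 'Upload complete' }));
    } catch (err) {
      res.writeHead(500);
      res.end('Upload failed');
    }
  }
});

server.listen(3000);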

Stream Merge — Multiple Files into One

const { PassThrough } = require('stream');

function mergeStreams(streams) {
  const pass = new PassThrough();
  let i = 0;

  function next() {
    if (i >= streams.length) {
      pass.end();
      return;
    }
    streams[i++].pipe(pass, { end: false }).on('end', next);
  }

  next();
  return pass;
}

// Merge multiple log files
const merged = mergeStreams([
  fs.createReadStream('app1.log'),
  fs.createReadStream('app2.log'),
  fs.createReadStream('app3.log'),
]);

merged.pipe(fs.createWriteStream('all-logs.log'));

async/await with Streams

const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream/promises');

// Clean API for piping with error handling
async function compressFile(input, output) {
  try {
    await pipeline(
      fs.createReadStream(input),
      zlib.createGzip(),
      fs.createWriteStream(output)
    );
    console.log('Compressed successfully!');
  } catch (err) {
    console.error('Compression failed:', err);
  }
}
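
pipeline also accepts async generator functions as stages in modern Node, which is often the quickest way to write a one-off transform. A minimal sketch (upperCase and the file names are made up for illustration):

const fs = require('fs');
const { pipeline } = require('stream/promises');

// An async generator works as a transform stage in pipeline()
async function* upperCase(source) {
  for await (const chunk of source) {
    yield chunk.toString().toUpperCase();
  }
}

async function shout(input, output) {
  await pipeline(
    fs.createReadStream(input),
    upperCase,
    fs.createWriteStream(output)
  );
}

shout('input.txt', 'output-upper.txt').catch(console.error);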

Quick Reference

Method                     Description
readable.pipe(writable)    Connect streams (auto backpressure)
readable.on('data', fn)    Handle each chunk
readable.on('end', fn)     Stream finished
writable.write(data)       Write data (returns false when the buffer is full)
writable.end()             Close the stream
pipeline(...streams)       Pipe with proper error handling
Transform                  Modify data in-flight
PassThrough                Pass data through unchanged

Have you used streams in production? What for?

Follow @armorbreak for more Node.js content.
