Alex Chen

Node.js Streams: The Practical Guide

Streams are one of Node.js's most powerful features. Most developers avoid them — here's how to actually use them.

What Are Streams?

Without streams:
  const data = fs.readFileSync('huge-10gb-file.csv'); // 😵 10GB in RAM!
  process(data);

With streams:
  fs.createReadStream('huge-10gb-file.csv')
    .pipe(transform)
    .pipe(writeStream);
  // Uses ~64KB of RAM regardless of file size!

The Four Stream Types

const fs = require('fs');
const net = require('net');
const { Transform } = require('stream');

// 1. Readable — data comes OUT
const readable = fs.createReadStream('input.txt');

// 2. Writable — data goes IN
const writable = fs.createWriteStream('output.txt');

// 3. Duplex — both ways (like a TCP socket)
const duplex = net.createConnection(port, host);

// 4. Transform — modify data as it passes through
const transform = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});
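
To see these working together, here's a minimal sketch (the data is made up): a Readable built from an in-memory array with Readable.from, piped through the uppercase Transform above, into process.stdout, which is a Writable.

const { Readable, Transform } = require('stream');

// A Readable built from an array (any iterable works)
const source = Readable.from(['hello\n', 'streams\n', 'world\n']);

// The same uppercase Transform as above
const upper = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

// process.stdout is a Writable: Readable → Transform → Writable
source.pipe(upper).pipe(process.stdout);
// Prints HELLO, STREAMS, WORLD on separate lines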

Reading Files (Readable)

const fs = require('fs');

// ❌ Don't read large files with readFileSync (or readFile)
const data = fs.readFileSync('big.csv', 'utf8'); // Blocks + loads the whole file in memory

// ✅ Use createReadStream
const readStream = fs.createReadStream('big.csv', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024, // 64KB chunks (default)
});

readStream.on('data', (chunk) => {
  // chunk is a string (because encoding: 'utf8')
  // Each chunk is ~64KB
  process.stdout.write(`Got chunk: ${chunk.length} chars\n`);
});

readStream.on('end', () => {
  console.log('Done reading!');
});

readStream.on('error', (err) => {
  console.error('Read error:', err);
});

Writing Files (Writable)

const fs = require('fs');

// ❌ Don't write large files with writeFileSync (or writeFile)
fs.writeFileSync('big-output.txt', hugeString); // Blocks + needs the whole string in memory!

// ✅ Use createWriteStream
const writeStream = fs.createWriteStream('big-output.txt');

writeStream.write('First line\n');
writeStream.write('Second line\n');
writeStream.end('Last line\n');

writeStream.on('finish', () => {
  console.log('Done writing!');
});

writeStream.on('error', (err) => {
  console.error('Write error:', err);
});

// Backpressure handling
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');

// pipe() handles backpressure automatically!
readStream.pipe(writeStream);
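
For reference, this is roughly what pipe() does for you: a hand-rolled copy loop that pauses the readable when write() returns false and resumes on 'drain'. A simplified sketch, not something you'd normally write yourself:

const fs = require('fs');

const readable = fs.createReadStream('input.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    // The writable's internal buffer is full: stop reading...
    readable.pause();
    // ...and resume once it has drained
    writable.once('drain', () => readable.resume());
  }
});

readable.on('end', () => writable.end());
readable.on('error', (err) => writable.destroy(err));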

Pipe: The Magic Method

// pipe() connects streams — handles backpressure automatically
// No need to manually manage data flow!

// File copy
fs.createReadStream('source.mp4')
  .pipe(fs.createWriteStream('copy.mp4'));

// HTTP response streaming
const server = http.createServer((req, res) => {
  fs.createReadStream('video.mp4').pipe(res);
  // Browser receives data as it's read — no 4GB memory spike!
});

// Process CSV line by line (run this inside an async function)
const { pipeline } = require('stream/promises');
const { Transform, Writable } = require('stream');

await pipeline(
  fs.createReadStream('data.csv'),
  // Transform each chunk (assumes one line per chunk; in real code, add a
  // line-splitting step before this transform)
  new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      const line = chunk.toString().trim();
      if (line) {
        const [name, email] = line.split(',');
        this.push({ name, email });
      }
      callback();
    }
  }),
  // Write processed data
  new Writable({
    objectMode: true,
    write(record, encoding, callback) {
      db.insert(record, callback); // db: your database client (not shown)
    }
  })
);

Transform Streams (Custom Processing)

const { Transform } = require('stream');

// Example 1: CSV to JSON
class CsvToJson extends Transform {
  constructor(options = {}) {
    super({ ...options, objectMode: true });
    this.headers = null;
  }

  _transform(chunk, encoding, callback) {
    const line = chunk.toString().trim();
    if (!line) return callback();

    const values = line.split(',');

    if (!this.headers) {
      this.headers = values; // First line is headers
      return callback();
    }

    const obj = {};
    this.headers.forEach((header, i) => {
      obj[header.trim()] = values[i]?.trim() || '';
    });

    this.push(obj);
    callback();
  }
}

// Usage (note: this assumes one CSV line per chunk; with a raw file stream, pipe through a line splitter first)
fs.createReadStream('users.csv')
  .pipe(new CsvToJson())
  .on('data', (user) => console.log(user));
// Output: { name: 'Alice', email: 'alice@example.com', role: 'admin' }
// Example 2: Compress on the fly
const { createGzip } = require('zlib');

fs.createReadStream('large-file.log')
  .pipe(createGzip())  // Transform: compress
  .pipe(fs.createWriteStream('large-file.log.gz'));
// Reads → compresses → writes — all streaming!
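
The same idea works over HTTP. A sketch (file name and port are made up): compress the response on the fly when the client says it accepts gzip.

const http = require('http');
const fs = require('fs');
const { createGzip } = require('zlib');

http.createServer((req, res) => {
  const file = fs.createReadStream('large-file.log');
  const clientAcceptsGzip = /\bgzip\b/.test(req.headers['accept-encoding'] || '');

  if (clientAcceptsGzip) {
    res.writeHead(200, { 'Content-Encoding': 'gzip' });
    file.pipe(createGzip()).pipe(res); // read → compress → send, all streaming
  } else {
    res.writeHead(200);
    file.pipe(res);
  }
}).listen(3000);
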
// Example 3: Filter stream
class FilterStream extends Transform {
  constructor(predicate) {
    super({ objectMode: true });
    this.predicate = predicate;
  }

  _transform(chunk, encoding, callback) {
    if (this.predicate(chunk)) {
      this.push(chunk); // Only pass matching items
    }
    callback();
  }
}

// Usage: filter users who are active
fs.createReadStream('users.csv')
  .pipe(new CsvToJson())
  .pipe(new FilterStream(user => user.active === 'true'))
  .pipe(new JsonToStringArray()) // serializes objects back to JSON text (a sketch follows below)
  .pipe(fs.createWriteStream('active-users.json'));
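
The pipeline above references JsonToStringArray, which isn't defined in this post. A minimal sketch of what such a transform could look like (a hypothetical helper, not a built-in): objects go in on the writable side, JSON array text comes out on the readable side, so the output can go straight into a file or HTTP response.

// Hypothetical JsonToStringArray: objects in, JSON array text out
// (Transform is already required at the top of this section)
class JsonToStringArray extends Transform {
  constructor() {
    super({ writableObjectMode: true, readableObjectMode: false });
    this.first = true;
  }

  _transform(obj, encoding, callback) {
    // Open the array on the first object, otherwise separate with a comma
    this.push((this.first ? '[\n' : ',\n') + JSON.stringify(obj));
    this.first = false;
    callback();
  }

  _flush(callback) {
    // Close the array (emit an empty one if nothing came through)
    this.push(this.first ? '[]\n' : '\n]\n');
    callback();
  }
}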

Async Iterators (Modern Approach)

// Node.js 10+ supports async iteration over streams!
// Much cleaner than event listeners

async function processLogFile(filePath) {
  const readStream = fs.createReadStream(filePath, { encoding: 'utf8' });

  for await (const chunk of readStream) {
    // Caveat: chunks are arbitrary slices of the file, so a line can be split
    // across two chunks. See the readline sketch below for true line-by-line reads.
    const lines = chunk.split('\n');
    for (const line of lines) {
      if (line.includes('ERROR')) {
        console.error(line);
      }
    }
  }
}

// With pipeline (error handling + cleanup)
const { pipeline } = require('stream/promises');

async function processCsv(inputPath, outputPath) {
  await pipeline(
    fs.createReadStream(inputPath),
    new CsvToJson(),
    new FilterStream(user => user.active === 'true'),
    new JsonToStringArray(), // serialize objects back to text before the file stream
    fs.createWriteStream(outputPath)
  );
  console.log('Processing complete!');
}

// pipeline() handles:
// - Backpressure automatically
// - Error propagation (if any stream fails, all are cleaned up)
// - Resource cleanup (closes all streams on error)
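
One caveat worth repeating: iterating a raw file stream gives you chunks, not lines, so a line can be cut in half at a chunk boundary. When you genuinely need line-by-line processing, the core readline module does the buffering for you. A sketch:

const fs = require('fs');
const readline = require('readline');

async function processLogFileByLine(filePath) {
  const rl = readline.createInterface({
    input: fs.createReadStream(filePath),
    crlfDelay: Infinity, // treat \r\n as a single line break
  });

  // rl is async-iterable and always yields complete lines
  for await (const line of rl) {
    if (line.includes('ERROR')) {
      console.error(line);
    }
  }
}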

Real-World Use Cases

// 1. Large file processing (CSV, JSON, logs)
// Process files larger than available RAM

// 2. HTTP proxying
http.createServer((req, res) => {
  http.get('http://api-backend.com' + req.url, (backendRes) => {
    backendRes.pipe(res); // Stream response directly to client
  });
});

// 3. Database streaming (db: your database client, not shown)
const queryStream = db.query('SELECT * FROM users').stream();
queryStream.pipe(new JsonToStringArray()) // serialize row objects to text
  .pipe(res); // Stream query results to HTTP response

// 4. File upload handling
app.post('/upload', (req, res) => {
  const out = fs.createWriteStream(`uploads/${Date.now()}.jpg`);
  req.pipe(out);
  out.on('finish', () => res.json({ success: true })); // respond once the file is fully flushed
});

// 5. Real-time log processing
const tailStream = tailFile('/var/log/app.log'); // tailFile: a helper that emits new lines as they're appended (not shown)
tailStream.pipe(new LogParser())
  .pipe(new AlertFilter())
  .pipe(new SlackNotifier());

Stream Debugging

// Log what passes through a stream
const { Transform } = require('stream');

class Logger extends Transform {
  constructor(name = 'stream') {
    super();
    this.name = name;
    this.bytes = 0;
    this.chunks = 0;
  }

  _transform(chunk, encoding, callback) {
    this.bytes += chunk.length;
    this.chunks++;
    console.log(`[${this.name}] Chunk #${this.chunks}: ${chunk.length} bytes`);
    this.push(chunk);
    callback();
  }

  _flush(callback) {
    console.log(`[${this.name}] Total: ${this.bytes} bytes in ${this.chunks} chunks`);
    callback();
  }
}

// Usage: insert between any two streams
readStream
  .pipe(new Logger('input'))
  .pipe(transform)
  .pipe(new Logger('output'))
  .pipe(writeStream);

Quick Reference

Method                              Purpose
stream.pipe(dest)                   Connect streams (auto backpressure)
stream.on('data', cb)               Handle each chunk
stream.on('end', cb)                Stream finished
stream.on('error', cb)              Handle errors
for await (const chunk of stream)   Async iteration
pipeline(...streams)                Compose with error handling
Readable.from(array)                Create from array
PassThrough                         Stream that passes data unchanged
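
PassThrough appears in the table but not in the examples above. One place it earns its keep: handing a stream to a caller before the real source exists. A sketch (URL and file names are made up):

const fs = require('fs');
const https = require('https');
const { PassThrough } = require('stream');

// Return a stream immediately; data flows into it once the response arrives
function downloadAsStream(url) {
  const proxy = new PassThrough();
  https.get(url, (res) => res.pipe(proxy))
    .on('error', (err) => proxy.destroy(err));
  return proxy;
}

// The caller can pipe right away
downloadAsStream('https://example.com/big-file.bin')
  .pipe(fs.createWriteStream('big-file.bin'));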

Do you use streams in your Node.js apps? What for?

Follow @armorbreak for more Node.js content.
