DEV Community

Wilson Xu

Use Node.js Streams to Build CLI Tools That Handle Massive Files

Most CLI tutorials show you how to read an entire file into memory, process it, and write the output. This works fine for small files. But when your tool needs to handle a 2 GB log file or process a continuous data feed, loading everything into memory will crash your process or make it unbearably slow.

Node.js streams solve this problem elegantly. They let you process data piece by piece, keeping memory usage constant regardless of input size. In this article, we'll build a CLI tool that processes massive log files using streams — and learn patterns you can apply to any data-processing CLI.

The Problem with readFile

// This loads the ENTIRE file into memory
import { readFile } from 'node:fs/promises';
const data = await readFile('server.log', 'utf-8');
const lines = data.split('\n');
const errors = lines.filter(line => line.includes('ERROR'));

For a 10 KB file? Fine. For a 2 GB production log? Best case, your process consumes 2+ GB of RAM, and split('\n') temporarily doubles that. In practice it's worse: decoding 2 GB as UTF-8 exceeds V8's maximum string length, so readFile throws before you see any data at all. And on a CI runner with 512 MB of memory, even a moderately large file simply crashes the process.

Streams Process Data in Chunks

import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

const stream = createReadStream('server.log', 'utf-8');
const rl = createInterface({ input: stream });

let errorCount = 0;
for await (const line of rl) {
  if (line.includes('ERROR')) {
    errorCount++;
    console.log(line);
  }
}
console.error(`Total errors: ${errorCount}`);

This processes the same 2 GB file while holding only about 64 KB in memory at a time (the read stream's default highWaterMark). The readline interface yields one line at a time, and processed lines become eligible for garbage collection immediately.
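The same loop works with any readable stream, not just files, which makes the pattern easy to try in isolation. Here's a minimal self-contained sketch; the log lines are invented for illustration:

```javascript
import { createInterface } from 'node:readline';
import { Readable } from 'node:stream';

// readline accepts any readable stream as input, not just file streams.
// Note that chunk boundaries don't have to align with line boundaries.
const input = Readable.from([
  '2026-03-19T10:00:00 INFO boot complete\n',
  '2026-03-19T10:00:01 ERROR db connection lost\n2026-03-19T10:00:02 ERROR retry failed\n',
]);

// crlfDelay: Infinity treats \r\n as a single line break (Windows-style logs)
const rl = createInterface({ input, crlfDelay: Infinity });

let errorCount = 0;
for await (const line of rl) {
  if (line.includes('ERROR')) errorCount++;
}
console.log(errorCount); // 2
```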

What We're Building

logstream — a CLI tool that:

  1. Processes log files of any size via streams
  2. Filters by log level (ERROR, WARN, INFO, DEBUG)
  3. Extracts time ranges
  4. Counts occurrences and outputs statistics
  5. Supports piped input (cat log | logstream filter --level error)

Project Setup

mkdir logstream && cd logstream
npm init -y
npm install commander chalk
{
  "name": "logstream-cli",
  "version": "1.0.0",
  "type": "module",
  "bin": { "logstream": "./bin/logstream.js" }
}

Building the Stream Pipeline

The core of our tool is a composable stream pipeline. Each operation is a transform stream that can be chained:

// lib/transforms.js
import { Transform } from 'node:stream';

export function createLevelFilter(level) {
  const pattern = new RegExp(`\\b${level}\\b`, 'i');
  return new Transform({
    objectMode: true,
    transform(line, encoding, callback) {
      if (pattern.test(line)) {
        this.push(line);
      }
      callback();
    }
  });
}

export function createTimeRangeFilter(start, end) {
  const startTime = start ? new Date(start).getTime() : 0;
  const endTime = end ? new Date(end).getTime() : Infinity;

  // Common log timestamp patterns
  const patterns = [
    /(\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2})/,  // ISO-ish
    /\[(\d{2}\/\w{3}\/\d{4}:\d{2}:\d{2}:\d{2})/,  // Apache
  ];

  return new Transform({
    objectMode: true,
    transform(line, encoding, callback) {
      for (const pattern of patterns) {
        const match = line.match(pattern);
        if (match) {
          // Apache timestamps (02/Jan/2026:10:00:00) aren't valid Date input;
          // normalize them to "02 Jan 2026 10:00:00" before parsing
          const raw = match[1].includes('/')
            ? match[1].replace(/\//g, ' ').replace(':', ' ')
            : match[1];
          const timestamp = new Date(raw).getTime();
          if (!Number.isNaN(timestamp) && timestamp >= startTime && timestamp <= endTime) {
            this.push(line);
          }
          return callback();
        }
      }
      // No timestamp found — pass through
      this.push(line);
      callback();
    }
  });
}

export function createCounter() {
  let count = 0;
  const counter = new Transform({
    objectMode: true,
    transform(line, encoding, callback) {
      count++;
      this.push(line);
      callback();
    },
    flush(callback) {
      // Emit count when stream ends
      process.stderr.write(`\nProcessed ${count} matching lines\n`);
      callback();
    }
  });
  counter.getCount = () => count;
  return counter;
}

export function createJsonFormatter() {
  let first = true;
  return new Transform({
    objectMode: true,
    transform(line, encoding, callback) {
      const prefix = first ? '[\n' : ',\n';
      first = false;
      this.push(prefix + JSON.stringify(line));
      callback();
    },
    flush(callback) {
      this.push(first ? '[]' : '\n]');
      callback();
    }
  });
}
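Transforms built this way are easy to sanity-check in isolation with Readable.from as a fake source and a Writable as a collector. A quick sketch, with createLevelFilter re-declared inline so it runs standalone and invented sample lines:

```javascript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Inline copy of createLevelFilter so this sketch runs on its own
function createLevelFilter(level) {
  const pattern = new RegExp(`\\b${level}\\b`, 'i');
  return new Transform({
    objectMode: true,
    transform(line, encoding, callback) {
      if (pattern.test(line)) this.push(line);
      callback();
    }
  });
}

const matched = [];
await pipeline(
  Readable.from([
    '2026-03-19T10:00:00 ERROR db connection lost',
    '2026-03-19T10:00:01 INFO retrying',
    '2026-03-19T10:00:02 error: timeout',        // lowercase still matches
  ]),
  createLevelFilter('error'),
  new Writable({
    objectMode: true,
    write(line, enc, cb) { matched.push(line); cb(); }
  })
);

console.log(matched.length); // 2
```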

The Line Splitter

We need to convert the raw byte stream into individual lines:

// lib/line-splitter.js
import { Transform } from 'node:stream';

export function createLineSplitter() {
  let buffer = '';

  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      buffer += chunk.toString();
      const lines = buffer.split('\n');
      buffer = lines.pop(); // Keep incomplete last line in buffer

      for (const line of lines) {
        if (line.trim()) {
          this.push(line);
        }
      }
      callback();
    },
    flush(callback) {
      if (buffer.trim()) {
        this.push(buffer);
      }
      callback();
    }
  });
}
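The crucial detail is the carry-over buffer: a chunk boundary can fall mid-line, and the splitter must hold the fragment until the rest arrives. A quick check, with the splitter re-declared inline so it runs standalone:

```javascript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Inline copy of createLineSplitter from above
function createLineSplitter() {
  let buffer = '';
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      buffer += chunk.toString();
      const lines = buffer.split('\n');
      buffer = lines.pop(); // Keep incomplete last line in buffer
      for (const line of lines) {
        if (line.trim()) this.push(line);
      }
      callback();
    },
    flush(callback) {
      if (buffer.trim()) this.push(buffer);
      callback();
    }
  });
}

// Two chunks that split a line right in the middle of a word
const chunks = ['2026-03-19 ERROR disk f', 'ull\n2026-03-19 INFO ok'];
const out = [];
await pipeline(
  Readable.from(chunks),
  createLineSplitter(),
  new Writable({
    objectMode: true,
    write(line, enc, cb) { out.push(line); cb(); }
  })
);

console.log(out);
// [ '2026-03-19 ERROR disk full', '2026-03-19 INFO ok' ]
```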

Wiring Up the CLI

#!/usr/bin/env node
// bin/logstream.js

import { program } from 'commander';
import { createReadStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import { createLineSplitter } from '../lib/line-splitter.js';
import {
  createLevelFilter,
  createTimeRangeFilter,
  createCounter,
  createJsonFormatter,
} from '../lib/transforms.js';

program
  .name('logstream')
  .description('Process log files using streams — handles any file size');

program
  .command('filter')
  .description('Filter log lines by level and/or time range')
  .argument('[file]', 'Log file path (or pipe via stdin)')
  .option('-l, --level <level>', 'Filter by log level (ERROR, WARN, INFO, DEBUG)')
  .option('--after <datetime>', 'Only show entries after this time')
  .option('--before <datetime>', 'Only show entries before this time')
  .option('--json', 'Output as JSON array')
  .option('--count', 'Only output the count of matching lines')
  .action(async (file, options) => {
    // Input: file or stdin
    const input = file
      ? createReadStream(file, 'utf-8')
      : process.stdin;

    // Build transform pipeline
    const transforms = [createLineSplitter()];

    if (options.level) {
      transforms.push(createLevelFilter(options.level));
    }

    if (options.after || options.before) {
      transforms.push(createTimeRangeFilter(options.after, options.before));
    }

    const counter = createCounter();
    transforms.push(counter);

    // End the pipeline with a Writable sink: it signals completion via
    // 'finish' and doesn't leave an unconsumed readable side dangling
    const { Writable } = await import('node:stream');

    if (options.count) {
      // Swallow lines silently; print the count once the stream ends
      transforms.push(new Writable({
        objectMode: true,
        write(chunk, enc, cb) { cb(); },
        final(cb) {
          process.stdout.write(counter.getCount().toString() + '\n');
          cb();
        }
      }));
    } else if (options.json) {
      transforms.push(createJsonFormatter());
      // The formatter needs a destination, or its output never reaches the terminal
      transforms.push(process.stdout);
    } else {
      // Default: print each line, respecting stdout backpressure
      transforms.push(new Writable({
        objectMode: true,
        write(line, enc, cb) {
          const ok = process.stdout.write(line + '\n');
          ok ? cb() : process.stdout.once('drain', cb);
        }
      }));
    }

    try {
      await pipeline(input, ...transforms);
    } catch (err) {
      if (err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
        console.error(`Error: ${err.message}`);
        process.exit(1);
      }
    }
  });

program
  .command('stats')
  .description('Show statistics about a log file')
  .argument('[file]', 'Log file path (or pipe via stdin)')
  .action(async (file) => {
    const input = file
      ? createReadStream(file, 'utf-8')
      : process.stdin;

    const stats = { total: 0, error: 0, warn: 0, info: 0, debug: 0 };

    const splitter = createLineSplitter();
    // Again, a Writable sink is the right tail for the pipeline
    const { Writable } = await import('node:stream');
    const analyzer = new Writable({
      objectMode: true,
      write(line, enc, cb) {
        stats.total++;
        if (/\bERROR\b/i.test(line)) stats.error++;
        else if (/\bWARN/i.test(line)) stats.warn++;   // matches WARN and WARNING
        else if (/\bINFO\b/i.test(line)) stats.info++;
        else if (/\bDEBUG\b/i.test(line)) stats.debug++;
        cb();
      },
      final(cb) {
        console.log(JSON.stringify(stats, null, 2));
        cb();
      }
    });

    await pipeline(input, splitter, analyzer);
  });

program.parse();

Usage Examples

# Filter errors from a large log file
logstream filter server.log --level error

# Time range query
logstream filter app.log --after "2026-03-01" --before "2026-03-15"

# Pipe from other commands
docker logs myapp | logstream filter --level warn --json

# Get error count for CI checks
ERROR_COUNT=$(logstream filter server.log --level error --count)
if [ "$ERROR_COUNT" -gt 0 ]; then
  echo "Found $ERROR_COUNT errors"
  exit 1
fi

# Combine filters
logstream filter server.log --level error --after "2026-03-19T10:00:00" --json > errors.json

# Stats overview
logstream stats server.log
# { "total": 42891, "error": 23, "warn": 156, "info": 41002, "debug": 1710 }

Memory Comparison

| Approach | 100 MB file | 1 GB file | 10 GB file |
| --- | --- | --- | --- |
| readFile + .split() | ~200 MB RAM | ~2 GB RAM | Crashes |
| Streams | ~2 MB RAM | ~2 MB RAM | ~2 MB RAM |

The stream approach uses constant memory regardless of file size. That's the entire point.
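You can check this kind of number yourself. The sketch below generates a ~10 MB sample log in the temp directory (the file name and contents are invented) and tracks peak RSS while streaming through it; swap in readFile to compare:

```javascript
import { writeFileSync, createReadStream, unlinkSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

// Generate a ~10 MB sample log (hypothetical path and contents)
const file = join(tmpdir(), 'logstream-sample.log');
writeFileSync(file, '2026-03-19T10:00:00 INFO request handled\n'.repeat(250_000));

let bytes = 0;
let peakRss = 0;
for await (const chunk of createReadStream(file)) {
  bytes += chunk.length;
  peakRss = Math.max(peakRss, process.memoryUsage().rss);
}
unlinkSync(file);

console.log(`Read ${(bytes / 1024 / 1024).toFixed(1)} MB`);
console.log(`Peak RSS: ${(peakRss / 1024 / 1024).toFixed(0)} MB`);
```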

Key Stream Patterns for CLI Tools

1. Always support stdin

const input = file ? createReadStream(file) : process.stdin;

This lets users pipe data into your tool. It's the Unix way.

2. Use pipeline() for error handling

import { pipeline } from 'node:stream/promises';
await pipeline(source, transform1, transform2, destination);

pipeline automatically destroys all streams if any stream errors. Without it, you'll leak file handles and memory.
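To see that cleanup in action, here's a sketch with a transform that errors mid-stream (the sample data is invented): pipeline rejects with the error and tears down the failed stage.

```javascript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const source = Readable.from(['ok line', 'bad line', 'never reached']);

// A transform that fails on a specific input
const failing = new Transform({
  objectMode: true,
  transform(line, enc, cb) {
    cb(line.startsWith('bad') ? new Error('boom') : null, line);
  }
});

let caught = null;
try {
  await pipeline(
    source,
    failing,
    new Writable({ objectMode: true, write(line, enc, cb) { cb(); } })
  );
} catch (err) {
  caught = err;
}

console.log(caught.message);       // 'boom'
console.log(failing.destroyed);    // true: the errored stage was destroyed
```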

3. Use object mode for line-by-line processing

new Transform({ objectMode: true, ... })

Object mode lets transforms pass arbitrary JavaScript values (in our case, one string per line) instead of raw buffer chunks. It's essential for line-based processing.

4. Use flush() for cleanup

The flush method runs when the input stream ends. Use it to emit final results, close resources, or print summaries.

Conclusion

Streams aren't just a performance optimization — they're what makes CLI tools production-ready. A tool that crashes on large files isn't a tool anyone will trust in their pipeline.

The patterns are simple: read from a stream, transform line by line, write to stdout. Compose transforms like Lego blocks. Support stdin for piping. Use pipeline() for proper error handling.

Your users will never notice that your tool uses streams — they'll just notice that it works, every time, on files of any size.


Wilson Xu builds Node.js CLI tools and publishes them on npm. Follow his work at dev.to/chengyixu.
