Use Node.js Streams to Build CLI Tools That Handle Massive Files
Most CLI tutorials show you how to read an entire file into memory, process it, and write the output. This works fine for small files. But when your tool needs to handle a 2 GB log file or process a continuous data feed, loading everything into memory will crash your process or make it unbearably slow.
Node.js streams solve this problem elegantly. They let you process data piece by piece, keeping memory usage constant regardless of input size. In this article, we'll build a CLI tool that processes massive log files using streams — and learn patterns you can apply to any data-processing CLI.
The Problem with readFile
// This loads the ENTIRE file into memory
import { readFile } from 'node:fs/promises';
const data = await readFile('server.log', 'utf-8');
const lines = data.split('\n');
const errors = lines.filter(line => line.includes('ERROR'));
For a 10 KB file? Fine. For a 2 GB production log? Your process will consume 2+ GB of RAM, and split('\n') will temporarily double that. On a CI runner with 512 MB of memory, this simply crashes.
Streams Process Data in Chunks
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';
const stream = createReadStream('server.log', 'utf-8');
const rl = createInterface({ input: stream });
let errorCount = 0;
for await (const line of rl) {
if (line.includes('ERROR')) {
errorCount++;
console.log(line);
}
}
console.error(`Total errors: ${errorCount}`);
This processes the same 2 GB file while holding only about 64 KB in memory at a time (the default highWaterMark of a file read stream). The readline interface yields one line at a time, and each processed line becomes eligible for garbage collection as soon as the loop moves on.
What We're Building
logstream — a CLI tool that:
- Processes log files of any size via streams
- Filters by log level (ERROR, WARN, INFO, DEBUG)
- Extracts time ranges
- Counts occurrences and outputs statistics
- Supports piped input (cat log | logstream filter --level error)
Project Setup
mkdir logstream && cd logstream
npm init -y
npm install commander chalk
{
"name": "logstream-cli",
"version": "1.0.0",
"type": "module",
"bin": { "logstream": "./bin/logstream.js" }
}
Building the Stream Pipeline
The core of our tool is a composable stream pipeline. Each operation is a transform stream that can be chained:
// lib/transforms.js
import { Transform } from 'node:stream';
export function createLevelFilter(level) {
// Escape regex metacharacters so arbitrary user input can't throw
const escaped = level.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
const pattern = new RegExp(`\\b${escaped}\\b`, 'i');
return new Transform({
objectMode: true,
transform(line, encoding, callback) {
if (pattern.test(line)) {
this.push(line);
}
callback();
}
});
}
export function createTimeRangeFilter(start, end) {
const startTime = start ? new Date(start).getTime() : 0;
const endTime = end ? new Date(end).getTime() : Infinity;
// Common log timestamp patterns
const patterns = [
/(\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2})/, // ISO-ish
/\[(\d{2}\/\w{3}\/\d{4}:\d{2}:\d{2}:\d{2})/, // Apache
];
return new Transform({
objectMode: true,
transform(line, encoding, callback) {
for (const pattern of patterns) {
const match = line.match(pattern);
if (match) {
// Apache timestamps like 02/Jan/2026:10:00:00 aren't Date-parseable
// as-is, so normalize them to "02 Jan 2026 10:00:00" first
const raw = match[1].includes('/')
? match[1].replace(/\//g, ' ').replace(':', ' ')
: match[1];
const timestamp = new Date(raw).getTime();
if (Number.isNaN(timestamp)) break; // unparseable: treat as untimestamped
if (timestamp >= startTime && timestamp <= endTime) {
this.push(line);
}
return callback();
}
}
// No timestamp found — pass through
this.push(line);
callback();
}
});
}
export function createCounter() {
let count = 0;
const counter = new Transform({
objectMode: true,
transform(line, encoding, callback) {
count++;
this.push(line);
callback();
},
flush(callback) {
// Emit count when stream ends
process.stderr.write(`\nProcessed ${count} matching lines\n`);
callback();
}
});
counter.getCount = () => count;
return counter;
}
export function createJsonFormatter() {
let first = true;
return new Transform({
objectMode: true,
transform(line, encoding, callback) {
const prefix = first ? '[\n' : ',\n';
first = false;
this.push(prefix + JSON.stringify(line));
callback();
},
flush(callback) {
this.push(first ? '[]\n' : '\n]\n'); // end with a newline for clean shell output
callback();
}
});
}
The Line Splitter
We need to convert the raw byte stream into individual lines:
// lib/line-splitter.js
import { Transform } from 'node:stream';
import { StringDecoder } from 'node:string_decoder';
export function createLineSplitter() {
// Decode statefully so a multi-byte UTF-8 character split across
// chunk boundaries isn't corrupted
const decoder = new StringDecoder('utf8');
let buffer = '';
return new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
buffer += typeof chunk === 'string' ? chunk : decoder.write(chunk);
const lines = buffer.split('\n');
buffer = lines.pop(); // Keep incomplete last line in buffer
for (const line of lines) {
if (line.trim()) {
this.push(line);
}
}
callback();
},
flush(callback) {
if (buffer.trim()) {
this.push(buffer);
}
callback();
}
});
}
Wiring Up the CLI
#!/usr/bin/env node
// bin/logstream.js
import { program } from 'commander';
import { createReadStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import { createLineSplitter } from '../lib/line-splitter.js';
import {
createLevelFilter,
createTimeRangeFilter,
createCounter,
createJsonFormatter,
} from '../lib/transforms.js';
program
.name('logstream')
.description('Process log files using streams — handles any file size');
program
.command('filter')
.description('Filter log lines by level and/or time range')
.argument('[file]', 'Log file path (or pipe via stdin)')
.option('-l, --level <level>', 'Filter by log level (ERROR, WARN, INFO, DEBUG)')
.option('--after <datetime>', 'Only show entries after this time')
.option('--before <datetime>', 'Only show entries before this time')
.option('--json', 'Output as JSON array')
.option('--count', 'Only output the count of matching lines')
.action(async (file, options) => {
// Input: file or stdin
const input = file
? createReadStream(file, 'utf-8')
: process.stdin;
// Build transform pipeline
const transforms = [createLineSplitter()];
if (options.level) {
transforms.push(createLevelFilter(options.level));
}
if (options.after || options.before) {
transforms.push(createTimeRangeFilter(options.after, options.before));
}
const counter = createCounter();
transforms.push(counter);
if (options.count) {
// Consume silently, then print count
transforms.push(new (await import('node:stream')).Transform({
objectMode: true,
transform(chunk, enc, cb) { cb(); },
flush(cb) {
process.stdout.write(counter.getCount().toString() + '\n');
cb();
}
}));
} else if (options.json) {
transforms.push(createJsonFormatter());
// A Transform's readable side needs a consumer, or backpressure
// stalls the pipeline, so send the JSON output to stdout explicitly
transforms.push(process.stdout);
} else {
// Default: print each line
transforms.push(new (await import('node:stream')).Transform({
objectMode: true,
transform(line, enc, cb) {
process.stdout.write(line + '\n');
cb();
}
}));
}
try {
await pipeline(input, ...transforms);
} catch (err) {
if (err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
console.error(`Error: ${err.message}`);
process.exit(1);
}
}
});
program
.command('stats')
.description('Show statistics about a log file')
.argument('[file]', 'Log file path (or pipe via stdin)')
.action(async (file) => {
const input = file
? createReadStream(file, 'utf-8')
: process.stdin;
const stats = { total: 0, error: 0, warn: 0, info: 0, debug: 0 };
const splitter = createLineSplitter();
const analyzer = new (await import('node:stream')).Transform({
objectMode: true,
transform(line, enc, cb) {
stats.total++;
if (/\bERROR\b/i.test(line)) stats.error++;
else if (/\bWARN/i.test(line)) stats.warn++; // no trailing \b, so WARNING counts too
else if (/\bINFO\b/i.test(line)) stats.info++;
else if (/\bDEBUG\b/i.test(line)) stats.debug++;
cb();
},
flush(cb) {
console.log(JSON.stringify(stats, null, 2));
cb();
}
});
try {
await pipeline(input, splitter, analyzer);
} catch (err) {
console.error(`Error: ${err.message}`);
process.exit(1);
}
});
program.parse();
Usage Examples
# Filter errors from a large log file
logstream filter server.log --level error
# Time range query
logstream filter app.log --after "2026-03-01" --before "2026-03-15"
# Pipe from other commands
docker logs myapp | logstream filter --level warn --json
# Get error count for CI checks
ERROR_COUNT=$(logstream filter server.log --level error --count)
if [ "$ERROR_COUNT" -gt 0 ]; then
echo "Found $ERROR_COUNT errors"
exit 1
fi
# Combine filters
logstream filter server.log --level error --after "2026-03-19T10:00:00" --json > errors.json
# Stats overview
logstream stats server.log
# { "total": 42891, "error": 23, "warn": 156, "info": 41002, "debug": 1710 }
Memory Comparison
| Approach | 100 MB file | 1 GB file | 10 GB file |
|---|---|---|---|
| readFile + .split() | ~200 MB RAM | ~2 GB RAM | Crashes |
| Streams | ~2 MB RAM | ~2 MB RAM | ~2 MB RAM |
The stream approach uses constant memory regardless of file size. That's the entire point.
Key Stream Patterns for CLI Tools
1. Always support stdin
const input = file ? createReadStream(file) : process.stdin;
This lets users pipe data into your tool. It's the Unix way.
2. Use pipeline() for error handling
import { pipeline } from 'node:stream/promises';
await pipeline(source, transform1, transform2, destination);
pipeline automatically destroys all streams if any stream errors. Without it, you'll leak file handles and memory.
3. Use object mode for line-by-line processing
new Transform({ objectMode: true, ... })
Object mode lets transforms pass JavaScript objects (like strings) instead of raw buffers. It's essential for line-based processing.
4. Use flush() for cleanup
The flush method runs when the input stream ends. Use it to emit final results, close resources, or print summaries.
Conclusion
Streams aren't just a performance optimization — they're what makes CLI tools production-ready. A tool that crashes on large files isn't a tool anyone will trust in their pipeline.
The patterns are simple: read from a stream, transform line by line, write to stdout. Compose transforms like Lego blocks. Support stdin for piping. Use pipeline() for proper error handling.
Your users will never notice that your tool uses streams — they'll just notice that it works, every time, on files of any size.
Wilson Xu builds Node.js CLI tools and publishes them on npm. Follow his work at dev.to/chengyixu.