Streams in Node.js: The Complete Guide (2026)
Streams are Node.js's superpower. They let you handle massive data without running out of memory.
Why Streams Matter
The memory problem:
→ Reading a 2GB log file into memory = CRASH (or swap thrashing)
→ Processing 100,000 database rows at once = OOM
→ Uploading a 500MB video file = out of memory
The stream solution:
→ Process data CHUNK by CHUNK (usually 64KB at a time)
→ Memory usage stays constant regardless of file size
→ 2GB file? Same memory as a 2KB file with streams!
Real-world uses:
→ Reading/writing large files
→ Processing API responses (paginated data)
→ Real-time data (WebSocket, server-sent events)
→ File uploads/downloads
→ Log processing
→ Data pipelines (transform, compress, encrypt in one flow)
Four Stream Types
// 1. Readable — data comes OUT
const fs = require('fs');
// Nothing is loaded up front: the file's contents are emitted in chunks as the stream is consumed
const readStream = fs.createReadStream('large-file.json');
// 2. Writable — data goes IN
// No flags are passed, so Node's default ('w') applies: output.txt is created or truncated
const writeStream = fs.createWriteStream('output.txt');
// 3. Duplex — data goes BOTH ways (TCP socket, WebSocket)
const net = require('net');
// The socket is created unconnected; connect() is not called in this snippet
const socket = new net.Socket(); // Read AND write
// 4. Transform — reads, modifies, writes (compression, encryption)
const { Transform } = require('stream');
// Transform stream that upper-cases UTF-8 text flowing through it.
//
// Chunk boundaries are arbitrary byte offsets, so a multi-byte character can be
// split across two chunks. Decoding each chunk on its own would turn both halves
// into U+FFFD, so a single streaming decoder carries incomplete bytes over to
// the next chunk instead.
const upperCase = (() => {
  const decoder = new TextDecoder('utf-8');

  return new Transform({
    transform(chunk, encoding, callback) {
      // { stream: true } keeps a trailing partial character buffered inside the decoder
      const text = decoder.decode(chunk, { stream: true });
      callback(null, text.toUpperCase());
    },
    flush(callback) {
      // Emit whatever the decoder still holds once the input has ended
      const tail = decoder.decode();
      if (tail.length > 0) {
        callback(null, tail.toUpperCase());
        return;
      }
      callback();
    },
  });
})();
Reading Files Efficiently
const fs = require('fs');
// ❌ BAD: the complete file is loaded before a single line can be handled
const data = fs.readFileSync('huge.log', 'utf8');
const lines = data.split('\n');
// A 2GB file means 2GB held in RAM — expect a crash

// ✅ GOOD: stream the file and handle one chunk at a time
const readStream = fs.createReadStream('huge.log');
readStream
  .on('data', (chunk) => {
    // No encoding is set on the stream, so each chunk arrives as a Buffer.
    // Once the handler returns, nothing references the chunk and it can be collected.
    processChunk(chunk);
  })
  .on('end', () => {
    console.log('File processed completely');
  })
  .on('error', (err) => {
    console.error('Read error:', err);
  });
// ✅ Better: Use pipeline (handles errors and cleanup automatically)
// 'stream/promises' exports the promise-returning pipeline; the one exported by 'stream' expects a callback instead
const { pipeline } = require('stream/promises');
// NOTE: top-level `await` is only valid in an ES module; in a CommonJS file (one that uses require, as here)
// this call has to live inside an async function
// NOTE: processStream is not defined in this snippet — presumably a Transform stream supplied by the reader
await pipeline(
fs.createReadStream('input.txt'),
processStream, // Your transform
fs.createWriteStream('output.txt')
);
// Automatically handles: backpressure, errors, cleanup
// If any stream errors → all streams are properly destroyed
// Reading with async iteration (cleanest API!)
// readline turns the byte stream into complete lines, so every iteration sees exactly one line.
const rl = require('readline').createInterface({
  input: fs.createReadStream('data.csv'),
  crlfDelay: Infinity, // always treat \r\n as a single line break
});
for await (const line of rl) {
  // Skip blank lines (e.g. a trailing newline at the end of the file) instead of
  // handing processUser a record made of undefined fields
  if (line.trim() === '') continue;
  // NOTE: a plain split(',') does not understand quoted fields that contain commas
  const [name, email, age] = line.split(',');
  // Explicit radix: the age column is decimal, never hex or another base
  processUser({ name, email, age: Number.parseInt(age, 10) });
}
// Memory: lines are handled one at a time instead of loading the whole file
Writing Files
const fs = require('fs');
// Open the destination file for writing
const writeStream = fs.createWriteStream('output.txt', {
  flags: 'w', // overwrite the file; use 'a' to append instead
  encoding: 'utf8',
  highWaterMark: 65536, // buffered bytes after which write() starts returning false
});

// Register the listeners up front, then queue the data
writeStream
  .on('finish', () => {
    console.log('All data written to disk');
  })
  .on('error', (err) => {
    console.error('Write error:', err);
  });

writeStream.write('First line\n');
writeStream.write('Second line\n');
// end() queues the final chunk and signals that nothing more will be written
writeStream.end('Last line\n');
// Backpressure handling (important for fast producers!)
// NOTE: getFastDataStream is not defined in this snippet — presumably it returns a Readable stream; confirm against the real source
const fastSource = getFastDataStream();
// Destination stream; its write() return value is what signals backpressure
const slowWriter = fs.createWriteStream('slow-destination.txt');
/**
 * Moves the data currently buffered in `fastSource` into `slowWriter`,
 * stopping as soon as the writer reports backpressure.
 *
 * When `slowWriter.write()` returns false the function re-arms itself on the
 * writer's next 'drain' event and continues from there. When the source simply
 * has nothing buffered (`read()` returns null) the function returns without
 * scheduling anything; it must be called again once the source has more data
 * (e.g. from a 'readable' listener — not shown in this snippet).
 */
function writeData() {
  let canWrite = true;
  // Declared locally: assigning to an undeclared name leaks a global in sloppy
  // mode and throws a ReferenceError in strict mode / ES modules.
  let chunk;
  // read() returns null when nothing is buffered; compare explicitly so a
  // falsy-but-valid chunk does not end the loop early.
  while (canWrite && (chunk = fastSource.read()) !== null) {
    canWrite = slowWriter.write(chunk);
  }
  if (!canWrite) {
    slowWriter.once('drain', writeData); // Wait for the writer's buffer to flush
  }
}
// ✅ Easier: use pipe (handles backpressure automatically!)
// NOTE: use this INSTEAD of a manual read/write loop, not in addition to it — both would consume the same source
fastSource.pipe(slowWriter);
Pipe: Connect Streams
// pipe() connects readable → writable (handles backpressure automatically!)
// pipe() returns the destination stream, which is what makes the chained calls below possible.
// NOTE: pipe() does not forward errors from one stream to the next, and none of these
// examples attach 'error' listeners — a failing stream would raise an unhandled 'error' event.
const fs = require('fs');
const zlib = require('zlib');
// File copy with pipe
fs.createReadStream('input.txt')
.pipe(fs.createWriteStream('copy.txt'));
// Compress file
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('input.txt.gz'));
// Decompress file
fs.createReadStream('input.txt.gz')
.pipe(zlib.createGunzip())
.pipe(fs.createWriteStream('output.txt'));
// HTTP file download
// NOTE(review): the response status code is not checked, so an error page would also be saved as download.zip
const https = require('https');
https.get('https://example.com/large-file.zip', (response) => {
response.pipe(fs.createWriteStream('download.zip'));
});
// Upload file to server
// NOTE: uploadToServer is not defined in this snippet — presumably it returns a Writable stream
fs.createReadStream('local-file.csv')
.pipe(uploadToServer('/api/upload'));
// Chain multiple transforms!
// NOTE: split, parseJson, filterErrors and formatOutput are not defined in this snippet —
// presumably each returns a Transform stream
fs.createReadStream('access.log')
.pipe(split()) // Split into lines
.pipe(parseJson()) // Parse each line as JSON
.pipe(filterErrors()) // Keep only error entries
.pipe(formatOutput()) // Format for display
.pipe(fs.createWriteStream('errors-only.log'));
Transform Streams: Custom Processing
const { Transform, pipeline } = require('stream');
// Custom transform: Process data as it flows through
// Transform stream: CSV text in, one `{ id, name, email }` object out per non-empty line.
//
// A single incoming chunk may hold one line or several newline-separated lines;
// every complete line in the chunk becomes its own record. The transform keeps no
// state between chunks, so a record cut in half by a chunk boundary must be
// re-assembled upstream (feed this stream whole lines).
const csvToJson = new Transform({
  objectMode: true, // Emit objects instead of buffers
  transform(chunk, encoding, callback) {
    for (const rawLine of chunk.toString().split('\n')) {
      const line = rawLine.trim(); // also strips the '\r' of CRLF line endings
      if (!line) continue; // blank lines produce no record
      const [id, name, email] = line.split(',');
      // Explicit radix: ids are decimal
      this.push({ id: Number.parseInt(id, 10), name, email });
    }
    callback();
  },
});
// Transform stream: serializes every incoming object as one line of JSON (JSON Lines).
const jsonToString = new Transform({
  objectMode: true,
  transform(record, _encoding, done) {
    const serialized = JSON.stringify(record);
    // A trailing newline terminates each record
    done(null, `${serialized}\n`);
  },
});
// Use: CSV → JSON in one pipeline
// Two things matter here:
//  - the callback-style pipeline() from 'stream' requires a completion callback as
//    its last argument (it throws without one), and that callback is the only
//    place where a failure of any stage is reported;
//  - a file stream emits arbitrary chunks, not lines, so a small line-splitting
//    stage re-assembles complete lines before they reach csvToJson.
pipeline(
  fs.createReadStream('users.csv', { encoding: 'utf8' }),
  async function* toLines(source) {
    let pending = ''; // text after the last newline seen so far
    for await (const text of source) {
      const parts = (pending + text).split('\n');
      pending = parts.pop();
      yield* parts;
    }
    if (pending) yield pending; // final line without a trailing newline
  },
  csvToJson,
  jsonToString,
  fs.createWriteStream('users.jsonl'),
  (err) => {
    if (err) {
      console.error('CSV → JSON conversion failed:', err);
    }
  }
);
Practical Examples
Example 1: Log File Analyzer
const fs = require('fs');
const { Transform, pipeline } = require('stream');
// Count HTTP status codes from access log
/**
 * Object-mode Transform that tallies HTTP status codes.
 *
 * Each incoming chunk is treated as one log line; the first three-digit number
 * surrounded by whitespace is counted as that line's status code. Nothing is
 * emitted while lines arrive — a single summary string is pushed when the
 * input ends.
 */
class StatusCodeCounter extends Transform {
  constructor() {
    super({ objectMode: true });
    this.counts = {}; // status code → number of lines seen with it
  }

  _transform(line, encoding, callback) {
    const found = /\s(\d{3})\s/.exec(line.toString());
    if (found !== null) {
      const [, code] = found;
      const previous = this.counts[code] || 0;
      this.counts[code] = previous + 1;
    }
    callback();
  }

  _flush(callback) {
    // Runs once the input has ended: build the report, most frequent code first
    const ranked = Object.entries(this.counts);
    ranked.sort(([, left], [, right]) => right - left);
    const rows = [];
    for (const [code, count] of ranked) {
      rows.push(`${code}: ${count}`);
    }
    this.push(rows.join('\n'));
    callback();
  }
}
const { createInterface } = require('readline');
/**
 * Re-assembles complete lines from a stream of text chunks.
 * Used as a pipeline stage between a file stream and a per-line consumer.
 *
 * @param {AsyncIterable<string>} source - decoded text chunks of arbitrary size
 * @yields {string} one line at a time, without its '\n' terminator
 */
async function* splitLines(source) {
  let pending = ''; // text after the last newline seen so far
  for await (const text of source) {
    const parts = (pending + text).split('\n');
    pending = parts.pop();
    yield* parts;
  }
  if (pending) yield pending; // final line without a trailing newline
}

/**
 * Streams a log file through StatusCodeCounter and prints the summary to stdout.
 *
 * @param {string} filePath - path of the access log to analyze
 * @returns {Promise<void>} resolves when the summary has been written; rejects
 *   if reading or any pipeline stage fails
 */
async function analyzeLog(filePath) {
  // Only the promise-based pipeline can be awaited; the one exported by 'stream'
  // is callback-based. Required locally so this function does not depend on
  // which variant the surrounding file imported.
  const { pipeline: pipelineAsync } = require('stream/promises');
  const counter = new StatusCodeCounter();
  await pipelineAsync(
    // The file is opened once, and the pipeline owns the stream, so a read
    // error rejects the promise and the handle is cleaned up.
    fs.createReadStream(filePath, { encoding: 'utf8' }),
    splitLines,
    counter,
    process.stdout // Print summary to terminal
  );
}
analyzeLog('/var/log/nginx/access.log').catch((err) => {
  // Without a handler a failure would surface as an unhandled rejection
  console.error('Log analysis failed:', err);
});
// Output:
// 200: 15432
// 301: 2891
// 404: 1234
// 500: 56
Example 2: API Response Stream
// Stream large API response instead of loading all into memory
const https = require('https');
const { Transform, pipeline } = require('stream');
/**
 * GETs `url` over HTTPS and resolves with the parsed JSON body.
 *
 * The body is collected in memory, so this suits responses that comfortably
 * fit in RAM; for very large payloads pipe the response to a file instead.
 *
 * @param {string|URL} url - resource to fetch
 * @returns {Promise<any>} the parsed JSON payload
 * Rejects when the request or response fails, when the status code is not 200
 * (message `HTTP <status>`), or when the body is not valid JSON.
 */
function fetchLargeApiData(url) {
  return new Promise((resolve, reject) => {
    const request = https.get(url, (response) => {
      if (response.statusCode !== 200) {
        // Discard the body so the underlying socket is released
        response.resume();
        reject(new Error(`HTTP ${response.statusCode}`));
        return;
      }
      const chunks = [];
      // Option 1: Collect all chunks (if you need the full data)
      response.on('data', (chunk) => chunks.push(chunk));
      response.on('error', reject);
      response.on('end', () => {
        // A throw inside an event handler would escape the promise entirely,
        // so parse failures are turned into a rejection explicitly.
        try {
          resolve(JSON.parse(Buffer.concat(chunks).toString()));
        } catch (err) {
          reject(err);
        }
      });
      // Option 2: Pipe directly to file (for massive responses)
      // response.pipe(fs.createWriteStream('api-data.json'));
    });
    // DNS failures, refused connections, TLS errors etc. are emitted on the
    // request object; unhandled, they crash the process and the promise never settles.
    request.on('error', reject);
  });
}
// Better: Process paginated API with streams
/**
 * Object-mode stream that emits every item of a paginated API.
 *
 * Pages are requested from inside `_flush`, i.e. only after the writable side
 * of the stream has been ended (call `.end()` on it, or let an upstream source
 * finish). Pages are fetched one after another, starting at page 1, until a
 * page comes back empty.
 *
 * NOTE: the return value of push() is not consulted, so every page is fetched
 * regardless of how fast the consumer reads.
 */
class PaginatedApiStream extends Transform {
  /**
   * @param {(page: number) => Promise<Array>} fetchPage - resolves with the
   *   items of the given 1-based page; an empty result marks the end
   */
  constructor(fetchPage) {
    super({ objectMode: true });
    this.fetchPage = fetchPage;
    this.page = 1;
    this.hasMore = true;
  }

  async _flush(callback) {
    try {
      while (this.hasMore) {
        const items = await this.fetchPage(this.page);
        if (!items || items.length === 0) {
          this.hasMore = false;
          break;
        }
        for (const item of items) {
          this.push(item);
        }
        this.page++;
      }
    } catch (err) {
      // Report the failure through the stream. Otherwise the rejection is
      // unhandled and the stream neither ends nor errors.
      callback(err);
      return;
    }
    callback();
  }
}
Example 3: Real-time File Watch + Process
const fs = require('fs');
const { watch } = fs;
// Watch file for changes and process new lines
/**
 * Watches a log file and hands every newly appended line to `processNewLine`.
 *
 * Only content added after the call is processed. A line is delivered once its
 * terminating '\n' has been written; text after the last newline is held back
 * until the rest of the line arrives.
 *
 * NOTE: truncation or rotation of the file is not detected — the read offset
 * only ever moves forward.
 *
 * @param {string} filePath - file to follow
 * @returns {fs.FSWatcher} the watcher; call `.close()` on it to stop following
 */
function watchLogFile(filePath) {
  // Start at the current end of the file so existing content is skipped
  let lastPosition = fs.statSync(filePath).size;
  let partialLine = ''; // text after the last newline seen so far
  let isReading = false;
  let changedWhileReading = false;
  // One streaming decoder for the whole watch: a multi-byte character split
  // across chunks or across reads is decoded once all its bytes have arrived.
  const decoder = new TextDecoder('utf-8');

  function readAppended() {
    // A single write often triggers several 'change' events. Overlapping reads
    // would start from the same offset and process the same bytes twice, so
    // only one read runs at a time and late events schedule a follow-up.
    if (isReading) {
      changedWhileReading = true;
      return;
    }
    isReading = true;
    changedWhileReading = false;

    const stream = fs.createReadStream(filePath, {
      start: lastPosition, // Read only NEW content
    });
    stream.on('data', (chunk) => {
      lastPosition += chunk.length; // chunk is a Buffer, so this counts bytes
      const text = partialLine + decoder.decode(chunk, { stream: true });
      const lines = text.split('\n');
      partialLine = lines.pop();
      lines.filter(Boolean).forEach((line) => processNewLine(line));
    });
    stream.on('error', (err) => {
      // e.g. the file was removed; without a listener this would crash the process
      console.error(`Failed to read ${filePath}:`, err);
    });
    stream.on('close', () => {
      isReading = false;
      if (changedWhileReading) readAppended();
    });
  }

  const watcher = watch(filePath, (eventType) => {
    if (eventType !== 'change') return;
    readAppended();
  });
  console.log(`Watching ${filePath} for new entries...`);
  return watcher;
}
Common Pitfalls
// ❌ Pitfall 1: Not handling errors
readStream.pipe(writeStream);
// If readStream errors → writeStream is NOT destroyed (memory leak!)
// Fix: Use pipeline() instead of pipe()
// ❌ Pitfall 2: Ignoring backpressure
// Deliberately naive example: every item is written immediately and the boolean
// returned by write() is discarded, so the caller never learns that the stream
// asked it to pause until 'drain'.
function badWrite(stream, data) {
data.forEach(item => stream.write(item)); // Ignores drain!
}
// Fix: Check write() return value + handle 'drain' event
// ❌ Pitfall 3: Unclosed streams
const stream = fs.createReadStream('file.txt');
stream.on('data', handler);
// If you abort early, stream stays open (file handle leak!)
// Fix: stream.destroy() when done
// ❌ Pitfall 4: Mixing callbacks and promises
stream.on('data', handler); // Events
await pipeline(...); // Promise
// Don't mix on same stream! Use one approach.
// ❌ Pitfall 5: Forgetting encoding
readStream.setEncoding('utf8'); // If not set, chunks are Buffers!
Have you ever hit a memory limit with large files? Streams are the answer.
Follow @armorbreak for more practical developer guides.
Top comments (0)