Node.js Streams: The Practical Guide (2026)
Streams are one of Node.js's most powerful features. Here's how to actually use them.
Why Streams Matter
Without streams:
read entire file into memory → process → write entire result
Memory usage: 100MB for a 100MB file
Problem: What if the file is 10GB?
With streams:
read chunk by chunk → process each chunk → write chunk by chunk
Memory usage: ~64KB regardless of file size
Result: Process files larger than RAM
The 4 Stream Types
| Type | Used For | Example |
|---|---|---|
| Readable | Reading data | fs.createReadStream(), HTTP requests |
| Writable | Writing data | fs.createWriteStream(), HTTP responses |
| Duplex | Both reading & writing | TCP sockets, WebSocket |
| Transform | Modify data between read/write | zlib, CSV parsing |
Basic Example: File Copy
const fs = require('fs');

// ❌ Avoid: the entire file is held in memory before a single byte is written.
const contents = fs.readFileSync('large-file.txt');
fs.writeFileSync('copy.txt', contents);

// ✅ Prefer: the file is copied chunk by chunk, so memory use stays small.
const source = fs.createReadStream('large-file.txt');
const destination = fs.createWriteStream('copy.txt');
source.pipe(destination);
// A single pipe() call wires the source to the destination.
pipe() — The Magic Method
readableStream.pipe(writableStream);
What it does:
- Listens for `data` events from readable
- Writes each chunk to writable
- Handles backpressure (slows down if writable can't keep up)
- Closes writable when readable ends
// Chaining pipes (Unix philosophy in JavaScript)
// NOTE: illustrative fragment — `zlib` and `crypto` are not required in this
// snippet, and `...` stands in for the createCipheriv arguments elided here.
fs.createReadStream('input.txt')
.pipe(zlib.createGzip()) // Compress
.pipe(crypto.createCipheriv(...)) // Encrypt
.pipe(fs.createWriteStream('output.gz.enc')); // Write
// Data flows top → bottom: each .pipe() feeds the stage on the following line
Real-World Use Cases
1. Processing Large Files (CSV Example)
const fs = require('fs');
const { pipeline } = require('stream/promises'); // Node 18+
const { Transform } = require('stream');
// A transform that processes CSV rows
/**
 * Transform stream that turns raw CSV text into one plain object per data row.
 *
 * The first non-blank line is used as the header; every later non-blank line
 * becomes an object keyed by the header names. Fields are split on bare commas,
 * so quoted fields containing commas or newlines are not supported.
 *
 * @param {object} [options] - Transform options; `objectMode` is always forced on.
 */
class CSVParse extends Transform {
  // Streaming decoder: holds back a partial multi-byte UTF-8 sequence until the
  // rest of it arrives instead of decoding every chunk in isolation.
  // `ignoreBOM: true` keeps a leading BOM in the text, as Buffer#toString() does.
  #decoder = new TextDecoder('utf-8', { ignoreBOM: true });

  constructor(options) {
    super({ ...options, objectMode: true });
    this.buffer = '';
    this.header = null;
  }

  /**
   * Consumes one chunk and emits a row for every complete line seen so far.
   */
  _transform(chunk, encoding, callback) {
    // objectMode also applies to the writable side, so a chunk may already be a string.
    const text =
      typeof chunk === 'string' ? chunk : this.#decoder.decode(chunk, { stream: true });
    const lines = (this.buffer + text).split('\n');
    // The last piece has no terminating newline yet; keep it for the next chunk.
    this.buffer = lines.pop();
    for (const line of lines) {
      this.#handleLine(line);
    }
    callback();
  }

  /**
   * Emits the final line when the input does not end with a newline.
   * Without this hook the last row of such a file would be silently dropped.
   */
  _flush(callback) {
    const lastLine = this.buffer + this.#decoder.decode();
    this.buffer = '';
    this.#handleLine(lastLine);
    callback();
  }

  /** Records the header from the first non-blank line, or pushes a row object. */
  #handleLine(line) {
    if (!line.trim()) return;
    const cells = line.split(',');
    if (!this.header) {
      this.header = cells;
      return;
    }
    const row = {};
    this.header.forEach((name, index) => {
      row[name] = cells[index];
    });
    this.push(row); // Output object mode
  }
}
/**
 * Streams the CSV file at `inputPath` through CSVParse and collects every row
 * whose numeric `price` is above 1000.
 *
 * @param {string} inputPath - Path of the CSV file to read.
 * @returns {Promise<object[]>} The matching rows, in file order.
 */
async function processLargeCSV(inputPath) {
  const results = [];

  const source = fs.createReadStream(inputPath);
  const parser = new CSVParse();
  // Terminal stage: it never pushes anything downstream, it only records matches.
  const collector = new Transform({
    objectMode: true,
    transform(row, _enc, callback) {
      // Process each row independently
      const isExpensive = Number(row.price) > 1000;
      if (isExpensive) {
        results.push(row);
      }
      callback();
    },
  });

  await pipeline(source, parser, collector);

  console.log(`Found ${results.length} expensive items`);
  return results;
}
2. HTTP Request Streaming
const https = require('https');
const fs = require('fs');
// Download large file with progress
/**
 * Downloads `url` to `destPath`, printing progress when the server reports a size.
 *
 * @param {string|URL} url - HTTPS URL to fetch.
 * @param {string} destPath - File path the response body is written to.
 * @param {number} [maxRedirects=5] - Redirect hops allowed before giving up.
 * @returns {Promise<void>} Resolves once the file has been fully written. Rejects on
 *   request, response or file errors, on a non-200 final status, on an invalid
 *   redirect target, or when the redirect limit is exceeded.
 */
function download(url, destPath, maxRedirects = 5) {
  return new Promise((resolve, reject) => {
    const request = https.get(url, (response) => {
      const { statusCode, headers } = response;

      if (statusCode >= 300 && statusCode < 400 && headers.location) {
        // Handle redirect. Discard the redirect body so the socket is released.
        response.resume();
        if (maxRedirects <= 0) {
          reject(new Error('Too many redirects'));
          return;
        }
        let nextUrl;
        try {
          // Location may be relative; resolve it against the URL just requested.
          nextUrl = new URL(headers.location, url).href;
        } catch (err) {
          reject(err);
          return;
        }
        download(nextUrl, destPath, maxRedirects - 1).then(resolve, reject);
        return;
      }

      if (statusCode !== 200) {
        response.resume();
        reject(new Error(`Status: ${statusCode}`));
        return;
      }

      // Open the destination only once a 200 response is certain, so redirects
      // and error statuses never leave an open or empty file behind.
      const file = fs.createWriteStream(destPath);
      const totalSize = Number.parseInt(headers['content-length'], 10);
      let downloadedBytes = 0;

      // pipe() does not forward errors, so each side is watched explicitly and
      // a failure on one side tears down the other.
      const fail = (err) => {
        response.destroy();
        file.destroy();
        reject(err);
      };
      response.on('error', fail);
      file.on('error', fail);

      response.on('data', (chunk) => {
        downloadedBytes += chunk.length;
        // totalSize is NaN when the server sent no Content-Length header.
        if (totalSize) {
          const percent = ((downloadedBytes / totalSize) * 100).toFixed(1);
          process.stdout.write(`\rDownloaded: ${percent}% (${downloadedBytes}/${totalSize})`);
        }
      });

      response.pipe(file);
      file.on('finish', () => {
        console.log('\nDownload complete!');
        resolve();
      });
    });

    request.on('error', reject);
  });
}
await download('https://example.com/large-file.zip', './file.zip');
3. API Response Streaming
const express = require('express');
const app = express();

// Stream users as NDJSON: one complete JSON object per line, with no enclosing
// array brackets and no separating commas (those would make the body invalid
// for the declared application/x-ndjson content type).
app.get('/api/users-stream', async (_req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');

  // Simulate streaming from database
  const users = [
    { id: 1, name: 'Alice' },
    { id: 2, name: 'Bob' },
    // ... thousands more
  ];

  // Track disconnects so a client that goes away stops the loop.
  let connectionClosed = false;
  res.on('close', () => {
    connectionClosed = true;
  });

  for (const user of users) {
    if (connectionClosed) return;

    const hasRoom = res.write(`${JSON.stringify(user)}\n`);
    if (!hasRoom) {
      // Respect backpressure: wait until the response drains. Also settle on
      // 'close', otherwise a disconnected client would leave this pending forever.
      await new Promise((resolve) => {
        const settle = () => {
          res.off('drain', settle);
          res.off('close', settle);
          resolve();
        };
        res.on('drain', settle);
        res.on('close', settle);
      });
    }
  }

  if (!connectionClosed) res.end();
});
// Client can parse incrementally:
// Each line is a complete JSON document and arrives as soon as it's written
// No need to wait for full response
4. Log Aggregation (Multiple Sources → One File)
const fs = require('fs');
const net = require('net');
// Server that accepts log lines from multiple sources
const server = net.createServer((socket) => {
  // Stages in flow order. splitStream, timestampTransform and formatTransform are
  // not defined in this snippet — presumably each returns a fresh Transform; confirm.
  const stages = [
    socket,
    splitStream(), // Split by newline
    timestampTransform(), // Add timestamp
    formatTransform(), // Format nicely
    fs.createWriteStream('all-logs.log', { flags: 'a' }), // Append
  ];

  // pipe() does not forward errors, and an 'error' event without a listener is
  // thrown as an uncaught exception. Watch every stage and tear down the whole
  // chain on the first failure so one bad connection cannot crash the server
  // or leak the file handle.
  const teardown = (err) => {
    console.error('Log pipeline error:', err.message);
    for (const stage of stages) {
      stage.destroy();
    }
  };
  for (const stage of stages) {
    stage.on('error', teardown);
  }

  for (let i = 0; i < stages.length - 1; i += 1) {
    stages[i].pipe(stages[i + 1]);
  }
});
server.listen(4000, () => console.log('Log aggregator on :4000'));
Error Handling in Streams
const fs = require('fs');
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');

// ❌ No error handling: pipe() does not forward errors between streams, and an
// 'error' event with no listener is thrown as an uncaught exception.
readStream.pipe(writeStream);

// ✅ Handle errors on both ends, and tear down the other side when one fails
readStream
  .on('error', (err) => {
    console.error('Read error:', err.message);
    writeStream.destroy(); // Don't leave writer hanging
  })
  .pipe(writeStream)
  .on('error', (err) => {
    console.error('Write error:', err.message);
    readStream.destroy(); // Stop reading and release the source file as well
  })
  .on('finish', () => {
    console.log('Done!');
  });
// Or use pipeline (Node 18+):
const { pipeline } = require('stream/promises');
try {
  // Stages in flow order; `myTransform` is not defined in this snippet.
  const stages = [
    fs.createReadStream('input.txt'),
    myTransform,
    fs.createWriteStream('output.txt'),
  ];
  await pipeline(...stages);
  console.log('Pipeline completed successfully');
} catch (failure) {
  console.error('Pipeline failed:', failure.message);
}
Backpressure Explained
// Backpressure = when writable is slower than readable
// Good news: pipe() handles this automatically!
// Manual backpressure:
/**
 * Forwards every 'data' chunk from `readable` to `writable`, pausing the source
 * whenever write() returns false and resuming it on the next 'drain' event.
 *
 * @param {object} readable - Source emitting 'data'; must offer pause()/resume().
 * @param {object} writable - Destination offering write() and 'drain' events.
 */
function manualBackpressure(readable, writable) {
  const forward = (chunk) => {
    // write() returning true means the destination can take more right away.
    if (writable.write(chunk)) return;

    // Destination is saturated: stop the source until it drains once.
    readable.pause();
    writable.once('drain', () => {
      readable.resume();
    });
  };

  readable.on('data', forward);
}
// But seriously, just use .pipe() or pipeline()
Creating Your Own Transform
const { Transform } = require('stream');
// Uppercase transform
/**
 * Transform stream that upper-cases the UTF-8 text passing through it.
 */
class UpperCase extends Transform {
  // Streaming decoder: a multi-byte character split across two chunks is held
  // back until complete, instead of being decoded (and corrupted) per chunk.
  // `ignoreBOM: true` keeps a leading BOM in the output, as Buffer#toString() does.
  #decoder = new TextDecoder('utf-8', { ignoreBOM: true });

  _transform(chunk, encoding, callback) {
    // With `decodeStrings: false` a chunk can arrive as a string already.
    const text =
      typeof chunk === 'string' ? chunk : this.#decoder.decode(chunk, { stream: true });
    if (text) {
      this.push(text.toUpperCase());
    }
    callback();
  }

  /** Emits whatever bytes the decoder was still holding when the input ended. */
  _flush(callback) {
    const tail = this.#decoder.decode();
    if (tail) {
      this.push(tail.toUpperCase());
    }
    callback();
  }
}
// JSON parser (handles multi-chunk objects)
/**
 * Transform stream that parses newline-delimited JSON text into JavaScript values.
 *
 * Each newline ends a candidate document. If the text accumulated so far does not
 * parse, it is assumed to be a value spanning several lines (or chunks) and more
 * input is awaited, so both JSONL and a pretty-printed document are accepted.
 * A JSON `null` document is skipped, because pushing null would end the stream.
 *
 * @param {object} [options] - Transform options; `objectMode` is always forced on.
 */
class JSONParser extends Transform {
  // Streaming decoder so a multi-byte character split across chunks stays intact.
  #decoder = new TextDecoder('utf-8');

  constructor(options) {
    super({ ...options, objectMode: true });
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    // objectMode also applies to the writable side, so a chunk may already be a string.
    const text =
      typeof chunk === 'string' ? chunk : this.#decoder.decode(chunk, { stream: true });
    const segments = text.split('\n');
    const tail = segments.pop();

    for (const segment of segments) {
      this.buffer += segment;
      // Not parseable yet: keep the newline and continue accumulating.
      if (!this.#tryEmit()) {
        this.buffer += '\n';
      }
    }

    this.buffer += tail;
    // No newline yet: only try values that visibly close. A bare number such as
    // `12` could still continue in the next chunk, so scalars wait for a newline
    // or for the end of the stream.
    const lastChar = this.buffer.trimEnd().slice(-1);
    if (lastChar === '}' || lastChar === ']') {
      this.#tryEmit();
    }
    callback();
  }

  /** Parses the leftover text, reporting an error rather than dropping it silently. */
  _flush(callback) {
    this.buffer += this.#decoder.decode();
    if (this.#tryEmit()) {
      callback();
      return;
    }
    callback(new SyntaxError('Incomplete or invalid JSON at end of stream'));
  }

  /**
   * Tries to parse the accumulated text as one JSON document.
   * @returns {boolean} true when the buffer was blank or parsed (and was cleared).
   */
  #tryEmit() {
    if (this.buffer.trim() === '') {
      this.buffer = '';
      return true;
    }
    let value;
    try {
      value = JSON.parse(this.buffer);
    } catch {
      // Expected while a document is still incomplete; the caller keeps buffering.
      return false;
    }
    this.buffer = '';
    if (value !== null) {
      this.push(value);
    }
    return true;
  }
}
// Usage
const source = fs.createReadStream('data.jsonl'); // One JSON object per line
const parsed = source.pipe(new JSONParser());
// Each parsed value arrives as its own 'data' event.
parsed.on('data', (record) => {
  console.log('Parsed:', record);
});
Stream vs Buffer: When to Use Which
Use BUFFER when:
→ Small data (< 10MB)
→ Need random access
→ Data must be fully loaded before processing
→ Example: Config files, API responses under 1MB
Use STREAM when:
→ Large or unknown size data
→ Processing can start before all data arrives
→ Memory efficiency matters
→ Example: File processing, API proxies, log aggregation, real-time data
Quick Reference
// Illustrative fragments, not a runnable script: `stream`, `path`, `data`,
// `destination`, `readableStream` and `processChunk` are placeholders that are
// not defined in this snippet.
// Create streams
fs.createReadStream(path, { highWaterMark: 64 * 1024 }) // Chunk size (64 * 1024 = 64KB)
fs.createWriteStream(path, { flags: 'a' }) // Append mode
new stream.PassThrough() // Identity transform
new stream.Readable({ read(_size) { this.push(data); } }) // Custom readable
new stream.Writable({ write(chunk, _enc, cb) { cb(); } }) // Custom writable
// Events
stream.on('data', (chunk) => {}) // New chunk available
stream.on('end', () => {}) // No more data
stream.on('error', (err) => {}) // Something went wrong
stream.on('finish', () => {}) // Write complete
stream.on('close', () => {}) // Stream destroyed
stream.on('drain', () => {}) // Safe to write again (backpressure)
// Methods
stream.pause() // Pause reading
stream.resume() // Resume reading
stream.destroy() // Close and cleanup
stream.pipe(destination) // Connect streams
stream.unpipe(destination) // Disconnect
// Async iteration (Node 18+) — cleanest API!
// NOTE: `for await` needs an async function or an ES module's top level.
for await (const chunk of readableStream) {
processChunk(chunk);
}
Are you using streams enough? Or still loading everything into memory?
Follow @armorbreak for more practical Node.js guides.
Top comments (0)