Node.js Streams: The Practical Guide
Streams are one of Node.js's most powerful features. Most developers avoid them — here's how to actually use them.
What Are Streams?
A stream processes data piece by piece, in small chunks, instead of loading everything into memory at once, which is how Node.js handles files far bigger than your available RAM.
Without streams:
const data = fs.readFileSync('huge-10gb-file.csv'); // 😵 10GB in RAM!
process(data);
With streams:
fs.createReadStream('huge-10gb-file.csv')
  .pipe(transform)
  .pipe(writeStream);
// Uses ~64KB of RAM regardless of file size!
The Four Stream Types
const fs = require('fs');
const net = require('net');
const { Transform } = require('stream');
// 1. Readable — data comes OUT
const readable = fs.createReadStream('input.txt');
// 2. Writable — data goes IN
const writable = fs.createWriteStream('output.txt');
// 3. Duplex — both ways (like a TCP socket)
const duplex = net.createConnection(port, host);
// 4. Transform — modify data as it passes through
const transform = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});
Reading Files (Readable)
const fs = require('fs');
// ❌ Don't read large files with readFile/readFileSync
const data = fs.readFileSync('big.csv', 'utf8'); // Blocks + loads the whole file into memory
// ✅ Use createReadStream
const readStream = fs.createReadStream('big.csv', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024, // 64KB chunks (default)
});
readStream.on('data', (chunk) => {
  // chunk is a string (because encoding: 'utf8')
  // Each chunk is ~64KB
  process.stdout.write(`Got chunk: ${chunk.length} chars\n`);
});
readStream.on('end', () => {
  console.log('Done reading!');
});
readStream.on('error', (err) => {
  console.error('Read error:', err);
});
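The 'data' handler above puts the stream into flowing mode. Streams also support a paused mode where you pull chunks yourself via the 'readable' event and read(); a minimal sketch using the same hypothetical big.csv (pick one style per stream, don't mix them):
const pausedStream = fs.createReadStream('big.csv', { encoding: 'utf8' });
pausedStream.on('readable', () => {
  let chunk;
  // read() returns null once the internal buffer is empty for now
  while ((chunk = pausedStream.read()) !== null) {
    process.stdout.write(`Pulled chunk: ${chunk.length} chars\n`);
  }
});
pausedStream.on('end', () => console.log('Done reading!'));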
Writing Files (Writable)
const fs = require('fs');
// ❌ Don't write large files with writeFile/writeFileSync
fs.writeFileSync('big-output.txt', hugeString); // Blocks and needs the whole string in memory
// ✅ Use createWriteStream
const writeStream = fs.createWriteStream('big-output.txt');
writeStream.write('First line\n');
writeStream.write('Second line\n');
writeStream.end('Last line\n');
writeStream.on('finish', () => {
  console.log('Done writing!');
});
writeStream.on('error', (err) => {
  console.error('Write error:', err);
});
// Backpressure handling
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');
// pipe() handles backpressure automatically!
readStream.pipe(writeStream);
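For comparison, here is roughly what pipe() does for you under the hood (a simplified sketch): write() returns false when the destination's internal buffer is full, so the source is paused and resumed once 'drain' fires.
const src = fs.createReadStream('input.txt');
const dest = fs.createWriteStream('output.txt');
src.on('data', (chunk) => {
  if (!dest.write(chunk)) {
    src.pause();                              // destination buffer is full, stop reading
    dest.once('drain', () => src.resume());   // buffer flushed, keep going
  }
});
src.on('end', () => dest.end());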
Pipe: The Magic Method
// pipe() connects streams — handles backpressure automatically
// No need to manually manage data flow!
// One caveat: pipe() does NOT forward errors, so attach 'error' handlers to each stream (or use pipeline(), shown below)
// File copy
fs.createReadStream('source.mp4')
  .pipe(fs.createWriteStream('copy.mp4'));
// HTTP response streaming
const server = http.createServer((req, res) => {
  const video = fs.createReadStream('video.mp4');
  video.on('error', () => res.destroy()); // pipe() won't clean up the response on a read error
  video.pipe(res);
  // Browser receives data as it's read — no 4GB memory spike!
});
// Process CSV line by line
const { pipeline } = require('stream/promises');
const { Transform, Writable } = require('stream');
// Note: chunks from createReadStream are arbitrary buffers, not whole lines;
// in practice add a line-splitting step first (see the LineSplitter sketch below)
// (await pipeline must run inside an async function when using CommonJS)
await pipeline(
  fs.createReadStream('data.csv'),
  // Transform each line
  new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      const line = chunk.toString().trim();
      if (line) {
        const [name, email] = line.split(',');
        this.push({ name, email });
      }
      callback();
    }
  }),
  // Write processed data
  new Writable({
    objectMode: true,
    write(record, encoding, callback) {
      db.insert(record, callback);
    }
  })
);
Transform Streams (Custom Processing)
const { Transform } = require('stream');
// Example 1: CSV to JSON
class CsvToJson extends Transform {
  constructor(options = {}) {
    super({ ...options, objectMode: true });
    this.headers = null;
  }
  _transform(chunk, encoding, callback) {
    // Assumes each chunk is exactly one CSV line
    const line = chunk.toString().trim();
    if (!line) return callback();
    const values = line.split(',');
    if (!this.headers) {
      this.headers = values; // First line is headers
      return callback();
    }
    const obj = {};
    this.headers.forEach((header, i) => {
      obj[header.trim()] = values[i]?.trim() || '';
    });
    this.push(obj);
    callback();
  }
}
// Usage (in practice, put a line-splitting transform in front of CsvToJson; see the sketch below)
fs.createReadStream('users.csv')
  .pipe(new CsvToJson())
  .on('data', (user) => console.log(user));
// Output: { name: 'Alice', email: 'alice@example.com', role: 'admin' }
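One caveat: createReadStream emits arbitrary-size buffers, not whole lines, so CsvToJson needs a line-splitting step in front of it. A minimal sketch (LineSplitter is an illustrative name, not a built-in; libraries like split2 do the same job):
class LineSplitter extends Transform {
  constructor() {
    super({ readableObjectMode: true }); // emit one line (string) per chunk downstream
    this.leftover = '';
  }
  _transform(chunk, encoding, callback) {
    const lines = (this.leftover + chunk.toString()).split('\n');
    this.leftover = lines.pop(); // the last element may be an incomplete line
    for (const line of lines) this.push(line);
    callback();
  }
  _flush(callback) {
    if (this.leftover) this.push(this.leftover); // emit the final line
    callback();
  }
}
// Usage: fs.createReadStream('users.csv').pipe(new LineSplitter()).pipe(new CsvToJson())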
// Example 2: Compress on the fly
const { createGzip } = require('zlib');
fs.createReadStream('large-file.log')
  .pipe(createGzip()) // Transform: compress
  .pipe(fs.createWriteStream('large-file.log.gz'));
// Reads → compresses → writes — all streaming!
// Example 3: Filter stream
class FilterStream extends Transform {
  constructor(predicate) {
    super({ objectMode: true });
    this.predicate = predicate;
  }
  _transform(chunk, encoding, callback) {
    if (this.predicate(chunk)) {
      this.push(chunk); // Only pass matching items
    }
    callback();
  }
}
// Usage: filter users who are active
fs.createReadStream('users.csv')
  .pipe(new CsvToJson())
  .pipe(new FilterStream(user => user.active === 'true'))
  .pipe(new JsonToStringArray()) // objects must be serialized back to text before a file write stream
  .pipe(fs.createWriteStream('active-users.json'));
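JsonToStringArray isn't defined above; here is one plausible sketch of what it could look like (an assumption about the intent): it takes objects in and emits the text of a JSON array, so the result written to active-users.json is valid JSON.
class JsonToStringArray extends Transform {
  constructor() {
    super({ writableObjectMode: true }); // accepts objects, emits text
    this.first = true;
  }
  _transform(obj, encoding, callback) {
    const prefix = this.first ? '[\n' : ',\n';
    this.first = false;
    this.push(prefix + JSON.stringify(obj));
    callback();
  }
  _flush(callback) {
    this.push(this.first ? '[]' : '\n]'); // close the array (or emit an empty one)
    callback();
  }
}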
Async Iterators (Modern Approach)
// Node.js 10+ supports async iteration over streams!
// Much cleaner than event listeners
async function processLogFile(filePath) {
  const readStream = fs.createReadStream(filePath, { encoding: 'utf8' });
  for await (const chunk of readStream) {
    // Caveat: a line can be split across two chunks (see the readline variant below)
    const lines = chunk.split('\n');
    for (const line of lines) {
      if (line.includes('ERROR')) {
        console.error(line);
      }
    }
  }
}
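Because chunk boundaries don't line up with newlines, a safer variant iterates lines with the built-in readline module, which buffers partial lines for you and is also async-iterable. A sketch, same hypothetical log file:
const readline = require('readline');
async function processLogFileByLine(filePath) {
  const rl = readline.createInterface({
    input: fs.createReadStream(filePath, { encoding: 'utf8' }),
    crlfDelay: Infinity, // treat \r\n as a single line break
  });
  for await (const line of rl) { // readline yields complete lines
    if (line.includes('ERROR')) {
      console.error(line);
    }
  }
}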
// With pipeline (error handling + cleanup)
const { pipeline } = require('stream/promises');
async function processCsv(inputPath, outputPath) {
  await pipeline(
    fs.createReadStream(inputPath),
    new CsvToJson(),
    new FilterStream(user => user.active === 'true'),
    new JsonToStringArray(), // serialize objects back to text before the file write stream
    fs.createWriteStream(outputPath)
  );
  console.log('Processing complete!');
}
// pipeline() handles:
// - Backpressure automatically
// - Error propagation (if any stream fails, all are cleaned up)
// - Resource cleanup (closes all streams on error)
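pipeline() also accepts plain async generator functions as intermediate stages, which is often the quickest way to write a throwaway transform. A small sketch using the pipeline imported above (the uppercasing step and file names are just examples):
async function shout(inputPath, outputPath) {
  await pipeline(
    fs.createReadStream(inputPath),
    async function* (source) {
      // an async generator works as a transform stage
      for await (const chunk of source) {
        yield chunk.toString().toUpperCase();
      }
    },
    fs.createWriteStream(outputPath)
  );
}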
Real-World Use Cases
// 1. Large file processing (CSV, JSON, logs)
// Process files larger than available RAM
// 2. HTTP proxying (see the fuller sketch after this list)
http.createServer((req, res) => {
  http.get('http://api-backend.com' + req.url, (backendRes) => {
    backendRes.pipe(res); // Stream response body directly to the client
  });
});
// 3. Database streaming
const queryStream = db.query('SELECT * FROM users').stream();
queryStream.pipe(new JsonToStringArray()) // rows are already objects, so serialize them for the response
  .pipe(res); // Stream query results to HTTP response
// 4. File upload handling
app.post('/upload', (req, res) => {
  const file = fs.createWriteStream(`uploads/${Date.now()}.jpg`);
  req.pipe(file);
  file.on('finish', () => res.json({ success: true })); // wait until the file is fully flushed
});
// 5. Real-time log processing (tailFile, LogParser, AlertFilter, SlackNotifier are illustrative)
const tailStream = tailFile('/var/log/app.log'); // Follow new lines as they are appended
tailStream.pipe(new LogParser())
  .pipe(new AlertFilter())
  .pipe(new SlackNotifier());
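The proxy snippet in use case 2 only streams the response body. A slightly fuller sketch (the backend URL is illustrative) also forwards the method, headers, and status code, and streams the request body upstream:
const http = require('http');
http.createServer((req, res) => {
  const proxyReq = http.request('http://api-backend.com' + req.url, {
    method: req.method,
    headers: req.headers,
  }, (backendRes) => {
    res.writeHead(backendRes.statusCode, backendRes.headers); // forward status + headers
    backendRes.pipe(res);                                     // then stream the body
  });
  req.pipe(proxyReq); // stream the request body (e.g. POST uploads) to the backend
}).listen(3000);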
Stream Debugging
// Log what passes through a stream
const { Transform } = require('stream');
class Logger extends Transform {
  constructor(name = 'stream') {
    super();
    this.name = name;
    this.bytes = 0;
    this.chunks = 0;
  }
  _transform(chunk, encoding, callback) {
    this.bytes += chunk.length;
    this.chunks++;
    console.log(`[${this.name}] Chunk #${this.chunks}: ${chunk.length} bytes`);
    this.push(chunk);
    callback();
  }
  _flush(callback) {
    console.log(`[${this.name}] Total: ${this.bytes} bytes in ${this.chunks} chunks`);
    callback();
  }
}
// Usage: insert between any two streams
readStream
  .pipe(new Logger('input'))
  .pipe(transform)
  .pipe(new Logger('output'))
  .pipe(writeStream);
Quick Reference
| Method | Purpose |
|---|---|
| stream.pipe(dest) | Connect streams (auto backpressure) |
| stream.on('data', cb) | Handle each chunk |
| stream.on('end', cb) | Stream finished |
| stream.on('error', cb) | Handle errors |
| for await (const chunk of stream) | Async iteration |
| pipeline(...streams) | Compose with error handling |
| Readable.from(array) | Create from array |
| PassThrough | Stream that passes data unchanged |
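The last two entries are handy in tests and glue code; a quick sketch:
const { Readable, PassThrough } = require('stream');
// Readable.from(): turn any iterable (or async iterable) into a stream
const source = Readable.from(['alice\n', 'bob\n', 'carol\n']);
// PassThrough: a no-op Transform, useful as a tap or placeholder
const tap = new PassThrough();
tap.on('data', (chunk) => console.log('saw:', chunk.toString().trim()));
source.pipe(tap).pipe(process.stdout);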
Do you use streams in your Node.js apps? What for?
Follow @armorbreak for more Node.js content.