Node.js Streams: The Practical Guide
Streams are a big part of how a single Node process can serve thousands of concurrent connections without exhausting memory: data moves through in small chunks instead of being buffered whole. Here's how they work.
What Are Streams?
Without Streams (buffering):
File (100MB) → Read ALL into memory → Process → Write ALL to output
Memory usage: 100MB+ 💀
With Streams:
File → Read chunk by chunk → Process each chunk → Write each chunk
Memory usage: ~64KB per chunk ✅
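To make that concrete, here's the same file copy written both ways (the file names are just placeholders):

```js
const fs = require('fs');

// Buffered: the entire file is read into memory before any of it is written
fs.readFile('bigfile.txt', (err, data) => {
  if (err) throw err;
  fs.writeFile('copy.txt', data, (err) => {
    if (err) throw err;
  });
});

// Streamed: chunks flow from source to destination, roughly 64KB at a time
fs.createReadStream('bigfile.txt').pipe(fs.createWriteStream('copy.txt'));
```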
Four Types of Streams
// 1. Readable — Data comes OUT
const fs = require('fs');
const readable = fs.createReadStream('bigfile.txt');
// 2. Writable — Data goes IN
const writable = fs.createWriteStream('output.txt');
// 3. Duplex — Both directions (TCP socket)
const net = require('net');
const socket = new net.Socket(); // Readable AND Writable
// 4. Transform — Modify data in transit
const { Transform } = require('stream');
const uppercase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
});
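To see the `uppercase` transform do something, wire it between stdin and stdout; type a line and it comes back shouted:

```js
// Assumes the `uppercase` Transform defined above
process.stdin.pipe(uppercase).pipe(process.stdout);
```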
Basic Usage
Reading a File
const fs = require('fs');
const readable = fs.createReadStream('large-file.txt', {
  encoding: 'utf8',         // Auto-convert chunks to strings
  highWaterMark: 64 * 1024  // Chunk size: 64KB (the default for fs streams)
});
readable.on('data', (chunk) => {
  console.log(`Received a chunk of ${chunk.length} characters`);
  // Process chunk here
});
readable.on('end', () => {
  console.log('Done reading');
});
readable.on('error', (err) => {
  console.error('Error:', err);
});
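Readable streams are also async iterables (Node 10+), so the same read loop can be written without event listeners. A minimal, self-contained sketch:

```js
const fs = require('fs');

async function readAll() {
  const readable = fs.createReadStream('large-file.txt', { encoding: 'utf8' });
  let total = 0;
  for await (const chunk of readable) {
    total += chunk.length; // chunks are strings here because encoding: 'utf8' was set
  }
  console.log(`Done reading, ${total} characters total`);
}

readAll().catch((err) => console.error('Error:', err));
```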
Writing a File
const fs = require('fs');
const writable = fs.createWriteStream('output.txt');
writable.write('Hello, ');
writable.write('world!\n');
writable.end(); // Close the stream
writable.on('finish', () => {
  console.log('All writes completed');
});
writable.on('error', (err) => {
  console.error('Error:', err);
});
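One thing the snippet above glosses over: `write()` returns `false` once the internal buffer passes `highWaterMark`, and you should pause until the `'drain'` event fires before writing more. A minimal sketch of that pattern (the million-line loop is just for illustration):

```js
const fs = require('fs');
const out = fs.createWriteStream('numbers.txt');

function writeNumbers(i = 0) {
  let ok = true;
  while (i < 1000000 && ok) {
    ok = out.write(`${i}\n`); // false means the internal buffer is full
    i++;
  }
  if (i < 1000000) {
    out.once('drain', () => writeNumbers(i)); // resume once the buffer empties
  } else {
    out.end();
  }
}

writeNumbers();
```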
Piping (The Magic)
const fs = require('fs');
const zlib = require('zlib');
// One-liner that handles everything!
fs.createReadStream('large-file.txt')
  .pipe(zlib.createGzip())                   // Compress on the fly
  .pipe(fs.createWriteStream('file.txt.gz'));
// No memory issues regardless of file size!
// Back-pressure handled automatically!

// HTTP response streaming:
const http = require('http');
http.createServer((req, res) => {
  fs.createReadStream('video.mp4')
    .pipe(res); // Stream video to browser without loading it all into RAM
}).listen(3000);
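One caveat with bare `.pipe()`: errors on the source are not forwarded to the destination, so a missing or unreadable file would leave the response hanging. A hedged sketch of guarding against that (the 500 fallback is just one option):

```js
const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
  const stream = fs.createReadStream('video.mp4');

  stream.on('error', (err) => {
    // pipe() won't do this for you
    if (!res.headersSent) res.writeHead(500);
    res.end('Server error');
  });

  stream.pipe(res); // implicit 200 once the first chunk is written
}).listen(3000);
```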
Pipe vs pipeline
// pipe (older, doesn't handle errors well)
readable.pipe(writable);
// pipeline (newer, recommended)
const { pipeline } = require('stream/promises'); // Promise-based; a callback version lives in require('stream')
async function processFile() {
  try {
    await pipeline(
      fs.createReadStream('input.txt'),
      zlib.createGzip(),
      fs.createWriteStream('output.gz')
    );
    console.log('Pipeline succeeded');
  } catch (err) {
    console.error('Pipeline failed:', err);
  }
}
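If you prefer the callback form from `require('stream')`, the error handler goes last instead of using `try/catch`:

```js
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('output.gz'),
  (err) => {
    if (err) console.error('Pipeline failed:', err);
    else console.log('Pipeline succeeded');
  }
);
```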
Custom Transform Streams
const { Transform } = require('stream');
// CSV to JSON converter
class CsvToJson extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.headers = null;
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep incomplete line in buffer
    for (const line of lines) {
      this._parseLine(line);
    }
    callback();
  }

  _flush(callback) {
    // Emit whatever is left in the buffer (a final line with no trailing newline)
    this._parseLine(this.buffer);
    callback();
  }

  _parseLine(line) {
    if (!line.trim()) return;
    if (!this.headers) {
      this.headers = line.split(',');
      return;
    }
    const values = line.split(',');
    const obj = {};
    this.headers.forEach((h, i) => obj[h.trim()] = values[i]?.trim());
    this.push(obj); // Output as JS objects
  }
}
// Usage:
fs.createReadStream('data.csv')
  .pipe(new CsvToJson())
  .on('data', (row) => {
    console.log(row); // { name: "Alex", age: "30", city: "NYC" }
  });
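Keep in mind the object-mode output can't be piped straight into a byte stream like a file; you need one more transform to stringify each row first. A sketch under that assumption (file names are placeholders):

```js
const fs = require('fs');
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');

// Turn each row object back into a line of text (NDJSON)
const toNdjson = new Transform({
  writableObjectMode: true,
  transform(row, encoding, callback) {
    callback(null, JSON.stringify(row) + '\n');
  }
});

pipeline(
  fs.createReadStream('data.csv'),
  new CsvToJson(),
  toNdjson,
  fs.createWriteStream('data.ndjson')
).then(() => console.log('Converted'))
 .catch((err) => console.error('Conversion failed:', err));
```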
Practical Examples
Log Rotator (Read + Write + Rotate)
const fs = require('fs');
const path = require('path');
const { Writable } = require('stream');

function createLogRotator(maxSizeBytes = 5 * 1024 * 1024) {
  let currentSize = 0;
  let fileNum = 1;

  function getOutputPath() {
    return path.join(__dirname, `logs/app-${fileNum}.log`);
  }

  const logStream = new Writable({
    write(chunk, encoding, callback) {
      currentSize += chunk.length;
      if (currentSize > maxSizeBytes) {
        fileNum++;       // Roll over to the next file
        currentSize = 0;
      }
      fs.appendFile(getOutputPath(), chunk, callback); // Async append; callback signals readiness for the next chunk
    }
  });

  return logStream;
}

// Pipe any readable source into the rotator, e.g. a log-producing stdin
process.stdin.pipe(createLogRotator());
API Response Stream
const http = require('http');
http.createServer(async (req, res) => {
  res.writeHead(200, {
    'Content-Type': 'application/json',
    'Transfer-Encoding': 'chunked' // Streaming!
  });

  // Stream database results as they come
  const dbStream = getDatabaseCursor(); // Hypothetical cursor

  res.write('[\n');
  let first = true;
  for await (const row of dbStream) {
    if (!first) res.write(',\n');
    first = false;
    res.write(JSON.stringify(row));
  }
  res.write('\n]');
  res.end();
}).listen(3000);
// Client receives data incrementally instead of waiting for all results!
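If you don't have a real cursor handy, `Readable.from()` (see the Quick Reference below) turns an array or async generator into a readable stream, so a stand-in for the hypothetical `getDatabaseCursor()` could look like this:

```js
const { Readable } = require('stream');

// Hypothetical stand-in for a database cursor, for local testing only
function getDatabaseCursor() {
  async function* rows() {
    yield { id: 1, name: 'Alex' };
    yield { id: 2, name: 'Sam' };
  }
  return Readable.from(rows());
}
```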
Real-time File Watcher
const fs = require('fs');
const readline = require('readline');
function tailFile(filePath) {
  const stream = fs.createReadStream(filePath, {
    start: 0,
    encoding: 'utf8'
  });

  const rl = readline.createInterface({
    input: stream,
    crlfDelay: Infinity
  });

  rl.on('line', (line) => {
    // Process each line already in the file (the stream ends at EOF;
    // see the watch-based sketch below for picking up new lines live)
    const logEntry = JSON.parse(line);
    if (logEntry.level === 'error') {
      alertTeam(logEntry); // Your alerting hook
    }
  });
}

tailFile('/var/log/app.log');
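To actually follow the file as new lines are appended, one approach is to re-read only the freshly appended bytes whenever `fs.watch()` fires. A minimal sketch, assuming the file is append-only (rotation and truncation are not handled):

```js
const fs = require('fs');
const readline = require('readline');

function tailFileLive(filePath, onLine) {
  // Start reading from the current end of the file
  let position = fs.existsSync(filePath) ? fs.statSync(filePath).size : 0;

  fs.watch(filePath, () => {
    const { size } = fs.statSync(filePath);
    if (size <= position) return; // nothing new appended

    // Stream only the bytes added since the last read
    const stream = fs.createReadStream(filePath, { start: position, encoding: 'utf8' });
    position = size;

    readline.createInterface({ input: stream, crlfDelay: Infinity })
      .on('line', onLine);
  });
}

tailFileLive('/var/log/app.log', (line) => console.log('new line:', line));
```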
Memory Comparison
// ❌ Buffering (don't do this with large files!)
const fs = require('fs');
const data = fs.readFileSync('huge-file.json'); // Loads ENTIRE file into RAM
const parsed = JSON.parse(data); // Duplicates in memory
const result = parsed.filter(item => item.active); // Another copy!
// Memory: roughly 3x the file size

// ✅ Streaming (constant memory!)
const { Transform } = require('stream');
const JSONStream = require('JSONStream'); // Third-party: npm install JSONStream

fs.createReadStream('huge-file.json')
  .pipe(JSONStream.parse('*'))   // Parse item by item
  .pipe(new Transform({          // Filter without buffering everything
    objectMode: true,
    transform(item, enc, cb) {
      if (item.active) cb(null, item);
      else cb(); // Skip
    }
  }));
// Memory: a handful of chunks, regardless of file size!
Quick Reference
| Method | Use Case |
|---|---|
| `.pipe(dest)` | Connect streams |
| `pipeline(...)` | Error-safe piping |
| `Readable.from(iterable)` | Create readable from array/async iterator |
| `new Transform()` | Modify data in-flight |
| `objectMode: true` | Pass objects instead of buffers |
| `highWaterMark` | Control buffer size |
| `.on('data')` | Handle incoming chunks |
| `.on('end')` | Stream finished |
| `.on('error')` | Handle errors |
What's your favorite use case for Node.js streams?
Follow @armorbreak for more Node.js content.