Node.js Streams: The Practical Guide
Processing 10GB files? Don't load them into memory. Use streams.
What Are Streams?
Without streams (loads everything into memory):
File (10GB) → [RAM: 10GB] → Process → [RAM: 10GB] → Output
❌ Memory explosion!
With streams (process chunk by chunk):
File (10GB) → [64KB chunk] → Process → [64KB chunk] → Output
✅ Constant memory usage!
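You can sanity-check the constant-memory claim with process.memoryUsage(). A rough sketch, assuming a large file named big-file.bin:

const fs = require('fs');

// Track peak resident memory while streaming a large file chunk by chunk
let peakRss = 0;
fs.createReadStream('big-file.bin')
  .on('data', () => {
    peakRss = Math.max(peakRss, process.memoryUsage().rss);
  })
  .on('end', () => {
    console.log(`Peak RSS while streaming: ${(peakRss / 1024 / 1024).toFixed(1)} MB`);
  });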
Four Types of Streams
const fs = require('fs');
const net = require('net');
const { Transform } = require('stream');

// 1. Readable — data flows OUT
const readable = fs.createReadStream('big-file.txt');

// 2. Writable — data flows IN
const writable = fs.createWriteStream('output.txt');

// 3. Duplex — both in and out (like a TCP socket)
const duplex = new net.Socket();

// 4. Transform — modify data as it passes through
const transform = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});
Reading Files
const fs = require('fs');
// ❌ Don't do this for large files:
// const data = fs.readFileSync('huge.csv', 'utf8'); // Loads ALL into RAM
// ✅ Stream instead:
const readStream = fs.createReadStream('huge.csv', {
  highWaterMark: 64 * 1024, // 64KB chunks (the default)
  encoding: 'utf8',
});

let lineCount = 0;

readStream.on('data', (chunk) => {
  // Process each chunk as it arrives. Count newlines rather than split
  // segments: a line can span two chunks, so counting segments would
  // overcount by one per chunk.
  lineCount += chunk.split('\n').length - 1;
});

readStream.on('end', () => {
  console.log(`Total lines: ${lineCount}`);
});

readStream.on('error', (err) => {
  console.error('Read error:', err);
});
Writing Files
const writeStream = fs.createWriteStream('output.txt');
writeStream.write('Hello, ');
writeStream.write('World!\n');
writeStream.end('Done.');
// Events
writeStream.on('finish', () => console.log('All data written'));
writeStream.on('error', (err) => console.error('Write error:', err));
// Backpressure handling
const canWriteMore = writeStream.write('Some data...');
if (!canWriteMore) {
  // Buffer is full! Wait for drain event
  writeStream.once('drain', () => {
    writeStream.write('More data');
  });
}
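To make backpressure concrete, here is a minimal sketch that writes a large number of generated lines, pausing whenever write() returns false and resuming on 'drain'. The file name and line count are made up for illustration.

const fs = require('fs');

function writeManyLines(path, total, done) {
  const out = fs.createWriteStream(path);
  let i = 0;

  function writeSome() {
    let ok = true;
    while (i < total && ok) {
      // write() returns false once the internal buffer exceeds highWaterMark
      ok = out.write(`line ${i++}\n`);
    }
    if (i < total) {
      // Buffer is full: resume writing once it drains
      out.once('drain', writeSome);
    } else {
      out.end(done); // Close the stream; done() runs on 'finish'
    }
  }

  writeSome();
}

writeManyLines('lines.txt', 1_000_000, () => console.log('Wrote all lines'));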
Pipe — The Magic Operator
// Pipe connects readable → writable automatically!
// Handles backpressure for you!
// Copy a file
fs.createReadStream('input.txt').pipe(fs.createWriteStream('output.txt'));
// Compress a file
const zlib = require('zlib');
fs.createReadStream('log.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('log.txt.gz'));

// HTTP response streaming
const http = require('http');
const server = http.createServer((req, res) => {
  fs.createReadStream('video.mp4').pipe(res);
});
server.listen(3000);
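One caveat: .pipe() does not forward errors between streams, so each stream in a chain needs its own 'error' handler (or use pipeline, shown later in this post). For example:

// Each stream in a .pipe() chain needs its own error handler
fs.createReadStream('log.txt')
  .on('error', (err) => console.error('Read error:', err))
  .pipe(zlib.createGzip())
  .on('error', (err) => console.error('Gzip error:', err))
  .pipe(fs.createWriteStream('log.txt.gz'))
  .on('error', (err) => console.error('Write error:', err));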
Transform Streams
const { Transform } = require('stream');
// CSV → JSON converter
class CsvToJson extends Transform {
  constructor(options) {
    super({ ...options, readableObjectMode: true });
    this.buffer = '';
    this.headers = null;
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep the incomplete last line for the next chunk

    for (const line of lines) {
      this._parseLine(line);
    }
    callback();
  }

  // Without _flush, the final line would be dropped when the file
  // has no trailing newline.
  _flush(callback) {
    this._parseLine(this.buffer);
    callback();
  }

  _parseLine(line) {
    if (!line.trim()) return;
    const values = line.split(',');
    if (!this.headers) {
      this.headers = values;
      return;
    }
    const obj = {};
    this.headers.forEach((h, i) => (obj[h.trim()] = values[i]?.trim()));
    this.push(obj);
  }
}
// Usage
fs.createReadStream('users.csv')
  .pipe(new CsvToJson())
  .on('data', (user) => console.log(user));
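Readable streams (including the output side of a Transform) are also async iterables, so the same converter can be consumed with for await...of instead of a 'data' listener. A minimal sketch:

// Consume the transform's object-mode output with async iteration
async function printUsers() {
  const users = fs.createReadStream('users.csv').pipe(new CsvToJson());
  for await (const user of users) {
    console.log(user);
  }
}

printUsers().catch(console.error);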
Practical Examples
Process Large CSV File
const { pipeline } = require('stream/promises');
const { Transform } = require('stream');
async function processLargeCsv(inputFile, outputFile) {
  const filter = new Transform({
    objectMode: true,
    transform(row, encoding, callback) {
      if (Number(row.price) > 100) {
        this.push(JSON.stringify(row) + '\n');
      }
      callback();
    }
  });

  await pipeline(
    fs.createReadStream(inputFile),
    new CsvToJson(), // From previous example
    filter, // Only expensive items
    fs.createWriteStream(outputFile)
  );

  console.log('Processing complete!');
}
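Calling it is a single awaited expression. The file names below are hypothetical:

processLargeCsv('products.csv', 'expensive-products.ndjson')
  .catch((err) => console.error('CSV processing failed:', err));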
HTTP File Upload Stream
const server = http.createServer((req, res) => {
  if (req.method === 'POST' && req.url === '/upload') {
    const fileStream = fs.createWriteStream('uploaded.dat');
    req.pipe(fileStream);
    fileStream.on('finish', () => {
      const stats = fs.statSync('uploaded.dat');
      res.writeHead(200, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify({ size: stats.size, message: 'Upload complete' }));
    });
  } else {
    // Anything else gets a 404 so the request doesn't hang
    res.writeHead(404);
    res.end();
  }
});

server.listen(3000);
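The client side can stream too. Here is a minimal sketch that pipes a local file into the request body instead of buffering it; the host, port, and file name are assumptions:

const http = require('http');
const fs = require('fs');

// Stream a local file as the POST body instead of buffering it in memory
const req = http.request(
  { host: 'localhost', port: 3000, path: '/upload', method: 'POST' },
  (res) => res.pipe(process.stdout) // Print the server's JSON response
);
fs.createReadStream('big-upload.dat').pipe(req); // pipe() ends the request when the file is done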
Stream Merge — Multiple Files into One
const { PassThrough } = require('stream');

function mergeStreams(streams) {
  const pass = new PassThrough();
  let i = 0;

  function next() {
    if (i >= streams.length) {
      pass.end();
      return;
    }
    const current = streams[i++];
    current.pipe(pass, { end: false }); // Don't close the output when this source ends
    current.on('end', next);            // Listen on the source, not on pass, to start the next file
  }

  next();
  return pass;
}
// Merge multiple log files
const merged = mergeStreams([
  fs.createReadStream('app1.log'),
  fs.createReadStream('app2.log'),
  fs.createReadStream('app3.log'),
]);
merged.pipe(fs.createWriteStream('all-logs.log'));
async/await with Streams
const { pipeline } = require('stream/promises');
// Clean API for piping with error handling
async function compressFile(input, output) {
  try {
    await pipeline(
      fs.createReadStream(input),
      zlib.createGzip(),
      fs.createWriteStream(output)
    );
    console.log('Compressed successfully!');
  } catch (err) {
    console.error('Compression failed:', err);
  }
}
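pipeline also accepts async generator functions as intermediate steps, which often replaces writing a Transform subclass. A minimal sketch with made-up file names:

const fs = require('fs');
const { pipeline } = require('stream/promises');

// Uppercase a text file using an async generator as the transform step
async function uppercaseFile(input, output) {
  await pipeline(
    fs.createReadStream(input, 'utf8'),
    async function* (source) {
      for await (const chunk of source) {
        yield chunk.toUpperCase();
      }
    },
    fs.createWriteStream(output)
  );
}

uppercaseFile('notes.txt', 'notes-upper.txt').catch(console.error);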
Quick Reference
| Method | Description |
|---|---|
| `readable.pipe(writable)` | Connect streams (auto backpressure) |
| `readable.on('data', fn)` | Handle each chunk |
| `readable.on('end', fn)` | Stream finished |
| `writable.write(data)` | Write data |
| `writable.end()` | Close the stream |
| `pipeline(...streams)` | Pipe with proper error handling |
| `Transform` | Modify data in-flight |
| `PassThrough` | Pass data through unchanged |
Have you used streams in production? What for?
Follow @armorbreak for more Node.js content.