Node.js Streams: The Practical Guide
Streams are one of Node's most powerful features. Here's how to use them.
What Are Streams?
A stream processes data in small chunks as it flows through your program, instead of loading everything into memory at once.
No Streams (loads everything into memory):
[File on Disk] → [Read ALL into RAM] → [Process ALL in RAM] → [Write ALL to Output]
With Streams (process piece by piece):
[File on Disk] → [Chunk 1] → [Process] → [Output]
→ [Chunk 2] → [Process] → [Output]
→ [Chunk 3] → [Process] → [Output]
→ ... (constant low memory usage)
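To see that chunk-by-chunk flow for yourself, read a file with a readable stream and count bytes as the chunks arrive (a tiny sketch; big-file.txt is just a placeholder name):

const fs = require('fs');

const stream = fs.createReadStream('big-file.txt'); // chunks default to 64 KB
let bytes = 0;
stream.on('data', (chunk) => {
  bytes += chunk.length; // only this one chunk is held in memory at a time
});
stream.on('end', () => console.log(`Read ${bytes} bytes, one small chunk at a time`));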
Types of Streams
// Readable — data comes OUT
const fs = require('fs');
const readable = fs.createReadStream('big-file.txt');
// Writable — data goes IN
const writable = fs.createWriteStream('output.txt');
// Duplex — both directions (like a TCP socket)
const net = require('net');
const socket = net.connect(80, 'example.com'); // the socket is readable AND writable
// Transform — modify data as it passes through (like zlib)
const zlib = require('zlib');
const gzip = zlib.createGzip();
Basic Usage: File Copy
// ❌ Bad for large files (loads entire file into memory)
const fs = require('fs');
const data = fs.readFileSync('large-file.txt');
fs.writeFileSync('copy.txt', data);
// ✅ Good: Uses streams (constant memory, regardless of file size!)
const { createReadStream, createWriteStream } = require('fs');
const readStream = createReadStream('large-file.txt');
const writeStream = createWriteStream('copy.txt');
readStream.pipe(writeStream); // That's it!
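pipe() starts the copy, but it won't tell you when the copy finishes and it won't surface errors on its own, so attach a couple of listeners yourself:

writeStream.on('finish', () => console.log('Copy complete'));
readStream.on('error', (err) => console.error('Read failed:', err));
writeStream.on('error', (err) => console.error('Write failed:', err));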
Pipe vs pipeline
const { createReadStream, createWriteStream } = require('fs');
const { pipeline } = require('stream/promises'); // Modern way
const zlib = require('zlib');
// pipe() — simple, but it does NOT forward errors (each stream needs its own 'error' handler)
createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(createWriteStream('output.txt.gz'));
// pipeline() — propagates errors, destroys every stream on failure, and returns a Promise
// (in CommonJS there's no top-level await, so run this inside an async function)
await pipeline(
createReadStream('input.txt'),
zlib.createGzip(),
createWriteStream('output.txt.gz')
);
console.log('File compressed!');
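Decompressing is the same pipeline in reverse, using zlib.createGunzip():

await pipeline(
  createReadStream('output.txt.gz'),
  zlib.createGunzip(),
  createWriteStream('restored.txt')
);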
Transform Stream (Custom Processing)
const { Transform } = require('stream');
// Uppercase transform stream
const upperCaseTransform = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
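A quick usage sketch for the uppercase transform (assuming an input.txt on disk):

const { pipeline } = require('stream/promises');
const { createReadStream, createWriteStream } = require('fs');

await pipeline(
  createReadStream('input.txt'),
  upperCaseTransform,
  createWriteStream('output-upper.txt')
);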
// CSV-to-JSON transform
class CsvToJsonTransform extends Transform {
constructor(options) {
super({ ...options, objectMode: true });
this.headers = null;
this.buffer = '';
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop(); // Keep incomplete line in buffer
lines.forEach(line => {
if (!line) return;
if (!this.headers) {
this.headers = line.split(',');
return;
}
const values = line.split(',');
const obj = {};
this.headers.forEach((h, i) => obj[h.trim()] = values[i]?.trim());
this.push(obj);
});
callback();
}
_flush(callback) {
// Emit any final line still sitting in the buffer (the file may not end with a newline)
if (this.buffer && this.headers) {
const values = this.buffer.split(',');
const obj = {};
this.headers.forEach((h, i) => obj[h.trim()] = values[i]?.trim());
this.push(obj);
}
callback();
}
}
// Usage: the transform emits plain JS objects (write them to a DB, an API, etc.)
const { pipeline } = require('stream/promises');
const { createReadStream } = require('fs');
await pipeline(
createReadStream('data.csv'),
new CsvToJsonTransform(),
async function (rows) {
for await (const row of rows) console.log(row);
}
);
Practical Examples
Process Large JSON File
const { createReadStream } = require('fs');
const { pipeline } = require('stream/promises');
const { parser } = require('stream-json'); // npm install stream-json
const { streamArray } = require('stream-json/streamers/StreamArray');
let userCount = 0;
await pipeline(
createReadStream('huge-data.json'),
parser(),
streamArray(), // assumes huge-data.json is one big top-level JSON array
// Process each item without loading the whole file
async function* (source) {
for await (const item of source) {
if (item.value?.type === 'user') {
userCount++;
// Process each user individually
await saveToDatabase(item.value);
}
}
}
);
console.log(`Processed ${userCount} users`);
HTTP Request/Response Streaming
const http = require('http');
const fs = require('fs');
// Serve large files without loading into memory
const server = http.createServer((req, res) => {
if (req.url === '/download') {
const filePath = '/path/to/large-file.zip';
res.setHeader('Content-Type', 'application/zip');
res.setHeader('Content-Disposition', 'attachment; filename=file.zip');
const readStream = fs.createReadStream(filePath);
readStream.pipe(res); // Stream directly to response
// Handle errors
readStream.on('error', err => {
console.error('Read error:', err);
res.statusCode = 500;
res.end('Download failed');
});
} else {
res.statusCode = 404;
res.end('Not found');
}
});
server.listen(3000);
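The same route written with pipeline() also cleans up the file stream if the client disconnects mid-download. A sketch; handleDownload is a hypothetical helper you'd call from the /download branch:

const { pipeline } = require('stream/promises');

async function handleDownload(req, res) {
  res.setHeader('Content-Type', 'application/zip');
  res.setHeader('Content-Disposition', 'attachment; filename=file.zip');
  try {
    await pipeline(fs.createReadStream('/path/to/large-file.zip'), res);
  } catch (err) {
    // By the time we get here, pipeline has already destroyed both streams, so just log it
    console.error('Download failed:', err.message);
  }
}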
Compress HTTP Responses
const http = require('http');
const zlib = require('zlib');
const server = http.createServer((req, res) => {
// Check if client accepts gzip
const acceptEncoding = req.headers['accept-encoding'] || '';
if (acceptEncoding.includes('gzip')) {
res.writeHead(200, { 'Content-Type': 'application/json', 'Content-Encoding': 'gzip' });
const jsonData = JSON.stringify(largeObject); // largeObject: some big payload defined elsewhere
// Compress and send in one stream
const gzip = zlib.createGzip();
gzip.end(jsonData);
gzip.pipe(res);
} else {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(largeObject));
}
});
server.listen(3000);
Merge Multiple Files
const { pipeline } = require('stream/promises');
const { createReadStream, createWriteStream } = require('fs');
async function mergeFiles(outputPath, ...inputPaths) {
const writeStream = createWriteStream(outputPath);
for (const inputPath of inputPaths) {
await pipeline(createReadStream(inputPath), writeStream, { end: false }); // end:false needs a recent Node release
}
writeStream.end();
}
// Combine part1.txt + part2.txt + part3.txt → merged.txt
await mergeFiles('merged.txt', 'part1.txt', 'part2.txt', 'part3.txt');
Stream Events Reference
const readStream = createReadStream('file.txt');
// Important events:
readStream.on('data', (chunk) => {}); // Received data chunk
readStream.on('end', () => {}); // No more data
readStream.on('error', (err) => {}); // Something went wrong
readStream.on('close', () => {}); // Stream closed
readStream.on('ready', () => {}); // Ready to be read
// Flow control (for backpressure):
readStream.pause(); // Stop emitting data
readStream.resume(); // Resume emitting data
const writeStream = createWriteStream('out.txt');
writeStream.on('drain', () => {}); // Safe to write again (was full)
writeStream.on('finish', () => {}); // All data written
writeStream.on('error', (err) => {}); // Write error
writeStream.on('pipe', (src) => {}); // Source piped in
writeStream.on('unpipe', (src) => {}); // Source unpiped
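The drain event is how you handle backpressure by hand: write() returns false when the internal buffer is full, and you wait for 'drain' before writing more. A minimal sketch (writeManyLines is a made-up helper):

const { createWriteStream } = require('fs');
const { once } = require('events');

async function writeManyLines(path, count) {
  const out = createWriteStream(path);
  for (let i = 0; i < count; i++) {
    if (!out.write(`line ${i}\n`)) { // buffer is full...
      await once(out, 'drain');      // ...wait until it's safe to write again
    }
  }
  out.end();
  await once(out, 'finish');
}

await writeManyLines('lines.txt', 1_000_000);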
Quick Decision Guide
| Scenario | Use |
|---|---|
| Read/write large files | createReadStream / createWriteStream |
| Chain operations | pipeline() or .pipe() |
| Modify data in transit | Transform stream |
| Compress/decompress | zlib streams (createGzip, createGunzip) |
| HTTP file download | pipe(response) |
| Parse large JSON/XML | stream-json or custom Transform |
| Merge multiple sources | Sequential pipeline calls |
| Backpressure handling | drain event + write() return value |
What's your favorite use case for streams? Share it below!
Follow @armorbreak for more Node.js content.