# Node.js Streaming in Production: Backpressure, Pipelines, and Memory-Efficient Processing
Node.js streams are the reason a 512 MB server can process a 10 GB log file without running out of memory. They're also one of the most misunderstood APIs in the ecosystem. Most developers only encounter streams when they break — a process eating 3 GB of RAM to "transform" a file, a download that stalls mysteriously, or an upload that silently drops data.
This guide fixes that. You'll learn how streams actually work, how to use them correctly in production, and how to avoid the traps that cause real outages.
## Why Streams Exist
The alternative to streaming is buffering: load the entire input into memory, process it, write the entire output. This works fine at small scale. At production scale it fails in predictable ways:
- Memory exhaustion: A 2 GB CSV file becomes a 2 GB buffer
- Latency spikes: Users wait for the entire payload before seeing the first byte
- Backpressure blindness: Producers generate data faster than consumers can handle it, filling Node.js's internal buffers and eventually the heap
Streams solve all three by processing data in chunks — small, fixed-size pieces that flow through a pipeline from source to destination.
## The Four Stream Types
Node.js has four stream abstractions:
| Type | Direction | Examples |
|---|---|---|
| Readable | Source | `fs.createReadStream()`, HTTP request bodies, `process.stdin` |
| Writable | Sink | `fs.createWriteStream()`, HTTP responses, `process.stdout` |
| Duplex | Both directions | TCP sockets, `net.Socket` |
| Transform | Reads in, transforms, writes out | `zlib.createGzip()`, CSV parsers, encryption |
Every stream is an `EventEmitter`. The core events are `data`, `end`, `error`, `finish`, and `drain` — but in modern Node.js you should rarely attach these listeners directly.
## Readable Streams: Flowing vs Paused Mode
A Readable stream starts in paused mode. Data accumulates in an internal buffer up to the `highWaterMark` (default 16384 bytes for byte streams and 16 objects in object mode; `fs.createReadStream()` uses 64 KiB). You pull from it by calling `.read()`.
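Paused mode, by contrast, has you pull explicitly. Here is a minimal sketch (the `countBytes` helper is illustrative, not a standard API) that drains the internal buffer with `.read()` each time the `readable` event fires:

```javascript
import { createReadStream } from 'fs';

// Paused mode: pull chunks explicitly with .read().
// Resolves with the total byte count once the stream ends.
function countBytes(path) {
  return new Promise((resolve, reject) => {
    const readable = createReadStream(path);
    let total = 0;
    readable.on('readable', () => {
      let chunk;
      // .read() returns null when the internal buffer is empty
      while ((chunk = readable.read()) !== null) {
        total += chunk.length;
      }
    });
    readable.on('end', () => resolve(total));
    readable.on('error', reject);
  });
}
```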
When you attach a data listener or call .resume(), the stream switches to flowing mode — it pushes chunks to you as fast as it can produce them.
```javascript
import { createReadStream } from 'fs';

// Flowing mode — data pushed to callback
const readable = createReadStream('large-file.csv');

readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
});
readable.on('end', () => console.log('Done'));
readable.on('error', (err) => console.error('Stream error:', err));
```
The problem with raw flowing mode: if your data handler can't keep up, you overflow internal buffers. This is the backpressure problem.
## Backpressure: The Core Concept
Backpressure is the signal that a downstream consumer is full and the producer should slow down. In Node.js streams:
- `writable.write(chunk)` returns `false` when the internal buffer exceeds `highWaterMark`
- When it returns `false`, the producer must stop until the `drain` event fires
- If you ignore this, Node.js buffers everything in memory until it crashes
The naive (broken) approach:
```javascript
// ❌ Ignores backpressure — will OOM on large files
readable.on('data', (chunk) => {
  writable.write(chunk); // Never checks return value
});
```
The correct manual approach:
```javascript
// ✓ Respects backpressure manually
readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    readable.pause(); // Stop producing
    writable.once('drain', () => readable.resume()); // Resume when safe
  }
});
readable.on('end', () => writable.end());
```
This works but it's boilerplate. pipeline() handles it automatically.
## pipeline() vs pipe(): Always Use pipeline()
pipe() was the original stream composition API. It has a critical flaw: it does not handle errors correctly. If any stream in the chain emits an error, the other streams are not automatically destroyed. You get resource leaks — open file handles, dangling sockets, memory that never gets freed.
```javascript
// ❌ pipe() — error handling is your problem
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

createReadStream('input.log')
  .pipe(createGzip())
  .pipe(createWriteStream('output.log.gz'));
// If createGzip() errors, createReadStream stays open forever
```
`pipeline()` (callback version since Node.js 10, promise version in `stream/promises` since Node.js 15) destroys all streams on error and cleans up resources:
```javascript
// ✓ pipeline() — automatic cleanup on error
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

async function compressFile(input, output) {
  await pipeline(
    createReadStream(input),
    createGzip(),
    createWriteStream(output)
  );
  console.log('Compression complete');
}

try {
  await compressFile('access.log', 'access.log.gz');
} catch (err) {
  console.error('Pipeline failed:', err);
  // All streams are already cleaned up
}
```
**Rule:** Never use `.pipe()` in production code. Always use `pipeline()` from `stream/promises`.
## Transform Streams: The Production Workhorse
A Transform stream is the most useful type. It receives chunks from a Readable, processes them, and emits chunks to a Writable. Every compression, encryption, parsing, or serialization step in a production pipeline is a Transform.
Here's a production-grade CSV line counter Transform:
```javascript
import { Transform } from 'stream';

class LineCountTransform extends Transform {
  constructor(options = {}) {
    super({ ...options, objectMode: false });
    this.lineCount = 0;
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    // Accumulate partial lines across chunks
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    // Last element may be incomplete — keep it in buffer
    this.buffer = lines.pop();
    this.lineCount += lines.length;
    // Pass the original chunk through unchanged
    callback(null, chunk);
  }

  _flush(callback) {
    // Handle the last line if file doesn't end with \n
    if (this.buffer.length > 0) {
      this.lineCount++;
    }
    callback();
  }
}
```
Critical Transform implementation rules:

- Always call `callback()` — forgetting it stalls the entire pipeline
- Handle split chunks — data never arrives as complete lines or records
- Implement `_flush()` — process any remaining buffered data before the stream closes
- Call `callback(err)` to signal errors — don't throw
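A modern alternative sidesteps these rules entirely: recent Node.js versions let `pipeline()` accept async generator functions as stages, so buffering a partial line and "flushing" it become plain local variables. A sketch of the line-splitting logic as a generator stage:

```javascript
import { pipeline } from 'stream/promises';
import { Readable } from 'stream';

// An async generator works as a pipeline stage: it receives upstream
// chunks as an async iterable and yields complete lines downstream.
async function* splitLines(source) {
  let buffer = '';
  for await (const chunk of source) {
    buffer += chunk.toString();
    const lines = buffer.split('\n');
    buffer = lines.pop(); // keep the trailing partial line
    yield* lines;
  }
  if (buffer.length > 0) yield buffer; // emit the final unterminated line
}

// Usage: collect lines from an in-memory source
const lines = [];
await pipeline(
  Readable.from(['alpha\nbe', 'ta\ngam', 'ma']),
  splitLines,
  async (source) => {
    for await (const line of source) lines.push(line);
  }
);
```

Backpressure still works: the generator only pulls the next chunk when the downstream consumer asks for the next line.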
## Streaming Large Files in Production
The canonical use case: transform a 10 GB log file with 512 MB of RAM available.
```javascript
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
import { Transform } from 'stream';

class LogFilterTransform extends Transform {
  constructor(pattern) {
    super();
    this.pattern = pattern;
    this.incomplete = '';
  }

  _transform(chunk, encoding, callback) {
    const data = this.incomplete + chunk.toString();
    const lines = data.split('\n');
    this.incomplete = lines.pop(); // Save partial last line
    const filtered = lines
      .filter(line => this.pattern.test(line))
      .join('\n');
    if (filtered) {
      callback(null, filtered + '\n');
    } else {
      callback(); // No matching lines — pass nothing downstream
    }
  }

  _flush(callback) {
    if (this.incomplete && this.pattern.test(this.incomplete)) {
      callback(null, this.incomplete + '\n');
    } else {
      callback();
    }
  }
}

async function filterAndCompressLogs(inputPath, outputPath, errorPattern) {
  await pipeline(
    createReadStream(inputPath, { highWaterMark: 64 * 1024 }), // 64KB chunks
    new LogFilterTransform(errorPattern),
    createGzip({ level: 6 }),
    createWriteStream(outputPath)
  );
}

// Filter ERROR lines from a 10GB log, compress output
await filterAndCompressLogs(
  '/var/log/app/production.log',
  '/var/log/app/errors.log.gz',
  /\bERROR\b/
);
```
This processes 10 GB using ~80 MB of RAM — the 64 KB read buffer plus gzip's internal buffers.
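Numbers like this are easy to check in your own environment: sample `process.memoryUsage().rss` on a timer while the pipeline runs. A small sketch (the `withPeakRss` helper is hypothetical, not a library function):

```javascript
import { pipeline } from 'stream/promises';
import { Readable, Writable } from 'stream';

// Run any pipeline-returning function while sampling resident set
// size, and report the peak RSS observed (in bytes).
async function withPeakRss(runPipeline, intervalMs = 100) {
  let peak = process.memoryUsage().rss;
  const timer = setInterval(() => {
    peak = Math.max(peak, process.memoryUsage().rss);
  }, intervalMs);
  try {
    await runPipeline();
  } finally {
    clearInterval(timer);
  }
  return peak;
}

// Usage: measure a trivial in-memory pipeline
const peak = await withPeakRss(() => pipeline(
  Readable.from(['a'.repeat(1024)]),
  new Writable({ write(chunk, encoding, cb) { cb(); } })
));
console.log(`Peak RSS: ${(peak / 1024 / 1024).toFixed(1)} MB`);
```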
## JSON and CSV Streaming Parsers
The wrong way to parse a large JSON file:
```javascript
// ❌ Loads entire file into memory
const data = JSON.parse(fs.readFileSync('large-dataset.json', 'utf8'));
```
The right way — stream parsing with stream-json:
```javascript
import { pipeline } from 'stream/promises';
import { createReadStream } from 'fs';
import { parser } from 'stream-json';
import { streamArray } from 'stream-json/streamers/StreamArray.js';

async function processLargeJSON(filePath, processFn) {
  await pipeline(
    createReadStream(filePath),
    parser(),
    streamArray(),
    // Final stage consumes the parsed records one at a time
    async function (source) {
      for await (const { value } of source) {
        await processFn(value);
      }
    }
  );
}

// Process a 2GB JSON array record by record
await processLargeJSON('dataset.json', async (record) => {
  await db.insert(record); // Each record processed and GC'd immediately
});
```
CSV streaming with csv-parse:
```javascript
import { pipeline } from 'stream/promises';
import { createReadStream } from 'fs';
import { parse } from 'csv-parse';
import { Transform } from 'stream';

class BatchInsertTransform extends Transform {
  constructor(db, batchSize = 1000) {
    super({ objectMode: true }); // Input: record objects
    this.db = db;
    this.batchSize = batchSize;
    this.batch = [];
    this.totalInserted = 0;
  }

  async _transform(record, encoding, callback) {
    this.batch.push(record);
    if (this.batch.length >= this.batchSize) {
      try {
        await this.db.batchInsert('users', this.batch);
        this.totalInserted += this.batch.length;
        this.batch = [];
        // Don't push counts downstream: nothing consumes the readable
        // side of the last pipeline stage, so pushed data would fill
        // its buffer and stall the pipeline. Read this.totalInserted
        // from the instance after the pipeline finishes instead.
        callback();
      } catch (err) {
        callback(err);
      }
    } else {
      callback();
    }
  }

  async _flush(callback) {
    if (this.batch.length > 0) {
      try {
        await this.db.batchInsert('users', this.batch);
        this.totalInserted += this.batch.length;
      } catch (err) {
        return callback(err);
      }
    }
    callback();
  }
}

async function importCSV(filePath, db) {
  const inserter = new BatchInsertTransform(db, 500);
  await pipeline(
    createReadStream(filePath),
    parse({ columns: true, skip_empty_lines: true, trim: true }),
    inserter
  );
  return inserter.totalInserted;
}
```
## Streaming HTTP Responses
Streaming HTTP responses reduce Time-to-First-Byte (TTFB) dramatically for large payloads. The client starts receiving and processing data immediately instead of waiting for the full response.
```javascript
import express from 'express';
import { createReadStream } from 'fs';
import { pipeline } from 'stream/promises';
import { createGzip } from 'zlib';

const app = express();

// Stream a large file with compression
app.get('/download/large-report', async (req, res) => {
  res.setHeader('Content-Type', 'application/octet-stream');
  res.setHeader('Content-Encoding', 'gzip');
  // No Content-Length set — Node.js switches to chunked transfer automatically
  res.setHeader('Content-Disposition', 'attachment; filename="report.csv.gz"');
  try {
    await pipeline(
      createReadStream('/data/reports/large-report.csv'),
      createGzip(),
      res
    );
  } catch (err) {
    if (!res.headersSent) {
      res.status(500).json({ error: 'Stream failed' });
    }
    // If headers are sent, the connection just closes — client will see a truncated response
  }
});

// Stream a database query as NDJSON (Newline-Delimited JSON)
app.get('/api/users/export', async (req, res) => {
  res.setHeader('Content-Type', 'application/x-ndjson');
  const cursor = db.query('SELECT * FROM users ORDER BY created_at').cursor(100);
  try {
    for await (const rows of cursor) {
      for (const row of rows) {
        if (!res.write(JSON.stringify(row) + '\n')) {
          // Backpressure: wait for drain before continuing
          await new Promise(resolve => res.once('drain', resolve));
        }
      }
    }
    res.end();
  } catch (err) {
    if (!res.headersSent) {
      res.status(500).end();
    } else {
      res.destroy(); // Mid-stream failure: abort so the client sees a truncated transfer
    }
  }
});
```
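The manual drain dance in the NDJSON route is common enough to factor out. A small helper, sketched here with `events.once`, writes a chunk and pauses only when the destination signals backpressure (in production you would also want to race the wait against the stream's `error` event):

```javascript
import { once } from 'events';

// Write one chunk; if the destination's buffer has exceeded its
// highWaterMark, wait for its 'drain' event before returning.
async function writeChunk(writable, chunk) {
  if (!writable.write(chunk)) {
    await once(writable, 'drain');
  }
}
```

With this helper, the inner loop of the export route collapses to `await writeChunk(res, JSON.stringify(row) + '\n')`.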
## Configuring highWaterMark for Production
The `highWaterMark` controls how much data is buffered at each stage. The defaults (16 KiB for most byte streams, 64 KiB for `fs.createReadStream()`) are conservative. In production, tune it based on your workload:
```javascript
// Network-bound workload: larger chunks reduce syscall overhead
createReadStream(filePath, { highWaterMark: 256 * 1024 }); // 256KB

// Memory-constrained environment: smaller chunks limit peak usage
createReadStream(filePath, { highWaterMark: 16 * 1024 }); // 16KB

// Object mode streams: highWaterMark is a count, not bytes
new Transform({ objectMode: true, highWaterMark: 100 }); // Buffer 100 objects max
```
Rules of thumb:

- CPU-bound transforms (compression, encryption): default is fine
- Network-bound (uploading/downloading): 64–256 KB reduces round-trips
- Object mode pipelines: 16–100 objects depending on object size
- Always measure — the wrong `highWaterMark` can hurt throughput
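That last rule deserves a concrete shape. One way to measure, sketched as a hypothetical benchmark helper: read the same file at different `highWaterMark` values into a no-op sink and compare throughput.

```javascript
import { pipeline } from 'stream/promises';
import { createReadStream } from 'fs';
import { Writable } from 'stream';

// Rough throughput probe: read `path` with the given highWaterMark
// into a no-op sink and return bytes per second.
async function measureThroughput(path, highWaterMark) {
  let bytes = 0;
  const start = process.hrtime.bigint();
  await pipeline(
    createReadStream(path, { highWaterMark }),
    new Writable({
      write(chunk, encoding, cb) {
        bytes += chunk.length;
        cb();
      }
    })
  );
  const elapsedSec = Number(process.hrtime.bigint() - start) / 1e9;
  return bytes / elapsedSec;
}
```

Run it at 16 KiB, 64 KiB, and 256 KiB against a representative file; the OS page cache will skew repeated runs, so alternate the order and take medians.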
## Production Error Handling Checklist
Streams fail silently if you're not careful. This checklist prevents the most common production incidents:
```javascript
async function productionPipeline(input, output) {
  // 1. Use pipeline() — not pipe()
  // 2. Await it — so errors propagate to your try/catch
  // 3. Handle the error — don't let it go to unhandledRejection
  try {
    await pipeline(input, output); // insert transforms between input and output as needed
  } catch (err) {
    // 4. Log with context
    logger.error({ err, input, output }, 'Pipeline failed');
    // 5. All streams are auto-destroyed by pipeline() — no manual cleanup needed
    throw err; // 6. Re-throw to caller
  }
}
```
The production stream checklist:

- [ ] Using `pipeline()` from `stream/promises`, not `.pipe()`
- [ ] Transform `_transform()` always calls `callback()`
- [ ] Transform `_flush()` is implemented for any buffering Transform
- [ ] `highWaterMark` is tuned to workload, not left at default
- [ ] HTTP response streams check `res.write()` return value for backpressure
- [ ] All streams have error event handlers (or are wrapped in `pipeline()`)
- [ ] Stream operations have timeouts — a stalled stream blocks resources forever
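The last checklist item has no example elsewhere in this guide, so here is one way to do it: `pipeline()` accepts an options object with an `AbortSignal`, and `AbortSignal.timeout()` (Node.js 17.3+) turns that into a deadline. A sketch of a hypothetical `compressWithTimeout` helper:

```javascript
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

// Abort the whole pipeline if it hasn't finished within `ms`.
// pipeline() destroys every stream when the signal fires.
async function compressWithTimeout(input, output, ms) {
  try {
    await pipeline(
      createReadStream(input),
      createGzip(),
      createWriteStream(output),
      { signal: AbortSignal.timeout(ms) }
    );
  } catch (err) {
    if (err.name === 'AbortError') {
      throw new Error(`Pipeline timed out after ${ms} ms`);
    }
    throw err;
  }
}
```

The same `{ signal }` option works on any `pipeline()` call, so one watchdog pattern covers file, HTTP, and database streams alike.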
## Key Takeaways
Node.js streams are the correct tool whenever your data is larger than available memory, your latency requirements are tight, or you're composing multi-stage data transformations. The mental model is simple: chunks flow from sources, through transforms, to sinks. Backpressure flows backwards, slowing producers when consumers are full.
The practical rules are even simpler:
- Always use `pipeline()` — never `.pipe()`
- Always call `callback()` in Transform streams
- Always implement `_flush()` in buffering Transforms
- Stream HTTP responses for any payload over 10 KB
- Stream file operations for any file over 1 MB
Get these right and you'll have a service that handles 100x the load with the same RAM.
Part of the Node.js Production Series — practical, production-tested Node.js engineering guides. npm packages referenced in this series are published at npmjs.com/~axiom-experiment.