## The Memory Problem
```typescript
// This will OOM on a 2GB file
const data = await fs.readFile('huge-file.csv'); // reads the entire file into memory
const lines = data.toString().split('\n');
// crashes with: JavaScript heap out of memory
```
Streams process data in chunks. You never load the full file—you process pieces as they arrive.
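What "in chunks" means concretely can be sketched as follows. The scratch file, the `countChunks` helper, and the 64KB chunk size are all illustrative, not from the example above:

```typescript
import { createReadStream, writeFileSync, unlinkSync } from 'fs';
import { tmpdir } from 'os';
import { join } from 'path';

// Count how many chunks a file arrives in and the total bytes read.
// Each chunk is a Buffer of at most `highWaterMark` bytes, so peak memory
// stays near highWaterMark no matter how large the file is.
async function countChunks(path: string, highWaterMark: number) {
  let chunks = 0;
  let bytes = 0;
  for await (const chunk of createReadStream(path, { highWaterMark })) {
    chunks++;
    bytes += (chunk as Buffer).length;
  }
  return { chunks, bytes };
}

// Demo on a 1MB scratch file read in 64KB chunks
const demoFile = join(tmpdir(), 'chunk-demo.txt');
writeFileSync(demoFile, 'x'.repeat(1024 * 1024));

countChunks(demoFile, 64 * 1024).then(({ chunks, bytes }) => {
  console.log(`${chunks} chunks, ${bytes} bytes`);
  unlinkSync(demoFile);
});
```

The same loop over a 10GB file would hold at most one 64KB buffer at a time.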
## Reading Files With Streams
```typescript
import { createReadStream } from 'fs';
import { createInterface } from 'readline';

async function processCSV(filePath: string) {
  const fileStream = createReadStream(filePath);
  const rl = createInterface({ input: fileStream, crlfDelay: Infinity });

  let lineCount = 0;
  for await (const line of rl) {
    // Process one line at a time; only the current line is held in memory
    const [name, email, amount] = line.split(',');
    await processRecord({ name, email, amount });
    lineCount++;
  }
  return lineCount;
}

// A 10GB file uses < 50MB of memory
await processCSV('./million-rows.csv');
```
## Transform Streams
```typescript
import { createReadStream, createWriteStream } from 'fs';
import { Transform } from 'stream';
import { pipeline } from 'stream/promises';
import zlib from 'zlib';

// Custom transform: split incoming chunks into lines and parse CSV to objects.
// File chunks are not line-aligned, so a partial trailing line is buffered
// between chunks.
let buffered = '';
const csvParser = new Transform({
  readableObjectMode: true,
  transform(chunk, encoding, callback) {
    buffered += chunk.toString();
    const lines = buffered.split('\n');
    buffered = lines.pop() ?? ''; // keep the incomplete last line for the next chunk
    for (const line of lines) {
      if (line.trim()) {
        const [id, name, email] = line.split(',');
        this.push({ id, name, email });
      }
    }
    callback();
  },
  flush(callback) {
    if (buffered.trim()) {
      const [id, name, email] = buffered.split(',');
      this.push({ id, name, email });
    }
    callback();
  },
});

// Pipeline: read → parse → compress → write
await pipeline(
  createReadStream('users.csv'),
  csvParser,
  new Transform({
    writableObjectMode: true, // accepts objects, emits JSON lines as strings
    transform(record, encoding, callback) {
      callback(null, JSON.stringify(record) + '\n');
    },
  }),
  zlib.createGzip(),
  createWriteStream('users.jsonl.gz')
);
```
pipeline handles backpressure automatically—it slows the source if the destination can't keep up.
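It also tears down the whole chain when any stage fails, which bare `.pipe()` does not. A minimal self-contained sketch (the `'boom'` record and `'bad record'` error are invented for illustration):

```typescript
import { Readable, Transform, Writable } from 'stream';
import { pipeline } from 'stream/promises';

// When any stage errors, pipeline() destroys every stream in the chain and
// rejects; with bare .pipe() you would have to wire up error handlers yourself.
async function demo() {
  const seen: string[] = [];
  const source = Readable.from(['a', 'b', 'boom', 'c']);
  const upper = new Transform({
    objectMode: true,
    transform(chunk, _enc, callback) {
      if (chunk === 'boom') callback(new Error('bad record'));
      else callback(null, (chunk as string).toUpperCase());
    },
  });
  const sink = new Writable({
    objectMode: true,
    write(chunk, _enc, callback) {
      seen.push(chunk as string);
      callback();
    },
  });
  try {
    await pipeline(source, upper, sink);
    return { failed: false, message: '', seen };
  } catch (err) {
    // 'c' never reaches the sink: the whole chain was torn down
    return { failed: true, message: (err as Error).message, seen };
  }
}

demo().then(result => console.log(result));
```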
## HTTP Streaming
```typescript
import fs, { createReadStream } from 'fs';
import path from 'path';

// Stream a large file download
app.get('/download/:filename', (req, res) => {
  // basename() strips any directory components, preventing path traversal
  const safeName = path.basename(req.params.filename);
  const filePath = path.join(__dirname, 'files', safeName);
  const stat = fs.statSync(filePath);
  res.setHeader('Content-Type', 'application/octet-stream');
  res.setHeader('Content-Length', stat.size);
  res.setHeader('Content-Disposition', `attachment; filename="${safeName}"`);
  const stream = createReadStream(filePath);
  stream.pipe(res); // stream directly to the client, no buffering
  stream.on('error', (err) => {
    // headers may already be on the wire mid-stream; only send a status if not
    if (res.headersSent) res.destroy(err);
    else res.status(500).end(err.message);
  });
});
```
```typescript
// Stream a database export: cursor-based pagination keeps only one
// batch of rows in memory at a time. (A plain awaited $queryRawUnsafe
// would load the entire result set into memory first.)
app.get('/export', async (req, res) => {
  res.setHeader('Content-Type', 'text/csv');
  res.setHeader('Content-Disposition', 'attachment; filename="export.csv"');
  res.write('id,name,email\n');

  let cursor: number | undefined;
  for (;;) {
    const rows = await prisma.users.findMany({
      take: 100,
      orderBy: { id: 'asc' },
      ...(cursor !== undefined ? { cursor: { id: cursor }, skip: 1 } : {}),
    });
    if (rows.length === 0) break;
    for (const row of rows) {
      res.write(`${row.id},"${row.name}",${row.email}\n`);
    }
    cursor = rows[rows.length - 1].id;
  }
  res.end();
});
```
## Async Generator Streams
```typescript
import { createWriteStream } from 'fs';
import { Readable } from 'stream';
import { stringify as csvStringify } from 'csv-stringify'; // npm: csv-stringify

async function* generateUsers(batchSize = 100) {
  let page = 0;
  while (true) {
    const users = await prisma.users.findMany({
      skip: page * batchSize,
      take: batchSize,
    });
    if (users.length === 0) return;
    yield* users; // yield one user at a time
    page++;
  }
}

// Convert to a Node.js Readable stream
const userStream = Readable.from(generateUsers());

// Pipe to a CSV file
userStream
  .pipe(csvStringify({ header: true, columns: ['id', 'name', 'email'] }))
  .pipe(createWriteStream('all-users.csv'));
```
## Backpressure Handling
```typescript
import { Readable, Writable } from 'stream';

async function writeWithBackpressure(
  readable: Readable,
  writable: Writable
): Promise<void> {
  for await (const chunk of readable) {
    const canContinue = writable.write(chunk);
    if (!canContinue) {
      // Writable buffer is full — wait for drain
      await new Promise(resolve => writable.once('drain', resolve));
    }
  }
  writable.end();
}
```
Without backpressure handling, a fast readable overwhelms a slow writable, filling memory.
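The drain handshake can be seen in a self-contained sketch: a writable with a deliberately tiny buffer (`highWaterMark: 1`) that takes 5ms per chunk, fed by a fast in-memory readable. The names and timings are illustrative:

```typescript
import { Readable, Writable } from 'stream';

// The drain pattern above against a deliberately slow writable:
// write() returns false the moment its one-chunk buffer fills, and the loop
// pauses until 'drain' fires.
async function demoBackpressure(): Promise<string[]> {
  const received: string[] = [];
  const slow = new Writable({
    objectMode: true,
    highWaterMark: 1, // buffer at most one chunk
    write(chunk, _enc, callback) {
      received.push(chunk as string);
      setTimeout(callback, 5); // simulate a slow destination
    },
  });

  const readable = Readable.from(['a', 'b', 'c', 'd']);
  for await (const chunk of readable) {
    if (!slow.write(chunk)) {
      await new Promise(resolve => slow.once('drain', resolve));
    }
  }
  slow.end();
  await new Promise(resolve => slow.once('finish', resolve));
  return received; // every chunk arrived, in order, without unbounded buffering
}

demoBackpressure().then(order => console.log(order));
```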
## When to Use Streams
- Files > 10MB
- Database exports
- Log processing
- Video/audio processing
- HTTP file uploads/downloads
- Real-time data pipelines
For small data (< 1MB), `fs.readFile` is simpler and fine. Streams shine when you can't fit the data in memory.
Production Node.js patterns including streaming, worker threads, and memory optimization: Whoff Agents AI SaaS Starter Kit.