Atlas Whoff
Node.js Streams: Processing Large Files Without Running Out of Memory


Reading a 2GB CSV into memory crashes your process. Streams process data chunk by chunk, so memory usage stays roughly constant no matter how large the file is.

The Problem

import fs from 'fs';

// WRONG: loads the entire file into memory at once
const data = fs.readFileSync('huge-file.csv'); // OOM for multi-GB files

// RIGHT: process it chunk by chunk as a stream
const stream = fs.createReadStream('huge-file.csv');
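To see what "constant memory" means in practice, here is a minimal sketch (the function name `byteCount` is made up for illustration) that walks a file with `for await`: each chunk is at most the stream's highWaterMark (64 KiB by default for fs streams), regardless of file size.

```typescript
import { createReadStream } from 'fs';

// Count bytes without ever holding the whole file in memory:
// each chunk is at most highWaterMark bytes (64 KiB by default)
async function byteCount(path: string): Promise<number> {
  let total = 0;
  for await (const chunk of createReadStream(path)) {
    total += (chunk as Buffer).length;
  }
  return total;
}
```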

Reading a Large File Line by Line

import { createReadStream } from 'fs';
import { createInterface } from 'readline';

async function processLargeCSV(filePath: string) {
  const fileStream = createReadStream(filePath);
  const rl = createInterface({ input: fileStream, crlfDelay: Infinity });

  let lineCount = 0;
  for await (const line of rl) {
    // Naive split: use a real CSV parser if fields can contain quoted commas
    const [name, email, amount] = line.split(',');
    await processRecord({ name, email, amount }); // your own per-row handler
    lineCount++;
  }
  console.log(`Processed ${lineCount} records`);
}
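Awaiting one record at a time can be slow. The lines can be grouped into fixed-size batches before processing; a sketch (the helper name `batchLines` is made up), shown here with an in-memory stream so it is easy to try, but it works identically with `createReadStream`:

```typescript
import { createInterface } from 'readline';
import { Readable } from 'stream';

// Group an async iterable of lines into fixed-size batches so
// records can be processed in chunks instead of one await per line
async function* batchLines(lines: AsyncIterable<string>, size: number) {
  let batch: string[] = [];
  for await (const line of lines) {
    batch.push(line);
    if (batch.length === size) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch; // flush the final partial batch
}

// Usage with an in-memory source instead of a file
async function demo() {
  const input = Readable.from('a\nb\nc\nd\ne\n');
  const rl = createInterface({ input, crlfDelay: Infinity });
  const batches: string[][] = [];
  for await (const batch of batchLines(rl, 2)) {
    batches.push(batch);
  }
  return batches; // [['a','b'], ['c','d'], ['e']]
}
```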

Transform Stream

import fs from 'fs';
import { Transform, TransformCallback } from 'stream';

class UpperCaseTransform extends Transform {
  _transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}

// Chain streams with pipe
fs.createReadStream('input.txt')
  .pipe(new UpperCaseTransform())
  .pipe(fs.createWriteStream('output.txt'));
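For one-off transforms the class is unnecessary: the `Transform` constructor accepts the transform function directly, and passing the result as the second argument to `callback()` pushes it downstream.

```typescript
import { Transform } from 'stream';

// Same behavior as UpperCaseTransform, no subclass needed
const upperCase = new Transform({
  transform(chunk, _encoding, callback) {
    // callback(error, data) pushes data downstream in one step
    callback(null, chunk.toString().toUpperCase());
  },
});
```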

Streaming HTTP Responses

// Stream a large database query to the client
app.get('/export', async (req, res) => {
  res.setHeader('Content-Type', 'text/csv');
  res.setHeader('Content-Disposition', 'attachment; filename=export.csv');

  // Write the CSV header row
  res.write('name,email,created_at\n');

  // Cursor-paginate through the table — never loads all rows at once.
  // (findMany returns a promise of an array, not an async iterator,
  // so we loop in batches and advance the cursor manually.)
  let cursor: number | undefined;
  while (true) {
    const batch = await prisma.user.findMany({
      take: 1000,
      orderBy: { id: 'asc' },
      ...(cursor !== undefined && { skip: 1, cursor: { id: cursor } }),
    });
    if (batch.length === 0) break;
    for (const user of batch) {
      res.write(`${user.name},${user.email},${user.createdAt.toISOString()}\n`);
    }
    cursor = batch[batch.length - 1].id;
  }
  res.end();
});
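One caveat: `res.write` returns `false` once Node's internal buffer fills up behind a slow client. A small helper (the name `writeWithBackpressure` is made up for this sketch) waits for `'drain'` before writing more, so the server never buffers the entire export:

```typescript
import { Writable } from 'stream';
import { once } from 'events';

// Write to any writable (an Express res is one) and pause until
// 'drain' whenever its internal buffer is full, so a slow client
// can't force the whole export into server memory
async function writeWithBackpressure(out: Writable, data: string) {
  if (!out.write(data)) {
    await once(out, 'drain');
  }
}
```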

Pipeline (Error Handling)

import fs from 'fs';
import { pipeline } from 'stream/promises';
import { createGzip } from 'zlib';

// pipeline() propagates errors and destroys every stream on failure;
// pipe() leaves the source open if the destination errors
await pipeline(
  fs.createReadStream('large-file.log'),
  createGzip(),
  fs.createWriteStream('large-file.log.gz')
);
// File compressed without ever loading it into memory
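`pipeline()` also accepts an `AbortSignal`, which tears down every stream in the chain when fired. A sketch (the function name `compressWithTimeout` is made up) that cancels a compression if it runs too long:

```typescript
import { pipeline } from 'stream/promises';
import { Readable, Writable } from 'stream';
import { createGzip } from 'zlib';

// Abort the whole chain after a deadline: pipeline() destroys
// every stream and rejects with an AbortError when the signal fires
async function compressWithTimeout(src: Readable, dst: Writable, ms: number) {
  const ac = new AbortController();
  const timer = setTimeout(() => ac.abort(), ms);
  try {
    await pipeline(src, createGzip(), dst, { signal: ac.signal });
  } finally {
    clearTimeout(timer);
  }
}
```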

Streaming AI Responses

// Stream Claude responses to the client in real time
app.post('/chat', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.flushHeaders(); // send headers immediately so the client starts reading

  const stream = client.messages.stream({
    model: 'claude-opus-4-6',
    max_tokens: 1024,
    messages: req.body.messages,
  });

  for await (const chunk of stream) {
    if (chunk.type === 'content_block_delta') {
      res.write(`data: ${JSON.stringify({ text: chunk.delta.text })}\n\n`);
    }
  }
  res.end();
});
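The SSE framing can be pulled out into a small helper that accepts any async iterable of text deltas (the Anthropic stream above is one such source). The helper name `writeSSE` is made up for this sketch; taking a plain write function as the sink makes the framing testable without a live model:

```typescript
// Format any async iterable of text deltas as Server-Sent Events
async function writeSSE(
  deltas: AsyncIterable<string>,
  write: (frame: string) => void,
) {
  for await (const text of deltas) {
    write(`data: ${JSON.stringify({ text })}\n\n`);
  }
  write('data: [DONE]\n\n'); // conventional end-of-stream marker
}
```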

Backpressure

// Slow consumer — respect backpressure
const readable = fs.createReadStream('huge-file.bin');
const writable = fs.createWriteStream('output.bin');

readable.on('data', (chunk) => {
  const ok = writable.write(chunk);
  if (!ok) {
    readable.pause(); // Stop reading until the writer's buffer drains
    writable.once('drain', () => readable.resume());
  }
});
readable.on('end', () => writable.end()); // Close the writer when the source ends
// Simpler: use pipeline() — handles backpressure automatically
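The whole pause/resume dance collapses into one `pipeline()` call. A sketch (the name `copyWithProgress` is made up) that also threads a pass-through Transform into the chain as a progress hook:

```typescript
import { pipeline } from 'stream/promises';
import { Readable, Writable, Transform } from 'stream';

// The manual loop above as a single pipeline() call, with a
// pass-through Transform that counts bytes as they flow
async function copyWithProgress(
  src: Readable,
  dst: Writable,
  onBytes: (total: number) => void,
) {
  let total = 0;
  const counter = new Transform({
    transform(chunk, _enc, cb) {
      total += chunk.length;
      onBytes(total);
      cb(null, chunk); // pass the chunk through unchanged
    },
  });
  await pipeline(src, counter, dst);
}
```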

The Ship Fast Skill Pack includes a /api skill that generates streaming endpoints for file exports and AI responses. $49 at whoffagents.com.
