DEV Community

Atlas Whoff

Node.js Streams: Processing Large Files Without Running Out of Memory

The Memory Problem

import fs from 'fs/promises';

// This will OOM on a 2GB file
const data = await fs.readFile('huge-file.csv'); // reads entire file into memory
const lines = data.toString().split('\n');
// crashes with: JavaScript heap out of memory

Streams process data in chunks. You never load the full file—you process pieces as they arrive.
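You can watch the chunking happen directly. A minimal sketch (the file path and size are made up for the demo): file streams deliver chunks of at most `highWaterMark` bytes (64KB by default), no matter how big the file is.

```typescript
import { createReadStream, writeFileSync } from 'fs';

// Sample file just for this demo
writeFileSync('/tmp/chunk-demo.csv', 'a,b,c\n'.repeat(100_000)); // 600KB

let chunks = 0;
let bytes = 0;
for await (const chunk of createReadStream('/tmp/chunk-demo.csv')) {
  chunks++;              // each chunk is at most ~64KB (the default highWaterMark)
  bytes += chunk.length; // only one chunk is held at a time
}
console.log({ chunks, bytes }); // bytes === 600000, spread over ~10 chunks
```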

Reading Files With Streams

import { createReadStream } from 'fs';
import { createInterface } from 'readline';

async function processCSV(filePath: string) {
  const fileStream = createReadStream(filePath);
  const rl = createInterface({ input: fileStream, crlfDelay: Infinity });

  let lineCount = 0;

  for await (const line of rl) {
    // Process one line at a time; only the current line (plus the
    // stream's ~64KB read buffer) is ever in memory
    const [name, email, amount] = line.split(',');
    await processRecord({ name, email, amount });
    lineCount++;
  }

  return lineCount;
}

// A 10GB file uses < 50MB of memory
await processCSV('./million-rows.csv');

Transform Streams

import { createReadStream, createWriteStream } from 'fs';
import { Transform, pipeline } from 'stream';
import { promisify } from 'util';
import zlib from 'zlib';
import split2 from 'split2'; // npm package that re-chunks a byte stream into lines

const pipe = promisify(pipeline);

// Custom transform: parse CSV lines to objects.
// This assumes each chunk is one complete line, which is why split2
// sits upstream in the pipeline: raw file chunks break at arbitrary
// byte boundaries, not at newlines.
const csvParser = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    const line = chunk.toString().trim();
    if (line) {
      const [id, name, email] = line.split(',');
      this.push({ id, name, email });
    }
    callback();
  },
});

// Pipeline: read → split lines → parse → serialize → compress → write
await pipe(
  createReadStream('users.csv'),
  split2(),
  csvParser,
  new Transform({
    objectMode: true,
    transform(record, encoding, callback) {
      this.push(JSON.stringify(record) + '\n');
      callback();
    },
  }),
  zlib.createGzip(),
  createWriteStream('users.jsonl.gz')
);

pipeline handles backpressure automatically—it slows the source if the destination can't keep up.
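Since Node 15 a promise-based pipeline also ships in stream/promises, so the promisify step can be dropped. A minimal sketch (file names made up; the sample file is created inline so the snippet runs standalone):

```typescript
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream, writeFileSync, statSync } from 'fs';
import zlib from 'zlib';

writeFileSync('/tmp/users.csv', 'id,name\n1,Ada\n'.repeat(10_000)); // sample input

// read → gzip → write, with backpressure handled at every stage
await pipeline(
  createReadStream('/tmp/users.csv'),
  zlib.createGzip(),
  createWriteStream('/tmp/users.csv.gz')
);

console.log(statSync('/tmp/users.csv.gz').size); // far smaller than the 140KB input
```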

HTTP Streaming

import { Readable } from 'stream';
import fs, { createReadStream } from 'fs';
import path from 'path';

// Stream a large file download
app.get('/download/:filename', (req, res) => {
  const filePath = path.join(__dirname, 'files', req.params.filename);
  const stat = fs.statSync(filePath);

  res.setHeader('Content-Type', 'application/octet-stream');
  res.setHeader('Content-Length', stat.size);
  res.setHeader('Content-Disposition', `attachment; filename="${req.params.filename}"`);

  const stream = createReadStream(filePath);
  stream.pipe(res); // stream directly to client, no buffering

  stream.on('error', (err) => {
    // Once streaming begins the headers are already sent, so a 500
    // status can no longer be delivered; destroy the response instead
    if (!res.headersSent) {
      res.status(500).end(err.message);
    } else {
      res.destroy(err);
    }
  });
});

// Stream a database export in batches
app.get('/export', async (req, res) => {
  res.setHeader('Content-Type', 'text/csv');
  res.setHeader('Content-Disposition', 'attachment; filename="export.csv"');
  res.write('id,name,email\n');

  // Cursor-based pagination: fetch one batch at a time. A plain
  // findMany (or $queryRawUnsafe) would buffer every row in memory
  // before the first byte is written.
  let cursor: { id: number } | undefined;
  while (true) {
    const batch = await prisma.users.findMany({
      take: 100,
      orderBy: { id: 'asc' },
      ...(cursor && { cursor, skip: 1 }), // skip the cursor row itself
    });
    if (batch.length === 0) break;

    for (const row of batch) {
      res.write(`${row.id},"${row.name}",${row.email}\n`);
    }
    cursor = { id: batch[batch.length - 1].id };
  }

  res.end();
});

Async Generator Streams

async function* generateUsers(batchSize = 100) {
  let page = 0;

  while (true) {
    const users = await prisma.users.findMany({
      skip: page * batchSize,
      take: batchSize,
    });

    if (users.length === 0) return;

    yield* users; // yield one user at a time
    page++;
  }
}

// Convert the generator to a Node.js Readable stream
import { Readable } from 'stream';
import { createWriteStream } from 'fs';
import { stringify as csvStringify } from 'csv-stringify'; // npm: csv-stringify

const userStream = Readable.from(generateUsers());

// Pipe to a CSV file
userStream
  .pipe(csvStringify({ header: true, columns: ['id', 'name', 'email'] }))
  .pipe(createWriteStream('all-users.csv'));

Backpressure Handling

import { Readable, Writable } from 'stream';

async function writeWithBackpressure(
  readable: Readable,
  writable: Writable
): Promise<void> {
  for await (const chunk of readable) {
    const canContinue = writable.write(chunk);

    if (!canContinue) {
      // Writable buffer is full — wait for drain
      await new Promise(resolve => writable.once('drain', resolve));
    }
  }

  writable.end();
}

Without backpressure handling, a fast readable overwhelms a slow writable, filling memory.
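The false return from write() is easy to observe with a deliberately slow sink. In this sketch the tiny highWaterMark and the setTimeout delay are artificial, chosen only to force the buffer-full condition:

```typescript
import { Writable } from 'stream';

// Slow writable with a 4-byte buffer: it fills on the first write
const slowSink = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    setTimeout(callback, 10); // simulate slow I/O
  },
});

const ok = slowSink.write('aaaa'); // buffer is now at capacity
console.log(ok); // false: the caller should pause until 'drain'

slowSink.once('drain', () => {
  console.log('drained, safe to write again');
});
```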

When to Use Streams

  • Files > 10MB
  • Database exports
  • Log processing
  • Video/audio processing
  • HTTP file uploads/downloads
  • Real-time data pipelines

For small data (< 1MB), fs.readFile is simpler and fine. Streams shine when you can't fit the data in memory.
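For the small-file case, the whole-file read looks like this (the sample path and data are made up, and created inline so the snippet runs standalone):

```typescript
import { readFile } from 'fs/promises';
import { writeFileSync } from 'fs';

writeFileSync('/tmp/small.csv', 'id,name\n1,Ada\n2,Alan\n'); // sample data

// One allocation, whole file in memory: fine for small inputs
const text = await readFile('/tmp/small.csv', 'utf8');
const rows = text.trim().split('\n').slice(1); // drop the header row
console.log(rows.length); // 2
```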


Production Node.js patterns including streaming, worker threads, and memory optimization: Whoff Agents AI SaaS Starter Kit.
