DEV Community

Pedro Alvarado
Pedro Alvarado

Posted on

Streams y Buffers en Node.js (3/n)

Tabla de Contenidos

  1. ¿Qué son los Buffers?
  2. Trabajando con Buffers
  3. ¿Qué son los Streams?
  4. Tipos de Streams
  5. Readable Streams
  6. Writable Streams
  7. Duplex y Transform Streams
  8. Piping y Backpressure
  9. Casos de Uso Reales
  10. Streams vs Lectura Completa
  11. Cuestionario de Entrevista

¿Qué son los Buffers?

Un Buffer es un espacio temporal en memoria para almacenar datos binarios. Es la forma en que Node.js maneja datos binarios puros (imágenes, archivos, datos de red, etc.).

Analogía: El Vaso de Agua

Imagina que estás llenando un vaso con agua:

  • El vaso es el Buffer (espacio temporal en memoria)
  • El agua son los datos binarios
  • El tamaño del vaso es el tamaño del Buffer en bytes

¿Por Qué Buffers?

JavaScript originalmente no tenía forma de manejar datos binarios (fue diseñado para navegadores). Node.js agregó Buffers para trabajar con:

  • Archivos
  • Streams de datos
  • Imágenes y videos
  • Protocolos de red TCP
  • Operaciones criptográficas

Comparación con Python y Ruby

Python:

# Python usa bytes
datos = b"Hola Mundo"
print(type(datos))  # <class 'bytes'>
print(datos[0])     # 72 (valor ASCII de 'H')

# Conversiones
texto = "Hola"
datos_binarios = texto.encode('utf-8')
texto_recuperado = datos_binarios.decode('utf-8')
Enter fullscreen mode Exit fullscreen mode

Ruby:

# Ruby usa String con encoding
datos = "Hola".encode('UTF-8')
puts datos.bytes  # [72, 111, 108, 97]

# String binario
binario = "\x48\x6F\x6C\x61"  # "Hola" en hex
puts binario
Enter fullscreen mode Exit fullscreen mode

Node.js:

// Node.js usa Buffer
const datos = Buffer.from('Hola Mundo');
console.log(datos);
// <Buffer 48 6f 6c 61 20 4d 75 6e 64 6f>

console.log(datos[0]);  // 72 (valor ASCII de 'H')
console.log(datos.toString());  // 'Hola Mundo'
Enter fullscreen mode Exit fullscreen mode

Trabajando con Buffers

Crear Buffers

// 1. Desde un string
const buf1 = Buffer.from('Hola');
console.log(buf1);
// <Buffer 48 6f 6c 61>

// 2. Desde un array de bytes
const buf2 = Buffer.from([72, 111, 108, 97]);
console.log(buf2.toString());  // 'Hola'

// 3. Buffer vacío de tamaño específico
const buf3 = Buffer.alloc(10);  // 10 bytes, inicializado con ceros
console.log(buf3);
// <Buffer 00 00 00 00 00 00 00 00 00 00>

// 4. Buffer sin inicializar (más rápido pero inseguro)
const buf4 = Buffer.allocUnsafe(10);
console.log(buf4);
// <Buffer [contenido aleatorio de memoria]>

// IMPORTANTE: Debes llenarlo antes de usar
buf4.fill(0);  // Llenar con ceros
Enter fullscreen mode Exit fullscreen mode

⚠️ Advertencia sobre allocUnsafe:

// allocUnsafe no limpia la memoria
const bufferInseguro = Buffer.allocUnsafe(10);
console.log(bufferInseguro);
// Puede contener datos sensibles de memoria previa
// (contraseñas, tokens, datos de otros procesos)

// SIEMPRE inicializa después
bufferInseguro.fill(0);
// O usa alloc() que es seguro por defecto
Enter fullscreen mode Exit fullscreen mode

Encodings (Codificaciones)

Node.js soporta múltiples codificaciones:

const texto = 'Hola Mundo 🌎';

// UTF-8 (por defecto, soporta todos los caracteres Unicode)
const utf8 = Buffer.from(texto, 'utf8');
console.log(utf8);
// <Buffer 48 6f 6c 61 20 4d 75 6e 64 6f 20 f0 9f 8c 8e>

// ASCII (solo caracteres 0-127)
const ascii = Buffer.from('Hola', 'ascii');
console.log(ascii.toString('ascii'));  // 'Hola'

// Base64 (para transferir datos binarios como texto)
const base64 = Buffer.from('Hola').toString('base64');
console.log(base64);  // 'SG9sYQ=='

const decodedBase64 = Buffer.from(base64, 'base64');
console.log(decodedBase64.toString());  // 'Hola'

// Hexadecimal
const hex = Buffer.from('Hola').toString('hex');
console.log(hex);  // '486f6c61'

const decodedHex = Buffer.from(hex, 'hex');
console.log(decodedHex.toString());  // 'Hola'

// Latin1 (ISO-8859-1)
const latin1 = Buffer.from('Hola', 'latin1');

// UTF-16LE
const utf16 = Buffer.from('Hola', 'utf16le');
Enter fullscreen mode Exit fullscreen mode

Encodings soportados:

  • 'utf8' - Unicode (por defecto)
  • 'ascii' - Solo caracteres ASCII
  • 'base64' - Base64
  • 'hex' - Hexadecimal
  • 'latin1' / 'binary' - ISO-8859-1
  • 'utf16le' / 'ucs2' - UTF-16 Little Endian

Operaciones con Buffers

Leer y Escribir

const buffer = Buffer.alloc(10);

// Escribir
buffer.write('Hola', 0, 'utf8');
console.log(buffer);
// <Buffer 48 6f 6c 61 00 00 00 00 00 00>

// Escribir en posición específica
buffer.write('Mundo', 5, 'utf8');
console.log(buffer.toString());
// 'HolaMundo\x00'

// Leer bytes individuales
console.log(buffer[0]);  // 72 ('H')
console.log(buffer[5]);  // 77 ('M')

// Modificar bytes
buffer[0] = 65;  // 'A' en ASCII
console.log(buffer.toString());  // 'AolaMundo\x00'
Enter fullscreen mode Exit fullscreen mode

Copiar Buffers

const buf1 = Buffer.from('Hola');
const buf2 = Buffer.alloc(4);

// Copiar de buf1 a buf2
buf1.copy(buf2);
console.log(buf2.toString());  // 'Hola'

// Copiar parcialmente
const buf3 = Buffer.from('Hola Mundo');
const buf4 = Buffer.alloc(4);

buf3.copy(
  buf4,    // destino
  0,       // posición destino
  0,       // posición inicio origen
  4        // posición fin origen
);
console.log(buf4.toString());  // 'Hola'
Enter fullscreen mode Exit fullscreen mode

Concatenar Buffers

const buf1 = Buffer.from('Hola ');
const buf2 = Buffer.from('Mundo');

const concatenado = Buffer.concat([buf1, buf2]);
console.log(concatenado.toString());  // 'Hola Mundo'

// Con múltiples buffers
const buf3 = Buffer.from('!');
const resultado = Buffer.concat([buf1, buf2, buf3]);
console.log(resultado.toString());  // 'Hola Mundo!'

// Especificar longitud total
const limitado = Buffer.concat([buf1, buf2], 8);
console.log(limitado.toString());  // 'Hola Mun'
Enter fullscreen mode Exit fullscreen mode

Comparar Buffers

const buf1 = Buffer.from('ABC');
const buf2 = Buffer.from('ABC');
const buf3 = Buffer.from('ABD');

// Comparación
console.log(buf1.equals(buf2));  // true
console.log(buf1.equals(buf3));  // false

// Compare (retorna -1, 0, o 1)
console.log(buf1.compare(buf2));  // 0 (iguales)
console.log(buf1.compare(buf3));  // -1 (buf1 < buf3)
console.log(buf3.compare(buf1));  // 1 (buf3 > buf1)

// Ordenar arrays de buffers
const buffers = [
  Buffer.from('C'),
  Buffer.from('A'),
  Buffer.from('B')
];

buffers.sort(Buffer.compare);
console.log(buffers.map(b => b.toString()));
// ['A', 'B', 'C']
Enter fullscreen mode Exit fullscreen mode

Slicing (Cortar)

const buffer = Buffer.from('Hola Mundo');

// Obtener subcadena
const slice = buffer.slice(0, 4);
console.log(slice.toString());  // 'Hola'

// ⚠️ IMPORTANTE: slice() NO crea una copia
// Modifica la referencia original
slice[0] = 65;  // Cambiar a 'A'
console.log(buffer.toString());  // 'Aola Mundo'

// Para hacer una copia real:
const copia = Buffer.from(buffer.slice(0, 4));
copia[0] = 66;  // Cambiar a 'B'
console.log(buffer.toString());  // 'Aola Mundo' (sin cambios)
console.log(copia.toString());   // 'Bola'
Enter fullscreen mode Exit fullscreen mode

Trabajar con JSON

const usuario = {
  nombre: 'Juan',
  edad: 30,
  activo: true
};

// Convertir objeto a Buffer
const buffer = Buffer.from(JSON.stringify(usuario));
console.log(buffer);
// <Buffer 7b 22 6e 6f 6d 62 72 65 22 3a 22 4a 75 61 6e ...>

// Convertir Buffer a objeto
const objetoRecuperado = JSON.parse(buffer.toString());
console.log(objetoRecuperado);
// { nombre: 'Juan', edad: 30, activo: true }

// Usar con toJSON()
const buf = Buffer.from('Hola');
console.log(JSON.stringify(buf));
// {"type":"Buffer","data":[72,111,108,97]}
Enter fullscreen mode Exit fullscreen mode

Casos de Uso Comunes de Buffers

const fs = require('fs');
const crypto = require('crypto');

// 1. Leer archivo como Buffer
const contenido = fs.readFileSync('imagen.png');
console.log(contenido instanceof Buffer);  // true

// 2. Operaciones criptográficas
const hash = crypto
  .createHash('sha256')
  .update('mi contraseña')
  .digest();  // Retorna un Buffer
console.log(hash.toString('hex'));

// 3. Trabajar con datos binarios
const imagen = Buffer.from([0xFF, 0xD8, 0xFF, 0xE0]);  // JPEG header
console.log(imagen);

// 4. Convertir entre encodings
const base64Data = 'SGVsbG8gV29ybGQ=';
const buffer = Buffer.from(base64Data, 'base64');
const texto = buffer.toString('utf8');
console.log(texto);  // 'Hello World'
Enter fullscreen mode Exit fullscreen mode

¿Qué son los Streams?

Los Streams son colecciones de datos (como arrays o strings) con la diferencia de que no necesitan estar disponibles todos al mismo tiempo y no tienen que caber en memoria.

Analogía: Manguera vs Cubeta

Sin Streams (Leyendo todo):

Tienes sed y hay un río →
Llenas un camión cisterna completo →
Esperas que se llene todo →
Recién entonces puedes beber

❌ Lento
❌ Requiere mucha memoria
❌ No puedes empezar hasta tener todo
Enter fullscreen mode Exit fullscreen mode

Con Streams:

Tienes sed y hay un río →
Usas una manguera/pajilla →
Bebes mientras fluye el agua

✅ Inmediato
✅ Memoria constante
✅ Puedes empezar de inmediato
Enter fullscreen mode Exit fullscreen mode

¿Por Qué Usar Streams?

1. Eficiencia de Memoria

const fs = require('fs');

// ❌ SIN STREAMS - Carga todo en memoria
const datos = fs.readFileSync('archivo-5gb.txt');
// Si el archivo es de 5GB, necesitas 5GB de RAM
console.log(datos.toString());

// ✅ CON STREAMS - Memoria constante
const stream = fs.createReadStream('archivo-5gb.txt');
stream.on('data', (chunk) => {
  console.log(`Recibido chunk de ${chunk.length} bytes`);
  // Procesa chunk (ej: 64KB) y libera memoria
});
// Solo necesitas ~64KB de RAM a la vez
Enter fullscreen mode Exit fullscreen mode

2. Eficiencia de Tiempo (Time Efficiency)

const fs = require('fs');
const http = require('http');

// ❌ SIN STREAMS - El usuario espera hasta que todo esté listo
http.createServer((req, res) => {
  fs.readFile('pelicula-1gb.mp4', (err, data) => {
    res.end(data);  // Espera cargar todo antes de enviar
  });
}).listen(8000);

// ✅ CON STREAMS - El usuario empieza a recibir inmediatamente
http.createServer((req, res) => {
  const stream = fs.createReadStream('pelicula-1gb.mp4');
  stream.pipe(res);  // Envía mientras lee
}).listen(8000);
Enter fullscreen mode Exit fullscreen mode

Comparación con Python

Python (sin streams):

# Leer archivo completo
with open('archivo.txt', 'r') as f:
    contenido = f.read()  # Todo en memoria
    print(contenido)
Enter fullscreen mode Exit fullscreen mode

Python (con generadores - similar a streams):

def leer_lineas(archivo):
    with open(archivo) as f:
        for linea in f:  # Lee línea por línea
            yield linea

for linea in leer_lineas('archivo.txt'):
    print(linea)
Enter fullscreen mode Exit fullscreen mode

Node.js (con streams):

const fs = require('fs');
const stream = fs.createReadStream('archivo.txt');

stream.on('data', (chunk) => {
  console.log(chunk.toString());
});
Enter fullscreen mode Exit fullscreen mode

Tipos de Streams

Node.js tiene 4 tipos fundamentales de streams:

┌──────────────────────────────────────────────┐
│                                              │
│  1. Readable    →  Puedes LEER datos        │
│     (origen de datos)                        │
│                                              │
│  2. Writable    →  Puedes ESCRIBIR datos    │
│     (destino de datos)                       │
│                                              │
│  3. Duplex      →  Puedes LEER y ESCRIBIR   │
│     (ambas direcciones)                      │
│                                              │
│  4. Transform   →  Modifica datos al pasar  │
│     (procesa mientras transmite)             │
│                                              │
└──────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

Ejemplos de Cada Tipo

const fs = require('fs');
const net = require('net');
const crypto = require('crypto');
const { Transform } = require('stream');

// 1. READABLE - fs.createReadStream()
const readable = fs.createReadStream('entrada.txt');

// 2. WRITABLE - fs.createWriteStream()
const writable = fs.createWriteStream('salida.txt');

// 3. DUPLEX - net.Socket (conexión TCP)
const socket = net.connect(8000);
socket.write('datos');  // Writable
socket.on('data', console.log);  // Readable

// 4. TRANSFORM - crypto.createCipheriv()
const cipher = crypto.createCipheriv('aes256', key, iv);
// Lee datos, los encripta, los escribe
Enter fullscreen mode Exit fullscreen mode

Tabla de Streams Comunes

Tipo Ejemplos Descripción
Readable fs.createReadStream() Leer archivos
http.IncomingMessage Request del servidor
process.stdin Entrada estándar
zlib.createGunzip() Descomprimir
Writable fs.createWriteStream() Escribir archivos
http.ServerResponse Response del servidor
process.stdout Salida estándar
process.stderr Salida de error
Duplex net.Socket TCP sockets
tls.TLSSocket TLS/SSL sockets
Transform zlib.createGzip() Comprimir
crypto.createCipher() Encriptar
stream.Transform Custom transforms

Readable Streams

Los Readable Streams te permiten leer datos de una fuente.

Modos de Lectura

Readable streams operan en dos modos:

  1. Flowing Mode (Flujo): Los datos fluyen automáticamente
  2. Paused Mode (Pausado): Debes pedir datos manualmente
const fs = require('fs');

// PAUSED MODE (por defecto)
const stream = fs.createReadStream('archivo.txt');

stream.on('readable', () => {
  let chunk;
  // Leer manualmente
  while ((chunk = stream.read()) !== null) {
    console.log(`Recibido: ${chunk.length} bytes`);
  }
});

// FLOWING MODE
const stream2 = fs.createReadStream('archivo.txt');

stream2.on('data', (chunk) => {
  // Datos fluyen automáticamente
  console.log(`Recibido: ${chunk.length} bytes`);
});
Enter fullscreen mode Exit fullscreen mode

Eventos de Readable Streams

const fs = require('fs');

const stream = fs.createReadStream('archivo.txt', {
  encoding: 'utf8',
  highWaterMark: 16 * 1024  // Tamaño del chunk: 16KB
});

// 1. 'data' - Nuevo chunk disponible (flowing mode)
stream.on('data', (chunk) => {
  console.log('Datos recibidos:', chunk.length, 'bytes');
});

// 2. 'end' - No hay más datos
stream.on('end', () => {
  console.log('Lectura completada');
});

// 3. 'error' - Error durante la lectura
stream.on('error', (error) => {
  console.error('Error:', error);
});

// 4. 'close' - Stream cerrado
stream.on('close', () => {
  console.log('Stream cerrado');
});

// 5. 'readable' - Datos disponibles para leer (paused mode)
stream.on('readable', () => {
  console.log('Hay datos disponibles');
});
Enter fullscreen mode Exit fullscreen mode

Crear Readable Stream Personalizado

const { Readable } = require('stream');

// Ejemplo 1: Stream simple desde array
class ArrayStream extends Readable {
  constructor(array) {
    super();
    this.array = array;
    this.index = 0;
  }

  _read() {
    if (this.index < this.array.length) {
      const chunk = this.array[this.index];
      this.push(chunk);  // Enviar datos
      this.index++;
    } else {
      this.push(null);  // Señal de fin
    }
  }
}

const stream = new ArrayStream(['Hola', ' ', 'Mundo', '\n']);

stream.on('data', (chunk) => {
  console.log('Recibido:', chunk);
});

stream.on('end', () => {
  console.log('Fin del stream');
});

// Salida:
// Recibido: Hola
// Recibido:  
// Recibido: Mundo
// Recibido: \n
// Fin del stream
Enter fullscreen mode Exit fullscreen mode
// Ejemplo 2: Stream de números
class ContadorStream extends Readable {
  constructor(limite) {
    super({ objectMode: true });  // Para objetos en vez de buffers
    this.limite = limite;
    this.contador = 1;
  }

  _read() {
    if (this.contador <= this.limite) {
      this.push({ numero: this.contador });
      this.contador++;
    } else {
      this.push(null);
    }
  }
}

const contador = new ContadorStream(5);

contador.on('data', (obj) => {
  console.log('Número:', obj.numero);
});

// Salida:
// Número: 1
// Número: 2
// Número: 3
// Número: 4
// Número: 5
Enter fullscreen mode Exit fullscreen mode

Control de Flujo

const fs = require('fs');

const stream = fs.createReadStream('archivo-grande.txt');

stream.on('data', (chunk) => {
  console.log('Procesando chunk...');

  // Pausar el stream si necesitas tiempo para procesar
  stream.pause();

  // Simular procesamiento pesado
  setTimeout(() => {
    console.log('Chunk procesado, continuando...');
    stream.resume();  // Reanudar
  }, 1000);
});

stream.on('end', () => {
  console.log('Todos los chunks procesados');
});
Enter fullscreen mode Exit fullscreen mode

Writable Streams

Los Writable Streams te permiten escribir datos a un destino.

Escribir en Streams

const fs = require('fs');

const stream = fs.createWriteStream('salida.txt');

// Escribir datos
stream.write('Primera línea\n');
stream.write('Segunda línea\n');
stream.write('Tercera línea\n');

// Finalizar el stream
stream.end('Última línea\n');

// O simplemente
// stream.end();
Enter fullscreen mode Exit fullscreen mode

Eventos de Writable Streams

const fs = require('fs');

const stream = fs.createWriteStream('salida.txt');

// 1. 'drain' - El buffer interno está listo para más datos
stream.on('drain', () => {
  console.log('Buffer liberado, puedo escribir más');
});

// 2. 'finish' - Todos los datos fueron escritos
stream.on('finish', () => {
  console.log('Escritura completada');
});

// 3. 'error' - Error durante la escritura
stream.on('error', (error) => {
  console.error('Error:', error);
});

// 4. 'close' - Stream cerrado
stream.on('close', () => {
  console.log('Stream cerrado');
});

// 5. 'pipe' - Readable stream conectado vía pipe
stream.on('pipe', (src) => {
  console.log('Algo se conectó vía pipe');
});

// 6. 'unpipe' - Readable stream desconectado
stream.on('unpipe', (src) => {
  console.log('Algo se desconectó');
});
Enter fullscreen mode Exit fullscreen mode

Backpressure (Contrapresión)

Uno de los conceptos más importantes en streams.

const fs = require('fs');

const stream = fs.createWriteStream('salida.txt');

// write() retorna false cuando el buffer está lleno
function escribirMillon() {
  let i = 1000000;

  function escribir() {
    let ok = true;

    while (i > 0 && ok) {
      i--;
      const datos = `Línea ${i}\n`;

      // write() retorna false si el buffer está lleno
      ok = stream.write(datos);

      if (!ok) {
        console.log('Buffer lleno, esperando drain...');
      }
    }

    if (i > 0) {
      // Esperar evento 'drain' antes de continuar
      stream.once('drain', escribir);
    } else {
      stream.end();
    }
  }

  escribir();
}

escribirMillon();

stream.on('finish', () => {
  console.log('Escritura completada');
});
Enter fullscreen mode Exit fullscreen mode

¿Qué es Backpressure?

  • El stream tiene un buffer interno limitado
  • Si escribes más rápido de lo que puede procesar, el buffer se llena
  • write() retorna false cuando el buffer está lleno
  • Debes esperar el evento 'drain' antes de continuar
  • Si ignoras esto, causas fugas de memoria

Crear Writable Stream Personalizado

const { Writable } = require('stream');

// Stream que convierte a mayúsculas y escribe en consola
class UpperCaseStream extends Writable {
  _write(chunk, encoding, callback) {
    try {
      const texto = chunk.toString().toUpperCase();
      console.log(texto);
      callback();  // Indicar éxito
    } catch (error) {
      callback(error);  // Indicar error
    }
  }
}

const stream = new UpperCaseStream();

stream.write('hola ');
stream.write('mundo');
stream.end();

// Salida:
// HOLA 
// MUNDO
Enter fullscreen mode Exit fullscreen mode
// Stream que acumula datos y los procesa al final
class AcumuladorStream extends Writable {
  constructor(options) {
    super(options);
    this.datos = [];
  }

  _write(chunk, encoding, callback) {
    this.datos.push(chunk);
    callback();
  }

  _final(callback) {
    // Llamado cuando stream.end()
    const total = Buffer.concat(this.datos);
    console.log('Total acumulado:', total.toString());
    callback();
  }
}

const acumulador = new AcumuladorStream();

acumulador.write('Parte 1 ');
acumulador.write('Parte 2 ');
acumulador.write('Parte 3');
acumulador.end();

// Salida (al final):
// Total acumulado: Parte 1 Parte 2 Parte 3
Enter fullscreen mode Exit fullscreen mode

Duplex y Transform Streams

Duplex Streams

Duplex streams son tanto Readable como Writable.

const { Duplex } = require('stream');

class EchoStream extends Duplex {
  constructor(options) {
    super(options);
    this.datos = [];
  }

  // Lado Writable
  _write(chunk, encoding, callback) {
    this.datos.push(chunk);
    callback();
  }

  // Lado Readable
  _read() {
    if (this.datos.length > 0) {
      const chunk = this.datos.shift();
      this.push(chunk);
    }
  }
}

const echo = new EchoStream();

// Escribir datos
echo.write('Hola ');
echo.write('Mundo');
echo.end();

// Leer datos
echo.on('data', (chunk) => {
  console.log('Eco:', chunk.toString());
});

// Salida:
// Eco: Hola 
// Eco: Mundo
Enter fullscreen mode Exit fullscreen mode

Transform Streams

Transform streams son Duplex streams que modifican datos mientras pasan.

const { Transform } = require('stream');

// Stream que convierte a mayúsculas
class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // Transformar el chunk
    const transformado = chunk.toString().toUpperCase();
    // Enviar el dato transformado
    this.push(transformado);
    callback();
  }
}

const upper = new UppercaseTransform();

upper.on('data', (chunk) => {
  console.log(chunk.toString());
});

upper.write('hola ');
upper.write('mundo');
upper.end();

// Salida:
// HOLA 
// MUNDO
Enter fullscreen mode Exit fullscreen mode

Transform Streams Comunes

const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');

// 1. COMPRESIÓN
const gzip = zlib.createGzip();

fs.createReadStream('entrada.txt')
  .pipe(gzip)
  .pipe(fs.createWriteStream('entrada.txt.gz'));

// 2. DESCOMPRESIÓN
const gunzip = zlib.createGunzip();

fs.createReadStream('archivo.txt.gz')
  .pipe(gunzip)
  .pipe(fs.createWriteStream('archivo.txt'));

// 3. ENCRIPTACIÓN
const algorithm = 'aes-256-cbc';
const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);

const cipher = crypto.createCipheriv(algorithm, key, iv);

fs.createReadStream('secreto.txt')
  .pipe(cipher)
  .pipe(fs.createWriteStream('secreto.txt.enc'));

// 4. DESENCRIPTACIÓN
const decipher = crypto.createDecipheriv(algorithm, key, iv);

fs.createReadStream('secreto.txt.enc')
  .pipe(decipher)
  .pipe(fs.createWriteStream('secreto-recuperado.txt'));
Enter fullscreen mode Exit fullscreen mode

Ejemplo Avanzado: Procesador de CSV

const { Transform } = require('stream');
const fs = require('fs');

// Transform que procesa líneas CSV
class CSVParser extends Transform {
  constructor(options) {
    super({ objectMode: true });
    this.headers = null;
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');

    // Mantener la última línea incompleta en el buffer
    this.buffer = lines.pop();

    for (const line of lines) {
      if (!line.trim()) continue;

      const values = line.split(',');

      if (!this.headers) {
        this.headers = values;
        continue;
      }

      const obj = {};
      for (let i = 0; i < this.headers.length; i++) {
        obj[this.headers[i]] = values[i];
      }

      this.push(obj);
    }

    callback();
  }

  _flush(callback) {
    // Procesar la última línea
    if (this.buffer.trim()) {
      const values = this.buffer.split(',');
      const obj = {};
      for (let i = 0; i < this.headers.length; i++) {
        obj[this.headers[i]] = values[i];
      }
      this.push(obj);
    }
    callback();
  }
}

// Uso del CSV Parser
const csvParser = new CSVParser();

fs.createReadStream('usuarios.csv')
  .pipe(csvParser)
  .on('data', (usuario) => {
    console.log('Usuario:', usuario);
    // { nombre: 'Juan', edad: '30', email: 'juan@example.com' }
  })
  .on('end', () => {
    console.log('CSV procesado completamente');
  });
Enter fullscreen mode Exit fullscreen mode

Piping y Backpressure

¿Qué es Piping?

Piping es conectar la salida de un Readable stream con la entrada de un Writable stream.

const fs = require('fs');

// Copiar archivo usando streams
fs.createReadStream('entrada.txt')
  .pipe(fs.createWriteStream('salida.txt'));
Enter fullscreen mode Exit fullscreen mode

Analogía con Unix:

# En Unix/Linux
cat archivo.txt | grep "buscar" | sort > resultado.txt

# En Node.js
fs.createReadStream('archivo.txt')
  .pipe(grepStream)
  .pipe(sortStream)
  .pipe(fs.createWriteStream('resultado.txt'));
Enter fullscreen mode Exit fullscreen mode

Pipeline Complejo

const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');

// Pipeline: Leer → Encriptar → Comprimir → Escribir
fs.createReadStream('secreto.txt')
  .pipe(crypto.createCipheriv('aes256', key, iv))
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('secreto.txt.enc.gz'))
  .on('finish', () => {
    console.log('Archivo procesado completamente');
  })
  .on('error', (err) => {
    console.error('Error en el pipeline:', err);
  });
Enter fullscreen mode Exit fullscreen mode

stream.pipeline() - Manejo de Errores Mejorado

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('entrada.txt'),
  zlib.createGzip(),
  fs.createWriteStream('salida.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline falló:', err);
    } else {
      console.log('Pipeline completado exitosamente');
    }
  }
);

// Con Promises (Node.js 15+)
const { pipeline } = require('stream/promises');

try {
  await pipeline(
    fs.createReadStream('entrada.txt'),
    zlib.createGzip(),
    fs.createWriteStream('salida.txt.gz')
  );
  console.log('Pipeline completado');
} catch (error) {
  console.error('Pipeline falló:', error);
}
Enter fullscreen mode Exit fullscreen mode

¿Qué es Backpressure?

Backpressure ocurre cuando un stream produce datos más rápido de lo que el siguiente stream puede consumirlos.

const fs = require('fs');

const readable = fs.createReadStream('archivo-grande.txt');
const writable = fs.createWriteStream('copia.txt');

readable.on('data', (chunk) => {
  const ok = writable.write(chunk);

  if (!ok) {
    // El writable está sobrecargado
    console.log('Backpressure detectado, pausando lectura...');
    readable.pause();

    // Esperar a que se libere
    writable.once('drain', () => {
      console.log('Buffer liberado, reanudando lectura...');
      readable.resume();
    });
  }
});
Enter fullscreen mode Exit fullscreen mode

¿Por qué es importante?

  • Sin manejo de backpressure: fuga de memoria
  • Los datos se acumulan en memoria sin límite
  • La aplicación puede crashear por falta de memoria

Pipe Maneja Backpressure Automáticamente

// ✅ CORRECTO - pipe() maneja backpressure automáticamente
fs.createReadStream('archivo-grande.txt')
  .pipe(fs.createWriteStream('copia.txt'));

// ❌ INCORRECTO - Sin manejo de backpressure
const readable = fs.createReadStream('archivo-grande.txt');
const writable = fs.createWriteStream('copia.txt');

readable.on('data', (chunk) => {
  writable.write(chunk); // Puede causar fuga de memoria
});
Enter fullscreen mode Exit fullscreen mode

Transform Stream con Backpressure

const { Transform } = require('stream');

class SlowProcessor extends Transform {
  _transform(chunk, encoding, callback) {
    // Simular procesamiento lento
    setTimeout(() => {
      const processed = chunk.toString().toUpperCase();
      this.push(processed);
      callback();
    }, 100); // 100ms por chunk
  }
}

const processor = new SlowProcessor();

fs.createReadStream('input.txt')
  .pipe(processor) // Automáticamente maneja backpressure
  .pipe(fs.createWriteStream('output.txt'));
Enter fullscreen mode Exit fullscreen mode

Casos de Uso Reales

1. Servidor de Descarga de Archivos

const http = require('http');
const fs = require('fs');
const path = require('path');

const server = http.createServer((req, res) => {
  if (req.method === 'GET' && req.url.startsWith('/download/')) {
    const filename = path.basename(req.url);
    const filepath = path.join(__dirname, 'files', filename);

    // Verificar que el archivo existe
    fs.access(filepath, fs.constants.F_OK, (err) => {
      if (err) {
        res.statusCode = 404;
        res.end('Archivo no encontrado');
        return;
      }

      // Obtener información del archivo
      fs.stat(filepath, (err, stats) => {
        if (err) {
          res.statusCode = 500;
          res.end('Error del servidor');
          return;
        }

        // Headers de respuesta
        res.setHeader('Content-Type', 'application/octet-stream');
        res.setHeader('Content-Length', stats.size);
        res.setHeader('Content-Disposition', `attachment; filename="${filename}"`);

        // Stream el archivo directamente al cliente
        const fileStream = fs.createReadStream(filepath);

        fileStream.on('error', (err) => {
          console.error('Error leyendo archivo:', err);
          res.statusCode = 500;
          res.end('Error leyendo archivo');
        });

        // Pipe automáticamente maneja backpressure
        fileStream.pipe(res);
      });
    });
  } else {
    res.statusCode = 404;
    res.end('Not Found');
  }
});

server.listen(3000, () => {
  console.log('Servidor de descarga en puerto 3000');
});
Enter fullscreen mode Exit fullscreen mode

2. Procesador de Logs en Tiempo Real

const fs = require('fs');
const { Transform } = require('stream');

// Transform para parsear logs de Apache/Nginx
class LogParser extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(chunk, encoding, callback) {
    const lines = chunk.toString().split('\n');

    for (const line of lines) {
      if (!line.trim()) continue;

      // Parsear línea de log (formato común de Apache)
      const match = line.match(/^(\S+) \S+ \S+ \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+|-)/);

      if (match) {
        const logEntry = {
          ip: match[1],
          timestamp: new Date(match[2]),
          method: match[3],
          url: match[4],
          protocol: match[5],
          status: parseInt(match[6]),
          size: match[7] === '-' ? 0 : parseInt(match[7])
        };

        this.push(logEntry);
      }
    }

    callback();
  }
}

// Transform para filtrar errores
class ErrorFilter extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(logEntry, encoding, callback) {
    if (logEntry.status >= 400) {
      this.push(logEntry);
    }
    callback();
  }
}

// Procesar logs en tiempo real
const logParser = new LogParser();
const errorFilter = new ErrorFilter();

fs.createReadStream('/var/log/apache2/access.log')
  .pipe(logParser)
  .pipe(errorFilter)
  .on('data', (errorLog) => {
    console.log(`ERROR ${errorLog.status}: ${errorLog.method} ${errorLog.url} desde ${errorLog.ip}`);

    // Enviar alerta si es error crítico
    if (errorLog.status >= 500) {
      console.log('🚨 ALERTA: Error del servidor detectado!');
    }
  });
Enter fullscreen mode Exit fullscreen mode

3. Compresión de Imágenes en Lote

const fs = require('fs');
const path = require('path');
const { Transform, pipeline } = require('stream');
const sharp = require('sharp'); // npm install sharp

class ImageProcessor extends Transform {
  constructor(options = {}) {
    super({ objectMode: true });
    this.quality = options.quality || 80;
    this.width = options.width || 800;
  }

  _transform(file, encoding, callback) {
    const inputPath = file.path;
    const outputPath = path.join(
      path.dirname(inputPath),
      'compressed',
      path.basename(inputPath)
    );

    // Crear directorio si no existe
    fs.mkdirSync(path.dirname(outputPath), { recursive: true });

    sharp(inputPath)
      .resize(this.width)
      .jpeg({ quality: this.quality })
      .toFile(outputPath)
      .then(() => {
        console.log(`Procesada: ${path.basename(inputPath)}`);
        this.push({ input: inputPath, output: outputPath });
        callback();
      })
      .catch(callback);
  }
}

// Stream que lee archivos de un directorio
class DirectoryReader extends require('stream').Readable {
  constructor(directory) {
    super({ objectMode: true });
    this.directory = directory;
    this.files = [];
    this.index = 0;

    // Leer archivos de imagen
    const files = fs.readdirSync(directory);
    this.files = files
      .filter(file => /\.(jpg|jpeg|png)$/i.test(file))
      .map(file => ({ path: path.join(directory, file) }));
  }

  _read() {
    if (this.index < this.files.length) {
      this.push(this.files[this.index]);
      this.index++;
    } else {
      this.push(null); // Fin del stream
    }
  }
}

// Usar el procesador
const imageDir = './images';
const reader = new DirectoryReader(imageDir);
const processor = new ImageProcessor({ quality: 70, width: 600 });

reader
  .pipe(processor)
  .on('data', (result) => {
    console.log(`✅ Imagen comprimida: ${result.output}`);
  })
  .on('end', () => {
    console.log('🎉 Todas las imágenes procesadas!');
  })
  .on('error', (err) => {
    console.error('❌ Error procesando imágenes:', err);
  });
Enter fullscreen mode Exit fullscreen mode

4. API de Upload con Streams

const express = require('express');
const multer = require('multer');
const fs = require('fs');
const crypto = require('crypto');
const { pipeline } = require('stream');

const app = express();

// Configurar multer para manejar uploads
const upload = multer({ dest: 'temp/' });

app.post('/upload', upload.single('file'), async (req, res) => {
  try {
    const tempPath = req.file.path;
    const hash = crypto.createHash('sha256');
    const finalPath = `uploads/${Date.now()}-${req.file.originalname}`;

    // Pipeline: Leer temp → Calcular hash → Escribir final
    await pipeline(
      fs.createReadStream(tempPath),
      hash,
      fs.createWriteStream(finalPath)
    );

    // Limpiar archivo temporal
    fs.unlinkSync(tempPath);

    res.json({
      message: 'Archivo subido exitosamente',
      filename: path.basename(finalPath),
      hash: hash.digest('hex'),
      size: req.file.size
    });

  } catch (error) {
    console.error('Error en upload:', error);
    res.status(500).json({ error: 'Error subiendo archivo' });
  }
});

app.listen(3000, () => {
  console.log('Servidor de upload en puerto 3000');
});
Enter fullscreen mode Exit fullscreen mode

Streams vs Lectura Completa

Comparación de Memoria

const fs = require('fs');

// ❌ LECTURA COMPLETA - Usa mucha memoria
function procesarArchivoCompleto(filename) {
  console.log('Memoria antes:', process.memoryUsage().heapUsed / 1024 / 1024, 'MB');

  const data = fs.readFileSync(filename);
  const lines = data.toString().split('\n');

  let count = 0;
  for (const line of lines) {
    if (line.includes('ERROR')) count++;
  }

  console.log('Memoria después:', process.memoryUsage().heapUsed / 1024 / 1024, 'MB');
  console.log('Errores encontrados:', count);
}

// ✅ STREAMS - Memoria constante
function procesarArchivoStream(filename) {
  console.log('Memoria antes:', process.memoryUsage().heapUsed / 1024 / 1024, 'MB');

  let count = 0;
  let buffer = '';

  const stream = fs.createReadStream(filename, { encoding: 'utf8' });

  stream.on('data', (chunk) => {
    buffer += chunk;
    const lines = buffer.split('\n');
    buffer = lines.pop(); // Mantener línea incompleta

    for (const line of lines) {
      if (line.includes('ERROR')) count++;
    }

    console.log('Memoria durante:', process.memoryUsage().heapUsed / 1024 / 1024, 'MB');
  });

  stream.on('end', () => {
    // Procesar última línea
    if (buffer && buffer.includes('ERROR')) count++;

    console.log('Memoria final:', process.memoryUsage().heapUsed / 1024 / 1024, 'MB');
    console.log('Errores encontrados:', count);
  });
}

// Probar con archivo de 100MB
// procesarArchivoCompleto('log-grande.txt'); // ~100MB en memoria
// procesarArchivoStream('log-grande.txt');   // ~2MB en memoria
Enter fullscreen mode Exit fullscreen mode

Comparación de Tiempo

const fs = require('fs');

// Medir tiempo de procesamiento
function medirTiempo(fn, nombre) {
  const inicio = Date.now();

  fn(() => {
    const fin = Date.now();
    console.log(`${nombre}: ${fin - inicio}ms`);
  });
}

// Test con archivo de 50MB
medirTiempo((callback) => {
  // Lectura completa
  fs.readFile('archivo-50mb.txt', (err, data) => {
    console.log('Archivo cargado en memoria');
    callback();
  });
}, 'Lectura completa');

medirTiempo((callback) => {
  // Stream
  const stream = fs.createReadStream('archivo-50mb.txt');
  stream.on('data', (chunk) => {
    // Procesar chunk inmediatamente
  });
  stream.on('end', callback);
}, 'Stream');

// Resultados típicos:
// Lectura completa: 2000ms (espera a cargar todo)
// Stream: 50ms (empieza inmediatamente)
Enter fullscreen mode Exit fullscreen mode

Cuestionario de Entrevista

Preguntas Fundamentales

1. ¿Qué es un Buffer en Node.js y cuándo lo usarías?

Respuesta:

  • Un Buffer es un espacio temporal en memoria para datos binarios
  • Node.js usa Buffers para manejar datos que no son texto (imágenes, archivos, datos de red)
  • Se usa cuando trabajas con: archivos binarios, operaciones criptográficas, protocolos de red, conversiones entre encodings

2. ¿Cuáles son los 4 tipos de Streams en Node.js?

Respuesta:

  • Readable: Para leer datos (ej: fs.createReadStream)
  • Writable: Para escribir datos (ej: fs.createWriteStream)
  • Duplex: Para leer y escribir (ej: net.Socket)
  • Transform: Para modificar datos mientras pasan (ej: zlib.createGzip)

3. ¿Qué es backpressure y por qué es importante?

Respuesta:

  • Backpressure ocurre cuando un stream produce datos más rápido de lo que se pueden consumir
  • Sin manejo adecuado causa fugas de memoria
  • pipe() maneja backpressure automáticamente
  • Manualmente se maneja con eventos drain y pausando/reanudando streams

4. ¿Cuál es la diferencia entre Flowing Mode y Paused Mode en Readable Streams?

Respuesta:

  • Paused Mode: Debes pedir datos manualmente con stream.read()
  • Flowing Mode: Los datos fluyen automáticamente vía evento data
  • Se cambia a Flowing Mode al agregar listener de data
  • Se puede pausar/reanudar con pause() y resume()

5. ¿Cuándo usarías Streams vs leer archivos completos?

Respuesta:

  • Streams: Archivos grandes, memoria limitada, procesamiento en tiempo real, operaciones de red
  • Lectura completa: Archivos pequeños, necesitas todos los datos a la vez, operaciones simples

Preguntas Avanzadas

6. ¿Cómo crearías un Transform Stream personalizado?

Respuesta:

const { Transform } = require('stream');

class MiTransform extends Transform {
  _transform(chunk, encoding, callback) {
    const transformado = chunk.toString().toUpperCase();
    this.push(transformado);
    callback();
  }
}
Enter fullscreen mode Exit fullscreen mode

7. ¿Qué hace stream.pipeline() y por qué es mejor que pipe()?

Respuesta:

  • pipeline() maneja errores automáticamente en toda la cadena
  • Limpia recursos correctamente si hay errores
  • Proporciona callback único para éxito/error
  • Mejor para pipelines complejos

8. ¿Cómo manejarías un archivo de 10GB sin cargar todo en memoria?

Respuesta:

  • Usar fs.createReadStream() con chunks pequeños
  • Procesar chunk por chunk
  • Usar Transform streams para modificaciones
  • Implementar backpressure si es necesario

9. ¿Qué es highWaterMark en streams?

Respuesta:

  • Tamaño del buffer interno del stream
  • Controla cuántos datos se pueden almacenar antes de aplicar backpressure
  • Se puede configurar al crear el stream
  • Valor por defecto: 16KB para streams de datos, 16 para object mode

10. ¿Cómo implementarías un sistema de logs que rote archivos automáticamente?

Respuesta:

  • Usar Writable stream personalizado
  • Monitorear tamaño del archivo actual
  • Crear nuevo archivo cuando se alcance el límite
  • Cerrar stream anterior y abrir nuevo
  • Considerar fecha/hora para rotación temporal

Ejercicios Prácticos

Ejercicio 1: Crear un Transform stream que cuente líneas y palabras en un archivo de texto.

Ejercicio 2: Implementar un sistema de backup que comprima y encripte archivos usando streams.

Ejercicio 3: Crear un servidor HTTP que sirva archivos grandes usando streams con soporte para ranges (partial content).

Ejercicio 4: Implementar un procesador de CSV que convierta archivos grandes a JSON usando streams.

Ejercicio 5: Crear un sistema de monitoreo que procese logs en tiempo real y envíe alertas.


Top comments (0)