Tabla de Contenidos
- ¿Qué son los Buffers?
- Trabajando con Buffers
- ¿Qué son los Streams?
- Tipos de Streams
- Readable Streams
- Writable Streams
- Duplex y Transform Streams
- Piping y Backpressure
- Casos de Uso Reales
- Streams vs Lectura Completa
- Cuestionario de Entrevista
¿Qué son los Buffers?
Un Buffer es un espacio temporal en memoria para almacenar datos binarios. Es la forma en que Node.js maneja datos binarios puros (imágenes, archivos, datos de red, etc.).
Analogía: El Vaso de Agua
Imagina que estás llenando un vaso con agua:
- El vaso es el Buffer (espacio temporal en memoria)
- El agua son los datos binarios
- El tamaño del vaso es el tamaño del Buffer en bytes
¿Por Qué Buffers?
JavaScript originalmente no tenía forma de manejar datos binarios (fue diseñado para navegadores). Node.js agregó Buffers para trabajar con:
- Archivos
- Streams de datos
- Imágenes y videos
- Protocolos de red TCP
- Operaciones criptográficas
Comparación con Python y Ruby
Python:
# Python usa bytes
datos = b"Hola Mundo"
print(type(datos)) # <class 'bytes'>
print(datos[0]) # 72 (valor ASCII de 'H')
# Conversiones
texto = "Hola"
datos_binarios = texto.encode('utf-8')
texto_recuperado = datos_binarios.decode('utf-8')
Ruby:
# Ruby usa String con encoding
datos = "Hola".encode('UTF-8')
puts datos.bytes # [72, 111, 108, 97]
# String binario
binario = "\x48\x6F\x6C\x61" # "Hola" en hex
puts binario
Node.js:
// Node.js usa Buffer
const datos = Buffer.from('Hola Mundo');
console.log(datos);
// <Buffer 48 6f 6c 61 20 4d 75 6e 64 6f>
console.log(datos[0]); // 72 (valor ASCII de 'H')
console.log(datos.toString()); // 'Hola Mundo'
Trabajando con Buffers
Crear Buffers
// 1. Desde un string
const buf1 = Buffer.from('Hola');
console.log(buf1);
// <Buffer 48 6f 6c 61>
// 2. Desde un array de bytes
const buf2 = Buffer.from([72, 111, 108, 97]);
console.log(buf2.toString()); // 'Hola'
// 3. Buffer vacío de tamaño específico
const buf3 = Buffer.alloc(10); // 10 bytes, inicializado con ceros
console.log(buf3);
// <Buffer 00 00 00 00 00 00 00 00 00 00>
// 4. Buffer sin inicializar (más rápido pero inseguro)
const buf4 = Buffer.allocUnsafe(10);
console.log(buf4);
// <Buffer [contenido aleatorio de memoria]>
// IMPORTANTE: Debes llenarlo antes de usar
buf4.fill(0); // Llenar con ceros
⚠️ Advertencia sobre allocUnsafe
:
// allocUnsafe no limpia la memoria
const bufferInseguro = Buffer.allocUnsafe(10);
console.log(bufferInseguro);
// Puede contener datos sensibles de memoria previa
// (contraseñas, tokens, datos de otros procesos)
// SIEMPRE inicializa después
bufferInseguro.fill(0);
// O usa alloc() que es seguro por defecto
Encodings (Codificaciones)
Node.js soporta múltiples codificaciones:
const texto = 'Hola Mundo 🌎';
// UTF-8 (por defecto, soporta todos los caracteres Unicode)
const utf8 = Buffer.from(texto, 'utf8');
console.log(utf8);
// <Buffer 48 6f 6c 61 20 4d 75 6e 64 6f 20 f0 9f 8c 8e>
// ASCII (solo caracteres 0-127)
const ascii = Buffer.from('Hola', 'ascii');
console.log(ascii.toString('ascii')); // 'Hola'
// Base64 (para transferir datos binarios como texto)
const base64 = Buffer.from('Hola').toString('base64');
console.log(base64); // 'SG9sYQ=='
const decodedBase64 = Buffer.from(base64, 'base64');
console.log(decodedBase64.toString()); // 'Hola'
// Hexadecimal
const hex = Buffer.from('Hola').toString('hex');
console.log(hex); // '486f6c61'
const decodedHex = Buffer.from(hex, 'hex');
console.log(decodedHex.toString()); // 'Hola'
// Latin1 (ISO-8859-1)
const latin1 = Buffer.from('Hola', 'latin1');
// UTF-16LE
const utf16 = Buffer.from('Hola', 'utf16le');
Encodings soportados:
-
'utf8'
- Unicode (por defecto) -
'ascii'
- Solo caracteres ASCII -
'base64'
- Base64 -
'hex'
- Hexadecimal -
'latin1'
/'binary'
- ISO-8859-1 -
'utf16le'
/'ucs2'
- UTF-16 Little Endian
Operaciones con Buffers
Leer y Escribir
const buffer = Buffer.alloc(10);
// Escribir
buffer.write('Hola', 0, 'utf8');
console.log(buffer);
// <Buffer 48 6f 6c 61 00 00 00 00 00 00>
// Escribir en posición específica
buffer.write('Mundo', 5, 'utf8');
console.log(buffer.toString());
// 'HolaMundo\x00'
// Leer bytes individuales
console.log(buffer[0]); // 72 ('H')
console.log(buffer[5]); // 77 ('M')
// Modificar bytes
buffer[0] = 65; // 'A' en ASCII
console.log(buffer.toString()); // 'AolaMundo\x00'
Copiar Buffers
const buf1 = Buffer.from('Hola');
const buf2 = Buffer.alloc(4);
// Copiar de buf1 a buf2
buf1.copy(buf2);
console.log(buf2.toString()); // 'Hola'
// Copiar parcialmente
const buf3 = Buffer.from('Hola Mundo');
const buf4 = Buffer.alloc(4);
buf3.copy(
buf4, // destino
0, // posición destino
0, // posición inicio origen
4 // posición fin origen
);
console.log(buf4.toString()); // 'Hola'
Concatenar Buffers
const buf1 = Buffer.from('Hola ');
const buf2 = Buffer.from('Mundo');
const concatenado = Buffer.concat([buf1, buf2]);
console.log(concatenado.toString()); // 'Hola Mundo'
// Con múltiples buffers
const buf3 = Buffer.from('!');
const resultado = Buffer.concat([buf1, buf2, buf3]);
console.log(resultado.toString()); // 'Hola Mundo!'
// Especificar longitud total
const limitado = Buffer.concat([buf1, buf2], 8);
console.log(limitado.toString()); // 'Hola Mun'
Comparar Buffers
const buf1 = Buffer.from('ABC');
const buf2 = Buffer.from('ABC');
const buf3 = Buffer.from('ABD');
// Comparación
console.log(buf1.equals(buf2)); // true
console.log(buf1.equals(buf3)); // false
// Compare (retorna -1, 0, o 1)
console.log(buf1.compare(buf2)); // 0 (iguales)
console.log(buf1.compare(buf3)); // -1 (buf1 < buf3)
console.log(buf3.compare(buf1)); // 1 (buf3 > buf1)
// Ordenar arrays de buffers
const buffers = [
Buffer.from('C'),
Buffer.from('A'),
Buffer.from('B')
];
buffers.sort(Buffer.compare);
console.log(buffers.map(b => b.toString()));
// ['A', 'B', 'C']
Slicing (Cortar)
const buffer = Buffer.from('Hola Mundo');
// Obtener subcadena
const slice = buffer.slice(0, 4);
console.log(slice.toString()); // 'Hola'
// ⚠️ IMPORTANTE: slice() NO crea una copia
// Modifica la referencia original
slice[0] = 65; // Cambiar a 'A'
console.log(buffer.toString()); // 'Aola Mundo'
// Para hacer una copia real:
const copia = Buffer.from(buffer.slice(0, 4));
copia[0] = 66; // Cambiar a 'B'
console.log(buffer.toString()); // 'Aola Mundo' (sin cambios)
console.log(copia.toString()); // 'Bola'
Trabajar con JSON
const usuario = {
nombre: 'Juan',
edad: 30,
activo: true
};
// Convertir objeto a Buffer
const buffer = Buffer.from(JSON.stringify(usuario));
console.log(buffer);
// <Buffer 7b 22 6e 6f 6d 62 72 65 22 3a 22 4a 75 61 6e ...>
// Convertir Buffer a objeto
const objetoRecuperado = JSON.parse(buffer.toString());
console.log(objetoRecuperado);
// { nombre: 'Juan', edad: 30, activo: true }
// Usar con toJSON()
const buf = Buffer.from('Hola');
console.log(JSON.stringify(buf));
// {"type":"Buffer","data":[72,111,108,97]}
Casos de Uso Comunes de Buffers
const fs = require('fs');
const crypto = require('crypto');
// 1. Leer archivo como Buffer
const contenido = fs.readFileSync('imagen.png');
console.log(contenido instanceof Buffer); // true
// 2. Operaciones criptográficas
const hash = crypto
.createHash('sha256')
.update('mi contraseña')
.digest(); // Retorna un Buffer
console.log(hash.toString('hex'));
// 3. Trabajar con datos binarios
const imagen = Buffer.from([0xFF, 0xD8, 0xFF, 0xE0]); // JPEG header
console.log(imagen);
// 4. Convertir entre encodings
const base64Data = 'SGVsbG8gV29ybGQ=';
const buffer = Buffer.from(base64Data, 'base64');
const texto = buffer.toString('utf8');
console.log(texto); // 'Hello World'
¿Qué son los Streams?
Los Streams son colecciones de datos (como arrays o strings) con la diferencia de que no necesitan estar disponibles todos al mismo tiempo y no tienen que caber en memoria.
Analogía: Manguera vs Cubeta
Sin Streams (Leyendo todo):
Tienes sed y hay un río →
Llenas un camión cisterna completo →
Esperas que se llene todo →
Recién entonces puedes beber
❌ Lento
❌ Requiere mucha memoria
❌ No puedes empezar hasta tener todo
Con Streams:
Tienes sed y hay un río →
Usas una manguera/pajilla →
Bebes mientras fluye el agua
✅ Inmediato
✅ Memoria constante
✅ Puedes empezar de inmediato
¿Por Qué Usar Streams?
1. Eficiencia de Memoria
const fs = require('fs');
// ❌ SIN STREAMS - Carga todo en memoria
const datos = fs.readFileSync('archivo-5gb.txt');
// Si el archivo es de 5GB, necesitas 5GB de RAM
console.log(datos.toString());
// ✅ CON STREAMS - Memoria constante
const stream = fs.createReadStream('archivo-5gb.txt');
stream.on('data', (chunk) => {
console.log(`Recibido chunk de ${chunk.length} bytes`);
// Procesa chunk (ej: 64KB) y libera memoria
});
// Solo necesitas ~64KB de RAM a la vez
2. Eficiencia de Tiempo (Time Efficiency)
const fs = require('fs');
const http = require('http');
// ❌ SIN STREAMS - El usuario espera hasta que todo esté listo
http.createServer((req, res) => {
fs.readFile('pelicula-1gb.mp4', (err, data) => {
res.end(data); // Espera cargar todo antes de enviar
});
}).listen(8000);
// ✅ CON STREAMS - El usuario empieza a recibir inmediatamente
http.createServer((req, res) => {
const stream = fs.createReadStream('pelicula-1gb.mp4');
stream.pipe(res); // Envía mientras lee
}).listen(8000);
Comparación con Python
Python (sin streams):
# Leer archivo completo
with open('archivo.txt', 'r') as f:
contenido = f.read() # Todo en memoria
print(contenido)
Python (con generadores - similar a streams):
def leer_lineas(archivo):
with open(archivo) as f:
for linea in f: # Lee línea por línea
yield linea
for linea in leer_lineas('archivo.txt'):
print(linea)
Node.js (con streams):
const fs = require('fs');
const stream = fs.createReadStream('archivo.txt');
stream.on('data', (chunk) => {
console.log(chunk.toString());
});
Tipos de Streams
Node.js tiene 4 tipos fundamentales de streams:
┌──────────────────────────────────────────────┐
│ │
│ 1. Readable → Puedes LEER datos │
│ (origen de datos) │
│ │
│ 2. Writable → Puedes ESCRIBIR datos │
│ (destino de datos) │
│ │
│ 3. Duplex → Puedes LEER y ESCRIBIR │
│ (ambas direcciones) │
│ │
│ 4. Transform → Modifica datos al pasar │
│ (procesa mientras transmite) │
│ │
└──────────────────────────────────────────────┘
Ejemplos de Cada Tipo
const fs = require('fs');
const net = require('net');
const crypto = require('crypto');
const { Transform } = require('stream');
// 1. READABLE - fs.createReadStream()
const readable = fs.createReadStream('entrada.txt');
// 2. WRITABLE - fs.createWriteStream()
const writable = fs.createWriteStream('salida.txt');
// 3. DUPLEX - net.Socket (conexión TCP)
const socket = net.connect(8000);
socket.write('datos'); // Writable
socket.on('data', console.log); // Readable
// 4. TRANSFORM - crypto.createCipheriv()
const cipher = crypto.createCipheriv('aes256', key, iv);
// Lee datos, los encripta, los escribe
Tabla de Streams Comunes
Tipo | Ejemplos | Descripción |
---|---|---|
Readable | fs.createReadStream() |
Leer archivos |
http.IncomingMessage |
Request del servidor | |
process.stdin |
Entrada estándar | |
zlib.createGunzip() |
Descomprimir | |
Writable | fs.createWriteStream() |
Escribir archivos |
http.ServerResponse |
Response del servidor | |
process.stdout |
Salida estándar | |
process.stderr |
Salida de error | |
Duplex | net.Socket |
TCP sockets |
tls.TLSSocket |
TLS/SSL sockets | |
Transform | zlib.createGzip() |
Comprimir |
crypto.createCipher() |
Encriptar | |
stream.Transform |
Custom transforms |
Readable Streams
Los Readable Streams te permiten leer datos de una fuente.
Modos de Lectura
Readable streams operan en dos modos:
- Flowing Mode (Flujo): Los datos fluyen automáticamente
- Paused Mode (Pausado): Debes pedir datos manualmente
const fs = require('fs');
// PAUSED MODE (por defecto)
const stream = fs.createReadStream('archivo.txt');
stream.on('readable', () => {
let chunk;
// Leer manualmente
while ((chunk = stream.read()) !== null) {
console.log(`Recibido: ${chunk.length} bytes`);
}
});
// FLOWING MODE
const stream2 = fs.createReadStream('archivo.txt');
stream2.on('data', (chunk) => {
// Datos fluyen automáticamente
console.log(`Recibido: ${chunk.length} bytes`);
});
Eventos de Readable Streams
const fs = require('fs');
const stream = fs.createReadStream('archivo.txt', {
encoding: 'utf8',
highWaterMark: 16 * 1024 // Tamaño del chunk: 16KB
});
// 1. 'data' - Nuevo chunk disponible (flowing mode)
stream.on('data', (chunk) => {
console.log('Datos recibidos:', chunk.length, 'bytes');
});
// 2. 'end' - No hay más datos
stream.on('end', () => {
console.log('Lectura completada');
});
// 3. 'error' - Error durante la lectura
stream.on('error', (error) => {
console.error('Error:', error);
});
// 4. 'close' - Stream cerrado
stream.on('close', () => {
console.log('Stream cerrado');
});
// 5. 'readable' - Datos disponibles para leer (paused mode)
stream.on('readable', () => {
console.log('Hay datos disponibles');
});
Crear Readable Stream Personalizado
const { Readable } = require('stream');
// Ejemplo 1: Stream simple desde array
class ArrayStream extends Readable {
constructor(array) {
super();
this.array = array;
this.index = 0;
}
_read() {
if (this.index < this.array.length) {
const chunk = this.array[this.index];
this.push(chunk); // Enviar datos
this.index++;
} else {
this.push(null); // Señal de fin
}
}
}
const stream = new ArrayStream(['Hola', ' ', 'Mundo', '\n']);
stream.on('data', (chunk) => {
console.log('Recibido:', chunk);
});
stream.on('end', () => {
console.log('Fin del stream');
});
// Salida:
// Recibido: Hola
// Recibido:
// Recibido: Mundo
// Recibido: \n
// Fin del stream
// Ejemplo 2: Stream de números
class ContadorStream extends Readable {
constructor(limite) {
super({ objectMode: true }); // Para objetos en vez de buffers
this.limite = limite;
this.contador = 1;
}
_read() {
if (this.contador <= this.limite) {
this.push({ numero: this.contador });
this.contador++;
} else {
this.push(null);
}
}
}
const contador = new ContadorStream(5);
contador.on('data', (obj) => {
console.log('Número:', obj.numero);
});
// Salida:
// Número: 1
// Número: 2
// Número: 3
// Número: 4
// Número: 5
Control de Flujo
const fs = require('fs');
const stream = fs.createReadStream('archivo-grande.txt');
stream.on('data', (chunk) => {
console.log('Procesando chunk...');
// Pausar el stream si necesitas tiempo para procesar
stream.pause();
// Simular procesamiento pesado
setTimeout(() => {
console.log('Chunk procesado, continuando...');
stream.resume(); // Reanudar
}, 1000);
});
stream.on('end', () => {
console.log('Todos los chunks procesados');
});
Writable Streams
Los Writable Streams te permiten escribir datos a un destino.
Escribir en Streams
const fs = require('fs');
const stream = fs.createWriteStream('salida.txt');
// Escribir datos
stream.write('Primera línea\n');
stream.write('Segunda línea\n');
stream.write('Tercera línea\n');
// Finalizar el stream
stream.end('Última línea\n');
// O simplemente
// stream.end();
Eventos de Writable Streams
const fs = require('fs');
const stream = fs.createWriteStream('salida.txt');
// 1. 'drain' - El buffer interno está listo para más datos
stream.on('drain', () => {
console.log('Buffer liberado, puedo escribir más');
});
// 2. 'finish' - Todos los datos fueron escritos
stream.on('finish', () => {
console.log('Escritura completada');
});
// 3. 'error' - Error durante la escritura
stream.on('error', (error) => {
console.error('Error:', error);
});
// 4. 'close' - Stream cerrado
stream.on('close', () => {
console.log('Stream cerrado');
});
// 5. 'pipe' - Readable stream conectado vía pipe
stream.on('pipe', (src) => {
console.log('Algo se conectó vía pipe');
});
// 6. 'unpipe' - Readable stream desconectado
stream.on('unpipe', (src) => {
console.log('Algo se desconectó');
});
Backpressure (Contrapresión)
Uno de los conceptos más importantes en streams.
const fs = require('fs');
const stream = fs.createWriteStream('salida.txt');
// write() retorna false cuando el buffer está lleno
function escribirMillon() {
let i = 1000000;
function escribir() {
let ok = true;
while (i > 0 && ok) {
i--;
const datos = `Línea ${i}\n`;
// write() retorna false si el buffer está lleno
ok = stream.write(datos);
if (!ok) {
console.log('Buffer lleno, esperando drain...');
}
}
if (i > 0) {
// Esperar evento 'drain' antes de continuar
stream.once('drain', escribir);
} else {
stream.end();
}
}
escribir();
}
escribirMillon();
stream.on('finish', () => {
console.log('Escritura completada');
});
¿Qué es Backpressure?
- El stream tiene un buffer interno limitado
- Si escribes más rápido de lo que puede procesar, el buffer se llena
-
write()
retornafalse
cuando el buffer está lleno - Debes esperar el evento
'drain'
antes de continuar - Si ignoras esto, causas fugas de memoria
Crear Writable Stream Personalizado
const { Writable } = require('stream');
// Stream que convierte a mayúsculas y escribe en consola
class UpperCaseStream extends Writable {
_write(chunk, encoding, callback) {
try {
const texto = chunk.toString().toUpperCase();
console.log(texto);
callback(); // Indicar éxito
} catch (error) {
callback(error); // Indicar error
}
}
}
const stream = new UpperCaseStream();
stream.write('hola ');
stream.write('mundo');
stream.end();
// Salida:
// HOLA
// MUNDO
// Stream que acumula datos y los procesa al final
class AcumuladorStream extends Writable {
constructor(options) {
super(options);
this.datos = [];
}
_write(chunk, encoding, callback) {
this.datos.push(chunk);
callback();
}
_final(callback) {
// Llamado cuando stream.end()
const total = Buffer.concat(this.datos);
console.log('Total acumulado:', total.toString());
callback();
}
}
const acumulador = new AcumuladorStream();
acumulador.write('Parte 1 ');
acumulador.write('Parte 2 ');
acumulador.write('Parte 3');
acumulador.end();
// Salida (al final):
// Total acumulado: Parte 1 Parte 2 Parte 3
Duplex y Transform Streams
Duplex Streams
Duplex streams son tanto Readable como Writable.
const { Duplex } = require('stream');
class EchoStream extends Duplex {
constructor(options) {
super(options);
this.datos = [];
}
// Lado Writable
_write(chunk, encoding, callback) {
this.datos.push(chunk);
callback();
}
// Lado Readable
_read() {
if (this.datos.length > 0) {
const chunk = this.datos.shift();
this.push(chunk);
}
}
}
const echo = new EchoStream();
// Escribir datos
echo.write('Hola ');
echo.write('Mundo');
echo.end();
// Leer datos
echo.on('data', (chunk) => {
console.log('Eco:', chunk.toString());
});
// Salida:
// Eco: Hola
// Eco: Mundo
Transform Streams
Transform streams son Duplex streams que modifican datos mientras pasan.
const { Transform } = require('stream');
// Stream que convierte a mayúsculas
class UppercaseTransform extends Transform {
_transform(chunk, encoding, callback) {
// Transformar el chunk
const transformado = chunk.toString().toUpperCase();
// Enviar el dato transformado
this.push(transformado);
callback();
}
}
const upper = new UppercaseTransform();
upper.on('data', (chunk) => {
console.log(chunk.toString());
});
upper.write('hola ');
upper.write('mundo');
upper.end();
// Salida:
// HOLA
// MUNDO
Transform Streams Comunes
const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');
// 1. COMPRESIÓN
const gzip = zlib.createGzip();
fs.createReadStream('entrada.txt')
.pipe(gzip)
.pipe(fs.createWriteStream('entrada.txt.gz'));
// 2. DESCOMPRESIÓN
const gunzip = zlib.createGunzip();
fs.createReadStream('archivo.txt.gz')
.pipe(gunzip)
.pipe(fs.createWriteStream('archivo.txt'));
// 3. ENCRIPTACIÓN
const algorithm = 'aes-256-cbc';
const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv(algorithm, key, iv);
fs.createReadStream('secreto.txt')
.pipe(cipher)
.pipe(fs.createWriteStream('secreto.txt.enc'));
// 4. DESENCRIPTACIÓN
const decipher = crypto.createDecipheriv(algorithm, key, iv);
fs.createReadStream('secreto.txt.enc')
.pipe(decipher)
.pipe(fs.createWriteStream('secreto-recuperado.txt'));
Ejemplo Avanzado: Procesador de CSV
const { Transform } = require('stream');
const fs = require('fs');
// Transform que procesa líneas CSV
class CSVParser extends Transform {
constructor(options) {
super({ objectMode: true });
this.headers = null;
this.buffer = '';
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
// Mantener la última línea incompleta en el buffer
this.buffer = lines.pop();
for (const line of lines) {
if (!line.trim()) continue;
const values = line.split(',');
if (!this.headers) {
this.headers = values;
continue;
}
const obj = {};
for (let i = 0; i < this.headers.length; i++) {
obj[this.headers[i]] = values[i];
}
this.push(obj);
}
callback();
}
_flush(callback) {
// Procesar la última línea
if (this.buffer.trim()) {
const values = this.buffer.split(',');
const obj = {};
for (let i = 0; i < this.headers.length; i++) {
obj[this.headers[i]] = values[i];
}
this.push(obj);
}
callback();
}
}
// Uso del CSV Parser
const csvParser = new CSVParser();
fs.createReadStream('usuarios.csv')
.pipe(csvParser)
.on('data', (usuario) => {
console.log('Usuario:', usuario);
// { nombre: 'Juan', edad: '30', email: 'juan@example.com' }
})
.on('end', () => {
console.log('CSV procesado completamente');
});
Piping y Backpressure
¿Qué es Piping?
Piping es conectar la salida de un Readable stream con la entrada de un Writable stream.
const fs = require('fs');
// Copiar archivo usando streams
fs.createReadStream('entrada.txt')
.pipe(fs.createWriteStream('salida.txt'));
Analogía con Unix:
# En Unix/Linux
cat archivo.txt | grep "buscar" | sort > resultado.txt
# En Node.js
fs.createReadStream('archivo.txt')
.pipe(grepStream)
.pipe(sortStream)
.pipe(fs.createWriteStream('resultado.txt'));
Pipeline Complejo
const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');
// Pipeline: Leer → Encriptar → Comprimir → Escribir
fs.createReadStream('secreto.txt')
.pipe(crypto.createCipheriv('aes256', key, iv))
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('secreto.txt.enc.gz'))
.on('finish', () => {
console.log('Archivo procesado completamente');
})
.on('error', (err) => {
console.error('Error en el pipeline:', err);
});
stream.pipeline() - Manejo de Errores Mejorado
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
pipeline(
fs.createReadStream('entrada.txt'),
zlib.createGzip(),
fs.createWriteStream('salida.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline falló:', err);
} else {
console.log('Pipeline completado exitosamente');
}
}
);
// Con Promises (Node.js 15+)
const { pipeline } = require('stream/promises');
try {
await pipeline(
fs.createReadStream('entrada.txt'),
zlib.createGzip(),
fs.createWriteStream('salida.txt.gz')
);
console.log('Pipeline completado');
} catch (error) {
console.error('Pipeline falló:', error);
}
¿Qué es Backpressure?
Backpressure ocurre cuando un stream produce datos más rápido de lo que el siguiente stream puede consumirlos.
const fs = require('fs');
const readable = fs.createReadStream('archivo-grande.txt');
const writable = fs.createWriteStream('copia.txt');
readable.on('data', (chunk) => {
const ok = writable.write(chunk);
if (!ok) {
// El writable está sobrecargado
console.log('Backpressure detectado, pausando lectura...');
readable.pause();
// Esperar a que se libere
writable.once('drain', () => {
console.log('Buffer liberado, reanudando lectura...');
readable.resume();
});
}
});
¿Por qué es importante?
- Sin manejo de backpressure: fuga de memoria
- Los datos se acumulan en memoria sin límite
- La aplicación puede crashear por falta de memoria
Pipe Maneja Backpressure Automáticamente
// ✅ CORRECTO - pipe() maneja backpressure automáticamente
fs.createReadStream('archivo-grande.txt')
.pipe(fs.createWriteStream('copia.txt'));
// ❌ INCORRECTO - Sin manejo de backpressure
const readable = fs.createReadStream('archivo-grande.txt');
const writable = fs.createWriteStream('copia.txt');
readable.on('data', (chunk) => {
writable.write(chunk); // Puede causar fuga de memoria
});
Transform Stream con Backpressure
const { Transform } = require('stream');
class SlowProcessor extends Transform {
_transform(chunk, encoding, callback) {
// Simular procesamiento lento
setTimeout(() => {
const processed = chunk.toString().toUpperCase();
this.push(processed);
callback();
}, 100); // 100ms por chunk
}
}
const processor = new SlowProcessor();
fs.createReadStream('input.txt')
.pipe(processor) // Automáticamente maneja backpressure
.pipe(fs.createWriteStream('output.txt'));
Casos de Uso Reales
1. Servidor de Descarga de Archivos
const http = require('http');
const fs = require('fs');
const path = require('path');
const server = http.createServer((req, res) => {
if (req.method === 'GET' && req.url.startsWith('/download/')) {
const filename = path.basename(req.url);
const filepath = path.join(__dirname, 'files', filename);
// Verificar que el archivo existe
fs.access(filepath, fs.constants.F_OK, (err) => {
if (err) {
res.statusCode = 404;
res.end('Archivo no encontrado');
return;
}
// Obtener información del archivo
fs.stat(filepath, (err, stats) => {
if (err) {
res.statusCode = 500;
res.end('Error del servidor');
return;
}
// Headers de respuesta
res.setHeader('Content-Type', 'application/octet-stream');
res.setHeader('Content-Length', stats.size);
res.setHeader('Content-Disposition', `attachment; filename="${filename}"`);
// Stream el archivo directamente al cliente
const fileStream = fs.createReadStream(filepath);
fileStream.on('error', (err) => {
console.error('Error leyendo archivo:', err);
res.statusCode = 500;
res.end('Error leyendo archivo');
});
// Pipe automáticamente maneja backpressure
fileStream.pipe(res);
});
});
} else {
res.statusCode = 404;
res.end('Not Found');
}
});
server.listen(3000, () => {
console.log('Servidor de descarga en puerto 3000');
});
2. Procesador de Logs en Tiempo Real
const fs = require('fs');
const { Transform } = require('stream');
// Transform para parsear logs de Apache/Nginx
class LogParser extends Transform {
constructor() {
super({ objectMode: true });
}
_transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
for (const line of lines) {
if (!line.trim()) continue;
// Parsear línea de log (formato común de Apache)
const match = line.match(/^(\S+) \S+ \S+ \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+|-)/);
if (match) {
const logEntry = {
ip: match[1],
timestamp: new Date(match[2]),
method: match[3],
url: match[4],
protocol: match[5],
status: parseInt(match[6]),
size: match[7] === '-' ? 0 : parseInt(match[7])
};
this.push(logEntry);
}
}
callback();
}
}
// Transform para filtrar errores
class ErrorFilter extends Transform {
constructor() {
super({ objectMode: true });
}
_transform(logEntry, encoding, callback) {
if (logEntry.status >= 400) {
this.push(logEntry);
}
callback();
}
}
// Procesar logs en tiempo real
const logParser = new LogParser();
const errorFilter = new ErrorFilter();
fs.createReadStream('/var/log/apache2/access.log')
.pipe(logParser)
.pipe(errorFilter)
.on('data', (errorLog) => {
console.log(`ERROR ${errorLog.status}: ${errorLog.method} ${errorLog.url} desde ${errorLog.ip}`);
// Enviar alerta si es error crítico
if (errorLog.status >= 500) {
console.log('🚨 ALERTA: Error del servidor detectado!');
}
});
3. Compresión de Imágenes en Lote
const fs = require('fs');
const path = require('path');
const { Transform, pipeline } = require('stream');
const sharp = require('sharp'); // npm install sharp
class ImageProcessor extends Transform {
constructor(options = {}) {
super({ objectMode: true });
this.quality = options.quality || 80;
this.width = options.width || 800;
}
_transform(file, encoding, callback) {
const inputPath = file.path;
const outputPath = path.join(
path.dirname(inputPath),
'compressed',
path.basename(inputPath)
);
// Crear directorio si no existe
fs.mkdirSync(path.dirname(outputPath), { recursive: true });
sharp(inputPath)
.resize(this.width)
.jpeg({ quality: this.quality })
.toFile(outputPath)
.then(() => {
console.log(`Procesada: ${path.basename(inputPath)}`);
this.push({ input: inputPath, output: outputPath });
callback();
})
.catch(callback);
}
}
// Stream que lee archivos de un directorio
class DirectoryReader extends require('stream').Readable {
constructor(directory) {
super({ objectMode: true });
this.directory = directory;
this.files = [];
this.index = 0;
// Leer archivos de imagen
const files = fs.readdirSync(directory);
this.files = files
.filter(file => /\.(jpg|jpeg|png)$/i.test(file))
.map(file => ({ path: path.join(directory, file) }));
}
_read() {
if (this.index < this.files.length) {
this.push(this.files[this.index]);
this.index++;
} else {
this.push(null); // Fin del stream
}
}
}
// Usar el procesador
const imageDir = './images';
const reader = new DirectoryReader(imageDir);
const processor = new ImageProcessor({ quality: 70, width: 600 });
reader
.pipe(processor)
.on('data', (result) => {
console.log(`✅ Imagen comprimida: ${result.output}`);
})
.on('end', () => {
console.log('🎉 Todas las imágenes procesadas!');
})
.on('error', (err) => {
console.error('❌ Error procesando imágenes:', err);
});
4. API de Upload con Streams
const express = require('express');
const multer = require('multer');
const fs = require('fs');
const crypto = require('crypto');
const { pipeline } = require('stream');
const app = express();
// Configurar multer para manejar uploads
const upload = multer({ dest: 'temp/' });
app.post('/upload', upload.single('file'), async (req, res) => {
try {
const tempPath = req.file.path;
const hash = crypto.createHash('sha256');
const finalPath = `uploads/${Date.now()}-${req.file.originalname}`;
// Pipeline: Leer temp → Calcular hash → Escribir final
await pipeline(
fs.createReadStream(tempPath),
hash,
fs.createWriteStream(finalPath)
);
// Limpiar archivo temporal
fs.unlinkSync(tempPath);
res.json({
message: 'Archivo subido exitosamente',
filename: path.basename(finalPath),
hash: hash.digest('hex'),
size: req.file.size
});
} catch (error) {
console.error('Error en upload:', error);
res.status(500).json({ error: 'Error subiendo archivo' });
}
});
app.listen(3000, () => {
console.log('Servidor de upload en puerto 3000');
});
Streams vs Lectura Completa
Comparación de Memoria
const fs = require('fs');
// ❌ LECTURA COMPLETA - Usa mucha memoria
function procesarArchivoCompleto(filename) {
console.log('Memoria antes:', process.memoryUsage().heapUsed / 1024 / 1024, 'MB');
const data = fs.readFileSync(filename);
const lines = data.toString().split('\n');
let count = 0;
for (const line of lines) {
if (line.includes('ERROR')) count++;
}
console.log('Memoria después:', process.memoryUsage().heapUsed / 1024 / 1024, 'MB');
console.log('Errores encontrados:', count);
}
// ✅ STREAMS - Memoria constante
function procesarArchivoStream(filename) {
console.log('Memoria antes:', process.memoryUsage().heapUsed / 1024 / 1024, 'MB');
let count = 0;
let buffer = '';
const stream = fs.createReadStream(filename, { encoding: 'utf8' });
stream.on('data', (chunk) => {
buffer += chunk;
const lines = buffer.split('\n');
buffer = lines.pop(); // Mantener línea incompleta
for (const line of lines) {
if (line.includes('ERROR')) count++;
}
console.log('Memoria durante:', process.memoryUsage().heapUsed / 1024 / 1024, 'MB');
});
stream.on('end', () => {
// Procesar última línea
if (buffer && buffer.includes('ERROR')) count++;
console.log('Memoria final:', process.memoryUsage().heapUsed / 1024 / 1024, 'MB');
console.log('Errores encontrados:', count);
});
}
// Probar con archivo de 100MB
// procesarArchivoCompleto('log-grande.txt'); // ~100MB en memoria
// procesarArchivoStream('log-grande.txt'); // ~2MB en memoria
Comparación de Tiempo
const fs = require('fs');
// Medir tiempo de procesamiento
function medirTiempo(fn, nombre) {
const inicio = Date.now();
fn(() => {
const fin = Date.now();
console.log(`${nombre}: ${fin - inicio}ms`);
});
}
// Test con archivo de 50MB
medirTiempo((callback) => {
// Lectura completa
fs.readFile('archivo-50mb.txt', (err, data) => {
console.log('Archivo cargado en memoria');
callback();
});
}, 'Lectura completa');
medirTiempo((callback) => {
// Stream
const stream = fs.createReadStream('archivo-50mb.txt');
stream.on('data', (chunk) => {
// Procesar chunk inmediatamente
});
stream.on('end', callback);
}, 'Stream');
// Resultados típicos:
// Lectura completa: 2000ms (espera a cargar todo)
// Stream: 50ms (empieza inmediatamente)
Cuestionario de Entrevista
Preguntas Fundamentales
1. ¿Qué es un Buffer en Node.js y cuándo lo usarías?
Respuesta:
- Un Buffer es un espacio temporal en memoria para datos binarios
- Node.js usa Buffers para manejar datos que no son texto (imágenes, archivos, datos de red)
- Se usa cuando trabajas con: archivos binarios, operaciones criptográficas, protocolos de red, conversiones entre encodings
2. ¿Cuáles son los 4 tipos de Streams en Node.js?
Respuesta:
- Readable: Para leer datos (ej: fs.createReadStream)
- Writable: Para escribir datos (ej: fs.createWriteStream)
- Duplex: Para leer y escribir (ej: net.Socket)
- Transform: Para modificar datos mientras pasan (ej: zlib.createGzip)
3. ¿Qué es backpressure y por qué es importante?
Respuesta:
- Backpressure ocurre cuando un stream produce datos más rápido de lo que se pueden consumir
- Sin manejo adecuado causa fugas de memoria
-
pipe()
maneja backpressure automáticamente - Manualmente se maneja con eventos
drain
y pausando/reanudando streams
4. ¿Cuál es la diferencia entre Flowing Mode y Paused Mode en Readable Streams?
Respuesta:
-
Paused Mode: Debes pedir datos manualmente con
stream.read()
-
Flowing Mode: Los datos fluyen automáticamente vía evento
data
- Se cambia a Flowing Mode al agregar listener de
data
- Se puede pausar/reanudar con
pause()
yresume()
5. ¿Cuándo usarías Streams vs leer archivos completos?
Respuesta:
- Streams: Archivos grandes, memoria limitada, procesamiento en tiempo real, operaciones de red
- Lectura completa: Archivos pequeños, necesitas todos los datos a la vez, operaciones simples
Preguntas Avanzadas
6. ¿Cómo crearías un Transform Stream personalizado?
Respuesta:
const { Transform } = require('stream');
class MiTransform extends Transform {
_transform(chunk, encoding, callback) {
const transformado = chunk.toString().toUpperCase();
this.push(transformado);
callback();
}
}
7. ¿Qué hace stream.pipeline()
y por qué es mejor que pipe()
?
Respuesta:
-
pipeline()
maneja errores automáticamente en toda la cadena - Limpia recursos correctamente si hay errores
- Proporciona callback único para éxito/error
- Mejor para pipelines complejos
8. ¿Cómo manejarías un archivo de 10GB sin cargar todo en memoria?
Respuesta:
- Usar
fs.createReadStream()
con chunks pequeños - Procesar chunk por chunk
- Usar Transform streams para modificaciones
- Implementar backpressure si es necesario
9. ¿Qué es highWaterMark
en streams?
Respuesta:
- Tamaño del buffer interno del stream
- Controla cuántos datos se pueden almacenar antes de aplicar backpressure
- Se puede configurar al crear el stream
- Valor por defecto: 16KB para streams de datos, 16 para object mode
10. ¿Cómo implementarías un sistema de logs que rote archivos automáticamente?
Respuesta:
- Usar Writable stream personalizado
- Monitorear tamaño del archivo actual
- Crear nuevo archivo cuando se alcance el límite
- Cerrar stream anterior y abrir nuevo
- Considerar fecha/hora para rotación temporal
Ejercicios Prácticos
Ejercicio 1: Crear un Transform stream que cuente líneas y palabras en un archivo de texto.
Ejercicio 2: Implementar un sistema de backup que comprima y encripte archivos usando streams.
Ejercicio 3: Crear un servidor HTTP que sirva archivos grandes usando streams con soporte para ranges (partial content).
Ejercicio 4: Implementar un procesador de CSV que convierta archivos grandes a JSON usando streams.
Ejercicio 5: Crear un sistema de monitoreo que procese logs en tiempo real y envíe alertas.
Top comments (0)