Un cliente de logística me pidió convertir su dashboard de flota de pooling cada 10 segundos a tiempo real. Tenían 40 operadores mirando 800 vehículos simultáneos. Su factura de API Gateway REST estaba en 1200 USD al mes solo por el polling. Migré a WebSockets con API Gateway WS y DynamoDB Streams. Factura bajó a 180 USD y los updates llegan en menos de 200ms.
Comparto la arquitectura completa.
Arquitectura
flowchart LR
V[Vehicles<br/>GPS pings] --> IoT[AWS IoT Core]
IoT --> DDB[(DynamoDB<br/>Vehicle state)]
DDB --> Stream[DynamoDB Stream]
Stream --> Fanout[Lambda Fanout]
Fanout --> APIGW[API Gateway<br/>WebSocket API]
APIGW --> Browsers[React Dashboards]
Fanout --> Conns[(DynamoDB<br/>Connections)]
El flujo tiene dos partes. Los vehículos envían GPS a IoT Core, IoT escribe en DynamoDB. Cuando cambia un registro, DynamoDB Stream dispara una Lambda que decide a quién notificar. Esa Lambda mira una tabla de connections (quién está conectado, qué filtros tiene) y manda el update por WebSocket.
Setup del WebSocket API
No hay construct de alto nivel en CDK para WebSocket API. Hay que usar los L1:
// cdk/websocket-stack.ts
import * as cdk from 'aws-cdk-lib';
import * as apigwv2 from 'aws-cdk-lib/aws-apigatewayv2';
import * as integrations from 'aws-cdk-lib/aws-apigatewayv2-integrations';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import { Construct } from 'constructs';
export class WebsocketStack extends cdk.Stack {
constructor(scope: Construct, id: string, props: cdk.StackProps) {
super(scope, id, props);
const connectionsTable = new dynamodb.Table(this, 'Connections', {
partitionKey: { name: 'connectionId', type: dynamodb.AttributeType.STRING },
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
timeToLiveAttribute: 'ttl',
});
connectionsTable.addGlobalSecondaryIndex({
indexName: 'userId-index',
partitionKey: { name: 'userId', type: dynamodb.AttributeType.STRING },
});
const vehiclesTable = new dynamodb.Table(this, 'Vehicles', {
partitionKey: { name: 'vehicleId', type: dynamodb.AttributeType.STRING },
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
});
const connectFn = new lambda.Function(this, 'ConnectFn', {
runtime: lambda.Runtime.NODEJS_20_X,
handler: 'connect.handler',
code: lambda.Code.fromAsset('../lambdas/connect'),
environment: {
CONNECTIONS_TABLE: connectionsTable.tableName,
},
});
connectionsTable.grantWriteData(connectFn);
const disconnectFn = new lambda.Function(this, 'DisconnectFn', {
runtime: lambda.Runtime.NODEJS_20_X,
handler: 'disconnect.handler',
code: lambda.Code.fromAsset('../lambdas/disconnect'),
environment: {
CONNECTIONS_TABLE: connectionsTable.tableName,
},
});
connectionsTable.grantWriteData(disconnectFn);
const subscribeFn = new lambda.Function(this, 'SubscribeFn', {
runtime: lambda.Runtime.NODEJS_20_X,
handler: 'subscribe.handler',
code: lambda.Code.fromAsset('../lambdas/subscribe'),
environment: {
CONNECTIONS_TABLE: connectionsTable.tableName,
},
});
connectionsTable.grantReadWriteData(subscribeFn);
const wsApi = new apigwv2.WebSocketApi(this, 'FleetWsApi', {
connectRouteOptions: {
integration: new integrations.WebSocketLambdaIntegration(
'ConnectIntegration',
connectFn
),
},
disconnectRouteOptions: {
integration: new integrations.WebSocketLambdaIntegration(
'DisconnectIntegration',
disconnectFn
),
},
});
wsApi.addRoute('subscribe', {
integration: new integrations.WebSocketLambdaIntegration(
'SubscribeIntegration',
subscribeFn
),
});
const wsStage = new apigwv2.WebSocketStage(this, 'FleetStage', {
webSocketApi: wsApi,
stageName: 'prod',
autoDeploy: true,
});
const fanoutFn = new lambda.Function(this, 'FanoutFn', {
runtime: lambda.Runtime.NODEJS_20_X,
handler: 'fanout.handler',
code: lambda.Code.fromAsset('../lambdas/fanout'),
environment: {
CONNECTIONS_TABLE: connectionsTable.tableName,
WS_ENDPOINT: `${wsApi.apiEndpoint}/${wsStage.stageName}`,
},
timeout: cdk.Duration.seconds(30),
});
connectionsTable.grantReadData(fanoutFn);
wsApi.grantManageConnections(fanoutFn);
fanoutFn.addEventSource(
new cdk.aws_lambda_event_sources.DynamoEventSource(vehiclesTable, {
startingPosition: lambda.StartingPosition.LATEST,
batchSize: 100,
retryAttempts: 3,
})
);
}
}
Lambda de conexión
// lambdas/connect/connect.ts
import { APIGatewayProxyWebsocketEventV2 } from 'aws-lambda';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, PutCommand } from '@aws-sdk/lib-dynamodb';
import jwt from 'jsonwebtoken';
import jwksClient from 'jwks-rsa';
const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));
const jwks = jwksClient({
jwksUri: `https://cognito-idp.us-east-1.amazonaws.com/${process.env.USER_POOL_ID}/.well-known/jwks.json`,
});
function getKey(header: jwt.JwtHeader, callback: jwt.SigningKeyCallback) {
jwks.getSigningKey(header.kid, (err, key) => {
if (err) return callback(err);
callback(null, key!.getPublicKey());
});
}
function verifyToken(token: string): Promise<jwt.JwtPayload> {
return new Promise((resolve, reject) => {
jwt.verify(token, getKey, { algorithms: ['RS256'] }, (err, decoded) => {
if (err) reject(err);
else resolve(decoded as jwt.JwtPayload);
});
});
}
export const handler = async (event: APIGatewayProxyWebsocketEventV2) => {
const connectionId = event.requestContext.connectionId!;
const token = event.queryStringParameters?.token;
if (!token) {
return { statusCode: 401, body: 'Unauthorized' };
}
try {
const payload = await verifyToken(token);
const userId = payload.sub as string;
const tenantId = payload['custom:tenantId'] as string;
await ddb.send(
new PutCommand({
TableName: process.env.CONNECTIONS_TABLE!,
Item: {
connectionId,
userId,
tenantId,
subscribedVehicles: [],
connectedAt: Date.now(),
ttl: Math.floor(Date.now() / 1000) + 7200,
},
})
);
return { statusCode: 200, body: 'Connected' };
} catch (err) {
console.error('Auth failed:', err);
return { statusCode: 401, body: 'Invalid token' };
}
};
El token viene como query string porque WebSockets no soportan headers custom en el handshake. Es un pattern común, documentado y aceptado. El TTL de 2 horas es un cleanup: si algo sale mal y no capturamos el disconnect, DynamoDB borra la entry sola.
Lambda de suscripción
El cliente elige qué vehículos quiere recibir. No le mandamos todos los eventos a todos:
// lambdas/subscribe/subscribe.ts
import { APIGatewayProxyWebsocketEventV2 } from 'aws-lambda';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, UpdateCommand } from '@aws-sdk/lib-dynamodb';
const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));
interface SubscribeMessage {
action: 'subscribe';
vehicleIds: string[];
}
export const handler = async (event: APIGatewayProxyWebsocketEventV2) => {
const connectionId = event.requestContext.connectionId!;
const body = JSON.parse(event.body || '{}') as SubscribeMessage;
if (!Array.isArray(body.vehicleIds) || body.vehicleIds.length > 200) {
return { statusCode: 400, body: 'Invalid subscription' };
}
await ddb.send(
new UpdateCommand({
TableName: process.env.CONNECTIONS_TABLE!,
Key: { connectionId },
UpdateExpression: 'SET subscribedVehicles = :v',
ExpressionAttributeValues: {
':v': body.vehicleIds,
},
})
);
return { statusCode: 200, body: JSON.stringify({ subscribed: body.vehicleIds.length }) };
};
El límite de 200 vehículos por conexión es arbitrario pero necesario. Sin límite, un cliente malicioso puede suscribirse a toda la flota y DoS'ear al fanout.
Lambda de fanout
Esta es la pieza con más lógica. Lee el stream de DynamoDB, decide qué conexiones deben recibir qué update, y las envía en paralelo:
// lambdas/fanout/fanout.ts
import { DynamoDBStreamEvent } from 'aws-lambda';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, ScanCommand } from '@aws-sdk/lib-dynamodb';
import {
ApiGatewayManagementApiClient,
PostToConnectionCommand,
GoneException,
} from '@aws-sdk/client-apigatewaymanagementapi';
import { unmarshall } from '@aws-sdk/util-dynamodb';
const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));
const apigw = new ApiGatewayManagementApiClient({
endpoint: process.env.WS_ENDPOINT,
});
interface Connection {
connectionId: string;
userId: string;
tenantId: string;
subscribedVehicles: string[];
}
async function getActiveConnections(): Promise<Connection[]> {
const result = await ddb.send(
new ScanCommand({
TableName: process.env.CONNECTIONS_TABLE!,
})
);
return (result.Items || []) as Connection[];
}
async function sendToConnection(
connectionId: string,
payload: unknown
): Promise<boolean> {
try {
await apigw.send(
new PostToConnectionCommand({
ConnectionId: connectionId,
Data: Buffer.from(JSON.stringify(payload)),
})
);
return true;
} catch (err) {
if (err instanceof GoneException) {
console.log(`Connection ${connectionId} is gone, should cleanup`);
return false;
}
throw err;
}
}
export const handler = async (event: DynamoDBStreamEvent) => {
const connections = await getActiveConnections();
const updates: { vehicleId: string; data: any; tenantId: string }[] = [];
for (const record of event.Records) {
if (record.eventName !== 'MODIFY' && record.eventName !== 'INSERT') continue;
const newImage = record.dynamodb?.NewImage;
if (!newImage) continue;
const item = unmarshall(newImage as any);
updates.push({
vehicleId: item.vehicleId,
data: item,
tenantId: item.tenantId,
});
}
const sends: Promise<boolean>[] = [];
for (const update of updates) {
const interested = connections.filter(
(conn) =>
conn.tenantId === update.tenantId &&
conn.subscribedVehicles.includes(update.vehicleId)
);
for (const conn of interested) {
sends.push(
sendToConnection(conn.connectionId, {
type: 'vehicle.update',
vehicleId: update.vehicleId,
data: update.data,
})
);
}
}
const results = await Promise.allSettled(sends);
const failed = results.filter((r) => r.status === 'rejected').length;
console.log(`Sent ${sends.length} messages, ${failed} failed`);
};
El Scan de connections no escala a cientos de miles de conexiones. Para volumen alto, uso el GSI userId-index con queries específicos o muevo a un cache en ElastiCache. En este cliente tenemos 40 conexiones simultáneas, el scan está OK.
Cliente React
// hooks/useVehicleStream.ts
import { useEffect, useRef, useState, useCallback } from 'react';
import { fetchAuthSession } from 'aws-amplify/auth';
interface VehicleState {
vehicleId: string;
lat: number;
lng: number;
speed: number;
updatedAt: number;
}
export function useVehicleStream(vehicleIds: string[]) {
const [vehicles, setVehicles] = useState<Map<string, VehicleState>>(new Map());
const [status, setStatus] = useState<'disconnected' | 'connecting' | 'connected'>(
'disconnected'
);
const wsRef = useRef<WebSocket | null>(null);
const reconnectTimeoutRef = useRef<number | null>(null);
const reconnectAttemptsRef = useRef(0);
const connect = useCallback(async () => {
setStatus('connecting');
const session = await fetchAuthSession();
const token = session.tokens?.idToken?.toString();
const wsUrl = `wss://api.fleet.example.com/prod?token=${token}`;
const ws = new WebSocket(wsUrl);
ws.onopen = () => {
setStatus('connected');
reconnectAttemptsRef.current = 0;
ws.send(
JSON.stringify({
action: 'subscribe',
vehicleIds,
})
);
};
ws.onmessage = (event) => {
try {
const msg = JSON.parse(event.data);
if (msg.type === 'vehicle.update') {
setVehicles((prev) => {
const next = new Map(prev);
next.set(msg.vehicleId, msg.data);
return next;
});
}
} catch (err) {
console.error('Bad message:', err);
}
};
ws.onerror = (err) => {
console.error('WebSocket error:', err);
};
ws.onclose = () => {
setStatus('disconnected');
wsRef.current = null;
const backoff = Math.min(30000, 1000 * Math.pow(2, reconnectAttemptsRef.current));
reconnectAttemptsRef.current++;
reconnectTimeoutRef.current = window.setTimeout(connect, backoff);
};
wsRef.current = ws;
}, [vehicleIds]);
useEffect(() => {
connect();
return () => {
if (reconnectTimeoutRef.current) {
clearTimeout(reconnectTimeoutRef.current);
}
wsRef.current?.close();
};
}, [connect]);
useEffect(() => {
if (status === 'connected' && wsRef.current) {
wsRef.current.send(
JSON.stringify({
action: 'subscribe',
vehicleIds,
})
);
}
}, [vehicleIds, status]);
return { vehicles, status };
}
Exponential backoff para reconexiones, resuscripción automática cuando cambian los vehicleIds, cleanup del timeout en unmount. Detalles que se olvidan fácil y te muerden en producción.
Comparativa de costos
Para 40 usuarios conectados 8h al día, 800 vehículos actualizando cada 5 segundos:
| Arquitectura | Costo mensual | Latencia | Complejidad |
|---|---|---|---|
| REST polling 10s | 1200 USD | 10s | Baja |
| Pusher managed | 350 USD | 300ms | Baja |
| Socket.io + EC2 | 200 USD | 200ms | Alta |
| API GW WebSockets | 180 USD | 200ms | Media |
| AppSync subscriptions | 250 USD | 200ms | Media |
API Gateway WS cobra 1 USD por millón de mensajes + 0.25 USD por millón de minutos de conexión. Para este tráfico sale muy barato.
Lo que aprendí
1. DynamoDB Streams tiene 2 shards por tabla.
Escalan automáticamente pero el startup es lento. Si tu tabla no ha recibido tráfico en horas, el primer batch tarda más. En horas pico va perfecto.
2. El límite de 128KB por mensaje WebSocket me cacheteó.
Un update de vehículo con su historial de rutas pasaba el límite. Partí el payload en mensajes separados. Regla: mensajes de WebSocket deben ser pequeños, el estado se hidrata por REST y luego solo mandas deltas.
3. API Gateway WS tiene idle timeout de 10 minutos.
Si no hay tráfico en 10 minutos, la conexión se cierra. Implementé heartbeat desde el cliente cada 5 minutos con un {action: 'ping'}. La ruta ping retorna 200 sin hacer nada. Problema resuelto.
4. GoneException significa conexión muerta, hay que limpiar.
Cuando un cliente cierra el browser sin disconnect limpio (mata el proceso, cierra el laptop), la conexión queda zombie en mi tabla. Al intentar enviarle, API Gateway tira GoneException. La Lambda de fanout debería borrar esas entries. Lo hago en un segundo Lambda con SQS para no bloquear el fanout.
5. No pongas lógica pesada en el $connect.
Si $connect tarda más de 30 segundos (timeout de API Gateway WS), la conexión falla. Validé JWT, escribí a DynamoDB, y en casos raros tardaba 35s. Moví la validación de permisos a una Lambda separada llamada en el primer mensaje subscribe.
Cuándo NO usar WebSockets
Si tu update rate es menor a 1 por minuto, polling con HTTP es más simple y barato. WebSockets tiene overhead de mantener conexión.
Si tu frontend es público y masivo (miles de conexiones), considera AppSync con subscriptions (filtros server-side nativos) o incluso EventBridge + SSE (Server-Sent Events) que escalan mejor con CloudFront.
Si tus clientes están detrás de proxies corporativos restrictivos, WebSockets a veces se bloquean. SSE con fallback a long-polling es más resiliente aunque menos eficiente.
El próximo artículo cubre optimización de imágenes on-demand con Lambda y Sharp. Muestro cómo reemplazar Cloudinary con AWS y pagar 50 USD en lugar de 600.
Top comments (0)