Existen multiples alternativas de ingesta de datos en AWS, gracias a los diversos Servicios que se encuentran disponibles dentro de la consola, sin embargo en varias oportunidades resulta desafiante el decidir cual utilizar para resolver nuesta necesidad/caso de uso.
En esta publicación nos enfocaremos en resolver escenarios donde nuestro "productor" de datos genera multiples streams constantemente. Estos productores de datos podrian ser por ejemplo:
- Dispositivos IOT
- Servidores de aplicaciones que generen logs
- VPC Flow Logs
- Telemetría en vehiculos
Para estos casos de usos utilizaremos Amazon Kinesis Data Firehose, el cual es un servicio de ETL que captura, transforma y permite ingestar datos de streaming en Datalakes y/o otros servicios que integran nativamente, como por ej. Open Search o Redshift.
Una de las principales particularidades de Amazon Kinesis Data Firehose, es que permite la ingesta de datos de manera Near Real Time. Esto significa que los streams de datos que hayan sido ingresados al Delivery Stream no seran entregados en su Output hasta que se cumplan una de las siguientes dos premisas:
Buffer por Tiempo (Minimo: 60 segundos | Máximo: 900 segundos)
Buffer por Size (Minimo: 1 mb | Máximo: 128 mb)
Cualquiera de las 2 que ocurra primero, se realizará el dump de los datos que contenga el Delivery Stream, en el output que se haya configurado.
A modo de ejemplo, implementaremos en la consola de AWS la siguiente arquitectura:
Donde simularemos un dispositivo de IOT generando streams que serán enviados al Delivery Stream. Al cabo de 1 minuto, los datos serán escritos en S3.
A Continuación listaré los pasos a realizar:
- Abrir a la Consola de AWS
- En el buscador de Servicios, ingresar S3
- Crearemos un Bucket que será el target de nuestro Delivery Stream.
NOTA: El nombre del Bucket debe ser único. No intentar crear el mismo que la imagen.
- En el buscador de servicios, ingresar Kinesis Data Firehose
- Seleccionaremos Create Delivery Stream
Seleccionamos el Bucket target y le seteamos el buffer minimo
Tardará unos minutos en crearse el Delivery Stream.
- Para poder simular la ingesta de streams, he desarrollado el siguiente script que simula la carga de datos (random) con una estructura fija en el Delivery Stream:
import names
from random import randint
import boto3
import uuid
import time
import json
DeliveryStreamName = 'demo-kdf'
session_dev = boto3.session.Session(profile_name='default')
firehose = session_dev.client('firehose', region_name='us-east-1')
cnt_streams = 10
for i in range(cnt_streams):
record = {
'id': i,
'name': names.get_first_name(),
'surname': names.get_last_name(),
'age': randint(18,80)
}
print(record)
response = firehose.put_record(DeliveryStreamName = DeliveryStreamName,
Record = {'Data': json.dumps(record)})
time.sleep(.1)
Al ejecutarlo se visualizarán por terminal los streams de datos que fueron enviados al delivery stream
- Luego de unos minutos, al verificar en el bucket de S3 (Output) Encontraremos el dump de datos
El contenido puede visualizarse con S3 Select
Muchas Gracias!!
Cristian R. Carballo LinkedIn
https://www.linkedin.com/in/cristianrcarballo/
Top comments (0)