DEV Community

criscarba
criscarba

Posted on

Ingestando en Near Real Time con Kinesis Data Firehose

Image description

Existen multiples alternativas de ingesta de datos en AWS, gracias a los diversos Servicios que se encuentran disponibles dentro de la consola, sin embargo en varias oportunidades resulta desafiante el decidir cual utilizar para resolver nuesta necesidad/caso de uso.

En esta publicación nos enfocaremos en resolver escenarios donde nuestro "productor" de datos genera multiples streams constantemente. Estos productores de datos podrian ser por ejemplo:

  • Dispositivos IOT
  • Servidores de aplicaciones que generen logs
  • VPC Flow Logs
  • Telemetría en vehiculos

Para estos casos de usos utilizaremos Amazon Kinesis Data Firehose, el cual es un servicio de ETL que captura, transforma y permite ingestar datos de streaming en Datalakes y/o otros servicios que integran nativamente, como por ej. Open Search o Redshift.

Image description

Una de las principales particularidades de Amazon Kinesis Data Firehose, es que permite la ingesta de datos de manera Near Real Time. Esto significa que los streams de datos que hayan sido ingresados al Delivery Stream no seran entregados en su Output hasta que se cumplan una de las siguientes dos premisas:

  • Buffer por Tiempo (Minimo: 60 segundos | Máximo: 900 segundos)

  • Buffer por Size (Minimo: 1 mb | Máximo: 128 mb)

Cualquiera de las 2 que ocurra primero, se realizará el dump de los datos que contenga el Delivery Stream, en el output que se haya configurado.

A modo de ejemplo, implementaremos en la consola de AWS la siguiente arquitectura:

Image description

Donde simularemos un dispositivo de IOT generando streams que serán enviados al Delivery Stream. Al cabo de 1 minuto, los datos serán escritos en S3.

A Continuación listaré los pasos a realizar:

  1. Abrir a la Consola de AWS

Image description

  1. En el buscador de Servicios, ingresar S3

Image description

  1. Crearemos un Bucket que será el target de nuestro Delivery Stream.

NOTA: El nombre del Bucket debe ser único. No intentar crear el mismo que la imagen.

Image description

  1. En el buscador de servicios, ingresar Kinesis Data Firehose

Image description

  1. Seleccionaremos Create Delivery Stream

Image description

Image description

Seleccionamos el Bucket target y le seteamos el buffer minimo

Image description

Tardará unos minutos en crearse el Delivery Stream.

  1. Para poder simular la ingesta de streams, he desarrollado el siguiente script que simula la carga de datos (random) con una estructura fija en el Delivery Stream:
import names
from random import randint
import boto3
import uuid
import time
import json

DeliveryStreamName = 'demo-kdf'
session_dev = boto3.session.Session(profile_name='default')
firehose = session_dev.client('firehose', region_name='us-east-1')


cnt_streams = 10
for i in range(cnt_streams):   
    record = {
      'id': i,
      'name': names.get_first_name(),
      'surname': names.get_last_name(),
      'age': randint(18,80)
    }

    print(record)

    response = firehose.put_record(DeliveryStreamName = DeliveryStreamName,
                                   Record = {'Data': json.dumps(record)})

    time.sleep(.1)
Enter fullscreen mode Exit fullscreen mode

Al ejecutarlo se visualizarán por terminal los streams de datos que fueron enviados al delivery stream

Image description

  1. Luego de unos minutos, al verificar en el bucket de S3 (Output) Encontraremos el dump de datos

Image description

El contenido puede visualizarse con S3 Select

Image description

Muchas Gracias!!
Cristian R. Carballo LinkedIn
https://www.linkedin.com/in/cristianrcarballo/

Top comments (0)