E aí, pessoal! Estou super animado para compartilhar minha experiência construindo uma arquitetura Data Vault usando tecnologias modernas de Big Data. Se você, assim como eu, está querendo entender como implementar Data Vault na prática (e não apenas na teoria), este artigo é para você! Vamos mergulhar nesse projeto incrível que combina Apache Spark, Delta Lake, Minio e Docker. É coloquei o minIO para deixar diferenciado a coisa e como foi difícil configurar no jupyter notebook para o spark session afff, mas deu bom 😎
O que vamos explorar?
- Introdução ao Data Vault e por que ele é tão legal
- Nosso ambiente Docker: como montamos tudo!
- Análise detalhada do código: cada célula do notebook explicada
- As vantagens do Data Vault sobre outras modelagens
- Por que Docker Compose faz toda a diferença nesse projeto
Bora lá?
Introdução: Data Vault e o Problema que Resolvemos
Antes de mais nada: o que é Data Vault? É uma metodologia de modelagem que traz flexibilidade, rastreabilidade e auditoria para nossos dados. Diferente das modelagens tradicionais, o Data Vault é desenhado para lidar com mudanças constantes nos requisitos de negócio - algo super comum no mundo real!
Para este projeto, escolhi o conjunto de dados de e-commerce da Olist, que contém várias entidades perfeitas para modelagem Data Vault:
- 🛒 Pedidos
- 👥 Clientes
- 📦 Produtos
- 🏪 Vendedores
A beleza do Data Vault está em seus três componentes principais:
- Hubs: as entidades centrais de negócio
- Links: os relacionamentos entre entidades
- Satellites: os atributos descritivos que mudam com o tempo
Mas chega de teoria! Vamos ver como implementei isso na prática!
Nosso Ambiente com Docker Compose: A Base de Tudo!
Uma das partes mais legais desse projeto é como configuramos tudo usando Docker Compose. Olha só o que temos:
version: '3.8'
services:
spark-master:
image: bitnami/spark:3.4.1
# Configurações...
spark-worker-1:
image: bitnami/spark:3.4.1
# Configurações...
spark-worker-2:
image: bitnami/spark:3.4.1
# Configurações...
jupyter:
build:
context: .
dockerfile: jupyter/Dockerfile
# Configurações...
minio:
image: minio/minio:latest
# Configurações...
Isso é incrível porque com UM ÚNICO COMANDO (docker-compose up
), temos:
- Um cluster Spark com 2 workers!
- Um servidor Jupyter para coding interativo!
- Um storage Minio compatível com S3!
Quem já tentou configurar um ambiente Spark manualmente sabe o quanto isso facilita nossa vida. Zero dor de cabeça com configurações, versões conflitantes ou dependências! E o melhor: é tudo reproduzível em qualquer máquina!
O Notebook em Ação: Célula por Célula!
Agora vou te mostrar o que acontece em cada parte do notebook. É aqui que a mágica acontece!
Célula 1: Configuração do Spark
import os
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.sql.functions as F
# Configurar a sessão Spark com Delta Lake
spark = SparkSession.builder \
.appName("DataVaultModeling") \
.master("spark://spark-master:7077") \
.config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0,org.apache.hadoop:hadoop-aws:3.3.4") \
# Outras configurações...
.getOrCreate()
O que acontece: Esta célula inicializa nossa sessão Spark, conectando ao cluster que configuramos via Docker. Estamos habilitando o Delta Lake (para transações ACID) e configurando a integração com o Minio (nosso S3 local).
Resultado:
Versão do Apache Spark: 3.4.1
💡 Dica: As configurações S3A são essenciais para que o Spark consiga ler/escrever no Minio!
Célula 2: Carregamento dos Dados
Aqui usamos o código que desenvolvemos no minio_integration.py
para carregar os dados do e-commerce. O sistema tenta primeiro ler do Minio e, se falhar, lê do sistema de arquivos local.
O que acontece: O código busca os arquivos CSV no bucket "data-vault-raw" do Minio e os carrega como DataFrames Spark.
Resultado:
Tentando ler dados do Minio...
Leitura bem-sucedida! Encontradas 99441 linhas na tabela de clientes.
Amostra de dados do Minio:
+--------------------+--------------------+----------------------+-------------+-------------+
| customer_id| customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+--------------------+--------------------+----------------------+-------------+-------------+
|00012a2ce6f8f4a1...|861eff4711a542e4...| 14409| franca| SP|
|00042b26cf59d7ce...|290c77bc529b7ac6...| 9790| sao bernardo do campo| SP|
|000737768c5c7ef6...|5b78401a70e0d2a0...| 2116| sao paulo| SP|
+--------------------+--------------------+----------------------+-------------+-------------+
Uau! Já temos nossos dados prontos para modelagem! 🎉
Célula 3-4: Funções Data Vault
Aqui criamos funções auxiliares super importantes para nosso modelo:
# Função para gerar hash keys para as entidades
def generate_hash_key(df, columns, key_name):
columns_concat = F.concat_ws("|", *[F.col(c) for c in columns])
return df.withColumn(key_name, F.sha2(columns_concat, 256))
# Função para adicionar metadados padrão do Data Vault
def add_dv_metadata(df):
return df.withColumn("load_date", F.current_timestamp()) \
.withColumn("record_source", F.lit("OLIST_DATASET"))
O que acontece: Essas funções criam hash keys (essenciais no Data Vault) e adicionam metadados de auditoria.
Célula 5: Criação dos Hubs
# Hub_Customer
hub_customer = customers_df.select("customer_id").distinct()
hub_customer = generate_hash_key(hub_customer, ["customer_id"], "hub_customer_key")
hub_customer = add_dv_metadata(hub_customer)
# Salvar Hub_Customer como Delta
hub_customer_path = f"{delta_base_path}/hub_customer"
hub_customer.write.format("delta").mode("overwrite").save(hub_customer_path)
# Outros Hubs...
O que acontece: Criamos os Hubs para as entidades principais (Clientes, Pedidos, Produtos, Vendedores). Cada Hub contém apenas a chave de negócio, o hash e metadados.
Resultado:
Hub_Customer:
+--------------------+--------------------+-------------------+--------------------+
| customer_id| hub_customer_key| load_date| record_source|
+--------------------+--------------------+-------------------+--------------------+
|0000366f3b9a7992...|5cb99561c5f59605...|2023-01-20 15:32:45| OLIST_DATASET|
|0000b849f3a81e6f...|a67696c6b4dc5c48...|2023-01-20 15:32:45| OLIST_DATASET|
+--------------------+--------------------+-------------------+--------------------+
É tão satisfatório ver os Hubs criados! Cada linha representa uma entidade de negócio única!
Célula 6: Criação dos Links
Nesta célula, criamos as tabelas Link que conectam os Hubs:
# Link_Customer_Order
customer_order_df = orders_df.select("order_id", "customer_id").distinct()
# Juntar com os Hubs para obter as chaves
customer_order_link = customer_order_df.join(
spark.read.format("delta").load(hub_customer_path),
on="customer_id"
).join(
spark.read.format("delta").load(hub_order_path),
on="order_id"
)
# Gerar a chave composta do link
customer_order_link = generate_hash_key(
customer_order_link,
["hub_customer_key", "hub_order_key"],
"link_customer_order_key"
)
# ... outros Links
O que acontece: Criamos os Links entre entidades, capturando como elas se relacionam. Cada Link tem referências para os Hubs que conecta.
Resultado:
Link_Customer_Order:
+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+
|link_customer_ord...| hub_customer_key| hub_order_key| customer_id| order_id| load_date| record_source|
+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+
|27bb99bf9f79f76d...|84a841d555c4660d...|aa02a72d2d138d2f...|15c2d37a385128a7...|c565b5a0e6cb6a57...|2023-01-20 15:33:12| OLIST_DATASET|
|31c0eee2a1e5c0c6...|b7a8e89a41c43225...|1be932a1f5ffb685...|9ef43358304b2565...|b4c3ab31defc34ae...|2023-01-20 15:33:12| OLIST_DATASET|
+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+
Impressionante como os relacionamentos ficam claros, não é?
Célula 7: Criação dos Satellites
# Sat_Customer_Details
customer_details = customers_df
customer_details = customer_details.join(
spark.read.format("delta").load(hub_customer_path),
on="customer_id"
)
# Gerar hashkey para os atributos descritivos
attribute_columns = [
"customer_unique_id", "customer_zip_code_prefix",
"customer_city", "customer_state"
]
customer_details = generate_hash_key(
customer_details,
attribute_columns,
"hashdiff"
)
# ... outros Satellites
O que acontece: Criamos os Satellites que contêm os atributos descritivos de cada entidade. O "hashdiff" permite detectar mudanças nos atributos.
Resultado:
Sat_Customer_Details:
+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+-------------+-------------------+--------------------+
| hub_customer_key| hashdiff| customer_id| customer_unique_id|customer_zip_code_prefix|customer_city|customer_state| load_date| record_source|
+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+-------------+-------------------+--------------------+
|5cb99561c5f59605...|7c2fd0331dfd42b5...|0000366f3b9a7992...|861eff4711a542e4...| 14409| franca| SP|2023-01-20 15:33:45| OLIST_DATASET|
|a67696c6b4dc5c48...|e8c4a13c9bed07f8...|0000b849f3a81e6f...|290c77bc529b7ac6...| 9790| sao bernardo do campo| SP|2023-01-20 15:33:45| OLIST_DATASET|
+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+-------------+-------------------+--------------------+
Agora temos todos os detalhes armazenados de forma organizada e historicizada!
Célula 8: Consultando o Modelo Data Vault
# Exemplo 1: Contagem de pedidos por status
spark.read.format("delta").load(sat_order_details_path) \
.groupBy("order_status") \
.count() \
.orderBy(F.desc("count")) \
.show()
# Exemplo 3: Consulta de Business Vault
# Construir a consulta Business Vault
business_vault_query = hub_order.join(
sat_order,
on="hub_order_key"
).join(
link_customer_order,
on="hub_order_key"
).join(
hub_customer,
on="hub_customer_key"
).join(
sat_customer,
on="hub_customer_key"
)
O que acontece: Demonstramos como consultar o modelo Data Vault para obter insights de negócio.
Saída esperada:
Contagem de pedidos por status:
+-------------+-----+
| order_status|count|
+-------------+-----+
| delivered|96478|
| canceled| 1903|
| shipped| 753|
| approved| 307|
|unavailable| 109|
+-------------+-----+
É incrível como podemos facilmente extrair informações valiosas do nosso modelo!
Célula 9: Demonstração de Histórico
# Mudar o status para 'delivered'
updated_orders = orders_to_update.withColumn("order_status", F.lit("delivered"))
# Recalcular o hashdiff para detectar a mudança
updated_orders = generate_hash_key(
updated_orders,
attribute_columns,
"hashdiff"
)
# Usar a operação MERGE do Delta Lake para adicionar os novos registros
deltaTable.alias("target").merge(
updated_orders.alias("updates"),
"target.hub_order_key = updates.hub_order_key AND target.hashdiff != updates.hashdiff"
).whenNotMatchedInsertAll().execute()
O que acontece: Simulamos uma atualização no status dos pedidos e demonstramos como o Delta Lake e o Data Vault trabalham juntos para preservar o histórico.
Resultado:
Total de registros após atualização: 100560
+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+
| hub_order_key| hashdiff| order_id| order_status|order_purchase_t...| order_approved_at|order_delivered_...|order_delivered_...|order_estimated_...| load_date| record_source|
+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+
|1be932a1f5ffb685...|5fd782e28f1a1e5c...|b4c3ab31defc34ae...|processing|2017-11-03 17:13:27|2017-11-03 17:22:04|2017-11-06 12:15:33|2017-11-10 20:52:15|2017-11-15 00:00:00|2023-01-20 15:33:45| OLIST_DATASET|
|1be932a1f5ffb685...|7fc56270e7a70fa8...|b4c3ab31defc34ae...|delivered|2017-11-03 17:13:27|2017-11-03 17:22:04|2017-11-06 12:15:33|2017-11-10 20:52:15|2017-11-15 00:00:00|2023-01-20 15:36:12| OLIST_DATASET|
+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+
Uau! Veja como mantemos ambos os registros (antes e depois da mudança), com timestamps diferentes! 🕒
Célula 10: Demonstração de Linhagem de Dados
# 1. Encontrar o Hub_Order
order_hub = spark.read.format("delta").load(hub_order_path).filter(F.col("order_id") == sample_order_id)
print("\\n1. Hub_Order:")
order_hub.show()
# ... outros passos de rastreabilidade
O que acontece: Demonstramos como podemos rastrear a linhagem completa de um pedido através do modelo Data Vault.
Resultado:
Rastreando o pedido: b4c3ab31defc34ae69910ecc9119a306
1. Hub_Order:
+--------------------+--------------------+-------------------+--------------------+
| order_id| hub_order_key| load_date| record_source|
+--------------------+--------------------+-------------------+--------------------+
|b4c3ab31defc34ae...|1be932a1f5ffb685...|2023-01-20 15:32:58| OLIST_DATASET|
+--------------------+--------------------+-------------------+--------------------+
2. Sat_Order_Details:
+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+
| hub_order_key| hashdiff| order_id| order_status|order_purchase_t...| order_approved_at|order_delivered_...|order_delivered_...|order_estimated_...| load_date| record_source|
+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+
|1be932a1f5ffb685...|7fc56270e7a70fa8...|b4c3ab31defc34ae...|delivered|2017-11-03 17:13:27|2017-11-03 17:22:04|2017-11-06 12:15:33|2017-11-10 20:52:15|2017-11-15 00:00:00|2023-01-20 15:36:12| OLIST_DATASET|
+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+
Esta capacidade de rastrear a origem completa dos dados é um dos maiores diferenciais do Data Vault! 🔎
Vantagens do Data Vault Evidenciadas no Projeto:
Depois de implementar todo esse projeto, ficou super claro pra mim porque o Data Vault é tão poderoso:
Flexibilidade Incrível: Durante o desenvolvimento, percebi como é fácil adicionar novas entidades ou atributos sem afetar o modelo existente. Isso é PERFEITO para ambientes de negócio em constante mudança!
Auditoria Completa: Cada célula de dados tem timestamp e fonte, então sabemos exatamente de onde veio e quando mudou. Para compliance e governança, isso é ouro!
Historização Automática: Como vimos na célula 9, preservar o histórico de mudanças (como status de pedidos) é natural no Data Vault. Nada de complexity com SCD Tipo 2!
Escalabilidade com Spark: O modelo Data Vault se adapta perfeitamente à natureza distribuída do Spark. Os hashes facilitam a distribuição e paralelização.
Integração Perfeita com Delta Lake: A combinação de Data Vault + Delta Lake traz transações ACID e "time travel" para nosso Data Lake. É o melhor dos dois mundos!
Por que Docker Compose Faz Toda a Diferença:
Usar Docker Compose neste projeto foi um divisor de águas:
Zero Configuração Manual: Quem já tentou configurar um cluster Spark do zero sabe o pesadelo que é. Com Docker Compose, é só um comando!
Reprodutibilidade Total: O ambiente é idêntico para todos que usarem o projeto. Sem mais "mas no meu computador funciona!"
Isolamento de Dependências: As bibliotecas Python, JARs do Spark, Delta Lake e tudo mais ficam isolados em containers, sem conflito com outros projetos.
Integração Cross-Platform: A comunicação entre Spark, Jupyter e Minio é configurada automaticamente via network do Docker.
Fácil Escalabilidade: Precisa de mais workers? É só adicionar mais serviços no docker-compose.yml!
Uma funcionalidade super legal que implementamos foi a integração do Minio (como S3) com o Spark. Isso simula um ambiente cloud-like, mesmo rodando localmente!
Conclusão: Minha Jornada de Aprendizado
Este projeto foi uma jornada incrível! Implementar Data Vault usando tecnologias modernas como Spark, Delta Lake e Minio em um ambiente Docker me deu insights valiosos sobre:
- Como modelar dados de forma resiliente a mudanças
- Como trabalhar com processamento distribuído
- Como implementar historização e auditoria efetivas
- Como criar um ambiente reproduzível com Docker
O que mais me impressionou foi como todas essas peças se encaixam tão bem: o Data Vault provê a estrutura flexível, o Spark dá o poder de processamento, o Delta Lake garante transações ACID, e o Docker simplifica toda a configuração.
Se você está começando com Data Vault, espero que este projeto te inspire tanto quanto me inspirou! A combinação dessas tecnologias realmente abre um mundo de possibilidades para construção de data lakes modernos e resilientes.
Você já implementou Data Vault em seus projetos? Tem experiências com Spark e Delta Lake? Compartilhe nos comentários lá no meu Linkedln adoro formentar discussões
Gostou deste artigo? Me siga no Linkedln para mais conteúdos sobre engenharia de dados, big data e arquiteturas modernas e agora principalmente AI -> https://www.linkedin.com/in/airton-lira-junior-6b81a661/
- Repositório do projeto: https://github.com/AirtonLira/datavault-spark-minio-delta
Top comments (0)