Spoiler: Não é só fazer pd.read_csv()
e torcer que vai dar certo
Se você já tentou processar os dados públicos de CNPJ da Receita Federal, sabe que não é trivial. São 60+ milhões de empresas em 37 arquivos ZIP totalizando ~7GB comprimidos que expandem para ~21GB. Neste post, compartilho as decisões arquiteturais do cnpj-data-pipeline.
O Problema Real
A Receita Federal disponibiliza dados mensalmente com desafios únicos:
- 37 arquivos fragmentados: 10 Empresas + 10 Estabelecimentos + 10 Sócios + 7 tabelas de referência
- Dependências entre tabelas: Estabelecimentos referenciam Empresas, Municípios e Motivos
- Encoding ISO-8859-1: Precisa converter para UTF-8
- Memória limitada: Arquivos de até 2GB impossíveis de carregar inteiros
- Formato inconsistente: Datas como "00000000", decimais com vírgula
Arquitetura: Patterns que Fazem Sentido
1. Factory Pattern para Extensibilidade
# src/database/factory.py
ADAPTERS: Dict[str, Type[DatabaseAdapter]] = {
"postgresql": PostgreSQLAdapter,
# Preparado para: MySQL, BigQuery, SQLite
}
def create_database_adapter(config) -> DatabaseAdapter:
backend = config.database_backend.value
adapter_class = ADAPTERS.get(backend)
if not adapter_class:
raise ValueError(f"Unsupported database backend: {backend}")
return adapter_class(config)
Por quê? Adicionar um novo banco é questão de implementar a interface DatabaseAdapter
sem tocar no resto do código.
2. Strategy Pattern para Downloads
Com 47 arquivos, downloads sequenciais são lentos. Implementei duas estratégias:
# src/download_strategies/parallel.py
class ParallelDownloadStrategy(DownloadStrategy):
def download_files(self, directory: str, files: List[str]) -> Iterator[Path]:
# Separa referências de dados
reference_files, data_files = self._categorize_files(files)
# Referências primeiro (sequencial por segurança)
for csv in self._download_sequential(reference_files):
yield csv
# Dados em paralelo
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {
executor.submit(self.download_single_file, dir, f): f
for f in data_files
}
for future in as_completed(futures):
yield future.result()
3. Ordenação Inteligente por Dependências
A ordem de processamento importa quando há foreign keys:
# src/downloader.py
def organize_files_by_dependencies(self, files: List[str]) -> Tuple[List[str], Dict]:
# Tabelas de referência primeiro (sem FKs)
REFERENCE_TABLES = {
"Cnaes.zip", "Motivos.zip", "Municipios.zip",
"Naturezas.zip", "Paises.zip", "Qualificacoes.zip"
}
# Ordem respeitando dependências
ORDERED_PATTERNS = [
"Empresas", # Depende de: naturezas_juridicas
"Estabelecimentos", # Depende de: empresas, municipios, motivos
"Socios", # Depende de: empresas
"Simples", # Depende de: empresas
]
# Organiza arquivos por categoria
reference_files = [f for f in files if f in REFERENCE_TABLES]
data_files = {pattern: [] for pattern in ORDERED_PATTERNS}
for filename in files:
for pattern in ORDERED_PATTERNS:
if filename.startswith(pattern):
data_files[pattern].append(filename)
break
# Monta ordem final
ordered_files = reference_files[:]
for pattern in ORDERED_PATTERNS:
ordered_files.extend(sorted(data_files[pattern]))
return ordered_files, categorization_info
Esta estratégia evita erros de foreign key constraint ao garantir que tabelas pai sejam carregadas antes das filhas.
Otimização de Memória
Detecção Automática de Recursos
# src/config.py
def _detect_strategy(self) -> ProcessingStrategy:
memory_gb = psutil.virtual_memory().total / (1024**3)
if memory_gb < 8:
return ProcessingStrategy.MEMORY_CONSTRAINED
elif memory_gb < 32:
return ProcessingStrategy.HIGH_MEMORY
else:
return ProcessingStrategy.DISTRIBUTED
@property
def optimal_chunk_size(self) -> int:
memory_gb = psutil.virtual_memory().total / (1024**3)
# Chunks adaptados à memória disponível
if self.processing_strategy == ProcessingStrategy.MEMORY_CONSTRAINED:
return 100_000 if memory_gb < 4 else 500_000
elif self.processing_strategy == ProcessingStrategy.HIGH_MEMORY:
return 2_000_000
else:
return 5_000_000
Processamento em Streaming com Polars
# src/processor.py
def _process_chunked(self, file_path: Path, db, table_name: str):
chunk_size = self.config.optimal_chunk_size
offset = 0
while True:
chunk_df = pl.read_csv(
file_path,
separator=";",
encoding="utf8",
has_header=False,
skip_rows=offset,
n_rows=chunk_size,
)
if len(chunk_df) == 0:
break
# Transformações: conversão de tipos, limpeza de datas
chunk_df = self._apply_transformations(chunk_df, file_type)
# Bulk insert otimizado
db.bulk_upsert(chunk_df, table_name)
offset += len(chunk_df)
del chunk_df
gc.collect() # Força liberação de memória
Tratamento de Dados Governamentais
Problemas Encontrados e Soluções
# Datas inválidas: "00000000" → NULL
df = df.with_columns(
pl.when(pl.col(date_col) == "0")
.then(None)
.otherwise(pl.col(date_col))
)
# Decimais com vírgula: "1234,56" → 1234.56
df = df.with_columns(
pl.col(numeric_col).str.replace(",", ".").cast(pl.Float64)
)
# Encoding corrompido: detecção e conversão automática
def _convert_file_encoding_chunked(self, input_file: Path):
chunk_size = self.config.encoding_chunk_size # 50MB chunks
with open(input_file, "r", encoding="ISO-8859-1") as infile:
with open(output_file, "w", encoding="UTF-8") as outfile:
while chunk := infile.read(chunk_size):
outfile.write(chunk)
Retry com Backoff Exponencial
def retry_db_connection(max_retries=3, base_delay=1.0):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except (psycopg2.OperationalError, psycopg2.DatabaseError) as e:
if attempt == max_retries:
raise
delay = base_delay * (2**attempt)
time.sleep(delay)
return wrapper
return decorator
Resultados em Produção
Configuração | Tempo de Processamento |
---|---|
VPS 4GB RAM | ~6 horas |
Server 16GB RAM | ~2 horas |
Server 64GB+ RAM | ~1 hora |
- Volume processado: 7GB → 21GB → PostgreSQL (~15GB indexado)
- Registros: 63M empresas + 66M estabelecimentos + 26M sócios
- Confiabilidade: Processamento mensal automatizado via cron
Lições Aprendidas
Ordem importa: Processar arquivos respeitando dependências evita horas debugando constraint violations
Memória > CPU: Melhor processar 100k registros por vez consistentemente do que tentar 10M e travar
Polars > Pandas: Para este volume, Polars usa 3x menos memória e é 2x mais rápido
Patterns justificados: Factory e Strategy não são overengineering quando você sabe que vai expandir
Próximos Passos
- Suporte MySQL e SQLite (em desenvolvimento)
- Filtros de processamento (processar apenas um estado/CNAE específico)
- Exportação para Parquet (para análises em Pandas/Spark)
- Interface natural language (cnpj.chat)
Conclusão
Processar dados em escala não é sobre força bruta - é sobre arquitetura inteligente e trade-offs conscientes. O código está disponível em github.com/cnpj-chat/cnpj-data-pipeline.
Se você está lidando com dados públicos brasileiros ou grandes volumes de CSV, espero que essas técnicas sejam úteis. E se quiser contribuir, PRs são sempre bem-vindos!
Tem alguma pergunta ou sugestão? Me encontre no GitHub ou LinkedIn. Se este post foi útil, considere dar uma estrela no projeto.
Top comments (1)
pretty cool seeing this kinda transparency about struggles and tradeoffs - you think there’s ever a time where brute force really makes sense or is smart design always worth it in the end