Quando decidi construir o CNPJ Aberto, que é uma plataforma gratuita de consulta de empresas brasileiras, o primeiro desafio foi óbvio: como colocar 55 milhões de registros dentro do PostgreSQL de forma rápida e repetível (a base atualiza todo mês)?
Os dados vêm dos Dados Abertos da Receita Federal, distribuídos em dezenas de ZIPs com CSVs em latin-1, separados por ;, com campos inconsistentes (muito inconsistentes!!).
A ideia deste post rápido é mostrar as técnicas que transformaram uma importação de 12+ horas em menos de 3 horas.
O cenário
| Tabela | Registros aproximados | Peso CSV |
|---|---|---|
empresas |
~55M | ~4 GB |
estabelecimentos |
~70M | ~15 GB |
socios |
~25M | ~3 GB |
simples |
~35M | ~2 GB |
| Tabelas auxiliares | ~15K total | <1 MB |
Total: ~25 GB de CSVs descompactados, distribuídos em ~40 arquivos ZIP. Nestes ponto é visível que os maiores problemas seriam empresas e estabelecimentos, não só para querys como para joins, que são muitos para criar um sistema legal e atrativo.
Tentativa 1: INSERT com ORM (12+ horas)
A abordagem ingênua com SQLAlchemy:
for row in csv_reader:
empresa = Empresa(**parse_row(row))
session.add(empresa)
if i % 1000 == 0:
session.commit()
Resultado: ~1.200 inserts/segundo. Para 55M de registros, isso dá ~12 horas só para a tabela empresas. Inaceitável. Eu acabei ingerindo tudo pois imaginava que > 12 horas de espera seirma melhores que > X horas de implementação e pesquisa para melhoria de queries. Talvez fossem, mas pelo desafio, fui além.
O ORM adiciona overhead em cada objeto: validação de tipo, tracking de estado, construção de SQL dinâmico.
Para melhorias, a IA acaba ajudando um pouco, não muito, ela alucinada bastante na ajuda e as vezes acaba piorando querys e criando index sem sentido, usei o Opus 4.6. Muito útil, mas o double check é necessário.
Tentativa 2: executemany com batches (4+ horas)
Removendo o ORM e usando psycopg2 direto:
BATCH_SIZE = 5000
batch = []
for row in csv_reader:
batch.append(parse_row(row))
if len(batch) >= BATCH_SIZE:
cursor.executemany(INSERT_SQL, batch)
conn.commit()
batch.clear()
Resultado: ~4.000 inserts/segundo. Melhor, mas ainda 4+ horas.
O problema: executemany ainda gera um INSERT por linha. O PostgreSQL parseia e planeja cada statement individualmente.
Solução final: COPY + Temp Tables (< 3 horas)
O COPY é o mecanismo de bulk load nativo do PostgreSQL. Ele bypassa o parser SQL, o planner e o executor — escrevendo direto no heap da tabela. É 10-50x mais rápido que INSERT.
Passo 1: Otimizar a sessão
cursor.execute("SET synchronous_commit = off")
cursor.execute("SET work_mem = '256MB'")
synchronous_commit = off permite que o PostgreSQL confirme transações sem esperar o flush do WAL para disco. Seguro para data loads (se o servidor crashar, você reimporta). Como nesse caso os dados não são tão importantes, é possível reimportar sem precisar fazer check de nada.
Passo 2: Dropar indexes antes, recriar depois
Indexes tornam cada INSERT mais caro porque o B-tree/GIN precisa ser atualizado. Para bulk load, é mais eficiente dropar tudo, importar, e recriar:
def drop_indexes(cursor):
cursor.execute("""
SELECT indexname, tablename FROM pg_indexes
WHERE schemaname = 'public'
AND indexname NOT LIKE '%_pkey'
""")
for idx, table in cursor.fetchall():
cursor.execute(f"DROP INDEX IF EXISTS {idx}")
def create_indexes(cursor):
# Recriar com CONCURRENTLY para não bloquear reads
cursor.execute("""
CREATE INDEX CONCURRENTLY IF NOT EXISTS
ix_empresas_razao_trgm ON empresas
USING gin (razao_social gin_trgm_ops)
""")
# ... mais indexes
CREATE INDEX CONCURRENTLY é crucial — permite que o site continue respondendo enquanto os indexes são construídos.
Passo 3: COPY via temp table + UPSERT
Para tabelas que precisam de upsert (atualização mensal), usamos temp tables:
BATCH_SIZE = 200_000
def import_batch(cursor, table, columns, rows):
# 1. Criar temp table com mesma estrutura
cursor.execute(f"CREATE TEMP TABLE tmp_{table} (LIKE {table} INCLUDING DEFAULTS)")
# 2. COPY os dados para a temp table
csv_buffer = io.StringIO()
writer = csv.writer(csv_buffer)
for row in rows:
writer.writerow(row)
csv_buffer.seek(0)
cursor.copy_expert(
f"COPY tmp_{table} ({','.join(columns)}) FROM STDIN WITH (FORMAT csv, NULL '')",
csv_buffer
)
# 3. UPSERT da temp table para a tabela real
cols = ', '.join(columns)
update_cols = ', '.join(f"{c} = EXCLUDED.{c}" for c in columns if c != pk)
cursor.execute(f"""
INSERT INTO {table} ({cols})
SELECT {cols} FROM tmp_{table}
ON CONFLICT ({pk}) DO UPDATE SET {update_cols}
""")
# 4. Limpar
cursor.execute(f"DROP TABLE tmp_{table}")
Por que temp table? Porque o COPY não suporta ON CONFLICT diretamente. A temp table recebe o bulk load ultrarrápido, e depois um único INSERT ... ON CONFLICT faz o merge.
Passo 4: Paralelismo no download e importação
Os ZIPs da Receita Federal são independentes, então podemos baixar e importar em paralelo:
with ThreadPoolExecutor(max_workers=4) as pool:
futures = []
for zip_url in zip_urls:
futures.append(pool.submit(download_and_import, zip_url))
for future in as_completed(futures):
future.result() # propaga exceções
4 workers = 4 ZIPs sendo importados simultaneamente. Com SSDs, o PostgreSQL lida bem com escritas paralelas em tabelas diferentes.
Resultados
| Abordagem | Velocidade | Tempo total |
|---|---|---|
| ORM (SQLAlchemy) | ~1.200/s | 12+ horas |
executemany batches |
~4.000/s | 4+ horas |
COPY + temp tables + parallelismo |
~80.000/s | < 3 horas |
67x mais rápido que a abordagem inicial.
Lições aprendidas
use COPY para bulk load no PostgreSQL. Não existe nada mais rápido sem ir para
pg_bulkloadou extensões externas.Dropar indexes antes de um bulk load e recriar depois é quase sempre mais rápido que manter os indexes durante a carga.
Temp tables são o bridge entre COPY (que não suporta upsert) e a necessidade de
ON CONFLICT.synchronous_commit = offé uma otimização segura para data loads — o pior que acontece é perder dados que você pode reimportar.Batch size importa: 200K linhas por batch é o sweet spot. Muito menos = overhead de transação. Muito mais = uso excessivo de memória.
O resultado
Esse pipeline roda todo mês no CNPJ Aberto para atualizar a base com os dados mais recentes da Receita Federal. Qualquer pessoa pode consultar gratuitamente dados de qualquer empresa brasileira, razão social, sócios, endereço, CNAE, situação cadastral e muito mais.
Se você trabalha com dados públicos brasileiros, dá uma olhada: cnpjaberto.com.br
Obrigado!
Top comments (0)