DEV Community

Pedro Parker
Pedro Parker

Posted on

Como importei 55 milhões de empresas para PostgreSQL em menos de 3 horas

Quando decidi construir o CNPJ Aberto, que é uma plataforma gratuita de consulta de empresas brasileiras, o primeiro desafio foi óbvio: como colocar 55 milhões de registros dentro do PostgreSQL de forma rápida e repetível (a base atualiza todo mês)?

Os dados vêm dos Dados Abertos da Receita Federal, distribuídos em dezenas de ZIPs com CSVs em latin-1, separados por ;, com campos inconsistentes (muito inconsistentes!!).

A ideia deste post rápido é mostrar as técnicas que transformaram uma importação de 12+ horas em menos de 3 horas.

O cenário

Tabela Registros aproximados Peso CSV
empresas ~55M ~4 GB
estabelecimentos ~70M ~15 GB
socios ~25M ~3 GB
simples ~35M ~2 GB
Tabelas auxiliares ~15K total <1 MB

Total: ~25 GB de CSVs descompactados, distribuídos em ~40 arquivos ZIP. Nestes ponto é visível que os maiores problemas seriam empresas e estabelecimentos, não só para querys como para joins, que são muitos para criar um sistema legal e atrativo.

Tentativa 1: INSERT com ORM (12+ horas)

A abordagem ingênua com SQLAlchemy:

for row in csv_reader:
    empresa = Empresa(**parse_row(row))
    session.add(empresa)
    if i % 1000 == 0:
        session.commit()
Enter fullscreen mode Exit fullscreen mode

Resultado: ~1.200 inserts/segundo. Para 55M de registros, isso dá ~12 horas só para a tabela empresas. Inaceitável. Eu acabei ingerindo tudo pois imaginava que > 12 horas de espera seirma melhores que > X horas de implementação e pesquisa para melhoria de queries. Talvez fossem, mas pelo desafio, fui além.

O ORM adiciona overhead em cada objeto: validação de tipo, tracking de estado, construção de SQL dinâmico.

Para melhorias, a IA acaba ajudando um pouco, não muito, ela alucinada bastante na ajuda e as vezes acaba piorando querys e criando index sem sentido, usei o Opus 4.6. Muito útil, mas o double check é necessário.

Tentativa 2: executemany com batches (4+ horas)

Removendo o ORM e usando psycopg2 direto:

BATCH_SIZE = 5000
batch = []
for row in csv_reader:
    batch.append(parse_row(row))
    if len(batch) >= BATCH_SIZE:
        cursor.executemany(INSERT_SQL, batch)
        conn.commit()
        batch.clear()
Enter fullscreen mode Exit fullscreen mode

Resultado: ~4.000 inserts/segundo. Melhor, mas ainda 4+ horas.

O problema: executemany ainda gera um INSERT por linha. O PostgreSQL parseia e planeja cada statement individualmente.

Solução final: COPY + Temp Tables (< 3 horas)

O COPY é o mecanismo de bulk load nativo do PostgreSQL. Ele bypassa o parser SQL, o planner e o executor — escrevendo direto no heap da tabela. É 10-50x mais rápido que INSERT.

Passo 1: Otimizar a sessão

cursor.execute("SET synchronous_commit = off")
cursor.execute("SET work_mem = '256MB'")
Enter fullscreen mode Exit fullscreen mode

synchronous_commit = off permite que o PostgreSQL confirme transações sem esperar o flush do WAL para disco. Seguro para data loads (se o servidor crashar, você reimporta). Como nesse caso os dados não são tão importantes, é possível reimportar sem precisar fazer check de nada.

Passo 2: Dropar indexes antes, recriar depois

Indexes tornam cada INSERT mais caro porque o B-tree/GIN precisa ser atualizado. Para bulk load, é mais eficiente dropar tudo, importar, e recriar:

def drop_indexes(cursor):
    cursor.execute("""
        SELECT indexname, tablename FROM pg_indexes 
        WHERE schemaname = 'public' 
        AND indexname NOT LIKE '%_pkey'
    """)
    for idx, table in cursor.fetchall():
        cursor.execute(f"DROP INDEX IF EXISTS {idx}")

def create_indexes(cursor):
    # Recriar com CONCURRENTLY para não bloquear reads
    cursor.execute("""
        CREATE INDEX CONCURRENTLY IF NOT EXISTS 
        ix_empresas_razao_trgm ON empresas 
        USING gin (razao_social gin_trgm_ops)
    """)
    # ... mais indexes
Enter fullscreen mode Exit fullscreen mode

CREATE INDEX CONCURRENTLY é crucial — permite que o site continue respondendo enquanto os indexes são construídos.

Passo 3: COPY via temp table + UPSERT

Para tabelas que precisam de upsert (atualização mensal), usamos temp tables:

BATCH_SIZE = 200_000

def import_batch(cursor, table, columns, rows):
    # 1. Criar temp table com mesma estrutura
    cursor.execute(f"CREATE TEMP TABLE tmp_{table} (LIKE {table} INCLUDING DEFAULTS)")

    # 2. COPY os dados para a temp table
    csv_buffer = io.StringIO()
    writer = csv.writer(csv_buffer)
    for row in rows:
        writer.writerow(row)
    csv_buffer.seek(0)

    cursor.copy_expert(
        f"COPY tmp_{table} ({','.join(columns)}) FROM STDIN WITH (FORMAT csv, NULL '')",
        csv_buffer
    )

    # 3. UPSERT da temp table para a tabela real
    cols = ', '.join(columns)
    update_cols = ', '.join(f"{c} = EXCLUDED.{c}" for c in columns if c != pk)

    cursor.execute(f"""
        INSERT INTO {table} ({cols})
        SELECT {cols} FROM tmp_{table}
        ON CONFLICT ({pk}) DO UPDATE SET {update_cols}
    """)

    # 4. Limpar
    cursor.execute(f"DROP TABLE tmp_{table}")
Enter fullscreen mode Exit fullscreen mode

Por que temp table? Porque o COPY não suporta ON CONFLICT diretamente. A temp table recebe o bulk load ultrarrápido, e depois um único INSERT ... ON CONFLICT faz o merge.

Passo 4: Paralelismo no download e importação

Os ZIPs da Receita Federal são independentes, então podemos baixar e importar em paralelo:

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = []
    for zip_url in zip_urls:
        futures.append(pool.submit(download_and_import, zip_url))

    for future in as_completed(futures):
        future.result()  # propaga exceções
Enter fullscreen mode Exit fullscreen mode

4 workers = 4 ZIPs sendo importados simultaneamente. Com SSDs, o PostgreSQL lida bem com escritas paralelas em tabelas diferentes.

Resultados

Abordagem Velocidade Tempo total
ORM (SQLAlchemy) ~1.200/s 12+ horas
executemany batches ~4.000/s 4+ horas
COPY + temp tables + parallelismo ~80.000/s < 3 horas

67x mais rápido que a abordagem inicial.

Lições aprendidas

  1. use COPY para bulk load no PostgreSQL. Não existe nada mais rápido sem ir para pg_bulkload ou extensões externas.

  2. Dropar indexes antes de um bulk load e recriar depois é quase sempre mais rápido que manter os indexes durante a carga.

  3. Temp tables são o bridge entre COPY (que não suporta upsert) e a necessidade de ON CONFLICT.

  4. synchronous_commit = off é uma otimização segura para data loads — o pior que acontece é perder dados que você pode reimportar.

  5. Batch size importa: 200K linhas por batch é o sweet spot. Muito menos = overhead de transação. Muito mais = uso excessivo de memória.

O resultado

Esse pipeline roda todo mês no CNPJ Aberto para atualizar a base com os dados mais recentes da Receita Federal. Qualquer pessoa pode consultar gratuitamente dados de qualquer empresa brasileira, razão social, sócios, endereço, CNAE, situação cadastral e muito mais.

Se você trabalha com dados públicos brasileiros, dá uma olhada: cnpjaberto.com.br

Obrigado!

Top comments (0)