Você já escreveu um script para buscar dados de centenas de APIs ou ler milhares de arquivos e ficou olhando para o progresso, linha por linha, enquanto a maior parte do tempo o seu processador parecia estar de férias?
Se a resposta é sim, você provavelmente estava lidando com uma tarefa I/O-Bound. Em engenharia de dados, entender esse conceito é a chave para transformar pipelines lentos em processos eficientes.
Conceitos Fundamentais: Concorrência não é Paralelismo
Antes de mergulharmos no código, vale a pena esclarecer uma coisa. Como diz Rob Pike, um dos criadores da linguagem Go:
- Concorrência é lidar com muitas coisas ao mesmo tempo.
- Paralelismo é fazer muitas coisas ao mesmo tempo.
Nosso caso de I/O-bound é um exemplo clássico de concorrência. Nosso programa gerencia centenas de requisições de rede "pendentes" de uma só vez. Mesmo que tenhamos apenas um punhado de cores de CPU, o sistema operacional consegue dar progresso a cada uma delas, aproveitando o tempo de espera. Não estamos necessariamente executando os downloads em paralelo (o que exigiria centenas de cores), mas estamos gerenciando a concorrência entre eles de forma eficiente.
O Problema na Prática: A Execução Sequencial
A abordagem mais intuitiva para processar uma lista de tarefas é um loop for
. Vamos simular a busca de dados para 200 produtos, onde cada busca leva 1 segundo (simulando a espera da rede).
import time
def buscar_dados_produto(product_id: int) -> str:
"""Simula uma chamada de rede que leva 1 segundo."""
print(f"Buscando dados para o produto {product_id}...")
time.sleep(1)
return f"Dados do produto {product_id}"
# --- Execução Sequencial ---
inicio_seq = time.time()
resultados_seq = []
for i in range(200):
resultados_seq.append(buscar_dados_produto(i))
fim_seq = time.time()
print(f"\nTempo total (Sequencial): {fim_seq - inicio_seq:.2f} segundos")
O cálculo do tempo é simples:
Tempo Total (Sequencial) = Número de Tarefas × Tempo por Tarefa
200 tarefas × 1s/tarefa = 200 segundos
É um processo lento, pois não aproveitamos o tempo de espera.
A Solução Concorrente: Threads ao Resgate
Se a CPU está ociosa enquanto espera, por que não usá-la para iniciar outras requisições? É exatamente isso que a concorrência com threads nos permite fazer. A biblioteca concurrent.futures
do Python torna isso simples.
import time
import os
import concurrent.futures
def buscar_dados_produto(product_id: int) -> str:
"""Simula uma chamada de rede que leva 1 segundo."""
print(f"Buscando dados para o produto {product_id}...")
time.sleep(1)
return f"Dados do produto {product_id}"
# --- Execução Paralela com Threads ---
# Identificar o número de cores para definir os workers dinamicamente
num_cores = os.cpu_count() or 1
# Para I/O-Bound, usamos um multiplicador. 4x o número de cores é um bom começo.
MAX_WORKERS = num_cores * 4
print(f"Configurando pool com {MAX_WORKERS} threads...")
inicio_par = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# O executor.map agenda as tarefas e as executa nas threads.
resultados_par = list(executor.map(buscar_dados_produto, range(200)))
fim_par = time.time()
print(f"\nTempo total (Threads): {fim_par - inicio_par:.2f} segundos")
O tempo de execução cai drasticamente. Para uma máquina com 4 cores (16 workers), o tempo teórico seria:
Tempo Total (Paralelo) ≈ (Número de Tarefas / Número de Workers) × Tempo por Tarefa
(200 tarefas / 16 workers) * 1s/tarefa ≈ 12.5 segundos
Gráfico Comparativo: Visualizando o Ganho
Este gráfico ilustra perfeitamente os conceitos que discutimos:
- Ineficiência Sequencial: A linha "Tempo Sequencial" cresce de forma perfeitamente linear e íngreme. Dobrar o número de tarefas dobra o tempo de execução, como esperado. Em uma escala logarítmica, isso se manifesta como uma linha reta e diagonal.
- Impacto Massivo do Paralelismo: Todas as linhas de execução paralela estão ordens de magnitude abaixo da linha sequencial, mostrando o ganho imediato e drástico de performance ao simplesmente não esperar em fila.
- Benefícios de Mais Workers: A linha de 32 workers está consistentemente abaixo da de 16, que por sua vez está abaixo da de 8. Isso confirma que, para esta tarefa I/O-Bound, adicionar mais "trabalhadores" (threads) para fazer requisições concorrentes acelera ainda mais o processo.
- Retornos Decrescentes: Note que a distância entre as linhas paralelas diminui. O salto de performance de "Sequencial" para "8 Workers" é gigantesco. O salto de "8" para "16" é ótimo, e o de "16" para "32" é bom, mas menor. Isso sugere que, em algum ponto, adicionar mais workers não trará um benefício tão grande, pois o sistema começará a ser limitado por outros fatores (largura de banda da rede, limites da API, etc.).
Por Que Threads e Não Processos? O GIL
O GIL: O Vilão que se Torna Herói no I/O
O Global Interpreter Lock (GIL) do Python é uma trava que permite que apenas uma thread execute bytecode Python por vez. Para tarefas que usam intensivamente a CPU, isso é um gargalo, pois impede o paralelismo real em múltiplos cores.
No entanto, para tarefas I/O-bound, o GIL se comporta de maneira diferente. O segredo é que toda função da biblioteca padrão do Python que faz uma chamada de sistema (syscall) libera o GIL.
Operações de rede e de disco são syscalls. Isso significa que quando a Thread A
faz uma chamada para ler um arquivo do S3, ela libera a trava do GIL. Isso permite que a Thread B
assuma o controle e inicie sua própria chamada de rede.
O resultado, como diz o autor David Beazley, é que "threads em Python são ótimas em não fazer nada" — e isso é exatamente o que queremos. Elas são a ferramenta perfeita para gerenciar a "espera" de forma concorrente. Usar processos (ProcessPoolExecutor
) teria um custo de memória e inicialização muito maior, que é desnecessário para tarefas que não competem por tempo de CPU.
O Que Ferramentas Como o ThreadPoolExecutor
Fazem por Nós?
Escrever código concorrente do zero é complexo. Ferramentas como concurrent.futures
escondem essa complexidade de nós.
- Gerenciamento do Ciclo de Vida: Iniciar threads tem um custo. Reutilizá-las em um "pool" é muito mais eficiente do que criar e destruir uma thread para cada pequena tarefa. O
ThreadPoolExecutor
faz exatamente esse gerenciamento de pool para nós. - Coordenação da Comunicação: Como obter o resultado de uma tarefa que rodou em outra thread? O método
.map()
do executor abstrai tudo isso, coletando os resultados e até mesmo tratando erros de forma transparente, sem que precisemos implementar filas ou outros mecanismos de comunicação manualmente.
Próximos Passos: Dask e Spark
Para tarefas de engenharia de dados mais complexas ou com volumes que não cabem na memória, as ferramentas da biblioteca padrão atingem seu limite. É aí que entram frameworks mais robustos.
- Dask: Oferece uma abstração de alto nível sobre o paralelismo em Python, com DataFrames e Bags que podem operar em dados maiores que a memória e escalar para múltiplos nós.
- Spark: É o padrão da indústria para processamento de Big Data. Com seu motor otimizado (Catalyst) e arquitetura distribuída, ele lida com transformações complexas (joins, agregações) em terabytes de dados de forma eficiente.
Exploraremos como essas ferramentas resolvem o mesmo problema (e muitos outros) em posts futuros.
Conclusão
Entender a natureza da sua carga de trabalho é o primeiro passo para a otimização. Muitas tarefas em engenharia de dados não são limitadas pela velocidade de processamento, mas pelo tempo de espera.
Da próxima vez que seu script de ingestão de dados parecer lento, pergunte-se: meu código está realmente trabalhando ou está apenas esperando? Se a resposta for "esperando", concurrent.futures.ThreadPoolExecutor
é uma ferramenta simples e poderosa da biblioteca padrão para transformar essa espera em eficiência.
Top comments (0)