DEV Community

Roberto de Vargas Neto
Roberto de Vargas Neto

Posted on • Originally published at dev.to

Integrador de Market Data: Consumindo Dados Reais com Python, MongoDB e Kafka

Olá, pessoal!

Dando continuidade à série My Broker B3, hoje vamos falar sobre o componente que alimenta todo o ecossistema com dados reais do mercado financeiro brasileiro: o Broker Market Data API.

Este microserviço em Python atua como um ingestor, conectando o mundo externo (API Brappi) à nossa infraestrutura interna.


🏗️ A Solução e o Fluxo de Dados

O objetivo aqui é garantir que os preços dos ativos estejam sempre atualizados para os outros serviços. O fluxo foi desenhado em três etapas principais:

  1. Ingestão Agendada: O serviço percorre uma Watchlist de 50 ativos (incluindo Blue Chips como PETR4 e VALE3, além de FIIs e ETFs).
  2. Persistência Histórica (MongoDB): Antes de qualquer processamento, o payload completo é salvo no MongoDB. Isso garante que tenhamos uma trilha de auditoria e dados para análises futuras.
  3. Streaming de Eventos (Kafka): O preço atualizado é publicado no tópico trading-assets-market-data-v1. Isso permite que qualquer outro microserviço reaja a essa mudança em tempo real.

🛠️ Detalhes da Implementação

Escolhi Python 3.12 pela sua agilidade em lidar com requisições HTTP e integração com drivers de dados.

A importância da Chave no Kafka (Message Key)

Uma decisão técnica vital neste serviço foi o uso do ticker do ativo como chave (key) da mensagem enviada ao Kafka.

Por que isso é importante?
O Kafka garante a ordenação das mensagens apenas dentro de uma partição. Ao definir o ticker (ex: PETR4) como chave, o Kafka assegura que todas as mensagens desse ativo caiam sempre na mesma partição. Isso garante que qualquer consumidor lerá os eventos na ordem exata em que ocorreram, evitando que um preço antigo seja processado após um preço mais recente.

Destaques do Código:

  • Rate Limiting: Como utilizo o plano gratuito da Brapi, o código implementa um time.sleep(0.5) entre as chamadas para respeitar os limites da API e evitar throttling.
  • Data Mapping: O payload é transformado para um formato padronizado antes de ser enviado para o Kafka, garantindo que os serviços consumidores recebam apenas o necessário (ticker, price, volume e timestamp).
# Trecho do mapeamento de dados no main.py
payload = {
    "ticker": result.get("symbol"),
    "price": result.get("regularMarketPrice"),
    "volume": result.get("regularMarketVolume"),
    "updated_at": result.get("regularMarketTime")
}
Enter fullscreen mode Exit fullscreen mode

✅ Validando a Execução

Para garantir que a integração está funcionando, validei os dois pontos de saída:

  1. MongoDB: Verifiquei a coleção price_history no banco market_data_db, onde os documentos estão sendo inseridos com o campo created_at gerado pelo nosso repositório.

  1. Kafka: Através da interface de gestão, confirmei a chegada das mensagens com as chaves (tickers) e valores corretos no tópico.


🚀 Conclusão

Com este serviço rodando, nosso simulador agora "enxerga" o mercado em tempo real. O próximo desafio é consumir esses eventos de dentro das APIs Java para atualizar as carteiras dos usuários.

Ficou com alguma dúvida sobre a estratégia de ingestão ou sobre o uso do Kafka com Python? Deixe nos comentários!

Para acompanhar essa série clique aqui.


​🔎​ Sobre a série

⬅️ Post Anterior: Infraestrutura como Código.

📘​ Índice da Série: Guia da Série.


Links:

Top comments (0)