Roberto de Vargas Neto
Market Data Integrator: Consuming Real-Time Data with Python, MongoDB, and Kafka

Hello, everyone!

Continuing the My Broker B3 series, today let's talk about the component that feeds the entire ecosystem with real-time data from the Brazilian financial market: the Broker Market Data API.

This Python-based microservice acts as an ingestor, connecting the external world (the Brapi API) to our internal infrastructure.


🏗️ The Solution and Data Flow

The main objective is to ensure that asset prices are always up to date for other services. The data flow was designed in three main steps:

  1. Scheduled Ingestion: The service iterates through a Watchlist of 50 assets, including Blue Chips (such as PETR4 and VALE3), REITs (FIIs), and ETFs.

  2. Historical Persistence (MongoDB): Before any processing, the complete payload is saved in MongoDB (broker_market_data_db). This ensures we have an audit trail and data for future analysis.

  3. Event Streaming (Kafka): The updated price is published to the trading-assets-market-data-v1 topic. This allows any other microservice to react to these changes in real time.
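The three steps above can be sketched as a single loop. This is a minimal illustration, not the project's actual code: `fetch_quote`, the collaborator objects, and the watchlist subset are all hypothetical stand-ins.

```python
import json
import time

# Hypothetical watchlist subset; the real service tracks ~50 assets.
WATCHLIST = ["PETR4", "VALE3", "ITUB4"]

def run_ingestion(fetch_quote, mongo_collection, producer, topic):
    """One scheduled pass: fetch -> persist the raw payload -> publish the event."""
    for ticker in WATCHLIST:
        payload = fetch_quote(ticker)               # 1. scheduled ingestion
        mongo_collection.insert_one(dict(payload))  # 2. historical persistence
        producer.produce(                           # 3. event streaming
            topic=topic,
            key=ticker,
            value=json.dumps(payload).encode("utf-8"),
        )
        time.sleep(0.5)  # pause between external calls (rate limiting)
    producer.flush()
```

Injecting the HTTP client, the MongoDB collection, and the Kafka producer as parameters keeps the loop easy to test with fakes.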


🛠️ Implementation Details

I chose Python 3.12 for this component due to its efficiency in handling HTTP requests and its excellent ecosystem for data integration.

The Importance of the Kafka Message Key

A vital technical decision in this service was using the asset ticker as the message key when publishing to Kafka.

Why is this important?
Kafka guarantees message ordering only within a single partition. By setting the ticker (e.g., PETR4) as the key, Kafka ensures that all messages for that specific asset are always routed to the same partition. This guarantees that any consumer will process price updates in the exact order they occurred, preventing a race condition where an older price might be processed after a newer one.

Code Highlights:

  • Rate Limiting: Implemented a time.sleep(0.5) between API calls to respect the limits of the Brapi free tier and avoid throttling.
  • Data Mapping: The payload is transformed into a standardized format (ticker, price, volume, timestamp) before transmission.
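The mapping step might look like this. The upstream field names (such as `regularMarketPrice`) are my assumption about the quote payload, not taken from the project:

```python
from datetime import datetime, timezone

def map_quote(raw: dict) -> dict:
    """Transform a raw API quote into the standardized event format.
    Input field names are assumed, based on typical quote payloads."""
    return {
        "ticker": raw["symbol"],
        "price": float(raw["regularMarketPrice"]),
        "volume": int(raw["regularMarketVolume"]),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
```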
```python
# Snippet of the Kafka production with keys in main.py
producer.produce(
    topic=TOPIC_NAME,
    key=ticker,  # Ensures ordering per asset
    value=json.dumps(payload).encode('utf-8'),
    callback=delivery_report
)
```
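The snippet references a `delivery_report` callback. A minimal version (my sketch, not necessarily the project's exact implementation) just logs the outcome; confluent-kafka invokes it from `poll()`/`flush()` with an error on failure or message metadata on success:

```python
def delivery_report(err, msg):
    """Delivery callback: err is set on failure; msg carries
    topic/partition metadata for the delivered record."""
    if err is not None:
        print(f"Delivery failed for key {msg.key()}: {err}")
    else:
        print(f"Delivered to {msg.topic()} [partition {msg.partition()}]")
```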

✅ Validating the Execution

To ensure the integration is working as expected, I validated two main output points:

  1. MongoDB: I verified the price_history collection in broker_market_data_db, confirming that documents are being inserted with the correct created_at timestamp.

  2. Kafka: Using the management UI, I confirmed that messages are arriving with the correct keys and values in the trading-assets-market-data-v1 topic.
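Beyond eyeballing the UI, the shape of each consumed message can be checked automatically. A small sketch, using the standardized fields described above:

```python
REQUIRED_FIELDS = {
    "ticker": str,
    "price": (int, float),
    "volume": int,
    "timestamp": str,
}

def is_valid_event(event: dict) -> bool:
    """Return True when a consumed message matches the standardized shape."""
    return all(
        field in event and isinstance(event[field], expected)
        for field, expected in REQUIRED_FIELDS.items()
    )
```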


🚀 Conclusion

With this service running, our simulator now "sees" the market in real time. The next challenge is to consume these events from within the Java microservices to update user portfolios and trigger order matching.

Do you have any questions about the ingestion strategy or using Kafka with Python? Let's discuss in the comments!


🔎 About the series

⬅️ Previous Post: Infrastructure as Code.

📘​ Series Index: Series Guide.

