Hello, everyone!
Continuing the My Broker B3 series, today let's talk about the component that feeds the entire ecosystem with real-time data from the Brazilian financial market: The Broker Market Data API.
This Python-based microservice acts as an ingestor, connecting the external world (the Brapi API) to our internal infrastructure.
🏗️ The Solution and Data Flow
The main objective is to ensure that asset prices are always up to date for other services. The data flow was designed in three main steps:
1. **Scheduled Ingestion**: The service iterates through a watchlist of 50 assets, including blue chips (such as `PETR4` and `VALE3`), REITs (FIIs), and ETFs.
2. **Historical Persistence (MongoDB)**: Before any processing, the complete payload is saved in MongoDB (`broker_market_data_db`). This ensures we have an audit trail and data for future analysis.
3. **Event Streaming (Kafka)**: The updated price is published to the `trading-assets-market-data-v1` topic. This allows any other microservice to react to these changes in real time.
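The three steps above can be sketched as a single loop. This is a hedged illustration, not the real `main.py`: the names `fetch_quote`, `collection`, `producer`, and the Brapi field `regularMarketPrice` are assumptions.

```python
# Hypothetical sketch of the fetch -> persist -> publish cycle.
# fetch_quote, collection, producer and the Brapi field names are assumptions.
import json
import time

WATCHLIST = ["PETR4", "VALE3", "ITUB4"]  # a slice of the 50-asset watchlist

def ingest_cycle(fetch_quote, collection, producer, topic, delay=0.5):
    for ticker in WATCHLIST:
        raw = fetch_quote(ticker)             # 1. scheduled ingestion (Brapi)
        collection.insert_one(dict(raw))      # 2. historical persistence (MongoDB)
        payload = {"ticker": ticker, "price": raw.get("regularMarketPrice")}
        producer.produce(                     # 3. event streaming (Kafka)
            topic,
            key=ticker,
            value=json.dumps(payload).encode("utf-8"),
        )
        time.sleep(delay)                     # rate limiting between API calls
```

Note that persistence happens before publishing, so even if Kafka is temporarily unavailable, the raw payload is never lost.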
🛠️ Implementation Details
I chose Python 3.12 for this component due to its efficiency in handling HTTP requests and its excellent ecosystem for data integration.
The Importance of the Kafka Message Key
A vital technical decision in this service was using the asset ticker as the message key when publishing to Kafka.
Why is this important?
Kafka guarantees message ordering only within a single partition. By setting the ticker (e.g., PETR4) as the key, Kafka ensures that all messages for that specific asset are always routed to the same partition. This guarantees that any consumer will process price updates in the exact order they occurred, preventing a race condition where an older price might be processed after a newer one.
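To see why keyed routing is deterministic, here is a minimal illustration. CRC32 stands in for the client's real partitioner hash (an assumption; the Java client, for instance, uses murmur2), and the partition count is hypothetical.

```python
# Illustration only: the same key always maps to the same partition.
# CRC32 is a stand-in for the client's actual partitioner hash.
import zlib

NUM_PARTITIONS = 6  # hypothetical partition count for the topic

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Deterministically map a message key to a partition index."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# Every PETR4 update lands on the same partition, preserving its order,
# while other tickers may hash elsewhere and be consumed in parallel.
p1 = partition_for("PETR4")
p2 = partition_for("PETR4")
```

Because the hash is deterministic, `p1 == p2` on every run: per-ticker ordering costs nothing in throughput across tickers.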
Code Highlights:
- **Rate Limiting**: Implemented a `time.sleep(0.5)` between API calls to respect the limits of the Brapi free tier and avoid throttling.
- **Data Mapping**: The payload is transformed into a standardized format (ticker, price, volume, timestamp) before transmission.
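The mapping step might look like the sketch below. The Brapi field names `regularMarketPrice` and `regularMarketVolume` are assumptions made for illustration.

```python
# Sketch of the standardization step; the raw Brapi field names are assumptions.
from datetime import datetime, timezone

def to_standard_payload(ticker: str, raw: dict) -> dict:
    """Map a raw quote into the standardized (ticker, price, volume, timestamp) format."""
    return {
        "ticker": ticker,
        "price": raw.get("regularMarketPrice"),
        "volume": raw.get("regularMarketVolume"),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
```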
```python
# Snippet of the Kafka production with keys in main.py
producer.produce(
    topic=TOPIC_NAME,
    key=ticker,  # Ensures ordering per asset
    value=json.dumps(payload).encode('utf-8'),
    callback=delivery_report,
)
```
✅ Validating the Execution
To ensure the integration is working as expected, I validated two main output points:
- **MongoDB**: I verified the `price_history` collection in the `market_data_db`, confirming that documents are being inserted with the correct `created_at` timestamp.
- **Kafka**: Using the management UI, I confirmed that messages are arriving with the correct keys and values in the `trading-assets-market-data-v1` topic.
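The MongoDB check can also be scripted. This is a hedged sketch using PyMongo-style cursor chaining; the `ticker` and `created_at` field names follow the post, but the helper itself is hypothetical.

```python
# Hypothetical validation helper: fetch the newest price documents for a ticker.
# Works with any PyMongo-style collection (find / sort / limit chaining).
def latest_prices(collection, ticker: str, limit: int = 5) -> list:
    """Return the most recent price documents for a ticker, newest first."""
    return list(
        collection.find({"ticker": ticker})
                  .sort("created_at", -1)
                  .limit(limit)
    )
```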
🚀 Conclusion
With this service running, our simulator now "sees" the market in real time. The next challenge is to consume these events from within the Java microservices to update user portfolios and trigger order matching.
Do you have any questions about the ingestion strategy or using Kafka with Python? Let's discuss in the comments!
🔎 About the series
⬅️ Previous Post: Infrastructure as Code.
📘 Series Index: Series Guide.