Real-Time Cryptocurrency Data Pipeline
Streaming Analytics Platform for Digital Asset Monitoring
Enterprise Data Engineering Solution • Kafka • MongoDB • Docker • Python
Executive Summary
This project implements a robust, scalable real-time data pipeline designed to continuously monitor and store cryptocurrency price data from leading digital assets including Bitcoin (BTC), Ethereum (ETH), and Solana (SOL).
Key Objective
Enable real-time financial data ingestion, processing, and persistence for analytics, monitoring, and decision-making applications in the cryptocurrency market.
Core Capabilities
- Real-time data acquisition from Binance API
- Event-driven architecture using Apache Kafka
- Scalable NoSQL data persistence with MongoDB Atlas
- Containerized deployment for portability and consistency
- Monitoring and observability with Kafdrop and Grafana
System Architecture
┌─────────────┐ ┌──────────┐ ┌────────────┐ ┌──────────┐ ┌──────────────┐
│ Binance │────▶│ Producer │────▶│ Kafka │────▶│ Consumer │────▶│ MongoDB │
│ API │ │ (Python) │ │ Broker │ │ (Python) │ │ Atlas │
└─────────────┘ └──────────┘ └────────────┘ └──────────┘ └──────────────┘
Topics: BTC Collections:
ETH - btc
SOL - eth
- sol
Architecture Highlights
- Decoupled Design: Producer and consumer operate independently, ensuring fault tolerance and scalability
- Topic-Based Routing: Separate Kafka topics for each cryptocurrency enable parallel processing
- Cloud-Native Storage: MongoDB Atlas provides global accessibility and automated backups
- Containerization: Docker Compose orchestrates all infrastructure components
Technology Stack
| Technology | Purpose | Version |
|---|---|---|
| Apache Kafka | Distributed streaming platform for high-throughput message brokering | 7.6.1 |
| MongoDB Atlas | Cloud-hosted NoSQL database with automatic scaling | Latest |
| Python | Core application logic with kafka-python and pymongo | 3.8+ |
| Docker & Compose | Container orchestration for consistent deployment | 20.10+ |
| Kafdrop | Web UI for monitoring Kafka topics and consumers | Latest |
| Grafana | Observability platform for metrics visualization | Latest |
Development Process
Step 1: Infrastructure Setup with Docker Compose
First, we set up the complete infrastructure using Docker Compose. This creates isolated, reproducible environments for Kafka, Zookeeper, and monitoring tools.
File: docker-compose.yml
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.1
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.6.1
depends_on: [zookeeper]
ports:
- "9092:9092" # Internal listener
- "9094:9094" # External listener for producers/consumers
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,EXTERNAL://0.0.0.0:9094
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
kafdrop:
image: obsidiandynamics/kafdrop:latest
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka:9092"
depends_on: [kafka]
grafana:
image: grafana/grafana-enterprise
container_name: grafana
restart: unless-stopped
ports:
- '3000:3000'
Start the infrastructure:
# Start all services
docker-compose up -d
# Verify services are running
docker-compose ps
# Check Kafka logs
docker-compose logs kafka
# Access Kafdrop at http://localhost:9000
# Access Grafana at http://localhost:3000
Step 2: Data Generation Module
We created modular generator functions that fetch real-time cryptocurrency prices from the Binance API. Each function is designed as a generator for potential future expansion (e.g., historical data, multiple exchanges).
File: data_gen.py
import requests
import time
def btc():
"""Fetch current Bitcoin (BTC) price from Binance API"""
url = "https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT"
response = requests.get(url)
data = response.json()
yield {
"symbol": data['symbol'],
"price": float(data['price'])
}
def eth():
"""Fetch current Ethereum (ETH) price from Binance API"""
url = "https://api.binance.com/api/v3/ticker/price?symbol=ETHUSDT"
response = requests.get(url)
data = response.json()
yield {
"symbol": data['symbol'],
"price": float(data['price'])
}
def sol():
"""Fetch current Solana (SOL) price from Binance API"""
url = "https://api.binance.com/api/v3/ticker/price?symbol=SOLUSDT"
response = requests.get(url)
data = response.json()
yield {
"symbol": data['symbol'],
"price": float(data['price'])
}
Design Decisions:
-
Generator Pattern: Using
yieldallows for easy extension to streaming multiple prices or batch fetching - Modular Functions: Separate functions for each cryptocurrency enable independent testing and maintenance
-
Type Conversion: Converting price to
floatensures consistent numeric operations downstream - Direct API Call: Simple REST API approach suitable for 3-second polling intervals
Testing the data generators:
# Test individual generators
from data_gen import btc, eth, sol
# Test BTC price fetch
for price_data in btc():
print(f"BTC Price: ${price_data['price']:,.2f}")
# Output: BTC Price: $67,234.50
Step 3: Kafka Producer Implementation
The producer fetches cryptocurrency prices and publishes them to dedicated Kafka topics. This component demonstrates the "fire-and-forget" pattern with JSON serialization.
File: producer.py
from kafka import KafkaProducer
from data_gen import btc, eth, sol
import json
import time
# Initialize Kafka Producer with JSON serialization
producer = KafkaProducer(
bootstrap_servers=['localhost:9094'], # Connect to external listener
value_serializer=lambda v: json.dumps(v).encode() # Serialize dict to JSON bytes
)
try:
while True:
# Fetch and publish BTC price
for r in btc():
producer.send("btc", r)
print(f"✓ Sent BTC: {r}")
# Fetch and publish ETH price
for r in eth():
producer.send("eth", r)
print(f"✓ Sent ETH: {r}")
# Fetch and publish SOL price
for r in sol():
producer.send("sol", r)
print(f"✓ Sent SOL: {r}")
time.sleep(3) # Poll every 3 seconds
except KeyboardInterrupt:
print("\n⚠ Producer stopped by user")
producer.close()
Key Implementation Details:
- Bootstrap Servers: Connect to port 9094 (external listener) from host machine
- Value Serializer: Lambda function converts Python dictionaries to JSON-encoded bytes
- Topic Routing: Each cryptocurrency has its own topic for independent consumption
- Polling Interval: 3-second sleep balances API rate limits with real-time requirements
- Graceful Shutdown: KeyboardInterrupt handler ensures proper connection closure
Running the producer:
# Install required packages
pip install kafka-python requests
# Run the producer
python producer.py
# Expected output:
# ✓ Sent BTC: {'symbol': 'BTCUSDT', 'price': 67234.5}
# ✓ Sent ETH: {'symbol': 'ETHUSDT', 'price': 3456.78}
# ✓ Sent SOL: {'symbol': 'SOLUSDT', 'price': 145.23}
Verify messages in Kafdrop:
- Open http://localhost:9000
- Click on
btc,eth, orsoltopics - View messages in real-time
Step 4: Kafka Consumer with MongoDB Persistence
The consumer subscribes to all cryptocurrency topics, processes incoming messages, and persists them to MongoDB collections based on the topic name.
File: consumer.py
from kafka import KafkaConsumer
from pymongo import MongoClient
import json
# Initialize Kafka Consumer
consumer = KafkaConsumer(
"btc", "eth", "sol", # Subscribe to multiple topics
bootstrap_servers=['localhost:9094'],
value_deserializer=lambda m: json.loads(m.decode()), # Deserialize JSON bytes to dict
enable_auto_commit=True, # Automatically commit offsets
auto_offset_reset="earliest" # Start from beginning if no offset exists
)
# Connect to MongoDB Atlas
mongo = MongoClient(
"mongodb+srv://kimtryx_db_user:3100@cluster0.nds95yl.mongodb.net/?appname=Cluster0"
)
# Select database
col = mongo.fx
# Consume messages indefinitely
for msg in consumer:
topic = msg.topic
value = msg.value
# Route to appropriate collection based on topic
if topic == 'btc':
col.btc.insert_one(value)
elif topic == 'eth':
col.eth.insert_one(value)
elif topic == 'sol':
col.sol.insert_one(value)
print(f"💾 Saved to {topic}: {value}")
Implementation Highlights:
- Multi-Topic Subscription: Single consumer handles all three cryptocurrencies
- Automatic Deserialization: Lambda function converts JSON bytes back to Python dictionaries
- Auto-Commit: Kafka automatically tracks consumption progress
-
Offset Reset Strategy:
earliestensures no messages are missed on first run - Topic-Based Routing: Clean if-elif chain maps topics to MongoDB collections
- MongoDB Atlas: Cloud database eliminates local database management
Running the consumer:
# Install MongoDB driver
pip install pymongo
# Run the consumer (in a separate terminal from producer)
python consumer.py
# Expected output:
# 💾 Saved to btc: {'symbol': 'BTCUSDT', 'price': 67234.5}
# 💾 Saved to eth: {'symbol': 'ETHUSDT', 'price': 3456.78}
# 💾 Saved to sol: {'symbol': 'SOLUSDT', 'price': 145.23}
Verify data in MongoDB:
from pymongo import MongoClient
client = MongoClient("your_connection_string")
db = client.fx
# Count documents in each collection
print(f"BTC documents: {db.btc.count_documents({})}")
print(f"ETH documents: {db.eth.count_documents({})}")
print(f"SOL documents: {db.sol.count_documents({})}")
# Fetch latest BTC price
latest_btc = db.btc.find_one(sort=[('_id', -1)])
print(f"Latest BTC: ${latest_btc['price']:,.2f}")
Step 5: Enhanced Consumer with Error Handling
For production readiness, we add comprehensive error handling, logging, and retry mechanisms.
File: consumer_enhanced.py
from kafka import KafkaConsumer
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, OperationFailure
import json
import logging
import time
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def get_mongo_client(max_retries=3):
"""Establish MongoDB connection with retry logic"""
for attempt in range(max_retries):
try:
client = MongoClient(
"mongodb+srv://user:pass@cluster.mongodb.net/",
serverSelectionTimeoutMS=5000
)
# Verify connection
client.admin.command('ping')
logger.info("✓ MongoDB connection established")
return client
except ConnectionFailure as e:
logger.error(f"MongoDB connection attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
raise Exception("Failed to connect to MongoDB after retries")
def main():
# Initialize connections
try:
mongo = get_mongo_client()
db = mongo.fx
consumer = KafkaConsumer(
"btc", "eth", "sol",
bootstrap_servers=['localhost:9094'],
value_deserializer=lambda m: json.loads(m.decode()),
enable_auto_commit=True,
auto_offset_reset="earliest",
max_poll_interval_ms=300000 # 5 minutes
)
logger.info("✓ Kafka consumer initialized")
logger.info("Listening for messages...")
except Exception as e:
logger.error(f"Initialization failed: {e}")
return
# Process messages
try:
for msg in consumer:
try:
topic = msg.topic
value = msg.value
# Validate message
if not value or 'symbol' not in value or 'price' not in value:
logger.warning(f"Invalid message format: {value}")
continue
# Insert into appropriate collection
collection = getattr(db, topic)
result = collection.insert_one(value)
logger.info(
f"💾 {topic.upper()}: ${value['price']:,.2f} "
f"(ID: {result.inserted_id})"
)
except OperationFailure as e:
logger.error(f"MongoDB operation failed: {e}")
except Exception as e:
logger.error(f"Error processing message: {e}")
except KeyboardInterrupt:
logger.info("Shutting down consumer...")
finally:
consumer.close()
mongo.close()
logger.info("✓ Connections closed")
if __name__ == "__main__":
main()
Production Features:
- Connection Retry Logic: Exponential backoff for MongoDB connection failures
- Message Validation: Ensures required fields exist before processing
- Structured Logging: Timestamp and severity level for each log entry
- Graceful Shutdown: Properly closes connections on exit
- Error Isolation: Continues processing even if individual messages fail
- Health Checks: Verifies MongoDB connection on startup
Data Flow Process
Producer Flow
1. Fetch Data (data_gen.py)
├─ HTTP GET → Binance API
├─ Parse JSON response
└─ Extract symbol & price
2. Serialize (producer.py)
├─ Convert dict → JSON string
├─ Encode string → bytes
└─ Return serialized message
3. Publish to Kafka
├─ Route to topic (btc/eth/sol)
├─ Kafka appends to partition
└─ Return acknowledgment
4. Repeat every 3 seconds
Message Format:
{
"symbol": "BTCUSDT",
"price": 67234.50
}
Consumer Flow
1. Poll Kafka Topics
├─ Fetch batch of messages
├─ Deserialize bytes → JSON → dict
└─ Extract topic metadata
2. Route by Topic
├─ btc → db.fx.btc
├─ eth → db.fx.eth
└─ sol → db.fx.sol
3. Persist to MongoDB
├─ Insert document
├─ Auto-generate _id
└─ Return insert result
4. Commit Offset
└─ Update consumer position
MongoDB Document Structure:
{
"_id": ObjectId("507f1f77bcf86cd799439011"),
"symbol": "BTCUSDT",
"price": 67234.50
}
Performance Metrics
Throughput Analysis
# Calculate message throughput
messages_per_cycle = 3 # BTC, ETH, SOL
cycle_duration = 3 # seconds
messages_per_minute = (messages_per_cycle / cycle_duration) * 60
messages_per_hour = messages_per_minute * 60
print(f"Messages per minute: {messages_per_minute}") # 60
print(f"Messages per hour: {messages_per_hour}") # 3,600
print(f"Messages per day: {messages_per_hour * 24}") # 86,400
Storage Growth Estimation
import json
# Sample message size
sample_message = {"symbol": "BTCUSDT", "price": 67234.50}
message_size_bytes = len(json.dumps(sample_message).encode())
# Add MongoDB overhead (_id field + document structure)
document_size = message_size_bytes + 50 # ~50 bytes overhead
# Daily storage per cryptocurrency
messages_per_day = 28800 # 24 hours * 60 min * 20 messages/min
daily_storage_mb = (document_size * messages_per_day) / (1024 * 1024)
print(f"Message size: {message_size_bytes} bytes")
print(f"Document size: {document_size} bytes")
print(f"Daily storage per crypto: {daily_storage_mb:.2f} MB")
print(f"Monthly storage (all 3): {daily_storage_mb * 30 * 3:.2f} MB")
Expected Output:
Message size: 38 bytes
Document size: 88 bytes
Daily storage per crypto: 2.42 MB
Monthly storage (all 3): 217.80 MB
Testing & Validation
Unit Testing the Data Generator
# test_data_gen.py
import unittest
from data_gen import btc, eth, sol
class TestDataGenerators(unittest.TestCase):
def test_btc_returns_valid_data(self):
"""Test BTC generator returns correct structure"""
for data in btc():
self.assertIn('symbol', data)
self.assertIn('price', data)
self.assertEqual(data['symbol'], 'BTCUSDT')
self.assertIsInstance(data['price'], float)
self.assertGreater(data['price'], 0)
break # Test only first yield
def test_eth_returns_valid_data(self):
"""Test ETH generator returns correct structure"""
for data in eth():
self.assertEqual(data['symbol'], 'ETHUSDT')
self.assertIsInstance(data['price'], float)
break
def test_price_precision(self):
"""Verify price has proper decimal precision"""
for data in btc():
# Bitcoin prices should have 2 decimal places
price_str = f"{data['price']:.2f}"
self.assertEqual(len(price_str.split('.')[1]), 2)
break
if __name__ == '__main__':
unittest.main()
Integration Testing
# test_integration.py
from kafka import KafkaProducer, KafkaConsumer
import json
import time
def test_end_to_end():
"""Test complete producer-consumer flow"""
# Setup
producer = KafkaProducer(
bootstrap_servers=['localhost:9094'],
value_serializer=lambda v: json.dumps(v).encode()
)
consumer = KafkaConsumer(
'btc',
bootstrap_servers=['localhost:9094'],
value_deserializer=lambda m: json.loads(m.decode()),
auto_offset_reset='latest',
consumer_timeout_ms=5000
)
# Test data
test_message = {"symbol": "BTCUSDT", "price": 12345.67}
# Send message
producer.send('btc', test_message)
producer.flush()
print("✓ Message sent")
# Consume message
time.sleep(1) # Allow propagation
for msg in consumer:
received = msg.value
assert received['symbol'] == test_message['symbol']
assert received['price'] == test_message['price']
print("✓ Message received and validated")
break
# Cleanup
producer.close()
consumer.close()
print("✓ Test passed")
if __name__ == '__main__':
test_end_to_end()
Monitoring & Observability
Kafka Topic Monitoring
# Create topics manually (optional - auto-created by producer)
docker exec -it <kafka_container> kafka-topics \
--create --topic btc \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# List all topics
docker exec -it <kafka_container> kafka-topics \
--list --bootstrap-server localhost:9092
# Describe topic details
docker exec -it <kafka_container> kafka-topics \
--describe --topic btc \
--bootstrap-server localhost:9092
# Check consumer group lag
docker exec -it <kafka_container> kafka-consumer-groups \
--describe --group my-consumer-group \
--bootstrap-server localhost:9092
MongoDB Query Examples
from pymongo import MongoClient
from datetime import datetime, timedelta
client = MongoClient("your_connection_string")
db = client.fx
# Get latest prices
def get_latest_prices():
btc_latest = db.btc.find_one(sort=[('_id', -1)])
eth_latest = db.eth.find_one(sort=[('_id', -1)])
sol_latest = db.sol.find_one(sort=[('_id', -1)])
return {
'BTC': btc_latest['price'],
'ETH': eth_latest['price'],
'SOL': sol_latest['price']
}
# Calculate price statistics
def get_price_stats(symbol='btc', limit=100):
collection = getattr(db, symbol)
prices = [doc['price'] for doc in collection.find().limit(limit)]
return {
'min': min(prices),
'max': max(prices),
'avg': sum(prices) / len(prices),
'count': len(prices)
}
# Find price movements
def detect_price_jumps(symbol='btc', threshold=0.02):
"""Find price changes greater than threshold (2%)"""
collection = getattr(db, symbol)
docs = list(collection.find().sort('_id', -1).limit(100))
jumps = []
for i in range(len(docs) - 1):
current = docs[i]['price']
previous = docs[i + 1]['price']
change = abs(current - previous) / previous
if change > threshold:
jumps.append({
'from': previous,
'to': current,
'change_pct': change * 100
})
return jumps
# Usage
print(get_latest_prices())
print(get_price_stats('btc'))
print(detect_price_jumps('btc', threshold=0.01))
Deployment Guide
Prerequisites
# Install Docker and Docker Compose
sudo apt-get update
sudo apt-get install docker.io docker-compose
# Install Python dependencies
pip install kafka-python pymongo requests
# Verify installations
docker --version
docker-compose --version
python --version
Step-by-Step Deployment
# 1. Clone repository
git clone https://github.com/yourusername/crypto-pipeline.git
cd crypto-pipeline
# 2. Start infrastructure
docker-compose up -d
# 3. Wait for Kafka to be ready (30-60 seconds)
docker-compose logs -f kafka | grep "started"
# 4. Run producer (in terminal 1)
python producer.py
# 5. Run consumer (in terminal 2)
python consumer.py
# 6. Monitor with Kafdrop
# Open browser: http://localhost:9000
# 7. View metrics in Grafana
# Open browser: http://localhost:3000
# Default credentials: admin/admin
Troubleshooting Common Issues
# Issue: Kafka not starting
docker-compose logs kafka
# Solution: Ensure ports 9092, 9094 are not in use
# Issue: Consumer not receiving messages
docker exec -it <kafka_container> kafka-topics --list --bootstrap-server localhost:9092
# Solution: Verify topics exist
# Issue: MongoDB connection timeout
# Solution: Check connection string, network access in Atlas
# Issue: Producer connection refused
# Solution: Wait for Kafka startup (~60 seconds)
# Check container health
docker-compose ps
docker stats
# Restart services
docker-compose restart kafka
docker-compose down && docker-compose up -d
Security Considerations
Current Implementation (Development)
- MongoDB credentials hardcoded (for demo purposes)
- Kafka running without authentication
- No TLS/SSL encryption
- Open ports on localhost
Production Hardening
# Use environment variables for credentials
import os
from dotenv import load_dotenv
load_dotenv()
MONGO_URI = os.getenv('MONGO_CONNECTION_STRING')
KAFKA_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9094')
KAFKA_USERNAME = os.getenv('KAFKA_USERNAME')
KAFKA_PASSWORD = os.getenv('KAFKA_PASSWORD')
# Secure MongoDB connection
from pymongo import MongoClient
from pymongo.encryption import ClientEncryption
client = MongoClient(
MONGO_URI,
tls=True,
tlsAllowInvalidCertificates=False
)
# Kafka with SASL authentication
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=[KAFKA_SERVERS],
security_protocol='SASL_SSL',
sasl_mechanism='PLAIN',
sasl_plain_username=KAFKA_USERNAME,
sasl_plain_password=KAFKA_PASSWORD,
value_serializer=lambda v: json.dumps(v).encode()
)
Environment Variables (.env file):
# .env (DO NOT COMMIT TO GIT)
MONGO_CONNECTION_STRING=mongodb+srv://user:pass@cluster.mongodb.net/
KAFKA_BOOTSTRAP_SERVERS=kafka.example.com:9094
KAFKA_USERNAME=your_username
KAFKA_PASSWORD=your_password
Future Enhancements
Phase 1: Reliability & Resilience
# Add circuit breaker for API calls
from pybreaker import CircuitBreaker
api_breaker = CircuitBreaker(fail_max=5, timeout_duration=60)
@api_breaker
def fetch_price(url):
response = requests.get(url, timeout=5)
response.raise_for_status()
return response.json()
# Implement retry logic with exponential backoff
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def publish_to_kafka(producer, topic, data):
future = producer.send(topic, data)
return future.get(timeout=10) # Block until send completes
Phase 2: Advanced Analytics
# Real-time moving average calculation
from collections import deque
class PriceAnalyzer:
def __init__(self, window_size=20):
self.prices = deque(maxlen=window_size)
def add_price(self, price):
self.prices.append(price)
def get_moving_average(self):
return sum(self.prices) / len(self.prices) if self.prices else 0
def get_volatility(self):
if len(self.prices) < 2:
return 0
avg = self.get_moving_average()
variance = sum((p - avg) ** 2 for p in self.prices) / len(self.prices)
return variance ** 0.5
# Usage in consumer
analyzer = PriceAnalyzer()
for msg in consumer:
price = msg.value['price']
analyzer.add_price(price)
if len(analyzer.prices) == 20:
ma = analyzer.get_moving_average()
vol = analyzer.get_volatility()
print(f"MA: ${ma:.2f}, Volatility: ${vol:.2f}")
Phase 3: Scalability
# Multi-threaded producer for higher throughput
from concurrent.futures import ThreadPoolExecutor
def produce_crypto_data(crypto_func, topic, producer):
for data in crypto_func():
producer.send(topic, data)
with ThreadPoolExecutor(max_workers=3) as executor:
executor.submit(produce_crypto_data, btc, 'btc', producer)
executor.submit(produce_crypto_data, eth, 'eth', producer)
executor.submit(produce_crypto_data, sol, 'sol', producer)
Conclusion
This cryptocurrency data pipeline demonstrates a production-ready approach to real-time financial data engineering, combining industry-standard technologies with cloud-native practices to deliver a scalable, maintainable, and extensible solution.
Key Achievements
✅ Real-time data ingestion from external APIs
✅ Event-driven architecture with Kafka
✅ Persistent storage in cloud database
✅ Containerized deployment
✅ Monitoring and observability
✅ Modular, testable codebase
Lessons Learned
- Decoupling is Critical: Separating producers and consumers enables independent scaling and fault tolerance
- Message Ordering: Kafka partitions guarantee ordering within a partition, essential for time-series data
- Schema Evolution: JSON flexibility allows easy field additions without breaking consumers
- Monitoring is Essential: Kafdrop and logging provide crucial visibility into system behavior
Project Metrics
- Lines of Code: ~150 (excluding configuration)
- Response Time: <1 second from API to database
- Throughput: 3,600 messages/hour
- Storage Efficiency: ~218 MB/month for 3 cryptocurrencies
- Uptime Target: 99.9% (with proper error handling)
Repository Structure
crypto-pipeline/
Top comments (0)