DEV Community

Lagat Josiah
Lagat Josiah

Posted on

Real-Time Cryptocurrency Data Pipeline

Real-Time Cryptocurrency Data Pipeline

Streaming Analytics Platform for Digital Asset Monitoring

Enterprise Data Engineering Solution • Kafka • MongoDB • Docker • Python


Executive Summary

This project implements a robust, scalable real-time data pipeline designed to continuously monitor and store cryptocurrency price data from leading digital assets including Bitcoin (BTC), Ethereum (ETH), and Solana (SOL).

Key Objective

Enable real-time financial data ingestion, processing, and persistence for analytics, monitoring, and decision-making applications in the cryptocurrency market.

Core Capabilities

  • Real-time data acquisition from Binance API
  • Event-driven architecture using Apache Kafka
  • Scalable NoSQL data persistence with MongoDB Atlas
  • Containerized deployment for portability and consistency
  • Monitoring and observability with Kafdrop and Grafana

System Architecture

┌─────────────┐     ┌──────────┐     ┌────────────┐     ┌──────────┐     ┌──────────────┐
│   Binance   │────▶│ Producer │────▶│   Kafka    │────▶│ Consumer │────▶│   MongoDB    │
│     API     │     │ (Python) │     │   Broker   │     │ (Python) │     │    Atlas     │
└─────────────┘     └──────────┘     └────────────┘     └──────────┘     └──────────────┘
                                      Topics: BTC                           Collections:
                                             ETH                            - btc
                                             SOL                            - eth
                                                                            - sol
Enter fullscreen mode Exit fullscreen mode

Architecture Highlights

  • Decoupled Design: Producer and consumer operate independently, ensuring fault tolerance and scalability
  • Topic-Based Routing: Separate Kafka topics for each cryptocurrency enable parallel processing
  • Cloud-Native Storage: MongoDB Atlas provides global accessibility and automated backups
  • Containerization: Docker Compose orchestrates all infrastructure components

Technology Stack

Technology Purpose Version
Apache Kafka Distributed streaming platform for high-throughput message brokering 7.6.1
MongoDB Atlas Cloud-hosted NoSQL database with automatic scaling Latest
Python Core application logic with kafka-python and pymongo 3.8+
Docker & Compose Container orchestration for consistent deployment 20.10+
Kafdrop Web UI for monitoring Kafka topics and consumers Latest
Grafana Observability platform for metrics visualization Latest

Development Process

Step 1: Infrastructure Setup with Docker Compose

First, we set up the complete infrastructure using Docker Compose. This creates isolated, reproducible environments for Kafka, Zookeeper, and monitoring tools.

File: docker-compose.yml

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.6.1
    depends_on: [zookeeper]
    ports:
      - "9092:9092"  # Internal listener
      - "9094:9094"  # External listener for producers/consumers
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka:9092"
    depends_on: [kafka]

  grafana:
    image: grafana/grafana-enterprise
    container_name: grafana
    restart: unless-stopped
    ports:
      - '3000:3000'
Enter fullscreen mode Exit fullscreen mode

Start the infrastructure:

# Start all services
docker-compose up -d

# Verify services are running
docker-compose ps

# Check Kafka logs
docker-compose logs kafka

# Access Kafdrop at http://localhost:9000
# Access Grafana at http://localhost:3000
Enter fullscreen mode Exit fullscreen mode

Step 2: Data Generation Module

We created modular generator functions that fetch real-time cryptocurrency prices from the Binance API. Each function is designed as a generator for potential future expansion (e.g., historical data, multiple exchanges).

File: data_gen.py

import requests
import time

def btc():
    """Fetch current Bitcoin (BTC) price from Binance API"""
    url = "https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT"
    response = requests.get(url)
    data = response.json()

    yield {
        "symbol": data['symbol'],
        "price": float(data['price'])
    }

def eth():
    """Fetch current Ethereum (ETH) price from Binance API"""
    url = "https://api.binance.com/api/v3/ticker/price?symbol=ETHUSDT"
    response = requests.get(url)
    data = response.json()

    yield {
        "symbol": data['symbol'],
        "price": float(data['price'])
    }

def sol():
    """Fetch current Solana (SOL) price from Binance API"""
    url = "https://api.binance.com/api/v3/ticker/price?symbol=SOLUSDT"
    response = requests.get(url)
    data = response.json()

    yield {
        "symbol": data['symbol'],
        "price": float(data['price'])
    }
Enter fullscreen mode Exit fullscreen mode

Design Decisions:

  • Generator Pattern: Using yield allows for easy extension to streaming multiple prices or batch fetching
  • Modular Functions: Separate functions for each cryptocurrency enable independent testing and maintenance
  • Type Conversion: Converting price to float ensures consistent numeric operations downstream
  • Direct API Call: Simple REST API approach suitable for 3-second polling intervals

Testing the data generators:

# Test individual generators
from data_gen import btc, eth, sol

# Test BTC price fetch
for price_data in btc():
    print(f"BTC Price: ${price_data['price']:,.2f}")

# Output: BTC Price: $67,234.50
Enter fullscreen mode Exit fullscreen mode

Step 3: Kafka Producer Implementation

The producer fetches cryptocurrency prices and publishes them to dedicated Kafka topics. This component demonstrates the "fire-and-forget" pattern with JSON serialization.

File: producer.py

from kafka import KafkaProducer 
from data_gen import btc, eth, sol
import json
import time

# Initialize Kafka Producer with JSON serialization
producer = KafkaProducer(
    bootstrap_servers=['localhost:9094'],  # Connect to external listener
    value_serializer=lambda v: json.dumps(v).encode()  # Serialize dict to JSON bytes
)

try:
    while True:
        # Fetch and publish BTC price
        for r in btc():
            producer.send("btc", r)
            print(f"✓ Sent BTC: {r}")

        # Fetch and publish ETH price
        for r in eth():
            producer.send("eth", r)
            print(f"✓ Sent ETH: {r}")

        # Fetch and publish SOL price
        for r in sol():
            producer.send("sol", r)
            print(f"✓ Sent SOL: {r}")

        time.sleep(3)  # Poll every 3 seconds

except KeyboardInterrupt:
    print("\n⚠ Producer stopped by user")
    producer.close()
Enter fullscreen mode Exit fullscreen mode

Key Implementation Details:

  • Bootstrap Servers: Connect to port 9094 (external listener) from host machine
  • Value Serializer: Lambda function converts Python dictionaries to JSON-encoded bytes
  • Topic Routing: Each cryptocurrency has its own topic for independent consumption
  • Polling Interval: 3-second sleep balances API rate limits with real-time requirements
  • Graceful Shutdown: KeyboardInterrupt handler ensures proper connection closure

Running the producer:

# Install required packages
pip install kafka-python requests

# Run the producer
python producer.py

# Expected output:
# ✓ Sent BTC: {'symbol': 'BTCUSDT', 'price': 67234.5}
# ✓ Sent ETH: {'symbol': 'ETHUSDT', 'price': 3456.78}
# ✓ Sent SOL: {'symbol': 'SOLUSDT', 'price': 145.23}
Enter fullscreen mode Exit fullscreen mode

Verify messages in Kafdrop:

  1. Open http://localhost:9000
  2. Click on btc, eth, or sol topics
  3. View messages in real-time

Step 4: Kafka Consumer with MongoDB Persistence

The consumer subscribes to all cryptocurrency topics, processes incoming messages, and persists them to MongoDB collections based on the topic name.

File: consumer.py

from kafka import KafkaConsumer
from pymongo import MongoClient
import json

# Initialize Kafka Consumer
consumer = KafkaConsumer(
    "btc", "eth", "sol",  # Subscribe to multiple topics
    bootstrap_servers=['localhost:9094'],
    value_deserializer=lambda m: json.loads(m.decode()),  # Deserialize JSON bytes to dict
    enable_auto_commit=True,  # Automatically commit offsets
    auto_offset_reset="earliest"  # Start from beginning if no offset exists
)

# Connect to MongoDB Atlas
mongo = MongoClient(
    "mongodb+srv://kimtryx_db_user:3100@cluster0.nds95yl.mongodb.net/?appname=Cluster0"
)

# Select database
col = mongo.fx

# Consume messages indefinitely
for msg in consumer:
    topic = msg.topic
    value = msg.value

    # Route to appropriate collection based on topic
    if topic == 'btc':
        col.btc.insert_one(value)
    elif topic == 'eth':
        col.eth.insert_one(value)
    elif topic == 'sol':
        col.sol.insert_one(value)

    print(f"💾 Saved to {topic}: {value}")
Enter fullscreen mode Exit fullscreen mode

Implementation Highlights:

  • Multi-Topic Subscription: Single consumer handles all three cryptocurrencies
  • Automatic Deserialization: Lambda function converts JSON bytes back to Python dictionaries
  • Auto-Commit: Kafka automatically tracks consumption progress
  • Offset Reset Strategy: earliest ensures no messages are missed on first run
  • Topic-Based Routing: Clean if-elif chain maps topics to MongoDB collections
  • MongoDB Atlas: Cloud database eliminates local database management

Running the consumer:

# Install MongoDB driver
pip install pymongo

# Run the consumer (in a separate terminal from producer)
python consumer.py

# Expected output:
# 💾 Saved to btc: {'symbol': 'BTCUSDT', 'price': 67234.5}
# 💾 Saved to eth: {'symbol': 'ETHUSDT', 'price': 3456.78}
# 💾 Saved to sol: {'symbol': 'SOLUSDT', 'price': 145.23}
Enter fullscreen mode Exit fullscreen mode

Verify data in MongoDB:

from pymongo import MongoClient

client = MongoClient("your_connection_string")
db = client.fx

# Count documents in each collection
print(f"BTC documents: {db.btc.count_documents({})}")
print(f"ETH documents: {db.eth.count_documents({})}")
print(f"SOL documents: {db.sol.count_documents({})}")

# Fetch latest BTC price
latest_btc = db.btc.find_one(sort=[('_id', -1)])
print(f"Latest BTC: ${latest_btc['price']:,.2f}")
Enter fullscreen mode Exit fullscreen mode

Step 5: Enhanced Consumer with Error Handling

For production readiness, we add comprehensive error handling, logging, and retry mechanisms.

File: consumer_enhanced.py

from kafka import KafkaConsumer
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, OperationFailure
import json
import logging
import time

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def get_mongo_client(max_retries=3):
    """Establish MongoDB connection with retry logic"""
    for attempt in range(max_retries):
        try:
            client = MongoClient(
                "mongodb+srv://user:pass@cluster.mongodb.net/",
                serverSelectionTimeoutMS=5000
            )
            # Verify connection
            client.admin.command('ping')
            logger.info("✓ MongoDB connection established")
            return client
        except ConnectionFailure as e:
            logger.error(f"MongoDB connection attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
    raise Exception("Failed to connect to MongoDB after retries")

def main():
    # Initialize connections
    try:
        mongo = get_mongo_client()
        db = mongo.fx

        consumer = KafkaConsumer(
            "btc", "eth", "sol",
            bootstrap_servers=['localhost:9094'],
            value_deserializer=lambda m: json.loads(m.decode()),
            enable_auto_commit=True,
            auto_offset_reset="earliest",
            max_poll_interval_ms=300000  # 5 minutes
        )

        logger.info("✓ Kafka consumer initialized")
        logger.info("Listening for messages...")

    except Exception as e:
        logger.error(f"Initialization failed: {e}")
        return

    # Process messages
    try:
        for msg in consumer:
            try:
                topic = msg.topic
                value = msg.value

                # Validate message
                if not value or 'symbol' not in value or 'price' not in value:
                    logger.warning(f"Invalid message format: {value}")
                    continue

                # Insert into appropriate collection
                collection = getattr(db, topic)
                result = collection.insert_one(value)

                logger.info(
                    f"💾 {topic.upper()}: ${value['price']:,.2f} "
                    f"(ID: {result.inserted_id})"
                )

            except OperationFailure as e:
                logger.error(f"MongoDB operation failed: {e}")
            except Exception as e:
                logger.error(f"Error processing message: {e}")

    except KeyboardInterrupt:
        logger.info("Shutting down consumer...")
    finally:
        consumer.close()
        mongo.close()
        logger.info("✓ Connections closed")

if __name__ == "__main__":
    main()
Enter fullscreen mode Exit fullscreen mode

Production Features:

  • Connection Retry Logic: Exponential backoff for MongoDB connection failures
  • Message Validation: Ensures required fields exist before processing
  • Structured Logging: Timestamp and severity level for each log entry
  • Graceful Shutdown: Properly closes connections on exit
  • Error Isolation: Continues processing even if individual messages fail
  • Health Checks: Verifies MongoDB connection on startup

Data Flow Process

Producer Flow

1. Fetch Data (data_gen.py)
   ├─ HTTP GET → Binance API
   ├─ Parse JSON response
   └─ Extract symbol & price

2. Serialize (producer.py)
   ├─ Convert dict → JSON string
   ├─ Encode string → bytes
   └─ Return serialized message

3. Publish to Kafka
   ├─ Route to topic (btc/eth/sol)
   ├─ Kafka appends to partition
   └─ Return acknowledgment

4. Repeat every 3 seconds
Enter fullscreen mode Exit fullscreen mode

Message Format:

{
  "symbol": "BTCUSDT",
  "price": 67234.50
}
Enter fullscreen mode Exit fullscreen mode

Consumer Flow

1. Poll Kafka Topics
   ├─ Fetch batch of messages
   ├─ Deserialize bytes → JSON → dict
   └─ Extract topic metadata

2. Route by Topic
   ├─ btc → db.fx.btc
   ├─ eth → db.fx.eth
   └─ sol → db.fx.sol

3. Persist to MongoDB
   ├─ Insert document
   ├─ Auto-generate _id
   └─ Return insert result

4. Commit Offset
   └─ Update consumer position
Enter fullscreen mode Exit fullscreen mode

MongoDB Document Structure:

{
  "_id": ObjectId("507f1f77bcf86cd799439011"),
  "symbol": "BTCUSDT",
  "price": 67234.50
}
Enter fullscreen mode Exit fullscreen mode

Performance Metrics

Throughput Analysis

# Calculate message throughput
messages_per_cycle = 3  # BTC, ETH, SOL
cycle_duration = 3  # seconds
messages_per_minute = (messages_per_cycle / cycle_duration) * 60
messages_per_hour = messages_per_minute * 60

print(f"Messages per minute: {messages_per_minute}")  # 60
print(f"Messages per hour: {messages_per_hour}")      # 3,600
print(f"Messages per day: {messages_per_hour * 24}")  # 86,400
Enter fullscreen mode Exit fullscreen mode

Storage Growth Estimation

import json

# Sample message size
sample_message = {"symbol": "BTCUSDT", "price": 67234.50}
message_size_bytes = len(json.dumps(sample_message).encode())

# Add MongoDB overhead (_id field + document structure)
document_size = message_size_bytes + 50  # ~50 bytes overhead

# Daily storage per cryptocurrency
messages_per_day = 28800  # 24 hours * 60 min * 20 messages/min
daily_storage_mb = (document_size * messages_per_day) / (1024 * 1024)

print(f"Message size: {message_size_bytes} bytes")
print(f"Document size: {document_size} bytes")
print(f"Daily storage per crypto: {daily_storage_mb:.2f} MB")
print(f"Monthly storage (all 3): {daily_storage_mb * 30 * 3:.2f} MB")
Enter fullscreen mode Exit fullscreen mode

Expected Output:

Message size: 38 bytes
Document size: 88 bytes
Daily storage per crypto: 2.42 MB
Monthly storage (all 3): 217.80 MB
Enter fullscreen mode Exit fullscreen mode

Testing & Validation

Unit Testing the Data Generator

# test_data_gen.py
import unittest
from data_gen import btc, eth, sol

class TestDataGenerators(unittest.TestCase):

    def test_btc_returns_valid_data(self):
        """Test BTC generator returns correct structure"""
        for data in btc():
            self.assertIn('symbol', data)
            self.assertIn('price', data)
            self.assertEqual(data['symbol'], 'BTCUSDT')
            self.assertIsInstance(data['price'], float)
            self.assertGreater(data['price'], 0)
            break  # Test only first yield

    def test_eth_returns_valid_data(self):
        """Test ETH generator returns correct structure"""
        for data in eth():
            self.assertEqual(data['symbol'], 'ETHUSDT')
            self.assertIsInstance(data['price'], float)
            break

    def test_price_precision(self):
        """Verify price has proper decimal precision"""
        for data in btc():
            # Bitcoin prices should have 2 decimal places
            price_str = f"{data['price']:.2f}"
            self.assertEqual(len(price_str.split('.')[1]), 2)
            break

if __name__ == '__main__':
    unittest.main()
Enter fullscreen mode Exit fullscreen mode

Integration Testing

# test_integration.py
from kafka import KafkaProducer, KafkaConsumer
import json
import time

def test_end_to_end():
    """Test complete producer-consumer flow"""

    # Setup
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9094'],
        value_serializer=lambda v: json.dumps(v).encode()
    )

    consumer = KafkaConsumer(
        'btc',
        bootstrap_servers=['localhost:9094'],
        value_deserializer=lambda m: json.loads(m.decode()),
        auto_offset_reset='latest',
        consumer_timeout_ms=5000
    )

    # Test data
    test_message = {"symbol": "BTCUSDT", "price": 12345.67}

    # Send message
    producer.send('btc', test_message)
    producer.flush()
    print("✓ Message sent")

    # Consume message
    time.sleep(1)  # Allow propagation
    for msg in consumer:
        received = msg.value
        assert received['symbol'] == test_message['symbol']
        assert received['price'] == test_message['price']
        print("✓ Message received and validated")
        break

    # Cleanup
    producer.close()
    consumer.close()
    print("✓ Test passed")

if __name__ == '__main__':
    test_end_to_end()
Enter fullscreen mode Exit fullscreen mode

Monitoring & Observability

Kafka Topic Monitoring

# Create topics manually (optional - auto-created by producer)
docker exec -it <kafka_container> kafka-topics \
  --create --topic btc \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1

# List all topics
docker exec -it <kafka_container> kafka-topics \
  --list --bootstrap-server localhost:9092

# Describe topic details
docker exec -it <kafka_container> kafka-topics \
  --describe --topic btc \
  --bootstrap-server localhost:9092

# Check consumer group lag
docker exec -it <kafka_container> kafka-consumer-groups \
  --describe --group my-consumer-group \
  --bootstrap-server localhost:9092
Enter fullscreen mode Exit fullscreen mode

MongoDB Query Examples

from pymongo import MongoClient
from datetime import datetime, timedelta

client = MongoClient("your_connection_string")
db = client.fx

# Get latest prices
def get_latest_prices():
    btc_latest = db.btc.find_one(sort=[('_id', -1)])
    eth_latest = db.eth.find_one(sort=[('_id', -1)])
    sol_latest = db.sol.find_one(sort=[('_id', -1)])

    return {
        'BTC': btc_latest['price'],
        'ETH': eth_latest['price'],
        'SOL': sol_latest['price']
    }

# Calculate price statistics
def get_price_stats(symbol='btc', limit=100):
    collection = getattr(db, symbol)
    prices = [doc['price'] for doc in collection.find().limit(limit)]

    return {
        'min': min(prices),
        'max': max(prices),
        'avg': sum(prices) / len(prices),
        'count': len(prices)
    }

# Find price movements
def detect_price_jumps(symbol='btc', threshold=0.02):
    """Find price changes greater than threshold (2%)"""
    collection = getattr(db, symbol)
    docs = list(collection.find().sort('_id', -1).limit(100))

    jumps = []
    for i in range(len(docs) - 1):
        current = docs[i]['price']
        previous = docs[i + 1]['price']
        change = abs(current - previous) / previous

        if change > threshold:
            jumps.append({
                'from': previous,
                'to': current,
                'change_pct': change * 100
            })

    return jumps

# Usage
print(get_latest_prices())
print(get_price_stats('btc'))
print(detect_price_jumps('btc', threshold=0.01))
Enter fullscreen mode Exit fullscreen mode

Deployment Guide

Prerequisites

# Install Docker and Docker Compose
sudo apt-get update
sudo apt-get install docker.io docker-compose

# Install Python dependencies
pip install kafka-python pymongo requests

# Verify installations
docker --version
docker-compose --version
python --version
Enter fullscreen mode Exit fullscreen mode

Step-by-Step Deployment

# 1. Clone repository
git clone https://github.com/yourusername/crypto-pipeline.git
cd crypto-pipeline

# 2. Start infrastructure
docker-compose up -d

# 3. Wait for Kafka to be ready (30-60 seconds)
docker-compose logs -f kafka | grep "started"

# 4. Run producer (in terminal 1)
python producer.py

# 5. Run consumer (in terminal 2)
python consumer.py

# 6. Monitor with Kafdrop
# Open browser: http://localhost:9000

# 7. View metrics in Grafana
# Open browser: http://localhost:3000
# Default credentials: admin/admin
Enter fullscreen mode Exit fullscreen mode

Troubleshooting Common Issues

# Issue: Kafka not starting
docker-compose logs kafka
# Solution: Ensure ports 9092, 9094 are not in use

# Issue: Consumer not receiving messages
docker exec -it <kafka_container> kafka-topics --list --bootstrap-server localhost:9092
# Solution: Verify topics exist

# Issue: MongoDB connection timeout
# Solution: Check connection string, network access in Atlas

# Issue: Producer connection refused
# Solution: Wait for Kafka startup (~60 seconds)

# Check container health
docker-compose ps
docker stats

# Restart services
docker-compose restart kafka
docker-compose down && docker-compose up -d
Enter fullscreen mode Exit fullscreen mode

Security Considerations

Current Implementation (Development)

  • MongoDB credentials hardcoded (for demo purposes)
  • Kafka running without authentication
  • No TLS/SSL encryption
  • Open ports on localhost

Production Hardening

# Use environment variables for credentials
import os
from dotenv import load_dotenv

load_dotenv()

MONGO_URI = os.getenv('MONGO_CONNECTION_STRING')
KAFKA_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9094')
KAFKA_USERNAME = os.getenv('KAFKA_USERNAME')
KAFKA_PASSWORD = os.getenv('KAFKA_PASSWORD')

# Secure MongoDB connection
from pymongo import MongoClient
from pymongo.encryption import ClientEncryption

client = MongoClient(
    MONGO_URI,
    tls=True,
    tlsAllowInvalidCertificates=False
)

# Kafka with SASL authentication
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=[KAFKA_SERVERS],
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    sasl_plain_username=KAFKA_USERNAME,
    sasl_plain_password=KAFKA_PASSWORD,
    value_serializer=lambda v: json.dumps(v).encode()
)
Enter fullscreen mode Exit fullscreen mode

Environment Variables (.env file):

# .env (DO NOT COMMIT TO GIT)
MONGO_CONNECTION_STRING=mongodb+srv://user:pass@cluster.mongodb.net/
KAFKA_BOOTSTRAP_SERVERS=kafka.example.com:9094
KAFKA_USERNAME=your_username
KAFKA_PASSWORD=your_password
Enter fullscreen mode Exit fullscreen mode

Future Enhancements

Phase 1: Reliability & Resilience

# Add circuit breaker for API calls
from pybreaker import CircuitBreaker

api_breaker = CircuitBreaker(fail_max=5, timeout_duration=60)

@api_breaker
def fetch_price(url):
    response = requests.get(url, timeout=5)
    response.raise_for_status()
    return response.json()

# Implement retry logic with exponential backoff
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def publish_to_kafka(producer, topic, data):
    future = producer.send(topic, data)
    return future.get(timeout=10)  # Block until send completes
Enter fullscreen mode Exit fullscreen mode

Phase 2: Advanced Analytics

# Real-time moving average calculation
from collections import deque

class PriceAnalyzer:
    def __init__(self, window_size=20):
        self.prices = deque(maxlen=window_size)

    def add_price(self, price):
        self.prices.append(price)

    def get_moving_average(self):
        return sum(self.prices) / len(self.prices) if self.prices else 0

    def get_volatility(self):
        if len(self.prices) < 2:
            return 0
        avg = self.get_moving_average()
        variance = sum((p - avg) ** 2 for p in self.prices) / len(self.prices)
        return variance ** 0.5

# Usage in consumer
analyzer = PriceAnalyzer()
for msg in consumer:
    price = msg.value['price']
    analyzer.add_price(price)

    if len(analyzer.prices) == 20:
        ma = analyzer.get_moving_average()
        vol = analyzer.get_volatility()
        print(f"MA: ${ma:.2f}, Volatility: ${vol:.2f}")
Enter fullscreen mode Exit fullscreen mode

Phase 3: Scalability

# Multi-threaded producer for higher throughput
from concurrent.futures import ThreadPoolExecutor

def produce_crypto_data(crypto_func, topic, producer):
    for data in crypto_func():
        producer.send(topic, data)

with ThreadPoolExecutor(max_workers=3) as executor:
    executor.submit(produce_crypto_data, btc, 'btc', producer)
    executor.submit(produce_crypto_data, eth, 'eth', producer)
    executor.submit(produce_crypto_data, sol, 'sol', producer)
Enter fullscreen mode Exit fullscreen mode

Conclusion

This cryptocurrency data pipeline demonstrates a production-ready approach to real-time financial data engineering, combining industry-standard technologies with cloud-native practices to deliver a scalable, maintainable, and extensible solution.

Key Achievements

✅ Real-time data ingestion from external APIs

✅ Event-driven architecture with Kafka

✅ Persistent storage in cloud database

✅ Containerized deployment

✅ Monitoring and observability

✅ Modular, testable codebase

Lessons Learned

  • Decoupling is Critical: Separating producers and consumers enables independent scaling and fault tolerance
  • Message Ordering: Kafka partitions guarantee ordering within a partition, essential for time-series data
  • Schema Evolution: JSON flexibility allows easy field additions without breaking consumers
  • Monitoring is Essential: Kafdrop and logging provide crucial visibility into system behavior

Project Metrics

  • Lines of Code: ~150 (excluding configuration)
  • Response Time: <1 second from API to database
  • Throughput: 3,600 messages/hour
  • Storage Efficiency: ~218 MB/month for 3 cryptocurrencies
  • Uptime Target: 99.9% (with proper error handling)

Repository Structure



crypto-pipeline/
Enter fullscreen mode Exit fullscreen mode

Top comments (0)