Lagat Josiah
Crypto Real-Time Data Pipeline

CH02-2025 Project 101: Crypto Real-Time Data Pipeline

Complete Implementation Summary


🎯 Project Overview

A production-ready real-time data pipeline that:

  • Ingests cryptocurrency data from Binance API
  • Stores in MySQL (Aiven Cloud) with SSL
  • Replicates via CDC (Debezium) to Kafka
  • Stores time-series data in Cassandra
  • Visualizes metrics in Grafana dashboards

📋 Files Created (19 Files)

Configuration Files

  1. docker-compose.yml - Orchestrates 6 services (Cassandra, Zookeeper, Kafka, Kafka Connect, Grafana, Python app)
  2. Dockerfile - Python 3.12.3 + Java 17 for PySpark
  3. requirements.txt - 9 Python dependencies
  4. .env - Aiven MySQL credentials (secured)
  5. .gitignore - Excludes sensitive files

Python Application (src/)

  1. main.py - Main orchestrator with threading
  2. binance_api.py - API client for 5 endpoints
  3. mysql_handler.py - MySQL operations with SSL
  4. cassandra_handler.py - Cassandra operations + materialized views
  5. debezium_connector.py - CDC connector setup
  6. kafka_consumer.py - Kafka consumer + Cassandra writer
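
The orchestrator in main.py runs the ingest loop and the CDC consumer on separate threads. A minimal sketch of that pattern (function names and signatures here are illustrative, not the actual module contents):

```python
import threading

def run_periodic(task, interval_s, stop_event):
    """Run task repeatedly until stop_event is set, sleeping between runs."""
    while not stop_event.is_set():
        task()
        stop_event.wait(interval_s)  # sleeps, but wakes early on shutdown

def run_pipeline(fetch_and_store, consume_cdc, interval_s=60):
    stop = threading.Event()
    # Ingestion thread: Binance -> MySQL on the fetch interval
    ingest = threading.Thread(
        target=run_periodic, args=(fetch_and_store, interval_s, stop), daemon=True)
    # Consumer thread: Kafka -> Cassandra, polling continuously
    consumer = threading.Thread(
        target=run_periodic, args=(consume_cdc, 0.1, stop), daemon=True)
    ingest.start()
    consumer.start()
    return stop  # caller sets this Event to shut the pipeline down
```

Using daemon threads plus a shared `Event` keeps shutdown cooperative: setting the event stops both loops at their next wake-up.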

Grafana Configuration

  1. grafana/provisioning/datasources/cassandra.yml - Data source
  2. grafana/provisioning/dashboards/dashboard.yml - Dashboard provisioning
  3. grafana/provisioning/dashboards/crypto-dashboard.json - 5 visualizations

Documentation & Scripts

  1. README.md - Comprehensive documentation
  2. QUICKSTART.md - Step-by-step Windows/WSL2 guide
  3. setup.sh - Automated setup script
  4. verify.sh - Health check script
  5. test_binance.py - API connectivity test

πŸ—οΈ Architecture

```
┌─────────────┐
│ Binance API │
└──────┬──────┘
       │ HTTP REST
       ▼
┌─────────────────┐
│  Python ETL     │
│  (Every 60s)    │
└──────┬──────────┘
       │ MySQL Connector
       ▼
┌─────────────────┐
│  MySQL (Aiven)  │◄──────┐
│  SSL Required   │       │
└──────┬──────────┘       │
       │ Binlog           │
       ▼                  │
┌─────────────────┐       │
│   Debezium CDC  │       │ Primary
└──────┬──────────┘       │ Storage
       │ JSON             │
       ▼                  │
┌─────────────────┐       │
│  Kafka Topics   │       │
└──────┬──────────┘       │
       │                  │
       ▼                  │
┌─────────────────┐       │
│ Python Consumer │       │
└──────┬──────────┘       │
       │                  │
       ▼                  │
┌─────────────────┐       │
│   Cassandra     │◄──────┘
│  (Time Series)  │   Replicated
└──────┬──────────┘   Storage
       │
       ▼
┌─────────────────┐
│    Grafana      │
│   Dashboards    │
└─────────────────┘
```

✅ Phase Implementation

Phase A: Binance API Integration ✓

Endpoints Implemented:

  • /api/v3/ticker/24hr - 24-hour statistics (primary)
  • /api/v3/ticker/price - Latest prices
  • /api/v3/depth - Order book depth
  • /api/v3/trades - Recent trades
  • /api/v3/klines - Candlestick data
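
A dependency-free sketch of the primary endpoint call and the field mapping the pipeline stores (the actual binance_api.py uses the requests library; the standard library is used here to keep the example self-contained, and the field selection is illustrative):

```python
import json
from urllib.request import urlopen

BASE_URL = "https://api.binance.com"

def fetch_ticker_24h(base_url=BASE_URL):
    """GET /api/v3/ticker/24hr - 24h statistics for every trading pair."""
    with urlopen(f"{base_url}/api/v3/ticker/24hr", timeout=10) as resp:
        return json.load(resp)

def parse_ticker(raw):
    """Keep the fields the pipeline stores; Binance returns numbers as strings."""
    return {
        "symbol": raw["symbol"],
        "last_price": float(raw["lastPrice"]),
        "price_change_percent": float(raw["priceChangePercent"]),
        "volume": float(raw["volume"]),
        "high_price": float(raw["highPrice"]),
        "low_price": float(raw["lowPrice"]),
    }
```

Casting the string-encoded numerics up front keeps the MySQL insert path free of per-column conversions.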

MySQL Storage:

  • Table: ticker_24h (19 columns, 3 indexes)
  • Table: prices (4 columns, 2 indexes)
  • SSL connection enforced
  • Auto-reconnection handling
  • Bulk insert optimization
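
The bulk-insert optimization boils down to a single `executemany` round trip instead of one `INSERT` per row. Sketched here against sqlite3 so the example is self-contained; the real mysql_handler.py uses mysql-connector-python, which follows the same DB-API pattern but with `%s` placeholders:

```python
import sqlite3

def bulk_insert_prices(conn, rows):
    """One executemany round trip instead of N single INSERTs."""
    conn.executemany("INSERT INTO prices (symbol, price) VALUES (?, ?)", rows)
    conn.commit()

# Demo against an in-memory database
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prices (id INTEGER PRIMARY KEY, symbol TEXT, price REAL)")
bulk_insert_prices(conn, [("BTCUSDT", 97000.1), ("ETHUSDT", 3400.2)])
count = conn.execute("SELECT COUNT(*) FROM prices").fetchone()[0]
```

With ~600 rows per cycle, batching the insert also means the binlog (and therefore Debezium) sees one committed transaction per cycle.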

Phase B: Change Data Capture ✓

Debezium Configuration:

  • Connector: MySQL CDC (io.debezium.connector.mysql)
  • Mode: Initial snapshot + continuous streaming
  • Transform: ExtractNewRecordState (unwrap)
  • SSL: Required mode
  • Topics: Auto-created per table
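
The configuration above maps to a connector payload roughly like the following (a sketch: the connector name, `database.server.id`, and history-topic names are illustrative; the topic prefix is inferred from the Kafka topic names listed below):

```json
{
  "name": "mysql-crypto-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "<aiven-mysql-host>",
    "database.port": "24862",
    "database.user": "avnadmin",
    "database.password": "<from .env>",
    "database.server.id": "184054",
    "topic.prefix": "crypto_mysql_server",
    "database.include.list": "defaultdb",
    "table.include.list": "defaultdb.ticker_24h,defaultdb.prices",
    "database.ssl.mode": "required",
    "snapshot.mode": "initial",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
    "schema.history.internal.kafka.topic": "schema-changes.crypto",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}
```

POSTing this to the Kafka Connect REST API (`http://localhost:8083/connectors`) registers the connector; this is what debezium_connector.py automates.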

Kafka Setup:

  • Broker: Single node (scalable)
  • Zookeeper: Standalone
  • Topics: crypto_mysql_server.defaultdb.ticker_24h, crypto_mysql_server.defaultdb.prices
  • Retention: Default (7 days)

Consumer Implementation:

  • Group ID: crypto-cdc-consumer-group
  • Offset: Earliest
  • Auto-commit: Enabled
  • Deserializer: JSON
  • Error handling: Try-catch with logging
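
Because the `unwrap` transform flattens the Debezium envelope, the consumer mostly sees plain row dicts, but a defensive deserializer handles the enveloped shape too. A small sketch of that unwrapping step (function name is illustrative):

```python
import json

def extract_row(message_value: bytes):
    """Return the row carried by a Debezium CDC message.

    With ExtractNewRecordState the value is already the flattened row;
    without it, the row sits under payload.after (None for deletes).
    """
    event = json.loads(message_value)
    if isinstance(event, dict) and "payload" in event:
        payload = event["payload"]
        if isinstance(payload, dict) and "after" in payload:
            return payload["after"]
        return payload
    return event
```

The consumer then binds the returned dict to a prepared Cassandra `INSERT` statement.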

Phase C: Visualization ✓

Cassandra Schema:

  • Keyspace: crypto_data (SimpleStrategy, RF=1)
  • Tables: ticker_24h, prices
  • Partition key: symbol
  • Clustering: created_at DESC, id DESC
  • Materialized View: top_performers (by price_change_percent)
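
In CQL, the layout above looks roughly like this (a sketch: column names beyond those listed are elided, and the materialized-view column selection is illustrative):

```sql
CREATE KEYSPACE IF NOT EXISTS crypto_data
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS crypto_data.ticker_24h (
  symbol text,
  created_at timestamp,
  id bigint,
  last_price decimal,
  price_change_percent decimal,
  volume decimal,
  -- remaining columns mirror the MySQL table
  PRIMARY KEY ((symbol), created_at, id)
) WITH CLUSTERING ORDER BY (created_at DESC, id DESC);

CREATE MATERIALIZED VIEW IF NOT EXISTS crypto_data.top_performers AS
  SELECT symbol, created_at, id, price_change_percent, last_price
  FROM crypto_data.ticker_24h
  WHERE symbol IS NOT NULL AND created_at IS NOT NULL
    AND id IS NOT NULL AND price_change_percent IS NOT NULL
  PRIMARY KEY ((price_change_percent), symbol, created_at, id);
```

Partitioning by `symbol` keeps each pair's history on one partition, and the `DESC` clustering order makes "latest rows first" reads cheap for the dashboards.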

Grafana Dashboards:

  1. Top 5 Performers - Table showing 24h % gainers
  2. Price Change Distribution - Histogram of all changes
  3. Latest Price Trends - Time series for BTC/ETH/BNB
  4. Volume Leaders - Bar chart of highest volumes
  5. Price Volatility - Gauge showing high-low spread

🔧 Technical Specifications

Docker Services

| Service | Image | Ports | Purpose |
|---|---|---|---|
| cassandra | cassandra:4.1 | 9042 | Time-series storage |
| zookeeper | cp-zookeeper:7.5.0 | 2181 | Kafka coordination |
| kafka | cp-kafka:7.5.0 | 9092, 29092 | Message broker |
| kafka-connect | debezium/connect:2.4 | 8083 | CDC connector |
| grafana | grafana:10.2.0 | 3000 | Visualization |
| crypto-pipeline | Custom (Python 3.12.3) | - | ETL application |
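
For orientation, two of the service definitions in docker-compose.yml look roughly like this (a sketch: environment values and topic names are illustrative, image tags match the table above):

```yaml
services:
  cassandra:
    image: cassandra:4.1
    ports:
      - "9042:9042"
  kafka-connect:
    image: debezium/connect:2.4
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: crypto-connect
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
    depends_on:
      - kafka
```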

Python Dependencies

```text
requests==2.31.0          # HTTP client
mysql-connector-python    # MySQL driver
cassandra-driver==3.29.0  # Cassandra driver
kafka-python==2.0.2       # Kafka client
pyspark==3.5.0            # Spark (ready for batch)
python-dotenv==1.0.0      # Environment variables
pandas==2.1.4             # Data manipulation
schedule==1.2.0           # Task scheduling
psutil==5.9.6             # System utilities
```

Environment Variables

```bash
# MySQL (Aiven)
MYSQL_HOST=mysql-3cea0949-lagatkjosiah-692c.e.aivencloud.com
MYSQL_PORT=24862
MYSQL_USER=avnadmin
MYSQL_PASSWORD=<your-aiven-password>
MYSQL_DATABASE=defaultdb

# Binance
BINANCE_API_BASE_URL=https://api.binance.com

# Application
FETCH_INTERVAL_SECONDS=60
LOG_LEVEL=INFO
```

📊 Data Flow

Ingestion Cycle (Every 60 seconds)

  1. T+0s: Fetch from Binance API (~2000 trading pairs)
  2. T+1s: Filter USDT pairs (~600 pairs)
  3. T+2s: Bulk insert to MySQL (2 tables)
  4. T+3s: MySQL binlog updated
  5. T+4s: Debezium detects changes
  6. T+5s: Publishes to Kafka topics
  7. T+6s: Python consumer reads messages
  8. T+7s: Writes to Cassandra
  9. T+8s: Grafana queries updated data
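
The first steps of the cycle can be sketched as a small, testable function (names are illustrative; the actual loop lives in main.py):

```python
import time

def filter_usdt_pairs(tickers):
    """Step 2 of the cycle: keep only USDT-quoted pairs."""
    return [t for t in tickers if t["symbol"].endswith("USDT")]

def run_cycle(fetch, store, interval_s=60):
    """One fetch -> filter -> store cycle, then sleep out the remainder."""
    started = time.monotonic()
    store(filter_usdt_pairs(fetch()))
    elapsed = time.monotonic() - started
    time.sleep(max(0.0, interval_s - elapsed))
```

Subtracting the elapsed work time from the sleep keeps the cycle anchored to the 60-second cadence rather than drifting by a few seconds per iteration.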

Data Volumes (Estimated)

  • API Response: ~600 USDT pairs
  • MySQL Inserts: ~600 rows/minute
  • Kafka Messages: ~600 messages/minute
  • Cassandra Writes: ~600 rows/minute
  • Daily Volume: ~864,000 records/day
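
The daily figure follows directly from the per-minute rate:

```python
# Back-of-envelope check for the estimates above
rows_per_minute = 600
minutes_per_day = 24 * 60                        # 1,440
daily_rows = rows_per_minute * minutes_per_day
print(daily_rows)                                # 864000
```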

🚀 Deployment Instructions

Prerequisites Check

```bash
# Windows WSL2
wsl --version

# Docker
docker --version  # 28.2.2
docker-compose --version  # v2.40.1

# Python
python --version  # 3.12.3

# Java
java --version  # JDK 17
```

Installation Steps

```bash
# 1. Create project
mkdir crypto-pipeline && cd crypto-pipeline

# 2. Create structure
mkdir -p src grafana/provisioning/{datasources,dashboards} logs

# 3. Copy all 19 files from artifacts

# 4. Make scripts executable
chmod +x setup.sh verify.sh test_binance.py

# 5. Test API connectivity
python test_binance.py

# 6. Start pipeline
./setup.sh

# 7. Verify installation
./verify.sh

# 8. Access Grafana
open http://localhost:3000
```

πŸ” Monitoring & Maintenance

Health Checks

```bash
# All services
docker-compose ps

# Application logs
docker-compose logs -f crypto-pipeline

# Debezium status
curl http://localhost:8083/connectors/mysql-crypto-connector/status | jq

# Cassandra data count
docker exec cassandra cqlsh -e "SELECT COUNT(*) FROM crypto_data.ticker_24h;"

# Kafka topics
docker exec kafka kafka-topics --list --bootstrap-server localhost:9092
```

Common Operations


```bash
# Restart specific service
docker-compose restart crypto-pipeline

# Update code
docker-compose build crypto-pipeline
docker-compose up -d crypto-pipeline

# Clean restart (wipes volumes, rebuilds images)
docker-compose down -v
docker-compose up -d --build
```
