CH02-2025 Project 101: Crypto Real-Time Data Pipeline
Complete Implementation Summary
🎯 Project Overview
A production-ready real-time data pipeline that:
- Ingests cryptocurrency data from Binance API
- Stores in MySQL (Aiven Cloud) with SSL
- Replicates via CDC (Debezium) to Kafka
- Stores time-series data in Cassandra
- Visualizes metrics in Grafana dashboards
📁 Files Created (19 Files)
Configuration Files
- docker-compose.yml - Orchestrates 6 services (Cassandra, Zookeeper, Kafka, Kafka Connect, Grafana, Python app)
- Dockerfile - Python 3.12.3 + Java 17 for PySpark
- requirements.txt - 9 Python dependencies
- .env - Aiven MySQL credentials (secured)
- .gitignore - Excludes sensitive files
Python Application (src/)
- main.py - Main orchestrator with threading
- binance_api.py - API client for 5 endpoints
- mysql_handler.py - MySQL operations with SSL
- cassandra_handler.py - Cassandra operations + materialized views
- debezium_connector.py - CDC connector setup
- kafka_consumer.py - Kafka consumer + Cassandra writer
Grafana Configuration
- grafana/provisioning/datasources/cassandra.yml - Data source
- grafana/provisioning/dashboards/dashboard.yml - Dashboard provisioning
- grafana/provisioning/dashboards/crypto-dashboard.json - 5 visualizations
Documentation & Scripts
- README.md - Comprehensive documentation
- QUICKSTART.md - Step-by-step Windows/WSL2 guide
- setup.sh - Automated setup script
- verify.sh - Health check script
- test_binance.py - API connectivity test
🏗️ Architecture
```
┌─────────────┐
│ Binance API │
└──────┬──────┘
       │ HTTP REST
       ▼
┌─────────────────┐
│   Python ETL    │
│   (Every 60s)   │
└──────┬──────────┘
       │ MySQL Connector
       ▼
┌─────────────────┐
│  MySQL (Aiven)  │───────┐
│  SSL Required   │       │
└──────┬──────────┘       │
       │ Binlog           │
       ▼                  │
┌─────────────────┐       │
│  Debezium CDC   │       │ Primary
└──────┬──────────┘       │ Storage
       │ JSON             │
       ▼                  │
┌─────────────────┐       │
│  Kafka Topics   │       │
└──────┬──────────┘       │
       │                  │
       ▼                  │
┌─────────────────┐       │
│ Python Consumer │       │
└──────┬──────────┘       │
       │                  │
       ▼                  │
┌─────────────────┐       │
│    Cassandra    │◄──────┘
│  (Time Series)  │  Replicated
└──────┬──────────┘  Storage
       │
       ▼
┌─────────────────┐
│     Grafana     │
│   Dashboards    │
└─────────────────┘
```
✅ Phase Implementation
Phase A: Binance API Integration ✅
Endpoints Implemented:
- `/api/v3/ticker/24hr` - 24-hour statistics (primary)
- `/api/v3/ticker/price` - Latest prices
- `/api/v3/depth` - Order book depth
- `/api/v3/trades` - Recent trades
- `/api/v3/klines` - Candlestick data
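A minimal sketch of the primary fetch path, using the `requests` client from requirements.txt. The function names are illustrative, not necessarily those in `binance_api.py`:

```python
import requests

BASE_URL = "https://api.binance.com"  # BINANCE_API_BASE_URL from .env

def fetch_ticker_24h():
    """Fetch 24-hour statistics for all trading pairs (~2000)."""
    resp = requests.get(f"{BASE_URL}/api/v3/ticker/24hr", timeout=10)
    resp.raise_for_status()
    return resp.json()

def filter_usdt_pairs(tickers):
    """Keep only USDT-quoted pairs (~600)."""
    return [t for t in tickers if t["symbol"].endswith("USDT")]

if __name__ == "__main__":
    usdt = filter_usdt_pairs(fetch_ticker_24h())
    print(f"{len(usdt)} USDT pairs fetched")
```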
MySQL Storage:
- Table: `ticker_24h` (19 columns, 3 indexes)
- Table: `prices` (4 columns, 2 indexes)
- SSL connection enforced
- Auto-reconnection handling
- Bulk insert optimization
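A sketch of the SSL-enforced bulk insert with `mysql-connector-python`; the column list here is a hypothetical subset of the real 19-column `ticker_24h` schema:

```python
import os
import mysql.connector

# Hypothetical column subset of the real ticker_24h schema.
INSERT_SQL = """
    INSERT INTO ticker_24h (symbol, last_price, price_change_percent, volume)
    VALUES (%s, %s, %s, %s)
"""

def bulk_insert_tickers(rows):
    conn = mysql.connector.connect(
        host=os.environ["MYSQL_HOST"],
        port=int(os.environ["MYSQL_PORT"]),
        user=os.environ["MYSQL_USER"],
        password=os.environ["MYSQL_PASSWORD"],
        database=os.environ["MYSQL_DATABASE"],
        ssl_disabled=False,  # Aiven requires SSL
    )
    try:
        cur = conn.cursor()
        # executemany sends the ~600 rows as one batched statement
        cur.executemany(INSERT_SQL, [
            (t["symbol"], t["lastPrice"], t["priceChangePercent"], t["volume"])
            for t in rows
        ])
        conn.commit()
    finally:
        conn.close()
```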
Phase B: Change Data Capture ✅
Debezium Configuration:
- Connector: MySQL CDC (io.debezium.connector.mysql)
- Mode: Initial snapshot + continuous streaming
- Transform: ExtractNewRecordState (unwrap)
- SSL: Required mode
- Topics: Auto-created per table
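Registering a connector with this shape against the Kafka Connect REST API might look as follows; the authoritative config lives in `debezium_connector.py`, so treat the server ID, hostname placeholder, and schema-history topic here as illustrative:

```python
import requests

connector = {
    "name": "mysql-crypto-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "<aiven-mysql-host>",
        "database.port": "24862",
        "database.user": "avnadmin",
        "database.password": "<redacted>",
        "database.server.id": "184054",         # any unique numeric ID
        "topic.prefix": "crypto_mysql_server",  # topic names start with this
        "database.include.list": "defaultdb",
        "database.ssl.mode": "required",
        "snapshot.mode": "initial",             # snapshot, then stream the binlog
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
        "schema.history.internal.kafka.topic": "schema-changes.crypto",
    },
}

resp = requests.post("http://localhost:8083/connectors", json=connector, timeout=30)
resp.raise_for_status()
```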
Kafka Setup:
- Broker: Single node (scalable)
- Zookeeper: Standalone
- Topics: `crypto_mysql_server.defaultdb.ticker_24h`, `crypto_mysql_server.defaultdb.prices`
- Retention: Default (7 days)
Consumer Implementation:
- Group ID: crypto-cdc-consumer-group
- Offset: Earliest
- Auto-commit: Enabled
- Deserializer: JSON
- Error handling: Try-catch with logging
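A sketch of the consume-and-write loop under these settings, using `kafka-python` and `cassandra-driver` from requirements.txt. The Cassandra column names are assumed, and the deserializer presumes the connector's JSON converter emits bare rows (schemas disabled):

```python
import json
import logging

from cassandra.cluster import Cluster
from kafka import KafkaConsumer

TOPICS = [
    "crypto_mysql_server.defaultdb.ticker_24h",
    "crypto_mysql_server.defaultdb.prices",
]

consumer = KafkaConsumer(
    *TOPICS,
    bootstrap_servers="kafka:29092",  # internal listener from docker-compose (assumed)
    group_id="crypto-cdc-consumer-group",
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    value_deserializer=lambda v: json.loads(v.decode("utf-8")) if v else None,
)

session = Cluster(["cassandra"]).connect("crypto_data")

for msg in consumer:
    if msg.value is None:  # tombstone emitted for deletes
        continue
    try:
        row = msg.value  # already flattened by ExtractNewRecordState
        session.execute(
            "INSERT INTO ticker_24h (symbol, created_at, id, last_price) "
            "VALUES (%s, %s, %s, %s)",
            (row["symbol"], row["created_at"], row["id"], row["last_price"]),
        )
    except Exception:
        logging.exception("Failed to write CDC record")  # log and keep consuming
```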
Phase C: Visualization ✅
Cassandra Schema:
- Keyspace: crypto_data (SimpleStrategy, RF=1)
- Tables: ticker_24h, prices
- Partition key: symbol
- Clustering: created_at DESC, id DESC
- Materialized View: top_performers (by price_change_percent)
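A sketch of the schema bootstrap via `cassandra-driver`; the real `cassandra_handler.py` defines more columns, and the materialized-view definition here is illustrative:

```python
from cassandra.cluster import Cluster

session = Cluster(["cassandra"]).connect()

session.execute("""
    CREATE KEYSPACE IF NOT EXISTS crypto_data
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
""")

# Partition by symbol; cluster newest-first so "latest row" reads are cheap.
session.execute("""
    CREATE TABLE IF NOT EXISTS crypto_data.ticker_24h (
        symbol text,
        created_at timestamp,
        id bigint,
        last_price decimal,
        price_change_percent decimal,
        volume decimal,
        PRIMARY KEY ((symbol), created_at, id)
    ) WITH CLUSTERING ORDER BY (created_at DESC, id DESC)
""")

# The view re-clusters the same rows by 24h % change; Cassandra requires
# every view primary-key column to be filtered IS NOT NULL.
session.execute("""
    CREATE MATERIALIZED VIEW IF NOT EXISTS crypto_data.top_performers AS
    SELECT symbol, created_at, id, price_change_percent, last_price
    FROM crypto_data.ticker_24h
    WHERE symbol IS NOT NULL AND created_at IS NOT NULL
      AND id IS NOT NULL AND price_change_percent IS NOT NULL
    PRIMARY KEY ((symbol), price_change_percent, created_at, id)
""")
```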
Grafana Dashboards:
- Top 5 Performers - Table showing 24h % gainers
- Price Change Distribution - Histogram of all changes
- Latest Price Trends - Time series for BTC/ETH/BNB
- Volume Leaders - Bar chart of highest volumes
- Price Volatility - Gauge showing high-low spread
🔧 Technical Specifications
Docker Services
| Service | Image | Ports | Purpose |
|---|---|---|---|
| cassandra | cassandra:4.1 | 9042 | Time-series storage |
| zookeeper | cp-zookeeper:7.5.0 | 2181 | Kafka coordination |
| kafka | cp-kafka:7.5.0 | 9092, 29092 | Message broker |
| kafka-connect | debezium/connect:2.4 | 8083 | CDC connector |
| grafana | grafana:10.2.0 | 3000 | Visualization |
| crypto-pipeline | Custom (Python 3.12.3) | - | ETL application |
Python Dependencies
```
requests==2.31.0          # HTTP client
mysql-connector-python    # MySQL driver
cassandra-driver==3.29.0  # Cassandra driver
kafka-python==2.0.2       # Kafka client
pyspark==3.5.0            # Spark (ready for batch)
python-dotenv==1.0.0      # Environment variables
pandas==2.1.4             # Data manipulation
schedule==1.2.0           # Task scheduling
psutil==5.9.6             # System utilities
```
Environment Variables
```bash
# MySQL (Aiven)
MYSQL_HOST=mysql-3cea0949-lagatkjosiah-692c.e.aivencloud.com
MYSQL_PORT=24862
MYSQL_USER=avnadmin
MYSQL_PASSWORD=<your-aiven-password>   # keep the real value out of version control
MYSQL_DATABASE=defaultdb

# Binance
BINANCE_API_BASE_URL=https://api.binance.com

# Application
FETCH_INTERVAL_SECONDS=60
LOG_LEVEL=INFO
```
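These are loaded at startup with `python-dotenv` (listed in requirements.txt); a minimal sketch:

```python
import os
from dotenv import load_dotenv

load_dotenv()  # reads .env from the project root

FETCH_INTERVAL = int(os.getenv("FETCH_INTERVAL_SECONDS", "60"))
MYSQL_HOST = os.environ["MYSQL_HOST"]  # fail fast if credentials are missing
```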
📊 Data Flow
Ingestion Cycle (Every 60 seconds)
- T+0s: Fetch from Binance API (~2000 trading pairs)
- T+1s: Filter USDT pairs (~600 pairs)
- T+2s: Bulk insert to MySQL (2 tables)
- T+3s: MySQL binlog updated
- T+4s: Debezium detects changes
- T+5s: Publishes to Kafka topics
- T+6s: Python consumer reads messages
- T+7s: Writes to Cassandra
- T+8s: Grafana queries updated data
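A minimal sketch of the cycle driver using the `schedule` library from requirements.txt, reusing the hypothetical fetch/insert helpers sketched earlier; the real `main.py` adds threading on top of this:

```python
import time

import schedule

from binance_api import fetch_ticker_24h, filter_usdt_pairs  # hypothetical names
from mysql_handler import bulk_insert_tickers                # hypothetical name

def run_cycle():
    """One ingestion cycle: fetch -> filter -> bulk insert.
    CDC (Debezium -> Kafka -> Cassandra) handles everything downstream."""
    tickers = filter_usdt_pairs(fetch_ticker_24h())
    bulk_insert_tickers(tickers)

schedule.every(60).seconds.do(run_cycle)  # FETCH_INTERVAL_SECONDS

run_cycle()  # run once immediately instead of waiting for the first tick
while True:
    schedule.run_pending()
    time.sleep(1)
```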
Data Volumes (Estimated)
- API Response: ~600 USDT pairs
- MySQL Inserts: ~600 rows/minute
- Kafka Messages: ~600 messages/minute
- Cassandra Writes: ~600 rows/minute
- Daily Volume: ~864,000 records/day
🚀 Deployment Instructions
Prerequisites Check
```bash
# Windows WSL2
wsl --version

# Docker
docker --version          # 28.2.2
docker-compose --version  # v2.40.1

# Python
python --version          # 3.12.3

# Java
java --version            # JDK 17
```
Installation Steps
```bash
# 1. Create project
mkdir crypto-pipeline && cd crypto-pipeline

# 2. Create structure
mkdir -p src grafana/provisioning/{datasources,dashboards} logs

# 3. Copy all 19 files from artifacts

# 4. Make scripts executable
chmod +x setup.sh verify.sh test_binance.py

# 5. Test API connectivity
python test_binance.py

# 6. Start pipeline
./setup.sh

# 7. Verify installation
./verify.sh

# 8. Access Grafana
open http://localhost:3000
```
📈 Monitoring & Maintenance
Health Checks
```bash
# All services
docker-compose ps

# Application logs
docker-compose logs -f crypto-pipeline

# Debezium status
curl http://localhost:8083/connectors/mysql-crypto-connector/status | jq

# Cassandra data count
docker exec cassandra cqlsh -e "SELECT COUNT(*) FROM crypto_data.ticker_24h;"

# Kafka topics
docker exec kafka kafka-topics --list --bootstrap-server localhost:9092
```
Common Operations
```bash
# Restart specific service
docker-compose restart crypto-pipeline

# Update code
docker-compose build crypto-pipeline
docker-compose up -d crypto-pipeline

# Clean restart (removes volumes, so Cassandra/Kafka data is wiped)
docker-compose down -v
docker-compose up -d
```
