Introduction
This project is a comprehensive real-time data pipeline that captures, processes, and visualises cryptocurrency market data from Binance. It leverages Apache Kafka, PostgreSQL, Cassandra, and Grafana to build a scalable, event-driven architecture for financial data processing.
For the full picture with all the source files, visit the repository.
Technology Stack
- Data Ingestion: Python with Binance API
- Stream Processing: Apache Kafka
- Change Data Capture: Debezium PostgreSQL Connector
- Operational Database: PostgreSQL
- Analytical Database: Apache Cassandra
- Visualization: Grafana
- Orchestration: Docker & Docker Compose
Architecture
Data Flow Patterns
Write Path:
Binance API → ETL Service → PostgreSQL Insert
↓
Debezium CDC
↓
Kafka Topics
↓
Cassandra Sink
Read Path:
Cassandra → Grafana Data Sources → Dashboards
Component Interaction Flow
1. Source Layer (Binance API)
- Exposes RESTful endpoints for market data
- No built-in event streaming capability
- Requires polling via ETL producer
2. Ingestion Layer (PostgreSQL + Binance ETL)
- Receives transformed data from API
- Enables Write-Ahead Logging (WAL) for CDC
- Maintains logical replication slots for consistency
3. Streaming Layer (Apache Kafka)
- Decouples producers from consumers
- Provides topic-based pub/sub model
- Enables event replay and stream processing
4. Persistence Layer (Cassandra)
- Optimized for time-series analytical queries
- Distributed storage for fault tolerance
- Supports high-throughput writes
5. Visualization Layer (Grafana)
- Queries both PostgreSQL and Cassandra
- Real-time dashboard rendering
- Alert configuration capabilities
Key Components
1. Binance ETL Producer
The ETL Producer is the entry point that orchestrates data collection from Binance. It fetches market data for the top cryptocurrency symbols.
The producer identifies the top 5 crypto gainers in the 24-hour period before fetching detailed data:
def fetch_top5_24hr() -> tuple[list, list]:
    try:
        logging.info('Starting to load top 5 data ...')
        response = requests.get("https://api.binance.com/api/v3/ticker/24hr")
        response.raise_for_status()
        data = response.json()
        # Filter down to liquid USDT-quoted pairs
        usdt_pairs = [
            item for item in data
            if item['symbol'].endswith('USDT')
            and float(item['quoteVolume']) > 1000000
            and float(item['askQty']) > 0.0
        ]
        # Sort by 24h percentage change, best first
        sorted_data = sorted(
            usdt_pairs,
            key=lambda x: float(x["priceChangePercent"]),
            reverse=True
        )
        top_5 = [x['symbol'] for x in sorted_data[:5]]
        # Push this to the other tasks with XComs
        return sorted_data[:5], top_5
    except requests.RequestException:
        logging.exception('Failed to load top 5 data')
        raise
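The filter-and-rank step can be exercised offline on a synthetic ticker payload. The sample records below are made up, but they mirror the field names returned by Binance's `/api/v3/ticker/24hr` endpoint:

```python
# Hypothetical ticker payload mirroring the /api/v3/ticker/24hr shape (values are invented).
sample = [
    {"symbol": "AAAUSDT", "priceChangePercent": "12.5", "quoteVolume": "2000000", "askQty": "1.0"},
    {"symbol": "BBBBTC",  "priceChangePercent": "50.0", "quoteVolume": "9000000", "askQty": "2.0"},
    {"symbol": "CCCUSDT", "priceChangePercent": "3.1",  "quoteVolume": "500000",  "askQty": "1.0"},
    {"symbol": "DDDUSDT", "priceChangePercent": "20.0", "quoteVolume": "3000000", "askQty": "0.0"},
    {"symbol": "EEEUSDT", "priceChangePercent": "7.4",  "quoteVolume": "1500000", "askQty": "0.5"},
]

def rank_usdt_gainers(data: list, n: int = 5) -> list:
    """Apply the same filter/sort as fetch_top5_24hr and return the top-n symbols."""
    usdt_pairs = [
        item for item in data
        if item["symbol"].endswith("USDT")
        and float(item["quoteVolume"]) > 1_000_000
        and float(item["askQty"]) > 0.0
    ]
    ranked = sorted(usdt_pairs, key=lambda x: float(x["priceChangePercent"]), reverse=True)
    return [x["symbol"] for x in ranked[:n]]

print(rank_usdt_gainers(sample))  # ['AAAUSDT', 'EEEUSDT']
```

Note how the non-USDT pair, the low-volume pair, and the zero-`askQty` pair all drop out before ranking, which is exactly why only liquid USDT pairs ever reach the pipeline.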
Once the top 5 symbols are identified, the producer fetches and transforms three further types of market data for each symbol in the list:
- Kline Data (OHLCV Candlestick Data) - Fetching 5-minute candlestick data for the last 24 hours. Uses interval='5m' to capture granular price movements.
- Recent Trades (Individual Trade Transactions) - Fetching up to 300 most recent trades. Each trade records the price, the quantity, the time, and whether the buyer was the maker.
- Order Book (Bid/Ask Levels) - Fetching the current order book with up to 300 levels on each side. Returns:
{'bids': [[price, qty], ...], 'asks': [[price, qty], ...]}
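That shape makes top-of-book metrics easy to derive. A minimal sketch, assuming (as the `/api/v3/depth` endpoint guarantees) that bids are sorted best-first (highest) and asks best-first (lowest):

```python
def top_of_book(order_book: dict) -> tuple:
    """Return (best_bid, best_ask, spread) from a Binance-style depth payload.
    Assumes bids[0] is the highest bid and asks[0] the lowest ask."""
    best_bid = float(order_book["bids"][0][0])
    best_ask = float(order_book["asks"][0][0])
    return best_bid, best_ask, best_ask - best_bid

# Invented example payload in the same shape as the API response.
ob = {"bids": [["100.50", "2.0"], ["100.40", "1.0"]],
      "asks": [["100.60", "3.0"], ["100.70", "5.0"]]}
bid, ask, spread = top_of_book(ob)
print(bid, ask, round(spread, 2))  # 100.5 100.6 0.1
```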
The main function then executes the fetch and transform steps in sequence; a full run completes in approximately 5-10 seconds.
if __name__ == "__main__":
    # Initialize Cassandra schema
    setup_cassandra()

    # Fetch top 5 gainers (runs once)
    data, top_5 = fetch_top5_24hr()
    transform_load_top5_24hr(data)

    # For each of the top 5 symbols (top_5 is a list of symbol strings)
    for i, symbol in enumerate(top_5):
        ranking = i + 1

        # Fetch and load klines (24 hours of 5-min data)
        klines = fetch_klines_24hrs(symbol)
        transform_load_klines_data(klines, symbol, ranking)

        # Fetch and load recent trades (300 most recent)
        trades = fetch_recent_trades(symbol)
        transform_load_recent_trades(trades, symbol, ranking)

        # Fetch and load order book (bid/ask levels)
        ob = fetch_OB(symbol)
        transform_load_OB_data(ob, symbol, ranking)
2. PostgreSQL
PostgreSQL serves as the operational database—the first landing zone for all market data. Its role is critical: it must capture every insert with precision for downstream CDC.
Database Initialization
# docker-compose.yml excerpt
postgres:
  image: debezium/postgres:15
  container_name: postgres
  ports:
    - "5432:5432"
  command: postgres -c wal_level=logical -c max_wal_senders=10 -c max_replication_slots=10
  environment:
    POSTGRES_USER: dbz
    POSTGRES_PASSWORD: dbz
    POSTGRES_DB: cap_stock_db
Critical setting: wal_level=logical enables Logical Replication, which is required for Debezium CDC to function.
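It is worth verifying this setting before wiring anything else up. A small sanity check, sketched against any DBAPI-style connection (psycopg2 and the compose credentials shown in the usage comment are assumptions, not part of the original project):

```python
def assert_logical_wal(conn) -> None:
    """Raise if the server's wal_level is not 'logical' (required by Debezium).
    `conn` is any DBAPI-style connection, e.g. from psycopg2.connect(...)."""
    cur = conn.cursor()
    cur.execute("SHOW wal_level;")
    (level,) = cur.fetchone()
    if level != "logical":
        raise RuntimeError(f"wal_level is {level!r}; Debezium CDC needs 'logical'")

# Usage against the compose stack (assumes psycopg2 is installed):
# import psycopg2
# assert_logical_wal(psycopg2.connect(host="localhost", user="dbz",
#                                     password="dbz", dbname="cap_stock_db"))
```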
3. Debezium PostgreSQL Connector: The CDC Engine
Debezium is the change data capture engine that transforms PostgreSQL into an event publisher.
How CDC Works
- Logical Replication Slot — Debezium creates a slot that tracks WAL position
- WAL Scanning — Reads Write-Ahead Log sequentially
- Event Decoding — Converts log entries to JSON change events
- Kafka Publishing — Sends events to topics matching table names
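Each published event wraps the row change in an envelope with `before`, `after`, `op`, and `source` fields; the `ExtractNewRecordState` transform used later in the sink config flattens it to just the post-change row. A simplified illustration (the event below is a hand-written sketch, not captured Debezium output):

```python
# Hand-written sketch of a Debezium change event value (heavily simplified).
event = {
    "before": None,
    "after": {"symbol": "BTCUSDT", "open": 67000.0, "close": 67250.0},
    "source": {"table": "kline_data", "lsn": 12345},
    "op": "c",            # c = create (INSERT), u = update, d = delete
    "ts_ms": 1700000000000,
}

def extract_new_record_state(event):
    """Mimic io.debezium.transforms.ExtractNewRecordState:
    keep only the post-change row image."""
    return event["after"]

row = extract_new_record_state(event)
print(row["symbol"])  # BTCUSDT
```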
Docker setup
connect:
  image: quay.io/debezium/connect:3.1
  container_name: connect
  depends_on: [kafka]
  ports: ["8083:8083"]
  environment:
    # Bootstrap
    BOOTSTRAP_SERVERS: kafka:9092
    GROUP_ID: 1
    CONFIG_STORAGE_TOPIC: connect_configs
    OFFSET_STORAGE_TOPIC: connect_offsets
    STATUS_STORAGE_TOPIC: connect_statuses
    HOST_NAME: "connect"
    ADVERTISED_HOST_NAME: "connect"
    ADVERTISED_PORT: "8083"
    KAFKA_CONNECT_PLUGIN_PATH: /kafka/connect,/kafka/plugins,/kafka/plugins/kafka-connect-cassandra-sink-1.7.3,/debezium/connect
  volumes:
    - ./plugins/kafka-connect-cassandra-sink-1.7.3:/kafka/connect/kafka-connect-cassandra-sink-1.7.3
Connector Configuration
{
  "name": "cap-stock-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "dbz",
    "database.password": "dbz",
    "database.dbname": "cap_stock_db",
    "topic.prefix": "cap_stock",
    "slot.name": "cap_stock_slot",
    "publication.autocreate.mode": "filtered",
    "table.include.list": "public.kline_data,public.order_book,public.recent_trades,public.top_24hr",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.cap_stock_db"
  }
}
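Debezium names its topics `<topic.prefix>.<schema>.<table>`, which is why the four tables above surface in Kafka as `cap_stock.public.kline_data` and friends. A small helper makes the convention explicit:

```python
def debezium_topics(prefix: str, include_list: str) -> list:
    """Derive Debezium topic names from topic.prefix and table.include.list
    (schema-qualified table names, comma-separated)."""
    tables = [t.strip() for t in include_list.split(",")]
    return [f"{prefix}.{t}" for t in tables]

topics = debezium_topics(
    "cap_stock",
    "public.kline_data,public.order_book,public.recent_trades,public.top_24hr",
)
print(topics[0])  # cap_stock.public.kline_data
```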
4. Apache Kafka: The Event Streaming Backbone
Kafka acts as the central event hub, decoupling data producers (PostgreSQL via Debezium) from consumers (Cassandra Sink Connector).
Kafka Setup
zookeeper:
  image: quay.io/debezium/zookeeper:3.1
  container_name: zookeeper
  ports: ["2181:2181"]

kafka:
  image: quay.io/debezium/kafka:3.1
  container_name: kafka
  depends_on: [zookeeper]
  ports: ["29092:29092"]
  environment:
    KAFKA_BROKER_ID: 1
    ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:29092
    KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

akhq:
  image: tchiotludo/akhq:latest
  ports: ["8080:8080"]
  environment:
    AKHQ_CONFIGURATION: |
      akhq:
        connections:
          local:
            properties:
              bootstrap.servers: "kafka:9092"
            connect:
              - name: "connect"
                url: "http://connect:8083"
  depends_on: [kafka, connect]
Four topics are created:

| Topic | Partitions | Retention | Purpose |
|---|---|---|---|
| cap_stock.public.kline_data | 5 | 7 days | Candlestick events |
| cap_stock.public.recent_trades | 5 | 7 days | Trade events |
| cap_stock.public.order_book | 5 | 7 days | Order book snapshots |
| cap_stock.public.top_24hr | 5 | 7 days | 24hr statistics |
To verify, navigate to localhost:8080, where you'll find the AKHQ Kafka UI running with all the topic and connector info you might need.
5. Cassandra Database: The Analytical Store
Cassandra is the analytics powerhouse for our dashboard, purpose-built for time-series analytics with massive write throughput and fast range queries.
But before anything else, set up the database schema first, unless you want to spend hours debugging why your queries work in the terminal but nowhere else.
Cassandra Initialization
def setup_cassandra() -> None:
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s|%(levelname)s|%(name)s|%(message)s',
        handlers=[
            logging.FileHandler(f"cassandra_setup{datetime.now().strftime('%Y%m%d')}.log"),
            logging.StreamHandler()
        ]
    )
    load_dotenv()

    logging.info('Creating the connection ...')
    # Connect to Cassandra
    cluster = Cluster(['cassandra'])
    session = cluster.connect()
    logging.info('Connected ...')

    logging.info('Creating the keyspace cap_stock ...')
    # Create the keyspace
    create_keyspace_query = """
        CREATE KEYSPACE IF NOT EXISTS cap_stock
        WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};
    """
    session.execute(create_keyspace_query)

    use_keyspace_query = """
        USE cap_stock;
    """
    session.execute(use_keyspace_query)
    logging.info('Using the created keyspace ...')

    logging.info('Creating the tables')
    create_kline_query = """
        CREATE TABLE IF NOT EXISTS cap_stock.kline_data (
            symbol text,
            k_open_time timestamp,
            k_close_time timestamp,
            open double,
            high double,
            low double,
            close double,
            volume double,
            quote_asset_volume double,
            number_of_trades int,
            tb_base_volume double,
            tb_quote_volume double,
            ranking int,
            time_collected timestamp,
            PRIMARY KEY ((symbol), k_open_time)
        ) WITH CLUSTERING ORDER BY (k_open_time DESC)
        AND compaction = {
            'class': 'TimeWindowCompactionStrategy',
            'compaction_window_unit': 'DAYS',
            'compaction_window_size': '1'
        };
    """
    create_OB_query = """
        CREATE TABLE IF NOT EXISTS cap_stock.order_book (
            symbol text,
            side text,                 -- 'bids' or 'asks'
            ranking int,               -- rank of the crypto in top performers
            price double,
            quantity double,
            time_collected timestamp,  -- when this snapshot was taken
            PRIMARY KEY ((symbol, side, time_collected), price)
        ) WITH CLUSTERING ORDER BY (price DESC);
    """
    create_recent_trades_query = """
        CREATE TABLE IF NOT EXISTS cap_stock.recent_trades (
            symbol text,
            trade_time timestamp,
            trade_id bigint,
            price double,
            qty double,
            quote_qty double,
            is_buyer_maker boolean,
            is_best_match boolean,
            ranking int,
            time_collected timestamp,
            PRIMARY KEY ((symbol), trade_time, trade_id)
        ) WITH CLUSTERING ORDER BY (trade_time DESC, trade_id DESC)
        AND compaction = {
            'class': 'TimeWindowCompactionStrategy',
            'compaction_window_unit': 'HOURS',
            'compaction_window_size': '1'
        };
    """
    create_top_24hrs = """
        CREATE TABLE IF NOT EXISTS cap_stock.top_24hr (
            symbol text,
            time_collected timestamp,
            price_change double,
            price_change_percent double,
            weighted_avg_price double,
            prev_close_price double,
            last_price double,
            last_qty double,
            bid_price double,
            bid_qty double,
            ask_price double,
            ask_qty double,
            open_price double,
            high_price double,
            low_price double,
            volume double,
            quote_volume double,
            open_time timestamp,
            close_time timestamp,
            first_id bigint,
            last_id bigint,
            PRIMARY KEY ((symbol), time_collected)
        ) WITH CLUSTERING ORDER BY (time_collected DESC)
        AND compaction = {
            'class': 'TimeWindowCompactionStrategy',
            'compaction_window_unit': 'HOURS',
            'compaction_window_size': '1'
        };
    """
    session.execute(create_top_24hrs)
    session.execute(create_recent_trades_query)
    session.execute(create_OB_query)
    session.execute(create_kline_query)
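The `CLUSTERING ORDER BY (k_open_time DESC)` clause on the kline table means a partition read hands back the newest candles first, with no sort at query time. The plain-Python sketch below simulates that read pattern on invented rows, purely to illustrate the access pattern (it does not talk to Cassandra):

```python
from datetime import datetime, timedelta

# Hypothetical rows for one partition (symbol = 'BTCUSDT'), 5-minute candles.
base = datetime(2024, 1, 1)
rows = [
    {"symbol": "BTCUSDT", "k_open_time": base + timedelta(minutes=5 * i), "close": 100.0 + i}
    for i in range(4)
]

def read_partition(rows: list, limit: int) -> list:
    """Simulate SELECT ... WHERE symbol = ? LIMIT n on a table clustered
    by k_open_time DESC: newest rows come back first."""
    return sorted(rows, key=lambda r: r["k_open_time"], reverse=True)[:limit]

latest = read_partition(rows, 2)
print([r["close"] for r in latest])  # [103.0, 102.0]
```

This is why dashboard queries like "latest candles for $symbol" need no ORDER BY: the storage layout already matches the query.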
6. Kafka Connect (the same Connect worker, now running the Cassandra sink)
Kafka Connect is the middleware that runs connectors to move data between systems.
Cassandra Sink Connector Configuration
// register_cassandra.json
{
"name": "cassandra_sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "cap_stock.public.kline_data,cap_stock.public.recent_trades,cap_stock.public.top_24hr,cap_stock.public.order_book",
"contactPoints": "cassandra",
"loadBalancing.localDc": "datacenter1",
"topic.cap_stock.public.kline_data.cap_stock.kline_data.mapping": "symbol=value.symbol,k_open_time=value.k_open_time,k_close_time=value.k_close_time,open=value.open,high=value.high,low=value.low,close=value.close,volume=value.volume,quote_asset_volume=value.quote_asset_volume,number_of_trades=value.number_of_trades,tb_base_volume=value.tb_base_volume,tb_quote_volume=value.tb_quote_volume,ranking=value.ranking,time_collected=value.time_collected",
"topic.cap_stock.public.recent_trades.cap_stock.recent_trades.mapping":"symbol=value.symbol,trade_time=value.time,trade_id=value.id,price=value.price,qty=value.qty,quote_qty=value.quote_qty,is_buyer_maker=value.is_buyer_maker,is_best_match=value.is_best_match,ranking=value.ranking,time_collected=value.time_collected",
"topic.cap_stock.public.top_24hr.cap_stock.top_24hr.mapping":"symbol=value.symbol, time_collected=value.time_collected,price_change=value.price_change,price_change_percent=value.price_change_percent,weighted_avg_price=value.weighted_avg_price,prev_close_price=value.prev_close_price,last_price=value.last_price,last_qty=value.last_qty,bid_price=value.bid_price,bid_qty=value.bid_qty,ask_price=value.ask_price,ask_qty=value.ask_qty,open_price=value.open_price,high_price=value.high_price,low_price=value.low_price,volume=value.volume,quote_volume=value.quote_volume,open_time=value.open_time,close_time=value.close_time,first_id=value.first_id,last_id=value.last_id",
"topic.cap_stock.public.order_book.cap_stock.order_book.mapping": "symbol=value.symbol, side=value.side, ranking=value.ranking, price=value.price, quantity=value.quantity, time_collected=value.time_collected",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"transforms":"unwrap",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState"
}
}
Connector Registration
This bash script registers both connectors with Kafka Connect using the configuration files above: Debezium then listens for PostgreSQL changes and publishes them to Kafka, while the Cassandra sink consumes those topics and writes the rows to Cassandra.
You can verify the status of both connectors in the AKHQ UI at localhost:8080.
#!/bin/bash
# Register Postgres and Cassandra connectors to Kafka Connect
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @register-postgres.json
echo "✓ Postgres connector registered"
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @register_cassandra.json
echo "✓ Cassandra connector registered"
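Connector health can also be polled from Kafka Connect's standard REST API (`GET /connectors/<name>/status`). The sketch below keeps the URL construction separate from the HTTP call; the host/port mirror the compose file, and the `requests` usage in the comment is an assumption:

```python
def connector_status_url(name: str, host: str = "localhost", port: int = 8083) -> str:
    """Build the Kafka Connect REST endpoint for a connector's status."""
    return f"http://{host}:{port}/connectors/{name}/status"

print(connector_status_url("cap-stock-connector"))
# http://localhost:8083/connectors/cap-stock-connector/status

# Usage against a running stack (assumes `requests` is installed):
# import requests
# status = requests.get(connector_status_url("cassandra_sink")).json()
# assert status["connector"]["state"] == "RUNNING"
```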
7. Grafana: Real-Time Visualisation
Grafana provides the user-facing dashboards that display market data in real-time.
Grafana Configuration
grafana:
  image: grafana/grafana:main-ubuntu
  container_name: grafana
  environment:
    GF_SECURITY_ADMIN_USER: admin
    GF_SECURITY_ADMIN_PASSWORD: admin
  ports:
    - "3001:3000"
  volumes:
    - ./grafana_data:/var/lib/grafana
  depends_on:
    - postgres
Once Grafana is running, create a new dashboard, open its settings, add a symbol variable, and then add panels built from the sample queries below.
The variable lets you switch between the different crypto symbols from a dropdown.
Sample Dashboard Queries
--- Klines
SELECT symbol,
k_open_time AS time,
open,
high,
low,
close,
volume
FROM cap_stock.kline_data
WHERE symbol = '$symbol'
AND k_open_time >= ${__from}
AND k_open_time <= ${__to}
--- Winners at the moment
SELECT symbol,
price_change_percent,
volume
FROM cap_stock.top_24hr
WHERE time_collected >= ${__from}
AND time_collected <= ${__to}
ALLOW FILTERING;
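Grafana substitutes `${__from}` and `${__to}` as epoch milliseconds, which is why they can be compared directly against the CQL `timestamp` columns. A quick sanity check of that convention:

```python
from datetime import datetime, timezone

def from_grafana_ms(ms: int) -> datetime:
    """Convert a Grafana ${__from}/${__to} value (epoch milliseconds)
    to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)

print(from_grafana_ms(1700000000000))  # 2023-11-14 22:13:20+00:00
```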
Results
Conclusion: A Data Engineering Success
The Binance Top Crypto Pipeline implements a sophisticated, resilient, and performant data architecture by strategically combining a relational anchor (PostgreSQL), an immutable log (Kafka), and a distributed analytical store (Cassandra), wired together with Change Data Capture (Debezium).


