1. Introduction
Real-time data pipelines have become critical in financial services, especially in the crypto space where price movements occur in milliseconds. In this walk-through, we’ll build an end-to-end streaming pipeline that ingests live ticker data from the Binance REST API, streams it into Confluent Cloud (Kafka), and persists it into PostgreSQL for downstream analytics and visualization.
This pipeline demonstrates a classic event-driven architecture:
- Producer → Fetches crypto prices from Binance and publishes messages into Kafka.
- Consumer → Reads from Kafka, parses the payload, and inserts into PostgreSQL.
- Database + Views → Allows us to validate and analyze the ingested data using tools like DBeaver.
2. Project Setup
Your project folder contains:
- `producer.py` → extracts Binance 24hr ticker data & streams it to Kafka.
- `consumer.py` → consumes from Kafka & inserts into PostgreSQL.
- `.env` → contains secrets (Kafka, Binance, PostgreSQL).
- `requirements.txt` → Python dependencies.
- `.gitignore` → keeps `.env` and unnecessary files out of version control.
Dependencies (`requirements.txt`)
confluent-kafka==2.11.1
psycopg2-binary==2.9.10
websockets==15.0.1
sqlalchemy
python-dotenv
Environment Variables (`.env`)
BOOTSTRAP_SERVERS=pkc-xxxx.gcp.confluent.cloud:9092
CONFLUENT_API_KEY=xxxx
CONFLUENT_SECRET_KEY=xxxx
KAFKA_TOPIC=binance_topic
DATABASE_URL=postgresql://user:password@host:port/db?sslmode=require
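Both scripts fail in confusing ways when one of these variables is missing or misspelled, so it can help to fail fast at startup. Below is an optional sketch (the filename `check_env.py` is my own choice, it is not part of the producer or consumer) that verifies all five variables before anything else runs:

```python
# check_env.py — optional helper, not part of the pipeline itself
import os
from dotenv import load_dotenv

REQUIRED_VARS = [
    "BOOTSTRAP_SERVERS",
    "CONFLUENT_API_KEY",
    "CONFLUENT_SECRET_KEY",
    "KAFKA_TOPIC",
    "DATABASE_URL",
]

load_dotenv()  # read .env from the project folder

# Collect every variable that is missing or empty
missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
if missing:
    raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
print("All required environment variables are set")
```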
3. Producing Binance Data into Kafka
The producer script pulls real-time ticker data from Binance’s REST API and pushes it into the Kafka topic binance_topic.
import requests
import json, os, time, logging
from confluent_kafka import Producer
from dotenv import load_dotenv
from typing import Dict, Any, List
load_dotenv()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("binance_producer")
binance_url = "https://api.binance.com/api/v3/ticker/24hr"
symbols: List[str] = ["BTCUSDT", "ETHUSDT", "BNBUSDT"]
kafka_config = {
    "bootstrap.servers": os.getenv("BOOTSTRAP_SERVERS"),
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.getenv("CONFLUENT_API_KEY"),
    "sasl.password": os.getenv("CONFLUENT_SECRET_KEY"),
    "broker.address.family": "v4",
    "message.send.max.retries": 5,
    "retry.backoff.ms": 500,
}
topic = os.getenv("KAFKA_TOPIC")
producer = Producer(kafka_config)

def binance_extract(symbol: str) -> Dict[str, Any]:
    params = {"symbol": symbol}
    response = requests.get(binance_url, params=params)
    response.raise_for_status()
    data = response.json()
    data["extracted_symbol"] = symbol
    data["source"] = "binance24hr_ticker"
    return data

def delivery_report(err, msg):
    if err is not None:
        logger.error(f"Message delivery failed: {err}")
    else:
        logger.info(f"Message delivered to {msg.topic()} [{msg.partition()}]")

def data_streaming():
    for symbol in symbols:
        try:
            data = binance_extract(symbol)
            producer.produce(
                topic,
                key=symbol,
                value=json.dumps(data),
                callback=delivery_report
            )
        except Exception as e:
            logger.error(f"Error fetching data for {symbol}: {e}")
    producer.flush()

if __name__ == "__main__":
    while True:
        data_streaming()
        logger.info("Batch sent")
        time.sleep(10)
The script runs in a loop every 10 seconds, sending a fresh batch of ticker data.
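Before wiring up the consumer, it helps to know what each message value looks like: it is the raw JSON that Binance's 24hr ticker endpoint returns for one symbol, plus the two fields the producer adds. Below is an abridged illustration as a Python dict. The numeric values are placeholders rather than real market data, and the actual response contains more fields than the ones the consumer reads:

```python
# Abridged, illustrative payload for one symbol (placeholder values, not real quotes)
sample_message = {
    "symbol": "BTCUSDT",
    "priceChange": "120.50000000",        # stored as price_change
    "priceChangePercent": "0.20000000",   # stored as price_change_percent
    "openPrice": "60000.00000000",        # stored as open_price
    "lastPrice": "60120.50000000",        # stored as close_price
    "highPrice": "60500.00000000",        # stored as high_price
    "lowPrice": "59800.00000000",         # stored as low_price
    "volume": "12345.67890000",           # stored as volume
    "askPrice": "60121.00000000",         # stored as ask_price
    "bidPrice": "60120.00000000",         # stored as bid_price
    # Added by the producer before publishing:
    "extracted_symbol": "BTCUSDT",
    "source": "binance24hr_ticker",
}
```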
4. Consuming & Persisting to PostgreSQL
The consumer reads events from the Kafka topic, parses them, and inserts rows into PostgreSQL using SQLAlchemy.
import os
from dotenv import load_dotenv
from confluent_kafka import Consumer, KafkaException
from json import loads
from datetime import datetime, timezone
import psycopg2
from sqlalchemy import create_engine, text
# Load environment variables
load_dotenv()
conn_string = os.getenv("DATABASE_URL")
if not conn_string:
    raise ValueError("DATABASE_URL environment variable not set")

# Create SQLAlchemy engine
engine = create_engine(conn_string, pool_pre_ping=True)

# Test connection
with engine.connect() as conn:
    print("Database connection successful")

# Create a table
with engine.begin() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS binance_ticker_24h (
            id BIGSERIAL PRIMARY KEY,
            symbol TEXT NOT NULL,
            price_change NUMERIC(38, 18),
            price_change_percent NUMERIC(38, 18),
            open_price NUMERIC(38, 18),
            close_price NUMERIC(38, 18),
            high_price NUMERIC(38, 18),
            low_price NUMERIC(38, 18),
            volume NUMERIC(38, 18),
            ask_price NUMERIC(38, 18),
            bid_price NUMERIC(38, 18),
            ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        )
    """))
print("PostgreSQL table created successfully")

# Confluent Kafka Consumer Configuration
conf = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': os.getenv('CONFLUENT_API_KEY'),
    'sasl.password': os.getenv('CONFLUENT_SECRET_KEY'),
    'group.id': 'binance-group-id',
    'auto.offset.reset': 'earliest'
}

# Initialize Kafka consumer
consumer = Consumer(conf)
topic = os.getenv("KAFKA_TOPIC")
consumer.subscribe([topic])
print(f"Subscribed to topic: {topic}")

# --- Consume & insert ---
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        try:
            payload = loads(msg.value().decode('utf-8'))
            symbol = payload.get("symbol") or payload.get("extracted_symbol")
            price_change = payload.get("priceChange")
            price_change_percent = payload.get("priceChangePercent")
            open_price = payload.get("openPrice")
            close_price = payload.get("lastPrice")
            high_price = payload.get("highPrice")
            low_price = payload.get("lowPrice")
            volume = payload.get("volume")
            ask_price = payload.get("askPrice")
            bid_price = payload.get("bidPrice")

            # Insert data into PostgreSQL
            insert_sql = text("""
                INSERT INTO binance_ticker_24h (
                    symbol, price_change, price_change_percent, open_price, close_price,
                    high_price, low_price, volume, ask_price, bid_price
                ) VALUES (
                    :symbol, :price_change, :price_change_percent, :open_price, :close_price,
                    :high_price, :low_price, :volume, :ask_price, :bid_price
                )
            """)
            with engine.begin() as conn:
                conn.execute(insert_sql, {
                    "symbol": symbol,
                    "price_change": price_change,
                    "price_change_percent": price_change_percent,
                    "open_price": open_price,
                    "close_price": close_price,
                    "high_price": high_price,
                    "low_price": low_price,
                    "volume": volume,
                    "ask_price": ask_price,
                    "bid_price": bid_price
                })
            print(f"Inserted {symbol}: close={close_price} high={high_price} low={low_price}")
        except Exception as e:
            print(f"Error processing message: {e}")
except KeyboardInterrupt:
    print("Consumer stopped manually")
finally:
    consumer.close()
    print("Consumer closed")
When I first connected with **psycopg2**, the connection kept failing because my `.env` variables didn't match exactly (case sensitivity and multiple fragmented parameters like `HOST`, `USER`, and `PORT`), so psycopg2 defaulted to my local system user (`oliver`) instead of the cloud credentials. In contrast, with **SQLAlchemy** I only needed a single environment variable (`DATABASE_URL`) that bundled all connection details (user, password, host, port, database, SSL mode) into one string, eliminating mismatches and simplifying authentication. This abstraction not only solved the credential-loading problem but also gave me cleaner connection handling, pooling, and portability, making SQLAlchemy the more reliable and maintainable choice for my streaming pipeline.
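To make the contrast concrete, here is a hedged side-by-side sketch. The variable names in the psycopg2 branch (`PG_HOST`, `PG_USER`, and so on) are hypothetical stand-ins for the fragmented setup described above, not the exact names I used:

```python
import os
import psycopg2
from sqlalchemy import create_engine

# Fragmented approach: every piece of the connection lives in its own variable,
# and a missing or misspelled one silently falls back to libpq defaults
# (for example, the local OS user). Variable names here are illustrative.
pg_conn = psycopg2.connect(
    host=os.getenv("PG_HOST"),
    port=os.getenv("PG_PORT"),
    user=os.getenv("PG_USER"),
    password=os.getenv("PG_PASSWORD"),
    dbname=os.getenv("PG_DATABASE"),
    sslmode="require",
)

# Single-URL approach: one DATABASE_URL string carries user, password, host,
# port, database and sslmode together, so there is nothing to mismatch.
engine = create_engine(os.getenv("DATABASE_URL"), pool_pre_ping=True)
```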
5. Validating in PostgreSQL
Once the consumer is running, you can query the table directly using DBeaver or psql.

*DBeaver query result showing the latest Binance prices from the view.*
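The view behind that screenshot isn't defined elsewhere in this post, but the idea is easy to reconstruct. Here is a hedged sketch (the view name `latest_binance_prices` is my own choice) that keeps only the most recent row per symbol; the SQL can be pasted into DBeaver or psql as-is, or run through the same SQLAlchemy engine as the consumer:

```python
from sqlalchemy import text

# Reuses the SQLAlchemy engine created in the consumer script.
# The view name is illustrative; PostgreSQL's DISTINCT ON keeps the newest
# row per symbol thanks to ORDER BY symbol, ingested_at DESC.
with engine.begin() as conn:
    conn.execute(text("""
        CREATE OR REPLACE VIEW latest_binance_prices AS
        SELECT DISTINCT ON (symbol)
               symbol, close_price, high_price, low_price,
               price_change_percent, ingested_at
        FROM binance_ticker_24h
        ORDER BY symbol, ingested_at DESC
    """))

with engine.connect() as conn:
    for row in conn.execute(text("SELECT * FROM latest_binance_prices ORDER BY symbol")):
        print(row)
```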
6. End-to-End Architecture
The full pipeline looks like this:
Binance API → Producer → Confluent Cloud (Kafka) → Consumer → PostgreSQL → DBeaver
7. Conclusion
With just a few scripts, we built a real-time crypto streaming pipeline:
- Binance data ingested every 10 seconds.
- Streamed into Kafka on Confluent Cloud.
- Persisted into PostgreSQL (Aiven).
- Queried and validated in DBeaver.
This architecture is lightweight but scalable:
- Add more symbols to the producer for broader coverage.
- Use ksqlDB in Confluent Cloud for real-time transformations.
- Hook PostgreSQL up to BI tools like Metabase or Power BI for dashboards.
Such pipelines form the backbone of modern trading systems, market analytics platforms, and real-time risk engines.