Oliver Samuel

Streaming Real-Time Binance Data into Confluent Cloud and PostgreSQL

1. Introduction

Real-time data pipelines have become critical in financial services, especially in the crypto space where price movements occur in milliseconds. In this walk-through, we’ll build an end-to-end streaming pipeline that ingests live ticker data from the Binance REST API, streams it into Confluent Cloud (Kafka), and persists it into PostgreSQL for downstream analytics and visualization.

This pipeline demonstrates a classic event-driven architecture:

  • Producer → Fetches crypto prices from Binance and publishes messages into Kafka.
  • Consumer → Reads from Kafka, parses the payload, and inserts into PostgreSQL.
  • Database + Views → Allows us to validate and analyze the ingested data using tools like DBeaver.

2. Project Setup

Your project folder contains:
producer.py → extracts Binance 24hr ticker data & streams to Kafka.
consumer.py → consumes from Kafka & inserts into PostgreSQL.
.env → contains secrets (Kafka, Binance, PostgreSQL).
requirements.txt → Python dependencies.
.gitignore → keeps .env and unnecessary files out of version control.

Dependencies (requirements.txt)

confluent-kafka==2.11.1
psycopg2-binary==2.9.10
websockets==15.0.1
sqlalchemy
python-dotenv
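
Install them with pip install -r requirements.txt (ideally inside a virtual environment).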

Environment Variables (.env)

BOOTSTRAP_SERVERS=pkc-xxxx.gcp.confluent.cloud:9092
CONFLUENT_API_KEY=xxxx
CONFLUENT_SECRET_KEY=xxxx
KAFKA_TOPIC=binance_topic
DATABASE_URL=postgresql://user:password@host:port/db?sslmode=require

Confluent Cloud → Cluster Settings → API Keys
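
Because every script below depends on these variables, a quick fail-fast check at startup can save debugging time. A minimal sketch, assuming the variable names from the .env above:

import os
from dotenv import load_dotenv

load_dotenv()

REQUIRED = ["BOOTSTRAP_SERVERS", "CONFLUENT_API_KEY",
            "CONFLUENT_SECRET_KEY", "KAFKA_TOPIC", "DATABASE_URL"]

# Fail immediately if anything is missing instead of debugging auth errors later
missing = [name for name in REQUIRED if not os.getenv(name)]
if missing:
    raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")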

3. Producing Binance Data into Kafka

The producer script pulls real-time ticker data from Binance’s REST API and pushes it into the Kafka topic binance_topic.

import requests
import json, os, time, logging
from confluent_kafka import Producer

from dotenv import load_dotenv
from typing import Dict, Any, List
load_dotenv()

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("binance_producer")

binance_url = "https://api.binance.com/api/v3/ticker/24hr"

symbols: List[str] = ["BTCUSDT", "ETHUSDT", "BNBUSDT"]
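# Confluent Cloud requires TLS + SASL/PLAIN; the API key and secret act as username and password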
kafka_config = {
    "bootstrap.servers": os.getenv("BOOTSTRAP_SERVERS"),
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.getenv("CONFLUENT_API_KEY"),
    "sasl.password": os.getenv("CONFLUENT_SECRET_KEY"),
    "broker.address.family": "v4",
    "message.send.max.retries": 5,
    "retry.backoff.ms": 500,
}

topic = os.getenv("KAFKA_TOPIC")
producer = Producer(kafka_config)

def binance_extract(symbol: str) -> Dict[str, Any]:
    """Fetch 24hr ticker statistics for a single symbol and tag the payload."""
    params = {"symbol": symbol}
    response = requests.get(binance_url, params=params, timeout=10)
    response.raise_for_status()
    data = response.json()
    data["extracted_symbol"] = symbol
    data["source"] = "binance24hr_ticker"
    return data

def delivery_report(err, msg):
    if err is not None:
        logger.error(f"Message delivery failed: {err}")
    else:
        logger.info(f"Message delivered to {msg.topic()} [{msg.partition()}]")


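# Fetch each symbol once, publish the batch, then flush() to wait for all delivery callbacks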
def data_streaming():
    for symbol in symbols:
        try:
            data = binance_extract(symbol)
            producer.produce(
                topic,
                key = symbol,
                value = json.dumps(data),
                callback = delivery_report
            )
        except Exception as e:
            logger.error(f"Error fetching data for {symbol}: {e}")
    producer.flush()


if __name__ == "__main__":
    while True:
        data_streaming()
        logger.info("Batch sent")
        time.sleep(10)

The script runs in a loop every 10 seconds, sending a fresh batch of ticker data.
JSON payloads
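
For reference, one message value published for BTCUSDT looks roughly like this, trimmed to the fields the consumer reads plus the two the producer adds (the numbers are illustrative, not real quotes):

sample_payload = {
    "symbol": "BTCUSDT",
    "priceChange": "-250.10000000",       # Binance returns numbers as strings
    "priceChangePercent": "-0.400",
    "openPrice": "62500.00000000",
    "lastPrice": "62249.90000000",
    "highPrice": "63000.00000000",
    "lowPrice": "61800.00000000",
    "volume": "18000.50000000",
    "askPrice": "62250.00000000",
    "bidPrice": "62249.00000000",
    "extracted_symbol": "BTCUSDT",        # added by the producer
    "source": "binance24hr_ticker",       # added by the producer
}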

4. Consuming & Persisting to PostgreSQL

The consumer reads events from the Kafka topic, parses them, and inserts rows into PostgreSQL using SQLAlchemy.

import os
from dotenv import load_dotenv
from confluent_kafka import Consumer, KafkaException
from json import loads
from datetime import datetime, timezone
import psycopg2  # DBAPI driver SQLAlchemy uses for postgresql:// URLs
from sqlalchemy import create_engine, text

# Load environment variables
load_dotenv()

conn_string = os.getenv("DATABASE_URL")
if not conn_string:
    raise ValueError("DATABASE_URL environment variable not set")

# Create SQLAlchemy engine
engine = create_engine(conn_string, pool_pre_ping=True)

# Test the connection
with engine.connect() as conn:
    print("Database connection successful")

# Create a table
with engine.begin() as conn:
    conn.execute(text("""
                CREATE TABLE IF NOT EXISTS binance_ticker_24h (
                    id BIGSERIAL PRIMARY KEY,
                    symbol TEXT NOT NULL,
                    price_change NUMERIC(38, 18),
                    price_change_percent NUMERIC(38, 18),
                    open_price NUMERIC(38, 18),
                    close_price NUMERIC(38, 18),
                    high_price NUMERIC(38, 18),
                    low_price NUMERIC(38, 18),
                    volume NUMERIC(38, 18),
                    ask_price NUMERIC(38, 18),
                    bid_price NUMERIC(38, 18),
                    ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
                )
            """))
    print("PostgreSQL table created successfully")

# Confluent Kafka Consumer Configuration
conf = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': os.getenv('CONFLUENT_API_KEY'),
    'sasl.password': os.getenv('CONFLUENT_SECRET_KEY'),
    'group.id': 'binance-group-id',
    'auto.offset.reset': 'earliest'
}

# Initialize Kafka consumer
consumer = Consumer(conf)
topic = os.getenv("KAFKA_TOPIC")
consumer.subscribe([topic])
print(f"Subscribed to topic: {topic}")

# --- Consume & insert ---
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())

        try:
            payload = loads(msg.value().decode('utf-8'))
            symbol = payload.get("symbol") or payload.get("extracted_symbol")

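            # Binance returns numeric fields as JSON strings; PostgreSQL casts them into the NUMERIC columns on insert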
            price_change = payload.get("priceChange")
            price_change_percent = payload.get("priceChangePercent")
            open_price = payload.get("openPrice")
            close_price = payload.get("lastPrice")
            high_price = payload.get("highPrice")
            low_price = payload.get("lowPrice")
            volume = payload.get("volume")
            ask_price = payload.get("askPrice")
            bid_price = payload.get("bidPrice")

            # Insert data into PostgreSQL
            insert_sql = text("""
                INSERT INTO binance_ticker_24h (
                    symbol, price_change, price_change_percent, open_price, close_price,
                    high_price, low_price, volume, ask_price, bid_price
                ) VALUES (
                    :symbol, :price_change, :price_change_percent, :open_price, :close_price,
                    :high_price, :low_price, :volume, :ask_price, :bid_price
                )
            """)
            with engine.begin() as conn:
                conn.execute(insert_sql, {
                    "symbol": symbol,
                    "price_change": price_change,
                    "price_change_percent": price_change_percent,
                    "open_price": open_price,
                    "close_price": close_price,
                    "high_price": high_price,
                    "low_price": low_price,
                    "volume": volume,
                    "ask_price": ask_price,
                    "bid_price": bid_price
                })
            print(f"Inserted {symbol}: close={close_price} high={high_price} low={low_price} ")
        except Exception as e:
            print(f"Error processing message: {e}")
except KeyboardInterrupt:
    print("Consumer stopped manually")
finally:
    consumer.close()
    print("Consumer closed")

Consumer terminal output

When I first connected with psycopg2, the connection kept failing: my .env variables didn't match exactly (case sensitivity and several fragmented parameters like HOST, USER, and PORT), so psycopg2 fell back to my local system user (oliver) instead of the cloud credentials. With SQLAlchemy, I only needed a single environment variable (DATABASE_URL) that bundles all connection details (user, password, host, port, database, SSL mode) into one string, eliminating mismatches and simplifying authentication. This abstraction not only solved the credential-loading problem but also gave me cleaner connection handling, pooling, and portability, making SQLAlchemy the more reliable and maintainable choice for this streaming pipeline.
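
To make the difference concrete, here is a hedged sketch of the two approaches; the PG*-style variable names in the commented-out block are placeholders for whatever fragmented variables you might have used:

import os
from dotenv import load_dotenv
from sqlalchemy import create_engine

load_dotenv()

# Fragmented approach: every piece must be present and spelled exactly right,
# otherwise psycopg2 quietly falls back to local defaults such as the OS user.
# import psycopg2
# conn = psycopg2.connect(
#     host=os.getenv("PGHOST"),
#     port=os.getenv("PGPORT"),
#     user=os.getenv("PGUSER"),
#     password=os.getenv("PGPASSWORD"),
#     dbname=os.getenv("PGDATABASE"),
#     sslmode="require",
# )

# Single-URL approach: one variable carries user, password, host, port,
# database and SSL mode, so there is nothing to mismatch.
engine = create_engine(os.getenv("DATABASE_URL"), pool_pre_ping=True)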

5. Validating in PostgreSQL

Once the consumer is running, you can query the table directly using DBeaver or psql:
Validation output
DBeaver query result showing the latest Binance prices from the view.
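
The view behind that result isn't defined earlier in the post, so here is a minimal sketch of what it could look like, created from Python with the same SQLAlchemy engine (the name latest_prices and the column choice are my own):

import os
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

load_dotenv()
engine = create_engine(os.getenv("DATABASE_URL"), pool_pre_ping=True)

with engine.begin() as conn:
    # Keep only the most recent row per symbol
    conn.execute(text("""
        CREATE OR REPLACE VIEW latest_prices AS
        SELECT DISTINCT ON (symbol)
               symbol, close_price, high_price, low_price, volume, ingested_at
        FROM binance_ticker_24h
        ORDER BY symbol, ingested_at DESC
    """))
    rows = conn.execute(text("SELECT * FROM latest_prices ORDER BY symbol")).fetchall()

for row in rows:
    print(row)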

6. End-to-End Architecture

The full pipeline looks like this:

Binance API → Producer → Confluent Cloud (Kafka) → Consumer → PostgreSQL → DBeaver
Architecture Diagram

7. Conclusion

With just a few scripts, we built a real-time crypto streaming pipeline:

  • Binance data ingested every 10 seconds.
  • Streamed into Kafka on Confluent Cloud.
  • Persisted into PostgreSQL (Aiven).
  • Queried and validated in DBeaver.

This architecture is lightweight but scalable:

  • Add more symbols to the producer for broader coverage.
  • Use ksqlDB in Confluent Cloud for real-time transformations.
  • Hook PostgreSQL up to BI tools like Metabase or Power BI for dashboards.

Such pipelines form the backbone of modern trading systems, market analytics platforms, and real-time risk engines.
