DEV Community

Predzone
Predzone

Posted on

Building a Real-Time Odds Pipeline: From WebSocket to Your Database in Under 100ms

Building a Real-Time Odds Pipeline: From WebSocket to Your Database in Under 100ms
Tags: websocket, kafka, python, datapipeline

Odds move fast. A line shift on a major market can cascade across dozens of books in seconds, and if your pipeline can't keep up, you're not just missing data — you're making decisions on stale ground. In this post, I'll walk through how we built a real-time odds ingestion pipeline that handles thousands of updates per second with sub-100ms latency from source to storage.

The Problem
Sports betting odds are one of the most volatile data streams you'll encounter in production. Consider:

A single NFL game can generate 50,000+ price updates over its lifetime
Sharp books like Pinnacle move lines in milliseconds after sharp action
You need to capture every tick, not just the final state
Multiple data providers emit events in different schemas and protocols

A naive polling approach — hitting REST APIs every few seconds — simply doesn't cut it. You need a proper event-driven pipeline.

Architecture Overview
Here's the high-level design we landed on:
[Data Providers]
│ WebSocket / SSE

[Ingestion Layer] ← Python async collectors, one per provider
│ Raw JSON events

[Kafka Topic: odds.raw]


[Normalizer Service] ← Schema unification + deduplication
│ Normalized events

[Kafka Topic: odds.normalized]
│ │
▼ ▼
[TimescaleDB] [Redis] ← historical storage + hot cache
Four stages: collect → queue → normalize → persist. Each stage scales independently and fails independently. Let's dig into each one.

Stage 1: Ingestion — Async WebSocket Collectors
Each data provider gets its own lightweight collector process. We use Python's asyncio + websockets library for this.
pythonimport asyncio
import json
import websockets
from aiokafka import AIOKafkaProducer

PROVIDER_WS_URL = "wss://provider.example.com/odds-stream"
KAFKA_TOPIC = "odds.raw"

async def collect(producer: AIOKafkaProducer):
async with websockets.connect(
PROVIDER_WS_URL,
ping_interval=20,
ping_timeout=10,
) as ws:
await ws.send(json.dumps({"action": "subscribe", "markets": ["soccer", "basketball"]}))

    async for message in ws:
        event = {
            "provider": "example_provider",
            "received_at": time.time_ns(),  # nanosecond precision
            "raw": message,
        }
        await producer.send(
            KAFKA_TOPIC,
            key=b"example_provider",
            value=json.dumps(event).encode(),
        )
Enter fullscreen mode Exit fullscreen mode

async def main():
producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
await producer.start()
try:
await collect(producer)
finally:
await producer.stop()

asyncio.run(main())
A few things worth noting here:

ping_interval / ping_timeout: WebSocket connections to data providers drop silently more often than you'd think. Explicit keepalives catch this before you spend 30 minutes debugging "where did my data go?"
Nanosecond timestamps at ingestion: You'll thank yourself later when you're debugging latency and need to know exactly when an event hit your system vs. when the provider claims it fired.
Wrap, don't transform: The collector's only job is to get bytes into Kafka fast. No parsing, no validation — that's the normalizer's problem.

Stage 2: Kafka — The Backbone
We use Kafka as the message bus between stages for several reasons:

Replayability: If the normalizer has a bug, you can fix it and replay from the beginning of the topic without re-hitting the providers.
Backpressure: If downstream is slow, Kafka absorbs the spike instead of dropping events.
Fan-out: Multiple consumers can read the same normalized topic independently (ML models, alerting systems, your database writer).

Topic Configuration
bash# Raw topic — high retention, many partitions
kafka-topics.sh --create \
--topic odds.raw \
--partitions 24 \
--replication-factor 3 \
--config retention.ms=604800000 \ # 7 days
--config compression.type=lz4

Normalized topic — lower retention is fine, downstream handles persistence

kafka-topics.sh --create \
--topic odds.normalized \
--partitions 24 \
--replication-factor 3 \
--config retention.ms=86400000 \ # 1 day
--config compression.type=lz4
We partition by market ID (e.g., match_id + market_type). This keeps all updates for a given market on a single partition, which makes deduplication and ordering trivial in the normalizer.

Stage 3: Normalization — Schema Unification
Different providers use wildly different schemas. One sends home_price, another sends odds_1, a third uses fractional odds. The normalizer's job is to turn all of this into a canonical event:
pythonfrom dataclasses import dataclass
from decimal import Decimal

@dataclass
class NormalizedOddsEvent:
event_id: str # provider + market + timestamp hash
market_id: str # canonical market identifier
provider: str
selection: str # "home" | "away" | "draw" | player name, etc.
odds_decimal: Decimal # always decimal, always
line: Decimal | None # for spreads and totals
timestamp_ms: int # provider-reported time
received_at_ns: int # our ingestion timestamp
Here's a normalizer for a fictional provider:
pythonimport hashlib
import json
from decimal import Decimal
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

AMERICAN_TO_DECIMAL = lambda a: (
Decimal(a) / 100 + 1 if a > 0 else Decimal(100) / abs(a) + 1
)

def normalize_provider_x(raw: dict) -> list[NormalizedOddsEvent]:
events = []
for outcome in raw.get("outcomes", []):
american_price = outcome["price"]
decimal_odds = AMERICAN_TO_DECIMAL(american_price)

    event_id = hashlib.sha256(
        f"{raw['provider']}{raw['market_id']}{outcome['name']}{raw['timestamp']}".encode()
    ).hexdigest()[:16]

    events.append(NormalizedOddsEvent(
        event_id=event_id,
        market_id=raw["market_id"],
        provider=raw["provider"],
        selection=outcome["name"],
        odds_decimal=decimal_odds,
        line=Decimal(str(outcome["line"])) if "line" in outcome else None,
        timestamp_ms=raw["timestamp"],
        received_at_ns=raw["received_at"],
    ))
return events
Enter fullscreen mode Exit fullscreen mode

Deduplication
Providers sometimes send the same event twice. We deduplicate using a sliding window in Redis:
pythonimport redis.asyncio as redis

r = redis.Redis()

async def is_duplicate(event_id: str, window_seconds: int = 60) -> bool:
key = f"dedup:{event_id}"
result = await r.set(key, 1, nx=True, ex=window_seconds)
return result is None # None means key already existed

Stage 4: Persistence — TimescaleDB + Redis Hot Cache
TimescaleDB for Historical Data
Plain Postgres can't handle the write throughput of a busy odds feed. TimescaleDB's hypertables chunk your time-series data automatically and compress older chunks — we get 10–15x compression on odds data older than a week.
sqlCREATE TABLE odds_ticks (
received_at TIMESTAMPTZ NOT NULL,
market_id TEXT NOT NULL,
provider TEXT NOT NULL,
selection TEXT NOT NULL,
odds_decimal NUMERIC(8, 4) NOT NULL,
line NUMERIC(6, 2),
event_id TEXT NOT NULL
);

SELECT create_hypertable('odds_ticks', 'received_at', chunk_time_interval => INTERVAL '1 hour');

CREATE INDEX ON odds_ticks (market_id, received_at DESC);

-- Enable compression for chunks older than 2 hours
ALTER TABLE odds_ticks SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'market_id, provider'
);
SELECT add_compression_policy('odds_ticks', INTERVAL '2 hours');
We batch inserts using COPY rather than individual INSERT statements:
pythonimport asyncpg
from io import StringIO

async def batch_insert(conn: asyncpg.Connection, events: list[NormalizedOddsEvent]):
buf = StringIO()
for e in events:
buf.write(f"{e.received_at_ns}\t{e.market_id}\t{e.provider}\t"
f"{e.selection}\t{e.odds_decimal}\t{e.line or '\N'}\t{e.event_id}\n")
buf.seek(0)
await conn.copy_to_table(
"odds_ticks",
source=buf,
columns=["received_at", "market_id", "provider", "selection",
"odds_decimal", "line", "event_id"],
format="text",
)
We collect events into 250ms micro-batches before flushing — this alone cut our write latency by 60% versus row-by-row inserts.
Redis for Current Prices
For anything that needs the current price (pricing engines, UI, alerts), hitting TimescaleDB on every request is too slow. We maintain a hot cache in Redis:
pythonasync def update_hot_cache(r: redis.Redis, event: NormalizedOddsEvent):
key = f"odds:{event.market_id}:{event.provider}:{event.selection}"
await r.hset(key, mapping={
"odds": str(event.odds_decimal),
"line": str(event.line or ""),
"ts": event.timestamp_ms,
})
await r.expire(key, 3600) # 1-hour TTL for stale market cleanup

Measuring Latency
We track end-to-end latency as database_write_time - received_at_ns. Here's what we see in production:
PercentileLatencyp5018msp9547msp9981msp99.9142ms
The p99.9 spikes correlate with Kafka consumer group rebalances — still on the list to fix.

Lessons Learned

  1. Never trust provider timestamps alone. One of our providers was sending events with timestamps 8 seconds in the past during high load. Always store your own ingestion timestamp alongside provider time.
  2. Schema changes will break you. Providers update their WebSocket schemas with zero notice. Wrap your normalizer with a schema version check and route unknown schemas to a dead-letter topic rather than crashing.
  3. Backpressure is real. During major sporting events, event volume can spike 20x. Size your Kafka partitions and consumer instances for peak, not average.
  4. Test reconnection logic religiously. WebSocket connections drop. Your collector needs exponential backoff with jitter, and you need a metric that fires when a collector has been disconnected for more than 30 seconds. pythonasync def collect_with_retry(producer): backoff = 1 while True: try: await collect(producer) backoff = 1 # reset on success except Exception as e: jitter = random.uniform(0, backoff * 0.1) await asyncio.sleep(backoff + jitter) backoff = min(backoff * 2, 60)

What's Next
A few things on our roadmap:

Change Data Capture (CDC) from TimescaleDB into a downstream analytics warehouse
KSQL / Flink for stream-time aggregations (rolling implied probability, cross-book arbitrage detection)
OpenTelemetry traces that follow a single odds event from WebSocket frame to database row

Real-time pipelines are never truly "done" — the fun is in the iteration. If you're building something similar or have a different approach to any of this, I'd love to hear about it in the comments.

All benchmarks were taken from a production deployment running on 3×c6i.4xlarge Kafka brokers and 2×r6i.2xlarge TimescaleDB nodes (AWS, us-east-1).

Top comments (0)