<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Predzone</title>
    <description>The latest articles on DEV Community by Predzone (@predzone).</description>
    <link>https://dev.to/predzone</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3861151%2F27307130-ff7b-4bc6-99af-89e612799479.png</url>
      <title>DEV Community: Predzone</title>
      <link>https://dev.to/predzone</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/predzone"/>
    <language>en</language>
    <item>
      <title>Building a Real-Time Odds Pipeline: From WebSocket to Your Database in Under 100ms</title>
      <dc:creator>Predzone</dc:creator>
      <pubDate>Sat, 04 Apr 2026 15:13:00 +0000</pubDate>
      <link>https://dev.to/predzone/building-a-real-time-odds-pipeline-from-websocket-to-your-database-in-under-100ms-1h66</link>
      <guid>https://dev.to/predzone/building-a-real-time-odds-pipeline-from-websocket-to-your-database-in-under-100ms-1h66</guid>
      <description>&lt;p&gt;Tags: websocket, kafka, python, datapipeline&lt;/p&gt;

&lt;p&gt;Odds move fast. A line shift on a major market can cascade across dozens of books in seconds, and if your pipeline can't keep up, you're not just missing data — you're making decisions on stale ground. In this post, I'll walk through how we built a real-time odds ingestion pipeline that handles thousands of updates per second with sub-100ms latency from source to storage.&lt;/p&gt;

&lt;p&gt;The Problem&lt;br&gt;
Sports betting odds are one of the most volatile data streams you'll encounter in production. Consider:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;A single NFL game can generate 50,000+ price updates over its lifetime&lt;/li&gt;
&lt;li&gt;Sharp books like Pinnacle move lines within milliseconds of sharp action&lt;/li&gt;
&lt;li&gt;You need to capture every tick, not just the final state&lt;/li&gt;
&lt;li&gt;Multiple data providers emit events in different schemas and protocols&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;A naive polling approach — hitting REST APIs every few seconds — simply doesn't cut it. You need a proper event-driven pipeline.&lt;/p&gt;
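To put rough numbers on it: even the average rate above implies many ticks per polling window, and bursts are far worse. A back-of-the-envelope sketch (the 4-hour game window and 5-second poll cadence are assumptions for illustration, not measured values):

```python
updates_per_game = 50_000      # price updates over a game's lifetime (figure from above)
game_seconds = 4 * 3600        # assumed ~4-hour broadcast window
poll_interval_s = 5            # assumed REST polling cadence

avg_updates_per_sec = updates_per_game / game_seconds
ticks_per_poll = avg_updates_per_sec * poll_interval_s

# A poll only ever sees the latest state, so everything but the last
# tick in each window is lost: on average, most of the stream.
print(f"~{avg_updates_per_sec:.1f} updates/sec, ~{ticks_per_poll:.0f} ticks per poll window")
```

And that is the average; during a scoring drive the instantaneous rate is far higher.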

&lt;p&gt;Architecture Overview&lt;br&gt;
Here's the high-level design we landed on:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[Data Providers]
    │  WebSocket / SSE
    ▼
[Ingestion Layer]   ← Python async collectors, one per provider
    │  Raw JSON events
    ▼
[Kafka Topic: odds.raw]
    │
    ▼
[Normalizer Service]  ← Schema unification + deduplication
    │  Normalized events
    ▼
[Kafka Topic: odds.normalized]
    │         │
    ▼         ▼
[TimescaleDB] [Redis]   ← historical storage + hot cache
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Four stages: collect → queue → normalize → persist. Each stage scales independently and fails independently. Let's dig into each one.&lt;/p&gt;

&lt;p&gt;Stage 1: Ingestion — Async WebSocket Collectors&lt;br&gt;
Each data provider gets its own lightweight collector process. We use Python's asyncio + websockets library for this.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import asyncio
import json
import time

import websockets
from aiokafka import AIOKafkaProducer

PROVIDER_WS_URL = "wss://provider.example.com/odds-stream"
KAFKA_TOPIC = "odds.raw"

async def collect(producer: AIOKafkaProducer):
    async with websockets.connect(
        PROVIDER_WS_URL,
        ping_interval=20,
        ping_timeout=10,
    ) as ws:
        await ws.send(json.dumps({"action": "subscribe", "markets": ["soccer", "basketball"]}))

        async for message in ws:
            event = {
                "provider": "example_provider",
                "received_at": time.time_ns(),  # nanosecond precision
                "raw": message,
            }
            await producer.send(
                KAFKA_TOPIC,
                key=b"example_provider",
                value=json.dumps(event).encode(),
            )
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;async def main():
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    try:
        await collect(producer)
    finally:
        await producer.stop()

asyncio.run(main())
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;A few things worth noting here:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;strong&gt;ping_interval / ping_timeout:&lt;/strong&gt; WebSocket connections to data providers drop silently more often than you'd think. Explicit keepalives catch this before you spend 30 minutes debugging "where did my data go?"&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;Nanosecond timestamps at ingestion:&lt;/strong&gt; You'll thank yourself later when you're debugging latency and need to know exactly when an event hit your system vs. when the provider claims it fired.&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;Wrap, don't transform:&lt;/strong&gt; The collector's only job is to get bytes into Kafka fast. No parsing, no validation — that's the normalizer's problem.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Stage 2: Kafka — The Backbone&lt;br&gt;
We use Kafka as the message bus between stages for several reasons:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;strong&gt;Replayability:&lt;/strong&gt; If the normalizer has a bug, you can fix it and replay from the beginning of the topic without re-hitting the providers.&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;Backpressure:&lt;/strong&gt; If downstream is slow, Kafka absorbs the spike instead of dropping events.&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;Fan-out:&lt;/strong&gt; Multiple consumers can read the same normalized topic independently (ML models, alerting systems, your database writer).&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Topic Configuration&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Raw topic — high retention (7 days), many partitions
kafka-topics.sh --create \
  --topic odds.raw \
  --partitions 24 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config compression.type=lz4

# Normalized topic — lower retention (1 day) is fine, downstream handles persistence
kafka-topics.sh --create \
  --topic odds.normalized \
  --partitions 24 \
  --replication-factor 3 \
  --config retention.ms=86400000 \
  --config compression.type=lz4
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;We partition by market ID (e.g., match_id + market_type). This keeps all updates for a given market on a single partition, which makes deduplication and ordering trivial in the normalizer.&lt;/p&gt;
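The partitioning scheme is just a deterministic message key; Kafka's default partitioner hashes it. A minimal sketch (the helper name and field names are illustrative, not from our codebase):

```python
def market_partition_key(match_id: str, market_type: str) -> bytes:
    # All updates for one market hash to the same partition,
    # preserving per-market ordering without extra coordination.
    return f"{match_id}:{market_type}".encode()
```

This is the value you would pass as the `key=` argument to `producer.send(...)`.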

&lt;p&gt;Stage 3: Normalization — Schema Unification&lt;br&gt;
Different providers use wildly different schemas. One sends home_price, another sends odds_1, a third uses fractional odds. The normalizer's job is to turn all of this into a canonical event:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from dataclasses import dataclass
from decimal import Decimal

@dataclass
class NormalizedOddsEvent:
    event_id: str          # provider + market + timestamp hash
    market_id: str         # canonical market identifier
    provider: str
    selection: str         # "home" | "away" | "draw" | player name, etc.
    odds_decimal: Decimal  # always decimal, always
    line: Decimal | None   # for spreads and totals
    timestamp_ms: int      # provider-reported time
    received_at_ns: int    # our ingestion timestamp
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Here's a normalizer for a fictional provider:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import hashlib
import json
from decimal import Decimal
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

def american_to_decimal(a: int) -&amp;gt; Decimal:
    # e.g. +150 becomes 2.5, -200 becomes 1.5
    return Decimal(a) / 100 + 1 if a &amp;gt; 0 else Decimal(100) / abs(a) + 1

def normalize_provider_x(raw: dict) -&amp;gt; list[NormalizedOddsEvent]:
    events = []
    for outcome in raw.get("outcomes", []):
        american_price = outcome["price"]
        decimal_odds = american_to_decimal(american_price)

        event_id = hashlib.sha256(
            f"{raw['provider']}{raw['market_id']}{outcome['name']}{raw['timestamp']}".encode()
        ).hexdigest()[:16]

        events.append(NormalizedOddsEvent(
            event_id=event_id,
            market_id=raw["market_id"],
            provider=raw["provider"],
            selection=outcome["name"],
            odds_decimal=decimal_odds,
            line=Decimal(str(outcome["line"])) if "line" in outcome else None,
            timestamp_ms=raw["timestamp"],
            received_at_ns=raw["received_at"],
        ))
    return events
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Deduplication&lt;br&gt;
Providers sometimes send the same event twice. We deduplicate using a sliding window in Redis:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import redis.asyncio as redis

r = redis.Redis()

async def is_duplicate(event_id: str, window_seconds: int = 60) -&amp;gt; bool:
    key = f"dedup:{event_id}"
    result = await r.set(key, 1, nx=True, ex=window_seconds)
    return result is None  # None means key already existed
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
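The SET NX + TTL pattern is easy to unit-test with an in-memory stand-in that has the same semantics for a single process (class name is ours; Redis is what gives you this across processes):

```python
import time

class SlidingWindowDedup:
    """In-memory stand-in for the Redis SET NX dedup (single process only)."""

    def __init__(self, window_seconds: float = 60):
        self.window = window_seconds
        self._seen: dict[str, float] = {}   # event_id -> expiry time

    def is_duplicate(self, event_id: str) -> bool:
        now = time.monotonic()
        expiry = self._seen.get(event_id)
        if expiry is not None and expiry > now:
            return True                      # seen within the window
        self._seen[event_id] = now + self.window
        return False
```

Same contract as the async version above: the first sighting of an event_id returns False, repeats within the window return True.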

&lt;p&gt;Stage 4: Persistence — TimescaleDB + Redis Hot Cache&lt;br&gt;
TimescaleDB for Historical Data&lt;br&gt;
Plain Postgres can't handle the write throughput of a busy odds feed. TimescaleDB's hypertables chunk your time-series data automatically and compress older chunks — we get 10–15x compression on odds data older than a week.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE TABLE odds_ticks (
    received_at   TIMESTAMPTZ NOT NULL,
    market_id     TEXT        NOT NULL,
    provider      TEXT        NOT NULL,
    selection     TEXT        NOT NULL,
    odds_decimal  NUMERIC(8, 4) NOT NULL,
    line          NUMERIC(6, 2),
    event_id      TEXT        NOT NULL
);

SELECT create_hypertable('odds_ticks', 'received_at', chunk_time_interval =&amp;gt; INTERVAL '1 hour');

CREATE INDEX ON odds_ticks (market_id, received_at DESC);

-- Enable compression for chunks older than 2 hours
ALTER TABLE odds_ticks SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'market_id, provider'
);
SELECT add_compression_policy('odds_ticks', INTERVAL '2 hours');
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;We batch inserts using COPY rather than individual INSERT statements:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import asyncpg
from datetime import datetime, timezone
from io import StringIO

async def batch_insert(conn: asyncpg.Connection, events: list[NormalizedOddsEvent]):
    buf = StringIO()
    for e in events:
        # The TIMESTAMPTZ column wants a timestamp, not raw nanoseconds;
        # COPY text format uses \N for NULL
        received_at = datetime.fromtimestamp(e.received_at_ns / 1e9, tz=timezone.utc).isoformat()
        line = e.line if e.line is not None else "\\N"
        buf.write(f"{received_at}\t{e.market_id}\t{e.provider}\t"
                  f"{e.selection}\t{e.odds_decimal}\t{line}\t{e.event_id}\n")
    buf.seek(0)
    await conn.copy_to_table(
        "odds_ticks",
        source=buf,
        columns=["received_at", "market_id", "provider", "selection",
                 "odds_decimal", "line", "event_id"],
        format="text",
    )
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;We collect events into 250ms micro-batches before flushing — this alone cut our write latency by 60% versus row-by-row inserts.&lt;/p&gt;

&lt;p&gt;Redis for Current Prices&lt;br&gt;
For anything that needs the current price (pricing engines, UI, alerts), hitting TimescaleDB on every request is too slow. We maintain a hot cache in Redis:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;async def update_hot_cache(r: redis.Redis, event: NormalizedOddsEvent):
    key = f"odds:{event.market_id}:{event.provider}:{event.selection}"
    await r.hset(key, mapping={
        "odds": str(event.odds_decimal),
        "line": str(event.line or ""),
        "ts": event.timestamp_ms,
    })
    await r.expire(key, 3600)  # 1-hour TTL for stale market cleanup
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
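The 250ms micro-batching can be sketched as a time-and-size-bounded accumulator; this is our shape of the idea, not the production code, and the function and parameter names are illustrative:

```python
import asyncio
import time

async def micro_batch(queue: asyncio.Queue, flush, max_wait: float = 0.25, max_size: int = 500):
    """Accumulate events for up to max_wait seconds (or max_size items), then flush."""
    while True:
        batch = [await queue.get()]                 # block until the first event arrives
        deadline = time.monotonic() + max_wait
        while len(batch) < max_size:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
            except asyncio.TimeoutError:
                break                                # window elapsed with queue empty
        await flush(batch)
```

Here `flush` would be something like `functools.partial(batch_insert, conn)`; the max_size cap keeps a volume spike from building an unboundedly large COPY buffer.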

&lt;p&gt;Measuring Latency&lt;br&gt;
We track end-to-end latency as database_write_time - received_at_ns. Here's what we see in production:&lt;/p&gt;

&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;&lt;th&gt;Percentile&lt;/th&gt;&lt;th&gt;Latency&lt;/th&gt;&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;&lt;td&gt;p50&lt;/td&gt;&lt;td&gt;18ms&lt;/td&gt;&lt;/tr&gt;
&lt;tr&gt;&lt;td&gt;p95&lt;/td&gt;&lt;td&gt;47ms&lt;/td&gt;&lt;/tr&gt;
&lt;tr&gt;&lt;td&gt;p99&lt;/td&gt;&lt;td&gt;81ms&lt;/td&gt;&lt;/tr&gt;
&lt;tr&gt;&lt;td&gt;p99.9&lt;/td&gt;&lt;td&gt;142ms&lt;/td&gt;&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;

&lt;p&gt;The p99.9 spikes correlate with Kafka consumer group rebalances — still on the list to fix.&lt;/p&gt;
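For reference, percentiles like these can be computed from raw latency samples with a simple nearest-rank calculation (the function name is ours, not from the pipeline):

```python
import math

def latency_percentiles(samples_ms, percentiles=(50, 95, 99, 99.9)):
    """Nearest-rank percentiles over a list of latency samples (milliseconds)."""
    s = sorted(samples_ms)
    result = {}
    for p in percentiles:
        rank = max(1, math.ceil(p / 100 * len(s)))   # nearest-rank method
        result[f"p{p}"] = s[rank - 1]
    return result
```

In production you would feed this with `(write_time - received_at)` deltas converted from nanoseconds to milliseconds, or use a streaming sketch (t-digest, HDRHistogram) instead of sorting.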

&lt;p&gt;Lessons Learned&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Never trust provider timestamps alone. One of our providers was sending events with timestamps 8 seconds in the past during high load. Always store your own ingestion timestamp alongside provider time.&lt;/li&gt;
&lt;li&gt;Schema changes will break you. Providers update their WebSocket schemas with zero notice. Wrap your normalizer with a schema version check and route unknown schemas to a dead-letter topic rather than crashing.&lt;/li&gt;
&lt;li&gt;Backpressure is real. During major sporting events, event volume can spike 20x. Size your Kafka partitions and consumer instances for peak, not average.&lt;/li&gt;
&lt;li&gt;Test reconnection logic religiously. WebSocket connections drop. Your collector needs exponential backoff with jitter, and you need a metric that fires when a collector has been disconnected for more than 30 seconds.

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import random

async def collect_with_retry(producer):
    backoff = 1
    while True:
        try:
            await collect(producer)
            backoff = 1  # reset on success
        except Exception:
            jitter = random.uniform(0, backoff * 0.1)
            await asyncio.sleep(backoff + jitter)
            backoff = min(backoff * 2, 60)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;&lt;/li&gt;
&lt;/ol&gt;
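The dead-letter routing from lesson 2 boils down to a pure classification step, which is easy to test. A sketch (the topic names, schema_version field, and known-versions set are illustrative):

```python
import json

KNOWN_SCHEMA_VERSIONS = {1, 2}   # versions the normalizer understands (illustrative)

def route_for(raw_bytes: bytes) -> str:
    """Decide which topic a raw payload belongs on instead of crashing on it."""
    try:
        payload = json.loads(raw_bytes)
    except json.JSONDecodeError:
        return "odds.dead-letter"          # not even JSON: quarantine it
    if payload.get("schema_version") not in KNOWN_SCHEMA_VERSIONS:
        return "odds.dead-letter"          # unknown schema: quarantine it
    return "odds.raw"
```

The normalizer then only ever sees payloads it knows how to parse, and the dead-letter topic becomes a queue of provider schema changes to triage.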

&lt;p&gt;What's Next&lt;br&gt;
A few things on our roadmap:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Change Data Capture (CDC) from TimescaleDB into a downstream analytics warehouse&lt;/li&gt;
&lt;li&gt;KSQL / Flink for stream-time aggregations (rolling implied probability, cross-book arbitrage detection)&lt;/li&gt;
&lt;li&gt;OpenTelemetry traces that follow a single odds event from WebSocket frame to database row&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Real-time pipelines are never truly "done" — the fun is in the iteration. If you're building something similar or have a different approach to any of this, I'd love to hear about it in the comments.&lt;/p&gt;

&lt;p&gt;All benchmarks were taken from a production deployment running on 3×c6i.4xlarge Kafka brokers and 2×r6i.2xlarge TimescaleDB nodes (AWS, us-east-1).&lt;/p&gt;

</description>
      <category>ai</category>
      <category>webdev</category>
      <category>programming</category>
      <category>javascript</category>
    </item>
  </channel>
</rss>
