<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: ayoabass777</title>
    <description>The latest articles on DEV Community by ayoabass777 (@ayoabass777).</description>
    <link>https://dev.to/ayoabass777</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3874283%2F7c93aa46-f08a-4f5a-a2de-6a18c0800659.png</url>
      <title>DEV Community: ayoabass777</title>
      <link>https://dev.to/ayoabass777</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/ayoabass777"/>
    <language>en</language>
    <item>
      <title>Building an AI-Augmented News Intelligence Pipeline with Kafka, Delta Lake, and LLMs</title>
      <dc:creator>ayoabass777</dc:creator>
      <pubDate>Thu, 30 Apr 2026 01:47:55 +0000</pubDate>
      <link>https://dev.to/ayoabass777/building-an-ai-augmented-news-intelligence-pipeline-with-kafka-delta-lake-and-llms-2nj3</link>
      <guid>https://dev.to/ayoabass777/building-an-ai-augmented-news-intelligence-pipeline-with-kafka-delta-lake-and-llms-2nj3</guid>
      <description>&lt;p&gt;&lt;em&gt;How I built a streaming pipeline that uses LLMs as a transform layer and Delta Lake for stateful content versioning&lt;/em&gt;&lt;/p&gt;




&lt;p&gt;My first portfolio project (Ballistics) was batch — API calls on a schedule, Airflow orchestration, S3 landing zone. My second (Pulse) was streaming — Kafka, exactly-once delivery, session analytics in dbt. Both used the same transformation tool (dbt) with different ingestion patterns.&lt;/p&gt;

&lt;p&gt;Sentinel is the third project, and the question changed. Ballistics and Pulse processed structured data — JSON from APIs, simulated clickstream events. What happens when the raw data is &lt;em&gt;unstructured&lt;/em&gt;? When the "transformation" isn't a SQL model but an LLM that extracts entities, sentiment, and summaries from raw HTML?&lt;/p&gt;

&lt;p&gt;Sentinel is a news intelligence pipeline that ingests articles from multiple sources, uses LLMs to extract structured data, and serves it through an API and dashboard. It's not a product — it's a proof of work for AI-augmented data engineering patterns.&lt;/p&gt;

&lt;h2&gt;
  
  
  What I Built
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;GDELT + RSS ──► Kafka ──► Fetcher ──► Kafka ──► LLM Parser ──► Kafka ──► Bronze Writer ──► Delta Lake ──► FastAPI
                  │                                    │                       │                │
               Redis L1/L2                          DLQ topic          Delta Lake write    PySpark MERGE
               (dedup gate)                      (exponential                                  (CDF)
                                                  backoff)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Two producers (GDELT and an 18-feed RSS aggregator) discover article URLs and push them to Kafka. A fetcher consumer retrieves the HTML. An LLM parser extracts structured data — title, author, companies, people, topics, sentiment, summary — and produces it to a &lt;code&gt;parsed_articles&lt;/code&gt; topic. A Bronze Writer consumer reads from that topic and writes to Delta Lake. A PySpark job transforms Bronze into Silver using a stateful MERGE. FastAPI serves the Silver layer to a React dashboard.&lt;/p&gt;

&lt;p&gt;Everything runs locally in Docker — Kafka in KRaft mode, Redis, and the dashboard. The producers, fetcher, and parser are long-running Python services.&lt;/p&gt;

&lt;h2&gt;
  
  
  Architecture
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;┌──────────────┐
│    GDELT     │──┐    ┌─────────────┐     ┌─────────────┐
│   Producer   │  ├───►│ Redis Dedup │────►│sentinel.urls│
└──────────────┘  │    │  L1+L2 keys │     └──────┬──────┘
┌──────────────┐  │    └─────────────┘            │
│     RSS      │──┘                               ▼
│  (18 feeds)  │                        ┌─────────────────────┐
└──────────────┘                        │ Fetcher (txn)       │
                                        │ begin→fetch→commit  │
                                        └──────────┬──────────┘
                                                   ▼
                                        ┌──────────────────┐
                                        │sentinel.raw_html │
                                        └──────────┬───────┘
                                                   ▼
                                        ┌─────────────────────┐
                                        │ LLM Parser (txn)    │
                                        │ OpenAI / Anthropic  │
                                        │ / DeepSeek          │
                                        └──────────┬──────────┘
                      ┌──────────┐                 │
                      │   DLQ    │◄── fails        │
                      │  Replay  │                 │
                      │1m→5m→30m │                 │
                      └──────────┘                 ▼
                                      ┌────────────────────────┐
                                      │sentinel.parsed_articles│
                                      └───────────┬────────────┘
                                                  ▼
                                        ┌──────────────────┐
                                        │  Bronze Writer   │
                                        └──────────┬───────┘
                                                   ▼
                                            ┌─────────────┐
                                            │Delta Bronze │
                                            │(CDF-enabled)│
                                            └──────┬──────┘
                                                   ▼
                                            ┌─────────────┐
                                            │  PySpark    │
                                            │ MERGE→Silver│
                                            └──────┬──────┘
                                                   ▼
                                            ┌─────────────┐
                                            │  FastAPI +  │
                                            │  Dashboard  │
                                            └─────────────┘
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fp1pkwhoyei4fjkqpqdhi.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fp1pkwhoyei4fjkqpqdhi.png" alt="Sentinel Dashboard"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Data Flow: Life of an Article
&lt;/h2&gt;

&lt;p&gt;One article, end to end:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Discovery.&lt;/strong&gt; The RSS producer polls a tech feed. A new entry appears — &lt;code&gt;https://example.com/article-about-funding&lt;/code&gt;. The producer checks Redis: L1 key (&lt;code&gt;sentinel:src:rss:{guid}&lt;/code&gt;) doesn't exist, L2 key (&lt;code&gt;sentinel:url:example.com/article-about-funding&lt;/code&gt;) doesn't exist. Both keys are set with TTLs. The URL goes to &lt;code&gt;sentinel.urls&lt;/code&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Fetch.&lt;/strong&gt; The fetcher consumer reads the message inside a Kafka transaction — &lt;code&gt;begin&lt;/code&gt;, fetch HTML via &lt;code&gt;httpx&lt;/code&gt;, extract clean text with &lt;code&gt;trafilatura&lt;/code&gt; (stripping nav bars, ads, boilerplate), produce to &lt;code&gt;sentinel.raw_html&lt;/code&gt;, &lt;code&gt;commit&lt;/code&gt;. If the fetch fails (timeout, 404), the message goes to &lt;code&gt;sentinel.dlq.fetch&lt;/code&gt; for retry with exponential backoff (1m → 5m → 30m).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Parse.&lt;/strong&gt; The LLM parser reads the clean text and sends it to the LLM with a structured extraction prompt. The LLM returns JSON: title, author, companies, people, topics, sentiment, summary. The parser validates the response against a Pydantic schema, computes &lt;code&gt;data_value_score&lt;/code&gt;, and produces the structured &lt;code&gt;BronzeArticle&lt;/code&gt; to &lt;code&gt;sentinel.parsed_articles&lt;/code&gt;. If the LLM returns malformed JSON or the response fails validation, the message goes to &lt;code&gt;sentinel.dlq.parse&lt;/code&gt; — same exponential backoff as the fetch DLQ. The entire step — read, extract, produce — is a Kafka transaction. No Delta Lake write here.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Bronze Write.&lt;/strong&gt; The Bronze Writer consumer reads from &lt;code&gt;sentinel.parsed_articles&lt;/code&gt; and writes each article to the Delta Lake Bronze table with CDF enabled. If the write fails, the offset isn't committed — Kafka replays the message. This separation means the parser's transaction stays fully within Kafka's guarantees, and the storage write has its own error boundary.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Transform.&lt;/strong&gt; The PySpark job reads Bronze's Change Data Feed — only new rows since the last checkpoint. It dedupes (keeping the latest per URL, aggregating all sources), filters by quality score, enriches with &lt;code&gt;freshness_status&lt;/code&gt; and &lt;code&gt;ingestion_lag_hours&lt;/code&gt;, and MERGEs into Silver. If this URL already exists in Silver with a different &lt;code&gt;content_hash&lt;/code&gt;, the MERGE bumps &lt;code&gt;content_version&lt;/code&gt; and stores the previous hash.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Serve.&lt;/strong&gt; FastAPI reads Silver via &lt;code&gt;deltalake&lt;/code&gt; + PyArrow (no Spark). The dashboard fetches &lt;code&gt;/articles&lt;/code&gt; and renders cards with sentiment badges, freshness status, source tags, and quality scores.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;No scheduler told stages 1–4 when to run. The URL landing on a Kafka topic &lt;em&gt;was&lt;/em&gt; the trigger. Each service reacted to data arriving, not a clock ticking. The transform (stage 5) is the exception — it's a batch job triggered manually or on a cron, reading only new changes via CDF. The architecture is designed so this becomes a one-line swap to Spark Structured Streaming when throughput demands it.&lt;/p&gt;

&lt;h2&gt;
  
  
  Why Docker Compose, Not Airflow
&lt;/h2&gt;

&lt;p&gt;This was the first architectural decision and it surprised me. I used Airflow for Ballistics. Three DAGs, task dependencies, scheduled triggers — it worked because Ballistics is a batch pipeline. Tasks start, run, finish, repeat on a schedule.&lt;/p&gt;

&lt;p&gt;Sentinel's consumers are different. They're &lt;em&gt;services&lt;/em&gt;, not &lt;em&gt;jobs&lt;/em&gt;. The fetcher doesn't run at 8am and finish at 8:05. It starts and stays alive, polling Kafka for new messages in a loop. Same for the parser. If I used Airflow, it would look like: "every 5 minutes, wake up, poll Kafka, process messages, shut down." That's wasteful — the consumer spends more time starting and stopping than working.&lt;/p&gt;

&lt;p&gt;Docker Compose keeps containers alive. Airflow schedules tasks. Different tools for different orchestration patterns.&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Pattern&lt;/th&gt;
&lt;th&gt;Orchestrator&lt;/th&gt;
&lt;th&gt;Why&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;Batch (Ballistics)&lt;/td&gt;
&lt;td&gt;Airflow&lt;/td&gt;
&lt;td&gt;Tasks with dependencies, on a schedule&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Service (Sentinel)&lt;/td&gt;
&lt;td&gt;Docker Compose&lt;/td&gt;
&lt;td&gt;Long-running consumers, react to events&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;The key insight: Kafka is the real orchestrator here. A message landing on a topic &lt;em&gt;is&lt;/em&gt; the trigger. No scheduler tells the fetcher when to run — data arriving on &lt;code&gt;sentinel.urls&lt;/code&gt; is the signal. Docker Compose just keeps the services alive so they can listen.&lt;/p&gt;

&lt;h2&gt;
  
  
  4-Level Dedup: Why One Layer Isn't Enough
&lt;/h2&gt;

&lt;p&gt;Two producers ingesting from overlapping sources means duplicates are inevitable. The same article can appear in both GDELT and an RSS feed within minutes. I needed dedup at every boundary.&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Level&lt;/th&gt;
&lt;th&gt;Key&lt;/th&gt;
&lt;th&gt;Where&lt;/th&gt;
&lt;th&gt;TTL&lt;/th&gt;
&lt;th&gt;What it catches&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;L1&lt;/td&gt;
&lt;td&gt;&lt;code&gt;sentinel:src:{source}:{record_id}&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Redis&lt;/td&gt;
&lt;td&gt;24h&lt;/td&gt;
&lt;td&gt;Same source, same record&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;L2&lt;/td&gt;
&lt;td&gt;&lt;code&gt;sentinel:url:{normalized_url}&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Redis&lt;/td&gt;
&lt;td&gt;6h&lt;/td&gt;
&lt;td&gt;Same URL from different sources&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;L3&lt;/td&gt;
&lt;td&gt;&lt;code&gt;url + kafka_ts&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Bronze Delta&lt;/td&gt;
&lt;td&gt;∞&lt;/td&gt;
&lt;td&gt;Replay after crash&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;L4&lt;/td&gt;
&lt;td&gt;&lt;code&gt;normalized_url + content_hash&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Silver MERGE&lt;/td&gt;
&lt;td&gt;∞&lt;/td&gt;
&lt;td&gt;Content change detection&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;L1 and L2 are cheap — Redis &lt;code&gt;SET NX&lt;/code&gt; calls in a pipeline batch. They catch 95% of duplicates before anything touches Kafka. L3 is the parser's safety net against replayed messages (same mechanism as Pulse's &lt;code&gt;ON CONFLICT DO NOTHING&lt;/code&gt;). L4 is the stateful MERGE that tracks whether content actually changed.&lt;/p&gt;

&lt;p&gt;URL normalization matters here. &lt;code&gt;https://www.example.com/article?utm_source=rss&lt;/code&gt; and &lt;code&gt;http://example.com/article&lt;/code&gt; are the same article. The normalizer strips &lt;code&gt;www.&lt;/code&gt;, tracking params (&lt;code&gt;utm_*&lt;/code&gt;, &lt;code&gt;fbclid&lt;/code&gt;, &lt;code&gt;gclid&lt;/code&gt;), protocol differences, and trailing slashes. Without this, L2 misses cross-source duplicates.&lt;/p&gt;
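
&lt;p&gt;A minimal normalizer along these lines, using only the standard library (illustrative, not Sentinel's exact implementation):&lt;/p&gt;

```python
from urllib.parse import urlsplit, parse_qsl, urlencode

# Illustrative normalizer: collapses protocol, www., tracking params, and
# trailing slashes so the L2 dedup key matches the same article across sources.
TRACKING_PREFIXES = ("utm_",)
TRACKING_PARAMS = {"fbclid", "gclid"}

def normalize_url(url: str) -> str:
    parts = urlsplit(url)
    host = parts.netloc.lower().removeprefix("www.")
    path = parts.path.rstrip("/")
    # Drop tracking parameters; sort the rest for a stable key
    query = sorted(
        (k, v) for k, v in parse_qsl(parts.query)
        if not k.startswith(TRACKING_PREFIXES) and k not in TRACKING_PARAMS
    )
    qs = f"?{urlencode(query)}" if query else ""
    return f"{host}{path}{qs}"
```

&lt;p&gt;Both example URLs from the paragraph above normalize to &lt;code&gt;example.com/article&lt;/code&gt;, so the L2 Redis key collides exactly as intended.&lt;/p&gt;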

&lt;h2&gt;
  
  
  The LLM as a Transform Layer
&lt;/h2&gt;

&lt;p&gt;In Ballistics and Pulse, transformation meant SQL — dbt models that reshape structured data. In Sentinel, the "transformation" is an LLM that takes article text and extracts structured fields:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="err"&gt;Clean&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;text&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;→&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;LLM&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;→&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="err"&gt;title&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;author&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;publish_date&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="err"&gt;body&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;summary&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;sentiment&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="err"&gt;companies&lt;/span&gt;&lt;span class="p"&gt;[],&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;people&lt;/span&gt;&lt;span class="p"&gt;[],&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;topics&lt;/span&gt;&lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This creates two problems batch SQL doesn't have:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;1. The bottleneck is cost, not throughput.&lt;/strong&gt; Each LLM call takes 2–5 seconds and costs tokens. The fetcher can saturate the &lt;code&gt;sentinel.raw_html&lt;/code&gt; topic far faster than the parser can drain it. Kafka absorbs the pressure difference as buffered messages — that's backpressure for free.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;2. The output is non-deterministic.&lt;/strong&gt; The same article parsed twice might produce slightly different entity lists. This is why the Silver MERGE uses &lt;code&gt;content_hash&lt;/code&gt; (an MD5 of the article body) rather than comparing extracted fields. If the source content hasn't changed, the extraction shouldn't re-run — regardless of whether the LLM might produce different output.&lt;/p&gt;
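
&lt;p&gt;The hash itself is trivial; the important choice is hashing the cleaned body rather than the LLM output. A sketch, assuming the MD5-of-body scheme described above:&lt;/p&gt;

```python
import hashlib

# Hash the cleaned article body, not the extracted fields: non-deterministic
# LLM output then can't trigger spurious "content changed" updates.
def content_hash(body: str) -> str:
    return hashlib.md5(body.encode("utf-8")).hexdigest()

body_v1 = "Acme raises $50M Series B..."
body_v2 = "Acme raises $50M Series B... (updated with CEO quote)"
```

&lt;p&gt;Identical bodies always hash the same, so a re-fetched but unchanged article is skipped at the MERGE, while any edit to the source text produces a new hash.&lt;/p&gt;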

&lt;p&gt;The parser is pluggable — a &lt;code&gt;--provider&lt;/code&gt; flag switches between OpenAI, Anthropic, and DeepSeek per instance. This was a practical decision: OpenAI quotas ran out during development, so I needed a fallback. But it also demonstrates a production pattern — run expensive models (GPT-4o, Claude Sonnet) for quality-critical extraction and cheap models (GPT-4o-mini, DeepSeek) for bulk ingestion.&lt;/p&gt;

&lt;h2&gt;
  
  
  Transaction Boundaries: Why the Parser Doesn't Write to Delta Lake
&lt;/h2&gt;

&lt;p&gt;The parser reads from one Kafka topic and produces to another — &lt;code&gt;sentinel.raw_html&lt;/code&gt; in, &lt;code&gt;sentinel.parsed_articles&lt;/code&gt; out. That read-process-produce cycle is wrapped in a Kafka transaction (&lt;code&gt;begin&lt;/code&gt;, extract, produce, &lt;code&gt;commit&lt;/code&gt;). If the LLM call fails or validation rejects the output, the transaction aborts — the offset isn't committed, the output message isn't visible, and the failed message is routed to &lt;code&gt;sentinel.dlq.parse&lt;/code&gt; for retry with exponential backoff. No half-written state, no silent data loss.&lt;/p&gt;

&lt;p&gt;Critically, the parser does &lt;em&gt;not&lt;/em&gt; write to Delta Lake. That's a separate consumer (the Bronze Writer) with its own error boundary. Earlier in development, the parser wrote directly to Delta Lake inside the transaction — but Delta writes aren't part of Kafka's transactional guarantees, so a successful Delta write followed by a failed transaction commit would leave data in Bronze with an uncommitted offset. Separating them keeps each transaction boundary clean.&lt;/p&gt;

&lt;h2&gt;
  
  
  Delta Lake CDF: Incremental Transforms Without Full Scans
&lt;/h2&gt;

&lt;p&gt;The Bronze → Silver transform needs to process only &lt;em&gt;new&lt;/em&gt; articles, not re-scan the entire Bronze table every run. Delta Lake's Change Data Feed (CDF) solves this.&lt;/p&gt;

&lt;p&gt;When CDF is enabled on a table, every write creates a versioned changelog entry. The transform reads only changes since the last processed version:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# Read only new changes from Bronze
&lt;/span&gt;&lt;span class="n"&gt;cdf&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;delta&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;readChangeFeed&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;true&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;startingVersion&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;last_processed_version&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;endingVersion&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;current_version&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;load&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bronze_path&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;filter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;_change_type&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;isin&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;insert&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;update_postimage&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;A simple checkpoint file stores the last processed version. The first run does a full scan automatically (no checkpoint found); subsequent runs process only new changes. The &lt;code&gt;--full&lt;/code&gt; flag forces a reconciliation scan if Silver drifts.&lt;/p&gt;

&lt;p&gt;This is the same design pattern as Kafka consumer offsets — "give me everything since my last position." The difference is that Kafka coordinates the streaming stages (producers, fetcher, parser), and CDF coordinates the storage stages (Bronze → Silver). Each tool tracks its own "last position" within its own domain.&lt;/p&gt;

&lt;h2&gt;
  
  
  Stateful MERGE: Content Versioning
&lt;/h2&gt;

&lt;p&gt;The Silver MERGE does more than dedup. It tracks whether an article's content has changed over time:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;silver_dt&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;alias&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;merge&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;source_df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;alias&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;b&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s.normalized_url = b.normalized_url&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;whenMatchedUpdate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;condition&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;b.content_hash != s.content_hash AND b.kafka_ts &amp;gt; s.kafka_ts&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nb"&gt;set&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;# Update content fields...
&lt;/span&gt;        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;content_version&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s.content_version&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;previous_content_hash&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s.content_hash&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;is_updated&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;F&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;lit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="c1"&gt;# first_seen_ts intentionally NOT updated
&lt;/span&gt;    &lt;span class="p"&gt;},&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;whenNotMatchedInsertAll&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Three outcomes:&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Incoming vs Silver&lt;/th&gt;
&lt;th&gt;Action&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;New URL&lt;/td&gt;
&lt;td&gt;Insert (&lt;code&gt;content_version=1, is_updated=False&lt;/code&gt;)&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Same URL + same content_hash&lt;/td&gt;
&lt;td&gt;Skip (true duplicate)&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Same URL + different content_hash&lt;/td&gt;
&lt;td&gt;Update: bump version, store old hash, set &lt;code&gt;is_updated=True&lt;/code&gt;
&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;The statefulness is in three lines: &lt;code&gt;s.content_version + 1&lt;/code&gt; reads Silver's current state to compute the update. The MERGE isn't just transforming data — it's making decisions based on what's already there.&lt;/p&gt;

&lt;p&gt;If you need the old content, Delta time travel has it. Every previous version of Silver is accessible by version number. No need to store duplicate rows — the history is in the transaction log.&lt;/p&gt;

&lt;h2&gt;
  
  
  Data Quality: Don't Drop Late Data, Flag It
&lt;/h2&gt;

&lt;p&gt;Every article gets a &lt;code&gt;freshness_status&lt;/code&gt; computed from the gap between &lt;code&gt;publish_date&lt;/code&gt; and &lt;code&gt;fetch_timestamp&lt;/code&gt;:&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Status&lt;/th&gt;
&lt;th&gt;Condition&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;fresh&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Ingested within 48 hours of publication&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;old&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Ingested 48 hours to 7 days after publication&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;stale&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Ingested more than 7 days after publication&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;unknown&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;No &lt;code&gt;publish_date&lt;/code&gt; available&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;Combined with &lt;code&gt;ingestion_lag_hours&lt;/code&gt; (the continuous metric) and a composite &lt;code&gt;data_value_score&lt;/code&gt; (freshness 40%, completeness 40%, accuracy 20%), Silver consumers can filter by quality without losing data. Articles below &lt;code&gt;data_value_score &amp;lt; 0.3&lt;/code&gt; are excluded from Silver — but they remain in Bronze if you ever need to reprocess them with different thresholds.&lt;/p&gt;

&lt;p&gt;The key decision: &lt;strong&gt;flag, don't drop.&lt;/strong&gt; A week-old article about a funding round is still valuable. Dropping it because it's "stale" loses signal. Flagging it lets the consumer decide.&lt;/p&gt;

&lt;h2&gt;
  
  
  Scaling: Kafka Consumer Groups
&lt;/h2&gt;

&lt;p&gt;Each stage scales horizontally via Kafka consumer groups. If &lt;code&gt;sentinel.raw_html&lt;/code&gt; has 3 partitions, you can run up to 3 parser instances in parallel — each gets a partition, no code changes:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;parser-0&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;command&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;python -m sentinel.consumers.llm_parser &lt;/span&gt;&lt;span class="m"&gt;0&lt;/span&gt;
&lt;span class="na"&gt;parser-1&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;command&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;python -m sentinel.consumers.llm_parser &lt;/span&gt;&lt;span class="m"&gt;1&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Kafka rebalances automatically when consumers join or crash. But the real throughput ceiling isn't partitions — it's LLM cost. Three parsers at 2 calls/minute each give 6 articles per minute. Switching to a cheaper model (GPT-4o-mini, Haiku, or DeepSeek at ~$0.001/article) shifts the bottleneck from cost back to Kafka partitions, and adding partitions is a one-command fix.&lt;/p&gt;

&lt;h2&gt;
  
  
  What Production Would Look Like
&lt;/h2&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Local&lt;/th&gt;
&lt;th&gt;Production&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;Kafka (Docker)&lt;/td&gt;
&lt;td&gt;Amazon MSK&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Redis (Docker)&lt;/td&gt;
&lt;td&gt;ElastiCache&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Delta Lake (local filesystem)&lt;/td&gt;
&lt;td&gt;Delta Lake on S3&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Manual &lt;code&gt;docker-compose up&lt;/code&gt;
&lt;/td&gt;
&lt;td&gt;ECS Fargate + KEDA autoscaling&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;PySpark batch transform&lt;/td&gt;
&lt;td&gt;Spark Structured Streaming (one-line swap)&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;localhost API&lt;/td&gt;
&lt;td&gt;ECS Fargate + ALB&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;The CDF-based transform is designed so switching to streaming is a one-line change: &lt;code&gt;spark.read&lt;/code&gt; becomes &lt;code&gt;spark.readStream&lt;/code&gt;. The MERGE logic, dedup, and quality filters stay identical.&lt;/p&gt;

&lt;h2&gt;
  
  
  What I Learned
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;The LLM is a liability, not a feature.&lt;/strong&gt; The attention framework needs attention. It's the slowest, most expensive, least deterministic component in the pipeline. Everything around it — rate limiting, DLQ with exponential backoff, pluggable providers, content hashing to avoid re-extraction — exists to manage that liability. The architecture assumes the LLM will fail.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Streaming and batch coexist.&lt;/strong&gt; Sentinel isn't purely streaming or purely batch. Kafka coordinates the streaming stages, CDF coordinates the storage stages, and the batch transform sits at the boundary. Production systems almost always have both patterns running in parallel.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Dedup is a system design problem, not a single filter.&lt;/strong&gt; Each dedup layer catches different classes of duplicates at different costs. Redis is fast but ephemeral. Delta MERGE is durable but expensive. Stacking them means each layer only handles what the previous one missed.&lt;/p&gt;
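&lt;p&gt;A minimal sketch of the stacking idea, with a dict standing in for Redis and a set standing in for the Delta MERGE key. All names here are hypothetical, not from the Sentinel repo:&lt;/p&gt;

```python
import hashlib

recent_cache = {}   # stands in for Redis: fast, ephemeral (TTL eviction)
table_keys = set()  # stands in for the Delta MERGE key: durable, expensive

def content_hash(text):
    return hashlib.sha256(text.encode()).hexdigest()

def is_duplicate(url, body):
    key = content_hash(body)
    if recent_cache.get(url) == key:  # layer 1: O(1) lookup, recent items only
        return True
    recent_cache[url] = key
    if key in table_keys:             # layer 2: catches what the cache missed
        return True
    table_keys.add(key)
    return False

assert is_duplicate("a.com/x", "breaking news") is False  # first sighting
assert is_duplicate("a.com/x", "breaking news") is True   # caught by cache
recent_cache.clear()                                      # simulate TTL expiry
assert is_duplicate("a.com/x", "breaking news") is True   # caught by table key
```

Each layer only sees traffic the previous one let through, which is exactly why the cheap layer goes first.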




&lt;p&gt;&lt;strong&gt;The progression:&lt;/strong&gt; Ballistics (batch, Airflow, S3) → Pulse (streaming, Kafka, exactly-once) → Sentinel (streaming + LLM, Kafka, Delta Lake, stateful MERGE). Three projects, three ingestion paradigms, each building on the patterns learned in the last.&lt;/p&gt;

&lt;p&gt;Code: &lt;a href="https://github.com/ayoabass777/Sentinel" rel="noopener noreferrer"&gt;github.com/ayoabass777/Sentinel&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Ayomide Abass — Data Engineer, Vancouver&lt;/em&gt;&lt;br&gt;
&lt;a href="https://www.linkedin.com/in/ayomide-abass-36b40025a/" rel="noopener noreferrer"&gt;LinkedIn&lt;/a&gt; · &lt;a href="https://github.com/ayoabass777" rel="noopener noreferrer"&gt;GitHub&lt;/a&gt;&lt;/p&gt;

</description>
      <category>ai</category>
      <category>systemdesign</category>
      <category>sideprojects</category>
      <category>dataengineering</category>
    </item>
    <item>
      &lt;title&gt;Building a Streaming Session Analytics Pipeline with Kafka, Postgres, and dbt&lt;/title&gt;
      <dc:creator>ayoabass777</dc:creator>
      <pubDate>Sat, 18 Apr 2026 23:10:46 +0000</pubDate>
      <link>https://dev.to/ayoabass777/-building-a-streaming-session-analytics-pipeline-with-kafka-postgres-and-dbt-4n79</link>
      <guid>https://dev.to/ayoabass777/-building-a-streaming-session-analytics-pipeline-with-kafka-postgres-and-dbt-4n79</guid>
      <description>&lt;p&gt;&lt;em&gt;How I built an end-to-end clickstream pipeline with exactly-once delivery guarantees&lt;/em&gt;&lt;/p&gt;




&lt;p&gt;When I set out to build Pulse, I had a specific goal: demonstrate that I could work with streaming data, not just batch. My first portfolio project (Ballistics) was a batch pipeline — API calls on a schedule, Airflow orchestration, daily refreshes. That's the bread and butter of most data engineering work, but it's only half the picture.&lt;/p&gt;

&lt;p&gt;Pulse is the other half. Real-time events flowing through Kafka, landing in Postgres, transformed by dbt into session analytics. Same dbt layer, completely different ingestion paradigm.&lt;/p&gt;

&lt;h2&gt;
  
  
  What I Built
&lt;/h2&gt;

&lt;p&gt;Pulse is a session analytics pipeline that processes clickstream events in real time:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Event Simulator → Kafka → Python Consumer → Postgres → dbt → Metabase
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The simulator generates realistic user behavior — page views, product views, add-to-cart events, checkouts, and payments. These flow through Kafka, get written to Postgres with exactly-once semantics, and dbt transforms them into:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Session metrics&lt;/strong&gt; — duration, bounce rate, landing pages&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Funnel analysis&lt;/strong&gt; — step-by-step conversion from awareness to purchase&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;User engagement&lt;/strong&gt; — DAU/WAU/MAU with stickiness ratios&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fzxk7ke87pdmimypeotu7.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fzxk7ke87pdmimypeotu7.png" alt="Sessions Dashboard" width="800" height="451"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Architecture
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;┌─────────────┐     ┌──────────┐     ┌──────────────┐     ┌──────────┐     ┌───────────┐
│   Event     │     │  Kafka   │     │   Python     │     │ Postgres │     │ Metabase  │
│  Simulator  │────▶│ (KRaft)  │────▶│  Consumer    │────▶│   raw    │────▶│ Dashboard │
│  (producer) │     │          │     │              │     │  events  │     │           │
└─────────────┘     └──────────┘     └──────────────┘     └────┬─────┘     └───────────┘
                         │                                     │
                    ┌────▼─────┐                          ┌────▼─────┐
                    │   DLQ    │                          │   dbt    │
                    │  topic   │                          │ models   │
                    └──────────┘                          └──────────┘
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Everything runs locally in Docker — Kafka in KRaft mode (no Zookeeper), Postgres, and Metabase. The producer and consumer are Python scripts.&lt;/p&gt;

&lt;h2&gt;
  
  
  The Hard Part: Exactly-Once Delivery
&lt;/h2&gt;

&lt;p&gt;The most interesting engineering challenge was achieving exactly-once semantics end-to-end. This required two separate mechanisms working together:&lt;/p&gt;

&lt;h3&gt;
  
  
  Layer 1: Idempotent Producer (&lt;code&gt;enable.idempotence=True&lt;/code&gt;)
&lt;/h3&gt;

&lt;p&gt;When the producer sends a message and the network times out, it doesn't know if Kafka received it. So it retries. Without idempotence, you'd get duplicate messages in the topic.&lt;/p&gt;

&lt;p&gt;The idempotent producer solves this by tagging each message with a sequence number. If Kafka already has that sequence, it silently drops the retry. Duplicates never enter the topic.&lt;/p&gt;
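&lt;p&gt;In librdkafka-style config keys, the relevant producer settings look roughly like this sketch (the broker address is a placeholder):&lt;/p&gt;

```python
# The settings involved, in librdkafka-style config keys (broker address
# is a placeholder). Enabling idempotence requires acks from all in-sync
# replicas; sequence numbering is what makes aggressive retries safe.
producer_config = {
    "bootstrap.servers": "localhost:9092",  # placeholder
    "enable.idempotence": True,
    "acks": "all",
    "retries": 2147483647,
}

assert producer_config["enable.idempotence"] is True
```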

&lt;h3&gt;
  
  
  Layer 2: Idempotent Consumer Key (&lt;code&gt;ON CONFLICT DO NOTHING&lt;/code&gt;)
&lt;/h3&gt;

&lt;p&gt;Even with an idempotent producer, the consumer can still create duplicates. Here's how:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Consumer reads message from Kafka&lt;/li&gt;
&lt;li&gt;Consumer writes row to Postgres ✓&lt;/li&gt;
&lt;li&gt;Consumer crashes before committing offset to Kafka&lt;/li&gt;
&lt;li&gt;Consumer restarts, replays from last committed offset&lt;/li&gt;
&lt;li&gt;Consumer writes the same row again ← duplicate&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The fix is an idempotent key. Every event gets a unique &lt;code&gt;event_id&lt;/code&gt; derived from &lt;code&gt;user_id&lt;/code&gt; + &lt;code&gt;kafka_timestamp_ms&lt;/code&gt;. The Postgres table has a primary key constraint on this field, and every insert uses &lt;code&gt;ON CONFLICT (event_id) DO NOTHING&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;When the replay happens, Postgres silently rejects the duplicate. No error, no data corruption.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;raw&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;events&lt;/span&gt; &lt;span class="p"&gt;(...)&lt;/span&gt; &lt;span class="k"&gt;VALUES&lt;/span&gt; &lt;span class="p"&gt;(...)&lt;/span&gt;
&lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;CONFLICT&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;DO&lt;/span&gt; &lt;span class="k"&gt;NOTHING&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;These two mechanisms are not the same thing.&lt;/strong&gt; The producer idempotence prevents duplicates &lt;em&gt;in Kafka&lt;/em&gt;. The consumer idempotent key prevents duplicates &lt;em&gt;in Postgres&lt;/em&gt;. You need both for end-to-end exactly-once.&lt;/p&gt;

&lt;p&gt;I verified this works by stopping the consumer mid-stream, resetting the Kafka offset to the beginning, and replaying all messages. Zero duplicates in Postgres.&lt;/p&gt;
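&lt;p&gt;The crash-and-replay scenario can be simulated in a few lines, modeling the primary key with a dict and &lt;code&gt;ON CONFLICT DO NOTHING&lt;/code&gt; with &lt;code&gt;setdefault&lt;/code&gt;. This is a toy model, not the repo's consumer code:&lt;/p&gt;

```python
import hashlib

events_table = {}  # the dict key models the PRIMARY KEY (event_id)

def event_id(user_id, kafka_timestamp_ms):
    return hashlib.sha256(f"{user_id}:{kafka_timestamp_ms}".encode()).hexdigest()

def insert_event(user_id, kafka_timestamp_ms, payload):
    # setdefault keeps the first row and silently ignores conflicts,
    # like ON CONFLICT (event_id) DO NOTHING
    events_table.setdefault(event_id(user_id, kafka_timestamp_ms), payload)

kafka_log = [("u1", 1000, {"event": "page_view"}),
             ("u1", 2000, {"event": "add_to_cart"})]

for msg in kafka_log:   # first pass: rows written...
    insert_event(*msg)
for msg in kafka_log:   # ...crash before offset commit, so everything replays
    insert_event(*msg)

assert len(events_table) == 2  # zero duplicates after the replay
```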

&lt;h2&gt;
  
  
  Two Timestamps, Two Purposes
&lt;/h2&gt;

&lt;p&gt;Every event carries two timestamps:&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Timestamp&lt;/th&gt;
&lt;th&gt;Source&lt;/th&gt;
&lt;th&gt;Purpose&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;event_timestamp&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Producer (business time)&lt;/td&gt;
&lt;td&gt;When did the user act? Used for session ordering.&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;kafka_timestamp&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Kafka broker (ingestion time)&lt;/td&gt;
&lt;td&gt;When did we receive it? Used for freshness checks.&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;Session reconstruction uses &lt;code&gt;event_timestamp&lt;/code&gt; because that's the truth of user behavior. &lt;code&gt;kafka_timestamp&lt;/code&gt; is for operational concerns — "is data flowing?" and "how stale is the latest batch?"&lt;/p&gt;

&lt;p&gt;This distinction matters because events can arrive out of order. A user might click at 10:00:01, but network latency means Kafka receives it at 10:00:03. If you sessionize on ingestion time, you get wrong session boundaries.&lt;/p&gt;

&lt;h2&gt;
  
  
  Session Reconstruction in SQL
&lt;/h2&gt;

&lt;p&gt;The sessionization logic uses a 30-minute inactivity gap — industry standard for web analytics. If a user is idle for more than 30 minutes, the next event starts a new session.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- Flag new sessions based on time gap&lt;/span&gt;
&lt;span class="k"&gt;CASE&lt;/span&gt;
  &lt;span class="k"&gt;WHEN&lt;/span&gt; &lt;span class="n"&gt;prev_event_timestamp&lt;/span&gt; &lt;span class="k"&gt;IS&lt;/span&gt; &lt;span class="k"&gt;NULL&lt;/span&gt; &lt;span class="k"&gt;THEN&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;  &lt;span class="c1"&gt;-- first event&lt;/span&gt;
  &lt;span class="k"&gt;WHEN&lt;/span&gt; &lt;span class="n"&gt;event_timestamp&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;prev_event_timestamp&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'30 minutes'&lt;/span&gt; &lt;span class="k"&gt;THEN&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;
  &lt;span class="k"&gt;ELSE&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
&lt;span class="k"&gt;END&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;is_new_session&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then a running sum of those flags gives each event its session number:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SUM&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;is_new_session&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;OVER&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="k"&gt;PARTITION&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;
  &lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;event_timestamp&lt;/span&gt;
  &lt;span class="k"&gt;ROWS&lt;/span&gt; &lt;span class="k"&gt;BETWEEN&lt;/span&gt; &lt;span class="n"&gt;UNBOUNDED&lt;/span&gt; &lt;span class="k"&gt;PRECEDING&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="k"&gt;CURRENT&lt;/span&gt; &lt;span class="k"&gt;ROW&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;session_number&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I chose to do this in dbt (batch) rather than a stream processor (Flink/Spark Streaming) deliberately. The session definition is still evolving — maybe 30 minutes becomes 20, maybe we add page-specific rules. SQL is testable, rerunnable, and version-controlled. Once the rules stabilize, I can move to stream processing if latency requires it.&lt;/p&gt;
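&lt;p&gt;The same gap logic, rendered in Python to make the running-sum trick concrete (a toy equivalent, not the dbt model itself):&lt;/p&gt;

```python
from datetime import datetime, timedelta

def sessionize(timestamps, gap=timedelta(minutes=30)):
    session, prev, numbers = 0, None, []
    for ts in timestamps:
        # Same conditions as the CASE: first event, or inactivity gap exceeded
        if prev is None or ts - prev > gap:
            session += 1          # the running SUM of is_new_session flags
        numbers.append(session)
        prev = ts
    return numbers

clicks = [datetime(2026, 4, 18, 10, 0),
          datetime(2026, 4, 18, 10, 10),  # 10 min gap: same session
          datetime(2026, 4, 18, 11, 0)]   # 50 min gap: new session
assert sessionize(clicks) == [1, 1, 2]
```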

&lt;h2&gt;
  
  
  Error Handling: DLQ for Non-Transient Errors Only
&lt;/h2&gt;

&lt;p&gt;Not all errors are equal:&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Error Type&lt;/th&gt;
&lt;th&gt;Example&lt;/th&gt;
&lt;th&gt;Action&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;Transient&lt;/td&gt;
&lt;td&gt;Connection timeout, deadlock&lt;/td&gt;
&lt;td&gt;Don't commit offset → Kafka replays automatically&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Non-transient&lt;/td&gt;
&lt;td&gt;Missing required field, bad data type&lt;/td&gt;
&lt;td&gt;Route to DLQ topic → commit offset to unblock&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;Transient errors fix themselves. Just let Kafka replay. Non-transient errors need human attention, so they go to a dead letter queue where someone can inspect and decide what to do.&lt;/p&gt;

&lt;p&gt;The key insight: &lt;strong&gt;DLQ is for errors you can't retry your way out of.&lt;/strong&gt; If you DLQ transient errors, you're throwing away free retries.&lt;/p&gt;
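&lt;p&gt;The routing rule fits in one handler. Here is a hedged sketch where the exception classes and names stand in for real driver errors, not Pulse's actual consumer:&lt;/p&gt;

```python
TRANSIENT = (TimeoutError, ConnectionError)  # stand-ins for driver errors

dlq = []  # stands in for the DLQ Kafka topic

def handle(message, process):
    try:
        process(message)
        return "committed"
    except TRANSIENT:
        return "not_committed"  # offset not committed: Kafka redelivers
    except Exception as exc:
        dlq.append({"message": message, "error": str(exc)})
        return "committed"      # commit to unblock the partition

def bad_schema(msg):  # non-transient: a retry can't conjure a missing field
    raise ValueError("missing required field: event_id")

def db_busy(msg):     # transient: the next attempt may just work
    raise TimeoutError("connection timed out")

assert handle({"raw": "{}"}, bad_schema) == "committed"
assert len(dlq) == 1
assert handle({"raw": "{}"}, db_busy) == "not_committed"
```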

&lt;h2&gt;
  
  
  Funnel Analysis
&lt;/h2&gt;

&lt;p&gt;The funnel tracks users through a five-step purchase journey:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Page View (0) → Product View (1) → Add to Cart (2) → Checkout (3) → Payment (4)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Each event carries a &lt;code&gt;funnel_step_index&lt;/code&gt; from the producer. dbt aggregates this into daily conversion rates:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- What % of users who viewed a product added it to cart?&lt;/span&gt;
&lt;span class="n"&gt;ROUND&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;users_step_2&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="k"&gt;NULLIF&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;users_step_1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;cvr_product_to_cart&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
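&lt;p&gt;The &lt;code&gt;NULLIF&lt;/code&gt; guard translated to Python, with made-up step counts:&lt;/p&gt;

```python
def conversion_rate(users_next, users_prev):
    if users_prev == 0:  # NULLIF(users_step_1, 0) makes the result NULL
        return None
    return round(100.0 * users_next / users_prev, 2)

assert conversion_rate(37, 120) == 30.83  # product view to add-to-cart
assert conversion_rate(5, 0) is None      # nobody reached the earlier step
```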



&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F2tcv9mm7ra8at86t7ate.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F2tcv9mm7ra8at86t7ate.png" alt="Funnel Dashboard" width="800" height="456"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  DAU/WAU/MAU with Stickiness
&lt;/h2&gt;

&lt;p&gt;User engagement uses rolling windows:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;DAU&lt;/strong&gt;: Distinct users today&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;WAU&lt;/strong&gt;: Distinct users in the last 7 days&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;MAU&lt;/strong&gt;: Distinct users in the last 30 days&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Stickiness&lt;/strong&gt;: DAU / MAU — how often do monthly users come back daily?&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;A stickiness of 25% means a quarter of your monthly users are daily actives. Consumer apps aim for 20%+. Below 10% suggests users try the product once and ghost.&lt;/p&gt;
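&lt;p&gt;As a toy computation over a made-up activity log (the data and helper name are illustrative):&lt;/p&gt;

```python
from datetime import date, timedelta

activity = {  # made-up daily active-user sets
    date(2026, 4, 1): {"a", "b", "c", "d"},
    date(2026, 4, 15): {"a", "b"},
    date(2026, 4, 18): {"a"},
}

def distinct_users(as_of, days):
    # Count distinct users active in the trailing window ending at as_of
    window_start = as_of - timedelta(days=days - 1)
    users = set()
    for day, active in activity.items():
        if as_of >= day >= window_start:
            users |= active
    return len(users)

as_of = date(2026, 4, 18)
dau = distinct_users(as_of, 1)    # {"a"}
mau = distinct_users(as_of, 30)   # {"a", "b", "c", "d"}
assert dau / mau == 0.25          # 25% stickiness
```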

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F2tmmgz0yfkw26wdb54ul.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F2tmmgz0yfkw26wdb54ul.png" alt="Engagement Dashboard" width="800" height="459"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Late Data Detection
&lt;/h2&gt;

&lt;p&gt;In streaming, events can arrive out of order. A user clicks at 10:00:00, but network lag means Kafka receives it at 10:06:00. That's "late" data.&lt;/p&gt;

&lt;p&gt;Pulse flags these in the staging layer:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CASE&lt;/span&gt;
  &lt;span class="k"&gt;WHEN&lt;/span&gt; &lt;span class="n"&gt;kafka_timestamp&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;event_timestamp&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'5 minutes'&lt;/span&gt; 
  &lt;span class="k"&gt;THEN&lt;/span&gt; &lt;span class="k"&gt;TRUE&lt;/span&gt;
  &lt;span class="k"&gt;ELSE&lt;/span&gt; &lt;span class="k"&gt;FALSE&lt;/span&gt;
&lt;span class="k"&gt;END&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;is_late&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I chose &lt;strong&gt;flagging over exclusion&lt;/strong&gt;. Late events still contribute to session reconstruction — they're just marked for observability. If late data becomes a problem (&amp;gt;5% of events), that's a signal to investigate upstream latency, not throw data away.&lt;/p&gt;
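&lt;p&gt;The flag in Python form, with the same five-minute threshold:&lt;/p&gt;

```python
from datetime import datetime, timedelta

def is_late(event_ts, kafka_ts, threshold=timedelta(minutes=5)):
    # Late when ingestion time trails business time by more than 5 minutes
    return kafka_ts - event_ts > threshold

on_time = is_late(datetime(2026, 4, 18, 10, 0, 0),
                  datetime(2026, 4, 18, 10, 0, 3))
late = is_late(datetime(2026, 4, 18, 10, 0, 0),
               datetime(2026, 4, 18, 10, 6, 0))
assert (on_time, late) == (False, True)
```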

&lt;h2&gt;
  
  
  What's Not Here (And Why)
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;No Airflow.&lt;/strong&gt; Pulse is event-driven. The consumer runs continuously, reacting to Kafka messages. There's nothing to schedule for ingestion. dbt runs on a simple cron or EventBridge trigger — Airflow would be overkill for a single transform job.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;No S3 landing zone.&lt;/strong&gt; For a production deployment, I'd add S3 between Kafka and Postgres as a raw archive layer. Enables replay from cold storage if the database needs to be rebuilt. I documented this in the production architecture doc but didn't implement it locally — diminishing returns for a portfolio project.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Simulated data, not real traffic.&lt;/strong&gt; The event simulator generates fake clickstream. Real production would swap in a JavaScript SDK tracking actual user behavior. The pipeline architecture doesn't change — only the producer does.&lt;/p&gt;

&lt;h2&gt;
  
  
  Production Path
&lt;/h2&gt;

&lt;p&gt;If this were going to production on AWS:&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Local&lt;/th&gt;
&lt;th&gt;Production&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;Kafka (Docker)&lt;/td&gt;
&lt;td&gt;Amazon MSK&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Postgres (Docker)&lt;/td&gt;
&lt;td&gt;Amazon RDS&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Manual dbt runs&lt;/td&gt;
&lt;td&gt;EventBridge + Lambda&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;localhost:3000&lt;/td&gt;
&lt;td&gt;QuickSight or Power BI Service&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;The patterns stay identical. Idempotent producer, idempotent consumer key, DLQ for non-transient errors, dbt for transforms. Just swap local containers for managed services.&lt;/p&gt;

&lt;h2&gt;
  
  
  What I Learned
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Exactly-once is a chain, not a single mechanism.&lt;/strong&gt; Producer idempotence and consumer idempotent keys solve different failure modes. You need both.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Timestamps are a design decision.&lt;/strong&gt; Business time vs ingestion time isn't academic — it affects session reconstruction correctness.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;DLQ is for non-retryable errors.&lt;/strong&gt; Transient failures should replay from Kafka, not clutter your dead letter queue.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;dbt works for streaming too.&lt;/strong&gt; The transform layer doesn't care if events arrived via batch API or real-time Kafka. Same staging → intermediate → marts pattern.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  What's Next
&lt;/h2&gt;

&lt;p&gt;This completes my second portfolio project. Ballistics showed batch patterns (API → Airflow → S3 → Postgres → dbt). Pulse shows streaming patterns (Kafka → Postgres → dbt). Together they tell the story: I can work across both paradigms.&lt;/p&gt;

&lt;p&gt;Next up: an AI-flavoured pipeline. RAG ingestion, embeddings, vector store. The "DE + AI" trend isn't about building ML models — it's about building pipelines that feed them.&lt;/p&gt;




&lt;p&gt;&lt;strong&gt;Code:&lt;/strong&gt; &lt;a href="https://github.com/ayoabass777/Pulse" rel="noopener noreferrer"&gt;github.com/ayoabass777/Pulse&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Author:&lt;/strong&gt; Ayomide Abass — Data Engineer, Vancouver&lt;br&gt;
&lt;a href="https://www.linkedin.com/in/ayomide-abass-36b40025a/" rel="noopener noreferrer"&gt;LinkedIn&lt;/a&gt; · &lt;a href="https://github.com/ayoabass777" rel="noopener noreferrer"&gt;GitHub&lt;/a&gt;&lt;/p&gt;

</description>
      <category>dataengineering</category>
      <category>kafka</category>
      <category>dbt</category>
      <category>postgres</category>
    </item>
    <item>
      <title>Building a Football Analytics Pipeline: Patterns, Tradeoffs, and What Production Would Look Like</title>
      <dc:creator>ayoabass777</dc:creator>
      <pubDate>Sun, 12 Apr 2026 02:50:39 +0000</pubDate>
      <link>https://dev.to/ayoabass777/building-a-football-analytics-pipeline-patterns-tradeoffs-and-what-production-would-look-like-n66</link>
      <guid>https://dev.to/ayoabass777/building-a-football-analytics-pipeline-patterns-tradeoffs-and-what-production-would-look-like-n66</guid>
      <description>&lt;p&gt;Football is the most watched sport on the planet. Millions of fans follow their teams across leagues, tracking form, streaks, and head-to-head records. I built &lt;strong&gt;Ballistics&lt;/strong&gt; — a pipeline that automates the ingestion, transformation, and analytics of football data, currently covering 19 leagues across 15 European countries. The goal is to serve fans with analytical data about their favourite teams: streak tracking, head-to-head breakdowns, and performance metrics.&lt;/p&gt;

&lt;p&gt;This isn't a production system. It's a portfolio project. But every pattern in it maps to a real-world equivalent, and every shortcut I took, I can explain what the production version would look like. That's what this post is about.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Repo:&lt;/strong&gt; &lt;a href="https://github.com/ayoabass777/ballistics" rel="noopener noreferrer"&gt;github.com/ayoabass777/ballistics&lt;/a&gt;&lt;/p&gt;




&lt;h2&gt;
  
  
  The Architecture
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;                         ┌──────────────┐
                         │ Football API │
                         │  (RapidAPI)  │
                         └──────┬───────┘
                                │
                      ┌─────────▼──────────┐
                      │  Apache Airflow    │
                      │  (Dockerized)      │
                      └──┬─────────────┬───┘
                         │             │
              ┌──────────▼──┐   ┌──────▼──────────┐
              │  S3 Landing │   │   Postgres       │
              │  Zone (raw) │──►│   raw schema     │
              └──────┬──────┘   └──────┬───────────┘
                     │                 │
              ┌──────▼──────┐   ┌──────▼───────┐
              │  S3 DLQ     │   │     dbt      │
              │  (failures) │   │  (Docker)    │
              └──────┬──────┘   └──────┬───────┘
                     │                 │
              ┌──────▼──────┐        ┌─┴─────────┬────────────┐
              │ Replay DAG  │        │           │            │
              │ (re-extract)│    ┌───▼────┐ ┌────▼─────┐ ┌────▼────┐
              └─────────────┘    │  stg   │ │   int    │ │  mart   │
                                 │ views  │ │  tables  │ │ tables  │
                                 └────────┘ └──────────┘ └─────────┘
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The pipeline has three DAGs, each with a distinct job:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Bootstrap&lt;/strong&gt; — one-time setup triggered by config changes&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Incremental&lt;/strong&gt; — daily fixture updates, standings refresh, dbt transforms&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Replay&lt;/strong&gt; — retries failed extractions from a dead letter queue&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Let me walk through each one and explain why I made the choices I did.&lt;/p&gt;




&lt;h2&gt;
  
  
  Bootstrap: Adding a League Should Be a Config Change
&lt;/h2&gt;

&lt;p&gt;The bootstrap DAG runs on a daily schedule, but most days it does nothing. A custom &lt;code&gt;MetadataChangeSensor&lt;/code&gt; compares the modification time of &lt;code&gt;metadata.yaml&lt;/code&gt; against the last-processed timestamp stored as an Airflow Variable. If the file hasn't changed, all downstream tasks soft-fail and skip.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;MetadataChangeSensor&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;BaseSensorOperator&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;poke&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;bool&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;current_mtime&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getmtime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;filepath&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;last_mtime&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;float&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;Variable&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;metadata_last_mtime&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;default_var&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;current_mtime&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;last_mtime&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;Variable&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;set&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;metadata_last_mtime&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;current_mtime&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When you &lt;em&gt;do&lt;/em&gt; edit &lt;code&gt;metadata.yaml&lt;/code&gt; — say, adding the Egyptian Premier League — the sensor triggers the full chain: create schemas, extract metadata from the API, full-load all historical fixtures into S3, then load into Postgres.&lt;/p&gt;

&lt;p&gt;The design principle: &lt;strong&gt;adding a league is a config change, not a code change.&lt;/strong&gt; You edit a YAML file, and the pipeline handles the rest.&lt;/p&gt;

&lt;h3&gt;
  
  
  Why a sensor and not a manual trigger?
&lt;/h3&gt;

&lt;p&gt;I considered using &lt;code&gt;TriggerDagRunOperator&lt;/code&gt; from a separate "admin" DAG, or just running the bootstrap manually. The sensor pattern won out because it's self-documenting — the DAG itself encodes when it should run. No one has to remember to trigger it, no runbook to maintain. Edit the file, walk away.&lt;/p&gt;

&lt;h3&gt;
  
  
  Parallel Branches
&lt;/h3&gt;

&lt;p&gt;The bootstrap DAG has a fan-out/fan-in structure. Two schema creation tasks — metadata schema (dim tables) and raw fixtures schema — run in parallel before converging at the extract step:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sensor ──► metadata_schema ──► extract_metadata ──┐
                                                   ├──► extract_fixtures ──► load_fixtures
sensor ──► raw_fixtures_schema ───────────────────┘
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is a small optimisation, but it demonstrates an important principle: tasks that don't depend on each other shouldn't wait for each other. In a production pipeline with heavier DDL operations or network-bound tasks, this pattern pays off significantly.&lt;/p&gt;

&lt;h3&gt;
  
  
  XCom: Keep It Lightweight
&lt;/h3&gt;

&lt;p&gt;The extract task returns a list of S3 keys via XCom. The load task consumes them. The critical decision here: &lt;strong&gt;pass URIs, not data.&lt;/strong&gt; XCom stores values in the Airflow metadata database. Stuffing raw JSON payloads into XCom creates bloat and can crash the metadata DB in production. S3 keys are just strings — a few bytes each.&lt;/p&gt;

&lt;p&gt;In production, I'd go further and use an S3-backed XCom backend. But for a demo, default XCom with lightweight strings works fine.&lt;/p&gt;
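&lt;p&gt;To make the contrast concrete, here's a minimal pure-Python sketch (made-up record shapes and an example key, not the repo's actual payloads) of what each design would actually push through XCom:&lt;/p&gt;

```python
import json

# Hypothetical raw payload a bootstrap extraction might produce.
raw_records = [
    {"fixture_id": i, "home_goals": 1, "away_goals": 0} for i in range(10_000)
]

# Anti-pattern: push the whole payload through XCom. Every byte of this
# lands in the Airflow metadata database.
payload_bytes = len(json.dumps(raw_records).encode("utf-8"))

# Pattern used here: push only the S3 key; the data itself stays in S3.
s3_keys = ["full_load/39/2024/2024-05-01/fixtures.json"]  # example key
keys_bytes = len(json.dumps(s3_keys).encode("utf-8"))

print(payload_bytes, keys_bytes)  # the key list is orders of magnitude smaller
```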




&lt;h2&gt;
  
  
  S3 as a Raw Landing Zone
&lt;/h2&gt;

&lt;p&gt;Every extraction — whether full load or incremental — writes raw JSON to S3 before touching Postgres. The prefix structure makes each write immutable and replayable:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;full_load/{league_id}/{season}/{ds}/fixtures.json
incremental/{ds}/fixtures.json
dlq/{league_id}/{season}/{ds}/error.json
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Why not write directly to Postgres?
&lt;/h3&gt;

&lt;p&gt;If the Postgres load fails (network issue, schema mismatch, disk full), the raw data is still sitting in S3. I can replay the load from the landing zone without re-hitting the API, which matters when the API has rate limits and costs per call.&lt;/p&gt;

&lt;p&gt;This is a common pattern in production pipelines: &lt;strong&gt;extract once, load many times.&lt;/strong&gt; S3 is cheap storage that gives you a replayable audit trail. Think of it as a recording studio approach — capture the raw signal first, then process it. You can always reprocess, but you can't un-lose a raw recording.&lt;/p&gt;
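&lt;p&gt;As a sketch of the prefix scheme (the helper and its signature are illustrative, not the repo's actual API):&lt;/p&gt;

```python
def fixtures_key(load_type: str, league_id: int, season: int, ds: str) -> str:
    """Build the landing-zone key for one extraction.

    Bootstrap (full_load) writes are scoped per league-season; incremental
    writes are scoped by execution date alone, matching the prefixes above.
    """
    if load_type == "full_load":
        return f"full_load/{league_id}/{season}/{ds}/fixtures.json"
    return f"incremental/{ds}/fixtures.json"

print(fixtures_key("full_load", 144, 2024, "2024-05-01"))
# full_load/144/2024/2024-05-01/fixtures.json
```

&lt;p&gt;Because &lt;code&gt;ds&lt;/code&gt; (the Airflow execution date) is baked into every key, replaying a load is just re-reading a known prefix, with no API call required.&lt;/p&gt;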

&lt;h3&gt;
  
  
  Why date-partitioned prefixes?
&lt;/h3&gt;

&lt;p&gt;Each write goes to a unique prefix that includes the execution date (&lt;code&gt;{ds}&lt;/code&gt;). This means incremental loads never overwrite each other. If I need to debug what was loaded on a specific day, the S3 prefix tells me exactly where to look.&lt;/p&gt;

&lt;p&gt;In production, you'd add lifecycle policies to move older prefixes to S3 Infrequent Access or Glacier. Football fixture data is public and re-fetchable, so I didn't bother with versioning — but for non-recoverable data, I'd enable it.&lt;/p&gt;




&lt;h2&gt;
  
  
  Failure Handling: The Dead Letter Queue
&lt;/h2&gt;

&lt;p&gt;Not every API call succeeds. Rate limits, network timeouts, malformed responses — any extraction can fail. The question is: what happens when it does?&lt;/p&gt;

&lt;p&gt;The bootstrap DAG doesn't retry inline. Instead, failed extractions write an error record to the &lt;code&gt;dlq/&lt;/code&gt; prefix in S3 with enough context to retry later:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;write_to_dlq&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;api_league_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;season_year&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ds&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;error&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;league_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;""&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;payload&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;api_league_id&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;api_league_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;season_year&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;season_year&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;league_name&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;league_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ds&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;ds&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;error&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;error&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;traceback&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;traceback&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;format_exc&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;timestamp&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;now&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timezone&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;utc&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;isoformat&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;s3&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;put_object&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;Bucket&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;bucket&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;dlq/&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;api_league_id&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;/&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;season_year&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;/&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;ds&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;/error.json&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;...)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;A separate &lt;strong&gt;Replay DAG&lt;/strong&gt; runs daily, scans the DLQ, re-extracts from the API, and loads successfully replayed entries into Postgres. After replay, it checks if any entries remain and logs a warning. In production, this is where you'd wire up a Slack or PagerDuty alert.&lt;/p&gt;
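&lt;p&gt;The replay logic hinges on the DLQ key itself carrying the retry context. A sketch of that recovery step (pure Python; the helper name is mine, and in the real DAG the keys would come from an S3 listing):&lt;/p&gt;

```python
def parse_dlq_key(key: str) -> dict:
    """Turn a DLQ object key back into the arguments needed to retry the
    failed extraction, per the dlq/{league_id}/{season}/{ds}/error.json
    layout shown earlier."""
    parts = key.split("/")
    if parts[0] != "dlq" or parts[-1] != "error.json":
        raise ValueError(f"not a DLQ key: {key}")
    return {"league_id": int(parts[1]), "season": int(parts[2]), "ds": parts[3]}

print(parse_dlq_key("dlq/144/2024/2024-05-01/error.json"))
# {'league_id': 144, 'season': 2024, 'ds': '2024-05-01'}
```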

&lt;h3&gt;
  
  
  Why a separate DAG instead of retries?
&lt;/h3&gt;

&lt;p&gt;Airflow has built-in retries (&lt;code&gt;retries=3&lt;/code&gt;, &lt;code&gt;retry_delay=timedelta(minutes=5)&lt;/code&gt;). Why not just use those?&lt;/p&gt;

&lt;p&gt;Because the failure mode matters. If one league-season fails during a bootstrap of 30 league-seasons, I don't want to re-run the entire bootstrap DAG. The DLQ pattern isolates failures — the 29 successful extractions are safe in S3 and loaded into Postgres. Only the failed one needs another attempt, and it happens independently without blocking anything else.&lt;/p&gt;

&lt;p&gt;This is the same pattern AWS SQS dead letter queues use. The idea: &lt;strong&gt;separate the happy path from the recovery path.&lt;/strong&gt; Each can run on its own schedule, with its own retry logic, without coupling them together.&lt;/p&gt;




&lt;h2&gt;
  
  
  The Incremental Path: Daily Updates and Edge Cases
&lt;/h2&gt;

&lt;p&gt;The daily DAG picks up fixtures that have been played since the last run — specifically, fixtures where &lt;code&gt;kickoff_utc&lt;/code&gt; is in the past but fulltime goals are still null. It fetches updates by fixture ID, writes to S3, upserts into Postgres, corrects any rescheduled kickoff times, refreshes league standings, and triggers dbt.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;extract_updates ──► load_updates ──► fixture_corrections ──► update_standings ──► dbt_run
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
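&lt;p&gt;The selection predicate driving &lt;code&gt;extract_updates&lt;/code&gt; can be sketched in plain Python (field names mirror the prose; the record shape is illustrative):&lt;/p&gt;

```python
from datetime import datetime, timezone

def needs_update(fixture: dict, now: datetime) -> bool:
    """True when kickoff has passed but the fulltime score is still null,
    meaning the match should have finished and we owe it a refresh."""
    kicked_off = now >= fixture["kickoff_utc"]
    unscored = fixture["home_goals_ft"] is None or fixture["away_goals_ft"] is None
    return kicked_off and unscored

now = datetime(2024, 5, 2, 12, 0, tzinfo=timezone.utc)
played = {"kickoff_utc": datetime(2024, 5, 1, 19, 0, tzinfo=timezone.utc),
          "home_goals_ft": None, "away_goals_ft": None}
future = {"kickoff_utc": datetime(2024, 5, 9, 19, 0, tzinfo=timezone.utc),
          "home_goals_ft": None, "away_goals_ft": None}
print(needs_update(played, now), needs_update(future, now))  # True False
```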



&lt;h3&gt;
  
  
  The Belgian Playoff Problem
&lt;/h3&gt;

&lt;p&gt;A week after going live, I noticed warnings in the logs: &lt;code&gt;Fixture 1540006 not found in DB; skipping.&lt;/code&gt; The fixture correction script fetches the last/next N fixtures per league from the API and checks them against the database. These fixture IDs existed in the API but not in my database.&lt;/p&gt;

&lt;p&gt;Investigation revealed these were Belgium's Jupiler Pro League championship playoff fixtures. Belgian football splits into playoff groups mid-season, and the league body creates entirely new fixture IDs for the playoff round. My bootstrap captured the regular season, but the playoff fixtures were created after that.&lt;/p&gt;

&lt;p&gt;This exposed a gap: &lt;strong&gt;the incremental path only updates existing fixtures.&lt;/strong&gt; It doesn't insert new ones. Newly created fixtures (playoffs, rescheduled additions, cup ties added mid-season) get silently skipped.&lt;/p&gt;

&lt;p&gt;For now, re-bootstrapping the affected league-season picks them up. The proper fix is adding an "insert new fixtures" path to the incremental flow. I've documented this as a known gap — acknowledging it matters more to me than pretending it doesn't exist.&lt;/p&gt;

&lt;h3&gt;
  
  
  Standings Deduplication
&lt;/h3&gt;

&lt;p&gt;Another edge case from Belgium: the API returns multiple standings tables for the same league-season when playoff groups are active. The same team appears in both the regular season table and the playoff table, both tagged with the same &lt;code&gt;league_season_id&lt;/code&gt;. Postgres rejected the batch upsert because two rows with the same &lt;code&gt;(league_season_id, api_team_id)&lt;/code&gt; can't appear in a single INSERT.&lt;/p&gt;

&lt;p&gt;The fix: deduplicate before upserting, keeping the last occurrence (the most current table). A small function, but it's the kind of thing that only surfaces with real data from real APIs — not from tutorials.&lt;/p&gt;
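&lt;p&gt;A minimal version of that deduplication, relying on dict key re-assignment so the later (playoff) row wins (the row shape here is illustrative):&lt;/p&gt;

```python
def dedupe_standings(rows: list) -> list:
    """Keep only the last row per (league_season_id, api_team_id).

    Re-assigning a dict key keeps the newer value, so the most recently
    returned table survives into the batch upsert."""
    latest = {}
    for row in rows:
        latest[(row["league_season_id"], row["api_team_id"])] = row
    return list(latest.values())

rows = [
    {"league_season_id": 7, "api_team_id": 554, "rank": 3},  # regular season
    {"league_season_id": 7, "api_team_id": 554, "rank": 1},  # playoff group
]
print(dedupe_standings(rows))  # only the playoff row remains
```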




&lt;h2&gt;
  
  
  dbt: Three Layers of Transformation
&lt;/h2&gt;

&lt;p&gt;The dbt project follows a standard staging → intermediate → mart structure:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Staging&lt;/strong&gt; views are thin wrappers over raw tables. They rename columns, cast types, and derive flags like &lt;code&gt;is_played&lt;/code&gt; from &lt;code&gt;fixture_status = 'FT'&lt;/code&gt;. The staging layer uses incremental materialisation keyed on &lt;code&gt;api_fixture_id&lt;/code&gt;, so daily runs only process changed fixtures.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Intermediate&lt;/strong&gt; tables enrich fixtures with league context (league name, season label, current season flag) and team dimensions. This is where streak computation happens — win runs, clean sheet runs, scoring runs — along with relevance scores based on configurable weights stored as dbt seeds.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Mart&lt;/strong&gt; tables assemble API-ready JSON payloads for the frontend: team pages, fixture pages, head-to-head breakdowns, homepage streak rankings. These rebuild fully on each run — at ~10k fixtures, that's fine. At higher volumes, you'd push incremental materialisation further down the DAG.&lt;/p&gt;

&lt;h3&gt;
  
  
  Data Quality at Three Levels
&lt;/h3&gt;

&lt;p&gt;I set up tests at each layer, each catching a different class of problem:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Source tests&lt;/strong&gt; validate the raw data at the door. Not-null and unique constraints on primary keys, accepted-value checks on result columns (&lt;code&gt;win&lt;/code&gt;, &lt;code&gt;draw&lt;/code&gt;, &lt;code&gt;loss&lt;/code&gt;, null). Source freshness monitoring warns at 24 hours stale, errors at 48 hours. Freshness tests don't validate the data itself — they validate that the pipeline is &lt;em&gt;running&lt;/em&gt;. A passing not-null test on a table that hasn't been updated in a week gives a false sense of security.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Staging model tests&lt;/strong&gt; validate the output of my transforms. Key uniqueness, not-null on derived columns like &lt;code&gt;is_played&lt;/code&gt;, and referential integrity — &lt;code&gt;stg_dim_league_seasons.league_id&lt;/code&gt; must reference a valid &lt;code&gt;stg_dim_leagues.league_id&lt;/code&gt;. These are like foreign key constraints enforced at test time rather than in the database.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Singular tests&lt;/strong&gt; encode business logic. I wrote one: &lt;code&gt;assert_no_future_completed_fixtures.sql&lt;/code&gt; — no fixture should have &lt;code&gt;is_played = TRUE&lt;/code&gt; with a &lt;code&gt;kickoff_utc&lt;/code&gt; in the future. This catches data quality issues where the API returns a "finished" status for a match that hasn't happened yet, typically a sync bug or timezone mismatch.&lt;/p&gt;
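&lt;p&gt;Restated in Python rather than the test's SQL (with an illustrative record shape), the rule flags any row where both conditions hold:&lt;/p&gt;

```python
from datetime import datetime, timezone

def future_completed(fixtures: list, now: datetime) -> list:
    """Rows that would fail assert_no_future_completed_fixtures:
    marked as played, yet scheduled to kick off in the future."""
    return [f for f in fixtures if f["is_played"] and f["kickoff_utc"] > now]

now = datetime(2024, 5, 2, tzinfo=timezone.utc)
fixtures = [
    {"api_fixture_id": 1, "is_played": True,
     "kickoff_utc": datetime(2024, 5, 9, tzinfo=timezone.utc)},  # violates the rule
    {"api_fixture_id": 2, "is_played": True,
     "kickoff_utc": datetime(2024, 5, 1, tzinfo=timezone.utc)},  # fine
]
print([f["api_fixture_id"] for f in future_completed(fixtures, now)])  # [1]
```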

&lt;p&gt;The distinction between generic and singular tests matters. Generic tests (not_null, unique) are reusable patterns. Singular tests are custom SQL queries specific to your domain. Having both shows you understand data quality isn't just "add not_null to everything."&lt;/p&gt;




&lt;h2&gt;
  
  
  What Production Would Look Like
&lt;/h2&gt;

&lt;p&gt;This project runs on Docker Compose on my laptop. Here's what I'd change for a real deployment:&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Aspect&lt;/th&gt;
&lt;th&gt;Demo (current)&lt;/th&gt;
&lt;th&gt;Production&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;Airflow&lt;/td&gt;
&lt;td&gt;Local Docker Compose&lt;/td&gt;
&lt;td&gt;EC2/ECS with IAM role&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Credentials&lt;/td&gt;
&lt;td&gt;Access keys in &lt;code&gt;.env&lt;/code&gt;
&lt;/td&gt;
&lt;td&gt;Role-based, no keys in code&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;S3&lt;/td&gt;
&lt;td&gt;Single bucket, prefix-partitioned&lt;/td&gt;
&lt;td&gt;Separate buckets per environment&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Versioning&lt;/td&gt;
&lt;td&gt;Off&lt;/td&gt;
&lt;td&gt;On for critical data&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Encryption&lt;/td&gt;
&lt;td&gt;SSE-S3 (default)&lt;/td&gt;
&lt;td&gt;SSE-KMS for sensitive data&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;IAM&lt;/td&gt;
&lt;td&gt;Admin user + access keys&lt;/td&gt;
&lt;td&gt;Least-privilege roles per service&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;The biggest shift: &lt;strong&gt;moving from access keys to IAM roles.&lt;/strong&gt; In the demo, Airflow connects to S3 using access keys stored in &lt;code&gt;.env&lt;/code&gt;. In production, the Airflow instance would run on EC2 or ECS with an IAM role attached — no keys in code, no keys to rotate, no keys to leak.&lt;/p&gt;

&lt;p&gt;I'd also add proper monitoring: the DLQ check task currently logs a warning. In production, that becomes a Slack notification or PagerDuty alert. Source freshness errors would trigger similar alerts. The infrastructure for observability is already there in the pipeline design — it just needs to be wired to real alerting tools.&lt;/p&gt;




&lt;h2&gt;
  
  
  What I'd Do Differently
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;The playoff gap&lt;/strong&gt; is the most concrete lesson. I assumed that bootstrapping a league-season once would capture all fixtures. It doesn't — leagues create new fixtures mid-season for playoffs, rescheduled matches, and cup ties. The incremental path needs an "insert new fixtures" capability, not just updates.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;dbt schema management&lt;/strong&gt; gave me trouble. I didn't initially set up a separate profile for the project, so both the old and new dbt projects shared the same &lt;code&gt;profiles.yml&lt;/code&gt; block. Changing the database for one would break the other. The fix was simple — a separate &lt;code&gt;ballistics&lt;/code&gt; profile — but it cost me a debugging session that could have been avoided with upfront isolation.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Docker-in-Docker&lt;/strong&gt; is a footgun. Running dbt via &lt;code&gt;DockerOperator&lt;/code&gt; inside a Dockerized Airflow requires mounting the host's Docker socket. This works for local development but wouldn't fly in production — you'd run dbt as a dedicated container service or use the &lt;code&gt;BashOperator&lt;/code&gt; with dbt installed directly in the Airflow image.&lt;/p&gt;




&lt;h2&gt;
  
  
  Wrapping Up
&lt;/h2&gt;

&lt;p&gt;Ballistics currently tracks 19 leagues across 15 countries, ingesting and transforming fixture data daily. The patterns — sensor-gated bootstrapping, S3 landing zones, dead letter queues, layered data quality testing — are all transferable to production pipelines at any scale.&lt;/p&gt;

&lt;p&gt;The gap between a portfolio project and production isn't the patterns. It's the operational maturity: IAM roles instead of access keys, alerting instead of log messages, lifecycle policies instead of unbounded storage. Knowing that gap exists, and being able to articulate it, matters as much as the code itself.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Check out the repo:&lt;/strong&gt; &lt;a href="https://github.com/ayoabass777/ballistics" rel="noopener noreferrer"&gt;github.com/ayoabass777/ballistics&lt;/a&gt;&lt;/p&gt;

</description>
      <category>dataengineering</category>
      <category>python</category>
      <category>aws</category>
      <category>dbt</category>
    </item>
  </channel>
</rss>
