DEV Community

Rizwan Saleem

Build a resilient data ingestion pipeline with streaming SQL and event sourcing

In this tutorial, you’ll learn how to design and implement a practical data ingestion pipeline that handles high-throughput streams, preserves replayability, and stays robust in the face of partial failures. You’ll combine streaming SQL for continuous transformations with an event-sourced source of truth, so you can rebuild state deterministically and test end-to-end behavior confidently.

Overview

  • Goals
    • Ingest and transform high-volume data with low latency
    • Maintain an immutable, replayable source of truth via event sourcing
    • Support idempotent processing, exactly-once semantics where possible, and clear failure recovery
    • Provide observable metrics and simple testing strategies
  • Architecture
    • Event source: append-only event stream (e.g., Kafka or a local mock) capturing all changes
    • Processing layer 1: streaming SQL for near-real-time transformations
    • Processing layer 2: idempotent, fault-tolerant consumers that materialize downstream views
    • Storage: immutable event store, read models as materialized views, and an audit log
  • Key concepts
    • Event sourcing and append-only streams
    • Streaming SQL for continuous transforms
    • Exactly-once vs at-least-once semantics
    • Idempotent processing and checkpointing
    • Observability: metrics, dashboards, and alerting

Scenario
You’re collecting user activity events from a mobile app: user_id, event_type (login, purchase, view), event_timestamp, and metadata. You want to build:

  • A real-time user activity feed with counts per user
  • A purchase ledger that aggregates order totals and revenue per day
  • A deterministic replay path to rebuild derived state from the event log

Tech stack (local-friendly)

  • Event source: a Kafka-like interface or a simple in-memory event bus for demos
  • Streaming SQL: ksqlDB or Apache Flink SQL (but you can simulate with a local SQLite + SQL-like processing for concepts)
  • Language: Python (for glue code and tests)
  • Storage: local files for event store and SQLite for read models (or in-memory structures for quick demos)

Part 1: Model your events and the event store
Define the event schema and how you store events immutably.

  • Event envelope

    • event_id: string
    • aggregate_id: string (e.g., user_id)
    • event_type: string
    • data: json
    • timestamp: int (epoch millis)
    • replayed: boolean (optional bookkeeping flag; not part of the Event dataclass sketched below)
  • Example events

    • aggregate_id: "u123", event_type: "login", timestamp: 1680000000000, data: {"ip": "1.2.3.4"}
    • aggregate_id: "u123", event_type: "purchase", timestamp: 1680003600000, data: {"order_id": "o987", "amount": 49.99}
  • Event store interface (Python)

    • append_event(event)
    • read_events(since_timestamp=None)
    • replay_events(processor)

Code sketch (Python)

  • events_store.py
    • A minimal file-based event log: one JSON object per line, appended in order. There is no index; reads scan the whole file.
import json
import os
from dataclasses import dataclass
from typing import Iterable, List, Optional

EVENTS_FILE = "events.log"

@dataclass
class Event:
    event_id: str
    aggregate_id: str
    event_type: str
    data: dict
    timestamp: int  # epoch millis

def append_event(e: Event, path: str = EVENTS_FILE):
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(e.__dict__) + "\n")

def read_events(path: str = EVENTS_FILE, since_timestamp: Optional[int] = None) -> List[Event]:
    events: List[Event] = []
    if not os.path.exists(path):
        return events
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            rec = json.loads(line)
            ev = Event(**rec)
            if since_timestamp is None or ev.timestamp >= since_timestamp:
                events.append(ev)
    return events
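The interface listed earlier includes replay_events(processor), which the sketch above does not define. One possible shape is shown here, assuming the same one-JSON-object-per-line log format. The explicit `path` parameter, the integer return value, and the blank-line handling are illustrative choices, not part of the original interface.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass
from typing import Callable


@dataclass
class Event:
    event_id: str
    aggregate_id: str
    event_type: str
    data: dict
    timestamp: int  # epoch millis


def append_event(e: Event, path: str) -> None:
    # Append-only write: one JSON object per line.
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(asdict(e)) + "\n")


def replay_events(processor: Callable[[Event], None], path: str) -> int:
    """Stream the log in append order, calling processor once per event.

    Returns the number of events replayed. The file is read line by line,
    so the whole log never has to fit in memory.
    """
    if not os.path.exists(path):
        return 0
    replayed = 0
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines
            processor(Event(**json.loads(line)))
            replayed += 1
    return replayed


# Usage: replay a two-event log into a plain list.
log_path = os.path.join(tempfile.mkdtemp(), "events.log")
append_event(Event("e1", "u1", "login", {"ip": "10.0.0.1"}, 1680000000000), log_path)
append_event(Event("e2", "u1", "purchase", {"order_id": "o1", "amount": 19.99}, 1680000000010), log_path)

seen = []
count = replay_events(seen.append, log_path)
```

Passing the processor as a callable keeps the store unaware of read models, so the same log can feed several materializers.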
  • emit sample events
import uuid
import time

from events_store import append_event, Event

def now_ms():
    return int(time.time() * 1000)

def demo_seed_events():
    events = [
        Event(event_id=str(uuid.uuid4()), aggregate_id="u1", event_type="login", data={"ip": "10.0.0.1"}, timestamp=now_ms()),
        Event(event_id=str(uuid.uuid4()), aggregate_id="u1", event_type="purchase", data={"order_id": "o1", "amount": 19.99}, timestamp=now_ms()+10),
        Event(event_id=str(uuid.uuid4()), aggregate_id="u2", event_type="login", data={"ip": "10.0.0.2"}, timestamp=now_ms()+20),
    ]
    for e in events:
        append_event(e)

Part 2: Streaming transformations with continuous queries
You want two views:

  • Per-user activity counts by event_type
  • Daily revenue totals from purchases

Approach A (conceptual streaming SQL)

  • Use a streaming SQL engine (ksqlDB/Flink) to define continuous queries over the event stream.
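  • Example continuous queries (engine-neutral pseudo-SQL; window clauses and JSON functions differ between ksqlDB and Flink SQL, and the JSON accessor and cast shown here are PostgreSQL-style):
    • UserActivity: SELECT aggregate_id AS user_id, event_type, COUNT(*) AS count, MAX(timestamp) AS last_seen FROM Events GROUP BY aggregate_id, event_type (optionally scoped to a time window)
    • DailyRevenue: SELECT date_trunc('day', timestamp) AS day, SUM((data->>'amount')::numeric) AS revenue FROM Events WHERE event_type = 'purchase' GROUP BY day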
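To make the windowing idea concrete without an engine, here is a small Python sketch of a tumbling-window count: each event falls into exactly one fixed-size window, and counts are kept per (user, event type, window start). The one-minute window size and the function names are assumptions chosen for illustration; a real engine would also handle late and out-of-order events, which this sketch ignores.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

WINDOW_MS = 60_000  # hypothetical 1-minute tumbling window


def window_start(timestamp_ms: int, size_ms: int = WINDOW_MS) -> int:
    # Align a timestamp to the start of its fixed-size window.
    return timestamp_ms - (timestamp_ms % size_ms)


def windowed_counts(
    events: Iterable[dict], size_ms: int = WINDOW_MS
) -> Dict[Tuple[str, str, int], int]:
    # Key: (user_id, event_type, window_start_ms) -> number of events.
    counts: Dict[Tuple[str, str, int], int] = defaultdict(int)
    for ev in events:
        key = (ev["aggregate_id"], ev["event_type"], window_start(ev["timestamp"], size_ms))
        counts[key] += 1
    return dict(counts)


events = [
    {"aggregate_id": "u1", "event_type": "view", "timestamp": 1_680_000_000_000},
    {"aggregate_id": "u1", "event_type": "view", "timestamp": 1_680_000_030_000},
    {"aggregate_id": "u1", "event_type": "view", "timestamp": 1_680_000_070_000},
]
result = windowed_counts(events)
# The first two events share the window starting at 1680000000000;
# the third falls into the next window, starting at 1680000060000.
```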

Code-like prototype (Python-based, for local demonstration)

  • We simulate streaming by processing appended events and updating read models.
import sqlite3
from datetime import datetime, timezone
from typing import Iterable

DB = "read_models.db"

def init_db():
    conn = sqlite3.connect(DB)
    c = conn.cursor()
    c.execute("""
    CREATE TABLE IF NOT EXISTS user_activity (
        user_id TEXT,
        event_type TEXT,
        count INTEGER,
        last_seen INTEGER,
        PRIMARY KEY (user_id, event_type)
    )
    """)
    c.execute("""
    CREATE TABLE IF NOT EXISTS daily_revenue (
        day TEXT,
        revenue REAL,
        PRIMARY KEY (day)
    )
    """)
    conn.commit()
    conn.close()

def upsert_user_activity(user_id: str, event_type: str, ts: int):
    conn = sqlite3.connect(DB)
    c = conn.cursor()
    c.execute("""
    INSERT INTO user_activity (user_id, event_type, count, last_seen)
    VALUES (?, ?, 1, ?)
    ON CONFLICT(user_id, event_type) DO UPDATE SET
      count = count + 1,
      last_seen = max(last_seen, ?)
    """, (user_id, event_type, ts, ts))
    conn.commit()
    conn.close()

def add_daily_revenue(day: str, amount: float):
    conn = sqlite3.connect(DB)
    c = conn.cursor()
    c.execute("""
    INSERT INTO daily_revenue (day, revenue)
    VALUES (?, ?)
    ON CONFLICT(day) DO UPDATE SET revenue = revenue + ?
    """, (day, amount, amount))
    conn.commit()
    conn.close()

def process_event(e):
    # Count every event type (login, purchase, view, ...) per user.
    upsert_user_activity(e.aggregate_id, e.event_type, e.timestamp)
    if e.event_type == "purchase":
        # Bucket revenue by UTC calendar day.
        day = datetime.fromtimestamp(e.timestamp / 1000, tz=timezone.utc).strftime("%Y-%m-%d")
        amount = float(e.data.get("amount", 0.0))
        add_daily_revenue(day, amount)

def replay_all(events: Iterable):
    init_db()
    for e in events:
        process_event(e)
  • Running a demo
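from events_store import read_events

# Assumes the read-model code above (sqlite3, DB, init_db, replay_all) lives in this module.
# Delete read_models.db before re-running: replaying the same log twice double-counts.
init_db()
events = read_events()
replay_all(events)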

### Show views
def show_views():
    conn = sqlite3.connect(DB)
    cur = conn.cursor()
    print("User Activity:")
    for row in cur.execute("SELECT user_id, event_type, count, last_seen FROM user_activity ORDER BY user_id, event_type"):
        print(row)
    print("Daily Revenue:")
    for row in cur.execute("SELECT day, revenue FROM daily_revenue ORDER BY day"):
        print(row)
    conn.close()

show_views()

Notes on the approach

  • This simple prototype demonstrates the core ideas: an immutable event log, and materialized read models updated from the log.
  • In production, replace the Python in-memory or file-based approach with a real streaming system (Kafka or Kinesis) and a streaming SQL engine (ksqlDB or Flink SQL) to handle fault tolerance and horizontal scaling.
  • Ensure idempotency by keeping an event_id and deduplicating if events are replayed.
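The deduplication mentioned in the last note could look roughly like this. It is a sketch under assumptions: the `processed_events` table and the `process_once` helper are hypothetical names, and the point is only that the dedup marker and the read-model update are written in the same SQLite transaction, so a replayed event either applies fully or not at all.

```python
import sqlite3


def init_db(conn: sqlite3.Connection) -> None:
    # processed_events is a hypothetical dedup table keyed by event_id.
    conn.execute("CREATE TABLE IF NOT EXISTS processed_events (event_id TEXT PRIMARY KEY)")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS user_activity (
            user_id TEXT, event_type TEXT, count INTEGER, last_seen INTEGER,
            PRIMARY KEY (user_id, event_type)
        )
    """)


def process_once(conn: sqlite3.Connection, event_id: str, user_id: str,
                 event_type: str, ts: int) -> bool:
    """Apply the event unless its event_id was already recorded.

    Returns True if the event was applied, False if it was a duplicate.
    """
    with conn:  # one transaction: dedup marker and counter update commit together
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed_events (event_id) VALUES (?)", (event_id,)
        )
        if cur.rowcount == 0:
            return False  # already processed; leave the read model untouched
        conn.execute("""
            INSERT INTO user_activity (user_id, event_type, count, last_seen)
            VALUES (?, ?, 1, ?)
            ON CONFLICT(user_id, event_type) DO UPDATE SET
              count = count + 1,
              last_seen = max(last_seen, excluded.last_seen)
        """, (user_id, event_type, ts))
        return True


conn = sqlite3.connect(":memory:")
init_db(conn)
first = process_once(conn, "e1", "u1", "login", 1000)
duplicate = process_once(conn, "e1", "u1", "login", 1000)  # replayed delivery
row = conn.execute(
    "SELECT count, last_seen FROM user_activity WHERE user_id = 'u1' AND event_type = 'login'"
).fetchone()
```

The upsert alone is not idempotent, because `count = count + 1` changes the row on every call; the event_id check is what makes a replay safe.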

Part 3: Replaying and rebuilding read models deterministically

  • Because all state is derived from the event log, you can rebuild read models by replaying events from the start.
  • Implement a replay_head pointer to remember how far you’ve materialized.
  • When a failure occurs, you can reset the materialization to a known checkpoint and replay from there.

Code sketch for checkpointed replay

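import os

from events_store import read_events
# Assumes process_event from the read-model code in Part 2 is in scope.

CHECKPOINT = "checkpoint.txt"

def get_checkpoint() -> int:
    # Number of log entries already materialized (0 if no checkpoint exists).
    if not os.path.exists(CHECKPOINT):
        return 0
    with open(CHECKPOINT) as f:
        return int(f.read().strip() or 0)

def set_checkpoint(offset: int):
    with open(CHECKPOINT, "w") as f:
        f.write(str(offset))

def replay_from_checkpoint():
    # The checkpoint is a position in the log rather than a timestamp:
    # several events can share a timestamp, and filtering with >= would
    # re-apply the boundary event on every resume.
    offset = get_checkpoint()
    for e in read_events()[offset:]:
        process_event(e)
        offset += 1
        set_checkpoint(offset)
    # A crash between process_event and set_checkpoint re-applies one event
    # on restart (at-least-once), so the write path should deduplicate by event_id.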

Part 4: Observability, testing, and reliability

  • Observability
    • Metrics to emit: event_ingest_rate, per_user_activity_counts, daily_revenue, read_model_latency
    • Dashboards: a simple panel showing events per second, top users by activity, and revenue by day
  • Testing strategies
    • Unit tests for the event processing logic
    • End-to-end tests that seed events, run the replay, and verify read models match expectations
    • Replay tests: generate a known sequence, crash mid-replay, then resume and verify final state equals a ground-truth run
  • Fault-tolerance basics
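    • Use an idempotent write path for read models (upserts keyed by natural keys, plus event_id deduplication for counters and sums, since an incrementing upsert is not idempotent by itself)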
    • Persist checkpoints after batches of events
    • Partition the event stream so different aggregates can be processed independently
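The replay test described above (crash mid-replay, resume, compare with a ground-truth run) can be sketched entirely in memory. Everything here is an assumption made for the sake of a runnable example: the `Materializer` class, the `Crash` exception, and the choice to keep the counts, the applied-id set, and the offset on one object, which stands in for durable storage that survives the simulated crash.

```python
from typing import Dict, List, Optional, Set, Tuple

Event = Tuple[str, str, str]  # (event_id, user_id, event_type)


class Crash(Exception):
    """Simulated process failure."""


class Materializer:
    def __init__(self) -> None:
        self.counts: Dict[Tuple[str, str], int] = {}
        self.applied: Set[str] = set()  # event_ids already applied (dedup)
        self.offset = 0                 # checkpoint: next log position to read

    def apply(self, ev: Event) -> None:
        event_id, user_id, event_type = ev
        if event_id in self.applied:
            return  # replayed duplicate: no effect
        self.applied.add(event_id)
        key = (user_id, event_type)
        self.counts[key] = self.counts.get(key, 0) + 1

    def run(self, log: List[Event], crash_after: Optional[int] = None) -> None:
        for pos in range(self.offset, len(log)):
            self.apply(log[pos])
            if crash_after is not None and pos == crash_after:
                raise Crash()  # dies after applying but before checkpointing
            self.offset = pos + 1


log = [
    ("e1", "u1", "login"),
    ("e2", "u1", "purchase"),
    ("e3", "u2", "login"),
    ("e4", "u1", "login"),
]

ground_truth = Materializer()
ground_truth.run(log)

recovered = Materializer()
try:
    recovered.run(log, crash_after=1)  # e2 is applied, checkpoint still points at it
except Crash:
    pass
recovered.run(log)  # resume: e2 is re-delivered and ignored, then e3 and e4 apply
```

The crash is placed at the worst moment on purpose: the event has been applied but the checkpoint has not advanced, which is exactly the case where deduplication matters.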

Part 5: Step-by-step guide to implement in your environment
1) Choose your event store

  • For local experimentation, start with a file-based event store (as shown) or a lightweight in-memory mock with a generator.
  • For production, use Kafka or a managed streaming platform.

2) Implement the event model and writer

  • Create Event data class
  • Implement append_event and read_events
  • Build a small seed script to generate sample events

3) Build read models with a streaming processor

  • Decide on either a streaming SQL engine or a small Python consumer
  • Implement per-user activity and daily revenue materializations
  • Persist state to a durable store (SQLite in demo, PostgreSQL in production)

4) Add replay capability

  • Implement a checkpoint mechanism
  • Enable replay from start to rebuild read models when schema changes or for disaster recovery
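A replay from the start only reproduces the same read model if the old rows are cleared first; otherwise the counters keep accumulating. The sketch below assumes an in-memory SQLite database and a hypothetical `rebuild` helper, and covers only the user_activity view.

```python
import sqlite3
from typing import Iterable, Tuple

Event = Tuple[str, str, int]  # (user_id, event_type, timestamp_ms)

UPSERT = """
INSERT INTO user_activity (user_id, event_type, count, last_seen)
VALUES (?, ?, 1, ?)
ON CONFLICT(user_id, event_type) DO UPDATE SET
  count = count + 1,
  last_seen = max(last_seen, excluded.last_seen)
"""


def rebuild(conn: sqlite3.Connection, events: Iterable[Event]) -> None:
    """Clear the read model, then replay every event in log order."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS user_activity (
            user_id TEXT, event_type TEXT, count INTEGER, last_seen INTEGER,
            PRIMARY KEY (user_id, event_type)
        )
    """)
    with conn:  # the DELETE and the replayed upserts commit together
        conn.execute("DELETE FROM user_activity")
        for user_id, event_type, ts in events:
            conn.execute(UPSERT, (user_id, event_type, ts))


conn = sqlite3.connect(":memory:")
events = [("u1", "login", 1000), ("u1", "login", 2000), ("u2", "view", 1500)]
rebuild(conn, events)
rebuild(conn, events)  # a second rebuild yields the same rows, not doubled counts
rows = conn.execute(
    "SELECT user_id, event_type, count, last_seen FROM user_activity "
    "ORDER BY user_id, event_type"
).fetchall()
```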

5) Instrument and test

  • Add unit tests for Event processing and upserts
  • Write an end-to-end test that seeds events, processes them, and asserts the resulting read models
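For the instrumentation half of this step, a few in-process counters are enough to start with. This sketch tracks metrics named in Part 4 (event_ingest_rate and read_model_latency) plus a per-type breakdown; the `PipelineMetrics` class, its method names, and the injectable clock are assumptions for illustration, and a production setup would export these to a metrics system instead of keeping them in memory.

```python
import time
from collections import Counter
from typing import Callable


class PipelineMetrics:
    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._started = clock()
        self.events_ingested = 0
        self.per_type: Counter = Counter()
        self.total_latency_ms = 0.0

    def record(self, event_type: str, event_ts_ms: int, processed_at_ms: int) -> None:
        self.events_ingested += 1
        self.per_type[event_type] += 1
        # Read-model latency: time from the event occurring to it being materialized.
        self.total_latency_ms += max(0, processed_at_ms - event_ts_ms)

    def snapshot(self) -> dict:
        elapsed_s = max(self._clock() - self._started, 1e-9)  # avoid division by zero
        avg_latency = (
            self.total_latency_ms / self.events_ingested if self.events_ingested else 0.0
        )
        return {
            "event_ingest_rate": self.events_ingested / elapsed_s,  # events per second
            "read_model_latency_ms_avg": avg_latency,
            "events_by_type": dict(self.per_type),
        }


# Usage with a fake clock so the numbers are deterministic:
# the clock reads 0.0 at construction and 2.0 at snapshot time.
ticks = iter([0.0, 2.0])
metrics = PipelineMetrics(clock=lambda: next(ticks))
metrics.record("login", event_ts_ms=1000, processed_at_ms=1040)
metrics.record("purchase", event_ts_ms=2000, processed_at_ms=2060)
snap = metrics.snapshot()
```

Injecting the clock keeps the class testable, which fits the unit-test bullet in this step.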

6) Iterate and scale

  • Introduce streaming operators (group by, windowing) in your chosen engine
  • Move from file-based event store to a message bus
  • Add schema evolution handling and versioned events
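One common way to handle versioned events is to leave the stored log untouched and upgrade old events on read ("upcasting"). The sketch below is illustrative only: the `schema_version` field, the v1-to-v2 change (a float amount becoming integer cents plus a currency code), and the USD default are all hypothetical, not something the event envelope above defines.

```python
from typing import Callable, Dict

# Hypothetical convention: each stored event may carry a "schema_version" field;
# events written before versioning existed are treated as version 1.
CURRENT_VERSION = 2


def v1_to_v2(event: dict) -> dict:
    # Illustrative change: v2 stores money as integer cents plus a currency code.
    upgraded = dict(event)
    data = dict(event.get("data", {}))  # copy so the stored event is not mutated
    if "amount" in data:
        data["amount_cents"] = round(data.pop("amount") * 100)
        data["currency"] = "USD"  # assumed default for old events
    upgraded["data"] = data
    upgraded["schema_version"] = 2
    return upgraded


# One upcaster per source version; each lifts an event by exactly one version.
UPCASTERS: Dict[int, Callable[[dict], dict]] = {1: v1_to_v2}


def upcast(event: dict) -> dict:
    """Apply upcasters step by step until the event is at CURRENT_VERSION."""
    version = event.get("schema_version", 1)
    while version < CURRENT_VERSION:
        event = UPCASTERS[version](event)
        version = event["schema_version"]
    return event


old = {
    "event_id": "e1",
    "aggregate_id": "u1",
    "event_type": "purchase",
    "data": {"order_id": "o1", "amount": 19.99},
    "timestamp": 1680003600000,
}
new = upcast(old)
```

Because the log itself never changes, a full replay after a schema change still reads the original bytes and only the in-memory representation is upgraded.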

Illustration: mental model of the pipeline

  • Think of the event store as a grown-up version of an immutable ledger: every action is recorded with a timestamp and a unique id.
  • The processing layer subscribes to this ledger, applies transformations in order, and updates read models that are optimized for queries.
  • If something breaks, you can reprocess the entire ledger from the beginning or from a checkpoint. Provided processing is deterministic and idempotent, the derived state will end up exactly the same.

Rizwan Saleem | https://rizwansaleem.co
