Build a resilient data ingestion pipeline with streaming SQL and event sourcing
In this tutorial, you’ll learn how to design and implement a practical data ingestion pipeline that handles high-throughput streams, preserves replayability, and stays robust in the face of partial failures. You’ll combine streaming SQL for continuous transformations with an event-sourced source of truth, so you can rebuild state deterministically and test end-to-end behavior confidently.
Overview
- Goals
  - Ingest and transform high-volume data with low latency
  - Maintain an immutable, replayable source of truth via event sourcing
  - Support idempotent processing, exactly-once semantics where possible, and clear failure recovery
  - Provide observable metrics and simple testing strategies
- Architecture
  - Event source: append-only event stream (e.g., Kafka or a local mock) capturing all changes
  - Processing layer 1: streaming SQL for near-real-time transformations
  - Processing layer 2: idempotent, fault-tolerant consumers that materialize downstream views
  - Storage: immutable event store, read models as materialized views, and an audit log
- Key concepts
  - Event sourcing and append-only streams
  - Streaming SQL for continuous transforms
  - Exactly-once vs at-least-once semantics
  - Idempotent processing and checkpointing
  - Observability: metrics, dashboards, and alerting
Scenario
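You’re collecting user activity events from a mobile app. Each event carries a user_id, an event_type (login, purchase, or view), a timestamp, and metadata; in the event envelope below these map to aggregate_id, event_type, timestamp, and data. You want to build: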
- A real-time user activity feed with counts per user
- A purchase ledger that aggregates order totals and revenue per day
- A deterministic replay path to rebuild derived state from the event log
Tech stack (local-friendly)
- Event source: a Kafka-like interface or a simple in-memory event bus for demos
- Streaming SQL: ksqlDB or Apache Flink SQL (to learn the concepts locally, you can simulate continuous queries with Python and SQLite, as this tutorial does)
- Language: Python (for glue code and tests)
- Storage: local files for event store and SQLite for read models (or in-memory structures for quick demos)
Part 1: Model your events and the event store
Define the event schema and how events are stored immutably.
Event envelope
- event_id: string (unique per event; used for deduplication)
- aggregate_id: string (e.g., user_id)
- event_type: string
- data: JSON object (event-specific payload)
- timestamp: int (epoch millis)
- replayed: boolean (for bookkeeping)
Example events
- aggregate_id: "u123", event_type: "login", timestamp: 1680000000000, data: {"ip": "1.2.3.4"}
- aggregate_id: "u123", event_type: "purchase", timestamp: 1680003600000, data: {"order_id": "o987", "amount": 49.99}
Event store interface (Python)
- append_event(event)
- read_events(since_timestamp=None)
- replay_events(processor)
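For quick demos, the "simple in-memory event bus" option from the tech stack can implement the same three-method interface. The sketch below is hypothetical (the class name InMemoryEventStore is not from any library). It stores events as plain dicts, deep-copies them on the way in and out so callers cannot rewrite history, and implements replay_events by feeding every stored event, oldest first, to a caller-supplied processor function.

```python
import copy
from typing import Callable, Dict, List, Optional


class InMemoryEventStore:
    """Hypothetical in-memory stand-in for the file-based event store."""

    def __init__(self) -> None:
        self._log: List[Dict] = []  # append-only: entries are never modified or removed

    def append_event(self, event: Dict) -> None:
        # Store a deep copy so later changes to the caller's dict cannot rewrite history.
        self._log.append(copy.deepcopy(event))

    def read_events(self, since_timestamp: Optional[int] = None) -> List[Dict]:
        # Events in append order; since_timestamp is inclusive, matching the file-based read_events.
        return [
            copy.deepcopy(e)
            for e in self._log
            if since_timestamp is None or e["timestamp"] >= since_timestamp
        ]

    def replay_events(self, processor: Callable[[Dict], None]) -> int:
        # Feed every event, oldest first, to the processor; return the number replayed.
        for e in self._log:
            processor(copy.deepcopy(e))
        return len(self._log)


if __name__ == "__main__":
    store = InMemoryEventStore()
    store.append_event({"event_id": "e1", "aggregate_id": "u1", "event_type": "login",
                        "data": {"ip": "10.0.0.1"}, "timestamp": 1000})
    store.replay_events(print)
```

Because replay_events only needs a callable, the same processor you use for live events can be reused unchanged for rebuilds.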
Code sketch (Python): events_store.py
- A minimal file-based, append-only event log that stores one JSON object per line.
```python
import json
import os
from dataclasses import asdict, dataclass
from typing import List, Optional

EVENTS_FILE = "events.log"


@dataclass
class Event:
    event_id: str
    aggregate_id: str
    event_type: str
    data: dict
    timestamp: int  # epoch millis
    replayed: bool = False  # bookkeeping flag; defaults to False for new events


def append_event(e: Event, path: str = EVENTS_FILE) -> None:
    # Append-only: existing lines are never rewritten.
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(asdict(e)) + "\n")


def read_events(path: str = EVENTS_FILE, since_timestamp: Optional[int] = None) -> List[Event]:
    """Return events in log order, optionally only those with timestamp >= since_timestamp."""
    events: List[Event] = []
    if not os.path.exists(path):
        return events
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines
            ev = Event(**json.loads(line))
            if since_timestamp is None or ev.timestamp >= since_timestamp:
                events.append(ev)
    return events
```
seed_events.py: emit sample events
```python
import time
import uuid

from events_store import Event, append_event


def now_ms() -> int:
    return int(time.time() * 1000)


def demo_seed_events() -> None:
    base = now_ms()  # one base time so the +10 / +20 ms offsets are exact
    events = [
        Event(event_id=str(uuid.uuid4()), aggregate_id="u1", event_type="login", data={"ip": "10.0.0.1"}, timestamp=base),
        Event(event_id=str(uuid.uuid4()), aggregate_id="u1", event_type="purchase", data={"order_id": "o1", "amount": 19.99}, timestamp=base + 10),
        Event(event_id=str(uuid.uuid4()), aggregate_id="u2", event_type="login", data={"ip": "10.0.0.2"}, timestamp=base + 20),
    ]
    for e in events:
        append_event(e)


if __name__ == "__main__":
    demo_seed_events()
```
Part 2: Streaming transformations with continuous queries
You want two views:
- Per-user activity counts by event_type
- Daily revenue totals from purchases
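Approach A (conceptual streaming SQL)
- Use a streaming SQL engine (ksqlDB or Flink SQL) to define continuous queries over the event stream. The queries below are engine-neutral sketches: window clauses and date and JSON functions are spelled differently in each engine.
- Example queries:
  - UserActivity: SELECT aggregate_id AS user_id, event_type, COUNT(*) AS count, MAX(timestamp) AS last_seen FROM Events GROUP BY aggregate_id, event_type (add a tumbling window to the grouping for per-window counts instead of running totals)
  - DailyRevenue: SELECT day, SUM(amount) AS revenue FROM Events WHERE event_type = 'purchase' GROUP BY day, where day is the UTC calendar date of the epoch-millis timestamp and amount is read from the data payload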
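The window clause is where engines differ most, so it helps to pin down what a tumbling-window GROUP BY actually computes. The sketch below does it in plain Python as a batch computation: each event falls into the epoch-aligned window [start, start + size) that contains its timestamp, and counts are kept per (user, event_type, window). The one-minute size and the function names are assumptions for illustration; a real engine computes this incrementally and also deals with late events, which this sketch ignores. DailyRevenue is the same idea with a one-day window over purchase events.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

WINDOW_MS = 60_000   # hypothetical window size: one minute
DAY_MS = 86_400_000  # one day in milliseconds


def window_start(ts_ms: int, size_ms: int = WINDOW_MS) -> int:
    # Tumbling windows are aligned to the epoch: [k * size, (k + 1) * size).
    return ts_ms - (ts_ms % size_ms)


def tumbling_window_counts(events: Iterable[dict], size_ms: int = WINDOW_MS) -> Dict[Tuple[str, str, int], int]:
    """Count events per (user_id, event_type, window_start)."""
    counts: Dict[Tuple[str, str, int], int] = defaultdict(int)
    for e in events:
        key = (e["aggregate_id"], e["event_type"], window_start(e["timestamp"], size_ms))
        counts[key] += 1
    return dict(counts)


def daily_revenue(events: Iterable[dict]) -> Dict[int, float]:
    """Sum purchase amounts per day; epoch-aligned day windows start at 00:00 UTC."""
    totals: Dict[int, float] = defaultdict(float)
    for e in events:
        if e["event_type"] == "purchase":
            totals[window_start(e["timestamp"], DAY_MS)] += float(e["data"].get("amount", 0.0))
    return dict(totals)
```

Keying windows by their start time (rather than a formatted date string) keeps the arithmetic exact and makes it easy to compare against an engine's window_start column.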
Approach B (Python prototype for local demonstration)
- Streaming is simulated by processing events from the log one at a time and updating SQLite read models.
```python
import sqlite3
from datetime import datetime, timezone
from typing import Iterable

DB = "read_models.db"


def init_db():
    conn = sqlite3.connect(DB)
    c = conn.cursor()
    c.execute("""
        CREATE TABLE IF NOT EXISTS user_activity (
            user_id TEXT,
            event_type TEXT,
            count INTEGER,
            last_seen INTEGER,
            PRIMARY KEY (user_id, event_type)
        )
    """)
    c.execute("""
        CREATE TABLE IF NOT EXISTS daily_revenue (
            day TEXT,
            revenue REAL,
            PRIMARY KEY (day)
        )
    """)
    conn.commit()
    conn.close()


def upsert_user_activity(user_id: str, event_type: str, ts: int):
    conn = sqlite3.connect(DB)
    c = conn.cursor()
    # First event for (user_id, event_type) inserts count=1; later ones increment it.
    c.execute("""
        INSERT INTO user_activity (user_id, event_type, count, last_seen)
        VALUES (?, ?, 1, ?)
        ON CONFLICT(user_id, event_type) DO UPDATE SET
            count = count + 1,
            last_seen = max(last_seen, ?)
    """, (user_id, event_type, ts, ts))
    conn.commit()
    conn.close()


def add_daily_revenue(day: str, amount: float):
    conn = sqlite3.connect(DB)
    c = conn.cursor()
    c.execute("""
        INSERT INTO daily_revenue (day, revenue)
        VALUES (?, ?)
        ON CONFLICT(day) DO UPDATE SET revenue = revenue + ?
    """, (day, amount, amount))
    conn.commit()
    conn.close()


def process_event(e):
    # Every event type (login, purchase, view, ...) counts toward user activity.
    upsert_user_activity(e.aggregate_id, e.event_type, e.timestamp)
    if e.event_type == "purchase":
        # Bucket revenue by UTC calendar day.
        day = datetime.fromtimestamp(e.timestamp / 1000, tz=timezone.utc).strftime("%Y-%m-%d")
        amount = float(e.data.get("amount", 0.0))
        add_daily_revenue(day, amount)


def replay_all(events: Iterable):
    """Rebuild the read models from scratch, so repeated replays give the same result."""
    init_db()
    conn = sqlite3.connect(DB)
    conn.execute("DELETE FROM user_activity")
    conn.execute("DELETE FROM daily_revenue")
    conn.commit()
    conn.close()
    for e in events:
        process_event(e)
```
Running a demo (in the same script as the functions above)
```python
from events_store import read_events


def show_views():
    # Print both materialized views.
    conn = sqlite3.connect(DB)
    cur = conn.cursor()
    print("User Activity:")
    for row in cur.execute("SELECT user_id, event_type, count, last_seen FROM user_activity ORDER BY user_id, event_type"):
        print(row)
    print("Daily Revenue:")
    for row in cur.execute("SELECT day, revenue FROM daily_revenue ORDER BY day"):
        print(row)
    conn.close()


if __name__ == "__main__":
    events = read_events()
    replay_all(events)  # replay_all() calls init_db() itself
    show_views()
```
Notes on the approach
- This simple prototype demonstrates the core ideas: an immutable event log, and materialized read models updated from the log.
- In production, replace the Python in-memory or file-based approach with a real streaming system (Kafka or Kinesis) and a streaming SQL engine (ksqlDB or Flink SQL) to handle fault tolerance and horizontal scaling.
- Ensure idempotency by recording each applied event_id and skipping events that were already applied, so replays and duplicate deliveries do not double-count.
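One way to implement that deduplication, sketched under the assumption that the read models live in SQLite as in the prototype: record each applied event_id in a processed_events table (a hypothetical name) inside the same transaction as the read-model update. INSERT OR IGNORE inserts nothing when the id already exists, so the cursor's rowcount tells you whether the event is new; if it is not, the update is skipped. Because the marker and the update commit together, a crash cannot leave one without the other.

```python
import sqlite3


def init_dedup_db(conn: sqlite3.Connection) -> None:
    # Hypothetical table holding the ids of events already applied to the read models.
    conn.execute("CREATE TABLE IF NOT EXISTS processed_events (event_id TEXT PRIMARY KEY)")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS user_activity (
            user_id TEXT, event_type TEXT, count INTEGER, last_seen INTEGER,
            PRIMARY KEY (user_id, event_type)
        )
    """)
    conn.commit()


def apply_once(conn: sqlite3.Connection, event: dict) -> bool:
    """Apply the event to user_activity unless its event_id was already applied.
    Returns True if applied, False if it was a duplicate."""
    with conn:  # one transaction: commits on normal exit, rolls back on exception
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed_events (event_id) VALUES (?)",
            (event["event_id"],),
        )
        if cur.rowcount == 0:  # id already present: duplicate delivery or replay
            return False
        conn.execute(
            """
            INSERT INTO user_activity (user_id, event_type, count, last_seen)
            VALUES (?, ?, 1, ?)
            ON CONFLICT(user_id, event_type) DO UPDATE SET
                count = count + 1,
                last_seen = max(last_seen, excluded.last_seen)
            """,
            (event["aggregate_id"], event["event_type"], event["timestamp"]),
        )
    return True
```

The same pattern extends to daily_revenue: any update that accumulates (count + 1, revenue + amount) is only safe to replay when it is guarded by a check like this.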
Part 3: Replaying and rebuilding read models deterministically
- Because all state is derived from the event log, you can rebuild read models by replaying events from the start.
- Persist a checkpoint (a replay-head pointer) that records how far you have materialized.
- When a failure occurs, you can reset the materialization to a known checkpoint and replay from there.
Code sketch for checkpointed replay
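```python
import os
from typing import Optional

from events_store import read_events
# process_event() and init_db() come from the read-model prototype in Part 2.
CHECKPOINT = "checkpoint.txt"

def get_checkpoint() -> Optional[int]:
    """Timestamp of the last materialized event, or None if nothing has been materialized."""
    if not os.path.exists(CHECKPOINT):
        return None
    with open(CHECKPOINT, encoding="utf-8") as f:
        return int(f.read().strip())

def set_checkpoint(ts: int) -> None:
    # Write a temp file, then rename it atomically, so a crash never leaves a torn checkpoint.
    tmp = CHECKPOINT + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        f.write(str(ts))
    os.replace(tmp, CHECKPOINT)

def replay_from_checkpoint() -> None:
    init_db()
    last_ts = get_checkpoint()
    # read_events() includes events at since_timestamp, so start 1 ms after the
    # checkpoint; otherwise the last materialized event would be counted twice.
    # Assumes events are appended in timestamp order.
    since = None if last_ts is None else last_ts + 1
    for e in read_events(since_timestamp=since):
        process_event(e)
        last_ts = e.timestamp if last_ts is None else max(last_ts, e.timestamp)
    if last_ts is not None:
        set_checkpoint(last_ts)
```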
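Timestamp checkpoints assume events arrive in timestamp order and that two events never share a millisecond across runs. A sturdier alternative, swapped in here rather than taken from the prototype, is to checkpoint the byte offset in the append-only log: the next run seeks straight to where the previous one stopped. The sketch below assumes the JSON-lines format of events.log; the function name is hypothetical, and persisting the returned offset (for example with an atomic rename like set_checkpoint) is left to the caller. Events processed after the last saved offset but before a crash are applied again on restart, so pair this with event_id deduplication.

```python
import json
from typing import Callable, Tuple


def replay_from_offset(path: str, offset: int, processor: Callable[[dict], None]) -> Tuple[int, int]:
    """Apply every complete event stored after byte `offset`.
    Returns (new_offset, events_applied); persist new_offset as the checkpoint."""
    applied = 0
    with open(path, "rb") as f:  # binary mode so positions are exact byte offsets
        f.seek(offset)
        for raw in f:
            if not raw.endswith(b"\n"):
                break  # last line is still being written: stop and pick it up next run
            if raw.strip():
                processor(json.loads(raw))  # json.loads accepts bytes
                applied += 1
            offset += len(raw)
    return offset, applied
```

Offsets only work because the log is append-only: bytes before a saved offset never change, so the same offset always points at the same event.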
Part 4: Observability, testing, and reliability
- Observability
  - Metrics to emit: event_ingest_rate, per_user_activity_counts, daily_revenue, read_model_latency
  - Dashboards: a simple panel showing events per second, top users by activity, and revenue by day
- Testing strategies
  - Unit tests for the event processing logic
  - End-to-end tests that seed events, run the replay, and verify that the read models match expectations
  - Replay tests: generate a known sequence, crash mid-replay, then resume and verify that the final state equals a ground-truth run
- Fault-tolerance basics
  - Use an idempotent write path for read models: upserts alone are not enough when they increment counters, so combine them with event_id deduplication
  - Persist checkpoints after batches of events
  - Partition the event stream so different aggregates can be processed independently
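The replay test from the testing strategies can be sketched without any infrastructure. This is a hypothetical harness, not part of the prototype: apply_event is a deterministic fold standing in for process_event, a Disk object simulates durable storage, state and offset are checkpointed together after every batch, and a RuntimeError simulates a crash partway through a batch. The test then checks that resuming from the last checkpoint reproduces a clean ground-truth run exactly.

```python
from typing import Dict, List, Optional, Tuple

State = Dict[Tuple[str, str], int]  # (user_id, event_type) -> count


def apply_event(state: State, event: dict) -> None:
    # Deterministic fold step standing in for process_event().
    key = (event["aggregate_id"], event["event_type"])
    state[key] = state.get(key, 0) + 1


class Disk:
    """Simulated durable storage; it survives a 'crash' because it lives outside run()."""

    def __init__(self) -> None:
        self.state: State = {}
        self.offset = 0  # index of the next event to process


def run(events: List[dict], disk: Disk, batch_size: int, crash_at: Optional[int] = None) -> None:
    """Process events from the last checkpoint, checkpointing after each batch.
    Raises RuntimeError just before event index `crash_at` to simulate a crash."""
    state = dict(disk.state)  # recover in-memory state from the last checkpoint
    i = disk.offset
    while i < len(events):
        end = min(i + batch_size, len(events))
        for j in range(i, end):
            if j == crash_at:
                raise RuntimeError(f"simulated crash before event {j}")
            apply_event(state, events[j])
        # Commit state and offset together, so they can never disagree.
        disk.state, disk.offset = dict(state), end
        i = end
```

With a real database, "commit together" means writing the checkpoint in the same transaction as the read-model updates for that batch; otherwise the resumed run can double-apply part of a batch.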
Part 5: Step-by-step guide to implement in your environment
1) Choose your event store
   - For local experimentation, start with a file-based event store (as shown) or a lightweight in-memory mock with a generator.
   - For production, use Kafka or a managed streaming platform.
2) Implement the event model and writer
   - Create the Event dataclass
   - Implement append_event and read_events
   - Build a small seed script to generate sample events
3) Build read models with a streaming processor
   - Choose either a streaming SQL engine or a small Python consumer
   - Implement the per-user activity and daily revenue materializations
   - Persist state to a durable store (SQLite in the demo, PostgreSQL in production)
4) Add replay capability
   - Implement a checkpoint mechanism
   - Support full replays from the start to rebuild read models after schema changes or for disaster recovery
5) Instrument and test
   - Add unit tests for event processing and upserts
   - Write an end-to-end test that seeds events, processes them, and asserts on the resulting read models
6) Iterate and scale
   - Introduce streaming operators (group by, windowing) in your chosen engine
   - Move from the file-based event store to a message bus
   - Add schema evolution handling and versioned events
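A common way to handle versioned events is upcasting: each event carries a version number, and small functions translate older versions into the current shape before processing. The read-model code then only ever sees one schema, while old events stay untouched in the immutable log. The sketch below rests on hypothetical assumptions: a schema_version field in the envelope (absent means version 1) and an invented v2 purchase schema that stores amount_cents as an integer instead of a float amount.

```python
from typing import Callable, Dict, Tuple

CURRENT_VERSION = 2  # hypothetical current schema version


def _purchase_v1_to_v2(event: dict) -> dict:
    # Hypothetical change: float dollars -> integer cents (rounded, not truncated).
    data = dict(event["data"])  # copy so the stored event is never mutated
    data["amount_cents"] = round(float(data.pop("amount", 0.0)) * 100)
    return {**event, "data": data, "schema_version": 2}


# (event_type, from_version) -> function producing the next version
UPCASTERS: Dict[Tuple[str, int], Callable[[dict], dict]] = {
    ("purchase", 1): _purchase_v1_to_v2,
}


def upcast(event: dict) -> dict:
    """Apply upcasters step by step until the event reaches CURRENT_VERSION."""
    version = event.get("schema_version", 1)
    while version < CURRENT_VERSION:
        step = UPCASTERS.get((event["event_type"], version))
        if step is None:
            # No schema change for this event type at this version: only bump the number.
            event = {**event, "schema_version": version + 1}
        else:
            event = step(event)
        version = event["schema_version"]
    return event
```

Calling upcast at the top of process_event keeps both live processing and full replays on the current schema without rewriting history.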
Illustration: mental model of the pipeline
- Think of the event store as an immutable ledger: every action is recorded once, with a timestamp and a unique id, and is never changed afterwards.
- The processing layer subscribes to this ledger, applies transformations in order, and updates read models that are optimized for queries.
- If something breaks, you can reprocess the ledger from the beginning or from a checkpoint; as long as processing is deterministic and idempotent, the derived state ends up exactly the same.
Rizwan Saleem | https://rizwansaleem.co