DEV Community

Haji Rufai
Haji Rufai

Posted on • Originally published at hajirufai.github.io

I Built a Stream Processing Engine from Scratch in Python — Here's How Windowing Actually Works

Most stream processing tutorials teach you how to connect Kafka to Postgres. They don't explain what happens inside the engine — how records get assigned to windows, how watermarks handle late data, or what a checkpoint actually serializes.

I built StreamLite to answer those questions. It's a complete stream processing engine in Python — 24 modules, 281 tests, zero external dependencies. Everything from tumbling windows to keyed state to checkpoint coordination, implemented from scratch.

The API

Three lines for a word count:

from streamlite import StreamLite

result = (
    StreamLite.from_collection(["the quick brown fox", "the lazy dog"])
    .flat_map(str.split)
    .key_by(lambda w: w)
    .count()
)
# [("the", 2), ("quick", 1), ("brown", 1), ...]
Enter fullscreen mode Exit fullscreen mode

But the interesting part isn't the API. It's what happens underneath.

How Windowing Actually Works

When people say "tumbling window" they usually mean "group by minute." But the actual implementation has subtle decisions that change behavior.

Here's the core of TumblingWindow.assign_windows():

class TumblingWindow(WindowAssigner):
    def __init__(self, size_ms, offset_ms=0):
        self.size_ms = size_ms
        self.offset_ms = offset_ms

    def assign_windows(self, timestamp):
        start = timestamp - (timestamp - self.offset_ms) % self.size_ms
        return [TimeWindow(start=start, end=start + self.size_ms)]
Enter fullscreen mode Exit fullscreen mode

One line of math: timestamp - (timestamp - offset) % size. That's it. But this formula handles:

  • Alignment: all windows snap to the same grid regardless of when the first record arrives
  • Offset: shift windows by offset_ms to align with business hours (e.g., daily windows starting at 6 AM, not midnight)
  • Determinism: the same timestamp always maps to the same window, even across restarts

Sliding windows are trickier

A sliding window with size=10s and slide=5s means each record belongs to two windows. A record at timestamp 7000 belongs to [0, 10000) and [5000, 15000):

class SlidingWindow(WindowAssigner):
    def assign_windows(self, timestamp):
        windows = []
        last_start = timestamp - (timestamp - self.offset_ms) % self.slide_ms
        for start in range(last_start, last_start - self.size_ms, -self.slide_ms):
            if start + self.size_ms > timestamp >= start:
                windows.append(TimeWindow(start=start, end=start + self.size_ms))
        return windows
Enter fullscreen mode Exit fullscreen mode

The key insight: you walk backward from the latest window start, adding windows until the record no longer falls within the window bounds.

Session windows merge

Session windows don't have a fixed size. They group records by activity gaps:

class SessionWindow(WindowAssigner):
    def merge_windows(self, windows):
        sorted_windows = sorted(windows, key=lambda w: w.start)
        merged = [sorted_windows[0]]
        for window in sorted_windows[1:]:
            if window.start <= merged[-1].end + self.gap_ms:
                merged[-1] = TimeWindow(merged[-1].start, max(merged[-1].end, window.end))
            else:
                merged.append(window)
        return merged
Enter fullscreen mode Exit fullscreen mode

Each new record creates a point window. Then merge_windows combines overlapping windows (where the gap between them is less than gap_ms). This merge step is what makes session windows expensive — it's O(n log n) per merge pass.

The Watermark Problem

Out-of-order data is the fundamental challenge in stream processing. A record with timestamp t=5000 might arrive after t=8000. When do you close the [0, 10000) window?

Watermarks are the answer. A watermark at time W means: "I believe all records with timestamp ≤ W have arrived."

class BoundedOutOfOrderness(WatermarkStrategy):
    def __init__(self, max_delay_ms):
        self._max_delay = max_delay_ms
        self._max_timestamp = Watermark.MIN_WATERMARK

    def on_event(self, timestamp):
        if timestamp > self._max_timestamp:
            self._max_timestamp = timestamp

    def current_watermark(self):
        if self._max_timestamp == Watermark.MIN_WATERMARK:
            return Watermark.MIN_WATERMARK
        return self._max_timestamp - self._max_delay
Enter fullscreen mode Exit fullscreen mode

With max_delay_ms=5000, after seeing timestamp 10000 the watermark advances to 5000. Any record with timestamp ≤ 5000 arriving now is considered "on time." This tolerance is what makes bounded out-of-orderness practical — you trade latency for completeness.

Keyed State: The Secret Sauce

Stateful processing is what separates stream engines from batch map-reduce. StreamLite has four state primitives:

class ValueState:
    def get(self): ...      # returns current value or None
    def update(self, v): ...  # set new value
    def clear(self): ...     # reset

class ListState:
    def add(self, v): ...   # append
    def get(self): ...      # returns list

class MapState:
    def get(self, key): ...
    def put(self, key, value): ...

class ReducingState:
    def add(self, v): ...   # accumulate with reduce function
Enter fullscreen mode Exit fullscreen mode

These are scoped by namespace + key, managed by a StateBackend that handles snapshot/restore for checkpointing:

def running_average(value, ctx):
    total = ctx.get_value_state("total", default=0)
    count = ctx.get_value_state("count", default=0)
    t = (total.get() or 0) + value["temp"]
    c = (count.get() or 0) + 1
    total.update(t)
    count.update(c)
    return [{"sensor": value["sensor"], "avg": t / c}]

stream.key_by(lambda r: r["sensor"]).process(running_average)
Enter fullscreen mode Exit fullscreen mode

Each key gets its own isolated state. That isolation is what enables parallel processing in real engines — different keys can be processed on different threads without coordination.

Checkpointing: Crash Recovery

StreamLite checkpoints by serializing the entire StateBackend to disk:

class CheckpointCoordinator:
    def trigger_checkpoint(self, watermarks=None):
        self._checkpoint_count += 1
        checkpoint_id = f"chk-{self._checkpoint_count:06d}"
        snapshot = self._state_backend.snapshot()
        self._storage.save(checkpoint_id, snapshot, metadata)
        return metadata
Enter fullscreen mode Exit fullscreen mode

The snapshot() method walks every namespace → key → state object and serializes to nested dicts. On restore, it reconstructs the state objects from those dicts.

Real engines like Flink use aligned checkpoint barriers across parallel operators. StreamLite keeps it simple — single-threaded snapshot — but the concept is the same: freeze state, write to durable storage, resume.

Stream Joins

StreamLite supports three join patterns:

Hash join — group both sides by key, match:

join = StreamJoin(left, right, join_fn=lambda l, r: {**l, **r}, join_type=JoinType.INNER)
Enter fullscreen mode Exit fullscreen mode

Interval join — match records within a time window:

join = IntervalJoin(clicks, purchases, lower_bound_ms=-1000, upper_bound_ms=5000, join_fn=correlate)
Enter fullscreen mode Exit fullscreen mode

Window join — join within aligned time windows:

join = WindowJoin(left, right, window_assigner=TumblingWindow(size_ms=60000), join_fn=merge)
Enter fullscreen mode Exit fullscreen mode

The Full Architecture

Sources → Operators → Windowing → State → Sinks
              ↕            ↕         ↕
         Watermarks    Triggers  Checkpoints
              ↕
          Metrics
Enter fullscreen mode Exit fullscreen mode

24 modules in total: types, errors, source, sink, operators, stream, keyed, window, trigger, watermark, state, context, join, split, merge, serializer, partition, checkpoint, metrics, pipeline, executor, time_utils, utils, and the main __init__.

Numbers

  • 24 modules, each focused on one concept
  • 281 tests across 20 test files
  • ~8,000 lines of Python
  • Zero dependencies — stdlib only
  • 5 examples: word count, clickstream, sensors, ETL, split-rejoin

Try it

git clone https://github.com/hajirufai/streamlite.git
cd streamlite
python -m pytest tests/ -v
python examples/word_count.py
Enter fullscreen mode Exit fullscreen mode

The entire codebase is designed to be read. If you've ever wondered how Flink or Kafka Streams work under the hood, start with window.py and watermark.py. Those two files contain the core ideas that make stream processing work.

GitHub · Live docs

Top comments (0)