Most stream processing tutorials teach you how to connect Kafka to Postgres. They don't explain what happens inside the engine — how records get assigned to windows, how watermarks handle late data, or what a checkpoint actually serializes.
I built StreamLite to answer those questions. It's a complete stream processing engine in Python — 24 modules, 281 tests, zero external dependencies. Everything from tumbling windows to keyed state to checkpoint coordination, implemented from scratch.
The API
A word count takes three chained calls:
from streamlite import StreamLite

result = (
    StreamLite.from_collection(["the quick brown fox", "the lazy dog"])
    .flat_map(str.split)
    .key_by(lambda w: w)
    .count()
)
# [("the", 2), ("quick", 1), ("brown", 1), ...]
But the interesting part isn't the API. It's what happens underneath.
How Windowing Actually Works
When people say "tumbling window" they usually mean "group by minute." But the actual implementation has subtle decisions that change behavior.
Here's the core of TumblingWindow.assign_windows():
class TumblingWindow(WindowAssigner):
    def __init__(self, size_ms, offset_ms=0):
        self.size_ms = size_ms
        self.offset_ms = offset_ms

    def assign_windows(self, timestamp):
        start = timestamp - (timestamp - self.offset_ms) % self.size_ms
        return [TimeWindow(start=start, end=start + self.size_ms)]
One line of math: timestamp - (timestamp - offset) % size. That's it. But this formula handles:
- Alignment: all windows snap to the same grid regardless of when the first record arrives
- Offset: shift windows by offset_ms to align with business hours (e.g., daily windows starting at 6 AM, not midnight)
- Determinism: the same timestamp always maps to the same window, even across restarts
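To make the offset behavior concrete, here is a small standalone sketch of the same formula. The function name `tumbling_window_start` and the constants are mine, not part of StreamLite; only the arithmetic comes from the class above.

```python
def tumbling_window_start(timestamp_ms, size_ms, offset_ms=0):
    """Return the start of the tumbling window that contains timestamp_ms."""
    return timestamp_ms - (timestamp_ms - offset_ms) % size_ms


MINUTE_MS = 60_000
HOUR_MS = 3_600_000
DAY_MS = 24 * HOUR_MS

# One-minute windows, no offset: 61,500 ms falls in [60000, 120000).
minute_start = tumbling_window_start(61_500, MINUTE_MS)

# Daily windows that begin at 06:00 instead of midnight.
# 07:30 on day 0 lands in the window that starts at 06:00 on day 0.
morning_start = tumbling_window_start(
    7 * HOUR_MS + 30 * MINUTE_MS, DAY_MS, offset_ms=6 * HOUR_MS
)

# 05:00 on day 0 is before the 06:00 boundary, so it belongs to the
# window that started at 06:00 the previous day (a negative start here).
early_start = tumbling_window_start(5 * HOUR_MS, DAY_MS, offset_ms=6 * HOUR_MS)

print(minute_start, morning_start, early_start)
```

The last case works because Python's `%` always returns a non-negative result for a positive divisor. In a language where `%` can return a negative remainder, the same formula would need an extra adjustment.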
Sliding windows are trickier
A sliding window with size=10s and slide=5s means each record belongs to two windows. A record at timestamp 7000 belongs to [0, 10000) and [5000, 15000):
class SlidingWindow(WindowAssigner):
    def assign_windows(self, timestamp):
        windows = []
        last_start = timestamp - (timestamp - self.offset_ms) % self.slide_ms
        for start in range(last_start, last_start - self.size_ms, -self.slide_ms):
            if start + self.size_ms > timestamp >= start:
                windows.append(TimeWindow(start=start, end=start + self.size_ms))
        return windows
The key insight: you walk backward from the latest window start, adding windows until the record no longer falls within the window bounds.
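The backward walk is easy to check by hand. The sketch below restates it as a plain function that returns `(start, end)` tuples instead of `TimeWindow` objects; the function name and the tuple representation are assumptions made for illustration.

```python
def sliding_window_bounds(timestamp_ms, size_ms, slide_ms, offset_ms=0):
    """Return (start, end) pairs for every sliding window containing timestamp_ms."""
    # Latest window start that is <= timestamp_ms on the slide grid.
    last_start = timestamp_ms - (timestamp_ms - offset_ms) % slide_ms
    bounds = []
    # Walk backward one slide at a time, at most one window size.
    for start in range(last_start, last_start - size_ms, -slide_ms):
        if start <= timestamp_ms < start + size_ms:
            bounds.append((start, start + size_ms))
    return bounds


# size=10s, slide=5s: the record at 7000 belongs to two windows.
two_windows = sliding_window_bounds(7_000, size_ms=10_000, slide_ms=5_000)

# size=10s, slide=3s (size is not a multiple of slide): three windows.
three_windows = sliding_window_bounds(7_000, size_ms=10_000, slide_ms=3_000)

print(two_windows)
print(three_windows)
```

When the size is not a multiple of the slide, the `range` yields one extra candidate start, and the bounds check inside the loop is what filters it out.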
Session windows merge
Session windows don't have a fixed size. They group records by activity gaps:
class SessionWindow(WindowAssigner):
    def merge_windows(self, windows):
        if not windows:
            return []  # nothing to merge; avoids indexing an empty list below
        sorted_windows = sorted(windows, key=lambda w: w.start)
        merged = [sorted_windows[0]]
        for window in sorted_windows[1:]:
            if window.start <= merged[-1].end + self.gap_ms:
                merged[-1] = TimeWindow(merged[-1].start, max(merged[-1].end, window.end))
            else:
                merged.append(window)
        return merged
Each new record creates a point window. Then merge_windows combines windows that overlap or sit close together: the next window is merged when it starts at most gap_ms after the current one ends. This merge step is what makes session windows expensive, because the sort makes each merge pass O(n log n).
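Here is a self-contained sketch of that flow, from raw timestamps to merged sessions. The `TimeWindow` namedtuple is a stand-in for StreamLite's own type, and `merge_sessions` is a hypothetical helper, not a function from the library.

```python
from collections import namedtuple

# Stand-in for StreamLite's TimeWindow, for illustration only.
TimeWindow = namedtuple("TimeWindow", ["start", "end"])


def merge_sessions(timestamps, gap_ms):
    """Turn each timestamp into a point window, then merge by activity gap."""
    point_windows = sorted(TimeWindow(t, t) for t in timestamps)
    merged = []
    for window in point_windows:
        if merged and window.start <= merged[-1].end + gap_ms:
            # Close enough to the current session: extend it.
            last = merged[-1]
            merged[-1] = TimeWindow(last.start, max(last.end, window.end))
        else:
            # Gap exceeded (or first window): start a new session.
            merged.append(window)
    return merged


# Out-of-order arrivals; a 2-second gap separates the two bursts of activity.
sessions = merge_sessions([1_000, 2_500, 9_000, 3_000, 9_400], gap_ms=2_000)
print(sessions)
```

Note that the late-arriving 3000 still joins the first session, since the merge sorts before it compares.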
The Watermark Problem
Out-of-order data is the fundamental challenge in stream processing. A record with timestamp t=5000 might arrive after t=8000. When do you close the [0, 10000) window?
Watermarks are the answer. A watermark at time W means: "I believe all records with timestamp ≤ W have arrived."
class BoundedOutOfOrderness(WatermarkStrategy):
    def __init__(self, max_delay_ms):
        self._max_delay = max_delay_ms
        self._max_timestamp = Watermark.MIN_WATERMARK

    def on_event(self, timestamp):
        if timestamp > self._max_timestamp:
            self._max_timestamp = timestamp

    def current_watermark(self):
        if self._max_timestamp == Watermark.MIN_WATERMARK:
            return Watermark.MIN_WATERMARK
        return self._max_timestamp - self._max_delay
With max_delay_ms=5000, after seeing timestamp 10000 the watermark advances to 5000. A record with timestamp 7000 arriving now is still "on time" because it is ahead of the watermark; a record with timestamp ≤ 5000 is late. This tolerance is what makes bounded out-of-orderness practical: you trade latency for completeness.
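To see the watermark move, the sketch below feeds a short out-of-order sequence through a stripped-down copy of the strategy. It follows the definition given earlier: a watermark W claims that everything with timestamp ≤ W has already arrived, so such a record showing up afterwards counts as late. The class name, the `classify` helper, and the use of `-math.inf` in place of `Watermark.MIN_WATERMARK` are my assumptions.

```python
import math


class BoundedOutOfOrdernessSketch:
    """Watermark trails the highest timestamp seen by a fixed delay."""

    def __init__(self, max_delay_ms):
        self.max_delay_ms = max_delay_ms
        self.max_timestamp = -math.inf  # stand-in for Watermark.MIN_WATERMARK

    def on_event(self, timestamp):
        self.max_timestamp = max(self.max_timestamp, timestamp)

    def current_watermark(self):
        return self.max_timestamp - self.max_delay_ms


def classify(timestamps, max_delay_ms):
    """Label each record against the watermark as it stood on arrival."""
    strategy = BoundedOutOfOrdernessSketch(max_delay_ms)
    labelled = []
    for ts in timestamps:
        is_late = ts <= strategy.current_watermark()
        labelled.append((ts, "late" if is_late else "on time"))
        strategy.on_event(ts)
    return labelled


labels = classify([1_000, 10_000, 7_000, 4_000, 12_000, 6_500], max_delay_ms=5_000)
for ts, label in labels:
    print(ts, label)
```

The record at 7000 arrives after 10000 but is still accepted, while 4000 and 6500 fall behind the watermark. Raising `max_delay_ms` would accept them too, at the cost of holding windows open longer.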
Keyed State: The Secret Sauce
Stateful processing is what separates stream engines from batch map-reduce. StreamLite has four state primitives:
class ValueState:
    def get(self): ...          # returns current value or None
    def update(self, v): ...    # set new value
    def clear(self): ...        # reset

class ListState:
    def add(self, v): ...       # append
    def get(self): ...          # returns list

class MapState:
    def get(self, key): ...
    def put(self, key, value): ...

class ReducingState:
    def add(self, v): ...       # accumulate with reduce function
These are scoped by namespace + key, managed by a StateBackend that handles snapshot/restore for checkpointing:
def running_average(value, ctx):
    total = ctx.get_value_state("total", default=0)
    count = ctx.get_value_state("count", default=0)
    t = (total.get() or 0) + value["temp"]
    c = (count.get() or 0) + 1
    total.update(t)
    count.update(c)
    return [{"sensor": value["sensor"], "avg": t / c}]

stream.key_by(lambda r: r["sensor"]).process(running_average)
Each key gets its own isolated state. That isolation is what enables parallel processing in real engines — different keys can be processed on different threads without coordination.
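The isolation comes down to how state is addressed. Below is a minimal sketch of a backend that stores values in one dict keyed by (namespace, key, state name). `InMemoryStateBackend`, `ValueStateSketch`, and the `value_state` method are hypothetical names for illustration; StreamLite's actual StateBackend may be organized differently.

```python
class ValueStateSketch:
    """A handle onto one slot of the backend's storage."""

    def __init__(self, store, slot, default=None):
        self._store = store
        self._slot = slot
        self._default = default

    def get(self):
        return self._store.get(self._slot, self._default)

    def update(self, value):
        self._store[self._slot] = value

    def clear(self):
        self._store.pop(self._slot, None)


class InMemoryStateBackend:
    """Holds all state in one dict addressed by (namespace, key, name)."""

    def __init__(self):
        self._data = {}

    def value_state(self, namespace, key, name, default=None):
        return ValueStateSketch(self._data, (namespace, key, name), default)


def running_average(backend, reading):
    key = reading["sensor"]  # the record's key selects which state it sees
    total = backend.value_state("avg", key, "total", default=0)
    count = backend.value_state("avg", key, "count", default=0)
    total.update(total.get() + reading["temp"])
    count.update(count.get() + 1)
    return total.get() / count.get()


backend = InMemoryStateBackend()
readings = [
    {"sensor": "a", "temp": 20.0},
    {"sensor": "b", "temp": 30.0},
    {"sensor": "a", "temp": 22.0},
]
averages = [running_average(backend, r) for r in readings]
print(averages)
```

Sensor "b" never touches the slots for sensor "a", which is why the two keys could in principle be handled by different workers.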
Checkpointing: Crash Recovery
StreamLite checkpoints by serializing the entire StateBackend to disk:
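class CheckpointCoordinator:
    def trigger_checkpoint(self, watermarks=None):
        self._checkpoint_count += 1
        checkpoint_id = f"chk-{self._checkpoint_count:06d}"
        snapshot = self._state_backend.snapshot()
        # The excerpt omits how metadata is built; this dict is an assumed minimal shape.
        metadata = {"checkpoint_id": checkpoint_id, "watermarks": watermarks or {}}
        self._storage.save(checkpoint_id, snapshot, metadata)
        return metadata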
The snapshot() method walks every namespace → key → state object and serializes to nested dicts. On restore, it reconstructs the state objects from those dicts.
Real engines like Flink use aligned checkpoint barriers across parallel operators. StreamLite keeps it simple — single-threaded snapshot — but the concept is the same: freeze state, write to durable storage, resume.
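The freeze, write, resume cycle can be sketched with nothing but the standard library. This version assumes state is already a nested dict of JSON-serializable values with string keys, and stores each checkpoint as one JSON file. The function names and file layout are mine, not StreamLite's storage format.

```python
import json
import tempfile
from pathlib import Path


def save_checkpoint(directory, checkpoint_id, state):
    """Serialize state to <directory>/<checkpoint_id>.json and return the path."""
    path = Path(directory) / f"{checkpoint_id}.json"
    tmp_path = path.with_suffix(".tmp")
    # Serializing here is the "freeze": later mutations of `state`
    # cannot change what has already been written.
    tmp_path.write_text(json.dumps(state))
    # Rename into place so a reader should not see a half-written checkpoint.
    tmp_path.replace(path)
    return path


def load_checkpoint(path):
    """Rebuild the nested state dict from a checkpoint file."""
    return json.loads(Path(path).read_text())


with tempfile.TemporaryDirectory() as directory:
    state = {"avg": {"sensor-a": {"total": 42.0, "count": 2}}}
    checkpoint_path = save_checkpoint(directory, "chk-000001", state)

    # Processing continues and mutates live state after the checkpoint.
    state["avg"]["sensor-a"]["count"] = 99

    # After a crash, recovery starts from the last checkpoint, not live state.
    restored = load_checkpoint(checkpoint_path)

print(restored)
```

Writing to a temporary name and renaming is a common way to avoid leaving a truncated file behind if the process dies mid-write.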
Stream Joins
StreamLite supports three join patterns:
Hash join — group both sides by key, match:
join = StreamJoin(left, right, join_fn=lambda l, r: {**l, **r}, join_type=JoinType.INNER)
Interval join — match records within a time window:
join = IntervalJoin(clicks, purchases, lower_bound_ms=-1000, upper_bound_ms=5000, join_fn=correlate)
Window join — join within aligned time windows:
join = WindowJoin(left, right, window_assigner=TumblingWindow(size_ms=60000), join_fn=merge)
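Of the three, the interval join is the least obvious, so here is a rough sketch of its matching rule over in-memory lists. It assumes records are `(key, timestamp_ms, value)` tuples and that both bounds are inclusive; the `interval_join` function is a hypothetical illustration, not the internals of StreamLite's IntervalJoin.

```python
def interval_join(left, right, lower_bound_ms, upper_bound_ms, join_fn):
    """Join records sharing a key when the right timestamp falls within
    [left_ts + lower_bound_ms, left_ts + upper_bound_ms] (inclusive)."""
    # Index the right side by key so each left record only scans its own key.
    right_by_key = {}
    for key, ts, value in right:
        right_by_key.setdefault(key, []).append((ts, value))

    results = []
    for key, left_ts, left_value in left:
        for right_ts, right_value in right_by_key.get(key, []):
            if left_ts + lower_bound_ms <= right_ts <= left_ts + upper_bound_ms:
                results.append(join_fn(left_value, right_value))
    return results


clicks = [("u1", 10_000, "click-A"), ("u2", 11_000, "click-B")]
purchases = [
    ("u1", 12_000, "buy-X"),  # 2 s after click-A: inside the interval
    ("u1", 20_000, "buy-Y"),  # 10 s after click-A: too late
    ("u2", 9_500, "buy-Z"),   # 1.5 s before click-B: too early
]

matches = interval_join(
    clicks, purchases,
    lower_bound_ms=-1_000, upper_bound_ms=5_000,
    join_fn=lambda click, purchase: (click, purchase),
)
print(matches)
```

A streaming version would also need to buffer both sides in keyed state and use the watermark to decide when buffered records can no longer match and can be dropped.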
The Full Architecture
Sources → Operators → Windowing → State → Sinks
↕ ↕ ↕
Watermarks Triggers Checkpoints
↕
Metrics
24 modules in total: types, errors, source, sink, operators, stream, keyed, window, trigger, watermark, state, context, join, split, merge, serializer, partition, checkpoint, metrics, pipeline, executor, time_utils, utils, and the main __init__.
Numbers
- 24 modules, each focused on one concept
- 281 tests across 20 test files
- ~8,000 lines of Python
- Zero dependencies — stdlib only
- 5 examples: word count, clickstream, sensors, ETL, split-rejoin
Try it
git clone https://github.com/hajirufai/streamlite.git
cd streamlite
python -m pytest tests/ -v
python examples/word_count.py
The entire codebase is designed to be read. If you've ever wondered how Flink or Kafka Streams work under the hood, start with window.py and watermark.py. Those two files contain the core ideas that make stream processing work.