Last year I was on a team that pushed 40 million events per day through Kafka. We had consumer lag alerts, rebalancing incidents, and a whole runbook for when the broker got behind. I understood how to operate Kafka. But I did not understand how Kafka works.
So I built a tiny one. No dependencies. No Zookeeper. No JVM. Just Python and the core ideas.
Here is what I learned.
The Three Things Kafka Actually Does
People say "Kafka is a message queue." That is not quite right.
Kafka is a distributed commit log. It has three jobs:
- Accept writes from producers and append them to a log
- Let consumers read from any offset in that log
- Remember where each consumer group is up to
That third one is the thing that makes Kafka different from a traditional queue. A queue forgets a message once it is consumed. Kafka remembers. You can replay. You can have 10 different consumer groups reading the same topic at different speeds.
The code to implement this is smaller than you think.
brokelite: A Message Broker in 120 Lines
import threading
import time
from collections import defaultdict
from typing import Dict, List, Tuple
class Partition:
"""Append-only log for one partition of a topic."""
def __init__(self):
self._log: List[Tuple[int, bytes]] = [] # (offset, message)
self._lock = threading.Lock()
self._next_offset = 0
def append(self, message: bytes) -> int:
with self._lock:
offset = self._next_offset
self._log.append((offset, message))
self._next_offset += 1
return offset
def read_from(self, offset: int, max_count: int = 100) -> List[Tuple[int, bytes]]:
with self._lock:
return [
(off, msg)
for off, msg in self._log
if off >= offset
][:max_count]
def __len__(self):
return self._next_offset
class Topic:
"""A topic is just N partitions."""
def __init__(self, name: str, num_partitions: int = 3):
self.name = name
self.partitions = [Partition() for _ in range(num_partitions)]
def route(self, key: bytes | None) -> int:
"""Route to a partition by key hash, or round-robin if no key."""
if key is None:
return int(time.time() * 1000) % len(self.partitions)
return hash(key) % len(self.partitions)
def produce(self, message: bytes, key: bytes | None = None) -> Tuple[int, int]:
partition_id = self.route(key)
offset = self.partitions[partition_id].append(message)
return partition_id, offset
def consume(self, partition_id: int, offset: int, max_count: int = 100):
return self.partitions[partition_id].read_from(offset, max_count)
class ConsumerGroupRegistry:
"""Tracks committed offsets per group, topic, partition."""
def __init__(self):
# {group: {topic: {partition: offset}}}
self._offsets: Dict[str, Dict[str, Dict[int, int]]] = defaultdict(
lambda: defaultdict(lambda: defaultdict(int))
)
self._lock = threading.Lock()
def commit(self, group: str, topic: str, partition: int, offset: int):
with self._lock:
self._offsets[group][topic][partition] = offset
def get_offset(self, group: str, topic: str, partition: int) -> int:
with self._lock:
return self._offsets[group][topic][partition]
class Broker:
"""The broker: accepts produces, serves consumes, tracks consumer groups."""
def __init__(self):
self._topics: Dict[str, Topic] = {}
self._registry = ConsumerGroupRegistry()
self._lock = threading.Lock()
def create_topic(self, name: str, num_partitions: int = 3):
with self._lock:
if name not in self._topics:
self._topics[name] = Topic(name, num_partitions)
def produce(self, topic: str, message: bytes, key: bytes | None = None):
if topic not in self._topics:
self.create_topic(topic)
partition_id, offset = self._topics[topic].produce(message, key)
return {"topic": topic, "partition": partition_id, "offset": offset}
def consume(
self, topic: str, group: str, partition: int, max_count: int = 100
) -> List[Tuple[int, bytes]]:
if topic not in self._topics:
return []
offset = self._registry.get_offset(group, topic, partition)
records = self._topics[topic].consume(partition, offset, max_count)
return records
def commit(self, topic: str, group: str, partition: int, offset: int):
self._registry.commit(group, topic, partition, offset)
That is the whole broker. Let me walk through the key ideas.
The Partition is an Append-Only Log
def append(self, message: bytes) -> int:
with self._lock:
offset = self._next_offset
self._log.append((offset, message))
self._next_offset += 1
return offset
Every message gets a monotonically increasing offset. That offset never changes. That is the core promise. A Kafka offset is like a line number in a file. Once written, line 42 is always line 42.
This is why Kafka is fast. Appends are O(1). You never update old records. There is no random I/O.
Real Kafka writes these to segment files on disk in sequential order. Sequential writes on spinning disk are nearly as fast as sequential writes on SSD. That is why Kafka can hit hundreds of MB/s throughput on commodity hardware.
Key-Based Routing Gives You Ordering Guarantees
def route(self, key: bytes | None) -> int:
if key is None:
return int(time.time() * 1000) % len(self.partitions)
return hash(key) % len(self.partitions)
Here is a thing that trips up new Kafka users: Kafka only guarantees ordering within a partition. Not across partitions.
If you produce events for user_id=123 and they land on partition 0, 2, and 1 in that order, your consumer sees them out of sequence.
The fix is simple: use a consistent key. hash(user_id) % num_partitions means all events for user 123 always land on the same partition. Ordering preserved.
When you do not provide a key, the broker round-robins. Good for throughput. Bad for ordering. Know which one you need.
Consumer Groups Enable Independent Progress
def get_offset(self, group: str, topic: str, partition: int) -> int:
with self._lock:
return self._offsets[group][topic][partition]
This is the idea that changed how I think about event-driven systems.
In a traditional queue, there is one cursor. One consumer group owns it. If you want a second application to process the same events, you have to publish to two separate queues. Or copy the data.
Kafka flips this. The offset belongs to the consumer group, not the broker. Every group maintains its own pointer. Group A can be at offset 1000. Group B can be at offset 500. The broker does not care. It keeps the log until retention expires.
This is why Kafka works so well for fan-out. Your analytics team can replay from offset 0 without blocking your real-time alerting pipeline.
Using brokelite
broker = Broker()
# Producer side
broker.produce("orders", b'{"order_id": 1, "amount": 99.99}', key=b"user_123")
broker.produce("orders", b'{"order_id": 2, "amount": 14.50}', key=b"user_456")
broker.produce("orders", b'{"order_id": 3, "amount": 7.25}', key=b"user_123")
# Consumer side: group A reads partition 0
records = broker.consume("orders", group="analytics", partition=0, max_count=10)
for offset, msg in records:
print(f"offset={offset}: {msg.decode()}")
broker.commit("orders", "analytics", partition=0, offset=offset + 1)
# Consumer side: group B reads the same partition independently
records = broker.consume("orders", group="alerting", partition=0, max_count=10)
for offset, msg in records:
print(f"alerting sees offset={offset}: {msg.decode()}")
Two consumer groups, same data, completely independent offsets. Zero copying. This is the elegance of the commit log model.
What brokelite Leaves Out
This is a toy. Real Kafka has things this does not.
Replication. Each partition in real Kafka has a leader and N replicas. Writes go to the leader and replicate before being acknowledged (depending on acks setting). brokelite has no replication and no durability beyond memory.
Consumer group rebalancing. In real Kafka, when a consumer in a group dies, the partitions it owned get reassigned to surviving consumers. That is the source of most Kafka operational pain. brokelite has no concept of group membership.
Log compaction and retention. Real Kafka can either delete old segments after N days or compact them by key. This keeps disk usage bounded. brokelite grows forever.
Network protocol. Real Kafka has a binary TCP protocol. Producers and consumers connect over the network. brokelite is in-process only.
Those are real engineering problems. But once you understand the core, those features are additive. They do not change the fundamental model: append-only log, per-group offsets, key-based partitioning.
The Lesson I Keep Coming Back To
Every time an alert fires for consumer lag, I used to think "the broker is behind." That is wrong.
Consumer lag is log_end_offset - committed_offset for a consumer group. The broker is not behind. The consumer is behind. The broker is fine, it is just sitting there with messages no one has read yet.
This distinction matters when you are debugging. Slow consumer group? Check your consumer throughput, not your broker. Skewed lag across partitions? Check your key distribution.
Build the toy version. Read the log. The model becomes obvious.
What to Build Next
If you want to go further with brokelite:
- Add a
seek_to_beginningmethod so a group can replay from offset 0 - Add a
seek_to_endmethod so a new group skips history and reads only new messages - Add a simple retention policy that purges segments older than N seconds
- Simulate a consumer dying mid-batch and see what happens to uncommitted offsets
The more you push on the toy, the better your intuition gets for the real system.
Follow along for more deep dives into systems that data engineers use every day but rarely look inside.
What pattern do you rely on most in your streaming pipelines: key-based ordering or maximum throughput with no keys?
Top comments (0)