When you use Kafka or RabbitMQ, dozens of concepts fly past — partitions, consumer groups, acknowledgments, dead letter queues, backpressure. I wanted to understand each one deeply, so I built BrokerLite: a complete in-memory message broker in Python, using nothing but the standard library.
22 modules. 315 tests. Zero pip install.
What It Actually Does
BrokerLite implements the two classic messaging patterns:
Pub/Sub: topics are split into partitions. Producers write messages; consumer groups read them in parallel. Partitions are assigned to group members using a range or round-robin strategy, and the assignments rebalance automatically when consumers join or leave.
Point-to-Point: FIFO and priority queues where each message is delivered to exactly one consumer. Visibility timeouts hide messages while they're being processed.
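A priority queue that still behaves FIFO among equal priorities takes only a few lines with heapq. The sketch below is hypothetical: SimplePriorityQueue is not BrokerLite's class, and it assumes that higher numbers mean higher priority.

```python
import heapq
import itertools


class SimplePriorityQueue:
    """Hypothetical priority queue: highest priority first, FIFO among equals."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # arrival counter used as a tie-breaker

    def enqueue(self, msg, priority=0):
        # heapq is a min-heap, so store -priority to pop the highest first.
        # The sequence number keeps equal priorities in arrival order and
        # means two messages are never compared with each other directly.
        heapq.heappush(self._heap, (-priority, next(self._seq), msg))

    def dequeue(self):
        if not self._heap:
            return None  # empty queue: nothing to deliver
        return heapq.heappop(self._heap)[2]
```

The sequence counter matters for more than ordering. Message objects don't have to be orderable, because the heap never needs to compare them.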
Publishing a keyed message to a topic:

```python
from brokerlite.broker import Broker
from brokerlite.message import Message

broker = Broker()
broker.start()
broker.create_topic("orders")

# Publish
metadata = broker.publish(
    Message(topic="orders", value=b'{"item": "widget"}', key="user-42")
)
print(f"Wrote to partition {metadata.partition} at offset {metadata.offset}")
```
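How a key like "user-42" maps to a partition isn't shown above. One common scheme is to hash the key modulo the partition count; Kafka's default partitioner does this with a murmur2 hash. Every message with the same key then lands in the same partition and keeps its relative order. This sketch assumes zlib.crc32 as the hash and uses a hypothetical KeyPartitioner class, so BrokerLite's actual partitioner may differ.

```python
import itertools
import zlib


class KeyPartitioner:
    """Hypothetical partitioner: same key -> same partition; keyless -> round-robin."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self._rr = itertools.count()  # rotating counter for keyless messages

    def partition_for(self, key):
        if key is None:
            # No key: spread messages evenly across partitions.
            return next(self._rr) % self.num_partitions
        # zlib.crc32 gives the same result in every process, unlike the
        # built-in hash(), which is randomized per process for str/bytes.
        return zlib.crc32(key.encode("utf-8")) % self.num_partitions
```

A stable hash is the important design choice here. If a restarted broker hashed keys differently, per-key ordering would silently break.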
The Hard Parts
Partition Assignment
When three consumers subscribe to a topic with six partitions, who gets what? With range assignment, consumer 0 gets partitions [0,1], consumer 1 gets [2,3], consumer 2 gets [4,5]. When consumer 2 dies, the group rebalances and redistributes its partitions.
The tricky part: rebalancing must revoke partitions from existing consumers before reassigning them. If you skip revocation, two consumers can briefly own the same partition and process the same messages twice.
```python
def rebalance(self):
    # Revoke all current assignments
    for consumer in self._members.values():
        consumer.revoke_partitions()

    # Collect all partitions across subscribed topics
    all_partitions = []
    for partitions in self._subscribed_topics.values():
        all_partitions.extend(partitions)

    # Assign based on strategy
    members = list(self._members.values())
    if self.strategy == AssignmentStrategy.RANGE:
        self._assign_range(members, all_partitions)
    else:
        self._assign_round_robin(members, all_partitions)
```
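The two strategy helpers aren't shown above. Here is a minimal standalone sketch of each, working over the flat partition list that rebalance builds. The function names are hypothetical. Kafka's own range assignor works per topic rather than over a combined list.

```python
def assign_range(members, partitions):
    """Hypothetical range strategy: each member gets one contiguous slice."""
    assignment = {m: [] for m in members}
    if not members:
        return assignment
    per_member, extra = divmod(len(partitions), len(members))
    start = 0
    for i, member in enumerate(members):
        # The first `extra` members each absorb one leftover partition.
        count = per_member + (1 if i < extra else 0)
        assignment[member] = partitions[start:start + count]
        start += count
    return assignment


def assign_round_robin(members, partitions):
    """Hypothetical round-robin strategy: deal partitions out like cards."""
    assignment = {m: [] for m in members}
    if not members:
        return assignment
    for i, partition in enumerate(partitions):
        assignment[members[i % len(members)]].append(partition)
    return assignment
```

With six partitions and three consumers, assign_range yields [0,1], [2,3], and [4,5]. With two consumers it yields [0,1,2] and [3,4,5]. Partition 2 moves from consumer 1 to consumer 0 even though consumer 1 is still alive, which is why rebalance revokes everything before reassigning.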
The Write-Ahead Log
Messages need durability. BrokerLite's WAL stores entries in fixed-size segments. When a segment fills up, a new one starts. Old segments can be deleted (retention) or compacted (keep only the latest value per key).
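Segment rolling, retention, and compaction can be modeled as a toy in memory. This sketch makes three assumptions:
- Segment "size" is counted in entries rather than bytes.
- Nothing touches disk, whereas a real WAL appends to files and fsyncs.
- SegmentedLog is a hypothetical name.

As in Kafka, compaction here leaves the active segment alone and preserves original offsets, so gaps appear.

```python
class SegmentedLog:
    """Hypothetical in-memory log split into segments of `segment_size` entries."""

    def __init__(self, segment_size=3):
        self.segment_size = segment_size
        self.segments = [[]]  # each entry is (offset, key, value); last = active
        self.next_offset = 0

    def append(self, key, value):
        if len(self.segments[-1]) >= self.segment_size:
            self.segments.append([])  # active segment is full: roll a new one
        offset = self.next_offset
        self.segments[-1].append((offset, key, value))
        self.next_offset += 1
        return offset

    def apply_retention(self, max_segments):
        # Delete the oldest whole segments; never delete the active one.
        while len(self.segments) > max(max_segments, 1):
            self.segments.pop(0)

    def compact(self):
        # Find the newest offset for every key across the whole log.
        latest = {}
        for segment in self.segments:
            for offset, key, _ in segment:
                latest[key] = offset
        # Rewrite closed segments only, dropping superseded entries.
        for i in range(len(self.segments) - 1):
            self.segments[i] = [e for e in self.segments[i] if latest[e[1]] == e[0]]
```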
The WAL also persists consumer offsets in SQLite. When a consumer restarts, it resumes from its last committed position rather than re-reading everything.
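Persisting offsets needs little more than one table and an upsert. This is a hypothetical sketch using the standard sqlite3 module; the table layout and the OffsetStore name are assumptions. It stores the next offset to read, which is the convention Kafka uses for committed offsets.

```python
import sqlite3


class OffsetStore:
    """Hypothetical store of committed offsets keyed by (group, topic, partition)."""

    def __init__(self, path=":memory:"):
        # Use a real file path so offsets survive a process restart.
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            """CREATE TABLE IF NOT EXISTS offsets (
                   grp TEXT, topic TEXT, part INTEGER, next_offset INTEGER,
                   PRIMARY KEY (grp, topic, part))"""
        )

    def commit(self, group, topic, partition, next_offset):
        # Upsert; `with self.conn` commits on success and rolls back on error.
        with self.conn:
            self.conn.execute(
                "INSERT OR REPLACE INTO offsets VALUES (?, ?, ?, ?)",
                (group, topic, partition, next_offset),
            )

    def fetch(self, group, topic, partition, default=0):
        # Where a restarted consumer should resume; `default` if never committed.
        row = self.conn.execute(
            "SELECT next_offset FROM offsets WHERE grp = ? AND topic = ? AND part = ?",
            (group, topic, partition),
        ).fetchone()
        return row[0] if row is not None else default
```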
Acknowledgments and Dead Letters
Every dequeued message enters an "in-flight" state. The consumer must acknowledge it within a timeout, or it goes back to the queue for redelivery. After N failed attempts, the message is routed to a dead letter queue:
```python
ack_mgr = AckManager(ack_timeout=5.0, max_attempts=3)
ack_mgr.track(msg, consumer_id="worker-1")

# If processing fails:
result = ack_mgr.negative_acknowledge(msg.id)
if result is None:  # exceeded max attempts
    dlq.add(msg, reason="Processing timeout")
```
The DLQ stores the original message, the failure reason, and the attempt count. You can replay messages back to the source queue after fixing the underlying issue.
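To make the in-flight lifecycle concrete, here's a simplified, hypothetical tracker. It is not BrokerLite's AckManager, and its method names are invented for illustration. It assumes queued messages are (id, payload) pairs in a deque and dead letters are plain dicts. The clock is injectable, so timeouts can be tested without sleeping.

```python
import time
from collections import deque


class InFlightTracker:
    """Hypothetical tracker: requeue on timeout/nack, dead-letter after max_attempts."""

    def __init__(self, queue, dlq, ack_timeout=5.0, max_attempts=3, clock=time.monotonic):
        self.queue = queue            # deque of (msg_id, payload) awaiting delivery
        self.dlq = dlq                # list of dead-letter records
        self.ack_timeout = ack_timeout
        self.max_attempts = max_attempts
        self.clock = clock
        self.attempts = {}            # msg_id -> deliveries so far
        self.in_flight = {}           # msg_id -> (payload, ack deadline)

    def deliver(self):
        if not self.queue:
            return None
        msg_id, payload = self.queue.popleft()
        self.attempts[msg_id] = self.attempts.get(msg_id, 0) + 1
        self.in_flight[msg_id] = (payload, self.clock() + self.ack_timeout)
        return msg_id, payload

    def ack(self, msg_id):
        # Success: forget the message entirely.
        self.in_flight.pop(msg_id, None)
        self.attempts.pop(msg_id, None)

    def nack(self, msg_id, reason="negative ack"):
        payload, _ = self.in_flight.pop(msg_id)
        if self.attempts[msg_id] >= self.max_attempts:
            # Out of retries: record the message, reason, and attempt count.
            self.dlq.append({"id": msg_id, "payload": payload,
                             "reason": reason, "attempts": self.attempts.pop(msg_id)})
        else:
            self.queue.append((msg_id, payload))  # back in line for redelivery

    def expire(self):
        # Every message past its deadline counts as a failed attempt.
        now = self.clock()
        expired = [m for m, (_, deadline) in self.in_flight.items() if deadline <= now]
        for msg_id in expired:
            self.nack(msg_id, reason="ack timeout")

    def replay_dlq(self):
        # After fixing the bug, send dead letters back with a fresh attempt count.
        while self.dlq:
            record = self.dlq.pop(0)
            self.queue.append((record["id"], record["payload"]))
```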
Binary Wire Protocol
BrokerLite includes a TCP server and client using a custom binary protocol. Each frame is length-prefixed (4 bytes big-endian), with a correlation ID for matching requests to responses:
```
[4 bytes: length][payload]

payload = {
    "api_key": 1,            // PRODUCE, FETCH, CREATE_TOPIC, etc.
    "correlation_id": "...",
    "data": {...}
}
```
The server runs one thread per connection. Not production-grade, but enough to demonstrate the protocol design that real brokers use.
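Here is a sketch of that framing with the standard library's struct and socket modules. The payload's wire encoding isn't specified beyond its fields, so this assumes UTF-8 JSON. encode_frame, recv_exact, and read_frame are hypothetical helpers, and the api_key value is a placeholder.

```python
import json
import socket
import struct

HEADER = struct.Struct(">I")  # 4-byte unsigned big-endian length prefix


def encode_frame(api_key, correlation_id, data):
    # Assumption: the payload is serialized as UTF-8 JSON.
    payload = json.dumps(
        {"api_key": api_key, "correlation_id": correlation_id, "data": data}
    ).encode("utf-8")
    return HEADER.pack(len(payload)) + payload


def recv_exact(sock, n):
    """Read exactly n bytes; TCP may split one frame across several recv calls."""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("peer closed mid-frame")
        buf.extend(chunk)
    return bytes(buf)


def read_frame(sock):
    # Read the length prefix first, then exactly that many payload bytes.
    (length,) = HEADER.unpack(recv_exact(sock, HEADER.size))
    return json.loads(recv_exact(sock, length))
```

TCP is a byte stream, not a message stream. The length prefix is what lets the reader find frame boundaries, and the recv_exact loop is what makes that reliable.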
Backpressure
Without flow control, a fast producer overwhelms a slow consumer. BrokerLite's backpressure manager combines two mechanisms:
- Token-bucket rate limiting — producers acquire tokens before publishing. The bucket refills at a configured rate.
- Queue-depth monitoring — when a topic's total message count exceeds a threshold, new publishes are rejected.
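Both mechanisms fit in a few lines. This is a minimal sketch using a hypothetical TokenBucket class and admit_publish helper, since BrokerLite's backpressure manager API isn't shown here. The clock is injectable for testing.

```python
import time


class TokenBucket:
    """Hypothetical bucket: bursts up to `capacity`, refilled at `rate` tokens/sec."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)  # start full
        self.clock = clock
        self.last = clock()

    def try_acquire(self, n=1):
        now = self.clock()
        # Refill for the time elapsed since the last call, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False


def admit_publish(bucket, topic_depth, max_depth):
    """Reject if the topic already exceeds max_depth or the producer is over its rate."""
    if topic_depth > max_depth:
        return False
    return bucket.try_acquire()
```

The depth check runs first so that a publish rejected for queue depth doesn't also burn one of the producer's tokens.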
What I Learned
The or operator and __len__: Python's or tests the truthiness of its left operand, and if your class defines __len__ without __bool__, an empty instance is falsy. I had q = self._queues.get(name) or self._priority_queues.get(name), so whenever a FIFO queue existed but was empty, the expression fell through to the priority-queue lookup and silently skipped it. Fix: check if x is not None explicitly.
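Here is a minimal reproduction, with a hypothetical Queue class standing in for BrokerLite's:

```python
class Queue:
    """Hypothetical container: defining __len__ makes empty instances falsy."""

    def __init__(self):
        self.items = []

    def __len__(self):
        return len(self.items)


queues = {"jobs": Queue()}  # the queue exists but is currently empty
priority_queues = {}

# Buggy: `or` checks truthiness, so the empty-but-existing queue is skipped
# and the expression evaluates to the second lookup's result (None here).
buggy = queues.get("jobs") or priority_queues.get("jobs")

# Fixed: fall back only when the first lookup found nothing at all.
q = queues.get("jobs")
if q is None:
    q = priority_queues.get("jobs")
```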
Consumer groups are coordination problems: The assignment algorithm is simple. The hard part is detecting dead consumers, triggering rebalances at the right time, and ensuring no partition goes unassigned during transitions.
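Detecting dead consumers is usually done with heartbeats. Each consumer pings periodically, and any member that stays silent longer than a session timeout is evicted, which triggers a rebalance. This is the approach Kafka's group coordinator takes. The sketch below is hypothetical (GroupMembership is not a BrokerLite class) and uses an injectable clock so it can be tested without sleeping.

```python
import time


class GroupMembership:
    """Hypothetical liveness tracker based on heartbeats and a session timeout."""

    def __init__(self, session_timeout=10.0, clock=time.monotonic):
        self.session_timeout = session_timeout
        self.clock = clock
        self.last_seen = {}  # consumer_id -> time of last heartbeat

    def heartbeat(self, consumer_id):
        """Record liveness; returns True if this consumer just joined."""
        joined = consumer_id not in self.last_seen
        self.last_seen[consumer_id] = self.clock()
        return joined

    def evict_dead(self):
        """Remove members silent past the timeout and return their ids."""
        now = self.clock()
        dead = [c for c, t in self.last_seen.items() if now - t > self.session_timeout]
        for consumer_id in dead:
            del self.last_seen[consumer_id]
        return dead
```

A join (heartbeat returning True) or a non-empty eviction list is the signal to call rebalance. Choosing the timeout is the real tradeoff: too short and slow consumers get evicted, too long and a crashed consumer's partitions sit idle.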
Schema evolution matters: BrokerLite includes a schema registry that checks backward compatibility, meaning new schema versions must still accept data written with the old schema. Without this check, a schema change can leave consumers unable to decode messages already sitting in the log.
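The compatibility rule can be made concrete with a simplified checker. This is a hypothetical sketch, not BrokerLite's registry. It assumes schemas are plain dicts mapping each field name to a type and an optional default. The rule mirrors Avro's schema resolution: a field the new reader expects but old data lacks must have a default, and fields the reader drops are ignored. It is stricter than Avro in one respect, since it doesn't allow type promotions such as int to long.

```python
def is_backward_compatible(old_schema, new_schema):
    """Hypothetical check: can a reader using new_schema decode old_schema data?

    Schemas are dicts of field name -> {"type": str, optional "default": value}.
    """
    for name, spec in new_schema.items():
        if name in old_schema:
            # A field both versions share must keep the same type.
            if old_schema[name]["type"] != spec["type"]:
                return False
        elif "default" not in spec:
            # Old records lack this field, so the reader needs a default.
            return False
    # Fields removed in new_schema are fine: the reader just ignores them.
    return True
```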
Numbers
- 22 source modules under brokerlite/
- 315 tests across 16 test files
- 5 runnable examples
- ~5,000 lines of Python
- Zero external dependencies
The entire project runs on Python 3.11+ with nothing but the standard library. No asyncio, no third-party packages.
Try It
```bash
git clone https://github.com/hajirufai/brokerlite.git
cd brokerlite
python -m pytest tests/ -v   # 315 tests (the test runner needs pytest installed)
PYTHONPATH=. python examples/basic_pubsub.py
```
GitHub: hajirufai/brokerlite
Live docs: hajirufai.github.io/brokerlite
This is project #15 in my "Building From Scratch" series where I rebuild infrastructure tools using only Python's standard library. Previous entries include a distributed tracing system, a search engine, a compiler, and a key-value store.