If you've ever had to pick a message queue for a production system, you know the pain. Every option forces a trade-off:
- Kafka gives you incredible throughput and durable log-based storage, but its routing model is primitive (topic-partition only), operational complexity is high, and tail latency can spike under load with no built-in back-pressure.
- RabbitMQ offers flexible routing through its exchange model (direct, topic, fanout, headers), but it struggles with throughput at scale, lacks a true log-based storage model, and horizontal scaling is painful.
- ZeroMQ delivers very low latency through zero-copy transfers and brokerless operation, but it has no persistence, no broker-side routing, and no consumer group coordination.
Every team ends up compromising. Some run Kafka for event streaming and RabbitMQ for task queues side by side. Others bolt on custom routing layers on top of Kafka. None of these are satisfying.
TitanMQ is an attempt to stop compromising. It's a from-scratch message queue written in Java 21 that takes the best architectural ideas from all three systems and unifies them into a single, coherent design.
This article walks through the key design decisions, the actual implementation, and the trade-offs we made along the way.
Architecture at a Glance
┌─────────────────────────────────────────────────────┐
│                  Client SDK Layer                   │
│         (Producer / Consumer / Admin APIs)          │
├─────────────────────────────────────────────────────┤
│                   Protocol Layer                    │
│       (Custom binary protocol over TCP/Netty)       │
├─────────────────────────────────────────────────────┤
│                   Routing Engine                    │
│      (Direct / Topic / Fanout / Content-Based)      │
├─────────────────────────────────────────────────────┤
│                 Core Broker Engine                  │
│  ┌─────────────┐ ┌──────────────┐ ┌──────────────┐  │
│  │ Topic Mgr   │ │ Consumer     │ │ Back-        │  │
│  │ (Partition  │ │ Group Mgr    │ │ Pressure     │  │
│  │  routing)   │ │ (Rebalance)  │ │ Controller   │  │
│  └─────────────┘ └──────────────┘ └──────────────┘  │
├─────────────────────────────────────────────────────┤
│                    Storage Layer                    │
│     (Append-only commit log, memory-mapped I/O)     │
├─────────────────────────────────────────────────────┤
│                Cluster Coordination                 │
│        (Embedded Raft, no ZooKeeper needed)         │
└─────────────────────────────────────────────────────┘
The system is organized into 9 Maven modules, each with a clear responsibility boundary. Let's dig into the interesting parts.
1. The Storage Engine: Append-Only Commit Log
The foundation of TitanMQ's performance is its storage layer, directly inspired by Kafka's log-structured design.
Why Append-Only?
Sequential disk writes are dramatically faster than random writes. On modern NVMe SSDs, sequential write throughput can exceed 3 GB/s, while small random writes at low queue depth may only achieve 50-100 MB/s. An append-only log exploits this by never modifying data in place: every message is written to the end of the current file.
Segment-Based Architecture
Each topic-partition gets its own CommitLog, which is composed of multiple LogSegment files:
topic-orders/partition-0/
├── 00000000000000000000.log (offsets 0–999)
├── 00000000000000001000.log (offsets 1000–1999)
└── 00000000000000002000.log (active segment)
When the active segment exceeds a configurable size threshold (default: 1 GB), a new segment is rolled:
public synchronized long append(TitanMessage message) throws IOException {
    ByteBuffer data = message.serialize();
    // Roll to a fresh segment before the active one would exceed the size limit
    if (activeSegment.size() + data.remaining() > maxSegmentSize) {
        rollNewSegment();
    }
    long offset = nextOffset.getAndIncrement();
    activeSegment.append(offset, data);
    return offset;
}
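To make the segment naming concrete, here is a minimal sketch of how a read could find the segment that holds a given offset. The class and method names (`SegmentLookup`, `segmentFor`) are hypothetical, and keeping segments in a sorted map keyed by base offset is an assumption, not necessarily how TitanMQ tracks them.

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration; not TitanMQ's actual API.
final class SegmentLookup {
    // Base offset of each segment mapped to its file name, kept sorted.
    private final TreeMap<Long, String> segmentsByBaseOffset = new TreeMap<>();

    // Segment files are named after their base offset, zero-padded to 20 digits.
    static String fileName(long baseOffset) {
        return String.format(Locale.ROOT, "%020d.log", baseOffset);
    }

    void addSegment(long baseOffset) {
        segmentsByBaseOffset.put(baseOffset, fileName(baseOffset));
    }

    // Returns the segment with the greatest base offset <= targetOffset,
    // or null when the offset precedes every known segment.
    String segmentFor(long targetOffset) {
        Map.Entry<Long, String> entry = segmentsByBaseOffset.floorEntry(targetOffset);
        return entry == null ? null : entry.getValue();
    }
}
```

With the three segments from the listing above, a lookup for offset 1500 would resolve to the segment whose base offset is 1000.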
Each entry in a segment has a simple binary format:
[8 bytes: offset] [4 bytes: message size] [N bytes: serialized message]
The 12-byte header per entry is a deliberate trade-off: it costs a small amount of space but enables direct offset-based seeking without a separate index file for small segments.
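As a rough illustration of why the header makes seeking possible, the sketch below writes entries in the `[offset][size][payload]` layout and then scans forward, using the size field to skip payloads it doesn't need. `LogEntryCodec` and its methods are hypothetical names; the real LogSegment reads from a file rather than an in-memory buffer.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the [8-byte offset][4-byte size][payload] entry layout.
final class LogEntryCodec {
    static final int HEADER_BYTES = Long.BYTES + Integer.BYTES; // 8 + 4 = 12

    record Entry(long offset, byte[] payload) {}

    static ByteBuffer encode(long offset, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + payload.length);
        buf.putLong(offset);
        buf.putInt(payload.length);
        buf.put(payload);
        buf.flip();
        return buf;
    }

    // Scans forward from the buffer's current position until targetOffset is found.
    // Returns null if the offset is not present in the remaining bytes.
    static Entry scanTo(ByteBuffer buf, long targetOffset) {
        while (buf.remaining() >= HEADER_BYTES) {
            long offset = buf.getLong();
            int size = buf.getInt();
            if (size < 0 || size > buf.remaining()) {
                return null; // truncated or corrupt tail
            }
            if (offset == targetOffset) {
                byte[] payload = new byte[size];
                buf.get(payload);
                return new Entry(offset, payload);
            }
            buf.position(buf.position() + size); // skip the payload without copying it
        }
        return null;
    }
}
```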
Sparse In-Memory Index
For fast reads, each LogSegment maintains a sparse in-memory index mapping offsets to file positions. Lookups use binary search for O(log n) performance:
private long findPosition(long targetOffset) {
    long pos = 0;
    int lo = 0, hi = index.size() - 1;
    // Floor search: keep the position of the last index entry <= targetOffset
    while (lo <= hi) {
        int mid = (lo + hi) >>> 1;
        if (index.get(mid).offset() <= targetOffset) {
            pos = index.get(mid).position();
            lo = mid + 1;
        } else {
            hi = mid - 1;
        }
    }
    return pos;
}
This is similar to Kafka's .index files, but kept in-memory for lower read latency. The trade-off is higher memory usage per partition, which we accept because modern servers have plenty of RAM and the index entries are tiny (16 bytes each: 8 for offset + 8 for position).
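To put a number on the memory cost: assuming one index entry per 4 KiB of log (an assumed interval, the same as Kafka's default `index.interval.bytes`), a 1 GB segment needs about 262,144 entries, or roughly 4 MiB at 16 bytes each, ignoring JVM object overhead. The sketch below shows one way such an index could be populated on append. `SparseIndex` and `onAppend` are hypothetical names, and the interval policy is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a sparse offset index; names and interval are assumptions.
final class SparseIndex {
    record IndexEntry(long offset, long position) {}

    private final List<IndexEntry> entries = new ArrayList<>();
    private final int intervalBytes;
    private long bytesSinceLastEntry;

    SparseIndex(int intervalBytes) {
        this.intervalBytes = intervalBytes;
        this.bytesSinceLastEntry = intervalBytes; // so the first append is always indexed
    }

    // Called after each append; records an index entry roughly every intervalBytes.
    void onAppend(long offset, long position, int entryBytes) {
        if (bytesSinceLastEntry >= intervalBytes) {
            entries.add(new IndexEntry(offset, position));
            bytesSinceLastEntry = 0;
        }
        bytesSinceLastEntry += entryBytes;
    }

    int size() {
        return entries.size();
    }

    // Floor lookup: position of the last indexed entry with offset <= targetOffset.
    long floorPosition(long targetOffset) {
        long pos = 0;
        int lo = 0, hi = entries.size() - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (entries.get(mid).offset() <= targetOffset) {
                pos = entries.get(mid).position();
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return pos;
    }
}
```

A reader would seek to the returned position and then scan forward entry by entry until it reaches the exact offset it wants.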
2. The Routing Engine: RabbitMQ's Flexibility on a Kafka Foundation
This is where TitanMQ diverges most significantly from Kafka. Kafka's routing model is essentially "publish to a topic, consume from a topic." If you need to route a message to different consumers based on its content, you either create separate topics or build application-level routing.
RabbitMQ solved this elegantly with its exchange model. TitanMQ adopts and extends it.
Four Exchange Types
Direct Exchange — exact routing key match:
DirectExchange exchange = new DirectExchange("orders");
exchange.bind("fulfillment-queue", "order.created");
exchange.bind("analytics-queue", "order.shipped");
// Routes to "fulfillment-queue" only
exchange.route(message, "order.created");
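Under the hood, a direct exchange can be little more than a map from routing key to bound destinations. The following is a minimal sketch under that assumption; `SimpleDirectExchange` is a hypothetical stand-in, and it takes only the routing key because the message itself plays no part in the decision.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical stand-in for a direct exchange: exact-match lookup by routing key.
final class SimpleDirectExchange {
    private final Map<String, Set<String>> bindings = new ConcurrentHashMap<>();

    void bind(String destination, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new CopyOnWriteArraySet<>())
                .add(destination);
    }

    // Returns every destination bound to exactly this routing key (possibly none).
    List<String> route(String routingKey) {
        return List.copyOf(bindings.getOrDefault(routingKey, Set.of()));
    }
}
```

Routing is a single hash lookup, so the cost does not grow with the number of bindings on other keys.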
Topic Exchange — wildcard pattern matching with * (one word) and # (zero or more words):
TopicExchange exchange = new TopicExchange("events");
exchange.bind("us-handler", "order.us.*");
exchange.bind("all-errors", "#.error");
// Matches "us-handler"
exchange.route(message, "order.us.created");
// Matches "all-errors"
exchange.route(message, "payment.processing.error");
The pattern matching is implemented with a recursive algorithm that handles the # wildcard's zero-or-more semantics:
private static boolean matchParts(String[] routing, int ri, String[] pattern, int pi) {
    // Both exhausted: full match
    if (ri == routing.length && pi == pattern.length) return true;
    // Pattern exhausted but routing words remain
    if (pi == pattern.length) return false;
    if (pattern[pi].equals("#")) {
        // A trailing # matches everything that is left
        if (pi == pattern.length - 1) return true;
        // Otherwise try letting # consume zero, one, two, ... words
        for (int i = ri; i <= routing.length; i++) {
            if (matchParts(routing, i, pattern, pi + 1)) return true;
        }
        return false;
    }
    if (ri == routing.length) return false;
    if (pattern[pi].equals("*") || pattern[pi].equals(routing[ri])) {
        return matchParts(routing, ri + 1, pattern, pi + 1);
    }
    return false;
}
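To see the matcher in action, here is a self-contained version with a small entry point that splits keys on dots. The `TopicMatcher` class and its `matches` method are hypothetical wrappers added for illustration; the recursive part mirrors the function above.

```java
// Hypothetical wrapper around the recursive matcher, for experimentation.
final class TopicMatcher {
    // Splits both strings into dot-separated words and matches them.
    static boolean matches(String routingKey, String pattern) {
        return matchParts(routingKey.split("\\."), 0, pattern.split("\\."), 0);
    }

    private static boolean matchParts(String[] routing, int ri, String[] pattern, int pi) {
        if (ri == routing.length && pi == pattern.length) return true;
        if (pi == pattern.length) return false;
        if (pattern[pi].equals("#")) {
            if (pi == pattern.length - 1) return true;
            // Let # consume zero or more words
            for (int i = ri; i <= routing.length; i++) {
                if (matchParts(routing, i, pattern, pi + 1)) return true;
            }
            return false;
        }
        if (ri == routing.length) return false;
        if (pattern[pi].equals("*") || pattern[pi].equals(routing[ri])) {
            return matchParts(routing, ri + 1, pattern, pi + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("order.us.created", "order.us.*"));
        System.out.println(matches("payment.processing.error", "#.error"));
        System.out.println(matches("order.us.created.v2", "order.us.*"));
    }
}
```

Note that `*` consumes exactly one word, so `order.us.*` does not match a four-word key, while `#.error` also matches the bare key `error` because `#` may consume zero words.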
Fanout Exchange — broadcast to all bound destinations. Simple but essential for event-driven architectures where multiple services need to react to the same event.
Content-Based Exchange — this goes beyond RabbitMQ. Instead of just matching routing keys, it evaluates predicates against message headers:
ContentBasedExchange exchange = new ContentBasedExchange("smart-router");
// Route by arbitrary header predicates
exchange.bind("us-priority-queue", headers ->
"us".equals(headers.get("region")) &&
Integer.parseInt(headers.getOrDefault("priority", "0")) > 5
);
// Or use the simple key=value syntax
exchange.bind("eu-queue", "region=eu,type=order");
This is implemented using Java's Predicate<Map<String, String>> interface, which means routing rules can be as complex as needed without changing the exchange implementation:
public List<String> route(TitanMessage message, String routingKey) {
    List<String> result = new ArrayList<>();
    // The routing key is ignored here; only the headers decide the destinations
    for (RoutingRule rule : rules) {
        if (rule.matcher.test(message.headers())) {
            result.add(rule.destination);
        }
    }
    return result;
}
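The `key=value` shorthand can be reduced to the same predicate type. A minimal sketch of such a parser follows, assuming comma-separated pairs that must all match; `HeaderRuleParser` is a hypothetical name and the exact syntax TitanMQ accepts may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical parser turning "k1=v1,k2=v2" into an all-must-match header predicate.
final class HeaderRuleParser {
    static Predicate<Map<String, String>> parse(String spec) {
        Map<String, String> required = new LinkedHashMap<>();
        for (String pair : spec.split(",")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + pair);
            }
            required.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        // Every required header must be present with exactly the expected value.
        return headers -> required.entrySet().stream()
                .allMatch(e -> e.getValue().equals(headers.get(e.getKey())));
    }
}
```

One thing to watch with hand-written lambdas like the priority example: `Integer.parseInt` throws `NumberFormatException` on a non-numeric header, so a rule that parses values should probably guard against malformed input rather than let one bad message fail routing.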
Why This Matters
In Kafka, if you want to route order events differently based on region, you'd typically create separate topics (orders-us, orders-eu) or have every consumer filter messages client-side. Both approaches have problems: topic proliferation makes operations harder, and client-side filtering wastes network bandwidth.
TitanMQ's routing engine sits between the producer and the commit log, so messages are routed at the broker level before being written. This means consumers only receive messages they care about, and you don't need hundreds of topics for what is logically one event stream.
3. Adaptive Back-Pressure: The Feature Nobody Has Right
Back-pressure is one of the most underappreciated aspects of message queue design.
- Kafka has very little built-in back-pressure. Apart from statically configured client quotas, producers get no signal to slow down: if consumers fall behind, the broker keeps accepting messages until retention limits kick in or the disk fills up.
- RabbitMQ has basic flow control via TCP back-pressure and memory alarms, but it's coarse-grained — it either accepts messages or blocks entirely.
- ZeroMQ has the best approach with its high-water mark system, but it's a library, not a broker, so the limit applies per socket and there is no broker-wide view of load.
TitanMQ implements a dual-watermark system with gradual throttling:
public class BackPressureController {
    private final int highWaterMark;
    private final int lowWaterMark;
    private final AtomicLong inFlightCount = new AtomicLong(0);
    private volatile boolean throttled = false;

    public boolean tryAcquire() {
        long current = inFlightCount.incrementAndGet();
        if (current > highWaterMark) {
            if (!throttled) {
                throttled = true;
            }
            // Undo the optimistic increment before rejecting
            inFlightCount.decrementAndGet();
            return false; // Signal producer to back off
        }
        return true;
    }

    public void release() {
        long current = inFlightCount.decrementAndGet();
        if (throttled && current <= lowWaterMark) {
            throttled = false; // Resume accepting
        }
    }
}
The key insight is the hysteresis gap between the high and low watermarks. Without it, the system would oscillate rapidly between throttled and unthrottled states when hovering near a single threshold. The gap creates a stable band:
- Above high watermark (default: 100,000): reject new messages
- Between watermarks: accept but signal producers to slow down
- Below low watermark (default: 50,000): full speed ahead
The throttleRatio() method provides a continuous signal between 0.0 and 1.0 that producer clients can use for gradual slowdown rather than binary on/off:
public double throttleRatio() {
    long current = inFlightCount.get();
    if (current <= lowWaterMark) return 0.0;
    if (current >= highWaterMark) return 1.0;
    // Linear interpolation between the two watermarks
    return (double) (current - lowWaterMark) / (highWaterMark - lowWaterMark);
}
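With the default watermarks, 75,000 in-flight messages gives (75,000 - 50,000) / (100,000 - 50,000) = 0.5. How a producer turns that number into a slowdown is up to the client. The sketch below shows one simple option, a linear delay per send. `ProducerPacer` and `maxDelayMicros` are hypothetical, and how the ratio reaches the client (for example in a produce response) is an assumption not covered here.

```java
// Hypothetical client-side pacing based on the broker's throttle ratio.
final class ProducerPacer {
    private final long maxDelayMicros;

    ProducerPacer(long maxDelayMicros) {
        this.maxDelayMicros = maxDelayMicros;
    }

    // Same linear interpolation as the broker-side throttleRatio(), as a pure function.
    static double throttleRatio(long inFlight, long lowWaterMark, long highWaterMark) {
        if (inFlight <= lowWaterMark) return 0.0;
        if (inFlight >= highWaterMark) return 1.0;
        return (double) (inFlight - lowWaterMark) / (highWaterMark - lowWaterMark);
    }

    // Maps a ratio in [0.0, 1.0] to a pause before the next send; out-of-range input is clamped.
    long delayMicros(double ratio) {
        double clamped = Math.max(0.0, Math.min(1.0, ratio));
        return Math.round(clamped * maxDelayMicros);
    }
}
```

A linear curve is only one choice; a latency-sensitive producer might prefer a curve that stays near zero until the ratio is high, which is exactly the flexibility a continuous signal allows.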
This is integrated directly into the broker's request handler. Every produce request must acquire a slot before writing:
private void handleProduce(ChannelHandlerContext ctx, Command cmd) throws Exception {
    if (!backPressureController.tryAcquire()) {
        sendError(ctx, cmd.correlationId(), "Back-pressure: broker is overloaded");
        return;
    }
    try {
        // ... write message to commit log
    } finally {
        // Always free the slot, even if the write fails
        backPressureController.release();
    }
}
4. Wire Protocol: Custom Binary over Netty
We chose a custom binary protocol over alternatives like gRPC or HTTP/2 for one reason: control over every byte on the wire.
Frame Format
[4 bytes: frame length] [1 byte: command type] [4 bytes: correlation ID] [N bytes: payload]
That's a 9-byte overhead per frame. HTTP/2's frame header is also 9 bytes, but it comes with much more complex state management (streams, flow-control windows, header compression), and gRPC adds a 5-byte message prefix plus protobuf encoding on top of that.
The Command type is a simple record:
public record Command(CommandType type, int correlationId, byte[] payload) {
    public ByteBuffer encode() {
        int payloadLen = payload != null ? payload.length : 0;
        // Frame length covers everything after the length field itself
        int frameLen = 1 + 4 + payloadLen;
        ByteBuffer buffer = ByteBuffer.allocate(4 + frameLen);
        buffer.putInt(frameLen);
        buffer.put(type.code());
        buffer.putInt(correlationId);
        if (payload != null) buffer.put(payload);
        buffer.flip();
        return buffer;
    }
}
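The decoding side has to cope with TCP delivering partial frames. The sketch below shows the idea with plain `ByteBuffer`: peek at the length, and only consume bytes once a whole frame has arrived. `FrameCodec` and `Frame` are hypothetical names, and the command type is reduced to a raw byte; in the broker itself this job is handled inside the Netty pipeline.

```java
import java.nio.ByteBuffer;

// Hypothetical codec for the [length][type][correlationId][payload] frame layout.
final class FrameCodec {
    record Frame(byte commandType, int correlationId, byte[] payload) {}

    static ByteBuffer encode(Frame frame) {
        int frameLen = 1 + 4 + frame.payload().length;
        ByteBuffer buf = ByteBuffer.allocate(4 + frameLen);
        buf.putInt(frameLen);
        buf.put(frame.commandType());
        buf.putInt(frame.correlationId());
        buf.put(frame.payload());
        buf.flip();
        return buf;
    }

    // Returns a frame if one is fully buffered; otherwise returns null and
    // leaves the buffer position untouched so the caller can wait for more bytes.
    static Frame tryDecode(ByteBuffer in) {
        if (in.remaining() < 4) return null;
        in.mark();
        int frameLen = in.getInt();
        if (frameLen < 5) {
            throw new IllegalArgumentException("Frame too short: " + frameLen);
        }
        if (in.remaining() < frameLen) {
            in.reset(); // incomplete frame, rewind past the length field
            return null;
        }
        byte type = in.get();
        int correlationId = in.getInt();
        byte[] payload = new byte[frameLen - 5];
        in.get(payload);
        return new Frame(type, correlationId, payload);
    }
}
```

A production decoder would also cap `frameLen` at some maximum so that a corrupt or hostile length field cannot trigger a huge allocation.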
Why Netty?
Netty gives us:
- Non-blocking I/O with an event loop model (handles thousands of connections with few threads)
- Built-in LengthFieldBasedFrameDecoder for handling TCP fragmentation
- Zero-copy ByteBuf for efficient memory management
- Boss/worker thread separation (1 thread accepts connections, N threads handle I/O)
The server pipeline is straightforward:
ch.pipeline()
.addLast("decoder", new TitanMessageDecoder())
.addLast("encoder", new TitanMessageEncoder())
.addLast("handler", new BrokerRequestHandler(...));
5. Message Serialization: Designed for Zero-Copy
The TitanMessage class is designed from the ground up for efficient serialization. Instead of using JSON, Protobuf, or Avro, messages are serialized directly to ByteBuffer:
public ByteBuffer serialize() {
    // Calculate exact size upfront: no resizing, no copying
    int totalSize = 4 + idBytes.length
            + 4 + topicBytes.length
            + 4 + 8 + 8 // partition + offset + timestamp
            + 4 + (key != null ? key.length : 0)
            + headersSize
            + 4 + payload.length;
    ByteBuffer buffer = ByteBuffer.allocate(4 + totalSize);
    // ... write fields sequentially
    buffer.flip();
    return buffer;
}
The design choices here:
- Length-prefixed strings instead of null-terminated: allows pre-calculating total buffer size in one pass
- Fixed-size numeric fields: no varint encoding (simpler, faster to parse, predictable size)
- Single allocation: the entire message is serialized into one contiguous buffer, which can be sent over the network with a single write() call
The trade-off is slightly larger wire size compared to varint-encoded formats like Protobuf. For a typical 1 KB message, the overhead is about 50-80 bytes (5-8%). We accept this for the simplicity and speed of encoding/decoding.
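The header map is the one variable-shaped part of the message, so it is a useful place to see the length-prefix idea at work. The sketch below assumes a layout of a 4-byte header count followed by length-prefixed UTF-8 keys and values; that layout, and the `HeaderCodec` name, are assumptions, since the article does not spell out how `headersSize` is composed.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical header encoding: [count] then [len][key bytes][len][value bytes] per header.
final class HeaderCodec {
    // Exact encoded size, computed before allocating anything.
    static int encodedSize(Map<String, String> headers) {
        int size = Integer.BYTES; // header count
        for (Map.Entry<String, String> e : headers.entrySet()) {
            size += Integer.BYTES + e.getKey().getBytes(StandardCharsets.UTF_8).length;
            size += Integer.BYTES + e.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        return size;
    }

    static ByteBuffer encode(Map<String, String> headers) {
        ByteBuffer buf = ByteBuffer.allocate(encodedSize(headers));
        buf.putInt(headers.size());
        for (Map.Entry<String, String> e : headers.entrySet()) {
            putString(buf, e.getKey());
            putString(buf, e.getValue());
        }
        buf.flip();
        return buf;
    }

    static Map<String, String> decode(ByteBuffer buf) {
        int count = buf.getInt();
        Map<String, String> headers = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            String key = readString(buf);
            headers.put(key, readString(buf));
        }
        return headers;
    }

    private static void putString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putInt(bytes.length);
        buf.put(bytes);
    }

    private static String readString(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getInt()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

For brevity this version converts each string to bytes twice; caching the byte arrays, as the `idBytes` and `topicBytes` fields above suggest, avoids that.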
6. Cluster Coordination: Embedded Raft
One of Kafka's biggest operational pain points has historically been its dependency on ZooKeeper (since replaced by KRaft, with ZooKeeper mode removed entirely in Kafka 4.0). Running a separate distributed coordination service adds complexity, failure modes, and operational burden.
TitanMQ embeds Raft consensus directly into the broker:
public class RaftNode {
    private volatile RaftState state = RaftState.FOLLOWER;
    private final AtomicLong currentTerm = new AtomicLong(0);

    private void startElection() {
        state = RaftState.CANDIDATE;
        long term = currentTerm.incrementAndGet();
        votedFor = nodeId; // a candidate always votes for itself
        // Request votes from peers...
    }

    private void becomeLeader() {
        state = RaftState.LEADER;
        leaderId = nodeId;
        // Start sending heartbeats at 150ms intervals
        heartbeatTimer = scheduler.scheduleAtFixedRate(
                this::sendHeartbeats, 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
    }
}
Key design decisions:
- Randomized election timeout (300-500ms): reduces the chance of split votes, where multiple nodes start elections simultaneously and none of them wins a majority
- 150ms heartbeat interval: fast enough to detect failures quickly, infrequent enough to not waste bandwidth
- Leader change listeners: other components (like partition assignment) can react to leadership changes without polling
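The two timing rules above fit in a few lines. The following sketch picks a fresh random timeout per election cycle and checks for a strict majority, which is what actually keeps two leaders from being elected in the same term. `ElectionTimeouts` and its members are hypothetical names, not the RaftNode API.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helpers for Raft election timing and vote counting.
final class ElectionTimeouts {
    static final long MIN_TIMEOUT_MS = 300;
    static final long MAX_TIMEOUT_MS = 500;
    static final long HEARTBEAT_MS = 150;

    // A new random timeout in [300, 500] ms, drawn again after every heartbeat or election.
    static long nextTimeoutMs() {
        return ThreadLocalRandom.current().nextLong(MIN_TIMEOUT_MS, MAX_TIMEOUT_MS + 1);
    }

    // A candidate wins only with votes from a strict majority of the cluster.
    static boolean hasMajority(int votes, int clusterSize) {
        return votes > clusterSize / 2;
    }
}
```

Because the shortest timeout is twice the heartbeat interval, a follower has to miss at least one full heartbeat before it can start an election.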
7. Consumer Groups: Coordinated Consumption
Like Kafka, TitanMQ supports consumer groups where partitions are distributed across group members. The rebalancing algorithm uses round-robin assignment:
Map<String, List<TopicPartition>> rebalance(Map<String, Integer> topicPartitionCounts) {
    List<TopicPartition> allPartitions = new ArrayList<>();
    // Collect all partitions for subscribed topics
    for (String topic : allTopics) {
        int count = topicPartitionCounts.getOrDefault(topic, 0);
        for (int p = 0; p < count; p++) {
            allPartitions.add(new TopicPartition(topic, p));
        }
    }
    // Round-robin assignment
    for (int i = 0; i < allPartitions.size(); i++) {
        String memberId = memberIds.get(i % memberIds.size());
        assignment.get(memberId).add(allPartitions.get(i));
    }
    return assignment;
}
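For experimenting with the assignment outside the broker, here is a self-contained sketch of the same round-robin idea. It adds two things the excerpt leaves implicit, both assumptions: members and topics are sorted so every node computes the same result, and an empty group yields an empty assignment instead of dividing by zero. `RoundRobinAssignor` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical standalone version of round-robin partition assignment.
final class RoundRobinAssignor {
    record TopicPartition(String topic, int partition) {}

    static Map<String, List<TopicPartition>> assign(
            List<String> memberIds, Map<String, Integer> topicPartitionCounts) {
        Map<String, List<TopicPartition>> assignment = new LinkedHashMap<>();
        if (memberIds.isEmpty()) return assignment; // nothing to assign to

        // Sort members so the result does not depend on join order.
        List<String> members = new ArrayList<>(memberIds);
        Collections.sort(members);
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }

        // Flatten all partitions in a stable topic order.
        List<TopicPartition> all = new ArrayList<>();
        for (String topic : new TreeSet<>(topicPartitionCounts.keySet())) {
            for (int p = 0; p < topicPartitionCounts.get(topic); p++) {
                all.add(new TopicPartition(topic, p));
            }
        }

        // Deal partitions out one at a time, like cards.
        for (int i = 0; i < all.size(); i++) {
            assignment.get(members.get(i % members.size())).add(all.get(i));
        }
        return assignment;
    }
}
```

With 8 partitions and 3 members, the members end up with 3, 3, and 2 partitions respectively.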
Offset tracking is managed by the OffsetStore, which maintains committed offsets per consumer group per topic-partition. This enables at-least-once delivery: consumers process a message, then commit the offset. If a consumer crashes before committing, the message will be redelivered to another group member.
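A minimal in-memory sketch of such a store is shown below. It assumes the committed value is the offset of the next message to read and that commits never move backwards; both are assumptions, as is the `InMemoryOffsetStore` name, and a real store would also need to persist its state.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory offset store keyed by (group, topic, partition).
final class InMemoryOffsetStore {
    record Key(String group, String topic, int partition) {}

    private final Map<Key, Long> committed = new ConcurrentHashMap<>();

    // Records the next offset to read; a stale commit with a lower offset is ignored.
    void commit(String group, String topic, int partition, long nextOffset) {
        committed.merge(new Key(group, topic, partition), nextOffset, Math::max);
    }

    // Returns the committed offset, or 0 if the group has never committed.
    long fetch(String group, String topic, int partition) {
        return committed.getOrDefault(new Key(group, topic, partition), 0L);
    }
}
```

Since the commit happens after processing, a crash between the two steps leads to the message being processed again, so handlers should be idempotent.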
8. Key-Based Partitioning: MurmurHash3
When a producer sends a message with a key, TitanMQ uses MurmurHash3 to deterministically assign it to a partition:
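private int resolvePartition(TitanMessage message) {
    if (message.key() != null) {
        // floorMod keeps the result non-negative even when the hash is Integer.MIN_VALUE,
        // a case where Math.abs would still return a negative number
        return Math.floorMod(murmurhash3(message.key()), numPartitions);
    }
    return (int) (Thread.currentThread().threadId() % numPartitions);
}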
MurmurHash3 was chosen over alternatives because:
- It has excellent distribution properties (low collision rate)
- It's fast — no cryptographic overhead
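- It belongs to the same hash family as Kafka's default partitioner, which uses murmur2. The two are not bit-compatible, so a key will not necessarily land on the same partition number after a migration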
For keyless messages, we fall back to thread-based distribution, which provides reasonable spread across partitions without the overhead of an atomic counter.
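One detail deserves care when mapping a hash to a partition: in Java, `Math.abs(Integer.MIN_VALUE)` is still negative, so an abs-then-modulo mapping can yield a negative index for that one hash value. The sketch below shows a sign-safe mapping with `Math.floorMod`. `PartitionMath` is a hypothetical name, and `Arrays.hashCode` is only a stand-in for the real hash function.

```java
import java.util.Arrays;

// Hypothetical helper showing a sign-safe hash-to-partition mapping.
final class PartitionMath {
    // floorMod always returns a value in [0, numPartitions) for a positive divisor.
    static int partitionFor(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    // Stand-in hash for illustration only; a real implementation would use MurmurHash3.
    static int partitionForKey(byte[] key, int numPartitions) {
        return partitionFor(Arrays.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        int worstCase = Integer.MIN_VALUE;
        System.out.println(Math.abs(worstCase) % 3);    // negative result
        System.out.println(partitionFor(worstCase, 3)); // valid partition index
    }
}
```

The mapping stays deterministic: the same key bytes always resolve to the same partition for a given partition count.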
9. Benchmarking
TitanMQ includes two types of benchmarks:
JMH Microbenchmarks
For precise measurement of individual components:
@BenchmarkMode({Mode.Throughput, Mode.AverageTime})
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 3, time = 3)
@Measurement(iterations = 5, time = 5)
@Fork(1)
public class CommitLogBenchmark {
    @Param({"128", "1024", "4096", "16384"})
    private int messageSize;

    @Benchmark
    public long appendMessage() throws IOException {
        // Returning the offset keeps the JIT from eliminating the call
        return commitLog.append(message);
    }
}
This measures raw commit log append throughput across different message sizes, isolating the storage layer from network overhead.
End-to-End Throughput Benchmark
For realistic workload simulation:
// 4 producer threads, 8 partitions, 1M messages of 1KB each
ExecutorService executor = Executors.newFixedThreadPool(NUM_PRODUCERS);
for (int p = 0; p < NUM_PRODUCERS; p++) {
    executor.submit(() -> {
        for (int i = 0; i < messagesPerProducer; i++) {
            topicManager.append(msg);
        }
    });
}
The benchmark reports messages/sec, MB/sec, and average latency for both producer and consumer paths.
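For reference, the three reported figures can be derived from raw counters as in the sketch below. `ThroughputReport` and its parameters are hypothetical, and it assumes the benchmark tracks total elapsed wall-clock time and a running sum of per-message latencies; the actual benchmark may collect its numbers differently.

```java
// Hypothetical summary of raw benchmark counters into the reported metrics.
final class ThroughputReport {
    record Result(double messagesPerSec, double megabytesPerSec, double avgLatencyMicros) {}

    static Result summarize(long messages, long bytesPerMessage,
                            long elapsedNanos, long totalLatencyNanos) {
        double seconds = elapsedNanos / 1_000_000_000.0;
        double messagesPerSec = messages / seconds;
        // MB here means 1024 * 1024 bytes
        double megabytesPerSec = (messages * (double) bytesPerMessage) / (1024.0 * 1024.0) / seconds;
        double avgLatencyMicros = (totalLatencyNanos / (double) messages) / 1_000.0;
        return new Result(messagesPerSec, megabytesPerSec, avgLatencyMicros);
    }
}
```

An average hides tail behavior, so recording percentiles alongside it gives a more honest picture of latency under load.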
What's Next
TitanMQ is a working foundation, but there's significant work ahead to make it production-ready:
- Full Raft RPC implementation: the current Raft node handles single-node mode; multi-node consensus with actual VoteRequest/AppendEntries RPCs over the network is the next milestone.
- Tiered storage: hot data in memory-mapped files, warm data on local SSD, cold data in object storage (S3). This would dramatically reduce storage costs for high-retention use cases.
- Exactly-once semantics: idempotent producers (dedup by message ID) and transactional consumers with two-phase commit.
- Brokerless mode: embed TitanMQ as an in-process library (like ZeroMQ) for ultra-low-latency use cases where persistence isn't needed.
- Observability: built-in metrics (Prometheus-compatible), distributed tracing, and a web-based admin UI.
- Message compaction: like Kafka's log compaction, retain only the latest value per key for changelog/state-store use cases.
Conclusion
Building a message queue from scratch is a deep exercise in systems design. Every layer involves trade-offs: space vs. speed, simplicity vs. flexibility, latency vs. throughput.
TitanMQ's bet is that these trade-offs don't have to be mutually exclusive. By combining Kafka's log-structured storage with RabbitMQ's routing flexibility and ZeroMQ's low-latency philosophy, we get a system that doesn't force you to choose.
The project is open source under Apache 2.0. Contributions, feedback, and benchmarks against existing systems are all welcome.
Top comments (1)
the throttleRatio() approach for back-pressure is a nice touch. the binary on/off pattern (just reject above a threshold) causes the oscillation problem you describe, and most implementations I've seen don't bother solving it. the continuous 0.0-1.0 signal lets producers implement their own slowdown curve which is much more realistic for heterogeneous workloads.
curious about the embedded Raft implementation - specifically how you handle the case where the leader crashes mid-write. the append-only log is on the leader, but if it crashes after appending to disk but before replicating to followers, do you end up with a committed offset that followers don't have? that's the classic log divergence problem that KRaft spent a lot of time on. would be interesting to see how you handle follower log truncation on leader change.