In Part 1, we introduced a "middle ground" for Kafka retries: when a message fails, lock its key, send the message to a retry topic, and let the main partition continue processing other keys. If you need a recap, the Confluent blog covers the pattern.
Simple on a whiteboard. In production, things break in ways the diagrams don't show.
This article covers implementation gaps that tutorials skip. The code examples are from kafka-resilience — simplified for clarity, but the edge cases are real.
The Architecture Recap
The goal: when a message fails, block only that key. Other keys keep flowing.
To make this work across multiple consumer instances, we need three topics:
Main Topic — Your business events. The consumer checks if a key is locked before processing.
Retry Topic — Messages land here either because they failed or because a predecessor with the same key failed. A separate consumer processes them with backoff. When a message succeeds here, it releases the lock.
Lock Topic (compacted) — The shared lock registry. When Instance A locks "Order-123", it writes here. Instance B reads this topic to learn about locks it didn't create.
Each instance runs two consumer loops: one for business messages (main or retry), and one that continuously reads the lock topic to keep the local lock map in sync.
Here's how they connect:
Here's where it breaks.
Problem 1: The Dual-Write
When a message fails, you need to do two things:
- Write a lock to the lock topic
- Write the payload to the retry topic
If the first succeeds but the second fails, you've created a zombie lock — the key is locked forever, but there's no message in the retry queue to unlock it.
Solution A: Kafka Transactions
The obvious solution is Kafka transactions — wrap both writes in a transaction so they either both succeed or both fail.
Most modern clients support this, even outside the Java ecosystem. If your setup allows it, this is a valid approach.
Why I didn't use it: Kafka transactions aren't just a flag you flip. They require a transactional producer, which means a dedicated transaction coordinator on the broker side. Every transactional write follows an init → begin → produce → commit cycle — that's at least two extra round-trips per message. With acks=all across availability zones, each round-trip adds latency.
Under normal load, this overhead is manageable. During a failure storm — exactly when your retry system is under heavy load — it compounds. If 10,000 messages fail in a burst, you're running 10,000 transactions, each waiting for cross-AZ acknowledgments before committing. The retry system becomes the bottleneck at the worst possible moment.
Transactions are the correct solution if your broker topology can absorb the overhead. For high-throughput systems, there's a lighter alternative.
Solution B: Compensating Transaction
Instead, I used a compensating transaction: acquire the lock first, then try to write the payload. If the write fails, undo the lock by writing a release.
When processing fails, the Tracker calls redirect to lock the key and forward the message to the retry topic.
func (t *Tracker) redirect(ctx context.Context, msg Message) error {
// Step 1: Acquire lock first
if err := t.coordinator.Acquire(ctx, msg); err != nil {
return fmt.Errorf("failed to acquire lock: %w", err)
}
// Step 2: Publish to retry topic
if err := t.publisher.Send(ctx, t.retryTopic, msg); err != nil {
// Step 3: Compensate on failure — undo the lock
if releaseErr := t.coordinator.Release(ctx, msg); releaseErr != nil {
t.logger.Error("compensation failed - potential zombie lock",
"key", msg.Key(),
"error", releaseErr)
}
return fmt.Errorf("failed to publish to retry: %w", err)
}
return nil
}
The trade-off: if the process crashes between acquiring the lock and completing the compensation, you get a zombie. Rare, but possible. For those edge cases, you'll need a background "reaper" to clean stale locks. We'll cover that in Part 3.
Whichever solution you choose, the remaining problems still apply.
Problem 2: The Reference Counting Gap
Most descriptions of this pattern treat the retry topic as a queue of failed messages. It's not. It also holds messages that never failed — they were redirected because a predecessor with the same key did. This distinction matters now.
Watch this sequence carefully:
T1: Event A (Key: 123) fails
→ Lock Key 123
→ Messages in retry: 1
T2: Event B (Key: 123) arrives, sees lock
→ Redirect to retry topic (not failed — just blocked)
→ Messages in retry: 2
T3: Event A retries successfully
→ Release lock... but HOW?
If "release" means "delete the lock," you've just broken ordering. Event B is still sitting in the retry topic. But the lock is gone. When Event C arrives for Key 123, it processes immediately — before B.
T4: Event C (Key: 123) arrives
→ Check lock: NOT LOCKED (wrong!)
→ Processes immediately
→ C processed before B
The lock isn't a boolean. It's a counter.
Each instance maintains a LocalCoordinator — an in-memory map counting pending retries per key. Later, we'll wrap this with a KafkaCoordinator that broadcasts state changes to other instances.
type LocalCoordinator struct {
locks map[string]int // key → reference count
mu sync.RWMutex
}
func (c *LocalCoordinator) Acquire(key string) {
c.mu.Lock()
defer c.mu.Unlock()
c.locks[key]++ // Increment
}
func (c *LocalCoordinator) Release(key string) {
c.mu.Lock()
defer c.mu.Unlock()
c.locks[key]--
if c.locks[key] <= 0 {
delete(c.locks, key) // Only free when count hits zero
}
}
func (c *LocalCoordinator) IsLocked(key string) bool {
c.mu.RLock()
defer c.mu.RUnlock()
return c.locks[key] > 0
}
Now the sequence works correctly:
T1: Event A fails → count = 1
T2: Event B redirects → count = 2
T3: Event A succeeds → count = 1 (still locked!)
T4: Event C arrives → blocked (correct!)
T5: Event B succeeds → count = 0 (now free)
T6: Event C proceeds → processes in correct order
The main topic stays blocked for Key 123 until every pending message clears — both the ones that actually failed and the ones that were redirected to preserve order.
Problem 3: The Self-Consumption Trap
Reference counting works locally. But we're broadcasting every lock change to Kafka so other instances can learn about it. And Kafka doesn't filter — you'll also consume your own messages.
Instance A applies the lock twice: once when it acquires locally, once when it consumes its own broadcast. The reference count is now wrong. When the retry succeeds, the count drops to 1 instead of 0. The key stays locked forever.
Fix: tag every message with an origin ID and ignore your own.
const HeaderCoordinatorID = "x-coordinator-id"
func (c *KafkaCoordinator) Acquire(ctx context.Context, msg Message) error {
c.local.Acquire(msg.Key()) // Apply locally first
return c.producer.Send(ctx, c.lockTopic, &LockMessage{
Key: msg.Key(),
Value: msg.Key(),
Headers: map[string]string{
HeaderCoordinatorID: c.instanceID, // Tag with our ID
},
})
}
func (c *KafkaCoordinator) processLockMessage(msg Message) error {
originID := msg.Headers().Get(HeaderCoordinatorID)
// Skip our own messages
if originID == c.instanceID {
return nil
}
// Apply locks from other instances
if msg.Value() != nil {
return c.local.Acquire(msg.Key())
}
// Release locks for tombstones
return c.local.Release(msg.Key())
}
Now each instance only applies foreign locks from the broadcast.
One detail: the instance ID must be stable across restarts. If a pod restarts and gets a new ID, it won't recognize its own pre-crash messages during replay and will double-count them. Use something deterministic — the consumer group member ID or a stable pod identifier — not a random UUID generated at startup.
Problem 4: The Compaction Collision
The lock topic is compacted — Kafka periodically deduplicates records, keeping only the latest value per key. Great for storage. Dangerous for counting.
Here's the trap: compaction doesn't happen instantly. It runs in the background, whenever Kafka decides. So your system works fine... until it doesn't.
Scenario: Three failures for the same key
T1: Event A fails → Produce Key="Order-123", Value="LOCKED"
T2: Event B fails → Produce Key="Order-123", Value="LOCKED"
T3: Event C fails → Produce Key="Order-123", Value="LOCKED"
Local state: count = 3 ✓ (correct, you saw all three)
Hours later, Kafka compaction runs.
Three records with Key="Order-123" → collapsed into one.
T4: Pod restarts, replays lock topic to rebuild state.
Consumer sees one "LOCKED" record. Count = 1. (wrong!)
T5: Event A succeeds → Tombstone deletes the only record.
Count = 0. System thinks Order-123 is free.
But B and C are still in the retry queue.
The local in-memory state was correct. The problem only surfaces after a restart — when you replay the compacted topic and your counts are silently wrong.
Fix: use a unique ID for each lock operation.
By using a UUID as the Kafka key and putting the business key in headers, you force Kafka to keep all three lock records distinct. After restart, the consumer replays three separate records and counts correctly.
func (c *KafkaCoordinator) Acquire(ctx context.Context, msg Message) error {
lockID := uuid.New().String() // Unique per lock operation
c.local.Acquire(ctx, msg)
return c.producer.Send(ctx, c.lockTopic, &LockMessage{
Key: lockID, // Kafka key = UUID (unique, never collides)
Value: lockID, // Lock marker
Headers: map[string]string{
"business-key": msg.Key(), // Actual key in headers
HeaderCoordinatorID: c.instanceID,
},
})
}
The consuming side reads the business-key header to know which business key the lock applies to.
The trade-off you need to understand: UUID keys mean compaction can never deduplicate lock records — every record has a unique key by definition. The lock topic now grows with every lock and unlock operation. With high failure rates, this topic can grow fast, which directly impacts cold start time (Problem 5) and memory usage (covered in Part 3). Tune your retention aggressively.
Problem 5: The Rebalancing Race
Pod A crashes. Kubernetes starts Pod B. Kafka assigns Partition 0 to Pod B.
Pod B's local state is stale. It hasn't finished reading the lock topic, so it doesn't know Order-123 is locked. It processes messages out of order.
The Synchronization Barrier
A consumer must not process messages until its view of the lock state is current.
func (c *KafkaCoordinator) Synchronize(ctx context.Context) error {
// Step 1: Ask broker for the high water mark (latest offset)
hwm, err := c.admin.GetHighWaterMark(ctx, c.lockTopic)
if err != nil {
return fmt.Errorf("failed to get high water mark: %w", err)
}
// Step 2: Wait until our local consumer catches up
for {
currentOffset := c.lockConsumer.CurrentOffset()
if currentOffset >= hwm {
return nil // Caught up, safe to proceed
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(100 * time.Millisecond):
// Keep waiting
}
}
}
Hook this into the consumer's setup phase:
func (h *Handler) Setup(session sarama.ConsumerGroupSession) error {
// Block until lock state is current
if err := h.coordinator.Synchronize(session.Context()); err != nil {
return fmt.Errorf("failed to synchronize state: %w", err)
}
return nil
}
Now a pod never "guesses" the lock state. It waits until it knows.
The Rebalance Loop
This barrier has teeth. Every time a pod restarts or Kafka rebalances partitions, the consumer blocks until it catches up on the lock topic. With thousands of active locks, this adds seconds to startup. With millions — especially if the lock topic has grown due to the UUID trade-off from Problem 4 — it can trigger a rebalance loop: you exceed max.poll.interval.ms waiting to sync, Kafka kicks you out, the next pod inherits the partition and does the same thing, and the partition becomes effectively frozen.
For high-churn systems, you need to tune retention on the lock topic aggressively, or move to an external state store (covered in Part 3).
What About New Locks During Sync?
New locks may arrive after the high water mark snapshot. But this doesn't cause a race — and the reason is worth understanding.
The lock is always written before the failed message gets redirected to the retry topic, and the main consumer commits its offset after the redirect. So the sequence is: lock written → message redirected → offset committed. When Pod B takes over the partition, it starts consuming from the last committed offset. Any message that needs a lock was already redirected by Pod A before the offset advanced. Pod B won't encounter that message on the main topic — it's already in the retry topic.
For messages that arrive after Pod B starts, the lock consumer is already tailing the lock topic continuously. Since the lock is written before the redirect, Pod B will see the lock before the redirected message could possibly arrive.
What About the Retry Consumer?
The retry consumer doesn't need a sync barrier. Its job is to process messages that are already in the retry topic and release locks when they succeed. It doesn't check whether a key is locked — every message in the retry topic is there precisely because the key is (or was) locked. It processes sequentially within each partition, releases on success, and re-publishes on failure. No lock lookup needed, no stale state risk.
Summary
Implementing ordered retries isn't routing messages to different topics. It's building a distributed state machine on top of Kafka — and distributed state is never consistent when you need it to be.
The pattern looks clean on a whiteboard. This is what happens when you actually build it.
- Dual-Write — Use transactions or compensating actions
- Reference Counting — Locks are counters, not booleans
- Self-Consumption — Tag messages with origin ID, ignore your own
- Compaction Collision — UUID per lock, business key in headers (but this disables compaction deduplication)
- Rebalancing Race — Sync barrier before processing
The full implementation is in kafka-resilience. The code snippets here are simplified — the real version handles more edge cases.
In Part 3, we'll look at the operational cost: cold starts, memory pressure, and why you might choose a simpler solution instead.




Top comments (0)