Many of you have been there - it’s 3 AM, your phone buzzes, and you’re staring at an alert: “Kafka consumer lag exceeded threshold.”
You stumble to your laptop, check the metrics, and… everything looks fine. The messages are processing. The lag was just a temporary spike from a slow downstream service. You silence the alert and go back to bed, mentally adding another item to your "fix someday" list.
Sound familiar?
Here's what's actually happening: lag tells you how far behind you are, but not whether you're making progress. A consumer can sit at 1000 messages lag for 10 minutes because it's stuck, or because it's processing at exactly the rate messages arrive. From lag alone, you can't tell the difference.
The real question isn't "how much lag?" — it's "are we making progress?"
That’s the problem I found myself wrestling with a while back. After dealing with false positive alerts and delayed detection of real issues, I discovered a simple but brilliant solution from PagerDuty’s engineering team. Rather than retelling their story, I want to show you how to implement this approach in Go, with some insights I’ve picked up along the way.
💡 Want to dive straight into the code? Check out the complete source code here.
Why Traditional Health Checks Fall Short
Let’s talk about common patterns and why they don’t quite work:
The “Always Healthy” Approach
func healthHandler(w http.ResponseWriter, r *http.Request) {
    w.WriteHeader(http.StatusOK)
    w.Write([]byte("OK")) // "I'm alive!" (Are you though?)
}
This tells you nothing. Your consumer could be completely frozen, and Kubernetes (or any other orchestrator) would happily keep it alive.
The “Ping the Broker” Approach
This is better than nothing — at least you’re verifying connectivity. But connectivity doesn’t mean your consumer is processing messages. Your network might be fine while your consumer group is stuck in an infinite rebalance loop.
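For illustration, here is a minimal sketch of that pattern with Sarama (the handler name and wiring are my own, not from the library). It proves you can reach a broker, nothing more:

// Hypothetical "ping the broker" check: confirms connectivity only.
// It says nothing about whether messages are actually being consumed.
func brokerPingHandler(client sarama.Client) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        brokers := client.Brokers()
        if len(brokers) == 0 {
            http.Error(w, "no brokers known", http.StatusServiceUnavailable)
            return
        }
        // Connected reports whether the connection to this broker is open.
        if ok, _ := brokers[0].Connected(); !ok {
            http.Error(w, "broker connection down", http.StatusServiceUnavailable)
            return
        }
        w.WriteHeader(http.StatusOK)
    }
}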
The “Lag Threshold” Trap
This is where most teams land. You’re probably already tracking consumer lag through Prometheus or similar tools. You’ve set up alerts when lag exceeds some threshold — maybe 100 messages, maybe 1000, maybe a million.
But here’s the problem: what should that threshold be?
Set it too low → You wake up at 3 AM because a downstream API responded slowly for 30 seconds. The consumer is fine, just briefly overwhelmed. False alarm.
Set it too high → You miss genuine issues until they’ve already cascaded into customer-visible problems. By the time your alert fires, you’re in damage control mode.
The granularity problem: A single stuck pod in your deployment of 50 can go unnoticed if you’re monitoring average lag. That pod quietly fails while the other 49 keep working, masking the issue in your aggregate metrics.
The fundamental tension: fast detection means false positives, reliable alerts mean delayed detection.
You can build sophisticated heuristics or even ML models to detect anomalies, but all of that adds complexity and still detects problems with noticeable delay.
The Key Insight: Progress vs. Position
What we really need is a way to answer a simple question: Is this specific consumer instance making progress?
Instead of measuring lag, we measure progress. Here's the approach:
The Heartbeat: Track the timestamp of the last processed message for each partition. If we're processing messages, we're healthy.
The Verification: If enough time passes without new messages (say, X seconds), we don't panic yet. We query the Kafka broker for the latest offset. Now we can make a decision:
- Consumer Offset < Broker Offset: ❌ UNHEALTHY (there are messages available, but we're not processing them — we're stuck)
- Consumer Offset ≥ Broker Offset: ✅ HEALTHY (we're caught up, just waiting for more work — we're idle)
This elegantly distinguishes between a stuck consumer and a consumer that's simply idle.
How It Works: Three Scenarios
Let me show you exactly what happens in each case.
Scenario 1: Active Processing (Healthy)
When messages are flowing and your consumer is processing them, health checks are instant: no broker queries needed — we've seen recent activity.
Scenario 2: Stuck Consumer (Unhealthy)
The consumer freezes, but messages keep arriving: the broker query reveals there's work to do, but we're not doing it.
Scenario 3: Idle Consumer (Healthy)
The consumer is caught up, just waiting for new messages: this is the key distinction — same timeout, different outcome based on broker state.
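To make the three cases concrete, here is a rough sketch of the decision with made-up offsets (the numbers, types, and helper name are mine, not the library's):

// Illustrative values only; in practice these come from the tracker and the broker.
type checkInput struct {
    lastTrackedOffset  int64         // last offset we processed
    timeSinceLastTrack time.Duration // how long since that offset last advanced
    brokerLatestOffset int64         // latest offset the broker reports
}

func isHealthy(in checkInput, stuckTimeout time.Duration) bool {
    if in.timeSinceLastTrack <= stuckTimeout {
        return true // Scenario 1: recent progress, no broker query needed
    }
    // Timed out: ask the broker whether there is actually new work waiting.
    return in.lastTrackedOffset >= in.brokerLatestOffset
}

// Scenario 2: tracked=100, broker=150, quiet for 2m -> stuck (unhealthy)
// Scenario 3: tracked=100, broker=100, quiet for 2m -> idle  (healthy)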
The beauty of this approach is its simplicity — we're not doing complex analysis or ML. We're just asking two questions: "Have I seen new work? If not, is there new work available?"
Implementation
I’ve packaged this logic into kafka-pulse-go, a lightweight library that works with most popular Kafka clients. The core logic is decoupled from specific client implementations, with adapters provided for Sarama, segmentio/kafka-go, and Confluent’s client.
Let me walk you through how to use it.
Setting Up the Monitor
The setup is straightforward. Here's what you need with Sarama:
import (
    "log"
    "time"

    "github.com/IBM/sarama"
    adapter "github.com/vmyroslav/kafka-pulse-go/adapter/sarama"
    "github.com/vmyroslav/kafka-pulse-go/pulse"
)

func main() {
    // Your existing Sarama setup
    brokers := []string{"localhost:9092"} // your Kafka brokers
    config := sarama.NewConfig()
    client, err := sarama.NewClient(brokers, config)
    if err != nil {
        log.Fatal(err)
    }

    // Create the health checker adapter
    brokerClient := adapter.NewClientAdapter(client)

    // Configure the monitor
    monitorConfig := pulse.Config{
        Logger:       logger, // your application logger
        StuckTimeout: 30 * time.Second,
    }

    monitor, err := pulse.NewHealthChecker(monitorConfig, brokerClient)
    if err != nil {
        log.Fatal(err)
    }

    // Pass monitor to your consumer...
}
What's StuckTimeout? This defines how long we'll wait without seeing new messages before we query the broker. Set this based on your expected message frequency:
- High-throughput topics: 10-30 seconds works well
- Medium-volume topics: 1-2 minutes is reasonable
- Low-volume topics: You might need 5-10 minutes
The key is balancing detection speed with broker query overhead. Too short and you’ll query the broker constantly during idle periods. Too long and you’ll delay detecting real issues.
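One rough rule of thumb (my own heuristic, not something the library prescribes): allow a few quiet intervals before the monitor starts verifying against the broker.

// expectedInterval is how often you expect at least one message per partition
// under normal load; the multiplier is an assumption you should tune.
expectedInterval := 10 * time.Second

monitorConfig := pulse.Config{
    Logger:       logger,
    StuckTimeout: 3 * expectedInterval, // tolerate a few quiet intervals before querying the broker
}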
Integrating with Your Consumer
The integration is minimal — just call Track() for each message you process:
func (h *consumerGroupHandler) ConsumeClaim(
    session sarama.ConsumerGroupSession,
    claim sarama.ConsumerGroupClaim,
) error {
    for message := range claim.Messages() {
        // Process your business logic
        if err := processMessage(message); err != nil {
            // Handle error...
        }

        // Tell the health monitor we're making progress
        wrappedMessage := adapter.NewMessage(message)
        h.monitor.Track(session.Context(), wrappedMessage)

        session.MarkMessage(message, "")
    }
    return nil
}
That’s it. The monitor now knows when you’ve processed a message and can track progress per partition.
Exposing the Health Check
Wire this into your HTTP server for health checks:
func (hs *HealthServer) healthHandler(w http.ResponseWriter, r *http.Request) {
    ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
    defer cancel()

    isHealthy, err := hs.monitor.Healthy(ctx)
    if err != nil {
        // Handle broker connection errors
        w.WriteHeader(http.StatusServiceUnavailable)
        json.NewEncoder(w).Encode(map[string]string{
            "status": "unhealthy",
            "error":  err.Error(),
        })
        return
    }

    if isHealthy {
        w.WriteHeader(http.StatusOK)
        json.NewEncoder(w).Encode(map[string]string{"status": "healthy"})
    } else {
        w.WriteHeader(http.StatusServiceUnavailable)
        json.NewEncoder(w).Encode(map[string]string{
            "status": "unhealthy",
            "reason": "consumer stuck behind available messages",
        })
    }
}
Now your orchestrator (Kubernetes, ECS, whatever) can hit this endpoint and know the real health of your consumer.
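Wiring it up is ordinary net/http; the path and port below are assumptions, and in Kubernetes you would point a liveness or readiness probe at the same path:

// Minimal sketch: serve the health endpoint alongside your consumer.
mux := http.NewServeMux()
mux.HandleFunc("/healthz", hs.healthHandler)

// Run the health server in the background; a real service would also
// handle graceful shutdown here.
go func() {
    if err := http.ListenAndServe(":8080", mux); err != nil {
        log.Printf("health server stopped: %v", err)
    }
}()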
The Critical Detail: Handling Rebalances
Here’s where custom implementations may fail. In Kafka, partition ownership is fluid. If you add a new pod or an old one crashes, Kafka triggers a rebalance. Your consumer instance might lose ownership of Partition 1 and gain ownership of Partition 2.
The Danger of the "Zombie Partition"
Without proper cleanup, here's what happens:
- Rebalance Triggered: You add a new pod to the consumer group, or an existing one crashes. Kafka initiates a consumer group rebalance to redistribute partition ownership.
- Your consumer loses ownership of Partition 1 and stops reading from it (correctly - you no longer own it)
- The health checker still holds a reference to Partition 1 in its memory
- Time passes without new messages for Partition 1 in your tracker (obviously — you're not reading it anymore)
- The health checker eventually queries the broker and sees the offset for Partition 1 is moving (thanks to whoever owns it now)
- But your local tracked offset for Partition 1 is frozen at the last message you processed before losing ownership
- Result: The health checker flags your pod as UNHEALTHY for a partition it doesn't even own anymore
This is a "zombie partition" problem — your health checker is tracking ghosts.
The Fix
You must manage the partition lifecycle. Call Release() when a session ends to purge that partition from the monitor’s memory:
func (h *consumerGroupHandler) ConsumeClaim(
    session sarama.ConsumerGroupSession,
    claim sarama.ConsumerGroupClaim,
) error {
    ctx := session.Context()
    for {
        select {
        case <-ctx.Done():
            // CRITICAL: Clean up partition tracking during rebalance
            h.monitor.Release(ctx, claim.Topic(), claim.Partition())
            return nil
        case message, ok := <-claim.Messages():
            if !ok {
                // Messages channel closed: the claim is ending, clean up here too
                h.monitor.Release(ctx, claim.Topic(), claim.Partition())
                return nil
            }
            processMessage(message)

            wrappedMessage := adapter.NewMessage(message)
            h.monitor.Track(ctx, wrappedMessage)

            session.MarkMessage(message, "")
        }
    }
}
The ctx.Done() signal from Sarama indicates that the session is ending — either due to a rebalance, shutdown, or error. This is your cue to clean up tracking state for this partition.
Why this matters: In a production system with frequent deployments, rebalances happen constantly. Without proper cleanup, you’ll see cascading false alerts as pods report unhealthy for partitions they’ve lost ownership of.
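If you prefer to centralize the cleanup, another option (my own sketch, not necessarily the library's recommended wiring) is to release everything the session claimed in the handler's Cleanup hook, which Sarama calls once the session ends:

// Cleanup runs after ConsumeClaim has exited for all partitions, at the end of a session.
func (h *consumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
    for topic, partitions := range session.Claims() {
        for _, partition := range partitions {
            // Drop tracking state for every partition this instance owned.
            h.monitor.Release(session.Context(), topic, partition)
        }
    }
    return nil
}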
Under the Hood
The library’s design is intentionally simple. Here are the key pieces:
State Tracking
We maintain a map of topic-partitions to their last-seen offset and timestamp:
type tracker struct {
    topicPartitionOffsets map[string]map[int32]OffsetTimestamp
    mu                    sync.RWMutex
}

type OffsetTimestamp struct {
    Timestamp time.Time
    Offset    int64
}
Why a map? Because each partition is independent. Partition 0 being stuck doesn’t mean Partition 1 is stuck. We need per-partition granularity to avoid false positives.
The Health Check Logic
The verification follows the two-step process:
func (h *HealthChecker) Healthy(ctx context.Context) (bool, error) {
    currentState := h.tracker.currentOffsets()
    now := h.clock.Now()

    for topic := range currentState {
        for partition := range currentState[topic] {
            offsetTimestamp := currentState[topic][partition]

            // Step 1: Has it been too long since we saw a new message?
            if now.Sub(offsetTimestamp.Timestamp) > h.stuckTimeout {
                // Step 2: Query broker for latest offset
                latestOffset, err := h.client.GetLatestOffset(ctx, topic, partition)
                if err != nil {
                    if h.ignoreBrokerErrors {
                        continue // Don't fail on transient broker issues
                    }
                    return false, fmt.Errorf("failed to get latest offset: %w", err)
                }

                // Are we stuck or just idle?
                if offsetTimestamp.Offset < latestOffset {
                    // Stuck! There are messages we haven't processed
                    return false, nil
                }
                // We're caught up, just idle - that's fine
            }
        }
    }
    return true, nil
}
The key insight is in that comparison: offsetTimestamp.Offset < latestOffset.
Practical Considerations
What About Poison Pill Messages?
A common pitfall in health checks is confusing activity with progress. If a consumer is stuck in an infinite retry loop on a single malformed message, it’s technically “active” — burning CPU and making network calls — but it’s not “healthy” because the stream is blocked.
This approach handles it correctly. Here’s why:
The tracker only updates the timestamp when the offset actually moves forward:
// Only update timestamp if offset has changed
if existing.Offset != m.Offset() || existing.Timestamp.IsZero() {
    t.topicPartitionOffsets[m.Topic()][m.Partition()] = OffsetTimestamp{
        Offset:    m.Offset(),
        Timestamp: t.clock.Now(),
    }
}
If your consumer spends 10 minutes retrying the same message at offset 100, the offset never advances. The timestamp stays frozen at the last successful message. Eventually the timeout expires, and because the broker has newer messages (101, 102, 103…), the health check correctly reports Unhealthy.
This forces visibility on “zombie loops” that may otherwise go unnoticed.
Broker Outages: Choosing Your Failure Mode
Since our verification step requires querying the broker, you need to decide what happens when that call fails.
The library exposes an IgnoreBrokerErrors flag that controls this behavior:
Option A: Fail Closed (default)
config := pulse.Config{
    StuckTimeout:       30 * time.Second,
    IgnoreBrokerErrors: false, // default - fail on broker errors
}
- Upside: You never report “Healthy” when you’re actually blind to the real state
- Risk: A temporary Kafka blip can cause Kubernetes to restart your entire consumer fleet simultaneously (a “restart storm”). Since restarting won’t fix the broker, this just adds chaos to an ongoing outage
Option B: Fail Open
config := pulse.Config{
    StuckTimeout:       30 * time.Second,
    IgnoreBrokerErrors: true, // ignore transient broker errors
}
- Upside: Your pods stay running, warm, and ready to resume work the moment the broker recovers
- Risk: You might miss a legitimate network partition affecting only your specific pod
My recommendation: Start with fail-closed (the default). It's safer — you never report healthy when you're blind to the actual state. If you experience restart storms during broker incidents, switch to fail-open and add separate monitoring for broker connectivity at the infrastructure level.
Does This Replace Consumer Lag Monitoring?
No! Consumer lag metrics are still valuable for:
- Capacity planning: Understanding if you need to scale your consumer group
- Performance trending: Tracking how your processing speed changes over time
- Business SLAs: If your requirement is “process messages within 5 minutes,” lag is exactly what you need to measure
This health check complements lag monitoring — it doesn't replace it. Lag tells you if you need more consumers. Health checks tell you if your existing consumers are working.
Wrapping Up
The core insight is simple: health checks should measure progress.
By distinguishing between “stuck with work to do” and “idle but caught up,” we eliminate false positives without missing real issues. No more 3 AM alerts for temporary slowdowns. No more delayed detection of genuinely stuck consumers.
The implementation is straightforward:
- Track offset progression for each partition
- When idle too long, verify against broker state
- Handle rebalances cleanly with proper partition lifecycle management
- Choose your failure mode for broker errors
Whether you use kafka-pulse-go or implement this yourself, the pattern is universal across languages and Kafka clients; the principle remains the same.
Try It Out
The repo includes a complete working example.
It's the fastest way to see how this works in practice.


