In the previous article, we explored distributed tracing with OpenTelemetry. Today we tackle one of the most critical — and most frequently botched — pieces of backend infrastructure: the message queue consumer.
Every production system eventually outgrows synchronous request-response. Order processing, email delivery, image resizing, webhook fanout — these all belong in a queue. But the consumer side is where things get ugly. Messages arrive out of order. Workers crash mid-processing. Duplicates sneak through. Your "simple consumer" becomes a tangle of retries, panics, and lost messages.
This article builds a production-grade NATS JetStream consumer in Go, piece by piece, covering the patterns that keep it alive at 3 AM.
Why Message Queues Matter
Synchronous architectures hit a wall when:
- Latency budgets are tight. Your API shouldn't block for 30 seconds while a PDF renders.
- Failure domains bleed. A downstream service outage shouldn't take your entire API down.
- Load is bursty. Black Friday traffic shouldn't require Black Friday compute 365 days a year.
Message queues decouple producers from consumers. The producer writes a message and moves on. The consumer processes it at its own pace, retries on failure, and scales independently. This is table stakes for any system that handles real traffic.
We'll use NATS JetStream as our concrete example. It's operationally simpler than RabbitMQ or Kafka, supports persistent streams with at-least-once delivery, and has an excellent Go client. The patterns apply universally.
The Architecture
Here's what we're building:
```
Producer → NATS JetStream Stream → Consumer Group
                                     ├── Worker Pool (N goroutines)
                                     ├── Retry with Exponential Backoff
                                     ├── Dead Letter Queue
                                     ├── Idempotency Check
                                     └── Metrics + Tracing
```
Let's define the core interfaces first:
```go
package consumer

import (
	"context"
	"time"
)

// Message represents a queue message with metadata.
type Message struct {
	ID        string
	Subject   string
	Data      []byte
	Headers   map[string]string
	Timestamp time.Time
	Attempt   int
}

// Handler processes a single message. Return an error to trigger retry.
type Handler func(ctx context.Context, msg Message) error

// IdempotencyStore tracks processed message IDs.
type IdempotencyStore interface {
	Exists(ctx context.Context, messageID string) (bool, error)
	Mark(ctx context.Context, messageID string, ttl time.Duration) error
}
```
Designing Idempotent Consumers
At-least-once delivery means your handler will receive duplicates. Network blips, consumer restarts, rebalancing — all cause redelivery. Your handler must be idempotent: processing the same message twice produces the same result.
Two strategies work in practice:
1. Deduplication at the consumer level — check a store before processing:
```go
type RedisIdempotencyStore struct {
	client *redis.Client
}

func (s *RedisIdempotencyStore) Exists(ctx context.Context, id string) (bool, error) {
	val, err := s.client.Exists(ctx, "idem:"+id).Result()
	if err != nil {
		return false, fmt.Errorf("idempotency check failed: %w", err)
	}
	return val > 0, nil
}

func (s *RedisIdempotencyStore) Mark(ctx context.Context, id string, ttl time.Duration) error {
	return s.client.Set(ctx, "idem:"+id, "1", ttl).Err()
}
```
2. Idempotent operations — design your writes so repeats are safe. Use `INSERT ... ON CONFLICT DO NOTHING`, conditional updates with version checks, or deterministic IDs derived from message content.
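As a rough sketch of the second strategy, a deterministic ID plus a conflict-ignoring insert makes replays a no-op. The table name `processed_orders` and the helper below are illustrative assumptions, not part of the consumer package:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// deterministicID derives a stable ID from the message payload, so the same
// logical event always maps to the same database row. (Illustrative helper.)
func deterministicID(payload []byte) string {
	sum := sha256.Sum256(payload)
	return hex.EncodeToString(sum[:])
}

// insertOrderSQL is an idempotent write: replaying the same message is a
// no-op because the primary key already exists. The processed_orders table
// is a hypothetical example.
const insertOrderSQL = `
INSERT INTO processed_orders (message_id, payload)
VALUES ($1, $2)
ON CONFLICT (message_id) DO NOTHING`

func main() {
	id := deterministicID([]byte(`{"order":"123"}`))
	fmt.Println(id[:12], len(id))
}
```

Because the ID is content-derived, even a producer that publishes the same event twice (with different broker-assigned sequence numbers) collapses to one row.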
The first approach is simpler to bolt on. The second is more robust. Production systems usually combine both: a deduplication layer as a fast path, with idempotent database operations as the safety net.
The TTL on the idempotency key matters. Set it too short, and late redeliveries slip through. Set it too long, and your store grows unbounded. Match it to your stream's max redelivery window — typically 24–72 hours.
The Consumer Core
Now let's build the consumer. The Config struct captures every tunable:
```go
type Config struct {
	// NATS connection
	NATSUrl    string
	StreamName string
	Subject    string
	Durable    string // consumer group name

	// Concurrency
	WorkerCount int

	// Retry
	MaxRetries     int
	InitialBackoff time.Duration
	MaxBackoff     time.Duration
	BackoffFactor  float64

	// Idempotency
	IdempotencyTTL time.Duration

	// Observability
	ServiceName string
}

func DefaultConfig() Config {
	return Config{
		WorkerCount:    10,
		MaxRetries:     5,
		InitialBackoff: 1 * time.Second,
		MaxBackoff:     60 * time.Second,
		BackoffFactor:  2.0,
		IdempotencyTTL: 24 * time.Hour,
		ServiceName:    "queue-consumer",
	}
}
```
The consumer struct ties everything together:
```go
type Consumer struct {
	cfg        Config
	js         nats.JetStreamContext
	handler    Handler
	idemStore  IdempotencyStore
	metrics    *Metrics
	tracer     trace.Tracer
	logger     *slog.Logger
	sub        *nats.Subscription
	wg         sync.WaitGroup
	msgCh      chan *nats.Msg
	shutdownCh chan struct{}
}

func New(nc *nats.Conn, cfg Config, handler Handler, idemStore IdempotencyStore) (*Consumer, error) {
	js, err := nc.JetStream()
	if err != nil {
		return nil, fmt.Errorf("jetstream init: %w", err)
	}
	tp := otel.GetTracerProvider()
	meter := otel.GetMeterProvider().Meter(cfg.ServiceName)
	return &Consumer{
		cfg:        cfg,
		js:         js,
		handler:    handler,
		idemStore:  idemStore,
		metrics:    NewMetrics(meter),
		tracer:     tp.Tracer(cfg.ServiceName),
		logger:     slog.Default(),
		msgCh:      make(chan *nats.Msg, cfg.WorkerCount*2),
		shutdownCh: make(chan struct{}),
	}, nil
}
```
Retry with Exponential Backoff
The retry logic lives in the message processing path. NATS JetStream supports server-side NakWithDelay, which tells the server to redeliver after a specified duration. This is vastly superior to client-side retry loops because it survives consumer restarts:
```go
func (c *Consumer) processMessage(natsMsg *nats.Msg) {
	ctx, span := c.tracer.Start(context.Background(), "process_message",
		trace.WithAttributes(
			attribute.String("subject", natsMsg.Subject),
		),
	)
	defer span.End()

	meta, err := natsMsg.Metadata()
	if err != nil {
		c.logger.Error("failed to read metadata", "error", err)
		natsMsg.Nak()
		return
	}
	attempt := int(meta.NumDelivered)

	msgID := natsMsg.Header.Get("Nats-Msg-Id")
	if msgID == "" {
		msgID = fmt.Sprintf("%s-%d", meta.Stream, meta.Sequence.Stream)
	}
	span.SetAttributes(
		attribute.String("message_id", msgID),
		attribute.Int("attempt", attempt),
	)

	// Idempotency check
	if exists, err := c.idemStore.Exists(ctx, msgID); err != nil {
		c.logger.Error("idempotency check failed", "error", err, "msg_id", msgID)
		span.RecordError(err)
		// Retry — the store might be temporarily down
		c.nakWithBackoff(natsMsg, attempt)
		return
	} else if exists {
		c.logger.Debug("duplicate message, skipping", "msg_id", msgID)
		c.metrics.duplicatesSkipped.Add(ctx, 1)
		natsMsg.Ack()
		return
	}

	msg := Message{
		ID:        msgID,
		Subject:   natsMsg.Subject,
		Data:      natsMsg.Data,
		Headers:   flattenHeaders(natsMsg.Header),
		Timestamp: meta.Timestamp,
		Attempt:   attempt,
	}

	// Process
	start := time.Now()
	err = c.handler(ctx, msg)
	duration := time.Since(start)

	c.metrics.processingDuration.Record(ctx, duration.Seconds(),
		metric.WithAttributes(attribute.Bool("success", err == nil)),
	)

	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		c.metrics.processingErrors.Add(ctx, 1)

		if attempt >= c.cfg.MaxRetries {
			c.logger.Error("max retries exceeded, sending to DLQ",
				"msg_id", msgID, "error", err, "attempts", attempt,
			)
			c.sendToDLQ(ctx, natsMsg, err)
			natsMsg.Ack() // Ack original to stop redelivery
			return
		}

		c.logger.Warn("processing failed, scheduling retry",
			"msg_id", msgID, "error", err, "attempt", attempt,
		)
		c.nakWithBackoff(natsMsg, attempt)
		return
	}

	// Success — mark as processed and ack
	if err := c.idemStore.Mark(ctx, msgID, c.cfg.IdempotencyTTL); err != nil {
		c.logger.Error("failed to mark idempotency", "error", err, "msg_id", msgID)
		// Don't fail the message — it was processed successfully.
		// Worst case: a duplicate gets processed again idempotently.
	}
	c.metrics.messagesProcessed.Add(ctx, 1)
	natsMsg.Ack()
}

func (c *Consumer) nakWithBackoff(msg *nats.Msg, attempt int) {
	delay := c.cfg.InitialBackoff
	for i := 1; i < attempt; i++ {
		delay = time.Duration(float64(delay) * c.cfg.BackoffFactor)
		if delay > c.cfg.MaxBackoff {
			delay = c.cfg.MaxBackoff
			break
		}
	}
	msg.NakWithDelay(delay)
}
```
Key decisions here:

- Server-side retry via `NakWithDelay` — the consumer doesn't hold the message during backoff. If this consumer dies, another picks it up.
- Ack after DLQ publish — we ack the original message to stop the redelivery loop. The DLQ is the new source of truth.
- Idempotency mark happens after success — if the mark fails, the message might be reprocessed. That's fine because operations are idempotent.
Dead Letter Queue
Messages that fail all retries go to a separate stream. Your ops team reviews them, fixes the bug, and replays:
```go
func (c *Consumer) sendToDLQ(ctx context.Context, original *nats.Msg, processErr error) {
	_, span := c.tracer.Start(ctx, "send_to_dlq")
	defer span.End()

	headers := nats.Header{}
	// Preserve original headers
	for k, v := range original.Header {
		headers[k] = v
	}
	headers.Set("X-DLQ-Error", processErr.Error())
	headers.Set("X-DLQ-Timestamp", time.Now().UTC().Format(time.RFC3339))
	headers.Set("X-Original-Subject", original.Subject)

	dlqMsg := &nats.Msg{
		Subject: fmt.Sprintf("dlq.%s", c.cfg.Subject),
		Data:    original.Data,
		Header:  headers,
	}

	if _, err := c.js.PublishMsg(dlqMsg); err != nil {
		c.logger.Error("failed to publish to DLQ", "error", err)
		span.RecordError(err)
		c.metrics.dlqFailures.Add(ctx, 1)
		// This is bad. The message will be lost after ack.
		// In practice, also log the full message body for manual recovery.
		c.logger.Error("LOST MESSAGE — manual recovery required",
			"subject", original.Subject,
			"data", string(original.Data),
		)
		return // don't count a failed publish as a successful DLQ send
	}
	c.metrics.dlqMessages.Add(ctx, 1)
}
```
Create the DLQ stream during setup:
```go
func EnsureStreams(js nats.JetStreamContext, streamName, subject string) error {
	// Main stream
	_, err := js.AddStream(&nats.StreamConfig{
		Name:      streamName,
		Subjects:  []string{subject},
		Retention: nats.WorkQueuePolicy,
		MaxAge:    72 * time.Hour,
	})
	if err != nil {
		return fmt.Errorf("create main stream: %w", err)
	}

	// DLQ stream
	_, err = js.AddStream(&nats.StreamConfig{
		Name:      streamName + "_dlq",
		Subjects:  []string{"dlq." + subject},
		Retention: nats.LimitsPolicy,
		MaxAge:    30 * 24 * time.Hour, // Keep DLQ messages for 30 days
	})
	if err != nil {
		return fmt.Errorf("create DLQ stream: %w", err)
	}
	return nil
}
```
Concurrency Control with Worker Pools
A single goroutine consuming messages wastes most of its time waiting on I/O. A worker pool lets you process N messages concurrently while keeping backpressure under control:
```go
func (c *Consumer) Start(ctx context.Context) error {
	sub, err := c.js.PullSubscribe(
		c.cfg.Subject,
		c.cfg.Durable,
		nats.ManualAck(),
		nats.AckWait(30*time.Second),
		nats.MaxDeliver(c.cfg.MaxRetries+1),
	)
	if err != nil {
		return fmt.Errorf("subscribe: %w", err)
	}
	c.sub = sub

	// Start workers
	for i := 0; i < c.cfg.WorkerCount; i++ {
		c.wg.Add(1)
		go c.worker(i)
	}

	// Fetch loop — pulls messages from NATS and fans out to workers
	c.wg.Add(1)
	go c.fetchLoop(ctx)

	c.logger.Info("consumer started",
		"workers", c.cfg.WorkerCount,
		"subject", c.cfg.Subject,
		"durable", c.cfg.Durable,
	)
	return nil
}

func (c *Consumer) fetchLoop(ctx context.Context) {
	defer c.wg.Done()
	defer close(c.msgCh)
	for {
		select {
		case <-ctx.Done():
			return
		case <-c.shutdownCh:
			return
		default:
		}

		msgs, err := c.sub.Fetch(c.cfg.WorkerCount, nats.MaxWait(5*time.Second))
		if err != nil {
			if errors.Is(err, nats.ErrTimeout) {
				continue // No messages available, poll again
			}
			c.logger.Error("fetch error", "error", err)
			time.Sleep(time.Second) // Back off on unexpected errors
			continue
		}

		for _, msg := range msgs {
			select {
			case c.msgCh <- msg:
			case <-ctx.Done():
				return
			case <-c.shutdownCh:
				return
			}
		}
	}
}

func (c *Consumer) worker(id int) {
	defer c.wg.Done()
	c.logger.Debug("worker started", "worker_id", id)
	for msg := range c.msgCh {
		c.processMessage(msg)
	}
	c.logger.Debug("worker stopped", "worker_id", id)
}
```
The `Fetch` batch size matches the worker count. This keeps the channel buffer from growing unbounded while ensuring every worker stays fed. The `MaxWait` on `Fetch` prevents tight-looping when the stream is empty.
Graceful Shutdown Integration
Following the patterns from article #14, we integrate with OS signals to drain in-flight work before exiting:
```go
func (c *Consumer) Shutdown(ctx context.Context) error {
	c.logger.Info("shutting down consumer...")

	// Signal the fetch loop to stop
	close(c.shutdownCh)

	// Drain the subscription — no new messages will be delivered
	if c.sub != nil {
		if err := c.sub.Drain(); err != nil {
			c.logger.Error("subscription drain failed", "error", err)
		}
	}

	// Wait for in-flight messages to complete (or context deadline)
	done := make(chan struct{})
	go func() {
		c.wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		c.logger.Info("consumer shutdown complete")
		return nil
	case <-ctx.Done():
		c.logger.Warn("shutdown timed out, some messages may be redelivered")
		return ctx.Err()
	}
}
```
Wire it into main:
```go
func main() {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	cfg := consumer.DefaultConfig()
	cfg.NATSUrl = nats.DefaultURL
	cfg.StreamName = "ORDERS"
	cfg.Subject = "orders.process"
	cfg.Durable = "order-processor"

	store := &RedisIdempotencyStore{client: redis.NewClient(&redis.Options{Addr: "localhost:6379"})}

	c, err := consumer.New(nc, cfg, handleOrder, store)
	if err != nil {
		log.Fatal(err)
	}
	if err := c.Start(ctx); err != nil {
		log.Fatal(err)
	}

	<-ctx.Done()

	shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	c.Shutdown(shutdownCtx)
}

func handleOrder(ctx context.Context, msg consumer.Message) error {
	var order Order
	if err := json.Unmarshal(msg.Data, &order); err != nil {
		return fmt.Errorf("unmarshal order: %w", err) // Will retry, but won't help — consider a non-retryable error type
	}
	// Process the order...
	return nil
}
```
A subtle point: the handleOrder comment hints at an important production refinement. Some errors are non-retryable — bad JSON, invalid business data, schema mismatches. Retrying won't help. A production consumer should distinguish these:
```go
type PermanentError struct {
	Err error
}

func (e *PermanentError) Error() string { return e.Err.Error() }
func (e *PermanentError) Unwrap() error { return e.Err }

// In processMessage, before retry logic:
var permErr *PermanentError
if errors.As(err, &permErr) {
	c.logger.Error("permanent error, sending to DLQ immediately",
		"msg_id", msgID, "error", err,
	)
	c.sendToDLQ(ctx, natsMsg, err)
	natsMsg.Ack()
	return
}
```
Observability: Metrics and Tracing
The metrics struct captures the four golden signals for a queue consumer:
```go
type Metrics struct {
	messagesProcessed  metric.Int64Counter
	processingErrors   metric.Int64Counter
	processingDuration metric.Float64Histogram
	duplicatesSkipped  metric.Int64Counter
	dlqMessages        metric.Int64Counter
	dlqFailures        metric.Int64Counter
	inflightMessages   metric.Int64UpDownCounter
}

func NewMetrics(meter metric.Meter) *Metrics {
	m := &Metrics{}
	m.messagesProcessed, _ = meter.Int64Counter("consumer.messages.processed",
		metric.WithDescription("Total messages successfully processed"))
	m.processingErrors, _ = meter.Int64Counter("consumer.messages.errors",
		metric.WithDescription("Total processing errors"))
	m.processingDuration, _ = meter.Float64Histogram("consumer.messages.duration_seconds",
		metric.WithDescription("Message processing duration"),
		metric.WithExplicitBucketBoundaries(0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30))
	m.duplicatesSkipped, _ = meter.Int64Counter("consumer.messages.duplicates",
		metric.WithDescription("Duplicate messages skipped"))
	m.dlqMessages, _ = meter.Int64Counter("consumer.dlq.sent",
		metric.WithDescription("Messages sent to DLQ"))
	m.dlqFailures, _ = meter.Int64Counter("consumer.dlq.failures",
		metric.WithDescription("Failed DLQ publishes — potential message loss"))
	m.inflightMessages, _ = meter.Int64UpDownCounter("consumer.messages.inflight",
		metric.WithDescription("Currently processing messages"))
	return m
}
```
The `dlq.failures` counter deserves an alert. If this fires, you're losing messages. Set a PagerDuty threshold at `> 0`.
For tracing, the spans we set in processMessage create a trace per message. To connect producer and consumer traces, propagate the trace context through NATS headers:
```go
// Producer side
func publishWithTrace(ctx context.Context, js nats.JetStreamContext, subject string, data []byte) error {
	msg := &nats.Msg{
		Subject: subject,
		Data:    data,
		Header:  nats.Header{},
	}
	otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(msg.Header))
	_, err := js.PublishMsg(msg)
	return err
}

// Consumer side — in processMessage, replace context.Background():
ctx := otel.GetTextMapPropagator().Extract(context.Background(), propagation.HeaderCarrier(natsMsg.Header))
ctx, span := c.tracer.Start(ctx, "process_message", ...)
```
Now your traces show the full journey: API request → publish → consume → process → downstream calls. This is indispensable for debugging latency in async pipelines.
Testing Strategies
Queue consumers are notoriously hard to test. Here's a layered approach.
Unit test the handler in isolation — no queue, no infrastructure:
```go
func TestHandleOrder_Success(t *testing.T) {
	msg := consumer.Message{
		ID:   "test-1",
		Data: []byte(`{"id":"order-123","amount":99.99}`),
	}
	err := handleOrder(context.Background(), msg)
	assert.NoError(t, err)
	// Assert side effects: database writes, API calls, etc.
}

func TestHandleOrder_InvalidJSON(t *testing.T) {
	msg := consumer.Message{
		ID:   "test-2",
		Data: []byte(`not json`),
	}
	err := handleOrder(context.Background(), msg)
	assert.Error(t, err)
	var permErr *consumer.PermanentError
	assert.True(t, errors.As(err, &permErr), "bad JSON should be a permanent error")
}
```
Integration test with an embedded NATS server — tests the full consumer lifecycle:
```go
func TestConsumer_ProcessAndAck(t *testing.T) {
	// Start embedded NATS
	srv, _ := server.NewServer(&server.Options{
		Port:      -1,
		JetStream: true,
		StoreDir:  t.TempDir(),
	})
	go srv.Start()
	defer srv.Shutdown()
	srv.ReadyForConnections(5 * time.Second)

	nc, _ := nats.Connect(srv.ClientURL())
	defer nc.Close()
	js, _ := nc.JetStream()
	consumer.EnsureStreams(js, "TEST", "test.subject")

	processed := make(chan string, 1)
	handler := func(ctx context.Context, msg consumer.Message) error {
		processed <- msg.ID
		return nil
	}

	store := &InMemoryIdempotencyStore{seen: map[string]bool{}}
	cfg := consumer.DefaultConfig()
	cfg.StreamName = "TEST"
	cfg.Subject = "test.subject"
	cfg.Durable = "test-consumer"
	cfg.WorkerCount = 1

	c, _ := consumer.New(nc, cfg, handler, store)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	c.Start(ctx)

	// Publish a message
	js.Publish("test.subject", []byte(`{"key":"value"}`),
		nats.MsgId("msg-001"))

	select {
	case id := <-processed:
		assert.Equal(t, "msg-001", id)
	case <-time.After(5 * time.Second):
		t.Fatal("message not processed within timeout")
	}
}

func TestConsumer_DuplicateSkipped(t *testing.T) {
	// Same setup as above, but pre-mark the message ID
	store := &InMemoryIdempotencyStore{seen: map[string]bool{"msg-001": true}}
	_ = store
	// ... publish msg-001, assert it gets acked but handler is NOT called
}

func TestConsumer_RetryThenDLQ(t *testing.T) {
	// Handler returns error every time.
	// Assert message appears in DLQ stream after MaxRetries.
}
```
Chaos test — kill the consumer mid-processing, restart, and verify no messages are lost:
```go
func TestConsumer_CrashRecovery(t *testing.T) {
	// 1. Publish 100 messages
	// 2. Start consumer, let it process ~50
	// 3. Cancel context (simulates crash)
	// 4. Start a new consumer with the same durable name
	// 5. Wait for all 100 to be processed (some may be processed twice)
	// 6. Assert: handler was called >= 100 times, all message IDs covered
}
```
The in-memory idempotency store for tests:
```go
type InMemoryIdempotencyStore struct {
	mu   sync.Mutex
	seen map[string]bool
}

func (s *InMemoryIdempotencyStore) Exists(_ context.Context, id string) (bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.seen[id], nil
}

func (s *InMemoryIdempotencyStore) Mark(_ context.Context, id string, _ time.Duration) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.seen[id] = true
	return nil
}
```
Production Checklist
Before shipping your consumer:
- [ ] Idempotency — handler is safe to run twice on the same input
- [ ] Permanent vs transient errors — bad data goes straight to DLQ, not through the retry loop
- [ ] Backoff bounds — `MaxBackoff` prevents retry storms; jitter is even better
- [ ] DLQ monitoring — alert on `dlq.sent > 0` and page on `dlq.failures > 0`
- [ ] Graceful shutdown — drain subscription, wait for in-flight, then exit
- [ ] Health check — expose `/healthz` that checks NATS and idempotency store connectivity
- [ ] Consumer lag metric — monitor `consumer.messages.pending` via NATS admin API
- [ ] Resource limits — bound worker count, channel buffer, and message size
- [ ] Trace propagation — connect producer and consumer spans for end-to-end visibility
Wrapping Up
A message queue consumer is deceptively simple to prototype and deceptively hard to get right in production. The patterns in this article — idempotent processing, server-side retry with backoff, dead letter queues, bounded worker pools, graceful shutdown, and full observability — form a foundation that handles the failures real systems encounter.
The complete consumer is around 300 lines of focused Go code. No frameworks, no magic. Just clear concurrency patterns and deliberate error handling. That's the kind of code you want running at 3 AM.
Next in the series, we'll look at building a production-ready rate limiter using Redis and sliding window counters. Stay tuned.
If this article helped you, consider buying me a coffee on Ko-fi! Follow me for more production backend patterns.