DEV Community

Young Gao

Building a Production-Ready Message Queue Consumer in Go

In the previous article, we explored distributed tracing with OpenTelemetry. Today we tackle one of the most critical — and most frequently botched — pieces of backend infrastructure: the message queue consumer.

Every production system eventually outgrows synchronous request-response. Order processing, email delivery, image resizing, webhook fanout — these all belong in a queue. But the consumer side is where things get ugly. Messages arrive out of order. Workers crash mid-processing. Duplicates sneak through. Your "simple consumer" becomes a tangle of retries, panics, and lost messages.

This article builds a production-grade NATS JetStream consumer in Go, piece by piece, covering the patterns that keep it alive at 3 AM.

Why Message Queues Matter

Synchronous architectures hit a wall when:

  • Latency budgets are tight. Your API shouldn't block for 30 seconds while a PDF renders.
  • Failure domains bleed. A downstream service outage shouldn't take your entire API down.
  • Load is bursty. Black Friday traffic shouldn't require Black Friday compute 365 days a year.

Message queues decouple producers from consumers. The producer writes a message and moves on. The consumer processes it at its own pace, retries on failure, and scales independently. This is table stakes for any system that handles real traffic.

We'll use NATS JetStream as our concrete example. It's operationally simpler than RabbitMQ or Kafka, supports persistent streams with at-least-once delivery, and has an excellent Go client. The patterns apply universally.

The Architecture

Here's what we're building:

Producer → NATS JetStream Stream → Consumer Group
                                      ├── Worker Pool (N goroutines)
                                      ├── Retry with Exponential Backoff
                                      ├── Dead Letter Queue
                                      ├── Idempotency Check
                                      └── Metrics + Tracing

Let's define the core interfaces first:

package consumer

import (
    "context"
    "time"
)

// Message represents a queue message with metadata.
type Message struct {
    ID        string
    Subject   string
    Data      []byte
    Headers   map[string]string
    Timestamp time.Time
    Attempt   int
}

// Handler processes a single message. Return an error to trigger retry.
type Handler func(ctx context.Context, msg Message) error

// IdempotencyStore tracks processed message IDs.
type IdempotencyStore interface {
    Exists(ctx context.Context, messageID string) (bool, error)
    Mark(ctx context.Context, messageID string, ttl time.Duration) error
}

Designing Idempotent Consumers

At-least-once delivery means your handler will receive duplicates. Network blips, consumer restarts, rebalancing — all cause redelivery. Your handler must be idempotent: processing the same message twice produces the same result.
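
A toy illustration of the property (hypothetical names, no queue involved): an absolute state transition survives duplicate delivery unchanged, while a relative one silently corrupts data:

```go
package main

import "fmt"

// Order is a minimal stand-in for a persisted record.
type Order struct {
	Status string
	Count  int
}

// markShipped sets an absolute state — running it twice is harmless.
func markShipped(o *Order) { o.Status = "shipped" }

// incrementCount applies a relative change — a duplicate corrupts the value.
func incrementCount(o *Order) { o.Count++ }

func main() {
	o := &Order{}
	markShipped(o)
	markShipped(o) // duplicate delivery: same final state
	incrementCount(o)
	incrementCount(o) // duplicate delivery: Count is now 2 instead of 1
	fmt.Println(o.Status, o.Count)
}
```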

Two strategies work in practice:

1. Deduplication at the consumer level — check a store before processing:

type RedisIdempotencyStore struct {
    client *redis.Client
}

func (s *RedisIdempotencyStore) Exists(ctx context.Context, id string) (bool, error) {
    val, err := s.client.Exists(ctx, "idem:"+id).Result()
    if err != nil {
        return false, fmt.Errorf("idempotency check failed: %w", err)
    }
    return val > 0, nil
}

func (s *RedisIdempotencyStore) Mark(ctx context.Context, id string, ttl time.Duration) error {
    return s.client.Set(ctx, "idem:"+id, "1", ttl).Err()
}

2. Idempotent operations — design your writes so repeats are safe. Use INSERT ... ON CONFLICT DO NOTHING, conditional updates with version checks, or deterministic IDs derived from message content.

The first approach is simpler to bolt on. The second is more robust. Production systems usually combine both: a deduplication layer as a fast path, with idempotent database operations as the safety net.

The TTL on the idempotency key matters. Set it too short, and late redeliveries slip through. Set it too long, and your store grows unbounded. Match it to your stream's max redelivery window — typically 24–72 hours.
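
The second strategy can be sketched like this — a deterministic ID derived from message content, paired with an insert that ignores duplicates. The hash scheme, table, and column names are illustrative assumptions, not a prescribed schema:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// DeterministicID derives a stable message ID from the business content,
// so retried publishes of the same payload carry the same ID.
func DeterministicID(entity, payload string) string {
	sum := sha256.Sum256([]byte(entity + ":" + payload))
	return hex.EncodeToString(sum[:])
}

// insertIgnoreDuplicate is the idempotent write paired with the ID above.
// (Illustrative SQL — adjust the table and columns to your schema.)
const insertIgnoreDuplicate = `
INSERT INTO processed_orders (message_id, payload)
VALUES ($1, $2)
ON CONFLICT (message_id) DO NOTHING`

func main() {
	a := DeterministicID("order", `{"id":"order-123"}`)
	b := DeterministicID("order", `{"id":"order-123"}`)
	fmt.Println(a == b) // same content always yields the same ID
}
```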

The Consumer Core

Now let's build the consumer. The Config struct captures every tunable:

type Config struct {
    // NATS connection
    NATSUrl    string
    StreamName string
    Subject    string
    Durable    string // consumer group name

    // Concurrency
    WorkerCount int

    // Retry
    MaxRetries     int
    InitialBackoff time.Duration
    MaxBackoff     time.Duration
    BackoffFactor  float64

    // Idempotency
    IdempotencyTTL time.Duration

    // Observability
    ServiceName string
}

func DefaultConfig() Config {
    return Config{
        WorkerCount:    10,
        MaxRetries:     5,
        InitialBackoff: 1 * time.Second,
        MaxBackoff:     60 * time.Second,
        BackoffFactor:  2.0,
        IdempotencyTTL: 24 * time.Hour,
        ServiceName:    "queue-consumer",
    }
}

The consumer struct ties everything together:

type Consumer struct {
    cfg        Config
    js         nats.JetStreamContext
    handler    Handler
    idemStore  IdempotencyStore
    metrics    *Metrics
    tracer     trace.Tracer
    logger     *slog.Logger
    sub        *nats.Subscription
    wg         sync.WaitGroup
    msgCh      chan *nats.Msg
    shutdownCh chan struct{}
}

func New(nc *nats.Conn, cfg Config, handler Handler, idemStore IdempotencyStore) (*Consumer, error) {
    js, err := nc.JetStream()
    if err != nil {
        return nil, fmt.Errorf("jetstream init: %w", err)
    }

    tp := otel.GetTracerProvider()
    meter := otel.GetMeterProvider().Meter(cfg.ServiceName)

    return &Consumer{
        cfg:        cfg,
        js:         js,
        handler:    handler,
        idemStore:  idemStore,
        metrics:    NewMetrics(meter),
        tracer:     tp.Tracer(cfg.ServiceName),
        logger:     slog.Default(),
        msgCh:      make(chan *nats.Msg, cfg.WorkerCount*2),
        shutdownCh: make(chan struct{}),
    }, nil
}

Retry with Exponential Backoff

The retry logic lives in the message processing path. NATS JetStream supports server-side NakWithDelay, which tells the server to redeliver after a specified duration. This is vastly superior to client-side retry loops because it survives consumer restarts:

func (c *Consumer) processMessage(natsMsg *nats.Msg) {
    ctx, span := c.tracer.Start(context.Background(), "process_message",
        trace.WithAttributes(
            attribute.String("subject", natsMsg.Subject),
        ),
    )
    defer span.End()

    meta, err := natsMsg.Metadata()
    if err != nil {
        c.logger.Error("failed to read metadata", "error", err)
        natsMsg.Nak()
        return
    }

    attempt := int(meta.NumDelivered)
    msgID := natsMsg.Header.Get("Nats-Msg-Id")
    if msgID == "" {
        msgID = fmt.Sprintf("%s-%d", meta.Stream, meta.Sequence.Stream)
    }

    span.SetAttributes(
        attribute.String("message_id", msgID),
        attribute.Int("attempt", attempt),
    )

    // Idempotency check
    if exists, err := c.idemStore.Exists(ctx, msgID); err != nil {
        c.logger.Error("idempotency check failed", "error", err, "msg_id", msgID)
        span.RecordError(err)
        // Retry — the store might be temporarily down
        c.nakWithBackoff(natsMsg, attempt)
        return
    } else if exists {
        c.logger.Debug("duplicate message, skipping", "msg_id", msgID)
        c.metrics.duplicatesSkipped.Add(ctx, 1)
        natsMsg.Ack()
        return
    }

    msg := Message{
        ID:        msgID,
        Subject:   natsMsg.Subject,
        Data:      natsMsg.Data,
        Headers:   flattenHeaders(natsMsg.Header),
        Timestamp: meta.Timestamp,
        Attempt:   attempt,
    }

    // Process
    start := time.Now()
    err = c.handler(ctx, msg)
    duration := time.Since(start)

    c.metrics.processingDuration.Record(ctx, duration.Seconds(),
        metric.WithAttributes(attribute.Bool("success", err == nil)),
    )

    if err != nil {
        span.RecordError(err)
        span.SetStatus(codes.Error, err.Error())
        c.metrics.processingErrors.Add(ctx, 1)

        if attempt >= c.cfg.MaxRetries {
            c.logger.Error("max retries exceeded, sending to DLQ",
                "msg_id", msgID, "error", err, "attempts", attempt,
            )
            c.sendToDLQ(ctx, natsMsg, err)
            natsMsg.Ack() // Ack original to stop redelivery
            return
        }

        c.logger.Warn("processing failed, scheduling retry",
            "msg_id", msgID, "error", err, "attempt", attempt,
        )
        c.nakWithBackoff(natsMsg, attempt)
        return
    }

    // Success — mark as processed and ack
    if err := c.idemStore.Mark(ctx, msgID, c.cfg.IdempotencyTTL); err != nil {
        c.logger.Error("failed to mark idempotency", "error", err, "msg_id", msgID)
        // Don't fail the message — it was processed successfully.
        // Worst case: a duplicate gets processed again idempotently.
    }

    c.metrics.messagesProcessed.Add(ctx, 1)
    natsMsg.Ack()
}

func (c *Consumer) nakWithBackoff(msg *nats.Msg, attempt int) {
    delay := c.cfg.InitialBackoff
    for i := 1; i < attempt; i++ {
        delay = time.Duration(float64(delay) * c.cfg.BackoffFactor)
        if delay > c.cfg.MaxBackoff {
            delay = c.cfg.MaxBackoff
            break
        }
    }
    msg.NakWithDelay(delay)
}

Key decisions here:

  • Server-side retry via NakWithDelay — the consumer doesn't hold the message during backoff. If this consumer dies, another picks it up.
  • Ack after DLQ publish — we ack the original message to stop the redelivery loop. The DLQ is the new source of truth.
  • Idempotency mark happens after success — if the mark fails, the message might be reprocessed. That's fine because operations are idempotent.

Dead Letter Queue

Messages that fail all retries go to a separate stream. Your ops team reviews them, fixes the bug, and replays:

func (c *Consumer) sendToDLQ(ctx context.Context, original *nats.Msg, processErr error) {
    _, span := c.tracer.Start(ctx, "send_to_dlq")
    defer span.End()

    headers := nats.Header{}
    // Preserve original headers
    for k, v := range original.Header {
        headers[k] = v
    }
    headers.Set("X-DLQ-Error", processErr.Error())
    headers.Set("X-DLQ-Timestamp", time.Now().UTC().Format(time.RFC3339))
    headers.Set("X-Original-Subject", original.Subject)

    dlqMsg := &nats.Msg{
        Subject: fmt.Sprintf("dlq.%s", c.cfg.Subject),
        Data:    original.Data,
        Header:  headers,
    }

    if _, err := c.js.PublishMsg(dlqMsg); err != nil {
        c.logger.Error("failed to publish to DLQ", "error", err)
        span.RecordError(err)
        c.metrics.dlqFailures.Add(ctx, 1)
        // This is bad. The message will be lost after ack.
        // In practice, also log the full message body for manual recovery.
        c.logger.Error("LOST MESSAGE — manual recovery required",
            "subject", original.Subject,
            "data", string(original.Data),
        )
        return // don't count a failed publish as a delivered DLQ message
    }
    c.metrics.dlqMessages.Add(ctx, 1)
}

Create the DLQ stream during setup:

func EnsureStreams(js nats.JetStreamContext, streamName, subject string) error {
    // Main stream
    _, err := js.AddStream(&nats.StreamConfig{
        Name:      streamName,
        Subjects:  []string{subject},
        Retention: nats.WorkQueuePolicy,
        MaxAge:    72 * time.Hour,
    })
    if err != nil {
        return fmt.Errorf("create main stream: %w", err)
    }

    // DLQ stream
    _, err = js.AddStream(&nats.StreamConfig{
        Name:      streamName + "_dlq",
        Subjects:  []string{"dlq." + subject},
        Retention: nats.LimitsPolicy,
        MaxAge:    30 * 24 * time.Hour, // Keep DLQ messages for 30 days
    })
    if err != nil {
        return fmt.Errorf("create DLQ stream: %w", err)
    }
    return nil
}

Concurrency Control with Worker Pools

A single goroutine consuming messages wastes most of its time waiting on I/O. A worker pool lets you process N messages concurrently while keeping backpressure under control:

func (c *Consumer) Start(ctx context.Context) error {
    sub, err := c.js.PullSubscribe(
        c.cfg.Subject,
        c.cfg.Durable,
        nats.ManualAck(),
        nats.AckWait(30*time.Second),
        nats.MaxDeliver(c.cfg.MaxRetries+1),
    )
    if err != nil {
        return fmt.Errorf("subscribe: %w", err)
    }
    c.sub = sub

    // Start workers
    for i := 0; i < c.cfg.WorkerCount; i++ {
        c.wg.Add(1)
        go c.worker(i)
    }

    // Fetch loop — pulls messages from NATS and fans out to workers
    c.wg.Add(1)
    go c.fetchLoop(ctx)

    c.logger.Info("consumer started",
        "workers", c.cfg.WorkerCount,
        "subject", c.cfg.Subject,
        "durable", c.cfg.Durable,
    )
    return nil
}

func (c *Consumer) fetchLoop(ctx context.Context) {
    defer c.wg.Done()
    defer close(c.msgCh)

    for {
        select {
        case <-ctx.Done():
            return
        case <-c.shutdownCh:
            return
        default:
        }

        msgs, err := c.sub.Fetch(c.cfg.WorkerCount, nats.MaxWait(5*time.Second))
        if err != nil {
            if errors.Is(err, nats.ErrTimeout) {
                continue // No messages available, poll again
            }
            c.logger.Error("fetch error", "error", err)
            time.Sleep(time.Second) // Back off on unexpected errors
            continue
        }

        for _, msg := range msgs {
            select {
            case c.msgCh <- msg:
            case <-ctx.Done():
                return
            case <-c.shutdownCh:
                return
            }
        }
    }
}

func (c *Consumer) worker(id int) {
    defer c.wg.Done()

    c.logger.Debug("worker started", "worker_id", id)
    for msg := range c.msgCh {
        c.processMessage(msg)
    }
    c.logger.Debug("worker stopped", "worker_id", id)
}

The Fetch batch size matches the worker count. This keeps the channel buffer from growing unbounded while ensuring every worker stays fed. The MaxWait on Fetch prevents tight-looping when the stream is empty.

Graceful Shutdown Integration

Following the patterns from article #14, we integrate with OS signals to drain in-flight work before exiting:

func (c *Consumer) Shutdown(ctx context.Context) error {
    c.logger.Info("shutting down consumer...")

    // Signal the fetch loop to stop
    close(c.shutdownCh)

    // Drain the subscription — no new messages will be delivered
    if c.sub != nil {
        if err := c.sub.Drain(); err != nil {
            c.logger.Error("subscription drain failed", "error", err)
        }
    }

    // Wait for in-flight messages to complete (or context deadline)
    done := make(chan struct{})
    go func() {
        c.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        c.logger.Info("consumer shutdown complete")
        return nil
    case <-ctx.Done():
        c.logger.Warn("shutdown timed out, some messages may be redelivered")
        return ctx.Err()
    }
}

Wire it into main:

func main() {
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    cfg := consumer.DefaultConfig()
    cfg.NATSUrl = nats.DefaultURL
    cfg.StreamName = "ORDERS"
    cfg.Subject = "orders.process"
    cfg.Durable = "order-processor"

    store := &RedisIdempotencyStore{client: redis.NewClient(&redis.Options{Addr: "localhost:6379"})}

    c, err := consumer.New(nc, cfg, handleOrder, store)
    if err != nil {
        log.Fatal(err)
    }

    if err := c.Start(ctx); err != nil {
        log.Fatal(err)
    }

    <-ctx.Done()

    shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    c.Shutdown(shutdownCtx)
}

func handleOrder(ctx context.Context, msg consumer.Message) error {
    var order Order
    if err := json.Unmarshal(msg.Data, &order); err != nil {
        return fmt.Errorf("unmarshal order: %w", err) // Will retry, but won't help — consider a non-retryable error type
    }
    // Process the order...
    return nil
}

A subtle point: the handleOrder comment hints at an important production refinement. Some errors are non-retryable — bad JSON, invalid business data, schema mismatches. Retrying won't help. A production consumer should distinguish these:

type PermanentError struct {
    Err error
}

func (e *PermanentError) Error() string { return e.Err.Error() }
func (e *PermanentError) Unwrap() error { return e.Err }

// In processMessage, before retry logic:
var permErr *PermanentError
if errors.As(err, &permErr) {
    c.logger.Error("permanent error, sending to DLQ immediately",
        "msg_id", msgID, "error", err,
    )
    c.sendToDLQ(ctx, natsMsg, err)
    natsMsg.Ack()
    return
}

Observability: Metrics and Tracing

The metrics struct captures the four golden signals for a queue consumer:

type Metrics struct {
    messagesProcessed  metric.Int64Counter
    processingErrors   metric.Int64Counter
    processingDuration metric.Float64Histogram
    duplicatesSkipped  metric.Int64Counter
    dlqMessages        metric.Int64Counter
    dlqFailures        metric.Int64Counter
    inflightMessages   metric.Int64UpDownCounter
}

func NewMetrics(meter metric.Meter) *Metrics {
    m := &Metrics{}
    m.messagesProcessed, _ = meter.Int64Counter("consumer.messages.processed",
        metric.WithDescription("Total messages successfully processed"))
    m.processingErrors, _ = meter.Int64Counter("consumer.messages.errors",
        metric.WithDescription("Total processing errors"))
    m.processingDuration, _ = meter.Float64Histogram("consumer.messages.duration_seconds",
        metric.WithDescription("Message processing duration"),
        metric.WithExplicitBucketBoundaries(0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30))
    m.duplicatesSkipped, _ = meter.Int64Counter("consumer.messages.duplicates",
        metric.WithDescription("Duplicate messages skipped"))
    m.dlqMessages, _ = meter.Int64Counter("consumer.dlq.sent",
        metric.WithDescription("Messages sent to DLQ"))
    m.dlqFailures, _ = meter.Int64Counter("consumer.dlq.failures",
        metric.WithDescription("Failed DLQ publishes — potential message loss"))
    m.inflightMessages, _ = meter.Int64UpDownCounter("consumer.messages.inflight",
        metric.WithDescription("Currently processing messages"))
    return m
}

The dlq.failures counter deserves an alert. If this fires, you're losing messages. Set a PagerDuty threshold at > 0.

For tracing, the spans we set in processMessage create a trace per message. To connect producer and consumer traces, propagate the trace context through NATS headers:

// Producer side
func publishWithTrace(ctx context.Context, js nats.JetStreamContext, subject string, data []byte) error {
    msg := &nats.Msg{
        Subject: subject,
        Data:    data,
        Header:  nats.Header{},
    }
    otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(msg.Header))
    _, err := js.PublishMsg(msg)
    return err
}

// Consumer side — in processMessage, replace context.Background():
ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(natsMsg.Header))
ctx, span := c.tracer.Start(ctx, "process_message", ...)

Now your traces show the full journey: API request → publish → consume → process → downstream calls. This is indispensable for debugging latency in async pipelines.

Testing Strategies

Queue consumers are notoriously hard to test. Here's a layered approach.

Unit test the handler in isolation — no queue, no infrastructure:

func TestHandleOrder_Success(t *testing.T) {
    msg := consumer.Message{
        ID:   "test-1",
        Data: []byte(`{"id":"order-123","amount":99.99}`),
    }

    err := handleOrder(context.Background(), msg)
    assert.NoError(t, err)
    // Assert side effects: database writes, API calls, etc.
}

func TestHandleOrder_InvalidJSON(t *testing.T) {
    msg := consumer.Message{
        ID:   "test-2",
        Data: []byte(`not json`),
    }

    err := handleOrder(context.Background(), msg)
    assert.Error(t, err)

    var permErr *consumer.PermanentError
    assert.True(t, errors.As(err, &permErr), "bad JSON should be a permanent error")
}

Integration test with an embedded NATS server — tests the full consumer lifecycle:

func TestConsumer_ProcessAndAck(t *testing.T) {
    // Start embedded NATS
    srv, _ := server.NewServer(&server.Options{
        Port:      -1,
        JetStream: true,
        StoreDir:  t.TempDir(),
    })
    go srv.Start()
    defer srv.Shutdown()
    srv.ReadyForConnections(5 * time.Second)

    nc, _ := nats.Connect(srv.ClientURL())
    defer nc.Close()

    js, _ := nc.JetStream()
    consumer.EnsureStreams(js, "TEST", "test.subject")

    processed := make(chan string, 1)
    handler := func(ctx context.Context, msg consumer.Message) error {
        processed <- msg.ID
        return nil
    }

    store := &InMemoryIdempotencyStore{seen: map[string]bool{}}
    cfg := consumer.DefaultConfig()
    cfg.StreamName = "TEST"
    cfg.Subject = "test.subject"
    cfg.Durable = "test-consumer"
    cfg.WorkerCount = 1

    c, _ := consumer.New(nc, cfg, handler, store)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    c.Start(ctx)

    // Publish a message
    js.Publish("test.subject", []byte(`{"key":"value"}`),
        nats.MsgId("msg-001"))

    select {
    case id := <-processed:
        assert.Equal(t, "msg-001", id)
    case <-time.After(5 * time.Second):
        t.Fatal("message not processed within timeout")
    }
}

func TestConsumer_DuplicateSkipped(t *testing.T) {
    // Same setup as above, but pre-mark the message ID
    store := &InMemoryIdempotencyStore{seen: map[string]bool{"msg-001": true}}
    // ... publish msg-001, assert it gets acked but handler is NOT called
}

func TestConsumer_RetryThenDLQ(t *testing.T) {
    // Handler returns error every time.
    // Assert message appears in DLQ stream after MaxRetries.
}

Chaos test — kill the consumer mid-processing, restart, and verify no messages are lost:

func TestConsumer_CrashRecovery(t *testing.T) {
    // 1. Publish 100 messages
    // 2. Start consumer, let it process ~50
    // 3. Cancel context (simulates crash)
    // 4. Start a new consumer with the same durable name
    // 5. Wait for all 100 to be processed (some may be processed twice)
    // 6. Assert: handler was called >= 100 times, all message IDs covered
}

The in-memory idempotency store for tests:

type InMemoryIdempotencyStore struct {
    mu   sync.Mutex
    seen map[string]bool
}

func (s *InMemoryIdempotencyStore) Exists(_ context.Context, id string) (bool, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.seen[id], nil
}

func (s *InMemoryIdempotencyStore) Mark(_ context.Context, id string, _ time.Duration) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.seen[id] = true
    return nil
}

Production Checklist

Before shipping your consumer:

  • [ ] Idempotency — handler is safe to run twice on the same input
  • [ ] Permanent vs transient errors — bad data goes straight to DLQ, not through the retry loop
  • [ ] Backoff bounds — MaxBackoff prevents retry storms; jitter is even better
  • [ ] DLQ monitoring — alert on dlq.sent > 0 and page on dlq.failures > 0
  • [ ] Graceful shutdown — drain subscription, wait for in-flight, then exit
  • [ ] Health check — expose /healthz that checks NATS and idempotency store connectivity
  • [ ] Consumer lag metric — monitor consumer.messages.pending via NATS admin API
  • [ ] Resource limits — bound worker count, channel buffer, and message size
  • [ ] Trace propagation — connect producer and consumer spans for end-to-end visibility

Wrapping Up

A message queue consumer is deceptively simple to prototype and deceptively hard to get right in production. The patterns in this article — idempotent processing, server-side retry with backoff, dead letter queues, bounded worker pools, graceful shutdown, and full observability — form a foundation that handles the failures real systems encounter.

The complete consumer is around 300 lines of focused Go code. No frameworks, no magic. Just clear concurrency patterns and deliberate error handling. That's the kind of code you want running at 3 AM.

Next in the series, we'll look at building a production-ready rate limiter using Redis and sliding window counters. Stay tuned.


If this article helped you, consider buying me a coffee on Ko-fi! Follow me for more production backend patterns.
