Gabriel Anhaia

A Worker Pool That Actually Drains on Shutdown: A Pattern for Go Services


A team I worked with ran a Go service that consumed a queue, did some computation per message, and wrote results back to Postgres. The shutdown code looked clean. Channel close, wg.Wait(), exit zero. They had an integration test that drained the queue and confirmed every message was processed.

Then Kubernetes rolled the deployment.

About one in fifty messages came back later with a duplicate-key error from Postgres. Some never came back at all. The integration test still passed. The on-call engineer spent a Saturday correlating pod-termination timestamps against message IDs before the shape became clear: every lost or duplicated message was in flight at the moment SIGTERM arrived. The clean shutdown path was fine. The mid-flight shutdown path did not exist.

This is one of the most common Go-service bugs in production code, and the fix is small. You need a worker pool that knows the difference between "the input is done" and "the platform is killing us in about thirty seconds". Both are shutdown. The pool has to handle both.

What wg.Wait() actually promises

The textbook worker pool looks like this:

func run(jobs <-chan Job, n int) {
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := range jobs {
                process(j)
            }
        }()
    }
    wg.Wait()
}

It works for one shape of shutdown: somebody upstream closes jobs, every worker drains the channel to empty, every worker returns, Wait returns. Clean.

It does nothing for SIGTERM. The signal handler usually sits in main, not in run. When SIGTERM arrives, main typically calls some Stop() method or cancels a context the workers do not look at. The workers keep pulling from jobs. The grace period elapses and the kubelet sends SIGKILL. Whatever was inside process(j) at that moment is gone, and the queue redelivers it because it never got acked.

The fix is not "add a context check inside process". You need that, but it is not enough. You need a top-level shape where the pool itself owns three states: accepting work, draining in-flight work, force-cancelled.

The three states a real pool needs

Stop accepting new work. Wait for in-flight items to finish. Give up after a deadline.

Each state has a clear trigger:

  • Accepting flips to draining when SIGTERM arrives or the supervisor calls Shutdown.
  • Draining flips to cancelled when either the in-flight workers finish (good) or the deadline passes (bad).

The pool exposes one method that does this: Shutdown(ctx). The caller passes a context with whatever grace period the platform gave them. Kubernetes defaults to 30 seconds (terminationGracePeriodSeconds); ECS defaults to 30 seconds (stopTimeout); systemd defaults to 90 (DefaultTimeoutStopSec). Whatever your number is, that goes on the context.

A pool that drains

Here is the whole thing: the pool code plus a small main to wire it up.

package pool

import (
    "context"
    "sync"

    "golang.org/x/sync/errgroup"
)

type Job func(ctx context.Context) error

type Pool struct {
    jobs     chan Job
    shutting chan struct{}
    eg       *errgroup.Group
    egCtx    context.Context
    cancel   context.CancelFunc

    mu       sync.RWMutex // guards the close of jobs against in-flight Submits
    shutOnce sync.Once
}

func New(parent context.Context, workers, buf int) *Pool {
    ctx, cancel := context.WithCancel(parent)
    eg, egCtx := errgroup.WithContext(ctx)
    p := &Pool{
        jobs:     make(chan Job, buf),
        shutting: make(chan struct{}),
        eg:       eg,
        egCtx:    egCtx,
        cancel:   cancel,
    }
    for i := 0; i < workers; i++ {
        eg.Go(p.worker)
    }
    return p
}

func (p *Pool) worker() error {
    for j := range p.jobs {
        if err := j(p.egCtx); err != nil {
            return err
        }
    }
    return nil
}

func (p *Pool) Submit(ctx context.Context, j Job) error {
    p.mu.RLock()
    defer p.mu.RUnlock()

    // Check shutting before touching jobs. Holding the read lock means
    // Shutdown cannot close jobs between this check and the send below,
    // so the send case can never hit a closed channel.
    select {
    case <-p.shutting:
        return context.Canceled
    default:
    }

    select {
    case <-p.shutting:
        return context.Canceled
    case <-p.egCtx.Done():
        return p.egCtx.Err()
    case <-ctx.Done():
        return ctx.Err()
    case p.jobs <- j:
        return nil
    }
}

func (p *Pool) Shutdown(ctx context.Context) error {
    p.shutOnce.Do(func() {
        // Refuse new submitters first, then close jobs under the write
        // lock so no Submit is mid-send when the channel closes.
        close(p.shutting)
        p.mu.Lock()
        close(p.jobs)
        p.mu.Unlock()
    })

    done := make(chan error, 1)
    go func() { done <- p.eg.Wait() }()

    select {
    case err := <-done:
        return err
    case <-ctx.Done():
        p.cancel()
        return ctx.Err()
    }
}

Read it from the outside in. Submit either places a job on the buffered channel or refuses if the pool is shutting down or the caller's context is cancelled. The subtle part is the close: select does not prioritize one ready case over another, so a Submit that reaches case p.jobs <- j: after close(p.jobs) panics with a send on a closed channel. Shutdown therefore closes shutting first, so pending submitters refuse promptly, and the pool guarantees no submitter can be mid-send at the moment jobs closes.

Shutdown does three things in order. It closes shutting (new submitters refuse immediately), then closes jobs exactly once, which lets every worker's for j := range p.jobs loop exit naturally after the buffer drains. It then waits for the errgroup to finish. The wait runs in its own goroutine because Shutdown itself has to honour the deadline on ctx. If that deadline fires before workers finish, p.cancel() cancels egCtx, every job that is checking ctx inside Job now sees Done(), and the workers can bail out fast. Shutdown returns ctx.Err() on the timeout path; the caller is expected to exit the process shortly after, which lets the runtime tear down any goroutines still wedged on uncancellable I/O.

The errgroup is doing real work here. If a job returns an error, errgroup.WithContext cancels egCtx for every other job, so a poisoned job kills the pool fast. That is what you want in production.

Wiring it into a real service

The signal handler sits in main. It owns the deadline.

func main() {
    ctx, cancel := signal.NotifyContext(
        context.Background(),
        syscall.SIGTERM, syscall.SIGINT,
    )
    defer cancel()

    // Not ctx: SIGTERM must stop intake, not yank the context out from
    // under in-flight jobs. Only Shutdown's deadline cancels those.
    p := pool.New(context.Background(), 8, 32)

    go feed(ctx, p) // feed pulls from the broker and calls p.Submit

    <-ctx.Done()

    grace, cancelG := context.WithTimeout(
        context.Background(), 25*time.Second,
    )
    defer cancelG()

    if err := p.Shutdown(grace); err != nil {
        slog.Error("drain incomplete", "err", err)
        cancelG()
        return
    }
}

The shape worth noticing: signal.NotifyContext cancels the parent context the moment SIGTERM lands. The feeder loop and any Submit calls see that and stop adding work. The grace context is a fresh context derived from Background, not from ctx. If you derive it from the cancelled ctx, your "deadline" is already done, the drain returns instantly, and you ship the same bug. Note also that the failure path returns rather than calling os.Exit(1), because os.Exit skips deferred cancelG().

The grace value is whatever your platform promises minus a buffer for the rest of shutdown. Kubernetes' default terminationGracePeriodSeconds is 30; reserving five seconds for connection close, log flush, and the kubelet's SIGKILL leaves 25 for the pool. Tune to your numbers.

Testing the shutdown path

The reason this bug ships is that nobody tests it. The standard pool test submits N jobs, closes the channel, and verifies all N ran. That tests the clean path. The mid-flight path needs a different fixture, and the cancel path needs a third one.

func TestShutdownDrainsInFlight(t *testing.T) {
    p := pool.New(context.Background(), 4, 8)

    started := make(chan struct{}, 4)
    release := make(chan struct{})
    var done atomic.Int32

    for i := 0; i < 4; i++ {
        err := p.Submit(
            context.Background(),
            func(ctx context.Context) error {
                started <- struct{}{}
                select {
                case <-release:
                    done.Add(1)
                    return nil
                case <-ctx.Done():
                    return ctx.Err()
                }
            },
        )
        if err != nil {
            t.Fatal(err)
        }
    }

    for i := 0; i < 4; i++ {
        <-started
    }

    shutdownDone := make(chan error, 1)
    go func() {
        ctx, c := context.WithTimeout(
            context.Background(),
            500*time.Millisecond,
        )
        defer c()
        shutdownDone <- p.Shutdown(ctx)
    }()

    close(release)

    if err := <-shutdownDone; err != nil {
        t.Fatalf("drain: %v", err)
    }
    if got := done.Load(); got != 4 {
        t.Fatalf("want 4 done, got %d", got)
    }
}

The fixture pins four workers in a job that blocks on release or honours ctx. Shutdown runs concurrently with a 500ms deadline. We unblock the workers right after; they finish, Shutdown returns nil, all four jobs incremented done. The cancel path is the same fixture, run without unblocking release:

func TestShutdownCancelsOnDeadline(t *testing.T) {
    p := pool.New(context.Background(), 4, 8)

    started := make(chan struct{}, 4)
    release := make(chan struct{})
    var done atomic.Int32

    for i := 0; i < 4; i++ {
        err := p.Submit(
            context.Background(),
            func(ctx context.Context) error {
                started <- struct{}{}
                select {
                case <-release:
                    done.Add(1)
                    return nil
                case <-ctx.Done():
                    return ctx.Err()
                }
            },
        )
        if err != nil {
            t.Fatal(err)
        }
    }
    for i := 0; i < 4; i++ {
        <-started
    }

    ctx, c := context.WithTimeout(
        context.Background(),
        100*time.Millisecond,
    )
    defer c()

    if err := p.Shutdown(ctx); err != context.DeadlineExceeded {
        t.Fatalf("want DeadlineExceeded, got %v", err)
    }
    if got := done.Load(); got != 0 {
        t.Fatalf("want 0 done, got %d", got)
    }
}

Because the job honours ctx, when Shutdown's deadline fires and p.cancel() runs, every worker sees ctx.Done() and returns. The cancel path returns context.DeadlineExceeded, done stays at zero, and no goroutine is leaked.

That second test is the one most teams skip. It is the one that actually proves the deadline works.

Why this lives at the architectural seam

Graceful shutdown is not a worker-pool concern alone. The HTTP server has its own Shutdown(ctx). The database pool has its own close. The message broker has its own ack. Each one needs the same grace context, derived from the same SIGTERM moment, and they need to drain in the right order: stop pulling from the broker before you stop processing, stop accepting HTTP before you close the DB.

In a hexagonal layout, that orchestration sits at the application boundary. The domain does not know about SIGTERM. The adapters do. The composition root in main is the only place that has both the grace deadline and the list of every resource that owns a drain. Everything else exposes a Shutdown(ctx) method that obeys the deadline it is handed.

The reason you feel the pain when you skip this is that Postgres rolls back uncommitted transactions on connection close, the broker redelivers unacked messages, and the load balancer sometimes has not finished evicting your pod before the platform pulls the plug. Those are exactly the contracts those systems advertise. The gap is your code not understanding how much time it has and what to do with it.

Drain on purpose, with a deadline, and with a test that proves the deadline works.


If this saved you a Saturday

Graceful shutdown sits at the seam between the platform, the adapters, and the domain. That kind of cross-layer concern gets messy in a service without explicit boundaries. Hexagonal Architecture in Go walks through where these orchestration responsibilities live, why the composition root in main is the only place that should know about SIGTERM, and how to keep your domain code free of platform concerns while still draining cleanly when the platform asks.

If your day job is shipping Go alongside an AI coding assistant, Hermes IDE is the editor I build for that workflow. It is built for the loop where the AI is reading and editing your Go code with you, not at you.

Thinking in Go — the 2-book series on Go programming and hexagonal architecture
