DEV Community

Serif COLAKEL


Go Concurrency Patterns: Worker Pool, Fan-In / Fan-Out & Pipeline

Concurrency is one of Go’s killer features. But writing a bunch of goroutines and channels is one thing — doing it in a maintainable, efficient, and safe way is another. In this article, we’ll explore three core concurrency patterns in Go:

  • Worker Pool
  • Fan-In / Fan-Out
  • Pipeline

By the end, you’ll see how to combine them to build robust, high-performance concurrent systems — and avoid common pitfalls along the way.


Why Patterns Matter

When you just “sprinkle goroutines everywhere,” you might face:

  • Resource explosion (too many goroutines)
  • Uncontrolled concurrency leading to contention
  • Complex coordination logic
  • Deadlocks, goroutine leaks, race conditions

Patterns give structure. They let you reason about data flow, lifetime, synchronization, and cancellation more clearly. They also help in composing concurrency primitives in scalable ways.


Background: Go Primitives Recap

Before diving in, a quick refresher:

  • Goroutine: a lightweight thread managed by the Go runtime, which multiplexes many goroutines onto a small number of OS threads
  • Channel: typed conduit for communication between goroutines
  • select: lets you wait on multiple channel operations
  • sync.WaitGroup: wait for a set of goroutines to finish
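  • close(channel): signals that no more values will be sent; receivers can still drain buffered values, and a range loop exits once the channel is closed and empty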

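Also note that a channel should be closed at most once, and only by the sending side. Closing is not required for cleanup; it is only a signal to receivers that no more values are coming. Closing an already-closed channel, closing a nil channel, or sending on a closed channel all cause a runtime panic.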
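These receive-side rules are easy to check directly. This minimal sketch shows that buffered values are still delivered after close, that a receive on a closed and drained channel returns the zero value with ok == false, and that a second close panics. The helpers drain and closeTwice are illustrative names, not library functions.

```go
package main

import "fmt"

// drain receives every value from ch until it is closed and empty.
func drain(ch <-chan int) []int {
	var got []int
	for v := range ch { // the loop exits once ch is closed and drained
		got = append(got, v)
	}
	return got
}

// closeTwice closes a channel twice and returns the recovered panic message.
func closeTwice() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch := make(chan int)
	close(ch)
	close(ch) // panics: close of closed channel
	return ""
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	close(ch) // buffered values are still delivered after close

	fmt.Println(drain(ch)) // [1 2]

	v, ok := <-ch // a receive on a closed, empty channel never blocks
	fmt.Println(v, ok) // 0 false

	fmt.Println(closeTwice()) // close of closed channel
}
```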


Worker Pool Pattern

What & Why

A worker pool is about controlling concurrency. Instead of spawning a new goroutine per task, you have N workers (fixed count) that pull tasks from a queue. This ensures:

  • Bounded concurrency
  • Better resource usage
  • Easier error handling & shutdown

Implementation Sketch

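package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID      int
    Payload interface{}
}

// worker processes jobs until the jobs channel is closed, then signals wg.
func worker(id int, jobs <-chan Job, results chan<- interface{}, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        // process job
        fmt.Printf("worker %d: processing job %d\n", id, job.ID)
        // simulate work
        time.Sleep(100 * time.Millisecond)
        // send result
        results <- fmt.Sprintf("job %d done by worker %d", job.ID, id)
    }
}

// RunWorkerPool runs jobsList on numWorkers workers and returns all results
// in completion order (not submission order).
func RunWorkerPool(numWorkers int, jobsList []Job) []interface{} {
    jobs := make(chan Job)
    results := make(chan interface{})
    var wg sync.WaitGroup

    // start workers
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    // feed jobs, then close the channel so the workers' range loops end
    go func() {
        defer close(jobs)
        for _, job := range jobsList {
            jobs <- job
        }
    }()

    // close results once every worker has returned,
    // so the collection loop below terminates
    go func() {
        wg.Wait()
        close(results)
    }()

    // collect results in the calling goroutine
    var out []interface{}
    for res := range results {
        out = append(out, res)
    }
    return out
}

func main() {
    jobsList := make([]Job, 5)
    for i := range jobsList {
        jobsList[i] = Job{ID: i + 1}
    }
    for _, r := range RunWorkerPool(3, jobsList) {
        fmt.Println(r)
    }
}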

Notes & Considerations

  • Always close the jobs channel once you’re done sending.
  • Use WaitGroup to know when all workers finish.
  • Close the results channel only after all workers are done.
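  • Be careful: an unrecovered panic inside a worker crashes the entire program, not just that goroutine. Recover inside the worker and report the failure, or return errors through a results/error channel instead of panicking.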
  • You might want bounded job queues (buffered channels) to handle load spikes.
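One way to act on the last two notes is to give every job a result that can carry an error, convert a panic inside the work function into that error, and use buffered channels for the queue. This is a sketch under assumptions: process is a hypothetical work function that panics on negative input to simulate a bug, and Result and RunPool are illustrative names.

```go
package main

import (
	"fmt"
	"sync"
)

// Result pairs a job ID with either a value or an error.
type Result struct {
	JobID int
	Value int
	Err   error
}

// process is a hypothetical unit of work; it panics on negative input
// to simulate a bug inside a worker.
func process(n int) int {
	if n < 0 {
		panic("negative input")
	}
	return n * n
}

// safeProcess calls process and converts a panic into an ordinary error,
// so one bad job cannot crash the whole program.
func safeProcess(n int) (v int, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered: %v", r)
		}
	}()
	return process(n), nil
}

// RunPool processes inputs on numWorkers workers and returns one Result
// per input, in completion order.
func RunPool(numWorkers int, inputs []int) []Result {
	jobs := make(chan int, len(inputs))       // buffered queue sized to hold every job ID
	results := make(chan Result, len(inputs)) // room for every result, so workers never block

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range jobs {
				v, err := safeProcess(inputs[id])
				results <- Result{JobID: id, Value: v, Err: err}
			}
		}()
	}

	for id := range inputs {
		jobs <- id // the job ID is the index into inputs
	}
	close(jobs)

	wg.Wait()
	close(results)

	var out []Result
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	for _, r := range RunPool(2, []int{3, -1, 4}) {
		if r.Err != nil {
			fmt.Printf("job %d failed: %v\n", r.JobID, r.Err)
			continue
		}
		fmt.Printf("job %d = %d\n", r.JobID, r.Value)
	}
}
```

Because both channels are buffered to len(inputs), the caller can enqueue everything and wait for the workers without a separate feeder goroutine. For a streaming or unbounded input, the feeder-goroutine shape of RunWorkerPool above is the better fit.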

Fan-Out / Fan-In Pattern

In many systems, a worker pool is a special case of fan-out / fan-in. Let’s generalize.

Concept

  • Fan-Out: take one input and distribute it across multiple goroutines / channels
  • Fan-In: merge multiple input channels into one output

The official Go blog covers this in the article “Go Concurrency Patterns: Pipelines and cancellation”.

Implementation Examples

Fan-Out

You have a single channel of tasks. You spawn multiple worker goroutines reading from that same channel:

func fanOut(tasks <-chan int, numWorkers int) []<-chan int {
    outs := make([]<-chan int, numWorkers)
    for i := 0; i < numWorkers; i++ {
        out := make(chan int)
        go func() {
            defer close(out)
            for t := range tasks {
                // do something, maybe transform
                out <- t * 2
            }
        }()
        outs[i] = out
    }
    return outs
}

Fan-In (Merge)

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    output := func(c <-chan int) {
        defer wg.Done()
        for v := range c {
            out <- v
        }
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

With merge, you can combine multiple worker outputs into a single stream. The Go blog shows exactly this pattern when combining two sq stages.

Combined Example (Fan-Out + Fan-In)

in := gen(2, 3) // gen and sq are defined in the Pipeline section below
c1 := sq(in)
c2 := sq(in)

for n := range merge(c1, c2) {
    fmt.Println(n)
}

This is fan-out (two sq stages reading from the same input channel) combined with fan-in (merge collects their results into one channel).
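For a complete program, the fanOut and merge functions above can be wired together directly. This sketch copies both unchanged, adds the gen generator from the Pipeline section, and sums the results, since fan-in does not preserve order. sumAll is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
)

// gen emits nums on a channel and closes it when done (from the Pipeline section).
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// fanOut starts numWorkers goroutines that all read from the same tasks channel.
func fanOut(tasks <-chan int, numWorkers int) []<-chan int {
	outs := make([]<-chan int, numWorkers)
	for i := 0; i < numWorkers; i++ {
		out := make(chan int)
		go func() {
			defer close(out)
			for t := range tasks {
				out <- t * 2
			}
		}()
		outs[i] = out
	}
	return outs
}

// merge forwards every value from cs onto one channel and closes it
// once all inputs have been drained.
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	output := func(c <-chan int) {
		defer wg.Done()
		for v := range c {
			out <- v
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sumAll doubles nums on numWorkers goroutines and sums the merged results.
// Summing is order-independent, which matters because fan-in reorders values.
func sumAll(numWorkers int, nums ...int) int {
	total := 0
	for v := range merge(fanOut(gen(nums...), numWorkers)...) {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumAll(3, 1, 2, 3, 4, 5)) // 30
}
```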

Pitfalls & Tips

  • Order is not preserved — results may come in any order.
  • Must ensure close semantics are correct — merging must close after all sources are done.
  • Use WaitGroup to synchronize closings.
  • If one stage stops early (e.g. due to error), downstream goroutines may hang waiting on channels — you may need cancellation (using context, done channels, etc.).
  • In more advanced recipes, include a done / cancellation channel in merge loops to prematurely abort merging.
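A done-channel version of merge, in the style of the Go blog’s pipelines article, makes each forwarding goroutine select between sending downstream and observing done. The sketch assumes the producers honor done as well (the hypothetical counter generator does), because a producer that ignores it could stay blocked on its own send. takeN is an illustrative consumer that stops after n values.

```go
package main

import (
	"fmt"
	"sync"
)

// mergeDone is merge with cancellation: each forwarding goroutine stops as
// soon as done is closed, even if nobody is reading from the output.
func mergeDone(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	output := func(c <-chan int) {
		defer wg.Done()
		for v := range c {
			select {
			case out <- v:
			case <-done:
				return
			}
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// counter is a hypothetical infinite producer that also honors done.
func counter(done <-chan struct{}, start int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := start; ; n++ {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}()
	return out
}

// takeN reads n values from two merged counters, then cancels everything and
// waits for the merged channel to close (all forwarders have exited; the
// counters exit on done as well).
func takeN(n int) []int {
	done := make(chan struct{})
	merged := mergeDone(done, counter(done, 0), counter(done, 1000))

	var got []int
	for len(got) < n {
		got = append(got, <-merged)
	}

	close(done) // broadcast: every goroutine selecting on done wakes up
	for range merged {
		// discard values still in flight; the loop ends when merged is closed
	}
	return got
}

func main() {
	got := takeN(5)
	fmt.Println(len(got), "values received; merged channel closed")
}
```

Closing done, rather than sending on it, is what lets one signal reach every goroutine at once: a receive from a closed channel succeeds immediately for all receivers.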

Pipeline Pattern

What Is a Pipeline?

A pipeline is a chain of stages, where each stage transforms data and passes it downstream via channels. Think of Unix pipes (|). Each stage is a function:

stage1 -> stage2 -> stage3 -> … -> output

Each stage:

  • Runs in its own goroutine
  • Reads from input channel
  • Writes to output channel
  • Closes its own output channel when done

Simple Pipeline Example (Squares + Add)

package main

import "fmt"

// gen emits nums on a channel and closes it when done.
func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// sq squares every value it receives.
func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

// add adds delta to every value it receives.
func add(in <-chan int, delta int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n + delta
        }
    }()
    return out
}

func main() {
    c := gen(1, 2, 3, 4)
    c2 := sq(c)
    c3 := add(c2, 10)
    // prints 11, 14, 19 and 26, one per line
    for res := range c3 {
        fmt.Println(res)
    }
}

The gen and sq stages are the classic example from the Go blog’s pipelines article; add is one more stage written in the same style.

You can also spawn multiple instances of a stage (parallelize a stage) using fan-out + fan-in inside the pipeline.
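Parallelizing one stage can be folded into a single helper that starts n goroutines on the same input (fan-out) and closes a shared output once all of them finish (fan-in). parallel is a hypothetical name for this sketch, and it expects n to be at least 1.

```go
package main

import (
	"fmt"
	"sync"
)

// gen emits nums on a channel and closes it when done.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// parallel runs f on n goroutines that all read from in (fan-out) and sends
// every result to one shared output, closed after all n finish (fan-in).
// n should be at least 1.
func parallel(in <-chan int, n int, f func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			for v := range in {
				out <- f(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	square := func(n int) int { return n * n }
	addTen := func(n int) int { return n + 10 }

	// gen -> 4 parallel squarers -> 2 parallel adders
	out := parallel(parallel(gen(1, 2, 3, 4), 4, square), 2, addTen)

	sum := 0
	for v := range out {
		fmt.Println(v) // 11, 14, 19 and 26, in no guaranteed order
		sum += v
	}
	fmt.Println("sum:", sum) // sum: 70
}
```

Sharing one output channel and one WaitGroup keeps the helper small; the cost, as with any fan-in, is that output order no longer matches input order.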

Advantages & Tradeoffs

Pros:

  • Cleaner separation of concerns
  • Modular, composable stages
  • Transparent flow of data
  • Easy to reason about each stage

Cons / Considerations:

  • Each channel introduces overhead (latency, memory)
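  • The slowest stage limits the throughput of the whole pipeline, so uneven stages become bottlenecks
  • Buffer sizes need deliberate choices: unbuffered channels apply backpressure immediately, while buffers absorb bursts at the cost of memory and latency
  • Channel-closing logic must be precise
  • Cancellation must propagate: if downstream decides to stop, upstream should stop sending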

To handle early cancellation, many pipeline designs accept a done <-chan struct{} or context.Context to signal termination.
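A minimal sketch of the context.Context approach, assuming the gen and sq stages are rewritten so that every send also watches ctx.Done() (a done <-chan struct{} version is identical with <-done in its place). The consumer firstSquare, a hypothetical name, keeps only the first result, cancels, and drains the last stage until it closes. That confirms the last stage has exited; the first stage exits on its own because its sends also watch ctx.Done().

```go
package main

import (
	"context"
	"fmt"
)

// genCtx emits nums until they run out or ctx is cancelled.
func genCtx(ctx context.Context, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case out <- n:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// sqCtx is sq with cancellation: each send also watches ctx.Done().
func sqCtx(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// firstSquare keeps only the first result, then cancels the pipeline and
// drains the last stage until it closes.
func firstSquare(nums ...int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	out := sqCtx(ctx, genCtx(ctx, nums...))
	first := <-out // zero if nums is empty (out is closed right away)
	cancel()       // ask every stage to stop sending
	for range out {
		// discard values already in flight; ends once sqCtx has returned
	}
	return first
}

func main() {
	fmt.Println(firstSquare(3, 4, 5)) // 9
}
```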


Real-World Use Case: Log Processing Pipeline

Imagine you have logs streaming in, and you want to:

  1. Filter undesired entries
  2. Transform / enrich them
  3. Group / batch them
  4. Write to external sink (file, DB, remote service)

You can build:

  • A gen stage (reads raw logs)
  • A pool of filter workers (fan-out)
  • Merge filtered logs (fan-in)
  • A transform stage
  • A batcher stage
  • A sink stage

You’d combine pipeline + worker pools + fan-in/fan-out. You’d also include cancellation / error propagation so if the sink fails, upstream stops.

Sketch:

gen -> filterPool -> merge -> transform -> batch -> write

Each arrow is a channel; filterPool is multiple filter workers.
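Of the stages in that sketch, the batcher is the one without a counterpart earlier in the article. A minimal version, assuming log entries are plain strings and batches are flushed only by size; a production batcher would usually also flush on a timer, for example with a time.Ticker in a select. batch and lines are hypothetical names.

```go
package main

import "fmt"

// batch groups incoming log lines into slices of up to size entries
// (size should be at least 1). The final, possibly shorter, batch is
// flushed when in is closed.
func batch(in <-chan string, size int) <-chan []string {
	out := make(chan []string)
	go func() {
		defer close(out)
		buf := make([]string, 0, size)
		for line := range in {
			buf = append(buf, line)
			if len(buf) == size {
				out <- buf
				buf = make([]string, 0, size) // fresh slice: the receiver now owns the old one
			}
		}
		if len(buf) > 0 {
			out <- buf
		}
	}()
	return out
}

// lines is a stand-in for the gen stage that reads raw logs.
func lines(ls ...string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, l := range ls {
			out <- l
		}
	}()
	return out
}

func main() {
	for b := range batch(lines("a", "b", "c", "d", "e"), 2) {
		fmt.Println(b) // [a b], then [c d], then [e]
	}
}
```

Allocating a new slice after each flush matters: reusing buf would let the batcher overwrite entries that the sink stage is still reading.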


Performance & Profiling Tips

  • Use pprof and runtime/trace to inspect CPU / goroutine behavior
  • Benchmark with varying worker counts, channel buffer sizes
  • Look for blocked goroutines in traces
  • Identify stages that stay idle or saturated
  • Tune buffer sizes, splitting of work, and pipeline depth
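A minimal way to get a runtime/trace file for a concurrent section is to wrap it in trace.Start and trace.Stop. In this sketch workload is a hypothetical stand-in for the pool or pipeline being studied; the resulting trace.out can be opened with go tool trace, whose goroutine views make blocked and idle workers visible. For code that already has benchmarks, go test -bench . -trace trace.out (or -cpuprofile cpu.out for pprof) produces the same kind of file without code changes.

```go
package main

import (
	"bytes"
	"fmt"
	"os"
	"runtime/trace"
	"sync"
)

// workload is a hypothetical stand-in for the pool or pipeline under study.
func workload(workers int) {
	jobs := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				_ = j * j // placeholder for real work
			}
		}()
	}
	for j := 0; j < 10000; j++ {
		jobs <- j
	}
	close(jobs)
	wg.Wait()
}

// captureTrace records an execution trace of workload into memory.
func captureTrace(workers int) ([]byte, error) {
	var buf bytes.Buffer
	if err := trace.Start(&buf); err != nil {
		return nil, err
	}
	workload(workers)
	trace.Stop() // returns only after all trace data has been written
	return buf.Bytes(), nil
}

func main() {
	data, err := captureTrace(4)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	// Inspect afterwards with: go tool trace trace.out
	if err := os.WriteFile("trace.out", data, 0o644); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```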

Common Pitfalls & Gotchas

| Problem | Cause / Mistake | Solution / Prevention |
|---|---|---|
| Deadlock / hang | Channel not closed, or goroutines waiting on a send/receive that never happens | Close channels carefully, use WaitGroup, propagate cancellation |
| Goroutine leaks | A goroutine stuck waiting on a channel that never closes | Use a done channel / context to abort; ensure all paths lead to exit |
| Data races | Shared mutable state across goroutines without synchronization | Avoid shared state; use channels, or use sync.Mutex / atomic properly |
| Panic in worker | An unrecovered panic crashes the whole program, not just the goroutine | Recover inside the worker, or report errors on an error channel |
| Too many goroutines | Fan-out without bounds | Limit concurrency with pools, semaphores, or bounded buffers |
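A buffered channel is a common way to get a counting semaphore without extra dependencies: acquiring a slot before each go statement caps how many goroutines exist at once. In this sketch boundedRun is a hypothetical name, the sleep stands in for real work, and the function returns the peak concurrency it observed so the bound can be checked. The golang.org/x/sync/semaphore package offers a weighted variant.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// boundedRun runs tasks units of simulated work with at most limit goroutines
// alive at once (limit must be at least 1) and returns the peak observed.
func boundedRun(tasks, limit int) int {
	sem := make(chan struct{}, limit) // capacity = maximum concurrent tasks
	var wg sync.WaitGroup
	var mu sync.Mutex
	running, peak := 0, 0

	for i := 0; i < tasks; i++ {
		sem <- struct{}{} // acquire before spawning; blocks while limit tasks run
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the task returns

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(time.Millisecond) // simulated work

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak concurrency:", boundedRun(100, 5)) // never more than 5
}
```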

Also, in large systems, you may want to adopt structured concurrency (so that lifetimes of goroutines are tied and cancelable) — but Go doesn't provide that out-of-the-box, so you can simulate with context and careful orchestration.
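One way to simulate structured concurrency with only the standard library is a small group type: every goroutine shares one cancellable context, the first error cancels it, and Wait returns only after all goroutines have exited. This sketch is a simplified, hypothetical version of what golang.org/x/sync/errgroup (errgroup.WithContext) provides; the two stages in runStages are stand-ins for a failing sink and a long-running upstream stage.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// group ties goroutine lifetimes together: the first error cancels the shared
// context, and Wait returns only after every goroutine has exited.
// A simplified, hypothetical stand-in for golang.org/x/sync/errgroup.
type group struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
	once   sync.Once
	err    error
}

func newGroup(parent context.Context) *group {
	ctx, cancel := context.WithCancel(parent)
	return &group{ctx: ctx, cancel: cancel}
}

// Go runs f in a new goroutine that receives the group's context.
func (g *group) Go(f func(ctx context.Context) error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(g.ctx); err != nil {
			g.once.Do(func() {
				g.err = err // keep only the first error
				g.cancel()  // and ask every sibling to stop
			})
		}
	}()
}

// Wait blocks until all goroutines return, then reports the first error.
func (g *group) Wait() error {
	g.wg.Wait()
	g.cancel() // release the context's resources
	return g.err
}

// runStages wires two hypothetical stages into a group. When failSink is
// true the sink fails at once and the upstream stage is cancelled.
func runStages(failSink bool) error {
	g := newGroup(context.Background())
	g.Go(func(ctx context.Context) error { // sink
		if failSink {
			return errors.New("sink failed")
		}
		return nil
	})
	g.Go(func(ctx context.Context) error { // upstream stage
		for i := 0; i < 3; i++ {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(10 * time.Millisecond): // simulated unit of work
			}
		}
		return nil
	})
	return g.Wait()
}

func main() {
	fmt.Println(runStages(true))  // sink failed
	fmt.Println(runStages(false)) // <nil>
}
```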


Summary & Advice

  • Use Worker Pool when you have many tasks and want to bound concurrency
  • Use Fan-Out / Fan-In to distribute and collect in parallel
  • Use Pipeline to build clear, stage-based data flows
  • Combine them when needed (e.g. pipeline stages using worker pools)
  • Always handle cancellation and shutdown cleanly
  • Profile, monitor, and tune — concurrency introduces new bottlenecks

