Concurrency is one of Go’s killer features. But writing a bunch of goroutines and channels is one thing — doing it in a maintainable, efficient, and safe way is another. In this article, we’ll explore three core concurrency patterns in Go:
- Worker Pool
- Fan-In / Fan-Out
- Pipeline
By the end, you’ll see how to combine them to build robust, high-performance concurrent systems — and avoid common pitfalls along the way.
Why Patterns Matter
When you just “sprinkle goroutines everywhere,” you might face:
- Resource explosion (too many goroutines)
- Uncontrolled concurrency leading to contention
- Complex coordination logic
- Deadlocks, goroutine leaks, race conditions
Patterns give structure. They let you reason about data flow, lifetime, synchronization, and cancellation more clearly. They also help in composing concurrency primitives in scalable ways.
Background: Go Primitives Recap
Before diving in, a quick refresher:
- Goroutine: lightweight thread, multiplexed by Go runtime
- Channel: typed conduit for communication between goroutines
- select: lets you wait on multiple channel operations
- sync.WaitGroup: wait for a set of goroutines to finish
-
close(channel): signals no more sends (receivers can
range
)
Also, note that channels must be closed exactly once, and only by the sender side. Mis-closing or double closing is a runtime panic.
Worker Pool Pattern
What & Why
A worker pool is about controlling concurrency. Instead of spawning a new goroutine per task, you have N workers (fixed count) that pull tasks from a queue. This ensures:
- Bounded concurrency
- Better resource usage
- Easier error handling & shutdown
Implementation Sketch
type Job struct {
ID int
Payload interface{}
}
func worker(id int, jobs <-chan Job, results chan<- interface{}, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
// process job
fmt.Printf("worker %d: processing job %d\n", id, job.ID)
// simulate
time.Sleep(100 * time.Millisecond)
// send result
results <- fmt.Sprintf("job %d done by worker %d", job.ID, id)
}
}
func RunWorkerPool(numWorkers int, jobsList []Job) []interface{} {
jobs := make(chan Job)
results := make(chan interface{})
var wg sync.WaitGroup
// start workers
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// feed jobs
go func() {
defer close(jobs)
for _, job := range jobsList {
jobs <- job
}
}()
// collect results in background
go func() {
wg.Wait()
close(results)
}()
var out []interface{}
for res := range results {
out = append(out, res)
}
return out
}
Notes & Considerations
- Always close the jobs channel once you’re done sending.
- Use
WaitGroup
to know when all workers finish. - Close the results channel only after all workers are done.
- Be careful: panic inside a worker can crash the goroutine. You might want to recover inside workers or propagate errors.
- You might want bounded job queues (buffered channels) to handle load spikes.
Fan-Out / Fan-In Pattern
Worker Pool is a special case of fan-out / fan-in in many systems. Let’s generalize.
Concept
- Fan-Out: take one input and distribute it across multiple goroutines / channels
- Fan-In: merge multiple input channels into one output
Go’s official blog covers this in the Pipelines & Cancellation article.
Implementation Examples
Fan-Out
You have a single channel of tasks. You spawn multiple worker goroutines reading from that same channel:
func fanOut(tasks <-chan int, numWorkers int) []<-chan int {
outs := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
out := make(chan int)
go func() {
defer close(out)
for t := range tasks {
// do something, maybe transform
out <- t * 2
}
}()
outs[i] = out
}
return outs
}
Fan-In (Merge)
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
defer wg.Done()
for v := range c {
out <- v
}
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
With merge
, you can combine multiple worker outputs into a single stream. The Go blog shows exactly this pattern when combining two sq
stages.
Combined Example (Fan-Out + Fan-In)
in := gen(2, 3) // generator
c1 := sq(in)
c2 := sq(in)
for n := range merge(c1, c2) {
fmt.Println(n)
}
This is exactly fan-out (two sq
workers working from same input) and fan-in (merge results).
Pitfalls & Tips
- Order is not preserved — results may come in any order.
- Must ensure close semantics are correct — merging must close after all sources are done.
- Use WaitGroup to synchronize closings.
- If one stage stops early (e.g. due to error), downstream goroutines may hang waiting on channels — you may need cancellation (using
context
, done channels, etc.). - In more advanced recipes, include a
done
/ cancellation channel in merge loops to prematurely abort merging.
Pipeline Pattern
What is Pipeline?
A pipeline is a chain of stages, where each stage transforms data and passes it downstream via channels. Think of Unix pipes (|). Each stage is a function:
stage1 -> stage2 -> stage3 -> … -> output
Each stage:
- Runs in its own goroutine
- Reads from input channel
- Writes to output channel
- Closes its own output channel when done
Simple Pipeline Example (Squares + Add)
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func add(in <-chan int, delta int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n + delta
}
}()
return out
}
func main() {
c := gen(1, 2, 3, 4)
c2 := sq(c)
c3 := add(c2, 10)
for res := range c3 {
fmt.Println(res)
}
}
This is a classic example from the Go blog.
You can also spawn multiple instances of a stage (parallelize a stage) using fan-out + fan-in inside the pipeline.
Advantages & Tradeoffs
Pros:
- Cleaner separation of concerns
- Modular, composable stages
- Transparent flow of data
- Easy to reason about each stage
Cons / Considerations:
- Each channel introduces overhead (latency, memory)
- If stages are uneven in load, some will be bottlenecks
- Channels need adequate buffering or backpressure
- Closing channel logic must be precise
- Cancellation propagation important (if downstream decides to stop, upstream should stop sending)
To handle early cancellation, many pipeline designs accept a done <-chan struct{}
or context.Context
to signal termination.
Real-World Use Case: Log Processing Pipeline
Imagine you have logs streaming in, and you want to:
- Filter undesired entries
- Transform / enrich them
- Group / batch them
- Write to external sink (file, DB, remote service)
You can build:
- A gen stage (reads raw logs)
- A pool of filter workers (fan-out)
- Merge filtered logs (fan-in)
- A transform stage
- A batcher stage
- A sink stage
You’d combine pipeline + worker pools + fan-in/fan-out. You’d also include cancellation / error propagation so if the sink fails, upstream stops.
Sketch:
gen -> filterPool -> merge -> transform -> batch -> write
Each arrow is a channel; filterPool is multiple filter workers.
Performance & Profiling Tips
- Use
pprof
andruntime/trace
to inspect CPU / goroutine behavior - Benchmark with varying worker counts, channel buffer sizes
- Look for blocked goroutines in traces
- Identify stages that stay idle or saturated
- Tune buffer sizes, splitting of work, and pipeline depth
Common Pitfalls & Gotchas
Problem | Cause / Mistake | Solution / Prevention |
---|---|---|
Deadlock / hang | Channel not closed or goroutines waiting on send/receive | Always close channels carefully, use WaitGroup , propagate cancellation |
Goroutine leaks | A goroutine stuck waiting on a channel that never closes | Use done channel / context to abort, ensure all paths lead to exit |
Data races | Shared mutable state across goroutines without synchronization | Avoid shared state; use channels, or use sync.Mutex / atomic properly |
Panic in worker | Uncaught panic kills the goroutine silently | Recover inside worker or bubble error channel |
Too many goroutines | Fan-out without bounds | Limit concurrency with pools, semaphores, or bounded buffers |
Also, in large systems, you may want to adopt structured concurrency (so that lifetimes of goroutines are tied and cancelable) — but Go doesn't provide that out-of-the-box, so you can simulate with context
and careful orchestration.
Summary & Advice
- Use Worker Pool when you have many tasks and want to bound concurrency
- Use Fan-Out / Fan-In to distribute and collect in parallel
- Use Pipeline to build clear, stage-based data flows
- Combine them when needed (e.g. pipeline stages using worker pools)
- Always handle cancellation and shutdown cleanly
- Profile, monitor, and tune — concurrency introduces new bottlenecks
Top comments (0)