Problems this pattern can solve:
- If your service makes 1,000 queries per second to a database that can only handle 100, a worker pool will protect the database from being overwhelmed.
- In the event of a sudden traffic spike, using go func() for each request could spawn hundreds of thousands of goroutines and consume all your memory. The pool limits concurrency.
- If you have a pool of socket connections or file descriptors, a worker pool ensures you don't exceed the OS limit.
The Essence:
We create a fixed number of goroutines (workers) that are started in advance and wait for tasks. The main goroutine (the dispatcher) puts tasks into a channel (the task queue). The workers concurrently take these tasks from the channel and execute them. They can send the results back through another channel.
The Idea: Limiting the number of concurrently executed operations and reusing goroutines.
Difference between Worker Pool and other approaches:
go func() (Spawning raw goroutines):
- Cons compared to Worker Pool: Uncontrolled growth in the number of goroutines can lead to resource exhaustion (memory, file descriptors) and a panic. There is no control over concurrency.
- Pros compared to Worker Pool: Simpler to write, lower startup latency (no need to wait for a free worker).
- When to use it: For handling signals, very lightweight tasks, or when the load is guaranteed to be low.
Pipeline:
- How it's different: This is about sequential processing. Data flows through a chain of stages, where each stage is executed by its own goroutine (or pool), connected by channels.
- Example:
stage1 (generate) -> stage2 (multiply) -> stage3 (save)
- Cons compared to Worker Pool: more difficult to cancel and handle errors; the throughput is limited by the slowest stage.
- Pros compared to Worker Pool: Ideal for tasks that can be broken down into distinct, independent processing steps.
Semaphore:
- How it's different: A semaphore is a synchronization primitive used to limit access to a resource. You still spawn a goroutine for each task, but before starting the "heavy" part, they acquire a slot from the semaphore.
- How this relates: A Worker Pool is often implemented using a semaphore, but a semaphore is a lower-level tool.
- If the task is heavy (e.g., an HTTP request, complex calculation, disk I/O) and there aren't millions of them — use a Semaphore. The overhead of creating a goroutine is negligible compared to the task's execution time (e.g., parallel scraping of 50 websites).
- If the task is very light (e.g., parsing a string, a simple transformation) and the stream of tasks is effectively infinite — use a Worker Pool. Otherwise, the constant creation of thousands of goroutines will overwhelm the scheduler and GC (e.g., real-time log processing, handling events from Kafka).
MapReduce:
- How it's different: A higher-level pattern for distributed computations. It involves a "Map" phase (parallelization) and a "Reduce" phase (aggregation). A Worker Pool is often used as an implementation for the "Map" phase.
- Cons compared to Worker Pool: Overkill for simple concurrent processing.
Production-ready example: Worker Pool
Key Points:
- Start() — initialization and launch
- Stop() — immediate shutdown (with context cancellation)
- StopAndWait() — graceful shutdown (waits for current tasks to finish)
- Each task runs in its own goroutine with recover()
- A panic doesn't kill the entire worker, but is converted into a result error
- Custom panicHandler for logging
- Context is passed through to each task handler
- When the pool stops, all tasks receive a cancellation signal
- Support for timeouts when submitting tasks
- Submit() — blocking submission
- SubmitWithTimeout() — submission with a timeout
- TrySubmit() — non-blocking submission
- Channels are closed only after all writers have stopped
- sync.WaitGroup ensures all workers have completed
- doneCh channel for external waiting on full shutdown
- Atomic counters for statistics
Example (simplified):
package main

import (
	"fmt"
	"sync"
	"time"
)

// Job represents a task to be executed
type Job struct {
	ID      int
	Data    string
	Handler func(id int) int
}

// Result represents the outcome of a task
type Result struct {
	JobID  int
	Output string
}

// WorkerPool manages the pool of workers
type WorkerPool struct {
	JobQueue   chan Job
	Result     chan Result
	NumWorkers int
	wg         sync.WaitGroup
}

// NewWorkerPool creates a new worker pool
func NewWorkerPool(numWorkers, queueSize int) *WorkerPool {
	return &WorkerPool{
		JobQueue:   make(chan Job, queueSize),
		Result:     make(chan Result, queueSize),
		NumWorkers: numWorkers,
	}
}

// worker processes tasks from the channel until it is closed
func (wp *WorkerPool) worker(id int) {
	defer wp.wg.Done()
	for job := range wp.JobQueue {
		fmt.Printf("Worker %d processing task %d: %s\n", id, job.ID, job.Data)
		time.Sleep(500 * time.Millisecond) // simulate work
		wp.Result <- Result{
			JobID:  job.ID,
			Output: fmt.Sprintf("Processed: %s (result: %d)", job.Data, job.Handler(job.ID)),
		}
	}
}

// Start launches the workers
func (wp *WorkerPool) Start() {
	for i := 1; i <= wp.NumWorkers; i++ {
		wp.wg.Add(1)
		go wp.worker(i)
	}
}

// Submit sends a task to the pool
func (wp *WorkerPool) Submit(job Job) {
	wp.JobQueue <- job
}

// Wait closes the task channel and waits for all workers to finish
func (wp *WorkerPool) Wait() {
	close(wp.JobQueue)
	wp.wg.Wait()
	close(wp.Result)
}

func main() {
	pool := NewWorkerPool(3, 10)
	pool.Start()

	// Submitting tasks
	var submitWg sync.WaitGroup
	for i := 1; i <= 5; i++ {
		submitWg.Add(1)
		go func(id int) {
			defer submitWg.Done()
			pool.Submit(Job{
				ID:      id,
				Data:    fmt.Sprintf("Data-%d", id),
				Handler: func(id int) int { return id * 2 },
			})
		}(i)
	}
	submitWg.Wait()

	// Collect results in a separate goroutine; done signals that
	// every result has been drained before main exits.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for result := range pool.Result {
			fmt.Printf("Result: task %d -> %s\n", result.JobID, result.Output)
		}
	}()

	pool.Wait() // close the queue and wait for the workers
	<-done      // wait for the collector to finish printing
	fmt.Println("All tasks completed!")
}