DEV Community

Silver_dev


Concurrency patterns on Golang: Worker Pool

Problems this pattern can solve:

  1. If your service makes 1,000 queries per second to a database that can only handle 100, a worker pool will protect the database from crashing.
  2. In the event of a sudden traffic spike, using go func() for each request could spawn hundreds of thousands of goroutines and consume all your memory. The pool limits concurrency.
  3. If you have a pool of socket connections or file descriptors, a worker pool ensures you don't exceed the OS limit.

The Essence:
We create a fixed number of goroutines (workers) that are started in advance and wait for tasks. The main goroutine (the dispatcher) puts tasks into a channel (the task queue). The workers concurrently take these tasks from the channel and execute them. They can send the results back through another channel.

The Idea: Limiting the number of concurrently executed operations and reusing goroutines.

Difference between Worker Pool and other approaches:

go func() (Spawning raw goroutines):

  • Cons compared to Worker Pool: Uncontrolled growth in the number of goroutines can lead to resource exhaustion (memory, file descriptors) and a panic. There is no control over concurrency.
  • Pros compared to Worker Pool: Simpler to write, lower startup latency (no need to wait for a free worker).
  • When to use it: For handling signals, very lightweight tasks, or when the load is guaranteed to be low.
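For contrast, a minimal sketch of the raw go func() approach (handle is a placeholder for real per-request work). Nothing here bounds the number of goroutines: under a spike, the loop count is simply however many requests arrive.

```go
package main

import (
	"fmt"
	"sync"
)

// handle simulates the work done for one request.
func handle(id int) string { return fmt.Sprintf("handled %d", id) }

func main() {
	var wg sync.WaitGroup
	// One goroutine per request: nothing caps concurrency here.
	// With enough requests, memory and file descriptors run out.
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			_ = handle(id)
		}(i)
	}
	wg.Wait()
	fmt.Println("done")
}
```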

Pipeline:

  • How it's different: This is about sequential processing. Data flows through a chain of stages, where each stage is executed by its own goroutine (or pool), connected by channels.
  • Example: stage1 (generate) -> stage2 (multiply) -> stage3 (save)
  • Cons compared to Worker Pool: More difficult to cancel and handle errors; the throughput is limited by the slowest stage.
  • Pros compared to Worker Pool: Ideal for tasks that can be broken down into distinct, independent processing steps.
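A minimal sketch of the pipeline stages above (generate and multiply are illustrative names; the consuming loop in main stands in for the save stage):

```go
package main

import "fmt"

// generate emits the numbers 1..n (stage 1).
func generate(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// multiply doubles each value it receives (stage 2).
func multiply(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * 2
		}
	}()
	return out
}

func main() {
	// Stage 3 ("save") is just the consuming loop here.
	for v := range multiply(generate(3)) {
		fmt.Println(v) // prints 2, 4, 6
	}
}
```

Each stage owns its output channel and closes it when done, which is how the shutdown signal propagates down the chain.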

Semaphore:

  • How it's different: A semaphore is a synchronization primitive used to limit access to a resource. You still spawn a goroutine for each task, but before starting the "heavy" part, they acquire a slot from the semaphore.
  • How this relates: A Worker Pool is often implemented using a semaphore, but a semaphore is a lower-level tool.
  • If the task is heavy (e.g., an HTTP request, complex calculation, disk I/O) and there aren't millions of them — use a Semaphore. The overhead of creating a goroutine is negligible compared to the task's execution time (e.g., parallel scraping of 50 websites).
  • If the task is very light (e.g., parsing a string, a simple transformation) and the stream is infinite — use a Worker Pool. Otherwise, the GC will be overwhelmed by the creation of thousands of goroutines (e.g., real-time log processing, handling events from Kafka).
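A buffered channel is the idiomatic counting semaphore in Go. The sketch below (runLimited is an illustrative helper) spawns a goroutine per task but caps how many run at once, and tracks the peak observed concurrency to show that the cap holds:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited starts one goroutine per task but caps concurrency with a
// buffered channel used as a counting semaphore. It returns the highest
// number of tasks it ever observed running at once.
func runLimited(tasks, limit int) int64 {
	sem := make(chan struct{}, limit)
	var inFlight, peak int64
	var wg sync.WaitGroup
	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot (blocks when limit is reached)
			defer func() { <-sem }() // release the slot

			cur := atomic.AddInt64(&inFlight, 1)
			for { // record the peak concurrency seen so far
				p := atomic.LoadInt64(&peak)
				if cur <= p || atomic.CompareAndSwapInt64(&peak, p, cur) {
					break
				}
			}
			time.Sleep(10 * time.Millisecond) // the "heavy" work
			atomic.AddInt64(&inFlight, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak concurrency:", runLimited(10, 3)) // never exceeds 3
}
```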

MapReduce:

  • How it's different: A higher-level pattern for distributed computations. It involves a "Map" phase (parallelization) and a "Reduce" phase (aggregation). A Worker Pool is often used as an implementation for the "Map" phase.
  • Cons compared to Worker Pool: Overkill for simple concurrent processing.
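A rough in-process sketch of the idea (mapReduceSum is an illustrative helper, not a real MapReduce framework): a small worker pool performs the Map phase, and a single aggregating loop performs the Reduce phase.

```go
package main

import (
	"fmt"
	"sync"
)

// mapReduceSum squares each input in parallel (the "Map" phase, backed by
// a small worker pool) and sums the results (the "Reduce" phase).
func mapReduceSum(inputs []int, workers int) int {
	jobs := make(chan int)
	mapped := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ { // Map phase: worker pool
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range jobs {
				mapped <- v * v
			}
		}()
	}
	go func() { // close mapped once every mapper has finished
		wg.Wait()
		close(mapped)
	}()
	go func() { // feed the jobs
		for _, v := range inputs {
			jobs <- v
		}
		close(jobs)
	}()

	sum := 0 // Reduce phase: single aggregator
	for v := range mapped {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(mapReduceSum([]int{1, 2, 3, 4}, 2)) // 1+4+9+16 = 30
}
```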

Production-ready example: Worker Pool

Key Points:

  • Start() — initialization and launch
  • Stop() — immediate shutdown (with context cancellation)
  • StopAndWait() — graceful shutdown (waits for current tasks to finish)
  • Each task runs in its own goroutine with recover()
  • A panic doesn't kill the entire worker, but is converted into a result error
  • Custom panicHandler for logging
  • Context is passed through to each task handler
  • When the pool stops, all tasks receive a cancellation signal
  • Support for timeouts when submitting tasks
  • Submit() — blocking submission
  • SubmitWithTimeout() — submission with a timeout
  • TrySubmit() — non-blocking submission
  • Channels are closed only after all writers have stopped
  • sync.WaitGroup ensures all workers have completed
  • doneCh channel for external waiting on full shutdown
  • Atomic counters for statistics

Example (simplified):

package main

import (
    "fmt"
    "sync"
    "time"
)

// Job represents a task to be executed
type Job struct {
    ID      int
    Data    string
    Handler func(id int) int
}

// Result represents the outcome of a task
type Result struct {
    JobID  int
    Output string
}

// WorkerPool manages the pool of workers
type WorkerPool struct {
    JobQueue   chan Job
    Result     chan Result
    NumWorkers int
    wg         sync.WaitGroup
}

// NewWorkerPool creates a new worker pool
func NewWorkerPool(numWorkers, queueSize int) *WorkerPool {
    return &WorkerPool{
        JobQueue:   make(chan Job, queueSize),
        Result:     make(chan Result, queueSize),
        NumWorkers: numWorkers,
    }
}

// worker — the worker function that processes tasks from the channel
func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()

    for job := range wp.JobQueue {
        fmt.Printf("Worker %d processing job %d: %s\n", id, job.ID, job.Data)

        // Simulate the work
        time.Sleep(500 * time.Millisecond)

        output := fmt.Sprintf("Processed: %s (result: %d)", job.Data, job.Handler(job.ID))

        result := Result{
            JobID:  job.ID,
            Output: output,
        }
        wp.Result <- result
    }
}

// Start launches the workers
func (wp *WorkerPool) Start() {
    for i := 1; i <= wp.NumWorkers; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }
}

// Submit sends a task to the pool
func (wp *WorkerPool) Submit(job Job) {
    wp.JobQueue <- job
}

// Wait closes the task channel and waits for all workers to finish
func (wp *WorkerPool) Wait() {
    close(wp.JobQueue)
    wp.wg.Wait()
    close(wp.Result)
}

func main() {
    pool := NewWorkerPool(3, 10)
    pool.Start()

    // Submit tasks concurrently
    var submitWg sync.WaitGroup
    for i := 1; i <= 5; i++ {
        submitWg.Add(1)
        go func(id int) {
            defer submitWg.Done()
            job := Job{
                ID:      id,
                Data:    fmt.Sprintf("Data-%d", id),
                Handler: func(id int) int { return id * 2 },
            }
            pool.Submit(job)
        }(i)
    }
    submitWg.Wait()

    // Launch the result collector; done signals when the channel is drained.
    // Without this, main could exit before all results are printed.
    done := make(chan struct{})
    go func() {
        defer close(done)
        for result := range pool.Result {
            fmt.Printf("Result: task %d -> %s\n", result.JobID, result.Output)
        }
    }()

    // Wait for all workers to finish; Wait also closes pool.Result
    pool.Wait()
    // Wait until the collector has printed every result
    <-done

    fmt.Println("All tasks completed!")
}
