
Ruben D. Garcia


🚀 Introducing go-pool: Concurrency Made Simple and Safe in Go

Go-Pool

Go-Pool is a lightweight, type-safe, and high-performance worker pool for Go. It simplifies managing concurrent workloads, providing deterministic cleanup, optional retries, and efficient result collection, all without the complexity of channels or errgroup.

Go's concurrency model is powerful, but building high-throughput, leak-free, and memory-efficient concurrent systems can be tricky. Go-Pool abstracts these challenges, giving developers a clean, minimal API for safe and efficient task execution.


Features

  • Efficient worker pool for streamlined concurrent task execution.
  • Type-safe generic Drainer (Drainer[T]) for collecting results safely and efficiently.
  • Retryable tasks with exponential backoff and jitter for transient failures.
  • Context-aware execution for safe cancellation.
  • Deterministic shutdown with no goroutine leaks.
  • Minimal allocations and lock-free designs where possible.
  • Fluent functional composition (WithRetry) for flexible task definitions.
  • High throughput: optimized for millions of tasks with microsecond-friendly operations.

Installation

go get github.com/rubengp99/go-pool

Core Concepts

Concept    | Description
Task       | A unit of work (func() error) to be executed concurrently.
Worker     | Interface representing executable tasks, optionally retryable.
Pool       | Manages concurrent execution using WaitGroup and semaphores.
Drainer[T] | Type-safe collector for task results, optimized for high throughput.
Retryable  | Wraps tasks with automatic retries, including exponential backoff + jitter.

How it works:

  1. Each Worker runs in its own goroutine, managed by a WaitGroup.
  2. Concurrency is controlled via a semaphore.
  3. Shared context handles cancellation propagation.
  4. Drainer[T] safely collects results in a lock-free or low-allocation linked list.
  5. Resources and channels are deterministically cleaned up upon completion.
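
The steps above can be sketched with the standard library alone. This is a minimal illustration of the general pattern (a WaitGroup, a buffered channel used as a semaphore, and a shared context), not go-pool's actual implementation; the names runLimited and demo are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// runLimited is a hypothetical helper, not part of go-pool's API.
// It starts one goroutine per task, lets at most limit tasks execute
// at the same time, and returns the first error any task reported.
func runLimited(ctx context.Context, limit int, tasks ...func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	sem := make(chan struct{}, limit) // buffered channel acting as a counting semaphore
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)

	for _, task := range tasks {
		wg.Add(1)
		go func(task func(context.Context) error) {
			defer wg.Done()

			// Acquire a slot, or stop waiting once the context is cancelled.
			select {
			case sem <- struct{}{}:
				defer func() { <-sem }()
			case <-ctx.Done():
				return
			}

			if err := task(ctx); err != nil {
				once.Do(func() {
					firstErr = err
					cancel() // signal the remaining tasks to stop
				})
			}
		}(task)
	}

	wg.Wait() // all goroutines have returned by this point
	return firstErr
}

// demo runs three successful tasks with a limit of two and reports
// how many ran and which error (if any) came back.
func demo() (uint32, error) {
	var executed uint32
	task := func(context.Context) error {
		atomic.AddUint32(&executed, 1)
		return nil
	}
	err := runLimited(context.Background(), 2, task, task, task)
	return atomic.LoadUint32(&executed), err
}

func main() {
	executed, err := demo()
	fmt.Println("Tasks completed:", executed) // Tasks completed: 3
	fmt.Println("Error:", err)                // Error: <nil>
}
```

Because every goroutine calls wg.Done before returning and runLimited blocks on wg.Wait, no goroutine outlives the call, which is the property the article describes as deterministic cleanup.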

Usage Examples

1. Running Simple Concurrent Tasks

package main

import (
    "fmt"
    "sync/atomic"
    gopool "github.com/rubengp99/go-pool"
)

func main() {
    var numInvocations uint32
    task := gopool.NewTask(func() error {
        atomic.AddUint32(&numInvocations, 1)
        fmt.Println("Task executed")
        return nil
    })

    pool := gopool.NewPool().WithLimit(2)
    err := pool.Go(task, task, task).Wait()

    fmt.Println("Tasks completed:", numInvocations) // 3
    fmt.Println("Error:", err)                       // nil
}

Explanation:
Runs the same task three times, with at most 2 executions in flight at once (WithLimit(2)). This is a basic demonstration of concurrency without retries or result collection.


2. Retryable Tasks

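// Fragment: assumes imports "fmt", "time", and gopool "github.com/rubengp99/go-pool".
task := gopool.NewTask(func() error {
    return fmt.Errorf("temporary error") // this task always fails
}).WithRetry(3, 100*time.Millisecond)

pool := gopool.NewPool()
if err := pool.Go(task).Wait(); err != nil {
    fmt.Println("Task failed:", err)
}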

Explanation:
Wraps a task with retries using WithRetry. The task is attempted up to 3 times, with exponential backoff starting from a 100 ms base delay, to ride out transient errors.
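
To make "exponential backoff with jitter" concrete, here is a minimal standard-library sketch. It assumes the delay doubles on each retry and adds up to 50% random jitter; go-pool's exact formula may differ, and the names backoff, retry, and flakyDemo are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// backoff returns the wait before retry number attempt (0-based):
// base doubled attempt times, plus up to 50% random jitter.
// The doubling factor and jitter range are assumptions for illustration.
func backoff(base time.Duration, attempt int) time.Duration {
	d := base << attempt // base * 2^attempt
	jitter := time.Duration(rand.Int63n(int64(d)/2 + 1))
	return d + jitter
}

// retry calls fn up to attempts times, sleeping between failed attempts.
// It returns nil on the first success, or the last error wrapped with context.
func retry(attempts int, base time.Duration, fn func() error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i < attempts-1 { // no need to sleep after the final attempt
			time.Sleep(backoff(base, i))
		}
	}
	return fmt.Errorf("after %d attempts: %w", attempts, err)
}

// flakyDemo retries a function that fails twice and then succeeds.
func flakyDemo() (calls int, err error) {
	err = retry(3, time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("temporary error")
		}
		return nil
	})
	return calls, err
}

func main() {
	calls, err := flakyDemo()
	fmt.Println(calls, err) // 3 <nil>
}
```

Jitter spreads retries out in time so that many tasks failing together do not all retry at the same instant.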


3. Collecting Results with Drainer

output := gopool.NewDrainer[string]()

tasks := gopool.Workers{
    gopool.NewTask(func() error { output.Send("result1"); return nil }),
    gopool.NewTask(func() error { output.Send("result2"); return nil }),
}

pool := gopool.NewPool().WithLimit(2)
pool.Go(tasks...).Wait()

results := output.Drain()
fmt.Println("Collected results:", results) // e.g. [result1 result2]; order may vary

Explanation:
Demonstrates type-safe result collection using a Drainer. Tasks send their results to the drainer, which is drained after execution.
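
For readers curious what a type-safe collector looks like underneath, here is a deliberately simple sketch using generics and a mutex-guarded slice. It is a stand-in for illustration only: the article says the real Drainer uses a lock-free or low-allocation linked list, and the collector type, its reset-on-Drain behavior, and collectDemo are all assumptions.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collector is a hypothetical, simplified stand-in for a Drainer-style type.
// The type parameter T means a collector[string] only accepts strings,
// so mismatched result types are rejected at compile time.
type collector[T any] struct {
	mu    sync.Mutex
	items []T
}

// Send appends one result; it is safe to call from many goroutines.
func (c *collector[T]) Send(v T) {
	c.mu.Lock()
	c.items = append(c.items, v)
	c.mu.Unlock()
}

// Drain returns everything collected so far and resets the collector.
func (c *collector[T]) Drain() []T {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := c.items
	c.items = nil
	return out
}

// collectDemo sends three results from separate goroutines and drains them.
func collectDemo() []string {
	var out collector[string]
	var wg sync.WaitGroup
	for _, s := range []string{"result1", "result2", "result3"} {
		wg.Add(1)
		go func(s string) {
			defer wg.Done()
			out.Send(s)
		}(s)
	}
	wg.Wait()

	results := out.Drain()
	sort.Strings(results) // arrival order is not deterministic, so sort for display
	return results
}

func main() {
	fmt.Println(collectDemo()) // [result1 result2 result3]
}
```

A mutex is the easiest way to get correctness; a lock-free structure such as the one the article describes trades that simplicity for less contention under heavy load.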


4. Multiple Task Types

type typeA struct{ value string }
type typeB struct{ value float32 }

outputA := gopool.NewDrainer[typeA]()
outputB := gopool.NewDrainer[typeB]()

tasks := gopool.Workers{
    gopool.NewTask(func() error { outputA.Send(typeA{"hello"}); return nil }),
    gopool.NewTask(func() error { outputB.Send(typeB{42.5}); return nil }),
}

pool := gopool.NewPool().WithLimit(2)
pool.Go(tasks...).Wait()

Explanation:
Illustrates concurrent execution of tasks producing different result types, each collected by a separate type-safe drainer.


5. Complete Example: Retries + Drainers

package main

import (
    "fmt"
    "sync/atomic"
    "time"
    gopool "github.com/rubengp99/go-pool"
)

type typeA struct{ value string }
type typeB struct{ value float32 }

func main() {
    var totalInvocations uint32

    outputA := gopool.NewDrainer[typeA]()
    outputB := gopool.NewDrainer[typeB]()

    tasks := gopool.Workers{
        gopool.NewTask(func() error {
            atomic.AddUint32(&totalInvocations, 1)
            fmt.Println("Running simple task")
            return nil
        }),

        gopool.NewTask(func() error {
            // Use the value returned by AddUint32 so the check sees this call's own count.
            if n := atomic.AddUint32(&totalInvocations, 1); n%2 == 0 {
                fmt.Println("Task succeeded after retry")
                return nil
            }
            fmt.Println("Task failed, retrying...")
            return fmt.Errorf("temporary error")
        }).WithRetry(3, 100*time.Millisecond),

        gopool.NewTask(func() error {
            atomic.AddUint32(&totalInvocations, 1)
            outputA.Send(typeA{value: "Hello from typeA!"})
            return nil
        }),

        gopool.NewTask(func() error {
            atomic.AddUint32(&totalInvocations, 1)
            outputB.Send(typeB{value: 99.99})
            return nil
        }),
    }

    pool := gopool.NewPool().WithLimit(2)
    if err := pool.Go(tasks...).Wait(); err != nil {
        fmt.Println("Pool error:", err)
    }

    resultsA := outputA.Drain()
    resultsB := outputB.Drain()

    fmt.Printf("\nTotal invocations (including retries): %d\n", totalInvocations)
    fmt.Println("Results of typeA:", resultsA)
    fmt.Println("Results of typeB:", resultsB)
}

Explanation:
Full-featured example combining simple tasks, retryable tasks, and multiple drainers. Demonstrates heterogeneous workloads, retries, and type-safe result collection in one pool.


Benchmarks

Name                            | Iterations | ns/op | B/op | allocs/op
ErrGroup                        | 6,203,892  | 183.5 | 24   | 1
GoPool                          | 6,145,203  | 192.0 | 32   | 1
GoPoolWithDrainer               | 5,508,412  | 205.4 | 90   | 2
ChannelsWithOutputAndErrChannel | 4,461,849  | 262.0 | 72   | 2
ChannelsWithWaitGroup           | 4,431,901  | 271.8 | 80   | 2
ChannelsWithErrGroup            | 4,459,243  | 274.8 | 80   | 2
MutexWithErrGroup               | 2,896,214  | 378.3 | 135  | 2

[Figure: Benchmark Comparison]

Analysis:
Go-Pool adds only ~8.5 ns per operation versus ErrGroup, while offering:

  • Type safety with generic Drainer[T]
  • Automatic retries with backoff
  • Deterministic cleanup
  • Concurrent-safe result draining

All of this comes with minimal memory overhead (32 B/op versus 24 B/op for ErrGroup in the table above), which suits high-throughput Go applications.

Repository

Go-Pool on GitHub
