DEV Community

Cover image for # Structured Concurrency in Go: Build Predictable, Leak-Free Concurrent Programs
Nithin Bharadwaj
Nithin Bharadwaj

Posted on

# Structured Concurrency in Go: Build Predictable, Leak-Free Concurrent Programs

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!

When I first started writing concurrent programs in Go, I felt like I was herding cats. I'd launch a goroutine and hope for the best. Sometimes it worked. Often, I'd end up with mysterious leaks, errors that vanished into the ether, or a program that just wouldn't stop. My code was a web of channels and sync.WaitGroup calls that only I could understand, and even I would get lost.

I needed a better way. I wanted my concurrent code to be predictable, clean, and safe. I wanted errors to be obvious and resources to be freed without me writing a novel of defer statements. This led me to a concept called structured concurrency. In simple terms, it means treating concurrent operations like a well-organized project, not a free-for-all.

Think of it like building a house. You don't just tell a hundred workers to start doing things randomly. You have a foreman (a manager). The foreman hires sub-contractors for clear tasks: the foundation crew, the framing crew, the plumbing crew (these are our scopes). Each crew has a specific job and a set of workers (goroutines). If the plumbing crew fails, the foreman knows about it immediately. He can tell the electrical crew, who depends on the plumbing, to stop work. And when the house is done, or if construction is cancelled, every single worker leaves the site and all the rented equipment is returned. Nothing is left behind.

This is what the code I'll show you does for Go programs. It provides a framework to manage goroutines in this disciplined, structured way.

Let's start with the main organizer, the ConcurrencyManager. It's the foreman of our operation.

type ConcurrencyManager struct {
    parentCtx   context.Context
    cancelFunc  context.CancelFunc
    wg          sync.WaitGroup
    errChan     chan error
    doneChan    chan struct{}
    children    []*ConcurrencyScope
    mu          sync.RWMutex
    stats       ConcurrencyStats
}
Enter fullscreen mode Exit fullscreen mode

The manager holds the big picture. It owns the main context, which can cancel everything. It has a WaitGroup to track all running goroutines. It has an error channel where any serious problem from any child scope can be reported. Its job is to create structured workspaces, which we call ConcurrencyScope.

A ConcurrencyScope is like a dedicated crew with a single goal.

type ConcurrencyScope struct {
    name        string
    parent      *ConcurrencyManager
    children    []*Task
    resources   []Releasable
    cancel      context.CancelFunc
    completed   chan struct{}
    err         atomic.Value
    mu          sync.RWMutex
}
Enter fullscreen mode Exit fullscreen mode

When you want to perform a group of related tasks, you create a scope. "fetch_user_data", "process_image_batch", "handle_http_request" — these could all be scopes. A scope contains tasks and holds resources (like database connections or file handles) that only its tasks need. It has its own cancel function, so if something goes wrong in this scope, we can stop all its tasks without affecting other, unrelated scopes.

The actual work is done by a Task. This is our individual worker.

type Task struct {
    name     string
    execute  func(context.Context) error
    timeout  time.Duration
    priority int
    depends  []*Task
    state    TaskState
    result   interface{}
    err      error
}
Enter fullscreen mode Exit fullscreen mode

A task is simple: it's a named function that does something and might return an error. But we've added structure: a timeout so it can't run forever, a list of other tasks it depends on, and a place to store its state and result.

Now, how does it all come together? Let's walk through a typical flow.

First, you create a manager with a background context. This manager is now responsible for all concurrency in this part of your program.

func main() {
    ctx := context.Background()
    cm := NewConcurrencyManager(ctx)
    defer cm.Cancel() // This ensures we clean up if we exit early
}
Enter fullscreen mode Exit fullscreen mode

Let's say you need to fetch data from three different microservices to assemble a webpage. You want to do this in parallel, but if one fails, you want to cancel the others quickly and report the error.

You create a scope for this operation.

dataFetchScope := cm.CreateScope("assemble_webpage_data")
Enter fullscreen mode Exit fullscreen mode

Now, you define your tasks. Each task is a function that calls a service.

userTask := &Task{
    name: "get_user_profile",
    execute: func(ctx context.Context) error {
        // Imagine this calls a User Service HTTP API
        user, err := userServiceClient.Fetch(ctx, userID)
        if err != nil {
            return fmt.Errorf("failed to fetch user: %w", err)
        }
        // Store the result for later use
        userTask.result = user
        return nil
    },
    timeout: 5 * time.Second,
}

ordersTask := &Task{
    name: "get_user_orders",
    execute: func(ctx context.Context) error {
        // Calls an Order Service
        orders, err := orderServiceClient.Fetch(ctx, userID)
        if err != nil {
            return err
        }
        ordersTask.result = orders
        return nil
    },
    timeout: 7 * time.Second,
}
Enter fullscreen mode Exit fullscreen mode

You execute them within the scope. The scope takes over.

dataFetchScope.Execute(userTask)
dataFetchScope.Execute(ordersTask)
Enter fullscreen mode Exit fullscreen mode

Inside the Execute method, the scope does something crucial. It adds the task to its list, increments the manager's main WaitGroup, and then launches the task's function in a new goroutine in a controlled way.

This is the heart of the runTask method. It sets up a context with the task's specific timeout, waits for any tasks this one depends on, and then runs the execute function.

If the task returns an error, the scope's handleError method is called. This is where structured error propagation happens.

func (cs *ConcurrencyScope) handleError(err error) {
    // Store the first error in this scope
    if cs.err.Load() == nil {
        cs.err.Store(err)
    }
    // Send it to the parent manager's error channel
    select {
    case cs.parent.errChan <- fmt.Errorf("[%s] %v", cs.name, err):
        // Error sent
    default:
        // Channel is full, log it as a fallback
        log.Printf("Error channel full, dropping: %v", err)
    }
    // Cancel all other tasks in THIS SCOPE
    cs.cancelSiblings()
}
Enter fullscreen mode Exit fullscreen mode

This is powerful. A task failing in the "assemble_webpage_data" scope will:

  1. Record the error in the scope.
  2. Notify the manager.
  3. Immediately cancel the context for every other task in the same scope. The ordersTask will see its context become done and should stop its work cleanly.

Nothing outside this scope is affected. Other parts of your program keep running.

After launching tasks, you wait for the scope to finish.

err := dataFetchScope.Wait()
Enter fullscreen mode Exit fullscreen mode

The Wait method simply waits on the manager's main WaitGroup for all tasks in this scope to complete. How does it know which ones? Because every task launched by this scope added to that WaitGroup. When the wait is done, it checks if the scope stored an error and returns it.

This pattern gives you two incredibly useful high-level operations almost for free: Parallel and Series.

Parallel runs tasks at the same time and collects their results.

results, err := cm.Parallel("data_fetch",
    userTask,
    ordersTask,
    recommendationTask,
)
if err != nil {
    // One of them failed. The 'err' will tell you which one.
    log.Fatal(err)
}
// 'results' is a slice containing [user, orders, recommendations]
Enter fullscreen mode Exit fullscreen mode

Inside Parallel, it creates a scope, runs all tasks, waits, and then gathers the results. If any task failed, the entire Parallel operation fails, returning the error. The scope's error cancellation ensures the other tasks are stopped.

Series runs tasks one after another, where each task depends on the previous.

results, err := cm.Series("multi_step_processing",
    validateInputTask,
    sanitizeDataTask,
    writeToDatabaseTask,
)
Enter fullscreen mode Exit fullscreen mode

The framework sets up the dependency links (task[2] depends on task[1], which depends on task[0]). If sanitizeDataTask fails, writeToDatabaseTask will never even start.

Now, let's talk about resource management, because goroutines aren't the only thing we leak. We leak database connections, open files, and network sockets.

The framework integrates cleanup directly into the scope lifecycle. Any object that needs cleanup can implement a simple interface.

type Releasable interface {
    Release() error
}
Enter fullscreen mode Exit fullscreen mode

A database connection wrapper would do it.

type DatabaseConnection struct {
    conn *sql.DB
}

func (dc *DatabaseConnection) Release() error {
    return dc.conn.Close()
}
Enter fullscreen mode Exit fullscreen mode

When you create such a resource inside a task, you register it with the scope.

dataFetchScope.Execute(&Task{
    name: "query_database",
    execute: func(ctx context.Context) error {
        conn := &DatabaseConnection{db}
        // Tell the scope it needs to clean this up
        dataFetchScope.RegisterResource(conn)

        // Use the connection...
        rows, err := conn.conn.QueryContext(ctx, "SELECT...")
        // ...
        return nil
    },
})
Enter fullscreen mode Exit fullscreen mode

The magic happens in scope.Cleanup(). When a scope finishes—whether successfully or with an error—it calls Cleanup. This method goes through its list of registered resources and calls Release() on every single one, in reverse order. This is important because resources might depend on each other; you often need to close them in the opposite order you opened them.

You don't have to remember a single defer. The structure guarantees it.

For resources that are expensive to create, like database connections, we can build a ResourcePool. This pool works seamlessly with our structured scopes.

dbPool := NewResourcePool(10, func() (Releasable, error) {
    db, err := sql.Open("driver", "connectionString")
    return &DatabaseConnection{db}, err
})
Enter fullscreen mode Exit fullscreen mode

A task acquires a connection from the pool at the start and releases it when done. The pool manages having a maximum number of connections, waiting for one to free up, and timing out if it takes too long. All of this uses the same context from the task, so if the task is cancelled, the acquisition attempt is cancelled too.

task := &Task{
    execute: func(ctx context.Context) error {
        connResource, err := dbPool.Acquire(ctx)
        if err != nil {
            return err // Could be a timeout or cancellation
        }
        defer dbPool.Release(connResource) // Return to pool

        conn := connResource.(*DatabaseConnection)
        // ... use conn.conn to query ...
        return nil
    },
}
Enter fullscreen mode Exit fullscreen mode

Another critical pattern for real-world systems is the circuit breaker. Imagine your task calls a payment service. If that service is down, you don't want every single user request to wait 30 seconds for a timeout and fail. You want to fail fast.

A circuit breaker monitors failures.

paymentCircuit := NewCircuitBreaker(5, 30*time.Second)
Enter fullscreen mode Exit fullscreen mode

This breaker allows up to 5 consecutive failures. After that, it "opens" the circuit. For the next 30 seconds, any call to paymentCircuit.Execute(...) will immediately return an error "circuit breaker open", without even trying the operation. This protects the failing service and saves your application resources. After 30 seconds, it goes into a "half-open" state, allows one trial request, and if it succeeds, closes the circuit again to resume normal operation.

You use it inside a task.

task := &Task{
    execute: func(ctx context.Context) error {
        var paymentResult *PaymentResult
        err := paymentCircuit.Execute(func() error {
            var err error
            paymentResult, err = paymentClient.Charge(ctx, amount)
            return err
        })
        if err != nil {
            // This could be a real charge error OR a "breaker open" error
            return handlePaymentError(err)
        }
        // Use paymentResult...
        return nil
    },
}
Enter fullscreen mode Exit fullscreen mode

Finally, the framework collects stats. This isn't just nice to have; it's essential for understanding the behavior of your concurrent system in production.

After running a workload, you can ask the manager for statistics.

stats := cm.GetStats()
fmt.Printf(`
    Scopes Created: %d
    Tasks Started:  %d
    Tasks Completed: %d
    Tasks Failed:    %d
    Total CPU Time:  %v
`, stats.ScopesCreated, stats.TasksStarted,
   stats.TasksCompleted, stats.TasksFailed,
   time.Duration(stats.TotalExecutionTime))
Enter fullscreen mode Exit fullscreen mode

You can see if you're creating too many small scopes, if your failure rate is high, or where the total execution time is being spent.

Let's put it all together in a realistic example. An API handler that needs to validate a user, charge their card, and update inventory—all within a strict timeout.

func HandleCheckout(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()
    cm := NewConcurrencyManager(ctx)
    defer cm.Cancel()

    // We give the whole operation 8 seconds
    err := cm.WithTimeout("checkout_flow", 8*time.Second, func(scope *ConcurrencyScope) error {

        // Task 1: Validate user and cart (fast, local DB)
        validateTask := &Task{name: "validate", execute: validateCart, timeout: 1*time.Second}

        // Task 2: Charge payment (external, slow, uses circuit breaker)
        chargeTask := &Task{name: "charge", execute: chargeCard, timeout: 5*time.Second}
        chargeTask.depends = []*Task{validateTask} // Can't charge if cart is invalid

        // Task 3: Update inventory (can happen in parallel with charge)
        inventoryTask := &Task{name: "update_inventory", execute: updateInventory, timeout: 3*time.Second}

        scope.Execute(validateTask)
        scope.Execute(chargeTask)
        scope.Execute(inventoryTask)

        return scope.Wait()
    })

    if err != nil {
        // This error is clean: it could be a validation error,
        // a payment failure, an inventory problem, or the 8-second timeout.
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    w.WriteHeader(http.StatusOK)
    w.Write([]byte("Checkout successful"))
}
Enter fullscreen mode Exit fullscreen mode

In this handler, the structure is clear. The WithTimeout creates a scope that automatically cancels after 8 seconds. Inside, tasks have defined dependencies and timeouts. If the payment fails, the inventory update (which was running in parallel) is cancelled. If anything fails or the timeout is hit, the manager's defer calls Cancel(), which propagates through the context and stops all remaining goroutines. No leaks, no orphaned processes.

This is the goal of structured concurrency. It turns the chaotic world of parallel execution into something you can reason about, compose, and debug. Your code becomes about what needs to be done and the relationships between tasks, not the tedious mechanics of starting and stopping threads. The framework handles the mechanics, ensuring safety and providing visibility. It lets you focus on the real work.

📘 Checkout my latest ebook for free on my channel!

Be sure to like, share, comment, and subscribe to the channel!


101 Books

101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as $4—making quality knowledge accessible to everyone.

Check out our book Golang Clean Code available on Amazon.

Stay tuned for updates and exciting news. When shopping for books, search for Aarav Joshi to find more of our titles. Use the provided link to enjoy special discounts!

Our Creations

Be sure to check out our creations:

Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | Java Elite Dev | Golang Elite Dev | Python Elite Dev | JS Elite Dev | JS Schools


We are on Medium

Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva

Top comments (0)