As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!
When I first started writing concurrent programs in Go, I felt like I was herding cats. I'd launch a goroutine and hope for the best. Sometimes it worked. Often, I'd end up with mysterious leaks, errors that vanished into the ether, or a program that just wouldn't stop. My code was a web of channels and sync.WaitGroup calls that only I could understand, and even I would get lost.
I needed a better way. I wanted my concurrent code to be predictable, clean, and safe. I wanted errors to be obvious and resources to be freed without me writing a novel of defer statements. This led me to a concept called structured concurrency. In simple terms, it means treating concurrent operations like a well-organized project, not a free-for-all.
Think of it like building a house. You don't just tell a hundred workers to start doing things randomly. You have a foreman (a manager). The foreman hires sub-contractors for clear tasks: the foundation crew, the framing crew, the plumbing crew (these are our scopes). Each crew has a specific job and a set of workers (goroutines). If the plumbing crew fails, the foreman knows about it immediately. He can tell the electrical crew, who depends on the plumbing, to stop work. And when the house is done, or if construction is cancelled, every single worker leaves the site and all the rented equipment is returned. Nothing is left behind.
This is what the code I'll show you does for Go programs. It provides a framework to manage goroutines in this disciplined, structured way.
Let's start with the main organizer, the ConcurrencyManager. It's the foreman of our operation.
type ConcurrencyManager struct {
parentCtx context.Context
cancelFunc context.CancelFunc
wg sync.WaitGroup
errChan chan error
doneChan chan struct{}
children []*ConcurrencyScope
mu sync.RWMutex
stats ConcurrencyStats
}
The manager holds the big picture. It owns the main context, which can cancel everything. It has a WaitGroup to track all running goroutines. It has an error channel where any serious problem from any child scope can be reported. Its job is to create structured workspaces, which we call ConcurrencyScope.
A ConcurrencyScope is like a dedicated crew with a single goal.
type ConcurrencyScope struct {
name string
parent *ConcurrencyManager
children []*Task
resources []Releasable
cancel context.CancelFunc
completed chan struct{}
err atomic.Value
mu sync.RWMutex
}
When you want to perform a group of related tasks, you create a scope. "fetch_user_data", "process_image_batch", "handle_http_request" — these could all be scopes. A scope contains tasks and holds resources (like database connections or file handles) that only its tasks need. It has its own cancel function, so if something goes wrong in this scope, we can stop all its tasks without affecting other, unrelated scopes.
The actual work is done by a Task. This is our individual worker.
type Task struct {
name string
execute func(context.Context) error
timeout time.Duration
priority int
depends []*Task
state TaskState
result interface{}
err error
}
A task is simple: it's a named function that does something and might return an error. But we've added structure: a timeout so it can't run forever, a list of other tasks it depends on, and a place to store its state and result.
Now, how does it all come together? Let's walk through a typical flow.
First, you create a manager with a background context. This manager is now responsible for all concurrency in this part of your program.
func main() {
ctx := context.Background()
cm := NewConcurrencyManager(ctx)
defer cm.Cancel() // This ensures we clean up if we exit early
}
Let's say you need to fetch data from three different microservices to assemble a webpage. You want to do this in parallel, but if one fails, you want to cancel the others quickly and report the error.
You create a scope for this operation.
dataFetchScope := cm.CreateScope("assemble_webpage_data")
Now, you define your tasks. Each task is a function that calls a service.
userTask := &Task{
name: "get_user_profile",
execute: func(ctx context.Context) error {
// Imagine this calls a User Service HTTP API
user, err := userServiceClient.Fetch(ctx, userID)
if err != nil {
return fmt.Errorf("failed to fetch user: %w", err)
}
// Store the result for later use
userTask.result = user
return nil
},
timeout: 5 * time.Second,
}
ordersTask := &Task{
name: "get_user_orders",
execute: func(ctx context.Context) error {
// Calls an Order Service
orders, err := orderServiceClient.Fetch(ctx, userID)
if err != nil {
return err
}
ordersTask.result = orders
return nil
},
timeout: 7 * time.Second,
}
You execute them within the scope. The scope takes over.
dataFetchScope.Execute(userTask)
dataFetchScope.Execute(ordersTask)
Inside the Execute method, the scope does something crucial. It adds the task to its list, increments the manager's main WaitGroup, and then launches the task's function in a new goroutine in a controlled way.
This is the heart of the runTask method. It sets up a context with the task's specific timeout, waits for any tasks this one depends on, and then runs the execute function.
If the task returns an error, the scope's handleError method is called. This is where structured error propagation happens.
func (cs *ConcurrencyScope) handleError(err error) {
// Store the first error in this scope
if cs.err.Load() == nil {
cs.err.Store(err)
}
// Send it to the parent manager's error channel
select {
case cs.parent.errChan <- fmt.Errorf("[%s] %v", cs.name, err):
// Error sent
default:
// Channel is full, log it as a fallback
log.Printf("Error channel full, dropping: %v", err)
}
// Cancel all other tasks in THIS SCOPE
cs.cancelSiblings()
}
This is powerful. A task failing in the "assemble_webpage_data" scope will:
- Record the error in the scope.
- Notify the manager.
- Immediately cancel the context for every other task in the same scope. The
ordersTaskwill see its context become done and should stop its work cleanly.
Nothing outside this scope is affected. Other parts of your program keep running.
After launching tasks, you wait for the scope to finish.
err := dataFetchScope.Wait()
The Wait method simply waits on the manager's main WaitGroup for all tasks in this scope to complete. How does it know which ones? Because every task launched by this scope added to that WaitGroup. When the wait is done, it checks if the scope stored an error and returns it.
This pattern gives you two incredibly useful high-level operations almost for free: Parallel and Series.
Parallel runs tasks at the same time and collects their results.
results, err := cm.Parallel("data_fetch",
userTask,
ordersTask,
recommendationTask,
)
if err != nil {
// One of them failed. The 'err' will tell you which one.
log.Fatal(err)
}
// 'results' is a slice containing [user, orders, recommendations]
Inside Parallel, it creates a scope, runs all tasks, waits, and then gathers the results. If any task failed, the entire Parallel operation fails, returning the error. The scope's error cancellation ensures the other tasks are stopped.
Series runs tasks one after another, where each task depends on the previous.
results, err := cm.Series("multi_step_processing",
validateInputTask,
sanitizeDataTask,
writeToDatabaseTask,
)
The framework sets up the dependency links (task[2] depends on task[1], which depends on task[0]). If sanitizeDataTask fails, writeToDatabaseTask will never even start.
Now, let's talk about resource management, because goroutines aren't the only thing we leak. We leak database connections, open files, and network sockets.
The framework integrates cleanup directly into the scope lifecycle. Any object that needs cleanup can implement a simple interface.
type Releasable interface {
Release() error
}
A database connection wrapper would do it.
type DatabaseConnection struct {
conn *sql.DB
}
func (dc *DatabaseConnection) Release() error {
return dc.conn.Close()
}
When you create such a resource inside a task, you register it with the scope.
dataFetchScope.Execute(&Task{
name: "query_database",
execute: func(ctx context.Context) error {
conn := &DatabaseConnection{db}
// Tell the scope it needs to clean this up
dataFetchScope.RegisterResource(conn)
// Use the connection...
rows, err := conn.conn.QueryContext(ctx, "SELECT...")
// ...
return nil
},
})
The magic happens in scope.Cleanup(). When a scope finishes—whether successfully or with an error—it calls Cleanup. This method goes through its list of registered resources and calls Release() on every single one, in reverse order. This is important because resources might depend on each other; you often need to close them in the opposite order you opened them.
You don't have to remember a single defer. The structure guarantees it.
For resources that are expensive to create, like database connections, we can build a ResourcePool. This pool works seamlessly with our structured scopes.
dbPool := NewResourcePool(10, func() (Releasable, error) {
db, err := sql.Open("driver", "connectionString")
return &DatabaseConnection{db}, err
})
A task acquires a connection from the pool at the start and releases it when done. The pool manages having a maximum number of connections, waiting for one to free up, and timing out if it takes too long. All of this uses the same context from the task, so if the task is cancelled, the acquisition attempt is cancelled too.
task := &Task{
execute: func(ctx context.Context) error {
connResource, err := dbPool.Acquire(ctx)
if err != nil {
return err // Could be a timeout or cancellation
}
defer dbPool.Release(connResource) // Return to pool
conn := connResource.(*DatabaseConnection)
// ... use conn.conn to query ...
return nil
},
}
Another critical pattern for real-world systems is the circuit breaker. Imagine your task calls a payment service. If that service is down, you don't want every single user request to wait 30 seconds for a timeout and fail. You want to fail fast.
A circuit breaker monitors failures.
paymentCircuit := NewCircuitBreaker(5, 30*time.Second)
This breaker allows up to 5 consecutive failures. After that, it "opens" the circuit. For the next 30 seconds, any call to paymentCircuit.Execute(...) will immediately return an error "circuit breaker open", without even trying the operation. This protects the failing service and saves your application resources. After 30 seconds, it goes into a "half-open" state, allows one trial request, and if it succeeds, closes the circuit again to resume normal operation.
You use it inside a task.
task := &Task{
execute: func(ctx context.Context) error {
var paymentResult *PaymentResult
err := paymentCircuit.Execute(func() error {
var err error
paymentResult, err = paymentClient.Charge(ctx, amount)
return err
})
if err != nil {
// This could be a real charge error OR a "breaker open" error
return handlePaymentError(err)
}
// Use paymentResult...
return nil
},
}
Finally, the framework collects stats. This isn't just nice to have; it's essential for understanding the behavior of your concurrent system in production.
After running a workload, you can ask the manager for statistics.
stats := cm.GetStats()
fmt.Printf(`
Scopes Created: %d
Tasks Started: %d
Tasks Completed: %d
Tasks Failed: %d
Total CPU Time: %v
`, stats.ScopesCreated, stats.TasksStarted,
stats.TasksCompleted, stats.TasksFailed,
time.Duration(stats.TotalExecutionTime))
You can see if you're creating too many small scopes, if your failure rate is high, or where the total execution time is being spent.
Let's put it all together in a realistic example. An API handler that needs to validate a user, charge their card, and update inventory—all within a strict timeout.
func HandleCheckout(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
cm := NewConcurrencyManager(ctx)
defer cm.Cancel()
// We give the whole operation 8 seconds
err := cm.WithTimeout("checkout_flow", 8*time.Second, func(scope *ConcurrencyScope) error {
// Task 1: Validate user and cart (fast, local DB)
validateTask := &Task{name: "validate", execute: validateCart, timeout: 1*time.Second}
// Task 2: Charge payment (external, slow, uses circuit breaker)
chargeTask := &Task{name: "charge", execute: chargeCard, timeout: 5*time.Second}
chargeTask.depends = []*Task{validateTask} // Can't charge if cart is invalid
// Task 3: Update inventory (can happen in parallel with charge)
inventoryTask := &Task{name: "update_inventory", execute: updateInventory, timeout: 3*time.Second}
scope.Execute(validateTask)
scope.Execute(chargeTask)
scope.Execute(inventoryTask)
return scope.Wait()
})
if err != nil {
// This error is clean: it could be a validation error,
// a payment failure, an inventory problem, or the 8-second timeout.
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("Checkout successful"))
}
In this handler, the structure is clear. The WithTimeout creates a scope that automatically cancels after 8 seconds. Inside, tasks have defined dependencies and timeouts. If the payment fails, the inventory update (which was running in parallel) is cancelled. If anything fails or the timeout is hit, the manager's defer calls Cancel(), which propagates through the context and stops all remaining goroutines. No leaks, no orphaned processes.
This is the goal of structured concurrency. It turns the chaotic world of parallel execution into something you can reason about, compose, and debug. Your code becomes about what needs to be done and the relationships between tasks, not the tedious mechanics of starting and stopping threads. The framework handles the mechanics, ensuring safety and providing visibility. It lets you focus on the real work.
📘 Checkout my latest ebook for free on my channel!
Be sure to like, share, comment, and subscribe to the channel!
101 Books
101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as $4—making quality knowledge accessible to everyone.
Check out our book Golang Clean Code available on Amazon.
Stay tuned for updates and exciting news. When shopping for books, search for Aarav Joshi to find more of our titles. Use the provided link to enjoy special discounts!
Our Creations
Be sure to check out our creations:
Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | Java Elite Dev | Golang Elite Dev | Python Elite Dev | JS Elite Dev | JS Schools
We are on Medium
Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva
Top comments (0)