Let me explain how we can build a system that processes many tasks simultaneously, like a factory assembly line that automatically adjusts to keep every worker busy. I'll walk you through this step by step, showing you actual code that makes it work.
Think of a busy kitchen during a dinner rush. Orders come in fast. Some dishes take longer to cook than others. You can't have one chef doing everything while others stand around. You need a system where idle chefs automatically help others who are falling behind. That's exactly what we're building here.
First, let me show you the big picture. Our system has several connected parts that work together. We have a main pipeline that coordinates everything. This pipeline contains stages, like stations in our kitchen. Each stage has workers who do specific jobs. We have a special component that lets idle workers take tasks from busy ones. Another component distributes new tasks intelligently.
Here's how we start building our system:
type Pipeline struct {
	stages       []*PipelineStage
	workStealer  *WorkStealer
	loadBalancer *LoadBalancer
	metrics      *PipelineMetrics
	config       PipelineConfig
}
The Pipeline is our main controller. It holds all the stages and the components that manage them. When we create a new pipeline, we set up everything it needs to work properly.
func NewPipeline(config PipelineConfig) *Pipeline {
	return &Pipeline{
		stages: make([]*PipelineStage, 0),
		workStealer: &WorkStealer{
			stealQueue:   make(chan StealRequest, 1000),
			stealResults: make(chan StolenWork, 1000),
			config: StealerConfig{
				StealThreshold:   0.7,
				MaxStealAttempts: 3,
				StealBatchSize:   10,
			},
		},
		loadBalancer: &LoadBalancer{
			strategy:     StrategyWeightedRoundRobin,
			distribution: make(map[string]int),
		},
		metrics: &PipelineMetrics{
			startTime: time.Now(),
		},
		config: config,
	}
}
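The snippets in this article lean on a few types that are never spelled out. Here is a minimal sketch of what they might look like, inferred only from how they are used later (`Task` from `main`, `Processor` from `ValidationProcessor`, `TaskError` from the error-handling code). The field sets are assumptions, and `echoProcessor` is a hypothetical processor added just to exercise the interface.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Task is one unit of work. ID and Data are the only fields the
// article's main function sets; a real Task may carry more.
type Task struct {
	ID   string
	Data interface{}
}

// Processor is what each stage runs. The signature matches the
// Process method of ValidationProcessor.
type Processor interface {
	Process(task Task) (interface{}, error)
}

// TaskError records a failure together with where it happened.
type TaskError struct {
	Task   Task
	Error  error
	Stage  string
	Worker string
}

// PipelineConfig holds the three settings used in main.
type PipelineConfig struct {
	QueueSize    int
	QueueTimeout time.Duration
	MaxRetries   int
}

// echoProcessor is a hypothetical processor used only in this sketch.
type echoProcessor struct{}

func (echoProcessor) Process(task Task) (interface{}, error) {
	if task.Data == nil {
		return nil, errors.New("task data is empty")
	}
	return fmt.Sprintf("processed:%v", task.Data), nil
}

// Compile-time check that echoProcessor satisfies Processor.
var _ Processor = echoProcessor{}

func main() {
	var p Processor = echoProcessor{}
	out, err := p.Process(Task{ID: "task-1", Data: "data-1"})
	fmt.Println(out, err) // processed:data-1 <nil>
}
```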
Now let's add some stages to our pipeline. Each stage is like a department in our kitchen. One stage validates incoming tasks, another transforms data, another enriches it, and finally one produces output.
func (p *Pipeline) AddStage(name string, processor Processor, workerCount int) error {
	// A stage with no workers would never drain its input queue
	if workerCount < 1 {
		return fmt.Errorf("stage %q needs at least one worker, got %d", name, workerCount)
	}
	inputQueue := make(chan Task, p.config.QueueSize)
	outputQueue := make(chan Task, p.config.QueueSize)
	errorQueue := make(chan TaskError, p.config.QueueSize)
	stage := &PipelineStage{
		name:        name,
		processor:   processor,
		workerPool:  NewWorkerPool(workerCount),
		inputQueue:  inputQueue,
		outputQueue: outputQueue,
		errorQueue:  errorQueue,
		stageMetrics: StageMetrics{
			Name: name,
		},
	}
	p.stages = append(p.stages, stage)
	p.loadBalancer.stages = append(p.loadBalancer.stages, stage)
	// Create workers for this stage
	for i := 0; i < workerCount; i++ {
		worker := NewWorker(fmt.Sprintf("%s-worker-%d", name, i))
		stage.workerPool.AddWorker(worker)
		p.workStealer.workers = append(p.workStealer.workers, worker)
	}
	return nil
}
Each worker is like an individual chef. They know how to process tasks for their specific stage. Here's what a worker looks like:
type Worker struct {
	ID        string
	taskQueue []Task
	mu        sync.RWMutex
	busy      bool
	metrics   WorkerMetrics
}

func NewWorker(id string) *Worker {
	return &Worker{
		ID:        id,
		taskQueue: make([]Task, 0, 100),
		metrics: WorkerMetrics{
			ID: id,
		},
	}
}
Workers wait for tasks to appear in their queue. When they get a task, they process it. If they finish and have nothing else to do, they can look for tasks from other workers who are too busy.
Now let's talk about the clever part: work stealing. This is like when a chef who has finished their prep work goes to help another chef who's falling behind on orders.
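func (ws *WorkStealer) Run(ctx context.Context) {
	// A ticker fires on a fixed schedule. Calling time.After inside the loop
	// would restart the 100ms countdown after every steal request, so a
	// steady stream of requests could keep balanceLoad from ever running.
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case request := <-ws.stealQueue:
			ws.handleStealRequest(request)
		case <-ticker.C:
			ws.balanceLoad()
		}
	}
}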
The work stealer handles steal requests as they arrive and, about every 100 milliseconds, checks whether the load needs rebalancing. If it notices that many workers are busy while some are idle, it steps in to redistribute the work. Here's how it finds a worker who might have extra work:
func (ws *WorkStealer) findVictim(requesterID string) *Worker {
	// Look for a worker with plenty of work
	for _, worker := range ws.workers {
		if worker.ID != requesterID && worker.WorkQueueSize() > 10 {
			return worker
		}
	}
	return nil
}
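`findVictim` returns the first worker over the threshold, which tends to hit the same worker repeatedly. One possible variation, sketched here under the assumption that queue lengths are cheap to read, is to pick the most loaded worker instead. The `worker` type and `findBusiestVictim` are hypothetical stand-ins, with a plain `queued` field replacing `WorkQueueSize()`.

```go
package main

import "fmt"

// worker is a stripped-down stand-in for the article's Worker type.
type worker struct {
	ID     string
	queued int
}

// findBusiestVictim returns the worker with the longest queue, skipping
// the requester and anyone whose queue is at or below minQueue.
// It returns nil when nobody qualifies.
func findBusiestVictim(workers []*worker, requesterID string, minQueue int) *worker {
	var victim *worker
	for _, w := range workers {
		if w.ID == requesterID || w.queued <= minQueue {
			continue
		}
		if victim == nil || w.queued > victim.queued {
			victim = w
		}
	}
	return victim
}

func main() {
	workers := []*worker{
		{ID: "w0", queued: 0},
		{ID: "w1", queued: 12},
		{ID: "w2", queued: 40},
	}
	v := findBusiestVictim(workers, "w0", 10)
	fmt.Println(v.ID) // w2
}
```

Scanning every worker costs more than stopping at the first match, so this trade-off only pays off when the worker list is short or steals are infrequent.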
When an idle worker finds a busy one, it takes some tasks. Not all of them - just enough to help out without causing disruption.
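func (w *Worker) StealWork(count int) []Task {
	w.mu.Lock()
	defer w.mu.Unlock()
	// Refuse invalid requests and requests that would empty the victim's queue
	if count <= 0 || len(w.taskQueue) <= count {
		return nil
	}
	// Take tasks from the end of the queue. Copy them so the stolen slice
	// does not share a backing array with w.taskQueue; otherwise a later
	// append to w.taskQueue would overwrite the stolen tasks.
	split := len(w.taskQueue) - count
	stolen := make([]Task, count)
	copy(stolen, w.taskQueue[split:])
	w.taskQueue = w.taskQueue[:split]
	return stolen
}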
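Notice we take tasks from the end of the queue. If the owner appends new tasks at the end and works from the front, the tail holds the most recently added tasks, not the oldest ones. Stealing there keeps the thief away from the end the owner is working on, which is why classic work-stealing deques have the owner and the thief operate on opposite ends. If you would rather hand off the orders that have waited longest, take them from the front of the queue instead.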
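To make it concrete which tasks each end of a slice-backed queue holds, here is a small sketch using integers as tasks, where a lower number means an earlier arrival. `stealFromTail` and `stealFromHead` are hypothetical helpers, and the sketch assumes new tasks are appended at the end. Both copy the stolen items so the result never shares memory with the owner's queue.

```go
package main

import "fmt"

// stealFromTail removes the last count items (the newest, if the owner
// appends at the end) and returns them in a fresh slice.
func stealFromTail(queue []int, count int) (remaining, stolen []int) {
	if count <= 0 || len(queue) <= count {
		return queue, nil
	}
	split := len(queue) - count
	stolen = append([]int(nil), queue[split:]...) // copy, no shared backing array
	return queue[:split], stolen
}

// stealFromHead removes the first count items (the oldest) instead.
func stealFromHead(queue []int, count int) (remaining, stolen []int) {
	if count <= 0 || len(queue) <= count {
		return queue, nil
	}
	stolen = append([]int(nil), queue[:count]...)
	return queue[count:], stolen
}

func main() {
	queue := []int{1, 2, 3, 4, 5, 6} // 1 arrived first, 6 arrived last

	rest, stolen := stealFromTail(queue, 2)
	fmt.Println(rest, stolen) // [1 2 3 4] [5 6]

	// The owner keeps appending; the stolen copy is unaffected.
	rest = append(rest, 7)
	fmt.Println(stolen) // [5 6]

	rest2, stolen2 := stealFromHead([]int{1, 2, 3, 4, 5, 6}, 2)
	fmt.Println(rest2, stolen2) // [3 4 5 6] [1 2]
}
```

Without the copy, the `append` in `main` would write 7 into the slot that used to hold 5, and the thief would see its stolen task change underneath it.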
While work stealing handles existing tasks, we also need to distribute new tasks intelligently. That's where our load balancer comes in. Think of it as the host in a restaurant who seats customers at different tables based on how busy each server is.
func (lb *LoadBalancer) SelectStage(task Task) *PipelineStage {
	switch lb.strategy {
	case StrategyRoundRobin:
		return lb.selectRoundRobin()
	case StrategyWeightedRoundRobin:
		return lb.selectWeightedRoundRobin()
	case StrategyLeastConnections:
		return lb.selectLeastConnections()
	default:
		return lb.selectRoundRobin()
	}
}
The load balancer can use different strategies. The weighted round-robin approach is particularly interesting. It gives more weight to stages that are processing tasks quickly and have shorter queues.
func (lb *LoadBalancer) updateWeights() {
	for i, stage := range lb.stages {
		// Check how busy this stage is
		queueLen := len(stage.inputQueue)
		processingTime := atomic.LoadUint64(&stage.stageMetrics.AvgProcessingTime)
		// Combine queue length (70%) and processing time in seconds (30%)
		load := float64(queueLen)*0.7 + float64(processingTime)/1e9*0.3
		// Less busy stages get higher weight. Adding 1 keeps the weight
		// finite for an idle stage (load == 0) instead of dividing by zero.
		lb.weights[i] = 1.0 / (1.0 + load)
	}
	// Make sure weights add up properly
	lb.normalizeWeights()
}
This calculation considers both how many tasks are waiting and how long tasks take to process. A stage with a long queue gets fewer new tasks. A stage that processes tasks slowly also gets fewer new tasks. This gives busy stages time to catch up.
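A worked example may help here. The sketch below applies the same 0.7/0.3 weighting to two made-up stages and then normalizes the results. Two things are assumptions of the sketch and not taken from the article: 1 is added to the denominator so that an idle stage does not divide by zero, and `normalize` scales the weights to sum to 1, which is one plausible reading of what `normalizeWeights` does.

```go
package main

import "fmt"

// stageLoad is a hypothetical snapshot of one stage.
type stageLoad struct {
	queueLen    int
	avgProcNano uint64 // average processing time in nanoseconds
}

// rawWeight applies the article's 0.7/0.3 weighting. The extra 1 in the
// denominator (an assumption) gives an idle stage a weight of exactly 1.
func rawWeight(s stageLoad) float64 {
	load := float64(s.queueLen)*0.7 + float64(s.avgProcNano)/1e9*0.3
	return 1.0 / (1.0 + load)
}

// normalize scales the weights so they sum to 1.
// It returns all zeros when the input sums to zero.
func normalize(weights []float64) []float64 {
	sum := 0.0
	for _, w := range weights {
		sum += w
	}
	out := make([]float64, len(weights))
	if sum == 0 {
		return out
	}
	for i, w := range weights {
		out[i] = w / sum
	}
	return out
}

func main() {
	idle := stageLoad{queueLen: 0, avgProcNano: 0}
	backedUp := stageLoad{queueLen: 10, avgProcNano: 0}

	// idle: 1/(1+0) = 1, backed up: 1/(1+7) = 0.125
	weights := normalize([]float64{rawWeight(idle), rawWeight(backedUp)})
	fmt.Printf("%.3f %.3f\n", weights[0], weights[1]) // 0.889 0.111
}
```

With these numbers the idle stage would receive roughly eight of every nine new tasks until the other stage's queue shrinks.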
Now let's look at how workers actually process tasks. Each worker belongs to a pool, and the pool manages how workers get their tasks.
func (wp *WorkerPool) runWorker(ctx context.Context, worker *Worker, stage *PipelineStage) {
	for {
		select {
		case <-ctx.Done():
			return
		case task := <-wp.taskQueue:
			// Mark this worker as busy
			wp.mu.Lock()
			wp.busyWorkers[worker.ID] = worker
			wp.mu.Unlock()
			// Do the actual work
			start := time.Now()
			result, err := stage.processor.Process(task)
			duration := time.Since(start)
			// Record what happened
			atomic.AddUint64(&stage.stageMetrics.TasksProcessed, 1)
			atomic.AddUint64(&stage.stageMetrics.TotalProcessingTime, uint64(duration.Nanoseconds()))
			if err != nil {
				stage.errorQueue <- TaskError{
					Task:   task,
					Error:  err,
					Stage:  stage.name,
					Worker: worker.ID,
				}
			} else {
				task.Data = result
				stage.outputQueue <- task
			}
			// Mark worker as available again
			wp.mu.Lock()
			delete(wp.busyWorkers, worker.ID)
			wp.mu.Unlock()
			wp.idleWorkers <- worker
		case <-time.After(1 * time.Second):
			// No task arrived for a second: check if the pool needs help.
			// busyWorkers is written under wp.mu, so read it under the
			// same lock to avoid a data race.
			wp.mu.Lock()
			busyRatio := float64(len(wp.busyWorkers)) / float64(len(wp.workers))
			wp.mu.Unlock()
			if busyRatio > 0.8 {
				wp.requestWorkSteal()
			}
		}
	}
}
Notice that a worker runs this check only after it has waited a full second without receiving a task. If more than 80% of the workers in its pool are busy at that moment, it asks for help. This prevents situations where everyone is overwhelmed but no one asks for assistance.
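Because `runWorker` depends on types the article does not define, it cannot be run on its own. The following is a much smaller, self-contained sketch of the same shape: several goroutines read from a shared channel, process each task, and report success or failure. It leaves out busy tracking and stealing. `process`, `runPool`, and `demo` are hypothetical names, and the "every tenth task fails" rule exists only to produce some errors.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

type result struct {
	id  int
	err error
}

// process is a made-up handler: task IDs divisible by 10 fail.
func process(id int) error {
	if id%10 == 0 {
		return errors.New("simulated failure")
	}
	return nil
}

// runPool starts workerCount goroutines that read task IDs from a shared
// channel until it is closed or ctx is cancelled. It returns how many
// tasks succeeded and how many failed.
func runPool(ctx context.Context, workerCount, taskCount int) (succeeded, failed int) {
	tasks := make(chan int)
	results := make(chan result)

	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range tasks {
				select {
				case results <- result{id: id, err: process(id)}:
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	// Producer: feed tasks, then close the channel so workers exit.
	go func() {
		defer close(tasks)
		for id := 1; id <= taskCount; id++ {
			select {
			case tasks <- id:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Close results once every worker has returned.
	go func() {
		wg.Wait()
		close(results)
	}()

	for r := range results {
		if r.err != nil {
			failed++
		} else {
			succeeded++
		}
	}
	return succeeded, failed
}

// demo runs the pool with a background context.
func demo() (int, int) {
	return runPool(context.Background(), 4, 100)
}

func main() {
	succeeded, failed := demo()
	fmt.Println(succeeded, failed) // 90 10
}
```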
Let me show you how we use all this in practice. Here's a complete example that puts everything together:
func main() {
	// Set up our pipeline configuration
	config := PipelineConfig{
		QueueSize:    1000,
		QueueTimeout: 5 * time.Second,
		MaxRetries:   3,
	}
	pipeline := NewPipeline(config)
	// Half the CPU cores, but never fewer than one worker
	// (NumCPU()/2 is 0 on a single-core machine)
	halfCores := runtime.NumCPU() / 2
	if halfCores < 1 {
		halfCores = 1
	}
	// Add different processing stages
	// Use half the CPU cores for validation
	pipeline.AddStage("validation", &ValidationProcessor{}, halfCores)
	// Use all cores for the main transformation work
	pipeline.AddStage("transformation", &TransformationProcessor{}, runtime.NumCPU())
	// Use half the cores for enrichment
	pipeline.AddStage("enrichment", &EnrichmentProcessor{}, halfCores)
	// Use just 2 workers for output
	pipeline.AddStage("output", &OutputProcessor{}, 2)
	// Start everything
	ctx := context.Background()
	if err := pipeline.Start(ctx); err != nil {
		log.Fatal(err)
	}
	defer pipeline.Stop()
	// Send lots of tasks to process
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			task := Task{
				ID:   fmt.Sprintf("task-%d", id),
				Data: fmt.Sprintf("data-%d", id),
			}
			result, err := pipeline.Process(ctx, task)
			if err != nil {
				log.Printf("Task %d failed: %v", id, err)
				return
			}
			log.Printf("Task %d completed: %v", id, result)
		}(i)
	}
	wg.Wait()
	// See how we did
	metrics := pipeline.GetMetrics()
	fmt.Printf("Pipeline Results:\n")
	fmt.Printf(" Tasks submitted: %d\n", metrics.TasksSubmitted)
	fmt.Printf(" Tasks completed: %d\n", metrics.TasksCompleted)
	fmt.Printf(" Tasks failed: %d\n", metrics.TasksFailed)
}
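`pipeline.Start` and `pipeline.Process` are not shown in this article, so how the stages get wired together is left open. One common way to do it, sketched here as an assumption and not as the article's implementation, is to feed each stage's output channel into the next stage's input. `stageFunc`, `startStage`, and `runPipeline` are hypothetical names, and strings stand in for tasks.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// stageFunc is a hypothetical per-stage transformation.
type stageFunc func(string) string

// startStage launches workerCount goroutines that read from in, apply fn,
// and write to the returned channel. The output channel is closed once
// all workers have drained in.
func startStage(in <-chan string, fn stageFunc, workerCount int) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// runPipeline feeds inputs through the stages in order and collects
// everything that comes out of the last stage.
func runPipeline(inputs []string, stages ...stageFunc) []string {
	source := make(chan string)
	go func() {
		defer close(source)
		for _, v := range inputs {
			source <- v
		}
	}()

	var ch <-chan string = source
	for _, fn := range stages {
		ch = startStage(ch, fn, 2) // two workers per stage
	}

	var results []string
	for v := range ch {
		results = append(results, v)
	}
	return results
}

func main() {
	out := runPipeline(
		[]string{"a", "b", "c"},
		strings.ToUpper,
		func(s string) string { return s + "!" },
	)
	// Order can vary because the workers run concurrently.
	fmt.Println(len(out)) // 3
}
```

Closing each output channel when its workers finish is what lets shutdown ripple through the chain: once the source closes, every stage drains and closes in turn.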
Each processor does specific work. Here's what a simple validation processor might look like:
type ValidationProcessor struct{}

func (vp *ValidationProcessor) Process(task Task) (interface{}, error) {
	// Check if the task data looks right
	if task.Data == nil {
		return nil, fmt.Errorf("task data is empty")
	}
	// Simulate some work taking time
	time.Sleep(10 * time.Millisecond)
	return task.Data, nil
}
The system keeps track of how well it's performing. It collects metrics that help us understand what's happening:
func (pm *PipelineMetrics) Collect(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Calculate how many tasks per second we're processing
			duration := time.Since(pm.startTime).Seconds()
			throughput := float64(atomic.LoadUint64(&pm.TasksCompleted)) / duration
			// See how often work stealing helps
			stealSuccessRate := float64(atomic.LoadUint64(&pm.WorkStealSuccesses)) /
				float64(atomic.LoadUint64(&pm.WorkSteals)+1) * 100
			log.Printf("Processing %.1f tasks/sec, Steal success: %.1f%%",
				throughput, stealSuccessRate)
		}
	}
}
These metrics tell us important things. If work stealing rarely succeeds, it might mean our tasks are too big or our workers are poorly distributed. If throughput is low, we might need more workers or faster processors.
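The two figures logged by `Collect` are simple ratios, and pulling them out into pure functions makes them easy to check. This sketch uses hypothetical helper names. One detail worth noting: `Collect` adds 1 to the number of attempts to avoid dividing by zero, which slightly understates the rate (30 successes out of 40 attempts would be reported as 30/41, about 73.2%, not 75%). The version here returns 0 when there have been no attempts.

```go
package main

import "fmt"

// throughput returns completed tasks per second, or 0 when no time
// has elapsed yet.
func throughput(completed uint64, elapsedSeconds float64) float64 {
	if elapsedSeconds <= 0 {
		return 0
	}
	return float64(completed) / elapsedSeconds
}

// stealSuccessRate returns the percentage of steal attempts that
// succeeded, or 0 when nothing has been attempted.
func stealSuccessRate(successes, attempts uint64) float64 {
	if attempts == 0 {
		return 0
	}
	return float64(successes) / float64(attempts) * 100
}

func main() {
	fmt.Printf("%.1f tasks/sec\n", throughput(5000, 10)) // 500.0 tasks/sec
	fmt.Printf("%.1f%%\n", stealSuccessRate(30, 40))     // 75.0%
	fmt.Printf("%.1f%%\n", stealSuccessRate(0, 0))       // 0.0%
}
```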
Let me explain why this design works well. Traditional systems often have fixed assignments. Worker A always gets certain tasks, Worker B gets others. If Worker A gets difficult tasks while Worker B gets easy ones, Worker A falls behind. Our system avoids this by constantly rebalancing.
The work stealing approach has another benefit: it's cooperative. Workers only steal when they're idle, and they only take what they can handle. This creates a natural equilibrium. Busy workers gradually lighten their load, idle workers gradually take on more work.
The load balancer complements this by preventing problems before they happen. If it sees one stage consistently slowing down, it sends fewer tasks there. This gives that stage time to recover. It's like slowing down orders for complicated dishes so the kitchen doesn't get overwhelmed.
This system handles failures gracefully too. If a worker encounters an error processing a task, that error gets recorded separately without stopping the entire pipeline:
if err != nil {
	stage.errorQueue <- TaskError{
		Task:   task,
		Error:  err,
		Stage:  stage.name,
		Worker: worker.ID,
	}
}
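Other workers continue processing other tasks. The failed task lands on the stage's error queue, where it can be logged and investigated later. Something has to drain that queue, though: the send blocks once the buffer is full, and a blocked worker stops processing. With a consumer in place, this isolation prevents one bad task from stopping everything.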
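A dedicated consumer for the error queue can be as small as a loop over the channel. The sketch below is one way to write it, with a simplified `TaskError` (the field names differ from the article's) and hypothetical `collectErrors` and `demo` functions. The buffer is deliberately tiny to show that senders are not stuck as long as something keeps reading.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// TaskError is a simplified version of the article's error record.
type TaskError struct {
	TaskID string
	Err    error
	Stage  string
}

// collectErrors drains errs until it is closed and returns how many
// failures each stage reported.
func collectErrors(errs <-chan TaskError) map[string]int {
	counts := make(map[string]int)
	for e := range errs {
		counts[e.Stage]++
	}
	return counts
}

// demo has five workers report one failure each through a channel
// that can buffer only two, then tallies the results.
func demo() map[string]int {
	errs := make(chan TaskError, 2)
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			errs <- TaskError{
				TaskID: fmt.Sprintf("task-%d", i),
				Err:    errors.New("boom"),
				Stage:  "validation",
			}
		}(i)
	}
	// Close the channel once every sender is done so the drain loop ends.
	go func() {
		wg.Wait()
		close(errs)
	}()
	return collectErrors(errs)
}

func main() {
	fmt.Println(demo()) // map[validation:5]
}
```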
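In my experience, this kind of system can process tens of thousands of tasks per second, with work stealing cutting idle time by roughly 60-70% compared to simpler systems and CPU usage holding at around 85-90% under heavy load. Treat these as rough figures: the actual numbers depend on task size, hardware, and workload, so measure your own pipeline before relying on them.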
The beauty of this approach is that it adapts automatically. If you add more CPU cores, workers can steal more aggressively. If tasks get more complex, the load balancer adjusts distribution. If some workers fail, others take over their work.
You can extend this system in many ways. You could add priority levels to tasks, so important tasks get processed first. You could add retry logic for failed tasks. You could even have different stealing strategies for different types of work.
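As an illustration of the first extension, priority levels, here is a minimal sketch built on the standard library's `container/heap`. It is not part of the pipeline shown above: `prioritizedTask`, `taskHeap`, and `drainByPriority` are hypothetical names, and the sketch assumes a higher number means a more urgent task.

```go
package main

import (
	"container/heap"
	"fmt"
)

// prioritizedTask pairs a task ID with a priority; higher runs first.
type prioritizedTask struct {
	ID       string
	Priority int
}

// taskHeap implements heap.Interface as a max-heap on Priority.
type taskHeap []prioritizedTask

func (h taskHeap) Len() int           { return len(h) }
func (h taskHeap) Less(i, j int) bool { return h[i].Priority > h[j].Priority }
func (h taskHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *taskHeap) Push(x interface{}) {
	*h = append(*h, x.(prioritizedTask))
}

func (h *taskHeap) Pop() interface{} {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[:n-1]
	return item
}

// drainByPriority pushes every task onto the heap and then pops them
// all, returning the IDs in the order they would be processed.
func drainByPriority(tasks []prioritizedTask) []string {
	h := &taskHeap{}
	for _, t := range tasks {
		heap.Push(h, t)
	}
	ids := make([]string, 0, len(tasks))
	for h.Len() > 0 {
		ids = append(ids, heap.Pop(h).(prioritizedTask).ID)
	}
	return ids
}

func main() {
	fmt.Println(drainByPriority([]prioritizedTask{
		{ID: "report", Priority: 1},
		{ID: "payment", Priority: 9},
		{ID: "email", Priority: 5},
	})) // [payment email report]
}
```

The heap itself is not safe for concurrent use, so inside a worker it would need the same mutex protection as the slice-based queue.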
Building this requires careful thinking about concurrency. We use Go's channels for communication between components. We use mutexes to protect shared data. We use atomic operations for metrics that many goroutines update simultaneously.
The key insight is that parallel processing works best when it's dynamic. Fixed assignments waste resources. Static balancing can't adapt to changing conditions. By combining work stealing with dynamic load balancing, we create a system that maximizes resource use while minimizing wait times.
This approach turns a difficult programming challenge into something manageable. Developers define what each stage does, and the system handles the complexity of parallel execution. The result is software that scales efficiently, uses hardware effectively, and responds intelligently to changing workloads.