This script is going to make you control your code's concurrency.
Hi! This is going to be my meticulous attempt to incite the interest and curiosity that piqued in me too, while I was exploring Concurrency patterns in Golang. It delineates the key idea behind controlled concurrency, provides deadlock insights, discusses optimisations & presents a correct concurrency control pattern.
Prerequisite:
Knowledge of Golang syntax, how concurrency works & slight theoretical knowledge of how channels in Go work is useful to get the comprehensive guide work out for you.
Would you like to give some thoughts over this question before we start?
Ask yourself: Suppose you have 100,000 goroutines working concurrently, and they all do some CPU work, some DB call, some network I/O, what happens?
... What happens to the Memory usage, the GC pressure? What happens to the External system(eg. DB)?
Let's mentally simulate this:
Each Goroutine starts small (nearly 2KB stack) but 100K -> 200MB+ stack space quickly;
There can be many variables allocated in the Heap or they escaped to the heap (Go's Escape Analysis);
The DB connection pool can exhaust, Queries to the DB queue up, Multiple timeout and retries to the DB occur.
What issues arise from this?
- Large no. of Goroutines => Scheduler overhead is more. While goroutines are "cheaper" than threads, still the scheduler needs to find the runnable goroutines to schedule them to the CPU.
- 200MB+ space in modern servers, sounds manageable, but the danger lies at the heap level. The GC has to mark the "simultaneous live objects in the heap" and "CPU Stealing" can occur, which means, the GC takes up CPU time away from the application logic.
- If the DB has let's say a connection pool of 100, but 100k goroutines try to query it, 99,900 goroutines will block waiting for a connection. This creates a long queue, thereby again consuming memory.
You see, this can cause the system to collapse. Therefore, we need to limit the no. of concurrent goroutines. And, so let's introduce Worker Pools.
Let us take a working example to gradually ramp up our understanding on worker pools:
- Take 2 Unbuffered channels - Jobs & Results => In a gist, unbuffered channel works like this - Sender on the channel is blocked until receive happens, OR Receiver is blocked until a send happens.
- Consider the Main Goroutine is both the producer to the Jobs channel and the consumer to the Results channel.
- 3 worker goroutines (let's say G1, G2, G3) consume on the Jobs channel and produce on the Results channel.
- Apart from these goroutines, we have a Listener Goroutine that closes the Results channel as soon as all the worker goroutines execute their exit code (implemented using WaitGroup wg)
package main
import (
"fmt"
"sync"
"strings"
"strconv"
)
type Job struct{
Id int
Url string
}
type Result struct{
JobId int
Url string
StatusCode int
Err error
}
func main(){
/*
Unbuffered channels - jobs and results
jobs := make(chan Job)
results := make(chan Result)
*/
jobs := make(chan Job)
results := make(chan Result)
var wg sync.WaitGroup
for i := 1; i <= 3; i++{
wg.Add(1)
go worker(jobs, results, &wg) // 3 worker goroutines spawned
}
urls := [5]string{
"https://httpbin.org/status/200",
"https://httpbin.org/status/404",
"https://httpbin.org/status/500",
"https://httpbin.org/delay/1",
"https://httpbin.org/status/200",
}
for i, url := range urls{ // Main produces jobs
jobs <- Job{Id: i+1, Url: url}
}
close(jobs)
go func(){ // Listener Goroutine
wg.Wait()
close(results)
}()
for result := range results{ // Main consumes results
fmt.Printf("job-%d %s → %d\n", result.JobId, result.Url, result.StatusCode)
}
fmt.Println("Main ended")
}
func worker(jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup){
defer wg.Done()
for job := range jobs {
tp := strings.Split(job.Url, "/")
statusCode, _ := strconv.Atoi(tp[len(tp)-1])
results <- Result{
JobId: job.Id,
Url: job.Url,
StatusCode: statusCode,
}
}
fmt.Println("G ended")
}
Let's dry run:
- When Main goroutine puts a job in the Jobs channel, it gets blocked on the Jobs channel, until any worker goroutine receives the job.
- Suppose G1 takes the job from the Jobs channel and consequently Main unblocks, then G1 extracts status code from the job URL, and produces a result in the Results Channel. Since no receiver is there on the Results channel currently, G1 gets blocked on the Results channel.
- Main again produces the job in the Jobs channel, and gets blocked again; Let's say now G2 picks up that job, similar to G1, G2 gets blocked on the Results channel.
- Main again produces the job, and now G3 picks the job and similarly, gets blocked on the Results channel.
- That means, after Main produces 3 jobs, G1, G2, G3 are blocked on the Results channel, and Main now gets blocked on the Jobs channel when it tries to produce 4th job. => DEADLOCK!
That implies, when Main is both the producer on the Jobs channel, and the consumer on the Results channel, and the Jobs and the Results channels both are unbuffered, Deadlock occurs if no. of jobs > no. of worker goroutines.
To avoid Deadlock in this case:
- The no. of jobs = no. of worker goroutines = 3 (in our example)(must condition). After producing the 3rd job, Main moves out of the job producer loop and closes the Jobs channel implying that now no jobs would be produced. At this state, G1, G2 & G3 are all blocked on the Results channel.
- Now, Main starts to consume on the Results channel "pairing with the 3 blocked worker goroutines one by one", consequently unblocking all of them".
- G1, G2 & G3 all exit the job consumer loop since Jobs channel was closed.
- The "Listener Goroutine" now closes the Results channel as soon as all the 3 workers exit.
- Subsequently, the Main exits the Results consumer loop, as Results channel was closed.
But, this is not the reality. No. of jobs can be 10, 20, 30.... any number. So, what can we do?
Let's try using buffered channels:
jobs := make(chan Job, 4) // You can simulate with any size
results := make(chan Result, 6)
Now, let's assume for one moment (This can help a lot if you pay attention to what I mean to say) -
There are infinite jobs that the Main goroutine wants to produce.
So, Tell me -- How many jobs can Main produce before deadlock happens? Think carefully! Run Concurrent goroutines in your head, simulating how buffered channels work..
Let me Answer (I hope you thought of it!) -
- Main produces job in the Jobs channel -> Any goroutine picks it up -> it produces result in the Results channel
- Until Results channel fills up (size = 6), 6 jobs can be produced by Main.
- The 3 worker goroutines can take up 3 more jobs and all 3 get blocked on the filled Results channel.
- Further, Main can produce 4 more jobs until the Jobs channel fills up (size = 4).
- Now, if Main tries to produce another job, it gets blocked; and Deadlock occurs. This gives us the formula => Max. Number of jobs that Main can produce without Deadlock = Results channel size (6) + no. of worker goroutines (3) + Jobs channel size (4) = 13.
Similarly, you can simulate for yourself, in the case of:
- Unbuffered Jobs channel & Buffered Results channel: Max. Number of jobs that Main can produce without Deadlock = Results channel size + no. of worker goroutines.
- Buffered Jobs channel & Unbuffered Results channel: Max. Number of jobs that Main can produce without Deadlock = Jobs channel size + no. of worker goroutines.
That implies our controlled concurrency mechanism failed to scale with the number of jobs; once the job count exceeds a threshold, it causes a deadlock.
This Reveals the key insight!
- Buffer is just a performance optimisation, not a correctness mechanism. It only delayed the Deadlock.
- Correctness comes from the *correct goroutine pattern *. What is that pattern??
Correct Goroutine pattern: Decoupled Producer and Consumer
- Earlier, the pattern we were studying was - Main being both the Producer to the Jobs channel and the Consumer to the Results channel. Deadlock occurred when the Main goroutine blocks on the Jobs channel and the worker goroutines block on the Results channel.
- What if - we side-by-side (concurrently) start a consumer service on the Results channel, consequently unblocking the worker goroutines, and making them ready to pick another jobs. And, hence, we found the solution!
A service produces jobs to the Jobs channel -> Worker routines pick up jobs and produce to the Results channel -> A service consumes from the Results channel. This is the Correct Goroutine pattern.
** Take a look at the code: **
/*
Decoupled Producer and Consumer
Main is the results Consumer, new Goroutine spawned as jobs Producer.
** Why not main as producer? Because after producing jobs, main will end without waiting for the consumer to finish.
Unbuffered Channels - Jobs and Results
*/
func main(){
jobs := make(chan Job)
results := make(chan Result)
var wg sync.WaitGroup
for i := 1; i <= 3; i++{
wg.Add(1)
go worker(jobs, results, &wg)
}
go func(){ // Producer goroutine
urls := [15]string{
"https://httpbin.org/status/200",
"https://httpbin.org/status/404",
"https://httpbin.org/status/500",
"https://httpbin.org/status/500",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/status/200",
"https://httpbin.org/status/200",
"https://httpbin.org/status/404",
"https://httpbin.org/status/500",
"https://httpbin.org/delay/1",
"https://httpbin.org/status/200",
"https://httpbin.org/status/200",
"https://httpbin.org/status/404",
"https://httpbin.org/status/404",
}
for i, url := range urls{
jobs <- Job{Id: i+1, Url: url}
}
close(jobs)
}()
go func(){
wg.Wait()
close(results)
}()
for result := range results{ // Main consumes results
fmt.Printf("job-%d %s → %d\n", result.JobId, result.Url, result.StatusCode)
}
fmt.Println("Main ended")
}
func worker(jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup){
defer wg.Done()
for job := range jobs {
tp := strings.Split(job.Url, "/")
statusCode, _ := strconv.Atoi(tp[len(tp)-1])
results <- Result{
JobId: job.Id,
Url: job.Url,
StatusCode: statusCode,
}
}
fmt.Println("G ended")
}
Therefore, we have arrived at a correct implementation of a worker pool and established control over concurrency. You can now experiment further:
a. Use buffered channels to improve performance in this pattern.
b. Replace the URL parsing with actual HTTP calls, and incorporate context cancellation.
From here, you can compose more advanced primitives within this model.”
_Thanks a lot! _
Top comments (0)