At work, a client reported a performance bottleneck, and I was assigned to identify it and make the necessary changes to improve efficiency. After some time investigating the logs, I found the root cause: the repetitive execution of a time-consuming operation, which ultimately hurt the response time.
After some experimentation, I decided to run the time-consuming operations asynchronously, wait for all of them to finish, and then aggregate the results. Here are three ways to do this.
Goroutines are lightweight units of execution managed by the Go runtime, which multiplexes them onto a small pool of OS threads.
The Bad Way
TLDR: Execute jobs synchronously.
type Job struct {
    result interface{}
}

func (j *Job) execute() {
    // do some work
    j.result = ...
}

// a slice of jobs
var jobs []Job
jobs = append(jobs, ...)
for i := range jobs {
    jobs[i].execute()
}
This is the bad way because nothing runs concurrently: the main goroutine executes the jobs one after another, so the total latency is the sum of all job durations.
The Normal Way
TLDR: Execute jobs asynchronously, but the number of parallel jobs is uncontrolled.
type Job struct {
    result interface{}
}

func (j *Job) execute() {
    // do some work
    j.result = ...
}

// a slice of jobs
var jobs []Job
jobs = append(jobs, ...)

var wg sync.WaitGroup
wg.Add(len(jobs))
for i := range jobs {
    go func(idx int) {
        defer wg.Done()
        // operate on the slice element so the result is kept
        jobs[idx].execute()
    }(i)
}
wg.Wait()
Here, sync.WaitGroup coordinates the goroutines: Wait blocks until the WaitGroup's internal counter drops to zero, which happens once every goroutine has called Done. The problem with this approach is that a large job slice spawns an equally large number of goroutines, each consuming memory and scheduler time.
The Better Way
TLDR: Execute jobs asynchronously, with the number of parallel jobs controlled.
type Job struct {
    result interface{}
}

func (j *Job) execute() {
    // do some work
    j.result = ...
}

// worker: pulls jobs from inputs until the channel is closed
// or the context is cancelled
doParallel := func(ctx context.Context, inputs <-chan Job, output chan<- Job) {
    for {
        select {
        case job, ok := <-inputs:
            if !ok {
                return
            }
            job.execute()
            output <- job
        case <-ctx.Done():
            return
        }
    }
}

// a slice of jobs
var jobs []Job
jobs = append(jobs, ...)

ctx := context.Background()
max := runtime.NumCPU()
queue := make(chan Job, max)
output := make(chan Job)
defer close(output)

// spin up workers
for i := 0; i < max; i++ {
    go doParallel(ctx, queue, output)
}

// feed jobs to the workers through the queue; idle workers pick them up
go func() {
    for _, job := range jobs {
        queue <- job
    }
    close(queue)
}()

// collect results (in arrival order, not submission order)
results := make([]Job, 0, len(jobs))
collect:
for i := 0; i < len(jobs); i++ {
    select {
    case executedJob := <-output:
        results = append(results, executedJob)
    case <-ctx.Done():
        break collect // a bare break would only exit the select
    }
}
This approach does the work in a controlled parallel manner: at any given time, at most max goroutines are executing jobs, so resource usage stays bounded no matter how many jobs arrive.