loading...

Semaphored Wait Group

pawzar profile image Paweł Zaremba Originally published at tegh.net ・3 min read

One of the first things you might like to try when starting your journey with Go are the concurrency patterns.
You will probably start using goroutines to run things "in the background",
and you will also get to know channels that allow for safe communication between the sub-processes.

When you'll want to spawn many goroutines, you will surely get to know WaitGroups to wait for them to finish.
There will also be the defer statement which helps you to not forget to clean up after a function has finished running (and remember, deferred calls are executed in reverse order).

Let's start with a simple example:

package main

import (
    "log"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    defer log.Printf("#%d done", id)
    log.Printf("#%d starting", id)
    time.Sleep(time.Second)
}

func main() {
    var wg sync.WaitGroup
    for i := 1; i <= 100; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }
    wg.Wait()
    log.Printf("all done")
}

This works well, but what if we had a million items to process, and the actual work would be a memory-heavy operation?
We don't want to start a million goroutines.
We need something to limit the number of goroutines being run at the same time.

This is where a semaphore will come in handy.
We'll define how many concurrent workers we want, and the semaphore will not allow starting new goroutines until a slot is free.

Using a Channel to Create a Simple Semaphore

Let's say we want a maximum of 5 workers running at any time.
We'll need a buffered channel, and we do not really care what type of values it holds.

sem := make(chan bool, 5)

Whenever we want to start a goroutine, we'll push a value to the channel:

        wg.Add(1)
        sem <- true
        go worker(i, &wg, sem) // we need to pass the channel to the worker

When we're done - we'll take one value out of the channel:

func worker(id int, wg WaitGroup, sem <-chan bool) {
    defer wg.Done()
    defer log.Printf("#%d done", id)
    log.Printf("#%d starting", id)
    time.Sleep(time.Second)
    <-sem
}

Please notice, that wen we added the sem argument to the worker, we also converted the semaphore to a read-only channel using <-chan syntax.

But wait..., we do not want to change the code too much.
Can we just create our own implementation of a wait-group that will have the desired feature?

Yes we can, and Go's interfaces will help us do it.

Go's Implicit Interfaces - The Ultimate Decoupling

In most other languages the interface has to be defined before it can be implemented.
In Go - it is the other way around.
We can create interfaces that satisfy our needs and if anything (internal or external) has the methods with matching signatures,
Go will treat it as a type that implements the interface.

A Tour of Go: Implicit interfaces decouple the definition of an interface from its implementation, which could then appear in any package without prearrangement.

So, even though we cannot change the sync.WaitGroup type, we can extract an interface that matches our current requirements. We'll have:

type WaitGroup interface {
    Add(delta int)
    Done()
    Wait()
}

and change worker function's signature to:

func worker(id int, wg WaitGroup)

Now, we can create our own implementation of our WaitGroup interface with a built-in semaphore.

type SemaphoredWaitGroup struct {
    sem chan bool
    wg   sync.WaitGroup
}

func (s *SemaphoredWaitGroup) Add(delta int) {
    s.wg.Add(delta)
    s.sem <- true
}

func (s *SemaphoredWaitGroup) Done() {
    <-s.sem
    s.wg.Done()
}

func (s *SemaphoredWaitGroup) Wait() {
    s.wg.Wait()
}

Finally, instead of using the default sync.WaitGroup we can use our own SemaphoredWaitGroup:

    wg := SemaphoredWaitGroup{sem: make(chan bool, 5)}

and we have ourselves a limited pool of workers.

The complete example is available as a GitHub Gist
and on Go Playground.

Original post at: https://www.tegh.net/knowledge/semaphored-wait-group/

Discussion

pic
Editor guide