
Jolyon


Practical Synchronization with Go

Introduction

Go is such a beautiful language when it comes to threads and concurrent programming. Even though I've only used it for a month, I cannot get over the convenience and efficiency of goroutines and the sync package. These out-of-the-box features make it much easier to practice concurrency patterns (Java makes this far more painful).

This post is inspired by MIT 6.824 Lecture 5: Go, Threads, and Raft. You can watch the lecture first and then come back for the summary notes.

Here we only focus on the practical usage of WaitGroups/semaphores, mutexes, condition variables, and channels.

Channels are used everywhere in Go, but we should still try to avoid buffered channels if possible. Only the first three are standard synchronization primitives that can easily be generalized to other languages.

WaitGroup

A WaitGroup works like a semaphore: both have an internal counter. However, a WaitGroup blocks until the counter reaches 0, while a semaphore blocks until the counter is greater than 0. The associated operations are wg.Add(n) (increases the counter by n), wg.Done() (decreases it by 1), and wg.Wait() (blocks until it reaches 0).
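To make the contrast concrete, here is a minimal sketch of a counting semaphore built on a buffered channel. This is purely for illustration: the semaphore type and its methods are my own naming, and a buffered channel is used only because it is the shortest way to sketch one.

package main

import (
    "fmt"
    "time"
)

// A counting semaphore sketched with a buffered channel: the "counter" is
// the number of free slots in the buffer. acquire blocks while the counter
// is 0, the opposite of wg.Wait(), which blocks until the counter reaches 0.
type semaphore chan struct{}

func (s semaphore) acquire() { s <- struct{}{} } // blocks when no slots are left
func (s semaphore) release() { <-s }             // frees one slot

func main() {
    sem := make(semaphore, 2) // at most 2 goroutines work at the same time
    for i := 0; i < 5; i++ {
        go func(x int) {
            sem.acquire()
            defer sem.release()
            fmt.Println("working:", x)
            time.Sleep(100 * time.Millisecond)
        }(i)
    }
    time.Sleep(1 * time.Second) // crude wait, just so we can observe the output
}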

A simple use of WaitGroup is to wait for another goroutine to finish first.

package main
import "sync"

func main() {
    var a string
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        // Refer to variables from the enclosing scope
        a = "hello world"
        wg.Done()
    }()
    wg.Wait()
    println(a)
}

main spawns a goroutine, and wg.Wait() ensures the anonymous function runs (and calls wg.Done()) before println(a). Note that the anonymous function is a closure: it can refer to the variable a from the enclosing scope.

Here we could use an unbuffered channel to do the same thing, but again, channels are specific to Go, and there is always an alternative way to achieve the same thing with the standard primitives. Those synchronization primitives are easier to reason about.

Only use channels when they make your life easier.

import "fmt"
func main() {
    var a string
    var ch = make(chan int)
    go func() {
    // Refer to variables from the enclosing scope 
        a = "hello world"
        ch<-1
    }()
    <-ch
    fmt.Println(a)
}

Semaphore Barrier with WaitGroup

A more common usage is to implement a barrier: wait for a group of goroutines to finish first.

package main
import "sync"

func main() {
    var wg sync.WaitGroup
    // E.g.: send an RPC to each of 5 clients
    for i := 0; i < 5; i++ {
        wg.Add(1)
        // Pass i as an argument so each goroutine gets its own copy.
        // Otherwise every closure would read the shared loop variable,
        // which may already be 5 by the time the goroutine runs.
        go func(x int) {
            sendRPC(x)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

func sendRPC(i int) {
    println(i)
}

In this example, main sends RPC requests to 5 clients and waits for them to finish.

The variable i is passed by value instead of being captured by the closure. main blocks until all 5 goroutines have finished.
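For comparison, here is a sketch of the buggy variant that captures i directly in the closure. On Go versions before 1.22 (which changed loop-variable scoping), all goroutines would share the same i, so most of them would likely print 5:

package main

import "sync"

func sendRPC(i int) {
    println(i)
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            // Bug (before Go 1.22): the closure reads the shared loop
            // variable i instead of a per-goroutine copy.
            sendRPC(i)
            wg.Done()
        }()
    }
    wg.Wait()
}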

Mutex

A mutex is used whenever multiple goroutines try to access some shared memory/data.

The following is a bank example: two accounts transfer money to each other in two goroutines. A practical tip is to put defer mu.Unlock() right after mu.Lock(), so you never get the unlock order wrong. (defer mu.Unlock() should not be used here, though, because the lock is taken inside a loop rather than at the top of the enclosing function; defer only runs when the function returns.)

// bank.go
package main

import "sync"
import "time"
import "fmt"

func main() {
    alice := 10000
    bob := 10000
    var mu sync.Mutex

    total := alice + bob

    go func() {
        for i := 0; i < 1000; i++ {
            mu.Lock()
      // defer mu.Unlock()
            alice -= 1
            bob += 1
            mu.Unlock()
        }
    }()
    go func() {
        for i := 0; i < 1000; i++ {
            mu.Lock()
      // defer mu.Unlock()
            bob -= 1
            alice += 1
            mu.Unlock()
        }
    }()

    start := time.Now()
    for time.Since(start) < 1*time.Second {
        mu.Lock()
        if alice+bob != total {
            fmt.Printf("observed violation, alice = %v, bob = %v, sum = %v\n", alice, bob, alice+bob)
        }
        mu.Unlock()
    }
  fmt.Printf("alie+bob= %v\n", alice+bob)
}
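If each transfer were pulled out into its own small function, defer mu.Unlock() would work fine, because defer runs when the enclosing function returns rather than at the end of each loop iteration. A minimal sketch (the transfer helper is my own naming, not from the lecture):

package main

import "sync"

// transfer moves one unit between two balances. Because it is a separate
// function, defer mu.Unlock() runs at the end of every call, which is why
// defer is safe here but not inside the loop body of bank.go above.
func transfer(mu *sync.Mutex, from, to *int) {
    mu.Lock()
    defer mu.Unlock()
    *from -= 1
    *to += 1
}

func main() {
    alice, bob := 10000, 10000
    var mu sync.Mutex
    var wg sync.WaitGroup

    wg.Add(2)
    go func() {
        defer wg.Done()
        for i := 0; i < 1000; i++ {
            transfer(&mu, &alice, &bob)
        }
    }()
    go func() {
        defer wg.Done()
        for i := 0; i < 1000; i++ {
            transfer(&mu, &bob, &alice)
        }
    }()
    wg.Wait()

    println(alice + bob) // always 20000
}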

When you have multiple shared instances, it's better to embed the mutex as a field in a struct. Ref: When do you embed mutex in struct in Go?

type account struct {
    sync.Mutex
    money int
}

alice := account{
    money: 10000,
}
bob := account{
    money: 10000,
}

In this example, since we need to atomically mutate two variables at the same time (alice -= 1 and bob += 1), using a single global mutex to mark the critical section is more straightforward.
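For operations that only touch a single account (say, a simple deposit), the embedded mutex lets each instance be locked independently. A small sketch, where the deposit method is my own illustration:

package main

import "sync"

// account embeds sync.Mutex, so every account carries its own lock.
type account struct {
    sync.Mutex
    money int
}

// deposit only touches one account, so locking just that account is enough.
func (a *account) deposit(n int) {
    a.Lock()
    defer a.Unlock()
    a.money += n
}

func main() {
    alice := &account{money: 10000}
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            alice.deposit(1)
        }()
    }
    wg.Wait()
    println(alice.money) // 10100
}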

Cancel periodic tasks with a shared variable and mutex

We often want to do something periodically in the background, and when the service terminates, we also want to kill these background goroutines so that nothing is left running unattended. An easy way to do that is to use a shared variable protected by a mutex.

package main

import (
    "sync"
    "time"
)

var mu sync.Mutex
var done bool

func main() {
    time.Sleep(1 * time.Second)
    println("started")
    go periodic()
    time.Sleep(5 * time.Second) // wait for a while so we can observe a few ticks
    mu.Lock()
    done = true
    mu.Unlock()
    println("cancelled")
    time.Sleep(3 * time.Second) // observe no further output
}

// Run something periodically in the background until the
// controller decides to kill it.
func periodic() {
    for {
        println("tick")
        time.Sleep(1 * time.Second)
        mu.Lock()
        if done {
            mu.Unlock()
            return
        }
        mu.Unlock()
    }
}
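As with the WaitGroup example earlier, a channel can do the same job here: closing a channel is visible to any number of goroutines. A sketch of that alternative (the done channel is my own naming), in case it makes your life easier:

package main

import "time"

func main() {
    done := make(chan struct{})
    println("started")
    go periodic(done)
    time.Sleep(5 * time.Second) // let a few ticks happen
    close(done)                 // closing the channel signals cancellation
    println("cancelled")
    time.Sleep(3 * time.Second) // observe no further output
}

func periodic(done chan struct{}) {
    for {
        println("tick")
        time.Sleep(1 * time.Second)
        select {
        case <-done: // a closed channel is always ready to receive
            return
        default:
        }
    }
}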

Condition Variable

Condition variables are used a lot in practice.
A condition variable has three operations and is always paired with a mutex (cond := sync.NewCond(&mu)):

  • cond.Broadcast(): wakes up all goroutines waiting on the cond
  • cond.Signal(): wakes up only one waiting goroutine (the choice is arbitrary)
  • cond.Wait(): atomically unlocks the mutex (c.L) and suspends the current goroutine (it goes to sleep and cannot resume until woken by one of the operations above). When woken up, Wait() re-acquires the lock before returning.

Consider the example of vote counting. One client launches a vote and sends requests to the other clients. The organizer can only continue once it gets a quorum (more than half of the votes).

Naively, we could implement this with an infinite for-loop and a mutex (busy-waiting) instead of a condition variable. However, this is not great, because busy-waiting can burn 100% of a CPU core.

// Busy-waiting without condition variables.
// (Uses the same imports and requestVote helper as the full example below.)
func main() {
  rand.Seed(time.Now().UnixNano())
  count := 0
  finished := 0
  var mu sync.Mutex
  for i := 0; i < 10; i++ {
    go func() {
      vote := requestVote()
      mu.Lock()
      defer mu.Unlock()
      if vote {
        count++
      }
      finished++
    }()
  }
  // Busy-waiting is never good: it burns up 100% of the CPU
  // on one core.
  for {
    mu.Lock()
    if count >= 5 || finished == 10 {
      break // break while still holding the lock; it is released after the check below
    }
    mu.Unlock()
  }
  if count >= 5 {
    println("received 5+ votes!")
  } else {
    println("lost")
  }
  mu.Unlock()
}

The main thread keeps looping and testing until it gets a quorum. Notice that it needs to check both count (how many yes votes) and finished (how many goroutines have voted) to know whether the vote is over.

A better solution is to use a condition variable. cond.Wait() lets the main thread go to sleep and give up the lock, so other threads get more time to manipulate the data, and the main thread only resumes when another thread wakes it up.

package main

import (
  "math/rand"
  "sync"
  "time"
)

func main() {
  rand.Seed(time.Now().UnixNano())
  count := 0
  finished := 0
  var mu sync.Mutex
  cond := sync.NewCond(&mu)

  for i := 0; i < 10; i++ {
    go func() {
      vote := requestVote()
      mu.Lock()
      defer mu.Unlock() // remember to unlock the mutex after broadcasting
      if vote {
        count++
      }
      finished++
      // Broadcast to the threads that are waiting on the condition variable.
      cond.Broadcast()
    }()
  }
  mu.Lock() // taking the lock here also avoids the lost-wakeup issue
  for count < 5 && finished != 10 {
    // Wait() blocks the current thread and gives up its lock atomically
    // so that other threads can make progress.
    cond.Wait()
    // When someone wakes this thread up, it re-acquires the lock and then
    // re-checks count and finished while holding it.
  }
  if count >= 5 {
    println("received 5+ votes!")
  } else {
    println("lost")
  }
  mu.Unlock()
}

func requestVote() bool {
  time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
  return rand.Int()%2 == 0
}

There is a particular pattern to follow when using condition variables. On the side that modifies state that could change the outcome of the condition test, always lock first, then manipulate the data, then call Broadcast(), and unlock afterward.

Similarly, when checking the condition, grab the lock first, check the condition in a loop, and call Wait() inside that loop; unlock when you are done. The condition test only runs while holding the lock (when woken up, Wait() re-acquires the lock before returning).

mu.Lock()
// do something that might affect the condition
cond.Broadcast()
mu.Unlock()

----

mu.Lock()
for condition == false {
    cond.Wait()
}
// now condition is true, and we have the lock
mu.Unlock()

Top comments (3)

Eduard Wirch

Jolyon, your bank.go program is broken, it deadlocks: play.golang.org/p/2RE89_ER2N2

The problem is:

go func() {
  for i := 0; i < 1000; i++ {
    mu.Lock()
    defer mu.Unlock()
    bob -= 1
    alice += 1
    // mu.Unlock()
  }
}()

defer executes mu.Unlock() when the enclosing function returns. The second loop iteration will deadlock on mu.Lock().

Jolyon

Thanks for pointing out!

Just corrected it.

Mihail Malo • Edited

If I have to block multiple goroutines until a single goroutine is ready to let them all proceed, are channels of struct{} or WaitGroups where the count will only be 1 and then 0 more efficient?

    m.inflightLock.Lock()
    inf, ok := m.inflight[k]
    if ok {
        m.inflightLock.Unlock()
        <-inf
        return m.Get(k, or)
    }
    inf = make(chan struct{})
    m.inflight[k] = inf
    m.inflightLock.Unlock()
    v = or()
    /* snip */
    close(inf)

vs

    m.inflightLock.Lock()
    inf, ok := m.inflight[k]
    if ok {
        m.inflightLock.Unlock()
        inf.Wait()
        return m.Get(k, or)
    }
    inf = new(sync.WaitGroup)
    inf.Add(1)
    m.inflight[k] = inf
    m.inflightLock.Unlock()
    v = or()
    /* snip */
    inf.Done()