DEV Community

Discussion on: Synchronizing Go Routines with Channels and WaitGroups

Collapse
 
handofgod_10 profile image
Vijaykumar

So waitGroup.Wait() is the join point for all the async goroutines that have been started. Why call the Wait() function in a separate goroutine altogether? Apologies, as I do not understand that part. Also, channels are used for communication, so basically if it's an error, you capture that and put it in the channel, or if it's a success, you put that in the channel. Just like Wait() is the join point of the goroutines to the main thread, <-c needs to be present for the channels.

Just wondering why not go with the channels alone and why use WaitGroup. Since the use case seems to be more driven towards getting response back, channels make more sense to use anyways.

If you have a chance, could you please explain. Just curious to know a little more. I am a newbie myself

Collapse
 
stevefromaccounting profile image
Stephen Yankovich • Edited

I was thinking this same question, so I created a sample scenario to see why: goplay.tools/snippet/4mkhTcLhRoS

package main

import (
    "errors"
    "fmt"
    "golang.org/x/sync/errgroup"
    "sync"
    "time"
)

// main runs exactly one of the demo scenarios. The other two are left
// commented out; swap which call is active to compare how each one behaves.
func main() {
    // usingChannels()
    // usingErrGroups()
    usingChannelsWithWg()
}

// usingChannelsWithWg this one works! Three workers each run iterations 0 through 50, sending an error on a shared
// channel whenever triggerErr reports one and then carrying on with their loop. A sync.WaitGroup tracks the workers,
// and a separate goroutine closes the channel once all of them are done, which is what lets the receive loop in this
// function terminate only after every worker has finished, unlike usingChannels.
//
// Thanks to https://dev.to/sophiedebenedetto/synchronizing-go-routines-with-channels-and-waitgroups-3ke2
func usingChannelsWithWg() {
    const workerCount = 3

    var workerWg sync.WaitGroup
    errorCh := make(chan error, 1)

    workerWg.Add(workerCount)

    // Closer: the only place the channel is closed, and only after every worker has called Done. It has to run in
    // its own goroutine because this function must stay free to drain errorCh while the workers are still sending.
    go func() {
        workerWg.Wait()
        close(errorCh)
    }()

    for id := 1; id <= workerCount; id++ {
        // id is passed as an argument so each worker has its own copy regardless of the Go version in use.
        go func(id int, errorCh chan<- error) {
            // Deferred up front so Done is called on every exit path of the worker.
            defer workerWg.Done()

            for i := 0; i <= 50; i++ {
                time.Sleep(time.Millisecond * 2)
                fmt.Printf("[%d]: iteration %d\n", id, i)

                if err := triggerErr(i); err != nil {
                    errorCh <- fmt.Errorf("[%d] ERROR: %w", id, err)
                }
            }
        }(id, errorCh)
    }

    // Ranging over the channel blocks until a value arrives and ends when the channel is closed, so there is no
    // need to poll it with select/default (which would spin a CPU core while waiting).
    for err := range errorCh {
        fmt.Println(err.Error())
    }
    fmt.Println("channel closed!")
}

// usingErrGroups is the errgroup-based attempt, which also doesn't do what we want. Each worker returns as soon as
// triggerErr gives it an error instead of continuing its loop, and Wait hands back a single error for the whole
// group, so the remaining iterations and any other errors are never seen.
func usingErrGroups() {
    var group errgroup.Group

    // newWorker builds one worker function; the format strings carry the worker's "[n]" tag.
    newWorker := func(iterationFmt, errorFmt string) func() error {
        return func() error {
            for i := 0; i < 50; i++ {
                time.Sleep(time.Millisecond * 2)
                fmt.Printf(iterationFmt, i)

                if err := triggerErr(i); err != nil {
                    return fmt.Errorf(errorFmt, err)
                }
            }

            return nil
        }
    }

    group.Go(newWorker("[1]: iteration %d\n", "[1] ERROR: %w"))
    group.Go(newWorker("[2]: iteration %d\n", "[2] ERROR: %w"))
    group.Go(newWorker("[3]: iteration %d\n", "[3] ERROR: %w"))

    // Block until all three workers have returned, then report the group's error, if any.
    err := group.Wait()
    if err == nil {
        return
    }
    fmt.Printf("err from a routin: %s", err.Error())
}

// usingChannels is the channels-only attempt, kept as an example of what does NOT quite work. The receive loop at the
// bottom never exits unless the channel is explicitly closed, but there is only one channel shared by three
// goroutines, and each of them closes it when its own loop ends. The first goroutine to finish therefore closes the
// channel for everybody, and the receive loop stops even though the other two goroutines may still be running.
// NOTE(review): a later send or close on the already-closed channel would panic if the program is still alive then.
func usingChannels() {
    errorCh := make(chan error, 1)

    // Worker 1: iterations 0-49, reporting every error from triggerErr on the shared channel.
    go func(errorCh chan<- error) {
        for i := 0; i < 50; i++ {
            time.Sleep(time.Millisecond * 2)
            fmt.Printf("[1]: iteration %d\n", i)

            err := triggerErr(i)
            if err != nil {
                errorCh <- fmt.Errorf("[1] ERROR: %w", err)
            }
        }

        // Registered as the last statement, so the close runs as soon as the loop above is done.
        defer close(errorCh)
    }(errorCh)

    // Worker 2: same as worker 1, and it closes the very same channel.
    go func(errorCh chan<- error) {
        for i := 0; i < 50; i++ {
            time.Sleep(time.Millisecond * 2)
            fmt.Printf("[2]: iteration %d\n", i)

            err := triggerErr(i)
            if err != nil {
                errorCh <- fmt.Errorf("[2] ERROR: %w", err)
            }
        }

        defer close(errorCh)
    }(errorCh)

    // Worker 3: same as worker 1, and it closes the very same channel.
    go func(errorCh chan<- error) {
        for i := 0; i < 50; i++ {
            time.Sleep(time.Millisecond * 2)
            fmt.Printf("[3]: iteration %d\n", i)

            err := triggerErr(i)
            if err != nil {
                errorCh <- fmt.Errorf("[3] ERROR: %w", err)
            }
        }

        defer close(errorCh)
    }(errorCh)

    // Print errors as they arrive and leave the loop once the channel is reported closed.
channelLoop:
    for {
        select {
        case err, ok := <-errorCh:
            if ok {
                fmt.Println(err.Error())
            } else {
                // ok == false: the channel has been closed by one of the workers.
                fmt.Println("channel closed!")
                break channelLoop
            }
        default:
            // Nothing ready to receive: go around again (this loop polls rather than blocks).
        }
    }
}

// triggerErr simulates a step that fails periodically: it returns an error naming the input whenever input is a
// multiple of 10 (0 included), and nil for every other value.
func triggerErr(input int) error {
    if input%10 != 0 {
        return nil
    }

    message := fmt.Sprintf("an error, bc i == %d", input)
    return errors.New(message)
}
Enter fullscreen mode Exit fullscreen mode

In short, the for loop reading from your error channel will never know when your goroutines are finished, and will continue to read forever.

Like the linked example, suppose you have 3 goroutines that are each looping through the numbers 1 - 50, with every 10th number returning an error (10, 20, 30, ...). Your code will look like:

errorChannel := make(chan error, 1)

// goroutine 1

// goroutine 2

// goroutine 3

for err := range errorChannel {
   // do things when there's an error, like print it out, PagerDuty, log it, etc
}
Enter fullscreen mode Exit fullscreen mode

Without a WaitGroup, that for loop will go on forever, because it'll never know when you are finished writing things to the channel. You can call close(errorChannel), but where would you call it? Logically, you want to call it "when --EVERY-- goroutine has iterated through the numbers 1 - 50". But how do you tell Go when that has happened? You do that with WaitGroups.