DEV Community

J Fowler
J Fowler

Posted on

Fanout Pattern in Go

Let's take a quick look at the fanout pattern in Go. In general, fanout is used to perform a number of tasks concurrently.

For example, say you have a data pipeline and you want to process the individual items. We can use go routines and channels to split the items up as we receive them, then process the individual items (put in a dB for example).

It's a simple pattern to implement; but you need to manage the channels to prevent deadlock.

// produce is simulating our single input as a channel
func produce(id int) chan int {
    ch := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            ch <- rand.Intn(20)
        }
        fmt.Printf("producer %d done\n", id)
        close(ch) // this is important!!!
    }()
    return ch
}

func worker(id int, jobs chan int, wg *sync.WaitGroup) {
    for value := range jobs {
        odd := "even"
        if (value & 1) == 1 {
            odd = "odd"
        }
        fmt.Printf("worker: %d, got %d is %s\n", id, value, odd)
    }
    wg.Done()
}

func main() {
    inputCh := produce(1)

    numWorkers := 3
    jobs := make(chan int)

    // split input into individual jobs
    go func() {
        for value := range inputCh {
            jobs <- value
        }
        close(jobs) // this is important!!!
    }()

    // fan-out
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, &wg)
    }
    wg.Wait()

    fmt.Println("done")
}
Enter fullscreen mode Exit fullscreen mode

The main idea here is that there is a sequence of data that need to be operated on by a fixed number of workers.

For the input, we create a sequence of random numbers and place them in a channel. We them move them into another channel that the workers will pull their 'jobs' from.

In this example, it's not strictly necessary to move the input into the jobs channel. We could just as easily have the workers pull from the input channel; it's just done for clarity here.

We then send launch the fixed number of workers as goroutines. Each worker will pull from the jobs channel until there is no more data to process at which time it signals a WaitGroup that it is done.

The main thread uses a WaitGroup to make sure it does not complete until all the workers are done, ie all jobs have been processed.

A key point to mention that this pattern does not place any guarantees on the order of processing the input sequence. This may be fine in a lot of circumstances. For example, the input sequence are data records containing their own timestamp and the goal is to store the records in a dB. Fan-out in this case would be acceptable.

A final note, you will see some comments on closing the channels once all data in the sequence has been sent. This is critical. The range operator that pulls from the channels will sleep once there is no more data. You can verify this by commenting out once of the close() statements which will cause a deadlock condition. Goroutines and channels are very powerful but you have to use them wisely.

What would you do different? How can we improve this example? Leave your comments below.

Thanks!

The code for this post and all posts in this series can be found here

Top comments (0)