DEV Community

Silver_dev
Silver_dev

Posted on

Concurrency patterns on Golang: Fan-out / Fan-in

Problems this pattern can solve:

  1. You have 8 cores, but your data processing stage runs sequentially in a single goroutine. The CPU sits idle while the task queue grows. We create N workers (one per core) that pull tasks from a shared channel in parallel, utilizing all available cores.
  2. Suppose your Pipeline has a stage that calls an external API or resizes images. This is slow. If left in a single thread, the entire pipeline will bottleneck at this stage.
  3. You've launched 100 goroutines for web scraping. How do you collect all results in one place for final database writing, without using global variables and mutexes.
  4. Creating a new goroutine for each incoming request is dangerous. Under peak load, this can crash the service (panic due to memory exhaustion).

Essence: A pattern consisting of two phases that work in tandem to parallelize tasks.
- Fan-out: The process of launching multiple goroutines (workers) to read tasks from a single input channel. This distributes the load.
- Fan-in: The process of merging results from multiple goroutines into a single output channel. This consolidates data for final processing.

Idea: Parallelize the execution of similar tasks by distributing them among a fixed pool of workers and subsequently aggregating results through multiplexing of output channels.

Fan-out / Fan-in vs Other Patterns

Difference from Pipeline

  • Fan-out/Fan-in: This is about horizontal scaling of a single stage. We take one step and multiply its executors.

  • Pipeline: This is about vertical decomposition of a process into different steps (read -> process -> write). Fan-out/Fan-in often lives inside one specific stage of a Pipeline.

Difference from Worker Pool

  • Actually, they are practically the same thing. Fan-out/Fan-in is a conceptual description of how a worker pool is structured.

  • Fan-out = task dispatching to the pool. Fan-in = result collection from the pool.
    You could say that Fan-out/Fan-in is an architectural pattern, and Worker Pool is its concrete implementation for executing tasks.

Difference from Pub/Sub (Publish-Subscribe)

  • Fan-out/Fan-in: Each task from the input channel is received by only one worker. This is work distribution (competition for tasks).

  • Pub/Sub: Each message is received by all subscribers. This is broadcast distribution.

Example

package main

import (
    "context"
    "fmt"
    "sync"
)

// Task generation
func generateJobs(n int) <-chan int {
    ch := make(chan int)
    go func() {
        for i := 1; i <= n; i++ {
            ch <- i
        }
        close(ch)
    }()
    return ch
}

// Fan-out: Distributing tasks among workers
func fanOut(ctx context.Context, jobs <-chan int, numWorkers int) []<-chan int {
    workerChannels := make([]<-chan int, 0, numWorkers)

    for i := 0; i < numWorkers; i++ {
        resultCh := make(chan int)

        go func() {
            defer close(resultCh)
            for {
                select {
                case job, ok := <-jobs:
                    if !ok {
                        return // Task channel closed
                    }
                    // Task processing (example: squaring)
                    resultCh <- job * job
                case <-ctx.Done():
                    return // Cancellation via context
                }
            }
        }()

        workerChannels = append(workerChannels, resultCh)
    }
    return workerChannels
}

// Fan-in: Merging results
func fanIn(channels []<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    wg.Add(len(channels))

    for _, ch := range channels {
        go func(c <-chan int) {
            defer wg.Done()
            for res := range c {
                merged <- res
            }
        }(ch)
    }

    // Goroutine to close the final channel
    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    // 1. Generate tasks
    jobs := generateJobs(5)

    // 2. Fan-out: distribute tasks among 3 workers
    resultChannels := fanOut(context.Background(), jobs, 3)

    // 3. Fan-in: merge results from all channels
    mergedResults := fanIn(resultChannels)

    // 4. Results
    for res := range mergedResults {
        fmt.Printf("Получен результат: %d\n", res)
    }
}
Enter fullscreen mode Exit fullscreen mode

Top comments (0)