Problems this pattern can solve:
- When you need to process millions of records (e.g., log lines, images, or database entries), executing all steps sequentially for one item after another leads to resource underutilization. The CPU sits idle during I/O operations (disk reads, network requests), while subsequent steps wait for previous ones to complete.
- Imagine a 500-line function that does: read -> transform -> validate -> enrich -> save. It's impossible to cover with adequate tests, difficult to extend (adding a new step requires modifying the entire function), and easy to break by affecting the logic of an adjacent step.
- In concurrent environments, it's challenging to properly handle situations where a fatal error occurs at one stage. You need to stop all stages, prevent goroutine leaks, and shut down the program gracefully.
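The shutdown problem from the last item can be sketched with `context` cancellation. This is a minimal illustration, not a complete error-handling strategy: `errNegative` and `sumUntilError` are hypothetical names invented for this example, and a negative number merely stands in for "a fatal error at one stage".

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errNegative is a hypothetical fatal error used only in this sketch.
var errNegative = errors.New("negative input")

// gen emits nums until they run out or ctx is cancelled.
// The select prevents the goroutine from blocking forever on a send
// when the consumer has already stopped reading.
func gen(ctx context.Context, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case out <- n:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// sumUntilError consumes the stage and stops at the first negative value.
func sumUntilError(nums ...int) (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // on early return, this unblocks gen so it can exit

	total := 0
	for n := range gen(ctx, nums...) {
		if n < 0 {
			return total, errNegative
		}
		total += n
	}
	return total, nil
}

func main() {
	total, err := sumUntilError(1, 2, -1, 4)
	fmt.Println(total, err) // 3 negative input
}
```

Without the `ctx.Done()` case, the early return would leave the generator goroutine blocked on `out <- n` indefinitely, which is exactly the kind of leak described above.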
Essence: Breaking down a complex data processing operation into a sequence of independent, reusable stages, where each stage performs one specific function, and data is passed between them via channels.
Idea: Split data processing into sequential, concurrently executing stages, where the output of one stage serves as input for the next, using channels as a conveyor belt for data transfer and flow control.
Pipeline vs Other Patterns
Difference from Chain of Responsibility
Pipeline: Each element necessarily goes through all stages of the pipeline and is transformed at each stage. Data moves in one direction. It's about data processing.
Chain of Responsibility: A request is passed along a chain of handlers until one of them handles it. The request does not necessarily go through the entire chain. It's about passing responsibility. (Example: middleware in web frameworks, where one handler can return a response and break the chain).
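To make the contrast concrete, here is a rough sketch of a Chain of Responsibility in Go. The `handler` type, `dispatch`, and both handlers are hypothetical names made up for this illustration. Note that the loop stops at the first handler that accepts the request, whereas a pipeline would run every item through every stage.

```go
package main

import "fmt"

// handler returns a response and reports whether it handled the request.
type handler func(req string) (resp string, handled bool)

// dispatch passes req along the chain and stops at the first handler
// that takes responsibility for it.
func dispatch(req string, chain ...handler) string {
	for _, h := range chain {
		if resp, ok := h(req); ok {
			return resp // the rest of the chain is never called
		}
	}
	return "unhandled"
}

// cacheHandler only handles the request "cached".
func cacheHandler(req string) (string, bool) {
	if req == "cached" {
		return "from cache", true
	}
	return "", false
}

// defaultHandler handles anything that reaches it.
func defaultHandler(req string) (string, bool) {
	return "computed " + req, true
}

func main() {
	fmt.Println(dispatch("cached", cacheHandler, defaultHandler)) // from cache
	fmt.Println(dispatch("fresh", cacheHandler, defaultHandler))  // computed fresh
}
```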
Difference from Fan-Out / Fan-In
Pipeline: This is a structural code organization pattern.
Fan-Out / Fan-In: These are parallelization patterns often used inside Pipeline stages.
- Fan-Out: Launching multiple goroutines (workers) at one stage to parallelize heavy work.
- Fan-In: Collecting results from multiple workers back into a single channel.
- It can be said that Fan-Out/Fan-In are ways to implement a specific Pipeline stage to improve performance.
Difference from Observer
Pipeline: Push-model. The "generator" actively sends data to the next stage. It's a unidirectional stream.
Observer: One subject notifies many subscribers about changes in its state (one-to-many). Subscribers are passive and wait for notifications. It's about event broadcasting, not data transformation.
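For comparison, a bare-bones Observer might look like the sketch below. The `subject` type and the `broadcast` helper are hypothetical and exist only for this illustration. Every subscriber receives the same event unchanged, which is the one-to-many broadcast described above rather than a step-by-step transformation.

```go
package main

import "fmt"

// subject keeps a list of subscriber callbacks.
type subject struct {
	subscribers []func(event string)
}

// subscribe registers a callback to be invoked on every event.
func (s *subject) subscribe(fn func(event string)) {
	s.subscribers = append(s.subscribers, fn)
}

// notify delivers the same event to every subscriber, in registration order.
func (s *subject) notify(event string) {
	for _, fn := range s.subscribers {
		fn(event)
	}
}

// broadcast wires up two subscribers and records what each one received.
func broadcast(event string) []string {
	var s subject
	var log []string
	s.subscribe(func(e string) { log = append(log, "A got "+e) })
	s.subscribe(func(e string) { log = append(log, "B got "+e) })
	s.notify(event)
	return log
}

func main() {
	fmt.Println(broadcast("saved")) // [A got saved B got saved]
}
```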
Example
package main

import "fmt"

// Stage 1: generator. Emits the given numbers, then closes the channel.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// Stage 2: squaring. Reads numbers from in and emits n * n.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

func main() {
	// Assembling the pipeline
	in := gen(2, 3, 4)
	out := sq(in)

	// Result
	for result := range out {
		fmt.Println(result) // 4, 9, 16
	}
}
