Goroutines are one of Go's best features; they make it easy to run functions concurrently. But things can get messy when one function depends on another function's result: you end up writing a lot of synchronization code, using channels or a sync.WaitGroup.
So I made a library to help you easily control your goroutine pipeline.
felixgunawan / safe-step
Safe goroutine flow handler
safe-step
A simple Go library to safely handle multi-layer goroutine execution.
Installation
go get github.com/felixgunawan/safe-step
Let's jump right into an example. Assume you have five functions to run (f1 to f5), and you want f1, f2, and f3 to run concurrently before f4 and f5 run concurrently. You can do that with the code below.
package main

import (
	"fmt"
	safestep "github.com/felixgunawan/safe-step"
	"time"
)

func main() {
	step := safestep.New()
	step.AddInput("id", 1)

	f1 := func(input map[string]interface{}) (interface{}, error) {
		fmt.Println("function 1 started")
		fmt.Printf("id = %d\n", input["id"])
		time.Sleep(time.Millisecond * 500)
		fmt.Println("function 1 ended")
		return 1, nil
	}
	f2 := func(input map[string]interface{}) (interface{}, error) {
		fmt.Println("function 2 started")
		time.Sleep(time.Millisecond * 750)
		fmt.Println("function 2 ended")
		step.AddInput("id2", 2)
		return 1.5, nil
	}
	f3 := func(input map[string]interface{}) (interface{}, error) {
		fmt.Println("function 3 started")
		time.Sleep(time.Millisecond * 1000)
		fmt.Println("function 3 ended")
		return 3, nil
	}
	f4 := func(input map[string]interface{}) (interface{}, error) {
		fmt.Println("function 4 started")
		time.Sleep(time.Millisecond * 100)
		fmt.Println("function 4 ended")
		return "abcde", nil
	}
	f5 := func(input map[string]interface{}) (interface{}, error) {
		fmt.Println("function 5 started")
		fmt.Printf("id2 = %d\n", input["id2"])
		time.Sleep(time.Millisecond * 1000)
		fmt.Println("function 5 ended")
		return 5, nil
	}

	// This will:
	// 1. run f1, f2, f3 in goroutines and wait for all of them to finish
	// 2. run f4, f5 in goroutines and wait again
	// 3. return the results of all function executions in a map
	res, err := step.
		AddFunction("f1", f1).
		AddFunction("f2", f2).
		AddFunction("f3", f3).
		Step().
		AddFunction("f4", f4).
		AddFunction("f5", f5).
		Do()
	if err != nil {
		fmt.Printf("err = %v", err)
	}
	fmt.Printf("result = %v", res)
}
Output:
function 1 started
id = 1
function 2 started
function 3 started
function 1 ended
function 2 ended
function 3 ended
function 5 started
id2 = 2
function 4 started
function 4 ended
function 5 ended
result = map[f1:1 f2:1.5 f3:3 f4:abcde f5:5]
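Because every function returns interface{}, the values in the result map have mixed types (an int, a float, a string in the output above). Assuming the result behaves like a map[string]interface{}, which the printed output suggests but which you should confirm against the library, a type switch is one way to get typed values back. The describe helper and the stand-in map below are hypothetical and use only the standard library.

```go
package main

import "fmt"

// describe (hypothetical helper) reports the dynamic type and value
// stored in an interface{} value.
func describe(v interface{}) string {
	switch x := v.(type) {
	case int:
		return fmt.Sprintf("int %d", x)
	case float64:
		return fmt.Sprintf("float64 %v", x)
	case string:
		return fmt.Sprintf("string %q", x)
	default:
		return fmt.Sprintf("unknown %v", x)
	}
}

func main() {
	// Stand-in for part of the result map printed above; assumed shape.
	res := map[string]interface{}{"f1": 1, "f2": 1.5, "f4": "abcde"}

	// Iterate over a fixed key order, since map iteration order is random.
	for _, name := range []string{"f1", "f2", "f4"} {
		fmt.Printf("%s: %s\n", name, describe(res[name]))
	}
}
```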
This library has two main features:
- Safe execution: it automatically recovers from panics inside your functions.
- Context-aware (use safestep.NewWithContext(ctx)), so you can easily add a timeout.
You can find out more about this library in its GitHub repository, github.com/felixgunawan/safe-step. Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.