disclaimer:
It is just a pattern that wraps Goroutine and channel. It is still Goroutine underneath. It is not bringing "coloring" to Go code. Functions still call other functions as normal functions, without the need to distinguish async/sync ones.
Goroutine
For beginners of Go who come from other "colored" languages like JavaScript, one big difference in Golang's mental model is Goroutine.
func Add() int {
a := 1
b := 2
var c int
doneSignal := make(chan struct{}, 1)
go func() { // fire a function running in parallel with Goroutine
c = 3 // able to edit variable outside the local scope
go func() {}() // able to start child Goroutine inside a goroutine
doneSignal <- struct{}{}
}()
<- doneSignal // wait for the goroutine to complete
return a + b + c
}
Unlike JavaScript, you can fire a function that runs in parallel anywhere in your function code without making that function async. It is one key strength that makes Golang stand out against other languages
Async/Await
If you came from "colored" languages, you can make it into your familiar Async/Await while not breaking Golang's simplicity, just by wrapping it up a bit.
func Async(fn func(), doneSignal chan struct{}) {
go func() {
fn()
doneSignal <- struct{}{}
}()
}
func Await(doneSignal chan struct{}) {
<- doneSignal
}
func Add() int {
a := 1
b := 2
sumCh := make(chan int, 1)
doneSignal := make(chan struct{}, 1)
Async(func() {
sumCh <- a + b + 3
}, doneSignal)
Await(doneSignal)
return <-sumCh
}
Better Async/Await
We can make this even better if these problems can be solved:
- A
donesignal must be repeatedly created for each Goroutine. - When encapsulation is demanded, passing parameters and return values becomes hard.
- No easy way to stop a running Goroutine.
- When child Goroutine outlives parent, things get worse to manage.
- How about error handling inside and outside of Goroutine?
These are real problems my own team project faced when applying Go concurrency. We figured out one solution and it worked well. So I distilled the most general and reusable part of it and made this library go-opera. Making error handling better is also another goal of this library.
Without wasting time, your final Async/Await pattern would look like this:
// This code is from a real Fiber http handler, which queries both database record count and record data concurrently.
func(ctx context.Context, q *gorm.DB, p Params) error {
return opera.Do(func () any {
logTask := opera.Async(ctx, func(ctx context.Context) opera.Result[opera.Unit] {
return opera.TryPass(pushLogsToRemote(ctx))
})
visitTask := opera.Async(ctx, func(ctx context.Context) opera.Result[opera.Unit] {
return opera.TryPass(redisClient.Incr(ctx, "visit:"+p.Path).Err())
})
countTask := opera.Async(ctx, func(ctx context.Context) opera.Result[int64] {
return opera.Try(q.Count(ctx, "*"))
})
dataTask := opera.Async(ctx, func(ctx context.Context) opera.Result[[]T] {
size := p.Size.Or(20)
offset := (p.Page.Or(1) - 1) * size
q.Offset(offset).Limit(size)
return opera.Try(q.Find(ctx))
})
// Side tasks still report errors through Result, so Await them as well.
opera.Await(ctx, logTask).Yield()
opera.Await(ctx, visitTask).Yield()
// Getting results of two async tasks in one line
count, data := opera.Await(ctx, countTask).Yield(), opera.Await(ctx, dataTask).Yield()
return nil
}).Err()
}
What if you do it all with goroutine?
// Pure goroutine version of the above example (no go-opera).
func HandleRequest(parent context.Context, q *gorm.DB, p Params) error {
ctx, cancel := context.WithTimeout(parent, 2*time.Second)
defer cancel()
type countRes struct {
v int64
err error
}
type dataRes struct {
v []T
err error
}
countCh := make(chan countRes, 1)
dataCh := make(chan dataRes, 1)
logErrCh := make(chan error, 1)
visitErrCh := make(chan error, 1)
// Start concurrent tasks
go func() {
cnt, err := q.Count(ctx, "*")
countCh <- countRes{cnt, err}
}()
go func() {
size := p.Size.Or(20)
offset := (p.Page.Or(1) - 1) * size
q.Offset(offset).Limit(size)
data, err := q.Find(ctx)
dataCh <- dataRes{data, err}
}()
// Side tasks (non-fatal)
go func() {
logErrCh <- pushLogsToRemote(ctx)
}()
go func() {
visitErrCh <- redisClient.Incr(ctx, "visit:"+p.Path).Err()
}()
// Await side tasks (log errors, but don't fail request)
if err := <-logErrCh; err != nil {
log.Println("logTask failed:", err)
}
if err := <-visitErrCh; err != nil {
log.Println("visitTask failed:", err)
}
// Await main tasks and return early on error
cr := <-countCh
if cr.err != nil {
return cr.err
}
dr := <-dataCh
if dr.err != nil {
return dr.err
}
count := cr.v
data := dr.v
_ = count
_ = data
return nil
}
The above raw Goroutine code is just a simple rewrite. If error propagation and goroutine cancellation added, code would become even more bloated.
Introducing go-opera
What go-opera brings you for async/await-like programming:
Better encapsulation
Simple Goroutines return result of execution by just editing outer variable directly:
func LoadUsername() string {
var username string
doneSignal := make(chan struct{}, 1)
go func() {
username = fetchUsernameFromDB()
doneSignal <- struct{}{}
}()
<-doneSignal
return username
}
This is bad for encapsulation and function composition.
With go-opera, you start a Goroutine by using Async, and the return value of the execution is automatically encapsulated for you. You get that value with Await.
func LoadUsername(ctx context.Context) string {
usernameChannel := opera.Async(ctx, func(ctx context.Context) opera.Result[string] {
return opera.Try(fetchUsernameFromDB(ctx))
})
username := opera.Await(ctx, usernameChannel).OrPanic()
return username
}
Under the hood, Async gives you a chan through which the final execution result is passed to you. And Await simply blocks the main routine and waits for that result.
That follows the nature of Golang, not "coloring" it.
No need for manual done signal or WaitGroup
Using WaitGroup, you have to manually count done signal:
func LoadDashboard() (int, []Order, error) {
var wg sync.WaitGroup
var count int
var orders []Order
var countErr error
var ordersErr error
wg.Add(2)
go func() {
defer wg.Done()
count, countErr = repo.CountOrders()
}()
go func() {
defer wg.Done()
orders, ordersErr = repo.ListOrders()
}()
wg.Wait()
if countErr != nil {
return 0, nil, countErr
}
if ordersErr != nil {
return 0, nil, ordersErr
}
return count, orders, nil
}
With go-opera, as said above, every Async is actually giving a chan, and that chan delivers the final Result to unblock the main routine execution, so that you do not need to manually create done signal or use WaitGroup for this pattern at all.
[write a simple sample that also count orders and list orders]
Better error handling
It is even more crucial to properly handle failable operations in concurrency code. The conventional val, err := way of handling error does not make life easier because chan type does not accept multi-return values.
userCh := make(chan User, 1)
errCh := make(chan error, 1)
go func() {
user, err := repo.FindUser(id)
if err != nil {
errCh <- err
return
}
userCh <- user
}()
select {
case user := <-userCh:
_ = user
case err := <-errCh:
return err
}
What go-opera does is stick to Result type, enforcing Async, Await, and all other related utility functions to use Result type as intermediate processing value. You will always get the error information no matter how deep you nest Async.
userTask := opera.Async(ctx, func(ctx context.Context) opera.Result[User] {
return opera.Try(repo.FindUser(ctx, id))
})
user, err := opera.Await(ctx, userTask).Get()
Structured concurrency
Structured concurrency is a widely promoted pattern for modern concurrency design. go-opera Async/Await helps you create such concurrency with ease. All you have to do is give the same context.Context instance. When that context.Context is Done, all Goroutines created by go-opera Async/Await will be cancelled. You can also derive context.Context from parent, which enforces child Goroutine to not outlive its parent.
func HandleRequest(parent context.Context) opera.Result[opera.Unit] {
return opera.Do(func() opera.Unit {
ctx, cancel := context.WithTimeout(parent, 2*time.Second)
defer cancel()
reportTask := opera.Async(ctx, func(ctx context.Context) opera.Result[Report] {
return opera.Try(buildReport(ctx))
})
auditTask := opera.Async(ctx, func(ctx context.Context) opera.Result[opera.Unit] {
return opera.TryPass(writeAuditLog(ctx))
})
report := opera.Await(ctx, reportTask).Yield()
opera.Await(ctx, auditTask).Yield()
return opera.U
})
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
HandleRequest(ctx)
cancel // It cancels all goroutines created by HandleRequest
}
There are also other useful utility functions for Async/Await-like programming:
AwaitAll
Use AwaitAll when several async tasks are all required for the next step. It preserves input order, treats nil channels as empty values, and returns early if any task fails.
countTask := opera.Async(ctx, func(ctx context.Context) opera.Result[int] {
return opera.Try(repo.CountOrders(ctx))
})
ordersTask := opera.Async(ctx, func(ctx context.Context) opera.Result[[]Order] {
return opera.Try(repo.ListOrders(ctx))
})
all := opera.AwaitAll[any](ctx, countTask, ordersTask).Yield()
count := opera.MayCast[int](all[0]).OrEmpty()
orders := opera.MayCast[[]Order](all[1]).OrEmpty()
If all channels have the same type, use AwaitAllT instead:
prices := opera.AwaitAllT(ctx, priceTask1, priceTask2, priceTask3).Yield()
AwaitAllSettled
Use AwaitAllSettled when you want every task outcome, including failures, instead of stopping on the first error. The returned slice keeps the same order as the input channels.
results := opera.AwaitAllSettled[any](ctx, profileTask, ordersTask, auditTask)
for _, result := range results {
if err := result.Err(); err != nil {
log.Println("task failed:", err)
continue
}
log.Println("task value:", result.Yield())
}
For homogeneous channels, AwaitAllSettledT keeps the call site type-safe:
results := opera.AwaitAllSettledT(ctx, job1, job2, job3)
AwaitFirst
Use AwaitFirst when several alternative operations can satisfy the same need and you want the first successful one. It returns both the value and the index of the winning channel. Errors are ignored until all tasks fail.
cachedTask := opera.Async(ctx, func(ctx context.Context) opera.Result[User] {
return opera.Try(cacheRepo.FindUser(ctx, id))
})
primaryTask := opera.Async(ctx, func(ctx context.Context) opera.Result[User] {
return opera.Try(primaryRepo.FindUser(ctx, id))
})
winner := opera.AwaitFirstT(ctx, cachedTask, primaryTask).Yield()
user := winner.V0
sourceIndex := winner.V1
AwaitZip, AwaitZip3 ... AwaitZip9
Use the zip helpers when you want results from multiple tasks as a typed tuple instead of a []any. This is especially useful for heterogeneous tasks.
countTask := opera.Async(ctx, func(ctx context.Context) opera.Result[int64] {
return opera.Try(repo.CountOrders(ctx))
})
ordersTask := opera.Async(ctx, func(ctx context.Context) opera.Result[[]Order] {
return opera.Try(repo.ListOrders(ctx))
})
summary := opera.AwaitZip(ctx, countTask, ordersTask).Yield()
count, orders := summary.V0, summary.V1
There are zip helpers up to AwaitZip9, so you can keep strong typing even when coordinating several different result types.
Give it a try: go-opera
Top comments (0)