DEV Community

Rix

Rx with Go Generics

TLDR;

It's hard to write scalable concurrent programs that compose many async operations while correctly handling the composition of their Lifecycles, with proper error propagation and teardown support. We show these limitations using Go's official async building blocks, then discuss unofficial solutions, and eventually introduce the experimental Rx implementation (gopkg.in/rx.v0) that provides the ultimate abstraction.


Intro

Go's potential to handle large concurrency is amazing. In particular, it offers three interesting features: goroutines, channels, and Context. I've had my love and hate dealing with them, but some other parts of the language made it cumbersome to write scalable concurrent code. Specifically, because of the lack of generics, I found myself writing a lot of similar code that differed only in its data types.

Concurrent programs consist of many asynchronous operations. A scalable programming system for concurrent applications should be able to easily compose, orchestrate, and schedule asynchronous operations.

Lifecycle Representation

I like to abstract asynchronous operations as Lifecycles. A Lifecycle representation of an asynchronous operation consists of one start, one end, zero or more value inputs and outputs, and at most one interruption point, caused by either an error or a cancellation. No value inputs or outputs can happen before the start, after an interruption, or after the end.

If a child Lifecycle is said to be bounded by a parent Lifecycle, it means:

  • the child Lifecycle can start no earlier than the start and no later than the end of the parent Lifecycle.
  • the child Lifecycle can end no later than the end of the parent Lifecycle.
  • the child Lifecycle should be interrupted together with the parent Lifecycle.

Before we can properly talk about the composition of async operations, we have to understand how we can represent async operations and their Lifecycles with code, in Golang.

Let's start with a synchronous function for illustration:

func plusOne(x int) int {
    return x + 1
}

This operation starts from entering the function block. In the beginning, it takes an input x, does some calculation with it, and outputs the result. Then the operation ends at the exit of the function block. We can see it has everything of a Lifecycle except interruption. Because this is a synchronous operation, there's no way we can interrupt it. But we should be able to interrupt an async operation.

Now let's introduce interruption by converting the above operation into an async version:

func plusOne(ctx context.Context, input <-chan int) <-chan int {
    output := make(chan int)
    go func() {
        for {
            select {
            case x := <-input:
                select {
                case output <- x + 1:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return output
}

This time we used three interesting and powerful features of Golang, which are the basic building blocks for async operations: goroutines, channels, and Context.

Goroutines

Goroutines are probably the easiest of the three. go f() is the special syntax for starting a goroutine. It starts a new "thread" and runs the function f() on that "thread", so the lines after it continue to execute without waiting for f() to finish. f() may keep running until its completion even after the function that started the goroutine has returned. So in this example, we created an output variable, started a goroutine to do something on another "thread", and simply returned the output variable.

I put "thread" in quotes because goroutines are not the system threads we are used to, but something much more lightweight. We are not going to dig into the details of that. For the purposes of this article, you can conceptually think of them as threads.
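To make the "runs without waiting" behavior concrete, here is a minimal sketch. The helper name squareAll is made up for this illustration, and the sync.WaitGroup is only there so the caller can wait for the goroutines before reading their results.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll (a hypothetical helper) squares each input in its own goroutine
// and waits for all of them before returning.
func squareAll(inputs []int) []int {
	results := make([]int, len(inputs))
	var wg sync.WaitGroup
	for i, x := range inputs {
		wg.Add(1)
		// go starts the function on a new goroutine; the loop continues
		// immediately without waiting for it to finish.
		go func(i, x int) {
			defer wg.Done()
			results[i] = x * x // each goroutine writes only to its own slot
		}(i, x)
	}
	wg.Wait() // block until every goroutine has returned
	return results
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3})) // prints [1 4 9]
}
```

Without the wg.Wait() call, squareAll could return before any of the goroutines had written its result.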

Channels

Channels, on the other hand, look simple but come with a lot of catches. When you prefix a type with chan, you get a new type: a channel of that type. You create a channel with the built-in make function, for example make(chan int). Channels can have an internal buffer, but we will not get into that in this article. Three operations can be applied to a channel.

The first is to send a value through the channel. We write c <- x to denote sending the value of variable x through channel c.

The second is to receive a value from the channel. We write y = <-c to denote receiving a value from channel c and assigning it to variable y.

The third is to close the channel by calling close(c). Every channel is in one of two states, open or closed. close(c) changes the state of channel c from open to closed. A closed channel cannot be opened again, and calling close(c) on an already closed channel panics.

When a channel is open, a send operation blocks until a receive operation is executed on the other end, on another "thread", and a receive operation blocks until a send operation is executed on the other end, on another "thread". However, once a channel is closed, things get complicated: a send operation panics immediately with panic: send on closed channel, and a receive operation completes immediately, returning the zero value of the element type. For example, x = <-c, where c is a closed chan int, completes immediately with x set to 0, the zero value of int. To distinguish a value that was actually sent from the zero value produced by a closed channel, use x, ok = <-c: ok is true when x was delivered by a send, and false when the channel is closed.
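The difference between a real 0 and the zero value of a closed channel is easy to get wrong, so here is a small sketch of it. The drain helper is a made-up name for this illustration.

```go
package main

import "fmt"

// drain (a hypothetical helper) receives from c until it is closed and
// returns everything that was actually sent.
func drain(c <-chan int) []int {
	var values []int
	for {
		x, ok := <-c
		if !ok {
			return values // ok is false: the channel is closed
		}
		values = append(values, x)
	}
}

func main() {
	c := make(chan int)
	go func() {
		defer close(c)
		c <- 0 // a real zero, sent while the channel is open
		c <- 7
	}()
	fmt.Println(drain(c)) // prints [0 7]

	x, ok := <-c       // c is closed by now, so this returns immediately
	fmt.Println(x, ok) // prints 0 false
}
```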

A common technique for creating a one-shot toggle signal is to create a channel and later close it to toggle it. Before the toggle, any receive operation on the channel blocks. Once the toggle happens, the channel is closed, and all pending receive operations complete at once with the zero value. Any future receive operation completes immediately with the zero value as well. A practical example of such a one-shot signal is an interruption signal like a cancellation.

For unbuffered channels (all we use in this article), a send blocks forever if no one is receiving, and vice versa. Every successful send must be matched by a successful receive; otherwise, the unmatched side blocks (with some exceptions discussed below).
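As a rough sketch of the toggle technique, the following program parks several goroutines on one channel and releases them all with a single close. The function name waitForToggle and the counter are assumptions made for this example only.

```go
package main

import (
	"fmt"
	"sync"
)

// waitForToggle (a hypothetical helper) starts n goroutines that all block on
// the same channel, closes it once, and returns how many of them observed the
// signal.
func waitForToggle(n int) int {
	done := make(chan struct{}) // carries no data; only its closing matters
	var wg sync.WaitGroup
	var mu sync.Mutex
	released := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // blocks until the channel is closed
			mu.Lock()
			released++
			mu.Unlock()
		}()
	}
	close(done) // one close releases every receiver at once
	wg.Wait()
	return released
}

func main() {
	fmt.Println(waitForToggle(3)) // prints 3
}
```

A single send on done would wake only one receiver, which is why closing is the tool of choice for broadcast signals.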

The select Statement

However, there is a way to avoid blocking when doing channel send and receive operations:

select {
case c <- x:
default:
}

select {
case y = <- c:
default:
}

This way, the send or receive operation is skipped if it can't complete immediately.

A select statement with multiple channel I/O cases blocks until any one of those cases can complete. If multiple cases can complete at the same time, select chooses one of them at random and executes it. A select statement with a default clause checks whether any of the cases can complete immediately, and executes the default clause if none can.
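Here is a small runnable sketch of the non-blocking form. The helpers trySend and tryReceive are hypothetical names, and the demo uses a channel with a buffer of one purely to make the outcome deterministic, even though the rest of this article sticks to unbuffered channels.

```go
package main

import "fmt"

// trySend (hypothetical) attempts a non-blocking send and reports whether the
// value went through.
func trySend(c chan<- int, x int) bool {
	select {
	case c <- x:
		return true
	default:
		return false // the send could not complete right now
	}
}

// tryReceive (hypothetical) attempts a non-blocking receive.
func tryReceive(c <-chan int) (int, bool) {
	select {
	case y := <-c:
		return y, true
	default:
		return 0, false // nothing was available right now
	}
}

func main() {
	c := make(chan int, 1)     // capacity 1 only to keep the demo deterministic
	fmt.Println(trySend(c, 1)) // prints true: the buffer has room
	fmt.Println(trySend(c, 2)) // prints false: the buffer is full
	fmt.Println(tryReceive(c)) // prints 1 true
	fmt.Println(tryReceive(c)) // prints 0 false
}
```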

This means we can use select to block on multiple channel I/O operations together and let the first one that completes execute:

select {
case output <- x + 1:
case <-ctx.Done():
    return
}

This part of the code blocks on both the send operation on channel output and the receive operation on channel ctx.Done(). It waits until either of the two cases can complete, then continues execution with that case. In this specific example, the ctx.Done() channel is the source of interruption. When a receive on this channel completes, the channel has been closed, which means the operation's Lifecycle has been canceled.

We use this select statement on both receiving a value from the input channel, and sending a result to the output channel because they are both blocking operations that we want to interrupt if the Lifecycle is interrupted.

Go Context

Now is a good time to explain Go Context. The Go Context is Golang's attempt to represent some part of the Lifecycle as a function argument, usually passed as the first parameter to every function that performs asynchronous operations. A Context can register different kinds of cancellation signals, and it spawns a child Context each time it does so.

For example, a parent Context can spawn a child Context with an explicit cancellation function:

func parent(parentCtx context.Context) {
    childCtx, cancel := context.WithCancel(parentCtx)
    go func() {
        <-time.NewTimer(time.Second).C
        cancel()
    }()
    child(childCtx)
}

A child Context's Lifecycle is bounded by its parent Context's Lifecycle: if the parent Context is interrupted or ended, the child Context changes to the same state. In addition, since we created an explicit cancellation signal, the cancel function, the child Context can also be interrupted by calling cancel(). Doing so closes the childCtx.Done() channel, and all receive operations on that channel complete at once with the zero value. In this example, we spawn a goroutine that waits for one second and then calls cancel() to interrupt the child Context's Lifecycle with a cancellation signal.

Because the above example is a common use case, the context package includes another helper function that registers a timeout on the spawned child Context:

childCtx, cancel := context.WithTimeout(parentCtx, time.Second)

With these basic async operation building blocks explained, we can finally look back at our earlier async example code:

func plusOne(ctx context.Context, input <-chan int) <-chan int {
    output := make(chan int)
    go func() {
        for {
            select {
            case x := <-input:
                select {
                case output <- x + 1:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return output
}

Its Lifecycle starts on entering the function block. Its child Lifecycle, bounded by the parent Lifecycle, starts on entering the goroutine's function block. The Lifecycle consumes input values and yields output values through channel operations on the input and output channels. It is interrupted by cancellation when a receive on the ctx.Done() channel completes, and it finally ends when the goroutine returns.

Async Operation Composition

Now we can finally properly discuss the composition of async operations. A composition of two asynchronous operations has to accomplish two things:

  1. It has to properly route the inputs and outputs of values between the two operations, usually with one being the source, and the other being the destination.
  2. It has to manage the Lifecycles of the two operations and generate the composed Lifecycle as a result.

The first part is easy to understand. It's the same as composing synchronous operations. For example:

func plusOne(x int) int {
    return x + 1
}

func double(x int) int {
    return x * 2
}

func plusOneThenDouble(x int) int {
    return double(plusOne(x))
}

Here we wire the input of the composed function to the input of plusOne. Then wire the output from plusOne to the input of double. Finally, wire the output of double as the output of the composed function.

Now let's upgrade plusOne and double into async operations. We already wrote plusOne above, now let's do the same for double:

func plusOne(ctx context.Context, input <-chan int) <-chan int {
    output := make(chan int)
    go func() {
        defer close(output)
        for {
            select {
            case x, ok := <-input:
                if !ok {
                    return
                }
                select {
                case output <- x + 1:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return output
}

func double(ctx context.Context, input <-chan int) <-chan int {
    output := make(chan int)
    go func() {
        defer close(output)
        for {
            select {
            case x, ok := <-input:
                if !ok {
                    return
                }
                select {
                case output <- x * 2:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return output
}

This time we also handle the case where the input channel is closed. The goroutine ends if either the input channel is closed or the Context is canceled. This way, termination can come from two directions: a source operation that produces the inputs can end the current operation by closing its output, and cancellation of the Context provided by a consumer can also interrupt and end the current operation. We also added defer close(output) to the goroutine so that it always closes its output channel just before it ends.

Now let's compose the two operations in a way that satisfies our two requirements for async operation composition.

func plusOneThenDouble(ctx context.Context, input <-chan int) <-chan int {
    return double(ctx, plusOne(ctx, input))
}

As we can see, Go Context makes simple async operation composition very easy. We just pass the Context to the child operations, and their Lifecycles are bounded by the Context passed in.

Everything looks nice so far. However, now I'm going to show you the other side of the coin. It's mainly about two things that are commonly considered together: error propagation and teardown after interruption.

Now let's consider another function composition, one that involves an error:
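To show the composed operation actually running, here is a self-contained sketch. To keep it short, plusOne and double are condensed into one hypothetical helper named stage that takes the per-value function as a parameter; collect is also a made-up name for the driver.

```go
package main

import (
	"context"
	"fmt"
)

// stage is a condensed stand-in for plusOne and double: it applies fn to each
// input and stops when input is closed or ctx is canceled.
func stage(ctx context.Context, input <-chan int, fn func(int) int) <-chan int {
	output := make(chan int)
	go func() {
		defer close(output)
		for {
			select {
			case x, ok := <-input:
				if !ok {
					return
				}
				select {
				case output <- fn(x):
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return output
}

func plusOneThenDouble(ctx context.Context, input <-chan int) <-chan int {
	plusOne := stage(ctx, input, func(x int) int { return x + 1 })
	return stage(ctx, plusOne, func(x int) int { return x * 2 })
}

// collect feeds values into the pipeline and gathers every output.
func collect(values ...int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	input := make(chan int)
	go func() {
		defer close(input) // closing the source ends the whole pipeline
		for _, v := range values {
			select {
			case input <- v:
			case <-ctx.Done():
				return
			}
		}
	}()
	var out []int
	for v := range plusOneThenDouble(ctx, input) {
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(collect(1, 2, 3)) // prints [4 6 8]
}
```

Closing the input channel ends the pipeline from the source side, while calling cancel() would end it from the consumer side.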

func f(x, y int) (int, error) {
    x = double(x)
    y = plusOne(y)
    // Report the error instead of dividing by zero.
    if y == 0 {
        return 0, errors.New("divide by zero")
    }
    return x / y, nil
}

It would be messy to write the async version of this composition. First of all, we cannot return an error together with the output channel: because the real work happens on another thread, we cannot know about any error before all the work on that thread has finished. If we waited for all the work, collected the error from the other thread, and returned it, the caller could not start receiving results before all the work had finished. That defeats the purpose of an async operation. We have to provide a way to signal and propagate errors during the actual work, outside of the thread running f().

// NOT A GOOD PATTERN, DON'T USE!
func f(ctx context.Context, x, y <-chan int) (<-chan int, <-chan error) {
    output := make(chan int)
    errorC := make(chan error)
    // Create the child Context first so that cancel() reaches double and plusOne.
    ctx, cancel := context.WithCancel(ctx)
    x = double(ctx, x)
    y = plusOne(ctx, y)
    go func() {
        var err error
        defer func() {
            cancel()
            close(output)
            errorC <- err
            close(errorC)
        }()

        for {
            var ok bool
            var a, b int
            select {
            case a, ok = <-x:
                if !ok {
                    return
                }
            case <-ctx.Done():
                return
            }
            select {
            case b, ok = <-y:
                if !ok {
                    return
                }
            case <-ctx.Done():
                return
            }
            if b == 0 {
                err = errors.New("divide by zero")
                return
            }
            select {
            case output <- a / b:
            case <-ctx.Done():
                return
            }
        }
    }()
    return output, errorC
}

To use this composed function, the caller has to consume the returned error channel:

// NOT A GOOD PATTERN, DON'T USE!
output, errorC := f(ctx, x, y)
for n := range output {
    fmt.Println(n)
}
if err := <-errorC; err != nil {
    panic(err)
}

This implementation is complicated, so let's dig in to understand what we've done. First, it returns two channels, one for value output and another for error output. The value output may emit zero or more values and then closes. The error output emits exactly one value and then closes. A consumer needs to drain both channels; otherwise, the async operation may block forever without terminating.

Then it spawns a child Context with an explicit cancellation signal. The child Context is canceled at the end of the goroutine (in the defer statement), which interrupts the child async operations (double and plusOne) when the current operation is ending.

In the goroutine's body, we have a for loop that reads pairs of values from the output channels of double and plusOne. It receives each value while checking for Context interruption, and ends its operation if interrupted.

Then it checks for a divide-by-zero error, and if it finds one, sets the error and bails out. In that case, the deferred function performs the following: it explicitly cancels the child Context, letting double and plusOne terminate; it closes the output channel so the consumer receives no more values from it; and it sends the err value to the errorC channel. Since we require the consumer to drain both the output and errorC channels, this send operation will complete. Finally, it closes the errorC channel so the consumer can receive no more errors from it.

If there's no error, it sends the division result to the output channel, unless the Context is interrupted first.

However, this has become too complicated: it is hard to keep track of everything one has to do to write a working and correct async operation composition using this pattern. Furthermore, this pattern is not scalable. If you add an error channel to the returns of both double and plusOne and try to wire the errors from the child operations to the parent's, it will be a nightmare. I mean, don't try it, but if you did, I'd like to know how it went for you.

We have seen cancellation propagate by closing the ctx.Done() channel, but we haven't seen how errors can propagate to child Lifecycles. The best we can do above is to manually cancel() the children, and that doesn't distinguish an error from a cancellation: both simply terminate the child Lifecycles. If we want that distinction in the child Lifecycles, there's no easy way to get it with the above pattern. That summarizes the problem of error propagation.

Then we have the problem of teardown after an interruption. If your async operation has important work to do after getting interrupted and before it ends, like releasing some important resources, you can register that work using the defer statement, but the parent operation can't tell when you have finished. If the parent wants to wait until all child operations have released all their resources and terminated before it continues execution, there's no scalable way to do it with the above pattern.

The best we can do is to pass down a WaitGroup and let every child operation register its goroutine with it, and in every child goroutine, defer a wg.Done() call that runs after teardown. Then the parent operation can call wg.Wait() to wait for all the children to finish teardown. This approach doesn't scale because a WaitGroup cannot be nested or fanned out, making it usable only at one particular level of operation to join all child operations. Child-level operations don't have the ability to tap in and do the same.
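For reference, the WaitGroup workaround described above can be sketched roughly like this. The names worker and runAndTeardown are hypothetical, and the atomic counter merely stands in for releasing a real resource.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// worker (hypothetical) blocks until ctx is canceled, then runs its teardown
// and reports completion through wg.
func worker(ctx context.Context, wg *sync.WaitGroup, released *int32) {
	wg.Add(1)
	go func() {
		// Deferred calls run in reverse order, so wg.Done() fires only
		// after the teardown below has finished.
		defer wg.Done()
		defer atomic.AddInt32(released, 1) // stands in for releasing a resource
		<-ctx.Done()
	}()
}

// runAndTeardown (hypothetical) starts n workers, interrupts them, and waits
// until every one of them has finished its teardown.
func runAndTeardown(n int) int32 {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	var released int32
	for i := 0; i < n; i++ {
		worker(ctx, &wg, &released)
	}
	cancel()  // interrupt every child
	wg.Wait() // block until every child has finished its teardown
	return atomic.LoadInt32(&released)
}

func main() {
	fmt.Println(runAndTeardown(3)) // prints 3
}
```

This works for one flat level of children, but notice that the WaitGroup has to be threaded through every function signature by hand, and it carries no error information at all.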

The Tomb Package

A very good solution to these problems of naive async patterns with Go Context is the Tomb package, written by Gustavo Niemeyer. An explanation and introduction of the package can be found in his blog post. Or TLDR:

The model is simple: a Tomb tracks whether one or more goroutines are alive, dying, or dead, and the death reason.

This model perfectly matches what I described as a Lifecycle:

  • alive means the Lifecycle has started.
  • dying means the Lifecycle has just been interrupted, or all the goroutines it manages have completed.
  • dead means the Lifecycle completed without interruption, or finished its teardown after dying, and has ended.
  • death reason means the error, if any, raised during the operation's execution, or the propagated error if it was interrupted.

Tomb is an improved alternative to Go Context. It has intuitive interface semantics and comprehensive Lifecycle management tools. Using it, we can design async operations with proper Lifecycle control and composition more easily. Let's dive into the adapted example:

func plusOne(t *tomb.Tomb, input <-chan int) <-chan int {
    output := make(chan int)
    t.Go(func() (err error) {
        defer close(output)
        for {
            select {
            case x, ok := <-input:
                if !ok {
                    return
                }
                select {
                case output <- x + 1:
                case <-t.Dying():
                    return
                }
            case <-t.Dying():
                return
            }
        }
    })
    return output
}

func double(t *tomb.Tomb, input <-chan int) <-chan int {
    output := make(chan int)
    t.Go(func() (err error) {
        defer close(output)
        for {
            select {
            case x, ok := <-input:
                if !ok {
                    return
                }
                select {
                case output <- x * 2:
                case <-t.Dying():
                    return
                }
            case <-t.Dying():
                return
            }
        }
    })
    return output
}

func f(t *tomb.Tomb, x, y <-chan int) <-chan int {
    x = double(t, x)
    y = plusOne(t, y)
    output := make(chan int)
    t.Go(func() (err error) {
        defer close(output)
        for {
            var ok bool
            var a, b int
            select {
            case a, ok = <-x:
                if !ok {
                    return
                }
            case <-t.Dying():
                return
            }
            select {
            case b, ok = <-y:
                if !ok {
                    return
                }
            case <-t.Dying():
                return
            }
            if b == 0 {
                err = errors.New("divide by zero")
                return
            }
            select {
            case output <- a / b:
            case <-t.Dying():
                return
            }
        }
    })
    return output
}

It looks similar to our previous example, but if we look closer, all three async operations have the same function signature, with t *tomb.Tomb as the first parameter. With Go Context replaced, we are doing a few things differently:

  1. Instead of using the go f() language statement to start a goroutine, we use t.Go(func() error { ... }). This will magically bind the spawned goroutine's Lifecycle to the Lifecycle represented by the Tomb object t. We will explain how it does that later.

  2. Instead of using <-ctx.Done(), we use <-t.Dying() to listen for interruption. As we briefly explained above, dying has specific semantics in Tomb: it explicitly means that an interruption has just happened. That distinguishes it from dead, which means the operation has ended, either by completing normally or by finishing teardown after an interruption. Go Context has no such distinction.

  3. In the combined function f, we no longer use a separate errorC channel to propagate errors. The Tomb object takes care of error propagation because it's part of an operation's Lifecycle. When you spawn a Tomb-wrapped goroutine with t.Go(func() error { ... }), the error returned from the goroutine's function is checked by Tomb. If it's non-nil, Tomb treats it as an error and issues an interruption with that error on the Tomb's Lifecycle.

When a Tomb is interrupted, it closes its t.Dying() channel, making all receives on that channel complete. Goroutines registered with a Tomb object should listen to the t.Dying() channel and enter teardown once they receive the interruption signal.

A Tomb can also be interrupted explicitly by calling t.Kill(err). It does not check whether the error passed in is non-nil; it always interrupts the Tomb's Lifecycle. The convention here is that if the interruption error is nil, we should consider it a cancellation, and if it's non-nil, we should consider it an actual error propagated to this Lifecycle. If interruption happens more than once, either because multiple goroutines returned non-nil errors or because t.Kill(err) was called multiple times, only the first non-nil error is recorded as the death reason, and the rest are ignored.

The Tomb object saves the interruption's error value, i.e. the death reason, which can be queried with t.Err().

That concludes the error propagation of Tomb. Now, how does Tomb know whether an async operation, including all of its children's Lifecycles, has ended? Since the pattern here lets every spawned goroutine run its teardown after it receives an interruption signal, a goroutine only returns when all of its teardown is finished. And since we register each goroutine through Tomb's wrapper, Tomb keeps track of every goroutine it helped spawn. When all those goroutines have returned, the Tomb object can be sure that there's no more async operation or teardown to happen, so it transitions its state to dead and closes its t.Dead() channel.

Tomb also provides a t.Wait() method that blocks until t enters the dead state and returns its death reason.

Using Tomb we can compose async operations in a scalable way, especially because it has a versatile error propagation system, and tools to keep track of teardown and real death of child operations.

Let's push it a bit to try something more advanced and see how Tomb can handle it.

Fan-in and Fan-out

Let's write an async operation that takes an input of Twitter user names, and for each user, get the first 10 tweets and download images in those tweets. We can have 5 concurrent workers loading the user tweets, and another 10 concurrent workers downloading images. We will ignore the proper integration with Twitter API, and simplify things a bit to assume we already have a couple of working functions dealing with Twitter APIs.

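func BindTomb(parent *tomb.Tomb, child *tomb.Tomb, teardown func() error) {
    parent.Go(func() (err error) {
        select {
        case <-child.Dying():
            err = child.Wait()
        case <-parent.Dying():
            // Propagate the parent's interruption to the child, then wait
            // for the child to end before running the extra teardown.
            child.Kill(parent.Err())
            err = child.Wait()
        }
        if teardown != nil {
            parent.Go(teardown)
        }
        return
    })
}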

func DownloadUserTop10TweetsImages(t *tomb.Tomb, users <-chan string) <-chan *Image {
    tweets := make(chan *Tweet)
    images := make(chan *Image)

    tweetT := new(tomb.Tomb)
    imageT := new(tomb.Tomb)

    for i := 0; i < 5; i++ {
        tweetT.Go(func() (err error) {
            for {
                var ok bool
                var user string
                var tweet *Tweet
                select {
                case user, ok = <-users:
                    if !ok {
                        return
                    }
                case <-tweetT.Dying():
                    return
                }
                if tweet, err = FetchUserTop10Tweets(tweetT, user); err != nil {
                    return
                }
                select {
                case tweets <- tweet:
                case <-tweetT.Dying():
                    return
                }
            }
        })
    }

    for i := 0; i < 10; i++ {
        imageT.Go(func() (err error) {
            for {
                var ok bool
                var tweet *Tweet
                var image *Image
                select {
                case tweet, ok = <-tweets:
                    if !ok {
                        return
                    }
                case <-imageT.Dying():
                    return
                }
                if image, err = DownloadTweetImage(imageT, tweet); err != nil {
                    return
                }
                select {
                case images <- image:
                case <-imageT.Dying():
                    return
                }
            }
        })
    }

    BindTomb(t, tweetT, func() (err error) {
        close(tweets)
        return
    })

    BindTomb(t, imageT, func() (err error) {
        close(images)
        return
    })

    return images
}

Since we are spawning two groups of concurrent workers, we want fine-grained control over their Lifecycles as two different groups: we want to close each group's output channel after that group is finished (dead). To give us that level of control, we create two Tomb objects, tweetT and imageT, to manage the Lifecycles of the two worker groups.

Then we wrote a magic function, BindTomb, that binds a child Tomb object to a parent Tomb object so that the child Tomb's Lifecycle is bounded by the parent Tomb's Lifecycle. This function does 4 things:

  1. It propagates the death reason of the child Lifecycle to the parent Lifecycle.
  2. It propagates interruption from the parent Lifecycle to the child Lifecycle.
  3. It extends the parent Lifecycle's end to after the child Lifecycle's end.
  4. It provides an opportunity to run extra teardown after the child's Lifecycle has ended.

We use this function to bind the Lifecycles of the tweetT and imageT Tombs to the Lifecycle of the parent Tomb t, and we register extra teardown that closes each worker group's output channel after the group's Lifecycle ends.

Even though we can now write correct and scalable async operation compositions for advanced cases like this, we still find ourselves rewriting a lot of boilerplate code again and again. And because of the lack of generics, a lot of this boilerplate code cannot be abstracted away with function calls.

Generics

Recently, with the stable release of Go 1.18, Go finally got generics. Let's try to abstract away the boilerplate code in the above example:

func Receive[T any](t *tomb.Tomb, c <-chan T) (value T, ok bool) {
    select {
    case value, ok = <-c:
    case <-t.Dying():
    }
    return
}

func Send[T any](t *tomb.Tomb, c chan<- T, value T) bool {
    select {
    case c <- value:
        return true
    case <-t.Dying():
    }
    return false
}

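func MakeChildTombAndChan[T any](t *tomb.Tomb) (childT *tomb.Tomb, childC chan T) {
    childC = make(chan T)
    childT = new(tomb.Tomb)
    t.Go(func() (err error) {
        select {
        case <-childT.Dead():
            err = childT.Err()
        case <-t.Dying():
            // Propagate the interruption, then wait for the child's workers
            // to return so that none of them is still sending on childC.
            childT.Kill(t.Err())
            err = childT.Wait()
        }
        close(childC)
        return
    })
    return
}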

func FanOut[T any, S any](t *tomb.Tomb, workers int, inputs <-chan T, outputs chan<- S, project func(*tomb.Tomb, T) (S, error)) {
    for i := 0; i < workers; i++ {
        t.Go(func() (err error) {
            for {
                var ok bool
                var input T
                var output S
                if input, ok = Receive(t, inputs); !ok {
                    return
                }
                if output, err = project(t, input); err != nil {
                    return
                }
                if !Send(t, outputs, output) {
                    return
                }
            }
        })
    }
}

func DownloadUserTop10TweetsImages(t *tomb.Tomb, users <-chan string) <-chan *Image {
    tweetT, tweets := MakeChildTombAndChan[*Tweet](t)
    imageT, images := MakeChildTombAndChan[*Image](t)

    FanOut(tweetT, 5, users, tweets, FetchUserTop10Tweets)
    FanOut(imageT, 10, tweets, images, DownloadTweetImage)

    return images
}

Now we finally end up with something much simpler. All the abstracted helper functions can be reused for the same pattern, and the main compose function is left with only a few lines of code.

ReactiveX (Rx)

However, we have only done this for the Fan-in & Fan-out pattern. There are many other async composition patterns out there. If we want to write each of them in a few lines of code, we need a library that abstracts the boilerplate away for us, pattern by pattern.

Fortunately, there's a language-agnostic API called ReactiveX, or just Rx, with implementations in many languages. It not only has an abstraction for Lifecycle, but also a collection of common compositions of async operations, with semantic definitions for how their Lifecycles compose.

Because the majority of Rx implementations rely heavily on generics, existing Golang implementations of Rx either don't leverage Golang's type system very well (using interface{} everywhere) or use a legacy or unofficial generics design that's incompatible with the generics released in Go 1.18.

Therefore, why don't we try and build our own Rx implementation with Go 1.18 generics?

If you want to check out the final result, go to https://github.com/go-rx/rx. But keep in mind that this is an experimental project that I just hacked together in a weekend.

ReactiveX (Rx) is an Observable programming model. It defines an Observable as an operation that outputs a stream of values. An Observable's Lifecycle starts when some "Subscriber" calls its Subscribe() method. The operation can send output values to the subscriber, signal an error to it, and listen for interruption from it. After an interruption, it tears down and ends.

We can define an Observable as an interface that can be "subscribed":

type Observable[T any] interface {
    Subscribe(subscriber Writer[T])
}

Here we use the generic type parameter T to indicate the type of value the Observable outputs. The subscriber has the type Writer[T]. Let's see the type definition for Writer[T] and its accompanying type Reader[T]:

type Writer[T any] interface {
    Lifecycle
    Write(T) bool
}

type Reader[T any] interface {
    Lifecycle
    Read() (T, bool)
}

type Lifecycle interface {
    Kill(error)
    Dead() <-chan struct{}
    Dying() <-chan struct{}
    Wait() error
    Go(func() error)
    Err() error
    Alive() bool
    Context(context.Context) context.Context
}

The Writer[T] and Reader[T] types combine a Go channel with the Lifecycle management goodies borrowed from the Tomb semantics. Even though the underlying implementation can use a Go channel, we define Read() and Write() methods instead of directly exposing the channel, for a few reasons:

  1. We don't want to rely on closing channels. It's hard to close a Go channel properly, especially while send and receive operations may still be in flight. If we close a channel while someone is sending to it, the sending goroutine panics and, unless recovered, crashes the whole Go program. So we don't want to use channel close as the indicator of stream completion.
  2. We want the underlying channel to be bound to its Lifecycle. This means ok := Writer[T].Write(value) and value, ok := Reader[T].Read() complete with ok == true when a value has gone through the underlying channel, or with ok == false if their Lifecycle is dying.
  3. We want the library users to think in terms of Lifecycles instead of the basic async building blocks. Such abstraction hides away those details.
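The first two points can be sketched without any library. This is a minimal illustration, not the implementation in the Rx package: the pipe type, newPipe, and the bare dying channel are hypothetical stand-ins for the Tomb-backed Lifecycle. The value channel is never closed. Only the dying channel is, so a late Write reports false instead of panicking.

```go
package main

import (
	"fmt"
	"sync"
)

// pipe is a hypothetical stand-in for a Writer/Reader pair sharing one Lifecycle.
type pipe[T any] struct {
	values chan T        // never closed
	dying  chan struct{} // closed exactly once by Kill
	once   sync.Once
}

func newPipe[T any]() *pipe[T] {
	return &pipe[T]{values: make(chan T), dying: make(chan struct{})}
}

// Kill marks the Lifecycle as dying. It is safe to call more than once.
func (p *pipe[T]) Kill() {
	p.once.Do(func() { close(p.dying) })
}

// Write reports true if the value was handed to a reader,
// or false if the Lifecycle is dying.
func (p *pipe[T]) Write(v T) bool {
	select {
	case p.values <- v:
		return true
	case <-p.dying:
		return false
	}
}

// Read reports ok == true with a value from a writer,
// or ok == false if the Lifecycle is dying.
func (p *pipe[T]) Read() (v T, ok bool) {
	select {
	case v = <-p.values:
		return v, true
	case <-p.dying:
		return v, false
	}
}

func main() {
	p := newPipe[int]()

	go p.Write(7)
	v, ok := p.Read()
	fmt.Println(v, ok) // 7 true

	p.Kill()
	fmt.Println(p.Write(8)) // false: no panic, the Lifecycle is dying
	_, ok = p.Read()
	fmt.Println(ok) // false
}
```

Because completion is signalled through the Lifecycle rather than through close(values), writers and readers can race with Kill freely. The worst outcome is an ok == false result.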

This interface design is a little different from the usual Rx implementations, but let's see how it can help us build async operation composition patterns.

Let's first start with something simple, writing a time Ticker as an Observable.

func Ticker(period time.Duration) Observable[int] {
    return Func(func(subscriber Writer[int]) (err error) {
        // time.NewTicker panics on a non-positive period, so emit nothing instead.
        if period <= 0 {
            return
        }
        count := 0
        ticker := time.NewTicker(period)
        // Stop the ticker on every exit path, including a failed Write.
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                if !subscriber.Write(count) {
                    return
                }
                count++
            case <-subscriber.Dying():
                return
            }
        }
    })
}

In this example we basically want to showcase the Func helper. It takes a function and turns it into an Observable. The implementation is quite easy:

func Func[T any](f func(subscriber Writer[T]) error) Observable[T] {
    return functionObservable[T](f)
}

type functionObservable[T any] func(subscriber Writer[T]) error

func (o functionObservable[T]) Subscribe(subscriber Writer[T]) {
    subscriber.Go(func() error {
        return o(subscriber)
    })
}

When the returned Observable is subscribed, the provided function runs on a goroutine that is bound to the subscriber's Lifecycle. That means all the blocking operations in the inner function are async to the subscriber.

The Ticker example above has an inner function that blocks on a Ticker and sends values to the subscriber. This is a basic example of creating Observables.

Then, we can write common async operation combination patterns as Observable operators, which take one or more Observables and perform the combination and transformation on them, resulting in a new Observable. Let's look at how we can write a simple Map Observable operator:

func Map[T any, S any](source Observable[T], project func(T) (S, error)) Observable[S] {
    return Func(func(subscriber Writer[S]) (err error) {
        writer, reader := Pipe(PipeWithParentLifecycle[T](subscriber))
        source.Subscribe(writer)
        for {
            if input, ok := reader.Read(); ok {
                var output S
                if output, err = project(input); err != nil {
                    return
                }
                if !subscriber.Write(output) {
                    return
                }
            } else {
                return
            }
        }
    })
}

Here we introduce another utility function, Pipe, which makes a writer and reader pair that share the same Lifecycle and underlying value channel. If you Write to the writer, you can Read the value from the reader. In the case above, the PipeWithParentLifecycle[T](subscriber) option makes the pipe's new Lifecycle a child of the parent Lifecycle, subscriber. This means the Lifecycle of the writer and reader is bound to the subscriber's Lifecycle.

Now let's see something a little bit more advanced - a SwitchMap:

func SwitchMap[T any, S any](source Observable[T], project func(T) Observable[S]) Observable[S] {
    return Func(func(subscriber Writer[S]) (err error) {
        outerWriter, outerReader := Pipe(PipeWithParentLifecycle[T](subscriber))
        source.Subscribe(outerWriter)
        var innerLifecycle Lifecycle
        for {
            var ok bool
            var outerValue T
            var innerObservable Observable[S]

            if outerValue, ok = outerReader.Read(); !ok {
                if innerLifecycle != nil {
                    innerLifecycle.Wait()
                }
                return
            }

            if innerLifecycle != nil {
                innerLifecycle.Kill(nil)
            }

            innerObservable = project(outerValue)
            innerWriter, innerReader := Pipe(PipeWithParentLifecycle[S](subscriber))
            subscriber.Go(func() (err error) {
                for {
                    if innerValue, ok := innerReader.Read(); ok {
                        // forward inner observable value to outer subscriber
                        if !subscriber.Write(innerValue) {
                            return
                        }
                    } else {
                        return
                    }
                }
            })

            innerLifecycle = innerReader
            innerObservable.Subscribe(innerWriter)
        }
    })
}

SwitchMap manages an innerLifecycle, which is the subscription to the result of the project function. Every time it receives a value from the outer source Observable, it projects that value to an inner Observable and subscribes to it, forwarding values from the inner Observable to the outer subscriber. If a subsequent value arrives from the outer source Observable while an inner subscription is active, we kill that inner subscription and subscribe to the new inner Observable projected from the new outer value.

Let's utilize this Rx library and write something more practical.

package main

import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"
    "io"
    "net/http"
    "time"

    "github.com/google/go-github/v43/github"
    "golang.org/x/time/rate"
    "gopkg.in/rx.v0"
)

func main() {
    client := github.NewClient(nil)
    rl := rate.NewLimiter(rate.Every(time.Second), 1)

    fetchTopContributors := func(owner string, repository string) rx.Observable[*github.Contributor] {
        return rx.Defer(func(subscriber rx.Lifecycle) rx.Observable[*github.Contributor] {
            ctx := subscriber.Context(nil)
            rl.Wait(ctx)
            contribs, _, err := client.Repositories.ListContributors(ctx, owner, repository, nil)
            if err != nil {
                return rx.Error[*github.Contributor](err)
            }
            return rx.List(contribs)
        })
    }

    fetchUserFollowings := func(contrib *github.Contributor) rx.Observable[*github.User] {
        return rx.Func(func(subscriber rx.Writer[*github.User]) (err error) {
            if contrib.Login == nil {
                return
            }
            ctx := subscriber.Context(nil)
            rl.Wait(ctx)
            followings, _, err := client.Users.ListFollowing(ctx, *contrib.Login, nil)
            if err != nil {
                return
            }
            for _, following := range followings {
                if !subscriber.Write(following) {
                    return
                }
            }
            return
        })
    }

    downloadUserProfileImages := func(user *github.User) rx.Observable[[]byte] {
        return rx.Func(func(subscriber rx.Writer[[]byte]) (err error) {
            if user.AvatarURL == nil {
                return
            }
            resp, err := http.Get(*user.AvatarURL)
            if err != nil {
                return
            }
            defer resp.Body.Close()

            image, err := io.ReadAll(resp.Body)
            if err != nil {
                return
            }
            if !subscriber.Write(image) {
                return
            }
            return
        })
    }

    contribs := fetchTopContributors("ReactiveX", "rxjs")
    followings := rx.MergeMap(contribs, fetchUserFollowings, rx.MergeMapWithStandbyConcurrency(5))
    images := rx.MergeMap(followings, downloadUserProfileImages, rx.MergeMapWithStandbyConcurrency(10))

    writer, reader := rx.Pipe[[]byte]()
    images.Subscribe(writer)

    for {
        if image, ok := reader.Read(); ok {
            fmt.Printf("Image Size: %d Hash: %s\n", len(image), hex.EncodeToString(SHA256(image)))
        } else {
            break
        }
    }

    if err := reader.Wait(); err != nil {
        panic(err)
    }
}

func SHA256(b []byte) []byte {
    m := sha256.Sum256(b)
    return m[:]
}

This is a complete, runnable example program (you can check it out from GitHub). In this example, we use the concurrent fan-out feature of the MergeMap operator to create 5 workers for fetching GitHub user followings and 10 workers for downloading user profile images, which are then hashed as they arrive.

Conclusion

We explained how thinking in terms of Lifecycles can help us write scalable and correct async programs. Today, with the help of the Tomb package and Go generics, we can finally have a library that helps us easily manage and compose Lifecycles.

The library is still in the early alpha phase, with many operators yet to be implemented. Contributions and suggestions are welcome!
