DEV Community

Cover image for go-kata 01/01-concurrent-aggregator
Manuel Doncel Martos
Manuel Doncel Martos

Posted on

go-kata 01/01-concurrent-aggregator

A few weeks ago I discovered this GitHub repository go-kata, containing some Go exercises that encourage to write idiomatic Go. What caught my attention was the fact that they come with no solutions, just you, the problem, and your Go skills.

The repository quickly gained traction, and got more than 1k stars⭐ and was mentioned in several social networks, e.g. X.

I decided to tackle these problems myself and share my solutions in my own fork, hoping to inspire people to try and to refine my approach through community feedback.

01-context-cancellation-concurrency/01-concurrent-aggregator

The Problem

The challenge is straightforward but captures a common real-world scenario:

Call two services concurrently, Profile and Order, combine their outputs into a single string like
"User: Alice | Orders: 5", and handle failures gracefully.
If either service call fails, the entire operation must be interrupted immediately.

A more detailed description of the requirements can be found in the README.md.

Implementing The Services

Let's start implementing the services. The goal is they can be configurable in a way that they can either return the actual result, or an error, and also how long does it take to get that response.

So we could have something like:

  • ✅ Both services succeed within timeout
  • ⏱️ Both services timeout
  • ❌ One service fails while the other succeeds
  • 🔄 Context cancellation propagates correctly

For that, and without entering in too much details, I created a mock service that I can configure the output and the time.

type (
    Service[T any] struct {
        Response Response[T]
    }

    Response[T any] struct {
        Sleep time.Duration
    Val   T
    Err   error
    }
)

func (s Service[T]) GetResponse(ctx context.Context) (T, error) {
    t := new(T)

    select {
        case <-time.After(s.Response.Sleep):
        return s.Response.Val, s.Response.Err
    case <-ctx.Done():
        return *t, ctx.Err()
    }
}
Enter fullscreen mode Exit fullscreen mode

Then I created the profile.Service and order.Service using that mock.

In that way, I can configure the two services to cover those scenarios we mentioned above, e.g:

  1. Profile and order services returns the a successful response on time:
timeout := 2*time.Second
ps := profile.NewMockService(profileSuccessResponse(time.Millisecond))
os := order.NewMockService(orderSuccessResponse(time.Millisecond))
Enter fullscreen mode Exit fullscreen mode
  1. Profile and order services returns the a successful response not on time:
timeout := 2*time.Second
ps := profile.NewMockService(profileSuccessResponse(3*time.Second))
os := order.NewMockService(orderSuccessResponse(3*time.Second))
Enter fullscreen mode Exit fullscreen mode
  1. Profile service returns a successful response on time, but order service returns an error.
timeout := 2*time.Second
ps := profile.NewMockService(profileSuccessResponse(time.Millisecond))
os := order.NewMockService(errorOrderResponse (time.Millisecond))
Enter fullscreen mode Exit fullscreen mode
  1. Other combinations...

Implementing The Aggregator

The goal of the aggregator is to call those two services concurrently, and stop as soon as one of the queries fail.
You must use golang.org/x/sync/errgroup.

First of all let's define the Aggregator, we need the two services and a timeout:

type UserAggregator struct {
    profileService profile.Service
    ordersService  order.Service
    timeout        time.Duration
}
Enter fullscreen mode Exit fullscreen mode

The go-kata also mentions that the Aggregator needs to be configurable using the Functional Options Pattern, so then we can define the constructor like this:

func New(ps profile.Service, os order.Service, options ...func(*UserAggregator)) UserAggregator {
    ua := &UserAggregator{
    profileService: ps,
    ordersService:  os,
    }
    for _, option := range options {
        option(ua)
    }

    return *ua
}
Enter fullscreen mode Exit fullscreen mode

And then, finally the actual Aggregate implementation. We need to declare a context.Context using the timeout passed as a struct field, and then use:

g, ctx := errgroup.WithContext(ctx)
g.Go(...
Enter fullscreen mode Exit fullscreen mode

To concurrently query the two services.

func (ua UserAggregator) Aggregate(id int) (string, error) {
    ctx, cancel := context.WithTimeout(context.Background(), ua.timeout)
    defer cancel()

    var (
        user       string
        orderValue string
    )

    g, ctx := errgroup.WithContext(ctx)
    g.Go(func() error {
        u, err := ua.profileService.GetProfile(ctx, id)
        if err != nil {
        return err
        }

        user = u.Name

        return nil
    })
    g.Go(func() error {
        o, err := ua.ordersService.GetOrder(ctx, id)
        if err != nil {
            return err
        }

        orderValue = o.Order

        return nil
    })

    err := g.Wait()
    if err != nil {
        return "", err
    }

    return fmt.Sprintf("User: %s | Orders: %s", user, orderValue), nil
}
Enter fullscreen mode Exit fullscreen mode

What's Next?

I'm working through more go-kata problems and publishing solutions in my fork.
I'd love to hear your feedback:

  • Would you solve this differently?
  • Are there edge cases I'm missing?
  • What Go patterns would you apply?

Top comments (0)