Marrying Actor, DDD, and Worker Pattern using Go

Imagine a Domain event called Order.Placed.
Each Order.Placed contains:

{
    "customer": "CUST-001",
    "merchant": "MRCN-001",
    "payment": "CARD-001",
    "items": [
        {"id": "IT-001", "qty": 1},
        {"id": "IT-002", "qty": 2}
    ],
    "promo": "PROM-001"
}

Every time an order is placed, something in the Backend service must:

  • Get the customer's details
  • Get the merchant's details
  • Get the promotion's details
  • Get each product / item's details
  • Calculate the sum of the order and deduct the promotion from it
  • Make an order based on the gathered information, and notify the merchant
  • Make an invoice based on the gathered information, and notify the customer
  • Make a payment through the payment service based on the invoice
  • Flag the order state as paid
  • Notify the merchant that payment is done for said order
  • Notify the customer that payment succeeded for said invoice

That's a Tremendously Ridiculous amount of work that needs to be done on the Backend side.
It sounds wrong to make a single API endpoint handle all of this work, especially if your Platform can receive a ludicrous number of orders per minute across multiple regions.

Aggregate Root disguised as Actor

First of all, let's take on the order-rate issue.
Orders can be placed at any time by millions of customers at once, and the system must NEVER lose any of them.
The system can defer processing an order by placing it in a queue, which can be persisted and recovered in case of a server restart, crash, or whatever disaster comes.

Let's design an object called Actor

  • An Actor is an object which processes messages sent to it
  • So each Actor has an inbox which acts as a queue
  • The inbox can contain any type of message, but usually 1 actor handles 1 kind of message
  • The actor's processor is a function delegated by its maker
  • Each actor can only have 1 type of processor but can run multiple instances of it. Let's call each instance a worker.
  • A processor might, or might not, produce a result
  • A processor might, or might not, produce an error
package actor

// Processor is the delegate which processes a message
// @worker is its assigned worker number (starts from 1) in case we make more than 1 worker
// @actor is a reference to the actor that received the message
// @message is the current individual message from the actor's inbox
type Processor func(worker int, actor *Actor, message interface{}) (interface{}, error)

// Exception handler in case the processor produces an error
// @worker is its assigned worker number (starts from 1) in case we make more than 1 worker
// @actor is a reference to the actor that received the message
// @err is the error that happened while trying to process a message
type Exception func(worker int, actor *Actor, err error)

// Options when initializing an Actor
type Options struct {
    Worker int              // number of worker / processor goroutines, defaults to 1
    Output chan interface{} // output channel, to which the Actor sends results after processing is done
}

// configure fallbacks to default
func (opt *Options) configure() {
    if opt.Worker <= 0 {
        opt.Worker = 1
    }
}

// Actor ...
type Actor struct {
    inbox     chan interface{}
    outbox    chan interface{}
    process   Processor
    exception Exception
}


// New instance of an Actor with opt.Worker as the number of workers
// @p is the processor function
// @e is the exception handler
// @opt holds the worker count and optional output channel
func New(p Processor, e Exception, opt *Options) *Actor {
    opt.configure()

    actor := &Actor{
        inbox:     make(chan interface{}, opt.Worker),
        outbox:    opt.Output,
        process:   p,
        exception: e,
    }

    actor.start(0, opt.Worker)
    return actor
}

// start the actor with n workers
func (actor *Actor) start(idx, n int) {
    if idx == n {
        return
    }

    // worker number starts from 1
    go actor.work(idx + 1)
    actor.start(idx+1, n)
}
func (actor *Actor) work(w int) {
    for message := range actor.inbox {
        result, err := actor.process(w, actor, message)

        if err != nil && actor.exception != nil {
            actor.exception(w, actor, err)
            continue
        }

        if actor.outbox != nil {
            actor.outbox <- result
            continue
        }
    }
}

// Queue a message to inbox
func (actor *Actor) Queue(messages ...interface{}) {
    go func() {
        for _, message := range messages {
            actor.inbox <- message
        }
    }()
}

We'll test the actor with a simple test case:

func Test_Actor(t *testing.T) {
    words := [...]string{"One", "Two", "Three"}
    actor := New(func(w int, actor *Actor, message interface{}) (interface{}, error) {
        result := words[w-1]

        fmt.Println("worker", w,
            "receive", message,
            "processed as", result,
            "send to?", actor.outbox)

        return result, nil
    }, func(w int, actor *Actor, err error) {
        fmt.Println(err)
    }, &Options{Worker: 3})

    wg := sync.WaitGroup{}
    for i := 0; i <= 10; i++ {
        wg.Add(1)
        go func(i int) {
            idx := i % 3
            word := words[idx]
            actor.Queue(word)
            wg.Done()
        }(i)
    }

    wg.Wait()
}
  • The test code creates an actor with 3 workers
  • Each worker does the same type of job, which simply returns a word based on its worker number
  • We loop 11 times (i from 0 to 10) and queue a word into the actor on each iteration
  • Lastly, we wait for the wait group to be done
robin.bastian$ go test -timeout=10s -run "^(Test_Actor)$"
worker 2 receive One processed as Two send to? <nil>
worker 2 receive One processed as Two send to? <nil>
worker 3 receive Two processed as Three send to? <nil>
worker 3 receive One processed as Three send to? <nil>
worker 1 receive Two processed as One send to? <nil>
worker 1 receive Three processed as One send to? <nil>
worker 1 receive Three processed as One send to? <nil>
worker 1 receive Two processed as One send to? <nil>
worker 1 receive One processed as One send to? <nil>
worker 2 receive Three processed as Two send to? <nil>
worker 3 receive Two processed as Three send to? <nil>
PASS
ok      github.com/bastianrob/go-experiences/generator/actor    0.006s

From the test results, we can see:

  • Exactly 3 workers are spawned with one type of processor
  • The order in which messages are processed is NOT guaranteed
  • Each process returns a result, but it is not sent to the actor's outbox because no output channel was configured (see the sketch below)
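
If we do want to receive the results, the Options.Output channel from the code above can be wired in and drained by the caller. Here's a minimal sketch of such a test; the test name and the echoed strings are mine, not from the repository:

func Test_ActorOutput(t *testing.T) {
    // out is the actor's output channel; results land here after processing
    out := make(chan interface{}, 3)

    echo := New(func(w int, a *Actor, message interface{}) (interface{}, error) {
        return fmt.Sprintf("worker %d saw %v", w, message), nil
    }, nil, &Options{Worker: 3, Output: out})

    echo.Queue("One", "Two", "Three")

    // drain exactly three results from the output channel
    for i := 0; i < 3; i++ {
        fmt.Println(<-out)
    }
}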

Stopping the Actor

Next, we'll need a mechanism to Stop an actor. When an actor is stopped, we collect all pending messages and return them to the caller.

First of all, we give the Actor an exit mechanism by adding:

type Actor struct {
    ...
    // exit mechanism
    exit       chan struct{}
    workgroup  *sync.WaitGroup // worker waitgroup
    inboxgroup *sync.WaitGroup // inbox waitgroup
}

func New(p Processor, e Exception, opt *Options) *Actor {
    opt.configure()

    actor := &Actor{
        ...
        exit:       make(chan struct{}),
        workgroup:  &sync.WaitGroup{},
        inboxgroup: &sync.WaitGroup{},
    }

    actor.start(0, opt.Worker)
    return actor
}
  • exit is an exit channel which will be used to signal each worker to stop processing messages
  • workgroup is a wait group that counts how many workers are still active
  • inboxgroup is a wait group that counts how many messages are still in the inbox

Next, we implement all of those exit mechanisms:

  • inboxgroup is incremented every time a message is queued into the inbox
  • workgroup is incremented every time a worker is spawned
  • Each worker listens to both the inbox and the exit channel using a select statement
  • inboxgroup is decremented every time a worker finishes processing a message
  • workgroup is decremented every time a worker quits after receiving a signal from the exit channel
func (actor *Actor) Queue(messages ...interface{}) {
    // add length of message to inbox wait group
    actor.inboxgroup.Add(len(messages))
    go func() {
        for _, message := range messages {
            actor.inbox <- message
        }
    }()
}

func (actor *Actor) work(w int) {
    actor.workgroup.Add(1)       // worker group is added
    defer actor.workgroup.Done() // defer flagging worker group as done

    for {
        select {
        case message := <-actor.inbox: // waits for message to come from inbox
            result, err := actor.process(w, actor, message)

            if err != nil && actor.exception != nil {
                actor.exception(w, actor, err)
                actor.inboxgroup.Done() // flag 1 message as done
                continue
            }

            if actor.outbox != nil {
                actor.outbox <- result
                actor.inboxgroup.Done() // flag 1 message as done
                continue
            }

            // flag 1 message as done
            actor.inboxgroup.Done()
        case <-actor.exit: // listen on exit signal
            return
        }
    }
}

Lastly, we implement the Stop method to:

  • Close the exit channel so the signal is broadcast to all workers
  • Wait for all workers to finish exiting
  • Gather the pending messages still lingering in the inbox
  • Decrement the inboxgroup counter every time we finish collecting a message
  • Wait for all pending messages to be collected
  • Close the inbox channel
  • And return all the pending messages to the caller
// Stop actor from processing any message
func (actor *Actor) Stop() (pendings []interface{}) {
    // stop all workers from processing the inbox
    close(actor.exit)
    actor.workgroup.Wait()

    // gather pending messages inside inbox and flag it as done
    go func() {
        for message := range actor.inbox {
            pendings = append(pendings, message)
            actor.inboxgroup.Done()
        }
    }()

    // wait for pending messages gathering to be completed and close the inbox channel
    actor.inboxgroup.Wait()
    close(actor.inbox)

    // return gathered pending messages
    return pendings
}

And now it's time to test the stop mechanism:

func Test_ActorStop(t *testing.T) {
    mux := sync.Mutex{}
    var processed []interface{}
    actor := New(func(w int, actor *Actor, message interface{}) (interface{}, error) {
        mux.Lock()
        processed = append(processed, message)
        mux.Unlock()

        return nil, nil
    }, func(w int, actor *Actor, err error) {
        fmt.Println(err)
    }, &Options{Worker: 5})

    expected := 0
    for i := 1; i <= 100; i++ {
        go actor.Queue(i)
        expected = expected + i
    }

    pendings := actor.Stop()
    combined := append(processed, pendings...)

    sum := 0
    for _, e := range combined {
        sum = sum + e.(int)
    }
    if sum != expected {
        t.Error("Sum of 1-100 must be", expected, "but received", sum)
    }

    fmt.Println("PEND", pendings)
    fmt.Println("PROC", processed)
}
  • The test code creates an Actor with 5 workers
  • We feed the actor with numbers from 1 to 100
  • So the expected sum of 1 + 2 + ... + 100 should be 5050
  • In the processor function, we collect every processed number into a variable called var processed []interface{}
  • We guard it with a mutex because it is accessed by 5 workers running in different goroutines
  • We then collect all pending numbers by calling actor.Stop and store them in a variable called pendings
  • We combine both processed and pendings into a slice called combined and sum all the numbers inside it
  • We compare the sum of the combined numbers with the expected number
robin.bastian$ go test -timeout=10s -run "^(Test_ActorStop)$"
PEND [7 21 8 22 10 23 63 24 57 25 26 27 28 29 30 31 73 64 65 66 67 68 58 59 60 61 62 71 70 79 74 75 76 77 69 78 72 82 80 81 84 83 85 86 93 87 88 89 90 91 92 96 94 95 98 97 99 100]
PROC [6 11 3 4 5 43 13 33 14 34 15 35 16 36 17 18 37 38 39 40 41 42 56 44 45 46 47 48 49 50 51 52 53 54 55 19 9 12 1 32 20 2]
PASS
ok      github.com/bastianrob/go-experiences/generator/actor    0.006s
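
The pending messages returned by Stop are also what make the earlier "persist and recover" idea workable: on shutdown we can flush them to durable storage, and on startup re-queue them into a fresh actor. Below is a minimal sketch under the assumption that messages are JSON-serializable; the file-based storage and the helper names (SavePending, LoadPending) are made up for illustration:

package actor

import (
    "encoding/json"
    "os"
)

// SavePending is a hypothetical helper: stop the actor and flush
// whatever is left in its inbox to a JSON file.
func SavePending(a *Actor, path string) error {
    pendings := a.Stop()
    buf, err := json.Marshal(pendings)
    if err != nil {
        return err
    }
    return os.WriteFile(path, buf, 0644)
}

// LoadPending is a hypothetical helper: read previously saved messages
// and queue them into a fresh actor (the stopped actor's inbox is closed).
// Note: json.Unmarshal decodes numbers into float64, so the processor
// must be prepared for that.
func LoadPending(a *Actor, path string) error {
    buf, err := os.ReadFile(path)
    if err != nil {
        return err
    }

    var pendings []interface{}
    if err := json.Unmarshal(buf, &pendings); err != nil {
        return err
    }

    a.Queue(pendings...)
    return nil
}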

Directing Actors

Individually, an actor can do its job well enough. But it would be better if we could chain actors together to build a processing pipeline!
So let's think about what we already have:

  • Each actor has an inbox and an outbox
  • While the inbox is required, the outbox is optional
  • Now we want to chain the output of one actor as the input of another actor
  • In other words: we can point one actor's outbox at another actor, so that results are queued into that actor's inbox

First, we'll have to refactor some of our Actor's code:

type Options struct {
    ...
    Name        string // actor's name
    Output      *Actor // output actor, on which source actor will send a message after process is done
}

type Actor struct {
    ...
    // outbox now points to another Actor instead of a raw channel
    outbox *Actor

    // metadata
    name string
}

func (actor *Actor) work(w int) {
    ...

    for {
        select {
        case message := <-actor.inbox: // waits for message to come from inbox
            ...
            if actor.outbox != nil {
                actor.outbox.Queue(result)
                actor.inboxgroup.Done() // flag 1 message as done
                continue
            }
            ...
        }
    }
}

We added a name to the actor so we can identify it easily.
And in the worker, if the current actor's outbox is not nil, we queue the result to the next actor.

And then we write a function called Direct:

package actor

// Direct chains actors together: each actor's outbox is pointed at the next actor, whose inbox receives the results
func Direct(actors ...*Actor) {
    var source *Actor
    for _, target := range actors {
        if source == nil {
            source = target
            continue
        }

        source.outbox = target
        source = target
    }
}

Again, we'll write a test to ensure the behaviour is as expected:

func Test_ActorDirected(t *testing.T) {
    errPrinter := func(w int, actor *Actor, err error) {
        fmt.Println("worker:", w, "err:", err)
    }

    bale := New(func(w int, actor *Actor, in interface{}) (interface{}, error) {
        return in, nil
    }, errPrinter, &Options{
        Worker: 3,
        Name:   "Bale",
    })
    bane := New(func(w int, actor *Actor, in interface{}) (interface{}, error) {
        switch {
        case in == "I AM VENGEANCE":
            return "I AM INEVITABLE", nil
        case in == "I AM THE NIGHT":
            return "I AM BANE", nil
        case in == "I'M BATMAN":
            return "I WILL BREAK YOU", nil
        default:
            return nil, errors.New("WHATEVER YOU SAY")
        }
    }, errPrinter, &Options{
        Worker: 3,
        Name:   "Bane",
    })
    subtitle := New(func(w int, actor *Actor, in interface{}) (interface{}, error) {
        fmt.Println("worker:", w, "actor:", actor.name, "receive:", in)
        if in != "I AM INEVITABLE" && in != "I AM BANE" && in != "I WILL BREAK YOU" {
            t.Error("Bane's subtitle must be one of:", "I AM INEVITABLE", "I AM BANE", "I WILL BREAK YOU")
        }
        return nil, nil
    }, errPrinter, &Options{
        Worker: 3,
        Name:   "Subtitle",
    })

    Direct(bale, bane, subtitle)

    bale.Queue("I AM VENGEANCE", "I AM THE NIGHT", "I'M BATMAN", "HEY HO!")
    bale.Stop()
    bane.Stop()
    subtitle.Stop()
}

In this test, we:

  • Have 3 actors named Bale, Bane, and Subtitle
  • Bale will say I AM VENGEANCE, I AM THE NIGHT, I'M BATMAN, and HEY HO! to Bane
  • Bane will say I AM INEVITABLE, I AM BANE, and I WILL BREAK YOU to Subtitle
  • Bane will be confused by HEY HO! and respond with the error WHATEVER YOU SAY, which goes to errPrinter
  • Subtitle will print whatever Bane says

Test result shows:

robin.bastian$ go test -timeout 10s -run "^(Test_ActorDirected)$"
worker: 3 actor: Subtitle receive: I AM BANE
worker: 2 actor: Subtitle receive: I WILL BREAK YOU
worker: 1 actor: Subtitle receive: I AM INEVITABLE
worker: 2 actor: Bane err: WHATEVER YOU SAY
PASS
ok      github.com/bastianrob/go-experiences/generator/actor    0.006s

Mocking Fake Microservices

Back to the example domain event at the beginning of this article: we have 7 microservices, each with a CRUD implementation, and the Order microservice at the center of it all.
We'll mock the CRUD implementation for all of these microservices:

package mock

// CRUD contract
type CRUD interface {
    Get(id string) (interface{}, error)
    Create(dao interface{}) error
    Update(dao interface{}) error
}

// APIClient generic mock implementation of CRUD interface
type APIClient struct {
    GetFunc    func(id string) (interface{}, error)
    CreateFunc func(dao interface{}) error
    UpdateFunc func(dao interface{}) error
}

// Get mock, please implement GetFunc
func (ac *APIClient) Get(id string) (interface{}, error) {
    return ac.GetFunc(id)
}

// Create mock, please implement CreateFunc
func (ac *APIClient) Create(dao interface{}) error {
    return ac.CreateFunc(dao)
}

// Update mock, please implement UpdateFunc
func (ac *APIClient) Update(dao interface{}) error {
    return ac.UpdateFunc(dao)
}
  • CRUD is the contract to an imaginary API
  • APIClient is the mocked / fake implementation of CRUD

We'll use these later to mock the CRUD calls to all of the microservices. A quick usage sketch follows below.
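
As a quick illustration of how the mock is meant to be used, only the function a test actually touches needs to be stubbed (the stubbed return value here is arbitrary, not from the repository):

customers := &mock.APIClient{
    GetFunc: func(id string) (interface{}, error) {
        // return whatever the test needs
        return map[string]string{"id": id, "name": "stub"}, nil
    },
}

cust, err := customers.Get("CUST-001")
fmt.Println(cust, err) // map[id:CUST-001 name:stub] <nil>

Calling a method whose Func field was left nil will panic, which is usually what you want in a test: it surfaces calls you didn't expect.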

Each of those microservices has its own data transfer object.
Let's call them: dto

package dto

type Customer struct {
    ID    string
    Email string
    Name  string
}

type Invoice struct {
    ID       string
    Order    string
    Customer string
    Promo    string
    Subtotal int
    Discount int
    Total    int
}

type Merchant struct {
    ID    string
    Email string
    Name  string
}

type Payment struct {
    ID        string
    InvoiceID string
    MethodID  string // card, cash, balance, whatever
    Amount    int
}

type Product struct {
    ID    string
    Name  string
    Price int
}

type Promotion struct {
    ID       string
    Name     string
    Discount int // percentage
}

And since we're in the order context, we also have access to its data access object (dao):

package dao

import "time"

// OrderItem DAO
type OrderItem struct {
    ID    string
    Name  string
    Qty   int
    Price int
}

// Order DAO
type Order struct {
    ID           string
    Date         time.Time
    State        OrderState
    CustomerID   string
    CustomerName string
    MerchantID   string
    MerchantName string
    Items        []*OrderItem
    Total        int
}

Lastly, we create a command package which, as its name implies, commands our order service to do something:

package command

// LineItem ordered item & qty
type LineItem struct {
    ID  string
    Qty int
}

// PlaceOrder command
type PlaceOrder struct {
    Customer string
    Merchant string
    Payment  string
    Promo    string
    Items    []LineItem
}

Order as Aggregate Root

We need a service which orchestrates gathering all the required data from multiple services.
We call it the Aggregate Root and, in this case, it lives in the order context.

So let's start thinking about the flow:

  • Order acts as the Aggregate Root, so let's just call it Root
  • Root can receive a command called PlaceOrder which contains all the information Root needs to do its work
  • Each time Root receives a message, it orchestrates the calls to the multiple services needed to place an order
package order

import (
    "errors"
    "fmt"
    "time"

    "github.com/bastianrob/go-experiences/generator/order/pkg/dao"
    "github.com/bastianrob/go-experiences/generator/order/pkg/dto"

    "github.com/bastianrob/go-experiences/generator/actor"
    "github.com/bastianrob/go-experiences/generator/mock"
    "github.com/bastianrob/go-experiences/generator/order/pkg/command"
)

// Services collection
type Services struct {
    Customer mock.CRUD
    Invoice  mock.CRUD
    Merchant mock.CRUD
    Order    mock.CRUD
    Payment  mock.CRUD
    Product  mock.CRUD
    Promo    mock.CRUD
}

// Config for order service
type Config struct {
    Worker   int
    Services Services
}

// Root aggregate root of order
type Root struct {
    *actor.Actor
    services Services
}

// NewAggregateRoot for order
func NewAggregateRoot(cfg *Config) *Root {
    root := &Root{
        services: cfg.Services,
    }

    n := cfg.Worker
    if n <= 0 {
        n = 10
    }

    worker := &actor.Options{Worker: n}
    root.Actor = actor.New(root.processor, root.exception, worker)

    return root
}

func (root *Root) processor(w int, a *actor.Actor, msg interface{}) (interface{}, error) {
    if msg == nil {
        return nil, errors.New("Order message is empty")
    }

    var customer *dto.Customer
    var merchant *dto.Merchant
    var promo *dto.Promotion

    // 1. Converts message to command
    cmd := msg.(*command.PlaceOrder)

    // 2. Fetch required information
    // Run the fetches in a goroutine and report the first error (or nil) back through errc
    errc := make(chan error)
    go func(errc chan<- error) {
        cust, err := root.services.Customer.Get(cmd.Customer)
        if err != nil {
            errc <- err
            return
        }
        customer = cust.(*dto.Customer)

        mcr, err := root.services.Merchant.Get(cmd.Merchant)
        if err != nil {
            errc <- err
            return
        }
        merchant = mcr.(*dto.Merchant)

        prm, err := root.services.Promo.Get(cmd.Promo)
        if err != nil {
            errc <- err
            return
        }
        promo = prm.(*dto.Promotion)

        errc <- nil
    }(errc)

    // 3. Wait for fetch to complete and listen to any error occurred
    if err := <-errc; err != nil {
        return nil, err
    }

    // 4. Get product details and calculate the total
    order := &dao.Order{
        Date:         time.Now(),
        State:        dao.New,
        CustomerID:   customer.ID,
        CustomerName: customer.Name,
        MerchantID:   merchant.ID,
        MerchantName: merchant.Name,
        Items:        make([]*dao.OrderItem, len(cmd.Items)),
    }
    for i, entry := range cmd.Items {
        it, err := root.services.Product.Get(entry.ID)
        if err != nil {
            return nil, errors.New("Failed to get item with ID: " + entry.ID)
        }

        item := it.(*dto.Product)
        order.Items[i] = &dao.OrderItem{
            ID:    item.ID,
            Name:  item.Name,
            Qty:   entry.Qty,
            Price: item.Price,
        }
        order.Total += (entry.Qty * item.Price)
    }

    // 5. Persist the order data to database
    err := root.services.Order.Create(order)
    if err != nil {
        return nil, errors.New("Failed to create a new order: " + err.Error())
    }

    // 6. Create the invoice through API
    discount := order.Total * promo.Discount / 100
    invoice := &dto.Invoice{
        Order:    order.ID,
        Customer: order.CustomerID,
        Promo:    promo.ID,
        Subtotal: order.Total,
        Discount: discount,
        Total:    (order.Total - discount),
    }
    err = root.services.Invoice.Create(invoice)
    if err != nil {
        // TODO: Recovery strategy, delete the order? or flag it if you wish
        return nil, errors.New("Failed to create an invoice: " + err.Error())
    }

    // 7. Make a payment through API call
    payment := &dto.Payment{
        InvoiceID: invoice.ID,
        MethodID:  cmd.Payment,
        Amount:    invoice.Total,
    }
    err = root.services.Payment.Create(payment)
    if err != nil {
        // TODO: Recovery strategy to both order and invoice
        return nil, errors.New("Failed to create a payment: " + err.Error())
    }

    return order, nil
}

func (root *Root) exception(w int, a *actor.Actor, err error) {
    fmt.Println("Exception occurred at worker:", w, "with err:", err)
}

As we can see in the code above:

  • Root owns an Actor
  • The processor is the actor's processor, which orchestrates all the calls to the multiple services

Now let's test the code by writing some unit tests:

package order

import (
    "errors"
    "fmt"
    "sync"
    "testing"
    "time"

    "github.com/bastianrob/go-experiences/generator/actor"
    "github.com/bastianrob/go-experiences/generator/mock"
    "github.com/bastianrob/go-experiences/generator/order/pkg/command"
    "github.com/bastianrob/go-experiences/generator/order/pkg/dao"
    "github.com/bastianrob/go-experiences/generator/order/pkg/dto"
)

func Test_OrderAsAggregateRoot(t *testing.T) {
    customerAPIMock := &mock.APIClient{
        GetFunc: func(id string) (interface{}, error) {
            time.Sleep(20 * time.Millisecond) // simulate 20ms latency
            return &dto.Customer{
                ID:   id,
                Name: "I am your customer",
            }, nil
        },
    }
    merchantAPIMock := &mock.APIClient{
        GetFunc: func(id string) (interface{}, error) {
            time.Sleep(20 * time.Millisecond) // simulate 20ms latency
            return &dto.Merchant{
                ID:   id,
                Name: "I am your merchant",
            }, nil
        },
    }
    promotionAPIMock := &mock.APIClient{
        GetFunc: func(id string) (interface{}, error) {
            time.Sleep(20 * time.Millisecond) // simulate 20ms latency
            return &dto.Promotion{
                ID:       id,
                Name:     "10% discount",
                Discount: 10,
            }, nil
        },
    }
    invoiceAPIMock := &mock.APIClient{
        CreateFunc: func(obj interface{}) error {
            time.Sleep(20 * time.Millisecond) // simulate 20ms latency
            inv := obj.(*dto.Invoice)
            inv.ID = "INV-001"
            return nil
        },
    }
    orderAPIMock := &mock.APIClient{
        CreateFunc: func(obj interface{}) error {
            time.Sleep(20 * time.Millisecond) // simulate 20ms latency
            ord := obj.(*dao.Order)
            ord.ID = "ORD-001"
            return nil
        },
    }
    paymentAPIMock := &mock.APIClient{
        CreateFunc: func(obj interface{}) error {
            time.Sleep(20 * time.Millisecond) // simulate 20ms latency
            pay := obj.(*dto.Payment)
            pay.ID = "PMT-001"
            return nil
        },
    }
    productAPIMock := &mock.APIClient{
        GetFunc: func(id string) (interface{}, error) {
            time.Sleep(20 * time.Millisecond) // simulate 20ms latency
            switch id {
            case "ITEM-001":
                return &dto.Product{
                    ID:   id,
                    Name: "I am item 001",
                }, nil
            case "ITEM-002":
                return &dto.Product{
                    ID:   id,
                    Name: "I am item 002",
                }, nil
            }
            return nil, errors.New("404")
        },
    }

    // root is the order actor which acts as an aggregate root
    // (it defaults to 10 workers; here we give it 20)
    root := NewAggregateRoot(&Config{
        Worker: 20,
        Services: Services{
            Customer: customerAPIMock,
            Merchant: merchantAPIMock,
            Invoice:  invoiceAPIMock,
            Order:    orderAPIMock,
            Payment:  paymentAPIMock,
            Product:  productAPIMock,
            Promo:    promotionAPIMock,
        },
    })

    // waiter is an actor which hears from root and keeps track of how many orders we have finished processing
    wg := &sync.WaitGroup{}
    wg.Add(100)
    waiter := actor.New(
        func(w int, a *actor.Actor, message interface{}) (interface{}, error) {
            wg.Done()
            return nil, nil
        },
        func(w int, a *actor.Actor, err error) {
            wg.Done()
        },
        &actor.Options{Worker: 10},
    )

    // clock in to check how long we spend processing 100 commands
    start := time.Now()

    // place order 100 times
    var orders []interface{}
    for i := 0; i < 100; i++ {
        orders = append(orders, &command.PlaceOrder{
            Customer: "CUST-001",
            Merchant: "MRCN-001",
            Payment:  "CARD-001",
            Promo:    "DISC-10",
            Items: []command.LineItem{{
                ID:  "ITEM-001",
                Qty: 1,
            }, {
                ID:  "ITEM-002",
                Qty: 1,
            }},
        })
    }
    root.Queue(orders...)

    actor.Direct(root.Actor, waiter)
    fmt.Println("We are waiting")
    wg.Wait()

    // we have at least 7 fake services and each call takes a simulated 20ms to complete
    // so completing 100 commands serially takes 100 * 140ms = 14sec
    // 14 sec if we only have 1 worker.
    // Ideally we can cut it down to a minimum of 0.7sec because we have 20 workers
    dur := time.Since(start)
    if dur.Seconds() >= 1. {
        t.Error("Total processing time should not exceed 1sec")
    }

    fmt.Println("Duration:", dur)
}

In the test:

  • We mock all API calls by using mock.APIClient
  • Each API call is given a 20ms sleep to simulate latency
  • We instantiate an Aggregate Root and give it 20 workers
  • We have a total of 7 services and each call costs 20ms, so 1 message should cost around 140ms
  • We give 100 commands to the Aggregate Root
  • If each command takes 140ms of processing time and we're giving it 100, the total time needed is 14000ms, or 14s, if done serially
  • But we give the Aggregate Root 20 workers, so the fastest we can achieve is t = 14s / 20 = 0.7s
  • We direct the Aggregate Root's output to another actor called waiter
  • The waiter simply waits for the Aggregate Root to finish processing the 100 commands.
  • And then we track how long it takes the Aggregate Root to complete the 100 commands.
robin.bastian$ go test -timeout 3s -run "^(Test_OrderAsAggregateRoot)$"
We are waiting
Duration: 881.459161ms
PASS
ok      github.com/bastianrob/go-experiences/generator/order/internal   0.888s
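
Note that the processor above stops once the payment is created. The remaining steps from the opening list (flag the order as paid, notify the merchant and the customer) could be chained behind the Root as further actors using Direct, so each concern gets its own worker pool. Here's a rough sketch; ChainPostPayment and the notify callback are hypothetical and not part of the repository:

package order

import (
    "fmt"

    "github.com/bastianrob/go-experiences/generator/actor"
    "github.com/bastianrob/go-experiences/generator/mock"
    "github.com/bastianrob/go-experiences/generator/order/pkg/dao"
)

// ChainPostPayment wires two extra actors behind the aggregate root:
// one flags the order as paid, the other runs notifications.
// orders is the order service (a mock.CRUD here); notify is whatever
// side effect you want, e.g. e-mailing the merchant and the customer.
func ChainPostPayment(root *Root, orders mock.CRUD, notify func(*dao.Order)) {
    logErr := func(w int, a *actor.Actor, err error) {
        fmt.Println("post-payment error at worker", w, ":", err)
    }

    markPaid := actor.New(func(w int, a *actor.Actor, msg interface{}) (interface{}, error) {
        ord, ok := msg.(*dao.Order)
        if !ok {
            return nil, fmt.Errorf("expected *dao.Order, got %T", msg)
        }

        ord.State = dao.Paid
        if err := orders.Update(ord); err != nil {
            return nil, err
        }
        return ord, nil
    }, logErr, &actor.Options{Worker: 5, Name: "MarkPaid"})

    notifier := actor.New(func(w int, a *actor.Actor, msg interface{}) (interface{}, error) {
        if ord, ok := msg.(*dao.Order); ok {
            notify(ord)
        }
        return nil, nil
    }, logErr, &actor.Options{Worker: 5, Name: "Notifier"})

    // Root -> MarkPaid -> Notifier
    actor.Direct(root.Actor, markPaid, notifier)
}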

Distributed Processing

Now imagine that 100 orders placed per second, as in the test case above, is not enough.
We can make a cluster of Aggregate Roots, have them listen to a message broker like NSQ or RabbitMQ, and let the message broker handle the load balancing. A rough sketch of that subscription follows below.
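
This is what the subscription could look like with go-nsq; the topic, channel, and lookupd address are assumptions, and any broker with a handler callback works the same way. The handler decodes the Order.Placed payload from the beginning of the article into a PlaceOrder command and queues it into the Root:

package order

import (
    "encoding/json"

    nsq "github.com/nsqio/go-nsq"

    "github.com/bastianrob/go-experiences/generator/order/pkg/command"
)

// Subscribe is a hypothetical helper: it subscribes the aggregate root to
// the Order.Placed topic and queues every event as a PlaceOrder command.
func Subscribe(root *Root, lookupd string) (*nsq.Consumer, error) {
    consumer, err := nsq.NewConsumer("Order.Placed", "order-aggregate-root", nsq.NewConfig())
    if err != nil {
        return nil, err
    }

    consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
        // the event body is the JSON shown at the top of the article;
        // its field names match command.PlaceOrder case-insensitively
        cmd := &command.PlaceOrder{}
        if err := json.Unmarshal(m.Body, cmd); err != nil {
            return err // returning an error lets NSQ requeue the message
        }

        root.Queue(cmd)
        return nil
    }))

    // the broker spreads Order.Placed across every instance in the cluster
    return consumer, consumer.ConnectToNSQLookupd(lookupd)
}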


Shenanigan's github https://github.com/bastianrob/go-experiences/tree/master/generator

Top comments (1)

A. Ruiz:

To be honest, this could be a lot easier if you used hexagonal architecture.

You just need to abstract your concurrent data aggregation with an event bus interface (the implementation could use AWS SNS/SQS, Rabbit, NATS or my preferred, Apache Kafka); after that, inside your concrete implementation you publish a message to your favorite provider. Then you start a pool of processor workers to parallelize jobs using goroutines and kick off the aggregation asynchronously.

That, or if you were using AWS or something similar (say GCP or even MS Azure), you can use their Pub/Sub APIs (SNS/SQS in AWS) and create a serverless function per processor, which can easily be subscribed to topics/queues from the related infrastructure using the platform itself instead of programmatic access. Thus, you get a serverless, event-driven or reactive ecosystem with ease (just take care of concurrency limits and timeouts on serverless functions). You can even make deployments and cloud configuration expressive and atomic using Infrastructure as Code (Terraform, Ansible, AWS CloudFormation, ...).