While researching Gleam and Elixir, I came across this architectural pattern which I later discovered is used in other languages as well. Fascinated by the promise of clean and robust development, I tried to create a library called Cinecity to better understand it, and to deepen my knowledge of Go with channels.
Interactions within the application
The basic idea is intuitive and already seen in other frameworks:
isolated components, manage their own state, exchanging only messages.
The postman and actors
Going deeper into the library, the first structure to instantiate is Postman, which is the manager of the actors map in play and handles message forwarding using channels.
The next step is to create actors by providing the structure that handles the state, which must implement the StateProcessor interface, and the address to reach the actor itself.
// init postman for first
actor.InitPostman()
// set an address for actor
whAddr := actor.NewAddress("local", "warehouse")
// spawn an actor by an address and its own state
warehouseActor, err := actor.RegisterActor(
whAddr,
NewWarehouseState(),
)
// prepare the message for warehouse actor
msg := actor.NewMessage(
whAddr,
nil,
AddNewProductPayload{Product{Code: "ABC", Quantity: 5}},
)
// send the message
err := actor.SendMessage(msg)
Processing messages
Messages forwarded to the actor will be processed by distinguishing the message content by evaluating its type. This is handled by Process, which is the most important method exposed by StateProcessor. Let's see an example with two types of messages handled where the second returns a response to the sender:
func (sp *stateProcessor) Process(msg actor.Message) {
switch body := msg.Body.(type) {
case AddNewProduct:
createProduct(body.Name, body.Category)
case GetProduct:
prdBody, err := getProduct(body.ProductID)
if msg.WithResponse {
returnMsg := actor.NewReturnMessage(prdBody, msg, err)
msg.ResponseChan <- returnMsg
}
}
}
Batch management
We might need to process a message not immediately but by aggregating multiple messages and then proceeding with processing to avoid overloading state updates: in this case we can use the Batcher:
// on init state processor function create a new batcher
// set timeout, max messages for starting process, handler function
b := batch.NewBatcher(5000, 5, s.updateAggregateState)
stateProcessor.batcher = b
// on process message
func (state *StateProcessor) Process(msg actor.Message) {
switch msg.Body.(type) {
case store.StoreEventAddedBody:
state.batcher.Add(msg)
}
}
Broadcast a message to many
To send a message to multiple actors at once, we can use the BroadcastMessage method which also accepts a filter to select the actors to reach, represented by the area defined in the address.
msg := actor.NewBroadcastMessage(fromAddr, "broadcast message body")
numSent := actor.BroadcastMessage(msg, "warehouse")
Subscribe to a message
We might need to be notified when a certain event occurs in an actor's state. To achieve this behavior, we need to initialize a helper structure named Subscriptions which is itself an actor. Our StateProcessor will instantiate this actor to pass all messages that arrive to it, but the subscriber will only handle those reserved for it like AddSubscriptionMessageBody and RemoveSubscriptionMessageBody. The StateProcessor will instead decide what the condition is to notify its subscribers.
func (sp *stateProcessor) Process(msg actor.Message) {
// manage add and remove subscription
m.notifier.Process(msg)
switch payload := msg.Body.(type) {
// notify subscribers
case TriggerSubscriptionNotifierBodyMsg:
subsMsg := subscriber.NewSubscribersMessage(msg.To, "hello subscribers!")
m.notifier.NotifySubscribers(subsMsg)
Send messages to other applications
In addition to internal application usage, you can also use the configuration option to connect other applications using a NATS server.
// create NATS connection (this must do for every app that wants receive and send remotely)
natsToken := os.Getenv("NATS_SECRET")
nc := nats.Connect(nats.DefaultURL, nats.Token(natsToken))
// define body type that will be received from remote apps
reg := actor.EnvelopePayloadTypeRegistry{
"domain.MsgBody": reflect.TypeOf(domain.MsgBody{}),
}
// init postman with outbound option
actor.InitPostman(actor.WithOutboundMessageService("dept-products", nc, reg))
// init a outbound address
toAddress := actor.NewOutboundAddress("dept-warehouse", "warehouse", "actor-dispatcher")
fromAddress := actor.NewAddress("local", "actor-product")
rmsg := actor.NewMessage(
toAddress,
fromAddress,
MsgBody{Text: "hello from products dept"},
)
err = actor.SendMessage(rmsg)
Let it crash?
In Erlang/Elixir, one of the characteristics that made it famous for its robustness is that processes are truly separated and a crash of one of them can be handled by a supervisor that coordinates whether it should be restarted and how. This is obviously missing in Cinecity since the application is a single process and channels only serve for coordinating concurrency in message handling, while errors are returned where possible.
Usage
For now, my usage has been modest, so I cannot yet give a judgment on effectiveness, development experience, and performance. So far, the experience during its implementation has been very interesting even though the patterns were simple (message sending, subscribers, batcher).
If you are interested in the subject, I invite you to also look at other well-made libraries like Hollywood, actor-model, but also Watermill, and if you are interested, I gladly accept collaborations, PRs and issues.

Top comments (0)