Introduction
Yesterday, I was really excited to write example Elixir code with Cizen for this post. Cizen is a library for building applications with automata and events (and fun!). With Cizen, it becomes easier to create highly concurrent, monitorable, and extensible applications. I believe Cizen has the potential to make software development more fun, just as Elixir does. In this post, we will build an API client that implements asynchronous requests and connection pooling with Cizen.
Automata and Events
With Cizen, we use automata (the plural of automaton) instead of processes. Automata communicate with each other by subscribing to and dispatching events, whereas processes send messages directly to a pid.
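To make the contrast concrete, here is a minimal sketch of the subscribe-and-dispatch style using only plain Elixir processes and the standard Registry module. It is not how Cizen is implemented; EventBusSketch and the :greeting topic are names I made up for illustration.

```elixir
# Sketch only: plain Elixir processes and Registry, not Cizen's implementation.
# EventBusSketch and the :greeting topic are hypothetical names.
defmodule EventBusSketch do
  @registry __MODULE__.Registry

  # Starts a Registry that allows many subscribers per topic.
  def start do
    {:ok, _pid} = Registry.start_link(keys: :duplicate, name: @registry)
    :ok
  end

  # The calling process subscribes itself to a topic.
  def subscribe(topic) do
    {:ok, _owner} = Registry.register(@registry, topic, nil)
    :ok
  end

  # Sends the event to every subscriber; the sender needs no pid.
  def dispatch(topic, body) do
    Registry.dispatch(@registry, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, {:event, topic, body})
    end)
  end
end

EventBusSketch.start()
parent = self()

# Two independent subscribers; neither is known to the dispatcher.
for name <- [:a, :b] do
  spawn(fn ->
    EventBusSketch.subscribe(:greeting)
    send(parent, {:ready, name})

    receive do
      {:event, :greeting, body} -> send(parent, {:got, name, body})
    end
  end)
end

# Wait until both have subscribed, then dispatch once.
for name <- [:a, :b] do
  receive do
    {:ready, ^name} -> :ok
  end
end

EventBusSketch.dispatch(:greeting, "hello")

# Collect what each subscriber received.
received =
  for name <- [:a, :b] do
    receive do
      {:got, ^name, body} -> {name, body}
    end
  end

IO.inspect(received)
```

The dispatcher never holds a subscriber's pid. With direct messaging, it would need one send/2 per pid it knows about.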
API
In this article, suppose we need to get the articles for each category from an API. The API works as follows:
- get_category_id with the body "Elixir" (a category name) returns a response with the body "01" (the category ID)
- get_articles with the body "01" (a category ID) returns a response with the body ["Let's Try Elixir", "Implement an API with Elixir"] (a list of articles)
Let's Get Ready
To get ready, run the following commands:
$ mix new client
$ cd client
Then add Cizen to the dependencies:
# in mix.exs
defp deps do
  [
    {:cizen, "~> 0.12.1"}
  ]
end
Also, you need to prepare a simple mock of the API:
# in a new file request.exs
defmodule API do
  @categories %{
    "Elixir" => "01",
    "Nature" => "02",
    "Cizen" => "03"
  }
  @articles %{
    "01" => ["Let's Try Elixir", "Implement an API with Elixir"],
    "02" => ["Beautiful Nature"],
    "03" => ["Getting Started with Cizen", "Asynchronous Request with Cizen"]
  }

  def get_category_id(text) do
    :timer.sleep(:rand.uniform(10) * 50) # Delay for an API call
    @categories[text]
  end

  def get_articles(category) do
    :timer.sleep(:rand.uniform(10) * 50) # Delay for an API call
    @articles[category]
  end
end
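Before any automaton gets involved, the two-step lookup can be tried directly. The sketch below uses APISketch, a hypothetical delay-free stand-in for the mock with a single category, so that the snippet runs on its own.

```elixir
# Sketch only: APISketch is a hypothetical, delay-free stand-in for the mock
# so that this snippet runs on its own.
defmodule APISketch do
  @categories %{"Elixir" => "01"}
  @articles %{"01" => ["Let's Try Elixir", "Implement an API with Elixir"]}

  def get_category_id(name), do: @categories[name]
  def get_articles(category_id), do: @articles[category_id]
end

# Step 1: category name -> category ID. Step 2: category ID -> articles.
articles =
  "Elixir"
  |> APISketch.get_category_id()
  |> APISketch.get_articles()

IO.inspect(articles)

# An unknown category yields nil at both steps, because map access with
# a missing key (including a nil key) returns nil.
missing =
  "Unknown"
  |> APISketch.get_category_id()
  |> APISketch.get_articles()

IO.inspect(missing)
```

Every call to the real mock sleeps for a random delay, so calling it this way for several categories in a row would add the delays up. That is the cost the rest of this post works around.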
Single Connection
Let's create the Connection automaton. A Connection automaton repeatedly receives a Connection.Request event and dispatches a Connection.Request.Response event for each request.
First, create the Connection module and define those events:
# append the following to request.exs
defmodule Connection do
  defmodule Request do
    defstruct [:connection_id, :type, :body]
    use Cizen.Request
    defresponse Response, :request_id do
      defstruct [:request_id, :body]
    end
  end
end
use Cizen.Request makes Connection.Request a request event (an event that expects a response), and defresponse defines its response event.
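The key idea is that the response carries the ID of the request it answers, so the requester can pick out its own response. Here is a minimal sketch of that correlation with plain processes instead of Cizen events. RequestSketch, ResponseSketch, and CorrelationSketch are hypothetical names, and the upcasing server is only a placeholder for real work.

```elixir
# Sketch only: plain processes, not Cizen. All module names are hypothetical.
defmodule RequestSketch do
  defstruct [:id, :body]
end

defmodule ResponseSketch do
  defstruct [:request_id, :body]
end

defmodule CorrelationSketch do
  # A server that answers requests and copies the request ID into the response.
  def start_server do
    spawn(fn -> serve() end)
  end

  defp serve do
    receive do
      {from, %RequestSketch{id: id, body: body}} ->
        send(from, %ResponseSketch{request_id: id, body: String.upcase(body)})
        serve()
    end
  end

  # Sends a request and waits only for the response carrying the same ID.
  def request(server, body) do
    id = make_ref()
    send(server, {self(), %RequestSketch{id: id, body: body}})

    receive do
      %ResponseSketch{request_id: ^id} = response -> response
    after
      1_000 -> :timeout
    end
  end
end

server = CorrelationSketch.start_server()
response = CorrelationSketch.request(server, "elixir")
IO.inspect(response.body)
```

Because the receive pattern pins the ID, responses to other requests stay in the mailbox untouched.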
Next, add the implementation of the Connection automaton:
# append the following into Connection module
use Cizen.Automaton
defstruct []
alias Cizen.{Event, Filter}
alias Cizen.Effects.{Dispatch, Receive, Subscribe}

def spawn(id, %__MODULE__{}) do
  # Subscribe to Connection.Request events for this connection.
  perform id, %Subscribe{
    event_filter: Filter.new(
      fn %Event{body: %Connection.Request{connection_id: value}} -> value == id end
    )
  }
  :loop
end

def yield(id, :loop) do
  request_event = perform id, %Receive{}
  %Request{type: type, body: body} = request_event.body
  # Call the API
  response = apply(API, type, [body])
  perform id, %Dispatch{
    body: %Connection.Request.Response{
      request_id: request_event.id,
      body: response
    }
  }
  :loop
end
spawn/2 is called when the automaton is created, and yield/2 is called repeatedly until the automaton finishes. In yield/2, Connection handles Connection.Request events and dispatches Connection.Request.Response events one by one.
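The spawn-once, yield-repeatedly life cycle can be pictured with a hand-written driver loop. This is only my sketch of the idea, not Cizen's implementation. LoopSketch, CountdownSketch, and the :finish atom used to stop the loop are all hypothetical.

```elixir
# Sketch only: a hand-written driver loop, not Cizen's implementation.
# LoopSketch, CountdownSketch and the :finish atom are hypothetical.
defmodule LoopSketch do
  # Calls spawn/2 once, then feeds each returned state back into yield/2.
  def run(module, id, arg) do
    loop(module, id, module.spawn(id, arg))
  end

  defp loop(_module, _id, :finish), do: :done
  defp loop(module, id, state), do: loop(module, id, module.yield(id, state))
end

defmodule CountdownSketch do
  # Called once: returns the initial state.
  def spawn(_id, start), do: start

  # Called repeatedly: returns the next state, or :finish to stop.
  def yield(_id, 0), do: :finish

  def yield(id, n) do
    IO.puts("#{id}: yield with state #{n}")
    n - 1
  end
end

result = LoopSketch.run(CountdownSketch, :countdown, 3)
IO.inspect(result)
```

Connection always returns :loop from yield/2, so in this picture it would keep serving requests forever.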
Now we can use Connection to send requests to the API. Add the following code to run the requests:
# append the following to request.exs
defmodule Main do
  use Cizen.Effectful

  def main do
    alias Cizen.Effects.{Request, Start}

    connection_id = handle fn id ->
      perform id, %Start{saga: %Connection{}}
    end

    ["Elixir", "Nature", "Cizen"]
    |> Enum.map(fn category ->
      fn ->
        handle fn id ->
          # Request to get a category ID
          response_event = perform id, %Request{body: %Connection.Request{
            connection_id: connection_id,
            type: :get_category_id,
            body: category
          }}
          %Connection.Request.Response{body: category_id} = response_event.body
          # Request to get articles
          response_event = perform id, %Request{body: %Connection.Request{
            connection_id: connection_id,
            type: :get_articles,
            body: category_id
          }}
          %Connection.Request.Response{body: articles} = response_event.body
          # Returns the articles
          articles
        end
      end
    end)
    |> Enum.map(&Task.async(&1))
    |> Enum.map(&Task.await(&1, :infinity))
    |> Enum.map(&IO.inspect(&1))
  end
end

# Run
Main.main()
Main.main/0 fetches the articles for three categories: "Elixir", "Nature", and "Cizen". The three lookups run concurrently in separate processes, using Task.async and Task.await.
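If the Task.async and Task.await pattern is new to you, here it is in isolation. This is a minimal sketch in which a sleep stands in for a slow API call; slow_double is a made-up helper.

```elixir
# Sketch only: the sleep stands in for a slow API call.
slow_double = fn n ->
  Process.sleep(100)
  n * 2
end

{elapsed_us, results} =
  :timer.tc(fn ->
    [1, 2, 3]
    # Start all three tasks first...
    |> Enum.map(fn n -> Task.async(fn -> slow_double.(n) end) end)
    # ...then wait for each result, in the original order.
    |> Enum.map(&Task.await(&1, :infinity))
  end)

IO.inspect(results)
IO.puts("took about #{div(elapsed_us, 1000)} ms")
```

All tasks are started before the first await, so the three sleeps overlap instead of adding up, and the results still come back in input order.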
Now we can run request.exs with:
$ mix run request.exs
It outputs:
["Let's Try Elixir", "Implement an API with Elixir"]
["Beautiful Nature"]
["Getting Started with Cizen", "Asynchronous Request with Cizen"]
Request Logger
Before we use the connection pool, we should create a request logger to monitor the complex asynchronous requests.
Add RequestLogger:
# append the following above Main module in request.exs
defmodule RequestLogger do
  use Cizen.Automaton
  alias Cizen.{Event, Filter}
  alias Cizen.Effects.{All, Receive, Subscribe}
  alias Cizen.StartSaga
  defstruct []

  def spawn(id, %__MODULE__{}) do
    perform id, %All{effects: [
      %Subscribe{
        event_filter: Filter.new(
          fn %Event{body: %StartSaga{saga: %Connection{}}} -> true end
        )
      },
      %Subscribe{
        event_filter: Filter.new(
          fn %Event{body: %Connection.Request{}} -> true end
        )
      },
      %Subscribe{
        event_filter: Filter.new(
          fn %Event{body: %Connection.Request.Response{}} -> true end
        )
      }
    ]}
    %{
      connections: %{}, # connection ID => connection number
      requests: %{} # request ID => connection number
    }
  end

  defp tabs(n), do: Stream.cycle([" "]) |> Enum.take(20*n) |> Enum.join()

  def yield(id, state) do
    event = perform id, %Receive{}
    case event.body do
      %StartSaga{id: connection_id} ->
        # Number the connections in the order they are started.
        put_in(state.connections[connection_id], map_size(state.connections))
      %Connection.Request{connection_id: connection_id} = request ->
        connection_number = state.connections[connection_id]
        IO.puts("""
        #{tabs(connection_number)}request
        #{tabs(connection_number)}#{request.type}
        #{tabs(connection_number)}body: #{request.body}
        """)
        request_id = event.id
        put_in(state.requests[request_id], connection_number)
      %Connection.Request.Response{request_id: request_id} = response ->
        {connection_number, state} = pop_in(state.requests[request_id])
        IO.puts("""
        #{tabs(connection_number)}response
        #{tabs(connection_number)}#{inspect response.body}
        """)
        state
    end
  end
end
It looks a little complicated, but it simply logs Connection.Request and Connection.Request.Response events to stdout. It uses the horizontal axis to show which connection is used and the vertical axis to show the time sequence.
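The horizontal axis is just padding: connection number n is shifted right by 20 * n characters. A minimal sketch of that layout follows. ColumnSketch and line/2 are hypothetical names, and String.duplicate/2 is my substitute for the Stream.cycle pipeline in tabs/1, assuming each cycled element is a single space as shown.

```elixir
# Sketch only: ColumnSketch is a hypothetical name; the 20-character column
# width matches the tabs/1 helper in RequestLogger.
defmodule ColumnSketch do
  @width 20

  # Builds 20 * n spaces, like the Stream.cycle version.
  def tabs(n), do: String.duplicate(" ", @width * n)

  # Prefixes a log line with the padding for the given connection number.
  def line(connection_number, text), do: tabs(connection_number) <> text
end

# One line per connection: each appears in its own column.
IO.puts(ColumnSketch.line(0, "request"))
IO.puts(ColumnSketch.line(1, "request"))
IO.puts(ColumnSketch.line(2, "response"))
```

With a single connection, every line lands in column 0. The columns only become visible once several connections are in use.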
To use the logger, update Main.main/0:
- Add perform id, %Start{saga: %RequestLogger{}} above the line that starts the connection:
connection_id = handle fn id ->
  perform id, %Start{saga: %RequestLogger{}} # here
  perform id, %Start{saga: %Connection{}}
end
- Remove |> Enum.map(&IO.inspect(&1)) so that the results do not get mixed up with the log output:
|> Enum.map(&Task.async(&1))
|> Enum.map(&Task.await(&1, :infinity))
# |> Enum.map(&IO.inspect(&1)) removed
Now, $ mix run request.exs outputs something like the following:
request
get_category_id
body: Nature
request
get_category_id
body: Cizen
request
get_category_id
body: Elixir
response
"02"
request
get_articles
body: 02
response
"03"
request
get_articles
body: 03
response
"01"
request
get_articles
body: 01
response
["Beautiful Nature"]
response
["Getting Started with Cizen", "Asynchronous Request with Cizen"]
response
["Let's Try Elixir", "Implement an API with Elixir"]
It shows that Connection.Request events are dispatched asynchronously and that the single connection handles them in order. In the next section, we use multiple connections with connection pooling.
Connection Pooling
Now, let's create the ConnectionPool automaton. It queues pushed connections and lends them out in response to borrow events.
First, create the ConnectionPool module and define the related events:
# append the following above Main module in request.exs
defmodule ConnectionPool do
  defmodule BorrowConnection do
    defstruct []
    use Cizen.Request
    defresponse Lend, :borrow_id do
      defstruct [:borrow_id, :connection_id]
    end
  end

  defmodule PushConnection, do: defstruct [:connection_id]
end
ConnectionPool.BorrowConnection is a request event, and ConnectionPool.BorrowConnection.Lend is its response event.
ConnectionPool.PushConnection is an event for registering a new connection or returning a borrowed one.
Next, add the implementation of the ConnectionPool automaton:
# append the following into the ConnectionPool module
use Cizen.Automaton
alias Cizen.{Event, Filter}
defstruct []
alias Cizen.Effects.{All, Dispatch, Receive, Subscribe}

def spawn(id, %__MODULE__{}) do
  perform id, %All{effects: [
    %Subscribe{
      event_filter: Filter.new(fn %Event{body: %BorrowConnection{}} -> true end)
    },
    %Subscribe{
      event_filter: Filter.new(fn %Event{body: %PushConnection{}} -> true end)
    }
  ]}
  :loop
end

def yield(id, :loop) do
  borrow_event = perform id, %Receive{
    event_filter: Filter.new(fn %Event{body: %BorrowConnection{}} -> true end)
  }
  push_event = perform id, %Receive{
    event_filter: Filter.new(fn %Event{body: %PushConnection{}} -> true end)
  }
  %PushConnection{connection_id: connection_id} = push_event.body
  perform id, %Dispatch{
    body: %BorrowConnection.Lend{
      borrow_id: borrow_event.id,
      connection_id: connection_id
    }
  }
  :loop
end
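Each pass through yield/2 pairs one borrow event with one push event, so borrowers wait when no connection is available and connections wait when nobody is asking. The same pairing rule can be modelled with two explicit queues. This is a pure-functional sketch, not Cizen code; PoolSketch and its functions are hypothetical, and both functions return the list of {borrower, connection} pairs produced by the step.

```elixir
# Sketch only: a pure-functional model of the pairing rule, not Cizen code.
# PoolSketch and its function names are hypothetical.
defmodule PoolSketch do
  defstruct idle: :queue.new(), waiting: :queue.new()

  # A connection is pushed: hand it to the oldest waiting borrower, if any.
  def push(%__MODULE__{} = pool, connection) do
    case :queue.out(pool.waiting) do
      {{:value, borrower}, rest} -> {[{borrower, connection}], %{pool | waiting: rest}}
      {:empty, _} -> {[], %{pool | idle: :queue.in(connection, pool.idle)}}
    end
  end

  # A borrow arrives: lend the oldest idle connection, or make the borrower wait.
  def borrow(%__MODULE__{} = pool, borrower) do
    case :queue.out(pool.idle) do
      {{:value, connection}, rest} -> {[{borrower, connection}], %{pool | idle: rest}}
      {:empty, _} -> {[], %{pool | waiting: :queue.in(borrower, pool.waiting)}}
    end
  end
end

pool = %PoolSketch{}
{lent1, pool} = PoolSketch.push(pool, :conn_a)    # nobody is waiting yet
{lent2, pool} = PoolSketch.borrow(pool, :task_1)  # :task_1 gets :conn_a
{lent3, pool} = PoolSketch.borrow(pool, :task_2)  # no idle connection: wait
{lent4, _pool} = PoolSketch.push(pool, :conn_a)   # :task_2 gets :conn_a
IO.inspect([lent1, lent2, lent3, lent4])
```

In the automaton, unmatched events stay queued until a filtered Receive picks them up, so no explicit queues are needed there.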
Finally, update Main.main/0 to use multiple connections with ConnectionPool:
# replace Main.main/0 with the following
def main do
  alias Cizen.Effects.{Dispatch, Request, Start}

  handle fn id ->
    perform id, %Start{saga: %RequestLogger{}}
    perform id, %Start{saga: %ConnectionPool{}}
    # Starts 3 connections
    for _ <- 1..3 do
      connection_id = perform id, %Start{saga: %Connection{}}
      perform id, %Dispatch{
        body: %ConnectionPool.PushConnection{connection_id: connection_id}
      }
    end
  end

  ["Elixir", "Nature", "Cizen"]
  |> Enum.map(fn category ->
    fn ->
      handle fn id ->
        # Borrow a connection
        lend_event = perform id, %Request{body: %ConnectionPool.BorrowConnection{}}
        connection_id = lend_event.body.connection_id
        # Request
        response_event = perform id, %Request{body: %Connection.Request{
          connection_id: connection_id,
          type: :get_category_id,
          body: category
        }}
        %Connection.Request.Response{body: category_id} = response_event.body
        # Return the connection
        perform id, %Dispatch{
          body: %ConnectionPool.PushConnection{connection_id: connection_id}
        }
        # Borrow a connection
        lend_event = perform id, %Request{body: %ConnectionPool.BorrowConnection{}}
        connection_id = lend_event.body.connection_id
        # Request
        response_event = perform id, %Request{body: %Connection.Request{
          connection_id: connection_id,
          type: :get_articles,
          body: category_id
        }}
        %Connection.Request.Response{body: articles} = response_event.body
        # Return the connection
        perform id, %Dispatch{
          body: %ConnectionPool.PushConnection{connection_id: connection_id}
        }
        articles
      end
    end
  end)
  |> Enum.map(&Task.async(&1))
  |> Enum.map(&Task.await(&1, :infinity))
end
Now, $ mix run request.exs outputs something like the following:
request
get_category_id
body: Cizen
request
get_category_id
body: Elixir
request
get_category_id
body: Nature
response
"03"
request
get_articles
body: 03
response
"01"
request
get_articles
body: 01
response
"02"
request
get_articles
body: 02
response
["Beautiful Nature"]
response
["Let's Try Elixir", "Implement an API with Elixir"]
response
["Getting Started with Cizen", "Asynchronous Request with Cizen"]
We made it! The requests are concurrently handled by the 3 connections.
Code Readability
You may have noticed that the Main.main/0 code above is rather complex. That is because it takes two perform/2 steps to start a connection and three steps to make a request. In such a situation, the Chain effect and the Map effect help make the code easier to understand.
First, add Main.start_connection/0:
# add the following into the Main module
defp start_connection do
  alias Cizen.Effects.{Chain, Dispatch, Start}

  %Chain{effects: [
    %Start{saga: %Connection{}},
    fn connection_id ->
      %Dispatch{
        body: %ConnectionPool.PushConnection{connection_id: connection_id}
      }
    end
  ]}
end
It joins two effects: one for starting a connection and the other for dispatching ConnectionPool.PushConnection.
Next, add Main.request/2:
# add the following into the Main module
defp request(type, body) do
  alias Cizen.Effects.{Chain, Dispatch, Map, Request}

  effect = %Chain{effects: [
    %Request{body: %ConnectionPool.BorrowConnection{}},
    fn lend_event ->
      connection_id = lend_event.body.connection_id
      %Request{body: %Connection.Request{
        connection_id: connection_id,
        type: type,
        body: body
      }}
    end,
    fn lend_event, _ ->
      connection_id = lend_event.body.connection_id
      %Dispatch{
        body: %ConnectionPool.PushConnection{connection_id: connection_id}
      }
    end
  ]}

  %Map{
    effect: effect,
    transform: fn [_, response_event, _] -> response_event end
  }
end
It joins three effects: one for borrowing a connection, one for making the request, and one for returning the connection. It then transforms the result so that perform id, request(type, body) returns the response to the request.
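To see the data flow, here is how I picture the combination, judging only from the code above: each function in the chain receives the results of all earlier steps as its arguments, and the transform receives the full list of results. The sketch below imitates that with plain values and functions. ChainSketch is a hypothetical name and this is not Cizen's implementation; the tuples stand in for events.

```elixir
# Sketch only: plain functions standing in for effects, not Cizen's implementation.
# ChainSketch is a hypothetical name.
defmodule ChainSketch do
  # Runs each step in order. A step is either a plain value or a function
  # that receives all earlier results as its arguments.
  def chain(steps) do
    Enum.reduce(steps, [], fn
      step, results when is_function(step) -> results ++ [apply(step, results)]
      value, results -> results ++ [value]
    end)
  end

  # Runs a chain and transforms the list of results.
  def map(steps, transform), do: steps |> chain() |> transform.()
end

result =
  ChainSketch.map(
    [
      # Step 1: "borrow" a connection.
      {:lend, :conn_a},
      # Step 2: "request" using the borrowed connection.
      fn {:lend, connection} -> {:response, connection, "01"} end,
      # Step 3: "return" the connection; the response is ignored here.
      fn {:lend, connection}, _response -> {:returned, connection} end
    ],
    # Keep only the second result, the response.
    fn [_lend, response, _returned] -> response end
  )

IO.inspect(result)
```

The third step takes two arguments because two results exist by then, which mirrors fn lend_event, _ -> in request/2.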
Finally, update Main.main/0 to use them:
# replace Main.main/0 with the following
def main do
  alias Cizen.Effects.Start

  handle fn id ->
    perform id, %Start{saga: %RequestLogger{}}
    perform id, %Start{saga: %ConnectionPool{}}
    # Starts 3 connections
    for _ <- 1..3 do
      perform id, start_connection()
    end
  end

  ["Elixir", "Nature", "Cizen"]
  |> Enum.map(fn category ->
    fn ->
      handle fn id ->
        response_event = perform id, request(:get_category_id, category)
        %Connection.Request.Response{body: category_id} = response_event.body
        response_event = perform id, request(:get_articles, category_id)
        %Connection.Request.Response{body: articles} = response_event.body
        articles
      end
    end
  end)
  |> Enum.map(&Task.async(&1))
  |> Enum.map(&Task.await(&1, :infinity))
end
Now our code is more readable than before.