Hashiguchi Ryo

Asynchronous Request and Connection Pooling in Elixir with Cizen

Introduction

Yesterday, I was really excited to write the example Elixir code with Cizen for this post. Cizen is a library for building applications with automata, events, and fun! With Cizen, it becomes easier to create highly concurrent, monitorable, and extensible applications. I believe that Cizen has the potential to make software development more fun, just as Elixir does. In this post, we will build an API client that implements asynchronous requests and connection pooling with Cizen.

Automata and Events

With Cizen, we use automata (the plural of automaton) instead of processes. Automata communicate with each other by subscribing to and dispatching events, whereas processes send messages directly to a pid.
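For contrast, here is a minimal sketch of the plain-process style that automata replace, where the sender has to hold the receiver's pid. It uses only standard Elixir; the module name PidMessaging and the message shapes are made up for illustration and are not part of Cizen.

```elixir
defmodule PidMessaging do
  # Starts a process that answers one {:ping, from} message and then exits.
  def start_responder do
    spawn(fn ->
      receive do
        {:ping, from} -> send(from, {:pong, self()})
      end
    end)
  end

  # The caller needs the responder's pid to reach it directly.
  def ping(responder) do
    send(responder, {:ping, self()})

    receive do
      {:pong, ^responder} -> :pong
    after
      1_000 -> :timeout
    end
  end
end

responder = PidMessaging.start_responder()
IO.inspect(PidMessaging.ping(responder))
```

With events, the sender would not need to know who is listening. It would only dispatch, and whoever subscribed would receive.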

API

In this article, we suppose that we need to get articles for each category from an API. The API works like the following:

  • get_category_id with a body "Elixir" (category name) returns a response with a body "01" (category ID)
  • get_articles with a body "01" (category ID) returns a response with a body ["Let's Try Elixir", "Implement an API with Elixir"] (list of articles)

Let's Get Ready

To get ready, run the following commands:

$ mix new client
$ cd client

Then add Cizen to the dependencies (and run mix deps.get afterwards to fetch it):

# in mix.exs

  defp deps do
    [
      {:cizen, "~> 0.12.1"},
    ]
  end

Also, you need to prepare a simple mock of the API:

# in a new file request.exs

defmodule API do
  @categories %{
    "Elixir" => "01",
    "Nature" => "02",
    "Cizen" => "03"
  }

  @articles %{
    "01" => ["Let's Try Elixir", "Implement an API with Elixir"],
    "02" => ["Beautiful Nature"],
    "03" => ["Getting Started with Cizen", "Asynchronous Request with Cizen"]
  }

  def get_category_id(text) do
    :timer.sleep(:rand.uniform(10) * 50) # Delay for an API call
    @categories[text]
  end

  def get_articles(category) do
    :timer.sleep(:rand.uniform(10) * 50) # Delay for an API call
    @articles[category]
  end
end

Single Connection

Let's create a Connection automaton. The Connection automaton repeatedly receives a Connection.Request event and dispatches a Connection.Request.Response event for each request.

First, create the Connection module and define those events:

# append the following to request.exs

defmodule Connection do
  defmodule Request do
    defstruct [:connection_id, :type, :body]

    use Cizen.Request
    defresponse Response, :request_id do
      defstruct [:request_id, :body]
    end
  end
end

use Cizen.Request makes Connection.Request requestive, and defresponse defines its response event.

Next, add the implementation of the Connection automaton.

# append the following into Connection module

use Cizen.Automaton

defstruct []

alias Cizen.{Event, Filter}
alias Cizen.Effects.{Dispatch, Receive, Subscribe}

def spawn(id, %__MODULE__{}) do
  # Subscribe Connection.Request events for this connection.
  perform id, %Subscribe{
    event_filter: Filter.new(
      fn %Event{body: %Connection.Request{connection_id: value}} -> value == id end
    )
  }

  :loop
end

def yield(id, :loop) do
  request_event = perform id, %Receive{}
  %Request{type: type, body: body} = request_event.body

  # Calls the API
  response = apply(API, type, [body])

  perform id, %Dispatch{
    body: %Connection.Request.Response{
      request_id: request_event.id,
      body: response
    }
  }

  :loop
end

spawn/2 is called when the automaton is created, and yield/2 is called repeatedly until the automaton finishes. In yield/2, Connection handles Connection.Request and dispatches Connection.Request.Response one by one.
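The callback lifecycle described above can be pictured with a small sketch in plain Elixir. This is not how Cizen is implemented. LoopSketch, init/1, step/1, and the {:finished, count} marker are hypothetical names that only show the shape: one setup call produces the first state, then a step function is called again and again with the state it returned last time.

```elixir
defmodule LoopSketch do
  # Plays the role of spawn/2: one-time setup that returns the first state.
  def init(limit), do: {0, limit}

  # Plays the role of yield/2: one step that returns the next state.
  def step({count, limit}) when count < limit, do: {count + 1, limit}
  # Hypothetical marker that tells the driver to stop.
  def step({count, _limit}), do: {:finished, count}

  # The driver: keep calling step/1 with whatever it returned last time.
  def run(limit), do: loop(init(limit))

  defp loop({:finished, count}), do: count
  defp loop(state), do: loop(step(state))
end

IO.inspect(LoopSketch.run(3))
# prints 3
```

In Connection, the state is always :loop, so every call to yield/2 handles exactly one request and then asks to be called again.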

Now we can use Connection to send requests to the API. Add the following code to run the requests.

# append the following to request.exs

defmodule Main do
  use Cizen.Effectful

  def main do
    alias Cizen.Effects.{Request, Start}

    connection_id = handle fn id ->
      perform id, %Start{saga: %Connection{}}
    end

    ["Elixir", "Nature", "Cizen"]
    |> Enum.map(fn category ->
      fn ->
        handle fn id ->
          # Request to get a category ID
          response_event = perform id, %Request{body: %Connection.Request{
            connection_id: connection_id,
            type: :get_category_id,
            body: category
          }}
          %Connection.Request.Response{body: category_id} = response_event.body

          # Request to get articles
          response_event = perform id, %Request{body: %Connection.Request{
            connection_id: connection_id,
            type: :get_articles,
            body: category_id
          }}
          %Connection.Request.Response{body: articles} = response_event.body

          # Returns the articles
          articles
        end
      end
    end)
    |> Enum.map(&Task.async(&1))
    |> Enum.map(&Task.await(&1, :infinity))
    |> Enum.map(&IO.inspect(&1))
  end
end

# Run
Main.main()

Main.main/0 fetches the articles for three categories, "Elixir", "Nature", and "Cizen". The three lookups run asynchronously in separate processes by using Task.async and Task.await.
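The Task.async and Task.await pattern can be tried on its own. In the sketch below, which uses only standard Elixir and made-up delays, the tasks finish in a different order than they were started, yet the results come back in the order of the input list because each await is matched to its own task.

```elixir
# Each task sleeps for a different time, so they finish out of order.
results =
  [300, 100, 200]
  |> Enum.map(fn delay ->
    Task.async(fn ->
      Process.sleep(delay)
      delay
    end)
  end)
  |> Enum.map(&Task.await(&1, :infinity))

IO.inspect(results)
# prints [300, 100, 200]
```

This is why the output of Main.main/0 always lists "Elixir" first, no matter which request completes first.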

Now, we can run request.exs with:

$ mix run request.exs

It outputs:

["Let's Try Elixir", "Implement an API with Elixir"]
["Beautiful Nature"]
["Getting Started with Cizen", "Asynchronous Request with Cizen"]

Request Logger

Before we use the connection pool, we should create a request logger to monitor the complex asynchronous requests.

Add RequestLogger:

# append the following above Main module in request.exs

defmodule RequestLogger do
  use Cizen.Automaton
  alias Cizen.{Event, Filter}
  alias Cizen.Effects.{All, Receive, Subscribe}
  alias Cizen.StartSaga

  defstruct []

  def spawn(id, %__MODULE__{}) do
    perform id, %All{effects: [
      %Subscribe{
        event_filter: Filter.new(
          fn %Event{body: %StartSaga{saga: %Connection{}}} -> true end
        )
      },
      %Subscribe{
        event_filter: Filter.new(
          fn %Event{body: %Connection.Request{}} -> true end
        )
      },
      %Subscribe{
        event_filter: Filter.new(
          fn %Event{body: %Connection.Request.Response{}} -> true end
        )
      },
    ]}

    %{
      connections: %{}, # connection ID => connection number
      requests: %{} # request ID => connection number
    }
  end

  defp tabs(n), do: Stream.cycle([" "]) |> Enum.take(20*n) |> Enum.join()
  def yield(id, state) do
    event = perform id, %Receive{}
    case event.body do
      %StartSaga{id: connection_id} ->
        put_in(state.connections[connection_id], map_size(state.connections))
      %Connection.Request{connection_id: connection_id} = request ->
        connection_number = state.connections[connection_id]
        IO.puts("""
        #{tabs(connection_number)}request
        #{tabs(connection_number)}#{request.type}
        #{tabs(connection_number)}body: #{request.body}
        """)
        request_id = event.id
        put_in(state.requests[request_id], connection_number)
      %Connection.Request.Response{request_id: request_id} = response ->
        {connection_number, state} = pop_in(state.requests[request_id])
        IO.puts("""
        #{tabs(connection_number)}response
        #{tabs(connection_number)}#{inspect response.body}
        """)
        state
    end
  end
end

It looks a little complicated, but it simply logs Connection.Request and Connection.Request.Response events to stdout. The horizontal position shows which connection is used, and the vertical axis shows the time sequence.

To use the logger, update Main.main/0:
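The column layout is just left padding. As a minimal sketch in standard Elixir, the hypothetical helper below (ColumnSketch.in_column/2 is not part of the article's code) indents every line of a message by 20 spaces per connection number, which is the same idea as tabs/1 in RequestLogger.

```elixir
defmodule ColumnSketch do
  @column_width 20

  # Left-pads every line of `text` so that it lands in the given column.
  def in_column(text, column) do
    padding = String.duplicate(" ", @column_width * column)

    text
    |> String.split("\n")
    |> Enum.map_join("\n", &(padding <> &1))
  end
end

# Connection number 0 prints at the left edge, number 1 one column over.
IO.puts(ColumnSketch.in_column("request\nget_articles", 0))
IO.puts(ColumnSketch.in_column("response\n\"01\"", 1))
```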

  • Add perform id, %Start{saga: %RequestLogger{}} above the line starting the connection:
    connection_id = handle fn id ->
      perform id, %Start{saga: %RequestLogger{}} # here
      perform id, %Start{saga: %Connection{}}
    end
  • Then remove |> Enum.map(&IO.inspect(&1)) so that the results do not get mixed in with the log output:
    |> Enum.map(&Task.async(&1))
    |> Enum.map(&Task.await(&1, :infinity))
    # |> Enum.map(&IO.inspect(&1)) removed

Now, $ mix run request.exs outputs something like the following:

request
get_category_id
body: Nature

request
get_category_id
body: Cizen

request
get_category_id
body: Elixir

response
"02"

request
get_articles
body: 02

response
"03"

request
get_articles
body: 03

response
"01"

request
get_articles
body: 01

response
["Beautiful Nature"]

response
["Getting Started with Cizen", "Asynchronous Request with Cizen"]

response
["Let's Try Elixir", "Implement an API with Elixir"]

It shows that Connection.Request events are asynchronously dispatched, and the connection handles them in order. In the next section, we use multiple connections with connection pooling.

Connection Pooling

Now, let's create a ConnectionPool automaton. It queues pushed connections and lends them out in response to borrow events.

First, create the ConnectionPool module and define the related events:

# append the following above Main module in request.exs

defmodule ConnectionPool do
  defmodule BorrowConnection do
    defstruct []
    use Cizen.Request
    defresponse Lend, :borrow_id do
      defstruct [:borrow_id, :connection_id]
    end
  end

  defmodule PushConnection, do: defstruct [:connection_id]
end

ConnectionPool.BorrowConnection is a requestive event and ConnectionPool.BorrowConnection.Lend is its response event.

ConnectionPool.PushConnection is an event for registering a new connection or returning a borrowed connection.

Next, add the implementation of the ConnectionPool automaton.

# append the following into the ConnectionPool module

use Cizen.Automaton
alias Cizen.{Event, Filter}
defstruct []

alias Cizen.Effects.{All, Dispatch, Receive, Subscribe}

def spawn(id, %__MODULE__{}) do
  perform id, %All{effects: [
    %Subscribe{
      event_filter: Filter.new(fn %Event{body: %BorrowConnection{}} -> true end)
    },
    %Subscribe{
      event_filter: Filter.new(fn %Event{body: %PushConnection{}} -> true end)
    }
  ]}

  :loop
end

def yield(id, :loop) do
  borrow_event = perform id, %Receive{
    event_filter: Filter.new(fn %Event{body: %BorrowConnection{}} -> true end)
  }

  push_event = perform id, %Receive{
    event_filter: Filter.new(fn %Event{body: %PushConnection{}} -> true end)
  }
  %PushConnection{connection_id: connection_id} = push_event.body

  perform id, %Dispatch{
    body: %BorrowConnection.Lend{
      borrow_id: borrow_event.id,
      connection_id: connection_id
    }
  }

  :loop
end
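The pairing logic in yield/2 (wait for a borrow, then wait for a push, then lend) can be sketched with a plain process and selective receive. This is an analogy in standard Elixir, not Cizen code. PoolSketch and its message tuples are hypothetical stand-ins for the automaton and its events.

```elixir
defmodule PoolSketch do
  # Starts the pool loop in its own process.
  def start, do: spawn(fn -> loop() end)

  defp loop do
    # Wait for a borrower first; pushes stay queued in the mailbox meanwhile.
    receive do
      {:borrow, from} ->
        # Then wait for the oldest pushed connection and lend it out.
        receive do
          {:push, connection} -> send(from, {:lend, connection})
        end
    end

    loop()
  end

  # Registers a new connection or returns a borrowed one.
  def push(pool, connection), do: send(pool, {:push, connection})

  # Blocks until the pool lends a connection.
  def borrow(pool) do
    send(pool, {:borrow, self()})

    receive do
      {:lend, connection} -> connection
    end
  end
end

pool = PoolSketch.start()
PoolSketch.push(pool, :conn_a)
PoolSketch.push(pool, :conn_b)
IO.inspect(PoolSketch.borrow(pool))
IO.inspect(PoolSketch.borrow(pool))
```

The two borrows receive :conn_a and then :conn_b. A third borrow would block until someone pushes a connection back, which is the behavior a borrower sees when all connections are in use.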

Finally, update Main.main/0 to use multiple connections with ConnectionPool:

# replace Main.main/0 with the following

def main do
  alias Cizen.Effects.{Dispatch, Request, Start}

  handle fn id ->
    perform id, %Start{saga: %RequestLogger{}}
    perform id, %Start{saga: %ConnectionPool{}}
    # Starts 3 connections
    for _ <- 1..3 do
      connection_id = perform id, %Start{saga: %Connection{}}
      perform id, %Dispatch{
        body: %ConnectionPool.PushConnection{connection_id: connection_id}
      }
    end
  end

  ["Elixir", "Nature", "Cizen"]
  |> Enum.map(fn category ->
    fn ->
      handle fn id ->
        # Borrow a connection
        lend_event = perform id, %Request{body: %ConnectionPool.BorrowConnection{}}
        connection_id = lend_event.body.connection_id
        # Request
        response_event = perform id, %Request{body: %Connection.Request{
          connection_id: connection_id,
          type: :get_category_id,
          body: category
        }}
        %Connection.Request.Response{body: category_id} = response_event.body
        # Return the connection
        perform id, %Dispatch{
          body: %ConnectionPool.PushConnection{connection_id: connection_id}
        }

        # Borrow a connection
        lend_event = perform id, %Request{body: %ConnectionPool.BorrowConnection{}}
        connection_id = lend_event.body.connection_id
        # Request
        response_event = perform id, %Request{body: %Connection.Request{
          connection_id: connection_id,
          type: :get_articles,
          body: category_id
        }}
        %Connection.Request.Response{body: articles} = response_event.body
        # Return the connection
        perform id, %Dispatch{
          body: %ConnectionPool.PushConnection{connection_id: connection_id}
        }

        articles
      end
    end
  end)
  |> Enum.map(&Task.async(&1))
  |> Enum.map(&Task.await(&1, :infinity))
end

Now, $ mix run request.exs outputs something like the following:

request
get_category_id
body: Cizen

                    request
                    get_category_id
                    body: Elixir

                                        request
                                        get_category_id
                                        body: Nature

response
"03"

request
get_articles
body: 03

                    response
                    "01"

                    request
                    get_articles
                    body: 01

                                        response
                                        "02"

                                        request
                                        get_articles
                                        body: 02

                                        response
                                        ["Beautiful Nature"]

                    response
                    ["Let's Try Elixir", "Implement an API with Elixir"]

response
["Getting Started with Cizen", "Asynchronous Request with Cizen"]

We made it! The requests are concurrently handled by the 3 connections.

Code Readability

You may have noticed that the Main.main/0 above is rather complex. That is because it takes two perform/2 steps to start a connection and three to make a request. In such a situation, the Chain effect and the Map effect help make the code easier to understand.

First, add Main.start_connection/0:

# add the following into the Main module

defp start_connection do
  alias Cizen.Effects.{Chain, Dispatch, Start}
  %Chain{effects: [
    %Start{saga: %Connection{}},
    fn connection_id ->
      %Dispatch{
        body: %ConnectionPool.PushConnection{connection_id: connection_id}
      }
    end
  ]}
end

It joins the two effects, one for starting a connection and the other for dispatching ConnectionPool.PushConnection.

Next, add Main.request/2:

# add the following into the Main module

defp request(type, body) do
  alias Cizen.Effects.{Chain, Dispatch, Map, Request}
  effect = %Chain{effects: [
    %Request{body: %ConnectionPool.BorrowConnection{}},
    fn lend_event ->
      connection_id = lend_event.body.connection_id
      %Request{body: %Connection.Request{
        connection_id: connection_id,
        type: type,
        body: body
      }}
    end,
    fn lend_event, _ ->
      connection_id = lend_event.body.connection_id
      %Dispatch{
        body: %ConnectionPool.PushConnection{connection_id: connection_id}
      }
    end
  ]}
  %Map{
    effect: effect,
    transform: fn [_, response_event, _] -> response_event end
  }
end

It joins the three effects, one for borrowing a connection, another for requesting, and the other for returning the connection. After that, it also transforms the result so that perform id, request(type, body) will return the response for the request.
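To see the data flow in isolation, here is a small sketch in plain Elixir that mimics the behavior the code above relies on: each function step receives the results of all earlier steps as arguments, and the final result is a list with one entry per step. ChainSketch is a hypothetical stand-in written for this illustration and is not Cizen's implementation. The atoms and tuples are placeholders for real events.

```elixir
defmodule ChainSketch do
  # Runs steps in order. A function step is applied to all earlier results;
  # any other value is taken as the result of that step.
  def run(steps) do
    Enum.reduce(steps, [], fn
      step, results when is_function(step) -> results ++ [apply(step, results)]
      value, results -> results ++ [value]
    end)
  end
end

results =
  ChainSketch.run([
    :lend_event,
    fn lend -> {:response_for, lend} end,
    fn lend, _response -> {:returned, lend} end
  ])

# Same idea as the Map transform: keep only the middle result.
[_, response, _] = results
IO.inspect(response)
# prints {:response_for, :lend_event}
```

The third function ignores the response and only uses the first result, just as the last step of request/2 only needs the lend event to return the connection.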

Finally, update Main.main/0 to use them.

# replace Main.main/0 with the following

def main do
  alias Cizen.Effects.Start

  handle fn id ->
    perform id, %Start{saga: %RequestLogger{}}
    perform id, %Start{saga: %ConnectionPool{}}
    # Starts 3 connections
    for _ <- 1..3 do
      perform id, start_connection()
    end
  end

  ["Elixir", "Nature", "Cizen"]
  |> Enum.map(fn category ->
    fn ->
      handle fn id ->
        response_event = perform id, request(:get_category_id, category)
        %Connection.Request.Response{body: category_id} = response_event.body

        response_event = perform id, request(:get_articles, category_id)
        %Connection.Request.Response{body: articles} = response_event.body

        articles
      end
    end
  end)
  |> Enum.map(&Task.async(&1))
  |> Enum.map(&Task.await(&1, :infinity))
end

Now, our code is more readable than the old one.
