DEV Community

Alexandr K


Elixir TCP server and client example

This post is partly about observerio, an unfinished project that I don't have enough free time to finish these days.

I've been working on the project to test different game strategies in real time. The idea grew into a project with an HTTP API, a TCP server, an Ember dashboard, and a core library written in Go (using xgo compilation for cross-platform support).

I plan to share a few examples that could be useful to someone.

If someone asked me why I chose Elixir, I couldn't really explain why I decided to use it on the backend. I was probably inspired by previous experience with my startup app musicfeed. To me, Elixir has a pretty cool syntax, community, and documentation. But the concepts around gen_server and other tools are pretty hard to understand if you choose it as your first language to learn. If someone tells you that you don't need to learn Erlang, that is partially true, since the number of libraries for Elixir is growing so fast.

I decided to use a TCP server and client to send variables and logs from mobile games via an internal SDK. When the TCP server receives variables or logs, it publishes messages via pubsub (gateway) to the subscribed websocket handler. If a websocket is connected (a live client), the changes are automatically published to the Ember dashboard. It probably sounds more like an idea to play with than something to run in production, but I did it anyway for fun :)
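The publish/subscribe step described above can be sketched with the standard library's `Registry` in `:duplicate` mode. This is only a minimal stand-in under stated assumptions: `Web.Pubsub` itself is not shown in this post, and the module name `PubsubSketch`, the sample token, and the message shape are hypothetical.

```elixir
# Hypothetical stand-in for the pubsub layer (Web.Pubsub is not shown here).
defmodule PubsubSketch do
  @registry PubsubSketch.Registry

  # A :duplicate registry lets many processes register under one topic.
  def start_link do
    Registry.start_link(keys: :duplicate, name: @registry)
  end

  # The calling process becomes a subscriber of the topic.
  def subscribe(topic) do
    {:ok, _owner} = Registry.register(@registry, topic, nil)
    :ok
  end

  # Sends the message to every process registered under the topic.
  def publish(topic, message) do
    Registry.dispatch(@registry, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, message)
    end)
  end
end

{:ok, _} = PubsubSketch.start_link()

# Topic format borrowed from the client worker: "<token>:vars:callback".
topic = "abc123abc123:vars:callback"
:ok = PubsubSketch.subscribe(topic)
PubsubSketch.publish(topic, %{vars: [%{name: "speed", value: "10"}]})

receive do
  %{vars: vars} -> IO.inspect(vars, label: "received")
after
  1_000 -> IO.puts("no message")
end
```

A subscriber that has exited is removed from the registry automatically, which fits the "only live clients get updates" behaviour described above.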

Let's review the TCP server implementation (please feel free to suggest changes if you see possible issues; it's open source):

tcp.ex

  • define supervisors for the TCP server and client (the client is used for communication between the websocket, via pubsub, and the TCP client)

defmodule Web.Tcp.ServerSupervisor do
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, [], name: :tcp_server_supervisor)
  end

  def init(_) do
    children = [
      worker(Web.Tcp.Server, [])
    ]

    supervise(children, strategy: :one_for_one)
  end
end

defmodule Web.Tcp.ClientSupervisor do
  use Supervisor

  def start_link do
    Supervisor.start_link(__MODULE__, [], name: :tcp_client_supervisor)
  end

  def start_client(token) do
    Supervisor.start_child(:tcp_client_supervisor, [token])
  end

  def init(_) do
    children = [
      worker(Web.Tcp.Client, [])
    ]

    # We also changed the `strategy` to `simple_one_for_one`.
    # With this strategy, we define just a "template" for a child,
    # no process is started during the Supervisor initialization, just
    # when we call `start_child/2`
    supervise(children, strategy: :simple_one_for_one)
  end
end
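The `worker/2` and `supervise/2` helpers come from `Supervisor.Spec`, which newer Elixir releases deprecate, and `:simple_one_for_one` has been superseded by `DynamicSupervisor`. A minimal sketch of the same "start one client per token" idea on a newer Elixir could look like the following. The module names `ClientSketch` and `ClientSupervisorSketch` are hypothetical, and the worker is reduced to a bare `GenServer`.

```elixir
# Hypothetical, stripped-down worker standing in for Web.Tcp.Client.
defmodule ClientSketch do
  use GenServer

  def start_link(token), do: GenServer.start_link(__MODULE__, token)

  @impl true
  def init(token), do: {:ok, %{token: token, messages: []}}
end

# Hypothetical replacement for the :simple_one_for_one supervisor.
defmodule ClientSupervisorSketch do
  use DynamicSupervisor

  def start_link(_opts \\ []) do
    DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # Children are started on demand, one per token.
  def start_client(token) do
    DynamicSupervisor.start_child(__MODULE__, {ClientSketch, token})
  end

  @impl true
  def init(:ok), do: DynamicSupervisor.init(strategy: :one_for_one)
end

{:ok, _sup} = ClientSupervisorSketch.start_link()
{:ok, pid} = ClientSupervisorSketch.start_client("abc123abc123")
IO.inspect(pid, label: "client worker")
```

As in the original, no child is started when the supervisor boots; workers appear only when `start_client/1` is called.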

  • define the server worker and pass the module name Web.Tcp.Handler as the entry point that processes each connected client

defmodule Web.Tcp.Server do
  require Logger

  def start_link do
    Logger.debug("[tcp] starting server on port :#{_port()}")
    opts = [port: _port()]
    {:ok, _} = :ranch.start_listener(
      :tcp, _acceptors_size(), :ranch_tcp, opts, Web.Tcp.Handler, [])
  end

  def _port do
    String.to_integer Application.get_env(:web, :tcp_port)
  end

  def _acceptors_size do
    String.to_integer Application.get_env(:web, :tcp_acceptors_size)
  end
end
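The server calls `String.to_integer/1` on both settings, which suggests they are stored as strings (for example, read from environment variables). A small sketch of that convention follows. The environment variable names `TCP_PORT` and `TCP_ACCEPTORS_SIZE`, the defaults, and the `ConfigSketch` module are assumptions for illustration, not part of the project.

```elixir
# Hypothetical helper mirroring _port/0 and _acceptors_size/0.
defmodule ConfigSketch do
  # Reads a string setting from the :web application env and converts it.
  def int_env(key, default) do
    :web
    |> Application.get_env(key, default)
    |> String.to_integer()
  end
end

# In a real project these values would normally live in config/config.exs.
Application.put_env(:web, :tcp_port, System.get_env("TCP_PORT", "5555"))
Application.put_env(:web, :tcp_acceptors_size, System.get_env("TCP_ACCEPTORS_SIZE", "10"))

IO.inspect(ConfigSketch.int_env(:tcp_port, "5555"), label: "port")
IO.inspect(ConfigSketch.int_env(:tcp_acceptors_size, "10"), label: "acceptors")
```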

  • define a module that works as the worker for a connected TCP client. On init it subscribes to a pubsub channel by API key (one per user). This lets it receive messages from pubsub and send them back to the TCP client socket.

defmodule Web.Tcp.Client do
  # Pull in the default GenServer callbacks and child_spec/1
  use GenServer

  require Logger
  require Poison

  alias Web.Pubsub

  def start_link(token) do
    GenServer.start_link(__MODULE__, token, name: String.to_atom(token))
  end

  def init(token) do
    Pubsub.subscribe("#{token}:vars:callback")
    {:ok, %{token: token, messages: []}}
  end

  def handle_info(%{vars: vars}, %{token: token, messages: messages} = state) do
    Logger.debug("[tcp.client] received message: #{inspect(vars)}")
    message = _pack(token, "vars", vars)
    Logger.debug("[tcp.client] packed message: #{inspect(message)}")

    messages = messages ++ [message]

    Logger.debug("[tcp.client] begin send message: #{inspect(messages)}")
    state = token |> _get_socket |> _send_back(messages, state)
    Logger.debug("[tcp.client] done send message: #{inspect(messages)}")

    Logger.debug("[tcp.client] messages: #{inspect(messages)}")

    {:noreply, state}
  end

  def terminate(reason, status) do
    Logger.debug("[tcp.client] reason: #{inspect(reason)}, status: #{inspect(status)}")
    :ok
  end

  defp _send_back({:ok, socket}, messages, state) do
    :ok = _send(socket, messages)
    %{state | messages: []}
  end
  defp _send_back(:enqueue, messages, state) do
    %{state | messages: messages}
  end

  defp _send(_s, []), do: :ok
  defp _send({socket, transport} = s, [message | messages]) do
    transport.send(socket, message)
    _send(s, messages)
  end

  def _pack(token, "vars", vars) do
    vars = vars
    |> Poison.encode!
    |> Base.encode64

    "v:#{token}:#{vars}\n"
  end

  defp _get_socket(token) do
    Logger.debug("[tcp.socket] search for socket, transport by token: #{inspect(token)}")
    response = case Registry.lookup(Registry.Sockets, token) do
      [{_, socket}] -> {:ok, socket}
      [] -> :enqueue
    end
    Logger.debug("[tcp.client] _get_socket: #{inspect(response)}")
    response
  end
end
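The frame built by `_pack/3` is `v:<token>:<base64 payload>` followed by a newline. A minimal round-trip sketch of that framing is below. It takes an already-encoded JSON string so that it does not depend on Poison. `FrameSketch` and `unpack/1` are hypothetical, and the 12-byte token length is borrowed from `Web.Tcp.Protocol`.

```elixir
# Hypothetical helper showing the "v:<token>:<base64>\n" framing.
defmodule FrameSketch do
  # json is an already-encoded JSON string (the post uses Poison.encode!/1).
  def pack(token, json), do: "v:#{token}:#{Base.encode64(json)}\n"

  # Reverses pack/2; returns :error for anything that does not fit the frame.
  def unpack("v:" <> <<token::binary-size(12)>> <> ":" <> payload) do
    with {:ok, json} <- payload |> String.trim_trailing("\n") |> Base.decode64() do
      {:ok, token, json}
    end
  end

  def unpack(_), do: :error
end

frame = FrameSketch.pack("abc123abc123", ~s([{"name":"speed","value":"10"}]))
IO.inspect(frame, label: "frame")
IO.inspect(FrameSketch.unpack(frame), label: "unpacked")
```

Base64 keeps the payload free of newlines, so the newline can safely act as the frame delimiter.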

  • on receive, the TCP handler registers the socket in the registry and starts a new client worker, which subscribes to the pubsub channel by API key and relays messages between pubsub and the TCP client.

defmodule Web.Tcp.Handler do
  require Logger

  alias Web.Pubsub

  @moduledoc """
  `Handler` expects lines separated by a newline (`\\n`). If the handler does
  not see a newline, it accumulates data until one arrives.

  `Registry.Sockets` contains api_key -> socket records for easy communication
  back from the dashboard page to TCP clients.
  """

  def start_link(ref, socket, transport, opts) do
    pid = spawn_link(__MODULE__, :init, [ref, socket, transport, opts])
    {:ok, pid}
  end

  def init(ref, socket, transport, _opts = []) do
    :ok = :ranch.accept_ack(ref)

    case transport.peername(socket) do
      {:ok, _peer} -> loop(socket, transport, "")
      {:error, reason} -> Logger.error("[tcp.handler] init receive error reason: #{inspect(reason)}")
    end
  end

  @timeout 5_000

  def loop(socket, transport, acc) do
    # Don't flood messages of transport, receive once and leave the remaining
    # data in socket until we run recv again.
    transport.setopts(socket, [active: :once])

    # before to proceed with receive block on messages we should call
    # once transport.messages() to ping ranch
    {ok, closed, error} = transport.messages()

    receive do
      # Pin the tags returned by transport.messages(); unpinned, the first
      # clause would match any three-element tuple.
      {^ok, socket, data} ->
        Logger.debug("[tcp.handler] received data: #{inspect(data)}")

        # _process/3 always ends by calling loop/3 itself, so there is no
        # extra loop call after it.
        (acc <> data)
        |> String.split("\n")
        |> Enum.map(&String.trim/1)
        |> _process(socket, transport)
      {^closed, socket} ->
        Logger.debug("[tcp.handler] closed socket: #{inspect(socket)}")
      {^error, socket, reason} ->
        Logger.error("[tcp.handler] socket: #{inspect(socket)}, closed because of the error reason: #{inspect(reason)}")
      {:error, reason} ->
        Logger.error("[tcp.handler] error: #{inspect(reason)}")
      {:EXIT, _parent, reason} ->
        Logger.error("[tcp.handler] exit parent reason: #{inspect(reason)}")
        _kill()
      message ->
        Logger.debug("[tcp.handler] message on receive block: #{inspect(message)}")
    end
  end

  defp _kill(), do: Process.exit(self(), :kill)

  defp _process([], socket, transport), do: loop(socket, transport, "")
  defp _process([""], socket, transport), do: loop(socket, transport, "")
  defp _process([line, ""], socket, transport) do
    _protocol(line, socket, transport)
    loop(socket, transport, "")
  end
  defp _process([line], socket, transport), do: loop(socket, transport, line)
  defp _process([line | lines], socket, transport) do
    _protocol(line, socket, transport)
    _process(lines, socket, transport)
  end

  defp _protocol(line, socket, transport) do
    Logger.debug("[_protocol] line: #{line}")

    case line |> Web.Tcp.Protocol.process do
      {:verified, api_key} ->
        _register_socket(api_key, socket, transport)

        Web.Tcp.ClientSupervisor.start_client(api_key)

        Logger.debug("[tcp.server] transport should respond with OK")

        case transport.send(socket, "OK\n") do
          {:error, reason} ->
            Logger.error(inspect(reason))
          _ ->
            :ok
        end
      {:error, reason} ->
        Logger.error("[tcp] #{inspect(reason)}")
      :error ->
        Logger.error("error on processing: #{inspect(line)}")
      _ ->
        :ok
    end
  end

  def _register_socket(api_key, socket, transport) do
    Logger.debug("[tcp.handler] _register_socket token: #{api_key}")

    case Registry.register(Registry.Sockets, api_key, {socket, transport}) do
      {:error, {:already_registered, _pid}} ->
        Registry.update_value(Registry.Sockets, api_key, fn (_) -> {socket, transport} end)
      {:error, reason} ->
        Logger.error("[tcp] reason: #{inspect(reason)}")
      _ ->
        :ok
    end
  end
end
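The buffering rule from the moduledoc (process complete lines, keep the unfinished tail for the next read) can be isolated into a pure function, which makes it easy to test without a socket. This is a sketch of the idea only. `LineBufferSketch.feed/2` is a hypothetical helper, not part of the project.

```elixir
# Hypothetical pure version of the accumulate-until-newline logic.
defmodule LineBufferSketch do
  # Returns {complete_lines, rest}, where rest is the unfinished tail
  # that should be passed back in as acc on the next call.
  def feed(acc, data) do
    parts = String.split(acc <> data, "\n")
    # String.split/2 always returns at least one element; the last one is
    # whatever came after the final newline (possibly "").
    {lines, [rest]} = Enum.split(parts, -1)
    {Enum.map(lines, &String.trim/1), rest}
  end
end

# The first chunk ends in the middle of a line...
{lines, rest} = LineBufferSketch.feed("", "v:abc123abc123\nl:abc123")
IO.inspect({lines, rest}, label: "first chunk")

# ...and the second chunk completes it.
IO.inspect(LineBufferSketch.feed(rest, "abc123:W10=\n"), label: "second chunk")
```

With this shape the receive loop only needs to call the protocol for each returned line and recurse with `rest` as the new accumulator.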

  • Web.Tcp.Protocol defines the parser and handler for the commands that a TCP client can send.

Example (the API key is 12 bytes long):

l:<api_key>:<base64 json messages>
i:<api_key>:<base64 json variables>
v:<api_key> - the client should pass its API key so the server can verify that the client is registered

defmodule Web.Tcp.Protocol do
  require Logger
  require Poison

  alias Web.Gateway
  alias Web.Db.Users

  @moduledoc """
    Server messages:

      - `l:api_key:logs`
        logs - should come as json array and encoded base64

      - `i:api_key:vars`
        vars - should come as json dictionary and encoded by base64

      - `v:api_key`
        api_key - should verify key using our registry

    Client messages:
      - `i:s:name:value` - var set by name value inside of app
  """
  def process("l:" <> <<api_key :: bytes-size(12)>> <> ":" <> logs) do
    Logger.debug("[protocol] api_key: #{api_key}, logs: #{inspect(logs)}")
    logs
    |> Base.decode64!
    |> Poison.decode!
    |> Gateway.logs(api_key)
  end

  def process("i:" <> <<api_key :: bytes-size(12)>> <> ":" <> vars) do
    Logger.debug("[protocol] api_key: #{api_key}, vars: #{inspect(vars)}")
    vars
    |> Base.decode64!
    |> Poison.decode!
    |> Gateway.vars(api_key)
  end

  def process("v:" <> <<api_key :: bytes-size(12)>>) do
    if Users.verify_key(api_key) do
      {:verified, api_key}
    else
      {:error, "not registered user with api_key: #{api_key}"}
    end
  end

  def process(_), do: :error
end
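Note that `Base.decode64!/1` and `Poison.decode!/1` raise on malformed input, which would crash the handler process for that connection. A hedged sketch of a non-raising variant of the same binary pattern matching is shown below. `ProtocolSketch` is hypothetical, it stops at the base64 step, and it returns tagged tuples instead of calling `Gateway` or `Users`, which are not shown in this post.

```elixir
# Hypothetical parser: same patterns as Web.Tcp.Protocol, no side effects.
defmodule ProtocolSketch do
  def parse("l:" <> <<api_key::binary-size(12)>> <> ":" <> payload),
    do: decode(:logs, api_key, payload)

  def parse("i:" <> <<api_key::binary-size(12)>> <> ":" <> payload),
    do: decode(:vars, api_key, payload)

  def parse("v:" <> <<api_key::binary-size(12)>>), do: {:verify, api_key}

  def parse(_), do: :error

  # Base.decode64/1 returns :error instead of raising on bad input.
  defp decode(kind, api_key, payload) do
    case Base.decode64(payload) do
      {:ok, json} -> {kind, api_key, json}
      :error -> :error
    end
  end
end

IO.inspect(ProtocolSketch.parse("v:abc123abc123"))
IO.inspect(ProtocolSketch.parse("l:abc123abc123:" <> Base.encode64("[]")))
IO.inspect(ProtocolSketch.parse("l:abc123abc123:***"))
```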

  • finally, a test case that checks the basic expected behaviour (tcp_test.ex):

defmodule Web.TcpTest do
  use ExUnit.Case
  doctest Web.Tcp

  alias Web.Db.Users

  require RedisPoolex
  require Logger
  require Poison
  require Tirexs.Query

  alias RedisPoolex, as: Redis

  setup do
    Redis.query(["FLUSHDB"])

    {:ok, api_key} = Users.register(%{email: "user1@example.com", password: "12345678"})

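    # tcp_port is stored as a string (the server converts it too), while
    # :gen_tcp.connect/3 needs an integer port
    port = :web |> Application.get_env(:tcp_port) |> String.to_integer
    host = "127.0.0.1" |> String.to_charlist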

    {:ok, socket} = :gen_tcp.connect(host, port, [active: false])
    {:ok, socket: socket, api_key: api_key}
  end

  test "should register user session", %{socket: socket, api_key: api_key} do
    :ok = :gen_tcp.send(socket, "v:" <> api_key <> "\n")
    {:ok, reply} = :gen_tcp.recv(socket, 0, 1000)
    # the socket is in list mode and the server replies with "OK\n"
    assert reply == 'OK\n'
  end

  test "should bulk insert logs on tcp request", %{socket: socket, api_key: api_key} do
    :ok = :gen_tcp.send(socket, "v:" <> api_key <> "\n")
    :ok = :gen_tcp.send(socket, "l:" <> api_key <> ":" <> ([%{message: "testing1", timestamp: 123123123}, %{message: "testing2", timestamp: 123123123}] |> Poison.encode! |> Base.encode64) <> "\n")

    :timer.sleep(2000)

    assert {:ok, 200, %{hits: %{hits: hits}}} = Tirexs.Query.create_resource([index: "logs-#{api_key}", search: ""])
    assert Enum.count(hits) == 2
  end

  # TODO: add marker support to create separate sessions on multiple devices.
  # we could have separate dashboards for the different devices.
  test "should store vars on tcp request", %{socket: socket, api_key: api_key} do
    :ok = :gen_tcp.send(socket, "v:" <> api_key <> "\n")
    :ok = :gen_tcp.send(socket, "i:" <> api_key <> ":" <> ([%{name: "testing1", type: "string", value: "example"}, %{name: "testing2", type: "integer", value: "-1"}] |> Poison.encode! |> Base.encode64) <> "\n")

    :timer.sleep(2000)

    vars = Redis.query ["HKEYS", "#{api_key}:vs"]
    assert Enum.count(vars) == 4
    assert vars == ["testing1:type", "testing1:value", "testing2:type", "testing2:value"]
  end
end
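To try the client side of the handshake without the whole application, a throwaway `:gen_tcp` listener is enough. The sketch below is not the project's server. It is a minimal stand-in that answers a single `v:` line with `OK`, and it opens both sockets in `:binary` mode with `packet: :line`, so replies arrive as whole lines in Elixir strings instead of charlists.

```elixir
# Throwaway listener on a random free port (port 0 lets the OS choose).
{:ok, listen} =
  :gen_tcp.listen(0, [:binary, packet: :line, active: false, reuseaddr: true])

{:ok, port} = :inet.port(listen)

# Stand-in server: accept one connection, expect "v:<key>\n", reply "OK\n".
server =
  Task.async(fn ->
    {:ok, conn} = :gen_tcp.accept(listen)
    {:ok, "v:" <> _api_key} = :gen_tcp.recv(conn, 0, 1_000)
    :ok = :gen_tcp.send(conn, "OK\n")
    :gen_tcp.close(conn)
  end)

# Client side, same calls as in the test above but in binary mode.
{:ok, socket} =
  :gen_tcp.connect(~c"127.0.0.1", port, [:binary, packet: :line, active: false])

:ok = :gen_tcp.send(socket, "v:abc123abc123\n")
{:ok, reply} = :gen_tcp.recv(socket, 0, 1_000)
Task.await(server)

IO.inspect(reply, label: "reply")
```

With `packet: :line` each `recv` returns one complete line including its trailing newline, so a test can compare against `"OK\n"` directly.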

In the next examples I will share the Pubsub module and the Gateway implementation; they should be pretty easy to review.

Everything was done for fun. If you use the same approaches, do so very carefully: I don't practice Elixir daily, so some parts may head in the wrong direction and could be improved.

Thank you for reading!

Top comments (1)

Will Ricketts

This is actually mega helpful for what I'm working on. Great writeup and thanks for taking the time to do it <3