loading...
gumi TECH Blog

MixとOTP 08: タスクとgen_tcp

gumitech profile image gumi TECH ・4 min read

本稿はElixir公式サイトの許諾を得て「Task and gen_tcp」の解説にもとづき、加筆補正を加えて、Erlangの:gen_tcpモジュールによりリクエストをどのように処理するかについてご説明します。また、Taskモジュールの使い方もご紹介します。

エコーサーバー

まずはエコーサーバーを実装して、TCPサーバーを起ち上げましょう。リクエストで受け取ったテキストを、レスポンスで送ります。少しずつ改善を重ねて目指すのは、監視を受けながら複数の接続が扱えるサーバーです。

TCPサーバーは、大きくつぎの手順を踏みます。

  1. ポートが使えるまで待ってソケットを保持します。
  2. そのポートにクライアントが接続するのを待って受け入れます。
  3. クライアントのリクエストを読み込んで、返すレスポンスを書き込みます。

これらの手順を実装しましょう。apps/kv_serverアプリケーションに移って、lib/kv_server.exを開きます。モジュールKVServerをつぎのように書き替えてください。

require Logger

defmodule KVServer do
  def accept(port) do
    # オプションの機能はつぎのとおり:
    #
    # 1. `:binary` - データをバイナリとして受け取る(リストでなく)
    # 2. `packet: :line` - データを1行ずつ受け取る
    # 3. `active: false` - データが受け取れるようになるまで`:gen_tcp.recv/2`を待たせる
    # 4. `reuseaddr: true` - リスナーが落ちたときアドレスを再利用できるようにする
    #
    {:ok, socket} =
      :gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true])
    Logger.info("Accepting connections on port #{port}")
    loop_acceptor(socket)
  end

  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)
    serve(client)
    loop_acceptor(socket)
  end

  defp serve(socket) do
    socket
    |> read_line()
    |> write_line(socket)

    serve(socket)
  end

  defp read_line(socket) do
    {:ok, data} = :gen_tcp.recv(socket, 0)
    data
  end

  defp write_line(line, socket) do
    :gen_tcp.send(socket, line)
  end
end

KVServerモジュールは、accept/1の呼び出しによりサーバーを起ち上げます。引数はポート番号で、渡す値は4040です。accept/1はまずポートを監視して、ソケットが使えるまで待ちます。つぎに呼び出すのはloop_acceptor/1です。ループしながら、クライアントの接続を待ち受けます。そして、接続があるたび呼び出すのはserve/1です。

serve/1もループ処理です。1行ずつソケットから読み込んでは、書き戻します。serve/1関数がこの処理の流れを、パイプ演算子|>で組み立てていることにご注目ください。パイプ演算子の左辺の値を右辺の関数が第1引数に受け取り、その戻り値はつぎのパイプ演算子によりさらに右辺の関数に渡されます。

socket |> read_line() |> write_line(socket)

通常の引数の書き方では関数の入れ子になる処理が、パイプ演算子を用いるとわかりやすく書けるのです。

write_line(read_line(socket), socket)

read_line/1はソケットの読み書きを、:gen_tcpモジュールの関数により実装しています。読み取りが:gen_tcp.recv/2、書き込みは:gen_tcp.send/2です。

serve/1はずっとループして実行され続けます。すると、この関数を呼び出したloop_acceptor/1の本体は末尾呼び出しにたどりつかないので、ループしなくてもよいのではないでしょうか。けれど、あとでserve/1を別のプロセスに分けるため、末尾呼び出しが必要になります。

これでエコーサーバーはできました。kv_serverアプリケーションの中でIExのセッションをiex -S mixで起動してください。そして、KVServer.accept/1をつぎのように呼び出します。

iex> KVServer.accept(4040)

00:00:00.000 [info]  Accepting connections on port 4040

サーバーが起ち上がると接続待ちの状態になります。ここでは、Telnetクライアントを使ってサーバーにアクセスしましょう。コマンドラインツールでつぎのように入力してください。他のクライアントでも、入力するコマンドラインはほぼ同じです。

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.

エコーサーバーは入力待ちになります。キー入力して[enter]キーを押せば、そのテキストがつぎの行に表示されるはずです。

hello
hello
world
world

Telnetを終了するには、[control]/[Ctrl] + ]でコマンドモードに切り替え、quitに続けて[enter]を入力してください(終了の仕方はクライアントにより異なります)。Telnetが閉じると、IExのセッションにはつぎのようなエラーが表れるでしょう。:gen_tcp.recv/2がデータを受け取ろうとしたところ、クライアントの接続が閉じてしまったからです。この問題は、あとでサーバーを改定するときに扱うことにします。

** (MatchError) no match of right hand side value: {:error, :closed}
    (kv_server) lib/kv_server.ex:33: KVServer.read_line/1
    (kv_server) lib/kv_server.ex:26: KVServer.serve/1
    (kv_server) lib/kv_server.ex:20: KVServer.loop_acceptor/1

差し当たり、先に直さなければならないのは、TCPアクセプタがクラッシュした場合の問題です。サーバーにはまだ監視プロセスがありません。すると、サーバーが落ちたら再起動されないので、そのあとリクエストは扱えなくなります。ですから、サーバーは監視ツリーに移さなければなりません。

タスク

AgentGenServer、そしてSupervisorは、いずれも複数のメッセージを扱い、状態が管理できました。けれど、やりたいことが簡単なタスクのときは、Taskモジュールが使えます。

たとえば、引数に渡した無名関数を新しいプロセスの中で実行し、監視ツリーに加えるのがstart_link/1関数です。lib/kv_server/application.exを開いて、start/2関数につぎのように書き加えてください。いつもは、子のリストに2要素のタプルを渡します。その代わりに、Task.start_link/1をタスクとして呼び出すのです。

def start(_type, _args) do
  children = [
    {Task, fn -> KVServer.accept(4040) end}  # 追加
  ]

  opts = [strategy: :one_for_one, name: KVServer.Supervisor]
  Supervisor.start_link(children, opts)
end

これで、サーバーが監視ツリーに含められ、アプリケーションとともに起動します。アプリケーションは、mix runコマンドでつぎのように起ち上げてください。

$ mix run --no-halt

00:00:00.000 [info]  Accepting connections on port 4040
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
say you
say you
say me
say me

上のコードでは、ポート番号は直打ちしました。たとえば、つぎのように書き替えると、アプリケーションが起ち上がるとき、ポート番号をシステム環境から読み取ります。

port = String.to_integer(System.get_env("PORT") || "4040")
children = [
  # {Task, fn -> KVServer.accept(4040) end}
  {Task, fn -> KVServer.accept(port) end}
]

ポート番号は、アプリケーションを起動するmix runコマンドにつぎのように変数で加えてください。

$ PORT=4321 mix run --no-halt

前掲ポートの設定にはデフォルト値(4040)を与えました。ですから、ポート番号を省くと、その番号で接続を待ちます。

$ mix run --no-halt

00:00:00.000 [info]  Accepting connections on port 4040

Telnetクライアントを増やすとどうでしょう。ひとつ目のクライアントを接続したまま、もうひとつTelnetクライアントを開いてみてください。ふたつ目のクライアントは、エコーが示されません。

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello?
HELLOOOOOO?

これは、リクエストを同じプロセスで扱っているからです。ひとつのクライアントが接続されると、つながっている間は、別のクライアントは受け入れられません。

タスクスーパーバイザー

サーバーが同時接続を扱えるようにするには、ひとつのプロセスをアクセプタとして、別につくったプロセスにリクエストを処理させなければなりません。ひとつ思いつくのは、KVServer. loop_acceptor/1Task.start_link/1を使うように書き替えることです。これで、アクセプタのプロセスからリンクしたタスクが開始します。

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  # serve(client)
  Task.start_link(fn -> serve(client) end)
  loop_acceptor(socket)
end

けれど、serve(client)のタスクをアクセプタにリンクしてしまうのはいただけません。リクエストの処理が落ちると、アクセプタまで巻き込んで、その結果すべての接続が切れてしまうことになるからです。

そうならないための方策は、プロセスごとにスーパーバイザーを設けることでした(「MixとOTP 05: ダイナミックスーパーバイザー」参照)。暫定のタスクをスーパーバイザーに開始させ、監視ツリーに加えるのです。KVServer.Applicationstart/2関数を改めてつぎのように書き替えましょう。

def start(_type, _args) do
  port = String.to_integer(System.get_env("PORT") || "4040")

  children = [
    {Task.Supervisor, name: KVServer.TaskSupervisor},  # 追加
    {Task, fn -> KVServer.accept(port) end}
  ]

  opts = [strategy: :one_for_one, name: KVServer.Supervisor]
  Supervisor.start_link(children, opts)
end

これで、Task.SupervisorKVServer.TaskSupervisorという名前で起動します。アクセプタはこのスーパーバイザーに依存することにお気をつけください。つまり、スーパーバイザーは先に開始しなければならないのです。そこで、KVServerloop_acceptor/1は、Task.Supervisorがリクエストを扱うように書き替えます。

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  # serve(client)
  {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)  # 追加
  :ok = :gen_tcp.controlling_process(client, pid)  # 追加
  loop_acceptor(socket)
end

:gen_tcp.controlling_process/2の呼び出しに気づいたでしょうか。これは子プロセスをクライアントソケットの「制御プロセス」にします。そうしないと、アクセプタがクラッシュしたとき、すべてのクライアントを落としてしまいます。ソケットは、受けつけられたプロセスに結びついているからです(これはデフォルトの動作です)。

改めてサーバーを起ち上げると、Telnetクライアントが同時にいくつも開けます。ひとつのクライアントを閉じてもアクセプタは落ちないので、他のクライアントの接続は生きているでしょう。

$ mix run --no-halt

00:00:00.000 [info]  Accepting connections on port 4040

スーパーバイザーが加わり、監視戦略は正しく定められました。アクセプタがクラッシュしても、すでにある接続を落とすことはありません。また、タスクスーパーバイザーがクラッシュしたとき、アクセプタを落とす必要もないでしょう。

もうひとつ考えなければならないのは再起動戦略です。タスクはデフォルトでは:restartの値が:temporaryに設定されています。これは再起動しないということです。Task.Supervisorが開始した接続については適切といえます。失敗した接続を再開する意味はないからです。けれど、アクセプタが落ちたら、再起動させるべきでしょう。

ひとつのやり方は、新たに定義するモジュールでuse Taskを使い、restart: :permanentとしてstart_link関数を呼び出すことにより、タスクを再起動することです。けれど、他の開発者のライブラリと統合するときは、エージェント、タスク、およびサーバーの定義方法は変えられません。そこで、子プロセスの仕様を動的にカスタマイズすることにしましょう。そのためには、KVServer.Applicationstart/2Supervisor.child_spec/2を呼び出すように書き替えます。

def start(_type, _args) do
  port = String.to_integer(System.get_env("PORT") || "4040")

  children = [
    {Task.Supervisor, name: KVServer.TaskSupervisor},
    # {Task, fn -> KVServer.accept(port) end}
    Supervisor.child_spec({Task, fn -> KVServer.accept(port) end}, restart: :permanent)
  ]

  opts = [strategy: :one_for_one, name: KVServer.Supervisor]
  Supervisor.start_link(children, opts)
end

Supervisor.child_spec/2は、モジュールやタプルから子の仕様がつくれます。さらに、子の仕様をオーバーライドすることもできるのです。これで、アクセプタはつねに動作します。そして、一時的なタスクプロセスを、つねに動作しているスーパーバイザーのもとで開始できるのです。以下にKVServer.ApplicationKVServerの定義を掲げましょう。

defmodule KVServer.Application do
  @moduledoc false

  use Application

  def start(_type, _args) do
    port = String.to_integer(System.get_env("PORT") || "4040")

    children = [
      {Task.Supervisor, name: KVServer.TaskSupervisor},
      Supervisor.child_spec({Task, fn -> KVServer.accept(port) end}, restart: :permanent)
    ]

    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
require Logger

defmodule KVServer do
  @doc """
  定められた`port`で接続の受け入れを始める。
  """
  def accept(port) do
    {:ok, socket} =
      :gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true])
    Logger.info("Accepting connections on port #{port}")
    loop_acceptor(socket)
  end

  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)
    {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
    :ok = :gen_tcp.controlling_process(client, pid)
    loop_acceptor(socket)
  end

  defp serve(socket) do
    socket
    |> read_line()
    |> write_line(socket)

    serve(socket)
  end

  defp read_line(socket) do
    {:ok, data} = :gen_tcp.recv(socket, 0)
    data
  end

  defp write_line(line, socket) do
    :gen_tcp.send(socket, line)
  end
end

MixとOTPもくじ

Posted on by:

gumitech profile

gumi TECH

@gumitech

gumi TECH は、株式会社gumiのエンジニアによる技術記事公開やDrinkupイベントなどの技術者交流を行うアカウントです。 gumi TECH Blog: http://dev.to/gumi / gumi TECH Drinkup: http://gumitech.connpass.com

gumi TECH Blog

株式会社gumiのエンジニアによる技術記事を公開しています。

Discussion

pic
Editor guide