loading...
gumi TECH Blog

MixとOTP 10: 分散処理のタスクと設定

gumitech profile image gumi TECH ・6 min read

本稿はElixir公式サイトの許諾を得て「Distributed tasks and configuration」の解説にもとづき、加筆補正を加えて、タスクの分散処理とアプリケーションの設定についてご説明します。

今回は、kvアプリケーションにルーティングのレイヤーを加えます。リクエストをプロセス名にもとづいてノード間でやり取りできるようにするのです。ルーティングレイヤーは、つぎのような形式でルーティングテーブルを受け取ります。

[
  {?a..?m, :"foo@computer-name"},
  {?n..?z, :"bar@computer-name"}
]

ルータはプロセス名の最初のバイトをテーブルで確かめ、それにもとづいて適切なノードに配信します。たとえば、文字"a"で始まるプロセスは、ノードfoo@computer-nameに送られます。なお、?aは文字"a"のUnicodeコードポイントです(「Elixir入門 06: バイナリと文字列および文字リスト」「UTF-8とUnicode」参照)。

マッチングするエントリーがリクエストを評価しているノードに当たるときは、ルーティングはせず、そのノードが要求された操作を実行します。マッチングするエントリーが別のノードを示すときは、要求をそのノードに渡します。ノードは自身のルーティングテーブル(はじめのノードとは異なるでしょう)を見て、それに応じて動作します。マッチングするエントリーがなければエラーです。

ルーティングテーブルで見つかったノードに、要求された操作を直に実行はさせません。ルーティングの要求をノードに渡して処理するのです。上記のような単純なルーティングテーブルであれば、すべてのノードで共有してもよいでしょう。けれど、ルーティングの要求を渡すかたちにすれば、アプリケーションが拡大したとき、ルーティングテーブルが簡単に小さく分けられます。ある程度大きくなったら、foo@computer-nameはプロセスに要求をルーティングさせる役割だけを担い、要求を扱うプロセスが他のノードに送られることになるでしょう。そうすれば、bar@computer-nameは変更については知らずに済みます。

本稿では、ひとつのマシンでふたつのノードを使います。同じネットワークで、複数のマシンを使うことも可能です。その場合には準備が要ります。第1に、すべてのマシンにファイル~/.erlang.cookieがあり、その値はまったく同じでなければなりません。第2に、epmdがブロックされていないポートで動いていることを確かめます(epmd -dでデバッグ情報が得られます)。さらに詳しくは、「Learn You Some Erlang for Great Good!」の「Distribunomicon」の章をお読みください。

分散処理のコードを書く

Elixirには、ノードに接続して互いに情報をやり取りする機能が備わっています。実際、分散環境でプロセスやメッセージの送受信には同じ考え方が用いられます。Elixirのプロセスは、場所を問わないからです。つまり、メッセージを送るとき、受け手のプロセスが同じノードかどうかは問いません。VMはどちらであってもメッセージを届けます。

分散処理のコードを実行するには、VMに名前をつけて起動しなければなりません。名前は短く(同じネットワークの場合)も長く(完全なコンピュータアドレス)もできます。IExをつぎのように開始してください。

$ iex --sname foo

プロンプトの表示が少し変わります。ノード名のあとに@つきで示されるのはコンピュータ名です。

Erlang/OTP 21 [erts-10.0.5] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe] [dtrace]

Interactive Elixir (1.7.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(foo@computer-name)1>

シェルでモジュールをつぎのように定めます。

iex> defmodule Hello do
...>   def world, do: IO.puts "hello world"
...> end
{:module, Hello,
 <<70, 79, 82, 49, 0, 0, 4, 60, 66, 69, 65, 77, 65, 116, 85, 56, 0, 0, 0, 140,
   0, 0, 0, 15, 12, 69, 108, 105, 120, 105, 114, 46, 72, 101, 108, 108, 111, 8,
   95, 95, 105, 110, 102, 111, 95, 95, 7, ...>>, {:world, 0}}

同じネットワークの別のコンピュータにErlangとElixirがインストールされていれば、別のシェルを起動できます。そうでない場合は、IExのセッションを別のターミナルから開けばよいでしょう。どちらでも、つぎのように短い別の名前をつけます。

$ iex --sname bar

新たな別セッションからは、先ほど定めたモジュールの関数は呼び出せません。

iex(bar@computer-name)> Hello.world
** (UndefinedFunctionError) function Hello.world/0 is undefined (module Hello is not available)
    Hello.world()

けれど、Node.spawn_link/2を用いて、bar@computer-nameからfoo@computer-nameに新たなプロセスがつくれます(ノード名の中のcomputer-nameはプロンプトに示された名前に置き替えてください)。

iex(bar@computer-name)> Node.spawn_link :"foo@computer-name", fn -> Hello.world end             
hello world
#PID<10577.117.0>

Elixirは別ノードにプロセスをつくり、そのPIDが返されました。そして、コードは関数が定められている別ノードで実行され、関数は呼び出されたのです。関数の出力が示されたのは、現在のノードで、別ノードではないことにご注目ください。つまり、メッセージは別ノードから送り返され、それが現ノードに出力されたということです。これは別ノードにつくられたプロセスが、現ノードと同じグループリーダーをもつからです(「Elixir入門 12: 入出力とファイルシステム」「プロセスとグループリーダー」参照)。

Node.spawn_link/2が返すPIDを使えば、メッセージを送って、さらに受け取れます。

iex(bar@computer-name)> pid = Node.spawn_link :"foo@computer-name", fn ->
...(bar@computer-name)>   receive do
...(bar@computer-name)>     {:ping, client} -> send client, :pong
...(bar@computer-name)>   end
...(bar@computer-name)> end
#PID<10577.119.0>
iex(bar@computer-name)> send pid, {:ping, self()}
{:ping, #PID<0.106.0>}
iex(bar@computer-name)> flush()
:pong
:ok

分散処理を行うには、処理のたびにNode.spawn_link/2でリモートノードにプロセスを生成すればよいということがわかりました。けれども、監視ツリーの外にプロセスをつくるのは、できるだけ避けるべきです。今回の実装でNode.spawn_link/2を用いるより望ましいやり方は3つ考えられます。

  1. Erlangの:rpcモジュールを使うと、リモートノードの関数が実行できます。たとえば、シェルから:rpc.call(:"foo@computer-name", Hello, :world, [])とすれば、別ノードの関数Hello.world/0が呼び出せます。
  2. GenServerのAPIにより他のノードでサーバーを起ち上げれば、要求が送れます。たとえば、GenServer.call({name, node}, arg)によりリモートノードのサーバーが呼び出せます。第1引数はリモートプロセスのPIDにしても構いません。
  3. Taskを用いて、ローカルとリモートの両ノードに生成することもできます(「MixとOTP 08: タスクとgen_tcp」参照)。

:rpcGenServerを使うと、要求はひとつのサーバーにシリアライズされます。他方、Taskはリモートノードで効率的に非同期処理されます。シリアライズされるポイントは、スーパーバイザーによる生成だけです。今回のルーティングレイヤーでは、Taskを用いることにします。けれど、他のやり方でも問題はありません。

async/await

これまでは、開始したTaskは単独で実行しました。また、戻り値も確かめていません。しかし、タスクで値を処理し、あとで結果を見ると役立つ場合があります。そのために、Taskに備わっているのがasync/awaitバターンです。

async/awaitは、値を同時に処理するシンプルな仕組みです。それだけでなく、async/awaitを同じTask.Supervisorで使うこともできます(「MixとOTP 08: タスクとgen_tcp」「タスクスーパーバイザー」参照)。Task.Supervisor.start_child/3の替わりにTask.Supervisor.async/3を呼び出して、あとからTask.await/2で結果を読み取るだけです。

task = Task.async(fn -> compute_something_expensive end)
res  = compute_something_else()
res + Task.await(task)

タスクの分散処理

タスクの分散処理は、タスクの監視と基本的に変わりません。違いは、スーパーバイザーにタスクをつくるとき、ノード名を渡すことです。:kvアプリケーションのlib/kv/supervisor.exを開いて、init/1childrenリストの最後につぎのようにTask.Supervisorを加えてください。

def init(:ok) do
  children = [
    {DynamicSupervisor, name: KV.BucketSupervisor, strategy: :one_for_one},
    {KV.Registry, name: KV.Registry},
    {Task.Supervisor, name: KV.RouterTasks}  # 追加
  ]
  Supervisor.init(children, strategy: :one_for_all)
end

改めて、名前つきのノードを起動します。ただし、:kvアプリケーションのディレクトリから開いてください。

$ cd apps/kv
$ iex --sname foo -S mix

もうひとつのノードについても同様です。

$ iex --sname bar -S mix

これで、スーパーバイザーによりひとつのノードから、もうひとつのノードに直接タスクがつくれます。つぎの分散処理タスクは、タスクが実行されているノード名を取得します。

iex(bar@computer-name)> task = Task.Supervisor.async {KV.RouterTasks, :"foo@computer-name"}, fn ->
...(bar@computer-name)>   {:ok, node()}
...(bar@computer-name)> end
%Task{
  owner: #PID<0.140.0>,
  pid: #PID<15195.164.0>,
  ref: #Reference<0.1065362951.4240965635.67142>
}
iex(bar@computer-name)> Task.await(task)
{:ok, :"foo@computer-name"}

上のコードではTask.Supervisor.async/3に無名関数を与えました。けれど、分散処理ではモジュールと関数および引数を明示的に与える方が望ましいです。無名関数では、ターゲットノードが呼び出しもととまったく同じコードバージョンをもたなければなりません。Task.Supervisor.async/4を用いれば、引数に渡したモジュールにアリティの合致する関数があればよいので、より堅牢といえます。

iex(bar@computer-name)> task = Task.Supervisor.async {KV.RouterTasks, :"foo@computer-name"}, Kernel, :node, []   
%Task{
  owner: #PID<0.140.0>,
  pid: #PID<15195.165.0>,
  ref: #Reference<0.1065362951.4240965635.67197>
}
iex(bar@computer-name)> Task.await(task)
:"foo@computer-name"

ルーティングレイヤー

ファイルlib/kv/router.exにルーターのモジュールKV.Routerをつぎのように定めます。なお、computer-nameはローカルマシン名に書き替えてください。

defmodule KV.Router do
  @doc """
  与えられた`mod`の`fun`に`args`が渡された要求を
  プロセス`bucket`にもとづいて適切なノードに送る。
  """
  def route(bucket, mod, fun, args) do
    # バイナリの最初のバイトを得る
    first = :binary.first(bucket)

    # table()からエントリーを探してなければエラー
    entry =
      Enum.find(table(), fn {enum, _node} ->
        first in enum
      end) || no_entry_error(bucket)

    # エントリーが現ノードの場合
    if elem(entry, 1) == node() do
      apply(mod, fun, args)
    else
      {KV.RouterTasks, elem(entry, 1)}
      |> Task.Supervisor.async(KV.Router, :route, [bucket, mod, fun, args])
      |> Task.await()
    end
  end

  defp no_entry_error(bucket) do
    raise "could not find entry for #{inspect bucket} in table #{inspect table()}"
  end

  @doc """
  ルーティングテーブル
  """
  def table do
    # computer-nameはローカルマシン名に置き替える
    [{?a..?m, :"foo@computer-name"}, {?n..?z, :"bar@computer-name"}]
  end
end

ルーターの動作をテストで確かめましょう。test/kv/router_test.exsに、つぎのふたつのテストを書きます。

defmodule KV.RouterTest do
  use ExUnit.Case, async: true

  test "route requests across nodes" do
    assert KV.Router.route("hello", Kernel, :node, []) ==
           :"foo@computer-name"
    assert KV.Router.route("world", Kernel, :node, []) ==
           :"bar@computer-name"
  end

  test "raises on unknown entries" do
    assert_raise RuntimeError, ~r/could not find entry/, fn ->
      KV.Router.route(<<0>>, Kernel, :node, [])
    end
  end
end

はじめのテストでは、Kernel.node/0の呼び出しにより、実行ノード名を得ます。プロセス名として“hello”と“world”を渡しているので、ルーティングテーブルからそれぞれfoo@computer-namebar@computer-nameが返されるでしょう。

ふたつ目のテストは、知らないエントリに対してエラーが起こるかどうか確かめます。

はじめのテストを走らせるには、ノードがふたつ実行されていなければなりません。ディレクトリapps/kvに移って、テストに使われるもうひとつのノードをつぎのように起動してください。

$ iex --sname bar -S mix

そして、もとのノードから、つぎのようにテストを実行すると、正しくとおるはずです。

$ elixir --sname foo -S mix test

テストのフィルタとタグ

前項のテストはとおりました。けれど、テストの構成はもっと込み入ってくるでしょう。実際今も、mix testで試すと、失敗してしまいます。他のノードへの接続を求めるテストが含まれているからです。

1) test route requests across nodes (KV.RouterTest)
    test/kv/router_test.exs:4
    ** (exit) exited in: GenServer.call({KV.RouterTasks, :"foo@computer-name"}, {:start_task, [#PID<0.169.0>, :monitor, {:nonode@nohost, #PID<0.169.0>}, {KV.Router,:route, ["hello", Kernel, :node, []]}], :temporary, nil}, :infinity)
        ** (EXIT) no connection to foo@computer-name
    code: assert KV.Router.route("hello", Kernel, :node, []) ==
    stacktrace:
      (elixir) lib/gen_server.ex:924: GenServer.call/3
      (elixir) lib/task/supervisor.ex:377: Task.Supervisor.async/6
      (kv) lib/kv/router.ex:21: KV.Router.route/4
      test/kv/router_test.exs:5: (test)

幸いにも、ExUnitにはテストにタグづけする機能が備わっています。特定のコールバックだけ実行したり、タグにもとづいてフィルタリングすることもできるのです。なお、:capture_logのように予めExUnitに定められているタグもあります(「MixとOTP 09: DocTestとwithのパターンマッチング」「コマンドを実行する」参照)。

そこで、test/kv/router_test.exsにつぎのようにタグづけしましょう。@tag :distributed@tag distributed: trueと書くのと同じです。

@tag :distributed  # 追加
test "route requests across nodes" do
  assert KV.Router.route("hello", Kernel, :node, []) ==
          :"foo@computer-name"
  assert KV.Router.route("world", Kernel, :node, []) ==
          :"bar@computer-name"
end

テストに正しくタグづけしてあれば、ネットワークにノードがあるかどうかNode.alive?/0でテストのときに確かめればよいでしょう。なければ、分散処理のテストを省きます。そのために、:kvアプリケーション(apps/kv)のtest/test_helper.exsにつぎのように書き加えてください。

exclude =
  if Node.alive?, do: [], else: [distributed: true]  # 追加

# ExUnit.start()
ExUnit.start(exclude: exclude)

これで、mix testが失敗なく走るようになります。ExUnitが分散処理のテストひとつを省くからです。

$ mix test
Excluding tags: [distributed: true]

......

Finished in 2.0 seconds
7 tests, 0 failures, 1 excluded

ノードbar@computer-nameが使えるときは、つぎのようにテストすると分散処理も含めてとおるでしょう。

$ elixir --sname foo -S mix test
.......

Finished in 2.0 seconds
7 tests, 0 failures

mix testコマンドには、タグを動的に含めたり、除いたりすることができます。たとえば、mix test --include distributedと入力すれば、test/test_helper.exsの設定にかかわらず、分散処理のテストが含められるのです。あるいは、コマンドラインに--excludeを渡すと、特定のタグが除けます。さらに、つぎのように--onlyを用いて、特定タグのテストだけ実行することもできます。

$ elixir --sname foo -S mix test --only distributed
Including tags: [:distributed]
Excluding tags: [:test]

.

Finished in 0.06 seconds
7 tests, 0 failures, 6 excluded

フィルタについて詳しくは、ExUnit.Caseモジュールの「Filters」をお読みください。

アプリケーション環境と設定

ルーティングテーブルはKV.Routerにつぎのように直打ちしてありました。このテーブルを動的にしましょう。そうすれば、開発とテストあるいはプロダクションの設定だけでなく、異なるエントリで動いている異なるノードをルーティングテーブルに加えることもできるようになるのです。OTPにはまさにその機能があります。それがアプリケーション環境です。

def table do
  # computer-nameはローカルマシン名に置き替える
  [{?a..?m, :"foo@computer-name"}, {?n..?z, :"bar@computer-name"}]
end

環境はアプリケーションごとにあり、固有の設定がキーに与えられます。たとえば、ルーティングテーブルを:kvアプリケーション環境に納めることができます。これがデフォルト値となり、他のアプリケーションは必要に応じてテーブルが変えられるのです。

apps/kv/mix.exsを開いて、関数application/0の戻り値につぎのように書き加えてください。アプリケーションに与えたのはキー:envです。これがアプリケーションのデフォルト環境となります。エントリーがキー:routing_tableで、値は空のリストです。これで、アプリケーション環境は出荷時に空のテーブルとなります。使われるテーブルは、テストと開発の構成によって変わります。

def application do
  [
    extra_applications: [:logger],
    env: [routing_table: []],  # 追加
    mod: {KV, []}
  ]
end

アプリケーション環境をコードで使うには、KV.Router.table/0の定めをつぎのように書き替えてください。Application.fetch_env!/2は、:routing_table:kvの環境の中のエントリーを読み込みます。アプリケーション環境の操作について詳しくは、「Application behaviour」をご参照ください。

def table do
  # [{?a..?m, :"foo@computer-name"}, {?n..?z, :"bar@computer-name"}]
  Application.fetch_env!(:kv, :routing_table)
end

ルーティングテーブルが空になったので、分散処理のテストは失敗するはずです。改めてテストを実行して確かめましょう。

$ iex --sname bar -S mix
$ elixir --sname foo -S mix test --only distributed
1) test route requests across nodes (KV.RouterTest)
    test/kv/router_test.exs:5
    ** (RuntimeError) could not find entry for "hello" in table []
    code: assert KV.Router.route("hello", Kernel, :node, []) ==
    stacktrace:
      (kv) lib/kv/router.ex:27: KV.Router.no_entry_error/1
      (kv) lib/kv/router.ex:14: KV.Router.route/4
      test/kv/router_test.exs:6: (test)

アプリケーション環境のよいところは、現行のアプリケーションだけでなく、アプリケーション全体にも設定できることです。全体の設定は、config/config.exsで行います。たとえば、apps/kv/config/config.exsを開いて、つぎのコードを最後に加えてください。これは、IExのデフォルトプロンプトの定めです。

config :iex, default_prompt: ">>>"
$ iex -S mix
Erlang/OTP 21 [erts-10.0.5] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe] [dtrace]

Compiling 5 files (.ex)
Generated kv app
Interactive Elixir (1.7.3) - press Ctrl+C to exit (type h() ENTER for help)
>>>

同じように、apps/kv/config/config.exsファイルに:routing_tableをつぎのように定めることができます。

# computer-nameはローカルマシン名に置き替える
config :kv, :routing_table, [{?a..?m, :"foo@computer-name"}, {?n..?z, :"bar@computer-name"}]

改めてノードを起動して分散処理をテストすれば、今度はとおるはずです。

:kvアプリケーションはアンブレラプロジェクトの中のひとつでした。Elixir v1.2から、アンブレラアプリケーションはconfig/config.exsの設定を共有します。プロジェクトルートのconfig/config.exsのつぎの記述により、すべての子の設定が読み込まれるからです。

import_config "../apps/*/config/config.exs"

mix runコマンドは--configフラグを加えると、必要に応じた設定ファイルが与えられます。異なるノードを起動して、それぞれに異なる設定ができるのです(たとえば、ルーティングテーブルを変えるなど)。

アプリケーション設定のために組み込まれた機能により、アンブレラアプリケーションをデプロイするときにさまざまなやり方が考えられます。

  • アンブレラアプリケーションをノードにデプロイして、TCPサーバーとキー/値データ保存の両方に使う
  • :kv_serverアプリケーションをデプロイしてTCPサーバーにのみ使い、ルーティングテーブルには他のノードの指定のみさせる
  • :kvアプリケーションだけをデプロイして、ノードはデータ保存にのみ使う(TCPアクセスはしない)

今後アプリケーションが増えても、デプロイの粒度は同じくらいのレベルに保つことができます。プロダクションのとき、どのアプリケーションをどの設定にするか検討してください。

複数のリリースをビルドするときは、Distilleryのようなツールを使うことも考えられるでしょう。アプリケーションと設定を選んで、現行のErlangとElixirも含めてパッケージにできます。そうすれば、ターゲットシステムにランタイムがインストールされていなくても、アプリケーションがデプロイできるのです。

今回のようなキー/値データ保存の分散処理をプロダクションで使うときには、Riakがお勧めです。RiakもErlang VMで動作します。子プロセスは複製され、データを失うことが避けられます。ルーターの替わりにコンシステントハッシュ法を用いて、プロセスはノードにマップされるのです。このりアルゴリズムは、プロセスを保存するノードが新たに加わって、データを移行しなければならなくなっても、その量を抑えるのに役立ちたます。

MixとOTPもくじ

Posted on by:

gumitech profile

gumi TECH

@gumitech

gumi TECH は、株式会社gumiのエンジニアによる技術記事公開やDrinkupイベントなどの技術者交流を行うアカウントです。 gumi TECH Blog: http://dev.to/gumi / gumi TECH Drinkup: http://gumitech.connpass.com

gumi TECH Blog

株式会社gumiのエンジニアによる技術記事を公開しています。

Discussion

pic
Editor guide