loading...
gumi TECH Blog

MixとOTP 05: ダイナミックスーパーバイザー

gumitech profile image gumi TECH Updated on ・3 min read

本稿はElixir公式サイトの許諾を得て「Dynamic supervisors」の解説にもとづき、加筆補正を加えて、ElixirにおけるDynamicSupervisorの使い方についてご説明します。

MixとOTP 03: GenServer」「モニターとリンク」では、モジュールのhandle_cast/2がプロセスのリンクとモニターをともに実装しているのは適切でないと指摘しました。

{:ok, pid} = KV.Bucket.start_link([])
ref = Process.monitor(pid)

リンクは双方向です。つまり、プロセスがクラッシュすると、それを登録したプロセスも落ちてしまうということです。もっとも、スーパーバイザーが加わりました。登録のプロセスを回復して再起動することはできます。けれども、登録のプロセスがクラッシュすると、そこに登録されたプロセスそれぞれの名前に関連づけたデータはすべて失われてしまうのです。

ですから、登録プロセスは、加えたプロセスが落ちても、動かし続けなければならないということです。test/kv/registry_test.exsに以下のようなテストを書き加えましょう。

プロセスを止めるテストはすでにひとつありました("removes buckets on exit")。このテストが異なるのは、Agent.stop/3の第2引数の理由に正常(デフォルト値:normal)でない:shutdownを渡したことです。プロセスが正常な理由で終わらなかったとき、リンクされたすべてのプロセスはEXIT信号を受け取ります。すると、リンクされたプロセスも、それを避ける処理が加えられていないかぎり、終了することになるのです。

test "removes bucket on crash", %{registry: registry} do
  KV.Registry.create(registry, "shopping")
  {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

  # プロセスを正常でない理由で止める
  Agent.stop(bucket, :shutdown)
  assert KV.Registry.lookup(registry, "shopping") == :error
end  

このテストは失敗します。エラーメッセージに示されたとおり、KV.Registry.lookup/2からGenServer.call/3を呼びにいったところで、登録のプロセスが失われてしまったためです。

$ mix test test/kv/registry_test.exs
..

  1) test removes bucket on crash (KV.RegistryTest)
     test/kv/registry_test.exs:26
     ** (exit) exited in: GenServer.call(#PID<0.153.0>, {:lookup, "shopping"}, 5000)
         ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
     code: assert KV.Registry.lookup(registry, "shopping") == :error
     stacktrace:
       (elixir) lib/gen_server.ex:924: GenServer.call/3
       test/kv/registry_test.exs:32: (test)



Finished in 0.03 seconds
3 tests, 1 failure

Randomized with seed 221887

この問題を解決するために、新しいスーパーバイザーによりプロセスの生成や監視を行います。これまで用いたスーパーバイザーと異なり、子のプロセスを予め定めなくてよく、動的につくって開始できるのです。こうしたときに使うのがDynamicSupervisorで、初期化のとき子プロセスのリストは要りません。その代わり、子プロセスはそれぞれDynamicSupervisor.start_child/2で始めることになります。

DynamicSupervisorを使う

lib/kv/supervisor.exKV.SupervisorモジュールでDynamicSupervisorを用います。init/1で定める子プロセスのリストに、つぎのようにKV.BucketSupervisorという名前で加えてください。

def init(:ok) do
  children = [
    {KV.Registry, name: KV.Registry},
    {DynamicSupervisor, name: KV.BucketSupervisor, strategy: :one_for_one}  # 追加
  ]

  Supervisor.init(children, strategy: :one_for_one)
end

Supervisorと異なり、DynamicSupervisoruse/2でモジュールを呼び出しません。監視ツリーに加えることにより直接開始したのです。初期化のときに子のプロセスを予め与えておかなくて済みます。

iex -S mixDynamicSupervisorを試してみましょう。DynamicSupervisor.start_child/2のふたつの引数には、スーパーバイザーの名前と開始する子プロセスの仕様を渡します。

iex> {:ok, bucket} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
{:ok, #PID<0.144.0>}
iex> KV.Bucket.put(bucket, "eggs", 3)
:ok
iex> KV.Bucket.get(bucket, "eggs")
3

lib/kv/registry.exの登録のモジュールKV.Registryhandle_cast/2につぎのような手を加えます。これで、DynamicSupervisorが使えるようになるのです。

def handle_cast({:create, name}, {names, refs}) do
  if Map.has_key?(names, name) do
    {:noreply, {names, refs}}
  else
    # {:ok, pid} = KV.Bucket.start_link([])
    {:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
    ref = Process.monitor(pid)
    refs = Map.put(refs, ref, name)
    names = Map.put(names, name, pid)
    {:noreply, {names, refs}}
  end
end

これでテストはとおります。けれど、アプリケーションのリソースにリークが生じてしまうのです。監視対象プロセスが終了すると、スーパーバイザーは替わりの新たなプロセスを始めます。これがまさにスーパーバイザーの役割です。

ただし、スーパーバイザーが新しいプロセスを再起動しても、登録のプロセスにはわかりません。スーパーバイザーに、誰も参照できない空のプロセスができしまいます。これを防ぐには、プロセスは一時的なものであると定めるのです。すると、終了したプロセスは、理由を問わずもはや開始されません。そのために、 KV.Bucketuse Agentにはつぎのようにオプションrestart: :temporaryを加えてください。

defmodule KV.Bucket do
  # use Agent
  use Agent, restart: :temporary

end

プロセスが:temporaryであることは、test/kv/bucket_test.exsにつぎのテストを加えて確かめます。Supervisor.child_spec/2関数で子の仕様をモジュールから得て、restartの値が:temporaryであるかどうかassertで調べるのです。

test "are temporary workers" do
  assert Supervisor.child_spec(KV.Bucket, []).restart == :temporary
end
$ mix test test/kv/bucket_test.exs
..

Finished in 0.04 seconds
2 tests, 0 failures

Randomized with seed 83872

ここで、子のプロセスを再起動しないなら、なぜスーパーバイザーを使うのかと疑問に思うかもしれません。スーパーバイザーの役割は再起動だけではないのです。とくに監視ツリーがクラッシュしたときなど、起動や終了が適切に行われるようにも定められます。

監視ツリー

KV.BucketSupervisorKV.Supervisorに加えると、スーパーバイザーを監視するスーパーバイザーができ上がります。いわゆる「監視ツリー」がつくられたのです。

スーパーバイザーに新たな子のプロセスを加えるたびに、スーパーバイザーの戦略や子プロセスの順序が正しいかどうか評価することが大切です。今回の例では、戦略は:one_for_oneで、KV.RegistryKV.BucketSupervisorより先に開始されています。

まず気づく問題はプロセスの順序です。KV.RegistryKV.BucketSupervisorを呼び出します。すると、KV.BucketSupervisorKV.Registryより先に開始しなければなりません。そうしないと、スーパーバイザーがまだ起動しないうちに、登録のプロセスがアクセスしようとしてしまうかもしれないからです。

つぎに、監視戦略も検討すべきです。KV.Registryが落ちると、KV.Bucketプロセスの名前に紐づけた情報もすべて失われます。したがって、KV.BucketSupervisorとすべての子プロセスも終了しなければなりません。そうしないと、親のない子プロセスができてしまいます。

そう考えると、監視戦略は見直さなければなりません。ほかに選ぶとすれば、:one_for_all:rest_for_oneのいずれかです。:rest_for_oneを用いるスーパーバイザーは、クラッシュした子のあとに開始された子プロセスを強制終了して再起動します。この場合、KV.Registryが終了すると、KV.BucketSupervisorも終了させることになります。これは、スーパーバイザーを登録プロセスよりあとに配置しようとすることです。前述の順序の決め方に反します。

残る戦略は:one_for_allです。スーパーバイザーは子プロセスがひとつでも落ちたら、すべての子を終了して再起動します。今回のアプリケーションにもっとも適した戦略でしょう。登録プロセスはスーパーバイザーなしには動作できず、スーパーバイザーは登録プロセスなしに終了することになるからです。KV.Supervisorinit/1に実装したプロセスの順序と戦略を改めましょう。

def init(:ok) do
  children = [
    {DynamicSupervisor, name: KV.BucketSupervisor, strategy: :one_for_one},
    {KV.Registry, name: KV.Registry}  # 順序変更
  ]
  # Supervisor.init(children, strategy: :one_for_one)
  Supervisor.init(children, strategy: :one_for_all)
end

テストにおける状態の共有

これまで、登録のプロセスはテストごとにひとつ開始しました。他の影響を受けないためです。

setup do
  registry = start_supervised!(KV.Registry)
  %{registry: registry}
end

けれど、登録プロセスがKV.BucketSupervisorを用いるようになりました。スーパーバイザーはグローバルです。すると、テストごとにそれぞれの登録プロセスをもったとしても、テストは共有のスーパーバイザーに依存することになります。それでよいのでしょうか。

答えは場合によります。共有の状態に依存していても、共有されない部分を使っているかぎり問題はありません。複数の登録プロセスが共有のスーパーバイザー上でプロセスを開始しても、それらのプロセスや登録先プロセスが互いに分離されていればよいのです。同時実行の問題は、Supervisor.count_children/1のような関数をKV.BucketSupervisorで用いた場合に起こります。すべての登録プロセスの子プロセスを数えて、テストを並行で走らせたとき結果が違ってしまう可能性があるからです。

これまでは、スーパーバイザーの共有されていない部分にしか依存してきませんでした。したがって、一連のテストで同時実行の問題は気にしなくて構いません。問題になったときは、テストごとにスーパーバイザーを起動し、状態は登録プロセスのstart_link関数に引数として渡せます。

オブザーバー

監視ツリーを定めましたので、Erlangに同梱されているオブザーバーツールで見てみましょう。iex -S mixで開いたiexシェルで:observer.start/0を呼び出してください。

iex> :observer.start

システムについてのさまざまな情報を示すGUIが開きます。タブを切り替えて見られるのは、一般的な統計値からロードグラフ(load charts)、さらに実行中のすべてのプロセスとアプリケーションのリストなどです。

[Applications]タブには、今システムで動いているすべてのアプリケーションが監視ツリーに沿って示されます(図001)。左側のリストからアプリケーションが選べます。

図001■[Applications]に示された監視ツリー

mix_otp_05_001.png

さらに、iexシェルで新しいプロセスをつくってみましょう。すると、オブザーバーの[Applications]でも監視ツリーに新たなプロセスが加わっているはずです(図002)。

iex> KV.Registry.create(KV.Registry, "shopping")
:ok

図002■[Applications]の監視ツリーに加わった新たなプロセス

mix_otp_05_002.png

監視ツリーのプロセスをどれかダブルクリックすると、そのプロセスのさらに詳しい情報が示されます。あるいは、右クリックして[Kill process]を選べば終了の信号が送れるのです(図003)。障害をエミュレートして、スーパーバイザーの反応が期待したとおりかどうか確かめられます。

図003■[Kill process]で障害をエミュレートする

mix_otp_05_003.png

監視ツリーのもとで開始したプロセスは、オブザーバーで参照してイントロスペクションできます。たとえプロセスが:temporaryであっても、スーパーバイザーを使う理由のひとつです。

MixとOTPもくじ

Posted on by:

gumitech profile

gumi TECH

@gumitech

gumi TECH は、株式会社gumiのエンジニアによる技術記事公開やDrinkupイベントなどの技術者交流を行うアカウントです。 gumi TECH Blog: http://dev.to/gumi / gumi TECH Drinkup: http://gumitech.connpass.com

gumi TECH Blog

株式会社gumiのエンジニアによる技術記事を公開しています。

Discussion

pic
Editor guide