loading...
gumi TECH Blog

Elixir入門 11: プロセス

gumitech profile image gumi TECH Updated on ・6 min read

本稿はElixir公式サイトの許諾を得て「Processes」の解説にもとづき、加筆補正を加えて、Elixirにおけるプロセスのつくり方やプロセス間のメッセージのやり取りの仕方についてご説明します。

Elixirではすべてのコードがプロセスの中で動きます。プロセスは互いに切り離され、並行して働き、メッセージを受け渡して通信します。プロセスはElixirの並行処理の基礎となるだけでなく、分散性と耐障害性(フォールトトレランス)に優れたプログラムの構築に役立つのです。

Elixirのプロセスを、オペレーティングシステムのプロセスと混同しないでください。Elixirのプロセスは、メモリやCPUに対してきわめて軽量です。他の多くのプログラミング言語のスレッドとは異なります。同時に数万から数十万のプロセスを実行することもめずらしくはありません。

spawn

新しいプロセスをつくるには、spawn/1関数の呼び出しが基本となります。引数は別のプロセスで実行する関数で、戻り値はアプリケーションにおける一意のプロセス識別子PIDです(識別番号は環境により変わります)。つぎのコードで生成されたプロセスは、与えられた関数を実行したら終了します。

iex> spawn fn -> 1 + 2 end
#PID<0.87.0>

現在のプロセスのPIDを調べるのはself/0関数です。そのPIDをProcess.alive?/1に渡して呼び出せば、プロセスが動いているかどうか確かめられます。

iex> pid = self()
#PID<0.84.0>
iex> Process.alive?(pid)
true

spawn/3関数を使えば、モジュールの関数からプロセスがつくれます。引数はモジュールと関数、および渡す引数のリストです。なお、第2引数の関数はアトムで与えてください。

defmodule Example do
  def add(a, b) do
    IO.puts(a + b)
  end
end
iex> spawn(Example, :add, [2, 3])
5
#PID<0.90.0>

送信と受信

send/2でメッセージを送ると、receive/1で受け取れます。

送られたメッセージはプロセスメールボックスに納められます。すると、receive/1ブロックが現在のプロセスメールボックスを探し、パターンにマッチしたメッセージを受け取るのです。receive/1にはcase/2と同じように、ガードと複数の句が含められます。

iex> send self(), {:hello, "world"}
{:hello, "world"}
iex> receive do
...>   {:hello, msg} -> msg
...>   {:world, msg} -> "won't match"
...> end
"world"

パターンに合うメッセージがメールボックスになければ、現在のプロセスはマッチするメッセージが届くまで待ちます。この場合、タイムアウトも定められます。すでにメッセージがあると想定されるなら、値は0にしても構いません。なお、数値には桁区切りのためにアンダースコア_を加えることができます。

iex> receive do
...>   {:hello, msg}  -> msg
...> after
...>   1_000 -> "nothing after 1s"
...> end
"nothing after 1s"

ふたつのプロセス間でメッセージを送ってみましょう。つぎの送信のプロセスは、receive/1ブロックが受け取って値を返すと、他の処理はありませんので終了します。なお、inspect/2関数(第2引数のデフォルト[])は、引数のデータを内部的な文字列表現に変え、おもに出力に用いられます。

iex> parent = self()
#PID<0.84.0>
iex> spawn fn -> send(parent, {:hello, self()}) end
#PID<0.93.0>
iex> receive do
...>   {:hello, pid} -> "Got hello from #{inspect pid}"
...> end
"Got hello from #PID<0.93.0>"

シェルでメールボックス内のすべてのメッセージを出力して空にするには、ヘルパー関数flush/0をお使いください。

iex> send self(), :hello
:hello
iex> send self(), :world
:world
iex> flush()
:hello:world
:ok
iex> flush()
:ok

モジュールの関数からつくったプロセスにもメッセージが送れます。ただし、関数がメッセージを受け取って処理をすると終了するため、つぎに送るメッセージは受け取られません。

defmodule Example do
  def listen do
    receive do
      {:hello, msg} -> IO.puts(msg)
    end
  end
end
iex> pid = spawn(Example, :listen, [])
#PID<0.90.0>
iex> send pid, {:hello, "world"}
world
{:hello, "world"}
iex> send pid, {:hello, "tokyo"}
{:hello, "tokyo"}

関数を再帰呼び出しすれば処理は終わらず、何度でもメッセージが送れます。

defmodule Example do
  def listen do
    receive do
      {:hello, msg} -> IO.puts(msg)
    end
    listen  #再帰呼び出し
  end
end
iex> send pid, {:hello, "world"}
world
{:hello, "world"}
iex> send pid, {:hello, "tokyo"}
tokyo
{:hello, "tokyo"}

リンク

Elixirでプロセスをつくるとき、多くの場合リンクさせます。リンクしたプロセスを試す前に、spawn/1のプロセスが失敗したときどうなるかみておきましょう。raise/1RuntimeErrorの例外を起こしても、エラーが記録されるだけで、親プロセスは動いています。それは、プロセスが互いに切り離されているからです。

iex> spawn fn -> raise "oops" end#PID
<0.86.0>
iex>
[error] Process #PID<0.86.0> raised an exception
** (RuntimeError) oops
    (stdlib) erl_eval.erl:668: :erl_eval.do_apply/6

あるプロセスの失敗を他に伝えるには、それらをリンクしなければならないのです。そのためには、spawn_link/1を用いて現在のプロセスにリンクします。

つぎのコードでは、spawn_link/1で親プロセスのシェルにリンクしました。そのため、子プロセスの例外から、親のシェルがEXITの通知を受け取ったのです。IExはシェルの終了を検知し、新たなセッションがはじまります。

iex> self()
#PID<0.84.0>
iex> spawn_link fn -> raise "oops" end
** (EXIT from #PID<0.84.0>) shell process exited with reason: an exception was raised:
    ** (RuntimeError) oops
        (stdlib) erl_eval.erl:668: :erl_eval.do_apply/6

[error] Process #PID<0.90.0> raised an exception
** (RuntimeError) oops
    (stdlib) erl_eval.erl:668: :erl_eval.do_apply/6

spawn_link/3関数を使うと、モジュールの関数からプロセスがリンクできます。引数はモジュールと関数、および渡す引数のリストです。exit/1は、呼び出したプロセスを終了します。そのとき、引数が理由として示されます。

defmodule Example do
  def explode, do: exit(:boom)
end
iex> spawn(Example, :explode, [])
#PID<0.89.0>
iex> spawn_link(Example, :explode, [])
** (EXIT from #PID<0.87.0>) shell process exited with reason: :boom

リンクした現在のプロセスを落としたくない場合には、Process.flag/2を用います。第1引数にフラグ:trap_exit、第2引数の値にtrueを与えると、終了が止められるのです。この関数はerlangのprocess_flag/2によりフラグを定めています(「Receiving Exit Signals」参照)。終了を止めると、{:EXIT, from_pid, reason}というタプルでメッセージが受け取られます。

defmodule Example do
  def explode, do: exit(:boom)
  def run do
    Process.flag(:trap_exit, true)
    spawn_link(Example, :explode, [])
    receive do
      {:EXIT, from_pid, reason} -> IO.puts("Exit reason: #{reason}")
    end
  end
end
iex> Example.run
Exit reason: boom
:ok

ふたつのプロセスをリンクさせるのでなく、情報を得たい場合があります。そういうときに用いるのがspawn_monitor/3で、戻り値はPIDと監視するプロセスの参照です。プロセスがクラッシュすると、メッセージを受け取ります。現在のプロセスは落ちませんし、終了を止める必要もありません。

defmodule Example do
  def explode, do: exit(:boom)
  def run do
    {_pid, _ref} = spawn_monitor(Example, :explode, [])
    receive do
      {:DOWN, _ref, :process, _from_pid, reason} -> IO.puts("Exit reason: #{reason}")
    end
  end
end
iex> Example.run
Exit reason: boom
:ok

リンクはProcess.link/1を呼び出して定めることもできます。Processモジュールには、そのほかにもさまざまな機能が備わっています。

プロセスとリンクは、フォールトトレランスに優れたシステムを構築するためにも重要な役割を果たします。Elixirのプロセスは互いに切り離されており、デフォルトでは何も共有しません。したがって、ひとつのプロセスの失敗が他のプロセスをクラッシュさせたり、悪影響を及ぼすことはないのです。

けれど、プロセスをリンクすると、障害が起きたときの関係がつくれます。プロセスがよくリンクされるのはスーパーバイザーです。スーパーバイザーはプロセスが落ちたことを検出し、代わりの新たなプロセスを開始できます。

他の言語では例外を補足して処理しなければなりません。Elixirはスーパーバイザーがシステムを適切に再起動できるので、プロセスが失敗したままで構いません。「早く失敗させる」というのは、Elixirでソフトウェアを開発するときの一般的な考え方です。

Task

spawn関数にもとづいて構築されたTaskは、よりよいエラーレポートとイントロスペクション(introspection)の機能を提供します。spawn/1spawn_link/1の替わりにTask.start/1Task.start_link/1を使うと、戻り値は単にPIDではなくタプル{:ok、pid}になります。これでタスクが監視ツリーで扱えるようになるのです。エラーレポートも詳しくなります。

iex> Task.start fn -> raise "oops" end
{:ok, #PID<0.91.0>}
iex>
[error] Task #PID<0.91.0> started from #PID<0.87.0> terminating
** (RuntimeError) oops
    (stdlib) erl_eval.erl:668: :erl_eval.do_apply/6
    (elixir) lib/task/supervised.ex:88: Task.Supervised.do_apply/2
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: #Function<20.99386804/0 in :erl_eval.expr/5>
    Args: []

Task.async/3はバックグラウンドで関数を実行します。その戻り値をTask.await/2に渡せば、結果の値がえられるのです(第2引数のタイムアウトはデフォルト値5000)。アプリケーションの実行は止めずに、負荷の高い処理をするときに役立ちます。なお、:timer.sleep/1は引数のミリ秒間、処理を一時停止するerlangの関数です。

defmodule Math do
  def hypot(x, y) do
    :timer.sleep(3000)
    :math.sqrt(x * x + y * y)
  end
end
iex> task = Task.async(Math, :hypot, [3, 4])
%Task{
  owner: #PID<0.87.0>,
  pid: #PID<0.94.0>,
  ref: #Reference<0.4236645349.3512991745.24823>
}
iex> Math.hypot(5, 12)
13.0  #<- 3秒後
iex> Task.await(task)
5.0

状態

構築するアプリケーションに、状態をもたせたい場合があります。たとえば、アプリケーションの設定を保持したり、ファイルを解析してメモリに読み込みたいときなどです。

状態は一般にプロセスにもたせます。プロセスをつくってループさせ、状態を保持し、メッセージをやり取りするのです。ここで、キーと値の組みが納められる新たなプロセスをつくってみましょう。

初期化の関数(start_link)は、プロセスをTask.start_link/1でつくり、プライベートの関数(loop)に空のマップを渡します。receive/1ブロックは、メッセージのキーが:getのときは、send/2でメッセージを送ります。そして、Map.get/3で得たキーの値を返すのです。キーが:putなら、Map.put/3でマップにキーと値を加えます。いずれも関数を再帰呼び出しすることにより、処理を続けていることにご注目ください。

defmodule KeyValue do
  def start_link do
    Task.start_link(fn -> loop(%{}) end)
  end
  defp loop(map) do
    receive do
      {:get, key, caller} ->
        send caller, Map.get(map, key)
        loop(map)
      {:put, key, value} ->
        loop(Map.put(map, key, value))
    end
  end
end

モジュールの初期化の関数(KeyValue.start_link)を呼び出したときは、まだマップは空です。そのため、:getメッセージを送っても値は得られません。現在のプロセスの受信トレイには何も入っていないのです。

iex> {:ok, pid} = KeyValue.start_link
{:ok, #PID<0.91.0>}
iex> send(pid, {:get, :hello, self()})
{:get, :hello, #PID<0.87.0>}
iex> flush
nil
:ok

プロセスに:putメッセージを送ると状態が更新され、キーと値が加わります。そのあとは、:getメッセージでキーの値も得られるでしょう。PIDを知っていれば、別のプロセスであっても、メッセージを送って状態は操作できます。

iex> send pid, {:put, :hello, :world}
{:put, :hello, :world}
iex> send pid, {:get, :hello, self()}
{:get, :hello, #PID<0.87.0>}
iex> flush
:world
:ok

Process.register/2を使えば、PIDに名前をつけて登録できます。どのプロセスからでも、その名前でメッセージが送れるようになるのです。

iex> Process.register(pid, :kv)
true
iex> send :kv, {:get, :hello, self()}
{:get, :hello, #PID<0.87.0>}
iex> flush
:world
:ok

状態と名前をプロセスに登録して保持することは、Elixirアプリケーションでたびたび使われる手法です。もっともほとんどの場合、ご紹介した手動で処理するコードは書きません。Elixirには多くの抽象的なやり方が予め備わっています。たとえば、状態を抽象化したのがAgentです。

Agent

Agentは、状態が保持されているバックグラウンドのプロセスを抽象化です。アプリケーションとノードの他のプロセスからアクセスできます。Agent.start_link/2は現在のプロセスにリンクされたAgentをはじめます(第2引数のオプションはデフォルト値[])。戻り値のタプル{:ok, pid}の第2要素がAgentの状態への参照です。

Agent.update/3により、第1引数のAgentの状態を第2引数の関数で更新できます(第3引数のタイムアウトはデフォルト値5000ミリ秒)。そして、Agentから値を取り出すのがAgent.get/3です。第1引数のAgentから得た値を、第2引数の関数で処理して返します(第3引数のタイムアウトはデフォルト値5000ミリ秒)。

iex> {:ok, agent} = Agent.start_link(fn -> %{} end)
{:ok, #PID<0.112.0>}
iex> Agent.update(agent, fn map -> Map.put(map, :hello, :world) end)
:ok
iex> Agent.get(agent, fn map -> Map.get(map, :hello) end)
:world
iex> {:ok, agent} = Agent.start_link(fn -> [1, 2, 3] end)
{:ok, #PID<0.105.0>}
iex> Agent.update(agent, fn state -> state ++ [4, 5] end)
:ok
iex> Agent.get(agent, &(&1 -- [2, 4]))
[1, 3, 5]

Elixir入門もくじ

番外

Posted on by:

gumitech profile

gumi TECH

@gumitech

gumi TECH は、株式会社gumiのエンジニアによる技術記事公開やDrinkupイベントなどの技術者交流を行うアカウントです。 gumi TECH Blog: http://dev.to/gumi / gumi TECH Drinkup: http://gumitech.connpass.com

gumi TECH Blog

株式会社gumiのエンジニアによる技術記事を公開しています。

Discussion

pic
Editor guide