The previous post was an intro about the task module and how it works in a more general way.
Now it's time to make things a little bit complex with Task.Supervisor
and see some elixir concepts.
Before going ahead let's see some elixir base concepts:
A supervisor is a process that supervises other processes, which we refer to as child processes. Supervisors are used to building a hierarchical process structure called a supervision tree. Supervision trees provide fault-tolerance and encapsulate how our applications start and shutdown.
In Elixir, all code runs inside processes. Processes are isolated from each other, run concurrent to one another, and communicate via message passing.
Elixir’s processes should not be confused with operating system processes. Processes in Elixir are extremely lightweight in terms of memory and CPU (even compared to threads as used in many other programming languages). Because of this, it is not uncommon to have tens or even hundreds of thousands of processes running simultaneously.
We will use Task.Supervisor
to create processes responsible for manage the related child.
Creating the playground
The first step is create a new elixir application with a supervision tree, more information about mix possibilities can be found in the official documentation.
mix new newsample --sup
Time to see what was created:
lib
lib/newsample
lib/newsample.ex
lib/newsample/application.ex
test
test/newsample_test.exs
test/test_helper.exs
README.md
mix.exs
For now we'll just work with lib/newsample.ex
and lib/newsample/application.ex
.
The second step is to create the same function from the previous post inside lib/newsample.ex
with minor changes to execute our tests easily.
To force an explosive scenario we have a pattern matching with "alpha" value.
def say_hello("alpha" = to), do: raise("Error to say hello to #{to}")
def say_hello(to) do
IO.puts("Hello #{to}")
:ok
end
def process() do
items = ["alpha", "beta", "gama"]
Enum.map(items, fn item ->
Task.async(fn ->
say_hello(item)
end)
end)
|> Enum.map(&Task.await/1)
end
And finally, execute it using elixir's interactive shell.
iex -S mix
Then execute the code:
iex(1)> self
#PID<0.145.0>
iex(2)> Newsample.process
Hello beta
Hello gama
22:01:54.276 [error] Task #PID<0.148.0> started from #PID<0.145.0> terminating
** (RuntimeError) Error to say hello to alpha
(newsample 0.1.0) lib/newsample.ex:15: anonymous fn/2 in Newsample.process/1
(elixir 1.11.3) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
(elixir 1.11.3) lib/task/supervised.ex:35: Task.Supervised.reply/5
(stdlib 3.14) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Function: #Function<1.13017213/0 in Newsample.process/1>
Args: []
** (EXIT from #PID<0.145.0>) shell process exited with reason: an exception was raised:
** (RuntimeError) Error to say hello to alpha
(newsample 0.1.0) lib/newsample.ex:15: anonymous fn/2 in Newsample.process/1
(elixir 1.11.3) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
(elixir 1.11.3) lib/task/supervised.ex:35: Task.Supervised.reply/5
(stdlib 3.14) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Interactive Elixir (1.11.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> self
#PID<0.151.0>
As we can see the raised exception was propagated until the iex
session. It's possible to check it comparing the PID
before #PID<0.145.0>
and after #PID<0.151.0>
run the code.
How we avoid this explosive error propagation?
Now we are in the step to use a supervised task, let's open lib/newsample/application.ex
.
def start(_type, _args) do
children = []
opts = [strategy: :one_for_one, name: Newsample.Supervisor]
Supervisor.start_link(children, opts)
end
Let's create our Task.Supervisor
as a child processes
children = [
{Task.Supervisor, name: Newsample.TaskSupervisor}
]
And now use this supervised task in the process
method and start as async_no_link
. It means that there is no link between this child process and the parent.
def process() do
items = ["alpha", "beta", "gama"]
Enum.map(items, fn item ->
Task.Supervisor.async_nolink(Newsample.TaskSupervisor, fn ->
say_hello(item)
end)
end)
|> Enum.map(&Task.await/1)
end
If we execute again everything will work correctly without this error propagation.
iex(1)> self
#PID<0.158.0>
iex(2)> Newsample.process
Hello beta
Hello gama
21:04:48.200 [error] Task #PID<0.161.0> started from #PID<0.158.0> terminating
** (RuntimeError) Error to say hello to alpha
(newsample 0.1.0) lib/newsample.ex:6: Newsample.say_hello/1
(elixir 1.11.3) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
(elixir 1.11.3) lib/task/supervised.ex:35: Task.Supervised.reply/5
(stdlib 3.14) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Function: #Function<1.39266525/0 in Newsample.process/0>
Args: []
** (exit) exited in: Task.await(%Task{owner: #PID<0.158.0>, pid: #PID<0.161.0>, ref: #Reference<0.1303494724.713818114.156158>}, 5000)
** (EXIT) an exception was raised:
** (RuntimeError) Error to say hello to alpha
(newsample 0.1.0) lib/newsample.ex:6: Newsample.say_hello/1
(elixir 1.11.3) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
(elixir 1.11.3) lib/task/supervised.ex:35: Task.Supervised.reply/5
(stdlib 3.14) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
(elixir 1.11.3) lib/task.ex:639: Task.await/2
(elixir 1.11.3) lib/enum.ex:1411: Enum."-map/2-lists^map/1-0-"/2
iex(2)> self
#PID<0.158.0>
As we can see the PID
is the same after and before the execution.
Processes and supervisors trees are important components of elixir to build resiliences and fault tolerance systems.
Top comments (0)