DEV Community

bakenator
bakenator

Posted on

A Tour of Task Async and Await

One great thing about Elixir is that the source code for all the language modules is easily examinable. For a personal project I was recently considering using the Task module instead of a spawn, but then I asked myself...

How do Tasks Work?

If you look at the documentation for the Task module in Hex Docs it says the most common use case for tasks is to convert sequential code into concurrent code by computing a value asynchronously, then it provides the example below.

task = Task.async(fn -> do_some_work() end)
res = do_some_other_work()
res + Task.await(task)

In order to understand how Tasks work we can dive into the Task source code for each of the two functions used above.

Task.async

First there is a call to Task.async. Below is the source code for both definitions of the function.

  def async(fun) when is_function(fun, 0) do
    async(:erlang, :apply, [fun, []])
  end

  def async(module, function_name, args)
    when is_atom(module) and is_atom(function_name) and is_list(args) do
    mfa = {module, function_name, args}
    owner = self()
    {:ok, pid} = Task.Supervised.start_link(get_owner(owner), get_callers(owner), :nomonitor, mfa)
    ref = Process.monitor(pid)
    send(pid, {owner, ref})
    %Task{pid: pid, ref: ref, owner: owner}
  end

Task.async/1 is a catch all function that passes the param function to Task.async/3 with :erlang.apply.

{:ok, pid} = Task.Supervised.start_link(get_owner(owner), get_callers(owner), :nomonitor, mfa)

In this line of Task.async/3 the current pid (owner) and original param function (mfa) is passed to the Task.Supervised as params to the Task.Supervised.start_link function. start_link then spawns a new process and returns its pid.

send(pid, {owner, ref})

Next a copy of the owner_pid (owner) and a monitor reference (ref) is sent to the new Task.Supervised process. These are what the Task.Supervised process will use to identify itself to Task.await when it finishes running the function.

%Task{pid: pid, ref: ref, owner: owner}

Finally Task.async/3 returns a Task struct to be used in Task.await/2 for identification.

Next lets take a look what was going on inside Task.Supervised that was spawned

Task.Supervised

def start_link(owner, callers, monitor, fun) do
  {:ok, :proc_lib.spawn_link(__MODULE__, :reply, [owner, callers, monitor, fun])}
end

defp reply(owner, owner_pid, mref, timeout, mfa) do
    receive do
      {^owner_pid, ref} ->
        _ = if mref, do: Process.demonitor(mref, [:flush])
        send(owner_pid, {ref, invoke_mfa(owner, mfa)})
    ...

{:ok, :proc_lib.spawn_link(__MODULE__, :reply, [owner, callers, monitor, fun])}

Task.Supervised.start_link starts a new process and calls its own reply function.

receive do
{^owner_pid, ref} ->

In the lines copied above, we can see that the first thing reply does is to wait for the {owner, ref} tuple that was sent up in the Task.async function.

send(owner_pid, {ref, invoke_mfa(owner, mfa)})

After it is received Task.Supervised runs the original user provided function in invoke_mfa(owner, mfa). invoke_mfa basically uses apply(module, fun, args) internally with additional error handling.

The result of the original function is then sent back to the owner process (wherever we ran Task.async). This message back to the owner process will be picked up by Task.await

Task.await

  def await(%Task{ref: ref, owner: owner} = task, timeout \\ 5000) when is_timeout(timeout) do
    ...
    receive do
      {^ref, reply} ->
        Process.demonitor(ref, [:flush])
        reply
    ...

def await(%Task{ref: ref, owner: owner} = task, timeout \\ 5000) when is_timeout(timeout) do

Task.await takes a Task struct and pulls out the ref attribute.

receive do
{^ref, reply} ->

The function then calls receive and waits for any incoming messages. Each message from Task.Supervised has a ref it uses as an id to identify itself to Task.await.

reply

If the ref is a match, the result is returned.

Summary of Task.async/Task.await lifecycle

The function fn -> do_some_work() end was:

  1. Given to Task.async as a param
  2. Within Task.async, given to a new Task.supervised process as a param
  3. Run in the Task.supervised process
  4. The result was sent in a message from the Task.supervised process back to the original process
  5. The message from Task.Supervised is caught in Task.await and returned

Why Use a Ref for Identification?

Task.Supervised processes identify themselves with a ref which was initialized all the way back in the line ref = Process.monitor(pid). Why does it do this when we already have a variable to identify a process, pid?

To be honest I dug around and looked through the source for a while and could not find a specific reason to use ref as an identifier instead of the pid. It has been set up this way since the start of the Elixir source code. If anyone has insights on this, I would be interested to know.

Out of Order Tasks?

From this inspection of Tasks I learned something very valuable about messages between processes.

Messages may not be processed in the order in which they are received!

In the Task.await function there is no base case for the receive pattern matching. If the message ref does not match, then there is no match. What happens in this case? An example:

task1 = Task.async(fn ->
  :timer.sleep(10_000)
  1 
end)

task2 = Task.async(fn -> 2 end)

IO.inspect Task.await(task1)
IO.inspect Task.await(task2)

Here we can see that Task 1 is going to stall for 10 seconds before it returns. So Task 2 is going to finish and send its response before Task 1. But Task 1 is the first to block waiting for its response. What is the output?

1
2

Even though the messages with the function results were received in the order [task2, task1] everything still worked.

This is because if receive finds no match for the latest incoming message, it simply puts that message back in the queue and keeps on waiting. Then each time a new message is received it checks again and keeps blocking until a match is found.

This also means that if there are already matching messages in the queue, receive does not need to wait at all. It will check all the existing messages in the queue and move forward if one is found to match. That happens in this case within Task.await(task2)

Hope you found this dive into the Task async/await functionality interesting.

Coding in Elixir is Fun!

Source code for the Task files can be found here:
Task Module
Task.Supervised Module

Top comments (0)