
Miriam


The easiest concurrent code you will ever write: Elixir's Task Module

Have you ever wanted to do multiple things at once? I constantly try and usually fail. This morning I tried to drink coffee and grab my cat as it attempted to escape to the tick-infested outdoors. Now the cat is on a caffeine-fueled rampage and my curtains are covered in coffee-stained paw prints. Trying to write code while juggling a baby is also not so great. If you drop the baby on your keyboard you might accidentally leave an awkward comment on that pull request you were reviewing. Plus, my pediatrician says dropping babies is hazardous to their health. But though I may be bad at concurrency, Elixir is pretty good at it.

One of the easiest ways to write concurrent code is using the Elixir Task module.

Say I want to feed all of my cats at once. Here is a list of my cats. They are all quite famous:

["keyboard cat", "lil bub", "grumpy cat"]

I have a function for feeding them:

def feed_cat(name) do
  "#{name} has been fed"
end

If I do a normal iteration, they'll get fed one at a time, and Grumpy Cat will have a good reason to be grumpy: he will always have to wait for the other cats to be fed before he gets his food.

def feed_all_the_cats() do
  Enum.map(["keyboard cat", "lil bub", "grumpy cat"], &feed_cat/1)
end

iex> feed_all_the_cats()
["keyboard cat has been fed", "lil bub has been fed", "grumpy cat has been fed"]

But thankfully I can use Task.async_stream() to feed all the cats at once.

Because Task.async_stream() returns an Elixir Stream, it will sit there lazily until something enumerates it. Here I'm triggering the stream with Enum.map().

def feed_all_the_cats() do
  ["keyboard cat", "lil bub", "grumpy cat"]
  |> Task.async_stream(&feed_cat(&1))
  |> Enum.map(&(&1))
end

iex> feed_all_the_cats()
[
  ok: "keyboard cat has been fed",
  ok: "lil bub has been fed",
  ok: "grumpy cat has been fed"
]
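
Each result comes back wrapped in an {:ok, value} tuple, which is why the output above is a keyword list. If you only want the messages, one option is to pattern match the wrapper away while enumerating. This is a minimal sketch; the CatFeeder module name is made up for illustration, and the match on {:ok, message} assumes no task crashes or times out.

```elixir
defmodule CatFeeder do
  # Same function as in the post.
  def feed_cat(name) do
    "#{name} has been fed"
  end

  def feed_all_the_cats() do
    ["keyboard cat", "lil bub", "grumpy cat"]
    |> Task.async_stream(&feed_cat/1)
    # Enumerating the stream starts the tasks; the match strips the :ok wrapper.
    |> Enum.map(fn {:ok, message} -> message end)
  end
end

CatFeeder.feed_all_the_cats()
```

Because :ordered defaults to true, the messages come back in the same order as the list of cats.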

Alright, so it may not be obvious here that they are all getting fed at once. Here is one way to see the difference.

Let's add some sleeping time before feeding the cat.

def feed_cat(name) do
  :timer.sleep(2000)
  "#{name} has been fed at #{inspect NaiveDateTime.utc_now()}"
end

Now, when you feed the cats synchronously, you will see that they are fed two seconds apart.

def feed_all_the_cats() do
  Enum.map(["keyboard cat", "lil bub", "grumpy cat"], &feed_cat/1)
end

iex> feed_all_the_cats()
[
 "keyboard cat has been fed at ~N[2020-07-12 14:54:46.684748]",
 "lil bub has been fed at      ~N[2020-07-12 14:54:48.689686]",
 "grumpy cat has been fed at   ~N[2020-07-12 14:54:50.690832]"
]

I can't even imagine how grumpy poor Grumpy Cat is at this point. With the two second delay on each feeding, he had to wait an entire six seconds for his food.

But when you run the asynchronous code, you will see that they were all fed within microseconds of each other. Even with a two-second delay added to each feed, the total run time is about two seconds. Grumpy Cat no longer has to wait and is fed at the same time as the other cats.

def feed_all_the_cats() do
  ["keyboard cat", "lil bub", "grumpy cat"]
  |> Task.async_stream(&feed_cat(&1))
  |> Enum.map(& &1)
end

iex> feed_all_the_cats()
[
    ok: "keyboard cat has been fed at ~N[2020-07-12 14:58:11.844675]",
    ok: "lil bub has been fed at      ~N[2020-07-12 14:58:11.844714]",
    ok: "grumpy cat has been fed at   ~N[2020-07-12 14:58:11.844722]"
]

I doubt even the grumpiest cat will notice a microsecond.
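
If you would rather measure the difference than compare timestamps by eye, Erlang's :timer.tc can time each version. The sketch below is only an illustration: the CatTimer module, its function names, and the 200 ms nap (shorter than the 2000 ms used above, so it runs quickly) are all my own choices. It also sets :max_concurrency to the number of cats, because the default is the number of schedulers, and on a machine with fewer than three of them the cats would not all eat at once.

```elixir
defmodule CatTimer do
  # Assumed shorter nap than the post's 2000 ms, to keep the comparison quick.
  @nap_ms 200

  def feed_cat(name) do
    :timer.sleep(@nap_ms)
    "#{name} has been fed"
  end

  # One cat after another.
  def feed_sequentially(cats) do
    Enum.map(cats, &feed_cat/1)
  end

  # One task per cat, so the timing does not depend on the core count.
  def feed_concurrently(cats) do
    cats
    |> Task.async_stream(&feed_cat/1, max_concurrency: length(cats))
    |> Enum.to_list()
  end

  # Runs a zero-arity function and returns the elapsed time in milliseconds.
  def time_ms(fun) do
    {microseconds, _result} = :timer.tc(fun)
    div(microseconds, 1000)
  end
end

cats = ["keyboard cat", "lil bub", "grumpy cat"]
sequential_ms = CatTimer.time_ms(fn -> CatTimer.feed_sequentially(cats) end)
concurrent_ms = CatTimer.time_ms(fn -> CatTimer.feed_concurrently(cats) end)
IO.puts("sequential: #{sequential_ms} ms, concurrent: #{concurrent_ms} ms")
```

The sequential run should take at least three naps, while the concurrent run should take roughly one.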

There are some options you may want to pass in when using Task.async_stream().

  1. :max_concurrency sets the maximum number of tasks to run at the same time - you don't want to try to run a million things at once and crash your server. It defaults to System.schedulers_online/0 (the number of schedulers available in the VM).

  2. :ordered specifies whether the results should be returned in the same order as your input stream. It defaults to true.

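  3. :timeout is the maximum time, in milliseconds, that each task is allowed to run. It defaults to 5000, which is usually fine for me, but if you need more or less time, you can specify it here.

  4. :on_timeout specifies how to handle a timeout. Your two options are :exit (the default, which exits the calling process) or :kill_task (which kills only the task that timed out).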

Here is how the function looks with options passed in:

def feed_all_the_cats() do
  ["keyboard cat", "lil bub", "grumpy cat"]
  # Options are passed as a keyword list, not a map.
  |> Task.async_stream(&feed_cat(&1),
    max_concurrency: 4,
    ordered: true,
    timeout: 5000,
    on_timeout: :kill_task
  )
  |> Enum.map(& &1)
end
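
With on_timeout: :kill_task, a task that runs past the timeout is killed and the stream emits {:exit, :timeout} for it instead of an {:ok, value} tuple, so your enumeration needs to handle both shapes. Here is a rough sketch of that. The SlowCats module, the extra slow clause for Grumpy Cat, the 100 ms timeout, and the fallback message are all invented for the example.

```elixir
defmodule SlowCats do
  # Hypothetical slow eater: takes far longer than the timeout below.
  def feed_cat("grumpy cat" = name) do
    :timer.sleep(1000)
    "#{name} has been fed"
  end

  def feed_cat(name), do: "#{name} has been fed"

  def feed_all_the_cats() do
    ["keyboard cat", "lil bub", "grumpy cat"]
    |> Task.async_stream(&feed_cat/1,
      max_concurrency: 3,
      timeout: 100,
      on_timeout: :kill_task
    )
    |> Enum.map(fn
      # Task finished in time.
      {:ok, message} -> message
      # Task was killed after exceeding the timeout.
      {:exit, :timeout} -> "a cat took too long and went hungry"
    end)
  end
end

SlowCats.feed_all_the_cats()
```

With the default on_timeout: :exit, the same timeout would instead take down the process that called feed_all_the_cats().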

Happy concurrent code writing!
