Experiments in concurrency 3: Event loops

Shalvah
Builder, explorer, writer. APIs, dev tools, automation. Advocate of simple design.
Originally published at blog.shalvah.me ・12 min read

New to this series? Check out Parts 1 and 2 first.

Last time, I wrote about how coroutines help enable concurrency, and how they work in JavaScript. I spent some time after that trying to implement coroutines in Ruby and PHP, but inspired by these two articles, I decided to go bigger—implement an event loop.

What's an event loop?

An event loop is a loop that runs your code and does things based on some events. That's vague, I know, but it'll become clearer as we go.

It's called a loop because it really is a loop. In fact, this is basically what happens in an event loop:

function runEventLoop() {
  runUserCode();
  while (taskQueue.isNotEmpty()) {
    runNextTask();
  }
}

First, it executes your code, statement-by-statement. Then, if there are any tasks in the task queue (which your code might have added), it executes them one-by-one. It'll keep looping until there are no more tasks, and then it exits. For instance, take this JS code:

// index.js
const fs = require("fs");

const x = 3 + 6;

setTimeout(() => {
  console.log("I'm a task!");
}, 2000);

fs.readFile("some-file.txt", (err, contents) => {
  console.log("Also a task!");
});

console.log(x);

When you run this with node index.js, it executes each statement in the file. By the time that's done, there are now two tasks in the queue: the tasks added by setTimeout() and fs.readFile(). The event loop waits until those tasks are ready and executes them. When they're completed, the task queue is empty, and the event loop exits, ending the script.

JavaScript's event loop is implicit, so you don't have to run it manually. Since we'll be implementing an event loop in Ruby, we'll have to start the loop explicitly and pass it our code to run. Here's what using our event loop will look like:

loop = EventLoop.new
loop.run do
  # do stuff
  loop.set_timeout(2000) do
    puts "I'm a task!"
  end
  # do stuff
end

Task queues

As we've seen, event loops are powered by a task queue. This queue is typically FIFO (first in, first out)—so tasks that get added earlier will get executed first. Let's implement our EventLoop class: its constructor creates a new task queue, while its run method executes the block that it's given, then waits until the task queue is empty.

class EventLoop
  def initialize
    @queue = TaskQueue.new
  end

  def run(&entrypoint)
    entrypoint.call

    until @queue.empty?
      callback = @queue.next
      callback.call if callback
    end
  end
end

(We're going to be passing blocks and procs around a lot with and without the & notation; check out my article on function handles if you're not familiar with those in Ruby.)

Now, let's implement our TaskQueue. Normally, you'd use a simple array and manipulate it as needed, but there's an important detail: not all tasks are equal. For example, a task on a timer (like setTimeout()) should be executed when the timer runs out, even if it was added to the queue last. To make things worse, JavaScript has a "microtask" queue—tasks on this queue get run before any other tasks. And then there's process.nextTick() in Node.js, which runs before the microtask queue. That's a lot of queues!

Node.js' event loop handles this by running in phases: there's a "timers" phase where it checks for any timers that are due, a "poll" phase where it processes I/O callbacks (like the one you passed to fs.readFile()), and more. The phases run in a specific order, so that tasks with higher priority get executed first.

However, we'll keep things simple: we'll still expose a single queue to our event loop, but that queue actually holds two sub-queues: one for timers, and one for any other callbacks.

class TaskQueue
  def initialize
    @callbacks = []
    @timers = []
  end

  def empty?
    @timers.empty? && @callbacks.empty?
  end

  # Get the next task from the queue
  def next
    # TODO
  end
end

Timers

Let's see if we can implement our own version of JavaScript's setTimeout. As we saw earlier, it should take a time and a block to be executed when the time is due:

loop.set_timeout(2000) do
  puts "This task will be executed after 2s"
end

Okay, implementation. Running set_timeout should add a timer task to the queue, telling the queue the time when the task should be executed:

class EventLoop
  # ...
  def set_timeout(timeout, &callback)
    current_time = Time.now.to_f * 1000
    @queue.add_timer(current_time + timeout, callback)
  end
end

Now, back to the queue. We know that timers have higher priority than other tasks: they should be executed when the execution time is due, even if there are tasks in the queue ahead of them. This is why they have their separate queue. But they also have priority amongst themselves. If I run set_timeout(2000) and then set_timeout(1000), I'd expect the second timer to be executed first, since it has a shorter timeout. There's a data structure for this: a priority queue.

A priority queue is like a regular queue, but it sorts items by their priority. When adding an item you specify its priority; when you ask for the next item, it gives you the one with the highest priority. This fits our use case—we can use the scheduled time as the priority for each timer, so when we ask the queue for a timer, we get the timer that's supposed to fire next.
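To make the interface concrete, here's a toy priority queue built on a sorted array. (This is just an illustrative sketch — `ToyPriorityQueue` isn't part of our event loop, and a real implementation would use a heap so operations stay cheap.)

```ruby
# A toy priority queue: keeps items sorted by priority on insert,
# so pop always returns the highest-priority item.
# (A real implementation would use a heap instead of re-sorting an array.)
class ToyPriorityQueue
  def initialize
    @items = [] # array of [priority, value] pairs
  end

  def push(value, priority)
    @items << [priority, value]
    @items.sort_by! { |(p, _)| -p } # highest priority first
  end

  def pop
    pair = @items.shift
    pair && pair[1]
  end

  def empty?
    @items.empty?
  end
end

q = ToyPriorityQueue.new
q.push("low", 1)
q.push("high", 10)
q.push("mid", 5)
puts q.pop # => "high"
puts q.pop # => "mid"
puts q.pop # => "low"
```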

Rather than implementing my own, I'll be using the priority queue from this library (the algorithms gem). We'll replace our @timers array with a priority queue. Here's how our queue will work:

  • We add timers to the timers queue using the inverse (negative) of their scheduled time (-scheduled_time) as the priority. We use the inverse because this PriorityQueue returns the highest-priority item first, but we want the item with the smallest time first (i.e. the timer that's due soonest).
  • Whenever next() is called on our TaskQueue, we peek at the next timer. If its time is due, we pop and return it; otherwise we return the next callback (or nil) and check the timer again on a later pass.

require 'algorithms'

class TaskQueue
  def initialize
    @callbacks = []
    @timers = Containers::PriorityQueue.new
  end

  def empty?
    @timers.empty? && @callbacks.empty?
  end

  def next
    next_timer = @timers.next
    current_time = Time.now.to_f * 1000
    if next_timer && (current_time >= next_timer[:scheduled_time])
      # Timer's due; remove it from the queue
      @timers.pop
      return next_timer[:callback]
    end

    # Note: `if @callbacks.length` would always be truthy in Ruby (0 is truthy),
    # so we check for emptiness explicitly
    unless @callbacks.empty?
      return @callbacks.shift
    end

    nil
  end

  def add_timer(scheduled_time, callback)
    priority = -scheduled_time
    @timers.push({ scheduled_time: scheduled_time, callback: callback }, priority)
  end
end

All good!

Coroutines

Let's head back to coroutines, to see how they can be used to implement "asynchronous" operations with an event loop. Here's a Ruby version of our synchronous copy-and-stringify example from last time:

require 'json'

array = 8_000_000.times.map { |i| {a: i} }
copied = [*array]
puts "Copied"
stringified = copied.to_json
puts "Stringified"

Now let's convert this to a coroutine. Ruby doesn't have generators, but it has something called fibers that lets us achieve the same thing (pause and resume execution of a block). Here's how you'd write a fiber:

example_fiber = Fiber.new do
  puts "Executing Part 1"
  Fiber.yield

  puts "Executing Part 2"
  Fiber.yield

  puts "Executing Part 3"
  Fiber.yield

  "Finished" # `return` would raise a LocalJumpError here; the block's last value is used instead
end

example_fiber.resume # Prints "Executing Part 1"
example_fiber.resume # Prints "Executing Part 2"
example_fiber.resume # Prints "Executing Part 3"

You create a fiber by passing in a block to Fiber.new. Inside that block, you call Fiber.yield to pause execution, like JavaScript's yield. The caller can start or resume the fiber by calling resume.
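One detail we'll rely on later: values flow in both directions. Arguments to the first resume become the block's parameters, arguments to later resumes become the return value of Fiber.yield, and whatever you pass to Fiber.yield (or the block's final value) becomes the return value of resume:

```ruby
adder = Fiber.new do |first|
  # `first` comes from the first resume; Fiber.yield sends a value out
  # and returns whatever the *next* resume passes in
  second = Fiber.yield(first * 2)
  first + second # the block's final value is returned by the last resume
end

puts adder.resume(10) # => 20 (10 * 2, sent out via Fiber.yield)
puts adder.resume(5)  # => 15 (10 + 5, the block's final value)
```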

Here's how we would write our copying and stringifying tasks as coroutines using fibers.

copy_fiber = Fiber.new do |arr|
  copied_array = []
  arr.each_with_index do |item, index|
    if index > 0 && (index % 50_000).zero?
      Fiber.yield # Pause every 50_000 items
    end
    copied_array.push item
  end

  puts "Done with copying"
  copied_array
end

stringify_fiber = Fiber.new do |arr|
  result = "["
  arr.each_with_index do |item, index|
    if index > 0 && (index % 50_000).zero?
      Fiber.yield # Pause every 50_000 items
    end
    result += item.to_json
    result += ","
  end

  result[result.length - 1] = "]"
  puts "Done with stringifying"
  result
end

Now let's implement the dispatcher as part of our event loop. We can add a new run_coroutine method to our EventLoop class:

class EventLoop
  # ...

  def run_coroutine(fiber, *args)
    fiber.resume(*args)

    if fiber.alive?
      @queue.add { self.run_coroutine(fiber) }
    end
  end
end

Key details:

  • the run_coroutine method takes in the fiber and any arguments to be passed to it, then starts the fiber by running fiber.resume with those arguments
  • we use the fiber.alive? method to check if there's still more of the fiber left to execute. If there is, then we add a new task to the task queue.

With that, we've achieved concurrency. By adding a new task to the queue, we're handing control to the event loop to continue the coroutine whenever it's free.
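Here's a self-contained sketch of that interleaving, stripped down to a plain array as the queue (no timers, no EventLoop class — just the requeue-while-alive trick):

```ruby
# Minimal sketch: each task resumes a fiber, and re-enqueues itself
# if the fiber hasn't finished yet, so two fibers take turns.
queue = []

make_worker = ->(name, steps) do
  Fiber.new do
    steps.times do |i|
      puts "#{name}: step #{i + 1}"
      Fiber.yield unless i == steps - 1 # pause after each step except the last
    end
  end
end

run_coroutine = nil
run_coroutine = ->(fiber) do
  fiber.resume
  queue << -> { run_coroutine.call(fiber) } if fiber.alive?
end

run_coroutine.call(make_worker.call("A", 3))
run_coroutine.call(make_worker.call("B", 3))

queue.shift.call until queue.empty?
# Output interleaves: A: step 1, B: step 1, A: step 2, B: step 2, A: step 3, B: step 3
```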

Let's implement the add method in our TaskQueue. Its job is very simple: add a new task to the @callbacks array:

class TaskQueue
  def add(&callback)
    @callbacks << callback
  end
end

Let's test out what we have so far:

loop = EventLoop.new
loop.run do
  loop.run_coroutine(copy_fiber, array)
  loop.run_coroutine(stringify_fiber, array)
  loop.set_timeout(1200) { puts "Executed after 1.2s" }
  loop.set_timeout(800) { puts "Executed next" }
  loop.set_timeout(2200) { puts "Executed last" }
  puts "Executed first"
end

Here it is in action (note: I reduced the size of the array because the Ruby version on repl.it is somewhat slow). Click "▸" to run main.rb; click the "Files" icon to see event_loop.rb:

Sweet!

Continuations

We've implemented coroutines, but we can go a step further: what if we want to run a piece of code after the coroutine is done? Think about fs.readFile() in Node.js—you pass in the file you want to read, and a function to execute when the read is done (known as a callback or continuation).

fs.readFile(fileName, (err, contents) => {
  // do stuff with the file
});

This is helpful in case we have some other operation that depends on the result of the coroutine (for example, our stringifying operation could use the result of the copy operation).

To do this, we add another argument to our run_coroutine method to represent the continuation. Once the coroutine finishes (fiber.alive? returns false), we capture its return value and add a new task to the queue to run the continuation later, passing it the return value.

class EventLoop
  # ...

  def run_coroutine(fiber, *args, &continuation)
    return_val = fiber.resume(*args)

    if fiber.alive?
      @queue.add { self.run_coroutine(fiber, &continuation) }
    else
      if continuation
        @queue.add { continuation.call(return_val) }
      end
    end
  end

end

Now we can do this:

loop.run do
  loop.run_coroutine(copy_fiber, array) do |copied|
    # Continuation after copy
    puts "Copy is DONE."
    loop.run_coroutine(stringify_fiber, copied) do
      # Continuation after stringify
      puts "Stringify is DONE."
    end
  end
end

If we wanted to avoid the extra indentation and potential "callback hell", we could wrap this in a custom Promise implementation, complete with .then/.catch and even async/await, but I'll stop here.
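As a taste of what that could look like, here's a toy then-able. (This is only a sketch of the chaining idea — `ToyPromise` is a hypothetical name, there's no error handling or event-loop integration, and it is nothing like a spec-compliant Promise.)

```ruby
# A toy then-able: stores callbacks and runs them once resolved,
# so continuations chain with .then instead of nesting.
class ToyPromise
  def initialize(&executor)
    @callbacks = []
    @resolved = false
    executor.call(method(:resolve)) if executor
  end

  def resolve(value)
    return if @resolved
    @resolved = true
    @value = value
    @callbacks.each { |cb| cb.call(value) }
  end

  def then(&on_fulfilled)
    next_promise = ToyPromise.new
    handler = ->(v) { next_promise.resolve(on_fulfilled.call(v)) }
    @resolved ? handler.call(@value) : @callbacks << handler
    next_promise
  end
end

p1 = ToyPromise.new { |resolve| resolve.call(21) }
p1.then { |v| v * 2 }.then { |v| puts "Result: #{v}" } # prints "Result: 42"
```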

For completeness, we can add an add_callback method to the event loop to provide an easy way for external code to directly add new tasks to the queue—it'll come in handy later.

  def add_callback(&callback)
    @queue.add &callback
  end

Handling requests

Now let's make our event loop actually useful: we'll write a basic HTTP server. Ruby's default execution model is similar to PHP's (blocking and synchronous), so a single-threaded server would have to finish handling one request before it can accept new ones. For example, here's a simple HTTP server:

require 'socket'

server = TCPServer.new("127.0.0.1", 5678)
puts "Listening on localhost:5678"

while socket = server.accept # Wait for a new connection
    request = socket.gets.strip
    puts "Started handling req #{request}: #{Time.now}"
    sleep 5
    puts "Responding 5 seconds later: #{Time.now}"
    socket.puts <<~HTTP
      HTTP/1.1 200 OK
      Content-Type: text/html

      Hii 👋
    HTTP
    socket.close
end

When I test this with autocannon making three simultaneous requests (autocannon --connections 3 --amount 3 --timeout 10000 --no-progress http://localhost:5678/):

Listening on localhost:5678
Started handling req GET / HTTP/1.1: 2021-04-29 10:23:31 +0100
Responding 5 seconds later: 2021-04-29 10:23:36 +0100
Started handling req GET / HTTP/1.1: 2021-04-29 10:23:36 +0100
Responding 5 seconds later: 2021-04-29 10:23:41 +0100
Started handling req GET / HTTP/1.1: 2021-04-29 10:23:41 +0100
Responding 5 seconds later: 2021-04-29 10:23:46 +0100

As expected, requests are handled sequentially. Each request does something that takes 5 seconds, and the thread has to wait for that to finish before handling the next request, so it takes 15 seconds in total. Let's change this to a concurrent setup using our event loop.

def handle_next_request(server, &handler)
  begin
    socket = server.accept_nonblock
    request = socket.gets.strip
    puts "Started handling req #{request}: #{Time.now}"
    handler.call(socket)
    add_callback { handle_next_request(server, &handler) }
  rescue IO::WaitReadable, Errno::EINTR
    add_callback { handle_next_request(server, &handler) }
  end
end

run_loop do
  server = TCPServer.new("127.0.0.1", 5678)
  puts "Listening on localhost:5678"
  handle_next_request(server) do |socket|
    set_timeout(5000) do
      puts "Responding 5 seconds later: #{Time.now}"
      socket.puts <<~HTTP
        HTTP/1.1 200 OK
        Content-Type: text/html

        Hii 👋
      HTTP
      socket.close
    end
    # A few extra timers, to look cool 😎
    set_timeout(0) { puts "0ms later: #{Time.now}" }
    set_timeout(1500) { puts "1.5s later: #{Time.now}" }
  end
end

It looks a little complicated at first, but it's not. Here are the important bits:

  • begin/rescue is Ruby's try/catch
  • server.accept_nonblock is key here. server.accept (the earlier version) checks to see if there's a new incoming request, and waits until there is one. accept_nonblock, however, throws an error if there's no incoming connection (this is why we use the begin/rescue), giving you the freedom to do other things before checking again. And that's what we're doing here: if there isn't any new connection, we add a task to check again, allowing the event loop to execute any pending tasks first. On the flip side, if there's a request, handle_next_request will call the handler block you passed in, allowing you to respond to the request.
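You can see accept_nonblock's behaviour in isolation. With no client connected, it raises immediately instead of blocking:

```ruby
require 'socket'

server = TCPServer.new("127.0.0.1", 0) # port 0: let the OS pick a free port

begin
  server.accept_nonblock
  puts "accepted a connection"
rescue IO::WaitReadable
  # No pending connection. Instead of blocking here, we get the chance
  # to do other work (in our event loop: run queued tasks) and retry later.
  puts "no connection yet, try again later"
end

server.close
```

Since nothing connects in this snippet, it prints "no connection yet, try again later" — which is exactly the branch where handle_next_request re-enqueues itself.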

Note: I made the code a little cleaner by using some Ruby abstractions to hide the creation of the loop, and to make our set_timeout, run_coroutine, and add_callback methods global. See the full code in this gist.

Now, when I try making three concurrent requests:

Listening on localhost:5678
Started handling req GET / HTTP/1.1: 2021-04-29 11:02:24 +0100
0ms later: 2021-04-29 11:02:24 +0100
Started handling req GET / HTTP/1.1: 2021-04-29 11:02:24 +0100
0ms later: 2021-04-29 11:02:24 +0100
Started handling req GET / HTTP/1.1: 2021-04-29 11:02:24 +0100
0ms later: 2021-04-29 11:02:24 +0100
1.5s later: 2021-04-29 11:02:26 +0100
1.5s later: 2021-04-29 11:02:26 +0100
1.5s later: 2021-04-29 11:02:26 +0100
Responding 5 seconds later: 2021-04-29 11:02:29 +0100
Responding 5 seconds later: 2021-04-29 11:02:29 +0100
Responding 5 seconds later: 2021-04-29 11:02:29 +0100

Sweet! The server starts handling one request, schedules the timeouts and switches to another, so no connection is kept waiting. And, in this case, we get a speed boost—the server responds to all requests in 5s total (rather than 15s), since it doesn't wait for one to finish.

Why use an event loop?

You might already be convinced, but let's formally answer this. Why use an event loop? Why try to make the language behave differently? The answer is concurrency (duh).

Event loops are a way to achieve concurrency without adding more threads or processes. Remember how, in part 1 of this series, we saw that a single-threaded server in PHP has zero concurrency? With an event loop, we can split the handling of one request into separate tasks, allowing our app to switch over to a new request in between. In fact, the example of a concurrent webserver in PHP uses Amp's event loop, along with asynchronous coroutines like \Amp\File\get():

\Amp\Loop::run(function () {
    $sockets = [
        Server::listen("0.0.0.0:8080"),
    ];

    $server = new HttpServer($sockets, new CallableRequestHandler(function (Request $request) {
        $file = yield \Amp\File\get(__FILE__);
        return new Response(Status::OK, ["content-type" => "text/plain"], "Hello, World!");
    }), new NullLogger);

    yield $server->start();
});

Many other popular async libraries implement an event loop, including ReactPHP, async (Ruby), and EventMachine (Ruby).

But it's not just the idea of doing multiple things at once that makes event loops cool. It's the ability to utilise "idle time". For example, with server.accept, we spend a good deal of time waiting for the next connection. What if we could do something else while waiting? server.accept_nonblock + the event loop allows for that. If we go further and implement asynchronous I/O like file reads, database queries, network calls, we'll see the difference even more—in that time our code spends waiting for the OS/database/remote server to return results, we can switch to doing something else.

Event loops shine in event-driven contexts. New events (like incoming requests or completed database queries) add new items to the task queue, and the event loop switches over and handles them. Nginx is built around an event loop, and so are many GUI frameworks.

I've mostly mentioned webservers in this series, but concurrency is needed in a lot of places. For instance, games have to process user input while rendering the game world and making a bunch of calculations. Mobile apps have to handle user input, fetch data from remote APIs, draw UIs, and more. An event loop is one way of making this work.
