DEV Community

Yawar Amin

Practical OCaml, Multicore Edition

Previously, we created a small proof-of-concept OCaml project to show how easy it is to make small but helpful utilities. That project was a simple server that forked off a new process to handle each incoming connection, using nothing but OCaml's built-in libraries. It's a simple, brute-force method, but because every connection costs a whole OS process, it quickly reaches a limit in scalability.

To get around this limit, many technologies have evolved over the years. However, as OCaml's Multicore edition (version 5.0) nears its release, we will leapfrog all those technologies and go back to the future: direct-style, non-blocking concurrent I/O, running in parallel threads.

Eio

To give us access to all these features, an exciting new library called Eio is being developed. Built on OCaml 5's effect handlers, it offers direct-style concurrent I/O without the need for monads or async/await, thus avoiding the function colour problem (where any code that calls an asynchronous function must itself become asynchronous).

Let's update our example server to use Eio, with the following goals:

  • Run in parallel on N domains, where N is, by default, the number of CPUs. Domains are OCaml 5's unit of parallelism, and each one maps 1:1 to an OS thread.
  • Within each domain, handle each incoming connection in its own non-blocking fiber.

Note: to better understand the rest of this post, I highly recommend reviewing the previous proof-of-concept project.

Fibers are essentially non-blocking green threads that run in each domain. A domain can run one fiber at a time. When a fiber gets blocked waiting for I/O, it yields and lets another one run. This achieves concurrency. Meanwhile, other fibers can run in parallel in other domains. This achieves parallelism. By combining the two, we can fully saturate all our CPU cores.
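To make the yield-and-switch idea concrete, here is a toy, single-domain, round-robin scheduler built directly on OCaml 5's effect handlers (the stdlib Effect and Effect.Deep modules), closely following the example in the OCaml manual. It is only a sketch of the concept: Yield, Fork and run are illustrative names, not Eio's API, and a real scheduler like Eio's also resumes fibers when their I/O becomes ready.

```ocaml
(* Toy cooperative scheduler using OCaml 5 effect handlers (requires OCaml 5).
   Yield, Fork and run are illustrative names, not Eio's API. *)
open Effect
open Effect.Deep

type _ Effect.t +=
  | Yield : unit Effect.t                  (* let another fiber run *)
  | Fork : (unit -> unit) -> unit Effect.t (* start a new fiber *)

let yield () = perform Yield
let fork f = perform (Fork f)

(* Run [main] and every fiber it forks, round-robin, in the current domain. *)
let run main =
  (* Suspended fibers waiting for their turn. *)
  let run_queue : (unit, unit) continuation Queue.t = Queue.create () in
  let enqueue k = Queue.push k run_queue in
  let dequeue () =
    if not (Queue.is_empty run_queue) then continue (Queue.pop run_queue) ()
  in
  let rec spawn f =
    match_with f ()
      { retc = dequeue;  (* fiber finished: resume the next one *)
        exnc = raise;    (* a failing fiber aborts the whole toy scheduler *)
        effc = (fun (type a) (eff : a Effect.t) ->
          match eff with
          | Yield ->
              Some (fun (k : (a, unit) continuation) -> enqueue k; dequeue ())
          | Fork g ->
              Some (fun (k : (a, unit) continuation) -> enqueue k; spawn g)
          | _ -> None) }
  in
  spawn main

(* Usage: two fibers that each yield after every step. *)
let events = Buffer.create 64

let worker name n () =
  for i = 1 to n do
    Buffer.add_string events (Printf.sprintf "%s%d " name i);
    yield ()
  done

let () =
  run (fun () -> fork (worker "a" 3); fork (worker "b" 3));
  print_endline (Buffer.contents events)
```

Because each worker yields after every step, the two fibers interleave as `a1 b1 a2 b2 a3 b3` rather than one running to completion before the other starts.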

Setup

Now, let's install Multicore OCaml. It hasn't officially shipped yet, so we'll use the alpha version. First, refer to the Up and Running page to install opam, the OCaml package manager. Then:

# 'mc' is just a short name we choose
# We need to explicitly add the opam alpha repository with higher priority than the default repo, to allow installing some multicore-only libraries:

opam switch create mc 5.0.0~alpha1 --repositories=mc=git+https://github.com/kit-ty-kate/opam-alpha-repository.git,default
eval $(opam env)
opam install dune utop eio

The project

Now, we create the statsd filter project almost exactly the same way as before:

mkdir ocaml_statsd_filter
cd ocaml_statsd_filter

Create the dune-project file:

(lang dune 3.6)

Create the dune file:

(executable
  (name ocaml_statsd_filter)
  (libraries str eio_main))

Now, we come to the first interesting changes. A couple of the configs need to be expressed slightly differently. Here's the cfg.ml file:

let num_threads =
  try int_of_string (Sys.getenv "num_threads")
  with Not_found -> Domain.recommended_domain_count

let listen_port =
  try int_of_string (Sys.getenv "listen_port") with Not_found -> 8125

let target_host = try Sys.getenv "target_host" with Not_found -> "localhost"
let target_port = try Sys.getenv "target_port" with Not_found -> "8126"

let blocklist =
  try "blocklist"
    |> Sys.getenv
    |> String.split_on_char ',' 
    |> List.map Str.regexp_string 
  with
    Not_found -> []

We add a new num_threads config variable to allow overriding the default, which is the 'recommended number of domains' from the standard library.

Also, target_host can now be a host name rather than a raw IP address, because we now use the Eio.Net.getaddrinfo_stream function (shown below) to look it up.

The rest of the configs are unchanged.
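As an aside, the `try ... with Not_found` pattern can also be written with `Sys.getenv_opt` (OCaml 4.05+) and `Option.value` (4.08+). The sketch below is a hypothetical variant of cfg.ml, not the post's code: the `parse_int` and `parse_list` helper names are made up, `num_threads` is left out, and `parse_list` additionally drops empty entries. That last point matters because `Str.regexp_string ""` matches every string, so a stray comma (or a `blocklist` variable set to the empty string) would otherwise block every metric.

```ocaml
(* Hypothetical cfg.ml variant using Sys.getenv_opt instead of exceptions.
   parse_int and parse_list are illustrative helper names. *)

(* An optional integer setting, using [default] when the variable is unset. *)
let parse_int ~default = function
  | None -> default
  | Some s -> (
      match int_of_string_opt s with
      | Some n -> n
      | None -> invalid_arg ("expected an integer, got: " ^ s))

(* A comma-separated list, skipping empty entries such as in "foo,,bar". *)
let parse_list = function
  | None -> []
  | Some s -> List.filter (fun entry -> entry <> "") (String.split_on_char ',' s)

let listen_port = parse_int ~default:8125 (Sys.getenv_opt "listen_port")
let target_host = Option.value ~default:"localhost" (Sys.getenv_opt "target_host")
let target_port = Option.value ~default:"8126" (Sys.getenv_opt "target_port")

(* The real cfg.ml would then map Str.regexp_string over these entries. *)
let blocklist_entries = parse_list (Sys.getenv_opt "blocklist")
```

Splitting parsing from the environment lookup keeps the helpers pure, so they can be tested without setting real environment variables.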

Finally, we come to the main file, ocaml_statsd_filter.ml:

open Eio

let max_size = 8192

let listen_addr = `Tcp (Net.Ipaddr.V4.any, Cfg.listen_port)

let target_addr net =
  match Net.getaddrinfo_stream net Cfg.target_host ~service:Cfg.target_port with
  | [] -> invalid_arg Cfg.target_host
  | addr :: _ -> addr

let allow data = Cfg.blocklist
  |> List.exists (fun regexp -> Str.string_match regexp data 0)
  |> not

let on_error = traceln "Connection handling error: %a" Fmt.exn

let main net new_domain =
  Switch.run @@ fun sw ->
  let target = Net.connect ~sw net (target_addr net) in
  let listen_socket = Net.listen ~backlog:128 ~sw net listen_addr in
  traceln "Listening on :%d" Cfg.listen_port;

  let domain_loop () =
    new_domain @@ fun () ->
    let domain_id = (Domain.self () :> int) in

    Switch.run @@ fun sw ->
    while true do
      Net.accept_fork ~sw listen_socket ~on_error @@ fun client _ ->
      let buf_str = client
        |> Buf_read.parse_exn ~max_size Buf_read.take_all
        |> String.trim
      in
      if allow buf_str then begin
        Flow.copy_string buf_str target;
        traceln "Domain %d: sent: '%s'" domain_id buf_str
      end else
        traceln "Domain %d: did not send: '%s'" domain_id buf_str
    done
  in
  let domains = List.init Cfg.num_threads (fun _ -> domain_loop) in
  Fiber.all domains

let () =
  Eio_main.run @@ fun env ->
  main (Stdenv.net env) (Domain_manager.run @@ Stdenv.domain_mgr env)

This introduces quite a few new concepts. The first thing we notice is that open Unix has been replaced with open Eio. Eio moves away from old-style I/O, which was closely modelled on Unix/C system calls, and introduces new, safer abstractions for raw I/O.

Another change is in how we express network addresses: we now use the types and functions in the Eio.Net module to construct them.

The blocklist check (the allow function) is unchanged, as it uses only the regular expressions module, Str.
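Since `Str.regexp_string` turns each entry into a literal pattern, and `Str.string_match regexp data 0` only has to match at the start of `data`, `allow` is effectively a prefix test. A Str-free equivalent can be sketched with `String.starts_with` (OCaml 4.13+). Here the blocklist is assumed to be a plain list of strings passed as `~blocklist`, rather than `Cfg.blocklist`'s compiled regexps; this also sidesteps the concerns about Str's shared state raised in the comments.

```ocaml
(* Str-free sketch of [allow]: a metric is dropped if it starts with any
   blocklisted literal string. Requires OCaml >= 4.13 for String.starts_with. *)
let allow ~blocklist data =
  not (List.exists (fun prefix -> String.starts_with ~prefix data) blocklist)

let () =
  let blocklist = [ "foo"; "bar" ] in
  List.iter
    (fun metric -> Printf.printf "%s -> %b\n" metric (allow ~blocklist metric))
    [ "bar:0|c"; "baz:1|c"; "xbar:2|c" ]
```

Note that `xbar:2|c` is allowed: as with the Str version, the match is anchored at the start of the metric.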

We introduce an on_error handler, which Net.accept_fork calls with any exception raised while handling a connection; it simply logs the exception with traceln.

Finally, we come to the meat of the server: a main function that takes two parameters, an Eio.Net.t 'capability' and a function that can safely spawn a new domain. Eio encourages a 'capabilities style' of programming, in which functions are passed only the permissions they need to do their work, and nothing else. This is explained in detail in Eio's excellent README.

With the capabilities given to main, we start a new 'switch' (essentially, a resource manager that safely disposes of all open resources when their containing scope ends), then use it to connect to the target (upstream StatsD) address and to listen on listen_port on all IPv4 interfaces (Net.Ipaddr.V4.any).
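The switch idea itself is easy to sketch in plain OCaml: register cleanup actions while the scope is open, then run them all, newest first, when it ends, even if an exception escapes. The `switch` type, `on_release` and `with_switch` below are hypothetical toy names; Eio's real `Switch` also tracks the fibers attached to it and handles cancellation, which this sketch ignores.

```ocaml
(* Toy switch: a list of pending cleanup actions, newest first. *)
type switch = { mutable cleanups : (unit -> unit) list }

(* Register [f] to run when the switch's scope ends. *)
let on_release sw f = sw.cleanups <- f :: sw.cleanups

(* Run [f] with a fresh switch, then run every registered cleanup in reverse
   registration order, whether [f] returned normally or raised. *)
let with_switch f =
  let sw = { cleanups = [] } in
  Fun.protect
    ~finally:(fun () -> List.iter (fun cleanup -> cleanup ()) sw.cleanups)
    (fun () -> f sw)

(* Usage: resources opened inside the scope are released when it ends. *)
let events = ref []
let record msg = events := msg :: !events

let () =
  with_switch (fun sw ->
      record "connect to target";
      on_release sw (fun () -> record "close target");
      record "listen";
      on_release sw (fun () -> record "close listener");
      record "serve");
  List.iter print_endline (List.rev !events)
```

Releasing in reverse order means the listener is closed before the target connection it was opened after, mirroring nested scopes.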

We then define the 'domain loop' function (my little pun on 'main loop'). It calls new_domain to run its body in a newly spawned domain, and it doesn't return until that body does, which, because of the infinite while loop, is never. Only the calling fiber waits, though; other fibers in the calling domain keep running. Inside the new domain, we capture the domain ID (an integer) for logging purposes, then start handling requests from our clients. The key to this is the Eio.Net.accept_fork function, which handles each client connection in a new fiber. This is where the asynchronous I/O magic happens: each domain handles each request in a new fiber, so we get N domains, each running many fibers.

The other point of note here is Eio.Flow, an abstraction over byte streams. The data shunted back and forth in requests and responses is handled in terms of flows, e.g. Flow.copy_string buf_str target. We no longer need to think in terms of 'read N bytes from that socket, then check that N bytes were actually read'. Flows handle all that for us.

In fact, they handle it a little too well: a client could send a gigantic request that eats up all our memory. To prevent this, we use Eio.Buf_read.parse_exn ~max_size Buf_read.take_all, which takes all the characters from the source flow up to our max_size limit, the same 8192 bytes as before. A larger request makes parse_exn raise an exception, which accept_fork passes to our on_error handler.

The max_size parameter is explained in Eio.Buf_read.of_flow.
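The effect of max_size can be sketched without Eio: keep reading until end of input, but fail as soon as the total would exceed the limit. This is a simplified stand-in, not how Buf_read is implemented; `take_all_bounded`, `Too_large` and `string_reader` are hypothetical names, and the `read` argument has the same shape as `Stdlib.input` (fill part of a buffer, return the byte count, 0 at end of input).

```ocaml
exception Too_large of int

(* Read everything from [read] until end of input, but raise [Too_large]
   as soon as more than [max_size] bytes have arrived. *)
let take_all_bounded ~max_size (read : bytes -> int -> int -> int) =
  let out = Buffer.create 256 in
  let chunk = Bytes.create 1024 in
  let rec loop () =
    let n = read chunk 0 (Bytes.length chunk) in
    if n > 0 then begin
      if Buffer.length out + n > max_size then raise (Too_large max_size);
      Buffer.add_subbytes out chunk 0 n;
      loop ()
    end
  in
  loop ();
  Buffer.contents out

(* A reader over an in-memory string, standing in for a client connection. *)
let string_reader s =
  let pos = ref 0 in
  fun buf off len ->
    let n = min len (String.length s - !pos) in
    Bytes.blit_string s !pos buf off n;
    pos := !pos + n;
    n

let () = print_endline (take_all_bounded ~max_size:8192 (string_reader "bar:1|c"))
```

With a real channel, `(input ic)` could be passed as the reader; the point is that memory use is capped at `max_size` no matter how much the client sends.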

Notice that we output some trace log messages to prove to ourselves that multiple domains are actually running in parallel; we will see this in action later. We start the domains in the final line of the main function: Fiber.all domains, which runs each domain_loop concurrently in its own fiber.
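For comparison, the same start-N-then-wait-for-all shape can be written with only the OCaml 5 standard library: `Domain.spawn` starts a function in a new domain and `Domain.join` waits for its result. This is a rough, Eio-free analogue of `Fiber.all` over `Domain_manager.run` calls (no switches, no cancellation); the workload, summing a range of integers, and the fixed count of 4 workers are placeholders standing in for the accept loop.

```ocaml
(* Requires OCaml 5. Each worker sums its own slice of 1..n, so the
   domains share no mutable state. *)
let sum_range lo hi =
  let total = ref 0 in
  for i = lo to hi do
    total := !total + i
  done;
  !total

(* Spawn [workers] domains, then wait for all of them and combine results. *)
let parallel_sum ~workers n =
  let chunk = (n + workers - 1) / workers in
  let domains =
    List.init workers (fun w ->
        let lo = (w * chunk) + 1 and hi = min n ((w + 1) * chunk) in
        (* Domain.spawn returns a handle; the work runs in parallel. *)
        Domain.spawn (fun () -> sum_range lo hi))
  in
  (* Domain.join blocks until that domain's function has returned. *)
  List.fold_left (fun acc d -> acc + Domain.join d) 0 domains

let () = Printf.printf "%d\n" (parallel_sum ~workers:4 1_000_000)
```

Spawning every domain before joining any of them is what lets them overlap; joining each one right after spawning it would serialize the work, much as calling the domain loops one after another (instead of via Fiber.all) would leave only the first one running.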

The final lines are about calling the main function with the required arguments. To get those, we need to call Eio_main.run and pass it a callback in which we set up the arguments with the env parameter. env is the ultimate source of all the capabilities, and from it we extract only the exact permissions we need for our main function–namely accessing the network, and running new domains.

Callbacks

You may have noticed a perhaps unexpected number of callbacks in this code. Most modern async I/O code has almost no callbacks, because most libraries have switched to monadic promises/futures (async/await style). So why does Eio reintroduce a seemingly retro style?

There are two main types of callbacks:

  1. Callbacks to access resources which the library then automatically cleans up for us, e.g. the Eio environment (Eio_main.run) and switches (Switch.run)
  2. Callbacks to delay execution of code which needs to run in new domains or fibers

In my opinion, the second type of callback is most likely unavoidable. This is because we are not using any specific effect system which can delay execution and move it to different fibers/domains for us. We're using direct style and need to delay such executions ourselves. Fortunately, the type system tells us exactly where callbacks are required.

The first type of callback may go away in future iterations of OCaml and Eio if, say, we standardize on a binding operator like let& to indicate 'use a resource'. For example, hypothetically:

let& sw = Switch.run in
...

let& env = Eio_main.run in
...

This remains to be seen, however, and for now a callback is the obvious way to do it. The main win here is that the async I/O calls themselves don't need callbacks: we read the client's request directly as a string and send it on to the target address, again directly. Eio takes care of the boring details of suspending the fiber when it's blocked on I/O and resuming it when it's unblocked.
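OCaml's binding operators (available since 4.08) already make it possible to experiment with this idea in user code. The sketch below defines a hypothetical `let&` that simply applies a callback-taking function to the rest of the scope, so `let& x = with_r in body` means `with_r (fun x -> body)`. `with_resource` is a made-up stand-in shaped like `Switch.run` or `Eio_main.run`; depending on their exact signatures (for example, optional arguments), the real Eio functions might need a small wrapper to be used this way.

```ocaml
(* Hypothetical operator: [let& x = with_r in body] desugars to
   [( let& ) with_r (fun x -> body)], i.e. [with_r (fun x -> body)]. *)
let ( let& ) with_resource body = with_resource body

(* Made-up callback-style resource function: open a named resource,
   pass it to [f], and always close it afterwards. *)
let trace = ref []

let with_resource name f =
  trace := ("open " ^ name) :: !trace;
  Fun.protect
    ~finally:(fun () -> trace := ("close " ^ name) :: !trace)
    (fun () -> f name)

(* Reads like straight-line code, yet resources close in reverse order. *)
let demo () =
  let& env = with_resource "env" in
  let& sw = with_resource "switch" in
  trace := Printf.sprintf "using %s and %s" env sw :: !trace

let () =
  demo ();
  List.iter print_endline (List.rev !trace)
```

The nesting is still there, since each `let&` is a callback under the hood, but it no longer shows up as indentation or `@@ fun` chains.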

Run

Now, we need to actually run this to prove to ourselves that it works. We'll need three terminals:

  1. Mock StatsD server on port 8126 (or optionally a real one if you prefer). I just spin up a Python HTTP server: python3 -m http.server 8126. It doesn't matter what type of server it is, we just need something listening on the correct port.

  2. Run our filter: OCAMLRUNPARAM=b blocklist=foo,bar dune exec ./ocaml_statsd_filter.exe (OCAMLRUNPARAM=b turns on exception backtraces). This will immediately throw an exception and exit if the server in (1) is not running.

  3. Finally, send a few requests to our filter:

export i=0
while [ $i -lt 50 ]; do echo "bar:$i|c" | nc localhost 8125; i=$(expr $i + 1); done

This sends 50 requests to our filter, each on its own connection, which gives it a chance to spread them across multiple domains. The trace logs look like this:

+Listening on :8125                
+Domain 1: did not send: 'bar:0|c'
+Domain 2: did not send: 'bar:1|c'
+Domain 3: did not send: 'bar:2|c'
+Domain 4: did not send: 'bar:3|c'
+Domain 5: did not send: 'bar:4|c'
+Domain 6: did not send: 'bar:5|c'
+Domain 7: did not send: 'bar:6|c'
+Domain 8: did not send: 'bar:7|c'
+Domain 1: did not send: 'bar:8|c'
+Domain 2: did not send: 'bar:9|c'
+Domain 3: did not send: 'bar:10|c'
+Domain 4: did not send: 'bar:11|c'
+Domain 5: did not send: 'bar:12|c'
+Domain 6: did not send: 'bar:13|c'
+Domain 7: did not send: 'bar:14|c'
+Domain 8: did not send: 'bar:15|c'
+Domain 1: did not send: 'bar:16|c'
...

The domains are cycled through in a round-robin manner, which shows that all of them (eight, in this run) are handling connections in parallel (otherwise domains 2 through 8 would be blocked on domain 1, and the trace would show only domain 1 for every request).

Now we come to the end of our little experiment. Hopefully this gives you some idea of what's coming in OCaml 5.0! For fun, if you want to compare this code to its original inspiration, the equivalent Rust project is here: https://github.com/askldjd/statsd-filter-proxy-rs/blob/main/src/server.rs

Top comments (9)

Thomas Leonard

Also, the target_host IP address string needs to be formatted in a specific 4-byte representation that the Eio library accepts.

Note: instead of providing the raw bytes, you could use Eio.Net.getaddrinfo_datagram to parse the string. It accepts IP addresses and names.

Yawar Amin

Thanks for the hint! This looks like it would give me a datagram socket address, from which I could create a datagram socket. But a datagram socket is not a two-way flow or even a sink flow, so I would have to use the send function to 'manually' send a cstruct instead of just using Flow.copy_string. Is that the intended usage?

Thomas Leonard

Oh, yeah, I didn't read it carefully enough! You want Eio.Net.getaddrinfo_stream then.

Yawar Amin

Got it! Updating the post now.

Simon Cruanes

I think this is memory unsafe because of the use of Str :). Str is full of global state and should not be used with multiple domains (or at all, really).

I also wonder if Eio.Domain_manager would be preferable to starting domains manually? @talex5 would know. This way it's also possible to shutdown the program gracefully if needed.

Yawar Amin

Hi, you mean a race condition because of multiple domains using Str? I think in this specific case it's safe because we are just using Str.string_match to get a bool (match or no match) answer; we are not trying to get the matched string or any groups in it. I agree it's almost always advisable to use a safe regex library, though.

Also we are indeed using Eio.Domain_manager.run to start the domains instead of doing it manually, last line of the module :-)

Simon Cruanes

Oh I missed that new_domain was passed as a parameter. My bad.

Is Str.string_match thread-safe though? I wouldn't swear it doesn't depend on C globals internally. In any case I think it's dicey to rely on it :)

Yawar Amin

Fair enough!

Thomas Leonard

Looks like Str has been upgraded to use domain-local storage, so I think it's multicore safe now: github.com/ocaml/ocaml/commits/tru...

(but switching fibers within a domain can still corrupt the state)