DEV Community

Szymon Gibała

Posted on • Originally published at sgibala.com

Async without async

To start with a short TLDR: this article is my exploration of
implementing an asynchronous networking application, without using
async Rust.

Background

Over the past months (if not years at this point) I have been playing
around with some sane approaches to implementing consensus algorithms,
and perhaps distributed systems more generally.

As part of this journey I am seeking ways to have more control over
the whole "application framework". This recently led me to ask a
question: Can I have a performant, IO heavy application without using
async Rust?

Why not async?

In many ways async support in Rust is great. If you are just writing a
web application, the async and await keywords really make it very easy
to write the code as you would with sync Rust. However, everything
comes at a cost - async Rust brings in the complexity of the whole
async runtime, and hides a lot of what is going on from our sight.

One of the reasons for it is that async Rust and accompanying runtimes
are built to be a generalized solution, to support a lot of different
cases and be robust in many different ways. To be able to do this some
complexity arises naturally, which is then well hidden from us by async
and friends. Not all this complexity is needed for every use case,
and since there is no such thing as a free lunch, there may come the
time to pay for it.

Another thing we sacrifice by using an async runtime is control.
It is programmed in a specific way, with some knobs that we are able
to tweak and some we are not. Until we understand the code thoroughly
and grasp the possible code paths, there will always be a black box
aspect to it.

What I have learned over the years is that sometimes it is better to
ditch a one-size-fits-all, batteries-included solution, and build
something simpler, use-case-specific, sacrificing some time but sparing
yourself a lot of complexity, and retaining full control and
better understanding of the system.

Part of this exploration is to answer the question whether it is worth
it in this case.

Objective

Not using async Rust is not a goal in itself, but only a means to
an end. The main objective remains to build a proof of concept of a
simple system that could be used to implement more sophisticated
software on top of it.

The goal is exploration, but there are a few constraints I want to
satisfy.

  1. Keep it simple

    • The foundation needs to be simple, easy to reason about, troubleshoot and understand. Let the complexity arise from the problems that applications on top of it will be solving, not from its fundamental parts. Given that, I want it to be single threaded, at least for as long as that is not a performance limitation.
  2. Keep it real

    • This application aims to be proof of concept, of something that could be turned into a functional system. For me this implies:
      • No busy waiting - I do not want to burn the CPU when nothing is going on.
      • No added latency - When IO is ready, it should be processed, not wait until a few milliseconds sleep between loop iterations finishes.
  3. Not just request-triggered

    • It is not a REST API I want to use it for, therefore applications built on top need to have a way of "triggering" some logic not only when a request arrives. To be more specific, I am thinking of time-based triggers, be it intervals or timeouts. There needs to be a way to run some logic based on those, and not just on incoming IO.

Since I am ditching async Rust, and IO is still at the core of the
application, the first step is to figure out how to handle it without
the magic of Tokio.
Let's take a look at the possibilities.

Handling IO

If I were asked to write the simplest echo server to handle just one
connection I would end up with something like this:

use std::io::{Read, Write};

let listener = std::net::TcpListener::bind("127.0.0.1:9000")?;
let (mut connection, _addr) = listener.accept()?;
loop {
    let mut buffer = [0u8; 1024];
    let bytes_read = connection.read(&mut buffer)?;
    if bytes_read == 0 {
        // Connection closed by client
        break;
    }
    connection.write_all(&buffer[..bytes_read])?;
}

And that is the first, likely simplest approach to handling IO -
blocking IO. The application will block on the connection.read call
until there is something to read.

Now, if I had to handle multiple connections, there are a few
ways to extend it.

I suppose that the most intuitive one is to just handle each connection
in a separate thread and keep accepting in the main one:

use std::io::{Read, Write};

let listener = std::net::TcpListener::bind("127.0.0.1:9000")?;
loop {
    let (mut connection, _addr) = listener.accept()?;
    let _handle = std::thread::spawn(move || -> anyhow::Result<()> {
        loop {
            let mut buffer = [0u8; 1024];
            let bytes_read = connection.read(&mut buffer)?;
            if bytes_read == 0 {
                // Connection closed by client
                break;
            }
            connection.write_all(&buffer[..bytes_read])?;
        }
        Ok(())
    });
}

Clearly, this approach is more versatile than handling only one
connection, but it is also clear that this approach violates one of my
objectives -- being single threaded.

Note: a variation of this could be process per connection, which
is used by some systems. Still they often use async IO anyway.

The other option that we have allows us to keep our single thread. All
we need is to make the sockets non-blocking and add a bit more code:

use std::io::{Read, Write};

let mut connections = Vec::new();
let listener = std::net::TcpListener::bind("127.0.0.1:9000")?;
listener.set_nonblocking(true)?;
loop {
    // accept all pending connections
    for cnn_res in listener.incoming() {
        match cnn_res {
            Ok(connection) => {
                // set socket non-blocking
                connection.set_nonblocking(true)?;
                connections.push(connection);
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                // No more incoming connections at the moment
                break;
            }
            Err(e) => {
                return Err(e.into());
            }
        }
    }
    // Go through all connections. Read, write, and retain
    // only not closed connections.
    let mut new_connections = Vec::new();
    for connection in connections.iter_mut() {
        let mut drop = false;
        let mut buffer = [0u8; 1024];
        loop {
            match connection.read(&mut buffer) {
                Ok(bytes_read) => {
                    if bytes_read == 0 {
                        // Connection closed by client
                        drop = true;
                        break;
                    }
                    // If we would like to do it properly, we should
                    // handle would block errors as well
                    connection.write_all(&buffer[..bytes_read])?;
                }
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                    // No data to read at the moment
                    break
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }
        if !drop {
            new_connections.push(connection.try_clone()?);
        }
    }
    connections = new_connections;
}

However, a careful observer can immediately see that this violates
another constraint, as the loop will just keep spinning burning all
the CPU cycles. We could avoid busy looping by adding a short sleep
between iterations, but that is added latency I want to avoid as
well.

With all of those out of the way, and none of them suitable, we need to
come full circle and go back to async. Not necessarily async Rust, but
async IO nonetheless.

Asynchronous IO (or not really)

As the name suggests, asynchronous IO is not synchronous.

ba_dum_tss

But what it really means is a bit complicated.

Async IO can work in different ways, and I am not sure if there is
a real, correct definition of what is async and what is not. In
general, when we talk about async, it is understood as something that
happens "in the background" and there is some notification when
"things are ready".

Different systems work in different ways. With io_uring, IO
happens in kernel space and user space application receives
notification when the work is completed, while with epoll the
application still does the dirty work of IO syscalls, and just
receives the notification when there is progress to be made.

Deeper tangent:
I would say that async is in the eye of the beholder.
One can argue that epoll is not "real" async since the application
only receives the event and all work still happens synchronously
(in a non-blocking way, let's say).
However, if you go with this thinking, then Tokio isn't really
async either, since there too it is the application that does the IO.
"But io_uring is a real async!" You may object. In its case it is not
the application that does IO, but the kernel itself. It must be a
true async then!
However, if you look at it from the perspective of the CPU (or even
the kernel), it all happens on the same silicon (perhaps on different
cores, but that is not for us to decide), so is it "async" after all?

Different operating systems have different APIs for async IO, to name
a few:

  • kqueue - macOS, BSD
  • epoll and io_uring - Linux
  • IOCP - Windows

There are other, older mechanisms on Linux as well, such as poll and
select, but these days epoll is likely most prevalent, with io_uring
being the newest and slowly getting more adoption.

Since penguins dominate the server world, I focused on Linux
and took a deeper look into epoll.

Epoll

I knew about epoll from the first time I asked myself the question
"But how does async, actually work?" that led me into the deep rabbit
hole of various kernel mechanisms, down to the realm of CPU interrupts
(if you never went there, I highly recommend that journey). However,
not being a C programmer, I never used it "directly".

Most of the "async" web dev libraries in all languages rely on it,
but hide it carefully under a few layers of abstraction, mainly because
they are meant to work on all OSes and not just on Linux. But, let's
get to the point...

Epoll as a whole
is an API in the Linux kernel that allows a process to register
interest in IO events for a set of file descriptors.

There are 4 syscalls listed under the epoll man page:

  • epoll_create
  • epoll_create1
  • epoll_ctl
  • epoll_wait

Names are somewhat self-explanatory, so I will not copy-paste
definitions from the
man page,
feel free to check it out on your own.

I will not leave you empty-handed, however, and give you a quick intuition of how things work:
Epoll is about events. Instead of constantly checking if there is
any IO to be done, the user space application receives "notification",
when there is "progress" to be made. Since behind this mechanism is the
kernel, while waiting for the events the waiting thread can "go to
sleep" and get woken up when the IO event arrives, hence not wasting
CPU cycles by spinning around checking all connections, and also not
adding latency with an actual sleep.

One might ask: how do we know when to stop reading or writing then?
Well, if you ask the socket politely it will tell you. As long as it is
a non-blocking socket, as async IO is usually used in conjunction
with those.

By the socket "telling you", I mean the read or write call returning
EAGAIN or EWOULDBLOCK.
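
To make that concrete, here is a minimal sketch of what "asking politely" can look like from Rust, where both EAGAIN and EWOULDBLOCK surface as `std::io::ErrorKind::WouldBlock`. The `ReadOutcome` enum and `try_read` helper are names made up for this illustration, and the function is generic over `Read` so that it can be exercised without a real socket; with a non-blocking `TcpStream` the logic would be the same.

```rust
use std::io::{self, ErrorKind, Read};

/// Outcome of a single non-blocking read attempt (hypothetical helper type).
#[derive(Debug, PartialEq)]
enum ReadOutcome {
    /// `n` bytes were copied into the buffer.
    Data(usize),
    /// The peer closed the connection (read returned 0).
    Closed,
    /// Nothing to read right now (EAGAIN / EWOULDBLOCK).
    NotReady,
}

/// Performs one read and classifies the result instead of treating
/// `WouldBlock` as a failure. Assumes `buf` is not empty.
fn try_read<R: Read>(source: &mut R, buf: &mut [u8]) -> io::Result<ReadOutcome> {
    match source.read(buf) {
        Ok(0) => Ok(ReadOutcome::Closed),
        Ok(n) => Ok(ReadOutcome::Data(n)),
        Err(e) if e.kind() == ErrorKind::WouldBlock => Ok(ReadOutcome::NotReady),
        Err(e) => Err(e),
    }
}
```

The caller keeps reading while it gets `Data`, and goes back to waiting for events as soon as it sees `NotReady`.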

A look inside

For each epoll instance created in userspace there is an
eventpoll
allocated on the kernel side. It contains a red-black tree of
epitems
keyed by file pointer and file descriptor. When we register
interest, a new tree node is inserted, and a callback is added
to the file's wait queue. This callback is where the magic
happens, as whenever we call epoll_wait our thread will be parked
(if no interests are ready), and it is this callback's job to wake
it up (if the interest mask matches). Additionally when this happens
the reference to epitem from the tree is inserted into
eventpoll's ready list.
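
The bookkeeping described above can be pictured with a small toy model. This is only a sketch for intuition, not kernel code: `ToyEventPoll`, its methods, and the `READ`/`WRITE` flag values are invented for the example, a `BTreeMap` keyed by fd stands in for the red-black tree of epitems, and there is no actual sleeping or waking of threads.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Flags standing in for EPOLLIN / EPOLLOUT (values are arbitrary here).
const READ: u32 = 0b01;
const WRITE: u32 = 0b10;

/// Toy stand-in for the kernel-side `eventpoll`.
struct ToyEventPoll {
    /// Interest set: fd -> interest mask.
    interests: BTreeMap<i32, u32>,
    /// Ready list: fds with pending events that matched their mask.
    ready: VecDeque<(i32, u32)>,
}

impl ToyEventPoll {
    fn new() -> Self {
        ToyEventPoll { interests: BTreeMap::new(), ready: VecDeque::new() }
    }

    /// Roughly `epoll_ctl(EPOLL_CTL_ADD)`: remember what we care about.
    fn register(&mut self, fd: i32, mask: u32) {
        self.interests.insert(fd, mask);
    }

    /// Plays the role of the wait-queue callback. Returns `true` when the
    /// events match the interest mask, i.e. when a parked waiter would be woken.
    fn on_fd_event(&mut self, fd: i32, events: u32) -> bool {
        let matched = match self.interests.get(&fd) {
            Some(&mask) => mask & events,
            None => 0,
        };
        if matched == 0 {
            return false;
        }
        // Keep at most one ready-list entry per fd, merging event flags.
        if let Some(entry) = self.ready.iter_mut().find(|(f, _)| *f == fd) {
            entry.1 |= matched;
        } else {
            self.ready.push_back((fd, matched));
        }
        true
    }

    /// Roughly `epoll_wait` minus the sleeping: hand out up to `max` entries.
    fn take_ready(&mut self, max: usize) -> Vec<(i32, u32)> {
        let n = max.min(self.ready.len());
        self.ready.drain(..n).collect()
    }
}
```

Registering fd 3 for `READ` and then reporting a `WRITE` event on it leaves the ready list empty, while a `READ` event puts `(3, READ)` on it for the next `take_ready` call.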

Now, to the more interesting part: how to actually use it.
My goal here is to get a real glimpse of epoll in all its glory,
not covered by the compatibility layers and easy to use abstractions.

Fine, fine... Using libc is not the lowest one can go, but it is good
enough for today...

c_code_is_coming

First things first, I need something that listens on a port and
accepts connections. No epoll magic here, no async IO, just good
old C:

#define _GNU_SOURCE
#include <netinet/in.h>
#include <stdio.h>
#include <errno.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
    int socket_fd = socket(AF_INET, SOCK_STREAM, 0);   // create TCP socket
    if (socket_fd < 0) { perror("socket"); return 1; } // check return value

    int yes = 1;
    setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));

    struct sockaddr_in addr = {          // bind address
        .sin_family = AF_INET,           // IPv4
        .sin_port   = htons(9000),       // port 9000
        // Convert from host byte order to network byte order
        .sin_addr   = { htonl(INADDR_LOOPBACK) } // localhost
    };

    if (bind(socket_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { 
        perror("bind"); return 1; 
    }

    if (listen(socket_fd, SOMAXCONN) < 0) { 
        perror("listen"); 
        return 1;
    }

    printf("Listening on 127.0.0.1:9000...\n");

    struct sockaddr_in clientaddr;
    socklen_t clientaddr_size = sizeof(clientaddr);
    int connection_fd = accept(socket_fd, (struct sockaddr*)&clientaddr, &clientaddr_size);
    if (connection_fd < 0) {
        perror("accept");
        return 1;
    }
    printf("accepted connection from %s:%d\n", inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));

    char buf[4096];

    for (;;) {
        ssize_t r = read(connection_fd, buf, sizeof buf); // read some
        if (r == 0) {      // peer closed
            printf("read 0 bytes, closing fd\n");
            close(connection_fd);
            break;
        } else if (r < 0) {
            perror("read");  // read error
            close(connection_fd);
            break;
        } else {            // got r bytes
            printf("read %zd bytes\n", r);
            ssize_t off = 0;
            while (off < r) { 
                // echo back, in a blocking way
                ssize_t w = send(connection_fd, buf + off, (size_t)(r - off), MSG_NOSIGNAL);
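                if (w < 0) {
                    perror("send");  // write error
                    close(connection_fd);
                    // the only connection is gone; returning here avoids
                    // going back to read() on an already closed fd
                    return 1;
                }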
                off += w;    // advance
            }
        }
    }

    return 0;
}

This will do the work, as a simple echo server. However, as an example
of single thread blocking IO (just in a different language), it can
only handle one connection at a time.

Since async IO only makes sense with non-blocking sockets, the first
step is to make the listening socket non-blocking:

#include <fcntl.h>

static int set_nonblocking(int fd) {
    // F_GETFL - return (as the function result) the file access mode and
    // the file status flags; arg is ignored.
    int fl = fcntl(fd, F_GETFL, 0);
    if (fl == -1) return -1;
    // Preserve the current flags and add O_NONBLOCK.
    return fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}

...
    int yes = 1;
    setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
    // new line - setting socket_fd as non-blocking
    if (set_nonblocking(socket_fd) < 0) { 
        perror("fcntl"); 
        return 1; 
    }
...

Now, this breaks the echo server, since accept will no longer block,
but return the error instead:

Listening on 127.0.0.1:9000...
accept4: Resource temporarily unavailable

This basically means: "if the socket was blocking, the call would
block". Now is the time to create an epoll instance and
register read interest (EPOLLIN) for socket_fd on it:

#include <sys/epoll.h>
...
if (listen(socket_fd, SOMAXCONN) < 0) { 
    perror("listen"); 
    return 1; 
}

// Create epoll instance and return its fd
int ep = epoll_create1(EPOLL_CLOEXEC);
if (ep < 0) { 
    perror("epoll_create1"); 
    return 1; 
}

// watch socket_fd for READ events, and register it with epoll instance
struct epoll_event ev = { .events = EPOLLIN, .data.fd = socket_fd };
if (epoll_ctl(ep, EPOLL_CTL_ADD, socket_fd, &ev) < 0) { 
    perror("epoll_ctl ADD lfd"); 
    return 1; 
}
...

Next, instead of just calling accept and handling the connection
directly in the main thread, we call epoll_wait inside the loop. When
the socket is able to accept a connection, epoll_wait returns,
putting an event into the buffer we pass to it. We then iterate
through new events, checking if the associated file descriptor is
the listening socket -- in which case we accept all new connections
and add them to epoll -- or a regular connection socket otherwise.

...
struct epoll_event events[128];      // event buffer
char buf[4096];                      // I/O buffer

printf("Listening on 127.0.0.1:9000...\n");

for (;;) {
    // epoll_wait is a system call. It will populate the events array and 
    // return the number of events that were triggered.
    // We also specify max events to return - calculated based on the size of the events array.
    int n = epoll_wait(ep, events, (int)(sizeof events / sizeof events[0]), -1);
    if (n < 0) {
        if (errno == EINTR) continue; // interrupted -> retry
        perror("epoll_wait");
        break;
    }
    printf("epoll_wait wake up: %d events\n", n);
    for (int i = 0; i < n; i++) {     // handle ready fds
        int fd = events[i].data.fd;   // fd associated with the event
        uint32_t e = events[i].events;// event flags - readable, writable etc

        if (fd == socket_fd) { // socket_fd is readable, that may mean new connections to accept
            for (;;) { // accept until EAGAIN or EWOULDBLOCK
                struct sockaddr_in clientaddr;
                socklen_t clientaddr_size = sizeof(clientaddr);
                // now, accept with SOCK_NONBLOCK so that the new fd is already
                // set to non-blocking, so we can avoid fcntl call
                int connection_fd = accept4(socket_fd, (struct sockaddr*)&clientaddr, &clientaddr_size, SOCK_NONBLOCK);
                if (connection_fd < 0) {
                    if (errno == EAGAIN || errno == EWOULDBLOCK) break; // no more connections
                    else perror("accept4");
                    break;
                }
                printf("accepted connection from %s:%d\n", inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));
                // We have accepted a new connection. Now we add it to the
                // epoll instance to monitor it for events.
                struct epoll_event cev = {
                    // Edge-triggered epoll will notify only once when fd is
                    // ready, it will not keep notifying until we read all data.
                    .events = EPOLLIN | EPOLLET | EPOLLHUP | EPOLLERR,  // read, edge-triggered, hup/err
                    .data.fd = connection_fd
                };
                // Add new connection to epoll
                if (epoll_ctl(ep, EPOLL_CTL_ADD, connection_fd, &cev) < 0) {
                    perror("epoll_ctl ADD connection_fd");
                    close(connection_fd);
                }
            }

        } else { // handle existing connection
            if (e & (EPOLLHUP | EPOLLERR)) { // hangup/error
                printf("epollhup | epollerr, closing fd\n");
                // fd is removed from the epoll when the last reference
                // is closed. We never dup it so it is the only one.
                close(fd);
                continue;
            }

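            if (e & EPOLLIN) { // readable
                if (handle_connection_io(fd, buf, sizeof buf) < 0) {
                    // fd was closed after a send error: move on to the
                    // next event instead of abandoning the rest of the batch
                    continue;
                }
            }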
        }
    }
}

Epoll interest can be registered as level-triggered or edge-triggered.
Level-triggered is the default option and it will keep notifying while
the interest "is fulfilled". So if I register a TCP connection
socket with read interest, epoll_wait will keep waking up with an
event until I read all available data from that socket.
Edge-triggered (EPOLLET option), on the other hand, will notify
(at least once) only when the interest "becomes fulfilled", so in
the example above, only when new data arrives on the socket.
More details can be found on the previously mentioned
man page.

Here I add TCP connections to epoll as edge-triggered, however, in this
case it does not really matter, since I read all available data each
time and we are not working around any constraints. I also do not care
about writes as they are done in best effort fashion.
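
The difference between the two modes can be shown with a toy model of a single socket. This is a deliberately simplified sketch: `ToySocket` and its methods are invented for illustration, and real epoll tracks considerably more state than one counter and one flag.

```rust
/// Toy model of one socket's receive buffer as seen by an epoll-like notifier.
struct ToySocket {
    /// Bytes waiting to be read.
    pending: usize,
    /// Set when new data arrived since the last edge notification.
    new_data: bool,
}

impl ToySocket {
    fn new() -> Self {
        ToySocket { pending: 0, new_data: false }
    }

    /// Data arrives from the network.
    fn receive(&mut self, n: usize) {
        self.pending += n;
        if n > 0 {
            self.new_data = true;
        }
    }

    /// The application reads up to `n` bytes; returns how many it got.
    fn read(&mut self, n: usize) -> usize {
        let taken = n.min(self.pending);
        self.pending -= taken;
        taken
    }

    /// Level-triggered: ready for as long as there is unread data.
    fn poll_level(&self) -> bool {
        self.pending > 0
    }

    /// Edge-triggered: ready once per arrival of new data.
    fn poll_edge(&mut self) -> bool {
        std::mem::replace(&mut self.new_data, false)
    }
}
```

After `receive(10)` followed by `read(4)`, `poll_level` still reports readiness because 6 bytes remain, while a second `poll_edge` stays quiet until more data arrives. This is why edge-triggered handlers drain the socket until it would block.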

Gotchas with writes

Write interest is slightly more complicated. If we were to use
level-triggered epoll, we would get wake-up events for as long as the
socket is writable, which, if we have nothing to write, is
all the time, so the application would never "sleep". One
option here is to register write interest only when there is data
to be written, and remove it once the data is flushed.

With edge-triggered epoll this is not a problem. However, here we need
to be mindful that we only get notified when the socket state changes
to writable. If the socket was already writable and we have new data
to send, we will not be notified. So either, again, we re-arm
epoll with write interest only when we have data to write, or
whenever we have new data we attempt to write to the socket immediately,
and stop when the write would block.
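
The last strategy (write immediately, stop when the write would block, and only then rely on write interest) could be sketched as below. `OutBuffer` and its methods are hypothetical names for this example; the code is generic over `Write` so it runs without a socket, and the returned `bool` is meant to tell the caller whether write interest is still needed.

```rust
use std::io::{self, ErrorKind, Write};

/// Per-connection outgoing buffer (hypothetical helper for illustration).
struct OutBuffer {
    pending: Vec<u8>,
}

impl OutBuffer {
    fn new() -> Self {
        OutBuffer { pending: Vec::new() }
    }

    /// Queues `data` and immediately tries to write it out.
    /// Returns `true` if bytes are still pending afterwards.
    fn send<W: Write>(&mut self, sink: &mut W, data: &[u8]) -> io::Result<bool> {
        self.pending.extend_from_slice(data);
        self.flush(sink)
    }

    /// Writes as much as the sink accepts and stops at `WouldBlock`.
    /// Meant to be called again when a writable event arrives.
    fn flush<W: Write>(&mut self, sink: &mut W) -> io::Result<bool> {
        let mut written = 0;
        let result = loop {
            if written == self.pending.len() {
                break Ok(());
            }
            match sink.write(&self.pending[written..]) {
                Ok(0) => break Err(io::Error::from(ErrorKind::WriteZero)),
                Ok(n) => written += n,
                Err(e) if e.kind() == ErrorKind::WouldBlock => break Ok(()),
                Err(e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => break Err(e),
            }
        };
        // Forget what was accepted, keep the rest for the next attempt.
        self.pending.drain(..written);
        result.map(|()| !self.pending.is_empty())
    }
}
```

With level-triggered epoll, a `true` result would be the moment to add write interest and a `false` result the moment to remove it. With edge-triggered epoll the interest can stay registered, and `flush` is simply called again on the next writable event.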

Handling existing connections changes slightly too, as we now
need to handle EAGAIN and EWOULDBLOCK, since connection
sockets are non-blocking as well:

int handle_connection_io(int fd, char *buf, size_t buf_size) {
    for (;;) { // drain the connection until EAGAIN or EWOULDBLOCK
        ssize_t r = read(fd, buf, buf_size); // read some
        if (r == 0) { // peer closed
            printf("read 0 bytes, closing fd\n");
            close(fd);
            return 0;
        } else if (r < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) break; // drained
            perror("read");
            close(fd);
            return 0;
        } else { // got r bytes
            printf("read %zd bytes\n", r);
            ssize_t off = 0;
            while (off < r) { // echo back
                ssize_t w = send(fd, buf + off, (size_t)(r - off), MSG_NOSIGNAL);
                if (w < 0) {
                    // Write what we can in best effort manner. We would need
                    // EPOLLOUT and some kind of buffer per connection to do it
                    // properly in non-blocking way.
                    if (errno == EAGAIN || errno == EWOULDBLOCK) break;
                    perror("send");
                    close(fd);
                    return -1; // signal to skip to next_event
                }
                off += w;
            }
        }
    }
    return 0;
}

The write part is a bit simplified, since I do not want to store extra
state about what has been successfully written and what has not. To do
it properly, I could register connection fds with write interest as well
(EPOLLOUT) and get notified when there is progress to be made writing.

Now to compile and run it:

clang main.c
./a.out

And connect from two separate terminal windows:

nc 127.0.0.1 9000

If I now start writing to the connections, I can see messages being echoed
back, and server logs show its hard work.

Listening on 127.0.0.1:9000...
epoll_wait wake up: 1 events
accepted connection from 127.0.0.1:50496
epoll_wait wake up: 1 events
read 5 bytes
epoll_wait wake up: 1 events
accepted connection from 127.0.0.1:50870
epoll_wait wake up: 1 events
read 6 bytes
epoll_wait wake up: 1 events
read 6 bytes

It can handle multiple connections, runs in a single thread, does not add
any artificial latency, and is not busy looping.

This checks the requirements. So, as all software does eventually, it is
time to rewrite it in Rust.

The full code is available on GitHub.

Rust

Since I am not planning to become a C wizard anytime soon, to build the
foundation for something more complex and come back to the idea of
"async without async", there actually needs to be some async that I
ditch, so Rust it is.

Do not expect fireworks, though, just a bit more "flashy" echo
server...

Mio

To not libc myself into oblivion or unsafe my way to hell, I decide to
take an easier path. Path well trodden by others, the secret async
source behind Tokio - Mio.

Mio not only wraps around Epoll with a nice, easy-to-use API, but
also does so over other OSes' async APIs, making our app
cross-platform!

We can now forget about epoll's naked glory. However, the
overall approach is the same as in C:

  1. Create a Poll instance (which on Linux uses Epoll)
  2. Register listener and accepted connections as a Source
  3. Wait for events, and handle IO in non-blocking way

And to be fair, there is not much more to it, since Mio is handling
all the dirty work behind the scenes.

Poll's API is quite similar to what we saw in the C code, but without
getting your hands dirty with direct syscalls.
As with C code, the first thing to do is register a listener socket.
To use it with Poll, it needs to be wrapped with
mio::net::TcpListener,
which provides the aforementioned Source trait implementation
expected by the
Registry::register(...) method (Registry lives inside Poll):

...
let listener = std::net::TcpListener::bind(addr)
    .context(format!("Failed to bind to address {}", addr))?;
listener
    .set_nonblocking(true)
    .context("Failed to set listener to non-blocking mode")?;

let poll = mio::Poll::new().context("Failed to create Poll instance")?;

let mut listener = mio::net::TcpListener::from_std(listener);
let listener_token = mio::Token(0);

poll.registry()
    .register(&mut listener, listener_token, mio::Interest::READABLE)
    .context("Failed to register listener with Poll")?;
...

While in C events are associated with a specific file descriptor, to be
cross-platform Mio uses
Token, which is
a wrapper around usize and allows us to map the event back to the
Source, for example a specific TCP connection or, as is the purpose
of listener_token, the TCP listener.
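
One way to picture that mapping is a fixed pool of tokens plus a lookup table. The sketch below is an assumption about how such bookkeeping could look, not the article's implementation: `Token` here is a local stand-in for `mio::Token`, `TokenTable` is an invented name, and only the field names mirror the `free_tokens` and `connections_by_token` fields used in the event-handling code further down.

```rust
use std::collections::{HashMap, VecDeque};

/// Local stand-in for `mio::Token`, which wraps a `usize`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct Token(usize);

/// Hands out tokens from a fixed pool and maps them back to connection state.
struct TokenTable<C> {
    free_tokens: VecDeque<Token>,
    connections_by_token: HashMap<Token, C>,
}

impl<C> TokenTable<C> {
    /// Tokens `first..first + capacity` are available for connections;
    /// lower values stay reserved, for example 0 for the listener.
    fn new(first: usize, capacity: usize) -> Self {
        TokenTable {
            free_tokens: (first..first + capacity).map(Token).collect(),
            connections_by_token: HashMap::new(),
        }
    }

    /// Assigns a free token, or hands the connection back if none is left.
    fn insert(&mut self, connection: C) -> Result<Token, C> {
        match self.free_tokens.pop_front() {
            Some(token) => {
                self.connections_by_token.insert(token, connection);
                Ok(token)
            }
            None => Err(connection),
        }
    }

    /// Looks up the connection an event belongs to.
    fn get_mut(&mut self, token: Token) -> Option<&mut C> {
        self.connections_by_token.get_mut(&token)
    }

    /// Drops the mapping and returns the token to the pool for reuse.
    fn remove(&mut self, token: Token) -> Option<C> {
        let connection = self.connections_by_token.remove(&token)?;
        self.free_tokens.push_back(token);
        Some(connection)
    }
}
```

Bounding the pool doubles as a connection limit: once `insert` returns `Err`, the server can refuse or defer new connections.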

With Poll initialized, we can wait for events and process them:

fn wait_for_events(&mut self, duration: std::time::Duration) -> anyhow::Result<()> {
    let mut events = mio::Events::with_capacity(1024);
    self.poll
        .poll(&mut events, Some(duration))
        .context("Failed to poll events")?;

    for event in events.iter() {
        self.handle_io_for_event(event)?;
    }

    Ok(())
}
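
The `duration` argument is what would let time-based triggers coexist with IO: the poll should sleep no longer than the time left until the nearest timer. How that value gets chosen is not shown here, so the following is only a sketch under the assumption that timers are kept as a plain slice of deadlines; `next_poll_timeout` and `max_wait` are hypothetical names.

```rust
use std::time::{Duration, Instant};

/// Picks how long the next poll may sleep: until the earliest deadline,
/// capped at `max_wait`, and zero if a deadline has already passed.
fn next_poll_timeout(now: Instant, deadlines: &[Instant], max_wait: Duration) -> Duration {
    deadlines
        .iter()
        // Time left until each deadline; overdue deadlines count as zero.
        .map(|deadline| deadline.saturating_duration_since(now))
        .min()
        // No timers at all: fall back to the cap.
        .map_or(max_wait, |until_next| until_next.min(max_wait))
}
```

After the poll returns, whether because of IO or because of the timeout, the loop would run every timer whose deadline is at or before the current time, then compute the next timeout again.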

And to process them, we again come back to Tokens, as each
mio::Event
is associated with the token used when registering event::Source:

fn handle_io_for_event(&mut self, event: &mio::event::Event) -> anyhow::Result<()> {
    let token = event.token();

    if token == self.listener_token {
        if !event.is_readable() {
            tracing::warn!("Listener event is not readable");
            return Ok(());
        }

        loop {
            if self.free_tokens.is_empty() {
                tracing::warn!("Maximum connections reached, cannot accept new connection");
                break;
            }

            // Accept on mio::net::TcpListener already sets socket to
            // non-blocking mode and returns mio::net::TcpStream.
            let (mut stream, addr) = match self.listener.accept() {
                Ok((stream, addr)) => (stream, addr),
                // EAGAIN and EWOULDBLOCK are both mapped to std::io::ErrorKind::WouldBlock
                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                    // No more connections to accept at the moment
                    break;
                }
                Err(ref e) if is_transient_listener_error(e) => {
                    tracing::warn!(error = ?e, "transient error accepting connection, retrying");
                    continue;
                }
                Err(e) => {
                    return Err(anyhow::anyhow!("Failed to accept new connection: {:?}", e));
                }
            };

            tracing::info!("Accepted new connection from {}", addr);
            let token = self
                .free_tokens
                .pop_front()
                .expect("we checked above that free_tokens is not empty");

            // On Linux Mio by default uses edge-triggered epoll
            let result = self.poll.registry().register(
                &mut stream,
                token,
                mio::Interest::READABLE.add(mio::Interest::WRITABLE),
            );
            match result {
                Ok(_) => {}
                Err(err) => {
                    self.free_tokens.push_back(token);
                    return Err(anyhow::anyhow!(
                        "Failed to register new connection with Poll: {:?}",
                        err
                    ));
                }
            }

            self.connections_by_token
                .insert(token, Connection::new(stream));
        }
    } else if let Some(connection) = self.connections_by_token.get_mut(&token) {
        match connection.process_io_event(event) {
            Ok(_) => {}
            Err(err) => {
                tracing::error!(
                    "Error processing IO event for token {:?}. Dropping connection. Error: {:?}",
                    token,
                    err
                );
                self.connections_by_token.remove(&token);
                self.free_tokens.push_back(token);
            }
        }
    } else {
        tracing::warn!("Received event for unknown token: {:?}", token);
    }

    Ok(())
}

It is analogous to our previous echo server, differentiating
between events to the listening socket and connection sockets, with
the difference that here we compare Tokens instead of file
descriptors.

Unlike in the C implementation, however, here we handle writes properly
by registering both READABLE and WRITABLE interests on the TCP
stream from an established connection. For that to work, writes are
initially appended to an in-memory buffer and then written to the
connection whenever we can make progress:

pub struct Connection {
    stream: mio::net::TcpStream,
    write_buffer: Vec<u8>,
}

impl Connection {
    fn new(stream: mio::net::TcpStream) -> Self {
        Connection {
            stream,
            write_buffer: Vec::new(),
        }
    }

    fn process_io_event(&mut self, event: &mio::event::Event) -> anyhow::Result<()> {
        let mut try_write = false;
        if event.is_readable() {
            let data = read_from_stream(&mut self.stream)
                .context("failed to read data from tcp stream")?;
            // dump to logs just for our example
            tracing::debug!(
                "Read from connection: {} bytes: {}",
                data.len(),
                String::from_utf8_lossy(&data)
            );
            // Since we want to write data back, if we read something we will
            // try to write it to connection immediately.
            self.write_buffer.extend_from_slice(&data);
            try_write = !data.is_empty();
        }
        if event.is_writable() || try_write {
            let written = self.write_pending()?;
            tracing::debug!("Wrote {} bytes to connection", written);
        }

        Ok(())
    }

    fn write_pending(&mut self) -> anyhow::Result<usize> {
        let n_written = write_to_stream(&mut self.stream, &self.write_buffer)
            .context("failed to write data to tcp stream")?;
        self.write_buffer.drain(0..n_written);
        Ok(n_written)
    }
}

When reading and writing to the connection, we need to remember to
handle WouldBlock errors as "cannot do more, wait for next epoll
event":

use std::io::{Read, Write};

pub fn read_from_stream<T: Read>(stream: &mut T) -> anyhow::Result<Vec<u8>> {
    let mut all_bytes = Vec::new();
    let mut read_buffer: [u8; 1024] = [0; 1024];

    let mut interrupted = false;

    loop {
        match stream.read(&mut read_buffer) {
            Ok(read) => {
                if read == 0 {
                    return Err(anyhow::anyhow!("Connection closed by peer"));
                }
                all_bytes.extend_from_slice(&read_buffer[..read]);
            }
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                // No more data to read at the moment
                break;
            }
            // Retry once if we hit interrupted error
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted && !interrupted => {
                interrupted = true;
                continue;
            }
            Err(e) => {
                return Err(anyhow::anyhow!("Failed to read from TCP stream: {}", e));
            }
        }
    }

    Ok(all_bytes)
}

pub fn write_to_stream<T: Write>(stream: &mut T, buffer: &[u8]) -> anyhow::Result<usize> {
    let mut interrupted = false;
    let mut written = 0;

    loop {
        // Also covers an empty buffer, since written starts at 0
        if written >= buffer.len() {
            break;
        }

        match stream.write(&buffer[written..]) {
            Ok(n) => {
                if n == 0 {
                    return Err(anyhow::anyhow!("Connection closed by peer"));
                }
                written += n;
            }
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                // Cannot write more at the moment
                break;
            }
            // Retry once if we hit interrupted error
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted && !interrupted => {
                interrupted = true;
                continue;
            }
            Err(e) => {
                return Err(anyhow::anyhow!("Failed to write to TCP stream: {}", e));
            }
        }
    }

    Ok(written)
}
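
To see the "write what you can, keep the rest buffered" behaviour in isolation, here is a small std-only sketch. It is not the article's code: LimitedWriter is a hypothetical in-memory stand-in for a non-blocking socket, and write_until_blocked is a simplified variant of write_to_stream that uses io::Result instead of anyhow.

```rust
use std::io::{self, ErrorKind, Write};

/// Hypothetical stand-in for a non-blocking socket: accepts at most
/// `capacity` bytes in total, then reports WouldBlock.
struct LimitedWriter {
    accepted: Vec<u8>,
    capacity: usize,
}

impl Write for LimitedWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let room = self.capacity - self.accepted.len();
        if room == 0 {
            return Err(io::Error::from(ErrorKind::WouldBlock));
        }
        let n = room.min(buf.len());
        self.accepted.extend_from_slice(&buf[..n]);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Write until the buffer is exhausted or the stream would block, and
/// report how many bytes went out. Unlike the version above, this sketch
/// retries Interrupted errors unconditionally.
fn write_until_blocked<W: Write>(stream: &mut W, buffer: &[u8]) -> io::Result<usize> {
    let mut written = 0;
    while written < buffer.len() {
        match stream.write(&buffer[written..]) {
            Ok(0) => return Err(io::Error::from(ErrorKind::WriteZero)),
            Ok(n) => written += n,
            Err(e) if e.kind() == ErrorKind::WouldBlock => break,
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(written)
}

fn main() -> io::Result<()> {
    let mut socket = LimitedWriter { accepted: Vec::new(), capacity: 4 };
    let mut write_buffer = b"hello\n".to_vec();

    // First attempt: only 4 bytes fit, the rest stays buffered.
    let n = write_until_blocked(&mut socket, &write_buffer)?;
    write_buffer.drain(..n);
    println!("wrote {}, pending {}", n, write_buffer.len());

    // Pretend a WRITABLE event arrived and the socket has room again.
    socket.capacity += 4;
    let n = write_until_blocked(&mut socket, &write_buffer)?;
    write_buffer.drain(..n);
    println!("wrote {}, pending {}", n, write_buffer.len());
    Ok(())
}
```

The important part is that WouldBlock is not treated as a failure. It only ends the current attempt, and the unsent tail waits in the buffer for the next event.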

That gives us all the necessary ingredients to be async in a non-async
world, and completes the echo server. However, if you paid attention,
there is one more objective to take care of.

Time driven action

A lot of applications have to do more than just handle IO. They also
do some work periodically, be it sending a heartbeat or requesting
data from another system. Let's call this time-based work, as
opposed to request-based work that is triggered by IO events.

It is then time to add a killer feature to the mighty echo server.
Every 5 seconds it is going to send its "status" to all connected
clients. Yes, that's it.

In async Rust one could simply do some tokio::select magic with an
interval timer as one of the branches.

I still want the application to stay single threaded, so a separate
sleeping thread is not an option either. Fortunately, epoll (and Mio)
has exactly what we need. With epoll_wait we can specify a timeout for
how long we want to wait for events before the function returns and
we move on to the next iteration of the loop. Mio's Poll exposes the
same functionality in its poll method:

fn poll(&mut self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>

I already pass the timeout through the wait_for_events function. That
brings me to the last piece of this puzzle, which is the aforementioned
loop that sends the status periodically and waits for IO events
when idle:

pub fn run(&mut self, exit: Arc<AtomicBool>) -> anyhow::Result<()> {
    tracing::info!(
        "Echo server starting up, listening on {:?}",
        self.listener.local_addr()?
    );

    let mut status_timer = std::time::Instant::now();

    loop {
        tracing::debug!("Echo server main loop iteration starting");
        if exit.load(std::sync::atomic::Ordering::Relaxed) {
            tracing::info!("Echo server exiting as requested");
            return Ok(());
        }

        if status_timer.elapsed() >= self.status_interval {
            let num_connections = self.connections_by_token.len();
            tracing::info!(
                num_connections,
                "Broadcasting server status to all connections"
            );
            let message = format!("Server status: {} active connections\n", num_connections);
            self.broadcast_message(message.as_bytes())?;
            status_timer = std::time::Instant::now();
        }

        let time_remaining = self
            .status_interval
            .checked_sub(status_timer.elapsed())
            .unwrap_or_default();
        self.wait_for_events(time_remaining)?;
    }
}
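
The timeout arithmetic at the end of the loop is the part that makes the timer work, so it is worth looking at on its own. Below is a minimal, std-only sketch of the same calculation. The poll_timeout function is a hypothetical helper, not part of the echo server, and it takes the current time as a parameter purely to make the behaviour easy to check.

```rust
use std::time::{Duration, Instant};

/// How long the poll call may block before the next periodic action is due.
/// `now` is passed in explicitly so the logic can be tested without sleeping.
fn poll_timeout(last_fired: Instant, interval: Duration, now: Instant) -> Duration {
    let elapsed = now.saturating_duration_since(last_fired);
    // Saturates at zero: an overdue timer yields a zero timeout, so the
    // poll call returns immediately and the loop can fire the action.
    interval.saturating_sub(elapsed)
}

fn main() {
    let interval = Duration::from_secs(5);
    let start = Instant::now();

    // 2 seconds after the last broadcast there are 3 seconds left to wait.
    let left = poll_timeout(start, interval, start + Duration::from_secs(2));
    println!("{:?}", left);

    // 7 seconds after the last broadcast the timer is overdue: do not block.
    let left = poll_timeout(start, interval, start + Duration::from_secs(7));
    println!("{:?}", left);
}
```

With more than one periodic action, one option would be to compute this value per timer and pass the smallest one to the poll call.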

To set things straight, broadcasting the message is non-blocking as
well. All it does is extend the write buffer of every connection
and try to progress the write as far as it can until hitting
a WouldBlock error. Connections that hit a fatal error are simply
dropped:

fn broadcast_message(&mut self, message: &[u8]) -> anyhow::Result<()> {
    let mut connections_to_drop = Vec::new();

    for (token, connection) in self.connections_by_token.iter_mut() {
        connection.write_buffer.extend_from_slice(message);
        match connection.write_pending() {
            Ok(_) => {}
            Err(err) => {
                tracing::error!(
                    "Error writing broadcast message to connection: {:?}. Dropping connection. Error: {:?}",
                    token,
                    err
                );
                connections_to_drop.push(*token);
            }
        }
    }

    for token in connections_to_drop {
        self.connections_by_token.remove(&token);
        self.free_tokens.push_back(token);
    }

    Ok(())
}

Just extending the buffer would not be enough. If the connection is
already writable, we would not get any new event, since its readiness
state did not change. That is why the write is attempted right away.
This is the caveat I mentioned earlier.

There is not a lot more magic than in the C implementation, but for
the sake of completeness, let's run it.

You can find the full source code on GitHub.

RUST_LOG=echo_server_rust=debug cargo r
2026-03-23T05:55:00.083791Z  INFO echo_server_rust::echo_server: Echo server starting up, listening on 127.0.0.1:9000
2026-03-23T05:55:00.083820Z DEBUG echo_server_rust::echo_server: Echo server main loop iteration starting

And if we connect from two terminal windows, we can see that the echo
server is cruising.

nc 127.0.0.1 9000
test
test
Server status: 1 active connections
Server status: 2 active connections

On the server side, the logs show:

2026-03-23T05:59:00.484727Z DEBUG echo_server_rust::echo_server: Echo server main loop iteration starting
2026-03-23T05:59:00.484814Z  INFO echo_server_rust::echo_server: Broadcasting server status to all connections num_connections=2
2026-03-23T05:59:02.280579Z DEBUG echo_server_rust::echo_server: Read from connection: 5 bytes: test

2026-03-23T05:59:02.280732Z DEBUG echo_server_rust::echo_server: Wrote 5 bytes to connection

Summary - Good Bad and Ugly

This concludes the echo server, and thus the proof of concept.
As intended, I have achieved "async IO" without using async Rust.

Well, if we come back to the rant in the Asynchronous IO section,
maybe it is not so async. Perhaps this article should be named
"Non-blocking, event-driven IO without async", but hey, that is not
very catchy. Anyway, I am digressing...

It does, however, raise the question: was it worth it?
If I were to give you advice on writing echo servers, you are
probably better off just using async, and that is likely true
for most simple applications.

With this simple example it is hard to argue the case of async without
async, yet I am going to give it a shot.

Bad

Writing an echo server in async Rust could probably be done in a third
of the lines of code. For all IO operations, instead of catching
WouldBlock errors, we would simply await. Mio's Poll, although still
there, would be conveniently hidden inside the belly of the Tokio
runtime.

Ugly

Things would get a bit ugly if we were to go deep into some more
complex network protocol. Let's consider that we need to perform a
simple handshake. With Tokio we could simply:

send_handshake(connection).await;
let ack = wait_for_ack(connection).await;
confirm_connection(connection).await;

What happens under the hood is that the compiler converts this code
into a state machine, with each await marking a state transition
where the task can yield.

This convenience is gone when we drop async Rust. For it to
work with our framework, we would need to build this kind of state
machine ourselves (unless we chose to block the thread). A greatly
simplified example could be:

enum OutgoingConnectionState {
    Initiated,
    SentHandshake,
    AwaitingAck,
    Connected,
}
fn handle_io_event(...) -> ... {
...
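match connection.state {
    // Queue the handshake bytes and attempt the first write
    Initiated => {
        try_send_handshake(connection);
        return SentHandshake;
    }
    // Keep flushing until the whole handshake has left the write buffer
    SentHandshake => {
        try_progress_write(connection);
        if !has_pending_write(connection) {
            return AwaitingAck;
        } else {
            return SentHandshake;
        }
    }
    // The ack may arrive in pieces, so stay here until it is complete
    AwaitingAck => {
        if let Some(_ack) = try_read_handshake_ack(connection) {
            return Connected;
        }
        return AwaitingAck;
    }
    // Handshake finished, nothing left to do here
    Connected => return Connected,
}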
...
}

The logic for each "state" could be called multiple times, in case our
"hands" are so big that they do not fit into the send queues, or the
acks do not arrive fully at once. As a result, there can be much more
logic hidden inside those function calls.
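
For a feel of what that hidden logic might look like, here is a runnable, std-only sketch of the same idea. Everything in it is made up for illustration: the "HELLO\n" / "ACK\n" wire format, the OutgoingConnection type, and the on_io_event signature, which takes "how many bytes the socket would accept" and "what was just read" as plain arguments instead of talking to a real socket.

```rust
/// Hypothetical wire format for this sketch.
const HANDSHAKE: &[u8] = b"HELLO\n";
const ACK: &[u8] = b"ACK\n";

#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Initiated,
    SendingHandshake,
    AwaitingAck,
    Connected,
}

struct OutgoingConnection {
    state: State,
    write_buffer: Vec<u8>, // queued, not yet accepted by the socket
    read_buffer: Vec<u8>,  // received, not yet parsed
}

impl OutgoingConnection {
    fn new() -> Self {
        OutgoingConnection {
            state: State::Initiated,
            write_buffer: Vec::new(),
            read_buffer: Vec::new(),
        }
    }

    /// Called on every IO event. `writable` is how many bytes the socket
    /// would accept right now, `incoming` is what was just read from it.
    /// Returns the bytes handed to the socket during this call.
    fn on_io_event(&mut self, writable: usize, incoming: &[u8]) -> Vec<u8> {
        self.read_buffer.extend_from_slice(incoming);
        let mut sent = Vec::new();
        // Keep stepping until the state stops changing.
        loop {
            let next = match self.state {
                State::Initiated => {
                    self.write_buffer.extend_from_slice(HANDSHAKE);
                    State::SendingHandshake
                }
                State::SendingHandshake => {
                    let n = writable.min(self.write_buffer.len());
                    sent.extend(self.write_buffer.drain(..n));
                    if self.write_buffer.is_empty() {
                        State::AwaitingAck
                    } else {
                        State::SendingHandshake
                    }
                }
                State::AwaitingAck => {
                    // A real protocol would also reject malformed replies;
                    // this sketch just keeps waiting.
                    if self.read_buffer.starts_with(ACK) {
                        self.read_buffer.drain(..ACK.len());
                        State::Connected
                    } else {
                        State::AwaitingAck
                    }
                }
                State::Connected => State::Connected,
            };
            if next == self.state {
                break;
            }
            self.state = next;
        }
        sent
    }
}

fn main() {
    let mut conn = OutgoingConnection::new();
    // The socket accepts only 4 bytes, so the handshake goes out in two parts.
    conn.on_io_event(4, b"");
    println!("{:?}", conn.state);
    conn.on_io_event(16, b"");
    // The ack arrives split across two reads.
    conn.on_io_event(0, b"AC");
    conn.on_io_event(0, b"K\n");
    println!("{:?}", conn.state);
}
```

Each await in the async version corresponds to a place where this code has to remember where it was and return to the event loop.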

Without async, buffers and queues become your friends.

Good

Enough with the beating, though. I was supposed to argue for, not
against, my own creation!

One thing we do not see in this code is a Mutex. Since everything
is single threaded, there are no Tasks ready to jump around different
threads with the next await, no Send + Sync + 'static bounds, and no
Pins and Arcs. We do not annoy the borrow checker, nor ourselves.
There is no async runtime, so when things get hot, it should be
easier to troubleshoot and debug.

The whole system is much more deterministic, IO can be easier to
separate from the application logic (think some kind of event loop),
and perhaps a small thing, but there is no function coloring and
async spreading over the whole codebase.

Of course some of those things could be achieved with async as well.

Conclusion

To wrap up these lengthy conclusions, I am yet to experiment with this
approach in more sophisticated systems and see whether it has some
juice in it.

How that turns out, maybe you will be able to find out in the next
article. Until then, I hope you enjoyed this experiment and
perhaps even learned something. If you have any questions, comments, or
suggestions, feel free to reach out. Thanks for reading!
