DEV Community

Vital Leshchyk
Vital Leshchyk

Posted on

Async Event Dispatcher in Rust

Introduction

In this article, we'll build a simple event dispatcher in Rust. Our goal is to create a system that allows events to be handled asynchronously by multiple listeners, ensuring flexibility and thread safety. By the end, you'll have a reusable event dispatcher that can be integrated into larger Rust applications.

This article is for beginner and intermediate Rust developers who are familiar with async programming, traits, and basic concurrency concepts like Arc and Mutex.

Here's what we want in our event dispatcher:

  • An extensible event model
  • The event dispatcher should handle events asynchronously
  • Event listeners should handle events asynchronously
  • It should be easy to use
  • Event handling should be idempotent

What we definitely don't want:

  • Priority for event listeners
  • Listeners blocking or canceling other listeners

Nice-to-have features (but out of scope):

  • Error handling
  • Logging/tracing

Motivation

Async: Most applications are asynchronous, and we want to create a tool that is compatible with them. This means our event listeners must also be async.

Idempotent: We want to be able to retry events if something fails. Idempotent handling ensures that retrying an event won’t cause unintended side effects. For example, an event that updates a database should be designed to avoid duplicate updates. In this case, the author of the event listener should ensure that events are handled idempotently.

Extensible: Consumers of the event dispatcher should be able to add new events and listeners without modifying existing code.

No priority: Since events are idempotent, priority is unnecessary.

No blocking or cancellation: Blocking or canceling listeners can dramatically increase debugging complexity. For example, if you have 10 listeners and the 5th one cancels all others, it's hard to understand why the 6th listener didn't run.

Of course, it would be nice to add error handling to the event dispatcher and listener. However, that's beyond the scope of this article.

Dependencies

First, we need to use tokio because it is the most popular async runtime for Rust.
Since we decided to make our event system extensible, we will use traits. In this case, async-trait helps us to use async functions in traits.

Cargo.toml

[dependencies]
async-trait = "0.1.*"
tokio = { version = "1.*", features = ["full"] }
Enter fullscreen mode Exit fullscreen mode

main.rs

use std::any::Any;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
Enter fullscreen mode Exit fullscreen mode

Event

Let's start with the simplest part of our event system - Trait Event. All you need to do is implement this trait for your event.

pub trait Event: Sync + Any {
    fn name(&self) -> &'static str;
    fn as_any(&self) -> &dyn Any;
}
Enter fullscreen mode Exit fullscreen mode

Sync: Ensures that the event can be safely shared across threads, which is essential for async operations.

Trait Any and fn as_any - enables dynamic type checking and downcasting, allowing listeners to handle specific event types.

fn name: basically, it can be used for logging and transmitting between applications.

Listener

Implementing an event listener is also straightforward. You just need to implement the handle method, which will be called when an event is dispatched.
The handle method takes a trait object &dyn Event as a parameter. To work with the event, you'll need to use as_any to cast it to its concrete type.

#[async_trait]
pub trait EventListener: Sync {
    async fn handle(&self, event: &dyn Event);
}
Enter fullscreen mode Exit fullscreen mode

Event Dispatcher

The dispatcher collects Vec of listeners. The register_listener function adds a new listener to the list.
Function dispatch is the simplest way to dispatch an event to all listeners. It just iterates over all listeners.

Arc and Mutex are used to ensure thread safety.

#[derive(Default)]
pub struct EventDispatcher {
    listeners: Mutex<Vec<Arc<dyn EventListener>>>,
}

impl EventDispatcher {
    pub fn new() -> Self {
        EventDispatcher {
            listeners: Mutex::new(Vec::new()),
        }
    }

    pub fn register_listener(
        &self,
        listener: Arc<dyn EventListener>,
    ){
        self.listeners.lock().unwrap().push(listener);
    }

    pub async fn dispatch(&self, event: impl Event) {
        let event = Arc::new(event);

        let listeners = self.listeners.lock().unwrap().clone();
        for listener in listeners {
            listener.handle(&*event).await;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Examples of events and listeners

Now all abstractions are defined. The next step is to create some events and listeners.

#[async_trait]
#[derive(Debug, Clone)]
pub struct TestEvent {
    pub data: String,
}

impl Event for TestEvent {
    fn name(&self) -> &'static str {
        "TestEvent"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

pub struct TestListener {}

#[async_trait]
impl EventListener for TestListener {
    async fn handle(&self, event: &dyn Event) {
        if let Some(test_event) = event.as_any().downcast_ref::<TestEvent>() {
            println!("Handling {} with data `{}`", test_event.name(), test_event.data);
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

The data field of TestEvent is an example of payload. In the real world, you can replace it with an ID, idempotency key, entity, and/or any other data type.

Take a look at the handle function implementation. It includes a check for the event type.
downcast_ref allows handling multiple events in one listener.

Main

Let's check how it works!

We need to:

  • Create a new dispatcher
  • Register a listener
  • Dispatch an event
#[tokio::main]
async fn main() {
    let dispatcher = EventDispatcher::new();
    let listener = Arc::new(TestListener {});
    dispatcher.register_listener(listener);

    let event = TestEvent {data: "Hello".into()};
    dispatcher.dispatch(event).await;
}
Enter fullscreen mode Exit fullscreen mode

Conclusion

In this article, we built a simple async event dispatcher (and listener) in Rust.
Feel free to experiment with the code and adapt it to your needs! However, avoid using unwrap in production. Instead, implement proper error handling.

All source code is available on GitLab.

Top comments (0)