Introduction
In this article, we'll build a simple event dispatcher in Rust. Our goal is to create a system that allows events to be handled asynchronously by multiple listeners, ensuring flexibility and thread safety. By the end, you'll have a reusable event dispatcher that can be integrated into larger Rust applications.
This article is for beginner and intermediate Rust developers who are familiar with async programming, traits, and basic concurrency concepts like Arc and Mutex.
Here's what we want in our event dispatcher:
- An extensible event model
- The event dispatcher should handle events asynchronously
- Event listeners should handle events asynchronously
- It should be easy to use
- Event handling should be idempotent
What we definitely don't want:
- Priority for event listeners
- Listeners blocking or canceling other listeners
Nice-to-have features (but out of scope):
- Error handling
- Logging/tracing
Motivation
Async: Most applications are asynchronous, and we want to create a tool that is compatible with them. This means our event listeners must also be async.
Idempotent: We want to be able to retry events if something fails. Idempotent handling ensures that retrying an event wonβt cause unintended side effects. For example, an event that updates a database should be designed to avoid duplicate updates. In this case, the author of the event listener should ensure that events are handled idempotently.
Extensible: Consumers of the event dispatcher should be able to add new events and listeners without modifying existing code.
No priority: Since events are idempotent, priority is unnecessary.
No blocking or cancellation: Blocking or canceling listeners can dramatically increase debugging complexity. For example, if you have 10 listeners and the 5th one cancels all others, it's hard to understand why the 6th listener didn't run.
Of course, it would be nice to add error handling to the event dispatcher and listener. However, that's beyond the scope of this article.
Dependencies
First, we need to use tokio because it is the most popular async runtime for Rust.
Since we decided to make our event system extensible, we will use traits. In this case, async-trait helps us to use async functions in traits.
Cargo.toml
[dependencies]
async-trait = "0.1.*"
tokio = { version = "1.*", features = ["full"] }
main.rs
use std::any::Any;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
Event
Let's start with the simplest part of our event system - Trait Event. All you need to do is implement this trait for your event.
pub trait Event: Sync + Any {
fn name(&self) -> &'static str;
fn as_any(&self) -> &dyn Any;
}
Sync: Ensures that the event can be safely shared across threads, which is essential for async operations.
Trait Any and fn as_any - enables dynamic type checking and downcasting, allowing listeners to handle specific event types.
fn name: basically, it can be used for logging and transmitting between applications.
Listener
Implementing an event listener is also straightforward. You just need to implement the handle method, which will be called when an event is dispatched.
The handle method takes a trait object &dyn Event as a parameter. To work with the event, you'll need to use as_any to cast it to its concrete type.
#[async_trait]
pub trait EventListener: Sync {
async fn handle(&self, event: &dyn Event);
}
Event Dispatcher
The dispatcher collects Vec of listeners. The register_listener function adds a new listener to the list.
Function dispatch is the simplest way to dispatch an event to all listeners. It just iterates over all listeners.
Arc and Mutex are used to ensure thread safety.
#[derive(Default)]
pub struct EventDispatcher {
listeners: Mutex<Vec<Arc<dyn EventListener>>>,
}
impl EventDispatcher {
pub fn new() -> Self {
EventDispatcher {
listeners: Mutex::new(Vec::new()),
}
}
pub fn register_listener(
&self,
listener: Arc<dyn EventListener>,
){
self.listeners.lock().unwrap().push(listener);
}
pub async fn dispatch(&self, event: impl Event) {
let event = Arc::new(event);
let listeners = self.listeners.lock().unwrap().clone();
for listener in listeners {
listener.handle(&*event).await;
}
}
}
Examples of events and listeners
Now all abstractions are defined. The next step is to create some events and listeners.
#[async_trait]
#[derive(Debug, Clone)]
pub struct TestEvent {
pub data: String,
}
impl Event for TestEvent {
fn name(&self) -> &'static str {
"TestEvent"
}
fn as_any(&self) -> &dyn Any {
self
}
}
pub struct TestListener {}
#[async_trait]
impl EventListener for TestListener {
async fn handle(&self, event: &dyn Event) {
if let Some(test_event) = event.as_any().downcast_ref::<TestEvent>() {
println!("Handling {} with data `{}`", test_event.name(), test_event.data);
}
}
}
The data field of TestEvent is an example of payload. In the real world, you can replace it with an ID, idempotency key, entity, and/or any other data type.
Take a look at the handle function implementation. It includes a check for the event type.
downcast_ref allows handling multiple events in one listener.
Main
Let's check how it works!
We need to:
- Create a new dispatcher
- Register a listener
- Dispatch an event
#[tokio::main]
async fn main() {
let dispatcher = EventDispatcher::new();
let listener = Arc::new(TestListener {});
dispatcher.register_listener(listener);
let event = TestEvent {data: "Hello".into()};
dispatcher.dispatch(event).await;
}
Conclusion
In this article, we built a simple async event dispatcher (and listener) in Rust.
Feel free to experiment with the code and adapt it to your needs! However, avoid using unwrap in production. Instead, implement proper error handling.
All source code is available on GitLab.
Top comments (0)