
Vinicius Lambardozzi


Rewriting Comlink in Rust (kinda)

In this post I share my thought process for creating a Rust wrapper around postMessage and making it friendlier with a procedural macro. I then use it to build a tally counter that runs on a web worker. The resulting proof of concept is available on my GitHub.

Much of what I describe here was done for fun and learning, but there is a practical takeaway for WebAssembly + Svelte: observables serve as Svelte stores. That means if your WebAssembly code can create an observable and return it to JavaScript, {$observable} will be reactive even for changes on the WebAssembly side. Skip to "Finishing touches" for more details on that. I found out about observables being usable as stores after reading an article about RxJS with Svelte by Tim Deschryver.

The spark

WebAssembly already looked very promising as a way to satisfy my cravings for lower-level 3D graphics, but there are many new and exciting things happening: wasm-bindgen means I can write Rust, which I'm a big fan of; wgpu lets me move on from OpenGL and go back to studying modern graphics APIs; wasm-bindgen-rayon gives me threads; and finally OffscreenCanvas allows me to do all the rendering inside a web worker, making all of this look pretty similar to how you'd write a native renderer.

With SvelteKit as an application framework, we gain access to mdsvex: a way to write interactive markdown.

The destination

A diagram with a dotted vertical line down the middle. On the top left is a monitor icon labeled

The original idea was to have a web worker drawing to a canvas and a DOM-based UI that controls it. In the image above, everything to the left of the vertical line exists within the main thread, while everything to the right is on web workers.

It all runs on the browser, but to make it easier to reason about, I call the primary web worker a "render server" while the code running on the main thread is the "client". Each gear represents a web worker spawned by wasm-bindgen-rayon, exposed to Rust as threads. The envelope represents data crossing the postMessage boundary.

The journey

Comlink simplifies communication with web workers by providing an RPC implementation built on top of postMessage. I decided to write the client and server in Rust without using Comlink. A way to communicate with the render server was the first problem to solve, and for that I needed a Rust-friendly way to send messages between the main thread and a web worker.

Build setup

The client and server code are in separate crates, and each gets built into its own WebAssembly package. This build setup increases the final bundle size, doubling the overhead of both wasm-bindgen and the communication code. Building a single package and being careful to only use client code on the main thread and server code on a worker addresses that. However, wasm-bindgen-rayon generates code that should only run on workers, so to avoid having to work around that I opted to keep them separate on this demo.

A small takeaway from this setup is that it is possible to organize code with a cargo workspace while using wasm-pack: the build command can take a path. While this means wasm-pack builds the individual crates and doesn't see the workspace, the language server does, so it's still helpful. Just remember that the members will not inherit keys set on the workspace.

Something that can communicate

Many things can send messages and many things implement EventTarget. Our use case is exchanging messages between the main thread and a web worker. That involves working with two interfaces: Worker will be used on the client to send messages and listen to incoming responses from the server, and DedicatedWorkerGlobalScope will be used on the server to send messages and listen to incoming responses from the client.

Messages are a common part of these two interfaces: they both have a postMessage, and both implement EventTarget (specifically, they can receive "message" events). The first step was to define a trait for things that can send and receive messages. It describes the ability to send messages, transfer objects, and add or remove event listeners:

pub trait RawPort {
    fn send_raw(&self, message: JsValue);
    fn transfer_raw(&self, message: JsValue, transfer: JsValue);
    fn add_raw_listener(&self, listener: &js_sys::Function);
    fn remove_raw_listener(&self, listener: &js_sys::Function);
    fn start(&self) {}
}

The start method is needed when working with message ports.

raw indicates that these methods work with JsValues (the Rust representation of an object owned by JavaScript). We'll be using the implementations of RawPort for Worker and DedicatedWorkerGlobalScope.

Converting messages to JsValues and back shouldn't be done manually, but with RawPort it is possible to create an entity that sends Rust types:

struct Port(Box<dyn RawPort>);
impl Port {
    pub fn send<M>(&self, message: M) {
        todo!();
    }

    pub fn add_listener(&self, listener: Closure<dyn Fn(MessageEvent)>) -> Listener {
        todo!();
    }
}

The Listener handle is helpful for any situation where you want to pass a Rust closure to JavaScript. Since JavaScript has no concept of ownership, the listener closure would be dropped when add_listener returns, and the event handler would access an invalid reference. The example in the wasm-bindgen book "solves" this by calling closure.forget(). That leaks memory, so it's worth taking a brief detour to talk about it.

Passing Rust closures to JavaScript

Since RawPort calls a listener closure whenever it receives an event, active listener closures must be valid for as long as the event listener is attached. We can achieve that by storing the closure on a container whose lifetime doesn't exceed the lifetime of the port.

pub struct Listener<'a> {
    owner: &'a dyn RawPort,
    inner: Closure<dyn Fn(MessageEvent)>,
}

Here we need a reference to the owner to clear the event listener. If we didn't, _marker: PhantomData<&'a ()> would be enough to tie the lifetime.

Whoever holds this handle should be able to clear the associated event listener (in case they are no longer interested in that event), but more importantly, the event listener must be cleared if the closure is dropped:

impl<'a> Listener<'a> {
    pub fn clear(self) {
        self.remove_listener();
    }

    fn remove_listener(&self) {
        self
            .owner
            .remove_raw_listener(self.inner.as_ref().unchecked_ref());
    }
}

impl<'a> Drop for Listener<'a> {
    fn drop(&mut self) {
        self.remove_listener();
    }
}

The public method clear consumes self because this handle should not be reused after being cleared.
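The drop-based cleanup can be tried outside the browser. What follows is only a sketch under stated assumptions: Registry is a made-up stand-in for an event target (it is not RawPort), callbacks receive a &str instead of a MessageEvent, and clear relies on Drop alone rather than calling a removal method itself.

```rust
use std::cell::{Cell, RefCell};

type Callback = Box<dyn Fn(&str)>;

/// Hypothetical stand-in for an event target: stores callbacks under ids.
#[derive(Default)]
pub struct Registry {
    next_id: Cell<u32>,
    listeners: RefCell<Vec<(u32, Callback)>>,
}

impl Registry {
    /// Registers a callback and returns a guard that removes it when dropped.
    pub fn add_listener(&self, callback: impl Fn(&str) + 'static) -> Listener<'_> {
        let id = self.next_id.get() + 1;
        self.next_id.set(id);
        let boxed: Callback = Box::new(callback);
        self.listeners.borrow_mut().push((id, boxed));
        Listener { owner: self, id }
    }

    /// Calls every callback that is still registered.
    /// Callbacks must not add or remove listeners while this runs.
    pub fn dispatch(&self, message: &str) {
        for (_, callback) in self.listeners.borrow().iter() {
            callback(message);
        }
    }

    pub fn listener_count(&self) -> usize {
        self.listeners.borrow().len()
    }
}

/// Guard tied to the registry's lifetime, in the spirit of `Listener<'a>`.
pub struct Listener<'a> {
    owner: &'a Registry,
    id: u32,
}

impl<'a> Listener<'a> {
    /// Consumes the guard; the removal itself happens in `Drop`.
    pub fn clear(self) {}
}

impl<'a> Drop for Listener<'a> {
    fn drop(&mut self) {
        self.owner
            .listeners
            .borrow_mut()
            .retain(|(id, _)| *id != self.id);
    }
}
```

Because the guard borrows the registry, the borrow checker rejects any attempt to keep a guard around after the registry is gone, which is the property the lifetime parameter is there to provide.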

Now add_listener can return a Listener handle, and the event handler will never call a closure that has been dropped.

impl Port {
    pub fn add_listener(&self, listener: Closure<dyn Fn(MessageEvent)>) -> Listener {
        self.0.add_raw_listener(listener.as_ref().unchecked_ref());

        Listener {
            owner: self.0.as_ref(),
            inner: listener,
        }
    }
}

What are we sending?

Implementing the send method is harder. On the first code snippet, the message type was M. That is convenient because it allows sending and receiving whatever type works best for the situation. Imagine something like this:

Request (client) Response (server)
Set { key: "seed", value: "iLikeRocks"} Ok(())
Set { key: "skyColor", value: "blue"} Err("skyColor expects a hex string")
Query { key: "memoryUsage" } Ok(128)

I left out something important though. Behind the scenes, send uses postMessage and that matters:

  • messages must be capable of crossing the Rust/JavaScript boundary;
  • the data must keep its meaning after being serialized with the structured clone algorithm;
  • some messages might require transferring objects to the destination side (for example, when sending an OffscreenCanvas).

Since post_message sends JsValues and those can cross the Rust/JavaScript boundary, it must be possible to convert messages to JsValue. An optional list of objects to transfer can be described as an Option<JsValue>. With those two requirements in mind, we can define a message as a type that can be converted to a tuple (JsValue, Option<JsValue>): the content of the message and an optional list of objects to transfer.

There are three content types to consider here, and I have named them as follows:

  • "raw": wasm-bindgen provides Into<JsValue>, From<JsValue> implementations for these types;
  • "serde": arbitrary types that must be serialized, we let serde handle those;
  • "shareable": nested messages composed by content types that fall under one of these three categories.

Finally, the receiving side must be able to convert a JsValue back into the original type.

We can write that as a trait. Read the following code as: "Shareable is something that can be sent using postMessage while possibly transferring some objects, and then read back as a Rust type on the receiving side".

pub trait Shareable:
    TryInto<(JsValue, Option<JsValue>), Error = ShareableError>
    + TryFrom<JsValue, Error = ShareableError>
{
}

Now, we can finally reason about what the implementation of our send method might look like. First we introduce a trait bound on M, requiring it to be Shareable. That allows converting messages into a (data, transfer) tuple. If there is nothing to transfer, send the data using send_raw. Otherwise, send the data and transfer some objects with transfer_raw.

pub fn send<M>(&self, message: M)
where
    M: Shareable,
{
    let (data, transfer) = message.try_into().unwrap();
    // Port is a tuple struct, so the wrapped RawPort lives in self.0.
    match transfer {
        Some(transfer) => self.0.transfer_raw(data, transfer),
        None => self.0.send_raw(data),
    }
}

There should be error handling here (for example, if serialization fails).
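One way to add that error handling is to return a Result from send and let the caller decide what to do. The sketch below runs without wasm-bindgen, so several names are assumptions made for illustration: Value stands in for JsValue, RecordingPort is a made-up port that logs what it was asked to send, and Text is a made-up message whose conversion fails for empty strings.

```rust
use std::cell::RefCell;
use std::convert::{TryFrom, TryInto};
use std::rc::Rc;

/// Stand-in for `JsValue` (assumption, so the sketch runs natively).
#[derive(Debug, Clone, PartialEq)]
pub struct Value(pub String);

#[derive(Debug, PartialEq)]
pub struct ShareableError(pub String);

/// Same idea as `RawPort`, reduced to the two sending methods.
pub trait RawPort {
    fn send_raw(&self, message: Value);
    fn transfer_raw(&self, message: Value, transfer: Value);
}

pub struct Port(pub Box<dyn RawPort>);

impl Port {
    /// Propagates the conversion error instead of calling `unwrap`.
    pub fn send<M>(&self, message: M) -> Result<(), ShareableError>
    where
        M: TryInto<(Value, Option<Value>), Error = ShareableError>,
    {
        let (data, transfer): (Value, Option<Value>) = message.try_into()?;
        match transfer {
            Some(transfer) => self.0.transfer_raw(data, transfer),
            None => self.0.send_raw(data),
        }
        Ok(())
    }
}

/// Hypothetical port that records every call in a shared log.
pub struct RecordingPort(pub Rc<RefCell<Vec<String>>>);

impl RawPort for RecordingPort {
    fn send_raw(&self, message: Value) {
        self.0.borrow_mut().push(format!("send {}", message.0));
    }
    fn transfer_raw(&self, message: Value, transfer: Value) {
        self.0
            .borrow_mut()
            .push(format!("transfer {} + {}", message.0, transfer.0));
    }
}

/// Hypothetical message that refuses to serialize an empty string.
pub struct Text(pub String);

impl TryFrom<Text> for (Value, Option<Value>) {
    type Error = ShareableError;

    fn try_from(text: Text) -> Result<Self, Self::Error> {
        if text.0.is_empty() {
            Err(ShareableError("empty message".to_string()))
        } else {
            Ok((Value(text.0), None))
        }
    }
}
```

With this shape a failed conversion never reaches the port: nothing is posted, and the caller gets the ShareableError back.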

That is enough. Any type M that implements Shareable can be sent between the main thread and a web worker using a Port. It is transparent too: Rust on both sides, no JsValues involved.

pub struct M {
    // --snip--
}

impl TryInto<(JsValue, Option<JsValue>)> for M {
    // --snip--
}
impl TryFrom<JsValue> for M {
    // --snip--
}
impl Shareable for M {}

// client.rs
let m = M::new();
port.send(m);

// server.rs
// Keep the returned handle alive: dropping it removes the listener.
let _listener = port.add_listener(Closure::new(|event: MessageEvent| {
    let m: M = event.data().try_into().unwrap();
    trace!(m);
}));

Going back in the git history, you can see that this is what I used for my first tests. Notice the TODO: // TODO #[derive(Shareable)].

#[derive(Shareable)]

This only looks convenient because I am hiding the implementations for TryInto<(JsValue, Option<JsValue>)> and TryFrom<JsValue>.

TryInto must be called before sending and does the following:

  1. make a JsArray and push a string that identifies the type;
  2. if the type is an enum, push a string that identifies the variant;
  3. if there are any fields, convert their values to JsValue and push them to the array;
  4. if there are things to transfer, make another JsArray and push the transferable objects.

TryFrom is called after receiving and does the following:

  1. read the type identifier from the array;
  2. if the type is an enum, read the variant identifier;
  3. if there are any fields, read the JsValues and convert them back to Rust types;
  4. reconstruct the original message.

An array representing ClientMessage::Set("key", "value") looks like this: ["ClientMessage", "Set", "key", "value"]
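The two lists of steps can be sketched by hand for a single type. This is an illustration only: it uses a Vec<String> where the real code builds a JavaScript array, it skips the transfer list, and the two-variant ClientMessage below is a made-up example rather than the type used later in the post.

```rust
/// Hypothetical message used only for this illustration.
#[derive(Debug, Clone, PartialEq)]
pub enum ClientMessage {
    Query,
    Set(String, String),
}

/// Builds the flat array: type identifier, variant identifier, then fields.
pub fn encode(message: &ClientMessage) -> Vec<String> {
    let mut array = vec!["ClientMessage".to_string()];
    match message {
        ClientMessage::Query => array.push("Query".to_string()),
        ClientMessage::Set(key, value) => {
            array.push("Set".to_string());
            array.push(key.clone());
            array.push(value.clone());
        }
    }
    array
}

/// Reads the identifiers back and reconstructs the original message.
pub fn decode(array: &[String]) -> Result<ClientMessage, String> {
    let parts: Vec<&str> = array.iter().map(String::as_str).collect();
    match parts.as_slice() {
        ["ClientMessage", "Query"] => Ok(ClientMessage::Query),
        ["ClientMessage", "Set", key, value] => {
            Ok(ClientMessage::Set(key.to_string(), value.to_string()))
        }
        ["ClientMessage", ..] => Err("unknown variant or wrong field count".to_string()),
        _ => Err("unexpected type identifier".to_string()),
    }
}
```

Even for two variants, the encoder and decoder have to be kept in sync by hand, and every new variant adds an arm to both.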

These steps are straightforward, but implementing them for every transferable type is repetitive, error-prone, and inconvenient. Thankfully, that is a perfect use case for derive macros.

Adding the derive attribute to a struct, enum, or union allows you to read the token stream that defines that item and create a new token stream that the compiler then appends to the source. cargo-expand lets you see this in action.

So long, and thanks for all the macros

After adding the macro, we are actually done and can derive the implementation for Shareable.

All you need to do to turn a Rust type into a valid message type is annotate it with #[derive(Shareable)] and possibly set some attributes on each item. It is possible to set the representation to be raw, serde, or shareable, which controls how to transform the value into a JsValue, and it is possible to mark items to transfer. Here is a practical example:

#[derive(Debug, Shareable)]
struct FontOptions {
    #[shareable(repr = "serde")]
    family: String,
    #[shareable(repr = "serde")]
    size: u8,
}

#[derive(Debug, Shareable)]
enum Message {
    Ping,
    RenderDebugString(
        #[shareable(repr = "serde")] String,
        #[shareable(repr = "shareable")] FontOptions
    ),
    CreateWindow {
        #[shareable(repr = "serde")]
        title: String,
        #[shareable(repr = "raw", transfer)]
        surface: OffscreenCanvas,
    },
}

Check the macro tests to see incremental examples of the macro in action.

It derives Shareable for both enums and structs, handles arbitrary data with serde, works with nested shareable types, and possibly transfers values when posting messages. The macro generates all the required trait implementations, and the Port provides all the glue code to wrap types capable of sending and receiving messages.

Rust macros are too broad of a topic to cover in detail here, so from the entire macro derivation I chose a single highlight for this section: error handling.

The procedural macro workshop was the best resource I could find on procedural macros. Jon Gjengset has walkthroughs for some of the workshop projects.

Error handling

A screenshot of a Tweet by mcc @mcclure111. The Tweet was posted at 4:30PM - 1 Jun 2018 and has 292 retweets, 962 likes and 20 replies. It reads: In C++ we don't say >::push_back(const block &)': cannot convert argument 1 from 'std::_Vector_iterator>>' to 'block &&'" and I think that's beautiful.

Compilation errors caused by generated code are notoriously annoying to debug and often result in unhelpful error messages. Let's make a typo while defining a repr:

#[derive(Debug, Shareable)]
pub struct WindowProxy {
    #[shareable(repr = "rraw", transfer)]
    handle: OffscreenCanvas
}

The macro derivation being done in Rust already does a lot to address this issue. My code for parsing the repr attribute looks similar to this:

match lit.to_string().as_ref() {
    "\"raw\"" => Repr::Raw,
    "\"serde\"" => Repr::Serde,
    "\"shareable\"" => Repr::Shareable,
    _ => todo!("invalid repr"),
}

Since the match must be exhaustive, every typo will get caught by the catch-all arm, which is where we should handle this error. When dealing with conditional compilation or generated code, the compile_error! macro can signal the compiler to fail. Just as an irrecoverable error during runtime should invoke panic!, an irrecoverable error during compilation time should invoke compile_error!.

Replacing the todo! with code that adds a compile_error! invocation to the token stream is a good start: no wall of text and a helpful message. Still, the error only tells us something went wrong during the macro derivation.

error: invalid repr
  --> lib.rs:28:17
   |
28 | #[derive(Debug, Shareable)]
   |                 ^^^^^^^^^
   |
   = note: this error originates in the derive macro `Shareable` (in Nightly builds, run with -Z macro-backtrace for more info)

The syn crate gives us a better way to represent errors using syn::Error: a type that can hold a Span along with a message. For our purposes, a Span is something that holds a start and an end index into a string, identifying a range of characters contained in the source code. For example, on the following snippet the Span could be used to determine where the markers (^) should start and where they should end.

28 | #[derive(Debug, Shareable)]
   |                 ^^^^^^^^^
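To make the "start and end index" picture concrete, here is a small sketch that computes the marker line for a token. It follows the simplified model used in this section and is not how proc_macro or syn represent spans internally: the Span struct, locate, and underline are all names invented for this illustration.

```rust
/// Simplified span (assumption): byte offsets into a single source line.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Span {
    pub start: usize,
    pub end: usize,
}

/// Finds the first occurrence of `token` in `line` and returns its span.
pub fn locate(line: &str, token: &str) -> Option<Span> {
    line.find(token).map(|start| Span {
        start,
        end: start + token.len(),
    })
}

/// Renders the marker line that sits under the source line:
/// spaces up to `start`, then one caret per byte of the span.
pub fn underline(span: Span) -> String {
    format!(
        "{}{}",
        " ".repeat(span.start),
        "^".repeat(span.end - span.start)
    )
}
```

For the derive line above, locate(line, "Shareable") yields a span covering the nine characters of the macro name, and underline turns it into the row of carets the compiler prints.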

When parsing a literal into a repr, we have access to the span of that literal, and that allows us to create an error that along with the message also stores where in the source code our typo happened. Here is that snippet again, now much closer to the actual implementation:

match lit.to_string().as_ref() {
    "\"raw\"" => Ok(Repr::Raw),
    "\"serde\"" => Ok(Repr::Serde),
    "\"shareable\"" => Ok(Repr::Shareable),
     _ => Err(syn::Error::new(lit.span(), "invalid repr, expected literal: \"raw\", \"serde\", or \"shareable\"")),
}

Now, the typo will cause a compiler error specifying what went wrong, which token caused it, and how to fix it:

error: invalid repr, expected literal: "raw", "serde", or "shareable"
  --> src\lib.rs:24:24
   |
24 |     #[shareable(repr = "rraw", transfer)]
   |                        ^^^^^^

error: could not compile `example` (lib) due to previous error

Handling macro derivation errors with Result

To render a syn::Error into a token stream that invokes compile_error!, we use syn::Error::into_compile_error. Here is how that happens; note how it gets the start and end from the Span and uses those when constructing the token stream.

Little fun fact: syn uses Delimiter::Brace when constructing the group, so this generates ::core::compile_error!{$message}. Delimiters don't matter for macro_rules! macros: for example, vec!(1, 2, 3), vec! [1, 2, 3], and vec!{1, 2, 3} are all valid.

You can delegate your macro derivation to a function that returns a Result, and then consume it with unwrap_or_else to render any syn::Error that might happen during derivation. Here's a copy-pastable snippet:

#[proc_macro_derive(Shareable, attributes(shareable))]
pub fn derive_shareable(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
    let ast = parse_macro_input!(input as DeriveInput);

    expand_derive_shareable(&ast)
        .unwrap_or_else(syn::Error::into_compile_error)
        .into()
}

fn expand_derive_shareable(ast: &syn::DeriveInput) -> syn::Result<proc_macro2::TokenStream> {
    //--snip--
}

Notice the proc_macro2::TokenStream as opposed to proc_macro::TokenStream. Types from proc_macro can only exist inside procedural macros, which is why syn uses proc_macro2 instead. Keep an eye out for this if you ever come across errors saying you "can't use TokenStream and must use TokenStream instead".
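The Result-based flow can also be followed without a proc-macro crate. The sketch below is a plain-Rust analogy, not syn: MacroError stands in for syn::Error (without a span), strings stand in for token streams, and parse_repr, expand, and derive are names chosen for this illustration.

```rust
/// Stand-in for `syn::Error` (assumption): only carries a message.
#[derive(Debug, PartialEq)]
pub struct MacroError {
    pub message: String,
}

impl MacroError {
    /// Same idea as `syn::Error::into_compile_error`: produce source text
    /// that makes the compiler fail with our message.
    pub fn into_compile_error(self) -> String {
        format!("::core::compile_error!{{ {:?} }}", self.message)
    }
}

#[derive(Debug, PartialEq)]
pub enum Repr {
    Raw,
    Serde,
    Shareable,
}

/// Parses the value of the `repr` attribute.
pub fn parse_repr(literal: &str) -> Result<Repr, MacroError> {
    match literal {
        "raw" => Ok(Repr::Raw),
        "serde" => Ok(Repr::Serde),
        "shareable" => Ok(Repr::Shareable),
        other => Err(MacroError {
            message: format!(
                "invalid repr `{}`, expected `raw`, `serde`, or `shareable`",
                other
            ),
        }),
    }
}

/// Plays the role of `expand_derive_shareable`: fallible, free to use `?`.
pub fn expand(literal: &str) -> Result<String, MacroError> {
    let repr = parse_repr(literal)?;
    Ok(format!("/* generated code for {:?} */", repr))
}

/// Plays the role of the macro entry point: always returns "tokens",
/// either the generated code or a `compile_error!` invocation.
pub fn derive(literal: &str) -> String {
    expand(literal).unwrap_or_else(MacroError::into_compile_error)
}
```

The benefit of this split is that every helper called from expand can fail with `?`, while the entry point has exactly one place where errors become output.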

Making requests

With Comlink, it's possible to call a function from the web worker and await the return value on the main thread. Wrapping message passing instead of implementing an RPC server was a choice, but we can still use what we've developed so far to await responses.

Let's define what we want to achieve: the client should have an async function capable of taking a Shareable message, sending it to the server, awaiting a Shareable response, and returning it.

// types.rs
#[derive(Debug, Shareable)]
enum ClientRequest {
    Ping
}

#[derive(Debug, Shareable)]
enum ServerResponse {
    Pong
}

// client.rs
async fn request(&self, message: ClientRequest) -> ServerResponse {
    todo!();
}

async fn run(&self) {
    let message = ClientRequest::Ping;
    trace!("client->server: {:?}", message);
    trace!("client<-server: {:?}", self.request(message).await);
}

How should we implement request? Before sending a message, a listener must be ready to receive a response and to set that up we can call add_listener on the sender port. Upon receiving the response it's easy to get back the ServerResponse using try_into (whose implementation we didn't have to write because ServerResponse is Shareable). With a listener set up, the client then sends a message to the server, waits for a response, and returns it (without forgetting to clear the listener).

Before looking at the Rust code, it's useful to think about what the implementation would look like using pseudo-code and promises. Using JavaScript to write that pseudo-code, it looks like this:

async function request(clientMessage) {
    return new Promise((resolve, _reject) => {
        function onResponse(event) {
            resolve(event.data);
        }

        this.port.addEventListener('message', onResponse, { once: true });
        this.port.postMessage(clientMessage);
    });
}

Small tip: when prototyping, it's a good idea to be aware of optional parameters being ignored. The Rust compiler has that covered, but with JavaScript the no-unused-vars lint rule makes that difficult, even if you are willing to do it manually. For example, new Promise((resolve, _reject) => ...) causes a lint error if _reject is not used. However, the lint rule accepts an ignore pattern and can be made to accept this as long as the unused variable name matches that pattern (^_).

js_sys has bindings for Promise and wasm-bindgen-futures provides a bridge between JavaScript promises and Rust futures, meaning this can be translated to Rust. Dealing with the reject and resolve closures would be extra mental overhead, and I had already done some tests that involved installing tokio, so for the sake of this proof of concept using channels was faster.

async fn request(&self, message: ClientRequest) -> ServerResponse {
    // A bounded tokio channel carries the response out of the closure.
    let (tx, mut rx) = channel::<ServerResponse>(1);

    let listener = self
        .port
        .add_listener(Closure::new(move |event: MessageEvent| {
            let payload: ServerResponse = event.data().try_into().unwrap();

            tx.try_send(payload)
                .expect("Request channel should not be closed.");
        }));

    self.port.send(message);

    let response = rx.recv().await.expect("Should post a valid response.");
    listener.clear();

    response
}

It almost works, but if the client makes multiple concurrent requests it will have no way of associating them with one of the incoming responses. That issue can be solved by including a random request id that the server copies over to the response.
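A minimal sketch of that correlation step follows, with several assumptions stated up front: it uses std channels instead of tokio so it runs anywhere, it hands out sequential ids instead of random ones (either works as long as ids of in-flight requests are unique), and Envelope and Pending are names invented for this example.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical envelope: a payload plus the id of the request it answers.
#[derive(Debug, PartialEq)]
pub struct Envelope {
    pub id: u64,
    pub payload: String,
}

/// Tracks the requests that are still waiting for a response.
#[derive(Default)]
pub struct Pending {
    next_id: u64,
    waiting: HashMap<u64, Sender<String>>,
}

impl Pending {
    /// Registers a request. Returns the id to send along with the message
    /// and the receiving end on which the response will arrive.
    pub fn register(&mut self) -> (u64, Receiver<String>) {
        let id = self.next_id;
        self.next_id += 1;
        let (tx, rx) = channel();
        self.waiting.insert(id, tx);
        (id, rx)
    }

    /// Routes a response to the request with the same id.
    /// Returns false when no request is waiting for that id.
    pub fn resolve(&mut self, response: Envelope) -> bool {
        match self.waiting.remove(&response.id) {
            Some(tx) => tx.send(response.payload).is_ok(),
            None => false,
        }
    }
}
```

With a table like this, a single long-lived message listener can call resolve for every incoming response, and responses may arrive in any order.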

The tally counter

Comlink uses a counter as an example, and we are now ready to replicate that. The client should be able to:

  • query the current value;
  • increment or decrement the value;
  • set the value.

An enum can represent those operations:

// types.rs
#[derive(Debug, Shareable)]
pub enum ClientMessage {
    Query,
    Inc,
    Dec,
    Set(#[shareable(repr = "serde")] u8),
}

The client still needs some glue code that has to be written manually (this impl block could also be generated automatically using derive macros).

// client.rs
impl Client {
    pub async fn query(&self) -> u8 {
        self.request(ClientMessage::Query).await
    }
    pub async fn inc(&self) -> u8 {
        self.request(ClientMessage::Inc).await
    }
    pub async fn dec(&self) -> u8 {
        self.request(ClientMessage::Dec).await
    }
    pub async fn set(&self, value: u8) -> u8 {
        self.request(ClientMessage::Set(value)).await
    }
}

The server then must store a value and either return or mutate it depending on the request. I am leaving out the details on how the server receives, processes, and responds to messages (where I also used tokio channels) so we can focus on responding to a request:

let response = match request {
    ClientMessage::Query => {
        ServerMessage::Query(self.counter)
    }
    ClientMessage::Inc => {
        self.counter += 1;
        ServerMessage::Query(self.counter)
    }
    ClientMessage::Dec => {
        self.counter -= 1;
        ServerMessage::Query(self.counter)
    }
    ClientMessage::Set(value) => {
        self.counter = value;
        ServerMessage::Query(self.counter)
    }
};
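One detail worth keeping in mind with a u8 counter: decrementing at 0 or incrementing at 255 overflows, which panics in debug builds and wraps around in release builds by default. The sketch below is a standalone variant of the handler that clamps instead. Saturating is my assumption about the desired behavior, and the Server struct and simplified enums are stand-ins for this example.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ClientMessage {
    Query,
    Inc,
    Dec,
    Set(u8),
}

#[derive(Debug, PartialEq)]
pub enum ServerMessage {
    Query(u8),
}

/// Hypothetical server state for this sketch.
#[derive(Default)]
pub struct Server {
    counter: u8,
}

impl Server {
    /// Applies the request, then always answers with the current value.
    pub fn handle(&mut self, request: ClientMessage) -> ServerMessage {
        match request {
            ClientMessage::Query => {}
            // Saturating arithmetic clamps at 255 and 0 instead of overflowing.
            ClientMessage::Inc => self.counter = self.counter.saturating_add(1),
            ClientMessage::Dec => self.counter = self.counter.saturating_sub(1),
            ClientMessage::Set(value) => self.counter = value,
        }
        ServerMessage::Query(self.counter)
    }
}
```

If wrapping around is what you want, wrapping_add and wrapping_sub make that explicit in the same way.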

After all this work, we can finally write some Svelte and create a working demo. After initializing wasm-bindgen on the main thread, instantiate the web worker and give its handle to the client, which allows us to perform requests. The worker needs to initialize wasm-bindgen and the rayon thread pool. For that reason, creating the worker is done through a utility function returning a promise that only resolves after the worker initialization finishes.

<script lang="ts">
    //--snip--
    let client: Client;

    onMount(async () => {
        await initWasm();

        // spawn returns a promise that resolves after the worker
        // has finished initializing wasm-bindgen and rayon.
        const server = await spawn(Worker);
        client = new Client(server);
    });
</script>

<button on:click={() => client?.dec()}>-</button>
<button on:click={async () => console.log(await client.query())}>GET</button>
<button on:click={() => client?.inc()}>+</button>

Addition and subtraction without blocking the main thread! More importantly, no need to think about Rust when writing JavaScript, and no need to think about JavaScript when writing Rust.

Finishing touches

It would be nice to show the value on the DOM instead of the console, and we could already do that by manually updating a DOM node whenever dec, inc, or set are called. Anyone who has used a modern web framework in the past couple of years knows there is a better way of doing that. Svelte automatically updates the DOM when a component's state changes, and ideally we want to be able to write <span>{counter}</span> and call it a day.

Before this experiment, I did some tests with wasm-bindgen running on the main thread of a React application. The same issue showed itself, and the solution wasn't trivial: React can't see state changes inside a wasm-bindgen object because the reference to that object stays the same. The first solution that came to mind was to give Rust a function that could run whenever a state change happened and update React state from that function.

function Counter() {
    const [wasmState, setWasmState] = useState({ counter: 0 });

    function onChange(wasmStateDiff) {
        setWasmState((previousState) => ({
            ...previousState,
            ...wasmStateDiff,
        }));
    }

    const _wasmCounter = useRef(new WasmCounter(0, onChange));

    return <span>{wasmState.counter}</span>
}

You can imagine WasmCounter calling onChange({ counter: newValue }) whenever it changes the counter value.

I didn't put much thought or research into this back then, but it worked, so with Svelte I started by considering a similar approach. Things looked much worse though: now the data lives on a web worker and is always at least one postMessage away.

That was when I came across a seemingly unrelated article about using RxJS with Svelte. It is a good look into how Svelte reactivity works and very much worth a read, but here is the main takeaway: an RxJS Observable serves as a Svelte store (and by extension, so does any observable).

A way to describe observables

For completeness, I want to briefly explain how observables are used here. Check the RxJS docs on observables for a proper description.

For our purposes, an observable is something that can notify you about changes in a value. To tell the observable you want to be notified you subscribe to it, and to subscribe you provide a callback that will receive those notifications. Subscribing also gives you an unsubscribe function for when you want to stop observing the value. Some code:

let counter = new Observable(0);
setInterval(() => counter.set(counter.get() + 1), 1000);

const unsubscribe = counter.subscribe(
    (newValue) => console.log(`counter changed to: ${newValue}`)
);

//--snip--

// Stop observing the value.
unsubscribe();

Server events

We still cannot use this knowledge unless the server has a way to notify the client of value changes. In practice, all we need is for the server to post a message with some sort of event descriptor (WebSockets are a good analogy for this).

#[derive(Debug, Shareable)]
pub enum ServerEvent {
    Count(#[shareable(repr = "serde")] u8)
}

Next, there should be a way to subscribe to these events and since there might be many types of events, subscribing needs an ID specifying which event we are subscribing to. I used the enum name and variant name separated by ::, since that matches how you would describe the Rust type. For example, to subscribe to the counter change events, you would use ServerEvent :: Count as the id.

The spaces before and after :: are to match Rust's stringify!: a macro that turns a list of tokens into a &'static str.
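The standard library documentation warns that the exact text produced by stringify! may change between compiler versions, so the spacing around :: is not something to depend on blindly. One hedged option, which is not what the demo does, is to strip whitespace before comparing ids; normalize_id and count_event_id are helper names invented for this sketch.

```rust
/// Removes all whitespace so ids compare equal however the tokens were spaced.
pub fn normalize_id(id: &str) -> String {
    id.chars().filter(|c| !c.is_whitespace()).collect()
}

/// Hypothetical helper: the normalized id for the `ServerEvent::Count` variant.
/// `stringify!` only looks at tokens, so the enum does not need to exist here.
pub fn count_event_id() -> String {
    normalize_id(stringify!(ServerEvent::Count))
}
```

Normalizing on both the Rust side and the JavaScript side means 'ServerEvent :: Count' and 'ServerEvent::Count' name the same event.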

// client.rs
    //--snip--
    pub fn observe(&mut self, id: String) -> Observable {
        todo!();
    }

Like we did with the requests, before talking about the Rust implementation let's see how we'd implement an observable for these server events using pseudo-code written in JavaScript.

The observable needs to store the ID of the events it cares about and a port that receives events. When subscribe is called, it gets a function that will be used to notify the subscriber of changes. That function has to be stored and called whenever the value changes. Finally, it needs to return an unsubscribe function. It looks like this:

class Observable {
    constructor(id, port) {
        this.id = id;
        this.port = port;
        this.listeners = [];

        this.port.addEventListener('message', (event) => {
            if (event.data.id === this.id) {
                this.listeners.forEach((onChange) => onChange(event.data.newValue));
            }
        });
    }

    subscribe(onChange) {
        this.listeners.push(onChange);

        // Arrow function, so `this` still refers to the observable.
        const unsubscribe = () => {
            this.listeners = this.listeners.filter((l) => l !== onChange);
        };
        return unsubscribe;
    }
}

Notice how onChange will not be called when we first subscribe, meaning the subscriber won't know the current value unless it changes. That can be fixed by either querying the current value whenever a new subscriber is added or keeping a local copy of the value on the observable. I went with the first solution, which is the simplest.
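For comparison, here is a sketch of the second option, where the observable keeps a local copy and hands it to new subscribers right away. It is a single-threaded, std-only illustration and not the implementation from the repo: this Observable holds a u8 directly instead of listening on a port, and returns a Rust closure where the wasm-bindgen version returns a JsValue.

```rust
use std::cell::RefCell;
use std::rc::Rc;

type Callback = Box<dyn Fn(u8)>;

#[derive(Default)]
struct Inner {
    value: u8,
    next_id: u32,
    subscribers: Vec<(u32, Callback)>,
}

/// Observable that remembers the latest value (illustrative stand-in).
#[derive(Clone, Default)]
pub struct Observable(Rc<RefCell<Inner>>);

impl Observable {
    /// Stores the new value and notifies every subscriber.
    /// Callbacks must not call back into the observable while this runs.
    pub fn set(&self, value: u8) {
        let mut inner = self.0.borrow_mut();
        inner.value = value;
        for (_, callback) in inner.subscribers.iter() {
            callback(value);
        }
    }

    /// Calls `on_change` immediately with the current value, then on every
    /// change. Returns a closure that removes the subscription.
    pub fn subscribe(&self, on_change: impl Fn(u8) + 'static) -> impl FnOnce() {
        let id = {
            let mut inner = self.0.borrow_mut();
            on_change(inner.value);
            inner.next_id += 1;
            let id = inner.next_id;
            let boxed: Callback = Box::new(on_change);
            inner.subscribers.push((id, boxed));
            id
        };
        let shared = Rc::clone(&self.0);
        move || {
            shared
                .borrow_mut()
                .subscribers
                .retain(|(other, _)| *other != id)
        }
    }
}
```

The trade-off is an extra copy of the value on the client in exchange for not needing a round trip to the server whenever someone subscribes.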

Here is the Rust API:

impl Observable {
    pub fn subscribe(&mut self, on_change: js_sys::Function) -> JsValue {
        //--snip--
    }
}

Helpful tip: to return a wasm-bindgen closure to JavaScript and transfer the memory management responsibility from Rust to the JavaScript garbage collector, call into_js_value.

The hacky part

I didn't talk about the Rust implementation of an observable, and the reason for that is simple: it's hacky.

By this point, I had already achieved my initial goal but only had a boring console demo to show for it. Observables are something I added later to put together a more interesting demo. That was when I came across one of the most practical ideas of this entire project (reactivity with Svelte + WebAssembly). So instead of skipping it I will quickly go over why this part of the implementation is hacky.

An observable should be able to hold a reference to a port and receive events through there. That would require the observable to have a lifetime parameter, but structs shared with JavaScript cannot have lifetime or type parameters (a wasm-bindgen limitation). Cloning the port would solve this, but the port holds a Box<dyn RawPort> which is not Clone. Another solution is to transmute away the lifetime, and I wanted to avoid doing that.

To get around this, I decided to create a MessageChannel that receives the server events and then forwards them to a BroadcastChannel, which can easily be accessed from anywhere using just a string ID. This was also an opportunity to implement RawPort for a new type.

In the end, it was unavoidable to transmute away a lifetime so I could keep a handle to this forwarding listener. By "transmuting away a lifetime" I mean changing the lifetime to 'static. Very rarely will that be the correct thing to do, and if done incorrectly it will leak memory. So don't do it (unless you're writing a demo).

Putting it all together

That was quite the journey for a tally counter, but all the pieces are in place now.

There is an issue with Vite that prevents me from building this, so sadly I cannot give you a link with the working counter. I encourage you to clone the repo and play around with it: building and running it is surprisingly easy.

Let's start by describing what we want to achieve:

  • the current value of the counter should be shown on the DOM;
  • there should be buttons to increment and decrement the counter;
  • you should be able to subscribe and unsubscribe from a logger, which prints every counter change to the console.

And then we can put it all together: the +/- buttons just need to call the corresponding client functions. To show the value on the DOM, we create an observable from the ServerEvent::Count events. And finally, we subscribe to that observable with a function that logs changes to the console, storing the unsubscribe function for later.

<script lang="ts">
    //--snip--    
    let client: Client;
    let count: Observable;
    let unsubscribe: Function | undefined;

    const logger = (value: number) => console.log(`observed: ${value}`);

    onMount(async () => {
        await initWasm();

        // spawn returns a promise that resolves after the worker
        // has finished initializing wasm-bindgen and rayon.
        const server = await spawn(Worker);
        client = new Client(server);

        count = client.observe('ServerEvent :: Count');
        unsubscribe = count.subscribe(logger)
    });
</script>

<button on:click={() => client?.dec()}>-</button>
<span>{$count}</span>
<button on:click={() => client?.inc()}>+</button>

<button
    on:click={() => {
        unsubscribe?.();
        unsubscribe = undefined;
    }}
>detach logger</button>
<button
    on:click={() => {
        unsubscribe = count.subscribe(logger);
    }}
>attach logger</button>

A recording of a website with the finished tally counter. It shows the current value, a button with a minus sign to the left, and a button with a plus sign to the right. Under the  value are buttons called

Let's take a moment to appreciate this line: <span>{$count}</span>. count lives on our server: Rust code running on a web worker. And yet whenever it changes, this span updates. Clicking a button will use a Rust client running on the main thread to post a message, which tells a Rust server running on a web worker to mutate some state. After the mutation, the server will generate a change event, post a message to the client, and update the DOM. I hope you find that as amazing (and useful) as I do.
