DEV Community

Cover image for Async Streams in WebAssembly with WasmRS
Jarrod Overson
Jarrod Overson

Posted on

Async Streams in WebAssembly with WasmRS

TL;DR: WasmRS is an implementation of RSocket for WebAssembly giving you reactive, async streams in and out of WASM modules.
GitHub | Protocol details | Rust source | Go source

WebAssembly has immense potential but it is hardly user-friendly. It's making strides but what we have to work with today is a slog. Baseline WebAssembly only works with integers and floating point values. Your typical "Hello world" is a cumbersome mess of reading raw memory and dealing with bytes. Once you figure out how to transfer complex data structures and make calls in both directions, you are left high and dry if your application needs to do anything asynchronous. When you eventually rig together an async WebAssembly solution, you're stuck when dealing with big data without streams. After streams, you'll eventually need to solve for back pressure. After back press... well you get the point.

And that's the story of how we got to WasmRS.

What's WasmRS?

WasmRS is an implementation of the RSocket protocol in WebAssembly with some reactive stream concepts thrown in for usability. With WasmRS you treat WebAssembly modules like tiny services you open bidirectional sockets into.

WasmRS uses RSocket framing and aligns terminology where possible to keep it familiar. RSocket defines four request types the protocol can handle:

  • Fire & Forget: a request that is sent where the response is ignored.
  • RequestResponse: an asynchronous request with a single payload returning a single payload.
  • RequestStream: a request with a single payload that returns a stream of payloads.
  • RequestChannel: a request that takes a stream that returns a stream.

If you are interested in the protocol details, check out the wasmRS protocol documentation here.

How do I use it?

Using wasmRS directly is a bit like using WebAssembly directly. There's a lot of boilerplate and many esoteric details to get right before you get what you want. If you're the sort that likes those details, check out the baseline Rust implementation in the repository. If you're like me, the details are great but getting started rapidly is more important.

Luckily, we've got apex templates to get us going.

Apexlang is a project template and code generation tool suite that automates much of the boilerplate and getting started headache for projects.

We can use the apex CLI and the project templates in nanobus/iota to whip up a new project with one line.

apex new git@github.com:nanobus/iota.git -p templates/rust example
Enter fullscreen mode Exit fullscreen mode

The apex new command is like git clone plus templating. It makes kickstarting projects easy and keeps being useful with code generation you'll see below.

Hello, World!

Tip: This section bootstraps you into building with wasmRS. To get straight to streams, skip to the next section.

The example/ directory we just created is filled with a handful of new files. Most are rust-specific but take special note of the apex.axdl file. That's Apexlang and is what the apex CLI uses to keep generating code and documentation during the life of your project.

Edit the apex.axdl to look like this:

namespace "example"

interface MyApi @service {
  greet(target: string): string
}
Enter fullscreen mode Exit fullscreen mode

Above, we define a service called MyApi that has one action, greet, that takes an argument and returns a string. The target argument is who to greet.

Now run apex generate to automagically generate a bunch of new files.

Note: The project template includes a justfile. The just tool is a task runner modeled after the good parts of make. If you have just installed, you can run apex generate with the task just codegen

$ apex generate
INFO Writing file ./src/actions/my_api/greet.rs (mode:644)
INFO Writing file ./src/lib.rs (mode:644)
INFO Writing file ./src/error.rs (mode:644)
INFO Writing file ./src/actions/mod.rs (mode:644)
INFO Formatting file ./src/error.rs
INFO Formatting file ./src/lib.rs
INFO Formatting file ./src/actions/mod.rs
INFO Formatting file ./src/actions/my_api/greet.rs
Enter fullscreen mode Exit fullscreen mode

These new files include wasmRS boilerplate, scaffolding, and samples to get you started quickly.

The file ./src/actions/my_api/greet.rs contains a stub for our greet action.

use crate::actions::my_api_service::greet::*;

pub(crate) async fn task(input: Inputs) -> Result<Outputs, crate::Error> {
    todo!("Add implementation");
}
Enter fullscreen mode Exit fullscreen mode

Turn our greeter into an appropriate 'Hello World!" by returning a string like below:

use crate::actions::my_api_service::greet::*;

pub(crate) async fn task(input: Inputs) -> Result<Outputs, crate::Error> {
    Ok(format!("Hello, {}!", input.target))
}
Enter fullscreen mode Exit fullscreen mode

And build!

cargo build --target=wasm32-unknown-unknown
Enter fullscreen mode Exit fullscreen mode

You'll find your new .wasm file at target/wasm32-unknown-unknown/release/example.wasm.

The included justfile has a build command that runs the cargo step above and puts the built .wasm files in a build/ directory. It also runs the codegen task before building to ensure files are up-to-date.

$ just build
$ ls build/
example.wasm

We'll need a suitable runner to see our WebAssembly run on the command line. For that, we can use NanoBus or the wasmrs-request binary.

Running our WebAssembly with wasmrs-request

To use the wasmrs-request tool, first install it with the command:

cargo install wasmrs-request
Enter fullscreen mode Exit fullscreen mode

Then run:

wasmrs-request ./build/example.wasm example.MyApi greet '{"target":"World"}'
Enter fullscreen mode Exit fullscreen mode

Output:

Hello, World!
Enter fullscreen mode Exit fullscreen mode

Running our WebAssembly with NanoBus

Info: NanoBus is a framework for wiring components like wasmRS modules together into applications. If you want to turn a module like this into a web service or CLI app, check it out!

To use NanoBus we need a configuration that points to our .wasm file. Make an iota.yaml that looks like this:

id: example
version: 0.0.1
main: target/wasm32-unknown-unknown/release/example.wasm
# Or, if you're using `just build`:
# main: build/example.wasm
Enter fullscreen mode Exit fullscreen mode

Run nanobus invoke with a piped payload to witness our Hello World executed in all its glory.

echo '{"target":"World"}' | nanobus invoke iota.yaml example.MyApi::greet
Enter fullscreen mode Exit fullscreen mode

Output:

"Hello, World!"
Enter fullscreen mode Exit fullscreen mode

Streams!

Now that you're familiar with building wasmRS WebAssembly and running it with NanoBus or wasmrs-request, let's get to streaming.

Your system's command line is a great place to experiment. Every CLI process's input and output is a stream.

Let's add a reverse method to our API that takes a stream of string and outputs a stream of string. This will let us pipe a file to our action and see the contents reversed, ready to pipe to another CLI process.

namespace "example"

interface MyApi @service {
  greet(target: string): string
  reverse(input: stream string): stream string
}
Enter fullscreen mode Exit fullscreen mode

Run apex generate (or just codegen) to generate the new code:

$ apex generate
INFO Writing file ./src/actions/my_api/reverse.rs (mode:644)
INFO Writing file ./src/actions/mod.rs (mode:644)
INFO Formatting file ./src/actions/my_api/reverse.rs
INFO Formatting file ./src/actions/mod.rs
Enter fullscreen mode Exit fullscreen mode

Notice how apex intelligently rewrites only some files and doesn't clobber your existing action. Generated files that shouldn't be edited typically have a header or warning calling it out. You're safe to edit others.

Our new stub looks a little different than the simple Request/Response stub above:

use crate::actions::my_api_service::reverse::*;

pub(crate) async fn task(
    mut input: FluxReceiver<Inputs, PayloadError>,
    outputs: Flux<Outputs, PayloadError>,
) -> Result<Flux<Outputs, PayloadError>, crate::Error> {
    todo!("Add implementation");
}
Enter fullscreen mode Exit fullscreen mode

WasmRS uses terminology from RSocket and reactive-streams to stay consistent. A Flux is like a rust Stream mixed with a channel. You can push to it, pass it around, pipe one to another, and await values. A FluxReceiver is a Flux that you can only receive values from. It's like the receiving end of a channel implemented as a Stream.

To work with our streams, we await values from our input stream and push to our output stream. This example reverses each line of the input and sends it to the output.

use crate::actions::my_api_service::reverse::*;

pub(crate) async fn task(
  mut input: FluxReceiver<Inputs, PayloadError>,
  outputs: Flux<Outputs, PayloadError>,
) -> Result<Flux<Outputs, PayloadError>, crate::Error> {
  while let Some(line) = input.next().await {
    match line {
      Ok(line) => {
        outputs.send(line.chars().rev().collect()).unwrap();
      }
      Err(e) => outputs.error(PayloadError::application_error(e.to_string())).unwrap(),
    }
  }
  outputs.complete();
  Ok(outputs)
}
Enter fullscreen mode Exit fullscreen mode

To build it, we can use the justfile again:

cargo build --release --target=wasm32-unknown-unknown
# or `just build`
Enter fullscreen mode Exit fullscreen mode

To run it with wasmrs-request, we use the same path and action arguments as above with the addition of the --channel flag and piped input.

cat Cargo.toml |  wasmrs-request --channel ./build/example.wasm example.MyApi reverse
Enter fullscreen mode Exit fullscreen mode

Now anything you pipe to our reverse action will come out reversed!

]egakcap[
"elpmaxe" = eman
"0.1.0" = noisrev
"1202" = noitide

]bil[
]"bilydc"[ = epyt-etarc

]esaeler.eliforp[
"slobmys" = pirts
1 = stinu-negedoc
eslaf = gubed
eurt = otl
"z" = level-tpo
"troba" = cinap

]seicnedneped[
"2.0" = tseug-srmsaw
"0.1" = rorresiht
} ]"evired"[ = serutaef ,eslaf = serutaef-tluafed ,"1" = noisrev { = edres
"1.0" = tiart-cnysa
"0.82.0" = ajnijinim

]seicnedneped-ved[
Enter fullscreen mode Exit fullscreen mode

Streaming data is critical for large payloads. Dealing with an enormous file or an asynchronous stream of text would be difficult to impossible without streaming concepts. WasmRS lets you take WebAssembly to new levels.

Where to go next?

WasmRS is the protocol we're using for iota dependencies. Iotas are libraries, microservices, and WebAssembly modules that use a common protocol so they can be swapped out, integrated, composed, and tested without changing your application.

WasmRS is independent, generic and un-opinionated. You can use wasmRS in your own projects, use the iota code generators and run iotas yourself, or use wasmRS in completely new ways. The iota implementation is our opinionated implementation. Take what you want and leave what you don't.

More links

Attribution

Photo by Jason Pischke on Unsplash

Top comments (0)