DEV Community

Cover image for A Typesafe Event Bus With RxJS
Dean Radcliffe
Dean Radcliffe

Posted on

A Typesafe Event Bus With RxJS

Intro

In the This Dot Labs post titled
How To Implement An Event Bus in Typescript, Luis Aviles showed us a typesafe implementation of an Event Bus. He also did a great job elaborating on how and when you can use an Event Bus - in short to simplify the passing of events around an application.

For an application I was developing, my team used a similar concept, but with an API adapted to our use case. I thought that with Luis' article as a reference, I might detail how an implementation might go if the event bus were based on an RxJS Subject. While the bundle size is slightly larger than Luis' Vanilla JS version (still only 6Kb), RxJS gives us a level of interoperability and less boilerplate. And the code below only gives a glimpse of what an RxJS-based event bus library is capable of doing.

So let's dive in!

Instance management

An event bus will frequently be a singleton object within an app.
To manage a singleton instance, we could write typical OO singleton code.

export class EventBus {
  private static instance?: EventBus = undefined;

  private constructor() {}

  public static getInstance(): EventBus {
    if (this.instance === undefined) {
      this.instance = new EventBus();
    }

    return this.instance;
  }
}
Enter fullscreen mode Exit fullscreen mode

Then the usage of it everywhere would look like:

EventBus.getInstance().some-method()
Enter fullscreen mode Exit fullscreen mode

But that's a bit verbose. Let's instead use the ES2015 module system to export a single object from a bus.ts file

// bus.ts
export const bus = EventBus.getInstance()
Enter fullscreen mode Exit fullscreen mode

and then callers could simply do:

import { bus } from './bus'
bus.some_method()
Enter fullscreen mode Exit fullscreen mode

No matter from which file you import bus, you get the same reference, so we are effectively using the module system to enforce a single reference without complicating callers.

Now, what should callers have as the interface to our EventBus?

To listen or not to listen?

Event Buses implement the pub-sub (publish-subscribe) metaphor, so there are two sides to them. First we'll create a fresh API for for the side that responds to events. In Luis' article, the method name was register. In the DOM, you listen for events with addEventListener. Let's shorten that to just listen. Let's assume strings are on the bus for now, and assume that TypeScript knows this. When the bus is in scope we have:

bus.listen(
  item => item.startsWith('hello-'),
  (item) => {
     const who = item.replace('hello-','');
     console.log('Hello '+ who);
  }
);
Enter fullscreen mode Exit fullscreen mode

The first argument is a function that returns true or false - called a Predicate. A predicate function is the most flexible way to match an item, and allows for any synchronous function which returns a Boolean. Here we use the nice string method startsWith, from ES2015. Never again do we have to test with indexOf, whew! Now let's see how to put an event on the bus.

Trigger Me!

While dispatch is a popular term from Redux (and the DOM) for sending an action, it's definition implies an action is directed at something or someone. However, the essence of an event bus is that the sender of an event doesn't know who or what is listening. So let's name our method for sending an undirected event trigger.

The argument to trigger should be the type of item the bus allows. Assuming the bus instance was already typed to string, then the code to trigger is simply:

bus.trigger('hello-dave');
Enter fullscreen mode Exit fullscreen mode

Notice you don't have to provide a type argument at the moment of triggering, because the bus knows it. Now- what about when a listener needs to be unregistered?

Will You Stop Listening, Already?

Just as the DOM has removeEventListener, an Event Bus should be able to stop listening - for cleanup purposes at least. In Luis' article, he demonstrated a style of un-listening like this:

const registry = EventBus.getInstance().register(matcher, fn);
registry.unregister();
Enter fullscreen mode Exit fullscreen mode

It's an improvement over removeEventListener because the return value lets you cancel without the original arguments. I like how it resembles getting a Subscription object from an RxJS Observable, on which you can call unsubscribe. So in our example, let's actually return a Subscription:

const listener:Subscription = bus.listen(matcher, fn);
listener.unsubscribe();
Enter fullscreen mode Exit fullscreen mode

This gives us some things for free such as a .closed property on listener, and the ability to shut down several listeners at once by creating an aggregate Subscription via Subscription#add. Since a Subscription object can represent any process that can be shut down, it's actually a great fit for a listener that can be shut down to stop listening.

Implementation

Although a production-ready library that implements this pattern is at omnibus-rxjs, the code that implements only what's shown in this article is simply this:

class Bus<T> {
  private events: Subject<T>;
  constructor() {
    this.events = new Subject();
  }
  listen(matcher: Predicate<T>, handler: (item: T) => void): Subscription {
    return this.events
      .asObservable()
      .pipe(filter(matcher), tap(handler))
      .subscribe();
  }
  trigger(item: T) {
    this.events.next(item);
  }
}
export const bus = new Bus<string>();
// now bus.listen / bus.trigger
Enter fullscreen mode Exit fullscreen mode

See this CodeSandbox for more.

Next Steps

To truly be useful, an event bus needs:

  • True error isolation (between listeners, between triggerers and listeners)
  • Async listeners which return Promises or Observables
  • The ability to retrigger from listeners' results
  • The ability to queue, or otherwise deal with listeners that overlap
  • To be able to tie listeners to component lifetimes as in React
  • A way to validate what events are allowed on the bus at runtime

These will be elaborated on in a future post just about Omnibus.

I hope you enjoyed seeing this use of Typescript to create a powerful, simple Event Bus, with a friendly and interoperable API.

Discussion (0)