DEV Community

Red John

Building a Distributed PubSub Class with Cross-Process and Cross-Tab Capabilities

In modern development, there’s often a need to synchronize messages across multiple execution contexts, be it across processes in Node.js, clustered workers, or even across browser tabs. The PubSub class we’ll build in this post can achieve that goal by using the BroadcastChannel API.

This PubSub class is designed to function:

  • Across processes when using Bun or Node.js's Cluster module.
  • Across multiple worker threads using Deno's --parallel feature.
  • In browser environments, facilitating communication between multiple tabs.

By leveraging the BroadcastChannel API, our PubSub system will enable communication between different contexts seamlessly. We'll also implement asynchronous iteration, so you can consume messages from this system using for await...of syntax in JavaScript.

The PubSub Class Design

Our PubSub class will:

  1. Broadcast messages: Share events with any subscriber, across different processes or browser tabs.
  2. Support asynchronous iteration: Use the createEventIterator helper from the previous post so events can be consumed with for await...of.
  3. Offer simple subscribe/unsubscribe mechanics: Listeners can subscribe to receive messages and later unsubscribe if no longer interested.
  4. Work seamlessly across different environments: Whether it's browser tabs or Node.js clusters, it provides a uniform API.
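
The createEventIterator helper comes from the previous post and is not reproduced here. To give a rough idea of what such a helper might look like, here is a minimal sketch. The setup-callback signature is inferred from how PubSub calls it, and the internals (a queue plus a list of waiting consumers) are assumptions, not the original implementation.

```typescript
// Hypothetical stand-in for createEventIterator; not the original helper.
type Emit<T> = (value: T) => void;
// setup receives { emit } and may return a cleanup function (e.g. an unsubscribe).
type Setup<T> = (controls: { emit: Emit<T> }) => (() => void) | void;

function createEventIterator<T>(setup: Setup<T>): AsyncIterableIterator<T> {
    const queue: T[] = []; // events emitted before a consumer asked for them
    const waiting: Array<(result: IteratorResult<T>) => void> = []; // pending next() calls
    let done = false;

    const cleanup = setup({
        emit(value) {
            if (done) return;
            const resolve = waiting.shift();
            if (resolve) resolve({ value, done: false }); // hand over directly
            else queue.push(value); // buffer until next() is called
        },
    });

    return {
        async next(): Promise<IteratorResult<T>> {
            if (queue.length > 0) return { value: queue.shift() as T, done: false };
            if (done) return { value: undefined, done: true };
            return new Promise<IteratorResult<T>>((resolve) => {
                waiting.push(resolve);
            });
        },
        // for await...of calls return() when the loop exits early (break, return, throw)
        async return(): Promise<IteratorResult<T>> {
            if (!done) {
                done = true;
                if (typeof cleanup === 'function') cleanup();
                // Release any consumers still waiting for a value
                for (const resolve of waiting.splice(0)) resolve({ value: undefined, done: true });
            }
            return { value: undefined, done: true };
        },
        [Symbol.asyncIterator]() {
            return this;
        },
    };
}
```

With a helper shaped like this, breaking out of a for await...of loop triggers return(), which runs the cleanup function. In the PubSub class that cleanup is the unsubscribe function returned by subscribe().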

The Code

Here’s how you can implement the PubSub class:

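// createEventIterator is the helper from the previous post; this import path is an assumption.
import { createEventIterator } from './createEventIterator';

type Listener<T> = (data: T) => void;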

export class PubSub<T> implements AsyncIterable<T> {
    #listeners = new Set<Listener<T>>();
    #tx: BroadcastChannel;
    #rx: BroadcastChannel;

    [Symbol.asyncIterator](): AsyncIterator<T> {
        return this.iter();
    }

    constructor(name: string) {
        this.#tx = new BroadcastChannel(name);
        this.#rx = new BroadcastChannel(name);

        // Listen for messages on the receive channel
        this.#rx.onmessage = (ev) => {
            this.#broadcast(ev.data);
        };
    }

    #broadcast(data: T) {
        for (const listener of this.#listeners) {
            listener(data);
        }
    }

    close(): void {
        this.#listeners.clear();
        this.#tx.close();
        this.#rx.close();
    }

    broadcast(data: T): void {
        this.#tx.postMessage(data);
    }

    iter(): AsyncIterator<T> {
        return createEventIterator<T>(({ emit }) => this.subscribe(emit));
    }

    once(listener: Listener<T>): () => void {
        const wrapper: Listener<T> = (data) => {
            this.unsubscribe(wrapper);
            listener(data);
        };
        return this.subscribe(wrapper);
    }

    subscribe(listener: Listener<T>): () => void {
        this.#listeners.add(listener);
        return () => this.unsubscribe(listener);
    }

    unsubscribe(listener: Listener<T>): void {
        this.#listeners.delete(listener);
    }
}

Key Components

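  1. Broadcast Channels: The class uses two BroadcastChannel instances, one for sending (#tx) and one for receiving (#rx), both bound to the same channel name. A BroadcastChannel never delivers a message to the instance that posted it, so the separate receiver is what lets listeners in the sending context see their own broadcasts too.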

  2. Asynchronous Iteration: The class implements the AsyncIterable interface via the [Symbol.asyncIterator]() method, which returns an async iterator. This is where the createEventIterator function, discussed in the previous post, comes into play. It allows you to iterate over incoming events asynchronously.

  3. Subscriptions: You can subscribe multiple listeners to the same channel. Each listener is called whenever a message is broadcast on it, and subscribe() returns a function that removes the listener again. You can also register one-time listeners with the once() method.

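  4. Cross-context communication: Thanks to the BroadcastChannel API, this works across different execution contexts. In browsers, that means tabs and windows on the same origin. On the server, the reach depends on the runtime's BroadcastChannel implementation (Node.js, for instance, documents it as part of worker_threads), so confirm that messages actually cross process boundaries in your setup before relying on it.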
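
To see why the class keeps a separate receive channel, the following sketch posts on one BroadcastChannel and records which instances get the message. The function name, the channel name, and the 100 ms settle delay are arbitrary choices made for this demo.

```typescript
// Resolves with a record of which receivers saw a message posted on `tx`.
function demoSelfDelivery(name = 'pubsub-demo'): Promise<{ sender: boolean; sibling: boolean }> {
    const tx = new BroadcastChannel(name);
    const rx = new BroadcastChannel(name);
    const seen = { sender: false, sibling: false };

    tx.onmessage = () => { seen.sender = true; };  // does not fire for tx's own posts
    rx.onmessage = () => { seen.sibling = true; }; // fires: rx is a different instance

    tx.postMessage('ping');

    return new Promise((resolve) => {
        // Give the message time to arrive, then close both channels
        // so they do not keep the process or page resources alive.
        setTimeout(() => {
            tx.close();
            rx.close();
            resolve(seen);
        }, 100);
    });
}

demoSelfDelivery().then((seen) => console.log(seen));
// expected in browsers and Node.js 18+: { sender: false, sibling: true }
```

If PubSub used a single channel for both directions, a listener registered in the broadcasting context would never hear that context's own messages.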

Usage in Different Environments

1. Node.js / Bun Cluster with Typed Messages

Let's improve the example by using a discriminated union type to represent different message types. This adds a layer of type-safety and clarity to how messages are handled.

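import cluster from 'node:cluster';

type Message =
  | { type: 'user-connected', userId: number }
  | { type: 'user-disconnected' }
  | { type: 'message', content: string, userId: number };

const pubsub = new PubSub<Message>('my-channel');

// Primary process: fork a worker, then broadcast messages
// (cluster.isPrimary requires Node.js 16+; older versions use cluster.isMaster)
if (cluster.isPrimary) {
    cluster.fork();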
    setInterval(() => {
        pubsub.broadcast({ type: 'user-connected', userId: 1 });
        pubsub.broadcast({ type: 'message', content: 'Hello, world!', userId: 1 });
    }, 1000);
} else {
    // Worker process: Listen for messages
    (async () => {
        for await (const message of pubsub) {
            switch (message.type) {
                case 'user-connected':
                    console.log(`User connected: ${message.userId}`);
                    break;
                case 'user-disconnected':
                    console.log('User disconnected');
                    break;
                case 'message':
                    console.log(`User ${message.userId} says: ${message.content}`);
                    break;
            }
        }
    })();
}

Because Message is a discriminated union, TypeScript narrows message inside each case branch, so userId and content are only accessible on the variants that actually define them. These checks happen at compile time.
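
If you also want the compiler to flag variants you forgot to handle, a common TypeScript pattern is an exhaustiveness check with the never type. The sketch below reuses the Message type from this section; assertNever and describeMessage are hypothetical names introduced for illustration.

```typescript
type Message =
    | { type: 'user-connected'; userId: number }
    | { type: 'user-disconnected' }
    | { type: 'message'; content: string; userId: number };

// Hypothetical helper: reaching it with a real value means a variant was not handled.
function assertNever(value: never): never {
    throw new Error(`Unhandled message: ${JSON.stringify(value)}`);
}

function describeMessage(message: Message): string {
    switch (message.type) {
        case 'user-connected':
            return `User connected: ${message.userId}`;
        case 'user-disconnected':
            return 'User disconnected';
        case 'message':
            return `User ${message.userId} says: ${message.content}`;
        default:
            // Compile error here if a variant is added to Message but not handled above
            return assertNever(message);
    }
}

console.log(describeMessage({ type: 'message', content: 'Hello, world!', userId: 1 }));
// prints "User 1 says: Hello, world!"
```

The default branch costs nothing at runtime for well-formed messages, and it throws loudly if an unexpected payload slips through.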

2. Deno Workers with --parallel and Typed Events

type Message = 
  | { type: 'job-started', jobId: string } 
  | { type: 'job-completed', jobId: string };

const pubsub = new PubSub<Message>('deno-channel');

if (Deno.args[0] === 'worker') {
    // Worker process: Listen for messages
    (async () => {
        for await (const message of pubsub) {
            switch (message.type) {
                case 'job-started':
                    console.log(`Job ${message.jobId} started`);
                    break;
                case 'job-completed':
                    console.log(`Job ${message.jobId} completed`);
                    break;
            }
        }
    })();
} else {
    // Main process: Broadcast messages to workers
    setInterval(() => {
        pubsub.broadcast({ type: 'job-started', jobId: '12345' });
        setTimeout(() => pubsub.broadcast({ type: 'job-completed', jobId: '12345' }), 3000);
    }, 5000);
}

3. Cross-Tab Communication in the Browser

Here’s an example where you can send and receive typed chat messages across multiple browser tabs.

type Message = 
  | { type: 'chat-message', content: string, userId: number } 
  | { type: 'user-joined', userId: number } 
  | { type: 'user-left', userId: number };

const pubsub = new PubSub<Message>('chat-channel');

// Send a message when a button is clicked
document.getElementById('sendBtn')?.addEventListener('click', () => {
    const userId = 42;
    pubsub.broadcast({ type: 'chat-message', content: 'Hello from this tab!', userId });
});

// Listen for messages from any tab (this one included, via the separate receive channel)
(async () => {
    for await (const message of pubsub) {
        switch (message.type) {
            case 'chat-message':
                console.log(`User ${message.userId} says: ${message.content}`);
                break;
            case 'user-joined':
                console.log(`User ${message.userId} joined the chat`);
                break;
            case 'user-left':
                console.log(`User ${message.userId} left the chat`);
                break;
        }
    }
})();

Now, whenever a user sends a message from one tab, every open tab on the same origin, including the sender, receives and logs it. The user-joined and user-left variants work the same way for announcing when users join or leave the chat.

Why This is Useful

This PubSub system is especially helpful when building distributed systems or multi-window browser apps where you need a seamless way to share data without relying on external servers. Here’s why you might use it:

  • Cross-process: Share data between Node.js processes, Deno workers, or Bun worker threads.
  • Cross-tab in browsers: Synchronize state or events between browser tabs (e.g., messaging apps, shared state).
  • Type-safe messaging: TypeScript’s discriminated unions let the compiler check that each message variant is handled with the right fields.
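
One caveat on the last point: TypeScript types are erased at runtime, and a BroadcastChannel delivers whatever another context posts under the same name. If other code might share the channel, a runtime guard can back up the compile-time types. The sketch below uses the chat Message type from the browser example; isMessage is a hypothetical helper, not part of the PubSub class.

```typescript
type Message =
    | { type: 'chat-message'; content: string; userId: number }
    | { type: 'user-joined'; userId: number }
    | { type: 'user-left'; userId: number };

// Hypothetical runtime guard: checks the shape of an incoming payload.
function isMessage(data: unknown): data is Message {
    if (typeof data !== 'object' || data === null) return false;
    const m = data as Record<string, unknown>;
    switch (m.type) {
        case 'chat-message':
            return typeof m.content === 'string' && typeof m.userId === 'number';
        case 'user-joined':
        case 'user-left':
            return typeof m.userId === 'number';
        default:
            return false;
    }
}

console.log(isMessage({ type: 'user-joined', userId: 42 }));  // true
console.log(isMessage({ type: 'chat-message', userId: 42 })); // false (content is missing)
```

One place to apply such a guard would be the onmessage handler, dropping payloads that fail the check before they reach any listener.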

Conclusion

The PubSub class provides a simple, yet powerful solution for broadcasting events across multiple contexts, whether that's processes in Node.js, worker threads in Deno, or browser tabs. By utilizing the BroadcastChannel API and TypeScript’s discriminated union types, it can deliver cross-process or cross-tab event streaming with full type-safety, making it a versatile tool in both server-side and client-side environments.

With this in place, you can build more sophisticated, distributed systems or simply synchronize events across tabs or processes with minimal effort while enjoying all the benefits of TypeScript’s type system.

Top comments (1)

Adrberia

Very interesting, you should consider adding it as an NPM library later on if it's already finished.