In modern development, there’s often a need to synchronize messages across multiple execution contexts, whether across processes in Node.js, clustered workers, or browser tabs. The PubSub class we’ll build in this post achieves that by using the BroadcastChannel API.
This PubSub class is designed to function:
- Across processes when using Bun or Node.js's Cluster module.
- Across multiple worker threads using Deno's --parallel feature.
- In browser environments, facilitating communication between multiple tabs.
By leveraging the BroadcastChannel API, our PubSub system will enable communication between different contexts seamlessly. We'll also implement asynchronous iteration, so you can consume messages from this system using the for await...of syntax in JavaScript.
The PubSub Class Design
Our PubSub class will:
- Broadcast messages: Share events with any subscriber, across different processes or browser tabs.
- Support asynchronous iteration: Use the createEventIterator helper from the previous post to enable iterating over events.
- Offer simple subscribe/unsubscribe mechanics: Listeners can subscribe to receive messages and later unsubscribe if no longer interested.
- Work seamlessly across different environments: Whether it's browser tabs or Node.js clusters, it provides a uniform API.
The Code
Here’s how you can implement the PubSub class:
// createEventIterator is the helper from the previous post;
// import it from wherever you keep it in your project.

type Listener<T> = (data: T) => void;

export class PubSub<T> implements AsyncIterable<T> {
  #listeners = new Set<Listener<T>>();
  #tx: BroadcastChannel; // used only for sending
  #rx: BroadcastChannel; // used only for receiving

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return this.iter();
  }

  constructor(name: string) {
    this.#tx = new BroadcastChannel(name);
    this.#rx = new BroadcastChannel(name);
    // Listen for messages on the receive channel
    this.#rx.onmessage = (ev) => {
      this.#broadcast(ev.data);
    };
  }

  // Deliver a received message to every local listener
  #broadcast(data: T) {
    for (const listener of this.#listeners) {
      listener(data);
    }
  }

  // Drop all listeners and release both channels
  close(): void {
    this.#listeners.clear();
    this.#tx.close();
    this.#rx.close();
  }

  // Send a message to every context subscribed to this channel name
  broadcast(data: T): void {
    this.#tx.postMessage(data);
  }

  iter(): AsyncIterator<T> {
    return createEventIterator<T>(({ emit }) => this.subscribe(emit));
  }

  // Subscribe a listener that removes itself after the first message
  once(listener: Listener<T>): () => void {
    const wrapper: Listener<T> = (data) => {
      this.unsubscribe(wrapper);
      listener(data);
    };
    return this.subscribe(wrapper);
  }

  // Returns a function that unsubscribes the listener again
  subscribe(listener: Listener<T>): () => void {
    this.#listeners.add(listener);
    return () => this.unsubscribe(listener);
  }

  unsubscribe(listener: Listener<T>): void {
    this.#listeners.delete(listener);
  }
}
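The class relies on createEventIterator from the previous post, which is not reproduced here. To make the listing easier to follow on its own, here is a minimal sketch of what such a helper might look like. It only assumes the call shape visible in iter(): a setup callback that receives { emit } and returns an unsubscribe function. The type names Emit and Setup, and the internal queueing, are illustrative guesses, and the original helper may behave differently.

```typescript
// Hypothetical stand-in for the helper from the previous post.
type Emit<T> = (value: T) => void;
type Setup<T> = (handlers: { emit: Emit<T> }) => () => void;

function createEventIterator<T>(setup: Setup<T>): AsyncIterableIterator<T> {
  const queue: T[] = []; // values emitted while no next() call is pending
  const waiting: Array<(result: IteratorResult<T>) => void> = []; // pending next() calls
  let done = false;

  // Start listening; `stop` is whatever unsubscribe function the caller returns.
  const stop = setup({
    emit(value: T): void {
      if (done) return;
      const resolve = waiting.shift();
      if (resolve) resolve({ value, done: false });
      else queue.push(value);
    },
  });

  return {
    [Symbol.asyncIterator]() {
      return this;
    },
    next(): Promise<IteratorResult<T>> {
      if (queue.length > 0) {
        const result: IteratorResult<T> = { value: queue.shift() as T, done: false };
        return Promise.resolve(result);
      }
      if (done) {
        const finished: IteratorResult<T> = { value: undefined, done: true };
        return Promise.resolve(finished);
      }
      return new Promise<IteratorResult<T>>((resolve) => {
        waiting.push(resolve);
      });
    },
    // Called automatically when a for await...of loop exits early.
    return(): Promise<IteratorResult<T>> {
      done = true;
      stop();
      const finished: IteratorResult<T> = { value: undefined, done: true };
      for (const resolve of waiting.splice(0)) resolve(finished);
      return Promise.resolve(finished);
    },
  };
}
```

With a helper along these lines, breaking out of a for await...of loop over a PubSub instance calls return(), which in turn runs the unsubscribe function that subscribe() handed back.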
Key Components
- Broadcast Channels: The class uses two BroadcastChannel instances bound to the same channel name, one for sending (#tx) and one for receiving (#rx). A BroadcastChannel never delivers a message to the instance that posted it, so the separate receiving instance is what lets local listeners see locally broadcast messages as well.
- Asynchronous Iteration: The class implements the AsyncIterable interface via the [Symbol.asyncIterator]() method, which returns an async iterator. This is where the createEventIterator function, discussed in the previous post, comes into play: it lets you iterate over incoming events asynchronously.
- Subscriptions: You can subscribe multiple listeners to the same broadcast. Each listener is triggered whenever a message is broadcast on the channel. You can also register one-time listeners with the once() method.
- Cross-context communication: Thanks to the BroadcastChannel API, this works across different execution contexts. For example, in browsers, it works across different tabs, while in Node.js or Deno, it works across different processes or threads.
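The reason for using separate sending and receiving channels can be checked in isolation. The sketch below is not part of the PubSub class: whoReceives and the channel name are made up for illustration, and the 100 ms wait is an arbitrary delay assumed to be long enough for delivery within a single context. It opens two channels with the same name, posts from one of them, and records which instances receive the message.

```typescript
// Hypothetical demo: resolves with the labels of the instances that got the message.
function whoReceives(name: string): Promise<string[]> {
  return new Promise((resolve) => {
    const sender = new BroadcastChannel(name);
    const receiver = new BroadcastChannel(name);
    const received: string[] = [];

    sender.onmessage = () => {
      received.push('sender');
    };
    receiver.onmessage = () => {
      received.push('receiver');
    };

    sender.postMessage('ping');

    // Wait briefly for delivery, then close both channels so that
    // runtimes such as Node.js can exit.
    setTimeout(() => {
      sender.close();
      receiver.close();
      resolve(received);
    }, 100);
  });
}

whoReceives('self-delivery-demo').then((received) => console.log(received));
```

Only the receiver should show up in the logged array, since the posting instance is skipped. That is why PubSub, which posts on #tx and listens on #rx, also notifies listeners in the context that called broadcast().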
Usage in Different Environments
1. Node.js / Bun Cluster with Typed Messages
Let's improve the example by using a discriminated union type to represent different message types. This adds a layer of type-safety and clarity to how messages are handled.
import cluster from 'node:cluster';

type Message =
  | { type: 'user-connected', userId: number }
  | { type: 'user-disconnected' }
  | { type: 'message', content: string, userId: number };

const pubsub = new PubSub<Message>('my-channel');

// Broadcast messages across processes
if (cluster.isPrimary) {
  setInterval(() => {
    pubsub.broadcast({ type: 'user-connected', userId: 1 });
    pubsub.broadcast({ type: 'message', content: 'Hello, world!', userId: 1 });
  }, 1000);
} else {
  // Worker process: Listen for messages
  (async () => {
    for await (const message of pubsub) {
      switch (message.type) {
        case 'user-connected':
          console.log(`User connected: ${message.userId}`);
          break;
        case 'user-disconnected':
          console.log('User disconnected');
          break;
        case 'message':
          console.log(`User ${message.userId} says: ${message.content}`);
          break;
      }
    }
  })();
}
This ensures that every message is type-safe and you can handle different message types using TypeScript’s type system.
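One way to make that guarantee stricter is an exhaustiveness check, so that adding a new variant to the union without handling it becomes a compile error. The sketch below repeats the Message type from this section; assertNever and describeMessage are hypothetical names introduced only for this illustration, not part of the PubSub class.

```typescript
type Message =
  | { type: 'user-connected'; userId: number }
  | { type: 'user-disconnected' }
  | { type: 'message'; content: string; userId: number };

// Hypothetical helper: it only type-checks if every variant was handled
// before reaching it, because the remaining type must be `never`.
function assertNever(value: never): never {
  throw new Error(`Unhandled message: ${JSON.stringify(value)}`);
}

function describeMessage(message: Message): string {
  switch (message.type) {
    case 'user-connected':
      return `User connected: ${message.userId}`;
    case 'user-disconnected':
      return 'User disconnected';
    case 'message':
      return `User ${message.userId} says: ${message.content}`;
    default:
      // If a new variant is added to Message and not handled above,
      // this line stops compiling.
      return assertNever(message);
  }
}

console.log(describeMessage({ type: 'message', content: 'Hello, world!', userId: 1 }));
```

The same default branch can be added to the switch inside the for await...of loop, at the cost of a thrown error at runtime if an unknown message ever arrives from an older or newer peer.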
2. Deno Workers with --parallel and Typed Events
type Message =
  | { type: 'job-started', jobId: string }
  | { type: 'job-completed', jobId: string };

const pubsub = new PubSub<Message>('deno-channel');

if (Deno.args[0] === 'worker') {
  // Worker process: Listen for messages
  (async () => {
    for await (const message of pubsub) {
      switch (message.type) {
        case 'job-started':
          console.log(`Job ${message.jobId} started`);
          break;
        case 'job-completed':
          console.log(`Job ${message.jobId} completed`);
          break;
      }
    }
  })();
} else {
  // Main process: Broadcast messages to workers
  setInterval(() => {
    pubsub.broadcast({ type: 'job-started', jobId: '12345' });
    setTimeout(() => pubsub.broadcast({ type: 'job-completed', jobId: '12345' }), 3000);
  }, 5000);
}
3. Cross-Tab Communication in the Browser
Here’s an example where you can send and receive typed chat messages across multiple browser tabs.
type Message =
  | { type: 'chat-message', content: string, userId: number }
  | { type: 'user-joined', userId: number }
  | { type: 'user-left', userId: number };

const pubsub = new PubSub<Message>('chat-channel');

// Send a message when a button is clicked
document.getElementById('sendBtn')?.addEventListener('click', () => {
  const userId = 42;
  pubsub.broadcast({ type: 'chat-message', content: 'Hello from this tab!', userId });
});

// Listen for messages from any tab, including this one
(async () => {
  for await (const message of pubsub) {
    switch (message.type) {
      case 'chat-message':
        console.log(`User ${message.userId} says: ${message.content}`);
        break;
      case 'user-joined':
        console.log(`User ${message.userId} joined the chat`);
        break;
      case 'user-left':
        console.log(`User ${message.userId} left the chat`);
        break;
    }
  }
})();
Now, whenever a user sends a message from one tab, all other tabs will receive and display it, and you can also notify when users join or leave the chat.
Why This is Useful
This PubSub system is especially helpful when building distributed systems or multi-window browser apps where you need a seamless way to share data without relying on external servers. Here’s why you might use it:
- Cross-process: Share data between Node.js processes, Deno workers, or Bun worker threads.
- Cross-tab in browsers: Synchronize state or events between browser tabs (e.g., messaging apps, shared state).
- Type-safe messaging: The use of TypeScript’s type system allows for safe and predictable message handling, ensuring each message is handled correctly.
Conclusion
The PubSub class provides a simple yet powerful solution for broadcasting events across multiple contexts, whether that's processes in Node.js, worker threads in Deno, or browser tabs. By utilizing the BroadcastChannel API and TypeScript’s discriminated union types, it can deliver cross-process or cross-tab event streaming with full type-safety, making it a versatile tool in both server-side and client-side environments.
With this in place, you can build more sophisticated, distributed systems or simply synchronize events across tabs or processes with minimal effort while enjoying all the benefits of TypeScript’s type system.