DEV Community

Red John

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

Introduction

In modern web development, we often deal with events, whether that means handling incoming WebSocket messages, server-sent events (SSE), or data streams from services like Redis Pub/Sub. Node.js is event-driven, and its built-in events.on() can turn a single EventEmitter event into an async iterator, but there is no general-purpose, out-of-the-box way to consume an arbitrary event source, with its own subscribe and cleanup steps, in a for await...of loop.

In this post, I'll walk you through a simple yet powerful way to create an asynchronous event iterator using TypeScript and AsyncGenerator. This approach is designed to allow you to consume events from any kind of event emitter in a clean and predictable way, with full control over cancellation and cleanup logic.

The Use Case: Redis Pub/Sub

In one of my recent projects, I needed to listen to Redis Pub/Sub channels and dispatch server-sent events (SSE) asynchronously to connected clients. The challenge was handling incoming events without overwhelming the system while allowing the consumer to cancel the event stream at any time.

The solution? An event iterator that converts any event emitter (such as Redis Pub/Sub) into an asynchronous iterable. This allows us to process events in a controlled manner and gracefully handle cancellation when necessary.

Let’s dive into the implementation.

The Code

export type Context<T> = {
    emit: (value: T) => void;
    cancel: () => void;
};

export type CleanupFn = () => void | Promise<void>;

export type Subscriber<T> = (
    context: Context<T>,
) => void | CleanupFn | Promise<CleanupFn | void>;

export async function* createEventIterator<T>(
    subscriber: Subscriber<T>,
): AsyncGenerator<T> {
    const events: T[] = [];
    let cancelled = false;

    // Resolver for the promise the loop awaits while the queue is empty (null when nothing is waiting)
    let resolveNext: (() => void) | null = null;

    const emit = (event: T) => {
        events.push(event);
        // If we are awaiting for a new event, resolve the promise
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

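    const cancel = () => {
        cancelled = true;
        // Wake the loop if it is waiting on an empty queue, so it can exit
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };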

    const unsubscribe = await subscriber({ emit, cancel });

    try {
        while (!cancelled) {
            // If there are events in the queue, yield the next event
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                // Wait for the next event
                await new Promise<void>((resolve) => {
                    resolveNext = resolve;
                });
            }
        }

        // Process any remaining events that were emitted before cancellation.
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}

How It Works

This function accepts a subscriber function that you can hook into any event emitter or pub/sub system. The subscriber provides two essential methods:

  1. emit: Allows the subscriber to push new events into the iterator.
  2. cancel: Provides a way to signal that the iteration should stop.

The function returns an AsyncGenerator<T>, allowing you to iterate over events using a for await...of loop.

Breaking Down the Code

  1. Context Object:
    The Context<T> type provides an interface to emit new events or cancel the subscription. The subscriber uses this context to control the flow of events.

  2. Event Queue:
    The events: T[] array serves as a buffer for emitted events, which the generator yields one by one in arrival order. If the queue is empty, the generator waits for the next event to be emitted. The buffer is unbounded, so a source that emits faster than the consumer iterates will keep growing it.

  3. Emit Logic:
    The emit function adds new events to the queue and resolves any pending promise (i.e., if the generator is waiting for new events).

  4. Cancellation:
    If the cancel function is called, it sets a flag (cancelled = true) to signal that the loop should exit. Any remaining events in the queue will still be processed before the generator completes.

  5. Cleanup:
    The generator invokes the unsubscribe function (if provided) in a finally block, so cleanup runs after cancellation, when the consumer breaks out of the for await...of loop, or when the consumer's loop body throws. This is especially important for unsubscribing from external systems like Redis or cleaning up resources.
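
To see these pieces working together, a timer can stand in for a real event source. The sketch below is a condensed copy of createEventIterator with one assumed adjustment: cancel also wakes a waiting loop, so a cancel that arrives while the queue is empty still ends the iteration. The runTickerDemo function and its timings are hypothetical and exist only for illustration.

```typescript
type Context<T> = { emit: (value: T) => void; cancel: () => void };
type CleanupFn = () => void | Promise<void>;
type Subscriber<T> = (context: Context<T>) => void | CleanupFn | Promise<CleanupFn | void>;

// Condensed copy of createEventIterator; cancel() also wakes a waiting loop (assumed adjustment)
async function* createEventIterator<T>(subscriber: Subscriber<T>): AsyncGenerator<T> {
    const events: T[] = [];
    let cancelled = false;
    let resolveNext: (() => void) | null = null;

    const wake = () => {
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };
    const emit = (event: T) => {
        events.push(event);
        wake();
    };
    const cancel = () => {
        cancelled = true;
        wake();
    };

    const unsubscribe = await subscriber({ emit, cancel });
    try {
        while (!cancelled) {
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                await new Promise<void>((resolve) => {
                    resolveNext = resolve;
                });
            }
        }
        // Drain events that were queued before cancellation
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}

// Hypothetical demo: emit three ticks, then cancel from inside the subscriber
async function runTickerDemo(): Promise<{ received: number[]; cleanedUp: boolean }> {
    const received: number[] = [];
    let cleanedUp = false;

    const ticks = createEventIterator<number>(({ emit, cancel }) => {
        let count = 0;
        const timer = setInterval(() => {
            count += 1;
            emit(count);
            if (count === 3) cancel();
        }, 10);
        // Cleanup function: stop the timer and record that cleanup ran
        return () => {
            clearInterval(timer);
            cleanedUp = true;
        };
    });

    for await (const tick of ticks) {
        received.push(tick);
    }
    return { received, cleanedUp };
}
```

The third tick is emitted just before cancel is called, so it is still delivered by the drain loop before the cleanup function stops the timer.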

Example: Listening to Redis Pub/Sub

Let’s see how we can use this event iterator to listen to Redis Pub/Sub and asynchronously iterate over the incoming messages.

import Redis from 'ioredis';

function redisEventIterator(channel: string) {
    const client = new Redis();

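    return createEventIterator<string>(async ({ emit }) => {
        // Only forward messages from the channel we asked for
        const messageHandler = (receivedChannel: string, message: string) => {
            if (receivedChannel === channel) emit(message);
        };

        // Register the listener first, then wait for the subscription to be confirmed
        client.on('message', messageHandler);
        await client.subscribe(channel);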

        // Cleanup function to unsubscribe and disconnect
        return async () => {
            client.off('message', messageHandler);
            await client.unsubscribe(channel);
            await client.quit();
        };
    });
}

// Usage
(async () => {
    for await (const message of redisEventIterator('my-channel')) {
        console.log('New message:', message);

        // You can cancel the event stream if needed
        if (message === 'STOP') {
            break;
        }
    }
})();

In this example, we use createEventIterator to subscribe to a Redis Pub/Sub channel and asynchronously iterate over the messages. Each time a new message arrives, it is emitted into the generator, where we can process it in real-time. If a specific message (e.g., "STOP") is received, we break the loop and unsubscribe from Redis.
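
Breaking out of the loop triggers the cleanup because for await...of calls return() on the generator when the loop exits early, and that runs the generator's finally block. The sketch below isolates this behaviour with a hypothetical countForever generator standing in for the event iterator; no Redis connection is involved.

```typescript
// Hypothetical stand-in for an event iterator: yields forever, cleans up in finally
async function* countForever(log: string[]): AsyncGenerator<number> {
    let n = 0;
    try {
        while (true) {
            yield n++;
        }
    } finally {
        // Runs when the consumer breaks, because for await...of calls return() on the generator
        log.push('cleanup');
    }
}

// Consume values until `limit` is seen, recording the order of events in a log
async function consumeUntil(limit: number): Promise<string[]> {
    const log: string[] = [];
    for await (const value of countForever(log)) {
        log.push(`value ${value}`);
        if (value === limit) break;
    }
    log.push('after loop');
    return log;
}
```

Calling consumeUntil(1) records the cleanup entry before the code after the loop runs, which is the same ordering the Redis example relies on to unsubscribe and quit.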

Example: Using EventEmitter

Here's how you can use createEventIterator with Node.js's EventEmitter:

import { EventEmitter } from 'events';

function eventEmitterIterator(emitter: EventEmitter, eventName: string) {
    return createEventIterator<string>(({ emit, cancel }) => {
        const eventHandler = (data: string) => emit(data);

        emitter.on(eventName, eventHandler);

        // Cleanup function to remove the listener
        return () => {
            emitter.off(eventName, eventHandler);
        };
    });
}

// Usage
(async () => {
    const emitter = new EventEmitter();

    // Simulate event emissions
    setTimeout(() => emitter.emit('data', 'First event'), 1000);
    setTimeout(() => emitter.emit('data', 'Second event'), 2000);
    setTimeout(() => emitter.emit('data', 'STOP'), 3000);

    for await (const event of eventEmitterIterator(emitter, 'data')) {
        console.log('Received event:', event);

        if (event === 'STOP') {
            break;
        }
    }
})();

In this example:

  • We use EventEmitter to emit events, which are captured by createEventIterator.
  • The iterator listens for the 'data' event and processes it asynchronously.
  • Similar to the Redis example, we can stop the iteration when a specific event ('STOP') is received.
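
For comparison, when the source is a plain EventEmitter, Node.js's built-in events.on() gives a similar loop without a custom iterator. The sketch below assumes a Node.js runtime; collectUntilStop and runBuiltInDemo are hypothetical names. Unlike createEventIterator, events.on() yields the listener arguments as an array and has no hook for custom subscribe or cleanup logic.

```typescript
import { EventEmitter, on } from 'node:events';

// Collect events until a 'STOP' payload arrives, using Node's built-in async iterator
async function collectUntilStop(emitter: EventEmitter, eventName: string): Promise<string[]> {
    const received: string[] = [];
    // on() yields the listener arguments as an array for each emitted event
    for await (const [data] of on(emitter, eventName)) {
        received.push(data as string);
        if (data === 'STOP') break; // breaking ends the iterator and removes its listeners
    }
    return received;
}

async function runBuiltInDemo(): Promise<string[]> {
    const emitter = new EventEmitter();

    // on() attaches its listener as soon as it is called, so these later emissions are captured
    setTimeout(() => emitter.emit('data', 'First event'), 10);
    setTimeout(() => emitter.emit('data', 'Second event'), 20);
    setTimeout(() => emitter.emit('data', 'STOP'), 30);

    return collectUntilStop(emitter, 'data');
}
```

The custom iterator remains the better fit for sources such as Redis, where subscribing and unsubscribing are asynchronous steps of their own.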

Benefits of This Approach

  • Asynchronous Control: By leveraging the AsyncGenerator, we can handle events asynchronously and process them at our own pace; events that arrive while the consumer is busy wait in the queue until the next iteration.

  • Cancellation: The ability to cancel the event stream at any time makes this approach flexible, especially in real-world scenarios where connections may need to be closed gracefully.

  • General-Purpose: This iterator can be used for any event emitter or Pub/Sub system, making it versatile for different applications.
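
One caveat on processing events at our own pace: the events array in createEventIterator grows without limit when the source emits faster than the consumer iterates. If dropping stale events is acceptable, the array could be swapped for a capped buffer. The BoundedBuffer class below is a hypothetical drop-oldest sketch, not part of the original implementation; other policies (dropping the newest event, or cancelling the stream) may suit some systems better.

```typescript
// Hypothetical drop-oldest buffer; not part of the original createEventIterator
class BoundedBuffer<T> {
    private items: T[] = [];
    private dropped = 0;
    private readonly capacity: number;

    constructor(capacity: number) {
        if (capacity < 1) throw new RangeError('capacity must be at least 1');
        this.capacity = capacity;
    }

    // Add an item, discarding the oldest one when the buffer is full
    push(item: T): void {
        if (this.items.length === this.capacity) {
            this.items.shift();
            this.dropped += 1;
        }
        this.items.push(item);
    }

    // Remove and return the oldest item, or undefined when empty
    shift(): T | undefined {
        return this.items.shift();
    }

    get length(): number {
        return this.items.length;
    }

    // Number of items discarded so far because the buffer was full
    get droppedCount(): number {
        return this.dropped;
    }
}
```

Because it exposes the same push, shift, and length members the generator uses on the array, it could stand in for events with only the declaration changed, and droppedCount gives a simple signal that the consumer is falling behind.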

Conclusion

Event-driven architectures are a cornerstone of many modern web applications, but they can become tricky to manage when we need to control the flow of events asynchronously. With the power of AsyncGenerator in TypeScript, you can build elegant solutions like this event iterator, making your event-handling code cleaner and easier to maintain.

I hope this post helps you get started with async iterators for your own event emitters. If you have any questions or thoughts, feel free to share them in the comments!

Top comments (5)

Wouter Leistra • Edited

I tried this, and it abstracts things nicely away with the Redis PubSub client. I do however notice that for every single Redis message the for await (const message of ...) yields twice... Any idea why that happens?

Red John

That's weird. Let me try to replicate and come back to you.

Wouter Leistra • Edited

Sorry, it was my bad: I forgot I had two browser windows open subscribing to the events, so obviously it was emitted twice. One thing I was wondering about is the subscribe with the event handlers... We are using the AsyncIterator with Redis on a tRPC subscription and it would subscribe more often than I think it should. We moved the subscribe call outside the subscription procedure (outside the for await ... of ...) to subscribe only once. Not sure about the implications, but it felt unnecessary to keep subscribing to the same channel over and over. We also had issues getting it to work reliably with the two awaits in the return callback. I'd love to hear your thoughts on why those are needed. Thanks so much for your post. It was truly helpful for getting Redis PubSub hooked up without the need for a separate EventEmitter.

Red John • Edited

Maybe I can share how I’m using it on my end. I have a Deno application set up with Hono, and I’m utilizing Deno’s --parallel flag to run it across all available cores. This app scrapes several sources every minute or so and updates a local SQLite database. If the local value changes, I emit an event using a custom pub/sub class, which I covered in this post: Building a Distributed Pub/Sub Class with Cross-Process and Cross-Tab Capabilities.

I also have an endpoint, /realtime, that emits server-sent events (SSE). Whenever a client connects, a new for await (const message of pubsub) loop is created—essentially one subscription per "socket" connection. While I used to rely on Redis for this, I switched to a custom in-memory solution since everything runs on a single VPS node.

Here’s a simplified version of my real-time endpoint:

app.get('/realtime', (c) => {
  return streamSSE(c, async (stream) => {
    for await (const message of pubsub) {
      // Breaking here will remove the subscription from pubsub
      if (c.req.raw.signal.aborted) break;
      await stream.writeSSE({ event: message.type, data: JSON.stringify(message.payload) });
    }
  });
});

To clarify, the custom pub/sub class broadcasts events across workers or processes since it uses the BroadcastChannel internally. So, if an event is emitted on worker 1 and a subscription exists on worker 3, the message will be received seamlessly across workers.

Let me know if you find my use case helpful! If not, feel free to reach out again, and we can try to come up with a better solution. Cheers!

Tanmay Patil

Informative, will try this out, thanks!