
Emmanuel Odongo

Posted on • Updated on • Originally published at odongo.pl

Testing Async Generators

Event Emitters and Async Generators

I recently came across a situation where I needed to stream real-time updates from server to client. After some research, I opted not to go with the de facto solution of WebSockets, and instead went with the equally well-supported approach of Server-Sent Events (SSE).

SSE is a one-directional communication channel with an impressively simple browser API:

// establish connection
const eventSource = new EventSource(url);

// listen and handle events
eventSource.addEventListener(eventName, eventHandler);

// close connection
eventSource.close();

If the connection is interrupted without explicitly being closed by the client, the browser will automatically attempt to reestablish the connection.

On the server side, I used the Fastify SSE Plugin, which supports using an event emitter to handle the firing of events.

Here's a simplified version of a GET /rates endpoint used to subscribe to receive exchange rates:

import fastify from "fastify";
import { FastifySSEPlugin } from "fastify-sse-v2";
import { EventEmitter, on } from "events";

const eventEmitter = new EventEmitter();
const server = fastify();
server.register(FastifySSEPlugin);

server.get("/rates", (_request, reply) => {
  reply.sse(
    (async function* () {
      for await (const [payload] of on(eventEmitter, "ratesUpdated")) {
        yield {
          data: JSON.stringify(payload),
          event: "update",
        };
      }
    })()
  );
});

The async generator – async function* () – iterates over the events fired by the event emitter (using the on function) and turns each one into an SSE message with a data and an event field.
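
To make the shape of those yielded objects more concrete, here is a rough sketch of how a message with data and event fields maps onto the SSE wire format. The serializeSseMessage helper and the SseMessage type are hypothetical names used for illustration only; this is not the plugin's actual implementation.

```typescript
// Hypothetical type and helper, for illustration only.
interface SseMessage {
  data: string;
  event?: string;
}

function serializeSseMessage({ data, event }: SseMessage): string {
  const lines: string[] = [];

  if (event !== undefined) {
    lines.push(`event: ${event}`);
  }

  // A payload that spans several lines is sent as one "data:" line per line.
  for (const line of data.split("\n")) {
    lines.push(`data: ${line}`);
  }

  // A blank line marks the end of the message.
  return lines.join("\n") + "\n\n";
}

const wireMessage = serializeSseMessage({
  data: JSON.stringify({ from: "USD", to: "EUR", rate: 0.94 }),
  event: "update",
});
// wireMessage === 'event: update\ndata: {"from":"USD","to":"EUR","rate":0.94}\n\n'
```

The event field is what the browser matches against the event name passed to eventSource.addEventListener.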

It's a good idea to use an abort controller to clean up when the connection drops. Here's what the code now looks like:

import fastify from "fastify";
import { FastifySSEPlugin } from "fastify-sse-v2";
import { EventEmitter, on } from "events";

const eventEmitter = new EventEmitter();
const server = fastify();
server.register(FastifySSEPlugin);

server.get("/rates", (request, reply) => {
  const abortController = new AbortController();

  request.socket.on("close", () => {
    abortController.abort();
  });

  reply.sse(
    (async function* () {
      for await (const [payload] of on(eventEmitter, "ratesUpdated", { signal: abortController.signal })) {
        yield {
          data: JSON.stringify(payload),
          event: "update",
        };
      }
    })()
  );
});
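
As a minimal sketch of what the abort signal does here (assuming a recent Node version where AbortController is a global), the snippet below aborts while an iterator created by on is still waiting for an event. The demonstrateAbort function is a hypothetical name used for illustration.

```typescript
import { EventEmitter, on } from "events";

// Hypothetical helper: aborts while an iterator is waiting for an event.
async function demonstrateAbort(): Promise<{
  errorName: string;
  listenersLeft: number;
}> {
  const eventEmitter = new EventEmitter();
  const abortController = new AbortController();
  const iterator = on(eventEmitter, "ratesUpdated", {
    signal: abortController.signal,
  });

  // Start waiting for an event that is never emitted.
  const pending = iterator.next();

  // Stands in for the socket's "close" handler in the endpoint.
  abortController.abort();

  let errorName = "";
  try {
    await pending;
  } catch (error) {
    errorName = (error as Error).name;
  }

  return {
    errorName,
    listenersLeft: eventEmitter.listenerCount("ratesUpdated"),
  };
}

demonstrateAbort().then(({ errorName, listenersLeft }) => {
  console.log(errorName, listenersLeft);
});
```

The pending next() call rejects with an AbortError, and the listener that on registered is removed from the emitter, so nothing keeps listening on behalf of a client that has gone away.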

We can extract the async generator into a reusable and testable unit:

import { EventEmitter, on } from "events";
import { EventMessage } from "fastify-sse-v2";

interface Params {
  abortController: AbortController;
  eventEmitter: EventEmitter;
  eventName: string;
}

function makeEventListenerGenerator({
  abortController,
  eventEmitter,
  eventName,
}: Params) {
  return async function* (): AsyncGenerator<EventMessage> {
    for await (const [data] of on(
      eventEmitter,
      eventName,
      { signal: abortController.signal }
    )) {
      yield {
        data: JSON.stringify(data),
        event: "update",
      };
    }
  };
}

This function can then be used in the GET /rates handler as follows:

reply.sse(
  makeEventListenerGenerator({
    abortController,
    eventEmitter,
    eventName: "ratesUpdated",
  })()
);

Writing the Test

Before we can test our makeEventListenerGenerator function, it's important to understand that it returns an async generator function. Calling this function returns an async iterator: an object that can generate a sequence of values asynchronously.
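
One property of async generators matters for the test that follows: calling the generator function does not run any of its body. The body only starts on the first call to next(). A small sketch, with demonstrateLaziness and countToTwo as hypothetical names:

```typescript
// Hypothetical helper: records the order in which things happen.
async function demonstrateLaziness(): Promise<string[]> {
  const steps: string[] = [];

  async function* countToTwo(): AsyncGenerator<number> {
    steps.push("body started");
    yield 1;
    yield 2;
  }

  // Creating the iterator does not run any of the generator body.
  const iterator = countToTwo();
  steps.push("iterator created");

  // The first next() call runs the body up to the first yield.
  const first = await iterator.next();
  steps.push(`received ${first.value}`);

  return steps;
}

demonstrateLaziness().then((steps) => console.log(steps));
// logs ["iterator created", "body started", "received 1"]
```

For makeEventListenerGenerator, this means the event listener is not registered until next() is called on the returned iterator.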

The on function, which we imported from Node's events module, plays a role similar to the browser's addEventListener method: it subscribes to events fired by the event emitter. Instead of taking a callback, though, it returns an async iterator that yields the arguments of each emitted event as an array.

Firing events is done using the event emitter's emit method.

Here's the whole flow of publishing and consuming events:

import { EventEmitter, on } from "events";

const eventEmitter = new EventEmitter();
const iterator = on(eventEmitter, "ping");

eventEmitter.emit("ping", { key: "value" });

await iterator.next(); // => { value: [{ key: "value" }], done: false }
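
Timing matters here. As a small sketch (demonstrateBuffering is a hypothetical name), the snippet below shows that an event emitted before on is called is dropped, while events emitted afterwards are queued until the iterator asks for them.

```typescript
import { EventEmitter, on } from "events";

// Hypothetical helper: shows which emitted events reach the iterator.
async function demonstrateBuffering(): Promise<unknown[]> {
  const eventEmitter = new EventEmitter();

  // Nobody is listening yet, so this event is dropped.
  eventEmitter.emit("ping", "too early");

  const iterator = on(eventEmitter, "ping");

  // These two are queued until next() is called.
  eventEmitter.emit("ping", "first");
  eventEmitter.emit("ping", "second");

  const a = await iterator.next();
  const b = await iterator.next();

  // Each value is the array of arguments passed to emit.
  return [a.value, b.value];
}

demonstrateBuffering().then((values) => console.log(values));
// logs [["first"], ["second"]]
```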

Armed with this knowledge, we can now unit test the makeEventListenerGenerator function:

import { EventEmitter } from "events";
import { describe, expect, test } from "vitest";

import { makeEventListenerGenerator } from "./makeEventListenerGenerator";

describe("makeEventListenerGenerator", () => {
  test("iterates over emitted events", () => {
    const abortController = new AbortController();
    const eventEmitter = new EventEmitter();
    const eventName = "ratesUpdated";
    const eventPayload = [{ from: "USD", to: "EUR", rate: 0.94 }];

    const eventIterator = makeEventListenerGenerator({
      abortController,
      eventEmitter,
      eventName,
    })();

    (async () => {
      expect(await eventIterator.next()).toHaveProperty("value", {
        data: JSON.stringify(eventPayload),
        event: "update",
      });
    })();

    eventEmitter.emit(eventName, eventPayload);
  });
});

With that, our unit test is complete and we can give ourselves a pat on the back. But before I close off, there is one final key point that I feel needs to be covered.

Typically, unit tests take the form: arrange -> act -> assert. If we read the test we just wrote from top to bottom, it seems like we are doing arrange -> act -> assert -> act. What gives?

The last part of our test that runs is not the eventEmitter.emit(...) line, but rather our assertion: expect(...).toHaveProperty(...). The expression to the right of the await keyword – eventIterator.next() – is evaluated immediately, which starts the generator and registers the event listener. The await then suspends the async function, and the rest of it (the assertion) is only scheduled on the Microtask Queue once the awaited promise settles. In the meantime, the main thread continues executing to the end, emitting the event along the way.

The two code snippets below should help clarify this:

console.log("top");
(() => {
  console.log("middle");
})();
console.log("bottom");

// logs "top", "middle", "bottom"

With an async function and await, the order changes:

console.log("top");
(async () => {
  console.log(await "middle");
})();
console.log("bottom");

// logs "top", "bottom", "middle"
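
One detail in the second snippet is easy to overlook: the expression to the right of await is still evaluated synchronously, and only the code that depends on its result is deferred. A small sketch, with runSnippet and loadMiddle as hypothetical names:

```typescript
// Hypothetical helper: records the order of execution around an await.
function runSnippet(): Promise<string[]> {
  const log: string[] = [];

  function loadMiddle(): Promise<string> {
    log.push("loadMiddle called");
    return Promise.resolve("middle");
  }

  const done = (async () => {
    // loadMiddle() runs right away; pushing its result is deferred.
    log.push(await loadMiddle());
  })();

  log.push("bottom");

  return done.then(() => log);
}

runSnippet().then((log) => console.log(log));
// logs ["loadMiddle called", "bottom", "middle"]
```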

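Great care needs to be taken not to be caught unawares by this behaviour. Because the test callback is synchronous and the promise returned by the async function is never awaited, the test runner has no way of knowing that assertions are still pending. In the test below, the event is emitted before next() is called, so no listener is registered yet and the event is lost; next() never resolves and neither assertion ever runs. The test passes even though the assertions are clearly wrong: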

import { EventEmitter } from "events";
import { describe, expect, test } from "vitest";

import { makeEventListenerGenerator } from "./makeEventListenerGenerator";

describe("makeEventListenerGenerator", () => {
  test("iterates over emitted events", () => {
    const abortController = new AbortController();
    const eventEmitter = new EventEmitter();
    const eventName = "ratesUpdated";
    const eventPayload = [{ from: "USD", to: "EUR", rate: 0.94 }];

    const eventIterator = makeEventListenerGenerator({
      abortController,
      eventEmitter,
      eventName,
    })();

    eventEmitter.emit(eventName, eventPayload);

    (async () => {
      expect(await eventIterator.next()).toHaveProperty("value", "false positive");
      expect(false).toBe(true);
    })();
  });
});
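
One way to avoid this kind of false positive is to make the test asynchronous and await the promise returned by next() before asserting. The sketch below is an assumption about how that could look, not the original test: it uses Node's built-in assert module instead of Vitest so that it runs on its own, repeats makeEventListenerGenerator from earlier, and declares a local EventMessage interface as a stand-in for the type exported by fastify-sse-v2.

```typescript
import { deepStrictEqual } from "assert";
import { EventEmitter, on } from "events";

// Local stand-in for the EventMessage type from fastify-sse-v2.
interface EventMessage {
  data?: string;
  event?: string;
}

interface Params {
  abortController: AbortController;
  eventEmitter: EventEmitter;
  eventName: string;
}

function makeEventListenerGenerator({
  abortController,
  eventEmitter,
  eventName,
}: Params) {
  return async function* (): AsyncGenerator<EventMessage> {
    for await (const [data] of on(eventEmitter, eventName, {
      signal: abortController.signal,
    })) {
      yield { data: JSON.stringify(data), event: "update" };
    }
  };
}

// Hypothetical standalone version of the "iterates over emitted events" test.
async function iteratesOverEmittedEvents(): Promise<IteratorResult<EventMessage>> {
  const abortController = new AbortController();
  const eventEmitter = new EventEmitter();
  const eventName = "ratesUpdated";
  const eventPayload = [{ from: "USD", to: "EUR", rate: 0.94 }];

  const eventIterator = makeEventListenerGenerator({
    abortController,
    eventEmitter,
    eventName,
  })();

  // Calling next() starts the generator, which registers the listener.
  const nextMessage = eventIterator.next();

  // The listener exists now, so this event is not lost.
  eventEmitter.emit(eventName, eventPayload);

  // Awaiting here means a failed assertion rejects the returned promise.
  const result = await nextMessage;
  deepStrictEqual(result.value, {
    data: JSON.stringify(eventPayload),
    event: "update",
  });

  return result;
}

iteratesOverEmittedEvents().then(() => console.log("assertion ran and passed"));
```

In Vitest, the same idea would mean passing an async function to test and awaiting eventIterator.next() inside it, so that a failing assertion rejects the promise the test runner is waiting on.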
