Gabriel Anhaia

The Observer Pattern in TypeScript: When You Don't Need RxJS


You pick up a ticket. The product spec is one line: "search field should debounce keystrokes for 300ms, then fetch results, and cancel the old request if the user keeps typing." Twenty minutes later, the PR diff is +1 import and a pipe(debounceTime(300), switchMap(fetch)). The new dependency is RxJS.

It is the right tool for some shapes of problem, and it is the wrong tool for the shape of problem most engineers reach for it to solve. The team will carry RxJS for the next six months because nobody wants to be the person who removes it after one component used switchMap. Meanwhile the actual reactive surface area of the app is a search box, a websocket toast notifier, and a "user clicked logout, tear down the side panel" event.

That is a six-line problem with a multi-month dependency strapped to it. The skill is reading which of the two you have in front of you.

The spectrum runs from native EventEmitter for the trivial case, through a typed event bus for the common case and DOM EventTarget in the browser, to the three places where RxJS or Effect Stream genuinely earn their weight.

The Native EventEmitter, Typed Properly

Node ships an event emitter in core (node:events). Bun and Deno expose the same API. The default ergonomics are loose because the API predates TypeScript by a decade, but the modern type signature uses generics over a string-keyed event map and the result is solid.

import { EventEmitter } from "node:events";

type CartEvents = {
  added: [{ sku: string; qty: number }];
  removed: [{ sku: string }];
  cleared: [];
};

class CartBus extends EventEmitter<CartEvents> {}

const bus = new CartBus();

bus.on("added", (payload) => {
  // payload is { sku: string; qty: number }
  console.log(`+${payload.qty} ${payload.sku}`);
});

bus.emit("added", { sku: "abc", qty: 2 });
bus.emit("added", { sku: "abc" }); // type error: missing qty
bus.on("typo", () => {});           // type error: not in CartEvents

The generic parameter on EventEmitter is a record where each key is an event name and each value is the tuple of arguments emitted with it. The runtime API was always this shape; the generic landed in @types/node (DefinitelyTyped) mid-2024, so any modern Node, Bun, or Deno project on a recent @types/node picks it up.

A few things this gives you for free:

  • emit("added", ...) checks the payload tuple at the call site.
  • on("added", listener) infers the listener parameter type without a manual annotation.
  • off, once, and removeAllListeners use the same map.
  • EventEmitter already handles "10 listeners is suspicious" warnings, async iteration via events.on(emitter, "event"), and one-shot via events.once(emitter, "event") returning a promise.
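
On a project pinned to an older @types/node that predates the generic, a thin wrapper can recover most of the same checks. The sketch below is an assumption-laden illustration, not part of node:events: TypedEmitter and EventMap are hypothetical names, and the wrapper only covers on and emit.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical helper types: not part of node:events.
type EventMap = Record<string, unknown[]>;

class TypedEmitter<M extends EventMap> {
  private readonly inner = new EventEmitter();

  // Returns an unsubscribe function instead of requiring a paired off() call.
  on<K extends keyof M & string>(
    eventName: K,
    listener: (...args: M[K]) => void,
  ): () => void {
    const untyped = listener as (...args: any[]) => void;
    this.inner.on(eventName, untyped);
    return () => {
      this.inner.off(eventName, untyped);
    };
  }

  // The argument tuple is checked against the event map at the call site.
  emit<K extends keyof M & string>(eventName: K, ...args: M[K]): boolean {
    return this.inner.emit(eventName, ...args);
  }
}

type CartEvents = {
  added: [{ sku: string; qty: number }];
  cleared: [];
};

const cart = new TypedEmitter<CartEvents>();
const addedSkus: string[] = [];

const stopListening = cart.on("added", (item) => {
  addedSkus.push(`${item.qty}x ${item.sku}`);
});

cart.emit("added", { sku: "abc", qty: 2 });
stopListening();
cart.emit("added", { sku: "def", qty: 1 }); // no listener left, nothing recorded
```

The cast to an untyped listener happens once, inside the wrapper, so call sites never see any.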

For the search debounce problem, if you want to stay native:

import { EventEmitter } from "node:events";

type SearchEvents = { query: [string]; results: [unknown[]] };

const bus = new EventEmitter<SearchEvents>();
let timer: ReturnType<typeof setTimeout> | undefined;
let inflight: AbortController | undefined;

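bus.on("query", (q) => {
  // Every keystroke restarts the debounce window and abandons the old request.
  clearTimeout(timer);
  inflight?.abort();
  timer = setTimeout(async () => {
    inflight = new AbortController();
    try {
      // Encode the query so characters like & and # survive the URL.
      const res = await fetch(`/search?q=${encodeURIComponent(q)}`, {
        signal: inflight.signal,
      });
      bus.emit("results", await res.json());
    } catch (e) {
      // An aborted fetch rejects with an AbortError; that one is expected.
      if ((e as Error).name !== "AbortError") throw e;
    }
  }, 300);
});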

That is the whole thing: debounce, abort-on-new-input, typed events, zero dependencies. It is the shape RxJS gets imported to solve. RxJS does it more elegantly. It does not do it cheaper.

A Typed Event Bus With Discriminated-Union Events

EventEmitter is fine when events feel like channels. Sometimes you want a single channel where the payload itself names the event. That is the discriminated-union event bus, and it composes better with a domain model that already uses unions.

type DomainEvent =
  | { type: "user.signed-up"; userId: string; email: string }
  | { type: "user.deleted"; userId: string }
  | { type: "order.placed"; orderId: string; total: number }
  | { type: "order.refunded"; orderId: string; cents: number };

type Listener<E> = (event: E) => void | Promise<void>;

class Bus<E extends { type: string }> {
  private listeners = new Map<E["type"], Set<Listener<any>>>();

  on<T extends E["type"]>(type: T, fn: Listener<Extract<E, { type: T }>>) {
    let set = this.listeners.get(type);
    if (!set) {
      set = new Set();
      this.listeners.set(type, set);
    }
    set.add(fn as Listener<any>);
    return () => set!.delete(fn as Listener<any>);
  }

  async emit(event: E): Promise<void> {
    const set = this.listeners.get(event.type as E["type"]);
    if (!set) return;
    await Promise.all([...set].map((fn) => fn(event)));
  }
}

const bus = new Bus<DomainEvent>();

const off = bus.on("order.placed", (e) => {
  // e is { type: "order.placed"; orderId: string; total: number }
  console.log(`order ${e.orderId} for ${e.total}`);
});

await bus.emit({
  type: "order.placed",
  orderId: "o-1",
  total: 4200,
});

off(); // unsubscribe

What this buys you over the native emitter:

  • Events are first-class values you can log, persist, and replay. The bus is just one consumer.
  • Extract<E, { type: T }> narrows the listener to exactly the variant that matches the type literal. Adding a new variant to DomainEvent automatically makes a new event name available to on with the right payload.
  • emit takes the whole event object, so the call site reads like the domain ("an order was placed with these details") rather than like an RPC ("call the orderPlaced function with these args").
  • on returns the unsubscribe function. No paired off calls, no leaked listeners on component unmount.

The trade is one Set allocation per event type and a slightly less idiomatic-Node feel. For a domain layer that already speaks in events, the trade is worth it.

If you also want exhaustiveness checking on the consumer side, switch on event.type and end with a default branch that calls assertNever(event). tsc will then reject any switch that forgets a variant.
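
A minimal sketch of that consumer, assuming the same DomainEvent union as above. assertNever and summarize are illustrative names, not library functions.

```typescript
type DomainEvent =
  | { type: "user.signed-up"; userId: string; email: string }
  | { type: "user.deleted"; userId: string }
  | { type: "order.placed"; orderId: string; total: number }
  | { type: "order.refunded"; orderId: string; cents: number };

// Only reachable at runtime if an event bypassed the type checker.
function assertNever(value: never): never {
  throw new Error(`Unhandled event: ${JSON.stringify(value)}`);
}

function summarize(domainEvent: DomainEvent): string {
  switch (domainEvent.type) {
    case "user.signed-up":
      return `signed up: ${domainEvent.email}`;
    case "user.deleted":
      return `deleted: ${domainEvent.userId}`;
    case "order.placed":
      return `placed: ${domainEvent.orderId} (${domainEvent.total})`;
    case "order.refunded":
      return `refunded: ${domainEvent.orderId} (${domainEvent.cents})`;
    default:
      // If a new variant is added to DomainEvent and not handled above,
      // domainEvent is no longer `never` here and this call stops compiling.
      return assertNever(domainEvent);
  }
}

const summary = summarize({ type: "order.placed", orderId: "o-1", total: 4200 });
```

Delete any one case and the default branch stops type-checking, which is the whole point: the union is the single source of truth for which events exist.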

The Browser: EventTarget, AbortSignal, and No Imports

In the browser you do not need node:events at all. The platform ships EventTarget, the same primitive window, document, every DOM node, AbortController, and WebSocket are built on. Using it directly skips a polyfill and matches the way the rest of the app already listens to events.

type Detail = {
  "search:query": { q: string };
  "search:results": { items: unknown[] };
};

class TypedTarget<M extends Record<string, unknown>> extends EventTarget {
  emit<K extends keyof M & string>(type: K, detail: M[K]) {
    this.dispatchEvent(new CustomEvent(type, { detail }));
  }

  on<K extends keyof M & string>(
    type: K,
    fn: (detail: M[K]) => void,
    opts?: AddEventListenerOptions,
  ) {
    const handler = (e: Event) => fn((e as CustomEvent<M[K]>).detail);
    this.addEventListener(type, handler, opts);
    return () => this.removeEventListener(type, handler, opts);
  }
}

const search = new TypedTarget<Detail>();

const ctrl = new AbortController();
search.on("search:results", (d) => render(d.items), { signal: ctrl.signal });

// later, on unmount:
ctrl.abort();

Two things matter here:

  1. AddEventListenerOptions accepts an AbortSignal. The platform handles teardown for you. One controller can cancel every listener attached to it, which solves the React-strict-mode and component-unmount cleanup problem without bookkeeping. RxJS calls this "subscription management" and ships an entire Subscription class for it. The browser ships it as a single signal option on addEventListener.
  2. CustomEvent<T> is a typed envelope. The wrapper above turns the dispatch + listener pair into something tsc can check, while keeping the underlying event machinery DOM-native. That matters in three places: DevTools shows the events, breakpoints work, and dispatchEvent integrates with whatever else is listening to the target.
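
The first point is easy to see in isolation. The sketch below uses a bare EventTarget and plain Event objects; the names panel, lifetime, and received are made up for the example.

```typescript
const panel = new EventTarget();
const lifetime = new AbortController();
const received: string[] = [];

// Both listeners share one signal, so a single abort() detaches both.
panel.addEventListener("opened", () => received.push("opened"), {
  signal: lifetime.signal,
});
panel.addEventListener("closed", () => received.push("closed"), {
  signal: lifetime.signal,
});

panel.dispatchEvent(new Event("opened"));
panel.dispatchEvent(new Event("closed"));

lifetime.abort(); // for example, on component unmount

panel.dispatchEvent(new Event("opened")); // ignored: the listener is gone
```

No listener references are stored anywhere. The controller is the only handle the component has to keep.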

For the search-debounce problem in the browser, combine EventTarget with AbortController and you have not imported a single line of JS. The tab is faster and the bundle is smaller.

The Three Cases Where RxJS Earns Its Keep

RxJS is not a bad library. It is a precisely targeted library that gets aimed at the wrong target. Three shapes of problem make the dependency worth carrying:

Complex stream composition with declarative operators. When you have three sources that need to merge, where each emits at a different rate, and you need to throttle one, debounce another, combine them with the latest of a third, and replay the last value to late subscribers, you are writing RxJS whether you import it or not. The difference is whether you write it correctly.

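import {
  combineLatest,
  fromEvent,
  debounceTime,
  map,
  startWith,
  switchMap,
  shareReplay,
} from "rxjs";

// input, filterSelect, sortSelect, and fetchResults come from the surrounding app.
// combineLatest stays silent until every source has emitted once, so each
// source is seeded with the control's current value.
const query$ = fromEvent(input, "input").pipe(
  debounceTime(300),
  map(() => input.value),
  startWith(input.value),
);
const filter$ = fromEvent(filterSelect, "change").pipe(
  map(() => filterSelect.value),
  startWith(filterSelect.value),
);
const sort$ = fromEvent(sortSelect, "change").pipe(
  map(() => sortSelect.value),
  startWith(sortSelect.value),
);

const results$ = combineLatest([query$, filter$, sort$]).pipe(
  switchMap(([q, f, s]) => fetchResults(q, f, s)),
  shareReplay(1),
);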

combineLatest + switchMap + shareReplay is an idiom RxJS expresses in three lines. Hand-rolling it on top of EventEmitter is a hundred lines of state-machine bookkeeping that will have a bug. If your dashboard has six filter dropdowns and a search box and a date range and a tab selector and they all feed one results table, RxJS is the right tool.

Backpressure. When a producer can outpace a consumer (a websocket firing 100 messages a second into a UI that renders at most 60 frames a second), you need an explicit policy: "drop", "buffer", "throttle", or "sample". RxJS encodes those policies as operators (auditTime, sampleTime, bufferCount, throttleTime, exhaustMap). Without an operator vocabulary, every backpressure decision becomes a custom timer-and-flag dance per call site, and the second one diverges from the first.
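
To make the hand-rolled version concrete, here is a sketch of a single "sample" policy without a library. LatestValueSampler is a hypothetical helper, and flush is called by hand here; in a browser it would presumably be driven by requestAnimationFrame or a timer.

```typescript
// Hypothetical helper: keeps only the newest value between flushes.
class LatestValueSampler<T> {
  private pending: { value: T } | undefined;
  private dropped = 0;
  private readonly render: (value: T) => void;

  constructor(render: (value: T) => void) {
    this.render = render;
  }

  // Called by the fast producer, e.g. a websocket message handler.
  push(value: T): void {
    if (this.pending) this.dropped += 1; // the older value is overwritten
    this.pending = { value };
  }

  // Called by the slow consumer, e.g. once per animation frame.
  flush(): void {
    if (!this.pending) return;
    const { value } = this.pending;
    this.pending = undefined;
    this.render(value);
  }

  get droppedCount(): number {
    return this.dropped;
  }
}

const rendered: number[] = [];
const sampler = new LatestValueSampler<number>((price) => {
  rendered.push(price);
});

for (let tick = 1; tick <= 100; tick += 1) sampler.push(tick); // burst of 100 messages
sampler.flush(); // one frame: only the newest value is rendered
sampler.flush(); // nothing pending, nothing rendered
```

This is one policy at one call site. A second call site that needs "buffer" or "throttle" instead gets its own class, and that is where the divergence starts.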

Cancellation graphs. The "switchMap" semantic is one node: start a new request, abandon the in-flight one. The graph version is harder. A parent stream owns three child streams, and unsubscribing the parent must cleanly tear down all three plus their in-flight HTTP requests plus the websocket they opened. RxJS subscriptions form a tree, and calling .unsubscribe() on the root cascades. Building this on top of AbortController is doable; building it correctly the first time is not.
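
The "doable" half can be sketched in a few lines. childOf is a hypothetical helper that links a child controller to a parent signal; it covers the cascade only and says nothing about waiting for cleanup to finish, teardown ordering, or errors thrown during teardown, which is where the hand-rolled version tends to go wrong.

```typescript
// Hypothetical helper: derives a controller that aborts when its parent signal aborts.
function childOf(parentSignal: AbortSignal): AbortController {
  const child = new AbortController();
  if (parentSignal.aborted) {
    child.abort(parentSignal.reason);
    return child;
  }
  const cascade = () => child.abort(parentSignal.reason);
  parentSignal.addEventListener("abort", cascade, { once: true });
  // If the child is aborted on its own, detach from the parent so a
  // long-lived parent does not accumulate listeners.
  child.signal.addEventListener(
    "abort",
    () => parentSignal.removeEventListener("abort", cascade),
    { once: true },
  );
  return child;
}

const page = new AbortController();
const searchPanel = childOf(page.signal);
const searchRequest = childOf(searchPanel.signal); // its signal would go to fetch()
const toastSocket = childOf(page.signal);

searchPanel.abort(); // tears down the panel and its in-flight request only
const afterPanelClose = [
  searchRequest.signal.aborted,
  toastSocket.signal.aborted,
];

page.abort(); // tears down everything that is still alive
const afterPageClose = toastSocket.signal.aborted;
```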

If your problem fits one of those three, the bundle is paying for itself. If not, the bundle is rent. Tree-shaking can keep the actual cost modest when your build is set up for it, but a CommonJS pipeline or a sideEffects: true declaration in package.json defeats it silently, so check the analyzer output rather than trust the marketing number.

The Three Cases Where Effect Stream Wins Over RxJS

If you are reaching for RxJS because of the three reasons above, Effect is worth a look before you commit. effect/Stream covers the same ground as RxJS observables and adds three things RxJS structurally cannot give you.

Typed errors. RxJS observables are Observable<T>. The error channel is untyped: error callbacks and catchError receive the error as any. If a stream can fail with three different error shapes, you find out at the subscribe call site by inspecting the value. Effect's Stream<A, E, R> parametrises the error type. A Stream<User, NetworkError | ParseError, never> tells you, at the type level, exactly which failures you must handle, and Stream.catchTag narrows by tag the way a switch narrows over a discriminated union. Require never in the error position where the stream is consumed, and the compiler enforces that you handled every failure mode.

import { Stream } from "effect";

// User and fetchUsersIter are assumed to be defined elsewhere in the app.

class NetworkError {
  readonly _tag = "NetworkError";
  constructor(readonly cause: unknown) {}
}
class ParseError {
  readonly _tag = "ParseError";
  constructor(readonly raw: string) {}
}

const users: Stream.Stream<User, NetworkError | ParseError, never> =
  Stream.fromAsyncIterable(fetchUsersIter(), (e) => new NetworkError(e));

const safe = users.pipe(
  Stream.catchTag("ParseError", (e) => Stream.empty),
  // NetworkError is still on the channel; the type system knows.
);
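
The type-level trick behind that narrowing can be sketched without the library. This is not Effect's implementation: Result, catchTag, and parseNames below are hypothetical stand-ins that only show how handling one tag removes it from the error union.

```typescript
type Result<A, E> = { ok: true; value: A } | { ok: false; error: E };

class NetworkError {
  readonly _tag = "NetworkError";
  readonly cause: unknown;
  constructor(cause: unknown) {
    this.cause = cause;
  }
}

class ParseError {
  readonly _tag = "ParseError";
  readonly raw: string;
  constructor(raw: string) {
    this.raw = raw;
  }
}

// Handling tag K removes that variant from the error type of the result.
function catchTag<A, E extends { _tag: string }, K extends E["_tag"], B>(
  result: Result<A, E>,
  tag: K,
  recover: (error: Extract<E, { _tag: K }>) => B,
): Result<A | B, Exclude<E, { _tag: K }>> {
  if (result.ok) return result;
  if (result.error._tag === tag) {
    const handled = result.error as Extract<E, { _tag: K }>;
    return { ok: true, value: recover(handled) };
  }
  return { ok: false, error: result.error as Exclude<E, { _tag: K }> };
}

// Hypothetical producer whose signature admits both failure modes.
function parseNames(raw: string): Result<string[], NetworkError | ParseError> {
  try {
    return { ok: true, value: JSON.parse(raw) as string[] };
  } catch {
    return { ok: false, error: new ParseError(raw) };
  }
}

// The static type is Result<string[], NetworkError>: ParseError is gone.
const recovered = catchTag(parseNames("not json"), "ParseError", () => [] as string[]);
```

Extract picks the variant the handler receives and Exclude removes it from what is left, the same pair of conditional types that drives the discriminated-union bus earlier in the post.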

Structured concurrency. Effect runs every fiber in a parent-child tree where cancelling the parent guarantees children are cancelled and finalisers run. Stream.runDrain and Stream.run integrate into that tree. RxJS subscriptions are a tree too, but the tree is by convention. Operator implementations have to remember to register their teardown. Effect's tree is enforced by the runtime. That difference matters once you have streams that own resources (DB connections, file handles, websocket sessions) that absolutely must be released.

Tracing, contexts, and dependency injection. Streams in Effect carry an R parameter for required services. A Stream<Order, NetworkError, Database | Logger> cannot be run until you provide a Database and a Logger. The runtime ships a tracer (OpenTelemetry-compatible) that automatically threads spans through stream operators, so a refactor from synchronous code to a stream pipeline keeps your traces intact. RxJS has no equivalent. Tracing observables means manually wrapping every operator.

The cost is the learning curve. Effect is a substantial library with its own mental model: fibers, layers, schedules, refinements. If your team is already working in functional-TypeScript style (using Result types, avoiding exceptions for control flow, thinking in terms of effects), Effect Stream lands in familiar territory. If they are not, RxJS is the easier sell.

Reading the Shape of the Problem

The question is not "is RxJS good." It is "is the problem in front of me actually a stream-composition problem, or is it a single-channel notification with debounce." Most reactive needs in a typical web or backend app are the second. They look like the first because RxJS examples on the internet are uniformly "hello world is a combineLatest of three input streams." That marketing is not the median use case.

The skill is recognising the single-channel case when it is in front of you and not reaching for the multi-source library out of habit. The dev who installed RxJS for the search box did not write a worse program. They wrote a heavier program. On the day a junior asks "why is RxJS in this app, what does it do," the answer ought to be more interesting than "we use it for one debounced fetch."

For the next reactive feature you build, write the EventEmitter version first. If the second feature also fits EventEmitter, keep going. The day a feature genuinely needs combineLatest over three sources with shareReplay and a typed error channel, you will know. That is the day to import the library.


If this was useful

The Observer pattern's TypeScript story is mostly about types: how EventEmitter accepts a record of event-to-payload tuples, how discriminated-union events compose with Extract<E, { type: T }>, how Stream<A, E, R> parametrises errors at the type level. The TypeScript Type System is where that machinery gets unpacked from the ground up: generics over event maps, conditional types, branded types, infer. If you found the typed-bus pattern useful and want to write your own typed message frameworks, the book is the path.

If you are coming from Kotlin or Java where Flow and Flux cover this ground, Kotlin and Java to TypeScript maps the patterns across. If you are coming from PHP 8+ where reactive code is rarer, PHP to TypeScript covers the async paradigm shift in detail. If you are shipping production TypeScript and need to decide between EventTarget-only and a streams library across a monorepo, TypeScript in Production covers those build and library-authoring concerns.

The five-book set:

  • TypeScript Essentials — From Working Developer to Confident TS, Across Node, Bun, Deno, and the Browser — entry point: amazon.com/dp/B0GZB7QRW3
  • The TypeScript Type System — From Generics to DSL-Level Types — deep dive: amazon.com/dp/B0GZB86QYW
  • Kotlin and Java to TypeScript — A Bridge for JVM Developers — bridge for JVM devs: amazon.com/dp/B0GZB2333H
  • PHP to TypeScript — A Bridge for Modern PHP 8+ Developers — bridge for PHP devs: amazon.com/dp/B0GZBD5HMF
  • TypeScript in Production — Tooling, Build, and Library Authoring Across Runtimes — production layer: amazon.com/dp/B0GZB7F471

All five books ship in ebook, paperback, and hardcover.

The TypeScript Library — the 5-book collection
