DEV Community

Cover image for **JavaScript Reactive Programming: 8 Essential Techniques That Transform Async Code Management**
Nithin Bharadwaj
Nithin Bharadwaj

Posted on

**JavaScript Reactive Programming: 8 Essential Techniques That Transform Async Code Management**

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!

Let me tell you about a shift in thinking that changed how I write JavaScript. For years, I wrestled with asynchronous code—callbacks nesting inside callbacks, promises that needed careful chaining, and events that fired from everywhere. It felt like herding cats. Then I started working with observables, and things began to make sense. Reactive programming isn't just another library; it's a way of structuring your entire approach to data that changes over time.

Think of an observable as a pipe. At one end, a source of data—maybe user clicks, incoming WebSocket messages, or data from an API—pushes values into it. At the other end, you, the developer, can listen. But the magic happens in the pipe itself. You can tell the pipe to filter, reshape, combine, or delay the data flowing through it before it ever reaches your hand. You declare what you want to happen to the stream of events, not how to painstakingly manage each one. This is the core of reactive programming.

Let's build this understanding from the ground up. We'll explore eight practical techniques that let you use this powerful pattern in your own projects.

1. Creating the Stream: From Anything to an Observable

The first step is turning your data sources into observable streams. Anything that produces values over time can be wrapped. A button click, a promise from a fetch() call, a simple array, or a timer—all can become observable sequences.

Here’s a basic blueprint for an Observable class. Its constructor takes a function, often called the "subscriber function." This function defines the stream: it's where you put the logic to produce values and, crucially, to clean up when you're done listening.

class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe;
  }

  subscribe(observer) {
    const subscription = {
      closed: false,
      unsubscribe: () => {
        subscription.closed = true;
        if (this._unsubscribe) {
          this._unsubscribe();
        }
      }
    };

    const safeObserver = {
      next: (value) => {
        if (!subscription.closed && observer.next) {
          observer.next(value);
        }
      },
      error: (err) => {
        if (!subscription.closed && observer.error) {
          observer.error(err);
        }
        subscription.unsubscribe();
      },
      complete: () => {
        if (!subscription.closed && observer.complete) {
          observer.complete();
        }
        subscription.unsubscribe();
      }
    };

    this._unsubscribe = this._subscribe(safeObserver);
    return subscription;
  }
}
Enter fullscreen mode Exit fullscreen mode

Now, let's create streams from common sources. The fromEvent method turns a DOM event into an endless stream of event objects. Notice how the function we pass returns a cleanup function—removeEventListener. This ensures we stop listening to the DOM when the subscription ends, preventing memory leaks.

static fromEvent(element, eventName) {
  return new Observable((observer) => {
    const handler = (event) => observer.next(event);
    element.addEventListener(eventName, handler);

    // The cleanup function
    return () => {
      element.removeEventListener(eventName, handler);
    };
  });
}

// Using it
const button = document.getElementById('myButton');
const clicks$ = Observable.fromEvent(button, 'click');

const subscription = clicks$.subscribe({
  next: (event) => console.log('Clicked!', event.clientX)
});

// Later, to stop listening
subscription.unsubscribe();
Enter fullscreen mode Exit fullscreen mode

We can also wrap promises and intervals. The fromPromise stream will emit the resolved value and then complete, or emit an error. The interval method creates a stream that emits an incrementing number every specified period until you unsubscribe.

static fromPromise(promise) {
  return new Observable((observer) => {
    promise
      .then(value => {
        observer.next(value);
        observer.complete(); // Promise resolves only once
      })
      .catch(error => observer.error(error));
    return () => { /* Option to cancel promise */ };
  });
}

static interval(period) {
  return new Observable((observer) => {
    let counter = 0;
    const intervalId = setInterval(() => {
      observer.next(counter++);
    }, period);
    return () => clearInterval(intervalId); // Cleanup
  });
}

// A stream that ticks every second
const seconds$ = Observable.interval(1000);
Enter fullscreen mode Exit fullscreen mode

2. Shaping the Data Flow with Operators

Creating a stream is just the beginning. Operators are the tools that let you mold, filter, and control the flow of data. They are pure functions that take one observable as input and return a new, transformed observable as output. This lets you chain them together declaratively.

The map operator is like Array.prototype.map for streams. It transforms every value that passes through. The filter operator only lets values through that meet a condition.

Observable.prototype.map = function(project) {
  // 'this' is the source observable
  return new Observable((observer) => {
    const subscription = this.subscribe({
      next: (value) => observer.next(project(value)), // Transform here
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
    return () => subscription.unsubscribe();
  });
};

Observable.prototype.filter = function(predicate) {
  return new Observable((observer) => {
    const subscription = this.subscribe({
      next: (value) => {
        if (predicate(value)) { // Condition check
          observer.next(value);
        }
      },
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
    return () => subscription.unsubscribe();
  });
};

// Usage: Get only the even numbers from a ticking clock
const seconds$ = Observable.interval(1000);
const evenSeconds$ = seconds$
  .map(count => count * 2) // Transform count
  .filter(num => num % 2 === 0); // Filter result

evenSeconds$.subscribe({ next: val => console.log(`Even second: ${val}`) });
Enter fullscreen mode Exit fullscreen mode

Time-based operators are incredibly useful for handling user input. debounceTime is like a "wait for silence" operator. Imagine a search box: you don't want to call the API on every keystroke. debounceTime waits for a pause in emissions before passing along the latest value.

Observable.prototype.debounceTime = function(delay) {
  return new Observable((observer) => {
    let timeoutId;
    let lastValue;
    let hasValue = false;

    const subscription = this.subscribe({
      next: (value) => {
        lastValue = value;
        hasValue = true;
        clearTimeout(timeoutId); // Reset the timer on new value
        timeoutId = setTimeout(() => {
          if (hasValue) {
            observer.next(lastValue); // Emit only after the pause
            hasValue = false;
          }
        }, delay);
      },
      error: (err) => observer.error(err),
      complete: () => {
        clearTimeout(timeoutId);
        if (hasValue) {
          observer.next(lastValue); // Don't forget the last buffered value
        }
        observer.complete();
      }
    });
    return () => {
      clearTimeout(timeoutId);
      subscription.unsubscribe();
    };
  });
};
Enter fullscreen mode Exit fullscreen mode

3. Combining Multiple Streams

Real applications rarely have just one stream. You often need to combine them. The merge operator combines multiple streams into one, emitting values from whichever source produces them. It's like listening to several radio stations at once on a single speaker.

Observable.prototype.merge = function(other) {
  return new Observable((observer) => {
    let completedA = false;
    let completedB = false;

    const subscriptionA = this.subscribe({
      next: (value) => observer.next(value),
      error: (err) => observer.error(err),
      complete: () => {
        completedA = true;
        if (completedB) observer.complete();
      }
    });

    const subscriptionB = other.subscribe({
      next: (value) => observer.next(value),
      error: (err) => observer.error(err),
      complete: () => {
        completedB = true;
        if (completedA) observer.complete();
      }
    });

    return () => {
      subscriptionA.unsubscribe();
      subscriptionB.unsubscribe();
    };
  });
};

// Example: Combine clicks and keypresses
const clicks$ = Observable.fromEvent(document, 'click');
const keys$ = Observable.fromEvent(document, 'keypress');
const allEvents$ = clicks$.merge(keys$);
Enter fullscreen mode Exit fullscreen mode

combineLatest is different. It waits until all input observables have emitted at least one value, then emits a combined array every time any of the sources emits a new value. It's perfect for situations like a form, where you want to enable a "Submit" button only when all fields are valid, and recalculate on every change.

Observable.prototype.combineLatest = function(other) {
  return new Observable((observer) => {
    let lastA, hasA = false;
    let lastB, hasB = false;

    const emitIfReady = () => {
      if (hasA && hasB) {
        observer.next([lastA, lastB]); // Emit the latest pair
      }
    };

    const subA = this.subscribe({
      next: (value) => { lastA = value; hasA = true; emitIfReady(); },
      error: (err) => observer.error(err),
      complete: () => { if (hasB) observer.complete(); }
    });

    const subB = other.subscribe({
      next: (value) => { lastB = value; hasB = true; emitIfReady(); },
      error: (err) => observer.error(err),
      complete: () => { if (hasA) observer.complete(); }
    });

    return () => { subA.unsubscribe(); subB.unsubscribe(); };
  });
};
Enter fullscreen mode Exit fullscreen mode

4. The Special Bridge: Subjects

A Subject is a unique kind of observable. It acts as both an observer and an observable. You can push values into it using its .next(), .error(), and .complete() methods, and multiple subscribers can listen to the stream it produces. It's a centralized event bus.

class Subject extends Observable {
  constructor() {
    super((observer) => {
      this.observers.add(observer); // New subscriber joins the list
      return () => this.observers.delete(observer); // Cleanup on unsubscribe
    });
    this.observers = new Set();
    this.closed = false;
  }

  next(value) {
    if (this.closed) return;
    this.observers.forEach(observer => observer.next(value)); // Broadcast
  }
  error(err) { /* broadcast error and cleanup */ }
  complete() { /* broadcast completion and cleanup */ }
}

const messageBus = new Subject();

// Component A can send messages
messageBus.next('User logged in');

// Components B and C can receive them
const sub1 = messageBus.subscribe({ next: msg => console.log('B got:', msg) });
const sub2 = messageBus.subscribe({ next: msg => console.log('C got:', msg) });
Enter fullscreen mode Exit fullscreen mode

There are useful Subject variants. A BehaviorSubject is like a subject with a memory; it stores the latest value and immediately replays it to any new subscriber. This is ideal for representing state, like the current user or a theme.

class BehaviorSubject extends Subject {
  constructor(initialValue) {
    super();
    this._value = initialValue;
  }
  get value() { return this._value; }
  next(value) {
    this._value = value; // Store it
    super.next(value); // Broadcast it
  }
  subscribe(observer) {
    const subscription = super.subscribe(observer);
    observer.next(this._value); // Replay the current value on subscribe
    return subscription;
  }
}

// Represents application theme
const theme$ = new BehaviorSubject('light');
theme$.subscribe({ next: theme => console.log('Theme is now:', theme) }); // Logs: 'Theme is now: light'
theme$.next('dark'); // Logs: 'Theme is now: dark'

// A new component subscribes later, gets the current theme immediately
theme$.subscribe({ next: theme => console.log('Late subscriber sees:', theme) }); // Logs: 'Late subscriber sees: dark'
Enter fullscreen mode Exit fullscreen mode

A ReplaySubject can remember more than just the last value. You can configure it to buffer a certain number of past emissions and replay the whole buffer to new subscribers. This is useful for caching initial data.

5. Handling the Inevitable: Errors

In asynchronous streams, errors happen. The reactive approach provides structured ways to handle them without breaking the entire subscription chain. The catchError operator lets you intercept an error from the source observable, and switch to a "fallback" observable to continue the stream.

Observable.prototype.catchError = function(handler) {
  return new Observable((observer) => {
    const subscription = this.subscribe({
      next: (value) => observer.next(value),
      error: (err) => { // Error caught here
        const recoveryObservable = handler(err);
        recoveryObservable.subscribe(observer); // Switch to the recovery stream
      },
      complete: () => observer.complete()
    });
    return () => subscription.unsubscribe();
  });
};

// Usage: If the main API fails, fall back to a cached version
const apiData$ = Observable.fromPromise(fetch('/api/data'));
const cachedData$ = Observable.fromPromise(caches.match('/api/data'));

const robustData$ = apiData$.catchError(err => {
  console.warn('API failed, using cache', err);
  return cachedData$;
});
Enter fullscreen mode Exit fullscreen mode

The retry operator offers a different strategy: automatically resubscribing to the source observable if it fails, hoping the next attempt succeeds. You can control how many times it retries.

Observable.prototype.retry = function(maxRetries = 3) {
  return new Observable((observer) => {
    let retries = 0;
    const subscribe = () => {
      this.subscribe({
        next: (value) => observer.next(value),
        error: (err) => {
          retries++;
          if (retries <= maxRetries) {
            subscribe(); // Resubscribe on error
          } else {
            observer.error(err); // Give up after max retries
          }
        },
        complete: () => observer.complete()
      });
    };
    subscribe();
  });
};
Enter fullscreen mode Exit fullscreen mode

6. Controlling Timing with Schedulers

A scheduler controls when a task executes. This is vital for managing performance and testing. Our basic Scheduler allows scheduling work to run after a delay, and it returns a way to cancel it.

class Scheduler {
  constructor(scheduleFn) { this.scheduleFn = scheduleFn; }
  schedule(work, delay = 0, state) {
    return this.scheduleFn(work, delay, state);
  }

  static queue = new Scheduler((work, delay, state) => {
    if (delay <= 0) {
      work(state); // Execute immediately
      return { unsubscribe: () => {} };
    }
    const id = setTimeout(() => work(state), delay); // Use setTimeout for delay
    return { unsubscribe: () => clearTimeout(id) };
  });
}
Enter fullscreen mode Exit fullscreen mode

We can create an animation frame scheduler that aligns work with the browser's repaint cycle, perfect for smooth animations. We can also create an observeOn operator to make an observable emit its notifications on a specific scheduler.

static animationFrame = new Scheduler((work, state) => {
  const frameId = requestAnimationFrame(() => work(state));
  return { unsubscribe: () => cancelAnimationFrame(frameId) };
});

Observable.prototype.observeOn = function(scheduler) {
  return new Observable((observer) => {
    return this.subscribe({
      next: (value) => scheduler.schedule(() => observer.next(value)),
      error: (err) => scheduler.schedule(() => observer.error(err)),
      complete: () => scheduler.schedule(() => observer.complete())
    });
  });
};

// Animate a value stream smoothly
const valueStream$ = Observable.interval(16).observeOn(Scheduler.animationFrame);
Enter fullscreen mode Exit fullscreen mode

7. Sharing Work: Hot vs. Cold Observables

This is a crucial concept. A cold observable starts its work, like making an HTTP request or setting up an event listener, for each subscriber. Two subscribers to a cold interval observable would get two independent counters.

A hot observable shares a single execution among all its subscribers. A Subject is inherently hot. We can turn a cold observable hot using the share operator. It sets up a Subject internally. The first subscriber triggers the source observable. Subsequent subscribers join the existing Subject, sharing the same data. When the last subscriber leaves, it automatically disconnects from the source.

Observable.prototype.share = function() {
  let subject; // The internal, shared Subject
  let refCount = 0;
  let connection; // Subscription to the original cold source

  return new Observable((observer) => {
    refCount++;
    if (!subject) {
      subject = new Subject();
      connection = this.subscribe(subject); // Connect to source ONCE
    }
    const subSubscription = subject.subscribe(observer); // Subscribe to the shared subject

    return () => {
      refCount--;
      subSubscription.unsubscribe();
      if (refCount === 0 && connection) { // Last one out turns off the lights
        connection.unsubscribe();
        subject = null;
        connection = null;
      }
    };
  });
};

// Simulate an expensive API call
const coldApiCall$ = new Observable(observer => {
  console.log('Making expensive API call...');
  setTimeout(() => observer.next('Data'), 1000);
});

const sharedApiCall$ = coldApiCall$.share();

// First subscriber triggers the call
sharedApiCall$.subscribe(data => console.log('Sub1:', data));
// A moment later, second subscriber joins. It does NOT trigger a new call.
setTimeout(() => {
  sharedApiCall$.subscribe(data => console.log('Sub2 gets same data:', data));
}, 500);
Enter fullscreen mode Exit fullscreen mode

8. Testing Asynchronous Streams

Testing observables requires controlling time. Marble testing is a brilliant pattern for this. You describe streams and their expected outcomes using ASCII diagrams, where - is the passage of time, a, b, c are emitted values, | is completion, and # is an error. A special TestScheduler runs this in "virtual time," making asynchronous tests synchronous and deterministic.

const testScheduler = new TestScheduler();

// Create a cold observable from a marble string
// "a-b-c|" means: emit 'a', wait, emit 'b', wait, emit 'c', complete.
const source$ = testScheduler.createColdObservable('a-b-c|', { a: 1, b: 2, c: 3 });
// Apply an operator
const result$ = source$.map(x => x * 10);

// Set up what we expect to happen
const expectedMarbles = 'a-b-c|';
const expectedValues = { a: 10, b: 20, c: 30 };

// Schedule assertions and flush the virtual timeline
testScheduler.schedule(() => {
  // ... assertion logic comparing result$ to expected stream ...
}, 0);

testScheduler.flush(); // Executes all scheduled actions in virtual time
Enter fullscreen mode Exit fullscreen mode

The TestScheduler lets you advance time precisely, see exactly what values are emitted when, and verify complex async logic without real-world delays or flakiness.

Putting it all together, these eight techniques give you a complete toolkit. You start by creating streams from any source. You shape them with operators like map, filter, and debounceTime. You combine them with merge and combineLatest. You manage shared state or events with Subjects. You build resilience with catchError and retry. You control timing with Schedulers, optimize with share, and validate everything with marble testing.

When I started thinking about my application as a collection of dynamic data streams that I could declaratively compose, the complexity of modern front-end and server-side development became much more manageable. The code expresses what it does more clearly, and the patterns for handling async logic become consistent, whether it's a click event or a real-time data feed. It's a powerful way to write JavaScript.

📘 Checkout my latest ebook for free on my channel!

Be sure to like, share, comment, and subscribe to the channel!


101 Books

101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as $4—making quality knowledge accessible to everyone.

Check out our book Golang Clean Code available on Amazon.

Stay tuned for updates and exciting news. When shopping for books, search for Aarav Joshi to find more of our titles. Use the provided link to enjoy special discounts!

Our Creations

Be sure to check out our creations:

Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | Java Elite Dev | Golang Elite Dev | Python Elite Dev | JS Elite Dev | JS Schools


We are on Medium

Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva

Top comments (0)