DEV Community

Cover image for Master Asynchronous JavaScript with RxJS
Calvince Moth for Syncfusion, Inc.

Posted on • Originally published at syncfusion.com on

Master Asynchronous JavaScript with RxJS

TL;DR: RxJS is a powerful JavaScript library for managing asynchronous data streams, simplifying complex operations like event handling and API interactions. It uses observables to represent data streams, operators to transform and manipulate them, and subscriptions to react to emitted values.

In the dynamic landscape of modern JavaScript development, efficiently handling asynchronous operations is paramount. RxJS (reactive extensions for JavaScript) is a powerful library developed to address this challenge, enabling developers to manage asynchronous data streams with elegance and precision.

What is RxJS?

RxJS is a library that lets developers work with asynchronous and event-based programs using observable sequences. At its core lies the concept of reactive programming, a paradigm centered around data streams and the propagation of change. This approach is especially valuable when dealing with user interfaces, where various events like user interactions, data fetching, and app state changes can be treated as flowing streams of data. Instead of directly responding to each event, reactive programming encourages developers to declare how the app should behave when changes occur within these streams.

Core concepts

To grasp the power of RxJS, it is essential to understand its fundamental building blocks:

  • Observables: Observables are the heart of RxJS, representing a source of data that emits values over time. They can be created from various sources, including events, promises, and existing data. Think of an observable as a pipeline through which data flows.
  • Observers: An observer is an object that subscribes to an observable and defines how to react to the emitted values. It acts as a listener, dictating what actions to take when new data arrives.
  • Subscriptions: A subscription represents the connection between an observer and an observable. It’s like a contract that allows the observer to receive values from the observable. When you subscribe to an observable, you start receiving data until you explicitly unsubscribe.
  • Operators: Operators are pure functions that enable the transformation, filtering, and combination of observables. They act as modifiers, shaping and refining the data flowing through the observable stream. They provide a declarative way to manipulate data streams without modifying the original source.

Cold vs. hot observables

Observables in RxJS can be categorized as either cold or hot:

  • Cold observables are created on demand and start emitting values only when subscribed to. Each new subscription triggers a fresh execution of the observable. For example, an observable created from an HTTP request is considered cold because it makes the request only when a subscriber expresses interest.
  • Hot observables exist independently of subscriptions and emit values regardless of whether anyone is listening. They represent an ongoing stream of data that is shared among all subscribers. Examples include mouse events or stock tickers, where the data stream continues regardless of the number of observers.

Let’s illustrate these concepts with simple examples.

Creating an Observable

import { Observable } from "rxjs";

const first5Numbers$ = new Observable((obs) => {
  console.log("hello!");
  for (let i = 0; i < 5; i++) {
    obs.next(i);
  }
  obs.complete();
});

// Logs nothing.
first5Numbers$.subscribe((n) => {
  console.log(n);
});

// Logs "hello!" followed by 0 1 2 3 4.
Enter fullscreen mode Exit fullscreen mode

In this example, first5Numbers$ is a cold observable that emits numbers 0 to 4. The subscribe method attaches an observer to the observable. The next method is used to emit values from the observable. The complete method signals the end of the stream.

Using an operator

import { interval } from "rxjs";
import { take } from "rxjs/operators";

const first5SpacedNumbers$ = interval(1000).pipe(take(5));
Enter fullscreen mode Exit fullscreen mode

Here, we create an observable first5SpacedNumbers$ that emits a value every second. The take operator is used to limit the stream to the first five emissions.

Why use RxJS?

RxJS shines in several scenarios:

  • Handling complex, asynchronous operations: RxJS provides a structured approach to manage intricate asynchronous flows, preventing callback hell and deeply nested promises. Its declarative nature allows you to express complex logic concisely, making your code more readable and maintainable.
  • Real-time applications: With its support for hot observables, RxJS excels in building real-time apps like chat apps, stock tickers, and collaborative editing tools.
  • Event handling: RxJS simplifies the handling of user interactions, DOM events, and other asynchronous events, providing a streamlined way to manage event propagation and responses.

RxJS vs. promises and async/await

While promises and async/await are valuable for handling single asynchronous operations, RxJS is geared toward managing streams of asynchronous events. Here’s a comparison:

  • Promises: Resolve with a single value and are primarily useful for one-time asynchronous tasks.
  • Async/await: Provide a more synchronous-looking syntax for working with promises but still focus on individual asynchronous operations.
  • RxJS: Handles multiple values over time, offering operators to transform, filter, and combine these values. It’s ideal for scenarios where data arrives continuously or in bursts.

Setting up RxJS

Installation

You can install RxJS in your project using npm or yarn:

npm install rxjs
Enter fullscreen mode Exit fullscreen mode

or

yarn add rxjs
Enter fullscreen mode Exit fullscreen mode

Alternatively, you can include RxJS via a CDN link in your HTML file.

<script src="https://unpkg.com/rxjs@7/dist/bundles/rxjs.umd.min.js"></script>
Enter fullscreen mode Exit fullscreen mode

Let’s create a simple observable and subscribe to it.

import { of } from "rxjs";

const myObservable$ = of(1, 2, 3);

myObservable$.subscribe((value) => {
  console.log(value); // Outputs: 1, 2, 3
});
Enter fullscreen mode Exit fullscreen mode

In this example, we use the of operator to create an observable that emits the values 1, 2, and 3.

Operators in RxJS

Operators are the backbone of RxJS, providing a rich vocabulary to manipulate data streams. Here are some categories of operators:

  • Creation operators: Create observables from various sources, like of, from, interval, and fromEvent.
  • Transformation operators: Modify the emitted values, such as map, flatMap, switchMap, and scan.
  • Filtering operators: Selectively emit values based on criteria, like filter, distinctUntilChanged, and take.
  • Combination operators: Merge or combine multiple observables, such as merge, concat, zip, and combineLatest.

Real-world use cases

Let’s explore some real-world examples of key operators:

  • map: Transform the values emitted by an observable. For example, you can use a map to extract specific data from an HTTP response.
import { of } from "rxjs";
import { map } from "rxjs/operators";

const source$ = of({ name: "John", age: 30 });

source$.pipe(map((person) => person.name)).subscribe((name) => {
    console.log(name); // Outputs: "John"
});
Enter fullscreen mode Exit fullscreen mode
  • filter: Emit only values that meet a specific condition. For instance, you can filter a stream of events to process only mouse clicks within a certain area.
import { fromEvent } from "rxjs";
import { filter } from "rxjs/operators";

const clicks$ = fromEvent(document, "click").pipe(
  filter(event => event.clientX > 100)
);
Enter fullscreen mode Exit fullscreen mode
  • merge: Combine multiple observables into a single stream, emitting values from all sources as they arrive. This is useful for handling events from different sources, like user input and server responses.
import { merge } from "rxjs";

const clicks$ = fromEvent(document, "click");
const keypresses$ = fromEvent(document, "keypress");

merge(clicks$, keypresses$).subscribe((event) => {
    console.log(event); // Outputs both click and keypress events
});
Enter fullscreen mode Exit fullscreen mode
  • switchMap: When the source observable emits a value, it subscribes to a new inner observable and cancels the previous inner observable. This is useful for scenarios like API calls triggered by user input, where you only care about the latest request.
import { fromEvent } from "rxjs";
import { switchMap } from "rxjs/operators";

const searchInput = document.getElementById("search") as HTMLInputElement;
const searchTerm$ = fromEvent(searchInput, "input").pipe(
  map((event) => (event.target as HTMLInputElement).value),
  switchMap((term) => fetch(`/api/search?q=${term}`))
);
Enter fullscreen mode Exit fullscreen mode
  • catchError: Handle errors gracefully within an observable stream. It allows you to catch errors, perform actions like logging or retrying, and optionally return a new observable to continue the stream.
import { of } from 'rxjs';
import { catchError, map } from 'rxjs/operators';

const source$ = of(1, 2, 3, 4, 5).pipe(
  map((value) => {
    if (value === 4) {
      throw new Error('Error occurred!');
    }
    return value;
  }),
  catchError((error) => {
    console.error(error);
    return of(0); // Replace the error with 0
  })
);
Enter fullscreen mode Exit fullscreen mode

Error handling in RxJS

RxJS provides robust mechanisms for managing errors within observable streams.

  • retry: If an observable emits an error, the retry operator resubscribes to the source observable, attempting to recover from the error. You can specify the number of retry attempts or apply retry logic based on the error type.
  • catchError: As mentioned earlier, the catchError operator allows you to gracefully handle errors, log them, replace the error with a default value, or even return a new observable to continue the stream.
  • finalize: This operator executes a callback function regardless of whether the observable completes successfully or emits an error. It’s useful for cleanup tasks, like closing resources or resetting state.

Refer to the following code example for error handling in RxJS.

import { of } from "rxjs"
import { retry, catchError, finalize } from "rxjs/operators"

const source$ = of(1, 2, 3, 4, 5).pipe(
    map((value) => {
        if (value === 4) {
            throw new Error("Error occurred!")
        }
        return value
    }),
    retry(2), // Retry twice if an error occurs
    catchError((error) => {
        console.error(error)
        return of(0) // Replace the error with 0
    }),
    finalize(() => console.log("Observable finalized!"))
)
Enter fullscreen mode Exit fullscreen mode

In this example, the observable attempts to retry twice if an error occurs. If all retries fail, the catchError operator handles the error. The finalize operator logs a message when the observable completes or errors.

Practical applications

Let’s see how RxJS can be applied in real-world scenarios:

  • Form validation: RxJS is well-suited for creating reactive forms, where validation occurs in real-time as the user types. You can use observables to monitor input changes, apply validation rules, and provide immediate feedback.
  • API polling: RxJS simplifies the implementation of polling mechanisms. You can use operators like interval and switchMap to periodically fetch data from an API, handling responses and errors gracefully.
  • Real-time chat apps: RxJS is a natural fit for building real-time chat apps. Hot observables can represent the stream of messages, and operators like map and filter can be used to process and display messages.

Tips and best practices

To effectively utilize RxJS in your projects:

  • Decomposition: Break down complex logic into smaller, manageable observables that can be combined using operators.
  • Error handling: Employ catchError and retry to gracefully handle errors and enhance app resilience.
  • Unsubscribing: Prevent memory leaks by unsubscribing from observables when they are no longer needed. Consider using tools like takeUntil or the async pipe in Angular to simplify subscription management.
  • Testing: Leverage RxJS testing utilities, like TestScheduler, to thoroughly test your observable logic.

Common pitfalls

  • Overusing RxJS: While powerful, RxJS can add complexity if used inappropriately. Stick to scenarios where its strengths are truly beneficial.
  • Memory leaks: Neglecting to unsubscribe from observables can lead to memory leaks. Always ensure proper subscription management.

Conclusion

Thanks for reading the blog! RxJS provides a powerful and elegant way to handle asynchronous data streams in JavaScript apps. Its reactive programming model, coupled with a rich set of operators, enables developers to build responsive, scalable, and maintainable apps. By embracing the concepts of observables, observers, and operators, you can unlock the full potential of RxJS and elevate your JavaScript development skills. Its learning curve might seem steep initially, but the rewards in terms of code clarity, maintainability, and efficiency are well worth the effort.

Related Blogs

Top comments (0)