DEV Community

Rebekah van Wyk
Reactive Programming and Observables with RxJS

According to the RxJS docs, RxJS (Reactive Extensions for JavaScript) is a library "for reactive programming using Observables, to make it easier to compose asynchronous or callback-based code". In this article, we'll break down the core concepts behind reactive programming and how RxJS makes it easier to work with asynchronous data.

Reactive Programming

To understand what reactive programming is, we should first understand data streams. A data stream is a flow of potentially infinite data, where the data can arrive at various points in time.

Visually, it might look something like this:

[Figure: visual representation of a data stream]

where the dotted line represents time, and our nodes A, B, and C represent data points arriving at different points in time.

Reactive programming is a paradigm that focuses on data streams and change propagation, meaning that when new data arrives, any part of our system that relies on this data automatically reacts and changes accordingly.

RxJS helps us to achieve this through Observables.

What are Observables?

Put simply, Observables are data producers that represent a data stream. They emit notifications that we can consume.

There are three types of notifications that can be emitted from an Observable:

  • Next notifications deliver a value emitted by the Observable
  • Error notifications indicate that something has gone wrong during the Observable's execution
  • Complete notifications indicate that the Observable has successfully finished its execution

None of these notifications is useful to us, however, unless we have the relevant Observers and Subscriptions in place to consume them.
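
To make the three notification types concrete, here is a minimal sketch of an Observable that emits one value and then fails. The names failing$ and log are made up for illustration, and the subscribe() call it relies on is covered in the next section.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// Hypothetical source for illustration: emits one value, then fails.
const failing$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.error(new Error('Something went wrong'));
  subscriber.next(2); // ignored: the error notification already ended the execution
});

failing$.subscribe({
  next: (value) => log.push(`next: ${value}`),
  error: (err: Error) => log.push(`error: ${err.message}`),
  complete: () => log.push('complete'), // never called in this sketch
});

console.log(log); // ['next: 1', 'error: Something went wrong']
```

Notice that an Observable ends with either an error or a complete notification, never both.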

Observers and Subscriptions

Observers define how each type of notification from an Observable is handled.

A subscription is what actually connects an Observable to an observer. Without a subscription, the Observable would remain idle, and its logic would never run.
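
A small sketch can show this "idle until subscribed" behaviour. The counter executions and the name lazy$ are assumptions made up for this example; the counter only increases when the Observable's logic actually runs.

```typescript
import { Observable } from 'rxjs';

let executions = 0;

// Hypothetical Observable for illustration.
const lazy$ = new Observable<string>((subscriber) => {
  executions++; // runs once per subscription, not when the Observable is created
  subscriber.next('hello');
  subscriber.complete();
});

console.log(executions); // 0: nothing has run yet

lazy$.subscribe((value) => console.log(value)); // logs "hello"
console.log(executions); // 1

lazy$.subscribe((value) => console.log(value)); // logs "hello" again
console.log(executions); // 2
```

Each call to subscribe() starts a fresh, independent execution of the Observable's logic.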

Calling an Observable's subscribe() method creates a subscription and returns a Subscription object; calling unsubscribe() on that object ends the subscription. Something to note is that once an Observable emits an error or complete notification, the subscription ends automatically, and nothing further is delivered to the Observer.
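
As a rough sketch of ending a subscription by hand, the example below subscribes to a stream that would otherwise never finish. The names ticks$ and tornDown, and the timings, are assumptions chosen for illustration. The function returned from the Observable's constructor callback is its teardown logic, which RxJS runs when the subscription ends.

```typescript
import { Observable } from 'rxjs';

let tornDown = false;

// Hypothetical source for illustration: emits a counter every 100 ms, forever.
const ticks$ = new Observable<number>((subscriber) => {
  let count = 0;
  const timerId = setInterval(() => subscriber.next(count++), 100);

  // Teardown logic: runs when the subscription ends.
  return () => {
    clearInterval(timerId);
    tornDown = true;
  };
});

// subscribe() returns a Subscription object.
const subscription = ticks$.subscribe((value) => console.log('Tick:', value));

// Stop listening after roughly 350 ms.
setTimeout(() => {
  subscription.unsubscribe();
  console.log('Closed:', subscription.closed);
}, 350);
```

Without the unsubscribe() call, the timer inside ticks$ would keep running for as long as the program does.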

Overall, the relationship between Observables, Observers, and Subscriptions looks something like this:

[Figure: relationship between Observables, Observers, and Subscriptions]

Why is it important to unsubscribe?

In Angular, components can live for a long time, and a subscription to a long-running stream can even outlive the component that created it. If you forget to unsubscribe from long-running streams, you risk memory leaks, performance issues, and, in the worst-case scenario, an app that crashes.
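
One widely used way to avoid forgetting is to tie a subscription to a "destroy" signal. The sketch below is framework-free and only an illustration: Subject (an Observable we can push values into by hand) and the takeUntil() operator are RxJS features not covered in this article, the names source$ and destroy$ are made up, and pipe() is explained further down. It assumes RxJS 7.2 or later, where operators can be imported directly from 'rxjs'.

```typescript
import { Subject, takeUntil } from 'rxjs';

const received: number[] = [];

// Stand-in for a long-running stream (hypothetical).
const source$ = new Subject<number>();
// Hypothetical "the component is being destroyed" signal.
const destroy$ = new Subject<void>();

// takeUntil() ends the subscription as soon as destroy$ emits.
source$.pipe(takeUntil(destroy$)).subscribe((value) => received.push(value));

source$.next(1);
source$.next(2);

// In an Angular component, this would typically happen in ngOnDestroy().
destroy$.next();
destroy$.complete();

source$.next(3); // not delivered: the subscription has already ended

console.log(received); // [1, 2]
```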

Let's have a look at a very simple code example of an Observable in action.

import { Observable } from 'rxjs';

function execute(): void {
  // create an Observable
  const observable$ = new Observable<string>((subscriber) => {
    subscriber.next('First emission'); // delivered synchronously on subscribe
    setTimeout(() => subscriber.next('Second emission'), 2000);
    setTimeout(() => subscriber.complete(), 3000); // signals that we're done and ends the subscription
    setTimeout(() => subscriber.next('Final emission'), 4000); // never delivered: already completed
  });

  // define an Observer
  const observer = {
    next: (value: string) => console.log('Received:', value),
    error: (err: unknown) => console.error('Error:', err),
    complete: () => console.log('Done'),
  };

  // subscribe to start receiving values
  // pass our Observer to our subscribe function
  observable$.subscribe(observer);
}

In this example, the Observer defines how notifications from the Observable are handled: the relevant messages are logged to the console. When execute() is run, we'd see the following logged to the console:

  • the string "Received: First emission" synchronously
  • the string "Received: Second emission" after 2 seconds
  • the string "Done" after 3 seconds

Then, despite there being another next notification defined, we'd see nothing further in the console due to the complete notification ending the subscription to our Observable. In a more practical example, the number or timing of emissions from our Observable may be unknown, and we'd need to unsubscribe more explicitly. You can learn more about ways to do that in Angular here.

While this example is a valid way to work with Observables, in practice we tend to make use of functions in RxJS called operators to work with Observables more cleanly and easily.

RxJS Operators

As we've just mentioned, RxJS operators are simply functions defined by RxJS that make it easier to work with Observables. There are various types of operators, as listed in the RxJS docs. We would typically use these operators to transform, filter, and combine streams of data. Let's look at two important categories of RxJS operators: creation operators and pipeable operators.

Creation operators

Creation operators create new Observables with pre-defined behaviour, or by joining other Observables. They can be called as standalone functions.

Some popular creation operators are of(), combineLatest(), and merge(). We'll just have a brief look at the of() operator.

of()

The of() operator returns an Observable that simply emits the provided arguments synchronously and then completes.

Code snippet:

import { of } from 'rxjs';
of(1, 2, 3).subscribe(value => console.log(value));

Logs:
1
2
3
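
For comparison, here is a small sketch of the other two creation operators mentioned above, merge() and combineLatest(). The inputs are made up for illustration, and because of() emits synchronously, the ordering shown is specific to these synchronous inputs.

```typescript
import { combineLatest, merge, of } from 'rxjs';

// merge() forwards values from all of its input Observables as they arrive.
const merged: number[] = [];
merge(of(1, 2), of(3, 4)).subscribe((value) => merged.push(value));
console.log(merged); // [1, 2, 3, 4]

// combineLatest() waits until every input has emitted at least once,
// then emits an array of the latest value from each input.
const combined: Array<[number, string]> = [];
combineLatest([of(1, 2), of('a', 'b')]).subscribe((pair) => combined.push(pair));
console.log(combined); // [[2, 'a'], [2, 'b']]
```

In the combineLatest() case, the first input has already emitted both 1 and 2 before the second input emits anything, which is why 1 never appears in the output.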

Pipeable operators

Pipeable operators start with an Observable as input, transform or update the emissions of that Observable in some way, and then return the result as a new Observable. These operators are passed to an Observable's pipe() method.

Let's have a look at a couple of the most commonly used pipeable operators, namely map() and filter().

map()

The map() operator applies a function to each value emitted from the Observable.

Code snippet:

import { map, of } from 'rxjs'; // RxJS 7.2+; on older versions, import map from 'rxjs/operators'

of(1, 2, 3).pipe(
  map(num => num * 10)
).subscribe(value => console.log(value));

Logs:
10
20
30
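
It's worth stressing that pipe() returns a new Observable and leaves the original untouched. A minimal sketch of that, with made-up names source$ and timesTen$, and assuming RxJS 7.2 or later for the import style:

```typescript
import { map, of } from 'rxjs';

const source$ = of(1, 2, 3);
const timesTen$ = source$.pipe(map((num) => num * 10)); // a new Observable

const fromSource: number[] = [];
const fromPiped: number[] = [];

source$.subscribe((value) => fromSource.push(value));
timesTen$.subscribe((value) => fromPiped.push(value));

console.log(fromSource); // [1, 2, 3]
console.log(fromPiped); // [10, 20, 30]
```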

filter()

The filter() operator only passes values that match a specified predicate.

Code snippet:

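import { filter, of } from 'rxjs'; // RxJS 7.2+; on older versions, import filter from 'rxjs/operators'

of(1, 2, 3, 4, 5).pipe(
  filter(num => num % 2 === 0)
).subscribe(value => console.log(value));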

Logs:
2
4

The beauty of the pipe() method is that it allows us to chain operators together to perform powerful operations. For example, we could combine map() and filter() in the following way:

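import { filter, map, of } from 'rxjs'; // RxJS 7.2+; on older versions, import operators from 'rxjs/operators'

of(1, 2, 3, 4, 5).pipe(
  map(num => num * 10),
  filter(num => num > 20)
).subscribe(value => console.log(value));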

Logs:
30
40
50
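
Operators in a pipe() run in the order they are listed, so the order matters. As a quick sketch (the array names are made up, and the imports assume RxJS 7.2 or later), swapping the two operators from the example above changes the result completely:

```typescript
import { filter, map, of } from 'rxjs';

// Same order as the example above: multiply first, then filter.
const mapThenFilter: number[] = [];
of(1, 2, 3, 4, 5).pipe(
  map((num) => num * 10),
  filter((num) => num > 20)
).subscribe((value) => mapThenFilter.push(value));
console.log(mapThenFilter); // [30, 40, 50]

// Swapped order: the filter now sees the original values 1 to 5,
// none of which is greater than 20, so map() never receives anything.
const filterThenMap: number[] = [];
of(1, 2, 3, 4, 5).pipe(
  filter((num) => num > 20),
  map((num) => num * 10)
).subscribe((value) => filterThenMap.push(value));
console.log(filterThenMap); // []
```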

Note: Marble diagrams are a great visual tool to help us understand how each RxJS operator works. RxMarbles is a website with very useful interactive marble diagrams that can be used to help deepen your understanding of various RxJS Operators. I suggest having a look to further your understanding of the above examples.

Real-world applications of Observables

Observables are most valuable in cases where code is asynchronous or event-driven in nature. Examples of such scenarios include:

  • Handling HTTP requests
  • Managing UI events such as clicks, scrolls, and key presses
  • Working with State Management libraries such as NgRx

Observables provide a clean way to handle data as it arrives, making it much easier to manage these kinds of scenarios.

Conclusion

This introduction to RxJS Observables highlighted how they provide a powerful way to handle asynchronous and event-driven data. We discussed how combining Observables with operators enables you to transform, filter, and combine data streams with ease, making reactive programming more intuitive and achievable. I hope this post provided some clarity around what Observables are, how they work, and why they are useful to us. Thanks for reading!
