DEV Community

Cover image for RxJS Tutorial: Observables, Operators, and beyond
Hunter Johnson for Educative

Posted on

RxJS Tutorial: Observables, Operators, and beyond

This article was originally published on The Educative blog.

Reactive programming is an essential part of modern web applications. However, few popular programming languages come equipped with the reactive API by default. RxJS allows you to create reactive programs with JavaScript to better serve your users. RxJS is a library used to create asynchronous programs using observable sequences.

Today, we'll explore an overview of reactive programming and RxJS and walk you through a quick tutorial on how to implement all the fundamental components of RxJS in your apps.

Today, we'll learn:

What is reactive programming?

Almost every online application today generates large amounts of real-time, interactive data. Applications are expected to make changes across the app in response to events and remain fully functional during the process. The reactive paradigm was made to handle these "events" with real-time updates across the program.

Reactive programs are structured around events rather than sequential top-down execution of iterative code. This allows them to respond to a trigger event regardless of when what stage the program is on.

Reactive programming is often combined with functional programming and concurrency to create stable, scalable, and event-driven programs.

Asynchronous vs Synchronous

One of the main concepts in reactive programming is synchronous vs asynchronous data. In short, synchronous data is delivered one at a time, as soon as possible.

Asynchronous data waits for a set event and is then delivered all at once through a "callback". Asynchronous data is more popular in reactive programming because it fits well with the paradigm's event-based approach.

Advantages of reactive programming

The main advantage of reactive programming is that it allows a program to remain responsive to events regardless of the program's current task.

Other advantages include:

  • Highly scalable
  • Clean and readable
  • Easy to add new events or response support
  • Improved user experience due to little downtime

The reactive paradigm can also be combined with other paradigms to form a blend, such as object-oriented reactive programming (OORP) or functional reactive programming (FRP). This blendable quality makes reactive programming a versatile paradigm that can be altered to fit a variety of purposes.

What is RxJS?

The reactive paradigm is available for many languages through reactive extensions, or Rx-libraries. These libraries are downloadable APIs that add support for essential reactive tools like observers and reactive operators. With reactive extensions, developers can convert normally iterative languages like JavaScript, Python, C++, etc., to reactive languages.

RxJS is more specifically a functional reactive programming tool featuring the observer pattern and the iterator pattern. It also includes an adapted form of JavaScript's array functions (reduce, map, etc.) to handle asynchronous events as collections.

JavaScript's Rx library is called RxJS. RxJS has become very popular because it simplifies the JavaScript async implementation. Without extensions, JavaScript's async is difficult to use and underdeveloped. RxJS makes async more achievable with tools built specifically for reactive and asynchronous programming.

Many web application frameworks, like Angular, are based on RxJS structures. As a result, you may have indirectly used RxJS already!

Next, we'll break down the core components of RxJS and show you how to implement them in your own code.

RxJS Observables

Observables are parts of our program that generate data over time. An observable's data is a stream of values that can then be transmitted synchronously or asynchronously.

Consumers can then subscribe to observables to listen to all the data they transmit. Consumers can be subscribed to multiple observables at the same time. This data can then be transformed as it moves through the data pipeline toward the user.

Let's see how to create an observable!

const {Observable} = require('rxjs')

const wrapArrayIntoObservable = arr => {
    return new Observable(subscriber => {
        for(let item of arr) {
            subscriber.next(item);
        }
    });
}
const data = [1, 2, 3, 4, 5];

const observable = wrapArrayIntoObservable(data);

observable.subscribe(val => console.log('Subscriber 1: ' + val));
observable.subscribe(val => console.log('Subscriber 2: ' + val));
Enter fullscreen mode Exit fullscreen mode
// Output:
Subscriber 1: 1
Subscriber 1: 2
Subscriber 1: 3
Subscriber 1: 4
Subscriber 1: 5
Subscriber 2: 1
Subscriber 2: 2
Subscriber 2: 3
Subscriber 2: 4
Subscriber 2: 5
Enter fullscreen mode Exit fullscreen mode

On line 3, we create the wrapArrayIntoObservable() function that takes an array as a parameter and wraps that array into an observable. This function is then passed to the Observable constructor on line 12 and run for each subscriber. Finally, on lines 14 and 15 each subscriber prints the data stream received.

RxJS data pipeline

Data pipelines are a sequential series of transformations all data in a stream passes through before it is presented to the user. These transformations can be applied to all data that passes through, for example, to make the stream more readable to the user.

There can also be optional transformations that only occur under specific events, like filtering transformations. Data still passes through these optional transformations but are simply not applied.

Each transformation can be thought of as a pipe segment that the data flow passes through.

Let's see how we can create a data pipeline that can deliver a raw output and an optional user-friendly output for multiple subscribers:

const { from } = require('rxjs');
const { tap, filter, map } = require('rxjs/operators');

const arrayDataObservable$ = from([1, 2, 3, 4, 5]);

const dataPipeline = arrayDataObservable$.pipe(
    tap(val => console.log('Value passing through the stream: ' + val)),
    filter(val => val > 2),
    map(val => val * 2)
)

const subscribeToBaseObservable = subscriberName => {
    return arrayDataObservable$.subscribe(val => {
        console.log(subscriberName + ' received: ' + val);
    })
}

const subscribeToDataPipeline = subscriberName => {
    return dataPipeline.subscribe(val => {
        console.log(subscriberName + ' received: ' + val);
    })
}

const handleSubscriptionToBaseObservable = () => {
    const subscription1 = subscribeToBaseObservable('Subscriber1');
    const subscription2 = subscribeToBaseObservable('Subscriber2');
}


const handleSubscriptionToDataPipeline = () => {
    const subscription1 = subscribeToDataPipeline('Subscriber1');
    const subscription2 = subscribeToDataPipeline('Subscriber2');
}

// 1. Execute this function first
handleSubscriptionToBaseObservable();

// 2. Execute this function next
//handleSubscriptionToDataPipeline();
Enter fullscreen mode Exit fullscreen mode
//raw output
Subscriber1 received: 1
Subscriber1 received: 2
Subscriber1 received: 3
Subscriber1 received: 4
Subscriber1 received: 5
Subscriber2 received: 1
Subscriber2 received: 2
Subscriber2 received: 3
Subscriber2 received: 4
Subscriber2 received: 5
Enter fullscreen mode Exit fullscreen mode
//filtered output
Value passing through the stream: 1
Value passing through the stream: 2
Value passing through the stream: 3
Subscriber1 received: 6
Value passing through the stream: 4
Subscriber1 received: 8
Value passing through the stream: 5
Subscriber1 received: 10
Value passing through the stream: 1
Value passing through the stream: 2
Value passing through the stream: 3
Subscriber2 received: 6
Value passing through the stream: 4
Subscriber2 received: 8
Value passing through the stream: 5
Subscriber2 received: 10
Enter fullscreen mode Exit fullscreen mode

Through executing the two different functions, you can see how pipelines can be used to deliver the same data in different ways based on the user's subscriptions. Users are also notified through tap that the data has been transformed.

RxJS creational operators

The most common operators used in RxJS data pipelines are creational operators. We'll cover the simple from creational operator used in the previous section and the closely related of operator.

from

The from operator is used to wrap an array, a promise, or an iterable into an Observable. This operator directs the program to an already created data collection, like an array, that is then used to populate the observable values.

Here's an example:

const { from } = require('rxjs');


const DATA_SOURCE = [ 'String 1', 'String 2', 'Yet another string', 'I am the last string' ];
const observable$ = from(DATA_SOURCE)

observable$.subscribe(console.log)
Enter fullscreen mode Exit fullscreen mode
// output
String 1
String 2
Yet another string
I am the last string
Enter fullscreen mode Exit fullscreen mode

of

The of operator is the second most common creational operator. The of operator is syntactically similar to from but of accepts sequential data rather than iterative data like arrays. If it does get an array, of simply prints the array like a declarative statement. When wrapping observables, of is best used if the data does make sense in an array.

const { of } = require('rxjs');


const DATA_SOURCE = [ 'String 1', 'String 2', 'Yet another string', 'I am the last string' ];
const observableArray$ = of(DATA_SOURCE)

console.log("Array data source")
observableArray$.subscribe(console.log)

console.log("\n")
console.log("Sequence data source")
const observableSequence$ = of('String 1', 'String 2', 'Yet another string', 'I am the last string')

observableSequence$.subscribe(console.log)
Enter fullscreen mode Exit fullscreen mode
//output
Array data source
[ 'String 1',
  'String 2',
  'Yet another string',
  'I am the last string' ]


Sequence data source
String 1
String 2
Yet another string
I am the last string
Enter fullscreen mode Exit fullscreen mode

RxJS pipe function and pipeable operators

The pipe() function calls all operators other than creational operators. These non-creational operators are the second type of operator, called pipeable operators.

Pipeable operators take one observable as input and return an observable as output to continue the pipeline. They can be called like a normal function, op1()(obs), but are more often called in sequence to form a data pipeline. The pipe() function is a cleaner way to call multiple operators in sequence and is therefore the preferred way to call operators.

// standard
op4()(op3()(op2()(op1()(obs))))
Enter fullscreen mode Exit fullscreen mode
// pipe function
obs.pipe(
  op1(),
  op2(),
  op3(),
  op3(),
)
Enter fullscreen mode Exit fullscreen mode

It is considered best practice to use the pipe() function even if you only call one operator.

RxJS filtering operators

The most common type of pipeable operator is the filtering operator. These operators remove all values that do not fit their passed criteria. We'll look at the popular filter and first filtering operators.

filter

The filter operator takes a predicate function, like val => val + 1 == 3, that is applied to all passed values. For each value, the program compares the given value to the predicate function and keeps any values that make the function true.

The example below will only allow even numbers through:


const { from } = require('rxjs');
const { filter } = require('rxjs/operators');

const observable$ = from([1, 2, 3, 4, 5, 6])

observable$.pipe(
    filter(val => val % 2 == 0)
).subscribe(console.log)
Enter fullscreen mode Exit fullscreen mode
//output
2
4
6
Enter fullscreen mode Exit fullscreen mode

The filter operator is a great tool for transforming data to suit the needs of specific subscribers. For example, some users may want to see all product listings while others may only want to see products from a certain price range.

first

The first operator can be used in two ways. By default, it returns the first value emitted by the observable. The benefit of returning the first value is that the turnaround time is very low, making this use great for times when a simple, quick response is sufficient.

const { from } = require('rxjs');
const { first } = require('rxjs/operators');

const observable$ = from([1, 2, 3, 4, 5, 6])

// take first
observable$.pipe(
    first()
).subscribe(console.log)
Enter fullscreen mode Exit fullscreen mode
// output
1
Enter fullscreen mode Exit fullscreen mode

The other use of the first operator adds a predicate function or default value to compare against passed values. Similar to filter, first then returns the first value to match the predicate. This use helps you search a data stream when you only need one value.

const { from } = require('rxjs');
const { first } = require('rxjs/operators');

const observable$ = from([1, 2, 3, 4, 5, 6])

// Example 1 - take first that passes the predicate, or default otherwise
observable$.pipe(
    first(val => val > 6, -1)
).subscribe(console.log)
Enter fullscreen mode Exit fullscreen mode
//output
-1
Enter fullscreen mode Exit fullscreen mode

Filter operations are pure operations, meaning that the original observable data is not affected by any transformations to the emitted data stream.

What to learn next

Congratulations on completing our quick tutorial on RxJS basics. You now have the tools to create observables, use common creational and pipeable operators, and string them all together with the pipe() function.

This is only a snapshot of what RxJS can offer you. Some intermediate topics you can tackle are:

  • Subjects
  • Transformational and Combinational Operators
  • Custom Operators
  • DOM event integration
  • Reactive error handling

To help you pick up these advanced topics, Educative has developed Building Reactive Applications with RxJS. This course is full of interactive code blocks and full coding projects to help give you the hands-on experience you'll need to master RxJS. By the end, you'll have extensive reactive programming skills to develop your very own web applications!

Continue reading about web applications on Educative

Start a discussion

What RxJS tutorial would you like to read next? Was this article helpful? Let us know in the comments below!

Top comments (0)