Eran Sakal

Originally published at sakalim.com

Understanding RxJS - Learn From Examples - Part 1

Many articles discuss RxJS; this is yet another one, aimed at beginners and intermediate developers who want to understand the core concepts. The post is based on a session I gave at my workplace, where we use RxJS intensively to handle complex asynchronous use cases efficiently.

What Is RxJS

RxJS stands for "Reactive Extensions for JavaScript", a JavaScript library that lets you manage asynchronous data flow using streams of events. RxJS builds on functional programming fundamentals and implements several design patterns, such as the Observer pattern. It is worth getting familiar with RxJS because, once you know it, you will find that it simplifies many complex use cases.

Marble Diagrams

Marble diagrams are used to explain RxJS flows. They represent operators visually, so you can learn what an operator does by looking at a diagram. The following diagram was taken from this online guide.


[Figure: marble diagram explanation]

Fun fact: you can create marble diagrams online using this great site.

Streams And Observables

RxJS handles asynchronous streams of data. With RxJS, a user can create streams, subscribe to streams, and use operators to manipulate data emitted by a stream.

An observable object represents a stream. There are two types of observables: cold and hot. Observables are cold by default. Creating a cold observable does nothing besides creating an Observable object.

Let's look at the function ajax.getJSON('https://reqres.in/api/users'). It returns a (cold) observable that a user can use to execute a request. Given a URL, the observable performs an HTTP GET request and emits the response parsed as a JSON object.


[Figure: a marble diagram of the ajax.getJSON operation]

Cold observables are lazy: they don't initiate streams automatically upon creation, so the example below does nothing besides creating an observable.

import { ajax } from "rxjs/ajax";
import { map } from "rxjs/operators";

const fetchUsers$ = ajax
  .getJSON(`https://reqres.in/api/users`)
  .pipe(map(userResponse => userResponse.data));

Executing a cold observable

To execute the observable, you .subscribe() to it. Only then is a stream initiated and a call to the API server performed.



import { ajax } from "rxjs/ajax";

const fetchUsers$ = ajax
  .getJSON(`https://reqres.in/api/users`);

fetchUsers$.subscribe(result => {
  console.log(result.data);
});

Notice the $ suffix, a common way to indicate that a constant holds an observable. The suffix is usually used with constants but not with functions! So don't write something like .getUsers$() for a method that returns an observable; .getUsers() is preferred.

A cold observable creates a new stream and begins its execution every time you .subscribe() to it. Three subscriptions create three different streams, which results in three different API calls to the server.

import { ajax } from "rxjs/ajax";
import { map } from "rxjs/operators";

const fetchUsers$ = ajax
  .getJSON(`https://reqres.in/api/users`)
  .pipe(map(userResponse => userResponse.data));

fetchUsers$.subscribe(result => {
  console.log(result);
});

fetchUsers$.subscribe(result => {
  console.log(result);
});

fetchUsers$.subscribe(result => {
  console.log(result);
});

In the network tab of the developer tools, you will see three calls to the API server, even though all three came from subscribing to the same observable, because a cold observable creates a new stream for each subscription.


[Figure: network snapshot of three calls to fetch users data]

RxJS Observables lifecycle

Any stream can be endless. Once a stream is created, you can expect anywhere from zero to an infinite number of values to be emitted on it. A stream remains "live" until something forces it to complete.

It is the responsibility of the Observable creator to close the stream. ajax.getJSON() does exactly that once a response is retrieved from the server.

In marble diagrams, an endless stream has an arrow on the right, indicating that it continues beyond the diagram.


[Figure: a marble diagram of an endless stream]

Three actions close a stream, as described below.

Calling Complete On An Observable Will Close The Stream

A stream is closed when its creator .complete()s it. In the ajax example above, once the origin received a result from the API server and emitted it as a value, there was no point in keeping the stream open, so it closed the stream automatically by calling .complete() on the observer.

In marble diagrams, a complete operation is represented as a vertical line on the right side just before the arrowhead.


[Figure: a marble diagram of a stream that was completed]

The code below executes the observable by calling .subscribe(). The .subscribe() method accepts three optional arguments: 1. A method to execute when the observable emits a value. 2. A method to execute when an error happens. 3. A method to execute when the observable completes.

The user usually provides the third method when they need to free up resources once the stream completes. In this example, that method writes to the console when the stream completes.

Although you might expect to see -- completed in the console log after value C is emitted, it doesn't happen, because the observable never calls .complete().

import { Observable } from "rxjs";

const emitValuesAndComplete$ = new Observable(observer => {
  observer.next("A");
  observer.next("B");
  observer.next("C");
});

emitValuesAndComplete$.subscribe(
  result => {
    console.log(` ${result}`);
  },
  error => {
    // note - this is an optional argument that allows handling errors
    console.log(" -- error");
  },
  () => {
    // note - this is an optional argument that allows handling complete action
    console.log(" -- completed");
  }
);

// Console Log: A B C

If you create a stream, you are responsible for completing it. Otherwise, the stream remains open, and the subscriber keeps waiting for new values. Let's refactor the example and complete the stream after emitting the last value.


import { Observable } from "rxjs";

const emitValuesAndComplete$ = new Observable(observer => {
  observer.next("A");
  observer.next("B");
  observer.next("C");
  observer.complete(); // New Line
});

emitValuesAndComplete$.subscribe(
  result => {
    console.log(` ${result}`);
  },
  error => {
    // note - this is an optional argument that allows handling errors
    console.log(" -- error");
  },
  () => {
    // note - this is an optional argument that allows handling complete action
    console.log(" -- completed");
  }
);

// Console Log: A B C -- completed

Throwing An Error On An Observable Will Close The Stream

An error happens when the observable calls .error(new Error("Something happened")) on its observer. Once an error is thrown, the stream is dead: no further values are emitted, because the stream is no longer live. So if you throw an error on the stream, you don't need to complete it as well.

In marble diagrams, an error is represented as a red X on the right side just before the arrowhead.


[Figure: a marble diagram of a stream with error]


import { Observable } from "rxjs";

const emitValuesAndError$ = new Observable((observer) => {
  observer.next('A');
  observer.next('B');
  observer.next('C');
  observer.error(new Error('something bad happened'));
});

emitValuesAndError$.subscribe(result => {
  console.log(result);
}, (e) => {
  // note - this is an optional argument that allows handling errors
  console.log(' -- error with message: ' + e.message);
}, () => {
  // note - this is an optional argument that allows handling complete action
  console.log(' -- completed');
});

// Console Log: A B C -- error with message: something bad happened

Unsubscribe From A (Cold) Observable Will Close The Stream

A user can .unsubscribe() from the stream at any time, even before it completes or emits a value. The example below shows an observable that emits the letter A every 0.5 seconds and a subscriber that unsubscribes after two seconds.

import { Observable } from "rxjs";

const emitValuesAndComplete$ = new Observable(observer => {
  setInterval(() => {
    console.log(`Emitting value A`);
    observer.next("A");
  }, 500);
});

const subscription = emitValuesAndComplete$.subscribe(result => {
  console.log(result);
});

setTimeout(() => {
  subscription.unsubscribe();
}, 2000);

/* Console Log:
Emitting value A
A
Emitting value A
A
Emitting value A
A
Emitting value A
A
Emitting value A
Emitting value A
Emitting value A

…

…
Emitting value A <- - - - - infinite console log every 0.5 second
*/

Cold Observables Tips

Now that you are familiar with a stream's lifecycle and the ways to manipulate it, let's review some useful tips.

Tip #1 – Clean up after yourself

Beware of memory leaks. Did you notice the memory leak in the example above? If you create an observable, it is your responsibility to clean up resources by providing a teardown method that is executed when the stream is closed.

The problem with the code above is that the interval in our observable continues to run even after you unsubscribe from the stream. It might seem minor, since here you only forgot to stop an interval, but in real applications the leak will probably be more noticeable.

Let's fix the example above by adding a teardown method.


import { Observable } from "rxjs";

const emitValuesAndComplete$ = new Observable(observer => {
  const intervalToken = setInterval(() => {
    console.log(`Emitting value A`);
    observer.next("A");
  }, 500);

  return () => {
    // Optional teardown method: it is called when the user unsubscribes,
    // letting you free resources and prevent memory leaks.
    clearInterval(intervalToken);
  };
});

const subscription = emitValuesAndComplete$.subscribe(result => {
  console.log(result);
});

setTimeout(() => {
  subscription.unsubscribe();
}, 2000);

/* Console Log:

Emitting value A
A
Emitting value A
A
Emitting value A
A
*/

Tip #2 – Observables are cancellable

The example above highlights a significant difference between promises and observables, a topic that is covered later in this post. You just saw that you can cancel a subscription, something fundamental that cannot be achieved with promises unless you use a third-party library. Observables are built with a cancellation API that not only allows the subscriber to unsubscribe but also enables the stream origin to clean up, for example by canceling the actual API request to the server.

Tip #3 - Feel free to unsubscribe

The user shouldn't bother to check if the stream is active or not and can call .unsubscribe() anytime. To demonstrate, the following code can be called (although one call is enough):

setTimeout(() => {
  subscription.unsubscribe(); // one time is enough
  subscription.unsubscribe(); // this will not do any good or harm
  subscription.unsubscribe(); // this will not do any good or harm
  subscription.unsubscribe(); // this will not do any good or harm
}, 2000);

Using A Hot Observable To Share A Stream

The other type of observable is hot. As opposed to cold observables, hot observables are eager: they don't wait for someone to subscribe but initiate streams immediately.

To simplify the creation of hot observables, you can use a subject. A subject object provides both the API of an observable, so users can subscribe to it, and the API of the inner observer, which lets you emit a value with .next(), throw an error on the stream with .error(), or .complete() the stream.

There are two key differences between hot and cold observables:

(1) Hot observables run immediately and emit values even if no one has subscribed to them, while cold observables will run only when someone subscribes to them.

In the example below, you create a subject, which is the simplest way to create a hot observable. You then use the next method to emit values on the subject stream. You can see that the console logs are written even though nobody has subscribed to the hot observable represented by the subject.


import { Subject } from "rxjs";

let currentValue = 0;
const counterSubject = new Subject();

setInterval(() => {
  currentValue++;
  console.log(`Emitting value ${currentValue}`);
  counterSubject.next(currentValue);
}, 1000);

/* Console Log:  
Emitting value 1
Emitting value 2
Emitting value 3
…
…
Emitting value 1000 <- - - - - infinite console log every 1 second
*/

(2) Hot observables share the stream between all subscribers, which means that three subscriptions will use the same single stream. Any value emitted will reach all the observable subscriptions, whereas cold observables create a separate stream for every subscription. Three subscriptions will result in the same code running three times, one for each subscriber.

Continuing our previous example, you subscribe to the subject twice and write the values received by subscribers A and B to the console. Note that the message logged inside the interval is written only once per value.


import { Subject } from "rxjs";

let currentValue = 0;
const counterSubject = new Subject();

setInterval(() => {
  currentValue++;
  console.log(`Emitting value ${currentValue}`);
  counterSubject.next(currentValue);
}, 1000);

counterSubject.subscribe(result => {
  console.log(`A > ${result}`);
});

counterSubject.subscribe(result => {
  console.log(`B > ${result}`);
});

/* Console Log  
Emitting value 1
A > 1
B > 1
Emitting value 2
A > 2
B > 2
…
…
Emitting value 1000 <- - - - - infinite console log every 1 second
A > 1000
B > 1000
*/

Hot Observables Tips

Tip #1 – You Cannot Restore Past Events

The thing about a subject is that, once you subscribe to it, you are notified only of its current and future values. The user is not aware of any values the subject emitted earlier. In the example below, you subscribe to the subject only after three seconds, and you can see in the console log that you don't receive the first three values.


import { Subject } from "rxjs";

let currentValue = 0;
const counterSubject = new Subject();

setInterval(() => {
  currentValue++;
  console.log(`Emitting value ${currentValue}`);
  counterSubject.next(currentValue);
}, 1000);

setTimeout(() => {

  console.log(`Subscribing to subject`);
  counterSubject.subscribe(result => {
    console.log(`A > ${result}`);
  });
}, 3000);

/* Console Log:  
Emitting value 1
Emitting value 2
Emitting value 3
Subscribing to subject
Emitting value 4
A > 4
Emitting value 5
A > 5
Emitting value 6
A > 6
…
…
Emitting value 1000 <- - - - - infinite console log every 1 second
A > 1000
*/

Tip #2 – You Cannot Recover From An Error

When an error is thrown on a stream, the stream is closed immediately, regardless of whether it is a cold or a hot observable. I can't recall ever throwing an error on a hot observable, because the users would not be able to recover from it.

Instead, consider exposing a status on the observable value, so the user will be able to react to the error and continue getting new values once they are emitted.

Special Kinds Of Subjects

Let's review two variants of subjects that can help us with previous values.

A ReplaySubject is used to echo the last X emitted values. The number of values to buffer is configurable when the subject is created. For example, you can use ReplaySubject to emit updates about stocks. In the ReplaySubject constructor, you provide the number of most recent values to keep, and on subscription, the subscriber immediately gets those values. Extending our previous example, all you need to do is use ReplaySubject instead of Subject. Note that the example below passes no buffer size, so every previous value is replayed.


import { ReplaySubject } from "rxjs";

let currentValue = 0;
const counterSubject = new ReplaySubject();

setInterval(() => {
  currentValue++;
  console.log(`Emitting value ${currentValue}`);
  counterSubject.next(currentValue);
}, 1000);

setTimeout(() => {
  console.log(`Subscribing to the subject`);
  counterSubject.subscribe(result => {
    console.log(`A > ${result}`);
  });
}, 3000);

/* Console Log: 
Emitting value 1
Emitting value 2
Emitting value 3
Subscribing to the subject
A > 1
A > 2
A > 3
Emitting value 4
A > 4
Emitting value 5
A > 5
…
…
Emitting value 1000 <- - - - - infinite console log every 1 second
A > 1000
*/

A BehaviorSubject is used to represent a behavioral state. For example, you can use BehaviorSubject to keep data about the direction of a car. Every time you get a new direction, you emit it on the subject, and the subject retains that value internally. It then:

(1) emits it immediately when someone subscribes to the subject.

(2) exposes that value synchronously through the .value property (or the getValue() method).


import { BehaviorSubject } from "rxjs";

const direction = ["left", "top", "right", "bottom"];
let directionIndex = 0;
const directionSubject = new BehaviorSubject(direction[directionIndex]);

setInterval(() => {
  directionIndex++;
  const newDirection = direction[directionIndex % 4];
  console.log(`Emitting value ${newDirection}`);
  directionSubject.next(newDirection);
}, 1000);

setTimeout(() => {
  console.log(
    `directionSubject.value = '${directionSubject.value}' (synchronous API)`
  );
  console.log(`Subscribing to the subject`);
  directionSubject.subscribe(result => {
    console.log(`A > ${result}`);
  });
}, 3000);

/* Console Log: 
Emitting value top
Emitting value right
Emitting value bottom
directionSubject.value = 'bottom' (synchronous API)
Subscribing to the subject
A > bottom
Emitting value left
A > left
…
…
Emitting value top <- - - - - infinite console log every 1 second
A > top
*/

An observable can also be warm. This is a funny term that describes a stream with a mix of cold and hot behavior. The observable does not create a stream until it has a subscriber, which resembles a cold observable. But any further subscriptions that are active at the same time share the stream created by the first one, which resembles a hot observable.

Why bother with RxJS when you can use Promises?

I hear this question frequently, since both deal with asynchronous operations. They might seem the same at first, but once you understand RxJS and observables, the differences are quite noticeable.

  • Promises cannot be canceled, while observables can be canceled at any time.

  • Promises are limited when it comes to complex flows. RxJS supports powerful operator chaining to modify the result of the flow.

  • A promise runs immediately. Observables are executed only when needed.

  • A promise resolves with at most one value and then completes. Observables can emit multiple values and stop only if they error or if the event's producer completes the observable.

  • You can use async/await directly with promises; an observable must first be converted to a promise.

  • Modern browsers widely support promises, whereas RxJS adds 10.9kB minified and gzipped (although it can participate in tree shaking).

  • Promises are always asynchronous, whereas observables can be either synchronous or asynchronous.

  • Promises are easy to master, whereas it takes time to feel comfortable with observables.

How to choose between Promises and RxJS?

You should use RxJS if one or more of the conditions below are relevant to your project.

  1. When you need to allow canceling of operations.

  2. When you have complex operations.

  3. When you create your own state management and need a library to manage event-based services (data services).

  4. If you are already using RxJS in your application (no reason not to use it if it is there).

  5. When you are not worried about an extra 10.9kB bundled to your application.

How to proceed?

I'm working on a dedicated post that elaborates on standard operators and use cases. If you would like to learn more about the RxJS core concepts, read the article "The Introduction To Reactive Programming You've Been Missing".

Use Learn RxJS for a detailed list of operators grouped by contexts such as combination operators, creation operators, and filtering operators.

Follow the interactive Functional Programming in JavaScript guide to learn about five functions that are probably the most powerful, flexible, and useful functions you'll ever learn.

Print the following visual tables that help you to find the most relevant creator operator to use and the most relevant instance operator to use. Note that those tables are not maintained, and you might encounter an old implementation that is not supported anymore.

Check out the thinkrx playground, which provides visualized examples of many standard operators that can be manipulated in real time.

You can consider purchasing the course Use Higher-Order Observables in RxJS Effectively by @andrestaltz on egghead for more useful examples.


The post Understanding RxJS - Learn From Examples - Part 1 appeared first on sakalim.com blog.

Cover photo by Veronica Silva on Unsplash

Latest comments (5)

Edward xu

Great writing, looking forward to your part 2 :)

Stas Kohut

A great post, love it! ❤️

One thing though, when you described API of a subject for throwing errors there’s a tiny typo: it’s .error() function, not .throw() 😉

Eran Sakal

Thanks Stas!
Nice catch, I have missed it since the console showed the expected output... the word error :)

Adding the error message to the console showed

 -- error with message: observer.throw is not a function 

and now it shows the expected error

 -- error with message: something bad happened

I updated the post accordingly, Thanks again!

John Peters

Thanks Eran;
Excellent article!

Eran Sakal

Thank you John, glad to hear that.