## Introduction

In the previous post of the series, we built our own `Observable` class.

```
class Observable {
  private _subscribe;
  private _unsubscribe;
  private _stopped = true;
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  _stop() {
    this._stopped = true;
    setTimeout(() => {
      this._unsubscribe();
    });
  }
  subscribe(observer) {
    this._stopped = false;
    this._unsubscribe = this._subscribe({
      next: (value) => {
        if (!this._stopped) {
          observer.next(value);
        }
      },
      complete: () => {
        if (!this._stopped) {
          observer.complete();
          this._stop();
        }
      },
      // Forward the error itself so the observer knows what went wrong.
      error: (e) => {
        if (!this._stopped) {
          observer.error(e);
          this._stop();
        }
      },
    });
    return { unsubscribe: this._unsubscribe };
  }
}
```

Let's see now how to build and compose operators in RxJS.

### Definition

An operator is a function that takes a source `Observable` as a parameter and returns a new destination `Observable`. It reacts to the three events from the source observable (`next`, `error` and `complete`) and, depending on the operator logic, sends specific events to the destination `Observable`.
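As a rough sketch of this definition in types, an operator is just a function from one observable to another. The `Observer`, `Subscription`, `SimpleObservable` and `Operator` names below are assumptions made for illustration (a simplified stand-in, not the class from the previous post), and `identity` is a hypothetical operator that forwards all three events unchanged:

```typescript
// Minimal shapes assumed for illustration only.
interface Observer<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

interface Subscription {
  unsubscribe: () => void;
}

// Simplified stand-in for the Observable class of this series.
class SimpleObservable<T> {
  private producer: (observer: Observer<T>) => () => void;

  constructor(producer: (observer: Observer<T>) => () => void) {
    this.producer = producer;
  }

  subscribe(observer: Observer<T>): Subscription {
    return { unsubscribe: this.producer(observer) };
  }
}

// An operator maps a source observable to a destination observable.
type Operator<T, R> = (source: SimpleObservable<T>) => SimpleObservable<R>;

// The simplest possible operator: forward every event as-is.
const identity: Operator<number, number> = (source) =>
  new SimpleObservable<number>((observer) => {
    const subscription = source.subscribe({
      next: (value) => observer.next(value),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    });
    return () => subscription.unsubscribe();
  });

// Usage: a source that synchronously emits 1 and 2, then completes.
const source$ = new SimpleObservable<number>((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
  return () => {};
});

const received: Array<number | string> = [];
identity(source$).subscribe({
  next: (value) => received.push(value),
  error: () => received.push("error"),
  complete: () => received.push("complete"),
});
```

Every operator in the rest of this post follows the same skeleton; only the body of the three callbacks changes.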

### Custom Operators

Let's build a custom operator that filters out odd numbers.

```
function even(source: Observable) {
  const destination = new Observable((observer: Observer) => {
    const subscription = source.subscribe({
      next: (value) => {
        if (value % 2 === 0) {
          observer.next(value);
        }
      },
      error: (e) => {
        observer.error(e);
      },
      complete: () => {
        observer.complete();
      },
    });
    return () => {
      subscription?.unsubscribe();
    };
  });
  return destination;
}
```

Let's apply the operator directly to an `Observable` without using the pipe function (we'll come back to this later). This is fundamental to understanding operators and how data flows from one to another.

```
const even$ = even(interval$);
const subscription = even$.subscribe({
  next: (event) => console.log(event),
});
// later
subscription.unsubscribe();
```

`even$` is the inner destination `Observable` created and returned by the `even` function.

When we unsubscribe from `even$`, we also have to unsubscribe from the source Observable. It is our responsibility to add this logic.
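To make that responsibility concrete, here is a small self-contained sketch. `SimpleObservable`, `manualSource$`, `emit` and `teardownLog` are hypothetical names, and the source is driven by hand so the example runs synchronously. It checks that unsubscribing from the destination also tears down the source:

```typescript
interface Observer<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

interface Subscription {
  unsubscribe: () => void;
}

// Simplified stand-in for the Observable class of this series.
class SimpleObservable<T> {
  private producer: (observer: Observer<T>) => () => void;

  constructor(producer: (observer: Observer<T>) => () => void) {
    this.producer = producer;
  }

  subscribe(observer: Observer<T>): Subscription {
    return { unsubscribe: this.producer(observer) };
  }
}

function even(source: SimpleObservable<number>): SimpleObservable<number> {
  return new SimpleObservable<number>((observer) => {
    const subscription = source.subscribe({
      next: (value) => {
        if (value % 2 === 0) {
          observer.next(value);
        }
      },
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    });
    // Without this teardown the source would keep running after unsubscribe.
    return () => subscription.unsubscribe();
  });
}

// A hand-driven source: `emit` pushes a value to the current subscriber.
let emit: (value: number) => void = () => {};
const teardownLog: string[] = [];
const manualSource$ = new SimpleObservable<number>((observer) => {
  emit = (value) => observer.next(value);
  return () => {
    teardownLog.push("source torn down");
    emit = () => {};
  };
});

const seen: number[] = [];
const subscription = even(manualSource$).subscribe({
  next: (value) => seen.push(value),
  error: () => {},
  complete: () => {},
});

emit(1); // filtered out
emit(2);
emit(4);
subscription.unsubscribe(); // also unsubscribes from manualSource$
emit(6); // ignored: the source has been torn down
```

If the teardown function returned by `even` were empty, `manualSource$` would never learn that nobody is listening anymore.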

### Configurable custom operators

Now we want a `multiply` operator that takes the number to multiply by as a parameter. We create a higher-order function that, when invoked, returns the actual operator.

```
function multiply(by) {
  return function (observable: Observable) {
    return new Observable((observer: Observer) => {
      const subscription = observable.subscribe({
        next: (value) => {
          observer.next(value * by);
        },
        error: (e) => {
          observer.error(e);
        },
        complete: () => {
          observer.complete();
        },
      });
      return () => {
        subscription?.unsubscribe();
      };
    });
  };
}
```

### pipe

Let's say we want to display only the even numbers from an interval stream whose values have each been multiplied by 3.

```
const interval$ = interval(1000);
const intervalBy3$ = multiply(3)(interval$);
const even$ = even(intervalBy3$);
even$.subscribe({
  next: (event) => console.log(event),
});
```

Or in one line, composing the two function calls:

```
const even$ = even(multiply(3)(interval$));
```

`pipe` is just a utility function that pipes functions together. It is not specific to operator functions; it can be used to compose any functions.

```
import { pipe } from "rxjs";
// Equivalent to even(multiply(3)(interval$))
const even$ = pipe(multiply(3), even)(interval$);
```
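To illustrate that nothing here is specific to observables, a minimal version of such a utility can be sketched with `reduce`. The name `composeLeftToRight` is hypothetical, and this is a simplification in which every function shares the same input and output type. It is not how RxJS implements or types its own `pipe`:

```typescript
type UnaryFn<T> = (input: T) => T;

// Returns a function that feeds its input through each fn, left to right.
function composeLeftToRight<T>(...fns: Array<UnaryFn<T>>): UnaryFn<T> {
  return (input: T) => fns.reduce((acc, fn) => fn(acc), input);
}

const addOne = (n: number) => n + 1;
const double = (n: number) => n * 2;

const addOneThenDouble = composeLeftToRight(addOne, double);

const piped = addOneThenDouble(3); // (3 + 1) * 2
const nested = double(addOne(3)); // same computation, written inside out
```

The piped form reads in the order the data flows, which is why it scales better than nesting once more than two operators are involved.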

Preferably, we'd want to have the `pipe` method in our `Observable` class.

```
import { pipe } from "rxjs";

// Simplified version of our class, just enough to show the pipe method.
class Observable {
  private _subscribe;
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  subscribe(observer) {
    return this._subscribe(observer);
  }
  pipe(...operators) {
    return pipe(...operators)(this);
  }
}

interval$.pipe(multiply(3), even).subscribe({
  next: (event) => console.log(event),
});
```

At this point you should have the whole picture. Let's practice on the remaining operators: `map`, `take` and `switchMapTo`.

### map

`map` is easy: we subscribe to the source `observable` and emit each value transformed by the passed-in projection function.

```
function map(projection) {
  return function (source) {
    return new Observable((observer) => {
      const subscription = source.subscribe({
        next: (value) => {
          observer.next(projection(value));
        },
        error: (e) => {
          observer.error(e);
        },
        complete: () => {
          observer.complete();
        },
      });
      return () => {
        subscription?.unsubscribe();
      };
    });
  };
}
```

### take

```
interval$
  .pipe(
    take(5),
    map((val) => val * 2)
  )
  .subscribe({
    next: (value) => console.log(value),
    complete: () => console.log("End of stream"),
  });
```

In the example above, we're only interested in the first 5 interval events. On the fifth event, `take(5)`:

- unsubscribes from the source observable (`interval$`)
- completes its own observer, which propagates the completion downstream; otherwise the `complete` callback in our `subscribe` would never be called.

```
function take(maxEvents) {
  return function (source: Observable) {
    return new Observable((observer) => {
      let counter = 0;
      const subscription = source.subscribe({
        next(value) {
          observer.next(value);
          if (++counter === maxEvents) {
            // Stop listening to the source, then signal completion downstream.
            subscription?.unsubscribe();
            observer.complete();
          }
        },
        error(e) {
          observer.error(e);
        },
        complete() {
          observer.complete();
        },
      });
      return () => {
        subscription?.unsubscribe();
      };
    });
  };
}
```
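One caveat with the version above: if the source emits synchronously during `subscribe`, `subscription` has not been assigned yet when `next` runs, so the source cannot be unsubscribed at that moment. The sketch below shows one possible way to handle this, under my own assumptions (it is not how RxJS does it, and it does not handle `take(0)`): track a `done` flag, ignore later events, and tear the source down as soon as the subscription is available. `SimpleObservable` and `syncSource$` are hypothetical names.

```typescript
interface Observer<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

interface Subscription {
  unsubscribe: () => void;
}

// Simplified stand-in for the Observable class of this series.
class SimpleObservable<T> {
  private producer: (observer: Observer<T>) => () => void;

  constructor(producer: (observer: Observer<T>) => () => void) {
    this.producer = producer;
  }

  subscribe(observer: Observer<T>): Subscription {
    return { unsubscribe: this.producer(observer) };
  }
}

function take<T>(maxEvents: number) {
  return (source: SimpleObservable<T>) =>
    new SimpleObservable<T>((observer) => {
      const state = { emitted: 0, done: false };
      // Declared with `let` so that reading it early yields undefined.
      let subscription: Subscription | undefined;

      subscription = source.subscribe({
        next: (value) => {
          if (state.done) return; // ignore anything after the limit
          observer.next(value);
          if (++state.emitted >= maxEvents) {
            state.done = true;
            observer.complete();
            // Still undefined while the source is emitting synchronously.
            subscription?.unsubscribe();
          }
        },
        error: (err) => {
          if (!state.done) observer.error(err);
        },
        complete: () => {
          if (!state.done) observer.complete();
        },
      });

      // The limit was reached during subscribe: tear the source down now.
      if (state.done) subscription.unsubscribe();

      return () => subscription?.unsubscribe();
    });
}

// A source that emits everything synchronously.
const teardownCalls: string[] = [];
const syncSource$ = new SimpleObservable<number>((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
  return () => {
    teardownCalls.push("source");
  };
});

const output: Array<number | string> = [];
take<number>(2)(syncSource$).subscribe({
  next: (value) => output.push(value),
  error: () => output.push("error"),
  complete: () => output.push("complete"),
});
```

With a timer-based source such as `interval$` this edge case never shows up, which is why the simpler version is enough for the examples in this post.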

### switchMapTo

In `switchMapTo`, we are interested in the source `observable` only to know that a new event has occurred.

Each time we receive an event from the source observable, we switch to the inner observable (the `destination` parameter in the code below), subscribe to it, and forward its values to the observer.

When a new event is emitted by the source observable, we unsubscribe from the current inner `observable` and create a new subscription. This "unsubscription" is very important because in our case we do not want to have any timers still active.

If we receive an error from the source observable or the inner observable, we pass it down to the observer right away.

If we receive a completion from the source observable, we wait until the active inner observable completes, then we complete the observer.

```
function switchMapTo(destination: Observable) {
  return function (source: Observable) {
    return new Observable((observer) => {
      let innerSubscription;
      // No inner observable is active until the source emits.
      let innerCompleted = true;
      let isComplete = false;
      const checkCompletion = () =>
        isComplete && innerCompleted && observer.complete();
      const subscription = source.subscribe({
        next: () => {
          // Switch: drop the previous inner subscription and start a new one.
          innerSubscription?.unsubscribe();
          innerCompleted = false;
          innerSubscription = destination.subscribe({
            next(value) {
              observer.next(value);
            },
            error(e) {
              observer.error(e);
            },
            complete() {
              innerCompleted = true;
              checkCompletion();
            },
          });
        },
        error: (e) => {
          observer.error(e);
        },
        complete: () => {
          isComplete = true;
          checkCompletion();
        },
      });
      return () => {
        innerSubscription?.unsubscribe();
        subscription?.unsubscribe();
      };
    });
  };
}
```
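The switching and the deferred completion are easier to see with sources that are driven by hand. The sketch below is self-contained and rests on a few assumptions: `SimpleObservable` is a simplified stand-in for our class, `controls`, `clicks$` and `inner$` are hypothetical names, and this `switchMapTo` explicitly marks the inner observable as active each time it subscribes to it.

```typescript
interface Observer<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

interface Subscription {
  unsubscribe: () => void;
}

class SimpleObservable<T> {
  private producer: (observer: Observer<T>) => () => void;

  constructor(producer: (observer: Observer<T>) => () => void) {
    this.producer = producer;
  }

  subscribe(observer: Observer<T>): Subscription {
    return { unsubscribe: this.producer(observer) };
  }
}

function switchMapTo<R>(inner: SimpleObservable<R>) {
  return (source: SimpleObservable<unknown>) =>
    new SimpleObservable<R>((observer) => {
      let innerSubscription: Subscription | undefined;
      const state = { sourceDone: false, innerActive: false };
      const completeIfDone = () => {
        if (state.sourceDone && !state.innerActive) observer.complete();
      };
      const subscription = source.subscribe({
        next: () => {
          innerSubscription?.unsubscribe(); // drop the previous inner
          state.innerActive = true;
          innerSubscription = inner.subscribe({
            next: (value) => observer.next(value),
            error: (err) => observer.error(err),
            complete: () => {
              state.innerActive = false;
              completeIfDone();
            },
          });
        },
        error: (err) => observer.error(err),
        complete: () => {
          state.sourceDone = true;
          completeIfDone();
        },
      });
      return () => {
        innerSubscription?.unsubscribe();
        subscription.unsubscribe();
      };
    });
}

// Hand-driven source and inner observables.
const log: string[] = [];
const controls = {
  click: () => {},
  completeSource: () => {},
  innerNext: (_value: number) => {},
  innerComplete: () => {},
};

const clicks$ = new SimpleObservable<string>((observer) => {
  controls.click = () => observer.next("click");
  controls.completeSource = () => observer.complete();
  return () => {};
});

const inner$ = new SimpleObservable<number>((observer) => {
  log.push("inner subscribed");
  controls.innerNext = (value) => observer.next(value);
  controls.innerComplete = () => observer.complete();
  return () => {
    log.push("inner unsubscribed");
  };
});

const results: Array<number | string> = [];
switchMapTo(inner$)(clicks$).subscribe({
  next: (value) => results.push(value),
  error: () => results.push("error"),
  complete: () => results.push("complete"),
});

controls.click(); // first inner subscription
controls.innerNext(1);
controls.click(); // switch: old inner unsubscribed, new one subscribed
controls.innerNext(2);
controls.completeSource(); // inner still active, so no completion yet
const completedEarly = results.indexOf("complete") !== -1;
controls.innerComplete(); // now the observer completes
```

The second `click` is what triggers the "inner unsubscribed" entry in `log`, and the observer only completes once both the source and the last inner observable are done.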

## Practice

You might have noticed that the timer does not start right away when you click on the button. To fix that, we can use the `startWith` operator.

It is your turn to implement it here.

## Summary

Understanding the internal mechanisms of RxJS allowed us to develop robust operators. An operator can be considered as a helper function that is not really bound to a specific domain and that we can reuse in several applications.

In the next article, we will discuss Unicast and Multicast Observables.

## Support

If you like the article, let me know. I hardly ever write, so it will motivate me to produce more content.
