DEV Community

Cover image for How I Reverse Engineered RxJs and Learned Reactive Programming? Part 2
Shadid Haque
Shadid Haque

Posted on

How I Reverse Engineered RxJs and Learned Reactive Programming? Part 2

Hello friends, glad you could make it to part 2 of this series. If you haven’t looked at part one please check it out in the link below.

In part one we discussed observables and some core concepts of Reactive Programming. Here, in part 2, we will further indulge ourselves into reactive programming and continue the voyage of reverse engineering RxJs. we will discuss how to reverse engineer operators such as Rx.map, filter. We will also see how observable can be user to replace Promises in javascript.

If you have worked with Angular or any other web applications you have probably noticed that we pass in data from component to component. For instance once an api request is made we receive some data and based on the data we get our components to update their states. We sometime also filter and propagate a portion of the data to other components.

Where am I going with this? You see when we make an api call or when some user interaction happens in our application we are generating a stream of data. We can listen to this stream of data with an observable, furthermore we can also create other observables which will listen to that first observable. In larger applications we see this chaining of observables very often. Returning an observable from another observable is one of the core part of RxJs library.

Here's a diagram to visualize the data flow in an application.

diagram

Alright, in our reactive library we also want to be able to chain observables with each other. First of all we need to mock an api call. So let's do that

function fakeApi(param) {
 return new Promise((resolve, reject) => {
   setTimeout(() => {
     resolve(100)
   }, 1000);
 })
 // or reject
}
Enter fullscreen mode Exit fullscreen mode

As you can see this function is returning a promise. Now let's replace this promised based function to use observable.

const o = new Observable();
function fakeApi(param) {
 setTimeout(() => {
   o.emit(100);
 }, 1000);
}
Enter fullscreen mode Exit fullscreen mode

instead of resolving for a promise we are emitting our result through an observable. Notice that our code works as expected. I believe you have started to see the power of observables by now. Interesting!!! isn't it? 😊😊

Now we want to observe o with another observable. Just like the diagram above. So how do we do that?. Well we already had a pipe function. If you recall the example from previous post, we had the following

const pipe = (...funcs) => x => funcs.reduce((effects, f) => f(effects), x);
Enter fullscreen mode Exit fullscreen mode

Now let’s add this pipe function to our Observable class and return a new Observable

class Observable {
 constructor() {
   this.fnArray = [];
 }

 subscribe(fn) {
   this.fnArray.push(fn);
 }

 emit(v) {
   this.fnArray.map(fun => fun(v));
 }
+ pipe(observable) {
+   const fn = x => observable.emit(x);
+   this.subscribe(fn);
+ }
}
Enter fullscreen mode Exit fullscreen mode

So in the pipe method we took in another observable, used that incoming observable to emit a value through a curried function, and then we subscribe to that curried function.

Functional Programming Funny

So, why did we do that? Well, because we needed the ability to pipe an observable and return another observable. Let's take a look at the code below.

const o = new Observable();
const o2 = new Observable();

function fakeApi(param) {
  setTimeout(() => {
    o.emit(1);
    o.emit(2);
    o.emit(3);
  }, 1000);
}

fakeApi();
o.pipe(o2);

o2.subscribe(
  pipe(
    double,
    double,
    printFunction
  )
);
Enter fullscreen mode Exit fullscreen mode

o is now piping to another observable o2.

outputs

I will print the 4 
I will print the 8 
I will print the 12 
Enter fullscreen mode Exit fullscreen mode

Pretty cool huh? Okay, let’s step it up a notch. We will now try to implement our very own map operators for observable. If you go to the official docs of Rx.js you will find a proper definition of map.

https://rxjs-dev.firebaseapp.com/api/operators/map

In brief map is a function that takes in a function as parameter applies that function and returns a new observable.

If you look at the picture below it makes more sense.

map-ss

class Map {
 constructor(func) {
   this.observable = new Observable();
   this.func = func;
 }
 subscribe(cb) {
   this.observable.subscribe(cb);
 }
 emit(x) {
   const val = this.func(x)
   return this.observable.emit(val);
 }
}
Enter fullscreen mode Exit fullscreen mode

We created a class called Map. In the constructor we initiate a new Observable and we store the passed in function func. In subscribe we subscribe to that specific observable. And finally, in the emit function we apply the this.fun with param x and return the emit of the observable we initiated.

Since map is a keyword in JavaScrip we should encapsulate our map inside an object. So let’s do that.

const Rx = {};
Rx.map = f => new Map(f);
Enter fullscreen mode Exit fullscreen mode

Here we created an empty object Rx and set its key map to a curried function that initiates a new Map() object with a passed function. Now we can use this map like below,

let c = o.pipe(Rx.map(v => v * -3));
c.subscribe(
  pipe(
    double,
    printFunction
  )
);
Enter fullscreen mode Exit fullscreen mode

Which outputs

I will print the -6 
I will print the -12 
I will print the -18 
Enter fullscreen mode Exit fullscreen mode

There you have it. We have our very own map method now. In a similar manner we can also reverse engineer the filter operator.
Let's head to the RxJs docs to see how it works.
http://reactivex.io/documentation/operators/filter.html

pic-rxjs

The picture gives us a broader perspective of its functionality. Similar to map it takes in a function. The only difference is that it is a comparison function. So let's build it.

class Map {
  constructor(func) {
    this.observable = new Observable();
    this.func = func;
  }
  subscribe(cb) {
    this.observable.subscribe(cb);
  }
  emit(x) {
    const val = this.func(x);
    return this.observable.emit(val);
  }
}
//...
Rx.map = f => new Map(f);
Enter fullscreen mode Exit fullscreen mode

When we run it like before we get

let c = o.pipe(Rx.filter(v => v > 0));
c.subscribe(
  pipe(
    double,
    printFunction
  )
);

// output
I will print the 4 
I will print the 6 
Enter fullscreen mode Exit fullscreen mode

I hope you can see a pattern now. RxJs operators are just functions performing certain tasks and we can break them down.

I hope this article was helpful, please leave a comment, like, share and follow me if you would like to see more articles.
🚀🚀🚀🚀🚀🚀

Top comments (3)

Collapse
 
valix85 profile image
Valerio

There is an error in filter box code? Or it's correct? I'm confuse...

Collapse
 
shadid12 profile image
Shadid Haque

Thanks for pointing that out. I'll fix it:)

Collapse
 
abourass profile image
Antonio B. • Edited

Still has an error @ c.subscribe

TypeError: Cannot read property 'subscribe' of undefined

In addition, a link to completed version of this on github would be nice so we can double check that our version matches when trying to resolves errors along the way. Further, the second example 'filter' is just a copy of that map function again