Today, the RxJs library is rather known amongst frontend developers. It is a powerful library, which I enjoy using!
But lately, I wanted to give to some of my students a taste of that library, by making them implement a rather simple Observable
class.
What our Observable
class should do
-
subscribe
: obviously, we'd like ourObservable
to allow the users to subscribe to its value(s). This method should take in argument a function to execute everytime the value changes. This method will return a Subscription object; -
next
: ourObservable
will rather be a Subject, as we're going to implement anext
method. Thisnext
method will allow us to simply update theObservable
inner value(s); -
pipe
: we would like ourObservable
to allow multiple treatments to be done to its value(s), but without directly modifying it. This method should return a newObservable
; -
unsubscribe
: this method will actually belong to the Subscription class.
The specs are pretty simple, let's get down to it.
A first implementation
For a start, here is our Observable
class declaration
class Observable {
}
Wonderful. Now, our class will contain an inner value. This value will be private. Which means we can do it according to two ways: we either use the #myPrivateField
notation -but it is still at stage 3, hopefully stage 4 in 2021-, or declare everything in the constructor.
For a change, I will use the first way.
class Observable {
#value;
constructor(value) {
this.#value = value;
}
}
So now, we basically have a class with an inner value we cannot access.
We most certainly can move on to the next part of this article.
The subscribe
method
We want to register a function that will be executed everytime our Observable
's value will change.
In order to do that, we will need a simple array -let's call it subscriptions
and make it a private field- and a method that push the function into the array. This method should also execute the function as soon as it is subscribed, passing the value to it.
class Observable {
#value;
#subscriptions = [];
constructor(value) {
this.#value = value;
}
subscribe(f) {
this.#subscriptions.push(f);
f(this.#value);
}
}
The next
method
This method should allow the user to update the inner value of the Observable
. It should also trigger all of the subscriptions.
class Observable {
#value;
#subscriptions = [];
constructor(value) {
this.#value = value;
}
subscribe(f) {
this.#subscriptions.push(f);
f(this.#value);
}
next(value) {
this.#value = value;
this.#subscriptions.forEach((f) => f(this.#value));
}
}
The pipe
method
A pipe
should take in parameters an undefined number of functions to execute, and should execute them by passing to the next the result of the previous one.
An implementation, using the Array.prototype.reduce
method, could be this:
function pipe(...fList) {
return (...args) => {
return fList.slice(1)
.reduce((f, val) => f(val), fList[0](...args));
}
}
This implementation actually returns a new pipe.
In our case, it will be a little different, as we already have an initial value, and we do not want to return a new pipe. Also, our pipe
method should return a new Observable
, containing a new value.
class Observable {
#value;
#subscriptions = [];
constructor(value) {
this.#value = value;
}
subscribe(f) {
this.#subscriptions.push(f);
f(this.#value);
}
next(value) {
this.#value = value;
this.#subscriptions.forEach((f) => f(this.#value));
}
pipe(...fList) {
const obs = new Observable();
const res = fList.slice(1)
.reduce((val, f) => f(val), fList[0](this.#value));
obs.next(res);
return obs;
}
}
The unsubscribe
method
As I previously said, the unsubscribe
method will belong to a Subscription
class.
This class should allow to unregister a function previously registered with the Observable.prototype.subscribe
method.
It will need no argument, and will return nothing. If the subscription is already unsubscribed, then it will silently do nothing.
It should work as follow:
const obs = new Observable(12);
// will print 12
const sub = obs.subscribe((val) => console.log(val));
// the subscription will print "toto"
obs.next('toto');
sub.unsubscribe();
obs.next('something'); // nothing will happen
The Subscription
class should have a function passed as a constructor argument. This function would be its unsubscribe
method, and it would be created during the subscribe
procedure.
Here is how I am going to do it:
First of all, I am going to change a little bit the way we store our functions in the #subscriptions
array of the Observable
class: I am going to use a Map
.
This Map
will pair an ID with a function.
Next, I am going to implement a class Subscription
that will only take an unsubscribe
function as a constructor parameter. If nothing is supplied, it will simply set a default function that does nothing.
Finally, in the subscribe
method of Observable
, I will refactor a bit the code and return a new Subscription
.
class Observable {
#value;
#subscriptions = new Map();
constructor(value) {
this.#value = value;
}
subscribe(f) {
const id = this.#subscriptions.size;
this.#subscriptions.set(id, f);
f(this.#value);
return new Subscription(() => this.#subscriptions.delete(id));
}
next(value) {
this.#value = value;
this.#subscriptions.forEach((f) => f(this.#value));
}
pipe(...fList) {
const obs = new Observable();
const res = fList.slice(1).reduce((val, f) => f(val), fList[0](this.#value));
obs.next(res);
return obs;
}
}
class Subscription {
constructor(unsubscribe = () => void 0) {
this.unsubscribe = unsubscribe;
}
}
And voilà !
We now have a very minimalist Observable
class with a Subscription mechanism, allowing us to defer treatments.
Be careful using this implementation, as we did not cover the whole range of possible scenarios. For instance, the following code would not work:
const obs = new Observable([
{name: 'john', age: 28},
{name: 'bastien', age: 24},
{name: 'julia', age: 40}
]);
const sub1 = obs.subscribe(
([john, bastien, julia]) => {
console.log(john, bastien, julia);
}
);
obs.next(12);
If you have not figured out why this code would throw an exception, try it in your browser console.
We will cover this, and much more, in the upcoming posts. In the meantine, do not hesitate to react in the comments and / or to give me your thoughts about it :)
Bye!
Top comments (0)