Since I was a kid, I’ve always been curious about how things work under the hood. At the same time, I’ve always loved learning and building things from the bottom up to deeply understand new topics.
Translated to programming, I strongly believe that learning the fundamental ideas behind a library/framework is the best way to build a solid knowledge about the topic and “beyond” the topic. Hence, I wrote this article!
In this post we’ll start to implement RxJS from scratch, step by step. We’ll follow an architecture equivalent to the real RxJS codebase but without all the optimizations and non-essential features.
We’ll start by building the core abstraction of the library: the Observable. Along the way, we’ll also create Observers, Subscribers and Subscriptions, while in the next episode we’ll implement pipeable operators.
Introduction
Why has RxJS become so popular? The answer comes straight from the docs:
RxJS is a library for composing asynchronous and event-based programs by using observable sequences.
The library makes these tasks simpler, declarative and easy to reason about. To achieve this goal, RxJS provides three main benefits:
- clear control flow: pipeable operators help you easily control how the events flow through your Observables
- function purity: the ability to produce and process values “safely” using pure functions
- value transformation: you can transform values passed through your Observables as you want
Let’s see why RxJS is so powerful with a toy example (and trust me, you can do much, much more with RxJS).
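// without RxJS: IMPERATIVE
let count = 0
const rate = 1000
let lastClick = Date.now() - rate;
document.addEventListener('click', event => {
  // only handle a click if at least `rate` ms passed since the last handled one
  if (Date.now() - lastClick >= rate) {
    count += event.clientX;
    console.log(count);
    lastClick = Date.now();
  }
})

// with RxJS: DECLARATIVE
// (RxJS 7.2+ exports operators from 'rxjs'; older versions use 'rxjs/operators')
import { fromEvent, throttleTime, map, scan } from 'rxjs'

fromEvent(document, 'click')
  .pipe(
    throttleTime(1000),
    map(event => event.clientX),
    scan((count, clientX) => count + clientX, 0)
  )
  .subscribe(console.log)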
Everything seems to be built around Observables, and that is indeed the case. So, what is an Observable? Let’s explain this concept.
Observables
Observables are lazy Push collections of multiple values.
Let’s break the definition into parts.
The terms Pull and Push describe how a producer can communicate with a consumer. Functions are pull entities: the caller (the developer) explicitly asks them for data. Conversely, in a push system the producer itself is in charge of sending data to a consumer, which doesn’t know when events will be emitted.
Observables are a push system, like Promises. In fact, they can be “listened to” and they are responsible for emitting values, by wrapping a specific data source (like DOM events, intervals, promises, sync data and so on).
Observables are like Promises but with some differences:
- they can “resolve” multiple times, emitting multiple values
- they have a subscribe method instead of a then method
- they can be unsubscribed before completion
To conclude, what is meant by "lazy collection"? Observables are lazy collections because they do not actually emit data until subscribed. Simple!
Here’s an image illustrating the differences between sync, pull entities (functions, generators) and async, push entities (promises and observables).
Functions are asked for a single synchronous value, while generators are asked for multiple values. Promises emit (resolve to) a single value asynchronously, and they can be listened to using their methods (then, catch, finally). Observables can be subscribed to (listened to) as well, but they can emit multiple values over time.
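To make the pull/push distinction concrete, here is a minimal sketch in plain JavaScript. The names getValue, getValues and pushValues are hypothetical helpers invented for this illustration, and the push producer is kept synchronous only to keep the example short.

```javascript
// PULL: the consumer decides when to ask for a value.
function getValue() {
  return 42;
}
const pulledOnce = getValue(); // a single synchronous value

function* getValues() {
  yield 1;
  yield 2;
  yield 3;
}
const pulledMany = [...getValues()]; // multiple synchronous values

// PUSH: the producer decides when to deliver values.
// `pushValues` is a hypothetical producer used only for this illustration.
function pushValues(listener) {
  [1, 2, 3].forEach(value => listener(value));
}
const pushed = [];
pushValues(value => pushed.push(value)); // the consumer only reacts

console.log(pulledOnce, pulledMany, pushed);
```

In the pull cases the calling code drives the exchange, while in the push case the listener has no control over when (or how often) it is invoked.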
Enough theory, let's get practical. It’s time to dive into our implementation of RxJS starting from Observables.
Observables & Subscribers
The Observable abstraction is implemented as a class.
class Observable {
  constructor(initFunc) {
    this.initFunc = initFunc;
  }
  subscribe(observer) {
    return this.initFunc(observer);
  }
}
The Observable class requires a function as an argument in its constructor. This function is responsible for generating and emitting values to a consumer (called Observer), but it’s not invoked immediately. Instead, it’s stored inside the class instance. When we subscribe to the Observable, initFunc is called with the observer as an argument.
For now, it’s enough to know that an Observer is just an object with three methods, each of which is called when the Observable sends the corresponding notification. The method next is invoked for every emitted value, error if there is an error, and complete when the Observable completes or is exhausted.
Side note: we use setInterval inside the init function, but the logic we build applies to every type of Observable.
Let’s try our primitive Observable.
const myIntervalObx = new Observable(observer => { // <- provide the initFunc argument
  let counter = 0
  setInterval(() => observer.next(++counter), 700)
})

myIntervalObx.subscribe({
  next: (val) => console.log(val),
  error: (err) => console.log(err),
  complete: () => console.log('Completed!')
})
// 1 <- emission starts after subscribing
// 2
// 3
// ...
Only if and when subscribed, the Observable will call the stored function with the Observer as an argument, starting the value emission (in this case a simple interval).
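The laziness can be checked with a small synchronous sketch. It repeats the primitive Observable class from above so that it runs on its own; the log array and the lazyObx name are assumptions made up for the illustration.

```javascript
// Same primitive Observable as in the article, repeated to keep the sketch self-contained.
class Observable {
  constructor(initFunc) {
    this.initFunc = initFunc;
  }
  subscribe(observer) {
    return this.initFunc(observer);
  }
}

const log = [];

const lazyObx = new Observable(observer => {
  log.push('initFunc called');
  observer.next('hello');
});

// Nothing has run yet: creating the Observable only stores the function.
log.push('observable created');

// Each subscription runs initFunc again, independently of the others.
lazyObx.subscribe({ next: val => log.push(`first got ${val}`) });
lazyObx.subscribe({ next: val => log.push(`second got ${val}`) });

console.log(log);
```

The 'observable created' entry comes first in the log, and 'initFunc called' appears once per subscription, which is what "lazy" means here.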
Nice! But now we have a problem: if we call next after complete inside our init function, values will be emitted to the Observer nonetheless. We want to stop event emission after a complete call instead.
To make the problem explicit, look at the snippet below.
const myIntervalObx = new Observable(observer => {
  let counter = 0
  setInterval(() => observer.next(++counter), 700)
  setTimeout(() => observer.complete(), 2500)
})

myIntervalObx.subscribe({
  next: (val) => console.log(val),
  error: (err) => console.log(err),
  complete: () => console.log('Completed!')
})
// 1
// 2
// 3
// Completed! <- observable calls 'complete'
// 4 <- values are still emitted
// 5
// ...
As shown above, after 2500ms the Observable completes, but the next invocation inside the interval callback is still active and running.
To avoid this problem, let’s make a safe observer called Subscriber by wrapping the Observer itself in a proxy-like object. The Subscriber checks whether complete (or error) has already been called and, if so, stops propagating events to the wrapped Observer.
// a safe wrapper around observers
class Subscriber {
  constructor(observer) {
    this.observer = observer;
    this.closed = false;
  }
  next(value) {
    if (!this.closed) {
      this.observer.next(value);
    }
  }
  error(err) {
    if (!this.closed) {
      this.closed = true;
      this.observer.error(err);
    }
  }
  complete() {
    if (!this.closed) {
      this.closed = true;
      this.observer.complete();
    }
  }
}
Now we need to change the subscribe method of the Observable class.
class Observable {
  //...
  subscribe(observer) {
    const subscriber = new Subscriber(observer)
    return this.initFunc(subscriber)
  }
}

// after completion, the events will not propagate further
myIntervalObx.subscribe({ /* same as before */ })
// 1
// 2
// 3
// Completed! <- stops here
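The guard can also be verified without timers. The sketch below repeats the Subscriber class and drives it by hand; the received array and the safe variable are names assumed for this illustration only.

```javascript
// Same Subscriber as in the article, repeated to keep the sketch self-contained.
class Subscriber {
  constructor(observer) {
    this.observer = observer;
    this.closed = false;
  }
  next(value) {
    if (!this.closed) {
      this.observer.next(value);
    }
  }
  error(err) {
    if (!this.closed) {
      this.closed = true;
      this.observer.error(err);
    }
  }
  complete() {
    if (!this.closed) {
      this.closed = true;
      this.observer.complete();
    }
  }
}

const received = [];
const safe = new Subscriber({
  next: val => received.push(val),
  error: err => received.push(`error: ${err}`),
  complete: () => received.push('Completed!')
});

safe.next(1);
safe.next(2);
safe.complete();
safe.next(3);        // ignored: the subscriber is already closed
safe.error('late');  // ignored as well
safe.complete();     // a second completion is not forwarded either

console.log(received);
```

Only the two values and a single 'Completed!' reach the wrapped Observer; everything sent after the first terminal notification is dropped.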
We have solved the problem! Well, not completely. Although the event propagation is stopped, the interval is still running and consuming resources. Let’s see how to solve this issue.
Teardown Logic
Let’s make the problem explicit by adding a log inside our interval callback.
const myIntervalObx = new Observable(observer => {
  let counter = 0
  setInterval(() => {
    counter++
    console.log(`Still active. Current value: ${counter}`)
    observer.next(counter)
  }, 700)
  setTimeout(() => observer.complete(), 2500)
})

myIntervalObx.subscribe({ /* ... */ })
// Still active. Current value: 1 <- from the interval callback
// 1 <- from the Observer
// Still active. Current value: 2
// 2
// Still active. Current value: 3
// 3
// Completed!
// Still active. Current value: 4 <- interval still running after ‘complete’
// Still active. Current value: 5
// ...
We need a way to clean up the resources used by Observables. We’ll return a function from initFunc that executes the teardown logic. We’ll call this function "subscription", and it will in turn be returned from the subscribe method.
const myIntervalObx = new Observable(observer => {
  let counter = 0
  let id = setInterval(() => {
    counter++
    console.log(`Still active. Current value: ${counter}`)
    observer.next(counter)
  }, 700)
  setTimeout(() => observer.complete(), 2500)
  // return the teardown logic
  return () => {
    console.log('Teardown logic')
    clearInterval(id)
  }
})

const subscription = myIntervalObx.subscribe({ /* same as before */ })
// logs...
subscription() // <- stops the interval
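The same idea works for any resource, not only intervals. Here is a sketch that swaps the timer for a hypothetical in-memory source (the listeners set and the emit helper are made up for this example), so the effect of the returned teardown function can be observed synchronously. For brevity it uses the primitive Observable without the Subscriber wrapper.

```javascript
// Primitive Observable (no Subscriber wrapper), enough to show the teardown idea.
class Observable {
  constructor(initFunc) {
    this.initFunc = initFunc;
  }
  subscribe(observer) {
    return this.initFunc(observer); // the returned value is the teardown function
  }
}

// Hypothetical in-memory source: a set of listeners plus a helper to notify them.
const listeners = new Set();
const emit = value => listeners.forEach(listener => listener(value));

const sourceObx = new Observable(observer => {
  const listener = value => observer.next(value);
  listeners.add(listener);             // acquire the resource
  return () => {
    listeners.delete(listener);        // release it in the teardown logic
  };
});

const received = [];
const teardown = sourceObx.subscribe({ next: val => received.push(val) });

emit('a');    // delivered
teardown();   // the listener is removed from the source
emit('b');    // nobody is listening anymore

console.log(received, listeners.size);
```

After the teardown function runs, the source holds no reference to the listener, so later emissions cost nothing.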
We are almost done! I’m sure you noticed another issue: the completion of the Observable and the teardown logic are independent of each other. Do we like this behaviour? Not at all!
It doesn’t make sense to keep wasting resources on a completed Observable and, vice versa, it doesn’t make sense to run the teardown logic without closing the Observable too. So, the last step to complete our Observable implementation is to sync the completion with the teardown logic. We need to create the Subscription class to handle this task and to improve the management of teardown functions.
Subscriptions
The Subscription class is a container for functions, including the old subscription function. All of these functions are called when the unsubscribe method is invoked. Here’s the implementation.
// a container for functions
class Subscription {
  constructor() {
    this.teardowns = [];
  }
  add(teardown) {
    // ignore non-function values, e.g. when initFunc returns nothing
    if (typeof teardown === 'function') {
      this.teardowns.push(teardown);
    }
  }
  unsubscribe() {
    this.teardowns.forEach(teardown => teardown())
    this.teardowns = [];
  }
}
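A quick way to get a feel for the container is to use it on its own. This sketch repeats a minimal Subscription class and registers two hypothetical teardown functions that only record their names in a calls array (an assumption made for the illustration).

```javascript
// Minimal Subscription, repeated to keep the sketch self-contained.
class Subscription {
  constructor() {
    this.teardowns = [];
  }
  add(teardown) {
    this.teardowns.push(teardown);
  }
  unsubscribe() {
    this.teardowns.forEach(teardown => teardown());
    this.teardowns = [];
  }
}

const calls = [];
const subscription = new Subscription();

subscription.add(() => calls.push('first'));
subscription.add(() => calls.push('second'));

subscription.unsubscribe(); // runs both functions, in insertion order
subscription.unsubscribe(); // does nothing: the list was emptied by the first call

console.log(calls);
```

Because the list is reset after the first call, unsubscribing twice is harmless: each teardown runs at most once.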
In order to couple the completion logic (observer.complete) and the teardown logic (returned from the init function), both the Subscription and the Subscriber must be able to trigger both functions.
Now, bear with me for a minute! 😁 To synchronize the two pieces of logic we must:
- provide the Subscription to the Subscriber constructor, which will add the Observer completion logic to the Subscription container
- enable the Subscriber to call unsubscribe on completion
- add the teardown logic (returned from initFunc) to the Subscription container
- return the Subscription from the Observable subscribe method (same as before)
Remember that here the Subscription object is passed by reference.
Now both the unsubscription from the outside and the completion of the Observable will execute both the completion logic and the teardown logic. To clarify the idea, look below at the refactored Observable and Subscriber classes.
class Observable {
  constructor(initFunc) {
    this.initFunc = initFunc;
  }
  subscribe(observer) {
    const subscription = new Subscription()
    const subscriber = new Subscriber(observer, subscription) // <- passed by reference
    const teardown = this.initFunc(subscriber)
    // 3. add the teardown logic to the Subscription instance
    subscription.add(teardown) // <- second function inside the subscription
    return subscription
  }
}

// a safe wrapper around observers
class Subscriber {
  constructor(observer, subscription) {
    this.observer = observer;
    this.closed = false;
    this.subscription = subscription
    // 1. add an Observer completion logic to the Subscription container
    this.subscription.add(() => this.closed = true) // <- first function inside the subscription
  }
  next(value) {
    if (!this.closed) {
      this.observer.next(value);
    }
  }
  error(err) {
    if (!this.closed) {
      this.closed = true;
      this.observer.error(err);
      // 2. enable the Subscriber to call `unsubscribe` on completion
      this.subscription.unsubscribe() // <- unsubscribe on error
    }
  }
  complete() {
    if (!this.closed) {
      this.closed = true;
      this.observer.complete();
      this.subscription.unsubscribe() // <- unsubscribe on completion
    }
  }
}
To summarize, the subscriber calls unsubscribe on completion or error, causing both stored functions to run; the same happens if the caller (the developer) calls the unsubscribe method from the outside. To be more precise, in the latter case the subscriber closes by setting the flag this.closed to true, but it doesn’t actually call the complete method of the Observer. The original RxJS library behaves the same way.
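Both paths can be compared side by side in a synchronous sketch. It repeats the three classes in compact form and, instead of an interval, uses a hypothetical in-memory source (the listeners set, the emit helper and the 'done' sentinel value are all assumptions made for this example). Note that every init function here returns a teardown function and completes only after subscribe has returned.

```javascript
class Subscription {
  constructor() { this.teardowns = []; }
  add(teardown) { this.teardowns.push(teardown); }
  unsubscribe() {
    this.teardowns.forEach(teardown => teardown());
    this.teardowns = [];
  }
}

class Subscriber {
  constructor(observer, subscription) {
    this.observer = observer;
    this.closed = false;
    this.subscription = subscription;
    this.subscription.add(() => (this.closed = true));
  }
  next(value) {
    if (!this.closed) this.observer.next(value);
  }
  error(err) {
    if (this.closed) return;
    this.closed = true;
    this.observer.error(err);
    this.subscription.unsubscribe();
  }
  complete() {
    if (this.closed) return;
    this.closed = true;
    this.observer.complete();
    this.subscription.unsubscribe();
  }
}

class Observable {
  constructor(initFunc) { this.initFunc = initFunc; }
  subscribe(observer) {
    const subscription = new Subscription();
    const subscriber = new Subscriber(observer, subscription);
    subscription.add(this.initFunc(subscriber));
    return subscription;
  }
}

// Hypothetical source: emitting the sentinel 'done' makes the Observable complete.
const listeners = new Set();
const emit = value => listeners.forEach(listener => listener(value));
const makeObserver = log => ({
  next: val => log.push(`next ${val}`),
  error: err => log.push(`error ${err}`),
  complete: () => log.push('complete')
});
const makeObx = log => new Observable(observer => {
  const listener = val => (val === 'done' ? observer.complete() : observer.next(val));
  listeners.add(listener);
  return () => {
    log.push('teardown');
    listeners.delete(listener);
  };
});

// Path 1: the developer unsubscribes from the outside.
const outsideLog = [];
const sub = makeObx(outsideLog).subscribe(makeObserver(outsideLog));
emit(1);
sub.unsubscribe(); // teardown runs, but `complete` is NOT called
emit(2);

// Path 2: the Observable completes by itself.
const insideLog = [];
makeObx(insideLog).subscribe(makeObserver(insideLog));
emit(1);
emit('done');      // `complete` is called, then the teardown runs
emit(2);

console.log(outsideLog, insideLog);
```

In both cases the teardown runs exactly once and no value is delivered afterwards; the only difference is whether the Observer's complete callback fires.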
We have synchronized the teardown logic with the completion of the Observable. Now we are really done! 😁
All the other parts of RxJS will be an extension of this logic, as you will see with pipeable operators in the next article!
Conclusions
We have implemented the first part of our version of RxJS by creating the Observable class. The logic in charge of generating and emitting values is provided to Observables through an init function. In a real-world scenario, RxJS provides us with creation operators to easily generate Observables for a lot of sync and async cases.
The examples below both return an Observable.
// CREATION OPERATORs
const fromEvent = (eventTarget, eventType) => {
  return new Observable(observer => {
    const eventHandler = e => observer.next(e)
    eventTarget.addEventListener(eventType, eventHandler)
    return () => {
      eventTarget.removeEventListener(eventType, eventHandler)
    }
  })
}

const interval = (period) => {
  return new Observable(observer => {
    let counter = 0
    const id = setInterval(() => observer.next(++counter), period)
    return () => {
      clearInterval(id)
    }
  })
}
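As a usage sketch, the toy fromEvent can be tried outside the browser too. The example below assumes a runtime that exposes the standard EventTarget and Event globals (browsers and recent Node.js versions do), repeats the classes in compact form, and uses a made-up 'ping' event type.

```javascript
class Subscription {
  constructor() { this.teardowns = []; }
  add(teardown) { this.teardowns.push(teardown); }
  unsubscribe() {
    this.teardowns.forEach(teardown => teardown());
    this.teardowns = [];
  }
}

class Subscriber {
  constructor(observer, subscription) {
    this.observer = observer;
    this.closed = false;
    this.subscription = subscription;
    this.subscription.add(() => (this.closed = true));
  }
  next(value) {
    if (!this.closed) this.observer.next(value);
  }
  error(err) {
    if (this.closed) return;
    this.closed = true;
    this.observer.error(err);
    this.subscription.unsubscribe();
  }
  complete() {
    if (this.closed) return;
    this.closed = true;
    this.observer.complete();
    this.subscription.unsubscribe();
  }
}

class Observable {
  constructor(initFunc) { this.initFunc = initFunc; }
  subscribe(observer) {
    const subscription = new Subscription();
    const subscriber = new Subscriber(observer, subscription);
    subscription.add(this.initFunc(subscriber));
    return subscription;
  }
}

// The toy creation operator from the article.
const fromEvent = (eventTarget, eventType) =>
  new Observable(observer => {
    const eventHandler = e => observer.next(e);
    eventTarget.addEventListener(eventType, eventHandler);
    return () => eventTarget.removeEventListener(eventType, eventHandler);
  });

// 'ping' is an arbitrary event type chosen for this example.
const target = new EventTarget();
const types = [];
const pingSubscription = fromEvent(target, 'ping').subscribe({
  next: event => types.push(event.type),
  error: err => console.log(err),
  complete: () => console.log('Completed!')
});

target.dispatchEvent(new Event('ping'));
target.dispatchEvent(new Event('ping'));
pingSubscription.unsubscribe();          // removes the event listener
target.dispatchEvent(new Event('ping')); // not delivered anymore

console.log(types);
```

Only the two events dispatched before unsubscribe reach the Observer, which shows the teardown returned by the creation operator being executed through the Subscription.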
RxJS has a lot more to offer. Pipeable operators are the next big thing to integrate into our library. These operators allow us to chain multiple Observables to easily handle complex streams of data.
Reactive programming is a complex topic, but I think that a solid understanding of the building blocks is the proper way to learn RxJS. I hope this article has helped you!
See you in part two! 😁
PS: English is not my mother tongue, so errors are just around the corner. Feel free to comment with corrections!