DEV Community

Cover image for How I created an Observable library - part 1
Thomas Toledo-Pierre
Thomas Toledo-Pierre

Posted on

How I created an Observable library - part 1

TL;DR

I created a small library called FleuveJS. This post shares how I've come to create it and how it has evolved. Also, I am looking for your feedback on it :-)


Two years ago, I was working on an Angular project (I always work on Angular projects). For those of you who do not know: Angular comes with RxJs, an awesome library to manipulate Observables.

I have always found this library pretty remarkable: the diversity of operators and the structures you can manipulate (Observables, Subjects, BehaviorSubjects) allow developers to write reactive and powerful code. And as I was working on my Angular project, I started to wonder: how hard would it be to write an RxJs-like library? I was really curious about the challenges they would face and how they would overcome it.

Also, I was wondering: would it be possible to have an infinite source of data with Observables?

And so, I started my project.

What should be the MVP?

My first question was: what should be the minimum my library should be able to do?

I knew I wanted to manipulate Observables. What can we do with an Observable ?

To me, an Observable should allow the following operations:

  • subscribe
  • next
  • pipe

I never used the forEach method so I skipped it. The next method was because I wanted to have something mutable.

There it was. My first version should propose to developers a simple structure with three methods: subscribe, next and pipe.

The technical stack

I wanted to keep it as simple as possible: I only used TypeScript to develop this library, webpack for the bundling and Jest for testing.

The first implementation

TDD all the way!

I also wanted to try TDD, so I started by writing a test.

The first ones were the following

  • As an Observable, it:
    • should create a new Observable with no emitting value;
    • should create a new Observable with an emitting value;
  • subscribe method, it:
    • should throw an error if the wrong argument is passed;
    • should add a subscriber to the list of subscribers;
    • should not execute the subscriber if there is no emitting value;
    • should execute the subscriber if there is at least one emitting value;
  • pipe method, it:
    • should return a new Observable with no value if the initial Observable is empty;
    • should return a new Observable with the original Observable's value;

Since I had no operators yet, I did not see any other tests I could implement.

Of course, the tests were failing, so now it was time to implement the Observable class.

The Observable class

First of all, the Observable class should have a generic parameter.

class  Observable<T = never>
Enter fullscreen mode Exit fullscreen mode

Then, it should have an inner sequence, which would be an array of items of type T.

private _innerSequence!: T[];
Enter fullscreen mode Exit fullscreen mode

So far, this simple class:


class  Observable<T = never> {
    private _innerSequence!: T[];
    constructor(...initialSequence: T[]) {
        this._innerSequence = initialSequence;
    }
}
Enter fullscreen mode Exit fullscreen mode

allows to create an observable with some values in it. The problem is: we still cannot pipe it, nor subscribe to it.

The Subscriptions and the Subscribers

How can a Subscriber be represented?
If we take a look at RxJs, we can see that a Subscriber should have three methods: one to handle the emitted values, one to handle the errors, and one to handle when an Observable is completed (i.e: the sequence last element has been emitted).

A simple interface is enough, with an interface for each method as well:

interface  Subscriber<T = any> {
    next?: OnNext<T>, error?: OnError, complete?: OnComplete
}
interface  OnNext<T> { (t: T): void; }
interface  OnError { (err: Error): void }
interface  OnComplete { (): void };
Enter fullscreen mode Exit fullscreen mode

Also, everytime someone subscribes to an Observable, they should receive a Subscription in order to manage if they want to unsubscribe or not. Hence the following class:

class  Subscription {
    constructor(private  _unsubscribeCallback?: UnsubscribeCallback) {}
    unsubscribe(): void {
    this._unsubscribeCallback && this._unsubscribeCallback();
    }
}
Enter fullscreen mode Exit fullscreen mode

Hold on a minute! What is this UnsubscribeCallback and why is it optional?

Let's see how our Observable class should behave when someone subscribes to it.
Everytime there is a subscribing, we should:

  • check if the subscriber is actually a Subscriber (is either of type OnNext<T> or Subscriber<T>), else throw an error;
  • if there is no subscriber passed as an argument, we should read the inner sequence without executing anything;
  • if there is an actual subscriber, add it to the list of subscribers of the Observable and execute its methods according to the inner sequence;
  • finally, we should return a new Subscription.

And when this Subscription has its unsubscribe method called, we should finally remove the corresponding Subscriber of the Observable list of subscribers. A simple way to do it is by passing to the Subscription constructor a callback that will simply filter out the corresponding Subscriber, like so:

() => (this._subscribers = this._subscribers.filter((s) =>  s !== subscriber))
Enter fullscreen mode Exit fullscreen mode

This callback will then be executed when the unsubscribe method is called.
Now, why is it optional?

There is a case where we do not need any callback to be executed: it is when one subscribes to an Observable without providing any Subscriber. In this particular case, we won't add anything to our list of subscribers.

pipe and the operators

At this step of my project, I need to implement operators so my Observables can be transformed.
An operator, such as map, filter, tap or even switchMap, is nothing but a function taking a callback as an input, and returning a result as an input. The result can be anything: a number, a string, a boolean, void, or even another Observable.

The callback can have a generalized representation we can call OperatorFunction. There are operators that will take an argument of type T, and output a result of type U, such as U can be anything, including T. And there are operators that will take an argument of type T, but won't necessarily output anything.

Given this information, we have a type OperatorFunction<T, U = never> = (source: T) => U.

Now, there is a tricky case (there are many but this one was very challenging for me): the switchMap operator.
This operator takes a value of type T, and returns an Observable of type U. The question is: how can I use the pipe method with any operators I want, with each operator not having to care if the previous returns a new Observable or not?

In short, the following code should work:

const obs$ = of(12, 13, 14)
    .pipe(
    map((x) => x + 1), //will output 13, 14, 15 
    switchMap((x) => of(x - 3)), //will output Observable(10), Observable(11), Observable(12)
    filter((x) => x > 10) //will output false, true, true
    );
Enter fullscreen mode Exit fullscreen mode

But how can the filter operator receive the values wrapped in the Observables returned by the switchMap operator, and not the actual Observables?

To answer that, I chose to specify the result of every operator as an OperationResult. An OperationResult should bear a value (the result), as well as an error if the operation threw one, and a flag.

What are flags for?
I saw four cases:

  • the result comes from an operator such as switchMap and mus actually be unwrapped;
  • the previous operator was a predicate that wasn't matched and the rest of the operation pipeline must stop for the whole inner sequence;
  • the previous operator was a predicate that wasn't matched and the rest of the operation pipeline will be skipped to the next inner sequence item;
  • the previous operator threw an error.

It gives us the following enum:

enum OperationResultFlag {
    UnwrapSwitch = 'UnwrapSwitch',
    MustStop = 'MustStop',
    FilterNotMatched = 'FilterNotMatched',
    OperationError = 'OperationError',
}
Enter fullscreen mode Exit fullscreen mode

As for the OperationResult, it's a class, implemented as follow:

class  OperationResult<T> {
    constructor(private  _value: T, private  _flag?: OperationResultFlag, private  _error?: Error) {}

get  value() {return  this._value;}

get  flag() {return  this._flag;}

get  error() {return  this._error;}

isUnwrapSwitch(): boolean {
    return  this._flag === OperationResultFlag.UnwrapSwitch;
}
isMustStop(): boolean {
    return  this._flag === OperationResultFlag.MustStop;
}
isFilterNotMatched(): boolean {
    return  this._flag === OperationResultFlag.FilterNotMatched;
}
isOperationError(): boolean {
    return  this._flag === OperationResultFlag.OperationError
}
Enter fullscreen mode Exit fullscreen mode

Now that we have a class for our operation results, we can actually combine multiple operators. In our Observable class, we will internally check after each operation if the result is flagged, and according to it what we need to do.

Follow for the next part

In the next part, I will explain how I came up with more specific Observable implementations such as MutableObservable or ObservableFork.

These implementations implied a lot of change and thinking in my code, and are still evolving.

Anyway, I hope you liked this article, do not hesitate to leave your feedback!

Top comments (0)