DEV Community

Joseph Avery Ferrante
Joseph Avery Ferrante

Posted on

RxJS - Simplifying Complex Operator Chains

RxJS is a powerful library that allows multiple operators to be applied to data coming from the Observable stream. While the library greatly simplifies working with asynchronous data, it can still introduce complex and hard to read code. In this article we will explore a strategy for simplifying a chain of complex operations into a format that is easier to understand.

Assumptions

Familiarity with RxJS, Observables, and using the pipe() function to apply multiple operators to data emitted through the stream.

Base Problem

Imagine we have an Observable stream where we want to transform the data that is coming through:

interval(500).pipe(
    filter((num: number) => num % 2 === 0),
    take(10)
    map((num: number) => num * 10),
);

Here we use the interval function to emit values every 500 milliseconds.
Looking at the operators in the pipe function, you can derive that we are

  1. Allowing only even numbers to pass
  2. Taking a total of 10 values before completing
  3. Multiplying each number by 10

While this example may be simple and contrived, you must realize that understanding what is happing within the pipe() function takes effort and time. If you extract this simple example to a real world use case with multiple operators performing application logic, it can quickly become complex and take quite a bit of effort, especially for new members, to grasp what is happening with the data.

Solution - Version 1

We can actually create small, bite sized, and well named functions to house our transformation logic. Let's focus on the first operator in the pipe chain:

filter((num: number) => num % 2 === 0)

We can refactor this into an encapsulating function:

private function takeEvenNumbers() {
    return filter((num: number) => num % 2 === 0);
}

which can then be used within the original observable chain:

interval(500).pipe(
    this.takeEvenNumbers(),
    take(10)
    map((num: number) => num * 10),
);

Already this simplifies the process of introducing someone to the logic within the operator chain, but we can go further.

Solution - Version 2

Just as we can have multiple operators in a pipe() function attached to the Observable, we can also return multiple operators from our encapsulating functions. Let's refactor the first two operators from our original pipe chain:

private function take10EvenNumbers() {
    return pipe(
        filter((num: number) => num % 2 === 0),
        take(10)
    );
}

NOTE: the pipe function used here is imported from 'rxjs' (import { pipe } from 'rxjs')

We can now rewrite the original Observable as such:

interval(500).pipe(
    this.take10EvenNumbers(),
    map((num: number) => num * 10),
);

Depending on the data transformation happening/level of granularity the developer desires, you can use these self created operator functions to build up other operator functions:

private function takeEvenNumbers() {
    return filter((num: number) => num % 2 === 0);
}

private function take10EvenNumbers() {
    return pipe(
        this.takeEvenNumbers(),
        take(10)
    );
}

interval(500).pipe(
    this.take10EvenNumbers(),
    map((num: number) => num * 10),
);

Solution - Version 3

Looking at the previous solution, while improved, is potentially too rigid or specific. The function take10EvenNumbers(), while useful here, could be generalized for use elsewhere. We can achieve such:

private function takeXEvenNumbers(amount: number) {
    return pipe(
        filter((num: number) => num % 2 === 0),
        take(amount)
    );
}

We now have flexibility, allowing us to take any amount of even numbers.

interval(500).pipe(
    this.takeXEvenNumbers(10),
    map((num: number) => num * 10),
);

Conclusion

Using the method described above, we can abstract potentailly complex & confusing logic into bite sized and understandable chunks. The onus for deciding what granularity/abstraction level is useful falls to the developer.

interval(500).pipe(
    this.takeXEvenNumbersAndMultiplyBy(10, 10)
);

The above may or may not be useful, but that is an exercise for the development team. As a final, more real world example, imagine facing this:

this.clientService.getServersByDealerId(dealerId).pipe(
      pluck('results'),
      mergeMap((objArr: Server[]) => timer(2000).pipe(mapTo(objArr))),
      mergeMap((objArr: Server[]) => {
        let observableArray = [];
        objArr.forEach(client => {
          observableArray.push(this.livestreamService.getMediaSourcesByServerId(client.id).pipe(
            map(objArr => {
              objArr.forEach(o => o.hostClientId = client.name)
              return objArr
            })
          ))
        })
        return forkJoin(observableArray);
      }),
      map((camerasInServers: Camera[][]) => {
        return camerasInServers
          .filter(sub => sub.length !== 0)
          .map(sub => {
            let container: CameraContainer = {
              hostClientId: sub[0].hostClientId,
              cameras: sub
            }
            return container;
          });
      }),
      distinctUntilChanged((p: CameraContainer[], q: CameraContainer[]) => JSON.stringify(p) === JSON.stringify(q))
    )

versus facing this

this.clientService.getServersByDealerId(dealerId).pipe(
    pluck('results'),
    this.emitServerResultsEvery(2000),
    this.getCamerasFromServers(),
    this.mapToCameraContainer(),
    this.emitChangedCameras()
)

The second is easier to read, debug, and understand what is going on. Reading the second one you may derive relatively quickly what is happening: a request for server objects that contain IDs needed for polling child object changes.

Top comments (2)

Collapse
 
sargalias profile image
Spyros Argalias

Completely agree.

I usually do this technique, and not just for RxJS. It's better to read a name of what a function does than having to parse every single thing, such as your final "real world" example.

To nitpick a little bit, I probably wouldn't have a single composed function in pipe. I would try to name the stream well and keep the functions in pipe simple. This way the name of the stream gives sufficient information and we can peek into the stream for more. However with a pipe that has a single composed function, that doesn't offer more information than the stream name itself.

Something like:

const take10EvenIntervalsMultipledBy10$ = interval(500).pipe(
    filter(isEven),
    take(10)
    map(multiplyBy10),
);
Collapse
 
averyferrante profile image
Joseph Avery Ferrante • Edited

Very good point. In my examples I'm not naming the stream variable, but that is definitely also a good source for code clarity/self commenting code.

Also I agree that only having one function in the pipe chain is probably overkill, but really wanted to drive the point home.

To expand on this idea just a bit, you can still make a function more flexible in this case (multiplyBy10 is a bit rigid) by using a factory function:

function multiplyBy(amount: number) {
  return (numberFromStream: number) => amount * numberFromStream;
}

interval(500).pipe(
  map(multiplyBy(10))
)

Thanks for providing a bit of an alternative!