thomas for This is Angular

Posted on • Updated on • Originally published at Medium

Flattening and Transforming Observable Arrays with RxJs

RxJS is an open-source library used to handle streams of events. It's a very powerful tool, but it has a steep learning curve and can be tricky at times.

The issue we will discuss in this article arises when a stream carries a list of objects and we need to run another async operation on one property of each object. This can result in an Observable<Array<Observable<T>>>, which is awkward to subscribe to. We'll explore different solutions to overcome this challenge and transform it into an Observable<Array<T>>.

This article will also help you solve challenge #11 of Angular Challenges, which is intended for developers who already have a strong understanding of observables. However, anyone can read this article and learn from it. If you want to try the challenge first, I encourage you to do so and then come back to compare your solution with the guidance provided here. (You can also submit a PR that I'll review.)


Let's start with a very basic example to understand the issue more clearly.

readonly persons$ = this.service.get(selectPersons);
         // ^? Observable<Person[]> where Person = {id: number, name: string}

We have a readonly persons$ observable that retrieves a list of persons from our store, which could be NgRx, RxAngular, or a simple service using a Subject. In this example, the implementation details are hidden behind this.service.

The goal is to fetch the address of each person from our backend and return a list of persons with their corresponding addresses.

Imperative approach:

The approach I see most often in codebases is an imperative style that nests the subscribe callbacks, like this:

// Initialize the array, otherwise push() throws on undefined.
const personWithAddress: PersonWithAddress[] = [];

this.persons$.subscribe((persons) => {
  persons.forEach((person) => {
    this.http.getAddress(person.id).subscribe((address) => {
      personWithAddress.push({...person, ...address});
    });
  });
});

However, this approach is discouraged: it nests subscribes, never tells us when every address has arrived, and quickly becomes messy, even in this simple example.

Naive approach with RxJS:

Let's try again with a more reactive approach, using RxJS operators.

When I teach people how to handle this type of issue, this is the code I often see:

personWithAddress$ = this.persons$.pipe(
// ^? Observable<Observable<PersonWithAddress>>
  mergeMap((persons) =>
    // persons.map is the Array method, not the RxJS map operator
    persons.map((p) =>
      // each call returns an Observable that nothing ever subscribes to
      this.http.getAddress(p.id).pipe(map((address) => ({ ...p, ...address })))
    )
  )
);

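People often try mergeMap, switchMap, and similar operators here, hoping to flatten the nested Observables. This doesn't work because persons.map returns an array of Observables, not a single Observable: mergeMap treats that array as a stream of its elements and emits each inner Observable as a value without ever subscribing to it.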
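
To see the problem in isolation, here is a minimal sketch that runs outside Angular. The getAddress function and the sample persons are hypothetical stand-ins for this.http.getAddress and the store, and they resolve synchronously with of so the result can be inspected right after subscribing.

```typescript
import { Observable, isObservable, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

interface Person { id: number; name: string; }
interface Address { city: string; }

// Hypothetical stand-in for this.http.getAddress (assumption, not the real backend).
const getAddress = (id: number): Observable<Address> => of({ city: `City ${id}` });

const persons$: Observable<Person[]> = of([
  { id: 1, name: 'Ada' },
  { id: 2, name: 'Linus' },
]);

// mergeMap accepts the returned array as an ObservableInput and emits its
// elements one by one. Those elements are the inner Observables themselves.
const naive$ = persons$.pipe(
  mergeMap((persons) =>
    persons.map((p) =>
      getAddress(p.id).pipe(map((address) => ({ ...p, ...address })))
    )
  )
);

const emitted: unknown[] = [];
naive$.subscribe((value) => emitted.push(value));

// Every emitted value is still an Observable, not a person with an address.
const allObservables = emitted.every((value) => isObservable(value));
console.log(emitted.length, allObservables); // logs: 2 true
```

The subscriber receives two Observables and no address is ever requested, which is why this version appears to do nothing.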

forkJoin:

Another well-known option is forkJoin, a creation function often used with an array of Observables that complete (the word "complete" is crucial, as forkJoin only emits once all of its Observables have completed).

Here's an example using forkJoin:

  personWithAddress$ = this.persons$.pipe(
    mergeMap((persons) =>
      forkJoin(
        persons.map((p) =>
          this.http.getAddress(p.id).pipe(map((address) => ({ ...p, ...address })))
        )
      )
    )
  );

Hey, this code compiles and works as intended! However, it can be difficult to read and debug due to the deep nesting inside the block.
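
Here is a standalone sketch of the forkJoin version that can be run outside Angular. The getAddress function, the withAddresses helper, and the sample data are assumptions made for illustration. It also shows one edge case worth knowing: forkJoin called with an empty array completes without emitting, so an empty list of persons produces no value at all.

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

interface Person { id: number; name: string; }
interface Address { city: string; }
type PersonWithAddress = Person & Address;

// Hypothetical stand-in for this.http.getAddress (assumption).
const getAddress = (id: number): Observable<Address> => of({ city: `City ${id}` });

const addAddress = (p: Person): Observable<PersonWithAddress> =>
  getAddress(p.id).pipe(map((address) => ({ ...p, ...address })));

// Hypothetical helper wrapping the forkJoin pipeline from the article.
const withAddresses = (source$: Observable<Person[]>): Observable<PersonWithAddress[]> =>
  source$.pipe(mergeMap((persons) => forkJoin(persons.map(addAddress))));

// Two persons: one emission, results in the same order as the input array.
const emissions: PersonWithAddress[][] = [];
withAddresses(of([{ id: 1, name: 'Ada' }, { id: 2, name: 'Linus' }]))
  .subscribe((list) => emissions.push(list));

// Empty list: forkJoin([]) completes immediately and emits nothing.
const noPersons: Person[] = [];
const emptyEmissions: PersonWithAddress[][] = [];
withAddresses(of(noPersons)).subscribe((list) => emptyEmissions.push(list));

console.log(emissions.length, emptyEmissions.length); // logs: 1 0
```

If the view must still receive an empty array in that case, one option is to append defaultIfEmpty([]) to the inner forkJoin.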

The final trick:

One very important piece of advice to remember when working with streams of object arrays is to first flatten the structure to work with a stream of simple objects. Working with arrays can create a lot of overhead, while working with a simple stream of objects is what we are used to working with every day.

The two magical operators for achieving this are mergeAll and toArray. The first flattens each emitted array into a stream of simple objects, and the second collects that stream back into an array once it completes.

Here's an example of how we can use these operators in action:

  personWithAddress$ = this.persons$.pipe(
    mergeAll(), // flatten to Observable<Person>
    mergeMap((p) =>
      this.http.getAddress(p.id).pipe(map((address) => ({ ...p, ...address })))
    ),
    toArray() // back to Observable<PersonWithAddress[]>
  );

Notes:

  • Only one level of nesting
  • Working with a simple object is preferred over working with an array of objects.
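
One caveat is worth sketching: toArray only emits when its source completes, and mergeMap emits results in completion order, not input order. The sketch below first runs the pipeline above on a completing source, then shows a variant (my assumption, not part of the original solution) for a long-lived source such as a store. There the flatten and collect steps move into an inner pipeline with switchMap and from, so each emitted list produces its own array. The getAddress function, the Subject-based store, and the sample data are hypothetical.

```typescript
import { Observable, Subject, from, of } from 'rxjs';
import { map, mergeAll, mergeMap, switchMap, toArray } from 'rxjs/operators';

interface Person { id: number; name: string; }
interface Address { city: string; }
type PersonWithAddress = Person & Address;

// Hypothetical stand-in for this.http.getAddress (assumption).
const getAddress = (id: number): Observable<Address> => of({ city: `City ${id}` });

const addAddress = (p: Person): Observable<PersonWithAddress> =>
  getAddress(p.id).pipe(map((address) => ({ ...p, ...address })));

const people: Person[] = [{ id: 1, name: 'Ada' }, { id: 2, name: 'Linus' }];

// 1) The article's pipeline on a source that completes: toArray emits once.
const fromCompleting: PersonWithAddress[][] = [];
of(people)
  .pipe(mergeAll(), mergeMap(addAddress), toArray())
  .subscribe((list) => fromCompleting.push(list));

// 2) Variant for a source that never completes (a Subject standing in for a store).
//    from(persons) completes after the last item, so the inner toArray emits
//    once per incoming list even though store$ stays open.
const store$ = new Subject<Person[]>();
const fromStore: PersonWithAddress[][] = [];
store$
  .pipe(switchMap((persons) => from(persons).pipe(mergeMap(addAddress), toArray())))
  .subscribe((list) => fromStore.push(list));

store$.next(people);
store$.next([people[0]]);

console.log(fromCompleting.length, fromStore.map((list) => list.length)); // logs: 1 [ 2, 1 ]
```

The variant costs one extra level of nesting, so it is a trade-off: use the flat version when the source completes (an HTTP call, for example) and the inner pipeline when it does not.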

That's it for this article! I hope you have learned a new trick to ease your usage of RxJS and become a better reactive programmer. 🚀

You can find me on Twitter or GitHub. Don't hesitate to reach out to me if you have any questions.

Top comments (3)

AlejandroCuevas

Awesome! Great article and great tip! Many thanks for writing it!

I have one question: if this is a common pattern, would it make sense to create an operator that combines mergeAll, mergeMap (with the service call passed as a parameter) and toArray?

Something like:

source.pipe(
 flattenObsArray(this.getAddress(id).pipe(map)...)
)
 
thomas

Thanks 🙏
If you do this a lot in your project, and always in the same way, you can create your own operator.

AlejandroCuevas

Okay! Thanks for the answer!