DEV Community

Jay Vincent

Creating Observables from an array of sequentially completing Promises

A super-quick post from me today, as I wanted to share some newly acquired knowledge.

Let's say you have an array of data, userIds in this example:

export class PageComponent {
  userIds = ['user_1', 'user_2', 'user_3'];
}

And you want to create an Observable stream of User objects from an API request, where each API request is run in sequence, i.e. the request for user_2 won't be made until the request for user_1 has resolved. How would you do it?

Let's assume we have a service method, userService.getUser(userId), which returns a Promise of our User object.

You may be tempted to map over the userIds and use Promise.all like so:

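export class PageComponent {
  userIds = ['user_1', 'user_2', 'user_3'];
  users: User[];

  // An async method returns a Promise, so the return type is Promise<void>
  async ngOnInit(): Promise<void> {
    // map() starts every request immediately; Promise.all waits for all of them
    this.users = await Promise.all(
      this.userIds.map(userId => this.userService.getUser(userId))
    );
  }
}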

However, this starts all of the requests at the same time, and this.users isn't set until every one of the promises has resolved.
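To see that concurrency for yourself, here is a minimal sketch in plain TypeScript. The slowGetUser function and the User shape are hypothetical stand-ins for userService.getUser and the real User type; the function just records when each request starts and finishes.

```typescript
// Hypothetical User shape, for illustration only.
interface User {
  id: string;
}

// Hypothetical stand-in for userService.getUser that records start/finish order.
function slowGetUser(id: string, log: string[]): Promise<User> {
  log.push(`start ${id}`); // pushed synchronously, as soon as the call is made
  return new Promise<User>(resolve =>
    setTimeout(() => {
      log.push(`done ${id}`);
      resolve({ id });
    }, 10)
  );
}

const concurrentLog: string[] = [];
const userIds = ['user_1', 'user_2', 'user_3'];

// map() invokes slowGetUser for every id straight away, so all three
// requests are in flight before the first one resolves.
const allUsers: Promise<User[]> = Promise.all(
  userIds.map(id => slowGetUser(id, concurrentLog))
);

allUsers.then(users => {
  // The three "start" entries come before any "done" entry.
  console.log(concurrentLog);
  console.log(users.length); // 3
});
```

Every "start" entry is logged before any "done" entry, which is exactly the behaviour we're trying to avoid here.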

Observables are a much better pattern for this kind of use-case.

What we want to do is create an Observable from the userIds array with the rxjs from function, then use the concatMap operator to map each userId to an inner observable (or promise in this case), which won't be subscribed to until the previous one has completed.

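import { from, Observable } from 'rxjs';
import { concatMap } from 'rxjs/operators';

export class PageComponent {
  userIds = ['user_1', 'user_2', 'user_3'];
  users$: Observable<User>;

  ngOnInit(): void {
    this.users$ = from(this.userIds).pipe(
      // The next getUser call isn't made until the previous promise has resolved
      concatMap(userId => this.userService.getUser(userId))
    );
  }
}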

When subscribed to, this observable will request and emit each user in sequence, emitting 3 User objects over time before completing.
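If it helps to see that ordering without RxJS, here is a promise-only sketch. It's not how concatMap is implemented; emitInSequence, fakeGetUser and the User shape are hypothetical stand-ins, used only to show the order of requests and emissions.

```typescript
// Hypothetical User shape, for illustration only.
interface User {
  id: string;
}

// Hypothetical stand-in for userService.getUser: logs when the request is made.
function fakeGetUser(id: string, log: string[]): Promise<User> {
  log.push(`request ${id}`);
  return new Promise<User>(resolve => setTimeout(() => resolve({ id }), 5));
}

// Promise-only illustration of the ordering: onUser is called for each
// user in turn, and only one request is in flight at a time.
async function emitInSequence(
  userIds: string[],
  log: string[],
  onUser: (user: User) => void
): Promise<void> {
  for (const id of userIds) {
    const user = await fakeGetUser(id, log); // wait before making the next request
    onUser(user);
  }
}

const sequenceLog: string[] = [];
const sequenceDone: Promise<void> = emitInSequence(
  ['user_1', 'user_2', 'user_3'],
  sequenceLog,
  user => sequenceLog.push(`emit ${user.id}`)
);

sequenceDone.then(() => {
  // request user_1, emit user_1, request user_2, emit user_2, request user_3, emit user_3
  console.log(sequenceLog);
});
```

Each request only appears in the log after the previous user has been emitted.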

Bonus points - use scan to accumulate values over time

Let's say you wanted the Observable to build up an array of User objects over time and not just emit individual User objects. Let's use the rxjs scan operator!

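import { from, Observable } from 'rxjs';
import { concatMap, scan } from 'rxjs/operators';

export class PageComponent {
  userIds = ['user_1', 'user_2', 'user_3'];
  users$: Observable<User[]>;

  ngOnInit(): void {
    this.users$ = from(this.userIds).pipe(
      concatMap(userId => this.userService.getUser(userId)),
      // Return a new array each time; push() would return the new length, not the array
      scan((acc: User[], curr: User) => [...acc, curr], [] as User[])
    );
  }
}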

Et voilà! Simply loop over users$ | async in your template and watch your user list grow.
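For intuition, scan behaves like reduce, except that it emits every intermediate accumulator rather than only the final one. The sketch below mimics that in plain TypeScript with a hypothetical scanArray helper (not part of RxJS), using string ids in place of User objects. It also shows why the accumulator has to return the array itself: Array.prototype.push returns the new length.

```typescript
// Hypothetical helper: like Array.prototype.reduce, but it keeps every
// intermediate accumulator, roughly what scan emits over time.
function scanArray<T, A>(
  items: T[],
  accumulate: (acc: A, curr: T) => A,
  seed: A
): A[] {
  const emitted: A[] = [];
  let acc = seed;
  for (const item of items) {
    acc = accumulate(acc, item);
    emitted.push(acc);
  }
  return emitted;
}

const ids = ['user_1', 'user_2', 'user_3'];

// Returning a new array on every step gives a growing list per emission.
const growing = scanArray<string, string[]>(ids, (acc, curr) => [...acc, curr], []);
console.log(growing);
// [['user_1'], ['user_1', 'user_2'], ['user_1', 'user_2', 'user_3']]

// push() returns the new length, so it can't be used as the return value.
const pushResult = ([] as string[]).push('user_1');
console.log(pushResult); // 1
```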

The power and magic of RxJS and Observables, everybody :)

Top comments (3)

John Peters

Nice. I recently did a similar thing but had to delay each outbound request by 1 second to avoid the browser running out of memory. I used the delay operator with concatMap.

Jay Vincent

Thanks John. That's interesting - how many requests in total were you making?

John Peters

We are developing a program that calls an API endpoint which returns a large array of IDs for already finished jobs. But to get further job information, we currently have to send a separate GET request for each ID. We found that, without a delay, the browser would run out of memory in as little as 500 GET requests if I sent them all at once. The pipe with the delay did the trick. The total number of GET requests in our tests so far is over 1,700, but it's possible that over time this could go much higher. Clearly this is a design problem.

We will put in better query-ability on the back end starting next week. For example, "Get all jobs by job type and date range", "Get all failed jobs by date range", etc. And probably most important, we'll implement paging logic on the back end. We know that nobody needs more data than they can read one page at a time.