DEV Community

George Knap

The catch in the RxJS catchError

Hi friends, George here.

Catching errors in RxJS streams can be troublesome.

Let's go through three different options and their outcomes for our demo use case.
Imagine we have a component that fetches data for different entities from the database. Fetch requests arrive over time as an RxJS stream; for our case, let's mock that with an RxJS interval:

const interval$ = interval(1000).pipe(skip(1));
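
A note on skip(1): interval starts counting at 0, so skip(1) makes the first emitted counter 1. The sketch below illustrates that effect. It assumes RxJS 7.2 or later and swaps interval(1000) for a synchronous range(0, 5), which is a substitution made here only so the result is available immediately.

```typescript
import { range, skip } from 'rxjs';

// Stand-in for interval(1000): range(0, 5) emits 0, 1, 2, 3, 4 synchronously.
const emittedCounters: number[] = [];

range(0, 5)
  .pipe(skip(1)) // drop the first value (0), as interval$ does
  .subscribe((counter) => emittedCounters.push(counter));

console.log(emittedCounters); // [1, 2, 3, 4]
```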

We also have an API service that is responsible for getting the data. However, our API is not reliable and throws an error from time to time:

import { Injectable } from '@angular/core';
import { Observable, of, throwError } from 'rxjs';

@Injectable({
  providedIn: 'root',
})
export class DemoService {
  /**
   * Mock of an API endpoint that throws an error whenever the counter is even.
   */
  getData(counter: number): Observable<string> {
    if (counter % 2 === 0) {
      return throwError(() => new Error());
    }

    return of(`some demo data ${counter}`);
  }
}
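
To see both outcomes of the mock outside Angular, here is a minimal sketch. It assumes RxJS 7.2 or later. The getData function is a standalone copy of the service method, the error message is added only to make the output readable, and outcomes is a hypothetical array used for illustration.

```typescript
import { Observable, of, throwError } from 'rxjs';

// Standalone copy of DemoService.getData so the sketch runs without Angular.
// The error message is an addition for readability; the original uses new Error().
function getData(counter: number): Observable<string> {
  if (counter % 2 === 0) {
    return throwError(() => new Error(`request ${counter} failed`));
  }
  return of(`some demo data ${counter}`);
}

const outcomes: string[] = [];

for (const counter of [1, 2]) {
  getData(counter).subscribe({
    next: (value) => outcomes.push(`next: ${value}`),
    error: (err: Error) => outcomes.push(`error: ${err.message}`),
  });
}

console.log(outcomes);
// ['next: some demo data 1', 'error: request 2 failed']
```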

Let's get the data from the API service in ngOnInit:

  ngOnInit() {
    const interval$ = interval(1000).pipe(skip(1));

    this.data$ = interval$.pipe(
      tap((i) => console.log(i)),
      switchMap((i) => this.demoService.getData(i)),
    );
  }

Now, how do we handle these errors incoming from the API?

1. catchError and returning a new observable

In my team, this is what we have done many times without thinking it through:

this.data$ = interval$.pipe(
  switchMap((i) => this.demoService.getData(i)),
  catchError(() => of('error is caught from api'))
);

What we did not realize is that catchError replaces the failed stream with a new observable, the one created by of(), and data$ continues with that one instead. The original source has already errored and is torn down, so the stream emits the fallback value and then completes.
This is what happens:

> 1
> 2 // caught

stream was unsubscribed
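
The behaviour of option 1 can be reproduced without Angular. This is a minimal sketch, assuming RxJS 7.2 or later; getData is a standalone copy of the service method, of(1, 2, 3, 4) is a synchronous stand-in for interval$, and requested and log are hypothetical helper arrays.

```typescript
import { Observable, catchError, of, switchMap, tap, throwError } from 'rxjs';

// Standalone copy of DemoService.getData so the sketch runs without Angular.
function getData(counter: number): Observable<string> {
  if (counter % 2 === 0) {
    return throwError(() => new Error());
  }
  return of(`some demo data ${counter}`);
}

const requested: number[] = []; // counters that reached the API
const log: string[] = []; // everything the subscriber saw

of(1, 2, 3, 4) // synchronous stand-in for interval$
  .pipe(
    tap((i) => requested.push(i)),
    switchMap((i) => getData(i)),
    // The outer stream has already errored here; it is replaced by of(...).
    catchError(() => of('error is caught from api')),
  )
  .subscribe({
    next: (value) => log.push(value),
    complete: () => log.push('<complete>'),
  });

console.log(requested); // [1, 2]
console.log(log);
// ['some demo data 1', 'error is caught from api', '<complete>']
```

Counters 3 and 4 are never requested: once the error passes through switchMap, the source is unsubscribed.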

2. catchError while returning the caught observable

If you look at the catchError documentation, you can see that the callback actually takes two parameters, the error itself and the caught observable:

catchError<T, O extends ObservableInput<any>>(selector: (err: any, caught: Observable<T>) => O): OperatorFunction<T, T | ObservedValueOf<O>>

We can use this to resubscribe to the original observable:

this.data$ = interval$.pipe(
  switchMap((i) => this.demoService.getData(i)),
  catchError((err, caught$) => caught$)
);

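This way the stream starts emitting values again. Note that returning the caught observable resubscribes to the source, so the interval starts over from the beginning after every error: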

> 1
> 2 // caught
> 1
> 2 // caught
> 1
> ...
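
The restart can be seen in a small standalone sketch. It assumes RxJS 7.2 or later; getData is a standalone copy of the service method, of(1, 2, 3) is a synchronous stand-in for interval$, and take(5) is added here only because the restart loop would otherwise never end.

```typescript
import { Observable, catchError, of, switchMap, take, throwError } from 'rxjs';

// Standalone copy of DemoService.getData so the sketch runs without Angular.
function getData(counter: number): Observable<string> {
  if (counter % 2 === 0) {
    return throwError(() => new Error());
  }
  return of(`some demo data ${counter}`);
}

const received: string[] = [];

of(1, 2, 3) // synchronous stand-in for interval$
  .pipe(
    switchMap((i) => getData(i)),
    // Returning caught$ resubscribes to the source, which starts over at 1.
    catchError((_err, caught$) => caught$),
    // Added for the sketch: stop after five values instead of looping forever.
    take(5),
  )
  .subscribe((value) => received.push(value));

console.log(received);
// 'some demo data 1' five times; counter 3 is never reached
```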

3. catchError inside the service stream

To continue the original stream with new incoming values, we have to move catchError into the inner observable:

    this.data$ = interval$.pipe(
      switchMap((i) =>
        this.demoService.getData(i).pipe(catchError(() => of('caught')))
      )
    );

This way the outer stream stays alive, and every failed request is replaced by the caught value:

> 1
> 2 // caught
> 3
> 4 // caught
> 5
> 6 // caught
> 7
> ...
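
Here is the same idea as a standalone sketch, assuming RxJS 7.2 or later. As before, getData is a standalone copy of the service method, of(1, 2, 3, 4, 5) is a synchronous stand-in for interval$, and log is a hypothetical helper array.

```typescript
import { Observable, catchError, of, switchMap, throwError } from 'rxjs';

// Standalone copy of DemoService.getData so the sketch runs without Angular.
function getData(counter: number): Observable<string> {
  if (counter % 2 === 0) {
    return throwError(() => new Error());
  }
  return of(`some demo data ${counter}`);
}

const log: string[] = [];

of(1, 2, 3, 4, 5) // synchronous stand-in for interval$
  .pipe(
    switchMap((i) =>
      // The error is handled inside the inner observable,
      // so it never reaches the outer stream.
      getData(i).pipe(catchError(() => of('caught'))),
    ),
  )
  .subscribe({
    next: (value) => log.push(value),
    complete: () => log.push('<complete>'),
  });

console.log(log);
// ['some demo data 1', 'caught', 'some demo data 3', 'caught',
//  'some demo data 5', '<complete>']
```

The outer stream only completes because the stand-in source is finite; with the real interval it would keep going.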

If you're lazy like me, feel free to check out the demo on StackBlitz.

Happy catching!

Top comments (1)

Mohammad Jawad (Kasir) Barati

Hi, thanks for the article. A question: I read this post and the RxJS docs too, but I could not figure out whether catchError catches errors that occurred in all previous steps or just in the one directly before it. Assume we have this code:

observable.pipe(
  map((apiResponse) => anotherPromise(apiResponse)),
  tap(async (someData) => anotherObservable(someData)),
  catchError((error, caught) => { 
    logger.error({ error, caught });
    throw error;
  }),
);

Here I'd like to catch any error that might have occurred in the map or tap.
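
A minimal sketch related to this question, assuming RxJS 7.2 or later: an error thrown synchronously in any operator upstream of catchError in the same pipe reaches it, not only one from the operator directly before it. The names run and caughtMessages are hypothetical. Unlike the snippet above, the callbacks here are synchronous: tap ignores the return value of its callback, so a rejected promise from an async callback would not reach catchError.

```typescript
import { catchError, map, of, tap } from 'rxjs';

const caughtMessages: string[] = [];

// Hypothetical helper: runs the same pipe, failing in a different step each time.
function run(failIn: 'map' | 'tap'): void {
  of(1)
    .pipe(
      map((n) => {
        if (failIn === 'map') throw new Error('failed in map');
        return n * 2;
      }),
      tap(() => {
        if (failIn === 'tap') throw new Error('failed in tap');
      }),
      // Receives errors from both upstream steps.
      catchError((error: Error) => {
        caughtMessages.push(error.message);
        return of(-1); // fallback value
      }),
    )
    .subscribe();
}

run('map');
run('tap');

console.log(caughtMessages); // ['failed in map', 'failed in tap']
```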