The Angular async
pipe is the cornerstone of making applications performant. How exactly does it work? Let's walk through the code together to understand it.
If you have ever searched for articles about Angular applications and performance, you have read about OnPush
change detection. A lot of people jump right into using it, however, I see a tendency of using object mutation for manipulating data and relying on the Default
change detection strategy. Usually, when an application is built on object mutation, changing to OnPush
change detection breaks the application. There are usually two solutions, one is to revert to using the Default
change detection strategy, the other one is injecting the ChangeDetectorRef
to every component where a subscription occurs and call its markForCheck()
method at the end of the callback function.
Using the Default
change detection strategy in these cases won't increase performance, injecting the ChangeDetectorRef
into every component can be rather cumbersome and obnoxious. However, you can avoid it with RxJS
, and the async
pipe.
Data composition is important
I have met with the phenomena of subscribing to an Observable inside a component and saving the result into a class member property. You might be familiar with this structure as well:
// ...
data: Data[] = [];
constructor(private http: HttpClient) {}
ngOnInit(): void {
this.http.get(`some/url`).subscribe(result => {
this.data = result;
})
}
Instead of assigning to the data property of your component class, you could (and in my humble opinion should) use the async pipe in your template, to subscribe to the observable, and it would handle unsubscribing for you.
{{ data$ | async }}
// ts
data$ = this.http.get(`some/url`);
How does the async pipe work?
@Pipe({name: 'async', pure: false})
export class AsyncPipe implements OnDestroy, PipeTransform {
// ...
}
The Angular async
pipe is not pure. Whenever a pipe has an internal state the pure
property of the @Pipe()
decorator config should be set to false. This means, that the transform()
method of the pipe gets invoked on every change-detection cycle. Since the async
pipe usually deals with Observable
or Promise
inputs, the pipe itself has an internal state for storing the last value. However, to properly handle the teardown logic and to avoid memory leaks, the Subscription
, the source (_obj
) and the SubscriptionStrategy
are stored in memory as well.
// ...
private _latestValue: any = null;
private _subscription: SubscriptionLike|Promise<any>|null = null;
private _obj: Observable<any>|Promise<any>|EventEmitter<any>|null = null;
private _strategy: SubscriptionStrategy = null!;
constructor(private _ref: ChangeDetectorRef) {}
//...
As you can see, the ChangeDetectorRef
is injected into every async
pipe instance, but more on that later. First, let's check the SubscriptionStrategy
interface. The classes that implement this interface must have the following methods: createSubscription
, dispose
and onDestroy
. The first creates the subscription, dispose and onDestroy are responsible for handling the teardown logic, so memory leaks can be avoided.
interface SubscriptionStrategy {
createSubscription(async: Observable<any>|Promise<any>, updateLatestValue: any): SubscriptionLike | Promise<any>;
dispose(subscription: SubscriptionLike|Promise<any>): void;
onDestroy(subscription: SubscriptionLike|Promise<any>): void;
}
class ObservableStrategy implements SubscriptionStrategy {
createSubscription(async: Observable<any>, updateLatestValue: any): SubscriptionLike {
return async.subscribe({
next: updateLatestValue,
error: (e: any) => {
throw e;
}
});
}
dispose(subscription: SubscriptionLike): void {
subscription.unsubscribe();
}
onDestroy(subscription: SubscriptionLike): void {
subscription.unsubscribe();
}
}
class PromiseStrategy implements SubscriptionStrategy {
createSubscription(async: Promise<any>, updateLatestValue: (v: any) => any): Promise<any> {
return async.then(updateLatestValue, e => {
throw e;
});
}
dispose(subscription: Promise<any>): void {}
onDestroy(subscription: Promise<any>): void {}
}
const _promiseStrategy = new PromiseStrategy();
const _observableStrategy = new ObservableStrategy();
// ... Pipe class declaration
The ObservableStartegy
and the PromiseStrategy
classes are wrappers around the logic that needs to be handled. While the dispose
and onDestroy
methods for Promise handling are void
methods, the Observable strategy calls .unsubscribe()
in both of those methods. Although, the onDestroy
method never gets called in the async_pipe.ts
file the dispose
method handles unsubscribing.
@Pipe({name: 'async', pure: false})
export class AsyncPipe implements OnDestroy, PipeTransform {
// ...
ngOnDestroy(): void {
if (this._subscription) {
this._dispose();
}
}
// ...
private _dispose(): void {
this._strategy.dispose(this._subscription!);
this._latestValue = null;
this._subscription = null;
this._obj = null;
}
// ...
}
As it shows, the async
pipe implements the OnDestroy
lifecycle hook, and if there is a subscription stored in the instance, it calls the internal _dispose()
method. This method calls dispose
on the internally stored _strategy
, and sets everything to null. When this occurs, the JS engine's garbage collector will deal with the rest.
// ...
transform<T>(obj: null): null;
transform<T>(obj: undefined): undefined;
transform<T>(obj: Observable<T>|null|undefined): T|null;
transform<T>(obj: Promise<T>|null|undefined): T|null;
transform(obj: Observable<any>|Promise<any>|null|undefined): any {
if (!this._obj) {
if (obj) {
this._subscribe(obj);
}
return this._latestValue;
}
if (obj !== this._obj) {
this._dispose();
return this.transform(obj as any);
}
return this._latestValue;
}
// ...
The transform()
method always returns the internally stored _latestValue
, therefore whenever an async
pipe is used, the first value is always null
. The first time the method gets called, and the provided parameter is neither null
nor undefined
, a subscription occurs. This internal _subscribe
method handles a couple of things. It saves the reference of the pipe's target, then selects the proper strategy for it via the Angular internal ɵisPromise
and ɵisObservable
helper functions.
private _subscribe(obj: Observable<any>|Promise<any>|EventEmitter<any>): void {
this._obj = obj;
this._strategy = this._selectStrategy(obj);
this._subscription = this._strategy.createSubscription(
obj, (value: Object) => this._updateLatestValue(obj, value));
}
private _selectStrategy(obj: Observable<any>|Promise<any>|EventEmitter<any>): any {
if (ɵisPromise(obj)) {
return _promiseStrategy;
}
if (ɵisObservable(obj)) {
return _observableStrategy;
}
throw invalidPipeArgumentError(AsyncPipe, obj);
}
Finally, it saves the subscription by creating it with the createSubscription
method, providing the internal _updateLatestValue
callback method. This method checks if the internally stored Observable
and the passed Observable
are the same and have the same reference. If they are, the _latestValue
is updated, and the ChangeDetectorRef
's markForCheck()
method gets called, triggering a change detection when the subscribed Observable
emits a new value. This is the part, where using RxJS and the async
pipe handles using the OnPush
change detection strategy.
private _updateLatestValue(async: any, value: Object): void {
if (async === this._obj) {
this._latestValue = value;
this._ref.markForCheck();
}
}
That is not all, since the pipe's target can be a new Observable
instance as well. Since Observables
are objects, they are passed by reference. Therefore, whenever you assign a new observable to a member property, the transform
method runs recursively.
// ...
transform(obj: Observable<any>|Promise<any>|null|undefined): any {
if (!this._obj) {
if (obj) {
this._subscribe(obj);
}
return this._latestValue;
}
if (obj !== this._obj) {
this._dispose();
return this.transform(obj as any);
}
return this._latestValue;
}
// ...
You'll notice, when there is an existing subscription, the internally stored and target Observables
are checked against each other, and if they differ by reference, the old (internally stored) Observable
gets disposed, and the transform
method gets called recursively to create a new subscription.
Example in action
Let's create a component with two timers. One timer should emit every 2 seconds, and it should use the async pipe, the other one should emit every second, but it should use object mutation. For now, let's use the default change detection strategy.
@Component({
selector: 'app-test',
template: `
<div> Timer 1: {{ timer1$ | async }} </div>
<div> Timer 2: {{ timer2 }} </div>
`
})
export class TestComponent {
timer1$ = timer(0, 2000);
timer2 = 0
constructor() {
timer(0, 1000).subscribe((count) => {
timer2 = count;
})
}
}
When using the Default
change detection strategy, you can see that timer2
gets increased by 1 every second, and timer1$
with the async pipe gets increased by 1 every two seconds. Now let's switch to OnPush
change detection, by adding changeDetection: ChangeDetectionStrategy.OnPush
to the component decorator.
Now the timer2
binding gets increased by 2 every 2 seconds, and the timer1$
behaves the same way as before, namely, it gets increased by 1 every 2 seconds. Why does timer2
get increased when timer1$
emits? Because the async pipe triggers change detection. If you comment out the {{ timer$1 | async }}
part from the template, you can observe that nothing gets updated.
Conclusion
Using the async
pipe and understanding how it works allows us to write better-performing applications. When you use OnPush
change detection, Angular can work more efficiently, because it does not have to watch for object mutation. In these cases, RxJS
and data composition can help you in making reactive and performant applications.
Top comments (0)