tldr;
I use Observables and RxJS every day when developing in Angular, but many times I just use the default, out-of-the-box, standard features. But what I really wanted for a recent project was to cache some of the data that we needed to get from the server. The data doesn’t change frequently, so caching the response for an hour or two before re-fetching the data was perfect. Luckily, RxJS has the shareReplay
operator, which automatically shares previous responses with new subscribers. In many cases, that’s a sufficient way to cache the data. But if you want to update the data after a certain time period, you need a combination of shareReplay
, defer
, and mergeMap
so that when a subscriber is added to an observable, the data is refreshed if necessary. Check out the examples below, and read on for more information.
Caching Data with RxJS
The other day I was working on an application at work where we needed to get some data from the server, but the data doesn’t update very frequently. So getting the information once an hour or so is no issue. RxJS makes this really easy to do! I was in luck! Here’s all you need to do to to have RxJS cache data and provide that same data to other subscribers:
this._http.get(this.url).pipe(shareReplay(1));
This example uses Angular’s HttpClient
module to make a call to an API endpoint at some url
, and uses the shareReplay
operator to tell RxJS to send the latest data to new subscribers. You can read more about shareReplay
in the RxJS docs. The 1
that is passed in to the shareReplay
operator here sets the buffer size of the operator to 1, which is why it shares only the latest data with new subscribers.
In many cases, this caching method is sufficient. The data will be updated when the application is refreshed. As long as the data doesn’t need to be refreshed before then, this works perfectly. However, if you can’t wait until the app is refreshed for the data to be refreshed, we’ll need to do a little more.
Caching and Updating Data with RxJS
Okay, so now let’s pretend we want to cache our data for 5 minutes, and after 5 minutes we need to get it updated for the next subscriber. Luckily, shareReplay
can take a second parameter, windowTime
. windowTime
essentially declares how long the data will be cached with shareReplay
. So, if we passed in 5 minutes as the windowTime
, then any subscribers from the first one at 0 seconds to a subscriber at 4 minutes and 59 seconds will get the same data. At 5 minutes, the observable completes and any subscribers after will just get a null value back.
Now, that’s not exactly what we want, because new subscribers after the windowTime
have no data, but we really want them to have data (obviously). So this is the question that Ben Lesh answered on Twitter for me. Here’s his tweet:
@prestonjlamb @samjulien @DeborahKurata Hmm.. that's tricky. RxJS doesn't have anything like that OOTB, but the first thing that pops into my mind would be something like this:23:09 PM - 28 Aug 2019
You can read the comments in the code for his picture as well, but the createShared
function initializes the shared$
observable with the data$
observable. It also uses defer
, which is used if null is the first value emitted from the shared$
observable. In that case, it creates a new instance of the observable and passes that along. I tested this out, and it worked perfectly. Take a look at this StackBlitz example. The observable is subscribed to as soon as the component loads, and then after the windowTime
expires a second subscriber jumps in and gets new data. There’s a good chance that the two sets of data will not be matched up, since the first observable completed. (* Note: the item that shows is randomly chosen from 1 to 10 from the list of people returned from the Star Wars API. There’s a chance that the same number comes up twice, but the first one didn’t update its data to match a new, second call for data.)
To make it easier to create this type of observable, I created this function:
let returnObs$: Observable<any>;
const createReturnObs = (obs: Observable<any>, time: number, bufferReplays: number) =>
(returnObs$ = obs.pipe(shareReplay(bufferReplays, time)));
export function renewAfterTimer(obs: Observable<any>, time: number, bufferReplays: number = 1) {
return createReturnObs(obs, time, bufferReplays).pipe(
first(null, defer(() => createReturnObs(obs, time, bufferReplays))),
mergeMap(d => (isObservable(d) ? d : of(d))),
);
}
To get a new observable that will cache data for a given amount of time, and then be refreshed for new subscribers after the given time, use that function like this:
myObs$ = renewAfterTimer(of(true), 1500);
where of(true)
is the observable you need to be refreshed.
Conclusion
So, as a review, use the shareReplay
operator from RxJS to cache data for your application. Use the above example from Ben (or the function I built to create those observables that Ben mentioned) to create an observable that caches the data for a certain amount of time and then refreshes for new subscribers.
Top comments (0)