Introduction
I really love reactive programming, especially when working on frontend applications. I know this is a hot topic, and this post will not discuss the strengths and weaknesses of RxJS in general. One real weakness, however, is that testing becomes more complicated: more complicated code means more complicated tests. This article shows which testing tools we have today, and how I implemented a simpler pattern that really boosts productivity.
What tools do we already have?
RxJS actually comes with a really nice set of tools for testing observables. The example below is from the RxJS documentation, and if you are anything like me, you will find it quite complicated.
Most of my observables emit simple values, and I usually do not test exact timings. Usually I want to test that a given input A produces exactly one result B. Maintaining tests like the one below is harder than it should be. Even though I consider my own RxJS skills above average, this is still quite hard to read.
// This test runs synchronously.
it('generates the stream correctly', () => {
  testScheduler.run((helpers) => {
    const { cold, time, expectObservable, expectSubscriptions } = helpers;
    const e1 = cold(' -a--b--c---|');
    const e1subs = ' ^----------!';
    const t = time(' ---| '); // t = 3
    const expected = '-a-----c---|';

    expectObservable(e1.pipe(throttleTime(t))).toBe(expected);
    expectSubscriptions(e1.subscriptions).toBe(e1subs);
  });
});
I have used marble diagrams for testing production code once. Most of the time, I have been writing tests using lastValueFrom in combination with await. That creates a lot of clutter in the tests and is also quite hard to follow. It is also hard to test when an observable completes using that method.
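A minimal sketch of that style, assuming `toArray()` is used to collect the emissions (the function name `collectDoubled` is made up for this example):

```typescript
import { from, lastValueFrom, map, toArray } from 'rxjs';

// Collects every emission of a stream into an array once it completes.
async function collectDoubled(): Promise<number[]> {
  const source$ = from([1, 2, 3, 4]);
  const double$ = source$.pipe(map((value) => value * 2));
  // toArray() only emits after the source completes, so a stream that stays
  // open (for example one backed by a BehaviorSubject) leaves this pending.
  return lastValueFrom(double$.pipe(toArray()));
}
```

In a test this becomes `expect(await collectDoubled()).toEqual([2, 4, 6, 8])`. Completion is only observed implicitly: if the stream stays open, the `await` never resolves and the test runs into the runner's timeout.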
Let's get inspired by Dart
I wrote some apps in Dart, and while doing so I was impressed by the amazing tooling Dart provides for testing streams. Streams in Dart are quite comparable to RxJS observables. RxDart is a package that basically just extends the existing Stream class found in Dart. And testing streams in Dart is awesome. Just look at this example.
final stream = Stream.fromIterable([
  'This',
  'Is',
  'Very readable!!!!!',
]);
expect(
  stream,
  emitsInOrder([
    'This',
    'Is',
    'Very readable!!!!!',
  ]),
);
This is very simple: define the stream and the expected outputs. In many cases this is more than enough, and it makes the tests very readable. This is what we want in TypeScript!
Let's make our own!
The experience I want is shown below.
describe('simple tests', () => {
  it('double stream', async () => {
    const source = from([1, 2, 3, 4]);
    const double$ = source.pipe(map((value) => value * 2));
    // Await the expectation so the test cannot finish before it is checked.
    await expectEmitsInOrder(double$, [2, 4, 6, 8, StreamComplete]);
  });
});

describe('pattern used when testing services and similar', () => {
  it('double stream', async () => {
    const source = new BehaviorSubject(0);
    const double$ = source.pipe(map((value) => value * 2));
    // Register the expectation first, then push values, then await the result.
    const assertion = expectEmitsInOrder(double$, [0, 40, 80]);
    source.next(20);
    source.next(40);
    await assertion;
  });
});

describe('We want to test that things fail as they should', () => {
  it('double stream', async () => {
    const backendError = new Error('Backend is at fault!!!!');
    const source = from([1, 2, 3, 4, '5' as unknown as number]);
    const double$ = source.pipe(
      map((value) => {
        if (typeof value === 'number') {
          return value * 2;
        }
        throw backendError;
      }),
    );
    await expectEmitsInOrder(double$, [
      2,
      4,
      6,
      8,
      {
        error: backendError,
      },
    ]);
  });
});
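Note the ordering in the `BehaviorSubject` example above: the expectation is registered before `source.next(...)` is called. This relies on the helper subscribing synchronously when it is called, which the implementation further down does via `lastValueFrom`. A minimal sketch of the same ordering with a plain subscription (the `seen` array is only there for illustration):

```typescript
import { BehaviorSubject, map } from 'rxjs';

const source = new BehaviorSubject(0);
const double$ = source.pipe(map((value) => value * 2));

// Subscribe first: a BehaviorSubject replays its current value to new subscribers.
const seen: number[] = [];
const subscription = double$.subscribe((value) => seen.push(value));

// Values pushed after subscribing are delivered synchronously.
source.next(20);
source.next(40);
subscription.unsubscribe();

console.log(seen); // [0, 40, 80]
```

If the `next` calls came before the subscription, only the latest value would be replayed and `seen` would contain just `80`.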
The spec I wanted is simple:
- Should be possible to test simple sequences
- Easy to read
- Can test completions
- Can test for errors
Implementation
The implementation below is not too complicated. Please feel free to suggest how we can make the developer experience even better 🔥
I could put this code into a small npm package, but it is better if you just copy it into your project. The code below should work with Vitest and Jest. I hope this simplifies your RxJS testing as much as it has for me. Happy hacking.
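// With Vitest, also import `expect` from 'vitest' unless globals are enabled.
import {
  type Observable,
  lastValueFrom,
  take,
  switchMap,
  catchError,
  timeout,
  tap,
  of,
  TimeoutError,
} from 'rxjs';

// Internal, normalized representation of everything a stream did.
type StreamEvent<T> =
  | { type: 'VALUE'; value: T }
  | { type: 'ERROR'; error: unknown }
  | { type: 'TIMEOUT' }
  | { type: 'COMPLETION' };

// Markers to use in the list of expected events.
export const StreamComplete = { type: 'COMPLETION' };
export const StreamTimeout = { type: 'TIMEOUT' };

// What a test may list as an expected event: a plain value, a marker, or an error.
type StreamEmissionEvent<T> =
  | T
  | typeof StreamComplete
  | typeof StreamTimeout
  | { error: unknown };

type StreamRecorderOptions = {
  timeout?: number;
};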
async function streamEmitsInOrder<T>(
  stream$: Observable<T>,
  events: StreamEmissionEvent<T>[],
  options?: StreamRecorderOptions,
) {
  const actualEvents: StreamEvent<T>[] = [];
  const timeoutMilliseconds = options?.timeout ?? 2000;
  try {
    // Record every value, completion, error, and timeout as a plain event.
    const innerStream = stream$.pipe(
      timeout({ each: timeoutMilliseconds }),
      tap({
        next: (valueEvent) =>
          actualEvents.push({ type: 'VALUE', value: valueEvent }),
        complete: () => actualEvents.push({ type: 'COMPLETION' }),
        error: (e) => {
          if (e instanceof TimeoutError) {
            actualEvents.push({ type: 'TIMEOUT' });
          } else {
            actualEvents.push({ error: e, type: 'ERROR' });
          }
        },
      }),
    );
    // Stop listening once as many values as expected events have arrived.
    const expectingStream = of(0).pipe(
      switchMap(() => innerStream),
      take(events.length),
      catchError(() => of()),
    );
    await lastValueFrom(expectingStream);
  } catch {
    // lastValueFrom rejects when nothing was emitted; the events are already recorded.
  }
  return actualEvents;
}
export function expectEmitsInOrder<T>(
  stream$: Observable<T>,
  events: StreamEmissionEvent<T>[],
  options?: {
    timeout?: number;
  },
) {
  // Return the promise so that tests can await (or return) the assertion.
  return expect(
    streamEmitsInOrder(stream$, events, options),
  ).resolves.toStrictEqual(streamEmissionEventToStreamEvent(events));
}
function streamEmissionEventToStreamEvent<T>(events: StreamEmissionEvent<T>[]) {
  return events.map((event) => {
    if (event === StreamComplete) {
      return StreamComplete;
    }
    if (typeof event === 'object' && event && 'error' in event) {
      return { type: 'ERROR', error: event.error };
    }
    if (
      typeof event === 'object' &&
      event &&
      'type' in event &&
      event.type === 'TIMEOUT'
    ) {
      return { type: 'TIMEOUT' };
    }
    return { type: 'VALUE', value: event };
  });
}
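The `TIMEOUT` event is what gets recorded when a stream stays silent for longer than the configured timeout. A minimal, self-contained sketch of that mechanism, using `NEVER` as a stream that never emits (`recordSilentStream` is a hypothetical name, not part of the helper above):

```typescript
import { EMPTY, NEVER, TimeoutError, catchError, timeout } from 'rxjs';

// Subscribes to a stream that never emits and reports what the timeout produced.
function recordSilentStream(timeoutMs: number): Promise<string[]> {
  return new Promise((resolve) => {
    const recorded: string[] = [];
    NEVER.pipe(
      // Errors with a TimeoutError if no value arrives within timeoutMs.
      timeout({ each: timeoutMs }),
      catchError((error: unknown) => {
        recorded.push(error instanceof TimeoutError ? 'TIMEOUT' : 'ERROR');
        return EMPTY; // complete quietly after recording
      }),
    ).subscribe({ complete: () => resolve(recorded) });
  });
}
```

With the helper above, the same situation should be expressible as `await expectEmitsInOrder(NEVER, [StreamTimeout], { timeout: 50 })`.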
If you love Lit and RxJS
I really like writing Lit elements. The mental model is simple, and it is close to bare-metal JavaScript. I have built quite a lot of applications using RxJS and Lit, and I have put my abstractions into a small npm package. Check out Litworks. It is basically RxJS bindings, routing, and simple authentication. I hope you enjoy it.