It is relatively simple to convert a promise-based function to RxJS observables. Consider the following function definition.
async function doSomething<T>(): Promise<T>;
Simply rename the `doSomething` function to something like `doSomethingPromise`. Then create a new function `doSomething` by wrapping the call to `doSomethingPromise` using the RxJS `from` function, as shown in the following example.
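import { from, Observable } from 'rxjs';
// The original promise-based function, renamed (implementation omitted).
declare function doSomethingPromise<T>(): Promise<T>;
// The new function wraps the promise in an observable.
function doSomething<T>(): Observable<T> {
  return from(doSomethingPromise<T>());
}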
However, what if you want to rewrite the body of a promise-based function with RxJS observables instead of just wrapping it? In simple cases this is relatively straightforward, but less so if the function involves loops.
Context
It is easier to illustrate with an example. Suppose you have a graph-like data structure (as shown below) and you want to merge the `values` such that the `values` in the "child" have higher precedence.
interface Node {
  id: string;
  parentID?: string;
  values: KeyValue[];
}

interface KeyValue {
  key: string;
  value: string;
}
Here is some example data.
const value3: Node = {
  id: 'id-1',
  values: [
    { key: 'key-1', value: 'value-1 from value3' },
    { key: 'key-2', value: 'value-2 from value3' },
    { key: 'key-3', value: 'value-3 from value3' },
    { key: 'key-4', value: 'value-4 from value3' },
  ],
};

const value2: Node = {
  id: 'id-2',
  parentID: value3.id,
  values: [
    { key: 'key-1', value: 'value-1 from value2' },
    { key: 'key-2', value: 'value-2 from value2' },
    { key: 'key-3', value: 'value-3 from value2' },
  ],
};

const value1: Node = {
  id: 'id-3',
  parentID: value2.id,
  values: [
    { key: 'key-1', value: 'value-1 from value1' },
    { key: 'key-2', value: 'value-2 from value1' },
  ],
};
The merged `values` should be as follows.
const merged = [
  { key: 'key-1', value: 'value-1 from value1' },
  { key: 'key-2', value: 'value-2 from value1' },
  { key: 'key-3', value: 'value-3 from value2' },
  { key: 'key-4', value: 'value-4 from value3' },
];
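For data that is already in memory, the precedence rule can be sketched synchronously. The snippet below is only an illustration: `mergeValues` and the sample entries are hypothetical names and data that do not appear elsewhere in this post, and the function assumes the value lists are passed in order from child to root.

```typescript
interface KeyValue {
  key: string;
  value: string;
}

// Hypothetical helper (not part of the original post): merges value lists
// that are ordered from the most specific node (child) to the root.
function mergeValues(chain: KeyValue[][]): KeyValue[] {
  const seen = new Set<string>();
  const merged: KeyValue[] = [];
  for (const values of chain) {
    for (const keyValue of values) {
      // The first occurrence of a key wins, so child entries take precedence.
      if (!seen.has(keyValue.key)) {
        seen.add(keyValue.key);
        merged.push(keyValue);
      }
    }
  }
  return merged;
}

// Illustrative data: the child overrides key-1 and inherits key-2.
const mergedExample = mergeValues([
  [{ key: 'key-1', value: 'from child' }],
  [
    { key: 'key-1', value: 'from parent' },
    { key: 'key-2', value: 'from parent' },
  ],
]);
console.log(mergedExample);
```

Tracking the keys that were already seen keeps the output in first-seen order, which matches the ordering of the expected result above.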
You may be thinking that the logic to merge the `values` can be written without being asynchronous. But suppose that the data is retrieved from a database through an ORM library such as Prisma, whose query functions are asynchronous. The following is an example.
import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();
// findUnique returns a promise that resolves to the node, or to null if no record matches
const node = await prisma.node.findUnique({
  include: { values: true },
  where: { id: 'id-3' },
});
Then suppose the `merge` function is as follows.
async function merge(node: Node): Promise<Node> {
  let parent: Node | null = node;
  const valuesMap = new Map<string, string>();
  const values = [] as KeyValue[];
  // Walk up the chain, starting from the node itself.
  while (parent != null) {
    parent.values.reduce(
      ({ valuesMap, values }, keyValue) => {
        // Keep only the first occurrence of each key, so the child wins.
        if (!valuesMap.has(keyValue.key)) {
          valuesMap.set(keyValue.key, keyValue.value);
          values.push(keyValue);
        }
        return { valuesMap, values };
      },
      { valuesMap, values },
    );
    const parentID: string | undefined = parent.parentID;
    // Fetch the next parent, or stop when there is none.
    parent =
      parentID == null
        ? null
        : await prisma.node.findUnique({
            include: { values: true },
            where: { id: parentID },
          });
  }
  return { ...node, values };
}
Since the `findUnique` function returns a promise, it is necessary to use `await`, which means that the `merge` function has to be declared `async`.
Observables
How do I rewrite the `merge` function to use observable operators?
First, we create an observable from the `node` parameter so that we can start piping operators. The `while` logic can be replicated using the `expand` operator, which feeds each emitted value back into its projection function. Because `findUnique` returns a promise and we should not be modifying the Prisma library, we wrap the call with the `from` function. When `parentID` is `null` or `undefined`, we return an empty observable (`of()` with no arguments), which ends the recursion and lets the observable complete.
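To see how `expand` behaves as a loop in isolation, here is a minimal sketch that counts down from 3. It assumes RxJS 6 or 7 is installed; the `countdown` array is a hypothetical name used only to collect the output, and `EMPTY` plays the same role as `of()` with no arguments.

```typescript
import { EMPTY, of } from 'rxjs';
import { expand, toArray } from 'rxjs/operators';

// Collects the emitted values so they can be inspected after subscribing.
const countdown: number[] = [];

of(3)
  .pipe(
    // expand emits the value and then subscribes to the projected observable,
    // repeating until the projection returns an empty observable.
    expand((n) => (n > 0 ? of(n - 1) : EMPTY)),
    toArray(),
  )
  .subscribe((values) => countdown.push(...values));

console.log(countdown); // [3, 2, 1, 0]
```

The loop condition lives inside the projection function: returning an observable that emits means "iterate again", and returning an empty one means "stop".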
Then we use the `filter` operator to remove any `null` results and extract the `values` using the `map` operator.
Next, we use the `reduce` operator to merge the `values` from each node. It is similar to the array `reduce` function, except that it emits a single accumulated result once the source observable completes.
Finally, the last `map` operator returns the original node with the merged `values`. The following is a complete example.
import { from, Observable, of } from 'rxjs';
import { expand, filter, map, reduce } from 'rxjs/operators';

function merge(node: Node): Observable<Node> {
  return of(node).pipe(
    // Replaces the while loop: keep fetching the parent until there is none.
    expand((node) => {
      const parentID = node?.parentID;
      return parentID == null
        ? of()
        : from(
            prisma.node.findUnique({
              include: { values: true },
              where: { id: parentID },
            }),
          );
    }),
    // Drop null results (a parent that was not found).
    filter(Boolean),
    map((node) => {
      return node.values;
    }),
    // Merge the values; the first occurrence of a key wins, so the child wins.
    reduce(
      ({ valuesMap, values }, keyValues) => {
        keyValues.forEach((keyValue) => {
          if (!valuesMap.has(keyValue.key)) {
            valuesMap.set(keyValue.key, keyValue.value);
            values.push(keyValue);
          }
        });
        return { valuesMap, values };
      },
      {
        valuesMap: new Map<string, string>(),
        values: [] as KeyValue[],
      },
    ),
    map(({ values }) => {
      return { ...node, values };
    }),
  );
}
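To try the pipeline without a database, the sketch below swaps Prisma for an in-memory lookup. Everything in it is an assumption made for illustration: `GraphNode` (renamed from `Node` to avoid clashing with the DOM type), `store`, `findNode`, `mergeInMemory`, and the sample data are hypothetical, and it assumes RxJS 6 or 7 is installed. Because `findNode` completes without emitting when an ID is missing, no `filter` step is needed here.

```typescript
import { EMPTY, Observable, of } from 'rxjs';
import { expand, map, reduce } from 'rxjs/operators';

interface KeyValue { key: string; value: string; }
interface GraphNode { id: string; parentID?: string; values: KeyValue[]; }
interface Accumulator { seen: Set<string>; values: KeyValue[]; }

const root: GraphNode = {
  id: 'root',
  values: [
    { key: 'a', value: 'a from root' },
    { key: 'b', value: 'b from root' },
  ],
};
const leaf: GraphNode = {
  id: 'leaf',
  parentID: root.id,
  values: [{ key: 'a', value: 'a from leaf' }],
};

// Hypothetical in-memory stand-in for the database table.
const store = new Map<string, GraphNode>([
  [root.id, root],
  [leaf.id, leaf],
]);

// Hypothetical stand-in for the Prisma query: emits the node if it exists,
// otherwise completes without emitting anything.
function findNode(id: string): Observable<GraphNode> {
  const found = store.get(id);
  return found === undefined ? EMPTY : of(found);
}

function mergeInMemory(node: GraphNode): Observable<GraphNode> {
  return of(node).pipe(
    // Walk up the parent chain until a node has no parent.
    expand((current) =>
      current.parentID == null ? EMPTY : findNode(current.parentID),
    ),
    map((current) => current.values),
    // The first occurrence of a key wins, so child values take precedence.
    reduce(
      (acc: Accumulator, keyValues: KeyValue[]) => {
        for (const keyValue of keyValues) {
          if (!acc.seen.has(keyValue.key)) {
            acc.seen.add(keyValue.key);
            acc.values.push(keyValue);
          }
        }
        return acc;
      },
      { seen: new Set<string>(), values: [] as KeyValue[] },
    ),
    map(({ values }) => ({ ...node, values })),
  );
}

// Everything here is synchronous, so the result is available right away.
const results: GraphNode[] = [];
mergeInMemory(leaf).subscribe((merged) => results.push(merged));
console.log(results[0].values);
```

With a real database the lookups are asynchronous, so the result only arrives inside the `subscribe` callback rather than immediately after the call.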
Conclusion
It took me a while to figure out how to write "loops" with RxJS. Since I'm fairly new to the library, it wasn't obvious what was needed. I even tried asking ChatGPT to convert the function to use observables, and the results were disastrous.
I hope that this post can provide a little clarity to those who are fairly new to RxJS.