Intro
This is part 2 of a 3-part series, where we will explore how to use Redis streams with NestJS.
It is structured in 3 parts:
- Part 1 - Setting up the NestJS application and connecting to Redis
- Part 2 - Populating Redis streams and reading from them in fan-out mode
- Part 3 - Using consumer groups to handle one stream from multiple actors in a way that one message is sent to and processed only by a single actor (consumer)
By the end of this series, you will have the knowledge and tools necessary to create your own NestJS app that utilizes Redis streams to handle real-time data.
The full code is available on GitHub.
What we have
In part 1, we created a working NestJS application that can connect to the Redis server. We also created a /ping-redis endpoint that calls the PING command on the Redis server and returns its response.
For development convenience, we run our application and the Redis server in Docker, using Docker Compose with a docker-compose.yml file for simplicity.
Now we are ready to focus on the main topic - using Redis streams.
What are Redis streams
Before we start with our implementation, it's essential to understand what Redis streams are and to clear up some common misconceptions about what they are not.
Data structure
At its core, a Redis stream is a data structure that holds data (messages) ordered by their message IDs.
By default, the ID is the timestamp in milliseconds at which the entry was added to the stream, followed by a sequence number in case multiple entries are added within the same millisecond, e.g. 1675377159049-0.
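To make the ID format concrete, here is a small sketch. The helpers parseStreamId and compareStreamIds are hypothetical names for illustration (node-redis does not provide them). They use plain numbers, which is safe for millisecond timestamps but assumes sequence numbers stay below Number.MAX_SAFE_INTEGER.

```typescript
// Hypothetical helpers, not part of node-redis: a complete stream entry ID
// has the form "<millisecondsTime>-<sequenceNumber>".
interface StreamId {
  ms: number;
  seq: number;
}

function parseStreamId(id: string): StreamId {
  const match = /^(\d+)-(\d+)$/.exec(id);
  if (!match) {
    throw new Error(`Not a complete stream ID: ${id}`);
  }
  return { ms: Number(match[1]), seq: Number(match[2]) };
}

// Negative if a is older than b, 0 if equal, positive if a is newer.
function compareStreamIds(a: string, b: string): number {
  const x = parseStreamId(a);
  const y = parseStreamId(b);
  return x.ms !== y.ms ? x.ms - y.ms : x.seq - y.seq;
}

const parsed = parseStreamId('1675377159049-0');
console.log(new Date(parsed.ms).toISOString()); // 2023-02-02T22:32:39.049Z
// Two entries added within the same millisecond differ only in the sequence part.
console.log(compareStreamIds('1675377159049-0', '1675377159049-1') < 0); // true
```

Because the time part comes first, ordering by ID is the same as ordering by insertion time, which is what later lets us resume reading from "the last ID we saw".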
Not a list
This all sounds very similar to a list. The difference is that while a list can be popped, a stream can't, because reading a stream does not mutate it. When calling LPOP or RPOP on a list, the returned element is removed. With streams, XREAD leaves the entry in the stream, so it can be read multiple times.
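The difference can be sketched with plain arrays. This is an in-memory illustration, not Redis: lpop and xreadAfter are hypothetical stand-ins that mimic the semantics of LPOP and XREAD, respectively.

```typescript
// In-memory stand-ins for illustration only; nothing here talks to Redis.
interface StreamEntry {
  id: string;
  fields: Record<string, string>;
}

// Mimics LPOP: removes the head of the list and returns it.
function lpop(items: string[]): string | undefined {
  return items.shift();
}

// True if stream ID a ("<ms>-<seq>") is newer than b.
function isNewer(a: string, b: string): boolean {
  const [aMs, aSeq] = a.split('-').map(Number);
  const [bMs, bSeq] = b.split('-').map(Number);
  return aMs > bMs || (aMs === bMs && aSeq > bSeq);
}

// Mimics XREAD: returns entries newer than lastId without modifying the stream.
function xreadAfter(entries: StreamEntry[], lastId: string): StreamEntry[] {
  return entries.filter((entry) => isNewer(entry.id, lastId));
}

const queue = ['a', 'b', 'c'];
const entries: StreamEntry[] = [
  { id: '1-0', fields: { v: 'a' } },
  { id: '2-0', fields: { v: 'b' } },
];

console.log(lpop(queue), queue.length); // a 2  (the element is gone)
console.log(xreadAfter(entries, '0-0').length); // 2
console.log(xreadAfter(entries, '0-0').length); // 2  (reading again returns the same entries)
```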
Not a socket - there is no "subscribe"
Just by the name "stream," it's easy to think that it's a continuous "stream" of data: something you connect to that pushes new data to you as soon as it's appended.
It's a stream in the sense that the data has a direction: older data has smaller IDs and newer data has larger IDs.
But to access it, we need to read from it explicitly, just like with other, simpler data structures such as sets or lists. Once a command has executed and Redis has sent the data to the application, the command is complete, and no new data will be received unless we call it again.
So it would also be wrong to think of streams as Pub/Sub. With Pub/Sub, once you subscribe to a channel, the connection is dedicated to that subscription, and every newly published message is delivered to it. There is no need to re-subscribe.
Basic functionality
We mostly add to a stream (XADD) and read from it (XREAD, XRANGE, XREADGROUP). There is also a command for deletion (XDEL), but in most cases, we think of a stream as append-only.
Managing memory
To avoid using up all available memory and having data evicted unpredictably, a stream's length is managed by trimming it. This can be done manually via XTRIM, or we can hand the responsibility to Redis by using capped streams: adding the MAXLEN option when calling XADD.
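As a rough mental model, here is an in-memory sketch of XADD with exact MAXLEN trimming. It is an illustration only: the CappedStream class is hypothetical, it models the exact (=) behavior, and real Redis with the ~ modifier may keep somewhat more entries than the limit.

```typescript
// Hypothetical in-memory model of "XADD key MAXLEN = <maxLen> * field value ...".
interface CappedEntry {
  id: string;
  fields: Record<string, string>;
}

class CappedStream {
  private readonly maxLen: number;
  private entries: CappedEntry[] = [];
  private lastMs = 0;
  private seq = 0;

  constructor(maxLen: number) {
    this.maxLen = maxLen;
  }

  // Generates an auto ID the way "*" does: a time part plus a sequence number
  // that increments when entries share a millisecond (or the clock goes back).
  add(fields: Record<string, string>, nowMs: number = Date.now()): string {
    if (nowMs > this.lastMs) {
      this.lastMs = nowMs;
      this.seq = 0;
    } else {
      this.seq += 1;
    }
    const id = `${this.lastMs}-${this.seq}`;
    this.entries.push({ id, fields });
    // Trimming evicts from the head, i.e. the oldest entries go first.
    while (this.entries.length > this.maxLen) {
      this.entries.shift();
    }
    return id;
  }

  ids(): string[] {
    return this.entries.map((entry) => entry.id);
  }
}

const capped = new CappedStream(3);
for (let i = 0; i < 5; i++) {
  capped.add({ n: String(i) }, 1000); // all five entries share one millisecond
}
console.log(capped.ids()); // [ '1000-2', '1000-3', '1000-4' ]
```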
Adding to streams
Finally, let's get back into implementing streams in our NestJS application.
Let's create a method to add a message to the stream:
// redis.service.ts
// --snip--
public async addToStream({
fieldsToStore,
streamName,
}: AddToStreamParams): Promise<string> {
// Converting object to record to store in redis
const messageObject = Object.entries(fieldsToStore).reduce(
(acc, [key, value]) => {
if (typeof value === 'undefined') {
return acc;
}
acc[key] = typeof value === 'string' ? value : JSON.stringify(value);
return acc;
},
{} as Record<string, string>,
);
// Adding to stream with trimming - approximately max 100 messages
return this.redis.xAdd(streamName, '*', messageObject, {
TRIM: {
strategy: 'MAXLEN',
strategyModifier: '~',
threshold: 100,
},
});
}
As you can see, we have defined the interface for the parameters of this method:
// interfaces.ts
export interface AddToStreamParams {
fieldsToStore: Record<string, any>;
streamName: string;
}
The method takes the name of the stream we want to add data to, and the data itself as a POJO named fieldsToStore.
Next, we have transformation code that turns our object into string key-value pairs: strings are stored as-is, any other value (including nested objects of any depth) is JSON-serialized, and undefined fields are skipped. Later, when reading data, we will parse this back into our original POJO:
// redis.service.ts
// --snip--
const messageObject = Object.entries(fieldsToStore).reduce(
(acc, [key, value]) => {
if (typeof value === 'undefined') {
return acc;
}
acc[key] = typeof value === 'string' ? value : JSON.stringify(value);
return acc;
},
{} as Record<string, string>,
);
// --snip--
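To see what actually ends up in Redis, here is the same flattening logic extracted into a standalone function (toStreamFields is a hypothetical name) so it can be run without a Redis connection. Note how a Date is serialized: JSON.stringify produces a quoted ISO string, which is why a parsed message later shows date as a plain string rather than a Date object.

```typescript
// Standalone copy of the reduce above; toStreamFields is an illustrative name.
function toStreamFields(fieldsToStore: Record<string, unknown>): Record<string, string> {
  return Object.entries(fieldsToStore).reduce(
    (acc, [key, value]) => {
      if (typeof value === 'undefined') {
        return acc; // undefined fields are dropped entirely
      }
      // Strings are stored as-is; everything else becomes a JSON string.
      acc[key] = typeof value === 'string' ? value : JSON.stringify(value);
      return acc;
    },
    {} as Record<string, string>,
  );
}

const stored = toStreamFields({
  hello: 'world',
  date: new Date(0),
  nestedObj: { num: 42 },
  skipped: undefined,
});

console.log(stored.hello); // world
console.log(stored.date); // "1970-01-01T00:00:00.000Z"  (note the embedded quotes)
console.log(stored.nestedObj); // {"num":42}
console.log('skipped' in stored); // false
```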
Then we call XADD through the Redis client with the following parameters:
// redis.service.ts
// --snip--
return this.redis.xAdd(streamName, '*', messageObject, {
TRIM: {
strategy: 'MAXLEN',
strategyModifier: '~',
threshold: 100,
},
});
// --snip--
- streamName - the name of the stream we want to append to.
- * - the asterisk tells Redis to auto-generate a unique ID (based on the timestamp). We could also set our own ID here if we wanted to.
- messageObject - the key-value pairs containing the data we want to store.
- Options:
  - TRIM - as mentioned before, we don't want to handle trimming manually, so we ask Redis to trim the stream by itself. This includes additional options:
    - strategy: MAXLEN - we want to trim when the stream reaches the maximum length given by threshold. The other option is MINID, where threshold specifies the smallest ID to keep; entries with lower IDs are evicted.
    - strategyModifier: ~ - this tells Redis that the size does not need to be exact. Redis only trims when it can remove a whole internal node of the stream at once, so the stream may hold somewhat more entries than the threshold. This makes trimming much cheaper than exact trimming.
    - threshold: 100 - the maximum length of the stream we want. With MINID, it would be the smallest ID to keep. Since we use the ~ modifier, this is an approximate value.
So we would read it like this: trim the stream once its length (MAXLEN) exceeds approximately (~) 100 entries.
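For reference, the xAdd call above corresponds to a single raw XADD command. The sketch below (buildXAddArgs is a hypothetical helper, not a node-redis API) assembles the argument list in the order Redis expects, with the trimming clause placed between the key and the entry ID.

```typescript
// Hypothetical helper that builds raw XADD arguments; it only mirrors the
// command syntax and does not talk to Redis.
interface XAddTrim {
  strategy: 'MAXLEN' | 'MINID';
  strategyModifier?: '=' | '~';
  threshold: number | string; // a length for MAXLEN, a stream ID for MINID
}

function buildXAddArgs(
  streamName: string,
  id: string,
  fields: Record<string, string>,
  trim?: XAddTrim,
): string[] {
  const args = ['XADD', streamName];
  if (trim) {
    // The trimming clause goes between the key and the entry ID.
    args.push(trim.strategy);
    if (trim.strategyModifier) {
      args.push(trim.strategyModifier);
    }
    args.push(String(trim.threshold));
  }
  args.push(id);
  for (const [field, value] of Object.entries(fields)) {
    args.push(field, value);
  }
  return args;
}

const xaddArgs = buildXAddArgs(
  'example-stream',
  '*',
  { hello: 'world' },
  { strategy: 'MAXLEN', strategyModifier: '~', threshold: 100 },
);
console.log(xaddArgs.join(' ')); // XADD example-stream MAXLEN ~ 100 * hello world
```

The joined string could be typed into redis-cli as-is when experimenting, and an array like this could also be passed to node-redis's generic sendCommand method, although the typed xAdd call is the more convenient option in application code.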
Stream handler
Since we will do more with our stream data, and we want to keep RedisService limited to basic Redis server calls, we will create a new service called StreamHandlerService for additional stream handling.
For now, we will just add a "pass-through" method, addToStream, that calls the addToStream method of RedisService.
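To generate it, we'll run: nest g service redis/stream-handler --flat (the --flat flag keeps the file directly in the redis folder, next to redis.service.ts, which matches the import below).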
// stream-handler.service.ts
import { Injectable } from '@nestjs/common';
import { RedisService } from './redis.service';
@Injectable()
export class StreamHandlerService {
constructor(private readonly redisService: RedisService) {}
public addToStream(fieldsToStore: Record<string, any>, streamName: string) {
return this.redisService.addToStream({ fieldsToStore, streamName });
}
}
And since we don't want to expose RedisService to other modules, but want them to interact with StreamHandlerService instead, we will move the ping functionality to it and remove RedisService from the module's exports.
// stream-handler.service.ts
// --snip--
public ping() {
return this.redisService.ping();
}
// --snip--
To use it in other modules, we need to export it:
// redis.module.ts
import { Module } from '@nestjs/common';
import { redisClientFactory } from './redis-client.factory';
import { RedisService } from './redis.service';
import { StreamHandlerService } from './stream-handler.service';
@Module({
providers: [redisClientFactory, RedisService, StreamHandlerService],
exports: [StreamHandlerService], // Removed RedisService from exports
})
export class RedisModule {}
And in AppService, we are going to call ping from StreamHandlerService instead:
// app.service.ts
// --snip--
constructor(private readonly streamService: StreamHandlerService) {} // changed from RedisService to StreamHandlerService
redisPing() {
return this.streamService.ping(); // changed here as well
}
// --snip--
In theory, other modules should not care whether we are calling Redis, RabbitMQ, Kafka, or something else entirely. They should only know that it is a stream, and get specific data by calling the methods made public on exported services.
This way, we can easily change our implementation of streams without needing to change other parts of code - they become de-coupled.
Since this is a project about Redis and Redis streams, we have called this module RedisModule, but, expanding on the previous thought, in a real project it would make more sense to call it StreamModule, MessagingModule, or something else, depending on what these streams are going to be used for.
Populating Redis with messages
Let's use our addToStream method to populate Redis.
We could go the same route as with ping and expose it through an API endpoint, but this time, since we want to "simulate" a data flow, we'll call this method on an interval so that we have an ongoing flow of data in our stream.
Let's add it to our AppService:
// app.service.ts
// --snip--
@Injectable()
export class AppService implements OnModuleInit, OnModuleDestroy {
private interval: NodeJS.Timeout = null;
// --snip--
private populateStream() {
this.interval = setInterval(() => {
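this.streamService
.addToStream(
{
hello: 'world',
date: new Date(),
nestedObj: { num: Date.now() % 100 },
},
EXAMPLE_STREAM_NAME,
)
// addToStream returns a promise; log failures instead of leaving the rejection unhandled
.catch((error) => console.error('Failed to add message to stream', error));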
}, 1000);
}
async onModuleInit() {
this.populateStream();
}
onModuleDestroy() {
clearInterval(this.interval);
}
}
As you can see, we have created a populateStream method that adds some dummy data to our stream.
EXAMPLE_STREAM_NAME is just a constant holding our stream name, defined in the constants.ts file and imported here:
// constants.ts
export const EXAMPLE_STREAM_NAME = 'example-stream';
We start the interval in the onModuleInit lifecycle event and clear it in the onModuleDestroy lifecycle event so that we don't leave dangling references.
This adds a message to our Redis stream every 1000 ms.
To verify that, let's now create a way to read from the stream.
Reading from stream
To read from Redis streams, we can use either the XREAD or the XRANGE (and XREVRANGE) commands. Since the goal of this project is to read new messages, either could be used, but only XREAD can block and wait for new data to arrive (with XRANGE we would have to poll), so we are going to use XREAD in this article.
Let's add a method that wraps the XREAD command call and does some extra error handling for us:
// redis.service.ts
// --snip--
public async readStream({
streamName,
blockMs,
count,
lastMessageId,
}: ReadStreamParams): Promise<RedisStreamMessage[] | null> {
try {
const response = await this.redis.xRead(
commandOptions({ isolated: true }), // uses new connection from pool not to block other redis calls
[
{
key: streamName,
id: lastMessageId,
},
],
{ BLOCK: blockMs, COUNT: count },
);
// response is null if a non-zero BLOCK times out; otherwise take the first (and only) stream
const messages = response?.[0]?.messages;
return messages ?? null;
} catch (error) {
if (error instanceof ClientClosedError) {
console.log(`${error.message} ...RECONNECTING`);
await this.connectToRedis();
return null;
}
console.error(
`Failed to xRead from Redis Stream: ${error.message}`,
error,
);
return null;
}
}
Let's break down this code:
First, notice that we have added a parameters interface, ReadStreamParams:
// interfaces.ts
export interface ReadStreamParams {
streamName: string;
blockMs: number;
count?: number; // maximum number of messages returned per call (COUNT)
lastMessageId: string;
}
// --snip--
We have also defined the type of the returned message, RedisStreamMessage. Since node-redis exports few of its types, we need to extract it on our own:
// redis-client.types
// --snip--
// xRead resolves to null when BLOCK times out, so null is stripped before indexing
export type RedisStreamMessage = NonNullable<
Awaited<ReturnType<RedisClient['xRead']>>
>[number]['messages'][number];
// --snip--
In the try block, we have our command call:
// --snip--
const response = await this.redis.xRead(
commandOptions({ isolated: true }), // uses new connection from pool not to block other redis calls
[
{
key: streamName,
id: lastMessageId,
},
],
{ BLOCK: blockMs, COUNT: count },
);
// response is null if a non-zero BLOCK times out; otherwise take the first (and only) stream
const messages = response?.[0]?.messages;
return messages ?? null;
} catch (error) {
// --snip--
We first pass commandOptions({ isolated: true }) to run the command in isolation.
This is needed because we are going to BLOCK the connection. If no matching data is available when the command reaches the Redis server, Redis waits for the specified amount of time (the BLOCK parameter). If matching data arrives during that period, it is returned immediately; otherwise, the command returns nothing once the time is up.
During this time, the connection is occupied, and no other commands can be sent over it. Essentially, our command blocks the connection.
If we made several such calls over one shared connection, we would quickly run into delays and performance problems. Isolated execution solves this: node-redis runs the command on a separate connection taken from an isolation pool (managed by the generic-pool library under the hood), so blocking reads don't hold up other commands. Note that generic-pool's default maximum pool size is 1, so if several blocking reads need to run in parallel, the pool size may need to be raised via the isolationPoolOptions client option.
Next, we specify which streams we want to read and from which ID. We need to pass them in an array, since a single call can read multiple streams. Handling multiple streams in one request is out of the scope of this article:
[
{
key: streamName,
id: lastMessageId,
},
],
Next we pass optional parameters:
{ BLOCK: blockMs, COUNT: count },
- BLOCK - the blocking time in milliseconds. If 0 is specified, it will block indefinitely until at least 1 message can be returned.
- COUNT - the maximum number of messages returned. If fewer messages are available on the Redis server, it will not wait for the count to be reached but will return however many there are.
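To make BLOCK and COUNT more tangible, here is an in-memory model of a blocking read. InMemoryStream is hypothetical and heavily simplified: it takes a concrete last ID (no $), generates simple increasing IDs, and wakes every waiting reader whenever an entry is added. From the caller's point of view it behaves roughly like XREAD: it returns up to count entries right away if any exist, otherwise it waits up to blockMs milliseconds (forever for 0) and resolves to null on timeout.

```typescript
// Hypothetical in-memory model of XREAD's BLOCK and COUNT semantics (no Redis involved).
interface BlockEntry {
  id: string;
  fields: Record<string, string>;
}

// True if stream ID a ("<ms>-<seq>") is newer than b.
function isAfter(a: string, b: string): boolean {
  const [aMs, aSeq] = a.split('-').map(Number);
  const [bMs, bSeq] = b.split('-').map(Number);
  return aMs > bMs || (aMs === bMs && aSeq > bSeq);
}

class InMemoryStream {
  private entries: BlockEntry[] = [];
  private waiters: Array<() => void> = [];
  private nextMs = 1;

  // Appends an entry with a simple, always-increasing ID and wakes blocked readers.
  add(fields: Record<string, string>): string {
    const id = `${this.nextMs}-0`;
    this.nextMs += 1;
    this.entries.push({ id, fields });
    const toWake = this.waiters;
    this.waiters = [];
    toWake.forEach((wake) => wake());
    return id;
  }

  // Up to `count` entries newer than `lastId`; if there are none, wait up to
  // `blockMs` milliseconds (0 = forever) and return null if nothing arrived.
  async read(lastId: string, blockMs: number, count: number): Promise<BlockEntry[] | null> {
    const available = (): BlockEntry[] =>
      this.entries.filter((entry) => isAfter(entry.id, lastId)).slice(0, count);
    const ready = available();
    if (ready.length > 0) {
      return ready; // data already there: return immediately
    }
    await new Promise<void>((resolve) => {
      let timer: ReturnType<typeof setTimeout> | undefined = undefined;
      const wake = (): void => {
        clearTimeout(timer);
        resolve();
      };
      this.waiters.push(wake);
      if (blockMs > 0) {
        timer = setTimeout(() => {
          this.waiters = this.waiters.filter((waiter) => waiter !== wake);
          resolve();
        }, blockMs);
      }
    });
    const arrived = available();
    return arrived.length > 0 ? arrived : null; // null mirrors a BLOCK timeout
  }
}

const demoStream = new InMemoryStream();
demoStream.add({ n: '1' }); // gets ID 1-0
const pending = demoStream.read('1-0', 0, 10); // nothing newer yet, so this blocks
demoStream.add({ n: '2' }); // gets ID 2-0 and wakes the blocked reader
pending.then((result) => console.log(result?.map((entry) => entry.id))); // [ '2-0' ]
demoStream.read('2-0', 50, 10).then((result) => console.log(result)); // null, after about 50 ms
```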
Next we unravel the data:
const messages = response?.[0]?.messages;
return messages ?? null;
Since we read only 1 stream, the response array contains at most one element (and response itself is null if a non-zero BLOCK times out without new data).
All returned messages are in its messages array.
And finally, we return either the stream messages array or null.
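The unwrapping step can be tried in isolation. The interfaces below spell out the reply shape the article relies on (an id and a message per entry, grouped by stream name); they are hand-written assumptions for illustration, not types exported by node-redis.

```typescript
// Hand-written approximation of an xRead reply; not exported by node-redis.
interface StreamMessage {
  id: string;
  message: Record<string, string>;
}

interface StreamReply {
  name: string;
  messages: StreamMessage[];
}

// Returns the messages of the first (and here, only) requested stream,
// or null when the reply itself is null (e.g. after a BLOCK timeout).
function firstStreamMessages(reply: StreamReply[] | null): StreamMessage[] | null {
  return reply?.[0]?.messages ?? null;
}

const sampleReply: StreamReply[] = [
  {
    name: 'example-stream',
    messages: [{ id: '1675377159049-0', message: { hello: 'world' } }],
  },
];

console.log(firstStreamMessages(sampleReply)?.[0].id); // 1675377159049-0
console.log(firstStreamMessages(null)); // null
```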
Here is also the place we can handle some initial errors:
// --- snip---
} catch (error) {
if (error instanceof ClientClosedError) {
console.log(`${error.message} ...RECONNECTING`);
await this.connectToRedis();
return null;
}
console.error(
`Failed to xRead from Redis Stream: ${error.message}`,
error,
);
return null;
}
In case of an error, we return null.
If we get a ClientClosedError, there is no use trying to fetch anything further, so we try to reconnect to Redis:
// --- snip---
private async connectToRedis() {
try {
// Try to reconnect only if the connection socket is closed. Else let it be handled by reconnect strategy.
if (!this.redis.isOpen) {
await this.redis.connect();
}
} catch (error) {
console.error(
`[${error.name}] ${error.message}`,
error,
);
}
}
We check isOpen to see whether the socket is open. If it is, we trust the reconnect strategy to do its job and reconnect the client. If it's not, we try to reconnect ourselves.
And that's how we read from a stream. Let's expand on this by exposing it to our API and then adding the ability to read multiple messages.
Exposing read to our API
Exposing the Redis stream to our API is similar to what we did with ping.
Let's add another method to our StreamHandlerService:
//stream-handler.service.ts
// --- snip --
public readFromStream(streamName: string, count: number) {
return this.redisService.readStream({
streamName,
blockMs: 0, // 0 = block indefinitely until at least one message is available
count, // max how many messages to fetch at a time
lastMessageId: '$',
});
}
This is a simple method where we pass the streamName of the stream we want to read and the count of messages we would like to receive. As mentioned before, COUNT is actually a "max count": if fewer messages are available on the stream, Redis will return anywhere from 1 to count messages.
We are using the special ID $, which means "only messages added after this XREAD request was made."
Now let's handle reading a single message:
// app.service.ts
// --snip--
public getSingleNewMessage() {
return this.streamService.readFromStream(EXAMPLE_STREAM_NAME, 1);
}
// --snip--
Again, we are using EXAMPLE_STREAM_NAME (the same stream that we populate) and setting count to 1.
And finally, let's add a new endpoint to our controller:
// app.controller.ts
// --snip--
@Get('message')
getMessage() {
return this.appService.getSingleNewMessage();
}
// --snip--
Now, when calling the endpoint, we should see a single message generated by our populateStream method.
Handling results as a stream
Getting a single message is relatively straightforward.
But how do we get multiple messages? What if we want to get a specific number of messages, e.g., precisely 3?
At first, it might seem simple: just set count to the number of messages you want. But remember, COUNT does not guarantee that we will get the exact number of messages. It's more of a "max count". So you might be tempted to call XREAD multiple times. But since we are passing $ as our last message ID, we might miss messages that arrive between our calls.
That means that we need to remember our last ID.
And what if we want to read from a stream continuously and indefinitely?
This all adds additional complexity that we need to handle.
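The gap is easy to reproduce deterministically with an in-memory list of IDs. This is a simplified illustration, not Redis: latestId plays the role of $ (it resolves to the newest ID at the moment a read starts), and readAfter plays the role of XREAD from a concrete ID. Both names are hypothetical.

```typescript
// Simplified in-memory illustration; all IDs here use sequence 0, so only the
// millisecond part is compared.
const streamIds: string[] = [];
let clockMs = 0;

function append(): void {
  clockMs += 1;
  streamIds.push(`${clockMs}-0`);
}

// Plays the role of "$": the newest ID at the moment a read starts.
function latestId(): string {
  return streamIds.length > 0 ? streamIds[streamIds.length - 1] : '0-0';
}

// Plays the role of XREAD from a concrete ID: everything newer than lastId.
function readAfter(lastId: string): string[] {
  const lastMs = Number(lastId.split('-')[0]);
  return streamIds.filter((id) => Number(id.split('-')[0]) > lastMs);
}

append(); // 1-0 exists before we start reading
let readFrom = latestId(); // first read with "$" starts after 1-0 ...
append(); // ... and receives 2-0
const firstBatch = readAfter(readFrom); // [ '2-0' ]

append(); // 3-0 and 4-0 arrive while the first batch is being processed
append();

readFrom = latestId(); // second read with "$" starts after 4-0 ...
append(); // ... and only receives 5-0
const secondWithDollar = readAfter(readFrom);

// Reading from the last ID we actually received picks up everything after it.
const secondWithLastId = readAfter(firstBatch[firstBatch.length - 1]);
console.log(secondWithDollar, secondWithLastId); // [ '5-0' ] [ '3-0', '4-0', '5-0' ]
```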
Generators
Generators are a valuable tool to have in our toolbox. Here they come in handy when we want to fetch the data from some external resource (Redis) lazily.
I will not focus on the "what and how" of generators.
If you haven't used them before, I encourage you to check the JavaScript Generators reference and the chapter on asynchronous generators in Dr. Axel Rauschmayer's book "JavaScript for impatient programmers".
The main feature we rely on is that we can write an infinite loop that only advances when the consumer asks for the next value, so it can be paused and resumed whenever we want and need.
Let's add a new method to our StreamHandlerService:
// stream-handler.service.ts
export class StreamHandlerService implements OnModuleDestroy {
private isAlive = true;
onModuleDestroy() {
this.isAlive = false;
}
// --snip--
public async *getStreamMessageGenerator(
streamName: string,
count: number,
): AsyncRedisStreamGenerator {
// Start with latest data
let lastMessageId = '$';
while (this.isAlive) {
const response = await this.redisService.readStream({
streamName,
blockMs: 0, // 0 = block indefinitely until at least one message is available
count, // max how many messages to fetch at a time
lastMessageId,
});
// If no messages returned, continue to next iteration without yielding
if (!response || response.length === 0) {
continue;
}
// Update last message id to be the last message returned from redis
lastMessageId = response[response.length - 1].id;
for (const message of response) {
yield message;
}
}
}
// --snip--
Let's break down what we have written:
First, you will notice that we have added a new boolean, isAlive, that is initially set to true and only set to false when the module is destroyed. It turns the loop into a while-true loop that we can exit when the application shuts down. Note that the flag is only checked between reads, so a read that is already blocking (with blockMs: 0) keeps waiting until the next message arrives.
Also, for our convenience, we have created a new type, AsyncRedisStreamGenerator:
// redis-client.type.ts
// --snip--
export type AsyncRedisStreamGenerator = AsyncGenerator<
RedisStreamMessage,
void,
unknown
>;
// --snip--
This is an async generator that yields RedisStreamMessage values, returns nothing (void) when it finishes, and accepts unknown values via next, since we won't pass anything to the next method of our generator.
We are going to call the readStream method in an infinite loop:
let lastMessageId = '$';
while (this.isAlive) {
const response = await this.redisService.readStream({
streamName,
blockMs: 0, // 0 = block indefinitely until at least one message is available
count, // max how many messages to fetch at a time
lastMessageId,
});
// -- snip --
}
As in our previous example, we use $ as the initial lastMessageId, so we only read messages added after the generator issues its first XREAD call (a generator's body only starts running on the first call to next).
The rest is also the same as in the previous example, except that we are calling it in a loop.
Next, we handle the case where we get an empty response. We simply go to the next iteration without yielding any results:
// If no messages returned, continue to next iteration without yielding
if (!response || response.length === 0) {
continue;
}
If we have some responses, we are going to handle them:
// Update last message id to be the last message returned from redis
lastMessageId = response[response.length - 1].id;
for (const message of response) {
yield message;
}
To avoid gaps in messages and read all messages from the moment we started this generator, we update the lastMessageId variable with the ID of the last message we received.
Since we pass count, each call returns an array of 1 to count messages. However, this generator produces only a single message at a time, so we iterate through the array and yield one message at a time.
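Because the generator only depends on readStream, its control flow can be exercised without Redis. The sketch below rests on assumptions: ReadFn is a hypothetical signature standing in for the Redis call, streamMessages mirrors getStreamMessageGenerator, and a fake reader replays scripted batches (including an empty one) while recording which lastMessageId each call used.

```typescript
// Stand-alone version of the generator pattern; ReadFn is a hypothetical
// signature standing in for RedisService.readStream.
interface StreamMessage {
  id: string;
  message: Record<string, string>;
}

type ReadFn = (lastMessageId: string, count: number) => Promise<StreamMessage[] | null>;

async function* streamMessages(
  read: ReadFn,
  count: number,
  isAlive: () => boolean = () => true,
): AsyncGenerator<StreamMessage, void, unknown> {
  let lastMessageId = '$'; // start with messages added after the first read
  while (isAlive()) {
    const batch = await read(lastMessageId, count);
    if (!batch || batch.length === 0) {
      continue; // nothing new: read again without yielding
    }
    lastMessageId = batch[batch.length - 1].id; // resume after the newest one
    for (const message of batch) {
      yield message; // hand out one message at a time
    }
  }
}

// Drives the generator with a fake reader that replays scripted batches.
async function demo(): Promise<{ received: string[]; requestedFrom: string[] }> {
  const requestedFrom: string[] = [];
  const batches: StreamMessage[][] = [
    [
      { id: '1-0', message: { n: '1' } },
      { id: '2-0', message: { n: '2' } },
    ],
    [], // an empty reply is skipped
    [{ id: '3-0', message: { n: '3' } }],
  ];
  const fakeRead: ReadFn = async (lastMessageId) => {
    requestedFrom.push(lastMessageId);
    return batches.shift() ?? null;
  };

  const received: string[] = [];
  for await (const message of streamMessages(fakeRead, 2)) {
    received.push(message.id);
    if (received.length === 3) {
      break; // closes the generator
    }
  }
  return { received, requestedFrom };
}

demo().then(({ received, requestedFrom }) => console.log(received, requestedFrom));
// [ '1-0', '2-0', '3-0' ] [ '$', '2-0', '2-0' ]
```

The recorded IDs show the point of updating lastMessageId: after the first batch, every read (including the retry after the empty reply) resumes from 2-0 instead of $.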
That's it for the generator part. Let's see how we can use it to solve our previous questions:
Reading multiple results
Now let's use our generator to fetch multiple results by creating a new method in AppService:
// app.service.ts
// --snip--
public async getMultipleNewMessages(count: number) {
const generator = this.streamService.getStreamMessageGenerator(
EXAMPLE_STREAM_NAME,
count,
);
const messages: Record<string, string>[] = [];
let counter = 0;
for await (const messageObj of generator) {
messages.push(this.parseMessage(messageObj.message));
counter++;
if (counter >= count) {
break;
}
}
return messages;
}
// --snip--
First, we create a generator:
const generator = this.streamService.getStreamMessageGenerator(
EXAMPLE_STREAM_NAME,
count,
);
We are passing it our count for "optimistic" fetch. We will try to get everything in a single Redis call. But if that does not happen, we will call Redis as often as needed.
We create an array to collect our messages:
const messages: Record<string, string>[] = [];
and define a counter, where we count how many messages we have received.
One convenient feature of generators is that they are iterable. Synchronous generators implement the @@iterator method, so we can loop over them with for...of. Since our generator is async, it implements @@asyncIterator instead, so we use for await...of:
for await (const messageObj of generator) {
messages.push(this.parseMessage(messageObj.message));
counter++;
if (counter >= count) {
break;
}
}
return messages;
Once the count is reached, we simply break out of the loop and return our collected messages.
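The collect-then-break pattern generalizes to any async iterable. Below is a small hypothetical helper, take, along with a toy generator that shows one useful detail: breaking out of for await...of calls the iterator's return method, so the generator's finally block runs before take resolves.

```typescript
// Hypothetical helper: collect the first n values of any async iterable.
async function take<T>(source: AsyncIterable<T>, n: number): Promise<T[]> {
  const result: T[] = [];
  if (n <= 0) {
    return result; // don't pull anything from the source
  }
  for await (const item of source) {
    result.push(item);
    if (result.length >= n) {
      break; // calls the iterator's return(), running the generator's finally blocks
    }
  }
  return result;
}

let cleanedUp = false;

// Toy infinite generator standing in for the Redis stream generator.
async function* naturals(): AsyncGenerator<number, void, unknown> {
  try {
    for (let i = 0; ; i++) {
      yield i;
    }
  } finally {
    cleanedUp = true;
  }
}

take(naturals(), 3).then((values) => console.log(values, cleanedUp)); // [ 0, 1, 2 ] true
```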
Since Redis stores all values as strings, we also have created a generic parser method:
//app.service.ts
// --snip--
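private parseMessage(message: Record<string, string>) {
return Object.entries(message).reduce((acc, [key, value]) => {
try {
// Values that were JSON-stringified (objects, numbers, dates) are parsed back
acc[key] = JSON.parse(value);
} catch {
// Plain strings (e.g. 'world') are not valid JSON, so keep them as-is
acc[key] = value;
}
return acc;
}, {} as Record<string, any>);
}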
// --snip--
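It's worth checking what the round trip through flattening and parsing actually returns. The standalone copies below (the function names are illustrative) show that objects come back intact, but a string that happens to be valid JSON, such as '42', comes back as a number, while '01234' stays a string because JSON does not allow leading zeros. One possible way to make the round trip lossless, sketched here as the hypothetical toStreamFieldsLossless, is to JSON-stringify every value, strings included.

```typescript
// Standalone copies of the article's flattening and parsing steps (names are illustrative).
function toStreamFields(fields: Record<string, unknown>): Record<string, string> {
  const out: Record<string, string> = {};
  for (const [key, value] of Object.entries(fields)) {
    if (typeof value === 'undefined') continue;
    out[key] = typeof value === 'string' ? value : JSON.stringify(value);
  }
  return out;
}

function parseMessage(message: Record<string, string>): Record<string, unknown> {
  const out: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(message)) {
    try {
      out[key] = JSON.parse(value); // JSON-encoded values come back typed
    } catch {
      out[key] = value; // plain strings are not valid JSON, keep them as-is
    }
  }
  return out;
}

// Hypothetical variant: encode strings too, so parsing restores them exactly.
function toStreamFieldsLossless(fields: Record<string, unknown>): Record<string, string> {
  const out: Record<string, string> = {};
  for (const [key, value] of Object.entries(fields)) {
    if (typeof value !== 'undefined') out[key] = JSON.stringify(value);
  }
  return out;
}

const original = { hello: 'world', count: '42', zip: '01234', nestedObj: { num: 7 } };

const restored = parseMessage(toStreamFields(original));
console.log(restored.hello, restored.count, restored.zip, restored.nestedObj);
// world 42 01234 { num: 7 }   (count is now the number 42, not the string '42')

const restoredLossless = parseMessage(toStreamFieldsLossless(original));
console.log(typeof restoredLossless.count); // string
```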
To check that we are indeed getting the specified number of messages, we'll create a new /messages endpoint that fetches 3 messages:
// app.controller.ts
// --snip--
@Get('messages')
getMessages() {
return this.appService.getMultipleNewMessages(3);
}
// --snip--
Again, when we call our newly created endpoint, we indeed get 3 messages.
Continuous reading of the stream
The real power of Redis streams comes from using them as a continuous data stream. To do that, we can reuse our generator:
// app.service.ts
// --snip--
private isAlive = true;
// --snip--
async onModuleInit() {
// --snip--
this.continuousReadMessages();
}
onModuleDestroy() {
// --snip--
this.isAlive = false;
}
// --snip--
private async continuousReadMessages() {
const generator = this.streamService.getStreamMessageGenerator(
EXAMPLE_STREAM_NAME,
3,
);
for await (const messageObj of generator) {
console.log(
`Got message with ID: ${messageObj.id}`,
JSON.stringify(this.parseMessage(messageObj.message), undefined, 2),
);
if (!this.isAlive) {
break;
}
}
}
// --snip--
Same as with multiple message fetch, we start by creating a generator:
const generator = this.streamService.getStreamMessageGenerator(
EXAMPLE_STREAM_NAME,
3,
);
We give it an arbitrary number for count, since we want to fetch multiple messages in one Redis call if possible.
And then we just do whatever we need to do with the data in the loop. Here, it's simply logging the message.
By breaking out of the loop once the module is destroyed, we stop reading and don't leave any dangling references.
for await (const messageObj of generator) {
console.log(
`Got message with ID: ${messageObj.id}`,
JSON.stringify(this.parseMessage(messageObj.message), undefined, 2),
);
if (!this.isAlive) {
break;
}
}
And we call this method once on module init, the same as we did with populating the stream:
async onModuleInit() {
this.populateStream();
this.continuousReadMessages();
}
And in our console, we should see the messages being fetched continuously:
...
app | Got message with ID: 1675377159049-0 {
app | "hello": "world",
app | "date": "2023-02-02T22:32:39.048Z",
app | "nestedObj": {
app | "num": 48
app | }
app | }
app | Got message with ID: 1675377160050-0 {
app | "hello": "world",
app | "date": "2023-02-02T22:32:40.050Z",
app | "nestedObj": {
app | "num": 50
app | }
app | }
...
Refactor for a single message
Since we are using the generator to handle stream messages, let's refactor our single message fetch example.
// app.service.ts
// --snip--
public async getSingleNewMessage() {
const generator = this.streamService.getStreamMessageGenerator(
EXAMPLE_STREAM_NAME,
1,
);
const messageObj = await generator.next();
if (!messageObj.done && messageObj.value) {
return this.parseMessage(messageObj.value.message);
}
}
// --snip--
As in all the examples, we create a generator on our EXAMPLE_STREAM_NAME stream, this time with a count of 1.
const generator = this.streamService.getStreamMessageGenerator(
EXAMPLE_STREAM_NAME,
1,
);
Since we need only a single message, we do not need a loop. Instead, we call the next method manually:
const messageObj = await generator.next();
and then handle the response: if the generator has not returned (it's not done) and we have a message to return, we parse the message and return it to the client:
if (!messageObj.done && messageObj.value) {
return this.parseMessage(messageObj.value.message);
}
Everything else stays the same for the client, and the same result is returned.
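One detail of the refactored method: after next() delivers a message, the generator stays suspended at its yield until it is garbage-collected. That's harmless here, since no Redis call is pending at that point, but if you prefer to close it explicitly, a small wrapper can call return in a finally block. firstValue and the toy ticks generator below are hypothetical and only illustrate the mechanics.

```typescript
// Hypothetical helper: take the first yielded value, then close the generator.
async function firstValue<T>(generator: AsyncGenerator<T, void, unknown>): Promise<T | undefined> {
  try {
    const result = await generator.next();
    return result.done ? undefined : result.value;
  } finally {
    await generator.return(undefined); // runs the generator's own finally blocks
  }
}

let ticksClosed = false;

// Toy infinite generator standing in for getStreamMessageGenerator.
async function* ticks(): AsyncGenerator<number, void, unknown> {
  try {
    for (let i = 1; ; i++) {
      yield i;
    }
  } finally {
    ticksClosed = true;
  }
}

firstValue(ticks()).then((value) => console.log(value, ticksClosed)); // 1 true
```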
This is the end of part 2 of the 3-part series.
We built on what we created in part 1. As a result, you should have an application that can fetch single or multiple responses and continuously fetch new results from the Redis stream.
In part 3, we are going to look at how we can use consumer groups to divide messages among multiple consumers so that each one gets a unique set of messages (unlike now, where if we connected more clients, every client would get all of the messages).