Krisjanis Kallings

Originally published at hackernoon.com

Redis Streams + NestJS: Part 2 | Reading from Stream

Intro

This is part 2 of a 3-part series, where we will explore how to use Redis streams with NestJS.

It is structured in 3 parts:

  • Part 1 - Setting up the NestJS application and connecting to Redis
  • Part 2 - Populating Redis streams and reading from them in fan-out mode
  • Part 3 - Using consumer groups to handle one stream from multiple actors in a way that one message is sent to and processed only by a single actor (consumer)

By the end of this series, you will have the knowledge and tools necessary to create your own NestJS app that utilizes Redis streams to handle real-time data.

Full code is available on GitHub

What we have

In part 1, we created a working NestJS application that can connect to the Redis server. We also created a /ping-redis endpoint for our application that calls the ping command on the Redis server and returns its response.
For development convenience, we run our application and the Redis server in Docker, using Docker Compose with a docker-compose.yml file for simplicity.

Now we are ready to focus on the main topic - using Redis streams.

What are Redis streams

Before we start with our implementation, it's essential to understand what Redis streams are - and to clear up some misconceptions about what they are not.

Data structure

At its core, a Redis stream is a data structure that holds data - messages - ordered by their IDs.
By default, an entry's ID is the timestamp of when it was added to the stream (followed by a sequence number, in case multiple entries arrive at the same timestamp).

Not a list

This all sounds very similar to a list. The difference is that while a list can be popped, a stream can't, because reading a stream does not mutate it. Calling LPOP or RPOP on a list removes the returned element; with streams, XREAD leaves the entry in place, so it can be read multiple times.
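To make the contrast concrete, here is a minimal node-redis sketch (my illustration, assuming an already-connected client; the key names are made up):

// Lists: popping is destructive
await client.rPush('my-list', 'a');
await client.lPop('my-list'); // returns 'a' and removes it from the list
await client.lPop('my-list'); // returns null - the element is gone

// Streams: reading is not destructive
await client.xAdd('my-stream', '*', { msg: 'a' });
await client.xRange('my-stream', '-', '+'); // returns the entry
await client.xRange('my-stream', '-', '+'); // returns it again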

Not a socket - there is no "subscribe"

Just from the name "stream," it's easy to think of it as a continuous stream of data - something you connect to that pushes new data to you as it is appended.
It's a stream in the sense that the data has a direction - older data with smaller IDs and newer data with larger IDs.
But to access it, we need to read from it explicitly, just as with simpler data structures like sets or lists. Once a request has been executed and data is sent from Redis to the application, the command is complete, and no new data will arrive unless the command is called again.
It would also be wrong to think of streams as a form of Pub/Sub. With Pub/Sub, once you subscribe to channels, the connection is blocked and all newly published messages are delivered - no need to re-subscribe.

Basic functionality

We mostly add to a stream (XADD) and read from it (XREAD, XRANGE, XREADGROUP). There is also functionality for deletion (XDEL), but in most cases we treat the stream as append-only.

Managing memory

To avoid using up all of the available memory and evicting data unpredictably, the length of a stream is managed by trimming it. This can be done manually via XTRIM, or we can hand the responsibility over to Redis by using capped streams - adding the MAXLEN option when calling XADD.
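For illustration, manual trimming with node-redis could look something like this (a sketch, assuming an already-connected client; the stream name and threshold are arbitrary):

// Keep roughly the newest 100 entries; the '~' modifier lets Redis trim lazily
const evicted = await client.xTrim('example-stream', 'MAXLEN', 100, {
  strategyModifier: '~',
});
console.log(`Trimmed ${evicted} entries`);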

Adding to streams

Finally, let's get back into implementing streams in our NestJS application.
Let's create a method to add a message to the stream:

// redis.service.ts

// --snip--
  public async addToStream({
    fieldsToStore,
    streamName,
  }: AddToStreamParams): Promise<string> {
    // Converting object to record to store in redis
    const messageObject = Object.entries(fieldsToStore).reduce(
      (acc, [key, value]) => {
        if (typeof value === 'undefined') {
          return acc;
        }
        acc[key] = typeof value === 'string' ? value : JSON.stringify(value);

        return acc;
      },
      {} as Record<string, string>,
    );

    // Adding to stream with trimming - approximately max 100 messages
    return this.redis.xAdd(streamName, '*', messageObject, {
      TRIM: {
        strategy: 'MAXLEN',
        strategyModifier: '~',
        threshold: 100,
      },
    });
  }


As you can see, we have defined the interface for the parameters of this method:

// interfaces.ts

export interface AddToStreamParams {
  fieldsToStore: Record<string, any>;
  streamName: string;
}

We take in the name of the stream we want to add data to, and specify the data as a POJO named fieldsToStore.
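A hypothetical call, just to illustrate the shape of the parameters (the stream name and fields are made up):

const messageId = await this.redisService.addToStream({
  streamName: 'example-stream',
  fieldsToStore: { hello: 'world', nested: { num: 42 } },
});
console.log(messageId); // e.g. '1675377159049-0' - the auto-generated ID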

Next, we have transformation code that flattens our object of any depth into string key-value pairs. Later, when reading the data, we will parse it back into the original POJO:

// redis.service.ts

// --snip--
    const messageObject = Object.entries(fieldsToStore).reduce(
      (acc, [key, value]) => {
        if (typeof value === 'undefined') {
          return acc;
        }
        acc[key] = typeof value === 'string' ? value : JSON.stringify(value);

        return acc;
      },
      {} as Record<string, string>,
    );
// --snip--
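For example (hypothetical values), the reduce above would flatten an input object like this:

// Input POJO:
// { hello: 'world', nested: { num: 42 }, skip: undefined }

// Resulting Record<string, string> stored in Redis:
// {
//   hello: 'world',       <- strings pass through unchanged
//   nested: '{"num":42}', <- non-strings are JSON.stringify-ed
// }
// ('skip' is dropped because its value is undefined)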

Then we execute a call to Redis service with the following parameters:

// redis.service.ts

// --snip--
    return this.redis.xAdd(streamName, '*', messageObject, {
      TRIM: {
        strategy: 'MAXLEN',
        strategyModifier: '~',
        threshold: 100,
      },
    });
// --snip--
  • streamName - the name of the stream we want to append to
  • * - the asterisk tells Redis to auto-generate a unique ID (based on the timestamp). We could set our own ID here if we wanted.
  • messageObject - key-value pairs that contain the data we want to store
  • Options:
    • TRIM - as mentioned before, we don't want to handle trimming manually, so we specify that we wish Redis to trim the stream by itself. This includes additional options:
      • strategy MAXLEN - here we specify that we want to trim once the stream reaches the maximum length given by the threshold. The other option is MINID, where the threshold specifies the smallest ID to keep; entries with smaller IDs are marked for eviction.
      • strategyModifier ~ - this tells Redis that we don't need the size to be exact: Redis may stay above the threshold for a while, but the trim will eventually happen. This lets Redis execute the eviction at the most convenient time rather than immediately.
      • threshold 100 - the maximum length of the stream we want (in the case of MINID, it would be the smallest ID to keep). Since we use the strategy modifier ~, this is an approximate value.

So we would read it like this: trim the stream when its length (MAXLEN) reaches approximately (~) 100. In raw Redis terms, each such call is equivalent to XADD <stream> MAXLEN ~ 100 * <field> <value> ....

Stream handler

Since we will do more work with our stream data, and we want to keep the basic Redis server calls on RedisService, we will create a new service called StreamHandlerService for the additional stream handling.
For now, we will just add a "pass-through" method addToStream that calls the RedisService addToStream method.
To generate it, we'll run: nest g service redis/stream-handler

// stream-handler.service.ts

import { Injectable } from '@nestjs/common';
import { RedisService } from './redis.service';

@Injectable()
export class StreamHandlerService {
  constructor(private readonly redisService: RedisService) {}

  public addToStream(fieldsToStore: Record<string, any>, streamName: string) {
    return this.redisService.addToStream({ fieldsToStore, streamName });
  }
}

And since we don't want to expose our RedisService to other modules - we want them to interact with StreamHandlerService instead - we will move the ping call functionality to it and remove RedisService from the module's exports.

// stream-handler.service.ts

// --snip--
  public ping() {
    return this.redisService.ping();
  }
// --snip--

To use it in other modules, we need to export it:

// redis.module.ts

import { Module } from '@nestjs/common';
import { redisClientFactory } from './redis-client.factory';
import { RedisService } from './redis.service';
import { StreamHandlerService } from './stream-handler.service';

@Module({
  providers: [redisClientFactory, RedisService, StreamHandlerService],
  exports: [StreamHandlerService], // Removed RedisService from exports
})
export class RedisModule {}
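For completeness, a consuming module then imports RedisModule. A sketch, assuming the AppModule layout from part 1:

// app.module.ts

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { RedisModule } from './redis/redis.module';

@Module({
  imports: [RedisModule], // makes StreamHandlerService injectable here
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}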

And in AppService, we are going to call ping from StreamHandlerService instead:

// app.service.ts

// --snip--
  constructor(private readonly streamService: StreamHandlerService) {} // changed from RedisService to StreamHandlerService

  redisPing() {
    return this.streamService.ping(); // changed here as well
  }
// --snip--

In theory, other modules should not care whether it is Redis, RabbitMQ, Kafka, or something else we are calling. They should only know that it is a stream, and that they get the specific data by calling methods made public on the exported services.
This way, we can easily change our implementation of streams without needing to change other parts of the code - they become decoupled.
Since this is a project about Redis and Redis streams, we have called this module RedisModule, but expanding on the previous thought, in a real project it would make more sense to call it StreamModule, MessagingModule, or something else, depending on what these streams are going to be used for.

Populating Redis with messages

Let's use our addToStream method to populate our Redis.
We could go the same route as with ping and expose it through an API endpoint, but this time, since we want to simulate a data flow, we'll call this method on an interval so that we have an ongoing flow of data in our stream.
Let's add it to our AppService:

// app.service.ts

// --snip--

@Injectable()
export class AppService implements OnModuleInit, OnModuleDestroy {
  private interval: NodeJS.Timeout = null;

// --snip--

  private populateStream() {
    this.interval = setInterval(() => {
      this.streamService.addToStream(
        {
          hello: 'world',
          date: new Date(),
          nestedObj: { num: Date.now() % 100 },
        },
        EXAMPLE_STREAM_NAME,
      );
    }, 1000);
  }

  async onModuleInit() {
    this.populateStream();
  }

  onModuleDestroy() {
    clearInterval(this.interval);
  }
}

As you can see, we have created a populateStream method that adds some dummy data to our stream.
EXAMPLE_STREAM_NAME is just a constant holding our stream name, defined in the constants.ts file and imported here:

// constants.ts
export const EXAMPLE_STREAM_NAME = 'example-stream';

We start the interval in the onModuleInit lifecycle event, and we clear it in the onModuleDestroy lifecycle event so that we don't leave dangling references.

This will add a message to our Redis stream every 1000 ms.
To verify that, let's now create a way to read from the stream.

Reading from stream

To read from Redis streams, we can use either the XREAD or XRANGE (and XREVRANGE) commands. Since the goal of this project is to read new messages, either command could be used to achieve it. We are going to use XREAD in this article.
Let's add a method that wraps the XREAD command call and does some extra error handling for us:

// redis.service.ts

// --snip--

  public async readStream({
    streamName,
    blockMs,
    count,
    lastMessageId,
  }: ReadStreamParams): Promise<RedisStreamMessage[] | null> {
    try {
      const response = await this.redis.xRead(
        commandOptions({ isolated: true }), // uses a new connection from the pool so other redis calls are not blocked
        [
          {
            key: streamName,
            id: lastMessageId,
          },
        ],
        { BLOCK: blockMs, COUNT: count },
      );

      const messages = response?.[0]?.messages; // take the first stream's result (only 1 stream used); xRead may return null on timeout

      return messages || null;
    } catch (error) {
      if (error instanceof ClientClosedError) {
        console.log(`${error.message} ...RECONNECTING`);
        await this.connectToRedis();
        return null;
      }
      console.error(
        `Failed to xRead from Redis Stream: ${error.message}`,
        error,
      );
      return null;
    }
  }

Let's break down this code:

First, notice that we have added a parameters interface ReadStreamParams:

// interfaces.ts

export interface ReadStreamParams {
  streamName: string;
  blockMs: number;
  count: number;
  lastMessageId: string;
}
// --snip--

We have also defined a type for our response messages, RedisStreamMessage. Since node-redis is scarce with its type exports, we need to extract it ourselves:

// redis-client.type.ts

// --snip--

export type RedisStreamMessage = Awaited<
  ReturnType<RedisClient['xRead']>
>[number]['messages'][number];

// --snip--
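Resolved, this type should be equivalent to the following shape (my reading of the node-redis v4 types, so treat it as an assumption):

// Equivalent shape of RedisStreamMessage (for illustration only)
type RedisStreamMessageShape = {
  id: string; // e.g. '1675377159049-0'
  message: Record<string, string>; // field-value pairs, all stored as strings
};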

In the try block, we have our command call:


// --snip--
      const response = await this.redis.xRead(
        commandOptions({ isolated: true }), // uses a new connection from the pool so other redis calls are not blocked
        [
          {
            key: streamName,
            id: lastMessageId,
          },
        ],
        { BLOCK: blockMs, COUNT: count },
      );

      const messages = response?.[0]?.messages; // take the first stream's messages (we only read one)

      return messages || null;
    } catch (error) {
// --snip--

We first use a command option to run the call in isolation: commandOptions({ isolated: true }).
This is needed because we are going to BLOCK the connection. Once this command call takes a connection to the Redis server, if no matching data is available when the call arrives, Redis will hold the call for the specified amount of time (the BLOCK parameter). If data matching the request arrives during that period, it is returned immediately; otherwise, the call waits until the timeout elapses and returns nothing.
During this time, the connection is occupied, and no other commands can be issued on it. Essentially, our command BLOCKS the connection.

If we made multiple such calls, we would quickly run into delays and performance problems. For this, we can use isolated execution: node-redis will create multiple connections using a generic resource pool under the hood, so we have multiple connections to use in parallel.

Next, we specify which streams we want to read and starting from which ID. We need to pass this as an array, since we could read multiple streams in a single call. We'll leave the handling of multiple streams in one request out of the scope of this article:

        [
          {
            key: streamName,
            id: lastMessageId,
          },
        ],

Next we pass optional parameters:

        { BLOCK: blockMs, COUNT: count },
  • BLOCK - the blocking time in milliseconds. If 0 is specified, it will block indefinitely until at least 1 message can be returned.
  • COUNT - the maximum number of messages we will get returned. If there are fewer messages available on the Redis server, it will not wait for the count to be reached but will return however many there are.

Next we unravel the data:

      const messages = response?.[0]?.messages;

      return messages || null;

Since we queried only 1 stream, the response array will always be of length 1.
All messages are returned in its messages property.
And finally, we return either the array of stream messages or null.

Here is also the place we can handle some initial errors:

// --- snip---
    } catch (error) {
      if (error instanceof ClientClosedError) {
        console.log(`${error.message} ...RECONNECTING`);
        await this.connectToRedis();
        return null;
      }
      console.error(
        `Failed to xRead from Redis Stream: ${error.message}`,
        error,
      );
      return null;
    }

In case of an error, we return a null value.
If we get a ClientClosedError, there is no use trying to fetch anything further, so we try to reconnect to Redis.

// --- snip---
  private async connectToRedis() {
    try {
      // Try to reconnect only if the connection socket is closed. Else let it be handled by reconnect strategy.
      if (!this.redis.isOpen) {
        await this.redis.connect();
      }
    } catch (error) {
      console.error(
        `[${error.name}] ${error.message}`,
        error,
      );
    }
  }

We check isOpen to see if the socket is open. If it is, we trust the reconnect strategy to do its job and reconnect the client. If it's not, we try to reconnect ourselves.
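As a reminder, the reconnect strategy lives on the client configuration itself. A minimal sketch of how node-redis lets you configure it (the backoff numbers here are arbitrary and may differ from the factory in part 1):

import { createClient } from 'redis';

const client = createClient({
  url: 'redis://localhost:6379',
  socket: {
    // Called after each dropped connection; returns the delay in ms before the next retry
    reconnectStrategy: (retries) => Math.min(retries * 50, 1000),
  },
});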

And that's how we can fetch messages from a stream in a single call. Let's now expose this to our API, starting with a single message.

Exposing read to our API

Exposing the Redis stream to our API is similar to what we did with ping.
Let's add another method to our StreamHandlerService:

// stream-handler.service.ts

// --- snip --
  public readFromStream(streamName: string, count: number) {
    return this.redisService.readStream({
      streamName,
      blockMs: 0, // 0 = block indefinitely until at least one message arrives
      count, // max number of messages to fetch at a time
      lastMessageId: '$',
    });
  }

A simple method where we pass the streamName of the stream we want to read and the count of messages we would like to receive. As mentioned before, COUNT is really a "max count": if there are fewer (or no) messages on the stream, Redis will return anywhere from 1 to count messages.
We are using the special ID $ - it represents "anything that comes after this XREAD request".
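For contrast, a hypothetical call with the special ID 0 instead of $ would replay the stream from its very beginning:

// Sketch: read existing history instead of waiting for new messages
const history = await this.redisService.readStream({
  streamName: 'example-stream',
  blockMs: 1000, // don't block forever if the stream is empty
  count: 100,
  lastMessageId: '0', // special ID 0 = start from the oldest entry
});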

Now let's handle reading a single message:

// app.service.ts

// --snip--
  public getSingleNewMessage() {
    return this.streamService.readFromStream(EXAMPLE_STREAM_NAME, 1);
  }
// --snip--

Again, we use EXAMPLE_STREAM_NAME - the same stream that we populate - and set the count to 1.

And finally, let's add a new endpoint to our controller:

// app.controller.ts

// --snip--
  @Get('message')
  getMessage() {
    return this.appService.getSingleNewMessage();
  }
// --snip--

Now, calling the endpoint, we should see a single message generated by our populateStream method:

Stream message displayed in browser

Handling results as a stream

Getting a single message is relatively straightforward.
But how do we get multiple messages? What if we want a specific number of messages, e.g., precisely 3?
At first, it might seem simple - just set count to the number of messages you want. But remember - COUNT does not guarantee that we will get the exact number of messages; it's more of a "max count". So you might be tempted to call XREAD multiple times. But since we are passing $ as our last message ID, we might miss messages that arrive between our calls.
That means we need to remember our last ID.

And what if we want to read from a stream continuously and indefinitely?

This all adds additional complexity that we need to handle.

Generators

Generators are a valuable tool to have in our toolbox. Here, they come in handy because we want to fetch data from an external resource (Redis) lazily.
I will not focus on the "what and how" of generators.
If you haven't used them before, I encourage you to check the JavaScript Generator reference and the chapter on asynchronous generators in Dr. Axel Rauschmayer's book "JavaScript for impatient programmers".

The main feature they give us is an infinite loop that we can pause and resume whenever we want and need.

Let's add a new method to our StreamHandlerService:

// stream-handler.service.ts

export class StreamHandlerService implements OnModuleDestroy {

  private isAlive = true;

  onModuleDestroy() {
    this.isAlive = false;
  }

// --snip--
  public async *getStreamMessageGenerator(
    streamName: string,
    count: number,
  ): AsyncRedisStreamGenerator {
    // Start with latest data
    let lastMessageId = '$';
    while (this.isAlive) {
      const response = await this.redisService.readStream({
        streamName,
        blockMs: 0, // 0 = block indefinitely until at least one message arrives
        count, // max how many messages to fetch at a time
        lastMessageId,
      });

      // If no messages returned, continue to next iteration without yielding
      if (!response || response.length === 0) {
        continue;
      }
      // Update last message id to be the last message returned from redis
      lastMessageId = response[response.length - 1].id;
      for (const message of response) {
        yield message;
      }
    }
  }
// --snip--

Let's break down what we have written:

First, you will notice that we have added a new boolean, isAlive, initially set to true; only during the destruction of the module do we set it to false. This serves as the condition of our while loop, letting us exit it when the application shuts down.

Also, for our convenience, we have created a new type, AsyncRedisStreamGenerator:

// redis-client.type.ts

// --snip--
export type AsyncRedisStreamGenerator = AsyncGenerator<
  RedisStreamMessage,
  void,
  unknown
>;
// --snip--

This is an async generator that will yield (produce) RedisStreamMessage values, will return nothing (void) when it finishes, and accepts unknown, since we won't pass anything to the next method of our generator.

We are going to call the readStream method in an infinite loop:

    let lastMessageId = '$';
    while (this.isAlive) {
      const response = await this.redisService.readStream({
        streamName,
        blockMs: 0, // 0 = block indefinitely until at least one message arrives
        count, // max how many messages to fetch at a time
        lastMessageId,
      });
// -- snip --
    }


As in our previous example, we start reading only data that arrives after the generator is created and makes its first call to the stream, by using $ as the initial lastMessageId.
The rest is also the same as in the previous example, except that we are calling it in a loop.

Next, we handle the case where we get an empty response: we simply go into the next iteration without yielding any results.

   // If no messages returned, continue to next iteration without yielding
      if (!response || response.length === 0) {
        continue;
      }

If we have some responses, we are going to handle them:

      // Update last message id to be the last message returned from redis
      lastMessageId = response[response.length - 1].id;
      for (const message of response) {
        yield message;
      }

To avoid gaps in messages and to read all messages from the time this generator started, we update the lastMessageId variable with the ID of the last message we received.

Since we pass count, we will get an array of between 1 and count messages. However, this generator produces only a single message at a time, so we iterate through the array and yield the messages one by one.

That's it for the generator part. Let's see how we can use it to solve the questions we raised earlier:

Reading multiple results

Now let's use our generator to fetch multiple results by creating a new method in AppService:

// app.service.ts

// --snip--
  public async getMultipleNewMessages(count: number) {
    const generator = this.streamService.getStreamMessageGenerator(
      EXAMPLE_STREAM_NAME,
      count,
    );
    const messages: Record<string, string>[] = [];
    let counter = 0;
    for await (const messageObj of generator) {
      messages.push(this.parseMessage(messageObj.message));
      counter++;
      if (counter >= count) {
        break;
      }
    }
    return messages;
  }
// --snip--

First, we create a generator:

    const generator = this.streamService.getStreamMessageGenerator(
      EXAMPLE_STREAM_NAME,
      count,
    );

We pass it our count for an "optimistic" fetch: we try to get everything in a single Redis call, but if that doesn't happen, we call Redis as many times as needed.
We create an array to collect our messages:

    const messages: Record<string, string>[] = [];

and define a counter to track how many messages we have received.

One convenient feature of generators is that they are iterable - they implement the @@iterator method (async generators implement @@asyncIterator). So we can use for..of on them; in our case, since the generator is async, it's for await..of:

    for await (const messageObj of generator) {
      messages.push(this.parseMessage(messageObj.message));
      counter++;
      if (counter >= count) {
        break;
      }
    }
    return messages;

Once the count is reached, we simply break out of the loop and then return our collected messages.

Since Redis stores all values as strings, we have also created a generic parser method:

// app.service.ts

// --snip--
  private parseMessage(message: Record<string, string>) {
    return Object.entries(message).reduce((acc, [key, value]) => {
      try {
        // Attempt to parse JSON-encoded values back into their original form
        acc[key] = JSON.parse(value);
      } catch (e) {
        // Value was a plain string - keep it as-is
        acc[key] = value;
      }
      return acc;
    }, {});
  }
// --snip--
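A quick illustration of the round-trip (hypothetical values):

const raw = { hello: 'world', nestedObj: '{"num":42}' }; // as stored in Redis
const parsed = this.parseMessage(raw);
// parsed = { hello: 'world', nestedObj: { num: 42 } }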

To check that we are indeed getting the specified number of messages, we'll create a new endpoint /messages where we will fetch 3 messages:

// app.controller.ts

// --snip--
  @Get('messages')
  getMessages() {
    return this.appService.getMultipleNewMessages(3);
  }
// --snip--

Again, when we call our newly created endpoint, we indeed have 3 messages fetched:

Multiple stream messages displayed in browser

Continuous reading of the stream

The biggest power of a Redis stream comes from using it as a continuous data stream. To do that, we can re-use our generator:

// app.service.ts

// --snip--
  private isAlive = true;
// --snip--
  async onModuleInit() {
// --snip--
    this.continuousReadMessages();
  }
  onModuleDestroy() {
// --snip--

    this.isAlive = false;
  }
// --snip--
  private async continuousReadMessages() {
    const generator = this.streamService.getStreamMessageGenerator(
      EXAMPLE_STREAM_NAME,
      3,
    );
    for await (const messageObj of generator) {
      console.log(
        `Got message with ID: ${messageObj.id}`,
        JSON.stringify(this.parseMessage(messageObj.message), undefined, 2),
      );
      if (!this.isAlive) {
        break;
      }
    }
  }
// --snip--

Same as with multiple message fetch, we start by creating a generator:

    const generator = this.streamService.getStreamMessageGenerator(
      EXAMPLE_STREAM_NAME,
      3,
    );

We give it an arbitrary number for count, since we want to try to fetch multiple messages in one Redis call, if possible.

And then we just do whatever we need to do with the data in the loop. So here it's simply logging the message.

And by breaking the loop once the module is destroyed, we avoid dangling references and can stop our loop.

    for await (const messageObj of generator) {
      console.log(
        `Got message with ID: ${messageObj.id}`,
        JSON.stringify(this.parseMessage(messageObj.message), undefined, 2),
      );
      if (!this.isAlive) {
        break;
      }
    }

And we call this method once on module init, the same as we did with populating the stream:


  async onModuleInit() {
    this.populateStream();
    this.continuousReadMessages();
  }

And in our console, we should see that we are fetching the messages continuously:

...
app      | Got message with ID: 1675377159049-0 {
app      |   "hello": "world",
app      |   "date": "2023-02-02T22:32:39.048Z",
app      |   "nestedObj": {
app      |     "num": 48
app      |   }
app      | }
app      | Got message with ID: 1675377160050-0 {
app      |   "hello": "world",
app      |   "date": "2023-02-02T22:32:40.050Z",
app      |   "nestedObj": {
app      |     "num": 50
app      |   }
app      | }
...

Refactor for a single message

Since we are using the generator to handle stream messages, let's refactor our single-message fetch example:

// app.service.ts

// --snip--
  public async getSingleNewMessage() {
    const generator = this.streamService.getStreamMessageGenerator(
      EXAMPLE_STREAM_NAME,
      1,
    );
    const messageObj = await generator.next();
    if (!messageObj.done && messageObj.value) {
      return this.parseMessage(messageObj.value.message);
    }
  }
// --snip--

As in all the previous examples, we create a generator on our EXAMPLE_STREAM_NAME stream, this time with a count of 1.

    const generator = this.streamService.getStreamMessageGenerator(
      EXAMPLE_STREAM_NAME,
      1,
    );

Since we need only a single message, we do not need to use a loop. Instead, we call the next method manually:

    const messageObj = await generator.next();

and then handle the response: if the generator has not returned/ended (it's not done) and we have a message to return, we parse the message and return it to the client:

    if (!messageObj.done && messageObj.value) {
      return this.parseMessage(messageObj.value.message);
    }

Everything else stays the same for the client, and the same result is returned.
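One caveat worth noting: after next() resolves, the generator stays suspended at the yield inside its loop. If we ever wanted to dispose of it explicitly, async generators expose a return method (a sketch):

// Optional cleanup: settles the suspended yield and finishes the generator
await generator.return(undefined);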

This is the end of part 2 of the 3-part series.
We built on what we created in part 1. As a result, you should have an application that can fetch single or multiple responses and continuously fetch new results from the Redis stream.

In part 3, we are going to look at how we can use consumer groups to divide messages among multiple consumers so that each one gets a unique set of messages (unlike what we have now, where, if we connected more clients, all of them would get all of the messages).

Top comments (1)

alacore

I'd like to extend my gratitude to the author for providing such valuable insights within the article.

Although the information was incredibly useful, I encountered an issue while attempting to implement the Redis module as described in the article. I imported RedisModule into 2 separate modules in my NestJS app. In each module, I set up a distinct consumer service, injected StreamHandlerService into it, and implemented a continuousReadMessages() method, which in turn read a particular Redis stream continuously (each consumer read data from a separate stream).
However, things get tricky when both consumers are active, as I encounter issues: messages seem to go missing, and some aren't processed at all. Interestingly, when I run only one consumer (no matter which one), everything works perfectly. Could you please advise if I have missed or misunderstood something?