<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Krisjanis Kallings</title>
    <description>The latest articles on DEV Community by Krisjanis Kallings (@magickriss).</description>
    <link>https://dev.to/magickriss</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F997766%2F8a51b709-c87a-4094-aed3-7d467bec541d.png</url>
      <title>DEV Community: Krisjanis Kallings</title>
      <link>https://dev.to/magickriss</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/magickriss"/>
    <language>en</language>
    <item>
      <title>Redis Streams + NestJS: Part 3 | Consumer groups</title>
      <dc:creator>Krisjanis Kallings</dc:creator>
      <pubDate>Wed, 22 Feb 2023 17:00:06 +0000</pubDate>
      <link>https://dev.to/magickriss/redis-streams-nestjs-part-3-consumer-groups-3am7</link>
      <guid>https://dev.to/magickriss/redis-streams-nestjs-part-3-consumer-groups-3am7</guid>
      <description>&lt;h2&gt;
  
  
  Intro
&lt;/h2&gt;

&lt;p&gt;This is part 3 of a 3-part series, where we will explore how to use Redis streams with NestJS.&lt;/p&gt;

&lt;p&gt;It is structured in 3 parts:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://dev.to/magickriss/redis-streams-nestjs-part-1-setup-1jcj"&gt;Part 1&lt;/a&gt; - Setting up the NestJS application and connecting to Redis&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://dev.to/magickriss/redis-streams-nestjs-part-2-reading-from-stream-1643"&gt;Part 2&lt;/a&gt; - Populating Redis streams and reading from in &lt;strong&gt;fan-out mode&lt;/strong&gt;
&lt;/li&gt;
&lt;li&gt;Part 3 - Using &lt;strong&gt;consumer groups&lt;/strong&gt; to handle one stream from multiple actors in a way that one message is sent to and processed only by a single actor (consumer)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;By the end of this series, you will have the knowledge and tools necessary to create your own NestJS app that utilizes Redis streams to handle real-time data.&lt;/p&gt;

&lt;p&gt;Full code is available on &lt;a href="https://github.com/MagicKriss/Redis-Streams-NestJS-Example" rel="noopener noreferrer"&gt;the github&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  What we have
&lt;/h2&gt;

&lt;p&gt;In the previous parts, we created a NestJS application and connected it to the Redis server. Then, we implemented functionality for adding messages to the Redis stream and reading from it in a fan-out fashion. Finally, we looked at how we could use the Redis stream data structure for real-time data stream processing by continuously fetching messages from the Redis stream.&lt;/p&gt;

&lt;p&gt;In this post, we will build upon this idea by using consumer groups, thus adding the ability for distributed data processing.&lt;/p&gt;

&lt;h2&gt;
  
  
  Understanding consumer groups
&lt;/h2&gt;

&lt;p&gt;In Redis, consumer groups are a way to distribute messages or tasks across multiple consumers while ensuring that each message is processed only once.&lt;br&gt;
When you have many messages to process, a single consumer might need help to keep up with the incoming message rate. You can create multiple consumer instances to share the workload in this scenario.&lt;/p&gt;

&lt;p&gt;Redis consumer groups enable you to distribute messages to multiple consumers in a coordinated, fault-tolerant way.&lt;br&gt;
Each consumer in the group is assigned a unique name and a subset of the stream's messages to process. Redis uses a mechanism called "message acknowledgment" to ensure that each message is processed only once, even if there are multiple consumers in the group.&lt;/p&gt;
&lt;h2&gt;
  
  
  Using consumer groups
&lt;/h2&gt;

&lt;p&gt;We used the &lt;code&gt;XREAD&lt;/code&gt; command to fetch data from a stream. If we want to use consumer groups, we will need to use a different command - &lt;code&gt;XREADGROUP&lt;/code&gt;. You can find general info on &lt;a href="https://redis.io/commands/xreadgroup/" rel="noopener noreferrer"&gt;Redis Docs&lt;/a&gt;&lt;/p&gt;
&lt;h3&gt;
  
  
  Reading as a consumer
&lt;/h3&gt;

&lt;p&gt;Let's create a method in &lt;code&gt;RedisService&lt;/code&gt; for reading data in a consumer group manner:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis.service.ts&lt;/span&gt;

 &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="nf"&gt;readConsumerGroup&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
    &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;blockMs&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="p"&gt;}:&lt;/span&gt; &lt;span class="nx"&gt;CosnumeStreamParams&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt; &lt;span class="nb"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;RedisStreamMessage&lt;/span&gt;&lt;span class="p"&gt;[]&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;let&lt;/span&gt; &lt;span class="na"&gt;response&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;RedsXReadGroupResponse&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="nx"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;xReadGroup&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="nf"&gt;commandOptions&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt; &lt;span class="na"&gt;isolated&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt; &lt;span class="p"&gt;}),&lt;/span&gt; &lt;span class="c1"&gt;// uses new connection from pool not to block other redis calls&lt;/span&gt;
        &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
          &lt;span class="na"&gt;key&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="na"&gt;id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;&amp;gt;&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;},&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;BLOCK&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;blockMs&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;COUNT&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;count&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
      &lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt; &lt;span class="k"&gt;instanceof&lt;/span&gt; &lt;span class="nx"&gt;ClientClosedError&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt; ...RECONNECTING`&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;connectToRedis&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;includes&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;NOGROUP&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt; ...CREATING GROUP`&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;createConsumerGroup&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="s2"&gt;`Failed to xReadGroup from Redis stream: &lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="p"&gt;);&lt;/span&gt;

      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;messages&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;?.[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]?.&lt;/span&gt;&lt;span class="nx"&gt;messages&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="c1"&gt;// returning first stream (since only 1 stream used)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;messages&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We have introduced a new interface for params &lt;code&gt;CosnumeStreamParams&lt;/code&gt;. Since this shares many similarities with &lt;code&gt;ReadStreamParams&lt;/code&gt;, we extracted those to a base interface, &lt;code&gt;StreamParamsBase&lt;/code&gt;, and extended them from both of these interfaces.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// interfaces.ts&lt;/span&gt;

&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kr"&gt;interface&lt;/span&gt; &lt;span class="nx"&gt;StreamParamsBase&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="cm"&gt;/** Name of stream to read from */&lt;/span&gt;
  &lt;span class="nl"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="cm"&gt;/** Max time in ms for how long to block Redis connection before returning
   * If 0 is passed, it will block until at least one message is fetched, or timeout happens
   * */&lt;/span&gt;
  &lt;span class="nl"&gt;blockMs&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;number&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="cm"&gt;/** Max how many messages to fetch at a time from Redis */&lt;/span&gt;
  &lt;span class="nl"&gt;count&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;number&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kr"&gt;interface&lt;/span&gt; &lt;span class="nx"&gt;ReadStreamParams&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nx"&gt;StreamParamsBase&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="cm"&gt;/** ID of last fetched message */&lt;/span&gt;
  &lt;span class="nl"&gt;lastMessageId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kr"&gt;interface&lt;/span&gt; &lt;span class="nx"&gt;CosnumeStreamParams&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nx"&gt;StreamParamsBase&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="cm"&gt;/** Name of consumer group */&lt;/span&gt;
  &lt;span class="nl"&gt;group&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="cm"&gt;/** Name of consumer, must be unique within group */&lt;/span&gt;
  &lt;span class="nl"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The resulting stream message - &lt;code&gt;RedisStreamMessage&lt;/code&gt; stays the same as we had for reading the stream.&lt;br&gt;
Similarly, as we did for &lt;code&gt;XREAD&lt;/code&gt;, we have extracted a response type &lt;code&gt;RedsXReadGroupResponse&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis-client.type.ts&lt;/span&gt;

&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kd"&gt;type&lt;/span&gt; &lt;span class="nx"&gt;RedsXReadGroupResponse&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;Awaited&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;
  &lt;span class="nb"&gt;ReturnType&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;RedisClient&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;xReadGroup&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The code should be straight forward:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis.service.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;

    &lt;span class="kd"&gt;let&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;RedsXReadGroupResponse&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="nx"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;xReadGroup&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="nf"&gt;commandOptions&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt; &lt;span class="na"&gt;isolated&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt; &lt;span class="p"&gt;}),&lt;/span&gt; &lt;span class="c1"&gt;// uses new connection from pool not to block other redis calls&lt;/span&gt;
        &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
          &lt;span class="na"&gt;key&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="na"&gt;id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;&amp;gt;&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;},&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;BLOCK&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;blockMs&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;COUNT&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;count&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
      &lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We call &lt;code&gt;xReadGroup&lt;/code&gt; method of &lt;code&gt;node-redis&lt;/code&gt; client, and wrap it in try-catch to handle any error we might get from &lt;code&gt;RedisClient&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Same as with xRead, when using blocking commands, we want to utilize connection pool  and execute these commands in isolation via &lt;code&gt;commandOptions({ isolated: true })&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Then we pass the name of the consumer &lt;code&gt;group&lt;/code&gt; this consumer belongs to and is sharing messages with.&lt;br&gt;
Next, we need to identify the &lt;code&gt;consumer&lt;/code&gt; - entity reading this message.&lt;/p&gt;

&lt;p&gt;Next, we specify from which stream we want to fetch a stream by &lt;code&gt;key&lt;/code&gt; parameter and starting from which message ID. &lt;code&gt;&amp;gt;&lt;/code&gt; is a special symbol meaning we want to fetch only messages that were never delivered to any other consumer. If we used 0 or any other valid ID, we would risk also getting messages that have been given to other users but have yet to be acknowledged.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;NOTE!&lt;/strong&gt;&lt;br&gt;
Here, we are specifying only 1 stream. However, we could read from multiple streams by adding an array of objects containing &lt;code&gt;key&lt;/code&gt; and &lt;code&gt;id&lt;/code&gt; values.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Finally, we set read options. &lt;code&gt;BLOCK&lt;/code&gt; for max blocking time in ms and &lt;code&gt;COUNT&lt;/code&gt; for the max count of messages. These work exactly the same as with &lt;code&gt;XREAD&lt;/code&gt; option.&lt;/p&gt;

&lt;p&gt;Next we have some error handling to do:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis.service.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
   &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt; &lt;span class="k"&gt;instanceof&lt;/span&gt; &lt;span class="nx"&gt;ClientClosedError&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt; ...RECONNECTING`&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;connectToRedis&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;includes&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;NOGROUP&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt; ...CREATING GROUP`&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;createConsumerGroup&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="s2"&gt;`Failed to xReadGroup from Redis stream: &lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="p"&gt;);&lt;/span&gt;

      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As with other Redis calls, we check for a closed connection - &lt;code&gt;ClientClosedError&lt;/code&gt; and try to reconnect in this case.&lt;br&gt;
Then We check for an error with a message including &lt;code&gt;NOGROUP&lt;/code&gt;&lt;br&gt;
The full error might look similar to this - &lt;code&gt;NOGROUP No such key 'example-stream' or consumer group 'example-group' in XREADGROUP with GROUP option&lt;/code&gt;, but if we see &lt;code&gt;NOGROUP&lt;/code&gt;, we know that this stream does not have the group we want our consumer to be part of. &lt;br&gt;
So we need to create this group.:&lt;br&gt;
&lt;code&gt;await this.createConsumerGroup(streamName, group);&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis.service.ts&lt;/span&gt;

  &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="nf"&gt;createConsumerGroup&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;xGroupCreate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c1"&gt;// use 0 to create group from the beginning of the stream, use '$' to create group from the end of the stream&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
          &lt;span class="na"&gt;MKSTREAM&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;},&lt;/span&gt;
      &lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;includes&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;BUSYGROUP&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// Consumer group already exists&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt; &lt;span class="k"&gt;instanceof&lt;/span&gt; &lt;span class="nx"&gt;ClientClosedError&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt; ...RECONNECTING`&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;connectToRedis&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`Failed to xGroupCreate: &lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We are calling &lt;code&gt;XGROUP CREATE&lt;/code&gt; command providing &lt;code&gt;streamName&lt;/code&gt;, the &lt;code&gt;group&lt;/code&gt; name we want to create.&lt;br&gt;
We can also choose the start position where we want to start consuming messages. Here it's set to &lt;code&gt;0&lt;/code&gt;, which means the latest message in the stream. It could be any valid ID or a special character &lt;code&gt;$&lt;/code&gt; - this would start consuming messages added only after group creation.&lt;/p&gt;

&lt;p&gt;We also set &lt;code&gt;MKSTREAM&lt;/code&gt; option to true, so in case we don't have this stream yet for some reason, e.g., no messages have been added, Redis gets restarted/switched, or in case of bad memory management, the stream has evicted, we will create a new stream with this name (as well as add a consumer group).&lt;/p&gt;

&lt;p&gt;We added error handling for &lt;code&gt;BUSYGROUP&lt;/code&gt; - meaning that this group (and stream) exists after all ( e.g., in case it has been just created by another group consumer)&lt;br&gt;
The rest of the error handing stays the same as for other Redis calls.&lt;/p&gt;

&lt;p&gt;Finally, jumping back to &lt;code&gt;readConsumerGroup&lt;/code&gt;, after we have fetched messages and done our error handling, we extract the first stream's messages (since we added only one &lt;code&gt;key&lt;/code&gt; and &lt;code&gt;id&lt;/code&gt; object)  and return them.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis.service.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;messages&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;?.[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]?.&lt;/span&gt;&lt;span class="nx"&gt;messages&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="c1"&gt;// returning first stream (since only 1 stream used)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;messages&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Message acknowledgement
&lt;/h3&gt;

&lt;p&gt;Consumer groups is a great way to distribute data on stream to different consumers only once per group. But how to guarantee that a message sent out by the Redis server was actually received and processed by the consumer it was sent to? The answer is "acknowledgment".&lt;/p&gt;

&lt;p&gt;When a message is sent to a consumer, as a side effect, Redis puts the message ID and that consumer in the Pending Entries List (PEL) of that stream consumer group.  &lt;/p&gt;

&lt;p&gt;The consumer must send an acknowledgment (also known as an "ack") to the Redis server to signal that it has successfully processed the message. This acknowledgment is sent using the &lt;code&gt;XACK&lt;/code&gt; command, which takes the name of the consumer group, the stream's name, and the IDs of the messages that were processed.&lt;/p&gt;

&lt;p&gt;Once the Redis server receives an acknowledgment from a consumer, it updates its internal tracking data to mark the message as processed by that consumer. The server also removes the message from the PEL for that consumer.&lt;/p&gt;

&lt;p&gt;If a consumer fails to send an acknowledgment within a configurable period, the message is considered unacknowledged. The Redis server will then deliver the message again to another consumer in the same consumer group. This ensures that messages are not lost if a consumer fails or crashes before it can process a message. &lt;/p&gt;

&lt;h4&gt;
  
  
  Implementing acknowledgement
&lt;/h4&gt;

&lt;p&gt;Let's handle acknowledgement using the &lt;code&gt;xAck&lt;/code&gt; method:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis.service.ts&lt;/span&gt;

  &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="nf"&gt;acknowledgeMessages&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
    &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;messageIds&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="p"&gt;}:&lt;/span&gt; &lt;span class="nx"&gt;AcknowledgeMessageParams&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;xAck&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;messageIds&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt; &lt;span class="k"&gt;instanceof&lt;/span&gt; &lt;span class="nx"&gt;ClientClosedError&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt; ...RECONNECTING`&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;connectToRedis&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`Failed to xAck from Redis stream: &lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;For this we have create yet another interface - &lt;code&gt;AcknowledgeMessageParams&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// interfaces.ts&lt;/span&gt;

&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kr"&gt;interface&lt;/span&gt; &lt;span class="nx"&gt;AcknowledgeMessageParams&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="cm"&gt;/** Name of stream to acknowledge message in */&lt;/span&gt;
  &lt;span class="nl"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="cm"&gt;/** Name of consumer group */&lt;/span&gt;
  &lt;span class="nl"&gt;group&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="cm"&gt;/** ID of messages to acknowledge */&lt;/span&gt;
  &lt;span class="nl"&gt;messageIds&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;[];&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;XACK&lt;/code&gt; takes single or multiple message IDs, so we can batch multiple acks in one call.&lt;/p&gt;

&lt;p&gt;The rest is straightforward - we send the command and handle errors as we do for other Redis methods.&lt;/p&gt;

&lt;h3&gt;
  
  
  (Auto)Claiming unacknowledged messages
&lt;/h3&gt;

&lt;p&gt;&lt;code&gt;XAUTOCLAIM&lt;/code&gt; is a command in Redis that automates the process of claiming unacknowledged messages from a consumer group. It is similar to the &lt;code&gt;XPENDING&lt;/code&gt; and &lt;code&gt;XCLAIM&lt;/code&gt; commands, but it simplifies the process by automatically allowing Redis to claim unacknowledged messages on behalf of a consumer.&lt;/p&gt;

&lt;p&gt;A consumer sends a &lt;code&gt;XAUTOCLAIM&lt;/code&gt; command with the consumer group's name, the consumer's name, and the name of the stream to consume from. Redis returns unacknowledged messages for the consumer to claim, which can be processed and acknowledged like a message received from &lt;code&gt;XREADGROUP&lt;/code&gt;. Another consumer can claim unacknowledged messages if the acknowledgment window expires. On subsequent &lt;code&gt;XAUTOCLAIM&lt;/code&gt; commands, Redis only returns unacknowledged messages that have not been previously returned to the consumer and have yet reached the maximum delivery attempts.&lt;/p&gt;

&lt;h4&gt;
  
  
  Implementing &lt;code&gt;xAutoClaim&lt;/code&gt;
&lt;/h4&gt;

&lt;p&gt;Here the code is almost the same as we had for &lt;code&gt;xReadGroup&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis.service.ts&lt;/span&gt;

  &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="nf"&gt;autoClaimMessage&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
    &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;minIdleTimeMs&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="p"&gt;}:&lt;/span&gt; &lt;span class="nx"&gt;AutoclaimMessageParams&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;let&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;RedsXAutoClaimResponse&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="nx"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;xAutoClaim&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;minIdleTimeMs&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;0-0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c1"&gt;// use 0-0 to claim all messages. In case of multiple consumers, this will be used to claim messages from other consumers&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
          &lt;span class="na"&gt;COUNT&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;},&lt;/span&gt;
      &lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt; &lt;span class="k"&gt;instanceof&lt;/span&gt; &lt;span class="nx"&gt;ClientClosedError&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt; ...RECONNECTING`&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;connectToRedis&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`Failed to xAutoClaim from Redis stream: &lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;?.&lt;/span&gt;&lt;span class="nx"&gt;messages&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We have created a params interface &lt;code&gt;AutoclaimMessageParams&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// interfaces.ts&lt;/span&gt;

&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kr"&gt;interface&lt;/span&gt; &lt;span class="nx"&gt;AutoclaimMessageParams&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="nl"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="nl"&gt;group&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="nl"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="nl"&gt;minIdleTimeMs&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;number&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="nl"&gt;count&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;number&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here the distinct params is &lt;code&gt;minIdleTimeMs&lt;/code&gt; - this is a minimum idle time in ms for message to be eligible for auto-claim.&lt;/p&gt;

&lt;h3&gt;
  
  
  Utilizing generators to consume messages
&lt;/h3&gt;

&lt;p&gt;We have the functionality to consume, ack, and even re-claim messages. Let's add all of that together and abstract all of that logic inside a generator so that the &lt;code&gt;StreamHandlerService&lt;/code&gt; clients don't need to worry about all of these mechanisms.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// stream-handler.service&lt;/span&gt;

  &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="nf"&gt;getConsumerMessageGenerator&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
    &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;autoClaimMinIdleTimeMs&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;autoAck&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="p"&gt;}:&lt;/span&gt; &lt;span class="nx"&gt;ReadConsumerGroupParams&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt; &lt;span class="nx"&gt;AsyncRedisStreamGenerator&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;let&lt;/span&gt; &lt;span class="nx"&gt;fetchNewMessages&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="c1"&gt;// Toggle for switching between fetching new messages and auto claiming messages&lt;/span&gt;
    &lt;span class="k"&gt;while &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;isAlive&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="kd"&gt;let&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;RedisStreamMessage&lt;/span&gt;&lt;span class="p"&gt;[];&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;fetchNewMessages&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nx"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redisService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;readConsumerGroup&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
          &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="nx"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="na"&gt;blockMs&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c1"&gt;// 0 = infinite blocking until at least one message is fetched, or timeout happens&lt;/span&gt;
          &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;});&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// Try to auto claim messages that are idle for a certain amount of time&lt;/span&gt;
        &lt;span class="nx"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redisService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;autoClaimMessage&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
          &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="nx"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="na"&gt;minIdleTimeMs&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="nx"&gt;autoClaimMinIdleTimeMs&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="nx"&gt;StreamHandlerService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;DEFAULT_IDLE_TIME_MS&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;});&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;

      &lt;span class="c1"&gt;// Acknowledge messages if autoAck is enabled&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;autoAck&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;?.&lt;/span&gt;&lt;span class="nx"&gt;length&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redisService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;acknowledgeMessages&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
          &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="na"&gt;messageIds&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;map&lt;/span&gt;&lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="nx"&gt;m&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nx"&gt;m&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;id&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="p"&gt;});&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;

      &lt;span class="c1"&gt;// Toggle between fetching new messages and auto claiming messages&lt;/span&gt;
      &lt;span class="nx"&gt;fetchNewMessages&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="nx"&gt;fetchNewMessages&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

      &lt;span class="c1"&gt;// If no messages returned, continue to next iteration without yielding&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="nx"&gt;response&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;length&lt;/span&gt; &lt;span class="o"&gt;===&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;continue&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="k"&gt;for &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;message&lt;/span&gt; &lt;span class="k"&gt;of&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here's the new params interface &lt;code&gt;ReadConsumerGroupParams&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// interfaces.ts&lt;/span&gt;

&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kr"&gt;interface&lt;/span&gt; &lt;span class="nx"&gt;ReadConsumerGroupParams&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="nl"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="nl"&gt;group&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="nl"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="nl"&gt;count&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;number&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="nl"&gt;autoClaimMinIdleTimeMs&lt;/span&gt;&lt;span class="p"&gt;?:&lt;/span&gt; &lt;span class="kr"&gt;number&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="nl"&gt;autoAck&lt;/span&gt;&lt;span class="p"&gt;?:&lt;/span&gt; &lt;span class="nx"&gt;boolean&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We will expect &lt;code&gt;streamName&lt;/code&gt;, consumer &lt;code&gt;group&lt;/code&gt; name, and a unique &lt;code&gt;consumer&lt;/code&gt; name. A &lt;code&gt;count&lt;/code&gt; of how many messages to try to fetch in one call, as well as &lt;br&gt;
&lt;code&gt;autoClaimMinIdleTimeMs&lt;/code&gt; - minimum idle time in ms for the message to be eligible for auto-claim, re-claim from another consumer, if not acknowledged.&lt;/p&gt;

&lt;p&gt;And finally, &lt;code&gt;autoAck&lt;/code&gt; - Should messages be acknowledged automatically after being read. If set to false, the client must manually acknowledge messages using &lt;code&gt;acknowledgeMessage&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;We will try to alternate between fetching new messages and auto claiming forgotten ones. For this a variable &lt;code&gt;fetchNewMessages&lt;/code&gt; is introduced.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
    &lt;span class="kd"&gt;let&lt;/span&gt; &lt;span class="nx"&gt;fetchNewMessages&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;while &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;isAlive&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As for our fan-out generator, we will use a while loop that will become invalid when the module is destroyed.&lt;/p&gt;

&lt;p&gt;Then we alternate between new messages and auto-claim and assigning the result to a &lt;code&gt;response&lt;/code&gt; variable.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
      &lt;span class="kd"&gt;let&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;RedisStreamMessage&lt;/span&gt;&lt;span class="p"&gt;[];&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;fetchNewMessages&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nx"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redisService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;readConsumerGroup&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
          &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="nx"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="na"&gt;blockMs&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c1"&gt;// 0 = infinite blocking until at least one message is fetched, or timeout happens&lt;/span&gt;
          &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;});&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// Try to auto claim messages that are idle for a certain amount of time&lt;/span&gt;
        &lt;span class="nx"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redisService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;autoClaimMessage&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
          &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="nx"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="na"&gt;minIdleTimeMs&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="nx"&gt;autoClaimMinIdleTimeMs&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="nx"&gt;StreamHandlerService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;DEFAULT_IDLE_TIME_MS&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;});&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Next, we'll handle the auto-ack functionality. This will allow us to remember to ack message after processing. &lt;br&gt;
In production applications, it might or might not make sense. For example, if you need to ensure that a message has been received and successfully processed, you will need to do ack manually after processing.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;autoAck&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;?.&lt;/span&gt;&lt;span class="nx"&gt;length&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redisService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;acknowledgeMessages&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
          &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="na"&gt;messageIds&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;map&lt;/span&gt;&lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="nx"&gt;m&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nx"&gt;m&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;id&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="p"&gt;});&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then we want to toggle the "fetching mode"&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
      &lt;span class="nx"&gt;fetchNewMessages&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="nx"&gt;fetchNewMessages&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And finally, only if there are any messages, we want to &lt;code&gt;yield&lt;/code&gt; them:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="nx"&gt;response&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;length&lt;/span&gt; &lt;span class="o"&gt;===&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;continue&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="k"&gt;for &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;message&lt;/span&gt; &lt;span class="k"&gt;of&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Consuming messages
&lt;/h2&gt;

&lt;p&gt;The hard part is over. Now we can happily read the Redis stream via consumer groups.&lt;br&gt;
Let's do that from our &lt;code&gt;AppService&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;// app.service.ts

  public async consumeMessageFromGroup(
    group: string,
    consumer: string,
    count: number,
  ) {
    const generator = this.streamService.getConsumerMessageGenerator({
      streamName: EXAMPLE_STREAM_NAME,
      group,
      consumer,
      count,
    });
    const messages: Record&amp;lt;string, string&amp;gt;[] = [];
    let counter = 0;
    for await (const messageObj of generator) {
      messages.push(this.parseMessage(messageObj.message));
      counter++;
      if (counter &amp;gt;= count) {
        break;
      }
    }
    return {
      group,
      consumer,
      messages,
    };
  }
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The code is almost the same as for multiple new messages reads in a fan-out mode that we create in &lt;a href="https://dev.to/magickriss/redis-streams-nestjs-part-2-reading-from-stream-1643"&gt;part 2&lt;/a&gt;. The only addition is &lt;code&gt;group&lt;/code&gt; and &lt;code&gt;consumer&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Let's also add an simple endpoint from which we can consume messages and specify the group, consumer name and count:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// app.controller&lt;/span&gt;

  &lt;span class="p"&gt;@&lt;/span&gt;&lt;span class="nd"&gt;Get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;consume/:group/:consumer/:count&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="nf"&gt;consumeMessages&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="p"&gt;@&lt;/span&gt;&lt;span class="nd"&gt;Param&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;group&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;@&lt;/span&gt;&lt;span class="nd"&gt;Param&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;consumer&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="nx"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;@&lt;/span&gt;&lt;span class="nd"&gt;Param&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;count&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;number&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;appService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;consumeMessageFromGroup&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;group&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In this case, we won't create multiple instances of our service, but it would work the same. &lt;br&gt;
Here is an example:&lt;/p&gt;

&lt;p&gt;If we open our service with &lt;code&gt;http://localhost:8081/consume/example-group/example-consumer/3&lt;/code&gt;, we get messages from the stream beginning for a group named &lt;code&gt;example-group&lt;/code&gt; and for a consumer named &lt;code&gt;example-consumer&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcdnjrppres5vrbgcsslh.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcdnjrppres5vrbgcsslh.png" alt="3 consumer group messages for example-consumer in example-group in browser" width="800" height="400"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;If we try a different consumer, e.g., &lt;code&gt;example-consumer-2&lt;/code&gt; we will get new messages that are not pending and have not been ack by any of the consumers yet.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fthhlol6gzhosjnl8n2zr.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fthhlol6gzhosjnl8n2zr.png" alt="Different 3 consumer group messages for example-consumer-2 in example-group in browser" width="690" height="520"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;However, if we try to fetch messages from a different group, they will be fetched from the very beginning, no matter if another group or even the same consumer in another group has acknowledged them already.&lt;/p&gt;

&lt;p&gt;Here is an example with &lt;code&gt;example-consumer&lt;/code&gt; consuming messages from the same stream, but on &lt;code&gt;example-group-2&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ft53jjwb1gw3zdr295r59.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ft53jjwb1gw3zdr295r59.png" alt="Same 3 consumer group messages for example-consumer in example-group-2 in browser" width="800" height="400"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Notice that these messages are the same as first fetched by &lt;code&gt;example-group&lt;/code&gt;. That is because they were consumed and acknowledged by that group only. Different groups do not share the PEL.&lt;/p&gt;

&lt;h2&gt;
  
  
  That's all, floks!
&lt;/h2&gt;

&lt;p&gt;Over this 3-part article series, we have successfully built a NestJS application that connects to a Redis server and utilizes the Redis stream data structure. &lt;br&gt;
In &lt;a href="https://dev.to/magickriss/redis-streams-nestjs-part-1-setup-1jcj"&gt;part 1&lt;/a&gt;, we set up the Redis server, installed the Redis client library, and established a connection between the NestJS application and Redis. &lt;/p&gt;

&lt;p&gt;In &lt;a href="https://dev.to/magickriss/redis-streams-nestjs-part-2-reading-from-stream-1643"&gt;part 2&lt;/a&gt;, we explored the basics of Redis streams and learned how to use the Redis client to add messages to the stream. We also implemented fan-out mode, which allows multiple clients to consume messages from a single stream.&lt;/p&gt;

&lt;p&gt;In part 3, we took our implementation to the next level by using Redis consumer groups. We learned how to create a consumer group and use it to consume messages in a more scalable and reliable way.&lt;/p&gt;

&lt;p&gt;We also implemented some error handling to ensure our application can recover from errors and continue processing messages without interruption.&lt;/p&gt;

&lt;p&gt;Overall, this article series has provided a comprehensive guide to building a NestJS application that utilizes Redis streams and consumer groups. By following along and experimenting with the code samples, you should now understand how to implement real-time systems that can easily handle large amounts of data, thanks to the power and flexibility of Redis and NestJS.&lt;/p&gt;

</description>
      <category>website</category>
      <category>webdev</category>
      <category>welcome</category>
    </item>
    <item>
      <title>Redis Streams + NestJS: Part 2 | Reading from Stream</title>
      <dc:creator>Krisjanis Kallings</dc:creator>
      <pubDate>Wed, 22 Feb 2023 16:58:57 +0000</pubDate>
      <link>https://dev.to/magickriss/redis-streams-nestjs-part-2-reading-from-stream-1643</link>
      <guid>https://dev.to/magickriss/redis-streams-nestjs-part-2-reading-from-stream-1643</guid>
      <description>&lt;h2&gt;
  
  
  Intro
&lt;/h2&gt;

&lt;p&gt;This is part 2 of a 3-part series, where we will explore how to use Redis streams with NestJS.&lt;/p&gt;

&lt;p&gt;It is structured in 3 parts:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://dev.to/magickriss/redis-streams-nestjs-part-1-setup-1jcj"&gt;Part 1&lt;/a&gt; - Setting up the NestJS application and connecting to Redis&lt;/li&gt;
&lt;li&gt;Part 2 - Populating Redis streams and reading from in &lt;strong&gt;fan-out mode&lt;/strong&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://dev.to/magickriss/redis-streams-nestjs-part-3-consumer-groups-3am7"&gt;Part 3&lt;/a&gt; - Using &lt;strong&gt;consumer groups&lt;/strong&gt; to handle one stream from multiple actors in a way that one message is sent to and processed only by a single actor (consumer)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;By the end of this series, you will have the knowledge and tools necessary to create your own NestJS app that utilizes Redis streams to handle real-time data.&lt;/p&gt;

&lt;p&gt;Full code is available on &lt;a href="https://github.com/MagicKriss/Redis-Streams-NestJS-Example" rel="noopener noreferrer"&gt;the github&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  What we have
&lt;/h2&gt;

&lt;p&gt;In &lt;a href="https://dev.to/magickriss/redis-streams-nestjs-part-1-setup-1jcj"&gt;part 1&lt;/a&gt;, we have created a working NestJS application that can connect to the Redis server. We have also created an endpoint &lt;code&gt;/ping-redis&lt;/code&gt; for our application that will call a ping command on the Redis server and return its response.&lt;br&gt;
For our development convenience, we run our application and Redis server in a docker. We are using Docker Compose to set up our &lt;code&gt;docker-copose.yml&lt;/code&gt; file for simplicity.&lt;/p&gt;

&lt;p&gt;Now we are ready to focus on the main topic - using Redis streams.&lt;/p&gt;
&lt;h2&gt;
  
  
  What are Redis streams
&lt;/h2&gt;

&lt;p&gt;Before we start with our implementation, it's essential to understand what Redis streams are and clear some misconceptions that might have arisen about what Redis streams are not.&lt;/p&gt;
&lt;h3&gt;
  
  
  Data structure
&lt;/h3&gt;

&lt;p&gt;At its core, Redis streams are a data structure that holds data - messages, in order by the message IDs.&lt;br&gt;
By default, this ID is a timestamp on when this data was added to the stream (followed by sequence number, in case multiple entries happened at the same timestamp).&lt;/p&gt;
&lt;h4&gt;
  
  
  Not a list
&lt;/h4&gt;

&lt;p&gt;This all sounds very similar to a list. However, the difference here is that while a list can be popped, streams can't. This is because reading the stream does not mutate it. So when calling &lt;code&gt;LPOP&lt;/code&gt; or &lt;code&gt;RPOP&lt;/code&gt; on a list, the element returned is removed. In streams, &lt;code&gt;XREAD&lt;/code&gt; leaves the item in the stream and can be read multiple times.&lt;/p&gt;
&lt;h4&gt;
  
  
  Not a socket - there is no "subscribe"
&lt;/h4&gt;

&lt;p&gt;Just by the name "stream," it's easy to think that it's a continuous "stream" of data - something that you would connect to and send you new data once it's appended.&lt;br&gt;
It's a stream in the sense that the data represents some direction - older data with smaller IDs and newer data with larger IDs.&lt;br&gt;
But to access it, we will need to read from it manually, just like with other more superficial data structures, like sets or lists. Once a request has been executed and data is sent from the Redis to the application, the command has been completed, and no new data will be received unless it is called again.&lt;br&gt;
So it would also be wrong to think about streams in the form of Pub-Sub. With Pub-sub, once you subscribe to channels, the connection is blocked, and all new messages published will be delivered. No need to re-subscribe.&lt;/p&gt;
&lt;h3&gt;
  
  
  Basic functionality
&lt;/h3&gt;

&lt;p&gt;We mostly add to stream (&lt;code&gt;XADD&lt;/code&gt;) and read from it (&lt;code&gt;XREAD&lt;/code&gt;, &lt;code&gt;XRANGE&lt;/code&gt; &lt;code&gt;XREADGROUP&lt;/code&gt;). There is also functionality for deletion (&lt;code&gt;XDEL&lt;/code&gt;), but in most cases, we think of the stream as append-only.&lt;/p&gt;
&lt;h3&gt;
  
  
  Managing memory
&lt;/h3&gt;

&lt;p&gt;To avoid using up all of the space and evicting data randomly, the length of streams is managed by trimming them. It can be done manually via &lt;code&gt;XTRIM&lt;/code&gt;, or we can move the responsibility to the Redis by using capped streams - adding &lt;code&gt;MAXLEN&lt;/code&gt; option when using &lt;code&gt;XADD&lt;/code&gt;.&lt;/p&gt;
&lt;h2&gt;
  
  
  Adding to streams
&lt;/h2&gt;

&lt;p&gt;Finally, let's get back into implementing streams in our NestJS application.&lt;br&gt;
Let's create a method to add a message to the stream:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis.service.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
  &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="nf"&gt;addToStream&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
    &lt;span class="nx"&gt;fieldsToStore&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="p"&gt;}:&lt;/span&gt; &lt;span class="nx"&gt;AddToStreamParams&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt; &lt;span class="nb"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="c1"&gt;// Converting object to record to store in redis&lt;/span&gt;
    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;messageObject&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;Object&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;entries&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;fieldsToStore&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;reduce&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;acc&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;typeof&lt;/span&gt; &lt;span class="nx"&gt;value&lt;/span&gt; &lt;span class="o"&gt;===&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;undefined&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
          &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;acc&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="nx"&gt;acc&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;key&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;typeof&lt;/span&gt; &lt;span class="nx"&gt;value&lt;/span&gt; &lt;span class="o"&gt;===&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;string&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;?&lt;/span&gt; &lt;span class="nx"&gt;value&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;JSON&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;stringify&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;acc&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;},&lt;/span&gt;
      &lt;span class="p"&gt;{}&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="nb"&gt;Record&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;);&lt;/span&gt;

    &lt;span class="c1"&gt;// Adding to stream with trimming - approximately max 100 messages&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;xAdd&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;*&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;messageObject&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="na"&gt;TRIM&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="na"&gt;strategy&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;MAXLEN&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="na"&gt;strategyModifier&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;~&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="na"&gt;threshold&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="p"&gt;});&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As you can see, we have defined the interface for the parameters of this method:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// interfaces.ts&lt;/span&gt;

&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kr"&gt;interface&lt;/span&gt; &lt;span class="nx"&gt;AddToStreamParams&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="nl"&gt;fieldsToStore&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Record&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kr"&gt;any&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="nl"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We will take in the name of the stream we want to add data to and specify data as POJO named &lt;code&gt;fieldsToStore&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Next, we have a transformation code that transforms our object of any depth into key-value pairs of string type. Later, when reading data, we will parse this back to our original POJO:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis.service.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;messageObject&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;Object&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;entries&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;fieldsToStore&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;reduce&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;acc&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;typeof&lt;/span&gt; &lt;span class="nx"&gt;value&lt;/span&gt; &lt;span class="o"&gt;===&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;undefined&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
          &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;acc&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="nx"&gt;acc&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;key&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;typeof&lt;/span&gt; &lt;span class="nx"&gt;value&lt;/span&gt; &lt;span class="o"&gt;===&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;string&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;?&lt;/span&gt; &lt;span class="nx"&gt;value&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;JSON&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;stringify&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;acc&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;},&lt;/span&gt;
      &lt;span class="p"&gt;{}&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="nb"&gt;Record&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then we execute a call to Redis service with the following parameters:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis.service.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;xAdd&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;*&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;messageObject&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="na"&gt;TRIM&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="na"&gt;strategy&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;MAXLEN&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="na"&gt;strategyModifier&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;~&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="na"&gt;threshold&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="p"&gt;});&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;streamName&lt;/code&gt; - the name of our stream that we want to append to&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;*&lt;/code&gt; - asterisk specifies Redis to auto-generate a unique ID (based on timestamp). Here we could set our own ID if we want.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;messageObject&lt;/code&gt; - key-value pairs that contain the data we want to store&lt;/li&gt;
&lt;li&gt;Options:

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;TRIM&lt;/code&gt; - as mentioned before, we don't want to handle trimming manually, so we specify that we wish Redis to trim the stream by itself. This includes additional options:

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;strategy&lt;/code&gt; &lt;code&gt;MAXLEN&lt;/code&gt; - here, we specify that we want to trim when the stream has reached the maximum length of a &lt;code&gt;threshold&lt;/code&gt;. Other option would be &lt;code&gt;MINID&lt;/code&gt;, where the threshold would specify the smallest ID to keep. The rest are marked for eviction.&lt;/li&gt;
&lt;li&gt; &lt;code&gt;strategyModifier&lt;/code&gt; &lt;code&gt;~&lt;/code&gt; - this tells Redis that we don't want to be strict in size, and Redis can go above the threshold for a little while, but the trim will eventually happen. This optimizes Redis to execute eviction at the most convenient time, not immediately.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;threshold&lt;/code&gt; &lt;code&gt;100&lt;/code&gt; - that is the max length of the stream we want. In the case of MINID, it would be the smallest ID to keep. And since we have strategy modifier &lt;code&gt;~&lt;/code&gt;, this is an approximate value.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;/ul&gt;

&lt;/li&gt;

&lt;/ul&gt;

&lt;p&gt;So we would read it like this: Trim stream if the length (MAXLEN) of the stream has reached approximately (~) 100.&lt;/p&gt;

&lt;h3&gt;
  
  
  Stream handler
&lt;/h3&gt;

&lt;p&gt;Since we will do more stuff with our stream data and we want to keep the basic functionality of Redis server calls on &lt;code&gt;RedisService&lt;/code&gt;, we will create a new service called &lt;code&gt;StreamHandlerService&lt;/code&gt; for additional handling of the streams.&lt;br&gt;
For now, we will just add "pass-through" method &lt;code&gt;addToStream&lt;/code&gt; that will call &lt;code&gt;RedisService&lt;/code&gt; &lt;code&gt;addToStream&lt;/code&gt; method.&lt;br&gt;
To generate it, we'll call: &lt;code&gt;nest g service redis/stream-handler&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// stream-handler.service.ts&lt;/span&gt;

&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;Injectable&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;@nestjs/common&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;RedisService&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;./redis.service&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="p"&gt;@&lt;/span&gt;&lt;span class="nd"&gt;Injectable&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;StreamHandlerService&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;

&lt;span class="nf"&gt;constructor&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="nx"&gt;redisService&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;RedisService&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;

  &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;addToStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;fieldsToStore&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Record&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kr"&gt;any&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redisService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;addToStream&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt; &lt;span class="nx"&gt;fieldsToStore&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;streamName&lt;/span&gt; &lt;span class="p"&gt;});&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And since we don't want to expose our &lt;code&gt;RedisService&lt;/code&gt; to other modules and will want them to interact with &lt;code&gt;StreamHandlerService&lt;/code&gt;, we will move the ping call functionality to it and remove export from module.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// stream-handler.service.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
  &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;ping&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redisService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ping&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To use it in other modules, we need to export it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis.module.ts&lt;/span&gt;

&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;Module&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;@nestjs/common&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;redisClientFactory&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;./redis-client.factory&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;RedisService&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;./redis.service&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;StreamHandlerService&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;./stream-handler.service&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="p"&gt;@&lt;/span&gt;&lt;span class="nd"&gt;Module&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
  &lt;span class="na"&gt;providers&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;redisClientFactory&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;RedisService&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;StreamHandlerService&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
  &lt;span class="na"&gt;exports&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;StreamHandlerService&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="c1"&gt;// Removed RedisService from exports&lt;/span&gt;
&lt;span class="p"&gt;})&lt;/span&gt;
&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;RedisModule&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And in &lt;code&gt;AppService&lt;/code&gt;, we are going to call ping from &lt;code&gt;StreamHandlerService&lt;/code&gt; instead:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// app.service.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
  &lt;span class="nf"&gt;constructor&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="nx"&gt;streamService&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;StreamHandlerService&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt; &lt;span class="c1"&gt;// changed from RedisService to StreamHandlerService&lt;/span&gt;

  &lt;span class="nf"&gt;redisPing&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;streamService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ping&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt; &lt;span class="c1"&gt;// changed here as well&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;blockquote&gt;
&lt;p&gt;In theory, other modules should not care whether it is Redis, RabbitMQ, Kafka, or even something else we are calling. . It should only know that it is a stream, and we get the specific data by calling methods made public on exported services. &lt;br&gt;
This way, we can easily change our implementation of streams without needing to change other parts of code - they become de-coupled.&lt;br&gt;
Since this is a project about Redis and Redis streams, we have called this module &lt;code&gt;RedisModule&lt;/code&gt;, but expanding on the previous taught, IRL it would make more sense to call it &lt;code&gt;StreamModule&lt;/code&gt; or &lt;code&gt;MessagingModule&lt;/code&gt;, or something else, depending on what these streams are going to be used for.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h3&gt;
  
  
  Populating Redis with messages
&lt;/h3&gt;

&lt;p&gt;Let's use our &lt;code&gt;addToStream&lt;/code&gt; method to populate our Redis.&lt;br&gt;
We could go the same route as with ping and expose it to our API endpoint, but this time, since we want to "simulate" data flow, we'll call this method in an interval so that we have an ongoing flow of data in our stream.&lt;br&gt;
Let's add it to our &lt;code&gt;AppService&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// app.service.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;

&lt;span class="p"&gt;@&lt;/span&gt;&lt;span class="nd"&gt;Injectable&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;AppService&lt;/span&gt; &lt;span class="k"&gt;implements&lt;/span&gt; &lt;span class="nx"&gt;OnModuleInit&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;OnModuleDestroy&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="nx"&gt;interval&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;NodeJS&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;Timeout&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;

  &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="nf"&gt;populateStream&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;interval&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;setInterval&lt;/span&gt;&lt;span class="p"&gt;(()&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;streamService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;addToStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
          &lt;span class="na"&gt;hello&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;world&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="na"&gt;date&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Date&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
          &lt;span class="na"&gt;nestedObj&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;num&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Date&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;%&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
        &lt;span class="p"&gt;},&lt;/span&gt;
        &lt;span class="nx"&gt;EXAMPLE_STREAM_NAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;

  &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="nf"&gt;onModuleInit&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;populateStream&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;

  &lt;span class="nf"&gt;onModuleDestroy&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nf"&gt;clearInterval&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;interval&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As you can see, we have created a &lt;code&gt;populateStream&lt;/code&gt; method that will add some dummy data into our stream.&lt;br&gt;
&lt;code&gt;EXAMPLE_STREAM_NAME&lt;/code&gt; is just a constant of our stream name we have defined in &lt;code&gt;constants.ts&lt;/code&gt; file and imported it here:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// constants.ts&lt;/span&gt;
&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;EXAMPLE_STREAM_NAME&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;example-stream&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We created an interval that we call on &lt;code&gt;onModuleInit&lt;/code&gt; lifestyle event, and we clear it in our &lt;code&gt;onModuleDestroy&lt;/code&gt; lifecycle event so that we don't leave dangling references.&lt;/p&gt;

&lt;p&gt;This will add a message to our Redis stream on every &lt;code&gt;1000&lt;/code&gt; ms.&lt;br&gt;
To verify that, let's now create a way to read from stream.&lt;/p&gt;
&lt;h2&gt;
  
  
  Reading from stream
&lt;/h2&gt;

&lt;p&gt;To read from Redis streams, we can use either &lt;code&gt;XREAD&lt;/code&gt; or &lt;code&gt;XRANGE&lt;/code&gt; (and &lt;code&gt;REVXRANGE&lt;/code&gt;) commands. Since the goal of this project is to read new messages, both commands could be used to achieve this goal. We are going to use &lt;code&gt;XREAD&lt;/code&gt; in this article.&lt;br&gt;
Let's add a method that wraps &lt;code&gt;XREAD&lt;/code&gt; command call and does some extra error handling for us:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis.service.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;

  &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="nf"&gt;readStream&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
    &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;blockMs&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;lastMessageId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="p"&gt;}:&lt;/span&gt; &lt;span class="nx"&gt;ReadStreamParams&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt; &lt;span class="nb"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;RedisStreamMessage&lt;/span&gt;&lt;span class="p"&gt;[]&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;xRead&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="nf"&gt;commandOptions&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt; &lt;span class="na"&gt;isolated&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt; &lt;span class="p"&gt;}),&lt;/span&gt; &lt;span class="c1"&gt;// uses new connection from pool not to block other redis calls&lt;/span&gt;
        &lt;span class="p"&gt;[&lt;/span&gt;
          &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="na"&gt;key&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="na"&gt;id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;lastMessageId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="p"&gt;},&lt;/span&gt;
        &lt;span class="p"&gt;],&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;BLOCK&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;blockMs&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;COUNT&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;count&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
      &lt;span class="p"&gt;);&lt;/span&gt;

      &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;messages&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;?.[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;];&lt;/span&gt; &lt;span class="c1"&gt;// returning first stream (since only 1 stream used)&lt;/span&gt;

      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;messages&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt; &lt;span class="k"&gt;instanceof&lt;/span&gt; &lt;span class="nx"&gt;ClientClosedError&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt; ...RECONNECTING`&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;connectToRedis&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="s2"&gt;`Failed to xRead from Redis Stream: &lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="p"&gt;);&lt;/span&gt;
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's break down this code:&lt;/p&gt;

&lt;p&gt;First, notice that we have added a parameters interface &lt;code&gt;ReadStreamParams&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// interfaces.ts&lt;/span&gt;

&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kr"&gt;interface&lt;/span&gt; &lt;span class="nx"&gt;ReadStreamParams&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="nl"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="nl"&gt;blockMs&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;number&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="nl"&gt;lastMessageId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;as well as we have defined our response message &lt;code&gt;RedisStreamMessage&lt;/code&gt;. Since &lt;code&gt;node-redis&lt;/code&gt; is scarce with its type exports, we need to extract it on our own:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;

&lt;span class="c1"&gt;// redis-client.types&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;

&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kd"&gt;type&lt;/span&gt; &lt;span class="nx"&gt;RedisStreamMessage&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;Awaited&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;
  &lt;span class="nb"&gt;ReturnType&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;RedisClient&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;xRead&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="kr"&gt;number&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;messages&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="kr"&gt;number&lt;/span&gt;&lt;span class="p"&gt;];&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In the &lt;code&gt;try&lt;/code&gt; block, we have our command call:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
      &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;xRead&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="nf"&gt;commandOptions&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt; &lt;span class="na"&gt;isolated&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt; &lt;span class="p"&gt;}),&lt;/span&gt; &lt;span class="c1"&gt;// uses new connection from pool not to block other redis calls&lt;/span&gt;
        &lt;span class="p"&gt;[&lt;/span&gt;
          &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="na"&gt;key&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="na"&gt;id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;lastMessageId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="p"&gt;},&lt;/span&gt;
        &lt;span class="p"&gt;],&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;BLOCK&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;blockMs&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;COUNT&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;count&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
      &lt;span class="p"&gt;);&lt;/span&gt;

      &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;messages&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;?.[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;];&lt;/span&gt; &lt;span class="c1"&gt;// returning first stream (since only 1 stream used)&lt;/span&gt;

      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;messages&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We first use the call command option to run it in isolation &lt;code&gt;commandOptions({ isolated: true })&lt;/code&gt;.&lt;br&gt;
This is needed because we are going to &lt;code&gt;BLOCK&lt;/code&gt; the connection. This means that once this command call takes a connection to the Redis server if the results won't be on the Redis at the time the call arrives, it will wait for the specified amount of time (&lt;code&gt;BLOCK&lt;/code&gt; parameter) and, if during that period, data arrives that matches the request, it will be returned immediately, or it will wait till the end of the specified time and return nothing.&lt;br&gt;
During this time, the connection is used-up, and no other commands can be called. Essentially, our command &lt;strong&gt;BLOCKS&lt;/strong&gt; the connection.&lt;/p&gt;

&lt;p&gt;If we would like to create multiple calls, we would quickly run into delays and performance problems. For this, we can use &lt;code&gt;isolated execution&lt;/code&gt;. This means that &lt;code&gt;node-redis&lt;/code&gt; will create multiple connections using &lt;a href="https://github.com/coopernurse/node-pool" rel="noopener noreferrer"&gt;Generic resource pool&lt;/a&gt; under the hood, so we'll have multiple connections to use in parallel.&lt;/p&gt;

&lt;p&gt;Next, we say which streams we want to read and starting from what ID. We need to pass it in an array, since we could call multiple streams in a single call. We'll handling of multiple streams in one request out of the scope of this article:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;        &lt;span class="p"&gt;[&lt;/span&gt;
          &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="na"&gt;key&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="na"&gt;id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;lastMessageId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="p"&gt;},&lt;/span&gt;
        &lt;span class="p"&gt;],&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Next we pass &lt;em&gt;optional&lt;/em&gt; parameters:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;        &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;BLOCK&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;blockMs&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;COUNT&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;count&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;BLOCK - is blocking time in milliseconds. *&lt;em&gt;If &lt;code&gt;0&lt;/code&gt; is specified, it will block indefinitely until at least 1 message can be returned. *&lt;/em&gt;
&lt;/li&gt;
&lt;li&gt;COUNT - this is a &lt;strong&gt;maximum&lt;/strong&gt; count we will get returned. If there are fewer messages available on Redis server, it will &lt;strong&gt;not&lt;/strong&gt; wait for the count to match but return that many messages how many there are.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Next we unravel the data:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;      &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;messages&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;?.[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;];&lt;/span&gt;

      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;messages&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Since we called only 1 stream, the &lt;code&gt;response&lt;/code&gt; array will always be of length 1.&lt;br&gt;
All messages returned in an &lt;code&gt;messages&lt;/code&gt; object.&lt;br&gt;
And finally, we return either the stream messages array or &lt;code&gt;null&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Here is also the place we can handle some initial errors:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// --- snip---&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt; &lt;span class="k"&gt;instanceof&lt;/span&gt; &lt;span class="nx"&gt;ClientClosedError&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt; ...RECONNECTING`&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;connectToRedis&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="s2"&gt;`Failed to xRead from Redis Stream: &lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="p"&gt;);&lt;/span&gt;
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In case of an error, we will return &lt;code&gt;null&lt;/code&gt; value.&lt;br&gt;
If we have &lt;code&gt;ClientClosedError&lt;/code&gt;, there is no use tying to fetch anything further, so we try to reconnect to the Redis.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// --- snip---&lt;/span&gt;
  &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="nf"&gt;connectToRedis&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="c1"&gt;// Try to reconnect only if the connection socket is closed. Else let it be handled by reconnect strategy.&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;isOpen&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="s2"&gt;`[&lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;name&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;] &lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;error&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We check for &lt;code&gt;isOpen&lt;/code&gt; to see if the socket is open. If it is, we will trust reconnect strategy to do its job and reconnect to the client. If it's not, then we are trying to do it ourselves.&lt;/p&gt;

&lt;p&gt;And that's how we can fetch a single message from a stream. Let's expand on this and add the ability to read multiple messages.&lt;/p&gt;

&lt;h3&gt;
  
  
  Exposing read to our API
&lt;/h3&gt;

&lt;p&gt;Exposing the Redis stream to our API is similar to what we did with &lt;code&gt;ping&lt;/code&gt;.&lt;br&gt;
Let's add another method to our &lt;code&gt;StreamHandlerService&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;//stream-handler.service.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --- snip --&lt;/span&gt;
  &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;readFromStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redisService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;readStream&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
      &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="na"&gt;blockMs&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c1"&gt;// 0 = infinite blocking until at least one message is fetched, or timeout happens&lt;/span&gt;
      &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c1"&gt;// max how many messages to fetch at a time&lt;/span&gt;
      &lt;span class="na"&gt;lastMessageId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;$&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;});&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Simple method where we pass &lt;code&gt;streamName&lt;/code&gt; of the stream we want to read and &lt;code&gt;count&lt;/code&gt; of messages we would like to receive. As mentioned before, &lt;code&gt;COUNT&lt;/code&gt; actually is "max count" since, if there are less or no messages on stream, Redis will return from 1 to &lt;code&gt;count&lt;/code&gt; number of messages.&lt;br&gt;
We are using special ID &lt;code&gt;$&lt;/code&gt; - this represents "anything that comes after the &lt;code&gt;XREAD&lt;/code&gt; request.&lt;/p&gt;

&lt;p&gt;Now let's handle reading a single message:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// app.service.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
  &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;getSingleNewMessage&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;streamService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;readFromStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;EXAMPLE_STREAM_NAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Again, using &lt;code&gt;EXAMPLE_STREAM_NAME&lt;/code&gt; - same stream that we populate, and setting &lt;code&gt;count&lt;/code&gt; to 1.&lt;/p&gt;

&lt;p&gt;And finally, let's add a new endpoint to our controller:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// app.controller.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
  &lt;span class="p"&gt;@&lt;/span&gt;&lt;span class="nd"&gt;Get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;message&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="nf"&gt;getMessage&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;appService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getSingleNewMessage&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now calling the endpoint we should see single message generated by our &lt;code&gt;populateStream&lt;/code&gt; method:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7l88hcswlp7v209hya16.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7l88hcswlp7v209hya16.png" alt="Stream message displayed in browser" width="569" height="411"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Handling results as a stream
&lt;/h2&gt;

&lt;p&gt;Getting a single message is relatively straight forward. &lt;br&gt;
But how do we get multiple messages? What if we want to get a specific number of messages, e.g., precisely 3?&lt;br&gt;
At first, it might seem simple - just set a &lt;code&gt;count&lt;/code&gt; to the number of messages you want. But remember - &lt;code&gt;COUNT&lt;/code&gt; does not guarantee that we will get the exact number of messages. It's more of a "max count". So you might be tempted to call &lt;code&gt;XREAD&lt;/code&gt; multiple times. But, since we are passing &lt;code&gt;$&lt;/code&gt; as our last message ID, we might miss some messages that come in between our calls.&lt;br&gt;
That means that we need to remember our last id.&lt;/p&gt;

&lt;p&gt;And what if we want to read from a stream continuously and indefinitely?&lt;/p&gt;

&lt;p&gt;This all adds additional complexity that we need to handle.&lt;/p&gt;
&lt;h3&gt;
  
  
  Generators
&lt;/h3&gt;

&lt;p&gt;Generators are a valuable tool to have in our toolbox. Here they come in handy when we want to fetch the data from some external resource (Redis) lazily.&lt;br&gt;
I will not focus on the "what and how" of generators.&lt;br&gt;
If you haven't used them before, I encourage you to check &lt;a href="https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Generator" rel="noopener noreferrer"&gt;JavaScript Generators reference&lt;/a&gt; and  Dr. Axel Rauschmayer's book's "JavaScript for impatient programmers" on &lt;a href="https://exploringjs.com/impatient-js/ch_async-iteration.html#async-generators" rel="noopener noreferrer"&gt;asynchronous generators&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;The main feature we can do is create an infinite loop that we can stop and restart whenever we want and need.&lt;/p&gt;

&lt;p&gt;Let's add a new method to our &lt;code&gt;StreamHandlerService&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// stream-handler.service.ts&lt;/span&gt;

&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;StreamHandlerService&lt;/span&gt; &lt;span class="k"&gt;implements&lt;/span&gt; &lt;span class="nx"&gt;OnModuleDestroy&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;

  &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="nx"&gt;isAlive&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

  &lt;span class="nf"&gt;onModuleDestroy&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;isAlive&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
  &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="nf"&gt;getStreamMessageGenerator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;number&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="p"&gt;):&lt;/span&gt; &lt;span class="nx"&gt;AsyncRedisStreamGenerator&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="c1"&gt;// Start with latest data&lt;/span&gt;
    &lt;span class="kd"&gt;let&lt;/span&gt; &lt;span class="nx"&gt;lastMessageId&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;$&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;while &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;isAlive&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redisService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;readStream&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
        &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="na"&gt;blockMs&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c1"&gt;// 0 = infinite blocking until at least one message is fetched, or timeout happens&lt;/span&gt;
        &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c1"&gt;// max how many messages to fetch at a time&lt;/span&gt;
        &lt;span class="nx"&gt;lastMessageId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="p"&gt;});&lt;/span&gt;

      &lt;span class="c1"&gt;// If no messages returned, continue to next iteration without yielding&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="nx"&gt;response&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;length&lt;/span&gt; &lt;span class="o"&gt;===&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;continue&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="c1"&gt;// Update last message id to be the last message returned from redis&lt;/span&gt;
      &lt;span class="nx"&gt;lastMessageId&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;length&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nx"&gt;id&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="k"&gt;for &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;message&lt;/span&gt; &lt;span class="k"&gt;of&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's break down what we have written:&lt;/p&gt;

&lt;p&gt;First, you will notice that we have added a new boolean &lt;code&gt;isAlive&lt;/code&gt; that we will initially set to &lt;code&gt;true&lt;/code&gt;, and only during the destruction of the module we will set it to &lt;code&gt;false&lt;/code&gt;. This will serve as a while-true loop that we can exit from when the application ends.&lt;/p&gt;

&lt;p&gt;Also, for our convenience, we have created a new type &lt;code&gt;AsyncRedisStreamGenerator&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis-client.type.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kd"&gt;type&lt;/span&gt; &lt;span class="nx"&gt;AsyncRedisStreamGenerator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;AsyncGenerator&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;
  &lt;span class="nx"&gt;RedisStreamMessage&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;void&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nx"&gt;unknown&lt;/span&gt;
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is an async generator that will &lt;code&gt;yield&lt;/code&gt; or produce &lt;code&gt;RedisStreamMessage&lt;/code&gt; values, will return nothing (&lt;code&gt;void&lt;/code&gt;) when it returns and will accept &lt;code&gt;unknown&lt;/code&gt; messages, since we won't pass anything to &lt;code&gt;next&lt;/code&gt; method of our generator.&lt;/p&gt;

&lt;p&gt;We are going to call &lt;code&gt;readStream&lt;/code&gt; method in an infinite loop:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;    &lt;span class="kd"&gt;let&lt;/span&gt; &lt;span class="nx"&gt;lastMessageId&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;$&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;while &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;isAlive&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redisService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;readStream&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
        &lt;span class="nx"&gt;streamName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="na"&gt;blockMs&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c1"&gt;// 0 = infinite blocking until at least one message is fetched, or timeout happens&lt;/span&gt;
        &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c1"&gt;// max how many messages to fetch at a time&lt;/span&gt;
        &lt;span class="nx"&gt;lastMessageId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="p"&gt;});&lt;/span&gt;
&lt;span class="c1"&gt;// -- snip --&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As in our previous example, we will start to read data only when the generator is created and calls the stream by using &lt;code&gt;$&lt;/code&gt; as &lt;code&gt;lastMessageId&lt;/code&gt;.&lt;br&gt;
The rest is also the same as in the previous example, except we are calling it in a loop.&lt;/p&gt;

&lt;p&gt;Next, we will handle the case where we get an empty response. We will simply go into the next iteration without yielding any results&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;   &lt;span class="c1"&gt;// If no messages returned, continue to next iteration without yielding&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="nx"&gt;response&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;length&lt;/span&gt; &lt;span class="o"&gt;===&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;continue&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If we have some responses, we are going to handle them:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;      &lt;span class="c1"&gt;// Update last message id to be the last message returned from redis&lt;/span&gt;
      &lt;span class="nx"&gt;lastMessageId&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;length&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nx"&gt;id&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="k"&gt;for &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;message&lt;/span&gt; &lt;span class="k"&gt;of&lt;/span&gt; &lt;span class="nx"&gt;response&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To avoid gaps in messages and read all messages from the time we started this generator, we will update &lt;code&gt;lastMessageId&lt;/code&gt; variable with the last message we received.&lt;/p&gt;

&lt;p&gt;Since we have &lt;code&gt;count&lt;/code&gt;, we will get an array of sizes ranging from 1 to &lt;code&gt;count&lt;/code&gt; with messages. However, this generator produces only a single message at a time. So we will iterate through the array and &lt;code&gt;yield&lt;/code&gt; only one message at once.&lt;/p&gt;

&lt;p&gt;That is for the generator part. Let's see how we can use it to solve our previous questions:&lt;/p&gt;

&lt;h3&gt;
  
  
  Reading multiple results
&lt;/h3&gt;

&lt;p&gt;Now let's use our generator to generate multiple results by creating a new method in &lt;code&gt;AppService&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// app.service.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
  &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="nf"&gt;getMultipleNewMessages&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;number&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;generator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;streamService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getStreamMessageGenerator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="nx"&gt;EXAMPLE_STREAM_NAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;messages&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Record&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;[]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[];&lt;/span&gt;
    &lt;span class="kd"&gt;let&lt;/span&gt; &lt;span class="nx"&gt;counter&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="k"&gt;await &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;messageObj&lt;/span&gt; &lt;span class="k"&gt;of&lt;/span&gt; &lt;span class="nx"&gt;generator&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="nx"&gt;messages&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;push&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;parseMessage&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;messageObj&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;));&lt;/span&gt;
      &lt;span class="nx"&gt;counter&lt;/span&gt;&lt;span class="o"&gt;++&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;counter&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;break&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;messages&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;First, we create a generator:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;generator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;streamService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getStreamMessageGenerator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="nx"&gt;EXAMPLE_STREAM_NAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We are passing it our count for "optimistic" fetch. We will try to get everything in a single Redis call. But if that does not happen, we will call Redis as often as needed.&lt;br&gt;
We create an array to collect our messages:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;messages&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Record&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;[]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[];&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;and define a &lt;code&gt;counter&lt;/code&gt; where we will count how many we have received.&lt;/p&gt;

&lt;p&gt;One convenient feature of generators is that they are iterable as they implement &lt;code&gt;@@iterator&lt;/code&gt; method. So we can call &lt;code&gt;for..of&lt;/code&gt; on them. In this case, since the generator is async it's &lt;code&gt;for await .. of&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="k"&gt;await &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;messageObj&lt;/span&gt; &lt;span class="k"&gt;of&lt;/span&gt; &lt;span class="nx"&gt;generator&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="nx"&gt;messages&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;push&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;parseMessage&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;messageObj&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;));&lt;/span&gt;
      &lt;span class="nx"&gt;counter&lt;/span&gt;&lt;span class="o"&gt;++&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;counter&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="nx"&gt;count&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;break&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;messages&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once the count is reached, we simply &lt;code&gt;break&lt;/code&gt; out of the loop and then return our collected messages.&lt;/p&gt;

&lt;p&gt;Since Redis stores all values as strings, we also have created a generic parser method:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;//app.service.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
  &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="nf"&gt;parseMessage&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Record&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nb"&gt;Object&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;entries&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;reduce&lt;/span&gt;&lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="nx"&gt;acc&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="nx"&gt;acc&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;key&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;JSON&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;parse&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="k"&gt;catch&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;e&lt;/span&gt;&lt;span class="p"&gt;){&lt;/span&gt;
        &lt;span class="nx"&gt;acc&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;key&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nx"&gt;value&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;acc&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="p"&gt;{});&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To check that we indeed are getting specified amount of messages, we'll create a new endpoint &lt;code&gt;/messages&lt;/code&gt; where we will fetch 3 messages :&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// app.controller.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
  &lt;span class="p"&gt;@&lt;/span&gt;&lt;span class="nd"&gt;Get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;messages&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="nf"&gt;getMessages&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;appService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getMultipleNewMessages&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Again, when we call our newly created endpoint, we indeed, have 3 messages fetched:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fr59uy0n57b8xhhhsfi97.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fr59uy0n57b8xhhhsfi97.png" alt="Multiple stream messages displayed in browser" width="454" height="484"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Continuous reading of the stream
&lt;/h3&gt;

&lt;p&gt;The biggest power of a Redis stream comes from using it as a continuous data stream. To do that, we can re-use our generator:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// app.service.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
  &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="nx"&gt;isAlive&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
  &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="nf"&gt;onModuleInit&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
    &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;continuousReadMessages&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="nf"&gt;onModuleDestroy&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;

    &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;isAlive&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
  &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="nf"&gt;continuousReadMessages&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;generator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;streamService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getStreamMessageGenerator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="nx"&gt;EXAMPLE_STREAM_NAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="k"&gt;await &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;messageObj&lt;/span&gt; &lt;span class="k"&gt;of&lt;/span&gt; &lt;span class="nx"&gt;generator&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="s2"&gt;`Got message with ID: &lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;messageObj&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;id&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;JSON&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;stringify&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;parseMessage&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;messageObj&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="kc"&gt;undefined&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
      &lt;span class="p"&gt;);&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;isAlive&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;break&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Same as with multiple message fetch, we start by creating a generator:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;generator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;streamService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getStreamMessageGenerator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="nx"&gt;EXAMPLE_STREAM_NAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We give it an arbitrary number for &lt;code&gt;count&lt;/code&gt; since we want to try to fetch multiple responses in one Redis call, if possible.&lt;/p&gt;

&lt;p&gt;And then we just do whatever we need to do with the data in the loop. So here it's simply logging the message.&lt;/p&gt;

&lt;p&gt;And breaking the loop once the module is destroyed, we don't have any dangling references and can stop our loop.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="k"&gt;await &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;messageObj&lt;/span&gt; &lt;span class="k"&gt;of&lt;/span&gt; &lt;span class="nx"&gt;generator&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="nx"&gt;console&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="s2"&gt;`Got message with ID: &lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;messageObj&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;id&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;JSON&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;stringify&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;parseMessage&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;messageObj&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="kc"&gt;undefined&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
      &lt;span class="p"&gt;);&lt;/span&gt;
      &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;isAlive&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;break&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And we call this method once on our module init, the same as we did with populating stream:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;
  &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="nf"&gt;onModuleInit&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;populateStream&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
    &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;continuousReadMessages&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And in our console, we should see that we are fetching the messages continuously&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;...
app      | Got message with ID: 1675377159049-0 &lt;span class="o"&gt;{&lt;/span&gt;
app      |   &lt;span class="s2"&gt;"hello"&lt;/span&gt;: &lt;span class="s2"&gt;"world"&lt;/span&gt;,
app      |   &lt;span class="s2"&gt;"date"&lt;/span&gt;: &lt;span class="s2"&gt;"2023-02-02T22:32:39.048Z"&lt;/span&gt;,
app      |   &lt;span class="s2"&gt;"nestedObj"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
app      |     &lt;span class="s2"&gt;"num"&lt;/span&gt;: 48
app      |   &lt;span class="o"&gt;}&lt;/span&gt;
app      | &lt;span class="o"&gt;}&lt;/span&gt;
app      | Got message with ID: 1675377160050-0 &lt;span class="o"&gt;{&lt;/span&gt;
app      |   &lt;span class="s2"&gt;"hello"&lt;/span&gt;: &lt;span class="s2"&gt;"world"&lt;/span&gt;,
app      |   &lt;span class="s2"&gt;"date"&lt;/span&gt;: &lt;span class="s2"&gt;"2023-02-02T22:32:40.050Z"&lt;/span&gt;,
app      |   &lt;span class="s2"&gt;"nestedObj"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
app      |     &lt;span class="s2"&gt;"num"&lt;/span&gt;: 50
app      |   &lt;span class="o"&gt;}&lt;/span&gt;
app      | &lt;span class="o"&gt;}&lt;/span&gt;
...
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Refactor for a single message
&lt;/h4&gt;

&lt;p&gt;Since we are using the generator to handle stream messages, let's refactor our single message fetch example.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// app.service.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
  &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="nf"&gt;getSingleNewMessage&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;generator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;streamService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getStreamMessageGenerator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="nx"&gt;EXAMPLE_STREAM_NAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;messageObj&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;generator&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;next&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
    &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="nx"&gt;messageObj&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;done&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="nx"&gt;messageObj&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;parseMessage&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;messageObj&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Same as for all examples, we are creating a generator on our &lt;code&gt;EXAMPLE_STREAM_NAME&lt;/code&gt; stream, this time with &lt;code&gt;count&lt;/code&gt; 1.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;generator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;streamService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getStreamMessageGenerator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="nx"&gt;EXAMPLE_STREAM_NAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Since we need only a single message, we do not need to use a loop. Instead, we call &lt;code&gt;next&lt;/code&gt; method manually:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;messageObj&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;generator&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;next&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;and then handle the response - if the generator has not returned/ended - it's not &lt;code&gt;done&lt;/code&gt; and we have a message t return, we parse message and return it to the client:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;    &lt;span class="k"&gt;if &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="nx"&gt;messageObj&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;done&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="nx"&gt;messageObj&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;parseMessage&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;messageObj&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Everything else stays the same for the client, and the same result is returned.&lt;/p&gt;

&lt;p&gt;This is the end of part 2  of the 3-part series.&lt;br&gt;
We built on what we created in part 1. As a result, you should have an application that can fetch single or multiple responses and continuously fetch new results from the Redis stream.&lt;/p&gt;

&lt;p&gt;In &lt;a href="https://dev.to/magickriss/redis-streams-nestjs-part-3-consumer-groups-3am7"&gt;part 3&lt;/a&gt;, we are going to look at how we can use consumer groups to divide messages to multiple consumers so that each one gets a unique set of messages (not like we have now, where if we connected more clients, all clients would get getting all of the messages).&lt;/p&gt;

</description>
      <category>javascript</category>
      <category>react</category>
      <category>programming</category>
      <category>softwaredevelopment</category>
    </item>
    <item>
      <title>Redis Streams + NestJS: Part 1 | Setup</title>
      <dc:creator>Krisjanis Kallings</dc:creator>
      <pubDate>Wed, 22 Feb 2023 16:58:39 +0000</pubDate>
      <link>https://dev.to/magickriss/redis-streams-nestjs-part-1-setup-1jcj</link>
      <guid>https://dev.to/magickriss/redis-streams-nestjs-part-1-setup-1jcj</guid>
      <description>&lt;h2&gt;
  
  
  Intro
&lt;/h2&gt;

&lt;p&gt;This is part 1 of a 3-part series, where we will explore how to use Redis streams with NestJS.&lt;/p&gt;

&lt;p&gt;It is structured in 3 parts:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Part 1 - Setting up the NestJS application and connecting to Redis&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://dev.to/magickriss/redis-streams-nestjs-part-2-reading-from-stream-1643"&gt;Part 2&lt;/a&gt; -Populating Redis streams and reading from in &lt;strong&gt;fan-out mode&lt;/strong&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://dev.to/magickriss/redis-streams-nestjs-part-3-consumer-groups-3am7"&gt;Part 3&lt;/a&gt; - Using &lt;strong&gt;consumer groups&lt;/strong&gt; to handle one stream from multiple actors in a way that one message is sent to and processed only by a single actor (consumer)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;By the end of this series, you will have the knowledge and tools necessary to create your own NestJS app that utilizes Redis streams to handle real-time data.&lt;/p&gt;

&lt;p&gt;Full code is available on &lt;a href="https://github.com/MagicKriss/Redis-Streams-NestJS-Example" rel="noopener noreferrer"&gt;the github&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Assumed prior knowledge
&lt;/h2&gt;

&lt;p&gt;Before diving into this post, it is assumed that you have a basic understanding of &lt;strong&gt;NestJS&lt;/strong&gt; and &lt;strong&gt;Redis&lt;/strong&gt;.&lt;br&gt;
Familiarity with &lt;strong&gt;JavaScript generators&lt;/strong&gt; is also recommended, as they will be used in the examples to listen to the Redis stream continuously.&lt;br&gt;
We will also use &lt;strong&gt;Docker&lt;/strong&gt; and &lt;strong&gt;Docker Compose&lt;/strong&gt; to create and run our Redis server and application.&lt;br&gt;
NestJS is a typescript framework, so you should be familiar with it too.&lt;br&gt;
To familiarize yourself with these things, I recommend:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://docs.nestjs.com/" rel="noopener noreferrer"&gt;NestJS docs&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://www.typescriptlang.org/" rel="noopener noreferrer"&gt;TypeScript&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;Redis:

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://redis.io/docs/data-types/streams/" rel="noopener noreferrer"&gt;Redis streams intro&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://redis.io/docs/data-types/streams-tutorial/" rel="noopener noreferrer"&gt;The tutorial&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Generators:

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Generator" rel="noopener noreferrer"&gt;JavaScript Generators reference&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;Dr. Axel Rauschmayer's book's "JavaScript for impatient programmers" sections on &lt;a href="https://exploringjs.com/impatient-js/ch_sync-generators.html" rel="noopener noreferrer"&gt;synchronous&lt;/a&gt; and &lt;a href="https://exploringjs.com/impatient-js/ch_async-iteration.html#async-generators" rel="noopener noreferrer"&gt;asynchronous &lt;/a&gt; generators (available free online)&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Docker:

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://www.docker.com/get-started/" rel="noopener noreferrer"&gt;Docker&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://docs.docker.com/get-started/08_using_compose/" rel="noopener noreferrer"&gt;Docker Compose&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ul&gt;
&lt;h3&gt;
  
  
  Side note
&lt;/h3&gt;

&lt;blockquote&gt;
&lt;p&gt;It's also important to note that while this post explicitly covers the integration of Redis streams with NestJS, the concepts and techniques discussed here can also be applied to other frameworks and languages. Redis streams is a versatile tool that can be used in a wide range of applications and architectures. Therefore, even if you are not using NestJS, the general concepts and approaches outlined in this post can still apply to your projects and stack.&lt;/p&gt;
&lt;/blockquote&gt;
&lt;h2&gt;
  
  
  Setup
&lt;/h2&gt;
&lt;h3&gt;
  
  
  Creating a new NestJS application
&lt;/h3&gt;

&lt;p&gt;To get started, we will need to create a new NestJS application. This can be done using the NestJS CLI, which can be installed via npm by running &lt;code&gt;npm i -g @nestjs/cli&lt;/code&gt;. Once installed, you can create a new application by running &lt;code&gt;nest new redis-streams&lt;/code&gt;, where "redis-streams" is the name of your application.&lt;br&gt;
This will create a new directory with the same name, containing the basic structure of a NestJS application.&lt;/p&gt;
&lt;h3&gt;
  
  
  Docker setup
&lt;/h3&gt;

&lt;p&gt;To simplify things, we are going to use &lt;code&gt;docker-compose.yml&lt;/code&gt; file to include both our app and Redis server:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="c1"&gt;# docker-compose.yml&lt;/span&gt;

&lt;span class="na"&gt;version&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;3'&lt;/span&gt;

&lt;span class="na"&gt;services&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;app&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;app&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;node:18&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;HTTPS_METHOD&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;noredirect&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;8081:3000&lt;/span&gt;
    &lt;span class="na"&gt;volumes&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./:/usr/src/app/&lt;/span&gt;
    &lt;span class="na"&gt;working_dir&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;/usr/src/app&lt;/span&gt;
    &lt;span class="na"&gt;command&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;npm run start:dev&lt;/span&gt;

  &lt;span class="na"&gt;redis&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;redis&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;redis:7&lt;/span&gt;
    &lt;span class="na"&gt;restart&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;always&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now you can run &lt;code&gt;docker-compose up -d&lt;/code&gt; to build and run your services. To see console output, use &lt;code&gt;docker-compose logs&lt;/code&gt;&lt;br&gt;
You should see the following message:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt; LOG [NestApplication] Nest application successfully started +7ms
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Connecting to Redis
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Setting up &lt;code&gt;node-redis&lt;/code&gt; client library
&lt;/h3&gt;

&lt;p&gt;To call Redis from we are going to use the official NodeJS Redis client library &lt;a href="https://github.com/redis/node-redis" rel="noopener noreferrer"&gt;node-redis&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ npm i redis
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;There are also other libraries, e.g. &lt;code&gt;ioredis&lt;/code&gt; is noteworthy alternative. You can see the list of clients &lt;a href="https://redis.io/resources/clients/#nodejs" rel="noopener noreferrer"&gt;on the Redis website&lt;/a&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  Redis module
&lt;/h4&gt;

&lt;p&gt;Finally, we can start working with Redis from our application.&lt;br&gt;
First, will create a new module for Redis-related services.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ nest g module redis
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You should see this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis.module.ts&lt;/span&gt;

&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;Module&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;@nestjs/common&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="p"&gt;@&lt;/span&gt;&lt;span class="nd"&gt;Module&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
  &lt;span class="na"&gt;providers&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[],&lt;/span&gt;
  &lt;span class="na"&gt;exports&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[],&lt;/span&gt;
&lt;span class="p"&gt;})&lt;/span&gt;
&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;RedisModule&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  RedisClient factory
&lt;/h4&gt;

&lt;p&gt;To use &lt;code&gt;RedisClient&lt;/code&gt; from &lt;code&gt;node-redis&lt;/code&gt; library we are going to create a factory provider for it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis-client.factory.ts&lt;/span&gt;

&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;FactoryProvider&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;@nestjs/common&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;createClient&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;redis&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nx"&gt;RedisClient&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;REDIS_CLIENT&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;./redis-client.type&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;redisClientFactory&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;FactoryProvider&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nb"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;RedisClient&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="na"&gt;provide&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;REDIS_CLIENT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="na"&gt;useFactory&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;async &lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;createClient&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt; &lt;span class="na"&gt;url&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;redis://redis:6379/0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;});&lt;/span&gt;
    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;client&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="p"&gt;},&lt;/span&gt;
&lt;span class="p"&gt;};&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's break it down:&lt;br&gt;
We create a &lt;code&gt;FactoryProvider&lt;/code&gt; that will call async function provided in &lt;code&gt;useFactory&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis-client.factory.ts&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
  &lt;span class="nx"&gt;useFactory&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;async &lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;createClient&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt; &lt;span class="na"&gt;url&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;redis://redis:6379/0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;});&lt;/span&gt;
    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nx"&gt;client&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="p"&gt;},&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;We call &lt;code&gt;createClient&lt;/code&gt; function from &lt;code&gt;redis&lt;/code&gt; library, and pass it the URL consisting of &lt;code&gt;{protocol}://{host}:{port}/{database}&lt;/code&gt;, where:

&lt;ul&gt;
&lt;li&gt;protocol= &lt;code&gt;redis&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;host = &lt;code&gt;redis&lt;/code&gt; - this is specified in &lt;code&gt;docker-compose.yml&lt;/code&gt; with &lt;code&gt;container_host: redis&lt;/code&gt;. Usually, you would create an environment variable with your Redis instance IP and use it here.&lt;/li&gt;
&lt;li&gt;port = &lt;code&gt;6379&lt;/code&gt; - default Redis port&lt;/li&gt;
&lt;li&gt;database = &lt;code&gt;0&lt;/code&gt; default database&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;We connect to the Redis server &lt;code&gt;await client.connect();&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Return the created &amp;amp; connected client.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;You may have noticed that we did not provide an instance of &lt;code&gt;RedisClient&lt;/code&gt; type but &lt;code&gt;REDIS_CLIENT&lt;/code&gt;, which is our injection token. Also, &lt;code&gt;RedisClient&lt;/code&gt; is our custom type, not &lt;code&gt;redis&lt;/code&gt;.&lt;br&gt;
This is due to &lt;code&gt;node-redis&lt;/code&gt; on v4 not exporting the &lt;code&gt;RedisClient&lt;/code&gt; type, so we need to create our own in &lt;code&gt;/redis-client.type.ts&lt;/code&gt; file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kd"&gt;type&lt;/span&gt; &lt;span class="nx"&gt;RedisClient&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;ReturnType&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="k"&gt;typeof&lt;/span&gt; &lt;span class="nx"&gt;createClient&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;REDIS_CLIENT&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Symbol&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;REDIS_CLIENT&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;All that is left is to add this to our module:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis.module.ts&lt;/span&gt;

&lt;span class="c1"&gt;//  --snip--&lt;/span&gt;
&lt;span class="p"&gt;@&lt;/span&gt;&lt;span class="nd"&gt;Module&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
  &lt;span class="na"&gt;providers&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;redisClientFactory&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
&lt;span class="p"&gt;})&lt;/span&gt;
&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;RedisModule&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Creating RedisService
&lt;/h4&gt;

&lt;p&gt;Next, let's add a &lt;code&gt;RedisService&lt;/code&gt; that will create a layer of abstraction from &lt;code&gt;RedisClient&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ nest g service redis
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In there we will inject our &lt;code&gt;RedisClient&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis.service.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;span class="p"&gt;@&lt;/span&gt;&lt;span class="nd"&gt;Injectable&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;RedisService&lt;/span&gt; &lt;span class="k"&gt;implements&lt;/span&gt; &lt;span class="nx"&gt;OnModuleDestroy&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;constructor&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="p"&gt;@&lt;/span&gt;&lt;span class="nd"&gt;Inject&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;REDIS_CLIENT&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="nx"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;RedisClient&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;

  &lt;span class="nf"&gt;onModuleDestroy&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;quit&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Not to leave hanging connections, we are going to close the connection to the Redis by calling &lt;code&gt;this.redis.quit()&lt;/code&gt; on &lt;code&gt;OnModuleDestroy&lt;/code&gt; lifecycle event.&lt;/p&gt;

&lt;h4&gt;
  
  
  Ping for Redis server
&lt;/h4&gt;

&lt;p&gt;To check that we have successfully connected to Redis, let's add an API endpoint that calls call Redis &lt;code&gt;ping&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis.service.ts&lt;/span&gt;

  &lt;span class="nf"&gt;ping&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redis&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ping&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's export RedisService so that we can use it in other modules:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// redis.module.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
  &lt;span class="nx"&gt;exports&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;RedisService&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now we will import it into &lt;code&gt;AppService&lt;/code&gt; and pass through our call to ping redis:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// app.service.ts&lt;/span&gt;

&lt;span class="p"&gt;@&lt;/span&gt;&lt;span class="nd"&gt;Injectable&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;AppService&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
  &lt;span class="nf"&gt;constructor&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="nx"&gt;redisService&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;RedisService&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;

  &lt;span class="nf"&gt;redisPing&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;redisService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ping&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="c1"&gt;// --snip--&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, we can add a new endpoint to AppController that will execute a ping to the Redis server and send the response to the user:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="c1"&gt;// app.controller.ts&lt;/span&gt;

&lt;span class="c1"&gt;// --snip--&lt;/span&gt;

&lt;span class="p"&gt;@&lt;/span&gt;&lt;span class="nd"&gt;Controller&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;export&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;AppController&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="nf"&gt;constructor&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="nx"&gt;appService&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;AppService&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;

  &lt;span class="p"&gt;@&lt;/span&gt;&lt;span class="nd"&gt;Get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;redis-ping&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="nf"&gt;redisPing&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;appService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;redisPing&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, with our app and Redis server running, we can open our &lt;code&gt;/redis-ping&lt;/code&gt; endpoint &lt;a href="http://localhost:8081/redis-ping" rel="noopener noreferrer"&gt;http://localhost:8081/redis-ping&lt;/a&gt;, and you should see the response:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fih3bt59mzkezab654ki2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fih3bt59mzkezab654ki2.png" alt="Pong message displayed in browser" width="512" height="271"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Congratulations! We have finished part 1 of the 3-part series and created a NestJS application with a working connection to our Redis server! In &lt;a href="https://dev.to/magickriss/redis-streams-nestjs-part-2-reading-from-stream-1643"&gt;part 2&lt;/a&gt; we are going to create, populate and read from Redis streams.&lt;/p&gt;

</description>
      <category>firebase</category>
      <category>wordpress</category>
      <category>webdev</category>
      <category>discuss</category>
    </item>
  </channel>
</rss>
