DEV Community

tonybui1812

Redis Streams - model data as a log or a stream of events

Redis Streams is a data type that models data as an append-only log of events. Each entry gets a unique, time-ordered ID, and consumer groups let several workers share the job of reading the log. That makes streams a good fit for event-driven architectures, message queuing, and real-time data processing. Here's an example of how you can use Redis Streams from Node.js to implement a simple message queue:

// Uses the callback-based node-redis v3 API; v4 and later are promise-based.
const redis = require('redis');
const { promisify } = require('util');

const redisClient = redis.createClient();
// A blocking XREADGROUP occupies its connection, so the consumer gets its own client
const consumerClient = redisClient.duplicate();

const xaddAsync = promisify(redisClient.xadd).bind(redisClient);
const xgroupAsync = promisify(redisClient.xgroup).bind(redisClient);
const xreadgroupAsync = promisify(consumerClient.xreadgroup).bind(consumerClient);
const xackAsync = promisify(consumerClient.xack).bind(consumerClient);

const STREAM_NAME = 'message_stream';
const CONSUMER_GROUP_NAME = 'message_consumers';

// Create the consumer group, and the stream with it if it doesn't exist (MKSTREAM)
async function createStreamIfNotExists() {
  try {
    await xgroupAsync('CREATE', STREAM_NAME, CONSUMER_GROUP_NAME, '$', 'MKSTREAM');
  } catch (err) {
    // Ignore the error if the consumer group already exists
    if (!err.message.includes('BUSYGROUP')) {
      throw err;
    }
  }
}

// Produce a message to the stream
async function produceMessage(message) {
  // '*' asks Redis to generate the entry ID
  const messageId = await xaddAsync(STREAM_NAME, '*', 'message', message);
  console.log(`Produced message with ID: ${messageId}`);
}

// Consume messages from the stream
async function consumeMessages() {
  const consumerName = 'consumer-1';

  while (true) {
    try {
      // Block until new messages arrive ('>' = entries never delivered to this group)
      const messages = await xreadgroupAsync(
        'GROUP',
        CONSUMER_GROUP_NAME,
        consumerName,
        'BLOCK',
        0,
        'COUNT',
        10,
        'STREAMS',
        STREAM_NAME,
        '>',
      );

      // The reply is [[streamName, [[messageId, fields], ...]], ...]
      for (const [, entries] of messages || []) {
        for (const [messageId, fields] of entries) {
          // fields is a flat array of names and values: ['message', '<text>']
          console.log(`Received message with ID ${messageId}: ${fields[1]}`);
          // Process the message here

          // Acknowledge the message so it leaves the group's pending entries list
          await xackAsync(STREAM_NAME, CONSUMER_GROUP_NAME, messageId);
        }
      }
    } catch (err) {
      console.error('Error consuming messages:', err);
      // Pause briefly so a persistent error doesn't turn into a busy loop
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
  }
}

(async () => {
  await createStreamIfNotExists();

  // Start consuming messages (runs in the background, never resolves)
  consumeMessages();

  // Produce some example messages
  for (let i = 1; i <= 10; i++) {
    await produceMessage(`Message ${i}`);
    await new Promise((resolve) => setTimeout(resolve, 1000)); // Delay between messages
  }
})();

In this example:

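  • We create a Redis stream named message_stream and a consumer group named message_consumers. The MKSTREAM option creates the stream if it doesn't exist yet, and the $ start ID means the group only receives messages added after the group was created.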

  • The produceMessage function is used to produce messages to the stream, and the consumeMessages function consumes messages from the stream.

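  • Messages are added to the stream with unique IDs (the * argument asks Redis to generate one), and consumers in the group read and acknowledge (ack) them. Acknowledging a message marks it as processed and removes it from the group's pending entries list. The entry itself stays in the stream until it is deleted or trimmed, for example with XDEL or XTRIM.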

  • The consumer runs continuously. Each XREADGROUP call blocks until new messages arrive (BLOCK 0 means no timeout) and returns up to 10 of them; the loop then processes and acknowledges each one.
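
The reply that XREADGROUP hands to the consumer loop is nested: one [streamName, entries] pair per stream, where each entry is an [id, fields] pair and fields is a flat list that alternates field names and values. The sketch below shows one way to turn that into plain objects. The helper name parseStreamReply and the sample IDs are made up for illustration, and it assumes the client returns the raw nested arrays, as the callback-style client in the example does.

```javascript
// Hypothetical helper: converts a raw XREADGROUP reply into plain objects.
// Assumed reply shape:
// [[streamName, [[entryId, [field1, value1, field2, value2, ...]], ...]], ...]
function parseStreamReply(reply) {
  const parsed = [];
  // A blocking read that times out yields null, so treat that as "no entries"
  for (const [stream, entries] of reply || []) {
    for (const [id, flatFields] of entries) {
      const fields = {};
      // Names sit at even indexes, each value at the odd index that follows
      for (let i = 0; i < flatFields.length; i += 2) {
        fields[flatFields[i]] = flatFields[i + 1];
      }
      parsed.push({ stream, id, fields });
    }
  }
  return parsed;
}

// Sample reply shaped like the one the consumer loop receives (IDs are made up)
const sampleReply = [
  ['message_stream', [
    ['1700000000000-0', ['message', 'Message 1']],
    ['1700000001000-0', ['message', 'Message 2']],
  ]],
];

for (const { id, fields } of parseStreamReply(sampleReply)) {
  console.log(`${id}: ${fields.message}`);
}
```

With a helper like this, the consumer can read fields.message by name instead of relying on the position of values in the array, which matters once entries carry more than one field.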

Redis Streams is a versatile feature that can be used for more complex use cases beyond simple message queues, including real-time event processing and log aggregation. You can customize and extend this example to fit your specific requirements.
