DEV Community

Ninthsun


Connecting BullMQ to AWS MemoryDB (TLS, hash slots)

BullMQ is a powerful message queue that is also easy to set up in NestJS, but it does not handle TLS connections or Redis Cluster very smoothly out of the box.

I ran into a few small problems when I tried to use AWS MemoryDB with BullMQ. Compared with a local Redis server, AWS MemoryDB differs in two important ways:

1) it uses TLS
2) it is configured as a cluster

I'll go through these issues step by step. If you aren't familiar with BullMQ in NestJS, you can read BullMQ's NestJS guide, which is already pretty good.

TLS Connection

The first big problem is that BullMQ's connection options do not offer an obvious way to use a TLS connection, or a connection URI starting with rediss://.

BullModule.forRoot({
  connection: {
    host: 'localhost',
    port: 6379,
    tls: {
      ...
    },
  },
  prefix: 'bull',
})

The host field only accepts the host part of the URI; it doesn't understand the protocol or any query string options. You can still provide a tls object, but then you end up dealing with certificates, which isn't really necessary for connecting to MemoryDB.
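To make that concrete, here is a minimal sketch of the pieces you would have to pull out of a rediss:// URI by hand to fill in separate fields. The splitRedisUri helper and the example hostname are made up for illustration; this is not part of BullMQ or ioredis, and it assumes the default Redis port 6379 when the URI has none.

```typescript
// Hypothetical helper, for illustration only: split a Redis URI into the
// pieces a field-by-field connection config would need.
interface RedisUriParts {
  host: string;
  port: number;
  useTls: boolean; // true when the scheme is rediss://
}

function splitRedisUri(uri: string): RedisUriParts {
  const url = new URL(uri);
  return {
    host: url.hostname,
    // Assumption: fall back to the default Redis port when none is given.
    port: url.port ? Number(url.port) : 6379,
    useTls: url.protocol === 'rediss:',
  };
}

// Example with a made-up hostname.
const parts = splitRedisUri('rediss://cache.example.com:6379');
console.log(parts.host, parts.port, parts.useTls);
```

Only the host value fits the host field; the scheme, which carries the TLS information, has to be handled separately.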

Luckily, the connection field also accepts an IORedis connection object, and IORedis does accept a connection string.

So, install ioredis and create a connection object.

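import { BullModule } from '@nestjs/bullmq';
import { ConfigModule, ConfigService } from '@nestjs/config';
import IORedis from 'ioredis';

// inside the imports array of your root module
BullModule.forRootAsync({
  imports: [ConfigModule],
  useFactory: (config: ConfigService) => {
    // e.g. a rediss:// connection string for MemoryDB
    const uri = config.get<string>('redisUri');
    const connection = new IORedis(uri, {
      // you'll get some warnings if not set to null
      maxRetriesPerRequest: null,
    });
    return {
      connection,
      prefix: 'bull',
    };
  },
  inject: [ConfigService],
}),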

You can do the same for queue event listeners.

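async addQueue(data: YourData): Promise<JobResult> {
  const job = await this.queue.add(JOB_NAME, data, options);

  // wait for the job's result through a QueueEvents listener
  const listener = this.getListener();
  return job.waitUntilFinished(listener);
}

private getListener() {
  // Build the listener's connection from the same URI as the queue.
  // Note: this opens a new connection on every call, so consider
  // creating the listener once and reusing it.
  const uri = this.env.get<string>('redisUri');
  const connection = new IORedis(uri, { maxRetriesPerRequest: null });
  return new QueueEvents(QUEUE_NAME, { connection });
}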

Hash Slots

Now we have a connection to AWS MemoryDB. At this point, though, you are very likely to hit an error like this:

Worker error ReplyError: CROSSSLOT Keys in request don't hash to the same slot

To understand what this error means, you first need to know how Redis Cluster distributes keys across its nodes. Redis Cluster uses an identifier called a hash slot. Every key maps to one of 16384 slots, each node owns a subset of those slots, and the slot number is what tells the cluster which node holds a given key.

[Image: Redis Cluster Hash Slots]
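As a rough sketch of that lookup, the snippet below maps a slot number to the node that owns it. The three node names and their slot ranges are an assumed example layout, not something read from a real MemoryDB cluster; only the total of 16384 slots (0 to 16383) comes from Redis Cluster itself.

```typescript
interface SlotRange {
  node: string;
  from: number; // first slot owned by the node (inclusive)
  to: number; // last slot owned by the node (inclusive)
}

// Assumed example layout: three nodes splitting slots 0..16383.
const exampleLayout: SlotRange[] = [
  { node: 'node-a', from: 0, to: 5460 },
  { node: 'node-b', from: 5461, to: 10922 },
  { node: 'node-c', from: 10923, to: 16383 },
];

// Find which node owns a given slot.
function nodeForSlot(slot: number, ranges: SlotRange[] = exampleLayout): string {
  const owner = ranges.find((r) => slot >= r.from && slot <= r.to);
  if (!owner) {
    throw new Error(`slot ${slot} is not covered by any node`);
  }
  return owner.node;
}

console.log(nodeForSlot(5461)); // node-b
```

Note that two keys can land on the same node and still sit in different slots, which is exactly the situation that matters for the error here.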

The problem is that a Redis transaction is only possible between keys in the same slot, and the same restriction applies to multi-key commands and Lua scripts, which BullMQ relies on. This means you cannot run a transaction on key3 and key4 from the image above, even though they live on the same node. Slot numbers are assigned by hashing the key: Redis takes the CRC16 of the key modulo 16384. With 16384 possible slots (0 to 16383), different keys are very likely to land in different slots.

One way to control the assigned slot is to use hash tags. A hash tag is the part of a key enclosed in curly braces {}. For example, user1:session and user1:profile are not hash-tagged, so they are likely to get different hash slots. But {user1}:session and {user1}:profile share the hash tag user1, and only that string is hashed to compute the slot. As a result, the two keys are stored in the same slot.
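If you want to check this without a cluster at hand, here is a small sketch of the slot calculation as the Redis Cluster specification describes it: CRC16 (the XMODEM variant) of the key, or of the hash tag when the key has a non-empty one, modulo 16384. The crc16 and keySlot function names are my own; on a real cluster you can simply ask the server with CLUSTER KEYSLOT.

```typescript
// CRC16/XMODEM: polynomial 0x1021, initial value 0, no bit reflection.
function crc16(bytes: Uint8Array): number {
  let crc = 0;
  for (let i = 0; i < bytes.length; i++) {
    crc ^= bytes[i] << 8;
    for (let bit = 0; bit < 8; bit++) {
      crc = crc & 0x8000 ? (crc << 1) ^ 0x1021 : crc << 1;
      crc &= 0xffff; // keep the value within 16 bits
    }
  }
  return crc;
}

// Compute the hash slot of a key, honoring hash tags.
function keySlot(key: string): number {
  let hashed = key;
  const open = key.indexOf('{');
  if (open !== -1) {
    const close = key.indexOf('}', open + 1);
    // Only a non-empty tag between the first "{" and the next "}" counts.
    if (close > open + 1) {
      hashed = key.slice(open + 1, close);
    }
  }
  return crc16(new TextEncoder().encode(hashed)) % 16384;
}

// Tagged keys share a slot because only "user1" is hashed.
console.log(keySlot('{user1}:session') === keySlot('{user1}:profile')); // true
// Untagged keys are hashed in full and usually end up in different slots.
console.log(keySlot('user1:session'), keySlot('user1:profile'));
```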

If you have a single queue on a BullMQ connection, hash-tagging the prefix, as the BullMQ docs suggest, is good enough.

return {
  connection,
  prefix: '{bull}',
};

But if you use several queues with the same Redis connection, you can also hash-tag the queue names. In this case, I recommend defining the queue name as a constant, since it is used in many places across the code.

// const.ts
export const QueueConfig = {
  QUEUE_NAME: '{prompt}',
} as const;


// prompt.module.ts
@Module({
  imports: [
    BullModule.registerQueue({
      name: QueueConfig.QUEUE_NAME,
      defaultJobOptions: { ... },
    }),
  ],
  ...
})
export class PromptModule {}


// prompt.queue.ts
@Injectable()
@QueueEventsListener(QueueConfig.QUEUE_NAME)
export class PromptQueue extends QueueEventsHost {
  constructor(
    @InjectQueue(QueueConfig.QUEUE_NAME) private queue: Queue<PromptImageForm>,
  ) {
    super();
  }

  async add() { ... }
}

// prompt.processor.ts
@Processor(QueueConfig.QUEUE_NAME, { ... })
export class PromptProcessor extends WorkerHost { ... }
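To see why tagging the queue name keeps each queue's keys together, here is a rough sketch. It assumes BullMQ builds its keys as prefix:queueName:suffix; the hashTagged, queueKey and hashedPart helpers and the email queue are hypothetical names used only for this illustration.

```typescript
// Hypothetical helper: wrap a plain queue name in a hash tag.
function hashTagged(name: string): string {
  return `{${name}}`;
}

const QueueNames = {
  PROMPT: hashTagged('prompt'),
  EMAIL: hashTagged('email'), // a second, made-up queue
} as const;

// Assumed key layout: <prefix>:<queue name>:<suffix>
function queueKey(prefix: string, queueName: string, suffix: string): string {
  return [prefix, queueName, suffix].join(':');
}

// The part of a key that Redis Cluster hashes: the first non-empty
// {...} tag if there is one, otherwise the whole key.
function hashedPart(key: string): string {
  const open = key.indexOf('{');
  if (open === -1) return key;
  const close = key.indexOf('}', open + 1);
  return close > open + 1 ? key.slice(open + 1, close) : key;
}

const waitKey = queueKey('bull', QueueNames.PROMPT, 'wait');
const activeKey = queueKey('bull', QueueNames.PROMPT, 'active');
const emailKey = queueKey('bull', QueueNames.EMAIL, 'wait');

// Keys of the same queue hash the same string, so they share a slot.
console.log(hashedPart(waitKey) === hashedPart(activeKey)); // true
// Keys of different queues hash different strings.
console.log(hashedPart(waitKey) === hashedPart(emailKey)); // false
```

Each queue stays internally consistent, while different queues are free to land in different slots.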
