DEV Community

myougaTheAxo

Designing Message Queues with Claude Code: BullMQ, SQS, Dead Letter Queues

Introduction

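Running image processing, email sending, or external API calls synchronously inside a request handler tends to cause timeouts. Move that work into asynchronous jobs on a message queue so the request can return right away. Once the design rules are written down in CLAUDE.md, Claude Code can generate the queue implementation from them.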


CLAUDE.md Message Queue Rules

## Message Queue Design Rules

### Queue Selection
- Local/single server: BullMQ (Redis, simple)
- Production/high availability: AWS SQS (managed, scalable)
- High throughput: SQS + Lambda (no polling needed)

### Job Design
- Jobs must be idempotent (assume retries)
- Minimal payload (pass ID only, fetch from DB)
- Set timeouts (prevent hangs)

### Retry / Failure Handling
- Retry with exponential backoff (3 times: 1min→5min→30min)
- After max retries, move to Dead Letter Queue (DLQ)
- Monitor DLQ regularly with alerts
- Save detailed failure job logs (for debugging)
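
The job design rules above (idempotent, small payload, timeout) can be sketched without any queue library. This is a minimal illustration under stated assumptions, not the code Claude Code generated: `ProcessedStore`, `InMemoryProcessedStore`, `withTimeout`, and `runOnce` are hypothetical names, and the in-memory set stands in for whatever durable store (a DB table, a Redis set) a real worker would use.

```typescript
// Hypothetical helper names; nothing here comes from BullMQ or the AWS SDK.

// Records which job keys have already completed.
interface ProcessedStore {
  has(key: string): Promise<boolean>;
  add(key: string): Promise<void>;
}

// In-memory stand-in for a durable store (assumption: production code would use a DB or Redis).
class InMemoryProcessedStore implements ProcessedStore {
  private readonly keys = new Set<string>();
  async has(key: string): Promise<boolean> {
    return this.keys.has(key);
  }
  async add(key: string): Promise<void> {
    this.keys.add(key);
  }
}

// Rejects if `promise` does not settle within `ms` milliseconds.
// Note: this only stops waiting; it does not cancel the underlying work.
async function withTimeout(promise: Promise<void>, ms: number): Promise<void> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Job timed out after ${ms}ms`)), ms);
  });
  try {
    await Promise.race([promise, timeout]);
  } finally {
    clearTimeout(timer);
  }
}

// Runs `work` at most once per key, so a retried or duplicated job becomes a no-op.
// The has/add pair is not atomic; concurrent workers would need a unique constraint or a lock.
async function runOnce(
  store: ProcessedStore,
  key: string,
  work: () => Promise<void>,
  timeoutMs: number,
): Promise<'done' | 'skipped'> {
  if (await store.has(key)) return 'skipped';
  await withTimeout(work(), timeoutMs);
  await store.add(key); // recorded only after success, so failed jobs are retried
  return 'done';
}
```

The key carries only an identifier (for example `image-42`), which matches the "pass ID only" rule: the worker looks up everything else itself, so a retry always sees current data.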

Generated BullMQ Implementation

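// src/queues/imageProcessor.ts
import { Queue, Worker, Job } from 'bullmq';
import sharp from 'sharp';
// The modules below are project-local and not shown in this article;
// the paths are placeholders, so point them at your own helpers.
import { prisma } from '../lib/prisma';
import { downloadFromS3, uploadToS3 } from '../lib/s3';
import { sendAlert } from '../lib/alert';

const connection = { host: process.env.REDIS_HOST, port: 6379 };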

interface ImageProcessJob {
  imageId: string;
  userId: string;
  operations: Array<
    | { type: 'resize'; width: number; height: number }
    | { type: 'convert'; format: 'webp' | 'avif' | 'jpeg' }
  >;
}

export const imageQueue = new Queue<ImageProcessJob>('image-processing', {
  connection,
  defaultJobOptions: {
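    attempts: 3, // total attempts, including the first run (so at most 2 retries)
    backoff: { type: 'exponential', delay: 60_000 }, // waits 1min, then 2min (the delay doubles on each retry)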
    removeOnComplete: { count: 1000 },
    removeOnFail: false, // Keep failed jobs for debugging
  },
});

export const imageWorker = new Worker<ImageProcessJob>(
  'image-processing',
  async (job: Job<ImageProcessJob>) => {
    const { imageId, operations } = job.data;
    const image = await prisma.image.findUniqueOrThrow({ where: { id: imageId } });
    const originalBuffer = await downloadFromS3(image.s3Key);

    // Default to WebP so the stored bytes always match the file extension used below.
    let outputFormat: 'webp' | 'avif' | 'jpeg' = 'webp';
    let pipeline = sharp(originalBuffer);
    for (const [index, op] of operations.entries()) {
      await job.updateProgress(Math.round((index / operations.length) * 80));
      if (op.type === 'resize') {
        pipeline = pipeline.resize(op.width, op.height, { fit: 'inside', withoutEnlargement: true });
      } else if (op.type === 'convert') {
        outputFormat = op.format; // the last convert operation wins
      }
    }

    const processedBuffer = await pipeline.toFormat(outputFormat).toBuffer();
    const processedKey = `processed/${imageId}.${outputFormat}`;
    await uploadToS3(processedKey, processedBuffer);

    await prisma.image.update({
      where: { id: imageId },
      data: { processedKey, processedAt: new Date(), status: 'processed' },
    });

    await job.updateProgress(100);
    return { processedKey, size: processedBuffer.length };
  },
  { connection, concurrency: 5 }
);

imageWorker.on('failed', async (job, err) => {
  if (job && job.attemptsMade >= (job.opts.attempts ?? 3)) {
    await prisma.failedJob.create({
      data: { queue: 'image-processing', jobId: job.id!, payload: job.data, error: err.message, failedAt: new Date() },
    });
    await sendAlert(`Image job ${job.id} failed permanently: ${err.message}`);
  }
});
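
The rule in CLAUDE.md asks for 1min→5min→30min, while BullMQ's built-in `exponential` type, as far as its documented behavior goes, doubles the base delay on each retry. The sketch below compares the two as plain functions. `builtInExponentialDelayMs` mirrors that doubling formula, and `scheduledDelayMs` with `RETRY_SCHEDULE_MS` is a hypothetical fixed schedule; neither is part of the generated code.

```typescript
const MINUTE_MS = 60_000;

// Mirrors the doubling behavior of BullMQ's built-in 'exponential' backoff:
// base delay on the first retry, twice that on the second, and so on.
function builtInExponentialDelayMs(attemptsMade: number, baseDelayMs: number): number {
  return Math.round(Math.pow(2, attemptsMade - 1) * baseDelayMs);
}

// Hypothetical fixed schedule matching the CLAUDE.md rule (1min → 5min → 30min).
const RETRY_SCHEDULE_MS: number[] = [1 * MINUTE_MS, 5 * MINUTE_MS, 30 * MINUTE_MS];

// Returns the wait before the next retry, given how many attempts have already failed.
// Values outside the schedule are clamped to its first or last entry.
function scheduledDelayMs(attemptsMade: number): number {
  const lastIndex = RETRY_SCHEDULE_MS.length - 1;
  const index = Math.min(Math.max(attemptsMade, 1) - 1, lastIndex);
  return RETRY_SCHEDULE_MS[index] ?? 30 * MINUTE_MS;
}
```

To use a schedule like this in BullMQ, the documented route is a custom strategy: set `backoff: { type: 'custom' }` on the job and pass a `backoffStrategy` function in the worker's `settings`. Three retries also need `attempts: 4`, because `attempts` counts the first run.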

AWS SQS (Production)

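// src/queues/sqsWorker.ts
import { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';
// Project-local modules, not shown in this article; the paths are placeholders.
import type { ImageProcessJob } from './types';
import { processImage } from './processImage';

const sqs = new SQSClient({});
const QUEUE_URL = process.env.SQS_QUEUE_URL!; // assumed environment variable name

async function processSQSMessages(): Promise<void> {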
  while (true) {
    const response = await sqs.send(new ReceiveMessageCommand({
      QueueUrl: QUEUE_URL,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 20, // Long polling
      VisibilityTimeout: 300,
    }));

    if (!response.Messages?.length) continue;

    await Promise.allSettled(
      response.Messages.map(async (message) => {
        try {
          const job = JSON.parse(message.Body!) as ImageProcessJob;
          await processImage(job);
          await sqs.send(new DeleteMessageCommand({ QueueUrl: QUEUE_URL, ReceiptHandle: message.ReceiptHandle! }));
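        } catch (err) {
          // Don't delete → the message becomes visible again after VisibilityTimeout and is retried.
          // SQS moves it to the DLQ once the redrive policy's maxReceiveCount is exceeded.
          console.error(`SQS message ${message.MessageId} failed:`, err);
        }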
      })
    );
  }
}
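
The automatic move to the DLQ only happens if the source queue has a redrive policy. As a rough sketch, the helper below builds the queue attributes for one. `buildQueueAttributes` and `RedriveOptions` are hypothetical names, and the ARN in the usage note is a placeholder. The `RedrivePolicy` and `VisibilityTimeout` attribute names and the `deadLetterTargetArn` / `maxReceiveCount` keys are the ones SQS defines.

```typescript
// Hypothetical option shape for illustration.
interface RedriveOptions {
  deadLetterQueueArn: string;       // ARN of an existing queue that acts as the DLQ
  maxReceiveCount: number;          // receives allowed before SQS moves the message
  visibilityTimeoutSeconds: number; // should exceed the longest expected job duration
}

interface QueueAttributes {
  VisibilityTimeout: string;
  RedrivePolicy: string;
}

// Builds the string-valued attribute map SQS expects; RedrivePolicy is itself a JSON string.
function buildQueueAttributes(options: RedriveOptions): QueueAttributes {
  if (!Number.isInteger(options.maxReceiveCount) || options.maxReceiveCount < 1) {
    throw new RangeError('maxReceiveCount must be a positive integer');
  }
  return {
    VisibilityTimeout: String(options.visibilityTimeoutSeconds),
    RedrivePolicy: JSON.stringify({
      deadLetterTargetArn: options.deadLetterQueueArn,
      maxReceiveCount: options.maxReceiveCount,
    }),
  };
}
```

The result can be passed as `Attributes` to `CreateQueueCommand` or `SetQueueAttributesCommand` from `@aws-sdk/client-sqs`, for example `buildQueueAttributes({ deadLetterQueueArn: 'arn:aws:sqs:us-east-1:123456789012:example-dlq', maxReceiveCount: 3, visibilityTimeoutSeconds: 300 })`. With `maxReceiveCount: 3`, a message that fails three receives lands in the DLQ, in line with the three-attempt rule in CLAUDE.md.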

Summary

Design Message Queues with Claude Code:

  1. CLAUDE.md — document queue selection criteria, retry strategy, DLQ policy
  2. BullMQ — async processing with Redis backend (development to mid-scale production)
  3. Exponential backoff — retry strategy (immediate retry repeats the same failure)
  4. AWS SQS — managed, high-availability queue (Long polling + auto DLQ)

Review message queue designs with Code Review Pack (¥980) using /code-review at prompt-works.jp

myouga (@myougatheaxo) — Axolotl VTuber.
