Introduction
Running image processing, email sending, and external API calls synchronously inside a request handler makes responses slow and prone to timeouts. Move that work into background jobs on a message queue, and let Claude Code generate the design.
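As a rough illustration of the idea, the request handler can enqueue the job and respond right away instead of doing the work inline. This is a minimal sketch under assumptions: `JobQueue`, `ImageJobPayload`, `acceptUpload`, the job name, and the job ID format are all hypothetical names chosen for the example. The `add(name, data, opts)` shape is modelled on BullMQ's `Queue#add`, so a BullMQ queue should fit it.

```typescript
// Minimal queue shape for this sketch; BullMQ's Queue#add(name, data, opts) should fit it.
interface JobQueue<T> {
  add(name: string, data: T, opts?: { jobId?: string }): Promise<unknown>;
}

// Hypothetical payload: IDs only, the worker loads everything else from the DB.
interface ImageJobPayload {
  imageId: string;
  userId: string;
}

interface UploadResponse {
  status: number;
  body: { imageId: string; state: 'queued' };
}

// Enqueue the slow work and answer immediately with 202 Accepted.
async function acceptUpload(
  queue: JobQueue<ImageJobPayload>,
  imageId: string,
  userId: string,
): Promise<UploadResponse> {
  // Deriving the job ID from the image ID lets the queue ignore duplicate submissions.
  await queue.add('process-image', { imageId, userId }, { jobId: `image-${imageId}` });
  return { status: 202, body: { imageId, state: 'queued' } };
}
```

The handler only waits for the enqueue call, so its latency no longer depends on how long the image processing itself takes.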
CLAUDE.md Message Queue Rules
## Message Queue Design Rules
### Queue Selection
- Local/single server: BullMQ (Redis, simple)
- Production/high availability: AWS SQS (managed, scalable)
- High throughput: SQS + Lambda (no polling needed)
### Job Design
- Jobs must be idempotent (assume retries)
- Minimal payload (pass ID only, fetch from DB)
- Set timeouts (prevent hangs)
### Retry / Failure Handling
- Retry with exponential backoff (3 times: 1min→5min→30min)
- After max retries, move to Dead Letter Queue (DLQ)
- Monitor DLQ regularly with alerts
- Save detailed failure job logs (for debugging)
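The retry rule above lists delays of 1, 5, and 30 minutes, which is not a plain doubling schedule. A minimal sketch of both variants as pure functions follows. `scheduledBackoff` and `doublingBackoff` are hypothetical names, and the doubling formula is the one BullMQ's built-in `exponential` backoff uses as far as I know.

```typescript
// Delay before each retry per the rule above: 1 min, 5 min, 30 min.
// `attemptsMade` is 1-based: 1 means the first attempt has just failed.
function scheduledBackoff(attemptsMade: number): number {
  if (attemptsMade <= 1) return 60_000; // 1 minute
  if (attemptsMade === 2) return 300_000; // 5 minutes
  return 1_800_000; // 30 minutes for the third and any later retry
}

// For comparison: plain doubling, baseDelay * 2^(attemptsMade - 1).
function doublingBackoff(baseDelayMs: number, attemptsMade: number): number {
  return Math.round(baseDelayMs * 2 ** (attemptsMade - 1));
}
```

With a 60-second base, the doubling variant waits 1, 2, and 4 minutes. To get the 1/5/30 schedule in BullMQ, a function shaped like `scheduledBackoff` can be supplied as the worker's `settings.backoffStrategy`, with jobs created using `backoff: { type: 'custom' }`. Three retries also mean four attempts in total, so `attempts` would be 4.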
Generated BullMQ Implementation
// src/queues/imageProcessor.ts
import { extname } from 'node:path';
import { Queue, Worker, Job } from 'bullmq';
import { PrismaClient } from '@prisma/client';
import sharp from 'sharp';
// App-specific helpers; these module paths are assumptions, adjust them to your project.
import { downloadFromS3, uploadToS3 } from '../lib/s3';
import { sendAlert } from '../lib/alerts';

const prisma = new PrismaClient();
const connection = { host: process.env.REDIS_HOST, port: 6379 };

export interface ImageProcessJob {
  imageId: string;
  userId: string;
  operations: Array<
    | { type: 'resize'; width: number; height: number }
    | { type: 'convert'; format: 'webp' | 'avif' | 'jpeg' }
  >;
}

export const imageQueue = new Queue<ImageProcessJob>('image-processing', {
  connection,
  defaultJobOptions: {
    attempts: 3, // 3 attempts in total, i.e. 2 retries
    // Built-in exponential backoff waits delay * 2^(attemptsMade - 1): 1min, then 2min
    backoff: { type: 'exponential', delay: 60_000 },
    removeOnComplete: { count: 1000 },
    removeOnFail: false, // Keep failed jobs for debugging
  },
});

export const imageWorker = new Worker<ImageProcessJob>(
  'image-processing',
  async (job: Job<ImageProcessJob>) => {
    const { imageId, operations } = job.data;
    // The payload carries only IDs; load the record and the original file here.
    const image = await prisma.image.findUniqueOrThrow({ where: { id: imageId } });
    const originalBuffer = await downloadFromS3(image.s3Key);

    let pipeline = sharp(originalBuffer);
    let outputFormat: string | undefined; // set by the last 'convert' operation
    for (const [index, op] of operations.entries()) {
      await job.updateProgress(Math.round((index / operations.length) * 80));
      if (op.type === 'resize') {
        pipeline = pipeline.resize(op.width, op.height, { fit: 'inside', withoutEnlargement: true });
      } else if (op.type === 'convert') {
        pipeline = pipeline.toFormat(op.format);
        outputFormat = op.format;
      }
    }
    const processedBuffer = await pipeline.toBuffer();

    // Name the output after its actual format instead of always using .webp.
    const extension = outputFormat ?? (extname(image.s3Key).slice(1) || 'bin');
    const processedKey = `processed/${imageId}.${extension}`;
    await uploadToS3(processedKey, processedBuffer);
    await prisma.image.update({
      where: { id: imageId },
      data: { processedKey, processedAt: new Date(), status: 'processed' },
    });
    await job.updateProgress(100);
    return { processedKey, size: processedBuffer.length };
  },
  { connection, concurrency: 5 }
);

// Record and alert only once the final attempt has failed.
imageWorker.on('failed', async (job, err) => {
  if (job && job.attemptsMade >= (job.opts.attempts ?? 3)) {
    await prisma.failedJob.create({
      data: { queue: 'image-processing', jobId: job.id!, payload: job.data, error: err.message, failedAt: new Date() },
    });
    await sendAlert(`Image job ${job.id} failed permanently: ${err.message}`);
  }
});
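The job design rules ask for idempotent jobs and timeouts, and the worker above shows neither explicitly. One way to add both is sketched below under assumptions: `withTimeout`, `shouldProcess`, and the `ImageRecord` shape are hypothetical helpers, not BullMQ APIs. The guard reuses the `status: 'processed'` value the worker writes on success.

```typescript
// Reject if `work` does not settle within `ms` milliseconds.
// Note: this stops waiting for the work, it does not cancel it.
async function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });
  try {
    return await Promise.race([work, timeout]);
  } finally {
    clearTimeout(timer); // avoid keeping the process alive after the race ends
  }
}

// Hypothetical record shape: only the status matters for the guard.
interface ImageRecord {
  id: string;
  status: 'pending' | 'processed';
}

// Skip images that are already done, so a retried or duplicated job is a no-op.
function shouldProcess(image: ImageRecord): boolean {
  return image.status !== 'processed';
}
```

Inside the worker, the handler could return early when `shouldProcess(image)` is false and wrap the download and sharp pipeline in `withTimeout(...)`, so a hung S3 call fails the attempt and goes through the normal retry path.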
AWS SQS (Production)
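// src/queues/sqsWorker.ts
import { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';
// Assumed module: exports the job type and the image-processing function.
import { processImage, type ImageProcessJob } from '../services/imageProcessing';

const sqs = new SQSClient({}); // region and credentials come from the environment
const QUEUE_URL = process.env.SQS_QUEUE_URL!; // assumed environment variable name

async function processSQSMessages(): Promise<void> {
  while (true) {
    const response = await sqs.send(new ReceiveMessageCommand({
      QueueUrl: QUEUE_URL,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 20, // Long polling
      VisibilityTimeout: 300, // should exceed the longest expected processing time
    }));
    if (!response.Messages?.length) continue;

    await Promise.allSettled(
      response.Messages.map(async (message) => {
        try {
          const job = JSON.parse(message.Body!) as ImageProcessJob;
          await processImage(job);
          // Delete only after success so failed messages are redelivered.
          await sqs.send(new DeleteMessageCommand({ QueueUrl: QUEUE_URL, ReceiptHandle: message.ReceiptHandle! }));
        } catch (err) {
          // Don't delete: the message becomes visible again after VisibilityTimeout.
          // SQS moves it to the DLQ once the redrive policy's maxReceiveCount is exceeded.
          console.error(`SQS message ${message.MessageId} failed`, err);
        }
      })
    );
  }
}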
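The automatic move to the DLQ only happens when the source queue has a redrive policy. A small sketch of the queue attributes is shown below. `buildQueueAttributes` is a hypothetical helper and the ARN in the usage line is a placeholder. The `RedrivePolicy` and `VisibilityTimeout` attribute names are the ones SQS uses, and SQS expects every attribute value as a string.

```typescript
// Build SQS queue attributes that send a message to the DLQ
// after it has been received `maxReceiveCount` times without being deleted.
function buildQueueAttributes(dlqArn: string, maxReceiveCount: number): Record<string, string> {
  return {
    VisibilityTimeout: '300', // seconds, matches the worker's setting
    RedrivePolicy: JSON.stringify({ deadLetterTargetArn: dlqArn, maxReceiveCount }),
  };
}

// Placeholder ARN for illustration only.
const attributes = buildQueueAttributes('arn:aws:sqs:us-east-1:123456789012:image-processing-dlq', 3);
```

The resulting object can be passed as `Attributes` to `SetQueueAttributesCommand` or `CreateQueueCommand` from `@aws-sdk/client-sqs`, or expressed the same way in infrastructure-as-code.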
Summary
Design Message Queues with Claude Code:
- CLAUDE.md — document queue selection criteria, retry strategy, DLQ policy
- BullMQ — async processing with Redis backend (development to mid-scale production)
- Exponential backoff — retry strategy (immediate retry repeats the same failure)
- AWS SQS — managed, high-availability queue (Long polling + auto DLQ)
Review message queue designs with **Code Review Pack (¥980)** using /code-review at prompt-works.jp
myouga (@myougatheaxo) — Axolotl VTuber.