Claude Codeに「Redisでメッセージキューを作りたい。コンシューマーグループとDLQも必要」と伝えると、Redis Streamsを使った堅牢な設計を生成してくれる。本記事ではその実装パターンをまとめる。
Redis Streamsとは
Redis 5.0から追加されたログ型データ構造。Kafkaに近い概念で、コンシューマーグループ・ACK・未処理メッセージの追跡をRedis単体で実現できる。追加インフラなしで信頼性の高いメッセージ処理が構築できる。
1. XADDでメッセージ追加(MAXLEN ~10000)
ストリームが無限に肥大化しないよう、MAXLEN ~10000で近似トリミングを指定する。~によって厳密ではなく効率的なトリミングが行われる。
import Redis from 'ioredis';
const redis = new Redis();
async function publishEvent(streamKey: string, payload: Record<string, string>) {
const id = await redis.xadd(
streamKey,
'MAXLEN', '~', '10000',
'*',
...Object.entries(payload).flat()
);
console.log(`Published: ${id}`);
return id;
}
await publishEvent('orders', {
orderId: 'ord-123',
userId: 'user-456',
amount: '9800',
});
Claude Codeへの指示:「XADDでイベントを追加。ストリームはMAXLEN ~10000でトリミングして」と伝えると即座にこのパターンを出力する。
2. XREADGROUPでコンシューマーグループ処理
コンシューマーグループを使うと、複数ワーカーが同じストリームを並列処理できる。各メッセージは1つのコンシューマーにのみ配信される。
const GROUP = 'order-processors';
const STREAM = 'orders';
async function ensureGroup() {
try {
await redis.xgroup('CREATE', STREAM, GROUP, '$', 'MKSTREAM');
} catch (e: any) {
if (!e.message.includes('BUSYGROUP')) throw e;
}
}
async function consume(consumerName: string) {
const results = await redis.xreadgroup(
'GROUP', GROUP, consumerName,
'COUNT', '10',
'BLOCK', '2000',
'STREAMS', STREAM, '>'
) as any[];
if (!results) return [];
return results[0][1] as [string, string[]][];
}
3. 手動ACKで処理完了を確認
XACKを送るまでメッセージはPELに残る。処理成功後に明示的にACKする。
async function processMessage(msgId: string, data: Record<string, string>) {
try {
await handleOrder(data);
await redis.xack(STREAM, GROUP, msgId);
console.log(`ACK: ${msgId}`);
} catch (err) {
console.error(`Failed: ${msgId}`, err);
// ACKしない → PELに残る → 後でXPENDINGで検出
}
}
4. XPENDING + XCLAIMで30秒スタックメッセージを再割り当て
30秒以上ACKされないメッセージ(ワーカーがクラッシュした等)を検出し、別コンシューマーに再割り当てする。
const IDLE_THRESHOLD_MS = 30_000;
async function reclaim(consumerName: string) {
const pending = await redis.xpending(
STREAM, GROUP,
'-', '+',
'100'
) as any[];
for (const [msgId, owner, idleMs] of pending) {
if (idleMs < IDLE_THRESHOLD_MS) continue;
const claimed = await redis.xclaim(
STREAM, GROUP, consumerName,
String(IDLE_THRESHOLD_MS),
msgId
) as any[];
if (claimed.length > 0) {
console.log(`Reclaimed from ${owner}: ${msgId}`);
}
}
}
5. 3回失敗でDLQストリームへ転送
XCLAIMのdelivery-countを使い、3回以上配信されたメッセージをDLQ(Dead Letter Queue)ストリームに移す。
const DLQ_STREAM = 'orders:dlq';
const MAX_RETRIES = 3;
async function processWithDLQ(consumerName: string) {
const messages = await consume(consumerName);
for (const [msgId, fields] of messages) {
const data = parseFields(fields);
const pendingDetail = await redis.xpending(
STREAM, GROUP, msgId, msgId, '1'
) as any[];
const deliveryCount = pendingDetail[0]?.[3] ?? 1;
if (deliveryCount > MAX_RETRIES) {
await redis.xadd(
DLQ_STREAM,
'MAXLEN', '~', '1000',
'*',
'originalId', msgId,
'reason', 'max-retries-exceeded',
...Object.entries(data).flat()
);
await redis.xack(STREAM, GROUP, msgId);
console.warn(`DLQ: ${msgId} (delivered ${deliveryCount} times)`);
continue;
}
await processMessage(msgId, data);
}
}
function parseFields(fields: string[]): Record<string, string> {
const result: Record<string, string> = {};
for (let i = 0; i < fields.length; i += 2) {
result[fields[i]] = fields[i + 1];
}
return result;
}
無限ループで動かす
async function startWorker(consumerName: string) {
await ensureGroup();
console.log(`Worker ${consumerName} started`);
while (true) {
await processWithDLQ(consumerName);
await reclaim(consumerName);
}
}
startWorker(`worker-${process.pid}`);
まとめ
- XADD + MAXLEN ~10000 — ストリームを自動トリミング。メモリ使用量を制御しつつ高速追記。
- XREADGROUPコンシューマーグループ — 複数ワーカーで並列処理。1メッセージは1ワーカーのみが処理。
-
手動ACK — 処理成功後のみ
XACK。クラッシュしてもPELでメッセージを追跡可能。 - XPENDING + XCLAIM + DLQ — 30秒スタックを再割り当て、3回失敗でDLQへ。データロストゼロのフェイルセーフ設計。
Claude Codeにこの設計要件を伝えると、型安全なTypeScriptで完全な実装を一気に生成してくれる。「コンシューマーグループ・ACK管理・DLQ」というキーワードを含めるのがコツだ。
Code Review Pack ¥980 — Claude Codeで生成したコードをレビューするためのプロンプトセット。セキュリティ・パフォーマンス・可読性の観点でチェックリストを網羅。
https://prompt-works.jp
Top comments (0)