DEV Community

myougaTheAxo
myougaTheAxo

Posted on • Originally published at zenn.dev

Claude CodeでRedis Streamsを設計する:コンシューマーグループ・ACK管理・DLQ

Claude Codeに「Redisでメッセージキューを作りたい。コンシューマーグループとDLQも必要」と伝えると、Redis Streamsを使った堅牢な設計を生成してくれる。本記事ではその実装パターンをまとめる。

Redis Streamsとは

Redis 5.0から追加されたログ型データ構造。Kafkaに近い概念で、コンシューマーグループ・ACK・未処理メッセージの追跡をRedis単体で実現できる。追加インフラなしで信頼性の高いメッセージ処理が構築できる。

1. XADDでメッセージ追加(MAXLEN ~10000)

ストリームが無限に肥大化しないよう、MAXLEN ~10000で近似トリミングを指定する。~によって厳密ではなく効率的なトリミングが行われる。

import Redis from 'ioredis';
const redis = new Redis();

async function publishEvent(streamKey: string, payload: Record<string, string>) {
  const id = await redis.xadd(
    streamKey,
    'MAXLEN', '~', '10000',
    '*',
    ...Object.entries(payload).flat()
  );
  console.log(`Published: ${id}`);
  return id;
}

await publishEvent('orders', {
  orderId: 'ord-123',
  userId: 'user-456',
  amount: '9800',
});
Enter fullscreen mode Exit fullscreen mode

Claude Codeへの指示:「XADDでイベントを追加。ストリームはMAXLEN ~10000でトリミングして」と伝えると即座にこのパターンを出力する。

2. XREADGROUPでコンシューマーグループ処理

コンシューマーグループを使うと、複数ワーカーが同じストリームを並列処理できる。各メッセージは1つのコンシューマーにのみ配信される。

const GROUP = 'order-processors';
const STREAM = 'orders';

async function ensureGroup() {
  try {
    await redis.xgroup('CREATE', STREAM, GROUP, '$', 'MKSTREAM');
  } catch (e: any) {
    if (!e.message.includes('BUSYGROUP')) throw e;
  }
}

async function consume(consumerName: string) {
  const results = await redis.xreadgroup(
    'GROUP', GROUP, consumerName,
    'COUNT', '10',
    'BLOCK', '2000',
    'STREAMS', STREAM, '>'
  ) as any[];

  if (!results) return [];
  return results[0][1] as [string, string[]][];
}
Enter fullscreen mode Exit fullscreen mode

3. 手動ACKで処理完了を確認

XACKを送るまでメッセージはPELに残る。処理成功後に明示的にACKする。

async function processMessage(msgId: string, data: Record<string, string>) {
  try {
    await handleOrder(data);

    await redis.xack(STREAM, GROUP, msgId);
    console.log(`ACK: ${msgId}`);
  } catch (err) {
    console.error(`Failed: ${msgId}`, err);
    // ACKしない → PELに残る → 後でXPENDINGで検出
  }
}
Enter fullscreen mode Exit fullscreen mode

4. XPENDING + XCLAIMで30秒スタックメッセージを再割り当て

30秒以上ACKされないメッセージ(ワーカーがクラッシュした等)を検出し、別コンシューマーに再割り当てする。

const IDLE_THRESHOLD_MS = 30_000;

async function reclaim(consumerName: string) {
  const pending = await redis.xpending(
    STREAM, GROUP,
    '-', '+',
    '100'
  ) as any[];

  for (const [msgId, owner, idleMs] of pending) {
    if (idleMs < IDLE_THRESHOLD_MS) continue;

    const claimed = await redis.xclaim(
      STREAM, GROUP, consumerName,
      String(IDLE_THRESHOLD_MS),
      msgId
    ) as any[];

    if (claimed.length > 0) {
      console.log(`Reclaimed from ${owner}: ${msgId}`);
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

5. 3回失敗でDLQストリームへ転送

XCLAIMのdelivery-countを使い、3回以上配信されたメッセージをDLQ(Dead Letter Queue)ストリームに移す。

const DLQ_STREAM = 'orders:dlq';
const MAX_RETRIES = 3;

async function processWithDLQ(consumerName: string) {
  const messages = await consume(consumerName);

  for (const [msgId, fields] of messages) {
    const data = parseFields(fields);

    const pendingDetail = await redis.xpending(
      STREAM, GROUP, msgId, msgId, '1'
    ) as any[];

    const deliveryCount = pendingDetail[0]?.[3] ?? 1;

    if (deliveryCount > MAX_RETRIES) {
      await redis.xadd(
        DLQ_STREAM,
        'MAXLEN', '~', '1000',
        '*',
        'originalId', msgId,
        'reason', 'max-retries-exceeded',
        ...Object.entries(data).flat()
      );
      await redis.xack(STREAM, GROUP, msgId);
      console.warn(`DLQ: ${msgId} (delivered ${deliveryCount} times)`);
      continue;
    }

    await processMessage(msgId, data);
  }
}

function parseFields(fields: string[]): Record<string, string> {
  const result: Record<string, string> = {};
  for (let i = 0; i < fields.length; i += 2) {
    result[fields[i]] = fields[i + 1];
  }
  return result;
}
Enter fullscreen mode Exit fullscreen mode

無限ループで動かす

async function startWorker(consumerName: string) {
  await ensureGroup();
  console.log(`Worker ${consumerName} started`);

  while (true) {
    await processWithDLQ(consumerName);
    await reclaim(consumerName);
  }
}

startWorker(`worker-${process.pid}`);
Enter fullscreen mode Exit fullscreen mode

まとめ

  1. XADD + MAXLEN ~10000 — ストリームを自動トリミング。メモリ使用量を制御しつつ高速追記。
  2. XREADGROUPコンシューマーグループ — 複数ワーカーで並列処理。1メッセージは1ワーカーのみが処理。
  3. 手動ACK — 処理成功後のみXACK。クラッシュしてもPELでメッセージを追跡可能。
  4. XPENDING + XCLAIM + DLQ — 30秒スタックを再割り当て、3回失敗でDLQへ。データロストゼロのフェイルセーフ設計。

Claude Codeにこの設計要件を伝えると、型安全なTypeScriptで完全な実装を一気に生成してくれる。「コンシューマーグループ・ACK管理・DLQ」というキーワードを含めるのがコツだ。


Code Review Pack ¥980 — Claude Codeで生成したコードをレビューするためのプロンプトセット。セキュリティ・パフォーマンス・可読性の観点でチェックリストを網羅。
https://prompt-works.jp

Top comments (0)