はじめに
メッセージキューとストリーム処理の違いを理解しながら——KafkaJSでコンシューマーグループ・オフセット管理・デッドレターキューを設計する。Claude Codeに生成させる。
Kafka設計のCLAUDE.mdルール
- autoCommit: false(手動オフセット管理)
- エラー時はリトライ3回 → DLQトピックへ
- 冪等性: processingIdで重複処理検知
- acks: 'all'(全レプリカへの書き込み確認)
生成されるKafka実装
// src/kafka/consumer.ts — 手動オフセット管理 + DLQ
export abstract class BaseKafkaConsumer<T = unknown> {
async start(): Promise<void> {
await this.consumer.run({
autoCommit: false,
eachMessage: async ({ partition, message, heartbeat }) => {
const event = JSON.parse(message.value!.toString()) as KafkaEvent<T>;
// 冪等性チェック
if (await redis.get(`processed:${event.eventId}`)) {
await this.commitOffset(partition, message.offset);
return;
}
// リトライ3回
for (let attempt = 1; attempt <= 3; attempt++) {
try {
await this.process(event, partition, message.offset);
await redis.set(`processed:${event.eventId}`, '1', { EX: 86400 });
await this.commitOffset(partition, message.offset);
await heartbeat();
return;
} catch (err) {
if (attempt === 3) {
await this.sendToDLQ(message, 'PROCESSING_FAILED', String(err));
await this.commitOffset(partition, message.offset);
}
}
}
},
});
}
private async sendToDLQ(msg: KafkaMessage, reason: string, error: string): Promise<void> {
await producer.send({
topic: `${this.topic}.dlq`,
messages: [{
value: msg.value,
headers: { ...msg.headers, 'dlq-reason': reason, 'dlq-error': error },
}],
});
}
}
まとめ
- 手動オフセットコミット で処理成功後のみオフセットを進め、再起動時の重複処理を防止
-
Redisで冪等性管理
processed:{eventId}を24時間TTLで保持 - DLQトピック に失敗理由・元トピック・エラー詳細をヘッダーに付けて送信
- グレースフルシャットダウン でSIGTERM時に全コンシューマーをdisconnect
Kafka設計のレビューは **Code Review Pack(¥980)* の /code-review で確認できます。*
Top comments (0)