DEV Community

myougaTheAxo
myougaTheAxo

Posted on • Originally published at zenn.dev

Claude CodeでSSEによるジョブ進捗ストリーミングを設計する:BullMQ・Redis Pub/Sub

はじめに

バックグラウンドジョブの進捗をリアルタイムにフロントエンドへ伝える手段として、Server-Sent Events(SSE)は軽量かつシンプルな選択肢です。本記事では Claude Code を活用し、BullMQ + Redis Pub/Sub + SSE の構成でジョブ進捗ストリーミングを設計する方法を解説します。


全体アーキテクチャ

Client (React)
  └─ EventSource /api/jobs/:id/progress
        └─ SSE Endpoint (Express)
              └─ Redis Subscriber(専用接続)
                    └─ Redis Pub/Sub Channel: job:progress:{id}
                          └─ JobProgressPublisher (BullMQ Worker内)
Enter fullscreen mode Exit fullscreen mode

ポイントは Redis の接続を2本に分ける ことです。subscribe 状態のクライアントは通常のコマンドを発行できないため、パブリッシュ用とサブスクライブ用を明示的に分離します。


JobProgressPublisher の実装

import { Redis } from 'ioredis';

const publisher = new Redis(process.env.REDIS_URL);

export async function publishJobProgress(
  jobId: string,
  payload: { step: string; percent: number; message?: string }
): Promise<void> {
  const channel = `job:progress:${jobId}`;
  await publisher.publish(channel, JSON.stringify({
    ...payload,
    timestamp: Date.now(),
  }));
}
Enter fullscreen mode Exit fullscreen mode

BullMQ ワーカー内では処理の各ステップで publishJobProgress を呼び出します。

import { Worker } from 'bullmq';
import { publishJobProgress } from './publisher';

const worker = new Worker('mainQueue', async (job) => {
  await publishJobProgress(job.id!, { step: 'start', percent: 0 });
  // ... 処理
  await publishJobProgress(job.id!, { step: 'processing', percent: 50 });
  // ... 処理
  await publishJobProgress(job.id!, { step: 'done', percent: 100 });
}, { connection: new Redis(process.env.REDIS_URL) });
Enter fullscreen mode Exit fullscreen mode

SSE エンドポイントの実装

import express from 'express';
import { Redis } from 'ioredis';

const router = express.Router();
const HEARTBEAT_INTERVAL_MS = 15_000;
const AUTO_CLOSE_MS = 5 * 60 * 1000;

router.get('/jobs/:id/progress', async (req, res) => {
  const jobId = req.params.id;
  const channel = `job:progress:${jobId}`;

  // SSE ヘッダー設定
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.flushHeaders();

  // 専用 Subscriber 接続
  const subscriber = new Redis(process.env.REDIS_URL);
  await subscriber.subscribe(channel);

  // 再接続用の最終状態を Redis から取得して送信
  const finalState = await new Redis(process.env.REDIS_URL)
    .get(`job:final:${jobId}`);
  if (finalState) {
    res.write(`data: ${finalState}

`);
    res.end();
    await subscriber.quit();
    return;
  }

  // メッセージ転送
  subscriber.on('message', (_ch, message) => {
    res.write(`data: ${message}

`);
    const parsed = JSON.parse(message);
    if (parsed.percent === 100) {
      cleanup();
    }
  });

  // 15s ハートビート
  const heartbeat = setInterval(() => {
    res.write(': heartbeat

');
  }, HEARTBEAT_INTERVAL_MS);

  // 5分で自動クローズ
  const autoClose = setTimeout(() => cleanup(), AUTO_CLOSE_MS);

  const cleanup = async () => {
    clearInterval(heartbeat);
    clearTimeout(autoClose);
    await subscriber.unsubscribe(channel);
    await subscriber.quit();
    res.end();
  };

  req.on('close', cleanup);
});

export default router;
Enter fullscreen mode Exit fullscreen mode

最終状態の保存(再接続対応)

ジョブ完了時に最終状態を Redis へ保存しておくことで、クライアントが再接続しても最新状態を取得できます。

// ワーカー内で done 時に保存
await redis.set(
  `job:final:${jobId}`,
  JSON.stringify({ step: 'done', percent: 100 }),
  'EX',
  3600 // 1時間で期限切れ
);
Enter fullscreen mode Exit fullscreen mode

React クライアント側の実装

import { useEffect, useState } from 'react';

interface Progress {
  step: string;
  percent: number;
  message?: string;
}

export function useJobProgress(jobId: string) {
  const [progress, setProgress] = useState<Progress | null>(null);

  useEffect(() => {
    const es = new EventSource(`/api/jobs/${jobId}/progress`);

    es.onmessage = (e) => {
      const data: Progress = JSON.parse(e.data);
      setProgress(data);
      if (data.percent === 100) {
        es.close();
      }
    };

    es.onerror = () => {
      es.close();
    };

    return () => es.close();
  }, [jobId]);

  return progress;
}
Enter fullscreen mode Exit fullscreen mode

コンポーネントで useJobProgress(jobId) を呼び出すだけで進捗バーと連動できます。


まとめ

  • 接続分離: Redis の publish 用と subscribe 用を別インスタンスにすることで、コマンド制限を回避できる
  • ハートビート: 15秒間隔の comment イベントでプロキシ・LBのアイドルタイムアウトを防止する
  • 再接続耐性: 完了済みジョブの最終状態を Redis に保存しておくことで、再接続時も即座に状態を返せる
  • 自動クローズ: 5分のタイムアウトで孤立した SSE 接続をリソースリークなく閉じる

Code Review Pack ¥980

セキュリティ・パフォーマンス・可読性を一括レビューするプロンプトパック。Claude Code / Codex CLI 対応。

👉 Code Review Pack を見る(PromptWorks)

Top comments (0)