はじめに
バックグラウンドジョブの進捗をリアルタイムにフロントエンドへ伝える手段として、Server-Sent Events(SSE)は軽量かつシンプルな選択肢です。本記事では Claude Code を活用し、BullMQ + Redis Pub/Sub + SSE の構成でジョブ進捗ストリーミングを設計する方法を解説します。
全体アーキテクチャ
Client (React)
└─ EventSource /api/jobs/:id/progress
└─ SSE Endpoint (Express)
└─ Redis Subscriber(専用接続)
└─ Redis Pub/Sub Channel: job:progress:{id}
└─ JobProgressPublisher (BullMQ Worker内)
ポイントは Redis の接続を2本に分ける ことです。subscribe 状態のクライアントは通常のコマンドを発行できないため、パブリッシュ用とサブスクライブ用を明示的に分離します。
JobProgressPublisher の実装
import { Redis } from 'ioredis';
const publisher = new Redis(process.env.REDIS_URL);
export async function publishJobProgress(
jobId: string,
payload: { step: string; percent: number; message?: string }
): Promise<void> {
const channel = `job:progress:${jobId}`;
await publisher.publish(channel, JSON.stringify({
...payload,
timestamp: Date.now(),
}));
}
BullMQ ワーカー内では処理の各ステップで publishJobProgress を呼び出します。
import { Worker } from 'bullmq';
import { publishJobProgress } from './publisher';
const worker = new Worker('mainQueue', async (job) => {
await publishJobProgress(job.id!, { step: 'start', percent: 0 });
// ... 処理
await publishJobProgress(job.id!, { step: 'processing', percent: 50 });
// ... 処理
await publishJobProgress(job.id!, { step: 'done', percent: 100 });
}, { connection: new Redis(process.env.REDIS_URL) });
SSE エンドポイントの実装
import express from 'express';
import { Redis } from 'ioredis';
const router = express.Router();
const HEARTBEAT_INTERVAL_MS = 15_000;
const AUTO_CLOSE_MS = 5 * 60 * 1000;
router.get('/jobs/:id/progress', async (req, res) => {
const jobId = req.params.id;
const channel = `job:progress:${jobId}`;
// SSE ヘッダー設定
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders();
// 専用 Subscriber 接続
const subscriber = new Redis(process.env.REDIS_URL);
await subscriber.subscribe(channel);
// 再接続用の最終状態を Redis から取得して送信
const finalState = await new Redis(process.env.REDIS_URL)
.get(`job:final:${jobId}`);
if (finalState) {
res.write(`data: ${finalState}
`);
res.end();
await subscriber.quit();
return;
}
// メッセージ転送
subscriber.on('message', (_ch, message) => {
res.write(`data: ${message}
`);
const parsed = JSON.parse(message);
if (parsed.percent === 100) {
cleanup();
}
});
// 15s ハートビート
const heartbeat = setInterval(() => {
res.write(': heartbeat
');
}, HEARTBEAT_INTERVAL_MS);
// 5分で自動クローズ
const autoClose = setTimeout(() => cleanup(), AUTO_CLOSE_MS);
const cleanup = async () => {
clearInterval(heartbeat);
clearTimeout(autoClose);
await subscriber.unsubscribe(channel);
await subscriber.quit();
res.end();
};
req.on('close', cleanup);
});
export default router;
最終状態の保存(再接続対応)
ジョブ完了時に最終状態を Redis へ保存しておくことで、クライアントが再接続しても最新状態を取得できます。
// ワーカー内で done 時に保存
await redis.set(
`job:final:${jobId}`,
JSON.stringify({ step: 'done', percent: 100 }),
'EX',
3600 // 1時間で期限切れ
);
React クライアント側の実装
import { useEffect, useState } from 'react';
interface Progress {
step: string;
percent: number;
message?: string;
}
export function useJobProgress(jobId: string) {
const [progress, setProgress] = useState<Progress | null>(null);
useEffect(() => {
const es = new EventSource(`/api/jobs/${jobId}/progress`);
es.onmessage = (e) => {
const data: Progress = JSON.parse(e.data);
setProgress(data);
if (data.percent === 100) {
es.close();
}
};
es.onerror = () => {
es.close();
};
return () => es.close();
}, [jobId]);
return progress;
}
コンポーネントで useJobProgress(jobId) を呼び出すだけで進捗バーと連動できます。
まとめ
- 接続分離: Redis の publish 用と subscribe 用を別インスタンスにすることで、コマンド制限を回避できる
-
ハートビート: 15秒間隔の
commentイベントでプロキシ・LBのアイドルタイムアウトを防止する - 再接続耐性: 完了済みジョブの最終状態を Redis に保存しておくことで、再接続時も即座に状態を返せる
- 自動クローズ: 5分のタイムアウトで孤立した SSE 接続をリソースリークなく閉じる
Code Review Pack ¥980
セキュリティ・パフォーマンス・可読性を一括レビューするプロンプトパック。Claude Code / Codex CLI 対応。
Top comments (0)