Node.jsはシングルスレッドのイベントループモデルを採用しているため、CPU集約型タスク(画像処理、暗号化、データ変換など)をメインスレッドで実行すると、レスポンスが著しく低下します。Worker Threadsを使えば、これらのタスクを並列化してスループットを大幅に改善できます。今回はClaude Codeを活用し、本番運用に耐えるThreadPoolクラスを設計する方法を解説します。
なぜWorker Threadsが必要か
Node.jsのchild_processやclusterと比べ、Worker Threadsは同一プロセス内でメモリを共有できるため、オーバーヘッドが小さく高速です。SharedArrayBufferやAtomicsを組み合わせることで、スレッド間の効率的なデータ共有も実現できます。
ThreadPoolクラスの設計
Claude Codeに以下のような要件を渡してスキャフォールドを生成しました。
- アイドルワーカーの管理(min/maxスレッド数)
- タスクタイムアウト
- キューのバックプレッシャー制御
- TypeScript完全対応
import { Worker } from 'worker_threads';
import { EventEmitter } from 'events';
import * as os from 'os';
interface TaskOptions {
timeout?: number;
}
interface PoolTask<T> {
workerScript: string;
data: unknown;
resolve: (value: T) => void;
reject: (reason: Error) => void;
options?: TaskOptions;
}
interface WorkerState {
worker: Worker;
idle: boolean;
}
export class ThreadPool extends EventEmitter {
private readonly minThreads: number;
private readonly maxThreads: number;
private readonly maxQueueSize: number;
private workers: WorkerState[] = [];
private queue: PoolTask<unknown>[] = [];
constructor(options?: { minThreads?: number; maxThreads?: number; maxQueueSize?: number }) {
super();
const cpus = os.cpus().length;
// デフォルト: CPU数-1 をmin/maxに設定
this.minThreads = options?.minThreads ?? Math.max(1, cpus - 1);
this.maxThreads = options?.maxThreads ?? Math.max(2, cpus - 1);
this.maxQueueSize = options?.maxQueueSize ?? 100;
this._initMinWorkers();
}
private _initMinWorkers(): void {
for (let i = 0; i < this.minThreads; i++) this._spawnWorker();
}
private _spawnWorker(): WorkerState | null {
if (this.workers.length >= this.maxThreads) return null;
const state: WorkerState = { worker: new Worker('', { eval: true }), idle: true };
this.workers.push(state);
return state;
}
run<T>(workerScript: string, data: unknown, options?: TaskOptions): Promise<T> {
return new Promise<T>((resolve, reject) => {
if (this.queue.length >= this.maxQueueSize) {
reject(new Error('ThreadPool queue is full (backpressure limit reached)'));
return;
}
const task: PoolTask<T> = { workerScript, data, resolve, reject, options };
this.queue.push(task as PoolTask<unknown>);
this._dispatch();
});
}
private _dispatch(): void {
if (this.queue.length === 0) return;
const idleState = this.workers.find(s => s.idle);
if (!idleState) {
const newState = this._spawnWorker();
if (!newState) return; // maxThreads到達
this._runTask(newState, this.queue.shift()!);
return;
}
idleState.idle = false;
this._runTask(idleState, this.queue.shift()!);
}
private _runTask(state: WorkerState, task: PoolTask<unknown>): void {
const timeout = task.options?.timeout ?? 30_000;
let timer: ReturnType<typeof setTimeout> | null = null;
const worker = new Worker(
`const { workerData, parentPort } = require('worker_threads');
const { script, data } = workerData;
(async () => {
try {
const mod = await import(script);
const result = await mod.default(data);
parentPort.postMessage({ success: true, result });
} catch(e) { parentPort.postMessage({ success: false, error: e.message }); }
})();`,
{ eval: true, workerData: { script: task.workerScript, data: task.data } }
);
if (timeout > 0) {
timer = setTimeout(() => {
worker.terminate();
task.reject(new Error(`Worker timed out after ${timeout}ms`));
state.idle = true;
this._dispatch();
}, timeout);
}
worker.once('message', (msg: { success: boolean; result?: unknown; error?: string }) => {
if (timer) clearTimeout(timer);
if (msg.success) (task.resolve as (v: unknown) => void)(msg.result);
else task.reject(new Error(msg.error ?? 'Worker error'));
state.idle = true;
worker.terminate();
this._dispatch();
});
worker.once('error', (err) => {
if (timer) clearTimeout(timer);
task.reject(err);
state.idle = true;
this._dispatch();
});
}
async shutdown(): Promise<void> {
await Promise.all(this.workers.map(s => s.worker.terminate()));
this.workers = [];
}
}
画像処理ワーカーの実装例
実際のユースケースとして、Sharpを使った画像リサイズワーカーを作成します。
// workers/image-processor.worker.ts
import sharp from 'sharp';
interface ImageTask {
inputPath: string;
outputPath: string;
width: number;
height: number;
quality?: number;
}
export default async function processImage(
task: ImageTask
): Promise<{ outputPath: string; size: number }> {
const info = await sharp(task.inputPath)
.resize(task.width, task.height, { fit: 'cover' })
.webp({ quality: task.quality ?? 80 })
.toFile(task.outputPath);
return { outputPath: task.outputPath, size: info.size };
}
// main.ts - ThreadPoolを使った並列処理
import { ThreadPool } from './thread-pool';
import * as path from 'path';
const pool = new ThreadPool({ minThreads: 2, maxThreads: 6, maxQueueSize: 50 });
async function processImages(imagePaths: string[]): Promise<void> {
const workerScript = path.resolve('./workers/image-processor.worker.ts');
const tasks = imagePaths.map(inputPath =>
pool.run(workerScript, {
inputPath,
outputPath: inputPath.replace(/\.(jpg|png)$/, '-thumb.webp'),
width: 800,
height: 600,
quality: 85,
}, { timeout: 10_000 })
);
const results = await Promise.allSettled(tasks);
results.forEach((r, i) => {
if (r.status === 'fulfilled') console.log(`OK: ${imagePaths[i]}`, r.value);
else console.error(`NG: ${imagePaths[i]}:`, r.reason.message);
});
await pool.shutdown();
}
processImages(['./img1.jpg', './img2.png', './img3.jpg']);
バックプレッシャーの監視
// キュー使用率を監視してアラートを出す
setInterval(() => {
const usage = pool.queue.length / 100; // maxQueueSize=100
if (usage > 0.8) {
console.warn(`ThreadPool queue at ${Math.round(usage * 100)}% capacity`);
}
}, 5_000);
Claude Codeとのワークフロー
Claude Codeを使うと、以下のような反復サイクルが高速化されます。
- 要件を自然言語で記述 → Claude Codeが型定義・クラス骨格を生成
- エッジケースの洗い出し → 「タイムアウト時のメモリリークは?」と質問するだけで対策コードを提案
-
ベンチマーク生成 →
artilleryやautocannonのテストシナリオも自動生成 - リファクタリング → 実装後に「より関数型スタイルに書き直して」と指示するだけ
まとめ
- Worker Threadsは同一プロセス内での真の並列処理を実現し、child_processより低オーバーヘッド
- ThreadPoolパターンでminThreads(CPU-1)〜maxThreadsの範囲でワーカーを動的管理
- バックプレッシャー(maxQueueSize)を設定することで、過負荷時のメモリ枯渇を防止
- タスクタイムアウトで暴走ワーカーを強制終了し、システム全体の安定性を確保
Claude Codeを使えば、これらの複雑な並行処理パターンを安全・高速に実装できます。
Code Review Pack ¥980
Claude Codeを使ったコードレビュー・リファクタリング用プロンプト集です。Worker Threadsのような並行処理パターンのレビューにも対応。
👉 noteで購入する
Top comments (0)