DEV Community

myougaTheAxo
myougaTheAxo

Posted on • Originally published at zenn.dev

Claude CodeでNode.js Worker Threadsを設計する:CPU集約タスクの並列化・スレッドプール

Node.jsはシングルスレッドのイベントループモデルを採用しているため、CPU集約型タスク(画像処理、暗号化、データ変換など)をメインスレッドで実行すると、レスポンスが著しく低下します。Worker Threadsを使えば、これらのタスクを並列化してスループットを大幅に改善できます。今回はClaude Codeを活用し、本番運用に耐えるThreadPoolクラスを設計する方法を解説します。

なぜWorker Threadsが必要か

Node.jsのchild_processclusterと比べ、Worker Threadsは同一プロセス内でメモリを共有できるため、オーバーヘッドが小さく高速です。SharedArrayBufferAtomicsを組み合わせることで、スレッド間の効率的なデータ共有も実現できます。

ThreadPoolクラスの設計

Claude Codeに以下のような要件を渡してスキャフォールドを生成しました。

  • アイドルワーカーの管理(min/maxスレッド数)
  • タスクタイムアウト
  • キューのバックプレッシャー制御
  • TypeScript完全対応
import { Worker } from 'worker_threads';
import { EventEmitter } from 'events';
import * as os from 'os';

interface TaskOptions {
  timeout?: number;
}

interface PoolTask<T> {
  workerScript: string;
  data: unknown;
  resolve: (value: T) => void;
  reject: (reason: Error) => void;
  options?: TaskOptions;
}

interface WorkerState {
  worker: Worker;
  idle: boolean;
}

export class ThreadPool extends EventEmitter {
  private readonly minThreads: number;
  private readonly maxThreads: number;
  private readonly maxQueueSize: number;
  private workers: WorkerState[] = [];
  private queue: PoolTask<unknown>[] = [];

  constructor(options?: { minThreads?: number; maxThreads?: number; maxQueueSize?: number }) {
    super();
    const cpus = os.cpus().length;
    // デフォルト: CPU数-1 をmin/maxに設定
    this.minThreads = options?.minThreads ?? Math.max(1, cpus - 1);
    this.maxThreads = options?.maxThreads ?? Math.max(2, cpus - 1);
    this.maxQueueSize = options?.maxQueueSize ?? 100;
    this._initMinWorkers();
  }

  private _initMinWorkers(): void {
    for (let i = 0; i < this.minThreads; i++) this._spawnWorker();
  }

  private _spawnWorker(): WorkerState | null {
    if (this.workers.length >= this.maxThreads) return null;
    const state: WorkerState = { worker: new Worker('', { eval: true }), idle: true };
    this.workers.push(state);
    return state;
  }

  run<T>(workerScript: string, data: unknown, options?: TaskOptions): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      if (this.queue.length >= this.maxQueueSize) {
        reject(new Error('ThreadPool queue is full (backpressure limit reached)'));
        return;
      }
      const task: PoolTask<T> = { workerScript, data, resolve, reject, options };
      this.queue.push(task as PoolTask<unknown>);
      this._dispatch();
    });
  }

  private _dispatch(): void {
    if (this.queue.length === 0) return;
    const idleState = this.workers.find(s => s.idle);
    if (!idleState) {
      const newState = this._spawnWorker();
      if (!newState) return; // maxThreads到達
      this._runTask(newState, this.queue.shift()!);
      return;
    }
    idleState.idle = false;
    this._runTask(idleState, this.queue.shift()!);
  }

  private _runTask(state: WorkerState, task: PoolTask<unknown>): void {
    const timeout = task.options?.timeout ?? 30_000;
    let timer: ReturnType<typeof setTimeout> | null = null;

    const worker = new Worker(
      `const { workerData, parentPort } = require('worker_threads');
       const { script, data } = workerData;
       (async () => {
         try {
           const mod = await import(script);
           const result = await mod.default(data);
           parentPort.postMessage({ success: true, result });
         } catch(e) { parentPort.postMessage({ success: false, error: e.message }); }
       })();`,
      { eval: true, workerData: { script: task.workerScript, data: task.data } }
    );

    if (timeout > 0) {
      timer = setTimeout(() => {
        worker.terminate();
        task.reject(new Error(`Worker timed out after ${timeout}ms`));
        state.idle = true;
        this._dispatch();
      }, timeout);
    }

    worker.once('message', (msg: { success: boolean; result?: unknown; error?: string }) => {
      if (timer) clearTimeout(timer);
      if (msg.success) (task.resolve as (v: unknown) => void)(msg.result);
      else task.reject(new Error(msg.error ?? 'Worker error'));
      state.idle = true;
      worker.terminate();
      this._dispatch();
    });

    worker.once('error', (err) => {
      if (timer) clearTimeout(timer);
      task.reject(err);
      state.idle = true;
      this._dispatch();
    });
  }

  async shutdown(): Promise<void> {
    await Promise.all(this.workers.map(s => s.worker.terminate()));
    this.workers = [];
  }
}
Enter fullscreen mode Exit fullscreen mode

画像処理ワーカーの実装例

実際のユースケースとして、Sharpを使った画像リサイズワーカーを作成します。

// workers/image-processor.worker.ts
import sharp from 'sharp';

interface ImageTask {
  inputPath: string;
  outputPath: string;
  width: number;
  height: number;
  quality?: number;
}

export default async function processImage(
  task: ImageTask
): Promise<{ outputPath: string; size: number }> {
  const info = await sharp(task.inputPath)
    .resize(task.width, task.height, { fit: 'cover' })
    .webp({ quality: task.quality ?? 80 })
    .toFile(task.outputPath);
  return { outputPath: task.outputPath, size: info.size };
}
Enter fullscreen mode Exit fullscreen mode
// main.ts - ThreadPoolを使った並列処理
import { ThreadPool } from './thread-pool';
import * as path from 'path';

const pool = new ThreadPool({ minThreads: 2, maxThreads: 6, maxQueueSize: 50 });

async function processImages(imagePaths: string[]): Promise<void> {
  const workerScript = path.resolve('./workers/image-processor.worker.ts');
  const tasks = imagePaths.map(inputPath =>
    pool.run(workerScript, {
      inputPath,
      outputPath: inputPath.replace(/\.(jpg|png)$/, '-thumb.webp'),
      width: 800,
      height: 600,
      quality: 85,
    }, { timeout: 10_000 })
  );
  const results = await Promise.allSettled(tasks);
  results.forEach((r, i) => {
    if (r.status === 'fulfilled') console.log(`OK: ${imagePaths[i]}`, r.value);
    else console.error(`NG: ${imagePaths[i]}:`, r.reason.message);
  });
  await pool.shutdown();
}

processImages(['./img1.jpg', './img2.png', './img3.jpg']);
Enter fullscreen mode Exit fullscreen mode

バックプレッシャーの監視

// キュー使用率を監視してアラートを出す
setInterval(() => {
  const usage = pool.queue.length / 100; // maxQueueSize=100
  if (usage > 0.8) {
    console.warn(`ThreadPool queue at ${Math.round(usage * 100)}% capacity`);
  }
}, 5_000);
Enter fullscreen mode Exit fullscreen mode

Claude Codeとのワークフロー

Claude Codeを使うと、以下のような反復サイクルが高速化されます。

  1. 要件を自然言語で記述 → Claude Codeが型定義・クラス骨格を生成
  2. エッジケースの洗い出し → 「タイムアウト時のメモリリークは?」と質問するだけで対策コードを提案
  3. ベンチマーク生成artilleryautocannonのテストシナリオも自動生成
  4. リファクタリング → 実装後に「より関数型スタイルに書き直して」と指示するだけ

まとめ

  • Worker Threadsは同一プロセス内での真の並列処理を実現し、child_processより低オーバーヘッド
  • ThreadPoolパターンでminThreads(CPU-1)〜maxThreadsの範囲でワーカーを動的管理
  • バックプレッシャー(maxQueueSize)を設定することで、過負荷時のメモリ枯渇を防止
  • タスクタイムアウトで暴走ワーカーを強制終了し、システム全体の安定性を確保

Claude Codeを使えば、これらの複雑な並行処理パターンを安全・高速に実装できます。


Code Review Pack ¥980
Claude Codeを使ったコードレビュー・リファクタリング用プロンプト集です。Worker Threadsのような並行処理パターンのレビューにも対応。
👉 noteで購入する

Top comments (0)