
Muhammad Arslan

Multi-Agent AI Orchestration in TypeScript: AgentGraph, Supervisors, and @Delegate with HazelJS

tl;dr — We just shipped three production-ready multi-agent patterns to @hazeljs/agent — a DAG-based AgentGraph, an LLM-driven SupervisorAgent, and a @Delegate decorator that lets one agent call another as a tool — plus a full BullMQ + Redis async layer that turns any pipeline into a durable, horizontally-scalable, fire-and-forget job with live SSE progress streaming. All four are demonstrated in a complete content-generation pipeline that researches, writes, edits, and SEO-optimises articles end-to-end. Let's break down how each pattern works, when to use each one, and how to wire them all together in a real HazelJS application.


Why multi-agent systems — and why now?

Single-agent systems hit a ceiling fast. Ask one LLM to research a topic, write a 1,000-word article, edit it for tone, and generate SEO metadata — all in a single conversation — and you get mediocre results across the board. The model's context window fills up, quality degrades, and there is no way to specialise prompts or tools for each task.

Multi-agent architectures solve this by decomposing work the same way software teams do: specialists with narrow expertise, coordinated by a workflow.

The challenge has always been the coordination layer. LangGraph solves it in Python, but TypeScript developers have had to stitch together bespoke solutions or use libraries that feel foreign to the DI-first, decorator-driven style most Node.js backend teams already use.

That gap is what @hazeljs/agent's new orchestration primitives close. Everything is:

  • Decorator-native — @Agent, @Tool, and @Delegate work the same way as @Service, @Controller, and @Get
  • DI-aware — agents are registered in the container and can receive injected dependencies
  • TypeScript-first — the entire graph state, node config, and router functions are fully typed
  • Runtime-safe — built-in circuit breaking, rate limiting, retry, and per-execution state management

The scenario: an AI content pipeline

To make this concrete, we built a starter that produces publication-ready articles through four specialist agents:

ResearchAgent   searches the web, extracts key facts, gathers statistics
WriterAgent     creates outlines, writes full drafts, writes introductions
EditorAgent     improves clarity, reviews structure, adjusts tone per audience
SEOAgent        extracts keywords, generates title/meta, provides on-page audit

Each agent has three @Tool-decorated methods that the agent's planning LLM can invoke during its execution loop. The tools make real LLM calls to simulate specialised research or writing tasks — in production you would replace the LLM simulation with external APIs like Exa, Tavily, or SEMrush.

The same four agents are driven through three completely different orchestration patterns, so you can compare them side-by-side on the same task.


Pattern 1: AgentGraph — the deterministic pipeline

Concept

An AgentGraph is a compiled directed acyclic graph (DAG) where each node is either:

  • an agent ({ type: 'agent', agentName: '...' }) — runs a registered agent via the runtime
  • a function ({ type: 'function', fn: ... }) — runs arbitrary async logic
  • a parallel node ({ type: 'parallel', branches: [...] }) — fans out to multiple agents concurrently

Edges connect nodes. They can be unconditional (addEdge), conditional (addConditionalEdge), or implicit through parallel branches.

You call .compile() once to validate the topology — it throws immediately if you have dangling edges, missing nodes, or no entry point. The compiled graph is a reusable CompiledGraph object you can .execute() as many times as you want.
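To make those compile-time checks concrete, here is a minimal, self-contained sketch of the kind of validation .compile() performs. This is illustrative only, not the HazelJS implementation; validateTopology and its error messages are hypothetical names.

```typescript
// Illustrative sketch of DAG validation, not the actual HazelJS internals.
type Edge = { from: string; to: string };
const END = '__end__';

function validateTopology(
  nodes: Set<string>,
  edges: Edge[],
  entryPoint?: string,
): void {
  if (!entryPoint) throw new Error('Graph has no entry point');
  if (!nodes.has(entryPoint)) throw new Error(`Entry point "${entryPoint}" is not a node`);
  for (const { from, to } of edges) {
    // A dangling edge references a node ID that was never added via addNode()
    if (!nodes.has(from)) throw new Error(`Dangling edge: unknown source "${from}"`);
    if (to !== END && !nodes.has(to)) throw new Error(`Dangling edge: unknown target "${to}"`);
  }
}
```

Because validation is front-loaded, a topology bug fails at startup rather than mid-pipeline.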

Building the content pipeline

// content.service.ts
const graph = runtime
  .createGraph('content-pipeline')

  .addNode('research', {
    type: 'agent',
    agentName: 'ResearchAgent',
    inputMapper: (state) =>
      `Research this topic thoroughly for ${targetAudience}: ${state.input}`,
    outputMapper: (result, state) => ({
      output: result.response ?? '',
      data: { ...state.data, research: result.response ?? '' },
    }),
  })

  .addNode('write', {
    type: 'agent',
    agentName: 'WriterAgent',
    inputMapper: (state) =>
      `Write a ${tone} article for ${targetAudience} based on this research:\n\n${state.data.research}`,
    outputMapper: (result, state) => ({
      output: result.response ?? '',
      data: { ...state.data, draft: result.response ?? '' },
    }),
  })

  .addNode('edit', {
    type: 'agent',
    agentName: 'EditorAgent',
    inputMapper: (state) =>
      `Edit and improve this article for ${targetAudience}:\n\n${state.data.draft}`,
    outputMapper: (result, state) => ({
      output: result.response ?? '',
      data: { ...state.data, editedArticle: result.response ?? '' },
    }),
  })

  .addNode('seo', {
    type: 'agent',
    agentName: 'SEOAgent',
    inputMapper: (state) =>
      `Perform full SEO optimisation for "${topic}":\n\n${state.data.editedArticle}`,
    outputMapper: (result, state) => ({
      output: state.data.editedArticle as string, // article unchanged; SEO stored separately
      data: { ...state.data, seoAnalysis: result.response ?? '' },
    }),
  })

  .addEdge('research', 'write')
  .addEdge('write', 'edit')
  .addEdge('edit', 'seo')
  .addEdge('seo', END)
  .setEntryPoint('research')
  .compile();

const result = await graph.execute(topic, {
  initialData: { topic, targetAudience, tone },
});

A few things worth noting:

inputMapper transforms the shared GraphState into the string that the agent receives as its task. Without a mapper, the agent gets state.input — the original user input. With a mapper, each node gets exactly the context it needs — the writer gets the research, the editor gets the draft, the SEO agent gets the edited article.

outputMapper merges the agent's result back into GraphState. Here we store intermediate outputs in state.data (a plain Record<string, unknown>) so downstream nodes can reference them by key. This is the equivalent of a shared scratchpad that persists across the entire graph run.

END is a sentinel string constant ('__end__') that terminates the graph. Any edge targeting END stops the execution loop.

The GraphState that flows through nodes

interface GraphState {
  input: string;                                // original user input, never mutated
  output?: string;                              // most recent node output
  messages: GraphMessage[];                     // full message history across nodes
  data: Record<string, unknown>;               // shared scratchpad
  nodeResults: Record<string, AgentExecutionResult>; // full result per node ID
}

Every outputMapper receives the current state and returns a Partial<GraphState>. The graph engine deep-merges data and nodeResults, and replaces messages and output when provided.
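A minimal sketch of that merge, assuming the replace-output, merge-data semantics described above (messages and nodeResults are omitted for brevity, and mergeState is a hypothetical name, not HazelJS API):

```typescript
// Simplified GraphState for illustration only.
interface GraphState {
  input: string;
  output?: string;
  data: Record<string, unknown>;
}

// Merge a node's Partial<GraphState> back into the running state:
// `data` keys are merged, `output` is replaced only when provided.
function mergeState(state: GraphState, patch: Partial<GraphState>): GraphState {
  return {
    ...state,
    output: patch.output ?? state.output,
    data: { ...state.data, ...(patch.data ?? {}) },
  };
}
```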

Conditional routing

The graph also supports conditional edges — where the next node depends on the current state:

const graph = runtime
  .createGraph('smart-router')
  .addNode('classifier', {
    type: 'function',
    fn: async (state) => {
      // analyse the input and set a type in state.data
      const type = state.input.includes('edit') ? 'edit-only' : 'full-pipeline';
      return { data: { ...state.data, pipelineType: type } };
    },
  })
  .addNode('editor', { type: 'agent', agentName: 'EditorAgent' })
  .addNode('research', { type: 'agent', agentName: 'ResearchAgent' })
  .setEntryPoint('classifier')
  .addConditionalEdge('classifier', (state) =>
    state.data.pipelineType === 'edit-only' ? 'editor' : 'research'
  )
  .addEdge('editor', END)
  .addEdge('research', END)
  .compile();

addConditionalEdge accepts a router function (state: GraphState) => string. The function runs after the source node completes and returns the ID of the next node (or END to stop). You can also pass an optional routeMap to map short return values to node IDs:

.addConditionalEdge(
  'classifier',
  (state) => state.data.pipelineType as string,   // returns 'edit' or 'write'
  { edit: 'editor', write: 'research' }            // map to full node IDs
)

Parallel fan-out / fan-in

For tasks where two agents can run independently, use a parallel node:

.addNode('parallel-research', {
  type: 'parallel',
  branches: ['primary-researcher', 'stats-researcher'],
  // optional: custom merge strategy
  mergeStrategy: (results, base) => ({
    output: results.map(r => r.agentResult?.response).join('\n\n---\n\n'),
    data: { ...base.data, allResearch: results },
  }),
})

The graph engine fires all branches with Promise.all, waits for every one to complete (failed branches are captured with their error, not thrown), then merges the results using the default or custom mergeStrategy. The default merge concatenates all outputs separated by ---.
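The capture-rather-than-throw behaviour can be sketched in a few lines (runParallel and defaultMerge are illustrative names, not the library's API):

```typescript
type BranchResult =
  | { ok: true; output: string }
  | { ok: false; error: string };

// Fan out to every branch; a failed branch is captured with its error
// instead of rejecting the whole Promise.all.
async function runParallel(
  branches: Array<() => Promise<string>>,
): Promise<BranchResult[]> {
  return Promise.all(
    branches.map(async (run) => {
      try {
        return { ok: true as const, output: await run() };
      } catch (err) {
        return { ok: false as const, error: String(err) };
      }
    }),
  );
}

// Default merge: concatenate successful outputs separated by ---.
function defaultMerge(results: BranchResult[]): string {
  return results
    .filter((r): r is { ok: true; output: string } => r.ok)
    .map((r) => r.output)
    .join('\n\n---\n\n');
}
```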

Streaming

For long-running pipelines, you can stream progress to the client instead of waiting for the full result:

for await (const chunk of graph.stream(topic)) {
  if (chunk.done) {
    console.log('Pipeline complete:', chunk.nodeOutput);
  } else {
    console.log(`[${chunk.nodeId}] ${chunk.chunk}`);
  }
}

Each GraphStreamChunk is emitted as soon as a node finishes. Your HTTP layer can forward these via SSE or WebSocket, giving the user live progress updates.


Pattern 2: SupervisorAgent — emergent LLM routing

Concept

A SupervisorAgent is an LLM that acts as a router. At each "round" it receives:

  1. The original task
  2. A summary of all work completed so far (which worker ran, what it returned)

It then responds with a structured JSON decision:

{ "action": "delegate", "worker": "WriterAgent", "subtask": "Write a draft based on: ...", "thought": "Research is complete." }

or, when the task is done:

{ "action": "finish", "response": "Here is the final article: ..." }

The supervisor loop runs until action === 'finish' or maxRounds is reached.

This is fundamentally different from a graph: you don't define the workflow in code. The LLM decides. It can skip agents, call the same agent twice, backtrack, or short-circuit to a direct answer if it determines no worker is needed.
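Stripped of the LLM specifics, the loop is small. In this sketch, superviseLoop, callLLM, and runWorker are hypothetical stand-ins for the routing call and runtime.execute(), not HazelJS API:

```typescript
type Decision =
  | { action: 'delegate'; worker: string; subtask: string }
  | { action: 'finish'; response: string };

// Illustrative supervisor loop: ask the router LLM for a decision,
// execute the chosen worker, append the result to the history, repeat.
async function superviseLoop(
  task: string,
  callLLM: (task: string, history: string[]) => Promise<Decision>,
  runWorker: (worker: string, subtask: string) => Promise<string>,
  maxRounds = 8,
): Promise<{ response: string; rounds: number }> {
  const history: string[] = [];
  for (let round = 1; round <= maxRounds; round++) {
    const decision = await callLLM(task, history);
    if (decision.action === 'finish') {
      return { response: decision.response, rounds: round - 1 };
    }
    const result = await runWorker(decision.worker, decision.subtask);
    history.push(`Round ${round} — Worker: ${decision.worker}\nResult: ${result}`);
  }
  // Safety valve: return whatever was accumulated when maxRounds is hit.
  return { response: history.join('\n'), rounds: maxRounds };
}
```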

Creating the supervisor

// content.service.ts
const supervisor = runtime.createSupervisor({
  name: 'ContentDirector',
  workers: ['ResearchAgent', 'WriterAgent', 'EditorAgent', 'SEOAgent'],
  systemPrompt: `You are a content director managing a team of AI specialists.
Your goal is to produce high-quality, SEO-optimised articles.

Workflow:
1. Always start with ResearchAgent unless the task is editing/SEO only.
2. Pass research findings to WriterAgent.
3. Pass drafts to EditorAgent.
4. Run SEOAgent on the final edited article.
5. When all steps are complete, return the full article + SEO metadata.`,
  maxRounds: 8,
  temperature: 0,   // deterministic routing decisions
});

const result = await supervisor.run(task);
console.log(result.response);   // final article
console.log(result.rounds);     // how many delegation rounds it took

The routing prompt

Under the hood, createSupervisor builds a system prompt that includes the worker list (with descriptions pulled from each agent's @Agent metadata) and injects it into every routing call. The user message at each round looks like this:

Original task: Write a blog post about TypeScript 5.5

Work completed so far:
Round 1 — Worker: ResearchAgent
Subtask: Research TypeScript 5.5 features for developers
Result: Key findings: 1. Inferred type predicates... 2. New Set methods...

Decide the next action. Respond with ONLY a JSON object...

This gives the supervisor full context of what has been done, so it can make informed routing decisions even when the task evolves.

When the supervisor shines

The supervisor is most valuable for open-ended tasks where the ideal workflow is not known upfront:

  • "Edit only the second half of this article, then optimise for SEO" — the supervisor skips ResearchAgent and WriterAgent entirely
  • "Write three variations of the introduction and pick the best one" — the supervisor calls WriterAgent three times, then EditorAgent to compare
  • "Is this article SEO-ready, or does it need a full rewrite?" — the supervisor might call SEOAgent first, then WriterAgent or conclude it's fine

These workflows would require separate graph definitions. With the supervisor, the LLM figures them out from context.

Worker descriptions matter

The supervisor uses each worker's description field (from the @Agent decorator) to decide which one to call. Clear, specific descriptions dramatically improve routing quality:

@Agent({
  name: 'SEOAgent',
  description:
    'Analyses articles for SEO performance and returns optimised keywords, ' +
    'meta descriptions, title tags, and on-page recommendations.',
  // ...
})

Vague descriptions like "An AI agent" will confuse the supervisor. Treat the description as a job posting — it tells the LLM exactly when to hire this specialist.


Pattern 3: @Delegate — agent-as-a-tool

Concept

@Delegate is the most TypeScript-native of the three patterns. You decorate methods on an agent class to mark them as delegation points to other agents. The method body is a stub — it is replaced with a real runtime.execute(targetAgent, input) call when runtime.registerAgentInstance() runs.

From the LLM's perspective, @Delegate methods appear as ordinary @Tool entries. The LLM has no idea it is calling another autonomous agent — it just sees a tool with a name, description, and parameters. The runtime handles the agent-to-agent dispatch transparently.

The OrchestratorAgent

// agents/orchestrator.agent.ts
@Agent({
  name: 'OrchestratorAgent',
  description: 'Manages the full content creation workflow.',
  systemPrompt: `You are a content production manager with access to a team of specialists.
Your team:
  - ResearchAgent: Gathers facts, statistics, and web information
  - WriterAgent: Writes article drafts from research briefs
  - EditorAgent: Reviews and improves written drafts
  - SEOAgent: Optimises content for search engines

Always complete all four steps in order: Research → Write → Edit → SEO.`,
  maxSteps: 10,
  temperature: 0.3,
})
export class OrchestratorAgent {

  @Delegate({
    agent: 'ResearchAgent',
    description: 'Research a topic in depth. Returns a research brief with facts and statistics.',
    inputField: 'query',
  })
  async research(_query: string): Promise<string> {
    return ''; // stub — replaced by runtime.registerAgentInstance()
  }

  @Delegate({
    agent: 'WriterAgent',
    description: 'Write an article draft based on a research brief.',
    inputField: 'brief',
  })
  async write(_brief: string): Promise<string> {
    return ''; // stub
  }

  @Delegate({
    agent: 'EditorAgent',
    description: 'Edit and improve an article draft.',
    inputField: 'draft',
  })
  async edit(_draft: string): Promise<string> {
    return ''; // stub
  }

  @Delegate({
    agent: 'SEOAgent',
    description: 'Optimise an article for search engines.',
    inputField: 'article',
  })
  async optimizeSEO(_article: string): Promise<string> {
    return ''; // stub
  }
}

How the patching works

When runtime.registerAgentInstance('OrchestratorAgent', instance) is called, the runtime:

  1. Scans the instance prototype for @Delegate metadata (stored by the decorator via Reflect.defineMetadata)
  2. For each decorated method, reads { agent: targetName, inputField: 'query' }
  3. Replaces instance[methodName] with:
   async (args: Record<string, unknown> | string) => {
     const input = typeof args === 'string' ? args : args[inputField];
     const result = await runtime.execute(targetName, input as string);
     return result.response ?? '';
   };
  4. The @Tool registration (also applied by @Delegate) picks up the patched method, so the LLM sees it as a callable tool

The original stub body is thrown away. The decorator composition is:

@Delegate  →  stores delegate metadata  +  calls @Tool internally
@Tool      →  registers the method in TOOLS_LIST_KEY on the class

registerAgentInstance() reads both sets of metadata and performs the patch before ToolRegistry.registerAgentTools() reads the final (patched) method references.
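A simplified, dependency-free sketch of that patch step (patchDelegates and DelegateMeta are hypothetical; the real runtime reads Reflect metadata rather than a plain array):

```typescript
type DelegateMeta = { method: string; agent: string; inputField: string };

// Replace each decorated stub with a closure that dispatches to the
// target agent via the runtime's execute function.
function patchDelegates(
  instance: Record<string, unknown>,
  metas: DelegateMeta[],
  execute: (agent: string, input: string) => Promise<string>,
): void {
  for (const { method, agent, inputField } of metas) {
    instance[method] = async (args: Record<string, unknown> | string) => {
      // The LLM passes a tool-call args object; a plain string also works.
      const input = typeof args === 'string' ? args : String(args[inputField]);
      return execute(agent, input);
    };
  }
}
```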

What the LLM actually sees

When the OrchestratorAgent's planning loop runs, it gets a tool list like:

[
  {
    "type": "function",
    "function": {
      "name": "ResearchAgent",
      "description": "Research a topic in depth. Returns a research brief with facts and statistics.",
      "parameters": {
        "type": "object",
        "properties": { "query": { "type": "string", "description": "Input for ResearchAgent" } },
        "required": ["query"]
      }
    }
  },
  { ... WriterAgent ... },
  { ... EditorAgent ... },
  { ... SEOAgent ... }
]

The LLM calls ResearchAgent({ "query": "multi-agent AI" }) → the runtime intercepts → fires runtime.execute('ResearchAgent', 'multi-agent AI') → returns the full research brief as a string → the LLM adds it to its context and calls WriterAgent next.

@Delegate vs @Tool — when to use each

Use @Tool when the logic lives inside the agent class itself:

@Tool({ description: 'Calculate word count' })
async countWords({ text }: { text: string }): Promise<number> {
  return text.split(/\s+/).length;
}

Use @Delegate when the logic is owned by another registered agent:

@Delegate({ agent: 'TranslatorAgent', description: 'Translate text to another language' })
async translate(_text: string): Promise<string> {
  return ''; // TranslatorAgent handles it
}

The practical difference: @Tool scales with the agent's own complexity; @Delegate scales with the target agent's full agentic loop including its own tools, memory, and RAG context.


Wiring it all together: the agents module

All three patterns are driven by the same AgentRuntime instance. The AgentRuntimeService sets it up once and exports it for injection:

// agents/agents.module.ts
@Service()
export class AgentRuntimeService {
  private readonly runtime: AgentRuntime;

  constructor() {
    const llmProvider = new OpenAILLMProvider({
      apiKey: process.env.OPENAI_API_KEY,
      model: process.env.AGENT_MODEL ?? 'gpt-4o-mini',
    });

    this.runtime = new AgentRuntime({
      llmProvider,
      defaultMaxSteps: 10,
      defaultTimeout: 300_000,
      enableMetrics: true,
      enableRetry: true,
      enableCircuitBreaker: true,
      rateLimitPerMinute: 30,
    });

    this.registerAgents();
  }

  private registerAgents(): void {
    // Step 1: register class metadata
    this.runtime.registerAgent(ResearchAgent);
    this.runtime.registerAgent(WriterAgent);
    this.runtime.registerAgent(EditorAgent);
    this.runtime.registerAgent(SEOAgent);
    this.runtime.registerAgent(OrchestratorAgent);

    // Step 2: register instances with their dependencies
    this.runtime.registerAgentInstance('ResearchAgent',    new ResearchAgent());
    this.runtime.registerAgentInstance('WriterAgent',      new WriterAgent());
    this.runtime.registerAgentInstance('EditorAgent',      new EditorAgent());
    this.runtime.registerAgentInstance('SEOAgent',         new SEOAgent());

    // Step 3: OrchestratorAgent — @Delegate methods are patched automatically here
    this.runtime.registerAgentInstance('OrchestratorAgent', new OrchestratorAgent());
  }

  getRuntime(): AgentRuntime { return this.runtime; }
}

@HazelModule({
  providers: [AgentRuntimeService],
  exports: [AgentRuntimeService],
})
export class AgentsModule {}

The ContentService injects AgentRuntimeService and calls getRuntime() to create graphs and supervisors on demand:

@Service()
export class ContentService {
  constructor(private readonly agentRuntime: AgentRuntimeService) {}

  async runGraphPipeline(dto: GenerateContentDto) {
    const runtime = this.agentRuntime.getRuntime();
    const graph = runtime.createGraph('content-pipeline')
      // ... nodes and edges ...
      .compile();
    return graph.execute(dto.topic);
  }

  async runSupervisor(task: string) {
    const supervisor = this.agentRuntime.getRuntime().createSupervisor({ ... });
    return supervisor.run(task);
  }

  async runOrchestrator(task: string) {
    return this.agentRuntime.getRuntime().execute('OrchestratorAgent', task);
  }
}

The LLM provider bridge

@hazeljs/agent is provider-agnostic. The AgentRuntime accepts any object that implements the LLMProvider interface:

interface LLMProvider {
  chat(request: LLMChatRequest): Promise<LLMChatResponse>;
  isAvailable?(): Promise<boolean>;
  getSupportedModels?(): string[];
}

LLMChatRequest mirrors the OpenAI chat completions format — it has messages, tools, temperature, and maxTokens. LLMChatResponse has content, tool_calls, usage, and finishReason.

The starter includes a thin OpenAILLMProvider adapter that maps between the two:

// llm/openai-llm.provider.ts
export class OpenAILLMProvider implements LLMProvider {
  private readonly client: OpenAI;
  private readonly model: string;

  constructor(options: OpenAILLMProviderOptions = {}) {
    this.client = new OpenAI({
      apiKey: options.apiKey ?? process.env.OPENAI_API_KEY,
      baseURL: options.baseURL,   // set this for Ollama or any compatible endpoint
    });
    this.model = options.model ?? 'gpt-4o-mini';
  }

  async chat(request: LLMChatRequest): Promise<LLMChatResponse> {
    const response = await this.client.chat.completions.create({
      model: request.model ?? this.model,
      messages: request.messages as OpenAI.Chat.ChatCompletionMessageParam[],
      tools: request.tools as OpenAI.Chat.ChatCompletionTool[] | undefined,
      temperature: request.temperature ?? 0.2,
      max_tokens: request.maxTokens,
    });

    const choice = response.choices[0];
    return {
      content: choice.message.content ?? '',
      tool_calls: choice.message.tool_calls?.map(tc => ({
        id: tc.id,
        type: 'function',
        function: { name: tc.function.name, arguments: tc.function.arguments },
      })),
      usage: response.usage && {
        promptTokens: response.usage.prompt_tokens,
        completionTokens: response.usage.completion_tokens,
        totalTokens: response.usage.total_tokens,
      },
    };
  }
}

Switching to Ollama, Anthropic, or Gemini requires only changing the baseURL / apiKey pair or implementing a new adapter — zero changes to agents, graphs, or supervisors.


Production features baked in

Every AgentRuntime comes with four production concerns handled out of the box.

Circuit breaking

After five consecutive agent execution failures, the circuit opens and all new requests fail fast for 30 seconds. This prevents cascading failures when an upstream LLM provider has an outage:

CLOSED → (5 failures) → OPEN → (30s timeout) → HALF_OPEN → (2 successes) → CLOSED

console.log(runtime.getCircuitBreakerStatus());
// { enabled: true, state: 'CLOSED', failureCount: 0, successCount: 0 }
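A self-contained sketch of a breaker with those thresholds (illustrative only; the runtime's actual implementation is internal, and the injectable clock here exists purely for testability):

```typescript
type CBState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

// Minimal circuit breaker: 5 failures open it, a 30s cooldown moves it
// to HALF_OPEN, and 2 probe successes close it again.
class CircuitBreaker {
  private state: CBState = 'CLOSED';
  private failures = 0;
  private successes = 0;
  private openedAt = 0;

  constructor(
    private readonly failureThreshold = 5,
    private readonly cooldownMs = 30_000,
    private readonly successThreshold = 2,
    private readonly now: () => number = Date.now,
  ) {}

  canExecute(): boolean {
    if (this.state === 'OPEN' && this.now() - this.openedAt >= this.cooldownMs) {
      this.state = 'HALF_OPEN';
      this.successes = 0;
    }
    return this.state !== 'OPEN'; // OPEN means fail fast
  }

  recordSuccess(): void {
    if (this.state === 'HALF_OPEN' && ++this.successes >= this.successThreshold) {
      this.state = 'CLOSED';
      this.failures = 0;
    }
  }

  recordFailure(): void {
    // Any failure while HALF_OPEN, or the Nth consecutive failure, opens it.
    if (this.state === 'HALF_OPEN' || ++this.failures >= this.failureThreshold) {
      this.state = 'OPEN';
      this.openedAt = this.now();
    }
  }

  getState(): CBState { return this.state; }
}
```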

Rate limiting

The token bucket algorithm enforces a maximum number of agent executions per minute. When the bucket is empty, new requests block until a token is available (up to a configurable timeout):

new AgentRuntime({
  rateLimitPerMinute: 30,  // max 30 agent executions per minute
})
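A sketch of such a bucket with simple elapsed-time refill (TokenBucket here is an illustrative class, not the runtime's internal one, and it returns false instead of blocking):

```typescript
// Token bucket: starts full, refills continuously at ratePerMinute,
// never exceeds the bucket size. Injectable clock for testing.
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private readonly ratePerMinute: number,
    private readonly now: () => number = Date.now,
  ) {
    this.tokens = ratePerMinute;
    this.lastRefill = this.now();
  }

  tryAcquire(): boolean {
    const elapsed = this.now() - this.lastRefill;
    const refill = (elapsed / 60_000) * this.ratePerMinute;
    this.tokens = Math.min(this.ratePerMinute, this.tokens + refill);
    this.lastRefill = this.now();
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false; // the real runtime would block here until a token frees up
  }
}
```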

Retry with exponential backoff

Transient LLM API errors (rate limits, timeouts, 5xx responses) are automatically retried up to 3 times with exponential backoff:

Attempt 1 → fail → wait 1s
Attempt 2 → fail → wait 2s
Attempt 3 → fail → wait 4s
Throw error
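The schedule above is just base × 2^(attempt − 1), which can be computed as a one-liner (backoffDelays is an illustrative helper, not library API):

```typescript
// Exponential backoff schedule: 1s, 2s, 4s, ... for attempts 1..maxAttempts.
function backoffDelays(maxAttempts: number, baseMs = 1_000): number[] {
  return Array.from({ length: maxAttempts }, (_, i) => baseMs * 2 ** i);
}
```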

Metrics

Every execution is tracked in memory with P50/P95/P99 latency percentiles, tool call counts, token usage, and success rates:

GET /health/agents
# {
#   "metrics": {
#     "totalExecutions": 42,
#     "successRate": "97.6%",
#     "averageLatency": "4210ms",
#     "p95Latency": "18900ms"
#   }
# }

Scaling with BullMQ + Redis: async pipelines that survive restarts

The three patterns above are all synchronous — the HTTP connection stays open while every agent runs, which can take 30–120 seconds for a full Research → Write → Edit → SEO chain. That is fine for internal tools or demos, but in production you want:

  • The client to get a response immediately while the pipeline runs in the background
  • Durability — if the server restarts mid-pipeline, the job resumes from where it stopped
  • Horizontal scaling — add more Node.js processes and each one picks up queued jobs independently
  • Live progress — the client knows which stage just finished without polling

This is exactly what the async layer in the starter adds, using BullMQ for job queues and Redis Pub/Sub for real-time events.

The architecture

POST /pipeline/async
       │
       ▼ enqueue
  ┌─────────────┐     ┌──────────────┐     ┌─────────────┐     ┌──────────────┐
  │agent:research│────►│  agent:write │────►│  agent:edit │────►│  agent:seo   │
  │  concur: 3  │     │  concur: 5   │     │  concur: 3  │     │  concur: 3   │
  └──────┬──────┘     └──────┬───────┘     └──────┬──────┘     └──────┬───────┘
         │ PUBLISH           │ PUBLISH             │ PUBLISH           │ SET result
         │                   │                     │                   │ PUBLISH final
         ▼                   ▼                     ▼                   ▼
              Redis Pub/Sub channel: pipeline:{executionId}
                                   │
                       GET /stream/:executionId
                        (SSE — browser EventSource)

Each queue is a separate BullMQ queue. Workers process jobs independently and in parallel. A single process can handle up to 14 concurrent agent executions across all four queues (3 + 5 + 3 + 3), each of which is itself a non-blocking async call to the LLM provider.

Enqueueing a job

The client fires a single POST and gets back tracking URLs immediately — before a single LLM token is generated:

curl -X POST http://localhost:3000/api/content/pipeline/async \
  -H "Content-Type: application/json" \
  -d '{"topic": "Why TypeScript is the future of backend development"}'

# Response (< 5ms):
{
  "executionId": "graph_550e8400-e29b-41d4-a716-446655440000",
  "statusUrl": "/api/content/status/graph_550e8400...",
  "streamUrl": "/api/content/stream/graph_550e8400..."
}

Inside ContentService, the implementation is two lines:

async runGraphPipelineAsync(dto: GenerateContentDto): Promise<AsyncJobResponse> {
  const executionId = `graph_${randomUUID()}`;
  return this.graphProducer.enqueue(dto, executionId);
}

GraphProducerService.enqueue() adds the first job to agent:research and sets a pipeline:status:{executionId} key in Redis to "pending". The HTTP response is sent immediately.

The worker chain

Each worker follows the same pattern: receive the accumulated PipelineJobData, run the agent, merge the result into state, publish a progress event, and enqueue the next job. Here is the research worker in full:

// src/queue/workers/research.worker.ts
private async process(job: Job<PipelineJobData>): Promise<void> {
  const { executionId, state } = job.data;

  await this.graphProducer.setStatus(executionId, 'running');
  await this.eventsService.publish(executionId, {
    executionId, nodeId: 'research', status: 'started',
    timestamp: new Date().toISOString(),
  });

  const input = `Research this topic thoroughly for a ${state.targetAudience} audience: ${state.topic}`;
  const result = await runtime.execute('ResearchAgent', input);

  await this.eventsService.publish(executionId, {
    executionId, nodeId: 'research', status: 'completed',
    timestamp: new Date().toISOString(),
  });

  // Forward enriched state to the next queue
  await this.writeQueue.add(`pipeline:${executionId}:write`, {
    ...job.data,
    state: { ...state, research: result.response ?? '' },
  }, { attempts: 3, backoff: { type: 'exponential', delay: 1000 } });
}

The state object (PipelineJobData.state) is the BullMQ equivalent of GraphState.data. Each worker reads the fields it needs and writes its own output before forwarding:

Queue            Reads from state          Writes to state
agent:research   topic, targetAudience     research
agent:write      research, tone            draft
agent:edit       draft, tone               editedDraft
agent:seo        editedDraft, topic        (final result stored in Redis)

The SEO worker is the terminal node. Instead of enqueueing to a next queue, it assembles the full PipelineResult object, persists it to pipeline:result:{executionId} with a 24-hour TTL, marks the status as "completed", and publishes a final event with final: true.

Streaming live progress with SSE

GET /api/content/stream/:executionId opens a Server-Sent Events connection. The controller subscribes to the Redis Pub/Sub channel for that execution and forwards every event as an SSE data frame:

// src/events/sse.controller.ts
@Get('/stream/:executionId')
streamExecution(
  @Param('executionId') executionId: string,
  @Req() req: IncomingMessage,
  @Res() hazelRes: any,
): void {
  const rawRes: ServerResponse = hazelRes?.res?.res ?? hazelRes;

  rawRes.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
  });
  rawRes.write(`: connected to pipeline ${executionId}\n\n`);

  const unsubscribe = this.eventsService.subscribe(executionId, (event) => {
    rawRes.write(`data: ${JSON.stringify(event)}\n\n`);
    if (event.final) cleanup();
  });

  const cleanup = () => { unsubscribe(); rawRes.end(); };
  req.on('close', cleanup);
}

From a browser:

const src = new EventSource('/api/content/stream/graph_abc123');

src.onmessage = (e) => {
  const event = JSON.parse(e.data);
  // { executionId, nodeId: 'research', status: 'completed', timestamp: '...' }
  console.log(`[${event.nodeId}] ${event.status}`);

  if (event.final) {
    src.close();
    // fetch /api/content/status/:id for the full result
  }
};

A keep-alive comment is written every 15 seconds (": heartbeat\n\n") to prevent proxy and load-balancer idle timeouts.
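For reference, the exact bytes written per event follow the SSE wire format: a "data:" line terminated by a blank line, with lines starting with ":" treated as comments (these two helpers are illustrative, not part of the starter):

```typescript
// One SSE data frame: `data: <json>` followed by a blank line.
function sseData(event: object): string {
  return `data: ${JSON.stringify(event)}\n\n`;
}

// Comment frame: ignored by EventSource, keeps proxies from timing out.
function sseHeartbeat(): string {
  return ': heartbeat\n\n';
}
```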

Polling for the result

Once the SSE stream emits final: true, the client fetches the assembled result:

curl http://localhost:3000/api/content/status/graph_abc123

# { "status": "completed", "result": { "article": "...", "seo": {...}, "steps": [...] } }
# or, if still running:
# { "status": "running" }
# or, on failure:
# { "status": "failed", "error": "ResearchAgent timed out after 300000ms" }

Retry and dead-letter

Every queue.add() call includes:

{ attempts: 3, backoff: { type: 'exponential', delay: 1000 } }
// 1st retry after 1s, 2nd after 2s, 3rd after 4s
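Those delays follow BullMQ's exponential backoff formula: the base delay doubled once per prior attempt. A quick check:

```typescript
// Exponential backoff as BullMQ computes it: baseMs * 2^(attemptsMade - 1).
function backoffDelay(baseMs: number, attemptsMade: number): number {
  return baseMs * 2 ** (attemptsMade - 1);
}

console.log([1, 2, 3].map((n) => backoffDelay(1000, n))); // → [ 1000, 2000, 4000 ]
```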

If all three attempts fail, BullMQ moves the job to the queue's built-in failed set — a permanent record you can inspect, replay, or alert on. The pipeline status is set to "failed" and the error message is stored in Redis for the polling endpoint.

Concurrency tuning

The concurrency of each worker is driven by env vars, so you can tune without a code change:

# .env
RESEARCH_CONCURRENCY=3   # 3 concurrent ResearchAgent calls (3 LLM calls/job)
WRITE_CONCURRENCY=5      # 5 concurrent WriterAgent calls (longest context, most tokens)
EDIT_CONCURRENCY=3
SEO_CONCURRENCY=3

The write stage gets the highest concurrency because it is typically the token bottleneck — the WriterAgent produces the most output and waits the longest for the LLM. Tune based on your OpenAI rate limits and observed P95 latency.
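Reading those vars defensively only takes a few lines. A sketch of how a worker might resolve its concurrency (the helper is hypothetical; the starter's actual parsing may differ):

```typescript
// Resolve a worker's concurrency from an env var, falling back to a sane default.
function concurrencyFor(envVar: string, fallback = 3): number {
  const parsed = Number.parseInt(process.env[envVar] ?? '', 10);
  // Reject missing, non-numeric, zero, and negative values.
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

process.env.WRITE_CONCURRENCY = '5';
console.log(concurrencyFor('WRITE_CONCURRENCY')); // → 5
console.log(concurrencyFor('UNSET_CONCURRENCY')); // → 3
```

The resolved value would then be passed as the `concurrency` option when constructing the BullMQ `Worker`.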

Horizontal scaling

Because job state is entirely in Redis, you can run any number of Node.js processes and they all share the same queues:

# Run 4 worker processes on one machine via PM2
pm2 start dist/main.js -i 4

# Or deploy separate Docker containers — all pointing at the same Redis
docker run -e REDIS_URL=redis://redis:6379 hazeljs-starter:latest
docker run -e REDIS_URL=redis://redis:6379 hazeljs-starter:latest
docker run -e REDIS_URL=redis://redis:6379 hazeljs-starter:latest

BullMQ uses distributed locking to ensure a job is processed by exactly one worker. Each process that starts up will create its own Worker instances and compete for jobs from the shared queues.

The module wiring

All queue infrastructure lives in QueueModule, keeping the app module clean:

// src/queue/queue.module.ts
@Service()
export class WorkerOrchestratorService {
  constructor(
    private readonly agentRuntime: AgentRuntimeService,
    private readonly eventsService: EventsService,
    private readonly graphProducer: GraphProducerService,
    private readonly researchWorker: ResearchWorkerService,
    // ... write, edit, seo workers
  ) {}

  async onModuleInit(): Promise<void> {
    // init() passes dependencies through — workers don't use DI constructors
    // because BullMQ Workers need to be created after all deps resolve
    this.researchWorker.init(this.agentRuntime, this.eventsService, this.graphProducer);
    this.writeWorker.init(this.agentRuntime, this.eventsService, this.graphProducer);
    this.editWorker.init(this.agentRuntime, this.eventsService, this.graphProducer);
    this.seoWorker.init(this.agentRuntime, this.eventsService, this.graphProducer);
  }
}

onModuleInit is the correct lifecycle hook to start workers — by the time it fires, all @Service providers in the container are resolved, so AgentRuntimeService (which builds the AgentRuntime and registers all agents) is guaranteed to be ready.
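The pattern is easy to miss on first read: the workers are plain services whose dependencies arrive via `init()` rather than the constructor. A stripped-down sketch of the idea (the types and names here are mine, not the package's):

```typescript
// Minimal late-init sketch: construct empty, wire deps in init(), fail loudly if skipped.
interface RuntimeLike {
  run(agentName: string, input: string): Promise<string>;
}

class ResearchWorkerSketch {
  private runtime?: RuntimeLike;

  init(runtime: RuntimeLike): void {
    // Called from onModuleInit, after the DI container has resolved everything.
    this.runtime = runtime;
  }

  async process(topic: string): Promise<string> {
    if (!this.runtime) throw new Error('init() must be called before process()');
    return this.runtime.run('ResearchAgent', topic);
  }
}

const worker = new ResearchWorkerSketch();
worker.init({ run: async (agent, input) => `${agent} handled "${input}"` });
worker.process('TypeScript').then(console.log); // → ResearchAgent handled "TypeScript"
```

The explicit guard in `process()` turns a forgotten `init()` call into an immediate, descriptive error instead of a silent undefined dereference deep inside a job handler.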

Synchronous vs async — when to use each

| | Synchronous (`POST /pipeline`) | Async (`POST /pipeline/async`) |
| --- | --- | --- |
| Response time | 30–120 s (blocks HTTP) | < 5 ms (immediate) |
| Durability | Lost on server restart | Persisted in Redis |
| Scaling | Single process | Any number of processes |
| Progress | None / polling | SSE real-time stream |
| Use when | Dev / demos / short pipelines | Production / long-running jobs |

The synchronous endpoints are not removed — they are ideal for development, integration tests, and pipelines that complete quickly. The async layer is additive, not a replacement.


Choosing the right pattern

The three patterns exist on a spectrum from maximum predictability to maximum flexibility:

AgentGraph ─────────────────────────────────────────── @Delegate ─── SupervisorAgent
 (you control                                          (LLM controls    (LLM controls
  the workflow)                                         one agent)      everything)

In practice:

Use AgentGraph when:

  • The workflow is fixed — every request goes through the same steps
  • You need reproducible results and predictable cost
  • You want streaming progress updates during a long pipeline
  • You're building a CI-style workflow (lint → test → deploy)

Use SupervisorAgent when:

  • The task is open-ended and the best workflow depends on the input
  • You want the AI to adapt (skip steps, retry, handle edge cases)
  • You're building a general-purpose assistant or chatbot back-end
  • Routing cost (one extra LLM call per round) is acceptable

Use @Delegate when:

  • One agent logically "owns" the end-to-end workflow
  • You want a single system prompt to control sequencing
  • The workflow is semi-structured but the orchestrator needs to reason about it
  • You want agent hierarchies — orchestrators calling orchestrators

All three can be mixed. A real production system might use an AgentGraph for the "happy path" article pipeline, a SupervisorAgent for the /ask endpoint, and an OrchestratorAgent as a fallback when the graph's classifier cannot determine the route.


The complete API surface

Once the starter is running, these are all nine content endpoints:

── Synchronous (in-process) ──────────────────────────────────────────────────

POST /api/content/pipeline
  Body: { topic, targetAudience?, tone? }
  → AgentGraph — Research → Write → Edit → SEO, always in order
  → Returns full article + step-by-step breakdown + SEO metadata

POST /api/content/supervisor
  Body: { task }
  → SupervisorAgent — LLM decides which agents to call
  → Returns final response + round count + duration

POST /api/content/orchestrator
  Body: { task }
  → OrchestratorAgent — delegates via @Delegate methods
  → Returns response + duration

POST /api/content/generate
  Body: { topic, mode: 'graph'|'supervisor'|'orchestrator', targetAudience?, tone? }
  → Unified endpoint — picks the right pattern based on mode

POST /api/content/ask
  Body: { task }
  → Supervisor for any free-form content task

GET  /api/content/agents
  → Lists all five registered agents by name

── Async (BullMQ + Redis) ────────────────────────────────────────────────────

POST /api/content/pipeline/async
  Body: { topic, targetAudience?, tone? }
  → Enqueues a pipeline job, returns immediately
  → Response: { executionId, statusUrl, streamUrl }

GET  /api/content/stream/:executionId
  → Server-Sent Events stream of pipeline progress
  → Events: { nodeId, status: 'started'|'completed'|'failed', final? }
  → Stream closes when final: true

GET  /api/content/status/:executionId
  → Polls job status: pending | running | completed | failed
  → Response includes full result object once completed
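The stream events in that listing can be typed once and shared between server and client. A hypothetical shape, inferred from the sample payloads in this post rather than taken from the package:

```typescript
// Hypothetical event type, inferred from the sample payloads shown in this post.
interface PipelineEvent {
  executionId: string;
  nodeId: 'research' | 'write' | 'edit' | 'seo';
  status: 'started' | 'completed' | 'failed';
  timestamp?: string;
  final?: boolean;
  error?: string;
}

// Narrowing guard for raw JSON parsed off the SSE stream.
function isPipelineEvent(value: unknown): value is PipelineEvent {
  const v = value as Record<string, unknown>;
  return (
    typeof v?.executionId === 'string' &&
    typeof v?.nodeId === 'string' &&
    ['started', 'completed', 'failed'].includes(v?.status as string)
  );
}

const raw = JSON.parse('{"executionId":"graph_abc123","nodeId":"research","status":"started"}');
console.log(isPipelineEvent(raw)); // → true
```

Running the guard on every parsed frame keeps a malformed or unrelated message from crashing the `onmessage` handler.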

What comes next

This release ships the full orchestration + async stack. Here is what is coming next in @hazeljs/agent:

Agent checkpointing and replay — save the full AgentContext at any step and replay from that point for debugging or resuming failed executions. This will integrate with the BullMQ layer so long-running jobs survive server restarts without re-running completed nodes.

Bull Board UI — a one-import dashboard that shows all queue depths, in-progress jobs, failed jobs, and retry history. Invaluable for debugging production async pipelines.

Sub-agent spawning — let @Tool methods dynamically spin up new agent instances at runtime with runtime.spawn(), enabling truly recursive multi-agent hierarchies.

OpenTelemetry integration — distributed traces across all agent executions, tool calls, and LLM requests, exportable to Langfuse, Jaeger, or any OTEL backend.

Typed tool schemas with Zod — replace the current ToolParameter[] config with Zod schemas for auto-validated, type-safe tool inputs.

GraphRAG node type — a built-in { type: 'rag' } node that queries a @hazeljs/rag knowledge base and injects context into the graph state before the next agent runs.

Kafka transport — for teams that need durability guarantees, cross-service event sourcing, and consumer groups with independent replay, a Kafka transport plugin that replaces BullMQ with zero changes to agent or worker code.


Getting started

1. Clone and install

git clone https://github.com/hazeljs/hazeljs
cd hazeljs-ai-multiagent-starter
cp .env.example .env      # set OPENAI_API_KEY
npm install

2. Start Redis (for the async pipeline)

# Docker (fastest)
docker run -d -p 6379:6379 --name hazel-redis redis:7-alpine

# or Homebrew on macOS
brew services start redis

If you only want the synchronous endpoints, Redis is optional — the app starts fine without it. The async endpoints will return a connection error, but every other route works normally.

3. Run the server

npm run dev

4. Try the synchronous pipeline

curl -X POST http://localhost:3000/api/content/pipeline \
  -H "Content-Type: application/json" \
  -d '{
    "topic": "Why TypeScript is the future of backend development",
    "targetAudience": "engineering managers",
    "tone": "authoritative"
  }'

5. Try the async pipeline with SSE

In one terminal, open the SSE stream before the job starts:

# Watch live progress events
curl -N http://localhost:3000/api/content/stream/graph_REPLACE_WITH_ID

In another terminal, fire the async job:

curl -X POST http://localhost:3000/api/content/pipeline/async \
  -H "Content-Type: application/json" \
  -d '{"topic": "Why TypeScript is the future of backend development"}'

# → { "executionId": "graph_abc123", "statusUrl": "...", "streamUrl": "..." }

Paste the executionId into the SSE curl command. You'll see events stream in real time:

data: {"executionId":"graph_abc123","nodeId":"research","status":"started","timestamp":"..."}
data: {"executionId":"graph_abc123","nodeId":"research","status":"completed","timestamp":"..."}
data: {"executionId":"graph_abc123","nodeId":"write","status":"started","timestamp":"..."}
...
data: {"executionId":"graph_abc123","nodeId":"seo","status":"completed","final":true,"timestamp":"..."}

Then fetch the full result:

curl http://localhost:3000/api/content/status/graph_abc123
# { "status": "completed", "result": { "article": "...", "seo": {...}, "steps": [...] } }

Source layout

hazeljs-ai-multiagent-starter/src/
├── agents/          # @Agent classes + AgentRuntimeService
├── content/         # ContentController + ContentService + DTOs
├── events/          # EventsService (Pub/Sub) + SSEController
├── health/          # HealthController
├── llm/             # OpenAILLMProvider
├── queue/
│   ├── workers/     # research, write, edit, seo workers
│   ├── graph.producer.ts
│   ├── queue.config.ts
│   ├── queue.module.ts
│   ├── queue.names.ts
│   └── queue.types.ts
└── main.ts

The @hazeljs/agent package source (AgentGraph, SupervisorAgent, @Delegate) is in hazeljs/packages/agent/src/.

If you build something with it, share it in the HazelJS community — we'd love to see what multi-agent workflows people come up with.


Published by the HazelJS team · March 2026
