Tarun N

I Built an LLM Orchestration API That Actually Handles Human-in-Loop — Here's How

The part that took the longest to get right, and the two architectural decisions I'm still unsure about.


Every AI project I built started the same way.

I'd spend the first two weeks writing infrastructure that had nothing to do with the actual product — state machines, retry logic, DAG execution, human approval flows, persistence layers. The same plumbing, project after project.

So I built Orchflow and open-sourced it. This is the technical story of how it works, what was hard, and what I'm still unsure about.


What Orchflow does

You POST a goal. It handles the rest.

curl -X POST https://api.orchflow.cloud/api/v1/runs \
  -H "X-API-Key: orch_your_key" \
  -H "Content-Type: application/json" \
  -d '{
    "task": "Write a technical blog post about async Python",
    "provider_id": "prv_abc123",
    "agents": [
      {
        "name": "research",
        "system_prompt": "You are an expert researcher. Find key facts and structure them clearly."
      },
      {
        "name": "writer",
        "system_prompt": "You are a technical writer. Write clearly for developers."
      }
    ],
    "ask_me_about": ["tone", "audience"]
  }'
{
  "run_id": "run_abc123",
  "status": "pending",
  "message": "Run started. Poll GET /runs/run_abc123 for status."
}

Returns in milliseconds. Executes in the background. Pauses when it needs you. Resumes when you respond.

Here's what happens under the hood.


The engine: LangGraph + FastAPI

The core is a LangGraph state graph that runs five nodes in sequence:

split_tasks → infer_deps → assign_agents → execute loop → finalize

1. split_tasks

The LLM splits your goal into 3–6 subtasks. Not hardcoded — generated at runtime based on what you asked for.

def split_tasks(state: AgentState) -> AgentState:
    llm = resolve_llm(state.get("llm_config"))
    res = llm.invoke([
        SystemMessage(content=(
            "Split the task into 3-6 clear, actionable subtasks. "
            "Return a JSON array of short subtask names only."
        )),
        HumanMessage(content=state["task"]),
    ])
    names = clean_json(get_content(res))
    state["subtasks"] = {}
    for i, name in enumerate(names, 1):
        state["subtasks"][f"T{i}"] = SubTask(
            id=f"T{i}", name=name, depends_on=[],
            assign_to="ai", state="pending",
            retries=0, max_retries=2,
            output=None, context={}, error=None, agent_used=None,
        )
    return state

2. infer_deps

This is the part that makes it a real DAG instead of a linear chain. The LLM looks at all the subtask names and returns an adjacency list:

def infer_dependencies(state: AgentState) -> AgentState:
    llm = resolve_llm(state.get("llm_config"))  # same resolver as split_tasks
    task_list = [
        {"id": s["id"], "name": s["name"]}
        for s in state["subtasks"].values()
    ]
    res = llm.invoke([
        SystemMessage(content=(
            "Given these subtasks, determine dependencies.\n"
            "Return JSON: {task_id: [dependency_ids]}\n"
            "Only add dependencies that are genuinely required.\n"
            "Tasks that can start immediately get [].\n"
            "No circular deps. Every ID must appear as a key.\n"
            "Return ONLY valid JSON."
        )),
        HumanMessage(content=json.dumps(task_list)),
    ])
    deps = clean_json(get_content(res))
    for sid, dep_list in deps.items():
        state["subtasks"][sid]["depends_on"] = [
            d for d in dep_list
            if d in state["subtasks"] and d != sid
        ]
    return state

Result: tasks with no dependencies execute immediately. Tasks that genuinely need prior output wait. This means parallel execution where possible.
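The readiness rule falls straight out of the adjacency list. A minimal sketch (`ready_tasks` is hypothetical, not Orchflow's actual scheduler, but the `subtasks` shape matches what `infer_dependencies` stores in state):

```python
def ready_tasks(subtasks: dict) -> list:
    """Return IDs of pending tasks whose dependencies have all completed."""
    return [
        sid for sid, sub in subtasks.items()
        if sub["state"] == "pending"
        and all(subtasks[d]["state"] == "completed" for d in sub["depends_on"])
    ]

subtasks = {
    "T1": {"state": "completed", "depends_on": []},
    "T2": {"state": "pending", "depends_on": ["T1"]},
    "T3": {"state": "pending", "depends_on": []},
    "T4": {"state": "pending", "depends_on": ["T2", "T3"]},
}
print(ready_tasks(subtasks))  # T2 and T3 can run in parallel; T4 must wait
```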

3. assign_agents

Tasks get routed to human or AI based on what you specified:

def assign_agents(state: AgentState) -> AgentState:
    gates = [g.lower() for g in state.get("human_gates", [])]
    require_all = state.get("require_approval", False)
    agents = state.get("agents", [])

    for sub in state["subtasks"].values():
        if require_all:
            sub["assign_to"] = "human"
        elif gates and any(g in sub["name"].lower() for g in gates):
            sub["assign_to"] = "human"
        else:
            sub["assign_to"] = "ai"

        matched = match_agent(sub["name"], agents)
        if matched:
            sub["agent_used"] = matched["name"]

    return state

Agent matching is substring-based — if the task name contains the agent name, that agent handles it. More on why I'm unsure about this later.
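`match_agent` isn't shown above; assuming it is exactly the substring check described, it could look like this (a sketch, not the real helper):

```python
def match_agent(task_name: str, agents: list):
    """Return the first agent whose name appears in the task name
    (case-insensitive substring match, as described in the text)."""
    lowered = task_name.lower()
    for agent in agents:
        if agent["name"].lower() in lowered:
            return agent
    return None

agents = [
    {"name": "research", "system_prompt": "..."},
    {"name": "writer", "system_prompt": "..."},
]
print(match_agent("Research async Python patterns", agents)["name"])  # research
print(match_agent("Summarize findings", agents))  # no name matches -> None
```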

4. Execute loop

Each AI task runs with full shared context — not just its direct dependencies, but every prior completed output in the run:

def build_shared_context(state: AgentState, current_sid: str) -> str:
    lines = [f"Original task: {state['task']}\n"]
    for sid, sub in state["subtasks"].items():
        if sid == current_sid:
            break
        if sub.get("output") and sub["state"] == "completed":
            assignee = sub.get("agent_used") or sub["assign_to"]
            lines.append(
                f"[{sub['name']} - handled by {assignee}]\n"
                f"{sub['output']}\n"
            )
    return "\n".join(lines)

This is what "agents speaking with the same context" means. The editor agent sees what the researcher found and what the writer drafted — not just what its direct parent produced.

Each task also self-evaluates its output and can request a retry:

{
  "output": "...",
  "verdict": "accept | retry | escalate",
  "issues": "brief note if not accept"
}

If verdict is retry and retries remain, the failure reason is injected into the next attempt's context. If verdict is escalate, the task routes to human.
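The routing described above can be sketched as a small state-transition function (names hypothetical; the real executor wires this into the LangGraph loop):

```python
def handle_verdict(sub: dict, verdict: str, issues: str = ""):
    """Apply a self-evaluation verdict to a subtask, per the rules above."""
    if verdict == "accept":
        sub["state"] = "completed"
    elif verdict == "retry" and sub["retries"] < sub["max_retries"]:
        sub["retries"] += 1
        # The failure reason is injected into the next attempt's context
        sub["context"]["previous_issues"] = issues
        sub["state"] = "pending"
    else:  # escalate, or retries exhausted
        sub["assign_to"] = "human"
        sub["state"] = "waiting_human"

sub = {"retries": 0, "max_retries": 2, "context": {},
       "assign_to": "ai", "state": "running"}
handle_verdict(sub, "retry", "output missed the target audience")
print(sub["state"], sub["retries"])  # pending 1
handle_verdict(sub, "escalate")
print(sub["state"])  # waiting_human
```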


The hard part: human-in-loop that actually works

This took the longest to get right.

When a task hits a human gate, the run needs to suspend cleanly — not crash, not lose state, not restart from scratch when it resumes.

Here's the problem with LangGraph: when you want to "pause" a graph, you can't just stop mid-execution and resume later. The graph either runs to completion or it doesn't run at all. There's no native "suspend and come back in 3 hours" built in.

My solution was two separate graphs:

Graph 1 — full run:

split → deps → assign → execute loop → finalize

Graph 2 — resume graph (skips setup, re-enters at router):

router_entry → execute loop → finalize

When a task hits waiting_human, the router exits to finalize. But finalize checks for suspended tasks first:

def finalize(state: AgentState) -> AgentState:
    # Don't call LLM with empty content when suspended
    waiting = any(
        s["state"] == "waiting_human"
        for s in state["subtasks"].values()
    )
    if waiting:
        state["done"] = False
        state["final_output"] = None
        return state  # exit cleanly

    # Normal finalize path
    combined = "\n\n".join(...)
    ...

State is saved to Redis (hot, 24h TTL) and Postgres (permanent). When the human responds via API:

curl -X POST https://api.orchflow.cloud/api/v1/runs/run_abc123/tasks/T3/complete \
  -H "X-API-Key: orch_your_key" \
  -H "Content-Type: application/json" \
  -d '{"output": "Focus on the async/await mental model, target senior devs"}'

The human output is injected into the suspended task's state, and the resume graph picks up from the router — skipping split, deps, and assign entirely.

The run can be suspended for hours or days. State survives server restarts because it's persisted to Postgres, not just in-memory.
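What the complete endpoint does to the saved state before invoking the resume graph can be sketched like this (`inject_human_output` is a hypothetical name for illustration):

```python
def inject_human_output(state: dict, task_id: str, output: str) -> bool:
    """Fill a suspended task with the human's response so the resume
    graph's router can treat it as completed. Returns False if the task
    isn't actually waiting on a human."""
    sub = state["subtasks"][task_id]
    if sub["state"] != "waiting_human":
        return False  # only suspended tasks accept human output
    sub["output"] = output
    sub["state"] = "completed"
    return True

state = {"subtasks": {"T3": {"state": "waiting_human", "output": None}}}
print(inject_human_output(state, "T3", "Target senior devs"))  # True
# The resume graph now re-enters at the router and continues the execute loop.
```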


Security: encrypted provider keys

Users bring their own LLM API keys (Gemini, OpenAI, Anthropic, Ollama). The naive approach — accepting api_key directly in the request body — means keys end up in Redis state, Postgres run records, and uvicorn logs.

Instead, users register their key once:

POST /api/v1/providers
{
  "provider": "openai",
  "api_key": "sk-...",
  "label": "My OpenAI key"
}

The key is encrypted with AES-GCM before storage:

def encrypt_key(plaintext: str) -> str:
    key = _get_key()          # 32-byte key from env
    nonce = secrets.token_bytes(12)
    aesgcm = AESGCM(key)
    ct_with_tag = aesgcm.encrypt(nonce, plaintext.encode(), None)
    combined = nonce + ct_with_tag
    return base64.b64encode(combined).decode()

Returns a provider_id. Future runs reference provider_id — the raw key never appears in request bodies, Redis state, or logs again. Decryption happens in executor memory only, used to build the LangChain client, then garbage collected.
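For completeness, the decrypt path is the mirror image: split off the 12-byte nonce, then let AES-GCM verify the tag as it decrypts. A self-contained sketch (the env-based `_get_key` is stubbed with a freshly generated key here, so this is illustrative, not the production lookup):

```python
import base64
import secrets
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

_KEY = AESGCM.generate_key(bit_length=256)  # stand-in for the 32-byte env key

def encrypt_key(plaintext: str) -> str:
    nonce = secrets.token_bytes(12)
    ct_with_tag = AESGCM(_KEY).encrypt(nonce, plaintext.encode(), None)
    return base64.b64encode(nonce + ct_with_tag).decode()

def decrypt_key(token: str) -> str:
    raw = base64.b64decode(token)
    nonce, ct_with_tag = raw[:12], raw[12:]  # 12-byte nonce, then ciphertext+tag
    # decrypt() raises InvalidTag if the ciphertext or key is wrong
    return AESGCM(_KEY).decrypt(nonce, ct_with_tag, None).decode()

token = encrypt_key("sk-test")
print(decrypt_key(token))  # sk-test
```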


Progressive Postgres saves

The run executes in a background thread (LangGraph is synchronous, FastAPI is async — they need a bridge). The executor runs the graph in a thread pool:

engine_task = loop.run_in_executor(
    None, lambda: engine.invoke(initial_state)
)

The problem: if you GET /tasks mid-run, Postgres has nothing yet — all state is in memory.

The solution is a ProgressiveSaver — a thread-safe queue that collects state snapshots from the engine thread:

class ProgressiveSaver:
    def __init__(self, run_id: str):
        self.run_id = run_id
        self._queue: queue.Queue = queue.Queue()

    def on_task_done(self, state: dict):
        # Called from engine thread after each task
        self._queue.put(dict(state["subtasks"]))

    async def drain_loop(self, stop_event: threading.Event):
        while not stop_event.is_set():
            await self.drain()
            await asyncio.sleep(1)
        await self.drain()  # final drain

A parallel async task drains the queue every second and writes to Postgres. GET /tasks returns live data mid-run.
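The `drain()` referenced above isn't shown; the core of it is just "empty the queue, keep the newest snapshot, write once." A synchronous sketch with the Postgres write stubbed out (the real version is an async method on `ProgressiveSaver`):

```python
import queue

def drain(q: queue.Queue, write):
    """Collapse all queued snapshots to the latest one and write it once."""
    latest = None
    while True:
        try:
            latest = q.get_nowait()
        except queue.Empty:
            break
    if latest is not None:
        write(latest)  # one upsert per drain, not one per completed task

saved = []
q = queue.Queue()
q.put({"T1": "done"})
q.put({"T1": "done", "T2": "done"})
drain(q, saved.append)
print(saved)  # only the newest snapshot reaches the writer
```

Collapsing to the latest snapshot means a burst of fast tasks costs one database write per drain tick, not one per task.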


Two things I'm genuinely unsure about

1. Human gate keyword matching

Right now it's substring matching — if "pricing" is in your ask_me_about list and a task is named "Define pricing strategy", it routes to human.

This works but it's naive. What if your task is named "Implement secure authentication" and you have "auth" as a gate from a different context? False positives are a real problem.

I haven't found a clean solution that doesn't make the API much more complex. Regex? Semantic similarity? Both add friction. Open to ideas.

2. Non-deterministic DAG

The dependency graph is LLM-generated at runtime. Same goal, different run, slightly different subtask names, potentially different graph structure. For most workflows this is fine — the LLM is generally consistent. But for anything requiring reproducibility or auditability it's a problem.

I could let users define the DAG explicitly, but that defeats the "just POST a goal" simplicity. Not sure where the right tradeoff is.


What's next

  • Persistent agent registry (register agents once, reuse across runs)
  • Python SDK (pip install orchflow)
  • Tool calling per agent (web search, code execution)
  • Run history dashboard
  • Skill-based routing for humans

Try it

GitHub (MIT): github.com/RunProgrammer/orchflow

Hosted API: api.orchflow.cloud — first 3 runs free, no setup needed

Docs: api.orchflow.cloud/docs

Self-host: docker compose up — that's it.


If you're building with LLMs and have opinions on either of those two open questions — the keyword matching problem or the non-deterministic DAG — I'd genuinely love to hear them in the comments.
