I Built an LLM Orchestration API That Actually Handles Human-in-Loop — Here's How
The part that took the longest to get right, and the two architectural decisions I'm still unsure about.
Every AI project I built started the same way.
I'd spend the first two weeks writing infrastructure that had nothing to do with the actual product — state machines, retry logic, DAG execution, human approval flows, persistence layers. The same plumbing, project after project.
So I built Orchflow and open-sourced it. This is the technical story of how it works, what was hard, and what I'm still unsure about.
What Orchflow does
You POST a goal. It handles the rest.
```bash
curl -X POST https://api.orchflow.cloud/api/v1/runs \
  -H "X-API-Key: orch_your_key" \
  -H "Content-Type: application/json" \
  -d '{
    "task": "Write a technical blog post about async Python",
    "provider_id": "prv_abc123",
    "agents": [
      {
        "name": "research",
        "system_prompt": "You are an expert researcher. Find key facts and structure them clearly."
      },
      {
        "name": "writer",
        "system_prompt": "You are a technical writer. Write clearly for developers."
      }
    ],
    "ask_me_about": ["tone", "audience"]
  }'
```

```json
{
  "run_id": "run_abc123",
  "status": "pending",
  "message": "Run started. Poll GET /runs/run_abc123 for status."
}
```
Returns in milliseconds. Executes in the background. Pauses when it needs you. Resumes when you respond.
Here's what happens under the hood.
The engine: LangGraph + FastAPI
The core is a LangGraph state graph that runs five nodes in sequence:
split_tasks → infer_deps → assign_agents → execute loop → finalize
1. split_tasks
The LLM splits your goal into 3–6 subtasks. Not hardcoded — generated at runtime based on what you asked for.
```python
def split_tasks(state: AgentState) -> AgentState:
    llm = resolve_llm(state.get("llm_config"))
    res = llm.invoke([
        SystemMessage(content=(
            "Split the task into 3-6 clear, actionable subtasks. "
            "Return a JSON array of short subtask names only."
        )),
        HumanMessage(content=state["task"]),
    ])
    names = clean_json(get_content(res))
    state["subtasks"] = {}
    for i, name in enumerate(names, 1):
        state["subtasks"][f"T{i}"] = SubTask(
            id=f"T{i}", name=name, depends_on=[],
            assign_to="ai", state="pending",
            retries=0, max_retries=2,
            output=None, context={}, error=None, agent_used=None,
        )
    return state
```
2. infer_deps
This is the part that makes it a real DAG instead of a linear chain. The LLM looks at all the subtask names and returns an adjacency list:
```python
def infer_dependencies(state: AgentState) -> AgentState:
    llm = resolve_llm(state.get("llm_config"))  # same helper as in split_tasks
    task_list = [
        {"id": s["id"], "name": s["name"]}
        for s in state["subtasks"].values()
    ]
    res = llm.invoke([
        SystemMessage(content=(
            "Given these subtasks, determine dependencies.\n"
            "Return JSON: {task_id: [dependency_ids]}\n"
            "Only add dependencies that are genuinely required.\n"
            "Tasks that can start immediately get [].\n"
            "No circular deps. Every ID must appear as a key.\n"
            "Return ONLY valid JSON."
        )),
        HumanMessage(content=json.dumps(task_list)),
    ])
    deps = clean_json(get_content(res))
    for sid, dep_list in deps.items():
        state["subtasks"][sid]["depends_on"] = [
            d for d in dep_list
            if d in state["subtasks"] and d != sid
        ]
    return state
```
Result: tasks with no dependencies execute immediately. Tasks that genuinely need prior output wait. This means parallel execution where possible.
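The ready-set check that makes this possible isn't shown in the snippets above, but the idea reduces to a few lines. A minimal sketch over plain dicts (`ready_tasks` is a hypothetical helper, not Orchflow's actual scheduler):

```python
def ready_tasks(subtasks: dict) -> list[str]:
    """Return IDs of pending tasks whose dependencies have all completed."""
    return [
        sid for sid, sub in subtasks.items()
        if sub["state"] == "pending"
        and all(subtasks[d]["state"] == "completed" for d in sub["depends_on"])
    ]

subtasks = {
    "T1": {"state": "completed", "depends_on": []},
    "T2": {"state": "pending",   "depends_on": []},
    "T3": {"state": "pending",   "depends_on": ["T1"]},
    "T4": {"state": "pending",   "depends_on": ["T2", "T3"]},
}
ready_tasks(subtasks)  # -> ["T2", "T3"]: both can run in parallel; T4 still waits on T2
```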
3. assign_agents
Tasks get routed to human or AI based on what you specified:
```python
def assign_agents(state: AgentState) -> AgentState:
    gates = [g.lower() for g in state.get("human_gates", [])]
    require_all = state.get("require_approval", False)
    agents = state.get("agents", [])
    for sub in state["subtasks"].values():
        if require_all:
            sub["assign_to"] = "human"
        elif gates and any(g in sub["name"].lower() for g in gates):
            sub["assign_to"] = "human"
        else:
            sub["assign_to"] = "ai"
            matched = match_agent(sub["name"], agents)
            if matched:
                sub["agent_used"] = matched["name"]
    return state
```
Agent matching is substring-based — if the task name contains the agent name, that agent handles it. More on why I'm unsure about this later.
4. Execute loop
Each AI task runs with full shared context — not just its direct dependencies, but every prior completed output in the run:
```python
def build_shared_context(state: AgentState, current_sid: str) -> str:
    lines = [f"Original task: {state['task']}\n"]
    for sid, sub in state["subtasks"].items():
        if sid == current_sid:
            break
        if sub.get("output") and sub["state"] == "completed":
            assignee = sub.get("agent_used") or sub["assign_to"]
            lines.append(
                f"[{sub['name']} - handled by {assignee}]\n"
                f"{sub['output']}\n"
            )
    return "\n".join(lines)
```
This is what "agents speaking with the same context" means. The editor agent sees what the researcher found and what the writer drafted — not just what its direct parent produced.
Each task also self-evaluates its output and can request a retry:
```json
{
  "output": "...",
  "verdict": "accept | retry | escalate",
  "issues": "brief note if not accept"
}
```
If verdict is retry and retries remain, the failure reason is injected into the next attempt's context. If verdict is escalate, the task routes to human.
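Wiring that verdict into a retry loop looks roughly like this. A sketch with a stubbed executor (`run_with_retries` and `fake_execute` are hypothetical names, not Orchflow's actual code):

```python
def run_with_retries(sub: dict, execute) -> dict:
    """Run a task, honoring self-evaluation verdicts.

    `execute(sub, feedback)` returns the verdict dict shown above.
    """
    feedback = None
    while True:
        result = execute(sub, feedback)
        if result["verdict"] == "accept":
            sub["state"], sub["output"] = "completed", result["output"]
            return sub
        if result["verdict"] == "escalate" or sub["retries"] >= sub["max_retries"]:
            sub["state"] = "waiting_human"  # route to human
            return sub
        sub["retries"] += 1
        feedback = result["issues"]  # injected into the next attempt's context

# Demo: a stub that rejects the first attempt, then accepts
def fake_execute(sub, feedback):
    if feedback is None:
        return {"verdict": "retry", "output": "", "issues": "too vague"}
    return {"verdict": "accept", "output": "done", "issues": ""}

sub = {"retries": 0, "max_retries": 2, "state": "pending", "output": None}
result = run_with_retries(sub, fake_execute)  # completes after one retry
```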
The hard part: human-in-loop that actually works
This took the longest to get right.
When a task hits a human gate, the run needs to suspend cleanly — not crash, not lose state, not restart from scratch when it resumes.
Here's the problem with LangGraph: when you want to "pause" a graph, you can't just stop mid-execution and resume later. The graph either runs to completion or it doesn't run at all. There's no native "suspend and come back in 3 hours" built in.
My solution was two separate graphs:
Graph 1 — full run:
split → deps → assign → execute loop → finalize
Graph 2 — resume graph (skips setup, re-enters at router):
router_entry → execute loop → finalize
When a task hits waiting_human, the router exits to finalize. But finalize checks for suspended tasks first:
```python
def finalize(state: AgentState) -> AgentState:
    # Don't call the LLM with empty content when suspended
    waiting = any(
        s["state"] == "waiting_human"
        for s in state["subtasks"].values()
    )
    if waiting:
        state["done"] = False
        state["final_output"] = None
        return state  # exit cleanly

    # Normal finalize path
    combined = "\n\n".join(...)
    ...
```
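Putting the two together, the router's decision after each execution step could be sketched like this (hypothetical node names; the real graph's conditional edges may differ):

```python
def route(state: dict) -> str:
    """Decide the next node after each execution step."""
    subs = state["subtasks"].values()
    if any(s["state"] == "waiting_human" for s in subs):
        return "finalize"  # suspend: finalize exits without producing output
    if any(s["state"] == "pending" for s in subs):
        return "execute"   # more work to do
    return "finalize"      # everything completed: produce the real final output
```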
State is saved to Redis (hot, 24h TTL) and Postgres (permanent). When the human responds via API:
```bash
curl -X POST https://api.orchflow.cloud/api/v1/runs/run_abc123/tasks/T3/complete \
  -H "X-API-Key: orch_your_key" \
  -H "Content-Type: application/json" \
  -d '{"output": "Focus on the async/await mental model, target senior devs"}'
```
The human output is injected into the suspended task's state, and the resume graph picks up from the router — skipping split, deps, and assign entirely.
The run can be suspended for hours or days. State survives server restarts because it's persisted to Postgres, not just in-memory.
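In state terms, completing a human task is simple surgery on the suspended subtask before the resume graph re-enters. A sketch (`complete_human_task` is a hypothetical name for what the endpoint handler does):

```python
def complete_human_task(state: dict, task_id: str, output: str) -> dict:
    """Mark a suspended task as completed with the human-provided output."""
    sub = state["subtasks"][task_id]
    if sub["state"] != "waiting_human":
        raise ValueError(f"{task_id} is not waiting for a human")
    sub["output"] = output
    sub["state"] = "completed"
    state["done"] = False  # the resume graph still has work to do
    return state
```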
Security: encrypted provider keys
Users bring their own LLM API keys (Gemini, OpenAI, Anthropic, Ollama). The naive approach — accepting api_key directly in the request body — means keys end up in Redis state, Postgres run records, and uvicorn logs.
Instead, users register their key once:
```
POST /api/v1/providers
{
  "provider": "openai",
  "api_key": "sk-...",
  "label": "My OpenAI key"
}
```
The key is encrypted with AES-GCM before storage:
```python
import base64
import secrets

from cryptography.hazmat.primitives.ciphers.aead import AESGCM

def encrypt_key(plaintext: str) -> str:
    key = _get_key()  # 32-byte key from env
    nonce = secrets.token_bytes(12)
    aesgcm = AESGCM(key)
    ct_with_tag = aesgcm.encrypt(nonce, plaintext.encode(), None)
    combined = nonce + ct_with_tag
    return base64.b64encode(combined).decode()
```
Returns a provider_id. Future runs reference provider_id — the raw key never appears in request bodies, Redis state, or logs again. Decryption happens in executor memory only, used to build the LangChain client, then garbage collected.
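The decryption side is the mirror image of `encrypt_key`. A sketch using the same layout (12-byte nonce prepended to ciphertext+tag), assuming the `cryptography` package; the key parameter stands in for `_get_key()`:

```python
import base64
import secrets

from cryptography.hazmat.primitives.ciphers.aead import AESGCM

def decrypt_key(encoded: str, key: bytes) -> str:
    """Reverse encrypt_key: split off the 12-byte nonce, then AES-GCM decrypt."""
    combined = base64.b64decode(encoded)
    nonce, ct_with_tag = combined[:12], combined[12:]
    return AESGCM(key).decrypt(nonce, ct_with_tag, None).decode()

# Round trip with a throwaway key
key = AESGCM.generate_key(bit_length=256)
nonce = secrets.token_bytes(12)
token = base64.b64encode(nonce + AESGCM(key).encrypt(nonce, b"sk-test", None)).decode()
decrypt_key(token, key)  # -> "sk-test"
```

AES-GCM is authenticated, so a tampered ciphertext or wrong key raises an exception rather than returning garbage.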
Progressive Postgres saves
The run executes in a background thread (LangGraph is synchronous, FastAPI is async — they need a bridge). The executor runs the graph in a thread pool:
```python
engine_task = loop.run_in_executor(
    None, lambda: engine.invoke(initial_state)
)
```
The problem: if you GET /tasks mid-run, Postgres has nothing yet — all state is in memory.
The solution is a ProgressiveSaver — a thread-safe queue that collects state snapshots from the engine thread:
```python
import asyncio
import queue
import threading

class ProgressiveSaver:
    def __init__(self, run_id: str):
        self.run_id = run_id
        self._queue: queue.Queue = queue.Queue()

    def on_task_done(self, state: dict):
        # Called from engine thread after each task
        self._queue.put(dict(state["subtasks"]))

    async def drain_loop(self, stop_event: threading.Event):
        while not stop_event.is_set():
            await self.drain()  # drain() writes queued snapshots to Postgres
            await asyncio.sleep(1)
        await self.drain()  # final drain
```
A parallel async task drains the queue every second and writes to Postgres. GET /tasks returns live data mid-run.
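`drain()` itself isn't shown above; here's a sketch of what it could do, with the Postgres write abstracted behind an injected async callable, and keeping only the newest queued snapshot since older ones are superseded (a design assumption, not necessarily Orchflow's exact behavior):

```python
import asyncio
import queue

class ProgressiveSaver:
    def __init__(self, run_id: str, save):
        self.run_id = run_id
        self._save = save  # async callable: (run_id, subtasks) -> None
        self._queue: queue.Queue = queue.Queue()

    def on_task_done(self, state: dict):
        # Called from engine thread after each task
        self._queue.put(dict(state["subtasks"]))

    async def drain(self):
        # Pop everything; only the newest snapshot needs persisting
        latest = None
        while True:
            try:
                latest = self._queue.get_nowait()
            except queue.Empty:
                break
        if latest is not None:
            await self._save(self.run_id, latest)

# Demo with a stub save function
saved = []
async def fake_save(run_id, subtasks):
    saved.append((run_id, subtasks))

saver = ProgressiveSaver("run_1", fake_save)
saver.on_task_done({"subtasks": {"T1": {"state": "completed"}}})
saver.on_task_done({"subtasks": {"T1": {"state": "completed"}, "T2": {"state": "pending"}}})
asyncio.run(saver.drain())  # one write, containing the latest snapshot
```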
Two things I'm genuinely unsure about
1. Human gate keyword matching
Right now it's substring matching — if "pricing" is in your ask_me_about list and a task is named "Define pricing strategy", it routes to human.
This works but it's naive. What if your task is named "Implement secure authentication" and you have "auth" as a gate from a different context? False positives are a real problem.
I haven't found a clean solution that doesn't make the API much more complex. Regex? Semantic similarity? Both add friction. Open to ideas.
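One middle ground I've considered (hypothetical, not implemented): whole-word matching instead of raw substrings. It keeps the API unchanged while eliminating the "auth" vs. "authentication" class of false positives:

```python
import re

def gate_matches(task_name: str, gates: list[str]) -> bool:
    """Match gate keywords as whole words rather than raw substrings."""
    return any(
        re.search(rf"\b{re.escape(g)}\b", task_name, re.IGNORECASE)
        for g in gates
    )

gate_matches("Define pricing strategy", ["pricing"])       # True: whole word present
gate_matches("Implement secure authentication", ["auth"])  # False: "auth" only appears inside a longer word
```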
2. Non-deterministic DAG
The dependency graph is LLM-generated at runtime. Same goal, different run, slightly different subtask names, potentially different graph structure. For most workflows this is fine — the LLM is generally consistent. But for anything requiring reproducibility or auditability it's a problem.
I could let users define the DAG explicitly, but that defeats the "just POST a goal" simplicity. Not sure where the right tradeoff is.
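If I did add an escape hatch, it might be an optional explicit spec in the request body that bypasses split_tasks and infer_deps entirely. A hypothetical shape, not a real API field yet:

```json
{
  "task": "Write a technical blog post about async Python",
  "subtasks": [
    {"id": "T1", "name": "Research key facts", "depends_on": []},
    {"id": "T2", "name": "Draft the post", "depends_on": ["T1"]},
    {"id": "T3", "name": "Edit for clarity", "depends_on": ["T2"]}
  ]
}
```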
What's next
- Persistent agent registry (register agents once, reuse across runs)
- Python SDK (pip install orchflow)
- Tool calling per agent (web search, code execution)
- Run history dashboard
- Skill-based routing for humans
Try it
GitHub (MIT): github.com/RunProgrammer/orchflow
Hosted API: api.orchflow.cloud — first 3 runs free, no setup needed
Docs: api.orchflow.cloud/docs
Self-host: docker compose up — that's it.
If you're building with LLMs and have opinions on either of those two open questions — the keyword matching problem or the non-deterministic DAG — I'd genuinely love to hear them in the comments.