Building a Multi-Agent ATDD Pipeline with LangGraph and Hexagonal Architecture
Write the spec, mark the story as ready, walk away. The agents do the rest.
The problem with solo AI development
Building a product solo is brutal.
You are the PO, the architect, the developer, and the QA — all at the same time. When AI coding agents entered the picture, I didn't see a magic button. I saw a new kind of team member that needed the same thing any team member needs: clear responsibilities, short tasks, and a verifiable definition of done.
The first thing I tried was the obvious approach: long prompts, one agent, do everything. It failed the way it always fails. The model drifted, lost context, and confidently built the wrong thing.
Then I applied something I already knew from software architecture:
Divide and conquer.
If a long prompt fails, what about a very short one with a very specific context? What if instead of one agent doing everything, you had multiple agents — each with a single role, a precise skill, and just enough context to do their job?
That question led me to build an ATDD orchestrator: a pipeline of specialized AI agents, coordinated by a state machine, that takes a user story from spec to acceptance without human intervention in the technical stages.
In this article I'll walk through the architecture, the design decisions, and — the main focus — how replacing a Celery queue with a LangGraph state machine made the whole thing significantly cleaner and more explicit.
What is ATDD and why does it fit AI agents perfectly?
Acceptance Test Driven Development says: write acceptance criteria first, then build until they pass. The acceptance criteria — written as Gherkin scenarios — are the only real definition of done.
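For instance, a password-reset story might carry a scenario like this (illustrative wording, not a real scenario from the project):

```gherkin
Feature: Password reset
  Scenario: User can reset their password
    Given a registered user with a verified email
    When they request a password reset and follow the emailed link
    Then they can set a new password and sign in with it
```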
This maps naturally to a multi-agent pipeline:
- Architect (human + Claude): define the spec, write the acceptance criteria, refine user stories
- Test engineer (autonomous): write unit and integration tests in RED — before any implementation
- Developer (autonomous): make the RED tests pass — and only that
- Tester (autonomous): quality gate — regressions, ruff, mypy
- ATF worker (autonomous): run the Gherkin acceptance scenarios with Playwright + Behave
Each role has a single responsibility. Each transition is triggered by a status change in a file. No agent can skip a stage or self-certify completion.
story.md → status: ready
│
▼
test_engineer → tests in RED
│
▼
developer → tests GREEN
│
▼
tester → quality gate
│
▼
atf_worker → acceptance scenarios pass
│
▼
status: accepted ✓
The spec is the contract. The Gherkin scenario is the verdict.
The architecture: hexagonal all the way down
Before talking about LangGraph, it's worth understanding the underlying architecture — because it's the reason the LangGraph integration was so clean.
The orchestrator follows strict hexagonal architecture (ports and adapters):
domain — pure Python, no external dependencies
application — imports domain only (use cases)
infrastructure — imports domain + application + external libs
The domain defines three ports (interfaces):
from abc import ABC, abstractmethod

class StoryRepository(ABC):
    @abstractmethod
    def get(self, story_id: str) -> Story: ...

    @abstractmethod
    def save_status(self, story_id: str, status: Status, note: str = "") -> None: ...

    @abstractmethod
    def find_by_status(self, status: Status) -> list[str]: ...

class CodeRunner(ABC):
    @abstractmethod
    def run(self, role: str, prompt: str) -> None: ...

class TaskQueue(ABC):
    @abstractmethod
    def enqueue(self, task_name: str, story_id: str) -> None: ...
Each use case depends only on these ports. For example, RunDeveloper:
class RunDeveloper:
    def __init__(self, story_repo: StoryRepository, runner: CodeRunner, queue: TaskQueue):
        self._repo = story_repo
        self._runner = runner
        self._queue = queue

    def execute(self, story_id: str) -> None:
        try:
            self._runner.run("developer", _PROMPT.format(story_id=story_id))
            self._repo.save_status(story_id, Status.READY_TO_TEST)
            self._queue.enqueue("run_tester", story_id)
        except Exception as exc:
            self._repo.save_status(story_id, Status.BLOCKED, note=str(exc))
            raise
Notice: the use case doesn't know what CodeRunner is (subprocess? API call? mock?), what StoryRepository stores to (files? database?), or how TaskQueue delivers the next task (Celery? Redis? LangGraph?).
This is the key. The infrastructure is replaceable without touching the domain.
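To make that concrete, here is a minimal in-memory adapter that satisfies the StoryRepository port — a test double, not the project's real frontmatter-backed implementation, and with plain strings standing in for the Status enum:

```python
from dataclasses import dataclass

@dataclass
class Story:
    story_id: str
    status: str
    note: str = ""

class InMemoryStoryRepository:
    """Test double satisfying the StoryRepository port (illustrative)."""

    def __init__(self) -> None:
        self._stories: dict[str, Story] = {}

    def add(self, story: Story) -> None:
        # Helper for tests; not part of the port.
        self._stories[story.story_id] = story

    def get(self, story_id: str) -> Story:
        return self._stories[story_id]

    def save_status(self, story_id: str, status: str, note: str = "") -> None:
        story = self._stories[story_id]
        story.status = status
        story.note = note

    def find_by_status(self, status: str) -> list[str]:
        return [s.story_id for s in self._stories.values() if s.status == status]
```

A use case wired with this double runs the exact same code paths it runs in production — only the adapter changes.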
The original infrastructure: Celery + Redis
The first implementation used Celery with Redis as the broker. Each role had a dedicated queue. The dispatcher polled every 30 seconds for stories in status INBOX and enqueued the first task.
# infrastructure/celery/tasks.py
@app.task(name="run_developer", queue="ready-to-dev", bind=True)
def task_developer(self, project_path: str, story_id: str) -> None:
    repo, runner, queue, notifier = _deps(project_path)
    RunDeveloper(repo, runner, queue).execute(story_id)
This worked. But it had friction:
- Redis was required even for local development
- The state-machine logic was implicit — split across four separate queue definitions and the enqueue calls inside use cases
- Retries after a failed quality gate required manual re-enqueue logic scattered across use cases
- docker compose up just to run a pipeline on your laptop
I wanted the workflow to be explicit. That's where LangGraph came in.
The new infrastructure: LangGraph
LangGraph is a library for building stateful, graph-based workflows. You define nodes (work units) and edges (transitions), compile the graph, and invoke it with an initial state. It handles traversal, conditional routing, and can even do checkpointing.
It's designed for AI agent workflows — and an ATDD pipeline is exactly that.
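Stripped to its essence, the traversal LangGraph automates looks something like this toy loop — a conceptual sketch, not LangGraph's actual API or implementation:

```python
END = "__end__"  # terminal sentinel, mirroring LangGraph's END

def run_graph(nodes, routers, entry, state):
    """Toy graph traversal: run the current node, then ask its
    router function which node comes next, until END is reached."""
    current = entry
    while current != END:
        state = nodes[current](state)      # node: do work, return new state
        current = routers[current](state)  # conditional edge: pick next node
    return state
```

The real library adds typed state, compiled graphs, checkpointing, and async support on top of this core idea.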
State
The graph state is a TypedDict — a plain Python dict with type annotations:
from typing import TypedDict, Optional

from atdd_orchestrator.domain.story import Status

class PipelineState(TypedDict):
    story_id: str
    project_path: str
    status: Status
    blocked_reason: Optional[str]
    dev_retries: int
dev_retries is the key addition over the Celery implementation. It's the counter that prevents infinite loops when the quality gate or acceptance tests keep failing.
Nodes
Each node calls the existing use case with one important change: it receives a NoOpQueue instead of a real queue adapter. The graph handles routing — the use cases don't need to know what comes next.
class _NoOpQueue(TaskQueue):
    def enqueue(self, task_name: str, story_id: str) -> None:
        pass  # routing is the graph's job
After executing the use case, the node reads the current status back from the repository (the use case already persisted it), and returns the updated state:
def developer_node(state: PipelineState) -> PipelineState:
    repo, runner, notifier, queue = _deps(state["project_path"])
    try:
        RunDeveloper(repo, runner, queue).execute(state["story_id"])
    except Exception:
        pass  # the use case already saved BLOCKED to the repo
    status, reason = _read_status(state["project_path"], state["story_id"])
    return {**state, "status": status, "blocked_reason": reason}
The node is thin. It wires dependencies, delegates to the use case, reads the result, and returns. No business logic lives here.
Graph with conditional edges
This is where LangGraph pays off. The state machine that was previously implicit becomes explicit Python code:
from langgraph.graph import StateGraph, END

MAX_DEV_RETRIES = 3

def _route_after_tester(state: PipelineState) -> str:
    if state["status"] == Status.READY_TO_ATF:
        return "atf"
    if state["status"] == Status.READY_TO_DEV:
        if state["dev_retries"] < MAX_DEV_RETRIES:
            return "developer"  # quality gate failed → retry
    return END  # blocked or retries exhausted

def _route_after_atf(state: PipelineState) -> str:
    if state["status"] == Status.DONE:
        return END
    if state["status"] == Status.READY_TO_DEV:
        if state["dev_retries"] < MAX_DEV_RETRIES:
            return "developer"  # acceptance failed → retry
    return END
def build_pipeline():
    g = StateGraph(PipelineState)
    g.add_node("test_engineer", test_engineer_node)
    g.add_node("developer", developer_node)
    g.add_node("tester", tester_node)
    g.add_node("atf", atf_node)
    g.set_entry_point("test_engineer")
    g.add_conditional_edges("test_engineer", _route_after_test_engineer,
                            {"developer": "developer", END: END})
    g.add_conditional_edges("developer", _route_after_developer,
                            {"tester": "tester", END: END})
    g.add_conditional_edges("tester", _route_after_tester,
                            {"atf": "atf", "developer": "developer", END: END})
    g.add_conditional_edges("atf", _route_after_atf,
                            {"developer": "developer", END: END})
    return g.compile()
Reading this, you immediately understand the full lifecycle of a story:
test_engineer
→ READY_TO_DEV → developer
→ BLOCKED → END
developer
→ READY_TO_TEST → tester
→ BLOCKED → END
tester
→ READY_TO_ATF → atf
→ READY_TO_DEV → developer (retry, up to 3×)
→ else → END
atf
→ DONE → END
→ READY_TO_DEV → developer (retry, up to 3×)
→ else → END
The retry logic, the blocking conditions, the terminal states — all visible, all in one place.
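One detail the snippets above leave out is where dev_retries is incremented. A plausible sketch — assuming the node that routes a story back to development owns the counter; this helper is hypothetical, not from the repo — could look like:

```python
READY_TO_DEV = "ready-to-dev"

def bump_retries(state: dict, new_status: str) -> dict:
    """Return an updated pipeline state, incrementing the retry counter
    whenever a stage sends the story back to development."""
    retries = state["dev_retries"]
    if new_status == READY_TO_DEV:
        retries += 1
    return {**state, "status": new_status, "dev_retries": retries}
```

A node would call this instead of building the returned state dict by hand, so the routers' `dev_retries < MAX_DEV_RETRIES` guard always sees an up-to-date count.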
Dispatcher
With LangGraph, the dispatcher becomes trivially simple:
def run(project_path: str) -> None:
    pipeline = build_pipeline()
    while True:
        repo = FrontmatterStoryRepository(project_path)
        for story_id in repo.find_by_status(Status.INBOX):
            initial_state = {
                "story_id": story_id,
                "project_path": project_path,
                "status": Status.INBOX,
                "blocked_reason": None,
                "dev_retries": 0,
            }
            final_state = pipeline.invoke(initial_state)
            log.info("Pipeline done — story: %s | final status: %s",
                     story_id, final_state["status"])
        time.sleep(POLL_INTERVAL)
No broker, no worker process, no docker compose up. Just:
pip install -e ".[langgraph]"
python dispatcher_langgraph.py /path/to/your/project
The design decisions worth discussing
Why is the state stored in files, not a database?
Each user story's state lives in a story.md file with YAML frontmatter:
---
id: US04
title: User can reset their password
status: in-progress:ready-to-test
sprint: sprint_02
---
Keeping state in the repository means:
- Any agent, any tool, any human can read and update it with a text editor
- The state survives restarts with no migration
- Git history shows every state transition
- The orchestrator doesn't own the state — the project does
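For illustration, the status field can be recovered from that frontmatter with a few lines of stdlib Python — the real FrontmatterStoryRepository presumably uses a YAML parser; this is a simplified sketch that only handles flat `key: value` pairs:

```python
def read_frontmatter(text: str) -> dict[str, str]:
    """Parse simple `key: value` frontmatter between --- fences.
    (Sketch only: no nesting, no lists, no quoted values.)"""
    lines = text.splitlines()
    if not lines or lines[0].strip() != "---":
        return {}
    meta: dict[str, str] = {}
    for line in lines[1:]:
        if line.strip() == "---":
            break  # closing fence: frontmatter ends here
        key, _, value = line.partition(":")
        meta[key.strip()] = value.strip()
    return meta
```

Note that `partition(":")` splits on the first colon only, so compound values like `in-progress:ready-to-test` survive intact.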
Why is the Architect role never automated?
The architect (human + Claude) defines scope, acceptance criteria, and what the story means. That judgment stays human-controlled.
Automating that step is how you end up building the wrong thing perfectly. The spec is the contract — someone has to own it.
Why keep Celery?
LangGraph is a better fit for local development and single-machine setups. Celery is better when workers need to run on separate machines or when you need horizontal scaling.
Both share the same domain and the same use cases. The difference is purely in which infrastructure adapter you wire in. That's hexagonal architecture earning its cost.
The NoOpQueue pattern
This is worth calling out explicitly. The use cases currently call self._queue.enqueue("run_tester", story_id) as a side effect of their execution. In the LangGraph world, that call is meaningless — the graph decides what runs next.
Rather than modifying the use cases (which would break the Celery flow), we pass a NoOpQueue that swallows the enqueue calls silently. The use case's core behavior — running the agent, saving the status, raising on failure — is unchanged. Only the side effect is suppressed.
This is dependency injection doing exactly what it's supposed to do.
What I tested on a real project
The first project running through this pipeline is atf-ai — a CLI tool with Playwright-based acceptance tests. Five user stories, end-to-end:
| Story | Status |
|---|---|
| US01 — Scaffolding CLI | blocked (1 failing scenario, known issue) |
| US02 — Docker Runner | accepted ✓ |
| US03 — Screenplay Actors & Steps | accepted ✓ |
| US04 — Feedback & State Tracking | accepted ✓ |
| US05 — Reports Pipeline & PyPI | accepted ✓ |
Four out of five stories went from ready to accepted autonomously. The fifth is blocked on a known state mismatch that requires architectural review — exactly the kind of thing that should block, not silently pass.
What's next
A few things on the backlog:
- LangGraph checkpointing — persist graph state to disk so a pipeline can resume after a crash without re-running completed stages
- Parallel stories — run multiple stories concurrently using asyncio and LangGraph's async support
- Observability — emit structured events at each node transition for tracing and debugging
- Self-healing architect — when a story is blocked, trigger a Claude session to diagnose and propose a fix
Repository
The full source is open:
github.com/csotelo/atdd-framework
atdd_orchestrator/
├── domain/ # Story, Status, ports — pure Python
├── application/ # use cases — depend only on domain
└── infrastructure/
├── celery/ # Celery + Redis adapter
└── langgraph/ # LangGraph adapter (new)
35 tests, 0 failures. No Redis, no OpenCode, no network required to run the test suite — all ports are stubbed.
Final thought
The thing that surprised me most about this project wasn't the AI part. It was how much cleaner the orchestration became once I made the state machine explicit.
With Celery, the workflow lived in queue names, routing keys, and scattered enqueue() calls inside use cases. You had to read four files to understand what happened after a test failure.
With LangGraph, the entire lifecycle is in one function. The retry logic, the terminal conditions, the branching paths — all visible at a glance. That's not a small thing when you're the only engineer on the project and you come back to the code three weeks later.
If you're building multi-agent pipelines, make the state machine explicit. Your future self will thank you.
Questions, issues, or contributions: github.com/csotelo/atdd-framework