Building a Multi-Agent ATDD Pipeline with LangGraph and Hexagonal Architecture

Carlos Eduardo Sotelo Pinto
Write the spec, mark the story as ready, walk away. The agents do the rest.


The problem with solo AI development

Building a product solo is brutal.

You are the PO, the architect, the developer, and the QA — all at the same time. When AI coding agents entered the picture, I didn't see a magic button. I saw a new kind of team member that needed the same thing any team member needs: clear responsibilities, short tasks, and a verifiable definition of done.

The first thing I tried was the obvious approach: long prompts, one agent, do everything. It failed the way it always fails. The model drifted, lost context, and confidently built the wrong thing.

Then I applied something I already knew from software architecture:

Divide and conquer.

If a long prompt fails, what about a very short one with a very specific context? What if instead of one agent doing everything, you had multiple agents — each with a single role, a precise skill, and just enough context to do their job?

That question led me to build an ATDD orchestrator: a pipeline of specialized AI agents, coordinated by a state machine, that takes a user story from spec to acceptance without human intervention in the technical stages.

In this article I'll walk through the architecture, the design decisions, and — the main focus — how replacing a Celery queue with a LangGraph state machine made the whole thing significantly cleaner and more explicit.


What is ATDD and why does it fit AI agents perfectly?

Acceptance Test Driven Development says: write acceptance criteria first, then build until they pass. The acceptance criteria — written as Gherkin scenarios — are the only real definition of done.

This maps naturally to a multi-agent pipeline:

  1. Architect (human + Claude): define the spec, write the acceptance criteria, refine user stories
  2. Test engineer (autonomous): write unit and integration tests in RED — before any implementation
  3. Developer (autonomous): make the RED tests pass — and only that
  4. Tester (autonomous): quality gate — regressions, ruff, mypy
  5. ATF worker (autonomous): run the Gherkin acceptance scenarios with Playwright + Behave

Each role has a single responsibility. Each transition is triggered by a status change in a file. No agent can skip a stage or self-certify completion.

story.md → status: ready
              │
              ▼
        test_engineer  → tests in RED
              │
              ▼
          developer    → tests GREEN
              │
              ▼
            tester     → quality gate
              │
              ▼
          atf_worker   → acceptance scenarios pass
              │
              ▼
        status: accepted  ✓

The spec is the contract. The Gherkin scenario is the verdict.
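Those status transitions suggest a small enum at the heart of the domain. A minimal sketch (the member names and string values here are inferred from the pipeline above and from code later in the article; the repo's actual Status type may differ):

```python
from enum import Enum

class Status(Enum):
    # Inferred from the pipeline diagram; illustrative only.
    INBOX = "inbox"
    READY_TO_DEV = "in-progress:ready-to-dev"
    READY_TO_TEST = "in-progress:ready-to-test"
    READY_TO_ATF = "in-progress:ready-to-atf"
    BLOCKED = "blocked"
    DONE = "accepted"
```

Because every transition is just a status write, the whole pipeline is legible from this one type.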


The architecture: hexagonal all the way down

Before talking about LangGraph, it's worth understanding the underlying architecture — because it's the reason the LangGraph integration was so clean.

The orchestrator follows strict hexagonal architecture (ports and adapters):

domain          — pure Python, no external dependencies
application     — imports domain only (use cases)
infrastructure  — imports domain + application + external libs

The domain defines three ports (interfaces):

from abc import ABC, abstractmethod

class StoryRepository(ABC):
    @abstractmethod
    def get(self, story_id: str) -> Story: ...
    @abstractmethod
    def save_status(self, story_id: str, status: Status, note: str = "") -> None: ...
    @abstractmethod
    def find_by_status(self, status: Status) -> list[str]: ...

class CodeRunner(ABC):
    @abstractmethod
    def run(self, role: str, prompt: str) -> None: ...

class TaskQueue(ABC):
    @abstractmethod
    def enqueue(self, task_name: str, story_id: str) -> None: ...

Each use case depends only on these ports. For example, RunDeveloper:

class RunDeveloper:
    def __init__(self, story_repo, runner, queue):
        self._repo = story_repo
        self._runner = runner
        self._queue = queue

    def execute(self, story_id: str) -> None:
        try:
            self._runner.run("developer", _PROMPT.format(story_id=story_id))
            self._repo.save_status(story_id, Status.READY_TO_TEST)
            self._queue.enqueue("run_tester", story_id)
        except Exception as exc:
            self._repo.save_status(story_id, Status.BLOCKED, note=str(exc))
            raise

Notice: the use case doesn't know what CodeRunner is (subprocess? API call? mock?), what StoryRepository stores to (files? database?), or how TaskQueue delivers the next task (Celery? Redis? LangGraph?).

This is the key. The infrastructure is replaceable without touching the domain.
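One concrete payoff: the use cases can be exercised end-to-end with in-memory fakes, no subprocess, no broker, no filesystem. A sketch (the fake classes and the inlined RunDeveloper are illustrative, mirroring the shape above with plain strings for statuses):

```python
class FakeRunner:
    """Records (role, prompt) calls instead of launching an agent."""
    def __init__(self):
        self.calls = []
    def run(self, role: str, prompt: str) -> None:
        self.calls.append((role, prompt))

class FakeRepo:
    """Keeps statuses in a dict instead of files or a database."""
    def __init__(self):
        self.statuses = {}
    def save_status(self, story_id: str, status: str, note: str = "") -> None:
        self.statuses[story_id] = (status, note)

class FakeQueue:
    """Records enqueue calls instead of talking to a broker."""
    def __init__(self):
        self.enqueued = []
    def enqueue(self, task_name: str, story_id: str) -> None:
        self.enqueued.append((task_name, story_id))

class RunDeveloper:
    _PROMPT = "Make the RED tests for {story_id} pass."  # illustrative prompt
    def __init__(self, story_repo, runner, queue):
        self._repo, self._runner, self._queue = story_repo, runner, queue
    def execute(self, story_id: str) -> None:
        try:
            self._runner.run("developer", self._PROMPT.format(story_id=story_id))
            self._repo.save_status(story_id, "ready-to-test")
            self._queue.enqueue("run_tester", story_id)
        except Exception as exc:
            self._repo.save_status(story_id, "blocked", note=str(exc))
            raise

repo, runner, queue = FakeRepo(), FakeRunner(), FakeQueue()
RunDeveloper(repo, runner, queue).execute("US04")
```

Every assertion a test needs is sitting in the fakes afterwards: which agent ran, what status was persisted, what was enqueued.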


The original infrastructure: Celery + Redis

The first implementation used Celery with Redis as the broker. Each role had a dedicated queue. The dispatcher polled every 30 seconds for stories in status INBOX and enqueued the first task.

# infrastructure/celery/tasks.py
@app.task(name="run_developer", queue="ready-to-dev")
def task_developer(project_path: str, story_id: str) -> None:
    repo, runner, queue, notifier = _deps(project_path)
    RunDeveloper(repo, runner, queue).execute(story_id)

This worked. But it had friction:

  • Redis is required even for local development
  • The state machine logic is implicit — split across four separate queue definitions and the enqueue calls inside use cases
  • Retries when a quality gate fails required manual re-enqueue logic scattered across use cases
  • docker compose up just to run a pipeline on your laptop

I wanted the workflow to be explicit. That's where LangGraph came in.


The new infrastructure: LangGraph

LangGraph is a library for building stateful, graph-based workflows. You define nodes (work units) and edges (transitions), compile the graph, and invoke it with an initial state. It handles traversal, conditional routing, and can even do checkpointing.

It's designed for AI agent workflows — and an ATDD pipeline is exactly that.

State

The graph state is a TypedDict — a plain Python dict with type annotations:

from typing import TypedDict, Optional
from atdd_orchestrator.domain.story import Status

class PipelineState(TypedDict):
    story_id: str
    project_path: str
    status: Status
    blocked_reason: Optional[str]
    dev_retries: int

dev_retries is the key addition over the Celery implementation. It's the counter that prevents infinite loops when the quality gate or acceptance tests keep failing.
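The routing functions only read dev_retries, so somewhere a node has to bump it. Where that happens isn't shown in this article; one plausible shape (an assumption, not the repo's actual code) is a small state update applied when the graph loops back to the developer:

```python
def record_retry(state: dict) -> dict:
    # Hypothetical helper: return a new state dict rather than mutating,
    # since LangGraph nodes communicate via returned state updates.
    return {**state, "dev_retries": state["dev_retries"] + 1}

state = {"story_id": "US04", "dev_retries": 0}
state = record_retry(state)
```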

Nodes

Each node calls the existing use case with one important change: it receives a NoOpQueue instead of a real queue adapter. The graph handles routing — the use cases don't need to know what comes next.

class _NoOpQueue(TaskQueue):
    def enqueue(self, task_name: str, story_id: str) -> None:
        pass  # routing is the graph's job

After executing the use case, the node reads the current status back from the repository (the use case already persisted it), and returns the updated state:

def developer_node(state: PipelineState) -> PipelineState:
    repo, runner, queue, notifier = _deps(state["project_path"])
    try:
        RunDeveloper(repo, runner, queue).execute(state["story_id"])
    except Exception:
        pass  # use case already saved BLOCKED to the repo

    status, reason = _read_status(state["project_path"], state["story_id"])
    return {**state, "status": status, "blocked_reason": reason}

The node is thin. It wires dependencies, delegates to the use case, reads the result, and returns. No business logic lives here.

Graph with conditional edges

This is where LangGraph pays off. The state machine that was previously implicit becomes explicit Python code:

from langgraph.graph import StateGraph, END

MAX_DEV_RETRIES = 3

def _route_after_test_engineer(state: PipelineState) -> str:
    return "developer" if state["status"] == Status.READY_TO_DEV else END

def _route_after_developer(state: PipelineState) -> str:
    return "tester" if state["status"] == Status.READY_TO_TEST else END

def _route_after_tester(state: PipelineState) -> str:
    if state["status"] == Status.READY_TO_ATF:
        return "atf"
    if state["status"] == Status.READY_TO_DEV:
        if state["dev_retries"] < MAX_DEV_RETRIES:
            return "developer"   # quality gate failed → retry
    return END                   # blocked or retries exhausted

def _route_after_atf(state: PipelineState) -> str:
    if state["status"] == Status.DONE:
        return END
    if state["status"] == Status.READY_TO_DEV:
        if state["dev_retries"] < MAX_DEV_RETRIES:
            return "developer"   # acceptance failed → retry
    return END

def build_pipeline():
    g = StateGraph(PipelineState)

    g.add_node("test_engineer", test_engineer_node)
    g.add_node("developer",     developer_node)
    g.add_node("tester",        tester_node)
    g.add_node("atf",           atf_node)

    g.set_entry_point("test_engineer")

    g.add_conditional_edges("test_engineer", _route_after_test_engineer,
        {"developer": "developer", END: END})
    g.add_conditional_edges("developer",     _route_after_developer,
        {"tester": "tester",       END: END})
    g.add_conditional_edges("tester",        _route_after_tester,
        {"atf": "atf", "developer": "developer", END: END})
    g.add_conditional_edges("atf",           _route_after_atf,
        {"developer": "developer", END: END})

    return g.compile()

Reading this, you immediately understand the full lifecycle of a story:

test_engineer
  → READY_TO_DEV   → developer
  → BLOCKED        → END

developer
  → READY_TO_TEST  → tester
  → BLOCKED        → END

tester
  → READY_TO_ATF   → atf
  → READY_TO_DEV   → developer (retry, up to 3×)
  → else           → END

atf
  → DONE           → END
  → READY_TO_DEV   → developer (retry, up to 3×)
  → else           → END

The retry logic, the blocking conditions, the terminal states — all visible, all in one place.
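And since the routers are plain functions over the state dict, the retry behavior can be sanity-checked without compiling a graph at all. A quick illustration with simplified string statuses (END here stands in for LangGraph's sentinel):

```python
END = "__end__"          # stand-in for langgraph.graph.END
MAX_DEV_RETRIES = 3

def route_after_tester(state: dict) -> str:
    if state["status"] == "ready-to-atf":
        return "atf"                      # gate passed, on to acceptance tests
    if state["status"] == "ready-to-dev" and state["dev_retries"] < MAX_DEV_RETRIES:
        return "developer"                # gate failed, retry
    return END                            # blocked or retries exhausted
```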

Dispatcher

With LangGraph, the dispatcher becomes trivial:

def run(project_path: str) -> None:
    pipeline = build_pipeline()

    while True:
        repo = FrontmatterStoryRepository(project_path)
        for story_id in repo.find_by_status(Status.INBOX):
            initial_state = {
                "story_id": story_id,
                "project_path": project_path,
                "status": Status.INBOX,
                "blocked_reason": None,
                "dev_retries": 0,
            }
            final_state = pipeline.invoke(initial_state)
            log.info("Pipeline done — story: %s | final status: %s",
                     story_id, final_state["status"])

        time.sleep(POLL_INTERVAL)

No broker, no worker process, no docker compose up. Just:

pip install -e ".[langgraph]"
python dispatcher_langgraph.py /path/to/your/project

The design decisions worth discussing

Why is the state stored in files, not a database?

Each user story's state lives in a story.md file with YAML frontmatter:

---
id: US04
title: User can reset their password
status: in-progress:ready-to-test
sprint: sprint_02
---

Keeping state in the repository means:

  • Any agent, any tool, any human can read and update it with a text editor
  • The state survives restarts with no migration
  • Git history shows every state transition
  • The orchestrator doesn't own the state — the project does
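With that format, `save_status` on the frontmatter repository reduces to rewriting one line of the file. A sketch of that update (illustrative, not the repo's actual implementation):

```python
def update_frontmatter_status(text: str, new_status: str) -> str:
    """Rewrite the `status:` line inside the ----delimited frontmatter,
    leaving the rest of the story file untouched."""
    out, fences = [], 0
    for line in text.splitlines():
        if line.strip() == "---" and fences < 2:
            fences += 1
        elif fences == 1 and line.startswith("status:"):
            line = f"status: {new_status}"
        out.append(line)
    return "\n".join(out) + "\n"

story = "---\nid: US04\nstatus: inbox\n---\n\n# User can reset their password\n"
story = update_frontmatter_status(story, "in-progress:ready-to-dev")
```

Every call produces a clean one-line diff, which is exactly why the Git history reads as a transition log.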

Why is the Architect role never automated?

The architect (human + Claude) defines scope, acceptance criteria, and what the story means. That judgment stays human-controlled.

Automating that step is how you end up building the wrong thing perfectly. The spec is the contract — someone has to own it.

Why keep Celery?

LangGraph is a better fit for local development and single-machine setups. Celery is better when workers need to run on separate machines or when you need horizontal scaling.

Both share the same domain and the same use cases. The difference is purely in which infrastructure adapter you wire in. That's hexagonal architecture earning its cost.

The NoOpQueue pattern

This is worth calling out explicitly. The use cases currently call self._queue.enqueue("run_tester", story_id) as a side effect of their execution. In the LangGraph world, that call is meaningless — the graph decides what runs next.

Rather than modifying the use cases (which would break the Celery flow), we pass a NoOpQueue that swallows the enqueue calls silently. The use case's core behavior — running the agent, saving the status, raising on failure — is unchanged. Only the side effect is suppressed.

This is dependency injection doing exactly what it's supposed to do.


What I tested on a real project

The first project running through this pipeline is atf-ai — a CLI tool with Playwright-based acceptance tests. Five user stories, end-to-end:

Story                                Status
US01 — Scaffolding CLI               blocked (1 failing scenario, known issue)
US02 — Docker Runner                 accepted
US03 — Screenplay Actors & Steps     accepted
US04 — Feedback & State Tracking     accepted
US05 — Reports Pipeline & PyPI       accepted

Four out of five stories went from ready to accepted autonomously. The fifth is blocked on a known state mismatch that requires architectural review — exactly the kind of thing that should block, not silently pass.


What's next

A few things on the backlog:

  • LangGraph checkpointing — persist graph state to disk so a pipeline can resume after a crash without re-running completed stages
  • Parallel stories — run multiple stories concurrently using asyncio and LangGraph's async support
  • Observability — emit structured events at each node transition for tracing and debugging
  • Self-healing architect — when a story is blocked, trigger a Claude session to diagnose and propose a fix

Repository

The full source is open:

github.com/csotelo/atdd-framework

atdd_orchestrator/
├── domain/          # Story, Status, ports — pure Python
├── application/     # use cases — depend only on domain
└── infrastructure/
    ├── celery/      # Celery + Redis adapter
    └── langgraph/   # LangGraph adapter (new)

35 tests, 0 failures. No Redis, no OpenCode, no network required to run the test suite — all ports are stubbed.


Final thought

The thing that surprised me most about this project wasn't the AI part. It was how much cleaner the orchestration became once I made the state machine explicit.

With Celery, the workflow lived in queue names, routing keys, and scattered enqueue() calls inside use cases. You had to read four files to understand what happened after a test failure.

With LangGraph, the entire lifecycle is in one function. The retry logic, the terminal conditions, the branching paths — all visible at a glance. That's not a small thing when you're the only engineer on the project and you come back to the code three weeks later.

If you're building multi-agent pipelines, make the state machine explicit. Your future self will thank you.


Questions, issues, or contributions: github.com/csotelo/atdd-framework
