The agent loop had a max iteration count. The engineers were proud of this. "We capped it at 50 iterations," they said. "It can't run forever."
Then they got billed $200 for one hour of compute. The agent had hit exactly 49 iterations — just under the cap — on every single task in the overnight batch. Each iteration called a subagent. The 50-iteration cap was on the outer loop, not the inner one. The inner loops ran unconstrained.
A single stop condition is not enough. You need composable conditions that cover different failure modes independently.
The Shape of the Fix
from llm_stop_conditions import StopEvaluator, MaxIters, MaxUsd, MaxTokens, NoProgress
evaluator = StopEvaluator([
MaxIters(25),
MaxUsd(2.00, model="claude-sonnet-4-6"),
MaxTokens(50_000),
NoProgress(patience=3),
])
state = evaluator.initial_state()
while True:
response = call_llm(messages)
state = evaluator.update(state, response)
should_stop, reason = evaluator.check(state)
if should_stop:
print(f"Stopping because: {reason}")
break
if response.stop_reason == "end_turn":
break
Four conditions. Any one can stop the loop. The reason tells you which one fired. should_stop is a bool. reason is a string for logging.
What It Does NOT Do
llm-stop-conditions does not call the LLM. It evaluates the responses you pass to it. You still control the actual LLM call.
It does not integrate with circuit breakers or deadlines. For wall-clock time limits, use agent-deadline. For circuit breaking per provider, use llm-circuit-breaker-py. Stop conditions are about loop semantics (iterations, cost, tokens, progress), not about time or availability.
It does not guarantee that cost estimates are accurate. MaxUsd uses the same price table as llm-cost-cap. Estimates can be off if the model returns fewer tokens than expected or if cache hits reduce actual cost.
Inside the Library
Each condition is a class with two methods: update(state, response) -> state and check(state) -> (bool, str).
class MaxIters:
def __init__(self, max_iters: int):
self.max_iters = max_iters
def update(self, state: dict, response) -> dict:
return {**state, "iters": state.get("iters", 0) + 1}
def check(self, state: dict) -> tuple[bool, str]:
if state.get("iters", 0) >= self.max_iters:
return True, f"MaxIters: {state['iters']} >= {self.max_iters}"
return False, ""
StopEvaluator composes them: update() calls each condition's update and merges state, check() calls each condition's check and returns at the first True.
NoProgress is the most complex. It tracks whether recent iterations produced meaningful work:
class NoProgress:
def __init__(self, patience: int = 3):
self.patience = patience
def update(self, state: dict, response) -> dict:
has_tool_call = any(b.type == "tool_use" for b in response.content)
streak = state.get("no_progress_streak", 0)
return {**state, "no_progress_streak": 0 if has_tool_call else streak + 1}
def check(self, state: dict) -> tuple[bool, str]:
streak = state.get("no_progress_streak", 0)
if streak >= self.patience:
return True, f"NoProgress: {streak} consecutive iterations with no tool calls"
return False, ""
Custom conditions: subclass StopCondition and implement update() and check(). Pass instances to StopEvaluator.
The 29 tests cover each condition independently, evaluator composition (first condition to fire wins), NoProgress streak tracking, MaxUsd with model price lookup, and custom condition integration.
When to Use It
Use it in any agent loop where the stopping criteria is not purely "the model said end_turn." That is almost every production loop.
The conditions match specific failure modes:
-
MaxIters: prevents loops that ignoreend_turn -
MaxUsd: prevents runaway cost from nested or recursive agents -
MaxTokens: prevents context overflow before the provider enforces it -
NoProgress: detects stuck agents that keep calling the LLM without making progress
Pick the conditions that match your actual risk surface. A simple task with bounded scope might only need MaxIters(10). A multi-step research agent needs all four.
Install
pip install git+https://github.com/MukundaKatta/llm-stop-conditions
from llm_stop_conditions import StopEvaluator, MaxIters, MaxUsd, MaxTokens, NoProgress, StopCondition
# Custom condition: stop if response contains an error message
class StopOnError(StopCondition):
def update(self, state: dict, response) -> dict:
has_error = any("error" in b.text.lower()
for b in response.content
if hasattr(b, "text"))
return {**state, "saw_error": has_error}
def check(self, state: dict) -> tuple[bool, str]:
if state.get("saw_error"):
return True, "StopOnError: response contained error message"
return False, ""
evaluator = StopEvaluator([
MaxIters(20),
MaxUsd(1.00, model="claude-sonnet-4-6"),
NoProgress(patience=2),
StopOnError(),
])
Sibling Libraries
| Library | What it solves |
|---|---|
agent-deadline |
Wall-clock time limit (monotonic clock) |
agent-loop-bound |
Simpler single max-iteration cap |
llm-cost-cap |
Pre-flight cost check before each call |
token-budget-pool |
Shared budget across concurrent agents |
agentsnap |
Record actual usage for drift monitoring |
The three-layer stop stack: llm-stop-conditions for loop semantics, agent-deadline for wall-clock, llm-cost-cap pre-flight. Together they cover iteration count, cost, tokens, progress, and time without any single point of failure.
What's Next
A FirstN condition that stops after the first N tool calls (not iterations) would be useful for agents with very expensive tools. Right now MaxIters counts loop iterations; some loops do multiple tool calls per iteration.
Async support: StopEvaluator.update() is synchronous. If your response parsing is async, an async_update() variant would fit naturally.
Serializable state: right now state is a plain dict, which is serializable by default. But documenting and testing this explicitly would help for checkpoint/resume scenarios where you need to persist the evaluator state across crashes.
Built as part of the agent-stack family: composable Python primitives for production LLM agents.
Top comments (0)