When I first started building production AI agents, I was focused on the wrong thing. I spent weeks perfecting the prompt, the tool routing, the memory system. What I didn't build was an off switch.
Six months later, one of those agents went into a retry loop at 2 AM and racked up $340 in API costs before I woke up to 47 Slack alerts. The agent wasn't broken -- it was doing exactly what it was designed to do, just in a situation I hadn't anticipated. That's the thing about agents: they're very good at pursuing their objective. The problem is when the objective is wrong, the environment is unexpected, or the costs are real.
Every production agent needs a kill switch. Several, actually.
## The Real Failure Modes
Before we talk about solutions, let's be specific about what we're protecting against.
**Infinite retry loops.** An agent that hits a transient API error and retries forever. Or one that misinterprets a "task not complete" signal as a reason to keep trying. Without a hard iteration ceiling, this is a matter of when, not if.
**Runaway costs.** LLM calls are not free. An agent processing a malformed input that causes it to spawn sub-agents, which spawn more sub-agents, can hit $1,000 in costs before a human notices. Token counting at the agent level is something most teams skip until they get the bill.
**Unintended side effects.** An agent with write permissions to a database, email sender access, or file system access can cause irreversible damage. If the agent logic has a bug, the damage compounds with every step it takes before you catch it.
**Silent degradation.** Some agents don't fail loudly -- they just start producing wrong outputs quietly. Without output validation, you might not notice for hours or days.
These are not edge cases. They're the default trajectory of agents running without guardrails.
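The retry-loop failure in particular is cheap to bound even before any kill switch exists: wrap every external call in a helper with a hard attempt ceiling and backoff. A minimal sketch (`call_with_retry` and `RetryExhausted` are illustrative names, not part of any framework):

```python
import time

class RetryExhausted(Exception):
    pass

def call_with_retry(fn, max_attempts: int = 5, base_delay: float = 0.5):
    """Run fn(), retrying transient failures up to a hard ceiling."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as e:
            if attempt == max_attempts:
                raise RetryExhausted(f"gave up after {max_attempts} attempts") from e
            # Exponential backoff: 0.5s, 1s, 2s, ... keeps the loop from spinning hot
            time.sleep(base_delay * 2 ** (attempt - 1))
```

The ceiling turns "retries forever at 2 AM" into a loud, bounded failure you can alert on.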
## Three Types of Kill Switches
Think of kill switches in layers: hard, soft, and budget.
### 1. Hard Stop (Process Kill)
The nuclear option. Use this when the agent needs to stop immediately, regardless of state. The pattern is simple: a shared flag that any part of the system can flip.
```python
import threading
import signal

class HardStop:
    def __init__(self):
        self._stop_event = threading.Event()
        # Register signal handlers so Ctrl+C and SIGTERM work too
        signal.signal(signal.SIGINT, self._handle_signal)
        signal.signal(signal.SIGTERM, self._handle_signal)

    def _handle_signal(self, signum, frame):
        print(f"Signal {signum} received, stopping agent...")
        self.trigger()

    def trigger(self, reason: str = "manual stop"):
        print(f"HARD STOP triggered: {reason}")
        self._stop_event.set()

    def should_stop(self) -> bool:
        return self._stop_event.is_set()

    def check_or_raise(self):
        if self.should_stop():
            raise SystemExit("Agent stopped by kill switch")

# Usage in your agent loop
kill_switch = HardStop()

def run_agent(task: str, kill_switch: HardStop):
    for step in range(100):  # iteration ceiling
        kill_switch.check_or_raise()
        result = execute_step(task, step)
        if result.is_complete:
            return result
    raise RuntimeError("Exceeded max iterations without completing")
```
For TypeScript, the pattern is similar:
```typescript
class HardStop {
  private stopped = false;
  private reason = "";

  constructor() {
    process.on("SIGINT", () => this.trigger("SIGINT received"));
    process.on("SIGTERM", () => this.trigger("SIGTERM received"));
  }

  trigger(reason: string): void {
    console.log(`HARD STOP: ${reason}`);
    this.stopped = true;
    this.reason = reason;
  }

  checkOrThrow(): void {
    if (this.stopped) {
      throw new Error(`Agent stopped: ${this.reason}`);
    }
  }

  get isTriggered(): boolean {
    return this.stopped;
  }
}
```
The key discipline: call `check_or_raise()` at the top of every loop iteration and before every external call. Not just once at the start.
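In practice the trigger usually comes from outside the loop: a monitoring thread, an ops endpoint, an on-call human. A sketch of the cross-thread shape, using a simplified flag (no signal handlers, so it is safe to construct anywhere; `agent_loop` is a stand-in for real work):

```python
import threading
import time

class HardStopFlag:
    """Simplified flag version of HardStop: any thread can flip it."""
    def __init__(self):
        self._stop = threading.Event()

    def trigger(self):
        self._stop.set()

    def should_stop(self) -> bool:
        return self._stop.is_set()

def agent_loop(flag: HardStopFlag, max_steps: int = 1000) -> int:
    steps = 0
    for _ in range(max_steps):
        if flag.should_stop():  # checked at the top of every iteration
            break
        time.sleep(0.01)  # stand-in for a real agent step
        steps += 1
    return steps
```

Because `threading.Event` is thread-safe, a watchdog thread, an HTTP handler, or a CLI command can all call `trigger()` without coordination.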
### 2. Soft Stop (Graceful Shutdown with State Save)
Hard stops are useful, but often you want the agent to finish what it's doing, save its state, and exit cleanly. This matters when the agent has made partial progress you want to resume later.
```python
import json
from dataclasses import dataclass, asdict
from pathlib import Path

@dataclass
class AgentState:
    task_id: str
    current_step: int
    completed_steps: list
    partial_results: dict
    stop_reason: str = ""

class SoftStop:
    def __init__(self, state_path: str):
        self.state_path = Path(state_path)
        self._graceful_stop = False

    def request_stop(self, reason: str = "graceful shutdown"):
        print(f"Graceful stop requested: {reason}")
        self._graceful_stop = True

    def should_stop_after_current_step(self) -> bool:
        return self._graceful_stop

    def save_and_exit(self, state: AgentState, reason: str):
        state.stop_reason = reason
        with open(self.state_path, "w") as f:
            json.dump(asdict(state), f, indent=2)
        print(f"State saved to {self.state_path}. Resume with task_id={state.task_id}")

    def load_state(self) -> AgentState | None:
        if not self.state_path.exists():
            return None
        with open(self.state_path) as f:
            data = json.load(f)
        return AgentState(**data)

def run_resumable_agent(task_id: str, soft_stop: SoftStop):
    # Try to resume from previous state
    state = soft_stop.load_state() or AgentState(
        task_id=task_id,
        current_step=0,
        completed_steps=[],
        partial_results={}
    )
    try:
        for step in range(state.current_step, 100):
            result = execute_step(task_id, step)
            state.completed_steps.append(step)
            state.partial_results[str(step)] = result  # keep results so a resume loses nothing
            state.current_step = step + 1
            # Check for graceful stop after each step completes
            if soft_stop.should_stop_after_current_step():
                soft_stop.save_and_exit(state, "graceful shutdown requested")
                return None
        return state.partial_results
    except Exception as e:
        soft_stop.save_and_exit(state, f"error: {e}")
        raise
```
The soft stop is what you want for most planned maintenance scenarios: deploying a new version, pausing the agent during off-hours, or scaling down temporarily.
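Wiring this into a deploy is mostly a matter of catching SIGTERM (which orchestrators such as Kubernetes send before killing a pod) and flipping the graceful-stop flag instead of dying mid-step. A sketch with a simplified flag (`SoftStopFlag` is illustrative; in the real setup you'd call `soft_stop.request_stop` from the handler):

```python
import signal

class SoftStopFlag:
    """Simplified stand-in for SoftStop: SIGTERM requests a graceful
    stop so the agent finishes its current step and saves state."""
    def __init__(self):
        self.stop_requested = False
        signal.signal(signal.SIGTERM, self._on_sigterm)

    def _on_sigterm(self, signum, frame):
        print("SIGTERM received, will stop after the current step")
        self.stop_requested = True
```

Pair this with a terminationGracePeriod longer than your longest step, and deploys stop being a source of corrupted state.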
### 3. Budget Kill (Cost Ceiling)
This is the one people forget until they get the bill. Track your costs and stop automatically when you hit a ceiling.
```python
from dataclasses import dataclass, field

class BudgetExceeded(Exception):
    pass

@dataclass
class BudgetTracker:
    max_cost_usd: float
    model: str = "gpt-4o"
    _total_input_tokens: int = field(default=0, init=False)
    _total_output_tokens: int = field(default=0, init=False)

    # Approximate costs per 1K tokens (update as pricing changes)
    INPUT_COST_PER_1K = {
        "gpt-4o": 0.0025,
        "gpt-4o-mini": 0.00015,
        "claude-3-5-sonnet": 0.003,
    }
    OUTPUT_COST_PER_1K = {
        "gpt-4o": 0.01,
        "gpt-4o-mini": 0.0006,
        "claude-3-5-sonnet": 0.015,
    }

    def record_call(self, input_tokens: int, output_tokens: int):
        self._total_input_tokens += input_tokens
        self._total_output_tokens += output_tokens
        if self.current_cost_usd >= self.max_cost_usd:
            raise BudgetExceeded(
                f"Budget exceeded: ${self.current_cost_usd:.4f} >= ${self.max_cost_usd}"
            )

    @property
    def current_cost_usd(self) -> float:
        input_rate = self.INPUT_COST_PER_1K.get(self.model, 0.003)
        output_rate = self.OUTPUT_COST_PER_1K.get(self.model, 0.015)
        return (
            self._total_input_tokens / 1000 * input_rate +
            self._total_output_tokens / 1000 * output_rate
        )

# Wrap your LLM calls
def llm_call_with_budget(prompt: str, budget: BudgetTracker) -> str:
    response = call_llm(prompt)
    budget.record_call(
        input_tokens=response.usage.input_tokens,
        output_tokens=response.usage.output_tokens
    )
    return response.content
```
Set the budget ceiling conservatively at first. You can always raise it. You can't un-spend money.
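One refinement worth adding (not in the tracker above): a soft warning threshold below the hard ceiling, so you get paged before the kill fires rather than after. A sketch with illustrative thresholds:

```python
class WarningBudget:
    """Two-stage ceiling: warn at a fraction of the budget,
    raise only at the hard limit. Thresholds are illustrative."""
    def __init__(self, max_cost_usd: float, warn_fraction: float = 0.8):
        self.max_cost_usd = max_cost_usd
        self.warn_at = max_cost_usd * warn_fraction
        self.spent = 0.0
        self.warned = False

    def record(self, cost_usd: float):
        self.spent += cost_usd
        if self.spent >= self.max_cost_usd:
            raise RuntimeError(f"budget exceeded: ${self.spent:.2f}")
        if self.spent >= self.warn_at and not self.warned:
            self.warned = True  # alert once, don't spam
            print(f"WARNING: {self.spent / self.max_cost_usd:.0%} of budget used")
```

The warning gives a human time to decide whether the spend is legitimate before the hard stop interrupts real work.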
## Monitoring Patterns
Kill switches only help if you can detect when to pull them. Two monitoring patterns that work in production:
**Heartbeat checks.** The agent emits a heartbeat every N seconds. An external watchdog checks for the heartbeat. If it stops, the watchdog kills the agent and alerts.
```python
import os
import signal
import time
import threading

class AgentHeartbeat:
    def __init__(self, interval_seconds: int = 30, timeout_seconds: int = 90):
        self.interval = interval_seconds
        self.timeout = timeout_seconds
        self._last_beat = time.time()
        self._start_watchdog()

    def beat(self):
        self._last_beat = time.time()

    def _start_watchdog(self):
        def watch():
            while True:
                time.sleep(self.interval)
                age = time.time() - self._last_beat
                if age > self.timeout:
                    print(f"WATCHDOG: heartbeat timeout ({age:.0f}s), stopping agent")
                    # Raising here would only kill this daemon thread;
                    # signal the process so the whole agent actually stops.
                    os.kill(os.getpid(), signal.SIGTERM)
                    return
        thread = threading.Thread(target=watch, daemon=True)
        thread.start()
```
**Output validation.** Before acting on agent output, validate it matches your expected schema. This catches silent degradation early.
```python
from pydantic import BaseModel, ValidationError

class AgentOutput(BaseModel):
    action: str
    target: str
    confidence: float

    class Config:
        extra = "forbid"  # No undeclared fields

def validated_agent_step(raw_output: dict, kill_switch: HardStop) -> AgentOutput:
    try:
        return AgentOutput(**raw_output)
    except ValidationError as e:
        print(f"Output validation failed: {e}")
        kill_switch.trigger("output schema violation")
        raise
```
## The Dead Man's Switch Pattern
This is the most underused pattern in agent design. Instead of the agent running until something tells it to stop, the agent must actively confirm at each checkpoint that it should keep running.
The framing: by default, the agent should stop. It only continues if it explicitly checks in.
```python
class DeadManSwitch:
    def __init__(self, confirmation_interval_steps: int = 10):
        self.interval = confirmation_interval_steps
        self._last_confirmed_step = 0
        self._confirmation_fn = None

    def set_confirmation_fn(self, fn):
        """fn receives current step, returns True to continue, False to stop"""
        self._confirmation_fn = fn

    def check_in(self, current_step: int) -> bool:
        if current_step - self._last_confirmed_step < self.interval:
            return True
        if self._confirmation_fn is None:
            return False  # No confirmation function = stop
        should_continue = self._confirmation_fn(current_step)
        if should_continue:
            self._last_confirmed_step = current_step
        return should_continue

# Example: agent confirms based on progress metrics
def run_with_dead_man_switch(task: str):
    dms = DeadManSwitch(confirmation_interval_steps=5)

    def confirm_continuation(step: int) -> bool:
        progress = measure_progress()
        if progress.rate_per_step < 0.01:
            print(f"Stopping at step {step}: no progress ({progress.rate_per_step:.3f}/step)")
            return False
        return True

    dms.set_confirmation_fn(confirm_continuation)
    for step in range(1000):
        if not dms.check_in(step):
            break
        execute_step(task, step)
```
The key insight: the default state is "stopped." The agent earns the right to continue each time.
## Multi-Agent Kill Switches
When agents spawn sub-agents, your kill switch architecture needs to propagate. A parent stopping should cascade to all children. Without this, you get orphaned sub-agents running after the parent is gone.
```python
import asyncio
from asyncio import Event

class CascadingKillSwitch:
    def __init__(self, parent: "CascadingKillSwitch | None" = None):
        self._stop = Event()
        self._children: list["CascadingKillSwitch"] = []
        if parent:
            parent._children.append(self)

    def spawn_child(self) -> "CascadingKillSwitch":
        return CascadingKillSwitch(parent=self)

    def trigger(self, reason: str = ""):
        if not self._stop.is_set():
            print(f"Kill switch triggered: {reason}")
            self._stop.set()
            for child in self._children:
                child.trigger(f"cascade from parent: {reason}")

    @property
    def is_triggered(self) -> bool:
        return self._stop.is_set()

async def run_multi_agent_task(task: str):
    root_switch = CascadingKillSwitch()
    child1_switch = root_switch.spawn_child()
    child2_switch = root_switch.spawn_child()

    # If root is triggered, all children stop automatically
    results = await asyncio.gather(
        run_sub_agent("subtask_a", child1_switch),
        run_sub_agent("subtask_b", child2_switch),
    )
    return results
```
One timeout on the root switch, and everything underneath stops. No cleanup code scattered across agents.
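That root timeout can itself be a plain asyncio task that fires the trigger. A self-contained sketch, with a pared-down switch class and a stand-in work loop (all names here are illustrative):

```python
import asyncio

class CascadeSwitch:
    """Pared-down cascading switch, enough to demo the root timeout."""
    def __init__(self, parent: "CascadeSwitch | None" = None):
        self.triggered = False
        self.children: list["CascadeSwitch"] = []
        if parent:
            parent.children.append(self)

    def trigger(self):
        self.triggered = True
        for child in self.children:
            child.trigger()

async def run_with_root_timeout(timeout_s: float) -> int:
    root = CascadeSwitch()
    child = CascadeSwitch(parent=root)

    async def watchdog():
        # One timer on the root; the trigger cascades to every child
        await asyncio.sleep(timeout_s)
        root.trigger()

    async def sub_agent(switch: CascadeSwitch) -> int:
        steps = 0
        while not switch.triggered:
            await asyncio.sleep(0.01)  # stand-in for real agent work
            steps += 1
        return steps

    timer = asyncio.create_task(watchdog())
    result = await sub_agent(child)
    timer.cancel()
    return result
```

The sub-agent never references the root; it only watches its own switch, which is what keeps the cleanup logic out of the agents themselves.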
## Putting It Together
In production, you use all three kill switch types together, plus monitoring. A minimal production agent setup:
```python
async def production_agent(task_id: str, config: AgentConfig):
    hard_stop = HardStop()
    soft_stop = SoftStop(f"/var/agent_state/{task_id}.json")
    budget = BudgetTracker(max_cost_usd=config.max_cost_usd)
    heartbeat = AgentHeartbeat(interval_seconds=30)
    dms = DeadManSwitch(confirmation_interval_steps=10)

    state = soft_stop.load_state() or AgentState(task_id=task_id, ...)
    try:
        for step in range(config.max_steps):
            hard_stop.check_or_raise()
            heartbeat.beat()
            if soft_stop.should_stop_after_current_step():
                soft_stop.save_and_exit(state, "graceful stop")
                return None
            if not dms.check_in(step):
                soft_stop.save_and_exit(state, "dead man's switch triggered")
                return None
            output = llm_call_with_budget(build_prompt(state), budget)
            validated = validated_agent_step(output, hard_stop)
            state = apply_step(state, validated)
        return state.results
    except (BudgetExceeded, SystemExit) as e:
        soft_stop.save_and_exit(state, str(e))
        raise
```
None of this is clever. It's defense in depth. Each layer catches a different failure mode, and together they mean no single bug causes a runaway.
The $340 lesson I mentioned at the start cost less than a contractor's hourly rate. But the real cost was trust in a system that was supposed to be reliable. Kill switches are how you earn that trust back.
I build production AI systems for companies. If your agents need guardrails, I can help. astraedus.dev