DEV Community

Diven Rastdus
Diven Rastdus

Posted on

Why Your AI Agent Needs a Kill Switch (and How to Build One)

When I first started building production AI agents, I was focused on the wrong thing. I spent weeks perfecting the prompt, the tool routing, the memory system. What I didn't build was an off switch.

Six months later, one of those agents went into a retry loop at 2 AM and racked up $340 in API costs before I woke up to 47 Slack alerts. The agent wasn't broken -- it was doing exactly what it was designed to do, just in a situation I hadn't anticipated. That's the thing about agents: they're very good at pursuing their objective. The problem is when the objective is wrong, the environment is unexpected, or the costs are real.

Every production agent needs a kill switch. Several, actually.

The Real Failure Modes

Before we talk about solutions, let's be specific about what we're protecting against.

Infinite retry loops. An agent that hits a transient API error and retries forever. Or one that misinterprets a "task not complete" signal as a reason to keep trying. Without a hard iteration ceiling, this is a matter of when, not if.

Runaway costs. LLM calls are not free. An agent processing a malformed input that causes it to spawn sub-agents, which spawn more sub-agents, can hit $1,000 in costs before a human notices. Token counting at the agent level is something most teams skip until they get the bill.

Unintended side effects. An agent with write permissions to a database, email sender access, or file system access can cause irreversible damage. If the agent logic has a bug, the damage compounds with every step it takes before you catch it.

Silent degradation. Some agents don't fail loudly -- they just start producing wrong outputs quietly. Without output validation, you might not notice for hours or days.

These are not edge cases. They're the default trajectory of agents running without guardrails.

Three Types of Kill Switches

Think of kill switches in layers: hard, soft, and budget.

1. Hard Stop (Process Kill)

The nuclear option. Use this when the agent needs to stop immediately, regardless of state. The pattern is simple: a shared flag that any part of the system can flip.

import threading
import signal
import time
from typing import Callable

class HardStop:
    def __init__(self):
        self._stop_event = threading.Event()
        # Register signal handlers so Ctrl+C and SIGTERM work too
        signal.signal(signal.SIGINT, self._handle_signal)
        signal.signal(signal.SIGTERM, self._handle_signal)

    def _handle_signal(self, signum, frame):
        print(f"Signal {signum} received, stopping agent...")
        self.trigger()

    def trigger(self, reason: str = "manual stop"):
        print(f"HARD STOP triggered: {reason}")
        self._stop_event.set()

    def should_stop(self) -> bool:
        return self._stop_event.is_set()

    def check_or_raise(self):
        if self.should_stop():
            raise SystemExit("Agent stopped by kill switch")


# Usage in your agent loop
kill_switch = HardStop()

def run_agent(task: str, kill_switch: HardStop):
    for step in range(100):  # iteration ceiling
        kill_switch.check_or_raise()

        result = execute_step(task, step)

        if result.is_complete:
            return result

    raise RuntimeError("Exceeded max iterations without completing")
Enter fullscreen mode Exit fullscreen mode

For TypeScript, the pattern is similar:

class HardStop {
  private stopped = false;
  private reason = "";

  constructor() {
    process.on("SIGINT", () => this.trigger("SIGINT received"));
    process.on("SIGTERM", () => this.trigger("SIGTERM received"));
  }

  trigger(reason: string): void {
    console.log(`HARD STOP: ${reason}`);
    this.stopped = true;
    this.reason = reason;
  }

  checkOrThrow(): void {
    if (this.stopped) {
      throw new Error(`Agent stopped: ${this.reason}`);
    }
  }

  get isTriggered(): boolean {
    return this.stopped;
  }
}
Enter fullscreen mode Exit fullscreen mode

The key discipline: call check_or_raise() at the top of every loop iteration and before every external call. Not just once at the start.

2. Soft Stop (Graceful Shutdown with State Save)

Hard stops are useful, but often you want the agent to finish what it's doing, save its state, and exit cleanly. This matters when the agent has made partial progress you want to resume later.

import json
from dataclasses import dataclass, asdict
from pathlib import Path

@dataclass
class AgentState:
    task_id: str
    current_step: int
    completed_steps: list
    partial_results: dict
    stop_reason: str = ""

class SoftStop:
    def __init__(self, state_path: str):
        self.state_path = Path(state_path)
        self._graceful_stop = False

    def request_stop(self, reason: str = "graceful shutdown"):
        print(f"Graceful stop requested: {reason}")
        self._graceful_stop = True

    def should_stop_after_current_step(self) -> bool:
        return self._graceful_stop

    def save_and_exit(self, state: AgentState, reason: str):
        state.stop_reason = reason
        with open(self.state_path, "w") as f:
            json.dump(asdict(state), f, indent=2)
        print(f"State saved to {self.state_path}. Resume with task_id={state.task_id}")

    def load_state(self) -> AgentState | None:
        if not self.state_path.exists():
            return None
        with open(self.state_path) as f:
            data = json.load(f)
        return AgentState(**data)


def run_resumable_agent(task_id: str, soft_stop: SoftStop):
    # Try to resume from previous state
    state = soft_stop.load_state() or AgentState(
        task_id=task_id,
        current_step=0,
        completed_steps=[],
        partial_results={}
    )

    try:
        for step in range(state.current_step, 100):
            result = execute_step(task_id, step)
            state.completed_steps.append(step)
            state.current_step = step + 1

            # Check for graceful stop after each step completes
            if soft_stop.should_stop_after_current_step():
                soft_stop.save_and_exit(state, "graceful shutdown requested")
                return None

        return state.partial_results

    except Exception as e:
        soft_stop.save_and_exit(state, f"error: {e}")
        raise
Enter fullscreen mode Exit fullscreen mode

The soft stop is what you want for most planned maintenance scenarios: deploying a new version, pausing the agent during off-hours, or scaling down temporarily.

3. Budget Kill (Cost Ceiling)

This is the one people forget until they get the bill. Track your costs and stop automatically when you hit a ceiling.

from dataclasses import dataclass, field

@dataclass
class BudgetTracker:
    max_cost_usd: float
    model: str = "gpt-4o"

    _total_input_tokens: int = field(default=0, init=False)
    _total_output_tokens: int = field(default=0, init=False)

    # Approximate costs per 1K tokens (update as pricing changes)
    INPUT_COST_PER_1K = {
        "gpt-4o": 0.0025,
        "gpt-4o-mini": 0.00015,
        "claude-3-5-sonnet": 0.003
    }
    OUTPUT_COST_PER_1K = {
        "gpt-4o": 0.01,
        "gpt-4o-mini": 0.0006,
        "claude-3-5-sonnet": 0.015
    }

    def record_call(self, input_tokens: int, output_tokens: int):
        self._total_input_tokens += input_tokens
        self._total_output_tokens += output_tokens

        if self.current_cost_usd >= self.max_cost_usd:
            raise BudgetExceeded(
                f"Budget exceeded: ${self.current_cost_usd:.4f} >= ${self.max_cost_usd}"
            )

    @property
    def current_cost_usd(self) -> float:
        input_rate = self.INPUT_COST_PER_1K.get(self.model, 0.003)
        output_rate = self.OUTPUT_COST_PER_1K.get(self.model, 0.015)
        return (
            self._total_input_tokens / 1000 * input_rate +
            self._total_output_tokens / 1000 * output_rate
        )

class BudgetExceeded(Exception):
    pass


# Wrap your LLM calls
def llm_call_with_budget(prompt: str, budget: BudgetTracker) -> str:
    response = call_llm(prompt)
    budget.record_call(
        input_tokens=response.usage.input_tokens,
        output_tokens=response.usage.output_tokens
    )
    return response.content
Enter fullscreen mode Exit fullscreen mode

Set the budget ceiling conservatively at first. You can always raise it. You can't un-spend money.

Monitoring Patterns

Kill switches only help if you can detect when to pull them. Three monitoring patterns that work in production:

Heartbeat checks. The agent emits a heartbeat every N seconds. An external watchdog checks for the heartbeat. If it stops, the watchdog kills the agent and alerts.

import time
import threading

class AgentHeartbeat:
    def __init__(self, interval_seconds: int = 30, timeout_seconds: int = 90):
        self.interval = interval_seconds
        self.timeout = timeout_seconds
        self._last_beat = time.time()
        self._start_watchdog()

    def beat(self):
        self._last_beat = time.time()

    def _start_watchdog(self):
        def watch():
            while True:
                time.sleep(self.interval)
                age = time.time() - self._last_beat
                if age > self.timeout:
                    print(f"WATCHDOG: heartbeat timeout ({age:.0f}s), stopping agent")
                    raise SystemExit("Heartbeat timeout")

        thread = threading.Thread(target=watch, daemon=True)
        thread.start()
Enter fullscreen mode Exit fullscreen mode

Output validation. Before acting on agent output, validate it matches your expected schema. This catches silent degradation early.

from pydantic import BaseModel, ValidationError

class AgentOutput(BaseModel):
    action: str
    target: str
    confidence: float

    class Config:
        extra = "forbid"  # No undeclared fields

def validated_agent_step(raw_output: dict, kill_switch: HardStop) -> AgentOutput:
    try:
        return AgentOutput(**raw_output)
    except ValidationError as e:
        print(f"Output validation failed: {e}")
        kill_switch.trigger("output schema violation")
        raise
Enter fullscreen mode Exit fullscreen mode

The Dead Man's Switch Pattern

This is the most underused pattern in agent design. Instead of the agent running until something tells it to stop, the agent must actively confirm at each checkpoint that it should keep running.

The framing: by default, the agent should stop. It only continues if it explicitly checks in.

class DeadManSwitch:
    def __init__(self, confirmation_interval_steps: int = 10):
        self.interval = confirmation_interval_steps
        self._last_confirmed_step = 0
        self._confirmation_fn = None

    def set_confirmation_fn(self, fn):
        """fn receives current step, returns True to continue, False to stop"""
        self._confirmation_fn = fn

    def check_in(self, current_step: int) -> bool:
        if current_step - self._last_confirmed_step < self.interval:
            return True

        if self._confirmation_fn is None:
            return False  # No confirmation function = stop

        should_continue = self._confirmation_fn(current_step)
        if should_continue:
            self._last_confirmed_step = current_step
        return should_continue


# Example: agent confirms based on progress metrics
def run_with_dead_man_switch(task: str):
    dms = DeadManSwitch(confirmation_interval_steps=5)

    def confirm_continuation(step: int) -> bool:
        progress = measure_progress()
        if progress.rate_per_step < 0.01:
            print(f"Stopping at step {step}: no progress ({progress.rate_per_step:.3f}/step)")
            return False
        return True

    dms.set_confirmation_fn(confirm_continuation)

    for step in range(1000):
        if not dms.check_in(step):
            break
        execute_step(task, step)
Enter fullscreen mode Exit fullscreen mode

The key insight: the default state is "stopped." The agent earns the right to continue each time.

Multi-Agent Kill Switches

When agents spawn sub-agents, your kill switch architecture needs to propagate. A parent stopping should cascade to all children. Without this, you get orphaned sub-agents running after the parent is gone.

import asyncio
from asyncio import Event

class CascadingKillSwitch:
    def __init__(self, parent: "CascadingKillSwitch" = None):
        self._stop = Event()
        self._children: list["CascadingKillSwitch"] = []

        if parent:
            parent._children.append(self)

    def spawn_child(self) -> "CascadingKillSwitch":
        return CascadingKillSwitch(parent=self)

    def trigger(self, reason: str = ""):
        if not self._stop.is_set():
            print(f"Kill switch triggered: {reason}")
            self._stop.set()
            for child in self._children:
                child.trigger(f"cascade from parent: {reason}")

    @property
    def is_triggered(self) -> bool:
        return self._stop.is_set()


async def run_multi_agent_task(task: str):
    root_switch = CascadingKillSwitch()

    child1_switch = root_switch.spawn_child()
    child2_switch = root_switch.spawn_child()

    # If root is triggered, all children stop automatically
    results = await asyncio.gather(
        run_sub_agent("subtask_a", child1_switch),
        run_sub_agent("subtask_b", child2_switch),
    )
    return results
Enter fullscreen mode Exit fullscreen mode

One timeout on the root switch, and everything underneath stops. No cleanup code scattered across agents.

Putting It Together

In production, you use all three kill switch types together, plus monitoring. A minimal production agent setup:

async def production_agent(task_id: str, config: AgentConfig):
    hard_stop = HardStop()
    soft_stop = SoftStop(f"/var/agent_state/{task_id}.json")
    budget = BudgetTracker(max_cost_usd=config.max_cost_usd)
    heartbeat = AgentHeartbeat(interval_seconds=30)
    dms = DeadManSwitch(confirmation_interval_steps=10)

    state = soft_stop.load_state() or AgentState(task_id=task_id, ...)

    try:
        for step in range(config.max_steps):
            hard_stop.check_or_raise()
            heartbeat.beat()

            if soft_stop.should_stop_after_current_step():
                soft_stop.save_and_exit(state, "graceful stop")
                return None

            if not dms.check_in(step):
                soft_stop.save_and_exit(state, "dead man's switch triggered")
                return None

            output = await llm_call_with_budget(build_prompt(state), budget)
            validated = validated_agent_step(output, hard_stop)
            state = apply_step(state, validated)

        return state.results

    except (BudgetExceeded, SystemExit) as e:
        soft_stop.save_and_exit(state, str(e))
        raise
Enter fullscreen mode Exit fullscreen mode

None of this is clever. It's defense in depth. Each layer catches a different failure mode, and together they mean no single bug causes a runaway.

The $340 lesson I mentioned at the start cost less than a contractor's hourly rate. But the real cost was trust in a system that was supposed to be reliable. Kill switches are how you earn that trust back.


I build production AI systems for companies. If your agents need guardrails, I can help. astraedus.dev

Top comments (0)