The Problem
It's 3 AM. PagerDuty fires.
You drag yourself to your laptop. Open Grafana. Squint at a spike. Switch to Kibana, filter logs, grep for errors. Cross-reference a recent deployment. Form a hypothesis. Write a Slack message explaining what you found. Wait for someone to approve your fix. Apply it. Verify it worked. Then spend an hour writing a post-mortem that goes into a folder nobody opens.
You do this for every incident. Every single time.
I've been that engineer. So I built IRAS: an Intelligent Incident Response Agent System that handles the full first-response lifecycle automatically, and only wakes you up to press Approve.
Here's the architecture, the interesting engineering problems, and the decisions I'd make again (and the ones I wouldn't).
What IRAS Does
When an alert fires, IRAS:
- Ingests the alert from any monitoring system: Prometheus AlertManager, PagerDuty, Datadog, or a raw JSON webhook
- Triages severity (P0–P3) and identifies affected services using Claude Haiku
- Gathers context: error logs from Elasticsearch/Loki, metrics from Prometheus, recent deployments from GitHub
- Runs root-cause analysis with Claude Sonnet, retrying with broader context if confidence is below threshold
- Generates a step-by-step remediation plan with rollback commands for every step
- Pauses and waits for human approval via Slack or REST API
- Applies the fix if approved, or escalates to PagerDuty if rejected/timed out
- Writes a structured post-mortem (timeline, root cause, resolution, action items), stored in PostgreSQL and posted to Slack
Total response time from alert to post-mortem: under 2 minutes.
Here's what that looks like in practice:
Alert: "High error rate on payment-service http_error_rate: 45% (threshold: 5%)"
[10:30:01] ▶ Incident ingested
[10:30:02] ▶ P1 · payment-service · ~5,000 users affected · confidence: 0.9
[10:30:04] ▶ DB connection errors in logs, deployment 2m before alert
[10:30:07] ▶ Root cause: DB connection pool exhausted after canary deploy · confidence: 0.88 ✓
[10:30:09] ▶ 3-step remediation plan ready · low risk · rollback commands included
[10:30:09] ▶ Approval request sent to #incidents [Approve] [Reject]
... engineer reviews and clicks Approve (1m 35s later) ...
[10:31:44] ▶ Step 1/3: increase DB_POOL_SIZE from 10 to 50
[10:31:45] ▶ Step 2/3: rolling restart payment-service pods
[10:31:45] ▶ Step 3/3: verify error rate dropped below 2%
[10:31:46] ▶ Post-mortem written and posted to #incidents
[10:31:46] ▶ Resolved · total response time: 1m 45s
The Architecture
IRAS is a 9-node LangGraph state machine with a FastAPI layer on top.
Alert → Ingest → Triage → Context → RCA → Plan → [YOU ⏸] → Apply → Post-mortem
                            ↑          │
                            └──────────┘  retry if confidence < 0.7
The full system overview:
Alert Sources (PagerDuty / Prometheus / Datadog / any webhook)
↓
FastAPI (POST /webhook/alert)
↓
LangGraph State Machine
├── ingestion — validate, stamp UUID + timestamp
├── triage — Claude Haiku: P0–P3, affected services
├── context_gathering — Claude Haiku + tool calls: logs, metrics, deployments
├── rca — Claude Sonnet: root cause + confidence score
│ ↓ confidence < 0.7? loop back to context_gathering
├── generate_plan — Claude Sonnet: remediation steps + rollback commands
├── approval — interrupt() ⏸ human-in-the-loop
│ ↓ approved → apply_remediation
│ ↓ rejected → escalation
├── apply_remediation — execute steps, rollback on failure
├── escalation — PagerDuty trigger + Slack alert
└── postmortem — Claude Sonnet: structured post-mortem → PostgreSQL + Slack
Now let me walk through the interesting engineering decisions.
Part 1: The Durable Interrupt Pattern
This is the most technically interesting part of IRAS, and the main reason I chose LangGraph over simpler frameworks.
The problem with polling
The naive approach to human-in-the-loop approval is polling. When the agent needs approval, it writes a flag to a database, sends a Slack message, and then polls in a loop:
# The naive approach (DON'T do this)
async def approval_node(state):
    await slack.send_approval_message(state["plan"])
    while True:
        decision = await db.get_decision(state["incident_id"])
        if decision is not None:
            return decision
        await asyncio.sleep(5)  # poll every 5 seconds
This breaks the moment the server restarts. The coroutine is gone. The incident is orphaned. The on-call engineer is staring at a dead Slack message with no way to resume.
LangGraph's interrupt(): genuine suspension
LangGraph's interrupt() is fundamentally different. It doesn't poll. It doesn't sleep. It genuinely suspends graph execution, serializes the entire state to the checkpointer (PostgreSQL in our case), and returns control to the caller.
# src/iras/graph/nodes/approval.py
from langgraph.types import interrupt, Command

from ..state import IncidentState

async def approval_node(state: IncidentState) -> dict:
    """
    Pauses graph execution and waits for a human decision.
    State is checkpointed to PostgreSQL; survives server restarts.
    """
    human_decision = interrupt({
        "message": "Remediation plan ready for approval",
        "incident_id": state["incident_id"],
        "severity": state["triage_result"].severity,
        "plan": state["remediation_plan"].model_dump(),
    })
    # Execution resumes HERE after Command(resume=...) is sent
    return {"human_approved": human_decision["approved"]}
When interrupt() is called:
- The graph state is serialized to PostgreSQL via AsyncPostgresSaver
- The coroutine is suspended
- The FastAPI endpoint returns 202 Accepted with the incident_id
- The server can restart. The process can crash. The incident is safe.
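For reference, the ingestion side is a thin FastAPI handler. Here's a minimal sketch of what it can look like, assuming the same get_graph dependency as the approval route below; the handler shape and field names are simplified, not the repo's exact code:

# Hypothetical sketch: kick off the graph and return 202 immediately.
# The graph runs until interrupt() suspends it at the approval node.
import uuid
from fastapi import APIRouter, BackgroundTasks, Depends

router = APIRouter()

@router.post("/webhook/alert", status_code=202)
async def ingest_alert(
    payload: dict,
    background: BackgroundTasks,
    graph=Depends(get_graph),
):
    incident_id = str(uuid.uuid4())
    config = {"configurable": {"thread_id": incident_id}}
    # Run in the background; the PostgreSQL checkpoint keeps the
    # incident alive even if this process dies mid-run.
    background.add_task(
        graph.ainvoke,
        {"alert_payload": payload, "incident_id": incident_id},
        config,
    )
    return {"incident_id": incident_id, "status": "processing"}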
When the engineer hits POST /incidents/{id}/approve:
# src/iras/api/routes/approval.py
@router.post("/incidents/{incident_id}/approve")
async def approve_incident(incident_id: str, graph=Depends(get_graph)):
    """Resume the paused graph with an approval decision."""
    await graph.ainvoke(
        Command(resume={"approved": True}),
        config={"configurable": {"thread_id": incident_id}},
    )
    return {"incident_id": incident_id, "decision": "approved", "status": "resumed"}
LangGraph reconstructs the graph state from the PostgreSQL checkpoint using thread_id, injects the Command(resume=...), and execution continues exactly where it left off: same state, same node, no re-running prior stages.
The checkpointer setup
# src/iras/graph/checkpointer.py
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
import asyncio

_checkpointer: AsyncPostgresSaver | None = None
_lock = asyncio.Lock()

async def get_checkpointer(postgres_url: str) -> AsyncPostgresSaver:
    """Singleton with asyncio.Lock to prevent double-initialization."""
    global _checkpointer
    async with _lock:
        if _checkpointer is None:
            _checkpointer = AsyncPostgresSaver.from_conn_string(postgres_url)
            await _checkpointer.setup()  # creates checkpoint tables
    return _checkpointer
The singleton + asyncio.Lock() pattern is important here. Without it, multiple concurrent requests during startup can race to initialize the checkpointer, resulting in duplicate table creation attempts.
Timeout monitoring without in-memory state
Because all state is in PostgreSQL, the approval timeout monitor doesn't need in-memory state either:
# src/iras/api/background.py
async def monitor_approval_timeouts(graph, settings):
    """
    Runs as a background task. Queries PostgreSQL for interrupted
    threads that have exceeded their SLA window.
    No in-memory state required; survives restarts cleanly.
    """
    while True:
        await asyncio.sleep(60)  # check every minute
        interrupted_incidents = await get_interrupted_incidents()
        for incident in interrupted_incidents:
            timeout = get_timeout_for_severity(incident.severity, settings)
            elapsed = datetime.utcnow() - incident.interrupted_at
            if elapsed > timeout:
                # Escalate by resuming with approved=False
                await graph.ainvoke(
                    Command(resume={"approved": False, "reason": "timeout"}),
                    config={"configurable": {"thread_id": incident.incident_id}},
                )
P0 incidents escalate after 15 minutes. P1–P3 after 2 hours. Configurable via environment variables.
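The severity-to-timeout mapping is a plain settings lookup. A minimal sketch of what get_timeout_for_severity could look like; the settings field names here are illustrative, not the repo's exact config:

from datetime import timedelta

def get_timeout_for_severity(severity: Severity, settings) -> timedelta:
    # Hypothetical settings fields, e.g. read from
    # APPROVAL_TIMEOUT_P0_MINUTES / APPROVAL_TIMEOUT_DEFAULT_MINUTES
    if severity == Severity.P0:
        return timedelta(minutes=settings.approval_timeout_p0_minutes)   # 15 by default
    return timedelta(minutes=settings.approval_timeout_default_minutes)  # 120 by default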
Part 2: Typed Agent Outputs with Pydantic AI
Most AI agent code I've seen looks like this:
response = await llm.generate(prompt)
text = response.content
# Now parse the text... somehow
severity = re.search(r"severity: (P\d)", text).group(1)
confidence = float(re.search(r"confidence: ([\d.]+)", text).group(1))
This is fragile. The model output format drifts. Regex breaks. You get None at 3 AM when you least want it.
IRAS uses Pydantic AI to get strongly-typed, validated outputs directly from every agent. Here's the triage agent:
# src/iras/models/incident.py
from enum import Enum

from pydantic import BaseModel, Field

class Severity(str, Enum):
    P0 = "P0"
    P1 = "P1"
    P2 = "P2"
    P3 = "P3"

class TriageResult(BaseModel):
    severity: Severity
    affected_services: list[str]
    estimated_users_affected: int
    confidence: float = Field(ge=0.0, le=1.0)
    reasoning: str
# src/iras/agents/triage.py
from pydantic_ai import Agent

from ..models.incident import TriageResult

triage_agent = Agent(
    model="claude-haiku-4-5",
    result_type=TriageResult,  # Pydantic AI validates and parses this automatically
    system_prompt="""
    You are a production incident triage specialist.
    Classify the incident severity, identify affected services,
    estimate user impact, and provide a confidence score.

    Severity guide:
    - P0: Complete service outage, all users affected
    - P1: Major degradation, >20% of users affected
    - P2: Partial degradation, <20% of users affected
    - P3: Warning or informational, no user impact
    """,
)

async def run_triage(alert_payload: dict) -> TriageResult:
    result = await triage_agent.run(str(alert_payload))
    return result.data  # TriageResult, fully validated and type-safe
Every stage follows this pattern. The RCA agent returns a RootCauseHypothesis. The remediation agent returns a RemediationPlan. The post-mortem agent returns a PostMortem. The rest of the graph code is just Python: no parsing, no regex, no json.loads() on LLM output.
# src/iras/models/incident.py (continued)
from typing import Literal  # needed for the risk-level fields below

class RootCauseHypothesis(BaseModel):
    primary_cause: str
    contributing_factors: list[str]
    evidence: list[str]  # specific log lines or metric values
    confidence: float = Field(ge=0.0, le=1.0)
    recommended_investigation: str

class RemediationStep(BaseModel):
    action: str
    rollback_command: str
    risk_level: Literal["low", "medium", "high"]
    estimated_duration_seconds: int

class RemediationPlan(BaseModel):
    steps: list[RemediationStep]
    overall_risk: Literal["low", "medium", "high"]
    reversible: bool
    requires_human_approval: bool
    estimated_total_duration_seconds: int

class PostMortem(BaseModel):
    incident_id: str
    severity: Severity
    timeline: list[str]
    root_cause_summary: str
    resolution_summary: str
    action_items: list[str]
    total_duration_minutes: float
    resolved: bool
Model selection per agent
Each agent instantiates its own model. This matters:
# Fast and cheap for classification tasks
triage_agent = Agent(model="claude-haiku-4-5", result_type=TriageResult, ...)
context_agent = Agent(model="claude-haiku-4-5", result_type=ContextBundle, ...)
# Slower and more capable for deep reasoning
rca_agent = Agent(model="claude-sonnet-4-5", result_type=RootCauseHypothesis, ...)
remediation_agent = Agent(model="claude-sonnet-4-5", result_type=RemediationPlan, ...)
postmortem_agent = Agent(model="claude-sonnet-4-5", result_type=PostMortem, ...)
Haiku costs roughly 20x less than Sonnet and is fast enough for triage and context gathering. Sonnet is worth the cost for RCA and remediation planning; these are the decisions that affect production.
Part 3: The Confidence-Gated RCA Retry Loop
Root cause analysis is genuinely hard. The first attempt often doesn't have enough evidence. IRAS handles this with a confidence-gated retry loop baked into the LangGraph conditional edges:
# src/iras/graph/nodes/rca.py
async def rca_node(state: IncidentState) -> IncidentState:
    hypothesis = await run_rca(
        context=state["context_bundle"],
        alert=state["alert_payload"],
        attempt=state.get("rca_attempts", 0),
    )
    return {
        **state,
        "rca_hypothesis": hypothesis,
        "rca_attempts": state.get("rca_attempts", 0) + 1,
    }

def should_retry_rca(state: IncidentState) -> str:
    """Conditional edge: decides what happens after RCA."""
    hypothesis = state["rca_hypothesis"]
    attempts = state.get("rca_attempts", 0)
    max_attempts = state["settings"].rca_max_attempts
    threshold = state["settings"].rca_confidence_threshold

    if hypothesis.confidence >= threshold:
        return "generate_plan"  # confidence is good, proceed
    elif attempts < max_attempts:
        return "context_gathering"  # loop back for more evidence
    else:
        return "escalation"  # exhausted retries, escalate

# src/iras/graph/builder.py
graph.add_conditional_edges(
    "rca",
    should_retry_rca,
    {
        "generate_plan": "generate_plan",
        "context_gathering": "context_gathering",
        "escalation": "escalation",
    },
)
On retry, the context agent widens its evidence window, pulling a longer log time range and more deployment history. This typically lifts confidence from 0.5–0.6 to above 0.7 on the second attempt.
Default thresholds: confidence >= 0.7 to proceed, max 3 RCA attempts before auto-escalation.
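The widening itself is just arithmetic on the attempt counter. A sketch of the idea, with hypothetical multipliers (the repo's exact values may differ):

def evidence_window(rca_attempts: int) -> dict:
    # Double the log window and deployment lookback on each retry:
    # attempt 0 sees 30m of logs / 24h of deploys, attempt 1 sees 60m / 48h, etc.
    return {
        "log_time_range_minutes": 30 * (2 ** rca_attempts),
        "deployment_history_hours": 24 * (2 ** rca_attempts),
    }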
Part 4: We Don't Trust the Model
This is the part I'm most proud of, and the one I think most AI agent projects get wrong.
When you're building an agent that can modify production systems, the model's output isn't just text; it's an instruction set. You need to treat it like untrusted input.
Safety invariants enforced in code
Two rules apply to every remediation plan, regardless of what the model returns:
# src/iras/graph/nodes/generate_plan.py
async def generate_plan_node(state: IncidentState) -> IncidentState:
    plan = await run_remediation_agent(
        hypothesis=state["rca_hypothesis"],
        context=state["context_bundle"],
    )

    # SAFETY RULE 1: Any high-risk step forces human approval.
    # The model cannot classify all steps as "low" to bypass this.
    if any(step.risk_level == "high" for step in plan.steps):
        plan.requires_human_approval = True

    # SAFETY RULE 2: Any step without a rollback command marks
    # the plan as irreversible and forces human approval.
    if any(not step.rollback_command.strip() for step in plan.steps):
        plan.reversible = False
        plan.requires_human_approval = True

    return {**state, "remediation_plan": plan}
These are not prompts. They're not suggestions. They run on every plan output, unconditionally.
Adversarial test scenarios
The stress test suite includes 47 scenarios specifically designed to test model misbehavior:
# tests/stress/test_adversarial.py
class TestAdversarialModelOutputs:
    async def test_model_lies_about_risk_level(self, graph, mock_claude):
        """Model claims all steps are low-risk to bypass approval."""
        mock_claude.remediation_returns(RemediationPlan(
            steps=[
                RemediationStep(
                    action="delete all pods",
                    rollback_command="",  # empty rollback
                    risk_level="low",     # model lying
                    estimated_duration_seconds=5,
                )
            ],
            overall_risk="low",
            reversible=True,
            requires_human_approval=False,  # model bypassing approval
        ))

        result = await graph.ainvoke(make_incident_state())

        # Safety invariants caught it
        assert result["remediation_plan"].requires_human_approval is True
        assert result["remediation_plan"].reversible is False

    async def test_all_context_tools_fail(self, graph, mock_tools):
        """All external integrations return errors simultaneously."""
        mock_tools.logs.raises(ConnectionError("Elasticsearch down"))
        mock_tools.metrics.raises(ConnectionError("Prometheus down"))
        mock_tools.deployments.raises(ConnectionError("GitHub API rate limited"))

        # Should degrade gracefully, not crash
        result = await graph.ainvoke(make_incident_state())
        assert result["status"] != "crashed"
        assert result["context_bundle"] is not None  # empty but valid

    async def test_twenty_concurrent_incidents(self, graph):
        """No state contamination between concurrent incident graphs."""
        incidents = [make_incident_state(f"incident-{i}") for i in range(20)]
        results = await asyncio.gather(*[
            graph.ainvoke(state) for state in incidents
        ])

        # Every incident has its own isolated state
        incident_ids = [r["incident_id"] for r in results]
        assert len(set(incident_ids)) == 20  # all unique
292 tests total, 99% coverage. The test suite takes about 30 seconds to run.
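The mock_claude fixture isn't magic. One way to build it, sketched here assuming the plan node calls the agent through run_remediation_agent as shown in Part 4 (the module path and fixture shape are illustrative, not the repo's exact harness):

import pytest

class MockClaude:
    def __init__(self, monkeypatch):
        self._monkeypatch = monkeypatch

    def remediation_returns(self, plan: RemediationPlan) -> None:
        async def fake_agent(**kwargs):
            return plan
        # Patch the function the generate_plan node actually calls
        self._monkeypatch.setattr(
            "iras.graph.nodes.generate_plan.run_remediation_agent",
            fake_agent,
        )

@pytest.fixture
def mock_claude(monkeypatch):
    return MockClaude(monkeypatch)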
Part 5: The Context Gathering Agent (Tool Calls)
The context agent uses Claude Haiku with tool calling to gather evidence from three sources simultaneously:
# src/iras/agents/context_gathering.py
from pydantic_ai import Agent

from ..models.incident import ContextBundle
from ..deps import ContextDeps

context_agent = Agent(
    model="claude-haiku-4-5",
    result_type=ContextBundle,
    deps_type=ContextDeps,
    system_prompt="""
    You are an SRE context gathering specialist.
    Use the available tools to collect evidence about the incident.
    Fetch logs, metrics, and deployment history for the affected service.
    Bundle all evidence into a structured ContextBundle.
    """,
)

@context_agent.tool
async def fetch_logs(ctx, service: str, time_range_minutes: int = 30) -> list[str]:
    """Fetch recent error and warning logs for a service."""
    return await ctx.deps.log_client.get_logs(
        service=service,
        time_range_minutes=time_range_minutes,
        levels=["ERROR", "WARN"],
    )

@context_agent.tool
async def fetch_metrics(ctx, service: str) -> dict:
    """Fetch current metrics vs 7-day baseline for a service."""
    return await ctx.deps.metrics_client.get_comparison(
        service=service,
        metrics=["error_rate", "latency_p99", "request_rate", "cpu_usage"],
    )

@context_agent.tool
async def fetch_deployments(ctx, service: str, hours: int = 24) -> list[dict]:
    """Fetch recent deployments for a service from GitHub."""
    return await ctx.deps.deployment_client.get_recent(
        service=service,
        hours=hours,
    )
Each tool has a Mock*Client fallback. If ELASTICSEARCH_BASE_URL isn't set, a mock client returns realistic fake data. This means the full graph runs end-to-end with only two environment variables: ANTHROPIC_API_KEY and POSTGRES_URL.
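The mocks themselves are tiny. A representative sketch of what a MockLogClient could look like; the repo's fake data is its own, this just shows the shape:

import random

class MockLogClient:
    """Honors the same get_logs interface as the real LogClient."""

    async def get_logs(
        self, service: str, time_range_minutes: int, levels: list[str]
    ) -> list[str]:
        # Realistic-looking fake lines so the RCA agent has evidence to reason over
        templates = [
            f"ERROR {service}: connection pool exhausted (10/10 in use)",
            f"ERROR {service}: timeout acquiring DB connection after 5000ms",
            f"WARN {service}: retry 3/3 failed for upstream call",
        ]
        return random.choices(templates, k=10)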
Dependency injection
Tool clients are injected via ContextDeps, making them swappable in tests:
# src/iras/agents/deps.py
from dataclasses import dataclass

from ..tools.log_fetcher import LogClient, MockLogClient
from ..tools.metrics import MetricsClient, MockMetricsClient
from ..tools.deployment import DeploymentClient, MockDeploymentClient

@dataclass
class ContextDeps:
    log_client: LogClient | MockLogClient
    metrics_client: MetricsClient | MockMetricsClient
    deployment_client: DeploymentClient | MockDeploymentClient

def make_context_deps(settings) -> ContextDeps:
    """Returns real or mock clients based on environment config."""
    return ContextDeps(
        log_client=LogClient(settings.elasticsearch_url)
        if settings.elasticsearch_url
        else MockLogClient(),
        metrics_client=MetricsClient(settings.prometheus_url)
        if settings.prometheus_url
        else MockMetricsClient(),
        deployment_client=DeploymentClient(settings.github_token)
        if settings.github_token
        else MockDeploymentClient(),
    )
Part 6: The LangGraph State Machine
The full graph wiring:
# src/iras/graph/builder.py
from langgraph.graph import StateGraph, START, END

from .state import IncidentState
from .nodes import (
    ingestion, triage, context_gathering, rca,
    generate_plan, approval, apply_remediation,
    escalation, postmortem,
)
from .nodes.rca import should_retry_rca

def build_graph(checkpointer):
    builder = StateGraph(IncidentState)

    # Add nodes
    builder.add_node("ingestion", ingestion.run)
    builder.add_node("triage", triage.run)
    builder.add_node("context_gathering", context_gathering.run)
    builder.add_node("rca", rca.run)
    builder.add_node("generate_plan", generate_plan.run)
    builder.add_node("approval", approval.run)
    builder.add_node("apply_remediation", apply_remediation.run)
    builder.add_node("escalation", escalation.run)
    builder.add_node("postmortem", postmortem.run)

    # Linear edges
    builder.add_edge(START, "ingestion")
    builder.add_edge("ingestion", "triage")
    builder.add_edge("triage", "context_gathering")
    builder.add_edge("context_gathering", "rca")

    # Confidence-gated RCA retry loop
    builder.add_conditional_edges(
        "rca",
        should_retry_rca,
        {
            "generate_plan": "generate_plan",
            "context_gathering": "context_gathering",
            "escalation": "escalation",
        },
    )

    # Human approval branch
    builder.add_edge("generate_plan", "approval")
    builder.add_conditional_edges(
        "approval",
        lambda state: "apply_remediation" if state["human_approved"] else "escalation",
        {
            "apply_remediation": "apply_remediation",
            "escalation": "escalation",
        },
    )

    # Both paths converge at postmortem
    builder.add_edge("apply_remediation", "postmortem")
    builder.add_edge("escalation", "postmortem")
    builder.add_edge("postmortem", END)

    return builder.compile(
        checkpointer=checkpointer,
        interrupt_before=["approval"],  # pause before the approval node
    )
One important detail: interrupt_before=["approval"] tells LangGraph to checkpoint state before entering the approval node, not inside it. This means the plan is fully generated and the Slack message is sent before the graph suspends.
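A nice side effect: any caller can check whether an incident is parked at the approval gate by reading the checkpoint. A sketch of a status endpoint built on LangGraph's aget_state (the endpoint shape is illustrative, not the repo's API):

@router.get("/incidents/{incident_id}")
async def incident_status(incident_id: str, graph=Depends(get_graph)):
    """Report whether the incident graph is paused at the approval node."""
    snapshot = await graph.aget_state(
        {"configurable": {"thread_id": incident_id}}
    )
    # snapshot.next lists the node(s) that will run on resume;
    # ("approval",) means we're waiting on a human
    return {
        "incident_id": incident_id,
        "awaiting_approval": "approval" in (snapshot.next or ()),
    }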
Running It
Only two settings are required: ANTHROPIC_API_KEY and POSTGRES_URL. Everything else falls back to mock clients.
git clone https://github.com/krishnashakula/IRAS.git && cd IRAS
# Start Postgres
docker run -d --name iras-postgres \
  -e POSTGRES_USER=iras -e POSTGRES_PASSWORD=secret -e POSTGRES_DB=iras \
  -p 5432:5432 postgres:16
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"
cp .env.example .env
# Set ANTHROPIC_API_KEY and POSTGRES_URL
python run.py
Then fire a test alert:
curl -X POST http://localhost:8000/webhook/alert \
  -H "Content-Type: application/json" \
  -d '{
    "title": "High error rate on payment-service",
    "timestamp": "2026-05-03T10:30:00Z",
    "service": "payment-service",
    "error_rate": 0.45
  }'
# {"incident_id": "550e8400-...", "status": "processing"}
# Approve the plan (or wait for the Slack message if configured)
curl -X POST http://localhost:8000/incidents/550e8400-.../approve
What I'd Do Differently
1. Start with MemorySaver, not PostgreSQL
For local development and prototyping, LangGraph's MemorySaver (in-memory checkpointer) is much faster to set up. I spent time early on getting Postgres running when I didn't need durability yet. Start with MemorySaver, switch to AsyncPostgresSaver when you're ready for production.
2. Separate trace IDs from thread IDs
IRAS uses the same UUID for the HTTP response incident_id, the LangGraph thread_id, and the database primary key. Convenient, but it creates coupling. If you ever want to re-run an incident or fork a graph for testing, you'll want these to be different.
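Concretely, the decoupling can be as small as a two-field record, sketched here with hypothetical names:

import uuid
from dataclasses import dataclass, field

def _new_id() -> str:
    return str(uuid.uuid4())

@dataclass
class IncidentRef:
    incident_id: str = field(default_factory=_new_id)  # public/API identifier
    thread_id: str = field(default_factory=_new_id)    # LangGraph checkpoint thread

def fork_for_replay(ref: IncidentRef) -> IncidentRef:
    # Same incident, fresh graph thread: re-run without clobbering
    # the original checkpoint history
    return IncidentRef(incident_id=ref.incident_id, thread_id=_new_id())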
3. Add streaming earlier
The graph produces intermediate outputs (triage result, context bundle, etc.) as it runs. Currently these are only visible via LangSmith traces. Adding Server-Sent Events to stream node outputs to a UI would make the "watching it work" experience much better.
Key Takeaways
LangGraph's interrupt() is not a workaround; it's a first-class primitive for durable human-in-the-loop workflows. If you're building agents that need human approval in production, this is the pattern.
Pydantic AI's typed outputs eliminate an entire class of bugs. Parsing LLM output with regex or manual JSON extraction is fragile. Defining your output schema as a Pydantic model and letting the framework handle parsing is strictly better.
Safety invariants belong in code, not prompts. Prompting the model to be safe is not enough when the output drives production changes. Enforce your invariants programmatically, after the model responds.
Mock clients everywhere. If every external integration falls back to a mock, the full system is testable with zero infrastructure. This pays for itself immediately in CI speed and developer experience.
Links
- GitHub: https://github.com/krishnashakula/IRAS
- LangGraph docs: https://langchain-ai.github.io/langgraph/
- Pydantic AI docs: https://ai.pydantic.dev
- AsyncPostgresSaver: https://langchain-ai.github.io/langgraph/reference/checkpoints/
If you've built something similar, or have questions about the interrupt pattern or the Pydantic AI setup, drop them in the comments; happy to go deeper on any of it.
And if IRAS would've saved your last 3 AM page, give it a ⭐ on GitHub.