We used to resolve incidents with Slack messages, gut instinct, and heroics. Now most incidents resolve themselves. Here's exactly how I built that.
The Problem with Manual Incident Response
At 2:47am on a Tuesday, our payment service started throwing errors. A senior engineer woke up to a PagerDuty alert, spent 12 minutes just finding the right runbook, another 20 minutes correlating logs across three different dashboards, and finally identified a misconfigured connection pool that had been deployed 6 hours earlier.
Total time to resolve: 51 minutes of customer-impacting downtime.
The fix itself? Four lines of config. The rest was just finding the problem.
I decided to systematically eliminate the detective work. This post covers the automation layer I built on GCP to detect, diagnose, and in many cases auto-remediate incidents before a human ever gets paged.
System Architecture
[Cloud Monitoring]
|
| Alert fires (Pub/Sub notification)
▼
[Cloud Functions] ← Incident Orchestrator
|
├── Enrich context (logs, traces, recent deploys)
├── Classify type (latency / error rate / saturation)
├── Check runbook (known issue? auto-remediate)
├── Notify humans (Slack + PagerDuty if needed)
└── Create ticket (Jira with pre-filled context)
|
▼
[Firestore] ← Incident state store
[Cloud Run] ← Remediation actions
[BigQuery] ← Incident history + analytics
Step 1: Alert Routing via Pub/Sub
All Cloud Monitoring alerts route to a single Pub/Sub topic. The alert payload carries enough context to start triage immediately.
# Cloud Monitoring notification channel config (Terraform)
resource "google_monitoring_notification_channel" "pubsub" {
display_name = "Incident Automation Topic"
type = "pubsub"
labels = {
topic = "projects/my-project/topics/monitoring-alerts"
}
}
resource "google_monitoring_alert_policy" "high_error_rate" {
display_name = "High Error Rate - API Service"
combiner = "OR"
conditions {
display_name = "Error rate > 1%"
condition_threshold {
filter = "metric.type=\"custom.googleapis.com/sli/error_rate\" resource.type=\"k8s_container\""
duration = "120s"
comparison = "COMPARISON_GT"
threshold_value = 0.01
aggregations {
alignment_period = "60s"
# error_rate is a ratio (1% = 0.01), so average it; ALIGN_RATE applies to counters
per_series_aligner = "ALIGN_MEAN"
}
}
}
notification_channels = [google_monitoring_notification_channel.pubsub.name]
alert_strategy {
auto_close = "604800s"
}
}
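The notification channel above references a Pub/Sub topic that has to exist, and Cloud Monitoring's notification service agent needs permission to publish to it. A minimal sketch of the missing pieces (the topic name matches the channel config above; the service agent address follows GCP's standard format):

```hcl
# The topic the notification channel publishes alerts to
resource "google_pubsub_topic" "monitoring_alerts" {
  name = "monitoring-alerts"
}

data "google_project" "this" {}

# Cloud Monitoring publishes via a per-project service agent,
# which needs pubsub.publisher on the topic
resource "google_pubsub_topic_iam_member" "monitoring_publisher" {
  topic  = google_pubsub_topic.monitoring_alerts.name
  role   = "roles/pubsub.publisher"
  member = "serviceAccount:service-${data.google_project.this.number}@gcp-sa-monitoring-notification.iam.gserviceaccount.com"
}
```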
Step 2: The Incident Orchestrator (Cloud Functions)
This is the heart of the system. A Cloud Function subscribes to the alert topic and orchestrates the response:
import functions_framework
import json
import base64
from google.cloud import logging_v2, firestore, pubsub_v1
from datetime import datetime, timedelta
import requests
db = firestore.Client()
log_client = logging_v2.Client()
@functions_framework.cloud_event
def handle_alert(cloud_event):
"""Entry point: triggered by Pub/Sub alert message."""
# Decode the alert payload
data = base64.b64decode(cloud_event.data["message"]["data"]).decode()
alert = json.loads(data)
incident_id = f"inc_{cloud_event.id[:8]}"
print(json.dumps({
"message": "Incident received",
"incident_id": incident_id,
"policy": alert.get("incident", {}).get("policy_name"),
"severity": "INFO"
}))
# Store incident state
incident_ref = db.collection("incidents").document(incident_id)
incident_ref.set({
"id": incident_id,
"alert": alert,
"status": "investigating",
"created_at": datetime.utcnow(),
"timeline": []
})
# Run the triage pipeline
context = enrich_context(alert, incident_id)
classification = classify_incident(context)
remediated = attempt_auto_remediation(classification, context, incident_id)
if not remediated:
notify_on_call(incident_id, context, classification)
# Always log to BigQuery for trend analysis
log_to_bigquery(incident_id, classification, context, remediated)
def enrich_context(alert: dict, incident_id: str) -> dict:
"""Gather all context needed for triage."""
resource = alert.get("incident", {}).get("resource", {})
service_name = resource.get("labels", {}).get("container_name", "unknown")
context = {
"service": service_name,
"alert_time": alert.get("incident", {}).get("started_at"),
"policy": alert.get("incident", {}).get("policy_name"),
"recent_logs": fetch_recent_errors(service_name),
"recent_deploys": fetch_recent_deploys(service_name),
"upstream_health": check_upstream_dependencies(service_name),
}
# Add timeline entry
db.collection("incidents").document(incident_id).update({
"timeline": firestore.ArrayUnion([{
"time": datetime.utcnow().isoformat(),
"action": "context_enriched",
"data": {"context_fields": list(context.keys())}
}])
})
return context
def fetch_recent_errors(service_name: str) -> list:
"""Pull last 50 error logs from the past 15 minutes."""
now = datetime.utcnow()
window_start = (now - timedelta(minutes=15)).strftime("%Y-%m-%dT%H:%M:%SZ")
filter_str = f"""
resource.type="k8s_container"
resource.labels.container_name="{service_name}"
severity>=ERROR
timestamp>="{window_start}"
"""
entries = list(log_client.list_entries(filter_=filter_str, max_results=50))
return [
{
"timestamp": entry.timestamp.isoformat(),
"message": entry.payload if isinstance(entry.payload, str)
else entry.payload.get("message", ""),
"severity": entry.severity,
}
for entry in entries
]
def fetch_recent_deploys(service_name: str) -> list:
"""Check Cloud Deploy history for recent changes."""
# Query your deployment history — this example uses a Firestore deploys collection
deploys_ref = db.collection("deploys") \
.where("service", "==", service_name) \
.where("deployed_at", ">=", datetime.utcnow() - timedelta(hours=4)) \
.order_by("deployed_at", direction=firestore.Query.DESCENDING) \
.limit(5)
return [
{
"version": d.get("version"),
"deployed_at": d.get("deployed_at").isoformat(),
"deployed_by": d.get("deployed_by"),
}
for d in [doc.to_dict() for doc in deploys_ref.stream()]
]
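enrich_context also calls check_upstream_dependencies, which I haven't shown. A minimal sketch, assuming each service's dependencies live in a static registry and expose an HTTP health endpoint — the DEPENDENCY_MAP entries and the /healthz convention are illustrative, and the session parameter exists so tests can stub out the network:

```python
# Illustrative registry: which upstreams each service depends on
DEPENDENCY_MAP = {
    "api-service": {
        "auth-service": "http://auth-service.production.svc/healthz",
        "billing-db-proxy": "http://billing-db-proxy.production.svc/healthz",
    },
}

def check_upstream_dependencies(service_name: str, session=None) -> dict:
    """Probe each registered dependency's health endpoint.

    Returns {dep_name: {"healthy": bool, "detail": str}}. Any network
    error or non-2xx response marks the dependency unhealthy.
    """
    if session is None:
        import requests  # lazy import so tests can inject a stub session
        session = requests
    results = {}
    for dep, url in DEPENDENCY_MAP.get(service_name, {}).items():
        try:
            resp = session.get(url, timeout=3)
            healthy = 200 <= resp.status_code < 300
            results[dep] = {"healthy": healthy, "detail": f"HTTP {resp.status_code}"}
        except Exception as e:  # timeouts, DNS failures, connection resets
            results[dep] = {"healthy": False, "detail": str(e)}
    return results
```

A hard 3-second timeout matters here: this runs inside the orchestrator's hot path, and a hung dependency probe would stall the whole triage pipeline.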
Step 3: Incident Classification
Once I have context, I classify the incident type. This drives which remediation playbook to run:
from enum import Enum
class IncidentType(Enum):
RECENT_DEPLOY_REGRESSION = "recent_deploy_regression"
DEPENDENCY_FAILURE = "dependency_failure"
RESOURCE_EXHAUSTION = "resource_exhaustion"
TRAFFIC_SPIKE = "traffic_spike"
UNKNOWN = "unknown"
def classify_incident(context: dict) -> dict:
"""
Classify the incident based on enriched context.
Returns classification with confidence score.
"""
recent_deploys = context.get("recent_deploys", [])
upstream_health = context.get("upstream_health", {})
recent_logs = context.get("recent_logs", [])
# Rule 1: Deploy in last 30 mins + errors started → deploy regression
if recent_deploys:
latest_deploy_age_mins = _minutes_ago(recent_deploys[0]["deployed_at"])
if latest_deploy_age_mins < 30:
return {
"type": IncidentType.RECENT_DEPLOY_REGRESSION,
"confidence": 0.85,
"evidence": f"Deploy {recent_deploys[0]['version']} {latest_deploy_age_mins}m ago",
"suggested_action": "rollback",
"rollback_to": recent_deploys[1]["version"] if len(recent_deploys) > 1 else None,
}
# Rule 2: Upstream dependency unhealthy
unhealthy_deps = [k for k, v in upstream_health.items() if not v.get("healthy")]
if unhealthy_deps:
return {
"type": IncidentType.DEPENDENCY_FAILURE,
"confidence": 0.90,
"evidence": f"Unhealthy dependencies: {unhealthy_deps}",
"suggested_action": "notify_dep_team",
"dependencies": unhealthy_deps,
}
# Rule 3: OOM / CPU errors in logs
oom_logs = [l for l in recent_logs if "OOMKilled" in l.get("message", "")
or "out of memory" in l.get("message", "").lower()]
if oom_logs:
return {
"type": IncidentType.RESOURCE_EXHAUSTION,
"confidence": 0.80,
"evidence": f"{len(oom_logs)} OOM events detected",
"suggested_action": "scale_up",
}
return {
"type": IncidentType.UNKNOWN,
"confidence": 0.0,
"suggested_action": "page_human",
}
def _minutes_ago(iso_timestamp: str) -> float:
from datetime import timezone
ts = datetime.fromisoformat(iso_timestamp.replace("Z", "+00:00"))
return (datetime.now(timezone.utc) - ts).total_seconds() / 60
Step 4: Auto-Remediation Actions
For high-confidence classifications, we attempt automated remediation:
def attempt_auto_remediation(classification: dict, context: dict, incident_id: str) -> bool:
"""
Attempt automated fix. Returns True if remediation was applied.
Only acts on high-confidence classifications.
"""
confidence = classification.get("confidence", 0)
action = classification.get("suggested_action")
# Only auto-remediate above 80% confidence
if confidence < 0.80:
print(f"Confidence {confidence:.0%} too low for auto-remediation")
return False
success = False
if action == "rollback":
success = _execute_rollback(
service=context["service"],
target_version=classification.get("rollback_to"),
incident_id=incident_id,
)
elif action == "scale_up":
success = _scale_deployment(
service=context["service"],
scale_factor=2,
incident_id=incident_id,
)
if success:
_add_timeline(incident_id, "auto_remediation_applied", {
"action": action,
"confidence": confidence,
})
# Notify Slack but don't page — just inform
_notify_slack(
channel="#incidents",
message=f"✅ Auto-remediated: `{action}` applied to `{context['service']}` "
f"(confidence: {confidence:.0%}). Monitoring...",
incident_id=incident_id,
)
return success
def _execute_rollback(service: str, target_version: str, incident_id: str) -> bool:
"""Trigger Cloud Run or GKE rollback via Cloud Deploy."""
import subprocess
if not target_version:
print("No rollback target version available")
return False
try:
# GKE rollback via kubectl (in practice, use the Cloud Deploy API).
# Note: --to-revision expects a numeric rollout revision, so we store
# the revision number alongside each version in the deploys collection.
result = subprocess.run([
"kubectl", "rollout", "undo",
f"deployment/{service}",
f"--to-revision={target_version}",
"-n", "production"
], capture_output=True, text=True, timeout=60)
if result.returncode == 0:
print(f"Rollback to {target_version} succeeded for {service}")
return True
else:
print(f"Rollback failed: {result.stderr}")
return False
except Exception as e:
print(f"Rollback exception: {e}")
return False
Step 5: Smart Notifications
When auto-remediation isn't triggered, we send a pre-filled context package to the on-call engineer instead of a bare alert:
def notify_on_call(incident_id: str, context: dict, classification: dict):
"""
Send enriched incident brief to Slack + PagerDuty.
The goal: engineer opens the alert and immediately knows where to look.
"""
recent_deploy_msg = ""
if context.get("recent_deploys"):
d = context["recent_deploys"][0]
recent_deploy_msg = f"\n• *Recent deploy:* `{d['version']}` by {d['deployed_by']} ({d['deployed_at']})"
unhealthy = [k for k, v in context.get("upstream_health", {}).items() if not v.get("healthy")]
dep_msg = f"\n• *Unhealthy deps:* {', '.join(unhealthy)}" if unhealthy else ""
slack_blocks = [
{
"type": "header",
"text": {"type": "plain_text", "text": f"🚨 Incident {incident_id}"}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": (
f"*Service:* `{context['service']}`\n"
f"*Classification:* {classification['type'].value} "
f"({classification['confidence']:.0%} confidence)\n"
f"*Evidence:* {classification.get('evidence', 'N/A')}"
f"{recent_deploy_msg}"
f"{dep_msg}"
)
}
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "📋 View Runbook"},
"url": f"https://wiki.internal/runbooks/{classification['type'].value}"
},
{
"type": "button",
"text": {"type": "plain_text", "text": "📊 Open Dashboard"},
"url": f"https://console.cloud.google.com/monitoring?project=my-project"
},
]
}
]
# Post to Slack
requests.post(
"https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
json={"blocks": slack_blocks}
)
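The docstring above mentions PagerDuty, but only the Slack post is shown. Here's a sketch of the PagerDuty half using the Events API v2 — the endpoint and payload shape come from that API; the helper names and routing-key plumbing are my assumptions. Using the incident_id as the dedup_key means a re-fired alert updates the existing PagerDuty incident instead of paging again:

```python
def build_pagerduty_event(incident_id: str, context: dict,
                          classification: dict, routing_key: str) -> dict:
    """Build a PagerDuty Events API v2 trigger payload."""
    return {
        "routing_key": routing_key,
        "event_action": "trigger",
        "dedup_key": incident_id,  # dedupes repeat alerts onto one incident
        "payload": {
            "summary": f"[{classification['type'].value}] {context['service']}: "
                       f"{classification.get('evidence', 'no evidence')}",
            "source": context["service"],
            "severity": "critical",
            "custom_details": {
                "confidence": classification.get("confidence", 0),
                "recent_deploys": context.get("recent_deploys", []),
            },
        },
    }

def page_on_call(incident_id, context, classification, routing_key):
    """POST the trigger event to PagerDuty's Events API v2 endpoint."""
    import requests  # imported here so the builder stays dependency-free
    requests.post(
        "https://events.pagerduty.com/v2/enqueue",
        json=build_pagerduty_event(incident_id, context,
                                   classification, routing_key),
        timeout=10,
    )
```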
Results After 4 Months
| Metric | Before | After |
|---|---|---|
| Mean time to detect (MTTD) | 38 min | 3 min |
| Mean time to resolve (MTTR) | 51 min | 10 min |
| Auto-remediated incidents | 0% | 42% |
| Incidents requiring escalation | 100% | 31% |
| On-call pages per week | 47 | 11 |
| Engineer time on incidents | ~8hr/week | ~1.5hr/week |
The 42% auto-remediation rate was higher than we expected. Turns out, "recent deploy broke something" and "dependency flaked" together account for most incidents — and both are very automatable.
Safety Guardrails We Added
Automation at this level requires careful guardrails:
- Confidence threshold: Never auto-remediate below 80% confidence. Human judgment kicks in for ambiguous cases.
- Blast radius limits: Auto-rollback only if the deployment was in the last 30 minutes. Older issues get human review.
- Audit trail: Every automated action writes to Firestore and BigQuery. Full immutable history.
- Circuit breaker on the automation itself: If auto-remediation fails twice in a row for the same service, disable it and page a human.
- Business hours sensitivity: During peak business hours, we raise the confidence threshold to 90% (more conservative). At 3am, 80% is acceptable.
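The circuit breaker on the automation itself boils down to a pure check over the last few remediation outcomes for a service (we pull these from Firestore; the function name and newest-first list shape here are my own sketch):

```python
def automation_enabled(service: str, recent_outcomes: list,
                       max_consecutive_failures: int = 2) -> bool:
    """Circuit breaker for the automation itself.

    recent_outcomes: remediation results for this service, newest first
    (True = succeeded). If the last N attempts all failed, trip the
    breaker so the orchestrator pages a human instead.
    """
    streak = 0
    for ok in recent_outcomes:
        if ok:
            break  # the failure streak is broken by any success
        streak += 1
    return streak < max_consecutive_failures
```

Keeping this a pure function over stored outcomes (rather than mutable in-process state) matters in Cloud Functions, where instances are stateless and short-lived.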
What I'd Do Differently
- Invest in runbook quality before automation. You're essentially codifying your runbooks. If the runbooks are bad, the automation is bad.
- Start with notification enrichment, not remediation. Even just sending pre-filled context to on-call engineers cut our MTTR by 40% before we automated a single action.
- Build the BigQuery incident history early. Trend analysis on incident types drove which automations to prioritize. Without data, we would have automated the wrong things first.
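The log_to_bigquery call in the orchestrator is what feeds that trend analysis, and I never showed it. A minimal sketch using BigQuery streaming inserts, split into a pure row builder plus a thin sender — the dataset and table names are assumptions, and the table must already exist with a matching schema:

```python
from datetime import datetime

def build_incident_row(incident_id: str, classification: dict,
                       context: dict, remediated: bool) -> dict:
    """Flatten one incident into a BigQuery row for trend analysis."""
    return {
        "incident_id": incident_id,
        "service": context.get("service"),
        "incident_type": classification["type"].value,
        "confidence": classification.get("confidence", 0),
        "auto_remediated": remediated,
        "recent_deploy_count": len(context.get("recent_deploys", [])),
        "created_at": datetime.utcnow().isoformat(),
    }

def log_to_bigquery(incident_id, classification, context, remediated):
    """Stream the row in; insert_rows_json returns per-row errors."""
    from google.cloud import bigquery
    client = bigquery.Client()
    errors = client.insert_rows_json(
        "my-project.incidents.incident_history",
        [build_incident_row(incident_id, classification, context, remediated)],
    )
    if errors:
        print(f"BigQuery insert failed: {errors}")
```

The query that drove our prioritization was just a GROUP BY incident_type over this table — which is why getting rows in early, even with a rough schema, paid off.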
Resources
- Cloud Monitoring Alerting Policies
- Cloud Functions for Event-Driven Automation
- Google SRE Workbook — Incident Management
- Firestore Documentation
What percentage of your incidents are auto-remediated? I'd love to compare numbers — drop a comment below.
Tags: #sre #googlecloud #incidentmanagement #automation #gcp #devops #platform