Real-World Incident Automation Using GCP: How I Cut MTTR by 80%

Ayush Raj Jha

We used to resolve incidents with Slack messages, gut instinct, and heroics. Now most incidents resolve themselves. Here's exactly how I built that.


The Problem with Manual Incident Response

At 2:47am on a Tuesday, our payment service started throwing errors. A senior engineer woke up to a PagerDuty alert, spent 12 minutes just finding the right runbook, another 20 minutes correlating logs across three different dashboards, and finally identified a misconfigured connection pool that had been deployed 6 hours earlier.

Total time to resolve: 51 minutes of customer-impacting downtime.

The fix itself? Four lines of config. The rest was just finding the problem.

I decided to systematically eliminate the detective work. This post covers the automation layer I built on GCP to detect, diagnose, and in many cases auto-remediate incidents before a human ever gets paged.


System Architecture

[Cloud Monitoring]
       |
       | Alert fires (Pub/Sub notification)
       ▼
[Cloud Functions]          ← Incident Orchestrator
       |
       ├── Enrich context  (logs, traces, recent deploys)
       ├── Classify type   (latency / error rate / saturation)
       ├── Check runbook   (known issue? auto-remediate)
       ├── Notify humans   (Slack + PagerDuty if needed)
       └── Create ticket   (Jira with pre-filled context)
             |
             ▼
       [Firestore]          ← Incident state store
       [Cloud Run]          ← Remediation actions
       [BigQuery]           ← Incident history + analytics

Step 1: Alert Routing via Pub/Sub

All Cloud Monitoring alerts route to a single Pub/Sub topic. The alert payload carries enough context to start triage immediately.

# Cloud Monitoring notification channel config (Terraform)
resource "google_monitoring_notification_channel" "pubsub" {
  display_name = "Incident Automation Topic"
  type         = "pubsub"

  labels = {
    topic = "projects/my-project/topics/monitoring-alerts"
  }
}

resource "google_monitoring_alert_policy" "high_error_rate" {
  display_name = "High Error Rate - API Service"
  combiner     = "OR"

  conditions {
    display_name = "Error rate > 1%"
    condition_threshold {
      filter          = "metric.type=\"custom.googleapis.com/sli/error_rate\" resource.type=\"k8s_container\""
      duration        = "120s"
      comparison      = "COMPARISON_GT"
      threshold_value = 0.01

      aggregations {
        alignment_period   = "60s"
        per_series_aligner = "ALIGN_RATE"
      }
    }
  }

  notification_channels = [google_monitoring_notification_channel.pubsub.name]

  alert_strategy {
    auto_close = "604800s"
  }
}

Step 2: The Incident Orchestrator (Cloud Functions)

This is the heart of the system. A Cloud Function subscribes to the alert topic and orchestrates the response:

import functions_framework
import json
import base64
from google.cloud import logging_v2, firestore
from datetime import datetime, timedelta
import requests

db = firestore.Client()
log_client = logging_v2.Client()

@functions_framework.cloud_event
def handle_alert(cloud_event):
    """Entry point: triggered by Pub/Sub alert message."""

    # Decode the alert payload
    data = base64.b64decode(cloud_event.data["message"]["data"]).decode()
    alert = json.loads(data)

    incident_id = f"inc_{cloud_event.id[:8]}"

    print(json.dumps({
        "message": "Incident received",
        "incident_id": incident_id,
        "policy": alert.get("incident", {}).get("policy_name"),
        "severity": "INFO"
    }))

    # Store incident state
    incident_ref = db.collection("incidents").document(incident_id)
    incident_ref.set({
        "id": incident_id,
        "alert": alert,
        "status": "investigating",
        "created_at": datetime.utcnow(),
        "timeline": []
    })

    # Run the triage pipeline
    context = enrich_context(alert, incident_id)
    classification = classify_incident(context)
    remediated = attempt_auto_remediation(classification, context, incident_id)

    if not remediated:
        notify_on_call(incident_id, context, classification)

    # Always log to BigQuery for trend analysis
    log_to_bigquery(incident_id, classification, context, remediated)


def enrich_context(alert: dict, incident_id: str) -> dict:
    """Gather all context needed for triage."""

    resource = alert.get("incident", {}).get("resource", {})
    service_name = resource.get("labels", {}).get("container_name", "unknown")

    context = {
        "service": service_name,
        "alert_time": alert.get("incident", {}).get("started_at"),
        "policy": alert.get("incident", {}).get("policy_name"),
        "recent_logs": fetch_recent_errors(service_name),
        "recent_deploys": fetch_recent_deploys(service_name),
        "upstream_health": check_upstream_dependencies(service_name),
    }

    # Add timeline entry
    db.collection("incidents").document(incident_id).update({
        "timeline": firestore.ArrayUnion([{
            "time": datetime.utcnow().isoformat(),
            "action": "context_enriched",
            "data": {"services_checked": list(context.keys())}
        }])
    })

    return context


def fetch_recent_errors(service_name: str) -> list:
    """Pull last 50 error logs from the past 15 minutes."""

    client = log_client
    now = datetime.utcnow()
    window_start = (now - timedelta(minutes=15)).strftime("%Y-%m-%dT%H:%M:%SZ")

    filter_str = f"""
        resource.type="k8s_container"
        resource.labels.container_name="{service_name}"
        severity>=ERROR
        timestamp>="{window_start}"
    """

    entries = list(client.list_entries(filter_=filter_str, max_results=50))

    return [
        {
            "timestamp": entry.timestamp.isoformat(),
            "message": entry.payload if isinstance(entry.payload, str) 
                       else entry.payload.get("message", ""),
            "severity": entry.severity,
        }
        for entry in entries
    ]


def fetch_recent_deploys(service_name: str) -> list:
    """Check Cloud Deploy history for recent changes."""

    # Query your deployment history — this example uses a Firestore deploys collection
    deploys_ref = db.collection("deploys") \
        .where("service", "==", service_name) \
        .where("deployed_at", ">=", datetime.utcnow() - timedelta(hours=4)) \
        .order_by("deployed_at", direction=firestore.Query.DESCENDING) \
        .limit(5)

    return [
        {
            "version": d.get("version"),
            "deployed_at": d.get("deployed_at").isoformat(),
            "deployed_by": d.get("deployed_by"),
        }
        for d in [doc.to_dict() for doc in deploys_ref.stream()]
    ]
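The orchestrator also calls check_upstream_dependencies(), which the post doesn't show. Here's a minimal sketch of how I'd write it, assuming each service declares its dependencies' health-check URLs (the DEPENDENCY_MAP and endpoints below are hypothetical); the probe is injectable so it can be stubbed in tests:

```python
import requests

# Hypothetical mapping of services to their dependencies' health endpoints
DEPENDENCY_MAP = {
    "payments-api": {
        "postgres-proxy": "http://postgres-proxy.internal/healthz",
        "auth-service": "http://auth-service.internal/healthz",
    },
}


def _http_probe(url: str, timeout: float = 3.0) -> bool:
    """Return True if the endpoint answers 200 within the timeout."""
    try:
        return requests.get(url, timeout=timeout).status_code == 200
    except requests.RequestException:
        return False


def check_upstream_dependencies(service_name: str, probe=_http_probe) -> dict:
    """Probe each declared dependency and report per-dependency health."""
    return {
        dep: {"healthy": probe(url), "endpoint": url}
        for dep, url in DEPENDENCY_MAP.get(service_name, {}).items()
    }
```

Keeping the probe as a parameter means the classifier's "unhealthy dependency" rule can be tested without any network calls.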

Step 3: Incident Classification

Once I have context, I classify the incident type. This drives which remediation playbook to run:

from datetime import datetime
from enum import Enum

class IncidentType(Enum):
    RECENT_DEPLOY_REGRESSION  = "recent_deploy_regression"
    DEPENDENCY_FAILURE        = "dependency_failure"
    RESOURCE_EXHAUSTION       = "resource_exhaustion"
    TRAFFIC_SPIKE             = "traffic_spike"
    UNKNOWN                   = "unknown"


def classify_incident(context: dict) -> dict:
    """
    Classify the incident based on enriched context.
    Returns classification with confidence score.
    """

    recent_deploys = context.get("recent_deploys", [])
    upstream_health = context.get("upstream_health", {})
    recent_logs = context.get("recent_logs", [])

    # Rule 1: Deploy in last 30 mins + errors started → deploy regression
    if recent_deploys:
        latest_deploy_age_mins = _minutes_ago(recent_deploys[0]["deployed_at"])
        if latest_deploy_age_mins < 30:
            return {
                "type": IncidentType.RECENT_DEPLOY_REGRESSION,
                "confidence": 0.85,
                "evidence": f"Deploy {recent_deploys[0]['version']} {latest_deploy_age_mins}m ago",
                "suggested_action": "rollback",
                "rollback_to": recent_deploys[1]["version"] if len(recent_deploys) > 1 else None,
            }

    # Rule 2: Upstream dependency unhealthy
    unhealthy_deps = [k for k, v in upstream_health.items() if not v.get("healthy")]
    if unhealthy_deps:
        return {
            "type": IncidentType.DEPENDENCY_FAILURE,
            "confidence": 0.90,
            "evidence": f"Unhealthy dependencies: {unhealthy_deps}",
            "suggested_action": "notify_dep_team",
            "dependencies": unhealthy_deps,
        }

    # Rule 3: OOM / CPU errors in logs
    oom_logs = [l for l in recent_logs if "OOMKilled" in l.get("message", "")
                                       or "out of memory" in l.get("message", "").lower()]
    if oom_logs:
        return {
            "type": IncidentType.RESOURCE_EXHAUSTION,
            "confidence": 0.80,
            "evidence": f"{len(oom_logs)} OOM events detected",
            "suggested_action": "scale_up",
        }

    return {
        "type": IncidentType.UNKNOWN,
        "confidence": 0.0,
        "suggested_action": "page_human",
    }


def _minutes_ago(iso_timestamp: str) -> float:
    from datetime import timezone
    ts = datetime.fromisoformat(iso_timestamp.replace("Z", "+00:00"))
    return (datetime.now(timezone.utc) - ts).total_seconds() / 60

Step 4: Auto-Remediation Actions

For high-confidence classifications, we attempt automated remediation:

def attempt_auto_remediation(classification: dict, context: dict, incident_id: str) -> bool:
    """
    Attempt automated fix. Returns True if remediation was applied.
    Only acts on high-confidence classifications.
    """

    confidence = classification.get("confidence", 0)
    action = classification.get("suggested_action")

    # Only auto-remediate above 80% confidence
    if confidence < 0.80:
        print(f"Confidence {confidence:.0%} too low for auto-remediation")
        return False

    success = False

    if action == "rollback":
        success = _execute_rollback(
            service=context["service"],
            target_version=classification.get("rollback_to"),
            incident_id=incident_id,
        )

    elif action == "scale_up":
        success = _scale_deployment(
            service=context["service"],
            scale_factor=2,
            incident_id=incident_id,
        )

    if success:
        _add_timeline(incident_id, "auto_remediation_applied", {
            "action": action,
            "confidence": confidence,
        })

        # Notify Slack but don't page — just inform
        _notify_slack(
            channel="#incidents",
            message=f"✅ Auto-remediated: `{action}` applied to `{context['service']}` "
                    f"(confidence: {confidence:.0%}). Monitoring...",
            incident_id=incident_id,
        )

    return success


def _execute_rollback(service: str, target_version: str, incident_id: str) -> bool:
    """Trigger Cloud Run or GKE rollback via Cloud Deploy."""

    import subprocess

    if not target_version:
        print("No rollback target version available")
        return False

    try:
        # GKE rollback via kubectl for illustration (in practice, use the
        # Cloud Deploy API; note --to-revision expects a rollout revision
        # number, so target_version must be mapped to one first)
        result = subprocess.run([
            "kubectl", "rollout", "undo",
            f"deployment/{service}",
            f"--to-revision={target_version}",
            "-n", "production"
        ], capture_output=True, text=True, timeout=60)

        if result.returncode == 0:
            print(f"Rollback to {target_version} succeeded for {service}")
            return True
        else:
            print(f"Rollback failed: {result.stderr}")
            return False

    except Exception as e:
        print(f"Rollback exception: {e}")
        return False

Step 5: Smart Notifications

When auto-remediation isn't triggered, we send a pre-filled context package to the on-call engineer instead of a bare alert:

def notify_on_call(incident_id: str, context: dict, classification: dict):
    """
    Send enriched incident brief to Slack + PagerDuty.
    The goal: engineer opens the alert and immediately knows where to look.
    """

    recent_deploy_msg = ""
    if context.get("recent_deploys"):
        d = context["recent_deploys"][0]
        recent_deploy_msg = f"\n• *Recent deploy:* `{d['version']}` by {d['deployed_by']} ({d['deployed_at']})"

    unhealthy = [k for k, v in context.get("upstream_health", {}).items() if not v.get("healthy")]
    dep_msg = f"\n• *Unhealthy deps:* {', '.join(unhealthy)}" if unhealthy else ""

    slack_blocks = [
        {
            "type": "header",
            "text": {"type": "plain_text", "text": f"🚨 Incident {incident_id}"}
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": (
                    f"*Service:* `{context['service']}`\n"
                    f"*Classification:* {classification['type'].value} "
                    f"({classification['confidence']:.0%} confidence)\n"
                    f"*Evidence:* {classification.get('evidence', 'N/A')}"
                    f"{recent_deploy_msg}"
                    f"{dep_msg}"
                )
            }
        },
        {
            "type": "actions",
            "elements": [
                {
                    "type": "button",
                    "text": {"type": "plain_text", "text": "📋 View Runbook"},
                    "url": f"https://wiki.internal/runbooks/{classification['type'].value}"
                },
                {
                    "type": "button",
                    "text": {"type": "plain_text", "text": "📊 Open Dashboard"},
                    "url": f"https://console.cloud.google.com/monitoring?project=my-project"
                },
            ]
        }
    ]

    # Post to Slack
    requests.post(
        "https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
        json={"blocks": slack_blocks}
    )

Results After 4 Months

| Metric                          | Before     | After        |
| ------------------------------- | ---------- | ------------ |
| Mean time to detect (MTTD)      | 38 min     | 3 min        |
| Mean time to resolve (MTTR)     | 51 min     | 10 min       |
| Auto-remediated incidents       | 0%         | 42%          |
| Incidents requiring escalation  | 100%       | 31%          |
| On-call pages per week          | 47         | 11           |
| Engineer time on incidents      | ~8 hr/week | ~1.5 hr/week |

The 42% auto-remediation rate was higher than we expected. Turns out, "recent deploy broke something" and "dependency flaked" together account for most incidents — and both are very automatable.


Safety Guardrails We Added

Automation at this level requires careful guardrails:

  1. Confidence threshold: Never auto-remediate below 80% confidence. Human judgment kicks in for ambiguous cases.
  2. Blast radius limits: Auto-rollback only if the deployment was in the last 30 minutes. Older issues get human review.
  3. Audit trail: Every automated action writes to Firestore and BigQuery. Full immutable history.
  4. Circuit breaker on the automation itself: If auto-remediation fails twice in a row for the same service, disable it and page a human.
  5. Business hours sensitivity: During peak business hours, we raise the confidence threshold to 90% (more conservative). At 3am, 80% is acceptable.
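Guardrails 4 and 5 are simple to implement. A minimal sketch of the logic — ours persists failure counts in Firestore, and the 09:00–18:00 UTC business-hours window below is illustrative:

```python
class RemediationCircuitBreaker:
    """Guardrail 4: disable auto-remediation after consecutive failures."""

    def __init__(self, max_failures: int = 2):
        self.max_failures = max_failures
        self._failures = {}  # service name -> consecutive failure count

    def allow(self, service: str) -> bool:
        """False once a service hits the failure limit (page a human instead)."""
        return self._failures.get(service, 0) < self.max_failures

    def record(self, service: str, success: bool) -> None:
        """Reset the count on success; increment it on failure."""
        if success:
            self._failures[service] = 0
        else:
            self._failures[service] = self._failures.get(service, 0) + 1


def confidence_threshold(hour_utc: int) -> float:
    """Guardrail 5: stricter threshold during (assumed) 09:00-18:00 hours."""
    return 0.90 if 9 <= hour_utc < 18 else 0.80
```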

What I'd Do Differently

  1. Invest in runbook quality before automation. You're essentially codifying your runbooks. If the runbooks are bad, the automation is bad.
  2. Start with notification enrichment, not remediation. Even just sending pre-filled context to on-call engineers cut our MTTR by 40% before we automated a single action.
  3. Build the BigQuery incident history early. Trend analysis on incident types drove which automations to prioritize. Without data, we would have automated the wrong things first.

What percentage of your incidents are auto-remediated? I'd love to compare numbers — drop a comment below.

Tags: #sre #googlecloud #incidentmanagement #automation #gcp #devops #platform
