DEV Community

Pax
Pax

Posted on • Originally published at paxrel.com

AI Agent for Utilities: Automate Grid Management, Water Treatment & Customer Operations

HomeBlog → AI Agent for Utilities

# AI Agent for Utilities: Automate Grid Management, Water Treatment & Customer Operations
Enter fullscreen mode Exit fullscreen mode

Photo by Robert So on Pexels

    March 28, 2026
    14 min read
    Utilities
    AI Agents


The US electric grid experiences **$150 billion per year in outage costs**. Water utilities lose 20–30% of treated water to leaks before it reaches customers. Customer call centers handle 2–5 million calls per year per large utility, with 40% being simple billing inquiries. AI agents that predict equipment failure, optimize water treatment chemistry, detect leaks acoustically, and handle customer requests autonomously are reshaping the $2.1 trillion global utilities sector.

This guide covers building autonomous agents for electric, gas, and water utilities. Production-ready Python code, SCADA/OT integration patterns, and hard ROI numbers from real deployments.


    ### Table of Contents

        - [1. Smart Grid Optimization Agent](#grid)
        - [2. Outage Prediction & Response Agent](#outage)
        - [3. Water Treatment Optimization Agent](#water)
        - [4. Leak Detection Agent](#leak)
        - [5. Demand Response Agent](#demand)
        - [6. Customer Operations Agent](#customer)
        - [7. ROI Analysis](#roi)



## 1. Smart Grid Optimization Agent

Modern grids must balance variable renewable generation (solar, wind), distributed energy resources (rooftop solar, batteries, EVs), and demand patterns that shift hourly. The agent processes real-time telemetry from thousands of sensors to optimize voltage regulation, reactive power, and load balancing across the distribution network.
Enter fullscreen mode Exit fullscreen mode
import numpy as np
from datetime import datetime, timedelta

class SmartGridAgent:
    """Optimizes electric grid operations in real time."""

    VOLTAGE_LIMITS = {
        "distribution_120v": {"min": 114, "max": 126},   # ANSI C84.1 Range A
        "distribution_240v": {"min": 228, "max": 252},
        "primary_4kv": {"min": 3800, "max": 4200},
        "primary_13kv": {"min": 12350, "max": 13650},
    }

    def __init__(self, scada_feed, weather_api, der_registry, market_api):
        self.scada = scada_feed
        self.weather = weather_api
        self.der = der_registry      # Distributed Energy Resources
        self.market = market_api

    def optimize_voltage(self, feeder_id):
        """Volt-VAR optimization for a distribution feeder."""
        measurements = self.scada.get_feeder_measurements(feeder_id)
        capacitor_banks = self.scada.get_capacitors(feeder_id)
        voltage_regulators = self.scada.get_regulators(feeder_id)

        violations = []
        actions = []

        for node in measurements:
            voltage_class = node["voltage_class"]
            limits = self.VOLTAGE_LIMITS.get(voltage_class, {})
            v = node["voltage"]

            if v  limits.get("max", 999999):
                violations.append({
                    "node": node["id"],
                    "type": "overvoltage",
                    "value": v,
                    "limit": limits["max"],
                    "deviation_pct": round((v - limits["max"]) / limits["max"] * 100, 2),
                })

        # Determine corrections
        if violations:
            low_v = [v for v in violations if v["type"] == "undervoltage"]
            high_v = [v for v in violations if v["type"] == "overvoltage"]

            if low_v:
                # Switch on capacitor banks to boost voltage
                for cap in capacitor_banks:
                    if cap["status"] == "open":
                        actions.append({
                            "device": cap["id"],
                            "action": "close",
                            "type": "capacitor_bank",
                            "expected_voltage_boost_pct": 1.5,
                        })
                        break

                # Raise voltage regulator taps
                for reg in voltage_regulators:
                    if reg["tap_position"]  25:
            return 0
        if wind_speed_ms >= 12:
            return 1.0
        return ((wind_speed_ms - 3) / 9) ** 3
Enter fullscreen mode Exit fullscreen mode
    **Production tip:** Volt-VAR optimization (VVO) alone saves utilities 2–4% on distribution losses. At scale, that's $20–50M per year for a large utility. The key is sub-second SCADA telemetry and accurate feeder models.


## 2. Outage Prediction & Response Agent

Power outages cost the US economy **$150B per year**. Most are caused by vegetation contact, equipment failure, and weather. The agent combines weather forecasts, vegetation growth models, equipment age data, and historical outage patterns to predict and prevent outages before they happen.
Enter fullscreen mode Exit fullscreen mode
class OutagePredictionAgent:
    """Predicts and manages power outages."""

    def __init__(self, oms_client, weather_api, asset_db, vegetation_model):
        self.oms = oms_client          # Outage Management System
        self.weather = weather_api
        self.assets = asset_db
        self.veg = vegetation_model

    def predict_outage_risk(self, region, hours_ahead=48):
        """Predict outage probability by circuit."""
        circuits = self.assets.get_circuits(region)
        weather = self.weather.get_severe_forecast(region, hours_ahead)
        risk_scores = []

        for circuit in circuits:
            # Weather risk
            weather_risk = 0
            if weather.get("wind_gust_mph", 0) > 40:
                weather_risk += 0.3
            if weather.get("ice_accumulation_in", 0) > 0.25:
                weather_risk += 0.5  # Ice storms are devastating
            if weather.get("lightning_probability", 0) > 0.6:
                weather_risk += 0.15

            # Equipment age risk
            avg_pole_age = circuit.get("avg_pole_age_years", 30)
            equipment_risk = min(0.3, avg_pole_age / 100)

            # Vegetation risk
            veg_score = self.veg.get_encroachment_score(circuit["id"])
            veg_risk = veg_score * 0.25

            # Historical pattern
            historical = self.oms.get_outage_frequency(
                circuit["id"], years=3
            )
            history_risk = min(0.2, historical / 10)

            composite = min(1.0, weather_risk + equipment_risk + veg_risk + history_risk)
            risk_scores.append({
                "circuit_id": circuit["id"],
                "circuit_name": circuit["name"],
                "customers_served": circuit["customer_count"],
                "risk_score": round(composite, 3),
                "risk_level": (
                    "critical" if composite > 0.7
                    else "high" if composite > 0.4
                    else "medium" if composite > 0.2
                    else "low"
                ),
                "primary_driver": max(
                    [("weather", weather_risk), ("equipment", equipment_risk),
                     ("vegetation", veg_risk), ("history", history_risk)],
                    key=lambda x: x[1]
                )[0],
                "components": {
                    "weather": round(weather_risk, 3),
                    "equipment": round(equipment_risk, 3),
                    "vegetation": round(veg_risk, 3),
                    "history": round(history_risk, 3),
                },
            })

        return sorted(risk_scores, key=lambda r: -r["risk_score"])

    def optimize_crew_dispatch(self, active_outages, available_crews):
        """Assign restoration crews to outages by priority."""
        # Priority: customers affected * estimated restoration time * critical facilities
        scored_outages = []
        for outage in active_outages:
            critical_count = outage.get("critical_facilities", 0)  # hospitals, etc
            customer_impact = outage["customers_affected"]
            est_restore_hrs = outage["estimated_restore_hours"]

            priority = (
                customer_impact * 1.0 +
                critical_count * 500 +
                (1 / max(est_restore_hrs, 0.5)) * 100  # Faster fixes first
            )
            scored_outages.append({**outage, "priority_score": priority})

        scored_outages.sort(key=lambda o: -o["priority_score"])

        assignments = []
        assigned_crews = set()
        for outage in scored_outages:
            best_crew = None
            best_travel = float("inf")

            for crew in available_crews:
                if crew["id"] in assigned_crews:
                    continue
                if crew["skill_level"]  recommended_dose * 0.45,
        }

    def monitor_disinfection(self, plant_id):
        """Ensure adequate disinfection while minimizing DBP formation."""
        clearwell = self.scada.get_current(plant_id, "clearwell")
        ct = clearwell["chlorine_mg_l"] * clearwell["contact_time_min"]

        # CT requirement depends on temperature and pH
        required_ct = self._get_required_ct(
            clearwell["temp_c"], clearwell["ph"],
            target_log_removal=3.0  # 3-log Giardia
        )

        # DBP formation potential
        toc = clearwell.get("toc_mg_l", 2.0)
        chlorine = clearwell["chlorine_mg_l"]
        temp = clearwell["temp_c"]
        # Simplified THM formation model
        estimated_thm = toc * chlorine * (temp / 20) * 8  # ppb estimate

        return {
            "plant_id": plant_id,
            "actual_ct": round(ct, 1),
            "required_ct": round(required_ct, 1),
            "ct_ratio": round(ct / required_ct, 2),
            "compliant": ct >= required_ct,
            "chlorine_residual": clearwell["chlorine_mg_l"],
            "estimated_thm_ppb": round(estimated_thm, 1),
            "thm_limit_ppb": self.EPA_LIMITS["thm_ppb"],
            "thm_margin_pct": round(
                (1 - estimated_thm / self.EPA_LIMITS["thm_ppb"]) * 100, 1
            ),
            "recommendation": (
                "reduce_chlorine" if estimated_thm > 60 and ct > required_ct * 1.5
                else "increase_chlorine" if ct  0 else 0

        # Minimum Night Flow (MNF) analysis — best leak indicator
        mnf = self.sensors.get_minimum_night_flow(dma_id)
        legitimate_night_use = consumption.get("night_use_estimate_m3h", 0.5)
        leak_estimate = max(0, mnf - legitimate_night_use)

        # Trend analysis
        mnf_history = self.sensors.get_mnf_history(dma_id, days=30)
        mnf_trend = np.polyfit(range(len(mnf_history)), mnf_history, 1)[0]

        return {
            "dma_id": dma_id,
            "net_inflow_m3h": round(net_inflow, 2),
            "consumption_m3h": round(total_consumption, 2),
            "unaccounted_m3h": round(unaccounted, 2),
            "nrw_pct": round(nrw_pct, 1),
            "mnf_m3h": round(mnf, 2),
            "estimated_leakage_m3h": round(leak_estimate, 2),
            "mnf_trend_m3h_per_day": round(mnf_trend, 4),
            "alert": nrw_pct > 25 or mnf_trend > 0.05,
            "severity": (
                "critical" if nrw_pct > 40 or leak_estimate > 10
                else "high" if nrw_pct > 25
                else "medium" if nrw_pct > 15
                else "low"
            ),
        }

    def acoustic_leak_locate(self, pipe_segment_id):
        """Use acoustic sensor correlation to pinpoint leak location."""
        sensors = self.sensors.get_acoustic_pair(pipe_segment_id)
        if len(sensors) 
        **Real-world impact:** Thames Water (UK) deployed AI leak detection and reduced leakage by 15% in the first year  saving 100 million liters per day. The acoustic correlation technique can pinpoint leaks to within 1 meter on metallic pipes.


    ## 5. Demand Response Agent

    Peak electricity demand drives **1025% of total grid investment** despite occurring less than 100 hours per year. Demand response programs that reduce peak load by 510% can defer billions in infrastructure spending. The agent coordinates load curtailment across commercial, industrial, and residential customers.


Enter fullscreen mode Exit fullscreen mode

class DemandResponseAgent:
"""Manages demand response events and distributed resources."""

def __init__(self, customer_db, der_registry, market_api, weather_api):
    self.customers = customer_db
    self.der = der_registry
    self.market = market_api
    self.weather = weather_api

def trigger_demand_response(self, target_reduction_mw, duration_hours):
    """Orchestrate a demand response event."""
    # Get all enrolled resources sorted by cost
    resources = self.customers.get_dr_enrolled()
    resource_stack = []

    for r in resources:
        curtailable_kw = r["max_curtailment_kw"]
        cost_per_kwh = r.get("incentive_rate", 0.25)
        reliability = r.get("historical_performance", 0.85)

        resource_stack.append({
            "customer_id": r["id"],
            "name": r["name"],
            "type": r["customer_type"],  # commercial, industrial, residential
            "curtailable_kw": curtailable_kw,
            "effective_kw": curtailable_kw * reliability,
            "cost_per_kwh": cost_per_kwh,
            "total_event_cost": curtailable_kw * cost_per_kwh * duration_hours,
        })

    # Sort by cost-effectiveness
    resource_stack.sort(key=lambda r: r["cost_per_kwh"])

    # Stack resources until target met
    dispatched = []
    total_dispatched_kw = 0
    target_kw = target_reduction_mw * 1000

    for resource in resource_stack:
        if total_dispatched_kw >= target_kw * 1.1:  # 10% over-dispatch
            break
        dispatched.append(resource)
        total_dispatched_kw += resource["effective_kw"]

    total_cost = sum(r["total_event_cost"] for r in dispatched)
    avoided_market_cost = target_reduction_mw * self.market.get_peak_price() * duration_hours

    return {
        "target_mw": target_reduction_mw,
        "dispatched_mw": round(total_dispatched_kw / 1000, 2),
        "resources_dispatched": len(dispatched),
        "duration_hours": duration_hours,
        "total_incentive_cost": round(total_cost),
        "avoided_market_cost": round(avoided_market_cost),
        "net_savings": round(avoided_market_cost - total_cost),
        "dispatched_resources": dispatched,
    }
Enter fullscreen mode Exit fullscreen mode

    ## 6. Customer Operations Agent

    Utility customer centers handle **2–5 million calls per year**. 40% are billing inquiries, 25% are outage reports, 15% are service requests. AI agents can handle 70–85% of these interactions autonomously, reducing call center costs by $15–25M per year.


Enter fullscreen mode Exit fullscreen mode

class CustomerOpsAgent:
"""Handles utility customer inquiries autonomously."""

def __init__(self, billing_db, oms_client, crm_client, llm):
    self.billing = billing_db
    self.oms = oms_client
    self.crm = crm_client
    self.llm = llm

def handle_inquiry(self, customer_id, inquiry_text):
    """Route and resolve customer inquiries."""
    # Classify intent
    intent = self._classify_intent(inquiry_text)
    customer = self.crm.get_customer(customer_id)

    if intent == "billing_inquiry":
        return self._handle_billing(customer, inquiry_text)
    elif intent == "outage_report":
        return self._handle_outage(customer)
    elif intent == "high_bill":
        return self._handle_high_bill(customer)
    elif intent == "payment_arrangement":
        return self._handle_payment_plan(customer)
    elif intent == "service_request":
        return self._handle_service_request(customer, inquiry_text)
    else:
        return {"action": "escalate_to_agent", "reason": "unclassified_intent"}

def _handle_high_bill(self, customer):
    """Analyze and explain high bill complaints."""
    current = self.billing.get_current_bill(customer["id"])
    previous = self.billing.get_bill_history(customer["id"], months=12)
    avg_bill = np.mean([b["amount"] for b in previous])

    # Usage analysis
    usage_current = current["usage_kwh"]
    usage_avg = np.mean([b["usage_kwh"] for b in previous])
    usage_increase_pct = (usage_current - usage_avg) / usage_avg * 100

    # Weather impact
    hdd = current.get("heating_degree_days", 0)
    cdd = current.get("cooling_degree_days", 0)
    avg_hdd = np.mean([b.get("heating_degree_days", 0) for b in previous])
    avg_cdd = np.mean([b.get("cooling_degree_days", 0) for b in previous])

    # Rate change check
    rate_changed = current["rate_per_kwh"] != previous[-1]["rate_per_kwh"]

    explanations = []
    if usage_increase_pct > 15:
        if hdd > avg_hdd * 1.2:
            explanations.append(
                f"Heating demand was {round((hdd/avg_hdd - 1)*100)}% above average due to colder weather"
            )
        elif cdd > avg_cdd * 1.2:
            explanations.append(
                f"Cooling demand was {round((cdd/avg_cdd - 1)*100)}% above average due to hotter weather"
            )
        else:
            explanations.append(
                f"Usage increased {round(usage_increase_pct)}% vs your 12-month average"
            )
    if rate_changed:
        explanations.append("A rate adjustment took effect this billing period")

    return {
        "action": "auto_respond",
        "current_bill": current["amount"],
        "average_bill": round(avg_bill, 2),
        "usage_change_pct": round(usage_increase_pct, 1),
        "explanations": explanations,
        "suggestions": [
            "Enroll in budget billing to spread costs evenly",
            "Schedule a free energy audit",
            "Review thermostat settings",
        ],
    }
Enter fullscreen mode Exit fullscreen mode



    ## 7. ROI Analysis


        AgentAnnual SavingsImplementationPayback
        Smart Grid (VVO)$20–50M (loss reduction)$5–10M3–6 months
        Outage Prediction$15–40M (avoided outages)$3–6M3–5 months
        Water Treatment$5–12M (chemicals + energy)$2–4M4–8 months
        Leak Detection$10–30M (water savings)$4–8M4–8 months
        Demand Response$30–80M (deferred capex)$5–10M2–4 months
        Customer Operations$15–25M (call center)$2–4M2–3 months


    **Total portfolio: $95–237M in annual savings** against $21–42M in implementation costs. The biggest impact comes from demand response (deferred infrastructure) and smart grid optimization (distribution loss reduction).


        ### Build Your Own AI Agent
        Get the complete blueprint for building autonomous AI agents — includes templates, security checklists, and deployment guides.

        [Get The AI Agent Playbook — $19](https://paxrelate.gumroad.com/l/ai-agent-playbook)




            ### Not ready to buy? Start with Chapter 1 — free
            Get the first chapter of The AI Agent Playbook delivered to your inbox. Learn what AI agents really are and see real production examples.

            [Get Free Chapter →](/free-chapter.html)



        ### Related Articles

            [
                #### AI Agent for Energy
                Grid optimization, renewable forecasting, and energy trading.

            ](https://paxrel.com/blog-ai-agent-energy.html)
            [
                #### AI Agent for Construction
                Project scheduling, safety monitoring, and cost estimation.

            ](https://paxrel.com/blog-ai-agent-construction.html)
            [
                #### AI Agent for Government
                Citizen services, document processing, and compliance.

            ](https://paxrel.com/blog-ai-agent-government.html)

---

*Get our free [AI Agent Starter Kit](https://paxrel.com/ai-agent-starter-kit.html) — templates, checklists, and deployment guides for building production AI agents.*
Enter fullscreen mode Exit fullscreen mode

Top comments (0)