Home → Blog → AI Agent for Utilities
# AI Agent for Utilities: Automate Grid Management, Water Treatment & Customer Operations
Photo by Robert So on Pexels
March 28, 2026
14 min read
Utilities
AI Agents
The US electric grid experiences **$150 billion per year in outage costs**. Water utilities lose 20–30% of treated water to leaks before it reaches customers. Customer call centers handle 2–5 million calls per year per large utility, with 40% being simple billing inquiries. AI agents that predict equipment failure, optimize water treatment chemistry, detect leaks acoustically, and handle customer requests autonomously are reshaping the $2.1 trillion global utilities sector.
This guide covers building autonomous agents for electric, gas, and water utilities. Production-ready Python code, SCADA/OT integration patterns, and hard ROI numbers from real deployments.
### Table of Contents
- [1. Smart Grid Optimization Agent](#grid)
- [2. Outage Prediction & Response Agent](#outage)
- [3. Water Treatment Optimization Agent](#water)
- [4. Leak Detection Agent](#leak)
- [5. Demand Response Agent](#demand)
- [6. Customer Operations Agent](#customer)
- [7. ROI Analysis](#roi)
## 1. Smart Grid Optimization Agent
Modern grids must balance variable renewable generation (solar, wind), distributed energy resources (rooftop solar, batteries, EVs), and demand patterns that shift hourly. The agent processes real-time telemetry from thousands of sensors to optimize voltage regulation, reactive power, and load balancing across the distribution network.
import numpy as np
from datetime import datetime, timedelta
class SmartGridAgent:
"""Optimizes electric grid operations in real time."""
VOLTAGE_LIMITS = {
"distribution_120v": {"min": 114, "max": 126}, # ANSI C84.1 Range A
"distribution_240v": {"min": 228, "max": 252},
"primary_4kv": {"min": 3800, "max": 4200},
"primary_13kv": {"min": 12350, "max": 13650},
}
def __init__(self, scada_feed, weather_api, der_registry, market_api):
self.scada = scada_feed
self.weather = weather_api
self.der = der_registry # Distributed Energy Resources
self.market = market_api
def optimize_voltage(self, feeder_id):
"""Volt-VAR optimization for a distribution feeder."""
measurements = self.scada.get_feeder_measurements(feeder_id)
capacitor_banks = self.scada.get_capacitors(feeder_id)
voltage_regulators = self.scada.get_regulators(feeder_id)
violations = []
actions = []
for node in measurements:
voltage_class = node["voltage_class"]
limits = self.VOLTAGE_LIMITS.get(voltage_class, {})
v = node["voltage"]
if v limits.get("max", 999999):
violations.append({
"node": node["id"],
"type": "overvoltage",
"value": v,
"limit": limits["max"],
"deviation_pct": round((v - limits["max"]) / limits["max"] * 100, 2),
})
# Determine corrections
if violations:
low_v = [v for v in violations if v["type"] == "undervoltage"]
high_v = [v for v in violations if v["type"] == "overvoltage"]
if low_v:
# Switch on capacitor banks to boost voltage
for cap in capacitor_banks:
if cap["status"] == "open":
actions.append({
"device": cap["id"],
"action": "close",
"type": "capacitor_bank",
"expected_voltage_boost_pct": 1.5,
})
break
# Raise voltage regulator taps
for reg in voltage_regulators:
if reg["tap_position"] 25:
return 0
if wind_speed_ms >= 12:
return 1.0
return ((wind_speed_ms - 3) / 9) ** 3
**Production tip:** Volt-VAR optimization (VVO) alone saves utilities 2–4% on distribution losses. At scale, that's $20–50M per year for a large utility. The key is sub-second SCADA telemetry and accurate feeder models.
## 2. Outage Prediction & Response Agent
Power outages cost the US economy **$150B per year**. Most are caused by vegetation contact, equipment failure, and weather. The agent combines weather forecasts, vegetation growth models, equipment age data, and historical outage patterns to predict and prevent outages before they happen.
class OutagePredictionAgent:
"""Predicts and manages power outages."""
def __init__(self, oms_client, weather_api, asset_db, vegetation_model):
self.oms = oms_client # Outage Management System
self.weather = weather_api
self.assets = asset_db
self.veg = vegetation_model
def predict_outage_risk(self, region, hours_ahead=48):
"""Predict outage probability by circuit."""
circuits = self.assets.get_circuits(region)
weather = self.weather.get_severe_forecast(region, hours_ahead)
risk_scores = []
for circuit in circuits:
# Weather risk
weather_risk = 0
if weather.get("wind_gust_mph", 0) > 40:
weather_risk += 0.3
if weather.get("ice_accumulation_in", 0) > 0.25:
weather_risk += 0.5 # Ice storms are devastating
if weather.get("lightning_probability", 0) > 0.6:
weather_risk += 0.15
# Equipment age risk
avg_pole_age = circuit.get("avg_pole_age_years", 30)
equipment_risk = min(0.3, avg_pole_age / 100)
# Vegetation risk
veg_score = self.veg.get_encroachment_score(circuit["id"])
veg_risk = veg_score * 0.25
# Historical pattern
historical = self.oms.get_outage_frequency(
circuit["id"], years=3
)
history_risk = min(0.2, historical / 10)
composite = min(1.0, weather_risk + equipment_risk + veg_risk + history_risk)
risk_scores.append({
"circuit_id": circuit["id"],
"circuit_name": circuit["name"],
"customers_served": circuit["customer_count"],
"risk_score": round(composite, 3),
"risk_level": (
"critical" if composite > 0.7
else "high" if composite > 0.4
else "medium" if composite > 0.2
else "low"
),
"primary_driver": max(
[("weather", weather_risk), ("equipment", equipment_risk),
("vegetation", veg_risk), ("history", history_risk)],
key=lambda x: x[1]
)[0],
"components": {
"weather": round(weather_risk, 3),
"equipment": round(equipment_risk, 3),
"vegetation": round(veg_risk, 3),
"history": round(history_risk, 3),
},
})
return sorted(risk_scores, key=lambda r: -r["risk_score"])
def optimize_crew_dispatch(self, active_outages, available_crews):
"""Assign restoration crews to outages by priority."""
# Priority: customers affected * estimated restoration time * critical facilities
scored_outages = []
for outage in active_outages:
critical_count = outage.get("critical_facilities", 0) # hospitals, etc
customer_impact = outage["customers_affected"]
est_restore_hrs = outage["estimated_restore_hours"]
priority = (
customer_impact * 1.0 +
critical_count * 500 +
(1 / max(est_restore_hrs, 0.5)) * 100 # Faster fixes first
)
scored_outages.append({**outage, "priority_score": priority})
scored_outages.sort(key=lambda o: -o["priority_score"])
assignments = []
assigned_crews = set()
for outage in scored_outages:
best_crew = None
best_travel = float("inf")
for crew in available_crews:
if crew["id"] in assigned_crews:
continue
if crew["skill_level"] recommended_dose * 0.45,
}
def monitor_disinfection(self, plant_id):
"""Ensure adequate disinfection while minimizing DBP formation."""
clearwell = self.scada.get_current(plant_id, "clearwell")
ct = clearwell["chlorine_mg_l"] * clearwell["contact_time_min"]
# CT requirement depends on temperature and pH
required_ct = self._get_required_ct(
clearwell["temp_c"], clearwell["ph"],
target_log_removal=3.0 # 3-log Giardia
)
# DBP formation potential
toc = clearwell.get("toc_mg_l", 2.0)
chlorine = clearwell["chlorine_mg_l"]
temp = clearwell["temp_c"]
# Simplified THM formation model
estimated_thm = toc * chlorine * (temp / 20) * 8 # ppb estimate
return {
"plant_id": plant_id,
"actual_ct": round(ct, 1),
"required_ct": round(required_ct, 1),
"ct_ratio": round(ct / required_ct, 2),
"compliant": ct >= required_ct,
"chlorine_residual": clearwell["chlorine_mg_l"],
"estimated_thm_ppb": round(estimated_thm, 1),
"thm_limit_ppb": self.EPA_LIMITS["thm_ppb"],
"thm_margin_pct": round(
(1 - estimated_thm / self.EPA_LIMITS["thm_ppb"]) * 100, 1
),
"recommendation": (
"reduce_chlorine" if estimated_thm > 60 and ct > required_ct * 1.5
else "increase_chlorine" if ct 0 else 0
# Minimum Night Flow (MNF) analysis — best leak indicator
mnf = self.sensors.get_minimum_night_flow(dma_id)
legitimate_night_use = consumption.get("night_use_estimate_m3h", 0.5)
leak_estimate = max(0, mnf - legitimate_night_use)
# Trend analysis
mnf_history = self.sensors.get_mnf_history(dma_id, days=30)
mnf_trend = np.polyfit(range(len(mnf_history)), mnf_history, 1)[0]
return {
"dma_id": dma_id,
"net_inflow_m3h": round(net_inflow, 2),
"consumption_m3h": round(total_consumption, 2),
"unaccounted_m3h": round(unaccounted, 2),
"nrw_pct": round(nrw_pct, 1),
"mnf_m3h": round(mnf, 2),
"estimated_leakage_m3h": round(leak_estimate, 2),
"mnf_trend_m3h_per_day": round(mnf_trend, 4),
"alert": nrw_pct > 25 or mnf_trend > 0.05,
"severity": (
"critical" if nrw_pct > 40 or leak_estimate > 10
else "high" if nrw_pct > 25
else "medium" if nrw_pct > 15
else "low"
),
}
def acoustic_leak_locate(self, pipe_segment_id):
"""Use acoustic sensor correlation to pinpoint leak location."""
sensors = self.sensors.get_acoustic_pair(pipe_segment_id)
if len(sensors)
**Real-world impact:** Thames Water (UK) deployed AI leak detection and reduced leakage by 15% in the first year — saving 100 million liters per day. The acoustic correlation technique can pinpoint leaks to within 1 meter on metallic pipes.
## 5. Demand Response Agent
Peak electricity demand drives **10–25% of total grid investment** despite occurring less than 100 hours per year. Demand response programs that reduce peak load by 5–10% can defer billions in infrastructure spending. The agent coordinates load curtailment across commercial, industrial, and residential customers.
class DemandResponseAgent:
"""Manages demand response events and distributed resources."""
def __init__(self, customer_db, der_registry, market_api, weather_api):
self.customers = customer_db
self.der = der_registry
self.market = market_api
self.weather = weather_api
def trigger_demand_response(self, target_reduction_mw, duration_hours):
"""Orchestrate a demand response event."""
# Get all enrolled resources sorted by cost
resources = self.customers.get_dr_enrolled()
resource_stack = []
for r in resources:
curtailable_kw = r["max_curtailment_kw"]
cost_per_kwh = r.get("incentive_rate", 0.25)
reliability = r.get("historical_performance", 0.85)
resource_stack.append({
"customer_id": r["id"],
"name": r["name"],
"type": r["customer_type"], # commercial, industrial, residential
"curtailable_kw": curtailable_kw,
"effective_kw": curtailable_kw * reliability,
"cost_per_kwh": cost_per_kwh,
"total_event_cost": curtailable_kw * cost_per_kwh * duration_hours,
})
# Sort by cost-effectiveness
resource_stack.sort(key=lambda r: r["cost_per_kwh"])
# Stack resources until target met
dispatched = []
total_dispatched_kw = 0
target_kw = target_reduction_mw * 1000
for resource in resource_stack:
if total_dispatched_kw >= target_kw * 1.1: # 10% over-dispatch
break
dispatched.append(resource)
total_dispatched_kw += resource["effective_kw"]
total_cost = sum(r["total_event_cost"] for r in dispatched)
avoided_market_cost = target_reduction_mw * self.market.get_peak_price() * duration_hours
return {
"target_mw": target_reduction_mw,
"dispatched_mw": round(total_dispatched_kw / 1000, 2),
"resources_dispatched": len(dispatched),
"duration_hours": duration_hours,
"total_incentive_cost": round(total_cost),
"avoided_market_cost": round(avoided_market_cost),
"net_savings": round(avoided_market_cost - total_cost),
"dispatched_resources": dispatched,
}
## 6. Customer Operations Agent
Utility customer centers handle **2–5 million calls per year**. 40% are billing inquiries, 25% are outage reports, 15% are service requests. AI agents can handle 70–85% of these interactions autonomously, reducing call center costs by $15–25M per year.
class CustomerOpsAgent:
"""Handles utility customer inquiries autonomously."""
def __init__(self, billing_db, oms_client, crm_client, llm):
self.billing = billing_db
self.oms = oms_client
self.crm = crm_client
self.llm = llm
def handle_inquiry(self, customer_id, inquiry_text):
"""Route and resolve customer inquiries."""
# Classify intent
intent = self._classify_intent(inquiry_text)
customer = self.crm.get_customer(customer_id)
if intent == "billing_inquiry":
return self._handle_billing(customer, inquiry_text)
elif intent == "outage_report":
return self._handle_outage(customer)
elif intent == "high_bill":
return self._handle_high_bill(customer)
elif intent == "payment_arrangement":
return self._handle_payment_plan(customer)
elif intent == "service_request":
return self._handle_service_request(customer, inquiry_text)
else:
return {"action": "escalate_to_agent", "reason": "unclassified_intent"}
def _handle_high_bill(self, customer):
"""Analyze and explain high bill complaints."""
current = self.billing.get_current_bill(customer["id"])
previous = self.billing.get_bill_history(customer["id"], months=12)
avg_bill = np.mean([b["amount"] for b in previous])
# Usage analysis
usage_current = current["usage_kwh"]
usage_avg = np.mean([b["usage_kwh"] for b in previous])
usage_increase_pct = (usage_current - usage_avg) / usage_avg * 100
# Weather impact
hdd = current.get("heating_degree_days", 0)
cdd = current.get("cooling_degree_days", 0)
avg_hdd = np.mean([b.get("heating_degree_days", 0) for b in previous])
avg_cdd = np.mean([b.get("cooling_degree_days", 0) for b in previous])
# Rate change check
rate_changed = current["rate_per_kwh"] != previous[-1]["rate_per_kwh"]
explanations = []
if usage_increase_pct > 15:
if hdd > avg_hdd * 1.2:
explanations.append(
f"Heating demand was {round((hdd/avg_hdd - 1)*100)}% above average due to colder weather"
)
elif cdd > avg_cdd * 1.2:
explanations.append(
f"Cooling demand was {round((cdd/avg_cdd - 1)*100)}% above average due to hotter weather"
)
else:
explanations.append(
f"Usage increased {round(usage_increase_pct)}% vs your 12-month average"
)
if rate_changed:
explanations.append("A rate adjustment took effect this billing period")
return {
"action": "auto_respond",
"current_bill": current["amount"],
"average_bill": round(avg_bill, 2),
"usage_change_pct": round(usage_increase_pct, 1),
"explanations": explanations,
"suggestions": [
"Enroll in budget billing to spread costs evenly",
"Schedule a free energy audit",
"Review thermostat settings",
],
}
## 7. ROI Analysis
AgentAnnual SavingsImplementationPayback
Smart Grid (VVO)$20–50M (loss reduction)$5–10M3–6 months
Outage Prediction$15–40M (avoided outages)$3–6M3–5 months
Water Treatment$5–12M (chemicals + energy)$2–4M4–8 months
Leak Detection$10–30M (water savings)$4–8M4–8 months
Demand Response$30–80M (deferred capex)$5–10M2–4 months
Customer Operations$15–25M (call center)$2–4M2–3 months
**Total portfolio: $95–237M in annual savings** against $21–42M in implementation costs. The biggest impact comes from demand response (deferred infrastructure) and smart grid optimization (distribution loss reduction).
### Build Your Own AI Agent
Get the complete blueprint for building autonomous AI agents — includes templates, security checklists, and deployment guides.
[Get The AI Agent Playbook — $19](https://paxrelate.gumroad.com/l/ai-agent-playbook)
### Not ready to buy? Start with Chapter 1 — free
Get the first chapter of The AI Agent Playbook delivered to your inbox. Learn what AI agents really are and see real production examples.
[Get Free Chapter →](/free-chapter.html)
### Related Articles
[
#### AI Agent for Energy
Grid optimization, renewable forecasting, and energy trading.
](https://paxrel.com/blog-ai-agent-energy.html)
[
#### AI Agent for Construction
Project scheduling, safety monitoring, and cost estimation.
](https://paxrel.com/blog-ai-agent-construction.html)
[
#### AI Agent for Government
Citizen services, document processing, and compliance.
](https://paxrel.com/blog-ai-agent-government.html)
---
*Get our free [AI Agent Starter Kit](https://paxrel.com/ai-agent-starter-kit.html) — templates, checklists, and deployment guides for building production AI agents.*
Top comments (0)