I always ask myself what I want to become in the future. The AI era came too fast and I felt both fear (am I going to be redundant?) and unlimited possibility (now I have all the knowledge, the skills, and the appropriate time to achieve whatever I want). I will start with the possibilities and I will finish with my fears.
I always wanted to develop software that can reduce carbon footprint and cost on AWS. In one of my previous companies my focus was the cost optimization pillar. At first I thought it was not interesting, but from a stakeholder's perspective money always matters (well, this is not always true — some companies would never care about the costs). Years went by and now I think I am mature enough to develop something that could actually save costs.
I focused on Strands SDK because it is a model-driven approach to agent architecture. It mimics how you do OOP, or how you model the solution for a difficult problem — you define the problem, you select the inputs, and you understand the connections between the problem, the inputs, and the final solution. With experience I understood that selecting and writing the right tools for a single agent could lead to better results than reaching for a swarm architecture right away.
Contents
- The model: configuration as the single source of truth
- The session: one object, every analyzer reads from it
- Cost Explorer wrappers: the most expensive API in AWS
- Bootstrap: let the spend tell you which regions matter
- The cost layer: trends, anomalies, commitments
- Compute: where Strands' model-driven approach earns its keep
- Storage: the single biggest hidden cost
- Databases and network: same shape, different problem
- Logs, KMS, secrets: small line items that compound
- Security tooling and governance: paying for what you don't have
- The synthesis tool: one PDF, one call, at the end
- Wiring the agent: the most underrated part of Strands
- CLI: same code, your account or somebody else's
- PDF renderer
- Fears
Below is the example.
The model: configuration as the single source of truth
The first decision was where the agent's knobs live. I have seen too many AWS scripts where the threshold for "idle" is hardcoded somewhere on line 437, and the lookback window is hardcoded somewhere else, and you have to read the entire file to figure out what the script considers waste. So I started with a single CONFIG dict, fed by environment variables, and I pinned the small set of constants that the analyzers reference (the EOL Lambda runtimes, the Cost Explorer filter that strips out credits and refunds). The same code now runs in Lambda, in CI, or on my laptop — only the env vars change.
from __future__ import annotations
import argparse
import os
import sys
import traceback
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Callable
import boto3
from botocore.exceptions import ClientError
from strands import Agent, tool
from strands.models.bedrock import BedrockModel
import pdf_renderer
CONFIG = {
    "model": os.getenv("FINOPS_BEDROCK_MODEL", "us.anthropic.claude-sonnet-4-5-20250929-v1:0"),
    "region": os.getenv("AWS_REGION", "us-east-1"),
    "output_dir": Path(os.getenv("FINOPS_OUTPUT_DIR", str(Path.home() / "Documents/finops-reports"))),
    "lookback_months": int(os.getenv("FINOPS_LOOKBACK_MONTHS", "6")),
    "delta_usd": float(os.getenv("FINOPS_DELTA_USD", "5")),
    "delta_pct": float(os.getenv("FINOPS_DELTA_PCT", "25")),
    "stale_days": int(os.getenv("FINOPS_STALE_DAYS", "90")),
    "idle_days": int(os.getenv("FINOPS_IDLE_DAYS", "14")),
    "max_regions": int(os.getenv("FINOPS_MAX_REGIONS", "8")),
    "sns_topic_arn": os.getenv("FINOPS_SNS_TOPIC_ARN"),
}

NO_CREDITS = {"Not": {"Dimensions": {"Key": "RECORD_TYPE", "Values": ["Credit", "Refund"]}}}

EOL_LAMBDA_RUNTIMES = {
    # Python
    "python2.7", "python3.6", "python3.7", "python3.8", "python3.9",
    # Node.js
    "nodejs4.3", "nodejs6.10", "nodejs8.10", "nodejs10.x", "nodejs12.x",
    "nodejs14.x", "nodejs16.x", "nodejs18.x",
    # Ruby
    "ruby2.5", "ruby2.6", "ruby2.7",
    # Go (now use provided.al2/al2023)
    "go1.x",
    # Java
    "java8", "java8.al2", "java11",
    # .NET
    "dotnetcore1.0", "dotnetcore2.0", "dotnetcore2.1", "dotnetcore3.1",
    "dotnet5.0", "dotnet6", "dotnet7",
    # Custom
    "provided",
}
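To make the "only the env vars change" point concrete, here is a minimal sketch of the same pattern with the environment passed in as a plain mapping, so it can be tried without touching the real process environment. The load_config helper is hypothetical and not part of the agent; the variable names and defaults are the ones from CONFIG above.

```python
from typing import Mapping


def load_config(env: Mapping[str, str]) -> dict:
    """Hypothetical helper: build a config dict from any env-like mapping."""
    return {
        "lookback_months": int(env.get("FINOPS_LOOKBACK_MONTHS", "6")),
        "delta_usd": float(env.get("FINOPS_DELTA_USD", "5")),
        "idle_days": int(env.get("FINOPS_IDLE_DAYS", "14")),
    }


# No overrides: what a laptop run would see.
laptop = load_config({})
# A CI job that wants a shorter window and a stricter idle threshold.
ci = load_config({"FINOPS_LOOKBACK_MONTHS": "3", "FINOPS_IDLE_DAYS": "7"})
```

Passing os.environ instead of a literal dict gives the behaviour of the real CONFIG; the mapping argument only exists to keep the sketch easy to test.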
The session: one object, every analyzer reads from it
If I follow the OOP analogy, the Session is my object model. It holds the boto3 session, the account identity, the active regions, and a lazy client cache so I do not pay the cost of constructing a new boto3 client for every region in every tool. The two globals (SESSION and FINDINGS) are deliberate — every @tool reads from s() and writes through record(), and that is the only state the agent shares. I tried passing the session through tool arguments first, but Strands tools are called by the model, not by my code, and I do not want the model to invent session arguments. The helpers below — try_aws, paginate, per_region, cw_stats — are the boring infrastructure that lets every analyzer survive a region with broken IAM and keep going instead of crashing the whole run.
class Session:
    """Bundles credentials, account identity, region list, and a client cache."""

    def __init__(self, boto: boto3.Session, account_id: str, alias: str | None,
                 primary_region: str, regions: list[str]):
        self.boto = boto
        self.account_id = account_id
        self.alias = alias
        self.primary_region = primary_region
        self.regions = regions
        self._clients: dict[tuple[str, str], Any] = {}

    def client(self, service: str, region: str | None = None):
        key = (service, region or self.primary_region)
        if key not in self._clients:
            self._clients[key] = self.boto.client(service, region_name=key[1])
        return self._clients[key]


SESSION: Session | None = None
FINDINGS: dict[str, dict] = {}


def s() -> Session:
    if SESSION is None:
        raise RuntimeError("Session not initialized: call build_session() first")
    return SESSION


def record(section: str, data: dict) -> dict:
    """Store findings under `section` AND return them. Each tool ends with this."""
    FINDINGS[section] = data
    return data


def today() -> date:
    return datetime.now(timezone.utc).date()


def money(x) -> float:
    try:
        return round(float(x or 0), 4)
    except (TypeError, ValueError):
        return 0.0


def try_aws(fn: Callable, default):
    """Run fn(); on any AWS/runtime error log to stderr and return `default`.
    Lets tools keep going even if some regions/services lack permission."""
    try:
        return fn()
    except Exception as e:
        print(f"[finops-agent] {type(e).__name__}: {e}", file=sys.stderr)
        return default


def paginate(client, op: str, key: str, **kw) -> list:
    items: list = []
    for page in client.get_paginator(op).paginate(**kw):
        items.extend(page.get(key, []))
    return items


def per_region(service: str, fn: Callable[[str, Any], dict]) -> dict[str, dict]:
    """Call fn(region, client_for_service) for each active region, swallow errors,
    return {region: result}. Tools that scan multi-region use this."""
    out = {}
    for region in s().regions:
        client = s().client(service, region)
        out[region] = try_aws(lambda c=client, r=region: fn(r, c), {})
    return out


def cw_stats(client, namespace: str, metric: str, dims: list[dict],
             days: int, stat: str = "Average", period: int = 86400) -> list[float]:
    end = datetime.now(timezone.utc)
    start = end - timedelta(days=days)
    r = try_aws(
        lambda: client.get_metric_statistics(
            Namespace=namespace, MetricName=metric, Dimensions=dims,
            StartTime=start, EndTime=end, Period=period, Statistics=[stat],
        ),
        {"Datapoints": []},
    )
    return [d[stat] for d in r.get("Datapoints", [])]
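The "survive a region with broken IAM" behaviour is easier to see without boto3 in the way. The sketch below reproduces try_aws and pairs it with scan_regions, a hypothetical stand-in for per_region that takes a plain callable instead of a client; fake_scan simulates one region where the call is denied.

```python
import sys
from typing import Any, Callable


def try_aws(fn: Callable[[], Any], default: Any) -> Any:
    """Same idea as the helper above: log the error, return the fallback."""
    try:
        return fn()
    except Exception as e:
        print(f"[sketch] {type(e).__name__}: {e}", file=sys.stderr)
        return default


def scan_regions(regions: list[str], scan: Callable[[str], dict]) -> dict[str, dict]:
    """Hypothetical stand-in for per_region, without boto3 clients."""
    # r=r binds the loop variable at definition time, as per_region does.
    return {r: try_aws(lambda r=r: scan(r), {}) for r in regions}


def fake_scan(region: str) -> dict:
    # Simulate a region where the IAM policy denies the call.
    if region == "eu-west-1":
        raise PermissionError("AccessDenied (simulated)")
    return {"functions": 3}


result = scan_regions(["us-east-1", "eu-west-1"], fake_scan)
```

The failing region ends up as an empty dict and the healthy one keeps its data. The trade-off is that an empty result can mean either "nothing there" or "not allowed to look", so the stderr log is the only place the difference shows up.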
Cost Explorer wrappers: the most expensive API in AWS
Cost Explorer is the most opinionated API in AWS, and one of the few where every call costs you money ($0.01 per paginated request). Every analyzer that touches money goes through these two helpers: _ce_groups for one-shot service-grouped cost windows and _ce_monthly_with_top for the trailing-N-months trend with the top services per month. If I let every tool issue its own raw get_cost_and_usage calls, I would be paying for the privilege of looking at my own waste. I also always strip out credits and refunds with NO_CREDITS, otherwise the trend chart shows phantom drops every time AWS applies a credit and the agent gets confused about what the spend actually looks like.
def _ce_groups(start: date, end: date, key: str, group_type: str = "DIMENSION") -> dict[str, float]:
    """Run a CE GetCostAndUsage and flatten Service-grouped results to {service: usd}."""
    r = try_aws(
        lambda: s().client("ce").get_cost_and_usage(
            TimePeriod={"Start": start.isoformat(), "End": end.isoformat()},
            Granularity="MONTHLY",
            Metrics=["UnblendedCost"],
            GroupBy=[{"Type": group_type, "Key": key}],
            Filter=NO_CREDITS,
        ),
        {"ResultsByTime": []},
    )
    out: dict[str, float] = {}
    for tp in r.get("ResultsByTime", []):
        for g in tp.get("Groups", []):
            k = g["Keys"][0]
            out[k] = out.get(k, 0.0) + money(g["Metrics"]["UnblendedCost"]["Amount"])
    return out


def _ce_monthly_with_top(months: int) -> list[dict]:
    """Last N months grouped by service, with top 8 services per month."""
    end = today() + timedelta(days=1)
    start = (today() - timedelta(days=30 * months)).replace(day=1)
    r = try_aws(
        lambda: s().client("ce").get_cost_and_usage(
            TimePeriod={"Start": start.isoformat(), "End": end.isoformat()},
            Granularity="MONTHLY",
            Metrics=["UnblendedCost"],
            GroupBy=[{"Type": "DIMENSION", "Key": "SERVICE"}],
            Filter=NO_CREDITS,
        ),
        {"ResultsByTime": []},
    )
    out = []
    for tp in r.get("ResultsByTime", []):
        services = sorted(
            ({"service": g["Keys"][0],
              "cost_usd": money(g["Metrics"]["UnblendedCost"]["Amount"])}
             for g in tp.get("Groups", [])),
            key=lambda x: -x["cost_usd"],
        )
        out.append({
            "month": tp["TimePeriod"]["Start"],
            "total_usd": round(sum(svc["cost_usd"] for svc in services), 2),
            "top": services[:8],
        })
    return out
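For readers who have not looked at a raw Cost Explorer response, the sketch below runs the same flattening step as _ce_groups on a hand-written payload. The nesting (ResultsByTime, Groups, Keys, Metrics) follows the GetCostAndUsage response shape that the helpers above read; the amounts and the flatten name are made up for illustration.

```python
# Hand-written sample in the shape of a GetCostAndUsage response.
sample = {
    "ResultsByTime": [
        {"TimePeriod": {"Start": "2025-01-01", "End": "2025-02-01"},
         "Groups": [
             {"Keys": ["Amazon Simple Storage Service"],
              "Metrics": {"UnblendedCost": {"Amount": "12.50", "Unit": "USD"}}},
             {"Keys": ["AWS Lambda"],
              "Metrics": {"UnblendedCost": {"Amount": "0.75", "Unit": "USD"}}},
         ]},
        {"TimePeriod": {"Start": "2025-02-01", "End": "2025-03-01"},
         "Groups": [
             {"Keys": ["Amazon Simple Storage Service"],
              "Metrics": {"UnblendedCost": {"Amount": "14.25", "Unit": "USD"}}},
         ]},
    ]
}


def flatten(resp: dict) -> dict[str, float]:
    """Sum the cost of each group key across all returned time periods."""
    out: dict[str, float] = {}
    for period in resp.get("ResultsByTime", []):
        for group in period.get("Groups", []):
            key = group["Keys"][0]
            # Amounts arrive as strings, so convert before adding.
            amount = float(group["Metrics"]["UnblendedCost"]["Amount"])
            out[key] = round(out.get(key, 0.0) + amount, 4)
    return out


totals = flatten(sample)
```

One request covering two months yields a single total per service, which is the reason to batch the window into one call rather than pay for one request per month.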
Bootstrap: let the spend tell you which regions matter
Now the bootstrap. This is where the agent figures out which AWS account it is even talking to and which regions are worth scanning. The trick is _detect_active_regions — instead of scanning every AWS region (28+ at the time of writing), I ask Cost Explorer which regions had non-zero spend in the last 30 days and only scan those. A personal dev account ends up with one region. An enterprise workload ends up with eight. The agent does not need to know in advance which scenario it is in; the data tells it. The same build_session also handles the cross-account case via --role-arn, which was the original use case — running the agent against customer accounts where I had audit access.
def build_session(profile: str | None = None, role_arn: str | None = None,
                  region: str | None = None, explicit_regions: list[str] | None = None) -> Session:
    region = region or CONFIG["region"]
    base = boto3.Session(profile_name=profile) if profile else boto3.Session()
    if role_arn:
        # Cross-account run: swap the base credentials for the assumed role's.
        creds = base.client("sts", region_name=region).assume_role(
            RoleArn=role_arn, RoleSessionName="finops-agent",
        )["Credentials"]
        sess = boto3.Session(
            aws_access_key_id=creds["AccessKeyId"],
            aws_secret_access_key=creds["SecretAccessKey"],
            aws_session_token=creds["SessionToken"],
            region_name=region,
        )
    else:
        sess = base
    account_id = sess.client("sts", region_name=region).get_caller_identity()["Account"]
    aliases = try_aws(lambda: sess.client("iam").list_account_aliases().get("AccountAliases", []), [])
    alias = aliases[0] if aliases else None
    regions = explicit_regions or _detect_active_regions(sess, region)
    return Session(sess, account_id, alias, region, regions[:CONFIG["max_regions"]])


def _detect_active_regions(sess: boto3.Session, primary: str) -> list[str]:
    """Use CE to find regions with >$0 spend in the last 30 days. Falls back to [primary]."""
    # Use the UTC-based today() helper so this window matches the other CE calls.
    end = today() + timedelta(days=1)
    start = end - timedelta(days=31)
    r = try_aws(
        lambda: sess.client("ce", region_name="us-east-1").get_cost_and_usage(
            TimePeriod={"Start": start.isoformat(), "End": end.isoformat()},
            Granularity="MONTHLY",
            Metrics=["UnblendedCost"],
            GroupBy=[{"Type": "DIMENSION", "Key": "REGION"}],
            Filter=NO_CREDITS,
        ),
        {"ResultsByTime": []},
    )
    pairs: list[tuple[str, float]] = []
    for tp in r.get("ResultsByTime", []):
        for g in tp.get("Groups", []):
            rg, cost = g["Keys"][0], money(g["Metrics"]["UnblendedCost"]["Amount"])
            if cost > 0 and rg and rg not in ("NoRegion", "global"):
                pairs.append((rg, cost))
    pairs.sort(key=lambda x: -x[1])
    out = [name for name, _ in pairs]
    if primary not in out:
        out.insert(0, primary)
    return out or [primary]
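Stripped of the API call, region detection is a small ranking rule: drop zero-spend and pseudo-regions, sort by spend, make sure the primary region is present, then cap the list. The sketch below shows that rule on a made-up spend table; rank_regions is a hypothetical name, and the cap mirrors the max_regions slice in build_session.

```python
def rank_regions(spend: dict[str, float], primary: str, max_regions: int) -> list[str]:
    """Order regions by spend, keep the primary region, cap the list length."""
    active = [(region, cost) for region, cost in spend.items()
              if cost > 0 and region not in ("NoRegion", "global")]
    active.sort(key=lambda pair: -pair[1])
    ordered = [region for region, _ in active]
    if primary not in ordered:
        # The primary region is scanned even when it shows no spend.
        ordered.insert(0, primary)
    return ordered[:max_regions]


spend = {"eu-central-1": 120.0, "us-east-1": 4.2, "global": 9.9, "ap-south-1": 0.0}
regions = rank_regions(spend, primary="us-east-1", max_regions=8)
no_spend_in_primary = rank_regions({"eu-central-1": 120.0}, primary="us-west-2", max_regions=8)
```

Note that the cap is applied after the primary region is inserted, so with a very small max_regions the lowest-spend regions are the ones that fall off.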
The cost layer: trends, anomalies, commitments
These are the first four tools the agent will call. discover_account always runs first — it anchors the account identity in the conversation and tells the agent "here is what you are looking at." Then analyze_cost_trends builds the spend profile and the 7-day-vs-prior-7-day deltas, which is where most "why is the bill suddenly higher?" questions are answered. analyze_anomalies_and_budgets checks whether the account even has Cost Anomaly Detection or Budgets configured — most accounts I have looked at do not, and that itself is a finding. analyze_commitments looks at Reserved Instance and Savings Plans coverage, but it includes a guardrail recommendation: skip RI/SP if compute spend is below $50/month. Telling a hobby account to buy a 1-year commitment is the kind of advice that makes people stop trusting the agent.
# TOOLS
@tool
def discover_account() -> dict:
    """First tool to call. Returns account identity, alias, organization status,
    and the regions that have non-zero spend. No arguments."""
    sess = s()
    org = try_aws(
        lambda: sess.client("organizations").describe_organization().get("Organization", {}),
        {},
    )
    return record("account", {
        "account_id": sess.account_id,
        "account_alias": sess.alias,
        "primary_region": sess.primary_region,
        "active_regions": sess.regions,
        "is_payer_or_member": bool(org.get("Id")),
        "master_account_id": org.get("MasterAccountId"),
    })


@tool
def analyze_cost_trends() -> dict:
    """Last N months by service + 7d-vs-prior-7d service deltas + current-month forecast."""
    months = _ce_monthly_with_top(CONFIG["lookback_months"])
    end = today() + timedelta(days=1)
    cur = _ce_groups(today() - timedelta(days=7), end, "SERVICE")
    prior = _ce_groups(today() - timedelta(days=14), today() - timedelta(days=7), "SERVICE")
    deltas = []
    for svc in set(cur) | set(prior):
        c, p = cur.get(svc, 0.0), prior.get(svc, 0.0)
        delta = c - p
        # A service that appears from nothing is reported as +100%.
        pct = (delta / p * 100) if p > 0 else (100.0 if c > 0 else 0.0)
        flagged = abs(delta) > CONFIG["delta_usd"] or abs(pct) > CONFIG["delta_pct"]
        if flagged or abs(delta) >= 0.5:
            deltas.append({
                "service": svc, "current_7d": round(c, 2), "prior_7d": round(p, 2),
                "delta_usd": round(delta, 2), "delta_pct": round(pct, 1), "flagged": flagged,
            })
    deltas.sort(key=lambda x: -abs(x["delta_usd"]))
    forecast_resp = try_aws(
        lambda: s().client("ce").get_cost_forecast(
            TimePeriod={
                "Start": today().isoformat(),
                # First day of next month.
                "End": (today().replace(day=1) + timedelta(days=32)).replace(day=1).isoformat(),
            },
            Granularity="MONTHLY", Metric="UNBLENDED_COST",
        ),
        None,
    )
    forecast = (
        round(money(forecast_resp["Total"]["Amount"]), 2)
        if forecast_resp and "Total" in forecast_resp else None
    )
    return record("costs", {
        "monthly": months,
        "deltas_7d": deltas[:15],
        "forecast_current_month_usd": forecast,
        # Note: the CE window ends tomorrow, so months[-1] is the current,
        # still partial month rather than the last complete one.
        "last_month_total_usd": months[-1]["total_usd"] if months else 0,
        "trailing_months": CONFIG["lookback_months"],
    })


@tool
def analyze_anomalies_and_budgets() -> dict:
    """Cost Anomaly Detection (last 90d) + all AWS Budgets state."""
    sess = s()
    anom_resp = try_aws(
        lambda: sess.client("ce").get_anomalies(
            DateInterval={
                "StartDate": (today() - timedelta(days=90)).isoformat(),
                "EndDate": today().isoformat(),
            },
        ),
        {"Anomalies": []},
    )
    anomalies = [
        {
            "id": a.get("AnomalyId"),
            "score": a.get("AnomalyScore", {}).get("CurrentScore"),
            "impact_usd": round(money(a.get("Impact", {}).get("TotalImpact", 0)), 2),
            "service": (a.get("RootCauses") or [{}])[0].get("Service"),
            "start": a.get("AnomalyStartDate"),
            "end": a.get("AnomalyEndDate"),
        }
        for a in anom_resp.get("Anomalies", [])
    ]
    monitor_count = len(try_aws(lambda: sess.client("ce").get_anomaly_monitors().get("AnomalyMonitors", []), []))
    raw_budgets = try_aws(
        lambda: sess.client("budgets").describe_budgets(AccountId=sess.account_id).get("Budgets", []),
        [],
    )
    budgets = []
    for b in raw_budgets:
        actual = money(b.get("CalculatedSpend", {}).get("ActualSpend", {}).get("Amount", 0))
        forecast = money(b.get("CalculatedSpend", {}).get("ForecastedSpend", {}).get("Amount", 0))
        limit = money(b["BudgetLimit"]["Amount"])
        budgets.append({
            "name": b["BudgetName"],
            "limit_usd": limit,
            "actual_usd": round(actual, 2),
            "forecast_usd": round(forecast, 2),
            "exceeded_actual": bool(limit) and actual > limit,
            "exceeded_forecast": bool(limit) and forecast > limit,
            "time_unit": b.get("TimeUnit"),
        })
    return record("anomalies_budgets", {
        "anomalies": anomalies,
        "anomaly_monitor_count": monitor_count,
        "anomaly_detection_configured": monitor_count > 0,
        "budgets": budgets,
        "budget_count": len(budgets),
    })


@tool
def analyze_commitments() -> dict:
    """RI + Savings Plans coverage and OnDemand exposure for the last 30 days."""
    sess = s()
    start, end = today() - timedelta(days=30), today() + timedelta(days=1)
    sp_cov = try_aws(
        lambda: sess.client("ce").get_savings_plans_coverage(
            TimePeriod={"Start": start.isoformat(), "End": end.isoformat()},
            Granularity="MONTHLY",
        ),
        {"SavingsPlansCoverages": []},
    )
    sp_summary = [
        {
            "covered_usd": money(c.get("Coverage", {}).get("SpendCoveredBySavingsPlans", 0)),
            "ondemand_usd": money(c.get("Coverage", {}).get("OnDemandCost", 0)),
            "coverage_pct": money(c.get("Coverage", {}).get("CoveragePercentage", 0)),
        }
        for c in sp_cov.get("SavingsPlansCoverages", [])
    ]
    ri_cov = try_aws(
        lambda: sess.client("ce").get_reservation_coverage(
            TimePeriod={"Start": start.isoformat(), "End": end.isoformat()},
            Granularity="MONTHLY",
        ),
        {"CoveragesByTime": []},
    )
    ri_summary = [
        {"metric": k, "value": v}
        for tp in ri_cov.get("CoveragesByTime", [])
        for k, v in tp.get("Total", {}).get("CoverageHours", {}).items()
    ]
    ondemand = sum(x["ondemand_usd"] for x in sp_summary)
    rec = (
        "skip RI/SP - compute spend too low to justify a commitment"
        if ondemand < 50 else
        "consider Compute Savings Plan - OnDemand compute > $50/mo"
    )
    return record("commitments", {
        "savings_plans_coverage": sp_summary,
        "reservation_coverage": ri_summary,
        "ondemand_compute_30d_usd": round(ondemand, 2),
        "recommendation": rec,
    })
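The 7-day delta rule inside analyze_cost_trends flags a service when either the absolute change or the percentage change crosses its threshold, which behaves differently for large and small line items. A minimal sketch of that rule in isolation, using the default thresholds from CONFIG ($5 and 25%); classify_delta is a hypothetical name and the amounts are invented.

```python
def classify_delta(current: float, prior: float,
                   delta_usd: float = 5.0, delta_pct: float = 25.0) -> dict:
    """Apply the either-threshold rule to one service's 7-day spend."""
    delta = current - prior
    if prior > 0:
        pct = delta / prior * 100
    else:
        # No prior spend: report +100% if the service is new, else 0%.
        pct = 100.0 if current > 0 else 0.0
    flagged = abs(delta) > delta_usd or abs(pct) > delta_pct
    return {"delta_usd": round(delta, 2), "delta_pct": round(pct, 1), "flagged": flagged}


big_service = classify_delta(current=110.0, prior=100.0)  # +$10 but only +10%
small_service = classify_delta(current=3.0, prior=2.0)    # only +$1 but +50%
quiet_service = classify_delta(current=20.0, prior=19.0)  # +$1 and about +5%
```

The first two are flagged for different reasons and the third is not flagged at all. The percentage arm is what makes tiny services noisy, which is presumably why the tool sorts by absolute dollars before truncating to the top 15.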
Compute: where Strands' model-driven approach earns its keep
I could have written one giant analyze_compute tool that returned a 50-key dictionary, and the model would have done its best with it, but the model reasons better when each tool has a small, clear scope. So analyze_compute_optimizer is a thin wrapper over the AWS Compute Optimizer API across six resource types, and analyze_lambda does what Compute Optimizer cannot: it counts the x86_64-vs-arm64 split (the Graviton migration opportunity; arm64 duration is billed at roughly 20% less than x86_64), lists published versions that still have SnapStart "On" (a perpetual cache cost when nobody invokes them and the team has forgotten about them), and lists the EOL runtimes that AWS will eventually force off the platform anyway. Note that the tool does not check invocation counts itself, so the SnapStart list is a set of candidates to verify. This is the "right tools for a single agent" idea in practice: surgical scopes, not a swarm.
@tool
def analyze_compute_optimizer() -> dict:
    """AWS Compute Optimizer recommendations across EC2, ASG, EBS, Lambda, ECS, RDS."""
    co = s().client("compute-optimizer", region=s().primary_region)
    enrollment = try_aws(lambda: co.get_enrollment_status(), {})
    if enrollment.get("status") not in ("Active", "Pending"):
        return record("compute_optimizer", {"enrolled": False, "note": "Compute Optimizer not enabled"})
    operations = [
        ("ec2", "get_ec2_instance_recommendations", "instanceRecommendations"),
        ("asg", "get_auto_scaling_group_recommendations", "autoScalingGroupRecommendations"),
        ("ebs", "get_ebs_volume_recommendations", "volumeRecommendations"),
        ("lambda", "get_lambda_function_recommendations", "lambdaFunctionRecommendations"),
        ("ecs", "get_ecs_service_recommendations", "ecsServiceRecommendations"),
        ("rds", "get_rds_database_recommendations", "rdsDBRecommendations"),
    ]
    summary: dict = {}
    for label, op, key in operations:
        recs = try_aws(lambda op=op, key=key: getattr(co, op)(maxResults=50).get(key, []), [])
        summary[label] = {
            "count": len(recs),
            "samples": [
                {
                    "resource": (
                        r.get("instanceArn") or r.get("functionArn") or r.get("volumeArn")
                        or r.get("autoScalingGroupArn") or r.get("serviceArn") or r.get("resourceArn")
                    ),
                    "finding": r.get("finding") or r.get("findingClassification") or r.get("findingReasonCodes"),
                    "current": r.get("currentInstanceType") or r.get("currentConfiguration"),
                }
                for r in recs[:5]
            ],
        }
    return record("compute_optimizer", {
        "enrolled": True,
        "total_recommendations": sum(v["count"] for v in summary.values()),
        "by_resource_type": summary,
    })


@tool
def analyze_lambda() -> dict:
    """Lambda inventory: x86_64 vs arm64 split, SnapStart-On orphan published versions
    (perpetual cache cost), EOL runtimes."""
    snapstart_drift: list[dict] = []
    runtimes_at_eol: list[dict] = []

    def scan(region: str, lam) -> dict:
        fns = paginate(lam, "list_functions", "Functions")
        rt = {"functions": len(fns), "x86_64": 0, "arm64": 0, "snapstart_orphans": 0}
        for fn in fns:
            # Functions created before arm64 support may omit Architectures.
            arch = (fn.get("Architectures") or ["x86_64"])[0]
            rt["x86_64" if arch == "x86_64" else "arm64"] += 1
            if fn.get("Runtime") in EOL_LAMBDA_RUNTIMES:
                runtimes_at_eol.append({
                    "region": region, "function": fn["FunctionName"], "runtime": fn["Runtime"],
                })
            # Paginate: a single list_versions_by_function call returns at most
            # one page, which would undercount functions with many versions.
            versions = try_aws(
                lambda fn=fn: paginate(lam, "list_versions_by_function", "Versions",
                                       FunctionName=fn["FunctionName"]),
                [],
            )
            snap = [
                v["Version"] for v in versions
                if v["Version"] != "$LATEST"
                and v.get("SnapStart", {}).get("OptimizationStatus") == "On"
            ]
            if snap:
                rt["snapstart_orphans"] += len(snap)
                snapstart_drift.append({
                    "region": region, "function": fn["FunctionName"], "snapstart_versions": snap,
                })
        return rt

    by_region = per_region("lambda", scan)
    totals = {k: sum(r.get(k, 0) for r in by_region.values())
              for k in ("functions", "snapstart_orphans", "x86_64", "arm64")}
    return record("lambda", {
        "totals": totals,
        "by_region": by_region,
        "snapstart_drift": snapstart_drift[:20],
        "runtimes_at_eol": runtimes_at_eol[:20],
        "graviton_opportunity_count": totals["x86_64"],
    })
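The classification inside analyze_lambda can be tried on hand-written records before pointing it at a real account. The sketch below uses the same two fields the tool reads from list_functions (Architectures and Runtime); the function names are invented, EOL is a two-entry subset of EOL_LAMBDA_RUNTIMES, and summarize is a hypothetical helper.

```python
from collections import Counter

EOL = {"python3.8", "nodejs16.x"}  # tiny illustrative subset of the real set

functions = [
    {"FunctionName": "api", "Runtime": "python3.12", "Architectures": ["arm64"]},
    {"FunctionName": "etl", "Runtime": "python3.8", "Architectures": ["x86_64"]},
    {"FunctionName": "legacy", "Runtime": "nodejs16.x"},  # no Architectures key
]


def summarize(fns: list[dict]) -> dict:
    """Count architectures (defaulting to x86_64) and list EOL-runtime functions."""
    arch = Counter((f.get("Architectures") or ["x86_64"])[0] for f in fns)
    eol = [f["FunctionName"] for f in fns if f.get("Runtime") in EOL]
    return {"x86_64": arch["x86_64"], "arm64": arch["arm64"], "eol": eol}


summary = summarize(functions)
```

The record without an Architectures key is counted as x86_64, matching the default the tool applies, so it shows up in the Graviton opportunity count as well as in the EOL list.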
Storage: the single biggest hidden cost
In my experience S3 is the single biggest hidden cost in most AWS accounts and the easiest to fix: one missing lifecycle policy on a 200 GB log bucket can be more than half of an account's monthly waste. The S3 analyzer walks every bucket, gets its size from CloudWatch (the cheapest way to get a bucket's size without listing every object; note that the helper only reads the StandardStorage metric, so data in other storage classes is not counted), checks lifecycle rules, versioning, Intelligent-Tiering, and the public-access-block, and explicitly flags any bucket over 1 GB without any lifecycle rule. EBS gets the same treatment but with three flag categories: unattached volumes (paying for nothing), gp2 volumes (almost always cheaper as gp3, and the migration is a single ModifyVolume call), and snapshots older than 90 days that nobody remembers creating.
def _bucket_size_bytes(cw, bucket: str) -> int:
    pts = cw_stats(
        cw, "AWS/S3", "BucketSizeBytes",
        [{"Name": "BucketName", "Value": bucket}, {"Name": "StorageType", "Value": "StandardStorage"}],
        days=2, stat="Maximum",
    )
    return int(max(pts)) if pts else 0


def _bucket_lifecycle_actions(rules: list[dict]) -> list[str]:
    keys = ("Expiration", "Transitions", "AbortIncompleteMultipartUpload", "NoncurrentVersionExpiration")
    return sorted({k for r in rules for k in keys if r.get(k)})


@tool
def analyze_storage_s3() -> dict:
    """S3 bucket inventory: size (CloudWatch), lifecycle rules + actions, versioning,
    Intelligent-Tiering, public-access-block. Flags any bucket >1 GB without any
    lifecycle rule (usually the biggest single S3 win)."""
    sess = s()
    s3 = sess.client("s3", region=sess.primary_region)
    raw_buckets = try_aws(lambda: s3.list_buckets().get("Buckets", []), [])
    findings: list[dict] = []
    no_lc_over_1gb: list[dict] = []
    total_bytes = 0
    for b in raw_buckets:
        name = b["Name"]
        # A null LocationConstraint means us-east-1; "EU" is the legacy name for eu-west-1.
        loc = try_aws(lambda: s3.get_bucket_location(Bucket=name).get("LocationConstraint"), None) or "us-east-1"
        if loc == "EU":
            loc = "eu-west-1"
        size = try_aws(lambda: _bucket_size_bytes(sess.client("cloudwatch", loc), name), 0)
        rules = try_aws(lambda: s3.get_bucket_lifecycle_configuration(Bucket=name).get("Rules", []), [])
        versioning = try_aws(lambda: s3.get_bucket_versioning(Bucket=name).get("Status"), None)
        pab = try_aws(
            lambda: s3.get_public_access_block(Bucket=name).get("PublicAccessBlockConfiguration", {}),
            {},
        )
        it = try_aws(
            lambda: s3.list_bucket_intelligent_tiering_configurations(Bucket=name).get(
                "IntelligentTieringConfigurationList", []),
            [],
        )
        gb = size / 1_073_741_824
        total_bytes += size
        finding = {
            "name": name,
            "region": loc,
            "size_gb": round(gb, 3),
            "lifecycle_rules": len(rules),
            "lifecycle_actions": _bucket_lifecycle_actions(rules),
            "versioning": versioning,
            "intelligent_tiering_configs": len(it),
            "public_access_block_full": bool(pab) and all(pab.values()),
        }
        findings.append(finding)
        if gb > 1.0 and not rules:
            no_lc_over_1gb.append({"bucket": name, "size_gb": round(gb, 3), "region": loc})
    findings.sort(key=lambda x: -x["size_gb"])
    return record("s3", {
        "bucket_count": len(raw_buckets),
        "total_size_gb": round(total_bytes / 1_073_741_824, 2),
        "buckets": findings[:30],
        "no_lifecycle_over_1gb": no_lc_over_1gb,
        "no_lifecycle_count": len(no_lc_over_1gb),
    })


@tool
def analyze_storage_ebs() -> dict:
    """EBS volumes (status, type, age) + snapshots (age) per region. Flags
    unattached volumes, gp2 (gp3 migration), snapshots > 90 days."""
    snap_cutoff = datetime.now(timezone.utc) - timedelta(days=90)
    available_volumes: list[dict] = []
    gp2_volumes: list[dict] = []
    old_snapshots: list[dict] = []

    def scan(region: str, ec2) -> dict:
        vols = paginate(ec2, "describe_volumes", "Volumes")
        snaps = paginate(ec2, "describe_snapshots", "Snapshots", OwnerIds=[s().account_id])
        rt = {"volumes": len(vols), "available": 0, "gp2": 0,
              "snapshots": len(snaps), "old_snapshots": 0}
        for v in vols:
            if v["State"] == "available":
                rt["available"] += 1
                available_volumes.append({"region": region, "id": v["VolumeId"],
                                          "size_gb": v["Size"], "type": v["VolumeType"]})
            if v["VolumeType"] == "gp2":
                rt["gp2"] += 1
                gp2_volumes.append({"region": region, "id": v["VolumeId"], "size_gb": v["Size"]})
        for sn in snaps:
            if sn["StartTime"] < snap_cutoff:
                rt["old_snapshots"] += 1
                old_snapshots.append({
                    "region": region, "id": sn["SnapshotId"],
                    "age_days": (datetime.now(timezone.utc) - sn["StartTime"]).days,
                    "size_gb": sn["VolumeSize"],
                })
        return rt

    by_region = per_region("ec2", scan)
    totals = {
        "volumes": sum(r.get("volumes", 0) for r in by_region.values()),
        "available_volumes": sum(r.get("available", 0) for r in by_region.values()),
        "gp2_volumes": sum(r.get("gp2", 0) for r in by_region.values()),
        "old_snapshots": sum(r.get("old_snapshots", 0) for r in by_region.values()),
    }
    return record("ebs", {
        "by_region": by_region,
        "totals": totals,
        "available_volumes": available_volumes[:20],
        "gp2_volumes": gp2_volumes[:20],
        "old_snapshots": old_snapshots[:20],
    })
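To put a number on the gp2 finding, here is a rough back-of-the-envelope sketch. The per-GB prices are an assumption (us-east-1 list prices of $0.10 for gp2 and $0.08 for gp3 per GB-month; check the pricing page for your region), the volume IDs are invented, and gp3_monthly_saving is a hypothetical helper. It only covers storage and ignores any IOPS or throughput provisioned above the gp3 baseline, which is billed separately.

```python
GP2_USD_PER_GB_MONTH = 0.10  # assumption: us-east-1 list price
GP3_USD_PER_GB_MONTH = 0.08  # assumption: us-east-1 list price, baseline performance


def gp3_monthly_saving(volumes: list[dict]) -> float:
    """Estimated monthly storage saving from moving the given gp2 volumes to gp3."""
    total_gb = sum(v["size_gb"] for v in volumes)
    return round(total_gb * (GP2_USD_PER_GB_MONTH - GP3_USD_PER_GB_MONTH), 2)


# Same shape as the gp2_volumes entries the tool records.
gp2_volumes = [
    {"region": "us-east-1", "id": "vol-aaa", "size_gb": 500},
    {"region": "us-east-1", "id": "vol-bbb", "size_gb": 100},
]
saving = gp3_monthly_saving(gp2_volumes)
```

Under those assumed prices, 600 GB of gp2 works out to about $12 a month, which is small per volume but scales linearly with the fleet.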
Databases and network: same shape, different problem
Databases and network are different problems but they share the same shape — find the resource, get a CloudWatch metric, decide whether the resource is doing work. For RDS, "doing work" means more than one average database connection over 14 days; below that threshold, the instance is a candidate to stop or downsize. For NAT gateways, it means more than 1 GB of outbound traffic over the same window; an idle NAT gateway is around $32/month for nothing. The network tool also catches unattached Elastic IPs and load balancers with zero registered targets, which are the kind of resources you forgot you provisioned during a project that got cancelled six months ago.
@tool
def analyze_databases() -> dict:
    """RDS instances + DynamoDB tables. RDS: idle detection (avg connections < 1
    over 14 days). DynamoDB: billing mode, size, item count."""
    sess = s()
    idle_rds: list[dict] = []
    rds_by_region: dict[str, dict] = {}
    ddb_tables: list[dict] = []
    for region in sess.regions:
        rds = sess.client("rds", region)
        cw = sess.client("cloudwatch", region)
        instances = try_aws(lambda: paginate(rds, "describe_db_instances", "DBInstances"), [])
        engines: dict[str, int] = {}
        for inst in instances:
            engines[inst["Engine"]] = engines.get(inst["Engine"], 0) + 1
            if inst.get("DBInstanceStatus") != "available":
                continue
            pts = cw_stats(
                cw, "AWS/RDS", "DatabaseConnections",
                [{"Name": "DBInstanceIdentifier", "Value": inst["DBInstanceIdentifier"]}],
                days=CONFIG["idle_days"],
            )
            avg = sum(pts) / len(pts) if pts else 0
            if avg < 1:
                idle_rds.append({
                    "region": region, "id": inst["DBInstanceIdentifier"],
                    "class": inst["DBInstanceClass"], "engine": inst["Engine"],
                    "avg_connections_14d": round(avg, 2), "storage_gb": inst.get("AllocatedStorage"),
                })
        rds_by_region[region] = {"instance_count": len(instances), "engines": engines}
        ddb = sess.client("dynamodb", region)
        for name in try_aws(lambda: paginate(ddb, "list_tables", "TableNames"), [])[:50]:
            desc = try_aws(lambda n=name: ddb.describe_table(TableName=n)["Table"], None)
            if not desc:
                continue
            ddb_tables.append({
                "region": region, "name": name,
                "billing_mode": desc.get("BillingModeSummary", {}).get("BillingMode") or "PROVISIONED",
                "size_bytes": desc.get("TableSizeBytes", 0),
                "item_count": desc.get("ItemCount", 0),
            })
    return record("databases", {
        "rds": {"by_region": rds_by_region, "idle_candidates": idle_rds[:20]},
        "dynamodb": {"tables": ddb_tables[:30]},
    })


@tool
def analyze_network() -> dict:
    """NAT gateways with low traffic, unattached EIPs, ALB/NLB with no targets."""
    sess = s()
    unattached_eips: list[dict] = []
    empty_lbs: list[dict] = []
    low_traffic_nat: list[dict] = []
    totals = {"nat_gateways": 0, "elbs": 0, "eips": 0}
    for region in sess.regions:
        ec2 = sess.client("ec2", region)
        elbv2 = sess.client("elbv2", region)
        cw = sess.client("cloudwatch", region)
        eips = try_aws(lambda: ec2.describe_addresses().get("Addresses", []), [])
        # Both calls are paginated APIs; a single call would only see the first page.
        nats = try_aws(lambda: paginate(ec2, "describe_nat_gateways", "NatGateways"), [])
        lbs = try_aws(lambda: paginate(elbv2, "describe_load_balancers", "LoadBalancers"), [])
        totals["eips"] += len(eips)
        totals["nat_gateways"] += sum(1 for n in nats if n["State"] == "available")
        totals["elbs"] += len(lbs)
        for eip in eips:
            if not eip.get("AssociationId"):
                unattached_eips.append({
                    "region": region, "ip": eip.get("PublicIp"),
                    "allocation_id": eip.get("AllocationId"),
                })
        for lb in lbs:
            tgs = try_aws(
                lambda lb=lb: elbv2.describe_target_groups(LoadBalancerArn=lb["LoadBalancerArn"])
                .get("TargetGroups", []),
                [],
            )
            target_count = sum(
                len(try_aws(lambda tg=tg: elbv2.describe_target_health(TargetGroupArn=tg["TargetGroupArn"])
                            .get("TargetHealthDescriptions", []),
                            []))
                for tg in tgs
            )
            if target_count == 0:
                empty_lbs.append({"region": region, "name": lb["LoadBalancerName"], "type": lb["Type"]})
        for n in nats:
            if n["State"] != "available":
                continue
            pts = cw_stats(
                cw, "AWS/NATGateway", "BytesOutToDestination",
                [{"Name": "NatGatewayId", "Value": n["NatGatewayId"]}],
                days=CONFIG["idle_days"], stat="Sum",
            )
            outbound_gb = sum(pts) / 1_073_741_824
            if outbound_gb < 1:
                low_traffic_nat.append({
                    "region": region, "id": n["NatGatewayId"],
                    "outbound_gb_14d": round(outbound_gb, 3),
                })
    return record("network", {
        "totals": totals,
        "unattached_eips": unattached_eips[:15],
        "empty_load_balancers": empty_lbs[:15],
        "low_traffic_nat_gateways": low_traffic_nat[:15],
    })
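Both tools reduce to the same decision: take the daily CloudWatch datapoints and compare one number against a threshold. The sketch below isolates that decision with invented datapoints; rds_is_idle and nat_is_low_traffic are hypothetical names, and the thresholds (an average below 1 connection, less than 1 GiB outbound) are the ones used in the tools above.

```python
GIB = 1_073_741_824


def rds_is_idle(daily_avg_connections: list[float]) -> bool:
    """Idle when the mean of the daily averages is below one connection."""
    if not daily_avg_connections:
        # Mirrors the tool: no datapoints is treated as an average of 0.
        return True
    return sum(daily_avg_connections) / len(daily_avg_connections) < 1


def nat_is_low_traffic(daily_bytes_out: list[float]) -> bool:
    """Low traffic when total outbound bytes over the window are under 1 GiB."""
    return sum(daily_bytes_out) / GIB < 1


dev_db = rds_is_idle([0.0, 0.5, 2.0])           # mean is about 0.83
prod_db = rds_is_idle([3.0, 4.0])               # mean is 3.5
no_metrics_db = rds_is_idle([])                 # nothing came back from CloudWatch
busy_nat = nat_is_low_traffic([200e6] * 14)     # 2.8 GB over 14 days
quiet_nat = nat_is_low_traffic([10e6] * 14)     # 0.14 GB over 14 days
```

The empty-list case is worth keeping in mind when reading the report: a database whose metrics could not be read (for example because of a missing CloudWatch permission) is indistinguishable from a truly idle one, so idle candidates should be verified before anything is stopped.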
Logs, KMS, secrets: small line items that compound
Logs, KMS, and secrets are individually small but they compound across regions and accounts. A log group without retention grows forever ($0.03/GB/month — small until it isn't). An orphan /aws/lambda/* log group from a function that was deleted three years ago is pure waste. A customer-managed KMS key with no grants and no aliases is $1/month for nothing — but I am careful to flag the verification step before deletion in the tool's own docstring, because a key with no grants may still be referenced by encrypted objects somewhere, and deleting it is irreversible. A secret in Secrets Manager that has not been read in 90 days is $0.40/month per stale secret. None of these alone is interesting. Together, in an account with hundreds of resources, they often pay for a small EC2 instance.
@tool
def analyze_logs_and_metrics() -> dict:
"""CloudWatch Logs hygiene + custom metrics + dashboards.
- log groups missing retention (= infinite growth)
- /aws/lambda/* log groups whose function no longer exists
- top 20 log groups by stored bytes
- custom metric namespaces (cost = $0.30/metric/mo above 10 free)
- dashboards count ($3 each above 3 free)"""
sess = s()
missing_retention: list[dict] = []
orphan_lambda_lg: list[dict] = []
largest: list[dict] = []
custom_namespaces: dict[str, int] = {}
dashboards: list[dict] = []
by_region: dict[str, dict] = {}
totals = {"log_groups": 0, "missing_retention_count": 0,
"orphan_lambda_count": 0, "stored_gb": 0.0, "dashboards": 0}
for region in sess.regions:
logs = sess.client("logs", region)
cw = sess.client("cloudwatch", region)
lam = sess.client("lambda", region)
lambda_names = {f["FunctionName"]
for f in try_aws(lambda: paginate(lam, "list_functions", "Functions"), [])}
log_groups = try_aws(lambda: paginate(logs, "describe_log_groups", "logGroups"), [])
rt = {"log_groups": len(log_groups), "missing_retention": 0, "orphan_lambda": 0, "stored_gb": 0.0}
for lg in log_groups:
name = lg["logGroupName"]
stored = lg.get("storedBytes", 0)
gb = round(stored / 1_073_741_824, 4)
rt["stored_gb"] += gb
entry = {"region": region, "name": name, "stored_gb": gb}
largest.append(entry)
if lg.get("retentionInDays") is None:
rt["missing_retention"] += 1
missing_retention.append(entry)
if name.startswith("/aws/lambda/"):
fn_name = name[len("/aws/lambda/"):]
if fn_name not in lambda_names:
rt["orphan_lambda"] += 1
orphan_lambda_lg.append(entry)
by_region[region] = rt
totals["log_groups"] += rt["log_groups"]
totals["missing_retention_count"] += rt["missing_retention"]
totals["orphan_lambda_count"] += rt["orphan_lambda"]
totals["stored_gb"] = round(totals["stored_gb"] + rt["stored_gb"], 2)
for m in try_aws(lambda: paginate(cw, "list_metrics", "Metrics"), []):  # paginate, or only the first page is counted
ns = m.get("Namespace", "")
if not ns.startswith("AWS/"):
custom_namespaces[ns] = custom_namespaces.get(ns, 0) + 1
region_dashboards = try_aws(lambda: cw.list_dashboards().get("DashboardEntries", []), [])
dashboards.extend({"region": region, "name": d["DashboardName"]} for d in region_dashboards)
totals["dashboards"] += len(region_dashboards)
largest.sort(key=lambda x: -x["stored_gb"])
return record("logs", {
"by_region": by_region,
"totals": totals,
"missing_retention": missing_retention[:20],
"orphan_lambda_log_groups": orphan_lambda_lg[:20],
"largest_log_groups": largest[:20],
"custom_metric_namespaces": custom_namespaces,
"dashboards": dashboards,
})
@tool
def analyze_kms() -> dict:
"""Customer-managed KMS keys with 0 grants AND 0 aliases (deletion candidates).
Operator must verify against CloudTrail Decrypt events before scheduling deletion —
a key with no grants/aliases may still be referenced by encrypted data."""
candidates: list[dict] = []
def scan(region: str, kms) -> dict:
keys = paginate(kms, "list_keys", "Keys")
aliases = paginate(kms, "list_aliases", "Aliases")
alias_by_target: dict[str, list[str]] = {}
for a in aliases:
tk = a.get("TargetKeyId")
if tk:
alias_by_target.setdefault(tk, []).append(a["AliasName"])
rt = {"customer_managed": 0, "candidates": 0}
for k in keys:
kid = k["KeyId"]
meta = try_aws(lambda kid=kid: kms.describe_key(KeyId=kid)["KeyMetadata"], None)
if not meta or meta.get("KeyManager") != "CUSTOMER" or meta.get("KeyState") != "Enabled":
continue
rt["customer_managed"] += 1
grants = try_aws(lambda kid=kid: kms.list_grants(KeyId=kid).get("Grants", []), [])
if not grants and not alias_by_target.get(kid):
rt["candidates"] += 1
candidates.append({
"region": region, "key_id": kid,
"created": meta["CreationDate"].isoformat(),
"description": meta.get("Description") or "(none)",
})
return rt
by_region = per_region("kms", scan)
totals = {
"customer_managed": sum(r.get("customer_managed", 0) for r in by_region.values()),
"candidates": sum(r.get("candidates", 0) for r in by_region.values()),
}
return record("kms", {
"by_region": by_region,
"totals": totals,
"candidates_for_deletion": candidates[:30],
})
@tool
def analyze_secrets() -> dict:
"""Secrets Manager: secrets not accessed in N days (default 90).
Saves $0.40/secret/mo per stale secret deleted."""
cutoff = datetime.now(timezone.utc) - timedelta(days=CONFIG["stale_days"])
stale: list[dict] = []
def scan(region: str, sm) -> dict:
secrets = paginate(sm, "list_secrets", "SecretList")
rt = {"secrets": len(secrets), "stale": 0}
for sec in secrets:
last = sec.get("LastAccessedDate")
if last and last < cutoff:
rt["stale"] += 1
stale.append({
"region": region, "name": sec["Name"],
"last_accessed": last.isoformat(),
"days_since_access": (datetime.now(timezone.utc) - last).days,
})
return rt
by_region = per_region("secretsmanager", scan)
totals = {
"secrets": sum(r.get("secrets", 0) for r in by_region.values()),
"stale": sum(r.get("stale", 0) for r in by_region.values()),
}
return record("secrets", {"by_region": by_region, "totals": totals, "stale": stale[:30]})
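The orphan check in analyze_logs_and_metrics compares each /aws/lambda/* log group against the functions in the same region. One case it can misreport is Lambda@Edge, where replicas write to log groups named /aws/lambda/us-east-1.<function-name> in whichever region served the request. A minimal sketch of a more forgiving match, assuming `function_names` also includes the functions from us-east-1; the helper name is hypothetical.

```python
LAMBDA_PREFIX = "/aws/lambda/"


def orphan_lambda_log_groups(log_group_names: list[str],
                             function_names: set[str]) -> list[str]:
    """Return /aws/lambda/* log groups with no matching function name.

    Lambda@Edge replicas log to /aws/lambda/<region>.<function-name>, so the
    part after the first dot is also checked before a group is flagged.
    """
    orphans = []
    for name in log_group_names:
        if not name.startswith(LAMBDA_PREFIX):
            continue  # not a Lambda log group, out of scope here
        fn = name[len(LAMBDA_PREFIX):]
        edge_fn = fn.split(".", 1)[1] if "." in fn else None
        if fn in function_names or edge_fn in function_names:
            continue
        orphans.append(name)
    return orphans
```

Because Lambda function names cannot contain dots, splitting on the first dot is enough to separate the region prefix from the function name.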
Security tooling and governance: paying for what you don't have
The next two tools step away from direct cost and into governance. I check security tooling because many accounts have GuardDuty, Security Hub, Inspector, or Macie enabled with features that scan resources that do not exist: S3 protection enabled in an account with no buckets, EBS scanning where there are no EC2 instances, full-fat Macie sessions on accounts that store nothing sensitive. The governance tool looks at cost-allocation tags, because an account with zero active tags is an account where you cannot answer "what does this team's spend look like?", and that is a problem long before it is a cost problem.
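In the agent, the comparison between enabled features and actual workload is left to the model. For readers who prefer it explicit, the idea can be sketched as a pure function. The GuardDuty feature names are real, but the mapping to inventory counts and the inventory keys are assumptions made for this example.

```python
# Hypothetical mapping: GuardDuty feature name -> inventory count that
# justifies keeping the feature enabled.
FEATURE_REQUIRES = {
    "S3_DATA_EVENTS": "s3_buckets",
    "EBS_MALWARE_PROTECTION": "ec2_instances",
    "RDS_LOGIN_EVENTS": "rds_instances",
    "LAMBDA_NETWORK_LOGS": "lambda_functions",
    "EKS_AUDIT_LOGS": "eks_clusters",
}


def features_without_workload(features_enabled: list[str],
                              inventory: dict[str, int]) -> list[str]:
    """Enabled features whose matching resource count is zero."""
    flagged = []
    for feature in features_enabled:
        needed = FEATURE_REQUIRES.get(feature)
        # Features with no mapping are never flagged.
        if needed is not None and inventory.get(needed, 0) == 0:
            flagged.append(feature)
    return flagged
```

The inventory counts would come from the other analyzers (bucket count, instance count, and so on), which is why this check only makes sense after the inventory tools have run.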
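def _strip_arn_segment(arn: str) -> str:
    # ".../<standard-name>/v/<version>" -> "<standard-name>".
    # Fall back to the raw ARN when it has fewer segments than expected.
    parts = arn.rsplit("/", 4)
    return parts[-3] if len(parts) >= 3 else arn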
@tool
def analyze_security_tooling() -> dict:
"""Security stack vs actual workload: Security Hub standards + AutoEnable,
GuardDuty enabled features, Inspector resource scope, Macie status. Surfaces
features enabled for infrastructure that doesn't exist (waste)."""
sess = s()
by_region: dict[str, dict] = {}
for region in sess.regions:
rt: dict = {}
sh = sess.client("securityhub", region)
hub = try_aws(lambda: sh.describe_hub(), None)
if hub:
stds = try_aws(lambda: sh.get_enabled_standards().get("StandardsSubscriptions", []), [])
rt["security_hub"] = {
"auto_enable_controls": hub.get("AutoEnableControls"),
"control_finding_generator": hub.get("ControlFindingGenerator"),
"standards": [_strip_arn_segment(std.get("StandardsArn", "")) for std in stds],  # `std`, so the s() session helper is not shadowed
}
else:
rt["security_hub"] = {"enabled": False}
gd = sess.client("guardduty", region)
ds = try_aws(lambda: gd.list_detectors().get("DetectorIds", []), [])
if ds:
d = try_aws(lambda: gd.get_detector(DetectorId=ds[0]), {})
rt["guardduty"] = {
"status": d.get("Status"),
"features_enabled": [f["Name"] for f in d.get("Features", []) if f.get("Status") == "ENABLED"],
"features_disabled": [f["Name"] for f in d.get("Features", []) if f.get("Status") == "DISABLED"],
}
else:
rt["guardduty"] = {"enabled": False}
insp = sess.client("inspector2", region)
accs = try_aws(
lambda: insp.batch_get_account_status(accountIds=[sess.account_id]).get("accounts", []),
[],
)
rt["inspector"] = (
{"status": accs[0]["state"]["status"],
"resources": {k: v["status"] for k, v in accs[0].get("resourceState", {}).items()}}
if accs else {"enabled": False}
)
macie_sess = try_aws(lambda: sess.client("macie2", region).get_macie_session(), None)
rt["macie"] = {"status": macie_sess.get("status")} if macie_sess else {"enabled": False}
by_region[region] = rt
return record("security_tooling", {"by_region": by_region})
@tool
def analyze_governance() -> dict:
"""Cost allocation tag activation + tag-coverage analysis. Reports % of last
30 days of spend covered by each tag key."""
sess = s()
ce = sess.client("ce")
end = today() + timedelta(days=1)
start = today() - timedelta(days=30)
active = try_aws(
lambda: ce.list_cost_allocation_tags(Status="Active").get("CostAllocationTags", []),
[],
)
all_tags = try_aws(lambda: ce.list_cost_allocation_tags().get("CostAllocationTags", []), [])
inactive_user = [t for t in all_tags
if t.get("Type") == "UserDefined" and t.get("Status") == "Inactive"]
coverage: dict = {}
for t in active[:5]:
r = try_aws(
lambda t=t: ce.get_cost_and_usage(
TimePeriod={"Start": start.isoformat(), "End": end.isoformat()},
Granularity="MONTHLY",
Metrics=["UnblendedCost"],
GroupBy=[{"Type": "TAG", "Key": t["TagKey"]}],
Filter=NO_CREDITS,
),
None,
)
if not r:
continue
tagged = untagged = 0.0
for tp in r.get("ResultsByTime", []):
for g in tp.get("Groups", []):
cost = money(g["Metrics"]["UnblendedCost"]["Amount"])
if g["Keys"][0].endswith("$"): # untagged comes back as "TagKey$"
untagged += cost
else:
tagged += cost
total = tagged + untagged
coverage[t["TagKey"]] = {
"tagged_usd": round(tagged, 2),
"untagged_usd": round(untagged, 2),
"tagged_pct": round(tagged / total * 100, 1) if total else 0,
}
cats = try_aws(
lambda: ce.list_cost_category_definitions().get("CostCategoryReferences", []),
[],
)
return record("governance", {
"active_cost_allocation_tag_count": len(active),
"active_cost_allocation_tags": [t["TagKey"] for t in active],
"inactive_user_defined_tag_count": len(inactive_user),
"inactive_user_defined_tags": [t["TagKey"] for t in inactive_user],
"tag_coverage_30d": coverage,
"cost_categories": [c["Name"] for c in cats],
})
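The tagged/untagged split inside analyze_governance is easy to get wrong, so here it is isolated as a pure function that can be tested against a hand-written response. This is a sketch of the same logic; the function name `tag_coverage` is hypothetical.

```python
def tag_coverage(groups: list[dict]) -> dict:
    """Split Cost Explorer TAG groups into tagged vs untagged spend.

    A group key looks like "Team$payments" when the tag has a value and
    "Team$" when the resource carries no value for that tag key.
    """
    tagged = untagged = 0.0
    for g in groups:
        cost = float(g["Metrics"]["UnblendedCost"]["Amount"])
        if g["Keys"][0].endswith("$"):
            untagged += cost
        else:
            tagged += cost
    total = tagged + untagged
    return {
        "tagged_usd": round(tagged, 2),
        "untagged_usd": round(untagged, 2),
        "tagged_pct": round(tagged / total * 100, 1) if total else 0.0,
    }
```

A tag value that itself ends in "$" would be counted as untagged by this rule, which is an accepted edge case in this sketch.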
The synthesis tool: one PDF, one call, at the end
This is the synthesis tool, and it is the only one that is not read-only. The agent calls render_pdf_report exactly once at the end, with the executive summary it has written and the prioritized recommendation list it has built across all the previous tools. The PDF gets saved locally and (optionally) a notification gets pushed to SNS, which is what makes this useful in a Lambda or scheduled run. I deliberately did not make the PDF generation part of any analysis tool — the model decides when it has enough information to render, and that decision is more accurate when it is a single explicit step instead of an implicit one.
@tool
def render_pdf_report(executive_summary: str, recommendations: list[dict]) -> dict:
    """Render the final PDF using ALL the findings collected so far + the agent's
    synthesis. Pass the executive summary text and a prioritized list of
    recommendations (each: {priority, title, savings_usd_per_month, rationale,
    action_steps}). Returns a dict with the saved PDF path, its size, and the
    SNS status.

    Args:
        executive_summary: 2-4 paragraph plain-text overview.
        recommendations: list of dicts with keys priority, title,
            savings_usd_per_month, rationale, action_steps.
    """
    sess = s()
    out_dir = Path(os.getenv("FINOPS_OUTPUT_DIR", str(CONFIG["output_dir"])))
    out_dir.mkdir(parents=True, exist_ok=True)
    label = sess.alias or sess.account_id
    # Keep the file name filesystem-safe: anything non-alphanumeric becomes "-".
    safe_label = "".join(c if c.isalnum() else "-" for c in label)[:40].strip("-")
    pdf_path = out_dir / f"finops-{safe_label}-{today().isoformat()}.pdf"
    pdf_renderer.render(
        output_path=pdf_path,
        account_id=sess.account_id,
        account_alias=sess.alias,
        regions=sess.regions,
        run_at=datetime.now(timezone.utc),
        executive_summary=executive_summary,
        recommendations=recommendations,
        findings=[{"section": k, "data": v} for k, v in FINDINGS.items()],
    )
    sns_status = "skipped"
    if CONFIG["sns_topic_arn"]:
        def _publish() -> str:
            # publish() returns a non-empty response dict, so the earlier
            # `publish(...) or "published"` would have returned that dict.
            sess.client("sns").publish(
                TopicArn=CONFIG["sns_topic_arn"],
                Subject=f"FinOps report: {label}"[:99],
                Message=(f"FinOps report for {sess.account_id} ({label}).\n"
                         f"PDF: {pdf_path}\n\nExecutive summary:\n{executive_summary[:1000]}"),
            )
            return "published"

        sns_status = try_aws(_publish, "publish-failed")
    return {
        "pdf_path": str(pdf_path),
        "size_bytes": pdf_path.stat().st_size,
        "page_count_estimate": max(1, len(FINDINGS) + 2),
        "sns": sns_status,
    }
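Because the model writes the `recommendations` argument, the renderer receives whatever shape the model produced. A defensive normalization step in front of the renderer is one option. The sketch below is not part of the agent: the function name and the fallback values are assumptions.

```python
PRIORITY_ORDER = {"high": 0, "medium": 1, "low": 2}


def normalize_recommendations(recs: list[dict]) -> list[dict]:
    """Fill defaults and sort by estimated monthly savings, largest first."""
    cleaned = []
    for r in recs:
        priority = str(r.get("priority", "low")).lower()
        if priority not in PRIORITY_ORDER:
            priority = "low"  # unknown labels fall back to the lowest priority
        try:
            savings = max(0.0, float(r.get("savings_usd_per_month") or 0))
        except (TypeError, ValueError):
            savings = 0.0  # the prompt asks for 0 when savings cannot be estimated
        steps = r.get("action_steps") or []
        if isinstance(steps, str):
            steps = [steps]  # a single command passed as a bare string
        cleaned.append({
            "priority": priority,
            "title": str(r.get("title", "(untitled)")),
            "savings_usd_per_month": round(savings, 2),
            "rationale": str(r.get("rationale", "")),
            "action_steps": [str(x) for x in steps],
        })
    return sorted(
        cleaned,
        key=lambda r: (-r["savings_usd_per_month"], PRIORITY_ORDER[r["priority"]]),
    )
```

Sorting here mirrors step 5 of the workflow (savings descending), so the table in the PDF keeps that order even if the model forgets it.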
Wiring the agent: the most underrated part of Strands
Now the part of the code that is the most Strands-specific and, in my opinion, the most underrated. The agent is just a system prompt, a model, and a list of tool functions. There is no orchestration layer, no agent graph, no router. The system prompt tells the agent the workflow order, the analysis principles, and the output quality rules — and Strands lets the model decide which tool to call next based on what it has already seen and what it still needs. This is what I meant earlier about "the right tools for a single agent." A swarm would not have helped here. The work is sequential by nature (you cannot synthesize recommendations until you have collected the findings), and adding more agents would just add coordination overhead and prompt-engineering surface area for no benefit. The model is the orchestrator.
SYSTEM_PROMPT = """You are a senior FinOps consultant performing a comprehensive cost-optimization review of an AWS account.
GOAL: Produce a thorough, actionable PDF report that any AWS account owner can use — whether it's a personal dev account ($20/mo) or an enterprise workload ($20k/mo). You don't know which until you discover it.
WORKFLOW (in order):
1. `discover_account` first — anchors account id, alias, active regions.
2. `analyze_cost_trends` — spend profile and recent deltas.
3. `analyze_anomalies_and_budgets`, `analyze_commitments` — cost-layer signals.
4. All inventory analyzers: `analyze_compute_optimizer`, `analyze_lambda`, `analyze_storage_s3`, `analyze_storage_ebs`, `analyze_databases`, `analyze_network`, `analyze_logs_and_metrics`, `analyze_kms`, `analyze_secrets`, `analyze_security_tooling`, `analyze_governance`.
5. Synthesize a prioritized recommendation list across sections, ordered by estimated savings descending.
6. Call `render_pdf_report` ONCE at the end with:
- executive_summary: 2-4 paragraphs covering the spend profile, the 3 biggest opportunities, and the overall posture.
- recommendations: list of {priority: 'high'|'medium'|'low', title, savings_usd_per_month, rationale, action_steps: list[str] of concrete CLI commands}.
ANALYSIS PRINCIPLES:
- Scale recommendations to spend. A $20 personal account doesn't need RI/SP analysis; a $20k workload absolutely does.
- Always identify the BIGGEST lever first. Most accounts have one issue worth >50% of waste.
- For S3: missing lifecycle on >1 GB buckets, storage class skew, no Intelligent-Tiering on large unpredictable buckets.
- For EBS: unattached volumes, gp2 volumes eligible for gp3 migration, snapshots older than 90 days.
- For Lambda: SnapStart drift on published versions, x86_64-to-Graviton migration, EOL runtimes.
- For logs: missing retention = infinite growth, orphan log groups from deleted Lambdas.
- For KMS: $1/key/mo for customer-managed keys with zero usage signals.
- For databases: idle RDS (avg connections < 1 over 14d) and unused DynamoDB tables.
- For network: NAT gateways with <1 GB outbound traffic over 14d, unattached EIPs, load balancers with no targets.
- For security tooling: features for infra that doesn't exist; no Cost Anomaly Detection = governance gap.
- For governance: 0 active cost-allocation tags = blind spend.
OUTPUT QUALITY:
- Reference actual resource IDs/names from tool outputs in your recommendations.
- Estimate savings honestly. Use AWS published prices where applicable. Use 0 if you can't estimate.
- Don't recommend deletion of KMS keys without flagging the verification step (grants/CloudTrail).
- For each recommendation, action_steps should include exact aws CLI commands.
When you call render_pdf_report, that's your final action. After it returns, your last message should be the path of the saved PDF and a one-line summary of the top finding.
"""
TOOL_FUNCS = [
discover_account,
analyze_cost_trends,
analyze_anomalies_and_budgets,
analyze_commitments,
analyze_compute_optimizer,
analyze_lambda,
analyze_storage_s3,
analyze_storage_ebs,
analyze_databases,
analyze_network,
analyze_logs_and_metrics,
analyze_kms,
analyze_secrets,
analyze_security_tooling,
analyze_governance,
render_pdf_report,
]
def build_agent() -> Agent:
return Agent(
model=BedrockModel(model_id=CONFIG["model"], region_name=s().primary_region),
system_prompt=SYSTEM_PROMPT,
tools=TOOL_FUNCS,
)
def run_once() -> str:
sess = s()
prompt = (
f"Run the full FinOps review for AWS account {sess.account_id} "
f"(alias: {sess.alias or 'none'}). Active regions: {', '.join(sess.regions)}. "
f"Use every analysis tool, synthesize cross-section recommendations, render a PDF. "
f"Return the PDF path."
)
return str(build_agent()(prompt))
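With the model as the orchestrator, the system prompt and the tool list have to stay in sync: a tool named in the workflow but missing from the list can never be called, and a registered tool the prompt never mentions may be skipped. A small drift check could look like the sketch below. It works on plain names, and how to read the names off Strands tool objects is left as an assumption.

```python
import re


def tools_missing_from_registry(system_prompt: str, tool_names: set[str]) -> set[str]:
    """Names written in backticks in the prompt that no registered tool provides."""
    mentioned = set(re.findall(r"`([a-z_][a-z0-9_]*)`", system_prompt))
    return mentioned - tool_names


def tools_missing_from_prompt(system_prompt: str, tool_names: set[str]) -> set[str]:
    """Registered tools that the prompt never mentions in backticks."""
    return {name for name in tool_names if f"`{name}`" not in system_prompt}
```

Run as a unit test, both sets should be empty for the prompt and tool list shown above.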
CLI: same code, your account or somebody else's
The CLI is small but it is the part that decides whether the agent runs against my own AWS account or somebody else's via --role-arn. The original use case was "run this on a customer account that I have audit access to" — but the same code now runs on my personal dev account, on team accounts at work, and on any account I can assume a role into. If you want to put it in a Lambda, the main() function is what you replace with a lambda_handler; the rest of the code stays the same.
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    p = argparse.ArgumentParser(
        prog="finops-agent",
        description="Generic FinOps cost-optimization agent for any AWS account.",
    )
    p.add_argument("--profile", help="AWS named profile to use")
    p.add_argument("--role-arn", help="IAM role ARN to assume before running")
    p.add_argument("--region", default=CONFIG["region"], help="Primary region")
    p.add_argument("--regions", help="Comma-separated regions (default: auto-detect from spend)")
    p.add_argument("--output-dir", default=str(CONFIG["output_dir"]), help="Where the PDF is saved")
    p.add_argument("--model", default=CONFIG["model"], help="Bedrock model ID")
    return p.parse_args(argv)


def main(argv: list[str] | None = None) -> int:
    global SESSION, FINDINGS
    args = _parse_args(argv)
    CONFIG["model"] = args.model
    CONFIG["output_dir"] = Path(args.output_dir)
    os.environ["FINOPS_OUTPUT_DIR"] = args.output_dir
    os.environ["FINOPS_BEDROCK_MODEL"] = args.model
    # Tolerate spaces and empty items, e.g. "--regions 'eu-west-1, us-east-1'".
    explicit_regions = [r.strip() for r in (args.regions or "").split(",") if r.strip()] or None
    FINDINGS = {}  # start every run with an empty findings store
    SESSION = build_session(
        profile=args.profile,
        role_arn=args.role_arn,
        region=args.region,
        explicit_regions=explicit_regions,
    )
    print(
        f"[finops-agent] account={SESSION.account_id} alias={SESSION.alias or '(none)'} "
        f"regions={SESSION.regions} output={args.output_dir}",
        file=sys.stderr,
    )
    try:
        print(run_once())
        return 0
    except Exception:
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    sys.exit(main())
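For the Lambda case, one possible shape is a thin handler that turns the event into the same arguments the CLI parses and then calls main(). This is a sketch: the event keys (`role_arn`, `regions`, `model`, `output_dir`) are hypothetical, and the default output directory is under /tmp because that is the only writable path inside a Lambda function.

```python
def event_to_argv(event: dict) -> list[str]:
    """Translate a Lambda event into the CLI arguments that main() parses."""
    argv = ["--output-dir", event.get("output_dir", "/tmp/finops")]
    if event.get("role_arn"):
        argv += ["--role-arn", event["role_arn"]]
    if event.get("regions"):
        argv += ["--regions", ",".join(event["regions"])]
    if event.get("model"):
        argv += ["--model", event["model"]]
    return argv


def make_handler(main):
    """Wrap a main(argv) -> int function as a Lambda handler."""
    def lambda_handler(event, context):
        exit_code = main(event_to_argv(event or {}))
        return {"ok": exit_code == 0, "exit_code": exit_code}
    return lambda_handler
```

In the agent module this would be wired up as `lambda_handler = make_handler(main)`. Files under /tmp do not outlive the execution environment, so a scheduled run would rely on the SNS notification or a separate upload step to get the report out.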
PDF renderer
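This is the module that prepares the report for the stakeholders. It is a plain Python module rather than an agent tool: render_pdf_report calls pdf_renderer.render() with the findings, the executive summary, and the recommendations.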
"""
PDF renderer for the FinOps agent.
Pure-Python (ReportLab + PIL via reportlab.graphics) so it runs in Lambda
without external binaries. Produces a multi-section PDF with:
- cover (account, run timestamp, regions scanned, headline savings)
- executive summary
- prioritized recommendations table
- per-section findings (one section per analyzer)
- a spend-trend bar chart
"""
from __future__ import annotations
from datetime import datetime
from pathlib import Path
from typing import Any
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
from reportlab.lib.units import mm
from reportlab.platypus import (
BaseDocTemplate,
Frame,
KeepInFrame,
KeepTogether,
PageBreak,
PageTemplate,
Paragraph,
Spacer,
Table,
TableStyle,
)
from reportlab.graphics.shapes import Drawing, String
from reportlab.graphics.charts.barcharts import VerticalBarChart
# ---------- styling ----------
AWS_NAVY = colors.HexColor("#232f3e")
AWS_ORANGE = colors.HexColor("#ff9900")
AWS_GREY_LIGHT = colors.HexColor("#fafafa")
AWS_GREY_BORDER = colors.HexColor("#d5dbdb")
RED = colors.HexColor("#d13212")
GREEN = colors.HexColor("#1d8102")
BLUE = colors.HexColor("#0073bb")
def _styles() -> dict[str, ParagraphStyle]:
base = getSampleStyleSheet()
return {
"title": ParagraphStyle(
"title", parent=base["Title"], fontSize=22, leading=26,
textColor=AWS_NAVY, spaceAfter=4,
),
"subtitle": ParagraphStyle(
"subtitle", parent=base["Normal"], fontSize=11, leading=14,
textColor=colors.HexColor("#545b64"),
),
"h1": ParagraphStyle(
"h1", parent=base["Heading1"], fontSize=15, leading=18,
textColor=AWS_NAVY, spaceBefore=10, spaceAfter=6,
),
"h2": ParagraphStyle(
"h2", parent=base["Heading2"], fontSize=12, leading=15,
textColor=AWS_NAVY, spaceBefore=8, spaceAfter=4,
),
"body": ParagraphStyle(
"body", parent=base["Normal"], fontSize=9.5, leading=13,
),
"small": ParagraphStyle(
"small", parent=base["Normal"], fontSize=8.5, leading=11,
),
"code": ParagraphStyle(
"code", parent=base["Normal"], fontSize=8.5, leading=11,
fontName="Courier", textColor=colors.HexColor("#16191f"),
leftIndent=4, backColor=AWS_GREY_LIGHT,
),
"callout": ParagraphStyle(
"callout", parent=base["Normal"], fontSize=9.5, leading=13,
leftIndent=8, rightIndent=8,
backColor=colors.HexColor("#fff8e7"),
),
"high": ParagraphStyle(
"high", parent=base["Normal"], fontSize=9, leading=11,
textColor=RED, fontName="Helvetica-Bold",
),
"medium": ParagraphStyle(
"medium", parent=base["Normal"], fontSize=9, leading=11,
textColor=AWS_ORANGE, fontName="Helvetica-Bold",
),
"low": ParagraphStyle(
"low", parent=base["Normal"], fontSize=9, leading=11,
textColor=BLUE,
),
}
def _table_style(header: bool = True) -> TableStyle:
cmds = [
("FONTSIZE", (0, 0), (-1, -1), 8.5),
("VALIGN", (0, 0), (-1, -1), "TOP"),
("GRID", (0, 0), (-1, -1), 0.25, AWS_GREY_BORDER),
("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, AWS_GREY_LIGHT]),
("LEFTPADDING", (0, 0), (-1, -1), 4),
("RIGHTPADDING", (0, 0), (-1, -1), 4),
("TOPPADDING", (0, 0), (-1, -1), 3),
("BOTTOMPADDING", (0, 0), (-1, -1), 3),
]
if header:
cmds.extend(
[
("BACKGROUND", (0, 0), (-1, 0), AWS_NAVY),
("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
]
)
return TableStyle(cmds)
def _on_page(canvas, doc):
canvas.saveState()
canvas.setStrokeColor(AWS_ORANGE)
canvas.setLineWidth(2)
canvas.line(14 * mm, A4[1] - 12 * mm, A4[0] - 14 * mm, A4[1] - 12 * mm)
canvas.setFont("Helvetica", 7.5)
canvas.setFillColor(colors.HexColor("#687078"))
canvas.drawString(14 * mm, 8 * mm, doc._meta_left)
canvas.drawRightString(A4[0] - 14 * mm, 8 * mm, f"Page {doc.page}")
canvas.restoreState()
def _spend_chart(monthly: list[dict]) -> Drawing | None:
if not monthly:
return None
d = Drawing(420, 150)
chart = VerticalBarChart()
chart.x = 50
chart.y = 30
chart.width = 350
chart.height = 100
data = [[m["total_usd"] for m in monthly]]
chart.data = data
chart.categoryAxis.categoryNames = [m["month"][:7] for m in monthly]
chart.categoryAxis.labels.fontSize = 7
chart.valueAxis.valueMin = 0
max_v = max(max(data[0]), 0) or 1  # keep the axis range non-zero when every month is $0
chart.valueAxis.valueMax = max_v * 1.15
chart.valueAxis.labels.fontSize = 7
chart.bars[0].fillColor = AWS_ORANGE
chart.bars[0].strokeColor = AWS_NAVY
chart.barWidth = 12
chart.groupSpacing = 10
d.add(chart)
d.add(String(50, 140, "Monthly total spend (USD)", fontSize=9, fillColor=AWS_NAVY))
return d
# ---------- section helpers ----------
def _section_costs(data: dict, styles) -> list:
elems: list = []
monthly = data.get("monthly", [])
if monthly:
chart = _spend_chart(monthly)
if chart:
elems.append(chart)
elems.append(Spacer(1, 4))
rows = [["Month", "Total USD", "Top services (top 3)"]]
for m in monthly:
top = ", ".join(f"{s['service']} ${s['cost_usd']:.2f}" for s in m.get("top", [])[:3])
rows.append([m["month"][:7], f"${m['total_usd']:.2f}", Paragraph(top or "—", styles["small"])])
t = Table(rows, colWidths=[24 * mm, 24 * mm, 120 * mm])
t.setStyle(_table_style())
elems.append(t)
elems.append(Spacer(1, 6))
forecast = data.get("forecast_current_month_usd")
if forecast is not None:
elems.append(Paragraph(f"<b>Current-month forecast:</b> ${forecast:.2f}", styles["body"]))
elems.append(Spacer(1, 4))
deltas = data.get("deltas_7d", [])
if deltas:
elems.append(Paragraph("<b>Service deltas — last 7 days vs prior 7 days:</b>", styles["body"]))
rows = [["Service", "Current $", "Prior $", "Δ $", "Δ %", "Flagged"]]
for d in deltas[:12]:
rows.append(
[
Paragraph(d["service"], styles["small"]),
f"${d['current_7d']:.2f}",
f"${d['prior_7d']:.2f}",
f"${d['delta_usd']:+.2f}",
f"{d['delta_pct']:+.1f}%",
"yes" if d["flagged"] else "—",
]
)
t = Table(rows, colWidths=[60 * mm, 22 * mm, 22 * mm, 22 * mm, 22 * mm, 18 * mm])
t.setStyle(_table_style())
elems.append(t)
return elems
def _kv_table(d: dict, styles, col_widths=(60 * mm, 100 * mm)) -> Table:
rows = []
for k, v in d.items():
if isinstance(v, (list, dict)):
v = str(v)
rows.append([Paragraph(str(k), styles["small"]), Paragraph(str(v), styles["small"])])
if not rows:
rows = [[Paragraph("—", styles["small"]), Paragraph("—", styles["small"])]]
t = Table(rows, colWidths=col_widths)
t.setStyle(_table_style(header=False))
return t
def _list_table(items: list[dict], cols: list[tuple[str, str]], styles, max_rows: int = 15) -> Table | Paragraph:
if not items:
return Paragraph("None.", styles["small"])
headers = [c[1] for c in cols]
rows = [headers]
for it in items[:max_rows]:
row = []
for key, _ in cols:
v = it.get(key, "")
if isinstance(v, list):
v = ", ".join(str(x) for x in v[:5])
elif isinstance(v, dict):
v = str(v)
row.append(Paragraph(str(v), styles["small"]))
rows.append(row)
if len(items) > max_rows:
rows.append([Paragraph(f"… {len(items) - max_rows} more", styles["small"])] + [""] * (len(cols) - 1))
t = Table(rows, repeatRows=1)
t.setStyle(_table_style())
return t
def _section_renderers(styles) -> dict:
"""Map section name -> renderer function (data) -> list of flowables."""
def costs(data):
return _section_costs(data, styles)
def anomalies_budgets(data):
elems: list = []
elems.append(
Paragraph(
f"<b>Cost Anomaly Detection:</b> "
f"{'configured' if data.get('anomaly_detection_configured') else 'NOT CONFIGURED — governance gap'} "
f"({data.get('anomaly_monitor_count', 0)} monitor(s)).",
styles["body"],
)
)
elems.append(Spacer(1, 4))
anom = data.get("anomalies", [])
if anom:
elems.append(Paragraph(f"<b>Recent anomalies (last 90d):</b> {len(anom)} detected.", styles["body"]))
elems.append(_list_table(
anom,
[("service", "Service"), ("impact_usd", "Impact $"), ("score", "Score"), ("start", "Start"), ("end", "End")],
styles, max_rows=10
))
elems.append(Spacer(1, 6))
else:
elems.append(Paragraph("No anomalies detected in last 90 days.", styles["body"]))
elems.append(Spacer(1, 6))
budgets = data.get("budgets", [])
if budgets:
elems.append(Paragraph(f"<b>AWS Budgets ({len(budgets)}):</b>", styles["body"]))
rows = [["Name", "Limit $", "Actual $", "Forecast $", "Exceeded?"]]
for b in budgets:
exc = []
if b.get("exceeded_actual"): exc.append("actual")
if b.get("exceeded_forecast"): exc.append("forecast")
rows.append([
Paragraph(b["name"], styles["small"]),
f"${b['limit_usd']:.2f}",
f"${b['actual_usd']:.2f}",
f"${b['forecast_usd']:.2f}",
Paragraph(", ".join(exc) if exc else "—", styles["small"]),
])
t = Table(rows, colWidths=[60 * mm, 22 * mm, 22 * mm, 22 * mm, 25 * mm])
t.setStyle(_table_style())
elems.append(t)
else:
elems.append(Paragraph("No budgets configured.", styles["body"]))
return elems
def commitments(data):
elems: list = []
elems.append(Paragraph(f"<b>Recommendation:</b> {data.get('recommendation', '—')}", styles["body"]))
elems.append(Paragraph(f"<b>OnDemand compute (30d):</b> ${data.get('ondemand_compute_30d_usd', 0):.2f}", styles["body"]))
sp = data.get("savings_plans_coverage", [])
if sp:
rows = [["Period", "Covered $", "OnDemand $", "Coverage %"]]
for c in sp:
rows.append(["—", f"${c['covered_usd']:.2f}", f"${c['ondemand_usd']:.2f}", f"{c['coverage_pct']:.1f}%"])
t = Table(rows, colWidths=[40 * mm, 30 * mm, 30 * mm, 30 * mm])
t.setStyle(_table_style())
elems.append(t)
return elems
def compute_optimizer(data):
if not data.get("enrolled"):
return [Paragraph(data.get("note", "Compute Optimizer not enrolled"), styles["body"])]
elems = [Paragraph(f"<b>Total recommendations:</b> {data.get('total_recommendations', 0)}", styles["body"])]
rows = [["Resource type", "Count", "Sample finding"]]
for rt, info in data.get("by_resource_type", {}).items():
sample = info.get("samples", [{}])[0] if info.get("samples") else {}
finding = str(sample.get("finding", ""))[:80]
rows.append([rt.upper(), str(info.get("count", 0)), Paragraph(finding or "—", styles["small"])])
t = Table(rows, colWidths=[30 * mm, 20 * mm, 120 * mm])
t.setStyle(_table_style())
elems.append(t)
return elems
def lambda_section(data):
t = data.get("totals", {})  # tolerate a missing "totals" key, like the other sections
elems = [
Paragraph(
f"<b>Functions:</b> {t.get('functions', 0)} "
f"<b>arm64:</b> {t.get('arm64', 0)} "
f"<b>x86_64:</b> {t.get('x86_64', 0)} "
f"<b>Orphan SnapStart versions:</b> {t.get('snapstart_orphans', 0)}",
styles["body"],
)
]
if data.get("snapstart_drift"):
elems.append(Spacer(1, 4))
elems.append(Paragraph("<b>Lambda SnapStart drift (recurring cache cost):</b>", styles["body"]))
elems.append(_list_table(
data["snapstart_drift"],
[("region", "Region"), ("function", "Function"), ("snapstart_versions", "Versions")],
styles, max_rows=10
))
if data.get("runtimes_at_eol"):
elems.append(Spacer(1, 4))
elems.append(Paragraph("<b>EOL runtimes (security + future cost risk):</b>", styles["body"]))
elems.append(_list_table(
data["runtimes_at_eol"],
[("region", "Region"), ("function", "Function"), ("runtime", "Runtime")],
styles, max_rows=10
))
return elems
def s3(data):
elems = [
Paragraph(
f"<b>Buckets:</b> {data.get('bucket_count', 0)} "
f"<b>Total size:</b> {data.get('total_size_gb', 0):.2f} GB "
f"<b>Without lifecycle (>1 GB):</b> {data.get('no_lifecycle_count', 0)}",
styles["body"],
),
Spacer(1, 4),
]
no_lc = data.get("no_lifecycle_over_1gb", [])
if no_lc:
elems.append(Paragraph("<b>Buckets >1 GB without any lifecycle rule:</b>", styles["body"]))
elems.append(_list_table(
no_lc, [("bucket", "Bucket"), ("region", "Region"), ("size_gb", "Size GB")],
styles, max_rows=15
))
elems.append(Spacer(1, 4))
elems.append(Paragraph("<b>Bucket inventory (top 20 by size):</b>", styles["body"]))
elems.append(_list_table(
data.get("buckets", []),
[
("name", "Bucket"), ("region", "Region"), ("size_gb", "Size GB"),
("lifecycle_rules", "LC rules"),
("lifecycle_actions", "LC actions"),
("versioning", "Versioning"),
("intelligent_tiering_configs", "Int. Tier"),
],
styles, max_rows=20,
))
return elems
def ebs(data):
t = data.get("totals", {})
elems = [
Paragraph(
f"<b>Volumes:</b> {t.get('volumes', 0)} "
f"<b>Available (unattached):</b> {t.get('available_volumes', 0)} "
f"<b>gp2 (migrate to gp3):</b> {t.get('gp2_volumes', 0)} "
f"<b>Old snapshots (>90d):</b> {t.get('old_snapshots', 0)}",
styles["body"],
),
Spacer(1, 4),
]
if data.get("available_volumes"):
elems.append(Paragraph("<b>Unattached volumes:</b>", styles["body"]))
elems.append(_list_table(
data["available_volumes"],
[("region", "Region"), ("id", "Volume ID"), ("size_gb", "Size GB"), ("type", "Type")],
styles, max_rows=15,
))
elems.append(Spacer(1, 4))
if data.get("gp2_volumes"):
elems.append(Paragraph("<b>gp2 volumes (eligible for gp3 migration):</b>", styles["body"]))
elems.append(_list_table(
data["gp2_volumes"],
[("region", "Region"), ("id", "Volume ID"), ("size_gb", "Size GB")],
styles, max_rows=10,
))
elems.append(Spacer(1, 4))
if data.get("old_snapshots"):
elems.append(Paragraph("<b>Snapshots > 90 days old:</b>", styles["body"]))
elems.append(_list_table(
data["old_snapshots"],
[("region", "Region"), ("id", "Snapshot ID"), ("age_days", "Age d"), ("size_gb", "Size GB")],
styles, max_rows=10,
))
return elems
def databases(data):
rds = data.get("rds", {})
ddb = data.get("dynamodb", {})
elems = []
elems.append(Paragraph(f"<b>RDS instances by region:</b> {sum(v.get('instance_count', 0) for v in rds.get('by_region', {}).values())}", styles["body"]))
if rds.get("idle_candidates"):
elems.append(Paragraph("<b>Idle RDS candidates (avg connections &lt; 1 in 14d):</b>", styles["body"]))
elems.append(_list_table(
rds["idle_candidates"],
[("region", "Region"), ("id", "Instance"), ("class", "Class"), ("engine", "Engine"), ("avg_connections_14d", "Avg conn"), ("storage_gb", "Storage GB")],
styles, max_rows=10,
))
elems.append(Spacer(1, 4))
elems.append(Paragraph(f"<b>DynamoDB tables:</b> {len(ddb.get('tables', []))}", styles["body"]))
if ddb.get("tables"):
elems.append(_list_table(
ddb["tables"],
[("region", "Region"), ("name", "Table"), ("billing_mode", "Billing"), ("size_bytes", "Size B"), ("item_count", "Items")],
styles, max_rows=10,
))
return elems
    def network(data):
        t = data.get("totals", {})
        elems = [
            Paragraph(
                f"<b>NAT GWs:</b> {t.get('nat_gateways', 0)} "
                f"<b>ELBs:</b> {t.get('elbs', 0)} "
                f"<b>EIPs:</b> {t.get('eips', 0)}",
                styles["body"],
            ),
            Spacer(1, 4),
        ]
        if data.get("unattached_eips"):
            elems.append(Paragraph("<b>Unattached Elastic IPs ($3.60/mo each):</b>", styles["body"]))
            elems.append(_list_table(
                data["unattached_eips"],
                [("region", "Region"), ("ip", "IP"), ("allocation_id", "Allocation ID")],
                styles, max_rows=10,
            ))
            elems.append(Spacer(1, 4))
        if data.get("empty_load_balancers"):
            elems.append(Paragraph("<b>Load balancers with zero targets ($16-22/mo each):</b>", styles["body"]))
            elems.append(_list_table(
                data["empty_load_balancers"],
                [("region", "Region"), ("name", "Name"), ("type", "Type")],
                styles, max_rows=10,
            ))
            elems.append(Spacer(1, 4))
        if data.get("low_traffic_nat_gateways"):
            elems.append(Paragraph("<b>Low-traffic NAT gateways (<1 GB outbound in 14d, $32+/mo):</b>", styles["body"]))
            elems.append(_list_table(
                data["low_traffic_nat_gateways"],
                [("region", "Region"), ("id", "NAT GW ID"), ("outbound_gb_14d", "Out GB 14d")],
                styles, max_rows=10,
            ))
        return elems
    def logs(data):
        t = data.get("totals", {})
        elems = [
            Paragraph(
                f"<b>Log groups:</b> {t.get('log_groups', 0)} "
                f"<b>Total stored:</b> {t.get('stored_gb', 0):.2f} GB "
                f"<b>Missing retention:</b> {t.get('missing_retention_count', 0)} "
                f"<b>Orphan Lambda LGs:</b> {t.get('orphan_lambda_count', 0)} "
                f"<b>Dashboards:</b> {t.get('dashboards', 0)}",
                styles["body"],
            ),
            Spacer(1, 4),
        ]
        if data.get("largest_log_groups"):
            elems.append(Paragraph("<b>Largest log groups:</b>", styles["body"]))
            elems.append(_list_table(
                data["largest_log_groups"],
                [("region", "Region"), ("name", "Log group"), ("stored_gb", "Stored GB")],
                styles, max_rows=10,
            ))
            elems.append(Spacer(1, 4))
        if data.get("orphan_lambda_log_groups"):
            elems.append(Paragraph("<b>Orphan Lambda log groups (function deleted):</b>", styles["body"]))
            elems.append(_list_table(
                data["orphan_lambda_log_groups"],
                [("region", "Region"), ("name", "Log group"), ("stored_gb", "Stored GB")],
                styles, max_rows=10,
            ))
            elems.append(Spacer(1, 4))
        if data.get("missing_retention"):
            elems.append(Paragraph("<b>Log groups WITHOUT retention policy (infinite growth):</b>", styles["body"]))
            elems.append(_list_table(
                data["missing_retention"],
                [("region", "Region"), ("name", "Log group"), ("stored_gb", "Stored GB")],
                styles, max_rows=15,
            ))
        return elems
    def kms(data):
        t = data.get("totals", {})
        elems = [
            Paragraph(
                f"<b>Customer-managed keys:</b> {t.get('customer_managed', 0)} "
                f"<b>Candidates for review:</b> {t.get('candidates', 0)} (no grants, no aliases)",
                styles["body"],
            ),
            Spacer(1, 2),
            Paragraph(
                "<b>Verify before deletion:</b> a key with no grants/aliases may still be referenced by encrypted "
                "data (S3 SSE-KMS objects, CloudTrail logs, EBS volumes, DynamoDB tables). Check CloudTrail "
                "Decrypt events for the last 90 days before scheduling deletion.",
                styles["callout"],
            ),
            Spacer(1, 4),
        ]
        if data.get("candidates_for_deletion"):
            elems.append(_list_table(
                data["candidates_for_deletion"],
                [("region", "Region"), ("key_id", "Key ID"), ("created", "Created"), ("description", "Description")],
                styles, max_rows=20,
            ))
        return elems
    def secrets(data):
        t = data.get("totals", {})
        elems = [
            Paragraph(
                f"<b>Secrets:</b> {t.get('secrets', 0)} "
                f"<b>Stale (>90d unused):</b> {t.get('stale', 0)}",
                styles["body"],
            ),
            Spacer(1, 4),
        ]
        if data.get("stale"):
            elems.append(_list_table(
                data["stale"],
                [("region", "Region"), ("name", "Secret name"), ("last_accessed", "Last accessed"), ("days_since_access", "Days since")],
                styles, max_rows=20,
            ))
        return elems
    def security_tooling(data):
        elems = []
        for region, info in data.get("by_region", {}).items():
            elems.append(Paragraph(f"<b>Region: {region}</b>", styles["body"]))
            sh = info.get("security_hub") or {}
            gd = info.get("guardduty") or {}
            insp = info.get("inspector") or {}
            macie = info.get("macie") or {}
            rows = [
                ["Service", "Status / config"],
                # Rebuild the dict so "standards" prints last (dict union needs Python 3.9+).
                ["Security Hub", str({k: v for k, v in sh.items() if k != "standards"} | {"standards": sh.get("standards", [])})],
                ["GuardDuty", str(gd)],
                ["Inspector", str(insp)],
                ["Macie", str(macie)],
            ]
            # Wrap every cell (header included) in a Paragraph so long values line-wrap.
            wrapped = [[Paragraph(cell, styles["small"]) for cell in row] for row in rows]
            t = Table(wrapped, colWidths=[35 * mm, 130 * mm])
            t.setStyle(_table_style())
            elems.append(t)
            elems.append(Spacer(1, 4))
        return elems
    def governance(data):
        elems = [
            Paragraph(
                f"<b>Active cost allocation tags:</b> {data.get('active_cost_allocation_tag_count', 0)} "
                f"<b>Inactive (user-defined):</b> {data.get('inactive_user_defined_tag_count', 0)} "
                f"<b>Cost categories:</b> {len(data.get('cost_categories', []))}",
                styles["body"],
            ),
            Spacer(1, 4),
        ]
        cov = data.get("tag_coverage_30d", {})
        if cov:
            rows = [["Tag key", "Tagged $", "Untagged $", "Coverage %"]]
            for k, v in cov.items():
                rows.append([k, f"${v['tagged_usd']:.2f}", f"${v['untagged_usd']:.2f}", f"{v['tagged_pct']:.1f}%"])
            t = Table(rows, colWidths=[60 * mm, 30 * mm, 30 * mm, 30 * mm])
            t.setStyle(_table_style())
            elems.append(t)
        else:
            elems.append(Paragraph(
                "No active cost allocation tags = 100% of spend is unclassifiable by project. "
                "Activate AWS-generated tags (aws:cloudformation:stack-name, aws:createdBy) in the Billing console.",
                styles["callout"],
            ))
        return elems
    # Registry: section name -> (heading shown in the PDF, renderer function).
    return {
        "costs": ("Cost trends", costs),
        "anomalies_budgets": ("Anomalies & budgets", anomalies_budgets),
        "commitments": ("Commitments (RI/SP)", commitments),
        "compute_optimizer": ("Compute Optimizer", compute_optimizer),
        "lambda": ("Lambda", lambda_section),
        "s3": ("S3 storage & lifecycle", s3),
        "ebs": ("EBS volumes & snapshots", ebs),
        "databases": ("Databases", databases),
        "network": ("Network (NAT/ELB/EIP)", network),
        "logs": ("Logs & metrics", logs),
        "kms": ("KMS keys", kms),
        "secrets": ("Secrets Manager", secrets),
        "security_tooling": ("Security tooling scope", security_tooling),
        "governance": ("Governance & tagging", governance),
    }
def _executive_summary_block(text: str, styles) -> list:
    elems = [Paragraph("Executive summary", styles["h1"])]
    # Blank lines separate paragraphs; single newlines become line breaks.
    for para in (text or "").split("\n\n"):
        para = para.strip()
        if para:
            elems.append(Paragraph(para.replace("\n", "<br/>"), styles["body"]))
            elems.append(Spacer(1, 4))
    return elems
def _recommendations_block(recs: list[dict], styles) -> list:
    elems = [Paragraph("Recommendations", styles["h1"])]
    if not recs:
        elems.append(Paragraph("No recommendations.", styles["body"]))
        return elems
    total_save = sum(_safe_float(r.get("savings_usd_per_month", 0)) for r in recs)
    elems.append(Paragraph(
        f"<b>{len(recs)} recommendations</b>, estimated total savings <b>${total_save:.2f}/mo</b> "
        f"(<b>${total_save * 12:.2f}/yr</b>).",
        styles["body"],
    ))
    elems.append(Spacer(1, 6))
    # Sort once, largest monthly saving first; the table and the detail list share this order.
    ranked = sorted(recs, key=lambda x: -_safe_float(x.get("savings_usd_per_month", 0)))
    rows = [["#", "Priority", "Title", "Save $/mo"]]
    for i, r in enumerate(ranked, start=1):
        prio = (r.get("priority") or "low").lower()
        prio_para = Paragraph(prio.upper(), styles.get(prio, styles["low"]))
        rows.append([
            str(i),
            prio_para,
            Paragraph(r.get("title", ""), styles["small"]),
            f"${_safe_float(r.get('savings_usd_per_month', 0)):.2f}",
        ])
    t = Table(rows, colWidths=[10 * mm, 22 * mm, 110 * mm, 24 * mm])
    t.setStyle(_table_style())
    elems.append(t)
    elems.append(Spacer(1, 8))
    # The list is ordered by savings, not by the priority label, so the heading says so.
    elems.append(Paragraph("Detail (highest savings first)", styles["h2"]))
    for i, r in enumerate(ranked, start=1):
        chunk = [
            Paragraph(
                f"<b>{i}. {r.get('title', '(untitled)')}</b> \u2014 "
                f"priority <b>{(r.get('priority') or 'low').upper()}</b>, "
                f"save <b>${_safe_float(r.get('savings_usd_per_month', 0)):.2f}/mo</b>",
                styles["body"],
            ),
            Paragraph(r.get("rationale") or "", styles["body"]),
        ]
        steps = r.get("action_steps") or []
        if steps:
            chunk.append(Paragraph("<b>Action:</b>", styles["body"]))
            for step in steps:
                chunk.append(Paragraph(step.replace("\n", "<br/>"), styles["code"]))
        chunk.append(Spacer(1, 6))
        # Keep each recommendation on one page where possible.
        elems.append(KeepTogether(chunk))
    return elems
def _safe_float(x) -> float:
    # Coerce model-supplied values (None, "", "12.5", garbage) to a float; fall back to 0.0.
    try:
        return float(x or 0)
    except (TypeError, ValueError):
        return 0.0
def render(
    *,
    output_path: Path,
    account_id: str,
    account_alias: str | None,
    regions: list[str],
    run_at: datetime,
    executive_summary: str,
    recommendations: list[dict],
    findings: list[dict],
) -> Path:
    styles = _styles()
    renderers = _section_renderers(styles)
    label = account_alias or account_id
    doc = BaseDocTemplate(
        str(output_path),
        pagesize=A4,
        leftMargin=14 * mm,
        rightMargin=14 * mm,
        topMargin=20 * mm,
        bottomMargin=14 * mm,
        title=f"FinOps report \u2014 {label}",
        author="FinOps Agent",
    )
    # Stashed on the doc so the _on_page callback can draw it on every page.
    doc._meta_left = f"FinOps report \u2014 {label} ({account_id}) \u2014 {run_at.strftime('%Y-%m-%d %H:%M UTC')}"
    frame = Frame(
        doc.leftMargin, doc.bottomMargin,
        doc.width, doc.height,
        showBoundary=0, id="normal",
    )
    doc.addPageTemplates([PageTemplate(id="default", frames=frame, onPage=_on_page)])
    story: list = []
    # Cover
    story.append(Paragraph("AWS FinOps Report", styles["title"]))
    story.append(Paragraph(
        f"Account <b>{account_id}</b> "
        f" | Alias <b>{account_alias or '(none)'}</b> "
        f" | Generated {run_at.strftime('%Y-%m-%d %H:%M UTC')} "
        f" | Regions scanned: {', '.join(regions)}",
        styles["subtitle"],
    ))
    story.append(Spacer(1, 10))
    story.extend(_executive_summary_block(executive_summary, styles))
    story.append(Spacer(1, 6))
    story.extend(_recommendations_block(recommendations, styles))
    story.append(PageBreak())
    # Detailed findings sections
    story.append(Paragraph("Detailed findings", styles["h1"]))
    for section in findings:
        name = section.get("section")
        if name == "account":
            continue  # rendered on cover
        spec = renderers.get(name)
        if not spec:
            continue
        title, fn = spec
        story.append(Paragraph(title, styles["h2"]))
        try:
            elems = fn(section.get("data", {}))
        except Exception as e:
            # A failing section becomes a one-line note instead of aborting the whole PDF.
            elems = [Paragraph(f"Render error in section {name}: {e}", styles["small"])]
        story.extend(elems)
        story.append(Spacer(1, 8))
    doc.build(story)
    return output_path
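The dispatch loop at the end of render is worth isolating, because it is what keeps one bad section from killing the whole report. Below is a minimal sketch of the same pattern with ReportLab stripped out: renderers return plain strings instead of flowables. The names build_story, render_ebs, render_broken, and RENDERERS are hypothetical, made up for this illustration only, and are not part of the module above.

```python
from typing import Callable

Renderer = Callable[[dict], list[str]]


def render_ebs(data: dict) -> list[str]:
    # Hypothetical renderer: reports one total as text, loosely modelled on ebs().
    totals = data.get("totals", {})
    return [f"Volumes: {totals.get('volumes', 0)}"]


def render_broken(data: dict) -> list[str]:
    # Hypothetical renderer that always fails, to exercise the fallback path.
    raise KeyError("tagged_usd")


# Registry: section name -> (title, renderer), the same shape _section_renderers() returns.
RENDERERS: dict[str, tuple[str, Renderer]] = {
    "ebs": ("EBS volumes & snapshots", render_ebs),
    "governance": ("Governance & tagging", render_broken),
}


def build_story(findings: list[dict]) -> list[str]:
    story: list[str] = []
    for section in findings:
        name = section.get("section")
        spec = RENDERERS.get(name)
        if not spec:
            continue  # sections without a registered renderer are skipped
        title, fn = spec
        story.append(title)
        try:
            story.extend(fn(section.get("data", {})))
        except Exception as e:
            # One failing section must not abort the rest of the report.
            story.append(f"Render error in section {name}: {e}")
    return story


story = build_story([
    {"section": "account", "data": {}},
    {"section": "ebs", "data": {"totals": {"volumes": 3}}},
    {"section": "governance", "data": {}},
])
```

In this sketch the section that raises still gets its heading, followed by a single error line, and every other section renders as usual. The trade-off is that a broad except can hide real bugs, so the error text in the output is the only signal that something went wrong.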
Fears
So, the fears.
The agent above is the kind of tool that, two years ago, would have taken me a sprint to build and another sprint to validate. With AI assistance and the maturity of the SDKs (Strands in particular), I built it in days. That same speedup applies to anyone — including engineers with less AWS experience than I have. The boring infrastructure that I spent years internalizing — pagination patterns, region scanning, CloudWatch metric queries, the difference between unblended and amortized cost, the gotchas of LocationConstraint returning EU instead of eu-west-1 — is exactly the kind of knowledge that an LLM with the right tools can substitute for.
So, am I going to be redundant? I do not have a clean answer. What I notice is that the agent above did not invent the analyses. It executes them. The decisions about WHAT to analyze — only regions with non-zero spend, KMS keys with no grants but flag the CloudTrail verification, skip RI/SP on hobby accounts, flag SnapStart drift on published Lambda versions, look for orphan log groups whose function no longer exists — those decisions came from years of looking at AWS bills and learning what the actual signal was versus what was just noise. The model can run the analyses faster than I can. It cannot yet decide which analyses are worth running.
That is who I want to become in the future. The person who decides what to analyze. The model can do the rest.
