Traditional ops automation means cron jobs on a dedicated server, custom monitoring daemons, or a full Ansible/Chef setup just to restart a service at midnight.
Serverless flips this model entirely. With Lambda + EventBridge, you can build a fully automated maintenance bot that:
- Reacts to infrastructure events in real time (EC2 failure → auto-snapshot)
- Runs scheduled maintenance at 2 AM without a single server
- Remediates CloudWatch alarms automatically
- Costs essentially nothing when idle
This article walks through building exactly that — a production-grade serverless ops automation system on AWS.
Why Serverless Is a Natural Fit for Ops Automation
Ops automation tasks share a common pattern:
Something happens (event)
│
▼
Run a script (function)
│
▼
Done — wait for next event
This is precisely what Lambda + EventBridge is designed for. The function runs only when triggered, scales to handle multiple simultaneous events, and costs nothing between invocations.
Compare this to the traditional approach:
| Approach | Infrastructure | Cost | Reliability |
|---|---|---|---|
| Cron on EC2 | Dedicated server | Always running | Single point of failure |
| Ansible Tower | Full platform | Expensive | Complex to maintain |
| Lambda + EventBridge | Zero servers | Pay per execution | Managed, highly available |
Architecture Overview
The maintenance bot consists of three independent automation modules, each triggered by a different event source:
┌─────────────────────────────────────────────────────┐
│             Serverless Maintenance Bot              │
├─────────────────────────────────────────────────────┤
│                                                     │
│  EC2 State Change     ──► auto-snapshot-lambda      │
│  (EventBridge)            (backup EBS on failure)   │
│                                                     │
│  EventBridge Schedule ──► nightly-maintenance-lambda│
│  (cron: 2 AM daily)       (cleanup, reset, report)  │
│                                                     │
│  CloudWatch Alarm     ──► auto-remediation-lambda   │
│  (CPU > 80%)              (scale out / restart)     │
│                                                     │
└─────────────────────────────────────────────────────┘
Module 1: Auto-Snapshot on EC2 Failure
When an EC2 instance stops unexpectedly, you want an EBS snapshot created immediately, before any manual intervention.
Event source: EC2 publishes instance state-change notifications (pending, running, stopping, stopped, shutting-down, terminated) to EventBridge as native events, so no custom monitoring agent is needed. A reboot does not change the instance state and therefore does not produce one of these notifications.
The Event Structure
When EC2 emits a state change event, EventBridge delivers this payload to your Lambda:
{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "source": "aws.ec2",
  "detail-type": "EC2 Instance State-change Notification",
  "region": "us-east-1",
  "detail": {
    "instance-id": "i-0123456789abcdef0",
    "state": "stopped"
  }
}
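As a quick illustration of how a handler might pull the fields it needs out of this payload, here is a minimal sketch. The `parse_state_change` helper and the `StateChange` type are hypothetical names introduced for this example, and the sample event is the abbreviated payload shown above (real events carry additional fields such as `account` and `time`).

```python
from typing import NamedTuple


class StateChange(NamedTuple):
    """The fields of a state-change event that the bot cares about."""
    instance_id: str
    state: str
    region: str


def parse_state_change(event: dict) -> StateChange:
    """Extract instance ID, state, and region from an EventBridge event.

    Raises ValueError if the event is not an EC2 state-change notification.
    """
    if event.get("source") != "aws.ec2":
        raise ValueError(f"unexpected source: {event.get('source')!r}")
    if event.get("detail-type") != "EC2 Instance State-change Notification":
        raise ValueError(f"unexpected detail-type: {event.get('detail-type')!r}")
    detail = event["detail"]
    return StateChange(detail["instance-id"], detail["state"], event["region"])


# Abbreviated sample payload, copied from the event structure above.
SAMPLE_EVENT = {
    "version": "0",
    "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
    "source": "aws.ec2",
    "detail-type": "EC2 Instance State-change Notification",
    "region": "us-east-1",
    "detail": {"instance-id": "i-0123456789abcdef0", "state": "stopped"},
}

change = parse_state_change(SAMPLE_EVENT)
print(change.instance_id, change.state, change.region)
```

Keeping the parsing in a small pure function like this makes it possible to unit-test the handler's input handling without any AWS credentials.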
The Lambda Handler
# auto_snapshot.py
import boto3
import json
import logging
import os
from datetime import datetime

logger = logging.getLogger()
logger.setLevel(logging.INFO)

ec2 = boto3.client('ec2')
sns = boto3.client('sns')

ALERT_TOPIC_ARN = os.environ['ALERT_TOPIC_ARN']
SNAPSHOT_TAG_KEY = 'AutoSnapshot'
SNAPSHOT_TAG_VALUE = 'true'


def handler(event, context):
    """
    Triggered by EventBridge on EC2 state change.
    Creates EBS snapshots for all volumes attached to the affected instance.
    """
    detail = event['detail']
    instance_id = detail['instance-id']
    state = detail['state']
    region = event['region']
    logger.info(f'EC2 state change: {instance_id} → {state}')

    # Only act on stop transitions. The state alone cannot tell a failure from
    # an intentional shutdown; the AutoSnapshot tag check below is the opt-in filter.
    # A single stop emits both 'stopping' and 'stopped', so expect two invocations.
    trigger_states = {'stopped', 'stopping'}
    if state not in trigger_states:
        logger.info(f'State {state} does not require snapshot. Skipping.')
        return {'action': 'skipped', 'reason': f'state={state}'}

    # Check if this instance is tagged for auto-snapshot
    instance = ec2.describe_instances(InstanceIds=[instance_id])
    tags = instance['Reservations'][0]['Instances'][0].get('Tags', [])
    tag_map = {t['Key']: t['Value'] for t in tags}
    if tag_map.get(SNAPSHOT_TAG_KEY) != SNAPSHOT_TAG_VALUE:
        logger.info(f'Instance {instance_id} not tagged for auto-snapshot. Skipping.')
        return {'action': 'skipped', 'reason': 'not tagged'}
    instance_name = tag_map.get('Name', instance_id)

    # Get all EBS volumes attached to this instance
    volumes_response = ec2.describe_volumes(
        Filters=[{
            'Name': 'attachment.instance-id',
            'Values': [instance_id]
        }]
    )
    volumes = volumes_response['Volumes']
    snapshot_ids = []
    timestamp = datetime.utcnow().strftime('%Y%m%d-%H%M%S')

    for volume in volumes:
        volume_id = volume['VolumeId']
        device = volume['Attachments'][0]['Device']
        logger.info(f'Creating snapshot for volume {volume_id} ({device})')
        snapshot = ec2.create_snapshot(
            VolumeId=volume_id,
            Description=f'Auto-snapshot: {instance_name} ({instance_id}) state={state}',
            TagSpecifications=[{
                'ResourceType': 'snapshot',
                'Tags': [
                    {'Key': 'Name', 'Value': f'auto-{instance_name}-{device}-{timestamp}'},
                    {'Key': 'InstanceId', 'Value': instance_id},
                    {'Key': 'TriggerState', 'Value': state},
                    {'Key': 'AutoCreated', 'Value': 'true'},
                    {'Key': 'CreatedAt', 'Value': timestamp}
                ]
            }]
        )
        snapshot_ids.append(snapshot['SnapshotId'])
        logger.info(f'Snapshot created: {snapshot["SnapshotId"]} for volume {volume_id}')

    # Send alert notification
    sns.publish(
        TopicArn=ALERT_TOPIC_ARN,
        Subject=f'[Auto-Snapshot] {instance_name} ({state})',
        Message=json.dumps({
            'instance_id': instance_id,
            'instance_name': instance_name,
            'state': state,
            'snapshots_created': snapshot_ids,
            'timestamp': timestamp
        }, indent=2)
    )
    logger.info(f'Auto-snapshot complete: {len(snapshot_ids)} snapshots created')
    return {
        'instance_id': instance_id,
        'state': state,
        'snapshots_created': snapshot_ids
    }
EventBridge Rule Configuration
# serverless.yml
functions:
  autoSnapshot:
    handler: auto_snapshot.handler
    environment:
      ALERT_TOPIC_ARN: !Ref OpsAlertTopic
    events:
      - eventBridge:
          pattern:
            source:
              - aws.ec2
            detail-type:
              - EC2 Instance State-change Notification
            detail:
              state:
                - stopped
                - stopping
    # Function-level statements assume the serverless-iam-roles-per-function plugin
    iamRoleStatements:
      - Effect: Allow
        Action:
          - ec2:DescribeInstances
          - ec2:DescribeVolumes
          - ec2:CreateSnapshot
          - ec2:CreateTags
          - sns:Publish
        Resource: '*'
Production tip: Tag your instances with `AutoSnapshot: true` to opt in. This prevents the function from creating snapshots for every EC2 stop event in your account (including intentional deployments).
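For illustration, opting an instance in could look roughly like the sketch below. The helper names `opt_in_instance` and `is_opted_in` are hypothetical; `create_tags` is the standard boto3 EC2 client call, and the client is passed in as a parameter so the helpers can be exercised with a stub.

```python
SNAPSHOT_TAG_KEY = "AutoSnapshot"
SNAPSHOT_TAG_VALUE = "true"


def is_opted_in(tags: list) -> bool:
    """Mirror the handler's check: is the opt-in tag present with value 'true'?"""
    tag_map = {t["Key"]: t["Value"] for t in tags}
    return tag_map.get(SNAPSHOT_TAG_KEY) == SNAPSHOT_TAG_VALUE


def opt_in_instance(ec2_client, instance_id: str) -> None:
    """Add the opt-in tag to one instance.

    `ec2_client` is expected to behave like boto3.client('ec2').
    """
    ec2_client.create_tags(
        Resources=[instance_id],
        Tags=[{"Key": SNAPSHOT_TAG_KEY, "Value": SNAPSHOT_TAG_VALUE}],
    )
```

With a real client this would be called as `opt_in_instance(boto3.client('ec2'), 'i-0123456789abcdef0')`. The caller needs `ec2:CreateTags` permission on the instance.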
Module 2: Nightly Maintenance Bot
Scheduled maintenance tasks — cleanup, reporting, data archival — are a perfect fit for Lambda + EventBridge cron rules. No dedicated server, no cron daemon, no SSH access needed.
# nightly_maintenance.py
import boto3
import json
import logging
import os
from datetime import datetime, timedelta

logger = logging.getLogger()
logger.setLevel(logging.INFO)

ec2 = boto3.client('ec2')
dynamodb = boto3.resource('dynamodb')
s3 = boto3.client('s3')
sns = boto3.client('sns')

RETENTION_DAYS = int(os.environ.get('SNAPSHOT_RETENTION_DAYS', '30'))
METRICS_TABLE = os.environ['METRICS_TABLE']
REPORT_BUCKET = os.environ['REPORT_BUCKET']
ALERT_TOPIC_ARN = os.environ['ALERT_TOPIC_ARN']


def handler(event, context):
    """
    Runs nightly at 2 AM UTC via EventBridge scheduled rule.
    Performs: snapshot cleanup, metrics archival, daily report generation.
    """
    logger.info('Nightly maintenance bot started')
    results = {}
    # Task 1: Clean up old auto-created snapshots
    results['snapshot_cleanup'] = cleanup_old_snapshots()
    # Task 2: Archive daily metrics to S3
    results['metrics_archival'] = archive_daily_metrics()
    # Task 3: Generate and send daily ops report
    results['daily_report'] = send_daily_report(results)
    logger.info(f'Nightly maintenance complete: {json.dumps(results)}')
    return results


def cleanup_old_snapshots() -> dict:
    """Delete auto-created snapshots older than RETENTION_DAYS"""
    cutoff_date = datetime.utcnow() - timedelta(days=RETENTION_DAYS)
    deleted = []
    kept = []
    # Find all auto-created snapshots; paginate so results beyond the first page are not missed
    paginator = ec2.get_paginator('describe_snapshots')
    pages = paginator.paginate(
        OwnerIds=['self'],
        Filters=[{'Name': 'tag:AutoCreated', 'Values': ['true']}]
    )
    for page in pages:
        for snapshot in page['Snapshots']:
            start_time = snapshot['StartTime'].replace(tzinfo=None)
            if start_time < cutoff_date:
                ec2.delete_snapshot(SnapshotId=snapshot['SnapshotId'])
                deleted.append(snapshot['SnapshotId'])
                logger.info(f'Deleted old snapshot: {snapshot["SnapshotId"]} '
                            f'(created: {start_time.date()})')
            else:
                kept.append(snapshot['SnapshotId'])
    logger.info(f'Snapshot cleanup: deleted={len(deleted)}, kept={len(kept)}')
    return {'deleted': len(deleted), 'kept': len(kept)}


def archive_daily_metrics() -> dict:
    """Archive yesterday's metrics from DynamoDB to S3"""
    table = dynamodb.Table(METRICS_TABLE)
    yesterday = (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d')
    # Scan yesterday's records (use Query with GSI in production)
    scan_kwargs = {
        'FilterExpression': 'begins_with(#ts, :date)',
        'ExpressionAttributeNames': {'#ts': 'timestamp'},
        'ExpressionAttributeValues': {':date': yesterday}
    }
    records = []
    while True:
        # Each Scan call returns at most one page; follow LastEvaluatedKey until done
        response = table.scan(**scan_kwargs)
        records.extend(response['Items'])
        if 'LastEvaluatedKey' not in response:
            break
        scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
    if records:
        # Write to S3 as JSON Lines; default=str handles the Decimal values
        # that the DynamoDB resource API returns for numbers
        s3_key = f'metrics-archive/{yesterday}/metrics.jsonl'
        body = '\n'.join(json.dumps(r, default=str) for r in records)
        s3.put_object(
            Bucket=REPORT_BUCKET,
            Key=s3_key,
            Body=body.encode('utf-8'),
            ContentType='application/x-ndjson'
        )
        logger.info(f'Archived {len(records)} records to s3://{REPORT_BUCKET}/{s3_key}')
    return {'records_archived': len(records), 'date': yesterday}


def send_daily_report(results: dict) -> dict:
    """Send daily ops summary via SNS"""
    today = datetime.utcnow().strftime('%Y-%m-%d')
    report = {
        'date': today,
        'maintenance_results': results,
        'generated_at': datetime.utcnow().isoformat()
    }
    sns.publish(
        TopicArn=ALERT_TOPIC_ARN,
        Subject=f'[Daily Ops Report] {today}',
        Message=json.dumps(report, indent=2, default=str)
    )
    return {'report_sent': True, 'date': today}
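The code comment above suggests replacing the Scan with a Query against a GSI in production. A minimal sketch of what that might look like follows. It rests on assumptions that are not part of the article's stack: a hypothetical index named `date-index` whose partition key is a string attribute called `date` (for example `2024-05-01`). The `table` argument is expected to behave like a boto3 DynamoDB `Table` resource.

```python
from typing import Iterator


def query_metrics_for_day(table, day: str, index_name: str = "date-index") -> Iterator[dict]:
    """Yield every item stored for `day`, following pagination.

    ASSUMPTION: `index_name` is a hypothetical GSI keyed on a `date` attribute;
    neither exists in the table definition used in this article.
    """
    kwargs = {
        "IndexName": index_name,
        "KeyConditionExpression": "#d = :day",
        # 'date' is a DynamoDB reserved word, so it is aliased as #d
        "ExpressionAttributeNames": {"#d": "date"},
        "ExpressionAttributeValues": {":day": day},
    }
    while True:
        page = table.query(**kwargs)
        yield from page["Items"]
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            return
        # Resume where the previous page stopped
        kwargs["ExclusiveStartKey"] = last_key
```

Unlike a Scan with a filter, a Query reads only the items under the requested partition key, so the cost scales with one day of data rather than with the whole table.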
# serverless.yml
functions:
  nightlyMaintenance:
    handler: nightly_maintenance.handler
    timeout: 300 # 5 minutes, since cleanup may take time
    environment:
      SNAPSHOT_RETENTION_DAYS: '30'
      METRICS_TABLE: !Ref MetricsTable
      REPORT_BUCKET: !Ref ReportBucket
      ALERT_TOPIC_ARN: !Ref OpsAlertTopic
    events:
      - schedule:
          rate: cron(0 2 * * ? *) # 2:00 AM UTC every day
          enabled: true
    destinations:
      onFailure: # alert if the maintenance bot itself fails
        type: sns # explicit type is needed when the ARN is a CloudFormation reference
        arn: !Ref OpsAlertTopic
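EventBridge cron expressions differ from Unix cron: they have six fields (minutes, hours, day-of-month, month, day-of-week, year), are evaluated in UTC, and expect a `?` in one of the two day fields. The sketch below splits an expression into named fields to make that layout explicit. `parse_eventbridge_cron` is a hypothetical helper written for this example, not an AWS API, and it checks only the field layout, not the values.

```python
import re

# Field order used by EventBridge cron expressions (six fields, evaluated in UTC).
CRON_FIELDS = ("minutes", "hours", "day_of_month", "month", "day_of_week", "year")


def parse_eventbridge_cron(expression: str) -> dict:
    """Split 'cron(...)' into a dict of named fields, checking only the layout."""
    match = re.fullmatch(r"cron\((.*)\)", expression.strip())
    if match is None:
        raise ValueError(f"not a cron(...) expression: {expression!r}")
    parts = match.group(1).split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    fields = dict(zip(CRON_FIELDS, parts))
    # If one day field holds a value or '*', the other must be '?'
    if "?" not in (fields["day_of_month"], fields["day_of_week"]):
        raise ValueError("one of day_of_month / day_of_week must be '?'")
    return fields


print(parse_eventbridge_cron("cron(0 2 * * ? *)"))
```

Read this way, `cron(0 2 * * ? *)` means minute 0 of hour 2, every day of every month, in any year. A five-field Unix expression such as `0 2 * * *` is rejected by the field-count check.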
Module 3: CloudWatch Alarm Auto-Remediation
Instead of waking up an engineer at 3 AM for a high-CPU alert, trigger a Lambda function to remediate automatically.
# auto_remediation.py
import boto3
import json
import logging
import os

logger = logging.getLogger()
logger.setLevel(logging.INFO)

ec2 = boto3.client('ec2')
autoscaling = boto3.client('autoscaling')
sns = boto3.client('sns')

ALERT_TOPIC_ARN = os.environ['ALERT_TOPIC_ARN']
ASG_NAME = os.environ.get('AUTO_SCALING_GROUP_NAME')


def handler(event, context):
    """
    Triggered by CloudWatch Alarm via SNS → Lambda.
    Parses alarm state and takes automated remediation action.
    """
    # CloudWatch alarms arrive via SNS, so parse the message
    for record in event['Records']:
        message = json.loads(record['Sns']['Message'])
        alarm_name = message['AlarmName']
        alarm_state = message['NewStateValue']
        old_state = message['OldStateValue']
        reason = message['NewStateReason']
        logger.info(f'Alarm: {alarm_name} | {old_state} → {alarm_state}')
        logger.info(f'Reason: {reason}')

        if alarm_state != 'ALARM':
            logger.info('Alarm resolved or insufficient data. No action needed.')
            continue

        # Route to appropriate remediation based on alarm name
        if 'high-cpu' in alarm_name.lower():
            result = remediate_high_cpu(alarm_name)
        elif 'disk-space' in alarm_name.lower():
            result = remediate_disk_space(alarm_name)
        else:
            result = {'action': 'no_handler', 'alarm': alarm_name}
            logger.warning(f'No remediation handler for alarm: {alarm_name}')

        # Notify ops team of automated action taken
        sns.publish(
            TopicArn=ALERT_TOPIC_ARN,
            Subject=f'[Auto-Remediation] {alarm_name}',
            Message=json.dumps({
                'alarm': alarm_name,
                'state': alarm_state,
                'reason': reason,
                'automated_action': result
            }, indent=2)
        )
    return {'processed': len(event['Records'])}


def remediate_high_cpu(alarm_name: str) -> dict:
    """Scale out the Auto Scaling Group to handle high CPU load"""
    if not ASG_NAME:
        return {'action': 'skipped', 'reason': 'ASG_NAME not configured'}

    # Get current desired capacity
    asg = autoscaling.describe_auto_scaling_groups(
        AutoScalingGroupNames=[ASG_NAME]
    )['AutoScalingGroups'][0]
    current_desired = asg['DesiredCapacity']
    max_size = asg['MaxSize']
    new_desired = min(current_desired + 2, max_size)  # add 2 instances, respect max

    if new_desired == current_desired:
        logger.warning(f'ASG {ASG_NAME} already at max capacity ({max_size})')
        return {'action': 'at_max_capacity', 'current': current_desired}

    autoscaling.set_desired_capacity(
        AutoScalingGroupName=ASG_NAME,
        DesiredCapacity=new_desired,
        HonorCooldown=False  # bypass cooldown for alarm-triggered scaling
    )
    logger.info(f'Scaled out {ASG_NAME}: {current_desired} → {new_desired}')
    return {
        'action': 'scale_out',
        'asg': ASG_NAME,
        'previous_desired': current_desired,
        'new_desired': new_desired
    }


def remediate_disk_space(alarm_name: str) -> dict:
    """Log disk space alert. Automated cleanup is risky, so notify instead."""
    logger.warning(f'Disk space alarm: {alarm_name}. Manual review recommended.')
    return {
        'action': 'notification_sent',
        'reason': 'disk cleanup requires manual review'
    }
# serverless.yml
functions:
  autoRemediation:
    handler: auto_remediation.handler
    environment:
      ALERT_TOPIC_ARN: !Ref OpsAlertTopic
      AUTO_SCALING_GROUP_NAME: !Ref AppAutoScalingGroup
    events:
      - sns:
          arn: !Ref CloudWatchAlarmTopic
          topicName: cloudwatch-alarms

resources:
  Resources:
    # CloudWatch alarm that triggers remediation
    HighCpuAlarm:
      Type: AWS::CloudWatch::Alarm
      Properties:
        AlarmName: app-server-high-cpu
        MetricName: CPUUtilization
        Namespace: AWS/EC2
        Statistic: Average
        Period: 300 # 5-minute average
        EvaluationPeriods: 2 # must be high for 10 minutes
        Threshold: 80
        ComparisonOperator: GreaterThanThreshold
        AlarmActions:
          - !Ref CloudWatchAlarmTopic # → SNS → Lambda
        Dimensions:
          - Name: AutoScalingGroupName
            Value: !Ref AppAutoScalingGroup
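To exercise the remediation logic without waiting for a real alarm, it can help to build an SNS-wrapped alarm event by hand. The sketch below does that and isolates two pieces of the handler's logic as pure functions. `make_alarm_event`, `extract_alarms`, and `next_desired_capacity` are hypothetical names for this example. The message carries only the four fields the handler reads; real CloudWatch notifications include more.

```python
import json


def make_alarm_event(alarm_name: str, new_state: str, old_state: str = "OK") -> dict:
    """Build a minimal SNS event wrapping a CloudWatch alarm message."""
    message = {
        "AlarmName": alarm_name,
        "NewStateValue": new_state,
        "OldStateValue": old_state,
        "NewStateReason": "Threshold Crossed (sample reason for local testing)",
    }
    # SNS delivers the alarm as a JSON string inside Records[].Sns.Message
    return {"Records": [{"Sns": {"Message": json.dumps(message)}}]}


def extract_alarms(event: dict) -> list:
    """Return (alarm_name, new_state) pairs for every record in the event."""
    alarms = []
    for record in event["Records"]:
        message = json.loads(record["Sns"]["Message"])
        alarms.append((message["AlarmName"], message["NewStateValue"]))
    return alarms


def next_desired_capacity(current: int, max_size: int, step: int = 2) -> int:
    """Same rule as remediate_high_cpu: add `step` instances, capped at max_size."""
    return min(current + step, max_size)


event = make_alarm_event("app-server-high-cpu", "ALARM")
print(extract_alarms(event))
print(next_desired_capacity(current=3, max_size=4))
```

An event built this way can be passed straight to `handler(event, None)` in a test, provided the boto3 clients are stubbed or pointed at a sandbox account.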
Putting It All Together: The Complete IaC Stack
# serverless.yml: complete maintenance bot stack
service: serverless-maintenance-bot

provider:
  name: aws
  runtime: python3.12
  region: us-east-1
  iam:
    role:
      statements:
        - Effect: Allow
          Action:
            - ec2:Describe*
            - ec2:CreateSnapshot
            - ec2:DeleteSnapshot
            - ec2:CreateTags
            - autoscaling:Describe*
            - autoscaling:SetDesiredCapacity
            - dynamodb:Scan
            - dynamodb:Query
            - s3:PutObject
            - sns:Publish
          Resource: '*'

functions:
  autoSnapshot:
    handler: auto_snapshot.handler
    environment:
      ALERT_TOPIC_ARN: !Ref OpsAlertTopic
    events:
      - eventBridge:
          pattern:
            source: [aws.ec2]
            detail-type: [EC2 Instance State-change Notification]
            detail:
              state: [stopped, stopping]

  nightlyMaintenance:
    handler: nightly_maintenance.handler
    timeout: 300
    environment:
      SNAPSHOT_RETENTION_DAYS: '30'
      METRICS_TABLE: !Ref MetricsTable
      REPORT_BUCKET: !Ref ReportBucket
      ALERT_TOPIC_ARN: !Ref OpsAlertTopic
    events:
      - schedule:
          rate: cron(0 2 * * ? *)
    destinations:
      onFailure:
        type: sns
        arn: !Ref OpsAlertTopic

  autoRemediation:
    handler: auto_remediation.handler
    environment:
      ALERT_TOPIC_ARN: !Ref OpsAlertTopic
      # AppAutoScalingGroup is assumed to be defined elsewhere in this stack
      AUTO_SCALING_GROUP_NAME: !Ref AppAutoScalingGroup
    events:
      - sns:
          arn: !Ref CloudWatchAlarmTopic
          topicName: cloudwatch-alarms

resources:
  Resources:
    OpsAlertTopic:
      Type: AWS::SNS::Topic
      Properties:
        TopicName: ops-alerts
        Subscription:
          - Protocol: email
            Endpoint: ${env:OPS_EMAIL}

    CloudWatchAlarmTopic:
      Type: AWS::SNS::Topic
      Properties:
        TopicName: cloudwatch-alarms

    MetricsTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: app-metrics
        BillingMode: PAY_PER_REQUEST
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH

    ReportBucket:
      Type: AWS::S3::Bucket
      Properties:
        BucketName: ${self:service}-reports-${aws:accountId}
        LifecycleConfiguration:
          Rules:
            - Id: archive-old-reports
              Status: Enabled
              Transitions:
                - TransitionInDays: 90
                  StorageClass: GLACIER
Operational Best Practices
1. Always Set a Failure Destination
Your maintenance bot failing silently is worse than it not existing. Every async Lambda function should have a failure destination.
destinations:
  onFailure: # you WILL know when the bot breaks
    type: sns
    arn: !Ref OpsAlertTopic
2. Make Every Handler Idempotent
EventBridge and SNS guarantee at-least-once delivery. Your function may be called twice for the same event. Design for it.
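# Idempotent snapshot creation: check before creating
from datetime import datetime, timedelta, timezone


def create_snapshot_if_not_exists(volume_id: str, instance_id: str) -> str:
    existing = ec2.describe_snapshots(
        Filters=[
            {'Name': 'volume-id', 'Values': [volume_id]},
            {'Name': 'tag:InstanceId', 'Values': [instance_id]},
            {'Name': 'tag:AutoCreated', 'Values': ['true']},
        ],
        OwnerIds=['self']
    )
    # Only check snapshots from the last hour (StartTime is timezone-aware)
    cutoff = datetime.now(timezone.utc) - timedelta(hours=1)
    recent = [s for s in existing['Snapshots'] if s['StartTime'] >= cutoff]
    if recent:
        snap_id = recent[0]['SnapshotId']
        logger.info(f'Snapshot already exists: {snap_id}. Skipping duplicate.')
        return snap_id

    # Create new snapshot, tagged so that the check above can find it next time
    snapshot = ec2.create_snapshot(
        VolumeId=volume_id,
        TagSpecifications=[{
            'ResourceType': 'snapshot',
            'Tags': [
                {'Key': 'InstanceId', 'Value': instance_id},
                {'Key': 'AutoCreated', 'Value': 'true'},
            ]
        }]
    )
    return snapshot['SnapshotId']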
3. Use Dead Letter Queues for Critical Automation
For automation that must not be silently skipped, add an SQS DLQ:
functions:
  autoSnapshot:
    handler: auto_snapshot.handler
    deadLetter:
      targetArn: !GetAtt AutoSnapshotDLQ.Arn
4. Tag Everything Your Bot Creates
Every resource created by automation should be tagged. Consistent tags make auditing, cost allocation, and cleanup trivial.
Tags=[
    {'Key': 'AutoCreated', 'Value': 'true'},
    {'Key': 'CreatedBy', 'Value': 'serverless-maintenance-bot'},
    {'Key': 'LambdaFunction', 'Value': context.function_name},
    {'Key': 'CreatedAt', 'Value': datetime.utcnow().isoformat()}
]
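One way to keep these tags consistent across all three modules is to build them in a single helper. The sketch below is an assumption about how that could be organized: `build_bot_tags` is a hypothetical function, and it uses a timezone-aware timestamp because `datetime.utcnow()` is deprecated as of Python 3.12.

```python
from datetime import datetime, timezone
from typing import Optional


def build_bot_tags(function_name: str, now: Optional[datetime] = None) -> list:
    """Return the standard tag set for resources created by the bot.

    `now` can be injected for testing; it defaults to the current UTC time.
    """
    created_at = (now or datetime.now(timezone.utc)).isoformat()
    return [
        {"Key": "AutoCreated", "Value": "true"},
        {"Key": "CreatedBy", "Value": "serverless-maintenance-bot"},
        {"Key": "LambdaFunction", "Value": function_name},
        {"Key": "CreatedAt", "Value": created_at},
    ]
```

Inside a handler this would be called as `build_bot_tags(context.function_name)` and the result passed as the `Tags` list of a `TagSpecifications` entry.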
Summary
| Module | Trigger | Action |
|---|---|---|
| Auto-Snapshot | EventBridge (EC2 state change) | Create EBS snapshots on unexpected stop |
| Nightly Maintenance | EventBridge (cron 2 AM) | Cleanup, archive, daily report |
| Auto-Remediation | CloudWatch Alarm → SNS | Scale out ASG on high CPU |
The serverless maintenance bot replaces what used to require a dedicated ops server, a monitoring daemon, and an on-call engineer for routine events. The entire stack deploys in minutes, typically costs cents per month in execution time at this invocation volume, and removes the single point of failure that a lone cron host represents.
The best ops automation is the kind that runs itself.
Next in this series: **Part 6 — Serverless Workflows: Orchestrating Multi-Step Pipelines with AWS Step Functions**