DEV Community

James Lee
James Lee

Posted on

Event-Driven Automation: Building a Serverless Maintenance Bot with Lambda & EventBridge

Traditional ops automation means cron jobs on a dedicated server, custom monitoring daemons, or a full Ansible/Chef setup just to restart a service at midnight.

Serverless flips this model entirely. With Lambda + EventBridge, you can build a fully automated maintenance bot that:

  • Reacts to infrastructure events in real time (EC2 failure → auto-snapshot)
  • Runs scheduled maintenance at 2 AM without a single server
  • Remediates CloudWatch alarms automatically
  • Costs essentially nothing when idle

This article walks through building exactly that — a production-grade serverless ops automation system on AWS.


Why Serverless Is a Natural Fit for Ops Automation

Ops automation tasks share a common pattern:

Something happens (event)
       │
       ▼
Run a script (function)
       │
       ▼
Done — wait for next event
Enter fullscreen mode Exit fullscreen mode

This is precisely what Lambda + EventBridge is designed for. The function runs only when triggered, scales to handle multiple simultaneous events, and costs nothing between invocations.

Compare this to the traditional approach:

Approach Infrastructure Cost Reliability
Cron on EC2 Dedicated server Always running Single point of failure
Ansible Tower Full platform Expensive Complex to maintain
Lambda + EventBridge Zero servers Pay per execution Managed, highly available

Architecture Overview

The maintenance bot consists of three independent automation modules, each triggered by a different event source:

┌─────────────────────────────────────────────────────┐
│              Serverless Maintenance Bot              │
├─────────────────────────────────────────────────────┤
│                                                     │
│  EC2 State Change ──► auto-snapshot-lambda          │
│  (EventBridge)         (backup EBS on failure)      │
│                                                     │
│  EventBridge Schedule ──► nightly-maintenance-lambda│
│  (cron: 2 AM daily)       (cleanup, reset, report)  │
│                                                     │
│  CloudWatch Alarm ──► auto-remediation-lambda       │
│  (CPU > 80%)             (scale out / restart)      │
│                                                     │
└─────────────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

Module 1: Auto-Snapshot on EC2 Failure

When an EC2 instance fails or reboots unexpectedly, you want an automatic EBS snapshot created immediately — before any manual intervention.

Event source: EventBridge automatically captures EC2 state change notifications (instance stopping, stopping, rebooting) as native events. No custom monitoring agent needed.

The Event Structure

When EC2 emits a state change event, EventBridge delivers this payload to your Lambda:

{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "source": "aws.ec2",
  "detail-type": "EC2 Instance State-change Notification",
  "region": "us-east-1",
  "detail": {
    "instance-id": "i-0123456789abcdef0",
    "state": "stopped"
  }
}
Enter fullscreen mode Exit fullscreen mode

The Lambda Handler

# auto_snapshot.py
import boto3
import json
import logging
from datetime import datetime

logger = logging.getLogger()
logger.setLevel(logging.INFO)

ec2 = boto3.client('ec2')
sns = boto3.client('sns')

ALERT_TOPIC_ARN = os.environ['ALERT_TOPIC_ARN']
SNAPSHOT_TAG_KEY = 'AutoSnapshot'
SNAPSHOT_TAG_VALUE = 'true'


def handler(event, context):
    """
    Triggered by EventBridge on EC2 state change.
    Creates EBS snapshots for all volumes attached to the affected instance.
    """
    detail = event['detail']
    instance_id = detail['instance-id']
    state = detail['state']
    region = event['region']

    logger.info(f'EC2 state change: {instance_id}{state}')

    # Only act on unexpected stops/reboots (not intentional shutdowns)
    trigger_states = {'stopped', 'stopping'}
    if state not in trigger_states:
        logger.info(f'State {state} does not require snapshot. Skipping.')
        return {'action': 'skipped', 'reason': f'state={state}'}

    # Check if this instance is tagged for auto-snapshot
    instance = ec2.describe_instances(InstanceIds=[instance_id])
    tags = instance['Reservations'][0]['Instances'][0].get('Tags', [])
    tag_map = {t['Key']: t['Value'] for t in tags}

    if tag_map.get(SNAPSHOT_TAG_KEY) != SNAPSHOT_TAG_VALUE:
        logger.info(f'Instance {instance_id} not tagged for auto-snapshot. Skipping.')
        return {'action': 'skipped', 'reason': 'not tagged'}

    instance_name = tag_map.get('Name', instance_id)

    # Get all EBS volumes attached to this instance
    volumes_response = ec2.describe_volumes(
        Filters=[{
            'Name': 'attachment.instance-id',
            'Values': [instance_id]
        }]
    )

    volumes = volumes_response['Volumes']
    snapshot_ids = []
    timestamp = datetime.utcnow().strftime('%Y%m%d-%H%M%S')

    for volume in volumes:
        volume_id = volume['VolumeId']
        device = volume['Attachments'][0]['Device']

        logger.info(f'Creating snapshot for volume {volume_id} ({device})')

        snapshot = ec2.create_snapshot(
            VolumeId=volume_id,
            Description=f'Auto-snapshot: {instance_name} ({instance_id}) state={state}',
            TagSpecifications=[{
                'ResourceType': 'snapshot',
                'Tags': [
                    {'Key': 'Name', 'Value': f'auto-{instance_name}-{device}-{timestamp}'},
                    {'Key': 'InstanceId', 'Value': instance_id},
                    {'Key': 'TriggerState', 'Value': state},
                    {'Key': 'AutoCreated', 'Value': 'true'},
                    {'Key': 'CreatedAt', 'Value': timestamp}
                ]
            }]
        )

        snapshot_ids.append(snapshot['SnapshotId'])
        logger.info(f'Snapshot created: {snapshot["SnapshotId"]} for volume {volume_id}')

    # Send alert notification
    sns.publish(
        TopicArn=ALERT_TOPIC_ARN,
        Subject=f'[Auto-Snapshot] {instance_name} ({state})',
        Message=json.dumps({
            'instance_id': instance_id,
            'instance_name': instance_name,
            'state': state,
            'snapshots_created': snapshot_ids,
            'timestamp': timestamp
        }, indent=2)
    )

    logger.info(f'Auto-snapshot complete: {len(snapshot_ids)} snapshots created')
    return {
        'instance_id': instance_id,
        'state': state,
        'snapshots_created': snapshot_ids
    }
Enter fullscreen mode Exit fullscreen mode

EventBridge Rule Configuration

# serverless.yml
functions:
  autoSnapshot:
    handler: auto_snapshot.handler
    environment:
      ALERT_TOPIC_ARN: !Ref OpsAlertTopic
    events:
      - eventBridge:
          pattern:
            source:
              - aws.ec2
            detail-type:
              - EC2 Instance State-change Notification
            detail:
              state:
                - stopped
                - stopping
    iamRoleStatements:
      - Effect: Allow
        Action:
          - ec2:DescribeInstances
          - ec2:DescribeVolumes
          - ec2:CreateSnapshot
          - ec2:CreateTags
          - sns:Publish
        Resource: '*'
Enter fullscreen mode Exit fullscreen mode

Production tip: Tag your instances with AutoSnapshot: true to opt in. This prevents the function from creating snapshots for every EC2 stop event in your account (including intentional deployments).


Module 2: Nightly Maintenance Bot

Scheduled maintenance tasks — cleanup, reporting, data archival — are a perfect fit for Lambda + EventBridge cron rules. No dedicated server, no cron daemon, no SSH access needed.

# nightly_maintenance.py
import boto3
import json
import logging
import os
from datetime import datetime, timedelta

logger = logging.getLogger()
logger.setLevel(logging.INFO)

ec2 = boto3.client('ec2')
dynamodb = boto3.resource('dynamodb')
s3 = boto3.client('s3')
sns = boto3.client('sns')

RETENTION_DAYS = int(os.environ.get('SNAPSHOT_RETENTION_DAYS', '30'))
METRICS_TABLE = os.environ['METRICS_TABLE']
REPORT_BUCKET = os.environ['REPORT_BUCKET']
ALERT_TOPIC_ARN = os.environ['ALERT_TOPIC_ARN']


def handler(event, context):
    """
    Runs nightly at 2 AM UTC via EventBridge scheduled rule.
    Performs: snapshot cleanup, metrics archival, daily report generation.
    """
    logger.info('Nightly maintenance bot started')
    results = {}

    # Task 1: Clean up old auto-created snapshots
    results['snapshot_cleanup'] = cleanup_old_snapshots()

    # Task 2: Archive daily metrics to S3
    results['metrics_archival'] = archive_daily_metrics()

    # Task 3: Generate and send daily ops report
    results['daily_report'] = send_daily_report(results)

    logger.info(f'Nightly maintenance complete: {json.dumps(results)}')
    return results


def cleanup_old_snapshots() -> dict:
    """Delete auto-created snapshots older than RETENTION_DAYS"""
    cutoff_date = datetime.utcnow() - timedelta(days=RETENTION_DAYS)

    # Find all auto-created snapshots
    response = ec2.describe_snapshots(
        OwnerIds=['self'],
        Filters=[{'Name': 'tag:AutoCreated', 'Values': ['true']}]
    )

    deleted = []
    kept = []

    for snapshot in response['Snapshots']:
        start_time = snapshot['StartTime'].replace(tzinfo=None)

        if start_time < cutoff_date:
            ec2.delete_snapshot(SnapshotId=snapshot['SnapshotId'])
            deleted.append(snapshot['SnapshotId'])
            logger.info(f'Deleted old snapshot: {snapshot["SnapshotId"]} '
                       f'(created: {start_time.date()})')
        else:
            kept.append(snapshot['SnapshotId'])

    logger.info(f'Snapshot cleanup: deleted={len(deleted)}, kept={len(kept)}')
    return {'deleted': len(deleted), 'kept': len(kept)}


def archive_daily_metrics() -> dict:
    """Archive yesterday's metrics from DynamoDB to S3"""
    table = dynamodb.Table(METRICS_TABLE)
    yesterday = (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d')

    # Scan yesterday's records (use Query with GSI in production)
    response = table.scan(
        FilterExpression='begins_with(#ts, :date)',
        ExpressionAttributeNames={'#ts': 'timestamp'},
        ExpressionAttributeValues={':date': yesterday}
    )

    records = response['Items']

    if records:
        # Write to S3 as JSON Lines
        s3_key = f'metrics-archive/{yesterday}/metrics.jsonl'
        body = '\n'.join(json.dumps(r) for r in records)

        s3.put_object(
            Bucket=REPORT_BUCKET,
            Key=s3_key,
            Body=body.encode('utf-8'),
            ContentType='application/x-ndjson'
        )

        logger.info(f'Archived {len(records)} records to s3://{REPORT_BUCKET}/{s3_key}')

    return {'records_archived': len(records), 'date': yesterday}


def send_daily_report(results: dict) -> dict:
    """Send daily ops summary via SNS"""
    today = datetime.utcnow().strftime('%Y-%m-%d')

    report = {
        'date': today,
        'maintenance_results': results,
        'generated_at': datetime.utcnow().isoformat()
    }

    sns.publish(
        TopicArn=ALERT_TOPIC_ARN,
        Subject=f'[Daily Ops Report] {today}',
        Message=json.dumps(report, indent=2, default=str)
    )

    return {'report_sent': True, 'date': today}
Enter fullscreen mode Exit fullscreen mode
# serverless.yml
functions:
  nightlyMaintenance:
    handler: nightly_maintenance.handler
    timeout: 300          # 5 minutes — cleanup may take time
    environment:
      SNAPSHOT_RETENTION_DAYS: '30'
      METRICS_TABLE: !Ref MetricsTable
      REPORT_BUCKET: !Ref ReportBucket
      ALERT_TOPIC_ARN: !Ref OpsAlertTopic
    events:
      - schedule:
          rate: cron(0 2 * * ? *)    # 2:00 AM UTC every day
          enabled: true
    destinations:
      onFailure: !Ref OpsAlertTopic  # alert if the maintenance bot itself fails
Enter fullscreen mode Exit fullscreen mode

Module 3: CloudWatch Alarm Auto-Remediation

Instead of waking up an engineer at 3 AM for a high-CPU alert, trigger a Lambda function to remediate automatically.

# auto_remediation.py
import boto3
import json
import logging
import os

logger = logging.getLogger()
logger.setLevel(logging.INFO)

ec2 = boto3.client('ec2')
autoscaling = boto3.client('autoscaling')
sns = boto3.client('sns')

ALERT_TOPIC_ARN = os.environ['ALERT_TOPIC_ARN']
ASG_NAME = os.environ.get('AUTO_SCALING_GROUP_NAME')


def handler(event, context):
    """
    Triggered by CloudWatch Alarm via SNS → Lambda.
    Parses alarm state and takes automated remediation action.
    """
    # CloudWatch alarms arrive via SNS — parse the message
    for record in event['Records']:
        message = json.loads(record['Sns']['Message'])

        alarm_name = message['AlarmName']
        alarm_state = message['NewStateValue']
        old_state = message['OldStateValue']
        reason = message['NewStateReason']

        logger.info(f'Alarm: {alarm_name} | {old_state}{alarm_state}')
        logger.info(f'Reason: {reason}')

        if alarm_state != 'ALARM':
            logger.info('Alarm resolved or insufficient data. No action needed.')
            continue

        # Route to appropriate remediation based on alarm name
        if 'high-cpu' in alarm_name.lower():
            result = remediate_high_cpu(alarm_name)
        elif 'disk-space' in alarm_name.lower():
            result = remediate_disk_space(alarm_name)
        else:
            result = {'action': 'no_handler', 'alarm': alarm_name}
            logger.warning(f'No remediation handler for alarm: {alarm_name}')

        # Notify ops team of automated action taken
        sns.publish(
            TopicArn=ALERT_TOPIC_ARN,
            Subject=f'[Auto-Remediation] {alarm_name}',
            Message=json.dumps({
                'alarm': alarm_name,
                'state': alarm_state,
                'reason': reason,
                'automated_action': result
            }, indent=2)
        )

    return {'processed': len(event['Records'])}


def remediate_high_cpu(alarm_name: str) -> dict:
    """Scale out the Auto Scaling Group to handle high CPU load"""
    if not ASG_NAME:
        return {'action': 'skipped', 'reason': 'ASG_NAME not configured'}

    # Get current desired capacity
    asg = autoscaling.describe_auto_scaling_groups(
        AutoScalingGroupNames=[ASG_NAME]
    )['AutoScalingGroups'][0]

    current_desired = asg['DesiredCapacity']
    max_size = asg['MaxSize']
    new_desired = min(current_desired + 2, max_size)  # add 2 instances, respect max

    if new_desired == current_desired:
        logger.warning(f'ASG {ASG_NAME} already at max capacity ({max_size})')
        return {'action': 'at_max_capacity', 'current': current_desired}

    autoscaling.set_desired_capacity(
        AutoScalingGroupName=ASG_NAME,
        DesiredCapacity=new_desired,
        HonorCooldown=False  # bypass cooldown for alarm-triggered scaling
    )

    logger.info(f'Scaled out {ASG_NAME}: {current_desired}{new_desired}')
    return {
        'action': 'scale_out',
        'asg': ASG_NAME,
        'previous_desired': current_desired,
        'new_desired': new_desired
    }


def remediate_disk_space(alarm_name: str) -> dict:
    """Log disk space alert — automated cleanup is risky, notify instead"""
    logger.warning(f'Disk space alarm: {alarm_name}. Manual review recommended.')
    return {
        'action': 'notification_sent',
        'reason': 'disk cleanup requires manual review'
    }
Enter fullscreen mode Exit fullscreen mode
# serverless.yml
functions:
  autoRemediation:
    handler: auto_remediation.handler
    environment:
      ALERT_TOPIC_ARN: !Ref OpsAlertTopic
      AUTO_SCALING_GROUP_NAME: !Ref AppAutoScalingGroup
    events:
      - sns:
          arn: !Ref CloudWatchAlarmTopic
          topicName: cloudwatch-alarms

resources:
  Resources:
    # CloudWatch alarm that triggers remediation
    HighCpuAlarm:
      Type: AWS::CloudWatch::Alarm
      Properties:
        AlarmName: app-server-high-cpu
        MetricName: CPUUtilization
        Namespace: AWS/EC2
        Statistic: Average
        Period: 300           # 5-minute average
        EvaluationPeriods: 2  # must be high for 10 minutes
        Threshold: 80
        ComparisonOperator: GreaterThanThreshold
        AlarmActions:
          - !Ref CloudWatchAlarmTopic   # → SNS → Lambda
        Dimensions:
          - Name: AutoScalingGroupName
            Value: !Ref AppAutoScalingGroup
Enter fullscreen mode Exit fullscreen mode

Putting It All Together: The Complete IaC Stack

# serverless.yml — complete maintenance bot stack
service: serverless-maintenance-bot

provider:
  name: aws
  runtime: python3.12
  region: us-east-1
  iam:
    role:
      statements:
        - Effect: Allow
          Action:
            - ec2:Describe*
            - ec2:CreateSnapshot
            - ec2:DeleteSnapshot
            - ec2:CreateTags
            - autoscaling:Describe*
            - autoscaling:SetDesiredCapacity
            - dynamodb:Scan
            - dynamodb:Query
            - s3:PutObject
            - sns:Publish
          Resource: '*'

functions:
  autoSnapshot:
    handler: auto_snapshot.handler
    environment:
      ALERT_TOPIC_ARN: !Ref OpsAlertTopic
    events:
      - eventBridge:
          pattern:
            source: [aws.ec2]
            detail-type: [EC2 Instance State-change Notification]
            detail:
              state: [stopped, stopping]

  nightlyMaintenance:
    handler: nightly_maintenance.handler
    timeout: 300
    environment:
      SNAPSHOT_RETENTION_DAYS: '30'
      METRICS_TABLE: !Ref MetricsTable
      REPORT_BUCKET: !Ref ReportBucket
      ALERT_TOPIC_ARN: !Ref OpsAlertTopic
    events:
      - schedule:
          rate: cron(0 2 * * ? *)
    destinations:
      onFailure: !Ref OpsAlertTopic

  autoRemediation:
    handler: auto_remediation.handler
    environment:
      ALERT_TOPIC_ARN: !Ref OpsAlertTopic
      AUTO_SCALING_GROUP_NAME: !Ref AppAutoScalingGroup
    events:
      - sns:
          arn: !Ref CloudWatchAlarmTopic
          topicName: cloudwatch-alarms

resources:
  Resources:
    OpsAlertTopic:
      Type: AWS::SNS::Topic
      Properties:
        TopicName: ops-alerts
        Subscription:
          - Protocol: email
            Endpoint: ${env:OPS_EMAIL}

    CloudWatchAlarmTopic:
      Type: AWS::SNS::Topic
      Properties:
        TopicName: cloudwatch-alarms

    MetricsTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: app-metrics
        BillingMode: PAY_PER_REQUEST
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH

    ReportBucket:
      Type: AWS::S3::Bucket
      Properties:
        BucketName: ${self:service}-reports-${aws:accountId}
        LifecycleConfiguration:
          Rules:
            - Id: archive-old-reports
              Status: Enabled
              Transitions:
                - TransitionInDays: 90
                  StorageClass: GLACIER
Enter fullscreen mode Exit fullscreen mode

Operational Best Practices

1. Always Set a Failure Destination

Your maintenance bot failing silently is worse than it not existing. Every async Lambda function should have a failure destination.

destinations:
  onFailure: !Ref OpsAlertTopic  # you WILL know when the bot breaks
Enter fullscreen mode Exit fullscreen mode

2. Make Every Handler Idempotent

EventBridge and SNS guarantee at-least-once delivery. Your function may be called twice for the same event. Design for it.

# Idempotent snapshot creation — check before creating
def create_snapshot_if_not_exists(volume_id: str, instance_id: str) -> str:
    existing = ec2.describe_snapshots(
        Filters=[
            {'Name': 'volume-id', 'Values': [volume_id]},
            {'Name': 'tag:InstanceId', 'Values': [instance_id]},
            {'Name': 'tag:AutoCreated', 'Values': ['true']},
            # Only check snapshots from the last hour
        ],
        OwnerIds=['self']
    )

    if existing['Snapshots']:
        snap_id = existing['Snapshots'][0]['SnapshotId']
        logger.info(f'Snapshot already exists: {snap_id}. Skipping duplicate.')
        return snap_id

    # Create new snapshot
    snapshot = ec2.create_snapshot(VolumeId=volume_id, ...)
    return snapshot['SnapshotId']
Enter fullscreen mode Exit fullscreen mode

3. Use Dead Letter Queues for Critical Automation

For automation that must not be silently skipped, add an SQS DLQ:

functions:
  autoSnapshot:
    handler: auto_snapshot.handler
    deadLetter:
      targetArn: !GetAtt AutoSnapshotDLQ.Arn
Enter fullscreen mode Exit fullscreen mode

4. Tag Everything Your Bot Creates

Every resource created by automation should be tagged — makes auditing, cost allocation, and cleanup trivial.

Tags=[
    {'Key': 'AutoCreated', 'Value': 'true'},
    {'Key': 'CreatedBy', 'Value': 'serverless-maintenance-bot'},
    {'Key': 'LambdaFunction', 'Value': context.function_name},
    {'Key': 'CreatedAt', 'Value': datetime.utcnow().isoformat()}
]
Enter fullscreen mode Exit fullscreen mode

Summary

Module Trigger Action
Auto-Snapshot EventBridge (EC2 state change) Create EBS snapshots on unexpected stop
Nightly Maintenance EventBridge (cron 2 AM) Cleanup, archive, daily report
Auto-Remediation CloudWatch Alarm → SNS Scale out ASG on high CPU

The serverless maintenance bot replaces what used to require a dedicated ops server, a monitoring daemon, and an on-call engineer for routine events. The entire stack deploys in minutes, costs cents per month in execution time, and handles failures more reliably than any cron job.

The best ops automation is the kind that runs itself.


Next in this series: **Part 6 — Serverless Workflows: Orchestrating Multi-Step Pipelines with AWS Step Functions**

Top comments (0)