In this post, I'll automate the initiation of EC2 malware scans by GuardDuty, using a simple AWS SAM template.
Prerequisites
The EBS volumes of all EC2 instances to be scanned must be encrypted with an AWS KMS customer managed key (CMK)
In case you need to modify the KMS encryption key of your existing EBS volume, check out the following resource for more insights
You need the necessary IAM permissions to deploy an AWS SAM application
Walkthrough
We will create a new AWS SAM template file and add the following YAML to define the Lambda function responsible for triggering the scans:
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  python3.13
Resources:
  EC2MalwareScan:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: ec2-malware-scan-weekly
      Description: Initiates GuardDuty on-demand malware scans for running EC2 instances
      PackageType: Zip
      Runtime: python3.13
      Handler: ec2_malware_scan_guardduty.ec2_malware_scan
      CodeUri: lambdas/infrastructure/ec2_malware_scan/
      Timeout: 60
      MemorySize: 256
      Tracing: Active
      LoggingConfig:
        LogFormat: JSON
      # CodeSigningConfigArn: !Ref CodeSigningConfig # Optional if you're using code signing already
      Architectures:
        - x86_64
      Events:
        WeeklySchedule:
          Type: ScheduleV2
          Properties:
            ScheduleExpression: 'cron(0 6 ? * MON *)'
            Name: WeeklyEC2MalwareScan
            Description: Weekly EC2 malware scan every Monday at 6 AM UTC
            State: ENABLED
            RetryPolicy:
              MaximumEventAgeInSeconds: 3600
              MaximumRetryAttempts: 2
      Policies:
        - Statement:
            - Sid: Ec2Describe
              Effect: Allow
              Action: ec2:DescribeInstances
              Resource: "*"
            - Sid: GuardDutyScanOnly
              Effect: Allow
              Action:
                - guardduty:ListDetectors
                - guardduty:GetDetector
                - guardduty:StartMalwareScan
              Resource: "*"
            - Sid: IAMPermissions
              Effect: Allow
              Action:
                - iam:GetRole
                - iam:PassRole
              Resource: "arn:aws:iam::*:role/aws-service-role/malware-protection.guardduty.amazonaws.com/AWSServiceRoleForAmazonGuardDutyMalwareProtection"
            - Sid: StsCaller
              Effect: Allow
              Action: sts:GetCallerIdentity
              Resource: "*"
      Environment:
        Variables:
          EXCLUDED_INSTANCES: ""
The above template defines our Lambda function, which runs on Python 3.13. It is scheduled to run weekly, on Mondays at 6 AM UTC. The retry policy on the EventBridge schedule retries a failed invocation up to two times, as long as the event is less than one hour old.
You can adjust the schedule to your needs under the WeeklySchedule event.
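If you change the schedule, it helps to sanity-check when the next run will actually happen. Below is a minimal sketch that computes the next weekly run in UTC for a schedule like cron(0 6 ? * MON *). The helper name next_weekly_run is my own and not part of any AWS SDK, and it only models the simple "one weekday, one hour" case rather than the full cron syntax.

```python
from datetime import datetime, timedelta, timezone


def next_weekly_run(now: datetime, weekday: int = 0, hour: int = 6) -> datetime:
    """Return the next `weekday` at `hour`:00 UTC strictly after `now`.

    Hypothetical helper for illustration only.
    `weekday` follows datetime.weekday(): Monday is 0, Sunday is 6.
    `now` must be timezone-aware.
    """
    now = now.astimezone(timezone.utc)
    # Start from today's run time, then move forward to the target weekday.
    candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    candidate += timedelta(days=(weekday - now.weekday()) % 7)
    # If that moment is now or already past, the next run is a week later.
    if candidate <= now:
        candidate += timedelta(days=7)
    return candidate


if __name__ == "__main__":
    print(next_weekly_run(datetime.now(timezone.utc)))
```

The defaults mirror the Monday 6 AM UTC schedule from the template; pass a different weekday or hour to preview another schedule before you edit the ScheduleExpression.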
The IAM permissions attached to the Lambda function include the following:
- EC2 Describe – Allows the function to list running EC2 instances (ec2:DescribeInstances).
- GuardDuty Malware Scan – Grants access to list detectors, get detector details, and start on-demand malware scans (guardduty:ListDetectors, guardduty:GetDetector, guardduty:StartMalwareScan).
- IAM Role Access – Permits the function to read and pass the GuardDuty service-linked role required for malware protection (iam:GetRole, iam:PassRole).
- STS Identity Check – Enables the function to retrieve its own AWS account ID, which it uses to build the instance ARNs (sts:GetCallerIdentity).
Next, we create a Python file containing the code that launches the scans, using boto3 to interact with GuardDuty.
The file must be created under lambdas/infrastructure/ec2_malware_scan/, which is the CodeUri we defined in the SAM template. It must be named ec2_malware_scan_guardduty.py so that it matches the Handler value ec2_malware_scan_guardduty.ec2_malware_scan (module name, then function name).
Below is the code needed in your Python script:
import boto3
import json
import logging
import os
from typing import List, Dict, Any

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize AWS clients
ec2 = boto3.client('ec2')
guardduty = boto3.client('guardduty')
sts = boto3.client('sts')


def _get_detector_id() -> str:
    resp = guardduty.list_detectors()
    ids = resp.get('DetectorIds', [])
    if not ids:
        raise RuntimeError("No GuardDuty detectors found in this region")
    return ids[0]


def _malware_protection_enabled() -> bool:
    detector_id = _get_detector_id()
    det = guardduty.get_detector(DetectorId=detector_id)
    ebs = (
        det.get('DataSources', {})
        .get('MalwareProtection', {})
        .get('ScanEc2InstanceWithFindings', {})
        .get('EbsVolumes', {})
    )
    enabled = ebs.get('Status') == 'ENABLED'
    if enabled:
        logger.info("GuardDuty Malware Protection (EBS) is ENABLED")
    else:
        logger.warning("GuardDuty Malware Protection (EBS) is DISABLED")
    return enabled


def _running_instances() -> List[Dict[str, Any]]:
    account_id = sts.get_caller_identity()["Account"]
    region = ec2.meta.region_name
    paginator = ec2.get_paginator('describe_instances')
    pages = paginator.paginate(
        Filters=[
            {'Name': 'instance-state-name', 'Values': ['running']},
            {'Name': 'tag:Name', 'Values': ['*']}
        ]
    )
    instances: List[Dict[str, Any]] = []
    for page in pages:
        for r in page.get('Reservations', []):
            for inst in r.get('Instances', []):
                iid = inst['InstanceId']
                instances.append({
                    'InstanceId': iid,
                    'Arn': f"arn:aws:ec2:{region}:{account_id}:instance/{iid}",
                })
    logger.info(f"Found {len(instances)} running EC2 instances")
    return instances


def _start_scan(instance_arn: str) -> Dict[str, Any]:
    try:
        resp = guardduty.start_malware_scan(ResourceArn=instance_arn)
        return {'instance_arn': instance_arn, 'scan_id': resp['ScanId'], 'status': 'started'}
    except Exception as e:
        logger.error(f"Failed to start scan for {instance_arn}: {e}")
        return {'instance_arn': instance_arn, 'status': 'failed', 'error': str(e)}


def ec2_malware_scan(event, context):
    logger.info("Weekly GuardDuty EC2 malware scan kickoff")
    if not _malware_protection_enabled():
        return {
            'statusCode': 400,
            'body': json.dumps({
                'error': 'GuardDuty Malware Protection is not enabled',
                'message': 'Enable Malware Protection (EBS volumes) in GuardDuty'
            })
        }
    instances = _running_instances()
    # If you want to exclude instances from the scan, you can add their InstanceId to the EXCLUDED_INSTANCES environment variable
    excluded = {x.strip() for x in os.environ.get('EXCLUDED_INSTANCES', '').split(',') if x.strip()}
    to_scan = [i for i in instances if i['InstanceId'] not in excluded]
    logger.info(f"Excluded {len(instances) - len(to_scan)} instance(s); starting scans for {len(to_scan)}")
    results = [_start_scan(i['Arn']) for i in to_scan]
    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'On-demand malware scans initiated',
            'instances_scanned': len(results),
            'instances_excluded': len(instances) - len(to_scan),
            'results': results
        })
    }
You can exclude instances from being scanned by adding their IDs, separated by commas, to the EXCLUDED_INSTANCES environment variable.
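To make the expected format concrete, here is a small standalone sketch of the same parsing logic the handler uses. The function names parse_excluded and filter_instances are mine, and the instance IDs are made-up placeholders.

```python
from typing import Any, Dict, List, Set


def parse_excluded(raw: str) -> Set[str]:
    """Split a comma-separated string into a set of IDs, ignoring blanks and spaces."""
    return {part.strip() for part in raw.split(",") if part.strip()}


def filter_instances(instances: List[Dict[str, Any]], raw_excluded: str) -> List[Dict[str, Any]]:
    """Drop every instance whose InstanceId appears in the excluded list."""
    excluded = parse_excluded(raw_excluded)
    return [i for i in instances if i["InstanceId"] not in excluded]


if __name__ == "__main__":
    # Placeholder IDs, for illustration only.
    fleet = [{"InstanceId": "i-0aaa"}, {"InstanceId": "i-0bbb"}, {"InstanceId": "i-0ccc"}]
    print(filter_instances(fleet, "i-0aaa, i-0ccc"))
```

Stray spaces and trailing commas are tolerated, and an empty value (the default in the template) excludes nothing.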
Triggering the Lambda function manually will run the scans, and you'll hopefully get no malware-infected instances :)
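If you'd rather trigger it from a script than from the console, something like the sketch below should work. It assumes the function name from the template (ec2-malware-scan-weekly), that your credentials are allowed to call lambda:InvokeFunction, and that the response has the shape returned by the handler above. The helper names invoke_scan and summarize_scan_response are my own.

```python
import json
from typing import Any, Dict


def summarize_scan_response(payload: Dict[str, Any]) -> Dict[str, int]:
    """Count started, failed and excluded instances in the handler's return value."""
    body = json.loads(payload.get("body", "{}"))
    results = body.get("results", [])
    return {
        "started": sum(1 for r in results if r.get("status") == "started"),
        "failed": sum(1 for r in results if r.get("status") == "failed"),
        "excluded": body.get("instances_excluded", 0),
    }


def invoke_scan(function_name: str = "ec2-malware-scan-weekly") -> Dict[str, int]:
    """Invoke the Lambda function synchronously and summarize its response."""
    import boto3  # imported lazily so the summary helper works without AWS access

    client = boto3.client("lambda")
    resp = client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",  # wait for the function to finish
        Payload=b"{}",  # the handler ignores the event content
    )
    return summarize_scan_response(json.loads(resp["Payload"].read()))
```

Calling invoke_scan() returns a small dict of counts, which is handy for a quick check after a deployment. A non-zero "failed" count means the details are in the function's logs.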
Thanks for tuning in!