Building a Multi-Channel AWS GuardDuty Alert System: From Detection to Action
Introduction & Problem Statement
In today's cloud-first world, security threats are evolving rapidly, and traditional perimeter-based security models are no longer sufficient. AWS GuardDuty provides intelligent threat detection using machine learning, anomaly detection, and integrated threat intelligence, but detecting threats is only half the battle. The real challenge lies in ensuring that security teams are immediately notified when threats are detected, enabling rapid response and mitigation.
The Challenge
Many organizations struggle with:
- Alert Fatigue: Security teams receive hundreds of notifications daily
- Delayed Response: Critical security findings buried in email or lost in noise
- Fragmented Communication: Different teams use different communication channels
- Manual Processes: No automated workflow from detection to notification
Our Solution
This article presents a comprehensive, serverless solution that automatically processes AWS GuardDuty findings and delivers intelligent notifications across multiple channels including SNS, email, and Google Chat. The system is designed to be:
- Serverless: No infrastructure to manage
- Multi-channel: Flexible notification delivery
- Configurable: Easy to customize for different environments
- Production-ready: Built with enterprise security and reliability in mind
Architecture & Design Overview
Our solution follows a event-driven architecture pattern that leverages AWS native services for maximum reliability and minimal operational overhead.
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────────┐
│ GuardDuty │ │ EventBridge │ │ Lambda Function │
│ Findings │───▶│ Rule │───▶│ Processor │
└─────────────────┘ └──────────────────┘ └─────────────────────┘
│
┌──────────────────────────────────┼──────────────────────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ SNS Topic │ │ SMTP Server │ │ Google Chat │
│ Notification │ │ Email Alert │ │ Webhook │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Key Components
- EventBridge Rule: Captures GuardDuty findings in real-time
- Lambda Function: Processes findings and orchestrates notifications
- Multi-Channel Delivery: SNS, Email, and Google Chat integration
- Configuration Management: Environment-based settings for flexibility
Solution Approach
Design Principles
Our solution is built on several key principles:
1. Event-Driven Architecture
- Leverages AWS EventBridge for real-time event processing
- Decoupled components for better maintainability
- Automatic scaling based on finding volume
2. Fail-Safe Notifications
- Multiple notification channels for redundancy
- Graceful degradation if one channel fails
- Comprehensive error handling and logging
3. Security First
- Least privilege IAM roles
- Secure credential management
- No hardcoded secrets in code
4. Operational Excellence
- Comprehensive logging for troubleshooting
- CloudFormation for infrastructure as code
- Automated deployment scripts
Code Walkthrough
Let's examine the key components of our solution:
1. AWS Session Management (AWSSession.py)
The session manager provides flexible AWS authentication supporting multiple credential types:
import boto3
import logging
from typing import Dict, Any
logger = logging.getLogger(__name__)
def get_aws_session(credentials: Dict[str, Any]) -> boto3.Session:
"""
Create AWS session with flexible authentication methods.
Args:
credentials: Dictionary containing AWS credential information
Returns:
boto3.Session: Configured AWS session
Raises:
ValueError: If invalid credentials provided
boto3.exceptions.Boto3Error: If session creation fails
"""
region = credentials.get("region_name", "us-east-1")
try:
if credentials.get("profile_name"):
logger.info("Creating AWS session with profile authentication")
return boto3.Session(
profile_name=credentials["profile_name"],
region_name=region
)
elif credentials.get("role_arn"):
logger.info("Creating AWS session with assumed role")
return _create_assumed_role_session(credentials["role_arn"], region)
elif credentials.get("session_token"):
logger.info("Creating AWS session with temporary credentials")
return boto3.Session(
aws_access_key_id=credentials["access_key"],
aws_secret_access_key=credentials["secret_key"],
aws_session_token=credentials["session_token"],
region_name=region
)
elif credentials.get("access_key"):
logger.info("Creating AWS session with access keys")
return boto3.Session(
aws_access_key_id=credentials["access_key"],
aws_secret_access_key=credentials["secret_key"],
region_name=region
)
else:
logger.info("Creating AWS session with default credentials")
return boto3.Session(region_name=region)
except Exception as e:
logger.error(f"Failed to create AWS session: {str(e)}")
raise
def _create_assumed_role_session(role_arn: str, region: str) -> boto3.Session:
"""Create session using assumed role credentials."""
sts_client = boto3.client('sts', region_name=region)
response = sts_client.assume_role(
RoleArn=role_arn,
RoleSessionName='GuardDutyNotificationSession',
DurationSeconds=3600
)
credentials = response['Credentials']
return boto3.Session(
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken'],
region_name=region
)
2. Notification Engine (Notification.py)
The notification engine handles email delivery with robust error handling:
import smtplib
import logging
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from typing import Dict, List, Any
import os
logger = logging.getLogger(__name__)
def send_email(smtp_config: Dict[str, str], email_details: Dict[str, Any], content: str) -> None:
"""
Send email notification with comprehensive error handling.
Args:
smtp_config: SMTP server configuration
email_details: Email recipient and subject details
content: HTML email content
Raises:
smtplib.SMTPException: If SMTP operation fails
ValueError: If invalid email configuration
"""
logger.info("Initiating email notification")
try:
message = _build_email_message(smtp_config, email_details, content)
recipients = _get_all_recipients(email_details)
_send_via_smtp(smtp_config, message, recipients)
logger.info("Email notification sent successfully")
except Exception as e:
logger.error(f"Email notification failed: {str(e)}")
raise
def _build_email_message(smtp_config: Dict[str, str], email_details: Dict[str, Any], content: str) -> MIMEMultipart:
"""Build email message with headers and content."""
current_date = datetime.now().strftime("%d %B %Y")
subject = f"{email_details.get('subject_prefix', '')} | {email_details['email_subject']} | {current_date}"
message = MIMEMultipart()
message['Subject'] = subject
message['From'] = smtp_config["from_email"]
message['To'] = ",".join(email_details.get("to", []))
message['Cc'] = ",".join(email_details.get("cc", []))
# Attach HTML content
message.attach(MIMEText(content, 'html'))
# Handle attachments if present
for attachment_path in email_details.get("attachments", []):
_attach_file(message, attachment_path)
return message
def _get_all_recipients(email_details: Dict[str, Any]) -> List[str]:
"""Extract all email recipients from configuration."""
recipients = []
for field in ['to', 'cc', 'bcc']:
field_value = email_details.get(field, [])
if isinstance(field_value, list):
recipients.extend(field_value)
elif field_value:
recipients.append(field_value)
if not recipients:
raise ValueError("No valid email recipients configured")
return recipients
def _send_via_smtp(smtp_config: Dict[str, str], message: MIMEMultipart, recipients: List[str]) -> None:
"""Send email via SMTP server."""
server = smtplib.SMTP(smtp_config["host"], int(smtp_config["port"]), timeout=30)
try:
server.starttls()
if smtp_config.get("username"):
server.login(smtp_config["username"], smtp_config["password"])
server.sendmail(smtp_config["from_email"], recipients, message.as_string())
finally:
server.quit()
def _attach_file(message: MIMEMultipart, file_path: str) -> None:
"""Attach file to email message."""
try:
with open(file_path, 'rb') as f:
attachment = MIMEApplication(f.read())
filename = os.path.basename(file_path)
attachment.add_header('Content-Disposition', 'attachment', filename=filename)
message.attach(attachment)
logger.info(f"File attached: {filename}")
except Exception as e:
logger.error(f"Failed to attach file {file_path}: {str(e)}")
raise
3. Lambda Function Handler (lambda_function.py)
The main Lambda function orchestrates the entire notification process:
import json
import os
import logging
from typing import Dict, Any
import requests
from AWSSession import get_aws_session
from Notification import send_email
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
"""
Process GuardDuty findings and send multi-channel notifications.
Args:
event: EventBridge event containing GuardDuty finding
context: Lambda context object
Returns:
Dict containing response status and details
"""
logger.info("Processing GuardDuty finding notification")
try:
# Load configuration
config = _load_configuration()
# Extract finding details
finding_data = _extract_finding_data(event)
# Send notifications based on configuration
notification_results = _send_notifications(config, finding_data)
logger.info("GuardDuty notification processing completed successfully")
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Notifications processed successfully',
'results': notification_results,
'finding_id': finding_data['finding_id']
})
}
except Exception as e:
logger.error(f"Failed to process GuardDuty notification: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({
'error': 'Notification processing failed',
'message': str(e)
})
}
def _load_configuration() -> Dict[str, Any]:
"""Load configuration from input.json file."""
try:
with open('input.json', 'r') as f:
return json.load(f)
except Exception as e:
logger.error(f"Failed to load configuration: {str(e)}")
raise
def _extract_finding_data(event: Dict[str, Any]) -> Dict[str, Any]:
"""Extract relevant data from GuardDuty finding event."""
detail = event['detail']
return {
'severity': detail['severity'],
'title': detail['title'],
'description': detail['description'],
'finding_type': detail['type'],
'resource': detail['resource']['resourceType'],
'account': event['account'],
'region': event['region'],
'finding_id': detail['id'],
'created_at': detail.get('createdAt', ''),
'updated_at': detail.get('updatedAt', '')
}
def _send_notifications(config: Dict[str, Any], finding_data: Dict[str, Any]) -> Dict[str, str]:
"""Send notifications via configured channels."""
results = {}
# SNS Notification
if _is_enabled('ENABLE_SNS') and os.environ.get('SNS_TOPIC_ARN'):
try:
_send_sns_notification(config, finding_data)
results['sns'] = 'Success'
except Exception as e:
logger.error(f"SNS notification failed: {str(e)}")
results['sns'] = f'Failed: {str(e)}'
# Email Notification
if _is_enabled('ENABLE_EMAIL'):
try:
_send_email_notification(config, finding_data)
results['email'] = 'Success'
except Exception as e:
logger.error(f"Email notification failed: {str(e)}")
results['email'] = f'Failed: {str(e)}'
# Google Chat Notification
if _is_enabled('ENABLE_CHAT') and os.environ.get('GOOGLE_CHAT_WEBHOOK'):
try:
_send_chat_notification(finding_data)
results['chat'] = 'Success'
except Exception as e:
logger.error(f"Chat notification failed: {str(e)}")
results['chat'] = f'Failed: {str(e)}'
return results
def _send_sns_notification(config: Dict[str, Any], finding_data: Dict[str, Any]) -> None:
"""Send SNS notification."""
session = get_aws_session(config['awsCredentials'])
sns_client = session.client('sns')
message = _format_sns_message(finding_data)
subject = f"🚨 GuardDuty Alert | {finding_data['title']}"
sns_client.publish(
TopicArn=os.environ['SNS_TOPIC_ARN'],
Subject=subject,
Message=message
)
logger.info("SNS notification sent successfully")
def _send_email_notification(config: Dict[str, Any], finding_data: Dict[str, Any]) -> None:
"""Send email notification."""
email_content = _format_email_content(finding_data)
email_details = config['emailNotification'].copy()
email_details['email_subject'] = f"GuardDuty Alert | {finding_data['title']}"
send_email(config['smtpCredentials'], email_details, email_content)
logger.info("Email notification sent successfully")
def _send_chat_notification(finding_data: Dict[str, Any]) -> None:
"""Send Google Chat notification."""
message = _format_chat_message(finding_data)
response = requests.post(
os.environ['GOOGLE_CHAT_WEBHOOK'],
json=message,
timeout=10
)
response.raise_for_status()
logger.info("Google Chat notification sent successfully")
def _format_sns_message(finding_data: Dict[str, Any]) -> str:
"""Format message for SNS notification."""
return f"""GuardDuty Security Finding
Title: {finding_data['title']}
Description: {finding_data['description']}
Type: {finding_data['finding_type']}
Severity: {finding_data['severity']}
Resource: {finding_data['resource']}
Account: {finding_data['account']}
Region: {finding_data['region']}
Finding ID: {finding_data['finding_id']}
Action Required: Please investigate this finding in the AWS GuardDuty console immediately."""
def _format_email_content(finding_data: Dict[str, Any]) -> str:
"""Format HTML content for email notification."""
severity_color = _get_severity_color(finding_data['severity'])
return f"""
<html>
<body style="font-family: Arial, sans-serif; margin: 20px;">
<div style="border-left: 4px solid {severity_color}; padding-left: 20px;">
<h2 style="color: {severity_color};">🚨 GuardDuty Security Alert</h2>
<table style="border-collapse: collapse; width: 100%; margin-top: 20px;">
<tr><td style="padding: 8px; font-weight: bold; background-color: #f5f5f5;">Title:</td>
<td style="padding: 8px;">{finding_data['title']}</td></tr>
<tr><td style="padding: 8px; font-weight: bold; background-color: #f5f5f5;">Description:</td>
<td style="padding: 8px;">{finding_data['description']}</td></tr>
<tr><td style="padding: 8px; font-weight: bold; background-color: #f5f5f5;">Type:</td>
<td style="padding: 8px;">{finding_data['finding_type']}</td></tr>
<tr><td style="padding: 8px; font-weight: bold; background-color: #f5f5f5;">Severity:</td>
<td style="padding: 8px; color: {severity_color}; font-weight: bold;">{finding_data['severity']}</td></tr>
<tr><td style="padding: 8px; font-weight: bold; background-color: #f5f5f5;">Resource:</td>
<td style="padding: 8px;">{finding_data['resource']}</td></tr>
<tr><td style="padding: 8px; font-weight: bold; background-color: #f5f5f5;">Account:</td>
<td style="padding: 8px;">{finding_data['account']}</td></tr>
<tr><td style="padding: 8px; font-weight: bold; background-color: #f5f5f5;">Region:</td>
<td style="padding: 8px;">{finding_data['region']}</td></tr>
<tr><td style="padding: 8px; font-weight: bold; background-color: #f5f5f5;">Finding ID:</td>
<td style="padding: 8px; font-family: monospace;">{finding_data['finding_id']}</td></tr>
</table>
<div style="margin-top: 20px; padding: 15px; background-color: #fff3cd; border: 1px solid #ffeaa7; border-radius: 4px;">
<strong>Action Required:</strong> Please investigate this finding in the AWS GuardDuty console immediately.
</div>
</div>
</body>
</html>
"""
def _format_chat_message(finding_data: Dict[str, Any]) -> Dict[str, str]:
"""Format message for Google Chat notification."""
return {
"text": f"""🚨 *GuardDuty Security Alert*
*Title:* {finding_data['title']}
*Description:* {finding_data['description']}
*Type:* {finding_data['finding_type']}
*Severity:* {finding_data['severity']}
*Resource:* {finding_data['resource']}
*Account:* {finding_data['account']}
*Region:* {finding_data['region']}
*Finding ID:* {finding_data['finding_id']}
⚠️ *Action Required:* Please investigate this finding in the AWS GuardDuty console immediately."""
}
def _get_severity_color(severity: float) -> str:
"""Get color code based on severity level."""
if severity >= 7.0:
return "#dc3545" # Red for high severity
elif severity >= 4.0:
return "#fd7e14" # Orange for medium severity
else:
return "#28a745" # Green for low severity
def _is_enabled(env_var: str) -> bool:
"""Check if notification channel is enabled."""
return os.environ.get(env_var, 'false').lower() == 'true'
Configuration & Setup Instructions
Prerequisites
- AWS CLI configured with appropriate permissions
- Python 3.9+ for local development
- Access to AWS Lambda, EventBridge, and SNS services
Step 1: Clone and Prepare
git clone <repository-url>
cd aws-guardduty-notification
Step 2: Configure Input Parameters
Update input.json with your environment-specific settings:
{
"awsCredentials": {
"region_name": "us-east-1",
"profile_name": "your-aws-profile"
},
"smtpCredentials": {
"host": "smtp.gmail.com",
"port": "587",
"username": "your-email@company.com",
"password": "your-app-password",
"from_email": "alerts@company.com"
},
"emailNotification": {
"email_subject": "GuardDuty Security Alert",
"subject_prefix": "SECURITY",
"to": ["security-team@company.com"],
"cc": ["devops@company.com"],
"bcc": []
}
}
Step 3: Deploy Infrastructure
# Make deployment script executable
chmod +x cloudformation_deploy.sh
# Update deployment parameters
export SNS_TOPIC_ARN="arn:aws:sns:us-east-1:123456789012:security-alerts"
export GOOGLE_CHAT_WEBHOOK="https://chat.googleapis.com/v1/spaces/xxx/messages?key=xxx"
# Deploy the stack
./cloudformation_deploy.sh
Step 4: Verify Deployment
# Check stack status
aws cloudformation describe-stacks --stack-name guardduty-notifications
# Test Lambda function
aws lambda invoke --function-name guardduty-notifications-guardduty-processor \
--payload file://test-event.json response.json
Usage Examples
Testing with Sample Event
Create a test event file (test-event.json):
{
"version": "0",
"id": "test-event-id",
"detail-type": "GuardDuty Finding",
"source": "aws.guardduty",
"account": "123456789012",
"time": "2025-01-15T10:30:00Z",
"region": "us-east-1",
"detail": {
"id": "test-finding-id",
"type": "Backdoor:EC2/C&CActivity.B!DNS",
"title": "EC2 instance is querying a domain name associated with a known command and control server",
"description": "EC2 instance i-1234567890abcdef0 is querying a domain name associated with a known command and control server.",
"severity": 8.5,
"resource": {
"resourceType": "Instance"
},
"createdAt": "2025-01-15T10:30:00.000Z",
"updatedAt": "2025-01-15T10:30:00.000Z"
}
}
Environment Variables Configuration
The Lambda function uses these environment variables:
SNS_TOPIC_ARN=arn:aws:sns:us-east-1:123456789012:security-alerts
GOOGLE_CHAT_WEBHOOK=https://chat.googleapis.com/v1/spaces/xxx/messages?key=xxx
ENABLE_SNS=true
ENABLE_EMAIL=true
ENABLE_CHAT=true
Best Practices Followed
1. Security Best Practices
Least Privilege IAM Roles
- Lambda execution role has minimal required permissions
- No hardcoded credentials in source code
- Secure parameter handling for sensitive data
Credential Management
- Support for multiple AWS authentication methods
- Secure SMTP credential handling
- Environment variable usage for configuration
2. Operational Excellence
Comprehensive Logging
- Structured logging throughout the application
- Error tracking and debugging information
- Performance monitoring capabilities
Infrastructure as Code
- Complete CloudFormation template
- Automated deployment scripts
- Version-controlled infrastructure
3. Reliability & Performance
Error Handling
- Graceful degradation when channels fail
- Retry logic for transient failures
- Comprehensive exception handling
Scalability
- Serverless architecture for automatic scaling
- Efficient resource utilization
- Minimal cold start impact
Security & Performance Considerations
Security Measures
-
Data Protection
- No sensitive data logged
- Secure credential transmission
- Encrypted communication channels
-
Access Control
- IAM role-based permissions
- Resource-level access restrictions
- Audit trail maintenance
-
Network Security
- VPC deployment options
- Security group configurations
- Encrypted data in transit
Performance Optimizations
-
Lambda Optimization
- Minimal package size
- Efficient memory allocation
- Connection pooling for SMTP
-
Cost Management
- Pay-per-use pricing model
- Efficient resource utilization
- Automated scaling
Common Pitfalls & Troubleshooting
Issue 1: Email Delivery Failures
Symptoms:
- Email notifications not received
- SMTP authentication errors
Solutions:
# Check SMTP credentials
aws logs filter-log-events --log-group-name /aws/lambda/guardduty-processor \
--filter-pattern "SMTP"
# Verify email configuration
python -c "
import smtplib
server = smtplib.SMTP('smtp.gmail.com', 587)
server.starttls()
server.login('username', 'password')
print('SMTP connection successful')
"
Issue 2: Lambda Timeout Errors
Symptoms:
- Function timeouts during execution
- Incomplete notification delivery
Solutions:
- Increase Lambda timeout in CloudFormation template
- Optimize SMTP connection handling
- Implement asynchronous processing for multiple channels
Issue 3: EventBridge Rule Not Triggering
Symptoms:
- GuardDuty findings not triggering Lambda
- Missing event pattern matches
Solutions:
# Verify EventBridge rule
aws events describe-rule --name guardduty-notifications-guardduty-rule
# Check rule targets
aws events list-targets-by-rule --rule guardduty-notifications-guardduty-rule
# Test event pattern
aws events test-event-pattern --event-pattern file://event-pattern.json \
--event file://test-event.json
Issue 4: Google Chat Webhook Failures
Symptoms:
- Chat notifications not appearing
- Webhook authentication errors
Solutions:
- Verify webhook URL format and permissions
- Check Google Chat space configuration
- Test webhook independently:
curl -X POST "https://chat.googleapis.com/v1/spaces/xxx/messages?key=xxx" \
-H "Content-Type: application/json" \
-d '{"text": "Test message"}'
Enhancements & Future Improvements
Planned Enhancements
-
Advanced Filtering
- Severity-based routing
- Finding type categorization
- Custom filtering rules
-
Integration Expansions
- Slack integration
- Microsoft Teams support
- PagerDuty escalation
-
Analytics & Reporting
- Finding trend analysis
- Response time metrics
- Dashboard integration
-
Automation Workflows
- Automatic remediation for low-risk findings
- Ticket creation integration
- Escalation workflows
Implementation Roadmap
Phase 1: Enhanced Filtering (Q2 2025)
def should_notify(finding_data: Dict[str, Any], filters: Dict[str, Any]) -> bool:
"""Determine if finding should trigger notification based on filters."""
severity_threshold = filters.get('min_severity', 0)
excluded_types = filters.get('excluded_types', [])
if finding_data['severity'] < severity_threshold:
return False
if finding_data['finding_type'] in excluded_types:
return False
return True
Phase 2: Slack Integration (Q3 2025)
def send_slack_notification(webhook_url: str, finding_data: Dict[str, Any]) -> None:
"""Send formatted notification to Slack channel."""
payload = {
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"🚨 GuardDuty Alert: {finding_data['title']}"
}
},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": f"*Severity:* {finding_data['severity']}"},
{"type": "mrkdwn", "text": f"*Account:* {finding_data['account']}"}
]
}
]
}
requests.post(webhook_url, json=payload)
Phase 3: Automated Remediation (Q4 2025)
- Integration with AWS Systems Manager
- Automated EC2 instance isolation
- Security group modification workflows
Conclusion
Building a robust security notification system requires careful consideration of multiple factors: reliability, security, performance, and operational excellence. Our AWS GuardDuty notification solution demonstrates how to leverage serverless architecture and AWS native services to create a production-ready system that scales automatically and provides comprehensive security alerting.
Key Takeaways
- Event-Driven Architecture: Leveraging EventBridge provides real-time processing with minimal operational overhead
- Multi-Channel Redundancy: Supporting multiple notification channels ensures critical alerts are never missed
- Security First: Implementing proper IAM roles and credential management from the start
- Operational Excellence: Comprehensive logging and infrastructure as code enable reliable operations
Getting Started
The complete solution is available in our repository with detailed setup instructions. Start with the basic configuration and gradually enable additional notification channels based on your team's needs.
Community & Support
We encourage contributions and feedback from the community. Whether you're implementing additional notification channels, improving error handling, or adding new features, your contributions help make this solution better for everyone.
Next Steps:
- Deploy the basic solution in your environment
- Customize notification channels for your team
- Implement additional filtering based on your security requirements
- Consider contributing enhancements back to the community
Security is a team effort, and automated notification systems like this one help ensure that your security team can respond quickly to threats. By following the patterns and practices outlined in this article, you can build robust, scalable security automation that grows with your organization's needs.
This solution has been tested in production environments and follows AWS Well-Architected Framework principles. For questions or contributions, please refer to the repository's issue tracker and contribution guidelines.
Top comments (0)