Anomaly Guardian – A Real-Time Anomaly Detection and Response System
What is this solution?
Anomaly Guardian is a system designed to identify anomalies in real time and respond to them automatically, without relying on human intervention. This shortens reaction time and helps prevent major impacts on the business.
What is not always evident is that this autonomy is only possible because the solution rests on a robust architecture that combines several AWS services. This integration lets Anomaly Guardian act as a digital watchdog: always alert, analyzing data as it is generated, and making decisions based on deviations from expected behavior.
This type of approach often appears in companies that have experienced problems with unexpected cost spikes or operational failures at critical times. When these areas — monitoring, automation, and response — work together, the impact tends to be clearer: fewer surprises, more control, and an engineering team with more time to focus on what matters.
What makes this solution different in practice
- Simultaneous multi-pronged analysis – User behavior, transactions, location, and other signals are evaluated together to identify relevant deviations.
- Automatic response to known incidents – If something has already been mapped as a risk, the system resolves it before anyone even notices.
- Relationships that make sense – When an anomaly arises, the correlation engine cross-references events to understand if there is a common cause or a larger failure behind it.
- Ability to predict and act before the problem arises – Based on what has happened before, the system scales or adjusts resources to avoid bottlenecks.
- Designed for multi-client environments – Each company operates independently, with total security and privacy within the same structure.
Solution Architecture
1. Data Ingestion Layer
[API Gateway (REST API)] --> [Kinesis Data Streams] --> [Lambda Event Processor]
Services used:
- Amazon API Gateway – Acts as the main RESTful entry point, collecting events from distributed systems or user interactions.
- Amazon Kinesis Data Streams – Handles real-time event streaming, ensuring events are reliably captured and queued.
- AWS Lambda – Performs the first layer of logic: filtering, enriching, or routing events as needed.
Technical characteristics:
- Ingestion rate above 10,000 events/second
- Latency below 100ms for initial processing
- Auto-scaling behavior according to workload volume
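To make the first hop concrete, here is a minimal sketch of what the Lambda event processor could look like. The field names and filtering logic are illustrative assumptions, not the project's actual code:

import base64
import json

def handler(event, context):
    """Sketch of the event-processor Lambda: decode, filter, enrich."""
    enriched = []
    for record in event.get("Records", []):
        # Kinesis delivers payloads base64-encoded
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        # Drop obviously malformed events early (assumed required fields)
        if "eventId" not in payload or "userId" not in payload:
            continue
        # Enrich with ingestion metadata for downstream analytics
        payload["ingestedAt"] = record["kinesis"]["approximateArrivalTimestamp"]
        payload["shardId"] = record["eventID"].split(":")[0]
        enriched.append(payload)
    # Downstream routing (e.g., to Kinesis Analytics) would happen here
    return {"processed": len(enriched)}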
2. ML and Analytics Layer
[Kinesis Analytics] --> [SageMaker Endpoint] --> [Elasticsearch (Indexing)]
Services used:
- Amazon Kinesis Analytics – Performs SQL-based real-time analysis to identify behavioral patterns.
- Amazon SageMaker – Hosts machine learning models that classify anomalies on the fly.
- Amazon Lookout for Metrics – Monitors key business metrics and detects anomalies automatically, with no manual rules required.
- Amazon OpenSearch Service (formerly Elasticsearch) – Indexes processed events to enable complex querying and pattern detection.
Algorithms implemented:
- Isolation Forest – Detects outliers in high-dimensional data.
- Statistical Process Control (SPC) – Used to monitor and manage process variations.
- Deep Learning Autoencoders – Ideal for identifying temporal anomalies in sequences.
- Correlation Analysis – Finds relationships between multiple variables to uncover contextual anomalies.
3. Intelligence and Response Layer
[Step Functions (Orchestration)] --> [Security Actions (Auto-remediation)] --> [Incident Response]
Services used:
- AWS Step Functions – Coordinates complex workflows based on rule evaluation and event triggers.
- Amazon SNS – Publishes alerts across multiple channels (email, SMS, etc.), keeping teams informed.
- AWS Lambda – Executes targeted response actions based on incident type or rule match.
- Amazon DynamoDB – Stores event history, decision trees, and remediation rules in a scalable way.
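To illustrate how detection hands off to this layer, a detector could start the remediation workflow roughly like this; the state machine ARN and payload shape are assumptions for the sketch:

import json
import boto3

sfn = boto3.client("stepfunctions")

def trigger_remediation(anomaly):
    """Starts the Step Functions remediation workflow for a detected anomaly."""
    sfn.start_execution(
        # ARN and payload shape are illustrative assumptions
        stateMachineArn="arn:aws:states:us-east-1:123456789012:stateMachine:anomaly-response",
        name="anomaly-" + anomaly["anomalyId"],  # execution names must be unique
        input=json.dumps(anomaly),
    )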
Types of Detected Anomalies
The system is designed to recognize different kinds of deviations in user behavior, transactions, and infrastructure. Each type of anomaly triggers a specific detection method and response path.
1. Velocity Anomalies
- Detection: Unusual number of requests from the same user or IP address
- Threshold: More than 100 requests within 5 minutes
- Response: Automatic rate limiting to mitigate abuse
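As a simplified illustration of that threshold (an in-memory sliding window; a production path would persist counters in DynamoDB or a cache):

import time
from collections import defaultdict, deque

WINDOW_SECONDS = 300  # the 5-minute window from the rule above
MAX_REQUESTS = 100    # the threshold from the rule above

_request_log = defaultdict(deque)  # key (user ID or IP) -> recent timestamps

def is_velocity_anomaly(key, now=None):
    """Returns True once a key exceeds 100 requests within 5 minutes."""
    now = now if now is not None else time.time()
    window = _request_log[key]
    window.append(now)
    # Evict timestamps that have fallen out of the window
    while window and now - window[0] > WINDOW_SECONDS:
        window.popleft()
    return len(window) > MAX_REQUESTS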
2. Value Anomalies
- Detection: Transactions involving unusually large or inconsistent amounts
- Algorithm: Detection based on moving standard deviation
- Response: Manual review triggered for any transaction above $5,000
3. Geolocation Anomalies
- Detection: Sudden changes in user’s geographic location
- Algorithm: Time-series analysis to track location shifts
- Response: Extra layer of security validation
4. Behavioral Anomalies
- Detection: Unexpected or erratic browsing patterns
- Algorithm: Session-based Machine Learning model
- Response: Intensive monitoring mode enabled
5. System Anomalies
- Detection: Drops in performance or availability
- Algorithm: Statistical Process Control (SPC)
- Response: Auto-scaling and workload rebalancing
Configuration and Deployment
Setting up the Anomaly Guardian is straightforward, as long as the basic environment and permissions are in place. Below is a quick guide to ensure everything runs smoothly from the start.
Prerequisites
# AWS CLI v2+
aws --version
# Python 3.9+
python3 --version
# Required IAM Permissions:
# (All services need Full Access for deployment purposes)
- CloudFormation
- SageMaker
- Kinesis
- Lambda
- DynamoDB
- Elasticsearch
- Step Functions
Automated Deployment
# Clone the templates
git clone <repository>
cd anomaly-guardian
# Set up environment variables
export PROJECT_NAME="anomaly-guardian"
export ENVIRONMENT="prod"
export AWS_REGION="us-east-1"
export EMAIL_ALERT="admin@company.com"
# Run the deployment
chmod +x deploy.sh
./deploy.sh deploy
Manual Deployment
Base Infrastructure Deployment
aws cloudformation deploy \
--template-file anomaly-guardian-base.yaml \
--stack-name anomaly-guardian-prod-base \
--parameter-overrides \
Environment=prod \
ProjectName=anomaly-guardian \
AlertEmail=admin@company.com \
--capabilities CAPABILITY_NAMED_IAM
Advanced Components Deployment
aws cloudformation deploy \
--template-file anomaly-guardian-advanced.yaml \
--stack-name anomaly-guardian-prod-advanced \
--parameter-overrides \
BaseStackName=anomaly-guardian-prod-base \
Environment=prod \
ProjectName=anomaly-guardian \
--capabilities CAPABILITY_NAMED_IAM
Monitoring and Observability
Business Metrics
- Anomalies Detected/Hour: Measures how many anomalies are identified every hour. Useful for spotting spikes or patterns in unusual behavior.
- False Positive Rate: Helps evaluate model precision. A high rate might indicate overly aggressive detection rules.
- Mean Time to Detection (MTTD): Tracks how long it typically takes to detect an anomaly after it occurs.
- Mean Time to Response (MTTR): Measures the average time between detection and mitigation or escalation.
Technical Metrics
- API Gateway Latency: Observes the responsiveness of the ingestion endpoint.
- Lambda Duration: Indicates how long your functions are taking to execute. Spikes may hint at processing bottlenecks.
- Kinesis Incoming/Outgoing Records: Tracks streaming throughput, which is important for ensuring no event backlog builds up.
- SageMaker Endpoint Utilization: Reflects how actively the ML model is being used for inference.
CloudWatch Dashboards
To make monitoring more accessible, the system automatically generates a custom CloudWatch dashboard with widgets that group key metrics by function.
{
  "widgets": [
    {
      "title": "Anomaly Detection Overview",
      "metrics": ["AnomalyDetected", "AnomalyScore", "ResponseTime"]
    },
    {
      "title": "System Health",
      "metrics": ["Lambda Errors", "Kinesis Throughput", "SageMaker Utilization"]
    },
    {
      "title": "Security Actions",
      "metrics": ["Users Blocked", "Rate Limited", "Incidents Created"]
    }
  ]
}
Configured alerts
The system continuously monitors critical thresholds to trigger automated alerts. These alerts are divided into two levels: Critical and Warning, based on severity and impact.
Critical alerts
- Anomaly Score > 0.9 for 3 consecutive periods: Indicates highly suspicious behavior sustained over time.
- Lambda Error Rate > 5%: A spike in function errors, usually signaling a misconfiguration or downstream issue.
- SageMaker Latency > 5 seconds: Triggered when model response time degrades significantly, to prevent detection delays.
Warning alerts
- Anomaly Score > 0.7 for 5 periods: Less severe but still notable; often an early sign of unusual behavior emerging.
- Kinesis Utilization > 80%: May indicate stream saturation; important to track before hitting capacity limits.
- Event Correlation Failure: Triggered when expected correlations across dimensions break, which might signal fragmented or inconsistent data.
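For reference, an alert like the first critical one maps directly onto a standard CloudWatch alarm. The sketch below assumes a custom AnomalyGuardian metric namespace, an AnomalyScore metric, and an SNS topic ARN, none of which are specified by the original stack:

import boto3

cloudwatch = boto3.client("cloudwatch")

# Critical alert: anomaly score above 0.9 for 3 consecutive 5-minute periods
cloudwatch.put_metric_alarm(
    AlarmName="anomaly-score-critical",
    Namespace="AnomalyGuardian",   # assumed custom metric namespace
    MetricName="AnomalyScore",     # assumed metric name
    Statistic="Maximum",
    Period=300,
    EvaluationPeriods=3,
    Threshold=0.9,
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:anomaly-alerts"],  # assumed SNS topic
)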
Security
Security is embedded at every layer of the architecture, from encrypted communication channels to granular access policies and network isolation.
Encryption
In transit:
- TLS 1.2+ enforced across all communications
- SSL/TLS certificates managed through AWS Certificate Manager
At Rest:
- Kinesis: Encrypted with AWS-managed KMS keys
- DynamoDB: Server-side encryption enabled
- S3: AES-256 encryption for stored objects
- Elasticsearch: Encryption at rest enabled
Access control
IAM Roles and Policies:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:*:*:stream/anomaly-guardian-*"
    }
  ]
}
VPC and Network Security
- Critical resources are deployed in private subnets
- Security Groups follow least privilege access rules
- Network ACLs (NACLs) are used for additional subnet-level control
Audit
CloudTrail integration
- Logs all administrative actions performed within the environment.
- Integrated with AWS Config for configuration tracking and compliance auditing.
- Log retention configured for 7 years, supporting long-term audit requirements.
GDPR
- Pseudonymization of personal data where applicable.
- Right to be forgotten implementation included for sensitive datasets.
- Clearly defined data retention policies in line with data protection laws.
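As a sketch of what pseudonymization can look like in practice (the keyed-hash approach and truncation length are illustrative assumptions, not the project's documented scheme):

import hashlib
import hmac

def pseudonymize(user_id, key):
    """Replaces a user ID with a stable keyed hash (HMAC-SHA256, truncated)."""
    digest = hmac.new(key, user_id.encode(), hashlib.sha256).hexdigest()
    return digest[:16]  # truncation length is illustrative

A keyed hash (rather than a plain hash) keeps the mapping stable for analytics while preventing re-identification without the key, which can itself be stored in KMS.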
Performance and Scalability
Understanding how the system performs under load and how it scales is key to ensuring it can handle real-world demands without degradation.
Performance Benchmarks
Maximum Throughput
- API Gateway: Up to 10,000 RPS (with caching enabled)
- Kinesis: Scales with 1,000 shards (1MB/s per shard)
- Lambda: Supports 1,000 concurrent executions
- SageMaker: Up to 1,000 inferences per second
Typical Latency
- P50 (median): < 50ms (ingestion + initial processing)
- P95: < 200ms
- P99: < 500ms
Auto-Scaling Configuration
Horizontal Scaling
SageMaker:
  MinInstances: 1
  MaxInstances: 10
  TargetUtilization: 70%

Lambda:
  ReservedConcurrency: 100
  BurstConcurrency: 1000

Kinesis:
  AutoScaling: Enabled
  TargetUtilization: 70%
Vertical Scaling
- SageMaker endpoints scale automatically based on inference load.
- DynamoDB uses on-demand billing, scaling read/write capacity as needed.
- Elasticsearch (OpenSearch) includes auto-scaling for storage.
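For reference, SageMaker endpoint scaling of the kind shown above is configured through Application Auto Scaling; a minimal sketch, with the endpoint and variant names as assumptions:

import boto3

autoscaling = boto3.client("application-autoscaling")

# Assumed endpoint/variant names
resource_id = "endpoint/anomaly-detection-endpoint/variant/AllTraffic"

# Register the endpoint variant as a scalable target (1-10 instances)
autoscaling.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=1,
    MaxCapacity=10,
)

# Target-tracking policy at 70% of invocations-per-instance capacity
autoscaling.put_scaling_policy(
    PolicyName="sagemaker-target-70",
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 70.0,
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance"
        },
    },
)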
Maintenance and Operations
Operational excellence isn't just about automation; it also involves structured routines and a clear playbook for common issues.
Daily
- Health metrics verification
- Review of critical anomalies
- Configuration backup procedures
Weekly
- ML model updates
- Review of false positives
- Threshold optimization
Monthly
- Trend analysis and anomaly patterns
- Capacity planning
- Disaster recovery testing
Troubleshooting common issues
High false positive rate
# Check current thresholds
aws dynamodb scan --table-name anomaly-rules
# Adjust sensitivity
aws dynamodb update-item \
--table-name anomaly-rules \
--key '{"ruleId": {"S": "velocity-check"}}' \
--update-expression "SET threshold = :val" \
--expression-attribute-values '{":val": {"N": "150"}}'
SageMaker Endpoint Down
# Check current endpoint status
aws sagemaker describe-endpoint --endpoint-name anomaly-detection-endpoint
# Restart if needed
aws sagemaker update-endpoint \
--endpoint-name anomaly-detection-endpoint \
--endpoint-config-name new-config
Kinesis Stream Throttling
# Check stream metrics
aws cloudwatch get-metric-statistics \
--namespace AWS/Kinesis \
--metric-name WriteProvisionedThroughputExceeded \
--dimensions Name=StreamName,Value=event-stream \
--start-time 2025-07-24T00:00:00Z \
--end-time 2025-07-24T23:59:59Z \
--period 300 \
--statistics Sum
# Increase shard count if needed
aws kinesis update-shard-count \
--stream-name event-stream \
--target-shard-count 16 \
--scaling-type UNIFORM_SCALING
Backup & Disaster Recovery
Backup Strategy
- DynamoDB: Point-in-time recovery (PITR) enabled
- S3: Versioning and cross-region replication configured
- Code: Managed via Git with multiple remote repositories
- Configuration: Daily backups of Parameter Store values
RTO / RPO Targets
- RTO (Recovery Time Objective): Less than 4 hours
- RPO (Recovery Point Objective): Less than 15 minutes
Disaster Recovery (DR) procedures
1. Activate the secondary region
2. Restore DynamoDB backups
3. Redeploy CloudFormation stacks
4. Redirect traffic to the failover stack
5. Perform full system validation
API Reference
Event Ingestion
POST /events
Main endpoint used for real-time event ingestion into the system.
Headers
Content-Type: application/json
Authorization: AWS4-HMAC-SHA256 ...
Request Body
{
  "eventId": "unique-event-id",
  "userId": "user-123",
  "timestamp": "2025-07-24T10:30:00Z",
  "eventType": "transaction",
  "transactionAmount": 250.00,
  "requestsPerSecond": 5,
  "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
  "geoLocation": "US-East-1",
  "sessionLength": 1800,
  "ipAddress": "192.168.1.100",
  "metadata": {
    "productId": "prod-456",
    "category": "electronics",
    "paymentMethod": "credit_card"
  }
}
Response
{
  "status": "success",
  "message": "Event received",
  "eventId": "unique-event-id",
  "timestamp": "2025-07-24T10:30:01Z"
}
Error Responses
{
  "status": "error",
  "code": "INVALID_PAYLOAD",
  "message": "Missing required field: userId"
}
Anomaly Query
GET /anomalies
Used to retrieve detected anomalies using optional filters.
Query Parameters
- severity: CRITICAL, HIGH, MEDIUM, LOW
- fromDate: ISO 8601 datetime
- toDate: ISO 8601 datetime
- userId: ID of a specific user
- limit: Max number of results (default: 100)
Example Request
curl -X GET "https://api.anomaly-guardian.com/anomalies?severity=HIGH&limit=50" \
-H "Authorization: Bearer <token>"
Response Example
{
  "anomalies": [
    {
      "anomalyId": "anom-789",
      "severity": "HIGH",
      "anomalyScore": 0.85,
      "detectedAt": "2025-07-24T10:25:00Z",
      "userId": "user-123",
      "anomalyType": "velocity",
      "description": "Unusual request rate detected",
      "originalEvent": { ... },
      "actionsPerformed": ["rate_limit", "enhanced_monitoring"]
    }
  ],
  "pagination": {
    "nextToken": "eyJ0aW1lc3RhbXAiOiIyMDI1...",
    "hasMore": true
  }
}
Rule management
Create Rule
POST /rules
Request Body
{
  "ruleName": "high-velocity-check",
  "ruleType": "velocity",
  "description": "Detect high request velocity",
  "conditions": {
    "metric": "requestsPerSecond",
    "operator": "greater_than",
    "threshold": 100,
    "timeWindow": "5m"
  },
  "actions": [
    {
      "type": "rate_limit",
      "parameters": {
        "maxRequestsPerMinute": 10,
        "duration": "1h"
      }
    },
    {
      "type": "notify",
      "parameters": {
        "channels": ["email", "slack"],
        "severity": "HIGH"
      }
    }
  ],
  "enabled": true
}
Update rule
PUT /rules/{ruleId}
Used to update an existing anomaly detection rule.
Delete rule
DELETE /rules/{ruleId}
Soft-deletes an existing rule: the rule is deactivated but remains stored.
Webhook Configuration
Register Webhook
POST /webhooks
Request Body
{
  "url": "https://your-system.com/webhooks/anomaly",
  "events": ["anomaly.detected", "anomaly.resolved"],
  "severityFilter": ["CRITICAL", "HIGH"],
  "secret": "webhook-secret-key",
  "retryPolicy": {
    "maxRetries": 3,
    "backoffMultiplier": 2
  }
}
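On the receiving end, the secret is typically used to verify a signature on each delivery. The exact scheme isn't specified here, so this sketch assumes a hex-encoded HMAC-SHA256 sent in an X-Anomaly-Signature header:

import hashlib
import hmac

def verify_webhook(body, signature_header, secret):
    """Verifies an assumed X-Anomaly-Signature header (hex-encoded HMAC-SHA256 of the raw body)."""
    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    # Constant-time comparison prevents timing attacks
    return hmac.compare_digest(expected, signature_header)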
Advanced Use Cases
- E-commerce Fraud Detection
Scenario: An online store with over 1 million transactions per day needs to detect fraudulent behavior in real time.
Specific configuration:
Anomaly Rules:
- High-Value Transactions: >$5,000
- Velocity Attacks: >50 attempts/minute
- Geographic Anomalies: Multiple locations within <1 hour
- Device Fingerprinting: Suspicious device changes
Response Actions:
- Automatic Transaction Hold
- Multi-factor Authentication Request
- Manual Review Queue
- Customer Communication
Success metrics:
- 95% reduction in undetected fraud attempts
- Detection time under 2 seconds
- False positive rate below 0.1%
- API Security Monitoring
Scenario: Protect critical APIs against DDoS attacks and abusive behavior.
Specific configuration:
Protection Layers:
- Rate Limiting: By IP, user, and endpoint
- Behavior Analysis: Unusual usage patterns
- Bot Detection: Based on user-agent and behavioral fingerprinting
- Geo-blocking: Restrict access from high-risk countries
Auto-Response:
- IP Blacklisting (temporary)
- CAPTCHA Challenge
- Traffic Throttling
- Incident Creation
- IoT Device Monitoring
Scenario: Monitoring of 100k+ IoT devices to detect anomalies, failures, and potential attacks.
Specific Configuration:
Sensor Metrics:
- Temperature Anomalies
- Connectivity Patterns
- Power Consumption
- Data Transmission Rates
ML Models:
- Time Series Forecasting
- Clustering for Device Behavior
- Outlier Detection
- Predictive Maintenance
Machine Learning details
This section outlines the models used for anomaly detection, including feature selection and detection logic.
- Isolation Forest
from sklearn.ensemble import IsolationForest

model = IsolationForest(
    contamination=0.1,   # 10% expected anomalies
    random_state=42,
    n_estimators=100,
    max_samples='auto'
)

# Selected features
features = [
    'transaction_amount_normalized',
    'requests_per_second',
    'session_length',
    'hour_of_day',
    'day_of_week',
    'geo_distance_from_usual',
    'device_change_indicator'
]
- LSTM Autoencoder (For Time Series)
import numpy as np
import tensorflow as tf

# Example dimensions (illustrative): 24 time steps, 7 features per step
timesteps, features = 24, 7

model = tf.keras.Sequential([
    tf.keras.layers.LSTM(50, return_sequences=True, input_shape=(timesteps, features)),
    tf.keras.layers.LSTM(30, return_sequences=False),
    tf.keras.layers.RepeatVector(timesteps),
    tf.keras.layers.LSTM(30, return_sequences=True),
    tf.keras.layers.LSTM(50, return_sequences=True),
    tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(features))
])
model.compile(optimizer="adam", loss="mse")

# An anomaly is detected when reconstruction error > threshold;
# reconstruction_errors come from scoring a validation set, e.g.
# reconstruction_errors = np.mean((X_val - model.predict(X_val)) ** 2, axis=(1, 2))
threshold = np.percentile(reconstruction_errors, 95)
- Statistical Process Control (SPC)
import pandas as pd

def calculate_control_limits(data, window_size=30):
    """Calculates dynamic control limits based on rolling stats (expects a pandas Series)."""
    rolling_mean = data.rolling(window=window_size).mean()
    rolling_std = data.rolling(window=window_size).std()
    upper_limit = rolling_mean + 3 * rolling_std
    lower_limit = rolling_mean - 3 * rolling_std
    return upper_limit, lower_limit

def detect_anomaly(value, upper_limit, lower_limit):
    """Detects an anomaly if a value falls outside the control limits."""
    return value > upper_limit or value < lower_limit
Feature engineering pipeline
Applied Transformations
- Normalization: Z-score normalization for numerical features.
- Categorical Encoding: One-hot encoding for categorical variables.
- Time Features: Extracted features such as hour, day of week, and month.
- Aggregations: Moving averages, percentiles, counters.
- Geo Features: Distance from typical geolocation patterns.
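The geo feature in the list above can be derived with the standard haversine formula, measuring how far an event's coordinates fall from the user's typical location (decimal-degree inputs assumed):

import math

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometers between two (lat, lon) points."""
    r = 6371.0  # mean Earth radius in km
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    return 2 * r * math.asin(math.sqrt(a))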
Feature Selection
from sklearn.feature_selection import SelectKBest, f_classif
# Select top 20 most relevant features
selector = SelectKBest(score_func=f_classif, k=20)
X_selected = selector.fit_transform(X, y)
# Automatically selected features
selected_features = selector.get_support(indices=True)
Model Training pipeline
Data Pipeline
Steps:
1. Data Extraction:
- Source: Kinesis Data Streams
- Format: JSON events
- Volume: 1M+ events/day
2. Data Preprocessing:
- Cleaning: Remove duplicates, handle missing values
- Transformation: Apply feature engineering
- Validation: Data quality checks
3. Feature Store:
- Storage: SageMaker Feature Store
- Serving: Real-time and batch
- Monitoring: Feature drift detection
4. Model Training:
- Framework: SageMaker Training Jobs
- Algorithms: Multiple models ensemble
- Validation: Cross-validation + holdout
5. Model Evaluation:
- Metrics: Precision, Recall, F1, AUC
- Business Metrics: Cost of false positives/negatives
- A/B Testing: Champion/Challenger comparison
6. Model Deployment:
- Staging: Canary deployment
- Production: Blue/green deployment
- Monitoring: Real-time performance tracking
Continuous Learning
Model Retraining Strategy
- Scheduled Retraining: Weekly retraining with newly collected data.
- Performance-Based: Triggered when evaluation metrics drop below acceptable thresholds.
- Concept Drift Detection: Ongoing monitoring of data distribution and feature drift in real time.
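Drift checks of this kind often compare the live feature distribution against the training distribution. A minimal sketch using a two-sample Kolmogorov-Smirnov test (the significance threshold is an assumption):

import numpy as np
from scipy import stats

def detect_feature_drift(train_values, live_values, alpha=0.01):
    """Flags drift when the live distribution differs significantly from training."""
    statistic, p_value = stats.ks_2samp(train_values, live_values)
    # A low p-value means the distributions diverge, which can trigger retraining
    return p_value < alpha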
Feedback Loop
Incorporating analyst feedback into model updates allows the system to continuously learn from real-world validation, improving accuracy over time.
def update_model_with_feedback(feedback_data):
    """Update model with analyst feedback"""
    # Combine analyst labels with original features
    labeled_data = combine_feedback_with_features(feedback_data)

    # Incremental model retraining
    model.partial_fit(labeled_data.features, labeled_data.labels)

    # Validate new performance before promoting
    new_performance = evaluate_model(model, validation_set)
    if new_performance > current_performance:
        deploy_model(model)
        log_model_update(new_performance)
Reports & Analytics
Executive Dashboard
Key KPIs:
- Security Posture Score: 0–100 based on multiple metrics.
- Threat Detection Rate: % of threats detected vs. total.
- False Positive Rate: % of alerts that were false positives.
- Mean Time to Detection: Average time to detect anomalies.
Visualizations:
{
  "widgets": [
    {
      "type": "scorecard",
      "title": "Security Score",
      "value": 94,
      "trend": "+2% from last week"
    },
    {
      "type": "timeseries",
      "title": "Anomalies Over Time",
      "data": "hourly_anomaly_counts"
    },
    {
      "type": "heatmap",
      "title": "Anomaly Types by Hour",
      "data": "anomaly_type_hour_matrix"
    },
    {
      "type": "geographic",
      "title": "Geographic Distribution",
      "data": "anomalies_by_location"
    }
  ]
}
Automated reports
Daily Report:
def generate_daily_report():
    report = {
        "date": today(),
        "summary": {
            "total_events_processed": get_event_count(),
            "anomalies_detected": get_anomaly_count(),
            "actions_performed": get_action_count(),
            "system_performance": get_performance_metrics()
        },
        "top_anomalies": get_top_anomalies(limit=10),
        "system_health": get_health_status(),
        "recommendations": generate_recommendations()
    }
    send_report(report, recipients=["security-team@company.com"])
    store_report(report, s3_bucket="reports-bucket")
Weekly report:
- Trend analysis
- ML model performance
- Threat evolution
- Adjustment recommendations
Monthly report:
- Solution ROI
- Industry benchmarks
- Capacity planning
- Improvement roadmap
CI/CD Pipeline
Infrastructure as Code
GitOps Workflow:
# .github/workflows/deploy.yml
name: Deploy Anomaly Guardian

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Validate CloudFormation
        run: |
          aws cloudformation validate-template \
            --template-body file://anomaly-guardian-base.yaml
          aws cloudformation validate-template \
            --template-body file://anomaly-guardian-advanced.yaml

  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Run Tests
        run: |
          python -m pytest tests/
          python -m pytest integration_tests/

  security-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Security Scan
        run: |
          checkov -f anomaly-guardian-base.yaml
          cfn-lint anomaly-guardian-advanced.yaml

  deploy-staging:
    needs: [validate, test, security-scan]
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Staging
        run: ./deploy.sh deploy
        env:
          ENVIRONMENT: staging

  integration-tests:
    needs: deploy-staging
    runs-on: ubuntu-latest
    steps:
      - name: Run Integration Tests
        run: ./deploy.sh test
        env:
          ENVIRONMENT: staging

  deploy-production:
    needs: integration-tests
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Production
        run: ./deploy.sh deploy
        env:
          ENVIRONMENT: prod
Testing Strategy
Unit Tests
import pytest
from anomaly_detector import AnomalyDetector

class TestAnomalyDetection:
    def setup_method(self):
        self.detector = AnomalyDetector()

    def test_velocity_anomaly_detection(self):
        event = {
            "userId": "test-user",
            "requestsPerSecond": 150
        }
        result = self.detector.detect(event)
        assert result.is_anomaly is True
        assert result.anomaly_type == "velocity"

    def test_normal_behavior(self):
        event = {
            "userId": "test-user",
            "requestsPerSecond": 10
        }
        result = self.detector.detect(event)
        assert result.is_anomaly is False
Integration Tests
import boto3
import json
import time

def test_end_to_end_anomaly_detection():
    # Send an anomalous event
    kinesis = boto3.client("kinesis")
    event = create_anomalous_event()
    kinesis.put_record(
        StreamName="anomaly-guardian-test-event-stream",
        Data=json.dumps(event),
        PartitionKey=event["userId"]
    )

    # Wait for processing
    time.sleep(90)

    # Verify the anomaly was stored
    dynamodb = boto3.resource("dynamodb")
    table = dynamodb.Table("anomaly-guardian-test-anomaly-history")
    response = table.get_item(
        Key={"anomalyId": event["eventId"]}
    )
    assert "Item" in response
    assert response["Item"]["severity"] == "HIGH"
Performance Tests
import concurrent.futures
import requests

def test_throughput():
    """Test maximum system throughput"""
    def send_event():
        # Send an event via API Gateway (API_ENDPOINT and test_event defined in the test config)
        response = requests.post(API_ENDPOINT, json=test_event)
        return response.status_code == 200

    # Run the test with 1000 concurrent events
    with concurrent.futures.ThreadPoolExecutor(max_workers=1000) as executor:
        futures = [executor.submit(send_event) for _ in range(1000)]
        results = [f.result() for f in futures]

    success_rate = sum(results) / len(results)
    assert success_rate >= 0.99  # 99% success rate
Cost Estimation - Production Environment (2M events/day)
Compute
AWS Lambda: Responsible for processing incoming events. Estimated cost is $45/month, considering invocation volume and average execution time.
SageMaker Endpoint: Hosts real-time ML inference. Due to the need for a persistent endpoint, the cost is relatively higher, around $180/month.
Step Functions: Manages orchestration between different stages of the processing flow. The estimated cost is $25/month.
Storage
DynamoDB: Used to store anomaly history and related metadata. With high read/write throughput, cost is estimated at $120/month.
S3 (Simple Storage Service): Stores logs, backups, and reports. Assuming 1 TB of monthly traffic, storage costs are around $35/month.
Elasticsearch (OpenSearch): Powers fast search and analytics on event data. With high availability enabled, estimated cost is $200/month.
Streaming
Kinesis Data Streams: Handles real-time event ingestion. Based on provisioned throughput, the cost is $150/month.
Kinesis Data Analytics: Runs real-time SQL queries on incoming streams. This service contributes approximately $100/month.
Networking
API Gateway: Serves HTTP endpoints to receive and route events. For high-volume access, the estimated monthly cost is $35.
Data Transfer: Covers outbound data (egress) from AWS. Estimated cost is $65/month, assuming standard internet traffic volumes.
Total Monthly Estimate: US$955/month
Cost Optimization Strategies:
- SageMaker Savings Plans: Around 40% savings on steady-state endpoint usage. Note that SageMaker does not offer classic Reserved Instances or a CLI purchase command; Savings Plans are purchased through the AWS Billing and Cost Management console.
- DynamoDB On-Demand: Savings for variable workloads
BillingMode: PAY_PER_REQUEST # vs PROVISIONED
- S3 Intelligent Tiering: Automatic storage cost savings
StorageClass: INTELLIGENT_TIERING
- Lambda Provisioned Concurrency: Optimization for cold starts
ProvisionedConcurrency: 10 # For critical functions only
Cost Monitoring
Budget alerts
MonthlyBudget:
  Amount: 1000
  Unit: USD
  Alerts:
    - Threshold: 80%
      Type: ACTUAL
    - Threshold: 100%
      Type: FORECASTED
Cost Anomaly Detection:
import boto3

def detect_cost_anomalies():
    """Detect anomalies in AWS costs via Cost Explorer"""
    ce = boto3.client("ce")
    response = ce.get_anomalies(
        DateInterval={
            "StartDate": "2025-07-01",
            "EndDate": "2025-07-24"
        }
    )
    for anomaly in response["Anomalies"]:
        # Alert when the estimated cost impact exceeds $100
        if float(anomaly["Impact"]["TotalImpact"]) > 100:
            send_cost_alert(anomaly)
Advanced Security Features
Encryption Everywhere
Data Classification:
Sensitive Data:
  - User IDs: Pseudonymized
  - IP Addresses: Hashed
  - Transaction Amounts: Encrypted
  - Geographic Data: Generalized

Public Data:
  - Aggregate Statistics
  - System Performance Metrics
  - Non-PII Event Metadata
Key Management:
KMS Keys:
  Application Key:
    Description: "Anomaly Guardian Application Encryption"
    KeyRotation: Enabled
    Aliases: ["alias/anomaly-guardian-app"]
  Database Key:
    Description: "DynamoDB Encryption Key"
    KeyRotation: Enabled
    Aliases: ["alias/anomaly-guardian-db"]
Network Security
VPC Configuration:
Network Architecture:
  VPC: 10.0.0.0/16
  Public Subnets:
    - 10.0.101.0/24 (AZ-1)
    - 10.0.102.0/24 (AZ-2)
  Private Subnets:
    - 10.0.1.0/24 (AZ-1)
    - 10.0.2.0/24 (AZ-2)
  Database Subnets:
    - 10.0.201.0/24 (AZ-1)
    - 10.0.202.0/24 (AZ-2)
Security Groups:
Lambda Security Group:
  Ingress:
    - HTTPS (443) from anywhere
  Egress:
    - DynamoDB VPC endpoint
    - Elasticsearch cluster

API Gateway Security Group:
  Ingress:
    - HTTPS (443) from anywhere
  Egress: None (managed service)

Elasticsearch Security Group:
  Ingress:
    - Port 443 from Lambda SG
    - Port 9200 from Lambda SG
  Egress: None
WAF Protection:
WAF Rules:
  - Name: "AWSManagedRulesCommonRuleSet"
    Priority: 1
    Statement:
      ManagedRuleGroupStatement:
        VendorName: "AWS"
        Name: "AWSManagedRulesCommonRuleSet"
  - Name: "RateLimitRule"
    Priority: 2
    Statement:
      RateBasedStatement:
        Limit: 2000
        AggregateKeyType: "IP"
When dealing with highly complex environments and high data volumes, the response time between identifying an anomaly and taking action is crucial. More than just a functional architecture, what is presented here is an approach focused on operational reliability, processing scalability, and cost control.
Each component has been integrated with a purpose: to accurately detect deviations, trigger automated responses, and maintain active governance at all levels, from security to resource allocation. This type of solution becomes truly effective when aligned with the organization's technical routine and business objectives. It is not just about technology, but about delivering predictability in scenarios where failures are costly.
Thank you, see you next time.