I created a solution for AWS called Anomaly Guardian

Anomaly Guardian – A Real-Time Anomaly Detection and Response System

What is this solution?

Anomaly Guardian is a system designed to identify anomalies in real time and respond to them automatically, without relying on human intervention. In practice, this reduces reaction time and helps prevent major impacts on the business.

What is not always evident is that this autonomy is only possible because the solution is built on a robust architecture that integrates several AWS services. This integration allows Anomaly Guardian to function as a kind of digital watchdog: always alert, analyzing data as it is generated, and making decisions when it sees non-standard behavior.

This type of approach often appears in companies that have experienced unexpected cost spikes or operational failures at critical times. When monitoring, automation, and response work together, the impact is clear: fewer surprises, more control, and an engineering team with more time to focus on what matters.

What makes this solution different in practice

  • Simultaneous multi-pronged analysis – User behavior, transactions, location, and other signals are evaluated together to identify relevant deviations.
  • Automatic response to known incidents – If something has already been mapped as a risk, the system resolves it before anyone even notices.
  • Relationships that make sense – When an anomaly arises, the correlation engine cross-references events to understand if there is a common cause or a larger failure behind it.
  • Ability to predict and act before the problem arises – Based on what has happened before, the system scales or adjusts resources to avoid bottlenecks.
  • Designed for multi-tenant environments – Each company operates independently, with isolation and data privacy, within the same infrastructure.

Solution Architecture

1. Data Ingestion Layer

[API Gateway (REST API)] --> [Kinesis Data Streams] --> [Lambda Event Processor]

Services used:

  • Amazon API Gateway – Acts as the main RESTful entry point, collecting events from distributed systems or user interactions.
  • Amazon Kinesis Data Streams – Handles real-time event streaming, ensuring events are reliably captured and queued.
  • AWS Lambda – Performs the first layer of logic: filtering, enriching, or routing events as needed.

Technical characteristics:

  • Ingestion rate above 10,000 events/second
  • Latency below 100ms for initial processing
  • Auto-scaling behavior according to workload volume
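
As an illustration of this first layer, here is a minimal sketch of what the Lambda event processor might look like. The validation fields and the enrichment step are assumptions for illustration; the record format is the standard shape Lambda receives from Kinesis.

import base64
import json

def handler(event, context):
    """First-layer processing: decode Kinesis records, drop malformed
    events, and enrich each one before routing downstream."""
    processed = []
    for record in event.get("Records", []):
        payload = base64.b64decode(record["kinesis"]["data"])
        try:
            evt = json.loads(payload)
        except json.JSONDecodeError:
            continue  # drop malformed payloads
        if "eventId" not in evt or "userId" not in evt:
            continue  # minimal schema validation (assumed fields)
        # Enrichment example: keep the arrival timestamp for latency metrics
        evt["ingestedAt"] = record["kinesis"]["approximateArrivalTimestamp"]
        processed.append(evt)
    return {"processed": len(processed)}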

2. ML and Analytics Layer

[Kinesis Analytics] --> [SageMaker Endpoint] --> [Elasticsearch (Indexing)]

Services used:

  • Amazon Kinesis Analytics – Performs SQL-based real-time analysis to identify behavioral patterns.
  • Amazon SageMaker – Hosts machine learning models that classify anomalies on the fly.
  • Amazon Lookout for Metrics – Monitors key business metrics and detects anomalies automatically, with no manual rules required.
  • Amazon Elasticsearch (OpenSearch) – Indexes processed events to enable complex querying and pattern detection.

Algorithms implemented:

  • Isolation Forest – Detects outliers in high-dimensional data.
  • Statistical Process Control (SPC) – Used to monitor and manage process variations.
  • Deep Learning Autoencoders – Ideal for identifying temporal anomalies in sequences.
  • Correlation Analysis – Finds relationships between multiple variables to uncover contextual anomalies.
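
To make the scoring step concrete, here is a hedged sketch of how a stream consumer might call the hosted model for a real-time score. The endpoint name matches the one referenced later in this post; the request and response shapes are assumptions.

import json
import boto3

runtime = boto3.client("sagemaker-runtime")

def score_event(features):
    """Request an anomaly score from the SageMaker endpoint."""
    response = runtime.invoke_endpoint(
        EndpointName="anomaly-detection-endpoint",
        ContentType="application/json",
        Body=json.dumps({"instances": [features]}),
    )
    result = json.loads(response["Body"].read())
    return result["anomaly_score"]  # assumed response field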

3. Intelligence and Response Layer

[Step Functions (Orchestration)] --> [Security Actions (Auto-remediation)] --> [Incident Response]

Services used:

  • AWS Step Functions – Coordinates complex workflows based on rule evaluation and event triggers.
  • Amazon SNS – Publishes alerts across multiple channels (email, SMS, etc.), keeping teams informed.
  • AWS Lambda – Executes targeted response actions based on incident type or rule match.
  • Amazon DynamoDB – Stores event history, decision trees, and remediation rules in a scalable way.
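
As a sketch of how a detection Lambda might hand a scored anomaly to the response workflow, the snippet below starts a Step Functions execution; the state machine ARN is a placeholder.

import json
import boto3

sfn = boto3.client("stepfunctions")

def trigger_response(anomaly):
    """Start the orchestrated response workflow for one anomaly."""
    sfn.start_execution(
        stateMachineArn="arn:aws:states:us-east-1:123456789012:stateMachine:anomaly-response",
        name=anomaly["anomalyId"],  # one execution per anomaly
        input=json.dumps(anomaly),
    )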

Types of Detected Anomalies

The system is designed to recognize different kinds of deviations in user behavior, transactions, and infrastructure. Each type of anomaly triggers a specific detection method and response path.


1. Velocity Anomalies

  • Detection: Unusual number of requests from the same user or IP address
  • Threshold: More than 100 requests within 5 minutes
  • Response: Automatic rate limiting to mitigate abuse (a minimal sketch of this check appears at the end of this section)

2. Value Anomalies

  • Detection: Transactions involving unusually large or inconsistent amounts
  • Algorithm: Detection based on moving standard deviation
  • Response: Manual review triggered for any transaction above $5,000

3. Geolocation Anomalies

  • Detection: Sudden changes in user’s geographic location
  • Algorithm: Time-series analysis to track location shifts
  • Response: Extra layer of security validation

4. Behavioral Anomalies

  • Detection: Unexpected or erratic browsing patterns
  • Algorithm: Session-based Machine Learning model
  • Response: Intensive monitoring mode enabled

5. System Anomalies

  • Detection: Drops in performance or availability
  • Algorithm: Statistical Process Control (SPC)
  • Response: Auto-scaling and workload rebalancing
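
To ground the first type, here is a minimal sketch of the sliding-window velocity check described above. It keeps counters in memory for illustration only; a production version would track them in DynamoDB or ElastiCache.

import time
from collections import defaultdict, deque

WINDOW_SECONDS = 5 * 60   # the 5-minute window from type 1
MAX_REQUESTS = 100        # the threshold from type 1

_requests = defaultdict(deque)

def is_velocity_anomaly(key, now=None):
    """Record one request for `key` (user ID or IP) and report whether
    it exceeded 100 requests in the last 5 minutes."""
    now = now or time.time()
    window = _requests[key]
    window.append(now)
    while window and window[0] < now - WINDOW_SECONDS:
        window.popleft()  # evict requests older than the window
    return len(window) > MAX_REQUESTS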

Configuration and Deployment

Setting up the Anomaly Guardian is straightforward, as long as the basic environment and permissions are in place. Below is a quick guide to ensure everything runs smoothly from the start.

Prerequisites

# AWS CLI v2+
aws --version

# Python 3.9+
python3 --version

# Required IAM Permissions:
# (All services need Full Access for deployment purposes)
- CloudFormation
- SageMaker
- Kinesis
- Lambda
- DynamoDB
- Elasticsearch
- Step Functions

Automated Deployment

# Clone the templates
git clone <repository>
cd anomaly-guardian

# Set up environment variables
export PROJECT_NAME="anomaly-guardian"
export ENVIRONMENT="prod"
export AWS_REGION="us-east-1"
export EMAIL_ALERT="admin@company.com"

# Run the deployment
chmod +x deploy.sh
./deploy.sh deploy

Manual Deployment

Base Infrastructure Deployment

aws cloudformation deploy \
  --template-file anomaly-guardian-base.yaml \
  --stack-name anomaly-guardian-prod-base \
  --parameter-overrides \
    Environment=prod \
    ProjectName=anomaly-guardian \
    AlertEmail=admin@company.com \
  --capabilities CAPABILITY_NAMED_IAM

Advanced Components Deployment

aws cloudformation deploy \
  --template-file anomaly-guardian-advanced.yaml \
  --stack-name anomaly-guardian-prod-advanced \
  --parameter-overrides \
    BaseStackName=anomaly-guardian-prod-base \
    Environment=prod \
    ProjectName=anomaly-guardian \
  --capabilities CAPABILITY_NAMED_IAM

Monitoring and Observability

Business Metrics

  • Anomalies Detected/Hour: Measures how many anomalies are identified every hour. Useful for spotting spikes or patterns in unusual behavior.
  • False Positive Rate: Helps evaluate model precision. A high rate might indicate overly aggressive detection rules.
  • Mean Time to Detection (MTTD): Tracks how long it typically takes to detect an anomaly after it occurs.
  • Mean Time to Response (MTTR): Measures the average time between detection and mitigation or escalation.

Technical Metrics

  • API Gateway Latency: Observes the responsiveness of the ingestion endpoint.
  • Lambda Duration: Indicates how long your functions are taking to execute. Spikes may hint at processing bottlenecks.
  • Kinesis Incoming/Outgoing Records: Tracks streaming throughput; important for ensuring no event backlog builds up.
  • SageMaker Endpoint Utilization: Reflects how actively the ML model is being used for inference.
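
The business metrics above are custom, so something has to publish them. A minimal sketch using boto3, assuming an AnomalyGuardian namespace and the metric names that appear on the dashboard below:

import boto3

cloudwatch = boto3.client("cloudwatch")

def publish_anomaly_metrics(score, response_ms):
    """Publish one detection's business metrics to CloudWatch."""
    cloudwatch.put_metric_data(
        Namespace="AnomalyGuardian",  # assumed namespace
        MetricData=[
            {"MetricName": "AnomalyDetected", "Value": 1, "Unit": "Count"},
            {"MetricName": "AnomalyScore", "Value": score, "Unit": "None"},
            {"MetricName": "ResponseTime", "Value": response_ms, "Unit": "Milliseconds"},
        ],
    )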

CloudWatch Dashboards

To make monitoring more accessible, the system automatically generates a custom CloudWatch dashboard with widgets that group key metrics by function.

{
  "widgets": [
    {
      "title": "Anomaly Detection Overview",
      "metrics": ["AnomalyDetected", "AnomalyScore", "ResponseTime"]
    },
    {
      "title": "System Health",
      "metrics": ["Lambda Errors", "Kinesis Throughput", "SageMaker Utilization"]
    },
    {
      "title": "Security Actions",
      "metrics": ["Users Blocked", "Rate Limited", "Incidents Created"]
    }
  ]
}

Configured alerts

The system continuously monitors critical thresholds to trigger automated alerts. These alerts are divided into two levels: Critical and Warning, based on severity and impact.

Critical alerts

  • Anomaly Score > 0.9 for 3 consecutive periods – Indicates highly suspicious behavior sustained over time.
  • Lambda Error Rate > 5% – A spike in function errors, usually signaling a misconfiguration or downstream issue.
  • SageMaker Latency > 5 seconds – Triggered when model response time degrades significantly, to prevent detection delays.

Warning alerts

  • Anomaly Score > 0.7 for 5 periods: Less severe but still notable; often an early sign of unusual behavior emerging.
  • Kinesis Utilization > 80%: May indicate stream saturation; important to track before hitting capacity limits.
  • Event Correlation Failure: Triggered when expected correlations across dimensions break, which might signal fragmented or inconsistent data.
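
As an example of wiring one of these up, here is a sketch of the first critical alert as a CloudWatch alarm. It assumes the custom AnomalyScore metric from the earlier sketch, 5-minute evaluation periods, and a placeholder SNS topic.

import boto3

cloudwatch = boto3.client("cloudwatch")

# Anomaly Score > 0.9 for 3 consecutive periods (period length assumed 5 min)
cloudwatch.put_metric_alarm(
    AlarmName="anomaly-guardian-critical-score",
    Namespace="AnomalyGuardian",
    MetricName="AnomalyScore",
    Statistic="Average",
    Period=300,
    EvaluationPeriods=3,
    Threshold=0.9,
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:anomaly-guardian-alerts"],
)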

Security

Security is embedded at every layer of the architecture, from encrypted communication channels to granular access policies and network isolation.

Cryptography

In transit:

  • TLS 1.2+ enforced across all communications
  • SSL/TLS certificates managed through AWS Certificate Manager

At Rest:

  • Kinesis: Encrypted with AWS-managed KMS keys
  • DynamoDB: Server-side encryption enabled
  • S3: AES-256 encryption for stored objects
  • Elasticsearch: Encryption at rest enabled

Access control

IAM Roles and Policies:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:*:*:stream/anomaly-guardian-*"
    }
  ]
}

VPC and Network Security

  • Critical resources are deployed in private subnets
  • Security Groups follow least privilege access rules
  • Network ACLs (NACLs) are used for additional subnet-level control

Audit

CloudTrail integration

  • Logs all administrative actions performed within the environment.
  • Integrated with AWS Config for configuration tracking and compliance auditing.
  • Log retention configured for 7 years, supporting long-term audit requirements.

GDPR

  • Pseudonymization of personal data where applicable.
  • Right to be forgotten implementation included for sensitive datasets.
  • Clearly defined data retention policies in line with data protection laws.

Performance and Scalability

Understanding how the system performs under load and how it scales is key to ensuring it can handle real-world demands without degradation.

Performance Benchmarks

Maximum Throughput

  • API Gateway: Up to 10,000 RPS (with caching enabled)
  • Kinesis: Scales with 1,000 shards (1MB/s per shard)
  • Lambda: Supports 1,000 concurrent executions
  • SageMaker: Up to 1,000 inferences per second

Typical Latency

  • P50 (median): < 50ms (ingestion + initial processing)
  • P95: < 200ms
  • P99: < 500ms

Auto-Scaling Configuration

Horizontal Scaling

SageMaker:
  MinInstances: 1
  MaxInstances: 10
  TargetUtilization: 70%

Lambda:
  ReservedConcurrency: 100
  BurstConcurrency: 1000

Kinesis:
  AutoScaling: Enabled
  TargetUtilization: 70%

Vertical Scaling

  • SageMaker endpoints scale automatically based on inference load.
  • DynamoDB uses on-demand billing, scaling read/write capacity as needed.
  • Elasticsearch (OpenSearch) includes auto-scaling for storage.

Maintenance and Operations

Operational excellence isn’t just about automation; it also involves structured routines and a clear playbook for common issues.

Daily

  • Health metrics verification
  • Review of critical anomalies
  • Configuration backup procedures

Weekly

  • ML model updates
  • Review of false positives
  • Threshold optimization

Monthly

  • Trend analysis and anomaly patterns
  • Capacity planning
  • Disaster recovery testing

Troubleshooting common issues

High false positive rate

# Check current thresholds
aws dynamodb scan --table-name anomaly-rules

# Adjust sensitivity
aws dynamodb update-item \
  --table-name anomaly-rules \
  --key '{"ruleId": {"S": "velocity-check"}}' \
  --update-expression "SET threshold = :val" \
  --expression-attribute-values '{":val": {"N": "150"}}'

SageMaker Endpoint Down

# Check current endpoint status
aws sagemaker describe-endpoint --endpoint-name anomaly-detection-endpoint

# Restart if needed
aws sagemaker update-endpoint \
  --endpoint-name anomaly-detection-endpoint \
  --endpoint-config-name new-config

Kinesis Stream Throttling

# Check stream metrics
aws cloudwatch get-metric-statistics \
  --namespace AWS/Kinesis \
  --metric-name WriteProvisionedThroughputExceeded \
  --dimensions Name=StreamName,Value=event-stream \
  --start-time 2025-07-24T00:00:00Z \
  --end-time 2025-07-24T12:00:00Z \
  --period 300 \
  --statistics Sum

# Increase shard count if needed
aws kinesis update-shard-count \
  --stream-name event-stream \
  --target-shard-count 16 \
  --scaling-type UNIFORM_SCALING

Backup & Disaster Recovery

Backup Strategy

  • DynamoDB: Point-in-time recovery (PITR) enabled
  • S3: Versioning and cross-region replication configured
  • Code: Managed via Git with multiple remote repositories
  • Configuration: Daily backups of Parameter Store values

RTO / RPO Targets

  • RTO (Recovery Time Objective): Less than 4 hours
  • RPO (Recovery Point Objective): Less than 15 minutes

Disaster Recovery (DR) procedures

  1. Activate secondary region
  2. Restore DynamoDB backups
  3. Redeploy CloudFormation stacks
  4. Redirect traffic to the failover stack
  5. Perform full system validation

API Reference

Event Ingestion

POST /events
Main endpoint used for real-time event ingestion into the system.

Headers

Content-Type: application/json  
Authorization: AWS4-HMAC-SHA256 ...

Request Body

{
  "eventId": "unique-event-id",
  "userId": "user-123",
  "timestamp": "2025-07-24T10:30:00Z",
  "eventType": "transaction",
  "transactionAmount": 250.00,
  "requestsPerSecond": 5,
  "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
  "geoLocation": "US-East-1",
  "sessionLength": 1800,
  "ipAddress": "192.168.1.100",
  "metadata": {
    "productId": "prod-456",
    "category": "electronics",
    "paymentMethod": "credit_card"
  }
}

Response

{
  "status": "success",
  "message": "Event received",
  "eventId": "unique-event-id",
  "timestamp": "2025-07-24T10:30:01Z"
}

Error Responses

{
  "status": "error",
  "code": "INVALID_PAYLOAD",
  "message": "Missing required field: userId"
}
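
Since the endpoint expects SigV4 authorization, clients need to sign their requests. A minimal Python sketch, assuming the hostname used later in this post and standard AWS credentials available to the caller:

import json
import boto3
import requests
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest

endpoint = "https://api.anomaly-guardian.com/events"  # assumed hostname
body = json.dumps({
    "eventId": "unique-event-id",
    "userId": "user-123",
    "timestamp": "2025-07-24T10:30:00Z",
    "eventType": "transaction",
})

# Sign the request with SigV4 for the execute-api service
aws_request = AWSRequest(method="POST", url=endpoint, data=body,
                         headers={"Content-Type": "application/json"})
credentials = boto3.Session().get_credentials()
SigV4Auth(credentials, "execute-api", "us-east-1").add_auth(aws_request)

response = requests.post(endpoint, data=body, headers=dict(aws_request.headers))
print(response.status_code, response.json())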

Anomaly Query

GET /anomalies
Used to retrieve detected anomalies using optional filters.

Query Parameters

  • severity: CRITICAL, HIGH, MEDIUM, LOW
  • fromDate: ISO 8601 datetime
  • toDate: ISO 8601 datetime
  • userId: ID of a specific user
  • limit: Max number of results (default: 100)

Example Request

curl -X GET "https://api.anomaly-guardian.com/anomalies?severity=HIGH&limit=50" \
  -H "Authorization: Bearer <token>"

Response Example

{
  "anomalies": [
    {
      "anomalyId": "anom-789",
      "severity": "HIGH",
      "anomalyScore": 0.85,
      "detectedAt": "2025-07-24T10:25:00Z",
      "userId": "user-123",
      "anomalyType": "velocity",
      "description": "Unusual request rate detected",
      "originalEvent": { ... },
      "actionsPerformed": ["rate_limit", "enhanced_monitoring"]
    }
  ],
  "pagination": {
    "nextToken": "eyJ0aW1lc3RhbXAiOiIyMDI1...",
    "hasMore": true
  }
}

Rule management

Create Rule
POST /rules
Request Body

{
  "ruleName": "high-velocity-check",
  "ruleType": "velocity",
  "description": "Detect high request velocity",
  "conditions": {
    "metric": "requestsPerSecond",
    "operator": "greater_than",
    "threshold": 100,
    "timeWindow": "5m"
  },
  "actions": [
    {
      "type": "rate_limit",
      "parameters": {
        "maxRequestsPerMinute": 10,
        "duration": "1h"
      }
    },
    {
      "type": "notify",
      "parameters": {
        "channels": ["email", "slack"],
        "severity": "HIGH"
      }
    }
  ],
  "enabled": true
}

Update rule
PUT /rules/{ruleId}
Used to update an existing anomaly detection rule.

Delete rule
DELETE /rules/{ruleId}
Soft-deletes an existing rule: the rule is deactivated but remains stored.

Webhook Configuration

Register Webhook
POST /webhooks

Request Body

{
  "url": "https://your-system.com/webhooks/anomaly",
  "events": ["anomaly.detected", "anomaly.resolved"],
  "severityFilter": ["CRITICAL", "HIGH"],
  "secret": "webhook-secret-key",
  "retryPolicy": {
    "maxRetries": 3,
    "backoffMultiplier": 2
  }
}
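
The delivery format for webhook calls is not spelled out above; a common pattern is to sign the raw payload with HMAC-SHA256 using the shared secret. Here is a minimal receiver sketch under that assumption (the header name, route, and payload field are also assumptions):

import hashlib
import hmac
from flask import Flask, abort, request

app = Flask(__name__)
SECRET = b"webhook-secret-key"  # the secret registered above

@app.post("/webhooks/anomaly")
def anomaly_webhook():
    # Verify the assumed hex HMAC-SHA256 signature of the raw body
    signature = request.headers.get("X-Signature", "")
    expected = hmac.new(SECRET, request.get_data(), hashlib.sha256).hexdigest()
    if not hmac.compare_digest(signature, expected):
        abort(401)
    payload = request.get_json()
    if payload.get("event") == "anomaly.detected":
        pass  # route to your incident tooling here
    return {"status": "received"}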

Advanced Use Cases

  1. E-commerce Fraud Detection

Scenario: An online store with over 1 million transactions per day needs to detect fraudulent behavior in real time.

Specific configuration:

Anomaly Rules:
  - High-Value Transactions: >$5,000
  - Velocity Attacks: >50 attempts/minute
  - Geographic Anomalies: Multiple locations within <1 hour
  - Device Fingerprinting: Suspicious device changes

Response Actions:
  - Automatic Transaction Hold
  - Multi-factor Authentication Request
  - Manual Review Queue
  - Customer Communication

Success metrics:

  • 95% reduction in undetected fraud attempts
  • Detection time under 2 seconds
  • False positive rate below 0.1%

  2. API Security Monitoring

Scenario: Protect critical APIs against DDoS attacks and abusive behavior.

Specific configuration:

Protection Layers:
  - Rate Limiting: By IP, user, and endpoint
  - Behavior Analysis: Unusual usage patterns
  - Bot Detection: Based on user-agent and behavioral fingerprinting
  - Geo-blocking: Restrict access from high-risk countries

Auto-Response:
  - IP Blacklisting (temporary)
  - CAPTCHA Challenge
  - Traffic Throttling
  - Incident Creation

  3. IoT Device Monitoring

Scenario: Monitoring 100k+ IoT devices to detect anomalies, failures, and potential attacks.

Specific Configuration:

Sensor Metrics:
  - Temperature Anomalies
  - Connectivity Patterns
  - Power Consumption
  - Data Transmission Rates

ML Models:
  - Time Series Forecasting
  - Clustering for Device Behavior
  - Outlier Detection
  - Predictive Maintenance

Machine Learning details

This section outlines the models used for anomaly detection, including feature selection and detection logic.

  1. Isolation Forest

from sklearn.ensemble import IsolationForest

model = IsolationForest(
    contamination=0.1,      # 10% expected anomalies
    random_state=42,
    n_estimators=100,
    max_samples='auto'
)

# Selected Features
features = [
    'transaction_amount_normalized',
    'requests_per_second',
    'session_length',
    'hour_of_day',
    'day_of_week',
    'geo_distance_from_usual',
    'device_change_indicator'
]
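
# Typical usage (sketch): fit on historical vectors of the features above,
# then treat model.predict(x) == -1 as an outlier.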

  2. LSTM Autoencoder (For Time Series)

import numpy as np
import tensorflow as tf

timesteps, features = 30, 7  # example window length and feature count (assumed)

model = tf.keras.Sequential([
    tf.keras.layers.LSTM(50, return_sequences=True, input_shape=(timesteps, features)),
    tf.keras.layers.LSTM(30, return_sequences=False),
    tf.keras.layers.RepeatVector(timesteps),
    tf.keras.layers.LSTM(30, return_sequences=True),
    tf.keras.layers.LSTM(50, return_sequences=True),
    tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(features))
])

# Anomaly is detected when reconstruction error > threshold;
# reconstruction_errors holds the per-window reconstruction MSE
threshold = np.percentile(reconstruction_errors, 95)

  3. Statistical Process Control (SPC)

import pandas as pd  # `data` below is a pandas Series

def calculate_control_limits(data, window_size=30):
    """Calculates dynamic control limits based on rolling stats"""
    rolling_mean = data.rolling(window=window_size).mean()
    rolling_std = data.rolling(window=window_size).std()

    upper_limit = rolling_mean + 3 * rolling_std
    lower_limit = rolling_mean - 3 * rolling_std

    return upper_limit, lower_limit

def detect_anomaly(value, upper_limit, lower_limit):
    """Detects anomaly if value is outside control limits"""
    return value > upper_limit or value < lower_limit

Feature engineering pipeline

Applied Transformations

  1. Normalization: Z-score normalization for numerical features.
  2. Categorical Encoding: One-hot encoding for categorical variables.
  3. Time Features: Extracted features such as hour, day of week, and month.
  4. Aggregations: Moving averages, percentiles, counters.
  5. Geo Features: Distance from typical geolocation patterns.
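
A hedged pandas sketch of transformations 1, 3, and 4; the column names follow the event schema shown earlier and are otherwise assumptions.

import pandas as pd

def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    # 1. Normalization: z-score of the transaction amount
    amount = out["transactionAmount"]
    out["transaction_amount_normalized"] = (amount - amount.mean()) / amount.std()
    # 3. Time features from the event timestamp
    ts = pd.to_datetime(out["timestamp"])
    out["hour_of_day"] = ts.dt.hour
    out["day_of_week"] = ts.dt.dayofweek
    # 4. Aggregations: moving average of request rate per user
    out["requests_ma_5"] = (
        out.groupby("userId")["requestsPerSecond"]
           .transform(lambda s: s.rolling(5, min_periods=1).mean())
    )
    return out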

Feature Selection

from sklearn.feature_selection import SelectKBest, f_classif

# Select top 20 most relevant features
# (X: engineered feature matrix; y: labels from analyst-confirmed anomalies)
selector = SelectKBest(score_func=f_classif, k=20)
X_selected = selector.fit_transform(X, y)

# Indices of the automatically selected features
selected_features = selector.get_support(indices=True)

Model Training pipeline

Data Pipeline

Steps:
1. Data Extraction:
   - Source: Kinesis Data Streams
   - Format: JSON events
   - Volume: 1M+ events/day

2. Data Preprocessing:
   - Cleaning: Remove duplicates, handle missing values
   - Transformation: Apply feature engineering
   - Validation: Data quality checks

3. Feature Store:
   - Storage: SageMaker Feature Store
   - Serving: Real-time and batch
   - Monitoring: Feature drift detection

4. Model Training:
   - Framework: SageMaker Training Jobs
   - Algorithms: Multiple models ensemble
   - Validation: Cross-validation + holdout

5. Model Evaluation:
   - Metrics: Precision, Recall, F1, AUC
   - Business Metrics: Cost of false positives/negatives
   - A/B Testing: Champion/Challenger comparison

6. Model Deployment:
   - Staging: Canary deployment
   - Production: Blue/green deployment
   - Monitoring: Real-time performance tracking

Continuous Learning

Model Retraining Strategy

  • Scheduled Retraining: Weekly retraining with newly collected data.
  • Performance-Based: Triggered when evaluation metrics drop below acceptable thresholds.
  • Concept Drift Detection: Ongoing monitoring of data distribution and feature drift in real time.
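
The drift check in the last bullet can be as simple as a per-feature two-sample Kolmogorov-Smirnov test comparing live data against the training sample. A minimal sketch; the test choice and significance level are illustrative assumptions, not the project's documented method.

from scipy import stats

def detect_feature_drift(reference, current, alpha=0.01):
    """Compare each feature's live distribution (`current`) against the
    training distribution (`reference`); both are dicts of name -> array.
    A small p-value suggests that feature has drifted."""
    drifted = {}
    for name in reference:
        _, p_value = stats.ks_2samp(reference[name], current[name])
        if p_value < alpha:
            drifted[name] = p_value
    return drifted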

Feedback Loop

Incorporating analyst feedback into model updates allows the system to continuously learn from real-world validation, improving accuracy over time.

def update_model_with_feedback(feedback_data):
    """Update model with analyst feedback"""

    # Combine analyst labels with original features
    labeled_data = combine_feedback_with_features(feedback_data)

    # Incremental model retraining
    model.partial_fit(labeled_data.features, labeled_data.labels)

    # Validate new performance
    new_performance = evaluate_model(model, validation_set)

    if new_performance > current_performance:
        deploy_model(model)
        log_model_update(new_performance)

Reports & Analytics

Executive Dashboard

Key KPIs:

  • Security Posture Score: 0–100 based on multiple metrics.
  • Threat Detection Rate: % of threats detected vs. total.
  • False Positive Rate: % of alerts that were false positives.
  • Mean Time to Detection: Average time to detect anomalies.

Visualizations:

{
  "widgets": [
    {
      "type": "scorecard",
      "title": "Security Score",
      "value": 94,
      "trend": "+2% from last week"
    },
    {
      "type": "timeseries",
      "title": "Anomalies Over Time",
      "data": "hourly_anomaly_counts"
    },
    {
      "type": "heatmap",
      "title": "Anomaly Types by Hour",
      "data": "anomaly_type_hour_matrix"
    },
    {
      "type": "geographic",
      "title": "Geographic Distribution",
      "data": "anomalies_by_location"
    }
  ]
}

Automated reports

Daily Report:

def generate_daily_report():
    report = {
        "date": today(),
        "summary": {
            "total_events_processed": get_event_count(),
            "anomalies_detected": get_anomaly_count(),
            "actions_performed": get_action_count(),
            "system_performance": get_performance_metrics()
        },
        "top_anomalies": get_top_anomalies(limit=10),
        "system_health": get_health_status(),
        "recommendations": generate_recommendations()
    }

    send_report(report, recipients=["security-team@company.com"])
    store_report(report, s3_bucket="reports-bucket")

Weekly report:

  • Trend analysis
  • ML model performance
  • Threat evolution
  • Adjustment recommendations

Monthly report:

  • Solution ROI
  • Industry benchmarks
  • Capacity planning
  • Improvement roadmap

CI/CD Pipeline

Infrastructure as Code

GitOps Workflow:

# .github/workflows/deploy.yml
name: Deploy Anomaly Guardian

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:

  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Validate CloudFormation
        run: |
          aws cloudformation validate-template \
            --template-body file://anomaly-guardian-base.yaml
          aws cloudformation validate-template \
            --template-body file://anomaly-guardian-advanced.yaml

  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Run Tests
        run: |
          python -m pytest tests/
          python -m pytest integration_tests/

  security-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Security Scan
        run: |
          checkov -f anomaly-guardian-base.yaml
          cfn-lint anomaly-guardian-advanced.yaml

  deploy-staging:
    needs: [validate, test, security-scan]
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Staging
        run: ./deploy.sh deploy
        env:
          ENVIRONMENT: staging

  integration-tests:
    needs: deploy-staging
    runs-on: ubuntu-latest
    steps:
      - name: Run Integration Tests
        run: ./deploy.sh test
        env:
          ENVIRONMENT: staging

  deploy-production:
    needs: integration-tests
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Production
        run: ./deploy.sh deploy
        env:
          ENVIRONMENT: prod

Testing Strategy

Unit Tests

import pytest
from anomaly_detector import AnomalyDetector

class TestAnomalyDetection:
    def setup_method(self):
        self.detector = AnomalyDetector()

    def test_velocity_anomaly_detection(self):
        event = {
            "userId": "test-user",
            "requestsPerSecond": 150
        }
        result = self.detector.detect(event)
        assert result.is_anomaly is True
        assert result.anomaly_type == "velocity"

    def test_normal_behavior(self):
        event = {
            "userId": "test-user",
            "requestsPerSecond": 10
        }
        result = self.detector.detect(event)
        assert result.is_anomaly is False

Integration Tests

import boto3
import json
import time

def test_end_to_end_anomaly_detection():
    # Send an anomalous event
    kinesis = boto3.client("kinesis")
    event = create_anomalous_event()

    kinesis.put_record(
        StreamName="anomaly-guardian-test-event-stream",
        Data=json.dumps(event),
        PartitionKey=event["userId"]
    )

    # Wait for processing
    time.sleep(90)

    # Verify if anomaly was stored
    dynamodb = boto3.resource("dynamodb")
    table = dynamodb.Table("anomaly-guardian-test-anomaly-history")

    response = table.get_item(
        Key={"anomalyId": event["eventId"]}
    )

    assert "Item" in response
    assert response["Item"]["severity"] == "HIGH"

Performance Tests

import concurrent.futures
import time

import requests

# Assumes API_ENDPOINT and test_event are defined in the test fixtures

def test_throughput():
    """Test maximum system throughput"""

    def send_event():
        # Send event via API Gateway
        response = requests.post(API_ENDPOINT, json=test_event)
        return response.status_code == 200

    # Run test with 1000 concurrent events
    with concurrent.futures.ThreadPoolExecutor(max_workers=1000) as executor:
        futures = [executor.submit(send_event) for _ in range(1000)]
        results = [r.result() for r in futures]

    success_rate = sum(results) / len(results)
    assert success_rate >= 0.99  # 99% success rate

Cost Estimation - Production Environment (2M events/day)

  1. Compute

    AWS Lambda: Responsible for processing incoming events. Estimated cost is $45/month, considering invocation volume and average execution time.

    SageMaker Endpoint: Hosts real-time ML inference. Due to the need for a persistent endpoint, the cost is relatively higher, around $180/month.

    Step Functions: Manages orchestration between different stages of the processing flow. The estimated cost is $25/month.

  2. Storage

    DynamoDB: Used to store anomaly history and related metadata. With high read/write throughput, cost is estimated at $120/month.

    S3 (Simple Storage Service): Stores logs, backups, and reports. Assuming 1 TB of monthly traffic, storage costs are around $35/month.

    Elasticsearch (OpenSearch): Powers fast search and analytics on event data. With high availability enabled, estimated cost is $200/month.

  3. Streaming

    Kinesis Data Streams: Handles real-time event ingestion. Based on provisioned throughput, the cost is $150/month.

    Kinesis Data Analytics: Runs real-time SQL queries on incoming streams. This service contributes approximately $100/month.

  4. Networking

    API Gateway: Serves HTTP endpoints to receive and route events. For high-volume access, the estimated monthly cost is $35.

    Data Transfer: Covers outbound data (egress) from AWS. Estimated cost is $65/month, assuming standard internet traffic volumes.

Total Monthly Estimate: US$955/month

Cost Optimization Strategies:

  1. Reserved Instances: 40% savings on SageMaker

# Purchase Reserved Instance for SageMaker
aws sagemaker put-reserved-capacity \
  --reserved-capacity-offering-id "offering-123" \
  --instance-count 2

  2. DynamoDB On-Demand: Savings for variable workloads

BillingMode: PAY_PER_REQUEST  # vs PROVISIONED

  3. S3 Intelligent Tiering: Automatic storage cost savings

StorageClass: INTELLIGENT_TIERING

  4. Lambda Provisioned Concurrency: Optimization for cold starts

ProvisionedConcurrency: 10  # For critical functions only

Cost Monitoring

Budget alerts

MonthlyBudget:
  Amount: 1000
  Unit: USD
  Alerts:
    - Threshold: 80%
      Type: ACTUAL
    - Threshold: 100%
      Type: FORECASTED

Cost Anomaly Detection:

import boto3

def detect_cost_anomalies():
    """Detect anomalies in AWS costs"""
    ce = boto3.client("ce")

    response = ce.get_anomalies(
        DateInterval={
            "StartDate": "2025-07-01",
            "EndDate": "2025-07-24"
        }
    )

    for anomaly in response["Anomalies"]:
        if anomaly["Impact"]["TotalImpact"] > 100:  # Trigger alert
            send_cost_alert(anomaly)

Advanced Security Features

Encryption Everywhere

Data Classification:

Sensitive Data:
  - User IDs: Pseudonymized
  - IP Addresses: Hashed
  - Transaction Amounts: Encrypted
  - Geographic Data: Generalized

Public Data:
  - Aggregate Statistics
  - System Performance Metrics
  - Non-PII Event Metadata

Key Management:

KMS Keys:
  Application Key:
    Description: "Anomaly Guardian Application Encryption"
    KeyRotation: Enabled
    Aliases: ["alias/anomaly-guardian-app"]

  Database Key:
    Description: "DynamoDB Encryption Key"
    KeyRotation: Enabled
    Aliases: ["alias/anomaly-guardian-db"]

Network Security

VPC Configuration:

Network Architecture:
  VPC: 10.0.0.0/16

Public Subnets:
  - 10.0.101.0/24 (AZ-1)
  - 10.0.102.0/24 (AZ-2)

Private Subnets:
  - 10.0.1.0/24 (AZ-1)
  - 10.0.2.0/24 (AZ-2)

Database Subnets:
  - 10.0.201.0/24 (AZ-1)
  - 10.0.202.0/24 (AZ-2)

Security Groups:

Lambda Security Group:
  Ingress:
    - HTTPS (443) from anywhere
  Egress:
    - DynamoDB VPC endpoint
    - Elasticsearch cluster

API Gateway Security Group:
  Ingress:
    - HTTPS (443) from anywhere
  Egress: None (managed service)

Elasticsearch Security Group:
  Ingress:
    - Port 443 from Lambda SG
    - Port 9200 from Lambda SG
  Egress: None

WAF Protection:

WAF Rules:
  - Name: "AWSManagedRulesCommonRuleSet"
    Priority: 1
    Statement:
      ManagedRuleGroupStatement:
        VendorName: "AWS"
        Name: "AWSManagedRulesCommonRuleSet"

  - Name: "RateLimitRule"
    Priority: 2
    Statement:
      RateBasedStatement:
        Limit: 2000
        AggregateKeyType: "IP"

When dealing with highly complex environments and high data volumes, the response time between identifying an anomaly and taking action is crucial. More than just a functional architecture, what is presented here is an approach focused on operational reliability, processing scalability, and cost control.

Each component has been integrated with a purpose: to accurately detect deviations, trigger automated responses, and maintain active governance at all levels, from security to resource allocation. This type of solution becomes truly effective when aligned with the organization's technical routine and business objectives. It is not just about technology, but about delivering predictability in scenarios where failures are costly.

Thank you, see you next time.

Top comments (2)

Jason Dunn [AWS]: Wow, this is so detailed! Excellent work.

Carlos Filho: Thank you a lot, @jasondunn! The primary reason is to expand my capabilities and explore new solutions using AWS.