Shoaibali Mir

Secure ML on AWS: Building Production Data Pipelines with S3 and Lambda

Reading time: ~20-25 minutes

Level: Intermediate

Prerequisites: Basic AWS knowledge, familiarity with S3 and Lambda

Series: Part 2 of 4 - Read Part 1

Objective: This blog focuses on teaching core concepts and best practices for building data pipelines on AWS. The code examples are functional for learning purposes. Production deployments require additional hardening, testing, and organization-specific configuration.


Welcome Back!

In Part 1, we introduced the complete AI/ML Development Lifecycle (AIDLC) framework. This series implements the technical phases:

Complete AIDLC → Series Mapping:

Part 2:

  • Phase 1: Data Collection & Preparation
  • Phase 6: Governance (CloudTrail, KMS, IAM)

Part 3:

  • Phase 2: Model Development & Training
  • Phase 3: Model Evaluation (validation metrics)
  • Phase 6: Governance (experiment tracking, versioning)

Part 4:

  • Phase 4: Model Deployment (SageMaker endpoints)
  • Phase 5: Monitoring & Maintenance (drift detection)
  • Phase 6: Governance (CI/CD, audit logs)

Key Insight: Phase 6 (Governance & Compliance) isn't a separate step—it's embedded into every phase through security practices, audit logging, and compliance controls.

What you'll build today:

  • Encrypted S3 data lake with proper access controls
  • Automated data validation pipeline with Lambda
  • Data quality monitoring and alerting
  • Complete audit trail with CloudTrail
  • Infrastructure as Code with Terraform
  • Data splitting for ML training

By the end: You'll have a functional data pipeline that demonstrates security and quality best practices.


Table of Contents

  1. The Data Pipeline Problem
  2. Architecture Overview
  3. Step 1: Setting Up Encrypted S3 Buckets
  4. Step 2: Data Validation Lambda Function
  5. Step 3: Lambda Infrastructure
  6. Step 4: CloudTrail for Audit Logging
  7. Step 5: CloudWatch Monitoring
  8. Step 6: Testing the Pipeline
  9. Step 6.5: Preparing Data for Training
  10. Step 7: Advanced Features
  11. Step 7.5: Testing Best Practices
  12. Important Notes Before Deploying
  13. Security Best Practices Checklist
  14. Cost Breakdown
  15. Troubleshooting Guide
  16. What's Next?
  17. Key Takeaways

The Data Pipeline Problem

You can't build reliable ML models on unreliable data.

Common issues include:

  • Inconsistent data formats: training fails when schemas change
  • Missing security controls: sensitive data gets exposed
  • No data validation: bad data silently corrupts models
  • Manual processes: don't scale and are error-prone
  • No audit trail: data lineage can't be tracked

The solution:

An automated, secure, validated data pipeline.


Architecture Overview

Here's the AIDLC Phase 1 data pipeline architecture:

[Architecture diagram: Data Pipeline]

Architecture Note: This implements the "Data Collection & Preparation" phase of the AIDLC framework, establishing the secure foundation for model training (Phase 2) and deployment (Phase 4).

AWS Services & Tools Used:

  • S3: Encrypted data storage with versioning
  • Lambda: Serverless data validation
  • KMS: Encryption key management
  • CloudTrail: Audit logging
  • CloudWatch: Monitoring and alerting
  • SNS: Notifications
  • Terraform: Infrastructure as Code

Step 1: Setting Up Encrypted S3 Buckets

Why Three Buckets?

Separation of concerns improves security and organization:

  1. Raw Data Bucket - Unvalidated data from sources
  2. Validated Data Bucket - Quality-checked, ready for training
  3. Model Artifacts Bucket - Trained models and metadata

Note: The complete S3 configuration is shown below. For production deployments, consider splitting configuration into modules for better organization and reusability.

Infrastructure Code

Create terraform/s3-buckets.tf:

# KMS key for encryption
resource "aws_kms_key" "data_encryption" {
  description             = "Encryption key for ML data pipeline"
  deletion_window_in_days = 10
  enable_key_rotation     = true

  tags = {
    Name        = "ml-data-encryption-key"
    Environment = var.environment
    Purpose     = "ML-DataPipeline"
  }
}

resource "aws_kms_alias" "data_encryption" {
  name          = "alias/ml-data-encryption"
  target_key_id = aws_kms_key.data_encryption.key_id
}

# Raw data bucket
resource "aws_s3_bucket" "raw_data" {
  bucket = "${var.project_name}-raw-data-${var.environment}-${data.aws_caller_identity.current.account_id}"

  tags = {
    Name        = "ML Raw Data"
    Environment = var.environment
    DataStage   = "Raw"
  }
}

# Enable versioning
resource "aws_s3_bucket_versioning" "raw_data" {
  bucket = aws_s3_bucket.raw_data.id

  versioning_configuration {
    status = "Enabled"
  }
}

# Enable encryption
resource "aws_s3_bucket_server_side_encryption_configuration" "raw_data" {
  bucket = aws_s3_bucket.raw_data.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm     = "aws:kms"
      kms_master_key_id = aws_kms_key.data_encryption.arn
    }
    bucket_key_enabled = true
  }
}

# Block all public access
resource "aws_s3_bucket_public_access_block" "raw_data" {
  bucket = aws_s3_bucket.raw_data.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

# Enforce encryption and secure transport
resource "aws_s3_bucket_policy" "raw_data" {
  bucket = aws_s3_bucket.raw_data.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "DenyUnencryptedObjectUploads"
        Effect = "Deny"
        Principal = "*"
        Action = "s3:PutObject"
        Resource = "${aws_s3_bucket.raw_data.arn}/*"
        Condition = {
          StringNotEquals = {
            "s3:x-amz-server-side-encryption" = "aws:kms"
          }
        }
      },
      {
        Sid    = "DenyInsecureTransport"
        Effect = "Deny"
        Principal = "*"
        Action = "s3:*"
        Resource = [
          aws_s3_bucket.raw_data.arn,
          "${aws_s3_bucket.raw_data.arn}/*"
        ]
        Condition = {
          Bool = {
            "aws:SecureTransport" = "false"
          }
        }
      }
    ]
  })
}

# Lifecycle policy to manage old versions
resource "aws_s3_bucket_lifecycle_configuration" "raw_data" {
  bucket = aws_s3_bucket.raw_data.id

  rule {
    id     = "delete-old-versions"
    status = "Enabled"

    # Empty filter applies the rule to every object in the bucket
    filter {}

    noncurrent_version_expiration {
      noncurrent_days = 90
    }
  }

  rule {
    id     = "transition-to-glacier"
    status = "Enabled"

    filter {}

    transition {
      days          = 30
      storage_class = "GLACIER"
    }
  }
}

# Validated data bucket
resource "aws_s3_bucket" "validated_data" {
  bucket = "${var.project_name}-validated-data-${var.environment}-${data.aws_caller_identity.current.account_id}"

  tags = {
    Name        = "ML Validated Data"
    Environment = var.environment
    DataStage   = "Validated"
  }
}

resource "aws_s3_bucket_versioning" "validated_data" {
  bucket = aws_s3_bucket.validated_data.id

  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "validated_data" {
  bucket = aws_s3_bucket.validated_data.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm     = "aws:kms"
      kms_master_key_id = aws_kms_key.data_encryption.arn
    }
    bucket_key_enabled = true
  }
}

resource "aws_s3_bucket_public_access_block" "validated_data" {
  bucket = aws_s3_bucket.validated_data.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

# Apply same security policies to validated bucket
resource "aws_s3_bucket_policy" "validated_data" {
  bucket = aws_s3_bucket.validated_data.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "DenyUnencryptedObjectUploads"
        Effect = "Deny"
        Principal = "*"
        Action = "s3:PutObject"
        Resource = "${aws_s3_bucket.validated_data.arn}/*"
        Condition = {
          StringNotEquals = {
            "s3:x-amz-server-side-encryption" = "aws:kms"
          }
        }
      },
      {
        Sid    = "DenyInsecureTransport"
        Effect = "Deny"
        Principal = "*"
        Action = "s3:*"
        Resource = [
          aws_s3_bucket.validated_data.arn,
          "${aws_s3_bucket.validated_data.arn}/*"
        ]
        Condition = {
          Bool = {
            "aws:SecureTransport" = "false"
          }
        }
      }
    ]
  })
}

# Model artifacts bucket
resource "aws_s3_bucket" "model_artifacts" {
  bucket = "${var.project_name}-model-artifacts-${var.environment}-${data.aws_caller_identity.current.account_id}"

  tags = {
    Name        = "ML Model Artifacts"
    Environment = var.environment
    DataStage   = "Models"
  }
}

resource "aws_s3_bucket_versioning" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm     = "aws:kms"
      kms_master_key_id = aws_kms_key.data_encryption.arn
    }
    bucket_key_enabled = true
  }
}

resource "aws_s3_bucket_public_access_block" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

resource "aws_s3_bucket_policy" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "DenyUnencryptedObjectUploads"
        Effect = "Deny"
        Principal = "*"
        Action = "s3:PutObject"
        Resource = "${aws_s3_bucket.model_artifacts.arn}/*"
        Condition = {
          StringNotEquals = {
            "s3:x-amz-server-side-encryption" = "aws:kms"
          }
        }
      },
      {
        Sid    = "DenyInsecureTransport"
        Effect = "Deny"
        Principal = "*"
        Action = "s3:*"
        Resource = [
          aws_s3_bucket.model_artifacts.arn,
          "${aws_s3_bucket.model_artifacts.arn}/*"
        ]
        Condition = {
          Bool = {
            "aws:SecureTransport" = "false"
          }
        }
      }
    ]
  })
}
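Once the buckets exist, it's worth confirming the hardening actually applied. Here's a minimal boto3 sketch (the bucket names are assumptions based on the Terraform naming pattern above, with the default project_name ml-pipeline and a dev environment):

import boto3

s3 = boto3.client("s3")
account_id = boto3.client("sts").get_caller_identity()["Account"]

# Assumed names following the Terraform naming pattern above
buckets = [
    f"ml-pipeline-{stage}-dev-{account_id}"
    for stage in ("raw-data", "validated-data", "model-artifacts")
]

for bucket in buckets:
    enc = s3.get_bucket_encryption(Bucket=bucket)
    algo = enc["ServerSideEncryptionConfiguration"]["Rules"][0][
        "ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"]

    pab = s3.get_public_access_block(Bucket=bucket)["PublicAccessBlockConfiguration"]
    versioning = s3.get_bucket_versioning(Bucket=bucket).get("Status")

    print(f"{bucket}: encryption={algo}, versioning={versioning}, "
          f"all_public_access_blocked={all(pab.values())}")

Each bucket should report aws:kms encryption, Enabled versioning, and all four public-access blocks set to true.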

Variables File

Create terraform/variables.tf:

variable "project_name" {
  description = "Project name prefix"
  type        = string
  default     = "ml-pipeline"
}

variable "environment" {
  description = "Environment (dev, staging, prod)"
  type        = string
  default     = "dev"
}

variable "aws_region" {
  description = "AWS region"
  type        = string
  default     = "ap-south-1"
}

variable "notification_email" {
  description = "Email for SNS notifications"
  type        = string
}

data "aws_caller_identity" "current" {}

Provider Configuration

Create terraform/providers.tf:

terraform {
  required_version = ">= 1.0"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }

  # For production, use remote state
  # backend "s3" {
  #   bucket         = "your-terraform-state-bucket"
  #   key            = "ml-pipeline/terraform.tfstate"
  #   region         = "ap-south-1"
  #   dynamodb_table = "terraform-state-lock"
  #   encrypt        = true
  # }
}

provider "aws" {
  region = var.aws_region

  default_tags {
    tags = {
      Project    = var.project_name
      ManagedBy  = "Terraform"
    }
  }
}

Deploy the Infrastructure

cd terraform

# Initialize Terraform
terraform init

# Review the plan
terraform plan -var="notification_email=your-email@example.com"

# Apply the configuration
terraform apply -var="notification_email=your-email@example.com"

Step 2: Data Validation Lambda Function

Why Validate Data?

Prevent garbage in, garbage out:

  • Catch schema changes early
  • Detect data quality issues
  • Ensure consistency
  • Create audit trail

Important: After deploying, check your email and confirm the SNS subscription to receive notifications.

Validation Logic

Create lambda/data-validation/handler.py:

import json
import os
import boto3
import logging
import pandas as pd
from io import BytesIO
import hashlib
from datetime import datetime
from urllib.parse import unquote_plus

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# AWS clients
s3_client = boto3.client('s3')
sns_client = boto3.client('sns')

# Validation rules
VALIDATION_RULES = {
    'required_columns': ['timestamp', 'feature_1', 'feature_2', 'target'],
    'numeric_columns': ['feature_1', 'feature_2', 'target'],
    'max_null_percentage': 0.05,  # 5% max
    'min_rows': 5,  # Adjusted for testing - use 100+ for production
    'max_rows': 1000000,
    'date_columns': ['timestamp']
}

def calculate_checksum(data: bytes) -> str:
    """Calculate SHA256 checksum"""
    return hashlib.sha256(data).hexdigest()

def validate_schema(df: pd.DataFrame) -> dict:
    """Validate DataFrame schema"""
    issues = []

    # Check required columns
    missing_cols = set(VALIDATION_RULES['required_columns']) - set(df.columns)
    if missing_cols:
        issues.append(f"Missing columns: {missing_cols}")

    # Check numeric columns
    for col in VALIDATION_RULES['numeric_columns']:
        if col in df.columns:
            if not pd.api.types.is_numeric_dtype(df[col]):
                issues.append(f"Column '{col}' should be numeric")

    # Check date columns
    for col in VALIDATION_RULES['date_columns']:
        if col in df.columns:
            try:
                # Actually convert to catch invalid dates
                df[col] = pd.to_datetime(df[col])
            except Exception as e:
                issues.append(f"Column '{col}' has invalid datetime values: {str(e)}")

    return {
        'valid': len(issues) == 0,
        'issues': issues
    }

def validate_data_quality(df: pd.DataFrame) -> dict:
    """Validate data quality"""
    issues = []

    # Check row count
    if len(df) < VALIDATION_RULES['min_rows']:
        issues.append(
            f"Insufficient rows: {len(df)} < {VALIDATION_RULES['min_rows']}"
        )
    elif len(df) > VALIDATION_RULES['max_rows']:
        issues.append(
            f"Too many rows: {len(df)} > {VALIDATION_RULES['max_rows']}"
        )

    # Check null values
    for col in df.columns:
        null_pct = df[col].isnull().sum() / len(df)
        if null_pct > VALIDATION_RULES['max_null_percentage']:
            issues.append(
                f"Column '{col}' has {null_pct:.2%} nulls "
                f"(max: {VALIDATION_RULES['max_null_percentage']:.2%})"
            )

    # Check duplicates (cast to int so the value is JSON-serializable later)
    duplicate_count = int(df.duplicated().sum())
    if duplicate_count > 0:
        issues.append(f"Found {duplicate_count} duplicate rows")

    # Data distribution checks
    stats = {}
    for col in VALIDATION_RULES['numeric_columns']:
        if col in df.columns:
            stats[col] = {
                'mean': float(df[col].mean()),
                'std': float(df[col].std()),
                'min': float(df[col].min()),
                'max': float(df[col].max()),
                'null_count': int(df[col].isnull().sum())
            }

    return {
        'valid': len(issues) == 0,
        'issues': issues,
        'stats': {
            'row_count': len(df),
            'column_count': len(df.columns),
            'duplicate_count': duplicate_count,
            'column_stats': stats
        }
    }

def send_notification(topic_arn: str, subject: str, message: str):
    """Send SNS notification"""
    try:
        sns_client.publish(
            TopicArn=topic_arn,
            Subject=subject,
            Message=message
        )
        logger.info(f"Sent notification: {subject}")
    except Exception as e:
        logger.error(f"Failed to send notification: {e}")

def lambda_handler(event, context):
    """
    Lambda handler triggered by S3 upload
    """
    bucket = key = 'unknown'  # defaults so the error handler can reference them

    try:
        # Parse S3 event (object keys arrive URL-encoded)
        record = event['Records'][0]
        bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])

        logger.info(f"Processing: s3://{bucket}/{key}")

        # Download file
        response = s3_client.get_object(Bucket=bucket, Key=key)
        file_content = response['Body'].read()

        # Calculate checksum
        checksum = calculate_checksum(file_content)

        # Load data
        df = pd.read_csv(BytesIO(file_content))
        logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns")

        # Run validations
        schema_result = validate_schema(df)
        quality_result = validate_data_quality(df)

        # Compile results
        validation_result = {
            'file': f"s3://{bucket}/{key}",
            'timestamp': datetime.utcnow().isoformat(),
            'checksum': checksum,
            'schema_validation': schema_result,
            'quality_validation': quality_result,
            'status': (
                'PASSED' if schema_result['valid'] and quality_result['valid']
                else 'FAILED'
            )
        }

        logger.info(f"Validation status: {validation_result['status']}")

        # Save validation report
        report_key = key.replace('raw/', 'reports/').replace('.csv', '_report.json')
        s3_client.put_object(
            Bucket=bucket,
            Key=report_key,
            Body=json.dumps(validation_result, indent=2),
            ServerSideEncryption='aws:kms',
            SSEKMSKeyId=os.environ.get('KMS_KEY_ID')
        )

        # If passed, copy to validated bucket
        if validation_result['status'] == 'PASSED':
            validated_bucket = bucket.replace('raw-data', 'validated-data')
            validated_key = key.replace('raw/', 'validated/')

            # Explicitly set encryption on copy
            s3_client.copy_object(
                CopySource={'Bucket': bucket, 'Key': key},
                Bucket=validated_bucket,
                Key=validated_key,
                ServerSideEncryption='aws:kms',
                SSEKMSKeyId=os.environ.get('KMS_KEY_ID')
            )

            logger.info(f"Copied to: s3://{validated_bucket}/{validated_key}")

            # Send success notification
            send_notification(
                os.environ['SNS_TOPIC_ARN'],
                f'Data Validation Passed: {key}',
                f"File validated successfully\n\n"
                f"Stats:\n"
                f"- Rows: {quality_result['stats']['row_count']}\n"
                f"- Columns: {quality_result['stats']['column_count']}\n"
                f"- Duplicates: {quality_result['stats']['duplicate_count']}\n\n"
                f"Validation report: s3://{bucket}/{report_key}"
            )
        else:
            # Send failure notification
            all_issues = schema_result['issues'] + quality_result['issues']
            send_notification(
                os.environ['SNS_TOPIC_ARN'],
                f'Data Validation Failed: {key}',
                f"Validation issues found:\n\n" + "\n".join(f"- {issue}" for issue in all_issues)
            )

        return {
            'statusCode': 200,
            'body': json.dumps(validation_result)
        }

    except Exception as e:
        logger.error(f"Error: {e}", exc_info=True)

        send_notification(
            os.environ.get('SNS_TOPIC_ARN', ''),
            f'Data Validation Error',
            f"Error processing {key}:\n{str(e)}"
        )

        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
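To sanity-check the handler logic before packaging it, you can feed it a hand-built S3 event. The sketch below mirrors only the fields the handler actually reads (the bucket name and key are placeholders); it still needs AWS credentials plus the SNS_TOPIC_ARN and KMS_KEY_ID environment variables, since the handler talks to S3 and SNS.

# local_invoke.py - rough local smoke test for handler.lambda_handler
# Run from lambda/data-validation/ with AWS credentials and env vars set.
import handler

# Minimal S3 ObjectCreated event containing only the fields the handler reads
fake_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "ml-pipeline-raw-data-dev-123456789012"},  # placeholder
                "object": {"key": "raw/sample.csv"},
            }
        }
    ]
}

result = handler.lambda_handler(fake_event, context=None)
print(result["statusCode"], result["body"][:200])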

Lambda Dependencies

Create lambda/data-validation/requirements.txt:

pandas==2.1.0
boto3==1.28.85
scikit-learn==1.3.0

Package Lambda Function

Important Note on Lambda Packaging: The pandas library includes platform-specific C extensions. Building on macOS or Windows will create a package incompatible with Lambda's Amazon Linux 2 runtime. Use one of these approaches:

Option 1: Docker Build (Recommended)

cd lambda/data-validation

# Build using Docker with Lambda runtime
docker run --rm --entrypoint bash -v "$PWD":/var/task \
  public.ecr.aws/lambda/python:3.11 \
  -c "pip install -r requirements.txt -t package/ && cp handler.py package/"

# Create deployment package
cd package
zip -r ../function.zip .
cd ..

Option 2: Use Lambda Layer

# Skip pandas installation and use AWS-managed layer
# Update terraform/lambda.tf to include:
# layers = ["arn:aws:lambda:ap-south-1:336392948345:layer:AWSSDKPandas-Python311:12"]

Option 3: EC2/Cloud9 Build

# Build on an Amazon Linux 2 instance
pip install -r requirements.txt -t package/
cp handler.py package/
cd package && zip -r ../function.zip . && cd ..

Step 3: Lambda Infrastructure

Create terraform/lambda.tf:

# IAM role for Lambda
resource "aws_iam_role" "data_validation" {
  name = "${var.project_name}-data-validation-lambda"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "lambda.amazonaws.com"
      }
    }]
  })
}

# IAM policy for Lambda
resource "aws_iam_role_policy" "data_validation" {
  name = "${var.project_name}-data-validation-policy"
  role = aws_iam_role.data_validation.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:PutObject"
        ]
        Resource = [
          "${aws_s3_bucket.raw_data.arn}/*",
          "${aws_s3_bucket.validated_data.arn}/*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "s3:ListBucket"
        ]
        Resource = [
          aws_s3_bucket.raw_data.arn,
          aws_s3_bucket.validated_data.arn
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "kms:Decrypt",
          "kms:GenerateDataKey"
        ]
        Resource = aws_kms_key.data_encryption.arn
      },
      {
        Effect = "Allow"
        Action = "sns:Publish"
        Resource = aws_sns_topic.validation_notifications.arn
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      }
    ]
  })
}

# SNS topic for notifications
resource "aws_sns_topic" "validation_notifications" {
  name = "${var.project_name}-validation-notifications"

  kms_master_key_id = aws_kms_key.data_encryption.id
}

resource "aws_sns_topic_subscription" "email" {
  topic_arn = aws_sns_topic.validation_notifications.arn
  protocol  = "email"
  endpoint  = var.notification_email
}

# Lambda function
resource "aws_lambda_function" "data_validation" {
  filename         = "${path.module}/../lambda/data-validation/function.zip"
  function_name    = "${var.project_name}-data-validation"
  role            = aws_iam_role.data_validation.arn
  handler         = "handler.lambda_handler"
  runtime         = "python3.11"
  timeout         = 300
  memory_size     = 1024
  source_code_hash = filebase64sha256("${path.module}/../lambda/data-validation/function.zip")

  environment {
    variables = {
      SNS_TOPIC_ARN = aws_sns_topic.validation_notifications.arn
      KMS_KEY_ID    = aws_kms_key.data_encryption.id
    }
  }

  # Optional: Use AWS-managed pandas layer instead of packaging it
  # layers = ["arn:aws:lambda:${var.aws_region}:336392948345:layer:AWSSDKPandas-Python311:12"]

  tags = {
    Name        = "Data Validation Lambda"
    Environment = var.environment
  }
}

# S3 trigger
resource "aws_s3_bucket_notification" "raw_data_trigger" {
  bucket = aws_s3_bucket.raw_data.id

  lambda_function {
    lambda_function_arn = aws_lambda_function.data_validation.arn
    events              = ["s3:ObjectCreated:*"]
    filter_prefix       = "raw/"
    filter_suffix       = ".csv"
  }

  depends_on = [
    aws_lambda_permission.allow_s3,
    aws_lambda_function.data_validation
  ]
}

resource "aws_lambda_permission" "allow_s3" {
  statement_id  = "AllowS3Invoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.data_validation.function_name
  principal     = "s3.amazonaws.com"
  source_arn    = aws_s3_bucket.raw_data.arn
}

# CloudWatch Log Group
resource "aws_cloudwatch_log_group" "data_validation" {
  name              = "/aws/lambda/${aws_lambda_function.data_validation.function_name}"
  retention_in_days = 30
  kms_key_id        = aws_kms_key.data_encryption.arn
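  # Note: for CloudWatch Logs to use this customer-managed key, the KMS key policy
  # must also grant the logs service principal (logs.<region>.amazonaws.com)
  # permission to use it; otherwise creating the encrypted log group will typically fail.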
}
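After terraform apply, you can confirm the S3 → Lambda trigger actually registered before uploading anything. A quick boto3 check (the bucket name is an assumption following the naming pattern above):

import boto3

s3 = boto3.client("s3")
account_id = boto3.client("sts").get_caller_identity()["Account"]
bucket = f"ml-pipeline-raw-data-dev-{account_id}"  # assumes default project_name/environment

config = s3.get_bucket_notification_configuration(Bucket=bucket)

for cfg in config.get("LambdaFunctionConfigurations", []):
    print("Lambda ARN :", cfg["LambdaFunctionArn"])
    print("Events     :", cfg["Events"])
    # Prefix/suffix filters defined in aws_s3_bucket_notification
    for rule in cfg.get("Filter", {}).get("Key", {}).get("FilterRules", []):
        print(f"Filter     : {rule['Name']} = {rule['Value']}")

You should see the data-validation function ARN, the ObjectCreated events, and the raw/ prefix and .csv suffix filters.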

Step 4: CloudTrail for Audit Logging

Create terraform/cloudtrail.tf:

# CloudTrail logs bucket
resource "aws_s3_bucket" "cloudtrail_logs" {
  bucket = "${var.project_name}-cloudtrail-logs-${data.aws_caller_identity.current.account_id}"
}

resource "aws_s3_bucket_versioning" "cloudtrail_logs" {
  bucket = aws_s3_bucket.cloudtrail_logs.id

  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "cloudtrail_logs" {
  bucket = aws_s3_bucket.cloudtrail_logs.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

resource "aws_s3_bucket_public_access_block" "cloudtrail_logs" {
  bucket = aws_s3_bucket.cloudtrail_logs.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

resource "aws_s3_bucket_policy" "cloudtrail_logs" {
  bucket = aws_s3_bucket.cloudtrail_logs.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "AWSCloudTrailAclCheck"
        Effect = "Allow"
        Principal = {
          Service = "cloudtrail.amazonaws.com"
        }
        Action   = "s3:GetBucketAcl"
        Resource = aws_s3_bucket.cloudtrail_logs.arn
      },
      {
        Sid    = "AWSCloudTrailWrite"
        Effect = "Allow"
        Principal = {
          Service = "cloudtrail.amazonaws.com"
        }
        Action   = "s3:PutObject"
        Resource = "${aws_s3_bucket.cloudtrail_logs.arn}/*"
        Condition = {
          StringEquals = {
            "s3:x-amz-acl" = "bucket-owner-full-control"
          }
        }
      },
      {
        Sid    = "DenyInsecureTransport"
        Effect = "Deny"
        Principal = "*"
        Action = "s3:*"
        Resource = [
          aws_s3_bucket.cloudtrail_logs.arn,
          "${aws_s3_bucket.cloudtrail_logs.arn}/*"
        ]
        Condition = {
          Bool = {
            "aws:SecureTransport" = "false"
          }
        }
      }
    ]
  })
}

# CloudTrail
resource "aws_cloudtrail" "data_events" {
  name                          = "${var.project_name}-data-trail"
  s3_bucket_name                = aws_s3_bucket.cloudtrail_logs.id
  include_global_service_events = true
  is_multi_region_trail         = true
  enable_logging                = true
  enable_log_file_validation    = true

  depends_on = [aws_s3_bucket_policy.cloudtrail_logs]

  event_selector {
    read_write_type           = "All"
    include_management_events = true

    data_resource {
      type = "AWS::S3::Object"
      values = [
        "${aws_s3_bucket.raw_data.arn}/*",
        "${aws_s3_bucket.validated_data.arn}/*",
        "${aws_s3_bucket.model_artifacts.arn}/*"
      ]
    }
  }

  tags = {
    Name        = "ML Data Trail"
    Environment = var.environment
  }
}
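Keep in mind that S3 data events don't appear in the CloudTrail console's Event history; they are delivered as gzipped JSON files to the logs bucket, usually within about 15 minutes. Here's a small sketch that scans the most recent log files for object-level activity on the raw bucket (the bucket name follows the naming pattern above and is an assumption):

import gzip
import json
import boto3

s3 = boto3.client("s3")
account_id = boto3.client("sts").get_caller_identity()["Account"]

trail_bucket = f"ml-pipeline-cloudtrail-logs-{account_id}"  # assumed name
prefix = f"AWSLogs/{account_id}/CloudTrail/"                # standard delivery prefix

pages = s3.get_paginator("list_objects_v2").paginate(Bucket=trail_bucket, Prefix=prefix)
keys = [obj["Key"] for page in pages for obj in page.get("Contents", [])]

# Look at the most recent handful of log files
for key in sorted(keys)[-5:]:
    body = s3.get_object(Bucket=trail_bucket, Key=key)["Body"].read()
    for record in json.loads(gzip.decompress(body))["Records"]:
        # Crude filter: S3 events that mention the raw-data bucket anywhere in the record
        if record.get("eventSource") == "s3.amazonaws.com" and "raw-data" in json.dumps(record):
            print(record["eventTime"], record["eventName"],
                  record.get("requestParameters", {}).get("key"))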

Step 5: CloudWatch Monitoring

Create terraform/monitoring.tf:

# CloudWatch dashboard
resource "aws_cloudwatch_dashboard" "data_pipeline" {
  dashboard_name = "${var.project_name}-data-pipeline"

  dashboard_body = jsonencode({
    widgets = [
      {
        type   = "metric"
        x      = 0
        y      = 0
        width  = 12
        height = 6
        properties = {
          metrics = [
            ["AWS/Lambda", "Invocations", "FunctionName", aws_lambda_function.data_validation.function_name, { stat = "Sum", label = "Lambda Invocations" }],
            ["AWS/Lambda", "Errors", "FunctionName", aws_lambda_function.data_validation.function_name, { stat = "Sum", label = "Lambda Errors" }],
            ["AWS/Lambda", "Duration", "FunctionName", aws_lambda_function.data_validation.function_name, { stat = "Average", label = "Avg Duration (ms)" }]
          ]
          period  = 300
          stat    = "Average"
          region  = var.aws_region
          title   = "Lambda Metrics"
          view    = "timeSeries"
          stacked = false
        }
      }
    ]
  })
}

# Alarms
resource "aws_cloudwatch_metric_alarm" "lambda_errors" {
  alarm_name          = "${var.project_name}-lambda-errors"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "1"
  metric_name         = "Errors"
  namespace           = "AWS/Lambda"
  period              = "300"
  statistic           = "Sum"
  threshold           = "5"
  alarm_description   = "Lambda function errors"
  alarm_actions       = [aws_sns_topic.validation_notifications.arn]

  dimensions = {
    FunctionName = aws_lambda_function.data_validation.function_name
  }
}

resource "aws_cloudwatch_metric_alarm" "lambda_throttles" {
  alarm_name          = "${var.project_name}-lambda-throttles"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "1"
  metric_name         = "Throttles"
  namespace           = "AWS/Lambda"
  period              = "300"
  statistic           = "Sum"
  threshold           = "10"
  alarm_description   = "Lambda function throttling"
  alarm_actions       = [aws_sns_topic.validation_notifications.arn]

  dimensions = {
    FunctionName = aws_lambda_function.data_validation.function_name
  }
}
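To confirm the alarm → SNS → email path works without forcing real Lambda errors, you can temporarily flip an alarm into the ALARM state. A sketch (the alarm name assumes the default project_name):

import boto3

cloudwatch = boto3.client("cloudwatch")

# Temporarily force the error alarm into ALARM state to trigger the SNS notification.
# CloudWatch will move it back to OK/INSUFFICIENT_DATA on the next evaluation.
cloudwatch.set_alarm_state(
    AlarmName="ml-pipeline-lambda-errors",  # assumes default project_name
    StateValue="ALARM",
    StateReason="Manual test of SNS notification wiring",
)

# Check the current state afterwards
alarms = cloudwatch.describe_alarms(AlarmNames=["ml-pipeline-lambda-errors"])
print(alarms["MetricAlarms"][0]["StateValue"])

One caveat: because the SNS topic is encrypted with a customer-managed KMS key, the key policy must also allow CloudWatch (cloudwatch.amazonaws.com) to use it, or the alarm notification will not be delivered.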

Step 6: Testing the Pipeline

1. Create Test Data

Create test-data/sample.csv:

timestamp,feature_1,feature_2,target
2024-01-01T00:00:00,1.5,2.3,0
2024-01-01T01:00:00,1.8,2.1,1
2024-01-01T02:00:00,1.2,2.5,0
2024-01-01T03:00:00,1.9,2.0,1
2024-01-01T04:00:00,1.4,2.4,0
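The five-row sample only clears the relaxed min_rows: 5 used for testing. If you want a dataset closer to the production threshold, here's a small generator sketch (the column names match the validation rules; the distributions and the scripts/generate_test_data.py filename are arbitrary choices):

# scripts/generate_test_data.py - hypothetical helper, not part of the pipeline
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
n = 500  # comfortably above the production min_rows of 100

df = pd.DataFrame({
    "timestamp": pd.date_range("2024-01-01", periods=n, freq="60min").strftime("%Y-%m-%dT%H:%M:%S"),
    "feature_1": rng.normal(1.5, 0.3, n).round(3),
    "feature_2": rng.normal(2.2, 0.2, n).round(3),
    "target": rng.integers(0, 2, n),
})

df.to_csv("test-data/sample-large.csv", index=False)
print(f"Wrote {len(df)} rows to test-data/sample-large.csv")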

2. Upload to S3

# Get your AWS account ID
AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)

# Upload test file
aws s3 cp test-data/sample.csv \
  s3://ml-pipeline-raw-data-dev-${AWS_ACCOUNT_ID}/raw/sample.csv \
  --server-side-encryption aws:kms
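If your data arrives programmatically rather than via the CLI, the same upload can be done with boto3; the ExtraArgs are what satisfy the DenyUnencryptedObjectUploads bucket policy (the bucket name is a placeholder derived from the naming pattern above):

import boto3

s3 = boto3.client("s3")
account_id = boto3.client("sts").get_caller_identity()["Account"]
bucket = f"ml-pipeline-raw-data-dev-{account_id}"

# The SSE-KMS header must be requested explicitly; without it the bucket policy denies the upload
s3.upload_file(
    Filename="test-data/sample.csv",
    Bucket=bucket,
    Key="raw/sample.csv",
    ExtraArgs={"ServerSideEncryption": "aws:kms"},
)
print(f"Uploaded to s3://{bucket}/raw/sample.csv")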

3. Monitor Lambda Execution

# View Lambda logs in real-time
aws logs tail /aws/lambda/ml-pipeline-data-validation --follow

# Or check specific log streams
aws logs describe-log-streams \
  --log-group-name /aws/lambda/ml-pipeline-data-validation \
  --order-by LastEventTime \
  --descending \
  --max-items 1

4. Verify Validated Data

# Check validated bucket
aws s3 ls s3://ml-pipeline-validated-data-dev-${AWS_ACCOUNT_ID}/validated/

# Download validation report
aws s3 cp \
  s3://ml-pipeline-raw-data-dev-${AWS_ACCOUNT_ID}/reports/sample_report.json \
  ./validation-report.json

# View report
cat validation-report.json | python -m json.tool

5. Check Email Notification

You should receive an email notification with validation results. If not:

  • Confirm you accepted the SNS subscription (see the check after this list)
  • Check CloudWatch logs for errors
  • Verify SNS topic permissions
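If the email never arrives, the most common cause is a subscription stuck in PendingConfirmation. A quick boto3 check (the topic name assumes the default project_name):

import boto3

sns = boto3.client("sns")

# Find the validation topic by name (assumes the default project_name "ml-pipeline")
topics = sns.list_topics()["Topics"]
topic_arn = next(
    t["TopicArn"] for t in topics
    if t["TopicArn"].endswith(":ml-pipeline-validation-notifications")
)

for sub in sns.list_subscriptions_by_topic(TopicArn=topic_arn)["Subscriptions"]:
    # A SubscriptionArn of "PendingConfirmation" means the email was never confirmed
    print(sub["Endpoint"], "->", sub["SubscriptionArn"])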

Step 6.5: Preparing Data for Training

Before moving to Part 3 (model training), you'll need to split your validated data into training and validation sets. Here are two approaches:

Option 1: Add Split Logic to Lambda

This automatically splits data during validation. Update lambda/data-validation/handler.py:

# Add this import at the top
from sklearn.model_selection import train_test_split

# Add this function after send_notification()
def split_and_save_data(df, bucket, key, kms_key_id):
    """Split data into train/val sets (80/20)"""
    try:
        # Split with stratification if possible
        if 'target' in df.columns:
            train_df, val_df = train_test_split(
                df, 
                test_size=0.2, 
                random_state=42,
                stratify=df['target']
            )
        else:
            train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)

        logger.info(f"Split: {len(train_df)} train, {len(val_df)} val samples")

        # Save training data
        train_key = key.replace('validated/', 'validated/train/')
        train_csv = train_df.to_csv(index=False)
        s3_client.put_object(
            Bucket=bucket,
            Key=train_key,
            Body=train_csv,
            ServerSideEncryption='aws:kms',
            SSEKMSKeyId=kms_key_id
        )

        # Save validation data
        val_key = key.replace('validated/', 'validated/val/')
        val_csv = val_df.to_csv(index=False)
        s3_client.put_object(
            Bucket=bucket,
            Key=val_key,
            Body=val_csv,
            ServerSideEncryption='aws:kms',
            SSEKMSKeyId=kms_key_id
        )

        logger.info(f"Train: s3://{bucket}/{train_key}")
        logger.info(f"Val: s3://{bucket}/{val_key}")

        return train_key, val_key
    except Exception as e:
        logger.error(f"Failed to split data: {e}")
        return None, None

# In lambda_handler, after copying to validated bucket, add:
if validation_result['status'] == 'PASSED':
    # ... existing copy_object code ...

    # Split data for training
    train_key, val_key = split_and_save_data(
        df, 
        validated_bucket, 
        validated_key,
        os.environ.get('KMS_KEY_ID')
    )

    if train_key and val_key:
        logger.info(f"Data split successful")

Then rebuild Lambda package:

cd lambda/data-validation

# Docker build with updated requirements
docker run --rm --entrypoint bash -v "$PWD":/var/task \
  public.ecr.aws/lambda/python:3.11 \
  -c "pip install -r requirements.txt -t package/ && cp handler.py package/"

cd package && zip -r ../function.zip . && cd ..

# Redeploy
cd ../../terraform
terraform apply -var="notification_email=your-email@example.com"

Option 2: Manual Split Script

For more control or one-time splits, use this standalone script.

Create scripts/split_data.py:

#!/usr/bin/env python3
"""
Split validated data for ML training
"""
import boto3
import pandas as pd
from sklearn.model_selection import train_test_split
import sys
import argparse

def split_s3_data(bucket, input_key, output_prefix='validated', test_size=0.2):
    """
    Split S3 data into train/val sets

    Args:
        bucket: S3 bucket name
        input_key: S3 key of validated file
        output_prefix: Prefix for output files
        test_size: Fraction for validation set (default 0.2 = 20%)
    """
    s3 = boto3.client('s3')

    # Download
    print(f"Downloading s3://{bucket}/{input_key}")
    obj = s3.get_object(Bucket=bucket, Key=input_key)
    df = pd.read_csv(obj['Body'])
    print(f"   Loaded {len(df)} rows, {len(df.columns)} columns")

    # Split with stratification if target column exists
    try:
        if 'target' in df.columns:
            train_df, val_df = train_test_split(
                df, 
                test_size=test_size, 
                random_state=42,
                stratify=df['target']
            )
            print(f"   Split with stratification on 'target' column")
        else:
            train_df, val_df = train_test_split(df, test_size=test_size, random_state=42)
    except Exception as e:
        print(f"   Stratification failed ({e}), using simple split")
        train_df, val_df = train_test_split(df, test_size=test_size, random_state=42)

    print(f" Split complete: {len(train_df)} train, {len(val_df)} val")

    # Upload training data
    train_key = f"{output_prefix}/train/{input_key.split('/')[-1]}"
    print(f"Uploading training data...")
    s3.put_object(
        Bucket=bucket,
        Key=train_key,
        Body=train_df.to_csv(index=False),
        ServerSideEncryption='aws:kms'
    )
    print(f"    s3://{bucket}/{train_key}")

    # Upload validation data
    val_key = f"{output_prefix}/val/{input_key.split('/')[-1]}"
    print(f"Uploading validation data...")
    s3.put_object(
        Bucket=bucket,
        Key=val_key,
        Body=val_df.to_csv(index=False),
        ServerSideEncryption='aws:kms'
    )
    print(f"    s3://{bucket}/{val_key}")

    print(f"\n Data split complete!")
    return train_key, val_key

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Split validated ML data')
    parser.add_argument('bucket', help='S3 bucket name')
    parser.add_argument('input_key', help='S3 key of validated file')
    parser.add_argument('--test-size', type=float, default=0.2, 
                       help='Validation set fraction (default: 0.2)')

    args = parser.parse_args()

    split_s3_data(args.bucket, args.input_key, test_size=args.test_size)

Install dependencies and run:

pip install boto3 pandas scikit-learn

# Split your validated data
python scripts/split_data.py \
  ml-pipeline-validated-data-dev-YOUR_ACCOUNT_ID \
  validated/sample.csv

# Or with custom test size (30% validation)
python scripts/split_data.py \
  ml-pipeline-validated-data-dev-YOUR_ACCOUNT_ID \
  validated/sample.csv \
  --test-size 0.3

Verify the Split

# Check training data
aws s3 ls s3://ml-pipeline-validated-data-dev-${AWS_ACCOUNT_ID}/validated/train/

# Check validation data
aws s3 ls s3://ml-pipeline-validated-data-dev-${AWS_ACCOUNT_ID}/validated/val/

# Download and inspect
aws s3 cp \
  s3://ml-pipeline-validated-data-dev-${AWS_ACCOUNT_ID}/validated/train/sample.csv \
  ./train-sample.csv

wc -l train-sample.csv  # Should show 80% of original rows

Important for Part 3: SageMaker training jobs (covered in Part 3) expect data in s3://bucket/validated/train/ and s3://bucket/validated/val/ paths. Make sure to run one of these split methods before proceeding to model training.


Step 7: Advanced Features

Data Quality Metrics

Add custom CloudWatch metrics in Lambda to track data quality over time:

import boto3
from datetime import datetime

cloudwatch = boto3.client('cloudwatch')

def publish_metrics(validation_result, bucket):
    """Publish custom metrics to CloudWatch"""
    try:
        metrics = [
            {
                'MetricName': 'DataQualityScore',
                'Value': 100.0 if validation_result['status'] == 'PASSED' else 0.0,
                'Unit': 'Percent',
                'Timestamp': datetime.utcnow(),
                'Dimensions': [
                    {'Name': 'Bucket', 'Value': bucket},
                    {'Name': 'Pipeline', 'Value': 'ML-DataValidation'}
                ]
            },
            {
                'MetricName': 'RowCount',
                'Value': validation_result['quality_validation']['stats']['row_count'],
                'Unit': 'Count',
                'Timestamp': datetime.utcnow(),
                'Dimensions': [
                    {'Name': 'Bucket', 'Value': bucket},
                    {'Name': 'Pipeline', 'Value': 'ML-DataValidation'}
                ]
            },
            {
                'MetricName': 'DuplicateRows',
                'Value': validation_result['quality_validation']['stats']['duplicate_count'],
                'Unit': 'Count',
                'Timestamp': datetime.utcnow(),
                'Dimensions': [
                    {'Name': 'Bucket', 'Value': bucket},
                    {'Name': 'Pipeline', 'Value': 'ML-DataValidation'}
                ]
            },
            {
                'MetricName': 'NullPercentage',
                'Value': sum(
                    stats['null_count'] 
                    for stats in validation_result['quality_validation']['stats']['column_stats'].values()
                ) / validation_result['quality_validation']['stats']['row_count'] * 100,
                'Unit': 'Percent',
                'Timestamp': datetime.utcnow(),
                'Dimensions': [
                    {'Name': 'Bucket', 'Value': bucket},
                    {'Name': 'Pipeline', 'Value': 'ML-DataValidation'}
                ]
            }
        ]

        cloudwatch.put_metric_data(
            Namespace='MLPipeline/DataQuality',
            MetricData=metrics
        )
        logger.info("Published custom metrics to CloudWatch")
    except Exception as e:
        logger.error(f"Failed to publish metrics: {e}")

# Add to lambda_handler after saving validation report:
publish_metrics(validation_result, bucket)

Data Lineage Tracking

Create comprehensive lineage metadata:

def create_data_lineage(bucket, key, checksum, validation_result, context=None):
    """Create data lineage metadata for compliance"""
    lineage = {
        'data_asset': {
            'source_file': key,
            'source_bucket': bucket,
            'checksum_sha256': checksum,
            's3_version_id': validation_result.get('version_id'),
        },
        'validation': {
            'timestamp': datetime.utcnow().isoformat(),
            'validator_version': '1.0',
            'validation_rules': VALIDATION_RULES,
            'validation_status': validation_result['status'],
            'quality_score': (
                100.0 if validation_result['status'] == 'PASSED' else 0.0
            )
        },
        'pipeline': {
            'pipeline_id': os.environ.get('PIPELINE_ID', 'ml-data-pipeline'),
            'pipeline_version': os.environ.get('PIPELINE_VERSION', '1.0'),
            'lambda_request_id': context.aws_request_id if context else None,
            'execution_timestamp': datetime.utcnow().isoformat()
        },
        'statistics': validation_result['quality_validation']['stats'],
        'downstream_assets': []
    }

    # If validation passed, record downstream locations
    if validation_result['status'] == 'PASSED':
        validated_bucket = bucket.replace('raw-data', 'validated-data')
        lineage['downstream_assets'].append({
            'type': 'validated_data',
            'location': f"s3://{validated_bucket}/{key.replace('raw/', 'validated/')}",
            'created_at': datetime.utcnow().isoformat()
        })

    # Store lineage
    lineage_key = f"lineage/{checksum}.json"
    s3_client.put_object(
        Bucket=bucket,
        Key=lineage_key,
        Body=json.dumps(lineage, indent=2),
        ServerSideEncryption='aws:kms',
        SSEKMSKeyId=os.environ.get('KMS_KEY_ID'),
        ContentType='application/json',
        Metadata={
            'data-classification': 'lineage',
            'pipeline-version': '1.0'
        }
    )

    logger.info(f"Lineage created: s3://{bucket}/{lineage_key}")
    return lineage_key

# Add to lambda_handler after saving validation report:
lineage_key = create_data_lineage(bucket, key, checksum, validation_result, context)
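Because lineage records are keyed by checksum, retrieving the record for a given raw file is straightforward. A small sketch (the bucket name is a placeholder):

import hashlib
import json
import boto3

s3 = boto3.client("s3")
bucket = "ml-pipeline-raw-data-dev-123456789012"  # placeholder raw bucket name
key = "raw/sample.csv"

# Recompute the checksum the validator used as the lineage key
body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
checksum = hashlib.sha256(body).hexdigest()

lineage = json.loads(
    s3.get_object(Bucket=bucket, Key=f"lineage/{checksum}.json")["Body"].read()
)
print(lineage["validation"]["validation_status"], lineage["downstream_assets"])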

Step 7.5: Testing Best Practices

Local Unit Tests

Before deploying Lambda changes, test validation logic locally. This catches bugs early and documents expected behavior.

Create lambda/data-validation/test_handler.py:

import pytest
import pandas as pd
from handler import validate_schema, validate_data_quality

def test_schema_validation_success():
    """Test valid schema passes"""
    df = pd.DataFrame({
        'timestamp': ['2024-01-01 00:00:00'],
        'feature_1': [1.5],
        'feature_2': [2.3],
        'target': [0]
    })

    result = validate_schema(df)
    assert result['valid'] == True
    assert len(result['issues']) == 0

def test_schema_validation_missing_column():
    """Test missing required column fails"""
    df = pd.DataFrame({
        'timestamp': ['2024-01-01 00:00:00'],
        'feature_1': [1.5],
        'target': [0]
    })

    result = validate_schema(df)
    assert result['valid'] == False
    assert 'feature_2' in str(result['issues'])

def test_schema_validation_wrong_type():
    """Test incorrect column type fails"""
    df = pd.DataFrame({
        'timestamp': ['2024-01-01 00:00:00'],
        'feature_1': ['not_a_number'],  # Should be numeric
        'feature_2': [2.3],
        'target': [0]
    })

    result = validate_schema(df)
    assert result['valid'] == False
    assert 'numeric' in str(result['issues']).lower()

def test_data_quality_null_detection():
    """Test null value detection"""
    df = pd.DataFrame({
        'timestamp': ['2024-01-01 00:00:00', None],
        'feature_1': [1.5, 1.8],
        'feature_2': [2.3, 2.1],
        'target': [0, 1]
    })

    result = validate_data_quality(df)
    assert result['stats']['column_stats']['timestamp']['null_count'] > 0

def test_data_quality_duplicate_detection():
    """Test duplicate row detection"""
    df = pd.DataFrame({
        'timestamp': ['2024-01-01 00:00:00', '2024-01-01 00:00:00'],
        'feature_1': [1.5, 1.5],
        'feature_2': [2.3, 2.3],
        'target': [0, 0]
    })

    result = validate_data_quality(df)
    assert result['stats']['duplicate_count'] > 0
    assert 'duplicate' in str(result['issues']).lower()

def test_data_quality_insufficient_rows():
    """Test minimum row requirement"""
    df = pd.DataFrame({
        'timestamp': ['2024-01-01 00:00:00'],
        'feature_1': [1.5],
        'feature_2': [2.3],
        'target': [0]
    })

    result = validate_data_quality(df)
    # With min_rows=5, this should fail
    assert result['valid'] == False
    assert 'Insufficient rows' in str(result['issues'])

def test_data_quality_excessive_nulls():
    """Test excessive null percentage fails"""
    df = pd.DataFrame({
        'timestamp': ['2024-01-01 00:00:00'] * 10,
        'feature_1': [1.5, None, None, None, None, None, None, None, None, None],
        'feature_2': [2.3] * 10,
        'target': [0] * 10
    })

    result = validate_data_quality(df)
    assert result['valid'] == False
    assert 'null' in str(result['issues']).lower()

def test_data_quality_statistics():
    """Test statistical calculations are correct"""
    df = pd.DataFrame({
        'timestamp': ['2024-01-01 00:00:00'] * 5,
        'feature_1': [1.0, 2.0, 3.0, 4.0, 5.0],
        'feature_2': [2.0, 2.0, 2.0, 2.0, 2.0],
        'target': [0, 1, 0, 1, 0]
    })

    result = validate_data_quality(df)

    # Check feature_1 statistics
    assert result['stats']['column_stats']['feature_1']['mean'] == 3.0
    assert result['stats']['column_stats']['feature_1']['min'] == 1.0
    assert result['stats']['column_stats']['feature_1']['max'] == 5.0

    # Check feature_2 statistics (all same value)
    assert result['stats']['column_stats']['feature_2']['std'] == 0.0

Run tests locally:

cd lambda/data-validation

# Install test dependencies
pip install pytest pandas

# Run all tests
pytest test_handler.py -v

# Run specific test
pytest test_handler.py::test_schema_validation_success -v

# Run with coverage
pip install pytest-cov
pytest test_handler.py --cov=handler --cov-report=term-missing

Expected output:

============================= test session starts ==============================
test_handler.py::test_schema_validation_success PASSED                  [ 12%]
test_handler.py::test_schema_validation_missing_column PASSED           [ 25%]
test_handler.py::test_schema_validation_wrong_type PASSED               [ 37%]
test_handler.py::test_data_quality_null_detection PASSED                [ 50%]
test_handler.py::test_data_quality_duplicate_detection PASSED           [ 62%]
test_handler.py::test_data_quality_insufficient_rows PASSED             [ 75%]
test_handler.py::test_data_quality_excessive_nulls PASSED               [ 87%]
test_handler.py::test_data_quality_statistics PASSED                    [100%]

============================== 8 passed in 0.24s ===============================

Why test locally?

  • Faster feedback than deploying to AWS
  • Catch bugs before they hit production
  • Document expected behavior
  • Regression testing when updating validation rules
  • Save money on Lambda invocations during development

Integration Testing

After deployment, verify the complete pipeline end-to-end.

Create scripts/test-pipeline.sh:

#!/bin/bash
# Integration test for ML data pipeline

set -e

echo "Starting pipeline integration test..."
echo ""

# Configuration
AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
BUCKET="ml-pipeline-raw-data-dev-${AWS_ACCOUNT_ID}"
TEST_FILE="test-data/sample.csv"
TIMESTAMP=$(date +%s)

# Upload test file
echo " Uploading test data to S3..."
aws s3 cp ${TEST_FILE} s3://${BUCKET}/raw/test-${TIMESTAMP}.csv \
  --server-side-encryption aws:kms
echo "   Uploaded to s3://${BUCKET}/raw/test-${TIMESTAMP}.csv"
echo ""

# Wait for Lambda processing
echo "Waiting 15 seconds for Lambda to process..."
sleep 15
echo ""

# Check Lambda logs
echo "Recent Lambda logs:"
aws logs tail /aws/lambda/ml-pipeline-data-validation \
  --since 2m \
  --format short | tail -20
echo ""

# Verify validated data exists
echo "Checking validated data bucket..."
VALIDATED_BUCKET="ml-pipeline-validated-data-dev-${AWS_ACCOUNT_ID}"
aws s3 ls s3://${VALIDATED_BUCKET}/validated/ | tail -5
echo ""

# Check validation report
echo "Downloading validation report..."
aws s3 cp \
  s3://${BUCKET}/reports/test-${TIMESTAMP}_report.json \
  ./test-report.json
echo "   Report contents:"
cat test-report.json | python3 -m json.tool | head -30
echo ""

# Verify split data (if Option 1 was implemented)
echo "Checking for train/val split..."
TRAIN_COUNT=$(aws s3 ls s3://${VALIDATED_BUCKET}/validated/train/ | wc -l)
VAL_COUNT=$(aws s3 ls s3://${VALIDATED_BUCKET}/validated/val/ | wc -l)

if [ $TRAIN_COUNT -gt 0 ] && [ $VAL_COUNT -gt 0 ]; then
    echo "      Train/val split detected"
    echo "      Training files: $TRAIN_COUNT"
    echo "      Validation files: $VAL_COUNT"
else
    echo "    No train/val split found (run Option 1 or 2 from Step 6.5)"
fi
echo ""

# Check CloudWatch metrics
echo "Recent CloudWatch metrics:"
aws cloudwatch get-metric-statistics \
  --namespace MLPipeline/DataQuality \
  --metric-name DataQualityScore \
  --start-time $(date -u -d '10 minutes ago' +%Y-%m-%dT%H:%M:%S) \
  --end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
  --period 300 \
  --statistics Average \
  --dimensions Name=Pipeline,Value=ML-DataValidation Name=Bucket,Value=${BUCKET} \
  --query 'Datapoints[0].Average' || echo "   No metrics yet (custom metrics take a few minutes)"
echo ""

echo "Pipeline integration test complete!"
echo ""
echo "Next steps:"
echo "1. Check your email for SNS notification"
echo "2. View CloudWatch dashboard: https://console.aws.amazon.com/cloudwatch/"
echo "3. Review validation report: ./test-report.json"

Make executable and run:

chmod +x scripts/test-pipeline.sh
./scripts/test-pipeline.sh

Continuous Integration Testing

For production deployments, integrate tests into CI/CD:

GitHub Actions example (.github/workflows/test.yml):

name: Test Data Pipeline

on:
  pull_request:
    paths:
      - 'lambda/data-validation/**'
      - 'terraform/**'

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          cd lambda/data-validation
          pip install -r requirements.txt pytest pytest-cov

      - name: Run unit tests
        run: |
          cd lambda/data-validation
          pytest test_handler.py --cov=handler --cov-report=xml

      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./lambda/data-validation/coverage.xml

Important Notes Before Deploying

Known Considerations

1. Lambda Package Size and Compatibility

The pandas library (~100MB) has platform-specific dependencies. Building on macOS/Windows creates incompatible packages. Solutions:

  • Recommended: Use Docker build (shown in Step 2)
  • Alternative: Use AWS-managed Lambda Layer (uncomment in lambda.tf)
  • For CI/CD: Build in CodeBuild or GitHub Actions with Amazon Linux 2 image

2. Validation Rules

The sample uses min_rows: 5 for testing. Adjust for production:

VALIDATION_RULES = {
    'required_columns': ['your', 'columns', 'here'],
    'numeric_columns': ['numeric', 'columns'],
    'max_null_percentage': 0.05,  # Adjust based on requirements
    'min_rows': 100,  # Increase for production
    'max_rows': 10000000,
    'date_columns': ['timestamp']
}

3. Terraform State Management

For production, use remote state with locking:

# In providers.tf
terraform {
  backend "s3" {
    bucket         = "your-terraform-state-bucket"
    key            = "ml-pipeline/terraform.tfstate"
    region         = "ap-south-1"
    dynamodb_table = "terraform-state-lock"
    encrypt        = true
  }
}

4. Cost Monitoring

The estimated costs (~$9.50/month) assume minimal development usage. For production:

  • Enable AWS Cost Explorer
  • Set up AWS Budgets with email alerts
  • Monitor CloudWatch Logs storage (can grow quickly)
  • Review CloudTrail data event costs (especially with high S3 activity)

Actual CloudTrail costs for 10K Lambda invocations: ~$20-30/month (not $2 as initially estimated).

5. Network Isolation (Production Enhancement)

For maximum security, deploy Lambda in a VPC:

# In lambda.tf, add to aws_lambda_function:
vpc_config {
  subnet_ids         = var.private_subnet_ids
  security_group_ids = [aws_security_group.lambda.id]
}

# Requires NAT Gateway or VPC endpoints for S3/SNS access

6. Production Patterns Not Implemented

This implementation focuses on core functionality for learning. For production, consider adding:

Idempotency:

# Track processed files to prevent duplicate processing
import hashlib

def is_already_processed(bucket, key):
    """Check if file already validated"""
    try:
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table('processed-files')

        # Use file hash as idempotency key
        file_hash = hashlib.md5(f"{bucket}/{key}".encode()).hexdigest()

        response = table.get_item(Key={'file_hash': file_hash})
        return 'Item' in response
    except Exception:
        # Fail open: if the lookup fails, fall back to processing the file
        return False

# In lambda_handler:
if is_already_processed(bucket, key):
    logger.info(f"Already processed: {key}")
    return {'statusCode': 200, 'body': 'Already processed'}
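The snippet above assumes a DynamoDB table named processed-files already exists (in a real deployment you would manage it in Terraform). A minimal one-time setup sketch, using on-demand billing and file_hash as the partition key:

import boto3

dynamodb = boto3.client("dynamodb")

# One-time setup for the idempotency table assumed by is_already_processed()
dynamodb.create_table(
    TableName="processed-files",
    AttributeDefinitions=[{"AttributeName": "file_hash", "AttributeType": "S"}],
    KeySchema=[{"AttributeName": "file_hash", "KeyType": "HASH"}],
    BillingMode="PAY_PER_REQUEST",  # no capacity planning needed at this volume
)
dynamodb.get_waiter("table_exists").wait(TableName="processed-files")
print("processed-files table ready")

The Lambda execution role would also need dynamodb:GetItem (and dynamodb:PutItem, if you record successful runs) on this table.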

Dead Letter Queue (DLQ):

# In terraform/lambda.tf, add:
resource "aws_sqs_queue" "lambda_dlq" {
  name = "${var.project_name}-validation-dlq"

  kms_master_key_id = aws_kms_key.data_encryption.id

  message_retention_seconds = 1209600  # 14 days
}

# In aws_lambda_function.data_validation:
dead_letter_config {
  target_arn = aws_sqs_queue.lambda_dlq.arn
}

Concurrent Execution Limits:

# In aws_lambda_function.data_validation:
reserved_concurrent_executions = 10  # Prevent runaway costs

Enhanced Error Handling:

class ValidationError(Exception):
    """Custom validation exception"""
    pass

class DataQualityError(ValidationError):
    """Data quality check failed"""
    pass

class SchemaError(ValidationError):
    """Schema validation failed"""
    pass

# Use in lambda_handler for better error categorization
try:
    schema_result = validate_schema(df)
    if not schema_result['valid']:
        raise SchemaError(schema_result['issues'])

    quality_result = validate_data_quality(df)
    if not quality_result['valid']:
        raise DataQualityError(quality_result['issues'])
except SchemaError as e:
    # Handle schema errors specifically
    logger.error(f"Schema validation failed: {e}")
except DataQualityError as e:
    # Handle quality errors specifically
    logger.error(f"Quality validation failed: {e}")

Security Best Practices Checklist

Encryption:

  • KMS encryption for all S3 buckets
  • Enforce encryption in bucket policies
  • TLS in transit (enforced by bucket policy)
  • Encrypted SNS topics
  • Encrypted CloudWatch logs

Access Control:

  • Least privilege IAM roles
  • No public bucket access
  • S3 bucket policies deny unencrypted uploads
  • s3:ListBucket permission for Lambda (enables proper error handling)

Audit & Compliance:

  • CloudTrail logging all data access
  • CloudWatch logs retention policy
  • SNS notifications for failures
  • Log file validation enabled

Data Quality:

  • Automated validation
  • Schema enforcement
  • Quality metrics tracking
  • Data checksums for integrity
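
A quick way to spot-check the encryption and access-control items on this list is a small boto3 script. This is only a sketch: the bucket names are placeholders (adjust them to your deployment), and it verifies just two of the controls above.

import boto3

s3 = boto3.client('s3')

# Placeholder bucket names - replace with your actual bucket names
buckets = [
    'ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID',
    'ml-pipeline-processed-data-dev-YOUR_ACCOUNT_ID',
]

for bucket in buckets:
    # Confirm default encryption is configured (raises if none is set)
    enc = s3.get_bucket_encryption(Bucket=bucket)
    rule = enc['ServerSideEncryptionConfiguration']['Rules'][0]
    algo = rule['ApplyServerSideEncryptionByDefault']['SSEAlgorithm']
    print(f"{bucket}: default encryption = {algo}")

    # Confirm all public access settings are blocked
    pab = s3.get_public_access_block(Bucket=bucket)['PublicAccessBlockConfiguration']
    print(f"{bucket}: public access blocked = {all(pab.values())}")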

Cost Breakdown

Estimated Monthly Costs (Development with ~1000 validations/month):

| Service | Usage | Realistic Cost/Month |
| --- | --- | --- |
| S3 Storage (100GB) | Standard tier | ~$2.30 |
| Lambda | 1K invocations, 1GB memory, 30s avg | ~$0.10 |
| CloudWatch Logs | 10GB ingestion + storage | ~$5.00 |
| CloudTrail | Data events (1K S3 operations) | ~$2.00 |
| SNS | 100 notifications | ~$0.01 |
| Total | | ~$9.41 |

Production Scaling (100K validations/month):

| Service | Usage | Cost/Month |
| --- | --- | --- |
| S3 Storage (1TB) | With lifecycle policies | ~$15-20 |
| Lambda | 100K invocations | ~$8-12 |
| CloudWatch Logs | 100GB | ~$50 |
| CloudTrail | 100K S3 operations | ~$200 |
| Total | | ~$273-282 |

Cost Optimization Tips:

  1. Use S3 Intelligent-Tiering for automatic cost optimization
  2. Implement CloudWatch Logs retention policies (30 days recommended)
  3. Consider CloudTrail Insights instead of all data events for production
  4. Use Lambda reserved concurrency to prevent runaway costs
  5. Enable S3 lifecycle transitions to Glacier for archival (see the sketch below)
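
For tip 5, a lifecycle rule can be attached in Terraform or, as a quick sketch here, with boto3. The bucket name, prefix, and day threshold are placeholders; adjust them to your retention requirements.

import boto3

s3 = boto3.client('s3')

# Placeholder bucket, prefix, and threshold - tune for your retention policy
s3.put_bucket_lifecycle_configuration(
    Bucket='ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID',
    LifecycleConfiguration={
        'Rules': [{
            'ID': 'archive-raw-data',
            'Status': 'Enabled',
            'Filter': {'Prefix': 'raw/'},
            # Move objects to Glacier 90 days after creation
            'Transitions': [{'Days': 90, 'StorageClass': 'GLACIER'}],
        }]
    }
)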

Troubleshooting Guide

Issue: Lambda timeout

# Symptoms: Functions timing out on large files
# Solutions:

# 1. Increase timeout in terraform/lambda.tf
timeout = 900  # 15 minutes (max)

# 2. Increase memory (also increases CPU)
memory_size = 2048  # Up to 10,240 MB

# 3. Process files in chunks for very large datasets (see the sketch after this block)
# 4. Monitor duration metric
aws cloudwatch get-metric-statistics \
  --namespace AWS/Lambda \
  --metric-name Duration \
  --dimensions Name=FunctionName,Value=ml-pipeline-data-validation \
  --start-time $(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%S) \
  --end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
  --period 300 \
  --statistics Average,Maximum
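
For point 3 above, one option is to stream large CSVs in chunks instead of loading them whole. This is a hedged sketch: it assumes the object has already been downloaded (or is being streamed from S3) and keeps only aggregate quality counters in memory.

import pandas as pd

def validate_in_chunks(csv_path, chunk_size=100_000):
    """Validate a large CSV without loading it entirely into memory."""
    total_rows = 0
    null_counts = None

    for chunk in pd.read_csv(csv_path, chunksize=chunk_size):
        total_rows += len(chunk)
        counts = chunk.isnull().sum()
        null_counts = counts if null_counts is None else null_counts + counts

    # Worst-column null percentage across the whole file
    null_pct = (null_counts / total_rows).max() if total_rows else 0
    return {'rows': total_rows, 'max_null_percentage': float(null_pct)}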

Issue: Permission denied errors

# Verify IAM permissions
aws iam get-role-policy \
  --role-name ml-pipeline-data-validation-lambda \
  --policy-name ml-pipeline-data-validation-policy

# Check KMS key policy allows Lambda role
aws kms describe-key --key-id alias/ml-data-encryption

# Verify S3 bucket policy
aws s3api get-bucket-policy \
  --bucket ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID

# Test Lambda can assume role
aws sts assume-role \
  --role-arn arn:aws:iam::YOUR_ACCOUNT_ID:role/ml-pipeline-data-validation-lambda \
  --role-session-name test

Issue: Lambda package incompatibility

# Error: "cannot import name '_imaging' from 'PIL'"
# Cause: Built on wrong architecture (macOS/Windows)
# Solution: Use Docker build method from Step 2

# Verify Lambda runtime architecture
aws lambda get-function --function-name ml-pipeline-data-validation \
  --query 'Configuration.Architectures'

# Should return: ["x86_64"]

# If you see import errors, rebuild using Docker (override the image entrypoint to get a shell):
cd lambda/data-validation
docker run --rm --entrypoint bash -v "$PWD":/var/task public.ecr.aws/lambda/python:3.11 \
  -c "pip install -r requirements.txt -t package/ && cp handler.py package/"
cd package && zip -r ../function.zip . && cd ..

Issue: S3 event not triggering Lambda

# Check Lambda permissions
aws lambda get-policy --function-name ml-pipeline-data-validation

# Should show s3.amazonaws.com as principal

# Verify S3 bucket notification configuration
aws s3api get-bucket-notification-configuration \
  --bucket ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID

# Should show Lambda function ARN

# Test manually with sample event
cat > test-event.json << EOF
{
  "Records": [{
    "s3": {
      "bucket": {"name": "ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID"},
      "object": {"key": "raw/sample.csv"}
    }
  }]
}
EOF

# --cli-binary-format is required on AWS CLI v2 for raw JSON payloads
aws lambda invoke \
  --function-name ml-pipeline-data-validation \
  --cli-binary-format raw-in-base64-out \
  --payload file://test-event.json \
  response.json

cat response.json

Issue: SNS notifications not received

# Check subscription status
aws sns list-subscriptions-by-topic \
  --topic-arn arn:aws:sns:ap-south-1:ACCOUNT_ID:ml-pipeline-validation-notifications

# Look for SubscriptionArn (not "PendingConfirmation")
# If pending, check email spam folder for confirmation link

# Test SNS manually
aws sns publish \
  --topic-arn arn:aws:sns:ap-south-1:ACCOUNT_ID:ml-pipeline-validation-notifications \
  --subject "Test Notification" \
  --message "Testing SNS configuration"

# Check CloudWatch logs for SNS publish errors
aws logs filter-log-events \
  --log-group-name /aws/lambda/ml-pipeline-data-validation \
  --filter-pattern "SNS" \
  --start-time $(date -u -d '1 hour ago' +%s)000

Issue: High costs from CloudTrail

# Check CloudTrail event count
aws cloudtrail lookup-events \
  --start-time $(date -u -d '24 hours ago' +%Y-%m-%dT%H:%M:%S) \
  --max-results 50

# Reduce costs by:
# 1. Using CloudTrail Insights instead of all data events
# 2. Filtering to specific prefixes
# 3. Using CloudWatch Events for real-time needs

# Update CloudTrail configuration
terraform apply -target=aws_cloudtrail.data_events

What's Next?

You now have:

  • Secure, encrypted data storage
  • Automated data validation
  • Complete audit trail
  • Monitoring and alerting
  • Infrastructure as Code
  • Data split for ML training
  • Testing framework

In Part 3, we'll implement AIDLC Phase 2: Model Training:

  • Custom SageMaker training containers with Docker
  • Experiment tracking and versioning
  • Model registry and governance
  • Hyperparameter optimization
  • Cost-effective Spot instance training
  • Automated model evaluation

AIDLC Focus: Model Development, Training, and Governance

In Part 4, we'll complete the production ML platform:

  • CI/CD pipelines for automated deployment
  • SageMaker endpoints with auto-scaling
  • Model monitoring and drift detection
  • A/B testing and canary deployments
  • Production observability and incident response

AIDLC Focus: Deployment, Monitoring, and Compliance


Key Takeaways

  1. Automate validation - Manual checks don't scale and miss edge cases
  2. Encrypt everything - KMS at rest, TLS in transit, no exceptions
  3. Audit all access - CloudTrail provides compliance and debugging trails
  4. Monitor data quality - Track metrics over time to detect degradation
  5. Use IaC - Terraform makes infrastructure reproducible and reviewable
  6. Package carefully - Lambda deployment packages must match runtime architecture
  7. Test locally first - Unit tests catch bugs before AWS deployment
  8. Split data properly - Training requires separate train/val sets

Remember: Good data pipelines prevent bad models. The AIDLC framework ensures quality from day one.



Let's Connect!

  • Questions about the data pipeline? Drop a comment below
  • Follow me for Part 3 - SageMaker Training & Model Registry
  • Like if this helped you build your pipeline
  • Share with your team and connections

What data quality challenges are you facing? Let me know in the comments!


Tags: #aws #machinelearning #mlops #aidlc #dataengineering #terraform #lambda #s3 #cloudwatch #testing


Top comments (1)

Shoaibali Mir

Update: I published a strategic deep-dive on Medium explaining the why
behind every architecture decision in this series.

If you're wondering "why security-first instead of iterate-fast?" or
"why Spot instances despite interruptions?" — that post breaks down
the tradeoffs tutorials don't cover.

Link: shoaibalimir.medium.com/building-p...