Shoaibali Mir
Secure ML on AWS: Building Production Data Pipelines with S3 and Lambda

Reading time: ~15-20 minutes

Level: Intermediate

Prerequisites: Basic AWS knowledge, familiarity with S3 and Lambda

Series: Part 2 of 4 - Read Part 1

Objective: This blog focuses on teaching core concepts and best practices for building data pipelines on AWS. The code examples are functional for learning purposes. Production deployments require additional hardening, testing, and organization-specific configuration.


Welcome Back!

In Part 1, we explored the AIDLC framework and AWS architecture for secure ML pipelines. Now, let's get hands-on with Phase 1: Data Collection & Preparation.

What you'll build today:

  • Encrypted S3 data lake with proper access controls
  • Automated data validation pipeline with Lambda
  • Data quality monitoring and alerting
  • Complete audit trail with CloudTrail
  • Infrastructure as Code with Terraform

By the end: You'll have a functional data pipeline that demonstrates security and quality best practices.


The Data Pipeline Problem

You can't build reliable ML models on unreliable data.

Common issues include:

  • Inconsistent data formats - Training fails due to schema changes
  • Missing security controls - Sensitive data exposed
  • No data validation - Bad data silently corrupts models
  • Manual processes - Doesn't scale, error-prone
  • No audit trail - Can't track data lineage

The solution:

An automated, secure, validated data pipeline.


Architecture Overview

Here's what we're building:

[Architecture diagram: Data Pipeline]

AWS Services Used:

  • S3: Encrypted data storage with versioning
  • Lambda: Serverless data validation
  • KMS: Encryption key management
  • CloudTrail: Audit logging
  • CloudWatch: Monitoring and alerting
  • SNS: Notifications
  • Terraform: Infrastructure as Code

Step 1: Setting Up Encrypted S3 Buckets

Why Three Buckets?

Separation of concerns improves security and organization:

  1. Raw Data Bucket - Unvalidated data from sources
  2. Validated Data Bucket - Quality-checked, ready for training
  3. Model Artifacts Bucket - Trained models and metadata

Note: The complete S3 configuration is shown below. For production deployments, consider splitting configuration into modules for better organization and reusability.

Infrastructure Code

Create terraform/s3-buckets.tf:

# KMS key for encryption
resource "aws_kms_key" "data_encryption" {
  description             = "Encryption key for ML data pipeline"
  deletion_window_in_days = 10
  enable_key_rotation     = true

  tags = {
    Name        = "ml-data-encryption-key"
    Environment = var.environment
    Purpose     = "ML-DataPipeline"
  }
}

resource "aws_kms_alias" "data_encryption" {
  name          = "alias/ml-data-encryption"
  target_key_id = aws_kms_key.data_encryption.key_id
}

# Raw data bucket
resource "aws_s3_bucket" "raw_data" {
  bucket = "${var.project_name}-raw-data-${var.environment}-${data.aws_caller_identity.current.account_id}"

  tags = {
    Name        = "ML Raw Data"
    Environment = var.environment
    DataStage   = "Raw"
  }
}

# Enable versioning
resource "aws_s3_bucket_versioning" "raw_data" {
  bucket = aws_s3_bucket.raw_data.id

  versioning_configuration {
    status = "Enabled"
  }
}

# Enable encryption
resource "aws_s3_bucket_server_side_encryption_configuration" "raw_data" {
  bucket = aws_s3_bucket.raw_data.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm     = "aws:kms"
      kms_master_key_id = aws_kms_key.data_encryption.arn
    }
    bucket_key_enabled = true
  }
}

# Block all public access
resource "aws_s3_bucket_public_access_block" "raw_data" {
  bucket = aws_s3_bucket.raw_data.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

# Enforce encryption and secure transport
resource "aws_s3_bucket_policy" "raw_data" {
  bucket = aws_s3_bucket.raw_data.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "DenyUnencryptedObjectUploads"
        Effect = "Deny"
        Principal = "*"
        Action = "s3:PutObject"
        Resource = "${aws_s3_bucket.raw_data.arn}/*"
        Condition = {
          StringNotEquals = {
            "s3:x-amz-server-side-encryption" = "aws:kms"
          }
        }
      },
      {
        Sid    = "DenyInsecureTransport"
        Effect = "Deny"
        Principal = "*"
        Action = "s3:*"
        Resource = [
          aws_s3_bucket.raw_data.arn,
          "${aws_s3_bucket.raw_data.arn}/*"
        ]
        Condition = {
          Bool = {
            "aws:SecureTransport" = "false"
          }
        }
      }
    ]
  })
}

# Lifecycle policy to manage old versions
resource "aws_s3_bucket_lifecycle_configuration" "raw_data" {
  bucket = aws_s3_bucket.raw_data.id

  rule {
    id     = "delete-old-versions"
    status = "Enabled"

    # Empty filter applies the rule to all objects (expected by AWS provider v4+)
    filter {}

    noncurrent_version_expiration {
      noncurrent_days = 90
    }
  }

  rule {
    id     = "transition-to-glacier"
    status = "Enabled"

    filter {}

    transition {
      days          = 30
      storage_class = "GLACIER"
    }
  }
}

# Validated data bucket
resource "aws_s3_bucket" "validated_data" {
  bucket = "${var.project_name}-validated-data-${var.environment}-${data.aws_caller_identity.current.account_id}"

  tags = {
    Name        = "ML Validated Data"
    Environment = var.environment
    DataStage   = "Validated"
  }
}

resource "aws_s3_bucket_versioning" "validated_data" {
  bucket = aws_s3_bucket.validated_data.id

  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "validated_data" {
  bucket = aws_s3_bucket.validated_data.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm     = "aws:kms"
      kms_master_key_id = aws_kms_key.data_encryption.arn
    }
    bucket_key_enabled = true
  }
}

resource "aws_s3_bucket_public_access_block" "validated_data" {
  bucket = aws_s3_bucket.validated_data.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

# Apply same security policies to validated bucket
resource "aws_s3_bucket_policy" "validated_data" {
  bucket = aws_s3_bucket.validated_data.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "DenyUnencryptedObjectUploads"
        Effect = "Deny"
        Principal = "*"
        Action = "s3:PutObject"
        Resource = "${aws_s3_bucket.validated_data.arn}/*"
        Condition = {
          StringNotEquals = {
            "s3:x-amz-server-side-encryption" = "aws:kms"
          }
        }
      },
      {
        Sid    = "DenyInsecureTransport"
        Effect = "Deny"
        Principal = "*"
        Action = "s3:*"
        Resource = [
          aws_s3_bucket.validated_data.arn,
          "${aws_s3_bucket.validated_data.arn}/*"
        ]
        Condition = {
          Bool = {
            "aws:SecureTransport" = "false"
          }
        }
      }
    ]
  })
}

# Model artifacts bucket
resource "aws_s3_bucket" "model_artifacts" {
  bucket = "${var.project_name}-model-artifacts-${var.environment}-${data.aws_caller_identity.current.account_id}"

  tags = {
    Name        = "ML Model Artifacts"
    Environment = var.environment
    DataStage   = "Models"
  }
}

resource "aws_s3_bucket_versioning" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm     = "aws:kms"
      kms_master_key_id = aws_kms_key.data_encryption.arn
    }
    bucket_key_enabled = true
  }
}

resource "aws_s3_bucket_public_access_block" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

resource "aws_s3_bucket_policy" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "DenyUnencryptedObjectUploads"
        Effect = "Deny"
        Principal = "*"
        Action = "s3:PutObject"
        Resource = "${aws_s3_bucket.model_artifacts.arn}/*"
        Condition = {
          StringNotEquals = {
            "s3:x-amz-server-side-encryption" = "aws:kms"
          }
        }
      },
      {
        Sid    = "DenyInsecureTransport"
        Effect = "Deny"
        Principal = "*"
        Action = "s3:*"
        Resource = [
          aws_s3_bucket.model_artifacts.arn,
          "${aws_s3_bucket.model_artifacts.arn}/*"
        ]
        Condition = {
          Bool = {
            "aws:SecureTransport" = "false"
          }
        }
      }
    ]
  })
}

Variables File

Create terraform/variables.tf:

variable "project_name" {
  description = "Project name prefix"
  type        = string
  default     = "ml-pipeline"
}

variable "environment" {
  description = "Environment (dev, staging, prod)"
  type        = string
  default     = "dev"
}

variable "aws_region" {
  description = "AWS region"
  type        = string
  default     = "ap-south-1"
}

variable "notification_email" {
  description = "Email for SNS notifications"
  type        = string
}

data "aws_caller_identity" "current" {}

Provider Configuration

Create terraform/providers.tf:

terraform {
  required_version = ">= 1.0"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }

  # For production, use remote state
  # backend "s3" {
  #   bucket         = "your-terraform-state-bucket"
  #   key            = "ml-pipeline/terraform.tfstate"
  #   region         = "ap-south-1"
  #   dynamodb_table = "terraform-state-lock"
  #   encrypt        = true
  # }
}

provider "aws" {
  region = var.aws_region

  default_tags {
    tags = {
      Project    = var.project_name
      ManagedBy  = "Terraform"
    }
  }
}

Deploy the Infrastructure

cd terraform

# Initialize Terraform
terraform init

# Review the plan
terraform plan -var="notification_email=your-email@example.com"

# Apply the configuration
terraform apply -var="notification_email=your-email@example.com"
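Once the apply completes, it's worth sanity-checking the bucket settings from outside Terraform. Below is a minimal boto3 sketch that checks default encryption, versioning, and the public access block; the bucket name is an assumption based on the naming pattern above, so adjust it to your project and environment.

# verify_buckets.py - post-deploy sanity check; bucket name is an assumption
import boto3

s3 = boto3.client("s3")
account_id = boto3.client("sts").get_caller_identity()["Account"]
bucket = f"ml-pipeline-raw-data-dev-{account_id}"  # adjust project_name/environment

# Default encryption should report SSE-KMS
enc = s3.get_bucket_encryption(Bucket=bucket)
rule = enc["ServerSideEncryptionConfiguration"]["Rules"][0]
print("SSE algorithm:", rule["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"])

# Versioning should be Enabled
print("Versioning:", s3.get_bucket_versioning(Bucket=bucket).get("Status"))

# All four public access block flags should be True
print("Public access block:", s3.get_public_access_block(Bucket=bucket)["PublicAccessBlockConfiguration"])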

Step 2: Data Validation Lambda Function

Why Validate Data?

Prevent garbage in, garbage out:

  • Catch schema changes early
  • Detect data quality issues
  • Ensure consistency
  • Create audit trail

Important: After deploying, check your email and confirm the SNS subscription to receive notifications.

Validation Logic

Create lambda/data-validation/handler.py:

import json
import os
import boto3
import logging
import pandas as pd
from io import BytesIO
import hashlib
from datetime import datetime
from urllib.parse import unquote_plus  # S3 event object keys arrive URL-encoded

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# AWS clients
s3_client = boto3.client('s3')
sns_client = boto3.client('sns')

# Validation rules
VALIDATION_RULES = {
    'required_columns': ['timestamp', 'feature_1', 'feature_2', 'target'],
    'numeric_columns': ['feature_1', 'feature_2', 'target'],
    'max_null_percentage': 0.05,  # 5% max
    'min_rows': 5,  # Adjusted for testing - use 100+ for production
    'max_rows': 1000000,
    'date_columns': ['timestamp']
}

def calculate_checksum(data: bytes) -> str:
    """Calculate SHA256 checksum"""
    return hashlib.sha256(data).hexdigest()

def validate_schema(df: pd.DataFrame) -> dict:
    """Validate DataFrame schema"""
    issues = []

    # Check required columns
    missing_cols = set(VALIDATION_RULES['required_columns']) - set(df.columns)
    if missing_cols:
        issues.append(f"Missing columns: {missing_cols}")

    # Check numeric columns
    for col in VALIDATION_RULES['numeric_columns']:
        if col in df.columns:
            if not pd.api.types.is_numeric_dtype(df[col]):
                issues.append(f"Column '{col}' should be numeric")

    # Check date columns
    for col in VALIDATION_RULES['date_columns']:
        if col in df.columns:
            try:
                # Actually convert to catch invalid dates
                df[col] = pd.to_datetime(df[col])
            except Exception as e:
                issues.append(f"Column '{col}' has invalid datetime values: {str(e)}")

    return {
        'valid': len(issues) == 0,
        'issues': issues
    }

def validate_data_quality(df: pd.DataFrame) -> dict:
    """Validate data quality"""
    issues = []

    # Check row count
    if len(df) < VALIDATION_RULES['min_rows']:
        issues.append(
            f"Insufficient rows: {len(df)} < {VALIDATION_RULES['min_rows']}"
        )
    elif len(df) > VALIDATION_RULES['max_rows']:
        issues.append(
            f"Too many rows: {len(df)} > {VALIDATION_RULES['max_rows']}"
        )

    # Check null values
    for col in df.columns:
        null_pct = df[col].isnull().sum() / len(df)
        if null_pct > VALIDATION_RULES['max_null_percentage']:
            issues.append(
                f"Column '{col}' has {null_pct:.2%} nulls "
                f"(max: {VALIDATION_RULES['max_null_percentage']:.2%})"
            )

    # Check duplicates
    duplicate_count = df.duplicated().sum()
    if duplicate_count > 0:
        issues.append(f"Found {duplicate_count} duplicate rows")

    # Data distribution checks
    stats = {}
    for col in VALIDATION_RULES['numeric_columns']:
        if col in df.columns:
            stats[col] = {
                'mean': float(df[col].mean()),
                'std': float(df[col].std()),
                'min': float(df[col].min()),
                'max': float(df[col].max()),
                'null_count': int(df[col].isnull().sum())
            }

    return {
        'valid': len(issues) == 0,
        'issues': issues,
        'stats': {
            'row_count': len(df),
            'column_count': len(df.columns),
            'duplicate_count': int(duplicate_count),  # cast numpy int64 for JSON serialization
            'column_stats': stats
        }
    }

def send_notification(topic_arn: str, subject: str, message: str):
    """Send SNS notification"""
    try:
        sns_client.publish(
            TopicArn=topic_arn,
            Subject=subject,
            Message=message
        )
        logger.info(f"Sent notification: {subject}")
    except Exception as e:
        logger.error(f"Failed to send notification: {e}")

def lambda_handler(event, context):
    """
    Lambda handler triggered by S3 upload
    """
    bucket = key = 'unknown'  # defaults so the error handler below can reference them
    try:
        # Parse S3 event (decode the URL-encoded object key)
        record = event['Records'][0]
        bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])

        logger.info(f"Processing: s3://{bucket}/{key}")

        # Download file
        response = s3_client.get_object(Bucket=bucket, Key=key)
        file_content = response['Body'].read()

        # Calculate checksum
        checksum = calculate_checksum(file_content)

        # Load data
        df = pd.read_csv(BytesIO(file_content))
        logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns")

        # Run validations
        schema_result = validate_schema(df)
        quality_result = validate_data_quality(df)

        # Compile results
        validation_result = {
            'file': f"s3://{bucket}/{key}",
            'timestamp': datetime.utcnow().isoformat(),
            'checksum': checksum,
            'schema_validation': schema_result,
            'quality_validation': quality_result,
            'status': (
                'PASSED' if schema_result['valid'] and quality_result['valid']
                else 'FAILED'
            )
        }

        logger.info(f"Validation status: {validation_result['status']}")

        # Save validation report
        report_key = key.replace('raw/', 'reports/').replace('.csv', '_report.json')
        s3_client.put_object(
            Bucket=bucket,
            Key=report_key,
            Body=json.dumps(validation_result, indent=2),
            ServerSideEncryption='aws:kms',
            SSEKMSKeyId=os.environ.get('KMS_KEY_ID')
        )

        # If passed, copy to validated bucket
        if validation_result['status'] == 'PASSED':
            validated_bucket = bucket.replace('raw-data', 'validated-data')
            validated_key = key.replace('raw/', 'validated/')

            # Explicitly set encryption on copy
            s3_client.copy_object(
                CopySource={'Bucket': bucket, 'Key': key},
                Bucket=validated_bucket,
                Key=validated_key,
                ServerSideEncryption='aws:kms',
                SSEKMSKeyId=os.environ.get('KMS_KEY_ID')
            )

            logger.info(f"Copied to: s3://{validated_bucket}/{validated_key}")

            # Send success notification
            send_notification(
                os.environ['SNS_TOPIC_ARN'],
                f'Data Validation Passed: {key}',
                f"File validated successfully\n\n"
                f"Stats:\n"
                f"- Rows: {quality_result['stats']['row_count']}\n"
                f"- Columns: {quality_result['stats']['column_count']}\n"
                f"- Duplicates: {quality_result['stats']['duplicate_count']}\n\n"
                f"Validation report: s3://{bucket}/{report_key}"
            )
        else:
            # Send failure notification
            all_issues = schema_result['issues'] + quality_result['issues']
            send_notification(
                os.environ['SNS_TOPIC_ARN'],
                f'Data Validation Failed: {key}',
                f"Validation issues found:\n\n" + "\n".join(f"- {issue}" for issue in all_issues)
            )

        return {
            'statusCode': 200,
            'body': json.dumps(validation_result)
        }

    except Exception as e:
        logger.error(f"Error: {e}", exc_info=True)

        send_notification(
            os.environ.get('SNS_TOPIC_ARN', ''),
            'Data Validation Error',
            f"Error processing {key}:\n{str(e)}"
        )

        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
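If you want to exercise the handler locally before wiring up the S3 trigger, a hand-built event is enough. A minimal sketch, assuming your AWS credentials can read the test object and that the bucket and key match something you've uploaded; with the placeholder SNS ARN below, notifications will simply log a failure instead of sending.

# local_test.py - local invocation sketch; bucket, key, and ARN values are placeholders
import os
from handler import lambda_handler

os.environ.setdefault("SNS_TOPIC_ARN", "arn:aws:sns:ap-south-1:123456789012:ml-pipeline-validation-notifications")
os.environ.setdefault("KMS_KEY_ID", "alias/ml-data-encryption")

fake_event = {
    "Records": [{
        "s3": {
            "bucket": {"name": "ml-pipeline-raw-data-dev-123456789012"},
            "object": {"key": "raw/sample.csv"}
        }
    }]
}

print(lambda_handler(fake_event, None))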

Lambda Dependencies

Create lambda/data-validation/requirements.txt:

pandas==2.1.0
boto3==1.28.85

Package Lambda Function

Important Note on Lambda Packaging: The pandas library includes platform-specific C extensions. Building on macOS or Windows will create a package incompatible with Lambda's Amazon Linux 2 runtime. Use one of these approaches:

Option 1: Docker Build (Recommended)

cd lambda/data-validation

# Build inside the Lambda Python runtime image (override its entrypoint to get a shell)
docker run --rm --entrypoint /bin/bash -v "$PWD":/var/task \
  public.ecr.aws/lambda/python:3.11 \
  -c "cd /var/task && pip install -r requirements.txt -t package/ && cp handler.py package/"

# Create deployment package
cd package
zip -r ../function.zip .
cd ..

Option 2: Use Lambda Layer

# Skip pandas installation and use AWS-managed layer
# Update terraform/lambda.tf to include:
# layers = ["arn:aws:lambda:ap-south-1:336392948345:layer:AWSSDKPandas-Python311:12"]

Option 3: EC2/Cloud9 Build

# Build on an Amazon Linux 2 instance
pip install -r requirements.txt -t package/
cp handler.py package/
cd package && zip -r ../function.zip . && cd ..

Step 3: Lambda Infrastructure

Create terraform/lambda.tf:

# IAM role for Lambda
resource "aws_iam_role" "data_validation" {
  name = "${var.project_name}-data-validation-lambda"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "lambda.amazonaws.com"
      }
    }]
  })
}

# IAM policy for Lambda
resource "aws_iam_role_policy" "data_validation" {
  name = "${var.project_name}-data-validation-policy"
  role = aws_iam_role.data_validation.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:PutObject"
        ]
        Resource = [
          "${aws_s3_bucket.raw_data.arn}/*",
          "${aws_s3_bucket.validated_data.arn}/*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "s3:ListBucket"
        ]
        Resource = [
          aws_s3_bucket.raw_data.arn,
          aws_s3_bucket.validated_data.arn
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "kms:Decrypt",
          "kms:GenerateDataKey"
        ]
        Resource = aws_kms_key.data_encryption.arn
      },
      {
        Effect = "Allow"
        Action = "sns:Publish"
        Resource = aws_sns_topic.validation_notifications.arn
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      }
    ]
  })
}

# SNS topic for notifications
resource "aws_sns_topic" "validation_notifications" {
  name = "${var.project_name}-validation-notifications"

  kms_master_key_id = aws_kms_key.data_encryption.id
}

resource "aws_sns_topic_subscription" "email" {
  topic_arn = aws_sns_topic.validation_notifications.arn
  protocol  = "email"
  endpoint  = var.notification_email
}

# Lambda function
resource "aws_lambda_function" "data_validation" {
  filename         = "${path.module}/../lambda/data-validation/function.zip"
  function_name    = "${var.project_name}-data-validation"
  role            = aws_iam_role.data_validation.arn
  handler         = "handler.lambda_handler"
  runtime         = "python3.11"
  timeout         = 300
  memory_size     = 1024
  source_code_hash = filebase64sha256("${path.module}/../lambda/data-validation/function.zip")

  environment {
    variables = {
      SNS_TOPIC_ARN = aws_sns_topic.validation_notifications.arn
      KMS_KEY_ID    = aws_kms_key.data_encryption.id
    }
  }

  # Optional: Use AWS-managed pandas layer instead of packaging it
  # layers = ["arn:aws:lambda:${var.aws_region}:336392948345:layer:AWSSDKPandas-Python311:12"]

  tags = {
    Name        = "Data Validation Lambda"
    Environment = var.environment
  }
}

# S3 trigger
resource "aws_s3_bucket_notification" "raw_data_trigger" {
  bucket = aws_s3_bucket.raw_data.id

  lambda_function {
    lambda_function_arn = aws_lambda_function.data_validation.arn
    events              = ["s3:ObjectCreated:*"]
    filter_prefix       = "raw/"
    filter_suffix       = ".csv"
  }

  depends_on = [
    aws_lambda_permission.allow_s3,
    aws_lambda_function.data_validation
  ]
}

resource "aws_lambda_permission" "allow_s3" {
  statement_id  = "AllowS3Invoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.data_validation.function_name
  principal     = "s3.amazonaws.com"
  source_arn    = aws_s3_bucket.raw_data.arn
}

# CloudWatch Log Group
# Note: encrypting the log group with a customer-managed KMS key requires the key
# policy to allow the CloudWatch Logs service principal (logs.<region>.amazonaws.com).
resource "aws_cloudwatch_log_group" "data_validation" {
  name              = "/aws/lambda/${aws_lambda_function.data_validation.function_name}"
  retention_in_days = 30
  kms_key_id        = aws_kms_key.data_encryption.arn
}

Step 4: CloudTrail for Audit Logging

Create terraform/cloudtrail.tf:

# CloudTrail logs bucket
resource "aws_s3_bucket" "cloudtrail_logs" {
  bucket = "${var.project_name}-cloudtrail-logs-${data.aws_caller_identity.current.account_id}"
}

resource "aws_s3_bucket_versioning" "cloudtrail_logs" {
  bucket = aws_s3_bucket.cloudtrail_logs.id

  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "cloudtrail_logs" {
  bucket = aws_s3_bucket.cloudtrail_logs.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

resource "aws_s3_bucket_public_access_block" "cloudtrail_logs" {
  bucket = aws_s3_bucket.cloudtrail_logs.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

resource "aws_s3_bucket_policy" "cloudtrail_logs" {
  bucket = aws_s3_bucket.cloudtrail_logs.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "AWSCloudTrailAclCheck"
        Effect = "Allow"
        Principal = {
          Service = "cloudtrail.amazonaws.com"
        }
        Action   = "s3:GetBucketAcl"
        Resource = aws_s3_bucket.cloudtrail_logs.arn
      },
      {
        Sid    = "AWSCloudTrailWrite"
        Effect = "Allow"
        Principal = {
          Service = "cloudtrail.amazonaws.com"
        }
        Action   = "s3:PutObject"
        Resource = "${aws_s3_bucket.cloudtrail_logs.arn}/*"
        Condition = {
          StringEquals = {
            "s3:x-amz-acl" = "bucket-owner-full-control"
          }
        }
      },
      {
        Sid    = "DenyInsecureTransport"
        Effect = "Deny"
        Principal = "*"
        Action = "s3:*"
        Resource = [
          aws_s3_bucket.cloudtrail_logs.arn,
          "${aws_s3_bucket.cloudtrail_logs.arn}/*"
        ]
        Condition = {
          Bool = {
            "aws:SecureTransport" = "false"
          }
        }
      }
    ]
  })
}

# CloudTrail
resource "aws_cloudtrail" "data_events" {
  name                          = "${var.project_name}-data-trail"
  s3_bucket_name                = aws_s3_bucket.cloudtrail_logs.id
  include_global_service_events = true
  is_multi_region_trail         = true
  enable_logging                = true
  enable_log_file_validation    = true

  depends_on = [aws_s3_bucket_policy.cloudtrail_logs]

  event_selector {
    read_write_type           = "All"
    include_management_events = true

    data_resource {
      type = "AWS::S3::Object"
      values = [
        "${aws_s3_bucket.raw_data.arn}/*",
        "${aws_s3_bucket.validated_data.arn}/*",
        "${aws_s3_bucket.model_artifacts.arn}/*"
      ]
    }
  }

  tags = {
    Name        = "ML Data Trail"
    Environment = var.environment
  }
}
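Note that S3 data events don't appear in the CloudTrail console's Event history; they are delivered as gzipped JSON files to the logs bucket. A quick way to confirm the trail is working is to parse the newest delivered file. A rough sketch, assuming the bucket name from cloudtrail.tf with the default project_name:

# inspect_trail.py - peek at the latest delivered CloudTrail log file (sketch)
import gzip
import json
import boto3

s3 = boto3.client("s3")
account_id = boto3.client("sts").get_caller_identity()["Account"]

bucket = f"ml-pipeline-cloudtrail-logs-{account_id}"  # assumes default project_name
prefix = f"AWSLogs/{account_id}/CloudTrail/"

objects = s3.list_objects_v2(Bucket=bucket, Prefix=prefix).get("Contents", [])
if not objects:
    raise SystemExit("No log files delivered yet - CloudTrail delivery can take ~15 minutes")

latest = max(objects, key=lambda o: o["LastModified"])
body = s3.get_object(Bucket=bucket, Key=latest["Key"])["Body"].read()
records = json.loads(gzip.decompress(body))["Records"]

# Print the S3 events captured in this log file
for r in records:
    if r.get("eventSource") == "s3.amazonaws.com":
        print(r["eventTime"], r["eventName"], r.get("requestParameters", {}).get("key"))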

Step 5: CloudWatch Monitoring

Create terraform/monitoring.tf:

# CloudWatch dashboard
resource "aws_cloudwatch_dashboard" "data_pipeline" {
  dashboard_name = "${var.project_name}-data-pipeline"

  dashboard_body = jsonencode({
    widgets = [
      {
        type   = "metric"
        x      = 0
        y      = 0
        width  = 12
        height = 6
        properties = {
          # Dashboard metric arrays use the format:
          # [Namespace, MetricName, DimensionName, DimensionValue, { options }]
          metrics = [
            ["AWS/Lambda", "Invocations", "FunctionName",
              aws_lambda_function.data_validation.function_name,
              { stat = "Sum", label = "Lambda Invocations" }],
            [".", "Errors", ".", ".",
              { stat = "Sum", label = "Lambda Errors" }],
            [".", "Duration", ".", ".",
              { stat = "Average", label = "Avg Duration (ms)" }]
          ]
          period  = 300
          region  = var.aws_region
          title   = "Lambda Metrics"
          view    = "timeSeries"
          stacked = false
        }
      }
    ]
  })
}

# Alarms
resource "aws_cloudwatch_metric_alarm" "lambda_errors" {
  alarm_name          = "${var.project_name}-lambda-errors"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "1"
  metric_name         = "Errors"
  namespace           = "AWS/Lambda"
  period              = "300"
  statistic           = "Sum"
  threshold           = "5"
  alarm_description   = "Lambda function errors"
  alarm_actions       = [aws_sns_topic.validation_notifications.arn]

  dimensions = {
    FunctionName = aws_lambda_function.data_validation.function_name
  }
}

resource "aws_cloudwatch_metric_alarm" "lambda_throttles" {
  alarm_name          = "${var.project_name}-lambda-throttles"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "1"
  metric_name         = "Throttles"
  namespace           = "AWS/Lambda"
  period              = "300"
  statistic           = "Sum"
  threshold           = "10"
  alarm_description   = "Lambda function throttling"
  alarm_actions       = [aws_sns_topic.validation_notifications.arn]

  dimensions = {
    FunctionName = aws_lambda_function.data_validation.function_name
  }
}
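You can also pull the same Lambda metrics programmatically, which is handy for quick checks from a terminal or CI. A small sketch, assuming the default function name ml-pipeline-data-validation:

# check_metrics.py - recent Lambda invocation/error counts (sketch)
from datetime import datetime, timedelta
import boto3

cloudwatch = boto3.client("cloudwatch")
FUNCTION_NAME = "ml-pipeline-data-validation"  # assumes default project_name

def metric_sum(metric_name: str) -> float:
    """Sum a Lambda metric over the last hour."""
    resp = cloudwatch.get_metric_statistics(
        Namespace="AWS/Lambda",
        MetricName=metric_name,
        Dimensions=[{"Name": "FunctionName", "Value": FUNCTION_NAME}],
        StartTime=datetime.utcnow() - timedelta(hours=1),
        EndTime=datetime.utcnow(),
        Period=300,
        Statistics=["Sum"],
    )
    return sum(dp["Sum"] for dp in resp["Datapoints"])

print("Invocations (last hour):", metric_sum("Invocations"))
print("Errors (last hour):", metric_sum("Errors"))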

Step 6: Testing the Pipeline

1. Create Test Data

Create test-data/sample.csv:

timestamp,feature_1,feature_2,target
2024-01-01T00:00:00,1.5,2.3,0
2024-01-01T01:00:00,1.8,2.1,1
2024-01-01T02:00:00,1.2,2.5,0
2024-01-01T03:00:00,1.9,2.0,1
2024-01-01T04:00:00,1.4,2.4,0
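Five rows is enough for a smoke test with min_rows: 5, but if you raise the threshold to production values you'll want a larger file. A quick generator sketch (column names match VALIDATION_RULES; the output path is an assumption):

# generate_test_data.py - synthetic CSV matching the validation schema (sketch)
import numpy as np
import pandas as pd

rows = 500
rng = np.random.default_rng(42)

df = pd.DataFrame({
    "timestamp": pd.date_range("2024-01-01", periods=rows, freq="H"),
    "feature_1": rng.normal(1.5, 0.3, rows).round(2),
    "feature_2": rng.normal(2.2, 0.2, rows).round(2),
    "target": rng.integers(0, 2, rows),
})

df.to_csv("test-data/sample_large.csv", index=False)
print(f"Wrote {len(df)} rows")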

2. Upload to S3

# Get your AWS account ID
AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)

# Upload test file (the high-level aws s3 cp command uses --sse for SSE-KMS)
aws s3 cp test-data/sample.csv \
  s3://ml-pipeline-raw-data-dev-${AWS_ACCOUNT_ID}/raw/sample.csv \
  --sse aws:kms

3. Monitor Lambda Execution

# View Lambda logs in real-time
aws logs tail /aws/lambda/ml-pipeline-data-validation --follow

# Or check specific log streams
aws logs describe-log-streams \
  --log-group-name /aws/lambda/ml-pipeline-data-validation \
  --order-by LastEventTime \
  --descending \
  --max-items 1

4. Verify Validated Data

# Check validated bucket
aws s3 ls s3://ml-pipeline-validated-data-dev-${AWS_ACCOUNT_ID}/validated/

# Download validation report
aws s3 cp \
  s3://ml-pipeline-raw-data-dev-${AWS_ACCOUNT_ID}/reports/sample_report.json \
  ./validation-report.json

# View report
cat validation-report.json | python -m json.tool

5. Check Email Notification

You should receive an email notification with validation results. If not:

  • Confirm you accepted the SNS subscription
  • Check CloudWatch logs for errors
  • Verify SNS topic permissions

Step 7: Advanced Features

Data Quality Metrics

Add custom CloudWatch metrics in Lambda:

import boto3
from datetime import datetime  # already imported if you add this inside handler.py

cloudwatch = boto3.client('cloudwatch')

def publish_metrics(validation_result):
    """Publish custom metrics to CloudWatch"""
    metrics = [
        {
            'MetricName': 'DataQualityScore',
            'Value': 100.0 if validation_result['status'] == 'PASSED' else 0.0,
            'Unit': 'Percent',
            'Timestamp': datetime.utcnow()
        },
        {
            'MetricName': 'RowCount',
            'Value': validation_result['quality_validation']['stats']['row_count'],
            'Unit': 'Count',
            'Timestamp': datetime.utcnow()
        },
        {
            'MetricName': 'DuplicateRows',
            'Value': validation_result['quality_validation']['stats']['duplicate_count'],
            'Unit': 'Count',
            'Timestamp': datetime.utcnow()
        }
    ]

    cloudwatch.put_metric_data(
        Namespace='MLPipeline/DataQuality',
        MetricData=metrics
    )
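To wire this in, call it from lambda_handler after validation_result is compiled; note that the IAM policy in lambda.tf would also need cloudwatch:PutMetricData. A sketch of the call site:

# Inside lambda_handler, after validation_result is built (sketch):
try:
    publish_metrics(validation_result)
except Exception as e:
    # Metrics are best-effort - don't fail the validation because of them
    logger.warning(f"Failed to publish metrics: {e}")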

Data Versioning

Track data lineage:

def create_data_lineage(bucket, key, checksum):
    """Create data lineage metadata"""
    metadata = {
        'source_file': key,
        'checksum': checksum,
        'ingestion_time': datetime.utcnow().isoformat(),
        'validation_version': '1.0',
        'pipeline_version': os.environ.get('PIPELINE_VERSION', 'unknown')
    }

    # Store lineage in S3
    lineage_key = f"lineage/{checksum}.json"
    s3_client.put_object(
        Bucket=bucket,
        Key=lineage_key,
        Body=json.dumps(metadata, indent=2),
        ServerSideEncryption='aws:kms',
        SSEKMSKeyId=os.environ.get('KMS_KEY_ID')
    )
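This helper reuses the handler's existing imports and s3_client. You would call it next to the validation-report write so every ingested file gets a lineage record keyed by its checksum, for example:

# Inside lambda_handler, after the checksum is calculated (sketch):
create_data_lineage(bucket, key, checksum)

# The lineage record for any file can later be fetched by checksum:
#   s3://<raw-bucket>/lineage/<sha256>.json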

Important Notes Before Deploying

Known Considerations

1. Lambda Package Size and Compatibility

The pandas library (~100MB) has platform-specific dependencies. Building on macOS/Windows creates incompatible packages. Solutions:

  • Recommended: Use Docker build (shown in Step 2)
  • Alternative: Use AWS-managed Lambda Layer (uncomment in lambda.tf)
  • For CI/CD: Build in CodeBuild or GitHub Actions with Amazon Linux 2 image

2. Validation Rules

The sample uses min_rows: 5 for testing. Adjust for production:

VALIDATION_RULES = {
    'required_columns': ['your', 'columns', 'here'],
    'numeric_columns': ['numeric', 'columns'],
    'max_null_percentage': 0.05,  # Adjust based on requirements
    'min_rows': 100,  # Increase for production
    'max_rows': 10000000,
    'date_columns': ['timestamp']
}
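To avoid editing code between environments, you could also let Lambda environment variables override the defaults. A minimal sketch, assuming MIN_ROWS and MAX_NULL_PERCENTAGE are variables you add to the function's environment block:

# Optional: environment-variable overrides for validation thresholds (sketch)
import os

VALIDATION_RULES['min_rows'] = int(os.environ.get('MIN_ROWS', VALIDATION_RULES['min_rows']))
VALIDATION_RULES['max_null_percentage'] = float(
    os.environ.get('MAX_NULL_PERCENTAGE', VALIDATION_RULES['max_null_percentage'])
)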

3. Terraform State Management

For production, use remote state with locking:

# In providers.tf
terraform {
  backend "s3" {
    bucket         = "your-terraform-state-bucket"
    key            = "ml-pipeline/terraform.tfstate"
    region         = "ap-south-1"
    dynamodb_table = "terraform-state-lock"
    encrypt        = true
  }
}

4. Cost Monitoring

The estimated costs (roughly $9-10/month) assume minimal development usage. For production:

  • Enable AWS Cost Explorer
  • Set up AWS Budgets with email alerts
  • Monitor CloudWatch Logs storage (can grow quickly)
  • Review CloudTrail data event costs (especially with high S3 activity)

In practice, CloudTrail data-event charges for around 10K Lambda-triggered validations per month tend to land at ~$20-30, well above the ~$2 shown in the cost breakdown below.

5. Network Isolation (Production Enhancement)

For maximum security, deploy Lambda in a VPC:

# In lambda.tf, add to aws_lambda_function:
vpc_config {
  subnet_ids         = var.private_subnet_ids
  security_group_ids = [aws_security_group.lambda.id]
}

# Requires NAT Gateway or VPC endpoints for S3/SNS access

Security Best Practices Checklist

Encryption:

  • KMS encryption for all S3 buckets
  • Enforce encryption in bucket policies
  • TLS in transit (enforced by bucket policy)
  • Encrypted SNS topics
  • Encrypted CloudWatch logs

Access Control:

  • Least privilege IAM roles
  • No public bucket access
  • S3 bucket policies deny unencrypted uploads
  • s3:ListBucket permission for Lambda (enables proper error handling)

Audit & Compliance:

  • CloudTrail logging all data access
  • CloudWatch logs retention policy
  • SNS notifications for failures
  • Log file validation enabled

Data Quality:

  • Automated validation
  • Schema enforcement
  • Quality metrics tracking
  • Data checksums for integrity

Cost Breakdown

Estimated Monthly Costs (Development with ~1000 validations/month):

| Service | Usage | Realistic Cost/Month |
| --- | --- | --- |
| S3 Storage (100 GB) | Standard tier | ~$2.30 |
| Lambda | 1K invocations, 512 MB, 30 s avg | ~$0.10 |
| CloudWatch Logs | 10 GB ingestion + storage | ~$5.00 |
| CloudTrail | Data events (1K S3 operations) | ~$2.00 |
| SNS | 100 notifications | ~$0.01 |
| Total | | ~$9.41 |

Production Scaling (100K validations/month):

| Service | Usage | Cost/Month |
| --- | --- | --- |
| S3 Storage (1 TB) | With lifecycle policies | ~$15-20 |
| Lambda | 100K invocations | ~$8-12 |
| CloudWatch Logs | 100 GB | ~$50 |
| CloudTrail | 100K S3 operations | ~$200 |
| Total | | ~$273-282 |

Cost Optimization Tips:

  1. Use S3 Intelligent-Tiering for automatic cost optimization
  2. Implement CloudWatch Logs retention policies (30 days recommended)
  3. Scope CloudTrail data events to specific buckets and prefixes instead of logging all S3 data events in production
  4. Use Lambda reserved concurrency to prevent runaway costs

Troubleshooting Guide

Issue: Lambda timeout

# Symptoms: Functions timing out on large files
# Solutions:

# 1. Increase timeout in terraform/lambda.tf
timeout = 900  # 15 minutes (max)

# 2. Increase memory (also increases CPU)
memory_size = 2048  # Up to 10,240 MB

# 3. Process files in chunks for very large datasets
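For point 3, pandas can stream a CSV in chunks instead of loading it all at once, which keeps memory flat on large files. A rough sketch of how the quality checks could be adapted (simplified to row and null counts; cross-chunk checks like duplicate detection need extra state):

# Chunked validation sketch - counts rows and nulls without loading the whole file
from io import BytesIO
import pandas as pd

def validate_in_chunks(file_content: bytes, chunk_size: int = 50_000) -> dict:
    total_rows = 0
    null_counts = {}

    for chunk in pd.read_csv(BytesIO(file_content), chunksize=chunk_size):
        total_rows += len(chunk)
        for col in chunk.columns:
            null_counts[col] = null_counts.get(col, 0) + int(chunk[col].isnull().sum())

    return {"row_count": total_rows, "null_counts": null_counts}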

Issue: Permission denied errors

# Verify IAM permissions
aws iam get-role-policy \
  --role-name ml-pipeline-data-validation-lambda \
  --policy-name ml-pipeline-data-validation-policy

# Check the KMS key policy allows the Lambda role
aws kms get-key-policy \
  --key-id alias/ml-data-encryption \
  --policy-name default

# Verify S3 bucket policy
aws s3api get-bucket-policy \
  --bucket ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID

Issue: Lambda package incompatibility

# Symptom: import errors at runtime, e.g. "Unable to import module 'handler'"
# or failures loading the numpy/pandas C extensions
# Cause: package built on macOS/Windows instead of the Lambda (Amazon Linux) runtime
# Solution: Use the Docker build method from Step 2

# Verify Lambda runtime architecture
aws lambda get-function --function-name ml-pipeline-data-validation \
  --query 'Configuration.Architectures'

Issue: S3 event not triggering Lambda

# Check Lambda permissions
aws lambda get-policy --function-name ml-pipeline-data-validation

# Verify S3 bucket notification configuration
aws s3api get-bucket-notification-configuration \
  --bucket ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID

# Test manually
aws lambda invoke \
  --function-name ml-pipeline-data-validation \
  --payload file://test-event.json \
  response.json

Issue: SNS notifications not received

# Check subscription status
aws sns list-subscriptions-by-topic \
  --topic-arn arn:aws:sns:ap-south-1:ACCOUNT_ID:ml-pipeline-validation-notifications

# Subscription must show "SubscriptionArn" (not "PendingConfirmation")
# If pending, check email spam folder for confirmation link
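Once the subscription shows a real ARN, you can rule out the Lambda side entirely by publishing a test message straight to the topic. A minimal sketch (the topic ARN is the placeholder from above):

# sns_test.py - send a test notification directly to the topic (sketch)
import boto3

sns = boto3.client("sns")
topic_arn = "arn:aws:sns:ap-south-1:ACCOUNT_ID:ml-pipeline-validation-notifications"  # replace ACCOUNT_ID

sns.publish(
    TopicArn=topic_arn,
    Subject="SNS delivery test",
    Message="If you received this, the topic and email subscription are working.",
)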

What's Next?

You now have:

  • Secure, encrypted data storage
  • Automated data validation
  • Complete audit trail
  • Monitoring and alerting
  • Infrastructure as Code

In Part 3, we'll build the training pipeline with SageMaker:

  • Custom training containers with Docker
  • Distributed training infrastructure
  • Experiment tracking with MLflow
  • Model versioning and registry
  • Hyperparameter optimization with cost-effective Spot instances

In Part 4 (Series Finale), we'll complete the production ML platform:

  • CI/CD pipelines for automated deployment
  • SageMaker endpoints with auto-scaling
  • Model monitoring and drift detection
  • A/B testing and canary deployments
  • Production observability and incident response

Key Takeaways

  1. Automate validation - Manual checks don't scale and miss edge cases
  2. Encrypt everything - KMS at rest, TLS in transit, no exceptions
  3. Audit all access - CloudTrail provides compliance and debugging trails
  4. Monitor data quality - Track metrics over time to detect degradation
  5. Use IaC - Terraform makes infrastructure reproducible and reviewable
  6. Package carefully - Lambda deployment packages must match runtime architecture

Remember: Good data pipelines prevent bad models. Invest time in data quality upfront.



Let's Connect!

  • Questions about the data pipeline? Drop a comment below
  • Follow me for Part 3 - SageMaker Training
  • Like if this helped you build your pipeline
  • Share with your team

What data quality challenges are you facing? Let me know in the comments!




Tags: #aws #machinelearning #mlops #dataengineering #terraform #lambda #s3 #cloudwatch

