Reading time: ~20-25 minutes
Level: Intermediate
Prerequisites: Basic AWS knowledge, familiarity with S3 and Lambda
Series: Part 2 of 4 - Read Part 1
Objective: This post focuses on core concepts and best practices for building data pipelines on AWS. The code examples are functional and intended for learning; production deployments require additional hardening, testing, and organization-specific configuration.
Welcome Back!
In Part 1, we introduced the complete AI/ML Development Lifecycle (AIDLC) framework. This series implements the technical phases:
Complete AIDLC → Series Mapping:
Part 2:
- Phase 1: Data Collection & Preparation
- Phase 6: Governance (CloudTrail, KMS, IAM)
Part 3:
- Phase 2: Model Development & Training
- Phase 3: Model Evaluation (validation metrics)
- Phase 6: Governance (experiment tracking, versioning)
Part 4:
- Phase 4: Model Deployment (SageMaker endpoints)
- Phase 5: Monitoring & Maintenance (drift detection)
- Phase 6: Governance (CI/CD, audit logs)
Key Insight: Phase 6 (Governance & Compliance) isn't a separate step—it's embedded into every phase through security practices, audit logging, and compliance controls.
What you'll build today:
- Encrypted S3 data lake with proper access controls
- Automated data validation pipeline with Lambda
- Data quality monitoring and alerting
- Complete audit trail with CloudTrail
- Infrastructure as Code with Terraform
- Data splitting for ML training
By the end: You'll have a functional data pipeline that demonstrates security and quality best practices.
Table of Contents
- The Data Pipeline Problem
- Architecture Overview
- Step 1: Setting Up Encrypted S3 Buckets
- Step 2: Data Validation Lambda Function
- Step 3: Lambda Infrastructure
- Step 4: CloudTrail for Audit Logging
- Step 5: CloudWatch Monitoring
- Step 6: Testing the Pipeline
- Step 6.5: Preparing Data for Training
- Step 7: Advanced Features
- Step 7.5: Testing Best Practices
- Important Notes Before Deploying
- Security Best Practices Checklist
- Cost Breakdown
- Troubleshooting Guide
- What's Next?
- Key Takeaways
The Data Pipeline Problem
You can't build reliable ML models on unreliable data.
Common issues include:
Inconsistent data formats - Training fails due to schema changes
Missing security controls - Sensitive data exposed
No data validation - Bad data silently corrupts models
Manual processes - Don't scale and are error-prone
No audit trail - Can't track data lineage
The solution:
An automated, secure, validated data pipeline.
Architecture Overview
Here's the AIDLC Phase 1 data pipeline architecture:
Architecture Note: This implements the "Data Collection & Preparation" phase of the AIDLC framework, establishing the secure foundation for model training (Phase 2) and deployment (Phase 3).
AWS Services Used:
- S3: Encrypted data storage with versioning
- Lambda: Serverless data validation
- KMS: Encryption key management
- CloudTrail: Audit logging
- CloudWatch: Monitoring and alerting
- SNS: Notifications
- Terraform: Infrastructure as Code
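If the architecture diagram doesn't render for you, here's a simplified text view of the flow these services implement:
upload CSV --> S3 raw-data bucket (raw/) --s3:ObjectCreated--> Lambda data validation
                                                                |-- validation report --> raw-data bucket (reports/)
                                                                |-- if PASSED ---------> validated-data bucket (validated/)
                                                                '-- SNS email notification (pass or fail)
CloudTrail: audit trail for all S3 data events
CloudWatch: Lambda logs, custom data-quality metrics, alarms
KMS: encryption for every bucket, the SNS topic, and the log group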
Step 1: Setting Up Encrypted S3 Buckets
Why Three Buckets?
Separation of concerns improves security and organization:
- Raw Data Bucket - Unvalidated data from sources
- Validated Data Bucket - Quality-checked, ready for training
- Model Artifacts Bucket - Trained models and metadata
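For orientation, here's the prefix layout the rest of this post creates inside those buckets (bucket names follow the Terraform naming below, so your account ID and environment will differ):
ml-pipeline-raw-data-dev-<account-id>/
  raw/        # incoming CSVs (triggers the validation Lambda)
  reports/    # JSON validation reports written by the Lambda
  lineage/    # optional lineage records (Step 7)
ml-pipeline-validated-data-dev-<account-id>/
  validated/
    train/    # 80% split for training (Step 6.5)
    val/      # 20% split for validation (Step 6.5)
ml-pipeline-model-artifacts-dev-<account-id>/   # trained models (used from Part 3 onward)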
Note: The complete S3 configuration is shown below. For production deployments, consider splitting configuration into modules for better organization and reusability.
Infrastructure Code
Create terraform/s3-buckets.tf:
# KMS key for encryption
resource "aws_kms_key" "data_encryption" {
description = "Encryption key for ML data pipeline"
deletion_window_in_days = 10
enable_key_rotation = true
tags = {
Name = "ml-data-encryption-key"
Environment = var.environment
Purpose = "ML-DataPipeline"
}
}
resource "aws_kms_alias" "data_encryption" {
name = "alias/ml-data-encryption"
target_key_id = aws_kms_key.data_encryption.key_id
}
# Raw data bucket
resource "aws_s3_bucket" "raw_data" {
bucket = "${var.project_name}-raw-data-${var.environment}-${data.aws_caller_identity.current.account_id}"
tags = {
Name = "ML Raw Data"
Environment = var.environment
DataStage = "Raw"
}
}
# Enable versioning
resource "aws_s3_bucket_versioning" "raw_data" {
bucket = aws_s3_bucket.raw_data.id
versioning_configuration {
status = "Enabled"
}
}
# Enable encryption
resource "aws_s3_bucket_server_side_encryption_configuration" "raw_data" {
bucket = aws_s3_bucket.raw_data.id
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "aws:kms"
kms_master_key_id = aws_kms_key.data_encryption.arn
}
bucket_key_enabled = true
}
}
# Block all public access
resource "aws_s3_bucket_public_access_block" "raw_data" {
bucket = aws_s3_bucket.raw_data.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
# Enforce encryption and secure transport
resource "aws_s3_bucket_policy" "raw_data" {
bucket = aws_s3_bucket.raw_data.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "DenyUnencryptedObjectUploads"
Effect = "Deny"
Principal = "*"
Action = "s3:PutObject"
Resource = "${aws_s3_bucket.raw_data.arn}/*"
Condition = {
StringNotEquals = {
"s3:x-amz-server-side-encryption" = "aws:kms"
}
}
},
{
Sid = "DenyInsecureTransport"
Effect = "Deny"
Principal = "*"
Action = "s3:*"
Resource = [
aws_s3_bucket.raw_data.arn,
"${aws_s3_bucket.raw_data.arn}/*"
]
Condition = {
Bool = {
"aws:SecureTransport" = "false"
}
}
}
]
})
}
# Lifecycle policy to manage old versions
resource "aws_s3_bucket_lifecycle_configuration" "raw_data" {
bucket = aws_s3_bucket.raw_data.id
rule {
id = "delete-old-versions"
status = "Enabled"
noncurrent_version_expiration {
noncurrent_days = 90
}
}
rule {
id = "transition-to-glacier"
status = "Enabled"
transition {
days = 30
storage_class = "GLACIER"
}
}
}
# Validated data bucket
resource "aws_s3_bucket" "validated_data" {
bucket = "${var.project_name}-validated-data-${var.environment}-${data.aws_caller_identity.current.account_id}"
tags = {
Name = "ML Validated Data"
Environment = var.environment
DataStage = "Validated"
}
}
resource "aws_s3_bucket_versioning" "validated_data" {
bucket = aws_s3_bucket.validated_data.id
versioning_configuration {
status = "Enabled"
}
}
resource "aws_s3_bucket_server_side_encryption_configuration" "validated_data" {
bucket = aws_s3_bucket.validated_data.id
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "aws:kms"
kms_master_key_id = aws_kms_key.data_encryption.arn
}
bucket_key_enabled = true
}
}
resource "aws_s3_bucket_public_access_block" "validated_data" {
bucket = aws_s3_bucket.validated_data.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
# Apply same security policies to validated bucket
resource "aws_s3_bucket_policy" "validated_data" {
bucket = aws_s3_bucket.validated_data.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "DenyUnencryptedObjectUploads"
Effect = "Deny"
Principal = "*"
Action = "s3:PutObject"
Resource = "${aws_s3_bucket.validated_data.arn}/*"
Condition = {
StringNotEquals = {
"s3:x-amz-server-side-encryption" = "aws:kms"
}
}
},
{
Sid = "DenyInsecureTransport"
Effect = "Deny"
Principal = "*"
Action = "s3:*"
Resource = [
aws_s3_bucket.validated_data.arn,
"${aws_s3_bucket.validated_data.arn}/*"
]
Condition = {
Bool = {
"aws:SecureTransport" = "false"
}
}
}
]
})
}
# Model artifacts bucket
resource "aws_s3_bucket" "model_artifacts" {
bucket = "${var.project_name}-model-artifacts-${var.environment}-${data.aws_caller_identity.current.account_id}"
tags = {
Name = "ML Model Artifacts"
Environment = var.environment
DataStage = "Models"
}
}
resource "aws_s3_bucket_versioning" "model_artifacts" {
bucket = aws_s3_bucket.model_artifacts.id
versioning_configuration {
status = "Enabled"
}
}
resource "aws_s3_bucket_server_side_encryption_configuration" "model_artifacts" {
bucket = aws_s3_bucket.model_artifacts.id
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "aws:kms"
kms_master_key_id = aws_kms_key.data_encryption.arn
}
bucket_key_enabled = true
}
}
resource "aws_s3_bucket_public_access_block" "model_artifacts" {
bucket = aws_s3_bucket.model_artifacts.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
resource "aws_s3_bucket_policy" "model_artifacts" {
bucket = aws_s3_bucket.model_artifacts.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "DenyUnencryptedObjectUploads"
Effect = "Deny"
Principal = "*"
Action = "s3:PutObject"
Resource = "${aws_s3_bucket.model_artifacts.arn}/*"
Condition = {
StringNotEquals = {
"s3:x-amz-server-side-encryption" = "aws:kms"
}
}
},
{
Sid = "DenyInsecureTransport"
Effect = "Deny"
Principal = "*"
Action = "s3:*"
Resource = [
aws_s3_bucket.model_artifacts.arn,
"${aws_s3_bucket.model_artifacts.arn}/*"
]
Condition = {
Bool = {
"aws:SecureTransport" = "false"
}
}
}
]
})
}
Variables File
Create terraform/variables.tf:
variable "project_name" {
description = "Project name prefix"
type = string
default = "ml-pipeline"
}
variable "environment" {
description = "Environment (dev, staging, prod)"
type = string
default = "dev"
}
variable "aws_region" {
description = "AWS region"
type = string
default = "ap-south-1"
}
variable "notification_email" {
description = "Email for SNS notifications"
type = string
}
data "aws_caller_identity" "current" {}
Provider Configuration
Create terraform/providers.tf:
terraform {
required_version = ">= 1.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
}
# For production, use remote state
# backend "s3" {
# bucket = "your-terraform-state-bucket"
# key = "ml-pipeline/terraform.tfstate"
# region = "ap-south-1"
# dynamodb_table = "terraform-state-lock"
# encrypt = true
# }
}
provider "aws" {
region = var.aws_region
default_tags {
tags = {
Project = var.project_name
ManagedBy = "Terraform"
}
}
}
Deploy the Infrastructure
cd terraform
# Initialize Terraform
terraform init
# Review the plan
terraform plan -var="notification_email=your-email@example.com"
# Apply the configuration
terraform apply -var="notification_email=your-email@example.com"
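Optionally, add a terraform/outputs.tf so the generated bucket names are easy to copy into the CLI commands used later in this post (a small sketch matching the resources above):
output "raw_data_bucket" {
  value = aws_s3_bucket.raw_data.id
}

output "validated_data_bucket" {
  value = aws_s3_bucket.validated_data.id
}

output "model_artifacts_bucket" {
  value = aws_s3_bucket.model_artifacts.id
}
After terraform apply, terraform output raw_data_bucket prints the exact bucket name.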
Step 2: Data Validation Lambda Function
Why Validate Data?
Prevent garbage in, garbage out:
- Catch schema changes early
- Detect data quality issues
- Ensure consistency
- Create audit trail
Important: After deploying, check your email and confirm the SNS subscription to receive notifications.
Validation Logic
Create lambda/data-validation/handler.py:
import json
import os
import boto3
import logging
import pandas as pd
from io import BytesIO
import hashlib
from datetime import datetime
# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# AWS clients
s3_client = boto3.client('s3')
sns_client = boto3.client('sns')
# Validation rules
VALIDATION_RULES = {
'required_columns': ['timestamp', 'feature_1', 'feature_2', 'target'],
'numeric_columns': ['feature_1', 'feature_2', 'target'],
'max_null_percentage': 0.05, # 5% max
'min_rows': 5, # Adjusted for testing - use 100+ for production
'max_rows': 1000000,
'date_columns': ['timestamp']
}
def calculate_checksum(data: bytes) -> str:
"""Calculate SHA256 checksum"""
return hashlib.sha256(data).hexdigest()
def validate_schema(df: pd.DataFrame) -> dict:
"""Validate DataFrame schema"""
issues = []
# Check required columns
missing_cols = set(VALIDATION_RULES['required_columns']) - set(df.columns)
if missing_cols:
issues.append(f"Missing columns: {missing_cols}")
# Check numeric columns
for col in VALIDATION_RULES['numeric_columns']:
if col in df.columns:
if not pd.api.types.is_numeric_dtype(df[col]):
issues.append(f"Column '{col}' should be numeric")
# Check date columns
for col in VALIDATION_RULES['date_columns']:
if col in df.columns:
try:
# Actually convert to catch invalid dates
df[col] = pd.to_datetime(df[col])
except Exception as e:
issues.append(f"Column '{col}' has invalid datetime values: {str(e)}")
return {
'valid': len(issues) == 0,
'issues': issues
}
def validate_data_quality(df: pd.DataFrame) -> dict:
"""Validate data quality"""
issues = []
# Check row count
if len(df) < VALIDATION_RULES['min_rows']:
issues.append(
f"Insufficient rows: {len(df)} < {VALIDATION_RULES['min_rows']}"
)
elif len(df) > VALIDATION_RULES['max_rows']:
issues.append(
f"Too many rows: {len(df)} > {VALIDATION_RULES['max_rows']}"
)
# Check null values
for col in df.columns:
null_pct = df[col].isnull().sum() / len(df)
if null_pct > VALIDATION_RULES['max_null_percentage']:
issues.append(
f"Column '{col}' has {null_pct:.2%} nulls "
f"(max: {VALIDATION_RULES['max_null_percentage']:.2%})"
)
# Check duplicates
duplicate_count = df.duplicated().sum()
if duplicate_count > 0:
issues.append(f"Found {duplicate_count} duplicate rows")
# Data distribution checks
stats = {}
for col in VALIDATION_RULES['numeric_columns']:
if col in df.columns:
stats[col] = {
'mean': float(df[col].mean()),
'std': float(df[col].std()),
'min': float(df[col].min()),
'max': float(df[col].max()),
'null_count': int(df[col].isnull().sum())
}
return {
'valid': len(issues) == 0,
'issues': issues,
'stats': {
'row_count': len(df),
'column_count': len(df.columns),
'duplicate_count': duplicate_count,
'column_stats': stats
}
}
def send_notification(topic_arn: str, subject: str, message: str):
"""Send SNS notification"""
try:
sns_client.publish(
TopicArn=topic_arn,
Subject=subject,
Message=message
)
logger.info(f"Sent notification: {subject}")
except Exception as e:
logger.error(f"Failed to send notification: {e}")
def lambda_handler(event, context):
"""
Lambda handler triggered by S3 upload
"""
try:
# Parse S3 event
record = event['Records'][0]
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
logger.info(f"Processing: s3://{bucket}/{key}")
# Download file
response = s3_client.get_object(Bucket=bucket, Key=key)
file_content = response['Body'].read()
# Calculate checksum
checksum = calculate_checksum(file_content)
# Load data
df = pd.read_csv(BytesIO(file_content))
logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns")
# Run validations
schema_result = validate_schema(df)
quality_result = validate_data_quality(df)
# Compile results
validation_result = {
'file': f"s3://{bucket}/{key}",
'timestamp': datetime.utcnow().isoformat(),
'checksum': checksum,
'schema_validation': schema_result,
'quality_validation': quality_result,
'status': (
'PASSED' if schema_result['valid'] and quality_result['valid']
else 'FAILED'
)
}
logger.info(f"Validation status: {validation_result['status']}")
# Save validation report
report_key = key.replace('raw/', 'reports/').replace('.csv', '_report.json')
s3_client.put_object(
Bucket=bucket,
Key=report_key,
Body=json.dumps(validation_result, indent=2),
ServerSideEncryption='aws:kms',
SSEKMSKeyId=os.environ.get('KMS_KEY_ID')
)
# If passed, copy to validated bucket
if validation_result['status'] == 'PASSED':
validated_bucket = bucket.replace('raw-data', 'validated-data')
validated_key = key.replace('raw/', 'validated/')
# Explicitly set encryption on copy
s3_client.copy_object(
CopySource={'Bucket': bucket, 'Key': key},
Bucket=validated_bucket,
Key=validated_key,
ServerSideEncryption='aws:kms',
SSEKMSKeyId=os.environ.get('KMS_KEY_ID')
)
logger.info(f"Copied to: s3://{validated_bucket}/{validated_key}")
# Send success notification
send_notification(
os.environ['SNS_TOPIC_ARN'],
f'Data Validation Passed: {key}',
f"File validated successfully\n\n"
f"Stats:\n"
f"- Rows: {quality_result['stats']['row_count']}\n"
f"- Columns: {quality_result['stats']['column_count']}\n"
f"- Duplicates: {quality_result['stats']['duplicate_count']}\n\n"
f"Validation report: s3://{bucket}/{report_key}"
)
else:
# Send failure notification
all_issues = schema_result['issues'] + quality_result['issues']
send_notification(
os.environ['SNS_TOPIC_ARN'],
f'Data Validation Failed: {key}',
f"Validation issues found:\n\n" + "\n".join(f"- {issue}" for issue in all_issues)
)
return {
'statusCode': 200,
'body': json.dumps(validation_result)
}
except Exception as e:
logger.error(f"Error: {e}", exc_info=True)
send_notification(
os.environ.get('SNS_TOPIC_ARN', ''),
f'Data Validation Error',
f"Error processing {key}:\n{str(e)}"
)
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
Lambda Dependencies
Create lambda/data-validation/requirements.txt:
pandas==2.1.0
boto3==1.28.85
scikit-learn==1.3.0
Package Lambda Function
Important Note on Lambda Packaging: The pandas library includes platform-specific C extensions. Building on macOS or Windows will create a package incompatible with Lambda's Amazon Linux 2 runtime. Use one of these approaches:
Option 1: Docker Build (Recommended)
cd lambda/data-validation
# Build using Docker with Lambda runtime
docker run --rm -v "$PWD":/var/task public.ecr.aws/lambda/python:3.11 \
bash -c "pip install -r requirements.txt -t package/ && cp handler.py package/"
# Create deployment package
cd package
zip -r ../function.zip .
cd ..
Option 2: Use Lambda Layer
# Skip pandas installation and use AWS-managed layer
# Update terraform/lambda.tf to include:
# layers = ["arn:aws:lambda:ap-south-1:336392948345:layer:AWSSDKPandas-Python311:12"]
Option 3: EC2/Cloud9 Build
# Build on an Amazon Linux 2 instance
pip install -r requirements.txt -t package/
cp handler.py package/
cd package && zip -r ../function.zip . && cd ..
Step 3: Lambda Infrastructure
Create terraform/lambda.tf:
# IAM role for Lambda
resource "aws_iam_role" "data_validation" {
name = "${var.project_name}-data-validation-lambda"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "lambda.amazonaws.com"
}
}]
})
}
# IAM policy for Lambda
resource "aws_iam_role_policy" "data_validation" {
name = "${var.project_name}-data-validation-policy"
role = aws_iam_role.data_validation.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"s3:GetObject",
"s3:PutObject"
]
Resource = [
"${aws_s3_bucket.raw_data.arn}/*",
"${aws_s3_bucket.validated_data.arn}/*"
]
},
{
Effect = "Allow"
Action = [
"s3:ListBucket"
]
Resource = [
aws_s3_bucket.raw_data.arn,
aws_s3_bucket.validated_data.arn
]
},
{
Effect = "Allow"
Action = [
"kms:Decrypt",
"kms:GenerateDataKey"
]
Resource = aws_kms_key.data_encryption.arn
},
{
Effect = "Allow"
Action = "sns:Publish"
Resource = aws_sns_topic.validation_notifications.arn
},
{
Effect = "Allow"
Action = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
]
Resource = "arn:aws:logs:*:*:*"
}
]
})
}
# SNS topic for notifications
resource "aws_sns_topic" "validation_notifications" {
name = "${var.project_name}-validation-notifications"
kms_master_key_id = aws_kms_key.data_encryption.id
}
resource "aws_sns_topic_subscription" "email" {
topic_arn = aws_sns_topic.validation_notifications.arn
protocol = "email"
endpoint = var.notification_email
}
# Lambda function
resource "aws_lambda_function" "data_validation" {
filename = "${path.module}/../lambda/data-validation/function.zip"
function_name = "${var.project_name}-data-validation"
role = aws_iam_role.data_validation.arn
handler = "handler.lambda_handler"
runtime = "python3.11"
timeout = 300
memory_size = 1024
source_code_hash = filebase64sha256("${path.module}/../lambda/data-validation/function.zip")
environment {
variables = {
SNS_TOPIC_ARN = aws_sns_topic.validation_notifications.arn
KMS_KEY_ID = aws_kms_key.data_encryption.id
}
}
# Optional: Use AWS-managed pandas layer instead of packaging it
# layers = ["arn:aws:lambda:${var.aws_region}:336392948345:layer:AWSSDKPandas-Python311:12"]
tags = {
Name = "Data Validation Lambda"
Environment = var.environment
}
}
# S3 trigger
resource "aws_s3_bucket_notification" "raw_data_trigger" {
bucket = aws_s3_bucket.raw_data.id
lambda_function {
lambda_function_arn = aws_lambda_function.data_validation.arn
events = ["s3:ObjectCreated:*"]
filter_prefix = "raw/"
filter_suffix = ".csv"
}
depends_on = [
aws_lambda_permission.allow_s3,
aws_lambda_function.data_validation
]
}
resource "aws_lambda_permission" "allow_s3" {
statement_id = "AllowS3Invoke"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.data_validation.function_name
principal = "s3.amazonaws.com"
source_arn = aws_s3_bucket.raw_data.arn
}
# CloudWatch Log Group
resource "aws_cloudwatch_log_group" "data_validation" {
name = "/aws/lambda/${aws_lambda_function.data_validation.function_name}"
retention_in_days = 30
kms_key_id = aws_kms_key.data_encryption.arn
}
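One caveat worth flagging before you apply this: encrypting the CloudWatch log group with a customer managed KMS key only works if the key policy allows the CloudWatch Logs service principal to use the key; with the default key policy from Step 1, creating the log group above can fail with an access error. The sketch below is one way to set a key policy on aws_kms_key.data_encryption that keeps account-root access and adds the Logs grant (similarly, if you want the Step 5 alarms to publish to the KMS-encrypted SNS topic, the key policy also needs a statement for cloudwatch.amazonaws.com):
# Sketch: add to aws_kms_key.data_encryption in s3-buckets.tf
policy = jsonencode({
  Version = "2012-10-17"
  Statement = [
    {
      Sid       = "EnableRootAccountAccess"
      Effect    = "Allow"
      Principal = { AWS = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:root" }
      Action    = "kms:*"
      Resource  = "*"
    },
    {
      Sid       = "AllowCloudWatchLogsUseOfTheKey"
      Effect    = "Allow"
      Principal = { Service = "logs.${var.aws_region}.amazonaws.com" }
      Action = [
        "kms:Encrypt*",
        "kms:Decrypt*",
        "kms:ReEncrypt*",
        "kms:GenerateDataKey*",
        "kms:Describe*"
      ]
      Resource = "*"
      Condition = {
        ArnLike = {
          "kms:EncryptionContext:aws:logs:arn" = "arn:aws:logs:${var.aws_region}:${data.aws_caller_identity.current.account_id}:log-group:*"
        }
      }
    }
  ]
})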
Step 4: CloudTrail for Audit Logging
Create terraform/cloudtrail.tf:
# CloudTrail logs bucket
resource "aws_s3_bucket" "cloudtrail_logs" {
bucket = "${var.project_name}-cloudtrail-logs-${data.aws_caller_identity.current.account_id}"
}
resource "aws_s3_bucket_versioning" "cloudtrail_logs" {
bucket = aws_s3_bucket.cloudtrail_logs.id
versioning_configuration {
status = "Enabled"
}
}
resource "aws_s3_bucket_server_side_encryption_configuration" "cloudtrail_logs" {
bucket = aws_s3_bucket.cloudtrail_logs.id
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "AES256"
}
}
}
resource "aws_s3_bucket_public_access_block" "cloudtrail_logs" {
bucket = aws_s3_bucket.cloudtrail_logs.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
resource "aws_s3_bucket_policy" "cloudtrail_logs" {
bucket = aws_s3_bucket.cloudtrail_logs.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "AWSCloudTrailAclCheck"
Effect = "Allow"
Principal = {
Service = "cloudtrail.amazonaws.com"
}
Action = "s3:GetBucketAcl"
Resource = aws_s3_bucket.cloudtrail_logs.arn
},
{
Sid = "AWSCloudTrailWrite"
Effect = "Allow"
Principal = {
Service = "cloudtrail.amazonaws.com"
}
Action = "s3:PutObject"
Resource = "${aws_s3_bucket.cloudtrail_logs.arn}/*"
Condition = {
StringEquals = {
"s3:x-amz-acl" = "bucket-owner-full-control"
}
}
},
{
Sid = "DenyInsecureTransport"
Effect = "Deny"
Principal = "*"
Action = "s3:*"
Resource = [
aws_s3_bucket.cloudtrail_logs.arn,
"${aws_s3_bucket.cloudtrail_logs.arn}/*"
]
Condition = {
Bool = {
"aws:SecureTransport" = "false"
}
}
}
]
})
}
# CloudTrail
resource "aws_cloudtrail" "data_events" {
name = "${var.project_name}-data-trail"
s3_bucket_name = aws_s3_bucket.cloudtrail_logs.id
include_global_service_events = true
is_multi_region_trail = true
enable_logging = true
enable_log_file_validation = true
depends_on = [aws_s3_bucket_policy.cloudtrail_logs]
event_selector {
read_write_type = "All"
include_management_events = true
data_resource {
type = "AWS::S3::Object"
values = [
"${aws_s3_bucket.raw_data.arn}/*",
"${aws_s3_bucket.validated_data.arn}/*",
"${aws_s3_bucket.model_artifacts.arn}/*"
]
}
}
tags = {
Name = "ML Data Trail"
Environment = var.environment
}
}
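After applying, a quick sanity check that the trail is logging and delivering to the bucket (the trail name matches the resource above with the default project_name):
aws cloudtrail get-trail-status --name ml-pipeline-data-trail
# Look for "IsLogging": true and a recent "LatestDeliveryTime"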
Step 5: CloudWatch Monitoring
Create terraform/monitoring.tf:
# CloudWatch dashboard
resource "aws_cloudwatch_dashboard" "data_pipeline" {
dashboard_name = "${var.project_name}-data-pipeline"
dashboard_body = jsonencode({
widgets = [
{
type = "metric"
x = 0
y = 0
width = 12
height = 6
properties = {
metrics = [
["AWS/Lambda", "Invocations", "FunctionName", aws_lambda_function.data_validation.function_name, { stat = "Sum", label = "Lambda Invocations" }],
[".", "Errors", "FunctionName", aws_lambda_function.data_validation.function_name, { stat = "Sum", label = "Lambda Errors" }],
[".", "Duration", "FunctionName", aws_lambda_function.data_validation.function_name, { stat = "Average", label = "Avg Duration (ms)" }]
]
period = 300
region = var.aws_region
title = "Lambda Metrics"
view = "timeSeries"
stacked = false
}
}
]
})
}
# Alarms
resource "aws_cloudwatch_metric_alarm" "lambda_errors" {
alarm_name = "${var.project_name}-lambda-errors"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = "1"
metric_name = "Errors"
namespace = "AWS/Lambda"
period = "300"
statistic = "Sum"
threshold = "5"
alarm_description = "Lambda function errors"
alarm_actions = [aws_sns_topic.validation_notifications.arn]
dimensions = {
FunctionName = aws_lambda_function.data_validation.function_name
}
}
resource "aws_cloudwatch_metric_alarm" "lambda_throttles" {
alarm_name = "${var.project_name}-lambda-throttles"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = "1"
metric_name = "Throttles"
namespace = "AWS/Lambda"
period = "300"
statistic = "Sum"
threshold = "10"
alarm_description = "Lambda function throttling"
alarm_actions = [aws_sns_topic.validation_notifications.arn]
dimensions = {
FunctionName = aws_lambda_function.data_validation.function_name
}
}
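A quick way to confirm the alarms were created and see their current state from the CLI:
aws cloudwatch describe-alarms \
  --alarm-name-prefix ml-pipeline- \
  --query 'MetricAlarms[].{Name:AlarmName,State:StateValue}' \
  --output table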
Step 6: Testing the Pipeline
1. Create Test Data
Create test-data/sample.csv:
timestamp,feature_1,feature_2,target
2024-01-01T00:00:00,1.5,2.3,0
2024-01-01T01:00:00,1.8,2.1,1
2024-01-01T02:00:00,1.2,2.5,0
2024-01-01T03:00:00,1.9,2.0,1
2024-01-01T04:00:00,1.4,2.4,0
2. Upload to S3
# Get your AWS account ID
AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
# Upload test file
aws s3 cp test-data/sample.csv \
s3://ml-pipeline-raw-data-dev-${AWS_ACCOUNT_ID}/raw/sample.csv \
--server-side-encryption aws:kms
3. Monitor Lambda Execution
# View Lambda logs in real-time
aws logs tail /aws/lambda/ml-pipeline-data-validation --follow
# Or check specific log streams
aws logs describe-log-streams \
--log-group-name /aws/lambda/ml-pipeline-data-validation \
--order-by LastEventTime \
--descending \
--max-items 1
4. Verify Validated Data
# Check validated bucket
aws s3 ls s3://ml-pipeline-validated-data-dev-${AWS_ACCOUNT_ID}/validated/
# Download validation report
aws s3 cp \
s3://ml-pipeline-raw-data-dev-${AWS_ACCOUNT_ID}/reports/sample_report.json \
./validation-report.json
# View report
cat validation-report.json | python -m json.tool
5. Check Email Notification
You should receive an email notification with validation results. If not:
- Confirm you accepted the SNS subscription
- Check CloudWatch logs for errors
- Verify SNS topic permissions
Step 6.5: Preparing Data for Training
Before moving to Part 3 (model training), you'll need to split your validated data into training and validation sets. Here are two approaches:
Option 1: Add Split Logic to Lambda
This automatically splits data during validation. Update lambda/data-validation/handler.py:
# Add this import at the top
from sklearn.model_selection import train_test_split
# Add this function after send_notification()
def split_and_save_data(df, bucket, key, kms_key_id):
"""Split data into train/val sets (80/20)"""
try:
# Split with stratification if possible
if 'target' in df.columns:
train_df, val_df = train_test_split(
df,
test_size=0.2,
random_state=42,
stratify=df['target']
)
else:
train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)
logger.info(f"Split: {len(train_df)} train, {len(val_df)} val samples")
# Save training data
train_key = key.replace('validated/', 'validated/train/')
train_csv = train_df.to_csv(index=False)
s3_client.put_object(
Bucket=bucket,
Key=train_key,
Body=train_csv,
ServerSideEncryption='aws:kms',
SSEKMSKeyId=kms_key_id
)
# Save validation data
val_key = key.replace('validated/', 'validated/val/')
val_csv = val_df.to_csv(index=False)
s3_client.put_object(
Bucket=bucket,
Key=val_key,
Body=val_csv,
ServerSideEncryption='aws:kms',
SSEKMSKeyId=kms_key_id
)
logger.info(f"Train: s3://{bucket}/{train_key}")
logger.info(f"Val: s3://{bucket}/{val_key}")
return train_key, val_key
except Exception as e:
logger.error(f"Failed to split data: {e}")
return None, None
# In lambda_handler, after copying to validated bucket, add:
if validation_result['status'] == 'PASSED':
# ... existing copy_object code ...
# Split data for training
train_key, val_key = split_and_save_data(
df,
validated_bucket,
validated_key,
os.environ.get('KMS_KEY_ID')
)
if train_key and val_key:
logger.info(f"Data split successful")
Then rebuild Lambda package:
cd lambda/data-validation
# Docker build with updated requirements
docker run --rm -v "$PWD":/var/task public.ecr.aws/lambda/python:3.11 \
bash -c "pip install -r requirements.txt -t package/ && cp handler.py package/"
cd package && zip -r ../function.zip . && cd ..
# Redeploy
cd ../../terraform
terraform apply -var="notification_email=your-email@example.com"
Option 2: Manual Split Script
For more control or one-time splits, use this standalone script.
Create scripts/split_data.py:
#!/usr/bin/env python3
"""
Split validated data for ML training
"""
import boto3
import pandas as pd
from sklearn.model_selection import train_test_split
import argparse
def split_s3_data(bucket, input_key, output_prefix='validated', test_size=0.2):
"""
Split S3 data into train/val sets
Args:
bucket: S3 bucket name
input_key: S3 key of validated file
output_prefix: Prefix for output files
test_size: Fraction for validation set (default 0.2 = 20%)
"""
s3 = boto3.client('s3')
# Download
print(f"Downloading s3://{bucket}/{input_key}")
obj = s3.get_object(Bucket=bucket, Key=input_key)
df = pd.read_csv(obj['Body'])
print(f" Loaded {len(df)} rows, {len(df.columns)} columns")
# Split with stratification if target column exists
try:
if 'target' in df.columns:
train_df, val_df = train_test_split(
df,
test_size=test_size,
random_state=42,
stratify=df['target']
)
print(f" Split with stratification on 'target' column")
else:
train_df, val_df = train_test_split(df, test_size=test_size, random_state=42)
except Exception as e:
print(f" Stratification failed ({e}), using simple split")
train_df, val_df = train_test_split(df, test_size=test_size, random_state=42)
print(f" Split complete: {len(train_df)} train, {len(val_df)} val")
# Upload training data
train_key = f"{output_prefix}/train/{input_key.split('/')[-1]}"
print(f"Uploading training data...")
s3.put_object(
Bucket=bucket,
Key=train_key,
Body=train_df.to_csv(index=False),
ServerSideEncryption='aws:kms'
)
print(f" s3://{bucket}/{train_key}")
# Upload validation data
val_key = f"{output_prefix}/val/{input_key.split('/')[-1]}"
print(f"Uploading validation data...")
s3.put_object(
Bucket=bucket,
Key=val_key,
Body=val_df.to_csv(index=False),
ServerSideEncryption='aws:kms'
)
print(f" s3://{bucket}/{val_key}")
print(f"\n Data split complete!")
return train_key, val_key
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Split validated ML data')
parser.add_argument('bucket', help='S3 bucket name')
parser.add_argument('input_key', help='S3 key of validated file')
parser.add_argument('--test-size', type=float, default=0.2,
help='Validation set fraction (default: 0.2)')
args = parser.parse_args()
split_s3_data(args.bucket, args.input_key, test_size=args.test_size)
Install dependencies and run:
pip install boto3 pandas scikit-learn
# Split your validated data
python scripts/split_data.py \
ml-pipeline-validated-data-dev-YOUR_ACCOUNT_ID \
validated/sample.csv
# Or with custom test size (30% validation)
python scripts/split_data.py \
ml-pipeline-validated-data-dev-YOUR_ACCOUNT_ID \
validated/sample.csv \
--test-size 0.3
Verify the Split
# Check training data
aws s3 ls s3://ml-pipeline-validated-data-dev-${AWS_ACCOUNT_ID}/validated/train/
# Check validation data
aws s3 ls s3://ml-pipeline-validated-data-dev-${AWS_ACCOUNT_ID}/validated/val/
# Download and inspect
aws s3 cp \
s3://ml-pipeline-validated-data-dev-${AWS_ACCOUNT_ID}/validated/train/sample.csv \
./train-sample.csv
wc -l train-sample.csv # Should show 80% of original rows
Important for Part 3: SageMaker training jobs (covered in Part 3) expect data in the s3://bucket/validated/train/ and s3://bucket/validated/val/ paths. Make sure to run one of these split methods before proceeding to model training.
Step 7: Advanced Features
Data Quality Metrics
Add custom CloudWatch metrics in Lambda to track data quality over time:
import boto3
from datetime import datetime
cloudwatch = boto3.client('cloudwatch')
def publish_metrics(validation_result, bucket):
"""Publish custom metrics to CloudWatch"""
try:
metrics = [
{
'MetricName': 'DataQualityScore',
'Value': 100.0 if validation_result['status'] == 'PASSED' else 0.0,
'Unit': 'Percent',
'Timestamp': datetime.utcnow(),
'Dimensions': [
{'Name': 'Bucket', 'Value': bucket},
{'Name': 'Pipeline', 'Value': 'ML-DataValidation'}
]
},
{
'MetricName': 'RowCount',
'Value': validation_result['quality_validation']['stats']['row_count'],
'Unit': 'Count',
'Timestamp': datetime.utcnow(),
'Dimensions': [
{'Name': 'Bucket', 'Value': bucket},
{'Name': 'Pipeline', 'Value': 'ML-DataValidation'}
]
},
{
'MetricName': 'DuplicateRows',
'Value': validation_result['quality_validation']['stats']['duplicate_count'],
'Unit': 'Count',
'Timestamp': datetime.utcnow(),
'Dimensions': [
{'Name': 'Bucket', 'Value': bucket},
{'Name': 'Pipeline', 'Value': 'ML-DataValidation'}
]
},
{
'MetricName': 'NullPercentage',
'Value': sum(
stats['null_count']
for stats in validation_result['quality_validation']['stats']['column_stats'].values()
) / validation_result['quality_validation']['stats']['row_count'] * 100,
'Unit': 'Percent',
'Timestamp': datetime.utcnow(),
'Dimensions': [
{'Name': 'Bucket', 'Value': bucket},
{'Name': 'Pipeline', 'Value': 'ML-DataValidation'}
]
}
]
cloudwatch.put_metric_data(
Namespace='MLPipeline/DataQuality',
MetricData=metrics
)
logger.info("Published custom metrics to CloudWatch")
except Exception as e:
logger.error(f"Failed to publish metrics: {e}")
# Add to lambda_handler after saving validation report:
publish_metrics(validation_result, bucket)
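Note that the Lambda IAM policy from Step 3 doesn't cover custom metrics, so if you adopt this, append a statement along these lines to aws_iam_role_policy.data_validation (a sketch; PutMetricData can't be resource-scoped, so it's conditioned on the namespace instead):
# Allow the validation Lambda to publish to the MLPipeline/DataQuality namespace
{
  Effect   = "Allow"
  Action   = "cloudwatch:PutMetricData"
  Resource = "*"
  Condition = {
    StringEquals = {
      "cloudwatch:namespace" = "MLPipeline/DataQuality"
    }
  }
}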
Data Lineage Tracking
Create comprehensive lineage metadata:
def create_data_lineage(bucket, key, checksum, validation_result, context=None):
"""Create data lineage metadata for compliance"""
lineage = {
'data_asset': {
'source_file': key,
'source_bucket': bucket,
'checksum_sha256': checksum,
's3_version_id': validation_result.get('version_id'),
},
'validation': {
'timestamp': datetime.utcnow().isoformat(),
'validator_version': '1.0',
'validation_rules': VALIDATION_RULES,
'validation_status': validation_result['status'],
'quality_score': (
100.0 if validation_result['status'] == 'PASSED' else 0.0
)
},
'pipeline': {
'pipeline_id': os.environ.get('PIPELINE_ID', 'ml-data-pipeline'),
'pipeline_version': os.environ.get('PIPELINE_VERSION', '1.0'),
'lambda_request_id': context.aws_request_id if context else None,
'execution_timestamp': datetime.utcnow().isoformat()
},
'statistics': validation_result['quality_validation']['stats'],
'downstream_assets': []
}
# If validation passed, record downstream locations
if validation_result['status'] == 'PASSED':
validated_bucket = bucket.replace('raw-data', 'validated-data')
lineage['downstream_assets'].append({
'type': 'validated_data',
'location': f"s3://{validated_bucket}/{key.replace('raw/', 'validated/')}",
'created_at': datetime.utcnow().isoformat()
})
# Store lineage
lineage_key = f"lineage/{checksum}.json"
s3_client.put_object(
Bucket=bucket,
Key=lineage_key,
Body=json.dumps(lineage, indent=2),
ServerSideEncryption='aws:kms',
SSEKMSKeyId=os.environ.get('KMS_KEY_ID'),
ContentType='application/json',
Metadata={
'data-classification': 'lineage',
'pipeline-version': '1.0'
}
)
logger.info(f"Lineage created: s3://{bucket}/{lineage_key}")
return lineage_key
# Add to lambda_handler after saving validation report:
lineage_key = create_data_lineage(bucket, key, checksum, validation_result, context)
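Since lineage records are keyed by the file's SHA-256 checksum, you can look one up later starting from the original file (sha256sum on Linux; use shasum -a 256 on macOS):
# Compute the source file's checksum, then fetch its lineage record
CHECKSUM=$(sha256sum test-data/sample.csv | awk '{print $1}')
aws s3 cp \
  s3://ml-pipeline-raw-data-dev-${AWS_ACCOUNT_ID}/lineage/${CHECKSUM}.json - \
  | python3 -m json.tool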
Step 7.5: Testing Best Practices
Local Unit Tests
Before deploying Lambda changes, test validation logic locally. This catches bugs early and documents expected behavior.
Create lambda/data-validation/test_handler.py:
import pytest
import pandas as pd
from handler import validate_schema, validate_data_quality
def test_schema_validation_success():
"""Test valid schema passes"""
df = pd.DataFrame({
'timestamp': ['2024-01-01 00:00:00'],
'feature_1': [1.5],
'feature_2': [2.3],
'target': [0]
})
result = validate_schema(df)
assert result['valid'] == True
assert len(result['issues']) == 0
def test_schema_validation_missing_column():
"""Test missing required column fails"""
df = pd.DataFrame({
'timestamp': ['2024-01-01 00:00:00'],
'feature_1': [1.5],
'target': [0]
})
result = validate_schema(df)
assert result['valid'] == False
assert 'feature_2' in str(result['issues'])
def test_schema_validation_wrong_type():
"""Test incorrect column type fails"""
df = pd.DataFrame({
'timestamp': ['2024-01-01 00:00:00'],
'feature_1': ['not_a_number'], # Should be numeric
'feature_2': [2.3],
'target': [0]
})
result = validate_schema(df)
assert result['valid'] == False
assert 'numeric' in str(result['issues']).lower()
def test_data_quality_null_detection():
"""Test null value detection"""
df = pd.DataFrame({
'timestamp': ['2024-01-01 00:00:00', '2024-01-01 01:00:00'],
'feature_1': [1.5, None],  # put the null in a numeric column so it appears in column_stats
'feature_2': [2.3, 2.1],
'target': [0, 1]
})
result = validate_data_quality(df)
assert result['stats']['column_stats']['feature_1']['null_count'] > 0
def test_data_quality_duplicate_detection():
"""Test duplicate row detection"""
df = pd.DataFrame({
'timestamp': ['2024-01-01 00:00:00', '2024-01-01 00:00:00'],
'feature_1': [1.5, 1.5],
'feature_2': [2.3, 2.3],
'target': [0, 0]
})
result = validate_data_quality(df)
assert result['stats']['duplicate_count'] > 0
assert 'duplicate' in str(result['issues']).lower()
def test_data_quality_insufficient_rows():
"""Test minimum row requirement"""
df = pd.DataFrame({
'timestamp': ['2024-01-01 00:00:00'],
'feature_1': [1.5],
'feature_2': [2.3],
'target': [0]
})
result = validate_data_quality(df)
# With min_rows=5, this should fail
assert result['valid'] == False
assert 'Insufficient rows' in str(result['issues'])
def test_data_quality_excessive_nulls():
"""Test excessive null percentage fails"""
df = pd.DataFrame({
'timestamp': ['2024-01-01 00:00:00'] * 10,
'feature_1': [1.5, None, None, None, None, None, None, None, None, None],
'feature_2': [2.3] * 10,
'target': [0] * 10
})
result = validate_data_quality(df)
assert result['valid'] == False
assert 'null' in str(result['issues']).lower()
def test_data_quality_statistics():
"""Test statistical calculations are correct"""
df = pd.DataFrame({
'timestamp': ['2024-01-01 00:00:00'] * 5,
'feature_1': [1.0, 2.0, 3.0, 4.0, 5.0],
'feature_2': [2.0, 2.0, 2.0, 2.0, 2.0],
'target': [0, 1, 0, 1, 0]
})
result = validate_data_quality(df)
# Check feature_1 statistics
assert result['stats']['column_stats']['feature_1']['mean'] == 3.0
assert result['stats']['column_stats']['feature_1']['min'] == 1.0
assert result['stats']['column_stats']['feature_1']['max'] == 5.0
# Check feature_2 statistics (all same value)
assert result['stats']['column_stats']['feature_2']['std'] == 0.0
Run tests locally:
cd lambda/data-validation
# Install test dependencies
pip install pytest pandas
# Run all tests
pytest test_handler.py -v
# Run specific test
pytest test_handler.py::test_schema_validation_success -v
# Run with coverage
pip install pytest-cov
pytest test_handler.py --cov=handler --cov-report=term-missing
Expected output:
============================= test session starts ==============================
test_handler.py::test_schema_validation_success PASSED [ 12%]
test_handler.py::test_schema_validation_missing_column PASSED [ 25%]
test_handler.py::test_schema_validation_wrong_type PASSED [ 37%]
test_handler.py::test_data_quality_null_detection PASSED [ 50%]
test_handler.py::test_data_quality_duplicate_detection PASSED [ 62%]
test_handler.py::test_data_quality_insufficient_rows PASSED [ 75%]
test_handler.py::test_data_quality_excessive_nulls PASSED [ 87%]
test_handler.py::test_data_quality_statistics PASSED [100%]
============================== 8 passed in 0.24s ===============================
Why test locally?
- Faster feedback than deploying to AWS
- Catch bugs before they hit production
- Document expected behavior
- Regression testing when updating validation rules
- Save money on Lambda invocations during development
Integration Testing
After deployment, verify the complete pipeline end-to-end.
Create scripts/test-pipeline.sh:
#!/bin/bash
# Integration test for ML data pipeline
set -e
echo "Starting pipeline integration test..."
echo ""
# Configuration
AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
BUCKET="ml-pipeline-raw-data-dev-${AWS_ACCOUNT_ID}"
TEST_FILE="test-data/sample.csv"
TIMESTAMP=$(date +%s)
# Upload test file
echo " Uploading test data to S3..."
aws s3 cp ${TEST_FILE} s3://${BUCKET}/raw/test-${TIMESTAMP}.csv \
--server-side-encryption aws:kms
echo " Uploaded to s3://${BUCKET}/raw/test-${TIMESTAMP}.csv"
echo ""
# Wait for Lambda processing
echo "Waiting 15 seconds for Lambda to process..."
sleep 15
echo ""
# Check Lambda logs
echo "Recent Lambda logs:"
aws logs tail /aws/lambda/ml-pipeline-data-validation \
--since 2m \
--format short | tail -20
echo ""
# Verify validated data exists
echo "Checking validated data bucket..."
VALIDATED_BUCKET="ml-pipeline-validated-data-dev-${AWS_ACCOUNT_ID}"
aws s3 ls s3://${VALIDATED_BUCKET}/validated/ | tail -5
echo ""
# Check validation report
echo "Downloading validation report..."
aws s3 cp \
s3://${BUCKET}/reports/test-${TIMESTAMP}_report.json \
./test-report.json
echo " Report contents:"
cat test-report.json | python3 -m json.tool | head -30
echo ""
# Verify split data (if Option 1 was implemented)
echo "Checking for train/val split..."
TRAIN_COUNT=$(aws s3 ls s3://${VALIDATED_BUCKET}/validated/train/ | wc -l)
VAL_COUNT=$(aws s3 ls s3://${VALIDATED_BUCKET}/validated/val/ | wc -l)
if [ $TRAIN_COUNT -gt 0 ] && [ $VAL_COUNT -gt 0 ]; then
echo " Train/val split detected"
echo " Training files: $TRAIN_COUNT"
echo " Validation files: $VAL_COUNT"
else
echo " No train/val split found (run Option 1 or 2 from Step 6.5)"
fi
echo ""
# Check CloudWatch metrics
echo "Recent CloudWatch metrics:"
aws cloudwatch get-metric-statistics \
--namespace MLPipeline/DataQuality \
--metric-name DataQualityScore \
--start-time $(date -u -d '10 minutes ago' +%Y-%m-%dT%H:%M:%S) \
--end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
--period 300 \
--statistics Average \
--dimensions Name=Bucket,Value=${BUCKET} Name=Pipeline,Value=ML-DataValidation \
--query 'Datapoints[0].Average' || echo " No metrics yet (custom metrics take a few minutes)"
echo ""
echo "Pipeline integration test complete!"
echo ""
echo "Next steps:"
echo "1. Check your email for SNS notification"
echo "2. View CloudWatch dashboard: https://console.aws.amazon.com/cloudwatch/"
echo "3. Review validation report: ./test-report.json"
Make executable and run:
chmod +x scripts/test-pipeline.sh
./scripts/test-pipeline.sh
Continuous Integration Testing
For production deployments, integrate tests into CI/CD:
GitHub Actions example (.github/workflows/test.yml):
name: Test Data Pipeline
on:
pull_request:
paths:
- 'lambda/data-validation/**'
- 'terraform/**'
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
cd lambda/data-validation
pip install -r requirements.txt pytest pytest-cov
- name: Run unit tests
run: |
cd lambda/data-validation
pytest test_handler.py --cov=handler --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
file: ./lambda/data-validation/coverage.xml
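Since the workflow also triggers on terraform/** changes, it's worth adding a second job that checks the infrastructure code too (a sketch using HashiCorp's setup-terraform action; add it under jobs:):
terraform-check:
  runs-on: ubuntu-latest
  steps:
    - uses: actions/checkout@v3
    - uses: hashicorp/setup-terraform@v3
    - name: Terraform format check
      run: terraform -chdir=terraform fmt -check -recursive
    - name: Terraform validate
      run: |
        terraform -chdir=terraform init -backend=false
        terraform -chdir=terraform validate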
Important Notes Before Deploying
Known Considerations
1. Lambda Package Size and Compatibility
The pandas library (~100MB) has platform-specific dependencies. Building on macOS/Windows creates incompatible packages. Solutions:
- Recommended: Use the Docker build (shown in Step 2)
- Alternative: Use the AWS-managed Lambda Layer (uncomment in lambda.tf)
- For CI/CD: Build in CodeBuild or GitHub Actions with an Amazon Linux 2 image
2. Validation Rules
The sample uses min_rows: 5 for testing. Adjust for production:
VALIDATION_RULES = {
'required_columns': ['your', 'columns', 'here'],
'numeric_columns': ['numeric', 'columns'],
'max_null_percentage': 0.05, # Adjust based on requirements
'min_rows': 100, # Increase for production
'max_rows': 10000000,
'date_columns': ['timestamp']
}
3. Terraform State Management
For production, use remote state with locking:
# In providers.tf
terraform {
backend "s3" {
bucket = "your-terraform-state-bucket"
key = "ml-pipeline/terraform.tfstate"
region = "ap-south-1"
dynamodb_table = "terraform-state-lock"
encrypt = true
}
}
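The backend resources have to exist before terraform init can use them, so bootstrap the state bucket and lock table once in a separate configuration (a sketch; the bucket name is a placeholder and must be globally unique):
# One-time bootstrap for remote state (separate Terraform configuration)
resource "aws_s3_bucket" "terraform_state" {
  bucket = "your-terraform-state-bucket"
}

resource "aws_s3_bucket_versioning" "terraform_state" {
  bucket = aws_s3_bucket.terraform_state.id
  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_dynamodb_table" "terraform_lock" {
  name         = "terraform-state-lock"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "LockID"  # attribute name required by the S3 backend's locking

  attribute {
    name = "LockID"
    type = "S"
  }
}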
4. Cost Monitoring
The estimated costs (~$9.50/month) assume minimal development usage. For production:
- Enable AWS Cost Explorer
- Set up AWS Budgets with email alerts
- Monitor CloudWatch Logs storage (can grow quickly)
- Review CloudTrail data event costs (especially with high S3 activity)
Actual CloudTrail costs for 10K Lambda invocations: ~$20-30/month (not $2 as initially estimated).
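To act on the Budgets suggestion above, a minimal monthly cost budget with an email alert looks roughly like this (limit and threshold are placeholders):
# Sketch: monthly cost budget that emails at 80% of the limit
resource "aws_budgets_budget" "ml_pipeline" {
  name         = "${var.project_name}-monthly-budget"
  budget_type  = "COST"
  limit_amount = "50"
  limit_unit   = "USD"
  time_unit    = "MONTHLY"

  notification {
    comparison_operator        = "GREATER_THAN"
    threshold                  = 80
    threshold_type             = "PERCENTAGE"
    notification_type          = "ACTUAL"
    subscriber_email_addresses = [var.notification_email]
  }
}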
5. Network Isolation (Production Enhancement)
For maximum security, deploy Lambda in a VPC:
# In lambda.tf, add to aws_lambda_function:
vpc_config {
subnet_ids = var.private_subnet_ids
security_group_ids = [aws_security_group.lambda.id]
}
# Requires NAT Gateway or VPC endpoints for S3/SNS access
6. Production Patterns Not Implemented
This implementation focuses on core functionality for learning. For production, consider adding:
Idempotency:
# Track processed files to prevent duplicate processing
import hashlib
def is_already_processed(bucket, key):
"""Check if file already validated"""
try:
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('processed-files')
# Use a hash of the object path as the idempotency key
file_hash = hashlib.md5(f"{bucket}/{key}".encode()).hexdigest()
response = table.get_item(Key={'file_hash': file_hash})
return 'Item' in response
except Exception:
return False
# In lambda_handler:
if is_already_processed(bucket, key):
logger.info(f"Already processed: {key}")
return {'statusCode': 200, 'body': 'Already processed'}
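The check above assumes a processed-files DynamoDB table exists (and that you put an item after each successful validation, plus dynamodb:GetItem/PutItem on the Lambda role); a minimal Terraform sketch for that table:
# Sketch: DynamoDB table backing the idempotency check
resource "aws_dynamodb_table" "processed_files" {
  name         = "processed-files"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "file_hash"

  attribute {
    name = "file_hash"
    type = "S"
  }
}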
Dead Letter Queue (DLQ):
# In terraform/lambda.tf, add:
resource "aws_sqs_queue" "lambda_dlq" {
name = "${var.project_name}-validation-dlq"
kms_master_key_id = aws_kms_key.data_encryption.id
message_retention_seconds = 1209600 # 14 days
}
# In aws_lambda_function.data_validation:
dead_letter_config {
target_arn = aws_sqs_queue.lambda_dlq.arn
}
Concurrent Execution Limits:
# In aws_lambda_function.data_validation:
reserved_concurrent_executions = 10 # Prevent runaway costs
Enhanced Error Handling:
class ValidationError(Exception):
"""Custom validation exception"""
pass
class DataQualityError(ValidationError):
"""Data quality check failed"""
pass
class SchemaError(ValidationError):
"""Schema validation failed"""
pass
# Use in lambda_handler for better error categorization
try:
schema_result = validate_schema(df)
if not schema_result['valid']:
raise SchemaError(schema_result['issues'])
quality_result = validate_data_quality(df)
if not quality_result['valid']:
raise DataQualityError(quality_result['issues'])
except SchemaError as e:
# Handle schema errors specifically
logger.error(f"Schema validation failed: {e}")
except DataQualityError as e:
# Handle quality errors specifically
logger.error(f"Quality validation failed: {e}")
Security Best Practices Checklist
Encryption:
- KMS encryption for all S3 buckets
- Enforce encryption in bucket policies
- TLS in transit (enforced by bucket policy)
- Encrypted SNS topics
- Encrypted CloudWatch logs
Access Control:
- Least privilege IAM roles
- No public bucket access
- S3 bucket policies deny unencrypted uploads
- s3:ListBucket permission for Lambda (enables proper error handling)
Audit & Compliance:
- CloudTrail logging all data access
- CloudWatch logs retention policy
- SNS notifications for failures
- Log file validation enabled
Data Quality:
- Automated validation
- Schema enforcement
- Quality metrics tracking
- Data checksums for integrity
Cost Breakdown
Estimated Monthly Costs (Development with ~1000 validations/month):
| Service | Usage | Realistic Cost/Month |
|---|---|---|
| S3 Storage (100GB) | Standard tier | ~$2.30 |
| Lambda | 1K invocations, 1GB memory, 30s avg | ~$0.10 |
| CloudWatch Logs | 10GB ingestion + storage | ~$5.00 |
| CloudTrail | Data events (1K S3 operations) | ~$2.00 |
| SNS | 100 notifications | ~$0.01 |
| Total | | ~$9.41 |
Production Scaling (100K validations/month):
| Service | Usage | Cost/Month |
|---|---|---|
| S3 Storage (1TB) | With lifecycle policies | ~$15-20 |
| Lambda | 100K invocations | ~$8-12 |
| CloudWatch Logs | 100GB | ~$50 |
| CloudTrail | 100K S3 operations | ~$200 |
| Total | | ~$273-282 |
Cost Optimization Tips:
- Use S3 Intelligent-Tiering for automatic cost optimization
- Implement CloudWatch Logs retention policies (30 days recommended)
- Scope CloudTrail data events with advanced event selectors instead of logging every object operation
- Use Lambda reserved concurrency to prevent runaway costs
- Enable S3 lifecycle transitions to Glacier for archival
Troubleshooting Guide
Issue: Lambda timeout
# Symptoms: Functions timing out on large files
# Solutions:
# 1. Increase timeout in terraform/lambda.tf
timeout = 900 # 15 minutes (max)
# 2. Increase memory (also increases CPU)
memory_size = 2048 # Up to 10,240 MB
# 3. Process files in chunks for very large datasets
# 4. Monitor duration metric
aws cloudwatch get-metric-statistics \
--namespace AWS/Lambda \
--metric-name Duration \
--dimensions Name=FunctionName,Value=ml-pipeline-data-validation \
--start-time $(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%S) \
--end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
--period 300 \
--statistics Average,Maximum
Issue: Permission denied errors
# Verify IAM permissions
aws iam get-role-policy \
--role-name ml-pipeline-data-validation-lambda \
--policy-name ml-pipeline-data-validation-policy
# Check the KMS key policy allows the Lambda role
aws kms get-key-policy --key-id alias/ml-data-encryption --policy-name default
# Verify S3 bucket policy
aws s3api get-bucket-policy \
--bucket ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID
# Inspect the role's trust policy (should allow lambda.amazonaws.com to assume it)
aws iam get-role \
--role-name ml-pipeline-data-validation-lambda \
--query 'Role.AssumeRolePolicyDocument'
Issue: Lambda package incompatibility
# Error: "cannot import name '_imaging' from 'PIL'"
# Cause: Built on wrong architecture (macOS/Windows)
# Solution: Use Docker build method from Step 2
# Verify Lambda runtime architecture
aws lambda get-function --function-name ml-pipeline-data-validation \
--query 'Configuration.Architectures'
# Should return: ["x86_64"]
# If you see import errors, rebuild using Docker:
cd lambda/data-validation
docker run --rm -v "$PWD":/var/task public.ecr.aws/lambda/python:3.11 \
bash -c "pip install -r requirements.txt -t package/ && cp handler.py package/"
cd package && zip -r ../function.zip . && cd ..
Issue: S3 event not triggering Lambda
# Check Lambda permissions
aws lambda get-policy --function-name ml-pipeline-data-validation
# Should show s3.amazonaws.com as principal
# Verify S3 bucket notification configuration
aws s3api get-bucket-notification-configuration \
--bucket ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID
# Should show Lambda function ARN
# Test manually with sample event
cat > test-event.json << EOF
{
"Records": [{
"s3": {
"bucket": {"name": "ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID"},
"object": {"key": "raw/sample.csv"}
}
}]
}
EOF
aws lambda invoke \
--function-name ml-pipeline-data-validation \
--payload file://test-event.json \
response.json
cat response.json
Issue: SNS notifications not received
# Check subscription status
aws sns list-subscriptions-by-topic \
--topic-arn arn:aws:sns:ap-south-1:ACCOUNT_ID:ml-pipeline-validation-notifications
# Look for SubscriptionArn (not "PendingConfirmation")
# If pending, check email spam folder for confirmation link
# Test SNS manually
aws sns publish \
--topic-arn arn:aws:sns:ap-south-1:ACCOUNT_ID:ml-pipeline-validation-notifications \
--subject "Test Notification" \
--message "Testing SNS configuration"
# Check CloudWatch logs for SNS publish errors
aws logs filter-log-events \
--log-group-name /aws/lambda/ml-pipeline-data-validation \
--filter-pattern "SNS" \
--start-time $(date -u -d '1 hour ago' +%s)000
Issue: High costs from CloudTrail
# Check CloudTrail event count
aws cloudtrail lookup-events \
--start-time $(date -u -d '24 hours ago' +%Y-%m-%dT%H:%M:%S) \
--max-results 50
# Reduce costs by:
# 1. Scoping data events with advanced event selectors
# 2. Filtering to the specific buckets/prefixes you actually need to audit
# 3. Using S3 Event Notifications / EventBridge for real-time needs
# Update CloudTrail configuration
terraform apply -target=aws_cloudtrail.data_events
What's Next?
You now have:
- Secure, encrypted data storage
- Automated data validation
- Complete audit trail
- Monitoring and alerting
- Infrastructure as Code
- Data split for ML training
- Testing framework
In Part 3, we'll implement AIDLC Phase 2: Model Training:
- Custom SageMaker training containers with Docker
- Experiment tracking and versioning
- Model registry and governance
- Hyperparameter optimization
- Cost-effective Spot instance training
- Automated model evaluation
AIDLC Focus: Model Development, Training, and Governance
In Part 4, we'll complete the production ML platform:
- CI/CD pipelines for automated deployment
- SageMaker endpoints with auto-scaling
- Model monitoring and drift detection
- A/B testing and canary deployments
- Production observability and incident response
AIDLC Focus: Deployment, Monitoring, and Compliance
Key Takeaways
- Automate validation - Manual checks don't scale and miss edge cases
- Encrypt everything - KMS at rest, TLS in transit, no exceptions
- Audit all access - CloudTrail provides compliance and debugging trails
- Monitor data quality - Track metrics over time to detect degradation
- Use IaC - Terraform makes infrastructure reproducible and reviewable
- Package carefully - Lambda deployment packages must match runtime architecture
- Test locally first - Unit tests catch bugs before AWS deployment
- Split data properly - Training requires separate train/val sets
Remember: Good data pipelines prevent bad models. The AIDLC framework ensures quality from day one.
Resources
AWS Documentation:
- S3 Encryption Best Practices
- Lambda Best Practices
- Lambda Container Images
- CloudTrail Data Events
- Terraform AWS Provider
Let's Connect!
- Questions about the data pipeline? Drop a comment below
- Follow me for Part 3 - SageMaker Training & Model Registry
- Like if this helped you build your pipeline
- Share with your team/connects
What data quality challenges are you facing? Let me know in the comments!
Tags: #aws #machinelearning #mlops #aidlc #dataengineering #terraform #lambda #s3 #cloudwatch #testing
