Reading time: ~15-20 minutes
Level: Intermediate
Prerequisites: Basic AWS knowledge, familiarity with S3 and Lambda
Series: Part 2 of 4 - Read Part 1
Objective: This blog focuses on teaching core concepts and best practices for building data pipelines on AWS. The code examples are functional for learning purposes. Production deployments require additional hardening, testing, and organization-specific configuration.
Welcome Back!
In Part 1, we explored the AIDLC framework and AWS architecture for secure ML pipelines. Now, let's get hands-on with Phase 1: Data Collection & Preparation.
What you'll build today:
- Encrypted S3 data lake with proper access controls
- Automated data validation pipeline with Lambda
- Data quality monitoring and alerting
- Complete audit trail with CloudTrail
- Infrastructure as Code with Terraform
By the end: You'll have a functional data pipeline that demonstrates security and quality best practices.
The Data Pipeline Problem
You can't build reliable ML models on unreliable data.
Common issues include:
- Inconsistent data formats - training fails due to schema changes
- Missing security controls - sensitive data gets exposed
- No data validation - bad data silently corrupts models
- Manual processes - don't scale and are error-prone
- No audit trail - can't track data lineage
The solution:
An automated, secure, validated data pipeline.
Architecture Overview
Here's what we're building:
AWS Services Used:
- S3: Encrypted data storage with versioning
- Lambda: Serverless data validation
- KMS: Encryption key management
- CloudTrail: Audit logging
- CloudWatch: Monitoring and alerting
- SNS: Notifications
- Terraform: Infrastructure as Code
Step 1: Setting Up Encrypted S3 Buckets
Why Three Buckets?
Separation of concerns improves security and organization:
- Raw Data Bucket - Unvalidated data from sources
- Validated Data Bucket - Quality-checked, ready for training
- Model Artifacts Bucket - Trained models and metadata
Note: The complete S3 configuration is shown below. For production deployments, consider splitting configuration into modules for better organization and reusability.
Infrastructure Code
Create terraform/s3-buckets.tf:
# KMS key for encryption
resource "aws_kms_key" "data_encryption" {
description = "Encryption key for ML data pipeline"
deletion_window_in_days = 10
enable_key_rotation = true
tags = {
Name = "ml-data-encryption-key"
Environment = var.environment
Purpose = "ML-DataPipeline"
}
}
resource "aws_kms_alias" "data_encryption" {
name = "alias/ml-data-encryption"
target_key_id = aws_kms_key.data_encryption.key_id
}
# Raw data bucket
resource "aws_s3_bucket" "raw_data" {
bucket = "${var.project_name}-raw-data-${var.environment}-${data.aws_caller_identity.current.account_id}"
tags = {
Name = "ML Raw Data"
Environment = var.environment
DataStage = "Raw"
}
}
# Enable versioning
resource "aws_s3_bucket_versioning" "raw_data" {
bucket = aws_s3_bucket.raw_data.id
versioning_configuration {
status = "Enabled"
}
}
# Enable encryption
resource "aws_s3_bucket_server_side_encryption_configuration" "raw_data" {
bucket = aws_s3_bucket.raw_data.id
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "aws:kms"
kms_master_key_id = aws_kms_key.data_encryption.arn
}
bucket_key_enabled = true
}
}
# Block all public access
resource "aws_s3_bucket_public_access_block" "raw_data" {
bucket = aws_s3_bucket.raw_data.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
# Enforce encryption and secure transport
resource "aws_s3_bucket_policy" "raw_data" {
bucket = aws_s3_bucket.raw_data.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "DenyUnencryptedObjectUploads"
Effect = "Deny"
Principal = "*"
Action = "s3:PutObject"
Resource = "${aws_s3_bucket.raw_data.arn}/*"
Condition = {
StringNotEquals = {
"s3:x-amz-server-side-encryption" = "aws:kms"
}
}
},
{
Sid = "DenyInsecureTransport"
Effect = "Deny"
Principal = "*"
Action = "s3:*"
Resource = [
aws_s3_bucket.raw_data.arn,
"${aws_s3_bucket.raw_data.arn}/*"
]
Condition = {
Bool = {
"aws:SecureTransport" = "false"
}
}
}
]
})
}
# Lifecycle policy to manage old versions
resource "aws_s3_bucket_lifecycle_configuration" "raw_data" {
bucket = aws_s3_bucket.raw_data.id
rule {
id = "delete-old-versions"
status = "Enabled"
noncurrent_version_expiration {
noncurrent_days = 90
}
}
rule {
id = "transition-to-glacier"
status = "Enabled"
transition {
days = 30
storage_class = "GLACIER"
}
}
}
# Validated data bucket
resource "aws_s3_bucket" "validated_data" {
bucket = "${var.project_name}-validated-data-${var.environment}-${data.aws_caller_identity.current.account_id}"
tags = {
Name = "ML Validated Data"
Environment = var.environment
DataStage = "Validated"
}
}
resource "aws_s3_bucket_versioning" "validated_data" {
bucket = aws_s3_bucket.validated_data.id
versioning_configuration {
status = "Enabled"
}
}
resource "aws_s3_bucket_server_side_encryption_configuration" "validated_data" {
bucket = aws_s3_bucket.validated_data.id
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "aws:kms"
kms_master_key_id = aws_kms_key.data_encryption.arn
}
bucket_key_enabled = true
}
}
resource "aws_s3_bucket_public_access_block" "validated_data" {
bucket = aws_s3_bucket.validated_data.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
# Apply same security policies to validated bucket
resource "aws_s3_bucket_policy" "validated_data" {
bucket = aws_s3_bucket.validated_data.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "DenyUnencryptedObjectUploads"
Effect = "Deny"
Principal = "*"
Action = "s3:PutObject"
Resource = "${aws_s3_bucket.validated_data.arn}/*"
Condition = {
StringNotEquals = {
"s3:x-amz-server-side-encryption" = "aws:kms"
}
}
},
{
Sid = "DenyInsecureTransport"
Effect = "Deny"
Principal = "*"
Action = "s3:*"
Resource = [
aws_s3_bucket.validated_data.arn,
"${aws_s3_bucket.validated_data.arn}/*"
]
Condition = {
Bool = {
"aws:SecureTransport" = "false"
}
}
}
]
})
}
# Model artifacts bucket
resource "aws_s3_bucket" "model_artifacts" {
bucket = "${var.project_name}-model-artifacts-${var.environment}-${data.aws_caller_identity.current.account_id}"
tags = {
Name = "ML Model Artifacts"
Environment = var.environment
DataStage = "Models"
}
}
resource "aws_s3_bucket_versioning" "model_artifacts" {
bucket = aws_s3_bucket.model_artifacts.id
versioning_configuration {
status = "Enabled"
}
}
resource "aws_s3_bucket_server_side_encryption_configuration" "model_artifacts" {
bucket = aws_s3_bucket.model_artifacts.id
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "aws:kms"
kms_master_key_id = aws_kms_key.data_encryption.arn
}
bucket_key_enabled = true
}
}
resource "aws_s3_bucket_public_access_block" "model_artifacts" {
bucket = aws_s3_bucket.model_artifacts.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
resource "aws_s3_bucket_policy" "model_artifacts" {
bucket = aws_s3_bucket.model_artifacts.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "DenyUnencryptedObjectUploads"
Effect = "Deny"
Principal = "*"
Action = "s3:PutObject"
Resource = "${aws_s3_bucket.model_artifacts.arn}/*"
Condition = {
StringNotEquals = {
"s3:x-amz-server-side-encryption" = "aws:kms"
}
}
},
{
Sid = "DenyInsecureTransport"
Effect = "Deny"
Principal = "*"
Action = "s3:*"
Resource = [
aws_s3_bucket.model_artifacts.arn,
"${aws_s3_bucket.model_artifacts.arn}/*"
]
Condition = {
Bool = {
"aws:SecureTransport" = "false"
}
}
}
]
})
}
Variables File
Create terraform/variables.tf:
variable "project_name" {
description = "Project name prefix"
type = string
default = "ml-pipeline"
}
variable "environment" {
description = "Environment (dev, staging, prod)"
type = string
default = "dev"
}
variable "aws_region" {
description = "AWS region"
type = string
default = "ap-south-1"
}
variable "notification_email" {
description = "Email for SNS notifications"
type = string
}
data "aws_caller_identity" "current" {}
Provider Configuration
Create terraform/providers.tf:
terraform {
required_version = ">= 1.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
}
# For production, use remote state
# backend "s3" {
# bucket = "your-terraform-state-bucket"
# key = "ml-pipeline/terraform.tfstate"
# region = "ap-south-1"
# dynamodb_table = "terraform-state-lock"
# encrypt = true
# }
}
provider "aws" {
region = var.aws_region
default_tags {
tags = {
Project = var.project_name
ManagedBy = "Terraform"
}
}
}
Deploy the Infrastructure
cd terraform
# Initialize Terraform
terraform init
# Review the plan
terraform plan -var="notification_email=your-email@example.com"
# Apply the configuration
terraform apply -var="notification_email=your-email@example.com"
Step 2: Data Validation Lambda Function
Why Validate Data?
Prevent garbage in, garbage out:
- Catch schema changes early
- Detect data quality issues
- Ensure consistency
- Create audit trail
Important: After deploying, check your email and confirm the SNS subscription to receive notifications.
Validation Logic
Create lambda/data-validation/handler.py:
import json
import os
import boto3
import logging
import pandas as pd
from io import BytesIO
import hashlib
from datetime import datetime
from urllib.parse import unquote_plus
# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# AWS clients
s3_client = boto3.client('s3')
sns_client = boto3.client('sns')
# Validation rules
VALIDATION_RULES = {
'required_columns': ['timestamp', 'feature_1', 'feature_2', 'target'],
'numeric_columns': ['feature_1', 'feature_2', 'target'],
'max_null_percentage': 0.05, # 5% max
'min_rows': 5, # Adjusted for testing - use 100+ for production
'max_rows': 1000000,
'date_columns': ['timestamp']
}
def calculate_checksum(data: bytes) -> str:
"""Calculate SHA256 checksum"""
return hashlib.sha256(data).hexdigest()
def validate_schema(df: pd.DataFrame) -> dict:
"""Validate DataFrame schema"""
issues = []
# Check required columns
missing_cols = set(VALIDATION_RULES['required_columns']) - set(df.columns)
if missing_cols:
issues.append(f"Missing columns: {missing_cols}")
# Check numeric columns
for col in VALIDATION_RULES['numeric_columns']:
if col in df.columns:
if not pd.api.types.is_numeric_dtype(df[col]):
issues.append(f"Column '{col}' should be numeric")
# Check date columns
for col in VALIDATION_RULES['date_columns']:
if col in df.columns:
try:
# Actually convert to catch invalid dates
df[col] = pd.to_datetime(df[col])
except Exception as e:
issues.append(f"Column '{col}' has invalid datetime values: {str(e)}")
return {
'valid': len(issues) == 0,
'issues': issues
}
def validate_data_quality(df: pd.DataFrame) -> dict:
"""Validate data quality"""
issues = []
# Check row count
if len(df) < VALIDATION_RULES['min_rows']:
issues.append(
f"Insufficient rows: {len(df)} < {VALIDATION_RULES['min_rows']}"
)
elif len(df) > VALIDATION_RULES['max_rows']:
issues.append(
f"Too many rows: {len(df)} > {VALIDATION_RULES['max_rows']}"
)
# Check null values
for col in df.columns:
null_pct = df[col].isnull().sum() / len(df)
if null_pct > VALIDATION_RULES['max_null_percentage']:
issues.append(
f"Column '{col}' has {null_pct:.2%} nulls "
f"(max: {VALIDATION_RULES['max_null_percentage']:.2%})"
)
# Check duplicates
    duplicate_count = int(df.duplicated().sum())  # cast numpy int64 so the report is JSON-serializable
if duplicate_count > 0:
issues.append(f"Found {duplicate_count} duplicate rows")
# Data distribution checks
stats = {}
for col in VALIDATION_RULES['numeric_columns']:
if col in df.columns:
stats[col] = {
'mean': float(df[col].mean()),
'std': float(df[col].std()),
'min': float(df[col].min()),
'max': float(df[col].max()),
'null_count': int(df[col].isnull().sum())
}
return {
'valid': len(issues) == 0,
'issues': issues,
'stats': {
'row_count': len(df),
'column_count': len(df.columns),
'duplicate_count': duplicate_count,
'column_stats': stats
}
}
def send_notification(topic_arn: str, subject: str, message: str):
"""Send SNS notification"""
try:
sns_client.publish(
TopicArn=topic_arn,
Subject=subject,
Message=message
)
logger.info(f"Sent notification: {subject}")
except Exception as e:
logger.error(f"Failed to send notification: {e}")
def lambda_handler(event, context):
"""
Lambda handler triggered by S3 upload
"""
    bucket = key = 'unknown'  # fallbacks so the error notification below can reference them
    try:
# Parse S3 event
record = event['Records'][0]
bucket = record['s3']['bucket']['name']
        # Object keys in S3 event notifications are URL-encoded (e.g. spaces arrive as '+')
        key = unquote_plus(record['s3']['object']['key'])
logger.info(f"Processing: s3://{bucket}/{key}")
# Download file
response = s3_client.get_object(Bucket=bucket, Key=key)
file_content = response['Body'].read()
# Calculate checksum
checksum = calculate_checksum(file_content)
# Load data
df = pd.read_csv(BytesIO(file_content))
logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns")
# Run validations
schema_result = validate_schema(df)
quality_result = validate_data_quality(df)
# Compile results
validation_result = {
'file': f"s3://{bucket}/{key}",
'timestamp': datetime.utcnow().isoformat(),
'checksum': checksum,
'schema_validation': schema_result,
'quality_validation': quality_result,
'status': (
'PASSED' if schema_result['valid'] and quality_result['valid']
else 'FAILED'
)
}
logger.info(f"Validation status: {validation_result['status']}")
# Save validation report
report_key = key.replace('raw/', 'reports/').replace('.csv', '_report.json')
s3_client.put_object(
Bucket=bucket,
Key=report_key,
Body=json.dumps(validation_result, indent=2),
ServerSideEncryption='aws:kms',
SSEKMSKeyId=os.environ.get('KMS_KEY_ID')
)
# If passed, copy to validated bucket
if validation_result['status'] == 'PASSED':
validated_bucket = bucket.replace('raw-data', 'validated-data')
validated_key = key.replace('raw/', 'validated/')
# Explicitly set encryption on copy
s3_client.copy_object(
CopySource={'Bucket': bucket, 'Key': key},
Bucket=validated_bucket,
Key=validated_key,
ServerSideEncryption='aws:kms',
SSEKMSKeyId=os.environ.get('KMS_KEY_ID')
)
logger.info(f"Copied to: s3://{validated_bucket}/{validated_key}")
# Send success notification
send_notification(
os.environ['SNS_TOPIC_ARN'],
f' Data Validation Passed: {key}',
f"File validated successfully\n\n"
f"Stats:\n"
f"- Rows: {quality_result['stats']['row_count']}\n"
f"- Columns: {quality_result['stats']['column_count']}\n"
f"- Duplicates: {quality_result['stats']['duplicate_count']}\n\n"
f"Validation report: s3://{bucket}/{report_key}"
)
else:
# Send failure notification
all_issues = schema_result['issues'] + quality_result['issues']
send_notification(
os.environ['SNS_TOPIC_ARN'],
f' Data Validation Failed: {key}',
f"Validation issues found:\n\n" + "\n".join(f"- {issue}" for issue in all_issues)
)
return {
'statusCode': 200,
'body': json.dumps(validation_result)
}
except Exception as e:
logger.error(f"Error: {e}", exc_info=True)
send_notification(
os.environ.get('SNS_TOPIC_ARN', ''),
f' Data Validation Error',
f"Error processing {key}:\n{str(e)}"
)
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
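Before packaging, it's worth sanity-checking the validation logic locally. Here's a minimal sketch - run it from lambda/data-validation/ with pandas and boto3 installed and an AWS region configured (importing handler.py creates boto3 clients at module load); it exercises validate_schema and validate_data_quality directly, with no AWS calls:
# test_validation.py - quick local check of the validation functions
import pandas as pd
from handler import validate_schema, validate_data_quality

good = pd.DataFrame({
    'timestamp': ['2024-01-01T00:00:00', '2024-01-01T01:00:00',
                  '2024-01-01T02:00:00', '2024-01-01T03:00:00',
                  '2024-01-01T04:00:00'],
    'feature_1': [1.5, 1.8, 1.2, 1.9, 1.4],
    'feature_2': [2.3, 2.1, 2.5, 2.0, 2.4],
    'target': [0, 1, 0, 1, 0],
})

print(validate_schema(good))        # expect: {'valid': True, 'issues': []}
print(validate_data_quality(good))  # expect: valid, with row/column stats

bad = good.drop(columns=['target'])
print(validate_schema(bad))         # expect: missing column reported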
Lambda Dependencies
Create lambda/data-validation/requirements.txt:
pandas==2.1.0
boto3==1.28.85
Package Lambda Function
Important Note on Lambda Packaging: The pandas library includes platform-specific C extensions. Building on macOS or Windows will create a package incompatible with Lambda's Amazon Linux 2 runtime. Use one of these approaches:
Option 1: Docker Build (Recommended)
cd lambda/data-validation
# Build using Docker with Lambda runtime
docker run --rm -v "$PWD":/var/task public.ecr.aws/lambda/python:3.11 \
bash -c "pip install -r requirements.txt -t package/ && cp handler.py package/"
# Create deployment package
cd package
zip -r ../function.zip .
cd ..
Option 2: Use Lambda Layer
# Skip pandas installation and use AWS-managed layer
# Update terraform/lambda.tf to include:
# layers = ["arn:aws:lambda:ap-south-1:336392948345:layer:AWSSDKPandas-Python311:12"]
Option 3: EC2/Cloud9 Build
# Build on an Amazon Linux 2 instance
pip install -r requirements.txt -t package/
cp handler.py package/
cd package && zip -r ../function.zip . && cd ..
Step 3: Lambda Infrastructure
Create terraform/lambda.tf:
# IAM role for Lambda
resource "aws_iam_role" "data_validation" {
name = "${var.project_name}-data-validation-lambda"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "lambda.amazonaws.com"
}
}]
})
}
# IAM policy for Lambda
resource "aws_iam_role_policy" "data_validation" {
name = "${var.project_name}-data-validation-policy"
role = aws_iam_role.data_validation.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"s3:GetObject",
"s3:PutObject"
]
Resource = [
"${aws_s3_bucket.raw_data.arn}/*",
"${aws_s3_bucket.validated_data.arn}/*"
]
},
{
Effect = "Allow"
Action = [
"s3:ListBucket"
]
Resource = [
aws_s3_bucket.raw_data.arn,
aws_s3_bucket.validated_data.arn
]
},
{
Effect = "Allow"
Action = [
"kms:Decrypt",
"kms:GenerateDataKey"
]
Resource = aws_kms_key.data_encryption.arn
},
{
Effect = "Allow"
Action = "sns:Publish"
Resource = aws_sns_topic.validation_notifications.arn
},
{
Effect = "Allow"
Action = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
]
Resource = "arn:aws:logs:*:*:*"
}
]
})
}
# SNS topic for notifications
resource "aws_sns_topic" "validation_notifications" {
name = "${var.project_name}-validation-notifications"
kms_master_key_id = aws_kms_key.data_encryption.id
}
resource "aws_sns_topic_subscription" "email" {
topic_arn = aws_sns_topic.validation_notifications.arn
protocol = "email"
endpoint = var.notification_email
}
# Lambda function
resource "aws_lambda_function" "data_validation" {
filename = "${path.module}/../lambda/data-validation/function.zip"
function_name = "${var.project_name}-data-validation"
role = aws_iam_role.data_validation.arn
handler = "handler.lambda_handler"
runtime = "python3.11"
timeout = 300
memory_size = 1024
source_code_hash = filebase64sha256("${path.module}/../lambda/data-validation/function.zip")
environment {
variables = {
SNS_TOPIC_ARN = aws_sns_topic.validation_notifications.arn
KMS_KEY_ID = aws_kms_key.data_encryption.id
}
}
# Optional: Use AWS-managed pandas layer instead of packaging it
# layers = ["arn:aws:lambda:${var.aws_region}:336392948345:layer:AWSSDKPandas-Python311:12"]
tags = {
Name = "Data Validation Lambda"
Environment = var.environment
}
}
# S3 trigger
resource "aws_s3_bucket_notification" "raw_data_trigger" {
bucket = aws_s3_bucket.raw_data.id
lambda_function {
lambda_function_arn = aws_lambda_function.data_validation.arn
events = ["s3:ObjectCreated:*"]
filter_prefix = "raw/"
filter_suffix = ".csv"
}
depends_on = [
aws_lambda_permission.allow_s3,
aws_lambda_function.data_validation
]
}
resource "aws_lambda_permission" "allow_s3" {
statement_id = "AllowS3Invoke"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.data_validation.function_name
principal = "s3.amazonaws.com"
source_arn = aws_s3_bucket.raw_data.arn
}
# CloudWatch Log Group
resource "aws_cloudwatch_log_group" "data_validation" {
name = "/aws/lambda/${aws_lambda_function.data_validation.function_name}"
retention_in_days = 30
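# Note: using a customer-managed KMS key here requires the key policy to allow
# the logs.<region>.amazonaws.com service principal; the key in s3-buckets.tf
# keeps the default key policy, so add that statement (or drop kms_key_id)
# before applying. A similar caveat applies to CloudWatch alarms publishing to
# the KMS-encrypted SNS topic (cloudwatch.amazonaws.com).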
kms_key_id = aws_kms_key.data_encryption.arn
}
Step 4: CloudTrail for Audit Logging
Create terraform/cloudtrail.tf:
# CloudTrail logs bucket
resource "aws_s3_bucket" "cloudtrail_logs" {
bucket = "${var.project_name}-cloudtrail-logs-${data.aws_caller_identity.current.account_id}"
}
resource "aws_s3_bucket_versioning" "cloudtrail_logs" {
bucket = aws_s3_bucket.cloudtrail_logs.id
versioning_configuration {
status = "Enabled"
}
}
resource "aws_s3_bucket_server_side_encryption_configuration" "cloudtrail_logs" {
bucket = aws_s3_bucket.cloudtrail_logs.id
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "AES256"
}
}
}
resource "aws_s3_bucket_public_access_block" "cloudtrail_logs" {
bucket = aws_s3_bucket.cloudtrail_logs.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
resource "aws_s3_bucket_policy" "cloudtrail_logs" {
bucket = aws_s3_bucket.cloudtrail_logs.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "AWSCloudTrailAclCheck"
Effect = "Allow"
Principal = {
Service = "cloudtrail.amazonaws.com"
}
Action = "s3:GetBucketAcl"
Resource = aws_s3_bucket.cloudtrail_logs.arn
},
{
Sid = "AWSCloudTrailWrite"
Effect = "Allow"
Principal = {
Service = "cloudtrail.amazonaws.com"
}
Action = "s3:PutObject"
Resource = "${aws_s3_bucket.cloudtrail_logs.arn}/*"
Condition = {
StringEquals = {
"s3:x-amz-acl" = "bucket-owner-full-control"
}
}
},
{
Sid = "DenyInsecureTransport"
Effect = "Deny"
Principal = "*"
Action = "s3:*"
Resource = [
aws_s3_bucket.cloudtrail_logs.arn,
"${aws_s3_bucket.cloudtrail_logs.arn}/*"
]
Condition = {
Bool = {
"aws:SecureTransport" = "false"
}
}
}
]
})
}
# CloudTrail
resource "aws_cloudtrail" "data_events" {
name = "${var.project_name}-data-trail"
s3_bucket_name = aws_s3_bucket.cloudtrail_logs.id
include_global_service_events = true
is_multi_region_trail = true
enable_logging = true
enable_log_file_validation = true
depends_on = [aws_s3_bucket_policy.cloudtrail_logs]
event_selector {
read_write_type = "All"
include_management_events = true
data_resource {
type = "AWS::S3::Object"
values = [
"${aws_s3_bucket.raw_data.arn}/*",
"${aws_s3_bucket.validated_data.arn}/*",
"${aws_s3_bucket.model_artifacts.arn}/*"
]
}
}
tags = {
Name = "ML Data Trail"
Environment = var.environment
}
}
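After terraform apply, you can confirm the trail is actually delivering logs. A quick boto3 check (sketch; the trail name assumes the default project_name of ml-pipeline):
import boto3

cloudtrail = boto3.client('cloudtrail', region_name='ap-south-1')

# Trail name follows "${var.project_name}-data-trail" from cloudtrail.tf
status = cloudtrail.get_trail_status(Name='ml-pipeline-data-trail')
print('IsLogging:', status['IsLogging'])
print('LatestDeliveryTime:', status.get('LatestDeliveryTime'))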
Step 5: CloudWatch Monitoring
Create terraform/monitoring.tf:
# CloudWatch dashboard
resource "aws_cloudwatch_dashboard" "data_pipeline" {
dashboard_name = "${var.project_name}-data-pipeline"
dashboard_body = jsonencode({
widgets = [
{
type = "metric"
x = 0
y = 0
width = 12
height = 6
properties = {
# Dashboard metric arrays use [Namespace, MetricName, DimensionName, DimensionValue, {options}]
metrics = [
["AWS/Lambda", "Invocations", "FunctionName", aws_lambda_function.data_validation.function_name, { stat = "Sum", label = "Lambda Invocations" }],
["AWS/Lambda", "Errors", "FunctionName", aws_lambda_function.data_validation.function_name, { stat = "Sum", label = "Lambda Errors" }],
["AWS/Lambda", "Duration", "FunctionName", aws_lambda_function.data_validation.function_name, { stat = "Average", label = "Avg Duration (ms)" }]
]
period = 300
stat = "Average"
region = var.aws_region
title = "Lambda Metrics"
view = "timeSeries"
stacked = false
}
}
]
})
}
# Alarms
resource "aws_cloudwatch_metric_alarm" "lambda_errors" {
alarm_name = "${var.project_name}-lambda-errors"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = "1"
metric_name = "Errors"
namespace = "AWS/Lambda"
period = "300"
statistic = "Sum"
threshold = "5"
alarm_description = "Lambda function errors"
alarm_actions = [aws_sns_topic.validation_notifications.arn]
dimensions = {
FunctionName = aws_lambda_function.data_validation.function_name
}
}
resource "aws_cloudwatch_metric_alarm" "lambda_throttles" {
alarm_name = "${var.project_name}-lambda-throttles"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = "1"
metric_name = "Throttles"
namespace = "AWS/Lambda"
period = "300"
statistic = "Sum"
threshold = "10"
alarm_description = "Lambda function throttling"
alarm_actions = [aws_sns_topic.validation_notifications.arn]
dimensions = {
FunctionName = aws_lambda_function.data_validation.function_name
}
}
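To confirm the alarms exist and check their current state after deployment, a quick boto3 check (sketch; assumes the default project_name prefix):
import boto3

cloudwatch = boto3.client('cloudwatch', region_name='ap-south-1')

response = cloudwatch.describe_alarms(AlarmNamePrefix='ml-pipeline-')
for alarm in response['MetricAlarms']:
    print(f"{alarm['AlarmName']}: {alarm['StateValue']}")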
Step 6: Testing the Pipeline
1. Create Test Data
Create test-data/sample.csv:
timestamp,feature_1,feature_2,target
2024-01-01T00:00:00,1.5,2.3,0
2024-01-01T01:00:00,1.8,2.1,1
2024-01-01T02:00:00,1.2,2.5,0
2024-01-01T03:00:00,1.9,2.0,1
2024-01-01T04:00:00,1.4,2.4,0
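Five rows is just enough to clear the min_rows threshold of 5 set earlier. If you want a larger file to exercise the quality checks, here's a small generator sketch (synthetic values, same schema):
# generate_test_data.py - synthetic data matching the expected schema
import numpy as np
import pandas as pd

rows = 500
df = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01', periods=rows, freq='H'),
    'feature_1': np.round(np.random.normal(1.5, 0.3, rows), 2),
    'feature_2': np.round(np.random.normal(2.2, 0.2, rows), 2),
    'target': np.random.randint(0, 2, rows),
})
df.to_csv('test-data/sample.csv', index=False)
print(f'Wrote {len(df)} rows to test-data/sample.csv')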
2. Upload to S3
# Get your AWS account ID
AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
# Upload test file
aws s3 cp test-data/sample.csv \
s3://ml-pipeline-raw-data-dev-${AWS_ACCOUNT_ID}/raw/sample.csv \
--server-side-encryption aws:kms
3. Monitor Lambda Execution
# View Lambda logs in real-time
aws logs tail /aws/lambda/ml-pipeline-data-validation --follow
# Or check specific log streams
aws logs describe-log-streams \
--log-group-name /aws/lambda/ml-pipeline-data-validation \
--order-by LastEventTime \
--descending \
--max-items 1
4. Verify Validated Data
# Check validated bucket
aws s3 ls s3://ml-pipeline-validated-data-dev-${AWS_ACCOUNT_ID}/validated/
# Download validation report
aws s3 cp \
s3://ml-pipeline-raw-data-dev-${AWS_ACCOUNT_ID}/reports/sample_report.json \
./validation-report.json
# View report
cat validation-report.json | python -m json.tool
5. Check Email Notification
You should receive an email notification with validation results. If not:
- Confirm you accepted the SNS subscription
- Check CloudWatch logs for errors
- Verify SNS topic permissions
Step 7: Advanced Features
Data Quality Metrics
Add custom CloudWatch metrics in Lambda:
import boto3
cloudwatch = boto3.client('cloudwatch')
def publish_metrics(validation_result):
"""Publish custom metrics to CloudWatch"""
metrics = [
{
'MetricName': 'DataQualityScore',
'Value': 100.0 if validation_result['status'] == 'PASSED' else 0.0,
'Unit': 'Percent',
'Timestamp': datetime.utcnow()
},
{
'MetricName': 'RowCount',
'Value': validation_result['quality_validation']['stats']['row_count'],
'Unit': 'Count',
'Timestamp': datetime.utcnow()
},
{
'MetricName': 'DuplicateRows',
'Value': validation_result['quality_validation']['stats']['duplicate_count'],
'Unit': 'Count',
'Timestamp': datetime.utcnow()
}
]
cloudwatch.put_metric_data(
Namespace='MLPipeline/DataQuality',
MetricData=metrics
)
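# Call publish_metrics(validation_result) from lambda_handler once
# validation_result has been assembled. Note: the Lambda IAM policy in
# terraform/lambda.tf would also need to allow "cloudwatch:PutMetricData"
# for this call to succeed - it is not included above.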
Data Versioning
Track data lineage:
def create_data_lineage(bucket, key, checksum):
"""Create data lineage metadata"""
metadata = {
'source_file': key,
'checksum': checksum,
'ingestion_time': datetime.utcnow().isoformat(),
'validation_version': '1.0',
'pipeline_version': os.environ.get('PIPELINE_VERSION', 'unknown')
}
# Store lineage in S3
lineage_key = f"lineage/{checksum}.json"
s3_client.put_object(
Bucket=bucket,
Key=lineage_key,
Body=json.dumps(metadata, indent=2),
ServerSideEncryption='aws:kms',
SSEKMSKeyId=os.environ.get('KMS_KEY_ID')
)
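Reading lineage back later is just a keyed lookup on the checksum. A minimal sketch, assuming the layout written by create_data_lineage above:
import json
import boto3

s3_client = boto3.client('s3')

def get_data_lineage(bucket: str, checksum: str) -> dict:
    """Fetch the lineage record stored at lineage/<checksum>.json."""
    response = s3_client.get_object(Bucket=bucket, Key=f'lineage/{checksum}.json')
    return json.loads(response['Body'].read())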
Important Notes Before Deploying
Known Considerations
1. Lambda Package Size and Compatibility
The pandas library (~100MB) has platform-specific dependencies. Building on macOS/Windows creates incompatible packages. Solutions:
- Recommended: Use the Docker build (shown in Step 2)
- Alternative: Use the AWS-managed Lambda Layer (uncomment the layer ARN in lambda.tf)
- For CI/CD: Build in CodeBuild or GitHub Actions with an Amazon Linux 2 image
2. Validation Rules
The sample uses min_rows: 5 for testing. Adjust for production:
VALIDATION_RULES = {
'required_columns': ['your', 'columns', 'here'],
'numeric_columns': ['numeric', 'columns'],
'max_null_percentage': 0.05, # Adjust based on requirements
'min_rows': 100, # Increase for production
'max_rows': 10000000,
'date_columns': ['timestamp']
}
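One option is to drive these thresholds from Lambda environment variables so they can be tuned per environment without repackaging the function. A sketch (the variable names here are assumptions, not defined in the Terraform above):
import os

VALIDATION_RULES = {
    'required_columns': os.environ.get('REQUIRED_COLUMNS', 'timestamp,feature_1,feature_2,target').split(','),
    'numeric_columns': os.environ.get('NUMERIC_COLUMNS', 'feature_1,feature_2,target').split(','),
    'max_null_percentage': float(os.environ.get('MAX_NULL_PCT', '0.05')),
    'min_rows': int(os.environ.get('MIN_ROWS', '100')),
    'max_rows': int(os.environ.get('MAX_ROWS', '10000000')),
    'date_columns': os.environ.get('DATE_COLUMNS', 'timestamp').split(','),
}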
3. Terraform State Management
For production, use remote state with locking:
# In providers.tf
terraform {
backend "s3" {
bucket = "your-terraform-state-bucket"
key = "ml-pipeline/terraform.tfstate"
region = "ap-south-1"
dynamodb_table = "terraform-state-lock"
encrypt = true
}
}
4. Cost Monitoring
The estimated costs (~$9.50/month) assume minimal development usage. For production:
- Enable AWS Cost Explorer
- Set up AWS Budgets with email alerts
- Monitor CloudWatch Logs storage (can grow quickly)
- Review CloudTrail data event costs (especially with high S3 activity)
Actual CloudTrail costs for 10K Lambda invocations: ~$20-30/month (not $2 as estimated).
5. Network Isolation (Production Enhancement)
For maximum security, deploy Lambda in a VPC:
# In lambda.tf, add to aws_lambda_function:
vpc_config {
subnet_ids = var.private_subnet_ids
security_group_ids = [aws_security_group.lambda.id]
}
# Requires NAT Gateway or VPC endpoints for S3/SNS access
Security Best Practices Checklist
Encryption:
- KMS encryption for all S3 buckets
- Enforce encryption in bucket policies
- TLS in transit (enforced by bucket policy)
- Encrypted SNS topics
- Encrypted CloudWatch logs
Access Control:
- Least privilege IAM roles
- No public bucket access
- S3 bucket policies deny unencrypted uploads
- s3:ListBucket permission for Lambda (enables proper error handling)
Audit & Compliance:
- CloudTrail logging all data access
- CloudWatch logs retention policy
- SNS notifications for failures
- Log file validation enabled
Data Quality:
- Automated validation
- Schema enforcement
- Quality metrics tracking
- Data checksums for integrity
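A quick way to spot-check the encryption and public-access items on this list is a short boto3 audit script (sketch; replace the placeholder account ID with your own):
import boto3

s3 = boto3.client('s3')

def audit_bucket(bucket: str) -> None:
    """Print the default encryption algorithm and public access block status."""
    enc = s3.get_bucket_encryption(Bucket=bucket)
    algo = enc['ServerSideEncryptionConfiguration']['Rules'][0][
        'ApplyServerSideEncryptionByDefault']['SSEAlgorithm']
    pab = s3.get_public_access_block(Bucket=bucket)['PublicAccessBlockConfiguration']
    print(f"{bucket}: encryption={algo}, public access fully blocked={all(pab.values())}")

for suffix in ('raw-data', 'validated-data', 'model-artifacts'):
    audit_bucket(f'ml-pipeline-{suffix}-dev-123456789012')  # placeholder account ID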
Cost Breakdown
Estimated Monthly Costs (Development with ~1000 validations/month):
| Service | Usage | Realistic Cost/Month |
|---|---|---|
| S3 Storage (100GB) | Standard tier | ~$2.30 |
| Lambda | 1K invocations, 512MB, 30s avg | ~$0.10 |
| CloudWatch Logs | 10GB ingestion + storage | ~$5.00 |
| CloudTrail | Data events (1K S3 operations) | ~$2.00 |
| SNS | 100 notifications | ~$0.01 |
| Total | | ~$9.41 |
Production Scaling (100K validations/month):
| Service | Usage | Cost/Month |
|---|---|---|
| S3 Storage (1TB) | With lifecycle policies | ~$15-20 |
| Lambda | 100K invocations | ~$8-12 |
| CloudWatch Logs | 100GB | ~$50 |
| CloudTrail | 100K S3 operations | ~$200 |
| Total | | ~$273-282 |
Cost Optimization Tips:
- Use S3 Intelligent-Tiering for automatic cost optimization
- Implement CloudWatch Logs retention policies (30 days recommended)
- Consider CloudTrail Insights instead of all data events for production
- Use Lambda reserved concurrency to prevent runaway costs
Troubleshooting Guide
Issue: Lambda timeout
# Symptoms: Functions timing out on large files
# Solutions:
# 1. Increase timeout in terraform/lambda.tf
timeout = 900 # 15 minutes (max)
# 2. Increase memory (also increases CPU)
memory_size = 2048 # Up to 10,240 MB
# 3. Process files in chunks for very large datasets
Issue: Permission denied errors
# Verify IAM permissions
aws iam get-role-policy \
--role-name ml-pipeline-data-validation-lambda \
--policy-name ml-pipeline-data-validation-policy
# Check KMS key policy allows Lambda role
aws kms describe-key --key-id alias/ml-data-encryption
# Verify S3 bucket policy
aws s3api get-bucket-policy \
--bucket ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID
Issue: Lambda package incompatibility
# Error: "cannot import name '_imaging' from 'PIL'"
# Cause: Built on wrong architecture (macOS/Windows)
# Solution: Use Docker build method from Step 2
# Verify Lambda runtime architecture
aws lambda get-function --function-name ml-pipeline-data-validation \
--query 'Configuration.Architectures'
Issue: S3 event not triggering Lambda
# Check Lambda permissions
aws lambda get-policy --function-name ml-pipeline-data-validation
# Verify S3 bucket notification configuration
aws s3api get-bucket-notification-configuration \
--bucket ml-pipeline-raw-data-dev-YOUR_ACCOUNT_ID
# Test manually
aws lambda invoke \
--function-name ml-pipeline-data-validation \
--payload file://test-event.json \
response.json
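The test-event.json referenced above isn't shown; you can also build a minimal S3 put event and send it from Python (sketch; bucket and key are placeholders):
import json
import boto3

event = {
    'Records': [{
        's3': {
            'bucket': {'name': 'ml-pipeline-raw-data-dev-123456789012'},
            'object': {'key': 'raw/sample.csv'},
        }
    }]
}

lambda_client = boto3.client('lambda', region_name='ap-south-1')
response = lambda_client.invoke(
    FunctionName='ml-pipeline-data-validation',
    Payload=json.dumps(event).encode(),
)
print(json.loads(response['Payload'].read()))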
Issue: SNS notifications not received
# Check subscription status
aws sns list-subscriptions-by-topic \
--topic-arn arn:aws:sns:ap-south-1:ACCOUNT_ID:ml-pipeline-validation-notifications
# Subscription must show "SubscriptionArn" (not "PendingConfirmation")
# If pending, check email spam folder for confirmation link
What's Next?
You now have:
- Secure, encrypted data storage
- Automated data validation
- Complete audit trail
- Monitoring and alerting
- Infrastructure as Code
In Part 3, we'll build the training pipeline with SageMaker:
- Custom training containers with Docker
- Distributed training infrastructure
- Experiment tracking with MLflow
- Model versioning and registry
- Hyperparameter optimization with cost-effective Spot instances
In Part 4 (Series Finale), we'll complete the production ML platform:
- CI/CD pipelines for automated deployment
- SageMaker endpoints with auto-scaling
- Model monitoring and drift detection
- A/B testing and canary deployments
- Production observability and incident response
Key Takeaways
- Automate validation - Manual checks don't scale and miss edge cases
- Encrypt everything - KMS at rest, TLS in transit, no exceptions
- Audit all access - CloudTrail provides compliance and debugging trails
- Monitor data quality - Track metrics over time to detect degradation
- Use IaC - Terraform makes infrastructure reproducible and reviewable
- Package carefully - Lambda deployment packages must match runtime architecture
Remember: Good data pipelines prevent bad models. Invest time in data quality upfront.
Resources
AWS Documentation:
- S3 Encryption Best Practices
- Lambda Best Practices
- Lambda Container Images
- CloudTrail Data Events
- Terraform AWS Provider
Let's Connect!
- Questions about the data pipeline? Drop a comment below
- Follow me for Part 3 - SageMaker Training
- Like if this helped you build your pipeline
- Share with your team
What data quality challenges are you facing? Let me know in the comments!
Tags: #aws #machinelearning #mlops #dataengineering #terraform #lambda #s3 #cloudwatch
