Reading time: ~20-25 minutes
Level: Intermediate to Advanced
Prerequisites: Docker installed locally, completed Parts 1 & 2 (especially the train/val data split from Part 2, Step 6.5)
Series: Part 3 of 4 - Part 1 | Part 2

Important Production Considerations
This article demonstrates core SageMaker training concepts with functional code. For production deployments:
- MLflow Integration: Requires VPC deployment with proper networking (simplified here for learning)
- Data Preparation: Use train/val splits from Part 2 (verification steps included)
- Security: Production requires VPC isolation, not shown here to keep focus on SageMaker fundamentals
- Cost Management: Monitor usage closely, especially with hyperparameter tuning
The code focuses on teaching SageMaker fundamentals; production readiness requires additional hardening, covered in the recommendations later in this series.
Welcome to Part 3!
In Part 1, we covered the complete AIDLC framework. In Part 2, we built a secure data pipeline with automated validation and prepared train/validation splits.
AIDLC Framework Progress:
Part 2:
- Phase 1: Data Collection & Preparation
- Phase 6: Governance (CloudTrail, KMS, IAM)
Part 3:
- Phase 2: Model Development & Training
- Phase 3: Model Evaluation (validation metrics)
- Phase 6: Governance (experiment tracking, model versioning)
Part 4:
- Phase 4: Model Deployment
- Phase 5: Monitoring & Maintenance
- Phase 6: Governance (CI/CD, compliance)
Now it's time for the exciting part: training ML models at scale with SageMaker.
What you'll build today:
- SageMaker training infrastructure with custom containers
- Experiment tracking with MLflow (optional, local development only)
- Model versioning and registry
- Cost optimization with Spot instances
- Automated hyperparameter tuning
By the end: You'll have a functional training pipeline demonstrating AWS SageMaker best practices within the AIDLC framework.
The ML Training Problem
Training models manually doesn't scale. Common issues:
- Inconsistent environments - "Works on my machine"
- Lost experiments - Can't reproduce the winning model
- Expensive compute - Burning money on idle GPUs
- No versioning - Which model is in production?
- Manual tuning - Hyperparameter search takes forever
The solution:
An automated, tracked, cost-optimized training pipeline that implements AIDLC Phase 2 (Model Development & Training).
Architecture Overview
AIDLC Phase 2 Reference Architecture
Here's what we're building for Phase 2:
Architecture Note: This implements AIDLC Phase 2 (Model Development & Training), building on the secure data foundation from Phase 1 (Part 2). Model deployment (Phase 4) and monitoring (Phase 5) will be covered in Part 4.
AWS Services Used:
- SageMaker Training: Managed training infrastructure
- ECR: Container registry for custom images
- S3: Data storage and model artifacts (from Part 2)
- Spot Instances: 70% cost savings
- CloudWatch: Training metrics and logging
- SageMaker Model Registry: Model versioning (AIDLC Phase 6)
Step 1: Custom Training Container
Why Custom Containers?
SageMaker built-in algorithms are great, but custom containers give you:
- Full control over dependencies
- Any ML framework (PyTorch, TensorFlow, scikit-learn)
- Custom preprocessing logic
- Integration with your tools
Training Script
Create training/train.py:
import os
import json
import argparse
import numpy as np
import pandas as pd
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import boto3
# SageMaker environment variables
SM_MODEL_DIR = os.environ.get('SM_MODEL_DIR', '/opt/ml/model')
SM_CHANNEL_TRAINING = os.environ.get('SM_CHANNEL_TRAINING', '/opt/ml/input/data/training')
SM_CHANNEL_VALIDATION = os.environ.get('SM_CHANNEL_VALIDATION', '/opt/ml/input/data/validation')
SM_OUTPUT_DATA_DIR = os.environ.get('SM_OUTPUT_DATA_DIR', '/opt/ml/output/data')
def setup_mlflow():
"""
Configure MLflow tracking - optional for SageMaker
Note: MLflow integration from SageMaker requires:
- MLflow deployed in same VPC as SageMaker
- Proper DNS/ALB endpoint for MLFLOW_TRACKING_URI
- For simplicity, this example makes MLflow optional
For local development, MLflow works great. For SageMaker production,
use SageMaker Experiments instead or deploy MLflow with proper VPC setup.
"""
mlflow_uri = os.environ.get('MLFLOW_TRACKING_URI')
if not mlflow_uri:
print("MLflow tracking disabled (MLFLOW_TRACKING_URI not set)")
print("Using SageMaker's built-in experiment tracking instead")
return False
try:
import mlflow
import mlflow.sklearn
mlflow.set_tracking_uri(mlflow_uri)
mlflow.set_experiment('sagemaker-training')
print(f"MLflow tracking enabled: {mlflow_uri}")
return True
except Exception as e:
print(f"MLflow unavailable: {e}. Continuing without MLflow tracking.")
return False
def load_data(data_path):
"""Load training data from S3"""
print(f"Loading data from {data_path}")
# List all CSV files
files = [f for f in os.listdir(data_path) if f.endswith('.csv')]
if not files:
raise ValueError(f"No CSV files found in {data_path}")
# Combine all files
dfs = []
for file in files:
df = pd.read_csv(os.path.join(data_path, file))
dfs.append(df)
data = pd.concat(dfs, ignore_index=True)
print(f"Loaded {len(data)} samples with {len(data.columns)} features")
return data
def prepare_features(data):
"""Prepare features and target"""
# Drop timestamp column if present
if 'timestamp' in data.columns:
data = data.drop('timestamp', axis=1)
# Assuming last column is target
X = data.iloc[:, :-1].values
y = data.iloc[:, -1].values
print(f"Features shape: {X.shape}, Target shape: {y.shape}")
return X, y
def train_model(X_train, y_train, hyperparameters):
"""Train Random Forest model"""
print("Training Random Forest model...")
model = RandomForestClassifier(
n_estimators=hyperparameters['n_estimators'],
max_depth=hyperparameters['max_depth'],
min_samples_split=hyperparameters['min_samples_split'],
min_samples_leaf=hyperparameters['min_samples_leaf'],
random_state=42,
n_jobs=-1
)
model.fit(X_train, y_train)
print("Training completed")
return model
def evaluate_model(model, X_val, y_val):
"""Evaluate model on validation set"""
print("Evaluating model...")
y_pred = model.predict(X_val)
metrics = {
'accuracy': float(accuracy_score(y_val, y_pred)),
'precision': float(precision_score(y_val, y_pred, average='weighted', zero_division=0)),
'recall': float(recall_score(y_val, y_pred, average='weighted', zero_division=0)),
'f1_score': float(f1_score(y_val, y_pred, average='weighted', zero_division=0))
}
# Print as JSON for SageMaker regex metric extraction
print(f"Validation Metrics: {json.dumps(metrics)}")
return metrics
def save_model(model, model_dir):
"""Save model artifact"""
print(f"Saving model to {model_dir}")
# Ensure directory exists
os.makedirs(model_dir, exist_ok=True)
model_path = os.path.join(model_dir, 'model.joblib')
joblib.dump(model, model_path)
print(f"Model saved to {model_path}")
return model_path
def save_metrics(metrics, output_dir):
"""Save metrics for SageMaker"""
os.makedirs(output_dir, exist_ok=True)
metrics_path = os.path.join(output_dir, 'metrics.json')
with open(metrics_path, 'w') as f:
json.dump(metrics, f, indent=2)
print(f"Metrics saved to {metrics_path}")
def main():
"""Main training loop"""
parser = argparse.ArgumentParser()
# Hyperparameters
parser.add_argument('--n_estimators', type=int, default=100)
parser.add_argument('--max_depth', type=int, default=10)
parser.add_argument('--min_samples_split', type=int, default=2)
parser.add_argument('--min_samples_leaf', type=int, default=1)
args, _ = parser.parse_known_args()
hyperparameters = {
'n_estimators': args.n_estimators,
'max_depth': args.max_depth,
'min_samples_split': args.min_samples_split,
'min_samples_leaf': args.min_samples_leaf
}
print(f"Hyperparameters: {json.dumps(hyperparameters)}")
# Setup MLflow (optional - works for local dev, not SageMaker without VPC)
mlflow_enabled = setup_mlflow()
# Training logic
try:
# Load data
train_data = load_data(SM_CHANNEL_TRAINING)
val_data = load_data(SM_CHANNEL_VALIDATION)
# Prepare features
X_train, y_train = prepare_features(train_data)
X_val, y_val = prepare_features(val_data)
# Train model
model = train_model(X_train, y_train, hyperparameters)
# Evaluate model (AIDLC Phase 3: Model Evaluation)
metrics = evaluate_model(model, X_val, y_val)
# Log to MLflow if enabled (local development only)
if mlflow_enabled:
import mlflow
import mlflow.sklearn
with mlflow.start_run():
mlflow.log_params(hyperparameters)
mlflow.log_param('train_samples', len(X_train))
mlflow.log_param('val_samples', len(X_val))
mlflow.log_param('n_features', X_train.shape[1])
mlflow.log_metrics(metrics)
mlflow.sklearn.log_model(model, "model")
# Save model
model_path = save_model(model, SM_MODEL_DIR)
# Save metrics for SageMaker
save_metrics(metrics, SM_OUTPUT_DATA_DIR)
print("Training completed successfully!")
except Exception as e:
print(f"Training failed: {e}")
raise
if __name__ == '__main__':
main()
Dockerfile
Create training/Dockerfile:
FROM python:3.11-slim
# Install system dependencies
RUN apt-get update && apt-get install -y \
build-essential \
curl \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /opt/ml/code
# Copy requirements
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy training script
COPY train.py .
# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV SAGEMAKER_PROGRAM=train.py
# Entry point
ENTRYPOINT ["python", "train.py"]
Requirements
Create training/requirements.txt:
scikit-learn==1.3.0
pandas==2.1.0
numpy==1.24.3
joblib==1.3.2
boto3==1.28.85
mlflow==2.7.1
Build and Push Container
# Set variables
export AWS_REGION="ap-south-1" # Change to your region
export AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
export ECR_REPO="${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/ml-training"
# Build Docker image
cd training
docker build -t ml-training:latest .
# Tag for ECR
docker tag ml-training:latest ${ECR_REPO}:latest
# Login to ECR
aws ecr get-login-password --region ${AWS_REGION} | \
docker login --username AWS --password-stdin ${ECR_REPO}
# Create ECR repository if not exists
aws ecr create-repository \
--repository-name ml-training \
--region ${AWS_REGION} \
--image-scanning-configuration scanOnPush=true \
--encryption-configuration encryptionType=KMS || true
# Push to ECR
docker push ${ECR_REPO}:latest
echo "Container pushed to: ${ECR_REPO}:latest"
Step 2: Verify Data Preparation
Prerequisites: You should have completed Part 2, Step 6.5 (data splitting). This step verifies your data is ready for training.
Check Train/Val Split Exists
# Set your bucket name (from Part 2)
export AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
export VALIDATED_BUCKET="ml-pipeline-validated-data-dev-${AWS_ACCOUNT_ID}"
# Verify training data
echo "Checking training data..."
aws s3 ls s3://${VALIDATED_BUCKET}/validated/train/
# Verify validation data
echo "Checking validation data..."
aws s3 ls s3://${VALIDATED_BUCKET}/validated/val/
Expected output:
Checking training data...
2024-12-27 10:30:45 1234 sample.csv
Checking validation data...
2024-12-27 10:30:45 308 sample.csv
If Data Split Missing
If you see "An error occurred (NoSuchKey)", go back to Part 2, Step 6.5 and run either:
Option 1: Update your Lambda (automated, recommended)
Option 2: Run the manual split script:
# Quick fix: Manual split
python scripts/split_data.py \
${VALIDATED_BUCKET} \
validated/sample.csv \
validated
# Verify again
aws s3 ls s3://${VALIDATED_BUCKET}/validated/train/
aws s3 ls s3://${VALIDATED_BUCKET}/validated/val/
Important: SageMaker training requires separate train and validation data paths. Without this split, training jobs will fail.
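If the listings look right but you also want to confirm the split proportions, a small boto3 + pandas check works. This is a sketch; the bucket name below follows the Part 2 naming convention with a placeholder account ID:

import io

import boto3
import pandas as pd

# Placeholder bucket name following the Part 2 convention
bucket = "ml-pipeline-validated-data-dev-YOUR_ACCOUNT_ID"
s3 = boto3.client("s3")

counts = {}
for split, prefix in [("train", "validated/train/"), ("val", "validated/val/")]:
    objects = s3.list_objects_v2(Bucket=bucket, Prefix=prefix).get("Contents", [])
    rows = 0
    for obj in objects:
        if obj["Key"].endswith(".csv"):
            body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
            rows += len(pd.read_csv(io.BytesIO(body)))
    counts[split] = rows
    print(f"{split}: {rows} rows across {len(objects)} object(s)")

total = sum(counts.values())
if total:
    # Should roughly match the split ratio chosen in Part 2
    print(f"Validation fraction: {counts['val'] / total:.2f}")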
Step 3: SageMaker Training Infrastructure
Create terraform/sagemaker.tf:
# ECR Repository for training images
resource "aws_ecr_repository" "ml_training" {
name = "${var.project_name}-training"
image_tag_mutability = "MUTABLE"
image_scanning_configuration {
scan_on_push = true
}
encryption_configuration {
encryption_type = "KMS"
kms_key = aws_kms_key.data_encryption.arn
}
tags = {
Name = "ML Training Repository"
Environment = var.environment
}
}
# ECR repository policy for SageMaker
resource "aws_ecr_repository_policy" "ml_training" {
repository = aws_ecr_repository.ml_training.name
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "AllowSageMakerPull"
Effect = "Allow"
Principal = {
Service = "sagemaker.amazonaws.com"
}
Action = [
"ecr:BatchGetImage",
"ecr:GetDownloadUrlForLayer",
"ecr:BatchCheckLayerAvailability"
]
}
]
})
}
# SageMaker Execution Role
resource "aws_iam_role" "sagemaker_execution" {
name = "${var.project_name}-sagemaker-execution"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "sagemaker.amazonaws.com"
}
}]
})
}
# SageMaker Execution Policy
resource "aws_iam_role_policy" "sagemaker_execution" {
name = "${var.project_name}-sagemaker-policy"
role = aws_iam_role.sagemaker_execution.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"s3:GetObject",
"s3:PutObject",
"s3:ListBucket"
]
Resource = [
aws_s3_bucket.validated_data.arn,
"${aws_s3_bucket.validated_data.arn}/*",
aws_s3_bucket.model_artifacts.arn,
"${aws_s3_bucket.model_artifacts.arn}/*"
]
},
{
Effect = "Allow"
Action = [
"ecr:GetAuthorizationToken",
"ecr:BatchCheckLayerAvailability",
"ecr:GetDownloadUrlForLayer",
"ecr:BatchGetImage"
]
Resource = "*"
},
{
Effect = "Allow"
Action = [
"kms:Decrypt",
"kms:GenerateDataKey"
]
Resource = aws_kms_key.data_encryption.arn
},
{
Effect = "Allow"
Action = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
]
Resource = "arn:aws:logs:*:*:*"
},
{
Effect = "Allow"
Action = [
"cloudwatch:PutMetricData"
]
Resource = "*"
}
]
})
}
# SageMaker Model Registry (AIDLC Phase 6: Governance)
resource "aws_sagemaker_model_package_group" "ml_models" {
model_package_group_name = "${var.project_name}-models"
model_package_group_description = "ML model registry for ${var.project_name}"
tags = {
Name = "ML Model Registry"
Environment = var.environment
}
}
Deploy the infrastructure:
cd terraform
terraform apply -var="notification_email=your-email@example.com"
Step 4: Training Job Configuration
Create training/training_config.py:
import boto3
import sagemaker
from sagemaker.estimator import Estimator
from datetime import datetime
import os
# Configuration
PROJECT_NAME = os.environ.get('PROJECT_NAME', 'ml-pipeline')
ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev')
AWS_REGION = os.environ.get('AWS_REGION', 'ap-south-1')
# Initialize SageMaker session
sagemaker_session = sagemaker.Session()
account_id = boto3.client('sts').get_caller_identity()['Account']
role = f"arn:aws:iam::{account_id}:role/{PROJECT_NAME}-sagemaker-execution"
# ECR image URI
image_uri = f"{account_id}.dkr.ecr.{AWS_REGION}.amazonaws.com/{PROJECT_NAME}-training:latest"
# S3 paths - properly split data from Part 2
s3_bucket = f"{PROJECT_NAME}-validated-data-{ENVIRONMENT}-{account_id}"
s3_output = f"{PROJECT_NAME}-model-artifacts-{ENVIRONMENT}-{account_id}"
training_data = f"s3://{s3_bucket}/validated/train/"
validation_data = f"s3://{s3_bucket}/validated/val/"
output_path = f"s3://{s3_output}/models/"
def create_training_job(
instance_type='ml.m5.xlarge',
instance_count=1,
use_spot_instances=True,
hyperparameters=None
):
"""
Create and run a SageMaker training job
"""
if hyperparameters is None:
hyperparameters = {
'n_estimators': 100,
'max_depth': 10,
'min_samples_split': 2,
'min_samples_leaf': 1
}
# Create estimator
estimator = Estimator(
image_uri=image_uri,
role=role,
instance_count=instance_count,
instance_type=instance_type,
output_path=output_path,
sagemaker_session=sagemaker_session,
hyperparameters=hyperparameters,
# Publish validation metrics to CloudWatch so the registry step can read FinalMetricDataList
metric_definitions=[
    {'Name': 'validation:accuracy', 'Regex': r'"accuracy":\s*([0-9\.]+)'},
    {'Name': 'validation:f1_score', 'Regex': r'"f1_score":\s*([0-9\.]+)'}
],
use_spot_instances=use_spot_instances,
max_wait=7200 if use_spot_instances else None, # 2 hours
max_run=3600, # 1 hour
volume_size=30, # GB
encrypt_inter_container_traffic=True,
enable_network_isolation=False, # Set True for max security
tags=[
{'Key': 'Project', 'Value': PROJECT_NAME},
{'Key': 'Environment', 'Value': ENVIRONMENT},
{'Key': 'ManagedBy', 'Value': 'Terraform'}
]
)
# Start training
job_name = f"{PROJECT_NAME}-{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}"
print(f"Starting training job: {job_name}")
print(f"Training data: {training_data}")
print(f"Validation data: {validation_data}")
estimator.fit(
inputs={
'training': training_data,
'validation': validation_data
},
job_name=job_name,
wait=True,
logs='All'
)
return estimator, job_name
if __name__ == '__main__':
print("Starting SageMaker training job...")
# Run training
estimator, job_name = create_training_job(
instance_type='ml.m5.xlarge',
use_spot_instances=True
)
print(f"\nTraining job completed: {job_name}")
print(f"Model artifacts: {estimator.model_data}")
Step 5: Local Testing (CRITICAL STEP)
ALWAYS TEST LOCALLY FIRST
Local testing catches 90% of issues before you spend money on SageMaker. This is the most important step to avoid wasting time and money.
Why Test Locally?
- Faster iteration: Seconds vs minutes
- Zero AWS costs: No SageMaker charges
- Easier debugging: Full Docker logs locally
- Quick fixes: Edit code, rebuild, retest immediately
1. Prepare Test Environment
# Build container
cd training
docker build -t ml-training:test .
# Create SageMaker directory structure
mkdir -p test-sagemaker/{input/data/training,input/data/validation,model,output/data}
# Copy your split test data (from Part 2)
# If you don't have test data, create minimal samples
cat > test-sagemaker/input/data/training/sample.csv << 'EOF'
timestamp,feature_1,feature_2,target
2024-01-01T00:00:00,1.5,2.3,0
2024-01-01T01:00:00,1.8,2.1,1
2024-01-01T02:00:00,1.2,2.5,0
2024-01-01T03:00:00,1.9,2.0,1
2024-01-01T04:00:00,1.4,2.4,0
2024-01-01T05:00:00,1.6,2.2,1
EOF
cat > test-sagemaker/input/data/validation/sample.csv << 'EOF'
timestamp,feature_1,feature_2,target
2024-01-01T06:00:00,1.7,2.1,0
2024-01-01T07:00:00,1.3,2.6,1
EOF
2. Run Local Training
# Run container with test data
docker run --rm \
-v $(pwd)/test-sagemaker:/opt/ml \
ml-training:test \
--n_estimators 50 \
--max_depth 5
# Expected output:
# Hyperparameters: {"n_estimators": 50, "max_depth": 5, ...}
# Loading data from /opt/ml/input/data/training
# Loaded 6 samples with 4 features
# Training Random Forest model...
# Training completed
# Evaluating model...
# Validation Metrics: {"accuracy": 0.5, "precision": 0.5, ...}
# Model saved to /opt/ml/model/model.joblib
# Training completed successfully!
3. Verify Outputs
# Check if model was created
ls -lh test-sagemaker/model/
# Should see: model.joblib
# Check metrics
cat test-sagemaker/output/data/metrics.json
# Should see JSON with metrics
# Load model to verify it works
python3 << 'EOF'
import joblib
model = joblib.load('test-sagemaker/model/model.joblib')
print(f"Model loaded: {type(model)}")
print(f"Features: {model.n_features_in_}")
EOF
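Optionally, run a quick prediction sanity check against the saved artifact; the two feature values below simply mirror the toy CSV created earlier:

python3 << 'EOF'
import joblib
model = joblib.load('test-sagemaker/model/model.joblib')
# Two features, matching the toy data columns (feature_1, feature_2)
sample = [[1.5, 2.3]]
print("Prediction:", model.predict(sample))
print("Class probabilities:", model.predict_proba(sample))
EOF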
4. Test Different Hyperparameters
# Clear previous outputs
rm -rf test-sagemaker/model/* test-sagemaker/output/*
# Test with different hyperparameters
docker run --rm \
-v $(pwd)/test-sagemaker:/opt/ml \
ml-training:test \
--n_estimators 100 \
--max_depth 15 \
--min_samples_split 5
# Verify outputs again
ls -lh test-sagemaker/model/
cat test-sagemaker/output/data/metrics.json
Common Local Testing Issues
| Error | Cause | Fix |
|---|---|---|
| No module named 'sklearn' | Dependencies not installed | Rebuild: docker build --no-cache -t ml-training:test . |
| No CSV files found | Data path incorrect | Check: ls test-sagemaker/input/data/training/ |
| Insufficient rows | Not enough test data | Add more rows to sample.csv (min 5) |
| Permission denied | Volume mount issue | Use absolute path: -v /full/path/test-sagemaker:/opt/ml |
Pro Tip: Keep local testing fast by using small datasets (10-100 rows). Once local tests pass, you can confidently run SageMaker with full datasets.
Step 6: Hyperparameter Tuning
Create training/hyperparameter_tuning.py:
import boto3
from datetime import datetime

from sagemaker.tuner import (
    HyperparameterTuner,
    IntegerParameter,
)
from sagemaker.estimator import Estimator

# Reuses image_uri, role, output_path, sagemaker_session, training_data,
# validation_data and PROJECT_NAME defined in training_config.py
from training_config import *
def create_hyperparameter_tuning_job(
max_jobs=20,
max_parallel_jobs=2,
objective_metric_name='validation:f1_score'
):
"""
Run hyperparameter tuning with SageMaker
"""
# Define hyperparameter ranges
hyperparameter_ranges = {
'n_estimators': IntegerParameter(50, 200),
'max_depth': IntegerParameter(5, 20),
'min_samples_split': IntegerParameter(2, 10),
'min_samples_leaf': IntegerParameter(1, 5)
}
# Create base estimator
estimator = Estimator(
image_uri=image_uri,
role=role,
instance_count=1,
instance_type='ml.m5.xlarge',
output_path=output_path,
sagemaker_session=sagemaker_session,
use_spot_instances=True,
max_wait=7200,
max_run=3600
)
# Create tuner with regex matching JSON output from train.py
tuner = HyperparameterTuner(
estimator=estimator,
objective_metric_name=objective_metric_name,
hyperparameter_ranges=hyperparameter_ranges,
metric_definitions=[
{'Name': 'validation:accuracy', 'Regex': r'"accuracy":\s*([0-9\.]+)'},
{'Name': 'validation:precision', 'Regex': r'"precision":\s*([0-9\.]+)'},
{'Name': 'validation:recall', 'Regex': r'"recall":\s*([0-9\.]+)'},
{'Name': 'validation:f1_score', 'Regex': r'"f1_score":\s*([0-9\.]+)'}
],
max_jobs=max_jobs,
max_parallel_jobs=max_parallel_jobs,
objective_type='Maximize',
strategy='Bayesian'
)
# Start tuning
tuning_job_name = f"{PROJECT_NAME}-tuning-{datetime.now().strftime('%m%d-%H%M%S')}"  # tuning job names are limited to 32 characters
print(f"Starting hyperparameter tuning: {tuning_job_name}")
print(f"Max jobs: {max_jobs}, Max parallel: {max_parallel_jobs}")
tuner.fit(
inputs={
'training': training_data,
'validation': validation_data
},
job_name=tuning_job_name,
wait=True
)
return tuner, tuning_job_name
if __name__ == '__main__':
print("Starting hyperparameter tuning job...")
tuner, job_name = create_hyperparameter_tuning_job(
max_jobs=20,
max_parallel_jobs=2
)
print(f"\nTuning completed: {job_name}")
# Get best training job
best_job = tuner.best_training_job()
print(f" Best training job: {best_job}")
# Get best hyperparameters
best_params = tuner.best_estimator().hyperparameters()
print(f" Best hyperparameters: {best_params}")
Step 7: Model Registry Integration
Create training/model_registry.py:
import boto3
import json
from datetime import datetime
import os
AWS_REGION = os.environ.get('AWS_REGION', 'ap-south-1')
sagemaker_client = boto3.client('sagemaker', region_name=AWS_REGION)
def get_metrics_from_training_job(training_job_name):
"""
Extract metrics from completed training job
"""
try:
response = sagemaker_client.describe_training_job(
TrainingJobName=training_job_name
)
# Get metrics from training job
final_metrics = response.get('FinalMetricDataList', [])
metrics = {}
for metric in final_metrics:
metric_name = metric['MetricName'].replace('validation:', '')
metrics[metric_name] = float(metric['Value'])
return metrics if metrics else {'accuracy': 0.0, 'f1_score': 0.0}
except Exception as e:
print(f"Could not fetch metrics: {e}")
return {'accuracy': 0.0, 'f1_score': 0.0}
def register_model(
model_package_group_name,
model_data_url,
image_uri,
metrics,
approval_status='PendingManualApproval'
):
"""
Register model in SageMaker Model Registry (AIDLC Phase 6: Governance)
"""
model_package_description = f"Model trained on {datetime.now().isoformat()}"
# Create model package
response = sagemaker_client.create_model_package(
ModelPackageGroupName=model_package_group_name,
ModelPackageDescription=model_package_description,
InferenceSpecification={
'Containers': [{
'Image': image_uri,
'ModelDataUrl': model_data_url
}],
'SupportedContentTypes': ['text/csv', 'application/json'],
'SupportedResponseMIMETypes': ['application/json']
},
ModelApprovalStatus=approval_status,
MetadataProperties={
'GeneratedBy': 'sagemaker-training-pipeline'
},
CustomerMetadataProperties={
'accuracy': str(metrics.get('accuracy', 0)),
'f1_score': str(metrics.get('f1_score', 0)),
'training_date': datetime.now().isoformat()
}
)
model_package_arn = response['ModelPackageArn']
print(f"Model registered: {model_package_arn}")
return model_package_arn
def approve_model(model_package_arn):
"""
Approve model for production deployment
"""
sagemaker_client.update_model_package(
ModelPackageArn=model_package_arn,
ModelApprovalStatus='Approved'
)
print(f"Model approved: {model_package_arn}")
def list_model_versions(model_package_group_name):
"""
List all versions of a model
"""
response = sagemaker_client.list_model_packages(
ModelPackageGroupName=model_package_group_name,
SortBy='CreationTime',
SortOrder='Descending'
)
return response['ModelPackageSummaryList']
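A quick usage sketch tying these helpers together: list the registered versions, then approve the newest one once its metrics look good. The group name below assumes the default project name used in the Terraform above:

if __name__ == '__main__':
    group_name = 'ml-pipeline-models'  # "${var.project_name}-models" with the default project name
    versions = list_model_versions(group_name)
    for pkg in versions:
        print(pkg['ModelPackageArn'], '-', pkg['ModelApprovalStatus'])

    # Approve the most recent version only after reviewing its metrics
    if versions:
        approve_model(versions[0]['ModelPackageArn'])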
Step 8: End-to-End Training Pipeline
Create pipeline/train_pipeline.py:
#!/usr/bin/env python3
"""
Complete training pipeline orchestration with model registry
"""
import argparse
import sys
import os
# Add parent directory to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'training'))
from training_config import create_training_job, PROJECT_NAME, image_uri
from model_registry import register_model, get_metrics_from_training_job
from hyperparameter_tuning import create_hyperparameter_tuning_job
def run_simple_training(register=True):
"""Run a single training job"""
print("=" * 60)
print("Running Simple Training Job")
print("=" * 60)
estimator, job_name = create_training_job(
instance_type='ml.m5.xlarge',
use_spot_instances=True
)
print(f"\nTraining completed: {job_name}")
print(f"Model artifacts: {estimator.model_data}")
# Register model in Model Registry (AIDLC Phase 6: Governance)
if register:
print("\nRegistering model in SageMaker Model Registry...")
try:
# Get metrics from training job
metrics = get_metrics_from_training_job(job_name)
print(f"Model metrics: {metrics}")
model_arn = register_model(
model_package_group_name=f"{PROJECT_NAME}-models",
model_data_url=estimator.model_data,
image_uri=image_uri,
metrics=metrics,
approval_status='PendingManualApproval'
)
print(f"\nModel registered successfully!")
print(f"Model ARN: {model_arn}")
print(f"\n Model requires manual approval before deployment.")
print(f" Approve via SageMaker console or using approve_model() function.")
except Exception as e:
print(f"Model registration failed: {e}")
print(" Training completed successfully, but model was not registered.")
return estimator
def run_hyperparameter_tuning():
"""Run hyperparameter tuning"""
print("=" * 60)
print("Running Hyperparameter Tuning")
print("=" * 60)
tuner, job_name = create_hyperparameter_tuning_job(
max_jobs=20,
max_parallel_jobs=2
)
print(f"\nTuning completed: {job_name}")
# Get best training job
best_job = tuner.best_training_job()
print(f" Best training job: {best_job}")
# Get best estimator
best_estimator = tuner.best_estimator()
print(f"Best model artifacts: {best_estimator.model_data}")
return tuner
def main():
parser = argparse.ArgumentParser(description='Run ML training pipeline')
parser.add_argument(
'--mode',
choices=['simple', 'tuning'],
default='simple',
help='Training mode: simple or tuning'
)
parser.add_argument(
'--no-register',
action='store_true',
help='Skip model registration'
)
args = parser.parse_args()
try:
if args.mode == 'simple':
estimator = run_simple_training(register=not args.no_register)
else:
tuner = run_hyperparameter_tuning()
print("\n" + "=" * 60)
print("Pipeline completed successfully!")
print("=" * 60)
except Exception as e:
print(f"\nPipeline failed: {e}")
sys.exit(1)
if __name__ == '__main__':
main()
Step 9: Cost Optimization
Spot Instances Strategy
# Already configured in training_config.py:
estimator = Estimator(
# ... other params ...
use_spot_instances=True,
max_wait=7200, # Maximum wait time (2 hours)
max_run=3600, # Maximum training time (1 hour)
)
# Spot instances save ~70% on compute costs
# Trade-off: Training can be interrupted
# Best for: Non-urgent, resumable training jobs
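Because Spot capacity can be reclaimed mid-run, the Estimator also accepts checkpoint parameters: SageMaker syncs the local checkpoint directory to S3 and restores it when the job resumes. The sketch below shows only the wiring; the Random Forest script in this article does not checkpoint mid-training, so your training code would need to write and reload checkpoint files for this to help:

# Sketch: enable checkpoint sync so an interrupted Spot job can resume
estimator = Estimator(
    image_uri=image_uri,
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    output_path=output_path,
    sagemaker_session=sagemaker_session,
    use_spot_instances=True,
    max_wait=7200,
    max_run=3600,
    checkpoint_s3_uri=f"{output_path}checkpoints/",  # synced to/from S3 by SageMaker
    checkpoint_local_path='/opt/ml/checkpoints'      # default local checkpoint directory
)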
Instance Type Selection
# Development/Testing
instance_type = 'ml.m5.large' # $0.115/hr on-demand
# Production Training
instance_type = 'ml.m5.xlarge' # $0.23/hr on-demand
# GPU Training (deep learning)
instance_type = 'ml.p3.2xlarge' # $3.06/hr (1 GPU)
# With Spot instances (70% savings):
# ml.m5.xlarge: $0.23/hr → ~$0.07/hr with Spot
Realistic Cost Summary
Monthly Costs (Development - 10 training runs):
| Resource | Configuration | Realistic Cost/Month |
|---|---|---|
| Training (Spot) | 10 jobs × 1hr × ml.m5.xlarge @ $0.07 | ~$7 |
| S3 Storage | 50GB models | ~$1.15 |
| CloudWatch | Logs + Metrics (with retention) | ~$8 |
| ECR | Container storage | ~$0.50 |
| Total (Development) | | ~$16.65 |
With Hyperparameter Tuning (4 tuning runs/month):
| Additional Resource | Configuration | Cost/Month |
|---|---|---|
| HP Tuning | 4 runs × 20 jobs × 1hr × ml.m5.xlarge @ $0.07 | ~$56 |
| Total (Dev + Tuning) | | ~$72.65 |
Production Scale (100 training jobs/month):
| Resource | Configuration | Cost/Month |
|---|---|---|
| Training (Spot) | 100 jobs × 1hr × ml.m5.xlarge @ $0.07 | ~$70 |
| S3 Storage | 500GB models with lifecycle | ~$8 |
| CloudWatch | Increased logs (30-day retention) | ~$25 |
| ECR | Container versions | ~$2 |
| Total (Production) | | ~$105 |
Cost Saving Tips:
- Use Spot instances - 70% savings over on-demand (biggest impact)
- S3 lifecycle policies - Transition old models to Glacier after 90 days (see the sketch after this list)
- Right-size instances - Start with ml.m5.large, upgrade if needed
- CloudWatch log retention - Set to 30 days max (7 days for dev)
- Delete failed training artifacts - Clean up S3 regularly
- Hyperparameter tuning budget - Limit max_jobs to control costs
- Stop unused resources - Clean up old model versions
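For the lifecycle tip above, the rule can also be applied with a one-off boto3 call rather than Terraform. A sketch, assuming the model-artifacts bucket naming from this series and that artifacts live under the models/ prefix:

import boto3

s3 = boto3.client('s3')
bucket = 'ml-pipeline-model-artifacts-dev-YOUR_ACCOUNT_ID'  # assumed naming convention

s3.put_bucket_lifecycle_configuration(
    Bucket=bucket,
    LifecycleConfiguration={
        'Rules': [{
            'ID': 'archive-old-models',
            'Status': 'Enabled',
            'Filter': {'Prefix': 'models/'},
            # Move old model artifacts to Glacier after 90 days, per the tip above
            'Transitions': [{'Days': 90, 'StorageClass': 'GLACIER'}]
        }]
    }
)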
Cost Monitoring:
# Enable AWS Cost Explorer
aws ce get-cost-and-usage \
--time-period Start=2024-01-01,End=2024-01-31 \
--granularity MONTHLY \
--metrics BlendedCost \
--group-by Type=SERVICE
# Set up budget alerts (do this in console or via Terraform)
Step 10: Monitoring and Alerts
Create terraform/sagemaker-monitoring.tf:
# CloudWatch Log Group for SageMaker
resource "aws_cloudwatch_log_group" "sagemaker_training" {
name = "/aws/sagemaker/TrainingJobs"
retention_in_days = 30
kms_key_id = aws_kms_key.data_encryption.arn
tags = {
Name = "SageMaker Training Logs"
Environment = var.environment
}
}
# Training Job Failure Alarm
resource "aws_cloudwatch_metric_alarm" "training_failures" {
alarm_name = "${var.project_name}-training-failures"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = "1"
metric_name = "TrainingJobsFailed"
namespace = "AWS/SageMaker"
period = "300"
statistic = "Sum"
threshold = "1"
alarm_description = "Alert when training job fails"
alarm_actions = [aws_sns_topic.validation_notifications.arn]
treat_missing_data = "notBreaching"
}
# Training Cost Alert
resource "aws_cloudwatch_metric_alarm" "training_cost" {
alarm_name = "${var.project_name}-training-cost"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = "1"
metric_name = "EstimatedCharges"
namespace = "AWS/Billing"
period = "86400"
statistic = "Maximum"
threshold = "100" # $100/day threshold
alarm_description = "Alert when daily training costs exceed $100"
alarm_actions = [aws_sns_topic.validation_notifications.arn]
dimensions = {
Currency = "USD"
ServiceName = "AmazonSageMaker"
}
}
# Spot Instance Interruption Alarm
resource "aws_cloudwatch_metric_alarm" "spot_interruptions" {
alarm_name = "${var.project_name}-spot-interruptions"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = "1"
metric_name = "TrainingJobsStoppedDueToSpotInterruption"
namespace = "AWS/SageMaker"
period = "300"
statistic = "Sum"
threshold = "3"
alarm_description = "Alert when multiple spot interruptions occur"
alarm_actions = [aws_sns_topic.validation_notifications.arn]
}
Testing the Pipeline
1. Verify Prerequisites
# Set environment variables
export AWS_REGION="ap-south-1" # Your region
export AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
export VALIDATED_BUCKET="ml-pipeline-validated-data-dev-${AWS_ACCOUNT_ID}"
# Verify data split exists (from Part 2, Step 6.5)
echo "Checking training data..."
aws s3 ls s3://${VALIDATED_BUCKET}/validated/train/
echo "Checking validation data..."
aws s3 ls s3://${VALIDATED_BUCKET}/validated/val/
# Both should show CSV files. If not, go back to Step 2.
2. Test Locally (REQUIRED)
# ALWAYS test locally first!
cd training
docker build -t ml-training:test .
# Prepare test data (if not already done in Step 5)
mkdir -p test-sagemaker/{input/data/training,input/data/validation,model,output/data}
# Copy test data or create minimal samples (see Step 5)
# Run local test
docker run --rm \
-v $(pwd)/test-sagemaker:/opt/ml \
ml-training:test \
--n_estimators 50 \
--max_depth 5
# Verify outputs
ls -lh test-sagemaker/model/ # Should see model.joblib
cat test-sagemaker/output/data/metrics.json # Should see metrics
3. Push Container to ECR
# Build and push (see Step 1 for full commands)
export ECR_REPO="${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/ml-training"
docker build -t ml-training:latest .
docker tag ml-training:latest ${ECR_REPO}:latest
aws ecr get-login-password --region ${AWS_REGION} | \
docker login --username AWS --password-stdin ${ECR_REPO}
docker push ${ECR_REPO}:latest
echo "Container available at: ${ECR_REPO}:latest"
4. Run Simple Training Job
cd ../pipeline
python train_pipeline.py --mode simple
# Expected output:
# ============================================================
# Running Simple Training Job
# ============================================================
# Starting training job: ml-pipeline-2024-12-27-10-30-45
# Training data: s3://ml-pipeline-validated-data-dev-123456789/validated/train/
# Validation data: s3://ml-pipeline-validated-data-dev-123456789/validated/val/
# ...
# Training completed: ml-pipeline-2024-12-27-10-30-45
# Model artifacts: s3://ml-pipeline-model-artifacts-dev-123456789/models/...
# Registering model in SageMaker Model Registry...
# Model registered successfully!
5. Monitor Training
# Watch CloudWatch logs in real-time
aws logs tail /aws/sagemaker/TrainingJobs --follow
# In another terminal, check SageMaker console
# https://console.aws.amazon.com/sagemaker/home?region=ap-south-1#/jobs
# List model registry versions
aws sagemaker list-model-packages \
--model-package-group-name ml-pipeline-models
6. Run Hyperparameter Tuning (Optional)
# Only run after successful simple training
python train_pipeline.py --mode tuning
# This will:
# - Start 20 training jobs (2 in parallel)
# - Take 2-4 hours with Spot instances
# - Find best hyperparameters
# - Cost: ~$14 with Spot instances
Troubleshooting Guide
Issue: "No CSV files found in /opt/ml/input/data/training"
# Cause: Train/val split missing or incorrect S3 path
# Solution: Verify data split from Part 2
# Check S3 paths
aws s3 ls s3://ml-pipeline-validated-data-dev-YOUR_ACCOUNT_ID/validated/train/
aws s3 ls s3://ml-pipeline-validated-data-dev-YOUR_ACCOUNT_ID/validated/val/
# If empty, go back to Part 2, Step 6.5 and run the split script
Issue: Training job fails immediately
# Check CloudWatch logs for error
aws logs tail /aws/sagemaker/TrainingJobs --follow
# Common causes:
# 1. Invalid S3 path (train/val not split) → See above
# 2. Missing IAM permissions → Check terraform/sagemaker.tf
# 3. Container image issues → Test locally first (Step 5)
# 4. Incorrect data format → Verify CSV schema matches expected columns
Issue: "No module named 'sklearn'"
# Cause: Dependencies not installed in container
# Solution: Rebuild without cache
cd training
docker build --no-cache -t ml-training:latest .
# Verify dependencies
docker run --rm --entrypoint pip ml-training:latest list | grep scikit
# Should see: scikit-learn 1.3.0
Issue: SageMaker can't pull ECR image
# Check ECR repository policy
aws ecr get-repository-policy --repository-name ml-training
# Should see: "Service": "sagemaker.amazonaws.com"
# If not, re-apply Terraform
cd terraform
terraform apply -target=aws_ecr_repository_policy.ml_training
Issue: Spot instance interrupted repeatedly
# Solution 1: Increase max_wait
# In training_config.py, change:
max_wait=14400 # 4 hours instead of 2
# Solution 2: Use on-demand for critical jobs
use_spot_instances=False
# Solution 3: Try different instance type
instance_type='ml.m5.2xlarge' # Less contentious
Issue: Metric regex not matching
# Check actual log output format
aws logs tail /aws/sagemaker/TrainingJobs --follow | grep "Validation Metrics"
# Should see JSON format:
# Validation Metrics: {"accuracy": 0.95, "f1_score": 0.93, ...}
# Verify regex in hyperparameter_tuning.py matches:
# r'"accuracy":\s*([0-9\.]+)'
# If you see: Validation Metrics: accuracy=0.95 (non-JSON)
# Then update train.py to output JSON (already done in our code)
Issue: Model registration fails
# Verify model package group exists
aws sagemaker describe-model-package-group \
--model-package-group-name ml-pipeline-models
# If not exists, apply Terraform
cd terraform
terraform apply -target=aws_sagemaker_model_package_group.ml_models
Issue: High costs unexpectedly
# Check current month's costs
aws ce get-cost-and-usage \
--time-period Start=$(date -d "$(date +%Y-%m-01)" +%Y-%m-%d),End=$(date +%Y-%m-%d) \
--granularity DAILY \
--metrics BlendedCost \
--group-by Type=SERVICE \
--filter file://<(echo '{
"Dimensions": {
"Key": "SERVICE",
"Values": ["Amazon SageMaker"]
}
}')
# Common causes:
# 1. Hyperparameter tuning with too many jobs
# 2. Forgot to enable Spot instances
# 3. Using expensive instance types (ml.p3.*)
# 4. Jobs not terminating (increase max_run timeout)
Security Best Practices
Training Security Checklist:
Data Encryption
- Training data encrypted at rest (S3 KMS)
- Inter-container traffic encrypted
- Model artifacts encrypted
- CloudWatch logs encrypted
Access Control
- IAM role with least privilege
- No hard-coded credentials
- ECR image scanning enabled
- ECR repository policy restricts access
Network Security (Recommended for Production)
- VPC configuration for SageMaker
- Private subnets for sensitive workloads
- VPC endpoints for S3/CloudWatch
- (Not implemented here to keep focus on fundamentals)
Audit & Compliance (AIDLC Phase 6)
- CloudWatch logging for all jobs
- CloudTrail tracking API calls
- Model versioning and lineage
- Cost monitoring and alerts
Container Security
- Regular base image updates
- Vulnerability scanning on push
- Minimal dependencies
- Non-root user in container (recommended)
What's Next?
In Part 4 (Series Finale), we'll complete the AIDLC framework:
Phase 4: Model Deployment
- CI/CD Pipeline for automated deployment
- SageMaker inference endpoints with auto-scaling
- A/B Testing and canary deployments
Phase 5: Monitoring & Maintenance
- Model drift detection
- Data quality monitoring
- Performance degradation alerts
- Automated retraining triggers
Phase 6: Compliance (Final)
- Complete observability stack
- Incident response procedures
- Rollback automation
- Audit trail completion
This final part brings all AIDLC phases together into a complete, demo-ready ML system on AWS that follows production patterns.
Key Takeaways
- Test locally first - Catches 90% of issues before AWS charges (most important!)
- Use custom containers - Full control over environment and dependencies
- Verify data split - Separate train/val datasets from Part 2 are essential
- Optimize costs - Spot instances save 70%, but monitor usage closely
- Version models - Model registry (AIDLC Phase 6) enables tracking and governance
- Automate tuning - Hyperparameter optimization finds better models faster
- Monitor everything - CloudWatch metrics and alarms catch issues early
- Follow AIDLC - Each phase builds on the previous for production ML
Remember: Good training pipelines are automated, tested, tracked, cost-efficient, and follow the AIDLC framework for production readiness.
Resources
AWS Documentation:
- SageMaker Training
- Managed Spot Training
- Model Registry
- Hyperparameter Tuning
- Custom Training Containers
Let's Connect!
- Questions about training at scale? Drop a comment
- Follow for Part 4 - Production Deployment & Monitoring (Final!)
- Like if this helped you build your training pipeline
- Share with your team/connects
What training challenges are you facing? Let me know in the comments!
Tags: #aws #machinelearning #mlops #aidlc #sagemaker #devops #python #terraform #docker

