Suhas Mallesh

SageMaker Pipelines: CI/CD for ML with Terraform

Manual model retraining is a reliability risk. SageMaker Pipelines automates the full ML lifecycle - preprocessing, training, evaluation, conditional registration, and deployment. Here's how to build it with Terraform and the Pipelines SDK.

Through Series 5, we've built the workspace, deployed endpoints, and set up the feature store. The missing piece is automation. Right now, retraining means someone manually running a notebook, evaluating results, and updating the endpoint. That doesn't scale, and every manual step is a chance for a mistake to reach production.

SageMaker Pipelines brings CI/CD discipline to ML: preprocessing, training, evaluation, conditional model registration, and endpoint deployment run automatically on a schedule or triggered by new data. Each pipeline run is tracked, reproducible, and auditable. Terraform provisions the infrastructure; the Pipelines SDK defines the DAG. 🎯

🏗️ Pipeline Architecture

Trigger (EventBridge schedule or S3 event)
    ↓
ProcessingStep  →  preprocess raw data
    ↓
TrainingStep    →  train model on processed data
    ↓
ProcessingStep  →  evaluate model metrics
    ↓
ConditionStep   →  if accuracy >= threshold
    ↓                       ↓
RegisterModel         FailStep
    ↓
EventBridge     →  model approved → deploy to endpoint
| Step | What It Does |
| --- | --- |
| ProcessingStep | Data preprocessing, feature engineering, evaluation |
| TrainingStep | Model training with SageMaker Training Jobs |
| ConditionStep | Gate on metric threshold before registering |
| ModelStep | Register model version in Model Registry |
| EventBridge | Trigger deployment on model approval |

🔧 Terraform: Pipeline Infrastructure

IAM Role

# pipeline/iam.tf

resource "aws_iam_role" "pipeline_execution" {
  name = "${var.environment}-pipeline-execution"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "sagemaker.amazonaws.com" }
    }]
  })
}

resource "aws_iam_role_policy_attachments_exclusive" "pipeline" {
  role_name = aws_iam_role.pipeline_execution.name
  policy_arns = [
    "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
  ]
}

Model Package Group (Model Registry)

# pipeline/registry.tf

resource "aws_sagemaker_model_package_group" "this" {
  model_package_group_name        = "${var.environment}-${var.model_name}"
  model_package_group_description = "Model registry for ${var.model_name}"

  tags = {
    Environment = var.environment
    Model       = var.model_name
  }
}

The Pipeline

# pipeline/pipeline.tf

resource "aws_sagemaker_pipeline" "this" {
  pipeline_name         = "${var.environment}-${var.model_name}-pipeline"
  pipeline_display_name = "${var.environment}-${var.model_name}"
  role_arn              = aws_iam_role.pipeline_execution.arn

  pipeline_definition = templatefile(
    "${path.module}/pipeline_definition.json",
    {
      role_arn          = aws_iam_role.pipeline_execution.arn
      region            = var.region
      account_id        = data.aws_caller_identity.current.account_id
      model_group_name  = aws_sagemaker_model_package_group.this.model_package_group_name
      training_image    = var.training_image_uri
      processing_image  = var.processing_image_uri
      data_bucket       = var.data_bucket
      output_bucket     = var.output_bucket
      accuracy_threshold = var.accuracy_threshold
    }
  )

  tags = {
    Environment = var.environment
    Model       = var.model_name
  }
}

EventBridge: Scheduled Trigger

# pipeline/trigger.tf

resource "aws_scheduler_schedule" "pipeline_trigger" {
  name = "${var.environment}-${var.model_name}-pipeline-trigger"

  flexible_time_window {
    mode = "OFF"
  }

  schedule_expression = var.pipeline_schedule  # e.g. "cron(0 2 * * ? *)"

  target {
    arn      = aws_sagemaker_pipeline.this.arn
    role_arn = aws_iam_role.scheduler.arn

    sagemaker_pipeline_parameters {
      pipeline_parameter_list {
        name  = "InputDataUri"
        value = "s3://${var.data_bucket}/latest/"
      }
    }
  }
}
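
Before relying on the schedule, it can help to start one execution by hand with the same parameter the scheduler passes. The following is a minimal sketch, assuming boto3 credentials are configured. The function names are hypothetical, and the optional `client` argument exists only so the code can be exercised without AWS access.

```python
def build_start_request(pipeline_name, input_data_uri):
    """Build the request for SageMaker's StartPipelineExecution API."""
    return {
        "PipelineName": pipeline_name,
        "PipelineParameters": [
            # The name must match the ParameterString declared in the pipeline
            {"Name": "InputDataUri", "Value": input_data_uri},
        ],
    }


def start_pipeline(pipeline_name, input_data_uri, client=None):
    """Start one pipeline execution and return its ARN."""
    if client is None:
        import boto3  # imported lazily so build_start_request works without boto3

        client = boto3.client("sagemaker")
    response = client.start_pipeline_execution(
        **build_start_request(pipeline_name, input_data_uri)
    )
    return response["PipelineExecutionArn"]
```

Calling `start_pipeline("dev-fraud-detector-pipeline", "s3://my-data-bucket/latest/")` (both values illustrative) should produce a run that looks the same in Studio as a scheduled one.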

EventBridge: Auto-Deploy on Model Approval

# pipeline/deployment_trigger.tf

resource "aws_cloudwatch_event_rule" "model_approval" {
  name = "${var.environment}-model-approved"

  event_pattern = jsonencode({
    source      = ["aws.sagemaker"]
    detail-type = ["SageMaker Model Package State Change"]
    detail = {
      ModelPackageGroupName = [aws_sagemaker_model_package_group.this.model_package_group_name]
      ModelApprovalStatus   = ["Approved"]
    }
  })
}

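resource "aws_cloudwatch_event_target" "deploy_lambda" {
  rule      = aws_cloudwatch_event_rule.model_approval.name
  target_id = "deploy-approved-model"
  arn       = aws_lambda_function.deploy_model.arn
}

# Without this permission, EventBridge cannot invoke the Lambda
resource "aws_lambda_permission" "allow_eventbridge" {
  statement_id  = "AllowExecutionFromEventBridge"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.deploy_model.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.model_approval.arn
}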

When a model is approved in the registry, EventBridge triggers a Lambda that updates the SageMaker endpoint to the new model version.
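
The Lambda itself is not shown in this post, so here is one possible sketch of the handler. It assumes the approval event carries `detail.ModelPackageArn`, that the endpoint already exists, and that the environment variables `ENDPOINT_NAME`, `MODEL_EXECUTION_ROLE_ARN`, and `INSTANCE_TYPE` are set on the function. Those variable names, the helper name, and the `AllTraffic` variant name are hypothetical choices, not something the Terraform above defines.

```python
import os
import time


def build_deploy_requests(event, endpoint_name, role_arn, instance_type, suffix):
    """Translate a model-approval event into three SageMaker API requests."""
    package_arn = event["detail"]["ModelPackageArn"]
    model_name = f"{endpoint_name}-{suffix}"
    config_name = f"{endpoint_name}-config-{suffix}"
    return {
        "create_model": {
            "ModelName": model_name,
            "ExecutionRoleArn": role_arn,
            # Referencing the package lets SageMaker resolve image and artifacts
            "Containers": [{"ModelPackageName": package_arn}],
        },
        "create_endpoint_config": {
            "EndpointConfigName": config_name,
            "ProductionVariants": [{
                "VariantName": "AllTraffic",
                "ModelName": model_name,
                "InstanceType": instance_type,
                "InitialInstanceCount": 1,
            }],
        },
        "update_endpoint": {
            "EndpointName": endpoint_name,
            "EndpointConfigName": config_name,
        },
    }


def lambda_handler(event, context, client=None):
    """Point the existing endpoint at the newly approved model package."""
    if client is None:
        import boto3  # available by default in the Lambda Python runtime

        client = boto3.client("sagemaker")
    requests = build_deploy_requests(
        event,
        endpoint_name=os.environ["ENDPOINT_NAME"],
        role_arn=os.environ["MODEL_EXECUTION_ROLE_ARN"],
        instance_type=os.environ.get("INSTANCE_TYPE", "ml.m5.xlarge"),
        suffix=str(int(time.time())),  # keeps model and config names unique
    )
    client.create_model(**requests["create_model"])
    client.create_endpoint_config(**requests["create_endpoint_config"])
    client.update_endpoint(**requests["update_endpoint"])
    return requests["update_endpoint"]
```

Separating request construction from the API calls keeps the handler easy to unit test with a stub client.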

🐍 Pipeline Definition (Pipelines SDK)

The SDK generates the pipeline definition as a JSON file, which Terraform then passes to the aws_sagemaker_pipeline resource:

# generate_pipeline.py

import json

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.model import Model
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import ProcessingStep, TrainingStep

role = "ROLE_ARN"
# ModelStep needs a PipelineSession so model.register() returns step arguments
# instead of calling the SageMaker API immediately
pipeline_session = PipelineSession()

# Pipeline parameters
input_data = ParameterString(name="InputDataUri", default_value="s3://bucket/data/")

# Step 1: Preprocessing
preprocessor = SKLearnProcessor(
    framework_version="1.2-1",
    role=role,
    instance_type="ml.m5.large",
    instance_count=1,
)

step_process = ProcessingStep(
    name="Preprocess",
    processor=preprocessor,
    inputs=[...],
    outputs=[...],
    code="scripts/preprocess.py",
)

# Step 2: Training
estimator = Estimator(
    image_uri="TRAINING_IMAGE",
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path="s3://output-bucket/models/",
)

step_train = TrainingStep(
    name="Train",
    estimator=estimator,
    # Wrap the upstream output URI in a TrainingInput channel
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri
        )
    },
)

# Step 3: Evaluation
# The PropertyFile exposes the report written by evaluate.py to later steps.
# The names here are assumptions: output_name must match a ProcessingOutput
# declared in step_eval's outputs, and path must match the file it writes.
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

step_eval = ProcessingStep(
    name="Evaluate",
    processor=preprocessor,
    inputs=[...],
    outputs=[...],
    code="scripts/evaluate.py",
    property_files=[evaluation_report],
)

# Step 4: Conditional registration
accuracy_condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(step_name=step_eval.name, property_file=evaluation_report, json_path="metrics.accuracy"),
    right=0.85,  # Threshold - override with var.accuracy_threshold
)

# Assumption: the training image also serves inference
model = Model(
    image_uri="TRAINING_IMAGE",
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=pipeline_session,
)

step_register = ModelStep(
    name="RegisterModel",
    step_args=model.register(
        content_types=["application/json"],
        response_types=["application/json"],
        inference_instances=["ml.m5.xlarge"],
        model_package_group_name="MODEL_GROUP_NAME",
        approval_status="PendingManualApproval",
    ),
)

# Fail the run explicitly when the model misses the threshold
step_fail = FailStep(
    name="AccuracyBelowThreshold",
    error_message="Model accuracy is below the required threshold",
)

step_condition = ConditionStep(
    name="CheckAccuracy",
    conditions=[accuracy_condition],
    if_steps=[step_register],
    else_steps=[step_fail],
)

# Build pipeline
pipeline = Pipeline(
    name="PIPELINE_NAME",
    parameters=[input_data],
    steps=[step_process, step_train, step_eval, step_condition],
)

# Export definition for Terraform
with open("pipeline_definition.json", "w") as f:
    json.dump(json.loads(pipeline.definition()), f, indent=2)

Run this script to generate pipeline_definition.json, then reference it in the aws_sagemaker_pipeline Terraform resource.
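
One gap worth noting: templatefile() only substitutes `${name}` interpolations, while the script above writes uppercase placeholders such as ROLE_ARN. A small post-processing step can bridge the two. This is a sketch under the assumption that the placeholders appear verbatim as string values in the generated JSON; the mapping and function name are hypothetical, and the variable names mirror those passed to templatefile() in pipeline.tf.

```python
import json

# Hypothetical mapping from script placeholders to templatefile() variables
PLACEHOLDERS = {
    "ROLE_ARN": "${role_arn}",
    "TRAINING_IMAGE": "${training_image}",
    "MODEL_GROUP_NAME": "${model_group_name}",
}


def to_template(definition_json):
    """Replace exact-match placeholder strings anywhere in the definition."""

    def walk(node):
        if isinstance(node, dict):
            return {key: walk(value) for key, value in node.items()}
        if isinstance(node, list):
            return [walk(item) for item in node]
        if isinstance(node, str):
            # Only whole-string matches are replaced, never substrings
            return PLACEHOLDERS.get(node, node)
        return node

    return json.dumps(walk(json.loads(definition_json)), indent=2)
```

Running the exported definition through `to_template` before committing it would give Terraform something it can actually fill in per environment.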

Environment Configuration

# environments/dev.tfvars
model_name           = "fraud-detector"
accuracy_threshold   = 0.80
pipeline_schedule    = "cron(0 6 * * ? *)"  # Daily at 6am
training_image_uri   = "123456789012.dkr.ecr.us-east-1.amazonaws.com/training:dev"

# environments/prod.tfvars
model_name           = "fraud-detector"
accuracy_threshold   = 0.90
pipeline_schedule    = "cron(0 2 * * ? *)"  # Daily at 2am
training_image_uri   = "123456789012.dkr.ecr.us-east-1.amazonaws.com/training:v2.1.0"

Higher accuracy thresholds in prod. A model that clears 80% in dev might need 90% before auto-registering in prod. The ConditionStep enforces this gate automatically.
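
For the gate to work, scripts/evaluate.py has to write a JSON report in which `metrics.accuracy` resolves to a number. The script is not shown in this post, so the following is only a sketch of the report-writing part. The file name `evaluation.json`, the function names, and the sample labels are assumptions; a real script would load the trained model and a held-out dataset.

```python
import json
import os
import tempfile


def accuracy(labels, predictions):
    """Fraction of predictions that match their labels."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("labels and predictions must be non-empty and equal length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


def write_report(labels, predictions, output_dir):
    """Write a report whose metrics.accuracy key the ConditionStep can read."""
    report = {"metrics": {"accuracy": accuracy(labels, predictions)}}
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, "evaluation.json")
    with open(path, "w") as f:
        json.dump(report, f)
    return path


if __name__ == "__main__":
    # Illustrative data only, written to a temporary directory
    out = write_report([1, 0, 1, 1], [1, 0, 0, 1], tempfile.mkdtemp())
    with open(out) as f:
        print(f.read())
```

In a processing job, `output_dir` would be whichever local path the evaluation step's output is configured to upload from.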

🔧 The CI/CD Flow

1. EventBridge fires on schedule
        ↓
2. SageMaker Pipeline starts
        ↓
3. Preprocessing job runs
        ↓
4. Training job runs
        ↓
5. Evaluation job computes accuracy
        ↓
6a. accuracy >= threshold → RegisterModel (PendingManualApproval)
6b. accuracy < threshold → Pipeline fails with clear error
        ↓
7. Human reviews model in Model Registry
        ↓
8. Approved → EventBridge fires
        ↓
9. Lambda updates SageMaker Endpoint to new model

Manual approval at step 7 is optional. Set approval_status = "Approved" in the registration step for fully automated deployments.
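
When the manual gate is kept, the review in step 7 can end with an API call instead of a click in Studio. A minimal sketch, assuming a boto3 SageMaker client is passed in; the function names are hypothetical.

```python
def latest_pending_package(client, group_name):
    """Return the ARN of the newest package awaiting approval, or None."""
    response = client.list_model_packages(
        ModelPackageGroupName=group_name,
        ModelApprovalStatus="PendingManualApproval",
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )
    summaries = response["ModelPackageSummaryList"]
    return summaries[0]["ModelPackageArn"] if summaries else None


def approve(client, package_arn, note):
    """Mark the package Approved, which triggers the state-change event."""
    client.update_model_package(
        ModelPackageArn=package_arn,
        ModelApprovalStatus="Approved",
        ApprovalDescription=note,
    )
```

With `client = boto3.client("sagemaker")`, calling `approve(client, latest_pending_package(client, "prod-fraud-detector"), "Reviewed metrics")` would approve the most recent candidate; the group name here is illustrative, and the `None` case should be handled first.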

⚠️ Gotchas and Tips

Pipeline definition is JSON. The aws_sagemaker_pipeline resource takes a JSON string. Generate it with the SDK, store it as a file, and use templatefile() to inject Terraform variable values (role ARNs, bucket names, image URIs).

Each pipeline run is versioned. Every execution is logged with its inputs, outputs, and metrics. Use the SageMaker Studio Pipelines tab to inspect any historical run.

Container images must exist before terraform apply. The pipeline references training and processing container images. Build and push them to ECR before running Terraform.

The Model Registry is the gate. PendingManualApproval gives your team a human review step before deployment. Approved auto-deploys. Choose based on your risk tolerance per environment.

Lambda for deployment. The EventBridge-to-deployment pattern uses a Lambda to call the SageMaker API and update the endpoint. Keep the Lambda simple: a few boto3 calls that create a model from the approved package, create a new endpoint config that points at it, and call update_endpoint.
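
Because every execution is recorded, run history can also be pulled programmatically instead of through Studio. A minimal sketch, assuming a boto3 SageMaker client is passed in; the function name and the returned structure are hypothetical.

```python
def summarize_runs(client, pipeline_name, limit=5):
    """Return the most recent executions with the status of each step."""
    executions = client.list_pipeline_executions(
        PipelineName=pipeline_name,
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=limit,
    )["PipelineExecutionSummaries"]

    runs = []
    for execution in executions:
        arn = execution["PipelineExecutionArn"]
        steps = client.list_pipeline_execution_steps(
            PipelineExecutionArn=arn
        )["PipelineExecutionSteps"]
        runs.append({
            "arn": arn,
            "status": execution["PipelineExecutionStatus"],
            # Map each step name to its final status for quick scanning
            "steps": {s["StepName"]: s["StepStatus"] for s in steps},
        })
    return runs
```

A failed accuracy gate would show up here as a run with a failed status, which makes this a convenient input for alerting.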

⏭️ Series 5 Complete!

This is Post 4 of the ML Pipelines & MLOps with Terraform series.


Your ML workflow is automated: scheduled retraining, metric-gated model registration, human approval gates, and automatic endpoint updates. From raw data to production, every step is tracked, reproducible, and auditable.

Found this helpful? Follow for the next series! 💬
