In my previous post, we trained a churn prediction model and deployed it to a SageMaker endpoint. That's where most tutorials stop. But in production, deploying a model is only the beginning.
Models degrade. Customer behaviour shifts. Contract mix changes when competitors launch new offers. A model trained on January's data may be quietly wrong by June — and without active monitoring, nobody will know until the churn rate climbs and someone asks awkward questions in a board meeting.
This post covers the system I built to make that model self-maintaining: an end-to-end MLOps pipeline that retrains automatically, gates on model quality, and raises alerts the moment it detects something drifting.
What We're Building
EventBridge (daily schedule)
│
▼
Lambda: trigger_retraining
│
▼
SageMaker Pipeline
├── Phase 1: PreprocessData → encode, scale, split
├── Phase 2: TrainModel → SKLearn estimator
├── Phase 3: EvaluateModel → writes evaluation.json
├── Phase 4: CheckAccuracy → Accuracy ≥ 0.80?
├── YES → RegisterModel (PendingManualApproval)
└── NO → skip registration
Model Monitor (separate Lambda)
├── KS test on numeric features
├── Chi-squared on categorical features
└── Alerts → SNS → CloudWatch
The whole pipeline is infrastructure-as-code (Terraform), parameterised, and testable locally with 24 unit tests.
Phase 1: The Pipeline Definition
The SageMaker Pipelines SDK lets you define a DAG of steps in Python. The pipeline object is then serialised to JSON and pushed to SageMaker, which manages execution, retries, and lineage tracking.
# pipeline/pipeline.py
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.parameters import ParameterFloat, ParameterString
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import JsonGet

def create_pipeline() -> Pipeline:
    # Overridable at execution time — no hardcoded values
    accuracy_threshold = ParameterFloat(name="AccuracyThreshold", default_value=0.80)
    input_data_uri = ParameterString(name="InputDataUri", default_value=f"s3://{default_bucket}/data/raw/")

    processing_step = ProcessingStep(
        name="PreprocessData",
        processor=sklearn_processor,
        inputs=[ProcessingInput(source=input_data_uri, destination="/opt/ml/processing/input")],
        outputs=[
            ProcessingOutput(output_name="train", source="/opt/ml/processing/output/train"),
            ProcessingOutput(output_name="test", source="/opt/ml/processing/output/test"),
        ],
        code="pipeline/preprocess.py",
    )

    # ... training_step, evaluation_step, register_step and the evaluation_report
    # PropertyFile omitted for brevity — see pipeline.py in the repo

    condition_step = ConditionStep(
        name="CheckAccuracy",
        conditions=[ConditionGreaterThanOrEqualTo(
            left=JsonGet(step_name="EvaluateModel", property_file=evaluation_report,
                         json_path="metrics.accuracy.value"),  # accuracy chosen: simpler threshold, business-readable
            right=accuracy_threshold,
        )],
        if_steps=[register_step],
        else_steps=[],
    )

    return Pipeline(
        name="telecom-churn-pipeline",
        parameters=[input_data_uri, accuracy_threshold],
        steps=[processing_step, training_step, evaluation_step, condition_step],
    )
Every meaningful value — instance types, thresholds, data paths — is a ParameterString or ParameterFloat. The Lambda trigger can override any of them per execution without touching the pipeline definition.
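Registering and running the pipeline takes two SDK calls. A minimal sketch, assuming role_arn holds a SageMaker execution role:

pipeline = create_pipeline()
pipeline.upsert(role_arn=role_arn)  # create or update the definition in SageMaker

# Override any ParameterFloat/ParameterString for this execution only
execution = pipeline.start(parameters={"AccuracyThreshold": 0.85})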
Why accuracy and not ROC-AUC for gating? ROC-AUC is the better model-quality metric (as argued in Part 1), but SageMaker ConditionStep thresholds need to be intuitive for the non-ML stakeholders who approve deployments. "Accuracy must exceed 80%" is easier to explain in a business review than an AUC threshold. You can always swap metrics.accuracy.value → metrics.roc_auc.value and adjust the threshold.
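If you do make that swap, the change is confined to the condition. A sketch, reusing evaluation_report from the pipeline definition and adding a hypothetical AucThreshold parameter (which would also need to go in the Pipeline's parameters list):

auc_threshold = ParameterFloat(name="AucThreshold", default_value=0.80)  # hypothetical parameter

auc_condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(step_name="EvaluateModel", property_file=evaluation_report,
                 json_path="metrics.roc_auc.value"),
    right=auc_threshold,
)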
Phase 2: Preprocessing with Synthetic Fallback
# pipeline/preprocess.py
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

def load_data(input_dir: str) -> pd.DataFrame:
    csv_files = [f for f in os.listdir(input_dir) if f.endswith(".csv")] \
        if os.path.isdir(input_dir) else []
    if csv_files:
        return pd.read_csv(os.path.join(input_dir, csv_files[0]))
    print("No input CSV found — generating synthetic telecom data")
    return generate_synthetic_data(n=8000)

def split_and_save(X: np.ndarray, y: np.ndarray, test_size: float = 0.2) -> None:
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42, stratify=y
    )
    # SageMaker convention: target first, no header
    train_df = pd.DataFrame(np.column_stack([y_train, X_train]))
    train_df.to_csv(os.path.join(TRAIN_OUTPUT_DIR, "train.csv"), index=False, header=False)
Two details worth noting:
- Stratified split — the churn class is imbalanced (~37%). Without stratification you risk the test set having a different class ratio than training.
- Scaler fit on train only — fit on X_train, transform X_test. Fitting on all data leaks test set statistics into training (see the sketch below).
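A minimal sketch of that discipline, using random stand-in arrays in place of the real features:

import numpy as np
from sklearn.preprocessing import StandardScaler

X_train_raw = np.random.default_rng(0).normal(size=(100, 4))  # stand-ins for real features
X_test_raw = np.random.default_rng(1).normal(size=(25, 4))

scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_raw)  # statistics learned from training rows only
X_test = scaler.transform(X_test_raw)        # same mean/std applied, so nothing leaks from the test set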
Phase 3: Evaluation Report
The evaluation step writes a JSON file that the ConditionStep reads:
# pipeline/evaluate.py
import json
import os
from sklearn.metrics import accuracy_score, roc_auc_score

def compute_metrics(model, model_type, X_test, y_test) -> dict:
    if model_type == "sklearn":
        y_pred = model.predict(X_test)
        y_prob = model.predict_proba(X_test)[:, 1]
    else:  # keras
        y_prob = model.predict(X_test).flatten()
        y_pred = (y_prob >= 0.5).astype(int)
    return {
        "accuracy": {"value": round(accuracy_score(y_test, y_pred), 4)},
        "roc_auc": {"value": round(roc_auc_score(y_test, y_prob), 4)},
    }

def write_evaluation_report(metrics: dict, output_dir: str) -> None:
    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, "evaluation.json"), "w") as f:
        json.dump({"metrics": metrics}, f, indent=2)
The PropertyFile in the pipeline definition tells SageMaker where to find this JSON and which key to extract (metrics.accuracy.value). The ConditionStep compares that against AccuracyThreshold.
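The PropertyFile declaration is one of the pieces elided from the pipeline listing above. A sketch, assuming the evaluation step exposes a ProcessingOutput named evaluation:

from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",  # must match a ProcessingOutput name on the evaluation step
    path="evaluation.json",    # the file written by write_evaluation_report above
)

The evaluation step registers it through its property_files argument, which is what makes the JsonGet lookup possible.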
Phase 4: Automated Retraining via EventBridge
# src/trigger_retraining.py
import json
import os

import boto3

sagemaker_client = boto3.client("sagemaker")
PIPELINE_NAME = os.environ["PIPELINE_NAME"]

PARAM_MAP = {
    "input_data_uri": "InputDataUri",
    "training_instance_type": "TrainingInstanceType",
    "accuracy_threshold": "AccuracyThreshold",
}

def handler(event: dict, context) -> dict:
    # Translate friendly event keys into SageMaker parameter names, ignore unknowns
    params = [
        {"Name": PARAM_MAP[k], "Value": str(v)}
        for k, v in event.get("pipeline_parameters", {}).items()
        if k in PARAM_MAP
    ]
    response = sagemaker_client.start_pipeline_execution(
        PipelineName=PIPELINE_NAME,
        PipelineParameters=params,
    )
    return {"statusCode": 200, "body": json.dumps({"execution_arn": response["PipelineExecutionArn"]})}
EventBridge fires this daily at 02:00 UTC. It can also be invoked on demand when new CSV files land in S3. The pipeline_parameters field lets you run A/B threshold experiments without changing code.
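A hypothetical on-demand invocation. The keys must match PARAM_MAP; anything else is dropped silently:

event = {
    "pipeline_parameters": {
        "input_data_uri": "s3://my-bucket/data/raw/",  # hypothetical bucket
        "accuracy_threshold": 0.85,                    # stricter gate for this run
    }
}
response = handler(event, None)  # body carries the new PipelineExecutionArn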
Phase 5: Drift Monitoring
Data Drift — Did the input distribution change?
# src/monitor.py
from scipy import stats

def check_data_drift(baseline, current, threshold=0.05):
    results = {"drifted_features": []}
    # KS test for numeric features
    for col in NUMERIC_COLS:
        _, p_value = stats.ks_2samp(baseline[col].dropna(), current[col].dropna())
        if bool(p_value < threshold):  # cast to Python bool — numpy.bool_ isn't JSON-serialisable
            results["drifted_features"].append(col)
    # Chi-squared for categoricals
    for col in CATEGORICAL_COLS:
        all_cats = sorted(set(baseline[col].unique()) | set(current[col].unique()))
        b_counts = baseline[col].value_counts().reindex(all_cats, fill_value=0)
        c_counts = current[col].value_counts().reindex(all_cats, fill_value=0)
        f_obs = c_counts.values + 1  # Laplace smoothing avoids zero expected counts
        f_exp = (b_counts.values + 1) * (f_obs.sum() / (b_counts.values + 1).sum())  # chisquare needs matching totals
        _, p_value = stats.chisquare(f_obs=f_obs, f_exp=f_exp)
        if bool(p_value < threshold):
            results["drifted_features"].append(col)
    results["data_drift_detected"] = len(results["drifted_features"]) > 0
    return results
KS test for numerics (non-parametric, catches any distributional shift), chi-squared for categoricals (frequency count comparison with Laplace smoothing).
One subtle bug: scipy.stats.ks_2samp returns NumPy scalars, so p_value < threshold evaluates to numpy.bool_, not Python bool. The json.dumps in run_monitoring then raises TypeError: Object of type bool_ is not JSON serializable. Fix: wrap the comparison in bool(...) — one word.
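The failure reproduces in a few lines:

import json
import numpy as np

p_value = np.float64(0.01)  # scipy returns NumPy scalars
flag = p_value < 0.05       # numpy.bool_, not bool

try:
    json.dumps({"drifted": flag})
except TypeError as e:
    print(e)  # Object of type bool_ is not JSON serializable

print(json.dumps({"drifted": bool(flag)}))  # {"drifted": true}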
Model Drift — Is the model's behaviour changing?
from sklearn.metrics import accuracy_score

def check_model_drift(predictions, actuals=None, baseline_accuracy=0.80):
    results = {"model_drift_detected": False}
    pred_churn_rate = float(predictions.mean())
    # Flag if predicted churn rate deviates >15pp from expected ~37%
    if abs(pred_churn_rate - 0.37) > 0.15:
        results["model_drift_detected"] = True
    # Delayed ground truth (labels arrive weeks later in telecom)
    if actuals is not None:
        degradation = baseline_accuracy - accuracy_score(actuals, predictions)
        if degradation > 0.05:  # 5pp drop triggers alert
            results["model_drift_detected"] = True
    return results
In telecom, churn labels arrive weeks after the prediction (when the contract period ends). The actuals parameter handles this delayed feedback loop.
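A quick sanity check of the proxy-metric path, using synthetic scores with no ground truth yet:

import numpy as np

# A batch where roughly 60% of customers are scored as churners, which is
# well outside the expected 37% ± 15pp band, so the proxy check should fire.
preds = (np.random.default_rng(0).random(1000) < 0.60).astype(int)
print(check_model_drift(preds))  # {'model_drift_detected': True}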
Test Suite: 24/24
tests/test_preprocess.py             10 passed
tests/test_monitor.py                 8 passed
tests/test_trigger_retraining.py      6 passed
──────────────────────────────────────────
Total                                24 passed
All tests written before implementation (TDD). Key test: test_no_drift_similar_data uses threshold=0.001 — even bootstrap-resampled data should not flag false drift under normal variance. A sketch of its shape follows.
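This assumes monitor exposes NUMERIC_COLS and CATEGORICAL_COLS as module constants and that src/ is on the import path; the real fixtures live in the repo:

import numpy as np
import pandas as pd

import monitor  # src/monitor.py

def test_no_drift_similar_data(monkeypatch):
    # Narrow the column lists to a toy fixture
    monkeypatch.setattr(monitor, "NUMERIC_COLS", ["monthly_charges"])
    monkeypatch.setattr(monitor, "CATEGORICAL_COLS", [])

    rng = np.random.default_rng(42)
    baseline = pd.DataFrame({"monthly_charges": rng.normal(65.0, 30.0, 2000)})
    current = baseline.sample(frac=1.0, replace=True, random_state=7)  # bootstrap resample

    results = monitor.check_data_drift(baseline, current, threshold=0.001)
    assert results["data_drift_detected"] is False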
Infrastructure (Terraform)
All AWS resources provisioned via Terraform — nothing clicked in the console:
resource "aws_lambda_function" "trigger_retraining" {
function_name = "telecom-churn-trigger-retraining"
runtime = "python3.11"
role = aws_iam_role.lambda_execution.arn
environment {
variables = {
PIPELINE_NAME = var.pipeline_name
AWS_REGION = var.aws_region
}
}
}
resource "aws_cloudwatch_event_rule" "daily_retraining" {
schedule_expression = "cron(0 2 * * ? *)" # 02:00 UTC daily
}
Lambda IAM role: sagemaker:StartPipelineExecution scoped to the specific pipeline ARN only.
What's Next
- A/B model deployment — route 10% of traffic to the new model, compare live accuracy before full cutover
- SageMaker Feature Store — consistent feature engineering between training and inference
- Approval webhook — Slack notification when a model lands in PendingManualApproval
- CDR integration — feed Call Detail Records for real-time churn scoring at the network edge
Source Code
Full project: github.com/tsekatm/mlops-sagemaker-pipeline
Tebogo Tseka — Cloud Solutions Architect & ML Engineer
GitHub: @tsekatm | Blog: tebogosacloud.blog