Building a Production MLOps Pipeline on AWS SageMaker for Telecom Churn

In my previous post, we trained a churn prediction model and deployed it to a SageMaker endpoint. That's where most tutorials stop. But in production, deploying a model is only the beginning.

Models degrade. Customer behaviour shifts. Contract mix changes when competitors launch new offers. A model trained on January's data may be quietly wrong by June — and without active monitoring, nobody will know until the churn rate climbs and someone asks awkward questions in a board meeting.

This post covers the system I built to make that model self-maintaining: an end-to-end MLOps pipeline that retrains automatically, gates on model quality, and raises alerts the moment it detects something drifting.


What We're Building

EventBridge (daily schedule)
        │
        ▼
Lambda: trigger_retraining
        │
        ▼
SageMaker Pipeline
  ├── Phase 1: PreprocessData   → encode, scale, split
  ├── Phase 2: TrainModel       → SKLearn estimator
  ├── Phase 3: EvaluateModel    → writes evaluation.json
  └── Phase 4: CheckAccuracy    → accuracy ≥ 0.80?
           ├── YES → RegisterModel (PendingManualApproval)
           └── NO  → skip registration

Model Monitor (separate Lambda)
  ├── KS test on numeric features
  ├── Chi-squared on categorical features
  └── Alerts → SNS → CloudWatch

The whole pipeline is infrastructure-as-code (Terraform), parameterised, and testable locally with 24 unit tests.


Phase 1: The Pipeline Definition

The SageMaker Pipelines SDK lets you define a DAG of steps in Python. The pipeline object is then serialised to JSON and pushed to SageMaker, which manages execution, retries, and lineage tracking.

# pipeline/pipeline.py
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.parameters import ParameterFloat, ParameterString
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import JsonGet

def create_pipeline() -> Pipeline:
    # Overridable at execution time — no hardcoded values
    accuracy_threshold = ParameterFloat(name="AccuracyThreshold", default_value=0.80)
    input_data_uri = ParameterString(name="InputDataUri", default_value=f"s3://{default_bucket}/data/raw/")

    processing_step = ProcessingStep(
        name="PreprocessData",
        processor=sklearn_processor,
        inputs=[ProcessingInput(source=input_data_uri, destination="/opt/ml/processing/input")],
        outputs=[
            ProcessingOutput(output_name="train", source="/opt/ml/processing/output/train"),
            ProcessingOutput(output_name="test",  source="/opt/ml/processing/output/test"),
        ],
        code="pipeline/preprocess.py",
    )

    # ... training_step, evaluation_step setup omitted for brevity — see pipeline.py in the repo

    condition_step = ConditionStep(
        name="CheckAccuracy",
        conditions=[ConditionGreaterThanOrEqualTo(
            left=JsonGet(step_name="EvaluateModel", property_file=evaluation_report,
                         json_path="metrics.accuracy.value"),  # accuracy chosen: simpler threshold, business-readable
            right=accuracy_threshold,
        )],
        if_steps=[register_step],
        else_steps=[],
    )

    return Pipeline(
        name="telecom-churn-pipeline",
        parameters=[input_data_uri, accuracy_threshold],
        steps=[processing_step, training_step, evaluation_step, condition_step],
    )

Every meaningful value — instance types, thresholds, data paths — is a ParameterString or ParameterFloat. The Lambda trigger can override any of them per execution without touching the pipeline definition.
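For a quick sanity check outside the Lambda path, you can also upsert the definition and start an execution with overrides straight from the SDK. A minimal sketch (the role ARN and S3 path here are placeholders, not values from the project):

# One-off run with overridden parameters (role ARN and data path are placeholders)
from pipeline.pipeline import create_pipeline

pipeline = create_pipeline()
pipeline.upsert(role_arn="arn:aws:iam::123456789012:role/sagemaker-execution-role")

execution = pipeline.start(parameters={
    "AccuracyThreshold": 0.85,                         # stricter gate for this run only
    "InputDataUri": "s3://my-bucket/data/raw/latest/",
})
print(execution.arn)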

Why accuracy and not ROC-AUC for gating? ROC-AUC is the better model quality metric (as argued in Part 1), but SageMaker ConditionStep thresholds need to be intuitive for non-ML stakeholders who approve deployments. "Accuracy must exceed 80%" is easier to explain in a business review than an AUC threshold. You can always swap metrics.accuracy.value for metrics.roc_auc.value and adjust the threshold.


Phase 2: Preprocessing with Synthetic Fallback

# pipeline/preprocess.py
import os

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
def load_data(input_dir: str) -> pd.DataFrame:
    csv_files = [f for f in os.listdir(input_dir) if f.endswith(".csv")] \
                if os.path.isdir(input_dir) else []
    if csv_files:
        return pd.read_csv(os.path.join(input_dir, csv_files[0]))
    print("No input CSV found — generating synthetic telecom data")
    return generate_synthetic_data(n=8000)

def split_and_save(X: np.ndarray, y: np.ndarray, test_size: float = 0.2) -> None:
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42, stratify=y
    )
    # SageMaker convention: target first, no header
    train_df = pd.DataFrame(np.column_stack([y_train, X_train]))
    train_df.to_csv(os.path.join(TRAIN_OUTPUT_DIR, "train.csv"), index=False, header=False)

Two details worth noting:

  • Stratified split — the churn class is imbalanced (~37%). Without stratification you risk the test set having a different class ratio than training.
  • Scaler fit on train only — fit on X_train, transform X_test. Fitting on all data leaks test set statistics into training (see the sketch after this list).
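To make the second point concrete, a minimal sketch of the pattern (variable names are illustrative rather than the exact ones in preprocess.py):

from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)   # statistics computed on the training split only
X_test_scaled = scaler.transform(X_test)         # reuse the train mean/std; no peeking at test data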

Phase 3: Evaluation Report

The evaluation step writes a JSON file that the ConditionStep reads:

# pipeline/evaluate.py
import json
import os

from sklearn.metrics import accuracy_score, roc_auc_score
def compute_metrics(model, model_type, X_test, y_test) -> dict:
    if model_type == "sklearn":
        y_pred = model.predict(X_test)
        y_prob = model.predict_proba(X_test)[:, 1]
    else:  # keras
        y_prob = model.predict(X_test).flatten()
        y_pred = (y_prob >= 0.5).astype(int)

    return {
        "accuracy":  {"value": round(accuracy_score(y_test, y_pred), 4)},
        "roc_auc":   {"value": round(roc_auc_score(y_test, y_prob), 4)},
    }

def write_evaluation_report(metrics: dict, output_dir: str) -> None:
    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, "evaluation.json"), "w") as f:
        json.dump({"metrics": metrics}, f, indent=2)

The PropertyFile in the pipeline definition tells SageMaker where to find this JSON and which key to extract (metrics.accuracy.value). The ConditionStep compares that against AccuracyThreshold.
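For reference, the wiring looks roughly like this. A sketch, assuming the evaluation ProcessingStep exposes a ProcessingOutput named "evaluation":

from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",     # must match a ProcessingOutput name on the evaluation step
    path="evaluation.json",
)

evaluation_step = ProcessingStep(
    name="EvaluateModel",
    processor=sklearn_processor,
    # ... inputs/outputs omitted for brevity; see pipeline.py in the repo
    property_files=[evaluation_report],
)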


Phase 4: Automated Retraining via EventBridge

# src/trigger_retraining.py
import json
import os

import boto3

sagemaker_client = boto3.client("sagemaker")
PIPELINE_NAME = os.environ["PIPELINE_NAME"]   # injected by Terraform (see below)
PARAM_MAP = {
    "input_data_uri":         "InputDataUri",
    "training_instance_type": "TrainingInstanceType",
    "accuracy_threshold":     "AccuracyThreshold",
}

def handler(event: dict, context) -> dict:
    params = [
        {"Name": PARAM_MAP[k], "Value": str(v)}
        for k, v in event.get("pipeline_parameters", {}).items()
        if k in PARAM_MAP
    ]
    response = sagemaker_client.start_pipeline_execution(
        PipelineName=PIPELINE_NAME,
        PipelineParameters=params,
    )
    return {"statusCode": 200, "body": json.dumps({"execution_arn": response["PipelineExecutionArn"]})}

EventBridge fires this daily at 02:00 UTC. It can also be invoked on demand when new CSV files land in S3. The pipeline_parameters field lets you run A/B threshold experiments without changing code.
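An on-demand test event might look like this (the bucket and values are made up for illustration):

# Example payload for a manual invocation or a console test event
event = {
    "pipeline_parameters": {
        "input_data_uri": "s3://my-telecom-bucket/data/raw/2024-06/",
        "accuracy_threshold": 0.82,
        "something_else": "ignored",   # keys outside PARAM_MAP are silently dropped
    }
}
# handler(event, None) starts an execution with those overrides applied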


Phase 5: Drift Monitoring

Data Drift — Did the input distribution change?

# src/monitor.py
from scipy import stats

def check_data_drift(baseline, current, threshold=0.05):
    results = {}

    # KS test for numeric features
    for col in NUMERIC_COLS:
        stat, p_value = stats.ks_2samp(baseline[col].dropna(), current[col].dropna())
        # cast to Python bool — numpy.bool_ isn't JSON-serialisable
        results[col] = {"p_value": float(p_value), "drifted": bool(p_value < threshold)}

    # Chi-squared for categoricals, with +1 Laplace smoothing
    for col in CATEGORICAL_COLS:
        all_cats = sorted(set(baseline[col].unique()) | set(current[col].unique()))
        b_counts = baseline[col].value_counts().reindex(all_cats, fill_value=0).values + 1
        c_counts = current[col].value_counts().reindex(all_cats, fill_value=0).values + 1
        # scale expected counts to the observed total; chisquare requires matching sums
        f_exp = b_counts / b_counts.sum() * c_counts.sum()
        stat, p_value = stats.chisquare(f_obs=c_counts, f_exp=f_exp)
        results[col] = {"p_value": float(p_value), "drifted": bool(p_value < threshold)}

    return results

KS test for numerics (non-parametric, catches any distributional shift), chi-squared for categoricals (frequency count comparison with Laplace smoothing).

One subtle bug: scipy returns numpy floats, so the comparison p_value < threshold evaluates to numpy.bool_, not Python bool. The json.dumps in run_monitoring then raises TypeError: Object of type bool is not JSON serializable. Fix: bool(p_value < threshold) — one word.
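A tiny reproduction, if you want to see the failure and the fix side by side:

import json

import numpy as np

flag = np.float64(0.003) < 0.05               # numpy.bool_, because the p-value is a numpy float
try:
    json.dumps({"drifted": flag})
except TypeError as exc:
    print(exc)                                # the numpy boolean is not JSON serialisable
print(json.dumps({"drifted": bool(flag)}))    # {"drifted": true}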

Model Drift — Is the model's behaviour changing?

# src/monitor.py (continued)
from sklearn.metrics import accuracy_score

def check_model_drift(predictions, actuals=None, baseline_accuracy=0.80):
    pred_churn_rate = float(predictions.mean())
    results = {"model_drift_detected": False, "predicted_churn_rate": pred_churn_rate}

    # Flag if predicted churn rate deviates >15pp from the expected ~37%
    if abs(pred_churn_rate - 0.37) > 0.15:
        results["model_drift_detected"] = True

    # Delayed ground truth (labels arrive weeks later in telecom)
    if actuals is not None:
        degradation = baseline_accuracy - accuracy_score(actuals, predictions)
        if degradation > 0.05:  # 5pp drop triggers alert
            results["model_drift_detected"] = True

    return results

In telecom, churn labels arrive weeks after the prediction (when the contract period ends). The actuals parameter handles this delayed feedback loop.
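The call looks the same in both situations. A sketch with made-up arrays (real usage joins predictions to realised labels per customer):

import numpy as np

preds = np.random.binomial(1, 0.36, size=5000)      # today's hard 0/1 predictions

# Day 0: no labels yet, so only the churn-rate sanity check runs
report = check_model_drift(preds)

# Weeks later: the same predictions scored against realised churn labels
actuals = np.random.binomial(1, 0.37, size=5000)
report = check_model_drift(preds, actuals=actuals, baseline_accuracy=0.80)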


Test Suite: 24/24

tests/test_preprocess.py          10 passed
tests/test_monitor.py              8 passed
tests/test_trigger_retraining.py   6 passed
──────────────────────────────────────────
Total                             24 passed

All tests were written before implementation (TDD). Key test: test_no_drift_similar_data uses threshold=0.001, a deliberately strict cut-off, so even bootstrap-resampled data from the same distribution does not raise a false drift flag under normal sampling variance.
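For flavour, a sketch of what that test looks like. Column names and data generation are illustrative; the real fixture has to contain every column listed in monitor's NUMERIC_COLS and CATEGORICAL_COLS:

# tests/test_monitor.py (sketch)
import numpy as np
import pandas as pd

from src.monitor import check_data_drift

def test_no_drift_similar_data():
    rng = np.random.default_rng(42)
    baseline = pd.DataFrame({
        "tenure":         rng.integers(1, 72, size=2000),
        "MonthlyCharges": rng.normal(65, 30, size=2000),
        "Contract":       rng.choice(["Month-to-month", "One year", "Two year"], size=2000),
    })
    # Bootstrap resample of the baseline: same distribution, different rows
    current = baseline.sample(frac=1.0, replace=True, random_state=7)

    results = check_data_drift(baseline, current, threshold=0.001)
    assert not any(r["drifted"] for r in results.values())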


Infrastructure (Terraform)

All AWS resources provisioned via Terraform — nothing clicked in the console:

resource "aws_lambda_function" "trigger_retraining" {
  function_name = "telecom-churn-trigger-retraining"
  runtime       = "python3.11"
  role          = aws_iam_role.lambda_execution.arn

  environment {
    variables = {
      PIPELINE_NAME = var.pipeline_name
      # AWS_REGION is a reserved Lambda environment variable; the runtime provides it,
      # so boto3 resolves the region without it being declared here.
    }
  }
}

resource "aws_cloudwatch_event_rule" "daily_retraining" {
  schedule_expression = "cron(0 2 * * ? *)"  # 02:00 UTC daily
}

Lambda IAM role: sagemaker:StartPipelineExecution scoped to the specific pipeline ARN only.


What's Next

  • A/B model deployment — route 10% of traffic to the new model, compare live accuracy before full cutover
  • SageMaker Feature Store — consistent feature engineering between training and inference
  • Approval webhook — Slack notification when a model lands in PendingManualApproval
  • CDR integration — feed Call Detail Records for real-time churn scoring at the network edge

Source Code

Full project: github.com/tsekatm/mlops-sagemaker-pipeline


Tebogo Tseka — Cloud Solutions Architect & ML Engineer
GitHub: @tsekatm | Blog: tebogosacloud.blog
