Machine Learning for Data Engineers: The Patterns I Actually Used Across 7 Projects
Data engineers are not supposed to be machine learning engineers. But at some point every serious DE pipeline ends with a question the data alone cannot answer, and you end up building a model.
Over the past six months I've shipped seven ML-driven projects: price prediction on used Japanese cars, health outcome modelling across 53 African countries, 109 time-series forecasts of development indicators across 15 African countries, financial news sentiment analysis, semantic job search with vector embeddings, inflation forecasting for the East African Community, and crop yield projections for East Africa. None of them were data science projects in the traditional sense. They were data engineering projects where the final step was a model instead of a dashboard.
This article is about what the ML stack actually looks like when a data engineer builds it, what each tool is genuinely good for, and the specific gotchas I hit in production that the documentation does not warn you about.
The Core Stack
Seven projects, four primary tools:
- XGBoost for tabular regression and classification
- Facebook Prophet for time-series forecasting at scale
- SHAP for model explainability
- HuggingFace Transformers for NLP (FinBERT for financial sentiment, sentence-transformers for semantic search)
Supporting cast: scikit-learn for preprocessing and clustering, MLflow for experiment tracking, joblib for model persistence, Optuna for hyperparameter search, and pgvector when embeddings need to be queryable.
Everything runs locally or on a free tier. No OpenAI API keys, no cloud ML platforms. The entire stack is reproducible with uv pip install.
XGBoost: The Workhorse for Tabular Data
I used XGBoost in two projects with very different datasets and got strong results on both.
Japan Car Advisory -- 541 listings scraped from BE FORWARD and SBT Japan. 291 rows after filtering. XGBoost vs LightGBM vs Random Forest:
- XGBoost: MAE = $3,706, R² = 0.722
- LightGBM: slightly worse MAE on this dataset
- Random Forest: R² = 0.68
On a dataset with 291 rows, XGBoost won. The margin was not huge, but it held across multiple random seeds. The model ended up as the champion, trained as part of an Airflow DAG that scraped, validated, trained, and logged to MLflow every week.
Africa Health ML -- 1,219 rows, 53 African countries, 23 years of World Bank data. Three models predicting health outcomes from public investment indicators:
- Life Expectancy: MAE = 1.22 years, R² = 0.934
- Under-5 Mortality: MAE = 9.36 per 1,000, R² = 0.885
- Maternal Mortality: MAE = 47.6 per 100,000, R² = 0.945
R² above 0.88 on all three. That is not a data science achievement -- it is a data quality and feature selection achievement. The World Bank REST API provides clean, consistent historical data. If your features are right, XGBoost does the rest.
The Setup That Actually Works
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
model = xgb.XGBRegressor(
    n_estimators=500,
    learning_rate=0.05,
    max_depth=6,
    subsample=0.8,
    colsample_bytree=0.8,
    random_state=42,
    early_stopping_rounds=50,  # find the real optimal n_estimators
)
# Early stopping monitors eval_set, so scores on that same set run slightly optimistic
model.fit(
    X_train, y_train,
    eval_set=[(X_test, y_test)],
    verbose=False,
)
y_pred = model.predict(X_test)
print(f"MAE: {mean_absolute_error(y_test, y_pred):,.2f}")
print(f"R²: {r2_score(y_test, y_pred):.3f}")
print(f"Best iteration: {model.best_iteration}")
early_stopping_rounds=50 is non-negotiable on small datasets. Without it, XGBoost will train all 500 trees even if optimal performance was reached at tree 120. On the Japan Car dataset the best iteration landed around 180. You also get model.best_iteration for free. It is a zero-based index, so best_iteration + 1 becomes your n_estimators when you retrain the final model.
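One caveat with the setup above: the set passed as eval_set decides when training stops, so metrics reported on that same set are a little flattering. A minimal sketch of a three-way split that avoids this, assuming synthetic stand-in data (the arrays and the 60/20/20 proportions are illustrative, not taken from the projects):

```python
import numpy as np
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(42)
X = rng.normal(size=(291, 5))  # synthetic stand-in with 291 rows
y = rng.normal(size=291)

# 1) Hold out the final test set first; the model never sees it during training.
X_trainval, X_test, y_trainval, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
# 2) Split the remainder into training and validation data.
X_train, X_val, y_train, y_val = train_test_split(
    X_trainval, y_trainval, test_size=0.25, random_state=42
)

split_sizes = {"train": len(X_train), "val": len(X_val), "test": len(X_test)}
print(split_sizes)

# Early stopping would then monitor the validation split only, e.g.
#   model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)
# and MAE / R² would be reported on (X_test, y_test).
```

On a few hundred rows this costs some training data, so it is a trade-off rather than a free win; cross-validation is the usual alternative when every row counts.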
The Data Leakage Trap
The single most common mistake in any ML project:
from sklearn.preprocessing import StandardScaler
# WRONG: scaler sees the test set before training
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)  # test statistics leak into training
X_train, X_test, _, _ = train_test_split(...)
# CORRECT: scaler only sees training data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
scaler = StandardScaler()
scaler.fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)  # same transform, no fitting
XGBoost does not need scaling (tree-based models are scale-invariant), but the principle applies to any preprocessing step that learns something from the data: target encoding, imputation statistics, feature selection. Always fit on training data only.
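One way to make "fit on training data only" hard to get wrong is to wrap the preprocessing and the model in a scikit-learn Pipeline, so every cross-validation fold refits the imputer and scaler on its own training portion. A minimal sketch on synthetic data, with Ridge standing in for whatever model you actually use:

```python
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
X = rng.normal(size=(200, 4))
y = X @ np.array([1.5, -2.0, 0.5, 0.0]) + rng.normal(scale=0.1, size=200)
X[rng.random(X.shape) < 0.05] = np.nan  # inject some missing values

# Imputer and scaler are refitted inside each fold, on that fold's training rows only.
pipe = make_pipeline(
    SimpleImputer(strategy="median"),
    StandardScaler(),
    Ridge(alpha=1.0),
)
scores = cross_val_score(pipe, X, y, cv=5, scoring="r2")
print(f"R² per fold: {np.round(scores, 3)}")
```

The same pipeline object can be persisted with joblib, which keeps the fitted preprocessing and the model together at prediction time.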
SHAP: Explainability Is Not Optional
The Africa Health project had a policy simulator: a user could set public health expenditure values for a specific country and see the predicted impact on life expectancy. That feature only works if the model's logic is interpretable.
SHAP (SHapley Additive exPlanations) gives you per-prediction feature attribution that is mathematically grounded. TreeExplainer is the right choice for XGBoost and LightGBM -- it uses the tree structure directly instead of approximation.
import matplotlib.pyplot as plt
import shap
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_test)
# Summary plot: feature importance across all test predictions
shap.summary_plot(shap_values, X_test, show=False)
plt.savefig("plots/shap_summary.png", bbox_inches="tight")
# Waterfall plot: why did this specific country get this prediction?
i = 0  # index of the prediction to explain
shap.waterfall_plot(
    shap.Explanation(
        values=shap_values[i],
        base_values=explainer.expected_value,
        data=X_test.iloc[i],
        feature_names=list(X_test.columns),
    ),
    show=False,
)
What the Africa Health SHAP analysis confirmed: health expenditure as a percentage of GDP was the strongest driver of life expectancy in the model, followed by access to clean water and sanitation. The dependence plot showed the relationship was non-linear -- the gains from increasing health expenditure were steep below 5% of GDP and flattened above 8%. That is the kind of insight a dashboard cannot surface on its own.
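To make "mathematically grounded" concrete: Shapley values split a prediction so that the base value plus the per-feature attributions add up exactly to the model output. The sketch below computes them by brute force for a toy three-feature function. It is not how TreeExplainer works internally (that uses the tree structure), the toy model is hypothetical, and it uses a single all-zeros reference point where SHAP uses an expected value over background data.

```python
from itertools import combinations
from math import factorial

def model(x):
    """Toy model with an interaction between features 0 and 1."""
    return 2.0 * x[0] + x[0] * x[1] + 3.0 * x[2]

def shapley_values(f, x, baseline):
    """Exact Shapley values by enumerating every feature coalition."""
    n = len(x)

    def value(subset):
        # Features in `subset` keep their real value, the rest take the baseline.
        z = [x[i] if i in subset else baseline[i] for i in range(n)]
        return f(z)

    phi = [0.0] * n
    for i in range(n):
        others = [j for j in range(n) if j != i]
        for size in range(n):
            for s in combinations(others, size):
                weight = factorial(size) * factorial(n - size - 1) / factorial(n)
                phi[i] += weight * (value(set(s) | {i}) - value(set(s)))
    return phi

x = [1.0, 2.0, 3.0]
baseline = [0.0, 0.0, 0.0]
phi = shapley_values(model, x, baseline)
base_value = model(baseline)
print(phi, base_value + sum(phi), model(x))
```

The interaction term is shared between the two features involved, which is why a waterfall plot can credit a feature with more or less than its standalone effect.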
For Streamlit integration, cache the SHAP computation:
import shap
import streamlit as st
@st.cache_data
def get_shap_values(_model, X_test_df):
    explainer = shap.TreeExplainer(_model)
    sv = explainer.shap_values(X_test_df)
    return sv, explainer.expected_value
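The underscore prefix on _model tells Streamlit not to hash the model object when it builds the cache key. This is a framework quirk: without the underscore, Streamlit tries to hash the model on every call, which is slow at best and fails with an unhashable-parameter error at worst.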
Prophet: Time-Series Forecasting at Scale
This is where the data engineer mindset directly applies to ML. Prophet is not hard to use for a single series. The engineering challenge is scaling it to dozens or hundreds of series without the loop collapsing on sparse data.
The Numbers
- Kenya Crop Yield Forecaster: 29 Prophet models across 5 countries and 6 agricultural indicators
- EAC Inflation Forecaster: 50 models across 5 countries and 10 macroeconomic indicators
- Africa Development Trajectory Forecaster: 109 models across 15 countries and 8 development indicators
188 Prophet models total, all trained in automated loops, all feeding Streamlit dashboards with 4 tabs each.
The Loop Pattern
from prophet import Prophet
import pandas as pd
import logging
logging.getLogger("prophet").setLevel(logging.ERROR)
logging.getLogger("cmdstanpy").setLevel(logging.ERROR)
models = {}
forecasts = {}
for country in countries:
    for indicator in indicators:
        key = f"{country}_{indicator}"
        subset = (
            df[(df["country"] == country) & (df["indicator"] == indicator)]
            .rename(columns={"year": "ds", "value": "y"})
            .copy()
        )
        # A bare year such as "2015" parses to 2015-01-01
        subset["ds"] = pd.to_datetime(subset["ds"].astype(str))
        # Skip sparse series: too few points to pin down a trend
        # Africa Dev: skipped Ethiopia/Adult Literacy and Nigeria/Trade Openness
        if len(subset) < 10:
            print(f"Skipping {key}: only {len(subset)} obs")
            continue
        m = Prophet(
            yearly_seasonality=True,
            weekly_seasonality=False,
            daily_seasonality=False,
            changepoint_prior_scale=0.1,
        )
        m.fit(subset)
        # "YS" keeps forecast dates on January 1, matching the parsed history
        future = m.make_future_dataframe(periods=10, freq="YS")
        fc = m.predict(future)
        models[key] = m
        forecasts[key] = fc
print(f"Trained {len(models)} Prophet models")
Keep the two logging.getLogger(...).setLevel(logging.ERROR) calls at the top of the script. Prophet prints Stan convergence diagnostics on every fit. In a loop of 120 iterations that is thousands of lines of noise that obscure real errors.
The Sparse Series Problem
Prophet will fit a model on any dataset with at least 2 rows. It will not warn you that the result is meaningless. On the Africa Dev project, Ethiopia's adult literacy data had 6 observations. Prophet fit it, produced confident-looking 10-year forecasts with narrow confidence intervals, and the forecasts were nonsense.
The fix is the len(subset) < 10 guard. I settled on 10 as the minimum after testing: with fewer than 10 annual observations, the fitted trend and its changepoints extrapolate from noise rather than signal. Annual data has only one observation per yearly cycle, so the guard is about having enough points to pin down the trend, not about counting seasonal cycles. You will know you hit this when the forecast shows a perfectly straight line with zero seasonality.
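It can help to audit coverage before the training loop starts, so you know up front which series will be skipped. A minimal sketch, assuming a long-format frame with country, indicator, year and value columns (the toy rows and the split_by_coverage helper are hypothetical). It counts non-null values, because rows with a missing y contribute nothing to a Prophet fit even though len(subset) counts them:

```python
import pandas as pd

MIN_OBS = 10  # same threshold as the guard in the loop

def split_by_coverage(df: pd.DataFrame, min_obs: int = MIN_OBS):
    """Return (series with enough data, {sparse series: observation count})."""
    counts = (
        df.dropna(subset=["value"])  # only rows that actually carry a value
          .groupby(["country", "indicator"])
          .size()
    )
    usable = counts[counts >= min_obs].index.tolist()
    sparse = counts[counts < min_obs].to_dict()
    return usable, sparse

# Toy long-format frame (hypothetical values, not project data)
rows = [("A", "gdp_growth", 2000 + i, float(i)) for i in range(12)]
rows += [("B", "literacy", 2000 + i, float(i)) for i in range(6)]
toy = pd.DataFrame(rows, columns=["country", "indicator", "year", "value"])

usable, sparse = split_by_coverage(toy)
print("usable:", usable)
print("sparse:", sparse)
```

A series whose values are all missing drops out of the counts entirely, so compare the result against the full country and indicator grid if you need a complete list of gaps.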
The Frequency Trap
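Annual data must use freq="YE" (year-end) or freq="YS" (year-start), not "Y". Pandas deprecated the bare "Y" alias in version 2.2, and make_future_dataframe passes the value straight to pd.date_range. What you see is a generic pandas FutureWarning that does not mention Prophet at all. Pick the alias that matches your ds values: years parsed with pd.to_datetime land on January 1, so "YS" keeps the forecast dates on the same day of the year as the history, while "YE" places them on December 31.
# WRONG: deprecated alias
future = model.make_future_dataframe(periods=10, freq="Y")
# CORRECT: "YS" if ds falls on January 1, "YE" if it falls on December 31
future = model.make_future_dataframe(periods=10, freq="YE")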
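The difference between the two aliases is easy to see with pandas alone. The helper below is a simplified stand-in for how I understand make_future_dataframe to build its dates (a date range starting at the last observation, keeping only strictly later dates), not a copy of Prophet's code. It uses offset objects instead of alias strings so it also runs on pandas versions older than 2.2.

```python
import pandas as pd

last_obs = pd.Timestamp("2022-01-01")  # what pd.to_datetime("2022") produces

def future_dates(last_date, periods, freq):
    """Generate `periods` dates strictly after `last_date` at the given frequency."""
    dates = pd.date_range(start=last_date, periods=periods + 1, freq=freq)
    return dates[dates > last_date][:periods]

year_start = future_dates(last_obs, 3, pd.offsets.YearBegin())  # same as "YS"
year_end = future_dates(last_obs, 3, pd.offsets.YearEnd())      # same as "YE"

print("year-start:", [d.date().isoformat() for d in year_start])
print("year-end:  ", [d.date().isoformat() for d in year_end])
```

With a January 1 history, the year-end variant puts its first forecast date in the same calendar year as the last observation, which shows up as a duplicated year if you later group or label by year.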
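The frame returned by make_future_dataframe also includes all historical dates by default, so the forecast covers the training period too. To isolate the actual forecast rows (prophet_df is the frame the model was fitted on):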
last_historical = prophet_df["ds"].max()
forecast_only = fc[fc["ds"] > last_historical]
Separating Forecast from Historical in Plotly Charts
The Streamlit dashboards use Plotly rather than Prophet's built-in matplotlib plots. The pattern for a forecast chart with shaded confidence intervals:
import plotly.graph_objects as go
import pandas as pd
hist_df = fc[fc["ds"] <= prophet_df["ds"].max()]
fc_df = fc[fc["ds"] > prophet_df["ds"].max()]
fig = go.Figure()
fig.add_trace(go.Scatter(
    x=prophet_df["ds"], y=prophet_df["y"],
    mode="lines+markers", name="Historical",
))
fig.add_trace(go.Scatter(
    x=fc_df["ds"], y=fc_df["yhat"],
    mode="lines", name="Forecast",
))
fig.add_trace(go.Scatter(
    x=pd.concat([fc_df["ds"], fc_df["ds"][::-1]]),
    y=pd.concat([fc_df["yhat_upper"], fc_df["yhat_lower"][::-1]]),
    fill="toself",
    fillcolor="rgba(99,110,250,0.15)",
    line=dict(color="rgba(255,255,255,0)"),
    name="80% CI",
))
One Plotly 6 gotcha that appeared across multiple projects: showlegend will throw a TypeError if you pass a numpy.bool_ instead of a Python bool. Any boolean derived from a pandas or numpy operation is numpy.bool_. The fix:
show = bool(some_condition)
fig.add_trace(go.Scatter(..., showlegend=show))
NLP for Data Engineers: FinBERT and Embeddings
Two different NLP patterns -- one for sentiment, one for semantic search.
FinBERT for Financial Sentiment (BizPulse Kenya)
BizPulse Kenya classified Kenyan business and financial news articles into positive, negative, and neutral sentiment using an ensemble of three models: FinBERT (financial domain BERT), VADER (rule-based, no GPU needed), and TextBlob (general polarity).
from transformers import pipeline
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from textblob import TextBlob
import torch
device = 0 if torch.cuda.is_available() else -1
finbert = pipeline(
    "sentiment-analysis",
    model="ProsusAI/finbert",
    device=device,
    truncation=True,  # CRITICAL: BERT has a 512-token hard limit
    max_length=512,
)
analyzer = SentimentIntensityAnalyzer()
def classify(text: str) -> str:
    fb = finbert(text)[0]["label"]
    # Score once with VADER, then bucket the compound score
    compound = analyzer.polarity_scores(text)["compound"]
    vader = ("positive" if compound > 0.05
             else "negative" if compound < -0.05
             else "neutral")
    tb_pol = TextBlob(text).sentiment.polarity
    tb = "positive" if tb_pol > 0.05 else "negative" if tb_pol < -0.05 else "neutral"
    votes = [fb, vader, tb]
    return max(set(votes), key=votes.count)  # majority vote
The truncation=True argument is not optional. FinBERT crashes with a cryptic index error on any text exceeding 512 tokens without it. Most financial news articles are within 512 tokens, but earnings reports and government gazettes are not.
FinBERT labels are lowercase (positive, negative, neutral) -- not the POSITIVE/NEGATIVE format used by general BERT sentiment models. This mismatch will silently break any code that does if label == "POSITIVE".
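One detail of the voting line max(set(votes), key=votes.count) is worth knowing: when the three models disagree completely, every label has one vote and the winner depends on set iteration order, which can change between interpreter runs. A minimal sketch of a deterministic variant, assuming you want ties to go to the earliest model in the list (FinBERT here); the majority_vote helper is hypothetical, not from the project:

```python
from collections import Counter

def majority_vote(votes):
    """Return the most common label; break ties in favour of the earliest voter."""
    counts = Counter(votes)
    top = max(counts.values())
    winners = {label for label, count in counts.items() if count == top}
    # `votes` is ordered by trust, so the first vote among the tied labels wins.
    return next(label for label in votes if label in winners)

clear_majority = majority_vote(["negative", "positive", "positive"])
three_way_tie = majority_vote(["positive", "negative", "neutral"])
print(clear_majority, three_way_tie)
```

Whether FinBERT deserves the tie-break is a judgment call. It is the only domain-specific model of the three, which is the reasoning behind listing it first.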
Sentence Transformers and pgvector (JobSense)
JobSense indexed 604 job listings as vector embeddings in PostgreSQL using pgvector, enabling semantic search rather than keyword matching.
from sentence_transformers import SentenceTransformer
import psycopg2
from pgvector.psycopg2 import register_vector
conn = psycopg2.connect(DATABASE_URL)  # DATABASE_URL: placeholder for your connection string
register_vector(conn)  # registers the vector type with psycopg2
model = SentenceTransformer("all-MiniLM-L6-v2")  # 384 dimensions, runs locally
embeddings = model.encode(job_descriptions, normalize_embeddings=True)
# Semantic search using cosine distance
def search(query: str, top_k: int = 10) -> list:
    q_emb = model.encode([query], normalize_embeddings=True)[0]
    with conn.cursor() as cur:
        cur.execute("""
            SELECT title, company, source,
                   1 - (embedding <=> %s::vector) AS similarity
            FROM job_embeddings
            JOIN jobs USING (job_id)
            ORDER BY embedding <=> %s::vector
            LIMIT %s
        """, (q_emb.tolist(), q_emb.tolist(), top_k))
        return cur.fetchall()
With normalize_embeddings=True the vectors have unit length, so cosine similarity equals the dot product, and pgvector's <=> (cosine distance) and <#> (negative inner product) operators return the same ranking. Use IVFFlat indexing for datasets up to ~100K rows; HNSW for anything larger or where recall matters more than index build time.
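The equivalence is quick to check numerically. A small sketch with random 384-dimensional vectors standing in for real embeddings:

```python
import numpy as np

rng = np.random.default_rng(0)
a = rng.normal(size=384)  # stand-ins for two sentence embeddings
b = rng.normal(size=384)

def cosine_similarity(u, v):
    return float(u @ v / (np.linalg.norm(u) * np.linalg.norm(v)))

# Normalising to unit length is what normalize_embeddings=True does for you.
a_unit = a / np.linalg.norm(a)
b_unit = b / np.linalg.norm(b)

cos_sim = cosine_similarity(a, b)
dot_unit = float(a_unit @ b_unit)

cosine_distance = 1.0 - cos_sim  # the quantity <=> computes
neg_inner_product = -dot_unit    # the quantity <#> computes on unit vectors
print(cos_sim, dot_unit, cosine_distance, neg_inner_product)
```

Because the two quantities differ only by a constant offset on unit vectors, ordering by either one returns the same top-k rows.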
The Airflow Integration Pattern
In Japan Car Advisory, the ML training step was a task in an Airflow DAG:
scrape_beforward + scrape_sbt (parallel) → validate → train → log_to_mlflow
The key design decision: pass file paths through XCom, not model objects.
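import joblib
import mlflow
import pandas as pd
import xgboost as xgb
from airflow.decorators import task
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
@task
def train_model(validated_path: str) -> dict:
    df = pd.read_parquet(validated_path)
    X, y = preprocess(df)  # project-specific feature/target preparation
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    model = xgb.XGBRegressor(n_estimators=300, learning_rate=0.05)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)  # needed for the metrics below
    metrics = {
        "mae": float(mean_absolute_error(y_test, y_pred)),
        "r2": float(r2_score(y_test, y_pred)),
    }
    joblib.dump(model, "/opt/airflow/models/xgb_latest.pkl")
    return metrics  # small dict via XCom
@task
def log_to_mlflow(metrics: dict):
    with mlflow.start_run():
        mlflow.log_metrics(metrics)
        mlflow.log_artifact("/opt/airflow/models/xgb_latest.pkl")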
XCom is for metadata. Models are binary blobs: write them to a shared volume and pass the path. Trying to push a 10 MB XGBoost model through XCom will either fail at serialisation or run into the size limits of the metadata database, where XCom values are stored by default.
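The hand-off can be illustrated without Airflow at all. In this sketch, plain functions stand in for tasks, a dict with a large list stands in for a fitted model, and the metric values are placeholders. What it shows is that only a small JSON-serialisable payload crosses the task boundary while the heavy artifact stays on disk:

```python
import json
import pickle
import tempfile
from pathlib import Path

def train_step(model_dir: Path) -> dict:
    """Stand-in for the training task: persist the model, return path + metrics."""
    model = {"weights": [0.1] * 100_000}  # placeholder for a fitted model object
    model_path = model_dir / "model_latest.pkl"
    with open(model_path, "wb") as fh:
        pickle.dump(model, fh)
    # Placeholder metric values; only this small dict would travel via XCom.
    return {"model_path": str(model_path), "mae": 1.0, "r2": 0.5}

def log_step(payload: dict) -> int:
    """Stand-in for the downstream task: reload the model from the shared path."""
    with open(payload["model_path"], "rb") as fh:
        model = pickle.load(fh)
    return len(model["weights"])

with tempfile.TemporaryDirectory() as tmp:
    payload = train_step(Path(tmp))
    xcom_bytes = len(json.dumps(payload).encode())
    artifact_bytes = Path(payload["model_path"]).stat().st_size
    n_weights = log_step(payload)

print(f"XCom payload: {xcom_bytes} bytes, artifact on disk: {artifact_bytes} bytes")
```

Returning the path in the payload, rather than hardcoding it in both tasks, also means the downstream task keeps working if the storage location changes.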
The Checklist I Run Before Every Model Goes Live
Seven projects in, this is what I verify before any model result goes into a dashboard or gets pushed to GitHub:
1. No data leakage. Scaler and encoder fitted only on X_train. Target-based aggregations computed on training fold only if using cross-validation.
2. Metrics on held-out test, not training. A model with R² = 0.99 on training and R² = 0.72 on test is not a good model.
3. Scale before K-Means. On the Africa Dev project, K-Means without StandardScaler produced clusters dominated entirely by GDP (billions of dollars vs. ratios between 0 and 1). Always scaler.fit_transform(X) before KMeans.fit(X_scaled).
4. Guard sparse Prophet series. Any loop over Prophet models needs if len(subset) < 10: continue. The model will fit silently on 3 observations and produce confident-looking nonsense.
5. Log with MLflow. Even for quick experiments. Reproducing "what were the hyperparameters on the model that got R²=0.88" without MLflow means re-running the full training loop.
6. SHAP before shipping. If you cannot explain why the model made a prediction, you cannot defend it to anyone who asks. TreeExplainer on XGBoost/LightGBM takes seconds. There is no reason to skip it.
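Item 3 is the easiest one to demonstrate. A minimal sketch on synthetic data, assuming one feature measured in billions (a GDP-like column) that carries no group structure and one ratio feature that does; the numbers are made up for illustration:

```python
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import adjusted_rand_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
n = 200
group = rng.integers(0, 2, size=n)  # the structure we hope to recover
ratio = np.where(group == 0, 0.2, 0.8) + rng.normal(scale=0.03, size=n)
gdp = rng.uniform(1e9, 1e11, size=n)  # huge scale, unrelated to the groups
X = np.column_stack([gdp, ratio])

# Without scaling, distances are dominated by the GDP column.
raw_labels = KMeans(n_clusters=2, n_init=50, random_state=0).fit_predict(X)

# With scaling inside a pipeline, both features contribute comparably.
scaled_kmeans = make_pipeline(
    StandardScaler(),
    KMeans(n_clusters=2, n_init=50, random_state=0),
)
scaled_labels = scaled_kmeans.fit_predict(X)

ari_raw = adjusted_rand_score(group, raw_labels)
ari_scaled = adjusted_rand_score(group, scaled_labels)
print(f"ARI without scaling: {ari_raw:.2f}, with scaling: {ari_scaled:.2f}")
```

The adjusted Rand index compares the recovered clusters with the known groups, which is only possible here because the data is synthetic. On real data the symptom is the one described in item 3: clusters that simply sort rows by the largest-magnitude column.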
What I Would Do Differently
Use Optuna earlier. On the Japan Car project I tuned XGBoost with GridSearchCV. The search space was small enough that it worked, but Optuna's Bayesian optimisation typically finds comparable or better hyperparameters in fewer trials than an exhaustive grid. It should be the default now.
Cache Prophet models. Fitting 109 Prophet models takes 4-8 minutes depending on the machine. In a Streamlit app, this has to happen at startup. The pattern I settled on: compute at import time and store in a module-level dictionary, protected by @st.cache_resource. It works but it is fragile. A proper solution would pre-compute and serialise all models at pipeline time and load them from joblib files at app start.
Use MLflow earlier in the Prophet projects. The time-series projects tracked metrics manually (storing MAE in CSV files). MLflow would have made the experiment comparison much cleaner.
Conclusion
ML as a data engineer looks different from ML as a data scientist. You are not exploring in notebooks -- you are building pipelines that run on a schedule, produce reproducible results, and feed into dashboards that non-technical stakeholders will use to make decisions.
The tools that work for this are not glamorous: XGBoost for tabular data, Prophet for time series, SHAP for explainability, and a straightforward preprocessing pipeline from scikit-learn. They are boring in the best sense. They are predictable, well-documented, fast to iterate on, and genuinely good at what they do.
188 Prophet models, three XGBoost regressors with R² above 0.88, 604 job embeddings queryable in milliseconds, financial sentiment across Kenyan business news. All built in Python, all running locally, all integrated into Airflow pipelines and Streamlit dashboards. You do not need to be a data scientist to ship ML in production. You need to know which tools to reach for and which gotchas to watch out for.
The full cheatsheet with every code pattern referenced in this article is in my GitHub repository.
Follow for more data engineering from production experience.
Built across Japan Car Advisory, Africa Health ML, Kenya Crop Yield Forecaster, EAC Inflation Forecaster, Africa Development Trajectory Forecaster, BizPulse Kenya, and JobSense -- all pushed to GitHub with full source code.