Case Study 1: StreamFlow's MLflow Experiment Tracking Pipeline
Background
StreamFlow's data science team has been building a subscriber churn prediction model throughout this book: a logistic regression baseline in Chapter 11, a Random Forest in Chapter 13, an XGBoost-versus-LightGBM head-to-head in Chapter 14, a Bayesian hyperparameter search with Optuna in Chapter 18, and SHAP-based interpretation in Chapter 19.
Here is the problem: they did all of this in Jupyter notebooks.
The VP of Data asks a reasonable question at the quarterly review: "What were the exact hyperparameters of the model we deployed? How does it compare to the model we had three months ago? And can we reproduce it?"
The team's lead, Priya, opens a Google Sheet labeled "Churn Model Experiments." It has 89 rows. Row 47 says "best LightGBM" with a learning rate of 0.05 and an AUC of 0.8834. But the deployed model's AUC on the monitoring dashboard is 0.8791. Row 47 also has no record of the data version, the feature set, or the random seed. And the model artifact is a pickle file named lgbm_churn_best_v2_FINAL.pkl in a shared drive.
Priya decides this will never happen again. This case study is the migration: from notebooks and spreadsheets to MLflow.
Phase 1: Setting Up the Tracking Infrastructure
StreamFlow runs their ML workloads on AWS. Priya sets up MLflow with a PostgreSQL backend and S3 artifact store.
# Production MLflow server setup
# (In practice, this runs in a Docker container or on an EC2 instance)
pip install mlflow psycopg2-binary boto3
mlflow server \
--backend-store-uri postgresql://mlflow_user:secure_password@mlflow-db.internal:5432/mlflow \
--default-artifact-root s3://streamflow-ml-artifacts/experiments \
--host 0.0.0.0 \
--port 5000
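The hostname, database credentials, and bucket above are placeholders. For a laptop-scale mirror of the same pattern, a sketch with lightweight backends (SQLite store and a local artifact directory, both paths illustrative):

```shell
# Local stand-in for the production server: same flags, lighter backends
mlflow server \
  --backend-store-uri sqlite:///mlflow.db \
  --default-artifact-root ./mlruns \
  --host 127.0.0.1 \
  --port 5000
```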
For this case study, we will use a local setup that mirrors the production pattern:
import mlflow
import mlflow.xgboost
import mlflow.lightgbm
import xgboost as xgb
import lightgbm as lgb
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
roc_auc_score, f1_score, log_loss, precision_score,
recall_score, average_precision_score
)
import hashlib
import json
import time
import subprocess
# --- MLflow Configuration ---
mlflow.set_tracking_uri("http://127.0.0.1:5000")
# Create the experiment with tags
experiment_name = "streamflow-churn-xgb-v3"
mlflow.set_experiment(experiment_name)
client = mlflow.tracking.MlflowClient()
experiment = client.get_experiment_by_name(experiment_name)
Phase 2: Data Preparation with Fingerprinting
The first rule of the new regime: every run logs a data fingerprint.
np.random.seed(42)
n = 50000
streamflow = pd.DataFrame({
'monthly_hours_watched': np.random.exponential(18, n).round(1),
'sessions_last_30d': np.random.poisson(14, n),
'avg_session_minutes': np.random.exponential(28, n).round(1),
'unique_titles_watched': np.random.poisson(8, n),
'content_completion_rate': np.random.beta(3, 2, n).round(3),
'binge_sessions_30d': np.random.poisson(2, n),
'weekend_ratio': np.random.beta(2.5, 3, n).round(3),
'peak_hour_ratio': np.random.beta(3, 2, n).round(3),
'hours_change_pct': np.random.normal(0, 30, n).round(1),
'sessions_change_pct': np.random.normal(0, 25, n).round(1),
'months_active': np.random.randint(1, 60, n),
'plan_price': np.random.choice([9.99, 14.99, 24.99, 29.99], n,
p=[0.35, 0.35, 0.20, 0.10]),
'devices_used': np.random.randint(1, 6, n),
'profiles_active': np.random.randint(1, 5, n),
'payment_failures_6m': np.random.poisson(0.3, n),
'used_promo': np.random.binomial(1, 0.25, n),
'support_tickets_90d': np.random.poisson(1.2, n),
'negative_sentiment_tickets': np.random.poisson(0.3, n),
'genre_diversity': np.random.uniform(0.1, 1.0, n).round(3),
})
# Realistic churn signal
churn_score = (
-0.025 * streamflow['months_active']
- 0.03 * streamflow['monthly_hours_watched']
+ 0.12 * streamflow['support_tickets_90d']
+ 0.25 * streamflow['negative_sentiment_tickets']
+ 0.35 * streamflow['payment_failures_6m']
- 0.02 * streamflow['sessions_last_30d']
- 0.3 * streamflow['content_completion_rate']
- 0.4 * streamflow['genre_diversity']
- 0.008 * streamflow['hours_change_pct']
+ np.random.normal(0, 0.5, n)
)
churn_prob = 1 / (1 + np.exp(-churn_score))
streamflow['churned'] = (np.random.random(n) < churn_prob).astype(int)
X = streamflow.drop(columns=['churned'])
y = streamflow['churned']
# Three-way split
X_temp, X_test, y_temp, y_test = train_test_split(
X, y, test_size=0.15, stratify=y, random_state=42
)
X_train, X_val, y_train, y_val = train_test_split(
    X_temp, y_temp, test_size=0.15 / 0.85,  # ~0.1765: 15% of the ORIGINAL data
    stratify=y_temp, random_state=42
)
print(f"Train: {len(X_train):,} Val: {len(X_val):,} Test: {len(X_test):,}")
print(f"Churn rate --- Train: {y_train.mean():.3f} "
f"Val: {y_val.mean():.3f} Test: {y_test.mean():.3f}")
print(f"Features: {list(X_train.columns)}")
Train: 35,000 Val: 7,500 Test: 7,500
Churn rate --- Train: 0.323 Val: 0.324 Test: 0.322
Features: ['monthly_hours_watched', 'sessions_last_30d', 'avg_session_minutes',
'unique_titles_watched', 'content_completion_rate', 'binge_sessions_30d',
'weekend_ratio', 'peak_hour_ratio', 'hours_change_pct', 'sessions_change_pct',
'months_active', 'plan_price', 'devices_used', 'profiles_active',
'payment_failures_6m', 'used_promo', 'support_tickets_90d',
'negative_sentiment_tickets', 'genre_diversity']
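The second split's fraction looks odd, but it is simple arithmetic: after 15% goes to test, validation must take 0.15/0.85 of the remainder to also be 15% of the original data. A quick check:

```python
# Deriving the second-split fraction for a 70/15/15 three-way split
n = 50_000
n_test = round(n * 0.15)               # 7,500 held out for test
n_rest = n - n_test                    # 42,500 left for train + val
n_val = round(n_rest * (0.15 / 0.85))  # 7,500: 15% of the ORIGINAL 50k
n_train = n_rest - n_val               # 35,000
print(n_train, n_val, n_test)
```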
Now the data fingerprint utility:
def compute_data_fingerprint(df):
"""Compute a SHA-256 hash and summary statistics for a DataFrame."""
data_hash = hashlib.sha256(
pd.util.hash_pandas_object(df).values.tobytes()
).hexdigest()[:16]
return {
"data_hash": data_hash,
"n_rows": len(df),
"n_cols": df.shape[1],
"columns": json.dumps(list(df.columns)),
}
def log_data_metadata(X_train, y_train, X_val, y_val, X_test, y_test):
"""Log comprehensive data metadata to the active MLflow run."""
fingerprint = compute_data_fingerprint(X_train)
mlflow.set_tag("data_hash", fingerprint["data_hash"])
mlflow.set_tag("data_version", "streamflow-v3-2024-03")
mlflow.set_tag("train_rows", str(fingerprint["n_rows"]))
mlflow.set_tag("val_rows", str(len(X_val)))
mlflow.set_tag("test_rows", str(len(X_test)))
mlflow.set_tag("feature_count", str(fingerprint["n_cols"]))
mlflow.set_tag("target_rate_train", f"{y_train.mean():.4f}")
mlflow.set_tag("target_rate_val", f"{y_val.mean():.4f}")
mlflow.set_tag("target_rate_test", f"{y_test.mean():.4f}")
mlflow.set_tag("split_random_state", "42")
mlflow.set_tag("split_test_size", "0.15")
    mlflow.set_tag("split_val_size", "0.1765")
# Save column list as artifact
with open("data_schema.json", "w") as f:
json.dump({
"columns": list(X_train.columns),
"dtypes": {col: str(dtype) for col, dtype in X_train.dtypes.items()},
"fingerprint": fingerprint,
}, f, indent=2)
mlflow.log_artifact("data_schema.json")
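To see why the hash is a trustworthy fingerprint, a quick behavioral check (standalone sketch re-declaring the helper):

```python
import hashlib
import pandas as pd

def fingerprint(df):
    # Same hashing scheme as compute_data_fingerprint above. Note that
    # pd.util.hash_pandas_object includes the index, so row reordering
    # also changes the hash -- desirable for reproducibility checks.
    return hashlib.sha256(
        pd.util.hash_pandas_object(df).values.tobytes()
    ).hexdigest()[:16]

a = pd.DataFrame({"x": [1, 2, 3], "y": [0.1, 0.2, 0.3]})
b = a.copy()        # byte-identical data
c = a.copy()
c.loc[0, "x"] = 99  # a single silently-changed cell

print(fingerprint(a) == fingerprint(b))  # identical data, identical hash
print(fingerprint(a) == fingerprint(c))  # one cell changed, hash changes
```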
Phase 3: Systematic Hyperparameter Search
Priya's team runs a structured search: 12 XGBoost configurations covering a range of learning rates, depths, and regularization strengths. Every configuration is a child run under a parent.
configs = [
# Baseline configurations
{"learning_rate": 0.1, "max_depth": 4, "subsample": 0.8,
"colsample_bytree": 0.8, "reg_alpha": 0, "reg_lambda": 1},
{"learning_rate": 0.1, "max_depth": 6, "subsample": 0.8,
"colsample_bytree": 0.8, "reg_alpha": 0, "reg_lambda": 1},
{"learning_rate": 0.1, "max_depth": 8, "subsample": 0.8,
"colsample_bytree": 0.8, "reg_alpha": 0, "reg_lambda": 1},
# Lower learning rate, deeper search
{"learning_rate": 0.05, "max_depth": 5, "subsample": 0.8,
"colsample_bytree": 0.8, "reg_alpha": 0.1, "reg_lambda": 1},
{"learning_rate": 0.05, "max_depth": 6, "subsample": 0.8,
"colsample_bytree": 0.8, "reg_alpha": 0.1, "reg_lambda": 1},
{"learning_rate": 0.05, "max_depth": 7, "subsample": 0.85,
"colsample_bytree": 0.7, "reg_alpha": 0.1, "reg_lambda": 1.5},
# Even lower learning rate
{"learning_rate": 0.03, "max_depth": 5, "subsample": 0.8,
"colsample_bytree": 0.8, "reg_alpha": 0.05, "reg_lambda": 1.2},
{"learning_rate": 0.03, "max_depth": 6, "subsample": 0.85,
"colsample_bytree": 0.75, "reg_alpha": 0.05, "reg_lambda": 1.2},
{"learning_rate": 0.03, "max_depth": 7, "subsample": 0.8,
"colsample_bytree": 0.8, "reg_alpha": 0.05, "reg_lambda": 1.2},
# High regularization variants
{"learning_rate": 0.05, "max_depth": 6, "subsample": 0.75,
"colsample_bytree": 0.7, "reg_alpha": 0.5, "reg_lambda": 3.0},
{"learning_rate": 0.03, "max_depth": 6, "subsample": 0.8,
"colsample_bytree": 0.8, "reg_alpha": 0.3, "reg_lambda": 2.0},
{"learning_rate": 0.03, "max_depth": 8, "subsample": 0.9,
"colsample_bytree": 0.8, "reg_alpha": 0.1, "reg_lambda": 1.5},
]
def evaluate_model(model, X, y, prefix="val"):
"""Compute standard metrics and return as a dict."""
y_proba = model.predict_proba(X)[:, 1]
y_pred = model.predict(X)
return {
f"{prefix}_auc": roc_auc_score(y, y_proba),
f"{prefix}_f1": f1_score(y, y_pred),
f"{prefix}_precision": precision_score(y, y_pred),
f"{prefix}_recall": recall_score(y, y_pred),
f"{prefix}_avg_precision": average_precision_score(y, y_proba),
f"{prefix}_log_loss": log_loss(y, y_proba),
}
# --- Parent run for the entire search ---
with mlflow.start_run(run_name="xgb-grid-search-2024-03") as parent_run:
mlflow.set_tag("purpose", "hyperparameter_search")
mlflow.set_tag("search_method", "grid")
mlflow.set_tag("total_configs", str(len(configs)))
mlflow.set_tag("model_family", "xgboost")
best_val_auc = 0
best_run_id = None
all_results = []
for i, config in enumerate(configs):
run_name = (f"xgb-{i+1:02d}-lr{config['learning_rate']}"
f"-d{config['max_depth']}")
with mlflow.start_run(run_name=run_name, nested=True) as child_run:
# Log ALL parameters
            full_params = {
                **config,
                "n_estimators": 3000,
                "early_stopping_rounds": 50,
                "eval_metric": "logloss",
                "random_state": 42,
                "n_jobs": -1,
            }
            mlflow.log_params(full_params)
            # scale_pos_weight is left at its default; record the observed
            # class imbalance as a tag rather than logging a parameter the
            # model never receives
            mlflow.set_tag(
                "class_imbalance_ratio",
                str(round((1 - y_train.mean()) / y_train.mean(), 4)),
            )
# Log data metadata
log_data_metadata(X_train, y_train, X_val, y_val, X_test, y_test)
# Log code version
mlflow.set_tag("model_type", "XGBClassifier")
mlflow.set_tag("search_index", str(i + 1))
# Train
start_time = time.time()
model = xgb.XGBClassifier(
n_estimators=3000,
early_stopping_rounds=50,
eval_metric="logloss",
random_state=42,
n_jobs=-1,
**config,
)
model.fit(
X_train, y_train,
eval_set=[(X_val, y_val)],
verbose=False,
)
train_time = time.time() - start_time
# Evaluate on validation set
val_metrics = evaluate_model(model, X_val, y_val, prefix="val")
val_metrics["best_iteration"] = model.best_iteration
val_metrics["train_time_seconds"] = round(train_time, 2)
mlflow.log_metrics(val_metrics)
# Log the model artifact
mlflow.xgboost.log_model(model, artifact_path="model")
# Log feature importance
importance = pd.DataFrame({
"feature": X_train.columns,
"importance": model.feature_importances_,
}).sort_values("importance", ascending=False)
importance.to_csv("feature_importance.csv", index=False)
mlflow.log_artifact("feature_importance.csv")
# Track best
if val_metrics["val_auc"] > best_val_auc:
best_val_auc = val_metrics["val_auc"]
best_run_id = child_run.info.run_id
all_results.append({
"run_name": run_name,
"run_id": child_run.info.run_id[:8],
"val_auc": val_metrics["val_auc"],
"val_f1": val_metrics["val_f1"],
"val_log_loss": val_metrics["val_log_loss"],
"trees": model.best_iteration,
"time_s": train_time,
})
print(f" [{i+1:2d}/{len(configs)}] {run_name}: "
f"AUC={val_metrics['val_auc']:.4f} "
f"F1={val_metrics['val_f1']:.4f} "
f"Trees={model.best_iteration} "
f"Time={train_time:.1f}s")
# Log summary on parent run
mlflow.log_metric("best_val_auc", best_val_auc)
mlflow.set_tag("best_child_run_id", best_run_id)
# Save results summary as artifact on parent
results_df = pd.DataFrame(all_results).sort_values("val_auc", ascending=False)
results_df.to_csv("search_results_summary.csv", index=False)
mlflow.log_artifact("search_results_summary.csv")
print("\n" + "=" * 80)
print("SEARCH RESULTS (sorted by validation AUC)")
print("=" * 80)
print(results_df.to_string(index=False))
[ 1/12] xgb-01-lr0.1-d4: AUC=0.8791 F1=0.7384 Trees=187 Time=2.1s
[ 2/12] xgb-02-lr0.1-d6: AUC=0.8823 F1=0.7426 Trees=142 Time=2.4s
[ 3/12] xgb-03-lr0.1-d8: AUC=0.8810 F1=0.7401 Trees=98 Time=2.8s
[ 4/12] xgb-04-lr0.05-d5: AUC=0.8836 F1=0.7448 Trees=267 Time=3.2s
[ 5/12] xgb-05-lr0.05-d6: AUC=0.8847 F1=0.7461 Trees=289 Time=3.8s
[ 6/12] xgb-06-lr0.05-d7: AUC=0.8854 F1=0.7472 Trees=305 Time=4.1s
[ 7/12] xgb-07-lr0.03-d5: AUC=0.8839 F1=0.7445 Trees=483 Time=5.1s
[ 8/12] xgb-08-lr0.03-d6: AUC=0.8858 F1=0.7480 Trees=498 Time=5.9s
[ 9/12] xgb-09-lr0.03-d7: AUC=0.8862 F1=0.7489 Trees=517 Time=6.2s
[10/12] xgb-10-lr0.05-d6: AUC=0.8819 F1=0.7418 Trees=261 Time=3.4s
[11/12] xgb-11-lr0.03-d6: AUC=0.8842 F1=0.7451 Trees=470 Time=5.5s
[12/12] xgb-12-lr0.03-d8: AUC=0.8849 F1=0.7465 Trees=490 Time=6.8s
================================================================================
SEARCH RESULTS (sorted by validation AUC)
================================================================================
run_name run_id val_auc val_f1 val_log_loss trees time_s
xgb-09-lr0.03-d7 a8b3c2d1 0.8862 0.7489 0.2890 517 6.2
xgb-08-lr0.03-d6 f4e7d6c5 0.8858 0.7480 0.2897 498 5.9
xgb-06-lr0.05-d7 c9d2e5f8 0.8854 0.7472 0.2903 305 4.1
xgb-12-lr0.03-d8 b6c8d0e2 0.8849 0.7465 0.2911 490 6.8
xgb-05-lr0.05-d6 b2c4e6a8 0.8847 0.7461 0.2917 289 3.8
xgb-11-lr0.03-d6 e3f5a7b9 0.8842 0.7451 0.2924 470 5.5
xgb-07-lr0.03-d5 d1e3f5a7 0.8839 0.7445 0.2928 483 5.1
xgb-04-lr0.05-d5 a5b7c9d1 0.8836 0.7448 0.2932 267 3.2
xgb-02-lr0.1-d6 c5d7e9f1 0.8823 0.7426 0.2946 142 2.4
xgb-10-lr0.05-d6 f1a3b5c7 0.8819 0.7418 0.2951 261 3.4
xgb-03-lr0.1-d8 d3e5f7a9 0.8810 0.7401 0.2963 98 2.8
xgb-01-lr0.1-d4 e7f9a1b3 0.8791 0.7384 0.2981 187 2.1
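One regularity worth noticing in the table: halving the learning rate roughly doubles the number of trees early stopping selects, so their product stays roughly constant at a given depth. A quick check using the three max_depth=6 runs transcribed from the table (a rule of thumb, not a law):

```python
# (run, learning_rate, trees-at-early-stop) copied from the results above
rows = [("xgb-02", 0.10, 142), ("xgb-05", 0.05, 289), ("xgb-08", 0.03, 498)]
for name, lr, trees in rows:
    # product stays in a narrow band (~14-15) across a 3x range of lr
    print(f"{name}: lr x trees = {lr * trees:.1f}")
```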
Phase 4: Final Evaluation and Model Registration
The best configuration is run 9 (lr=0.03, max_depth=7). But that is the validation AUC. The team must evaluate on the held-out test set exactly once, then register the model.
# Retrain the best configuration and evaluate on the test set
best_config = configs[8] # xgb-09-lr0.03-d7
with mlflow.start_run(run_name="xgb-final-best-lr0.03-d7"):
full_params = {
**best_config,
"n_estimators": 3000,
"early_stopping_rounds": 50,
"eval_metric": "logloss",
"random_state": 42,
"n_jobs": -1,
}
mlflow.log_params(full_params)
log_data_metadata(X_train, y_train, X_val, y_val, X_test, y_test)
mlflow.set_tag("purpose", "final_evaluation")
mlflow.set_tag("model_type", "XGBClassifier")
mlflow.set_tag("promoted_from_search", "xgb-grid-search-2024-03")
# Train
    final_model = xgb.XGBClassifier(**full_params)  # exactly the logged parameters
final_model.fit(
X_train, y_train,
eval_set=[(X_val, y_val)],
verbose=False,
)
# Evaluate on VALIDATION set (for comparison to search)
val_metrics = evaluate_model(final_model, X_val, y_val, prefix="val")
val_metrics["best_iteration"] = final_model.best_iteration
mlflow.log_metrics(val_metrics)
# Evaluate on TEST set (the one true evaluation)
test_metrics = evaluate_model(final_model, X_test, y_test, prefix="test")
mlflow.log_metrics(test_metrics)
# Log the model with registration
mlflow.xgboost.log_model(
final_model,
artifact_path="model",
registered_model_name="streamflow-churn-predictor",
)
# Log feature importance
importance = pd.DataFrame({
"feature": X_train.columns,
"importance": final_model.feature_importances_,
}).sort_values("importance", ascending=False)
importance.to_csv("feature_importance.csv", index=False)
mlflow.log_artifact("feature_importance.csv")
# Log predictions on test set for post-hoc analysis
test_predictions = pd.DataFrame({
"y_true": y_test.values,
"y_pred": final_model.predict(X_test),
"y_proba": final_model.predict_proba(X_test)[:, 1],
})
test_predictions.to_csv("test_predictions.csv", index=False)
mlflow.log_artifact("test_predictions.csv")
print("FINAL MODEL EVALUATION")
print("=" * 50)
print(f"Validation AUC: {val_metrics['val_auc']:.4f}")
print(f"Test AUC: {test_metrics['test_auc']:.4f}")
print(f"Test F1: {test_metrics['test_f1']:.4f}")
print(f"Test Precision: {test_metrics['test_precision']:.4f}")
print(f"Test Recall: {test_metrics['test_recall']:.4f}")
print(f"Test Log Loss: {test_metrics['test_log_loss']:.4f}")
print(f"Best Iteration: {final_model.best_iteration}")
print(f"\nModel registered as: streamflow-churn-predictor v1")
FINAL MODEL EVALUATION
==================================================
Validation AUC: 0.8862
Test AUC: 0.8848
Test F1: 0.7471
Test Precision: 0.7623
Test Recall: 0.7324
Test Log Loss: 0.2924
Best Iteration: 517
Model registered as: streamflow-churn-predictor v1
Key Observation --- The test AUC (0.8848) is slightly lower than the validation AUC (0.8862). This is expected and healthy: the validation set guided model selection across 12 configurations, so validation scores carry a slight optimistic bias. The small gap (0.0014) indicates the model generalizes well; a large gap would suggest overfitting to the validation set through repeated selection. And if the test AUC came out noticeably higher than the validation AUC, treat that as a warning sign too: check for data leakage or an unrepresentative split before celebrating.
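The reasoning above can be captured as a small triage helper. The thresholds here are illustrative judgment calls, not standards:

```python
# Rough triage rule for the validation/test gap
def gap_verdict(val_auc, test_auc, tol=0.005):
    gap = val_auc - test_auc
    if gap < -tol:
        return "suspicious: test > val, check for leakage or a skewed split"
    if gap > 2 * tol:
        return "large gap: possible validation overfitting via repeated selection"
    return "healthy: small gap, the model generalizes"

print(gap_verdict(0.8862, 0.8848))  # the 0.0014 gap from the run above
```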
Phase 5: Model Registry Management
# Assign aliases for the deployment pipeline
client = mlflow.tracking.MlflowClient()
# Set the champion alias
client.set_registered_model_alias(
name="streamflow-churn-predictor",
alias="champion",
version=1,
)
# Verify
model_info = client.get_model_version_by_alias(
name="streamflow-churn-predictor",
alias="champion",
)
print(f"Champion model: version {model_info.version}")
print(f"Source run: {model_info.run_id[:12]}")
print(f"Created: {model_info.creation_timestamp}")
Champion model: version 1
Source run: c7d9e1f3a5b7
Created: 1711324800000
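The creation timestamp is epoch milliseconds, which is why it prints as a 13-digit integer. Converting it for humans:

```python
from datetime import datetime, timezone

ts_ms = 1711324800000  # creation_timestamp printed above, in epoch milliseconds
print(datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).isoformat())
# -> 2024-03-25T00:00:00+00:00
```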
Now the deployment pipeline can always load the current champion:
# This is what the deployment pipeline uses
champion_model = mlflow.pyfunc.load_model(
"models:/streamflow-churn-predictor@champion"
)
# Verify it works
sample = X_test.iloc[:5]
predictions = champion_model.predict(sample)
print(f"Sample predictions: {predictions}")
Sample predictions: [0 1 0 0 1]
Phase 6: The Query That Changes Everything
Three months later, the VP asks again: "What hyperparameters produced the model in production?"
This time, Priya runs a query instead of opening a spreadsheet:
client = mlflow.tracking.MlflowClient()
# Get the champion model
champion = client.get_model_version_by_alias(
name="streamflow-churn-predictor",
alias="champion",
)
# Get the run that produced it
run = client.get_run(champion.run_id)
print("=" * 60)
print("PRODUCTION MODEL LINEAGE")
print("=" * 60)
print(f"\nModel: streamflow-churn-predictor v{champion.version}")
print(f"Run ID: {champion.run_id}")
print(f"\n--- Hyperparameters ---")
for key, value in sorted(run.data.params.items()):
print(f" {key}: {value}")
print(f"\n--- Test Metrics ---")
for key, value in sorted(run.data.metrics.items()):
if key.startswith("test_"):
print(f" {key}: {value:.4f}")
print(f"\n--- Data Version ---")
for key, value in sorted(run.data.tags.items()):
if key.startswith("data_"):
print(f" {key}: {value}")
============================================================
PRODUCTION MODEL LINEAGE
============================================================
Model: streamflow-churn-predictor v1
Run ID: c7d9e1f3a5b7d8e9f0a1b2c3d4e5f6a7
--- Hyperparameters ---
colsample_bytree: 0.8
early_stopping_rounds: 50
eval_metric: logloss
learning_rate: 0.03
max_depth: 7
n_estimators: 3000
n_jobs: -1
random_state: 42
reg_alpha: 0.05
reg_lambda: 1.2
subsample: 0.8
--- Test Metrics ---
test_auc: 0.8848
test_avg_precision: 0.8127
test_f1: 0.7471
test_log_loss: 0.2924
test_precision: 0.7623
test_recall: 0.7324
--- Data Version ---
data_hash: a3f7c2e8d1b04a5f
data_version: streamflow-v3-2024-03
That query took three seconds. No spreadsheet archaeology. No guessing. No lgbm_churn_best_v2_FINAL.pkl.
Lessons Learned
1. The migration cost is low; the long-term payoff is enormous. Adding MLflow tracking to an existing training script takes 15-20 lines of code. The team spent one afternoon migrating their pipeline. Within a week, they had more organized experiment history than the previous six months of spreadsheet tracking.
2. The data fingerprint is the most valuable tag. The hyperparameters are important, but the data version is what breaks reproducibility most often. Logging a hash of the training data and the column schema catches silent data pipeline changes that would otherwise go undetected.
3. Nested runs keep the experiment list clean. With 12 configurations per search and multiple searches per week, the experiment list would be unmanageable without parent-child organization. The parent run serves as a summary; the children hold the detail.
4. The Model Registry closes the loop. Without the registry, the path from "best validation AUC" to "model in production" is a series of manual steps prone to error. With the registry, promoting a model is a single alias reassignment, and tracing a production model back to its training run is a single query.
5. The VP never asked about the spreadsheet again. Priya showed her the MLflow UI once. The VP bookmarked it. Now she checks the model comparison dashboard herself before quarterly reviews. That is the real measure of success: the tracking system is useful enough that non-technical stakeholders adopt it voluntarily.
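Lesson 2 in practice: a sketch of diffing today's data against the schema recorded in data_schema.json at training time. The column names and dtypes below are made up for illustration:

```python
# Schemas as {column: dtype} dicts, e.g. loaded from the logged artifact
logged = {"monthly_hours_watched": "float64", "sessions_last_30d": "int64",
          "plan_price": "float64"}
today = {"monthly_hours_watched": "float64", "sessions_last_30d": "float64"}

# Columns present in both but with a changed dtype
dtype_drift = {c: (logged[c], today[c])
               for c in logged.keys() & today.keys() if logged[c] != today[c]}
dropped = sorted(logged.keys() - today.keys())  # silently removed columns
added = sorted(today.keys() - logged.keys())    # silently added columns
print(dtype_drift)  # the int -> float change, caught before it hits training
print(dropped, added)
```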
This case study demonstrates MLflow experiment tracking applied to the StreamFlow churn model. Return to the chapter for the full experiment tracking framework.