
Chapter 35: Capstone --- End-to-End ML System

From Business Question to Deployed Model with Monitoring


Learning Objectives

By the end of this chapter, you will be able to:

  1. Integrate all components of the ML lifecycle into a single system
  2. Make and justify architectural decisions under real-world constraints
  3. Present a complete ML system to both technical and non-technical audiences
  4. Identify what worked, what didn't, and what you'd do differently
  5. Produce a portfolio-ready project that demonstrates the full data science lifecycle

This Is the Whole Thing

You have spent thirty-four chapters learning data science in pieces. SQL extraction. Feature engineering. Model training. Evaluation. Interpretation. Fairness. Deployment. Monitoring. Business communication. Each chapter isolated one skill and practiced it in a controlled environment.

That is not how the job works.

In the real world, nobody hands you clean data and says "build a model." They hand you a business problem, a vague dataset description, a deadline, three stakeholders with different priorities, and a production system that was last updated eighteen months ago. Your job is to figure out what the right question is, whether the data can answer it, how to build a system that answers it reliably, and how to convince people to trust it.

This chapter integrates everything. The StreamFlow churn prediction system --- which you have been building incrementally since Chapter 1 --- becomes a complete, end-to-end ML product. We walk through every component, every connection between components, and every decision that shaped the system. Then we step back and ask the question that separates junior data scientists from senior ones: "What would I do differently?"

Practitioner Note --- If you have been following the progressive project milestones (M0 through M13), you already have most of the components. This chapter connects them, fills in the gaps, and produces a single coherent system. If you skipped some milestones, this chapter tells you what you need to build and where to find the instructions.


The System at a Glance

Before we dive into components, here is the complete architecture. Every box is something you built in a previous chapter. Every arrow is a data flow or dependency you need to make explicit.

graph TD
    A[Business Question<br/>Ch 1, 34] --> B[SQL Extraction<br/>Ch 5]
    B --> C[Feature Engineering Pipeline<br/>Ch 6-10]
    C --> D[Model Training<br/>Ch 11-18]
    D --> E[Experiment Tracking<br/>Ch 30]
    D --> F[Model Evaluation<br/>Ch 16]
    F --> G[Interpretation<br/>Ch 19]
    F --> H[Fairness Audit<br/>Ch 33]
    G --> I[Deployment<br/>Ch 31]
    H --> I
    I --> J[Monitoring<br/>Ch 32]
    J -->|Drift Detected| D
    I --> K[Stakeholder Dashboard<br/>Ch 34]
    K --> L[ROI Analysis<br/>Ch 34]
    L --> A

    style A fill:#f9f,stroke:#333
    style L fill:#f9f,stroke:#333

Notice the cycle. The system starts with a business question and ends with an ROI analysis that feeds back into the business question. A deployed ML system is not a pipeline that runs once. It is a loop that runs continuously, monitored and maintained by humans who understand every component.


Component 1: The Business Question

Theme: Wrong Problem --- The most expensive mistake in data science is solving the wrong problem. Chapters 1 and 34 covered this in detail. Here, we see it in the context of the full system.

StreamFlow's business question, stated precisely:

"Which subscribers are most likely to cancel within the next 60 days, so that the customer success team can intervene with targeted retention offers before the subscriber makes a cancellation decision?"

This question has four critical components that constrain everything downstream:

  1. Prediction target: Binary churn (cancel/retain) within a 60-day window
  2. Prediction timing: The model must predict before the cancellation decision, which means using features that are available at prediction time (no future leakage)
  3. Consumer of predictions: The customer success team, not an automated system. This means the model needs to be interpretable (SHAP), the output needs to be actionable (a ranked list with reasons), and the threshold needs to be calibrated to the team's capacity
  4. Business metric: Net revenue retained. Not AUC, not precision, not recall. Those are technical proxies. The metric the VP of Customer Success reports to the CFO is dollars saved.

Every architectural decision in this chapter traces back to these four constraints. When you build your own capstone, start here. Define the business question with this level of precision. If you cannot, you are not ready to build.
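Constraint 3 hints at how the decision threshold gets set in practice: rather than optimizing a statistical metric, you can cut the ranked list at the team's intervention capacity. A minimal sketch, with a hypothetical weekly capacity and a toy score distribution (both illustrative, not StreamFlow's real numbers):

```python
import numpy as np

def capacity_threshold(churn_scores, weekly_capacity):
    """Cut the ranked list at the team's capacity: if the CS team can
    work K subscribers a week, flag exactly the K highest scores."""
    ranked = np.sort(np.asarray(churn_scores, dtype=float))[::-1]
    k = min(int(weekly_capacity), len(ranked))
    return float(ranked[k - 1])

# Toy illustration: a low-skewed score distribution, like churn probabilities
rng = np.random.default_rng(42)
scores = rng.beta(2, 8, size=10_000)
threshold = capacity_threshold(scores, weekly_capacity=200)
flagged = int((scores >= threshold).sum())
```

The point is that the threshold is a business parameter, not a modeling one: it moves when headcount moves, not when the AUC moves.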


Component 2: SQL Extraction

The data lives in StreamFlow's production database. Chapter 5 built the extraction query. Here it is in full context:

import pandas as pd
import sqlalchemy

# Connection to the analytics replica (never query production directly)
engine = sqlalchemy.create_engine(
    "postgresql://readonly:***@analytics-replica.streamflow.internal:5432/streamflow"
)

query = """
WITH subscriber_features AS (
    SELECT
        s.subscriber_id,
        s.plan_type,
        s.signup_date,
        s.monthly_charges,
        EXTRACT(EPOCH FROM (CURRENT_DATE::timestamp - s.signup_date::timestamp)) / 86400 AS tenure_days,
        COUNT(DISTINCT ss.session_id) AS sessions_last_30d,
        COALESCE(AVG(ss.session_duration_minutes), 0) AS avg_session_minutes,
        COALESCE(
            SUM(CASE WHEN EXTRACT(DOW FROM ss.session_start) IN (0, 6) THEN 1 ELSE 0 END)::FLOAT
            / NULLIF(COUNT(ss.session_id), 0),
            0
        ) AS weekend_ratio,
        COUNT(DISTINCT ss.content_id) AS unique_content_last_30d,
        COALESCE(MAX(ss.session_start), s.signup_date) AS last_activity_date,
        EXTRACT(EPOCH FROM (CURRENT_DATE - COALESCE(MAX(ss.session_start), s.signup_date))) / 86400
            AS days_since_last_activity,
        COUNT(DISTINCT t.ticket_id) AS support_tickets_last_90d,
        s.payment_method,
        s.contract_type,
        s.has_partner,
        s.has_dependents,
        s.age_bucket
    FROM subscribers s
    LEFT JOIN sessions ss
        ON s.subscriber_id = ss.subscriber_id
        AND ss.session_start >= CURRENT_DATE - INTERVAL '30 days'
    LEFT JOIN support_tickets t
        ON s.subscriber_id = t.subscriber_id
        AND t.created_at >= CURRENT_DATE - INTERVAL '90 days'
    WHERE s.status IN ('active', 'churned')
        AND s.signup_date <= CURRENT_DATE - INTERVAL '60 days'
    GROUP BY s.subscriber_id, s.plan_type, s.signup_date,
             s.monthly_charges, s.payment_method, s.contract_type,
             s.has_partner, s.has_dependents, s.age_bucket
),
churn_labels AS (
    SELECT
        subscriber_id,
        CASE WHEN status = 'churned'
             AND churn_date BETWEEN CURRENT_DATE - INTERVAL '60 days' AND CURRENT_DATE
             THEN 1 ELSE 0 END AS churned_60d
    FROM subscribers
)
SELECT f.*, l.churned_60d
FROM subscriber_features f
JOIN churn_labels l ON f.subscriber_id = l.subscriber_id;
"""

df_raw = pd.read_sql(query, engine)
print(f"Extracted {len(df_raw):,} rows, {df_raw.shape[1]} columns")
print(f"Churn rate: {df_raw['churned_60d'].mean():.1%}")

Extraction Decisions and Their Consequences

| Decision | Rationale | Chapter Reference |
| --- | --- | --- |
| Analytics replica, not production DB | Never risk production performance for analytics queries | Ch 5 |
| 30-day session window | Balances recency with stability; matches business reporting cadence | Ch 6 |
| 90-day support ticket window | Longer window because support interactions are sparse | Ch 8 |
| 60-day churn label window | Matches the business question; gives CS team time to intervene | Ch 1 |
| LEFT JOIN on sessions | Preserves subscribers with zero sessions (important signal) | Ch 5, 8 |
| NULLIF for division | Prevents division by zero for inactive subscribers | Ch 5 |

Real World vs. Kaggle --- In a Kaggle competition, someone hands you a CSV. In production, the extraction query is part of the model. If the query changes, the model's input distribution changes. Version your SQL alongside your model code.
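One lightweight way to version the SQL alongside the model (an illustrative pattern, not the chapter's required tooling) is to log a fingerprint of the query text with every training run, e.g. as an MLflow parameter:

```python
import hashlib

def query_fingerprint(sql_text: str) -> str:
    """Hash the whitespace-normalized query text so any change to the
    extraction logic produces a new fingerprint to log with the run."""
    normalized = " ".join(sql_text.split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:12]

# Pure reformatting leaves the fingerprint unchanged; a logic change does not
a = query_fingerprint("SELECT subscriber_id FROM subscribers WHERE status = 'active'")
b = query_fingerprint("SELECT subscriber_id\n  FROM subscribers\n  WHERE status = 'active'")
```

When a production prediction looks wrong six months later, the fingerprint tells you immediately whether the extraction query the model was trained against is the one currently running.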


Component 3: Feature Engineering Pipeline

Chapter 6 built the features. Chapter 10 made the pipeline reproducible. Here they come together as a single scikit-learn pipeline that transforms raw extraction output into model-ready features.

import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, FunctionTransformer
from sklearn.impute import SimpleImputer

# Feature definitions
NUMERIC_FEATURES = [
    "tenure_days", "monthly_charges", "sessions_last_30d",
    "avg_session_minutes", "weekend_ratio", "unique_content_last_30d",
    "days_since_last_activity", "support_tickets_last_90d",
]

CATEGORICAL_FEATURES = [
    "plan_type", "payment_method", "contract_type",
    "has_partner", "has_dependents", "age_bucket",
]

ENGINEERED_FEATURES = [
    "engagement_intensity",    # sessions_last_30d / tenure_days
    "content_diversity_ratio", # unique_content / sessions (how much they explore)
    "recency_score",           # 1 / (1 + days_since_last_activity)
    "support_burden",          # support_tickets / tenure_months
    "charges_per_session",     # monthly_charges / max(sessions_last_30d, 1)
]

def create_engineered_features(df):
    """Feature engineering from Chapter 6."""
    df = df.copy()
    df["engagement_intensity"] = df["sessions_last_30d"] / np.maximum(df["tenure_days"], 1)
    df["content_diversity_ratio"] = (
        df["unique_content_last_30d"] / np.maximum(df["sessions_last_30d"], 1)
    )
    df["recency_score"] = 1.0 / (1.0 + df["days_since_last_activity"])
    df["support_burden"] = (
        df["support_tickets_last_90d"] / np.maximum(df["tenure_days"] / 30, 1)
    )
    df["charges_per_session"] = (
        df["monthly_charges"] / np.maximum(df["sessions_last_30d"], 1)
    )
    return df

# Full preprocessing pipeline
numeric_pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler()),
])

categorical_pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
])

preprocessor = ColumnTransformer([
    ("numeric", numeric_pipeline, NUMERIC_FEATURES + ENGINEERED_FEATURES),
    ("categorical", categorical_pipeline, CATEGORICAL_FEATURES),
])

Why a Pipeline, Not a Script

The pipeline is a single serializable object that encodes every preprocessing decision. When we deploy the model (Component 7), the prediction endpoint loads the pipeline and the model together. The pipeline guarantees that the same transformations applied during training are applied during inference. No copy-paste errors. No "I forgot to scale the features." No divergence between the training notebook and the production code.

Reproducibility --- If your training code and your inference code apply different transformations to the same features, your model is wrong in production even if it was right in the notebook. The pipeline prevents this. Serialize it. Version it. Test it.
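A cheap guard that makes "serialize it, test it" concrete is a round-trip test: fit the pipeline, serialize it, reload it, and assert that both objects transform the same input identically. A minimal sketch with a toy stand-in for the numeric pipeline (the column names are the chapter's; the data is fabricated):

```python
import io
import joblib
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Toy stand-in for the chapter's numeric pipeline
pipe = Pipeline([
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler()),
])
X = pd.DataFrame({
    "tenure_days": [10.0, 200.0, np.nan, 45.0],
    "sessions_last_30d": [3.0, 40.0, 0.0, np.nan],
})
pipe.fit(X)

# Round-trip through serialization, exactly as deployment would
buf = io.BytesIO()
joblib.dump(pipe, buf)
buf.seek(0)
reloaded = joblib.load(buf)

# Training-time and serving-time transforms must match bit for bit
match = np.allclose(pipe.transform(X), reloaded.transform(X))
```

Run this in CI against the real pipeline artifact and the "it worked in the notebook" class of bug disappears.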


Component 4: Model Training with Experiment Tracking

Chapter 14 built the gradient boosting model. Chapter 18 tuned its hyperparameters. Chapter 30 tracked the experiments. Here is the integrated training script:

import lightgbm as lgb
import mlflow
import mlflow.lightgbm
import mlflow.sklearn
from sklearn.model_selection import train_test_split

# Experiment setup
mlflow.set_experiment("streamflow-churn-v2")

# Prepare data
df_engineered = create_engineered_features(df_raw)
X = df_engineered[NUMERIC_FEATURES + ENGINEERED_FEATURES + CATEGORICAL_FEATURES]
y = df_engineered["churned_60d"]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Apply preprocessing
X_train_processed = preprocessor.fit_transform(X_train)
X_test_processed = preprocessor.transform(X_test)

# Best hyperparameters from Chapter 18
params = {
    "objective": "binary",
    "metric": "auc",
    "learning_rate": 0.05,
    "num_leaves": 31,
    "max_depth": 6,
    "min_child_samples": 20,
    "subsample": 0.8,
    "colsample_bytree": 0.8,
    "reg_alpha": 0.1,
    "reg_lambda": 1.0,
    "n_estimators": 2000,
    "random_state": 42,
    "verbose": -1,
}

with mlflow.start_run(run_name="production-candidate-v2") as run:
    # Log parameters
    mlflow.log_params(params)
    mlflow.log_param("n_features", X_train_processed.shape[1])
    mlflow.log_param("n_train_samples", X_train_processed.shape[0])
    mlflow.log_param("churn_rate_train", y_train.mean())

    # Train with early stopping
    model = lgb.LGBMClassifier(**params)
    model.fit(
        X_train_processed, y_train,
        eval_set=[(X_test_processed, y_test)],
        callbacks=[
            lgb.early_stopping(stopping_rounds=50),
            lgb.log_evaluation(period=100),
        ],
    )

    # Evaluate
    from sklearn.metrics import roc_auc_score, average_precision_score, log_loss

    y_proba = model.predict_proba(X_test_processed)[:, 1]

    metrics = {
        "test_auc": roc_auc_score(y_test, y_proba),
        "test_avg_precision": average_precision_score(y_test, y_proba),
        "test_log_loss": log_loss(y_test, y_proba),
        "best_iteration": model.best_iteration_,
    }
    mlflow.log_metrics(metrics)

    # Log model and preprocessor
    mlflow.lightgbm.log_model(model, "model")
    mlflow.sklearn.log_model(preprocessor, "preprocessor")

    # Log the run ID for downstream components
    print(f"Run ID: {run.info.run_id}")
    for name, value in metrics.items():
        print(f"  {name}: {value:.4f}")

The Experiment Tracking Decision

Every training run is logged. Every hyperparameter combination, every metric, every model artifact. When the VP asks "why did we choose this model?" you can pull up the MLflow UI, show the 47 runs you tried, and point to the one with the best AUC-precision-recall tradeoff at the business-optimal threshold. This is not busywork. This is how you build trust with technical reviewers and maintain your own sanity six months from now.

Model Decay --- The model you deploy today will degrade. When you retrain in three months, experiment tracking lets you compare the new model against the old one on the same metrics. Without it, you are starting from scratch every time.


Component 5: Model Interpretation with SHAP

Chapter 19 covered SHAP values. In the capstone, interpretation serves two purposes: debugging the model (for you) and explaining predictions (for the customer success team).

import shap

# Global interpretation: what features matter most?
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_test_processed)

# Feature names after preprocessing
feature_names = (
    NUMERIC_FEATURES + ENGINEERED_FEATURES +
    list(preprocessor.named_transformers_["categorical"]
         .named_steps["encoder"]
         .get_feature_names_out(CATEGORICAL_FEATURES))
)

# Summary plot (global)
shap.summary_plot(shap_values, X_test_processed, feature_names=feature_names, show=False)

# Log to MLflow
import matplotlib.pyplot as plt
plt.savefig("shap_summary.png", dpi=150, bbox_inches="tight")
mlflow.log_artifact("shap_summary.png")
plt.close()

Per-Prediction Explanations

The customer success team does not want a global feature importance chart. They want to know, for each subscriber on the high-risk list, why the model thinks this person will churn.

def explain_prediction(subscriber_features, model, explainer, feature_names, top_k=3):
    """Generate human-readable explanation for a single prediction."""
    shap_vals = explainer.shap_values(subscriber_features)
    proba = model.predict_proba(subscriber_features)[:, 1][0]

    # Get top contributing features
    feature_contributions = list(zip(feature_names, shap_vals[0]))
    feature_contributions.sort(key=lambda x: abs(x[1]), reverse=True)

    explanations = []
    for feat_name, shap_val in feature_contributions[:top_k]:
        direction = "increases" if shap_val > 0 else "decreases"
        explanations.append({
            "feature": feat_name,
            "shap_value": round(float(shap_val), 4),
            "direction": direction,
            "impact": "high" if abs(shap_val) > 0.5 else "moderate",
        })

    return {
        "churn_probability": round(float(proba), 4),
        "risk_level": "high" if proba >= 0.20 else "medium" if proba >= 0.10 else "low",
        "top_reasons": explanations,
    }

This function powers the API response. When the customer success team queries a subscriber, they get a probability, a risk level, and the three features most responsible for that prediction. "This subscriber is high risk because: (1) their session frequency dropped 60% in the last 30 days, (2) they filed two support tickets in the last month, and (3) they are on a month-to-month contract." That is actionable. An AUC score is not.


Component 6: Fairness Audit

Chapter 33 built the fairness audit. In the capstone, we run it as a gate before deployment. The model does not go to production until the fairness audit passes.

from sklearn.metrics import roc_auc_score, precision_score, recall_score

def fairness_audit(y_true, y_proba, sensitive_attribute, threshold=0.20):
    """
    Audit model fairness across groups defined by a sensitive attribute.
    Returns per-group metrics and disparity ratios.
    """
    y_pred = (y_proba >= threshold).astype(int)
    groups = sorted(sensitive_attribute.unique())

    results = {}
    for group in groups:
        mask = sensitive_attribute == group
        if mask.sum() < 50:  # Skip groups too small for reliable metrics
            continue
        results[group] = {
            "n": int(mask.sum()),
            "base_rate": float(y_true[mask].mean()),
            "positive_rate": float(y_pred[mask].mean()),
            "auc": float(roc_auc_score(y_true[mask], y_proba[mask])),
            "precision": float(precision_score(y_true[mask], y_pred[mask], zero_division=0)),
            "recall": float(recall_score(y_true[mask], y_pred[mask], zero_division=0)),
        }

    # Compute disparity ratios (each group vs. overall)
    overall_positive_rate = y_pred.mean()
    overall_recall = recall_score(y_true, y_pred, zero_division=0)

    for group in results:
        results[group]["demographic_parity_ratio"] = (
            results[group]["positive_rate"] / max(overall_positive_rate, 1e-10)
        )
        results[group]["equalized_odds_ratio"] = (
            results[group]["recall"] / max(overall_recall, 1e-10)
        )

    return results

# Run the audit
audit_results = fairness_audit(
    y_true=y_test,
    y_proba=y_proba,
    sensitive_attribute=X_test["age_bucket"],
    threshold=0.20,
)

# Check: is the demographic parity ratio within [0.80, 1.25] for all groups?
for group, metrics in audit_results.items():
    dpr = metrics["demographic_parity_ratio"]
    status = "PASS" if 0.80 <= dpr <= 1.25 else "FAIL"
    print(f"  {group}: DPR = {dpr:.3f} [{status}]")

Tradeoffs --- The 0.80 threshold for demographic parity ratio is a policy decision, not a mathematical constant. Chapter 33 discussed why. The point is that you have a threshold, you document it, and you enforce it before deployment. If the audit fails, you go back to Component 3 (feature engineering) or Component 4 (training) and fix the problem. You do not deploy a model that systematically disadvantages a subgroup and hope nobody notices.
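The gate itself can be a few lines that halt the deployment script. A sketch, assuming the dict returned by fairness_audit above and the [0.80, 1.25] policy band (the function name and example numbers are illustrative):

```python
def deployment_gate(audit_results, lower=0.80, upper=1.25):
    """Block deployment when any group's demographic parity ratio falls
    outside the policy band. Expects the dict from fairness_audit."""
    failures = {
        group: round(metrics["demographic_parity_ratio"], 3)
        for group, metrics in audit_results.items()
        if not (lower <= metrics["demographic_parity_ratio"] <= upper)
    }
    if failures:
        raise RuntimeError(f"Fairness gate failed: {failures}")
    return True

# Illustrative audit outputs (hypothetical numbers)
passing = {"18-25": {"demographic_parity_ratio": 0.95},
           "65+": {"demographic_parity_ratio": 1.10}}
failing = {"18-25": {"demographic_parity_ratio": 0.62}}
```

Raising an exception, rather than printing a warning, is deliberate: a warning can be ignored under deadline pressure; a failed CI job cannot.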


Component 7: FastAPI Deployment

Chapter 31 built the API. Here is the full production endpoint, integrated with the preprocessing pipeline, model, and SHAP explainer:

# app.py
import joblib
import numpy as np
import shap
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import List, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="StreamFlow Churn Prediction API",
    version="2.0.0",
    description="Predicts subscriber churn probability with SHAP explanations.",
)

# Load artifacts at startup
preprocessor = joblib.load("artifacts/preprocessor.joblib")
model = joblib.load("artifacts/model.joblib")
explainer = shap.TreeExplainer(model)
feature_names = joblib.load("artifacts/feature_names.joblib")


class SubscriberFeatures(BaseModel):
    """Input schema: raw subscriber features from the database."""
    tenure_days: float = Field(..., ge=0, description="Days since signup")
    monthly_charges: float = Field(..., ge=0, description="Monthly subscription cost")
    sessions_last_30d: int = Field(..., ge=0, description="Session count, last 30 days")
    avg_session_minutes: float = Field(..., ge=0, description="Average session length")
    weekend_ratio: float = Field(..., ge=0, le=1, description="Fraction of weekend sessions")
    unique_content_last_30d: int = Field(..., ge=0, description="Unique content items viewed")
    days_since_last_activity: float = Field(..., ge=0, description="Days since last session")
    support_tickets_last_90d: int = Field(..., ge=0, description="Support tickets, last 90 days")
    plan_type: str = Field(..., description="Subscription plan: basic/standard/premium/family")
    payment_method: str
    contract_type: str
    has_partner: str
    has_dependents: str
    age_bucket: str


class ShapExplanation(BaseModel):
    feature: str
    shap_value: float
    direction: str
    impact: str


class PredictionResponse(BaseModel):
    subscriber_id: Optional[str] = None
    churn_probability: float
    risk_level: str
    top_reasons: List[ShapExplanation]
    model_version: str = "2.0.0"


@app.get("/health")
def health_check():
    return {"status": "healthy", "model_version": "2.0.0"}


@app.post("/predict", response_model=PredictionResponse)
def predict(features: SubscriberFeatures):
    try:
        # Convert to DataFrame (matches pipeline expectations)
        import pandas as pd
        df = pd.DataFrame([features.model_dump()])

        # Engineer features (duplicated from create_engineered_features in
        # training; any divergence between the two reintroduces train/serve skew)
        df["engagement_intensity"] = df["sessions_last_30d"] / np.maximum(df["tenure_days"], 1)
        df["content_diversity_ratio"] = (
            df["unique_content_last_30d"] / np.maximum(df["sessions_last_30d"], 1)
        )
        df["recency_score"] = 1.0 / (1.0 + df["days_since_last_activity"])
        df["support_burden"] = (
            df["support_tickets_last_90d"] / np.maximum(df["tenure_days"] / 30, 1)
        )
        df["charges_per_session"] = (
            df["monthly_charges"] / np.maximum(df["sessions_last_30d"], 1)
        )

        # Preprocess
        X_processed = preprocessor.transform(df)

        # Predict
        proba = float(model.predict_proba(X_processed)[:, 1][0])

        # Explain
        shap_vals = explainer.shap_values(X_processed)
        contributions = sorted(
            zip(feature_names, shap_vals[0]),
            key=lambda x: abs(x[1]),
            reverse=True,
        )[:3]

        explanations = [
            ShapExplanation(
                feature=name,
                shap_value=round(float(val), 4),
                direction="increases" if val > 0 else "decreases",
                impact="high" if abs(val) > 0.5 else "moderate",
            )
            for name, val in contributions
        ]

        risk_level = "high" if proba >= 0.20 else "medium" if proba >= 0.10 else "low"

        logger.info(f"Prediction: probability={proba:.4f}, risk={risk_level}")

        return PredictionResponse(
            churn_probability=round(proba, 4),
            risk_level=risk_level,
            top_reasons=explanations,
        )

    except Exception as e:
        logger.error(f"Prediction failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))

Deployment Architecture

graph LR
    subgraph "Docker Container"
        A[FastAPI App] --> B[Preprocessor<br/>joblib]
        A --> C[LightGBM Model<br/>joblib]
        A --> D[SHAP Explainer]
    end
    E[Customer Success<br/>Dashboard] -->|HTTP POST /predict| A
    F[Batch Scoring<br/>Cron Job] -->|HTTP POST /predict| A
    G[Monitoring<br/>Service] -->|HTTP GET /health| A
    A -->|Logs| H[Logging Service]

Two consumers. The customer success dashboard makes real-time predictions when a team member views a subscriber profile. The batch scoring job runs nightly, scores all active subscribers, and writes the results to a database table that powers the weekly "high-risk" list. Both use the same endpoint. Same model, same preprocessing, same explanations.


Component 8: Monitoring Dashboard

Chapter 32 built the monitoring system. In the capstone, monitoring covers three concerns: model performance, data drift, and business impact.

import pandas as pd
import numpy as np
from scipy import stats
from datetime import datetime, timedelta


class ChurnModelMonitor:
    """
    Production monitoring for the StreamFlow churn model.
    Tracks data drift (PSI), performance (AUC on labeled data),
    and business metrics (intervention outcomes).
    """

    def __init__(self, reference_data: pd.DataFrame, feature_columns: list):
        self.reference_data = reference_data
        self.feature_columns = feature_columns
        self.alerts = []

    def compute_psi(self, production_data: pd.DataFrame, n_bins: int = 10) -> dict:
        """Compute Population Stability Index for each feature."""
        psi_results = {}
        for col in self.feature_columns:
            if production_data[col].dtype in ["object", "category"]:
                continue  # PSI for numeric features only

            ref = self.reference_data[col].dropna().values
            prod = production_data[col].dropna().values

            # Create bins from reference distribution
            bins = np.percentile(ref, np.linspace(0, 100, n_bins + 1))
            bins[0] = -np.inf
            bins[-1] = np.inf

            ref_counts = np.histogram(ref, bins=bins)[0] / len(ref)
            prod_counts = np.histogram(prod, bins=bins)[0] / len(prod)

            # Avoid division by zero
            ref_counts = np.clip(ref_counts, 1e-4, None)
            prod_counts = np.clip(prod_counts, 1e-4, None)

            psi = float(np.sum((prod_counts - ref_counts) * np.log(prod_counts / ref_counts)))
            psi_results[col] = {
                "psi": round(psi, 4),
                "status": "OK" if psi < 0.10 else "WARNING" if psi < 0.25 else "ALERT",
            }

            if psi >= 0.25:
                self.alerts.append({
                    "type": "data_drift",
                    "feature": col,
                    "psi": psi,
                    "timestamp": datetime.now().isoformat(),
                })

        return psi_results

    def compute_performance(self, y_true, y_proba, threshold=0.20) -> dict:
        """Compute production performance metrics (requires labeled data)."""
        from sklearn.metrics import roc_auc_score, precision_score, recall_score

        y_pred = (y_proba >= threshold).astype(int)

        metrics = {
            "auc": round(float(roc_auc_score(y_true, y_proba)), 4),
            "precision": round(float(precision_score(y_true, y_pred, zero_division=0)), 4),
            "recall": round(float(recall_score(y_true, y_pred, zero_division=0)), 4),
            "positive_rate": round(float(y_pred.mean()), 4),
            "actual_churn_rate": round(float(y_true.mean()), 4),
            "n_samples": int(len(y_true)),
            "timestamp": datetime.now().isoformat(),
        }

        # Alert if AUC drops below threshold
        if metrics["auc"] < 0.80:
            self.alerts.append({
                "type": "performance_decay",
                "metric": "auc",
                "value": metrics["auc"],
                "threshold": 0.80,
                "timestamp": datetime.now().isoformat(),
            })

        return metrics

    def compute_business_metrics(self, interventions_df: pd.DataFrame) -> dict:
        """
        Compute business impact from the CRM intervention data.
        interventions_df columns: subscriber_id, predicted_risk, intervened,
                                  retained_60d, plan_value_monthly
        """
        intervened = interventions_df[interventions_df["intervened"]]
        not_intervened = interventions_df[~interventions_df["intervened"]]

        retention_rate_intervened = intervened["retained_60d"].mean()
        retention_rate_control = not_intervened["retained_60d"].mean()

        lift = retention_rate_intervened - retention_rate_control

        # Revenue attributable to the intervention: only the lift over the
        # control group's retention rate counts, annualized at the average plan value
        revenue_saved = (
            len(intervened)
            * lift
            * intervened["plan_value_monthly"].mean()
            * 12  # annualized
        )

        return {
            "n_interventions": int(len(intervened)),
            "retention_rate_intervened": round(float(retention_rate_intervened), 4),
            "retention_rate_control": round(float(retention_rate_control), 4),
            "lift": round(float(lift), 4),
            "estimated_annual_revenue_saved": round(float(revenue_saved), 2),
            "timestamp": datetime.now().isoformat(),
        }

    def get_alerts(self) -> list:
        return self.alerts

When to Retrain

The monitoring system triggers a retrain under three conditions:

  1. Data drift: PSI > 0.25 for any of the top 5 features (by SHAP importance)
  2. Performance decay: AUC drops below 0.80 on two consecutive weekly evaluations
  3. Prior shift: The observed churn rate changes by more than 3 percentage points from the training churn rate

When any of these triggers fire, the system sends an alert. A human reviews the alert, diagnoses the cause, and decides whether to retrain with fresh data, re-engineer features, or investigate a business change that invalidated the model's assumptions.
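The three conditions are simple enough to encode directly in the monitoring job. A hedged sketch of the detection logic (the helper name and inputs are illustrative, not the chapter's API):

```python
def retrain_triggers(top_feature_psi, weekly_aucs, train_churn_rate, observed_churn_rate):
    """Evaluate the three retrain conditions from the text.
    top_feature_psi: {feature: psi} for the top-5 features by SHAP importance
    weekly_aucs: recent weekly AUC evaluations, newest last"""
    fired = []
    if any(psi > 0.25 for psi in top_feature_psi.values()):
        fired.append("data_drift")
    if len(weekly_aucs) >= 2 and all(auc < 0.80 for auc in weekly_aucs[-2:]):
        fired.append("performance_decay")
    if abs(observed_churn_rate - train_churn_rate) > 0.03:
        fired.append("prior_shift")
    return fired

alerts = retrain_triggers(
    top_feature_psi={"recency_score": 0.31, "sessions_last_30d": 0.08},
    weekly_aucs=[0.84, 0.82],
    train_churn_rate=0.12,
    observed_churn_rate=0.13,
)
# alerts -> ["data_drift"]
```

Note what the function does not do: it never calls a training script. It only names the conditions; a human owns the response.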

Practitioner Note --- Automatic retraining sounds appealing. It is also dangerous. A model that retrains on drifted data without human review can learn to reproduce whatever caused the drift --- including bugs in the data pipeline, one-time promotional events, or data quality issues. Always have a human in the loop for the retraining decision. Automate the detection. Keep the decision manual.


Component 9: ROI Analysis

Chapter 34 built the business case. In the capstone, the ROI analysis is not a one-time calculation. It is a monthly report that the monitoring dashboard produces automatically.

def compute_monthly_roi(
    n_subscribers: int = 10000,
    churn_rate: float = 0.12,
    model_recall: float = 0.72,
    model_precision: float = 0.70,
    intervention_success_rate: float = 0.55,
    avg_monthly_value: float = 16.50,
    avg_subscriber_lifetime_months: float = 24,
    intervention_cost_per_subscriber: float = 12.00,
    infrastructure_cost_monthly: float = 2500.00,
) -> dict:
    """
    Compute the monthly ROI of the churn prediction system.
    All assumptions are explicit and adjustable for sensitivity analysis.
    """
    # Subscribers at risk
    churners = int(n_subscribers * churn_rate)
    non_churners = n_subscribers - churners

    # Model predictions
    true_positives = int(churners * model_recall)
    false_negatives = churners - true_positives
    flagged = int(true_positives / max(model_precision, 0.01))
    false_positives = flagged - true_positives

    # Intervention outcomes
    retained_by_intervention = int(true_positives * intervention_success_rate)
    remaining_lifetime_value = avg_monthly_value * avg_subscriber_lifetime_months

    # Revenue
    revenue_saved = retained_by_intervention * remaining_lifetime_value

    # Costs
    intervention_cost = flagged * intervention_cost_per_subscriber
    total_cost = intervention_cost + infrastructure_cost_monthly

    # Net value
    net_monthly_value = revenue_saved - total_cost

    return {
        "churners_detected": true_positives,
        "false_alarms": false_positives,
        "churners_missed": false_negatives,
        "retained_by_intervention": retained_by_intervention,
        "revenue_saved": round(revenue_saved, 2),
        "intervention_cost": round(intervention_cost, 2),
        "infrastructure_cost": round(infrastructure_cost_monthly, 2),
        "total_cost": round(total_cost, 2),
        "net_monthly_value": round(net_monthly_value, 2),
        "roi_ratio": round(revenue_saved / max(total_cost, 1), 2),
    }


roi = compute_monthly_roi()
print("Monthly ROI Report")
print("=" * 50)
for key, value in roi.items():
    if key == "roi_ratio":
        # The ROI ratio is dimensionless; do not print it as a dollar amount.
        print(f"  {key:.<40} {value:>12,.2f}x")
    elif isinstance(value, float):
        print(f"  {key:.<40} ${value:>12,.2f}")
    else:
        print(f"  {key:.<40} {value:>12,}")
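Because every assumption is an explicit parameter, a one-way sensitivity analysis is a short loop. The helper below is a condensed restatement of compute_monthly_roi's net-value arithmetic (same defaults, same formulas) so the sketch runs on its own:

```python
def net_monthly_value(intervention_success_rate: float,
                      n_subscribers: int = 10000,
                      churn_rate: float = 0.12,
                      recall: float = 0.72,
                      precision: float = 0.70,
                      avg_monthly_value: float = 16.50,
                      avg_lifetime_months: float = 24,
                      cost_per_subscriber: float = 12.00,
                      infra_cost: float = 2500.00) -> float:
    """Condensed copy of compute_monthly_roi's net-value calculation."""
    true_positives = int(n_subscribers * churn_rate * recall)
    flagged = int(true_positives / max(precision, 0.01))
    retained = int(true_positives * intervention_success_rate)
    revenue_saved = retained * avg_monthly_value * avg_lifetime_months
    total_cost = flagged * cost_per_subscriber + infra_cost
    return revenue_saved - total_cost

# One-way sensitivity: vary the least-validated assumption, hold the rest fixed.
for rate in (0.25, 0.40, 0.55, 0.70):
    print(f"success_rate={rate:.2f}  net=${net_monthly_value(rate):>12,.2f}")
```

If the net value stays positive across the plausible range of the assumption, the business case is robust to that assumption; if it flips sign, that assumption is the one to validate first.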

The Complete Architecture: What Connects to What

Here is the full system, with every component numbered and every data flow labeled:

graph TD
    subgraph "Data Layer"
        DB[(Production DB)]
        AR[(Analytics Replica)]
        DB -->|Replication| AR
    end

    subgraph "Feature Pipeline"
        SQL[SQL Extraction<br/>Component 2]
        FE[Feature Engineering<br/>Component 3]
        AR --> SQL
        SQL --> FE
    end

    subgraph "Training Pipeline"
        TRAIN[Model Training<br/>Component 4]
        MLFLOW[MLflow<br/>Experiment Tracking]
        SHAP_GLOBAL[SHAP Global<br/>Component 5]
        FAIR[Fairness Audit<br/>Component 6]
        FE --> TRAIN
        TRAIN --> MLFLOW
        TRAIN --> SHAP_GLOBAL
        TRAIN --> FAIR
    end

    subgraph "Serving Layer"
        API[FastAPI Endpoint<br/>Component 7]
        DOCKER[Docker Container]
        API --> DOCKER
    end

    subgraph "Monitoring Layer"
        MON[Drift + Performance<br/>Component 8]
        BIZ[Business Metrics<br/>Component 9]
        ALERT[Alert System]
        MON --> ALERT
        BIZ --> ALERT
    end

    subgraph "Consumer Layer"
        CS[Customer Success<br/>Dashboard]
        BATCH[Nightly Batch<br/>Scoring]
        EXEC[Executive<br/>ROI Report]
    end

    MLFLOW -->|Promote Model| API
    FAIR -->|Gate: Pass/Fail| API
    DOCKER --> CS
    DOCKER --> BATCH
    BATCH --> MON
    MON --> BIZ
    BIZ --> EXEC
    ALERT -->|Retrain Signal| TRAIN

Architectural Decisions Log

Every system has decisions that are invisible in the code but critical in practice. Document them.

| Decision | Options Considered | Chosen | Rationale |
|---|---|---|---|
| Serving mode | Batch only vs. real-time API | Both | Dashboard needs real-time; weekly list needs batch |
| Model framework | XGBoost vs. LightGBM | LightGBM | Faster training, native categorical support, comparable AUC |
| Deployment | Lambda vs. ECS vs. EC2 | Docker on ECS | Consistent environment, auto-scaling, moderate traffic |
| Monitoring frequency | Hourly vs. daily vs. weekly | Daily drift, weekly performance | Daily PSI is cheap; weekly performance needs labeled data |
| Retraining trigger | Automatic vs. manual | Manual with automated alerts | Avoids silent model degradation from pipeline bugs |
| Fairness gate | Advisory vs. blocking | Blocking | Model does not deploy if fairness audit fails |
| SHAP computation | Real-time vs. cached | Real-time (TreeExplainer) | Fast enough for single predictions; batch uses cached |
| Threshold | Fixed vs. dynamic | Fixed (0.20), reviewed quarterly | Stability for CS team; avoids confusion from changing lists |
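The "daily PSI is cheap" rationale is easy to verify: PSI is a few lines of arithmetic over binned counts. A minimal pure-Python sketch, assuming both distributions are binned with the same edges (taken from the training data):

```python
import math

def psi(expected_counts: list, actual_counts: list) -> float:
    """Population Stability Index over pre-binned counts.

    expected_counts: bin counts from the training (reference) data
    actual_counts:   bin counts from recent production data, same bin edges
    """
    eps = 1e-6  # guard against empty bins
    e_total, a_total = sum(expected_counts), sum(actual_counts)
    value = 0.0
    for e, a in zip(expected_counts, actual_counts):
        e_pct = max(e / e_total, eps)
        a_pct = max(a / a_total, eps)
        value += (a_pct - e_pct) * math.log(a_pct / e_pct)
    return value

# Identical distributions give PSI near 0; a shifted distribution gives more.
baseline = [120, 300, 480, 300, 120]
drifted  = [ 40, 180, 400, 420, 280]
print(f"no drift: {psi(baseline, baseline):.4f}")
print(f"drifted:  {psi(baseline, drifted):.4f}")
```

A common convention treats PSI below 0.1 as stable, 0.1 to 0.2 as worth watching, and above 0.2 as significant drift; the thresholds are heuristics, not statistical tests.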

The Three Capstone Tracks

Not every reader needs to build every component. Here are three tracks calibrated to different goals:

Track A: Minimal Viable System

Goal: A working end-to-end system that demonstrates the ML lifecycle.

Components: SQL extraction, feature engineering, model training, evaluation, SHAP interpretation, FastAPI endpoint, basic health check.

Deliverables:

  • A Jupyter notebook with the full pipeline (data to model)
  • A FastAPI app with /predict and /health endpoints
  • A one-page summary of the model's business value

Time estimate: 15--20 hours.

Track B: Full System with Monitoring

Goal: A production-ready system with experiment tracking, fairness audit, monitoring, and ROI analysis.

Components: Everything in Track A, plus MLflow experiment tracking, fairness audit, monitoring dashboard (PSI + performance), monthly ROI report.

Deliverables:

  • Everything in Track A
  • MLflow experiment log with at least 5 runs
  • Fairness audit report showing per-group metrics
  • A monitoring script that computes PSI and performance on simulated production data
  • A Dockerfile that packages the API

Time estimate: 30--40 hours.

Track C: Extended System with A/B Test Design and Fairness Analysis

Goal: The complete system, plus the experimental and ethical analysis that would be required in a real deployment.

Components: Everything in Track B, plus an A/B test design for the intervention program, a detailed fairness analysis across multiple protected attributes, a stakeholder presentation deck, and a written retrospective.

Deliverables:

  • Everything in Track B
  • An A/B test design document: hypothesis, primary metric, sample size calculation, runtime estimate, guardrail metrics
  • A multi-attribute fairness analysis (age, plan type, geography) with documentation of any disparities and mitigation steps
  • A 10-slide stakeholder presentation following the Pyramid Principle (Chapter 34)
  • A 2-page written retrospective answering: "What worked, what didn't, and what I'd do differently"

Time estimate: 50--60 hours.
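The sample-size calculation required by the Track C A/B test deliverable can be sketched with the standard two-proportion z-test formula, using only the standard library. The churn rates below are illustrative placeholders, not measured StreamFlow values:

```python
import math
from statistics import NormalDist

def two_proportion_sample_size(p_control: float, p_treatment: float,
                               alpha: float = 0.05, power: float = 0.80) -> int:
    """Per-group sample size for a two-sided two-proportion z-test.

    n = (z_{1-a/2} * sqrt(2*pbar*(1-pbar))
         + z_{power} * sqrt(p1*(1-p1) + p2*(1-p2)))^2 / (p1 - p2)^2
    """
    z_alpha = NormalDist().inv_cdf(1 - alpha / 2)
    z_beta = NormalDist().inv_cdf(power)
    p_bar = (p_control + p_treatment) / 2
    numerator = (z_alpha * math.sqrt(2 * p_bar * (1 - p_bar))
                 + z_beta * math.sqrt(p_control * (1 - p_control)
                                      + p_treatment * (1 - p_treatment))) ** 2
    return math.ceil(numerator / (p_control - p_treatment) ** 2)

# Hypothetical effect: the intervention cuts churn among flagged subscribers
# from 55% to 45% (illustrative numbers for the design document).
print(two_proportion_sample_size(0.55, 0.45))
```

The runtime estimate then follows from dividing the required sample size by the number of flagged subscribers per week; halving the detectable effect roughly quadruples the required sample.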


The Retrospective: What I Would Do Differently

This is the most important section of the chapter. Not because it contains new technical content, but because it contains the kind of reflection that turns a project into learning.

Here is the honest retrospective for the StreamFlow system:

What Worked

  1. Starting with the business question. Defining the prediction target, the consumer, and the success metric before writing any code kept the project focused. We never built a feature that nobody would use.

  2. The preprocessing pipeline. Serializing the pipeline with the model eliminated the train-serve skew that kills most production models. The same preprocessor.transform() runs in training and in the API.

  3. SHAP explanations in the API response. The customer success team trusts the model because they can see why it flagged a subscriber. "Low session frequency" is something they can act on. A probability is not.

  4. The fairness gate. Making fairness a blocking condition for deployment forced us to deal with age-based disparities during development, not after a PR crisis.

What Didn't Work

  1. The 60-day churn window is too long. By the time the model predicts a subscriber will churn in the next 60 days, many have already mentally decided to leave. A 30-day window with more recent features would catch subscribers earlier, when intervention has a higher success rate. We kept 60 days because the labeled data was available at that window, not because it was optimal.

  2. Feature engineering was manual. The five engineered features were hand-crafted based on domain intuition. We never systematically tested whether they improved the model versus raw features alone. A proper ablation study would have taken two days and might have eliminated two features.

  3. Monitoring requires labeled data that arrives late. Performance monitoring (AUC, precision, recall) requires knowing which subscribers actually churned. That label is only available 60 days after the prediction. Drift monitoring (PSI) works in real-time, but it is a proxy for performance degradation, not a direct measurement. The 60-day label delay means the model could be degrading for two months before we have evidence.

  4. The ROI calculation has three assumptions that nobody validated. Intervention success rate (55%), average subscriber lifetime (24 months), and attribution (all retained subscribers are attributed to the model). Each of these could be off by a factor of two. A proper A/B test of the intervention program would resolve the attribution question, but it has not been run.
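The ablation study mentioned in point 2 takes only a few lines with scikit-learn. This sketch uses synthetic data with one genuinely useful engineered feature (an interaction a linear model cannot learn from the raw columns) and one that is pure noise; the data and feature names are illustrative, not StreamFlow's:

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

rng = np.random.default_rng(42)

# Synthetic stand-in: the label depends on an interaction of two raw features,
# which a linear model cannot recover from the raw columns alone.
n = 2000
X_raw = rng.normal(size=(n, 6))
interaction = X_raw[:, 0] * X_raw[:, 1]
y = (interaction + rng.normal(scale=0.5, size=n) > 0).astype(int)

eng_informative = interaction + rng.normal(scale=0.3, size=n)  # useful feature
eng_noise = rng.normal(size=n)                                 # useless feature

def cv_auc(X: np.ndarray) -> float:
    """Mean 5-fold cross-validated AUC for a logistic regression baseline."""
    return cross_val_score(LogisticRegression(max_iter=1000), X, y,
                           cv=5, scoring="roc_auc").mean()

# Ablation: add each engineered feature to the raw set and measure the lift.
auc_base = cv_auc(X_raw)
auc_informative = cv_auc(np.column_stack([X_raw, eng_informative]))
auc_noise = cv_auc(np.column_stack([X_raw, eng_noise]))
print(f"raw only:          {auc_base:.3f}")
print(f"+ informative eng: {auc_informative:.3f}")
print(f"+ noise eng:       {auc_noise:.3f}")
```

Features whose removal leaves the cross-validated score unchanged are candidates for deletion; the two days the retrospective estimates is mostly the cost of wiring this loop into the real pipeline.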

What I Would Do Differently

  1. Run the A/B test first. Before deploying the model, run a randomized controlled trial: half of high-risk subscribers get the intervention, half do not. This gives a causal estimate of the intervention's effect, not a correlational one. Without this, the ROI analysis is an estimate, not a measurement.

  2. Build a feature store. The feature engineering code is duplicated between the training notebook and the API. A feature store (even a simple one backed by a database table) would compute features once, serve them to both training and inference, and guarantee consistency.

  3. Add a calibration step. The model's predicted probabilities are not perfectly calibrated. A subscriber with a predicted probability of 0.30 does not have exactly a 30% chance of churning. Platt scaling or isotonic regression (Chapter 16) would improve calibration, which matters because downstream decisions use the probability threshold directly.

  4. Invest in the monitoring dashboard earlier. We built monitoring last, but the team needed it from day one. The first month of production data was wasted because nobody was measuring drift or performance in real-time.
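The calibration step in point 3 can be sketched with scikit-learn's CalibratedClassifierCV, which wraps a base model and fits Platt scaling (method="sigmoid") or isotonic regression (method="isotonic") on held-out folds. Logistic regression and synthetic data stand in for the real model here:

```python
from sklearn.calibration import CalibratedClassifierCV
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import brier_score_loss
from sklearn.model_selection import train_test_split

# Synthetic stand-in for the StreamFlow features; illustrative only.
X, y = make_classification(n_samples=4000, n_features=10, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

raw = LogisticRegression(max_iter=1000).fit(X_train, y_train)

# Wrap the base model; the isotonic mapping is fit on held-out folds (cv=3)
# so the calibrator never sees the probabilities it is correcting at fit time.
calibrated = CalibratedClassifierCV(LogisticRegression(max_iter=1000),
                                    method="isotonic", cv=3).fit(X_train, y_train)

# Brier score: mean squared error of predicted probabilities (lower is better).
for name, model in [("raw", raw), ("calibrated", calibrated)]:
    p = model.predict_proba(X_test)[:, 1]
    print(f"{name:<10} Brier score = {brier_score_loss(y_test, p):.4f}")
```

For a tree ensemble like LightGBM the same wrapper applies; comparing Brier score or a reliability curve before and after tells you whether the 0.20 threshold means what the CS team thinks it means.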


Your Turn: Building a Capstone with a Different Dataset

The StreamFlow system is one instantiation of the ML lifecycle. The skills transfer. Here are prompts for building your own capstone with a different dataset:

Option 1: Hospital Readmission Prediction

Business question: Which patients discharged from the hospital are most likely to be readmitted within 30 days, so that care coordinators can schedule follow-up appointments before discharge?

Dataset: MIMIC-III (freely available with credentialing) or CMS Hospital Readmissions data.

Key differences from StreamFlow: Healthcare domain requires stricter fairness constraints (equity across racial groups), regulatory considerations (HIPAA), and clinical validation (the model must be reviewed by physicians before deployment).

Option 2: Manufacturing Predictive Maintenance

Business question: Which turbines are most likely to experience bearing failure within the next 14 days, so that maintenance crews can schedule preventive repairs during planned downtime?

Dataset: NASA Turbofan Engine Degradation Simulation (C-MAPSS) or a synthetic dataset.

Key differences from StreamFlow: Time-series features (sensor readings over time), survival analysis framing (time to failure, not binary classification), extreme class imbalance (failures are rare), and the cost asymmetry is inverted (false negatives are catastrophically expensive).

Option 3: E-Commerce Conversion Optimization

Business question: Which website visitors are most likely to complete a purchase, so that the product team can optimize the checkout experience for different user segments?

Dataset: Google Analytics sample dataset or a Kaggle e-commerce dataset.

Key differences from StreamFlow: Session-level prediction (not user-level), real-time scoring requirements (during the session, not batch), A/B testing is the primary evaluation method, and the model feeds into a recommendation engine rather than a human team.

For Any Dataset: The Capstone Checklist

Regardless of which dataset you choose, your capstone should include:

  • [ ] A precise business question with a defined prediction target, consumer, and success metric
  • [ ] A documented data extraction process (SQL or equivalent)
  • [ ] A reproducible feature engineering pipeline
  • [ ] Model training with experiment tracking (MLflow or equivalent)
  • [ ] Model evaluation with business-relevant metrics (not just AUC)
  • [ ] SHAP or equivalent interpretation for per-prediction explanations
  • [ ] A fairness audit across at least one protected attribute
  • [ ] A deployed API endpoint (FastAPI or equivalent)
  • [ ] A monitoring plan for drift and performance
  • [ ] An ROI analysis with sensitivity analysis
  • [ ] A one-page stakeholder summary
  • [ ] A written retrospective: what worked, what didn't, what you'd do differently

The Ideal Process vs. the Messy Reality

Textbooks present the ML lifecycle as a clean, sequential process. Define the problem. Collect the data. Engineer features. Train the model. Evaluate. Deploy. Monitor.

The reality is a directed graph with cycles, dead ends, and surprises.

You will discover during feature engineering that the data does not contain the signal you expected. You will discover during evaluation that the model performs well on average but terribly for a subgroup. You will discover during deployment that the preprocessing step that works in pandas does not work when called from a REST API at 200 requests per second. You will discover during monitoring that the model degraded three weeks ago and nobody noticed because the alerting threshold was set too high.

Each of these discoveries sends you backward in the lifecycle. That is not a failure. That is the process. The value of building the complete system --- even once, even with a toy dataset --- is that you learn where the connections are fragile. You learn which assumptions are load-bearing. You learn where the debugging is hard.

Domain Knowledge --- The difference between a junior data scientist and a senior one is not the algorithms they know. It is the number of times they have been surprised by production. Every surprise teaches you where to look next time. This capstone project gives you the first set of surprises in a safe environment.


Chapter Summary

This chapter integrated every component of the ML lifecycle into a single system: the StreamFlow churn prediction platform. You saw how each component connects to the others, how architectural decisions propagate through the system, and how the system forms a loop rather than a pipeline.

The three capstone tracks (Minimal, Full, Extended) provide different levels of depth. Track A builds the core system. Track B adds production infrastructure. Track C adds the experimental design and ethical analysis that distinguish a portfolio project from a homework assignment.

The retrospective is the most important artifact. Not because it reveals technical mistakes --- though it does --- but because it demonstrates the self-awareness that employers value more than any algorithm.

Build the system. Deploy it. Monitor it. Break it. Fix it. Write down what you learned.

That is the job.


Next: Chapter 36: The Road to Advanced Data Science --- where to go from here.