

Chapter 32: Monitoring Models in Production

Drift Detection, Performance Decay, and Knowing When to Retrain


Learning Objectives

By the end of this chapter, you will be able to:

  1. Detect data drift using statistical tests (PSI, KS test, chi-squared)
  2. Monitor model performance with real-time metrics dashboards
  3. Identify concept drift vs. data drift
  4. Set up alerting rules and retraining triggers
  5. Design a retraining pipeline (manual, scheduled, triggered)

Every Model Starts Dying in Production

War Story --- A data scientist at a mid-size SaaS company deployed a churn prediction model in January 2020. It had an AUC of 0.87 on the holdout set. By April 2020, the model's precision had dropped to coin-flip territory. Nobody noticed until May, when the VP of Customer Success asked why the "high-risk churn" list was full of subscribers who had just upgraded to annual plans. The problem was obvious in hindsight: a pandemic had reshaped user behavior overnight. Session frequency doubled. Content consumption patterns inverted. The features the model relied on --- sessions_last_30d, avg_session_minutes, weekend_ratio --- now had completely different distributions than the training data. The model had not broken. The world had changed, and the model was still describing January.

Every model you deploy starts dying the moment it hits production.

This is not a defect. It is a law. The world that generated your training data is not the world your model will encounter next month, next quarter, or next year. Customer behavior shifts. Sensor calibrations drift. Competitor actions change the market. Regulatory changes alter what data you can collect. Seasonal patterns introduce cyclical variation that your training window may not have captured.

The question is never whether your model will degrade. It is when, how fast, and whether you will notice before the business does.

This chapter gives you the tools to notice. We cover four topics:

  1. Data drift --- detecting when input distributions change
  2. Concept drift --- detecting when the relationship between inputs and outputs changes
  3. Performance monitoring --- tracking model metrics on live data
  4. Retraining strategies --- knowing when and how to rebuild

The running examples are StreamFlow (SaaS churn prediction, where user behavior shifts are the primary drift source) and TurbineTech (manufacturing predictive maintenance, where sensor calibration and seasonal temperature effects cause drift). Both are real patterns. Both will break your model if you are not watching.


A Taxonomy of Drift

Before we build monitoring systems, we need precise vocabulary. "The model is broken" is not a diagnosis. The following taxonomy tells you what changed and where to look.

Data Drift (Covariate Shift)

Definition: The distribution of the input features changes, but the relationship between inputs and outputs remains the same.

Formally: $P_{\text{prod}}(X) \neq P_{\text{train}}(X)$, but $P(Y | X)$ is unchanged.

Example: StreamFlow's churn model was trained on data where sessions_last_30d had a mean of 14 and a standard deviation of 6. After a product redesign that added a "Continue Watching" feature on the home screen, the mean jumped to 22. Users were not fundamentally more or less likely to churn given a certain engagement level --- the engagement level itself had shifted upward. The model's decision boundary is now calibrated for a population that no longer exists.

Why it matters: Even if the underlying relationship is intact, a model trained on the old distribution will produce biased predictions on the new one. Users whose feature values sat near the old decision boundary now appear to be well inside safe territory, and the model underestimates churn for users who are actually at risk.

Concept Drift

Definition: The relationship between inputs and outputs changes, even if the input distribution stays the same.

Formally: $P_{\text{prod}}(Y | X) \neq P_{\text{train}}(Y | X)$.

Example: TurbineTech's vibration-based failure prediction model learned that vibration amplitude above 4.2 mm/s was predictive of bearing failure within 30 days. After the maintenance team switched to a new bearing supplier with tighter tolerances, the failure threshold dropped to 3.1 mm/s. The vibration data looked the same --- same distribution, same means --- but the meaning of "high vibration" had changed. The model missed 40% of failures for three months before anyone noticed.

Why it matters: Concept drift is harder to detect than data drift because the inputs look normal. Performance monitoring (tracking accuracy, precision, recall on labeled production data) is the primary detection method, and it requires ground truth labels, which may arrive with a delay.

Prior Probability Shift

Definition: The class balance changes in production.

Formally: $P_{\text{prod}}(Y) \neq P_{\text{train}}(Y)$.

Example: StreamFlow trained its churn model on data with a 12% churn rate. After a price increase, the churn rate jumped to 19%. The model's calibrated probabilities are no longer meaningful --- a predicted probability of 0.15 used to mean "slightly above base rate" and now means "slightly below base rate."

Why it matters: Even a model with good discrimination (high AUC) will produce poorly calibrated probabilities if the prior shifts. Downstream systems that use probability thresholds (e.g., "contact customers with churn probability > 0.20") will silently under-respond or over-respond.
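When only the prior has shifted (that is, assuming $P(X \mid Y)$ is unchanged), a standard remedy is to rescale the model's output odds by the ratio of production to training prior odds. A minimal sketch; the function name is illustrative:

```python
def adjust_for_prior_shift(p: float, prior_train: float, prior_prod: float) -> float:
    """Rescale a calibrated probability when only the class prior changes.

    Multiplies the predicted odds by the ratio of production to training
    prior odds, then converts back to a probability. Valid only under the
    assumption that P(X | Y) is unchanged.
    """
    odds_ratio = (prior_prod / (1 - prior_prod)) / (prior_train / (1 - prior_train))
    odds = (p / (1 - p)) * odds_ratio
    return odds / (1 + odds)


# A prediction of 0.15 under StreamFlow's 12% training prior, re-expressed
# under the new 19% production prior:
adjusted = adjust_for_prior_shift(0.15, prior_train=0.12, prior_prod=0.19)
print(f"{adjusted:.3f}")  # 0.233
```

This is a stopgap, not a substitute for retraining: it repairs calibration under the new base rate but cannot fix a drifted decision boundary.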

Summary Table

| Type | What Changes | Detection Method | Requires Labels? |
|------|--------------|------------------|------------------|
| Data drift | $P(X)$ | PSI, KS test, chi-squared | No |
| Concept drift | $P(Y \mid X)$ | Performance monitoring | Yes |
| Prior probability shift | $P(Y)$ | Label distribution monitoring | Yes |

Detecting Data Drift

Data drift detection compares the distribution of each feature in production data against the distribution observed during training. You do not need labels. You need the training data summary statistics and the incoming production data.

Population Stability Index (PSI)

PSI is the industry-standard metric for drift detection. It was originally developed in credit scoring to monitor scorecard stability and has been adopted across ML disciplines.

How it works:

  1. Divide the training distribution of a feature into $n$ bins (typically 10 deciles).
  2. Compute the proportion of observations in each bin for both the training (reference) distribution and the production (test) distribution.
  3. For each bin, compute: $(p_{\text{prod},i} - p_{\text{train},i}) \times \ln\!\left(\frac{p_{\text{prod},i}}{p_{\text{train},i}}\right)$
  4. Sum across all bins.

Interpretation:

| PSI Value | Interpretation | Action |
|-----------|----------------|--------|
| < 0.10 | No significant drift | Continue monitoring |
| 0.10 -- 0.25 | Moderate drift | Investigate; may need retraining |
| > 0.25 | Significant drift | Retrain the model |

These thresholds are conventions, not physical constants. They are a reasonable starting point, but you should calibrate them against your model's actual sensitivity to distributional shifts.

Implementation:

import numpy as np
import pandas as pd
from scipy import stats


def compute_psi(
    reference: np.ndarray,
    production: np.ndarray,
    n_bins: int = 10,
    eps: float = 1e-4,
) -> float:
    """
    Compute Population Stability Index between reference and production data.

    Parameters
    ----------
    reference : array-like
        Feature values from training data.
    production : array-like
        Feature values from production data.
    n_bins : int
        Number of bins (quantile-based from reference distribution).
    eps : float
        Small constant to avoid division by zero.

    Returns
    -------
    float
        PSI value. < 0.10 stable, 0.10-0.25 investigate, > 0.25 retrain.
    """
    # Create bins from reference distribution (quantile-based).
    # Discrete features can produce duplicate quantile edges; collapse
    # them so every bin has positive width.
    bin_edges = np.unique(np.quantile(reference, np.linspace(0, 1, n_bins + 1)))
    bin_edges[0] = -np.inf
    bin_edges[-1] = np.inf

    # Compute proportions in each bin
    ref_counts = np.histogram(reference, bins=bin_edges)[0]
    prod_counts = np.histogram(production, bins=bin_edges)[0]

    ref_proportions = ref_counts / len(reference) + eps
    prod_proportions = prod_counts / len(production) + eps

    # PSI formula
    psi = np.sum(
        (prod_proportions - ref_proportions)
        * np.log(prod_proportions / ref_proportions)
    )
    return psi

Let us test this on StreamFlow data where we simulate a behavioral shift:

np.random.seed(42)

# Training distribution: sessions_last_30d ~ Poisson(14)
reference_sessions = np.random.poisson(14, size=50000)

# Production distribution after product change: Poisson(22)
production_sessions = np.random.poisson(22, size=10000)

psi_sessions = compute_psi(reference_sessions, production_sessions)
print(f"PSI (sessions_last_30d): {psi_sessions:.4f}")
# PSI (sessions_last_30d): ~1.05  (significant drift -- retrain)

A PSI of 1.05 is well above the 0.25 threshold. This is not subtle drift --- this is a feature whose distribution has fundamentally changed.

Now compare with a stable feature:

# Training and production: months_active uniform over 1..59 (randint excludes the upper bound)
reference_tenure = np.random.randint(1, 60, size=50000)
production_tenure = np.random.randint(1, 60, size=10000)

psi_tenure = compute_psi(reference_tenure, production_tenure)
print(f"PSI (months_active): {psi_tenure:.4f}")
# PSI (months_active): ~0.002  (stable -- no action needed)

Kolmogorov-Smirnov Test for Continuous Features

The KS test is a non-parametric test that compares two distributions by measuring the maximum distance between their cumulative distribution functions (CDFs). Unlike PSI, it produces a p-value, which gives you a statistical significance statement.

def ks_drift_test(
    reference: np.ndarray,
    production: np.ndarray,
    alpha: float = 0.05,
) -> dict:
    """
    Kolmogorov-Smirnov test for data drift on a continuous feature.

    Returns
    -------
    dict with keys: statistic, p_value, drift_detected
    """
    stat, p_value = stats.ks_2samp(reference, production)
    return {
        "statistic": stat,
        "p_value": p_value,
        "drift_detected": p_value < alpha,
    }


# Test on the shifted sessions feature
result = ks_drift_test(reference_sessions, production_sessions)
print(f"KS statistic: {result['statistic']:.4f}")
print(f"p-value: {result['p_value']:.2e}")
print(f"Drift detected: {result['drift_detected']}")
# KS statistic: ~0.62
# p-value: ~0.0
# Drift detected: True

Practical Warning --- The KS test is extremely sensitive with large sample sizes. With 50,000 reference samples and 10,000 production samples, even a trivially small distributional difference will produce a statistically significant p-value. This is a feature of all statistical tests at scale: with enough data, everything is "significant." PSI is often more practical because its thresholds are calibrated for business impact, not statistical significance. Use the KS test as a secondary check, not as your primary drift alarm.
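To see this scale sensitivity concretely, here is a small simulation (distribution parameters chosen purely for illustration): a mean shift of 0.05 standard deviations that no PSI threshold would flag is still decisively "significant" to the KS test at these sample sizes.

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(0)
reference = rng.normal(0.0, 1.0, size=100_000)
production = rng.normal(0.05, 1.0, size=100_000)  # Tiny shift: 0.05 std

# KS test: decisive rejection despite the negligible shift
stat, p_value = stats.ks_2samp(reference, production)

# PSI over reference deciles (same recipe as compute_psi above)
edges = np.quantile(reference, np.linspace(0, 1, 11))
edges[0], edges[-1] = -np.inf, np.inf
ref_p = np.histogram(reference, bins=edges)[0] / len(reference) + 1e-4
prod_p = np.histogram(production, bins=edges)[0] / len(production) + 1e-4
psi = np.sum((prod_p - ref_p) * np.log(prod_p / ref_p))

print(f"KS p-value: {p_value:.2e}")  # far below 0.05: "drift detected"
print(f"PSI:        {psi:.4f}")      # well under the 0.10 "stable" line
```

The same shift, two verdicts: the KS test answers "are these distributions different?" while PSI answers "are they different enough to matter?"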

Chi-Squared Test for Categorical Features

For categorical features (plan tier, device type, referral source), the chi-squared test compares observed category frequencies against expected frequencies from the training distribution.

def chi2_drift_test(
    reference: pd.Series,
    production: pd.Series,
    alpha: float = 0.05,
) -> dict:
    """
    Chi-squared test for data drift on a categorical feature.

    Returns
    -------
    dict with keys: statistic, p_value, drift_detected
    """
    # Get all categories from both distributions
    all_categories = sorted(
        set(reference.unique()) | set(production.unique())
    )

    # Compute observed counts in production
    observed = production.value_counts().reindex(all_categories, fill_value=0)

    # Compute expected proportions from reference (renormalized so that
    # the expected counts sum to the observed total, which
    # scipy.stats.chisquare requires)
    ref_proportions = (
        reference.value_counts(normalize=True)
        .reindex(all_categories, fill_value=1e-6)
    )
    ref_proportions = ref_proportions / ref_proportions.sum()
    expected = ref_proportions * len(production)

    stat, p_value = stats.chisquare(observed, f_exp=expected)
    return {
        "statistic": stat,
        "p_value": p_value,
        "drift_detected": p_value < alpha,
    }


# StreamFlow plan distribution: training vs. production (after price change)
np.random.seed(42)
ref_plans = pd.Series(
    np.random.choice(
        ["basic", "standard", "premium", "family"],
        size=50000,
        p=[0.35, 0.35, 0.20, 0.10],
    )
)
prod_plans = pd.Series(
    np.random.choice(
        ["basic", "standard", "premium", "family"],
        size=10000,
        p=[0.45, 0.30, 0.15, 0.10],  # Shift: more basic after price hike
    )
)

chi2_result = chi2_drift_test(ref_plans, prod_plans)
print(f"Chi-squared statistic: {chi2_result['statistic']:.2f}")
print(f"p-value: {chi2_result['p_value']:.2e}")
print(f"Drift detected: {chi2_result['drift_detected']}")

Multi-Feature Drift Dashboard

In production, you monitor all features simultaneously. Here is a function that computes PSI for every feature and returns a summary:

def compute_drift_report(
    reference_df: pd.DataFrame,
    production_df: pd.DataFrame,
    categorical_cols: list[str] | None = None,
    n_bins: int = 10,
) -> pd.DataFrame:
    """
    Compute drift metrics for all features.

    Returns a DataFrame with columns: feature, test, statistic, drift_level.
    """
    if categorical_cols is None:
        categorical_cols = []

    results = []
    for col in reference_df.columns:
        if col in categorical_cols:
            res = chi2_drift_test(reference_df[col], production_df[col])
            results.append({
                "feature": col,
                "test": "chi-squared",
                "statistic": res["statistic"],
                "p_value": res["p_value"],
                "drift_detected": res["drift_detected"],
                "psi": None,
                "drift_level": "investigate" if res["drift_detected"] else "stable",
            })
        else:
            psi_val = compute_psi(
                reference_df[col].values, production_df[col].values, n_bins
            )
            ks_res = ks_drift_test(
                reference_df[col].values, production_df[col].values
            )
            # Classify drift level by PSI
            if psi_val < 0.10:
                level = "stable"
            elif psi_val < 0.25:
                level = "investigate"
            else:
                level = "retrain"

            results.append({
                "feature": col,
                "test": "PSI + KS",
                "statistic": ks_res["statistic"],
                "p_value": ks_res["p_value"],
                "drift_detected": ks_res["drift_detected"],
                "psi": psi_val,
                "drift_level": level,
            })

    report = pd.DataFrame(results)
    return report.sort_values("psi", ascending=False, na_position="last")

Detecting Concept Drift

Concept drift is harder than data drift because you need ground truth labels to detect it. If you are predicting churn with a 90-day window, you will not know whether today's predictions were correct for another 90 days. This label delay is the central challenge of concept drift detection.

Direct Detection: Track Performance on Labeled Data

The most reliable method is to compute model performance metrics on every batch of labeled data as it arrives.

from sklearn.metrics import (
    roc_auc_score, precision_score, recall_score,
    f1_score, brier_score_loss,
)


def compute_performance_metrics(
    y_true: np.ndarray,
    y_pred_proba: np.ndarray,
    threshold: float = 0.5,
) -> dict:
    """Compute a suite of classification performance metrics."""
    y_pred = (y_pred_proba >= threshold).astype(int)
    return {
        "auc": roc_auc_score(y_true, y_pred_proba),
        "precision": precision_score(y_true, y_pred, zero_division=0),
        "recall": recall_score(y_true, y_pred, zero_division=0),
        "f1": f1_score(y_true, y_pred, zero_division=0),
        "brier_score": brier_score_loss(y_true, y_pred_proba),
        "prediction_rate": y_pred.mean(),
        "actual_rate": y_true.mean(),
    }

Key Insight --- Track the prediction rate (the share of cases the model flags as positive) alongside the actual rate (observed label mean). When these diverge, something is wrong. If the prediction rate stays at 0.12 while the actual churn rate climbs to 0.19, your model is underestimating risk across the board. This is the fastest signal you can get for concept drift.
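A minimal version of that check (the function name and the tolerance are illustrative): compare the two rates on each labeled batch and flag when the gap exceeds a tolerance.

```python
import numpy as np


def rate_divergence_check(
    y_true: np.ndarray,
    y_pred_proba: np.ndarray,
    threshold: float = 0.5,
    max_gap: float = 0.03,  # Illustrative tolerance
) -> dict:
    """Flag batches where the predicted positive rate and the observed
    label rate diverge by more than max_gap."""
    prediction_rate = float((y_pred_proba >= threshold).mean())
    actual_rate = float(y_true.mean())
    gap = actual_rate - prediction_rate
    return {
        "prediction_rate": prediction_rate,
        "actual_rate": actual_rate,
        "gap": gap,
        "diverged": abs(gap) > max_gap,
    }


# The StreamFlow failure mode: the model keeps flagging ~12% of users
# while the true churn rate has climbed to 19%.
y_true = np.array([1] * 19 + [0] * 81)       # 19% actual churn
y_proba = np.array([0.9] * 12 + [0.1] * 88)  # Model flags 12%
print(rate_divergence_check(y_true, y_proba)["diverged"])  # True
```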

Indirect Detection: Prediction Distribution Monitoring

When labels are delayed, you can monitor the distribution of the model's predictions as a proxy. If the model was well-calibrated at deployment time, a sudden shift in the prediction distribution --- without a known change in input distributions --- may indicate concept drift.

def monitor_prediction_distribution(
    reference_predictions: np.ndarray,
    production_predictions: np.ndarray,
) -> dict:
    """
    Compare prediction distributions between reference and production.
    Uses PSI and basic summary statistics.
    """
    psi = compute_psi(reference_predictions, production_predictions)
    return {
        "prediction_psi": psi,
        "ref_mean": reference_predictions.mean(),
        "prod_mean": production_predictions.mean(),
        "ref_std": reference_predictions.std(),
        "prod_std": production_predictions.std(),
        "ref_median": np.median(reference_predictions),
        "prod_median": np.median(production_predictions),
        "mean_shift": production_predictions.mean() - reference_predictions.mean(),
    }

The Label Delay Problem

In many domains, you cannot get labels quickly:

| Domain | Prediction | Label Delay |
|--------|------------|-------------|
| SaaS churn | Will this customer churn in 90 days? | 90 days |
| Loan default | Will this loan default? | 6--24 months |
| Manufacturing failure | Will this turbine fail in 30 days? | 30 days |
| Ad click-through | Will the user click? | Seconds to minutes |
| Fraud detection | Is this transaction fraudulent? | Days to weeks (investigation) |

When label delay is long, you depend more heavily on data drift detection (no labels required) and prediction distribution monitoring (no labels required). When labels arrive quickly (ad clicks, real-time fraud labels), you can monitor concept drift directly.
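A common pattern for coping with the delay is a prediction log: predictions are written at scoring time and joined against labels whenever they arrive, and only the matched pairs feed the performance metrics. A minimal in-memory sketch (class and field names are illustrative; a real system would persist this to a database):

```python
class LabelJoinBuffer:
    """Holds predictions until their ground-truth labels arrive.

    Predictions are stored by entity id at scoring time; record_label()
    pops the match so each (prediction, label) pair is emitted exactly
    once, ready for performance metrics.
    """

    def __init__(self) -> None:
        self._pending: dict[str, float] = {}        # entity_id -> predicted proba
        self.matched: list[tuple[float, int]] = []  # (proba, label) pairs

    def record_prediction(self, entity_id: str, proba: float) -> None:
        self._pending[entity_id] = proba

    def record_label(self, entity_id: str, label: int) -> None:
        # Labels without a stored prediction are silently ignored
        proba = self._pending.pop(entity_id, None)
        if proba is not None:
            self.matched.append((proba, label))


buffer = LabelJoinBuffer()
buffer.record_prediction("user_123", 0.81)  # Scored today
buffer.record_prediction("user_456", 0.07)
buffer.record_label("user_123", 1)          # Churn confirmed 90 days later
print(buffer.matched)  # [(0.81, 1)]
```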


Building a Monitoring Pipeline

A production monitoring system has four components: data collection, metric computation, alerting, and visualization. Here is a complete implementation.

The Monitor Class

import json
from datetime import datetime, timedelta
from pathlib import Path
from dataclasses import dataclass, field


@dataclass
class AlertRule:
    """A single alerting rule for a monitored metric."""
    metric_name: str
    threshold: float
    direction: str  # "above" or "below"
    severity: str   # "warning" or "critical"
    message: str = ""


@dataclass
class MonitoringResult:
    """Result from a single monitoring check."""
    timestamp: str
    metrics: dict
    drift_report: pd.DataFrame | None = None
    alerts_triggered: list[dict] = field(default_factory=list)


class ModelMonitor:
    """
    Production model monitoring system.

    Tracks data drift, performance metrics, and prediction distributions.
    Triggers alerts when thresholds are breached.
    """

    def __init__(
        self,
        reference_data: pd.DataFrame,
        reference_predictions: np.ndarray,
        feature_columns: list[str],
        categorical_columns: list[str] | None = None,
        alert_rules: list[AlertRule] | None = None,
    ):
        self.reference_data = reference_data
        self.reference_predictions = reference_predictions
        self.feature_columns = feature_columns
        self.categorical_columns = categorical_columns or []
        self.alert_rules = alert_rules or self._default_alert_rules()
        self.history: list[MonitoringResult] = []

    def _default_alert_rules(self) -> list[AlertRule]:
        """Sensible defaults for monitoring alerts."""
        return [
            AlertRule(
                metric_name="max_feature_psi",
                threshold=0.25,
                direction="above",
                severity="critical",
                message="Feature PSI exceeds 0.25 --- significant data drift detected.",
            ),
            AlertRule(
                metric_name="max_feature_psi",
                threshold=0.10,
                direction="above",
                severity="warning",
                message="Feature PSI exceeds 0.10 --- moderate drift, investigate.",
            ),
            AlertRule(
                metric_name="prediction_psi",
                threshold=0.25,
                direction="above",
                severity="critical",
                message="Prediction distribution has shifted significantly.",
            ),
            AlertRule(
                metric_name="auc",
                threshold=0.70,
                direction="below",
                severity="critical",
                message="AUC has dropped below 0.70 --- model performance degraded.",
            ),
            AlertRule(
                metric_name="auc",
                threshold=0.80,
                direction="below",
                severity="warning",
                message="AUC below 0.80 --- performance declining.",
            ),
        ]

    def check_alerts(self, metrics: dict) -> list[dict]:
        """Evaluate all alert rules against current metrics."""
        triggered = []
        for rule in self.alert_rules:
            value = metrics.get(rule.metric_name)
            if value is None:
                continue
            if rule.direction == "above" and value > rule.threshold:
                triggered.append({
                    "rule": rule.metric_name,
                    "severity": rule.severity,
                    "value": value,
                    "threshold": rule.threshold,
                    "message": rule.message,
                })
            elif rule.direction == "below" and value < rule.threshold:
                triggered.append({
                    "rule": rule.metric_name,
                    "severity": rule.severity,
                    "value": value,
                    "threshold": rule.threshold,
                    "message": rule.message,
                })
        return triggered

    def run_check(
        self,
        production_data: pd.DataFrame,
        production_predictions: np.ndarray,
        y_true: np.ndarray | None = None,
    ) -> MonitoringResult:
        """
        Run a full monitoring check.

        Parameters
        ----------
        production_data : DataFrame
            Current batch of production feature data.
        production_predictions : array
            Model predictions for the current batch.
        y_true : array, optional
            Ground truth labels, if available.

        Returns
        -------
        MonitoringResult
        """
        timestamp = datetime.now().isoformat()

        # 1. Data drift
        drift_report = compute_drift_report(
            self.reference_data[self.feature_columns],
            production_data[self.feature_columns],
            categorical_cols=self.categorical_columns,
        )

        # 2. Prediction distribution
        pred_metrics = monitor_prediction_distribution(
            self.reference_predictions, production_predictions
        )

        # 3. Performance metrics (if labels available)
        perf_metrics = {}
        if y_true is not None:
            perf_metrics = compute_performance_metrics(
                y_true, production_predictions
            )

        # 4. Aggregate metrics
        psi_values = drift_report["psi"].dropna()
        metrics = {
            "max_feature_psi": psi_values.max() if len(psi_values) > 0 else 0.0,
            "mean_feature_psi": psi_values.mean() if len(psi_values) > 0 else 0.0,
            "features_drifted": int((psi_values > 0.25).sum()),
            "features_warning": int(
                ((psi_values > 0.10) & (psi_values <= 0.25)).sum()
            ),
            "prediction_psi": pred_metrics["prediction_psi"],
            "prediction_mean_shift": pred_metrics["mean_shift"],
            **perf_metrics,
        }

        # 5. Check alerts
        alerts = self.check_alerts(metrics)

        result = MonitoringResult(
            timestamp=timestamp,
            metrics=metrics,
            drift_report=drift_report,
            alerts_triggered=alerts,
        )
        self.history.append(result)
        return result

    def get_history_df(self) -> pd.DataFrame:
        """Return monitoring history as a DataFrame for dashboarding."""
        rows = []
        for result in self.history:
            row = {"timestamp": result.timestamp, **result.metrics}
            rows.append(row)
        return pd.DataFrame(rows)

Using the Monitor

np.random.seed(42)
n_ref = 50000
n_prod = 5000

# --- Reference data (training distribution) ---
reference = pd.DataFrame({
    "sessions_last_30d": np.random.poisson(14, n_ref),
    "avg_session_minutes": np.random.exponential(28, n_ref).round(1),
    "content_completion_rate": np.random.beta(3, 2, n_ref).round(3),
    "hours_change_pct": np.random.normal(0, 30, n_ref).round(1),
    "months_active": np.random.randint(1, 60, n_ref),
    "devices_used": np.random.randint(1, 6, n_ref),
    "support_tickets_90d": np.random.poisson(1.2, n_ref),
})
ref_predictions = np.random.beta(2, 15, n_ref)  # Calibrated predictions

# --- Production data: Week 1 (stable) ---
prod_week1 = pd.DataFrame({
    "sessions_last_30d": np.random.poisson(14, n_prod),
    "avg_session_minutes": np.random.exponential(28, n_prod).round(1),
    "content_completion_rate": np.random.beta(3, 2, n_prod).round(3),
    "hours_change_pct": np.random.normal(0, 30, n_prod).round(1),
    "months_active": np.random.randint(1, 60, n_prod),
    "devices_used": np.random.randint(1, 6, n_prod),
    "support_tickets_90d": np.random.poisson(1.2, n_prod),
})
pred_week1 = np.random.beta(2, 15, n_prod)

# --- Production data: Week 8 (post-product-change, drifted) ---
prod_week8 = pd.DataFrame({
    "sessions_last_30d": np.random.poisson(22, n_prod),      # Shifted
    "avg_session_minutes": np.random.exponential(35, n_prod).round(1),  # Shifted
    "content_completion_rate": np.random.beta(3, 2, n_prod).round(3),
    "hours_change_pct": np.random.normal(5, 35, n_prod).round(1),  # Slight shift
    "months_active": np.random.randint(1, 60, n_prod),
    "devices_used": np.random.randint(1, 6, n_prod),
    "support_tickets_90d": np.random.poisson(1.8, n_prod),   # Shifted
})
pred_week8 = np.random.beta(2.5, 12, n_prod)  # Predictions shifted too

# --- Initialize monitor ---
feature_cols = list(reference.columns)
monitor = ModelMonitor(
    reference_data=reference,
    reference_predictions=ref_predictions,
    feature_columns=feature_cols,
)

# --- Run checks ---
result_week1 = monitor.run_check(prod_week1, pred_week1)
print("=== Week 1 (Stable) ===")
print(f"Max PSI: {result_week1.metrics['max_feature_psi']:.4f}")
print(f"Features drifted: {result_week1.metrics['features_drifted']}")
print(f"Alerts: {len(result_week1.alerts_triggered)}")

result_week8 = monitor.run_check(prod_week8, pred_week8)
print("\n=== Week 8 (Drifted) ===")
print(f"Max PSI: {result_week8.metrics['max_feature_psi']:.4f}")
print(f"Features drifted: {result_week8.metrics['features_drifted']}")
print(f"Alerts: {len(result_week8.alerts_triggered)}")
for alert in result_week8.alerts_triggered:
    print(f"  [{alert['severity'].upper()}] {alert['message']}")

Performance Monitoring with Ground Truth

When labels arrive, performance monitoring becomes the gold standard. Here is a weekly performance tracking pattern:

import matplotlib.pyplot as plt


def plot_performance_history(history_df: pd.DataFrame) -> None:
    """Plot model performance metrics over time."""
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    fig.suptitle("Model Performance Monitoring Dashboard", fontsize=14)

    metrics_to_plot = [
        ("auc", "AUC-ROC", 0.70, "red"),
        ("f1", "F1 Score", 0.60, "orange"),
        ("prediction_mean_shift", "Prediction Mean Shift", None, None),
        ("max_feature_psi", "Max Feature PSI", 0.25, "red"),
    ]

    for ax, (metric, title, threshold, color) in zip(
        axes.flatten(), metrics_to_plot
    ):
        if metric in history_df.columns:
            ax.plot(
                range(len(history_df)),
                history_df[metric],
                marker="o",
                linewidth=2,
            )
            if threshold is not None:
                ax.axhline(
                    y=threshold, color=color, linestyle="--",
                    label=f"Threshold ({threshold})",
                )
                ax.legend()
            ax.set_title(title)
            ax.set_xlabel("Week")
            ax.set_ylabel(metric)
            ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig("monitoring_dashboard.png", dpi=150, bbox_inches="tight")
    plt.show()

Alerting Rules: From Metrics to Action

An alert that nobody reads is not an alert. A well-designed alerting system has three properties: it fires when something genuinely needs attention, it does not fire when everything is fine, and it tells the recipient what to do.

Alert Design Principles

  1. Severity levels matter. A warning says "look at this when you have time." A critical alert says "look at this now." If everything is critical, nothing is critical.

  2. Threshold hysteresis prevents flapping. A metric that oscillates around 0.25 will trigger and resolve repeatedly. Use separate arm and disarm thresholds (e.g., trigger at 0.25, resolve at 0.20) to prevent alert fatigue.

  3. Aggregate before alerting. If one feature out of twenty has PSI = 0.26, that is a warning. If eight features have PSI > 0.25, that is a critical alert. Alert on the count of drifted features, not on individual features.

  4. Include context in the alert. "PSI threshold exceeded" is useless. "Feature sessions_last_30d PSI = 0.87, up from 0.03 last week. Mean shifted from 14.1 to 22.3. Investigate product changes or data pipeline issues." is actionable.
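Principle 2 reduces to a tiny state machine; the arm/disarm thresholds below are illustrative:

```python
class HysteresisAlert:
    """Alert that arms above `trigger` and only disarms below `resolve`.

    The gap between the two thresholds stops a metric oscillating around
    a single threshold from firing and resolving on every check.
    """

    def __init__(self, trigger: float = 0.25, resolve: float = 0.20):
        self.trigger = trigger
        self.resolve = resolve
        self.active = False

    def update(self, value: float) -> bool:
        """Feed the latest metric value; returns the current alert state."""
        if not self.active and value > self.trigger:
            self.active = True
        elif self.active and value < self.resolve:
            self.active = False
        return self.active


alert = HysteresisAlert()
readings = [0.24, 0.26, 0.24, 0.26, 0.19]
print([alert.update(v) for v in readings])
# [False, True, True, True, False] --- one alert episode, not three
```

With a single 0.25 threshold, the same readings would have produced two separate fire-and-resolve cycles.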

A Complete Alert Configuration

production_alert_rules = [
    # --- Data Drift Alerts ---
    AlertRule(
        metric_name="features_drifted",
        threshold=2,  # Strict ">" comparison: fires at 3 or more
        direction="above",
        severity="critical",
        message="3+ features show significant drift (PSI > 0.25). Retrain.",
    ),
    AlertRule(
        metric_name="max_feature_psi",
        threshold=0.25,
        direction="above",
        severity="warning",
        message="At least one feature has PSI > 0.25. Investigate.",
    ),
    # --- Performance Alerts ---
    AlertRule(
        metric_name="auc",
        threshold=0.75,
        direction="below",
        severity="critical",
        message="AUC below 0.75. Model performance severely degraded.",
    ),
    AlertRule(
        metric_name="auc",
        threshold=0.82,
        direction="below",
        severity="warning",
        message="AUC below 0.82. Performance declining. Review recent drift.",
    ),
    # --- Prediction Distribution Alerts ---
    AlertRule(
        metric_name="prediction_psi",
        threshold=0.25,
        direction="above",
        severity="critical",
        message="Prediction distribution shifted. Possible concept drift.",
    ),
    # --- Calibration Alerts (mean_shift is signed; watch both directions) ---
    AlertRule(
        metric_name="prediction_mean_shift",
        threshold=0.05,
        direction="above",
        severity="warning",
        message="Mean predicted probability rose by more than 0.05.",
    ),
    AlertRule(
        metric_name="prediction_mean_shift",
        threshold=-0.05,
        direction="below",
        severity="warning",
        message="Mean predicted probability fell by more than 0.05.",
    ),
]

Retraining Strategies

You have detected drift. Performance is declining. Now what? There are three retraining strategies, and the right one depends on your team, your data, and your tolerance for stale models.

Strategy 1: Scheduled Retraining

What: Retrain on a fixed schedule --- daily, weekly, or monthly --- regardless of whether drift has been detected.

When to use: When label delay is short (hours to days), data volume is high, and the cost of retraining is low.

Pros: Simple to implement. No monitoring infrastructure required for triggering (though you should still monitor for safety). Guarantees a maximum staleness window.

Cons: Wastes compute if the model is still performing well. Does not respond to sudden drift events between retraining windows.

# Pseudocode: scheduled retraining (e.g., in Airflow or cron)
# Runs every Sunday at 02:00 UTC
from datetime import datetime

def scheduled_retrain():
    """Weekly scheduled retraining pipeline."""
    # 1. Pull latest labeled data
    train_data = fetch_training_data(lookback_days=180)

    # 2. Run standard training pipeline
    model = train_model(train_data, random_state=42)

    # 3. Evaluate on holdout
    metrics = evaluate_model(model, holdout_data)

    # 4. Gate: only deploy if performance meets threshold
    if metrics["auc"] >= 0.80 and metrics["f1"] >= 0.60:
        deploy_model(model, version=f"v{datetime.now():%Y%m%d}")
        update_reference_data(train_data)
        log_retraining_event(metrics, trigger="scheduled")
    else:
        send_alert(
            "Scheduled retrain produced model below threshold. "
            f"AUC={metrics['auc']:.4f}, F1={metrics['f1']:.4f}. "
            "Manual review required."
        )

Strategy 2: Triggered Retraining

What: Retrain only when a monitoring signal crosses a threshold --- PSI > 0.25, AUC < 0.80, or a configurable combination.

When to use: When retraining is expensive, labels are delayed, and you want to minimize unnecessary retraining.

Pros: Responds to drift when it happens. Does not waste compute on unnecessary retraining. Can respond to sudden events (pandemic, product launch, competitor action).

Cons: Requires a functioning monitoring pipeline. False positives trigger unnecessary retraining. False negatives allow degraded performance.

def triggered_retrain(monitoring_result: MonitoringResult) -> bool:
    """
    Decide whether to trigger retraining based on monitoring signals.

    Returns True if retraining was triggered.
    """
    critical_alerts = [
        a for a in monitoring_result.alerts_triggered
        if a["severity"] == "critical"
    ]

    if not critical_alerts:
        return False

    print(f"Triggered retraining due to {len(critical_alerts)} critical alerts:")
    for alert in critical_alerts:
        print(f"  - {alert['message']}")

    # 1. Pull latest data
    train_data = fetch_training_data(lookback_days=180)

    # 2. Retrain
    model = train_model(train_data, random_state=42)

    # 3. Evaluate
    metrics = evaluate_model(model, holdout_data)

    # 4. A/B test or shadow deployment before full rollout
    if metrics["auc"] >= 0.80:
        deploy_as_shadow(model, version=f"triggered-{datetime.now():%Y%m%d}")
        log_retraining_event(
            metrics,
            trigger="monitoring_alert",
            alerts=critical_alerts,
        )
        return True
    else:
        send_alert(
            "Triggered retrain did not improve performance. "
            "Root cause investigation required."
        )
        return False

Strategy 3: Hybrid Retraining (Scheduled + Triggered)

What: Scheduled retraining on a regular cadence plus triggered retraining when monitoring detects significant drift.

When to use: Most production systems. The schedule bounds how stale the model can get; the trigger catches sudden drift between scheduled runs.

                    +-----------+
                    | Scheduled |
                    | (weekly)  |
                    +-----+-----+
                          |
                          v
+------------+      +-----+------+      +----------+
| Monitor    | ---> | Retrain    | ---> | Evaluate |
| (daily)    |      | Pipeline   |      | & Gate   |
+-----+------+      +-----+------+      +-----+----+
      |                                        |
      | critical alert                         |
      +-----------> triggered retrain ---------+
                                               |
                                               v
                                        +------+------+
                                        | Shadow      |
                                        | Deployment  |
                                        +------+------+
                                               |
                                               v
                                        +------+------+
                                        | Promote or  |
                                        | Roll Back   |
                                        +-------------+

Safe Deployment of Retrained Models

Retraining is only half the problem. The other half is getting the new model into production without breaking anything.

Shadow Deployment

Run the new model alongside the existing model. Both receive the same production traffic. The existing model generates the actual predictions; the new model's predictions are logged but not used. Compare performance over a validation window (typically 1--2 weeks).

from datetime import datetime

import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score


class ShadowDeployment:
    """Run two models side by side; serve only champion predictions."""

    def __init__(self, champion_model, challenger_model):
        self.champion = champion_model
        self.challenger = challenger_model
        self.comparison_log = []

    def predict(self, X: pd.DataFrame) -> np.ndarray:
        """Return champion predictions; log both for comparison."""
        champion_pred = self.champion.predict_proba(X)[:, 1]
        challenger_pred = self.challenger.predict_proba(X)[:, 1]

        self.comparison_log.append({
            "timestamp": datetime.now().isoformat(),
            "n_samples": len(X),
            "champion_mean": champion_pred.mean(),
            "challenger_mean": challenger_pred.mean(),
            "mean_abs_diff": np.abs(champion_pred - challenger_pred).mean(),
        })

        return champion_pred  # Only champion serves traffic

    def evaluate_challenger(
        self, y_true: np.ndarray, X: pd.DataFrame
    ) -> dict:
        """Compare models on labeled data."""
        champ_auc = roc_auc_score(y_true, self.champion.predict_proba(X)[:, 1])
        chall_auc = roc_auc_score(y_true, self.challenger.predict_proba(X)[:, 1])
        return {
            "champion_auc": champ_auc,
            "challenger_auc": chall_auc,
            "auc_improvement": chall_auc - champ_auc,
        }

Canary Release

Route a small percentage of traffic (e.g., 5%) to the new model. Monitor error rates and performance metrics on the canary cohort. If the canary performs as well as or better than the existing model, gradually increase the traffic percentage.
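A common way to implement the routing is to hash the user ID into a bucket rather than sample randomly per request, so each user stays pinned to one model and the canary cohort's metrics stay clean. A sketch under that assumption (the function name is illustrative, not a serving framework's API):

```python
import hashlib

def canary_route(user_id: str, canary_pct: float = 0.05) -> str:
    """
    Deterministically route a user to the canary or the incumbent model.

    Hashing the user ID keeps the assignment stable across requests;
    raising canary_pct gradually widens the rollout without reshuffling
    users who are already on the canary... mostly (some users near the
    old boundary move as the cutoff grows, which is acceptable).
    """
    digest = hashlib.sha256(user_id.encode()).digest()
    # Map the first 8 bytes of the hash to a float in [0, 1).
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return "canary" if bucket < canary_pct else "incumbent"

# Roughly canary_pct of a large user population lands on the canary.
routes = [canary_route(f"user-{i}") for i in range(10_000)]
canary_share = routes.count("canary") / len(routes)
```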

A/B Testing in Production

Route traffic randomly between the old and new models. Measure a business metric (not just ML metric) --- conversion rate, retention rate, revenue per user. This is the gold standard for evaluating whether a retrained model actually improves business outcomes, not just offline metrics.

Critical Distinction --- A model with higher AUC does not necessarily produce better business outcomes. A retrained churn model with AUC = 0.89 (up from 0.85) might identify the same high-risk customers but also flag too many false positives, overwhelming the customer success team. Always evaluate on the metric that matters to the business, not the metric that matters to the data scientist.
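Comparing the two arms on a business metric like retention ultimately comes down to a difference-in-proportions test. A sketch using the pooled normal-approximation z-test; the retention counts here are made-up illustration, not results from the text:

```python
from math import sqrt
from statistics import NormalDist

def two_proportion_z(successes_a: int, n_a: int,
                     successes_b: int, n_b: int) -> tuple[float, float]:
    """
    Two-sided z-test for a difference in proportions between A/B arms.

    Uses the pooled-proportion normal approximation, which is fine for
    the large samples typical of production traffic splits.
    """
    p_a, p_b = successes_a / n_a, successes_b / n_b
    pooled = (successes_a + successes_b) / (n_a + n_b)
    se = sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    z = (p_b - p_a) / se
    p_value = 2 * (1 - NormalDist().cdf(abs(z)))
    return z, p_value

# Hypothetical experiment: users retained per arm after 30 days.
z, p = two_proportion_z(successes_a=4_100, n_a=10_000,   # old model
                        successes_b=4_350, n_b=10_000)   # retrained model
# Promote the challenger only if the lift is real, not noise.
significant = p < 0.05
```

Note that this tests the business metric directly, which is the whole point of the Critical Distinction above: a statistically significant retention lift can coexist with a flat or even lower AUC.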


Putting It All Together: A Production Monitoring Checklist

Use this as a reference when setting up monitoring for a new model.

Before deployment:

  • [ ] Save reference data distributions (training data summary statistics)
  • [ ] Save reference prediction distribution (predictions on the validation set)
  • [ ] Define PSI thresholds per feature (default: 0.10 warning, 0.25 critical)
  • [ ] Define performance thresholds (minimum acceptable AUC, F1, precision, recall)
  • [ ] Set up the monitoring pipeline (daily or weekly batch job)
  • [ ] Configure alert routing (who gets the alert and how)
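The first two pre-deployment items amount to serializing summary statistics and histogram bin edges at training time. A minimal sketch with NumPy and JSON; the function name and file layout are assumptions, not a standard format:

```python
import json
import numpy as np

def save_reference_distributions(train_features: dict, path: str,
                                 n_bins: int = 10) -> dict:
    """
    Persist per-feature summary stats and histogram bins from training data.

    Storing quantile-based bin edges here means production PSI jobs later
    reuse the exact same bins, keeping week-to-week PSI values comparable.
    """
    reference = {}
    for name, values in train_features.items():
        arr = np.asarray(values, dtype=float)
        # Quantile edges put roughly equal mass in each training bin.
        edges = np.quantile(arr, np.linspace(0, 1, n_bins + 1))
        counts, _ = np.histogram(arr, bins=edges)
        reference[name] = {
            "mean": float(arr.mean()),
            "std": float(arr.std()),
            "bin_edges": edges.tolist(),
            "bin_counts": counts.tolist(),
        }
    with open(path, "w") as f:
        json.dump(reference, f)
    return reference

# Illustrative training feature (synthetic), saved alongside the model.
rng = np.random.default_rng(0)
ref = save_reference_distributions(
    {"avg_session_minutes": rng.normal(14.1, 4.0, size=5_000)},
    path="reference.json",
)
```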

Weekly monitoring:

  • [ ] Compute PSI for all features; flag any above 0.10
  • [ ] Compute prediction distribution PSI
  • [ ] Compute performance metrics on any newly labeled data
  • [ ] Review the monitoring dashboard
  • [ ] Log results to the experiment tracker (MLflow)
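The PSI computation behind the first two weekly items is a few lines once the reference bins are fixed: PSI = sum over bins of (cur_i - ref_i) * ln(cur_i / ref_i), with a small epsilon guarding empty bins. A self-contained sketch (the helper name and synthetic data are illustrative):

```python
import numpy as np

def population_stability_index(reference: np.ndarray,
                               current: np.ndarray,
                               n_bins: int = 10,
                               eps: float = 1e-4) -> float:
    """
    PSI between a reference sample and a current production sample.

    Bins come from reference quantiles; eps keeps empty bins from
    producing infinite log terms.
    """
    edges = np.quantile(reference, np.linspace(0, 1, n_bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf   # catch out-of-range values
    ref_frac = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_frac = np.histogram(current, bins=edges)[0] / len(current)
    ref_frac = np.clip(ref_frac, eps, None)
    cur_frac = np.clip(cur_frac, eps, None)
    return float(np.sum((cur_frac - ref_frac) * np.log(cur_frac / ref_frac)))

rng = np.random.default_rng(0)
jan = rng.normal(14.1, 4.0, size=5_000)      # training-time distribution
apr = rng.normal(22.3, 6.0, size=5_000)      # post-shift production data

stable_psi = population_stability_index(jan, rng.normal(14.1, 4.0, 5_000))
drifted_psi = population_stability_index(jan, apr)
# A two-sigma mean shift should land well past the 0.25 retrain threshold.
```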

When drift is detected:

  • [ ] Identify which features drifted and why (product change? data pipeline issue? seasonal effect?)
  • [ ] Determine if the drift is temporary or permanent
  • [ ] If permanent: retrain with data that includes the new distribution
  • [ ] If temporary (seasonal): consider retraining with a wider training window
  • [ ] If pipeline issue: fix the pipeline, not the model

When retraining:

  • [ ] Retrain on the most recent labeled data
  • [ ] Evaluate on a holdout set; apply the deployment gate
  • [ ] Deploy as shadow or canary first
  • [ ] Monitor the retrained model for at least one week
  • [ ] Promote to production only after validation passes
  • [ ] Update reference distributions to reflect the new training data

Chapter Summary

Model monitoring is not optional post-deployment polish. It is the system that tells you whether your model is still doing its job. Without it, you are flying blind --- you will not know your model has degraded until a stakeholder notices, and by then the damage is done.

The core concepts:

  • Data drift changes input distributions. Detect it with PSI and KS tests. No labels required.
  • Concept drift changes the input-output relationship. Detect it with performance monitoring. Labels required.
  • Prior probability shift changes the class balance. Detect it by tracking the actual label rate.
  • PSI thresholds --- < 0.10 stable, 0.10--0.25 investigate, > 0.25 retrain --- are a practical starting point.
  • Retraining strategies range from scheduled (simple, predictable) to triggered (responsive, requires monitoring infrastructure) to hybrid (recommended for most systems).
  • Safe deployment --- shadow deployments, canary releases, and A/B tests --- ensures that a retrained model improves outcomes before it replaces the existing one.

The goal is not to prevent drift. Drift is inevitable. The goal is to detect it fast enough that you can respond before the business notices.


Next chapter: Chapter 33 --- Fairness and Responsible ML