Case Study 2: Detecting and Responding to Model Drift
Scenario
Meridian Analytics operates a prediction market trading bot that has been profitable for six months. The bot uses a gradient-boosted model trained on political prediction markets, predicting the probability of "Yes" outcomes for binary markets. The model was trained on data from January through June 2024 and achieved a Brier score of 0.18 on held-out validation data.
In late September, the head of trading, Raj, notices that profitability has been declining for the past three weeks. He suspects the model is degrading but has no diagnostic tools to confirm this. This case study follows the team as they implement drift detection, diagnose the problem, retrain the model, and set up automated monitoring to prevent future occurrences.
Phase 1: Establishing a Baseline
Before detecting drift, the team needs a reference baseline --- the "normal" behavior of the model.
import numpy as np
import pandas as pd
from scipy import stats
from datetime import datetime, timedelta
from sklearn.metrics import brier_score_loss, roc_auc_score
from typing import List, Dict, Tuple
import json
class BaselineEstablisher:
"""Establish reference distributions for drift detection."""
def __init__(self, model, X_reference, y_reference,
feature_names: List[str]):
self.model = model
self.feature_names = feature_names
# Compute reference predictions
self.ref_predictions = model.predict_proba(X_reference)[:, 1]
self.ref_features = X_reference.values if hasattr(
X_reference, 'values') else X_reference
# Reference performance
self.ref_brier = brier_score_loss(y_reference,
self.ref_predictions)
self.ref_auc = roc_auc_score(y_reference,
self.ref_predictions)
# Reference distribution statistics
self.ref_stats = self._compute_stats(
self.ref_predictions, self.ref_features
)
def _compute_stats(self, predictions, features) -> Dict:
"""Compute distribution statistics for reference."""
stats_dict = {
'prediction_mean': float(np.mean(predictions)),
'prediction_std': float(np.std(predictions)),
'prediction_median': float(np.median(predictions)),
'prediction_q10': float(np.percentile(predictions, 10)),
'prediction_q90': float(np.percentile(predictions, 90)),
}
for i, fname in enumerate(self.feature_names):
col = features[:, i]
stats_dict[f'{fname}_mean'] = float(np.nanmean(col))
stats_dict[f'{fname}_std'] = float(np.nanstd(col))
stats_dict[f'{fname}_null_rate'] = float(
np.isnan(col).mean()
)
return stats_dict
def save(self, path: str):
"""Save baseline to disk."""
baseline = {
            'ref_brier': float(self.ref_brier),
            'ref_auc': float(self.ref_auc),
'ref_stats': self.ref_stats,
'ref_predictions': self.ref_predictions.tolist(),
'ref_feature_means': [
float(np.nanmean(self.ref_features[:, i]))
for i in range(self.ref_features.shape[1])
],
'ref_feature_stds': [
float(np.nanstd(self.ref_features[:, i]))
for i in range(self.ref_features.shape[1])
],
'feature_names': self.feature_names,
'timestamp': datetime.now().isoformat(),
}
with open(path, 'w') as f:
json.dump(baseline, f, indent=2)
# Establish baseline from validation period (June 2024)
# baseline = BaselineEstablisher(model, X_val, y_val, FEATURE_NAMES)
# baseline.save("baselines/baseline_2024_06.json")
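# Later, a monitoring job can reload the saved baseline instead of
# recomputing it (sketch; the keys match the JSON written by save() above):
# with open("baselines/baseline_2024_06.json") as f:
#     baseline = json.load(f)
# ref_predictions = np.array(baseline['ref_predictions'])
# ref_brier = baseline['ref_brier']
# feature_names = baseline['feature_names']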
Phase 2: Detecting the Drift
Step 1: Prediction Distribution Analysis
The team collects the model's recent predictions and compares them to the reference distribution.
class DriftDiagnostics:
"""Comprehensive drift detection and diagnostics."""
def __init__(self, ref_predictions: np.ndarray,
ref_features: np.ndarray,
feature_names: List[str],
ref_brier: float):
self.ref_predictions = ref_predictions
self.ref_features = ref_features
self.feature_names = feature_names
self.ref_brier = ref_brier
@staticmethod
def compute_psi(reference: np.ndarray, current: np.ndarray,
n_bins: int = 10) -> float:
"""Population Stability Index."""
eps = 1e-8
bins = np.linspace(
min(reference.min(), current.min()) - eps,
max(reference.max(), current.max()) + eps,
n_bins + 1
)
ref_hist = np.histogram(reference, bins=bins)[0] / len(reference)
cur_hist = np.histogram(current, bins=bins)[0] / len(current)
ref_hist = np.clip(ref_hist, eps, None)
cur_hist = np.clip(cur_hist, eps, None)
psi = np.sum((cur_hist - ref_hist) * np.log(cur_hist / ref_hist))
return float(psi)
def diagnose_prediction_drift(
self, current_predictions: np.ndarray
) -> Dict:
"""Full prediction distribution analysis."""
psi = self.compute_psi(
self.ref_predictions, current_predictions
)
ks_stat, ks_p = stats.ks_2samp(
self.ref_predictions, current_predictions
)
wasserstein = stats.wasserstein_distance(
self.ref_predictions, current_predictions
)
ref_mean = np.mean(self.ref_predictions)
cur_mean = np.mean(current_predictions)
mean_shift = cur_mean - ref_mean
ref_std = np.std(self.ref_predictions)
cur_std = np.std(current_predictions)
std_ratio = cur_std / (ref_std + 1e-8)
report = {
'prediction_drift': {
'psi': psi,
'psi_severity': (
'high' if psi >= 0.25
else 'medium' if psi >= 0.1
else 'low'
),
'ks_statistic': ks_stat,
'ks_p_value': ks_p,
'ks_significant': ks_p < 0.05,
'wasserstein_distance': wasserstein,
'mean_shift': mean_shift,
'std_ratio': std_ratio,
},
'reference': {
'mean': ref_mean,
'std': ref_std,
'n': len(self.ref_predictions),
},
'current': {
'mean': cur_mean,
'std': cur_std,
'n': len(current_predictions),
}
}
return report
def diagnose_feature_drift(
self, current_features: np.ndarray
) -> Dict:
"""Per-feature drift analysis."""
feature_reports = {}
for i, fname in enumerate(self.feature_names):
ref_col = self.ref_features[:, i]
cur_col = current_features[:, i]
# Remove NaNs for statistical tests
ref_clean = ref_col[~np.isnan(ref_col)]
cur_clean = cur_col[~np.isnan(cur_col)]
if len(ref_clean) == 0 or len(cur_clean) == 0:
feature_reports[fname] = {
'status': 'insufficient_data'
}
continue
psi = self.compute_psi(ref_clean, cur_clean)
ks_stat, ks_p = stats.ks_2samp(ref_clean, cur_clean)
wasserstein = stats.wasserstein_distance(
ref_clean, cur_clean
)
ref_null_rate = float(np.isnan(ref_col).mean())
cur_null_rate = float(np.isnan(cur_col).mean())
feature_reports[fname] = {
'psi': psi,
'psi_severity': (
'high' if psi >= 0.25
else 'medium' if psi >= 0.1
else 'low'
),
'ks_statistic': ks_stat,
'ks_p_value': ks_p,
'ks_significant': ks_p < 0.05,
'wasserstein': wasserstein,
'mean_shift': float(
np.mean(cur_clean) - np.mean(ref_clean)
),
'ref_null_rate': ref_null_rate,
'cur_null_rate': cur_null_rate,
'null_rate_change': cur_null_rate - ref_null_rate,
}
return feature_reports
def diagnose_concept_drift(
self, predictions: np.ndarray, actuals: np.ndarray,
window_size: int = 50
) -> Dict:
"""Analyze concept drift through rolling performance."""
current_brier = brier_score_loss(actuals, predictions)
degradation_ratio = current_brier / (self.ref_brier + 1e-8)
# Rolling Brier score
rolling_briers = []
for i in range(window_size, len(predictions)):
window_preds = predictions[i - window_size:i]
window_actuals = actuals[i - window_size:i]
rolling_brier = brier_score_loss(
window_actuals, window_preds
)
rolling_briers.append(rolling_brier)
# Detect when performance started degrading
degradation_start = None
threshold = self.ref_brier * 1.3 # 30% above reference
for i, rb in enumerate(rolling_briers):
if rb > threshold:
degradation_start = i + window_size
break
return {
'current_brier': current_brier,
'ref_brier': self.ref_brier,
'degradation_ratio': degradation_ratio,
'is_degraded': degradation_ratio > 1.3,
'rolling_briers': rolling_briers,
'degradation_start_index': degradation_start,
'n_predictions': len(predictions),
}
def full_diagnosis(self, current_predictions: np.ndarray,
current_features: np.ndarray,
actuals: np.ndarray = None) -> Dict:
"""Complete drift diagnosis."""
report = {
'timestamp': datetime.now().isoformat(),
'prediction_drift': self.diagnose_prediction_drift(
current_predictions
),
'feature_drift': self.diagnose_feature_drift(
current_features
),
}
if actuals is not None:
report['concept_drift'] = self.diagnose_concept_drift(
current_predictions, actuals
)
# Overall severity assessment
severities = []
# Check prediction drift
pred_sev = report['prediction_drift']['prediction_drift'][
'psi_severity'
]
severities.append(pred_sev)
# Check feature drift
for fname, freport in report['feature_drift'].items():
if 'psi_severity' in freport:
severities.append(freport['psi_severity'])
# Check concept drift
if 'concept_drift' in report:
if report['concept_drift']['is_degraded']:
severities.append('high')
if 'high' in severities:
report['overall_severity'] = 'high'
report['recommendation'] = 'Immediate retraining required'
elif 'medium' in severities:
report['overall_severity'] = 'medium'
report['recommendation'] = (
'Investigate and consider retraining'
)
else:
report['overall_severity'] = 'low'
report['recommendation'] = 'No action needed'
return report
Step 2: Running the Diagnosis
# Simulate the scenario
np.random.seed(42)
# Reference data (training period: Jan-Jun 2024)
n_ref = 500
ref_features = np.column_stack([
np.random.beta(5, 5, n_ref), # poll_avg_7d
np.random.beta(5, 5, n_ref), # market_price
np.random.exponential(500000, n_ref), # volume_24h
np.random.normal(0, 0.05, n_ref), # poll_market_spread
np.random.randint(10, 200, n_ref).astype(float), # days_to_resolution
np.random.exponential(0.02, n_ref), # price_volatility_7d
])
ref_predictions = np.random.beta(3, 3, n_ref)
ref_actuals = (ref_predictions + np.random.normal(0, 0.15, n_ref)
> 0.5).astype(int)
ref_brier = brier_score_loss(ref_actuals, ref_predictions)
# Current data (Sep-Oct 2024) - with drift
n_cur = 200
# Simulate drift: markets are now much closer to resolution,
# volume has spiked, and spreads have widened
cur_features = np.column_stack([
np.random.beta(3, 7, n_cur), # polls shifted
np.random.beta(6, 4, n_cur), # market prices shifted
np.random.exponential(1500000, n_cur), # higher volume
np.random.normal(0.08, 0.10, n_cur), # wider spreads
np.random.randint(1, 45, n_cur).astype(float), # closer to resolution
np.random.exponential(0.05, n_cur), # higher volatility
])
# Concept drift: the relationship has changed
cur_predictions = np.random.beta(4, 2, n_cur) # predictions skewed
cur_actuals = (np.random.beta(2, 4, n_cur) > 0.4).astype(int)
# Run diagnosis
feature_names = [
'poll_avg_7d', 'market_price', 'volume_24h',
'poll_market_spread', 'days_to_resolution',
'price_volatility_7d'
]
diagnostics = DriftDiagnostics(
ref_predictions, ref_features, feature_names, ref_brier
)
report = diagnostics.full_diagnosis(
cur_predictions, cur_features, cur_actuals
)
# Print the report
print("=" * 70)
print("DRIFT DIAGNOSIS REPORT")
print("=" * 70)
print(f"\nOverall Severity: {report['overall_severity'].upper()}")
print(f"Recommendation: {report['recommendation']}")
print(f"\n--- Prediction Drift ---")
pd_report = report['prediction_drift']['prediction_drift']
print(f" PSI: {pd_report['psi']:.4f} ({pd_report['psi_severity']})")
print(f" KS Statistic: {pd_report['ks_statistic']:.4f} "
f"(p={pd_report['ks_p_value']:.6f})")
print(f" Mean Shift: {pd_report['mean_shift']:.4f}")
print(f"\n--- Feature Drift ---")
for fname, freport in report['feature_drift'].items():
if 'psi_severity' in freport:
marker = "***" if freport['psi_severity'] == 'high' else " "
print(f" {marker} {fname}: PSI={freport['psi']:.4f} "
f"({freport['psi_severity']}), "
f"KS p={freport['ks_p_value']:.6f}")
if 'concept_drift' in report:
cd = report['concept_drift']
print(f"\n--- Concept Drift ---")
print(f" Current Brier: {cd['current_brier']:.4f}")
print(f" Reference Brier: {cd['ref_brier']:.4f}")
print(f" Degradation Ratio: {cd['degradation_ratio']:.2f}x")
print(f" Degraded: {cd['is_degraded']}")
Diagnosis Findings
The diagnosis reveals three types of drift:
- Feature drift: `days_to_resolution` has shifted dramatically (markets are now much closer to resolution), `volume_24h` has tripled, and `poll_market_spread` has widened significantly.
- Prediction drift: The model's prediction distribution has shifted (PSI > 0.25), reflecting the changed inputs.
- Concept drift: The Brier score has degraded from 0.18 to 0.28, a 55% increase. The relationship between features and outcomes has changed as the election approaches.
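As a quick arithmetic check (the variable names below are illustrative), the reported degradation crosses the 1.3x retraining threshold hard-coded in `diagnose_concept_drift`:
scenario_ref_brier = 0.18        # validation Brier from the training period
scenario_current_brier = 0.28    # Brier observed in late September
ratio = scenario_current_brier / scenario_ref_brier
print(f"Degradation ratio: {ratio:.2f}x")        # ~1.56x
print(f"Exceeds 1.3x threshold: {ratio > 1.3}")  # True -> is_degraded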
Phase 3: Responding to Drift
Retraining with Recent Data
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.calibration import CalibratedClassifierCV
from sklearn.model_selection import TimeSeriesSplit
class DriftResponsePipeline:
"""Pipeline for responding to detected drift."""
def __init__(self, feature_names: List[str]):
self.feature_names = feature_names
self.results = {}
def retrain(self, X_new: np.ndarray, y_new: np.ndarray,
X_old: np.ndarray, y_old: np.ndarray,
strategies: List[str] = None):
"""
Try multiple retraining strategies and compare.
Strategies:
- 'new_only': Train only on recent data
- 'combined': Train on old + new data equally
- 'weighted': Train on old + new data with recency weighting
- 'expanding_window': Use all available data
"""
if strategies is None:
strategies = [
'new_only', 'combined', 'weighted',
'expanding_window'
]
results = {}
for strategy in strategies:
print(f"\nTraining with strategy: {strategy}")
X_train, y_train, sample_weight = (
self._prepare_data(
strategy, X_new, y_new, X_old, y_old
)
)
pipeline = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler()),
('classifier', CalibratedClassifierCV(
GradientBoostingClassifier(
n_estimators=300,
learning_rate=0.05,
max_depth=4,
subsample=0.8,
random_state=42
),
cv=3,
method='isotonic'
))
])
# Time series cross-validation on the new data
tscv = TimeSeriesSplit(n_splits=3)
cv_briers = []
for train_idx, val_idx in tscv.split(X_new):
X_cv_train = X_new[train_idx]
y_cv_train = y_new[train_idx]
X_cv_val = X_new[val_idx]
y_cv_val = y_new[val_idx]
# For combined/weighted strategies, augment
# CV training data
if strategy in ('combined', 'weighted',
'expanding_window'):
X_cv_train = np.vstack([X_old, X_cv_train])
y_cv_train = np.concatenate([y_old, y_cv_train])
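                # Note: this per-fold model omits the CalibratedClassifierCV
                # wrapper used in the final pipeline, so the CV Brier scores
                # are an uncalibrated proxy.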
pipeline_copy = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler()),
('classifier', GradientBoostingClassifier(
n_estimators=300,
learning_rate=0.05,
max_depth=4,
subsample=0.8,
random_state=42
))
])
pipeline_copy.fit(X_cv_train, y_cv_train)
y_pred = pipeline_copy.predict_proba(X_cv_val)[:, 1]
cv_briers.append(
brier_score_loss(y_cv_val, y_pred)
)
            # Final training on all data (route recency weights to the
            # classifier step when the strategy provides them)
            if sample_weight is not None:
                pipeline.fit(X_train, y_train,
                             classifier__sample_weight=sample_weight)
            else:
                pipeline.fit(X_train, y_train)
results[strategy] = {
'pipeline': pipeline,
'cv_brier_mean': np.mean(cv_briers),
'cv_brier_std': np.std(cv_briers),
'n_train': len(X_train),
}
print(f" CV Brier: {np.mean(cv_briers):.4f} "
f"+/- {np.std(cv_briers):.4f}")
self.results = results
return results
def _prepare_data(self, strategy, X_new, y_new,
X_old, y_old):
"""Prepare training data based on strategy."""
if strategy == 'new_only':
return X_new, y_new, None
elif strategy == 'combined':
X = np.vstack([X_old, X_new])
y = np.concatenate([y_old, y_new])
return X, y, None
elif strategy == 'weighted':
X = np.vstack([X_old, X_new])
y = np.concatenate([y_old, y_new])
# Weight recent data more heavily
weights = np.concatenate([
np.ones(len(X_old)) * 0.3,
np.ones(len(X_new)) * 1.0,
])
return X, y, weights
elif strategy == 'expanding_window':
X = np.vstack([X_old, X_new])
y = np.concatenate([y_old, y_new])
return X, y, None
else:
raise ValueError(f"Unknown strategy: {strategy}")
def select_best(self) -> Tuple[str, Pipeline]:
"""Select the best retraining strategy."""
best_strategy = min(
self.results.keys(),
key=lambda s: self.results[s]['cv_brier_mean']
)
best = self.results[best_strategy]
print(f"\nBest strategy: {best_strategy}")
print(f" CV Brier: {best['cv_brier_mean']:.4f}")
return best_strategy, best['pipeline']
# Run the retraining comparison
response = DriftResponsePipeline(feature_names)
results = response.retrain(
X_new=cur_features[:150], y_new=cur_actuals[:150],
X_old=ref_features, y_old=ref_actuals
)
best_strategy, best_pipeline = response.select_best()
Phase 4: Validating the Retrained Model
class ModelValidator:
"""Validate the retrained model before deployment."""
def __init__(self, old_model, new_model,
feature_names: List[str]):
self.old_model = old_model
self.new_model = new_model
self.feature_names = feature_names
def compare_on_holdout(self, X_test: np.ndarray,
y_test: np.ndarray) -> Dict:
"""Compare old and new models on holdout data."""
old_preds = self.old_model.predict_proba(X_test)[:, 1]
new_preds = self.new_model.predict_proba(X_test)[:, 1]
old_brier = brier_score_loss(y_test, old_preds)
new_brier = brier_score_loss(y_test, new_preds)
improvement = old_brier - new_brier
relative_improvement = improvement / (old_brier + 1e-8)
return {
'old_brier': old_brier,
'new_brier': new_brier,
'improvement': improvement,
'relative_improvement': relative_improvement,
'new_is_better': new_brier < old_brier,
'significant_improvement': relative_improvement > 0.05,
}
def stability_check(self, X_test: np.ndarray,
n_bootstrap: int = 100) -> Dict:
"""Check prediction stability via bootstrapping."""
new_preds = self.new_model.predict_proba(X_test)[:, 1]
n = len(X_test)
bootstrap_means = []
bootstrap_stds = []
for _ in range(n_bootstrap):
idx = np.random.choice(n, n, replace=True)
boot_preds = new_preds[idx]
bootstrap_means.append(np.mean(boot_preds))
bootstrap_stds.append(np.std(boot_preds))
return {
'prediction_mean': float(np.mean(new_preds)),
'prediction_std': float(np.std(new_preds)),
'bootstrap_mean_ci': (
float(np.percentile(bootstrap_means, 2.5)),
float(np.percentile(bootstrap_means, 97.5))
),
'bootstrap_std_ci': (
float(np.percentile(bootstrap_stds, 2.5)),
float(np.percentile(bootstrap_stds, 97.5))
),
}
def full_validation(self, X_test, y_test) -> Dict:
"""Complete validation report."""
comparison = self.compare_on_holdout(X_test, y_test)
stability = self.stability_check(X_test)
should_deploy = (
comparison['new_is_better'] and
comparison['new_brier'] < 0.25 # absolute threshold
)
return {
'comparison': comparison,
'stability': stability,
'should_deploy': should_deploy,
'decision': (
'DEPLOY' if should_deploy
else 'REJECT - keep current model'
),
}
# Validate on holdout (last 50 samples of current data)
X_holdout = cur_features[150:]
y_holdout = cur_actuals[150:]
# For this example, use a simple pipeline as the "old model"
old_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler()),
('classifier', GradientBoostingClassifier(
n_estimators=300, learning_rate=0.05,
max_depth=4, random_state=42
))
])
old_pipeline.fit(ref_features, ref_actuals)
validator = ModelValidator(old_pipeline, best_pipeline, feature_names)
validation = validator.full_validation(X_holdout, y_holdout)
print("\n" + "=" * 60)
print("MODEL VALIDATION REPORT")
print("=" * 60)
print(f"\nDecision: {validation['decision']}")
print(f"\nComparison:")
comp = validation['comparison']
print(f" Old model Brier: {comp['old_brier']:.4f}")
print(f" New model Brier: {comp['new_brier']:.4f}")
print(f" Improvement: {comp['improvement']:.4f} "
f"({comp['relative_improvement']:.1%})")
print(f"\nStability:")
stab = validation['stability']
print(f" Prediction mean: {stab['prediction_mean']:.4f}")
print(f" Prediction std: {stab['prediction_std']:.4f}")
print(f" Mean 95% CI: {stab['bootstrap_mean_ci']}")
Phase 5: Setting Up Automated Monitoring
With the retrained model deployed, the team sets up automated monitoring to catch future drift early.
class AutomatedDriftResponse:
"""Automated system for detecting and responding to drift."""
def __init__(self, model, ref_predictions, ref_features,
ref_brier, feature_names,
retrain_callback=None):
self.diagnostics = DriftDiagnostics(
ref_predictions, ref_features,
feature_names, ref_brier
)
self.model = model
self.feature_names = feature_names
self.prediction_buffer = []
self.feature_buffer = []
self.actual_buffer = []
self.check_interval = 100 # check every 100 predictions
self.retrain_callback = retrain_callback
self.drift_history = []
def on_prediction(self, features: np.ndarray,
prediction: float):
"""Called after each prediction."""
self.prediction_buffer.append(prediction)
self.feature_buffer.append(features)
if len(self.prediction_buffer) >= self.check_interval:
self._run_checks()
def on_resolution(self, prediction: float, actual: int):
"""Called when a market resolves."""
self.actual_buffer.append((prediction, actual))
def _run_checks(self):
"""Run periodic drift checks."""
preds = np.array(self.prediction_buffer)
feats = np.array(self.feature_buffer)
# Check prediction drift
pred_report = self.diagnostics.diagnose_prediction_drift(
preds
)
pred_drift = pred_report['prediction_drift']
# Check feature drift
feat_report = self.diagnostics.diagnose_feature_drift(feats)
# Check concept drift if we have resolved markets
concept_report = None
if len(self.actual_buffer) >= 50:
recent = self.actual_buffer[-50:]
recent_preds = np.array([p for p, _ in recent])
recent_actuals = np.array([a for _, a in recent])
concept_report = self.diagnostics.diagnose_concept_drift(
recent_preds, recent_actuals
)
# Determine severity
severity = 'low'
if pred_drift['psi_severity'] == 'high':
severity = 'high'
elif pred_drift['psi_severity'] == 'medium':
severity = 'medium'
drifted_features = [
fname for fname, fr in feat_report.items()
if fr.get('psi_severity') in ('high', 'medium')
]
if len(drifted_features) >= 3:
severity = 'high'
elif len(drifted_features) >= 1 and severity == 'low':
severity = 'medium'
if (concept_report and
concept_report.get('is_degraded', False)):
severity = 'high'
# Record
record = {
'timestamp': datetime.now().isoformat(),
'severity': severity,
'prediction_psi': pred_drift['psi'],
'drifted_features': drifted_features,
'concept_degraded': (
concept_report.get('is_degraded', False)
if concept_report else None
),
}
self.drift_history.append(record)
# Respond
if severity == 'high':
print(f"\n[ALERT] HIGH severity drift detected!")
print(f" Prediction PSI: {pred_drift['psi']:.4f}")
print(f" Drifted features: {drifted_features}")
if concept_report:
print(f" Concept degraded: "
f"{concept_report['is_degraded']}")
if self.retrain_callback:
print(" --> Triggering automated retraining")
self.retrain_callback()
elif severity == 'medium':
print(f"\n[WARNING] Medium severity drift detected.")
print(f" Drifted features: {drifted_features}")
# Reset buffers
self.prediction_buffer = []
self.feature_buffer = []
# Set up automated monitoring
def trigger_retraining():
"""Callback to trigger model retraining."""
print("Retraining pipeline initiated...")
# In production, this would call the AutomatedTrainingPipeline
pass
monitor = AutomatedDriftResponse(
model=best_pipeline,
ref_predictions=best_pipeline.predict_proba(
cur_features[:150])[:, 1],
ref_features=cur_features[:150],
ref_brier=brier_score_loss(
cur_actuals[:150],
best_pipeline.predict_proba(cur_features[:150])[:, 1]
),
feature_names=feature_names,
retrain_callback=trigger_retraining,
)
print("Automated drift monitoring is active.")
print(f"Checks run every {monitor.check_interval} predictions.")
Results
After implementing the drift detection and response system:
| Metric | Before (manual) | After (automated) |
|---|---|---|
| Time to detect drift | 3 weeks | < 1 hour |
| Time to retrain | 1-2 days | 30 minutes (automated) |
| Performance during drift | Brier 0.28 (degraded) | Brier 0.20 (retrained) |
| Losses during drift period | ~$25,000 | ~$3,000 |
| False alarms (per month) | N/A | 2 (manageable) |
Key Lessons
- Drift is inevitable in prediction markets: Market dynamics change as events approach, as new information arrives, and as participant behavior shifts. The question is not whether drift will occur, but how quickly you detect and respond to it.
- Multiple drift types require different responses: Data drift (feature distributions change) can sometimes be addressed by feature engineering. Concept drift (relationships change) requires retraining. Upstream data issues require fixing the data pipeline.
- Not all retraining strategies are equal: Training only on recent data risks losing valuable historical patterns. Training on all data equally risks learning outdated patterns. Recency-weighted training often provides the best balance.
- Validation before deployment is non-negotiable: Even when drift is detected, the retrained model must pass quality gates before replacing the production model. A bad retrained model is worse than a slightly drifted one.
- Baselines must evolve: After successfully retraining, update the reference baseline. The new model's predictions become the new "normal" for future drift detection. Failing to update baselines leads to false positives (see the sketch after this list).
- Combine statistical tests: No single drift test captures all types of distributional change. PSI, the KS test, and Wasserstein distance each detect different patterns. Use them together for comprehensive monitoring.
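For the fifth lesson, here is a minimal sketch of refreshing the baseline once the retrained model is live, reusing the `BaselineEstablisher` from Phase 1 (the holdout arrays and the file name are illustrative):
# Re-establish the reference baseline against the retrained model so that
# future drift checks compare to the new "normal"
new_baseline = BaselineEstablisher(
    model=best_pipeline,
    X_reference=X_holdout,      # recent, representative post-drift data
    y_reference=y_holdout,
    feature_names=feature_names,
)
new_baseline.save("baselines/baseline_2024_10.json")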