Case Study 2: From Raw Data to Deployed Model
Overview
This case study walks through an end-to-end ML pipeline for a prediction market trading system, from raw data collection through feature engineering, model selection, calibration, and deployment to monitoring. We build a production-ready system that ingests new data, generates calibrated probability estimates, and monitors its own performance over time.
Scenario
A quantitative trading team operates on a prediction market platform that lists hundreds of binary event contracts across politics, economics, and sports. The team wants to build an automated system that:
- Collects and processes raw data daily
- Engineers predictive features
- Generates calibrated probability estimates
- Compares estimates against market prices to find trading opportunities
- Monitors model performance and triggers retraining when needed
The system must handle the entire lifecycle from raw data to live predictions.
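Before diving in, here is a minimal sketch of how the pieces fit together. The run_daily_cycle driver below is illustrative rather than part of the case-study code; it only previews the classes and methods built in Phases 1 through 8.
# Illustrative daily driver previewing the components built in the phases below.
# The wrapper itself is a sketch; the class and method names match Phases 1-8.
def run_daily_cycle(collector, engineer, deployed, monitor, retrainer):
    # Phase 1-2: pull the latest snapshots and turn them into features
    raw = collector.collect_market_data()
    X, y, names, feat = engineer.get_feature_matrix(raw)
    # Phase 6: calibrated predictions and candidate trades
    market_prices = feat['market_price'].values
    opportunities = deployed.find_opportunities(X, market_prices, threshold=0.05)
    # Phase 7: log every prediction so outcomes can be attached later
    probs = deployed.predict(X)['calibrated_probability']
    for i, event_id in enumerate(feat['event_id'].values):
        monitor.log_prediction(event_id, X[i], probs[i], market_prices[i])
    # Phase 8: retrain only when the monitor produces evidence of drift
    needs_retrain, reasons = retrainer.should_retrain()
    if needs_retrain:
        retrainer.retrain(feat, names)
    return opportunities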
Phase 1: Data Collection and Storage
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import json
import os
class DataCollector:
"""Collects and stores prediction market data."""
def __init__(self, data_dir='data'):
self.data_dir = data_dir
os.makedirs(data_dir, exist_ok=True)
def collect_market_data(self, n_events=5000, n_days=365):
"""Simulate collecting market data over time.
In production, this would interface with a prediction market API
to fetch contract prices, volumes, and event metadata.
"""
np.random.seed(42)
records = []
for event_id in range(n_events):
# Event metadata
event_type = np.random.choice(
['politics', 'economics', 'sports'],
p=[0.3, 0.3, 0.4]
)
resolution_date = datetime(2024, 1, 1) + timedelta(
days=np.random.randint(30, n_days)
)
# Number of snapshots for this event
n_snapshots = np.random.randint(5, 30)
for snap in range(n_snapshots):
days_before = np.random.randint(1, 180)
snapshot_date = resolution_date - timedelta(days=days_before)
record = {
'event_id': event_id,
'snapshot_date': snapshot_date,
'resolution_date': resolution_date,
'days_to_resolution': days_before,
'event_type': event_type,
# Market features
'market_price': np.clip(
np.random.beta(3, 3) + np.random.normal(0, 0.05), 0.01, 0.99
),
'bid_ask_spread': np.random.exponential(0.02),
'volume_24h': np.random.lognormal(5, 2),
'volume_7d': np.random.lognormal(7, 2),
'price_change_1d': np.random.normal(0, 0.03),
'price_change_7d': np.random.normal(0, 0.06),
'price_volatility_7d': np.random.exponential(0.04),
'price_volatility_30d': np.random.exponential(0.06),
# Domain-specific features
'primary_indicator': np.random.normal(50, 15),
'secondary_indicator': np.random.normal(0, 5),
'sentiment_positive': np.random.beta(2, 5),
'sentiment_negative': np.random.beta(2, 5),
'expert_consensus': np.clip(
np.random.normal(0.5, 0.2), 0, 1
),
'related_market_price': np.clip(
np.random.beta(3, 3), 0.01, 0.99
),
'news_volume': np.random.poisson(5),
'social_media_mentions': np.random.lognormal(3, 1.5),
}
records.append(record)
df = pd.DataFrame(records)
# Generate outcomes
event_outcomes = {}
for event_id in df['event_id'].unique():
event_data = df[df['event_id'] == event_id].iloc[-1]
logit = (
0.03 * event_data['primary_indicator']
+ 0.2 * event_data['secondary_indicator']
+ 0.4 * event_data['sentiment_positive']
- 0.3 * event_data['sentiment_negative']
+ 0.5 * event_data['expert_consensus']
+ 0.3 * event_data['market_price']
- 0.1 * event_data['price_volatility_30d']
+ 0.01 * event_data['primary_indicator'] * event_data['secondary_indicator'] / 10
- 3.0
)
prob = 1 / (1 + np.exp(-logit))
event_outcomes[event_id] = np.random.binomial(1, prob)
df['outcome'] = df['event_id'].map(event_outcomes)
# Sort by snapshot date
df = df.sort_values('snapshot_date').reset_index(drop=True)
# Save
df.to_parquet(f'{self.data_dir}/raw_market_data.parquet', index=False)
print(f"Collected {len(df)} records for {n_events} events")
print(f"Outcome distribution: {df.groupby('event_id')['outcome'].first().value_counts().to_dict()}")
return df
# Collect data
collector = DataCollector()
raw_data = collector.collect_market_data()
print(f"\nDataset shape: {raw_data.shape}")
print(f"Date range: {raw_data['snapshot_date'].min()} to {raw_data['snapshot_date'].max()}")
Phase 2: Feature Engineering Pipeline
class FeatureEngineer:
"""Transforms raw data into ML-ready features."""
def __init__(self):
self.feature_names = None
def build_features(self, df):
"""Generate all features from raw data."""
feat = df.copy()
# === Time-based features ===
feat['days_to_resolution_log'] = np.log1p(feat['days_to_resolution'])
feat['time_pressure'] = 1.0 / (1.0 + feat['days_to_resolution'])
feat['is_last_week'] = (feat['days_to_resolution'] <= 7).astype(int)
feat['is_last_month'] = (feat['days_to_resolution'] <= 30).astype(int)
# === Market microstructure features ===
feat['spread_pct'] = feat['bid_ask_spread'] / (feat['market_price'] + 1e-6)
feat['volume_ratio'] = feat['volume_24h'] / (feat['volume_7d'] / 7 + 1e-6)
feat['volume_log'] = np.log1p(feat['volume_24h'])
feat['volume_7d_log'] = np.log1p(feat['volume_7d'])
# === Price momentum features ===
feat['momentum_7d'] = feat['price_change_7d'] / (feat['price_volatility_7d'] + 1e-6)
feat['vol_regime'] = (feat['price_volatility_30d'] > feat['price_volatility_30d'].median()).astype(int)
feat['price_distance_from_half'] = abs(feat['market_price'] - 0.5)
# === Sentiment features ===
feat['sentiment_net'] = feat['sentiment_positive'] - feat['sentiment_negative']
feat['sentiment_ratio'] = feat['sentiment_positive'] / (
feat['sentiment_positive'] + feat['sentiment_negative'] + 1e-6
)
feat['sentiment_total'] = feat['sentiment_positive'] + feat['sentiment_negative']
# === Interaction features ===
feat['indicator_x_sentiment'] = feat['primary_indicator'] * feat['sentiment_net']
feat['consensus_x_market'] = feat['expert_consensus'] * feat['market_price']
feat['consensus_market_diff'] = feat['expert_consensus'] - feat['market_price']
feat['volume_x_volatility'] = feat['volume_log'] * feat['price_volatility_7d']
feat['time_x_volatility'] = feat['time_pressure'] * feat['price_volatility_7d']
# === Social/news features ===
feat['news_volume_log'] = np.log1p(feat['news_volume'])
feat['social_log'] = np.log1p(feat['social_media_mentions'])
feat['news_per_social'] = feat['news_volume'] / (feat['social_media_mentions'] + 1e-6)
# === Derived indicator features ===
feat['indicator_normalized'] = (
(feat['primary_indicator'] - feat['primary_indicator'].mean()) /
(feat['primary_indicator'].std() + 1e-6)
)
feat['indicator_squared'] = feat['primary_indicator'] ** 2
# === Event type encoding ===
for etype in ['politics', 'economics', 'sports']:
feat[f'is_{etype}'] = (feat['event_type'] == etype).astype(int)
# Define final feature columns
exclude_cols = [
'event_id', 'snapshot_date', 'resolution_date',
'event_type', 'outcome'
]
self.feature_names = [c for c in feat.columns if c not in exclude_cols]
return feat
def get_feature_matrix(self, df):
"""Return feature matrix and target."""
feat = self.build_features(df)
X = feat[self.feature_names].values
y = feat['outcome'].values
return X, y, self.feature_names, feat
# Build features
engineer = FeatureEngineer()
X, y, feature_names, feat_data = engineer.get_feature_matrix(raw_data)
print(f"Feature matrix shape: {X.shape}")
print(f"Number of features: {len(feature_names)}")
print(f"\nFeature names:")
for i, name in enumerate(feature_names):
print(f" {i+1:2d}. {name}")
Phase 3: Model Training and Selection
import xgboost as xgb
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import brier_score_loss, log_loss
from sklearn.calibration import calibration_curve
import matplotlib.pyplot as plt
class ModelTrainer:
"""Trains and evaluates multiple model types."""
def __init__(self, feature_names):
self.feature_names = feature_names
self.models = {}
self.scalers = {}
def temporal_split(self, feat_data, train_frac=0.6, val_frac=0.2):
"""Split data temporally."""
n = len(feat_data)
n_train = int(n * train_frac)
n_val = int(n * val_frac)
train = feat_data.iloc[:n_train]
val = feat_data.iloc[n_train:n_train + n_val]
test = feat_data.iloc[n_train + n_val:]
X_train = train[self.feature_names].values
y_train = train['outcome'].values
X_val = val[self.feature_names].values
y_val = val['outcome'].values
X_test = test[self.feature_names].values
y_test = test['outcome'].values
# Handle NaN and Inf
for arr in [X_train, X_val, X_test]:
arr[np.isnan(arr)] = 0
arr[np.isinf(arr)] = 0
return (X_train, y_train), (X_val, y_val), (X_test, y_test)
def train_logistic(self, X_train, y_train):
"""Train logistic regression baseline."""
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_train)
model = LogisticRegression(
C=1.0, max_iter=1000, solver='lbfgs', random_state=42
)
model.fit(X_scaled, y_train)
self.models['logistic'] = model
self.scalers['logistic'] = scaler
return model
def train_random_forest(self, X_train, y_train):
"""Train random forest."""
model = RandomForestClassifier(
n_estimators=300,
max_depth=8,
min_samples_leaf=20,
max_features='sqrt',
random_state=42,
n_jobs=-1
)
model.fit(X_train, y_train)
self.models['rf'] = model
return model
def train_xgboost(self, X_train, y_train, X_val, y_val):
"""Train XGBoost with early stopping."""
dtrain = xgb.DMatrix(X_train, label=y_train, feature_names=self.feature_names)
dval = xgb.DMatrix(X_val, label=y_val, feature_names=self.feature_names)
params = {
'objective': 'binary:logistic',
'eval_metric': 'logloss',
'max_depth': 6,
'learning_rate': 0.05,
'subsample': 0.8,
'colsample_bytree': 0.7,
'min_child_weight': 15,
'reg_alpha': 0.1,
'reg_lambda': 1.5,
'gamma': 0.2,
'seed': 42,
}
model = xgb.train(
params, dtrain,
num_boost_round=1000,
evals=[(dtrain, 'train'), (dval, 'val')],
early_stopping_rounds=50,
verbose_eval=False
)
self.models['xgboost'] = model
return model
def predict(self, model_name, X):
"""Generate predictions from a named model."""
if model_name == 'logistic':
X_scaled = self.scalers['logistic'].transform(X)
return self.models['logistic'].predict_proba(X_scaled)[:, 1]
elif model_name == 'rf':
return self.models['rf'].predict_proba(X)[:, 1]
elif model_name == 'xgboost':
dmat = xgb.DMatrix(X, feature_names=self.feature_names)
return self.models['xgboost'].predict(dmat)
else:
raise ValueError(f"Unknown model: {model_name}")
def evaluate_all(self, X, y, dataset_name='Test'):
"""Evaluate all trained models."""
results = {}
for name in self.models:
probs = self.predict(name, X)
probs = np.clip(probs, 1e-7, 1 - 1e-7)
results[name] = {
'Brier Score': brier_score_loss(y, probs),
'Log Loss': log_loss(y, probs),
}
results_df = pd.DataFrame(results).T
print(f"\n{dataset_name} Set Results:")
print(results_df.to_string())
return results_df
# Train models
trainer = ModelTrainer(feature_names)
(X_train, y_train), (X_val, y_val), (X_test, y_test) = trainer.temporal_split(feat_data)
print(f"Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}")
trainer.train_logistic(X_train, y_train)
trainer.train_random_forest(X_train, y_train)
trainer.train_xgboost(X_train, y_train, X_val, y_val)
# Evaluate
val_results = trainer.evaluate_all(X_val, y_val, 'Validation')
test_results = trainer.evaluate_all(X_test, y_test, 'Test')
Phase 4: Calibration
from sklearn.isotonic import IsotonicRegression
class CalibrationPipeline:
"""Calibrates model probability outputs."""
def __init__(self):
self.calibrators = {}
def fit(self, model_name, probs_val, y_val, method='platt'):
"""Fit calibration model on validation predictions."""
if method == 'platt':
cal = LogisticRegression(C=1e10, solver='lbfgs')
cal.fit(probs_val.reshape(-1, 1), y_val)
elif method == 'isotonic':
cal = IsotonicRegression(out_of_bounds='clip')
cal.fit(probs_val, y_val)
else:
raise ValueError(f"Unknown method: {method}")
key = f"{model_name}_{method}"
self.calibrators[key] = (cal, method)
return cal
def transform(self, model_name, probs, method='platt'):
"""Apply calibration to new predictions."""
key = f"{model_name}_{method}"
cal, cal_method = self.calibrators[key]
if cal_method == 'platt':
return cal.predict_proba(probs.reshape(-1, 1))[:, 1]
else:
return cal.predict(probs)
def fit_and_evaluate(self, trainer, X_val, y_val, X_test, y_test):
"""Fit calibration for all models and evaluate."""
results = []
for model_name in trainer.models:
probs_val = trainer.predict(model_name, X_val)
probs_test = trainer.predict(model_name, X_test)
# Raw performance
results.append({
'Model': model_name,
'Calibration': 'None',
'Brier': brier_score_loss(y_test, probs_test),
'LogLoss': log_loss(y_test, np.clip(probs_test, 1e-7, 1-1e-7)),
})
# Platt scaling
self.fit(model_name, probs_val, y_val, 'platt')
probs_platt = self.transform(model_name, probs_test, 'platt')
results.append({
'Model': model_name,
'Calibration': 'Platt',
'Brier': brier_score_loss(y_test, probs_platt),
'LogLoss': log_loss(y_test, np.clip(probs_platt, 1e-7, 1-1e-7)),
})
# Isotonic regression
self.fit(model_name, probs_val, y_val, 'isotonic')
probs_iso = self.transform(model_name, probs_test, 'isotonic')
results.append({
'Model': model_name,
'Calibration': 'Isotonic',
'Brier': brier_score_loss(y_test, probs_iso),
'LogLoss': log_loss(y_test, np.clip(probs_iso, 1e-7, 1-1e-7)),
})
results_df = pd.DataFrame(results)
print("\nCalibration Comparison (Test Set):")
print(results_df.to_string(index=False))
return results_df
# Run calibration
calibrator = CalibrationPipeline()
cal_results = calibrator.fit_and_evaluate(trainer, X_val, y_val, X_test, y_test)
# Select best model + calibration combination
best_row = cal_results.loc[cal_results['Brier'].idxmin()]
print(f"\nBest combination: {best_row['Model']} + {best_row['Calibration']}")
print(f" Brier Score: {best_row['Brier']:.4f}")
print(f" Log Loss: {best_row['LogLoss']:.4f}")
Phase 5: SHAP Interpretability Analysis
import shap
class InterpretabilityAnalyzer:
"""Analyze model predictions using SHAP."""
def __init__(self, model, X_data, feature_names):
self.model = model
self.feature_names = feature_names
self.explainer = shap.TreeExplainer(model)
self.shap_values = self.explainer.shap_values(
xgb.DMatrix(X_data, feature_names=feature_names)
)
self.X_data = X_data
def global_importance(self, top_n=15):
"""Compute and display global feature importance."""
mean_abs = np.abs(self.shap_values).mean(axis=0)
importance = pd.Series(
mean_abs, index=self.feature_names
).sort_values(ascending=False)
print(f"\nTop {top_n} Features by Mean |SHAP|:")
for i, (feat, val) in enumerate(importance.head(top_n).items()):
print(f" {i+1:2d}. {feat:30s} {val:.4f}")
return importance
def explain_prediction(self, idx):
"""Explain a single prediction."""
base_value = self.explainer.expected_value
prediction = base_value + self.shap_values[idx].sum()
print(f"\nPrediction Explanation (index={idx}):")
print(f" Base value (average): {base_value:.4f}")
print(f" Model output (logit): {prediction:.4f}")
print(f" Predicted probability: {1/(1+np.exp(-prediction)):.4f}")
print(f" Top contributing features:")
contributions = pd.Series(
self.shap_values[idx], index=self.feature_names
)
top_pos = contributions.nlargest(5)
top_neg = contributions.nsmallest(5)
print(f" Pushing UP:")
for feat, val in top_pos.items():
if val > 0.001:
feat_val = self.X_data[idx][self.feature_names.index(feat)]
print(f" {feat:30s} SHAP={val:+.4f} (value={feat_val:.3f})")
print(f" Pushing DOWN:")
for feat, val in top_neg.items():
if val < -0.001:
feat_val = self.X_data[idx][self.feature_names.index(feat)]
print(f" {feat:30s} SHAP={val:+.4f} (value={feat_val:.3f})")
def plot_summary(self, save_path='shap_summary.png'):
"""Create and save SHAP summary plot."""
X_df = pd.DataFrame(self.X_data, columns=self.feature_names)
shap.summary_plot(self.shap_values, X_df, show=False, max_display=15)
plt.tight_layout()
plt.savefig(save_path, dpi=150, bbox_inches='tight')
plt.close()
print(f"Summary plot saved to {save_path}")
# Run SHAP analysis on XGBoost (assumed best model)
analyzer = InterpretabilityAnalyzer(
trainer.models['xgboost'], X_test, feature_names
)
importance = analyzer.global_importance()
analyzer.explain_prediction(0)
analyzer.explain_prediction(len(X_test) - 1)
analyzer.plot_summary()
Phase 6: Deployment System
import joblib
from datetime import datetime
class DeployedModel:
"""Production-ready prediction model."""
def __init__(self, model_dir='deployed_model'):
self.model_dir = model_dir
os.makedirs(model_dir, exist_ok=True)
self.model = None
self.calibrator_model = None
self.feature_engineer = None
self.metadata = None
def save(self, xgb_model, calibrator, feature_engineer, feature_names,
val_metrics, test_metrics):
"""Save all components for deployment."""
# Save XGBoost model
xgb_model.save_model(f'{self.model_dir}/model.json')
# Save calibrator
joblib.dump(calibrator, f'{self.model_dir}/calibrator.joblib')
# Save metadata
self.metadata = {
'model_type': 'xgboost',
'calibration_method': 'platt',
'feature_names': feature_names,
'n_features': len(feature_names),
'deployment_date': datetime.now().isoformat(),
'validation_brier': float(val_metrics.get('Brier Score', 0)),
'test_brier': float(test_metrics.get('Brier Score', 0)),
'validation_logloss': float(val_metrics.get('Log Loss', 0)),
'test_logloss': float(test_metrics.get('Log Loss', 0)),
'version': '1.0.0',
}
with open(f'{self.model_dir}/metadata.json', 'w') as f:
json.dump(self.metadata, f, indent=2)
print(f"Model saved to {self.model_dir}/")
print(f" Model file: model.json")
print(f" Calibrator: calibrator.joblib")
print(f" Metadata: metadata.json")
def load(self):
"""Load deployed model."""
self.model = xgb.Booster()
self.model.load_model(f'{self.model_dir}/model.json')
self.calibrator_model = joblib.load(f'{self.model_dir}/calibrator.joblib')
with open(f'{self.model_dir}/metadata.json') as f:
self.metadata = json.load(f)
self.feature_names = self.metadata['feature_names']
print(f"Loaded model v{self.metadata['version']}")
def predict(self, features_array):
"""Generate calibrated probability prediction."""
if self.model is None:
self.load()
# Handle NaN/Inf
features_clean = np.nan_to_num(features_array, nan=0.0, posinf=0.0, neginf=0.0)
# Raw prediction
if features_clean.ndim == 1:
features_clean = features_clean.reshape(1, -1)
dmat = xgb.DMatrix(features_clean, feature_names=self.feature_names)
raw_probs = self.model.predict(dmat)
# Calibrate
calibrated_probs = self.calibrator_model.predict_proba(
raw_probs.reshape(-1, 1)
)[:, 1]
return {
'calibrated_probability': calibrated_probs.tolist(),
'raw_probability': raw_probs.tolist(),
'model_version': self.metadata['version'],
'timestamp': datetime.now().isoformat(),
}
def find_opportunities(self, features_array, market_prices, threshold=0.05):
"""Find trading opportunities where model disagrees with market."""
result = self.predict(features_array)
model_probs = np.array(result['calibrated_probability'])
opportunities = []
for i in range(len(model_probs)):
edge = model_probs[i] - market_prices[i]
if abs(edge) > threshold:
opportunities.append({
'index': i,
'model_prob': float(model_probs[i]),
'market_price': float(market_prices[i]),
'edge': float(edge),
'direction': 'BUY' if edge > 0 else 'SELL',
'confidence': abs(edge),
})
opportunities.sort(key=lambda x: x['confidence'], reverse=True)
return opportunities
# Deploy the model
deployed = DeployedModel()
# Get validation metrics for the best model
xgb_probs_val = trainer.predict('xgboost', X_val)
val_metrics = {
'Brier Score': brier_score_loss(y_val, xgb_probs_val),
'Log Loss': log_loss(y_val, np.clip(xgb_probs_val, 1e-7, 1-1e-7)),
}
xgb_probs_test = trainer.predict('xgboost', X_test)
test_metrics = {
'Brier Score': brier_score_loss(y_test, xgb_probs_test),
'Log Loss': log_loss(y_test, np.clip(xgb_probs_test, 1e-7, 1-1e-7)),
}
# Save Platt calibrator
platt_cal = calibrator.calibrators['xgboost_platt'][0]
deployed.save(
trainer.models['xgboost'],
platt_cal,
engineer,
feature_names,
val_metrics,
test_metrics
)
# Test prediction
print("\n--- Test Prediction ---")
test_prediction = deployed.predict(X_test[0])
print(f"Raw probability: {test_prediction['raw_probability'][0]:.4f}")
print(f"Calibrated probability: {test_prediction['calibrated_probability'][0]:.4f}")
# Find opportunities
market_prices_sim = np.clip(xgb_probs_test + np.random.normal(0, 0.08, len(xgb_probs_test)), 0.01, 0.99)
opportunities = deployed.find_opportunities(X_test, market_prices_sim, threshold=0.06)
print(f"\nFound {len(opportunities)} trading opportunities (threshold=6%)")
if opportunities:
print("\nTop 5 opportunities:")
for opp in opportunities[:5]:
print(f" {opp['direction']:4s} Edge={opp['edge']:+.3f} "
f"Model={opp['model_prob']:.3f} Market={opp['market_price']:.3f}")
Phase 7: Monitoring System
class ProductionMonitor:
"""Monitors deployed model performance in production."""
def __init__(self, model_metadata, alert_threshold=0.03):
self.metadata = model_metadata
self.alert_threshold = alert_threshold
self.prediction_log = []
self.performance_history = []
self.alerts = []
def log_prediction(self, event_id, features, model_prob, market_price):
"""Log a prediction for later evaluation."""
self.prediction_log.append({
'timestamp': datetime.now().isoformat(),
'event_id': event_id,
'model_prob': model_prob,
'market_price': market_price,
'outcome': None, # filled in later
})
def record_outcome(self, event_id, outcome):
"""Record the actual outcome for a previously predicted event."""
for entry in self.prediction_log:
if entry['event_id'] == event_id and entry['outcome'] is None:
entry['outcome'] = outcome
def compute_rolling_metrics(self, window=100):
"""Compute rolling performance metrics."""
resolved = [e for e in self.prediction_log if e['outcome'] is not None]
if len(resolved) < window:
return None
recent = resolved[-window:]
probs = np.array([e['model_prob'] for e in recent])
outcomes = np.array([e['outcome'] for e in recent])
market_probs = np.array([e['market_price'] for e in recent])
model_brier = np.mean((outcomes - probs) ** 2)
market_brier = np.mean((outcomes - market_probs) ** 2)
metrics = {
'timestamp': datetime.now().isoformat(),
'window': window,
'n_resolved': len(resolved),
'model_brier': float(model_brier),
'market_brier': float(market_brier),
'model_vs_market': float(model_brier - market_brier),
'model_logloss': float(log_loss(outcomes, np.clip(probs, 1e-7, 1-1e-7))),
}
self.performance_history.append(metrics)
return metrics
def check_calibration(self, n_bins=5):
"""Check if model is still well-calibrated."""
resolved = [e for e in self.prediction_log if e['outcome'] is not None]
if len(resolved) < 100:
return "Insufficient data"
probs = np.array([e['model_prob'] for e in resolved])
outcomes = np.array([e['outcome'] for e in resolved])
bins = np.linspace(0, 1, n_bins + 1)
report = []
total_ece = 0.0
for i in range(n_bins):
mask = (probs >= bins[i]) & (probs < bins[i+1])
if mask.sum() > 0:
expected = probs[mask].mean()
observed = outcomes[mask].mean()
gap = abs(expected - observed)
total_ece += mask.sum() / len(probs) * gap
report.append({
'bin': f'{bins[i]:.2f}-{bins[i+1]:.2f}',
'count': int(mask.sum()),
'expected': f'{expected:.3f}',
'observed': f'{observed:.3f}',
'gap': f'{gap:.3f}',
})
return {
'ECE': float(total_ece),
'bins': report,
'needs_recalibration': total_ece > 0.05,
}
def check_for_drift(self):
"""Detect performance degradation."""
if len(self.performance_history) < 2:
return {'drift_detected': False, 'message': 'Insufficient history'}
baseline_brier = self.metadata.get('test_brier', 0.25)
recent_brier = self.performance_history[-1]['model_brier']
degradation = recent_brier - baseline_brier
alert = {
'drift_detected': degradation > self.alert_threshold,
'baseline_brier': baseline_brier,
'current_brier': recent_brier,
'degradation': degradation,
'message': (
f"ALERT: Brier degraded by {degradation:.4f}"
if degradation > self.alert_threshold
else "Performance within acceptable range"
),
}
if alert['drift_detected']:
self.alerts.append({
'timestamp': datetime.now().isoformat(),
'type': 'performance_degradation',
'details': alert,
})
return alert
def generate_report(self):
"""Generate a monitoring report."""
resolved = [e for e in self.prediction_log if e['outcome'] is not None]
report = {
'model_version': self.metadata.get('version', 'unknown'),
'total_predictions': len(self.prediction_log),
'resolved_predictions': len(resolved),
'pending_predictions': len(self.prediction_log) - len(resolved),
'alerts': len(self.alerts),
}
if len(resolved) > 0:
probs = np.array([e['model_prob'] for e in resolved])
outcomes = np.array([e['outcome'] for e in resolved])
report['overall_brier'] = float(np.mean((outcomes - probs) ** 2))
report['overall_logloss'] = float(
log_loss(outcomes, np.clip(probs, 1e-7, 1-1e-7))
)
calibration = self.check_calibration()
if isinstance(calibration, dict):
report['calibration_ece'] = calibration['ECE']
report['needs_recalibration'] = calibration['needs_recalibration']
drift = self.check_for_drift()
report['drift_detected'] = drift['drift_detected']
return report
# Simulate monitoring
monitor = ProductionMonitor(deployed.metadata, alert_threshold=0.03)
# Simulate 500 predictions and outcomes
np.random.seed(123)
for i in range(500):
event_id = f"event_{i}"
idx = i % len(X_test)
pred = deployed.predict(X_test[idx:idx+1])
model_prob = pred['calibrated_probability'][0]
market_price = np.clip(model_prob + np.random.normal(0, 0.06), 0.01, 0.99)
monitor.log_prediction(event_id, X_test[idx], model_prob, market_price)
# Simulate outcome
true_prob = model_prob + np.random.normal(0, 0.1)
outcome = int(np.random.random() < np.clip(true_prob, 0, 1))
monitor.record_outcome(event_id, outcome)
# Check metrics periodically
if (i + 1) % 100 == 0:
metrics = monitor.compute_rolling_metrics(window=100)
if metrics:
print(f"After {i+1} predictions: "
f"Model Brier={metrics['model_brier']:.4f}, "
f"Market Brier={metrics['market_brier']:.4f}, "
f"Difference={metrics['model_vs_market']:+.4f}")
# Generate final report
report = monitor.generate_report()
print("\n" + "=" * 60)
print("MONITORING REPORT")
print("=" * 60)
for key, value in report.items():
print(f" {key}: {value}")
# Check calibration detail
cal_check = monitor.check_calibration()
if isinstance(cal_check, dict):
print(f"\nCalibration Check (ECE={cal_check['ECE']:.4f}):")
for bin_info in cal_check['bins']:
print(f" Bin {bin_info['bin']}: "
f"Count={bin_info['count']:4s}, "
f"Expected={bin_info['expected']}, "
f"Observed={bin_info['observed']}, "
f"Gap={bin_info['gap']}")
Phase 8: Retraining Pipeline
class RetrainingPipeline:
"""Automated retraining when performance degrades."""
def __init__(self, model_trainer, calibrator, deployed_model, monitor):
self.trainer = model_trainer
self.calibrator = calibrator
self.deployed = deployed_model
self.monitor = monitor
def should_retrain(self):
"""Determine if retraining is needed."""
drift_check = self.monitor.check_for_drift()
cal_check = self.monitor.check_calibration()
reasons = []
if drift_check.get('drift_detected', False):
reasons.append('Performance degradation detected')
if isinstance(cal_check, dict) and cal_check.get('needs_recalibration', False):
reasons.append('Calibration drift detected')
return len(reasons) > 0, reasons
def retrain(self, new_data_feat, feature_names):
"""Retrain models on new data."""
print("\n" + "=" * 50)
print("RETRAINING PIPELINE")
print("=" * 50)
# Split new data
(X_tr, y_tr), (X_v, y_v), (X_te, y_te) = self.trainer.temporal_split(new_data_feat)
# Retrain all models
self.trainer.train_logistic(X_tr, y_tr)
self.trainer.train_random_forest(X_tr, y_tr)
self.trainer.train_xgboost(X_tr, y_tr, X_v, y_v)
# Evaluate
print("\nRetrained model performance:")
self.trainer.evaluate_all(X_te, y_te, 'Test')
# Recalibrate
cal_results = self.calibrator.fit_and_evaluate(
self.trainer, X_v, y_v, X_te, y_te
)
# Select best
best_idx = cal_results['Brier'].idxmin()
best = cal_results.loc[best_idx]
print(f"\nBest: {best['Model']} + {best['Calibration']} (Brier={best['Brier']:.4f})")
# Compare with current production model
current_brier = self.deployed.metadata.get('test_brier', 1.0)
improvement = current_brier - best['Brier']
print(f"Current production Brier: {current_brier:.4f}")
print(f"New model Brier: {best['Brier']:.4f}")
print(f"Improvement: {improvement:+.4f}")
if improvement > 0.005:
print("RECOMMENDATION: Deploy new model")
return True
else:
print("RECOMMENDATION: Keep current model (improvement too small)")
return False
# Demonstrate retraining check
pipeline = RetrainingPipeline(trainer, calibrator, deployed, monitor)
needs_retrain, reasons = pipeline.should_retrain()
print(f"\nNeeds retraining: {needs_retrain}")
if reasons:
print(f"Reasons: {', '.join(reasons)}")
Key Takeaways
- End-to-end pipeline design matters. Each phase (data collection, feature engineering, model training, calibration, deployment, monitoring) must be designed to work together. A strong model with poor calibration or monitoring will underperform in production.
- Feature engineering is the highest-leverage activity. The raw features (market price, volume, sentiment) are just the starting point. Thoughtful engineering (momentum, interactions, time-based features) can substantially improve performance.
- Model selection should be systematic. Train multiple model types, calibrate each, and compare with proper metrics. The best model on raw accuracy may not be the best after calibration.
- Monitoring is not optional. Without monitoring, you will not know when your model's edge has disappeared. The monitoring system should track calibration, Brier scores, and feature distribution drift.
- Retraining should be triggered by evidence, not by schedule alone. While periodic retraining is a reasonable baseline, drift detection enables faster response to genuine distributional shifts.
- The deployment interface should be clean and defensive. Input validation, NaN handling, and metadata logging are not luxuries; they are requirements for production ML systems.