Case Study 2: From Raw Data to Deployed Model

Overview

This case study walks through an end-to-end ML pipeline for a prediction market trading system: raw data collection, feature engineering, model selection, calibration, deployment, and monitoring. The resulting system ingests new data, generates calibrated probability estimates, and monitors its own performance over time. The market data is simulated throughout, so the reported scores illustrate the workflow rather than real trading performance.

Scenario

A quantitative trading team operates on a prediction market platform that lists hundreds of binary event contracts across politics, economics, and sports. The team wants to build an automated system that:

  1. Collects and processes raw data daily
  2. Engineers predictive features
  3. Generates calibrated probability estimates
  4. Compares estimates against market prices to find trading opportunities
  5. Monitors model performance and triggers retraining when needed

The system must handle the entire lifecycle from raw data to live predictions.

Phase 1: Data Collection and Storage

import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import json
import os

class DataCollector:
    """Collects and stores prediction market data."""

    def __init__(self, data_dir='data'):
        self.data_dir = data_dir
        os.makedirs(data_dir, exist_ok=True)

    def collect_market_data(self, n_events=5000, n_days=365):
        """Simulate collecting market data over time.

        In production, this would interface with a prediction market API
        to fetch contract prices, volumes, and event metadata.
        """
        np.random.seed(42)
        records = []

        for event_id in range(n_events):
            # Event metadata
            event_type = np.random.choice(
                ['politics', 'economics', 'sports'],
                p=[0.3, 0.3, 0.4]
            )
            # np.random.randint returns a NumPy integer, which timedelta
            # rejects, so convert to a built-in int first.
            resolution_date = datetime(2024, 1, 1) + timedelta(
                days=int(np.random.randint(30, n_days))
            )

            # Number of snapshots for this event
            n_snapshots = np.random.randint(5, 30)

            for snap in range(n_snapshots):
                days_before = int(np.random.randint(1, 180))
                snapshot_date = resolution_date - timedelta(days=days_before)

                record = {
                    'event_id': event_id,
                    'snapshot_date': snapshot_date,
                    'resolution_date': resolution_date,
                    'days_to_resolution': days_before,
                    'event_type': event_type,
                    # Market features
                    'market_price': np.clip(
                        np.random.beta(3, 3) + np.random.normal(0, 0.05), 0.01, 0.99
                    ),
                    'bid_ask_spread': np.random.exponential(0.02),
                    'volume_24h': np.random.lognormal(5, 2),
                    'volume_7d': np.random.lognormal(7, 2),
                    'price_change_1d': np.random.normal(0, 0.03),
                    'price_change_7d': np.random.normal(0, 0.06),
                    'price_volatility_7d': np.random.exponential(0.04),
                    'price_volatility_30d': np.random.exponential(0.06),
                    # Domain-specific features
                    'primary_indicator': np.random.normal(50, 15),
                    'secondary_indicator': np.random.normal(0, 5),
                    'sentiment_positive': np.random.beta(2, 5),
                    'sentiment_negative': np.random.beta(2, 5),
                    'expert_consensus': np.clip(
                        np.random.normal(0.5, 0.2), 0, 1
                    ),
                    'related_market_price': np.clip(
                        np.random.beta(3, 3), 0.01, 0.99
                    ),
                    'news_volume': np.random.poisson(5),
                    'social_media_mentions': np.random.lognormal(3, 1.5),
                }
                records.append(record)

        df = pd.DataFrame(records)

        # Generate outcomes
        event_outcomes = {}
        for event_id in df['event_id'].unique():
            event_data = df[df['event_id'] == event_id].iloc[-1]
            logit = (
                0.03 * event_data['primary_indicator']
                + 0.2 * event_data['secondary_indicator']
                + 0.4 * event_data['sentiment_positive']
                - 0.3 * event_data['sentiment_negative']
                + 0.5 * event_data['expert_consensus']
                + 0.3 * event_data['market_price']
                - 0.1 * event_data['price_volatility_30d']
                + 0.01 * event_data['primary_indicator'] * event_data['secondary_indicator'] / 10
                - 3.0
            )
            prob = 1 / (1 + np.exp(-logit))
            event_outcomes[event_id] = np.random.binomial(1, prob)

        df['outcome'] = df['event_id'].map(event_outcomes)

        # Sort by snapshot date
        df = df.sort_values('snapshot_date').reset_index(drop=True)

        # Save
        df.to_parquet(f'{self.data_dir}/raw_market_data.parquet', index=False)
        print(f"Collected {len(df)} records for {n_events} events")
        print(f"Outcome distribution: {df.groupby('event_id')['outcome'].first().value_counts().to_dict()}")

        return df

# Collect data
collector = DataCollector()
raw_data = collector.collect_market_data()
print(f"\nDataset shape: {raw_data.shape}")
print(f"Date range: {raw_data['snapshot_date'].min()} to {raw_data['snapshot_date'].max()}")
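
The outcome simulation above is a logistic model, so a rough base rate can be read off by plugging the mean of each input into the logit. The sketch below does that arithmetic by hand; the `means` dictionary is a hypothetical helper built from the sampling distributions in `collect_market_data`, and the result is only a back-of-the-envelope figure because the sigmoid is nonlinear and the inputs vary widely around their means.

```python
import math

# Feature means implied by the sampling distributions in collect_market_data
means = {
    'primary_indicator': 50.0,      # normal(50, 15)
    'secondary_indicator': 0.0,     # normal(0, 5)
    'sentiment_positive': 2 / 7,    # beta(2, 5) has mean 2 / (2 + 5)
    'sentiment_negative': 2 / 7,
    'expert_consensus': 0.5,        # normal(0.5, 0.2), clipped symmetrically
    'market_price': 0.5,            # symmetric around 0.5
    'price_volatility_30d': 0.06,   # exponential with scale 0.06
}

# Same coefficients as the logit in collect_market_data
logit = (
    0.03 * means['primary_indicator']
    + 0.2 * means['secondary_indicator']
    + 0.4 * means['sentiment_positive']
    - 0.3 * means['sentiment_negative']
    + 0.5 * means['expert_consensus']
    + 0.3 * means['market_price']
    - 0.1 * means['price_volatility_30d']
    + 0.01 * means['primary_indicator'] * means['secondary_indicator'] / 10
    - 3.0
)
prob_at_means = 1 / (1 + math.exp(-logit))
print(f"logit={logit:.4f}, probability={prob_at_means:.4f}")
```

An event with average inputs resolves "yes" roughly one time in four, so positive outcomes are the minority class. The realized outcome distribution printed by `collect_market_data` will differ somewhat from this figure.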

Phase 2: Feature Engineering Pipeline

class FeatureEngineer:
    """Transforms raw data into ML-ready features."""

    def __init__(self):
        self.feature_names = None

    def build_features(self, df):
        """Generate all features from raw data."""
        feat = df.copy()

        # === Time-based features ===
        feat['days_to_resolution_log'] = np.log1p(feat['days_to_resolution'])
        feat['time_pressure'] = 1.0 / (1.0 + feat['days_to_resolution'])
        feat['is_last_week'] = (feat['days_to_resolution'] <= 7).astype(int)
        feat['is_last_month'] = (feat['days_to_resolution'] <= 30).astype(int)

        # === Market microstructure features ===
        feat['spread_pct'] = feat['bid_ask_spread'] / (feat['market_price'] + 1e-6)
        feat['volume_ratio'] = feat['volume_24h'] / (feat['volume_7d'] / 7 + 1e-6)
        feat['volume_log'] = np.log1p(feat['volume_24h'])
        feat['volume_7d_log'] = np.log1p(feat['volume_7d'])

        # === Price momentum features ===
        feat['momentum_7d'] = feat['price_change_7d'] / (feat['price_volatility_7d'] + 1e-6)
        feat['vol_regime'] = (feat['price_volatility_30d'] > feat['price_volatility_30d'].median()).astype(int)
        feat['price_distance_from_half'] = abs(feat['market_price'] - 0.5)

        # === Sentiment features ===
        feat['sentiment_net'] = feat['sentiment_positive'] - feat['sentiment_negative']
        feat['sentiment_ratio'] = feat['sentiment_positive'] / (
            feat['sentiment_positive'] + feat['sentiment_negative'] + 1e-6
        )
        feat['sentiment_total'] = feat['sentiment_positive'] + feat['sentiment_negative']

        # === Interaction features ===
        feat['indicator_x_sentiment'] = feat['primary_indicator'] * feat['sentiment_net']
        feat['consensus_x_market'] = feat['expert_consensus'] * feat['market_price']
        feat['consensus_market_diff'] = feat['expert_consensus'] - feat['market_price']
        feat['volume_x_volatility'] = feat['volume_log'] * feat['price_volatility_7d']
        feat['time_x_volatility'] = feat['time_pressure'] * feat['price_volatility_7d']

        # === Social/news features ===
        feat['news_volume_log'] = np.log1p(feat['news_volume'])
        feat['social_log'] = np.log1p(feat['social_media_mentions'])
        feat['news_per_social'] = feat['news_volume'] / (feat['social_media_mentions'] + 1e-6)

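        # === Derived indicator features ===
        # Note: the mean/std below (and the median used for vol_regime) are
        # computed over every row passed in, including rows later used for
        # validation and testing. In production, fit these statistics on
        # training data only and reuse them at prediction time.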
        feat['indicator_normalized'] = (
            (feat['primary_indicator'] - feat['primary_indicator'].mean()) /
            (feat['primary_indicator'].std() + 1e-6)
        )
        feat['indicator_squared'] = feat['primary_indicator'] ** 2

        # === Event type encoding ===
        for etype in ['politics', 'economics', 'sports']:
            feat[f'is_{etype}'] = (feat['event_type'] == etype).astype(int)

        # Define final feature columns
        exclude_cols = [
            'event_id', 'snapshot_date', 'resolution_date',
            'event_type', 'outcome'
        ]
        self.feature_names = [c for c in feat.columns if c not in exclude_cols]

        return feat

    def get_feature_matrix(self, df):
        """Return feature matrix and target."""
        feat = self.build_features(df)
        X = feat[self.feature_names].values
        y = feat['outcome'].values
        return X, y, self.feature_names, feat

# Build features
engineer = FeatureEngineer()
X, y, feature_names, feat_data = engineer.get_feature_matrix(raw_data)
print(f"Feature matrix shape: {X.shape}")
print(f"Number of features: {len(feature_names)}")
print(f"\nFeature names:")
for i, name in enumerate(feature_names):
    print(f"  {i+1:2d}. {name}")
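
`build_features` computes the normalization mean and standard deviation over whatever frame it receives, which lets information from later rows leak into earlier ones. One way to avoid that is to fit the statistics on the training rows and reuse them everywhere else. The following is a minimal sketch of that idea; `fit_standardizer` and `apply_standardizer` are hypothetical helpers, not part of the `FeatureEngineer` class, and the data is a toy array.

```python
import numpy as np

def fit_standardizer(train_values):
    """Compute mean and std from the training rows only."""
    mean = float(np.mean(train_values))
    # Population std (ddof=0); pandas .std() defaults to ddof=1.
    std = float(np.std(train_values)) + 1e-6  # guard against zero variance
    return mean, std

def apply_standardizer(values, mean, std):
    """Standardize any split with statistics fitted on the training split."""
    return (np.asarray(values, dtype=float) - mean) / std

# Toy, chronologically ordered indicator values (hypothetical data)
indicator = np.array([40.0, 50.0, 60.0, 55.0, 45.0, 90.0, 95.0])
train, later = indicator[:5], indicator[5:]

mean, std = fit_standardizer(train)
train_z = apply_standardizer(train, mean, std)
later_z = apply_standardizer(later, mean, std)
print(f"train mean={mean:.1f}, later z-scores={np.round(later_z, 2)}")
```

The later rows get large z-scores because they are measured against the training distribution, which is what a live system would actually see when the indicator shifts.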

Phase 3: Model Training and Selection

import xgboost as xgb
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import brier_score_loss, log_loss
from sklearn.calibration import calibration_curve
import matplotlib.pyplot as plt

class ModelTrainer:
    """Trains and evaluates multiple model types."""

    def __init__(self, feature_names):
        self.feature_names = feature_names
        self.models = {}
        self.scalers = {}

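    def temporal_split(self, feat_data, train_frac=0.6, val_frac=0.2):
        """Split rows chronologically by position.

        Assumes feat_data is already sorted by snapshot_date. Snapshots of
        one event can land in different splits, and every snapshot of an
        event carries the same outcome, so validation and test scores may
        look better than they would on genuinely unseen events.
        """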
        n = len(feat_data)
        n_train = int(n * train_frac)
        n_val = int(n * val_frac)

        train = feat_data.iloc[:n_train]
        val = feat_data.iloc[n_train:n_train + n_val]
        test = feat_data.iloc[n_train + n_val:]

        def to_matrix(frame):
            # Build a float array and zero out NaN/Inf. np.nan_to_num returns
            # a new array, so this does not depend on .values being writable
            # (it may be read-only under pandas copy-on-write).
            return np.nan_to_num(
                frame[self.feature_names].to_numpy(dtype=float),
                nan=0.0, posinf=0.0, neginf=0.0
            )

        X_train, y_train = to_matrix(train), train['outcome'].values
        X_val, y_val = to_matrix(val), val['outcome'].values
        X_test, y_test = to_matrix(test), test['outcome'].values

        return (X_train, y_train), (X_val, y_val), (X_test, y_test)

    def train_logistic(self, X_train, y_train):
        """Train logistic regression baseline."""
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X_train)
        model = LogisticRegression(
            C=1.0, max_iter=1000, solver='lbfgs', random_state=42
        )
        model.fit(X_scaled, y_train)
        self.models['logistic'] = model
        self.scalers['logistic'] = scaler
        return model

    def train_random_forest(self, X_train, y_train):
        """Train random forest."""
        model = RandomForestClassifier(
            n_estimators=300,
            max_depth=8,
            min_samples_leaf=20,
            max_features='sqrt',
            random_state=42,
            n_jobs=-1
        )
        model.fit(X_train, y_train)
        self.models['rf'] = model
        return model

    def train_xgboost(self, X_train, y_train, X_val, y_val):
        """Train XGBoost with early stopping."""
        dtrain = xgb.DMatrix(X_train, label=y_train, feature_names=self.feature_names)
        dval = xgb.DMatrix(X_val, label=y_val, feature_names=self.feature_names)

        params = {
            'objective': 'binary:logistic',
            'eval_metric': 'logloss',
            'max_depth': 6,
            'learning_rate': 0.05,
            'subsample': 0.8,
            'colsample_bytree': 0.7,
            'min_child_weight': 15,
            'reg_alpha': 0.1,
            'reg_lambda': 1.5,
            'gamma': 0.2,
            'seed': 42,
        }

        model = xgb.train(
            params, dtrain,
            num_boost_round=1000,
            evals=[(dtrain, 'train'), (dval, 'val')],
            early_stopping_rounds=50,
            verbose_eval=False
        )
        self.models['xgboost'] = model
        return model

    def predict(self, model_name, X):
        """Generate predictions from a named model."""
        if model_name == 'logistic':
            X_scaled = self.scalers['logistic'].transform(X)
            return self.models['logistic'].predict_proba(X_scaled)[:, 1]
        elif model_name == 'rf':
            return self.models['rf'].predict_proba(X)[:, 1]
        elif model_name == 'xgboost':
            dmat = xgb.DMatrix(X, feature_names=self.feature_names)
            return self.models['xgboost'].predict(dmat)
        else:
            raise ValueError(f"Unknown model: {model_name}")

    def evaluate_all(self, X, y, dataset_name='Test'):
        """Evaluate all trained models."""
        results = {}
        for name in self.models:
            probs = self.predict(name, X)
            probs = np.clip(probs, 1e-7, 1 - 1e-7)
            results[name] = {
                'Brier Score': brier_score_loss(y, probs),
                'Log Loss': log_loss(y, probs),
            }
        results_df = pd.DataFrame(results).T
        print(f"\n{dataset_name} Set Results:")
        print(results_df.to_string())
        return results_df

# Train models
trainer = ModelTrainer(feature_names)
(X_train, y_train), (X_val, y_val), (X_test, y_test) = trainer.temporal_split(feat_data)

print(f"Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}")

trainer.train_logistic(X_train, y_train)
trainer.train_random_forest(X_train, y_train)
trainer.train_xgboost(X_train, y_train, X_val, y_val)

# Evaluate
val_results = trainer.evaluate_all(X_val, y_val, 'Validation')
test_results = trainer.evaluate_all(X_test, y_test, 'Test')
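
Because each event contributes several snapshots with the same outcome, a row-level chronological split can place snapshots of one event in both the training and test sets. A stricter alternative assigns whole events to a split, ordered by resolution time. The sketch below illustrates this under stated assumptions: `grouped_temporal_masks` is a hypothetical helper, it expects one resolution time per event expressed as a number, and it is not the split used by `ModelTrainer`.

```python
import numpy as np

def grouped_temporal_masks(event_ids, resolution_days, train_frac=0.6, val_frac=0.2):
    """Assign whole events to train/val/test, ordered by resolution time.

    event_ids and resolution_days are per-row arrays; every row of an event
    is assumed to share the same resolution_days value.
    """
    event_ids = np.asarray(event_ids)
    resolution_days = np.asarray(resolution_days)

    # One resolution time per event, then sort events chronologically
    unique_ids, first_idx = np.unique(event_ids, return_index=True)
    ordered = unique_ids[np.argsort(resolution_days[first_idx], kind='stable')]

    n_train = int(len(ordered) * train_frac)
    n_val = int(len(ordered) * val_frac)
    train_ids = ordered[:n_train]
    val_ids = ordered[n_train:n_train + n_val]

    train_mask = np.isin(event_ids, train_ids)
    val_mask = np.isin(event_ids, val_ids)
    test_mask = ~(train_mask | val_mask)
    return train_mask, val_mask, test_mask

# Toy example: 5 events with 2 snapshots each (hypothetical data)
event_ids = np.array([0, 0, 1, 1, 2, 2, 3, 3, 4, 4])
resolution_days = np.array([10, 10, 50, 50, 20, 20, 40, 40, 30, 30])
train_mask, val_mask, test_mask = grouped_temporal_masks(event_ids, resolution_days)
print(event_ids[train_mask], event_ids[val_mask], event_ids[test_mask])
```

The masks can be applied to the feature frame with boolean indexing. The trade-off is that split sizes are controlled in events rather than rows, so the row counts per split vary with the number of snapshots per event.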

Phase 4: Calibration

from sklearn.isotonic import IsotonicRegression

class CalibrationPipeline:
    """Calibrates model probability outputs."""

    def __init__(self):
        self.calibrators = {}

    def fit(self, model_name, probs_val, y_val, method='platt'):
        """Fit calibration model on validation predictions."""
        if method == 'platt':
            cal = LogisticRegression(C=1e10, solver='lbfgs')
            cal.fit(probs_val.reshape(-1, 1), y_val)
        elif method == 'isotonic':
            cal = IsotonicRegression(out_of_bounds='clip')
            cal.fit(probs_val, y_val)
        else:
            raise ValueError(f"Unknown method: {method}")

        key = f"{model_name}_{method}"
        self.calibrators[key] = (cal, method)
        return cal

    def transform(self, model_name, probs, method='platt'):
        """Apply calibration to new predictions."""
        key = f"{model_name}_{method}"
        cal, cal_method = self.calibrators[key]

        if cal_method == 'platt':
            return cal.predict_proba(probs.reshape(-1, 1))[:, 1]
        else:
            return cal.predict(probs)

    def fit_and_evaluate(self, trainer, X_val, y_val, X_test, y_test):
        """Fit calibration for all models and evaluate."""
        results = []

        for model_name in trainer.models:
            probs_val = trainer.predict(model_name, X_val)
            probs_test = trainer.predict(model_name, X_test)

            # Raw performance
            results.append({
                'Model': model_name,
                'Calibration': 'None',
                'Brier': brier_score_loss(y_test, probs_test),
                'LogLoss': log_loss(y_test, np.clip(probs_test, 1e-7, 1-1e-7)),
            })

            # Platt scaling
            self.fit(model_name, probs_val, y_val, 'platt')
            probs_platt = self.transform(model_name, probs_test, 'platt')
            results.append({
                'Model': model_name,
                'Calibration': 'Platt',
                'Brier': brier_score_loss(y_test, probs_platt),
                'LogLoss': log_loss(y_test, np.clip(probs_platt, 1e-7, 1-1e-7)),
            })

            # Isotonic regression
            self.fit(model_name, probs_val, y_val, 'isotonic')
            probs_iso = self.transform(model_name, probs_test, 'isotonic')
            results.append({
                'Model': model_name,
                'Calibration': 'Isotonic',
                'Brier': brier_score_loss(y_test, probs_iso),
                'LogLoss': log_loss(y_test, np.clip(probs_iso, 1e-7, 1-1e-7)),
            })

        results_df = pd.DataFrame(results)
        print("\nCalibration Comparison (Test Set):")
        print(results_df.to_string(index=False))
        return results_df

# Run calibration
calibrator = CalibrationPipeline()
cal_results = calibrator.fit_and_evaluate(trainer, X_val, y_val, X_test, y_test)

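# Select the best model + calibration combination.
# Caveat: the winner is chosen by test-set Brier score, so the reported test
# score is optimistically biased; an unbiased estimate would need a further
# held-out split that plays no part in the selection.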
best_row = cal_results.loc[cal_results['Brier'].idxmin()]
print(f"\nBest combination: {best_row['Model']} + {best_row['Calibration']}")
print(f"  Brier Score: {best_row['Brier']:.4f}")
print(f"  Log Loss: {best_row['LogLoss']:.4f}")
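
Isotonic calibration fits a non-decreasing step function from raw scores to observed outcomes. The idea can be illustrated with the pool-adjacent-violators (PAV) procedure: sort by score, then merge neighbouring groups whenever an earlier group has a higher mean outcome than the one after it. The following pure-Python sketch assumes unweighted samples and distinct scores; `pav_isotonic` is a hypothetical teaching helper, not scikit-learn's implementation.

```python
def pav_isotonic(scores, labels):
    """Fit a non-decreasing step function of score to labels (PAV).

    Returns the scores sorted ascending and the fitted value for each one.
    """
    order = sorted(range(len(scores)), key=lambda i: scores[i])
    xs = [scores[i] for i in order]
    ys = [float(labels[i]) for i in order]

    # Each block stores [sum of labels, count]; merge while means decrease
    blocks = []
    for y in ys:
        blocks.append([y, 1])
        while len(blocks) > 1 and (
            blocks[-2][0] / blocks[-2][1] > blocks[-1][0] / blocks[-1][1]
        ):
            total, count = blocks.pop()
            blocks[-1][0] += total
            blocks[-1][1] += count

    # Expand each block back to one fitted value per sample
    fitted = []
    for total, count in blocks:
        fitted.extend([total / count] * count)
    return xs, fitted

raw_probs = [0.1, 0.2, 0.3, 0.4, 0.6, 0.7, 0.8, 0.9]
outcomes = [0, 0, 1, 0, 0, 1, 1, 1]
xs, fitted = pav_isotonic(raw_probs, outcomes)
print([round(v, 3) for v in fitted])
```

The scores 0.3, 0.4, and 0.6 are pooled into one group with a calibrated value of 1/3, because the single positive outcome at 0.3 is followed by two negatives. With few validation samples such steps can be coarse, which is one reason to compare isotonic regression against Platt scaling.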

Phase 5: SHAP Interpretability Analysis

import shap

class InterpretabilityAnalyzer:
    """Analyze model predictions using SHAP."""

    def __init__(self, model, X_data, feature_names):
        self.model = model
        self.feature_names = feature_names
        self.explainer = shap.TreeExplainer(model)
        self.shap_values = self.explainer.shap_values(
            xgb.DMatrix(X_data, feature_names=feature_names)
        )
        self.X_data = X_data

    def global_importance(self, top_n=15):
        """Compute and display global feature importance."""
        mean_abs = np.abs(self.shap_values).mean(axis=0)
        importance = pd.Series(
            mean_abs, index=self.feature_names
        ).sort_values(ascending=False)

        print(f"\nTop {top_n} Features by Mean |SHAP|:")
        for i, (feat, val) in enumerate(importance.head(top_n).items()):
            print(f"  {i+1:2d}. {feat:30s} {val:.4f}")

        return importance

    def explain_prediction(self, idx):
        """Explain a single prediction."""
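        # expected_value may be a scalar or a length-1 array depending on the
        # shap version, so coerce it to a plain float before formatting.
        base_value = float(np.ravel(self.explainer.expected_value)[0])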
        prediction = base_value + self.shap_values[idx].sum()

        print(f"\nPrediction Explanation (index={idx}):")
        print(f"  Base value (average): {base_value:.4f}")
        print(f"  Model output (logit): {prediction:.4f}")
        print(f"  Predicted probability: {1/(1+np.exp(-prediction)):.4f}")
        print(f"  Top contributing features:")

        contributions = pd.Series(
            self.shap_values[idx], index=self.feature_names
        )
        top_pos = contributions.nlargest(5)
        top_neg = contributions.nsmallest(5)

        print(f"    Pushing UP:")
        for feat, val in top_pos.items():
            if val > 0.001:
                feat_val = self.X_data[idx][self.feature_names.index(feat)]
                print(f"      {feat:30s} SHAP={val:+.4f}  (value={feat_val:.3f})")

        print(f"    Pushing DOWN:")
        for feat, val in top_neg.items():
            if val < -0.001:
                feat_val = self.X_data[idx][self.feature_names.index(feat)]
                print(f"      {feat:30s} SHAP={val:+.4f}  (value={feat_val:.3f})")

    def plot_summary(self, save_path='shap_summary.png'):
        """Create and save SHAP summary plot."""
        X_df = pd.DataFrame(self.X_data, columns=self.feature_names)
        shap.summary_plot(self.shap_values, X_df, show=False, max_display=15)
        plt.tight_layout()
        plt.savefig(save_path, dpi=150, bbox_inches='tight')
        plt.close()
        print(f"Summary plot saved to {save_path}")

# Run SHAP analysis on XGBoost (assumed best model)
analyzer = InterpretabilityAnalyzer(
    trainer.models['xgboost'], X_test, feature_names
)
importance = analyzer.global_importance()
analyzer.explain_prediction(0)
analyzer.explain_prediction(len(X_test) - 1)
analyzer.plot_summary()
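
`explain_prediction` relies on the additivity property of SHAP values: the base value plus the sum of a row's SHAP values reproduces the model output for that row, here on the logit scale. The property is easiest to verify on a toy linear model, where the SHAP value of a feature has a closed form if features are treated as independent. The sketch below uses made-up weights and random data; it does not call the `shap` library or the XGBoost model.

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(200, 3))          # toy feature matrix (hypothetical data)
weights = np.array([0.8, -0.5, 0.2])   # made-up linear model
bias = -0.3

def model_logit(X):
    return X @ weights + bias

# For a linear model with independent features, the SHAP value of feature j
# is w_j * (x_j - mean(x_j)); the base value is the mean model output.
base_value = model_logit(X).mean()
shap_values = weights * (X - X.mean(axis=0))

idx = 0
reconstructed = base_value + shap_values[idx].sum()
print(f"model output={model_logit(X)[idx]:.6f}, base + sum(SHAP)={reconstructed:.6f}")
```

The two printed numbers agree up to floating-point error. Running the same comparison against the raw margin output of the real model is a quick sanity check that the explanation and the prediction refer to the same quantity.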

Phase 6: Deployment System

import joblib
from datetime import datetime

class DeployedModel:
    """Production-ready prediction model."""

    def __init__(self, model_dir='deployed_model'):
        self.model_dir = model_dir
        os.makedirs(model_dir, exist_ok=True)
        self.model = None
        self.calibrator_model = None
        self.feature_engineer = None
        self.metadata = None

    def save(self, xgb_model, calibrator, feature_engineer, feature_names,
             val_metrics, test_metrics):
        """Save all components for deployment."""
        # Save XGBoost model
        xgb_model.save_model(f'{self.model_dir}/model.json')

        # Save calibrator
        joblib.dump(calibrator, f'{self.model_dir}/calibrator.joblib')

        # Save metadata
        self.metadata = {
            'model_type': 'xgboost',
            'calibration_method': 'platt',
            'feature_names': feature_names,
            'n_features': len(feature_names),
            'deployment_date': datetime.now().isoformat(),
            'validation_brier': float(val_metrics.get('Brier Score', 0)),
            'test_brier': float(test_metrics.get('Brier Score', 0)),
            'validation_logloss': float(val_metrics.get('Log Loss', 0)),
            'test_logloss': float(test_metrics.get('Log Loss', 0)),
            'version': '1.0.0',
        }

        with open(f'{self.model_dir}/metadata.json', 'w') as f:
            json.dump(self.metadata, f, indent=2)

        print(f"Model saved to {self.model_dir}/")
        print(f"  Model file: model.json")
        print(f"  Calibrator: calibrator.joblib")
        print(f"  Metadata: metadata.json")

    def load(self):
        """Load deployed model."""
        self.model = xgb.Booster()
        self.model.load_model(f'{self.model_dir}/model.json')
        self.calibrator_model = joblib.load(f'{self.model_dir}/calibrator.joblib')

        with open(f'{self.model_dir}/metadata.json') as f:
            self.metadata = json.load(f)

        self.feature_names = self.metadata['feature_names']
        print(f"Loaded model v{self.metadata['version']}")

    def predict(self, features_array):
        """Generate calibrated probability prediction."""
        if self.model is None:
            self.load()

        # Handle NaN/Inf
        features_clean = np.nan_to_num(features_array, nan=0.0, posinf=0.0, neginf=0.0)

        # Raw prediction
        if features_clean.ndim == 1:
            features_clean = features_clean.reshape(1, -1)

        dmat = xgb.DMatrix(features_clean, feature_names=self.feature_names)
        raw_probs = self.model.predict(dmat)

        # Calibrate
        calibrated_probs = self.calibrator_model.predict_proba(
            raw_probs.reshape(-1, 1)
        )[:, 1]

        return {
            'calibrated_probability': calibrated_probs.tolist(),
            'raw_probability': raw_probs.tolist(),
            'model_version': self.metadata['version'],
            'timestamp': datetime.now().isoformat(),
        }

    def find_opportunities(self, features_array, market_prices, threshold=0.05):
        """Find trading opportunities where model disagrees with market."""
        result = self.predict(features_array)
        model_probs = np.array(result['calibrated_probability'])

        opportunities = []
        for i in range(len(model_probs)):
            edge = model_probs[i] - market_prices[i]
            if abs(edge) > threshold:
                opportunities.append({
                    'index': i,
                    'model_prob': float(model_probs[i]),
                    'market_price': float(market_prices[i]),
                    'edge': float(edge),
                    'direction': 'BUY' if edge > 0 else 'SELL',
                    'confidence': abs(edge),
                })

        opportunities.sort(key=lambda x: x['confidence'], reverse=True)
        return opportunities

# Deploy the model
deployed = DeployedModel()

# Get validation metrics for the best model
xgb_probs_val = trainer.predict('xgboost', X_val)
val_metrics = {
    'Brier Score': brier_score_loss(y_val, xgb_probs_val),
    'Log Loss': log_loss(y_val, np.clip(xgb_probs_val, 1e-7, 1-1e-7)),
}
xgb_probs_test = trainer.predict('xgboost', X_test)
test_metrics = {
    'Brier Score': brier_score_loss(y_test, xgb_probs_test),
    'Log Loss': log_loss(y_test, np.clip(xgb_probs_test, 1e-7, 1-1e-7)),
}

# Save Platt calibrator
platt_cal = calibrator.calibrators['xgboost_platt'][0]

deployed.save(
    trainer.models['xgboost'],
    platt_cal,
    engineer,
    feature_names,
    val_metrics,
    test_metrics
)

# Test prediction
print("\n--- Test Prediction ---")
test_prediction = deployed.predict(X_test[0])
print(f"Raw probability: {test_prediction['raw_probability'][0]:.4f}")
print(f"Calibrated probability: {test_prediction['calibrated_probability'][0]:.4f}")

# Find opportunities
market_prices_sim = np.clip(xgb_probs_test + np.random.normal(0, 0.08, len(xgb_probs_test)), 0.01, 0.99)
opportunities = deployed.find_opportunities(X_test, market_prices_sim, threshold=0.06)
print(f"\nFound {len(opportunities)} trading opportunities (threshold=6%)")
if opportunities:
    print("\nTop 5 opportunities:")
    for opp in opportunities[:5]:
        print(f"  {opp['direction']:4s}  Edge={opp['edge']:+.3f}  "
              f"Model={opp['model_prob']:.3f}  Market={opp['market_price']:.3f}")
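
`find_opportunities` compares the model probability with a single market price, but the raw data also carries a `bid_ask_spread`. A trade that crosses the spread gives up part of its edge, so a stricter filter subtracts that cost first. The sketch below assumes the market price is the midpoint of bid and ask, that a buyer pays the mid plus half the spread, and that fees are zero; `net_edge` is a hypothetical helper, not a method of `DeployedModel`.

```python
def net_edge(model_prob, mid_price, bid_ask_spread):
    """Edge after crossing half the spread, for the better side of the trade.

    Assumes mid_price is the midpoint of bid and ask, a buyer pays
    mid + spread/2, and a seller receives mid - spread/2. Fees are ignored.
    """
    half = bid_ask_spread / 2
    buy_edge = model_prob - (mid_price + half)
    sell_edge = (mid_price - half) - model_prob
    if buy_edge >= sell_edge:
        return 'BUY', buy_edge
    return 'SELL', sell_edge

# (model probability, mid price, spread) for three hypothetical contracts
for q, p, s in [(0.62, 0.55, 0.02), (0.62, 0.55, 0.10), (0.40, 0.50, 0.04)]:
    side, edge = net_edge(q, p, s)
    tradable = edge > 0.05
    print(f"{side:4s} net edge={edge:+.3f} tradable={tradable}")
```

The first two contracts have the same gross edge of 0.07, but the wider spread on the second leaves a net edge of only 0.02, which falls below the 0.05 threshold.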

Phase 7: Monitoring System

class ProductionMonitor:
    """Monitors deployed model performance in production."""

    def __init__(self, model_metadata, alert_threshold=0.03):
        self.metadata = model_metadata
        self.alert_threshold = alert_threshold
        self.prediction_log = []
        self.performance_history = []
        self.alerts = []

    def log_prediction(self, event_id, features, model_prob, market_price):
        """Log a prediction for later evaluation."""
        self.prediction_log.append({
            'timestamp': datetime.now().isoformat(),
            'event_id': event_id,
            'model_prob': model_prob,
            'market_price': market_price,
            'outcome': None,  # filled in later
        })

    def record_outcome(self, event_id, outcome):
        """Record the actual outcome for a previously predicted event."""
        for entry in self.prediction_log:
            if entry['event_id'] == event_id and entry['outcome'] is None:
                entry['outcome'] = outcome

    def compute_rolling_metrics(self, window=100):
        """Compute rolling performance metrics."""
        resolved = [e for e in self.prediction_log if e['outcome'] is not None]
        if len(resolved) < window:
            return None

        recent = resolved[-window:]
        probs = np.array([e['model_prob'] for e in recent])
        outcomes = np.array([e['outcome'] for e in recent])
        market_probs = np.array([e['market_price'] for e in recent])

        model_brier = np.mean((outcomes - probs) ** 2)
        market_brier = np.mean((outcomes - market_probs) ** 2)

        metrics = {
            'timestamp': datetime.now().isoformat(),
            'window': window,
            'n_resolved': len(resolved),
            'model_brier': float(model_brier),
            'market_brier': float(market_brier),
            'model_vs_market': float(model_brier - market_brier),
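            # labels=[0, 1] keeps log_loss from failing when a window
            # happens to contain only one outcome class.
            'model_logloss': float(log_loss(
                outcomes, np.clip(probs, 1e-7, 1-1e-7), labels=[0, 1]
            )),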
        }

        self.performance_history.append(metrics)
        return metrics

    def check_calibration(self, n_bins=5):
        """Check if model is still well-calibrated."""
        resolved = [e for e in self.prediction_log if e['outcome'] is not None]
        if len(resolved) < 100:
            return "Insufficient data"

        probs = np.array([e['model_prob'] for e in resolved])
        outcomes = np.array([e['outcome'] for e in resolved])

        bins = np.linspace(0, 1, n_bins + 1)
        report = []
        total_ece = 0.0

        for i in range(n_bins):
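            if i == n_bins - 1:
                # Include the right edge so a probability of exactly 1.0 is counted
                mask = (probs >= bins[i]) & (probs <= bins[i+1])
            else:
                mask = (probs >= bins[i]) & (probs < bins[i+1])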
            if mask.sum() > 0:
                expected = probs[mask].mean()
                observed = outcomes[mask].mean()
                gap = abs(expected - observed)
                total_ece += mask.sum() / len(probs) * gap
                report.append({
                    'bin': f'{bins[i]:.2f}-{bins[i+1]:.2f}',
                    'count': int(mask.sum()),
                    'expected': f'{expected:.3f}',
                    'observed': f'{observed:.3f}',
                    'gap': f'{gap:.3f}',
                })

        return {
            'ECE': float(total_ece),
            'bins': report,
            'needs_recalibration': total_ece > 0.05,
        }

    def check_for_drift(self):
        """Detect performance degradation."""
        if len(self.performance_history) < 2:
            return {'drift_detected': False, 'message': 'Insufficient history'}

        baseline_brier = self.metadata.get('test_brier', 0.25)
        recent_brier = self.performance_history[-1]['model_brier']
        degradation = recent_brier - baseline_brier

        alert = {
            'drift_detected': degradation > self.alert_threshold,
            'baseline_brier': baseline_brier,
            'current_brier': recent_brier,
            'degradation': degradation,
            'message': (
                f"ALERT: Brier degraded by {degradation:.4f}"
                if degradation > self.alert_threshold
                else "Performance within acceptable range"
            ),
        }

        if alert['drift_detected']:
            self.alerts.append({
                'timestamp': datetime.now().isoformat(),
                'type': 'performance_degradation',
                'details': alert,
            })

        return alert

    def generate_report(self):
        """Generate a monitoring report."""
        resolved = [e for e in self.prediction_log if e['outcome'] is not None]

        report = {
            'model_version': self.metadata.get('version', 'unknown'),
            'total_predictions': len(self.prediction_log),
            'resolved_predictions': len(resolved),
            'pending_predictions': len(self.prediction_log) - len(resolved),
            'alerts': len(self.alerts),
        }

        if len(resolved) > 0:
            probs = np.array([e['model_prob'] for e in resolved])
            outcomes = np.array([e['outcome'] for e in resolved])
            report['overall_brier'] = float(np.mean((outcomes - probs) ** 2))
            # labels=[0, 1] avoids an error if only one class has resolved so far
            report['overall_logloss'] = float(
                log_loss(outcomes, np.clip(probs, 1e-7, 1-1e-7), labels=[0, 1])
            )

        calibration = self.check_calibration()
        if isinstance(calibration, dict):
            report['calibration_ece'] = calibration['ECE']
            report['needs_recalibration'] = calibration['needs_recalibration']

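        drift = self.check_for_drift()
        report['drift_detected'] = drift['drift_detected']
        # check_for_drift may have appended a new alert, so refresh the count
        report['alerts'] = len(self.alerts)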

        return report

# Simulate monitoring
monitor = ProductionMonitor(deployed.metadata, alert_threshold=0.03)

# Simulate 500 predictions and outcomes
np.random.seed(123)
for i in range(500):
    event_id = f"event_{i}"
    idx = i % len(X_test)
    pred = deployed.predict(X_test[idx:idx+1])
    model_prob = pred['calibrated_probability'][0]
    market_price = np.clip(model_prob + np.random.normal(0, 0.06), 0.01, 0.99)

    monitor.log_prediction(event_id, X_test[idx], model_prob, market_price)

    # Simulate outcome
    true_prob = model_prob + np.random.normal(0, 0.1)
    outcome = int(np.random.random() < np.clip(true_prob, 0, 1))
    monitor.record_outcome(event_id, outcome)

    # Check metrics periodically
    if (i + 1) % 100 == 0:
        metrics = monitor.compute_rolling_metrics(window=100)
        if metrics:
            print(f"After {i+1} predictions: "
                  f"Model Brier={metrics['model_brier']:.4f}, "
                  f"Market Brier={metrics['market_brier']:.4f}, "
                  f"Difference={metrics['model_vs_market']:+.4f}")

# Generate final report
report = monitor.generate_report()
print("\n" + "=" * 60)
print("MONITORING REPORT")
print("=" * 60)
for key, value in report.items():
    print(f"  {key}: {value}")

# Check calibration detail
cal_check = monitor.check_calibration()
if isinstance(cal_check, dict):
    print(f"\nCalibration Check (ECE={cal_check['ECE']:.4f}):")
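    for bin_info in cal_check['bins']:
        # !s converts the count to a string first, so the width spec works
        # whether 'count' is stored as an int or a string
        print(f"  Bin {bin_info['bin']}: "
              f"Count={bin_info['count']!s:>4}, "
              f"Expected={bin_info['expected']}, "
              f"Observed={bin_info['observed']}, "
              f"Gap={bin_info['gap']}")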
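
The calibration check above reports an expected calibration error (ECE) together with per-bin gaps. As a reminder of what that number measures, here is a minimal standalone sketch of a binned ECE, assuming equal-width probability bins. The function name `expected_calibration_error` and the default of 10 bins are illustrative assumptions and are not taken from `ProductionMonitor.check_calibration`.

```python
import numpy as np

def expected_calibration_error(probs, outcomes, n_bins=10):
    """Binned ECE: count-weighted mean of |observed rate - mean predicted prob|."""
    probs = np.asarray(probs, dtype=float)
    outcomes = np.asarray(outcomes, dtype=float)
    edges = np.linspace(0.0, 1.0, n_bins + 1)
    # Using only the interior edges maps every prob in [0, 1] to a bin 0..n_bins-1
    bin_ids = np.digitize(probs, edges[1:-1])
    ece, bins = 0.0, []
    for b in range(n_bins):
        mask = bin_ids == b
        count = int(mask.sum())
        if count == 0:
            continue  # empty bins contribute nothing
        expected = float(probs[mask].mean())     # mean predicted probability
        observed = float(outcomes[mask].mean())  # empirical event frequency
        gap = abs(observed - expected)
        ece += (count / len(probs)) * gap
        bins.append({'bin': b, 'count': count, 'expected': expected,
                     'observed': observed, 'gap': gap})
    return ece, bins

# Synthetic illustration: outcomes drawn from the stated probabilities
rng = np.random.default_rng(0)
demo_probs = rng.uniform(0.05, 0.95, size=2000)
demo_outcomes = (rng.random(2000) < demo_probs).astype(int)
demo_ece, demo_bins = expected_calibration_error(demo_probs, demo_outcomes)
print(f"ECE on well-calibrated synthetic data: {demo_ece:.4f}")
```

With few resolved predictions per bin, the per-bin gaps are noisy, so a small ECE threshold can trigger on sampling noise alone; the bin counts are worth reading alongside the gaps.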

Phase 8: Retraining Pipeline

class RetrainingPipeline:
    """Automated retraining when performance degrades."""

    def __init__(self, model_trainer, calibrator, deployed_model, monitor):
        self.trainer = model_trainer
        self.calibrator = calibrator
        self.deployed = deployed_model
        self.monitor = monitor

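    def should_retrain(self):
        """Determine if retraining is needed.

        Returns a (needs_retrain, reasons) tuple, where reasons is a list
        of human-readable trigger descriptions.
        """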
        drift_check = self.monitor.check_for_drift()
        cal_check = self.monitor.check_calibration()

        reasons = []
        if drift_check.get('drift_detected', False):
            reasons.append('Performance degradation detected')
        if isinstance(cal_check, dict) and cal_check.get('needs_recalibration', False):
            reasons.append('Calibration drift detected')

        return len(reasons) > 0, reasons

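    def retrain(self, new_data_feat, feature_names):
        """Retrain models on new data.

        Returns True if the best retrained model is recommended for
        deployment, False otherwise. Note that feature_names is accepted
        but not used in this method.
        """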
        print("\n" + "=" * 50)
        print("RETRAINING PIPELINE")
        print("=" * 50)

        # Split new data
        (X_tr, y_tr), (X_v, y_v), (X_te, y_te) = self.trainer.temporal_split(new_data_feat)

        # Retrain all models
        self.trainer.train_logistic(X_tr, y_tr)
        self.trainer.train_random_forest(X_tr, y_tr)
        self.trainer.train_xgboost(X_tr, y_tr, X_v, y_v)

        # Evaluate
        print("\nRetrained model performance:")
        self.trainer.evaluate_all(X_te, y_te, 'Test')

        # Recalibrate
        cal_results = self.calibrator.fit_and_evaluate(
            self.trainer, X_v, y_v, X_te, y_te
        )

        # Select best
        best_idx = cal_results['Brier'].idxmin()
        best = cal_results.loc[best_idx]
        print(f"\nBest: {best['Model']} + {best['Calibration']} (Brier={best['Brier']:.4f})")

        # Compare with current production model. If the metadata has no
        # 'test_brier', the default of 1.0 makes any candidate look better.
        current_brier = self.deployed.metadata.get('test_brier', 1.0)
        improvement = current_brier - best['Brier']
        print(f"Current production Brier: {current_brier:.4f}")
        print(f"New model Brier: {best['Brier']:.4f}")
        print(f"Improvement: {improvement:+.4f}")

        if improvement > 0.005:
            print("RECOMMENDATION: Deploy new model")
            return True
        else:
            print("RECOMMENDATION: Keep current model (improvement too small)")
            return False

# Demonstrate retraining check
pipeline = RetrainingPipeline(trainer, calibrator, deployed, monitor)
needs_retrain, reasons = pipeline.should_retrain()
print(f"\nNeeds retraining: {needs_retrain}")
if reasons:
    print(f"Reasons: {', '.join(reasons)}")
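
The `retrain` method recommends deployment when the new model's test Brier score beats the production score by more than a fixed margin of 0.005. A fixed margin does not account for test-set size, so one possible refinement is a paired bootstrap over per-event squared errors. The sketch below is an assumption-laden illustration, not part of the pipeline above: the function `brier_improvement_ci` is hypothetical, and it assumes the probabilities of both the current and the candidate model are available for the same test events, whereas the case study stores only a scalar `test_brier` in the metadata.

```python
import numpy as np

def brier_improvement_ci(y_true, p_current, p_candidate,
                         n_boot=2000, alpha=0.05, seed=0):
    """Paired bootstrap CI for (current Brier - candidate Brier)."""
    y = np.asarray(y_true, dtype=float)
    # Per-event difference in squared error; positive favors the candidate
    diff = (np.asarray(p_current, dtype=float) - y) ** 2 \
         - (np.asarray(p_candidate, dtype=float) - y) ** 2
    rng = np.random.default_rng(seed)
    # Resample events with replacement, keeping each event's pair together
    idx = rng.integers(0, len(diff), size=(n_boot, len(diff)))
    boot_means = diff[idx].mean(axis=1)
    lo, hi = np.quantile(boot_means, [alpha / 2, 1 - alpha / 2])
    return float(diff.mean()), float(lo), float(hi)

# Synthetic illustration: the candidate is a less noisy estimate of the truth
rng = np.random.default_rng(1)
true_p = rng.uniform(0.05, 0.95, size=1000)
y_demo = (rng.random(1000) < true_p).astype(int)
p_cur = np.clip(true_p + rng.normal(0, 0.15, 1000), 0.01, 0.99)
p_new = np.clip(true_p + rng.normal(0, 0.05, 1000), 0.01, 0.99)

mean_imp, ci_lo, ci_hi = brier_improvement_ci(y_demo, p_cur, p_new)
print(f"Improvement: {mean_imp:+.4f} (95% CI {ci_lo:+.4f} to {ci_hi:+.4f})")
print("Recommend deploy" if ci_lo > 0 else "Keep current model")
```

Requiring the lower bound of the interval to exceed zero (or a minimum margin such as the 0.005 used above) ties the deployment decision to how much evidence the test set actually provides.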

Key Takeaways

  1. End-to-end pipeline design matters. Each phase (data collection, feature engineering, model training, calibration, deployment, monitoring) must be designed to work together. A strong model with poor calibration or monitoring will underperform in production.

  2. Feature engineering is the highest-leverage activity. The raw features (market price, volume, sentiment) are just the starting point. Thoughtful engineering (momentum, interactions, time-based features) can substantially improve performance.

  3. Model selection should be systematic. Train multiple model types, calibrate each, and compare with proper metrics. The best model on raw accuracy may not be the best after calibration.

  4. Monitoring is not optional. Without monitoring, you will not know when your model's edge has disappeared. The monitoring system should track calibration, Brier scores, and feature distribution drift.

  5. Retraining should be triggered by evidence, not by schedule alone. While periodic retraining is a reasonable baseline, drift detection enables faster response to genuine distributional shifts.

  6. The deployment interface should be clean and defensive. Input validation, NaN handling, and metadata logging are not luxuries — they are requirements for production ML systems.
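
Takeaway 4 mentions tracking feature distribution drift. One common way to quantify it is the population stability index (PSI), which compares how a feature's values fall into bins in a reference (training) sample versus a recent (live) sample. The following is a minimal sketch under stated assumptions: the function name `population_stability_index`, the 10 quantile bins, and the small `eps` floor are illustrative choices and are not part of the `ProductionMonitor` class.

```python
import numpy as np

def population_stability_index(reference, current, n_bins=10, eps=1e-6):
    """PSI between a reference sample and a current sample of one feature."""
    reference = np.asarray(reference, dtype=float)
    current = np.asarray(current, dtype=float)
    # Interior quantile edges from the reference sample define n_bins bins
    edges = np.quantile(reference, np.linspace(0, 1, n_bins + 1))[1:-1]
    ref_counts = np.bincount(np.digitize(reference, edges), minlength=n_bins)
    cur_counts = np.bincount(np.digitize(current, edges), minlength=n_bins)
    # Floor the bin fractions so empty bins do not produce log(0)
    ref_frac = np.clip(ref_counts / len(reference), eps, None)
    cur_frac = np.clip(cur_counts / len(current), eps, None)
    return float(np.sum((cur_frac - ref_frac) * np.log(cur_frac / ref_frac)))

# Synthetic illustration: a stable feature and one whose mean has shifted
rng = np.random.default_rng(7)
train_feature = rng.normal(0.0, 1.0, size=5000)
live_stable = rng.normal(0.0, 1.0, size=5000)
live_shifted = rng.normal(1.0, 1.0, size=5000)

print(f"PSI (stable):  {population_stability_index(train_feature, live_stable):.4f}")
print(f"PSI (shifted): {population_stability_index(train_feature, live_shifted):.4f}")
```

A commonly cited rule of thumb treats PSI below about 0.1 as stable and above about 0.25 as a significant shift, but these cutoffs are conventions rather than guarantees and should be tuned against how often drift alerts coincide with real performance degradation.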