Case Study 1: Building an End-to-End ML Pipeline for Market Prediction

Scenario

DataEdge Capital is a small quantitative trading firm that has been manually trading prediction markets for two years. Their lead data scientist, Priya, has built several notebooks with promising models, but the firm faces recurring problems:

  • Priya's notebook models take 30 minutes to retrain manually, and she sometimes forgets to retrain before major events.
  • Another analyst, Marcus, built his own feature engineering code that computes poll averages differently, leading to inconsistent predictions.
  • Last month, a market resolved before the model was updated, resulting in a significant loss. The model had been serving stale predictions for three days because no one noticed the data pipeline had broken.
  • The firm has no record of which model version produced the predictions that led to specific trades, making it impossible to diagnose what went wrong.

The CEO has asked Priya to build a production ML pipeline that eliminates these problems. This case study walks through her complete solution.

Phase 1: Feature Store

Problem

Priya and Marcus each compute features independently. Marcus uses a 5-day rolling average of polls; Priya uses a 7-day rolling average. Neither can easily reproduce the other's features, and there is no guarantee that the features used during training match those used when making predictions.
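To make the mismatch concrete, here is a minimal sketch using made-up poll numbers. The values and the helper name trailing_mean are illustrative assumptions, not DataEdge data or code.

```python
def trailing_mean(values, window):
    """Mean of the last `window` observations (hypothetical helper)."""
    if len(values) < window:
        raise ValueError("not enough observations for this window")
    recent = values[-window:]
    return sum(recent) / window


# Illustrative daily poll shares for one candidate, oldest first.
polls = [0.40, 0.41, 0.45, 0.46, 0.47, 0.48, 0.49]

marcus_feature = trailing_mean(polls, 5)  # 5-day window
priya_feature = trailing_mean(polls, 7)   # 7-day window

# The same raw polls yield two different "poll average" inputs.
gap = marcus_feature - priya_feature
print(f"5-day: {marcus_feature:.4f}, 7-day: {priya_feature:.4f}, gap: {gap:.4f}")
```

With these numbers the two averages differ by nearly two percentage points, which is comparable in size to the poll-versus-market spreads a model like this would try to trade on.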

Solution

Priya builds a centralized feature store, DataEdgeFeatureStore, modeled on the PredictionMarketFeatureStore class from the chapter and backed by SQLite for simplicity.

import sqlite3
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
import json
from typing import List, Dict, Optional


class DataEdgeFeatureStore:
    """Feature store for DataEdge Capital's prediction market system."""

    def __init__(self, db_path: str = "dataedge_features.db"):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self._initialize()

    def _initialize(self):
        self.conn.executescript("""
            CREATE TABLE IF NOT EXISTS features (
                entity_id TEXT NOT NULL,
                feature_name TEXT NOT NULL,
                feature_value REAL,
                event_timestamp TEXT NOT NULL,
                ingestion_timestamp TEXT DEFAULT CURRENT_TIMESTAMP
            );
            CREATE INDEX IF NOT EXISTS idx_features
            ON features(entity_id, feature_name, event_timestamp);

            CREATE TABLE IF NOT EXISTS feature_registry (
                feature_name TEXT PRIMARY KEY,
                description TEXT,
                owner TEXT,
                computation_logic TEXT,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            );
        """)
        self.conn.commit()

    def register_feature(self, name, description, owner,
                         computation_logic=""):
        self.conn.execute(
            """INSERT OR REPLACE INTO feature_registry
               VALUES (?, ?, ?, ?, ?)""",
            (name, description, owner, computation_logic,
             datetime.now().isoformat())
        )
        self.conn.commit()

    def ingest_batch(self, entity_id: str, features: Dict[str, float],
                     timestamp: datetime):
        for name, value in features.items():
            self.conn.execute(
                "INSERT INTO features VALUES (?, ?, ?, ?, ?)",
                (entity_id, name, value, timestamp.isoformat(),
                 datetime.now().isoformat())
            )
        self.conn.commit()

    def get_training_data(self, entity_ids: List[str],
                          feature_names: List[str],
                          timestamps: List[datetime]) -> pd.DataFrame:
        """Point-in-time correct feature retrieval."""
        rows = []
        for eid, ts in zip(entity_ids, timestamps):
            row = {"entity_id": eid, "timestamp": ts}
            for fname in feature_names:
                cur = self.conn.execute(
                    """SELECT feature_value FROM features
                       WHERE entity_id=? AND feature_name=?
                         AND event_timestamp<=?
                       ORDER BY event_timestamp DESC LIMIT 1""",
                    (eid, fname, ts.isoformat())
                )
                result = cur.fetchone()
                row[fname] = result[0] if result else np.nan
            rows.append(row)
        return pd.DataFrame(rows)

    def get_latest(self, entity_id: str,
                   feature_names: List[str]) -> Dict:
        features = {}
        for fname in feature_names:
            cur = self.conn.execute(
                """SELECT feature_value FROM features
                   WHERE entity_id=? AND feature_name=?
                   ORDER BY event_timestamp DESC LIMIT 1""",
                (entity_id, fname)
            )
            result = cur.fetchone()
            features[fname] = result[0] if result else None
        return features

    def close(self):
        self.conn.close()


# Register standardized features
store = DataEdgeFeatureStore()

store.register_feature(
    "poll_avg_7d",
    "7-day rolling average of polls (standardized computation)",
    "Priya",
    "df['polls'].rolling(7).mean()"
)
store.register_feature(
    "market_price", "Latest market price (0-1)", "System", ""
)
store.register_feature(
    "volume_24h", "24-hour trading volume in USD", "System", ""
)
store.register_feature(
    "poll_market_spread",
    "poll_avg_7d minus market_price",
    "Priya",
    "poll_avg_7d - market_price"
)
store.register_feature(
    "days_to_resolution",
    "Calendar days until market resolution",
    "System", ""
)
store.register_feature(
    "price_volatility_7d",
    "7-day rolling standard deviation of market price",
    "Marcus",
    "df['market_price'].rolling(7).std()"
)

Result

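Both Priya and Marcus now use the same feature definitions. Features are computed once by the data pipeline and stored centrally, and training and serving both read from that store, which removes the main source of training-serving skew. Training reads go through get_training_data, which returns only values observed at or before each example's timestamp, so the model is never trained on information that was not yet available.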
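The point-in-time behavior can be checked in isolation. The sketch below uses an in-memory SQLite table with the same query shape as get_training_data; the market id, dates, and the helper name value_as_of are made up for illustration.

```python
import sqlite3
from datetime import datetime

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE features ("
    "entity_id TEXT, feature_name TEXT, "
    "feature_value REAL, event_timestamp TEXT)"
)

# Two observations of the same feature; the market id is made up.
observations = [
    ("market_001", "poll_avg_7d", 0.48, datetime(2024, 3, 1, 12, 0)),
    ("market_001", "poll_avg_7d", 0.55, datetime(2024, 3, 5, 12, 0)),
]
conn.executemany(
    "INSERT INTO features VALUES (?, ?, ?, ?)",
    [(e, f, v, ts.isoformat()) for e, f, v, ts in observations],
)


def value_as_of(entity_id, feature_name, as_of):
    """Latest value whose event_timestamp is at or before `as_of`."""
    row = conn.execute(
        """SELECT feature_value FROM features
           WHERE entity_id=? AND feature_name=? AND event_timestamp<=?
           ORDER BY event_timestamp DESC LIMIT 1""",
        (entity_id, feature_name, as_of.isoformat()),
    ).fetchone()
    return row[0] if row else None


# A training example dated 3 March must not see the 5 March value.
training_value = value_as_of("market_001", "poll_avg_7d", datetime(2024, 3, 3))
serving_value = value_as_of("market_001", "poll_avg_7d", datetime(2024, 3, 6))
too_early = value_as_of("market_001", "poll_avg_7d", datetime(2024, 2, 1))
print(training_value, serving_value, too_early)  # 0.48 0.55 None
```

The comparison is done on ISO 8601 strings, so it orders correctly only as long as every timestamp is written in the same format and timezone convention.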

Phase 2: scikit-learn Pipeline

Problem

Priya's notebook has feature engineering code spread across 15 cells. Reproducing the exact preprocessing for serving is error-prone.

Solution

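She encapsulates feature engineering, imputation, scaling, and the calibrated classifier in a single scikit-learn Pipeline, so the object that is fit during training is the same one called at serving time.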

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.calibration import CalibratedClassifierCV
from sklearn.base import BaseEstimator, TransformerMixin


class DataEdgeFeatureEngineer(BaseEstimator, TransformerMixin):
    """Custom feature engineering for DataEdge models."""

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = X.copy()
        # Urgency feature: higher as resolution approaches
        X['urgency'] = 1.0 / (1.0 + X['days_to_resolution'].clip(lower=0))
        # Log-transformed volume
        X['log_volume'] = np.log1p(X['volume_24h'].clip(lower=0))
        # Spread-to-volatility ratio
        X['spread_vol_ratio'] = (
            X['poll_market_spread'] /
            (X['price_volatility_7d'] + 1e-8)
        )
        return X


FEATURE_NAMES = [
    'poll_avg_7d', 'market_price', 'volume_24h',
    'poll_market_spread', 'days_to_resolution',
    'price_volatility_7d'
]

ENGINEERED_FEATURES = [
    'poll_avg_7d', 'market_price', 'volume_24h',
    'poll_market_spread', 'days_to_resolution',
    'price_volatility_7d', 'urgency', 'log_volume',
    'spread_vol_ratio'
]


def build_dataedge_pipeline(n_estimators=300, learning_rate=0.05,
                             max_depth=4):
    """Build the standard DataEdge prediction pipeline."""
    pipeline = Pipeline([
        ('feature_engineer', DataEdgeFeatureEngineer()),
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
        ('classifier', CalibratedClassifierCV(
            GradientBoostingClassifier(
                n_estimators=n_estimators,
                learning_rate=learning_rate,
                max_depth=max_depth,
                subsample=0.8,
                random_state=42
            ),
            cv=5,
            method='isotonic'
        ))
    ])
    return pipeline
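The three derived features are easy to verify by hand. The following sketch recomputes them for single rows in plain Python; engineer_row is a hypothetical scalar stand-in for DataEdgeFeatureEngineer.transform, and the input values are illustrative.

```python
import math


def engineer_row(days_to_resolution, volume_24h,
                 poll_market_spread, price_volatility_7d):
    """Scalar version of the three derived features (illustrative only)."""
    days = max(days_to_resolution, 0)   # mirrors .clip(lower=0)
    volume = max(volume_24h, 0.0)       # mirrors .clip(lower=0)
    return {
        "urgency": 1.0 / (1.0 + days),
        "log_volume": math.log1p(volume),
        "spread_vol_ratio": poll_market_spread / (price_volatility_7d + 1e-8),
    }


far_out = engineer_row(99, 0.0, 0.05, 0.02)
resolving_today = engineer_row(0, 0.0, 0.05, 0.0)

print(far_out["urgency"])                    # 0.01: resolution is 99 days away
print(resolving_today["urgency"])            # 1.0: the market resolves today
print(far_out["spread_vol_ratio"])           # about 2.5
print(resolving_today["spread_vol_ratio"])   # very large: volatility is zero
```

The last case is worth noticing: the 1e-8 term prevents division by zero, but a market with zero recorded volatility still produces a ratio in the millions. Tree-based models tolerate such values, though they can inflate the standard deviation seen by StandardScaler, so capping the ratio may be worth considering.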

Phase 3: MLflow Experiment Tracking

Problem

The team has no record of which model configuration produced the best results, and cannot trace back from a trade to the model that made the prediction.

Solution

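Every training run is logged to MLflow with its hyperparameters, validation metrics, and the fitted pipeline, and each fitted pipeline is registered as a new version of the dataedge_election_model model.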

import mlflow
import mlflow.sklearn
from sklearn.metrics import brier_score_loss, log_loss, roc_auc_score


def train_with_tracking(pipeline, X_train, y_train, X_val, y_val,
                        params: dict, experiment_name: str):
    """Train and log to MLflow."""
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run() as run:
        # Log all parameters
        mlflow.log_params(params)
        mlflow.set_tag("pipeline_version", "v2.1")
        mlflow.set_tag("team", "dataedge")

        # Train
        pipeline.fit(X_train, y_train)

        # Evaluate
        y_proba = pipeline.predict_proba(X_val)[:, 1]
        brier = brier_score_loss(y_val, y_proba)
        ll = log_loss(y_val, y_proba)
        auc = roc_auc_score(y_val, y_proba)

        mlflow.log_metrics({
            "brier_score": brier,
            "log_loss": ll,
            "auc_roc": auc,
            "n_train": len(X_train),
            "n_val": len(X_val),
        })

        # Register model
        mlflow.sklearn.log_model(
            pipeline,
            artifact_path="model",
            registered_model_name="dataedge_election_model"
        )

        print(f"Run {run.info.run_id[:8]}: "
              f"Brier={brier:.4f}, AUC={auc:.4f}")
        return run.info.run_id, brier


# Example: Train with different hyperparameters
experiments = [
    {"n_estimators": 200, "learning_rate": 0.05, "max_depth": 3},
    {"n_estimators": 300, "learning_rate": 0.05, "max_depth": 4},
    {"n_estimators": 300, "learning_rate": 0.01, "max_depth": 5},
    {"n_estimators": 500, "learning_rate": 0.02, "max_depth": 4},
]

# Synthetic data for demonstration
np.random.seed(42)
n = 1000
poll_avg = np.random.beta(5, 5, n)
market_price = np.random.beta(5, 5, n)
X_data = pd.DataFrame({
    'poll_avg_7d': poll_avg,
    'market_price': market_price,
    'volume_24h': np.random.exponential(500000, n),
    # Spread follows its registered definition instead of independent noise
    'poll_market_spread': poll_avg - market_price,
    'days_to_resolution': np.random.randint(1, 365, n),
    'price_volatility_7d': np.random.exponential(0.02, n),
})
# Outcome depends on the poll average plus noise
y_data = (X_data['poll_avg_7d'] + np.random.normal(0, 0.2, n) > 0.5
          ).astype(int)

X_tr, X_vl = X_data[:800], X_data[800:]
y_tr, y_vl = y_data[:800], y_data[800:]

best_brier = float('inf')
best_run_id = None

for params in experiments:
    pipeline = build_dataedge_pipeline(**params)
    run_id, brier = train_with_tracking(
        pipeline, X_tr, y_tr, X_vl, y_vl,
        params, "dataedge_election_v2"
    )
    if brier < best_brier:
        best_brier = brier
        best_run_id = run_id

print(f"\nBest run: {best_run_id[:8]} with Brier={best_brier:.4f}")
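MLflow records how each model was trained, but tracing a trade back to a model also requires recording which run produced each prediction. The case study does not show that piece, so the following is only a sketch under assumptions: the prediction_log table, its columns, and the helpers log_prediction and trace_prediction are all hypothetical names.

```python
import json
import sqlite3
from datetime import datetime, timezone

audit = sqlite3.connect(":memory:")  # a file path in practice
audit.execute(
    """CREATE TABLE prediction_log (
           prediction_id INTEGER PRIMARY KEY AUTOINCREMENT,
           market_id TEXT NOT NULL,
           run_id TEXT NOT NULL,
           features_json TEXT NOT NULL,
           probability REAL NOT NULL,
           predicted_at TEXT NOT NULL
       )"""
)


def log_prediction(market_id, run_id, features, probability):
    """Record one prediction and return its id for attaching to a trade."""
    cur = audit.execute(
        "INSERT INTO prediction_log "
        "(market_id, run_id, features_json, probability, predicted_at) "
        "VALUES (?, ?, ?, ?, ?)",
        (market_id, run_id, json.dumps(features, sort_keys=True),
         probability, datetime.now(timezone.utc).isoformat()),
    )
    audit.commit()
    return cur.lastrowid


def trace_prediction(prediction_id):
    """Return the run id, inputs, and output behind a logged prediction."""
    row = audit.execute(
        "SELECT run_id, features_json, probability FROM prediction_log "
        "WHERE prediction_id=?", (prediction_id,)
    ).fetchone()
    if row is None:
        return None
    return {"run_id": row[0], "features": json.loads(row[1]),
            "probability": row[2]}


# Placeholder values; a real run id would come from train_with_tracking.
pid = log_prediction("market_001", "example_run_id",
                     {"poll_avg_7d": 0.52, "market_price": 0.47}, 0.61)
print(trace_prediction(pid))
```

Storing the returned prediction id alongside each trade would let an analyst go from a losing trade to the exact inputs and MLflow run behind it.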

Phase 4: Model Serving with FastAPI

Problem

Predictions are generated by running a notebook cell, which requires Priya to be available.

Solution

A FastAPI server loads the production model and serves predictions on demand.

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict
import joblib
import numpy as np
import pandas as pd
import time
from datetime import datetime

app = FastAPI(title="DataEdge Prediction Server")


class PredictionRequest(BaseModel):
    market_id: str
    features: Dict[str, float]


class PredictionResponse(BaseModel):
    market_id: str
    probability: float
    model_version: str
    timestamp: str
    latency_ms: float


# Global state
MODEL = None
MODEL_VERSION = "unknown"


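@app.on_event("startup")
async def load_model():
    global MODEL, MODEL_VERSION
    try:
        # Load the native scikit-learn flavor rather than pyfunc: with the
        # default logging settings, a pyfunc wrapper's predict() returns
        # class labels, not probabilities.
        import mlflow.sklearn
        MODEL = mlflow.sklearn.load_model(
            "models:/dataedge_election_model/Production"
        )
        MODEL_VERSION = "production"
    except Exception:
        # Fall back to a locally saved pipeline if the registry is unavailable
        MODEL = joblib.load("models/latest_pipeline.joblib")
        MODEL_VERSION = "local_latest"


@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    if MODEL is None:
        raise HTTPException(503, "No model loaded")

    start = time.perf_counter()
    # FEATURE_NAMES is the list defined in Phase 2; selecting by name fixes
    # the column order and raises KeyError if a feature is missing
    X = pd.DataFrame([request.features])[FEATURE_NAMES]
    # Column 1 of predict_proba is the probability of the positive class
    proba = float(MODEL.predict_proba(X)[0, 1])
    latency = (time.perf_counter() - start) * 1000

    return PredictionResponse(
        market_id=request.market_id,
        probability=float(np.clip(proba, 0, 1)),
        model_version=MODEL_VERSION,
        timestamp=datetime.now().isoformat(),
        latency_ms=round(latency, 2),
    )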


@app.get("/health")
async def health():
    return {
        "status": "healthy" if MODEL else "no_model",
        "model_version": MODEL_VERSION,
    }
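Because the endpoint accepts a free-form feature dictionary, a request that omits a feature would surface as an unhandled error rather than a clear client-side message. One possible guard is sketched below; validate_features is a hypothetical helper, and REQUIRED_FEATURES simply repeats the Phase 2 FEATURE_NAMES list so the snippet is self-contained.

```python
import math

REQUIRED_FEATURES = [
    "poll_avg_7d", "market_price", "volume_24h",
    "poll_market_spread", "days_to_resolution", "price_volatility_7d",
]


def validate_features(features, required=REQUIRED_FEATURES):
    """Return a list of human-readable problems; empty means usable."""
    problems = []
    for name in required:
        if name not in features:
            problems.append(f"missing feature: {name}")
        elif not math.isfinite(features[name]):
            problems.append(f"non-finite value for {name}")
    return problems


complete = {name: 0.5 for name in REQUIRED_FEATURES}
incomplete = {"poll_avg_7d": 0.5, "market_price": float("nan")}

print(validate_features(complete))    # []
print(validate_features(incomplete))  # one non-finite value, four missing
```

Inside the endpoint, a non-empty result could be turned into `raise HTTPException(422, "; ".join(problems))` before the DataFrame is built. Whether to reject NaN is a design choice: the pipeline's median imputer would otherwise fill it silently.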

Phase 5: Monitoring and Drift Detection

Problem

Last month, the data pipeline silently broke, and the model served stale predictions for three days. No one noticed until the losses appeared.

Solution

A monitoring system checks model health, feature freshness, and prediction drift.

from scipy import stats


class DataEdgeMonitor:
    """Monitoring system for DataEdge's prediction models."""

    def __init__(self, reference_predictions: np.ndarray,
                 reference_brier: float,
                 feature_store: DataEdgeFeatureStore):
        self.ref_preds = reference_predictions
        self.ref_brier = reference_brier
        self.feature_store = feature_store
        self.alerts = []

    def check_feature_freshness(self, feature_names: List[str],
                                max_age_hours: int = 24):
        """Alert if features are stale."""
        for fname in feature_names:
            cur = self.feature_store.conn.execute(
                """SELECT MAX(ingestion_timestamp) FROM features
                   WHERE feature_name=?""",
                (fname,)
            )
            result = cur.fetchone()
            if result[0] is None:
                self.alerts.append({
                    "type": "staleness",
                    "severity": "high",
                    "message": f"Feature '{fname}' has no data"
                })
                continue

            last_update = datetime.fromisoformat(result[0])
            age_hours = (
                datetime.now() - last_update
            ).total_seconds() / 3600

            if age_hours > max_age_hours:
                self.alerts.append({
                    "type": "staleness",
                    "severity": "high" if age_hours > 48 else "medium",
                    "message": (
                        f"Feature '{fname}' is {age_hours:.1f}h old "
                        f"(max: {max_age_hours}h)"
                    )
                })

    def check_prediction_drift(self, current_predictions: np.ndarray):
        """Check if prediction distribution has shifted."""
        ks_stat, ks_p = stats.ks_2samp(
            self.ref_preds, current_predictions
        )
        if ks_p < 0.01:
            self.alerts.append({
                "type": "prediction_drift",
                "severity": "high",
                "message": (
                    f"Prediction drift detected "
                    f"(KS stat={ks_stat:.4f}, p={ks_p:.6f})"
                )
            })

    def check_performance(self, predictions: np.ndarray,
                          actuals: np.ndarray):
        """Check if model performance has degraded."""
        current_brier = np.mean((predictions - actuals) ** 2)
        if current_brier > self.ref_brier * 1.5:
            self.alerts.append({
                "type": "concept_drift",
                "severity": "high",
                "message": (
                    f"Model performance degraded: "
                    f"Brier {current_brier:.4f} vs "
                    f"reference {self.ref_brier:.4f}"
                )
            })
            return True  # Trigger retraining
        return False

    def run_all_checks(self, current_predictions, actuals=None):
        """Run all monitoring checks."""
        self.alerts = []
        self.check_feature_freshness(FEATURE_NAMES)
        self.check_prediction_drift(current_predictions)
        if actuals is not None:
            self.check_performance(current_predictions, actuals)

        if self.alerts:
            print(f"\n{'='*60}")
            print(f"MONITORING ALERTS ({len(self.alerts)})")
            print(f"{'='*60}")
            for alert in self.alerts:
                icon = "!!!" if alert["severity"] == "high" else " ! "
                print(f"  [{icon}] {alert['type']}: {alert['message']}")
        else:
            print("All checks passed. System healthy.")

        return self.alerts
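The staleness rule is what would have caught last month's outage. The sketch below isolates that rule as a pure function with the same 24-hour and 48-hour cutoffs; staleness_severity is a hypothetical name, and the clock is passed in explicitly so the behavior is repeatable.

```python
from datetime import datetime, timedelta


def staleness_severity(last_ingested, now, max_age_hours=24):
    """Return None if fresh, else 'medium' or 'high' (same cutoffs as above)."""
    age_hours = (now - last_ingested).total_seconds() / 3600
    if age_hours <= max_age_hours:
        return None
    return "high" if age_hours > 48 else "medium"


now = datetime(2024, 6, 10, 9, 0)  # fixed clock so the example is repeatable

print(staleness_severity(now - timedelta(hours=6), now))   # None
print(staleness_severity(now - timedelta(hours=30), now))  # medium
print(staleness_severity(now - timedelta(days=3), now))    # high
```

A feature last ingested three days ago is flagged as high severity. Note that the monitor only raises alerts when run_all_checks is actually invoked, so time to detection depends on how often it is scheduled; the scheduling itself is not shown in this case study.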

Results

After deploying the complete pipeline, DataEdge Capital saw the following improvements:

| Metric | Before | After |
|---|---|---|
| Time to detect broken pipeline | 3 days | 15 minutes (alerting) |
| Model retraining | Ad-hoc, manual | Automated, daily |
| Feature consistency | 2 different implementations | 1 centralized store |
| Prediction traceability | None | Full audit trail |
| Average prediction latency | 30 min (manual) | 12 ms (API) |
| Monthly model-related losses | ~$15,000 | ~$2,000 |

Key Lessons Learned

  1. Start simple: The SQLite feature store and joblib serialization were sufficient for DataEdge's scale. They did not need Redis, Kubernetes, or a distributed feature store.

  2. Monitor feature freshness first: The biggest risk was not model quality but stale data. Feature freshness monitoring caught problems before they affected predictions.

  3. Automate the easy parts first: Scheduled daily retraining was simple to implement and eliminated the "forgot to retrain" problem. Trigger-based retraining came later.

  4. Audit trails pay for themselves: When a trade went wrong, being able to trace back to the exact model version, features, and predictions made debugging fast instead of impossible.

  5. The pipeline is the product: Priya's individual models were good, but the pipeline (the system that produces, validates, deploys, and monitors models) was the real deliverable.
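Lesson 3 distinguishes scheduled retraining from trigger-based retraining. The retraining job itself is not shown in this case study, so the following is only a sketch of how the two rules might be combined. The function name should_retrain and its parameters are assumptions; the 1.5x Brier threshold matches check_performance in Phase 5.

```python
from datetime import datetime, timedelta


def should_retrain(last_trained, now, current_brier, reference_brier,
                   max_model_age=timedelta(days=1), degradation_factor=1.5):
    """Return (decision, reason) from a performance trigger and a schedule."""
    # Trigger-based rule: live Brier score is much worse than the reference.
    if (current_brier is not None
            and current_brier > reference_brier * degradation_factor):
        return True, "performance_degraded"
    # Scheduled rule: the model is at least one day old.
    if now - last_trained >= max_model_age:
        return True, "scheduled"
    return False, "fresh"


now = datetime(2024, 6, 10, 9, 0)  # fixed clock for a repeatable example

fresh = should_retrain(now - timedelta(hours=3), now, 0.21, 0.20)
scheduled = should_retrain(now - timedelta(hours=25), now, None, 0.20)
degraded = should_retrain(now - timedelta(hours=3), now, 0.31, 0.20)

print(fresh)      # (False, 'fresh')
print(scheduled)  # (True, 'scheduled')
print(degraded)   # (True, 'performance_degraded')
```

Passing None for current_brier covers the common situation where no markets have resolved since the last check, in which case only the schedule applies.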