Case Study 1: Building an End-to-End ML Pipeline for Market Prediction
Scenario
DataEdge Capital is a small quantitative trading firm that has been manually trading prediction markets for two years. Their lead data scientist, Priya, has built several notebooks with promising models, but the firm faces recurring problems:
- Priya's notebook models take 30 minutes to retrain manually, and she sometimes forgets to retrain before major events.
- Another analyst, Marcus, built his own feature engineering code that computes poll averages differently, leading to inconsistent predictions.
- Last month, a market resolved before the model was updated, resulting in a significant loss. The model had been serving stale predictions for three days because no one noticed the data pipeline had broken.
- The firm has no record of which model version produced the predictions that led to specific trades, making it impossible to diagnose what went wrong.
The CEO has asked Priya to build a production ML pipeline that eliminates these problems. This case study walks through her complete solution.
Phase 1: Feature Store
Problem
Priya and Marcus each compute features independently. Marcus uses a 5-day rolling average of polls; Priya uses a 7-day rolling average. Neither can easily reproduce the other's features, and there is no guarantee that the features used during training match those used when making predictions.
Solution
Priya builds a centralized feature store using the PredictionMarketFeatureStore class from the chapter, backed by SQLite for simplicity.
import sqlite3
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Dict, List
class DataEdgeFeatureStore:
"""Feature store for DataEdge Capital's prediction market system."""
def __init__(self, db_path: str = "dataedge_features.db"):
self.db_path = db_path
self.conn = sqlite3.connect(db_path)
self._initialize()
def _initialize(self):
self.conn.executescript("""
CREATE TABLE IF NOT EXISTS features (
entity_id TEXT NOT NULL,
feature_name TEXT NOT NULL,
feature_value REAL,
event_timestamp TEXT NOT NULL,
ingestion_timestamp TEXT DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_features
ON features(entity_id, feature_name, event_timestamp);
CREATE TABLE IF NOT EXISTS feature_registry (
feature_name TEXT PRIMARY KEY,
description TEXT,
owner TEXT,
computation_logic TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
""")
self.conn.commit()
def register_feature(self, name, description, owner,
computation_logic=""):
self.conn.execute(
"""INSERT OR REPLACE INTO feature_registry
VALUES (?, ?, ?, ?, ?)""",
(name, description, owner, computation_logic,
datetime.now().isoformat())
)
self.conn.commit()
def ingest_batch(self, entity_id: str, features: Dict[str, float],
timestamp: datetime):
for name, value in features.items():
self.conn.execute(
"INSERT INTO features VALUES (?, ?, ?, ?, ?)",
(entity_id, name, value, timestamp.isoformat(),
datetime.now().isoformat())
)
self.conn.commit()
def get_training_data(self, entity_ids: List[str],
feature_names: List[str],
timestamps: List[datetime]) -> pd.DataFrame:
"""Point-in-time correct feature retrieval."""
rows = []
for eid, ts in zip(entity_ids, timestamps):
row = {"entity_id": eid, "timestamp": ts}
for fname in feature_names:
cur = self.conn.execute(
"""SELECT feature_value FROM features
WHERE entity_id=? AND feature_name=?
AND event_timestamp<=?
ORDER BY event_timestamp DESC LIMIT 1""",
(eid, fname, ts.isoformat())
)
result = cur.fetchone()
row[fname] = result[0] if result else np.nan
rows.append(row)
return pd.DataFrame(rows)
def get_latest(self, entity_id: str,
feature_names: List[str]) -> Dict:
features = {}
for fname in feature_names:
cur = self.conn.execute(
"""SELECT feature_value FROM features
WHERE entity_id=? AND feature_name=?
ORDER BY event_timestamp DESC LIMIT 1""",
(entity_id, fname)
)
result = cur.fetchone()
features[fname] = result[0] if result else None
return features
def close(self):
self.conn.close()
# Register standardized features
store = DataEdgeFeatureStore()
store.register_feature(
"poll_avg_7d",
"7-day rolling average of polls (standardized computation)",
"Priya",
"df['polls'].rolling(7).mean()"
)
store.register_feature(
"market_price", "Latest market price (0-1)", "System", ""
)
store.register_feature(
"volume_24h", "24-hour trading volume in USD", "System", ""
)
store.register_feature(
"poll_market_spread",
"poll_avg_7d minus market_price",
"Priya",
"poll_avg_7d - market_price"
)
store.register_feature(
"days_to_resolution",
"Calendar days until market resolution",
"System", ""
)
store.register_feature(
"price_volatility_7d",
"7-day rolling standard deviation of market price",
"Marcus",
"df['market_price'].rolling(7).std()"
)
Result
Both Priya and Marcus now use the same feature definitions. Features are computed once by the data pipeline and stored centrally, and the same stored values feed both training and serving, eliminating training-serving skew.
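For illustration, here is how both workflows would touch the store; the market ID and feature values below are made up, but the calls match the class defined above.

# Ingest one day's computed features for a market (illustrative values)
store.ingest_batch(
    "market_sen_2024_oh",
    {
        "poll_avg_7d": 0.52,
        "market_price": 0.48,
        "volume_24h": 310000.0,
        "poll_market_spread": 0.04,
        "days_to_resolution": 21,
        "price_volatility_7d": 0.015,
    },
    timestamp=datetime(2024, 10, 15),
)

# Training: point-in-time correct values as of each label's timestamp
train_df = store.get_training_data(
    entity_ids=["market_sen_2024_oh"],
    feature_names=["poll_avg_7d", "market_price", "poll_market_spread"],
    timestamps=[datetime(2024, 10, 16)],
)

# Serving: the most recent values for the same feature names
latest = store.get_latest(
    "market_sen_2024_oh",
    ["poll_avg_7d", "market_price", "poll_market_spread"],
)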
Phase 2: scikit-learn Pipeline
Problem
Priya's notebook has feature engineering code spread across 15 cells. Reproducing the exact preprocessing for serving is error-prone.
Solution
She encapsulates everything into a scikit-learn Pipeline.
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.calibration import CalibratedClassifierCV
from sklearn.base import BaseEstimator, TransformerMixin
class DataEdgeFeatureEngineer(BaseEstimator, TransformerMixin):
"""Custom feature engineering for DataEdge models."""
def fit(self, X, y=None):
return self
def transform(self, X):
X = X.copy()
# Urgency feature: higher as resolution approaches
X['urgency'] = 1.0 / (1.0 + X['days_to_resolution'].clip(lower=0))
# Log-transformed volume
X['log_volume'] = np.log1p(X['volume_24h'].clip(lower=0))
# Spread-to-volatility ratio
X['spread_vol_ratio'] = (
X['poll_market_spread'] /
(X['price_volatility_7d'] + 1e-8)
)
return X
FEATURE_NAMES = [
'poll_avg_7d', 'market_price', 'volume_24h',
'poll_market_spread', 'days_to_resolution',
'price_volatility_7d'
]
ENGINEERED_FEATURES = [
'poll_avg_7d', 'market_price', 'volume_24h',
'poll_market_spread', 'days_to_resolution',
'price_volatility_7d', 'urgency', 'log_volume',
'spread_vol_ratio'
]
def build_dataedge_pipeline(n_estimators=300, learning_rate=0.05,
max_depth=4):
"""Build the standard DataEdge prediction pipeline."""
pipeline = Pipeline([
('feature_engineer', DataEdgeFeatureEngineer()),
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler()),
('classifier', CalibratedClassifierCV(
GradientBoostingClassifier(
n_estimators=n_estimators,
learning_rate=learning_rate,
max_depth=max_depth,
subsample=0.8,
random_state=42
),
cv=5,
method='isotonic'
))
])
return pipeline
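A brief usage sketch follows; `X_train`, `y_train`, and `X_new` are hypothetical DataFrames with the FEATURE_NAMES columns. The point is that everything touching the features lives inside one fitted object, so the artifact that gets serialized is exactly what serves.

import joblib

pipeline = build_dataedge_pipeline(n_estimators=300, max_depth=4)
pipeline.fit(X_train, y_train)   # engineering, imputation, scaling, calibration in one call

# Calibrated probability of the YES outcome for new markets
proba = pipeline.predict_proba(X_new[FEATURE_NAMES])[:, 1]

# Persist the whole pipeline, not just the classifier; Phase 4's fallback path loads this file
joblib.dump(pipeline, "models/latest_pipeline.joblib")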
Phase 3: MLflow Experiment Tracking
Problem
The team has no record of which model configuration produced the best results, and cannot trace back from a trade to the model that made the prediction.
Solution
Every training run is logged to MLflow.
import mlflow
import mlflow.sklearn
from sklearn.metrics import brier_score_loss, log_loss, roc_auc_score
def train_with_tracking(pipeline, X_train, y_train, X_val, y_val,
params: dict, experiment_name: str):
"""Train and log to MLflow."""
mlflow.set_experiment(experiment_name)
with mlflow.start_run() as run:
# Log all parameters
mlflow.log_params(params)
mlflow.set_tag("pipeline_version", "v2.1")
mlflow.set_tag("team", "dataedge")
# Train
pipeline.fit(X_train, y_train)
# Evaluate
y_proba = pipeline.predict_proba(X_val)[:, 1]
brier = brier_score_loss(y_val, y_proba)
ll = log_loss(y_val, y_proba)
auc = roc_auc_score(y_val, y_proba)
mlflow.log_metrics({
"brier_score": brier,
"log_loss": ll,
"auc_roc": auc,
"n_train": len(X_train),
"n_val": len(X_val),
})
# Register model
mlflow.sklearn.log_model(
pipeline,
artifact_path="model",
registered_model_name="dataedge_election_model"
)
print(f"Run {run.info.run_id[:8]}: "
f"Brier={brier:.4f}, AUC={auc:.4f}")
return run.info.run_id, brier
# Example: Train with different hyperparameters
experiments = [
{"n_estimators": 200, "learning_rate": 0.05, "max_depth": 3},
{"n_estimators": 300, "learning_rate": 0.05, "max_depth": 4},
{"n_estimators": 300, "learning_rate": 0.01, "max_depth": 5},
{"n_estimators": 500, "learning_rate": 0.02, "max_depth": 4},
]
# Synthetic data for demonstration
np.random.seed(42)
n = 1000
poll_avg = np.random.beta(5, 5, n)
market_price = np.random.beta(5, 5, n)
X_data = pd.DataFrame({
    'poll_avg_7d': poll_avg,
    'market_price': market_price,
    'volume_24h': np.random.exponential(500000, n),
    # Derived the same way as the registered feature definition
    'poll_market_spread': poll_avg - market_price,
    'days_to_resolution': np.random.randint(1, 365, n),
    'price_volatility_7d': np.random.exponential(0.02, n),
})
y_data = (X_data['poll_avg_7d'] + np.random.normal(0, 0.2, n) > 0.5
).astype(int)
X_tr, X_vl = X_data[:800], X_data[800:]
y_tr, y_vl = y_data[:800], y_data[800:]
best_brier = float('inf')
best_run_id = None
for params in experiments:
pipeline = build_dataedge_pipeline(**params)
run_id, brier = train_with_tracking(
pipeline, X_tr, y_tr, X_vl, y_vl,
params, "dataedge_election_v2"
)
if brier < best_brier:
best_brier = brier
best_run_id = run_id
print(f"\nBest run: {best_run_id[:8]} with Brier={best_brier:.4f}")
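The serving layer in Phase 4 loads the registry's Production stage, so the winning run still has to be promoted. A minimal sketch using the MLflow client (stage-based promotion; newer MLflow releases also support aliases for the same purpose):

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Find the registered model version created by the best run
version = client.search_model_versions(f"run_id = '{best_run_id}'")[0].version

# Promote it; the FastAPI server resolves models:/dataedge_election_model/Production
client.transition_model_version_stage(
    name="dataedge_election_model",
    version=version,
    stage="Production",
    archive_existing_versions=True,
)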
Phase 4: Model Serving with FastAPI
Problem
Predictions are generated by running a notebook cell, which requires Priya to be available.
Solution
A FastAPI server loads the production model and serves predictions on demand.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict
import joblib
import numpy as np
import pandas as pd
import time
from datetime import datetime
app = FastAPI(title="DataEdge Prediction Server")
class PredictionRequest(BaseModel):
market_id: str
features: Dict[str, float]
class PredictionResponse(BaseModel):
market_id: str
probability: float
model_version: str
timestamp: str
latency_ms: float
# Global state
MODEL = None
MODEL_VERSION = "unknown"
@app.on_event("startup")
async def load_model():
global MODEL, MODEL_VERSION
    try:
        # Load the registry's Production version via the sklearn flavor so
        # that predict_proba is available; fall back to the local artifact.
        import mlflow.sklearn
        MODEL = mlflow.sklearn.load_model(
            "models:/dataedge_election_model/Production"
        )
        MODEL_VERSION = "production"
    except Exception:
        MODEL = joblib.load("models/latest_pipeline.joblib")
        MODEL_VERSION = "local_latest"
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
if MODEL is None:
raise HTTPException(503, "No model loaded")
start = time.time()
X = pd.DataFrame([request.features])[FEATURE_NAMES]
    # Use the calibrated probability of the YES outcome;
    # predict() alone would return a 0/1 class label.
    proba = float(MODEL.predict_proba(X)[:, 1][0])
latency = (time.time() - start) * 1000
return PredictionResponse(
market_id=request.market_id,
probability=np.clip(proba, 0, 1),
model_version=MODEL_VERSION,
timestamp=datetime.now().isoformat(),
latency_ms=round(latency, 2),
)
@app.get("/health")
async def health():
return {
"status": "healthy" if MODEL else "no_model",
"model_version": MODEL_VERSION,
}
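With the server running (for example under uvicorn on port 8000; the host and feature values below are illustrative), a trading script can request a prediction like this:

import requests

resp = requests.post(
    "http://localhost:8000/predict",
    json={
        "market_id": "market_sen_2024_oh",
        "features": {
            "poll_avg_7d": 0.52,
            "market_price": 0.48,
            "volume_24h": 310000.0,
            "poll_market_spread": 0.04,
            "days_to_resolution": 21,
            "price_volatility_7d": 0.015,
        },
    },
    timeout=2,
)
print(resp.json())  # market_id, probability, model_version, timestamp, latency_ms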
Phase 5: Monitoring and Drift Detection
Problem
Last month, the data pipeline silently broke, and the model served stale predictions for three days. No one noticed until the losses appeared.
Solution
A monitoring system checks model health, feature freshness, and prediction drift.
from scipy import stats
class DataEdgeMonitor:
"""Monitoring system for DataEdge's prediction models."""
def __init__(self, reference_predictions: np.ndarray,
reference_brier: float,
feature_store: DataEdgeFeatureStore):
self.ref_preds = reference_predictions
self.ref_brier = reference_brier
self.feature_store = feature_store
self.alerts = []
def check_feature_freshness(self, feature_names: List[str],
max_age_hours: int = 24):
"""Alert if features are stale."""
for fname in feature_names:
cur = self.feature_store.conn.execute(
"""SELECT MAX(ingestion_timestamp) FROM features
WHERE feature_name=?""",
(fname,)
)
result = cur.fetchone()
if result[0] is None:
self.alerts.append({
"type": "staleness",
"severity": "high",
"message": f"Feature '{fname}' has no data"
})
continue
last_update = datetime.fromisoformat(result[0])
age_hours = (
datetime.now() - last_update
).total_seconds() / 3600
if age_hours > max_age_hours:
self.alerts.append({
"type": "staleness",
"severity": "high" if age_hours > 48 else "medium",
"message": (
f"Feature '{fname}' is {age_hours:.1f}h old "
f"(max: {max_age_hours}h)"
)
})
def check_prediction_drift(self, current_predictions: np.ndarray):
"""Check if prediction distribution has shifted."""
ks_stat, ks_p = stats.ks_2samp(
self.ref_preds, current_predictions
)
if ks_p < 0.01:
self.alerts.append({
"type": "prediction_drift",
"severity": "high",
"message": (
f"Prediction drift detected "
f"(KS stat={ks_stat:.4f}, p={ks_p:.6f})"
)
})
def check_performance(self, predictions: np.ndarray,
actuals: np.ndarray):
"""Check if model performance has degraded."""
current_brier = np.mean((predictions - actuals) ** 2)
if current_brier > self.ref_brier * 1.5:
self.alerts.append({
"type": "concept_drift",
"severity": "high",
"message": (
f"Model performance degraded: "
f"Brier {current_brier:.4f} vs "
f"reference {self.ref_brier:.4f}"
)
})
return True # Trigger retraining
return False
def run_all_checks(self, current_predictions, actuals=None):
"""Run all monitoring checks."""
self.alerts = []
self.check_feature_freshness(FEATURE_NAMES)
self.check_prediction_drift(current_predictions)
if actuals is not None:
self.check_performance(current_predictions, actuals)
if self.alerts:
print(f"\n{'='*60}")
print(f"MONITORING ALERTS ({len(self.alerts)})")
print(f"{'='*60}")
for alert in self.alerts:
icon = "!!!" if alert["severity"] == "high" else " ! "
print(f" [{icon}] {alert['type']}: {alert['message']}")
else:
print("All checks passed. System healthy.")
return self.alerts
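A short sketch of how the monitor is wired into a scheduled job; `val_preds` stands in for the validation-set probabilities from the best Phase 3 run and `todays_preds` for the probabilities the server produced today (both names are illustrative):

monitor = DataEdgeMonitor(
    reference_predictions=val_preds,
    reference_brier=best_brier,
    feature_store=store,
)

# Run every 15 minutes from a scheduler; high-severity alerts page the on-call analyst
alerts = monitor.run_all_checks(todays_preds)
if any(a["severity"] == "high" for a in alerts):
    pass  # e.g. send a Slack/pager notification or kick off retraining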
Results
After deploying the complete pipeline, DataEdge Capital saw the following improvements:
| Metric | Before | After |
|---|---|---|
| Time to detect broken pipeline | 3 days | 15 minutes (alerting) |
| Model retraining | Ad-hoc, manual | Automated, daily |
| Feature consistency | 2 different implementations | 1 centralized store |
| Prediction traceability | None | Full audit trail |
| Average prediction latency | 30 min (manual) | 12 ms (API) |
| Monthly model-related losses | ~$15,000 | ~$2,000 |
Key Lessons Learned
- Start simple: The SQLite feature store and joblib serialization were sufficient for DataEdge's scale. They did not need Redis, Kubernetes, or a distributed feature store.
- Monitor feature freshness first: The biggest risk was not model quality but stale data. Feature freshness monitoring caught problems before they affected predictions.
- Automate the easy parts first: Scheduled daily retraining was simple to implement and eliminated the "forgot to retrain" problem (a sketch of such a job follows this list). Trigger-based retraining came later.
- Audit trails pay for themselves: When a trade went wrong, being able to trace back to the exact model version, features, and predictions made debugging fast instead of impossible.
- The pipeline is the product: Priya's individual models were good, but the pipeline --- the system that produces, validates, deploys, and monitors models --- was the real deliverable.
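A minimal sketch of the daily retraining job the third lesson refers to, suitable for running from cron or any scheduler; the `labels` DataFrame (market_id, as_of, outcome) is an assumption about how DataEdge records resolved markets:

def daily_retrain(store: DataEdgeFeatureStore, labels: pd.DataFrame):
    """Retrain on resolved markets and log the run to MLflow.

    `labels` is assumed to hold one row per resolved market with columns
    market_id, as_of (when the prediction would have been made), outcome (0/1).
    """
    X = store.get_training_data(
        labels["market_id"].tolist(),
        FEATURE_NAMES,
        list(labels["as_of"]),
    )[FEATURE_NAMES]
    y = labels["outcome"].to_numpy()
    # Hold out the last 20% of rows for validation (assumes chronological order)
    split = int(len(X) * 0.8)
    pipeline = build_dataedge_pipeline()
    return train_with_tracking(
        pipeline, X[:split], y[:split], X[split:], y[split:],
        {"n_estimators": 300, "learning_rate": 0.05, "max_depth": 4},
        "dataedge_election_v2",
    )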