> "The last mile of machine learning is the hardest. Getting a model to work in a notebook is science; getting it to work in production is engineering." --- Adapted from common ML engineering wisdom
In This Chapter
- 27.1 The MLOps Challenge
- 27.2 scikit-learn Pipelines
- 27.3 Feature Stores for Prediction Markets
- 27.4 Experiment Tracking with MLflow
- 27.5 Model Versioning and Registry
- 27.6 Automated Training Pipelines
- 27.7 Model Monitoring and Drift Detection
- 27.8 Serving Predictions
- 27.9 CI/CD for ML
- 27.10 Reproducibility and Governance
- 27.11 Putting It All Together: Production ML Architecture
- 27.12 Chapter Summary
- What's Next
Chapter 27: Feature Stores, Pipelines, and MLOps
"The last mile of machine learning is the hardest. Getting a model to work in a notebook is science; getting it to work in production is engineering." --- Adapted from common ML engineering wisdom
In the preceding chapters, we built models that predict market outcomes, estimate probabilities, and detect inefficiencies. We trained gradient-boosted trees, calibrated classifiers, and evaluated our work with proper scoring rules. But a model sitting in a Jupyter notebook generates zero profit. The chasm between "it works on my machine" and "it runs reliably in production, 24/7, making real decisions on real markets" is vast --- and it is the subject of this chapter.
MLOps (Machine Learning Operations) is the discipline that bridges this gap. It encompasses the tools, practices, and organizational patterns needed to deploy, monitor, maintain, and govern machine learning systems in production. For prediction market practitioners, MLOps is not optional: markets move in real time, data distributions shift as events unfold, and a stale or broken model can hemorrhage money before anyone notices.
This chapter covers the entire production ML lifecycle: from organizing features into reusable stores, through building robust pipelines with scikit-learn, tracking experiments with MLflow, deploying models behind APIs, monitoring for drift, and automating the entire cycle with CI/CD. By the end, you will have a complete blueprint for a production-grade prediction market ML system.
27.1 The MLOps Challenge
27.1.1 The Notebook-to-Production Gap
Most data science work begins in a notebook. You load data, explore it, engineer features, train a model, and evaluate it. The notebook is interactive, iterative, and forgiving. Production is none of these things.
Consider the differences:
| Aspect | Notebook | Production |
|---|---|---|
| Data source | Static CSV or snapshot | Live database, streaming API |
| Feature engineering | Ad-hoc, inline code | Reproducible, versioned pipeline |
| Training | Manual, on-demand | Scheduled or triggered |
| Model selection | Visual comparison | Automated validation gates |
| Serving | model.predict(X) inline | REST API, batch job, or streaming |
| Monitoring | Manual inspection | Automated alerts, dashboards |
| Rollback | "Undo" in the notebook | Versioned model with rollback procedure |
| Reproducibility | "It worked yesterday" | Deterministic, auditable |
The gap between these two worlds is where most ML projects fail. According to industry surveys, roughly 85--90% of ML models never make it to production. The ones that do often degrade silently, producing increasingly poor predictions as the world changes beneath them.
27.1.2 Why Prediction Markets Amplify the Challenge
Prediction markets present unique MLOps challenges:
- Non-stationarity: The data-generating process changes constantly. A model trained on pre-debate polling data faces a fundamentally different distribution after a major debate. This is not gradual drift --- it can be sudden and dramatic.
- Time sensitivity: Markets react in seconds. A model that takes minutes to retrain and redeploy may miss critical trading windows. Latency from feature computation to prediction must be tightly controlled.
- Adversarial dynamics: Other market participants adapt. If your model exploits a pattern, others will eventually exploit it too, eroding the edge. Your MLOps system must detect when edges disappear.
- High cost of errors: A miscalibrated model does not just reduce accuracy on a leaderboard --- it loses real money. Silent failures (serving stale predictions, using corrupted features) can be catastrophic.
- Regulatory and audit requirements: Depending on jurisdiction, prediction market trading may require audit trails, explainability, and compliance documentation.
27.1.3 The MLOps Maturity Model
Organizations typically progress through levels of MLOps maturity:
- Level 0 --- Manual: Everything is manual. Models are trained in notebooks, exported as pickle files, and deployed by copying files to servers.
- Level 1 --- Pipeline Automation: Training pipelines are automated. Feature engineering, training, and evaluation run as scheduled jobs. Model artifacts are stored in a registry.
- Level 2 --- CI/CD for ML: Code changes trigger automated testing, training, and deployment. Model quality gates prevent bad models from reaching production.
- Level 3 --- Full Automation with Monitoring: The system detects drift, triggers retraining, validates new models, and deploys them automatically. Humans oversee but rarely intervene.
This chapter will take you through all four levels, building the components needed for a Level 3 system.
27.1.4 The MLOps Technology Stack
A complete MLOps stack for prediction markets includes:
+---------------------+
| Model Monitoring | <-- Drift detection, performance alerts
+---------------------+
| Model Serving | <-- REST API, batch scoring
+---------------------+
| Model Registry | <-- Version control, staging/production
+---------------------+
| Experiment Tracking| <-- MLflow, parameters, metrics, artifacts
+---------------------+
| Training Pipeline | <-- Automated training, validation gates
+---------------------+
| Feature Store | <-- Online/offline features, point-in-time
+---------------------+
| Data Pipeline | <-- Ingestion, cleaning, transformation
+---------------------+
| Data Sources | <-- Market APIs, polling data, news feeds
+---------------------+
We will build each layer in this chapter.
27.2 scikit-learn Pipelines
27.2.1 Why Pipelines Matter
A prediction market model typically requires multiple preprocessing steps: imputing missing values, encoding categorical variables, scaling numeric features, engineering domain-specific features, and then feeding the result to a classifier or regressor. Without pipelines, this logic lives in scattered notebook cells, making it fragile, hard to reproduce, and impossible to deploy cleanly.
scikit-learn's Pipeline and ColumnTransformer solve this by encapsulating the entire transformation-and-prediction workflow into a single object that can be fitted, serialized, and deployed as a unit.
27.2.2 Basic Pipeline Construction
A Pipeline chains a sequence of transformers followed by an estimator:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
pipeline = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler()),
('classifier', GradientBoostingClassifier(
n_estimators=200,
learning_rate=0.1,
max_depth=4,
random_state=42
))
])
# Fit the entire pipeline
pipeline.fit(X_train, y_train)
# Predict with the entire pipeline
predictions = pipeline.predict_proba(X_test)[:, 1]
Every step except the last must implement fit and transform. The last step must implement fit and (optionally) predict or predict_proba.
27.2.3 ColumnTransformer for Heterogeneous Data
Prediction market datasets typically contain both numeric features (polling averages, market prices, volume) and categorical features (event type, candidate party, market platform). ColumnTransformer applies different transformations to different column subsets:
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
numeric_features = ['poll_average', 'poll_trend', 'market_price',
'volume_24h', 'days_to_resolution']
categorical_features = ['event_type', 'market_platform', 'party']
numeric_transformer = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='constant', fill_value='unknown')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
],
remainder='drop' # drop unspecified columns
)
27.2.4 Custom Transformers
Production prediction market models often need domain-specific transformations that do not exist in scikit-learn. You can create custom transformers by inheriting from BaseEstimator and TransformerMixin:
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
import pandas as pd
class PollMarketSpreadTransformer(BaseEstimator, TransformerMixin):
"""Compute spread between poll average and market-implied probability."""
def __init__(self, poll_col='poll_average', market_col='market_price'):
self.poll_col = poll_col
self.market_col = market_col
def fit(self, X, y=None):
# No fitting needed for this transformer
return self
def transform(self, X):
X = X.copy()
X['poll_market_spread'] = X[self.poll_col] - X[self.market_col]
X['poll_market_ratio'] = X[self.poll_col] / (X[self.market_col] + 1e-8)
return X
class VolatilityFeatureTransformer(BaseEstimator, TransformerMixin):
"""Compute rolling volatility features from price history."""
def __init__(self, price_col='market_price', windows=None):
self.price_col = price_col
self.windows = windows or [5, 10, 20]
def fit(self, X, y=None):
return self
def transform(self, X):
X = X.copy()
for w in self.windows:
col_name = f'volatility_{w}d'
X[col_name] = (
X[self.price_col]
.rolling(window=w, min_periods=1)
.std()
.fillna(0)
)
return X
class TimeToResolutionTransformer(BaseEstimator, TransformerMixin):
"""Engineer time-based features from resolution date."""
def __init__(self, resolution_col='resolution_date',
current_date_col='observation_date'):
self.resolution_col = resolution_col
self.current_date_col = current_date_col
def fit(self, X, y=None):
return self
def transform(self, X):
X = X.copy()
res_dates = pd.to_datetime(X[self.resolution_col])
obs_dates = pd.to_datetime(X[self.current_date_col])
days_remaining = (res_dates - obs_dates).dt.days
X['days_to_resolution'] = days_remaining
X['log_days_to_resolution'] = np.log1p(
np.maximum(days_remaining, 0)
)
X['urgency'] = 1.0 / (1.0 + days_remaining)
return X
27.2.5 End-to-End Prediction Pipeline
Combining custom transformers with standard scikit-learn components:
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.calibration import CalibratedClassifierCV
# Step 1: Domain-specific feature engineering
feature_engineer = Pipeline([
('spread', PollMarketSpreadTransformer()),
('time', TimeToResolutionTransformer()),
])
# Step 2: Column-specific preprocessing
numeric_cols = ['poll_average', 'poll_trend', 'market_price',
'volume_24h', 'poll_market_spread', 'poll_market_ratio',
'days_to_resolution', 'log_days_to_resolution', 'urgency']
categorical_cols = ['event_type', 'market_platform']
preprocessor = ColumnTransformer([
('num', StandardScaler(), numeric_cols),
('cat', OneHotEncoder(handle_unknown='ignore',
sparse_output=False), categorical_cols),
])
# Step 3: Full pipeline with calibrated classifier
full_pipeline = Pipeline([
('features', feature_engineer),
('preprocess', preprocessor),
('classifier', CalibratedClassifierCV(
GradientBoostingClassifier(
n_estimators=300,
learning_rate=0.05,
max_depth=5,
subsample=0.8,
random_state=42
),
cv=5,
method='isotonic'
))
])
# Fit
full_pipeline.fit(X_train, y_train)
# Predict probabilities
probabilities = full_pipeline.predict_proba(X_test)[:, 1]
27.2.6 Pipeline Serialization
Once trained, the pipeline must be serialized for deployment. The standard approach uses joblib:
import joblib
from datetime import datetime
# Save pipeline
model_path = f"models/pipeline_{datetime.now():%Y%m%d_%H%M%S}.joblib"
joblib.dump(full_pipeline, model_path)
# Load pipeline
loaded_pipeline = joblib.load(model_path)
# Verify identical predictions
assert np.allclose(
loaded_pipeline.predict_proba(X_test)[:, 1],
probabilities
)
Security note: joblib.load and pickle.load can execute arbitrary code. Never load model files from untrusted sources. In production, use signed artifacts and verify checksums.
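To make the security note concrete, here is a minimal sketch of checksum verification before loading, assuming the expected SHA-256 digest is stored somewhere trusted (a deployment manifest, the model registry, etc.):
import hashlib
import joblib

def sha256_of_file(path: str, chunk_size: int = 1 << 20) -> str:
    """Compute the SHA-256 digest of a file in streaming fashion."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

def load_verified_model(model_path: str, expected_sha256: str):
    """Load a joblib artifact only if its checksum matches the trusted digest."""
    actual = sha256_of_file(model_path)
    if actual != expected_sha256:
        raise ValueError(
            f"Checksum mismatch for {model_path}: "
            f"expected {expected_sha256}, got {actual}"
        )
    return joblib.load(model_path)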
27.2.7 Pipeline Introspection and Debugging
Debugging complex pipelines requires introspection:
# Access individual pipeline steps
preprocessor = full_pipeline.named_steps['preprocess']
# Get feature names after preprocessing (the fitted ColumnTransformer knows them)
feature_names = preprocessor.get_feature_names_out()
# Inspect intermediate results by slicing off the final classifier
intermediate = full_pipeline[:-1].transform(X_test)
print(f"Shape after preprocessing: {intermediate.shape}")
print(f"Feature names: {feature_names}")
# Access parameters
print(full_pipeline.get_params())
# Set parameters (useful for grid search)
full_pipeline.set_params(classifier__cv=3)
27.3 Feature Stores for Prediction Markets
27.3.1 What Is a Feature Store?
A feature store is a centralized repository for storing, managing, and serving machine learning features. It acts as the bridge between data engineering (which produces raw data) and machine learning (which consumes features).
Without a feature store, feature engineering code is typically duplicated across notebooks, training scripts, and serving code. This leads to:
- Training-serving skew: Features computed differently in training versus serving
- Duplicated effort: Multiple team members re-implementing the same features
- Point-in-time violations: Accidentally using future data during training
- No discoverability: No central catalog of available features
27.3.2 Feature Store Architecture
A feature store has two main components:
Offline store: Stores historical feature values for training. Optimized for batch reads of large datasets. Typically backed by a data warehouse (BigQuery, Snowflake) or file storage (Parquet files, Delta Lake).
Online store: Stores the latest feature values for real-time serving. Optimized for low-latency point lookups. Typically backed by Redis, DynamoDB, or a similar key-value store.
+-------------------+
| Feature Store |
| |
Training <---- | Offline Store | <---- Data Pipeline
Pipeline | (historical) | (batch)
| |
Serving <---- | Online Store | <---- Data Pipeline
(real-time) | (latest values) | (streaming)
+-------------------+
27.3.3 Point-in-Time Correctness
Point-in-time correctness is arguably the most critical feature store concept for prediction markets. When training a model, you must ensure that the features used for each training example reflect only information that was available at the time of that example.
Consider a market for "Will Candidate X win the election?" observed on October 1st. Which inputs are legitimate features for that observation?
- Poll averages available on September 30th (correct)
- The actual election result from November 5th (incorrect --- data leakage!)
- A poll released on October 3rd (incorrect --- future data!)
A feature store enforces point-in-time correctness through temporal joins:
def point_in_time_join(entity_df, feature_df,
entity_timestamp_col='event_timestamp',
feature_timestamp_col='feature_timestamp'):
"""
Join features to entities using point-in-time semantics.
For each entity row, find the most recent feature row
where feature_timestamp <= entity_timestamp.
"""
# Sort features by timestamp
feature_df = feature_df.sort_values(feature_timestamp_col)
results = []
for _, entity_row in entity_df.iterrows():
entity_time = entity_row[entity_timestamp_col]
entity_id = entity_row['entity_id']
# Filter features: same entity, timestamp <= entity timestamp
valid_features = feature_df[
(feature_df['entity_id'] == entity_id) &
(feature_df[feature_timestamp_col] <= entity_time)
]
if len(valid_features) > 0:
# Take the most recent valid feature row
latest = valid_features.iloc[-1]
merged = {**entity_row.to_dict(), **latest.to_dict()}
results.append(merged)
return pd.DataFrame(results)
In practice, use pandas.merge_asof for efficient point-in-time joins:
import pandas as pd
def efficient_pit_join(entity_df, feature_df,
on='entity_id',
entity_time='event_timestamp',
feature_time='feature_timestamp'):
"""Efficient point-in-time join using merge_asof."""
entity_sorted = entity_df.sort_values(entity_time)
feature_sorted = feature_df.sort_values(feature_time)
result = pd.merge_asof(
entity_sorted,
feature_sorted,
left_on=entity_time,
right_on=feature_time,
by=on,
direction='backward' # only look at past features
)
return result
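A small worked example, using illustrative entity and feature frames, shows how direction='backward' prevents leakage: each observation only picks up the latest feature row at or before its own timestamp:
entities = pd.DataFrame({
    'entity_id': ['mkt_1', 'mkt_1'],
    'event_timestamp': pd.to_datetime(['2024-10-01', '2024-10-10']),
})
features = pd.DataFrame({
    'entity_id': ['mkt_1', 'mkt_1', 'mkt_1'],
    'feature_timestamp': pd.to_datetime(
        ['2024-09-28', '2024-10-05', '2024-10-12']),
    'poll_average': [0.50, 0.52, 0.55],
})

joined = efficient_pit_join(entities, features)
# The 2024-10-01 observation gets the 2024-09-28 poll (0.50);
# the 2024-10-10 observation gets the 2024-10-05 poll (0.52);
# the 2024-10-12 poll is never used because it lies in the future.
print(joined[['event_timestamp', 'poll_average']])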
27.3.4 Building a Simple Feature Store
For small-to-medium prediction market systems, a SQLite-backed feature store provides a practical starting point:
import sqlite3
import pandas as pd
import json
from datetime import datetime
from typing import List, Dict, Optional
class PredictionMarketFeatureStore:
"""
A simple feature store for prediction market features.
Supports:
- Registering feature definitions
- Ingesting feature values with timestamps
- Point-in-time retrieval for training
- Latest-value retrieval for serving
"""
def __init__(self, db_path: str = "feature_store.db"):
self.db_path = db_path
self.conn = sqlite3.connect(db_path)
self._initialize_schema()
def _initialize_schema(self):
"""Create the feature store schema."""
self.conn.executescript("""
CREATE TABLE IF NOT EXISTS feature_definitions (
feature_name TEXT PRIMARY KEY,
description TEXT,
data_type TEXT,
feature_group TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metadata TEXT
);
CREATE TABLE IF NOT EXISTS feature_values (
entity_id TEXT NOT NULL,
feature_name TEXT NOT NULL,
feature_value REAL,
event_timestamp TIMESTAMP NOT NULL,
ingestion_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (feature_name)
REFERENCES feature_definitions(feature_name)
);
CREATE INDEX IF NOT EXISTS idx_feature_values_lookup
ON feature_values(entity_id, feature_name, event_timestamp);
CREATE TABLE IF NOT EXISTS feature_groups (
group_name TEXT PRIMARY KEY,
description TEXT,
entity_type TEXT,
features TEXT
);
""")
self.conn.commit()
def register_feature(self, name: str, description: str,
data_type: str = "float",
feature_group: str = "default",
metadata: Optional[Dict] = None):
"""Register a new feature definition."""
self.conn.execute(
"""INSERT OR REPLACE INTO feature_definitions
(feature_name, description, data_type,
feature_group, metadata)
VALUES (?, ?, ?, ?, ?)""",
(name, description, data_type, feature_group,
json.dumps(metadata or {}))
)
self.conn.commit()
def ingest(self, entity_id: str, features: Dict[str, float],
event_timestamp: datetime):
"""Ingest feature values for an entity at a given timestamp."""
for name, value in features.items():
self.conn.execute(
"""INSERT INTO feature_values
(entity_id, feature_name, feature_value,
event_timestamp)
VALUES (?, ?, ?, ?)""",
(entity_id, name, value,
event_timestamp.isoformat())
)
self.conn.commit()
def get_training_features(
self,
entity_ids: List[str],
feature_names: List[str],
timestamps: List[datetime]
) -> pd.DataFrame:
"""
Get features for training with point-in-time correctness.
For each (entity_id, timestamp) pair, returns the most recent
feature values on or before that timestamp.
"""
results = []
for entity_id, ts in zip(entity_ids, timestamps):
row = {'entity_id': entity_id, 'timestamp': ts}
for fname in feature_names:
cursor = self.conn.execute(
"""SELECT feature_value FROM feature_values
WHERE entity_id = ?
AND feature_name = ?
AND event_timestamp <= ?
ORDER BY event_timestamp DESC
LIMIT 1""",
(entity_id, fname, ts.isoformat())
)
result = cursor.fetchone()
row[fname] = result[0] if result else None
results.append(row)
return pd.DataFrame(results)
def get_online_features(
self,
entity_id: str,
feature_names: List[str]
) -> Dict[str, float]:
"""
Get latest feature values for real-time serving.
"""
features = {}
for fname in feature_names:
cursor = self.conn.execute(
"""SELECT feature_value FROM feature_values
WHERE entity_id = ? AND feature_name = ?
ORDER BY event_timestamp DESC
LIMIT 1""",
(entity_id, fname)
)
result = cursor.fetchone()
features[fname] = result[0] if result else None
return features
def list_features(self, group: Optional[str] = None) -> pd.DataFrame:
"""List all registered features."""
query = "SELECT * FROM feature_definitions"
params = ()
if group:
query += " WHERE feature_group = ?"
params = (group,)
return pd.read_sql(query, self.conn, params=params)
def get_feature_freshness(self, feature_name: str) -> Dict:
"""Check when a feature was last updated."""
cursor = self.conn.execute(
"""SELECT
MAX(event_timestamp) as latest_event,
MAX(ingestion_timestamp) as latest_ingestion,
COUNT(DISTINCT entity_id) as entity_count,
COUNT(*) as value_count
FROM feature_values
WHERE feature_name = ?""",
(feature_name,)
)
row = cursor.fetchone()
return {
'feature_name': feature_name,
'latest_event_timestamp': row[0],
'latest_ingestion_timestamp': row[1],
'entity_count': row[2],
'value_count': row[3]
}
def close(self):
"""Close the database connection."""
self.conn.close()
27.3.5 Using the Feature Store
# Initialize
store = PredictionMarketFeatureStore("pm_features.db")
# Register features
store.register_feature(
"poll_average", "7-day rolling poll average",
feature_group="polling"
)
store.register_feature(
"market_price", "Current market price (0-1)",
feature_group="market"
)
store.register_feature(
"volume_24h", "24-hour trading volume in USD",
feature_group="market"
)
store.register_feature(
"poll_market_spread", "Difference between poll and market price",
feature_group="derived"
)
# Ingest features (e.g., from a daily pipeline)
from datetime import datetime
store.ingest(
entity_id="election_2024_president",
features={
"poll_average": 0.52,
"market_price": 0.48,
"volume_24h": 1250000.0,
"poll_market_spread": 0.04,
},
event_timestamp=datetime(2024, 10, 1)
)
# Get training features with point-in-time correctness
training_df = store.get_training_features(
entity_ids=["election_2024_president"] * 3,
feature_names=["poll_average", "market_price", "poll_market_spread"],
timestamps=[
datetime(2024, 9, 15),
datetime(2024, 10, 1),
datetime(2024, 10, 15),
]
)
# Get online features for real-time serving
online_features = store.get_online_features(
entity_id="election_2024_president",
feature_names=["poll_average", "market_price", "volume_24h"]
)
27.3.6 Feature Store Best Practices
- Feature naming conventions: Use {group}__{feature_name} (e.g., polling__7day_average, market__24h_volume).
- Versioned feature definitions: When a feature computation changes, create a new version rather than modifying the old one.
- Feature documentation: Every feature should have a description, owner, data type, and acceptable value range.
- Monitoring feature quality: Track null rates, value distributions, and staleness for every feature.
- Access control: In team settings, control who can write to the feature store and who can read.
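To make the feature-quality practice above concrete, here is a minimal staleness check built on the get_feature_freshness method of the store defined earlier; the 24-hour threshold and the feature list are illustrative:
from datetime import datetime, timedelta

def check_feature_staleness(store, feature_names, max_age_hours=24):
    """Flag features whose latest event timestamp is older than the allowed age."""
    stale = []
    now = datetime.now()
    for name in feature_names:
        freshness = store.get_feature_freshness(name)
        latest = freshness['latest_event_timestamp']
        if latest is None:
            stale.append((name, 'no data'))
            continue
        age = now - datetime.fromisoformat(latest)
        if age > timedelta(hours=max_age_hours):
            stale.append((name, f'{age.total_seconds() / 3600:.1f}h old'))
    return stale

# Example: alert if any market features are more than a day old
stale_features = check_feature_staleness(
    store, ["poll_average", "market_price", "volume_24h"]
)
if stale_features:
    print(f"Stale features detected: {stale_features}")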
27.4 Experiment Tracking with MLflow
27.4.1 The Need for Experiment Tracking
Training a prediction market model involves hundreds of decisions: which features to include, how to preprocess them, which algorithm to use, what hyperparameters to set, which training window to use, and how to calibrate the output. Without systematic tracking, you quickly lose track of what you tried, what worked, and why.
Experiment tracking solves this by logging every training run with its parameters, metrics, artifacts, and metadata. MLflow is the most widely adopted open-source experiment tracking tool, and it integrates cleanly with scikit-learn, XGBoost, LightGBM, and other frameworks common in prediction market modeling.
27.4.2 MLflow Core Concepts
- Experiment: A named collection of runs (e.g., "election_model_v3")
- Run: A single execution of training code, with logged parameters, metrics, and artifacts
- Parameter: An input to the training process (e.g., learning_rate=0.1)
- Metric: An output measurement (e.g., brier_score=0.18)
- Artifact: A file produced by the run (e.g., the serialized model, a calibration plot)
- Model Registry: A centralized store for managing model versions and lifecycle stages
27.4.3 Setting Up MLflow
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
# Set tracking URI (local file store or remote server)
mlflow.set_tracking_uri("sqlite:///mlflow.db") # local
# mlflow.set_tracking_uri("http://mlflow-server:5000") # remote
# Create or get experiment
experiment_name = "prediction_market_election_model"
mlflow.set_experiment(experiment_name)
27.4.4 Logging a Training Run
import mlflow
import mlflow.sklearn
import numpy as np
from sklearn.metrics import brier_score_loss, log_loss, roc_auc_score
from sklearn.model_selection import cross_val_score
def train_and_log_model(pipeline, X_train, y_train, X_test, y_test,
params: dict, tags: dict = None):
"""Train a model and log everything to MLflow."""
with mlflow.start_run():
# Log parameters
mlflow.log_params(params)
# Log tags
if tags:
mlflow.set_tags(tags)
mlflow.set_tag("model_type", "prediction_market")
mlflow.set_tag("data_version", "2024-10-15")
# Train
pipeline.fit(X_train, y_train)
# Predict
y_pred_proba = pipeline.predict_proba(X_test)[:, 1]
y_pred = pipeline.predict(X_test)
# Compute metrics
brier = brier_score_loss(y_test, y_pred_proba)
logloss = log_loss(y_test, y_pred_proba)
auc = roc_auc_score(y_test, y_pred_proba)
# Cross-validation Brier score
cv_scores = cross_val_score(
pipeline, X_train, y_train,
scoring='neg_brier_score', cv=5
)
cv_brier = -cv_scores.mean()
cv_brier_std = cv_scores.std()
# Log metrics
mlflow.log_metric("brier_score", brier)
mlflow.log_metric("log_loss", logloss)
mlflow.log_metric("auc_roc", auc)
mlflow.log_metric("cv_brier_score", cv_brier)
mlflow.log_metric("cv_brier_std", cv_brier_std)
# Log calibration metrics
from sklearn.calibration import calibration_curve
prob_true, prob_pred = calibration_curve(
y_test, y_pred_proba, n_bins=10
)
calibration_error = np.mean(np.abs(prob_true - prob_pred))
mlflow.log_metric("mean_calibration_error", calibration_error)
# Log the model
mlflow.sklearn.log_model(
pipeline,
artifact_path="model",
registered_model_name="election_predictor"
)
# Log artifacts (e.g., plots, data summaries)
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(8, 6))
ax.plot([0, 1], [0, 1], 'k--', label='Perfect calibration')
ax.plot(prob_pred, prob_true, 's-', label='Model')
ax.set_xlabel('Predicted probability')
ax.set_ylabel('Observed frequency')
ax.set_title('Calibration Curve')
ax.legend()
fig.savefig("calibration_plot.png", dpi=100, bbox_inches='tight')
plt.close(fig)
mlflow.log_artifact("calibration_plot.png")
run_id = mlflow.active_run().info.run_id
print(f"Run ID: {run_id}")
print(f"Brier Score: {brier:.4f}")
print(f"Log Loss: {logloss:.4f}")
print(f"AUC-ROC: {auc:.4f}")
print(f"CV Brier Score: {cv_brier:.4f} +/- {cv_brier_std:.4f}")
return run_id
27.4.5 Comparing Runs
MLflow's UI makes it easy to compare runs visually, but you can also do it programmatically:
from mlflow.tracking import MlflowClient
client = MlflowClient()
def compare_runs(experiment_name: str, metric: str = "brier_score",
top_n: int = 5):
"""Compare the best runs in an experiment."""
experiment = client.get_experiment_by_name(experiment_name)
runs = client.search_runs(
experiment_ids=[experiment.experiment_id],
order_by=[f"metrics.{metric} ASC"],
max_results=top_n
)
print(f"Top {top_n} runs by {metric}:")
print("-" * 80)
    for run in runs:
        params = run.data.params
        metrics = run.data.metrics
        # Guard against missing metrics: format NaN instead of crashing on 'N/A'
        brier = metrics.get('brier_score', float('nan'))
        auc = metrics.get('auc_roc', float('nan'))
        print(f"  Run: {run.info.run_id[:8]}  "
              f"Brier: {brier:.4f}  "
              f"AUC: {auc:.4f}  "
              f"LR: {params.get('learning_rate', 'N/A')}  "
              f"Depth: {params.get('max_depth', 'N/A')}")
return runs
27.4.6 Logging Custom Metrics Over Time
For prediction markets, you often want to track how a model's performance evolves. MLflow supports step-based metric logging:
def log_rolling_performance(model, data_stream, window=50):
"""Log rolling Brier score as the model processes new data."""
predictions = []
actuals = []
with mlflow.start_run():
for i, (features, outcome) in enumerate(data_stream):
prob = model.predict_proba(features.reshape(1, -1))[0, 1]
predictions.append(prob)
actuals.append(outcome)
if len(predictions) >= window:
rolling_brier = brier_score_loss(
actuals[-window:], predictions[-window:]
)
mlflow.log_metric(
"rolling_brier_score", rolling_brier, step=i
)
27.5 Model Versioning and Registry
27.5.1 Why Version Models?
A prediction market trading system may run for months or years. During that time, you will train dozens or hundreds of model versions. You need to:
- Track which version is currently serving predictions
- Roll back to a previous version if the current one degrades
- Run A/B tests between model versions
- Maintain an audit trail of which model made which predictions
27.5.2 MLflow Model Registry
The MLflow Model Registry provides a centralized model store with versioning and lifecycle management:
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Register a model (happens automatically with registered_model_name
# in log_model, or manually)
model_name = "election_predictor"
# Transition model version to staging
client.transition_model_version_stage(
name=model_name,
version=2,
stage="Staging"
)
# After validation, promote to production
client.transition_model_version_stage(
name=model_name,
version=2,
stage="Production"
)
# Archive the old production model
client.transition_model_version_stage(
name=model_name,
version=1,
stage="Archived"
)
27.5.3 Model Lifecycle States
+------+   promote   +---------+   promote   +------------+
| None | ----------> | Staging | ----------> | Production |
+------+             +---------+             +------------+
    |                     |                        |
    |       archive       |        archive         |
    +---------------------+------------------------+
                          |
                          v
                    +----------+
                    | Archived |
                    +----------+
- None: Initial state after registration
- Staging: Model under validation, not yet serving live traffic
- Production: Model actively serving predictions
- Archived: Previous production model, kept for audit and rollback
27.5.4 Loading Models by Stage
import mlflow.pyfunc
# Load the current production model
model = mlflow.pyfunc.load_model(
model_uri=f"models:/{model_name}/Production"
)
# Load a specific version
model_v2 = mlflow.pyfunc.load_model(
model_uri=f"models:/{model_name}/2"
)
# Make predictions
predictions = model.predict(X_new)
27.5.5 Automated Model Validation
Before promoting a model, you should validate it automatically:
def validate_model_for_promotion(model_name: str, version: int,
X_val, y_val,
max_brier: float = 0.25,
min_auc: float = 0.60):
"""
Validate a model version before promoting to production.
Returns True if the model passes all quality gates.
"""
# Load the candidate model
model = mlflow.pyfunc.load_model(
model_uri=f"models:/{model_name}/{version}"
)
# Make predictions
predictions = model.predict(X_val)
# If predictions are probabilities (single column)
if predictions.ndim == 1:
y_pred_proba = predictions
else:
y_pred_proba = predictions[:, 1]
# Compute metrics
brier = brier_score_loss(y_val, y_pred_proba)
auc = roc_auc_score(y_val, y_pred_proba)
# Quality gates
checks = {
'brier_score': (brier <= max_brier, brier, max_brier),
'auc_roc': (auc >= min_auc, auc, min_auc),
}
# Compare with current production model
try:
prod_model = mlflow.pyfunc.load_model(
model_uri=f"models:/{model_name}/Production"
)
prod_predictions = prod_model.predict(X_val)
if prod_predictions.ndim == 1:
prod_proba = prod_predictions
else:
prod_proba = prod_predictions[:, 1]
prod_brier = brier_score_loss(y_val, prod_proba)
checks['better_than_prod'] = (
brier <= prod_brier * 1.05, # allow 5% tolerance
brier, prod_brier
)
except Exception:
# No production model yet
checks['better_than_prod'] = (True, brier, None)
# Report
all_passed = True
for check_name, (passed, actual, threshold) in checks.items():
status = "PASS" if passed else "FAIL"
print(f" [{status}] {check_name}: {actual:.4f} "
f"(threshold: {threshold})")
if not passed:
all_passed = False
return all_passed
27.5.6 A/B Testing Model Versions
import random
from typing import Optional
class ModelABTester:
"""A/B test between two model versions."""
def __init__(self, model_name: str,
control_version: int,
treatment_version: int,
treatment_fraction: float = 0.1):
self.model_name = model_name
self.control = mlflow.pyfunc.load_model(
f"models:/{model_name}/{control_version}"
)
self.treatment = mlflow.pyfunc.load_model(
f"models:/{model_name}/{treatment_version}"
)
self.treatment_fraction = treatment_fraction
self.control_results = []
self.treatment_results = []
def predict(self, X) -> tuple:
"""
Returns (prediction, model_version) tuple.
Routes traffic between control and treatment.
"""
if random.random() < self.treatment_fraction:
pred = self.treatment.predict(X)
return pred, "treatment"
else:
pred = self.control.predict(X)
return pred, "control"
def record_outcome(self, prediction: float, actual: int,
group: str):
"""Record the outcome for analysis."""
if group == "control":
self.control_results.append((prediction, actual))
else:
self.treatment_results.append((prediction, actual))
def analyze(self) -> dict:
"""Compare control and treatment performance."""
if not self.control_results or not self.treatment_results:
return {"status": "insufficient_data"}
ctrl_preds, ctrl_actuals = zip(*self.control_results)
treat_preds, treat_actuals = zip(*self.treatment_results)
ctrl_brier = brier_score_loss(ctrl_actuals, ctrl_preds)
treat_brier = brier_score_loss(treat_actuals, treat_preds)
return {
"control_brier": ctrl_brier,
"treatment_brier": treat_brier,
"improvement": ctrl_brier - treat_brier,
"control_n": len(self.control_results),
"treatment_n": len(self.treatment_results),
"treatment_is_better": treat_brier < ctrl_brier,
}
27.6 Automated Training Pipelines
27.6.1 Why Automate Training?
Manual retraining is unreliable. You forget, you are busy, or you do not notice that the model needs updating. Automated training pipelines ensure models are retrained on schedule or in response to triggers, validated before deployment, and deployed without human intervention (when appropriate).
27.6.2 Scheduled vs. Triggered Retraining
Two primary strategies for retraining:
Scheduled retraining: Retrain on a fixed schedule (e.g., daily, weekly). Simple to implement and reason about, but may retrain unnecessarily or miss critical updates.
Triggered retraining: Retrain when specific conditions are met:
- New data exceeds a threshold (e.g., 1000 new resolved markets)
- Model performance drops below a threshold
- Data drift is detected
- A significant event occurs (e.g., major news)
In practice, most prediction market systems use both: scheduled retraining as a baseline, with triggered retraining for exceptional situations.
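The exact trigger logic is up to you; as one possible sketch (with illustrative thresholds, not a fixed API), the following helper collects the reasons that would justify an off-schedule retrain:
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class RetrainTriggers:
    """Thresholds that decide whether an off-schedule retrain is warranted."""
    min_new_resolved_markets: int = 1000
    max_brier_degradation: float = 1.5   # current / reference Brier ratio
    max_feature_psi: float = 0.25

def should_retrain(n_new_resolved: int,
                   current_brier: float,
                   reference_brier: float,
                   worst_feature_psi: float,
                   triggers: Optional[RetrainTriggers] = None) -> List[str]:
    """Return the list of reasons that currently justify a retrain."""
    triggers = triggers or RetrainTriggers()
    reasons = []
    if n_new_resolved >= triggers.min_new_resolved_markets:
        reasons.append(f"{n_new_resolved} new resolved markets")
    if current_brier > reference_brier * triggers.max_brier_degradation:
        reasons.append(
            f"Brier degraded {current_brier / reference_brier:.2f}x")
    if worst_feature_psi >= triggers.max_feature_psi:
        reasons.append(f"feature drift (PSI={worst_feature_psi:.2f})")
    return reasons

# Example: kick off retraining if any trigger fires
if should_retrain(n_new_resolved=1200, current_brier=0.26,
                  reference_brier=0.18, worst_feature_psi=0.12):
    print("Triggering off-schedule retraining")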
27.6.3 Training Pipeline Architecture
import logging
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Callable, Optional, List
import json
logger = logging.getLogger(__name__)
@dataclass
class TrainingConfig:
"""Configuration for a training pipeline run."""
model_name: str
training_window_days: int = 365
validation_window_days: int = 30
min_training_samples: int = 500
max_brier_score: float = 0.25
min_auc: float = 0.60
feature_names: List[str] = field(default_factory=list)
hyperparameters: dict = field(default_factory=dict)
class AutomatedTrainingPipeline:
"""
End-to-end automated training pipeline for prediction markets.
Steps:
1. Fetch training data from feature store
2. Split into train/validation
3. Train model
4. Validate against quality gates
5. Register model if it passes
6. Optionally promote to production
"""
def __init__(self, feature_store, config: TrainingConfig):
self.feature_store = feature_store
self.config = config
self.run_history = []
def fetch_training_data(self, as_of: datetime = None):
"""Fetch training data from the feature store."""
if as_of is None:
as_of = datetime.now()
train_start = as_of - timedelta(
days=self.config.training_window_days
)
val_start = as_of - timedelta(
days=self.config.validation_window_days
)
logger.info(
f"Fetching training data: {train_start} to {val_start}"
)
logger.info(
f"Fetching validation data: {val_start} to {as_of}"
)
# In practice, fetch from feature store
# This is a simplified placeholder
return {
'train_start': train_start,
'train_end': val_start,
'val_start': val_start,
'val_end': as_of,
}
def build_pipeline(self):
"""Build the scikit-learn pipeline."""
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.calibration import CalibratedClassifierCV
hp = self.config.hyperparameters
pipeline = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler()),
('classifier', CalibratedClassifierCV(
GradientBoostingClassifier(
n_estimators=hp.get('n_estimators', 200),
learning_rate=hp.get('learning_rate', 0.05),
max_depth=hp.get('max_depth', 4),
subsample=hp.get('subsample', 0.8),
random_state=42
),
cv=5,
method='isotonic'
))
])
return pipeline
def validate(self, pipeline, X_val, y_val) -> dict:
"""Validate the model against quality gates."""
from sklearn.metrics import (
brier_score_loss, log_loss, roc_auc_score
)
y_proba = pipeline.predict_proba(X_val)[:, 1]
brier = brier_score_loss(y_val, y_proba)
logloss = log_loss(y_val, y_proba)
auc = roc_auc_score(y_val, y_proba)
passed = (
brier <= self.config.max_brier_score and
auc >= self.config.min_auc
)
return {
'passed': passed,
'brier_score': brier,
'log_loss': logloss,
'auc_roc': auc,
'thresholds': {
'max_brier': self.config.max_brier_score,
'min_auc': self.config.min_auc,
}
}
def run(self, X_train, y_train, X_val, y_val,
auto_promote: bool = False) -> dict:
"""Execute the full training pipeline."""
import mlflow
import mlflow.sklearn
run_result = {
'timestamp': datetime.now().isoformat(),
'config': self.config.__dict__,
}
# Check minimum data requirements
if len(X_train) < self.config.min_training_samples:
run_result['status'] = 'skipped'
run_result['reason'] = (
f'Insufficient training data: '
f'{len(X_train)} < {self.config.min_training_samples}'
)
logger.warning(run_result['reason'])
return run_result
with mlflow.start_run() as run:
# Log configuration
mlflow.log_params({
'training_window_days':
self.config.training_window_days,
'n_train_samples': len(X_train),
'n_val_samples': len(X_val),
**self.config.hyperparameters,
})
# Build and train pipeline
pipeline = self.build_pipeline()
logger.info("Training model...")
pipeline.fit(X_train, y_train)
# Validate
validation_result = self.validate(
pipeline, X_val, y_val
)
mlflow.log_metrics({
'brier_score': validation_result['brier_score'],
'log_loss': validation_result['log_loss'],
'auc_roc': validation_result['auc_roc'],
})
run_result['validation'] = validation_result
run_result['run_id'] = run.info.run_id
if validation_result['passed']:
# Register model
model_info = mlflow.sklearn.log_model(
pipeline,
artifact_path="model",
registered_model_name=self.config.model_name,
)
run_result['status'] = 'registered'
run_result['model_uri'] = model_info.model_uri
logger.info(
f"Model registered: {model_info.model_uri}"
)
if auto_promote:
client = mlflow.tracking.MlflowClient()
latest = client.get_latest_versions(
self.config.model_name, stages=["None"]
)
if latest:
client.transition_model_version_stage(
name=self.config.model_name,
version=latest[0].version,
stage="Production",
)
run_result['status'] = 'promoted'
logger.info("Model promoted to Production")
else:
run_result['status'] = 'rejected'
logger.warning(
f"Model rejected: {validation_result}"
)
self.run_history.append(run_result)
return run_result
27.6.4 Scheduling with APScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
def create_training_scheduler(training_pipeline, data_fetcher):
"""Create a scheduler for automated retraining."""
scheduler = BackgroundScheduler()
def retrain_job():
"""Scheduled retraining job."""
logger.info("Starting scheduled retraining...")
try:
X_train, y_train, X_val, y_val = data_fetcher()
result = training_pipeline.run(
X_train, y_train, X_val, y_val,
auto_promote=False
)
logger.info(f"Retraining complete: {result['status']}")
except Exception as e:
logger.error(f"Retraining failed: {e}")
# Retrain daily at 2 AM
scheduler.add_job(
retrain_job,
trigger=CronTrigger(hour=2, minute=0),
id='daily_retrain',
name='Daily Model Retraining'
)
return scheduler
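The returned scheduler still has to be started by the caller; a minimal usage sketch, assuming training_pipeline and data_fetcher are already defined:
scheduler = create_training_scheduler(training_pipeline, data_fetcher)
scheduler.start()
# The retrain job now runs every day at 2 AM in the background.
# Shut it down cleanly when the application exits:
# scheduler.shutdown(wait=False)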
27.7 Model Monitoring and Drift Detection
27.7.1 Why Models Degrade
A model's performance can degrade for several reasons:
- Concept drift: The relationship between features and the target changes. For prediction markets, this might happen when a new type of candidate emerges, when market structure changes, or when participant behavior shifts.
- Data drift (covariate shift): The distribution of input features changes, even if the underlying relationship stays the same. For example, if your model was trained on markets with 30-90 day horizons but is now being asked to predict markets with 7-day horizons.
- Label drift: The distribution of outcomes changes. If your training data was 60% yes / 40% no outcomes, but production data is 90% yes / 10% no, the model may be miscalibrated.
- Upstream data issues: A data source changes format, goes down, or starts reporting different values. Feature pipelines break silently.
27.7.2 Monitoring Metrics
For prediction market models, the key metrics to monitor are:
Performance metrics (require labels, available after resolution):
- Brier score (primary)
- Log loss
- Calibration error
- Resolution-weighted profit/loss
Proxy metrics (available immediately, without labels):
- Prediction distribution statistics (mean, variance, quantiles)
- Feature distribution statistics
- Prediction-market price agreement
- Confidence intervals
27.7.3 Statistical Tests for Drift
Population Stability Index (PSI)
PSI measures how much a distribution has shifted relative to a reference distribution. It is commonly used in credit scoring and is well-suited for monitoring prediction distributions:
$$\text{PSI} = \sum_{i=1}^{n} (p_i^{\text{new}} - p_i^{\text{ref}}) \cdot \ln\left(\frac{p_i^{\text{new}}}{p_i^{\text{ref}}}\right)$$
where $p_i^{\text{ref}}$ and $p_i^{\text{new}}$ are the proportions in each bin for the reference and new distributions respectively.
Interpretation:
- PSI < 0.1: No significant drift
- 0.1 <= PSI < 0.25: Moderate drift, investigate
- PSI >= 0.25: Significant drift, action required
Kolmogorov-Smirnov Test
The KS test compares two distributions by computing the maximum difference between their empirical CDFs:
$$D = \sup_x |F_{\text{ref}}(x) - F_{\text{new}}(x)|$$
The test returns a statistic and a p-value. A small p-value (e.g., < 0.05) indicates the distributions are significantly different.
Wasserstein Distance (Earth Mover's Distance)
The Wasserstein distance measures the "work" needed to transform one distribution into another:
$$W_1(P, Q) = \int_{-\infty}^{\infty} |F_P(x) - F_Q(x)| \, dx$$
This metric is more sensitive to the magnitude of the shift than the KS test.
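Before wrapping these tests in a monitor class, a short worked example on synthetic prediction samples (arbitrary sizes and shift) illustrates how the KS test and Wasserstein distance respond to an upward drift in predicted probabilities:
import numpy as np
from scipy import stats

rng = np.random.default_rng(42)
reference = rng.beta(2, 2, size=5000)      # predictions centred near 0.5
shifted = rng.beta(3.0, 1.5, size=5000)    # predictions drifted upward

ks_stat, ks_p = stats.ks_2samp(reference, shifted)
w_dist = stats.wasserstein_distance(reference, shifted)

print(f"KS statistic: {ks_stat:.3f}, p-value: {ks_p:.2e}")  # tiny p-value: drift
print(f"Wasserstein distance: {w_dist:.3f}")                # magnitude of the shift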
27.7.4 Implementing a Drift Monitor
import numpy as np
from scipy import stats
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
@dataclass
class DriftAlert:
"""Represents a drift alert."""
timestamp: datetime
feature_name: str
drift_type: str # 'data_drift', 'concept_drift', 'prediction_drift'
test_name: str
statistic: float
p_value: Optional[float]
severity: str # 'low', 'medium', 'high'
message: str
class DriftMonitor:
"""
Monitor for detecting data drift and concept drift
in prediction market models.
"""
def __init__(self, reference_predictions: np.ndarray = None,
reference_features: np.ndarray = None,
feature_names: List[str] = None,
psi_threshold_moderate: float = 0.1,
psi_threshold_high: float = 0.25,
ks_p_value_threshold: float = 0.05):
self.reference_predictions = reference_predictions
self.reference_features = reference_features
self.feature_names = feature_names or []
self.psi_threshold_moderate = psi_threshold_moderate
self.psi_threshold_high = psi_threshold_high
self.ks_p_value_threshold = ks_p_value_threshold
self.alerts: List[DriftAlert] = []
def set_reference(self, predictions: np.ndarray = None,
features: np.ndarray = None):
"""Set the reference distribution from training/validation."""
if predictions is not None:
self.reference_predictions = predictions.copy()
if features is not None:
self.reference_features = features.copy()
@staticmethod
def compute_psi(reference: np.ndarray, current: np.ndarray,
n_bins: int = 10) -> float:
"""Compute Population Stability Index."""
# Create bins from the reference distribution
_, bin_edges = np.histogram(reference, bins=n_bins)
# Extend the outer bins to capture all values
bin_edges[0] = -np.inf
bin_edges[-1] = np.inf
ref_counts, _ = np.histogram(reference, bins=bin_edges)
cur_counts, _ = np.histogram(current, bins=bin_edges)
# Convert to proportions (add small epsilon to avoid log(0))
eps = 1e-8
ref_props = ref_counts / len(reference) + eps
cur_props = cur_counts / len(current) + eps
# Normalize to ensure they sum to 1
ref_props = ref_props / ref_props.sum()
cur_props = cur_props / cur_props.sum()
psi = np.sum(
(cur_props - ref_props) * np.log(cur_props / ref_props)
)
return psi
@staticmethod
def compute_ks_test(reference: np.ndarray,
current: np.ndarray) -> Tuple[float, float]:
"""Compute Kolmogorov-Smirnov test statistic and p-value."""
statistic, p_value = stats.ks_2samp(reference, current)
return statistic, p_value
@staticmethod
def compute_wasserstein(reference: np.ndarray,
current: np.ndarray) -> float:
"""Compute Wasserstein (Earth Mover's) distance."""
return stats.wasserstein_distance(reference, current)
def check_prediction_drift(
self, current_predictions: np.ndarray
) -> List[DriftAlert]:
"""Check if prediction distribution has drifted."""
alerts = []
if self.reference_predictions is None:
logger.warning("No reference predictions set")
return alerts
# PSI
psi = self.compute_psi(
self.reference_predictions, current_predictions
)
if psi >= self.psi_threshold_high:
severity = 'high'
elif psi >= self.psi_threshold_moderate:
severity = 'medium'
else:
severity = 'low'
if severity != 'low':
alert = DriftAlert(
timestamp=datetime.now(),
feature_name='predictions',
drift_type='prediction_drift',
test_name='PSI',
statistic=psi,
p_value=None,
severity=severity,
message=(
f"Prediction distribution drift detected "
f"(PSI={psi:.4f}, severity={severity})"
)
)
alerts.append(alert)
logger.warning(alert.message)
# KS test
ks_stat, ks_p = self.compute_ks_test(
self.reference_predictions, current_predictions
)
if ks_p < self.ks_p_value_threshold:
alert = DriftAlert(
timestamp=datetime.now(),
feature_name='predictions',
drift_type='prediction_drift',
test_name='KS',
statistic=ks_stat,
p_value=ks_p,
severity='medium' if ks_p < 0.01 else 'low',
message=(
f"KS test detected prediction drift "
f"(stat={ks_stat:.4f}, p={ks_p:.6f})"
)
)
alerts.append(alert)
self.alerts.extend(alerts)
return alerts
def check_feature_drift(
self, current_features: np.ndarray
) -> List[DriftAlert]:
"""Check if feature distributions have drifted."""
alerts = []
if self.reference_features is None:
logger.warning("No reference features set")
return alerts
n_features = min(
self.reference_features.shape[1],
current_features.shape[1]
)
for i in range(n_features):
fname = (self.feature_names[i]
if i < len(self.feature_names)
else f"feature_{i}")
ref_col = self.reference_features[:, i]
cur_col = current_features[:, i]
# PSI
psi = self.compute_psi(ref_col, cur_col)
if psi >= self.psi_threshold_moderate:
severity = (
'high' if psi >= self.psi_threshold_high
else 'medium'
)
alert = DriftAlert(
timestamp=datetime.now(),
feature_name=fname,
drift_type='data_drift',
test_name='PSI',
statistic=psi,
p_value=None,
severity=severity,
message=(
f"Feature '{fname}' drift detected "
f"(PSI={psi:.4f})"
)
)
alerts.append(alert)
logger.warning(alert.message)
# KS test
ks_stat, ks_p = self.compute_ks_test(ref_col, cur_col)
if ks_p < self.ks_p_value_threshold:
alert = DriftAlert(
timestamp=datetime.now(),
feature_name=fname,
drift_type='data_drift',
test_name='KS',
statistic=ks_stat,
p_value=ks_p,
severity='medium',
message=(
f"KS test: feature '{fname}' drift "
f"(stat={ks_stat:.4f}, p={ks_p:.6f})"
)
)
alerts.append(alert)
self.alerts.extend(alerts)
return alerts
def check_concept_drift(
self,
predictions: np.ndarray,
actuals: np.ndarray,
reference_brier: float,
threshold_factor: float = 1.5
) -> List[DriftAlert]:
"""
Check for concept drift by monitoring prediction quality.
Concept drift is detected when model performance degrades
significantly.
"""
alerts = []
current_brier = np.mean((predictions - actuals) ** 2)
if current_brier > reference_brier * threshold_factor:
severity = (
'high'
if current_brier > reference_brier * 2.0
else 'medium'
)
alert = DriftAlert(
timestamp=datetime.now(),
feature_name='model_performance',
drift_type='concept_drift',
test_name='brier_degradation',
statistic=current_brier,
p_value=None,
severity=severity,
message=(
f"Concept drift: Brier score degraded from "
f"{reference_brier:.4f} to {current_brier:.4f} "
f"({current_brier/reference_brier:.1f}x)"
)
)
alerts.append(alert)
logger.warning(alert.message)
self.alerts.extend(alerts)
return alerts
def get_alerts_summary(self) -> Dict:
"""Get a summary of all alerts."""
if not self.alerts:
return {"status": "healthy", "alerts": []}
high = [a for a in self.alerts if a.severity == 'high']
medium = [a for a in self.alerts if a.severity == 'medium']
low = [a for a in self.alerts if a.severity == 'low']
status = (
'critical' if high
else 'warning' if medium
else 'info'
)
return {
"status": status,
"total_alerts": len(self.alerts),
"high": len(high),
"medium": len(medium),
"low": len(low),
"alerts": [
{
"feature": a.feature_name,
"type": a.drift_type,
"severity": a.severity,
"message": a.message,
}
for a in self.alerts
]
}
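A brief usage sketch of the monitor, assuming validation-time predictions and features are available as a reference and that live_predictions, live_features, and the resolved-outcome arrays come from recent traffic:
monitor = DriftMonitor(
    feature_names=["poll_average", "market_price", "volume_24h"]
)
monitor.set_reference(
    predictions=val_predictions,   # e.g. predict_proba on the validation set
    features=X_val_array,          # corresponding feature matrix (2D array)
)

# Later, on a batch of live traffic:
monitor.check_prediction_drift(live_predictions)
monitor.check_feature_drift(live_features)
monitor.check_concept_drift(
    predictions=resolved_predictions,  # predictions whose markets have resolved
    actuals=resolved_outcomes,
    reference_brier=0.18,
)
print(monitor.get_alerts_summary())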
27.7.5 Monitoring Dashboard Metrics
In practice, you would push these metrics to a monitoring system. Here is how to structure the data for a monitoring dashboard:
class ModelMonitoringDashboard:
"""Collect and export metrics for a monitoring dashboard."""
def __init__(self, model_name: str):
self.model_name = model_name
self.metrics_history = []
def record_batch(self, predictions: np.ndarray,
features: np.ndarray,
actuals: np.ndarray = None):
"""Record metrics for a batch of predictions."""
metrics = {
'timestamp': datetime.now().isoformat(),
'model_name': self.model_name,
'batch_size': len(predictions),
'pred_mean': float(np.mean(predictions)),
'pred_std': float(np.std(predictions)),
'pred_min': float(np.min(predictions)),
'pred_max': float(np.max(predictions)),
'pred_median': float(np.median(predictions)),
'pred_q25': float(np.percentile(predictions, 25)),
'pred_q75': float(np.percentile(predictions, 75)),
}
# Feature statistics
for i in range(features.shape[1]):
col = features[:, i]
metrics[f'feature_{i}_mean'] = float(np.mean(col))
metrics[f'feature_{i}_std'] = float(np.std(col))
metrics[f'feature_{i}_null_rate'] = float(
np.isnan(col).mean()
)
# Performance metrics (when labels available)
if actuals is not None:
metrics['brier_score'] = float(
np.mean((predictions - actuals) ** 2)
)
metrics['log_loss'] = float(
-np.mean(
actuals * np.log(predictions + 1e-8)
+ (1 - actuals) * np.log(1 - predictions + 1e-8)
)
)
self.metrics_history.append(metrics)
return metrics
def get_trend(self, metric_name: str,
last_n: int = 30) -> List[float]:
"""Get trend for a specific metric."""
values = [
m[metric_name]
for m in self.metrics_history[-last_n:]
if metric_name in m
]
return values
27.8 Serving Predictions
27.8.1 Serving Architectures
There are three main approaches to serving ML predictions:
- Batch serving: Run predictions on a schedule (e.g., every hour), store results, and serve from a cache. Best for markets where latency of minutes is acceptable.
- Real-time serving: Serve predictions on-demand via a REST API or gRPC service. Required when you need sub-second predictions, such as when responding to live market movements.
- Streaming serving: Process events as they arrive (e.g., from a Kafka topic) and emit predictions. Best for high-throughput scenarios with complex event processing.
For prediction markets, real-time serving is typically required for trading, while batch serving is sufficient for portfolio rebalancing and reporting.
27.8.2 Building a FastAPI Prediction Server
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
import joblib
import numpy as np
import pandas as pd
import logging
import time
from datetime import datetime
logger = logging.getLogger(__name__)
app = FastAPI(
title="Prediction Market Model Server",
description="Real-time prediction serving for market models",
version="1.0.0"
)
class PredictionRequest(BaseModel):
"""Request schema for single prediction."""
market_id: str
features: Dict[str, float]
class Config:
json_schema_extra = {
"example": {
"market_id": "election_2024_president",
"features": {
"poll_average": 0.52,
"poll_trend": 0.01,
"market_price": 0.48,
"volume_24h": 1250000.0,
"days_to_resolution": 35
}
}
}
class BatchPredictionRequest(BaseModel):
"""Request schema for batch predictions."""
requests: List[PredictionRequest]
class PredictionResponse(BaseModel):
"""Response schema for predictions."""
market_id: str
probability: float = Field(..., ge=0.0, le=1.0)
model_version: str
timestamp: str
latency_ms: float
class BatchPredictionResponse(BaseModel):
"""Response schema for batch predictions."""
predictions: List[PredictionResponse]
total_latency_ms: float
class ModelServer:
"""Manages model loading and prediction."""
def __init__(self):
self.model = None
self.model_version = None
self.feature_names = None
self.request_count = 0
self.total_latency = 0.0
def load_model(self, model_path: str, version: str,
feature_names: List[str]):
"""Load a model from disk."""
self.model = joblib.load(model_path)
self.model_version = version
self.feature_names = feature_names
logger.info(f"Loaded model version {version}")
def predict(self, features: Dict[str, float]) -> float:
"""Make a single prediction."""
if self.model is None:
raise RuntimeError("No model loaded")
# Construct feature vector in correct order
feature_vector = [
features.get(name, 0.0)
for name in self.feature_names
]
X = np.array(feature_vector).reshape(1, -1)
# Predict
proba = self.model.predict_proba(X)[0, 1]
return float(proba)
# Global model server
model_server = ModelServer()
@app.on_event("startup")
async def startup_event():
"""Load model on startup."""
try:
model_server.load_model(
model_path="models/latest_pipeline.joblib",
version="v1.0",
feature_names=[
"poll_average", "poll_trend", "market_price",
"volume_24h", "days_to_resolution"
]
)
except FileNotFoundError:
logger.warning(
"No model found at startup. Load via /admin/load-model"
)
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
"""Make a single prediction."""
start_time = time.time()
try:
probability = model_server.predict(request.features)
except RuntimeError as e:
raise HTTPException(status_code=503, detail=str(e))
except Exception as e:
logger.error(f"Prediction error: {e}")
raise HTTPException(status_code=500, detail=str(e))
latency_ms = (time.time() - start_time) * 1000
model_server.request_count += 1
model_server.total_latency += latency_ms
return PredictionResponse(
market_id=request.market_id,
probability=probability,
model_version=model_server.model_version or "unknown",
timestamp=datetime.now().isoformat(),
latency_ms=round(latency_ms, 2)
)
@app.post("/predict/batch",
response_model=BatchPredictionResponse)
async def predict_batch(request: BatchPredictionRequest):
"""Make batch predictions."""
start_time = time.time()
predictions = []
for req in request.requests:
pred_start = time.time()
try:
probability = model_server.predict(req.features)
except Exception as e:
logger.error(
f"Prediction error for {req.market_id}: {e}"
)
probability = 0.5 # fallback
pred_latency = (time.time() - pred_start) * 1000
predictions.append(PredictionResponse(
market_id=req.market_id,
probability=probability,
model_version=model_server.model_version or "unknown",
timestamp=datetime.now().isoformat(),
latency_ms=round(pred_latency, 2)
))
total_latency = (time.time() - start_time) * 1000
return BatchPredictionResponse(
predictions=predictions,
total_latency_ms=round(total_latency, 2)
)
@app.get("/health")
async def health():
"""Health check endpoint."""
return {
"status": "healthy" if model_server.model else "no_model",
"model_version": model_server.model_version,
"request_count": model_server.request_count,
"avg_latency_ms": (
model_server.total_latency / model_server.request_count
if model_server.request_count > 0 else 0
)
}
@app.get("/metrics")
async def metrics():
"""Prometheus-compatible metrics endpoint."""
return {
"model_version": model_server.model_version,
"total_requests": model_server.request_count,
"avg_latency_ms": (
model_server.total_latency / model_server.request_count
if model_server.request_count > 0 else 0
),
}
27.8.3 Latency Considerations for Trading
When a prediction market model drives automated trading, latency matters:
- Feature computation: 10-50ms (precomputed features from the online store)
- Model inference: 1-10ms (scikit-learn, XGBoost, or LightGBM)
- Network round-trip: 5-50ms (depends on infrastructure)
- Total budget: Aim for < 100ms end-to-end
Optimization strategies:

1. Precompute features in the online feature store rather than computing them on the fly
2. Model simplification: Fewer trees, shallower depth, fewer features
3. Model compilation: Tools like ONNX Runtime or Treelite can speed up tree-ensemble inference by 2-10x
4. Caching: Cache predictions for recently seen feature combinations (see the sketch below)
5. Batching: Group multiple prediction requests into a single batch inference call
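Of these, caching is often the cheapest win. The sketch below is a minimal illustration rather than part of the serving code above: it keys a small in-memory cache on rounded feature values, and the rounding precision and TTL are assumptions you would tune for your markets.
import time
from typing import Callable, Dict, Tuple

class PredictionCache:
    """Tiny in-memory cache keyed on rounded feature values."""

    def __init__(self, precision: int = 3, ttl_seconds: float = 30.0):
        self.precision = precision   # rounding granularity for cache keys
        self.ttl = ttl_seconds       # how long a cached prediction stays valid
        self._cache: Dict[Tuple, Tuple[float, float]] = {}

    def _key(self, features: Dict[str, float]) -> Tuple:
        # Sort by name so the key does not depend on dict insertion order
        return tuple(
            (name, round(value, self.precision))
            for name, value in sorted(features.items())
        )

    def get_or_predict(self, features: Dict[str, float],
                       predict_fn: Callable[[Dict[str, float]], float]) -> float:
        key = self._key(features)
        now = time.time()
        hit = self._cache.get(key)
        if hit is not None and now - hit[1] < self.ttl:
            return hit[0]                 # cache hit: skip model inference
        proba = predict_fn(features)      # cache miss: run the model
        self._cache[key] = (proba, now)
        return proba

# Usage with the model_server defined earlier:
# cache = PredictionCache(precision=3, ttl_seconds=30)
# probability = cache.get_or_predict(request.features, model_server.predict)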
27.8.4 Graceful Degradation
What happens when the model fails? Your serving layer must handle failures gracefully:
class ResilientModelServer:
"""Model server with fallback logic."""
def __init__(self):
self.primary_model = None
self.fallback_model = None
self.fallback_count = 0
def predict(self, features: Dict[str, float]) -> Dict:
"""Predict with fallback."""
try:
proba = self.primary_model.predict(features)
return {
"probability": proba,
"source": "primary",
}
except Exception as e:
logger.error(f"Primary model failed: {e}")
if self.fallback_model:
try:
proba = self.fallback_model.predict(features)
self.fallback_count += 1
return {
"probability": proba,
"source": "fallback",
}
except Exception as e2:
logger.error(f"Fallback model failed: {e2}")
# Ultimate fallback: return market price if available
if 'market_price' in features:
return {
"probability": features['market_price'],
"source": "market_passthrough",
}
# Last resort: return 0.5
return {
"probability": 0.5,
"source": "default",
}
27.9 CI/CD for ML
27.9.1 Why CI/CD for ML Is Different
Traditional CI/CD validates that code works correctly. ML CI/CD must also validate that:
- Models produce reasonable predictions
- Model quality meets minimum thresholds
- Feature pipelines produce expected output
- New models do not regress on key metrics
- Data schemas have not changed unexpectedly
27.9.2 The ML CI/CD Pipeline
Code Push ---> Lint/Test ---> Build ---> Train ---> Validate ---> Deploy
                   |            |          |           |            |
                   v            v          v           v            v
               Unit tests     Docker    Train on    Quality      Canary
               Type checks    image     validation  gates        deploy
               Lint checks              data        A/B test     Full
                                                    comparison   deploy
27.9.3 Testing ML Code
ML code requires several types of tests:
Unit tests: Test individual functions (feature engineering, data loading, preprocessing).
import pytest
import numpy as np
import pandas as pd
from datetime import datetime
def test_poll_market_spread_transformer():
"""Test that the spread transformer computes correctly."""
transformer = PollMarketSpreadTransformer()
X = pd.DataFrame({
'poll_average': [0.55, 0.40],
'market_price': [0.50, 0.45],
})
result = transformer.transform(X)
assert 'poll_market_spread' in result.columns
np.testing.assert_allclose(
result['poll_market_spread'].values,
[0.05, -0.05]
)
def test_pipeline_predict_shape():
"""Test that the pipeline produces correct output shape."""
pipeline = build_test_pipeline() # helper to build a small pipeline
X = create_test_data(n_samples=100)
pipeline.fit(X, np.random.randint(0, 2, 100))
predictions = pipeline.predict_proba(X)
assert predictions.shape == (100, 2)
assert np.all(predictions >= 0)
assert np.all(predictions <= 1)
def test_feature_store_point_in_time():
"""Test that the feature store respects point-in-time."""
store = PredictionMarketFeatureStore(":memory:")
store.register_feature("price", "test price")
# Ingest at two timestamps
store.ingest("m1", {"price": 0.50},
datetime(2024, 1, 1))
store.ingest("m1", {"price": 0.60},
datetime(2024, 2, 1))
# Query at Jan 15 should get the Jan 1 value
result = store.get_training_features(
["m1"], ["price"], [datetime(2024, 1, 15)]
)
assert result['price'].iloc[0] == 0.50 # NOT 0.60
Integration tests: Test that components work together (pipeline + feature store, model + serving).
def test_end_to_end_prediction():
"""Test the full prediction path."""
# 1. Ingest features
store = PredictionMarketFeatureStore(":memory:")
store.register_feature("poll_avg", "poll average")
store.register_feature("mkt_price", "market price")
store.ingest("m1", {"poll_avg": 0.55, "mkt_price": 0.50},
datetime(2024, 1, 1))
# 2. Retrieve features
features = store.get_online_features(
"m1", ["poll_avg", "mkt_price"]
)
# 3. Predict (with a simple mock model)
probability = simple_model_predict(features)
# 4. Verify output is valid
assert 0 <= probability <= 1
Model quality tests: Verify that the model meets minimum quality thresholds.
from sklearn.calibration import calibration_curve
from sklearn.metrics import brier_score_loss, roc_auc_score

def test_model_quality_gates(trained_model, validation_data):
"""Test that the model meets quality requirements."""
X_val, y_val = validation_data
y_pred = trained_model.predict_proba(X_val)[:, 1]
brier = brier_score_loss(y_val, y_pred)
auc = roc_auc_score(y_val, y_pred)
assert brier < 0.25, f"Brier score too high: {brier:.4f}"
assert auc > 0.60, f"AUC too low: {auc:.4f}"
# Check calibration
prob_true, prob_pred = calibration_curve(
y_val, y_pred, n_bins=10
)
max_cal_error = np.max(np.abs(prob_true - prob_pred))
assert max_cal_error < 0.15, (
f"Max calibration error too high: {max_cal_error:.4f}"
)
27.9.4 GitHub Actions for ML CI/CD
# .github/workflows/ml-ci-cd.yml
name: ML CI/CD Pipeline
on:
push:
branches: [main]
paths:
- 'models/**'
- 'features/**'
- 'pipelines/**'
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run unit tests
run: pytest tests/unit/ -v
- name: Run integration tests
run: pytest tests/integration/ -v
train-and-validate:
needs: test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Train model on validation data
run: python pipelines/train.py --config config/ci.yaml
- name: Run quality gates
run: python pipelines/validate.py --model-path models/latest
- name: Upload model artifact
uses: actions/upload-artifact@v4
with:
name: model
path: models/latest/
deploy:
needs: train-and-validate
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Download model artifact
uses: actions/download-artifact@v4
with:
name: model
path: models/latest/
- name: Deploy to staging
run: python pipelines/deploy.py --stage staging
- name: Run smoke tests
run: python tests/smoke/test_serving.py
- name: Promote to production
run: python pipelines/deploy.py --stage production
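The deploy job above runs tests/smoke/test_serving.py against the staging deployment. A minimal sketch of such a smoke test follows; the STAGING_URL environment variable and the example payload are assumptions, not part of the workflow file.
# tests/smoke/test_serving.py
import os
import requests

STAGING_URL = os.environ.get("STAGING_URL", "http://localhost:8000")

def test_health_endpoint():
    resp = requests.get(f"{STAGING_URL}/health", timeout=5)
    assert resp.status_code == 200
    assert resp.json()["status"] == "healthy"

def test_predict_returns_valid_probability():
    payload = {
        "market_id": "smoke_test_market",
        "features": {
            "poll_average": 0.52,
            "poll_trend": 0.01,
            "market_price": 0.48,
            "volume_24h": 1250000.0,
            "days_to_resolution": 35,
        },
    }
    resp = requests.post(f"{STAGING_URL}/predict", json=payload, timeout=5)
    assert resp.status_code == 200
    assert 0.0 <= resp.json()["probability"] <= 1.0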
27.9.5 Data Validation in CI
import pandera as pa
from pandera import Column, DataFrameSchema, Check
# Define expected schema for training data
training_data_schema = DataFrameSchema({
"poll_average": Column(
float, [
Check.ge(0), Check.le(1),
Check(lambda s: s.isna().mean() < 0.05,
error="Too many nulls in poll_average")
]
),
"market_price": Column(
float, [
Check.ge(0), Check.le(1),
]
),
"volume_24h": Column(
float, [
Check.ge(0),
]
),
"days_to_resolution": Column(
int, [
Check.ge(0), Check.le(3650),
]
),
"outcome": Column(
int, [
Check.isin([0, 1]),
]
),
})
def validate_training_data(df: pd.DataFrame) -> bool:
"""Validate training data schema and quality."""
try:
training_data_schema.validate(df, lazy=True)
return True
except pa.errors.SchemaErrors as e:
logger.error(f"Data validation failed: {e}")
return False
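To make this validation actually gate the CI pipeline, the simplest approach is a small entry point that exits non-zero on failure so the workflow step turns red. A sketch, assuming the data path and module layout shown here (both hypothetical):
# pipelines/check_data.py (hypothetical CI entry point)
import sys
import pandas as pd

if __name__ == "__main__":
    # Path is an assumption; point this at whatever snapshot CI trains on
    df = pd.read_parquet("data/training_data.parquet")
    if not validate_training_data(df):   # assumes the function above is importable
        sys.exit(1)                      # non-zero exit fails the CI step
    print(f"Training data OK: {len(df)} rows")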
27.10 Reproducibility and Governance
27.10.1 The Reproducibility Challenge
Reproducibility in ML means that given the same data, code, and configuration, you get the same model and predictions. This is harder than it sounds:
- Random seeds must be fixed at every level (NumPy, Python, algorithm-specific)
- Library versions must be pinned (different scikit-learn versions may produce different results)
- Data must be versioned (the same query may return different results on different days)
- Hardware can affect results (floating-point arithmetic differs between CPU architectures)
27.10.2 Strategies for Reproducibility
1. Pin all dependencies:
# requirements.txt
scikit-learn==1.4.0
xgboost==2.0.3
mlflow==2.10.0
pandas==2.2.0
numpy==1.26.3
2. Version your data:
import hashlib
import json
def compute_data_hash(df: pd.DataFrame) -> str:
"""Compute a deterministic hash of a DataFrame."""
# Convert to a canonical string representation
data_str = df.to_csv(index=False)
return hashlib.sha256(data_str.encode()).hexdigest()[:16]
def log_data_version(df: pd.DataFrame, name: str):
"""Log data version to MLflow."""
data_hash = compute_data_hash(df)
mlflow.log_param(f"data_{name}_hash", data_hash)
mlflow.log_param(f"data_{name}_rows", len(df))
mlflow.log_param(f"data_{name}_cols", len(df.columns))
3. Set random seeds everywhere:
import random
import numpy as np
def set_all_seeds(seed: int = 42):
"""Set random seeds for reproducibility."""
random.seed(seed)
np.random.seed(seed)
# If using PyTorch
try:
import torch
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
except ImportError:
pass
4. Log the full environment:
import platform
import sys
def log_environment():
"""Log the full environment to MLflow for reproducibility."""
mlflow.log_param("python_version", sys.version)
mlflow.log_param("platform", platform.platform())
mlflow.log_param("machine", platform.machine())
# Log installed packages
import pkg_resources
packages = {
p.project_name: p.version
for p in pkg_resources.working_set
}
with open("environment.json", "w") as f:
json.dump(packages, f, indent=2)
mlflow.log_artifact("environment.json")
27.10.3 Model Lineage and Audit Trails
For regulatory compliance and debugging, you need to trace:

- Which data was used to train each model
- Which features were included
- What preprocessing was applied
- Which model version produced a given prediction
- When the prediction was made and what it was
import json
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

@dataclass
class ModelLineage:
"""Complete lineage record for a model version."""
model_name: str
model_version: str
training_data_hash: str
training_data_date_range: tuple
feature_list: List[str]
hyperparameters: dict
training_metrics: dict
validation_metrics: dict
training_timestamp: str
code_commit: str
environment_hash: str
parent_model_version: Optional[str] = None
def to_dict(self) -> dict:
return {
'model_name': self.model_name,
'model_version': self.model_version,
'training_data_hash': self.training_data_hash,
'training_data_date_range': self.training_data_date_range,
'feature_list': self.feature_list,
'hyperparameters': self.hyperparameters,
'training_metrics': self.training_metrics,
'validation_metrics': self.validation_metrics,
'training_timestamp': self.training_timestamp,
'code_commit': self.code_commit,
'environment_hash': self.environment_hash,
'parent_model_version': self.parent_model_version,
}
def save(self, path: str):
with open(path, 'w') as f:
json.dump(self.to_dict(), f, indent=2, default=str)
class PredictionAuditLog:
"""Audit log for predictions."""
def __init__(self, log_path: str = "audit_log.jsonl"):
self.log_path = log_path
def log_prediction(self, market_id: str, model_version: str,
features: dict, prediction: float,
timestamp: datetime = None):
"""Log a prediction for audit purposes."""
record = {
'timestamp': (timestamp or datetime.now()).isoformat(),
'market_id': market_id,
'model_version': model_version,
'features': features,
'prediction': prediction,
}
with open(self.log_path, 'a') as f:
f.write(json.dumps(record) + '\n')
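Wiring the audit log into the serving layer is then a one-line call per prediction. A sketch, assuming the model_server from Section 27.8 and a module-level audit_log instance (the wrapper name is hypothetical):
audit_log = PredictionAuditLog("audit_log.jsonl")

def predict_and_audit(market_id: str, features: dict) -> float:
    """Predict, then append an audit record before returning."""
    probability = model_server.predict(features)
    audit_log.log_prediction(
        market_id=market_id,
        model_version=model_server.model_version or "unknown",
        features=features,
        prediction=probability,
    )
    return probability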
27.10.4 Compliance Documentation
For prediction markets subject to financial regulation, you may need:
- Model documentation: Description of the model, its intended use, training methodology, and known limitations (a sketch for generating a stub from the lineage record follows this list).
- Validation report: Independent assessment of model quality, including out-of-sample testing and stress testing.
- Change log: Record of all changes to the model, with justification and approval.
- Performance reports: Regular reports on model performance in production.
- Incident records: Documentation of any model failures, their impact, and remediation.
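Much of the model documentation can be generated mechanically from the ModelLineage record above rather than written from scratch for each release. A sketch; the section headings are illustrative, not a regulatory template:
def render_model_documentation(lineage: ModelLineage,
                               intended_use: str,
                               known_limitations: str) -> str:
    """Render a Markdown documentation stub from a lineage record."""
    lines = [
        f"# Model Documentation: {lineage.model_name} {lineage.model_version}",
        "",
        "## Intended Use",
        intended_use,
        "",
        "## Training Methodology",
        f"- Trained at: {lineage.training_timestamp}",
        f"- Training data hash: {lineage.training_data_hash}",
        f"- Data date range: {lineage.training_data_date_range}",
        f"- Features: {', '.join(lineage.feature_list)}",
        f"- Hyperparameters: {lineage.hyperparameters}",
        f"- Code commit: {lineage.code_commit}",
        "",
        "## Validation Results",
        f"- Metrics: {lineage.validation_metrics}",
        "",
        "## Known Limitations",
        known_limitations,
    ]
    return "\n".join(lines)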
27.11 Putting It All Together: Production ML Architecture
27.11.1 Complete Architecture Diagram
Here is the complete architecture for a production prediction market ML system:
+------------------------------------------------------------------+
|                           DATA SOURCES                            |
|   [Market APIs]  [Polling Data]  [News Feeds]  [Economic Data]    |
+------------------------------------------------------------------+
          |                      |                      |
          v                      v                      v
+------------------------------------------------------------------+
|                           DATA PIPELINE                           |
|    Ingestion -> Cleaning -> Validation -> Feature Computation     |
|                  (Airflow / Prefect / Dagster)                    |
+------------------------------------------------------------------+
              |                                  |
              v                                  v
+---------------------------+      +---------------------------+
|   OFFLINE FEATURE STORE   |      |   ONLINE FEATURE STORE    |
|  (Parquet / SQLite / DW)  |      |    (Redis / DynamoDB)     |
|    Historical features    |      |   Latest feature values   |
|    Point-in-time joins    |      |     Low-latency reads     |
+---------------------------+      +---------------------------+
              |                                  |
              v                                  |
+---------------------------+                    |
|     TRAINING PIPELINE     |                    |
|  1. Fetch training data   |                    |
|  2. Build sklearn pipeline|                    |
|  3. Train model           |                    |
|  4. Validate (gates)      |                    |
|  5. Log to MLflow         |                    |
|  (Scheduled + Triggered)  |                    |
+---------------------------+                    |
              |                                  |
              v                                  |
+---------------------------+                    |
|          MLFLOW           |                    |
|    Experiment Tracking    |                    |
|      Model Registry       |                    |
|      Artifact Store       |                    |
| [None]->[Staging]->[Prod] |                    |
+---------------------------+                    |
              |                                  |
              v                                  v
+------------------------------------------------------------------+
|                           MODEL SERVING                           |
|                          FastAPI Server                           |
|                     Load model from registry                      |
|                       Fetch online features                       |
|             Return predictions with < 100ms latency               |
|                    Fallback logic for failures                    |
+------------------------------------------------------------------+
              |                                  |
              v                                  v
+---------------------------+      +--------------------------------+
|      TRADING SYSTEM       |      |        MODEL MONITORING        |
|   Receives predictions    |      |  Prediction drift (PSI, KS)    |
|      Executes trades      |      |  Feature drift                 |
|     Manages positions     |      |  Concept drift (Brier score)   |
|      Risk management      |      |  Alerts -> Trigger retraining  |
+---------------------------+      +--------------------------------+
                                                  |
                                                  v
                                        +------------------+
                                        |  CI/CD PIPELINE  |
                                        |      Tests       |
                                        |  Quality gates   |
                                        |   Auto-deploy    |
                                        +------------------+
27.11.2 Data Flow
The data flows through the system in three main paths:
Training path (batch, periodic):

1. Data pipeline ingests raw data from market APIs, polling aggregators, and news feeds
2. Feature computation transforms raw data into features, storing them in the offline feature store with timestamps
3. Training pipeline fetches features with point-in-time correctness, trains a new model, and validates it
4. If validation passes, the model is registered in MLflow and optionally promoted to production

Serving path (real-time):

1. Client sends a prediction request with a market ID
2. Server fetches the latest features from the online feature store
3. Model produces a probability estimate
4. Response is returned with the prediction, model version, and latency

Monitoring path (continuous):

1. All predictions are logged with their features and model version
2. When outcomes are known, they are matched with predictions
3. Drift detection runs on a schedule, comparing current distributions to the reference
4. If drift is detected, alerts are raised and retraining may be triggered
27.11.3 Handling Market Events
Prediction markets have a unique lifecycle that the system must handle:
- Market creation: A new market appears. Features are sparse initially.
- Active trading: The market has sufficient data for meaningful predictions.
- Late stage: The event is imminent. Predictions should converge toward certainty.
- Resolution: The outcome is known. Update training data, evaluate past predictions.
The system should detect which phase a market is in and adjust behavior accordingly (a minimal phase-detection sketch follows):

- In early stages, rely more on prior information and be conservative
- In active trading, use the full model
- In late stages, increase retraining frequency
- At resolution, evaluate and log performance for future training
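One way to make the phase explicit in code is a small rule over readily available market metadata. The thresholds below are illustrative assumptions, not recommendations:
def market_phase(days_to_resolution: float,
                 volume_24h: float,
                 is_resolved: bool) -> str:
    """Rough lifecycle phase of a market (thresholds are illustrative)."""
    if is_resolved:
        return "resolution"
    if days_to_resolution <= 3:
        return "late_stage"
    if volume_24h < 10_000:      # thin, newly created markets: features are sparse
        return "market_creation"
    return "active_trading"

# Downstream logic can branch on the phase, e.g. shrink predictions toward
# the market price in "market_creation" and retrain more often in "late_stage".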
27.11.4 Scaling Considerations
As the number of markets and prediction frequency grows:
| Scale | Markets | Predictions/sec | Architecture |
|---|---|---|---|
| Small | < 100 | < 10 | Single server, SQLite features |
| Medium | 100-1000 | 10-100 | Redis features, load-balanced API |
| Large | 1000+ | 100+ | Distributed features, Kubernetes, horizontal scaling |
For most individual traders and small teams, the "small" architecture is more than sufficient. Do not over-engineer. Start simple and scale when you have evidence that you need to.
27.12 Chapter Summary
This chapter covered the full MLOps lifecycle for prediction market models:
- scikit-learn Pipelines: Encapsulate preprocessing and modeling into a single, serializable object. Use ColumnTransformer for heterogeneous data and custom transformers for domain-specific features.
- Feature Stores: Centralize feature management with offline stores for training (point-in-time correctness) and online stores for serving (low-latency lookups).
- Experiment Tracking: Use MLflow to log every training run with parameters, metrics, and artifacts. Compare runs systematically.
- Model Versioning: Manage the model lifecycle with staging, production, and archived states. Validate before promoting. Support rollback.
- Automated Training: Schedule retraining and/or trigger it based on drift detection or new data. Validate through automated quality gates.
- Model Monitoring: Detect data drift (PSI, KS test), concept drift (performance degradation), and prediction drift. Raise alerts and trigger remediation.
- Model Serving: Deploy behind FastAPI with health checks, batch support, latency tracking, and graceful degradation.
- CI/CD for ML: Test ML code (unit, integration, and model quality tests), validate data schemas, and automate deployment with quality gates.
- Reproducibility and Governance: Pin dependencies, version data, set seeds, maintain audit trails, and document model lineage.
This completes Part IV: Data Science & Modeling. You now have the tools to build, deploy, and maintain production-grade ML systems for prediction markets.
The key insight of this chapter is that building a good model is necessary but not sufficient. The model must be embedded in a system that keeps it fresh, monitors its health, and responds automatically when conditions change. Prediction markets are dynamic environments where yesterday's model is, almost by definition, less useful than today's.
What's Next
Part V: Market Design & Mechanism Engineering shifts focus from modeling to designing and building prediction markets themselves. In the chapters ahead, we will explore:
- Chapter 28: Principles of prediction market design --- question design, resolution criteria, and wording pitfalls
- Chapter 29: Liquidity provision and market making --- subsidized market making, adverse selection modeling, and incentive design
- Chapter 30: Combinatorial prediction markets --- handling the combinatorial explosion with LMSR extensions and approximations
- Chapter 31: Decision markets and futarchy --- conditional markets, causal inference, and corporate experiments
- Chapter 32: Building a platform from scratch --- FastAPI backend, order book engine, LMSR AMM, and REST API
With a production ML system in place (this chapter), you will be able to apply your models to the platforms and mechanisms you learn about in Part V, bridging the gap between data science and market engineering.