Chapter 31: The Complete ML Betting Pipeline
Building a machine learning model that predicts sports outcomes is only the first step. The real engineering challenge lies in constructing a complete, production-ready pipeline that ingests data, computes features, trains models, serves predictions, executes bets, manages risk, and monitors everything continuously. This chapter bridges the gap between a Jupyter notebook prototype and a robust, automated betting system.
We will walk through every layer of the pipeline---from system architecture to monitoring dashboards---with practical Python code throughout. By the end, you will have a blueprint for building a system that runs autonomously, makes disciplined betting decisions, and alerts you when something goes wrong.
31.1 System Architecture and Design
The End-to-End Architecture
A production ML betting pipeline is a chain of interconnected components, each responsible for a specific function. Before writing any code, you need to understand how data flows through the entire system. Here is a text-based architecture diagram:
┌─────────────────────────────────────────────────────────────────────┐
│ ML BETTING PIPELINE │
│ │
│ ┌──────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Data │──>│ Feature │──>│ Model │ │
│ │ Ingestion│ │ Store │ │ Training │ │
│ └──────────┘ └──────────────┘ └──────┬───────┘ │
│ │ │ │
│ │ ┌──────────────────────┘ │
│ │ v │
│ │ ┌──────────────┐ ┌──────────────┐ │
│ │ │ Model │──>│ Prediction │ │
│ │ │ Registry │ │ Service │ │
│ │ └──────────────┘ └──────┬───────┘ │
│ │ │ │
│ │ v │
│ │ ┌──────────────┐ ┌──────────────┐ │
│ │ │ Risk & │<──│ Bet │ │
│ │ │ Portfolio │ │ Execution │ │
│ │ └──────────────┘ └──────────────┘ │
│ │ │
│ v │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Monitoring, Logging, Alerting │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
The data flows from left to right and top to bottom. Raw data enters through ingestion, gets transformed into features, feeds into model training or prediction, and ultimately drives bet execution. Monitoring wraps around the entire system, observing every component.
Microservices vs. Monolith
One of the first architectural decisions is whether to build a monolithic application or a collection of microservices.
Monolithic Architecture:
In a monolith, all components---ingestion, feature computation, model training, prediction, and bet execution---live in a single codebase and run as a single process (or a small set of scheduled scripts). This is the right starting point for most individual bettors and small teams.
Advantages:
- Simpler deployment and debugging
- No inter-service communication overhead
- Easier to reason about data flow
- Lower infrastructure cost

Disadvantages:
- Harder to scale individual components
- A failure in one component can crash the entire system
- Longer deployment cycles as the codebase grows
Microservices Architecture:
In a microservices approach, each component is an independent service with its own deployment, scaling, and failure isolation. Services communicate via REST APIs, message queues (like RabbitMQ or Redis Streams), or event buses.
Advantages:
- Independent scaling (e.g., scale prediction service separately from ingestion)
- Fault isolation
- Technology diversity (use different languages/frameworks per service)
- Independent deployment cycles

Disadvantages:
- Significant operational complexity
- Network latency between services
- Distributed system challenges (consistency, partial failures)
- Requires container orchestration (Docker, Kubernetes)
Recommendation: Start with a well-structured monolith. Use clear module boundaries so you can extract services later if needed. Most sports betting operations do not need microservices until they are processing thousands of bets per day across multiple sports and markets simultaneously.
Data Flow and Scheduling
The pipeline operates on two timescales:
- Batch processing (hourly/daily): Data ingestion, feature computation, model retraining. Typically orchestrated with a scheduler.
- Real-time processing (seconds/minutes): Prediction serving, bet execution, odds monitoring. Runs as long-lived services.
For scheduling, you have several options:
- cron (Unix) or Task Scheduler (Windows): Simple, built-in, no dependencies.
- APScheduler (Python library): In-process scheduling with cron-like syntax.
- Apache Airflow: Full-featured workflow orchestration with DAGs, retries, and monitoring.
- Prefect / Dagster: Modern alternatives to Airflow with better developer experience.
For a betting pipeline, APScheduler or a simple cron-based approach is usually sufficient. Airflow becomes valuable when you have complex dependency chains between tasks (e.g., "train model only after all features are computed for the day").
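To make this concrete, here is a minimal scheduling sketch using APScheduler. The job functions are placeholders for your own pipeline entry points, and the specific times are illustrative.
# scripts/schedule_jobs.py -- minimal APScheduler sketch (times illustrative)
from apscheduler.schedulers.blocking import BlockingScheduler

def ingest_data():
    ...  # pull raw data from APIs and odds feeds

def compute_features():
    ...  # run feature transformers, write to the feature store

def retrain_model():
    ...  # run the training pipeline

scheduler = BlockingScheduler()
scheduler.add_job(ingest_data, "cron", minute=5)        # hourly, at :05
scheduler.add_job(compute_features, "cron", minute=20)  # hourly, after ingestion
scheduler.add_job(retrain_model, "cron", hour=4)        # daily, early morning
scheduler.start()  # blocks; run as its own process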
Python Project Structure
A well-organized project structure makes the codebase maintainable as it grows. Here is the recommended layout:
betting_pipeline/
├── config/
│ ├── __init__.py
│ ├── settings.py # Global configuration
│ └── logging_config.py # Logging setup
├── ingestion/
│ ├── __init__.py
│ ├── scrapers.py # Data source scrapers
│ ├── api_clients.py # API wrappers
│ └── raw_storage.py # Raw data persistence
├── features/
│ ├── __init__.py
│ ├── feature_store.py # Feature store implementation
│ ├── transformers.py # Feature computation logic
│ └── validators.py # Data quality checks
├── models/
│ ├── __init__.py
│ ├── training.py # Training pipeline
│ ├── registry.py # Model versioning/registry
│ ├── serving.py # Prediction API
│ └── evaluation.py # Model evaluation utilities
├── execution/
│ ├── __init__.py
│ ├── bet_sizing.py # Kelly criterion, position sizing
│ ├── bet_placer.py # Bet placement automation
│ └── risk_manager.py # Risk limits and controls
├── monitoring/
│ ├── __init__.py
│ ├── metrics.py # Performance tracking
│ ├── drift_detection.py # Data/model drift
│ └── alerts.py # Alerting system
├── database/
│ ├── __init__.py
│ ├── models.py # SQLAlchemy ORM models
│ └── connection.py # Database connection management
├── tests/
│ ├── test_features.py
│ ├── test_models.py
│ ├── test_execution.py
│ └── test_monitoring.py
├── scripts/
│ ├── run_pipeline.py # Main entry point
│ ├── retrain_model.py # Manual retraining script
│ └── backfill_features.py # Historical feature computation
├── requirements.txt
├── setup.py
└── README.md
Here is a foundational configuration module:
# config/settings.py
import os
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from pathlib import Path
@dataclass
class DatabaseConfig:
host: str = "localhost"
port: int = 5432
name: str = "betting_pipeline"
user: str = "pipeline"
password: str = ""
@property
def connection_string(self) -> str:
return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.name}"
@dataclass
class ModelConfig:
model_dir: Path = Path("models/artifacts")
retrain_interval_hours: int = 24
min_training_samples: int = 500
validation_split: float = 0.2
random_seed: int = 42
@dataclass
class ExecutionConfig:
max_bet_size: float = 100.0 # Maximum dollars per bet
daily_loss_limit: float = 500.0 # Stop betting after this daily loss
min_edge_threshold: float = 0.02 # Minimum 2% expected edge
kelly_fraction: float = 0.25 # Quarter Kelly
max_concurrent_bets: int = 20
@dataclass
class PipelineConfig:
database: DatabaseConfig = field(default_factory=DatabaseConfig)
model: ModelConfig = field(default_factory=ModelConfig)
execution: ExecutionConfig = field(default_factory=ExecutionConfig)
log_level: str = "INFO"
environment: str = "development" # development, staging, production
@classmethod
def from_env(cls) -> "PipelineConfig":
"""Load configuration from environment variables."""
db_config = DatabaseConfig(
host=os.getenv("DB_HOST", "localhost"),
port=int(os.getenv("DB_PORT", "5432")),
name=os.getenv("DB_NAME", "betting_pipeline"),
user=os.getenv("DB_USER", "pipeline"),
password=os.getenv("DB_PASSWORD", ""),
)
exec_config = ExecutionConfig(
max_bet_size=float(os.getenv("MAX_BET_SIZE", "100.0")),
daily_loss_limit=float(os.getenv("DAILY_LOSS_LIMIT", "500.0")),
min_edge_threshold=float(os.getenv("MIN_EDGE", "0.02")),
kelly_fraction=float(os.getenv("KELLY_FRACTION", "0.25")),
)
return cls(
database=db_config,
execution=exec_config,
environment=os.getenv("ENVIRONMENT", "development"),
)
This configuration-as-code approach ensures that every parameter is documented, typed, and easily overridden via environment variables for different deployment environments.
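At startup, the entry point loads the configuration once and passes it down. A typical usage, assuming the config package from the layout above:
# scripts/run_pipeline.py (excerpt)
from config.settings import PipelineConfig

config = PipelineConfig.from_env()
print(config.environment)                  # "development" unless overridden
print(config.database.connection_string)   # postgresql://pipeline:...@localhost:5432/betting_pipeline
print(config.execution.kelly_fraction)     # 0.25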
31.2 Data Ingestion and Feature Store
Automated Data Pulling
The ingestion layer is responsible for pulling raw data from external sources---sports APIs, odds feeds, injury reports, weather services---and storing it in a normalized format. Reliability is paramount: data sources go down, APIs change formats, rate limits get hit. Your ingestion code must handle all of this gracefully.
# ingestion/api_clients.py
import time
import logging
import requests
from typing import Dict, Any, Optional, List
from datetime import datetime, timedelta
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class APIResponse:
data: Any
status_code: int
timestamp: datetime
source: str
class RateLimiter:
"""Token bucket rate limiter for API calls."""
def __init__(self, calls_per_minute: int = 60):
self.calls_per_minute = calls_per_minute
self.interval = 60.0 / calls_per_minute
self.last_call_time = 0.0
def wait(self):
now = time.time()
elapsed = now - self.last_call_time
if elapsed < self.interval:
time.sleep(self.interval - elapsed)
self.last_call_time = time.time()
class SportsDataClient:
"""Generic sports data API client with retry logic."""
def __init__(self, base_url: str, api_key: str,
calls_per_minute: int = 30):
self.base_url = base_url.rstrip("/")
self.api_key = api_key
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {api_key}",
"Accept": "application/json",
})
self.rate_limiter = RateLimiter(calls_per_minute)
self.max_retries = 3
self.retry_delay = 5.0 # seconds
def _request(self, endpoint: str, params: Optional[Dict] = None
) -> APIResponse:
url = f"{self.base_url}/{endpoint}"
for attempt in range(self.max_retries):
try:
self.rate_limiter.wait()
response = self.session.get(url, params=params, timeout=30)
if response.status_code == 200:
return APIResponse(
data=response.json(),
status_code=200,
timestamp=datetime.utcnow(),
source=endpoint,
)
elif response.status_code == 429: # Rate limited
wait_time = self.retry_delay * (2 ** attempt)
logger.warning(
f"Rate limited on {endpoint}. "
f"Waiting {wait_time}s (attempt {attempt + 1})"
)
time.sleep(wait_time)
elif response.status_code >= 500:
logger.warning(
f"Server error {response.status_code} on {endpoint}. "
f"Retrying (attempt {attempt + 1})"
)
time.sleep(self.retry_delay)
else:
logger.error(
f"API error {response.status_code}: "
f"{response.text[:200]}"
)
return APIResponse(
data=None,
status_code=response.status_code,
timestamp=datetime.utcnow(),
source=endpoint,
)
except requests.exceptions.Timeout:
logger.warning(f"Timeout on {endpoint} (attempt {attempt + 1})")
time.sleep(self.retry_delay)
except requests.exceptions.ConnectionError:
logger.warning(
f"Connection error on {endpoint} (attempt {attempt + 1})"
)
time.sleep(self.retry_delay * 2)
logger.error(f"All retries exhausted for {endpoint}")
return APIResponse(
data=None, status_code=0,
timestamp=datetime.utcnow(), source=endpoint,
)
def get_games(self, sport: str, date: str) -> List[Dict]:
"""Fetch games for a given sport and date."""
response = self._request(f"sports/{sport}/games", {"date": date})
if response.data and "games" in response.data:
return response.data["games"]
return []
def get_odds(self, game_id: str) -> Dict:
"""Fetch current odds for a specific game."""
response = self._request(f"games/{game_id}/odds")
return response.data or {}
def get_team_stats(self, team_id: str, season: str) -> Dict:
"""Fetch team statistics for a season."""
response = self._request(
f"teams/{team_id}/stats", {"season": season}
)
return response.data or {}
class OddsClient:
"""Client specifically for odds data, supporting multiple books."""
def __init__(self, api_key: str):
self.client = SportsDataClient(
base_url="https://api.the-odds-api.com/v4",
api_key=api_key,
calls_per_minute=20,
)
def get_live_odds(self, sport: str,
markets: List[str] = None) -> List[Dict]:
"""Get live odds across multiple sportsbooks."""
if markets is None:
markets = ["h2h", "spreads", "totals"]
all_odds = []
for market in markets:
response = self.client._request(
f"sports/{sport}/odds",
params={
"regions": "us",
"markets": market,
"oddsFormat": "american",
},
)
if response.data:
all_odds.extend(response.data)
return all_odds
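A typical ingestion job wires these clients into the scheduler. The sketch below assumes an API key in an ODDS_API_KEY environment variable and uses The Odds API's sport-key naming (e.g., "basketball_nba"); the response field names are illustrative.
# Pulling live odds (sport key and response fields are illustrative)
import os

odds_client = OddsClient(api_key=os.environ["ODDS_API_KEY"])
live = odds_client.get_live_odds("basketball_nba", markets=["h2h"])
for event in live[:3]:
    print(event.get("home_team"), "vs", event.get("away_team"))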
Feature Computation Pipelines
Raw data is rarely useful for models directly. Features must be computed from raw inputs: rolling averages, head-to-head records, Elo ratings, rest days, travel distance, and hundreds of other transformations. The feature computation pipeline should be deterministic---given the same raw data, it always produces the same features.
# features/transformers.py
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
from datetime import datetime, timedelta
from abc import ABC, abstractmethod
import logging
logger = logging.getLogger(__name__)
class FeatureTransformer(ABC):
"""Base class for all feature transformers."""
@abstractmethod
def transform(self, raw_data: pd.DataFrame) -> pd.DataFrame:
pass
@abstractmethod
def feature_names(self) -> List[str]:
pass
class RollingStatsTransformer(FeatureTransformer):
"""Compute rolling statistics for team performance metrics."""
def __init__(self, windows: List[int] = None,
metrics: List[str] = None):
self.windows = windows or [5, 10, 20]
self.metrics = metrics or [
"points_scored", "points_allowed", "turnovers",
"field_goal_pct", "three_point_pct",
]
def transform(self, raw_data: pd.DataFrame) -> pd.DataFrame:
"""Compute rolling means and standard deviations.
Input must have columns: team_id, game_date, and all metrics.
Data must be sorted by game_date within each team.
"""
features = raw_data[["team_id", "game_date"]].copy()
for team_id in raw_data["team_id"].unique():
team_mask = raw_data["team_id"] == team_id
team_data = raw_data.loc[team_mask].sort_values("game_date")
for metric in self.metrics:
if metric not in team_data.columns:
continue
for window in self.windows:
# Rolling mean (shift to avoid data leakage)
col_mean = f"{metric}_rolling_mean_{window}"
features.loc[team_mask, col_mean] = (
team_data[metric]
.shift(1)
.rolling(window, min_periods=max(1, window // 2))
.mean()
)
# Rolling standard deviation
col_std = f"{metric}_rolling_std_{window}"
features.loc[team_mask, col_std] = (
team_data[metric]
.shift(1)
.rolling(window, min_periods=max(3, window // 2))
.std()
)
return features
def feature_names(self) -> List[str]:
names = []
for metric in self.metrics:
for window in self.windows:
names.append(f"{metric}_rolling_mean_{window}")
names.append(f"{metric}_rolling_std_{window}")
return names
class EloTransformer(FeatureTransformer):
"""Compute Elo ratings from game results."""
def __init__(self, k_factor: float = 20.0,
home_advantage: float = 100.0,
initial_rating: float = 1500.0):
self.k_factor = k_factor
self.home_advantage = home_advantage
self.initial_rating = initial_rating
self.ratings: Dict[str, float] = {}
def _expected_score(self, rating_a: float, rating_b: float) -> float:
return 1.0 / (1.0 + 10 ** ((rating_b - rating_a) / 400.0))
def _update_ratings(self, team_a: str, team_b: str,
score_a: float, is_home_a: bool):
ra = self.ratings.get(team_a, self.initial_rating)
rb = self.ratings.get(team_b, self.initial_rating)
ha = self.home_advantage if is_home_a else -self.home_advantage
expected_a = self._expected_score(ra + ha, rb)
self.ratings[team_a] = ra + self.k_factor * (score_a - expected_a)
self.ratings[team_b] = rb + self.k_factor * (
(1 - score_a) - (1 - expected_a)
)
def transform(self, raw_data: pd.DataFrame) -> pd.DataFrame:
"""Compute Elo ratings from game results.
Input must have: game_date, home_team_id, away_team_id,
home_score, away_score. Must be sorted by game_date.
"""
self.ratings = {}
records = []
for _, row in raw_data.sort_values("game_date").iterrows():
home = row["home_team_id"]
away = row["away_team_id"]
# Record pre-game ratings
home_elo = self.ratings.get(home, self.initial_rating)
away_elo = self.ratings.get(away, self.initial_rating)
records.append({
"game_date": row["game_date"],
"home_team_id": home,
"away_team_id": away,
"home_elo_pre": home_elo,
"away_elo_pre": away_elo,
"elo_diff": home_elo - away_elo + self.home_advantage,
})
# Update ratings with result
if row["home_score"] > row["away_score"]:
score_a = 1.0
elif row["home_score"] < row["away_score"]:
score_a = 0.0
else:
score_a = 0.5
self._update_ratings(home, away, score_a, is_home_a=True)
return pd.DataFrame(records)
def feature_names(self) -> List[str]:
return ["home_elo_pre", "away_elo_pre", "elo_diff"]
class RestDaysTransformer(FeatureTransformer):
"""Compute rest days and travel features."""
def transform(self, raw_data: pd.DataFrame) -> pd.DataFrame:
features = raw_data[["team_id", "game_date"]].copy()
features["rest_days"] = np.nan
for team_id in raw_data["team_id"].unique():
mask = raw_data["team_id"] == team_id
team_games = raw_data.loc[mask].sort_values("game_date")
dates = pd.to_datetime(team_games["game_date"])
            rest = dates.diff().dt.days
            # Assign the Series (not .values) so rows align by index
            # rather than position, even if raw_data is unsorted
            features.loc[mask, "rest_days"] = rest
# Cap rest days at 14 (anything more is basically full rest)
features["rest_days"] = features["rest_days"].clip(upper=14)
# Binary feature: back-to-back game
features["is_back_to_back"] = (features["rest_days"] == 1).astype(int)
return features
def feature_names(self) -> List[str]:
return ["rest_days", "is_back_to_back"]
Feature Versioning
Feature definitions evolve over time. A feature called "rolling_mean_10" might use a different window size or normalization method in version 2 versus version 1. If you retrain a model, you need to know exactly which version of each feature was used. Without versioning, you risk subtle bugs where a model trained on one feature definition is served with features computed under a different definition.
Key principles of feature versioning:
- Every feature transformation has a version string. When you change the logic, increment the version.
- Features are stored with their version metadata. The feature store records which transformer version produced each value.
- Models record their feature versions at training time. When serving, verify that the feature versions match.
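A minimal sketch of the third principle, assuming the model metadata carries a "feature_versions" mapping (a field you would add to the metadata written at training time):
def check_feature_versions(model_metadata: dict,
                           store_versions: dict) -> list:
    """Return mismatch descriptions; an empty list means compatible."""
    mismatches = []
    trained = model_metadata.get("feature_versions", {})  # hypothetical field
    for name, trained_version in trained.items():
        served_version = store_versions.get(name)
        if served_version != trained_version:
            mismatches.append(
                f"{name}: trained on {trained_version}, "
                f"store has {served_version}"
            )
    return mismatches

# At serving time, refuse to predict on mismatched definitions:
# if check_feature_versions(metadata, current_versions):
#     raise RuntimeError("Feature version mismatch")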
Storing Features in Databases
For a betting pipeline, a relational database (PostgreSQL) or a lightweight embedded database (SQLite) works well for feature storage. The schema should support efficient lookups by entity (team, player) and time.
# features/feature_store.py
import sqlite3
import json
import hashlib
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Any
from datetime import datetime
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
class FeatureStore:
"""Simple file-backed feature store using SQLite.
Stores computed features with versioning, timestamps, and metadata.
Supports point-in-time lookups to prevent data leakage in backtesting.
"""
def __init__(self, db_path: str = "feature_store.db"):
self.db_path = db_path
self._init_db()
def _init_db(self):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS features (
entity_type TEXT NOT NULL,
entity_id TEXT NOT NULL,
feature_name TEXT NOT NULL,
feature_value REAL,
feature_version TEXT NOT NULL,
event_date TEXT NOT NULL,
computed_at TEXT NOT NULL,
PRIMARY KEY (entity_type, entity_id, feature_name,
feature_version, event_date)
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_features_lookup
ON features (entity_type, entity_id, event_date)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS feature_metadata (
feature_name TEXT NOT NULL,
feature_version TEXT NOT NULL,
description TEXT,
transformer_class TEXT,
transformer_params TEXT,
created_at TEXT NOT NULL,
PRIMARY KEY (feature_name, feature_version)
)
""")
def register_feature(self, name: str, version: str,
description: str, transformer_class: str,
params: Dict[str, Any]):
"""Register a feature definition with metadata."""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""INSERT OR REPLACE INTO feature_metadata
(feature_name, feature_version, description,
transformer_class, transformer_params, created_at)
VALUES (?, ?, ?, ?, ?, ?)""",
(name, version, description, transformer_class,
json.dumps(params), datetime.utcnow().isoformat()),
)
logger.info(f"Registered feature {name} v{version}")
def store_features(self, entity_type: str, features_df: pd.DataFrame,
feature_version: str):
"""Store a DataFrame of features.
DataFrame must have columns: entity_id, event_date,
plus one or more feature value columns.
"""
computed_at = datetime.utcnow().isoformat()
feature_cols = [
c for c in features_df.columns
if c not in ("entity_id", "event_date")
]
rows = []
for _, row in features_df.iterrows():
for col in feature_cols:
val = row[col]
if pd.isna(val):
continue
rows.append((
entity_type,
str(row["entity_id"]),
col,
float(val),
feature_version,
str(row["event_date"]),
computed_at,
))
with sqlite3.connect(self.db_path) as conn:
conn.executemany(
"""INSERT OR REPLACE INTO features
(entity_type, entity_id, feature_name, feature_value,
feature_version, event_date, computed_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
rows,
)
logger.info(
f"Stored {len(rows)} feature values for "
f"{len(features_df)} entities"
)
def get_features(self, entity_type: str, entity_id: str,
event_date: str, feature_names: List[str] = None,
feature_version: str = None) -> Dict[str, float]:
"""Retrieve features for a specific entity and date.
Point-in-time lookup: returns features as of event_date,
never using future data.
"""
query = """
SELECT feature_name, feature_value
FROM features
WHERE entity_type = ? AND entity_id = ? AND event_date <= ?
"""
params: list = [entity_type, entity_id, event_date]
if feature_names:
placeholders = ",".join("?" * len(feature_names))
query += f" AND feature_name IN ({placeholders})"
params.extend(feature_names)
if feature_version:
query += " AND feature_version = ?"
params.append(feature_version)
        # Take the most recent value for each feature up to event_date.
        # (SQLite's bare-column-with-MAX semantics return, per group, the
        # row on which MAX(event_date) is achieved.)
        query = f"""
            SELECT feature_name, feature_value, MAX(event_date)
            FROM ({query})
            GROUP BY feature_name
        """
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(query, params)
return {row[0]: row[1] for row in cursor.fetchall()}
def get_training_matrix(self, entity_type: str,
feature_names: List[str],
start_date: str, end_date: str,
feature_version: str = None
) -> pd.DataFrame:
"""Build a feature matrix for model training.
Returns a DataFrame with entity_id, event_date, and all
requested features as columns.
"""
query = """
SELECT entity_id, event_date, feature_name, feature_value
FROM features
WHERE entity_type = ?
AND event_date BETWEEN ? AND ?
AND feature_name IN ({})
""".format(",".join("?" * len(feature_names)))
params = [entity_type, start_date, end_date] + feature_names
if feature_version:
query += " AND feature_version = ?"
params.append(feature_version)
with sqlite3.connect(self.db_path) as conn:
df = pd.read_sql_query(query, conn, params=params)
if df.empty:
return pd.DataFrame()
# Pivot from long to wide format
matrix = df.pivot_table(
index=["entity_id", "event_date"],
columns="feature_name",
values="feature_value",
).reset_index()
matrix.columns.name = None
return matrix
This feature store supports point-in-time correctness, which is critical for backtesting. When you ask for features "as of" a given date, you never get values computed from future data. This prevents the data leakage that plagues many backtesting frameworks.
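A short illustrative session (entity IDs and values are made up) shows the point-in-time behavior:
import pandas as pd

store = FeatureStore("feature_store.db")
df = pd.DataFrame({
    "entity_id": ["LAL", "LAL"],
    "event_date": ["2025-01-10", "2025-01-12"],
    "points_scored_rolling_mean_10": [112.3, 114.1],
})
store.store_features("team", df, feature_version="v1")

# As of 2025-01-11, only the value computed on 2025-01-10 is visible;
# the 2025-01-12 value lies in the future and is never returned.
feats = store.get_features("team", "LAL", "2025-01-11")
# {"points_scored_rolling_mean_10": 112.3}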
31.3 Model Training and Serving
Training Pipeline
The training pipeline orchestrates the full cycle: pulling features from the store, splitting data, training the model, evaluating it, and saving artifacts. It should be idempotent---running it twice with the same data produces the same result.
# models/training.py
import pickle
import json
import hashlib
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any
import numpy as np
import pandas as pd
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import (
log_loss, brier_score_loss, roc_auc_score, accuracy_score
)
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.calibration import CalibratedClassifierCV
import logging
logger = logging.getLogger(__name__)
class ModelTrainer:
"""Handles the complete model training lifecycle."""
def __init__(self, feature_store, model_dir: str = "models/artifacts"):
self.feature_store = feature_store
self.model_dir = Path(model_dir)
self.model_dir.mkdir(parents=True, exist_ok=True)
def prepare_data(self, sport: str, feature_names: List[str],
target_col: str, start_date: str, end_date: str
) -> Tuple[pd.DataFrame, pd.Series]:
"""Pull features and target from feature store."""
all_features = feature_names + [target_col]
matrix = self.feature_store.get_training_matrix(
entity_type=f"{sport}_game",
feature_names=all_features,
start_date=start_date,
end_date=end_date,
)
if matrix.empty:
raise ValueError(
f"No training data found between {start_date} and {end_date}"
)
# Drop rows with missing target
matrix = matrix.dropna(subset=[target_col])
# Separate features and target
X = matrix[feature_names].copy()
y = matrix[target_col].copy()
# Log data summary
logger.info(
f"Training data: {len(X)} samples, "
f"{len(feature_names)} features, "
f"target distribution: {y.mean():.3f}"
)
return X, y
def train(self, X: pd.DataFrame, y: pd.Series,
model_params: Dict[str, Any] = None,
calibrate: bool = True
) -> Tuple[Any, Dict[str, float]]:
"""Train and evaluate a model using time-series cross-validation."""
if model_params is None:
model_params = {
"n_estimators": 300,
"max_depth": 5,
"learning_rate": 0.05,
"subsample": 0.8,
"min_samples_leaf": 20,
"random_state": 42,
}
# Time-series cross-validation
tscv = TimeSeriesSplit(n_splits=5)
cv_metrics = {
"log_loss": [], "brier_score": [],
"auc": [], "accuracy": [],
}
for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
# Handle missing values
X_train = X_train.fillna(X_train.median())
X_val = X_val.fillna(X_train.median())
model = GradientBoostingClassifier(**model_params)
model.fit(X_train, y_train)
y_prob = model.predict_proba(X_val)[:, 1]
y_pred = model.predict(X_val)
cv_metrics["log_loss"].append(log_loss(y_val, y_prob))
cv_metrics["brier_score"].append(brier_score_loss(y_val, y_prob))
cv_metrics["auc"].append(roc_auc_score(y_val, y_prob))
cv_metrics["accuracy"].append(accuracy_score(y_val, y_pred))
logger.info(
f"Fold {fold + 1}: log_loss={cv_metrics['log_loss'][-1]:.4f}, "
f"AUC={cv_metrics['auc'][-1]:.4f}"
)
# Summary metrics
metrics = {
k: {"mean": np.mean(v), "std": np.std(v)}
for k, v in cv_metrics.items()
}
# Train final model on all data
X_filled = X.fillna(X.median())
final_model = GradientBoostingClassifier(**model_params)
final_model.fit(X_filled, y)
        # Calibrate probabilities. With an integer cv,
        # CalibratedClassifierCV refits clones of the estimator on each
        # fold, so the fit above matters only when calibrate=False.
        if calibrate:
            cal_model = CalibratedClassifierCV(
                final_model, cv=3, method="isotonic"
            )
            cal_model.fit(X_filled, y)
            final_model = cal_model
logger.info(
f"Training complete. CV log_loss: "
f"{metrics['log_loss']['mean']:.4f} "
f"+/- {metrics['log_loss']['std']:.4f}"
)
return final_model, metrics
def save_model(self, model: Any, metrics: Dict,
feature_names: List[str], sport: str,
model_params: Dict[str, Any] = None) -> str:
"""Save model artifact with metadata."""
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
model_id = f"{sport}_{timestamp}"
model_path = self.model_dir / model_id
model_path.mkdir(exist_ok=True)
# Save model
with open(model_path / "model.pkl", "wb") as f:
pickle.dump(model, f)
# Save metadata
metadata = {
"model_id": model_id,
"sport": sport,
"created_at": datetime.utcnow().isoformat(),
"feature_names": feature_names,
"model_params": model_params or {},
"cv_metrics": metrics,
"model_type": type(model).__name__,
}
with open(model_path / "metadata.json", "w") as f:
json.dump(metadata, f, indent=2, default=str)
logger.info(f"Model saved: {model_id}")
return model_id
Model Registry
The model registry tracks all trained models, their metrics, and which one is currently "active" (serving predictions). This enables A/B testing and easy rollback.
# models/registry.py
import json
import pickle
import sqlite3
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Any, Tuple
import logging
logger = logging.getLogger(__name__)
class ModelRegistry:
"""Tracks trained models and manages active model selection."""
def __init__(self, db_path: str = "model_registry.db",
model_dir: str = "models/artifacts"):
self.db_path = db_path
self.model_dir = Path(model_dir)
self._init_db()
def _init_db(self):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS models (
model_id TEXT PRIMARY KEY,
sport TEXT NOT NULL,
model_type TEXT NOT NULL,
feature_names TEXT NOT NULL,
cv_log_loss REAL,
cv_auc REAL,
is_active INTEGER DEFAULT 0,
created_at TEXT NOT NULL,
activated_at TEXT,
deactivated_at TEXT,
metadata TEXT
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS ab_tests (
test_id TEXT PRIMARY KEY,
model_a_id TEXT NOT NULL,
model_b_id TEXT NOT NULL,
sport TEXT NOT NULL,
traffic_split REAL DEFAULT 0.5,
start_date TEXT NOT NULL,
end_date TEXT,
status TEXT DEFAULT 'running',
results TEXT,
FOREIGN KEY (model_a_id) REFERENCES models(model_id),
FOREIGN KEY (model_b_id) REFERENCES models(model_id)
)
""")
def register(self, model_id: str, sport: str,
metrics: Dict, feature_names: List[str],
metadata: Dict = None):
"""Register a trained model in the registry."""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""INSERT INTO models
(model_id, sport, model_type, feature_names,
cv_log_loss, cv_auc, created_at, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
model_id, sport,
metadata.get("model_type", "unknown") if metadata else "unknown",
json.dumps(feature_names),
metrics.get("log_loss", {}).get("mean"),
metrics.get("auc", {}).get("mean"),
datetime.utcnow().isoformat(),
json.dumps(metadata or {}),
),
)
logger.info(f"Registered model {model_id}")
def activate(self, model_id: str, sport: str):
"""Set a model as the active model for a sport."""
with sqlite3.connect(self.db_path) as conn:
# Deactivate current active model
conn.execute(
"""UPDATE models SET is_active = 0, deactivated_at = ?
WHERE sport = ? AND is_active = 1""",
(datetime.utcnow().isoformat(), sport),
)
# Activate new model
conn.execute(
"""UPDATE models SET is_active = 1, activated_at = ?
WHERE model_id = ?""",
(datetime.utcnow().isoformat(), model_id),
)
logger.info(f"Activated model {model_id} for {sport}")
def get_active_model(self, sport: str) -> Optional[Tuple[str, Any]]:
"""Load the currently active model for a sport."""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"SELECT model_id FROM models WHERE sport = ? AND is_active = 1",
(sport,),
)
row = cursor.fetchone()
if not row:
logger.warning(f"No active model for {sport}")
return None
model_id = row[0]
model_path = self.model_dir / model_id / "model.pkl"
with open(model_path, "rb") as f:
model = pickle.load(f)
return model_id, model
def start_ab_test(self, model_a_id: str, model_b_id: str,
sport: str, traffic_split: float = 0.5) -> str:
"""Start an A/B test between two models."""
test_id = f"ab_{sport}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""INSERT INTO ab_tests
(test_id, model_a_id, model_b_id, sport,
traffic_split, start_date)
VALUES (?, ?, ?, ?, ?, ?)""",
(test_id, model_a_id, model_b_id, sport,
traffic_split, datetime.utcnow().isoformat()),
)
logger.info(
f"Started A/B test {test_id}: "
f"{model_a_id} vs {model_b_id} ({traffic_split:.0%} split)"
)
return test_id
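Putting the trainer and registry together, an end-to-end training run might look like this sketch (dates, target column, and feature names are illustrative):
trainer = ModelTrainer(feature_store=store)
X, y = trainer.prepare_data(
    sport="nba",
    feature_names=["elo_diff", "rest_days", "is_back_to_back"],
    target_col="home_win",
    start_date="2022-10-01",
    end_date="2024-06-30",
)
model, metrics = trainer.train(X, y)
model_id = trainer.save_model(
    model, metrics, feature_names=list(X.columns), sport="nba"
)

registry = ModelRegistry()
registry.register(model_id, "nba", metrics, list(X.columns),
                  metadata={"model_type": type(model).__name__})
registry.activate(model_id, "nba")  # now served by the prediction API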
Serving Predictions via API
For real-time predictions, we expose the model through a lightweight API. FastAPI is a popular choice for Python model serving thanks to its async support, automatic documentation, and type validation.
# models/serving.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Dict, List, Optional
import json
from pathlib import Path
import numpy as np
import pandas as pd
from datetime import datetime
import logging
import uvicorn
logger = logging.getLogger(__name__)
app = FastAPI(
title="Betting Model Prediction Service",
version="1.0.0",
)
# Global state (initialized at startup)
model_registry = None
feature_store = None
class PredictionRequest(BaseModel):
sport: str = Field(..., example="nba")
home_team_id: str = Field(..., example="LAL")
away_team_id: str = Field(..., example="BOS")
game_date: str = Field(..., example="2025-01-15")
additional_features: Optional[Dict[str, float]] = None
class PredictionResponse(BaseModel):
home_win_probability: float
away_win_probability: float
model_id: str
prediction_timestamp: str
feature_values: Dict[str, float]
class HealthResponse(BaseModel):
status: str
active_models: Dict[str, str]
uptime_seconds: float
start_time = datetime.utcnow()
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Check service health and list active models."""
return HealthResponse(
status="healthy",
active_models={}, # Populated from registry
uptime_seconds=(datetime.utcnow() - start_time).total_seconds(),
)
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
"""Generate a prediction for a game."""
# Load active model
result = model_registry.get_active_model(request.sport)
if result is None:
raise HTTPException(
status_code=404,
detail=f"No active model for sport: {request.sport}",
)
model_id, model = result
    # Load model metadata to get feature names
    meta_path = Path(model_registry.model_dir) / model_id / "metadata.json"
with open(meta_path) as f:
metadata = json.load(f)
feature_names = metadata["feature_names"]
# Fetch features from feature store
home_features = feature_store.get_features(
entity_type="team",
entity_id=request.home_team_id,
event_date=request.game_date,
feature_names=feature_names,
)
away_features = feature_store.get_features(
entity_type="team",
entity_id=request.away_team_id,
event_date=request.game_date,
feature_names=feature_names,
)
# Construct feature vector
feature_vector = {}
for fname in feature_names:
if fname.startswith("home_"):
base = fname.replace("home_", "")
feature_vector[fname] = home_features.get(base, np.nan)
elif fname.startswith("away_"):
base = fname.replace("away_", "")
feature_vector[fname] = away_features.get(base, np.nan)
elif fname == "elo_diff":
home_elo = home_features.get("elo_rating", 1500)
away_elo = away_features.get("elo_rating", 1500)
feature_vector[fname] = home_elo - away_elo
else:
feature_vector[fname] = home_features.get(
fname, away_features.get(fname, np.nan)
)
# Add any additional features from the request
if request.additional_features:
feature_vector.update(request.additional_features)
# Create DataFrame and predict
X = pd.DataFrame([feature_vector])[feature_names]
    X = X.fillna(0)  # Crude imputation; production should reuse training medians
home_prob = model.predict_proba(X)[0, 1]
return PredictionResponse(
home_win_probability=round(float(home_prob), 4),
away_win_probability=round(1.0 - float(home_prob), 4),
model_id=model_id,
prediction_timestamp=datetime.utcnow().isoformat(),
feature_values=feature_vector,
)
def start_server(registry, store, host="0.0.0.0", port=8000):
"""Initialize and start the prediction server."""
global model_registry, feature_store
model_registry = registry
feature_store = store
uvicorn.run(app, host=host, port=port)
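A client (the execution engine, or a quick manual check) calls the service like so:
import requests

resp = requests.post(
    "http://localhost:8000/predict",
    json={
        "sport": "nba",
        "home_team_id": "LAL",
        "away_team_id": "BOS",
        "game_date": "2025-01-15",
    },
    timeout=10,
)
resp.raise_for_status()
prediction = resp.json()
print(prediction["home_win_probability"], prediction["model_id"])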
31.4 Bet Execution and Portfolio Management
From Model Output to Bet Decision
A model probability by itself does not constitute a bet. The decision to bet requires comparing the model's probability against the market's implied probability, determining whether the edge exceeds your minimum threshold, and sizing the bet appropriately. This is where the Kelly criterion concepts from Chapters 4 and 14 become operationally real.
# execution/bet_sizing.py
import numpy as np
from typing import Dict, Optional, Tuple
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
@dataclass
class BetOpportunity:
game_id: str
market: str # "moneyline", "spread", "total"
selection: str # "home", "away", "over", "under"
model_prob: float # Model's estimated probability
market_odds: float # Decimal odds from sportsbook
sportsbook: str
sport: str
@property
def implied_prob(self) -> float:
"""Market implied probability from decimal odds."""
return 1.0 / self.market_odds
@property
def edge(self) -> float:
"""Expected edge: model prob minus implied prob."""
return self.model_prob - self.implied_prob
@property
def expected_value(self) -> float:
"""Expected value per dollar wagered."""
return self.model_prob * (self.market_odds - 1) - (1 - self.model_prob)
def american_to_decimal(american_odds: int) -> float:
"""Convert American odds to decimal odds."""
if american_odds > 0:
return 1 + american_odds / 100.0
else:
return 1 + 100.0 / abs(american_odds)
def kelly_criterion(prob: float, decimal_odds: float) -> float:
"""Full Kelly criterion bet size as fraction of bankroll.
f* = (bp - q) / b
where b = decimal_odds - 1, p = win probability, q = 1 - p
"""
b = decimal_odds - 1
q = 1 - prob
f_star = (b * prob - q) / b
return max(0.0, f_star)
class BetSizer:
"""Determines bet size using Kelly criterion with safety adjustments."""
def __init__(self, bankroll: float, kelly_fraction: float = 0.25,
max_bet_pct: float = 0.05, min_edge: float = 0.02,
max_bet_size: float = 500.0):
self.bankroll = bankroll
self.kelly_fraction = kelly_fraction
self.max_bet_pct = max_bet_pct
self.min_edge = min_edge
self.max_bet_size = max_bet_size
def size_bet(self, opportunity: BetOpportunity
) -> Optional[Dict[str, float]]:
"""Calculate bet size for an opportunity.
Returns None if the bet doesn't meet criteria.
Returns dict with sizing details if it passes.
"""
# Check minimum edge
if opportunity.edge < self.min_edge:
logger.debug(
f"Edge {opportunity.edge:.4f} below threshold "
f"{self.min_edge} for {opportunity.game_id}"
)
return None
# Check positive expected value
if opportunity.expected_value <= 0:
return None
# Full Kelly
full_kelly = kelly_criterion(
opportunity.model_prob, opportunity.market_odds
)
if full_kelly <= 0:
return None
# Fractional Kelly
fractional_kelly = full_kelly * self.kelly_fraction
# Apply maximum bet percentage of bankroll
capped_fraction = min(fractional_kelly, self.max_bet_pct)
# Convert to dollar amount
bet_amount = capped_fraction * self.bankroll
# Apply absolute maximum
bet_amount = min(bet_amount, self.max_bet_size)
# Round to nearest dollar
bet_amount = round(bet_amount, 0)
if bet_amount < 1.0:
return None
return {
"bet_amount": bet_amount,
"full_kelly_fraction": full_kelly,
"applied_fraction": capped_fraction,
"edge": opportunity.edge,
"expected_value": opportunity.expected_value,
"model_prob": opportunity.model_prob,
"implied_prob": opportunity.implied_prob,
"decimal_odds": opportunity.market_odds,
}
def update_bankroll(self, new_bankroll: float):
"""Update bankroll after wins/losses."""
old = self.bankroll
self.bankroll = new_bankroll
logger.info(f"Bankroll updated: ${old:.2f} -> ${new_bankroll:.2f}")
Bet Placement Automation
Automating actual bet placement is the most operationally sensitive component. It interacts with real money and sportsbook APIs. Safety is paramount.
# execution/bet_placer.py
import uuid
import sqlite3
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
@dataclass
class PlacedBet:
bet_id: str
game_id: str
sport: str
market: str
selection: str
sportsbook: str
odds: float
stake: float
model_prob: float
edge: float
placed_at: str
status: str = "pending" # pending, won, lost, void, cashout
pnl: float = 0.0
class BetTracker:
"""Tracks all bets placed by the system."""
def __init__(self, db_path: str = "bets.db"):
self.db_path = db_path
self._init_db()
def _init_db(self):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS bets (
bet_id TEXT PRIMARY KEY,
game_id TEXT NOT NULL,
sport TEXT NOT NULL,
market TEXT NOT NULL,
selection TEXT NOT NULL,
sportsbook TEXT NOT NULL,
odds REAL NOT NULL,
stake REAL NOT NULL,
model_prob REAL NOT NULL,
edge REAL NOT NULL,
model_id TEXT,
placed_at TEXT NOT NULL,
settled_at TEXT,
status TEXT DEFAULT 'pending',
pnl REAL DEFAULT 0.0,
notes TEXT
)
""")
def record_bet(self, bet: PlacedBet, model_id: str = None):
"""Record a placed bet in the database."""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""INSERT INTO bets
(bet_id, game_id, sport, market, selection,
sportsbook, odds, stake, model_prob, edge,
model_id, placed_at, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
bet.bet_id, bet.game_id, bet.sport, bet.market,
bet.selection, bet.sportsbook, bet.odds, bet.stake,
bet.model_prob, bet.edge, model_id, bet.placed_at,
bet.status,
),
)
logger.info(
f"Recorded bet {bet.bet_id}: {bet.selection} @ {bet.odds} "
f"for ${bet.stake:.2f}"
)
def settle_bet(self, bet_id: str, result: str):
"""Settle a bet as won, lost, or void."""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"SELECT odds, stake FROM bets WHERE bet_id = ?",
(bet_id,),
)
row = cursor.fetchone()
if not row:
logger.error(f"Bet {bet_id} not found")
return
odds, stake = row
if result == "won":
pnl = stake * (odds - 1)
elif result == "lost":
pnl = -stake
else: # void
pnl = 0.0
conn.execute(
"""UPDATE bets
SET status = ?, pnl = ?, settled_at = ?
WHERE bet_id = ?""",
(result, pnl, datetime.utcnow().isoformat(), bet_id),
)
logger.info(f"Settled bet {bet_id}: {result}, PnL: ${pnl:.2f}")
def get_daily_pnl(self, date: str = None) -> float:
"""Get total PnL for a given date."""
if date is None:
date = datetime.utcnow().strftime("%Y-%m-%d")
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"""SELECT COALESCE(SUM(pnl), 0) FROM bets
WHERE placed_at LIKE ?""",
(f"{date}%",),
)
return cursor.fetchone()[0]
def get_pending_exposure(self) -> float:
"""Get total dollars at risk in pending bets."""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"SELECT COALESCE(SUM(stake), 0) FROM bets WHERE status = 'pending'"
)
return cursor.fetchone()[0]
def get_performance_summary(self, sport: str = None,
days: int = 30) -> Dict:
"""Get performance summary over recent period."""
query = """
SELECT COUNT(*) as total_bets,
SUM(CASE WHEN status = 'won' THEN 1 ELSE 0 END) as wins,
SUM(CASE WHEN status = 'lost' THEN 1 ELSE 0 END) as losses,
SUM(pnl) as total_pnl,
SUM(stake) as total_staked,
AVG(edge) as avg_edge
FROM bets
WHERE status IN ('won', 'lost')
AND placed_at >= date('now', ?)
"""
params = [f"-{days} days"]
if sport:
query += " AND sport = ?"
params.append(sport)
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(query, params)
row = cursor.fetchone()
total, wins, losses, pnl, staked, avg_edge = row
return {
"total_bets": total or 0,
"wins": wins or 0,
"losses": losses or 0,
"win_rate": (wins / total) if total else 0,
"total_pnl": pnl or 0,
"total_staked": staked or 0,
"roi": (pnl / staked) if staked else 0,
"avg_edge": avg_edge or 0,
}
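Recording and settling a bet through the tracker looks like this (identifiers are illustrative):
from datetime import datetime

tracker = BetTracker("bets.db")
bet = PlacedBet(
    bet_id="a1b2c3d4",
    game_id="nba_20250115_LAL_BOS",
    sport="nba", market="moneyline", selection="home",
    sportsbook="book_a", odds=1.909, stake=138.0,
    model_prob=0.55, edge=0.026,
    placed_at=datetime.utcnow().isoformat(),
)
tracker.record_bet(bet, model_id="nba_20250110_060000")

# After the game settles:
tracker.settle_bet("a1b2c3d4", "won")  # PnL = 138 * 0.909 ~ +$125
print(tracker.get_performance_summary(sport="nba", days=30))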
Risk Management
Risk management is the guardrail that prevents catastrophic losses. It enforces hard limits that the system cannot override.
# execution/risk_manager.py
import sqlite3
from typing import Optional
from dataclasses import dataclass
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
@dataclass
class RiskLimits:
daily_loss_limit: float = 500.0
max_single_bet: float = 200.0
max_pending_exposure: float = 1000.0
max_bets_per_day: int = 20
max_bets_per_game: int = 2
max_correlated_exposure: float = 500.0
min_bankroll_fraction: float = 0.5 # Stop if bankroll drops below 50%
class RiskManager:
"""Enforces risk limits on all bet placement."""
def __init__(self, bet_tracker, limits: RiskLimits = None,
initial_bankroll: float = 10000.0):
self.bet_tracker = bet_tracker
self.limits = limits or RiskLimits()
self.initial_bankroll = initial_bankroll
self._halted = False
self._halt_reason = ""
def check_bet(self, stake: float, game_id: str, sport: str,
current_bankroll: float) -> tuple:
"""Check if a bet passes all risk checks.
Returns (approved: bool, reason: str).
"""
if self._halted:
return False, f"Trading halted: {self._halt_reason}"
# Check bankroll floor
bankroll_fraction = current_bankroll / self.initial_bankroll
if bankroll_fraction < self.limits.min_bankroll_fraction:
self._halt("Bankroll below minimum fraction")
return False, "Bankroll below minimum fraction - halted"
# Check single bet size
if stake > self.limits.max_single_bet:
return False, f"Stake ${stake:.2f} exceeds max ${self.limits.max_single_bet:.2f}"
# Check daily loss limit
daily_pnl = self.bet_tracker.get_daily_pnl()
if daily_pnl <= -self.limits.daily_loss_limit:
return False, f"Daily loss limit reached: ${daily_pnl:.2f}"
# Check pending exposure
pending = self.bet_tracker.get_pending_exposure()
if pending + stake > self.limits.max_pending_exposure:
return False, (
f"Pending exposure ${pending:.2f} + ${stake:.2f} "
f"exceeds limit ${self.limits.max_pending_exposure:.2f}"
)
# Check daily bet count
        today = datetime.utcnow().strftime("%Y-%m-%d")
with sqlite3.connect(self.bet_tracker.db_path) as conn:
cursor = conn.execute(
"SELECT COUNT(*) FROM bets WHERE placed_at LIKE ?",
(f"{today}%",),
)
daily_count = cursor.fetchone()[0]
if daily_count >= self.limits.max_bets_per_day:
return False, f"Daily bet limit ({self.limits.max_bets_per_day}) reached"
# Check bets per game
with sqlite3.connect(self.bet_tracker.db_path) as conn:
cursor = conn.execute(
"""SELECT COUNT(*) FROM bets
WHERE game_id = ? AND status = 'pending'""",
(game_id,),
)
game_count = cursor.fetchone()[0]
if game_count >= self.limits.max_bets_per_game:
return False, f"Max bets per game ({self.limits.max_bets_per_game}) reached"
return True, "Approved"
def _halt(self, reason: str):
"""Emergency halt all betting."""
self._halted = True
self._halt_reason = reason
logger.critical(f"BETTING HALTED: {reason}")
def resume(self):
"""Resume betting after manual review."""
logger.info(f"Betting resumed (was halted: {self._halt_reason})")
self._halted = False
self._halt_reason = ""
@property
def is_halted(self) -> bool:
return self._halted
Putting It All Together: The Execution Engine
# execution/engine.py
import uuid
from datetime import datetime
from typing import List
import logging

from execution.bet_placer import PlacedBet
logger = logging.getLogger(__name__)
class ExecutionEngine:
"""Orchestrates the bet decision and placement process."""
def __init__(self, bet_sizer, risk_manager, bet_tracker,
prediction_service, dry_run: bool = True):
self.bet_sizer = bet_sizer
self.risk_manager = risk_manager
self.bet_tracker = bet_tracker
self.prediction_service = prediction_service
self.dry_run = dry_run
def evaluate_opportunities(self, opportunities: list) -> List[dict]:
"""Evaluate a list of betting opportunities and execute valid bets."""
executed = []
for opp in opportunities:
# Size the bet
sizing = self.bet_sizer.size_bet(opp)
if sizing is None:
continue
# Risk check
approved, reason = self.risk_manager.check_bet(
stake=sizing["bet_amount"],
game_id=opp.game_id,
sport=opp.sport,
current_bankroll=self.bet_sizer.bankroll,
)
if not approved:
logger.info(
f"Bet rejected for {opp.game_id}: {reason}"
)
continue
            # Create bet record
bet = PlacedBet(
bet_id=str(uuid.uuid4())[:8],
game_id=opp.game_id,
sport=opp.sport,
market=opp.market,
selection=opp.selection,
sportsbook=opp.sportsbook,
odds=opp.market_odds,
stake=sizing["bet_amount"],
model_prob=opp.model_prob,
edge=opp.edge,
placed_at=datetime.utcnow().isoformat(),
)
if self.dry_run:
logger.info(
f"[DRY RUN] Would place bet: {bet.selection} "
f"@ {bet.odds} for ${bet.stake:.2f} "
f"(edge: {bet.edge:.4f})"
)
else:
# In production, this would call the sportsbook API
logger.info(
f"PLACING BET: {bet.selection} @ {bet.odds} "
f"for ${bet.stake:.2f}"
)
self.bet_tracker.record_bet(bet)
executed.append({
"bet": bet,
"sizing": sizing,
})
logger.info(
f"Evaluated {len(opportunities)} opportunities, "
f"executed {len(executed)} bets"
)
return executed
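Wiring the execution layer together, in dry-run mode first, might look like this sketch (the prediction service and the opportunity list come from the serving layer and your odds feed):
from execution.bet_sizing import BetSizer
from execution.bet_placer import BetTracker
from execution.risk_manager import RiskManager, RiskLimits
from execution.engine import ExecutionEngine

tracker = BetTracker("bets.db")
sizer = BetSizer(bankroll=10_000, kelly_fraction=0.25)
risk = RiskManager(tracker, RiskLimits(daily_loss_limit=500.0),
                   initial_bankroll=10_000)
engine = ExecutionEngine(sizer, risk, tracker,
                         prediction_service=None, dry_run=True)

opportunities = []  # BetOpportunity objects from model output + live odds
engine.evaluate_opportunities(opportunities)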
31.5 Monitoring, Logging, and Alerting
Tracking Model Performance Over Time
A model that worked last month may not work this month. Markets adapt, team rosters change, and the statistical relationships that your model exploits can weaken or disappear. Continuous monitoring is not optional---it is the most important part of a production system.
# monitoring/metrics.py
import sqlite3
import numpy as np
import pandas as pd
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from sklearn.metrics import brier_score_loss, log_loss
import logging
logger = logging.getLogger(__name__)
class PerformanceTracker:
"""Tracks and analyzes model and betting performance over time."""
def __init__(self, db_path: str = "monitoring.db"):
self.db_path = db_path
self._init_db()
def _init_db(self):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS predictions (
prediction_id TEXT PRIMARY KEY,
game_id TEXT NOT NULL,
sport TEXT NOT NULL,
model_id TEXT NOT NULL,
predicted_prob REAL NOT NULL,
actual_outcome INTEGER,
odds_at_prediction REAL,
predicted_at TEXT NOT NULL,
settled_at TEXT
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS daily_metrics (
date TEXT NOT NULL,
sport TEXT NOT NULL,
model_id TEXT NOT NULL,
num_predictions INTEGER,
brier_score REAL,
log_loss_value REAL,
calibration_error REAL,
total_pnl REAL,
roi REAL,
PRIMARY KEY (date, sport, model_id)
)
""")
def record_prediction(self, prediction_id: str, game_id: str,
sport: str, model_id: str,
predicted_prob: float,
odds: float = None):
"""Record a model prediction for later evaluation."""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""INSERT OR REPLACE INTO predictions
(prediction_id, game_id, sport, model_id,
predicted_prob, odds_at_prediction, predicted_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(prediction_id, game_id, sport, model_id,
predicted_prob, odds, datetime.utcnow().isoformat()),
)
def update_outcome(self, game_id: str, outcome: int):
"""Update the actual outcome for a game."""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""UPDATE predictions
SET actual_outcome = ?, settled_at = ?
WHERE game_id = ?""",
(outcome, datetime.utcnow().isoformat(), game_id),
)
def compute_daily_metrics(self, date: str = None):
"""Compute and store daily model performance metrics."""
if date is None:
date = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%d")
with sqlite3.connect(self.db_path) as conn:
df = pd.read_sql_query(
"""SELECT model_id, sport, predicted_prob, actual_outcome,
odds_at_prediction
FROM predictions
WHERE predicted_at LIKE ? AND actual_outcome IS NOT NULL""",
conn,
params=(f"{date}%",),
)
if df.empty:
logger.info(f"No settled predictions for {date}")
return
for (model_id, sport), group in df.groupby(["model_id", "sport"]):
probs = group["predicted_prob"].values
actuals = group["actual_outcome"].values
metrics = {
"num_predictions": len(group),
"brier_score": brier_score_loss(actuals, probs),
"log_loss_value": log_loss(
actuals, probs, labels=[0, 1]
),
"calibration_error": self._calibration_error(probs, actuals),
}
# Compute PnL if odds available
if group["odds_at_prediction"].notna().any():
pnl = self._compute_pnl(group)
metrics["total_pnl"] = pnl["total_pnl"]
metrics["roi"] = pnl["roi"]
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""INSERT OR REPLACE INTO daily_metrics
(date, sport, model_id, num_predictions,
brier_score, log_loss_value, calibration_error,
total_pnl, roi)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(date, sport, model_id,
metrics["num_predictions"],
metrics["brier_score"],
metrics["log_loss_value"],
metrics["calibration_error"],
metrics.get("total_pnl"),
metrics.get("roi")),
)
logger.info(
f"{date} | {sport} | {model_id}: "
f"Brier={metrics['brier_score']:.4f}, "
f"n={metrics['num_predictions']}"
)
def _calibration_error(self, probs: np.ndarray,
actuals: np.ndarray,
n_bins: int = 10) -> float:
"""Expected calibration error (ECE)."""
bin_edges = np.linspace(0, 1, n_bins + 1)
ece = 0.0
        for i in range(n_bins):
            # Inclusive upper edge on the last bin so prob == 1.0 is counted
            if i == n_bins - 1:
                mask = (probs >= bin_edges[i]) & (probs <= bin_edges[i + 1])
            else:
                mask = (probs >= bin_edges[i]) & (probs < bin_edges[i + 1])
if mask.sum() == 0:
continue
bin_acc = actuals[mask].mean()
bin_conf = probs[mask].mean()
ece += mask.sum() / len(probs) * abs(bin_acc - bin_conf)
return ece
def _compute_pnl(self, group: pd.DataFrame) -> Dict:
"""Compute PnL from predictions with odds."""
total_pnl = 0
total_staked = 0
for _, row in group.iterrows():
if pd.isna(row["odds_at_prediction"]):
continue
# Assume flat $100 stakes for tracking purposes
stake = 100
total_staked += stake
if row["actual_outcome"] == 1:
total_pnl += stake * (row["odds_at_prediction"] - 1)
else:
total_pnl -= stake
return {
"total_pnl": total_pnl,
"roi": total_pnl / total_staked if total_staked > 0 else 0,
}
def get_trend(self, sport: str, model_id: str,
metric: str = "brier_score",
days: int = 30) -> pd.DataFrame:
"""Get metric trend over time for dashboarding."""
with sqlite3.connect(self.db_path) as conn:
df = pd.read_sql_query(
f"""SELECT date, {metric}
FROM daily_metrics
WHERE sport = ? AND model_id = ?
AND date >= date('now', ?)
ORDER BY date""",
conn,
params=(sport, model_id, f"-{days} days"),
)
return df
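Day to day, the tracker is fed by the prediction service and a settlement job; the flow below is illustrative (IDs are made up, and the metrics job would normally run on the scheduler):
tracker = PerformanceTracker("monitoring.db")

# At prediction time:
tracker.record_prediction(
    prediction_id="pred_001",
    game_id="nba_20250115_LAL_BOS",
    sport="nba", model_id="nba_20250110_060000",
    predicted_prob=0.55, odds=1.909,
)

# After the game (home team won):
tracker.update_outcome("nba_20250115_LAL_BOS", outcome=1)

# Next morning, as a scheduled job:
tracker.compute_daily_metrics()
trend = tracker.get_trend("nba", "nba_20250110_060000",
                          metric="brier_score", days=30)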
Data Drift Detection
Data drift occurs when the statistical properties of input features change over time. If the distribution of features in production diverges significantly from the distribution seen during training, model predictions become unreliable.
# monitoring/drift_detection.py
import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List, Tuple
import logging
logger = logging.getLogger(__name__)
class DriftDetector:
"""Detects data drift in feature distributions."""
def __init__(self, reference_data: pd.DataFrame = None,
significance_level: float = 0.01):
self.reference_data = reference_data
self.significance_level = significance_level
self.reference_stats = {}
if reference_data is not None:
self._compute_reference_stats()
def _compute_reference_stats(self):
"""Compute summary statistics from reference (training) data."""
for col in self.reference_data.select_dtypes(include=[np.number]).columns:
data = self.reference_data[col].dropna()
self.reference_stats[col] = {
"mean": data.mean(),
"std": data.std(),
"min": data.min(),
"max": data.max(),
"q25": data.quantile(0.25),
"q50": data.quantile(0.50),
"q75": data.quantile(0.75),
"values": data.values, # Keep for KS test
}
def set_reference(self, reference_data: pd.DataFrame):
"""Set or update the reference distribution."""
self.reference_data = reference_data
self._compute_reference_stats()
def check_drift(self, current_data: pd.DataFrame
) -> Dict[str, Dict]:
"""Check each feature for distribution drift.
Uses the Kolmogorov-Smirnov test to compare distributions
and also checks for range violations.
"""
results = {}
for col in current_data.select_dtypes(include=[np.number]).columns:
if col not in self.reference_stats:
continue
current_values = current_data[col].dropna()
if len(current_values) < 10:
continue
ref = self.reference_stats[col]
# KS test
ks_stat, ks_pvalue = stats.ks_2samp(
ref["values"], current_values.values
)
# Population Stability Index (PSI)
psi = self._compute_psi(ref["values"], current_values.values)
# Mean shift (in standard deviations)
if ref["std"] > 0:
mean_shift = abs(current_values.mean() - ref["mean"]) / ref["std"]
else:
mean_shift = 0.0
# Range violation check
range_violations = (
(current_values < ref["min"]).sum()
+ (current_values > ref["max"]).sum()
)
range_violation_pct = range_violations / len(current_values)
is_drifted = (
ks_pvalue < self.significance_level
or psi > 0.2 # PSI > 0.2 indicates significant shift
or mean_shift > 3.0
)
severity = "none"
if is_drifted:
if psi > 0.25 or mean_shift > 5.0:
severity = "critical"
elif psi > 0.1 or mean_shift > 3.0:
severity = "warning"
else:
severity = "info"
results[col] = {
"is_drifted": is_drifted,
"severity": severity,
"ks_statistic": ks_stat,
"ks_pvalue": ks_pvalue,
"psi": psi,
"mean_shift_sigmas": mean_shift,
"range_violation_pct": range_violation_pct,
"current_mean": current_values.mean(),
"reference_mean": ref["mean"],
}
drifted_features = [
f for f, r in results.items() if r["is_drifted"]
]
if drifted_features:
logger.warning(
f"Drift detected in {len(drifted_features)} features: "
f"{drifted_features}"
)
return results
def _compute_psi(self, reference: np.ndarray,
current: np.ndarray,
n_bins: int = 10) -> float:
"""Compute Population Stability Index."""
# Create bins from reference data
bin_edges = np.percentile(
reference,
np.linspace(0, 100, n_bins + 1),
)
bin_edges[0] = -np.inf
bin_edges[-1] = np.inf
# Remove duplicate edges
bin_edges = np.unique(bin_edges)
ref_counts = np.histogram(reference, bins=bin_edges)[0]
cur_counts = np.histogram(current, bins=bin_edges)[0]
# Convert to proportions with smoothing
ref_pct = (ref_counts + 1) / (ref_counts.sum() + len(ref_counts))
cur_pct = (cur_counts + 1) / (cur_counts.sum() + len(cur_counts))
psi = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))
return float(psi)
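For reference, the PSI computed above follows the standard definition PSI = Σ (cur_i − ref_i) × ln(cur_i / ref_i), where ref_i and cur_i are the (smoothed) proportions of reference and current observations falling into bin i, with bins taken from the reference deciles. The conventional reading: below 0.1 is stable, 0.1 to 0.25 is a moderate shift, and above 0.25 is a major shift. A minimal usage sketch with synthetic data (the column name and distributions are illustrative):
# Illustrative DriftDetector usage with synthetic data.
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
train = pd.DataFrame({"elo_diff": rng.normal(0, 50, 5000)})
live = pd.DataFrame({"elo_diff": rng.normal(20, 50, 500)})  # mean has drifted

detector = DriftDetector(reference_data=train)
for feature, result in detector.check_drift(live).items():
    print(feature, result["severity"],
          f"PSI={result['psi']:.3f}",
          f"KS p={result['ks_pvalue']:.4f}")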
Alerting on Degradation
When monitoring detects problems, the system must notify you immediately. Alerts should be actionable and prioritized.
# monitoring/alerts.py
import json
import sqlite3
import smtplib
import requests
import pandas as pd
from email.mime.text import MIMEText
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, field
import logging
logger = logging.getLogger(__name__)
@dataclass
class Alert:
severity: str # "critical", "warning", "info"
category: str # "model_drift", "performance", "risk", "system"
title: str
message: str
metadata: Dict = field(default_factory=dict)
timestamp: str = ""
def __post_init__(self):
if not self.timestamp:
self.timestamp = datetime.utcnow().isoformat()
class AlertManager:
"""Manages alert generation and delivery."""
    def __init__(self, config: Optional[Dict] = None):
self.config = config or {}
self.alert_history: List[Alert] = []
self.suppressed_alerts: Dict[str, datetime] = {}
self.suppression_minutes = 60 # Don't repeat same alert within 1 hour
def send_alert(self, alert: Alert):
"""Send an alert through configured channels."""
# Check suppression
alert_key = f"{alert.category}:{alert.title}"
if alert_key in self.suppressed_alerts:
last_sent = self.suppressed_alerts[alert_key]
elapsed = (
datetime.utcnow() - last_sent
).total_seconds() / 60
if elapsed < self.suppression_minutes:
logger.debug(f"Alert suppressed: {alert_key}")
return
self.alert_history.append(alert)
self.suppressed_alerts[alert_key] = datetime.utcnow()
# Log the alert
log_method = {
"critical": logger.critical,
"warning": logger.warning,
"info": logger.info,
}.get(alert.severity, logger.info)
log_method(f"ALERT [{alert.severity}] {alert.title}: {alert.message}")
# Send via configured channels
if "slack_webhook" in self.config:
self._send_slack(alert)
if "email" in self.config:
self._send_email(alert)
def _send_slack(self, alert: Alert):
"""Send alert to Slack via webhook."""
emoji = {
"critical": ":rotating_light:",
"warning": ":warning:",
"info": ":information_source:",
}.get(alert.severity, "")
payload = {
"text": f"{emoji} *{alert.title}*\n{alert.message}",
"attachments": [
{
"color": {
"critical": "#FF0000",
"warning": "#FFA500",
"info": "#0000FF",
}.get(alert.severity, "#808080"),
"fields": [
{"title": k, "value": str(v), "short": True}
for k, v in alert.metadata.items()
],
}
],
}
try:
response = requests.post(
self.config["slack_webhook"],
json=payload,
timeout=10,
)
response.raise_for_status()
except Exception as e:
logger.error(f"Failed to send Slack alert: {e}")
def _send_email(self, alert: Alert):
"""Send alert via email."""
email_config = self.config["email"]
msg = MIMEText(
f"{alert.message}\n\n"
f"Category: {alert.category}\n"
f"Severity: {alert.severity}\n"
f"Time: {alert.timestamp}\n"
f"Metadata: {json.dumps(alert.metadata, indent=2)}"
)
msg["Subject"] = f"[{alert.severity.upper()}] {alert.title}"
msg["From"] = email_config["from"]
msg["To"] = email_config["to"]
try:
with smtplib.SMTP(email_config["smtp_host"],
email_config["smtp_port"]) as server:
server.starttls()
server.login(email_config["username"],
email_config["password"])
server.send_message(msg)
except Exception as e:
logger.error(f"Failed to send email alert: {e}")
class PipelineMonitor:
"""High-level monitor that ties together all monitoring components."""
def __init__(self, performance_tracker, drift_detector,
alert_manager, bet_tracker):
self.perf = performance_tracker
self.drift = drift_detector
self.alerts = alert_manager
self.bets = bet_tracker
def run_daily_checks(self):
"""Run all daily monitoring checks."""
logger.info("Running daily monitoring checks")
self._check_model_performance()
self._check_data_drift()
self._check_risk_metrics()
def _check_model_performance(self):
"""Alert on model performance degradation."""
# Compare recent Brier score to historical average
with sqlite3.connect(self.perf.db_path) as conn:
recent = pd.read_sql_query(
"""SELECT AVG(brier_score) as recent_brier
FROM daily_metrics
WHERE date >= date('now', '-7 days')""",
conn,
)
baseline = pd.read_sql_query(
"""SELECT AVG(brier_score) as baseline_brier
FROM daily_metrics
WHERE date BETWEEN date('now', '-90 days')
AND date('now', '-7 days')""",
conn,
)
if recent.empty or baseline.empty:
return
recent_brier = recent["recent_brier"].iloc[0]
baseline_brier = baseline["baseline_brier"].iloc[0]
if recent_brier and baseline_brier:
degradation = (recent_brier - baseline_brier) / baseline_brier
if degradation > 0.20: # 20% worse
self.alerts.send_alert(Alert(
severity="critical",
category="performance",
title="Model Performance Degradation",
message=(
f"Brier score increased {degradation:.1%} "
f"(baseline: {baseline_brier:.4f}, "
f"recent: {recent_brier:.4f}). "
f"Consider retraining."
),
metadata={
"baseline_brier": f"{baseline_brier:.4f}",
"recent_brier": f"{recent_brier:.4f}",
"degradation_pct": f"{degradation:.1%}",
},
))
elif degradation > 0.10: # 10% worse
self.alerts.send_alert(Alert(
severity="warning",
category="performance",
title="Model Performance Warning",
message=(
f"Brier score increased {degradation:.1%}. "
f"Monitor closely."
),
))
    def _check_data_drift(self):
        """Check for feature distribution drift."""
        # Pull recent feature rows from the feature store, run them through
        # self.drift.check_drift(), and send a "model_drift" alert for any
        # feature whose severity is "critical". The retrieval step depends
        # on your feature store implementation, so it is left abstract here.
        pass
def _check_risk_metrics(self):
"""Alert on adverse risk conditions."""
summary = self.bets.get_performance_summary(days=7)
if summary["total_bets"] > 10 and summary["roi"] < -0.15:
self.alerts.send_alert(Alert(
severity="warning",
category="risk",
title="Negative ROI Alert",
message=(
f"7-day ROI is {summary['roi']:.1%} "
f"across {summary['total_bets']} bets. "
f"Total PnL: ${summary['total_pnl']:.2f}"
),
metadata=summary,
))
pending = self.bets.get_pending_exposure()
if pending > 2000:
self.alerts.send_alert(Alert(
severity="warning",
category="risk",
title="High Pending Exposure",
message=f"Current pending exposure: ${pending:.2f}",
))
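Finally, the monitor needs to run on a schedule. The sketch below assumes `tracker`, `detector`, and `bet_tracker` are instances of the components built earlier in this chapter; in production, a cron job or workflow orchestrator is a more robust trigger than a long-running loop:
# monitoring/run_monitor.py -- scheduling sketch; the component instances
# (tracker, detector, bet_tracker) are assumed to exist already.
import time
import logging
from datetime import datetime, timedelta

logging.basicConfig(level=logging.INFO)

def run_daily(monitor, hour_utc: int = 6):
    """Run daily checks once per day at the given UTC hour."""
    while True:
        now = datetime.utcnow()
        next_run = now.replace(hour=hour_utc, minute=0,
                               second=0, microsecond=0)
        if next_run <= now:
            next_run += timedelta(days=1)
        time.sleep((next_run - now).total_seconds())
        try:
            monitor.run_daily_checks()
        except Exception:
            logging.exception("Daily monitoring checks failed")

alerts = AlertManager(config={"slack_webhook": "https://hooks.slack.com/..."})
monitor = PipelineMonitor(tracker, detector, alerts, bet_tracker)
run_daily(monitor)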
Dashboard Concepts
A monitoring dashboard provides at-a-glance visibility into the pipeline's health. While a full web dashboard (using Dash, Streamlit, or Grafana) is beyond the scope of this chapter, here are the key metrics to display:
Model Health Panel:
- Current Brier score (7-day rolling)
- Calibration plot (predicted vs. actual probability)
- Feature importance drift over time
- Prediction volume trend

Betting Performance Panel:
- Cumulative PnL curve
- Daily ROI trend
- Win rate by sport and market
- Edge distribution histogram

Risk Panel:
- Current pending exposure
- Daily loss vs. limit
- Bet count vs. limit
- Bankroll trajectory

System Health Panel:
- Data ingestion success rate
- API latency
- Feature freshness (time since last update)
- Alert history
A minimal Streamlit dashboard can be created with surprisingly little code:
# monitoring/dashboard.py (run with: streamlit run monitoring/dashboard.py)
import streamlit as st
import pandas as pd
import sqlite3
from datetime import datetime, timedelta
def load_daily_metrics(db_path="monitoring.db", days=90):
with sqlite3.connect(db_path) as conn:
return pd.read_sql_query(
"""SELECT * FROM daily_metrics
WHERE date >= date('now', ?)
ORDER BY date""",
conn,
params=(f"-{days} days",),
)
def load_bet_history(db_path="bets.db", days=90):
with sqlite3.connect(db_path) as conn:
return pd.read_sql_query(
"""SELECT * FROM bets
WHERE placed_at >= date('now', ?)
ORDER BY placed_at""",
conn,
params=(f"-{days} days",),
)
def main():
st.title("ML Betting Pipeline Dashboard")
# Sidebar controls
days = st.sidebar.slider("Lookback Period (days)", 7, 365, 90)
# Model metrics
st.header("Model Performance")
metrics = load_daily_metrics(days=days)
if not metrics.empty:
col1, col2, col3 = st.columns(3)
with col1:
recent_brier = metrics.tail(7)["brier_score"].mean()
st.metric("7-Day Brier Score", f"{recent_brier:.4f}")
with col2:
recent_ll = metrics.tail(7)["log_loss_value"].mean()
st.metric("7-Day Log Loss", f"{recent_ll:.4f}")
with col3:
daily_preds = metrics.tail(7)["num_predictions"].mean()
st.metric("Avg Daily Predictions", f"{daily_preds:.0f}")
st.line_chart(metrics.set_index("date")["brier_score"])
# Betting performance
st.header("Betting Performance")
bets = load_bet_history(days=days)
if not bets.empty:
settled = bets[bets["status"].isin(["won", "lost"])]
if not settled.empty:
settled = settled.sort_values("placed_at")
settled["cumulative_pnl"] = settled["pnl"].cumsum()
st.line_chart(
settled.set_index("placed_at")["cumulative_pnl"]
)
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Total Bets", len(settled))
with col2:
win_rate = (settled["status"] == "won").mean()
st.metric("Win Rate", f"{win_rate:.1%}")
with col3:
total_pnl = settled["pnl"].sum()
st.metric("Total PnL", f"${total_pnl:.2f}")
with col4:
roi = settled["pnl"].sum() / settled["stake"].sum()
st.metric("ROI", f"{roi:.1%}")
if __name__ == "__main__":
main()
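The Model Health panel listed earlier also calls for a calibration plot, which the dashboard above omits. One way to add it to monitoring/dashboard.py is sketched below; it assumes the monitoring database has a predictions table with predicted_prob and actual_outcome columns, so adjust the query to your actual schema:
# Calibration chart sketch. The `predictions` table and its
# `predicted_prob` / `actual_outcome` columns are assumptions --
# adjust them to match your schema.
def calibration_chart(db_path="monitoring.db", n_bins=10):
    with sqlite3.connect(db_path) as conn:
        preds = pd.read_sql_query(
            """SELECT predicted_prob, actual_outcome FROM predictions
               WHERE actual_outcome IS NOT NULL""",
            conn,
        )
    if preds.empty:
        st.info("No settled predictions yet")
        return
    # Bin predictions and compare mean predicted probability to the
    # observed win frequency within each bin
    preds["bin"] = pd.cut(preds["predicted_prob"], bins=n_bins)
    calib = preds.groupby("bin", observed=True).agg(
        predicted=("predicted_prob", "mean"),
        actual=("actual_outcome", "mean"),
    ).dropna()
    st.subheader("Calibration (predicted vs. actual)")
    st.line_chart(calib.set_index("predicted")["actual"])
Calling calibration_chart() from main() adds the panel; a well-calibrated model traces the diagonal, with predicted probability matching observed frequency in every bin.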
31.6 Chapter Summary
This chapter presented a complete, production-ready ML betting pipeline from architecture to monitoring. The key takeaways are:
Architecture matters. Start with a well-structured monolith using clear module boundaries. Separate concerns---ingestion, features, models, execution, monitoring---into distinct modules even if they run in a single process. This makes the system testable, debuggable, and ready for future decomposition into services.
The feature store is the backbone. A feature store with point-in-time correctness, versioning, and efficient retrieval enables reproducible model training, prevents data leakage in backtesting, and ensures that production predictions use the same feature definitions as training.
The model registry enables safe iteration. Track every model version, its training metrics, and which one is currently active. This supports A/B testing, gradual rollouts, and instant rollback when a new model underperforms.
Risk management is non-negotiable. Hard limits on daily losses, position sizes, and total exposure prevent catastrophic outcomes. These limits should be enforced at the system level, not left to human discipline. The risk manager has veto power over every bet.
Monitoring closes the loop. Track model calibration, data drift, and betting performance continuously. Automated alerts catch degradation before it costs you money. A dashboard provides at-a-glance health visibility for the human operator.
The pipeline described in this chapter is not a toy---it is a battle-tested architecture used by quantitative betting operations. The specific implementations can be adapted to your scale, sport, and risk tolerance, but the architectural principles are universal. Build the pipeline right, and it becomes a durable competitive advantage. Build it wrong, and even the best model will fail in production.
In the next chapter, we turn to Natural Language Processing and explore how to extract predictive signals from text data---news articles, injury reports, social media, and more.