Chapter 31: The Complete ML Betting Pipeline

Building a machine learning model that predicts sports outcomes is only the first step. The real engineering challenge lies in constructing a complete, production-ready pipeline that ingests data, computes features, trains models, serves predictions, executes bets, manages risk, and monitors everything continuously. This chapter bridges the gap between a Jupyter notebook prototype and a robust, automated betting system.

We will walk through every layer of the pipeline---from system architecture to monitoring dashboards---with practical Python code throughout. By the end, you will have a blueprint for building a system that runs autonomously, makes disciplined betting decisions, and alerts you when something goes wrong.


31.1 System Architecture and Design

The End-to-End Architecture

A production ML betting pipeline is a chain of interconnected components, each responsible for a specific function. Before writing any code, you need to understand how data flows through the entire system. Here is a text-based architecture diagram:

┌─────────────────────────────────────────────────────────────────────┐
│                         ML BETTING PIPELINE                         │
│                                                                     │
│  ┌──────────┐   ┌──────────────┐   ┌──────────────┐                 │
│  │ Data     │──>│  Feature     │──>│  Model       │                 │
│  │ Ingestion│   │  Store       │   │  Training    │                 │
│  └──────────┘   └──────────────┘   └──────┬───────┘                 │
│       │                                   │                         │
│       │              ┌────────────────────┘                         │
│       │              v                                              │
│       │          ┌──────────────┐   ┌──────────────┐                │
│       │          │  Model       │──>│  Prediction  │                │
│       │          │  Registry    │   │  Service     │                │
│       │          └──────────────┘   └──────┬───────┘                │
│       │                                    │                        │
│       │                                    v                        │
│       │          ┌──────────────┐   ┌──────────────┐                │
│       │          │  Risk &      │<──│  Bet         │                │
│       │          │  Portfolio   │   │  Execution   │                │
│       │          └──────────────┘   └──────────────┘                │
│       │                                                             │
│       v                                                             │
│  ┌──────────────────────────────────────────────────────────┐       │
│  │              Monitoring, Logging, Alerting               │       │
│  └──────────────────────────────────────────────────────────┘       │
└─────────────────────────────────────────────────────────────────────┘

The data flows from left to right and top to bottom. Raw data enters through ingestion, gets transformed into features, feeds into model training or prediction, and ultimately drives bet execution. Monitoring wraps around the entire system, observing every component.

Microservices vs. Monolith

One of the first architectural decisions is whether to build a monolithic application or a collection of microservices.

Monolithic Architecture:

In a monolith, all components---ingestion, feature computation, model training, prediction, and bet execution---live in a single codebase and run as a single process (or a small set of scheduled scripts). This is the right starting point for most individual bettors and small teams.

Advantages:

  • Simpler deployment and debugging
  • No inter-service communication overhead
  • Easier to reason about data flow
  • Lower infrastructure cost

Disadvantages:

  • Harder to scale individual components
  • A failure in one component can crash the entire system
  • Longer deployment cycles as the codebase grows

Microservices Architecture:

In a microservices approach, each component is an independent service with its own deployment, scaling, and failure isolation. Services communicate via REST APIs, message queues (like RabbitMQ or Redis Streams), or event buses.

Advantages:

  • Independent scaling (e.g., scale the prediction service separately from ingestion)
  • Fault isolation
  • Technology diversity (use different languages/frameworks per service)
  • Independent deployment cycles

Disadvantages:

  • Significant operational complexity
  • Network latency between services
  • Distributed system challenges (consistency, partial failures)
  • Requires container orchestration (Docker, Kubernetes)

Recommendation: Start with a well-structured monolith. Use clear module boundaries so you can extract services later if needed. Most sports betting operations do not need microservices until they are processing thousands of bets per day across multiple sports and markets simultaneously.
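
One way to keep those module boundaries explicit is to define the contracts between components up front, so callers depend on an interface rather than a concrete class. A minimal sketch using typing.Protocol (the PredictionService protocol and find_opportunities helper are hypothetical names, not part of the pipeline code later in this chapter):

# Sketch: an explicit module boundary via typing.Protocol
from typing import Dict, List, Protocol


class PredictionService(Protocol):
    """Contract the execution layer depends on."""

    def predict_game(self, game_id: str) -> Dict[str, float]:
        ...


def find_opportunities(predictor: PredictionService,
                       game_ids: List[str]) -> Dict[str, Dict[str, float]]:
    # The caller sees only the contract, so the in-process class can
    # later be swapped for a thin HTTP client without touching this code.
    return {gid: predictor.predict_game(gid) for gid in game_ids}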

Data Flow and Scheduling

The pipeline operates on two timescales:

  1. Batch processing (hourly/daily): Data ingestion, feature computation, model retraining. Typically orchestrated with a scheduler.
  2. Real-time processing (seconds/minutes): Prediction serving, bet execution, odds monitoring. Runs as long-lived services.

For scheduling, you have several options:

  • cron (Unix) or Task Scheduler (Windows): Simple, built-in, no dependencies.
  • APScheduler (Python library): In-process scheduling with cron-like syntax.
  • Apache Airflow: Full-featured workflow orchestration with DAGs, retries, and monitoring.
  • Prefect / Dagster: Modern alternatives to Airflow with better developer experience.

For a betting pipeline, APScheduler or a simple cron-based approach is usually sufficient. Airflow becomes valuable when you have complex dependency chains between tasks (e.g., "train model only after all features are computed for the day").
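
As a concrete sketch of the batch timescale, the scheduled jobs can be wired up with APScheduler as follows (the job bodies are placeholders for the ingestion and training code developed later in this chapter):

# scripts/schedule_jobs.py -- illustrative APScheduler wiring
from apscheduler.schedulers.blocking import BlockingScheduler


def ingest_data():
    ...  # pull games, odds, and injury reports


def retrain_models():
    ...  # run the training pipeline


scheduler = BlockingScheduler()

# Batch ingestion every hour, on the hour
scheduler.add_job(ingest_data, "cron", minute=0)

# Retrain daily at 06:00 UTC, after overnight results have settled
scheduler.add_job(retrain_models, "cron", hour=6, minute=0)

scheduler.start()  # Blocks; run as its own process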

Python Project Structure

A well-organized project structure makes the codebase maintainable as it grows. Here is the recommended layout:

betting_pipeline/
├── config/
│   ├── __init__.py
│   ├── settings.py          # Global configuration
│   └── logging_config.py    # Logging setup
├── ingestion/
│   ├── __init__.py
│   ├── scrapers.py          # Data source scrapers
│   ├── api_clients.py       # API wrappers
│   └── raw_storage.py       # Raw data persistence
├── features/
│   ├── __init__.py
│   ├── feature_store.py     # Feature store implementation
│   ├── transformers.py      # Feature computation logic
│   └── validators.py        # Data quality checks
├── models/
│   ├── __init__.py
│   ├── training.py          # Training pipeline
│   ├── registry.py          # Model versioning/registry
│   ├── serving.py           # Prediction API
│   └── evaluation.py        # Model evaluation utilities
├── execution/
│   ├── __init__.py
│   ├── bet_sizing.py        # Kelly criterion, position sizing
│   ├── bet_placer.py        # Bet placement automation
│   └── risk_manager.py      # Risk limits and controls
├── monitoring/
│   ├── __init__.py
│   ├── metrics.py           # Performance tracking
│   ├── drift_detection.py   # Data/model drift
│   └── alerts.py            # Alerting system
├── database/
│   ├── __init__.py
│   ├── models.py            # SQLAlchemy ORM models
│   └── connection.py        # Database connection management
├── tests/
│   ├── test_features.py
│   ├── test_models.py
│   ├── test_execution.py
│   └── test_monitoring.py
├── scripts/
│   ├── run_pipeline.py      # Main entry point
│   ├── retrain_model.py     # Manual retraining script
│   └── backfill_features.py # Historical feature computation
├── requirements.txt
├── setup.py
└── README.md

Here is a foundational configuration module:

# config/settings.py
import os
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from pathlib import Path


@dataclass
class DatabaseConfig:
    host: str = "localhost"
    port: int = 5432
    name: str = "betting_pipeline"
    user: str = "pipeline"
    password: str = ""

    @property
    def connection_string(self) -> str:
        return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.name}"


@dataclass
class ModelConfig:
    model_dir: Path = Path("models/artifacts")
    retrain_interval_hours: int = 24
    min_training_samples: int = 500
    validation_split: float = 0.2
    random_seed: int = 42


@dataclass
class ExecutionConfig:
    max_bet_size: float = 100.0      # Maximum dollars per bet
    daily_loss_limit: float = 500.0   # Stop betting after this daily loss
    min_edge_threshold: float = 0.02  # Minimum 2% expected edge
    kelly_fraction: float = 0.25      # Quarter Kelly
    max_concurrent_bets: int = 20


@dataclass
class PipelineConfig:
    database: DatabaseConfig = field(default_factory=DatabaseConfig)
    model: ModelConfig = field(default_factory=ModelConfig)
    execution: ExecutionConfig = field(default_factory=ExecutionConfig)
    log_level: str = "INFO"
    environment: str = "development"  # development, staging, production

    @classmethod
    def from_env(cls) -> "PipelineConfig":
        """Load configuration from environment variables."""
        db_config = DatabaseConfig(
            host=os.getenv("DB_HOST", "localhost"),
            port=int(os.getenv("DB_PORT", "5432")),
            name=os.getenv("DB_NAME", "betting_pipeline"),
            user=os.getenv("DB_USER", "pipeline"),
            password=os.getenv("DB_PASSWORD", ""),
        )
        exec_config = ExecutionConfig(
            max_bet_size=float(os.getenv("MAX_BET_SIZE", "100.0")),
            daily_loss_limit=float(os.getenv("DAILY_LOSS_LIMIT", "500.0")),
            min_edge_threshold=float(os.getenv("MIN_EDGE", "0.02")),
            kelly_fraction=float(os.getenv("KELLY_FRACTION", "0.25")),
        )
        return cls(
            database=db_config,
            execution=exec_config,
            environment=os.getenv("ENVIRONMENT", "development"),
        )

This configuration-as-code approach ensures that every parameter is documented, typed, and easily overridden via environment variables for different deployment environments.
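
For example, a production deployment can override the defaults purely through the environment (the values below are illustrative):

# Illustrative usage: environment variables override dataclass defaults
import os

os.environ["ENVIRONMENT"] = "production"
os.environ["MAX_BET_SIZE"] = "50.0"

config = PipelineConfig.from_env()
print(config.execution.max_bet_size)      # 50.0
print(config.database.connection_string)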


31.2 Data Ingestion and Feature Store

Automated Data Pulling

The ingestion layer is responsible for pulling raw data from external sources---sports APIs, odds feeds, injury reports, weather services---and storing it in a normalized format. Reliability is paramount: data sources go down, APIs change formats, rate limits get hit. Your ingestion code must handle all of this gracefully.

# ingestion/api_clients.py
import time
import logging
import requests
from typing import Dict, Any, Optional, List
from datetime import datetime, timedelta
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class APIResponse:
    data: Any
    status_code: int
    timestamp: datetime
    source: str


class RateLimiter:
    """Token bucket rate limiter for API calls."""

    def __init__(self, calls_per_minute: int = 60):
        self.calls_per_minute = calls_per_minute
        self.interval = 60.0 / calls_per_minute
        self.last_call_time = 0.0

    def wait(self):
        now = time.time()
        elapsed = now - self.last_call_time
        if elapsed < self.interval:
            time.sleep(self.interval - elapsed)
        self.last_call_time = time.time()


class SportsDataClient:
    """Generic sports data API client with retry logic."""

    def __init__(self, base_url: str, api_key: str,
                 calls_per_minute: int = 30):
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/json",
        })
        self.rate_limiter = RateLimiter(calls_per_minute)
        self.max_retries = 3
        self.retry_delay = 5.0  # seconds

    def _request(self, endpoint: str, params: Optional[Dict] = None
                 ) -> APIResponse:
        url = f"{self.base_url}/{endpoint}"

        for attempt in range(self.max_retries):
            try:
                self.rate_limiter.wait()
                response = self.session.get(url, params=params, timeout=30)

                if response.status_code == 200:
                    return APIResponse(
                        data=response.json(),
                        status_code=200,
                        timestamp=datetime.utcnow(),
                        source=endpoint,
                    )
                elif response.status_code == 429:  # Rate limited
                    wait_time = self.retry_delay * (2 ** attempt)
                    logger.warning(
                        f"Rate limited on {endpoint}. "
                        f"Waiting {wait_time}s (attempt {attempt + 1})"
                    )
                    time.sleep(wait_time)
                elif response.status_code >= 500:
                    logger.warning(
                        f"Server error {response.status_code} on {endpoint}. "
                        f"Retrying (attempt {attempt + 1})"
                    )
                    time.sleep(self.retry_delay)
                else:
                    logger.error(
                        f"API error {response.status_code}: "
                        f"{response.text[:200]}"
                    )
                    return APIResponse(
                        data=None,
                        status_code=response.status_code,
                        timestamp=datetime.utcnow(),
                        source=endpoint,
                    )
            except requests.exceptions.Timeout:
                logger.warning(f"Timeout on {endpoint} (attempt {attempt + 1})")
                time.sleep(self.retry_delay)
            except requests.exceptions.ConnectionError:
                logger.warning(
                    f"Connection error on {endpoint} (attempt {attempt + 1})"
                )
                time.sleep(self.retry_delay * 2)

        logger.error(f"All retries exhausted for {endpoint}")
        return APIResponse(
            data=None, status_code=0,
            timestamp=datetime.utcnow(), source=endpoint,
        )

    def get_games(self, sport: str, date: str) -> List[Dict]:
        """Fetch games for a given sport and date."""
        response = self._request(f"sports/{sport}/games", {"date": date})
        if response.data and "games" in response.data:
            return response.data["games"]
        return []

    def get_odds(self, game_id: str) -> Dict:
        """Fetch current odds for a specific game."""
        response = self._request(f"games/{game_id}/odds")
        return response.data or {}

    def get_team_stats(self, team_id: str, season: str) -> Dict:
        """Fetch team statistics for a season."""
        response = self._request(
            f"teams/{team_id}/stats", {"season": season}
        )
        return response.data or {}


class OddsClient:
    """Client specifically for odds data, supporting multiple books."""

    def __init__(self, api_key: str):
        self.client = SportsDataClient(
            base_url="https://api.the-odds-api.com/v4",
            api_key=api_key,
            calls_per_minute=20,
        )

    def get_live_odds(self, sport: str,
                      markets: List[str] = None) -> List[Dict]:
        """Get live odds across multiple sportsbooks."""
        if markets is None:
            markets = ["h2h", "spreads", "totals"]

        all_odds = []
        for market in markets:
            response = self.client._request(
                f"sports/{sport}/odds",
                params={
                    "regions": "us",
                    "markets": market,
                    "oddsFormat": "american",
                },
            )
            if response.data:
                all_odds.extend(response.data)
        return all_odds
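
Before any transformation happens, raw payloads should also be archived exactly as received so features can be re-derived later. A minimal sketch of ingestion/raw_storage.py, assuming timestamped JSON files are sufficient at this scale:

# ingestion/raw_storage.py -- minimal sketch: archive raw payloads as JSON
import json
from datetime import datetime
from pathlib import Path


class RawStorage:
    """Persists raw API payloads so they can be replayed later."""

    def __init__(self, root_dir: str = "data/raw"):
        self.root = Path(root_dir)

    def save(self, source: str, payload: dict) -> Path:
        # One directory per source, one timestamped file per pull
        ts = datetime.utcnow().strftime("%Y%m%dT%H%M%S%f")
        path = self.root / source / f"{ts}.json"
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(payload, default=str))
        return path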

Feature Computation Pipelines

Raw data is rarely useful for models directly. Features must be computed from raw inputs: rolling averages, head-to-head records, Elo ratings, rest days, travel distance, and hundreds of other transformations. The feature computation pipeline should be deterministic---given the same raw data, it always produces the same features.

# features/transformers.py
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
from datetime import datetime, timedelta
from abc import ABC, abstractmethod
import logging

logger = logging.getLogger(__name__)


class FeatureTransformer(ABC):
    """Base class for all feature transformers."""

    @abstractmethod
    def transform(self, raw_data: pd.DataFrame) -> pd.DataFrame:
        pass

    @abstractmethod
    def feature_names(self) -> List[str]:
        pass


class RollingStatsTransformer(FeatureTransformer):
    """Compute rolling statistics for team performance metrics."""

    def __init__(self, windows: List[int] = None,
                 metrics: List[str] = None):
        self.windows = windows or [5, 10, 20]
        self.metrics = metrics or [
            "points_scored", "points_allowed", "turnovers",
            "field_goal_pct", "three_point_pct",
        ]

    def transform(self, raw_data: pd.DataFrame) -> pd.DataFrame:
        """Compute rolling means and standard deviations.

        Input must have columns: team_id, game_date, and all metrics.
        Data must be sorted by game_date within each team.
        """
        features = raw_data[["team_id", "game_date"]].copy()

        for team_id in raw_data["team_id"].unique():
            team_mask = raw_data["team_id"] == team_id
            team_data = raw_data.loc[team_mask].sort_values("game_date")

            for metric in self.metrics:
                if metric not in team_data.columns:
                    continue
                for window in self.windows:
                    # Rolling mean (shift to avoid data leakage)
                    col_mean = f"{metric}_rolling_mean_{window}"
                    features.loc[team_mask, col_mean] = (
                        team_data[metric]
                        .shift(1)
                        .rolling(window, min_periods=max(1, window // 2))
                        .mean()
                    )

                    # Rolling standard deviation
                    col_std = f"{metric}_rolling_std_{window}"
                    features.loc[team_mask, col_std] = (
                        team_data[metric]
                        .shift(1)
                        .rolling(window, min_periods=max(3, window // 2))
                        .std()
                    )

        return features

    def feature_names(self) -> List[str]:
        names = []
        for metric in self.metrics:
            for window in self.windows:
                names.append(f"{metric}_rolling_mean_{window}")
                names.append(f"{metric}_rolling_std_{window}")
        return names


class EloTransformer(FeatureTransformer):
    """Compute Elo ratings from game results."""

    def __init__(self, k_factor: float = 20.0,
                 home_advantage: float = 100.0,
                 initial_rating: float = 1500.0):
        self.k_factor = k_factor
        self.home_advantage = home_advantage
        self.initial_rating = initial_rating
        self.ratings: Dict[str, float] = {}

    def _expected_score(self, rating_a: float, rating_b: float) -> float:
        return 1.0 / (1.0 + 10 ** ((rating_b - rating_a) / 400.0))

    def _update_ratings(self, team_a: str, team_b: str,
                        score_a: float, is_home_a: bool):
        ra = self.ratings.get(team_a, self.initial_rating)
        rb = self.ratings.get(team_b, self.initial_rating)

        ha = self.home_advantage if is_home_a else -self.home_advantage
        expected_a = self._expected_score(ra + ha, rb)

        self.ratings[team_a] = ra + self.k_factor * (score_a - expected_a)
        self.ratings[team_b] = rb + self.k_factor * (
            (1 - score_a) - (1 - expected_a)
        )

    def transform(self, raw_data: pd.DataFrame) -> pd.DataFrame:
        """Compute Elo ratings from game results.

        Input must have: game_date, home_team_id, away_team_id,
        home_score, away_score. Must be sorted by game_date.
        """
        self.ratings = {}
        records = []

        for _, row in raw_data.sort_values("game_date").iterrows():
            home = row["home_team_id"]
            away = row["away_team_id"]

            # Record pre-game ratings
            home_elo = self.ratings.get(home, self.initial_rating)
            away_elo = self.ratings.get(away, self.initial_rating)

            records.append({
                "game_date": row["game_date"],
                "home_team_id": home,
                "away_team_id": away,
                "home_elo_pre": home_elo,
                "away_elo_pre": away_elo,
                "elo_diff": home_elo - away_elo + self.home_advantage,
            })

            # Update ratings with result
            if row["home_score"] > row["away_score"]:
                score_a = 1.0
            elif row["home_score"] < row["away_score"]:
                score_a = 0.0
            else:
                score_a = 0.5

            self._update_ratings(home, away, score_a, is_home_a=True)

        return pd.DataFrame(records)

    def feature_names(self) -> List[str]:
        return ["home_elo_pre", "away_elo_pre", "elo_diff"]


class RestDaysTransformer(FeatureTransformer):
    """Compute rest days and travel features."""

    def transform(self, raw_data: pd.DataFrame) -> pd.DataFrame:
        features = raw_data[["team_id", "game_date"]].copy()
        features["rest_days"] = np.nan

        for team_id in raw_data["team_id"].unique():
            mask = raw_data["team_id"] == team_id
            team_games = raw_data.loc[mask].sort_values("game_date")

            dates = pd.to_datetime(team_games["game_date"])
            rest = dates.diff().dt.days
            # Assign the Series (not .values) so rows align by index,
            # which is safe even if raw_data is not pre-sorted by date
            features.loc[mask, "rest_days"] = rest

        # Cap rest days at 14 (anything more is basically full rest)
        features["rest_days"] = features["rest_days"].clip(upper=14)

        # Binary feature: back-to-back game
        features["is_back_to_back"] = (features["rest_days"] == 1).astype(int)

        return features

    def feature_names(self) -> List[str]:
        return ["rest_days", "is_back_to_back"]

Feature Versioning

Feature definitions evolve over time. A feature called "rolling_mean_10" might use a different window size or normalization method in version 2 versus version 1. If you retrain a model, you need to know exactly which version of each feature was used. Without versioning, you risk subtle bugs where a model trained on one feature definition is served with features computed under a different definition.

Key principles of feature versioning:

  1. Every feature transformation has a version string. When you change the logic, increment the version.
  2. Features are stored with their version metadata. The feature store records which transformer version produced each value.
  3. Models record their feature versions at training time. When serving, verify that the feature versions match.
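
One lightweight way to implement the first two principles is to derive the version string from a manually bumped logic version plus a hash of the transformer's parameters, so changing either produces a new version. The mixin below is a hypothetical helper, not part of the feature store API:

# Sketch: parameter-aware feature versioning (hypothetical helper)
import hashlib
import json


class VersionedTransformerMixin:
    LOGIC_VERSION = "1"  # Bump whenever the transformation logic changes

    @property
    def feature_version(self) -> str:
        # Hash the instance parameters so changed windows, metrics,
        # etc. automatically yield a distinct version string.
        params = {k: v for k, v in vars(self).items()
                  if not k.startswith("_")}
        digest = hashlib.sha256(
            json.dumps(params, sort_keys=True, default=str).encode()
        ).hexdigest()[:8]
        return f"v{self.LOGIC_VERSION}-{digest}"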

Storing Features in Databases

For a betting pipeline, a relational database (PostgreSQL) or a lightweight embedded database (SQLite) works well for feature storage. The schema should support efficient lookups by entity (team, player) and time.

# features/feature_store.py
import sqlite3
import json
import hashlib
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Any
from datetime import datetime
from pathlib import Path
import logging

logger = logging.getLogger(__name__)


class FeatureStore:
    """Simple file-backed feature store using SQLite.

    Stores computed features with versioning, timestamps, and metadata.
    Supports point-in-time lookups to prevent data leakage in backtesting.
    """

    def __init__(self, db_path: str = "feature_store.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS features (
                    entity_type TEXT NOT NULL,
                    entity_id TEXT NOT NULL,
                    feature_name TEXT NOT NULL,
                    feature_value REAL,
                    feature_version TEXT NOT NULL,
                    event_date TEXT NOT NULL,
                    computed_at TEXT NOT NULL,
                    PRIMARY KEY (entity_type, entity_id, feature_name,
                                 feature_version, event_date)
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_features_lookup
                ON features (entity_type, entity_id, event_date)
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS feature_metadata (
                    feature_name TEXT NOT NULL,
                    feature_version TEXT NOT NULL,
                    description TEXT,
                    transformer_class TEXT,
                    transformer_params TEXT,
                    created_at TEXT NOT NULL,
                    PRIMARY KEY (feature_name, feature_version)
                )
            """)

    def register_feature(self, name: str, version: str,
                          description: str, transformer_class: str,
                          params: Dict[str, Any]):
        """Register a feature definition with metadata."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """INSERT OR REPLACE INTO feature_metadata
                   (feature_name, feature_version, description,
                    transformer_class, transformer_params, created_at)
                   VALUES (?, ?, ?, ?, ?, ?)""",
                (name, version, description, transformer_class,
                 json.dumps(params), datetime.utcnow().isoformat()),
            )
        logger.info(f"Registered feature {name} v{version}")

    def store_features(self, entity_type: str, features_df: pd.DataFrame,
                        feature_version: str):
        """Store a DataFrame of features.

        DataFrame must have columns: entity_id, event_date,
        plus one or more feature value columns.
        """
        computed_at = datetime.utcnow().isoformat()

        feature_cols = [
            c for c in features_df.columns
            if c not in ("entity_id", "event_date")
        ]

        rows = []
        for _, row in features_df.iterrows():
            for col in feature_cols:
                val = row[col]
                if pd.isna(val):
                    continue
                rows.append((
                    entity_type,
                    str(row["entity_id"]),
                    col,
                    float(val),
                    feature_version,
                    str(row["event_date"]),
                    computed_at,
                ))

        with sqlite3.connect(self.db_path) as conn:
            conn.executemany(
                """INSERT OR REPLACE INTO features
                   (entity_type, entity_id, feature_name, feature_value,
                    feature_version, event_date, computed_at)
                   VALUES (?, ?, ?, ?, ?, ?, ?)""",
                rows,
            )

        logger.info(
            f"Stored {len(rows)} feature values for "
            f"{len(features_df)} entities"
        )

    def get_features(self, entity_type: str, entity_id: str,
                      event_date: str, feature_names: List[str] = None,
                      feature_version: str = None) -> Dict[str, float]:
        """Retrieve features for a specific entity and date.

        Point-in-time lookup: returns features as of event_date,
        never using future data.
        """
        query = """
            SELECT feature_name, feature_value
            FROM features
            WHERE entity_type = ? AND entity_id = ? AND event_date <= ?
        """
        params: list = [entity_type, entity_id, event_date]

        if feature_names:
            placeholders = ",".join("?" * len(feature_names))
            query += f" AND feature_name IN ({placeholders})"
            params.extend(feature_names)

        if feature_version:
            query += " AND feature_version = ?"
            params.append(feature_version)

        # Get the most recent value for each feature up to event_date.
        # SQLite's bare-column semantics with MAX() return feature_value
        # from the row holding the latest event_date in each group.
        query = f"""
            SELECT feature_name, feature_value, MAX(event_date)
            FROM ({query})
            GROUP BY feature_name
        """

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(query, params)
            return {row[0]: row[1] for row in cursor.fetchall()}

    def get_training_matrix(self, entity_type: str,
                             feature_names: List[str],
                             start_date: str, end_date: str,
                             feature_version: str = None
                             ) -> pd.DataFrame:
        """Build a feature matrix for model training.

        Returns a DataFrame with entity_id, event_date, and all
        requested features as columns.
        """
        query = """
            SELECT entity_id, event_date, feature_name, feature_value
            FROM features
            WHERE entity_type = ?
              AND event_date BETWEEN ? AND ?
              AND feature_name IN ({})
        """.format(",".join("?" * len(feature_names)))

        params = [entity_type, start_date, end_date] + feature_names

        if feature_version:
            query += " AND feature_version = ?"
            params.append(feature_version)

        with sqlite3.connect(self.db_path) as conn:
            df = pd.read_sql_query(query, conn, params=params)

        if df.empty:
            return pd.DataFrame()

        # Pivot from long to wide format
        matrix = df.pivot_table(
            index=["entity_id", "event_date"],
            columns="feature_name",
            values="feature_value",
        ).reset_index()

        matrix.columns.name = None
        return matrix

This feature store supports point-in-time correctness, which is critical for backtesting. When you ask for features "as of" a given date, you never get values computed from future data. This prevents the data leakage that plagues many backtesting frameworks.
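
A short usage sketch ties the transformers and the store together (entity IDs, dates, and values are illustrative):

# Illustrative usage of the feature store
import pandas as pd

store = FeatureStore("feature_store.db")

# Features computed by a transformer, reshaped to the store's schema
features_df = pd.DataFrame({
    "entity_id": ["LAL", "LAL"],
    "event_date": ["2025-01-10", "2025-01-12"],
    "points_scored_rolling_mean_5": [112.4, 114.0],
})
store.store_features("team", features_df, feature_version="v1")

# Point-in-time lookup: only values on or before 2025-01-11 are visible
feats = store.get_features("team", "LAL", "2025-01-11")
print(feats)  # {'points_scored_rolling_mean_5': 112.4}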


31.3 Model Training and Serving

Training Pipeline

The training pipeline orchestrates the full cycle: pulling features from the store, splitting data, training the model, evaluating it, and saving artifacts. It should be idempotent---running it twice with the same data produces the same result.

# models/training.py
import pickle
import json
import hashlib
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any
import numpy as np
import pandas as pd
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import (
    log_loss, brier_score_loss, roc_auc_score, accuracy_score
)
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.calibration import CalibratedClassifierCV
import logging

logger = logging.getLogger(__name__)


class ModelTrainer:
    """Handles the complete model training lifecycle."""

    def __init__(self, feature_store, model_dir: str = "models/artifacts"):
        self.feature_store = feature_store
        self.model_dir = Path(model_dir)
        self.model_dir.mkdir(parents=True, exist_ok=True)

    def prepare_data(self, sport: str, feature_names: List[str],
                      target_col: str, start_date: str, end_date: str
                      ) -> Tuple[pd.DataFrame, pd.Series]:
        """Pull features and target from feature store."""
        all_features = feature_names + [target_col]
        matrix = self.feature_store.get_training_matrix(
            entity_type=f"{sport}_game",
            feature_names=all_features,
            start_date=start_date,
            end_date=end_date,
        )

        if matrix.empty:
            raise ValueError(
                f"No training data found between {start_date} and {end_date}"
            )

        # Drop rows with missing target
        matrix = matrix.dropna(subset=[target_col])

        # Separate features and target
        X = matrix[feature_names].copy()
        y = matrix[target_col].copy()

        # Log data summary
        logger.info(
            f"Training data: {len(X)} samples, "
            f"{len(feature_names)} features, "
            f"target distribution: {y.mean():.3f}"
        )

        return X, y

    def train(self, X: pd.DataFrame, y: pd.Series,
              model_params: Dict[str, Any] = None,
              calibrate: bool = True
              ) -> Tuple[Any, Dict[str, float]]:
        """Train and evaluate a model using time-series cross-validation."""
        if model_params is None:
            model_params = {
                "n_estimators": 300,
                "max_depth": 5,
                "learning_rate": 0.05,
                "subsample": 0.8,
                "min_samples_leaf": 20,
                "random_state": 42,
            }

        # Time-series cross-validation
        tscv = TimeSeriesSplit(n_splits=5)
        cv_metrics = {
            "log_loss": [], "brier_score": [],
            "auc": [], "accuracy": [],
        }

        for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
            X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
            y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

            # Impute with medians learned on the training fold only
            train_medians = X_train.median()
            X_train = X_train.fillna(train_medians)
            X_val = X_val.fillna(train_medians)

            model = GradientBoostingClassifier(**model_params)
            model.fit(X_train, y_train)

            y_prob = model.predict_proba(X_val)[:, 1]
            y_pred = model.predict(X_val)

            cv_metrics["log_loss"].append(log_loss(y_val, y_prob))
            cv_metrics["brier_score"].append(brier_score_loss(y_val, y_prob))
            cv_metrics["auc"].append(roc_auc_score(y_val, y_prob))
            cv_metrics["accuracy"].append(accuracy_score(y_val, y_pred))

            logger.info(
                f"Fold {fold + 1}: log_loss={cv_metrics['log_loss'][-1]:.4f}, "
                f"AUC={cv_metrics['auc'][-1]:.4f}"
            )

        # Summary metrics
        metrics = {
            k: {"mean": np.mean(v), "std": np.std(v)}
            for k, v in cv_metrics.items()
        }

        # Train final model on all data
        X_filled = X.fillna(X.median())
        final_model = GradientBoostingClassifier(**model_params)
        final_model.fit(X_filled, y)

        # Calibrate probabilities (CalibratedClassifierCV clones and
        # refits the base estimator on internal folds before fitting
        # the isotonic calibrator)
        if calibrate:
            cal_model = CalibratedClassifierCV(
                final_model, cv=3, method="isotonic"
            )
            cal_model.fit(X_filled, y)
            final_model = cal_model

        logger.info(
            f"Training complete. CV log_loss: "
            f"{metrics['log_loss']['mean']:.4f} "
            f"+/- {metrics['log_loss']['std']:.4f}"
        )

        return final_model, metrics

    def save_model(self, model: Any, metrics: Dict,
                    feature_names: List[str], sport: str,
                    model_params: Dict[str, Any] = None) -> str:
        """Save model artifact with metadata."""
        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        model_id = f"{sport}_{timestamp}"
        model_path = self.model_dir / model_id
        model_path.mkdir(exist_ok=True)

        # Save model
        with open(model_path / "model.pkl", "wb") as f:
            pickle.dump(model, f)

        # Save metadata
        metadata = {
            "model_id": model_id,
            "sport": sport,
            "created_at": datetime.utcnow().isoformat(),
            "feature_names": feature_names,
            "model_params": model_params or {},
            "cv_metrics": metrics,
            "model_type": type(model).__name__,
        }
        with open(model_path / "metadata.json", "w") as f:
            json.dump(metadata, f, indent=2, default=str)

        logger.info(f"Model saved: {model_id}")
        return model_id
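
Putting the trainer to work is then a three-step affair (the sport, dates, and feature names below are illustrative):

# Illustrative end-to-end training run
trainer = ModelTrainer(feature_store=store)

X, y = trainer.prepare_data(
    sport="nba",
    feature_names=["elo_diff", "rest_days",
                   "points_scored_rolling_mean_10"],
    target_col="home_win",
    start_date="2023-10-01",
    end_date="2025-01-01",
)
model, metrics = trainer.train(X, y)
model_id = trainer.save_model(
    model, metrics, feature_names=list(X.columns), sport="nba",
)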

Model Registry

The model registry tracks all trained models, their metrics, and which one is currently "active" (serving predictions). This enables A/B testing and easy rollback.

# models/registry.py
import json
import pickle
import sqlite3
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Any, Tuple
import logging

logger = logging.getLogger(__name__)


class ModelRegistry:
    """Tracks trained models and manages active model selection."""

    def __init__(self, db_path: str = "model_registry.db",
                 model_dir: str = "models/artifacts"):
        self.db_path = db_path
        self.model_dir = Path(model_dir)
        self._init_db()

    def _init_db(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS models (
                    model_id TEXT PRIMARY KEY,
                    sport TEXT NOT NULL,
                    model_type TEXT NOT NULL,
                    feature_names TEXT NOT NULL,
                    cv_log_loss REAL,
                    cv_auc REAL,
                    is_active INTEGER DEFAULT 0,
                    created_at TEXT NOT NULL,
                    activated_at TEXT,
                    deactivated_at TEXT,
                    metadata TEXT
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS ab_tests (
                    test_id TEXT PRIMARY KEY,
                    model_a_id TEXT NOT NULL,
                    model_b_id TEXT NOT NULL,
                    sport TEXT NOT NULL,
                    traffic_split REAL DEFAULT 0.5,
                    start_date TEXT NOT NULL,
                    end_date TEXT,
                    status TEXT DEFAULT 'running',
                    results TEXT,
                    FOREIGN KEY (model_a_id) REFERENCES models(model_id),
                    FOREIGN KEY (model_b_id) REFERENCES models(model_id)
                )
            """)

    def register(self, model_id: str, sport: str,
                  metrics: Dict, feature_names: List[str],
                  metadata: Dict = None):
        """Register a trained model in the registry."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """INSERT INTO models
                   (model_id, sport, model_type, feature_names,
                    cv_log_loss, cv_auc, created_at, metadata)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    model_id, sport,
                    metadata.get("model_type", "unknown") if metadata else "unknown",
                    json.dumps(feature_names),
                    metrics.get("log_loss", {}).get("mean"),
                    metrics.get("auc", {}).get("mean"),
                    datetime.utcnow().isoformat(),
                    json.dumps(metadata or {}),
                ),
            )
        logger.info(f"Registered model {model_id}")

    def activate(self, model_id: str, sport: str):
        """Set a model as the active model for a sport."""
        with sqlite3.connect(self.db_path) as conn:
            # Deactivate current active model
            conn.execute(
                """UPDATE models SET is_active = 0, deactivated_at = ?
                   WHERE sport = ? AND is_active = 1""",
                (datetime.utcnow().isoformat(), sport),
            )
            # Activate new model
            conn.execute(
                """UPDATE models SET is_active = 1, activated_at = ?
                   WHERE model_id = ?""",
                (datetime.utcnow().isoformat(), model_id),
            )
        logger.info(f"Activated model {model_id} for {sport}")

    def get_active_model(self, sport: str) -> Optional[Tuple[str, Any]]:
        """Load the currently active model for a sport."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT model_id FROM models WHERE sport = ? AND is_active = 1",
                (sport,),
            )
            row = cursor.fetchone()

        if not row:
            logger.warning(f"No active model for {sport}")
            return None

        model_id = row[0]
        model_path = self.model_dir / model_id / "model.pkl"

        with open(model_path, "rb") as f:
            model = pickle.load(f)

        return model_id, model

    def start_ab_test(self, model_a_id: str, model_b_id: str,
                       sport: str, traffic_split: float = 0.5) -> str:
        """Start an A/B test between two models."""
        test_id = f"ab_{sport}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"

        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """INSERT INTO ab_tests
                   (test_id, model_a_id, model_b_id, sport,
                    traffic_split, start_date)
                   VALUES (?, ?, ?, ?, ?, ?)""",
                (test_id, model_a_id, model_b_id, sport,
                 traffic_split, datetime.utcnow().isoformat()),
            )

        logger.info(
            f"Started A/B test {test_id}: "
            f"{model_a_id} vs {model_b_id} ({traffic_split:.0%} split)"
        )
        return test_id
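
The registry stores the traffic split, but something still has to assign each game to model A or B. Hashing the game ID is a simple deterministic scheme: the same game always routes to the same model, which keeps results attributable. The function below is an illustrative sketch, not part of the registry:

# Deterministic A/B assignment by hashing the game ID (illustrative)
import hashlib


def assign_model(game_id: str, model_a_id: str, model_b_id: str,
                 traffic_split: float = 0.5) -> str:
    """Route a game to model A or B, stably across restarts."""
    digest = hashlib.sha256(game_id.encode()).hexdigest()
    # Map the first 8 hex characters onto [0, 1)
    bucket = int(digest[:8], 16) / 0xFFFFFFFF
    return model_a_id if bucket < traffic_split else model_b_id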

Serving Predictions via API

For real-time predictions, we expose the model through a lightweight API. FastAPI is the preferred choice for Python model serving due to its async support, automatic documentation, and type validation.

# models/serving.py
import json
from pathlib import Path
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Dict, List, Optional
import numpy as np
import pandas as pd
from datetime import datetime
import logging
import uvicorn

logger = logging.getLogger(__name__)

app = FastAPI(
    title="Betting Model Prediction Service",
    version="1.0.0",
)

# Global state (initialized at startup)
model_registry = None
feature_store = None


class PredictionRequest(BaseModel):
    sport: str = Field(..., example="nba")
    home_team_id: str = Field(..., example="LAL")
    away_team_id: str = Field(..., example="BOS")
    game_date: str = Field(..., example="2025-01-15")
    additional_features: Optional[Dict[str, float]] = None


class PredictionResponse(BaseModel):
    home_win_probability: float
    away_win_probability: float
    model_id: str
    prediction_timestamp: str
    feature_values: Dict[str, float]


class HealthResponse(BaseModel):
    status: str
    active_models: Dict[str, str]
    uptime_seconds: float


start_time = datetime.utcnow()


@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Check service health and list active models."""
    return HealthResponse(
        status="healthy",
        active_models={},  # In a full implementation, query the registry
        uptime_seconds=(datetime.utcnow() - start_time).total_seconds(),
    )


@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Generate a prediction for a game."""
    # Load active model
    result = model_registry.get_active_model(request.sport)
    if result is None:
        raise HTTPException(
            status_code=404,
            detail=f"No active model for sport: {request.sport}",
        )
    model_id, model = result

    # Load model metadata to get feature names
    meta_path = (
        Path(model_registry.model_dir) / model_id / "metadata.json"
    )
    with open(meta_path) as f:
        metadata = json.load(f)
    feature_names = metadata["feature_names"]

    # Fetch features from feature store
    home_features = feature_store.get_features(
        entity_type="team",
        entity_id=request.home_team_id,
        event_date=request.game_date,
        feature_names=feature_names,
    )
    away_features = feature_store.get_features(
        entity_type="team",
        entity_id=request.away_team_id,
        event_date=request.game_date,
        feature_names=feature_names,
    )

    # Construct feature vector
    feature_vector = {}
    for fname in feature_names:
        if fname.startswith("home_"):
            base = fname.replace("home_", "")
            feature_vector[fname] = home_features.get(base, np.nan)
        elif fname.startswith("away_"):
            base = fname.replace("away_", "")
            feature_vector[fname] = away_features.get(base, np.nan)
        elif fname == "elo_diff":
            home_elo = home_features.get("elo_rating", 1500)
            away_elo = away_features.get("elo_rating", 1500)
            feature_vector[fname] = home_elo - away_elo
        else:
            feature_vector[fname] = home_features.get(
                fname, away_features.get(fname, np.nan)
            )

    # Add any additional features from the request
    if request.additional_features:
        feature_vector.update(request.additional_features)

    # Create DataFrame and predict
    X = pd.DataFrame([feature_vector])[feature_names]
    X = X.fillna(0)  # Simple imputation for missing values

    home_prob = model.predict_proba(X)[0, 1]

    return PredictionResponse(
        home_win_probability=round(float(home_prob), 4),
        away_win_probability=round(1.0 - float(home_prob), 4),
        model_id=model_id,
        prediction_timestamp=datetime.utcnow().isoformat(),
        feature_values=feature_vector,
    )


def start_server(registry, store, host="0.0.0.0", port=8000):
    """Initialize and start the prediction server."""
    global model_registry, feature_store
    model_registry = registry
    feature_store = store
    uvicorn.run(app, host=host, port=port)
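
Once the server is running, any component (or a quick manual check) can request a prediction over HTTP. For example, with the requests library against a local instance (host, port, and teams are illustrative):

# Illustrative client call to the prediction service
import requests

resp = requests.post(
    "http://localhost:8000/predict",
    json={
        "sport": "nba",
        "home_team_id": "LAL",
        "away_team_id": "BOS",
        "game_date": "2025-01-15",
    },
    timeout=10,
)
resp.raise_for_status()
print(resp.json()["home_win_probability"])
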

31.4 Bet Execution and Portfolio Management

From Model Output to Bet Decision

A model probability by itself does not constitute a bet. The decision to bet requires comparing the model's probability against the market's implied probability, determining whether the edge exceeds your minimum threshold, and sizing the bet appropriately. This is where Chapters 4 and 14's Kelly criterion concepts become operationally real.

# execution/bet_sizing.py
import numpy as np
from typing import Dict, Optional, Tuple
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)


@dataclass
class BetOpportunity:
    game_id: str
    market: str          # "moneyline", "spread", "total"
    selection: str       # "home", "away", "over", "under"
    model_prob: float    # Model's estimated probability
    market_odds: float   # Decimal odds from sportsbook
    sportsbook: str
    sport: str

    @property
    def implied_prob(self) -> float:
        """Market implied probability from decimal odds."""
        return 1.0 / self.market_odds

    @property
    def edge(self) -> float:
        """Expected edge: model prob minus implied prob."""
        return self.model_prob - self.implied_prob

    @property
    def expected_value(self) -> float:
        """Expected value per dollar wagered."""
        return self.model_prob * (self.market_odds - 1) - (1 - self.model_prob)


def american_to_decimal(american_odds: int) -> float:
    """Convert American odds to decimal odds."""
    if american_odds > 0:
        return 1 + american_odds / 100.0
    else:
        return 1 + 100.0 / abs(american_odds)


def kelly_criterion(prob: float, decimal_odds: float) -> float:
    """Full Kelly criterion bet size as fraction of bankroll.

    f* = (bp - q) / b
    where b = decimal_odds - 1, p = win probability, q = 1 - p
    """
    b = decimal_odds - 1
    if b <= 0:
        return 0.0  # No positive payout possible at odds <= 1.0
    q = 1 - prob
    f_star = (b * prob - q) / b
    return max(0.0, f_star)


class BetSizer:
    """Determines bet size using Kelly criterion with safety adjustments."""

    def __init__(self, bankroll: float, kelly_fraction: float = 0.25,
                 max_bet_pct: float = 0.05, min_edge: float = 0.02,
                 max_bet_size: float = 500.0):
        self.bankroll = bankroll
        self.kelly_fraction = kelly_fraction
        self.max_bet_pct = max_bet_pct
        self.min_edge = min_edge
        self.max_bet_size = max_bet_size

    def size_bet(self, opportunity: BetOpportunity
                 ) -> Optional[Dict[str, float]]:
        """Calculate bet size for an opportunity.

        Returns None if the bet doesn't meet criteria.
        Returns dict with sizing details if it passes.
        """
        # Check minimum edge
        if opportunity.edge < self.min_edge:
            logger.debug(
                f"Edge {opportunity.edge:.4f} below threshold "
                f"{self.min_edge} for {opportunity.game_id}"
            )
            return None

        # Check positive expected value
        if opportunity.expected_value <= 0:
            return None

        # Full Kelly
        full_kelly = kelly_criterion(
            opportunity.model_prob, opportunity.market_odds
        )

        if full_kelly <= 0:
            return None

        # Fractional Kelly
        fractional_kelly = full_kelly * self.kelly_fraction

        # Apply maximum bet percentage of bankroll
        capped_fraction = min(fractional_kelly, self.max_bet_pct)

        # Convert to dollar amount
        bet_amount = capped_fraction * self.bankroll

        # Apply absolute maximum
        bet_amount = min(bet_amount, self.max_bet_size)

        # Round to nearest dollar
        bet_amount = round(bet_amount, 0)

        if bet_amount < 1.0:
            return None

        return {
            "bet_amount": bet_amount,
            "full_kelly_fraction": full_kelly,
            "applied_fraction": capped_fraction,
            "edge": opportunity.edge,
            "expected_value": opportunity.expected_value,
            "model_prob": opportunity.model_prob,
            "implied_prob": opportunity.implied_prob,
            "decimal_odds": opportunity.market_odds,
        }

    def update_bankroll(self, new_bankroll: float):
        """Update bankroll after wins/losses."""
        old = self.bankroll
        self.bankroll = new_bankroll
        logger.info(f"Bankroll updated: ${old:.2f} -> ${new_bankroll:.2f}")

Bet Placement Automation

Automating actual bet placement is the most operationally sensitive component. It interacts with real money and sportsbook APIs. Safety is paramount.

# execution/bet_placer.py
import uuid
import sqlite3
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)


@dataclass
class PlacedBet:
    bet_id: str
    game_id: str
    sport: str
    market: str
    selection: str
    sportsbook: str
    odds: float
    stake: float
    model_prob: float
    edge: float
    placed_at: str
    status: str = "pending"  # pending, won, lost, void, cashout
    pnl: float = 0.0


class BetTracker:
    """Tracks all bets placed by the system."""

    def __init__(self, db_path: str = "bets.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS bets (
                    bet_id TEXT PRIMARY KEY,
                    game_id TEXT NOT NULL,
                    sport TEXT NOT NULL,
                    market TEXT NOT NULL,
                    selection TEXT NOT NULL,
                    sportsbook TEXT NOT NULL,
                    odds REAL NOT NULL,
                    stake REAL NOT NULL,
                    model_prob REAL NOT NULL,
                    edge REAL NOT NULL,
                    model_id TEXT,
                    placed_at TEXT NOT NULL,
                    settled_at TEXT,
                    status TEXT DEFAULT 'pending',
                    pnl REAL DEFAULT 0.0,
                    notes TEXT
                )
            """)

    def record_bet(self, bet: PlacedBet, model_id: str = None):
        """Record a placed bet in the database."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """INSERT INTO bets
                   (bet_id, game_id, sport, market, selection,
                    sportsbook, odds, stake, model_prob, edge,
                    model_id, placed_at, status)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    bet.bet_id, bet.game_id, bet.sport, bet.market,
                    bet.selection, bet.sportsbook, bet.odds, bet.stake,
                    bet.model_prob, bet.edge, model_id, bet.placed_at,
                    bet.status,
                ),
            )
        logger.info(
            f"Recorded bet {bet.bet_id}: {bet.selection} @ {bet.odds} "
            f"for ${bet.stake:.2f}"
        )

    def settle_bet(self, bet_id: str, result: str):
        """Settle a bet as won, lost, or void."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT odds, stake FROM bets WHERE bet_id = ?",
                (bet_id,),
            )
            row = cursor.fetchone()
            if not row:
                logger.error(f"Bet {bet_id} not found")
                return

            odds, stake = row
            if result == "won":
                pnl = stake * (odds - 1)
            elif result == "lost":
                pnl = -stake
            else:  # void
                pnl = 0.0

            conn.execute(
                """UPDATE bets
                   SET status = ?, pnl = ?, settled_at = ?
                   WHERE bet_id = ?""",
                (result, pnl, datetime.utcnow().isoformat(), bet_id),
            )
            logger.info(f"Settled bet {bet_id}: {result}, PnL: ${pnl:.2f}")

    def get_daily_pnl(self, date: str = None) -> float:
        """Get total PnL for a given date."""
        if date is None:
            date = datetime.utcnow().strftime("%Y-%m-%d")

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                """SELECT COALESCE(SUM(pnl), 0) FROM bets
                   WHERE placed_at LIKE ?""",
                (f"{date}%",),
            )
            return cursor.fetchone()[0]

    def get_pending_exposure(self) -> float:
        """Get total dollars at risk in pending bets."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT COALESCE(SUM(stake), 0) FROM bets WHERE status = 'pending'"
            )
            return cursor.fetchone()[0]

    def get_performance_summary(self, sport: str = None,
                                  days: int = 30) -> Dict:
        """Get performance summary over recent period."""
        query = """
            SELECT COUNT(*) as total_bets,
                   SUM(CASE WHEN status = 'won' THEN 1 ELSE 0 END) as wins,
                   SUM(CASE WHEN status = 'lost' THEN 1 ELSE 0 END) as losses,
                   SUM(pnl) as total_pnl,
                   SUM(stake) as total_staked,
                   AVG(edge) as avg_edge
            FROM bets
            WHERE status IN ('won', 'lost')
              AND placed_at >= date('now', ?)
        """
        params = [f"-{days} days"]

        if sport:
            query += " AND sport = ?"
            params.append(sport)

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(query, params)
            row = cursor.fetchone()

        total, wins, losses, pnl, staked, avg_edge = row

        return {
            "total_bets": total or 0,
            "wins": wins or 0,
            "losses": losses or 0,
            "win_rate": (wins / total) if total else 0,
            "total_pnl": pnl or 0,
            "total_staked": staked or 0,
            "roi": (pnl / staked) if staked else 0,
            "avg_edge": avg_edge or 0,
        }

Risk Management

Risk management is the guardrail that prevents catastrophic losses. It enforces hard limits that the system cannot override.

# execution/risk_manager.py
import sqlite3
from typing import Tuple
from dataclasses import dataclass
from datetime import datetime
import logging

logger = logging.getLogger(__name__)


@dataclass
class RiskLimits:
    daily_loss_limit: float = 500.0
    max_single_bet: float = 200.0
    max_pending_exposure: float = 1000.0
    max_bets_per_day: int = 20
    max_bets_per_game: int = 2
    max_correlated_exposure: float = 500.0
    min_bankroll_fraction: float = 0.5  # Stop if bankroll drops below 50%


class RiskManager:
    """Enforces risk limits on all bet placement."""

    def __init__(self, bet_tracker, limits: Optional[RiskLimits] = None,
                 initial_bankroll: float = 10000.0):
        self.bet_tracker = bet_tracker
        self.limits = limits or RiskLimits()
        self.initial_bankroll = initial_bankroll
        self._halted = False
        self._halt_reason = ""

    def check_bet(self, stake: float, game_id: str, sport: str,
                  current_bankroll: float) -> Tuple[bool, str]:
        """Check if a bet passes all risk checks.

        Returns (approved: bool, reason: str).
        """
        if self._halted:
            return False, f"Trading halted: {self._halt_reason}"

        # Check bankroll floor
        bankroll_fraction = current_bankroll / self.initial_bankroll
        if bankroll_fraction < self.limits.min_bankroll_fraction:
            self._halt("Bankroll below minimum fraction")
            return False, "Bankroll below minimum fraction - halted"

        # Check single bet size
        if stake > self.limits.max_single_bet:
            return False, f"Stake ${stake:.2f} exceeds max ${self.limits.max_single_bet:.2f}"

        # Check daily loss limit
        daily_pnl = self.bet_tracker.get_daily_pnl()
        if daily_pnl <= -self.limits.daily_loss_limit:
            return False, f"Daily loss limit reached: ${daily_pnl:.2f}"

        # Check pending exposure
        pending = self.bet_tracker.get_pending_exposure()
        if pending + stake > self.limits.max_pending_exposure:
            return False, (
                f"Pending exposure ${pending:.2f} + ${stake:.2f} "
                f"exceeds limit ${self.limits.max_pending_exposure:.2f}"
            )

        # Check daily bet count
        today = datetime.utcnow().strftime("%Y-%m-%d")
        with sqlite3.connect(self.bet_tracker.db_path) as conn:
            cursor = conn.execute(
                "SELECT COUNT(*) FROM bets WHERE placed_at LIKE ?",
                (f"{today}%",),
            )
            daily_count = cursor.fetchone()[0]

        if daily_count >= self.limits.max_bets_per_day:
            return False, f"Daily bet limit ({self.limits.max_bets_per_day}) reached"

        # Check bets per game
        with sqlite3.connect(self.bet_tracker.db_path) as conn:
            cursor = conn.execute(
                """SELECT COUNT(*) FROM bets
                   WHERE game_id = ? AND status = 'pending'""",
                (game_id,),
            )
            game_count = cursor.fetchone()[0]

        if game_count >= self.limits.max_bets_per_game:
            return False, f"Max bets per game ({self.limits.max_bets_per_game}) reached"

        return True, "Approved"

    def _halt(self, reason: str):
        """Emergency halt all betting."""
        self._halted = True
        self._halt_reason = reason
        logger.critical(f"BETTING HALTED: {reason}")

    def resume(self):
        """Resume betting after manual review."""
        logger.info(f"Betting resumed (was halted: {self._halt_reason})")
        self._halted = False
        self._halt_reason = ""

    @property
    def is_halted(self) -> bool:
        return self._halted

Putting It All Together: The Execution Engine
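
The execution engine is the orchestration layer: for each candidate opportunity it sizes the bet, asks the risk manager for approval, records the decision, and only outside of dry-run mode would actually place money at risk.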

# execution/engine.py
import uuid
from datetime import datetime
from typing import List
import logging

from execution.bet_placer import PlacedBet

logger = logging.getLogger(__name__)


class ExecutionEngine:
    """Orchestrates the bet decision and placement process."""

    def __init__(self, bet_sizer, risk_manager, bet_tracker,
                 prediction_service, dry_run: bool = True):
        self.bet_sizer = bet_sizer
        self.risk_manager = risk_manager
        self.bet_tracker = bet_tracker
        self.prediction_service = prediction_service
        self.dry_run = dry_run

    def evaluate_opportunities(self, opportunities: list) -> List[dict]:
        """Evaluate a list of betting opportunities and execute valid bets."""
        executed = []

        for opp in opportunities:
            # Size the bet
            sizing = self.bet_sizer.size_bet(opp)
            if sizing is None:
                continue

            # Risk check
            approved, reason = self.risk_manager.check_bet(
                stake=sizing["bet_amount"],
                game_id=opp.game_id,
                sport=opp.sport,
                current_bankroll=self.bet_sizer.bankroll,
            )

            if not approved:
                logger.info(
                    f"Bet rejected for {opp.game_id}: {reason}"
                )
                continue

            # Create bet record
            bet = PlacedBet(
                bet_id=str(uuid.uuid4())[:8],
                game_id=opp.game_id,
                sport=opp.sport,
                market=opp.market,
                selection=opp.selection,
                sportsbook=opp.sportsbook,
                odds=opp.market_odds,
                stake=sizing["bet_amount"],
                model_prob=opp.model_prob,
                edge=opp.edge,
                placed_at=datetime.utcnow().isoformat(),
            )

            if self.dry_run:
                logger.info(
                    f"[DRY RUN] Would place bet: {bet.selection} "
                    f"@ {bet.odds} for ${bet.stake:.2f} "
                    f"(edge: {bet.edge:.4f})"
                )
            else:
                # In production, this would call the sportsbook API
                logger.info(
                    f"PLACING BET: {bet.selection} @ {bet.odds} "
                    f"for ${bet.stake:.2f}"
                )

            self.bet_tracker.record_bet(bet)
            executed.append({
                "bet": bet,
                "sizing": sizing,
            })

        logger.info(
            f"Evaluated {len(opportunities)} opportunities, "
            f"executed {len(executed)} bets"
        )
        return executed
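
Wiring the components together might look like the following. This is a sketch: bet_sizer, prediction_service, and todays_opportunities stand in for the sizing logic, serving layer, and opportunity scan from earlier in the chapter, and the limit values are illustrative:

# Hypothetical wiring; bet_sizer and prediction_service come from
# earlier layers of the pipeline
tracker = BetTracker(db_path="bets.db")
risk = RiskManager(
    tracker,
    limits=RiskLimits(daily_loss_limit=300.0, max_single_bet=100.0),
    initial_bankroll=10000.0,
)

engine = ExecutionEngine(
    bet_sizer=bet_sizer,
    risk_manager=risk,
    bet_tracker=tracker,
    prediction_service=prediction_service,
    dry_run=True,  # log every decision without placing real bets
)

executed = engine.evaluate_opportunities(todays_opportunities)
for item in executed:
    print(item["bet"].bet_id, f"${item['sizing']['bet_amount']:.2f}")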

31.5 Monitoring, Logging, and Alerting

Tracking Model Performance Over Time

A model that worked last month may not work this month. Markets adapt, team rosters change, and the statistical relationships that your model exploits can weaken or disappear. Continuous monitoring is not optional---it is the most important part of a production system.

# monitoring/metrics.py
import sqlite3
import numpy as np
import pandas as pd
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from sklearn.metrics import brier_score_loss, log_loss
import logging

logger = logging.getLogger(__name__)


class PerformanceTracker:
    """Tracks and analyzes model and betting performance over time."""

    def __init__(self, db_path: str = "monitoring.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS predictions (
                    prediction_id TEXT PRIMARY KEY,
                    game_id TEXT NOT NULL,
                    sport TEXT NOT NULL,
                    model_id TEXT NOT NULL,
                    predicted_prob REAL NOT NULL,
                    actual_outcome INTEGER,
                    odds_at_prediction REAL,
                    predicted_at TEXT NOT NULL,
                    settled_at TEXT
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS daily_metrics (
                    date TEXT NOT NULL,
                    sport TEXT NOT NULL,
                    model_id TEXT NOT NULL,
                    num_predictions INTEGER,
                    brier_score REAL,
                    log_loss_value REAL,
                    calibration_error REAL,
                    total_pnl REAL,
                    roi REAL,
                    PRIMARY KEY (date, sport, model_id)
                )
            """)

    def record_prediction(self, prediction_id: str, game_id: str,
                           sport: str, model_id: str,
                           predicted_prob: float,
                           odds: float = None):
        """Record a model prediction for later evaluation."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """INSERT OR REPLACE INTO predictions
                   (prediction_id, game_id, sport, model_id,
                    predicted_prob, odds_at_prediction, predicted_at)
                   VALUES (?, ?, ?, ?, ?, ?, ?)""",
                (prediction_id, game_id, sport, model_id,
                 predicted_prob, odds, datetime.utcnow().isoformat()),
            )

    def update_outcome(self, game_id: str, outcome: int):
        """Update the actual outcome for a game."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """UPDATE predictions
                   SET actual_outcome = ?, settled_at = ?
                   WHERE game_id = ?""",
                (outcome, datetime.utcnow().isoformat(), game_id),
            )

    def compute_daily_metrics(self, date: str = None):
        """Compute and store daily model performance metrics."""
        if date is None:
            date = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%d")

        with sqlite3.connect(self.db_path) as conn:
            df = pd.read_sql_query(
                """SELECT model_id, sport, predicted_prob, actual_outcome,
                          odds_at_prediction
                   FROM predictions
                   WHERE predicted_at LIKE ? AND actual_outcome IS NOT NULL""",
                conn,
                params=(f"{date}%",),
            )

        if df.empty:
            logger.info(f"No settled predictions for {date}")
            return

        for (model_id, sport), group in df.groupby(["model_id", "sport"]):
            probs = group["predicted_prob"].values
            actuals = group["actual_outcome"].values

            metrics = {
                "num_predictions": len(group),
                "brier_score": brier_score_loss(actuals, probs),
                "log_loss_value": log_loss(
                    actuals, probs, labels=[0, 1]
                ),
                "calibration_error": self._calibration_error(probs, actuals),
            }

            # Compute PnL if odds available
            if group["odds_at_prediction"].notna().any():
                pnl = self._compute_pnl(group)
                metrics["total_pnl"] = pnl["total_pnl"]
                metrics["roi"] = pnl["roi"]

            with sqlite3.connect(self.db_path) as conn:
                conn.execute(
                    """INSERT OR REPLACE INTO daily_metrics
                       (date, sport, model_id, num_predictions,
                        brier_score, log_loss_value, calibration_error,
                        total_pnl, roi)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                    (date, sport, model_id,
                     metrics["num_predictions"],
                     metrics["brier_score"],
                     metrics["log_loss_value"],
                     metrics["calibration_error"],
                     metrics.get("total_pnl"),
                     metrics.get("roi")),
                )

            logger.info(
                f"{date} | {sport} | {model_id}: "
                f"Brier={metrics['brier_score']:.4f}, "
                f"n={metrics['num_predictions']}"
            )

    def _calibration_error(self, probs: np.ndarray,
                            actuals: np.ndarray,
                            n_bins: int = 10) -> float:
        """Expected calibration error (ECE)."""
        bin_edges = np.linspace(0, 1, n_bins + 1)
        ece = 0.0

        for i in range(n_bins):
            mask = (probs >= bin_edges[i]) & (probs < bin_edges[i + 1])
            if mask.sum() == 0:
                continue
            bin_acc = actuals[mask].mean()
            bin_conf = probs[mask].mean()
            ece += mask.sum() / len(probs) * abs(bin_acc - bin_conf)

        return ece

    def _compute_pnl(self, group: pd.DataFrame) -> Dict:
        """Compute PnL from predictions with odds."""
        total_pnl = 0
        total_staked = 0

        for _, row in group.iterrows():
            if pd.isna(row["odds_at_prediction"]):
                continue
            # Assume flat $100 stakes for tracking purposes
            stake = 100
            total_staked += stake
            if row["actual_outcome"] == 1:
                total_pnl += stake * (row["odds_at_prediction"] - 1)
            else:
                total_pnl -= stake

        return {
            "total_pnl": total_pnl,
            "roi": total_pnl / total_staked if total_staked > 0 else 0,
        }

    def get_trend(self, sport: str, model_id: str,
                   metric: str = "brier_score",
                   days: int = 30) -> pd.DataFrame:
        """Get metric trend over time for dashboarding."""
        with sqlite3.connect(self.db_path) as conn:
            df = pd.read_sql_query(
                f"""SELECT date, {metric}
                    FROM daily_metrics
                    WHERE sport = ? AND model_id = ?
                      AND date >= date('now', ?)
                    ORDER BY date""",
                conn,
                params=(sport, model_id, f"-{days} days"),
            )
        return df
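
The tracker is fed at prediction time and reconciled once games settle. A brief sketch of the daily rhythm (all IDs, probabilities, and odds are illustrative):

tracker = PerformanceTracker(db_path="monitoring.db")

# At prediction time: log what the model said and the odds it saw
tracker.record_prediction(
    prediction_id="pred-0001",
    game_id="nba-20250115-LAL-BOS",
    sport="nba",
    model_id="nba-gbm-v3",
    predicted_prob=0.58,
    odds=1.95,
)

# After the game settles: record the outcome (1 = the predicted side won)
tracker.update_outcome("nba-20250115-LAL-BOS", outcome=1)

# Nightly job (e.g., via cron): aggregate yesterday's settled predictions
tracker.compute_daily_metrics()

# Pull a 30-day Brier trend for the dashboard
trend = tracker.get_trend("nba", "nba-gbm-v3", metric="brier_score", days=30)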

Data Drift Detection

Data drift occurs when the statistical properties of input features change over time. If the distribution of features in production diverges significantly from the distribution seen during training, model predictions become unreliable. The detector below combines a two-sample Kolmogorov-Smirnov test, the Population Stability Index (PSI), and simple mean-shift and range-violation checks.

# monitoring/drift_detection.py
import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List, Tuple
import logging

logger = logging.getLogger(__name__)


class DriftDetector:
    """Detects data drift in feature distributions."""

    def __init__(self, reference_data: pd.DataFrame = None,
                 significance_level: float = 0.01):
        self.reference_data = reference_data
        self.significance_level = significance_level
        self.reference_stats = {}

        if reference_data is not None:
            self._compute_reference_stats()

    def _compute_reference_stats(self):
        """Compute summary statistics from reference (training) data."""
        for col in self.reference_data.select_dtypes(include=[np.number]).columns:
            data = self.reference_data[col].dropna()
            self.reference_stats[col] = {
                "mean": data.mean(),
                "std": data.std(),
                "min": data.min(),
                "max": data.max(),
                "q25": data.quantile(0.25),
                "q50": data.quantile(0.50),
                "q75": data.quantile(0.75),
                "values": data.values,  # Keep for KS test
            }

    def set_reference(self, reference_data: pd.DataFrame):
        """Set or update the reference distribution."""
        self.reference_data = reference_data
        self._compute_reference_stats()

    def check_drift(self, current_data: pd.DataFrame
                     ) -> Dict[str, Dict]:
        """Check each feature for distribution drift.

        Uses the Kolmogorov-Smirnov test to compare distributions
        and also checks for range violations.
        """
        results = {}

        for col in current_data.select_dtypes(include=[np.number]).columns:
            if col not in self.reference_stats:
                continue

            current_values = current_data[col].dropna()
            if len(current_values) < 10:
                continue

            ref = self.reference_stats[col]

            # KS test
            ks_stat, ks_pvalue = stats.ks_2samp(
                ref["values"], current_values.values
            )

            # Population Stability Index (PSI)
            psi = self._compute_psi(ref["values"], current_values.values)

            # Mean shift (in standard deviations)
            if ref["std"] > 0:
                mean_shift = abs(current_values.mean() - ref["mean"]) / ref["std"]
            else:
                mean_shift = 0.0

            # Range violation check
            range_violations = (
                (current_values < ref["min"]).sum()
                + (current_values > ref["max"]).sum()
            )
            range_violation_pct = range_violations / len(current_values)

            is_drifted = (
                ks_pvalue < self.significance_level
                or psi > 0.2  # PSI > 0.2 indicates significant shift
                or mean_shift > 3.0
            )

            severity = "none"
            if is_drifted:
                if psi > 0.25 or mean_shift > 5.0:
                    severity = "critical"
                elif psi > 0.1 or mean_shift > 3.0:
                    severity = "warning"
                else:
                    severity = "info"

            results[col] = {
                "is_drifted": is_drifted,
                "severity": severity,
                "ks_statistic": ks_stat,
                "ks_pvalue": ks_pvalue,
                "psi": psi,
                "mean_shift_sigmas": mean_shift,
                "range_violation_pct": range_violation_pct,
                "current_mean": current_values.mean(),
                "reference_mean": ref["mean"],
            }

        drifted_features = [
            f for f, r in results.items() if r["is_drifted"]
        ]
        if drifted_features:
            logger.warning(
                f"Drift detected in {len(drifted_features)} features: "
                f"{drifted_features}"
            )

        return results

    def _compute_psi(self, reference: np.ndarray,
                      current: np.ndarray,
                      n_bins: int = 10) -> float:
        """Compute Population Stability Index."""
        # Create bins from reference data
        bin_edges = np.percentile(
            reference,
            np.linspace(0, 100, n_bins + 1),
        )
        bin_edges[0] = -np.inf
        bin_edges[-1] = np.inf

        # Remove duplicate edges
        bin_edges = np.unique(bin_edges)

        ref_counts = np.histogram(reference, bins=bin_edges)[0]
        cur_counts = np.histogram(current, bins=bin_edges)[0]

        # Convert to proportions with smoothing
        ref_pct = (ref_counts + 1) / (ref_counts.sum() + len(ref_counts))
        cur_pct = (cur_counts + 1) / (cur_counts.sum() + len(cur_counts))

        psi = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))
        return float(psi)
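
PSI deserves a brief note: it sums (current share - reference share) * ln(current share / reference share) across bins of the reference distribution, and the conventional reading is that values below 0.1 indicate no meaningful shift, 0.1 to 0.2 a moderate shift, and above 0.2 a significant one, which is why the detector treats 0.2 as its drift threshold. Usage is straightforward; the feature frames below are placeholders for data pulled from your feature store:

# Illustrative: train_features is the training snapshot,
# recent_features the most recent production batch
detector = DriftDetector(reference_data=train_features,
                         significance_level=0.01)

results = detector.check_drift(recent_features)
for feature, report in results.items():
    if report["is_drifted"]:
        print(
            f"{feature}: severity={report['severity']}, "
            f"PSI={report['psi']:.3f}, "
            f"KS p-value={report['ks_pvalue']:.4f}, "
            f"mean shift={report['mean_shift_sigmas']:.1f} sigma"
        )

# Refresh the baseline whenever the model is retrained
detector.set_reference(new_train_features)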

Alerting on Degradation

When monitoring detects problems, the system must notify you immediately. Alerts should be actionable and prioritized.

# monitoring/alerts.py
import json
import sqlite3
import smtplib
import requests
import pandas as pd
from email.mime.text import MIMEText
from datetime import datetime
from typing import Dict, List
from dataclasses import dataclass, field
import logging

logger = logging.getLogger(__name__)


@dataclass
class Alert:
    severity: str       # "critical", "warning", "info"
    category: str       # "model_drift", "performance", "risk", "system"
    title: str
    message: str
    metadata: Dict = field(default_factory=dict)
    timestamp: str = ""

    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.utcnow().isoformat()


class AlertManager:
    """Manages alert generation and delivery."""

    def __init__(self, config: Dict = None):
        self.config = config or {}
        self.alert_history: List[Alert] = []
        self.suppressed_alerts: Dict[str, datetime] = {}
        self.suppression_minutes = 60  # Don't repeat same alert within 1 hour

    def send_alert(self, alert: Alert):
        """Send an alert through configured channels."""
        # Check suppression
        alert_key = f"{alert.category}:{alert.title}"
        if alert_key in self.suppressed_alerts:
            last_sent = self.suppressed_alerts[alert_key]
            elapsed = (
                datetime.utcnow() - last_sent
            ).total_seconds() / 60
            if elapsed < self.suppression_minutes:
                logger.debug(f"Alert suppressed: {alert_key}")
                return

        self.alert_history.append(alert)
        self.suppressed_alerts[alert_key] = datetime.utcnow()

        # Log the alert
        log_method = {
            "critical": logger.critical,
            "warning": logger.warning,
            "info": logger.info,
        }.get(alert.severity, logger.info)

        log_method(f"ALERT [{alert.severity}] {alert.title}: {alert.message}")

        # Send via configured channels
        if "slack_webhook" in self.config:
            self._send_slack(alert)
        if "email" in self.config:
            self._send_email(alert)

    def _send_slack(self, alert: Alert):
        """Send alert to Slack via webhook."""
        emoji = {
            "critical": ":rotating_light:",
            "warning": ":warning:",
            "info": ":information_source:",
        }.get(alert.severity, "")

        payload = {
            "text": f"{emoji} *{alert.title}*\n{alert.message}",
            "attachments": [
                {
                    "color": {
                        "critical": "#FF0000",
                        "warning": "#FFA500",
                        "info": "#0000FF",
                    }.get(alert.severity, "#808080"),
                    "fields": [
                        {"title": k, "value": str(v), "short": True}
                        for k, v in alert.metadata.items()
                    ],
                }
            ],
        }

        try:
            response = requests.post(
                self.config["slack_webhook"],
                json=payload,
                timeout=10,
            )
            response.raise_for_status()
        except Exception as e:
            logger.error(f"Failed to send Slack alert: {e}")

    def _send_email(self, alert: Alert):
        """Send alert via email."""
        email_config = self.config["email"]

        msg = MIMEText(
            f"{alert.message}\n\n"
            f"Category: {alert.category}\n"
            f"Severity: {alert.severity}\n"
            f"Time: {alert.timestamp}\n"
            f"Metadata: {json.dumps(alert.metadata, indent=2)}"
        )
        msg["Subject"] = f"[{alert.severity.upper()}] {alert.title}"
        msg["From"] = email_config["from"]
        msg["To"] = email_config["to"]

        try:
            with smtplib.SMTP(email_config["smtp_host"],
                              email_config["smtp_port"]) as server:
                server.starttls()
                server.login(email_config["username"],
                             email_config["password"])
                server.send_message(msg)
        except Exception as e:
            logger.error(f"Failed to send email alert: {e}")


class PipelineMonitor:
    """High-level monitor that ties together all monitoring components."""

    def __init__(self, performance_tracker, drift_detector,
                 alert_manager, bet_tracker):
        self.perf = performance_tracker
        self.drift = drift_detector
        self.alerts = alert_manager
        self.bets = bet_tracker

    def run_daily_checks(self):
        """Run all daily monitoring checks."""
        logger.info("Running daily monitoring checks")

        self._check_model_performance()
        self._check_data_drift()
        self._check_risk_metrics()

    def _check_model_performance(self):
        """Alert on model performance degradation."""
        # Compare recent Brier score to historical average
        with sqlite3.connect(self.perf.db_path) as conn:
            recent = pd.read_sql_query(
                """SELECT AVG(brier_score) as recent_brier
                   FROM daily_metrics
                   WHERE date >= date('now', '-7 days')""",
                conn,
            )
            baseline = pd.read_sql_query(
                """SELECT AVG(brier_score) as baseline_brier
                   FROM daily_metrics
                   WHERE date BETWEEN date('now', '-90 days')
                     AND date('now', '-7 days')""",
                conn,
            )

        if recent.empty or baseline.empty:
            return

        recent_brier = recent["recent_brier"].iloc[0]
        baseline_brier = baseline["baseline_brier"].iloc[0]

        # AVG() over an empty window returns NULL, which pandas reads as NaN;
        # NaN is truthy, so guard explicitly before computing the relative change
        if pd.notna(recent_brier) and pd.notna(baseline_brier) and baseline_brier > 0:
            degradation = (recent_brier - baseline_brier) / baseline_brier

            if degradation > 0.20:  # 20% worse
                self.alerts.send_alert(Alert(
                    severity="critical",
                    category="performance",
                    title="Model Performance Degradation",
                    message=(
                        f"Brier score increased {degradation:.1%} "
                        f"(baseline: {baseline_brier:.4f}, "
                        f"recent: {recent_brier:.4f}). "
                        f"Consider retraining."
                    ),
                    metadata={
                        "baseline_brier": f"{baseline_brier:.4f}",
                        "recent_brier": f"{recent_brier:.4f}",
                        "degradation_pct": f"{degradation:.1%}",
                    },
                ))
            elif degradation > 0.10:  # 10% worse
                self.alerts.send_alert(Alert(
                    severity="warning",
                    category="performance",
                    title="Model Performance Warning",
                    message=(
                        f"Brier score increased {degradation:.1%}. "
                        f"Monitor closely."
                    ),
                ))

    def _check_data_drift(self):
        """Check for feature distribution drift."""
        # This would pull recent feature data and compare against training
        # Simplified version showing the alert logic
        pass

    def _check_risk_metrics(self):
        """Alert on adverse risk conditions."""
        summary = self.bets.get_performance_summary(days=7)

        if summary["total_bets"] > 10 and summary["roi"] < -0.15:
            self.alerts.send_alert(Alert(
                severity="warning",
                category="risk",
                title="Negative ROI Alert",
                message=(
                    f"7-day ROI is {summary['roi']:.1%} "
                    f"across {summary['total_bets']} bets. "
                    f"Total PnL: ${summary['total_pnl']:.2f}"
                ),
                metadata=summary,
            ))

        pending = self.bets.get_pending_exposure()
        if pending > 2000:
            self.alerts.send_alert(Alert(
                severity="warning",
                category="risk",
                title="High Pending Exposure",
                message=f"Current pending exposure: ${pending:.2f}",
            ))
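
A sketch of how the monitoring stack might be wired and scheduled. The webhook URL and email settings are placeholders, and the perf_tracker, drift_detector, and bet_tracker instances come from the classes above:

alert_config = {
    "slack_webhook": "https://hooks.slack.com/services/XXX/YYY/ZZZ",  # placeholder
    "email": {
        "from": "pipeline@example.com",
        "to": "you@example.com",
        "smtp_host": "smtp.example.com",
        "smtp_port": 587,
        "username": "pipeline@example.com",
        "password": "...",  # load from a secrets manager, never hardcode
    },
}

monitor = PipelineMonitor(
    performance_tracker=perf_tracker,
    drift_detector=drift_detector,
    alert_manager=AlertManager(config=alert_config),
    bet_tracker=bet_tracker,
)

# Run once per day after results settle, e.g., from cron:
#   0 6 * * * python -m monitoring.daily_checks
monitor.run_daily_checks()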

Dashboard Concepts

A monitoring dashboard provides at-a-glance visibility into the pipeline's health. While a full web dashboard (using Dash, Streamlit, or Grafana) is beyond the scope of this chapter, here are the key metrics to display:

Model Health Panel:
- Current Brier score (7-day rolling)
- Calibration plot (predicted vs. actual probability)
- Feature importance drift over time
- Prediction volume trend

Betting Performance Panel:
- Cumulative PnL curve
- Daily ROI trend
- Win rate by sport and market
- Edge distribution histogram

Risk Panel:
- Current pending exposure
- Daily loss vs. limit
- Bet count vs. limit
- Bankroll trajectory

System Health Panel:
- Data ingestion success rate
- API latency
- Feature freshness (time since last update)
- Alert history

A minimal Streamlit dashboard can be created with surprisingly little code:

# monitoring/dashboard.py (run with: streamlit run dashboard.py)
import sqlite3
import pandas as pd
import streamlit as st


def load_daily_metrics(db_path="monitoring.db", days=90):
    with sqlite3.connect(db_path) as conn:
        return pd.read_sql_query(
            """SELECT * FROM daily_metrics
               WHERE date >= date('now', ?)
               ORDER BY date""",
            conn,
            params=(f"-{days} days",),
        )


def load_bet_history(db_path="bets.db", days=90):
    with sqlite3.connect(db_path) as conn:
        return pd.read_sql_query(
            """SELECT * FROM bets
               WHERE placed_at >= date('now', ?)
               ORDER BY placed_at""",
            conn,
            params=(f"-{days} days",),
        )


def main():
    st.title("ML Betting Pipeline Dashboard")

    # Sidebar controls
    days = st.sidebar.slider("Lookback Period (days)", 7, 365, 90)

    # Model metrics
    st.header("Model Performance")
    metrics = load_daily_metrics(days=days)
    if not metrics.empty:
        col1, col2, col3 = st.columns(3)
        with col1:
            recent_brier = metrics.tail(7)["brier_score"].mean()
            st.metric("7-Day Brier Score", f"{recent_brier:.4f}")
        with col2:
            recent_ll = metrics.tail(7)["log_loss_value"].mean()
            st.metric("7-Day Log Loss", f"{recent_ll:.4f}")
        with col3:
            daily_preds = metrics.tail(7)["num_predictions"].mean()
            st.metric("Avg Daily Predictions", f"{daily_preds:.0f}")

        st.line_chart(metrics.set_index("date")["brier_score"])

    # Betting performance
    st.header("Betting Performance")
    bets = load_bet_history(days=days)
    if not bets.empty:
        settled = bets[bets["status"].isin(["won", "lost"])]
        if not settled.empty:
            settled = settled.sort_values("placed_at")
            settled["cumulative_pnl"] = settled["pnl"].cumsum()
            st.line_chart(
                settled.set_index("placed_at")["cumulative_pnl"]
            )

            col1, col2, col3, col4 = st.columns(4)
            with col1:
                st.metric("Total Bets", len(settled))
            with col2:
                win_rate = (settled["status"] == "won").mean()
                st.metric("Win Rate", f"{win_rate:.1%}")
            with col3:
                total_pnl = settled["pnl"].sum()
                st.metric("Total PnL", f"${total_pnl:.2f}")
            with col4:
                roi = settled["pnl"].sum() / settled["stake"].sum()
                st.metric("ROI", f"{roi:.1%}")


if __name__ == "__main__":
    main()

31.6 Chapter Summary

This chapter presented a complete, production-ready ML betting pipeline from architecture to monitoring. The key takeaways are:

Architecture matters. Start with a well-structured monolith using clear module boundaries. Separate concerns---ingestion, features, models, execution, monitoring---into distinct modules even if they run in a single process. This makes the system testable, debuggable, and ready for future decomposition into services.

The feature store is the backbone. A feature store with point-in-time correctness, versioning, and efficient retrieval enables reproducible model training, prevents data leakage in backtesting, and ensures that production predictions use the same feature definitions as training.

The model registry enables safe iteration. Track every model version, its training metrics, and which one is currently active. This supports A/B testing, gradual rollouts, and instant rollback when a new model underperforms.

Risk management is non-negotiable. Hard limits on daily losses, position sizes, and total exposure prevent catastrophic outcomes. These limits should be enforced at the system level, not left to human discipline. The risk manager has veto power over every bet.

Monitoring closes the loop. Track model calibration, data drift, and betting performance continuously. Automated alerts catch degradation before it costs you money. A dashboard provides at-a-glance health visibility for the human operator.

The pipeline described in this chapter is not a toy---it is a battle-tested architecture used by quantitative betting operations. The specific implementations can be adapted to your scale, sport, and risk tolerance, but the architectural principles are universal. Build the pipeline right, and it becomes a durable competitive advantage. Build it wrong, and even the best model will fail in production.

In the next chapter, we turn to Natural Language Processing and explore how to extract predictive signals from text data---news articles, injury reports, social media, and more.