Case Study 1: Building an NBA Prediction Pipeline from Scratch


Executive Summary

This case study walks through the complete construction of a production-grade NBA game prediction pipeline. Starting from raw game data, we build every layer: data ingestion with retry logic, a feature store with versioning and point-in-time retrieval, a model training pipeline with time-series cross-validation and calibration, a prediction serving endpoint, and a bet execution engine with Kelly criterion sizing and risk management. The system processes three seasons of NBA data (2021--2024), trains a gradient-boosted model on 18 features, and simulates a full season of automated betting. The pipeline achieves a Brier score of 0.2248 on the held-out 2023--24 season and generates a simulated ROI of +5.3% over 312 bets, demonstrating that a disciplined, well-engineered pipeline can extract consistent value even from a relatively simple model.


Background

The Problem

Building a model that predicts NBA game outcomes is straightforward. Building a system that reliably ingests data, computes features without leakage, trains models on schedule, serves predictions with low latency, sizes bets according to edge, enforces risk limits, and monitors everything continuously is an engineering challenge of an entirely different magnitude. Most aspiring sports bettors build a Jupyter notebook that backtests well and then struggle to operationalize it. This case study bridges that gap.

System Requirements

Our pipeline must satisfy the following requirements:

  - Ingest game data, odds, and schedule information from external sources daily
  - Compute 18 features per game with guaranteed point-in-time correctness
  - Train a model weekly using an expanding window of historical data
  - Serve predictions for each day's games by 10:00 AM
  - Size bets using fractional Kelly criterion with configurable risk limits
  - Log every prediction, decision, and execution for audit and analysis
  - Detect anomalies in data, features, model performance, and P&L


Methodology

Step 1: Data Ingestion Layer

We build a data ingestion client that fetches NBA game results and odds with retry logic and stores raw data in SQLite. The storage class is shown first; a sketch of the retrying fetch helper follows it.

"""NBA Pipeline Data Ingestion Layer.

Fetches game data and odds with retry logic, validates schemas,
and stores raw data with deduplication.
"""

import sqlite3
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


@dataclass
class GameRecord:
    """A single NBA game record."""
    game_id: str
    game_date: str
    home_team: str
    away_team: str
    home_score: int
    away_score: int
    home_spread: float
    total_line: float
    home_ml: int
    away_ml: int


class NBADataIngestion:
    """Ingest and store NBA game data with deduplication."""

    def __init__(self, db_path: str = "nba_pipeline.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self) -> None:
        """Create tables if they do not exist."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS games (
                    game_id TEXT PRIMARY KEY,
                    game_date TEXT NOT NULL,
                    home_team TEXT NOT NULL,
                    away_team TEXT NOT NULL,
                    home_score INTEGER,
                    away_score INTEGER,
                    home_spread REAL,
                    total_line REAL,
                    home_ml INTEGER,
                    away_ml INTEGER,
                    ingested_at TEXT NOT NULL
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_games_date
                ON games (game_date)
            """)

    def store_games(self, games: List[GameRecord]) -> int:
        """Store games with upsert logic for idempotency."""
        stored = 0
        with sqlite3.connect(self.db_path) as conn:
            for game in games:
                conn.execute(
                    """INSERT OR REPLACE INTO games
                       (game_id, game_date, home_team, away_team,
                        home_score, away_score, home_spread,
                        total_line, home_ml, away_ml, ingested_at)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                    (game.game_id, game.game_date, game.home_team,
                     game.away_team, game.home_score, game.away_score,
                     game.home_spread, game.total_line, game.home_ml,
                     game.away_ml, datetime.utcnow().isoformat()),
                )
                stored += 1
        logger.info(f"Stored {stored} game records")
        return stored

    def get_games(self, start_date: str, end_date: str) -> pd.DataFrame:
        """Retrieve games within a date range."""
        with sqlite3.connect(self.db_path) as conn:
            return pd.read_sql_query(
                "SELECT * FROM games WHERE game_date BETWEEN ? AND ?",
                conn,
                params=[start_date, end_date],
            )
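
The storage layer above is deterministic; the fetch side talks to an external source and therefore needs the retry logic the requirements call for. A minimal sketch of a retrying fetch helper, assuming a JSON-over-HTTP source (the requests library, the URL, and the backoff constants here are illustrative, not a specific provider's API):

import time

import requests  # assumed HTTP client


def fetch_with_retry(url: str, max_attempts: int = 3,
                     backoff_base: float = 2.0) -> dict:
    """Fetch JSON with exponential backoff on transient failures."""
    for attempt in range(1, max_attempts + 1):
        try:
            resp = requests.get(url, timeout=10)
            resp.raise_for_status()
            return resp.json()
        except requests.RequestException as exc:
            if attempt == max_attempts:
                raise  # retries exhausted; surface the error
            wait = backoff_base ** attempt
            logger.warning(
                "Fetch attempt %d/%d failed (%s); retrying in %.0fs",
                attempt, max_attempts, exc, wait,
            )
            time.sleep(wait)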

Step 2: Feature Store with Versioning

The feature store provides versioned, point-in-time-correct feature storage and retrieval.

class PipelineFeatureStore:
    """Feature store supporting versioning and temporal queries."""

    def __init__(self, db_path: str = "nba_features.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self) -> None:
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS features (
                    entity_id TEXT NOT NULL,
                    feature_name TEXT NOT NULL,
                    feature_value REAL,
                    feature_version TEXT NOT NULL,
                    event_date TEXT NOT NULL,
                    computed_at TEXT NOT NULL,
                    PRIMARY KEY (entity_id, feature_name,
                                 feature_version, event_date)
                )
            """)

    def store_batch(self, records: List[Dict]) -> None:
        """Store a batch of feature records."""
        with sqlite3.connect(self.db_path) as conn:
            conn.executemany(
                """INSERT OR REPLACE INTO features
                   (entity_id, feature_name, feature_value,
                    feature_version, event_date, computed_at)
                   VALUES (:entity_id, :feature_name, :feature_value,
                           :feature_version, :event_date, :computed_at)""",
                records,
            )

    def get_point_in_time(
        self, entity_id: str, event_date: str,
        feature_names: List[str], version: str = "v1"
    ) -> Dict[str, float]:
        """Retrieve the latest feature values as of a specific date."""
        placeholders = ",".join("?" * len(feature_names))
        # SQLite's documented bare-column rule: with a single MAX()
        # aggregate, non-aggregated columns are taken from the row that
        # holds the maximum, i.e. the most recent value per feature.
        query = f"""
            SELECT feature_name, feature_value, MAX(event_date)
            FROM features
            WHERE entity_id = ?
              AND event_date <= ?
              AND feature_version = ?
              AND feature_name IN ({placeholders})
            GROUP BY feature_name
        """
        params = [entity_id, event_date, version] + feature_names
        with sqlite3.connect(self.db_path) as conn:
            rows = conn.execute(query, params).fetchall()
        return {name: value for name, value, _ in rows}
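
A quick usage sketch (team code and dates are illustrative): values computed after the games of March 1 are stored under that event date, and a query as of March 5 returns the latest values at or before that date, never anything later. This is the point-in-time guarantee that keeps backtests leakage-free.

store = PipelineFeatureStore()
store.store_batch([{
    "entity_id": "BOS",
    "feature_name": "home_net_rating_10",
    "feature_value": 7.2,
    "feature_version": "v1",
    "event_date": "2024-03-01",
    "computed_at": datetime.utcnow().isoformat(),
}])

# Returns the 2024-03-01 value; rows dated after 2024-03-05 are ignored.
feats = store.get_point_in_time(
    "BOS", "2024-03-05", ["home_net_rating_10"]
)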

Step 3: Feature Computation

We compute 18 features per game, organized into three categories: team efficiency, momentum, and situational context. A walk-forward usage sketch follows the class.

class NBAFeatureComputer:
    """Compute NBA prediction features from raw game data."""

    FEATURE_NAMES = [
        "home_off_rating_10", "home_def_rating_10",
        "home_net_rating_10", "away_off_rating_10",
        "away_def_rating_10", "away_net_rating_10",
        "home_win_pct_10", "away_win_pct_10",
        "home_ewma_margin", "away_ewma_margin",
        "home_margin_trend", "away_margin_trend",
        "home_rest_days", "away_rest_days",
        "rest_advantage", "home_b2b", "away_b2b",
        "elo_diff",
    ]

    def __init__(self, elo_k: float = 20.0):
        self.elo_k = elo_k
        self.elo_ratings: Dict[str, float] = {}

    def compute_for_game(
        self, game_date: str, home_team: str, away_team: str,
        historical_games: pd.DataFrame,
    ) -> Dict[str, float]:
        """Compute all features for a single upcoming game."""
        prior = historical_games[
            historical_games["game_date"] < game_date
        ].sort_values("game_date")

        features = {}

        # Team efficiency features (10-game rolling)
        for prefix, team in [("home", home_team), ("away", away_team)]:
            team_games = self._get_team_games(prior, team)
            recent = team_games.tail(10)

            if len(recent) >= 5:
                features[f"{prefix}_off_rating_10"] = float(
                    recent["off_rating"].mean()
                )
                features[f"{prefix}_def_rating_10"] = float(
                    recent["def_rating"].mean()
                )
                features[f"{prefix}_net_rating_10"] = float(
                    (recent["off_rating"] - recent["def_rating"]).mean()
                )
                features[f"{prefix}_win_pct_10"] = float(
                    recent["win"].mean()
                )
            else:
                for suffix in ["off_rating_10", "def_rating_10",
                               "net_rating_10", "win_pct_10"]:
                    features[f"{prefix}_{suffix}"] = np.nan

            # Momentum features
            if len(team_games) >= 5:
                margins = (
                    team_games["pts_scored"] - team_games["pts_allowed"]
                )
                features[f"{prefix}_ewma_margin"] = float(
                    margins.ewm(alpha=0.3).mean().iloc[-1]
                )
                recent_margins = margins.tail(5).values
                x = np.arange(len(recent_margins))
                slope = np.polyfit(x, recent_margins, 1)[0]
                features[f"{prefix}_margin_trend"] = float(slope)
            else:
                features[f"{prefix}_ewma_margin"] = np.nan
                features[f"{prefix}_margin_trend"] = np.nan

            # Rest days
            if len(team_games) > 0:
                last_date = pd.to_datetime(team_games.iloc[-1]["game_date"])
                current_date = pd.to_datetime(game_date)
                rest = (current_date - last_date).days
                features[f"{prefix}_rest_days"] = min(rest, 14)
                features[f"{prefix}_b2b"] = 1 if rest == 1 else 0
            else:
                features[f"{prefix}_rest_days"] = np.nan
                features[f"{prefix}_b2b"] = 0

        # Derived features
        home_rest = features.get("home_rest_days", 3)
        away_rest = features.get("away_rest_days", 3)
        if not (pd.isna(home_rest) or pd.isna(away_rest)):
            features["rest_advantage"] = home_rest - away_rest
        else:
            features["rest_advantage"] = 0.0

        # Elo difference, including a 100-point home-court bonus
        home_elo = self.elo_ratings.get(home_team, 1500.0)
        away_elo = self.elo_ratings.get(away_team, 1500.0)
        features["elo_diff"] = home_elo - away_elo + 100.0

        return features

    def update_elo(self, home_team: str, away_team: str,
                   home_won: bool) -> None:
        """Update Elo ratings after a game result."""
        ha = self.elo_ratings.get(home_team, 1500.0)
        aa = self.elo_ratings.get(away_team, 1500.0)
        # Same 100-point home-court bonus as in compute_for_game
        expected = 1.0 / (1.0 + 10 ** ((aa - ha - 100) / 400.0))
        actual = 1.0 if home_won else 0.0
        self.elo_ratings[home_team] = ha + self.elo_k * (actual - expected)
        self.elo_ratings[away_team] = aa + self.elo_k * (
            (1 - actual) - (1 - expected)
        )

    def _get_team_games(self, games: pd.DataFrame,
                        team: str) -> pd.DataFrame:
        """Get all games for a team with standardized columns.

        Raw points stand in for offensive/defensive ratings here:
        without possession counts, per-game points are the closest
        available proxy.
        """
        home_games = games[games["home_team"] == team].assign(
            pts_scored=lambda df: df["home_score"],
            pts_allowed=lambda df: df["away_score"],
            off_rating=lambda df: df["home_score"].astype(float),
            def_rating=lambda df: df["away_score"].astype(float),
            win=lambda df: (df["home_score"] > df["away_score"]).astype(int),
        )
        away_games = games[games["away_team"] == team].assign(
            pts_scored=lambda df: df["away_score"],
            pts_allowed=lambda df: df["home_score"],
            off_rating=lambda df: df["away_score"].astype(float),
            def_rating=lambda df: df["home_score"].astype(float),
            win=lambda df: (df["away_score"] > df["home_score"]).astype(int),
        )

        return pd.concat([home_games, away_games]).sort_values("game_date")
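
Point-in-time correctness also depends on how the computer is driven. A schematic walk-forward loop (the ingestion instance and date range are assumed) computes each game's features from strictly earlier games and updates Elo only after the result is known:

computer = NBAFeatureComputer()
games = ingestion.get_games("2021-10-01", "2024-06-30")

rows = []
for game in games.sort_values("game_date").itertuples():
    # Features see only games dated strictly before this one.
    feats = computer.compute_for_game(
        game.game_date, game.home_team, game.away_team, games
    )
    feats["game_id"] = game.game_id
    rows.append(feats)
    # Elo is updated only after the outcome, never before.
    computer.update_elo(
        game.home_team, game.away_team,
        game.home_score > game.away_score,
    )

X = pd.DataFrame(rows).set_index("game_id")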

Step 4: Training and Serving

The training pipeline evaluates a gradient-boosted model with expanding-window time-series cross-validation, then fits a calibrated final model; a sketch of the serving layer follows the trainer.

from sklearn.ensemble import GradientBoostingClassifier
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import brier_score_loss, log_loss
from sklearn.model_selection import TimeSeriesSplit


class NBAModelTrainer:
    """Train and evaluate NBA prediction models."""

    def train_and_evaluate(
        self, X: pd.DataFrame, y: pd.Series
    ) -> Tuple[object, Dict[str, float]]:
        """Train with time-series CV and return calibrated model."""
        tscv = TimeSeriesSplit(n_splits=5)
        brier_scores = []
        log_losses = []

        params = {
            "n_estimators": 200, "max_depth": 4,
            "learning_rate": 0.05, "subsample": 0.8,
            "min_samples_leaf": 15, "random_state": 42,
        }

        for train_idx, val_idx in tscv.split(X):
            X_tr = X.iloc[train_idx].fillna(X.iloc[train_idx].median())
            X_va = X.iloc[val_idx].fillna(X.iloc[train_idx].median())
            y_tr, y_va = y.iloc[train_idx], y.iloc[val_idx]

            model = GradientBoostingClassifier(**params)
            model.fit(X_tr, y_tr)

            probs = model.predict_proba(X_va)[:, 1]
            brier_scores.append(brier_score_loss(y_va, probs))
            log_losses.append(log_loss(y_va, probs))

        # Train the final calibrated model. CalibratedClassifierCV with
        # cv=3 clones and refits the base estimator internally, so it is
        # passed unfitted.
        X_full = X.fillna(X.median())
        base_model = GradientBoostingClassifier(**params)

        calibrated = CalibratedClassifierCV(
            base_model, cv=3, method="isotonic"
        )
        calibrated.fit(X_full, y)

        metrics = {
            "brier_mean": float(np.mean(brier_scores)),
            "brier_std": float(np.std(brier_scores)),
            "logloss_mean": float(np.mean(log_losses)),
            "logloss_std": float(np.std(log_losses)),
        }

        return calibrated, metrics
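
The serving layer itself is thin once the model and features exist. A minimal sketch (imputation uses the training medians so serving matches training; wiring it behind the 10:00 AM schedule is left to the orchestrator):

class PredictionServer:
    """Serve calibrated win probabilities for a day's slate."""

    def __init__(self, model, feature_names: List[str],
                 train_medians: pd.Series):
        self.model = model
        self.feature_names = feature_names
        # Training medians, so serving-time imputation matches training.
        self.train_medians = train_medians

    def predict_slate(self, slate: pd.DataFrame) -> pd.DataFrame:
        """Predict one day's games; `slate` is assumed to hold one row
        per game with the 18 feature columns already populated."""
        X = slate[self.feature_names].fillna(self.train_medians)
        out = slate[["game_id"]].copy()
        out["home_win_prob"] = self.model.predict_proba(X)[:, 1]
        out["served_at"] = datetime.utcnow().isoformat()
        return out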

Step 5: Bet Execution with Risk Management

The execution engine converts model probabilities into stakes: it requires a minimum edge over the market's implied probability, sizes bets with fractional Kelly, and caps exposure per bet and per day. A worked sizing example follows the engine.

@dataclass
class BetRecommendation:
    """A recommended bet from the pipeline."""
    game_id: str
    side: str
    model_prob: float
    market_prob: float
    edge: float
    odds_american: int
    kelly_fraction: float
    recommended_size: float


class BetExecutionEngine:
    """Execute bets with Kelly sizing and risk management."""

    def __init__(self, bankroll: float, max_bet_pct: float = 0.03,
                 daily_limit_pct: float = 0.15,
                 kelly_fraction: float = 0.25,
                 min_edge: float = 0.02):
        self.bankroll = bankroll
        self.max_bet_pct = max_bet_pct
        self.daily_limit_pct = daily_limit_pct
        self.kelly_fraction = kelly_fraction
        self.min_edge = min_edge
        self.daily_wagered = 0.0
        self.audit_log: List[Dict] = []

    def evaluate_game(self, game_id: str, model_prob: float,
                      odds_american: int, side: str
                      ) -> Optional[BetRecommendation]:
        """Evaluate whether a game warrants a bet."""
        market_prob = self._implied_prob(odds_american)
        edge = model_prob - market_prob

        if edge < self.min_edge:
            return None

        decimal_odds = self._american_to_decimal(odds_american)
        b = decimal_odds - 1
        kelly = (b * model_prob - (1 - model_prob)) / b
        kelly = max(0, kelly) * self.kelly_fraction

        raw_size = kelly * self.bankroll
        capped_size = min(raw_size, self.bankroll * self.max_bet_pct)
        daily_remaining = (
            self.bankroll * self.daily_limit_pct - self.daily_wagered
        )
        final_size = min(capped_size, max(0, daily_remaining))

        if final_size < 5.0:
            return None

        return BetRecommendation(
            game_id=game_id, side=side,
            model_prob=model_prob, market_prob=market_prob,
            edge=edge, odds_american=odds_american,
            kelly_fraction=kelly, recommended_size=round(final_size, 2),
        )

    def execute(self, rec: BetRecommendation) -> Dict:
        """Execute a bet and update state."""
        self.daily_wagered += rec.recommended_size
        record = {
            "game_id": rec.game_id,
            "side": rec.side,
            "size": rec.recommended_size,
            "odds": rec.odds_american,
            "edge": round(rec.edge, 4),
            "executed_at": datetime.utcnow().isoformat(),
        }
        self.audit_log.append(record)
        return record
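
    def reset_day(self) -> None:
        """Reset the daily wager counter.

        Assumed to be called by the scheduler before each day's first
        evaluation; without the reset, the daily limit would carry
        over and eventually block all bets.
        """
        self.daily_wagered = 0.0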

    def _implied_prob(self, american: int) -> float:
        if american > 0:
            return 100.0 / (american + 100.0)
        return abs(american) / (abs(american) + 100.0)

    def _american_to_decimal(self, american: int) -> float:
        if american > 0:
            return 1.0 + american / 100.0
        return 1.0 + 100.0 / abs(american)
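
To make the sizing concrete, here is a worked pass through evaluate_game with illustrative numbers (a $10,000 bankroll and a 55% model probability against -110 odds):

engine = BetExecutionEngine(bankroll=10_000)

# Implied probability at -110: 110 / 210 = 0.5238
# Edge: 0.55 - 0.5238 = 0.0262, clearing the 2% threshold
# Decimal odds: 1 + 100/110 = 1.9091, so b = 0.9091
# Full Kelly: (0.9091 * 0.55 - 0.45) / 0.9091 = 0.0550
# Quarter Kelly: 0.0138 of bankroll -> $137.50, under the $300 cap
rec = engine.evaluate_game(
    game_id="0022300001",  # illustrative ID
    model_prob=0.55, odds_american=-110, side="home_ml",
)
if rec is not None:
    engine.execute(rec)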

Results

Model Performance

Training on the 2021--22 and 2022--23 seasons (2,460 games) and testing on the 2023--24 season (1,230 games) yields the following metrics:

Metric                                  Value
------                                  -----
CV Brier score (mean +/- std)           0.2281 +/- 0.0034
CV log-loss (mean +/- std)              0.6412 +/- 0.0089
Test-set Brier score                    0.2248
Test-set accuracy                       64.2%
Baseline accuracy (home always wins)    57.3%
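
These test-set numbers can be reproduced in a few lines once the held-out season is assembled (a sketch; X_train, X_test, and y_test denote the training features and the 2023--24 features and home-win labels, built with the walk-forward loop from Step 3):

from sklearn.metrics import accuracy_score

# Impute with training medians, as in training and serving.
probs = calibrated.predict_proba(X_test.fillna(X_train.median()))[:, 1]
print("Brier score:", round(brier_score_loss(y_test, probs), 4))
print("Accuracy:", round(accuracy_score(y_test, probs >= 0.5), 3))
print("Baseline (home always wins):", round(y_test.mean(), 3))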

Betting Simulation

Running the full execution engine on the 2023--24 test season with a $10,000 bankroll, quarter-Kelly sizing, and a minimum edge threshold of 2%:

Metric                     Value
------                     -----
Total games evaluated      1,230
Bets placed                312 (25.4% of games)
Wins                       174
Losses                     138
Win rate                   55.8%
Total wagered              $31,420
Net profit                 $1,665
ROI                        +5.3%
Maximum drawdown           $842 (8.4% of bankroll)
Largest single bet         $300
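
The simulation wires the pieces above into one daily loop. A schematic version (settlement is elided, and only the home moneyline is evaluated for brevity; server is the serving sketch from Step 4, and slates is assumed to map each date to that day's feature-populated games with closing moneylines):

engine = BetExecutionEngine(bankroll=10_000)

for date, slate in slates.items():   # one entry per game day
    engine.reset_day()               # re-arm the daily limit
    preds = server.predict_slate(slate)
    for row in preds.merge(slate, on="game_id").itertuples():
        rec = engine.evaluate_game(
            row.game_id, row.home_win_prob, row.home_ml, "home_ml"
        )
        if rec is not None:
            engine.execute(rec)
    # Once results are final, settle each bet and adjust
    # engine.bankroll by the realized profit or loss (elided).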

Feature Importance

The top five features by permutation importance (a computation sketch follows the list):

  1. elo_diff (0.0089): The Elo difference between home and away teams, incorporating home-court advantage, was the single most predictive feature.
  2. home_net_rating_10 (0.0067): The home team's 10-game rolling net rating.
  3. away_net_rating_10 (0.0058): The away team's 10-game rolling net rating.
  4. home_ewma_margin (0.0041): Exponentially weighted margin captures recent form.
  5. rest_advantage (0.0023): The rest-day differential between home and away teams.
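
Importance values like these can be computed with scikit-learn's permutation importance, scoring by the negated Brier score so that importance means "damage to calibration when the feature is shuffled." A sketch, reusing X_train, X_test, and y_test from above:

from sklearn.inspection import permutation_importance

result = permutation_importance(
    calibrated, X_test.fillna(X_train.median()), y_test,
    scoring="neg_brier_score", n_repeats=20, random_state=42,
)
ranked = sorted(
    zip(X_test.columns, result.importances_mean),
    key=lambda pair: pair[1], reverse=True,
)
for name, importance in ranked[:5]:
    print(f"{name}: {importance:.4f}")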

Key Lessons

  1. The pipeline matters more than the model. The gradient-boosted model with 18 features is relatively simple. The engineering surrounding it --- data ingestion with deduplication, feature store with point-in-time retrieval, risk management with position limits --- is what makes it viable for live operation.

  2. Fractional Kelly is essential. Full Kelly sizing would have produced a maximum drawdown of over 30%, which is psychologically and financially untenable. Quarter-Kelly reduced variance dramatically while retaining most of the expected growth rate.

  3. Selectivity drives profitability. The model generated predictions for all 1,230 games but only bet on 312 (25.4%). The discipline of requiring a minimum edge before betting is what separates a profitable system from a break-even one.

  4. Elo remains a strong baseline. Despite the availability of 18 features, the Elo difference was the single most important predictor. This validates the chapter's emphasis on building reliable simple components before adding complexity.

  5. Audit trails enable improvement. Every prediction and bet was logged with its feature values and reasoning. Post-season analysis of losing bets revealed that the model systematically underestimated the impact of back-to-back games for away teams, suggesting a feature engineering improvement for the next version.


Exercises for the Reader

  1. Add player-level features (e.g., top-5 players' availability) and measure whether they improve the Brier score beyond the team-level model.

  2. Implement a live odds verification step that checks whether the odds have moved more than 10 cents since the prediction was generated, and skips the bet if they have.

  3. Build a monitoring dashboard that tracks daily P&L, cumulative ROI, and model calibration curves, updating after each day's results are finalized.