Capstone Project 3: Develop a Game Prediction System

Project Overview

Building a game prediction system represents one of the most challenging and rewarding applications of basketball analytics. This capstone project guides you through creating a production-ready system that combines statistical modeling, machine learning, and real-time data processing to generate accurate game predictions.

The system we build will incorporate multiple prediction methodologies, from classic Elo ratings to modern machine learning approaches, creating an ensemble that leverages the strengths of each method. By the end of this project, you will have a fully functional prediction API capable of generating win probabilities, point spread predictions, and confidence intervals for any NBA matchup.

Learning Objectives

Upon completing this capstone project, you will be able to:

  1. Design and implement a scalable prediction system architecture
  2. Build robust data pipelines for basketball statistics
  3. Engineer predictive features from raw game data
  4. Implement and calibrate Elo rating systems
  5. Train and evaluate machine learning models for game prediction
  6. Integrate real-time data sources for live predictions
  7. Deploy a prediction API with proper monitoring
  8. Establish processes for continuous model improvement

Project Scope

Our prediction system will handle three primary prediction types:

  • Win Probability: The likelihood each team wins the game
  • Point Spread: The expected margin of victory
  • Total Points: The expected combined score

The system architecture supports both pre-game predictions and in-game probability updates, though this capstone focuses primarily on pre-game prediction with foundations for live prediction expansion.


System Architecture

A production prediction system requires careful architectural planning to ensure reliability, scalability, and maintainability. Our architecture follows a modular design pattern that separates concerns and enables independent scaling of components.

High-Level Architecture

┌─────────────────────────────────────────────────────────────────┐
│                     Data Sources Layer                          │
├─────────────────────────────────────────────────────────────────┤
│  NBA API  │  Injury Reports  │  Schedule  │  Historical Data   │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                     Data Pipeline Layer                         │
├─────────────────────────────────────────────────────────────────┤
│  Collectors  │  Validators  │  Transformers  │  Storage         │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                     Feature Engineering Layer                   │
├─────────────────────────────────────────────────────────────────┤
│  Team Stats  │  Player Stats  │  Contextual  │  Feature Store  │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                     Model Layer                                 │
├─────────────────────────────────────────────────────────────────┤
│  Elo System  │  ML Models  │  Ensemble  │  Model Registry      │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                     API Layer                                   │
├─────────────────────────────────────────────────────────────────┤
│  REST API  │  Authentication  │  Rate Limiting  │  Caching     │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                     Monitoring Layer                            │
├─────────────────────────────────────────────────────────────────┤
│  Metrics  │  Logging  │  Alerting  │  Performance Dashboard    │
└─────────────────────────────────────────────────────────────────┘

Component Responsibilities

Data Sources Layer: External data providers including official NBA statistics, injury reporting services, and historical databases. This layer abstracts the complexity of multiple data formats and delivery mechanisms.

Data Pipeline Layer: Responsible for data ingestion, validation, transformation, and storage. Implements retry logic, error handling, and data quality checks to ensure downstream components receive clean, consistent data.

Feature Engineering Layer: Transforms raw data into predictive features. Maintains a feature store for efficient retrieval and ensures feature consistency between training and inference.

Model Layer: Houses all prediction models including Elo ratings and machine learning models. Implements the ensemble logic that combines individual model outputs into final predictions.

API Layer: Exposes prediction functionality through a REST API. Handles authentication, rate limiting, request validation, and response formatting.

Monitoring Layer: Tracks system health, prediction accuracy, and model performance. Generates alerts when metrics fall outside acceptable ranges.

Technology Stack

For this implementation, we use the following technologies:

  • Python 3.10+: Primary programming language
  • FastAPI: API framework with automatic documentation
  • SQLite/PostgreSQL: Data storage (SQLite for development, PostgreSQL for production)
  • Pandas/NumPy: Data manipulation and numerical computing
  • Scikit-learn/XGBoost: Machine learning models
  • Redis: Caching layer (optional for production)
  • Docker: Containerization for deployment

Data Pipeline Design

The data pipeline forms the foundation of our prediction system. Reliable predictions require reliable data, making pipeline robustness a critical concern.

Data Sources

Our system integrates data from multiple sources:

Official Statistics: Game results, box scores, team statistics, and player statistics. These form the core input for our models.

Injury Reports: Current player availability and injury status. Critical for adjusting predictions based on lineup changes.

Schedule Information: Game dates, times, locations, and travel requirements. Enables calculation of rest days and travel fatigue.

Historical Data: Multiple seasons of historical games for model training and Elo initialization.

Pipeline Architecture

"""
Data Pipeline Module

This module handles all data collection, validation, and storage
for the prediction system.
"""

from dataclasses import dataclass
from datetime import datetime, date
from typing import Optional, List, Dict, Any
from enum import Enum
import logging

logger = logging.getLogger(__name__)


class DataSource(Enum):
    """Enumeration of supported data sources."""
    NBA_API = "nba_api"
    INJURY_REPORT = "injury_report"
    SCHEDULE = "schedule"
    HISTORICAL = "historical"


@dataclass
class GameData:
    """Represents a single game's data."""
    game_id: str
    date: date
    home_team: str
    away_team: str
    home_score: Optional[int] = None
    away_score: Optional[int] = None
    venue: Optional[str] = None
    attendance: Optional[int] = None

    @property
    def is_completed(self) -> bool:
        """Check if the game has been completed."""
        return self.home_score is not None and self.away_score is not None

    @property
    def home_win(self) -> Optional[bool]:
        """Return True if home team won, None if game not completed."""
        if not self.is_completed:
            return None
        return self.home_score > self.away_score


@dataclass
class TeamStats:
    """Aggregated team statistics."""
    team_id: str
    games_played: int
    wins: int
    losses: int
    points_per_game: float
    points_allowed_per_game: float
    offensive_rating: float
    defensive_rating: float
    net_rating: float
    pace: float
    effective_fg_pct: float
    turnover_pct: float
    offensive_rebound_pct: float
    free_throw_rate: float

    @property
    def win_pct(self) -> float:
        """Calculate win percentage."""
        if self.games_played == 0:
            return 0.5
        return self.wins / self.games_played


class DataValidator:
    """Validates incoming data for consistency and completeness."""

    @staticmethod
    def validate_game(game: GameData) -> List[str]:
        """
        Validate a game record.

        Returns a list of validation errors (empty if valid).
        """
        errors = []

        if not game.game_id:
            errors.append("Missing game_id")

        if not game.home_team or not game.away_team:
            errors.append("Missing team information")

        if game.home_team == game.away_team:
            errors.append("Home and away teams cannot be the same")

        if game.is_completed:
            if game.home_score < 0 or game.away_score < 0:
                errors.append("Scores cannot be negative")
            if game.home_score == game.away_score:
                errors.append("NBA games cannot end in a tie")

        return errors

    @staticmethod
    def validate_team_stats(stats: TeamStats) -> List[str]:
        """Validate team statistics."""
        errors = []

        if stats.games_played < 0:
            errors.append("Games played cannot be negative")

        if stats.wins + stats.losses != stats.games_played:
            errors.append("Wins + losses must equal games played")

        if not 0 <= stats.effective_fg_pct <= 1:
            errors.append("Effective FG% must be between 0 and 1")

        return errors


class DataPipeline:
    """
    Main data pipeline class.

    Handles data collection, validation, transformation, and storage.
    """

    def __init__(self, storage_backend: str = "sqlite"):
        self.storage_backend = storage_backend
        self.validator = DataValidator()
        self._cache: Dict[str, Any] = {}

    def collect_games(
        self,
        start_date: date,
        end_date: date,
        source: DataSource = DataSource.NBA_API
    ) -> List[GameData]:
        """
        Collect game data for a date range.

        In production, this would call external APIs.
        """
        logger.info(f"Collecting games from {start_date} to {end_date}")

        # Implementation would connect to actual data source
        # For demonstration, we show the interface pattern
        games = self._fetch_from_source(source, start_date, end_date)

        # Validate all collected games
        valid_games = []
        for game in games:
            errors = self.validator.validate_game(game)
            if errors:
                logger.warning(f"Invalid game {game.game_id}: {errors}")
            else:
                valid_games.append(game)

        return valid_games

    def _fetch_from_source(
        self,
        source: DataSource,
        start_date: date,
        end_date: date
    ) -> List[GameData]:
        """Fetch data from specified source."""
        # Implementation varies by source
        # This is a placeholder for the actual API calls
        return []

    def get_team_stats(
        self,
        team_id: str,
        as_of_date: Optional[date] = None
    ) -> TeamStats:
        """
        Get team statistics, optionally as of a specific date.

        Uses caching to improve performance.
        """
        cache_key = f"team_stats_{team_id}_{as_of_date}"

        if cache_key in self._cache:
            return self._cache[cache_key]

        stats = self._calculate_team_stats(team_id, as_of_date)
        self._cache[cache_key] = stats

        return stats

    def _calculate_team_stats(
        self,
        team_id: str,
        as_of_date: Optional[date]
    ) -> TeamStats:
        """Calculate team statistics from game data."""
        # Implementation would aggregate from stored games
        pass
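
A minimal usage sketch follows; note that _fetch_from_source and _calculate_team_stats are placeholders above, so this returns empty results until they are wired to a real backend:

from datetime import date

pipeline = DataPipeline(storage_backend="sqlite")

# Collect and validate a week of games
games = pipeline.collect_games(
    start_date=date(2024, 1, 1),
    end_date=date(2024, 1, 7)
)

# Retrieve (cached) team statistics as of a date
stats = pipeline.get_team_stats("BOS", as_of_date=date(2024, 1, 7))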

Data Quality Monitoring

Data quality issues can silently degrade prediction accuracy. Implement comprehensive monitoring:

class DataQualityMonitor:
    """Monitors data quality metrics over time."""

    def __init__(self):
        self.metrics_history: List[Dict[str, Any]] = []

    def check_completeness(self, games: List[GameData]) -> float:
        """
        Calculate data completeness score.

        Returns percentage of games with all required fields.
        """
        if not games:
            return 0.0

        complete_count = sum(
            1 for g in games
            if g.game_id and g.home_team and g.away_team and g.date
        )

        return complete_count / len(games)

    def check_timeliness(self, games: List[GameData]) -> Dict[str, Any]:
        """Check if data is being collected in a timely manner."""
        if not games:
            return {"status": "no_data", "lag_days": None}

        latest_game = max(games, key=lambda g: g.date)
        lag = datetime.now().date() - latest_game.date

        return {
            "status": "ok" if lag.days <= 1 else "delayed",
            "lag_days": lag.days,
            "latest_game_date": latest_game.date.isoformat()
        }

    def generate_report(self) -> Dict[str, Any]:
        """Generate a comprehensive data quality report."""
        # Guard against an empty history on first run
        latest = self.metrics_history[-1] if self.metrics_history else {}
        return {
            "timestamp": datetime.now().isoformat(),
            "completeness": latest.get("completeness", 0),
            "timeliness": latest.get("timeliness", {}),
            "total_records": latest.get("total_records", 0)
        }
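
For example, the monitor can run after each collection cycle; the append to metrics_history below shows how the report gets its data (a usage sketch):

monitor = DataQualityMonitor()

completeness = monitor.check_completeness(games)
timeliness = monitor.check_timeliness(games)

# Record this cycle's metrics so generate_report() has history
monitor.metrics_history.append({
    "completeness": completeness,
    "timeliness": timeliness,
    "total_records": len(games)
})
print(monitor.generate_report())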

Feature Engineering

Feature engineering transforms raw data into predictive signals. The quality of features directly impacts model performance, making this one of the most important components of our system.

Feature Categories

Our system uses five main feature categories:

  1. Team Performance Features: Offensive and defensive ratings, pace, shooting efficiency
  2. Matchup Features: Historical head-to-head performance, style matchup indicators
  3. Rest and Schedule Features: Days of rest, back-to-back games, travel distance
  4. Injury Features: Impact of missing players on team performance
  5. Venue Features: Home court advantage, altitude, travel factors

Team Performance Features

"""
Feature Engineering Module

Transforms raw basketball data into predictive features.
"""

from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from datetime import date, timedelta
import numpy as np


@dataclass
class TeamPerformanceFeatures:
    """Features derived from team performance metrics."""

    # Offensive metrics
    offensive_rating: float  # Points per 100 possessions
    effective_fg_pct: float  # (FGM + 0.5 * 3PM) / FGA
    true_shooting_pct: float  # PTS / (2 * (FGA + 0.44 * FTA))
    assist_rate: float  # AST / FGM
    turnover_rate: float  # TOV / possessions
    offensive_rebound_rate: float  # ORB / (ORB + opp DRB)

    # Defensive metrics
    defensive_rating: float  # Points allowed per 100 possessions
    opponent_efg_pct: float  # Opponent effective FG%
    steal_rate: float
    block_rate: float
    defensive_rebound_rate: float

    # Pace and style
    pace: float  # Possessions per 48 minutes
    three_point_rate: float  # 3PA / FGA
    free_throw_rate: float  # FTA / FGA

    # Recent form (last 10 games)
    recent_offensive_rating: float
    recent_defensive_rating: float
    recent_win_pct: float


class FeatureEngineer:
    """
    Main feature engineering class.

    Computes all features needed for game prediction.
    """

    # Team location coordinates for travel calculation
    TEAM_LOCATIONS: Dict[str, Tuple[float, float]] = {
        "ATL": (33.7573, -84.3963),
        "BOS": (42.3662, -71.0621),
        "BKN": (40.6826, -73.9754),
        "CHA": (35.2251, -80.8392),
        "CHI": (41.8807, -87.6742),
        "CLE": (41.4966, -81.6882),
        "DAL": (32.7905, -96.8103),
        "DEN": (39.7487, -105.0077),
        "DET": (42.3410, -83.0553),
        "GSW": (37.7680, -122.3879),
        "HOU": (29.7508, -95.3621),
        "IND": (39.7640, -86.1555),
        "LAC": (34.0430, -118.2673),
        "LAL": (34.0430, -118.2673),
        "MEM": (35.1382, -90.0505),
        "MIA": (25.7814, -80.1870),
        "MIL": (43.0451, -87.9172),
        "MIN": (44.9795, -93.2761),
        "NOP": (29.9490, -90.0821),
        "NYK": (40.7505, -73.9934),
        "OKC": (35.4634, -97.5151),
        "ORL": (28.5392, -81.3839),
        "PHI": (39.9012, -75.1720),
        "PHX": (33.4457, -112.0712),
        "POR": (45.5316, -122.6668),
        "SAC": (38.5802, -121.4997),
        "SAS": (29.4271, -98.4375),
        "TOR": (43.6435, -79.3791),
        "UTA": (40.7683, -111.9011),
        "WAS": (38.8981, -77.0209)
    }

    # Historical home court advantage (points)
    HOME_COURT_ADVANTAGE = 3.2

    def __init__(self, data_pipeline):
        self.data_pipeline = data_pipeline

    def compute_game_features(
        self,
        home_team: str,
        away_team: str,
        game_date: date
    ) -> Dict[str, float]:
        """
        Compute all features for a game prediction.

        Returns a dictionary of feature names to values.
        """
        features = {}

        # Team performance features
        home_perf = self._get_team_performance(home_team, game_date)
        away_perf = self._get_team_performance(away_team, game_date)

        features.update(self._performance_diff_features(home_perf, away_perf))

        # Rest and schedule features
        features.update(self._rest_features(home_team, away_team, game_date))

        # Travel features
        features.update(self._travel_features(home_team, away_team, game_date))

        # Home court advantage
        features["home_court_advantage"] = self.HOME_COURT_ADVANTAGE

        # Matchup features
        features.update(self._matchup_features(home_team, away_team, game_date))

        return features

    def _get_team_performance(
        self,
        team: str,
        as_of_date: date
    ) -> TeamPerformanceFeatures:
        """Get team performance features as of a specific date."""
        stats = self.data_pipeline.get_team_stats(team, as_of_date)

        # Calculate recent form (last 10 games)
        recent_games = self._get_recent_games(team, as_of_date, n_games=10)

        return TeamPerformanceFeatures(
            offensive_rating=stats.offensive_rating,
            effective_fg_pct=stats.effective_fg_pct,
            true_shooting_pct=self._calculate_ts_pct(stats),
            assist_rate=self._calculate_assist_rate(stats),
            turnover_rate=stats.turnover_pct,
            offensive_rebound_rate=stats.offensive_rebound_pct,
            defensive_rating=stats.defensive_rating,
            opponent_efg_pct=self._calculate_opp_efg(stats),
            steal_rate=self._calculate_steal_rate(stats),
            block_rate=self._calculate_block_rate(stats),
            defensive_rebound_rate=self._calculate_drb_rate(stats),
            pace=stats.pace,
            three_point_rate=self._calculate_3pt_rate(stats),
            free_throw_rate=stats.free_throw_rate,
            recent_offensive_rating=self._recent_off_rating(recent_games),
            recent_defensive_rating=self._recent_def_rating(recent_games),
            recent_win_pct=self._recent_win_pct(recent_games)
        )

    def _performance_diff_features(
        self,
        home: TeamPerformanceFeatures,
        away: TeamPerformanceFeatures
    ) -> Dict[str, float]:
        """Calculate performance differential features."""
        return {
            # Net rating differential
            "net_rating_diff": (
                (home.offensive_rating - home.defensive_rating) -
                (away.offensive_rating - away.defensive_rating)
            ),

            # Offensive comparison
            "offensive_rating_diff": home.offensive_rating - away.offensive_rating,
            "efg_diff": home.effective_fg_pct - away.effective_fg_pct,
            "ts_pct_diff": home.true_shooting_pct - away.true_shooting_pct,

            # Defensive comparison
            "defensive_rating_diff": away.defensive_rating - home.defensive_rating,
            "opp_efg_diff": away.opponent_efg_pct - home.opponent_efg_pct,

            # Pace and style
            "pace_diff": home.pace - away.pace,
            "avg_pace": (home.pace + away.pace) / 2,
            "three_point_rate_diff": home.three_point_rate - away.three_point_rate,

            # Turnover differential
            "turnover_diff": away.turnover_rate - home.turnover_rate,

            # Rebounding differential
            "orb_rate_diff": home.offensive_rebound_rate - away.offensive_rebound_rate,
            "drb_rate_diff": home.defensive_rebound_rate - away.defensive_rebound_rate,

            # Recent form
            "recent_net_diff": (
                (home.recent_offensive_rating - home.recent_defensive_rating) -
                (away.recent_offensive_rating - away.recent_defensive_rating)
            ),
            "recent_win_pct_diff": home.recent_win_pct - away.recent_win_pct
        }

    def _rest_features(
        self,
        home_team: str,
        away_team: str,
        game_date: date
    ) -> Dict[str, float]:
        """Calculate rest-related features."""
        home_rest = self._days_of_rest(home_team, game_date)
        away_rest = self._days_of_rest(away_team, game_date)

        return {
            "home_rest_days": min(home_rest, 7),  # Cap at 7
            "away_rest_days": min(away_rest, 7),
            "rest_advantage": min(home_rest, 4) - min(away_rest, 4),
            "home_back_to_back": 1.0 if home_rest == 0 else 0.0,
            "away_back_to_back": 1.0 if away_rest == 0 else 0.0,
            "both_rested": 1.0 if home_rest >= 2 and away_rest >= 2 else 0.0
        }

    def _days_of_rest(self, team: str, game_date: date) -> int:
        """Calculate days since team's last game."""
        # Would query database for team's previous game
        # Returns integer days of rest
        pass

    def _travel_features(
        self,
        home_team: str,
        away_team: str,
        game_date: date
    ) -> Dict[str, float]:
        """Calculate travel-related features."""
        # Get away team's previous game location
        prev_location = self._get_previous_location(away_team, game_date)
        game_location = self.TEAM_LOCATIONS.get(home_team)

        if prev_location and game_location:
            distance = self._haversine_distance(prev_location, game_location)
            timezone_diff = self._timezone_difference(prev_location, game_location)
        else:
            distance = 0
            timezone_diff = 0

        return {
            "away_travel_distance": distance,
            "away_timezone_diff": abs(timezone_diff),
            "west_to_east_travel": 1.0 if timezone_diff > 0 else 0.0,
            "long_distance_travel": 1.0 if distance > 1500 else 0.0
        }

    def _haversine_distance(
        self,
        coord1: Tuple[float, float],
        coord2: Tuple[float, float]
    ) -> float:
        """Calculate distance between two coordinates in miles."""
        lat1, lon1 = np.radians(coord1)
        lat2, lon2 = np.radians(coord2)

        dlat = lat2 - lat1
        dlon = lon2 - lon1

        a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
        c = 2 * np.arcsin(np.sqrt(a))

        # Earth's radius in miles
        r = 3956

        return c * r

    def _timezone_difference(
        self,
        coord1: Tuple[float, float],
        coord2: Tuple[float, float]
    ) -> int:
        """Estimate timezone difference based on longitude."""
        # Rough approximation: 15 degrees longitude per timezone
        lon_diff = coord2[1] - coord1[1]
        return round(lon_diff / 15)

    def _matchup_features(
        self,
        home_team: str,
        away_team: str,
        game_date: date
    ) -> Dict[str, float]:
        """Calculate head-to-head matchup features."""
        # Get historical matchups (last 2 seasons)
        h2h_games = self._get_head_to_head(
            home_team,
            away_team,
            game_date - timedelta(days=730),
            game_date
        )

        if not h2h_games:
            return {
                "h2h_home_win_pct": 0.5,
                "h2h_avg_margin": 0.0,
                "h2h_games_played": 0
            }

        home_wins = sum(1 for g in h2h_games if g.home_team == home_team and g.home_win)
        home_wins += sum(1 for g in h2h_games if g.away_team == home_team and not g.home_win)

        margins = []
        for g in h2h_games:
            if g.home_team == home_team:
                margins.append(g.home_score - g.away_score)
            else:
                margins.append(g.away_score - g.home_score)

        return {
            "h2h_home_win_pct": home_wins / len(h2h_games),
            "h2h_avg_margin": np.mean(margins),
            "h2h_games_played": len(h2h_games)
        }

    def _get_recent_games(self, team: str, as_of_date: date, n_games: int):
        """Get team's most recent games."""
        pass

    def _get_head_to_head(self, team1: str, team2: str, start: date, end: date):
        """Get head-to-head games between two teams."""
        pass

    def _get_previous_location(self, team: str, game_date: date):
        """Get the location of team's previous game."""
        pass

    # Additional helper methods for calculations
    def _calculate_ts_pct(self, stats) -> float:
        pass

    def _calculate_assist_rate(self, stats) -> float:
        pass

    def _calculate_opp_efg(self, stats) -> float:
        pass

    def _calculate_steal_rate(self, stats) -> float:
        pass

    def _calculate_block_rate(self, stats) -> float:
        pass

    def _calculate_drb_rate(self, stats) -> float:
        pass

    def _calculate_3pt_rate(self, stats) -> float:
        pass

    def _recent_off_rating(self, games) -> float:
        pass

    def _recent_def_rating(self, games) -> float:
        pass

    def _recent_win_pct(self, games) -> float:
        pass
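
Once the placeholder helpers are implemented against the data pipeline, computing a feature vector for a matchup is a single call (a hypothetical usage; the values in the comment are illustrative):

engineer = FeatureEngineer(data_pipeline=pipeline)

features = engineer.compute_game_features(
    home_team="DEN",
    away_team="LAL",
    game_date=date(2024, 1, 15)
)

# Maps feature names to floats, e.g.
# {"net_rating_diff": 4.2, "rest_advantage": 1.0,
#  "away_travel_distance": 830.5, ...}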

Injury Impact Features

Player availability significantly impacts game outcomes. Our system quantifies this impact:

@dataclass
class InjuryImpact:
    """Quantifies the impact of player absences."""
    player_id: str
    player_name: str
    team: str
    status: str  # "Out", "Doubtful", "Questionable", "Probable"
    estimated_impact: float  # Expected point differential impact
    minutes_typically_played: float
    replacement_level_diff: float


class InjuryFeatureGenerator:
    """Generates features related to player injuries and availability."""

    # Status to probability of playing
    STATUS_PROBABILITIES = {
        "Out": 0.0,
        "Doubtful": 0.15,
        "Questionable": 0.50,
        "Probable": 0.85,
        "Available": 1.0
    }

    def calculate_team_injury_impact(
        self,
        team: str,
        injuries: List[InjuryImpact]
    ) -> Dict[str, float]:
        """
        Calculate total injury impact for a team.

        Uses player value estimates and injury status probabilities.
        """
        team_injuries = [inj for inj in injuries if inj.team == team]

        if not team_injuries:
            return {
                "injury_impact": 0.0,
                "star_player_out": 0.0,
                "rotation_players_out": 0
            }

        total_impact = 0.0
        star_out = False
        rotation_out = 0

        for injury in team_injuries:
            prob_out = 1.0 - self.STATUS_PROBABILITIES.get(
                injury.status, 0.5
            )
            expected_impact = injury.estimated_impact * prob_out
            total_impact += expected_impact

            if injury.estimated_impact > 3.0 and prob_out > 0.5:
                star_out = True

            if injury.minutes_typically_played >= 15 and prob_out > 0.5:
                rotation_out += 1

        return {
            "injury_impact": total_impact,
            "star_player_out": 1.0 if star_out else 0.0,
            "rotation_players_out": rotation_out
        }
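
A short sketch of these features in use (the InjuryImpact values are illustrative, not real data):

injuries = [
    InjuryImpact(
        player_id="player-001", player_name="Star Center", team="DEN",
        status="Questionable", estimated_impact=8.5,
        minutes_typically_played=34.0, replacement_level_diff=-6.0
    )
]

generator = InjuryFeatureGenerator()
impact = generator.calculate_team_injury_impact("DEN", injuries)
# "Questionable" implies a 0.5 probability of sitting, so
# injury_impact = 8.5 * 0.5 = 4.25; star_player_out stays 0.0
# because prob_out is not strictly greater than 0.5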

Elo Rating Implementation

Elo ratings provide a robust, interpretable foundation for game prediction. Originally developed for chess, Elo systems have been successfully adapted for many sports including basketball.

Elo Fundamentals

The Elo system maintains a rating for each team that represents their current strength. After each game, ratings update based on the result compared to the expected outcome. The key parameters are:

  • K-factor: Controls how quickly ratings respond to new results
  • Home Court Advantage: Expected point differential for home team
  • Initial Rating: Starting point for new teams
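
For example, with a home team rated 1600, an away team rated 1500, and a 100-point home advantage, the home team's expected score is 1 / (1 + 10^(-(1600 + 100 - 1500) / 400)) = 1 / (1 + 10^(-0.5)) ≈ 0.76, or roughly a 76% pre-game win probability.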

Implementation

"""
Elo Rating System

Implements Elo ratings optimized for NBA game prediction.
"""

from dataclasses import dataclass, field
from datetime import date
from typing import Dict, List, Optional, Tuple
import math
import logging

logger = logging.getLogger(__name__)


@dataclass
class EloConfig:
    """Configuration parameters for the Elo system."""
    k_factor: float = 20.0  # Rating update magnitude
    home_advantage: float = 100.0  # Home court advantage in Elo points
    initial_rating: float = 1500.0  # Starting rating for new teams
    regression_factor: float = 0.75  # Season-to-season regression to mean
    margin_multiplier: bool = True  # Use margin of victory in updates


@dataclass
class EloRating:
    """Represents a team's Elo rating at a point in time."""
    team_id: str
    rating: float
    games_played: int = 0
    last_updated: Optional[date] = None
    rating_history: List[Tuple[date, float]] = field(default_factory=list)


class EloSystem:
    """
    NBA Elo Rating System.

    Maintains ratings for all teams and provides win probability
    and point spread predictions.
    """

    # Elo point to point spread conversion
    # Approximately 25 Elo points = 1 point spread
    ELO_TO_SPREAD_FACTOR = 25.0

    def __init__(self, config: Optional[EloConfig] = None):
        self.config = config or EloConfig()
        self.ratings: Dict[str, EloRating] = {}
        self._initialize_ratings()

    def _initialize_ratings(self):
        """Initialize ratings for all NBA teams."""
        nba_teams = [
            "ATL", "BOS", "BKN", "CHA", "CHI", "CLE", "DAL", "DEN",
            "DET", "GSW", "HOU", "IND", "LAC", "LAL", "MEM", "MIA",
            "MIL", "MIN", "NOP", "NYK", "OKC", "ORL", "PHI", "PHX",
            "POR", "SAC", "SAS", "TOR", "UTA", "WAS"
        ]

        for team in nba_teams:
            self.ratings[team] = EloRating(
                team_id=team,
                rating=self.config.initial_rating
            )

    def get_rating(self, team: str) -> float:
        """Get current rating for a team."""
        if team not in self.ratings:
            logger.warning(f"Unknown team {team}, using initial rating")
            return self.config.initial_rating
        return self.ratings[team].rating

    def expected_score(
        self,
        home_team: str,
        away_team: str,
        neutral_site: bool = False
    ) -> float:
        """
        Calculate expected score (win probability) for home team.

        Uses the standard Elo formula:
        E = 1 / (1 + 10^((R_away - R_home - HCA) / 400))
        """
        home_rating = self.get_rating(home_team)
        away_rating = self.get_rating(away_team)

        home_advantage = 0 if neutral_site else self.config.home_advantage

        rating_diff = home_rating + home_advantage - away_rating
        expected = 1.0 / (1.0 + math.pow(10, -rating_diff / 400))

        return expected

    def predict_spread(
        self,
        home_team: str,
        away_team: str,
        neutral_site: bool = False
    ) -> float:
        """
        Predict point spread (positive = home favored).

        Converts Elo difference to expected point margin.
        """
        home_rating = self.get_rating(home_team)
        away_rating = self.get_rating(away_team)

        home_advantage = 0 if neutral_site else self.config.home_advantage

        rating_diff = home_rating + home_advantage - away_rating
        spread = rating_diff / self.ELO_TO_SPREAD_FACTOR

        return spread

    def update_ratings(
        self,
        home_team: str,
        away_team: str,
        home_score: int,
        away_score: int,
        game_date: date,
        neutral_site: bool = False
    ) -> Tuple[float, float]:
        """
        Update ratings based on game result.

        Returns the rating changes for (home, away).
        """
        # Calculate expected outcome
        expected_home = self.expected_score(home_team, away_team, neutral_site)

        # Actual outcome (1 = home win, 0 = away win)
        actual_home = 1.0 if home_score > away_score else 0.0

        # Calculate K-factor adjustment based on margin of victory
        if self.config.margin_multiplier:
            margin = abs(home_score - away_score)
            k_multiplier = self._margin_of_victory_multiplier(
                margin,
                home_score > away_score,
                expected_home
            )
        else:
            k_multiplier = 1.0

        k = self.config.k_factor * k_multiplier

        # Update ratings
        home_change = k * (actual_home - expected_home)
        away_change = k * (expected_home - actual_home)

        self._apply_update(home_team, home_change, game_date)
        self._apply_update(away_team, away_change, game_date)

        return home_change, away_change

    def _margin_of_victory_multiplier(
        self,
        margin: int,
        home_won: bool,
        expected_home: float
    ) -> float:
        """
        Calculate multiplier based on margin of victory.

        Uses a FiveThirtyEight-style adjustment that scales updates
        with the (logged) margin while dampening blowout wins by
        favorites, preventing rating inflation.
        """
        # Winner's pre-game win probability and implied Elo advantage
        winner_prob = expected_home if home_won else 1.0 - expected_home
        winner_prob = min(max(winner_prob, 1e-6), 1 - 1e-6)
        winner_elo_diff = 400 * math.log10(winner_prob / (1 - winner_prob))

        # The log of the margin grows the update; the damping term
        # shrinks it when the winner was already heavily favored
        multiplier = math.log(margin + 1) * (2.2 / (0.001 * winner_elo_diff + 2.2))

        return max(0.5, min(multiplier, 3.0))  # Bound between 0.5 and 3.0

    def _apply_update(
        self,
        team: str,
        change: float,
        game_date: date
    ):
        """Apply rating change to a team."""
        if team not in self.ratings:
            self.ratings[team] = EloRating(
                team_id=team,
                rating=self.config.initial_rating
            )

        rating = self.ratings[team]
        rating.rating += change
        rating.games_played += 1
        rating.last_updated = game_date
        rating.rating_history.append((game_date, rating.rating))

    def apply_season_regression(self):
        """
        Regress ratings toward the mean at season start.

        This accounts for roster changes and uncertainty
        about team strength at the beginning of a new season.
        """
        mean_rating = sum(r.rating for r in self.ratings.values()) / len(self.ratings)

        for rating in self.ratings.values():
            regression = (mean_rating - rating.rating) * (1 - self.config.regression_factor)
            rating.rating += regression
            rating.games_played = 0

        logger.info(f"Applied season regression, mean rating: {mean_rating:.1f}")

    def calibrate(
        self,
        historical_games: List[Dict],
        parameter_ranges: Optional[Dict] = None
    ) -> EloConfig:
        """
        Calibrate Elo parameters using historical data.

        Uses grid search to find optimal k-factor and home advantage.
        """
        if parameter_ranges is None:
            parameter_ranges = {
                "k_factor": [15, 20, 25, 30],
                "home_advantage": [75, 100, 125, 150]
            }

        best_config = None
        best_log_loss = float("inf")

        for k in parameter_ranges["k_factor"]:
            for hca in parameter_ranges["home_advantage"]:
                config = EloConfig(k_factor=k, home_advantage=hca)
                log_loss = self._evaluate_config(config, historical_games)

                if log_loss < best_log_loss:
                    best_log_loss = log_loss
                    best_config = config

        logger.info(
            f"Best config: k={best_config.k_factor}, "
            f"HCA={best_config.home_advantage}, "
            f"log_loss={best_log_loss:.4f}"
        )

        return best_config

    def _evaluate_config(
        self,
        config: EloConfig,
        games: List[Dict]
    ) -> float:
        """Evaluate an Elo configuration using log loss."""
        # Create temporary system with this config
        temp_system = EloSystem(config)

        total_log_loss = 0.0
        n_games = 0

        for game in games:
            expected = temp_system.expected_score(
                game["home_team"],
                game["away_team"]
            )

            actual = 1.0 if game["home_score"] > game["away_score"] else 0.0

            # Log loss calculation (with small epsilon to avoid log(0))
            epsilon = 1e-10
            expected = max(epsilon, min(1 - epsilon, expected))
            log_loss = -(
                actual * math.log(expected) +
                (1 - actual) * math.log(1 - expected)
            )
            total_log_loss += log_loss
            n_games += 1

            # Update ratings
            temp_system.update_ratings(
                game["home_team"],
                game["away_team"],
                game["home_score"],
                game["away_score"],
                game["date"]
            )

        return total_log_loss / n_games if n_games > 0 else float("inf")

    def get_all_ratings(self) -> List[Dict]:
        """Get current ratings for all teams, sorted by rating."""
        ratings_list = [
            {
                "team": team,
                "rating": rating.rating,
                "games_played": rating.games_played,
                "last_updated": rating.last_updated.isoformat() if rating.last_updated else None
            }
            for team, rating in self.ratings.items()
        ]

        return sorted(ratings_list, key=lambda x: x["rating"], reverse=True)
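
A brief usage sketch tying the pieces together (teams, scores, and dates are illustrative):

from datetime import date

elo = EloSystem(EloConfig(k_factor=20.0, home_advantage=100.0))

# Pre-game prediction: with equal 1500 ratings, home advantage
# alone gives roughly a 0.64 expected score and a 4-point spread
win_prob = elo.expected_score("DEN", "LAL")
spread = elo.predict_spread("DEN", "LAL")  # 100 / 25 = 4.0

# Post-game update
elo.update_ratings("DEN", "LAL", 112, 105, date(2024, 1, 15))

# At the start of a new season
elo.apply_season_regression()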

Elo Extensions

Our implementation includes several extensions beyond basic Elo:

  1. Margin of Victory: Adjusts updates based on game margin while preventing rating inflation from blowouts
  2. Season Regression: Accounts for offseason roster changes by regressing ratings toward the league mean (with regression_factor = 0.75, a 1600-rated team moves to 1575 when the mean is 1500)
  3. Calibration: Optimizes parameters using historical data

Machine Learning Models

While Elo provides a solid baseline, machine learning models can capture complex patterns and utilize additional features. Our system implements multiple models combined in an ensemble.

Model Selection

We implement three complementary models:

  1. Logistic Regression: Interpretable baseline with regularization
  2. Gradient Boosting (XGBoost): Captures non-linear patterns and feature interactions
  3. Neural Network: Can learn complex representations (optional, for advanced users)

Implementation

"""
Machine Learning Models for Game Prediction

Implements multiple models combined in an ensemble.
"""

from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Any
from datetime import date
import numpy as np
from sklearn.linear_model import LogisticRegression, Ridge
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit, cross_val_score
from sklearn.calibration import CalibratedClassifierCV
import pickle
import logging

logger = logging.getLogger(__name__)

# XGBoost import with fallback
try:
    import xgboost as xgb
    HAS_XGBOOST = True
except ImportError:
    HAS_XGBOOST = False
    logger.warning("XGBoost not available, using sklearn GradientBoosting")
    from sklearn.ensemble import GradientBoostingClassifier


@dataclass
class ModelConfig:
    """Configuration for ML models."""
    features: List[str]
    target: str = "home_win"
    test_size: float = 0.2
    cv_folds: int = 5
    random_state: int = 42


@dataclass
class Prediction:
    """Represents a game prediction."""
    home_team: str
    away_team: str
    game_date: date
    home_win_prob: float
    predicted_spread: float
    predicted_total: Optional[float] = None
    confidence: float = 0.0
    model_contributions: Optional[Dict[str, float]] = None


class LogisticRegressionModel:
    """Logistic regression model for win probability."""

    def __init__(self, config: ModelConfig):
        self.config = config
        self.scaler = StandardScaler()
        self.model = LogisticRegression(
            C=1.0,
            max_iter=1000,
            random_state=config.random_state
        )
        self.is_fitted = False

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Fit the model."""
        X_scaled = self.scaler.fit_transform(X)
        self.model.fit(X_scaled, y)
        self.is_fitted = True

        # Log feature importances
        if hasattr(self.model, 'coef_'):
            importances = list(zip(self.config.features, self.model.coef_[0]))
            importances.sort(key=lambda x: abs(x[1]), reverse=True)
            logger.info("Top features: " + str(importances[:5]))

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Predict win probabilities."""
        if not self.is_fitted:
            raise ValueError("Model not fitted")
        X_scaled = self.scaler.transform(X)
        return self.model.predict_proba(X_scaled)[:, 1]

    def evaluate(self, X: np.ndarray, y: np.ndarray) -> Dict[str, float]:
        """Evaluate model performance."""
        X_scaled = self.scaler.transform(X)

        # Cross-validation with time series split
        tscv = TimeSeriesSplit(n_splits=self.config.cv_folds)

        scores = cross_val_score(
            self.model, X_scaled, y,
            cv=tscv, scoring='neg_log_loss'
        )

        return {
            "mean_log_loss": -scores.mean(),
            "std_log_loss": scores.std(),
            "accuracy": self.model.score(X_scaled, y)
        }


class GradientBoostingModel:
    """Gradient boosting model for win probability."""

    def __init__(self, config: ModelConfig):
        self.config = config
        self.scaler = StandardScaler()

        if HAS_XGBOOST:
            # Note: use_label_encoder is deprecated and was removed
            # in recent XGBoost releases, so it is omitted here
            self.model = xgb.XGBClassifier(
                n_estimators=100,
                max_depth=4,
                learning_rate=0.1,
                subsample=0.8,
                colsample_bytree=0.8,
                random_state=config.random_state,
                eval_metric='logloss'
            )
        else:
            self.model = GradientBoostingClassifier(
                n_estimators=100,
                max_depth=4,
                learning_rate=0.1,
                random_state=config.random_state
            )

        self.is_fitted = False

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Fit the model."""
        X_scaled = self.scaler.fit_transform(X)
        self.model.fit(X_scaled, y)
        self.is_fitted = True

        # Log feature importances
        if hasattr(self.model, 'feature_importances_'):
            importances = list(zip(
                self.config.features,
                self.model.feature_importances_
            ))
            importances.sort(key=lambda x: x[1], reverse=True)
            logger.info("Top features: " + str(importances[:5]))

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Predict win probabilities."""
        if not self.is_fitted:
            raise ValueError("Model not fitted")
        X_scaled = self.scaler.transform(X)
        return self.model.predict_proba(X_scaled)[:, 1]

    def evaluate(self, X: np.ndarray, y: np.ndarray) -> Dict[str, float]:
        """Evaluate model performance."""
        X_scaled = self.scaler.transform(X)

        tscv = TimeSeriesSplit(n_splits=self.config.cv_folds)
        scores = cross_val_score(
            self.model, X_scaled, y,
            cv=tscv, scoring='neg_log_loss'
        )

        return {
            "mean_log_loss": -scores.mean(),
            "std_log_loss": scores.std(),
            "accuracy": self.model.score(X_scaled, y)
        }


class SpreadModel:
    """Ridge regression model for point spread prediction."""

    def __init__(self, config: ModelConfig):
        self.config = config
        self.scaler = StandardScaler()
        self.model = Ridge(alpha=1.0, random_state=config.random_state)
        self.is_fitted = False

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Fit the model. y should be point differential (home - away)."""
        X_scaled = self.scaler.fit_transform(X)
        self.model.fit(X_scaled, y)
        self.is_fitted = True

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict point spread."""
        if not self.is_fitted:
            raise ValueError("Model not fitted")
        X_scaled = self.scaler.transform(X)
        return self.model.predict(X_scaled)

    def evaluate(self, X: np.ndarray, y: np.ndarray) -> Dict[str, float]:
        """Evaluate model performance."""
        predictions = self.predict(X)

        mae = np.mean(np.abs(predictions - y))
        rmse = np.sqrt(np.mean((predictions - y) ** 2))

        # Directional accuracy: how often the predicted sign of the
        # margin (the predicted winner) matches the actual result
        correct = np.sum((predictions > 0) == (y > 0))
        winner_accuracy = correct / len(y)

        return {
            "mae": mae,
            "rmse": rmse,
            "winner_accuracy": winner_accuracy
        }


class EnsembleModel:
    """
    Ensemble combining multiple models.

    Uses weighted averaging of predictions from:
    - Elo rating system
    - Logistic regression
    - Gradient boosting
    """

    def __init__(
        self,
        elo_system,
        feature_engineer,
        config: ModelConfig,
        weights: Optional[Dict[str, float]] = None
    ):
        self.elo_system = elo_system
        self.feature_engineer = feature_engineer
        self.config = config

        # Default weights based on typical performance
        self.weights = weights or {
            "elo": 0.30,
            "logistic": 0.30,
            "xgboost": 0.40
        }

        self.logistic_model = LogisticRegressionModel(config)
        self.xgboost_model = GradientBoostingModel(config)
        self.spread_model = SpreadModel(config)

        self.is_fitted = False

    def fit(
        self,
        games: List[Dict],
        features_matrix: np.ndarray,
        win_labels: np.ndarray,
        spread_labels: np.ndarray
    ):
        """Fit all models in the ensemble."""
        logger.info(f"Training ensemble on {len(games)} games")

        # Train individual models
        self.logistic_model.fit(features_matrix, win_labels)
        self.xgboost_model.fit(features_matrix, win_labels)
        self.spread_model.fit(features_matrix, spread_labels)

        self.is_fitted = True

        # Evaluate and potentially adjust weights
        self._calibrate_weights(features_matrix, win_labels)

    def _calibrate_weights(
        self,
        X: np.ndarray,
        y: np.ndarray
    ):
        """Calibrate ensemble weights using validation performance."""
        # Get individual model performance
        logistic_eval = self.logistic_model.evaluate(X, y)
        xgboost_eval = self.xgboost_model.evaluate(X, y)

        # Could implement more sophisticated weight optimization here
        # For now, log the performance
        logger.info(f"Logistic log loss: {logistic_eval['mean_log_loss']:.4f}")
        logger.info(f"XGBoost log loss: {xgboost_eval['mean_log_loss']:.4f}")

    def predict(
        self,
        home_team: str,
        away_team: str,
        game_date: date
    ) -> Prediction:
        """Generate prediction for a game."""
        if not self.is_fitted:
            raise ValueError("Ensemble not fitted")

        # Get features
        features = self.feature_engineer.compute_game_features(
            home_team, away_team, game_date
        )
        X = np.array([[features[f] for f in self.config.features]])

        # Get individual predictions
        elo_prob = self.elo_system.expected_score(home_team, away_team)
        logistic_prob = self.logistic_model.predict_proba(X)[0]
        xgboost_prob = self.xgboost_model.predict_proba(X)[0]

        # Ensemble probability
        ensemble_prob = (
            self.weights["elo"] * elo_prob +
            self.weights["logistic"] * logistic_prob +
            self.weights["xgboost"] * xgboost_prob
        )

        # Spread prediction
        elo_spread = self.elo_system.predict_spread(home_team, away_team)
        ml_spread = self.spread_model.predict(X)[0]

        # Combine spreads (weight ML model more)
        ensemble_spread = 0.4 * elo_spread + 0.6 * ml_spread

        # Calculate confidence based on model agreement
        probs = [elo_prob, logistic_prob, xgboost_prob]
        confidence = 1.0 - np.std(probs) * 2  # Higher agreement = higher confidence
        confidence = max(0.0, min(1.0, confidence))

        return Prediction(
            home_team=home_team,
            away_team=away_team,
            game_date=game_date,
            home_win_prob=ensemble_prob,
            predicted_spread=ensemble_spread,
            confidence=confidence,
            model_contributions={
                "elo": elo_prob,
                "logistic": logistic_prob,
                "xgboost": xgboost_prob
            }
        )

    def save(self, path: str):
        """Save the ensemble to disk."""
        with open(path, 'wb') as f:
            pickle.dump({
                'logistic': self.logistic_model,
                'xgboost': self.xgboost_model,
                'spread': self.spread_model,
                'weights': self.weights,
                'config': self.config
            }, f)
        logger.info(f"Ensemble saved to {path}")

    def load(self, path: str):
        """Load the ensemble from disk."""
        with open(path, 'rb') as f:
            data = pickle.load(f)

        self.logistic_model = data['logistic']
        self.xgboost_model = data['xgboost']
        self.spread_model = data['spread']
        self.weights = data['weights']
        self.config = data['config']
        self.is_fitted = True

        logger.info(f"Ensemble loaded from {path}")
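
Training and serving the ensemble might look like the sketch below; assembling X_train, y_win, and y_spread from historical games is assumed, not shown:

config = ModelConfig(
    features=["net_rating_diff", "rest_advantage", "injury_impact"]
)

ensemble = EnsembleModel(elo, engineer, config)
ensemble.fit(
    games=historical_games,    # list of game dicts
    features_matrix=X_train,   # shape (n_games, n_features)
    win_labels=y_win,          # 1.0 if the home team won
    spread_labels=y_spread     # home score minus away score
)

pred = ensemble.predict("DEN", "LAL", date(2024, 1, 15))
print(f"Home win: {pred.home_win_prob:.1%}, spread: {pred.predicted_spread:+.1f}")

ensemble.save("models/ensemble_v1.pkl")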

Real-Time Data Integration

Production prediction systems require timely data to generate accurate predictions. This section covers integrating live data feeds.

Data Source Integration

"""
Real-Time Data Integration

Connects to live data sources for current statistics and injury reports.
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, date
from typing import Dict, List, Optional, Any
import asyncio
import aiohttp
import logging

logger = logging.getLogger(__name__)


@dataclass
class DataSourceConfig:
    """Configuration for a data source."""
    name: str
    base_url: str
    api_key: Optional[str] = None
    rate_limit: int = 60  # Requests per minute
    timeout: int = 30  # Seconds


class DataSourceAdapter(ABC):
    """Abstract base class for data source adapters."""

    @abstractmethod
    async def fetch_games(self, date: date) -> List[Dict]:
        """Fetch games for a specific date."""
        pass

    @abstractmethod
    async def fetch_team_stats(self, team_id: str) -> Dict:
        """Fetch current stats for a team."""
        pass

    @abstractmethod
    async def fetch_injuries(self) -> List[Dict]:
        """Fetch current injury report."""
        pass


class NBAApiAdapter(DataSourceAdapter):
    """Adapter for NBA official API."""

    def __init__(self, config: DataSourceConfig):
        self.config = config
        self._session: Optional[aiohttp.ClientSession] = None
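        # Note: a semaphore caps concurrent in-flight requests; a true
        # per-minute rate limiter would also track request timestamps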
        self._rate_limiter = asyncio.Semaphore(config.rate_limit)

    async def _get_session(self) -> aiohttp.ClientSession:
        """Get or create aiohttp session."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=self.config.timeout)
            )
        return self._session

    async def _make_request(self, endpoint: str, params: Optional[Dict] = None) -> Dict:
        """Make rate-limited request to API."""
        async with self._rate_limiter:
            session = await self._get_session()
            url = f"{self.config.base_url}/{endpoint}"

            headers = {}
            if self.config.api_key:
                headers["Authorization"] = f"Bearer {self.config.api_key}"

            try:
                async with session.get(url, params=params, headers=headers) as resp:
                    resp.raise_for_status()
                    return await resp.json()
            except aiohttp.ClientError as e:
                logger.error(f"API request failed: {e}")
                raise

    async def fetch_games(self, game_date: date) -> List[Dict]:
        """Fetch games for a specific date."""
        data = await self._make_request(
            "scoreboard",
            {"gameDate": game_date.strftime("%Y-%m-%d")}
        )

        return [
            {
                "game_id": g["gameId"],
                "home_team": g["homeTeam"]["teamTricode"],
                "away_team": g["awayTeam"]["teamTricode"],
                "home_score": g.get("homeTeam", {}).get("score"),
                "away_score": g.get("awayTeam", {}).get("score"),
                "status": g["gameStatus"]
            }
            for g in data.get("scoreboard", {}).get("games", [])
        ]

    async def fetch_team_stats(self, team_id: str) -> Dict:
        """Fetch current stats for a team."""
        data = await self._make_request(
            f"teams/{team_id}/stats"
        )
        return data

    async def fetch_injuries(self) -> List[Dict]:
        """Fetch current injury report."""
        data = await self._make_request("players/injuries")
        return data.get("injuries", [])

    async def close(self):
        """Close the session."""
        if self._session and not self._session.closed:
            await self._session.close()


class RealTimeDataManager:
    """
    Manages real-time data collection and caching.

    Coordinates multiple data sources and maintains a cache
    of recent data to minimize API calls.
    """

    def __init__(self):
        self.adapters: Dict[str, DataSourceAdapter] = {}
        self._cache: Dict[str, Any] = {}
        self._cache_timestamps: Dict[str, datetime] = {}
        self._cache_ttl = 300  # 5 minutes

    def register_adapter(self, name: str, adapter: DataSourceAdapter):
        """Register a data source adapter."""
        self.adapters[name] = adapter
        logger.info(f"Registered adapter: {name}")

    async def get_todays_games(self) -> List[Dict]:
        """Get today's games with caching."""
        cache_key = f"games_{date.today().isoformat()}"

        if self._is_cache_valid(cache_key):
            return self._cache[cache_key]

        # Try primary adapter, fall back to others
        for name, adapter in self.adapters.items():
            try:
                games = await adapter.fetch_games(date.today())
                self._update_cache(cache_key, games)
                return games
            except Exception as e:
                logger.warning(f"Adapter {name} failed: {e}")
                continue

        raise RuntimeError("All data sources failed")

    async def get_current_injuries(self) -> List[Dict]:
        """Get current injury report with caching."""
        cache_key = "injuries_current"

        if self._is_cache_valid(cache_key):
            return self._cache[cache_key]

        for name, adapter in self.adapters.items():
            try:
                injuries = await adapter.fetch_injuries()
                self._update_cache(cache_key, injuries)
                return injuries
            except Exception as e:
                logger.warning(f"Adapter {name} failed: {e}")
                continue

        raise RuntimeError("All data sources failed")

    def _is_cache_valid(self, key: str) -> bool:
        """Check if cached data is still valid."""
        if key not in self._cache:
            return False

        timestamp = self._cache_timestamps.get(key)
        if timestamp is None:
            return False

        age = (datetime.now() - timestamp).total_seconds()
        return age < self._cache_ttl

    def _update_cache(self, key: str, data: Any):
        """Update cache with new data."""
        self._cache[key] = data
        self._cache_timestamps[key] = datetime.now()

    async def close(self):
        """Close all adapters."""
        for adapter in self.adapters.values():
            if hasattr(adapter, 'close'):
                await adapter.close()
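
A minimal usage sketch of the manager (NBAApiAdapter and its config are stand-in names for whichever concrete adapter you implemented above):

import asyncio

async def main():
    manager = RealTimeDataManager()
    manager.register_adapter("primary", NBAApiAdapter(config))  # hypothetical adapter + config
    try:
        games = await manager.get_todays_games()
        print(f"Found {len(games)} games today")
    finally:
        await manager.close()

asyncio.run(main())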

Prediction API Design

The API layer exposes prediction functionality to external consumers. We use FastAPI for its performance, automatic documentation, and type validation.

API Implementation

"""
Prediction API

FastAPI-based REST API for game predictions.
"""

from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from datetime import date, datetime
from typing import List, Optional, Dict
import logging

logger = logging.getLogger(__name__)


# Pydantic models for request/response validation
class PredictionRequest(BaseModel):
    """Request model for single game prediction."""
    home_team: str = Field(..., description="Home team abbreviation (e.g., 'LAL')")
    away_team: str = Field(..., description="Away team abbreviation (e.g., 'BOS')")
    game_date: date = Field(default_factory=date.today, description="Game date")

    class Config:
        json_schema_extra = {
            "example": {
                "home_team": "LAL",
                "away_team": "BOS",
                "game_date": "2024-01-15"
            }
        }


class PredictionResponse(BaseModel):
    """Response model for game prediction."""
    home_team: str
    away_team: str
    game_date: date
    home_win_probability: float = Field(..., ge=0, le=1)
    away_win_probability: float = Field(..., ge=0, le=1)
    predicted_spread: float
    predicted_total: Optional[float] = None
    confidence: float = Field(..., ge=0, le=1)
    model_breakdown: Optional[Dict[str, float]] = None
    generated_at: datetime


class TeamRating(BaseModel):
    """Team Elo rating."""
    team: str
    rating: float
    rank: int
    games_played: int


class HealthResponse(BaseModel):
    """Health check response."""
    status: str
    version: str
    model_version: str
    last_update: datetime


# Create FastAPI app
app = FastAPI(
    title="NBA Game Prediction API",
    description="Production-ready API for NBA game predictions",
    version="1.0.0"
)

# Add CORS middleware. Note: browsers reject wildcard origins when
# allow_credentials=True, so restrict allow_origins to known hosts in production
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # lock this down before deploying
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Dependency for getting prediction system
def get_prediction_system():
    """Dependency injection for prediction system."""
    # Building a new system per request is expensive; in production, return
    # a single pre-configured instance (see the cached sketch below)
    from prediction_system import PredictionSystem
    return PredictionSystem()
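
# A cached variant would look like this (a sketch -- adapt to your app's
# startup/lifespan handling):
#
#   from functools import lru_cache
#
#   @lru_cache(maxsize=1)
#   def get_prediction_system():
#       from prediction_system import PredictionSystem
#       return PredictionSystem()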


@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Check API health status."""
    return HealthResponse(
        status="healthy",
        version="1.0.0",
        model_version="2024.01",
        last_update=datetime.now()
    )


@app.post("/predict", response_model=PredictionResponse)
async def predict_game(
    request: PredictionRequest,
    system = Depends(get_prediction_system)
):
    """
    Generate prediction for a single game.

    Returns win probabilities, predicted spread, and confidence level.
    """
    try:
        prediction = system.predict(
            home_team=request.home_team,
            away_team=request.away_team,
            game_date=request.game_date
        )

        return PredictionResponse(
            home_team=prediction.home_team,
            away_team=prediction.away_team,
            game_date=prediction.game_date,
            home_win_probability=prediction.home_win_prob,
            away_win_probability=1 - prediction.home_win_prob,
            predicted_spread=prediction.predicted_spread,
            predicted_total=prediction.predicted_total,
            confidence=prediction.confidence,
            model_breakdown=prediction.model_contributions,
            generated_at=datetime.now()
        )

    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Prediction failed: {e}")
        raise HTTPException(status_code=500, detail="Prediction failed")


@app.get("/predict/today", response_model=List[PredictionResponse])
async def predict_todays_games(
    system = Depends(get_prediction_system)
):
    """
    Generate predictions for all of today's games.

    Automatically fetches the current schedule and returns predictions.
    """
    try:
        predictions = system.predict_todays_games()

        return [
            PredictionResponse(
                home_team=p.home_team,
                away_team=p.away_team,
                game_date=p.game_date,
                home_win_probability=p.home_win_prob,
                away_win_probability=1 - p.home_win_prob,
                predicted_spread=p.predicted_spread,
                confidence=p.confidence,
                model_breakdown=p.model_contributions,
                generated_at=datetime.now()
            )
            for p in predictions
        ]

    except Exception as e:
        logger.error(f"Bulk prediction failed: {e}")
        raise HTTPException(status_code=500, detail="Prediction failed")


@app.get("/ratings", response_model=List[TeamRating])
async def get_ratings(
    system = Depends(get_prediction_system)
):
    """
    Get current Elo ratings for all teams.

    Returns teams sorted by rating (highest first).
    """
    ratings = system.elo_system.get_all_ratings()

    return [
        TeamRating(
            team=r["team"],
            rating=r["rating"],
            rank=i + 1,
            games_played=r["games_played"]
        )
        for i, r in enumerate(ratings)
    ]


@app.get("/ratings/{team}")
async def get_team_rating(
    team: str,
    system = Depends(get_prediction_system)
):
    """Get rating for a specific team."""
    rating = system.elo_system.get_rating(team.upper())

    if rating is None:
        raise HTTPException(status_code=404, detail=f"Team {team} not found")

    return {"team": team.upper(), "rating": rating}


# Error handlers
@app.exception_handler(ValueError)
async def value_error_handler(request, exc):
    # Exception handlers must return a response object, not an HTTPException
    return JSONResponse(status_code=400, content={"detail": str(exc)})

API Documentation

FastAPI automatically generates OpenAPI documentation. Access it at /docs for the interactive Swagger UI or at /redoc for the ReDoc view.
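
Once the server is running (for example via uvicorn api:app), you can exercise the endpoint from Python. A quick sketch, assuming the requests package is installed and the server listens on localhost:8000:

import requests

response = requests.post(
    "http://localhost:8000/predict",
    json={"home_team": "LAL", "away_team": "BOS", "game_date": "2024-01-15"}
)
response.raise_for_status()

prediction = response.json()
print(f"Home win probability: {prediction['home_win_probability']:.1%}")
print(f"Predicted spread: {prediction['predicted_spread']:+.1f}")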


Performance Tracking and Monitoring

Continuous monitoring ensures the prediction system maintains accuracy over time. Implement comprehensive tracking of predictions and outcomes.

Metrics Collection

"""
Performance Tracking and Monitoring

Tracks prediction accuracy and system health metrics.
"""

from dataclasses import dataclass
from datetime import date, datetime
from typing import Dict, List, Optional
import numpy as np
import logging

logger = logging.getLogger(__name__)


@dataclass
class PredictionRecord:
    """Record of a prediction and its outcome."""
    game_id: str
    prediction_time: datetime
    home_team: str
    away_team: str
    home_win_prob: float
    predicted_spread: float
    actual_home_win: Optional[bool] = None
    actual_margin: Optional[int] = None
    resolved: bool = False


@dataclass
class PerformanceMetrics:
    """Aggregated performance metrics."""
    period_start: date
    period_end: date
    total_predictions: int
    resolved_predictions: int

    # Win probability metrics
    log_loss: float
    brier_score: float
    accuracy: float

    # Spread metrics
    spread_mae: float
    spread_rmse: float
    ats_accuracy: float

    # Calibration
    calibration_error: float


class PerformanceTracker:
    """
    Tracks and analyzes prediction performance.

    Maintains a history of predictions and calculates
    various accuracy metrics over time.
    """

    def __init__(self):
        self.predictions: Dict[str, PredictionRecord] = {}
        self.metrics_history: List[PerformanceMetrics] = []

    def record_prediction(
        self,
        game_id: str,
        home_team: str,
        away_team: str,
        home_win_prob: float,
        predicted_spread: float
    ):
        """Record a new prediction."""
        record = PredictionRecord(
            game_id=game_id,
            prediction_time=datetime.now(),
            home_team=home_team,
            away_team=away_team,
            home_win_prob=home_win_prob,
            predicted_spread=predicted_spread
        )

        self.predictions[game_id] = record
        logger.debug(f"Recorded prediction for {game_id}")

    def resolve_prediction(
        self,
        game_id: str,
        home_score: int,
        away_score: int
    ):
        """Update prediction with actual outcome."""
        if game_id not in self.predictions:
            logger.warning(f"No prediction found for {game_id}")
            return

        record = self.predictions[game_id]
        record.actual_home_win = home_score > away_score
        record.actual_margin = home_score - away_score
        record.resolved = True

        logger.debug(f"Resolved prediction for {game_id}")

    def calculate_metrics(
        self,
        start_date: Optional[date] = None,
        end_date: Optional[date] = None
    ) -> PerformanceMetrics:
        """Calculate performance metrics for a period."""
        # Filter predictions by date
        resolved = [
            p for p in self.predictions.values()
            if p.resolved and (start_date is None or p.prediction_time.date() >= start_date)
            and (end_date is None or p.prediction_time.date() <= end_date)
        ]

        if not resolved:
            raise ValueError("No resolved predictions in period")

        # Extract arrays
        probs = np.array([p.home_win_prob for p in resolved])
        actuals = np.array([1.0 if p.actual_home_win else 0.0 for p in resolved])
        predicted_spreads = np.array([p.predicted_spread for p in resolved])
        actual_margins = np.array([p.actual_margin for p in resolved])

        # Win probability metrics
        log_loss = self._calculate_log_loss(probs, actuals)
        brier_score = self._calculate_brier_score(probs, actuals)
        accuracy = np.mean((probs > 0.5) == actuals)

        # Spread metrics
        spread_mae = np.mean(np.abs(predicted_spreads - actual_margins))
        spread_rmse = np.sqrt(np.mean((predicted_spreads - actual_margins) ** 2))
        # Note: without market lines this measures picking the correct side of
        # zero (the winner); true against-the-spread accuracy needs closing lines
        ats_accuracy = np.mean((predicted_spreads > 0) == (actual_margins > 0))

        # Calibration
        calibration_error = self._calculate_calibration_error(probs, actuals)

        metrics = PerformanceMetrics(
            period_start=start_date or min(p.prediction_time.date() for p in resolved),
            period_end=end_date or max(p.prediction_time.date() for p in resolved),
            total_predictions=len(self.predictions),
            resolved_predictions=len(resolved),
            log_loss=log_loss,
            brier_score=brier_score,
            accuracy=accuracy,
            spread_mae=spread_mae,
            spread_rmse=spread_rmse,
            ats_accuracy=ats_accuracy,
            calibration_error=calibration_error
        )

        self.metrics_history.append(metrics)
        return metrics

    def _calculate_log_loss(
        self,
        probs: np.ndarray,
        actuals: np.ndarray
    ) -> float:
        """Calculate log loss (cross-entropy)."""
        epsilon = 1e-10
        probs = np.clip(probs, epsilon, 1 - epsilon)

        log_loss = -np.mean(
            actuals * np.log(probs) +
            (1 - actuals) * np.log(1 - probs)
        )

        return log_loss

    def _calculate_brier_score(
        self,
        probs: np.ndarray,
        actuals: np.ndarray
    ) -> float:
        """Calculate Brier score."""
        return np.mean((probs - actuals) ** 2)

    def _calculate_calibration_error(
        self,
        probs: np.ndarray,
        actuals: np.ndarray,
        n_bins: int = 10
    ) -> float:
        """
        Calculate expected calibration error.

        Measures how well probability predictions match actual outcomes.
        """
        bin_edges = np.linspace(0, 1, n_bins + 1)
        calibration_error = 0.0

        for i in range(n_bins):
            mask = (probs >= bin_edges[i]) & (probs < bin_edges[i + 1])
            if i == n_bins - 1:
                # Include probabilities exactly equal to 1.0 in the top bin
                mask |= probs == 1.0
            if np.sum(mask) == 0:
                continue

            bin_prob = np.mean(probs[mask])
            bin_actual = np.mean(actuals[mask])
            bin_weight = np.sum(mask) / len(probs)

            calibration_error += bin_weight * abs(bin_prob - bin_actual)

        return calibration_error

    def get_performance_trend(
        self,
        metric: str = "log_loss",
        window: int = 100
    ) -> List[float]:
        """Calculate rolling metric over recent predictions."""
        resolved = sorted(
            [p for p in self.predictions.values() if p.resolved],
            key=lambda x: x.prediction_time
        )

        if len(resolved) < window:
            return []

        trend = []
        for i in range(window, len(resolved) + 1):
            window_preds = resolved[i - window:i]

            probs = np.array([p.home_win_prob for p in window_preds])
            actuals = np.array([1.0 if p.actual_home_win else 0.0 for p in window_preds])

            if metric == "log_loss":
                value = self._calculate_log_loss(probs, actuals)
            elif metric == "accuracy":
                value = np.mean((probs > 0.5) == actuals)
            else:
                raise ValueError(f"Unknown metric: {metric}")

            trend.append(value)

        return trend

    def generate_report(self) -> Dict:
        """Generate comprehensive performance report."""
        if not self.metrics_history:
            self.calculate_metrics()

        latest = self.metrics_history[-1]

        return {
            "summary": {
                "total_predictions": latest.total_predictions,
                "resolved_predictions": latest.resolved_predictions,
                "period": f"{latest.period_start} to {latest.period_end}"
            },
            "win_probability": {
                "log_loss": round(latest.log_loss, 4),
                "brier_score": round(latest.brier_score, 4),
                "accuracy": f"{latest.accuracy * 100:.1f}%"
            },
            "spread": {
                "mae": round(latest.spread_mae, 2),
                "rmse": round(latest.spread_rmse, 2),
                "ats_accuracy": f"{latest.ats_accuracy * 100:.1f}%"
            },
            "calibration": {
                "expected_calibration_error": round(latest.calibration_error, 4)
            }
        }
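
A short end-to-end usage example of the tracker (teams, scores, and the spread value are illustrative; the spread follows the home-margin convention used for training labels):

tracker = PerformanceTracker()

# Record a prediction before tip-off...
tracker.record_prediction(
    game_id="2024-01-15_LAL_BOS",
    home_team="LAL",
    away_team="BOS",
    home_win_prob=0.62,
    predicted_spread=3.5  # home team favored by 3.5
)

# ...and resolve it once the final score is known
tracker.resolve_prediction("2024-01-15_LAL_BOS", home_score=112, away_score=105)

print(tracker.generate_report())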

Alerting

Set up alerts for significant performance degradation:

class AlertManager:
    """Manages performance alerts."""

    def __init__(self, tracker: PerformanceTracker):
        self.tracker = tracker
        self.thresholds = {
            "log_loss": 0.70,  # Alert if log loss exceeds this
            "accuracy": 0.55,  # Alert if accuracy drops below this
            "calibration_error": 0.10  # Alert if calibration error exceeds this
        }

    def check_alerts(self) -> List[Dict]:
        """Check for any metric threshold violations."""
        alerts = []

        try:
            metrics = self.tracker.calculate_metrics()
        except ValueError:
            return alerts

        if metrics.log_loss > self.thresholds["log_loss"]:
            alerts.append({
                "severity": "warning",
                "metric": "log_loss",
                "value": metrics.log_loss,
                "threshold": self.thresholds["log_loss"],
                "message": f"Log loss {metrics.log_loss:.4f} exceeds threshold"
            })

        if metrics.accuracy < self.thresholds["accuracy"]:
            alerts.append({
                "severity": "warning",
                "metric": "accuracy",
                "value": metrics.accuracy,
                "threshold": self.thresholds["accuracy"],
                "message": f"Accuracy {metrics.accuracy:.1%} below threshold"
            })

        if metrics.calibration_error > self.thresholds["calibration_error"]:
            alerts.append({
                "severity": "warning",
                "metric": "calibration_error",
                "value": metrics.calibration_error,
                "threshold": self.thresholds["calibration_error"],
                "message": f"Calibration error {metrics.calibration_error:.4f} exceeds threshold"
            })

        return alerts
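
Run the check on a schedule and route anything it returns to your logging or paging system; a minimal example:

alert_manager = AlertManager(tracker)

for alert in alert_manager.check_alerts():
    logger.warning(f"[{alert['severity']}] {alert['message']}")
    # Hook in email/Slack/PagerDuty notification here as needed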

Continuous Model Improvement

A production prediction system requires ongoing maintenance and improvement. This section covers strategies for keeping the system accurate over time.

Scheduled Retraining

"""
Model Retraining Pipeline

Automates periodic model updates with new data.
"""

from datetime import date, datetime, timedelta
from typing import Dict, List, Optional
import numpy as np
import logging

logger = logging.getLogger(__name__)


class RetrainingPipeline:
    """
    Manages scheduled model retraining.

    Implements:
    - Daily Elo updates
    - Weekly feature recalculation
    - Monthly full model retraining
    """

    def __init__(
        self,
        prediction_system,
        data_pipeline,
        feature_engineer
    ):
        self.prediction_system = prediction_system
        self.data_pipeline = data_pipeline
        self.feature_engineer = feature_engineer
        self.last_elo_update: Optional[date] = None
        self.last_feature_update: Optional[date] = None
        self.last_full_retrain: Optional[date] = None

    def run_daily_updates(self):
        """Run daily update tasks."""
        logger.info("Running daily updates")

        # Update Elo ratings with yesterday's results
        yesterday = date.today() - timedelta(days=1)
        games = self.data_pipeline.collect_games(yesterday, yesterday)

        for game in games:
            if game.is_completed:
                self.prediction_system.elo_system.update_ratings(
                    game.home_team,
                    game.away_team,
                    game.home_score,
                    game.away_score,
                    game.date
                )

        self.last_elo_update = date.today()
        logger.info(f"Updated Elo ratings with {len(games)} games")

    def run_weekly_updates(self):
        """Run weekly update tasks."""
        logger.info("Running weekly updates")

        # Recalculate team statistics
        # Update feature cache
        # Check for data quality issues

        self.last_feature_update = date.today()

    def run_monthly_retrain(self):
        """Run full model retraining."""
        logger.info("Running monthly model retraining")

        # Collect all data from current season
        season_start = self._get_season_start()
        games = self.data_pipeline.collect_games(season_start, date.today())

        # Generate features for all games
        features_matrix, labels = self._prepare_training_data(games)

        # Retrain ensemble
        self.prediction_system.ensemble.fit(
            games,
            features_matrix,
            labels["win"],
            labels["spread"]
        )

        # Evaluate and log performance
        metrics = self.prediction_system.tracker.calculate_metrics()
        logger.info(f"Retrained model - Log loss: {metrics.log_loss:.4f}")

        self.last_full_retrain = date.today()

    def _get_season_start(self) -> date:
        """Get the start date of the current NBA season."""
        today = date.today()
        year = today.year if today.month >= 10 else today.year - 1
        return date(year, 10, 1)

    def _prepare_training_data(self, games):
        """Prepare feature matrix and labels from games."""
        features_list = []
        win_labels = []
        spread_labels = []

        for game in games:
            if not game.is_completed:
                continue

            features = self.feature_engineer.compute_game_features(
                game.home_team,
                game.away_team,
                game.date
            )

            features_list.append(features)
            win_labels.append(1 if game.home_win else 0)
            spread_labels.append(game.home_score - game.away_score)

        feature_names = list(features_list[0].keys()) if features_list else []
        features_matrix = np.array([
            [f[name] for name in feature_names]
            for f in features_list
        ])

        return features_matrix, {
            "win": np.array(win_labels),
            "spread": np.array(spread_labels)
        }


class ModelVersionManager:
    """Manages model versions and rollback capability."""

    def __init__(self, storage_path: str):
        self.storage_path = storage_path
        self.versions: List[Dict] = []

    def save_version(
        self,
        model,
        version_name: str,
        metrics: Dict
    ):
        """Save a model version with metadata."""
        import os

        version_path = os.path.join(
            self.storage_path,
            f"model_{version_name}.pkl"
        )

        model.save(version_path)

        self.versions.append({
            "name": version_name,
            "path": version_path,
            "created_at": datetime.now().isoformat(),
            "metrics": metrics
        })

        logger.info(f"Saved model version: {version_name}")

    def rollback(self, version_name: str):
        """Rollback to a previous model version."""
        version = next(
            (v for v in self.versions if v["name"] == version_name),
            None
        )

        if version is None:
            raise ValueError(f"Version {version_name} not found")

        logger.info(f"Rolling back to version: {version_name}")
        return version["path"]

    def get_best_version(self, metric: str = "log_loss") -> str:
        """Get the version with best performance on a metric."""
        if not self.versions:
            raise ValueError("No versions available")

        # Lower is better for log_loss; higher is better for accuracy metrics.
        # The default for a missing metric must sort last in either direction.
        reverse = metric in ["accuracy", "ats_accuracy"]
        missing = float("-inf") if reverse else float("inf")

        best = sorted(
            self.versions,
            key=lambda v: v["metrics"].get(metric, missing),
            reverse=reverse
        )[0]

        return best["name"]
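
To actually run these jobs on a schedule, one lightweight option is the schedule package (an assumption -- cron, APScheduler, or Airflow work equally well); a sketch, with the pipeline's dependencies assumed to be constructed elsewhere:

import time
import schedule

pipeline = RetrainingPipeline(prediction_system, data_pipeline, feature_engineer)

schedule.every().day.at("04:00").do(pipeline.run_daily_updates)
schedule.every().monday.at("05:00").do(pipeline.run_weekly_updates)

def maybe_retrain():
    # schedule has no monthly trigger, so check the calendar day instead
    if date.today().day == 1:
        pipeline.run_monthly_retrain()

schedule.every().day.at("06:00").do(maybe_retrain)

while True:
    schedule.run_pending()
    time.sleep(60)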

Complete System Integration

The following module brings all the components together into a unified prediction system.

"""
Complete Prediction System

Integrates all components into a production-ready system.
"""

from datetime import date
from typing import List, Optional
import logging

import numpy as np

# Import all components (module names are assumed to match the files built
# earlier in this chapter; adjust them to your project layout)
from pipeline import DataPipeline
from elo import EloSystem, EloConfig
from features import FeatureEngineer
from models import EnsembleModel, ModelConfig, Prediction
from tracking import PerformanceTracker

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PredictionSystem:
    """
    Complete NBA Game Prediction System.

    Integrates:
    - Data pipeline
    - Feature engineering
    - Elo ratings
    - Machine learning models
    - Performance tracking
    """

    # Features used by ML models
    FEATURE_LIST = [
        "net_rating_diff",
        "offensive_rating_diff",
        "defensive_rating_diff",
        "efg_diff",
        "pace_diff",
        "turnover_diff",
        "orb_rate_diff",
        "recent_net_diff",
        "rest_advantage",
        "home_back_to_back",
        "away_back_to_back",
        "away_travel_distance",
        "h2h_home_win_pct",
        "injury_impact_diff",
        "home_court_advantage"
    ]

    def __init__(self, config_path: Optional[str] = None):
        """Initialize the prediction system."""
        logger.info("Initializing Prediction System")

        # Initialize components
        self.data_pipeline = DataPipeline()
        self.feature_engineer = FeatureEngineer(self.data_pipeline)

        # Initialize Elo system
        elo_config = EloConfig(
            k_factor=20,
            home_advantage=100,
            initial_rating=1500
        )
        self.elo_system = EloSystem(elo_config)

        # Initialize ML models
        model_config = ModelConfig(features=self.FEATURE_LIST)
        self.ensemble = EnsembleModel(
            self.elo_system,
            self.feature_engineer,
            model_config
        )

        # Initialize tracker
        self.tracker = PerformanceTracker()

        # Load saved models if available
        if config_path:
            self._load_configuration(config_path)

    def train(
        self,
        start_date: date,
        end_date: date
    ):
        """
        Train the system on historical data.

        This should be run before making predictions.
        """
        logger.info(f"Training on data from {start_date} to {end_date}")

        # Collect historical games
        games = self.data_pipeline.collect_games(start_date, end_date)
        logger.info(f"Collected {len(games)} games")

        # Update Elo ratings chronologically
        for game in sorted(games, key=lambda g: g.date):
            if game.is_completed:
                self.elo_system.update_ratings(
                    game.home_team,
                    game.away_team,
                    game.home_score,
                    game.away_score,
                    game.date
                )

        # Prepare training data for ML models
        features_list = []
        win_labels = []
        spread_labels = []

        for game in games:
            if not game.is_completed:
                continue

            try:
                features = self.feature_engineer.compute_game_features(
                    game.home_team,
                    game.away_team,
                    game.date
                )
                features_list.append([features[f] for f in self.FEATURE_LIST])
                win_labels.append(1 if game.home_win else 0)
                spread_labels.append(game.home_score - game.away_score)
            except Exception as e:
                logger.warning(f"Failed to compute features for {game.game_id}: {e}")

        features_matrix = np.array(features_list)
        win_labels = np.array(win_labels)
        spread_labels = np.array(spread_labels)

        # Train ensemble
        self.ensemble.fit(games, features_matrix, win_labels, spread_labels)

        logger.info("Training complete")

    def predict(
        self,
        home_team: str,
        away_team: str,
        game_date: Optional[date] = None
    ) -> Prediction:
        """
        Generate prediction for a game.

        Args:
            home_team: Home team abbreviation
            away_team: Away team abbreviation
            game_date: Game date (defaults to today)

        Returns:
            Prediction object with probabilities and spread
        """
        if game_date is None:
            game_date = date.today()

        prediction = self.ensemble.predict(home_team, away_team, game_date)

        # Record prediction for tracking
        game_id = f"{game_date.isoformat()}_{home_team}_{away_team}"
        self.tracker.record_prediction(
            game_id=game_id,
            home_team=home_team,
            away_team=away_team,
            home_win_prob=prediction.home_win_prob,
            predicted_spread=prediction.predicted_spread
        )

        return prediction

    def predict_todays_games(self) -> List[Prediction]:
        """Generate predictions for all of today's games."""
        games = self.data_pipeline.collect_games(date.today(), date.today())

        predictions = []
        for game in games:
            if not game.is_completed:
                pred = self.predict(game.home_team, game.away_team, game.date)
                predictions.append(pred)

        return predictions

    def update_with_results(self, game_date: date):
        """
        Update system with completed game results.

        Updates Elo ratings and resolves predictions.
        """
        games = self.data_pipeline.collect_games(game_date, game_date)

        for game in games:
            if game.is_completed:
                # Update Elo
                self.elo_system.update_ratings(
                    game.home_team,
                    game.away_team,
                    game.home_score,
                    game.away_score,
                    game.date
                )

                # Resolve prediction
                game_id = f"{game.date.isoformat()}_{game.home_team}_{game.away_team}"
                self.tracker.resolve_prediction(
                    game_id,
                    game.home_score,
                    game.away_score
                )

        logger.info(f"Updated with {len(games)} results from {game_date}")

    def get_performance_report(self) -> dict:
        """Get current performance metrics."""
        return self.tracker.generate_report()

    def _load_configuration(self, config_path: str):
        """Load system configuration from file."""
        pass


# Main entry point
if __name__ == "__main__":
    # Example usage
    system = PredictionSystem()

    # Train on historical data
    from datetime import timedelta
    train_end = date.today() - timedelta(days=1)
    train_start = train_end - timedelta(days=365)

    system.train(train_start, train_end)

    # Make a prediction
    prediction = system.predict("LAL", "BOS")

    print(f"\nPrediction: {prediction.away_team} @ {prediction.home_team}")
    print(f"Home win probability: {prediction.home_win_prob:.1%}")
    print(f"Predicted spread: {prediction.predicted_spread:+.1f}")
    print(f"Confidence: {prediction.confidence:.1%}")

Deployment Considerations

Docker Configuration

# Dockerfile
FROM python:3.10-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "8000"]

Environment Configuration

# config.py
# Note: in Pydantic v2, BaseSettings lives in the separate
# pydantic-settings package (pip install pydantic-settings)
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Application settings loaded from environment."""

    # API settings
    api_host: str = "0.0.0.0"
    api_port: int = 8000
    debug: bool = False

    # Database
    database_url: str = "sqlite:///predictions.db"

    # Model settings
    model_path: str = "./models"
    retrain_schedule: str = "0 4 * * *"  # 4 AM daily

    # Data sources
    nba_api_key: str = ""

    model_config = SettingsConfigDict(
        env_file=".env",
        # "model_" is a protected namespace in Pydantic v2; clearing it keeps
        # the model_path field above from triggering a warning
        protected_namespaces=()
    )


settings = Settings()
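
Explicit environment variables take precedence over values in .env, which in turn override the defaults above. For example (the .env values shown are illustrative):

# .env (illustrative)
# DATABASE_URL=sqlite:///predictions.db
# NBA_API_KEY=your-key-here

from config import settings

print(settings.database_url)  # resolved from the environment, .env, or the default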

Project Exercises

Exercise 1: Extend Feature Engineering

Add player-level features to the prediction system:

  1. Implement a PlayerImpactCalculator class that estimates each player's contribution to team performance (a starting sketch follows this list)
  2. Add features for:
     - Minutes-weighted plus/minus differential
     - Key player availability (top 3 players by minutes)
     - Position group strength comparisons
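
A possible starting point for step 1 (the stat fields and the 48-minute scaling are assumptions to adapt to your data source):

class PlayerImpactCalculator:
    """Estimates each player's contribution via minutes-weighted plus/minus."""

    def __init__(self, player_stats: dict):
        # player_stats: {player_id: {"minutes_per_game": float, "plus_minus_per_game": float}}
        self.player_stats = player_stats

    def team_impact(self, roster: list, available: set) -> float:
        """Sum minutes-weighted plus/minus over the available players."""
        impact = 0.0
        for player_id in roster:
            stats = self.player_stats.get(player_id)
            if player_id in available and stats:
                weight = stats["minutes_per_game"] / 48.0
                impact += stats["plus_minus_per_game"] * weight
        return impact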

Exercise 2: Implement Live Game Predictions

Extend the system to update predictions during games:

  1. Create a LiveGamePredictor class
  2. Implement score-based probability updates using the log5 method (see the sketch after this list)
  3. Add time remaining as a feature
  4. Build a WebSocket endpoint for real-time updates
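
For step 2, Bill James' log5 formula gives the probability that a team with underlying strength p_a beats one with strength p_b; the same machinery can blend your pre-game probability with a score-based in-game estimate (the in-game estimate itself is yours to model):

def log5(p_a: float, p_b: float) -> float:
    """Probability that a team of strength p_a beats a team of strength p_b."""
    return (p_a - p_a * p_b) / (p_a + p_b - 2 * p_a * p_b)

# A .650 team against a .500 opponent wins at exactly its own rate:
print(f"{log5(0.65, 0.50):.3f}")  # 0.650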

Exercise 3: Add Betting Market Integration

Compare predictions against betting market lines:

  1. Implement a MarketDataCollector for odds feeds
  2. Calculate implied probabilities from odds (a conversion sketch follows this list)
  3. Track closing line value (CLV) for predictions
  4. Add market-relative metrics to the performance tracker
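
A sketch of step 2 for American odds (the -150/+130 line is an illustrative example):

def implied_probability(american_odds: int) -> float:
    """Convert American odds to the raw implied probability (includes vig)."""
    if american_odds < 0:
        return -american_odds / (-american_odds + 100)
    return 100 / (american_odds + 100)

def remove_vig(p_home: float, p_away: float) -> tuple:
    """Normalize the two implied probabilities so they sum to 1."""
    total = p_home + p_away
    return p_home / total, p_away / total

home, away = remove_vig(implied_probability(-150), implied_probability(130))
print(f"home {home:.3f}, away {away:.3f}")  # home 0.580, away 0.420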

Exercise 4: Build a Backtesting Framework

Create comprehensive backtesting capabilities:

  1. Implement walk-forward validation (a minimal split generator follows this list)
  2. Calculate profitability metrics assuming various betting strategies
  3. Generate detailed performance reports by:
     - Team
     - Month/season
     - Game context (home/away, rest, etc.)
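
A minimal walk-forward split generator for step 1 (assumes game objects expose a .date attribute, as elsewhere in this chapter):

from datetime import timedelta

def walk_forward_splits(games, train_days=365, test_days=30):
    """Yield (train, test) splits that slide forward through time."""
    games = sorted(games, key=lambda g: g.date)
    cursor = games[0].date + timedelta(days=train_days)
    end = games[-1].date

    while cursor < end:
        train = [g for g in games if g.date < cursor]
        test = [g for g in games
                if cursor <= g.date < cursor + timedelta(days=test_days)]
        if train and test:
            yield train, test
        cursor += timedelta(days=test_days)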

Summary

This capstone project has guided you through building a production-ready NBA game prediction system. The key components include:

  1. Robust Data Pipeline: Handles data collection, validation, and storage with proper error handling and caching.

  2. Comprehensive Feature Engineering: Transforms raw statistics into predictive signals including team performance, rest, travel, injuries, and matchup history.

  3. Elo Rating System: Provides an interpretable baseline that captures team strength dynamics throughout the season.

  4. Machine Learning Ensemble: Combines multiple models to capture different aspects of game prediction, improving overall accuracy.

  5. REST API: Exposes predictions through a well-documented, production-ready interface.

  6. Performance Monitoring: Tracks prediction accuracy over time and alerts when performance degrades.

  7. Continuous Improvement: Implements automated retraining and model versioning for ongoing maintenance.

The modular architecture allows each component to be improved independently while maintaining system stability. As you continue developing the system, focus on:

  • Expanding feature coverage with additional data sources
  • Experimenting with new model architectures
  • Improving calibration for edge cases
  • Building robust deployment and monitoring infrastructure

Remember that prediction accuracy in sports is inherently limited by the randomness of game outcomes. A well-calibrated system that honestly represents uncertainty is more valuable than one that makes overconfident predictions. Focus on generating accurate probabilities rather than on maximizing the number of correct binary calls.