In This Chapter
- Project Overview
- System Architecture
- Data Pipeline Design
- Feature Engineering
- Elo Rating Implementation
- Machine Learning Models
- Real-Time Data Integration
- Prediction API Design
- Performance Tracking and Monitoring
- Continuous Model Improvement
- Complete System Integration
- Deployment Considerations
- Project Exercises
- Summary
Capstone Project 3: Develop a Game Prediction System
Project Overview
Building a game prediction system represents one of the most challenging and rewarding applications of basketball analytics. This capstone project guides you through creating a production-ready system that combines statistical modeling, machine learning, and real-time data processing to generate accurate game predictions.
The system we build will incorporate multiple prediction methodologies, from classic Elo ratings to modern machine learning approaches, creating an ensemble that leverages the strengths of each method. By the end of this project, you will have a fully functional prediction API capable of generating win probabilities, point spread predictions, and confidence intervals for any NBA matchup.
Learning Objectives
Upon completing this capstone project, you will be able to:
- Design and implement a scalable prediction system architecture
- Build robust data pipelines for basketball statistics
- Engineer predictive features from raw game data
- Implement and calibrate Elo rating systems
- Train and evaluate machine learning models for game prediction
- Integrate real-time data sources for live predictions
- Deploy a prediction API with proper monitoring
- Establish processes for continuous model improvement
Project Scope
Our prediction system will handle three primary prediction types:
- Win Probability: The likelihood each team wins the game
- Point Spread: The expected margin of victory
- Total Points: The expected combined score
The system architecture supports both pre-game predictions and in-game probability updates, though this capstone focuses primarily on pre-game prediction with foundations for live prediction expansion.
System Architecture
A production prediction system requires careful architectural planning to ensure reliability, scalability, and maintainability. Our architecture follows a modular design pattern that separates concerns and enables independent scaling of components.
High-Level Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Data Sources Layer │
├─────────────────────────────────────────────────────────────────┤
│ NBA API │ Injury Reports │ Schedule │ Historical Data │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Data Pipeline Layer │
├─────────────────────────────────────────────────────────────────┤
│ Collectors │ Validators │ Transformers │ Storage │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Feature Engineering Layer │
├─────────────────────────────────────────────────────────────────┤
│ Team Stats │ Player Stats │ Contextual │ Feature Store │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Model Layer │
├─────────────────────────────────────────────────────────────────┤
│ Elo System │ ML Models │ Ensemble │ Model Registry │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ API Layer │
├─────────────────────────────────────────────────────────────────┤
│ REST API │ Authentication │ Rate Limiting │ Caching │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Monitoring Layer │
├─────────────────────────────────────────────────────────────────┤
│ Metrics │ Logging │ Alerting │ Performance Dashboard │
└─────────────────────────────────────────────────────────────────┘
Component Responsibilities
Data Sources Layer: External data providers including official NBA statistics, injury reporting services, and historical databases. This layer abstracts the complexity of multiple data formats and delivery mechanisms.
Data Pipeline Layer: Responsible for data ingestion, validation, transformation, and storage. Implements retry logic, error handling, and data quality checks to ensure downstream components receive clean, consistent data.
Feature Engineering Layer: Transforms raw data into predictive features. Maintains a feature store for efficient retrieval and ensures feature consistency between training and inference.
Model Layer: Houses all prediction models including Elo ratings and machine learning models. Implements the ensemble logic that combines individual model outputs into final predictions.
API Layer: Exposes prediction functionality through a REST API. Handles authentication, rate limiting, request validation, and response formatting.
Monitoring Layer: Tracks system health, prediction accuracy, and model performance. Generates alerts when metrics fall outside acceptable ranges.
Technology Stack
For this implementation, we use the following technologies:
- Python 3.10+: Primary programming language
- FastAPI: API framework with automatic documentation
- SQLite/PostgreSQL: Data storage (SQLite for development, PostgreSQL for production)
- Pandas/NumPy: Data manipulation and numerical computing
- Scikit-learn/XGBoost: Machine learning models
- Redis: Caching layer (optional for production)
- Docker: Containerization for deployment
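A requirements file for this stack might look like the following; the version pins are illustrative assumptions rather than tested constraints:
# requirements.txt (illustrative pins, adjust to your environment)
fastapi>=0.100
uvicorn>=0.23        # ASGI server for FastAPI
pandas>=2.0
numpy>=1.24
scikit-learn>=1.3
xgboost>=1.7         # optional; the code falls back to sklearn if absent
aiohttp>=3.8         # async HTTP client for real-time data
redis>=4.5           # optional caching layer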
Data Pipeline Design
The data pipeline forms the foundation of our prediction system. Reliable predictions require reliable data, making pipeline robustness a critical concern.
Data Sources
Our system integrates data from multiple sources:
Official Statistics: Game results, box scores, team statistics, and player statistics. These form the core input for our models.
Injury Reports: Current player availability and injury status. Critical for adjusting predictions based on lineup changes.
Schedule Information: Game dates, times, locations, and travel requirements. Enables calculation of rest days and travel fatigue.
Historical Data: Multiple seasons of historical games for model training and Elo initialization.
Pipeline Architecture
"""
Data Pipeline Module
This module handles all data collection, validation, and storage
for the prediction system.
"""
from dataclasses import dataclass
from datetime import datetime, date
from typing import Optional, List, Dict, Any
from enum import Enum
import logging
logger = logging.getLogger(__name__)
class DataSource(Enum):
"""Enumeration of supported data sources."""
NBA_API = "nba_api"
INJURY_REPORT = "injury_report"
SCHEDULE = "schedule"
HISTORICAL = "historical"
@dataclass
class GameData:
"""Represents a single game's data."""
game_id: str
date: date
home_team: str
away_team: str
home_score: Optional[int] = None
away_score: Optional[int] = None
venue: Optional[str] = None
attendance: Optional[int] = None
@property
def is_completed(self) -> bool:
"""Check if the game has been completed."""
return self.home_score is not None and self.away_score is not None
@property
def home_win(self) -> Optional[bool]:
"""Return True if home team won, None if game not completed."""
if not self.is_completed:
return None
return self.home_score > self.away_score
@dataclass
class TeamStats:
"""Aggregated team statistics."""
team_id: str
games_played: int
wins: int
losses: int
points_per_game: float
points_allowed_per_game: float
offensive_rating: float
defensive_rating: float
net_rating: float
pace: float
effective_fg_pct: float
turnover_pct: float
offensive_rebound_pct: float
free_throw_rate: float
@property
def win_pct(self) -> float:
"""Calculate win percentage."""
if self.games_played == 0:
return 0.5
return self.wins / self.games_played
class DataValidator:
"""Validates incoming data for consistency and completeness."""
@staticmethod
def validate_game(game: GameData) -> List[str]:
"""
Validate a game record.
Returns a list of validation errors (empty if valid).
"""
errors = []
if not game.game_id:
errors.append("Missing game_id")
if not game.home_team or not game.away_team:
errors.append("Missing team information")
if game.home_team == game.away_team:
errors.append("Home and away teams cannot be the same")
if game.is_completed:
if game.home_score < 0 or game.away_score < 0:
errors.append("Scores cannot be negative")
if game.home_score == game.away_score:
errors.append("NBA games cannot end in a tie")
return errors
@staticmethod
def validate_team_stats(stats: TeamStats) -> List[str]:
"""Validate team statistics."""
errors = []
if stats.games_played < 0:
errors.append("Games played cannot be negative")
if stats.wins + stats.losses != stats.games_played:
errors.append("Wins + losses must equal games played")
if not 0 <= stats.effective_fg_pct <= 1:
errors.append("Effective FG% must be between 0 and 1")
return errors
class DataPipeline:
"""
Main data pipeline class.
Handles data collection, validation, transformation, and storage.
"""
def __init__(self, storage_backend: str = "sqlite"):
self.storage_backend = storage_backend
self.validator = DataValidator()
self._cache: Dict[str, Any] = {}
def collect_games(
self,
start_date: date,
end_date: date,
source: DataSource = DataSource.NBA_API
) -> List[GameData]:
"""
Collect game data for a date range.
In production, this would call external APIs.
"""
logger.info(f"Collecting games from {start_date} to {end_date}")
# Implementation would connect to actual data source
# For demonstration, we show the interface pattern
games = self._fetch_from_source(source, start_date, end_date)
# Validate all collected games
valid_games = []
for game in games:
errors = self.validator.validate_game(game)
if errors:
logger.warning(f"Invalid game {game.game_id}: {errors}")
else:
valid_games.append(game)
return valid_games
def _fetch_from_source(
self,
source: DataSource,
start_date: date,
end_date: date
) -> List[GameData]:
"""Fetch data from specified source."""
# Implementation varies by source
# This is a placeholder for the actual API calls
return []
def get_team_stats(
self,
team_id: str,
as_of_date: Optional[date] = None
) -> TeamStats:
"""
Get team statistics, optionally as of a specific date.
Uses caching to improve performance.
"""
cache_key = f"team_stats_{team_id}_{as_of_date}"
if cache_key in self._cache:
return self._cache[cache_key]
stats = self._calculate_team_stats(team_id, as_of_date)
self._cache[cache_key] = stats
return stats
def _calculate_team_stats(
self,
team_id: str,
as_of_date: Optional[date]
) -> TeamStats:
"""Calculate team statistics from game data."""
# Implementation would aggregate from stored games
pass
Data Quality Monitoring
Data quality issues can silently degrade prediction accuracy. Implement comprehensive monitoring:
class DataQualityMonitor:
"""Monitors data quality metrics over time."""
def __init__(self):
self.metrics_history: List[Dict[str, Any]] = []
def check_completeness(self, games: List[GameData]) -> float:
"""
Calculate data completeness score.
        Returns the fraction (0-1) of games with all required fields.
"""
if not games:
return 0.0
complete_count = sum(
1 for g in games
if g.game_id and g.home_team and g.away_team and g.date
)
return complete_count / len(games)
def check_timeliness(self, games: List[GameData]) -> Dict[str, Any]:
"""Check if data is being collected in a timely manner."""
if not games:
return {"status": "no_data", "lag_hours": None}
latest_game = max(games, key=lambda g: g.date)
lag = datetime.now().date() - latest_game.date
return {
"status": "ok" if lag.days <= 1 else "delayed",
"lag_days": lag.days,
"latest_game_date": latest_game.date.isoformat()
}
    def generate_report(self) -> Dict[str, Any]:
        """Generate a comprehensive data quality report."""
        # Guard against an empty history rather than indexing blindly
        if not self.metrics_history:
            return {
                "timestamp": datetime.now().isoformat(),
                "status": "no_metrics_recorded"
            }
        latest = self.metrics_history[-1]
        return {
            "timestamp": datetime.now().isoformat(),
            "completeness": latest.get("completeness", 0),
            "timeliness": latest.get("timeliness", {}),
            "total_records": latest.get("total_records", 0)
        }
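A quick check of the monitor against a hand-built record (the game ID below is hypothetical):
monitor = DataQualityMonitor()

sample_games = [
    GameData(
        game_id="0022300001",   # hypothetical ID for illustration
        date=date(2024, 1, 14),
        home_team="LAL",
        away_team="BOS",
        home_score=110,
        away_score=105
    )
]

print(monitor.check_completeness(sample_games))   # 1.0
print(monitor.check_timeliness(sample_games))     # status plus lag in days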
Feature Engineering
Feature engineering transforms raw data into predictive signals. The quality of features directly impacts model performance, making this one of the most important components of our system.
Feature Categories
Our system uses five main feature categories:
- Team Performance Features: Offensive and defensive ratings, pace, shooting efficiency
- Matchup Features: Historical head-to-head performance, style matchup indicators
- Rest and Schedule Features: Days of rest, back-to-back games, travel distance
- Injury Features: Impact of missing players on team performance
- Venue Features: Home court advantage, altitude, travel factors
Team Performance Features
"""
Feature Engineering Module
Transforms raw basketball data into predictive features.
"""
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from datetime import date, timedelta
import numpy as np
@dataclass
class TeamPerformanceFeatures:
"""Features derived from team performance metrics."""
# Offensive metrics
offensive_rating: float # Points per 100 possessions
effective_fg_pct: float # (FG + 0.5 * 3P) / FGA
true_shooting_pct: float # PTS / (2 * (FGA + 0.44 * FTA))
assist_rate: float # AST / FGM
turnover_rate: float # TOV / possessions
offensive_rebound_rate: float # ORB / (ORB + opp DRB)
# Defensive metrics
defensive_rating: float # Points allowed per 100 possessions
opponent_efg_pct: float # Opponent effective FG%
steal_rate: float
block_rate: float
defensive_rebound_rate: float
# Pace and style
pace: float # Possessions per 48 minutes
three_point_rate: float # 3PA / FGA
free_throw_rate: float # FTA / FGA
# Recent form (last 10 games)
recent_offensive_rating: float
recent_defensive_rating: float
recent_win_pct: float
class FeatureEngineer:
"""
Main feature engineering class.
Computes all features needed for game prediction.
"""
# Team location coordinates for travel calculation
TEAM_LOCATIONS: Dict[str, Tuple[float, float]] = {
"ATL": (33.7573, -84.3963),
"BOS": (42.3662, -71.0621),
"BKN": (40.6826, -73.9754),
"CHA": (35.2251, -80.8392),
"CHI": (41.8807, -87.6742),
"CLE": (41.4966, -81.6882),
"DAL": (32.7905, -96.8103),
"DEN": (39.7487, -105.0077),
"DET": (42.3410, -83.0553),
"GSW": (37.7680, -122.3879),
"HOU": (29.7508, -95.3621),
"IND": (39.7640, -86.1555),
"LAC": (34.0430, -118.2673),
"LAL": (34.0430, -118.2673),
"MEM": (35.1382, -90.0505),
"MIA": (25.7814, -80.1870),
"MIL": (43.0451, -87.9172),
"MIN": (44.9795, -93.2761),
"NOP": (29.9490, -90.0821),
"NYK": (40.7505, -73.9934),
"OKC": (35.4634, -97.5151),
"ORL": (28.5392, -81.3839),
"PHI": (39.9012, -75.1720),
"PHX": (33.4457, -112.0712),
"POR": (45.5316, -122.6668),
"SAC": (38.5802, -121.4997),
"SAS": (29.4271, -98.4375),
"TOR": (43.6435, -79.3791),
"UTA": (40.7683, -111.9011),
"WAS": (38.8981, -77.0209)
}
# Historical home court advantage (points)
HOME_COURT_ADVANTAGE = 3.2
def __init__(self, data_pipeline):
self.data_pipeline = data_pipeline
def compute_game_features(
self,
home_team: str,
away_team: str,
game_date: date
) -> Dict[str, float]:
"""
Compute all features for a game prediction.
Returns a dictionary of feature names to values.
"""
features = {}
# Team performance features
home_perf = self._get_team_performance(home_team, game_date)
away_perf = self._get_team_performance(away_team, game_date)
features.update(self._performance_diff_features(home_perf, away_perf))
# Rest and schedule features
features.update(self._rest_features(home_team, away_team, game_date))
# Travel features
features.update(self._travel_features(home_team, away_team, game_date))
# Home court advantage
features["home_court_advantage"] = self.HOME_COURT_ADVANTAGE
# Matchup features
features.update(self._matchup_features(home_team, away_team, game_date))
return features
def _get_team_performance(
self,
team: str,
as_of_date: date
) -> TeamPerformanceFeatures:
"""Get team performance features as of a specific date."""
stats = self.data_pipeline.get_team_stats(team, as_of_date)
# Calculate recent form (last 10 games)
recent_games = self._get_recent_games(team, as_of_date, n_games=10)
return TeamPerformanceFeatures(
offensive_rating=stats.offensive_rating,
effective_fg_pct=stats.effective_fg_pct,
true_shooting_pct=self._calculate_ts_pct(stats),
assist_rate=self._calculate_assist_rate(stats),
turnover_rate=stats.turnover_pct,
offensive_rebound_rate=stats.offensive_rebound_pct,
defensive_rating=stats.defensive_rating,
opponent_efg_pct=self._calculate_opp_efg(stats),
steal_rate=self._calculate_steal_rate(stats),
block_rate=self._calculate_block_rate(stats),
defensive_rebound_rate=self._calculate_drb_rate(stats),
pace=stats.pace,
three_point_rate=self._calculate_3pt_rate(stats),
free_throw_rate=stats.free_throw_rate,
recent_offensive_rating=self._recent_off_rating(recent_games),
recent_defensive_rating=self._recent_def_rating(recent_games),
recent_win_pct=self._recent_win_pct(recent_games)
)
def _performance_diff_features(
self,
home: TeamPerformanceFeatures,
away: TeamPerformanceFeatures
) -> Dict[str, float]:
"""Calculate performance differential features."""
return {
# Net rating differential
"net_rating_diff": (
(home.offensive_rating - home.defensive_rating) -
(away.offensive_rating - away.defensive_rating)
),
# Offensive comparison
"offensive_rating_diff": home.offensive_rating - away.offensive_rating,
"efg_diff": home.effective_fg_pct - away.effective_fg_pct,
"ts_pct_diff": home.true_shooting_pct - away.true_shooting_pct,
# Defensive comparison
"defensive_rating_diff": away.defensive_rating - home.defensive_rating,
"opp_efg_diff": away.opponent_efg_pct - home.opponent_efg_pct,
# Pace and style
"pace_diff": home.pace - away.pace,
"avg_pace": (home.pace + away.pace) / 2,
"three_point_rate_diff": home.three_point_rate - away.three_point_rate,
# Turnover differential
"turnover_diff": away.turnover_rate - home.turnover_rate,
# Rebounding differential
"orb_rate_diff": home.offensive_rebound_rate - away.offensive_rebound_rate,
"drb_rate_diff": home.defensive_rebound_rate - away.defensive_rebound_rate,
# Recent form
"recent_net_diff": (
(home.recent_offensive_rating - home.recent_defensive_rating) -
(away.recent_offensive_rating - away.recent_defensive_rating)
),
"recent_win_pct_diff": home.recent_win_pct - away.recent_win_pct
}
def _rest_features(
self,
home_team: str,
away_team: str,
game_date: date
) -> Dict[str, float]:
"""Calculate rest-related features."""
home_rest = self._days_of_rest(home_team, game_date)
away_rest = self._days_of_rest(away_team, game_date)
return {
"home_rest_days": min(home_rest, 7), # Cap at 7
"away_rest_days": min(away_rest, 7),
"rest_advantage": min(home_rest, 4) - min(away_rest, 4),
"home_back_to_back": 1.0 if home_rest == 0 else 0.0,
"away_back_to_back": 1.0 if away_rest == 0 else 0.0,
"both_rested": 1.0 if home_rest >= 2 and away_rest >= 2 else 0.0
}
def _days_of_rest(self, team: str, game_date: date) -> int:
"""Calculate days since team's last game."""
# Would query database for team's previous game
# Returns integer days of rest
pass
def _travel_features(
self,
home_team: str,
away_team: str,
game_date: date
) -> Dict[str, float]:
"""Calculate travel-related features."""
# Get away team's previous game location
prev_location = self._get_previous_location(away_team, game_date)
game_location = self.TEAM_LOCATIONS.get(home_team)
if prev_location and game_location:
distance = self._haversine_distance(prev_location, game_location)
timezone_diff = self._timezone_difference(prev_location, game_location)
else:
distance = 0
timezone_diff = 0
return {
"away_travel_distance": distance,
"away_timezone_diff": abs(timezone_diff),
"west_to_east_travel": 1.0 if timezone_diff > 0 else 0.0,
"long_distance_travel": 1.0 if distance > 1500 else 0.0
}
def _haversine_distance(
self,
coord1: Tuple[float, float],
coord2: Tuple[float, float]
) -> float:
"""Calculate distance between two coordinates in miles."""
lat1, lon1 = np.radians(coord1)
lat2, lon2 = np.radians(coord2)
dlat = lat2 - lat1
dlon = lon2 - lon1
a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
c = 2 * np.arcsin(np.sqrt(a))
# Earth's radius in miles
r = 3956
return c * r
def _timezone_difference(
self,
coord1: Tuple[float, float],
coord2: Tuple[float, float]
) -> int:
"""Estimate timezone difference based on longitude."""
# Rough approximation: 15 degrees longitude per timezone
lon_diff = coord2[1] - coord1[1]
return round(lon_diff / 15)
def _matchup_features(
self,
home_team: str,
away_team: str,
game_date: date
) -> Dict[str, float]:
"""Calculate head-to-head matchup features."""
# Get historical matchups (last 2 seasons)
h2h_games = self._get_head_to_head(
home_team,
away_team,
game_date - timedelta(days=730),
game_date
)
if not h2h_games:
return {
"h2h_home_win_pct": 0.5,
"h2h_avg_margin": 0.0,
"h2h_games_played": 0
}
home_wins = sum(1 for g in h2h_games if g.home_team == home_team and g.home_win)
home_wins += sum(1 for g in h2h_games if g.away_team == home_team and not g.home_win)
margins = []
for g in h2h_games:
if g.home_team == home_team:
margins.append(g.home_score - g.away_score)
else:
margins.append(g.away_score - g.home_score)
return {
"h2h_home_win_pct": home_wins / len(h2h_games),
"h2h_avg_margin": np.mean(margins),
"h2h_games_played": len(h2h_games)
}
def _get_recent_games(self, team: str, as_of_date: date, n_games: int):
"""Get team's most recent games."""
pass
def _get_head_to_head(self, team1: str, team2: str, start: date, end: date):
"""Get head-to-head games between two teams."""
pass
def _get_previous_location(self, team: str, game_date: date):
"""Get the location of team's previous game."""
pass
# Additional helper methods for calculations
def _calculate_ts_pct(self, stats) -> float:
pass
def _calculate_assist_rate(self, stats) -> float:
pass
def _calculate_opp_efg(self, stats) -> float:
pass
def _calculate_steal_rate(self, stats) -> float:
pass
def _calculate_block_rate(self, stats) -> float:
pass
def _calculate_drb_rate(self, stats) -> float:
pass
def _calculate_3pt_rate(self, stats) -> float:
pass
def _recent_off_rating(self, games) -> float:
pass
def _recent_def_rating(self, games) -> float:
pass
def _recent_win_pct(self, games) -> float:
pass
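The travel helpers can be sanity-checked in isolation; passing `data_pipeline=None` is safe here because the distance calculation never touches the pipeline:
# Standalone check of the haversine and timezone helpers
fe = FeatureEngineer(data_pipeline=None)

bos = FeatureEngineer.TEAM_LOCATIONS["BOS"]
lal = FeatureEngineer.TEAM_LOCATIONS["LAL"]
print(f"{fe._haversine_distance(bos, lal):.0f} miles")   # roughly 2,600
print(fe._timezone_difference(bos, lal))                 # -3 (three zones west)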
Injury Impact Features
Player availability significantly impacts game outcomes. Our system quantifies this impact:
@dataclass
class InjuryImpact:
"""Quantifies the impact of player absences."""
player_id: str
player_name: str
team: str
status: str # "Out", "Doubtful", "Questionable", "Probable"
estimated_impact: float # Expected point differential impact
minutes_typically_played: float
replacement_level_diff: float
class InjuryFeatureGenerator:
"""Generates features related to player injuries and availability."""
# Status to probability of playing
STATUS_PROBABILITIES = {
"Out": 0.0,
"Doubtful": 0.15,
"Questionable": 0.50,
"Probable": 0.85,
"Available": 1.0
}
def calculate_team_injury_impact(
self,
team: str,
injuries: List[InjuryImpact]
) -> Dict[str, float]:
"""
Calculate total injury impact for a team.
Uses player value estimates and injury status probabilities.
"""
team_injuries = [inj for inj in injuries if inj.team == team]
if not team_injuries:
return {
"injury_impact": 0.0,
"star_player_out": 0.0,
"rotation_players_out": 0
}
total_impact = 0.0
star_out = False
rotation_out = 0
for injury in team_injuries:
prob_out = 1.0 - self.STATUS_PROBABILITIES.get(
injury.status, 0.5
)
expected_impact = injury.estimated_impact * prob_out
total_impact += expected_impact
if injury.estimated_impact > 3.0 and prob_out > 0.5:
star_out = True
if injury.minutes_typically_played >= 15 and prob_out > 0.5:
rotation_out += 1
return {
"injury_impact": total_impact,
"star_player_out": 1.0 if star_out else 0.0,
"rotation_players_out": rotation_out
}
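For example, a single questionable starter (a hypothetical entry) contributes half of his estimated impact, since "Questionable" maps to a 50% chance of playing:
gen = InjuryFeatureGenerator()

injuries = [
    InjuryImpact(
        player_id="player_001",          # hypothetical player
        player_name="Star Forward",
        team="LAL",
        status="Questionable",           # 50% chance of playing
        estimated_impact=4.5,
        minutes_typically_played=34.0,
        replacement_level_diff=3.0
    )
]

print(gen.calculate_team_injury_impact("LAL", injuries))
# {'injury_impact': 2.25, 'star_player_out': 0.0, 'rotation_players_out': 0}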
Elo Rating Implementation
Elo ratings provide a robust, interpretable foundation for game prediction. Originally developed for chess, Elo systems have been successfully adapted for many sports including basketball.
Elo Fundamentals
The Elo system maintains a rating for each team that represents their current strength. After each game, ratings update based on the result compared to the expected outcome. The key parameters are:
- K-factor: Controls how quickly ratings respond to new results
- Home Court Advantage: Expected point differential for home team
- Initial Rating: Starting point for new teams
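In equation form, the home team's expected score and the post-game update are:

\[
E_{\text{home}} = \frac{1}{1 + 10^{(R_{\text{away}} - R_{\text{home}} - \text{HCA})/400}},
\qquad
R_{\text{home}}' = R_{\text{home}} + K\,(S_{\text{home}} - E_{\text{home}})
\]

where \(S_{\text{home}}\) is 1 for a home win and 0 for a loss. With \(K = 20\), a home team given a 64% chance that wins gains 20 × (1 − 0.64) = 7.2 rating points; losing the same game would cost it 20 × 0.64 = 12.8.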
Implementation
"""
Elo Rating System
Implements Elo ratings optimized for NBA game prediction.
"""
from dataclasses import dataclass, field
from datetime import date
from typing import Dict, List, Optional, Tuple
import math
import logging
logger = logging.getLogger(__name__)
@dataclass
class EloConfig:
"""Configuration parameters for the Elo system."""
k_factor: float = 20.0 # Rating update magnitude
home_advantage: float = 100.0 # Home court advantage in Elo points
initial_rating: float = 1500.0 # Starting rating for new teams
regression_factor: float = 0.75 # Season-to-season regression to mean
    margin_multiplier: bool = True  # Use margin of victory in updates
@dataclass
class EloRating:
"""Represents a team's Elo rating at a point in time."""
team_id: str
rating: float
games_played: int = 0
last_updated: Optional[date] = None
rating_history: List[Tuple[date, float]] = field(default_factory=list)
class EloSystem:
"""
NBA Elo Rating System.
Maintains ratings for all teams and provides win probability
and point spread predictions.
"""
# Elo point to point spread conversion
# Approximately 25 Elo points = 1 point spread
ELO_TO_SPREAD_FACTOR = 25.0
def __init__(self, config: Optional[EloConfig] = None):
self.config = config or EloConfig()
self.ratings: Dict[str, EloRating] = {}
self._initialize_ratings()
def _initialize_ratings(self):
"""Initialize ratings for all NBA teams."""
nba_teams = [
"ATL", "BOS", "BKN", "CHA", "CHI", "CLE", "DAL", "DEN",
"DET", "GSW", "HOU", "IND", "LAC", "LAL", "MEM", "MIA",
"MIL", "MIN", "NOP", "NYK", "OKC", "ORL", "PHI", "PHX",
"POR", "SAC", "SAS", "TOR", "UTA", "WAS"
]
for team in nba_teams:
self.ratings[team] = EloRating(
team_id=team,
rating=self.config.initial_rating
)
def get_rating(self, team: str) -> float:
"""Get current rating for a team."""
if team not in self.ratings:
logger.warning(f"Unknown team {team}, using initial rating")
return self.config.initial_rating
return self.ratings[team].rating
def expected_score(
self,
home_team: str,
away_team: str,
neutral_site: bool = False
) -> float:
"""
Calculate expected score (win probability) for home team.
Uses the standard Elo formula:
E = 1 / (1 + 10^((R_away - R_home - HCA) / 400))
"""
home_rating = self.get_rating(home_team)
away_rating = self.get_rating(away_team)
home_advantage = 0 if neutral_site else self.config.home_advantage
rating_diff = home_rating + home_advantage - away_rating
expected = 1.0 / (1.0 + math.pow(10, -rating_diff / 400))
return expected
def predict_spread(
self,
home_team: str,
away_team: str,
neutral_site: bool = False
) -> float:
"""
Predict point spread (positive = home favored).
Converts Elo difference to expected point margin.
"""
home_rating = self.get_rating(home_team)
away_rating = self.get_rating(away_team)
home_advantage = 0 if neutral_site else self.config.home_advantage
rating_diff = home_rating + home_advantage - away_rating
spread = rating_diff / self.ELO_TO_SPREAD_FACTOR
return spread
def update_ratings(
self,
home_team: str,
away_team: str,
home_score: int,
away_score: int,
game_date: date,
neutral_site: bool = False
) -> Tuple[float, float]:
"""
Update ratings based on game result.
Returns the rating changes for (home, away).
"""
# Calculate expected outcome
expected_home = self.expected_score(home_team, away_team, neutral_site)
# Actual outcome (1 = home win, 0 = away win)
actual_home = 1.0 if home_score > away_score else 0.0
# Calculate K-factor adjustment based on margin of victory
if self.config.margin_multiplier:
margin = abs(home_score - away_score)
k_multiplier = self._margin_of_victory_multiplier(
margin,
home_score > away_score,
expected_home
)
else:
k_multiplier = 1.0
k = self.config.k_factor * k_multiplier
# Update ratings
home_change = k * (actual_home - expected_home)
away_change = k * (expected_home - actual_home)
self._apply_update(home_team, home_change, game_date)
self._apply_update(away_team, away_change, game_date)
return home_change, away_change
def _margin_of_victory_multiplier(
self,
margin: int,
home_won: bool,
expected_home: float
) -> float:
"""
Calculate multiplier based on margin of victory.
        Uses a damping scheme inspired by FiveThirtyEight's NBA Elo to
        prevent rating inflation from blowout wins by favorites.
"""
# Expected margin based on win probability
if home_won:
expected_margin = (expected_home - 0.5) * 20 # Rough estimate
else:
expected_margin = (0.5 - expected_home) * 20
# Multiplier that increases with margin but decreases
# the benefit of blowouts by favorites
multiplier = math.log(margin + 1) * (2.2 / (1 + 0.001 * max(0, margin - expected_margin)))
return max(0.5, min(multiplier, 3.0)) # Bound between 0.5 and 3.0
def _apply_update(
self,
team: str,
change: float,
game_date: date
):
"""Apply rating change to a team."""
if team not in self.ratings:
self.ratings[team] = EloRating(
team_id=team,
rating=self.config.initial_rating
)
rating = self.ratings[team]
rating.rating += change
rating.games_played += 1
rating.last_updated = game_date
rating.rating_history.append((game_date, rating.rating))
def apply_season_regression(self):
"""
Regress ratings toward the mean at season start.
This accounts for roster changes and uncertainty
about team strength at the beginning of a new season.
"""
mean_rating = sum(r.rating for r in self.ratings.values()) / len(self.ratings)
for rating in self.ratings.values():
regression = (mean_rating - rating.rating) * (1 - self.config.regression_factor)
rating.rating += regression
rating.games_played = 0
logger.info(f"Applied season regression, mean rating: {mean_rating:.1f}")
def calibrate(
self,
historical_games: List[Dict],
parameter_ranges: Optional[Dict] = None
) -> EloConfig:
"""
Calibrate Elo parameters using historical data.
Uses grid search to find optimal k-factor and home advantage.
"""
if parameter_ranges is None:
parameter_ranges = {
"k_factor": [15, 20, 25, 30],
"home_advantage": [75, 100, 125, 150]
}
best_config = None
best_log_loss = float("inf")
for k in parameter_ranges["k_factor"]:
for hca in parameter_ranges["home_advantage"]:
config = EloConfig(k_factor=k, home_advantage=hca)
log_loss = self._evaluate_config(config, historical_games)
if log_loss < best_log_loss:
best_log_loss = log_loss
best_config = config
logger.info(
f"Best config: k={best_config.k_factor}, "
f"HCA={best_config.home_advantage}, "
f"log_loss={best_log_loss:.4f}"
)
return best_config
def _evaluate_config(
self,
config: EloConfig,
games: List[Dict]
) -> float:
"""Evaluate an Elo configuration using log loss."""
# Create temporary system with this config
temp_system = EloSystem(config)
total_log_loss = 0.0
n_games = 0
for game in games:
expected = temp_system.expected_score(
game["home_team"],
game["away_team"]
)
actual = 1.0 if game["home_score"] > game["away_score"] else 0.0
# Log loss calculation (with small epsilon to avoid log(0))
epsilon = 1e-10
expected = max(epsilon, min(1 - epsilon, expected))
log_loss = -(
actual * math.log(expected) +
(1 - actual) * math.log(1 - expected)
)
total_log_loss += log_loss
n_games += 1
# Update ratings
temp_system.update_ratings(
game["home_team"],
game["away_team"],
game["home_score"],
game["away_score"],
game["date"]
)
return total_log_loss / n_games if n_games > 0 else float("inf")
def get_all_ratings(self) -> List[Dict]:
"""Get current ratings for all teams, sorted by rating."""
ratings_list = [
{
"team": team,
"rating": rating.rating,
"games_played": rating.games_played,
"last_updated": rating.last_updated.isoformat() if rating.last_updated else None
}
for team, rating in self.ratings.items()
]
return sorted(ratings_list, key=lambda x: x["rating"], reverse=True)
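A short usage sketch; the figures in the comments follow from the default configuration above:
from datetime import date

elo = EloSystem()

# With all teams at the initial 1500 rating, the home side is favored
# purely on the 100-point home court advantage
print(f"{elo.expected_score('BOS', 'LAL'):.3f}")   # 0.640
print(f"{elo.predict_spread('BOS', 'LAL'):.1f}")   # 4.0

# Record a result; the two ratings shift by equal and opposite amounts
home_chg, away_chg = elo.update_ratings("BOS", "LAL", 112, 104, date(2024, 1, 15))
print(f"BOS {home_chg:+.1f}, LAL {away_chg:+.1f}")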
Elo Extensions
Our implementation includes several extensions beyond basic Elo:
- Margin of Victory: Adjusts updates based on game margin while preventing rating inflation from blowouts
- Season Regression: Accounts for offseason roster changes
- Calibration: Optimizes parameters using historical data
Machine Learning Models
While Elo provides a solid baseline, machine learning models can capture complex patterns and utilize additional features. Our system implements multiple models combined in an ensemble.
Model Selection
We implement three complementary models:
- Logistic Regression: Interpretable baseline with regularization
- Gradient Boosting (XGBoost): Captures non-linear patterns and feature interactions
- Neural Network: Can learn complex representations (optional, for advanced users)
Implementation
"""
Machine Learning Models for Game Prediction
Implements multiple models combined in an ensemble.
"""
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Any
from datetime import date
import numpy as np
from sklearn.linear_model import LogisticRegression, Ridge
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit, cross_val_score
from sklearn.calibration import CalibratedClassifierCV
import pickle
import logging
logger = logging.getLogger(__name__)
# XGBoost import with fallback
try:
import xgboost as xgb
HAS_XGBOOST = True
except ImportError:
HAS_XGBOOST = False
logger.warning("XGBoost not available, using sklearn GradientBoosting")
from sklearn.ensemble import GradientBoostingClassifier
@dataclass
class ModelConfig:
"""Configuration for ML models."""
features: List[str]
target: str = "home_win"
test_size: float = 0.2
cv_folds: int = 5
random_state: int = 42
@dataclass
class Prediction:
"""Represents a game prediction."""
home_team: str
away_team: str
game_date: date
home_win_prob: float
predicted_spread: float
predicted_total: Optional[float] = None
confidence: float = 0.0
model_contributions: Optional[Dict[str, float]] = None
class LogisticRegressionModel:
"""Logistic regression model for win probability."""
def __init__(self, config: ModelConfig):
self.config = config
self.scaler = StandardScaler()
self.model = LogisticRegression(
C=1.0,
max_iter=1000,
random_state=config.random_state
)
self.is_fitted = False
def fit(self, X: np.ndarray, y: np.ndarray):
"""Fit the model."""
X_scaled = self.scaler.fit_transform(X)
self.model.fit(X_scaled, y)
self.is_fitted = True
# Log feature importances
if hasattr(self.model, 'coef_'):
importances = list(zip(self.config.features, self.model.coef_[0]))
importances.sort(key=lambda x: abs(x[1]), reverse=True)
logger.info("Top features: " + str(importances[:5]))
def predict_proba(self, X: np.ndarray) -> np.ndarray:
"""Predict win probabilities."""
if not self.is_fitted:
raise ValueError("Model not fitted")
X_scaled = self.scaler.transform(X)
return self.model.predict_proba(X_scaled)[:, 1]
def evaluate(self, X: np.ndarray, y: np.ndarray) -> Dict[str, float]:
"""Evaluate model performance."""
X_scaled = self.scaler.transform(X)
# Cross-validation with time series split
tscv = TimeSeriesSplit(n_splits=self.config.cv_folds)
scores = cross_val_score(
self.model, X_scaled, y,
cv=tscv, scoring='neg_log_loss'
)
return {
"mean_log_loss": -scores.mean(),
"std_log_loss": scores.std(),
"accuracy": self.model.score(X_scaled, y)
}
class GradientBoostingModel:
"""Gradient boosting model for win probability."""
def __init__(self, config: ModelConfig):
self.config = config
self.scaler = StandardScaler()
if HAS_XGBOOST:
self.model = xgb.XGBClassifier(
n_estimators=100,
max_depth=4,
learning_rate=0.1,
subsample=0.8,
colsample_bytree=0.8,
random_state=config.random_state,
use_label_encoder=False,
eval_metric='logloss'
)
else:
self.model = GradientBoostingClassifier(
n_estimators=100,
max_depth=4,
learning_rate=0.1,
random_state=config.random_state
)
self.is_fitted = False
def fit(self, X: np.ndarray, y: np.ndarray):
"""Fit the model."""
X_scaled = self.scaler.fit_transform(X)
self.model.fit(X_scaled, y)
self.is_fitted = True
# Log feature importances
if hasattr(self.model, 'feature_importances_'):
importances = list(zip(
self.config.features,
self.model.feature_importances_
))
importances.sort(key=lambda x: x[1], reverse=True)
logger.info("Top features: " + str(importances[:5]))
def predict_proba(self, X: np.ndarray) -> np.ndarray:
"""Predict win probabilities."""
if not self.is_fitted:
raise ValueError("Model not fitted")
X_scaled = self.scaler.transform(X)
return self.model.predict_proba(X_scaled)[:, 1]
def evaluate(self, X: np.ndarray, y: np.ndarray) -> Dict[str, float]:
"""Evaluate model performance."""
X_scaled = self.scaler.transform(X)
tscv = TimeSeriesSplit(n_splits=self.config.cv_folds)
scores = cross_val_score(
self.model, X_scaled, y,
cv=tscv, scoring='neg_log_loss'
)
return {
"mean_log_loss": -scores.mean(),
"std_log_loss": scores.std(),
"accuracy": self.model.score(X_scaled, y)
}
class SpreadModel:
"""Ridge regression model for point spread prediction."""
def __init__(self, config: ModelConfig):
self.config = config
self.scaler = StandardScaler()
self.model = Ridge(alpha=1.0, random_state=config.random_state)
self.is_fitted = False
def fit(self, X: np.ndarray, y: np.ndarray):
"""Fit the model. y should be point differential (home - away)."""
X_scaled = self.scaler.fit_transform(X)
self.model.fit(X_scaled, y)
self.is_fitted = True
def predict(self, X: np.ndarray) -> np.ndarray:
"""Predict point spread."""
if not self.is_fitted:
raise ValueError("Model not fitted")
X_scaled = self.scaler.transform(X)
return self.model.predict(X_scaled)
def evaluate(self, X: np.ndarray, y: np.ndarray) -> Dict[str, float]:
"""Evaluate model performance."""
predictions = self.predict(X)
mae = np.mean(np.abs(predictions - y))
rmse = np.sqrt(np.mean((predictions - y) ** 2))
# Against the spread accuracy
correct = np.sum((predictions > 0) == (y > 0))
ats_accuracy = correct / len(y)
return {
"mae": mae,
"rmse": rmse,
"ats_accuracy": ats_accuracy
}
class EnsembleModel:
"""
Ensemble combining multiple models.
Uses weighted averaging of predictions from:
- Elo rating system
- Logistic regression
- Gradient boosting
"""
def __init__(
self,
elo_system,
feature_engineer,
config: ModelConfig,
weights: Optional[Dict[str, float]] = None
):
self.elo_system = elo_system
self.feature_engineer = feature_engineer
self.config = config
# Default weights based on typical performance
self.weights = weights or {
"elo": 0.30,
"logistic": 0.30,
"xgboost": 0.40
}
self.logistic_model = LogisticRegressionModel(config)
self.xgboost_model = GradientBoostingModel(config)
self.spread_model = SpreadModel(config)
self.is_fitted = False
def fit(
self,
games: List[Dict],
features_matrix: np.ndarray,
win_labels: np.ndarray,
spread_labels: np.ndarray
):
"""Fit all models in the ensemble."""
logger.info(f"Training ensemble on {len(games)} games")
# Train individual models
self.logistic_model.fit(features_matrix, win_labels)
self.xgboost_model.fit(features_matrix, win_labels)
self.spread_model.fit(features_matrix, spread_labels)
self.is_fitted = True
# Evaluate and potentially adjust weights
self._calibrate_weights(features_matrix, win_labels)
def _calibrate_weights(
self,
X: np.ndarray,
y: np.ndarray
):
"""Calibrate ensemble weights using validation performance."""
# Get individual model performance
logistic_eval = self.logistic_model.evaluate(X, y)
xgboost_eval = self.xgboost_model.evaluate(X, y)
# Could implement more sophisticated weight optimization here
# For now, log the performance
logger.info(f"Logistic log loss: {logistic_eval['mean_log_loss']:.4f}")
logger.info(f"XGBoost log loss: {xgboost_eval['mean_log_loss']:.4f}")
def predict(
self,
home_team: str,
away_team: str,
game_date: date
) -> Prediction:
"""Generate prediction for a game."""
if not self.is_fitted:
raise ValueError("Ensemble not fitted")
# Get features
features = self.feature_engineer.compute_game_features(
home_team, away_team, game_date
)
X = np.array([[features[f] for f in self.config.features]])
# Get individual predictions
elo_prob = self.elo_system.expected_score(home_team, away_team)
logistic_prob = self.logistic_model.predict_proba(X)[0]
xgboost_prob = self.xgboost_model.predict_proba(X)[0]
# Ensemble probability
ensemble_prob = (
self.weights["elo"] * elo_prob +
self.weights["logistic"] * logistic_prob +
self.weights["xgboost"] * xgboost_prob
)
# Spread prediction
elo_spread = self.elo_system.predict_spread(home_team, away_team)
ml_spread = self.spread_model.predict(X)[0]
# Combine spreads (weight ML model more)
ensemble_spread = 0.4 * elo_spread + 0.6 * ml_spread
# Calculate confidence based on model agreement
probs = [elo_prob, logistic_prob, xgboost_prob]
confidence = 1.0 - np.std(probs) * 2 # Higher agreement = higher confidence
confidence = max(0.0, min(1.0, confidence))
return Prediction(
home_team=home_team,
away_team=away_team,
game_date=game_date,
home_win_prob=ensemble_prob,
predicted_spread=ensemble_spread,
confidence=confidence,
model_contributions={
"elo": elo_prob,
"logistic": logistic_prob,
"xgboost": xgboost_prob
}
)
def save(self, path: str):
"""Save the ensemble to disk."""
with open(path, 'wb') as f:
pickle.dump({
'logistic': self.logistic_model,
'xgboost': self.xgboost_model,
'spread': self.spread_model,
'weights': self.weights,
'config': self.config
}, f)
logger.info(f"Ensemble saved to {path}")
def load(self, path: str):
"""Load the ensemble from disk."""
with open(path, 'rb') as f:
data = pickle.load(f)
self.logistic_model = data['logistic']
self.xgboost_model = data['xgboost']
self.spread_model = data['spread']
self.weights = data['weights']
self.config = data['config']
self.is_fitted = True
logger.info(f"Ensemble loaded from {path}")
Real-Time Data Integration
Production prediction systems require timely data to generate accurate predictions. This section covers integrating live data feeds.
Data Source Integration
"""
Real-Time Data Integration
Connects to live data sources for current statistics and injury reports.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, date
from typing import Dict, List, Optional, Any
import asyncio
import aiohttp
import logging
logger = logging.getLogger(__name__)
@dataclass
class DataSourceConfig:
"""Configuration for a data source."""
name: str
base_url: str
api_key: Optional[str] = None
rate_limit: int = 60 # Requests per minute
timeout: int = 30 # Seconds
class DataSourceAdapter(ABC):
"""Abstract base class for data source adapters."""
@abstractmethod
async def fetch_games(self, date: date) -> List[Dict]:
"""Fetch games for a specific date."""
pass
@abstractmethod
async def fetch_team_stats(self, team_id: str) -> Dict:
"""Fetch current stats for a team."""
pass
@abstractmethod
async def fetch_injuries(self) -> List[Dict]:
"""Fetch current injury report."""
pass
class NBAApiAdapter(DataSourceAdapter):
"""Adapter for NBA official API."""
def __init__(self, config: DataSourceConfig):
self.config = config
self._session: Optional[aiohttp.ClientSession] = None
        # A semaphore caps concurrent in-flight requests; enforcing a true
        # requests-per-minute budget would need a token-bucket or sliding window
        self._rate_limiter = asyncio.Semaphore(config.rate_limit)
async def _get_session(self) -> aiohttp.ClientSession:
"""Get or create aiohttp session."""
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=self.config.timeout)
)
return self._session
async def _make_request(self, endpoint: str, params: Dict = None) -> Dict:
"""Make rate-limited request to API."""
async with self._rate_limiter:
session = await self._get_session()
url = f"{self.config.base_url}/{endpoint}"
headers = {}
if self.config.api_key:
headers["Authorization"] = f"Bearer {self.config.api_key}"
try:
async with session.get(url, params=params, headers=headers) as resp:
resp.raise_for_status()
return await resp.json()
except aiohttp.ClientError as e:
logger.error(f"API request failed: {e}")
raise
async def fetch_games(self, game_date: date) -> List[Dict]:
"""Fetch games for a specific date."""
data = await self._make_request(
"scoreboard",
{"gameDate": game_date.strftime("%Y-%m-%d")}
)
return [
{
"game_id": g["gameId"],
"home_team": g["homeTeam"]["teamTricode"],
"away_team": g["awayTeam"]["teamTricode"],
"home_score": g.get("homeTeam", {}).get("score"),
"away_score": g.get("awayTeam", {}).get("score"),
"status": g["gameStatus"]
}
for g in data.get("scoreboard", {}).get("games", [])
]
async def fetch_team_stats(self, team_id: str) -> Dict:
"""Fetch current stats for a team."""
data = await self._make_request(
f"teams/{team_id}/stats"
)
return data
async def fetch_injuries(self) -> List[Dict]:
"""Fetch current injury report."""
data = await self._make_request("players/injuries")
return data.get("injuries", [])
async def close(self):
"""Close the session."""
if self._session and not self._session.closed:
await self._session.close()
class RealTimeDataManager:
"""
Manages real-time data collection and caching.
Coordinates multiple data sources and maintains a cache
of recent data to minimize API calls.
"""
def __init__(self):
self.adapters: Dict[str, DataSourceAdapter] = {}
self._cache: Dict[str, Any] = {}
self._cache_timestamps: Dict[str, datetime] = {}
self._cache_ttl = 300 # 5 minutes
def register_adapter(self, name: str, adapter: DataSourceAdapter):
"""Register a data source adapter."""
self.adapters[name] = adapter
logger.info(f"Registered adapter: {name}")
async def get_todays_games(self) -> List[Dict]:
"""Get today's games with caching."""
cache_key = f"games_{date.today().isoformat()}"
if self._is_cache_valid(cache_key):
return self._cache[cache_key]
# Try primary adapter, fall back to others
for name, adapter in self.adapters.items():
try:
games = await adapter.fetch_games(date.today())
self._update_cache(cache_key, games)
return games
except Exception as e:
logger.warning(f"Adapter {name} failed: {e}")
continue
raise RuntimeError("All data sources failed")
async def get_current_injuries(self) -> List[Dict]:
"""Get current injury report with caching."""
cache_key = "injuries_current"
if self._is_cache_valid(cache_key):
return self._cache[cache_key]
for name, adapter in self.adapters.items():
try:
injuries = await adapter.fetch_injuries()
self._update_cache(cache_key, injuries)
return injuries
except Exception as e:
logger.warning(f"Adapter {name} failed: {e}")
continue
raise RuntimeError("All data sources failed")
def _is_cache_valid(self, key: str) -> bool:
"""Check if cached data is still valid."""
if key not in self._cache:
return False
timestamp = self._cache_timestamps.get(key)
if timestamp is None:
return False
age = (datetime.now() - timestamp).total_seconds()
return age < self._cache_ttl
def _update_cache(self, key: str, data: Any):
"""Update cache with new data."""
self._cache[key] = data
self._cache_timestamps[key] = datetime.now()
async def close(self):
"""Close all adapters."""
for adapter in self.adapters.values():
if hasattr(adapter, 'close'):
await adapter.close()
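Wiring the manager together looks like the following. The base URL is a placeholder, so the request itself will fail (and the manager will raise after exhausting its adapters), but the registration, fallback, and cleanup flow is the part being shown:
import asyncio

async def main():
    manager = RealTimeDataManager()
    manager.register_adapter("nba", NBAApiAdapter(DataSourceConfig(
        name="nba",
        base_url="https://api.example.com/nba",   # placeholder endpoint
        api_key=None
    )))
    try:
        games = await manager.get_todays_games()
        print(games)
    finally:
        await manager.close()

asyncio.run(main())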
Prediction API Design
The API layer exposes prediction functionality to external consumers. We use FastAPI for its performance, automatic documentation, and type validation.
API Implementation
"""
Prediction API
FastAPI-based REST API for game predictions.
"""
from fastapi import FastAPI, HTTPException, Depends, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from datetime import date, datetime
from typing import List, Optional, Dict
import logging
logger = logging.getLogger(__name__)
# Pydantic models for request/response validation
class PredictionRequest(BaseModel):
"""Request model for single game prediction."""
home_team: str = Field(..., description="Home team abbreviation (e.g., 'LAL')")
away_team: str = Field(..., description="Away team abbreviation (e.g., 'BOS')")
game_date: date = Field(default_factory=date.today, description="Game date")
class Config:
json_schema_extra = {
"example": {
"home_team": "LAL",
"away_team": "BOS",
"game_date": "2024-01-15"
}
}
class PredictionResponse(BaseModel):
"""Response model for game prediction."""
home_team: str
away_team: str
game_date: date
home_win_probability: float = Field(..., ge=0, le=1)
away_win_probability: float = Field(..., ge=0, le=1)
predicted_spread: float
predicted_total: Optional[float] = None
confidence: float = Field(..., ge=0, le=1)
model_breakdown: Optional[Dict[str, float]] = None
generated_at: datetime
class TeamRating(BaseModel):
"""Team Elo rating."""
team: str
rating: float
rank: int
games_played: int
class HealthResponse(BaseModel):
"""Health check response."""
status: str
version: str
model_version: str
last_update: datetime
# Create FastAPI app
app = FastAPI(
title="NBA Game Prediction API",
description="Production-ready API for NBA game predictions",
version="1.0.0"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Dependency for getting prediction system
def get_prediction_system():
"""Dependency injection for prediction system."""
# In production, this would return a configured prediction system
# For now, return a placeholder
from prediction_system import PredictionSystem
return PredictionSystem()
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Check API health status."""
return HealthResponse(
status="healthy",
version="1.0.0",
model_version="2024.01",
last_update=datetime.now()
)
@app.post("/predict", response_model=PredictionResponse)
async def predict_game(
request: PredictionRequest,
system = Depends(get_prediction_system)
):
"""
Generate prediction for a single game.
Returns win probabilities, predicted spread, and confidence level.
"""
try:
prediction = system.predict(
home_team=request.home_team,
away_team=request.away_team,
game_date=request.game_date
)
return PredictionResponse(
home_team=prediction.home_team,
away_team=prediction.away_team,
game_date=prediction.game_date,
home_win_probability=prediction.home_win_prob,
away_win_probability=1 - prediction.home_win_prob,
predicted_spread=prediction.predicted_spread,
predicted_total=prediction.predicted_total,
confidence=prediction.confidence,
model_breakdown=prediction.model_contributions,
generated_at=datetime.now()
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Prediction failed: {e}")
raise HTTPException(status_code=500, detail="Prediction failed")
@app.get("/predict/today", response_model=List[PredictionResponse])
async def predict_todays_games(
system = Depends(get_prediction_system)
):
"""
Generate predictions for all of today's games.
Automatically fetches the current schedule and returns predictions.
"""
try:
predictions = system.predict_todays_games()
return [
PredictionResponse(
home_team=p.home_team,
away_team=p.away_team,
game_date=p.game_date,
home_win_probability=p.home_win_prob,
away_win_probability=1 - p.home_win_prob,
predicted_spread=p.predicted_spread,
confidence=p.confidence,
model_breakdown=p.model_contributions,
generated_at=datetime.now()
)
for p in predictions
]
except Exception as e:
logger.error(f"Bulk prediction failed: {e}")
raise HTTPException(status_code=500, detail="Prediction failed")
@app.get("/ratings", response_model=List[TeamRating])
async def get_ratings(
system = Depends(get_prediction_system)
):
"""
Get current Elo ratings for all teams.
Returns teams sorted by rating (highest first).
"""
ratings = system.elo_system.get_all_ratings()
return [
TeamRating(
team=r["team"],
rating=r["rating"],
rank=i + 1,
games_played=r["games_played"]
)
for i, r in enumerate(ratings)
]
@app.get("/ratings/{team}")
async def get_team_rating(
team: str,
system = Depends(get_prediction_system)
):
"""Get rating for a specific team."""
rating = system.elo_system.get_rating(team.upper())
if rating is None:
raise HTTPException(status_code=404, detail=f"Team {team} not found")
return {"team": team.upper(), "rating": rating}
# Error handlers
@app.exception_handler(ValueError)
async def value_error_handler(request, exc):
    # Exception handlers must return a response object, not an HTTPException
    return JSONResponse(status_code=400, content={"detail": str(exc)})
API Documentation
FastAPI automatically generates OpenAPI documentation. Access it at /docs for interactive Swagger UI or /redoc for ReDoc format.
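To serve the API locally, a minimal launcher might look like this (assuming the module above is importable as `api`; the file and module names are assumptions):
# run_server.py -- minimal development launcher
import uvicorn

if __name__ == "__main__":
    uvicorn.run("api:app", host="0.0.0.0", port=8000, reload=True)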
Performance Tracking and Monitoring
Continuous monitoring ensures the prediction system maintains accuracy over time. Implement comprehensive tracking of predictions and outcomes.
Metrics Collection
"""
Performance Tracking and Monitoring
Tracks prediction accuracy and system health metrics.
"""
from dataclasses import dataclass, field
from datetime import date, datetime
from typing import Dict, List, Optional
import numpy as np
from collections import defaultdict
import logging
logger = logging.getLogger(__name__)
@dataclass
class PredictionRecord:
"""Record of a prediction and its outcome."""
game_id: str
prediction_time: datetime
home_team: str
away_team: str
home_win_prob: float
predicted_spread: float
actual_home_win: Optional[bool] = None
actual_margin: Optional[int] = None
resolved: bool = False
@dataclass
class PerformanceMetrics:
"""Aggregated performance metrics."""
period_start: date
period_end: date
total_predictions: int
resolved_predictions: int
# Win probability metrics
log_loss: float
brier_score: float
accuracy: float
# Spread metrics
spread_mae: float
spread_rmse: float
ats_accuracy: float
# Calibration
calibration_error: float
class PerformanceTracker:
"""
Tracks and analyzes prediction performance.
Maintains a history of predictions and calculates
various accuracy metrics over time.
"""
def __init__(self):
self.predictions: Dict[str, PredictionRecord] = {}
self.metrics_history: List[PerformanceMetrics] = []
def record_prediction(
self,
game_id: str,
home_team: str,
away_team: str,
home_win_prob: float,
predicted_spread: float
):
"""Record a new prediction."""
record = PredictionRecord(
game_id=game_id,
prediction_time=datetime.now(),
home_team=home_team,
away_team=away_team,
home_win_prob=home_win_prob,
predicted_spread=predicted_spread
)
self.predictions[game_id] = record
logger.debug(f"Recorded prediction for {game_id}")
def resolve_prediction(
self,
game_id: str,
home_score: int,
away_score: int
):
"""Update prediction with actual outcome."""
if game_id not in self.predictions:
logger.warning(f"No prediction found for {game_id}")
return
record = self.predictions[game_id]
record.actual_home_win = home_score > away_score
record.actual_margin = home_score - away_score
record.resolved = True
logger.debug(f"Resolved prediction for {game_id}")
def calculate_metrics(
self,
start_date: Optional[date] = None,
end_date: Optional[date] = None
) -> PerformanceMetrics:
"""Calculate performance metrics for a period."""
# Filter predictions by date
resolved = [
p for p in self.predictions.values()
if p.resolved and (start_date is None or p.prediction_time.date() >= start_date)
and (end_date is None or p.prediction_time.date() <= end_date)
]
if not resolved:
raise ValueError("No resolved predictions in period")
# Extract arrays
probs = np.array([p.home_win_prob for p in resolved])
actuals = np.array([1.0 if p.actual_home_win else 0.0 for p in resolved])
predicted_spreads = np.array([p.predicted_spread for p in resolved])
actual_margins = np.array([p.actual_margin for p in resolved])
# Win probability metrics
log_loss = self._calculate_log_loss(probs, actuals)
brier_score = self._calculate_brier_score(probs, actuals)
accuracy = np.mean((probs > 0.5) == actuals)
# Spread metrics
spread_mae = np.mean(np.abs(predicted_spreads - actual_margins))
spread_rmse = np.sqrt(np.mean((predicted_spreads - actual_margins) ** 2))
ats_accuracy = np.mean((predicted_spreads > 0) == (actual_margins > 0))
# Calibration
calibration_error = self._calculate_calibration_error(probs, actuals)
metrics = PerformanceMetrics(
period_start=start_date or min(p.prediction_time.date() for p in resolved),
period_end=end_date or max(p.prediction_time.date() for p in resolved),
total_predictions=len(self.predictions),
resolved_predictions=len(resolved),
log_loss=log_loss,
brier_score=brier_score,
accuracy=accuracy,
spread_mae=spread_mae,
spread_rmse=spread_rmse,
ats_accuracy=ats_accuracy,
calibration_error=calibration_error
)
self.metrics_history.append(metrics)
return metrics
def _calculate_log_loss(
self,
probs: np.ndarray,
actuals: np.ndarray
) -> float:
"""Calculate log loss (cross-entropy)."""
epsilon = 1e-10
probs = np.clip(probs, epsilon, 1 - epsilon)
log_loss = -np.mean(
actuals * np.log(probs) +
(1 - actuals) * np.log(1 - probs)
)
return log_loss
def _calculate_brier_score(
self,
probs: np.ndarray,
actuals: np.ndarray
) -> float:
"""Calculate Brier score."""
return np.mean((probs - actuals) ** 2)
def _calculate_calibration_error(
self,
probs: np.ndarray,
actuals: np.ndarray,
n_bins: int = 10
) -> float:
"""
Calculate expected calibration error.
Measures how well probability predictions match actual outcomes.
"""
bin_edges = np.linspace(0, 1, n_bins + 1)
calibration_error = 0.0
for i in range(n_bins):
mask = (probs >= bin_edges[i]) & (probs < bin_edges[i + 1])
if np.sum(mask) == 0:
continue
bin_prob = np.mean(probs[mask])
bin_actual = np.mean(actuals[mask])
bin_weight = np.sum(mask) / len(probs)
calibration_error += bin_weight * abs(bin_prob - bin_actual)
return calibration_error
def get_performance_trend(
self,
metric: str = "log_loss",
window: int = 100
) -> List[float]:
"""Calculate rolling metric over recent predictions."""
resolved = sorted(
[p for p in self.predictions.values() if p.resolved],
key=lambda x: x.prediction_time
)
if len(resolved) < window:
return []
trend = []
for i in range(window, len(resolved) + 1):
window_preds = resolved[i - window:i]
probs = np.array([p.home_win_prob for p in window_preds])
actuals = np.array([1.0 if p.actual_home_win else 0.0 for p in window_preds])
if metric == "log_loss":
value = self._calculate_log_loss(probs, actuals)
elif metric == "accuracy":
value = np.mean((probs > 0.5) == actuals)
else:
raise ValueError(f"Unknown metric: {metric}")
trend.append(value)
return trend
def generate_report(self) -> Dict:
"""Generate comprehensive performance report."""
if not self.metrics_history:
self.calculate_metrics()
latest = self.metrics_history[-1]
return {
"summary": {
"total_predictions": latest.total_predictions,
"resolved_predictions": latest.resolved_predictions,
"period": f"{latest.period_start} to {latest.period_end}"
},
"win_probability": {
"log_loss": round(latest.log_loss, 4),
"brier_score": round(latest.brier_score, 4),
"accuracy": f"{latest.accuracy * 100:.1f}%"
},
"spread": {
"mae": round(latest.spread_mae, 2),
"rmse": round(latest.spread_rmse, 2),
"ats_accuracy": f"{latest.ats_accuracy * 100:.1f}%"
},
"calibration": {
"expected_calibration_error": round(latest.calibration_error, 4)
}
}
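Before moving on, here is a minimal usage sketch of the tracker; the game and scores below are made up for illustration:
tracker = PerformanceTracker()

# Record a pre-game prediction
tracker.record_prediction(
    game_id="2024-01-15_LAL_BOS",
    home_team="LAL",
    away_team="BOS",
    home_win_prob=0.58,
    predicted_spread=3.5
)

# Resolve it once the final score is known
tracker.resolve_prediction("2024-01-15_LAL_BOS", home_score=112, away_score=105)

# With enough resolved games, compute metrics and a summary report
metrics = tracker.calculate_metrics()
report = tracker.generate_report()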
Alerting
Set up alerts for significant performance degradation:
class AlertManager:
"""Manages performance alerts."""
def __init__(self, tracker: PerformanceTracker):
self.tracker = tracker
self.thresholds = {
"log_loss": 0.70, # Alert if log loss exceeds this
"accuracy": 0.55, # Alert if accuracy drops below this
"calibration_error": 0.10 # Alert if calibration error exceeds this
}
def check_alerts(self) -> List[Dict]:
"""Check for any metric threshold violations."""
alerts = []
try:
metrics = self.tracker.calculate_metrics()
except ValueError:
return alerts
if metrics.log_loss > self.thresholds["log_loss"]:
alerts.append({
"severity": "warning",
"metric": "log_loss",
"value": metrics.log_loss,
"threshold": self.thresholds["log_loss"],
"message": f"Log loss {metrics.log_loss:.4f} exceeds threshold"
})
if metrics.accuracy < self.thresholds["accuracy"]:
alerts.append({
"severity": "warning",
"metric": "accuracy",
"value": metrics.accuracy,
"threshold": self.thresholds["accuracy"],
"message": f"Accuracy {metrics.accuracy:.1%} below threshold"
})
if metrics.calibration_error > self.thresholds["calibration_error"]:
alerts.append({
"severity": "warning",
"metric": "calibration_error",
"value": metrics.calibration_error,
"threshold": self.thresholds["calibration_error"],
"message": f"Calibration error {metrics.calibration_error:.4f} exceeds threshold"
})
return alerts
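Wiring the check into a daily job might look like the following sketch; the notification call is a placeholder for whatever channel you use:
alert_manager = AlertManager(tracker)
for alert in alert_manager.check_alerts():
    # Placeholder: route to email, Slack, or a pager instead of the log
    logger.warning(f"[{alert['severity']}] {alert['metric']}: {alert['message']}")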
Continuous Model Improvement
A production prediction system requires ongoing maintenance and improvement. This section covers strategies for keeping the system accurate over time.
Scheduled Retraining
"""
Model Retraining Pipeline
Automates periodic model updates with new data.
"""
from datetime import date, datetime, timedelta
from typing import Dict, List, Optional
import logging
import os
import numpy as np
logger = logging.getLogger(__name__)
class RetrainingPipeline:
"""
Manages scheduled model retraining.
Implements:
- Daily Elo updates
- Weekly feature recalculation
- Monthly full model retraining
"""
def __init__(
self,
prediction_system,
data_pipeline,
feature_engineer
):
self.prediction_system = prediction_system
self.data_pipeline = data_pipeline
self.feature_engineer = feature_engineer
self.last_elo_update: Optional[date] = None
self.last_feature_update: Optional[date] = None
self.last_full_retrain: Optional[date] = None
def run_daily_updates(self):
"""Run daily update tasks."""
logger.info("Running daily updates")
# Update Elo ratings with yesterday's results
yesterday = date.today() - timedelta(days=1)
        games = self.data_pipeline.collect_games(yesterday, yesterday)
        completed = [g for g in games if g.is_completed]
        for game in completed:
            self.prediction_system.elo_system.update_ratings(
                game.home_team,
                game.away_team,
                game.home_score,
                game.away_score,
                game.date
            )
        self.last_elo_update = date.today()
        logger.info(f"Updated Elo ratings with {len(completed)} completed games")
    def run_weekly_updates(self):
        """Run weekly update tasks."""
        logger.info("Running weekly updates")
        # Placeholder tasks, left for implementation:
        # - Recalculate team statistics
        # - Update feature cache
        # - Check for data quality issues
        self.last_feature_update = date.today()
def run_monthly_retrain(self):
"""Run full model retraining."""
logger.info("Running monthly model retraining")
# Collect all data from current season
season_start = self._get_season_start()
games = self.data_pipeline.collect_games(season_start, date.today())
# Generate features for all games
features_matrix, labels = self._prepare_training_data(games)
# Retrain ensemble
self.prediction_system.ensemble.fit(
games,
features_matrix,
labels["win"],
labels["spread"]
)
# Evaluate and log performance
metrics = self.prediction_system.tracker.calculate_metrics()
logger.info(f"Retrained model - Log loss: {metrics.log_loss:.4f}")
self.last_full_retrain = date.today()
def _get_season_start(self) -> date:
"""Get the start date of the current NBA season."""
today = date.today()
year = today.year if today.month >= 10 else today.year - 1
return date(year, 10, 1)
def _prepare_training_data(self, games):
"""Prepare feature matrix and labels from games."""
features_list = []
win_labels = []
spread_labels = []
for game in games:
if not game.is_completed:
continue
features = self.feature_engineer.compute_game_features(
game.home_team,
game.away_team,
game.date
)
features_list.append(features)
win_labels.append(1 if game.home_win else 0)
spread_labels.append(game.home_score - game.away_score)
feature_names = list(features_list[0].keys()) if features_list else []
features_matrix = np.array([
[f[name] for name in feature_names]
for f in features_list
])
return features_matrix, {
"win": np.array(win_labels),
"spread": np.array(spread_labels)
}
class ModelVersionManager:
"""Manages model versions and rollback capability."""
def __init__(self, storage_path: str):
self.storage_path = storage_path
self.versions: List[Dict] = []
def save_version(
self,
model,
version_name: str,
metrics: Dict
):
"""Save a model version with metadata."""
version_path = os.path.join(
self.storage_path,
f"model_{version_name}.pkl"
)
model.save(version_path)
self.versions.append({
"name": version_name,
"path": version_path,
"created_at": datetime.now().isoformat(),
"metrics": metrics
})
logger.info(f"Saved model version: {version_name}")
def rollback(self, version_name: str):
"""Rollback to a previous model version."""
version = next(
(v for v in self.versions if v["name"] == version_name),
None
)
if version is None:
raise ValueError(f"Version {version_name} not found")
logger.info(f"Rolling back to version: {version_name}")
return version["path"]
    def get_best_version(self, metric: str = "log_loss") -> str:
        """Get the version with best performance on a metric."""
        if not self.versions:
            raise ValueError("No versions available")
        # Higher is better for accuracy metrics, lower for loss metrics;
        # versions missing the metric sort last in either direction
        reverse = metric in ["accuracy", "ats_accuracy"]
        missing = float("-inf") if reverse else float("inf")
        best = sorted(
            self.versions,
            key=lambda v: v["metrics"].get(metric, missing),
            reverse=reverse
        )[0]
        return best["name"]
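A brief usage sketch; trained_ensemble, the version name, and the metric values are all illustrative:
version_manager = ModelVersionManager(storage_path="./models")

# After a retrain, save the new version with its evaluation metrics
version_manager.save_version(
    trained_ensemble,  # assumed to expose save(path), as used above
    version_name="2024_02",
    metrics={"log_loss": 0.612, "accuracy": 0.67}
)

# If the latest version regresses, find and roll back to the best on record
best_name = version_manager.get_best_version(metric="log_loss")
model_path = version_manager.rollback(best_name)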
Complete System Integration
The following brings all components together into a unified prediction system.
"""
Complete Prediction System
Integrates all components into a production-ready system.
"""
from datetime import date, timedelta
from typing import List, Optional
import logging
import numpy as np
# Import all components (module names for the pipeline and tracker are
# assumed to follow the earlier sections)
from elo import EloSystem, EloConfig
from features import FeatureEngineer
from models import EnsembleModel, ModelConfig, Prediction
from pipeline import DataPipeline
from tracking import PerformanceTracker
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PredictionSystem:
"""
Complete NBA Game Prediction System.
Integrates:
- Data pipeline
- Feature engineering
- Elo ratings
- Machine learning models
- Performance tracking
"""
# Features used by ML models
FEATURE_LIST = [
"net_rating_diff",
"offensive_rating_diff",
"defensive_rating_diff",
"efg_diff",
"pace_diff",
"turnover_diff",
"orb_rate_diff",
"recent_net_diff",
"rest_advantage",
"home_back_to_back",
"away_back_to_back",
"away_travel_distance",
"h2h_home_win_pct",
"injury_impact_diff",
"home_court_advantage"
]
def __init__(self, config_path: Optional[str] = None):
"""Initialize the prediction system."""
logger.info("Initializing Prediction System")
# Initialize components
self.data_pipeline = DataPipeline()
self.feature_engineer = FeatureEngineer(self.data_pipeline)
# Initialize Elo system
elo_config = EloConfig(
k_factor=20,
home_advantage=100,
initial_rating=1500
)
self.elo_system = EloSystem(elo_config)
# Initialize ML models
model_config = ModelConfig(features=self.FEATURE_LIST)
self.ensemble = EnsembleModel(
self.elo_system,
self.feature_engineer,
model_config
)
# Initialize tracker
self.tracker = PerformanceTracker()
# Load saved models if available
if config_path:
self._load_configuration(config_path)
def train(
self,
start_date: date,
end_date: date
):
"""
Train the system on historical data.
This should be run before making predictions.
"""
logger.info(f"Training on data from {start_date} to {end_date}")
# Collect historical games
games = self.data_pipeline.collect_games(start_date, end_date)
logger.info(f"Collected {len(games)} games")
# Update Elo ratings chronologically
for game in sorted(games, key=lambda g: g.date):
if game.is_completed:
self.elo_system.update_ratings(
game.home_team,
game.away_team,
game.home_score,
game.away_score,
game.date
)
# Prepare training data for ML models
features_list = []
win_labels = []
spread_labels = []
for game in games:
if not game.is_completed:
continue
try:
features = self.feature_engineer.compute_game_features(
game.home_team,
game.away_team,
game.date
)
features_list.append([features[f] for f in self.FEATURE_LIST])
win_labels.append(1 if game.home_win else 0)
spread_labels.append(game.home_score - game.away_score)
except Exception as e:
logger.warning(f"Failed to compute features for {game.game_id}: {e}")
features_matrix = np.array(features_list)
win_labels = np.array(win_labels)
spread_labels = np.array(spread_labels)
# Train ensemble
self.ensemble.fit(games, features_matrix, win_labels, spread_labels)
logger.info("Training complete")
def predict(
self,
home_team: str,
away_team: str,
game_date: Optional[date] = None
) -> Prediction:
"""
Generate prediction for a game.
Args:
home_team: Home team abbreviation
away_team: Away team abbreviation
game_date: Game date (defaults to today)
Returns:
Prediction object with probabilities and spread
"""
if game_date is None:
game_date = date.today()
prediction = self.ensemble.predict(home_team, away_team, game_date)
# Record prediction for tracking
game_id = f"{game_date.isoformat()}_{home_team}_{away_team}"
self.tracker.record_prediction(
game_id=game_id,
home_team=home_team,
away_team=away_team,
home_win_prob=prediction.home_win_prob,
predicted_spread=prediction.predicted_spread
)
return prediction
def predict_todays_games(self) -> List[Prediction]:
"""Generate predictions for all of today's games."""
games = self.data_pipeline.collect_games(date.today(), date.today())
predictions = []
for game in games:
if not game.is_completed:
pred = self.predict(game.home_team, game.away_team, game.date)
predictions.append(pred)
return predictions
def update_with_results(self, game_date: date):
"""
Update system with completed game results.
Updates Elo ratings and resolves predictions.
"""
games = self.data_pipeline.collect_games(game_date, game_date)
for game in games:
if game.is_completed:
# Update Elo
self.elo_system.update_ratings(
game.home_team,
game.away_team,
game.home_score,
game.away_score,
game.date
)
# Resolve prediction
game_id = f"{game.date.isoformat()}_{game.home_team}_{game.away_team}"
self.tracker.resolve_prediction(
game_id,
game.home_score,
game.away_score
)
logger.info(f"Updated with {len(games)} results from {game_date}")
def get_performance_report(self) -> dict:
"""Get current performance metrics."""
return self.tracker.generate_report()
    def _load_configuration(self, config_path: str):
        """Load system configuration from file."""
        # Placeholder: load saved Elo ratings and trained model weights here
        pass
# Main entry point
if __name__ == "__main__":
# Example usage
system = PredictionSystem()
# Train on historical data
train_end = date.today() - timedelta(days=1)
train_start = train_end - timedelta(days=365)
system.train(train_start, train_end)
# Make a prediction
prediction = system.predict("LAL", "BOS")
print(f"\nPrediction: {prediction.away_team} @ {prediction.home_team}")
print(f"Home win probability: {prediction.home_win_prob:.1%}")
print(f"Predicted spread: {prediction.predicted_spread:+.1f}")
print(f"Confidence: {prediction.confidence:.1%}")
Deployment Considerations
Docker Configuration
# Dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "api:app", "--host", "0.0.0.0", "--port", "8000"]
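Building and running the container might look like the following; the image name is illustrative:
docker build -t nba-prediction-api .
docker run -p 8000:8000 --env-file .env nba-prediction-api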
Environment Configuration
# config.py
# Written against pydantic v1; with pydantic v2, BaseSettings lives in the
# separate pydantic-settings package
from pydantic import BaseSettings
class Settings(BaseSettings):
"""Application settings loaded from environment."""
# API settings
api_host: str = "0.0.0.0"
api_port: int = 8000
debug: bool = False
# Database
database_url: str = "sqlite:///predictions.db"
# Model settings
model_path: str = "./models"
retrain_schedule: str = "0 4 * * *" # 4 AM daily
# Data sources
nba_api_key: str = ""
class Config:
env_file = ".env"
settings = Settings()
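A matching .env file could look like this, with placeholder values:
# .env
DEBUG=false
DATABASE_URL=sqlite:///predictions.db
MODEL_PATH=./models
NBA_API_KEY=your-key-here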
Project Exercises
Exercise 1: Extend Feature Engineering
Add player-level features to the prediction system:
- Implement a PlayerImpactCalculator class that estimates each player's contribution to team performance (a starting skeleton follows this list)
- Add features for:
  - Minutes-weighted plus/minus differential
  - Key player availability (top 3 players by minutes)
  - Position group strength comparisons
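A starting skeleton for the first item, as a sketch only; the PlayerLine fields and the minutes-weighting scheme are assumptions for you to refine:
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class PlayerLine:
    """Minimal per-player season line (illustrative fields)."""
    name: str
    minutes_per_game: float
    plus_minus_per_100: float

class PlayerImpactCalculator:
    """Estimates each player's contribution to team performance."""

    def team_impact(
        self,
        roster: List[PlayerLine],
        available: Dict[str, bool]
    ) -> float:
        """Minutes-weighted plus/minus across available players."""
        weighted, total_minutes = 0.0, 0.0
        for player in roster:
            if not available.get(player.name, True):
                continue
            weighted += player.minutes_per_game * player.plus_minus_per_100
            total_minutes += player.minutes_per_game
        return weighted / total_minutes if total_minutes > 0 else 0.0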
Exercise 2: Implement Live Game Predictions
Extend the system to update predictions during games:
- Create a LiveGamePredictor class
- Implement score-based probability updates using the log5 method (see the sketch after this list)
- Add time remaining as a feature
- Build a WebSocket endpoint for real-time updates
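For reference, log5 combines two standalone win probabilities into a head-to-head estimate; a minimal sketch:
def log5(p_a: float, p_b: float) -> float:
    """Probability that A beats B given each side's win probability."""
    return (p_a - p_a * p_b) / (p_a + p_b - 2 * p_a * p_b)

# Example: log5(0.60, 0.45) is roughly 0.647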
Exercise 3: Add Betting Market Integration
Compare predictions against betting market lines:
- Implement a MarketDataCollector for odds feeds
- Calculate implied probabilities from odds (see the helper after this list)
- Track closing line value (CLV) for predictions
- Add market-relative metrics to the performance tracker
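Converting American odds to implied probabilities is a small helper worth writing first; note the raw conversion still includes the bookmaker's margin, which you remove by normalizing across both sides:
def implied_probability(american_odds: int) -> float:
    """Raw implied probability from American odds (vig included)."""
    if american_odds < 0:
        return -american_odds / (-american_odds + 100)
    return 100 / (american_odds + 100)

# Example: -150 implies about 0.600; +130 implies about 0.435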
Exercise 4: Build a Backtesting Framework
Create comprehensive backtesting capabilities:
- Implement walk-forward validation (a minimal sketch follows this list)
- Calculate profitability metrics assuming various betting strategies
- Generate detailed performance reports by:
  - Team
  - Month/season
  - Game context (home/away, rest, etc.)
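A minimal walk-forward loop, assuming the PredictionSystem interface shown earlier; the factory pattern and window sizes are illustrative choices:
from datetime import date, timedelta

def walk_forward_validate(system_factory, start: date, end: date,
                          train_days: int = 365, test_days: int = 30):
    """
    Train on a trailing window, predict the next block, score it,
    then roll forward. A fresh system per block keeps Elo updates
    and predictions from leaking across folds.
    """
    reports = []
    block_start = start
    while block_start + timedelta(days=test_days) <= end:
        system = system_factory()  # e.g., lambda: PredictionSystem()
        system.train(block_start - timedelta(days=train_days),
                     block_start - timedelta(days=1))
        test_end = block_start + timedelta(days=test_days)
        day = block_start
        while day < test_end:
            for game in system.data_pipeline.collect_games(day, day):
                system.predict(game.home_team, game.away_team, game.date)
            system.update_with_results(day)
            day += timedelta(days=1)
        reports.append(system.tracker.calculate_metrics())
        block_start = test_end
    return reports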
Summary
This capstone project has guided you through building a production-ready NBA game prediction system. The key components include:
- Robust Data Pipeline: Handles data collection, validation, and storage with proper error handling and caching.
- Comprehensive Feature Engineering: Transforms raw statistics into predictive signals including team performance, rest, travel, injuries, and matchup history.
- Elo Rating System: Provides an interpretable baseline that captures team strength dynamics throughout the season.
- Machine Learning Ensemble: Combines multiple models to capture different aspects of game prediction, improving overall accuracy.
- REST API: Exposes predictions through a well-documented, production-ready interface.
- Performance Monitoring: Tracks prediction accuracy over time and alerts when performance degrades.
- Continuous Improvement: Implements automated retraining and model versioning for ongoing maintenance.
The modular architecture allows each component to be improved independently while maintaining system stability. As you continue developing the system, focus on:
- Expanding feature coverage with additional data sources
- Experimenting with new model architectures
- Improving calibration for edge cases
- Building robust deployment and monitoring infrastructure
Remember that prediction accuracy in sports is inherently limited by the randomness of game outcomes. A well-calibrated system that honestly represents uncertainty is more valuable than one that makes overconfident predictions. Focus on generating accurate probabilities rather than maximizing the number of correct binary predictions.