Case Study 1: Building a Comprehensive Player Statistics Pipeline
Overview
Scenario: The Phoenix Suns analytics department needs to build an automated data collection system that aggregates player statistics from multiple sources, validates data quality, and produces unified datasets for their coaching staff and front office.
Duration: 2-3 hours
Difficulty: Intermediate
Prerequisites: Chapter 2 concepts, basic Python proficiency
Background
The Suns' analytics team currently relies on manual data collection processes that are time-consuming and error-prone. They need a robust, automated pipeline that can:
- Collect player statistics from the NBA API
- Supplement with advanced metrics from Basketball-Reference
- Validate data consistency between sources
- Handle missing data appropriately
- Produce clean, analysis-ready datasets
The team has identified Devin Booker as the primary subject for testing the pipeline, with plans to scale to the full roster once validated.
Part 1: Designing the Data Architecture
1.1 Data Requirements Analysis
The coaching staff has requested the following data categories:
Traditional Statistics:
- Per-game averages (points, rebounds, assists, steals, blocks)
- Shooting splits (FG%, 3P%, FT%)
- Minutes played
- Games played/started
Advanced Metrics:
- True Shooting Percentage (TS%) (see the computation sketch after these lists)
- Player Efficiency Rating (PER)
- Usage Rate (USG%)
- Win Shares
- Box Plus/Minus (BPM)
Play-by-Play Derived:
- Shot distribution by zone
- Scoring by quarter
- Performance in clutch situations
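Most of these metrics come straight from the collected tables, but True Shooting Percentage is easy to derive from box-score totals, which is useful when cross-checking sources. The helper below is a minimal sketch using the standard formula TS% = PTS / (2 * (FGA + 0.44 * FTA)); the true_shooting function name is illustrative and not part of the pipeline built later in this case study.

def true_shooting(points: float, fga: float, fta: float) -> float:
    """Return True Shooting Percentage from box-score totals."""
    # 0.44 approximates the share of free-throw trips that end a possession
    denominator = 2 * (fga + 0.44 * fta)
    return points / denominator if denominator > 0 else 0.0

# Example: 27 points on 20 field-goal attempts and 6 free-throw attempts
print(f"TS%: {true_shooting(27, 20, 6):.3f}")  # roughly 0.596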
1.2 Source Mapping
| Data Category | Primary Source | Secondary Source | Update Frequency |
|---|---|---|---|
| Traditional stats | NBA API | Basketball-Reference | Daily |
| Advanced metrics | NBA API | Basketball-Reference | Daily |
| Shot chart data | NBA API | N/A | Daily |
| Play-by-play | NBA API | N/A | Post-game |
| Historical data | Basketball-Reference | NBA API | Seasonal |
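The mapping above can also be carried into code as a small configuration object, keeping source priorities and refresh cadences in one place. The dictionary below is one illustrative way to do that; the SOURCE_MAP name and its keys are assumptions for this sketch and are not used by the pipeline class that follows.

# Illustrative representation of the source-mapping table (not part of the pipeline class)
SOURCE_MAP = {
    "traditional_stats": {"primary": "nba_api", "secondary": "bbref",   "update": "daily"},
    "advanced_metrics":  {"primary": "nba_api", "secondary": "bbref",   "update": "daily"},
    "shot_chart":        {"primary": "nba_api", "secondary": None,      "update": "daily"},
    "play_by_play":      {"primary": "nba_api", "secondary": None,      "update": "post_game"},
    "historical":        {"primary": "bbref",   "secondary": "nba_api", "update": "seasonal"},
}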
Part 2: Implementation
2.1 Setting Up the Collection Framework
"""
Phoenix Suns Player Statistics Pipeline
Case Study 1 - Chapter 2: Data Sources and Collection
"""
import pandas as pd
import numpy as np
import time
import logging
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import json
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('pipeline.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class PlayerStatsPipeline:
"""
A comprehensive data collection pipeline for NBA player statistics.
This pipeline collects data from multiple sources, validates consistency,
and produces unified datasets for analysis.
Attributes:
data_dir: Base directory for storing collected data
cache_hours: Hours before cached data is considered stale
request_delay: Seconds between API requests
"""
def __init__(
self,
data_dir: str = "./data",
cache_hours: int = 24,
request_delay: float = 2.0
):
"""Initialize the pipeline with configuration settings."""
self.data_dir = Path(data_dir)
self.cache_hours = cache_hours
self.request_delay = request_delay
# Create directory structure
self._setup_directories()
# Load or initialize metadata
self.metadata_path = self.data_dir / "metadata.json"
self.metadata = self._load_metadata()
logger.info("PlayerStatsPipeline initialized")
def _setup_directories(self) -> None:
"""Create required directory structure."""
directories = [
"raw/nba_api",
"raw/bbref",
"processed",
"validated",
"exports"
]
for dir_name in directories:
(self.data_dir / dir_name).mkdir(parents=True, exist_ok=True)
def _load_metadata(self) -> Dict:
"""Load pipeline metadata from disk."""
if self.metadata_path.exists():
with open(self.metadata_path, 'r') as f:
return json.load(f)
return {"collections": {}, "validations": {}}
def _save_metadata(self) -> None:
"""Persist pipeline metadata to disk."""
with open(self.metadata_path, 'w') as f:
json.dump(self.metadata, f, indent=2)
def _is_cache_valid(self, cache_key: str) -> bool:
"""Check if cached data is still valid."""
if cache_key not in self.metadata["collections"]:
return False
last_collected = datetime.fromisoformat(
self.metadata["collections"][cache_key]["timestamp"]
)
expiry = last_collected + timedelta(hours=self.cache_hours)
return datetime.now() < expiry
def _wait_for_rate_limit(self) -> None:
"""Enforce rate limiting between requests."""
time.sleep(self.request_delay)
2.2 NBA API Data Collection
def collect_player_game_logs(
self,
player_id: int,
player_name: str,
season: str,
force_refresh: bool = False
) -> pd.DataFrame:
"""
Collect game-by-game statistics for a player.
Args:
player_id: NBA player ID
player_name: Player's full name (for logging)
season: Season in YYYY-YY format
force_refresh: Whether to ignore cache
Returns:
DataFrame with game log data
"""
from nba_api.stats.endpoints import playergamelog
cache_key = f"game_log_{player_id}_{season}"
cache_path = self.data_dir / "raw/nba_api" / f"{cache_key}.parquet"
# Check cache
if not force_refresh and cache_path.exists() and self._is_cache_valid(cache_key):
logger.info(f"Loading cached game log for {player_name}")
return pd.read_parquet(cache_path)
# Fetch from API
logger.info(f"Fetching game log from NBA API for {player_name}")
self._wait_for_rate_limit()
try:
game_log = playergamelog.PlayerGameLog(
player_id=player_id,
season=season,
season_type_all_star='Regular Season'
)
df = game_log.get_data_frames()[0]
# Save to cache
df.to_parquet(cache_path)
self.metadata["collections"][cache_key] = {
"timestamp": datetime.now().isoformat(),
"records": len(df),
"source": "nba_api"
}
self._save_metadata()
logger.info(f"Collected {len(df)} games for {player_name}")
return df
except Exception as e:
logger.error(f"Error collecting game log: {e}")
raise
def collect_shot_chart(
self,
player_id: int,
player_name: str,
season: str,
force_refresh: bool = False
) -> pd.DataFrame:
"""
Collect shot chart data for a player.
Args:
player_id: NBA player ID
player_name: Player's full name
season: Season in YYYY-YY format
force_refresh: Whether to ignore cache
Returns:
DataFrame with shot location data
"""
from nba_api.stats.endpoints import shotchartdetail
cache_key = f"shots_{player_id}_{season}"
cache_path = self.data_dir / "raw/nba_api" / f"{cache_key}.parquet"
if not force_refresh and cache_path.exists() and self._is_cache_valid(cache_key):
logger.info(f"Loading cached shot chart for {player_name}")
return pd.read_parquet(cache_path)
logger.info(f"Fetching shot chart from NBA API for {player_name}")
self._wait_for_rate_limit()
try:
shots = shotchartdetail.ShotChartDetail(
player_id=player_id,
team_id=0,
season_nullable=season,
context_measure_simple='FGA'
)
df = shots.get_data_frames()[0]
# Enhance with calculated fields
df['LOC_X_FEET'] = df['LOC_X'] / 10.0
df['LOC_Y_FEET'] = df['LOC_Y'] / 10.0
df['SHOT_DISTANCE_CALC'] = np.sqrt(
df['LOC_X_FEET']**2 + df['LOC_Y_FEET']**2
)
df.to_parquet(cache_path)
self.metadata["collections"][cache_key] = {
"timestamp": datetime.now().isoformat(),
"records": len(df),
"source": "nba_api"
}
self._save_metadata()
logger.info(f"Collected {len(df)} shots for {player_name}")
return df
except Exception as e:
logger.error(f"Error collecting shot chart: {e}")
raise
2.3 Basketball-Reference Integration
def collect_advanced_stats_bbref(
self,
player_bbref_id: str,
player_name: str,
force_refresh: bool = False
) -> pd.DataFrame:
"""
Collect advanced statistics from Basketball-Reference.
Args:
player_bbref_id: Basketball-Reference player ID
player_name: Player's full name
force_refresh: Whether to ignore cache
Returns:
DataFrame with advanced statistics
"""
import requests
from bs4 import BeautifulSoup
from io import StringIO
cache_key = f"bbref_advanced_{player_bbref_id}"
cache_path = self.data_dir / "raw/bbref" / f"{cache_key}.parquet"
if not force_refresh and cache_path.exists() and self._is_cache_valid(cache_key):
logger.info(f"Loading cached BBRef data for {player_name}")
return pd.read_parquet(cache_path)
logger.info(f"Fetching advanced stats from BBRef for {player_name}")
# Longer delay for BBRef (be respectful)
time.sleep(4.0)
url = f"https://www.basketball-reference.com/players/{player_bbref_id[0]}/{player_bbref_id}.html"
headers = {
'User-Agent': 'Mozilla/5.0 (Educational Research Purpose)'
}
try:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
# Try to find the advanced stats table; pd.read_html raises a
# ValueError when no matching table exists, so treat that as "not found"
try:
    tables = pd.read_html(StringIO(str(soup)), match='Advanced')
except ValueError:
    tables = []
if tables:
    df = tables[0]
df.to_parquet(cache_path)
self.metadata["collections"][cache_key] = {
"timestamp": datetime.now().isoformat(),
"records": len(df),
"source": "basketball_reference"
}
self._save_metadata()
logger.info(f"Collected {len(df)} seasons of advanced stats")
return df
else:
logger.warning(f"No advanced stats table found for {player_name}")
return pd.DataFrame()
except Exception as e:
logger.error(f"Error collecting BBRef data: {e}")
raise
2.4 Data Validation
def validate_data_consistency(
self,
nba_api_df: pd.DataFrame,
bbref_df: pd.DataFrame,
season: str,
tolerance: float = 0.02
) -> Dict:
"""
Validate consistency between NBA API and BBRef data.
Args:
nba_api_df: Data from NBA API
bbref_df: Data from Basketball-Reference
season: Season to validate
tolerance: Acceptable difference ratio
Returns:
Dictionary with validation results
"""
logger.info(f"Validating data consistency for {season}")
validation_results = {
"season": season,
"timestamp": datetime.now().isoformat(),
"checks": [],
"overall_status": "PASS"
}
# Filter BBRef to the specific season; Basketball-Reference uses the
# same "YYYY-YY" season label as the NBA API (e.g., "2023-24")
bbref_season = bbref_df[bbref_df['Season'] == season]
if len(bbref_season) == 0:
validation_results["overall_status"] = "SKIP"
validation_results["message"] = "Season not found in BBRef data"
return validation_results
# Calculate NBA API aggregates
nba_totals = {
'games': len(nba_api_df),
'points': nba_api_df['PTS'].sum(),
'rebounds': nba_api_df['REB'].sum(),
'assists': nba_api_df['AST'].sum()
}
# Validation checks
checks = [
("Games Played", nba_totals['games'], bbref_season['G'].values[0]),
("Total Points", nba_totals['points'], bbref_season['PTS'].values[0] if 'PTS' in bbref_season else None),
]
for check_name, nba_val, bbref_val in checks:
if bbref_val is None:
continue
if nba_val == 0 and bbref_val == 0:
status = "PASS"
diff_pct = 0
else:
diff_pct = abs(nba_val - bbref_val) / max(nba_val, bbref_val)
status = "PASS" if diff_pct <= tolerance else "FAIL"
check_result = {
"name": check_name,
"nba_api_value": float(nba_val),
"bbref_value": float(bbref_val),
"difference_pct": round(diff_pct * 100, 2),
"status": status
}
validation_results["checks"].append(check_result)
if status == "FAIL":
validation_results["overall_status"] = "FAIL"
logger.info(f"Validation complete: {validation_results['overall_status']}")
return validation_results
2.5 Unified Dataset Creation
def create_unified_dataset(
self,
player_id: int,
player_name: str,
player_bbref_id: str,
seasons: List[str]
) -> pd.DataFrame:
"""
Create a unified dataset combining multiple sources.
Args:
player_id: NBA player ID
player_name: Player's full name
player_bbref_id: Basketball-Reference ID
seasons: List of seasons to collect
Returns:
Unified DataFrame with all statistics
"""
logger.info(f"Creating unified dataset for {player_name}")
all_game_logs = []
all_shots = []
for season in seasons:
# Collect from each source
game_log = self.collect_player_game_logs(
player_id, player_name, season
)
game_log['SEASON'] = season
all_game_logs.append(game_log)
shots = self.collect_shot_chart(
player_id, player_name, season
)
shots['SEASON'] = season
all_shots.append(shots)
# Combine DataFrames
game_logs_df = pd.concat(all_game_logs, ignore_index=True)
shots_df = pd.concat(all_shots, ignore_index=True)
# Collect advanced stats
advanced_df = self.collect_advanced_stats_bbref(
player_bbref_id, player_name
)
# Calculate derived metrics
game_logs_df['GAME_DATE'] = pd.to_datetime(game_logs_df['GAME_DATE'])
game_logs_df = game_logs_df.sort_values('GAME_DATE')
# Export unified datasets
export_path = self.data_dir / "exports"
game_logs_df.to_parquet(
export_path / f"{player_name.lower().replace(' ', '_')}_game_logs.parquet"
)
shots_df.to_parquet(
export_path / f"{player_name.lower().replace(' ', '_')}_shots.parquet"
)
logger.info(f"Unified dataset created: {len(game_logs_df)} games, {len(shots_df)} shots")
return game_logs_df
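The exported shot file covers the "shot distribution by zone" requirement from Part 1.1. The standalone sketch below shows one way to aggregate it; it assumes the SHOT_ZONE_BASIC and SHOT_MADE_FLAG columns that the NBA API shot chart endpoint returns, and the shot_distribution_by_zone helper is illustrative rather than part of the pipeline class.

def shot_distribution_by_zone(shots_df: pd.DataFrame) -> pd.DataFrame:
    """Summarize attempts, makes, and FG% by court zone from shot chart data."""
    # Group on the zone label provided by the shot chart endpoint
    summary = (
        shots_df.groupby('SHOT_ZONE_BASIC')
        .agg(attempts=('SHOT_MADE_FLAG', 'size'), makes=('SHOT_MADE_FLAG', 'sum'))
        .assign(fg_pct=lambda d: d['makes'] / d['attempts'])
        .sort_values('attempts', ascending=False)
    )
    return summary

# Example usage against the file exported by create_unified_dataset():
# shots = pd.read_parquet("./suns_analytics_data/exports/devin_booker_shots.parquet")
# print(shot_distribution_by_zone(shots))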
Part 3: Execution and Analysis
3.1 Running the Pipeline
def main():
"""Execute the player statistics pipeline for Devin Booker."""
# Initialize pipeline
pipeline = PlayerStatsPipeline(
data_dir="./suns_analytics_data",
cache_hours=12,
request_delay=2.5
)
# Devin Booker's identifiers
BOOKER_NBA_ID = 1626164
BOOKER_BBREF_ID = "bookede01"
BOOKER_NAME = "Devin Booker"
# Seasons to collect
seasons = ["2021-22", "2022-23", "2023-24"]
# Create unified dataset
booker_data = pipeline.create_unified_dataset(
player_id=BOOKER_NBA_ID,
player_name=BOOKER_NAME,
player_bbref_id=BOOKER_BBREF_ID,
seasons=seasons
)
# Generate summary statistics
print("\n=== Devin Booker Career Summary ===")
print(f"Total Games: {len(booker_data)}")
print(f"Average PPG: {booker_data['PTS'].mean():.1f}")
print(f"Average APG: {booker_data['AST'].mean():.1f}")
print(f"Average RPG: {booker_data['REB'].mean():.1f}")
# Validate data against the cached Basketball-Reference table
# (read it once rather than on every loop iteration)
advanced = pd.read_parquet(
    pipeline.data_dir / "raw/bbref" / f"bbref_advanced_{BOOKER_BBREF_ID}.parquet"
)
for season in seasons:
    season_data = booker_data[booker_data['SEASON'] == season]
    validation = pipeline.validate_data_consistency(
        season_data, advanced, season
    )
    print(f"\nValidation {season}: {validation['overall_status']}")
if __name__ == "__main__":
main()
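The metadata file created in Part 2.1 reserves a "validations" key that nothing writes to yet. One way to close that loop, sketched under the assumption that you pass in the dictionary returned by validate_data_consistency(), is shown below; the record_validation helper is illustrative and not part of the pipeline class.

def record_validation(pipeline: PlayerStatsPipeline, cache_key: str, validation: Dict) -> None:
    """Store a validation result in the pipeline metadata for auditability."""
    # Illustrative helper: the keys mirror the structure returned by
    # validate_data_consistency(), and cache_key is any label you choose
    pipeline.metadata["validations"][cache_key] = {
        "timestamp": validation["timestamp"],
        "status": validation["overall_status"],
        "checks": validation["checks"],
    }
    pipeline._save_metadata()

# Example usage inside main(), after each validate_data_consistency() call:
# record_validation(pipeline, f"booker_{season}", validation)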
Part 4: Discussion Questions
Question 1: Design Decisions
Why does the pipeline use separate cache directories for NBA API and Basketball-Reference data? What advantages does this provide for debugging and maintenance?
Question 2: Error Handling
The current implementation raises exceptions when API calls fail. How would you modify the error handling to make the pipeline more resilient for production use?
Question 3: Scalability
If the Suns wanted to extend this pipeline to collect data for all 450+ NBA players, what modifications would be necessary? Consider rate limiting, parallel processing, and storage requirements.
Question 4: Data Quality
The validation function checks for consistency between sources. What additional validation checks would you implement to ensure data quality?
Question 5: Real-Time Updates
How would you modify this pipeline to support real-time updates during the season, such as updating statistics after each game?
Part 5: Extension Challenges
Challenge 1: Tracking Data Integration
Extend the pipeline to collect and integrate tracking-derived statistics from the NBA API's tracking endpoints. Handle cases where tracking data may not be available.
Challenge 2: Historical Comparison
Add functionality to compare a player's current season statistics to their career averages and league percentiles.
Challenge 3: Team-Wide Collection
Scale the pipeline to collect data for an entire team roster, implementing concurrent collection with proper rate limiting.
Challenge 4: Automated Alerts
Implement an alerting system that notifies analysts when data validation fails or when collection errors occur.
Deliverables
By completing this case study, you should produce:
- Pipeline Code: A functional Python module implementing the data collection pipeline
- Sample Dataset: Exported Parquet files for the test player
- Validation Report: Documentation of data consistency checks
- Design Document: Brief explanation of architectural decisions
- Extension Proposal: Plan for one of the extension challenges
Assessment Criteria
| Criterion | Weight | Description |
|---|---|---|
| Code Quality | 25% | Clean, well-documented, follows Python best practices |
| Functionality | 25% | Pipeline correctly collects and processes data |
| Error Handling | 20% | Graceful handling of API failures and edge cases |
| Data Validation | 15% | Robust consistency checks between sources |
| Documentation | 15% | Clear explanations and comments |
Key Takeaways
- Multi-source data collection requires careful coordination of different APIs and scrapers
- Caching and rate limiting are essential for responsible and efficient data collection
- Data validation between sources helps identify inconsistencies before they affect analysis
- Modular design enables easy extension and maintenance of collection pipelines
- Logging and metadata provide audit trails for debugging and reproducibility
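Because every collection writes a timestamped entry to metadata.json, the audit trail mentioned above can be inspected without re-running the pipeline. A minimal sketch, assuming the metadata layout created by _load_metadata() and the ./suns_analytics_data directory used in Part 3:

import json
from pathlib import Path

# Illustrative audit check; the path assumes the data_dir configured in main()
metadata = json.loads(Path("./suns_analytics_data/metadata.json").read_text())
for cache_key, entry in metadata["collections"].items():
    print(f"{cache_key}: {entry['records']} records from {entry['source']} at {entry['timestamp']}")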
This case study demonstrates the practical application of Chapter 2 concepts in a real-world analytics scenario. The techniques learned here form the foundation for building production-grade data infrastructure for professional basketball analysis.