Case Study 1: Building a Comprehensive Player Statistics Pipeline

Overview

Scenario: The Phoenix Suns analytics department needs to build an automated data collection system that aggregates player statistics from multiple sources, validates data quality, and produces unified datasets for their coaching staff and front office.

Duration: 2-3 hours
Difficulty: Intermediate
Prerequisites: Chapter 2 concepts, basic Python proficiency


Background

The Suns' analytics team currently relies on manual data collection processes that are time-consuming and error-prone. They need a robust, automated pipeline that can:

  1. Collect player statistics from the NBA API
  2. Supplement with advanced metrics from Basketball-Reference
  3. Validate data consistency between sources
  4. Handle missing data appropriately
  5. Produce clean, analysis-ready datasets

The team has identified Devin Booker as the primary subject for testing the pipeline, with plans to scale to the full roster once validated.


Part 1: Designing the Data Architecture

1.1 Data Requirements Analysis

The coaching staff has requested the following data categories:

Traditional Statistics:
  - Per-game averages (points, rebounds, assists, steals, blocks)
  - Shooting splits (FG%, 3P%, FT%)
  - Minutes played
  - Games played/started

Advanced Metrics (a calculation sketch for TS% follows these lists):
  - True Shooting Percentage (TS%)
  - Player Efficiency Rating (PER)
  - Usage Rate (USG%)
  - Win Shares
  - Box Plus/Minus (BPM)

Play-by-Play Derived:
  - Shot distribution by zone
  - Scoring by quarter
  - Performance in clutch situations
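
Several of the advanced metrics above can be computed directly from box-score totals once the game logs are collected. The sketch below is illustrative only: it assumes the NBA API game-log column names (PTS, FGA, FTA) used later in this case study and applies the standard True Shooting formula, TS% = PTS / (2 * (FGA + 0.44 * FTA)).

import pandas as pd


def add_true_shooting(game_logs: pd.DataFrame) -> pd.DataFrame:
    """Add a per-game True Shooting Percentage column (illustrative sketch)."""
    out = game_logs.copy()
    # True shooting attempts approximate free-throw trips with the 0.44 factor
    tsa = out['FGA'] + 0.44 * out['FTA']
    out['TS_PCT'] = out['PTS'] / (2 * tsa)
    return out

Note that a season-level TS% should be computed from summed totals rather than by averaging per-game values, since the latter over-weights low-attempt games.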

1.2 Source Mapping

Data Category     | Primary Source       | Secondary Source     | Update Frequency
Traditional stats | NBA API              | Basketball-Reference | Daily
Advanced metrics  | NBA API              | Basketball-Reference | Daily
Shot chart data   | NBA API              | N/A                  | Daily
Play-by-play      | NBA API              | N/A                  | Post-game
Historical data   | Basketball-Reference | NBA API              | Seasonal
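
One lightweight way to make the mapping above executable is a routing dictionary the pipeline can consult when deciding where each category comes from. This is only a sketch; the names (SOURCE_MAP, pick_source) are illustrative and not part of the pipeline class defined below.

# Illustrative source-routing configuration; keys mirror the table above.
SOURCE_MAP = {
    "traditional_stats": {"primary": "nba_api", "secondary": "bbref",   "update": "daily"},
    "advanced_metrics":  {"primary": "nba_api", "secondary": "bbref",   "update": "daily"},
    "shot_chart":        {"primary": "nba_api", "secondary": None,      "update": "daily"},
    "play_by_play":      {"primary": "nba_api", "secondary": None,      "update": "post_game"},
    "historical":        {"primary": "bbref",   "secondary": "nba_api", "update": "seasonal"},
}


def pick_source(category: str, primary_available: bool = True) -> str:
    """Return the source to use for a category, falling back when possible."""
    entry = SOURCE_MAP[category]
    if primary_available or entry["secondary"] is None:
        return entry["primary"]
    return entry["secondary"]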

Part 2: Implementation

2.1 Setting Up the Collection Framework

"""
Phoenix Suns Player Statistics Pipeline
Case Study 1 - Chapter 2: Data Sources and Collection
"""

import pandas as pd
import numpy as np
import time
import logging
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import json

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('pipeline.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class PlayerStatsPipeline:
    """
    A comprehensive data collection pipeline for NBA player statistics.

    This pipeline collects data from multiple sources, validates consistency,
    and produces unified datasets for analysis.

    Attributes:
        data_dir: Base directory for storing collected data
        cache_hours: Hours before cached data is considered stale
        request_delay: Seconds between API requests
    """

    def __init__(
        self,
        data_dir: str = "./data",
        cache_hours: int = 24,
        request_delay: float = 2.0
    ):
        """Initialize the pipeline with configuration settings."""
        self.data_dir = Path(data_dir)
        self.cache_hours = cache_hours
        self.request_delay = request_delay

        # Create directory structure
        self._setup_directories()

        # Load or initialize metadata
        self.metadata_path = self.data_dir / "metadata.json"
        self.metadata = self._load_metadata()

        logger.info("PlayerStatsPipeline initialized")

    def _setup_directories(self) -> None:
        """Create required directory structure."""
        directories = [
            "raw/nba_api",
            "raw/bbref",
            "processed",
            "validated",
            "exports"
        ]
        for dir_name in directories:
            (self.data_dir / dir_name).mkdir(parents=True, exist_ok=True)

    def _load_metadata(self) -> Dict:
        """Load pipeline metadata from disk."""
        if self.metadata_path.exists():
            with open(self.metadata_path, 'r') as f:
                return json.load(f)
        return {"collections": {}, "validations": {}}

    def _save_metadata(self) -> None:
        """Persist pipeline metadata to disk."""
        with open(self.metadata_path, 'w') as f:
            json.dump(self.metadata, f, indent=2)

    def _is_cache_valid(self, cache_key: str) -> bool:
        """Check if cached data is still valid."""
        if cache_key not in self.metadata["collections"]:
            return False

        last_collected = datetime.fromisoformat(
            self.metadata["collections"][cache_key]["timestamp"]
        )
        expiry = last_collected + timedelta(hours=self.cache_hours)

        return datetime.now() < expiry

    def _wait_for_rate_limit(self) -> None:
        """Enforce rate limiting between requests."""
        time.sleep(self.request_delay)

2.2 NBA API Data Collection

    def collect_player_game_logs(
        self,
        player_id: int,
        player_name: str,
        season: str,
        force_refresh: bool = False
    ) -> pd.DataFrame:
        """
        Collect game-by-game statistics for a player.

        Args:
            player_id: NBA player ID
            player_name: Player's full name (for logging)
            season: Season in YYYY-YY format
            force_refresh: Whether to ignore cache

        Returns:
            DataFrame with game log data
        """
        from nba_api.stats.endpoints import playergamelog

        cache_key = f"game_log_{player_id}_{season}"
        cache_path = self.data_dir / "raw/nba_api" / f"{cache_key}.parquet"

        # Check cache
        if not force_refresh and cache_path.exists() and self._is_cache_valid(cache_key):
            logger.info(f"Loading cached game log for {player_name}")
            return pd.read_parquet(cache_path)

        # Fetch from API
        logger.info(f"Fetching game log from NBA API for {player_name}")
        self._wait_for_rate_limit()

        try:
            game_log = playergamelog.PlayerGameLog(
                player_id=player_id,
                season=season,
                season_type_all_star='Regular Season'
            )
            df = game_log.get_data_frames()[0]

            # Save to cache
            df.to_parquet(cache_path)
            self.metadata["collections"][cache_key] = {
                "timestamp": datetime.now().isoformat(),
                "records": len(df),
                "source": "nba_api"
            }
            self._save_metadata()

            logger.info(f"Collected {len(df)} games for {player_name}")
            return df

        except Exception as e:
            logger.error(f"Error collecting game log: {e}")
            raise

    def collect_shot_chart(
        self,
        player_id: int,
        player_name: str,
        season: str,
        force_refresh: bool = False
    ) -> pd.DataFrame:
        """
        Collect shot chart data for a player.

        Args:
            player_id: NBA player ID
            player_name: Player's full name
            season: Season in YYYY-YY format
            force_refresh: Whether to ignore cache

        Returns:
            DataFrame with shot location data
        """
        from nba_api.stats.endpoints import shotchartdetail

        cache_key = f"shots_{player_id}_{season}"
        cache_path = self.data_dir / "raw/nba_api" / f"{cache_key}.parquet"

        if not force_refresh and cache_path.exists() and self._is_cache_valid(cache_key):
            logger.info(f"Loading cached shot chart for {player_name}")
            return pd.read_parquet(cache_path)

        logger.info(f"Fetching shot chart from NBA API for {player_name}")
        self._wait_for_rate_limit()

        try:
            shots = shotchartdetail.ShotChartDetail(
                player_id=player_id,
                team_id=0,
                season_nullable=season,
                context_measure_simple='FGA'
            )
            df = shots.get_data_frames()[0]

            # Enhance with calculated fields
            df['LOC_X_FEET'] = df['LOC_X'] / 10.0
            df['LOC_Y_FEET'] = df['LOC_Y'] / 10.0
            df['SHOT_DISTANCE_CALC'] = np.sqrt(
                df['LOC_X_FEET']**2 + df['LOC_Y_FEET']**2
            )

            df.to_parquet(cache_path)
            self.metadata["collections"][cache_key] = {
                "timestamp": datetime.now().isoformat(),
                "records": len(df),
                "source": "nba_api"
            }
            self._save_metadata()

            logger.info(f"Collected {len(df)} shots for {player_name}")
            return df

        except Exception as e:
            logger.error(f"Error collecting shot chart: {e}")
            raise

2.3 Basketball-Reference Integration

    def collect_advanced_stats_bbref(
        self,
        player_bbref_id: str,
        player_name: str,
        force_refresh: bool = False
    ) -> pd.DataFrame:
        """
        Collect advanced statistics from Basketball-Reference.

        Args:
            player_bbref_id: Basketball-Reference player ID
            player_name: Player's full name
            force_refresh: Whether to ignore cache

        Returns:
            DataFrame with advanced statistics
        """
        import requests
        from bs4 import BeautifulSoup
        from io import StringIO

        cache_key = f"bbref_advanced_{player_bbref_id}"
        cache_path = self.data_dir / "raw/bbref" / f"{cache_key}.parquet"

        if not force_refresh and cache_path.exists() and self._is_cache_valid(cache_key):
            logger.info(f"Loading cached BBRef data for {player_name}")
            return pd.read_parquet(cache_path)

        logger.info(f"Fetching advanced stats from BBRef for {player_name}")

        # Longer delay for BBRef (be respectful)
        time.sleep(4.0)

        url = f"https://www.basketball-reference.com/players/{player_bbref_id[0]}/{player_bbref_id}.html"

        headers = {
            'User-Agent': 'Mozilla/5.0 (Educational Research Purpose)'
        }

        try:
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            # Try to find the advanced stats table. Note that pd.read_html
            # raises ValueError when no matching table is found, and
            # Basketball-Reference embeds some tables inside HTML comments,
            # so the Advanced table may not always be reachable this way.
            try:
                tables = pd.read_html(StringIO(str(soup)), match='Advanced')
            except ValueError:
                tables = []

            if tables:
                df = tables[0]
                df.to_parquet(cache_path)
                self.metadata["collections"][cache_key] = {
                    "timestamp": datetime.now().isoformat(),
                    "records": len(df),
                    "source": "basketball_reference"
                }
                self._save_metadata()

                logger.info(f"Collected {len(df)} seasons of advanced stats")
                return df
            else:
                logger.warning(f"No advanced stats table found for {player_name}")
                return pd.DataFrame()

        except Exception as e:
            logger.error(f"Error collecting BBRef data: {e}")
            raise

2.4 Data Validation

    def validate_data_consistency(
        self,
        nba_api_df: pd.DataFrame,
        bbref_df: pd.DataFrame,
        season: str,
        tolerance: float = 0.02
    ) -> Dict:
        """
        Validate consistency between NBA API and BBRef data.

        Args:
            nba_api_df: Data from NBA API
            bbref_df: Data from Basketball-Reference
            season: Season to validate
            tolerance: Acceptable difference ratio

        Returns:
            Dictionary with validation results
        """
        logger.info(f"Validating data consistency for {season}")

        validation_results = {
            "season": season,
            "timestamp": datetime.now().isoformat(),
            "checks": [],
            "overall_status": "PASS"
        }

        # Filter BBRef to the specific season; Basketball-Reference labels
        # seasons the same way as the NBA API (e.g., "2022-23")
        bbref_season = bbref_df[bbref_df['Season'] == season]

        if len(bbref_season) == 0:
            validation_results["overall_status"] = "SKIP"
            validation_results["message"] = "Season not found in BBRef data"
            return validation_results

        # Calculate NBA API aggregates
        nba_totals = {
            'games': len(nba_api_df),
            'points': nba_api_df['PTS'].sum(),
            'rebounds': nba_api_df['REB'].sum(),
            'assists': nba_api_df['AST'].sum()
        }

        # Validation checks
        checks = [
            ("Games Played", nba_totals['games'], bbref_season['G'].values[0]),
            ("Total Points", nba_totals['points'], bbref_season['PTS'].values[0] if 'PTS' in bbref_season else None),
        ]

        for check_name, nba_val, bbref_val in checks:
            if bbref_val is None:
                continue

            if nba_val == 0 and bbref_val == 0:
                status = "PASS"
                diff_pct = 0
            else:
                diff_pct = abs(nba_val - bbref_val) / max(nba_val, bbref_val)
                status = "PASS" if diff_pct <= tolerance else "FAIL"

            check_result = {
                "name": check_name,
                "nba_api_value": float(nba_val),
                "bbref_value": float(bbref_val),
                "difference_pct": round(diff_pct * 100, 2),
                "status": status
            }
            validation_results["checks"].append(check_result)

            if status == "FAIL":
                validation_results["overall_status"] = "FAIL"

        logger.info(f"Validation complete: {validation_results['overall_status']}")
        return validation_results

2.5 Unified Dataset Creation

    def create_unified_dataset(
        self,
        player_id: int,
        player_name: str,
        player_bbref_id: str,
        seasons: List[str]
    ) -> pd.DataFrame:
        """
        Create a unified dataset combining multiple sources.

        Args:
            player_id: NBA player ID
            player_name: Player's full name
            player_bbref_id: Basketball-Reference ID
            seasons: List of seasons to collect

        Returns:
            Unified DataFrame with all statistics
        """
        logger.info(f"Creating unified dataset for {player_name}")

        all_game_logs = []
        all_shots = []

        for season in seasons:
            # Collect from each source
            game_log = self.collect_player_game_logs(
                player_id, player_name, season
            )
            game_log['SEASON'] = season
            all_game_logs.append(game_log)

            shots = self.collect_shot_chart(
                player_id, player_name, season
            )
            shots['SEASON'] = season
            all_shots.append(shots)

        # Combine DataFrames
        game_logs_df = pd.concat(all_game_logs, ignore_index=True)
        shots_df = pd.concat(all_shots, ignore_index=True)

        # Collect advanced stats
        advanced_df = self.collect_advanced_stats_bbref(
            player_bbref_id, player_name
        )

        # Calculate derived metrics
        game_logs_df['GAME_DATE'] = pd.to_datetime(game_logs_df['GAME_DATE'])
        game_logs_df = game_logs_df.sort_values('GAME_DATE')

        # Export unified datasets
        export_path = self.data_dir / "exports"

        game_logs_df.to_parquet(
            export_path / f"{player_name.lower().replace(' ', '_')}_game_logs.parquet"
        )
        shots_df.to_parquet(
            export_path / f"{player_name.lower().replace(' ', '_')}_shots.parquet"
        )

        logger.info(f"Unified dataset created: {len(game_logs_df)} games, {len(shots_df)} shots")

        return game_logs_df

Part 3: Execution and Analysis

3.1 Running the Pipeline

def main():
    """Execute the player statistics pipeline for Devin Booker."""

    # Initialize pipeline
    pipeline = PlayerStatsPipeline(
        data_dir="./suns_analytics_data",
        cache_hours=12,
        request_delay=2.5
    )

    # Devin Booker's identifiers
    BOOKER_NBA_ID = 1626164
    BOOKER_BBREF_ID = "bookede01"
    BOOKER_NAME = "Devin Booker"

    # Seasons to collect
    seasons = ["2021-22", "2022-23", "2023-24"]

    # Create unified dataset
    booker_data = pipeline.create_unified_dataset(
        player_id=BOOKER_NBA_ID,
        player_name=BOOKER_NAME,
        player_bbref_id=BOOKER_BBREF_ID,
        seasons=seasons
    )

    # Generate summary statistics
    print("\n=== Devin Booker Career Summary ===")
    print(f"Total Games: {len(booker_data)}")
    print(f"Average PPG: {booker_data['PTS'].mean():.1f}")
    print(f"Average APG: {booker_data['AST'].mean():.1f}")
    print(f"Average RPG: {booker_data['REB'].mean():.1f}")

    # Validate data
    for season in seasons:
        season_data = booker_data[booker_data['SEASON'] == season]
        advanced = pd.read_parquet(
            pipeline.data_dir / "raw/bbref" / f"bbref_advanced_{BOOKER_BBREF_ID}.parquet"
        )
        validation = pipeline.validate_data_consistency(
            season_data, advanced, season
        )
        print(f"\nValidation {season}: {validation['overall_status']}")


if __name__ == "__main__":
    main()

Part 4: Discussion Questions

Question 1: Design Decisions

Why does the pipeline use separate cache directories for NBA API and Basketball-Reference data? What advantages does this provide for debugging and maintenance?

Question 2: Error Handling

The current implementation raises exceptions when API calls fail. How would you modify the error handling to make the pipeline more resilient for production use?
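
As a hint rather than a full answer, one common direction is to wrap the collection calls in a retry decorator with exponential backoff so that transient network failures do not abort the run. The sketch below is a possible starting point; the decorator name and parameters are illustrative, not part of the pipeline above.

import functools
import logging
import time

logger = logging.getLogger(__name__)


def retry_with_backoff(max_attempts: int = 3, base_delay: float = 2.0):
    """Retry a flaky call with exponential backoff before giving up."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    if attempt == max_attempts:
                        logger.error(f"{func.__name__} failed after {attempt} attempts: {exc}")
                        raise
                    wait = base_delay * (2 ** (attempt - 1))
                    logger.warning(f"{func.__name__} attempt {attempt} failed, retrying in {wait:.0f}s")
                    time.sleep(wait)
        return wrapper
    return decorator

Applied to collect_player_game_logs, for example, this would retry the NBA API call a few times before surfacing the error to the caller.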

Question 3: Scalability

If the Suns wanted to extend this pipeline to collect data for all 450+ NBA players, what modifications would be necessary? Consider rate limiting, parallel processing, and storage requirements.
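
For the rate-limiting and parallelism portion of this question, the sketch below shows one possible pattern: a thread pool for concurrency combined with a shared limiter so that all workers still honor a single global request delay. The fetch_player callable and player_ids list are placeholders, not part of the case-study code.

import threading
import time
from concurrent.futures import ThreadPoolExecutor


class RateLimiter:
    """Allow at most one request per min_interval seconds across all threads."""

    def __init__(self, min_interval: float = 2.0):
        self.min_interval = min_interval
        self._lock = threading.Lock()
        self._last_request = 0.0

    def wait(self) -> None:
        with self._lock:
            elapsed = time.monotonic() - self._last_request
            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)
            self._last_request = time.monotonic()


def collect_many(player_ids, fetch_player, max_workers: int = 4, min_interval: float = 2.0):
    """Fetch data for many players concurrently while honoring the rate limit."""
    limiter = RateLimiter(min_interval)

    def task(pid):
        limiter.wait()  # serializes the actual requests
        return pid, fetch_player(pid)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return dict(pool.map(task, player_ids))

Because a single global limiter serializes the requests themselves, the thread pool mainly overlaps parsing and disk writes; real gains at roster or league scale come from a higher permissible request rate or spreading collection over time.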

Question 4: Data Quality

The validation function checks for consistency between sources. What additional validation checks would you implement to ensure data quality?
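
Beyond cross-source consistency, inexpensive internal sanity checks catch a different class of problems. The sketch below assumes the NBA API game-log column names used earlier (FG_PCT, MIN, PTS, Game_ID); adjust the names if your frames differ.

from typing import List

import pandas as pd


def sanity_check_game_logs(game_logs: pd.DataFrame) -> List[str]:
    """Return human-readable data-quality warnings for a game-log frame."""
    issues = []
    if (game_logs['FG_PCT'].dropna() > 1.0).any():
        issues.append("FG_PCT above 1.0 detected")
    if (game_logs['MIN'] > 70).any():
        issues.append("More than 70 minutes recorded in a single game")
    if (game_logs['PTS'] < 0).any():
        issues.append("Negative point totals")
    if game_logs['Game_ID'].duplicated().any():
        issues.append("Duplicate game IDs")
    return issues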

Question 5: Real-Time Updates

How would you modify this pipeline to support real-time updates during the season, such as updating statistics after each game?
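
One possible direction, sketched here under the assumption that a schedule lookup is available elsewhere, is to make the cache date-aware: compare the most recent GAME_DATE already on disk with the date of the last game played and re-fetch only when something new exists.

import pandas as pd
from datetime import datetime


def needs_update(cached_log: pd.DataFrame, last_game_played: datetime) -> bool:
    """Re-fetch only if a game newer than the cached data has been played."""
    if cached_log.empty:
        return True
    latest_cached = pd.to_datetime(cached_log['GAME_DATE']).max()
    return last_game_played.date() > latest_cached.date()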


Part 5: Extension Challenges

Challenge 1: Tracking Data Integration

Extend the pipeline to collect and integrate tracking-derived statistics from the NBA API's tracking endpoints. Handle cases where tracking data may not be available.

Challenge 2: Historical Comparison

Add functionality to compare a player's current season statistics to their career averages and league percentiles.
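
As a hedged starting point for the percentile piece, assuming a league-wide per-player frame (here called league_df) has already been collected, the comparison itself is short in pandas:

import pandas as pd


def percentile_vs_league(player_value: float, league_values: pd.Series) -> float:
    """Return the share of league values at or below the player's value, in percent."""
    return float((league_values <= player_value).mean() * 100)

# Hypothetical usage:
# booker_ppg = booker_data['PTS'].mean()
# print(f"PPG percentile: {percentile_vs_league(booker_ppg, league_df['PTS']):.0f}")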

Challenge 3: Team-Wide Collection

Scale the pipeline to collect data for an entire team roster, implementing concurrent collection with proper rate limiting.

Challenge 4: Automated Alerts

Implement an alerting system that notifies analysts when data validation fails or when collection errors occur.
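
One lightweight way to approach this is a custom logging handler that forwards ERROR-level records to a notification hook; the send_alert function below is a placeholder for whatever channel the team actually uses (email, chat webhook, etc.).

import logging


def send_alert(message: str) -> None:
    """Placeholder notification hook (e.g., email or chat webhook)."""
    print(f"ALERT: {message}")


class AlertHandler(logging.Handler):
    """Forward ERROR and above to the notification hook."""

    def emit(self, record: logging.LogRecord) -> None:
        if record.levelno >= logging.ERROR:
            send_alert(self.format(record))


# Attaching to the root logger means existing logger.error(...) calls in the
# pipeline propagate here and trigger alerts without further changes.
logging.getLogger().addHandler(AlertHandler())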


Deliverables

By completing this case study, you should produce:

  1. Pipeline Code: A functional Python module implementing the data collection pipeline
  2. Sample Dataset: Exported Parquet files for the test player
  3. Validation Report: Documentation of data consistency checks
  4. Design Document: Brief explanation of architectural decisions
  5. Extension Proposal: Plan for one of the extension challenges

Assessment Criteria

Criterion       | Weight | Description
Code Quality    | 25%    | Clean, well-documented, follows Python best practices
Functionality   | 25%    | Pipeline correctly collects and processes data
Error Handling  | 20%    | Graceful handling of API failures and edge cases
Data Validation | 15%    | Robust consistency checks between sources
Documentation   | 15%    | Clear explanations and comments

Key Takeaways

  1. Multi-source data collection requires careful coordination of different APIs and scrapers
  2. Caching and rate limiting are essential for responsible and efficient data collection
  3. Data validation between sources helps identify inconsistencies before they affect analysis
  4. Modular design enables easy extension and maintenance of collection pipelines
  5. Logging and metadata provide audit trails for debugging and reproducibility

This case study demonstrates the practical application of Chapter 2 concepts in a real-world analytics scenario. The techniques learned here form the foundation for building production-grade data infrastructure for professional basketball analysis.