Case Study: Analyzing Play-by-Play Data

"In God we trust. All others must bring data." — W. Edwards Deming

Executive Summary

Play-by-play data is the foundation of modern football analytics. This case study takes you through a complete workflow of loading, cleaning, transforming, and analyzing play-level data using pandas and NumPy. You'll build functions for calculating success rates, identifying explosive plays, and comparing offensive efficiency.

Skills Applied:

  - Large dataset handling
  - Complex filtering and aggregation
  - Vectorized calculations
  - Performance optimization


Background

The Challenge

You have play-by-play data from an entire season—over 100,000 plays. Your goal is to:

  1. Clean and validate the raw data
  2. Calculate play success for each play
  3. Compute team and game-level efficiency metrics
  4. Identify factors that correlate with offensive success

Play-by-Play Data Structure

Each row represents a single play with:

  - Game and timing information (quarter, clock)
  - Situation (down, distance, yard line)
  - Play type and result (yards gained)
  - Team information
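
As a rough illustration, a single play under this layout might look like the row below. The column names match the ones used in the simulated data later in this case study (which has no clock column); the values are made up, and the grouping dictionary is only a hypothetical helper for checking that every expected column is present.

```python
import pandas as pd

# One hypothetical play; values are illustrative only
example_play = pd.DataFrame([{
    "play_id": 1,
    "game_id": 1,
    "home_team": "Alabama",
    "away_team": "Georgia",
    "offense": "Alabama",
    "defense": "Georgia",
    "quarter": 1,
    "down": 2,
    "distance": 7,
    "yard_line": 35,      # yards from the offense's own goal line
    "play_type": "Pass",
    "yards_gained": 12,
}])

# Columns grouped the way the description above groups them
column_groups = {
    "game_and_timing": ["play_id", "game_id", "quarter"],
    "situation": ["down", "distance", "yard_line"],
    "play_result": ["play_type", "yards_gained"],
    "teams": ["home_team", "away_team", "offense", "defense"],
}

# Report any expected column that the frame does not contain
missing = [
    col
    for cols in column_groups.values()
    for col in cols
    if col not in example_play.columns
]
print("Missing columns:", missing)
print(example_play.T)
```

A check like this is cheap to run right after loading a real file, before any cleaning step assumes a column exists.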


Phase 1: Data Loading and Initial Exploration

Creating Realistic Play Data

import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
import time

np.random.seed(42)

def create_play_by_play_data(n_games: int = 100) -> pd.DataFrame:
    """
    Create realistic play-by-play data for analysis.

    Parameters
    ----------
    n_games : int
        Number of games to simulate

    Returns
    -------
    pd.DataFrame
        Play-by-play DataFrame
    """
    teams = [
        "Alabama", "Georgia", "Ohio State", "Michigan", "Texas",
        "Oklahoma", "LSU", "Florida", "Penn State", "Oregon",
        "Clemson", "Tennessee", "USC", "Notre Dame", "Auburn"
    ]

    all_plays = []
    play_id = 1

    for game_id in range(1, n_games + 1):
        # Select teams for this game
        home_team = np.random.choice(teams)
        away_team = np.random.choice([t for t in teams if t != home_team])

        # Generate plays for this game (typically 120-180 plays per game)
        n_plays = np.random.randint(120, 181)  # upper bound is exclusive

        for play_num in range(n_plays):
            # Determine possession
            offense = home_team if np.random.random() > 0.5 else away_team
            defense = away_team if offense == home_team else home_team

            # Play context
            quarter = min(4, (play_num // 40) + 1)
            down = np.random.choice([1, 2, 3, 4], p=[0.35, 0.30, 0.25, 0.10])

            if down == 1:
                distance = 10
            else:
                distance = np.random.randint(1, 15)

            yard_line = np.random.randint(1, 99)  # Yards from own goal

            # Play type distribution
            play_type_probs = {
                "Pass": 0.52,
                "Rush": 0.38,
                "Punt": 0.04,
                "Field Goal": 0.02,
                "Kickoff": 0.02,
                "Penalty": 0.02
            }

            # Adjust for down and distance
            if down == 4:
                if distance > 3:
                    play_type_probs = {"Punt": 0.60, "Field Goal": 0.25,
                                      "Pass": 0.10, "Rush": 0.05}
                else:
                    play_type_probs = {"Rush": 0.40, "Pass": 0.35,
                                      "Field Goal": 0.15, "Punt": 0.10}
            elif down == 3 and distance > 5:
                play_type_probs = {"Pass": 0.75, "Rush": 0.20, "Punt": 0.03,
                                  "Penalty": 0.02}

            play_types = list(play_type_probs.keys())
            probs = list(play_type_probs.values())
            probs = [p / sum(probs) for p in probs]  # Normalize
            play_type = np.random.choice(play_types, p=probs)

            # Yards gained based on play type
            if play_type == "Pass":
                # Completion rate ~65%
                if np.random.random() < 0.65:
                    yards = int(np.random.normal(8, 10))
                else:
                    yards = 0  # Incomplete
                    # Small chance of interception
                    if np.random.random() < 0.03:
                        yards = None  # Will mark as turnover
            elif play_type == "Rush":
                yards = int(np.random.normal(4.2, 4))
                # Chance of fumble
                if np.random.random() < 0.01:
                    yards = None
            elif play_type == "Punt":
                yards = 0  # Punt doesn't gain yards for offense
            elif play_type == "Field Goal":
                # Success rate varies by distance
                fg_distance = 100 - yard_line + 17  # Approximate
                success_prob = max(0.3, min(0.95, 1.2 - fg_distance * 0.015))
                if np.random.random() < success_prob:
                    yards = None  # Scoring play
                else:
                    yards = 0
            elif play_type == "Kickoff":
                yards = 0
            else:  # Penalty
                yards = np.random.choice([-5, -10, -15, 5, 10, 15])

            # Handle edge cases
            if yards is not None:
                yards = max(-15, min(99 - yard_line, yards))

            all_plays.append({
                "play_id": play_id,
                "game_id": game_id,
                "home_team": home_team,
                "away_team": away_team,
                "offense": offense,
                "defense": defense,
                "quarter": quarter,
                "down": down if play_type in ["Pass", "Rush"] else None,
                "distance": distance if play_type in ["Pass", "Rush"] else None,
                "yard_line": yard_line,
                "play_type": play_type,
                "yards_gained": yards
            })
            play_id += 1

    return pd.DataFrame(all_plays)


# Create the dataset (100 games is roughly 15,000 plays; raise n_games
# to approximate a full season)
print("Creating play-by-play data...")
start = time.time()
plays_df = create_play_by_play_data(100)
print(f"Created {len(plays_df)} plays in {time.time() - start:.2f}s")
print(f"\nDataFrame shape: {plays_df.shape}")
print(f"Memory usage: {plays_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

Initial Exploration

# Basic info
print("\nColumn types:")
print(plays_df.dtypes)

print("\nMissing values:")
print(plays_df.isnull().sum())

print("\nPlay type distribution:")
print(plays_df["play_type"].value_counts(normalize=True).round(3))

print("\nYards gained statistics:")
print(plays_df["yards_gained"].describe())

Phase 2: Data Cleaning and Validation

Cleaning Pipeline

def clean_play_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict]:
    """
    Clean and validate play-by-play data.

    Parameters
    ----------
    df : pd.DataFrame
        Raw play data

    Returns
    -------
    Tuple[pd.DataFrame, Dict]
        Cleaned data and cleaning report
    """
    report = {
        "original_rows": len(df),
        "actions": []
    }

    clean_df = df.copy()

    # 1. Flag scrimmage plays with missing yards as turnovers (rows are kept)
    scrimmage_plays = clean_df["play_type"].isin(["Pass", "Rush"])
    missing_yards = clean_df["yards_gained"].isna()
    turnovers = scrimmage_plays & missing_yards

    # Mark turnovers rather than remove
    clean_df["is_turnover"] = turnovers
    clean_df.loc[turnovers, "yards_gained"] = 0  # Assign 0 yards
    report["actions"].append(f"Marked {turnovers.sum()} turnovers")

    # 2. Validate down values
    invalid_down = clean_df["down"].notna() & ~clean_df["down"].isin([1, 2, 3, 4])
    if invalid_down.any():
        clean_df.loc[invalid_down, "down"] = None
        report["actions"].append(f"Fixed {invalid_down.sum()} invalid down values")

    # 3. Validate distance
    invalid_distance = clean_df["distance"].notna() & (
        (clean_df["distance"] < 1) | (clean_df["distance"] > 50)
    )
    if invalid_distance.any():
        clean_df.loc[invalid_distance, "distance"] = np.clip(
            clean_df.loc[invalid_distance, "distance"], 1, 50
        )
        report["actions"].append(f"Clipped {invalid_distance.sum()} invalid distances")

    # 4. Validate yard line
    invalid_yard_line = (clean_df["yard_line"] < 1) | (clean_df["yard_line"] > 99)
    if invalid_yard_line.any():
        clean_df.loc[invalid_yard_line, "yard_line"] = np.clip(
            clean_df.loc[invalid_yard_line, "yard_line"], 1, 99
        )
        report["actions"].append(f"Fixed {invalid_yard_line.sum()} invalid yard lines")

    # 5. Create standard columns
    clean_df["is_pass"] = clean_df["play_type"] == "Pass"
    clean_df["is_rush"] = clean_df["play_type"] == "Rush"
    clean_df["is_scrimmage"] = clean_df["play_type"].isin(["Pass", "Rush"])

    report["final_rows"] = len(clean_df)
    report["rows_removed"] = report["original_rows"] - report["final_rows"]

    return clean_df, report


# Clean the data
plays_clean, cleaning_report = clean_play_data(plays_df)

print("\nCLEANING REPORT")
print("-" * 40)
print(f"Original rows: {cleaning_report['original_rows']}")
print(f"Final rows: {cleaning_report['final_rows']}")
print("\nActions taken:")
for action in cleaning_report["actions"]:
    print(f"  - {action}")
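
To see the turnover rule in isolation, the sketch below applies the same masking logic to a four-row toy frame. The data is made up. Note that the made field goal (a non-scrimmage play with missing yards) is left untouched, which matches how the cleaning function above only rewrites Pass and Rush rows.

```python
import numpy as np
import pandas as pd

toy = pd.DataFrame({
    "play_type": ["Pass", "Rush", "Field Goal", "Rush"],
    "yards_gained": [12.0, np.nan, np.nan, 3.0],
})

# A scrimmage play with no recorded yardage is treated as a turnover
is_scrimmage = toy["play_type"].isin(["Pass", "Rush"])
turnovers = is_scrimmage & toy["yards_gained"].isna()

# Flag the row and assign 0 yards instead of dropping it
toy["is_turnover"] = turnovers
toy.loc[turnovers, "yards_gained"] = 0

print(toy)
```

Keeping the row and flagging it preserves the play count per team, which matters later when rates are computed per play.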

Phase 3: Play Success Calculation

Defining Success

In modern football analytics, play success is typically defined by down. This case study uses the following thresholds, though exact cutoffs vary by source:

  - 1st down: Gain at least 40% of the distance to go
  - 2nd down: Gain at least 50% of the distance to go
  - 3rd down: Gain at least 100% of the distance to go (first down)
  - 4th down: Gain at least 100% of the distance to go (first down)
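
A few worked cases may help make the thresholds concrete. The scalar helper below is a hypothetical illustration (the name `is_successful` is not part of the pipeline), using the same percentages as the definition above.

```python
# Share of the distance to go that must be gained, by down
SUCCESS_THRESHOLDS = {1: 0.40, 2: 0.50, 3: 1.0, 4: 1.0}


def is_successful(down: int, distance: int, yards_gained: int) -> bool:
    """Return True if the play meets the success threshold for its down."""
    return yards_gained >= distance * SUCCESS_THRESHOLDS[down]


examples = [
    (1, 10, 4),  # 1st-and-10, gain of 4: needs 4.0 yards
    (2, 7, 3),   # 2nd-and-7, gain of 3: needs 3.5 yards
    (3, 3, 3),   # 3rd-and-3, gain of 3: needs 3.0 yards
    (4, 1, 0),   # 4th-and-1, no gain: needs 1.0 yard
]

for down, distance, yards in examples:
    outcome = is_successful(down, distance, yards)
    print(f"down={down} distance={distance} yards={yards} success={outcome}")
```

The second case shows why the threshold is a comparison against a fraction rather than a rounded yardage: a 3-yard gain on 2nd-and-7 falls just short of the 3.5 yards required.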

def calculate_play_success(
    df: pd.DataFrame,
    thresholds: Optional[Dict[int, float]] = None
) -> pd.DataFrame:
    """
    Calculate play success for each play.

    Parameters
    ----------
    df : pd.DataFrame
        Play data with down, distance, yards_gained
    thresholds : Dict[int, float]
        Success thresholds by down (default: standard)

    Returns
    -------
    pd.DataFrame
        DataFrame with success column added
    """
    if thresholds is None:
        thresholds = {1: 0.40, 2: 0.50, 3: 1.0, 4: 1.0}

    result = df.copy()

    # Vectorized success calculation
    conditions = [
        (result["down"] == 1) & (result["yards_gained"] >= result["distance"] * thresholds[1]),
        (result["down"] == 2) & (result["yards_gained"] >= result["distance"] * thresholds[2]),
        (result["down"] == 3) & (result["yards_gained"] >= result["distance"] * thresholds[3]),
        (result["down"] == 4) & (result["yards_gained"] >= result["distance"] * thresholds[4]),
    ]
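    choices = [True, True, True, True]

    # Store the flag with the nullable "boolean" dtype so that plays where
    # success does not apply can hold <NA> without turning the column
    # into object dtype
    result["is_success"] = pd.array(
        np.select(conditions, choices, default=False), dtype="boolean"
    )

    # Only applies to scrimmage plays with valid down
    not_applicable = ~result["is_scrimmage"] | result["down"].isna()
    result.loc[not_applicable, "is_success"] = pd.NA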

    return result


# Calculate success
plays_with_success = calculate_play_success(plays_clean)

# Verify
print("\nSUCCESS RATE SUMMARY")
print("-" * 40)
scrimmage = plays_with_success[plays_with_success["is_scrimmage"]]
print(f"Overall success rate: {scrimmage['is_success'].mean():.1%}")

print("\nBy down:")
by_down = scrimmage.groupby("down")["is_success"].agg(["mean", "count"])
print(by_down)

print("\nBy play type:")
by_type = scrimmage.groupby("play_type")["is_success"].agg(["mean", "count"])
print(by_type)

Identifying Explosive Plays

def identify_explosive_plays(
    df: pd.DataFrame,
    pass_threshold: int = 15,
    rush_threshold: int = 10
) -> pd.DataFrame:
    """
    Identify explosive plays (big gains).

    Parameters
    ----------
    df : pd.DataFrame
        Play data
    pass_threshold : int
        Minimum yards for explosive pass
    rush_threshold : int
        Minimum yards for explosive rush

    Returns
    -------
    pd.DataFrame
        DataFrame with explosive flag added
    """
    result = df.copy()

    result["is_explosive"] = (
        ((result["play_type"] == "Pass") & (result["yards_gained"] >= pass_threshold)) |
        ((result["play_type"] == "Rush") & (result["yards_gained"] >= rush_threshold))
    )

    return result


plays_analyzed = identify_explosive_plays(plays_with_success)

print("\nEXPLOSIVE PLAY SUMMARY")
print("-" * 40)
scrimmage = plays_analyzed[plays_analyzed["is_scrimmage"]]
print(f"Explosive play rate: {scrimmage['is_explosive'].mean():.1%}")

print("\nBy play type:")
print(scrimmage.groupby("play_type")["is_explosive"].mean())
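
If more play types ever need their own cutoff, one possible variation is to keep the thresholds in a dictionary and map them onto the play type. The sketch below uses a made-up toy frame and the same default cutoffs (15 yards for passes, 10 for rushes); it is an alternative formulation, not the function used above.

```python
import pandas as pd

toy = pd.DataFrame({
    "play_type": ["Pass", "Pass", "Rush", "Rush", "Punt"],
    "yards_gained": [15, 14, 10, 9, 0],
})

# Per-type cutoffs (same defaults as identify_explosive_plays above)
explosive_thresholds = {"Pass": 15, "Rush": 10}

# Play types without a cutoff map to NaN, and any comparison against
# NaN is False, so those plays are never flagged as explosive
required = toy["play_type"].map(explosive_thresholds)
toy["is_explosive"] = toy["yards_gained"] >= required

print(toy)
```

Both boundary rows (a 15-yard pass and a 10-yard rush) are flagged because the comparison is inclusive.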

Phase 4: Team Efficiency Analysis

Calculating Team Metrics

def calculate_team_efficiency(df: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate efficiency metrics for each team.

    Parameters
    ----------
    df : pd.DataFrame
        Play data with success and explosive flags

    Returns
    -------
    pd.DataFrame
        Team efficiency metrics
    """
    # Filter to scrimmage plays
    scrimmage = df[df["is_scrimmage"]].copy()

    # Group by offensive team
    team_stats = scrimmage.groupby("offense").agg(
        total_plays=("play_id", "count"),
        total_yards=("yards_gained", "sum"),
        pass_plays=("is_pass", "sum"),
        rush_plays=("is_rush", "sum"),
        successful_plays=("is_success", "sum"),
        explosive_plays=("is_explosive", "sum"),
        turnovers=("is_turnover", "sum"),

        # By play type
        pass_yards=("yards_gained", lambda x: x[scrimmage.loc[x.index, "is_pass"]].sum()),
        rush_yards=("yards_gained", lambda x: x[scrimmage.loc[x.index, "is_rush"]].sum()),
        pass_success=("is_success", lambda x: x[scrimmage.loc[x.index, "is_pass"]].sum()),
        rush_success=("is_success", lambda x: x[scrimmage.loc[x.index, "is_rush"]].sum())
    ).reset_index()

    # Calculate rates
    team_stats["yards_per_play"] = (team_stats["total_yards"] / team_stats["total_plays"]).round(2)
    team_stats["success_rate"] = (team_stats["successful_plays"] / team_stats["total_plays"]).round(3)
    team_stats["explosive_rate"] = (team_stats["explosive_plays"] / team_stats["total_plays"]).round(3)
    team_stats["turnover_rate"] = (team_stats["turnovers"] / team_stats["total_plays"]).round(4)

    team_stats["pass_ypp"] = (team_stats["pass_yards"] / team_stats["pass_plays"]).round(2)
    team_stats["rush_ypp"] = (team_stats["rush_yards"] / team_stats["rush_plays"]).round(2)
    team_stats["pass_success_rate"] = (team_stats["pass_success"] / team_stats["pass_plays"]).round(3)
    team_stats["rush_success_rate"] = (team_stats["rush_success"] / team_stats["rush_plays"]).round(3)

    team_stats["pass_rate"] = (team_stats["pass_plays"] / team_stats["total_plays"]).round(3)

    return team_stats.sort_values("yards_per_play", ascending=False)


team_efficiency = calculate_team_efficiency(plays_analyzed)

print("\nTEAM EFFICIENCY RANKINGS")
print("=" * 80)
print(team_efficiency[[
    "offense", "total_plays", "yards_per_play", "success_rate",
    "explosive_rate", "pass_rate"
]].to_string(index=False))
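
The per-type lambdas in the aggregation above run once per team in Python. A possible alternative, sketched here on a made-up toy frame, is to precompute masked yardage columns so that every aggregate is a built-in `sum`. The column names `pass_yards` and `rush_yards` mirror the output names above; the approach itself is an assumption about what would scale better, not a measured result.

```python
import pandas as pd

toy = pd.DataFrame({
    "offense": ["Alabama", "Alabama", "Alabama", "Georgia", "Georgia"],
    "play_type": ["Pass", "Rush", "Pass", "Pass", "Rush"],
    "yards_gained": [12, 3, 0, 7, 5],
})
toy["is_pass"] = toy["play_type"] == "Pass"
toy["is_rush"] = toy["play_type"] == "Rush"

# Keep yards only on plays of the matching type; everything else becomes 0
toy["pass_yards"] = toy["yards_gained"].where(toy["is_pass"], 0)
toy["rush_yards"] = toy["yards_gained"].where(toy["is_rush"], 0)

# Every aggregate is now a built-in reduction, with no Python lambdas
by_team = toy.groupby("offense").agg(
    pass_plays=("is_pass", "sum"),
    rush_plays=("is_rush", "sum"),
    pass_yards=("pass_yards", "sum"),
    rush_yards=("rush_yards", "sum"),
)
by_team["pass_ypp"] = by_team["pass_yards"] / by_team["pass_plays"]
by_team["rush_ypp"] = by_team["rush_yards"] / by_team["rush_plays"]

print(by_team)
```

The same trick works for success counts: multiply or mask the success flag by the play-type flag before grouping.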

Situational Analysis

def analyze_situational_efficiency(
    df: pd.DataFrame,
    team: Optional[str] = None
) -> Dict[str, pd.DataFrame]:
    """
    Analyze efficiency by game situation.

    Parameters
    ----------
    df : pd.DataFrame
        Play data
    team : str, optional
        Specific team to analyze (all if None)

    Returns
    -------
    Dict[str, pd.DataFrame]
        Efficiency by various situations
    """
    plays = df[df["is_scrimmage"]].copy()

    if team:
        plays = plays[plays["offense"] == team]

    results = {}

    # By down
    by_down = plays.groupby("down").agg(
        plays=("play_id", "count"),
        avg_yards=("yards_gained", "mean"),
        success_rate=("is_success", "mean"),
        explosive_rate=("is_explosive", "mean"),
        pass_rate=("is_pass", "mean")
    ).round(3)
    results["by_down"] = by_down

    # By distance bucket
    plays["distance_bucket"] = pd.cut(
        plays["distance"],
        bins=[0, 3, 6, 10, 50],
        labels=["Short (1-3)", "Medium (4-6)", "Long (7-10)", "Very Long (11+)"]
    )

    by_distance = plays.groupby("distance_bucket", observed=False).agg(
        plays=("play_id", "count"),
        avg_yards=("yards_gained", "mean"),
        success_rate=("is_success", "mean"),
        pass_rate=("is_pass", "mean")
    ).round(3)
    results["by_distance"] = by_distance

    # By field position
    plays["field_zone"] = pd.cut(
        plays["yard_line"],
        bins=[0, 20, 40, 60, 80, 100],
        labels=["Own 0-20", "Own 21-40", "Midfield", "Opp 40-21", "Red Zone"]
    )

    by_field_position = plays.groupby("field_zone", observed=False).agg(
        plays=("play_id", "count"),
        avg_yards=("yards_gained", "mean"),
        success_rate=("is_success", "mean"),
        pass_rate=("is_pass", "mean")
    ).round(3)
    results["by_field_position"] = by_field_position

    return results


# Analyze overall
situations = analyze_situational_efficiency(plays_analyzed)

print("\nSITUATIONAL ANALYSIS")
print("=" * 60)

print("\nBy Down:")
print(situations["by_down"])

print("\nBy Distance:")
print(situations["by_distance"])

print("\nBy Field Position:")
print(situations["by_field_position"])
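
The bucket boundaries depend on how `pd.cut` treats bin edges. By default each interval is closed on the right, so with bins `[0, 3, 6, 10, 50]` a distance of 3 lands in the first bucket and a distance of 4 in the second. The short check below uses the same bins and labels on a handful of made-up distances.

```python
import pandas as pd

distances = pd.Series([1, 3, 4, 6, 7, 10, 11, 25])

# Same bins and labels as analyze_situational_efficiency above.
# Intervals are right-closed by default: (0, 3], (3, 6], (6, 10], (10, 50]
buckets = pd.cut(
    distances,
    bins=[0, 3, 6, 10, 50],
    labels=["Short (1-3)", "Medium (4-6)", "Long (7-10)", "Very Long (11+)"],
)
print(pd.DataFrame({"distance": distances, "bucket": buckets}))

# Values outside the outer edges fall in no interval and become NaN
out_of_range = pd.cut(pd.Series([0, 51]), bins=[0, 3, 6, 10, 50])
print(out_of_range.isna().tolist())
```

Rows that end up as NaN are silently left out of the grouped results, so it is worth confirming that the outer edges cover the cleaned data.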

Phase 5: Performance Optimization

Comparing Approaches

def benchmark_approaches(df: pd.DataFrame) -> Dict[str, float]:
    """
    Benchmark different calculation approaches.

    Parameters
    ----------
    df : pd.DataFrame
        Play data

    Returns
    -------
    Dict[str, float]
        Timing results
    """
    results = {}

    # Approach 1: Loop-based (slow)
    start = time.time()
    success_loop = []
    for _, row in df.iterrows():
        if row["is_scrimmage"] and pd.notna(row["down"]):
            threshold = {1: 0.4, 2: 0.5, 3: 1.0, 4: 1.0}.get(row["down"], 1.0)
            success = row["yards_gained"] >= row["distance"] * threshold
            success_loop.append(success)
        else:
            success_loop.append(None)
    results["loop"] = time.time() - start

    # Approach 2: Apply (medium)
    start = time.time()
    def calc_success(row):
        if row["is_scrimmage"] and pd.notna(row["down"]):
            threshold = {1: 0.4, 2: 0.5, 3: 1.0, 4: 1.0}.get(row["down"], 1.0)
            return row["yards_gained"] >= row["distance"] * threshold
        return None
    success_apply = df.apply(calc_success, axis=1)
    results["apply"] = time.time() - start

    # Approach 3: Vectorized (fast)
    start = time.time()
    thresholds = {1: 0.4, 2: 0.5, 3: 1.0, 4: 1.0}
    conditions = [
        (df["down"] == d) & (df["yards_gained"] >= df["distance"] * t)
        for d, t in thresholds.items()
    ]
    success_vector = np.select(conditions, [True]*4, default=False)
    success_vector = np.where(~df["is_scrimmage"], None, success_vector)
    results["vectorized"] = time.time() - start

    return results


# Run benchmark on subset (full data takes too long for loop)
print("\nPERFORMANCE BENCHMARK")
print("-" * 40)
print("Using 5,000 play subset:")

subset = plays_analyzed.head(5000).copy()
timings = benchmark_approaches(subset)

for method, seconds in timings.items():
    print(f"  {method:12}: {seconds:.4f}s")

# Calculate speedups
loop_time = timings["loop"]
for method, seconds in timings.items():
    speedup = loop_time / seconds
    print(f"  {method} speedup: {speedup:.1f}x")
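
Single-run timings can be noisy, and a speed comparison only means something if the approaches return the same answer. The sketch below is one way to address both points on a small random toy frame: it confirms that a row-wise `apply` and a vectorized expression agree, then reports the best of several runs using `time.perf_counter`. The helper name `best_of` and the toy data are assumptions for illustration.

```python
import time
from typing import Callable

import numpy as np
import pandas as pd


def best_of(func: Callable[[], object], repeats: int = 5) -> float:
    """Return the fastest wall-clock time in seconds over several runs."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        func()
        timings.append(time.perf_counter() - start)
    return min(timings)


rng = np.random.default_rng(0)
toy = pd.DataFrame({
    "down": rng.integers(1, 5, size=2_000),          # downs 1-4
    "distance": rng.integers(1, 15, size=2_000),     # 1-14 yards to go
    "yards_gained": rng.integers(-5, 20, size=2_000),
})
thresholds = {1: 0.4, 2: 0.5, 3: 1.0, 4: 1.0}


def with_apply() -> pd.Series:
    # Row-by-row: one Python function call per play
    return toy.apply(
        lambda r: r["yards_gained"] >= r["distance"] * thresholds[r["down"]],
        axis=1,
    )


def vectorized() -> pd.Series:
    # Whole-column arithmetic: look up each play's threshold, then compare
    return toy["yards_gained"] >= toy["distance"] * toy["down"].map(thresholds)


# Check that both approaches agree before comparing their speed
results_match = bool((with_apply() == vectorized()).all())
print("Results match:", results_match)
print(f"apply:      {best_of(with_apply):.4f}s")
print(f"vectorized: {best_of(vectorized):.4f}s")
```

Taking the minimum over repeats reduces the influence of one-off slowdowns such as garbage collection or other processes on the machine.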

Memory Optimization

def optimize_play_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict]:
    """
    Optimize play data memory usage.

    Parameters
    ----------
    df : pd.DataFrame
        Play data

    Returns
    -------
    Tuple[pd.DataFrame, Dict]
        Optimized DataFrame and memory report
    """
    report = {"before": df.memory_usage(deep=True).sum() / 1024**2}

    optimized = df.copy()

    # Downcast integers
    int_cols = optimized.select_dtypes(include=["int64"]).columns
    for col in int_cols:
        optimized[col] = pd.to_numeric(optimized[col], downcast="integer")

    # Downcast floats
    float_cols = optimized.select_dtypes(include=["float64"]).columns
    for col in float_cols:
        optimized[col] = pd.to_numeric(optimized[col], downcast="float")

    # Convert strings to categorical
    cat_cols = ["play_type", "home_team", "away_team", "offense", "defense"]
    for col in cat_cols:
        if col in optimized.columns:
            optimized[col] = optimized[col].astype("category")

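    # Convert booleans to the nullable dtype. This is for consistent
    # missing-value handling, not memory: nullable booleans store a mask
    # alongside the values, so they take more space than NumPy bools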
    bool_cols = optimized.select_dtypes(include=["bool"]).columns
    for col in bool_cols:
        optimized[col] = optimized[col].astype("boolean")

    report["after"] = optimized.memory_usage(deep=True).sum() / 1024**2
    report["reduction"] = (1 - report["after"] / report["before"]) * 100

    return optimized, report


optimized_plays, mem_report = optimize_play_data(plays_analyzed)

print("\nMEMORY OPTIMIZATION")
print("-" * 40)
print(f"Before: {mem_report['before']:.2f} MB")
print(f"After:  {mem_report['after']:.2f} MB")
print(f"Reduction: {mem_report['reduction']:.1f}%")
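
Most of the saving in a frame like this usually comes from the repeated team and play-type strings. As a minimal sketch, the comparison below stores the same 50,000 made-up team names first as Python strings and then as a categorical, and measures both with `memory_usage(deep=True)`. Exact byte counts depend on the pandas and Python versions, so none are quoted here.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
teams = ["Alabama", "Georgia", "Ohio State", "Michigan", "Texas"]

# 50,000 repeated team names, stored first as plain Python strings
as_object = pd.Series(rng.choice(teams, size=50_000), dtype=object)

# A categorical stores each distinct name once plus a small integer code per row
as_category = as_object.astype("category")

obj_bytes = as_object.memory_usage(deep=True)
cat_bytes = as_category.memory_usage(deep=True)

print(f"object:   {obj_bytes / 1024:.0f} KiB")
print(f"category: {cat_bytes / 1024:.0f} KiB")
print(f"codes dtype: {as_category.cat.codes.dtype}")
```

The benefit shrinks as the number of distinct values approaches the number of rows, so columns such as `play_id` are poor candidates for this conversion.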

Complete Analysis Function

def analyze_season_plays(plays_df: pd.DataFrame) -> Dict:
    """
    Complete play-by-play season analysis.

    Parameters
    ----------
    plays_df : pd.DataFrame
        Raw play-by-play data

    Returns
    -------
    Dict
        Complete analysis results
    """
    # Step 1: Clean
    plays_clean, clean_report = clean_play_data(plays_df)

    # Step 2: Add success
    plays_success = calculate_play_success(plays_clean)

    # Step 3: Add explosive
    plays_full = identify_explosive_plays(plays_success)

    # Step 4: Calculate metrics
    team_efficiency = calculate_team_efficiency(plays_full)
    situations = analyze_situational_efficiency(plays_full)

    # Step 5: Optimize
    plays_opt, mem_report = optimize_play_data(plays_full)

    return {
        "plays": plays_opt,
        "team_efficiency": team_efficiency,
        "situational": situations,
        "cleaning_report": clean_report,
        "memory_report": mem_report
    }


# Run complete analysis
print("\n" + "=" * 60)
print("COMPLETE SEASON ANALYSIS")
print("=" * 60)

results = analyze_season_plays(plays_df)

print(f"\nProcessed {len(results['plays'])} plays")
print(f"Memory usage: {results['memory_report']['after']:.2f} MB")

print("\nTop 5 Teams by Yards Per Play:")
print(results["team_efficiency"][["offense", "yards_per_play", "success_rate"]].head())

Discussion Questions

  1. How would you incorporate Expected Points Added (EPA) into this analysis?

  2. What additional situational factors would improve the success rate analysis?

  3. How would you handle garbage time plays differently?

  4. What validation would you add to ensure accuracy?

  5. How would this analysis change for postseason vs. regular season?


Key Takeaways

  1. Vectorization is essential: Loop-based approaches don't scale for play-by-play data
  2. Data cleaning first: Invalid values propagate through all calculations
  3. Memory matters: Optimizing data types enables analysis of full seasons
  4. Situational context: Down, distance, and field position fundamentally affect expectations
  5. Modular design: Breaking analysis into functions enables reuse and testing