Case Study: Analyzing Play-by-Play Data
"In God we trust. All others must bring data." — W. Edwards Deming
Executive Summary
Play-by-play data is the foundation of modern football analytics. This case study takes you through a complete workflow of loading, cleaning, transforming, and analyzing play-level data using pandas and NumPy. You'll build functions for calculating success rates, identifying explosive plays, and comparing offensive efficiency.
Skills Applied:
- Large dataset handling
- Complex filtering and aggregation
- Vectorized calculations
- Performance optimization
Background
The Challenge
You have play-by-play data from an entire season. A full FBS season runs to over 100,000 plays; to keep the examples fast, this case study simulates 100 games (roughly 15,000 plays), but the workflow is the same at full scale. Your goal is to:
- Clean and validate the raw data
- Calculate play success for each play
- Compute team and game-level efficiency metrics
- Identify factors that correlate with offensive success
Play-by-Play Data Structure
Each row represents a single play with:
- Game and timing information (quarter, clock)
- Situation (down, distance, yard line)
- Play type and result (yards gained)
- Team information
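To make the schema concrete, here is a hypothetical sample row using the column names constructed in Phase 1 (values are illustrative only; the clock field is omitted from the simulation):
sample_play = {
    "play_id": 1,
    "game_id": 1,
    "home_team": "Georgia",       # illustrative team names
    "away_team": "Alabama",
    "offense": "Georgia",
    "defense": "Alabama",
    "quarter": 2,
    "down": 3,                    # 3rd down
    "distance": 7,                # 7 yards to go
    "yard_line": 45,              # yards from the offense's own goal line
    "play_type": "Pass",
    "yards_gained": 12,           # a 12-yard gain converts the first down
}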
Phase 1: Data Loading and Initial Exploration
Creating Realistic Play Data
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
import time
np.random.seed(42)
def create_play_by_play_data(n_games: int = 100) -> pd.DataFrame:
"""
Create realistic play-by-play data for analysis.
Parameters
----------
n_games : int
Number of games to simulate
Returns
-------
pd.DataFrame
Play-by-play DataFrame
"""
teams = [
"Alabama", "Georgia", "Ohio State", "Michigan", "Texas",
"Oklahoma", "LSU", "Florida", "Penn State", "Oregon",
"Clemson", "Tennessee", "USC", "Notre Dame", "Auburn"
]
all_plays = []
play_id = 1
for game_id in range(1, n_games + 1):
# Select teams for this game
home_team = np.random.choice(teams)
away_team = np.random.choice([t for t in teams if t != home_team])
# Generate plays for this game (typically 120-180 plays per game)
n_plays = np.random.randint(120, 180)
for play_num in range(n_plays):
# Determine possession
offense = home_team if np.random.random() > 0.5 else away_team
defense = away_team if offense == home_team else home_team
# Play context
quarter = min(4, (play_num // 40) + 1)
down = np.random.choice([1, 2, 3, 4], p=[0.35, 0.30, 0.25, 0.10])
if down == 1:
distance = 10
else:
distance = np.random.randint(1, 15)
yard_line = np.random.randint(1, 99) # Yards from own goal
# Play type distribution
play_type_probs = {
"Pass": 0.52,
"Rush": 0.38,
"Punt": 0.04,
"Field Goal": 0.02,
"Kickoff": 0.02,
"Penalty": 0.02
}
# Adjust for down and distance
if down == 4:
if distance > 3:
play_type_probs = {"Punt": 0.60, "Field Goal": 0.25,
"Pass": 0.10, "Rush": 0.05}
else:
play_type_probs = {"Rush": 0.40, "Pass": 0.35,
"Field Goal": 0.15, "Punt": 0.10}
elif down == 3 and distance > 5:
play_type_probs = {"Pass": 0.75, "Rush": 0.20, "Punt": 0.03,
"Penalty": 0.02}
play_types = list(play_type_probs.keys())
probs = list(play_type_probs.values())
probs = [p / sum(probs) for p in probs] # Normalize
play_type = np.random.choice(play_types, p=probs)
# Yards gained based on play type
if play_type == "Pass":
# Completion rate ~65%
if np.random.random() < 0.65:
yards = int(np.random.normal(8, 10))
else:
yards = 0 # Incomplete
# Small chance of interception
if np.random.random() < 0.03:
yards = None # Will mark as turnover
elif play_type == "Rush":
yards = int(np.random.normal(4.2, 4))
# Chance of fumble
if np.random.random() < 0.01:
yards = None
elif play_type == "Punt":
yards = 0 # Punt doesn't gain yards for offense
elif play_type == "Field Goal":
# Success rate varies by distance
fg_distance = 100 - yard_line + 17 # Approximate
success_prob = max(0.3, min(0.95, 1.2 - fg_distance * 0.015))
if np.random.random() < success_prob:
yards = None # Scoring play
else:
yards = 0
elif play_type == "Kickoff":
yards = 0
else: # Penalty
yards = np.random.choice([-5, -10, -15, 5, 10, 15])
# Handle edge cases
if yards is not None:
                yards = max(-15, min(100 - yard_line, yards))  # cap the gain at the goal line
all_plays.append({
"play_id": play_id,
"game_id": game_id,
"home_team": home_team,
"away_team": away_team,
"offense": offense,
"defense": defense,
"quarter": quarter,
"down": down if play_type in ["Pass", "Rush"] else None,
"distance": distance if play_type in ["Pass", "Rush"] else None,
"yard_line": yard_line,
"play_type": play_type,
"yards_gained": yards
})
play_id += 1
return pd.DataFrame(all_plays)
# Create the dataset
print("Creating play-by-play data...")
start = time.time()
plays_df = create_play_by_play_data(100)
print(f"Created {len(plays_df)} plays in {time.time() - start:.2f}s")
print(f"\nDataFrame shape: {plays_df.shape}")
print(f"Memory usage: {plays_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
Initial Exploration
# Basic info
print("\nColumn types:")
print(plays_df.dtypes)
print("\nMissing values:")
print(plays_df.isnull().sum())
print("\nPlay type distribution:")
print(plays_df["play_type"].value_counts(normalize=True).round(3))
print("\nYards gained statistics:")
print(plays_df["yards_gained"].describe())
Phase 2: Data Cleaning and Validation
Cleaning Pipeline
def clean_play_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict]:
"""
Clean and validate play-by-play data.
Parameters
----------
df : pd.DataFrame
Raw play data
Returns
-------
Tuple[pd.DataFrame, Dict]
Cleaned data and cleaning report
"""
report = {
"original_rows": len(df),
"actions": []
}
clean_df = df.copy()
# 1. Remove rows with missing yards for scrimmage plays
scrimmage_plays = clean_df["play_type"].isin(["Pass", "Rush"])
missing_yards = clean_df["yards_gained"].isna()
turnovers = scrimmage_plays & missing_yards
# Mark turnovers rather than remove
clean_df["is_turnover"] = turnovers
clean_df.loc[turnovers, "yards_gained"] = 0 # Assign 0 yards
report["actions"].append(f"Marked {turnovers.sum()} turnovers")
# 2. Validate down values
invalid_down = clean_df["down"].notna() & ~clean_df["down"].isin([1, 2, 3, 4])
if invalid_down.any():
clean_df.loc[invalid_down, "down"] = None
report["actions"].append(f"Fixed {invalid_down.sum()} invalid down values")
# 3. Validate distance
invalid_distance = clean_df["distance"].notna() & (
(clean_df["distance"] < 1) | (clean_df["distance"] > 50)
)
if invalid_distance.any():
clean_df.loc[invalid_distance, "distance"] = np.clip(
clean_df.loc[invalid_distance, "distance"], 1, 50
)
report["actions"].append(f"Clipped {invalid_distance.sum()} invalid distances")
# 4. Validate yard line
invalid_yard_line = (clean_df["yard_line"] < 1) | (clean_df["yard_line"] > 99)
if invalid_yard_line.any():
clean_df.loc[invalid_yard_line, "yard_line"] = np.clip(
clean_df.loc[invalid_yard_line, "yard_line"], 1, 99
)
report["actions"].append(f"Fixed {invalid_yard_line.sum()} invalid yard lines")
# 5. Create standard columns
clean_df["is_pass"] = clean_df["play_type"] == "Pass"
clean_df["is_rush"] = clean_df["play_type"] == "Rush"
clean_df["is_scrimmage"] = clean_df["play_type"].isin(["Pass", "Rush"])
report["final_rows"] = len(clean_df)
report["rows_removed"] = report["original_rows"] - report["final_rows"]
return clean_df, report
# Clean the data
plays_clean, cleaning_report = clean_play_data(plays_df)
print("\nCLEANING REPORT")
print("-" * 40)
print(f"Original rows: {cleaning_report['original_rows']}")
print(f"Final rows: {cleaning_report['final_rows']}")
print("\nActions taken:")
for action in cleaning_report["actions"]:
print(f" - {action}")
Phase 3: Play Success Calculation
Defining Success
In modern football analytics, play success is typically defined by how much of the distance to go a play gains; exact thresholds vary by source, and this case study uses:
- 1st down: gain at least 40% of the distance to go
- 2nd down: gain at least 50% of the distance to go
- 3rd down: gain 100% of the distance to go (convert the first down)
- 4th down: gain 100% of the distance to go (convert the first down)
For example, a 4-yard gain on 2nd-and-8 clears the 50% threshold and counts as a success, while the same gain on 3rd-and-8 does not.
def calculate_play_success(
df: pd.DataFrame,
thresholds: Dict[int, float] = None
) -> pd.DataFrame:
"""
Calculate play success for each play.
Parameters
----------
df : pd.DataFrame
Play data with down, distance, yards_gained
thresholds : Dict[int, float]
Success thresholds by down (default: standard)
Returns
-------
pd.DataFrame
DataFrame with success column added
"""
if thresholds is None:
thresholds = {1: 0.40, 2: 0.50, 3: 1.0, 4: 1.0}
result = df.copy()
# Vectorized success calculation
conditions = [
(result["down"] == 1) & (result["yards_gained"] >= result["distance"] * thresholds[1]),
(result["down"] == 2) & (result["yards_gained"] >= result["distance"] * thresholds[2]),
(result["down"] == 3) & (result["yards_gained"] >= result["distance"] * thresholds[3]),
(result["down"] == 4) & (result["yards_gained"] >= result["distance"] * thresholds[4]),
]
    result["is_success"] = pd.Series(
        np.select(conditions, [True] * len(conditions), default=False),
        index=result.index,
        dtype="boolean",  # nullable boolean so non-scrimmage plays can hold pd.NA
    )
    # Success only applies to scrimmage plays with a valid down
    result.loc[~result["is_scrimmage"], "is_success"] = pd.NA
    result.loc[result["down"].isna(), "is_success"] = pd.NA
return result
# Calculate success
plays_with_success = calculate_play_success(plays_clean)
# Verify
print("\nSUCCESS RATE SUMMARY")
print("-" * 40)
scrimmage = plays_with_success[plays_with_success["is_scrimmage"]]
print(f"Overall success rate: {scrimmage['is_success'].mean():.1%}")
print("\nBy down:")
by_down = scrimmage.groupby("down")["is_success"].agg(["mean", "count"])
print(by_down)
print("\nBy play type:")
by_type = scrimmage.groupby("play_type")["is_success"].agg(["mean", "count"])
print(by_type)
Identifying Explosive Plays
def identify_explosive_plays(
df: pd.DataFrame,
pass_threshold: int = 15,
rush_threshold: int = 10
) -> pd.DataFrame:
"""
Identify explosive plays (big gains).
Parameters
----------
df : pd.DataFrame
Play data
pass_threshold : int
Minimum yards for explosive pass
rush_threshold : int
Minimum yards for explosive rush
Returns
-------
pd.DataFrame
DataFrame with explosive flag added
"""
result = df.copy()
result["is_explosive"] = (
((result["play_type"] == "Pass") & (result["yards_gained"] >= pass_threshold)) |
((result["play_type"] == "Rush") & (result["yards_gained"] >= rush_threshold))
)
return result
plays_analyzed = identify_explosive_plays(plays_with_success)
print("\nEXPLOSIVE PLAY SUMMARY")
print("-" * 40)
scrimmage = plays_analyzed[plays_analyzed["is_scrimmage"]]
print(f"Explosive play rate: {scrimmage['is_explosive'].mean():.1%}")
print("\nBy play type:")
print(scrimmage.groupby("play_type")["is_explosive"].mean())
Phase 4: Team Efficiency Analysis
Calculating Team Metrics
def calculate_team_efficiency(df: pd.DataFrame) -> pd.DataFrame:
"""
Calculate efficiency metrics for each team.
Parameters
----------
df : pd.DataFrame
Play data with success and explosive flags
Returns
-------
pd.DataFrame
Team efficiency metrics
"""
# Filter to scrimmage plays
scrimmage = df[df["is_scrimmage"]].copy()
# Group by offensive team
team_stats = scrimmage.groupby("offense").agg(
total_plays=("play_id", "count"),
total_yards=("yards_gained", "sum"),
pass_plays=("is_pass", "sum"),
rush_plays=("is_rush", "sum"),
successful_plays=("is_success", "sum"),
explosive_plays=("is_explosive", "sum"),
turnovers=("is_turnover", "sum"),
# By play type
pass_yards=("yards_gained", lambda x: x[scrimmage.loc[x.index, "is_pass"]].sum()),
rush_yards=("yards_gained", lambda x: x[scrimmage.loc[x.index, "is_rush"]].sum()),
pass_success=("is_success", lambda x: x[scrimmage.loc[x.index, "is_pass"]].sum()),
rush_success=("is_success", lambda x: x[scrimmage.loc[x.index, "is_rush"]].sum())
).reset_index()
# Calculate rates
team_stats["yards_per_play"] = (team_stats["total_yards"] / team_stats["total_plays"]).round(2)
team_stats["success_rate"] = (team_stats["successful_plays"] / team_stats["total_plays"]).round(3)
team_stats["explosive_rate"] = (team_stats["explosive_plays"] / team_stats["total_plays"]).round(3)
team_stats["turnover_rate"] = (team_stats["turnovers"] / team_stats["total_plays"]).round(4)
team_stats["pass_ypp"] = (team_stats["pass_yards"] / team_stats["pass_plays"]).round(2)
team_stats["rush_ypp"] = (team_stats["rush_yards"] / team_stats["rush_plays"]).round(2)
team_stats["pass_success_rate"] = (team_stats["pass_success"] / team_stats["pass_plays"]).round(3)
team_stats["rush_success_rate"] = (team_stats["rush_success"] / team_stats["rush_plays"]).round(3)
team_stats["pass_rate"] = (team_stats["pass_plays"] / team_stats["total_plays"]).round(3)
return team_stats.sort_values("yards_per_play", ascending=False)
team_efficiency = calculate_team_efficiency(plays_analyzed)
print("\nTEAM EFFICIENCY RANKINGS")
print("=" * 80)
print(team_efficiency[[
"offense", "total_plays", "yards_per_play", "success_rate",
"explosive_rate", "pass_rate"
]].to_string(index=False))
Situational Analysis
def analyze_situational_efficiency(
df: pd.DataFrame,
team: str = None
) -> Dict[str, pd.DataFrame]:
"""
Analyze efficiency by game situation.
Parameters
----------
df : pd.DataFrame
Play data
team : str, optional
Specific team to analyze (all if None)
Returns
-------
Dict[str, pd.DataFrame]
Efficiency by various situations
"""
plays = df[df["is_scrimmage"]].copy()
if team:
plays = plays[plays["offense"] == team]
results = {}
# By down
by_down = plays.groupby("down").agg(
plays=("play_id", "count"),
avg_yards=("yards_gained", "mean"),
success_rate=("is_success", "mean"),
explosive_rate=("is_explosive", "mean"),
pass_rate=("is_pass", "mean")
).round(3)
results["by_down"] = by_down
# By distance bucket
plays["distance_bucket"] = pd.cut(
plays["distance"],
bins=[0, 3, 6, 10, 50],
labels=["Short (1-3)", "Medium (4-6)", "Long (7-10)", "Very Long (11+)"]
)
by_distance = plays.groupby("distance_bucket").agg(
plays=("play_id", "count"),
avg_yards=("yards_gained", "mean"),
success_rate=("is_success", "mean"),
pass_rate=("is_pass", "mean")
).round(3)
results["by_distance"] = by_distance
# By field position
plays["field_zone"] = pd.cut(
plays["yard_line"],
bins=[0, 20, 40, 60, 80, 100],
labels=["Own 0-20", "Own 21-40", "Midfield", "Opp 40-21", "Red Zone"]
)
by_field_position = plays.groupby("field_zone").agg(
plays=("play_id", "count"),
avg_yards=("yards_gained", "mean"),
success_rate=("is_success", "mean"),
pass_rate=("is_pass", "mean")
).round(3)
results["by_field_position"] = by_field_position
return results
# Analyze overall
situations = analyze_situational_efficiency(plays_analyzed)
print("\nSITUATIONAL ANALYSIS")
print("=" * 60)
print("\nBy Down:")
print(situations["by_down"])
print("\nBy Distance:")
print(situations["by_distance"])
print("\nBy Field Position:")
print(situations["by_field_position"])
Phase 5: Performance Optimization
Comparing Approaches
def benchmark_approaches(df: pd.DataFrame) -> Dict[str, float]:
"""
Benchmark different calculation approaches.
Parameters
----------
df : pd.DataFrame
Play data
Returns
-------
Dict[str, float]
Timing results
"""
results = {}
# Approach 1: Loop-based (slow)
start = time.time()
success_loop = []
for _, row in df.iterrows():
if row["is_scrimmage"] and pd.notna(row["down"]):
threshold = {1: 0.4, 2: 0.5, 3: 1.0, 4: 1.0}.get(row["down"], 1.0)
success = row["yards_gained"] >= row["distance"] * threshold
success_loop.append(success)
else:
success_loop.append(None)
results["loop"] = time.time() - start
# Approach 2: Apply (medium)
start = time.time()
def calc_success(row):
if row["is_scrimmage"] and pd.notna(row["down"]):
threshold = {1: 0.4, 2: 0.5, 3: 1.0, 4: 1.0}.get(row["down"], 1.0)
return row["yards_gained"] >= row["distance"] * threshold
return None
success_apply = df.apply(calc_success, axis=1)
results["apply"] = time.time() - start
# Approach 3: Vectorized (fast)
start = time.time()
thresholds = {1: 0.4, 2: 0.5, 3: 1.0, 4: 1.0}
conditions = [
(df["down"] == d) & (df["yards_gained"] >= df["distance"] * t)
for d, t in thresholds.items()
]
success_vector = np.select(conditions, [True]*4, default=False)
success_vector = np.where(~df["is_scrimmage"], None, success_vector)
results["vectorized"] = time.time() - start
return results
# Run the benchmark on a subset so the loop-based approach finishes quickly
print("\nPERFORMANCE BENCHMARK")
print("-" * 40)
print("Using 5,000 play subset:")
subset = plays_analyzed.head(5000).copy()
timings = benchmark_approaches(subset)
for method, seconds in timings.items():
print(f" {method:12}: {seconds:.4f}s")
# Calculate speedups
loop_time = timings["loop"]
for method, seconds in timings.items():
speedup = loop_time / seconds
print(f" {method} speedup: {speedup:.1f}x")
Memory Optimization
def optimize_play_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict]:
"""
Optimize play data memory usage.
Parameters
----------
df : pd.DataFrame
Play data
Returns
-------
Tuple[pd.DataFrame, Dict]
Optimized DataFrame and memory report
"""
report = {"before": df.memory_usage(deep=True).sum() / 1024**2}
optimized = df.copy()
# Downcast integers
int_cols = optimized.select_dtypes(include=["int64"]).columns
for col in int_cols:
optimized[col] = pd.to_numeric(optimized[col], downcast="integer")
# Downcast floats
float_cols = optimized.select_dtypes(include=["float64"]).columns
for col in float_cols:
optimized[col] = pd.to_numeric(optimized[col], downcast="float")
# Convert strings to categorical
cat_cols = ["play_type", "home_team", "away_team", "offense", "defense"]
for col in cat_cols:
if col in optimized.columns:
optimized[col] = optimized[col].astype("category")
    # Convert plain bools to pandas' nullable boolean (supports pd.NA; note this adds a
    # byte-per-value mask rather than saving memory)
bool_cols = optimized.select_dtypes(include=["bool"]).columns
for col in bool_cols:
optimized[col] = optimized[col].astype("boolean")
report["after"] = optimized.memory_usage(deep=True).sum() / 1024**2
report["reduction"] = (1 - report["after"] / report["before"]) * 100
return optimized, report
optimized_plays, mem_report = optimize_play_data(plays_analyzed)
print("\nMEMORY OPTIMIZATION")
print("-" * 40)
print(f"Before: {mem_report['before']:.2f} MB")
print(f"After: {mem_report['after']:.2f} MB")
print(f"Reduction: {mem_report['reduction']:.1f}%")
Complete Analysis Function
def analyze_season_plays(plays_df: pd.DataFrame) -> Dict:
"""
Complete play-by-play season analysis.
Parameters
----------
plays_df : pd.DataFrame
Raw play-by-play data
Returns
-------
Dict
Complete analysis results
"""
# Step 1: Clean
plays_clean, clean_report = clean_play_data(plays_df)
# Step 2: Add success
plays_success = calculate_play_success(plays_clean)
# Step 3: Add explosive
plays_full = identify_explosive_plays(plays_success)
# Step 4: Calculate metrics
team_efficiency = calculate_team_efficiency(plays_full)
situations = analyze_situational_efficiency(plays_full)
# Step 5: Optimize
plays_opt, mem_report = optimize_play_data(plays_full)
return {
"plays": plays_opt,
"team_efficiency": team_efficiency,
"situational": situations,
"cleaning_report": clean_report,
"memory_report": mem_report
}
# Run complete analysis
print("\n" + "=" * 60)
print("COMPLETE SEASON ANALYSIS")
print("=" * 60)
results = analyze_season_plays(plays_df)
print(f"\nProcessed {len(results['plays'])} plays")
print(f"Memory usage: {results['memory_report']['after']:.2f} MB")
print("\nTop 5 Teams by Yards Per Play:")
print(results["team_efficiency"][["offense", "yards_per_play", "success_rate"]].head())
Discussion Questions
- How would you incorporate Expected Points Added (EPA) into this analysis?
- What additional situational factors would improve the success rate analysis?
- How would you handle garbage time plays differently?
- What validation would you add to ensure accuracy?
- How would this analysis change for postseason vs. regular season?
Key Takeaways
- Vectorization is essential: Loop-based approaches don't scale for play-by-play data
- Data cleaning first: Invalid values propagate through all calculations
- Memory matters: Optimizing data types enables analysis of full seasons
- Situational context: Down, distance, and field position fundamentally affect expectations
- Modular design: Breaking analysis into functions enables reuse and testing