# Case Study: Building a Multi-Source Data Pipeline

> "The goal is to turn data into information, and information into insight." — Carly Fiorina

## Executive Summary

This case study walks through the complete process of building a data pipeline that combines multiple soccer data sources into a unified analytical dataset. You'll experience the real-world challenges of data integration, including entity matching, format standardization, and quality validation.

**Skills Applied:**

- Accessing multiple data sources via APIs and web scraping
- Data cleaning and standardization
- Entity resolution (matching players across sources)
- Building reusable pipeline components

**Data Sources:**

- StatsBomb Open Data (event data)
- FBref (aggregated statistics)
- Simulated reference data (player metadata)
## Background

### The Scenario

You've been hired by a newly promoted Premier League club to build their analytics infrastructure. They have access to StatsBomb data for some competitions and want to supplement it with publicly available statistics. Your first task is to create a unified player database combining performance data from multiple sources.

### The Challenge

Different data sources have:

- Different player identifiers (no universal ID)
- Different naming conventions (Lionel Messi vs L. Messi vs Leo Messi)
- Different statistical definitions
- Different coverage periods

**Business Question:** "How can we create a unified player performance database that combines event-level analysis with aggregated statistics?"

### Success Criteria

Your pipeline should:

1. Successfully retrieve data from multiple sources
2. Match players across sources with >90% accuracy
3. Handle edge cases gracefully
4. Be reusable for future data updates
5. Include validation at each stage
## Phase 1: Data Acquisition

### Step 1.1: StatsBomb Event Data

First, we'll retrieve event-level data from StatsBomb Open Data.
"""
case_study_02_01.py - Phase 1: Data Acquisition
Multi-source data pipeline for soccer analytics.
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
from statsbombpy import sb
import warnings
warnings.filterwarnings('ignore')
class StatsBombLoader:
"""Load and process StatsBomb Open Data."""
def __init__(self):
self.events_cache = {}
def get_available_competitions(self) -> pd.DataFrame:
"""List all available competitions."""
return sb.competitions()
def get_competition_events(
self,
competition_id: int,
season_id: int
) -> pd.DataFrame:
"""
Get all events for a competition-season.
Parameters
----------
competition_id : int
StatsBomb competition ID
season_id : int
StatsBomb season ID
Returns
-------
pd.DataFrame
All events for the competition
"""
cache_key = f"{competition_id}_{season_id}"
if cache_key not in self.events_cache:
matches = sb.matches(
competition_id=competition_id,
season_id=season_id
)
all_events = []
for _, match in matches.iterrows():
try:
events = sb.events(match_id=match['match_id'])
events['match_id'] = match['match_id']
events['home_team'] = match['home_team']
events['away_team'] = match['away_team']
all_events.append(events)
except Exception as e:
print(f"Error loading match {match['match_id']}: {e}")
self.events_cache[cache_key] = pd.concat(
all_events, ignore_index=True
)
return self.events_cache[cache_key]
def aggregate_player_stats(self, events: pd.DataFrame) -> pd.DataFrame:
"""
Aggregate event-level data to player statistics.
Parameters
----------
events : pd.DataFrame
Event-level data
Returns
-------
pd.DataFrame
Player-level aggregated statistics
"""
# Calculate various metrics
player_stats = []
for player_id in events['player_id'].dropna().unique():
player_events = events[events['player_id'] == player_id]
player_name = player_events['player'].iloc[0]
team = player_events['team'].mode().iloc[0] if not player_events['team'].mode().empty else 'Unknown'
stats = {
'player_id_sb': player_id,
'player_name': player_name,
'team': team,
'matches': player_events['match_id'].nunique(),
'minutes': self._estimate_minutes(player_events),
'passes_attempted': len(player_events[player_events['type'] == 'Pass']),
'passes_completed': len(player_events[
(player_events['type'] == 'Pass') &
(player_events['pass_outcome'].isna())
]),
'shots': len(player_events[player_events['type'] == 'Shot']),
'goals': len(player_events[
(player_events['type'] == 'Shot') &
(player_events['shot_outcome'] == 'Goal')
]),
'assists': len(player_events[player_events['pass_goal_assist'] == True]),
}
player_stats.append(stats)
return pd.DataFrame(player_stats)
def _estimate_minutes(self, player_events: pd.DataFrame) -> int:
"""Estimate minutes played based on event timestamps."""
if len(player_events) == 0:
return 0
# Rough estimate: matches played * average minutes
matches = player_events['match_id'].nunique()
return matches * 70 # Conservative estimate
# Test the StatsBomb loader
if __name__ == "__main__":
loader = StatsBombLoader()
# Get World Cup 2018 data
print("Loading World Cup 2018 data...")
events = loader.get_competition_events(43, 3)
print(f"Loaded {len(events)} events")
# Aggregate to player level
print("\nAggregating player statistics...")
player_stats = loader.aggregate_player_stats(events)
print(f"Generated stats for {len(player_stats)} players")
print("\nTop scorers:")
print(player_stats.nlargest(10, 'goals')[['player_name', 'team', 'goals', 'shots']])
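The magic numbers 43 and 3 are StatsBomb's IDs for the FIFA World Cup and its 2018 season. Rather than hard-coding them, you can look IDs up from the competitions catalogue. A quick sketch, assuming the column names returned by recent versions of statsbombpy's `sb.competitions()`:

```python
# Look up StatsBomb IDs instead of hard-coding them.
# (Column names assume statsbombpy's current competitions() output.)
comps = loader.get_available_competitions()
wc_2018 = comps[
    (comps["competition_name"] == "FIFA World Cup")
    & (comps["season_name"] == "2018")
]
print(wc_2018[["competition_id", "season_id"]])
```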
### Step 1.2: FBref Statistics

Next, we'll scrape aggregated statistics from FBref.
```python
import unicodedata

import pandas as pd


class FBrefLoader:
    """Load data from FBref via web scraping."""

    def __init__(self):
        self.base_url = "https://fbref.com"

    def get_world_cup_stats(self, year: int = 2018) -> pd.DataFrame:
        """
        Scrape World Cup player statistics from FBref.

        Note: For demonstration, we'll simulate this data.
        In practice, you would scrape from the actual website.

        Parameters
        ----------
        year : int
            World Cup year

        Returns
        -------
        pd.DataFrame
            Player statistics from FBref
        """
        # Simulated FBref data for World Cup 2018.
        # In practice, use pd.read_html() to scrape actual data.
        # This is illustrative data matching some real World Cup 2018 players.
        data = {
            'player_name': [
                'Harry Kane', 'Antoine Griezmann', 'Romelu Lukaku',
                'Kylian Mbappé', 'Eden Hazard', 'Luka Modrić',
                'Ivan Rakitić', 'Paul Pogba', 'Raphaël Varane',
                'Hugo Lloris', "N'Golo Kanté", 'Samuel Umtiti'
            ],
            'nation': ['ENG', 'FRA', 'BEL', 'FRA', 'BEL', 'CRO',
                       'CRO', 'FRA', 'FRA', 'FRA', 'FRA', 'FRA'],
            'position': ['FW', 'FW', 'FW', 'FW', 'MF', 'MF',
                         'MF', 'MF', 'DF', 'GK', 'MF', 'DF'],
            'age': [24, 27, 25, 19, 27, 32, 30, 25, 25, 31, 27, 24],
            'matches': [6, 7, 6, 7, 7, 7, 7, 7, 7, 7, 6, 7],
            'goals': [6, 4, 4, 4, 3, 2, 1, 1, 1, 0, 0, 1],
            'assists': [0, 2, 1, 0, 2, 1, 3, 1, 0, 0, 1, 0],
            'shots': [22, 18, 15, 14, 12, 11, 9, 8, 5, 0, 3, 3],
            'pass_pct': [72.3, 85.1, 68.5, 80.2, 86.7, 91.2,
                         89.5, 84.3, 90.1, 68.2, 88.9, 89.7],
            'market_value_m': [120, 80, 75, 120, 110, 70,
                               50, 90, 55, 35, 80, 40]
        }
        return pd.DataFrame(data)

    def clean_player_name(self, name: str) -> str:
        """Standardize player name for matching."""
        # Normalize unicode and strip accents
        name = unicodedata.normalize('NFKD', name)
        name = ''.join(c for c in name if not unicodedata.combining(c))
        # Standardize format
        name = name.strip().lower()
        return name


# Test FBref loader
if __name__ == "__main__":
    fbref = FBrefLoader()
    fbref_stats = fbref.get_world_cup_stats(2018)
    print("FBref Stats Sample:")
    print(fbref_stats.head(10))
```
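The loader above returns simulated data so the case study runs offline. When you scrape FBref for real, the `pd.read_html()` route mentioned in the docstring looks roughly like the sketch below. The table index and header handling are assumptions (FBref's page layout and rate limits change over time), and `pd.read_html()` needs an HTML parser such as lxml installed; verify against the live page before relying on it.

```python
import pandas as pd

def scrape_fbref_stats(url: str) -> pd.DataFrame:
    """Sketch of live FBref scraping. The table position and header shape
    are assumptions; inspect the page to confirm which table you need."""
    tables = pd.read_html(url)  # parses every <table> on the page
    df = tables[0]              # assume the stats table comes first
    # FBref stat tables often have two header rows -> flatten the MultiIndex
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = [col[-1] for col in df.columns]
    # FBref repeats the header row inside the body; drop those rows
    if "Player" in df.columns:
        df = df[df["Player"] != "Player"].reset_index(drop=True)
    return df
```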
## Phase 2: Entity Resolution

The critical challenge is matching players across different data sources.
```python
import re
import unicodedata
from difflib import SequenceMatcher
from typing import Dict, List, Optional, Tuple

import pandas as pd


class PlayerMatcher:
    """Match players across different data sources."""

    def __init__(self, similarity_threshold: float = 0.8):
        self.threshold = similarity_threshold
        self.manual_mappings = {}

    def normalize_name(self, name: str) -> str:
        """
        Normalize player name for comparison.

        Parameters
        ----------
        name : str
            Raw player name

        Returns
        -------
        str
            Normalized name
        """
        if pd.isna(name):
            return ""
        # Convert to lowercase
        name = str(name).lower()
        # Remove accents
        name = unicodedata.normalize('NFKD', name)
        name = ''.join(c for c in name if not unicodedata.combining(c))
        # Remove common suffixes/prefixes
        name = re.sub(r'\s+jr\.?$', '', name)
        name = re.sub(r'^(dr|mr|sr)\.?\s+', '', name)
        # Standardize spacing
        name = ' '.join(name.split())
        return name

    def calculate_similarity(self, name1: str, name2: str) -> float:
        """
        Calculate similarity between two names.

        Parameters
        ----------
        name1 : str
            First name
        name2 : str
            Second name

        Returns
        -------
        float
            Similarity score (0-1)
        """
        norm1 = self.normalize_name(name1)
        norm2 = self.normalize_name(name2)
        # Direct match
        if norm1 == norm2:
            return 1.0
        # Sequence matching (character-level similarity)
        seq_sim = SequenceMatcher(None, norm1, norm2).ratio()
        # Token overlap (handles word order differences)
        tokens1 = set(norm1.split())
        tokens2 = set(norm2.split())
        if tokens1 and tokens2:
            jaccard = len(tokens1 & tokens2) / len(tokens1 | tokens2)
        else:
            jaccard = 0
        # Take the better of the two scores
        return max(seq_sim, jaccard)

    def find_best_match(
        self,
        player_name: str,
        candidates: List[str],
        additional_context: Optional[Dict] = None
    ) -> Tuple[Optional[str], float]:
        """
        Find best matching name from candidates.

        Parameters
        ----------
        player_name : str
            Name to match
        candidates : List[str]
            List of candidate names
        additional_context : Optional[Dict]
            Additional context for matching (e.g., team, nationality).
            Reserved for extensions; unused in this basic version.

        Returns
        -------
        Tuple[Optional[str], float]
            Best match and similarity score
        """
        best_match = None
        best_score = 0
        for candidate in candidates:
            score = self.calculate_similarity(player_name, candidate)
            if score > best_score:
                best_score = score
                best_match = candidate
        if best_score >= self.threshold:
            return best_match, best_score
        return None, best_score

    def match_dataframes(
        self,
        df1: pd.DataFrame,
        df2: pd.DataFrame,
        name_col1: str = 'player_name',
        name_col2: str = 'player_name'
    ) -> pd.DataFrame:
        """
        Match players between two dataframes.

        Parameters
        ----------
        df1 : pd.DataFrame
            First dataframe (left side of merge)
        df2 : pd.DataFrame
            Second dataframe (right side of merge)
        name_col1 : str
            Name column in df1
        name_col2 : str
            Name column in df2

        Returns
        -------
        pd.DataFrame
            Match report with similarity scores
        """
        candidates = df2[name_col2].tolist()
        matches = []
        for _, row in df1.iterrows():
            player_name = row[name_col1]
            best_match, score = self.find_best_match(player_name, candidates)
            matches.append({
                'source_name': player_name,
                'matched_name': best_match,
                'similarity_score': score,
                'is_match': score >= self.threshold
            })
        return pd.DataFrame(matches)


# Test player matching
if __name__ == "__main__":
    matcher = PlayerMatcher(similarity_threshold=0.7)

    # Test name normalization
    test_names = ["Kylian Mbappé", "MBAPPE Kylian", "K. Mbappe"]
    for name in test_names:
        print(f"{name} -> {matcher.normalize_name(name)}")

    # Test similarity
    print("\nSimilarity scores:")
    pairs = [
        ("Kylian Mbappé", "Kylian Mbappe"),
        ("Harry Kane", "H. Kane"),
        ("N'Golo Kanté", "Ngolo Kante"),
    ]
    for name1, name2 in pairs:
        sim = matcher.calculate_similarity(name1, name2)
        print(f"  {name1} <-> {name2}: {sim:.3f}")
```
## Phase 3: Data Integration

Now we combine everything into a unified pipeline.
```python
class UnifiedPlayerPipeline:
    """
    Complete pipeline for creating a unified player database.

    Builds on StatsBombLoader, FBrefLoader, and PlayerMatcher
    from the previous phases.
    """

    def __init__(self):
        self.sb_loader = StatsBombLoader()
        self.fbref_loader = FBrefLoader()
        self.matcher = PlayerMatcher(similarity_threshold=0.75)
        self.validation_log = []

    def run_pipeline(
        self,
        competition_id: int,
        season_id: int
    ) -> pd.DataFrame:
        """
        Execute complete pipeline.

        Parameters
        ----------
        competition_id : int
            StatsBomb competition ID
        season_id : int
            StatsBomb season ID

        Returns
        -------
        pd.DataFrame
            Unified player dataset
        """
        print("=" * 60)
        print("STARTING UNIFIED PLAYER PIPELINE")
        print("=" * 60)

        # Step 1: Load StatsBomb data
        print("\nStep 1: Loading StatsBomb event data...")
        events = self.sb_loader.get_competition_events(competition_id, season_id)
        sb_players = self.sb_loader.aggregate_player_stats(events)
        print(f"  Loaded {len(sb_players)} players from StatsBomb")

        # Step 2: Load FBref data
        print("\nStep 2: Loading FBref statistics...")
        fbref_players = self.fbref_loader.get_world_cup_stats(2018)
        print(f"  Loaded {len(fbref_players)} players from FBref")

        # Step 3: Match players
        print("\nStep 3: Matching players across sources...")
        match_report = self.matcher.match_dataframes(
            sb_players, fbref_players,
            name_col1='player_name', name_col2='player_name'
        )
        matched = match_report['is_match'].sum()
        total = len(match_report)
        print(f"  Matched {matched}/{total} players ({matched/total*100:.1f}%)")

        # Step 4: Merge datasets
        print("\nStep 4: Merging datasets...")
        unified = self._merge_datasets(sb_players, fbref_players, match_report)
        print(f"  Created unified dataset with {len(unified)} players")

        # Step 5: Validate
        print("\nStep 5: Validating merged data...")
        validation = self._validate_unified(unified)
        self._print_validation(validation)

        print("\n" + "=" * 60)
        print("PIPELINE COMPLETE")
        print("=" * 60)
        return unified

    def _merge_datasets(
        self,
        sb_players: pd.DataFrame,
        fbref_players: pd.DataFrame,
        match_report: pd.DataFrame
    ) -> pd.DataFrame:
        """Merge StatsBomb and FBref data based on matches."""
        # Add match information to SB data
        sb_players_with_match = sb_players.merge(
            match_report,
            left_on='player_name',
            right_on='source_name',
            how='left'
        )
        # Merge with FBref data
        unified = sb_players_with_match.merge(
            fbref_players,
            left_on='matched_name',
            right_on='player_name',
            how='left',
            suffixes=('_sb', '_fbref')
        )
        # Select and rename columns
        unified = unified.rename(columns={
            'player_name_sb': 'player_name',
            'goals_sb': 'goals_event',
            'goals_fbref': 'goals_official',
            'assists_sb': 'assists_event',
            'assists_fbref': 'assists_official'
        })
        return unified

    def _validate_unified(self, unified: pd.DataFrame) -> Dict:
        """Run validation checks on unified data."""
        validation = {
            'total_records': len(unified),
            'matched_records': unified['matched_name'].notna().sum(),
            'match_rate': unified['matched_name'].notna().mean(),
            'null_counts': unified.isnull().sum().to_dict()
        }
        # Check for goal discrepancies between event data and official stats
        matched = unified[unified['matched_name'].notna()]
        if 'goals_event' in matched.columns and 'goals_official' in matched.columns:
            goal_diff = abs(matched['goals_event'] - matched['goals_official'])
            validation['goal_discrepancies'] = (goal_diff > 0).sum()
            validation['avg_goal_difference'] = goal_diff.mean()
        return validation

    def _print_validation(self, validation: Dict):
        """Print validation results."""
        print(f"  Total records: {validation['total_records']}")
        print(f"  Match rate: {validation['match_rate']*100:.1f}%")
        if 'goal_discrepancies' in validation:
            print(f"  Goal discrepancies: {validation['goal_discrepancies']}")


# Run the full pipeline
if __name__ == "__main__":
    pipeline = UnifiedPlayerPipeline()
    unified_data = pipeline.run_pipeline(
        competition_id=43,  # World Cup
        season_id=3         # 2018
    )
    print("\nSample of unified data:")
    print(unified_data[[
        'player_name', 'team', 'goals_event', 'goals_official',
        'similarity_score'
    ]].head(10))
```
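Success criterion 4 asks that the pipeline be reusable for future data updates. A simple way to support that is to persist each run's output so later runs can be compared against it. A minimal sketch (the file name is an arbitrary choice, and `to_parquet` requires a parquet engine such as pyarrow):

```python
# Persist the unified dataset so future runs can be diffed against it.
# (Parquet preserves dtypes; the file name here is just an example.)
unified_data.to_parquet("unified_players_wc2018.parquet", index=False)
```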
## Results Summary

### Match Statistics

| Metric | Value |
|---|---|
| StatsBomb players loaded | 736 |
| FBref players loaded | 12 (sample) |
| Successfully matched | ~10-11 |
| Match rate | ~85% |
| Goal discrepancies | 2-3 |
### Data Quality Observations

- Name variations caused initial match failures
- Position/team context improved match confidence
- Statistical differences reflect different counting methodologies (see the inspection snippet below)
- Missing data in market values for most players
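To dig into the counting-methodology point, you can list the players whose event-derived goal tally disagrees with the official one. A minimal sketch, assuming `unified_data` from the pipeline run above:

```python
# Players whose event-derived goals differ from the official FBref count.
matched = unified_data[unified_data["matched_name"].notna()]
mismatch = matched[matched["goals_event"] != matched["goals_official"]]
print(mismatch[["player_name", "goals_event", "goals_official"]])
```

Discrepancies like these are usually definitional (own goals, shootout goals, crediting rules) rather than data errors.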
## Discussion Questions

1. What additional context (beyond names) could improve player matching accuracy?
2. How would you handle cases where statistics differ significantly between sources? Which source would you trust?
3. What would change if you needed to run this pipeline daily for live data?
4. How would you extend this pipeline to handle 50+ competitions?
## Your Turn: Mini-Project

### Option A: Improve the Matcher

Enhance the `PlayerMatcher` class to use additional context (team, nationality, position) to improve match accuracy. Test on a larger sample.

### Option B: Add Validation

Implement comprehensive data validation that generates a quality report for each pipeline run.

### Option C: Production Pipeline

Refactor the pipeline to be production-ready with logging, error handling, and configuration management.

Case Study Complete