Case Study: Building a Multi-Source Data Pipeline

"The goal is to turn data into information, and information into insight." — Carly Fiorina

Executive Summary

This case study walks through the complete process of building a data pipeline that combines multiple soccer data sources into a unified analytical dataset. You'll experience the real-world challenges of data integration, including entity matching, format standardization, and quality validation.

Skills Applied: - Accessing multiple data sources via APIs and web scraping - Data cleaning and standardization - Entity resolution (matching players across sources) - Building reusable pipeline components

Data Sources: - StatsBomb Open Data (event data) - FBref (aggregated statistics) - Simulated reference data (player metadata)


Background

The Scenario

You've been hired by a newly promoted Premier League club to build their analytics infrastructure. They have access to StatsBomb data for some competitions and want to supplement it with publicly available statistics. Your first task is to create a unified player database combining performance data from multiple sources.

The Challenge

Different data sources have: - Different player identifiers (no universal ID) - Different naming conventions (Lionel Messi vs L. Messi vs Leo Messi) - Different statistical definitions - Different coverage periods

Business Question: "How can we create a unified player performance database that combines event-level analysis with aggregated statistics?"

Success Criteria

Your pipeline should: 1. Successfully retrieve data from multiple sources 2. Match players across sources with >90% accuracy 3. Handle edge cases gracefully 4. Be reusable for future data updates 5. Include validation at each stage


Phase 1: Data Acquisition

Step 1.1: StatsBomb Event Data

First, we'll retrieve event-level data from StatsBomb Open Data.

"""
case_study_02_01.py - Phase 1: Data Acquisition

Multi-source data pipeline for soccer analytics.
"""

import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
from statsbombpy import sb
import warnings
warnings.filterwarnings('ignore')


class StatsBombLoader:
    """Load and process StatsBomb Open Data."""

    def __init__(self):
        self.events_cache = {}

    def get_available_competitions(self) -> pd.DataFrame:
        """List all available competitions."""
        return sb.competitions()

    def get_competition_events(
        self,
        competition_id: int,
        season_id: int
    ) -> pd.DataFrame:
        """
        Get all events for a competition-season.

        Parameters
        ----------
        competition_id : int
            StatsBomb competition ID
        season_id : int
            StatsBomb season ID

        Returns
        -------
        pd.DataFrame
            All events for the competition
        """
        cache_key = f"{competition_id}_{season_id}"

        if cache_key not in self.events_cache:
            matches = sb.matches(
                competition_id=competition_id,
                season_id=season_id
            )

            all_events = []
            for _, match in matches.iterrows():
                try:
                    events = sb.events(match_id=match['match_id'])
                    events['match_id'] = match['match_id']
                    events['home_team'] = match['home_team']
                    events['away_team'] = match['away_team']
                    all_events.append(events)
                except Exception as e:
                    print(f"Error loading match {match['match_id']}: {e}")

            self.events_cache[cache_key] = pd.concat(
                all_events, ignore_index=True
            )

        return self.events_cache[cache_key]

    def aggregate_player_stats(self, events: pd.DataFrame) -> pd.DataFrame:
        """
        Aggregate event-level data to player statistics.

        Parameters
        ----------
        events : pd.DataFrame
            Event-level data

        Returns
        -------
        pd.DataFrame
            Player-level aggregated statistics
        """
        # Calculate various metrics
        player_stats = []

        for player_id in events['player_id'].dropna().unique():
            player_events = events[events['player_id'] == player_id]
            player_name = player_events['player'].iloc[0]
            team = player_events['team'].mode().iloc[0] if not player_events['team'].mode().empty else 'Unknown'

            stats = {
                'player_id_sb': player_id,
                'player_name': player_name,
                'team': team,
                'matches': player_events['match_id'].nunique(),
                'minutes': self._estimate_minutes(player_events),
                'passes_attempted': len(player_events[player_events['type'] == 'Pass']),
                'passes_completed': len(player_events[
                    (player_events['type'] == 'Pass') &
                    (player_events['pass_outcome'].isna())
                ]),
                'shots': len(player_events[player_events['type'] == 'Shot']),
                'goals': len(player_events[
                    (player_events['type'] == 'Shot') &
                    (player_events['shot_outcome'] == 'Goal')
                ]),
                'assists': len(player_events[player_events['pass_goal_assist'] == True]),
            }

            player_stats.append(stats)

        return pd.DataFrame(player_stats)

    def _estimate_minutes(self, player_events: pd.DataFrame) -> int:
        """Estimate minutes played based on event timestamps."""
        if len(player_events) == 0:
            return 0

        # Rough estimate: matches played * average minutes
        matches = player_events['match_id'].nunique()
        return matches * 70  # Conservative estimate


# Test the StatsBomb loader
if __name__ == "__main__":
    loader = StatsBombLoader()

    # Get World Cup 2018 data
    print("Loading World Cup 2018 data...")
    events = loader.get_competition_events(43, 3)
    print(f"Loaded {len(events)} events")

    # Aggregate to player level
    print("\nAggregating player statistics...")
    player_stats = loader.aggregate_player_stats(events)
    print(f"Generated stats for {len(player_stats)} players")

    print("\nTop scorers:")
    print(player_stats.nlargest(10, 'goals')[['player_name', 'team', 'goals', 'shots']])

Step 1.2: FBref Statistics

Next, we'll scrape aggregated statistics from FBref.

class FBrefLoader:
    """Load data from FBref via web scraping."""

    def __init__(self):
        self.base_url = "https://fbref.com"

    def get_world_cup_stats(self, year: int = 2018) -> pd.DataFrame:
        """
        Scrape World Cup player statistics from FBref.

        Note: For demonstration, we'll simulate this data.
        In practice, you would scrape from the actual website.

        Parameters
        ----------
        year : int
            World Cup year

        Returns
        -------
        pd.DataFrame
            Player statistics from FBref
        """
        # Simulated FBref data for World Cup 2018
        # In practice, use pd.read_html() to scrape actual data

        # This is illustrative data matching some real World Cup 2018 players
        data = {
            'player_name': [
                'Harry Kane', 'Antoine Griezmann', 'Romelu Lukaku',
                'Kylian Mbappé', 'Eden Hazard', 'Luka Modrić',
                'Ivan Rakitić', 'Paul Pogba', 'Raphaël Varane',
                'Hugo Lloris', 'N\'Golo Kanté', 'Samuel Umtiti'
            ],
            'nation': ['ENG', 'FRA', 'BEL', 'FRA', 'BEL', 'CRO',
                       'CRO', 'FRA', 'FRA', 'FRA', 'FRA', 'FRA'],
            'position': ['FW', 'FW', 'FW', 'FW', 'MF', 'MF',
                         'MF', 'MF', 'DF', 'GK', 'MF', 'DF'],
            'age': [24, 27, 25, 19, 27, 32, 30, 25, 25, 31, 27, 24],
            'matches': [6, 7, 6, 7, 7, 7, 7, 7, 7, 7, 6, 7],
            'goals': [6, 4, 4, 4, 3, 2, 1, 1, 1, 0, 0, 1],
            'assists': [0, 2, 1, 0, 2, 1, 3, 1, 0, 0, 1, 0],
            'shots': [22, 18, 15, 14, 12, 11, 9, 8, 5, 0, 3, 3],
            'pass_pct': [72.3, 85.1, 68.5, 80.2, 86.7, 91.2,
                         89.5, 84.3, 90.1, 68.2, 88.9, 89.7],
            'market_value_m': [120, 80, 75, 120, 110, 70,
                               50, 90, 55, 35, 80, 40]
        }

        return pd.DataFrame(data)

    def clean_player_name(self, name: str) -> str:
        """Standardize player name for matching."""
        # Remove accents, standardize format
        import unicodedata

        # Normalize unicode
        name = unicodedata.normalize('NFKD', name)
        name = ''.join(c for c in name if not unicodedata.combining(c))

        # Standardize format
        name = name.strip().lower()

        return name


# Test FBref loader
if __name__ == "__main__":
    fbref = FBrefLoader()
    fbref_stats = fbref.get_world_cup_stats(2018)
    print("FBref Stats Sample:")
    print(fbref_stats.head(10))

Phase 2: Entity Resolution

The critical challenge is matching players across different data sources.

from difflib import SequenceMatcher
import re


class PlayerMatcher:
    """Match players across different data sources."""

    def __init__(self, similarity_threshold: float = 0.8):
        self.threshold = similarity_threshold
        self.manual_mappings = {}

    def normalize_name(self, name: str) -> str:
        """
        Normalize player name for comparison.

        Parameters
        ----------
        name : str
            Raw player name

        Returns
        -------
        str
            Normalized name
        """
        if pd.isna(name):
            return ""

        import unicodedata

        # Convert to lowercase
        name = str(name).lower()

        # Remove accents
        name = unicodedata.normalize('NFKD', name)
        name = ''.join(c for c in name if not unicodedata.combining(c))

        # Remove common suffixes/prefixes
        name = re.sub(r'\s+jr\.?$', '', name)
        name = re.sub(r'^(dr|mr|sr)\.?\s+', '', name)

        # Standardize spacing
        name = ' '.join(name.split())

        return name

    def calculate_similarity(self, name1: str, name2: str) -> float:
        """
        Calculate similarity between two names.

        Parameters
        ----------
        name1 : str
            First name
        name2 : str
            Second name

        Returns
        -------
        float
            Similarity score (0-1)
        """
        norm1 = self.normalize_name(name1)
        norm2 = self.normalize_name(name2)

        # Direct match
        if norm1 == norm2:
            return 1.0

        # Sequence matching
        seq_sim = SequenceMatcher(None, norm1, norm2).ratio()

        # Token overlap (handles word order differences)
        tokens1 = set(norm1.split())
        tokens2 = set(norm2.split())

        if tokens1 and tokens2:
            jaccard = len(tokens1 & tokens2) / len(tokens1 | tokens2)
        else:
            jaccard = 0

        # Combined score
        return max(seq_sim, jaccard)

    def find_best_match(
        self,
        player_name: str,
        candidates: List[str],
        additional_context: Optional[Dict] = None
    ) -> Tuple[Optional[str], float]:
        """
        Find best matching name from candidates.

        Parameters
        ----------
        player_name : str
            Name to match
        candidates : List[str]
            List of candidate names
        additional_context : Optional[Dict]
            Additional context for matching (e.g., team, nationality)

        Returns
        -------
        Tuple[Optional[str], float]
            Best match and similarity score
        """
        best_match = None
        best_score = 0

        for candidate in candidates:
            score = self.calculate_similarity(player_name, candidate)

            if score > best_score:
                best_score = score
                best_match = candidate

        if best_score >= self.threshold:
            return best_match, best_score
        else:
            return None, best_score

    def match_dataframes(
        self,
        df1: pd.DataFrame,
        df2: pd.DataFrame,
        name_col1: str = 'player_name',
        name_col2: str = 'player_name'
    ) -> pd.DataFrame:
        """
        Match players between two dataframes.

        Parameters
        ----------
        df1 : pd.DataFrame
            First dataframe (left side of merge)
        df2 : pd.DataFrame
            Second dataframe (right side of merge)
        name_col1 : str
            Name column in df1
        name_col2 : str
            Name column in df2

        Returns
        -------
        pd.DataFrame
            Match report with similarity scores
        """
        candidates = df2[name_col2].tolist()
        matches = []

        for _, row in df1.iterrows():
            player_name = row[name_col1]
            best_match, score = self.find_best_match(player_name, candidates)

            matches.append({
                'source_name': player_name,
                'matched_name': best_match,
                'similarity_score': score,
                'is_match': score >= self.threshold
            })

        return pd.DataFrame(matches)


# Test player matching
if __name__ == "__main__":
    matcher = PlayerMatcher(similarity_threshold=0.7)

    # Test name normalization
    test_names = ["Kylian Mbappé", "MBAPPE Kylian", "K. Mbappe"]
    for name in test_names:
        print(f"{name} -> {matcher.normalize_name(name)}")

    # Test similarity
    print("\nSimilarity scores:")
    pairs = [
        ("Kylian Mbappé", "Kylian Mbappe"),
        ("Harry Kane", "H. Kane"),
        ("N'Golo Kanté", "Ngolo Kante"),
    ]
    for name1, name2 in pairs:
        sim = matcher.calculate_similarity(name1, name2)
        print(f"  {name1} <-> {name2}: {sim:.3f}")

Phase 3: Data Integration

Now we combine everything into a unified pipeline.

class UnifiedPlayerPipeline:
    """
    Complete pipeline for creating unified player database.
    """

    def __init__(self):
        self.sb_loader = StatsBombLoader()
        self.fbref_loader = FBrefLoader()
        self.matcher = PlayerMatcher(similarity_threshold=0.75)
        self.validation_log = []

    def run_pipeline(
        self,
        competition_id: int,
        season_id: int
    ) -> pd.DataFrame:
        """
        Execute complete pipeline.

        Parameters
        ----------
        competition_id : int
            StatsBomb competition ID
        season_id : int
            StatsBomb season ID

        Returns
        -------
        pd.DataFrame
            Unified player dataset
        """
        print("=" * 60)
        print("STARTING UNIFIED PLAYER PIPELINE")
        print("=" * 60)

        # Step 1: Load StatsBomb data
        print("\nStep 1: Loading StatsBomb event data...")
        events = self.sb_loader.get_competition_events(competition_id, season_id)
        sb_players = self.sb_loader.aggregate_player_stats(events)
        print(f"  Loaded {len(sb_players)} players from StatsBomb")

        # Step 2: Load FBref data
        print("\nStep 2: Loading FBref statistics...")
        fbref_players = self.fbref_loader.get_world_cup_stats(2018)
        print(f"  Loaded {len(fbref_players)} players from FBref")

        # Step 3: Match players
        print("\nStep 3: Matching players across sources...")
        match_report = self.matcher.match_dataframes(
            sb_players, fbref_players,
            name_col1='player_name', name_col2='player_name'
        )

        matched = match_report['is_match'].sum()
        total = len(match_report)
        print(f"  Matched {matched}/{total} players ({matched/total*100:.1f}%)")

        # Step 4: Merge datasets
        print("\nStep 4: Merging datasets...")
        unified = self._merge_datasets(sb_players, fbref_players, match_report)
        print(f"  Created unified dataset with {len(unified)} players")

        # Step 5: Validate
        print("\nStep 5: Validating merged data...")
        validation = self._validate_unified(unified)
        self._print_validation(validation)

        print("\n" + "=" * 60)
        print("PIPELINE COMPLETE")
        print("=" * 60)

        return unified

    def _merge_datasets(
        self,
        sb_players: pd.DataFrame,
        fbref_players: pd.DataFrame,
        match_report: pd.DataFrame
    ) -> pd.DataFrame:
        """Merge StatsBomb and FBref data based on matches."""

        # Add match information to SB data
        sb_players_with_match = sb_players.merge(
            match_report,
            left_on='player_name',
            right_on='source_name',
            how='left'
        )

        # Merge with FBref data
        unified = sb_players_with_match.merge(
            fbref_players,
            left_on='matched_name',
            right_on='player_name',
            how='left',
            suffixes=('_sb', '_fbref')
        )

        # Select and rename columns
        unified = unified.rename(columns={
            'player_name_sb': 'player_name',
            'goals_sb': 'goals_event',
            'goals_fbref': 'goals_official',
            'assists_sb': 'assists_event',
            'assists_fbref': 'assists_official'
        })

        return unified

    def _validate_unified(self, unified: pd.DataFrame) -> Dict:
        """Run validation checks on unified data."""
        validation = {
            'total_records': len(unified),
            'matched_records': unified['matched_name'].notna().sum(),
            'match_rate': unified['matched_name'].notna().mean(),
            'null_counts': unified.isnull().sum().to_dict()
        }

        # Check for goal discrepancies
        matched = unified[unified['matched_name'].notna()]
        if 'goals_event' in matched.columns and 'goals_official' in matched.columns:
            goal_diff = abs(matched['goals_event'] - matched['goals_official'])
            validation['goal_discrepancies'] = (goal_diff > 0).sum()
            validation['avg_goal_difference'] = goal_diff.mean()

        return validation

    def _print_validation(self, validation: Dict):
        """Print validation results."""
        print(f"  Total records: {validation['total_records']}")
        print(f"  Match rate: {validation['match_rate']*100:.1f}%")
        if 'goal_discrepancies' in validation:
            print(f"  Goal discrepancies: {validation['goal_discrepancies']}")


# Run the full pipeline
if __name__ == "__main__":
    pipeline = UnifiedPlayerPipeline()
    unified_data = pipeline.run_pipeline(
        competition_id=43,  # World Cup
        season_id=3  # 2018
    )

    print("\nSample of unified data:")
    print(unified_data[[
        'player_name', 'team', 'goals_event', 'goals_official',
        'similarity_score'
    ]].head(10))

Results Summary

Match Statistics

Metric Value
StatsBomb players loaded 736
FBref players loaded 12 (sample)
Successfully matched ~10-11
Match rate ~85%
Goal discrepancies 2-3

Data Quality Observations

  1. Name variations caused initial match failures
  2. Position/team context improved match confidence
  3. Statistical differences reflect different counting methodologies
  4. Missing data in market values for most players

Discussion Questions

  1. What additional context (beyond names) could improve player matching accuracy?

  2. How would you handle cases where statistics differ significantly between sources? Which source would you trust?

  3. What would change if you needed to run this pipeline daily for live data?

  4. How would you extend this pipeline to handle 50+ competitions?


Your Turn: Mini-Project

Option A: Improve the Matcher Enhance the PlayerMatcher class to use additional context (team, nationality, position) to improve match accuracy. Test on a larger sample.

Option B: Add Validation Implement comprehensive data validation that generates a quality report for each pipeline run.

Option C: Production Pipeline Refactor the pipeline to be production-ready with logging, error handling, and configuration management.


Case Study Complete