Case Study 2: Reconciling Conflicting Data Sources

Overview

When collecting data from multiple sources, you'll inevitably encounter conflicts. Two sources might disagree on a player's statistics, a team's record, or a game's final score. This case study teaches you to identify, analyze, and resolve data conflicts systematically.


The Scenario

You're preparing historical data for a research project on college football performance. You've collected team statistics from three sources:

  1. ESPN: Sports media company's statistics API
  2. NCAA Official: Official NCAA statistics portal
  3. Sports Reference: Community-maintained sports database

Each source has different strengths and potential weaknesses. Your goal is to resolve the conflicts intelligently and produce a single authoritative dataset.


Part 1: Identifying Conflicts

Loading the Data

import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Tuple

# Source 1: ESPN Data
espn_data = pd.DataFrame({
    'team': ['Alabama', 'Georgia', 'Ohio State', 'Michigan', 'Texas'],
    'games': [14, 15, 13, 15, 13],
    'wins': [12, 15, 11, 15, 12],
    'points_scored': [534, 615, 456, 498, 478],
    'points_allowed': [238, 185, 196, 168, 225],
    'passing_yards': [3245, 3012, 2856, 2678, 3156],
    'rushing_yards': [2456, 2678, 2234, 2890, 2012],
    'total_yards': [5701, 5690, 5090, 5568, 5168],
    'source': 'ESPN'
})

# Source 2: NCAA Official
ncaa_data = pd.DataFrame({
    'team': ['Alabama', 'Georgia', 'Ohio State', 'Michigan', 'Texas'],
    'games': [14, 15, 13, 15, 13],
    'wins': [12, 15, 11, 15, 12],
    'points_scored': [535, 615, 456, 498, 478],  # Alabama differs by 1
    'points_allowed': [238, 185, 196, 168, 225],
    'passing_yards': [3248, 3012, 2856, 2680, 3156],  # Small differences
    'rushing_yards': [2456, 2678, 2234, 2890, 2012],
    'total_yards': [5704, 5690, 5090, 5570, 5168],  # Derived differences
    'source': 'NCAA'
})

# Source 3: Sports Reference (has more discrepancies)
sportsref_data = pd.DataFrame({
    'team': ['Alabama', 'Georgia', 'Ohio State', 'Michigan', 'Texas'],
    'games': [14, 15, 13, 15, 13],
    'wins': [12, 14, 11, 15, 11],  # Georgia and Texas wins differ
    'points_scored': [534, 610, 456, 498, 475],  # Multiple differences
    'points_allowed': [238, 190, 196, 168, 228],  # Georgia and Texas differ
    'passing_yards': [3245, 3008, 2860, 2678, 3150],
    'rushing_yards': [2456, 2680, 2230, 2890, 2015],
    'total_yards': [5701, 5688, 5090, 5568, 5165],
    'source': 'SportsRef'
})

print("Data Sources Loaded:")
print(f"  ESPN: {len(espn_data)} teams")
print(f"  NCAA: {len(ncaa_data)} teams")
print(f"  Sports Reference: {len(sportsref_data)} teams")

Building a Conflict Detection System

class DataConflictDetector:
    """
    Detect conflicts between multiple data sources.
    """

    def __init__(self, key_column: str = 'team'):
        self.key_column = key_column
        self.conflicts = []

    def add_source(self, name: str, df: pd.DataFrame):
        """Register a data source as an attribute (optional convenience;
        detect_conflicts also accepts sources passed in directly)."""
        setattr(self, f'source_{name}', df)

    def detect_conflicts(self,
                         sources: Dict[str, pd.DataFrame],
                         columns: List[str],
                         tolerance: Optional[Dict[str, float]] = None) -> pd.DataFrame:
        """
        Find conflicts across sources.

        Parameters
        ----------
        sources : dict
            {source_name: DataFrame}
        columns : list
            Columns to check for conflicts
        tolerance : dict
            {column: max_allowed_difference}
            Values within tolerance are not flagged as conflicts

        Returns
        -------
        pd.DataFrame : Conflict report
        """
        if tolerance is None:
            tolerance = {}

        conflicts = []

        # Get all unique keys (teams)
        all_keys = set()
        for df in sources.values():
            all_keys.update(df[self.key_column].unique())

        # Check each entity
        for key in sorted(all_keys):
            # Get values from each source
            source_values = {}
            for source_name, df in sources.items():
                key_data = df[df[self.key_column] == key]
                if len(key_data) == 1:
                    source_values[source_name] = key_data.iloc[0]

            # Compare each column
            for col in columns:
                col_values = {}
                for source_name, row in source_values.items():
                    if col in row.index:
                        col_values[source_name] = row[col]

                # Check for conflicts
                if len(col_values) > 1:
                    values = list(col_values.values())
                    max_diff = max(values) - min(values)
                    col_tolerance = tolerance.get(col, 0)

                    if max_diff > col_tolerance:
                        conflicts.append({
                            'entity': key,
                            'column': col,
                            'values': col_values,
                            'difference': max_diff,
                            'tolerance': col_tolerance,
                            'conflict_type': 'value_mismatch'
                        })

        self.conflicts = conflicts
        return pd.DataFrame(conflicts)


# Detect conflicts
detector = DataConflictDetector(key_column='team')

sources = {
    'ESPN': espn_data,
    'NCAA': ncaa_data,
    'SportsRef': sportsref_data
}

columns_to_check = ['games', 'wins', 'points_scored', 'points_allowed',
                    'passing_yards', 'rushing_yards', 'total_yards']

# Allow a small tolerance for yardage stats; totals get a wider margin
tolerances = {
    'passing_yards': 5,
    'rushing_yards': 5,
    'total_yards': 10
}

conflicts_df = detector.detect_conflicts(sources, columns_to_check, tolerances)

print("\n" + "=" * 70)
print("DETECTED CONFLICTS")
print("=" * 70)
print(f"\nTotal conflicts found: {len(conflicts_df)}")

if len(conflicts_df) > 0:
    print("\nConflict Details:")
    for _, row in conflicts_df.iterrows():
        print(f"\n  {row['entity']} - {row['column']}:")
        print(f"    Values: {row['values']}")
        print(f"    Difference: {row['difference']} (tolerance: {row['tolerance']})")

Part 2: Analyzing Conflict Patterns

def analyze_conflict_patterns(conflicts: pd.DataFrame,
                                sources: Dict[str, pd.DataFrame]) -> Dict:
    """
    Analyze patterns in data conflicts.

    Identifies:
    - Which sources tend to agree/disagree
    - Which columns have most conflicts
    - Systematic biases (one source always higher/lower)
    """
    if len(conflicts) == 0:
        return {'message': 'No conflicts to analyze'}

    analysis = {}

    # Conflicts by column
    analysis['by_column'] = conflicts['column'].value_counts().to_dict()

    # Conflicts by entity
    analysis['by_entity'] = conflicts['entity'].value_counts().to_dict()

    # Source agreement matrix
    source_names = list(sources.keys())
    agreement_matrix = pd.DataFrame(
        index=source_names,
        columns=source_names,
        data=0.0
    )

    # Calculate agreement rates, restricted to the entities and columns
    # that appear in at least one conflict
    columns_checked = list(conflicts['column'].unique())
    entities = set(conflicts['entity'].unique())

    for s1 in source_names:
        for s2 in source_names:
            if s1 == s2:
                agreement_matrix.loc[s1, s2] = 1.0
                continue

            agrees = 0
            total_compared = 0

            for entity in entities:
                for col in columns_checked:
                    s1_data = sources[s1][sources[s1]['team'] == entity]
                    s2_data = sources[s2][sources[s2]['team'] == entity]

                    if len(s1_data) == 1 and len(s2_data) == 1:
                        v1 = s1_data[col].iloc[0]
                        v2 = s2_data[col].iloc[0]
                        total_compared += 1
                        if v1 == v2:
                            agrees += 1

            if total_compared > 0:
                agreement_matrix.loc[s1, s2] = agrees / total_compared

    analysis['agreement_matrix'] = agreement_matrix

    # Check for systematic biases
    bias_analysis = {}
    for col in columns_checked:
        col_conflicts = conflicts[conflicts['column'] == col]
        source_values_all = {}

        for _, row in col_conflicts.iterrows():
            for source, value in row['values'].items():
                if source not in source_values_all:
                    source_values_all[source] = []
                source_values_all[source].append(value)

        # Calculate average value by source
        source_means = {s: np.mean(v) for s, v in source_values_all.items()}
        bias_analysis[col] = source_means

    analysis['bias_analysis'] = bias_analysis

    return analysis


pattern_analysis = analyze_conflict_patterns(conflicts_df, sources)

print("\n" + "=" * 70)
print("CONFLICT PATTERN ANALYSIS")
print("=" * 70)

print("\nConflicts by Column:")
for col, count in pattern_analysis['by_column'].items():
    print(f"  {col}: {count}")

print("\nConflicts by Team:")
for team, count in pattern_analysis['by_entity'].items():
    print(f"  {team}: {count}")

print("\nSource Agreement Matrix:")
print(pattern_analysis['agreement_matrix'].round(2))

print("\nBias Analysis (mean values by source for conflicting data):")
for col, source_means in pattern_analysis['bias_analysis'].items():
    print(f"\n  {col}:")
    for source, mean_val in source_means.items():
        print(f"    {source}: {mean_val:.1f}")

Part 3: Conflict Resolution Strategies

class DataReconciler:
    """
    Resolve conflicts between data sources using various strategies.
    """

    def __init__(self, source_priority: Dict[str, int]):
        """
        source_priority: Lower number = higher priority
        Example: {'NCAA': 1, 'ESPN': 2, 'SportsRef': 3}
        """
        self.source_priority = source_priority
        self.resolution_log = []

    def resolve_by_priority(self, values: Dict[str, float]) -> Tuple[float, str]:
        """Resolve conflict by using highest priority source."""
        sorted_sources = sorted(
            values.keys(),
            key=lambda x: self.source_priority.get(x, 999)
        )
        winner = sorted_sources[0]
        return values[winner], f'priority_{winner}'

    def resolve_by_majority(self, values: Dict[str, float],
                             tolerance: float = 0) -> Tuple[float, str]:
        """Resolve by majority vote (within tolerance)."""
        value_list = list(values.values())
        source_list = list(values.keys())

        # Group similar values
        groups = []
        for i, val in enumerate(value_list):
            matched = False
            for group in groups:
                if abs(val - group['representative']) <= tolerance:
                    group['members'].append((source_list[i], val))
                    matched = True
                    break
            if not matched:
                groups.append({
                    'representative': val,
                    'members': [(source_list[i], val)]
                })

        # Find largest group
        largest = max(groups, key=lambda g: len(g['members']))

        if len(largest['members']) > len(values) / 2:
            # Clear majority
            avg_val = np.mean([m[1] for m in largest['members']])
            member_names = [m[0] for m in largest['members']]
            return avg_val, f"majority_{'+'.join(member_names)}"
        else:
            # No majority - fallback to priority
            return self.resolve_by_priority(values)

    def resolve_by_average(self, values: Dict[str, float]) -> Tuple[float, str]:
        """Resolve by averaging all values."""
        avg = np.mean(list(values.values()))
        return avg, 'average_all'

    def resolve_by_weighted_average(self, values: Dict[str, float]) -> Tuple[float, str]:
        """Resolve by priority-weighted average."""
        total_weight = 0
        weighted_sum = 0

        for source, value in values.items():
            # Lower priority number (higher trust) gets a larger weight
            priority = self.source_priority.get(source, 3)
            weight = 1 / priority
            weighted_sum += value * weight
            total_weight += weight

        result = weighted_sum / total_weight
        return result, 'weighted_average'

    def resolve_conflicts(self,
                           conflicts: pd.DataFrame,
                           sources: Dict[str, pd.DataFrame],
                           strategy: str = 'priority') -> pd.DataFrame:
        """
        Resolve all conflicts and create unified dataset.

        Parameters
        ----------
        conflicts : pd.DataFrame
            Detected conflicts
        sources : dict
            Source DataFrames
        strategy : str
            Resolution strategy: 'priority', 'majority', 'average', 'weighted'

        Returns
        -------
        pd.DataFrame : Reconciled data
        """
        # Start with highest priority source
        priority_source = min(self.source_priority, key=self.source_priority.get)
        reconciled = sources[priority_source].copy()
        reconciled = reconciled.drop(columns=['source'], errors='ignore')

        # Process each conflict
        for _, conflict in conflicts.iterrows():
            entity = conflict['entity']
            column = conflict['column']
            values = conflict['values']

            # Apply resolution strategy
            if strategy == 'priority':
                resolved_value, method = self.resolve_by_priority(values)
            elif strategy == 'majority':
                resolved_value, method = self.resolve_by_majority(values, tolerance=5)
            elif strategy == 'average':
                resolved_value, method = self.resolve_by_average(values)
            elif strategy == 'weighted':
                resolved_value, method = self.resolve_by_weighted_average(values)
            else:
                raise ValueError(f"Unknown strategy: {strategy}")

            # Update reconciled data
            mask = reconciled['team'] == entity
            reconciled.loc[mask, column] = resolved_value

            # Log resolution
            self.resolution_log.append({
                'entity': entity,
                'column': column,
                'original_values': values,
                'resolved_value': resolved_value,
                'method': method
            })

        return reconciled

    def get_resolution_report(self) -> pd.DataFrame:
        """Generate report of all resolutions."""
        return pd.DataFrame(self.resolution_log)


# Create reconciler with priority
reconciler = DataReconciler(
    source_priority={
        'NCAA': 1,      # Official source = highest priority
        'ESPN': 2,      # Major media
        'SportsRef': 3  # Community source
    }
)

# Resolve using priority strategy
reconciled_priority = reconciler.resolve_conflicts(
    conflicts_df, sources, strategy='priority'
)

print("\n" + "=" * 70)
print("RECONCILED DATA (Priority Strategy)")
print("=" * 70)
print(reconciled_priority)

print("\nResolution Log:")
resolution_report = reconciler.get_resolution_report()
print(resolution_report)

Comparing Resolution Strategies

def compare_resolution_strategies(conflicts: pd.DataFrame,
                                    sources: Dict[str, pd.DataFrame],
                                    source_priority: Dict[str, int]) -> Dict:
    """
    Compare different resolution strategies on the same conflicts.
    """
    strategies = ['priority', 'majority', 'average', 'weighted']
    results = {}

    for strategy in strategies:
        reconciler = DataReconciler(source_priority)
        reconciled = reconciler.resolve_conflicts(conflicts, sources, strategy)
        results[strategy] = {
            'data': reconciled,
            'log': reconciler.get_resolution_report()
        }

    # Compare results
    print("\n" + "=" * 70)
    print("STRATEGY COMPARISON")
    print("=" * 70)

    # Compare key columns and report teams where the strategies disagree
    columns_to_compare = ['wins', 'points_scored', 'points_allowed']

    for col in columns_to_compare:
        print(f"\n{col}:")
        for team in results['priority']['data']['team'].unique():
            values = {}
            for strategy in strategies:
                df = results[strategy]['data']
                val = df[df['team'] == team][col].iloc[0]
                values[strategy] = val

            if len(set(values.values())) > 1:
                print(f"  {team}: {values}")

    return results


strategy_comparison = compare_resolution_strategies(
    conflicts_df, sources,
    {'NCAA': 1, 'ESPN': 2, 'SportsRef': 3}
)

Part 4: Handling Irreconcilable Conflicts

class IrreconcilableConflictHandler:
    """
    Handle conflicts that cannot be automatically resolved.
    """

    def __init__(self, threshold: float = 0.1):
        """
        threshold: If max difference > threshold * mean, conflict is irreconcilable
        """
        self.threshold = threshold
        self.irreconcilable = []
        self.flagged_for_review = []

    def classify_conflicts(self, conflicts: pd.DataFrame) -> Dict:
        """
        Classify conflicts into resolvable and irreconcilable.
        """
        resolvable = []
        irreconcilable = []

        for _, conflict in conflicts.iterrows():
            values = list(conflict['values'].values())
            mean_val = np.mean(values)
            max_diff = max(values) - min(values)

            if mean_val == 0:
                relative_diff = max_diff
            else:
                relative_diff = max_diff / abs(mean_val)

            if relative_diff > self.threshold:
                irreconcilable.append({
                    **conflict.to_dict(),
                    'relative_difference': relative_diff,
                    'reason': 'difference_too_large'
                })
            else:
                resolvable.append(conflict.to_dict())

        self.irreconcilable = irreconcilable

        return {
            'resolvable': pd.DataFrame(resolvable),
            'irreconcilable': pd.DataFrame(irreconcilable)
        }

    def create_manual_review_queue(self,
                                     irreconcilable: pd.DataFrame) -> pd.DataFrame:
        """
        Create a queue of items for manual review.
        """
        if len(irreconcilable) == 0:
            return pd.DataFrame()

        review_queue = []

        for _, row in irreconcilable.iterrows():
            review_queue.append({
                'entity': row['entity'],
                'column': row['column'],
                'values': str(row['values']),
                'difference': row['difference'],
                'relative_diff': row.get('relative_difference', 'N/A'),
                'priority': 'HIGH' if row.get('relative_difference', 0) > 0.2 else 'MEDIUM',
                'suggested_action': 'verify_with_official_source',
                'status': 'pending_review'
            })

        return pd.DataFrame(review_queue)


# Classify conflicts
handler = IrreconcilableConflictHandler(threshold=0.05)  # 5% threshold
classification = handler.classify_conflicts(conflicts_df)

print("\n" + "=" * 70)
print("CONFLICT CLASSIFICATION")
print("=" * 70)

print(f"\nResolvable conflicts: {len(classification['resolvable'])}")
print(f"Irreconcilable conflicts: {len(classification['irreconcilable'])}")

if len(classification['irreconcilable']) > 0:
    print("\nIrreconcilable Conflicts (require manual review):")
    print(classification['irreconcilable'][['entity', 'column', 'difference', 'relative_difference']])

    review_queue = handler.create_manual_review_queue(classification['irreconcilable'])
    print("\nManual Review Queue:")
    print(review_queue)
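
As a worked check on the 5% threshold, consider Georgia's win total: the three sources report 15, 15, and 14 wins, and the handler's relative-difference test flags the spread. A minimal calculation, mirroring the logic in classify_conflicts:

# Worked example of the relative-difference test for Georgia's wins
wins_reported = [15, 15, 14]                              # ESPN, NCAA, Sports Reference
max_diff = max(wins_reported) - min(wins_reported)        # 1
relative_diff = max_diff / abs(np.mean(wins_reported))    # about 0.068
print(f"Relative difference: {relative_diff:.3f}")        # exceeds the 0.05 threshold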

Part 5: Data Provenance and Audit Trail

class ProvenanceTracker:
    """
    Track the provenance of each data value in the reconciled dataset.
    """

    def __init__(self):
        self.provenance = {}

    def record_provenance(self,
                           entity: str,
                           column: str,
                           final_value: float,
                           source: str,
                           method: str,
                           original_values: Dict = None):
        """Record where a value came from."""
        key = (entity, column)
        self.provenance[key] = {
            'final_value': final_value,
            'source': source,
            'method': method,
            'original_values': original_values,
            'timestamp': pd.Timestamp.now().isoformat()
        }

    def get_provenance(self, entity: str, column: str) -> Dict:
        """Get provenance for a specific value."""
        return self.provenance.get((entity, column), None)

    def generate_audit_report(self) -> pd.DataFrame:
        """Generate complete audit trail."""
        records = []
        for (entity, column), info in self.provenance.items():
            records.append({
                'entity': entity,
                'column': column,
                **info
            })
        return pd.DataFrame(records)


def reconcile_with_provenance(conflicts: pd.DataFrame,
                                sources: Dict[str, pd.DataFrame],
                                source_priority: Dict[str, int]) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Reconcile data and track provenance.
    """
    tracker = ProvenanceTracker()
    reconciler = DataReconciler(source_priority)

    # Get base data
    priority_source = min(source_priority, key=source_priority.get)
    reconciled = sources[priority_source].copy()
    reconciled = reconciled.drop(columns=['source'], errors='ignore')

    # Record provenance for every cell taken from the base source;
    # cells involved in conflicts are re-recorded during resolution below
    for _, row in reconciled.iterrows():
        for col in reconciled.columns:
            if col == 'team':
                continue
            tracker.record_provenance(
                row['team'], col, row[col],
                priority_source, 'no_conflict', None
            )

    # Resolve conflicts with provenance
    for _, conflict in conflicts.iterrows():
        entity = conflict['entity']
        column = conflict['column']
        values = conflict['values']

        # Resolve
        resolved_value, method = reconciler.resolve_by_priority(values)

        # Update data
        mask = reconciled['team'] == entity
        reconciled.loc[mask, column] = resolved_value

        # Track provenance
        winning_source = method.replace('priority_', '')
        tracker.record_provenance(
            entity, column, resolved_value,
            winning_source, method, values
        )

    audit_trail = tracker.generate_audit_report()

    return reconciled, audit_trail


# Reconcile with full provenance
final_data, audit_trail = reconcile_with_provenance(
    conflicts_df, sources,
    {'NCAA': 1, 'ESPN': 2, 'SportsRef': 3}
)

print("\n" + "=" * 70)
print("FINAL RECONCILED DATA WITH PROVENANCE")
print("=" * 70)

print("\nReconciled Data:")
print(final_data)

print("\nAudit Trail (sample - conflict resolutions):")
conflict_resolutions = audit_trail[audit_trail['method'].str.startswith('priority')]
print(conflict_resolutions[['entity', 'column', 'final_value', 'source', 'method']])

Summary and Key Learnings

Conflict Types Encountered

  1. Small numeric differences: Rounding or timing differences (1-5 units)
  2. Larger discrepancies: Potential data entry errors or source differences
  3. Systematic differences: One source consistently higher or lower (a quick check is sketched below)
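
The third pattern can be checked directly by pairing two sources and looking at the direction of their cell-by-cell differences. The following is a minimal sketch, reusing espn_data and sportsref_data from Part 1; the choice of source pair and columns is illustrative.

# Check whether one source is consistently higher than another
stat_columns = ['points_scored', 'points_allowed', 'passing_yards',
                'rushing_yards', 'total_yards']

merged = espn_data.merge(sportsref_data, on='team', suffixes=('_espn', '_sr'))

for col in stat_columns:
    diffs = merged[f'{col}_espn'] - merged[f'{col}_sr']
    nonzero = diffs[diffs != 0]
    if len(nonzero) > 0 and (nonzero > 0).all():
        print(f"{col}: ESPN consistently higher (mean diff {nonzero.mean():+.1f})")
    elif len(nonzero) > 0 and (nonzero < 0).all():
        print(f"{col}: ESPN consistently lower (mean diff {nonzero.mean():+.1f})")
    else:
        print(f"{col}: no consistent direction ({len(nonzero)} cells differ)")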

Resolution Strategies Used

  Strategy   When to Use                  Pros                Cons
  Priority   Have authoritative source    Consistent, fast    Ignores other sources
  Majority   Multiple sources agree       Democratic          May not have majority
  Average    Sources equally trusted      Uses all data       May create false precision
  Weighted   Varying source quality       Balanced            Requires defining weights
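
To see the trade-offs on a concrete conflict, the four methods of DataReconciler from Part 3 can be applied to a single value dictionary. The numbers below are Alabama's points_scored as reported by the three sources in this case study:

# Resolve one conflicting cell with each strategy
values = {'ESPN': 534, 'NCAA': 535, 'SportsRef': 534}
rec = DataReconciler({'NCAA': 1, 'ESPN': 2, 'SportsRef': 3})

print(rec.resolve_by_priority(values))               # NCAA wins: 535
print(rec.resolve_by_majority(values, tolerance=0))  # ESPN and SportsRef agree: 534
print(rec.resolve_by_average(values))                # simple mean of all three values
print(rec.resolve_by_weighted_average(values))       # mean weighted toward NCAA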

Best Practices

  1. Always detect before resolving: Know what conflicts exist
  2. Analyze patterns: Systematic biases require investigation
  3. Track provenance: Know where final values came from
  4. Flag irreconcilable conflicts: Don't force bad resolutions
  5. Create audit trails: Support transparency and debugging

When Manual Review is Required

  • Relative differences > 10%
  • Critical statistics (wins, losses)
  • Historical data used in official records
  • Data that will be published externally
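
These criteria can be turned into an automatic flag. The sketch below is illustrative rather than definitive: the needs_manual_review helper, the CRITICAL_COLUMNS set, and the 10% cutoff are assumptions chosen to match the bullets above, and should be adapted to your project.

# Illustrative helper implementing the review criteria above
CRITICAL_COLUMNS = {'wins', 'games'}  # assumed set of critical statistics

def needs_manual_review(column: str, relative_diff: float,
                        is_official_record: bool = False,
                        will_be_published: bool = False) -> bool:
    """Return True if a reconciled value should be queued for human review."""
    return (
        relative_diff > 0.10              # relative difference above 10%
        or column in CRITICAL_COLUMNS     # critical statistics (wins, games here)
        or is_official_record
        or will_be_published
    )

print(needs_manual_review('passing_yards', 0.012))  # False: small, non-critical
print(needs_manual_review('wins', 0.012))           # True: critical statistic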

Exercises

  1. Add a new source with intentionally different values and resolve the conflicts.

  2. Implement a 'newest source wins' strategy that uses timestamps.

  3. Create a confidence score for each reconciled value based on source agreement.

  4. Build a web interface for the manual review queue.