Case Study 2: Reconciling Conflicting Data Sources
Overview
When collecting data from multiple sources, you'll inevitably encounter conflicts. Two sources might disagree on a player's statistics, a team's record, or a game's final score. This case study teaches you to identify, analyze, and resolve data conflicts systematically.
The Scenario
You're preparing historical data for a research project on college football performance. You've collected team statistics from three sources:
- ESPN: A sports media company's statistics API
- NCAA Official: The official NCAA statistics portal
- Sports Reference: A community-maintained sports database
Each source has different strengths and potential weaknesses. Your goal is to create a single authoritative dataset that resolves conflicts intelligently.
Part 1: Identifying Conflicts
Loading the Data
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple
# Source 1: ESPN Data
espn_data = pd.DataFrame({
'team': ['Alabama', 'Georgia', 'Ohio State', 'Michigan', 'Texas'],
'games': [14, 15, 13, 15, 13],
'wins': [12, 15, 11, 15, 12],
'points_scored': [534, 615, 456, 498, 478],
'points_allowed': [238, 185, 196, 168, 225],
'passing_yards': [3245, 3012, 2856, 2678, 3156],
'rushing_yards': [2456, 2678, 2234, 2890, 2012],
'total_yards': [5701, 5690, 5090, 5568, 5168],
'source': 'ESPN'
})
# Source 2: NCAA Official
ncaa_data = pd.DataFrame({
'team': ['Alabama', 'Georgia', 'Ohio State', 'Michigan', 'Texas'],
'games': [14, 15, 13, 15, 13],
'wins': [12, 15, 11, 15, 12],
'points_scored': [535, 615, 456, 498, 478], # Alabama differs by 1
'points_allowed': [238, 185, 196, 168, 225],
'passing_yards': [3248, 3012, 2856, 2680, 3156], # Small differences
'rushing_yards': [2456, 2678, 2234, 2890, 2012],
'total_yards': [5704, 5690, 5090, 5570, 5168], # Derived differences
'source': 'NCAA'
})
# Source 3: Sports Reference (has more discrepancies)
sportsref_data = pd.DataFrame({
'team': ['Alabama', 'Georgia', 'Ohio State', 'Michigan', 'Texas'],
'games': [14, 15, 13, 15, 13],
'wins': [12, 14, 11, 15, 11], # Georgia and Texas wins differ
'points_scored': [534, 610, 456, 498, 475], # Multiple differences
'points_allowed': [238, 190, 196, 168, 228], # Georgia and Texas differ
'passing_yards': [3245, 3008, 2860, 2678, 3150],
'rushing_yards': [2456, 2680, 2230, 2890, 2015],
'total_yards': [5701, 5688, 5090, 5568, 5165],
'source': 'SportsRef'
})
print("Data Sources Loaded:")
print(f" ESPN: {len(espn_data)} teams")
print(f" NCAA: {len(ncaa_data)} teams")
print(f" Sports Reference: {len(sportsref_data)} teams")
Building a Conflict Detection System
class DataConflictDetector:
"""
Detect conflicts between multiple data sources.
"""
def __init__(self, key_column: str = 'team'):
self.key_column = key_column
self.conflicts = []
    def add_source(self, name: str, df: pd.DataFrame):
        """Register a data source (optional convenience; the examples below pass sources straight to detect_conflicts)."""
        setattr(self, f'source_{name}', df)
def detect_conflicts(self,
sources: Dict[str, pd.DataFrame],
columns: List[str],
tolerance: Dict[str, float] = None) -> pd.DataFrame:
"""
Find conflicts across sources.
Parameters
----------
sources : dict
{source_name: DataFrame}
columns : list
Columns to check for conflicts
tolerance : dict
{column: max_allowed_difference}
Values within tolerance are not flagged as conflicts
Returns
-------
pd.DataFrame : Conflict report
"""
if tolerance is None:
tolerance = {}
conflicts = []
# Get all unique keys (teams)
all_keys = set()
for df in sources.values():
all_keys.update(df[self.key_column].unique())
# Check each entity
for key in sorted(all_keys):
# Get values from each source
source_values = {}
for source_name, df in sources.items():
key_data = df[df[self.key_column] == key]
if len(key_data) == 1:
source_values[source_name] = key_data.iloc[0]
# Compare each column
for col in columns:
col_values = {}
for source_name, row in source_values.items():
if col in row.index:
col_values[source_name] = row[col]
# Check for conflicts
if len(col_values) > 1:
values = list(col_values.values())
max_diff = max(values) - min(values)
col_tolerance = tolerance.get(col, 0)
if max_diff > col_tolerance:
conflicts.append({
'entity': key,
'column': col,
'values': col_values,
'difference': max_diff,
'tolerance': col_tolerance,
'conflict_type': 'value_mismatch'
})
self.conflicts = conflicts
return pd.DataFrame(conflicts)
# Detect conflicts
detector = DataConflictDetector(key_column='team')
sources = {
'ESPN': espn_data,
'NCAA': ncaa_data,
'SportsRef': sportsref_data
}
columns_to_check = ['games', 'wins', 'points_scored', 'points_allowed',
'passing_yards', 'rushing_yards', 'total_yards']
# Allow small tolerance for derived stats
tolerances = {
'passing_yards': 5,
'rushing_yards': 5,
'total_yards': 10
}
conflicts_df = detector.detect_conflicts(sources, columns_to_check, tolerances)
print("\n" + "=" * 70)
print("DETECTED CONFLICTS")
print("=" * 70)
print(f"\nTotal conflicts found: {len(conflicts_df)}")
if len(conflicts_df) > 0:
print("\nConflict Details:")
for _, row in conflicts_df.iterrows():
print(f"\n {row['entity']} - {row['column']}:")
print(f" Values: {row['values']}")
print(f" Difference: {row['difference']} (tolerance: {row['tolerance']})")
Part 2: Analyzing Conflict Patterns
def analyze_conflict_patterns(conflicts: pd.DataFrame,
sources: Dict[str, pd.DataFrame]) -> Dict:
"""
Analyze patterns in data conflicts.
Identifies:
- Which sources tend to agree/disagree
    - Which columns have the most conflicts
- Systematic biases (one source always higher/lower)
"""
if len(conflicts) == 0:
return {'message': 'No conflicts to analyze'}
analysis = {}
# Conflicts by column
analysis['by_column'] = conflicts['column'].value_counts().to_dict()
# Conflicts by entity
analysis['by_entity'] = conflicts['entity'].value_counts().to_dict()
# Source agreement matrix
source_names = list(sources.keys())
agreement_matrix = pd.DataFrame(
index=source_names,
columns=source_names,
data=0.0
)
    # Calculate agreement rates (computed only over entities/columns that appear in at least one conflict)
    columns_checked = list(conflicts['column'].unique())
    entities = set(conflicts['entity'].unique())
for s1 in source_names:
for s2 in source_names:
if s1 == s2:
agreement_matrix.loc[s1, s2] = 1.0
continue
agrees = 0
total_compared = 0
for entity in entities:
for col in columns_checked:
s1_data = sources[s1][sources[s1]['team'] == entity]
s2_data = sources[s2][sources[s2]['team'] == entity]
if len(s1_data) == 1 and len(s2_data) == 1:
v1 = s1_data[col].iloc[0]
v2 = s2_data[col].iloc[0]
total_compared += 1
if v1 == v2:
agrees += 1
if total_compared > 0:
agreement_matrix.loc[s1, s2] = agrees / total_compared
analysis['agreement_matrix'] = agreement_matrix
# Check for systematic biases
bias_analysis = {}
for col in columns_checked:
col_conflicts = conflicts[conflicts['column'] == col]
source_values_all = {}
for _, row in col_conflicts.iterrows():
for source, value in row['values'].items():
if source not in source_values_all:
source_values_all[source] = []
source_values_all[source].append(value)
# Calculate average value by source
source_means = {s: np.mean(v) for s, v in source_values_all.items()}
bias_analysis[col] = source_means
analysis['bias_analysis'] = bias_analysis
return analysis
pattern_analysis = analyze_conflict_patterns(conflicts_df, sources)
print("\n" + "=" * 70)
print("CONFLICT PATTERN ANALYSIS")
print("=" * 70)
print("\nConflicts by Column:")
for col, count in pattern_analysis['by_column'].items():
print(f" {col}: {count}")
print("\nConflicts by Team:")
for team, count in pattern_analysis['by_entity'].items():
print(f" {team}: {count}")
print("\nSource Agreement Matrix:")
print(pattern_analysis['agreement_matrix'].round(2))
print("\nBias Analysis (mean values by source for conflicting data):")
for col, source_means in pattern_analysis['bias_analysis'].items():
print(f"\n {col}:")
for source, mean_val in source_means.items():
print(f" {source}: {mean_val:.1f}")
Part 3: Conflict Resolution Strategies
class DataReconciler:
"""
Resolve conflicts between data sources using various strategies.
"""
def __init__(self, source_priority: Dict[str, int]):
"""
source_priority: Lower number = higher priority
Example: {'NCAA': 1, 'ESPN': 2, 'SportsRef': 3}
"""
self.source_priority = source_priority
self.resolution_log = []
def resolve_by_priority(self, values: Dict[str, float]) -> Tuple[float, str]:
"""Resolve conflict by using highest priority source."""
sorted_sources = sorted(
values.keys(),
key=lambda x: self.source_priority.get(x, 999)
)
winner = sorted_sources[0]
return values[winner], f'priority_{winner}'
def resolve_by_majority(self, values: Dict[str, float],
tolerance: float = 0) -> Tuple[float, str]:
"""Resolve by majority vote (within tolerance)."""
value_list = list(values.values())
source_list = list(values.keys())
# Group similar values
groups = []
for i, val in enumerate(value_list):
matched = False
for group in groups:
if abs(val - group['representative']) <= tolerance:
group['members'].append((source_list[i], val))
matched = True
break
if not matched:
groups.append({
'representative': val,
'members': [(source_list[i], val)]
})
# Find largest group
largest = max(groups, key=lambda g: len(g['members']))
if len(largest['members']) > len(values) / 2:
# Clear majority
avg_val = np.mean([m[1] for m in largest['members']])
member_names = [m[0] for m in largest['members']]
return avg_val, f"majority_{'+'.join(member_names)}"
else:
# No majority - fallback to priority
return self.resolve_by_priority(values)
def resolve_by_average(self, values: Dict[str, float]) -> Tuple[float, str]:
"""Resolve by averaging all values."""
avg = np.mean(list(values.values()))
return avg, 'average_all'
def resolve_by_weighted_average(self, values: Dict[str, float]) -> Tuple[float, str]:
"""Resolve by priority-weighted average."""
total_weight = 0
weighted_sum = 0
for source, value in values.items():
            # Lower priority number means higher priority and therefore a larger weight
priority = self.source_priority.get(source, 3)
weight = 1 / priority
weighted_sum += value * weight
total_weight += weight
result = weighted_sum / total_weight
return result, 'weighted_average'
def resolve_conflicts(self,
conflicts: pd.DataFrame,
sources: Dict[str, pd.DataFrame],
strategy: str = 'priority') -> pd.DataFrame:
"""
Resolve all conflicts and create unified dataset.
Parameters
----------
conflicts : pd.DataFrame
Detected conflicts
sources : dict
Source DataFrames
strategy : str
Resolution strategy: 'priority', 'majority', 'average', 'weighted'
Returns
-------
pd.DataFrame : Reconciled data
"""
# Start with highest priority source
priority_source = min(self.source_priority, key=self.source_priority.get)
reconciled = sources[priority_source].copy()
reconciled = reconciled.drop(columns=['source'], errors='ignore')
# Process each conflict
for _, conflict in conflicts.iterrows():
entity = conflict['entity']
column = conflict['column']
values = conflict['values']
# Apply resolution strategy
if strategy == 'priority':
resolved_value, method = self.resolve_by_priority(values)
elif strategy == 'majority':
resolved_value, method = self.resolve_by_majority(values, tolerance=5)
elif strategy == 'average':
resolved_value, method = self.resolve_by_average(values)
elif strategy == 'weighted':
resolved_value, method = self.resolve_by_weighted_average(values)
else:
raise ValueError(f"Unknown strategy: {strategy}")
            # Update reconciled data; cast integer columns to float first when the
            # resolved value is fractional (e.g. from the average/weighted strategies)
            if not float(resolved_value).is_integer():
                reconciled[column] = reconciled[column].astype(float)
            mask = reconciled['team'] == entity
            reconciled.loc[mask, column] = resolved_value
# Log resolution
self.resolution_log.append({
'entity': entity,
'column': column,
'original_values': values,
'resolved_value': resolved_value,
'method': method
})
return reconciled
def get_resolution_report(self) -> pd.DataFrame:
"""Generate report of all resolutions."""
return pd.DataFrame(self.resolution_log)
# Create reconciler with priority
reconciler = DataReconciler(
source_priority={
'NCAA': 1, # Official source = highest priority
'ESPN': 2, # Major media
'SportsRef': 3 # Community source
}
)
# Resolve using priority strategy
reconciled_priority = reconciler.resolve_conflicts(
conflicts_df, sources, strategy='priority'
)
print("\n" + "=" * 70)
print("RECONCILED DATA (Priority Strategy)")
print("=" * 70)
print(reconciled_priority)
print("\nResolution Log:")
resolution_report = reconciler.get_resolution_report()
print(resolution_report)
Comparing Resolution Strategies
def compare_resolution_strategies(conflicts: pd.DataFrame,
sources: Dict[str, pd.DataFrame],
source_priority: Dict[str, int]) -> Dict:
"""
Compare different resolution strategies on the same conflicts.
"""
strategies = ['priority', 'majority', 'average', 'weighted']
results = {}
for strategy in strategies:
reconciler = DataReconciler(source_priority)
reconciled = reconciler.resolve_conflicts(conflicts, sources, strategy)
results[strategy] = {
'data': reconciled,
'log': reconciler.get_resolution_report()
}
# Compare results
print("\n" + "=" * 70)
print("STRATEGY COMPARISON")
print("=" * 70)
# Find columns that differ between strategies
columns_to_compare = ['wins', 'points_scored', 'points_allowed']
for col in columns_to_compare:
print(f"\n{col}:")
for team in results['priority']['data']['team'].unique():
values = {}
for strategy in strategies:
df = results[strategy]['data']
val = df[df['team'] == team][col].iloc[0]
values[strategy] = val
if len(set(values.values())) > 1:
print(f" {team}: {values}")
return results
strategy_comparison = compare_resolution_strategies(
conflicts_df, sources,
{'NCAA': 1, 'ESPN': 2, 'SportsRef': 3}
)
Part 4: Handling Irreconcilable Conflicts
class IrreconcilableConflictHandler:
"""
Handle conflicts that cannot be automatically resolved.
"""
def __init__(self, threshold: float = 0.1):
"""
threshold: If max difference > threshold * mean, conflict is irreconcilable
"""
self.threshold = threshold
self.irreconcilable = []
self.flagged_for_review = []
def classify_conflicts(self, conflicts: pd.DataFrame) -> Dict:
"""
Classify conflicts into resolvable and irreconcilable.
"""
resolvable = []
irreconcilable = []
for _, conflict in conflicts.iterrows():
values = list(conflict['values'].values())
mean_val = np.mean(values)
max_diff = max(values) - min(values)
if mean_val == 0:
relative_diff = max_diff
else:
relative_diff = max_diff / abs(mean_val)
if relative_diff > self.threshold:
irreconcilable.append({
**conflict.to_dict(),
'relative_difference': relative_diff,
'reason': 'difference_too_large'
})
else:
resolvable.append(conflict.to_dict())
self.irreconcilable = irreconcilable
return {
'resolvable': pd.DataFrame(resolvable),
'irreconcilable': pd.DataFrame(irreconcilable)
}
def create_manual_review_queue(self,
irreconcilable: pd.DataFrame) -> pd.DataFrame:
"""
Create a queue of items for manual review.
"""
if len(irreconcilable) == 0:
return pd.DataFrame()
review_queue = []
for _, row in irreconcilable.iterrows():
review_queue.append({
'entity': row['entity'],
'column': row['column'],
'values': str(row['values']),
'difference': row['difference'],
'relative_diff': row.get('relative_difference', 'N/A'),
'priority': 'HIGH' if row.get('relative_difference', 0) > 0.2 else 'MEDIUM',
'suggested_action': 'verify_with_official_source',
'status': 'pending_review'
})
return pd.DataFrame(review_queue)
# Classify conflicts
handler = IrreconcilableConflictHandler(threshold=0.05) # 5% threshold
classification = handler.classify_conflicts(conflicts_df)
print("\n" + "=" * 70)
print("CONFLICT CLASSIFICATION")
print("=" * 70)
print(f"\nResolvable conflicts: {len(classification['resolvable'])}")
print(f"Irreconcilable conflicts: {len(classification['irreconcilable'])}")
if len(classification['irreconcilable']) > 0:
print("\nIrreconcilable Conflicts (require manual review):")
print(classification['irreconcilable'][['entity', 'column', 'difference', 'relative_difference']])
review_queue = handler.create_manual_review_queue(classification['irreconcilable'])
print("\nManual Review Queue:")
print(review_queue)
Part 5: Data Provenance and Audit Trail
class ProvenanceTracker:
"""
Track the provenance of each data value in the reconciled dataset.
"""
def __init__(self):
self.provenance = {}
def record_provenance(self,
entity: str,
column: str,
final_value: float,
source: str,
method: str,
original_values: Dict = None):
"""Record where a value came from."""
key = (entity, column)
self.provenance[key] = {
'final_value': final_value,
'source': source,
'method': method,
'original_values': original_values,
'timestamp': pd.Timestamp.now().isoformat()
}
def get_provenance(self, entity: str, column: str) -> Dict:
"""Get provenance for a specific value."""
return self.provenance.get((entity, column), None)
def generate_audit_report(self) -> pd.DataFrame:
"""Generate complete audit trail."""
records = []
for (entity, column), info in self.provenance.items():
records.append({
'entity': entity,
'column': column,
**info
})
return pd.DataFrame(records)
def reconcile_with_provenance(conflicts: pd.DataFrame,
sources: Dict[str, pd.DataFrame],
source_priority: Dict[str, int]) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Reconcile data and track provenance.
"""
tracker = ProvenanceTracker()
reconciler = DataReconciler(source_priority)
# Get base data
priority_source = min(source_priority, key=source_priority.get)
reconciled = sources[priority_source].copy()
reconciled = reconciled.drop(columns=['source'], errors='ignore')
# Record provenance for non-conflicting values
for _, row in reconciled.iterrows():
for col in reconciled.columns:
if col == 'team':
continue
tracker.record_provenance(
row['team'], col, row[col],
priority_source, 'no_conflict', None
)
# Resolve conflicts with provenance
for _, conflict in conflicts.iterrows():
entity = conflict['entity']
column = conflict['column']
values = conflict['values']
# Resolve
resolved_value, method = reconciler.resolve_by_priority(values)
# Update data
mask = reconciled['team'] == entity
reconciled.loc[mask, column] = resolved_value
# Track provenance
winning_source = method.replace('priority_', '')
tracker.record_provenance(
entity, column, resolved_value,
winning_source, method, values
)
audit_trail = tracker.generate_audit_report()
return reconciled, audit_trail
# Reconcile with full provenance
final_data, audit_trail = reconcile_with_provenance(
conflicts_df, sources,
{'NCAA': 1, 'ESPN': 2, 'SportsRef': 3}
)
print("\n" + "=" * 70)
print("FINAL RECONCILED DATA WITH PROVENANCE")
print("=" * 70)
print("\nReconciled Data:")
print(final_data)
print("\nAudit Trail (sample - conflict resolutions):")
conflict_resolutions = audit_trail[audit_trail['method'].str.startswith('priority')]
print(conflict_resolutions[['entity', 'column', 'final_value', 'source', 'method']])
Summary and Key Learnings
Conflict Types Encountered
- Small numeric differences: Rounding or timing differences (1-5 units)
- Larger discrepancies: Potential data entry errors or source differences
- Systematic differences: One source consistently higher/lower
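To make these categories concrete, here is a minimal sketch that buckets each detected conflict by its relative size, reusing the conflicts_df built in Part 1; the 1% and 5% cut-offs are illustrative assumptions, not fixed rules.
# Bucket each detected conflict by relative size (max difference / mean reported value).
# The 1% and 5% cut-offs below are assumptions to tune for your own data.
for _, c in conflicts_df.iterrows():
    vals = list(c['values'].values())
    mean_val = np.mean(vals)
    rel = (max(vals) - min(vals)) / abs(mean_val) if mean_val != 0 else float('inf')
    if rel <= 0.01:
        bucket = 'small (likely rounding or timing)'
    elif rel <= 0.05:
        bucket = 'moderate (worth a source check)'
    else:
        bucket = 'large (flag for manual review)'
    print(f"  {c['entity']:<12} {c['column']:<16} relative diff {rel:.3f} -> {bucket}")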
Resolution Strategies Used
| Strategy | When to Use | Pros | Cons |
|---|---|---|---|
| Priority | Have authoritative source | Consistent, fast | Ignores other sources |
| Majority | Multiple sources agree | Democratic | May not have majority |
| Average | Sources equally trusted | Uses all data | May create false precision |
| Weighted | Varying source quality | Balanced | Requires defining weights |
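To see the trade-offs in action, the short illustration below runs one real conflict from this dataset (Georgia's points_scored) through each strategy using the DataReconciler from Part 3; the values in the comments follow directly from the methods defined above.
# One conflict, four strategies: Georgia's points_scored as reported by each source
demo_values = {'ESPN': 615, 'NCAA': 615, 'SportsRef': 610}
demo = DataReconciler(source_priority={'NCAA': 1, 'ESPN': 2, 'SportsRef': 3})
print(demo.resolve_by_priority(demo_values))          # (615, 'priority_NCAA')
print(demo.resolve_by_majority(demo_values))          # (615.0, 'majority_ESPN+NCAA')
print(demo.resolve_by_average(demo_values))           # (613.33..., 'average_all')
print(demo.resolve_by_weighted_average(demo_values))  # (~614.1, 'weighted_average')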
Best Practices
- Always detect before resolving: Know what conflicts exist
- Analyze patterns: Systematic biases require investigation
- Track provenance: Know where final values came from
- Flag irreconcilable conflicts: Don't force bad resolutions
- Create audit trails: Support transparency and debugging
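These practices compose into a single workflow. Below is a minimal end-to-end sketch that chains the pieces built in this case study; run_reconciliation_pipeline is an illustrative name, not something defined earlier.
def run_reconciliation_pipeline(sources, columns, tolerances, priority):
    """Sketch: detect -> analyze -> classify -> resolve with provenance."""
    detector = DataConflictDetector(key_column='team')
    conflicts = detector.detect_conflicts(sources, columns, tolerances)   # detect before resolving
    patterns = analyze_conflict_patterns(conflicts, sources)              # check for systematic biases
    handler = IrreconcilableConflictHandler(threshold=0.05)
    split = handler.classify_conflicts(conflicts)                         # don't force bad resolutions
    resolved, audit = reconcile_with_provenance(                          # provenance + audit trail
        split['resolvable'], sources, priority
    )
    return resolved, audit, split['irreconcilable'], patterns

pipeline_data, pipeline_audit, pipeline_review, pipeline_patterns = run_reconciliation_pipeline(
    sources, columns_to_check, tolerances, {'NCAA': 1, 'ESPN': 2, 'SportsRef': 3}
)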
When Manual Review is Required
- Relative differences > 10%
- Critical statistics (wins, losses)
- Historical data used in official records
- Data that will be published externally
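Here is a sketch of how the first two rules might be applied programmatically; the 10% threshold and the set of "critical" columns are assumptions to adjust for your project, and the last two criteria remain judgment calls about how the data will be used.
CRITICAL_COLUMNS = {'wins', 'games'}  # assumed set of record-defining statistics

def needs_manual_review(conflict_row, relative_threshold=0.10):
    """Flag a detected conflict if it is large in relative terms or touches a critical stat."""
    vals = list(conflict_row['values'].values())
    mean_val = np.mean(vals)
    rel = (max(vals) - min(vals)) / abs(mean_val) if mean_val != 0 else float('inf')
    return rel > relative_threshold or conflict_row['column'] in CRITICAL_COLUMNS

review_items = conflicts_df[conflicts_df.apply(needs_manual_review, axis=1)]
print(review_items[['entity', 'column', 'difference']])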
Exercises
- Add a new source with intentionally different values and resolve the conflicts.
- Implement a 'newest source wins' strategy that uses timestamps (a starting sketch follows below).
- Create a confidence score for each reconciled value based on source agreement.
- Build a web interface for the manual review queue.
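As a starting point for the second exercise, here is a sketch (not a full solution) of a 'newest source wins' resolver; the last_updated timestamps below are invented purely for illustration.
# Hypothetical retrieval timestamps for each source (illustrative values only)
last_updated = {
    'ESPN': pd.Timestamp('2024-01-10'),
    'NCAA': pd.Timestamp('2024-01-12'),
    'SportsRef': pd.Timestamp('2024-01-08'),
}

def resolve_by_newest(values, timestamps):
    """'Newest source wins': return the value from the most recently updated source."""
    newest = max(values, key=lambda s: timestamps.get(s, pd.Timestamp.min))
    return values[newest], f'newest_{newest}'

print(resolve_by_newest({'ESPN': 615, 'NCAA': 615, 'SportsRef': 610}, last_updated))
# -> (615, 'newest_NCAA') with the timestamps above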