Modern football analytics extends beyond post-game analysis to real-time decision support during competition. Real-time systems process live data streams, update models continuously, and deliver actionable insights to coaching staffs within seconds...
In This Chapter
Chapter 26: Real-Time Analytics Systems
Introduction
Modern football analytics extends beyond post-game analysis to real-time decision support during competition. Real-time systems process live data streams, update models continuously, and deliver actionable insights to coaching staffs within seconds. This chapter explores the architecture, algorithms, and implementation challenges of building real-time analytics systems for college football.
Learning Objectives
By the end of this chapter, you will be able to:
- Design architectures for real-time sports analytics systems
- Implement streaming data processing pipelines
- Build live win probability and decision support models
- Create in-game dashboards with sub-second latency
- Handle data quality issues in real-time environments
- Deploy and monitor production analytics systems
26.1 Real-Time System Architecture
26.1.1 System Components Overview
REAL-TIME ANALYTICS ARCHITECTURE
================================
Data Sources:
┌─────────────────────────────────────────────────────────┐
│ Play-by-Play Feed   │ Tracking Data   │ External APIs   │
│ (JSON/XML)          │ (10-25 Hz)      │ (Weather, etc)  │
└──────────┬──────────┴────────┬────────┴────────┬────────┘
           │                   │                 │
           ▼                   ▼                 ▼
┌─────────────────────────────────────────────────────────┐
│                     INGESTION LAYER                     │
│   Message Queue (Kafka/RabbitMQ) + Stream Processing    │
└──────────────────────────┬──────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────┐
│                    PROCESSING LAYER                     │
│  Data Validation → Feature Engineering → Model Scoring  │
└──────────────────────────┬──────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────┐
│                      STORAGE LAYER                      │
│   In-Memory Cache (Redis) │ Time-Series DB (InfluxDB)   │
└──────────────────────────┬──────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────┐
│                     DELIVERY LAYER                      │
│     REST API │ WebSocket │ Dashboard │ Mobile Push      │
└─────────────────────────────────────────────────────────┘
26.1.2 Core System Implementation
import asyncio
import json
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable, Any
from datetime import datetime
from collections import deque
import threading
from queue import Queue, Empty
import numpy as np
@dataclass
class GameState:
    """Live snapshot of one game, updated in place as plays arrive."""

    game_id: str
    home_team: str
    away_team: str
    home_score: int = 0
    away_score: int = 0
    quarter: int = 1
    time_remaining: float = 900.0  # seconds left in the current quarter
    possession: str = ""  # team name currently holding the ball ("" = unknown)
    field_position: int = 25  # yards from own goal
    down: int = 1
    distance: int = 10
    is_final: bool = False
    last_updated: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict:
        """Export the game fields as a plain dict.

        ``last_updated`` is deliberately left out of the export.
        """
        exported_fields = (
            'game_id', 'home_team', 'away_team',
            'home_score', 'away_score',
            'quarter', 'time_remaining',
            'possession', 'field_position',
            'down', 'distance',
            'is_final',
        )
        return {name: getattr(self, name) for name in exported_fields}
@dataclass
class PlayEvent:
    """One play as delivered by an upstream data feed."""

    game_id: str
    play_id: str
    timestamp: datetime
    play_type: str  # e.g. 'rush' / 'pass' (the types RealTimeEngine acts on)
    description: str
    yards_gained: int = 0
    result: str = ""  # e.g. 'first_down' / 'touchdown'
    home_score: int = 0  # scoreboard value reported alongside the play
    away_score: int = 0
    raw_data: Dict = field(default_factory=dict)  # unparsed feed payload, one dict per instance
class RealTimeEngine:
    """Core real-time analytics engine.

    Producers call :meth:`ingest_event` (the underlying ``queue.Queue`` is
    thread-safe); a single background worker started by :meth:`start` drains
    the queue, updates the per-game :class:`GameState`, and invokes subscriber
    callbacks.
    """

    def __init__(self, config: Dict = None):
        self.config = config or {}
        self.game_states: Dict[str, GameState] = {}
        self.event_queue: Queue = Queue()
        self.subscribers: Dict[str, List[Callable]] = {}
        self.processing_thread: Optional[threading.Thread] = None
        self.running = False
        # Performance tracking: processing time (ms) of the most recent 1000 events.
        self.latency_history: deque = deque(maxlen=1000)
        self.events_processed = 0

    def start(self):
        """Start the background processing thread."""
        self.running = True
        self.processing_thread = threading.Thread(
            target=self._process_events,
            daemon=True,
        )
        self.processing_thread.start()
        print("Real-time engine started")

    def stop(self):
        """Signal the worker to stop and wait up to 5 seconds for it to exit."""
        self.running = False
        if self.processing_thread:
            self.processing_thread.join(timeout=5.0)
        print("Real-time engine stopped")

    def ingest_event(self, event: PlayEvent):
        """Stamp *event* with the ingest time and enqueue it for processing."""
        event.timestamp = datetime.now()
        self.event_queue.put(event)

    def _process_events(self):
        """Worker loop: handle queued events until :meth:`stop` is called."""
        while self.running:
            try:
                event = self.event_queue.get(timeout=0.1)
            except Empty:
                # Wake up periodically so a stop() request is noticed promptly.
                continue
            # perf_counter is monotonic and high-resolution, unlike time.time().
            start_time = time.perf_counter()
            try:
                self._handle_event(event)
            except Exception as e:
                # One bad event (or subscriber setup failure) must not kill the worker.
                print(f"Error processing event: {e}")
                continue
            latency = (time.perf_counter() - start_time) * 1000  # ms
            self.latency_history.append(latency)
            self.events_processed += 1

    def _handle_event(self, event: PlayEvent):
        """Apply one event to its game's state and notify subscribers."""
        game_id = event.game_id
        if game_id not in self.game_states:
            self.game_states[game_id] = GameState(
                game_id=game_id,
                home_team=event.raw_data.get('home_team', 'Home'),
                away_team=event.raw_data.get('away_team', 'Away'),
            )
        state = self.game_states[game_id]
        self._update_state(state, event)
        self._notify_subscribers(game_id, state, event)

    @staticmethod
    def _flip_possession(state: GameState):
        """Give the ball to the other team; leave an unknown possession untouched."""
        if state.possession == state.home_team:
            state.possession = state.away_team
        elif state.possession == state.away_team:
            state.possession = state.home_team

    def _update_state(self, state: GameState, event: PlayEvent):
        """Update scores, down, distance, field position and possession from *event*.

        Only 'rush' and 'pass' plays move the ball; every event refreshes the
        score and ``last_updated``.
        """
        state.home_score = event.home_score
        state.away_score = event.away_score
        state.last_updated = event.timestamp

        if event.play_type not in ('rush', 'pass'):
            return

        if event.result == 'touchdown':
            # The scoring team kicks off; the receiving team starts at its own 25.
            self._flip_possession(state)
            state.down = 1
            state.distance = 10
            state.field_position = 25
            return

        # Keep the ball on the field (0 = own goal line, 100 = opponent's).
        state.field_position = min(max(state.field_position + event.yards_gained, 0), 100)

        if event.result == 'first_down':
            state.down = 1
            state.distance = 10
            return

        state.distance -= event.yards_gained
        state.down += 1
        if state.down > 4:
            # Turnover on downs: the other team takes over at this spot,
            # re-measured from its own goal line.
            self._flip_possession(state)
            state.down = 1
            state.distance = 10
            state.field_position = 100 - state.field_position

    def subscribe(self, game_id: str, callback: Callable):
        """Register ``callback(state, event)`` to run after each update of *game_id*."""
        if game_id not in self.subscribers:
            self.subscribers[game_id] = []
        self.subscribers[game_id].append(callback)

    def _notify_subscribers(self, game_id: str, state: GameState, event: PlayEvent):
        """Invoke every subscriber of *game_id*; one failing callback doesn't block the rest."""
        for callback in self.subscribers.get(game_id, []):
            try:
                callback(state, event)
            except Exception as e:
                print(f"Subscriber error: {e}")

    def get_performance_stats(self) -> Dict:
        """Return processing statistics.

        The dict always contains the same keys; latency figures are 0.0 until
        the first event has been processed.
        """
        # Snapshot the deque once: the worker thread may append while we compute.
        latencies = list(self.latency_history)
        if latencies:
            avg_ms = float(np.mean(latencies))
            p95_ms = float(np.percentile(latencies, 95))
            max_ms = float(max(latencies))
        else:
            avg_ms = p95_ms = max_ms = 0.0
        return {
            'events_processed': self.events_processed,
            'avg_latency_ms': avg_ms,
            'p95_latency_ms': p95_ms,
            'max_latency_ms': max_ms,
            'queue_size': self.event_queue.qsize(),
        }
26.2 Streaming Data Processing
26.2.1 Data Feed Integration
import aiohttp
import asyncio
from abc import ABC, abstractmethod
class DataFeedAdapter(ABC):
    """Interface that every live data source implements.

    Concrete adapters are connected, then their ``stream_events`` output is
    iterated, then they are disconnected.
    """

    @abstractmethod
    async def connect(self):
        """Open the connection to the underlying data source."""

    @abstractmethod
    async def disconnect(self):
        """Close the connection to the underlying data source."""

    @abstractmethod
    async def stream_events(self):
        """Produce events from the underlying data source."""
class PlayByPlayFeedAdapter(DataFeedAdapter):
    """Poll a play-by-play JSON feed over HTTP and yield each play once."""

    def __init__(self, feed_url: str, poll_interval: float = 1.0):
        self.feed_url = feed_url
        self.poll_interval = poll_interval  # seconds between polls
        self.session: Optional[aiohttp.ClientSession] = None
        self.last_play_id: Optional[str] = None
        # Ids of every play already yielded. Each poll returns the feed's full
        # play list, so remembering only the last id would re-emit every
        # earlier play on every poll.
        self.seen_play_ids: set = set()
        self.running = False

    async def connect(self):
        """Open the HTTP session and enable polling."""
        self.session = aiohttp.ClientSession()
        self.running = True

    async def disconnect(self):
        """Stop polling and close the HTTP session."""
        self.running = False
        if self.session:
            await self.session.close()

    async def stream_events(self):
        """Poll the feed every ``poll_interval`` seconds and yield unseen plays.

        HTTP errors, non-200 responses and parse failures are logged and the
        feed is retried on the next poll.
        """
        while self.running:
            events = []
            try:
                # Read and parse the whole response before yielding so the HTTP
                # connection is not held open while consumers handle events.
                async with self.session.get(self.feed_url) as response:
                    if response.status == 200:
                        events = self._parse_feed(await response.json())
                    else:
                        print(f"Feed returned HTTP {response.status}")
            except Exception as e:
                print(f"Feed error: {e}")
            for event in events:
                if self._is_new_event(event):
                    self.seen_play_ids.add(event.play_id)
                    self.last_play_id = event.play_id
                    yield event
            await asyncio.sleep(self.poll_interval)

    def _parse_feed(self, data: Dict) -> List[PlayEvent]:
        """Convert one raw feed payload into PlayEvent objects (one per play)."""
        game_id = data.get('gameId', '')
        home_score = data.get('homeScore', 0)
        away_score = data.get('awayScore', 0)
        return [
            PlayEvent(
                game_id=game_id,
                play_id=play.get('playId', ''),
                timestamp=datetime.now(),
                play_type=play.get('playType', ''),
                description=play.get('description', ''),
                # Feeds may send an explicit null; treat it as no gain.
                yards_gained=play.get('yardsGained') or 0,
                result=play.get('result', ''),
                home_score=home_score,
                away_score=away_score,
                raw_data=play,
            )
            for play in data.get('plays') or []
        ]

    def _is_new_event(self, event: PlayEvent) -> bool:
        """Return True if this play id has not been yielded before."""
        return (event.play_id != self.last_play_id
                and event.play_id not in self.seen_play_ids)
class TrackingDataAdapter(DataFeedAdapter):
    """Adapter for high-frequency tracking data delivered over a WebSocket."""

    def __init__(self, websocket_url: str):
        self.websocket_url = websocket_url
        self.websocket = None
        self.running = False
        self.buffer: deque = deque(maxlen=100)  # not used by this class itself

    async def connect(self):
        """Open the WebSocket connection (``websockets`` is imported lazily)."""
        import websockets
        self.websocket = await websockets.connect(self.websocket_url)
        self.running = True

    async def disconnect(self):
        """Stop streaming and close the WebSocket."""
        self.running = False
        if self.websocket:
            await self.websocket.close()

    async def stream_events(self):
        """Yield parsed tracking frames until stopped or the connection fails.

        A receive timeout (1 s) just re-checks ``running``; a malformed or
        non-object message is logged and skipped rather than ending the stream;
        any other receive error ends the stream.
        """
        while self.running:
            try:
                message = await asyncio.wait_for(self.websocket.recv(), timeout=1.0)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"WebSocket error: {e}")
                break
            try:
                frame = json.loads(message)
            except ValueError as e:  # JSONDecodeError / bad UTF-8 are ValueErrors
                print(f"Skipping malformed tracking frame: {e}")
                continue
            if not isinstance(frame, dict):
                print("Skipping tracking message that is not a JSON object")
                continue
            yield self._parse_tracking_frame(frame)

    def _parse_tracking_frame(self, frame: Dict) -> Dict:
        """Map a raw camelCase frame to snake_case keys; absent values become None."""
        return {
            'frame_id': frame.get('frameId'),
            'timestamp': frame.get('timestamp'),
            'players': [
                {
                    'player_id': p.get('playerId'),
                    'x': p.get('x'),
                    'y': p.get('y'),
                    'speed': p.get('speed'),
                    'direction': p.get('direction'),
                }
                for p in frame.get('players', [])
            ],
            'ball': frame.get('ball', {}),
        }
class StreamProcessor:
    """Fan events from several feed adapters through processors into a RealTimeEngine."""

    def __init__(self, engine: RealTimeEngine):
        self.engine = engine
        self.adapters: List[DataFeedAdapter] = []
        self.processors: List[Callable] = []

    def add_adapter(self, adapter: DataFeedAdapter):
        """Add a data feed adapter to be started by :meth:`start`."""
        self.adapters.append(adapter)

    def add_processor(self, processor: Callable):
        """Add a transform applied, in registration order, to every event."""
        self.processors.append(processor)

    async def start(self):
        """Connect every adapter and process all streams concurrently.

        However processing ends (all streams finish, one fails, or this
        coroutine is cancelled), still-running stream tasks are cancelled and
        every adapter that was connected is disconnected.
        """
        connected = []
        tasks = []
        try:
            for adapter in self.adapters:
                await adapter.connect()
                connected.append(adapter)
                tasks.append(asyncio.create_task(self._process_stream(adapter)))
            await asyncio.gather(*tasks)
        finally:
            for task in tasks:
                task.cancel()  # no-op for tasks that already finished
            # Reap cancelled tasks so none is left pending.
            await asyncio.gather(*tasks, return_exceptions=True)
            for adapter in connected:
                try:
                    await adapter.disconnect()
                except Exception as e:
                    print(f"Error disconnecting adapter: {e}")

    async def _process_stream(self, adapter: DataFeedAdapter):
        """Run each event from *adapter* through the processors; forward PlayEvents to the engine."""
        async for event in adapter.stream_events():
            processed_event = event
            for processor in self.processors:
                processed_event = processor(processed_event)
            # Non-play output (e.g. tracking frame dicts) is not ingested.
            if isinstance(processed_event, PlayEvent):
                self.engine.ingest_event(processed_event)
26.2.2 Data Validation and Quality
from enum import Enum
from dataclasses import dataclass
class DataQualityLevel(Enum):
    """Coarse quality grade attached to a validation result."""

    HIGH = "high"  # no errors and no warnings
    MEDIUM = "medium"  # warnings only
    LOW = "low"  # not assigned by the validators in this chapter
    INVALID = "invalid"  # at least one error
@dataclass
class ValidationResult:
    """Outcome of validating one play event or tracking frame."""

    is_valid: bool  # the validators set this to True exactly when `errors` is empty
    quality_level: DataQualityLevel
    errors: List[str]  # problems that make the record unusable
    warnings: List[str]  # suspicious but tolerated issues
class RealTimeDataValidator:
    """Validate real-time play events and tracking frames for quality and consistency."""

    def __init__(self):
        # Custom rules: callables taking (event, state) and returning a dict with
        # optional 'errors' / 'warnings' lists (or None for "no findings").
        self.validation_rules: List[Callable] = []
        self.historical_stats: Dict[str, Dict] = {}

    def add_rule(self, rule: Callable):
        """Register a custom rule (see ``validation_rules``)."""
        self.validation_rules.append(rule)

    @staticmethod
    def _build_result(errors: List[str], warnings: List[str]) -> ValidationResult:
        """Grade findings: any error -> INVALID, warnings only -> MEDIUM, else HIGH."""
        if errors:
            quality = DataQualityLevel.INVALID
        elif warnings:
            quality = DataQualityLevel.MEDIUM
        else:
            quality = DataQualityLevel.HIGH
        return ValidationResult(
            is_valid=len(errors) == 0,
            quality_level=quality,
            errors=errors,
            warnings=warnings,
        )

    def validate_event(self, event: PlayEvent, state: GameState) -> ValidationResult:
        """Validate a play event against the current game state."""
        errors = []
        warnings = []

        # Basic field validation
        if not event.game_id:
            errors.append("Missing game_id")
        if not event.play_id:
            errors.append("Missing play_id")

        # Temporal validation
        if event.timestamp < state.last_updated:
            warnings.append("Event timestamp before last update")

        # Score consistency: scores never go down during a game.
        if event.home_score < state.home_score or event.away_score < state.away_score:
            errors.append("Score decreased - invalid")

        # Field position bounds. PlayEvent has no field_position attribute, so
        # the value can only come from the raw feed payload.
        if 'field_position' in event.raw_data:
            field_position = event.raw_data['field_position']
            if not isinstance(field_position, (int, float)) or not 0 <= field_position <= 100:
                errors.append("Invalid field position")

        # Custom rules; a rule returning None means "nothing to report".
        for rule in self.validation_rules:
            rule_result = rule(event, state) or {}
            errors.extend(rule_result.get('errors', []))
            warnings.extend(rule_result.get('warnings', []))

        return self._build_result(errors, warnings)

    def validate_tracking_frame(self, frame: Dict) -> ValidationResult:
        """Validate a parsed tracking frame (positions in yards, speed in yards/s)."""
        errors = []
        warnings = []

        if 'frame_id' not in frame:
            errors.append("Missing frame_id")
        if 'players' not in frame:
            errors.append("Missing players data")

        players = frame.get('players') or []
        for player in players:
            player_id = player.get('player_id')
            # Parsed frames store None for coordinates the feed omitted;
            # comparing None would raise TypeError, so report it instead.
            x, y = player.get('x'), player.get('y')
            if x is None or y is None:
                errors.append(f"Player {player_id} missing position")
            else:
                # Field bounds: 120 yards including end zones, 53.3 yards wide.
                if not (0 <= x <= 120):
                    errors.append(f"Player {player_id} x out of bounds")
                if not (0 <= y <= 53.3):
                    errors.append(f"Player {player_id} y out of bounds")

            # Speed sanity check (no one runs >15 yards/sec)
            speed = player.get('speed')
            if speed is not None and speed > 15:
                warnings.append(f"Unusually high speed for player {player_id}")

        # 22 players are on the field during a play.
        player_count = len(players)
        if player_count < 22:
            warnings.append(f"Only {player_count} players detected")
        elif player_count > 22:
            errors.append(f"Too many players detected: {player_count}")

        return self._build_result(errors, warnings)
class DataQualityMonitor:
    """Aggregate validation outcomes for monitoring.

    The validity rate and quality distribution cover only the most recent
    ``window_size`` results, while ``error_counts`` accumulates over the
    monitor's whole lifetime.
    """

    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.validation_history: deque = deque(maxlen=window_size)
        self.error_counts: Dict[str, int] = {}

    def record_validation(self, result: ValidationResult):
        """Add *result* to the window and tally each of its error messages."""
        self.validation_history.append(result)
        tally = self.error_counts
        for message in result.errors:
            tally.setdefault(message, 0)
            tally[message] += 1

    def get_quality_metrics(self) -> Dict:
        """Summarize the window; returns an empty dict when nothing was recorded."""
        window = self.validation_history
        total = len(window)
        if total == 0:
            return {}

        valid_total = 0
        distribution = {}
        for outcome in window:
            if outcome.is_valid:
                valid_total += 1
            level_name = outcome.quality_level.value
            distribution[level_name] = distribution.get(level_name, 0) + 1

        # Most frequent errors first; ties keep first-seen order (sorted is stable).
        ranked_errors = sorted(
            self.error_counts.items(),
            key=lambda item: item[1],
            reverse=True,
        )
        return {
            'valid_rate': valid_total / total,
            'quality_distribution': distribution,
            'top_errors': ranked_errors[:5],
        }
26.3 Live Win Probability Models
26.3.1 Real-Time Win Probability Engine
import numpy as np
from scipy.special import expit
import pickle
class LiveWinProbabilityEngine:
    """Real-time win probability calculation.

    Predictions come from a pickled classifier when one is loaded (anything
    exposing ``predict_proba``), otherwise from a hand-specified logistic model.
    """

    def __init__(self, model_path: Optional[str] = None):
        self.model = None
        self.feature_means: Dict[str, float] = {}
        self.feature_stds: Dict[str, float] = {}
        # Always install the fallback coefficients: if a loaded file carries no
        # 'model' entry, predictions use the logistic model and would otherwise
        # fail with AttributeError on self.coefficients.
        self._initialize_default_model()
        if model_path:
            self.load_model(model_path)

    def _initialize_default_model(self):
        """Install demonstration logistic-model coefficients."""
        # NOTE(review): a positive 'score_diff_x_time' weight makes a lead worth
        # *more* early in the game than late, the opposite of the usual effect;
        # confirm the sign before relying on these numbers. 'timeouts_diff' is
        # not produced by _extract_features, so it currently has no effect.
        self.coefficients = {
            'intercept': 0.0,
            'score_diff': 0.12,
            'time_remaining_pct': -0.02,
            'score_diff_x_time': 0.08,
            'field_position': 0.015,
            'is_home': 0.15,
            'timeouts_diff': 0.05,
        }

    def load_model(self, path: str):
        """Load a pickled dict with optional keys 'model', 'means' and 'stds'.

        SECURITY: ``pickle.load`` can execute arbitrary code embedded in the
        file. Only load model files from a trusted source.
        """
        with open(path, 'rb') as f:
            model_data = pickle.load(f)
        self.model = model_data.get('model')
        self.feature_means = model_data.get('means', {})
        self.feature_stds = model_data.get('stds', {})

    def calculate_win_probability(self, state: GameState, team: str) -> float:
        """Return the probability (0-1) that *team* wins from *state*."""
        features = self._extract_features(state, team)
        if self.model:
            feature_vector = self._create_feature_vector(features)
            prob = self.model.predict_proba([feature_vector])[0][1]
        else:
            prob = self._simple_model_prediction(features)
        return float(prob)

    def _extract_features(self, state: GameState, team: str) -> Dict:
        """Build model features from *team*'s perspective."""
        is_home = team == state.home_team
        if is_home:
            score_diff = state.home_score - state.away_score
        else:
            score_diff = state.away_score - state.home_score

        # Fraction of regulation still to play. Clamped so overtime
        # (quarter > 4) or odd clock values cannot leave [0, 1].
        quarter_seconds = 900  # 15 minutes
        total_game_seconds = quarter_seconds * 4
        elapsed = (state.quarter - 1) * quarter_seconds + (quarter_seconds - state.time_remaining)
        time_remaining_pct = min(max(1 - elapsed / total_game_seconds, 0.0), 1.0)

        # Field position from the team's perspective
        has_possession = (
            (is_home and state.possession == state.home_team)
            or (not is_home and state.possession == state.away_team)
        )
        field_pos = state.field_position if has_possession else 100 - state.field_position

        return {
            'score_diff': score_diff,
            'time_remaining_pct': time_remaining_pct,
            'score_diff_x_time': score_diff * time_remaining_pct,
            'field_position': field_pos,
            'is_home': 1 if is_home else 0,
            'has_possession': 1 if has_possession else 0,
            'down': state.down,
            'distance': state.distance,
            'quarter': state.quarter,
        }

    def _simple_model_prediction(self, features: Dict) -> float:
        """Logistic model over whichever coefficient names appear in *features*."""
        linear_pred = self.coefficients['intercept']
        for feature, coef in self.coefficients.items():
            if feature != 'intercept' and feature in features:
                linear_pred += coef * features[feature]
        return expit(linear_pred)

    def _create_feature_vector(self, features: Dict) -> np.ndarray:
        """Create the (optionally standardized) feature vector for the trained model."""
        feature_order = ['score_diff', 'time_remaining_pct', 'field_position',
                         'is_home', 'has_possession', 'down', 'distance']
        vector = []
        for feat in feature_order:
            value = features.get(feat, 0)
            if feat in self.feature_means:
                # A constant training feature has std 0; avoid dividing by it.
                std = self.feature_stds.get(feat, 1) or 1
                value = (value - self.feature_means[feat]) / std
            vector.append(value)
        return np.array(vector)
class WinProbabilityTracker:
    """Keep a per-game win-probability time series and derive WPA from it."""

    def __init__(self, wp_engine: LiveWinProbabilityEngine):
        self.engine = wp_engine
        self.wp_history: Dict[str, List[Dict]] = {}

    def update(self, state: GameState, event: Optional[PlayEvent] = None):
        """Score *state*, append the entry to its game's history, and return it."""
        series = self.wp_history.setdefault(state.game_id, [])
        home_probability = self.engine.calculate_win_probability(state, state.home_team)
        snapshot = {
            'timestamp': datetime.now().isoformat(),
            'quarter': state.quarter,
            'time_remaining': state.time_remaining,
            'home_wp': home_probability,
            'away_wp': 1 - home_probability,
            'home_score': state.home_score,
            'away_score': state.away_score,
            'play_id': (event.play_id if event else None),
        }
        series.append(snapshot)
        return snapshot

    def get_wp_added(self, game_id: str, play_id: str) -> Optional[float]:
        """Home-team WP change produced by *play_id*, or None if it has no predecessor."""
        series = self.wp_history.get(game_id, [])
        for before, after in zip(series, series[1:]):
            if after['play_id'] == play_id:
                return after['home_wp'] - before['home_wp']
        return None

    def get_game_chart_data(self, game_id: str) -> List[Dict]:
        """Return the raw WP series (the stored list itself) for charting."""
        return self.wp_history.get(game_id, [])

    def get_key_plays(self, game_id: str, threshold: float = 0.10) -> List[Dict]:
        """Entries whose home-WP swing is at least *threshold*, biggest swing first."""
        series = self.wp_history.get(game_id, [])
        swings = []
        for before, after in zip(series, series[1:]):
            delta = after['home_wp'] - before['home_wp']
            if abs(delta) >= threshold:
                swings.append({**after, 'wpa': delta})
        swings.sort(key=lambda play: abs(play['wpa']), reverse=True)
        return swings
26.3.2 Decision Support System
class LiveDecisionSupport:
    """Real-time fourth-down decision support for the coaching staff.

    Each option (go for it, field goal, punt) is scored by the expected win
    probability of the hypothetical states it can lead to, weighted by simple
    heuristic success probabilities.
    """

    # Game clock assumed to run off on any fourth-down play, in seconds.
    PLAY_DURATION_SECONDS = 6
    # Minimum field position (yards from own goal) at which a field goal is
    # offered: 117 - 60 = a 57-yard attempt under _estimate_fg_prob's geometry.
    FG_RANGE_MIN_FIELD_POSITION = 60

    def __init__(self, wp_engine: LiveWinProbabilityEngine):
        self.wp_engine = wp_engine
        # Placeholders for learned models; the estimators below are heuristics.
        self.fourth_down_model = None
        self.field_goal_model = None

    def analyze_fourth_down(self, state: GameState, team: str) -> Dict:
        """Compare fourth-down options for *team* (assumed to have the ball).

        Returns a dict with one entry per option ('go_for_it', 'punt', and
        'field_goal' when in range), each holding 'expected_wp', 'wp_gain' and
        an option-specific estimate, plus 'recommendation' (the option with the
        highest expected WP) and 'confidence' (its margin over the runner-up).
        """
        current_wp = self.wp_engine.calculate_win_probability(state, team)
        options = {}

        go_for_it_wp = self._simulate_go_for_it(state, team)
        options['go_for_it'] = {
            'expected_wp': go_for_it_wp,
            'wp_gain': go_for_it_wp - current_wp,
            'conversion_prob': self._estimate_conversion_prob(state),
        }

        if state.field_position >= self.FG_RANGE_MIN_FIELD_POSITION:
            fg_wp = self._simulate_field_goal(state, team)
            options['field_goal'] = {
                'expected_wp': fg_wp,
                'wp_gain': fg_wp - current_wp,
                'make_prob': self._estimate_fg_prob(state),
            }

        punt_wp = self._simulate_punt(state, team)
        options['punt'] = {
            'expected_wp': punt_wp,
            'wp_gain': punt_wp - current_wp,
            'expected_net_yards': self._estimate_punt_yards(state),
        }

        # Rank only the real options, before the summary keys are added.
        # sorted() is stable, so ties keep insertion order (same winner as max()).
        ranked = sorted(options.items(), key=lambda item: item[1]['expected_wp'], reverse=True)
        best_name, best = ranked[0]
        options['recommendation'] = best_name
        options['confidence'] = (
            best['expected_wp'] - ranked[1][1]['expected_wp'] if len(ranked) > 1 else 0
        )
        return options

    def _derive_state(self, state: GameState, possession: str, field_position: float,
                      home_score: Optional[int] = None,
                      away_score: Optional[int] = None) -> GameState:
        """Hypothetical post-play state: same game, clock advanced, 1st-and-10.

        The clock is advanced by PLAY_DURATION_SECONDS but never below zero.
        """
        return GameState(
            game_id=state.game_id,
            home_team=state.home_team,
            away_team=state.away_team,
            home_score=state.home_score if home_score is None else home_score,
            away_score=state.away_score if away_score is None else away_score,
            quarter=state.quarter,
            time_remaining=max(state.time_remaining - self.PLAY_DURATION_SECONDS, 0),
            possession=possession,
            field_position=field_position,
            down=1,
            distance=10,
        )

    @staticmethod
    def _other_team(state: GameState) -> str:
        """The team that would take over from the current possessor."""
        return state.away_team if state.possession == state.home_team else state.home_team

    def _simulate_go_for_it(self, state: GameState, team: str) -> float:
        """Expected WP of going for it: conversion vs. turnover on downs."""
        conv_prob = self._estimate_conversion_prob(state)
        success_state = self._derive_state(
            state,
            possession=state.possession,
            field_position=min(state.field_position + state.distance, 100),
        )
        # Turnover on downs: the opponent takes over at the line of scrimmage.
        fail_state = self._derive_state(
            state,
            possession=self._other_team(state),
            field_position=100 - state.field_position,
        )
        success_wp = self.wp_engine.calculate_win_probability(success_state, team)
        fail_wp = self.wp_engine.calculate_win_probability(fail_state, team)
        return conv_prob * success_wp + (1 - conv_prob) * fail_wp

    def _simulate_field_goal(self, state: GameState, team: str) -> float:
        """Expected WP of a field-goal attempt kicked by *team*."""
        fg_prob = self._estimate_fg_prob(state)
        # Make: +3 for the kicking team, opponent starts at its 25 after the kickoff.
        if team == state.home_team:
            make_state = self._derive_state(state, possession=state.away_team,
                                            field_position=25,
                                            home_score=state.home_score + 3)
        else:
            make_state = self._derive_state(state, possession=state.home_team,
                                            field_position=25,
                                            away_score=state.away_score + 3)
        # Miss: opponent gets the ball at the line of scrimmage, or its 20 if deeper.
        miss_state = self._derive_state(
            state,
            possession=self._other_team(state),
            field_position=max(100 - state.field_position, 20),
        )
        make_wp = self.wp_engine.calculate_win_probability(make_state, team)
        miss_wp = self.wp_engine.calculate_win_probability(miss_state, team)
        return fg_prob * make_wp + (1 - fg_prob) * miss_wp

    def _simulate_punt(self, state: GameState, team: str) -> float:
        """Expected WP after a punt of the expected net distance."""
        expected_net = self._estimate_punt_yards(state)
        punt_state = self._derive_state(
            state,
            possession=self._other_team(state),
            field_position=max(100 - state.field_position - expected_net, 20),
        )
        return self.wp_engine.calculate_win_probability(punt_state, team)

    def _estimate_conversion_prob(self, state: GameState) -> float:
        """Heuristic 4th-down conversion probability by yards to go."""
        if state.distance <= 1:
            return 0.70
        elif state.distance <= 3:
            return 0.55
        elif state.distance <= 5:
            return 0.45
        else:
            return max(0.30 - (state.distance - 5) * 0.03, 0.15)

    def _estimate_fg_prob(self, state: GameState) -> float:
        """Heuristic field-goal make probability by kick distance."""
        # Kick distance = yards to the goal line (100 - field_position)
        # + 10 yards of end zone + ~7 yards of snap/hold depth.
        fg_distance = 117 - state.field_position
        if fg_distance <= 30:
            return 0.95
        elif fg_distance <= 40:
            return 0.85
        elif fg_distance <= 50:
            return 0.70
        elif fg_distance <= 55:
            return 0.50
        else:
            return max(0.30 - (fg_distance - 55) * 0.05, 0.10)

    def _estimate_punt_yards(self, state: GameState) -> float:
        """Expected net punt yards: about 40, shorter near the opponent's end zone.

        Never negative (the short-field formula would go below zero inside the
        opponent's 10).
        """
        if state.field_position >= 60:
            return max(min(40, 100 - state.field_position - 10), 0)
        return 40
26.4 In-Game Dashboards
26.4.1 Dashboard Data Service
from typing import Callable
import json
class DashboardDataService:
    """Assemble game state and derived analytics into JSON-ready dashboard payloads."""

    QUARTER_SECONDS = 900
    REGULATION_QUARTERS = 4

    def __init__(self, engine: RealTimeEngine, wp_tracker: WinProbabilityTracker):
        self.engine = engine
        self.wp_tracker = wp_tracker
        self.connected_clients: Dict[str, List] = {}  # game_id -> websocket connections (unused here)

    def get_game_snapshot(self, game_id: str) -> Dict:
        """Current state, win probability, recent chart data and key plays for *game_id*.

        Returns ``{'error': 'Game not found'}`` for an unknown game.
        """
        state = self.engine.game_states.get(game_id)
        if not state:
            return {'error': 'Game not found'}
        wp_data = self.wp_tracker.get_game_chart_data(game_id)
        latest_wp = wp_data[-1] if wp_data else None
        return {
            'game_state': state.to_dict(),
            'win_probability': {
                'home': latest_wp['home_wp'] if latest_wp else 0.5,
                'away': latest_wp['away_wp'] if latest_wp else 0.5,
            },
            'wp_chart_data': wp_data[-50:],  # Last 50 data points
            'key_plays': self.wp_tracker.get_key_plays(game_id)[:5],
            'performance': self.engine.get_performance_stats(),
            'timestamp': datetime.now().isoformat(),
        }

    def get_live_metrics(self, game_id: str) -> Dict:
        """Derived in-game metrics for *game_id*; empty dict for an unknown game."""
        state = self.engine.game_states.get(game_id)
        if not state:
            return {}
        return {
            'scoring_pace': self._calculate_scoring_pace(state),
            'time_of_possession': self._estimate_time_of_possession(game_id),
            'momentum': self._calculate_momentum(game_id),
            'leverage_index': self._calculate_leverage_index(state),
        }

    def _calculate_scoring_pace(self, state: GameState) -> Dict:
        """Project final scores by extrapolating the current scoring rate.

        Once regulation has elapsed (including overtime) the projection is the
        actual score, so it never falls below points already scored.
        """
        elapsed_quarters = state.quarter - 1 + (self.QUARTER_SECONDS - state.time_remaining) / self.QUARTER_SECONDS
        if elapsed_quarters <= 0:
            home_pace = away_pace = 0
        elif elapsed_quarters >= self.REGULATION_QUARTERS:
            home_pace, away_pace = state.home_score, state.away_score
        else:
            home_pace = state.home_score / elapsed_quarters * self.REGULATION_QUARTERS
            away_pace = state.away_score / elapsed_quarters * self.REGULATION_QUARTERS
        return {
            'home_projected': round(home_pace),
            'away_projected': round(away_pace),
        }

    def _calculate_momentum(self, game_id: str) -> float:
        """Home-team momentum in [-1, 1] from the WP change over the last 5 entries."""
        wp_history = self.wp_tracker.wp_history.get(game_id, [])
        if len(wp_history) < 5:
            return 0
        recent = wp_history[-5:]
        wp_change = recent[-1]['home_wp'] - recent[0]['home_wp']
        # Scale so a 20-point WP swing saturates the range.
        return max(min(wp_change * 5, 1), -1)

    def _calculate_leverage_index(self, state: GameState) -> float:
        """How important the current situation is: closer games and later clocks score higher."""
        score_diff = abs(state.home_score - state.away_score)

        # Fraction of regulation elapsed in [0, 1]; grows as the game goes on,
        # so late-game situations get more leverage.
        total_seconds = self.QUARTER_SECONDS * self.REGULATION_QUARTERS
        elapsed = (state.quarter - 1) * self.QUARTER_SECONDS + (self.QUARTER_SECONDS - state.time_remaining)
        game_progress = min(max(elapsed / total_seconds, 0.0), 1.0)

        # Base leverage decreases with score differential
        if score_diff == 0:
            base = 2.0
        elif score_diff <= 3:
            base = 1.5
        elif score_diff <= 7:
            base = 1.0
        elif score_diff <= 14:
            base = 0.6
        else:
            base = 0.3

        # Scale from 1x at kickoff to 2x at the end of regulation.
        return base * (1 + game_progress)

    def _estimate_time_of_possession(self, game_id: str) -> Dict:
        """Placeholder: always reports an even 50/50 split."""
        return {
            'home': 0.50,
            'away': 0.50,
        }
class WebSocketServer:
    """WebSocket server pushing real-time dashboard updates, one channel per game."""

    def __init__(self, data_service: DashboardDataService):
        self.data_service = data_service
        self.connections: Dict[str, set] = {}  # game_id -> set of websockets

    async def handler(self, websocket, path=None):
        """Serve one client subscribed to the game named by the URL path.

        ``path`` is optional because newer ``websockets`` releases call
        handlers with the connection only; the path is then read from the
        connection object.
        """
        if path is None:
            request = getattr(websocket, 'request', None)
            path = getattr(request, 'path', None) or getattr(websocket, 'path', '') or ''
        game_id = path.strip('/')

        clients = self.connections.setdefault(game_id, set())
        clients.add(websocket)
        try:
            snapshot = self.data_service.get_game_snapshot(game_id)
            await websocket.send(json.dumps({
                'type': 'snapshot',
                'data': snapshot,
            }))
            async for message in websocket:
                try:
                    msg_data = json.loads(message)
                except ValueError:
                    # Ignore malformed client messages instead of dropping the connection.
                    continue
                if not isinstance(msg_data, dict):
                    continue
                if msg_data.get('type') == 'request_metrics':
                    metrics = self.data_service.get_live_metrics(game_id)
                    await websocket.send(json.dumps({
                        'type': 'metrics',
                        'data': metrics,
                    }))
        finally:
            clients.discard(websocket)
            # Drop empty per-game sets so finished games don't accumulate.
            if not clients and self.connections.get(game_id) is clients:
                del self.connections[game_id]

    async def broadcast_update(self, game_id: str, update: Dict):
        """Send *update* to every client of *game_id*; clients whose send fails are dropped."""
        # Copy: handlers may add/remove clients while sends are in flight.
        clients = list(self.connections.get(game_id, ()))
        if not clients:
            return
        message = json.dumps({
            'type': 'update',
            'data': update,
        })
        results = await asyncio.gather(
            *(ws.send(message) for ws in clients),
            return_exceptions=True,
        )
        # One closed socket must not abort delivery to the others.
        live = self.connections.get(game_id)
        for ws, result in zip(clients, results):
            if isinstance(result, Exception) and live is not None:
                live.discard(ws)
26.4.2 Dashboard UI Components
# Flask-based dashboard backend
from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit, join_room
class DashboardApp:
    """Flask + Socket.IO application serving the analytics dashboard."""

    def __init__(self, data_service: DashboardDataService, cors_allowed_origins="*"):
        self.app = Flask(__name__)
        # "*" (the default) accepts Socket.IO connections from any origin; pass
        # an explicit origin or list of origins when serving beyond a trusted network.
        self.socketio = SocketIO(self.app, cors_allowed_origins=cors_allowed_origins)
        self.data_service = data_service
        self._setup_routes()
        self._setup_socket_handlers()

    @staticmethod
    def _extract_game_id(data):
        """Return the non-empty 'game_id' from a client payload, or None."""
        if isinstance(data, dict):
            return data.get('game_id') or None
        return None

    def _setup_routes(self):
        """Register the HTTP routes."""

        @self.app.route('/')
        def index():
            return render_template('dashboard.html')

        @self.app.route('/api/games')
        def list_games():
            games = list(self.data_service.engine.game_states.keys())
            return jsonify({'games': games})

        @self.app.route('/api/game/<game_id>')
        def get_game(game_id):
            snapshot = self.data_service.get_game_snapshot(game_id)
            # Consistent with /decision: unknown games are a 404, not a 200.
            if 'error' in snapshot:
                return jsonify(snapshot), 404
            return jsonify(snapshot)

        @self.app.route('/api/game/<game_id>/decision')
        def get_decision(game_id):
            state = self.data_service.engine.game_states.get(game_id)
            if not state:
                return jsonify({'error': 'Game not found'}), 404
            # The analysis is from the possessing team's perspective; without a
            # known possessor the numbers would be meaningless.
            if state.possession not in (state.home_team, state.away_team):
                return jsonify({'error': 'Possession unknown'}), 409
            decision_support = LiveDecisionSupport(
                self.data_service.wp_tracker.engine
            )
            analysis = decision_support.analyze_fourth_down(state, state.possession)
            return jsonify(analysis)

    def _setup_socket_handlers(self):
        """Register the Socket.IO event handlers."""

        @self.socketio.on('join')
        def on_join(data):
            game_id = self._extract_game_id(data)
            if game_id is None:
                emit('game_error', {'error': 'game_id is required'})
                return
            join_room(game_id)
            # Send initial data
            emit('game_update', self.data_service.get_game_snapshot(game_id))

        @self.socketio.on('request_update')
        def on_request_update(data):
            game_id = self._extract_game_id(data)
            if game_id is None:
                emit('game_error', {'error': 'game_id is required'})
                return
            emit('game_update', self.data_service.get_game_snapshot(game_id))

    def broadcast_to_game(self, game_id: str, data: Dict):
        """Broadcast *data* to every client in *game_id*'s room."""
        self.socketio.emit('game_update', data, room=game_id)

    def run(self, host='0.0.0.0', port=5000):
        """Run the dashboard server (blocking)."""
        self.socketio.run(self.app, host=host, port=port)
<!-- templates/dashboard.html -->
<!DOCTYPE html>
<html>
<head>
    <title>Live Football Analytics Dashboard</title>
    <!-- Pinned to the v4 major: the chart options below use v3+ syntax. -->
    <script src="https://cdn.jsdelivr.net/npm/chart.js@4"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 0;
            padding: 20px;
            background: #1a1a2e;
            color: white;
        }
        .dashboard {
            display: grid;
            grid-template-columns: 1fr 1fr;
            gap: 20px;
            max-width: 1400px;
            margin: 0 auto;
        }
        .card {
            background: #16213e;
            border-radius: 10px;
            padding: 20px;
        }
        .scoreboard {
            grid-column: span 2;
            display: flex;
            justify-content: center;
            align-items: center;
            gap: 50px;
        }
        .team-score {
            text-align: center;
        }
        .team-name {
            font-size: 1.5rem;
            margin-bottom: 10px;
        }
        .score {
            font-size: 4rem;
            font-weight: bold;
        }
        .win-prob {
            font-size: 1.2rem;
            color: #4ecca3;
        }
        .wp-chart {
            height: 300px;
        }
        .game-info {
            display: flex;
            justify-content: space-around;
        }
        .info-item {
            text-align: center;
        }
        .key-plays {
            max-height: 300px;
            overflow-y: auto;
        }
        .play-item {
            padding: 10px;
            border-bottom: 1px solid #333;
        }
        .positive { color: #4ecca3; }
        .negative { color: #e74c3c; }
    </style>
</head>
<body>
    <div class="dashboard">
        <div class="card scoreboard">
            <div class="team-score">
                <div class="team-name" id="away-team">Away</div>
                <div class="score" id="away-score">0</div>
                <div class="win-prob" id="away-wp">50%</div>
            </div>
            <div class="game-status">
                <div id="quarter">Q1</div>
                <div id="time">15:00</div>
                <div id="possession"></div>
            </div>
            <div class="team-score">
                <div class="team-name" id="home-team">Home</div>
                <div class="score" id="home-score">0</div>
                <div class="win-prob" id="home-wp">50%</div>
            </div>
        </div>
        <div class="card">
            <h3>Win Probability</h3>
            <div class="wp-chart">
                <canvas id="wp-chart"></canvas>
            </div>
        </div>
        <div class="card">
            <h3>Game Situation</h3>
            <div class="game-info">
                <div class="info-item">
                    <div>Down & Distance</div>
                    <div id="down-distance">1st & 10</div>
                </div>
                <div class="info-item">
                    <div>Field Position</div>
                    <div id="field-pos">Own 25</div>
                </div>
                <div class="info-item">
                    <div>Leverage Index</div>
                    <div id="leverage">1.0</div>
                </div>
            </div>
        </div>
        <div class="card">
            <h3>Key Plays</h3>
            <div class="key-plays" id="key-plays"></div>
        </div>
        <div class="card">
            <h3>Decision Support</h3>
            <div id="decision-support">
                <p>Fourth down analysis available when applicable</p>
            </div>
        </div>
    </div>
    <script>
        // Joins the Socket.IO room for the game named in ?game=<id>
        // (falling back to 'default') and re-renders the panels from every
        // 'game_update' payload.
        const socket = io();
        const gameId = new URLSearchParams(window.location.search).get('game') || 'default';
        let wpChart;

        // Create the (initially empty) home win-probability line chart.
        function initChart() {
            const ctx = document.getElementById('wp-chart').getContext('2d');
            wpChart = new Chart(ctx, {
                type: 'line',
                data: {
                    labels: [],
                    datasets: [{
                        label: 'Home Win Probability',
                        data: [],
                        borderColor: '#4ecca3',
                        fill: true,
                        backgroundColor: 'rgba(78, 204, 163, 0.1)'
                    }]
                },
                options: {
                    responsive: true,
                    maintainAspectRatio: false,
                    scales: {
                        y: {
                            min: 0,
                            max: 1,
                            // Round so float artifacts never show up as e.g. "30.000000000000004%".
                            ticks: { callback: v => Math.round(v * 100) + '%' }
                        }
                    }
                }
            });
        }

        // Render one 'game_update' payload (game_state, win_probability and,
        // when present, wp_chart_data and key_plays).
        function updateDashboard(data) {
            const state = data.game_state;
            const wp = data.win_probability;
            // Scores
            document.getElementById('home-team').textContent = state.home_team;
            document.getElementById('away-team').textContent = state.away_team;
            document.getElementById('home-score').textContent = state.home_score;
            document.getElementById('away-score').textContent = state.away_score;
            // Win probabilities
            document.getElementById('home-wp').textContent = (wp.home * 100).toFixed(1) + '%';
            document.getElementById('away-wp').textContent = (wp.away * 100).toFixed(1) + '%';
            // Game situation
            document.getElementById('quarter').textContent = 'Q' + state.quarter;
            document.getElementById('time').textContent = formatTime(state.time_remaining);
            document.getElementById('down-distance').textContent =
                ordinal(state.down) + ' & ' + state.distance;
            // Chart
            if (data.wp_chart_data) {
                wpChart.data.labels = data.wp_chart_data.map((_, i) => i);
                wpChart.data.datasets[0].data = data.wp_chart_data.map(d => d.home_wp);
                wpChart.update('none');
            }
            // Key plays: a payload without key_plays clears the list instead of
            // throwing, and entries are built as DOM nodes (not innerHTML) so
            // server-supplied values are never parsed as markup.
            const keyPlaysDiv = document.getElementById('key-plays');
            keyPlaysDiv.replaceChildren(...(data.key_plays || []).map(renderKeyPlay));
        }

        // Build the list entry for one key play: its WPA, quarter and clock.
        function renderKeyPlay(play) {
            const item = document.createElement('div');
            item.className = 'play-item';
            const wpa = document.createElement('span');
            wpa.className = play.wpa >= 0 ? 'positive' : 'negative';
            wpa.textContent = (play.wpa * 100).toFixed(1) + '% WPA';
            item.append(wpa, ' - Q' + play.quarter + ' ' + formatTime(play.time_remaining));
            return item;
        }

        // 1 -> "1st", 2 -> "2nd", 3 -> "3rd", 4 -> "4th", 11 -> "11th", ...
        function ordinal(n) {
            const s = ['th', 'st', 'nd', 'rd'];
            const v = n % 100;
            return n + (s[(v - 20) % 10] || s[v] || s[0]);
        }

        // Seconds -> "M:SS".
        function formatTime(seconds) {
            const m = Math.floor(seconds / 60);
            const s = Math.floor(seconds % 60);
            return m + ':' + s.toString().padStart(2, '0');
        }

        // (Re)join the game's room on every connect, including reconnects.
        socket.on('connect', () => {
            socket.emit('join', { game_id: gameId });
        });
        socket.on('game_update', updateDashboard);

        initChart();
    </script>
</body>
</html>
26.5 Production Deployment
26.5.1 System Monitoring
import logging
from prometheus_client import Counter, Histogram, Gauge, start_http_server
class MetricsCollector:
    """Define and expose the Prometheus metrics of the analytics service.

    Covers event throughput and latency, event-queue depth, validation
    errors, a per-game data-quality score and per-game WebSocket connection
    counts.
    """

    def __init__(self, registry=None):
        """Create all metrics.

        Args:
            registry: Optional prometheus_client ``CollectorRegistry`` to
                register the metrics with. ``None`` (the default) keeps the
                previous behavior of using prometheus_client's global default
                registry. Pass a fresh registry to build more than one
                collector per process (e.g. in tests): registering the same
                metric names twice in one registry raises ``ValueError``.
        """
        self.registry = registry
        # Forward `registry` only when one was given: passing registry=None
        # to a prometheus_client metric means "do not register at all".
        reg_kwargs = {} if registry is None else {'registry': registry}
        # Event processing metrics
        self.events_processed = Counter(
            'events_processed_total',
            'Total events processed',
            ['game_id', 'event_type'],
            **reg_kwargs
        )
        self.processing_latency = Histogram(
            'event_processing_latency_seconds',
            'Event processing latency',
            buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0],
            **reg_kwargs
        )
        self.queue_size = Gauge(
            'event_queue_size',
            'Current event queue size',
            **reg_kwargs
        )
        # Data quality metrics
        self.validation_errors = Counter(
            'validation_errors_total',
            'Total validation errors',
            ['error_type'],
            **reg_kwargs
        )
        self.data_quality = Gauge(
            'data_quality_score',
            'Current data quality score',
            ['game_id'],
            **reg_kwargs
        )
        # Connection metrics
        self.active_connections = Gauge(
            'active_websocket_connections',
            'Number of active WebSocket connections',
            ['game_id'],
            **reg_kwargs
        )

    def start_metrics_server(self, port: int = 8000):
        """Start the Prometheus HTTP exporter for this collector's registry."""
        if self.registry is None:
            start_http_server(port)
        else:
            start_http_server(port, registry=self.registry)
        logging.info("Metrics server started on port %d", port)
class SystemMonitor:
    """Evaluate service health and alert rules from engine statistics."""

    def __init__(self, metrics: "MetricsCollector"):
        """Store the metrics collector and install the default alert rules.

        Args:
            metrics: Collector whose metrics this monitor reports alongside.
        """
        self.metrics = metrics
        self.logger = logging.getLogger('SystemMonitor')
        # Install the default rules up front so check_alerts() works even if
        # setup_alerts() is never called explicitly; calling setup_alerts()
        # again simply resets the rules.
        self.setup_alerts()

    def check_health(self, engine: "RealTimeEngine") -> Dict:
        """Build a health report from engine stats and process memory.

        Returns:
            ``{'status': 'healthy' | 'degraded', 'checks': {...},
            'metrics': {...}}``. ``checks`` holds one message per failed
            check (queue backlog > 1000, p95 latency > 100 ms,
            RSS > 2 GB); any failure sets ``status`` to ``'degraded'``.
        """
        import psutil

        health = {
            'status': 'healthy',
            'checks': {}
        }
        # Event processing
        stats = engine.get_performance_stats()
        if stats.get('queue_size', 0) > 1000:
            health['status'] = 'degraded'
            health['checks']['queue'] = 'Queue backlog detected'
        if stats.get('p95_latency_ms', 0) > 100:
            health['status'] = 'degraded'
            health['checks']['latency'] = 'High latency detected'
        # Memory usage of this process
        memory = psutil.Process().memory_info()
        if memory.rss > 2 * 1024 * 1024 * 1024:  # 2GB
            health['status'] = 'degraded'
            health['checks']['memory'] = 'High memory usage'
        health['metrics'] = {
            'events_processed': stats.get('events_processed', 0),
            'avg_latency_ms': stats.get('avg_latency_ms', 0),
            'queue_size': stats.get('queue_size', 0),
            'memory_mb': memory.rss / (1024 * 1024)
        }
        return health

    def setup_alerts(self):
        """(Re)install the default alert rules on ``self.alert_rules``.

        Each rule's ``condition`` takes a stats dict and returns True when
        the alert should fire; missing stats default to non-alerting values.
        """
        self.alert_rules = [
            {
                'name': 'high_latency',
                'condition': lambda s: s.get('avg_latency_ms', 0) > 100,
                'severity': 'warning',
                'message': 'Event processing latency exceeds 100ms'
            },
            {
                'name': 'queue_backlog',
                'condition': lambda s: s.get('queue_size', 0) > 500,
                'severity': 'critical',
                'message': 'Event queue backlog exceeds 500'
            },
            {
                'name': 'data_quality',
                'condition': lambda s: s.get('valid_rate', 1) < 0.95,
                'severity': 'warning',
                'message': 'Data quality below 95%'
            }
        ]

    def check_alerts(self, stats: Dict) -> List[Dict]:
        """Return (and log) one alert dict per rule whose condition holds.

        Alerts come back in rule order, each with ``name``, ``severity``,
        ``message`` and a local-time ISO-8601 ``timestamp``.
        """
        # Local import: nothing in this snippet imports datetime at module level.
        from datetime import datetime

        triggered = []
        for rule in self.alert_rules:
            if not rule['condition'](stats):
                continue
            alert = {
                'name': rule['name'],
                'severity': rule['severity'],
                'message': rule['message'],
                'timestamp': datetime.now().isoformat()
            }
            triggered.append(alert)
            self.logger.warning("Alert triggered: %s", alert)
        return triggered
26.5.2 Deployment Configuration
# Docker configuration for the analytics service image.
# The HEALTHCHECK polls the dashboard's /health route using the image's own
# Python interpreter: python:*-slim images do not ship curl, so a curl-based
# check would always mark the container unhealthy.
DOCKERFILE = """
FROM python:3.9-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Expose ports (5000: dashboard HTTP/Socket.IO, 8000: Prometheus metrics)
EXPOSE 5000 8000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \\
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:5000/health', timeout=5)" || exit 1
# Run application
CMD ["python", "-m", "app.main"]
"""
# Docker Compose configuration for a local single-node stack: the analytics
# engine plus Redis, a single-broker Kafka (with ZooKeeper), Prometheus and
# Grafana. The Confluent images are pinned to 7.x because Confluent
# Platform 8.x runs Kafka in KRaft mode only and rejects
# KAFKA_ZOOKEEPER_CONNECT, so ":latest" would break this layout.
DOCKER_COMPOSE = """
version: '3.8'
services:
  analytics-engine:
    build: .
    ports:
      - "5000:5000"
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379
      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
    depends_on:
      - redis
      - kafka
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 4G
        reservations:
          cpus: '1'
          memory: 2G
  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Single broker: internal topics cannot use the default replication
      # factor of 3, or consumer-group offsets/transactions never work.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    depends_on:
      - prometheus
volumes:
  redis_data:
"""
class DeploymentManager:
    """Produce deployment settings and replica-scaling recommendations.

    Recognized ``config`` keys (all optional): ``replicas`` (default 2),
    ``domain``, ``min_replicas`` (default 2), ``max_replicas`` (default 10)
    and ``current_replicas`` (default 2).
    """

    def __init__(self, config: Dict):
        self.config = config

    def generate_deployment_config(self) -> Dict:
        """Return the container, autoscaling and networking settings.

        The autoscaling bounds come from ``min_replicas``/``max_replicas``,
        the same keys ``get_scaling_recommendations`` clamps to, so both
        agree on the replica range.
        """
        return {
            'docker': {
                'image': 'football-analytics:latest',
                'replicas': self.config.get('replicas', 2),
                'resources': {
                    'limits': {'cpu': '2', 'memory': '4Gi'},
                    'requests': {'cpu': '500m', 'memory': '1Gi'}
                }
            },
            'autoscaling': {
                'enabled': True,
                'min_replicas': self.config.get('min_replicas', 2),
                'max_replicas': self.config.get('max_replicas', 10),
                'target_cpu_utilization': 70,
                'target_memory_utilization': 80
            },
            'networking': {
                'ingress': {
                    'host': self.config.get('domain', 'analytics.example.com'),
                    'tls': True
                },
                'service': {
                    'type': 'ClusterIP',
                    'ports': [
                        {'name': 'http', 'port': 5000},
                        {'name': 'metrics', 'port': 8000}
                    ]
                }
            }
        }

    def get_scaling_recommendations(self, metrics: Dict) -> Dict:
        """Recommend scaling one replica up or down from current load.

        Scale up when ``avg_latency_ms`` > 50 or ``queue_size`` > 100.
        Scale down only when both are reported and low (< 10 each); a
        metric that is missing never counts as evidence of idleness. The
        target is clamped to ``[min_replicas, max_replicas]``. If the clamp
        leaves no move in the triggered direction, the action stays
        ``'none'``.

        Returns:
            ``{'action', 'reason'}``, plus ``'new_replicas'`` when the
            action is ``'scale_up'`` or ``'scale_down'``.
        """
        recommendations = {
            'action': 'none',
            'reason': ''
        }
        avg_latency = metrics.get('avg_latency_ms')
        queue_size = metrics.get('queue_size')
        current = self.config.get('current_replicas', 2)

        overloaded = (
            (avg_latency is not None and avg_latency > 50)
            or (queue_size is not None and queue_size > 100)
        )
        idle = (
            avg_latency is not None and queue_size is not None
            and avg_latency < 10 and queue_size < 10
        )

        if overloaded:
            target = min(self.config.get('max_replicas', 10), current + 1)
            if target > current:
                recommendations['action'] = 'scale_up'
                recommendations['reason'] = 'High latency or queue backlog'
                recommendations['new_replicas'] = target
            else:
                recommendations['reason'] = (
                    'High latency or queue backlog, but already at max_replicas'
                )
        elif idle:
            target = max(self.config.get('min_replicas', 2), current - 1)
            if target < current:
                recommendations['action'] = 'scale_down'
                recommendations['reason'] = 'Low utilization'
                recommendations['new_replicas'] = target
            else:
                recommendations['reason'] = (
                    'Low utilization, but already at min_replicas'
                )
        return recommendations
Summary
Real-time analytics systems enable split-second decision support during football games. Key components include:
- Stream Processing: Ingest and process live data feeds with sub-second latency
- Data Validation: Ensure data quality in real-time environments
- Live Models: Win probability and decision support updated continuously
- Dashboard Delivery: WebSocket-based updates to coaching staff
- Production Operations: Monitoring, scaling, and reliability
The challenge is balancing speed with accuracy while maintaining system reliability under game-day loads.
Key Takeaways
- Real-time systems require event-driven architectures
- Data validation is critical when processing live feeds
- Win probability models must update in milliseconds
- WebSockets enable real-time dashboard updates
- Production systems need comprehensive monitoring and alerting
- Horizontal scaling handles variable game-day loads