2 min read

Modern football analytics extends beyond post-game analysis to real-time decision support during competition. Real-time systems process live data streams, update models continuously, and deliver actionable insights to coaching staffs within seconds...

Chapter 26: Real-Time Analytics Systems

Introduction

Modern football analytics extends beyond post-game analysis to real-time decision support during competition. Real-time systems process live data streams, update models continuously, and deliver actionable insights to coaching staffs within seconds. This chapter explores the architecture, algorithms, and implementation challenges of building real-time analytics systems for college football.

Learning Objectives

By the end of this chapter, you will be able to:

  1. Design architectures for real-time sports analytics systems
  2. Implement streaming data processing pipelines
  3. Build live win probability and decision support models
  4. Create in-game dashboards with sub-second latency
  5. Handle data quality issues in real-time environments
  6. Deploy and monitor production analytics systems

26.1 Real-Time System Architecture

26.1.1 System Components Overview

REAL-TIME ANALYTICS ARCHITECTURE
================================

Data Sources:
┌─────────────────────────────────────────────────────────┐
│  Play-by-Play Feed  │  Tracking Data  │  External APIs  │
│      (JSON/XML)     │   (10-25 Hz)    │ (Weather, etc)  │
└──────────┬──────────┴────────┬────────┴────────┬────────┘
           │                   │                 │
           ▼                   ▼                 ▼
┌─────────────────────────────────────────────────────────┐
│                    INGESTION LAYER                      │
│   Message Queue (Kafka/RabbitMQ) + Stream Processing    │
└──────────────────────────┬──────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────┐
│                   PROCESSING LAYER                      │
│  Data Validation → Feature Engineering → Model Scoring  │
└──────────────────────────┬──────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────┐
│                    STORAGE LAYER                        │
│   In-Memory Cache (Redis)  │  Time-Series DB (InfluxDB) │
└──────────────────────────┬──────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────┐
│                    DELIVERY LAYER                       │
│  REST API  │  WebSocket  │  Dashboard  │  Mobile Push   │
└─────────────────────────────────────────────────────────┘

26.1.2 Core System Implementation

import asyncio
import json
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable, Any
from datetime import datetime
from collections import deque
import threading
from queue import Queue, Empty
import numpy as np

@dataclass
class GameState:
    """Current state of a football game."""
    game_id: str
    home_team: str
    away_team: str
    home_score: int = 0
    away_score: int = 0
    quarter: int = 1
    time_remaining: float = 900.0  # seconds in quarter
    possession: str = ""
    field_position: int = 25  # yards from own goal
    down: int = 1
    distance: int = 10
    is_final: bool = False
    last_updated: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict:
        return {
            'game_id': self.game_id,
            'home_team': self.home_team,
            'away_team': self.away_team,
            'home_score': self.home_score,
            'away_score': self.away_score,
            'quarter': self.quarter,
            'time_remaining': self.time_remaining,
            'possession': self.possession,
            'field_position': self.field_position,
            'down': self.down,
            'distance': self.distance,
            'is_final': self.is_final
        }


@dataclass
class PlayEvent:
    """Single play event from data feed."""
    game_id: str
    play_id: str
    timestamp: datetime
    play_type: str
    description: str
    yards_gained: int = 0
    result: str = ""
    home_score: int = 0
    away_score: int = 0
    raw_data: Dict = field(default_factory=dict)


class RealTimeEngine:
    """Core real-time analytics engine."""

    def __init__(self, config: Dict = None):
        self.config = config or {}
        self.game_states: Dict[str, GameState] = {}
        self.event_queue: Queue = Queue()
        self.subscribers: Dict[str, List[Callable]] = {}
        self.processing_thread: Optional[threading.Thread] = None
        self.running = False

        # Performance tracking
        self.latency_history: deque = deque(maxlen=1000)
        self.events_processed = 0

    def start(self):
        """Start the real-time processing engine."""
        self.running = True
        self.processing_thread = threading.Thread(
            target=self._process_events,
            daemon=True
        )
        self.processing_thread.start()
        print("Real-time engine started")

    def stop(self):
        """Stop the engine."""
        self.running = False
        if self.processing_thread:
            self.processing_thread.join(timeout=5.0)
        print("Real-time engine stopped")

    def ingest_event(self, event: PlayEvent):
        """Ingest a new play event."""
        event.timestamp = datetime.now()
        self.event_queue.put(event)

    def _process_events(self):
        """Main event processing loop."""
        while self.running:
            try:
                event = self.event_queue.get(timeout=0.1)
                start_time = time.time()

                # Process the event
                self._handle_event(event)

                # Track latency
                latency = (time.time() - start_time) * 1000  # ms
                self.latency_history.append(latency)
                self.events_processed += 1

            except Empty:
                continue
            except Exception as e:
                print(f"Error processing event: {e}")

    def _handle_event(self, event: PlayEvent):
        """Process a single event."""
        game_id = event.game_id

        # Update game state
        if game_id not in self.game_states:
            self.game_states[game_id] = GameState(
                game_id=game_id,
                home_team=event.raw_data.get('home_team', 'Home'),
                away_team=event.raw_data.get('away_team', 'Away')
            )

        state = self.game_states[game_id]
        self._update_state(state, event)

        # Notify subscribers
        self._notify_subscribers(game_id, state, event)

    def _update_state(self, state: GameState, event: PlayEvent):
        """Update game state from event."""
        state.home_score = event.home_score
        state.away_score = event.away_score
        state.last_updated = event.timestamp

        # Parse play result to update down/distance/field position
        if event.play_type in ['rush', 'pass']:
            if event.result == 'first_down':
                state.down = 1
                state.distance = 10
                state.field_position += event.yards_gained
            elif event.result == 'touchdown':
                state.down = 1
                state.distance = 10
                state.field_position = 25
            else:
                state.field_position += event.yards_gained
                state.distance -= event.yards_gained
                state.down += 1

                if state.down > 4:
                    # Turnover on downs
                    state.down = 1
                    state.distance = 10
                    state.field_position = 100 - state.field_position

    def subscribe(self, game_id: str, callback: Callable):
        """Subscribe to game updates."""
        if game_id not in self.subscribers:
            self.subscribers[game_id] = []
        self.subscribers[game_id].append(callback)

    def _notify_subscribers(self, game_id: str, state: GameState, event: PlayEvent):
        """Notify all subscribers of a game update."""
        for callback in self.subscribers.get(game_id, []):
            try:
                callback(state, event)
            except Exception as e:
                print(f"Subscriber error: {e}")

    def get_performance_stats(self) -> Dict:
        """Get engine performance statistics."""
        if not self.latency_history:
            return {'avg_latency_ms': 0, 'max_latency_ms': 0}

        return {
            'events_processed': self.events_processed,
            'avg_latency_ms': np.mean(self.latency_history),
            'p95_latency_ms': np.percentile(list(self.latency_history), 95),
            'max_latency_ms': max(self.latency_history),
            'queue_size': self.event_queue.qsize()
        }

26.2 Streaming Data Processing

26.2.1 Data Feed Integration

import aiohttp
import asyncio
from abc import ABC, abstractmethod

class DataFeedAdapter(ABC):
    """Abstract base class for data feed adapters."""

    @abstractmethod
    async def connect(self):
        """Connect to the data source."""
        pass

    @abstractmethod
    async def disconnect(self):
        """Disconnect from the data source."""
        pass

    @abstractmethod
    async def stream_events(self):
        """Stream events from the data source."""
        pass


class PlayByPlayFeedAdapter(DataFeedAdapter):
    """Adapter for play-by-play JSON/XML feeds."""

    def __init__(self, feed_url: str, poll_interval: float = 1.0):
        self.feed_url = feed_url
        self.poll_interval = poll_interval
        self.session: Optional[aiohttp.ClientSession] = None
        self.last_play_id: Optional[str] = None
        self.running = False

    async def connect(self):
        """Initialize HTTP session."""
        self.session = aiohttp.ClientSession()
        self.running = True

    async def disconnect(self):
        """Close HTTP session."""
        self.running = False
        if self.session:
            await self.session.close()

    async def stream_events(self):
        """Poll feed and yield new events."""
        while self.running:
            try:
                async with self.session.get(self.feed_url) as response:
                    if response.status == 200:
                        data = await response.json()
                        events = self._parse_feed(data)

                        for event in events:
                            if self._is_new_event(event):
                                yield event
                                self.last_play_id = event.play_id

            except Exception as e:
                print(f"Feed error: {e}")

            await asyncio.sleep(self.poll_interval)

    def _parse_feed(self, data: Dict) -> List[PlayEvent]:
        """Parse raw feed data into PlayEvent objects."""
        events = []

        for play in data.get('plays', []):
            event = PlayEvent(
                game_id=data.get('gameId', ''),
                play_id=play.get('playId', ''),
                timestamp=datetime.now(),
                play_type=play.get('playType', ''),
                description=play.get('description', ''),
                yards_gained=play.get('yardsGained', 0),
                result=play.get('result', ''),
                home_score=data.get('homeScore', 0),
                away_score=data.get('awayScore', 0),
                raw_data=play
            )
            events.append(event)

        return events

    def _is_new_event(self, event: PlayEvent) -> bool:
        """Check if event is new."""
        return event.play_id != self.last_play_id


class TrackingDataAdapter(DataFeedAdapter):
    """Adapter for high-frequency tracking data streams."""

    def __init__(self, websocket_url: str):
        self.websocket_url = websocket_url
        self.websocket = None
        self.running = False
        self.buffer: deque = deque(maxlen=100)

    async def connect(self):
        """Connect to WebSocket stream."""
        import websockets
        self.websocket = await websockets.connect(self.websocket_url)
        self.running = True

    async def disconnect(self):
        """Close WebSocket connection."""
        self.running = False
        if self.websocket:
            await self.websocket.close()

    async def stream_events(self):
        """Stream tracking frames."""
        while self.running:
            try:
                message = await asyncio.wait_for(
                    self.websocket.recv(),
                    timeout=1.0
                )
                frame = json.loads(message)
                yield self._parse_tracking_frame(frame)

            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"WebSocket error: {e}")
                break

    def _parse_tracking_frame(self, frame: Dict) -> Dict:
        """Parse tracking frame data."""
        return {
            'frame_id': frame.get('frameId'),
            'timestamp': frame.get('timestamp'),
            'players': [
                {
                    'player_id': p.get('playerId'),
                    'x': p.get('x'),
                    'y': p.get('y'),
                    'speed': p.get('speed'),
                    'direction': p.get('direction')
                }
                for p in frame.get('players', [])
            ],
            'ball': frame.get('ball', {})
        }


class StreamProcessor:
    """Process incoming data streams."""

    def __init__(self, engine: RealTimeEngine):
        self.engine = engine
        self.adapters: List[DataFeedAdapter] = []
        self.processors: List[Callable] = []

    def add_adapter(self, adapter: DataFeedAdapter):
        """Add a data feed adapter."""
        self.adapters.append(adapter)

    def add_processor(self, processor: Callable):
        """Add a data processor function."""
        self.processors.append(processor)

    async def start(self):
        """Start processing all streams."""
        tasks = []
        for adapter in self.adapters:
            await adapter.connect()
            tasks.append(asyncio.create_task(self._process_stream(adapter)))

        await asyncio.gather(*tasks)

    async def _process_stream(self, adapter: DataFeedAdapter):
        """Process events from a single stream."""
        async for event in adapter.stream_events():
            # Apply processors
            processed_event = event
            for processor in self.processors:
                processed_event = processor(processed_event)

            # Send to engine
            if isinstance(processed_event, PlayEvent):
                self.engine.ingest_event(processed_event)

26.2.2 Data Validation and Quality

from enum import Enum
from dataclasses import dataclass

class DataQualityLevel(Enum):
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INVALID = "invalid"


@dataclass
class ValidationResult:
    """Result of data validation."""
    is_valid: bool
    quality_level: DataQualityLevel
    errors: List[str]
    warnings: List[str]


class RealTimeDataValidator:
    """Validate real-time data for quality and consistency."""

    def __init__(self):
        self.validation_rules: List[Callable] = []
        self.historical_stats: Dict[str, Dict] = {}

    def add_rule(self, rule: Callable):
        """Add a validation rule."""
        self.validation_rules.append(rule)

    def validate_event(self, event: PlayEvent, state: GameState) -> ValidationResult:
        """Validate a play event."""
        errors = []
        warnings = []

        # Basic field validation
        if not event.game_id:
            errors.append("Missing game_id")
        if not event.play_id:
            errors.append("Missing play_id")

        # Temporal validation
        if event.timestamp < state.last_updated:
            warnings.append("Event timestamp before last update")

        # Score consistency
        if event.home_score < state.home_score or event.away_score < state.away_score:
            errors.append("Score decreased - invalid")

        # Field position bounds
        if hasattr(event, 'field_position'):
            if event.raw_data.get('field_position', 0) < 0 or \
               event.raw_data.get('field_position', 0) > 100:
                errors.append("Invalid field position")

        # Run custom rules
        for rule in self.validation_rules:
            rule_result = rule(event, state)
            errors.extend(rule_result.get('errors', []))
            warnings.extend(rule_result.get('warnings', []))

        # Determine quality level
        if errors:
            quality = DataQualityLevel.INVALID
        elif warnings:
            quality = DataQualityLevel.MEDIUM
        else:
            quality = DataQualityLevel.HIGH

        return ValidationResult(
            is_valid=len(errors) == 0,
            quality_level=quality,
            errors=errors,
            warnings=warnings
        )

    def validate_tracking_frame(self, frame: Dict) -> ValidationResult:
        """Validate tracking data frame."""
        errors = []
        warnings = []

        # Check for required fields
        if 'frame_id' not in frame:
            errors.append("Missing frame_id")
        if 'players' not in frame:
            errors.append("Missing players data")

        # Validate player positions
        for player in frame.get('players', []):
            x, y = player.get('x', 0), player.get('y', 0)

            # Field bounds check
            if not (0 <= x <= 120):
                errors.append(f"Player {player.get('player_id')} x out of bounds")
            if not (0 <= y <= 53.3):
                errors.append(f"Player {player.get('player_id')} y out of bounds")

            # Speed sanity check (no one runs >15 yards/sec)
            speed = player.get('speed', 0)
            if speed > 15:
                warnings.append(f"Unusually high speed for player {player.get('player_id')}")

        # Check player count
        player_count = len(frame.get('players', []))
        if player_count < 22:
            warnings.append(f"Only {player_count} players detected")
        elif player_count > 22:
            errors.append(f"Too many players detected: {player_count}")

        quality = DataQualityLevel.INVALID if errors else \
                  DataQualityLevel.MEDIUM if warnings else \
                  DataQualityLevel.HIGH

        return ValidationResult(
            is_valid=len(errors) == 0,
            quality_level=quality,
            errors=errors,
            warnings=warnings
        )


class DataQualityMonitor:
    """Monitor data quality over time."""

    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.validation_history: deque = deque(maxlen=window_size)
        self.error_counts: Dict[str, int] = {}

    def record_validation(self, result: ValidationResult):
        """Record a validation result."""
        self.validation_history.append(result)

        for error in result.errors:
            self.error_counts[error] = self.error_counts.get(error, 0) + 1

    def get_quality_metrics(self) -> Dict:
        """Get current quality metrics."""
        if not self.validation_history:
            return {}

        valid_count = sum(1 for r in self.validation_history if r.is_valid)
        quality_distribution = {}

        for r in self.validation_history:
            level = r.quality_level.value
            quality_distribution[level] = quality_distribution.get(level, 0) + 1

        return {
            'valid_rate': valid_count / len(self.validation_history),
            'quality_distribution': quality_distribution,
            'top_errors': sorted(
                self.error_counts.items(),
                key=lambda x: x[1],
                reverse=True
            )[:5]
        }

26.3 Live Win Probability Models

26.3.1 Real-Time Win Probability Engine

import numpy as np
from scipy.special import expit
import pickle

class LiveWinProbabilityEngine:
    """Real-time win probability calculation."""

    def __init__(self, model_path: Optional[str] = None):
        self.model = None
        self.feature_means: Dict[str, float] = {}
        self.feature_stds: Dict[str, float] = {}

        if model_path:
            self.load_model(model_path)
        else:
            self._initialize_default_model()

    def _initialize_default_model(self):
        """Initialize with simple logistic model coefficients."""
        # Pre-trained coefficients for demonstration
        self.coefficients = {
            'intercept': 0.0,
            'score_diff': 0.12,
            'time_remaining_pct': -0.02,
            'score_diff_x_time': 0.08,
            'field_position': 0.015,
            'is_home': 0.15,
            'timeouts_diff': 0.05
        }

    def load_model(self, path: str):
        """Load pre-trained model from file."""
        with open(path, 'rb') as f:
            model_data = pickle.load(f)
            self.model = model_data.get('model')
            self.feature_means = model_data.get('means', {})
            self.feature_stds = model_data.get('stds', {})

    def calculate_win_probability(self,
                                   state: GameState,
                                   team: str) -> float:
        """Calculate win probability for specified team."""
        features = self._extract_features(state, team)

        if self.model:
            # Use trained model
            feature_vector = self._create_feature_vector(features)
            prob = self.model.predict_proba([feature_vector])[0][1]
        else:
            # Use simple logistic model
            prob = self._simple_model_prediction(features)

        return float(prob)

    def _extract_features(self, state: GameState, team: str) -> Dict:
        """Extract features for win probability model."""
        is_home = team == state.home_team

        if is_home:
            score_diff = state.home_score - state.away_score
        else:
            score_diff = state.away_score - state.home_score

        # Calculate time remaining as percentage of game
        quarter_seconds = 900  # 15 minutes
        total_game_seconds = quarter_seconds * 4
        elapsed = (state.quarter - 1) * quarter_seconds + (quarter_seconds - state.time_remaining)
        time_remaining_pct = 1 - (elapsed / total_game_seconds)

        # Field position from team's perspective
        if (is_home and state.possession == state.home_team) or \
           (not is_home and state.possession == state.away_team):
            field_pos = state.field_position
            has_possession = True
        else:
            field_pos = 100 - state.field_position
            has_possession = False

        return {
            'score_diff': score_diff,
            'time_remaining_pct': time_remaining_pct,
            'score_diff_x_time': score_diff * time_remaining_pct,
            'field_position': field_pos,
            'is_home': 1 if is_home else 0,
            'has_possession': 1 if has_possession else 0,
            'down': state.down,
            'distance': state.distance,
            'quarter': state.quarter
        }

    def _simple_model_prediction(self, features: Dict) -> float:
        """Simple logistic model prediction."""
        linear_pred = self.coefficients['intercept']

        for feature, coef in self.coefficients.items():
            if feature != 'intercept' and feature in features:
                linear_pred += coef * features[feature]

        return expit(linear_pred)

    def _create_feature_vector(self, features: Dict) -> np.ndarray:
        """Create normalized feature vector for ML model."""
        feature_order = ['score_diff', 'time_remaining_pct', 'field_position',
                         'is_home', 'has_possession', 'down', 'distance']

        vector = []
        for feat in feature_order:
            value = features.get(feat, 0)

            # Normalize if we have stats
            if feat in self.feature_means:
                value = (value - self.feature_means[feat]) / self.feature_stds.get(feat, 1)

            vector.append(value)

        return np.array(vector)


class WinProbabilityTracker:
    """Track win probability throughout a game."""

    def __init__(self, wp_engine: LiveWinProbabilityEngine):
        self.engine = wp_engine
        self.wp_history: Dict[str, List[Dict]] = {}

    def update(self, state: GameState, event: Optional[PlayEvent] = None):
        """Update win probabilities after state change."""
        game_id = state.game_id

        if game_id not in self.wp_history:
            self.wp_history[game_id] = []

        home_wp = self.engine.calculate_win_probability(state, state.home_team)
        away_wp = 1 - home_wp

        entry = {
            'timestamp': datetime.now().isoformat(),
            'quarter': state.quarter,
            'time_remaining': state.time_remaining,
            'home_wp': home_wp,
            'away_wp': away_wp,
            'home_score': state.home_score,
            'away_score': state.away_score,
            'play_id': event.play_id if event else None
        }

        self.wp_history[game_id].append(entry)

        return entry

    def get_wp_added(self, game_id: str, play_id: str) -> Optional[float]:
        """Calculate WPA for a specific play."""
        history = self.wp_history.get(game_id, [])

        for i, entry in enumerate(history):
            if entry['play_id'] == play_id and i > 0:
                return entry['home_wp'] - history[i-1]['home_wp']

        return None

    def get_game_chart_data(self, game_id: str) -> List[Dict]:
        """Get data formatted for win probability chart."""
        return self.wp_history.get(game_id, [])

    def get_key_plays(self, game_id: str, threshold: float = 0.10) -> List[Dict]:
        """Identify plays with large WPA."""
        history = self.wp_history.get(game_id, [])
        key_plays = []

        for i in range(1, len(history)):
            wpa = abs(history[i]['home_wp'] - history[i-1]['home_wp'])
            if wpa >= threshold:
                key_plays.append({
                    **history[i],
                    'wpa': history[i]['home_wp'] - history[i-1]['home_wp']
                })

        return sorted(key_plays, key=lambda x: abs(x['wpa']), reverse=True)

26.3.2 Decision Support System

class LiveDecisionSupport:
    """Real-time decision support for coaching staff."""

    def __init__(self, wp_engine: LiveWinProbabilityEngine):
        self.wp_engine = wp_engine
        self.fourth_down_model = None
        self.field_goal_model = None

    def analyze_fourth_down(self, state: GameState, team: str) -> Dict:
        """Analyze fourth down decision options."""
        options = {}

        # Current situation
        current_wp = self.wp_engine.calculate_win_probability(state, team)

        # Option 1: Go for it
        go_for_it_wp = self._simulate_go_for_it(state, team)
        options['go_for_it'] = {
            'expected_wp': go_for_it_wp,
            'wp_gain': go_for_it_wp - current_wp,
            'conversion_prob': self._estimate_conversion_prob(state)
        }

        # Option 2: Field goal (if in range)
        if state.field_position >= 60:  # Within ~40 yard FG range
            fg_wp = self._simulate_field_goal(state, team)
            options['field_goal'] = {
                'expected_wp': fg_wp,
                'wp_gain': fg_wp - current_wp,
                'make_prob': self._estimate_fg_prob(state)
            }

        # Option 3: Punt
        punt_wp = self._simulate_punt(state, team)
        options['punt'] = {
            'expected_wp': punt_wp,
            'wp_gain': punt_wp - current_wp,
            'expected_net_yards': self._estimate_punt_yards(state)
        }

        # Recommendation
        best_option = max(options.items(), key=lambda x: x[1]['expected_wp'])
        options['recommendation'] = best_option[0]
        options['confidence'] = best_option[1]['expected_wp'] - \
                               sorted([o['expected_wp'] for o in options.values()
                                      if isinstance(o, dict) and 'expected_wp' in o])[-2] \
                               if len(options) > 2 else 0

        return options

    def _simulate_go_for_it(self, state: GameState, team: str) -> float:
        """Simulate expected WP if going for it."""
        conv_prob = self._estimate_conversion_prob(state)

        # If conversion successful
        success_state = GameState(
            game_id=state.game_id,
            home_team=state.home_team,
            away_team=state.away_team,
            home_score=state.home_score,
            away_score=state.away_score,
            quarter=state.quarter,
            time_remaining=state.time_remaining - 6,  # Assume ~6 seconds
            possession=state.possession,
            field_position=min(state.field_position + state.distance, 100),
            down=1,
            distance=10
        )
        success_wp = self.wp_engine.calculate_win_probability(success_state, team)

        # If conversion fails (turnover on downs)
        fail_state = GameState(
            game_id=state.game_id,
            home_team=state.home_team,
            away_team=state.away_team,
            home_score=state.home_score,
            away_score=state.away_score,
            quarter=state.quarter,
            time_remaining=state.time_remaining - 6,
            possession=state.away_team if state.possession == state.home_team else state.home_team,
            field_position=100 - state.field_position,
            down=1,
            distance=10
        )
        fail_wp = self.wp_engine.calculate_win_probability(fail_state, team)

        return conv_prob * success_wp + (1 - conv_prob) * fail_wp

    def _simulate_field_goal(self, state: GameState, team: str) -> float:
        """Simulate expected WP if attempting field goal."""
        fg_prob = self._estimate_fg_prob(state)

        # If FG is good
        if team == state.home_team:
            make_state = GameState(
                game_id=state.game_id,
                home_team=state.home_team,
                away_team=state.away_team,
                home_score=state.home_score + 3,
                away_score=state.away_score,
                quarter=state.quarter,
                time_remaining=state.time_remaining - 6,
                possession=state.away_team,
                field_position=25,  # Touchback on kickoff
                down=1,
                distance=10
            )
        else:
            make_state = GameState(
                game_id=state.game_id,
                home_team=state.home_team,
                away_team=state.away_team,
                home_score=state.home_score,
                away_score=state.away_score + 3,
                quarter=state.quarter,
                time_remaining=state.time_remaining - 6,
                possession=state.home_team,
                field_position=25,
                down=1,
                distance=10
            )
        make_wp = self.wp_engine.calculate_win_probability(make_state, team)

        # If FG is missed
        miss_state = GameState(
            game_id=state.game_id,
            home_team=state.home_team,
            away_team=state.away_team,
            home_score=state.home_score,
            away_score=state.away_score,
            quarter=state.quarter,
            time_remaining=state.time_remaining - 6,
            possession=state.away_team if state.possession == state.home_team else state.home_team,
            field_position=max(100 - state.field_position, 20),  # Spot of kick or 20
            down=1,
            distance=10
        )
        miss_wp = self.wp_engine.calculate_win_probability(miss_state, team)

        return fg_prob * make_wp + (1 - fg_prob) * miss_wp

    def _simulate_punt(self, state: GameState, team: str) -> float:
        """Simulate expected WP if punting."""
        expected_net = self._estimate_punt_yards(state)

        punt_state = GameState(
            game_id=state.game_id,
            home_team=state.home_team,
            away_team=state.away_team,
            home_score=state.home_score,
            away_score=state.away_score,
            quarter=state.quarter,
            time_remaining=state.time_remaining - 6,
            possession=state.away_team if state.possession == state.home_team else state.home_team,
            field_position=max(100 - state.field_position - expected_net, 20),
            down=1,
            distance=10
        )

        return self.wp_engine.calculate_win_probability(punt_state, team)

    def _estimate_conversion_prob(self, state: GameState) -> float:
        """Estimate 4th down conversion probability."""
        # Simple model based on distance
        base_prob = 0.55  # Average 4th down conversion rate

        # Adjust for distance
        if state.distance <= 1:
            return 0.70
        elif state.distance <= 3:
            return 0.55
        elif state.distance <= 5:
            return 0.45
        else:
            return max(0.30 - (state.distance - 5) * 0.03, 0.15)

    def _estimate_fg_prob(self, state: GameState) -> float:
        """Estimate field goal probability."""
        fg_distance = 117 - state.field_position  # End zone is at 110, kick from ~7 behind

        # Simple distance-based model
        if fg_distance <= 30:
            return 0.95
        elif fg_distance <= 40:
            return 0.85
        elif fg_distance <= 50:
            return 0.70
        elif fg_distance <= 55:
            return 0.50
        else:
            return max(0.30 - (fg_distance - 55) * 0.05, 0.10)

    def _estimate_punt_yards(self, state: GameState) -> float:
        """Estimate expected punt net yards."""
        # Average net punt is about 40 yards
        # Adjust for field position (shorter punts near end zone)
        if state.field_position >= 60:
            return min(40, 100 - state.field_position - 10)
        return 40

26.4 In-Game Dashboards

26.4.1 Dashboard Data Service

from typing import Callable
import json

class DashboardDataService:
    """Serve real-time data to dashboard clients."""

    def __init__(self, engine: RealTimeEngine, wp_tracker: WinProbabilityTracker):
        self.engine = engine
        self.wp_tracker = wp_tracker
        self.connected_clients: Dict[str, List] = {}  # game_id -> websocket connections

    def get_game_snapshot(self, game_id: str) -> Dict:
        """Get current game state and metrics."""
        state = self.engine.game_states.get(game_id)
        if not state:
            return {'error': 'Game not found'}

        wp_data = self.wp_tracker.get_game_chart_data(game_id)
        latest_wp = wp_data[-1] if wp_data else None

        return {
            'game_state': state.to_dict(),
            'win_probability': {
                'home': latest_wp['home_wp'] if latest_wp else 0.5,
                'away': latest_wp['away_wp'] if latest_wp else 0.5
            },
            'wp_chart_data': wp_data[-50:],  # Last 50 data points
            'key_plays': self.wp_tracker.get_key_plays(game_id)[:5],
            'performance': self.engine.get_performance_stats(),
            'timestamp': datetime.now().isoformat()
        }

    def get_live_metrics(self, game_id: str) -> Dict:
        """Get real-time analytics metrics."""
        state = self.engine.game_states.get(game_id)
        if not state:
            return {}

        return {
            'scoring_pace': self._calculate_scoring_pace(state),
            'time_of_possession': self._estimate_time_of_possession(game_id),
            'momentum': self._calculate_momentum(game_id),
            'leverage_index': self._calculate_leverage_index(state)
        }

    def _calculate_scoring_pace(self, state: GameState) -> Dict:
        """Calculate projected final score based on current pace."""
        elapsed_quarters = state.quarter - 1 + (900 - state.time_remaining) / 900

        if elapsed_quarters > 0:
            home_pace = state.home_score / elapsed_quarters * 4
            away_pace = state.away_score / elapsed_quarters * 4
        else:
            home_pace = away_pace = 0

        return {
            'home_projected': round(home_pace),
            'away_projected': round(away_pace)
        }

    def _calculate_momentum(self, game_id: str) -> float:
        """Calculate momentum based on recent WP changes."""
        wp_history = self.wp_tracker.wp_history.get(game_id, [])

        if len(wp_history) < 5:
            return 0

        # Look at last 5 plays
        recent = wp_history[-5:]
        wp_change = recent[-1]['home_wp'] - recent[0]['home_wp']

        # Normalize to -1 to 1 scale
        return max(min(wp_change * 5, 1), -1)

    def _calculate_leverage_index(self, state: GameState) -> float:
        """Calculate how important the current situation is."""
        # Higher leverage = closer game + less time
        score_diff = abs(state.home_score - state.away_score)
        time_factor = 1 - (state.quarter - 1) / 4

        # Base leverage decreases with score differential
        if score_diff == 0:
            base = 2.0
        elif score_diff <= 3:
            base = 1.5
        elif score_diff <= 7:
            base = 1.0
        elif score_diff <= 14:
            base = 0.6
        else:
            base = 0.3

        # Multiply by time factor (more leverage late in game)
        return base * (1 + time_factor)

    def _estimate_time_of_possession(self, game_id: str) -> Dict:
        """Estimate time of possession based on play history."""
        # Simplified estimation
        return {
            'home': 0.50,
            'away': 0.50
        }


class WebSocketServer:
    """WebSocket server for real-time dashboard updates."""

    def __init__(self, data_service: DashboardDataService):
        self.data_service = data_service
        self.connections: Dict[str, set] = {}  # game_id -> set of websockets

    async def handler(self, websocket, path):
        """Handle WebSocket connections."""
        game_id = path.strip('/')

        # Register connection
        if game_id not in self.connections:
            self.connections[game_id] = set()
        self.connections[game_id].add(websocket)

        try:
            # Send initial snapshot
            snapshot = self.data_service.get_game_snapshot(game_id)
            await websocket.send(json.dumps({
                'type': 'snapshot',
                'data': snapshot
            }))

            # Keep connection alive and handle incoming messages
            async for message in websocket:
                msg_data = json.loads(message)

                if msg_data.get('type') == 'request_metrics':
                    metrics = self.data_service.get_live_metrics(game_id)
                    await websocket.send(json.dumps({
                        'type': 'metrics',
                        'data': metrics
                    }))

        finally:
            # Unregister connection
            self.connections[game_id].discard(websocket)

    async def broadcast_update(self, game_id: str, update: Dict):
        """Broadcast update to all connected clients for a game."""
        if game_id in self.connections:
            message = json.dumps({
                'type': 'update',
                'data': update
            })

            await asyncio.gather(*[
                ws.send(message) for ws in self.connections[game_id]
            ])

26.4.2 Dashboard UI Components

# Flask-based dashboard backend
from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit, join_room

class DashboardApp:
    """Flask application for analytics dashboard."""

    def __init__(self, data_service: DashboardDataService):
        self.app = Flask(__name__)
        self.socketio = SocketIO(self.app, cors_allowed_origins="*")
        self.data_service = data_service

        self._setup_routes()
        self._setup_socket_handlers()

    def _setup_routes(self):
        """Setup HTTP routes."""

        @self.app.route('/')
        def index():
            return render_template('dashboard.html')

        @self.app.route('/api/games')
        def list_games():
            games = list(self.data_service.engine.game_states.keys())
            return jsonify({'games': games})

        @self.app.route('/api/game/<game_id>')
        def get_game(game_id):
            snapshot = self.data_service.get_game_snapshot(game_id)
            return jsonify(snapshot)

        @self.app.route('/api/game/<game_id>/decision')
        def get_decision(game_id):
            state = self.data_service.engine.game_states.get(game_id)
            if not state:
                return jsonify({'error': 'Game not found'}), 404

            # Get decision analysis
            decision_support = LiveDecisionSupport(
                self.data_service.wp_tracker.engine
            )
            analysis = decision_support.analyze_fourth_down(
                state, state.possession
            )
            return jsonify(analysis)

    def _setup_socket_handlers(self):
        """Setup WebSocket handlers."""

        @self.socketio.on('join')
        def on_join(data):
            game_id = data.get('game_id')
            join_room(game_id)

            # Send initial data
            snapshot = self.data_service.get_game_snapshot(game_id)
            emit('game_update', snapshot)

        @self.socketio.on('request_update')
        def on_request_update(data):
            game_id = data.get('game_id')
            snapshot = self.data_service.get_game_snapshot(game_id)
            emit('game_update', snapshot)

    def broadcast_to_game(self, game_id: str, data: Dict):
        """Broadcast update to all clients watching a game."""
        self.socketio.emit('game_update', data, room=game_id)

    def run(self, host='0.0.0.0', port=5000):
        """Run the dashboard server."""
        self.socketio.run(self.app, host=host, port=port)
<!-- templates/dashboard.html -->
<!DOCTYPE html>
<html>
<head>
    <title>Live Football Analytics Dashboard</title>
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 0;
            padding: 20px;
            background: #1a1a2e;
            color: white;
        }
        .dashboard {
            display: grid;
            grid-template-columns: 1fr 1fr;
            gap: 20px;
            max-width: 1400px;
            margin: 0 auto;
        }
        .card {
            background: #16213e;
            border-radius: 10px;
            padding: 20px;
        }
        .scoreboard {
            grid-column: span 2;
            display: flex;
            justify-content: center;
            align-items: center;
            gap: 50px;
        }
        .team-score {
            text-align: center;
        }
        .team-name {
            font-size: 1.5rem;
            margin-bottom: 10px;
        }
        .score {
            font-size: 4rem;
            font-weight: bold;
        }
        .win-prob {
            font-size: 1.2rem;
            color: #4ecca3;
        }
        .wp-chart {
            height: 300px;
        }
        .game-info {
            display: flex;
            justify-content: space-around;
        }
        .info-item {
            text-align: center;
        }
        .key-plays {
            max-height: 300px;
            overflow-y: auto;
        }
        .play-item {
            padding: 10px;
            border-bottom: 1px solid #333;
        }
        .positive { color: #4ecca3; }
        .negative { color: #e74c3c; }
    </style>
</head>
<body>
    <div class="dashboard">
        <div class="card scoreboard">
            <div class="team-score">
                <div class="team-name" id="away-team">Away</div>
                <div class="score" id="away-score">0</div>
                <div class="win-prob" id="away-wp">50%</div>
            </div>
            <div class="game-status">
                <div id="quarter">Q1</div>
                <div id="time">15:00</div>
                <div id="possession"></div>
            </div>
            <div class="team-score">
                <div class="team-name" id="home-team">Home</div>
                <div class="score" id="home-score">0</div>
                <div class="win-prob" id="home-wp">50%</div>
            </div>
        </div>

        <div class="card">
            <h3>Win Probability</h3>
            <div class="wp-chart">
                <canvas id="wp-chart"></canvas>
            </div>
        </div>

        <div class="card">
            <h3>Game Situation</h3>
            <div class="game-info">
                <div class="info-item">
                    <div>Down & Distance</div>
                    <div id="down-distance">1st & 10</div>
                </div>
                <div class="info-item">
                    <div>Field Position</div>
                    <div id="field-pos">Own 25</div>
                </div>
                <div class="info-item">
                    <div>Leverage Index</div>
                    <div id="leverage">1.0</div>
                </div>
            </div>
        </div>

        <div class="card">
            <h3>Key Plays</h3>
            <div class="key-plays" id="key-plays"></div>
        </div>

        <div class="card">
            <h3>Decision Support</h3>
            <div id="decision-support">
                <p>Fourth down analysis available when applicable</p>
            </div>
        </div>
    </div>

    <script>
        const socket = io();
        const gameId = new URLSearchParams(window.location.search).get('game') || 'default';
        let wpChart;

        // Initialize chart
        function initChart() {
            const ctx = document.getElementById('wp-chart').getContext('2d');
            wpChart = new Chart(ctx, {
                type: 'line',
                data: {
                    labels: [],
                    datasets: [{
                        label: 'Home Win Probability',
                        data: [],
                        borderColor: '#4ecca3',
                        fill: true,
                        backgroundColor: 'rgba(78, 204, 163, 0.1)'
                    }]
                },
                options: {
                    responsive: true,
                    maintainAspectRatio: false,
                    scales: {
                        y: {
                            min: 0,
                            max: 1,
                            ticks: { callback: v => (v * 100) + '%' }
                        }
                    }
                }
            });
        }

        // Update dashboard
        function updateDashboard(data) {
            const state = data.game_state;
            const wp = data.win_probability;

            // Update scores
            document.getElementById('home-team').textContent = state.home_team;
            document.getElementById('away-team').textContent = state.away_team;
            document.getElementById('home-score').textContent = state.home_score;
            document.getElementById('away-score').textContent = state.away_score;

            // Update win probabilities
            document.getElementById('home-wp').textContent = (wp.home * 100).toFixed(1) + '%';
            document.getElementById('away-wp').textContent = (wp.away * 100).toFixed(1) + '%';

            // Update game situation
            document.getElementById('quarter').textContent = 'Q' + state.quarter;
            const mins = Math.floor(state.time_remaining / 60);
            const secs = Math.floor(state.time_remaining % 60);
            document.getElementById('time').textContent = mins + ':' + secs.toString().padStart(2, '0');
            document.getElementById('down-distance').textContent =
                ordinal(state.down) + ' & ' + state.distance;

            // Update chart
            if (data.wp_chart_data) {
                wpChart.data.labels = data.wp_chart_data.map((_, i) => i);
                wpChart.data.datasets[0].data = data.wp_chart_data.map(d => d.home_wp);
                wpChart.update('none');
            }

            // Update key plays
            const keyPlaysDiv = document.getElementById('key-plays');
            keyPlaysDiv.innerHTML = data.key_plays.map(play => `
                <div class="play-item">
                    <span class="${play.wpa >= 0 ? 'positive' : 'negative'}">
                        ${(play.wpa * 100).toFixed(1)}% WPA
                    </span>
                    - Q${play.quarter} ${formatTime(play.time_remaining)}
                </div>
            `).join('');
        }

        function ordinal(n) {
            const s = ['th', 'st', 'nd', 'rd'];
            const v = n % 100;
            return n + (s[(v - 20) % 10] || s[v] || s[0]);
        }

        function formatTime(seconds) {
            const m = Math.floor(seconds / 60);
            const s = Math.floor(seconds % 60);
            return m + ':' + s.toString().padStart(2, '0');
        }

        // Socket handlers
        socket.on('connect', () => {
            socket.emit('join', { game_id: gameId });
        });

        socket.on('game_update', updateDashboard);

        // Initialize
        initChart();
    </script>
</body>
</html>

26.5 Production Deployment

26.5.1 System Monitoring

import logging
from prometheus_client import Counter, Histogram, Gauge, start_http_server

class MetricsCollector:
    """Collect and expose system metrics."""

    def __init__(self):
        # Event processing metrics
        self.events_processed = Counter(
            'events_processed_total',
            'Total events processed',
            ['game_id', 'event_type']
        )
        self.processing_latency = Histogram(
            'event_processing_latency_seconds',
            'Event processing latency',
            buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]
        )
        self.queue_size = Gauge(
            'event_queue_size',
            'Current event queue size'
        )

        # Data quality metrics
        self.validation_errors = Counter(
            'validation_errors_total',
            'Total validation errors',
            ['error_type']
        )
        self.data_quality = Gauge(
            'data_quality_score',
            'Current data quality score',
            ['game_id']
        )

        # Connection metrics
        self.active_connections = Gauge(
            'active_websocket_connections',
            'Number of active WebSocket connections',
            ['game_id']
        )

    def start_metrics_server(self, port: int = 8000):
        """Start Prometheus metrics server."""
        start_http_server(port)
        logging.info(f"Metrics server started on port {port}")


class SystemMonitor:
    """Monitor system health and performance."""

    def __init__(self, metrics: MetricsCollector):
        self.metrics = metrics
        self.logger = logging.getLogger('SystemMonitor')

    def check_health(self, engine: RealTimeEngine) -> Dict:
        """Perform health check."""
        health = {
            'status': 'healthy',
            'checks': {}
        }

        # Check event processing
        stats = engine.get_performance_stats()
        if stats.get('queue_size', 0) > 1000:
            health['status'] = 'degraded'
            health['checks']['queue'] = 'Queue backlog detected'

        if stats.get('p95_latency_ms', 0) > 100:
            health['status'] = 'degraded'
            health['checks']['latency'] = 'High latency detected'

        # Check memory usage
        import psutil
        memory = psutil.Process().memory_info()
        if memory.rss > 2 * 1024 * 1024 * 1024:  # 2GB
            health['status'] = 'degraded'
            health['checks']['memory'] = 'High memory usage'

        health['metrics'] = {
            'events_processed': stats.get('events_processed', 0),
            'avg_latency_ms': stats.get('avg_latency_ms', 0),
            'queue_size': stats.get('queue_size', 0),
            'memory_mb': memory.rss / (1024 * 1024)
        }

        return health

    def setup_alerts(self):
        """Configure alerting rules."""
        self.alert_rules = [
            {
                'name': 'high_latency',
                'condition': lambda s: s.get('avg_latency_ms', 0) > 100,
                'severity': 'warning',
                'message': 'Event processing latency exceeds 100ms'
            },
            {
                'name': 'queue_backlog',
                'condition': lambda s: s.get('queue_size', 0) > 500,
                'severity': 'critical',
                'message': 'Event queue backlog exceeds 500'
            },
            {
                'name': 'data_quality',
                'condition': lambda s: s.get('valid_rate', 1) < 0.95,
                'severity': 'warning',
                'message': 'Data quality below 95%'
            }
        ]

    def check_alerts(self, stats: Dict) -> List[Dict]:
        """Check for alert conditions."""
        triggered = []

        for rule in self.alert_rules:
            if rule['condition'](stats):
                alert = {
                    'name': rule['name'],
                    'severity': rule['severity'],
                    'message': rule['message'],
                    'timestamp': datetime.now().isoformat()
                }
                triggered.append(alert)
                self.logger.warning(f"Alert triggered: {alert}")

        return triggered

26.5.2 Deployment Configuration

# Docker configuration
DOCKERFILE = """
FROM python:3.9-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Expose ports
EXPOSE 5000 8000

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \\
    CMD curl -f http://localhost:5000/health || exit 1

# Run application
CMD ["python", "-m", "app.main"]
"""

# Docker Compose configuration
DOCKER_COMPOSE = """
version: '3.8'

services:
  analytics-engine:
    build: .
    ports:
      - "5000:5000"
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379
      - KAFKA_BOOTSTRAP_SERVERS=kafka:9092
    depends_on:
      - redis
      - kafka
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 4G
        reservations:
          cpus: '1'
          memory: 2G

  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  kafka:
    image: confluentinc/cp-kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    depends_on:
      - prometheus

volumes:
  redis_data:
"""


class DeploymentManager:
    """Manage deployment and scaling."""

    def __init__(self, config: Dict):
        self.config = config

    def generate_deployment_config(self) -> Dict:
        """Generate deployment configuration."""
        return {
            'docker': {
                'image': 'football-analytics:latest',
                'replicas': self.config.get('replicas', 2),
                'resources': {
                    'limits': {'cpu': '2', 'memory': '4Gi'},
                    'requests': {'cpu': '500m', 'memory': '1Gi'}
                }
            },
            'autoscaling': {
                'enabled': True,
                'min_replicas': 2,
                'max_replicas': 10,
                'target_cpu_utilization': 70,
                'target_memory_utilization': 80
            },
            'networking': {
                'ingress': {
                    'host': self.config.get('domain', 'analytics.example.com'),
                    'tls': True
                },
                'service': {
                    'type': 'ClusterIP',
                    'ports': [
                        {'name': 'http', 'port': 5000},
                        {'name': 'metrics', 'port': 8000}
                    ]
                }
            }
        }

    def get_scaling_recommendations(self, metrics: Dict) -> Dict:
        """Get scaling recommendations based on metrics."""
        recommendations = {
            'action': 'none',
            'reason': ''
        }

        avg_latency = metrics.get('avg_latency_ms', 0)
        queue_size = metrics.get('queue_size', 0)

        if avg_latency > 50 or queue_size > 100:
            recommendations['action'] = 'scale_up'
            recommendations['reason'] = 'High latency or queue backlog'
            recommendations['new_replicas'] = min(
                self.config.get('max_replicas', 10),
                self.config.get('current_replicas', 2) + 1
            )
        elif avg_latency < 10 and queue_size < 10:
            recommendations['action'] = 'scale_down'
            recommendations['reason'] = 'Low utilization'
            recommendations['new_replicas'] = max(
                self.config.get('min_replicas', 2),
                self.config.get('current_replicas', 2) - 1
            )

        return recommendations

Summary

Real-time analytics systems enable split-second decision support during football games. Key components include:

  1. Stream Processing: Ingest and process live data feeds with sub-second latency
  2. Data Validation: Ensure data quality in real-time environments
  3. Live Models: Win probability and decision support updated continuously
  4. Dashboard Delivery: WebSocket-based updates to coaching staff
  5. Production Operations: Monitoring, scaling, and reliability

The challenge is balancing speed with accuracy while maintaining system reliability under game-day loads.

Key Takeaways

  • Real-time systems require event-driven architectures
  • Data validation is critical when processing live feeds
  • Win probability models must update in milliseconds
  • WebSockets enable real-time dashboard updates
  • Production systems need comprehensive monitoring and alerting
  • Horizontal scaling handles variable game-day loads