Chapter 26 Exercises: Real-Time Analytics Systems

Exercise Set Overview

These exercises progress from basic real-time concepts to building production-ready analytics systems.


Level 1: Foundational Exercises

Exercise 26.1: Game State Management

Objective: Implement a game state tracker.

Task: Build a class that maintains the current game state by applying incoming events.

from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class GameState:
    """Snapshot of a football game at one moment in time.

    Only the two team names are required. The remaining defaults describe
    1st & 10 at the 25 in the first quarter with 900 seconds (15:00) left.
    """
    home_team: str
    away_team: str
    # Scoreboard
    home_score: int = 0
    away_score: int = 0
    # Clock: current quarter and seconds left in it
    quarter: int = 1
    time_remaining: float = 900.0
    # Team with the ball ("" means not set yet)
    possession: str = ""
    # Ball spot. NOTE(review): presumably yards from the possessing team's own
    # goal line (the FG-distance formula in Exercise 26.5 assumes so) - confirm.
    field_position: int = 25
    # Down and yards to go
    down: int = 1
    distance: int = 10

class GameStateTracker:
    """Keep a GameState up to date as play-by-play events come in.

    Exercise skeleton: the constructor builds the initial state for the two
    teams. `process_event` and `get_situation_string` are for the student to
    implement; as written, both return None.
    """

    def __init__(self, home_team: str, away_team: str):
        # Everything except the team names starts at the GameState defaults.
        initial_state = GameState(home_team=home_team, away_team=away_team)
        self.state = initial_state

    def process_event(self, event: dict) -> GameState:
        """
        Apply one event to `self.state` and return the updated state.

        Event types to handle (the sample events key them by 'type'):
        - 'rush': Update field position, down, distance
        - 'pass': Update field position, down, distance
        - 'touchdown': Update score, reset position
        - 'field_goal': Update score, change possession
        - 'turnover': Change possession
        - 'punt': Change possession, update field position
        - 'quarter_end': Update quarter, reset time
        """
        # Your code here
        pass

    def get_situation_string(self) -> str:
        """Summarize quarter, clock, down-and-distance and ball spot as text."""
        # Example: "Q2 7:32 - 1st & 10 at OWN 25"
        pass

# Test with sample events.
# Every event is keyed by 'type'. The scoring play's own kind goes under
# 'play_type' (the field name PlayEventValidator requires) because a dict
# literal cannot carry 'type' twice: the second value would silently
# overwrite 'touchdown'.
events = [
    {'type': 'rush', 'yards': 5, 'result': 'first_down'},
    {'type': 'pass', 'yards': 15, 'result': 'complete'},
    {'type': 'rush', 'yards': -2, 'result': 'tackle'},
    {'type': 'touchdown', 'play_type': 'pass'},
]

Exercise 26.2: Simple Win Probability Calculator

Objective: Calculate basic win probability from game state.

Task: Implement a simple win probability model.

import numpy as np
from scipy.special import expit  # Logistic function

class SimpleWinProbability:
    """Hand-weighted win probability model (exercise skeleton).

    `coefficients` holds placeholder weights for the features the model
    should combine. `calculate` and `get_win_probability_chart_data` are left
    for the student and currently return None.
    """

    def __init__(self):
        # Placeholder weights; a real model would fit these to historical games.
        self.coefficients = dict(
            score_diff=0.15,
            time_remaining_pct=-0.05,
            possession_bonus=0.10,
            field_position=0.02,
        )

    def calculate(self, state: GameState, team: str) -> float:
        """
        Estimate the probability that `team` wins from `state`.

        Features to consider:
        - Score differential
        - Time remaining
        - Current possession
        - Field position

        Returns: probability between 0 and 1
        """
        # Your code here
        pass

    def get_win_probability_chart_data(self,
                                        history: list) -> dict:
        """
        Reshape a win-probability history into parallel lists for plotting.

        Returns:
            {
                'timestamps': [...],
                'home_wp': [...],
                'away_wp': [...]
            }
        """
        pass

# Test scenario: Ohio State (home) leads Michigan 21-17 in the third quarter
# with 420 seconds (7:00) left, Ohio State has the ball at field_position 65.
state = GameState(
    home_team='Ohio State',
    away_team='Michigan',
    quarter=3,
    time_remaining=420,
    home_score=21,
    away_score=17,
    possession='Ohio State',
    field_position=65,
)
# Expected: Ohio State WP > 0.5 (they lead and have possession)

Exercise 26.3: Event Queue Processing

Objective: Build a basic event processing queue.

Task: Implement a thread-safe event queue with a background worker that processes submitted events.

import threading
from queue import Queue, Empty
import time

class EventProcessor:
    """Process events from a queue on a single background worker thread.

    `submit_event` timestamps a copy of each event and puts it on a
    thread-safe `queue.Queue`. The worker started by `start` takes events off
    the queue and passes each one to `_process_event`.
    """

    def __init__(self):
        self.event_queue = Queue()
        self.processors = []
        self.running = False
        self.processed_count = 0
        # Worker thread. It stays None until start() so that stop() is safe
        # on a processor that was never started.
        self.thread = None

    def add_processor(self, processor_func):
        """Register a function to apply to every processed event."""
        self.processors.append(processor_func)

    def submit_event(self, event: dict):
        """Enqueue a shallow copy of `event` stamped with `submitted_at`.

        Copying leaves the caller's dict untouched. It also means submitting
        the same dict twice cannot overwrite the timestamp of an event that
        is still waiting in the queue.
        """
        queued = dict(event)
        queued['submitted_at'] = time.time()
        self.event_queue.put(queued)

    def start(self):
        """Start the worker thread. Does nothing if it is already running."""
        if self.running:
            return
        self.running = True
        self.thread = threading.Thread(target=self._process_loop)
        self.thread.start()

    def stop(self):
        """Ask the worker to exit and wait up to 5 seconds for it to finish."""
        self.running = False
        if self.thread is not None:
            self.thread.join(timeout=5.0)

    def _process_loop(self):
        """Worker loop: handle queued events until `running` is cleared."""
        while self.running:
            try:
                # Short timeout so the loop re-checks `running` regularly.
                event = self.event_queue.get(timeout=0.1)
            except Empty:
                continue
            try:
                self._process_event(event)
            finally:
                # Pair every successful get() with task_done() so that
                # event_queue.join() can return.
                self.event_queue.task_done()

    def _process_event(self, event: dict):
        """Process single event through all processors."""
        # Your code here
        pass

    def get_stats(self) -> dict:
        """Get processing statistics."""
        # Your code here
        pass

Exercise 26.4: Data Validation

Objective: Validate incoming real-time data.

Task: Build a validation system for play events.

from enum import Enum
from dataclasses import dataclass
from typing import List

class DataQuality(Enum):
    """Quality grade a validator assigns to an incoming event."""

    VALID = "valid"        # no problems found
    WARNING = "warning"    # presumably: only warnings were raised
    INVALID = "invalid"    # failed validation

@dataclass
class ValidationResult:
    """Outcome of validating one event: an overall grade plus its messages."""
    quality: DataQuality   # overall verdict for the event
    errors: List[str]      # error messages collected during validation
    warnings: List[str]    # warning messages collected during validation

class PlayEventValidator:
    """Check play events for completeness, plausibility and consistency.

    Exercise skeleton: `validate` and its helpers are for the student to
    implement and currently return None.
    """

    # Keys every play event must carry.
    REQUIRED_FIELDS = ['game_id', 'play_id', 'play_type']
    # Play types the "Valid play type" check should accept.
    VALID_PLAY_TYPES = ['rush', 'pass', 'punt', 'field_goal', 'kickoff']

    def validate(self, event: dict, current_state: GameState) -> ValidationResult:
        """
        Validate one play event on its own and against `current_state`.

        Check for:
        - Required fields present
        - Valid play type
        - Consistent with game state (scores only increase)
        - Valid field position (1-99)
        - Valid down (1-4)
        - Valid distance (1-99)
        """
        # Your code here
        pass

    def _validate_required_fields(self, event: dict) -> List[str]:
        """Return one error message per REQUIRED_FIELDS key missing from `event`."""
        pass

    def _validate_consistency(self, event: dict, state: GameState) -> List[str]:
        """Return error messages for anything in `event` that contradicts `state`."""
        pass

Level 2: Intermediate Exercises

Exercise 26.5: Live Decision Support

Objective: Build a fourth-down decision analyzer.

Task: Implement real-time fourth-down decision support.

class FourthDownAnalyzer:
    """Compare go-for-it, field goal and punt on a fourth down.

    Exercise skeleton: only the conversion-probability model is implemented.
    The analysis, field-goal model and punt model are left for the student.
    """

    def __init__(self, wp_calculator: SimpleWinProbability):
        # Used to value the game state that results from each option.
        self.wp_calc = wp_calculator

    def analyze(self, state: GameState, team: str) -> dict:
        """
        Evaluate each fourth-down option for `team` and recommend one.

        Returns:
            {
                'go_for_it': {
                    'expected_wp': float,
                    'conversion_prob': float,
                    'success_wp': float,
                    'fail_wp': float
                },
                'field_goal': {  # if in range
                    'expected_wp': float,
                    'make_prob': float,
                    'make_wp': float,
                    'miss_wp': float
                },
                'punt': {
                    'expected_wp': float,
                    'expected_net': float
                },
                'recommendation': str,
                'confidence': float
            }
        """
        # Your code here
        pass

    def _estimate_conversion_prob(self, distance: int) -> float:
        """Return a rough 4th-down conversion probability for `distance` to go."""
        # Simplified model: flat rates for short yardage, then 3 points lost
        # per extra yard beyond 5, never below 15%.
        for max_distance, prob in ((1, 0.70), (3, 0.55), (5, 0.45)):
            if distance <= max_distance:
                return prob
        # NOTE(review): the tail starts from 0.30, so the estimate drops from
        # 0.45 at 5 yards to 0.27 at 6 yards. Confirm this cliff is intended.
        return max(0.30 - (distance - 5) * 0.03, 0.15)

    def _estimate_fg_prob(self, field_position: int) -> float:
        """Estimate field goal probability."""
        # Kick distance = (100 - field_position) yards to the goal line
        # + 17 (10-yard end zone + 7-yard snap)
        distance = 117 - field_position
        # Your implementation
        pass

    def _estimate_punt_net(self, field_position: int) -> float:
        """Estimate punt net yards."""
        pass

Exercise 26.6: Streaming Data Adapter

Objective: Build an adapter for live data feeds.

Task: Implement a polling-based data feed adapter.

import asyncio
import aiohttp
from abc import ABC, abstractmethod

class DataFeedAdapter(ABC):
    """Interface that every live data feed adapter implements.

    Subclasses must provide all three coroutines. The base class itself
    cannot be instantiated.
    """

    @abstractmethod
    async def connect(self):
        """Open whatever connection or session the feed needs."""

    @abstractmethod
    async def disconnect(self):
        """Release the connection or session opened by `connect`."""

    @abstractmethod
    async def stream_events(self):
        """Yield events as they arrive.

        Implementations should be async generators, so that callers can write
        `async for event in adapter.stream_events(): ...`.
        """

class PollingFeedAdapter(DataFeedAdapter):
    """Adapter that polls an HTTP endpoint for new events.

    Exercise skeleton: session lifecycle is implemented. Polling, parsing and
    de-duplication are left for the student.
    """

    def __init__(self, url: str, poll_interval: float = 2.0):
        self.url = url
        self.poll_interval = poll_interval  # seconds between polls
        self.session = None                 # aiohttp.ClientSession while connected
        self.running = False
        self.last_event_id = None           # for skipping already-seen events

    async def connect(self):
        """Open the HTTP session used for polling and mark the adapter running.

        Calling connect() again while a session is still open reuses that
        session rather than creating (and leaking) a second one.
        """
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        self.running = True

    async def disconnect(self):
        """Stop polling and close the HTTP session, if one is open.

        The session reference is cleared first, so repeated disconnects are
        harmless and a later connect() starts from a clean slate.
        """
        self.running = False
        session, self.session = self.session, None
        if session is not None:
            await session.close()

    async def stream_events(self):
        """
        Poll endpoint and yield new events.

        Should:
        - Poll at specified interval
        - Track last seen event to avoid duplicates
        - Handle connection errors gracefully
        - Yield only new events
        """
        # Your code here
        pass

    def _parse_response(self, data: dict) -> list:
        """Parse API response into events."""
        pass

    def _is_new_event(self, event: dict) -> bool:
        """Check if event is new."""
        pass

# Usage:
# async for event in adapter.stream_events():
#     process(event)

Exercise 26.7: WebSocket Dashboard Server

Objective: Build a WebSocket server for live updates.

Task: Implement a WebSocket server that broadcasts updates.

import asyncio
import json
from typing import Set

class DashboardServer:
    """WebSocket server that pushes live game updates to dashboard clients.

    Exercise skeleton: client registration is implemented. Broadcasting,
    snapshots and client-message handling are left for the student.
    """

    def __init__(self, state_tracker: GameStateTracker,
                 wp_calculator: SimpleWinProbability):
        self.state_tracker = state_tracker
        self.wp_calc = wp_calculator
        self.clients: Set = set()  # currently registered websocket connections

    async def register(self, websocket):
        """Register a new client and send it the current snapshot.

        The client is added before the snapshot is sent, so a broadcast issued
        while the snapshot is in flight still reaches it. If sending the
        snapshot fails or is cancelled, the client is removed again. This
        keeps later broadcasts from targeting a dead connection. The error is
        then re-raised to the caller.
        """
        self.clients.add(websocket)
        try:
            await self._send_snapshot(websocket)
        except BaseException:
            # BaseException also covers asyncio.CancelledError. The error is
            # re-raised, so nothing is swallowed.
            self.clients.discard(websocket)
            raise

    async def unregister(self, websocket):
        """Remove disconnected client."""
        self.clients.discard(websocket)

    async def broadcast(self, message: dict):
        """
        Broadcast update to all clients.

        Message should include:
        - type: 'update', 'score', 'situation', etc.
        - data: payload
        - timestamp: ISO format
        """
        # Your code here
        pass

    async def _send_snapshot(self, websocket):
        """Send current game snapshot to new client."""
        # Your code here
        pass

    def get_current_snapshot(self) -> dict:
        """
        Get current game snapshot.

        Returns:
            {
                'game_state': {...},
                'win_probability': {...},
                'situation': str,
                'timestamp': str
            }
        """
        pass

    async def handle_client_message(self, websocket, message: str):
        """Handle incoming message from client."""
        # Handle requests like 'get_stats', 'subscribe_team', etc.
        pass

Exercise 26.8: Performance Metrics Collector

Objective: Collect and report system performance metrics.

Task: Build a metrics collection system.

from collections import deque
import time
import statistics

class MetricsCollector:
    """Collect latency, throughput and error figures for the pipeline.

    Latency samples live in a sliding window of at most `window_size`
    entries. Per-type event and error counters grow for the collector's
    lifetime. The summary, reset and health-check methods are exercise stubs
    that currently return None.
    """

    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        self.start_time = time.time()  # wall-clock time of creation
        # deque(maxlen=...) drops the oldest sample once the window is full.
        self.latencies = deque(maxlen=window_size)
        self.event_counts = {}  # event_type -> number of latency samples
        self.error_counts = {}  # error_type -> number of occurrences

    @staticmethod
    def _increment(counts: dict, key: str) -> None:
        """Add one to `counts[key]`, treating a missing key as zero."""
        counts[key] = counts.get(key, 0) + 1

    def record_latency(self, latency_ms: float, event_type: str = 'default'):
        """Store one latency sample and count it under `event_type`."""
        self.latencies.append(latency_ms)
        self._increment(self.event_counts, event_type)

    def record_error(self, error_type: str):
        """Count one occurrence of `error_type`."""
        self._increment(self.error_counts, error_type)

    def get_metrics(self) -> dict:
        """
        Summarize latency, throughput, errors and uptime.

        Returns:
            {
                'latency': {
                    'avg_ms': float,
                    'p50_ms': float,
                    'p95_ms': float,
                    'p99_ms': float,
                    'max_ms': float
                },
                'throughput': {
                    'events_per_second': float,
                    'total_events': int
                },
                'errors': {
                    'total': int,
                    'by_type': dict,
                    'error_rate': float
                },
                'uptime_seconds': float
            }
        """
        # Your code here
        pass

    def reset(self):
        """Clear every recorded metric."""
        pass

    def check_health(self, thresholds: dict) -> dict:
        """
        Compare current metrics against `thresholds` and report problems.

        Example thresholds:
        {
            'max_latency_p95_ms': 100,
            'max_error_rate': 0.01,
            'min_throughput': 10
        }

        Returns:
            {
                'healthy': bool,
                'issues': [str]
            }
        """
        pass

Level 3: Advanced Exercises

Exercise 26.9: Complete Real-Time Engine

Objective: Build an integrated real-time analytics engine.

Task: Combine the components from the earlier exercises into a complete system.

class RealTimeAnalyticsEngine:
    """Tie state tracking, win probability, decision support and metrics together.

    Exercise skeleton: only `subscribe` is implemented. The per-game
    components start as None and are meant to be set up by `initialize_game`.
    """

    def __init__(self):
        # Per-game components: None until initialize_game() (student code)
        # creates them.
        self.state_tracker = None
        self.wp_calculator = None
        self.decision_support = None
        # Engine-wide infrastructure.
        self.metrics = MetricsCollector()
        self.validators = []
        self.subscribers = []  # callbacks registered via subscribe()

    def initialize_game(self, game_info: dict):
        """Set the engine up for a new game described by `game_info`."""
        # Your code here
        pass

    def process_event(self, event: dict) -> dict:
        """
        Run one incoming event through the full pipeline and return the analysis.

        Pipeline:
        1. Validate event
        2. Update game state
        3. Calculate win probability
        4. Check for decision situations
        5. Notify subscribers
        6. Return comprehensive result

        Returns:
            {
                'validation': ValidationResult,
                'state': GameState,
                'win_probability': {...},
                'decision_analysis': {...} or None,
                'key_metrics': {...},
                'processing_time_ms': float
            }
        """
        # Your code here
        pass

    def subscribe(self, callback):
        """Register `callback` to receive state updates (duplicates allowed)."""
        subscribers = self.subscribers
        subscribers.append(callback)

    def get_game_summary(self) -> dict:
        """Build a comprehensive summary of the current game."""
        pass

    def export_history(self, format: str = 'json') -> str:
        """Serialize the game history in the requested `format`."""
        pass

# Integration test
engine = RealTimeAnalyticsEngine()
engine.initialize_game({
    'home_team': 'Ohio State',
    'away_team': 'Michigan',
    'game_id': 'OSU-MICH-2024'
})

# Simulate game events.
# Fill this list with play-event dicts (same shape as Exercise 26.1).
# Do not write `[...]` as a placeholder: that is a one-element list holding
# the Ellipsis object, which would be passed to process_event as an "event".
events = []
for event in events:
    result = engine.process_event(event)
    print(f"WP: {result['win_probability']}")

Exercise 26.10: Live Visualization System

Objective: Build real-time visualization components.

Task: Create live-updating charts and displays.

import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

class LiveVisualization:
    """Matplotlib dashboard that redraws live game data from an engine.

    Exercise skeleton: `setup_dashboard` creates the 2x2 figure and
    `animate` drives redraws. Drawing the panels (`setup_dashboard`/`update`)
    and `save_snapshot` are left for the student.
    """

    def __init__(self, engine: RealTimeAnalyticsEngine):
        self.engine = engine
        self.wp_history = []
        self.fig = None   # created by setup_dashboard()
        self.axes = None  # 2x2 grid of Axes, created by setup_dashboard()

    def setup_dashboard(self):
        """Setup matplotlib dashboard with multiple panels."""
        self.fig, self.axes = plt.subplots(2, 2, figsize=(14, 10))

        # Panel 1: Win probability over time
        # Panel 2: Score differential
        # Panel 3: Current situation display
        # Panel 4: Key metrics

        # Your setup code here
        pass

    def update(self, frame):
        """Update visualization with latest data."""
        # Get latest data from engine
        # Update each panel
        # Return artists that changed
        pass

    def animate(self, interval: int = 1000):
        """Start the live animation and return the FuncAnimation object.

        Args:
            interval: delay between redraws in milliseconds (passed to
                FuncAnimation's `interval`).

        The caller must keep a reference to the returned object. Matplotlib
        stops an animation whose object has been garbage-collected.
        """
        if self.fig is None:
            # FuncAnimation needs a real figure; build the dashboard on demand
            # instead of failing when setup_dashboard() was not called first.
            self.setup_dashboard()
        anim = FuncAnimation(
            self.fig, self.update,
            interval=interval,
            blit=False,
            # No `frames` is given, so the live feed runs open-ended. Turn off
            # frame-data caching so per-frame data is not retained.
            cache_frame_data=False,
        )
        return anim

    def save_snapshot(self, filepath: str):
        """Save current visualization state."""
        pass

Exercise 26.11: Alerting System

Objective: Build an alerting system for key events.

Task: Implement configurable alerts.

from dataclasses import dataclass
from typing import Callable, List

@dataclass
class AlertRule:
    """One configurable alert: when to fire, how loud, and what to say."""
    name: str                          # identifier, also the cooldown key
    condition: Callable[[dict], bool]  # predicate evaluated on a state dict
    severity: str                      # 'low', 'medium', 'high', 'critical'
    message_template: str              # text used to build the alert message
    cooldown_seconds: float = 60.0     # minimum gap between firings

class AlertingSystem:
    """Evaluate AlertRule conditions against game state and keep alert history.

    Exercise skeleton: only rule registration is implemented. The built-in
    rule set, evaluation, cooldown check and alert lookup are left for the
    student.
    """

    def __init__(self):
        self.rules: List[AlertRule] = []
        self.triggered_alerts = []  # history of triggered alert dicts
        self.last_triggered = {}    # rule_name -> timestamp

    def add_rule(self, rule: AlertRule):
        """Register `rule` so later checks evaluate it."""
        self.rules.append(rule)

    def add_common_rules(self):
        """Register a standard set of football alert rules."""
        # Your implementation
        # Examples:
        # - Score change
        # - Win probability swing > 15%
        # - Fourth down situation
        # - Red zone entry
        # - Two-minute warning
        pass

    def check(self, state: dict) -> List[dict]:
        """
        Evaluate every registered rule against `state`.

        Returns list of triggered alerts:
        [
            {
                'rule_name': str,
                'severity': str,
                'message': str,
                'timestamp': datetime,
                'state': dict
            }
        ]
        """
        # Your code here
        pass

    def _should_trigger(self, rule: AlertRule) -> bool:
        """Decide whether `rule` may fire now, honoring its cooldown."""
        pass

    def get_recent_alerts(self, minutes: int = 5) -> List[dict]:
        """Return the alerts raised within the last `minutes` minutes."""
        pass

Level 4: Expert Challenges

Exercise 26.12: Production Deployment System

Objective: Build a production-ready deployment configuration.

Task: Create a complete deployment setup with monitoring.

class ProductionDeployment:
    """Generate deployment artifacts and sizing estimates for the engine.

    Exercise skeleton: every method is for the student to implement and
    currently returns None.
    """

    def generate_docker_config(self) -> str:
        """
        Produce a Dockerfile for the analytics engine.

        Include:
        - Python 3.9+ base image
        - All dependencies
        - Health check endpoint
        - Non-root user
        - Proper signal handling
        """
        # Your code here
        pass

    def generate_kubernetes_config(self) -> dict:
        """
        Produce the Kubernetes manifests for the engine.

        Include:
        - Deployment with replicas
        - Service for load balancing
        - ConfigMap for settings
        - HorizontalPodAutoscaler
        - Health/readiness probes
        """
        # Your code here
        pass

    def generate_monitoring_config(self) -> dict:
        """
        Produce the Prometheus/Grafana monitoring configuration.

        Metrics to expose:
        - Event processing latency
        - Throughput
        - Error rates
        - Active connections
        - Memory usage
        """
        # Your code here
        pass

    def estimate_resources(self, expected_load: dict) -> dict:
        """
        Size the deployment for `expected_load`.

        Input:
            {
                'concurrent_games': int,
                'events_per_second': int,
                'connected_clients': int
            }

        Output:
            {
                'cpu_cores': int,
                'memory_gb': int,
                'replicas': int,
                'estimated_cost': float
            }
        """
        pass

Exercise 26.13: Multi-Game Orchestration

Objective: Handle multiple simultaneous games.

Task: Build a system to manage analytics for multiple games.

class GameOrchestrator:
    """Run analytics for many simultaneous games, up to `max_games` at once.

    Exercise skeleton: only the bookkeeping containers are set up. All
    orchestration methods are for the student and currently return None.
    """

    def __init__(self, max_games: int = 50):
        self.max_games = max_games  # capacity limit for concurrent games
        self.active_games = {}      # game_id -> per-game state (student-defined)
        self.game_queues = {}       # game_id -> event queue (student-defined)

    def start_game(self, game_id: str, game_info: dict) -> bool:
        """
        Begin tracking a new game.

        Should:
        - Check capacity
        - Create dedicated engine instance
        - Setup event queue
        - Initialize monitoring
        """
        # Your code here
        pass

    def end_game(self, game_id: str) -> dict:
        """
        Finish a game and hand back its final summary.

        Should:
        - Generate final report
        - Archive history
        - Clean up resources
        - Return summary
        """
        pass

    def route_event(self, event: dict) -> dict:
        """Send `event` to the engine that owns its game."""
        pass

    def get_all_games_status(self) -> dict:
        """Report the status of every active game."""
        pass

    def get_aggregate_metrics(self) -> dict:
        """Combine metrics across all active games."""
        pass

    def rebalance_load(self):
        """Redistribute work across workers."""
        pass

Submission Guidelines

  1. Code Quality: Include comprehensive error handling
  2. Documentation: Document async patterns and thread safety
  3. Testing: Include load tests for real-time components
  4. Performance: Track and report latency metrics

Evaluation Criteria

| Level   | Criteria                      | Points |
|---------|-------------------------------|--------|
| Level 1 | Working state management      | 25     |
| Level 2 | Stream processing, validation | 30     |
| Level 3 | Integrated real-time system   | 30     |
| Level 4 | Production-ready deployment   | 15     |

Resources

  • asyncio documentation
  • WebSocket protocol specifications
  • Prometheus/Grafana for monitoring
  • Docker and Kubernetes documentation