Chapter 26 Exercises: Real-Time Analytics Systems
Exercise Set Overview
These exercises progress from basic real-time concepts to building production-ready analytics systems.
Level 1: Foundational Exercises
Exercise 26.1: Game State Management
Objective: Implement a game state tracker.
Task: Build a class that maintains current game state from events.
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class GameState:
    """Snapshot of a football game at one moment in time.

    Only the two team names are required. Every other field defaults to a
    start-of-game situation: 0-0, first quarter, 900 seconds on the clock,
    1st & 10 at the 25.
    """

    # Teams
    home_team: str
    away_team: str

    # Scoreboard
    home_score: int = 0
    away_score: int = 0

    # Clock: current quarter and seconds left in it
    quarter: int = 1
    time_remaining: float = 900.0

    # Ball / drive situation
    possession: str = ""  # name of the team with the ball ("" when unset)
    field_position: int = 25
    down: int = 1
    distance: int = 10  # yards to go for a first down
class GameStateTracker:
    """Keep a live GameState current by applying play-by-play events."""

    def __init__(self, home_team: str, away_team: str):
        initial = GameState(home_team=home_team, away_team=away_team)
        self.state = initial

    def process_event(self, event: dict) -> GameState:
        """Apply one event to ``self.state`` and return the updated state.

        Event types to handle:
            rush / pass  -> update field position, down and distance
            touchdown    -> update the score and reset field position
            field_goal   -> update the score and change possession
            turnover     -> change possession
            punt         -> change possession and update field position
            quarter_end  -> advance the quarter and reset the clock
        """
        # TODO: your implementation goes here

    def get_situation_string(self) -> str:
        """Return a human-readable description of the current situation.

        Example: "Q2 7:32 - 1st & 10 at OWN 25"
        """
        # TODO: your implementation goes here
# Test with sample events.
# The touchdown records how it was scored under 'play_type'. A dict literal
# keeps only the LAST value for a repeated key, so writing a second 'type'
# entry would overwrite 'touchdown' and turn the event into a plain pass.
events = [
    {'type': 'rush', 'yards': 5, 'result': 'first_down'},
    {'type': 'pass', 'yards': 15, 'result': 'complete'},
    {'type': 'rush', 'yards': -2, 'result': 'tackle'},
    {'type': 'touchdown', 'play_type': 'pass'},
]
Exercise 26.2: Simple Win Probability Calculator
Objective: Calculate basic win probability from game state.
Task: Implement a simple win probability model.
import numpy as np
from scipy.special import expit # Logistic function
class SimpleWinProbability:
    """Simple win-probability model driven by a few hand-set coefficients."""

    def __init__(self):
        # Placeholder weights; a production model would learn these from data.
        weights = {}
        weights['score_diff'] = 0.15
        weights['time_remaining_pct'] = -0.05
        weights['possession_bonus'] = 0.10
        weights['field_position'] = 0.02
        self.coefficients = weights

    def calculate(self, state: GameState, team: str) -> float:
        """Return the probability (0 to 1) that ``team`` wins from ``state``.

        Factors to combine:
            * score differential
            * time remaining
            * which team has possession
            * field position
        """
        # TODO: your implementation goes here

    def get_win_probability_chart_data(self, history: list) -> dict:
        """Reshape a win-probability history into chart-ready series.

        Expected return shape::

            {'timestamps': [...], 'home_wp': [...], 'away_wp': [...]}
        """
# Test scenario: Ohio State leads 21-17 with 420 s (7:00) left in the third
# quarter and has the ball (field_position=65).
# Expected: Ohio State WP > 0.5
state = GameState(
    home_team='Ohio State', away_team='Michigan',
    home_score=21, away_score=17,
    quarter=3, time_remaining=420,
    possession='Ohio State', field_position=65,
)
Exercise 26.3: Event Queue Processing
Objective: Build a basic event processing queue.
Task: Implement a thread-safe event queue with processing.
import threading
from queue import Queue, Empty
import time
class EventProcessor:
    """Process events from a queue on a background worker thread.

    Producers call ``submit_event`` from any thread. A single worker thread,
    started by ``start``, takes events off ``event_queue`` and passes each
    one to ``_process_event``.
    """

    def __init__(self):
        self.event_queue = Queue()
        self.processors = []  # callables registered via add_processor()
        self.running = False  # the worker keeps polling while this is True
        self.processed_count = 0
        # Worker thread; None until start() so stop() is safe to call first.
        self.thread = None

    def add_processor(self, processor_func):
        """Add a processing function."""
        self.processors.append(processor_func)

    def submit_event(self, event: dict):
        """Stamp ``event['submitted_at']`` with ``time.time()`` and enqueue it.

        The timestamp is written into the caller's dict, not a copy.
        """
        event['submitted_at'] = time.time()
        self.event_queue.put(event)

    def start(self):
        """Start the worker thread; does nothing if one is already running."""
        if self.running and self.thread is not None and self.thread.is_alive():
            return
        self.running = True
        # daemon=True: a forgotten stop() must not block interpreter exit.
        self.thread = threading.Thread(target=self._process_loop, daemon=True)
        self.thread.start()

    def stop(self):
        """Ask the worker to exit and wait up to 5 seconds for it to finish.

        Safe to call before start(). Events still in the queue are left
        unprocessed.
        """
        self.running = False
        if self.thread is not None:
            self.thread.join(timeout=5.0)

    def _process_loop(self):
        """Worker loop: poll the queue (0.1 s timeout) until ``running`` is cleared."""
        import logging  # local import: only the worker thread needs it

        logger = logging.getLogger(__name__)
        while self.running:
            try:
                event = self.event_queue.get(timeout=0.1)
            except Empty:
                continue
            try:
                self._process_event(event)
            except Exception:
                # One failing event must not kill the worker thread; log it
                # with its traceback and keep draining the queue.
                logger.exception("Failed to process event: %r", event)
            finally:
                # Pair every successful get() with task_done() so that
                # event_queue.join() returns once the queue is drained.
                self.event_queue.task_done()

    def _process_event(self, event: dict):
        """Process single event through all processors."""
        # Your code here
        pass

    def get_stats(self) -> dict:
        """Get processing statistics."""
        # Your code here
        pass
Exercise 26.4: Data Validation
Objective: Validate incoming real-time data.
Task: Build a validation system for play events.
from enum import Enum
from dataclasses import dataclass
from typing import List
class DataQuality(Enum):
    """Overall quality verdict assigned to a validated event."""

    VALID = "valid"
    WARNING = "warning"
    INVALID = "invalid"


@dataclass
class ValidationResult:
    """Outcome of validating one event: its quality plus any messages."""

    quality: DataQuality
    errors: List[str]    # error messages
    warnings: List[str]  # warning messages
class PlayEventValidator:
    """Sanity-check incoming play events before they reach the game state."""

    # Keys every play event must carry.
    REQUIRED_FIELDS = ['game_id', 'play_id', 'play_type']
    # Accepted values for the event's play type.
    VALID_PLAY_TYPES = ['rush', 'pass', 'punt', 'field_goal', 'kickoff']

    def validate(self, event: dict, current_state: GameState) -> ValidationResult:
        """Validate ``event`` on its own and against ``current_state``.

        Checks to implement:
            * all REQUIRED_FIELDS are present
            * the play type is one of VALID_PLAY_TYPES
            * consistency with the game state (scores only increase)
            * field position within 1-99
            * down within 1-4
            * distance within 1-99
        """
        # TODO: your implementation goes here

    def _validate_required_fields(self, event: dict) -> List[str]:
        """Check that every required field is present; return the messages."""

    def _validate_consistency(self, event: dict, state: GameState) -> List[str]:
        """Check ``event`` against ``state``; return the messages."""
Level 2: Intermediate Exercises
Exercise 26.5: Live Decision Support
Objective: Build a fourth-down decision analyzer.
Task: Implement real-time fourth-down decision support.
class FourthDownAnalyzer:
    """Compare going for it, kicking a field goal and punting on fourth down."""

    def __init__(self, wp_calculator: SimpleWinProbability):
        self.wp_calc = wp_calculator

    def analyze(self, state: GameState, team: str) -> dict:
        """Evaluate each fourth-down option for ``team`` in ``state``.

        Expected return shape::

            {
                'go_for_it': {'expected_wp': float, 'conversion_prob': float,
                              'success_wp': float, 'fail_wp': float},
                'field_goal': {'expected_wp': float, 'make_prob': float,
                               'make_wp': float, 'miss_wp': float},  # if in range
                'punt': {'expected_wp': float, 'expected_net': float},
                'recommendation': str,
                'confidence': float,
            }
        """
        # TODO: your implementation goes here

    def _estimate_conversion_prob(self, distance: int) -> float:
        """Rough 4th-down conversion probability for ``distance`` yards to go."""
        # Short-yardage tiers as (max yards to go, probability), simplified model.
        for max_yards, prob in ((1, 0.70), (3, 0.55), (5, 0.45)):
            if distance <= max_yards:
                return prob
        # Past 5 yards: drop 0.03 per extra yard from 0.30, floored at 0.15.
        return max(0.30 - (distance - 5) * 0.03, 0.15)

    def _estimate_fg_prob(self, field_position: int) -> float:
        """Estimate the field-goal make probability from ``field_position``."""
        # Kick distance = yards to the goal line (100 - field_position)
        # plus 17 (end zone + snap).
        distance = 117 - field_position
        # TODO: your implementation goes here

    def _estimate_punt_net(self, field_position: int) -> float:
        """Estimate net punt yards from ``field_position``."""
Exercise 26.6: Streaming Data Adapter
Objective: Build an adapter for live data feeds.
Task: Implement a polling-based data feed adapter.
import asyncio
import aiohttp
from abc import ABC, abstractmethod
class DataFeedAdapter(ABC):
    """Interface that every live data-feed adapter implements."""

    @abstractmethod
    async def connect(self):
        """Open the connection to the feed."""

    @abstractmethod
    async def disconnect(self):
        """Close the connection to the feed."""

    @abstractmethod
    async def stream_events(self):
        """Yield events as they arrive."""
class PollingFeedAdapter(DataFeedAdapter):
    """Adapter that repeatedly polls an HTTP endpoint for new events."""

    def __init__(self, url: str, poll_interval: float = 2.0):
        self.url = url
        self.poll_interval = poll_interval  # delay between polls (assumed seconds)
        self.session = None  # aiohttp.ClientSession while connected
        self.running = False
        self.last_event_id = None  # used to skip events already seen

    async def connect(self):
        """Open the HTTP session and mark the adapter as running.

        Safe to call again, e.g. from reconnect logic. An existing open
        session is reused instead of being replaced, because an orphaned
        ``aiohttp.ClientSession`` is never closed and leaks its connections.
        """
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        self.running = True

    async def disconnect(self):
        """Stop polling and close the HTTP session, if one is open.

        The session reference is cleared before closing, so a second call
        does nothing and a later ``connect()`` starts a fresh session.
        """
        self.running = False
        session, self.session = self.session, None
        if session is not None:
            await session.close()

    async def stream_events(self):
        """
        Poll endpoint and yield new events.
        Should:
        - Poll at specified interval
        - Track last seen event to avoid duplicates
        - Handle connection errors gracefully
        - Yield only new events
        """
        # Your code here
        pass

    def _parse_response(self, data: dict) -> list:
        """Parse API response into events."""
        pass

    def _is_new_event(self, event: dict) -> bool:
        """Check if event is new."""
        pass

# Usage:
# async for event in adapter.stream_events():
#     process(event)
Exercise 26.7: WebSocket Dashboard Server
Objective: Build a WebSocket server for live updates.
Task: Implement a WebSocket server that broadcasts updates.
import asyncio
import json
from typing import Set
class DashboardServer:
    """Push live game updates to connected WebSocket dashboard clients."""

    def __init__(self, state_tracker: GameStateTracker,
                 wp_calculator: SimpleWinProbability):
        self.state_tracker = state_tracker
        self.wp_calc = wp_calculator
        self.clients: Set = set()  # currently registered websockets

    async def register(self, websocket):
        """Track a newly connected client, then send it the current snapshot."""
        self.clients.add(websocket)
        await self._send_snapshot(websocket)

    async def unregister(self, websocket):
        """Forget a disconnected client; unknown clients are ignored."""
        self.clients.discard(websocket)

    async def broadcast(self, message: dict):
        """Send ``message`` to every registered client.

        Each message should carry:
            * type      -- e.g. 'update', 'score', 'situation'
            * data      -- the payload
            * timestamp -- ISO-8601 string
        """
        # TODO: your implementation goes here

    async def _send_snapshot(self, websocket):
        """Send the current game snapshot to a single client."""
        # TODO: your implementation goes here

    def get_current_snapshot(self) -> dict:
        """Build the current game snapshot.

        Expected return shape::

            {'game_state': {...}, 'win_probability': {...},
             'situation': str, 'timestamp': str}
        """

    async def handle_client_message(self, websocket, message: str):
        """Respond to a client request such as 'get_stats' or 'subscribe_team'."""
Exercise 26.8: Performance Metrics Collector
Objective: Collect and report system performance metrics.
Task: Build a metrics collection system.
from collections import deque
import time
import statistics
class MetricsCollector:
    """Rolling latency window plus running event and error tallies."""

    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        # Only the newest ``window_size`` latency samples are kept.
        self.latencies = deque(maxlen=window_size)
        self.event_counts = {}  # event_type -> count
        self.error_counts = {}  # error_type -> count
        self.start_time = time.time()

    def record_latency(self, latency_ms: float, event_type: str = 'default'):
        """Store one latency sample and bump the count for ``event_type``."""
        self.latencies.append(latency_ms)
        self.event_counts.setdefault(event_type, 0)
        self.event_counts[event_type] += 1

    def record_error(self, error_type: str):
        """Bump the tally for ``error_type``."""
        self.error_counts.setdefault(error_type, 0)
        self.error_counts[error_type] += 1

    def get_metrics(self) -> dict:
        """Summarize the collected metrics.

        Expected return shape::

            {
                'latency': {'avg_ms': float, 'p50_ms': float, 'p95_ms': float,
                            'p99_ms': float, 'max_ms': float},
                'throughput': {'events_per_second': float, 'total_events': int},
                'errors': {'total': int, 'by_type': dict, 'error_rate': float},
                'uptime_seconds': float,
            }
        """
        # TODO: your implementation goes here

    def reset(self):
        """Clear every collected metric."""

    def check_health(self, thresholds: dict) -> dict:
        """Compare current metrics against ``thresholds``.

        Example thresholds::

            {'max_latency_p95_ms': 100, 'max_error_rate': 0.01, 'min_throughput': 10}

        Expected return shape::

            {'healthy': bool, 'issues': [str]}
        """
Level 3: Advanced Exercises
Exercise 26.9: Complete Real-Time Engine
Objective: Build an integrated real-time analytics engine.
Task: Combine the components from earlier exercises into a complete system.
class RealTimeAnalyticsEngine:
    """Top-level engine that wires the real-time components together."""

    def __init__(self):
        # Per-game components, expected to be created by initialize_game().
        self.state_tracker = None
        self.wp_calculator = None
        self.decision_support = None
        # Shared infrastructure
        self.metrics = MetricsCollector()
        self.validators = []
        self.subscribers = []  # callbacks registered via subscribe()

    def initialize_game(self, game_info: dict):
        """Prepare the engine for a new game described by ``game_info``."""
        # TODO: your implementation goes here

    def process_event(self, event: dict) -> dict:
        """Run ``event`` through the full pipeline and return the analysis.

        Pipeline:
            1. validate the event
            2. update the game state
            3. recompute win probability
            4. check for decision situations
            5. notify subscribers
            6. return a combined result

        Expected return shape::

            {
                'validation': ValidationResult,
                'state': GameState,
                'win_probability': {...},
                'decision_analysis': {...} or None,
                'key_metrics': {...},
                'processing_time_ms': float,
            }
        """
        # TODO: your implementation goes here

    def subscribe(self, callback):
        """Register ``callback`` to receive state updates."""
        self.subscribers.append(callback)

    def get_game_summary(self) -> dict:
        """Return a comprehensive summary of the game so far."""

    def export_history(self, format: str = 'json') -> str:
        """Serialize the game history in the requested ``format``."""
# Integration test
engine = RealTimeAnalyticsEngine()
engine.initialize_game({
    'home_team': 'Ohio State',
    'away_team': 'Michigan',
    'game_id': 'OSU-MICH-2024',
})
# Simulate game events.
# Use real event dicts here: a literal `[...]` placeholder is a list holding
# the Ellipsis object, which the loop would pass to process_event as if it
# were an event. Each sample carries the keys that
# PlayEventValidator.REQUIRED_FIELDS lists (game_id, play_id, play_type).
events = [
    {'game_id': 'OSU-MICH-2024', 'play_id': 1, 'play_type': 'rush', 'yards': 5},
    {'game_id': 'OSU-MICH-2024', 'play_id': 2, 'play_type': 'pass', 'yards': 15},
    {'game_id': 'OSU-MICH-2024', 'play_id': 3, 'play_type': 'rush', 'yards': -2},
]
for event in events:
    result = engine.process_event(event)
    print(f"WP: {result['win_probability']}")
Exercise 26.10: Live Visualization System
Objective: Build real-time visualization components.
Task: Create live-updating charts and displays.
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
class LiveVisualization:
    """Live matplotlib dashboard that redraws from a RealTimeAnalyticsEngine."""

    def __init__(self, engine: RealTimeAnalyticsEngine):
        self.engine = engine
        self.wp_history = []
        self.fig = None   # created by setup_dashboard()
        self.axes = None  # 2x2 array of Axes once the dashboard is set up

    def setup_dashboard(self):
        """Set up the matplotlib dashboard with four panels."""
        self.fig, self.axes = plt.subplots(2, 2, figsize=(14, 10))
        # Panel 1: Win probability over time
        # Panel 2: Score differential
        # Panel 3: Current situation display
        # Panel 4: Key metrics
        # Your setup code here

    def update(self, frame):
        """FuncAnimation callback: refresh the panels with the latest data."""
        # Get latest data from engine
        # Update each panel
        # Return artists that changed

    def animate(self, interval: int = 1000):
        """Start the live animation, redrawing every ``interval`` milliseconds.

        Builds the dashboard first if ``setup_dashboard`` has not been called.
        Keep a reference to the returned FuncAnimation: if it is
        garbage-collected, the animation stops.
        """
        if self.fig is None:
            # FuncAnimation needs a real Figure, not None.
            self.setup_dashboard()
        anim = FuncAnimation(
            self.fig, self.update,
            interval=interval,
            blit=False,
            # frames=None gives an endless live stream; caching frame data
            # for it is pointless and only triggers a warning.
            cache_frame_data=False,
        )
        return anim

    def save_snapshot(self, filepath: str):
        """Save current visualization state."""
        pass
Exercise 26.11: Alerting System
Objective: Build an alerting system for key events.
Task: Implement configurable alerts.
from dataclasses import dataclass
from typing import Callable, List
@dataclass
class AlertRule:
    """Declarative definition of a single alert."""

    name: str
    condition: Callable[[dict], bool]  # predicate evaluated on a state dict
    severity: str  # 'low', 'medium', 'high' or 'critical'
    message_template: str
    cooldown_seconds: float = 60.0  # cooldown before the rule may fire again
class AlertingSystem:
    """Evaluate alert rules against game state and keep a log of alerts."""

    def __init__(self):
        self.rules: List[AlertRule] = []
        self.triggered_alerts = []
        self.last_triggered = {}  # rule_name -> timestamp

    def add_rule(self, rule: AlertRule):
        """Register ``rule`` so that check() evaluates it."""
        self.rules.append(rule)

    def add_common_rules(self):
        """Install a default set of football alert rules.

        Suggested rules:
            * score change
            * win-probability swing greater than 15%
            * fourth-down situation
            * red-zone entry
            * two-minute warning
        """
        # TODO: your implementation goes here

    def check(self, state: dict) -> List[dict]:
        """Evaluate every rule against ``state`` and return the triggered alerts.

        Each alert is a dict::

            {'rule_name': str, 'severity': str, 'message': str,
             'timestamp': datetime, 'state': dict}
        """
        # TODO: your implementation goes here

    def _should_trigger(self, rule: AlertRule) -> bool:
        """Decide whether ``rule`` may fire now, respecting its cooldown."""

    def get_recent_alerts(self, minutes: int = 5) -> List[dict]:
        """Return the alerts raised within the last ``minutes`` minutes."""
Level 4: Expert Challenges
Exercise 26.12: Production Deployment System
Objective: Build a production-ready deployment configuration.
Task: Create a complete deployment setup with monitoring.
class ProductionDeployment:
    """Generate deployment and monitoring artifacts for the analytics engine."""

    def generate_docker_config(self) -> str:
        """Return a Dockerfile for the analytics engine.

        Requirements:
            * Python 3.9+ base image
            * all dependencies installed
            * health-check endpoint
            * non-root user
            * proper signal handling
        """
        # TODO: your implementation goes here

    def generate_kubernetes_config(self) -> dict:
        """Return the Kubernetes deployment configuration.

        Requirements:
            * Deployment with replicas
            * Service for load balancing
            * ConfigMap for settings
            * HorizontalPodAutoscaler
            * health and readiness probes
        """
        # TODO: your implementation goes here

    def generate_monitoring_config(self) -> dict:
        """Return Prometheus/Grafana monitoring configuration.

        Metrics to expose:
            * event-processing latency
            * throughput
            * error rates
            * active connections
            * memory usage
        """
        # TODO: your implementation goes here

    def estimate_resources(self, expected_load: dict) -> dict:
        """Estimate the resources needed for ``expected_load``.

        Input shape::

            {'concurrent_games': int, 'events_per_second': int,
             'connected_clients': int}

        Output shape::

            {'cpu_cores': int, 'memory_gb': int, 'replicas': int,
             'estimated_cost': float}
        """
Exercise 26.13: Multi-Game Orchestration
Objective: Handle multiple simultaneous games.
Task: Build system to manage analytics for multiple games.
class GameOrchestrator:
    """Manage analytics for many simultaneous games."""

    def __init__(self, max_games: int = 50):
        self.max_games = max_games  # capacity limit for concurrent games
        self.active_games = {}      # presumably game_id -> engine; confirm
        self.game_queues = {}       # presumably game_id -> event queue; confirm

    def start_game(self, game_id: str, game_info: dict) -> bool:
        """Set up analytics for a new game.

        Should:
            * check capacity
            * create a dedicated engine instance
            * set up the event queue
            * initialize monitoring
        """
        # TODO: your implementation goes here

    def end_game(self, game_id: str) -> dict:
        """Finish a game and return its final summary.

        Should:
            * generate the final report
            * archive the history
            * clean up resources
            * return the summary
        """

    def route_event(self, event: dict) -> dict:
        """Send ``event`` to the engine of the game it belongs to."""

    def get_all_games_status(self) -> dict:
        """Report the status of every active game."""

    def get_aggregate_metrics(self) -> dict:
        """Combine metrics across all active games."""

    def rebalance_load(self):
        """Redistribute the workload across workers."""
Submission Guidelines
- Code Quality: Include comprehensive error handling
- Documentation: Document async patterns and thread safety
- Testing: Include load tests for real-time components
- Performance: Track and report latency metrics
Evaluation Criteria
| Level | Criteria | Points |
|---|---|---|
| Level 1 | Working state management | 25 |
| Level 2 | Stream processing, validation | 30 |
| Level 3 | Integrated real-time system | 30 |
| Level 4 | Production-ready deployment | 15 |
Resources
- asyncio documentation
- WebSocket protocol specifications
- Prometheus/Grafana for monitoring
- Docker and Kubernetes documentation