Case Study 1: Building a Live Game Dashboard for Broadcast
Overview
This case study details the development of a real-time analytics dashboard for a major sports broadcaster, providing live win probability, play analysis, and decision support during college football broadcasts.
Business Context
A major sports network needs to:
- Display live win probability graphics during broadcasts
- Provide analysts with real-time statistical insights
- Power second-screen experiences for viewers
- Support in-game decision analysis discussions
- Handle multiple simultaneous games on game days
Requirements
```python
requirements = {
    'latency': {
        'play_to_display': '<500ms',
        'wp_calculation': '<100ms',
        'dashboard_update': '<200ms'
    },
    'availability': {
        'uptime_target': '99.9%',
        'failover_time': '<30s'
    },
    'scale': {
        'concurrent_games': 15,
        'concurrent_users': 500000,
        'events_per_second': 50
    },
    'integration': {
        'data_feeds': ['official_pbp', 'tracking_data', 'odds_feeds'],
        'output_targets': ['broadcast_graphics', 'web_app', 'mobile_app']
    }
}
```
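The latency targets are nested: the WP calculation and the dashboard update both have to fit inside the 500 ms play-to-display window. The sketch below turns the budget strings into numbers and shows what is left for everything else. It assumes the stages run one after another, which the requirements do not state, and `parse_ms` and `remaining_budget_ms` are hypothetical helper names.

```python
from typing import Dict


def parse_ms(limit: str) -> float:
    """Convert a budget string such as '<500ms' or '<30s' to milliseconds."""
    value = limit.lstrip('<').strip()
    if value.endswith('ms'):
        return float(value[:-2])
    if value.endswith('s'):
        return float(value[:-1]) * 1000
    raise ValueError(f"Unrecognized limit: {limit!r}")


def remaining_budget_ms(latency: Dict[str, str]) -> float:
    """Milliseconds left for ingestion and transport, assuming the WP
    calculation and dashboard update run sequentially inside play-to-display."""
    total = parse_ms(latency['play_to_display'])
    return total - parse_ms(latency['wp_calculation']) - parse_ms(latency['dashboard_update'])


latency = {'play_to_display': '<500ms', 'wp_calculation': '<100ms', 'dashboard_update': '<200ms'}
print(remaining_budget_ms(latency))  # 200.0
print(parse_ms('<30s'))              # 30000.0
```

Under that assumption, roughly 200 ms remains for feed polling, Kafka, and network delivery. A feed polled once per second can therefore exceed the whole play-to-display budget on its own in the worst case.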
System Architecture
```text
BROADCAST ANALYTICS SYSTEM ARCHITECTURE
=======================================

Data Source Layer     PBP Feed | Tracking Feed | Odds Feed
        │
        ▼
Ingestion Layer       Apache Kafka Cluster (Event Streaming + Buffer)
        │
        ▼
Game Processors       Game 1 Processor  ...  Game N Processor
        │
        ▼
Analytics Layer       Win Probability Engine
                      Decision Support Engine
                      Play Classification
        │
        ▼
Storage Layer         Redis (Cache)
                      PostgreSQL (Historical)
        │
        ▼
Delivery Layer        WebSocket Gateway
        │
        ▼
Output Targets        Graphics | Web App | Mobile App
```
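The diagram fans one event stream out to a processor per game. In the deployed system Kafka handles this routing (Step 1 publishes to one topic per game). The minimal in-process sketch below uses asyncio queues in place of Kafka for illustration, and `dispatch` and `game_worker` are hypothetical names. Each game gets its own queue and worker, so events for one game stay in order while different games are processed independently.

```python
import asyncio
from typing import Dict, List


async def game_worker(game_id: str, queue: asyncio.Queue,
                      processed: Dict[str, List[str]]) -> None:
    """Consume one game's events in arrival order (stands in for GameProcessor)."""
    while True:
        event = await queue.get()
        processed.setdefault(game_id, []).append(event['event_id'])
        queue.task_done()


async def dispatch(events: List[dict]) -> Dict[str, List[str]]:
    """Route a mixed event stream to per-game queues, one worker per game."""
    queues: Dict[str, asyncio.Queue] = {}
    workers: List[asyncio.Task] = []
    processed: Dict[str, List[str]] = {}
    for event in events:
        game_id = event['game_id']
        if game_id not in queues:
            queues[game_id] = asyncio.Queue()
            workers.append(asyncio.create_task(
                game_worker(game_id, queues[game_id], processed)))
        await queues[game_id].put(event)
    # Wait until every queued event has been handled, then stop the workers
    for queue in queues.values():
        await queue.join()
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return processed


stream = [
    {'game_id': 'g1', 'event_id': 'g1-1'},
    {'game_id': 'g2', 'event_id': 'g2-1'},
    {'game_id': 'g1', 'event_id': 'g1-2'},
]
result = asyncio.run(dispatch(stream))
print(result['g1'])  # ['g1-1', 'g1-2']
```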
Implementation
Step 1: Event Ingestion Service
```python
import asyncio
import json
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Set

import aiohttp
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

logger = logging.getLogger(__name__)


@dataclass
class GameEvent:
    """Standardized game event."""
    game_id: str
    event_id: str
    timestamp: datetime
    event_type: str
    home_score: int
    away_score: int
    quarter: int
    time_remaining: float  # seconds left in the current quarter
    possession: str
    field_position: int
    down: int
    distance: int
    description: str
    raw_data: Dict = field(default_factory=dict)


class EventIngestionService:
    """Ingest events from multiple data sources."""

    def __init__(self, kafka_bootstrap: str):
        self.kafka_bootstrap = kafka_bootstrap
        self.producer: Optional[AIOKafkaProducer] = None
        self.consumers: Dict[str, AIOKafkaConsumer] = {}
        self.event_counts = defaultdict(int)

    async def start(self):
        """Initialize Kafka connections."""
        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.kafka_bootstrap,
            value_serializer=lambda v: json.dumps(v).encode()
        )
        await self.producer.start()

    async def stop(self):
        """Cleanup connections."""
        if self.producer:
            await self.producer.stop()
        for consumer in self.consumers.values():
            await consumer.stop()

    async def ingest_from_feed(self, feed_url: str, game_id: str,
                               poll_interval: float = 1.0):
        """Poll a data feed and publish new events to Kafka.

        Polling adds up to `poll_interval` seconds of delay, which counts
        against the play-to-display latency budget.
        """
        # A polled feed usually returns the full play list every time, so
        # remember every published ID, not just the most recent one.
        seen_event_ids: Set[str] = set()
        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    async with session.get(feed_url) as response:
                        if response.status == 200:
                            data = await response.json()
                            for event in self._parse_feed(data, game_id):
                                if event.event_id and event.event_id not in seen_event_ids:
                                    await self._publish_event(event)
                                    seen_event_ids.add(event.event_id)
                                    self.event_counts[game_id] += 1
                except Exception:
                    # Log and keep polling; one bad response must not stop the feed
                    logger.exception("Feed error for %s", game_id)
                await asyncio.sleep(poll_interval)

    def _parse_feed(self, data: Dict, game_id: str) -> List[GameEvent]:
        """Parse raw feed data into standardized events."""
        events = []
        for play in data.get('plays', []):
            event = GameEvent(
                game_id=game_id,
                event_id=play.get('playId', ''),
                timestamp=datetime.now(),  # ingest time, not official play time
                event_type=play.get('playType', 'unknown'),
                home_score=data.get('homeScore', 0),
                away_score=data.get('awayScore', 0),
                quarter=data.get('quarter', 1),
                time_remaining=data.get('gameClockSeconds', 0),
                possession=play.get('possession', ''),
                field_position=play.get('yardLine', 25),
                down=play.get('down', 1),
                distance=play.get('distance', 10),
                description=play.get('description', ''),
                raw_data=play
            )
            events.append(event)
        return events

    async def _publish_event(self, event: GameEvent):
        """Publish event to the game's Kafka topic."""
        topic = f"game-events-{event.game_id}"
        await self.producer.send_and_wait(
            topic,
            {
                'event_id': event.event_id,
                'timestamp': event.timestamp.isoformat(),
                'event_type': event.event_type,
                'home_score': event.home_score,
                'away_score': event.away_score,
                'quarter': event.quarter,
                'time_remaining': event.time_remaining,
                'possession': event.possession,
                'field_position': event.field_position,
                'down': event.down,
                'distance': event.distance,
                'description': event.description
            }
        )
```
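Because a polled feed typically re-sends earlier plays, the ingestion loop has to remember which play IDs it has already published. The sketch below is a bounded variant built on `collections.OrderedDict`. `SeenIds` is a hypothetical helper. It forgets the oldest IDs once a cap is reached, so memory stays flat across a long game day.

```python
from collections import OrderedDict


class SeenIds:
    """Remember recently published event IDs, evicting the oldest past max_size."""

    def __init__(self, max_size: int = 10_000):
        self.max_size = max_size
        self._ids: "OrderedDict[str, None]" = OrderedDict()

    def add_if_new(self, event_id: str) -> bool:
        """Return True if event_id was unseen (and record it), False otherwise."""
        if event_id in self._ids:
            return False
        self._ids[event_id] = None
        if len(self._ids) > self.max_size:
            self._ids.popitem(last=False)  # drop the oldest ID
        return True


seen = SeenIds(max_size=3)
first_poll = ['p1', 'p2']
second_poll = ['p1', 'p2', 'p3']  # the feed re-sends earlier plays
print([pid for pid in first_poll if seen.add_if_new(pid)])   # ['p1', 'p2']
print([pid for pid in second_poll if seen.add_if_new(pid)])  # ['p3']
```

Set `max_size` comfortably above the number of plays a feed returns per poll. Otherwise evicted IDs would be treated as new and republished.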
Step 2: Game Processor Service
```python
import time
from datetime import datetime
from typing import Dict, List

from scipy.special import expit  # logistic function: 1 / (1 + exp(-x))


class GameProcessor:
    """Process events for a single game."""

    def __init__(self, game_id: str, home_team: str, away_team: str):
        self.game_id = game_id
        self.home_team = home_team
        self.away_team = away_team
        self.current_state = self._initial_state()
        self.wp_history = []    # one entry per processed event
        self.play_history = []  # appended alongside wp_history (same indexes)

    def _initial_state(self) -> Dict:
        """Create initial game state."""
        return {
            'home_score': 0,
            'away_score': 0,
            'quarter': 1,
            'time_remaining': 900.0,  # seconds left in the quarter
            'possession': self.home_team,
            'field_position': 25,
            'down': 1,
            'distance': 10,
            'home_wp': 0.53,  # Slight home advantage
            'last_updated': datetime.now().isoformat()
        }

    def process_event(self, event: Dict) -> Dict:
        """Process event and return updated analysis."""
        start = time.perf_counter()  # monotonic clock for timing

        # Update state
        self._update_state(event)

        # Calculate win probability
        home_wp = self._calculate_win_probability()
        self.current_state['home_wp'] = home_wp

        # Record history
        self.wp_history.append({
            'timestamp': datetime.now().isoformat(),
            'home_wp': home_wp,
            'event_id': event['event_id']
        })
        self.play_history.append({
            'event': event,
            'state_after': self.current_state.copy()
        })

        # Calculate additional metrics
        wpa = self._calculate_wpa()
        leverage = self._calculate_leverage()

        processing_time = (time.perf_counter() - start) * 1000

        return {
            'game_id': self.game_id,
            'state': self.current_state.copy(),
            'home_wp': home_wp,
            'away_wp': 1 - home_wp,
            'wpa': wpa,
            'leverage_index': leverage,
            'processing_time_ms': processing_time
        }

    def _update_state(self, event: Dict):
        """Update game state from event."""
        self.current_state['home_score'] = event['home_score']
        self.current_state['away_score'] = event['away_score']
        self.current_state['quarter'] = event['quarter']
        self.current_state['time_remaining'] = event['time_remaining']
        self.current_state['possession'] = event['possession']
        self.current_state['field_position'] = event['field_position']
        self.current_state['down'] = event['down']
        self.current_state['distance'] = event['distance']
        self.current_state['last_updated'] = datetime.now().isoformat()

    def _calculate_win_probability(self) -> float:
        """Calculate home team win probability."""
        state = self.current_state

        # Score differential
        score_diff = state['home_score'] - state['away_score']

        # Time remaining as a fraction of regulation (60 minutes)
        total_seconds = 3600
        elapsed = (state['quarter'] - 1) * 900 + (900 - state['time_remaining'])
        # Clamp so overtime (quarter 5+) cannot push the fraction below zero
        time_remaining_pct = min(max(1 - elapsed / total_seconds, 0.0), 1.0)

        # Field position value, from the home team's perspective
        if state['possession'] == self.home_team:
            field_value = (state['field_position'] - 50) / 50
        else:
            field_value = -(state['field_position'] - 50) / 50

        # Simple logistic model (illustrative coefficients)
        linear = (
            0.0 +                                         # Intercept
            0.12 * score_diff +
            0.15 * score_diff * (1 - time_remaining_pct) +  # lead matters more late
            0.05 * field_value +
            0.10                                          # Home field advantage
        )
        return float(expit(linear))

    def _calculate_wpa(self) -> float:
        """Calculate WPA (win probability added) for the last play."""
        if len(self.wp_history) < 2:
            return 0.0
        return self.wp_history[-1]['home_wp'] - self.wp_history[-2]['home_wp']

    def _calculate_leverage(self) -> float:
        """Calculate leverage index."""
        state = self.current_state
        score_diff = abs(state['home_score'] - state['away_score'])

        # Higher leverage = closer game + less time
        if score_diff == 0:
            base = 2.0
        elif score_diff <= 3:
            base = 1.5
        elif score_diff <= 7:
            base = 1.0
        elif score_diff <= 14:
            base = 0.6
        else:
            base = 0.3

        # Increase leverage late in game
        time_factor = 1 + (state['quarter'] - 1) * 0.25
        return base * time_factor

    def get_snapshot(self) -> Dict:
        """Get current game snapshot."""
        return {
            'game_id': self.game_id,
            'home_team': self.home_team,
            'away_team': self.away_team,
            'state': self.current_state.copy(),
            'wp_chart_data': self.wp_history[-100:],  # Last 100 points
            'key_plays': self._get_key_plays()
        }

    def _get_key_plays(self, threshold: float = 0.10) -> List[Dict]:
        """Get the five plays with the largest swings (|WPA| >= threshold)."""
        key_plays = []
        for i in range(1, len(self.wp_history)):
            wpa = self.wp_history[i]['home_wp'] - self.wp_history[i - 1]['home_wp']
            if abs(wpa) >= threshold:
                key_plays.append({
                    'event_id': self.wp_history[i]['event_id'],
                    # play_history[i] is the event that produced wp_history[i]
                    'description': self.play_history[i]['event'].get('description', ''),
                    'wpa': wpa,
                    'home_wp': self.wp_history[i]['home_wp']
                })
        return sorted(key_plays, key=lambda x: abs(x['wpa']), reverse=True)[:5]
```
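Written out, the simple model in `_calculate_win_probability` is a logistic function of score differential $d$ (home minus away), fraction of regulation remaining $r$, and field value $f$. The worked example below uses illustrative numbers, not a real game. The home team leads by 7 at the start of the fourth quarter with the ball on its own 25, so $d = 7$, $r = 1 - 2700/3600 = 0.25$, and $f = (25 - 50)/50 = -0.5$.

```latex
$$
\mathrm{WP}_{\text{home}} = \sigma(z) = \frac{1}{1 + e^{-z}}, \qquad
z = 0.12\,d + 0.15\,d\,(1 - r) + 0.05\,f + 0.10
$$
$$
z = 0.12(7) + 0.15(7)(0.75) + 0.05(-0.5) + 0.10
  = 0.84 + 0.7875 - 0.025 + 0.10 = 1.7025
$$
$$
\mathrm{WP}_{\text{home}} = \frac{1}{1 + e^{-1.7025}} \approx 0.846
$$
```

The $0.15\,d\,(1 - r)$ term is what makes a lead worth more as the clock runs. It is zero at kickoff and reaches $0.15\,d$ at the end of regulation.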
Step 3: WebSocket Gateway
```python
import asyncio
import json
from typing import Dict, Optional, Set

import websockets

# GameProcessor is defined in Step 2.


class BroadcastGateway:
    """WebSocket gateway for real-time updates."""

    def __init__(self):
        self.game_processors: Dict[str, GameProcessor] = {}
        self.game_clients: Dict[str, Set] = {}
        self.broadcast_queue = asyncio.Queue()

    async def register_game(self, game_id: str, home_team: str, away_team: str):
        """Register a new game for broadcasting."""
        self.game_processors[game_id] = GameProcessor(game_id, home_team, away_team)
        self.game_clients[game_id] = set()

    async def handle_client(self, websocket, path: Optional[str] = None):
        """Handle a new client connection; the URL path is /<game_id>."""
        # Newer websockets releases call handlers with the connection only;
        # older ones also passed the request path as a second argument.
        if path is None:
            path = getattr(websocket, 'path', None) or websocket.request.path
        game_id = path.split('?')[0].strip('/')

        if game_id not in self.game_processors:
            await websocket.close(1008, "Game not found")
            return

        # Register client
        self.game_clients[game_id].add(websocket)

        try:
            # Send initial snapshot
            snapshot = self.game_processors[game_id].get_snapshot()
            await websocket.send(json.dumps({'type': 'snapshot', 'data': snapshot}))

            # Handle incoming messages
            async for message in websocket:
                await self._handle_message(websocket, game_id, message)
        finally:
            # Unregister client
            self.game_clients[game_id].discard(websocket)

    async def _handle_message(self, websocket, game_id: str, message: str):
        """Handle message from client; malformed messages are ignored."""
        try:
            msg = json.loads(message)
        except json.JSONDecodeError:
            return
        if not isinstance(msg, dict):
            return
        msg_type = msg.get('type')
        if msg_type == 'ping':
            await websocket.send(json.dumps({'type': 'pong'}))
        elif msg_type == 'request_snapshot':
            snapshot = self.game_processors[game_id].get_snapshot()
            await websocket.send(json.dumps({'type': 'snapshot', 'data': snapshot}))

    async def broadcast_update(self, game_id: str, update: Dict):
        """Broadcast update to all clients watching a game."""
        if game_id not in self.game_clients:
            return
        message = json.dumps({'type': 'update', 'data': update})

        # Iterate over a copy: handle_client may remove clients while we await
        clients = list(self.game_clients[game_id])
        # Send concurrently so one slow client does not delay the rest
        results = await asyncio.gather(
            *(client.send(message) for client in clients),
            return_exceptions=True
        )
        dead_clients = {
            client for client, result in zip(clients, results)
            if isinstance(result, Exception)
        }

        # Clean up dead connections
        self.game_clients[game_id] -= dead_clients

    async def process_and_broadcast(self, game_id: str, event: Dict) -> Optional[Dict]:
        """Process event and broadcast results."""
        if game_id not in self.game_processors:
            return None
        processor = self.game_processors[game_id]
        result = processor.process_event(event)
        await self.broadcast_update(game_id, result)
        return result
```
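With tens of thousands of viewers per game, one stalled connection should not hold up an update for everyone else. The self-contained sketch below shows concurrent fan-out with a per-client timeout. `fan_out` and `FakeClient` are hypothetical stand-ins, and no real WebSocket library is involved. The 50 ms timeout is an assumed value, not a figure from this deployment.

```python
import asyncio
from typing import Iterable, Set


class FakeClient:
    """Stand-in for a WebSocket connection; `delay` simulates a slow network."""

    def __init__(self, name: str, delay: float = 0.0):
        self.name = name
        self.delay = delay
        self.received = []

    async def send(self, message: str) -> None:
        await asyncio.sleep(self.delay)
        self.received.append(message)


async def fan_out(clients: Iterable, message: str, timeout: float = 0.05) -> Set:
    """Send to all clients concurrently; return those that failed or timed out."""
    clients = list(clients)
    results = await asyncio.gather(
        *(asyncio.wait_for(c.send(message), timeout) for c in clients),
        return_exceptions=True,
    )
    return {c for c, r in zip(clients, results) if isinstance(r, Exception)}


fast, slow = FakeClient('fast'), FakeClient('slow', delay=1.0)
dropped = asyncio.run(fan_out([fast, slow], '{"type": "update"}'))
print([c.name for c in dropped])  # ['slow']
print(fast.received)              # ['{"type": "update"}']
```

Whether a single timeout should disconnect a viewer, or only count as a strike, is a policy choice. The sketch simply reports the clients that missed the deadline.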
Step 4: Graphics Integration
```python
import logging
from datetime import datetime
from typing import Dict, Optional

import aiohttp

logger = logging.getLogger(__name__)


class BroadcastGraphicsInterface:
    """Interface for broadcast graphics systems."""

    def __init__(self, graphics_endpoint: str):
        self.endpoint = graphics_endpoint
        self.session: Optional[aiohttp.ClientSession] = None

    async def connect(self):
        """Initialize connection to graphics system."""
        # Short total timeout (assumed value) so a stalled graphics host cannot
        # block live updates; aiohttp's default total timeout is 5 minutes.
        self.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=0.5))

    async def disconnect(self):
        """Close connection."""
        if self.session:
            await self.session.close()

    async def update_win_probability(self, game_id: str, home_wp: float, away_wp: float):
        """Send win probability update to graphics (values in percent)."""
        payload = {
            'command': 'UPDATE_WIN_PROBABILITY',
            'game_id': game_id,
            'home_wp': round(home_wp * 100, 1),
            'away_wp': round(away_wp * 100, 1),
            'timestamp': datetime.now().isoformat()
        }
        await self._send_command(payload)

    async def trigger_key_play_graphic(self, play_data: Dict):
        """Trigger key play graphic (WPA shown in percentage points)."""
        payload = {
            'command': 'SHOW_KEY_PLAY',
            'play_description': play_data.get('description', ''),
            'wpa': round(play_data['wpa'] * 100, 1),
            'duration_ms': 5000
        }
        await self._send_command(payload)

    async def update_situation_display(self, state: Dict):
        """Update down and distance display."""
        payload = {
            'command': 'UPDATE_SITUATION',
            'down': state['down'],
            'distance': state['distance'],
            'field_position': state['field_position'],
            'possession': state['possession']
        }
        await self._send_command(payload)

    async def _send_command(self, payload: Dict):
        """Send command to graphics system; log failures instead of raising."""
        if self.session is None:
            logger.error("Graphics interface not connected; call connect() first")
            return
        try:
            async with self.session.post(self.endpoint, json=payload) as response:
                if response.status != 200:
                    logger.warning("Graphics error: HTTP %s", response.status)
        except Exception:
            logger.exception("Graphics connection error")
```
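The processor output from Step 2 maps fairly directly onto these graphics commands. The pure-function sketch below reuses the command names above and the 0.10 key-play threshold from `_get_key_plays`. `build_graphics_commands` is a hypothetical helper. Firing a key-play graphic from a single play's WPA is an assumption about how the two steps would be wired together, and the timestamp field is omitted so the output is deterministic.

```python
from typing import Dict, List


def build_graphics_commands(result: Dict, description: str,
                            key_play_threshold: float = 0.10) -> List[Dict]:
    """Turn one GameProcessor.process_event() result into graphics payloads."""
    state = result['state']
    commands = [
        {
            'command': 'UPDATE_WIN_PROBABILITY',
            'game_id': result['game_id'],
            'home_wp': round(result['home_wp'] * 100, 1),
            'away_wp': round(result['away_wp'] * 100, 1),
        },
        {
            'command': 'UPDATE_SITUATION',
            'down': state['down'],
            'distance': state['distance'],
            'field_position': state['field_position'],
            'possession': state['possession'],
        },
    ]
    # Only large swings earn a key-play graphic
    if abs(result['wpa']) >= key_play_threshold:
        commands.append({
            'command': 'SHOW_KEY_PLAY',
            'play_description': description,
            'wpa': round(result['wpa'] * 100, 1),
            'duration_ms': 5000,
        })
    return commands


example = {
    'game_id': 'g1', 'home_wp': 0.846, 'away_wp': 0.154, 'wpa': 0.12,
    'state': {'down': 1, 'distance': 10, 'field_position': 25, 'possession': 'HOME'},
}
print([c['command'] for c in build_graphics_commands(example, 'Interception')])
# ['UPDATE_WIN_PROBABILITY', 'UPDATE_SITUATION', 'SHOW_KEY_PLAY']
```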
Results
Performance Metrics
```text
SYSTEM PERFORMANCE (CHAMPIONSHIP SATURDAY)
==========================================
Games Processed:            15 concurrent
Peak Events/Second:         47
Average Latency:            43ms
P95 Latency:                89ms
P99 Latency:                142ms
Uptime:                     100% (8 hours)
Total Events Processed:     12,847
Total WP Updates Broadcast: 38,541

Client Metrics:
- Peak Concurrent Connections: 127,000
- Average Connection Duration: 42 minutes
- Messages Broadcast: 2.3 million
```
Win Probability Accuracy
```text
WP MODEL CALIBRATION (2024 SEASON)
==================================
Predicted WP | Actual Win % | Sample Size
-------------|--------------|------------
0-10%        | 8.2%         | 1,234
10-20%       | 14.7%        | 2,456
20-30%       | 26.1%        | 3,891
30-40%       | 35.8%        | 4,567
40-50%       | 46.2%        | 5,123
50-60%       | 53.4%        | 5,234
60-70%       | 64.8%        | 4,678
70-80%       | 73.9%        | 3,456
80-90%       | 84.7%        | 2,345
90-100%      | 92.1%        | 1,567
```
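Brier Score: 0.187 (lower is better; a constant 50% forecast scores 0.25). The Brier score also rewards sharp predictions, so the direct evidence of calibration is the table: every bin's actual win rate falls inside its predicted range.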
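Both the table and the score come from pairs of (predicted home WP, actual outcome). The minimal sketch below shows how they might be computed, using toy inputs rather than the season data. It bins by tenths to mirror the table, and `calibration_table` and `brier_score` are hypothetical helper names.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def brier_score(preds: List[float], outcomes: List[int]) -> float:
    """Mean squared error between predicted probabilities and 0/1 outcomes."""
    return sum((p - y) ** 2 for p, y in zip(preds, outcomes)) / len(preds)


def calibration_table(preds: List[float],
                      outcomes: List[int]) -> Dict[int, Tuple[float, int]]:
    """Map each 10% bin (0 = 0-10%, ..., 9 = 90-100%) to (actual win rate, sample size)."""
    bins = defaultdict(list)
    for p, y in zip(preds, outcomes):
        bins[min(int(p * 10), 9)].append(y)  # p == 1.0 goes in the top bin
    return {b: (sum(ys) / len(ys), len(ys)) for b, ys in sorted(bins.items())}


preds = [0.05, 0.15, 0.85, 0.95]
outcomes = [0, 0, 1, 1]
print(round(brier_score(preds, outcomes), 4))  # 0.0125
print(calibration_table(preds, outcomes))
```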
Viewer Engagement
```text
SECOND SCREEN APP METRICS
=========================
Active Users During Games: 89,000 avg
Sessions per User:         2.3
Avg Session Duration:      38 minutes

Most Viewed Features:
1. Live Win Probability: 94% of users
2. Play-by-Play: 78% of users
3. Key Plays Highlights: 67% of users
4. Fourth Down Analysis: 45% of users

User Satisfaction: 4.2/5.0 stars
```
Lessons Learned
- Latency is Critical: Users notice delays over 500ms, so budget latency for every stage from feed to screen
- Graceful Degradation: Handle partial data; don't crash on missing or malformed fields
- Cache Aggressively: Keep win probability coefficients, team data, and historical baselines in cache
- Monitor Everything: Latency percentiles, queue depths, error rates
- Test at Scale: Game day loads are 10x normal, so load-test at that volume
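The graceful-degradation lesson applies directly to feed parsing. Instead of letting one malformed play raise, coerce each field and fall back to a default. `safe_int` and `parse_play` are hypothetical helpers, and the field names and defaults mirror `_parse_feed` in Step 1. Note that `dict.get(key, default)` alone is not enough, because a key that is present with a `None` value returns `None`.

```python
from typing import Any, Dict


def safe_int(value: Any, default: int) -> int:
    """Coerce value to int, falling back to default on None or bad input."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def parse_play(play: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize a raw play, tolerating missing or malformed fields."""
    return {
        'event_id': str(play.get('playId') or ''),
        'event_type': play.get('playType') or 'unknown',
        'field_position': safe_int(play.get('yardLine'), 25),
        'down': safe_int(play.get('down'), 1),
        'distance': safe_int(play.get('distance'), 10),
        'description': play.get('description') or '',
    }


# 'down' arrives as a string, 'distance' is null, 'yardLine' is junk
print(parse_play({'playId': 'p9', 'down': '3', 'distance': None, 'yardLine': 'N/A'}))
```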
Recommendations
- Pre-warm Caches: Load all team and player data before kickoff
- Geographic Distribution: Terminate WebSocket connections at edge locations, for example through a CDN that supports WebSockets
- Fallback Graphics: Switch to static displays if the real-time feed fails
- Alert Thresholds: Page on-call when p95 latency exceeds 100ms
- Post-Game Analysis: Archive all events for model improvement
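The 100ms p95 alert can be sketched as a rolling window over recent per-event latencies. This sketch assumes nearest-rank percentiles over the last `window` samples, and `LatencyMonitor` is a hypothetical class. A production setup would more likely rely on its metrics system's own percentile support.

```python
import math
from collections import deque


class LatencyMonitor:
    """Track recent latencies and flag when p95 exceeds a threshold."""

    def __init__(self, window: int = 1000, threshold_ms: float = 100.0):
        self.samples = deque(maxlen=window)  # oldest samples drop off automatically
        self.threshold_ms = threshold_ms

    def record(self, latency_ms: float) -> None:
        self.samples.append(latency_ms)

    def percentile(self, pct: float) -> float:
        """Nearest-rank percentile of the current window (0.0 if empty)."""
        if not self.samples:
            return 0.0
        ordered = sorted(self.samples)
        rank = max(math.ceil(pct / 100 * len(ordered)), 1)
        return ordered[rank - 1]

    def should_page(self) -> bool:
        return self.percentile(95) > self.threshold_ms


monitor = LatencyMonitor(window=100)
for ms in [40] * 90 + [150] * 10:
    monitor.record(ms)
print(monitor.percentile(95), monitor.should_page())  # 150 True
```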