Case Study 1: Building a Live Game Dashboard for Broadcast

Overview

This case study details the development of a real-time analytics dashboard for a major sports broadcaster, providing live win probability, play analysis, and decision support during college football broadcasts.

Business Context

A major sports network needs to:

  - Display live win probability graphics during broadcasts
  - Provide analysts with real-time statistical insights
  - Power second-screen experiences for viewers
  - Support in-game decision analysis discussions
  - Handle multiple simultaneous games on game days

Requirements

requirements = {
    'latency': {
        'play_to_display': '<500ms',
        'wp_calculation': '<100ms',
        'dashboard_update': '<200ms'
    },
    'availability': {
        'uptime_target': '99.9%',
        'failover_time': '<30s'
    },
    'scale': {
        'concurrent_games': 15,
        'concurrent_users': 500000,
        'events_per_second': 50
    },
    'integration': {
        'data_feeds': ['official_pbp', 'tracking_data', 'odds_feeds'],
        'output_targets': ['broadcast_graphics', 'web_app', 'mobile_app']
    }
}
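The budgets above are plain strings, so they can be checked mechanically against measurements. The following is a minimal sketch. It assumes every latency budget has the `'<Nms'` form and that the uptime target is a percentage string. `parse_budget_ms`, `over_budget` and `allowed_downtime_seconds` are hypothetical helpers, not part of the system described here.

```python
import re

# Latency section of the requirements dict above.
latency_budgets = {
    'play_to_display': '<500ms',
    'wp_calculation': '<100ms',
    'dashboard_update': '<200ms',
}


# Hypothetical helpers; assumes every budget string has the form '<Nms'.
def parse_budget_ms(budget: str) -> float:
    """Turn a budget string such as '<500ms' into milliseconds."""
    match = re.fullmatch(r"<\s*(\d+(?:\.\d+)?)\s*ms", budget.strip())
    if match is None:
        raise ValueError(f"unrecognised budget format: {budget!r}")
    return float(match.group(1))


def over_budget(measured_ms: dict, budgets: dict) -> list:
    """Names of stages whose measured latency is not strictly below its budget."""
    return [name for name, value in measured_ms.items()
            if value >= parse_budget_ms(budgets[name])]


def allowed_downtime_seconds(uptime_target: str, window_hours: float) -> float:
    """Downtime that an uptime target such as '99.9%' permits over a window."""
    availability = float(uptime_target.rstrip('%')) / 100
    return (1 - availability) * window_hours * 3600
```

Suppose availability is measured per 8-hour game day, the window reported under Results. In that case 99.9% leaves about 28.8 s of downtime, so a single failover that used the full 30 s allowance would by itself exceed the target for that day.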

System Architecture

BROADCAST ANALYTICS SYSTEM ARCHITECTURE
=======================================

                    ┌─────────────────────────────────────┐
                    │         Data Source Layer           │
                    │  ┌─────┐  ┌─────────┐  ┌─────────┐  │
                    │  │ PBP │  │Tracking │  │  Odds   │  │
                    │  │Feed │  │  Feed   │  │  Feed   │  │
                    │  └──┬──┘  └────┬────┘  └────┬────┘  │
                    └─────┼──────────┼────────────┼───────┘
                          │          │            │
                          ▼          ▼            ▼
                    ┌─────────────────────────────────────┐
                    │         Ingestion Layer             │
                    │   ┌────────────────────────────┐    │
                    │   │    Apache Kafka Cluster    │    │
                    │   │ (Event Streaming + Buffer) │    │
                    │   └─────────────┬──────────────┘    │
                    └─────────────────┼───────────────────┘
                                      │
                          ┌───────────┴───────────┐
                          ▼                       ▼
                    ┌──────────┐            ┌──────────┐
                    │  Game 1  │    ...     │  Game N  │
                    │ Processor│            │ Processor│
                    └─────┬────┘            └─────┬────┘
                          │                       │
                          ▼                       ▼
                    ┌─────────────────────────────────────┐
                    │         Analytics Layer             │
                    │   ┌─────────────────────────────┐   │
                    │   │   Win Probability Engine    │   │
                    │   │   Decision Support Engine   │   │
                    │   │   Play Classification       │   │
                    │   └─────────────┬───────────────┘   │
                    └─────────────────┼───────────────────┘
                                      │
                    ┌─────────────────┼───────────────────┐
                    │         Storage Layer               │
                    │  ┌────────┐    ┌─────────────────┐  │
                    │  │ Redis  │    │   PostgreSQL    │  │
                    │  │(Cache) │    │  (Historical)   │  │
                    │  └────────┘    └─────────────────┘  │
                    └─────────────────┼───────────────────┘
                                      │
                    ┌─────────────────┼───────────────────┐
                    │         Delivery Layer              │
                    │  ┌───────────────────────────────┐  │
                    │  │       WebSocket Gateway       │  │
                    │  └──────────────┬────────────────┘  │
                    └─────────────────┼───────────────────┘
                         ┌────────────┼────────────┐
                         ▼            ▼            ▼
                   ┌──────────┐ ┌──────────┐ ┌──────────┐
                   │ Graphics │ │ Web App  │ │Mobile App│
                   └──────────┘ └──────────┘ └──────────┘
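The diagram fans the Kafka stream out to one processor per game, so a burst of plays in one game does not delay updates for the others. The sketch below shows that routing pattern in-process. One `asyncio.Queue` and one worker task per game stand in for per-game Kafka topics and consumers. `GameRouter` and its handler callback are hypothetical names, not part of the case study's code.

```python
import asyncio
from collections import defaultdict
from typing import Callable, Dict


class GameRouter:
    """Hypothetical router: one queue and one worker task per game."""

    def __init__(self, handler: Callable[[str, dict], None]):
        self.handler = handler
        self.queues: Dict[str, asyncio.Queue] = {}
        self.workers: Dict[str, asyncio.Task] = {}

    def route(self, event: dict) -> None:
        """Enqueue an event for its game, starting that game's worker on first use."""
        game_id = event['game_id']
        if game_id not in self.queues:
            self.queues[game_id] = asyncio.Queue()
            self.workers[game_id] = asyncio.create_task(self._worker(game_id))
        self.queues[game_id].put_nowait(event)

    async def _worker(self, game_id: str) -> None:
        # Events for one game are handled strictly in arrival order.
        queue = self.queues[game_id]
        while True:
            event = await queue.get()
            try:
                self.handler(game_id, event)
            except Exception as exc:  # one bad event must not stop this game's worker
                print(f"{game_id}: failed to process {event.get('event_id')}: {exc}")
            finally:
                queue.task_done()

    async def drain(self) -> None:
        """Wait until every queued event has been handled."""
        await asyncio.gather(*(q.join() for q in self.queues.values()))

    async def close(self) -> None:
        """Cancel the per-game workers and wait for them to finish."""
        for task in self.workers.values():
            task.cancel()
        await asyncio.gather(*self.workers.values(), return_exceptions=True)


async def demo() -> dict:
    handled = defaultdict(list)
    router = GameRouter(lambda game_id, ev: handled[game_id].append(ev['event_id']))
    for i, game_id in enumerate(['g1', 'g2', 'g1', 'g2', 'g1']):
        router.route({'game_id': game_id, 'event_id': f'e{i}'})
    await router.drain()
    await router.close()
    return dict(handled)
```

Keeping ordering per game, rather than globally, matches how win probability works: each game's state depends only on its own event sequence.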

Implementation

Step 1: Event Ingestion Service

import asyncio
import json
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from collections import defaultdict
import aiohttp
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

@dataclass
class GameEvent:
    """Standardized game event."""
    game_id: str
    event_id: str
    timestamp: datetime
    event_type: str
    home_score: int
    away_score: int
    quarter: int
    time_remaining: float
    possession: str
    field_position: int
    down: int
    distance: int
    description: str
    raw_data: Dict = field(default_factory=dict)


class EventIngestionService:
    """Ingest events from multiple data sources."""

    def __init__(self, kafka_bootstrap: str):
        self.kafka_bootstrap = kafka_bootstrap
        self.producer: Optional[AIOKafkaProducer] = None
        self.consumers: Dict[str, AIOKafkaConsumer] = {}
        self.event_counts = defaultdict(int)

    async def start(self):
        """Initialize Kafka connections."""
        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.kafka_bootstrap,
            value_serializer=lambda v: json.dumps(v).encode()
        )
        await self.producer.start()

    async def stop(self):
        """Cleanup connections."""
        if self.producer:
            await self.producer.stop()
        for consumer in self.consumers.values():
            await consumer.stop()

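    async def ingest_from_feed(self, feed_url: str, game_id: str):
        """Poll a data feed and publish new events to Kafka."""
        # Each poll returns a list of plays; if it overlaps earlier polls,
        # comparing against only the last ID re-publishes every older play,
        # so remember every ID already sent.
        seen_event_ids = set()

        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    async with session.get(feed_url) as response:
                        if response.status == 200:
                            data = await response.json()
                            events = self._parse_feed(data, game_id)

                            for event in events:
                                # Plays without an ID cannot be de-duplicated; skip them
                                if not event.event_id or event.event_id in seen_event_ids:
                                    continue
                                await self._publish_event(event)
                                seen_event_ids.add(event.event_id)
                                self.event_counts[game_id] += 1
                        else:
                            print(f"Feed returned HTTP {response.status} for {game_id}")

                except Exception as e:
                    # Log and keep polling; one bad response must not end ingestion
                    print(f"Feed error for {game_id}: {e}")

                await asyncio.sleep(1.0)  # Poll every second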

    def _parse_feed(self, data: Dict, game_id: str) -> List[GameEvent]:
        """Parse raw feed data into standardized events."""
        events = []

        for play in data.get('plays', []):
            event = GameEvent(
                game_id=game_id,
                event_id=play.get('playId', ''),
                timestamp=datetime.now(),
                event_type=play.get('playType', 'unknown'),
                home_score=data.get('homeScore', 0),
                away_score=data.get('awayScore', 0),
                quarter=data.get('quarter', 1),
                time_remaining=data.get('gameClockSeconds', 0),
                possession=play.get('possession', ''),
                field_position=play.get('yardLine', 25),
                down=play.get('down', 1),
                distance=play.get('distance', 10),
                description=play.get('description', ''),
                raw_data=play
            )
            events.append(event)

        return events

    async def _publish_event(self, event: GameEvent):
        """Publish event to Kafka topic."""
        topic = f"game-events-{event.game_id}"
        await self.producer.send_and_wait(
            topic,
            {
                'event_id': event.event_id,
                'timestamp': event.timestamp.isoformat(),
                'event_type': event.event_type,
                'home_score': event.home_score,
                'away_score': event.away_score,
                'quarter': event.quarter,
                'time_remaining': event.time_remaining,
                'possession': event.possession,
                'field_position': event.field_position,
                'down': event.down,
                'distance': event.distance,
                'description': event.description
            }
        )
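`_publish_event` copies each `GameEvent` field into a dict by hand, and that dict must be updated whenever a field is added. One alternative derives the payload with `dataclasses.asdict`. The sketch below assumes the message should carry the same fields as the original, which drops `game_id` and `raw_data`. `event_to_payload` is a hypothetical helper, and the dataclass is a trimmed copy of `GameEvent`.

```python
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Dict


@dataclass
class GameEvent:
    # Trimmed copy of the GameEvent above; the real class has more fields.
    game_id: str
    event_id: str
    timestamp: datetime
    home_score: int
    away_score: int
    description: str
    raw_data: Dict = field(default_factory=dict)


def event_to_payload(event: GameEvent) -> dict:
    """Hypothetical helper: build the Kafka message body from the dataclass."""
    payload = asdict(event)                               # recursive copy of every field
    payload.pop('game_id')                                # already encoded in the topic name
    payload.pop('raw_data')                               # the original message omits it too
    payload['timestamp'] = event.timestamp.isoformat()    # JSON-safe string
    return payload
```

The returned dict can still go through the producer's `value_serializer`, because the only non-JSON field, the timestamp, has already been converted to an ISO 8601 string.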

Step 2: Game Processor Service

from datetime import datetime
from typing import Dict, List

from scipy.special import expit  # logistic function 1 / (1 + exp(-x))

class GameProcessor:
    """Process events for a single game."""

    def __init__(self, game_id: str, home_team: str, away_team: str):
        self.game_id = game_id
        self.home_team = home_team
        self.away_team = away_team
        self.current_state = self._initial_state()
        self.wp_history = []
        self.play_history = []

    def _initial_state(self) -> Dict:
        """Create initial game state."""
        return {
            'home_score': 0,
            'away_score': 0,
            'quarter': 1,
            'time_remaining': 900.0,
            'possession': self.home_team,
            'field_position': 25,
            'down': 1,
            'distance': 10,
            'home_wp': 0.53,  # Slight home advantage
            'last_updated': datetime.now().isoformat()
        }

    def process_event(self, event: Dict) -> Dict:
        """Process event and return updated analysis."""
        start_time = datetime.now()

        # Update state
        self._update_state(event)

        # Calculate win probability
        home_wp = self._calculate_win_probability()
        self.current_state['home_wp'] = home_wp

        # Record history
        self.wp_history.append({
            'timestamp': datetime.now().isoformat(),
            'home_wp': home_wp,
            'event_id': event['event_id']
        })

        self.play_history.append({
            'event': event,
            'state_after': self.current_state.copy()
        })

        # Calculate additional metrics
        wpa = self._calculate_wpa()
        leverage = self._calculate_leverage()

        processing_time = (datetime.now() - start_time).total_seconds() * 1000

        return {
            'game_id': self.game_id,
            'state': self.current_state.copy(),
            'home_wp': home_wp,
            'away_wp': 1 - home_wp,
            'wpa': wpa,
            'leverage_index': leverage,
            'processing_time_ms': processing_time
        }

    def _update_state(self, event: Dict):
        """Update game state from event."""
        self.current_state['home_score'] = event['home_score']
        self.current_state['away_score'] = event['away_score']
        self.current_state['quarter'] = event['quarter']
        self.current_state['time_remaining'] = event['time_remaining']
        self.current_state['possession'] = event['possession']
        self.current_state['field_position'] = event['field_position']
        self.current_state['down'] = event['down']
        self.current_state['distance'] = event['distance']
        self.current_state['last_updated'] = datetime.now().isoformat()

    def _calculate_win_probability(self) -> float:
        """Calculate home team win probability."""
        state = self.current_state

        # Score differential
        score_diff = state['home_score'] - state['away_score']

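        # Time remaining as a fraction of 60-minute regulation (900 s per quarter)
        total_seconds = 3600  # 60 minutes
        elapsed = (state['quarter'] - 1) * 900 + (900 - state['time_remaining'])
        # Clamp at zero so overtime periods (quarter 5+) cannot go negative
        time_remaining_pct = max(0.0, 1 - (elapsed / total_seconds))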

        # Field position value
        if state['possession'] == self.home_team:
            field_value = (state['field_position'] - 50) / 50
        else:
            field_value = -(state['field_position'] - 50) / 50

        # Simple model
        linear = (
            0.0 +  # Intercept
            0.12 * score_diff +
            0.15 * score_diff * (1 - time_remaining_pct) +
            0.05 * field_value +
            0.10  # Home field advantage
        )

        return float(expit(linear))

    def _calculate_wpa(self) -> float:
        """Calculate WPA for last play."""
        if len(self.wp_history) < 2:
            return 0.0

        return self.wp_history[-1]['home_wp'] - self.wp_history[-2]['home_wp']

    def _calculate_leverage(self) -> float:
        """Calculate leverage index."""
        state = self.current_state
        score_diff = abs(state['home_score'] - state['away_score'])

        # Higher leverage = closer game + less time
        if score_diff == 0:
            base = 2.0
        elif score_diff <= 3:
            base = 1.5
        elif score_diff <= 7:
            base = 1.0
        elif score_diff <= 14:
            base = 0.6
        else:
            base = 0.3

        # Increase leverage late in game
        time_factor = 1 + (state['quarter'] - 1) * 0.25

        return base * time_factor

    def get_snapshot(self) -> Dict:
        """Get current game snapshot."""
        return {
            'game_id': self.game_id,
            'home_team': self.home_team,
            'away_team': self.away_team,
            'state': self.current_state,
            'wp_chart_data': self.wp_history[-100:],  # Last 100 points
            'key_plays': self._get_key_plays()
        }

    def _get_key_plays(self, threshold: float = 0.10) -> List[Dict]:
        """Get plays with significant WPA."""
        key_plays = []

        for i in range(1, len(self.wp_history)):
            wpa = abs(self.wp_history[i]['home_wp'] - self.wp_history[i-1]['home_wp'])
            if wpa >= threshold:
                key_plays.append({
                    'event_id': self.wp_history[i]['event_id'],
                    'wpa': self.wp_history[i]['home_wp'] - self.wp_history[i-1]['home_wp'],
                    'home_wp': self.wp_history[i]['home_wp']
                })

        return sorted(key_plays, key=lambda x: abs(x['wpa']), reverse=True)[:5]
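To make the coefficients in `_calculate_win_probability` concrete, the sketch below restates the same linear model as a pure function. It uses `math.exp` in place of `scipy.special.expit`, since both compute the logistic function. It assumes `field_position` is measured from the possessing team's own goal line, as the `(field_position - 50) / 50` term implies. It also clamps the time fraction at zero for overtime. `home_win_probability` is a hypothetical name.

```python
import math


def home_win_probability(home_score: int, away_score: int, quarter: int,
                         time_remaining: float, home_has_ball: bool,
                         field_position: int) -> float:
    """Hypothetical pure-function restatement of the simple WP model above."""
    score_diff = home_score - away_score
    elapsed = (quarter - 1) * 900 + (900 - time_remaining)   # seconds played
    time_remaining_pct = max(0.0, 1 - elapsed / 3600)          # 0 in overtime
    field_value = (field_position - 50) / 50                   # -1 (own goal) .. +1
    if not home_has_ball:
        field_value = -field_value
    linear = (0.12 * score_diff
              + 0.15 * score_diff * (1 - time_remaining_pct)   # lead matters more late
              + 0.05 * field_value
              + 0.10)                                          # home field advantage
    return 1 / (1 + math.exp(-linear))                         # logistic, same as expit
```

Take the home team leading 7-0 at the start of the fourth quarter with the ball on its own 25. Elapsed time is 2,700 s, so `1 - time_remaining_pct` is 0.75. The logit is 0.84 + 0.7875 - 0.025 + 0.10 = 1.7025, which gives a home win probability of about 0.846. The same lead with 2:00 left (120 s on the clock) rises to about 0.873, because the score-by-time term grows as the game runs down.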

Step 3: WebSocket Gateway

import asyncio
import json
from typing import Dict, Set
import websockets

class BroadcastGateway:
    """WebSocket gateway for real-time updates."""

    def __init__(self):
        self.game_processors: Dict[str, GameProcessor] = {}
        self.game_clients: Dict[str, Set] = {}
        self.broadcast_queue = asyncio.Queue()

    async def register_game(self, game_id: str, home_team: str, away_team: str):
        """Register a new game for broadcasting."""
        self.game_processors[game_id] = GameProcessor(game_id, home_team, away_team)
        self.game_clients[game_id] = set()

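    async def handle_client(self, websocket, path=None):
        """Handle new client connection."""
        if path is None:
            # Recent websockets releases call handlers with the connection only:
            # the legacy API exposes `.path`, the newer asyncio API `.request.path`
            path = getattr(websocket, 'path', None) or websocket.request.path
        game_id = path.strip('/')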

        if game_id not in self.game_processors:
            await websocket.close(1008, "Game not found")
            return

        # Register client
        self.game_clients[game_id].add(websocket)

        try:
            # Send initial snapshot
            processor = self.game_processors[game_id]
            snapshot = processor.get_snapshot()
            await websocket.send(json.dumps({
                'type': 'snapshot',
                'data': snapshot
            }))

            # Handle incoming messages
            async for message in websocket:
                await self._handle_message(websocket, game_id, message)

        finally:
            # Unregister client
            self.game_clients[game_id].discard(websocket)

    async def _handle_message(self, websocket, game_id: str, message: str):
        """Handle message from client."""
        try:
            msg = json.loads(message)
            msg_type = msg.get('type')

            if msg_type == 'ping':
                await websocket.send(json.dumps({'type': 'pong'}))
            elif msg_type == 'request_snapshot':
                snapshot = self.game_processors[game_id].get_snapshot()
                await websocket.send(json.dumps({
                    'type': 'snapshot',
                    'data': snapshot
                }))

        except json.JSONDecodeError:
            pass

    async def broadcast_update(self, game_id: str, update: Dict):
        """Broadcast update to all clients watching a game."""
        if game_id not in self.game_clients:
            return

        message = json.dumps({
            'type': 'update',
            'data': update
        })

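        dead_clients = set()
        # Iterate over a copy: handle_client() may discard clients from the set
        # while this loop is suspended in send()
        for client in list(self.game_clients[game_id]):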
            try:
                await client.send(message)
            except websockets.exceptions.ConnectionClosed:
                dead_clients.add(client)

        # Clean up dead connections
        self.game_clients[game_id] -= dead_clients

    async def process_and_broadcast(self, game_id: str, event: Dict):
        """Process event and broadcast results."""
        if game_id not in self.game_processors:
            return

        processor = self.game_processors[game_id]
        result = processor.process_event(event)

        await self.broadcast_update(game_id, result)

        return result
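`broadcast_update` awaits `client.send` for each connection in turn, so a client whose network buffer is full can hold up every client after it in the loop. A common alternative gives each client a small outbox and its own writer task. This is an assumption, not the case study's design. Every result from `process_event` carries the full current state, so dropping the oldest pending message when a client falls behind loses little. Per-play fields such as `wpa` for the skipped plays are not re-sent. `ClientOutbox` is a hypothetical class, and `send` stands in for a connection's send coroutine.

```python
import asyncio
import contextlib
from typing import Awaitable, Callable


class ClientOutbox:
    """Hypothetical per-client buffer holding at most `maxsize` pending messages."""

    def __init__(self, send: Callable[[str], Awaitable[None]], maxsize: int = 8):
        self.send = send                      # e.g. a connection's send coroutine
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self.dropped = 0

    def offer(self, message: str) -> None:
        """Enqueue without blocking; a full outbox discards its oldest message."""
        if self.queue.full():
            self.queue.get_nowait()
            self.dropped += 1
        self.queue.put_nowait(message)

    async def pump(self) -> None:
        """Writer loop (one task per client): a slow socket stalls only itself."""
        while True:
            message = await self.queue.get()
            await self.send(message)


async def demo():
    delivered = []

    async def slow_send(message: str) -> None:
        await asyncio.sleep(0.01)             # simulate a sluggish client
        delivered.append(message)

    outbox = ClientOutbox(slow_send, maxsize=2)
    for i in range(5):                        # burst of updates before the writer runs
        outbox.offer(f"update-{i}")
    writer = asyncio.create_task(outbox.pump())
    await asyncio.sleep(0.1)
    writer.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await writer
    return delivered, outbox.dropped
```

With `maxsize=2`, the burst of five updates leaves only the two newest, and the broadcaster never waits on this client.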

Step 4: Graphics Integration

import aiohttp
from datetime import datetime
from typing import Dict

class BroadcastGraphicsInterface:
    """Interface for broadcast graphics systems."""

    def __init__(self, graphics_endpoint: str):
        self.endpoint = graphics_endpoint
        self.session = None

    async def connect(self):
        """Initialize connection to graphics system."""
        self.session = aiohttp.ClientSession()

    async def disconnect(self):
        """Close connection."""
        if self.session:
            await self.session.close()

    async def update_win_probability(self, game_id: str, home_wp: float, away_wp: float):
        """Send win probability update to graphics."""
        payload = {
            'command': 'UPDATE_WIN_PROBABILITY',
            'game_id': game_id,
            'home_wp': round(home_wp * 100, 1),
            'away_wp': round(away_wp * 100, 1),
            'timestamp': datetime.now().isoformat()
        }

        await self._send_command(payload)

    async def trigger_key_play_graphic(self, play_data: Dict):
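        """Trigger key play graphic.

        `play_data` must contain 'wpa'. Key-play records built by
        GameProcessor carry no 'description', so fall back to an empty string.
        """
        payload = {
            'command': 'SHOW_KEY_PLAY',
            'play_description': play_data.get('description', ''),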
            'wpa': round(play_data['wpa'] * 100, 1),
            'duration_ms': 5000
        }

        await self._send_command(payload)

    async def update_situation_display(self, state: Dict):
        """Update down and distance display."""
        payload = {
            'command': 'UPDATE_SITUATION',
            'down': state['down'],
            'distance': state['distance'],
            'field_position': state['field_position'],
            'possession': state['possession']
        }

        await self._send_command(payload)

    async def _send_command(self, payload: Dict):
        """Send command to graphics system."""
        try:
            async with self.session.post(self.endpoint, json=payload) as response:
                if response.status != 200:
                    print(f"Graphics error: {response.status}")
        except Exception as e:
            print(f"Graphics connection error: {e}")
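If `update_win_probability` is called for every processed event, many consecutive updates will show the same numbers on air. This happens because the payload rounds each side to one decimal place of a percentage. The sketch below shows a hypothetical `WPDisplayFilter` that skips such no-op updates per game. The `min_change_tenths` threshold is an assumed tuning knob, not a documented setting.

```python
from typing import Dict


class WPDisplayFilter:
    """Hypothetical filter: skip graphics updates that would not change the display."""

    def __init__(self, min_change_tenths: int = 1):
        # Threshold in tenths of a percentage point; 1 = any visible change.
        self.min_change_tenths = min_change_tenths
        self.last_shown: Dict[str, int] = {}

    @staticmethod
    def to_tenths(wp: float) -> int:
        # One-decimal percentage precision, as in round(home_wp * 100, 1),
        # kept as an integer so comparisons avoid float rounding artifacts.
        return round(wp * 1000)

    def should_send(self, game_id: str, home_wp: float) -> bool:
        """Return True (and remember the value) if the on-screen number would change."""
        shown = self.to_tenths(home_wp)
        previous = self.last_shown.get(game_id)
        if previous is not None and abs(shown - previous) < self.min_change_tenths:
            return False
        self.last_shown[game_id] = shown
        return True
```

The caller would check `should_send(game_id, home_wp)` before awaiting `update_win_probability`. The away value is `1 - home_wp`, so tracking the home side is enough.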

Results

Performance Metrics

SYSTEM PERFORMANCE (CHAMPIONSHIP SATURDAY)
==========================================

Games Processed: 15 concurrent
Peak Events/Second: 47
Average Latency: 43ms
P95 Latency: 89ms
P99 Latency: 142ms

Uptime: 100% (8 hours)
Total Events Processed: 12,847
Total WP Updates Broadcast: 38,541

Client Metrics:
- Peak Concurrent Connections: 127,000
- Average Connection Duration: 42 minutes
- Messages Broadcast: 2.3 million
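The case study does not say how its percentiles were computed. One common definition is the nearest-rank method, sketched below as an assumption. `latency_summary` is a hypothetical helper that takes raw per-event latencies in milliseconds.

```python
import math
from statistics import mean
from typing import Dict, Sequence


def latency_summary(samples_ms: Sequence[float]) -> Dict[str, float]:
    """Hypothetical helper: average plus nearest-rank P95 and P99."""
    if not samples_ms:
        raise ValueError("no latency samples")
    ordered = sorted(samples_ms)

    def nearest_rank(p: int) -> float:
        # Smallest sample with at least p% of all samples at or below it.
        rank = max(1, math.ceil(p * len(ordered) / 100))
        return ordered[rank - 1]

    return {
        'avg': mean(ordered),
        'p95': nearest_rank(95),
        'p99': nearest_rank(99),
    }
```

Because P99 reflects roughly the slowest 1% of events, it can sit well above the average, as it does in the figures above (142 ms vs. 43 ms).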

Win Probability Accuracy

WP MODEL CALIBRATION (2024 SEASON)
==================================

Predicted WP | Actual Win % | Sample Size
-------------|--------------|------------
0-10%        | 8.2%         | 1,234
10-20%       | 14.7%        | 2,456
20-30%       | 26.1%        | 3,891
30-40%       | 35.8%        | 4,567
40-50%       | 46.2%        | 5,123
50-60%       | 53.4%        | 5,234
60-70%       | 64.8%        | 4,678
70-80%       | 73.9%        | 3,456
80-90%       | 84.7%        | 2,345
90-100%      | 92.1%        | 1,567

Brier Score: 0.187 (lower is better; a constant 50% forecast scores 0.250, and calibration itself is shown by the binned table above)
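Both the table and the Brier score can be produced from the same list of (predicted home WP, final outcome) pairs. The following is a minimal sketch. It assumes outcomes are coded 1 for a home win and 0 otherwise, with ten equal-width bins as in the table. `brier_score` and `calibration_table` are hypothetical helpers.

```python
from typing import List, Sequence, Tuple


def brier_score(predicted: Sequence[float], outcomes: Sequence[int]) -> float:
    """Hypothetical helper: mean squared gap between forecasts and 0/1 outcomes."""
    if not predicted or len(predicted) != len(outcomes):
        raise ValueError("need two non-empty sequences of equal length")
    return sum((p - o) ** 2 for p, o in zip(predicted, outcomes)) / len(predicted)


def calibration_table(predicted: Sequence[float], outcomes: Sequence[int],
                      n_bins: int = 10) -> List[Tuple[str, float, int]]:
    """Hypothetical helper: (bin label, actual win rate, sample size) per non-empty bin."""
    totals = [0] * n_bins
    wins = [0] * n_bins
    for p, o in zip(predicted, outcomes):
        b = min(int(p * n_bins), n_bins - 1)   # p == 1.0 falls in the top bin
        totals[b] += 1
        wins[b] += o
    return [
        (f"{100 * b // n_bins}-{100 * (b + 1) // n_bins}%", wins[b] / totals[b], totals[b])
        for b in range(n_bins)
        if totals[b]
    ]
```

A well-calibrated model has each bin's actual win rate inside that bin's predicted range, which is the pattern the table shows.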

Viewer Engagement

SECOND SCREEN APP METRICS
=========================

Active Users During Games: 89,000 avg
Sessions per User: 2.3
Avg Session Duration: 38 minutes

Most Viewed Features:
1. Live Win Probability: 94% of users
2. Play-by-Play: 78% of users
3. Key Plays Highlights: 67% of users
4. Fourth Down Analysis: 45% of users

User Satisfaction: 4.2/5.0 stars

Lessons Learned

  1. Latency is Critical: Users notice delays over 500ms; give every stage (ingestion, WP calculation, delivery) an explicit latency budget and measure against it
  2. Graceful Degradation: Handle partial data; don't crash on missing fields
  3. Cache Aggressively: Win probability coefficients, team data, historical baselines
  4. Monitor Everything: Latency percentiles, queue depths, error rates
  5. Test at Scale: Game day loads are 10x normal; test accordingly
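Lesson 2 applies directly to `_parse_feed`. There, `play.get('down', 1)` only supplies the default when the key is absent, so a field present as JSON `null` arrives as `None`, and a value sent as a string stays a string. The sketch below shows a stricter accessor. It assumes integer fields may arrive missing, null, numeric or as numeric strings. `get_int` is a hypothetical helper.

```python
from typing import Any, Mapping


def get_int(record: Mapping[str, Any], key: str, default: int) -> int:
    """Hypothetical helper: integer field with a default for missing/null/malformed values."""
    value = record.get(key)
    if value is None or isinstance(value, bool):   # JSON null, or a stray true/false
        return default
    try:
        return int(value)                           # accepts 3, 3.0 and "3"
    except (TypeError, ValueError):                 # e.g. "N/A", [] or "3.5"
        return default
```

`down=get_int(play, 'down', 1)` would then replace `down=play.get('down', 1)`, and likewise for the other integer fields.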

Recommendations

  1. Pre-warm Caches: Load all team/player data before kickoff
  2. Geographic Distribution: CDN for WebSocket connections
  3. Fallback Graphics: Static displays if real-time fails
  4. Alert Thresholds: Page on-call at 100ms p95 latency
  5. Post-Game Analysis: Archive all events for model improvement
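Recommendation 4 implies computing P95 continuously during the game rather than afterwards. The following is a minimal sketch. It assumes a rolling window of the most recent samples and nearest-rank P95. `P95Alert`, the window size and the `min_samples` warm-up are hypothetical choices, and the actual paging call is left out.

```python
import math
from collections import deque


class P95Alert:
    """Hypothetical rolling-window check: True when P95 latency exceeds a threshold."""

    def __init__(self, threshold_ms: float = 100.0, window: int = 200, min_samples: int = 20):
        self.threshold_ms = threshold_ms
        self.samples = deque(maxlen=window)   # oldest samples fall off automatically
        self.min_samples = min_samples

    def observe(self, latency_ms: float) -> bool:
        """Record one sample; return True if the window's P95 is over threshold."""
        self.samples.append(latency_ms)
        if len(self.samples) < self.min_samples:
            return False                      # too few samples to judge yet
        ordered = sorted(self.samples)
        rank = math.ceil(95 * len(ordered) / 100)   # nearest-rank P95
        return ordered[rank - 1] > self.threshold_ms
```

Sorting the window on every sample is cheap at a few hundred samples and the 50 events per second in the requirements. A much larger window would call for a streaming quantile estimator instead.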