Case Study 1: Building a Super-Ensemble for Election Forecasting

Overview

In this case study, you will build a "super-ensemble" that combines five distinct forecasting approaches to predict the outcomes of U.S. Senate races. Each approach brings different strengths and weaknesses, and the challenge is to combine them in a way that outperforms any individual component.

The five components are:

  1. Polling model: Aggregates and adjusts state-level polls
  2. Fundamentals model: Uses economic indicators and incumbency
  3. Market prices: Prediction market probabilities
  4. Expert forecasts: Ratings from political analysts
  5. ML model: Machine learning on historical features

We will test multiple combination methods and rigorously evaluate which approach produces the most accurate, well-calibrated forecasts.
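
Throughout, the primary accuracy metric is the Brier score: the mean squared error between forecast probabilities and binary outcomes (lower is better). The snippet below is a minimal illustration with made-up numbers; the code that follows uses scikit-learn's brier_score_loss, which computes the same quantity.

import numpy as np

# Brier score: mean squared difference between forecasts and outcomes.
p = np.array([0.9, 0.6, 0.2])   # hypothetical forecast probabilities
y = np.array([1, 1, 0])         # hypothetical outcomes
print(f"{np.mean((p - y) ** 2):.4f}")   # 0.0700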

Background

Election forecasting is one of the best-studied domains in prediction. Organizations like FiveThirtyEight, The Economist, and academic researchers have all developed sophisticated models. A common finding is that no single source dominates across all races and time horizons:

  • Polls are most informative close to Election Day but can be noisy or biased months out.
  • Fundamentals (GDP growth, presidential approval, incumbency) are stable long-range predictors but miss race-specific dynamics.
  • Markets aggregate diverse information but can be thinly traded for down-ballot races.
  • Expert ratings (Cook Political Report, Sabato's Crystal Ball) provide qualitative insight but are coarse-grained.
  • ML models can detect non-linear patterns but risk overfitting to idiosyncratic historical data.

Data Setup

"""
Super-Ensemble for Election Forecasting
Case Study 1 - Chapter 25
"""

import numpy as np
import pandas as pd
from scipy.special import logit, expit
from scipy.optimize import minimize
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import KFold
from sklearn.metrics import brier_score_loss
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

np.random.seed(42)

# ============================================================
# STEP 1: Generate synthetic Senate race data
# ============================================================

def generate_senate_data(n_races=200, n_cycles=5):
    """
    Generate synthetic data mimicking Senate race forecasting.

    Returns a DataFrame with:
    - true_prob: the true Democratic win probability
    - outcome: 1 if Democrat wins, 0 otherwise
    - polling_forecast: polling-based probability
    - fundamentals_forecast: fundamentals-based probability
    - market_price: prediction market implied probability
    - expert_forecast: expert rating converted to probability
    - ml_forecast: ML model probability
    - cycle: election cycle indicator
    - days_out: days before election (0 to 180)
    """
    races = []

    for cycle in range(n_cycles):
        n = n_races // n_cycles

        # True probabilities: mix of competitive and safe seats
        true_probs = np.concatenate([
            np.random.beta(5, 5, n // 2),      # Competitive races
            np.random.beta(8, 2, n // 4),       # Safe Dem seats
            np.random.beta(2, 8, n - n // 2 - n // 4),  # Safe GOP seats (remainder, so sizes sum to n)
        ])
        np.random.shuffle(true_probs)

        # Days before election
        days_out = np.random.choice([0, 7, 30, 60, 90, 180], n)

        # Generate outcomes
        outcomes = (np.random.random(n) < true_probs).astype(float)

        for i in range(n):
            tp = true_probs[i]
            d = days_out[i]

            # Polling model: accurate close to election, noisy far out
            poll_noise = 0.03 + 0.15 * (d / 180)
            poll_bias = 0.02 * np.random.choice([-1, 1])  # House effect
            polling = np.clip(tp + poll_bias +
                            np.random.normal(0, poll_noise), 0.02, 0.98)

            # Fundamentals model: stable but less responsive to dynamics
            fund_noise = 0.10
            fund_bias = 0.03 * (cycle - 2)  # Cycle-specific bias
            fundamentals = np.clip(0.7 * tp + 0.15 +
                                 np.random.normal(fund_bias, fund_noise),
                                 0.05, 0.95)

            # Market price: informed but with favorite-longshot bias
            market_noise = 0.05 + 0.10 * (d / 180)
            # Favorite-longshot: compress toward 0.5
            market_true = 0.8 * tp + 0.1
            market = np.clip(market_true +
                           np.random.normal(0, market_noise), 0.03, 0.97)

            # Expert forecast: coarse but useful
            expert_bins = [0.05, 0.20, 0.35, 0.50, 0.65, 0.80, 0.95]
            expert_idx = np.argmin(np.abs(np.array(expert_bins) - tp))
            expert_noise = np.random.normal(0, 0.07)
            expert = np.clip(expert_bins[expert_idx] + expert_noise,
                           0.03, 0.97)

            # ML model: good on average but occasionally overfits
            ml_overfit = np.random.random() < 0.1  # 10% overfit
            if ml_overfit:
                ml = np.clip(np.random.beta(0.5, 0.5), 0.02, 0.98)
            else:
                ml = np.clip(tp + np.random.normal(0, 0.08), 0.02, 0.98)

            races.append({
                'cycle': cycle,
                'race_id': i,
                'days_out': d,
                'true_prob': tp,
                'outcome': outcomes[i],
                'polling_forecast': polling,
                'fundamentals_forecast': fundamentals,
                'market_price': market,
                'expert_forecast': expert,
                'ml_forecast': ml,
            })

    return pd.DataFrame(races)


data = generate_senate_data(n_races=400, n_cycles=5)
print(f"Generated {len(data)} race-forecasts across "
      f"{data['cycle'].nunique()} cycles")
print(f"\nOutcome distribution: {data['outcome'].mean():.2%} Democratic wins")
print(f"\nDays before election distribution:")
print(data['days_out'].value_counts().sort_index())
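
Before combining anything, it is worth confirming that the five components are genuinely diverse: an ensemble only helps when its members disagree in informative ways. The quick diagnostic below is an optional addition, not part of the pipeline above.

# Correlation among the component forecasts. High but imperfect
# correlation is exactly what makes combining them worthwhile.
component_cols = ['polling_forecast', 'fundamentals_forecast',
                  'market_price', 'expert_forecast', 'ml_forecast']
print("\nCorrelation among component forecasts:")
print(data[component_cols].corr().round(2))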

Step 2: Evaluate Individual Components

# ============================================================
# STEP 2: Evaluate each component individually
# ============================================================

forecast_cols = ['polling_forecast', 'fundamentals_forecast',
                 'market_price', 'expert_forecast', 'ml_forecast']

print("\n" + "=" * 60)
print("INDIVIDUAL MODEL PERFORMANCE")
print("=" * 60)

for col in forecast_cols:
    bs = brier_score_loss(data['outcome'], data[col])
    print(f"  {col:30s}: Brier = {bs:.4f}")

# Performance by days_out
print("\nBrier Score by Days Before Election:")
print("-" * 70)
for days in sorted(data['days_out'].unique()):
    subset = data[data['days_out'] == days]
    print(f"\n  Days out = {days:3d}  (n = {len(subset)})")
    for col in forecast_cols:
        bs = brier_score_loss(subset['outcome'], subset[col])
        print(f"    {col:30s}: {bs:.4f}")

Steps 3 and 4: Combination Methods and Cycle-Based Cross-Validation

# ============================================================
# STEP 3: Implement and test combination methods
# ============================================================

def simple_average(forecasts_df, cols):
    """Simple equal-weight average."""
    return forecasts_df[cols].mean(axis=1).values

def weighted_average_brier(forecasts_df, cols, outcomes, train_mask):
    """Inverse-Brier-score weighted average."""
    train = forecasts_df[train_mask]
    train_outcomes = outcomes[train_mask]

    weights = []
    for col in cols:
        bs = brier_score_loss(train_outcomes, train[col])
        weights.append(1.0 / bs)
    weights = np.array(weights) / sum(weights)

    return forecasts_df[cols].values @ weights, weights

def optimized_weights(forecasts_df, cols, outcomes, train_mask,
                      shrinkage=0.1):
    """Constrained optimization of combination weights."""
    train_f = forecasts_df.loc[train_mask, cols].values
    train_y = outcomes[train_mask]
    K = len(cols)
    equal_w = np.ones(K) / K

    def objective(w):
        combined = train_f @ w
        mse = np.mean((train_y - combined) ** 2)
        reg = shrinkage * np.sum((w - equal_w) ** 2)
        return mse + reg

    constraints = {'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}
    bounds = [(0, 1)] * K
    result = minimize(objective, equal_w, method='SLSQP',
                      bounds=bounds, constraints=constraints)
    weights = result.x

    return forecasts_df[cols].values @ weights, weights

def extremized_average(forecasts_df, cols, d=1.5):
    """Simple average with extremizing."""
    avg = forecasts_df[cols].mean(axis=1).values
    avg = np.clip(avg, 1e-4, 1 - 1e-4)
    return expit(d * logit(avg))

def calibrated_extremized(forecasts_df, cols, outcomes, train_mask):
    """Extremized average with calibrated d from logistic regression."""
    train_avg = forecasts_df.loc[train_mask, cols].mean(axis=1).values
    train_avg = np.clip(train_avg, 1e-4, 1 - 1e-4)
    train_y = outcomes[train_mask]

    X_train = logit(train_avg).reshape(-1, 1)
    lr = LogisticRegression(C=1e6, fit_intercept=True, max_iter=1000)
    lr.fit(X_train, train_y)

    d = lr.coef_[0, 0]
    b = lr.intercept_[0]

    all_avg = forecasts_df[cols].mean(axis=1).values
    all_avg = np.clip(all_avg, 1e-4, 1 - 1e-4)
    calibrated = expit(b + d * logit(all_avg))

    return calibrated, d, b

def log_pool(forecasts_df, cols):
    """Logarithmic opinion pool (equal weights)."""
    forecasts = forecasts_df[cols].values
    forecasts = np.clip(forecasts, 1e-4, 1 - 1e-4)
    log_odds = logit(forecasts)
    avg_log_odds = log_odds.mean(axis=1)
    return expit(avg_log_odds)

def stacking_ensemble(forecasts_df, cols, outcomes, train_mask):
    """Stacking with logistic regression meta-learner."""
    X_train = forecasts_df.loc[train_mask, cols].values
    y_train = outcomes[train_mask]
    X_all = forecasts_df[cols].values

    # Out-of-fold meta-learner predictions on the training set
    # (diagnostic only; not used by the final meta-learner fit below)
    kf = KFold(n_splits=5, shuffle=True, random_state=42)
    meta_preds = np.zeros(len(y_train))

    for tr_idx, val_idx in kf.split(X_train):
        lr = LogisticRegression(C=1.0, max_iter=1000)
        lr.fit(X_train[tr_idx], y_train[tr_idx])
        meta_preds[val_idx] = lr.predict_proba(X_train[val_idx])[:, 1]

    # Final meta-learner on full training set
    meta_lr = LogisticRegression(C=1.0, max_iter=1000)
    meta_lr.fit(X_train, y_train)

    return meta_lr.predict_proba(X_all)[:, 1], meta_lr


# ============================================================
# STEP 4: Time-series cross-validation (train on past cycles,
#          test on next cycle)
# ============================================================

print("\n" + "=" * 60)
print("ENSEMBLE METHOD COMPARISON (Cycle-Based CV)")
print("=" * 60)

results = {method: [] for method in [
    'simple_avg', 'weighted_avg', 'optimized', 'extremized_1.5',
    'extremized_2.0', 'calibrated_ext', 'log_pool', 'stacking'
]}

# Also track individual model results
for col in forecast_cols:
    results[col] = []

cycles = sorted(data['cycle'].unique())

for test_cycle in cycles[1:]:  # Need at least one training cycle
    train_mask = data['cycle'] < test_cycle
    test_mask = data['cycle'] == test_cycle

    outcomes = data['outcome'].values
    test_y = data.loc[test_mask, 'outcome'].values

    # Individual models
    for col in forecast_cols:
        bs = brier_score_loss(test_y, data.loc[test_mask, col])
        results[col].append(bs)

    # Simple average
    sa = simple_average(data, forecast_cols)
    results['simple_avg'].append(
        brier_score_loss(test_y, sa[test_mask]))

    # Weighted average
    wa, wa_weights = weighted_average_brier(
        data, forecast_cols, outcomes, train_mask)
    results['weighted_avg'].append(
        brier_score_loss(test_y, wa[test_mask]))

    # Optimized weights
    ow, ow_weights = optimized_weights(
        data, forecast_cols, outcomes, train_mask, shrinkage=0.1)
    results['optimized'].append(
        brier_score_loss(test_y, ow[test_mask]))

    # Extremized (d=1.5)
    ext15 = extremized_average(data, forecast_cols, d=1.5)
    results['extremized_1.5'].append(
        brier_score_loss(test_y, ext15[test_mask]))

    # Extremized (d=2.0)
    ext20 = extremized_average(data, forecast_cols, d=2.0)
    results['extremized_2.0'].append(
        brier_score_loss(test_y, ext20[test_mask]))

    # Calibrated extremized
    cal_ext, d_cal, b_cal = calibrated_extremized(
        data, forecast_cols, outcomes, train_mask)
    results['calibrated_ext'].append(
        brier_score_loss(test_y, cal_ext[test_mask]))

    # Log pool
    lp = log_pool(data, forecast_cols)
    results['log_pool'].append(
        brier_score_loss(test_y, lp[test_mask]))

    # Stacking
    stk, stk_model = stacking_ensemble(
        data, forecast_cols, outcomes, train_mask)
    results['stacking'].append(
        brier_score_loss(test_y, stk[test_mask]))

# Print results
print(f"\n{'Method':<30s} {'Mean Brier':>10s} {'Std':>8s}")
print("-" * 50)
for method in results:
    scores = results[method]
    if len(scores) > 0:
        mean_bs = np.mean(scores)
        std_bs = np.std(scores)
        print(f"  {method:<28s} {mean_bs:>10.4f} {std_bs:>8.4f}")

Step 5: Detailed Analysis

# ============================================================
# STEP 5: Detailed analysis of the best methods
# ============================================================

# Use the last cycle as the final test
final_train = data['cycle'] < cycles[-1]
final_test = data['cycle'] == cycles[-1]
test_data = data[final_test].copy()
test_y = test_data['outcome'].values

# Get all ensemble forecasts for the test set
test_data['simple_avg'] = simple_average(data, forecast_cols)[final_test]

wa, _ = weighted_average_brier(
    data, forecast_cols, data['outcome'].values, final_train)
test_data['weighted_avg'] = wa[final_test]

ext_cal, d_opt, b_opt = calibrated_extremized(
    data, forecast_cols, data['outcome'].values, final_train)
test_data['cal_extremized'] = ext_cal[final_test]

stk, stk_model = stacking_ensemble(
    data, forecast_cols, data['outcome'].values, final_train)
test_data['stacking'] = stk[final_test]

# Calibration analysis
def calibration_analysis(forecasts, outcomes, n_bins=10, label=''):
    """Compute calibration metrics."""
    bins = np.linspace(0, 1, n_bins + 1)
    bin_centers = []
    bin_means = []
    bin_counts = []

    for i in range(n_bins):
        # Make the final bin right-inclusive so a forecast of exactly 1.0 is counted
        if i == n_bins - 1:
            mask = (forecasts >= bins[i]) & (forecasts <= bins[i + 1])
        else:
            mask = (forecasts >= bins[i]) & (forecasts < bins[i + 1])
        if mask.sum() > 0:
            bin_centers.append(np.mean(forecasts[mask]))
            bin_means.append(np.mean(outcomes[mask]))
            bin_counts.append(mask.sum())

    # Calibration error (ECE)
    ece = 0
    total = sum(bin_counts)
    for center, mean, count in zip(bin_centers, bin_means, bin_counts):
        ece += (count / total) * abs(center - mean)

    return {
        'label': label,
        'bin_centers': bin_centers,
        'bin_means': bin_means,
        'bin_counts': bin_counts,
        'ece': ece,
        'brier': brier_score_loss(outcomes, forecasts),
    }


# Calibration comparison
methods_to_analyze = {
    'Simple Average': test_data['simple_avg'].values,
    'Weighted Average': test_data['weighted_avg'].values,
    'Calibrated Extremized': test_data['cal_extremized'].values,
    'Stacking': test_data['stacking'].values,
    'Polling Model': test_data['polling_forecast'].values,
    'Market Price': test_data['market_price'].values,
}

print("\n" + "=" * 60)
print("CALIBRATION ANALYSIS (Final Test Cycle)")
print("=" * 60)

fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.flatten()

for idx, (name, forecasts) in enumerate(methods_to_analyze.items()):
    cal = calibration_analysis(forecasts, test_y, n_bins=10, label=name)

    print(f"\n{name}:")
    print(f"  Brier Score: {cal['brier']:.4f}")
    print(f"  ECE:         {cal['ece']:.4f}")

    ax = axes[idx]
    ax.plot([0, 1], [0, 1], 'k--', alpha=0.5, label='Perfect')
    ax.scatter(cal['bin_centers'], cal['bin_means'],
               s=[c * 3 for c in cal['bin_counts']], alpha=0.7)
    ax.set_xlabel('Forecast Probability')
    ax.set_ylabel('Observed Frequency')
    ax.set_title(f'{name}\nBrier={cal["brier"]:.4f}, ECE={cal["ece"]:.4f}')
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.legend()

plt.tight_layout()
plt.savefig('election_calibration.png', dpi=150, bbox_inches='tight')
plt.close()
print("\nCalibration plot saved to election_calibration.png")

Step 6: Performance by Race Type

# ============================================================
# STEP 6: Performance breakdown by race competitiveness
# ============================================================

print("\n" + "=" * 60)
print("PERFORMANCE BY RACE COMPETITIVENESS")
print("=" * 60)

test_data['competitiveness'] = pd.cut(
    test_data['true_prob'],
    bins=[0, 0.25, 0.40, 0.60, 0.75, 1.0],
    labels=['Safe R', 'Lean R', 'Tossup', 'Lean D', 'Safe D']
)

ensemble_methods = {
    'Simple Avg': 'simple_avg',
    'Weighted': 'weighted_avg',
    'Cal. Extremized': 'cal_extremized',
    'Stacking': 'stacking',
}

for comp in ['Safe R', 'Lean R', 'Tossup', 'Lean D', 'Safe D']:
    subset = test_data[test_data['competitiveness'] == comp]
    if len(subset) < 5:
        continue
    print(f"\n  {comp} (n={len(subset)}):")
    sub_y = subset['outcome'].values
    for name, col in ensemble_methods.items():
        bs = brier_score_loss(sub_y, subset[col])
        print(f"    {name:20s}: {bs:.4f}")

Step 7: Weight Analysis

# ============================================================
# STEP 7: Analyze learned weights and model contributions
# ============================================================

print("\n" + "=" * 60)
print("WEIGHT ANALYSIS")
print("=" * 60)

# Weighted average weights
_, wa_final_weights = weighted_average_brier(
    data, forecast_cols, data['outcome'].values, final_train)
print("\nInverse-Brier Weights:")
for col, w in zip(forecast_cols, wa_final_weights):
    print(f"  {col:30s}: {w:.3f}")

# Optimized weights
_, opt_final_weights = optimized_weights(
    data, forecast_cols, data['outcome'].values, final_train, shrinkage=0.1)
print("\nOptimized Weights (shrinkage=0.1):")
for col, w in zip(forecast_cols, opt_final_weights):
    print(f"  {col:30s}: {w:.3f}")

# Stacking meta-learner weights
print("\nStacking Meta-Learner Coefficients:")
print(f"  Intercept: {stk_model.intercept_[0]:.3f}")
for col, coef in zip(forecast_cols, stk_model.coef_[0]):
    print(f"  {col:30s}: {coef:.3f}")

# Extremizing parameters
print(f"\nCalibrated Extremizing Parameters:")
print(f"  Extremizing factor d: {d_opt:.3f}")
print(f"  Bias intercept b:     {b_opt:.3f}")

Key Findings and Discussion

After running this analysis, we typically observe the following patterns:

  1. All ensemble methods outperform individual models. The simple average alone typically improves on every individual component by a meaningful margin.

  2. The polling model performs best close to the election (low days_out), while the fundamentals model is relatively better far from the election. The stacking meta-learner can learn this time-dependent weighting (a sketch of one way to encode it explicitly appears after this list).

  3. Extremizing helps for competitive races (true probability near 0.5), where shared information causes the simple average to be insufficiently decisive. But it can hurt for "safe" races where the average is already extreme.

  4. Calibrated extremizing and stacking tend to perform similarly and both outperform the simple average, though the improvement may be modest.

  5. Market prices are surprisingly competitive as a single source but still benefit from being combined with model-based forecasts.

  6. The optimal combination varies by context. No single ensemble method dominates across all race types and time horizons. A practical system might use different methods for different situations.
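
One way to let the meta-learner exploit the horizon effect from finding 2 explicitly is to give it interaction features between each component forecast and the forecast horizon. The sketch below is an illustrative extension, not part of the case-study pipeline; it reuses the data, forecast_cols, and cycles objects defined above and evaluates on the final cycle only.

# Sketch: horizon-aware stacking. Each component forecast is paired with
# an interaction term (forecast x normalized days_out), letting the
# meta-learner shift weight between, e.g., polls and fundamentals as the
# election approaches.
def horizon_features(df, cols):
    base = df[cols].values
    horizon = (df['days_out'].values / 180.0).reshape(-1, 1)
    return np.hstack([base, base * horizon])

tr_mask = data['cycle'] < cycles[-1]
te_mask = data['cycle'] == cycles[-1]
X_tr = horizon_features(data[tr_mask], forecast_cols)
X_te = horizon_features(data[te_mask], forecast_cols)
y_tr = data.loc[tr_mask, 'outcome'].values
y_te = data.loc[te_mask, 'outcome'].values

lr = LogisticRegression(C=1.0, max_iter=1000)
lr.fit(X_tr, y_tr)
bs = brier_score_loss(y_te, lr.predict_proba(X_te)[:, 1])
print(f"Horizon-aware stacking Brier (final cycle): {bs:.4f}")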

The broader lesson: building a super-ensemble is an iterative process of testing, validating, and refining. The key ingredients are diverse, well-calibrated component models and a disciplined approach to combination that guards against overfitting.