Case Study 1: Building a Super-Ensemble for Election Forecasting
Overview
In this case study, you will build a "super-ensemble" that combines five distinct forecasting approaches to predict the outcomes of U.S. Senate races. Each approach brings different strengths and weaknesses, and the challenge is to combine them in a way that outperforms any individual component.
The five components are:
1. Polling model: aggregates and adjusts state-level polls
2. Fundamentals model: uses economic indicators and incumbency
3. Market prices: prediction market probabilities
4. Expert forecasts: ratings from political analysts
5. ML model: machine learning on historical features
We will test multiple combination methods and rigorously evaluate which approach produces the most accurate, well-calibrated forecasts.
Background
Election forecasting is one of the best-studied domains in prediction. Organizations like FiveThirtyEight, The Economist, and academic researchers have all developed sophisticated models. A common finding is that no single source dominates across all races and time horizons:
- Polls are most informative close to Election Day but can be noisy or biased months out.
- Fundamentals (GDP growth, presidential approval, incumbency) are stable long-range predictors but miss race-specific dynamics.
- Markets aggregate diverse information but can be thinly traded for down-ballot races.
- Expert ratings (Cook Political Report, Sabato's Crystal Ball) provide qualitative insight but are coarse-grained.
- ML models can detect non-linear patterns but risk overfitting to idiosyncratic historical data.
Data Setup
"""
Super-Ensemble for Election Forecasting
Case Study 1 - Chapter 25
"""
import numpy as np
import pandas as pd
from scipy.special import logit, expit
from scipy.optimize import minimize
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import KFold
from sklearn.metrics import brier_score_loss
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
np.random.seed(42)
# ============================================================
# STEP 1: Generate synthetic Senate race data
# ============================================================
def generate_senate_data(n_races=200, n_cycles=5):
"""
Generate synthetic data mimicking Senate race forecasting.
Returns a DataFrame with:
- true_prob: the true Democratic win probability
- outcome: 1 if Democrat wins, 0 otherwise
- polling_forecast: polling-based probability
- fundamentals_forecast: fundamentals-based probability
- market_price: prediction market implied probability
- expert_forecast: expert rating converted to probability
- ml_forecast: ML model probability
- cycle: election cycle indicator
- days_out: days before election (0 to 180)
"""
races = []
for cycle in range(n_cycles):
n = n_races // n_cycles
# True probabilities: mix of competitive and safe seats
true_probs = np.concatenate([
np.random.beta(5, 5, n // 2), # Competitive races
np.random.beta(8, 2, n // 4), # Safe Dem seats
np.random.beta(2, 8, n // 4), # Safe GOP seats
])
np.random.shuffle(true_probs)
# Days before election
days_out = np.random.choice([0, 7, 30, 60, 90, 180], n)
# Generate outcomes
outcomes = (np.random.random(n) < true_probs).astype(float)
for i in range(n):
tp = true_probs[i]
d = days_out[i]
# Polling model: accurate close to election, noisy far out
poll_noise = 0.03 + 0.15 * (d / 180)
poll_bias = 0.02 * np.random.choice([-1, 1]) # House effect
polling = np.clip(tp + poll_bias +
np.random.normal(0, poll_noise), 0.02, 0.98)
# Fundamentals model: stable but less responsive to dynamics
fund_noise = 0.10
fund_bias = 0.03 * (cycle - 2) # Cycle-specific bias
fundamentals = np.clip(0.7 * tp + 0.15 +
np.random.normal(fund_bias, fund_noise),
0.05, 0.95)
# Market price: informed but with favorite-longshot bias
market_noise = 0.05 + 0.10 * (d / 180)
# Favorite-longshot: compress toward 0.5
market_true = 0.8 * tp + 0.1
market = np.clip(market_true +
np.random.normal(0, market_noise), 0.03, 0.97)
# Expert forecast: coarse but useful
expert_bins = [0.05, 0.20, 0.35, 0.50, 0.65, 0.80, 0.95]
expert_idx = np.argmin(np.abs(np.array(expert_bins) - tp))
expert_noise = np.random.normal(0, 0.07)
expert = np.clip(expert_bins[expert_idx] + expert_noise,
0.03, 0.97)
# ML model: good on average but occasionally overfits
ml_overfit = np.random.random() < 0.1 # 10% overfit
if ml_overfit:
ml = np.clip(np.random.beta(0.5, 0.5), 0.02, 0.98)
else:
ml = np.clip(tp + np.random.normal(0, 0.08), 0.02, 0.98)
races.append({
'cycle': cycle,
'race_id': i,
'days_out': d,
'true_prob': tp,
'outcome': outcomes[i],
'polling_forecast': polling,
'fundamentals_forecast': fundamentals,
'market_price': market,
'expert_forecast': expert,
'ml_forecast': ml,
})
return pd.DataFrame(races)
data = generate_senate_data(n_races=400, n_cycles=5)
print(f"Generated {len(data)} race-forecasts across "
f"{data['cycle'].nunique()} cycles")
print(f"\nOutcome distribution: {data['outcome'].mean():.2%} Democratic wins")
print(f"\nDays before election distribution:")
print(data['days_out'].value_counts().sort_index())
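As a quick diagnostic (not part of the evaluation pipeline that follows), it is worth checking how correlated the components' errors are, since the gains from combining forecasts depend on their diversity. A minimal sketch using the data frame generated above:
# Quick look at error correlation between components: strongly correlated
# errors limit how much any combination method can improve on the parts.
component_cols = ['polling_forecast', 'fundamentals_forecast',
                  'market_price', 'expert_forecast', 'ml_forecast']
errors = data[component_cols].sub(data['outcome'], axis=0)
print("\nForecast error correlation matrix:")
print(errors.corr().round(2))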
Step 2: Evaluate Individual Components
# ============================================================
# STEP 2: Evaluate each component individually
# ============================================================
forecast_cols = ['polling_forecast', 'fundamentals_forecast',
'market_price', 'expert_forecast', 'ml_forecast']
print("\n" + "=" * 60)
print("INDIVIDUAL MODEL PERFORMANCE")
print("=" * 60)
for col in forecast_cols:
bs = brier_score_loss(data['outcome'], data[col])
print(f" {col:30s}: Brier = {bs:.4f}")
# Performance by days_out
print("\nBrier Score by Days Before Election:")
print("-" * 70)
for days in sorted(data['days_out'].unique()):
subset = data[data['days_out'] == days]
print(f"\n Days out = {days:3d} (n = {len(subset)})")
for col in forecast_cols:
bs = brier_score_loss(subset['outcome'], subset[col])
print(f" {col:30s}: {bs:.4f}")
Steps 3 and 4: Combination Methods and Cross-Validation
# ============================================================
# STEP 3: Implement and test combination methods
# ============================================================
def simple_average(forecasts_df, cols):
"""Simple equal-weight average."""
return forecasts_df[cols].mean(axis=1).values
def weighted_average_brier(forecasts_df, cols, outcomes, train_mask):
"""Inverse-Brier-score weighted average."""
train = forecasts_df[train_mask]
train_outcomes = outcomes[train_mask]
weights = []
for col in cols:
bs = brier_score_loss(train_outcomes, train[col])
weights.append(1.0 / bs)
weights = np.array(weights) / sum(weights)
return forecasts_df[cols].values @ weights, weights
def optimized_weights(forecasts_df, cols, outcomes, train_mask,
shrinkage=0.1):
"""Constrained optimization of combination weights."""
train_f = forecasts_df.loc[train_mask, cols].values
train_y = outcomes[train_mask]
K = len(cols)
equal_w = np.ones(K) / K
def objective(w):
combined = train_f @ w
mse = np.mean((train_y - combined) ** 2)
reg = shrinkage * np.sum((w - equal_w) ** 2)
return mse + reg
constraints = {'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}
bounds = [(0, 1)] * K
result = minimize(objective, equal_w, method='SLSQP',
bounds=bounds, constraints=constraints)
weights = result.x
return forecasts_df[cols].values @ weights, weights
def extremized_average(forecasts_df, cols, d=1.5):
"""Simple average with extremizing."""
avg = forecasts_df[cols].mean(axis=1).values
avg = np.clip(avg, 1e-4, 1 - 1e-4)
return expit(d * logit(avg))
def calibrated_extremized(forecasts_df, cols, outcomes, train_mask):
"""Extremized average with calibrated d from logistic regression."""
train_avg = forecasts_df.loc[train_mask, cols].mean(axis=1).values
train_avg = np.clip(train_avg, 1e-4, 1 - 1e-4)
train_y = outcomes[train_mask]
X_train = logit(train_avg).reshape(-1, 1)
lr = LogisticRegression(C=1e6, fit_intercept=True, max_iter=1000)
lr.fit(X_train, train_y)
d = lr.coef_[0, 0]
b = lr.intercept_[0]
all_avg = forecasts_df[cols].mean(axis=1).values
all_avg = np.clip(all_avg, 1e-4, 1 - 1e-4)
calibrated = expit(b + d * logit(all_avg))
return calibrated, d, b
def log_pool(forecasts_df, cols):
"""Logarithmic opinion pool (equal weights)."""
forecasts = forecasts_df[cols].values
forecasts = np.clip(forecasts, 1e-4, 1 - 1e-4)
log_odds = logit(forecasts)
avg_log_odds = log_odds.mean(axis=1)
return expit(avg_log_odds)
def stacking_ensemble(forecasts_df, cols, outcomes, train_mask):
    """Stacking with a logistic regression meta-learner."""
    X_train = forecasts_df.loc[train_mask, cols].values
    y_train = outcomes[train_mask]
    X_all = forecasts_df[cols].values
    # Out-of-fold predictions give a leakage-free internal estimate of
    # the meta-learner's skill on the training cycles.
    kf = KFold(n_splits=5, shuffle=True, random_state=42)
    oof_preds = np.zeros(len(y_train))
    for tr_idx, val_idx in kf.split(X_train):
        lr = LogisticRegression(C=1.0, max_iter=1000)
        lr.fit(X_train[tr_idx], y_train[tr_idx])
        oof_preds[val_idx] = lr.predict_proba(X_train[val_idx])[:, 1]
    # Final meta-learner refit on the full training set; stash the
    # out-of-fold Brier score on the model for reference.
    meta_lr = LogisticRegression(C=1.0, max_iter=1000)
    meta_lr.fit(X_train, y_train)
    meta_lr.oof_brier_ = brier_score_loss(y_train, oof_preds)
    return meta_lr.predict_proba(X_all)[:, 1], meta_lr
# ============================================================
# STEP 4: Time-series cross-validation (train on past cycles,
# test on next cycle)
# ============================================================
print("\n" + "=" * 60)
print("ENSEMBLE METHOD COMPARISON (Cycle-Based CV)")
print("=" * 60)
results = {method: [] for method in [
'simple_avg', 'weighted_avg', 'optimized', 'extremized_1.5',
'extremized_2.0', 'calibrated_ext', 'log_pool', 'stacking'
]}
# Also track individual model results
for col in forecast_cols:
results[col] = []
cycles = sorted(data['cycle'].unique())
for test_cycle in cycles[1:]: # Need at least one training cycle
train_mask = data['cycle'] < test_cycle
test_mask = data['cycle'] == test_cycle
outcomes = data['outcome'].values
test_y = data.loc[test_mask, 'outcome'].values
# Individual models
for col in forecast_cols:
bs = brier_score_loss(test_y, data.loc[test_mask, col])
results[col].append(bs)
# Simple average
sa = simple_average(data, forecast_cols)
results['simple_avg'].append(
brier_score_loss(test_y, sa[test_mask]))
# Weighted average
wa, wa_weights = weighted_average_brier(
data, forecast_cols, outcomes, train_mask)
results['weighted_avg'].append(
brier_score_loss(test_y, wa[test_mask]))
# Optimized weights
ow, ow_weights = optimized_weights(
data, forecast_cols, outcomes, train_mask, shrinkage=0.1)
results['optimized'].append(
brier_score_loss(test_y, ow[test_mask]))
# Extremized (d=1.5)
ext15 = extremized_average(data, forecast_cols, d=1.5)
results['extremized_1.5'].append(
brier_score_loss(test_y, ext15[test_mask]))
# Extremized (d=2.0)
ext20 = extremized_average(data, forecast_cols, d=2.0)
results['extremized_2.0'].append(
brier_score_loss(test_y, ext20[test_mask]))
# Calibrated extremized
cal_ext, d_cal, b_cal = calibrated_extremized(
data, forecast_cols, outcomes, train_mask)
results['calibrated_ext'].append(
brier_score_loss(test_y, cal_ext[test_mask]))
# Log pool
lp = log_pool(data, forecast_cols)
results['log_pool'].append(
brier_score_loss(test_y, lp[test_mask]))
# Stacking
stk, stk_model = stacking_ensemble(
data, forecast_cols, outcomes, train_mask)
results['stacking'].append(
brier_score_loss(test_y, stk[test_mask]))
# Print results
print(f"\n{'Method':<30s} {'Mean Brier':>10s} {'Std':>8s}")
print("-" * 50)
for method in results:
scores = results[method]
if len(scores) > 0:
mean_bs = np.mean(scores)
std_bs = np.std(scores)
print(f" {method:<28s} {mean_bs:>10.4f} {std_bs:>8.4f}")
Step 5: Detailed Analysis
# ============================================================
# STEP 5: Detailed analysis of the best methods
# ============================================================
# Use the last cycle as the final test
final_train = data['cycle'] < cycles[-1]
final_test = data['cycle'] == cycles[-1]
test_data = data[final_test].copy()
test_y = test_data['outcome'].values
# Get all ensemble forecasts for the test set
test_data['simple_avg'] = simple_average(data, forecast_cols)[final_test]
wa, _ = weighted_average_brier(
data, forecast_cols, data['outcome'].values, final_train)
test_data['weighted_avg'] = wa[final_test]
ext_cal, d_opt, b_opt = calibrated_extremized(
data, forecast_cols, data['outcome'].values, final_train)
test_data['cal_extremized'] = ext_cal[final_test]
stk, stk_model = stacking_ensemble(
data, forecast_cols, data['outcome'].values, final_train)
test_data['stacking'] = stk[final_test]
# Calibration analysis
def calibration_analysis(forecasts, outcomes, n_bins=10, label=''):
"""Compute calibration metrics."""
bins = np.linspace(0, 1, n_bins + 1)
bin_centers = []
bin_means = []
bin_counts = []
for i in range(n_bins):
mask = (forecasts >= bins[i]) & (forecasts < bins[i+1])
if mask.sum() > 0:
bin_centers.append(np.mean(forecasts[mask]))
bin_means.append(np.mean(outcomes[mask]))
bin_counts.append(mask.sum())
# Calibration error (ECE)
ece = 0
total = sum(bin_counts)
for center, mean, count in zip(bin_centers, bin_means, bin_counts):
ece += (count / total) * abs(center - mean)
return {
'label': label,
'bin_centers': bin_centers,
'bin_means': bin_means,
'bin_counts': bin_counts,
'ece': ece,
'brier': brier_score_loss(outcomes, forecasts),
}
# Calibration comparison
methods_to_analyze = {
'Simple Average': test_data['simple_avg'].values,
'Weighted Average': test_data['weighted_avg'].values,
'Calibrated Extremized': test_data['cal_extremized'].values,
'Stacking': test_data['stacking'].values,
'Polling Model': test_data['polling_forecast'].values,
'Market Price': test_data['market_price'].values,
}
print("\n" + "=" * 60)
print("CALIBRATION ANALYSIS (Final Test Cycle)")
print("=" * 60)
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.flatten()
for idx, (name, forecasts) in enumerate(methods_to_analyze.items()):
cal = calibration_analysis(forecasts, test_y, n_bins=10, label=name)
print(f"\n{name}:")
print(f" Brier Score: {cal['brier']:.4f}")
print(f" ECE: {cal['ece']:.4f}")
ax = axes[idx]
ax.plot([0, 1], [0, 1], 'k--', alpha=0.5, label='Perfect')
ax.scatter(cal['bin_centers'], cal['bin_means'],
s=[c * 3 for c in cal['bin_counts']], alpha=0.7)
ax.set_xlabel('Forecast Probability')
ax.set_ylabel('Observed Frequency')
ax.set_title(f'{name}\nBrier={cal["brier"]:.4f}, ECE={cal["ece"]:.4f}')
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.legend()
plt.tight_layout()
plt.savefig('election_calibration.png', dpi=150, bbox_inches='tight')
plt.close()
print("\nCalibration plot saved to election_calibration.png")
Step 6: Performance by Race Type
# ============================================================
# STEP 6: Performance breakdown by race competitiveness
# ============================================================
print("\n" + "=" * 60)
print("PERFORMANCE BY RACE COMPETITIVENESS")
print("=" * 60)
test_data['competitiveness'] = pd.cut(
test_data['true_prob'],
bins=[0, 0.25, 0.40, 0.60, 0.75, 1.0],
labels=['Safe R', 'Lean R', 'Tossup', 'Lean D', 'Safe D']
)
ensemble_methods = {
'Simple Avg': 'simple_avg',
'Weighted': 'weighted_avg',
'Cal. Extremized': 'cal_extremized',
'Stacking': 'stacking',
}
for comp in ['Safe R', 'Lean R', 'Tossup', 'Lean D', 'Safe D']:
subset = test_data[test_data['competitiveness'] == comp]
if len(subset) < 5:
continue
print(f"\n {comp} (n={len(subset)}):")
sub_y = subset['outcome'].values
for name, col in ensemble_methods.items():
bs = brier_score_loss(sub_y, subset[col])
print(f" {name:20s}: {bs:.4f}")
Step 7: Weight Analysis
# ============================================================
# STEP 7: Analyze learned weights and model contributions
# ============================================================
print("\n" + "=" * 60)
print("WEIGHT ANALYSIS")
print("=" * 60)
# Weighted average weights
_, wa_final_weights = weighted_average_brier(
data, forecast_cols, data['outcome'].values, final_train)
print("\nInverse-Brier Weights:")
for col, w in zip(forecast_cols, wa_final_weights):
print(f" {col:30s}: {w:.3f}")
# Optimized weights
_, opt_final_weights = optimized_weights(
data, forecast_cols, data['outcome'].values, final_train, shrinkage=0.1)
print("\nOptimized Weights (shrinkage=0.1):")
for col, w in zip(forecast_cols, opt_final_weights):
print(f" {col:30s}: {w:.3f}")
# Stacking meta-learner weights
print("\nStacking Meta-Learner Coefficients:")
print(f" Intercept: {stk_model.intercept_[0]:.3f}")
for col, coef in zip(forecast_cols, stk_model.coef_[0]):
print(f" {col:30s}: {coef:.3f}")
# Extremizing parameters
print(f"\nCalibrated Extremizing Parameters:")
print(f" Extremizing factor d: {d_opt:.3f}")
print(f" Bias intercept b: {b_opt:.3f}")
Key Findings and Discussion
After running this analysis, we typically observe the following patterns:
- All ensemble methods outperform the individual models. The simple average alone typically improves on every individual component by a meaningful margin.
- The polling model performs best close to the election (low days_out), while the fundamentals model is relatively better far from the election. A stacking meta-learner that is also given days_out (see the time-aware sketch at the end of Step 4) can learn this time-dependent weighting; the plain stacking model above, which sees only the five forecasts, cannot.
- Extremizing helps for competitive races (true probability near 0.5), where shared information causes the simple average to be insufficiently decisive; the short numerical sketch after this list illustrates the effect. It can hurt for "safe" races, where the average is already extreme.
- Calibrated extremizing and stacking tend to perform similarly, and both outperform the simple average, though the improvement may be modest.
- Market prices are surprisingly competitive as a single source but still benefit from being combined with model-based forecasts.
- The optimal combination varies by context. No single ensemble method dominates across all race types and time horizons; a practical system might use different methods for different situations.
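To see why extremizing mainly affects the competitive races, here is a tiny numerical sketch using the same logit/expit transform as extremized_average:
# Extremizing expit(d * logit(p)): mid-range probabilities move a lot,
# already-extreme ones barely move.
for p in [0.55, 0.65, 0.80, 0.95]:
    print(f"  p = {p:.2f} -> d=1.5: {expit(1.5 * logit(p)):.3f}, "
          f"d=2.0: {expit(2.0 * logit(p)):.3f}")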
The broader lesson: building a super-ensemble is an iterative process of testing, validating, and refining. The key ingredients are diverse, well-calibrated component models and a disciplined approach to combination that guards against overfitting.