Case Study 2: Building a Personal Forecasting Calibration Tracker

Overview

In this case study, we build a complete Python tool for tracking and improving personal calibration as a prediction market trader. The tracker will:

  1. Log predictions with metadata (event description, probability, market price, category).
  2. Resolve predictions and record outcomes.
  3. Compute rolling calibration metrics (ECE, BSS, Murphy decomposition).
  4. Generate improvement reports with actionable insights.
  5. Set calibration goals and track progress.
  6. Compare personal performance against market consensus.

This is a practical, "build it and use it" case study. By the end, you will have a working tool that you can use in your own trading.


Part 1: Designing the Tracker

Requirements

A good calibration tracker must be:

  • Persistent: Data survives across sessions (stored in a JSON file).
  • Easy to use: Simple API for adding predictions and recording outcomes.
  • Analytical: Computes all relevant calibration and forecast quality metrics.
  • Visual: Generates reliability diagrams and trend charts.
  • Actionable: Provides specific advice for improvement based on patterns.
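
The persistence requirement comes with one caution: writing JSON straight to the target file can leave a truncated log if the process dies mid-write. As a minimal sketch of a safer save, the hypothetical helper `atomic_save_json` (not part of the tracker as written) writes to a temporary file in the same directory and then swaps it into place with `os.replace`:

```python
import json
import os
import tempfile


def atomic_save_json(data, filepath):
    """Write `data` as JSON so readers never see a half-written file.

    Hypothetical helper for illustration; the tracker in Part 2 writes
    directly to `filepath` instead.
    """
    directory = os.path.dirname(os.path.abspath(filepath))
    # Create the temp file next to the target so os.replace stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f, indent=2, default=str)
        os.replace(tmp_path, filepath)  # swap the finished file into place
    except BaseException:
        os.remove(tmp_path)  # clean up the temp file if anything failed
        raise
```

If the write fails partway, the previous log file is left untouched.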

Data Model

Each forecast entry contains:

{
    "id": 1,
    "event": "Will BTC exceed $100k by March 2026?",
    "probability": 0.65,
    "market_price": 0.58,
    "category": "crypto",
    "tags": ["bitcoin", "price"],
    "notes": "Strong momentum, ETF inflows continue",
    "forecast_date": "2026-01-15T10:30:00",
    "resolution_date": null,
    "outcome": null,
    "confidence_level": "moderate"
}
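
One way to keep the log consistent with this data model is to check each entry before it is stored. The sketch below is an illustration, not part of the tracker: `validate_entry` is a hypothetical helper, and the set of required keys is simply taken from the example entry above.

```python
REQUIRED_KEYS = {
    "id", "event", "probability", "market_price", "category", "tags",
    "notes", "forecast_date", "resolution_date", "outcome", "confidence_level",
}


def validate_entry(entry):
    """Return a list of problems found in one forecast entry (empty if none)."""
    problems = []
    missing = REQUIRED_KEYS - set(entry)
    if missing:
        problems.append(f"missing keys: {sorted(missing)}")
    prob = entry.get("probability")
    if not isinstance(prob, (int, float)) or not 0 <= prob <= 1:
        problems.append("probability must be a number in [0, 1]")
    price = entry.get("market_price")
    if price is not None and (not isinstance(price, (int, float)) or not 0 <= price <= 1):
        problems.append("market_price must be None or a number in [0, 1]")
    if entry.get("outcome") not in (None, 0, 1):
        problems.append("outcome must be None (pending), 0, or 1")
    # A resolved forecast should carry a resolution date, and vice versa.
    if (entry.get("outcome") is None) != (entry.get("resolution_date") is None):
        problems.append("outcome and resolution_date must be set together")
    return problems
```

An empty list means the entry matches the model; anything else can be printed or raised before the entry reaches the log.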

Part 2: Core Implementation

"""
Personal Forecasting Calibration Tracker
=========================================
A complete tool for tracking, analyzing, and improving personal calibration
as a prediction market trader.

Usage:
    tracker = CalibrationTracker('my_forecasts.json')
    tracker.add_forecast("Event description", 0.70, market_price=0.65, category="politics")
    tracker.resolve(1, outcome=1)
    tracker.report()
"""

import json
import os
import numpy as np
from datetime import datetime, timedelta
from collections import defaultdict


class CalibrationTracker:
    """
    Personal calibration tracking and analysis tool.

    Stores forecasts in a JSON file and provides methods for
    logging, resolving, analyzing, and visualizing forecasting performance.
    """

    def __init__(self, filepath='calibration_log.json'):
        """
        Initialize the tracker.

        Parameters
        ----------
        filepath : str
            Path to the JSON file for persistent storage.
        """
        self.filepath = filepath
        self.log = self._load()

    def _load(self):
        """Load existing forecast log from file."""
        if os.path.exists(self.filepath):
            with open(self.filepath, 'r') as f:
                return json.load(f)
        return []

    def _save(self):
        """Save forecast log to file."""
        with open(self.filepath, 'w') as f:
            json.dump(self.log, f, indent=2, default=str)

    # ==========================================
    # LOGGING METHODS
    # ==========================================

    def add_forecast(self, event, probability, market_price=None,
                     category='general', tags=None, notes='',
                     confidence_level='moderate'):
        """
        Record a new forecast.

        Parameters
        ----------
        event : str
            Description of the event being forecasted.
        probability : float
            Your predicted probability (0 to 1).
        market_price : float or None
            Current market price for comparison.
        category : str
            Category label (e.g., 'politics', 'sports', 'crypto').
        tags : list of str or None
            Optional tags for filtering.
        notes : str
            Your reasoning or notes.
        confidence_level : str
            Self-assessed confidence: 'low', 'moderate', 'high'.

        Returns
        -------
        int : The ID of the new forecast entry.
        """
        if not 0 <= probability <= 1:
            raise ValueError("Probability must be between 0 and 1.")

        entry = {
            'id': len(self.log) + 1,
            'event': event,
            'probability': round(probability, 4),
            'market_price': round(market_price, 4) if market_price is not None else None,
            'category': category,
            'tags': tags or [],
            'notes': notes,
            'confidence_level': confidence_level,
            'forecast_date': datetime.now().isoformat(),
            'resolution_date': None,
            'outcome': None,
        }
        self.log.append(entry)
        self._save()
        print(f"Forecast #{entry['id']} recorded: {event} @ {probability:.1%}")
        return entry['id']

    def resolve(self, event_id, outcome):
        """
        Record the outcome of a forecasted event.

        Parameters
        ----------
        event_id : int
            The forecast ID to resolve.
        outcome : int
            1 if the event occurred, 0 if it did not.

        Returns
        -------
        bool : True if successfully resolved.
        """
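        # Reject anything other than a binary outcome (int() would silently
        # truncate a value such as 0.7 to 0).
        if outcome not in (0, 1):
            raise ValueError("Outcome must be 0 or 1.")

        for entry in self.log: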
            if entry['id'] == event_id:
                if entry['outcome'] is not None:
                    print(f"Warning: Forecast #{event_id} already resolved.")
                    return False
                entry['outcome'] = int(outcome)
                entry['resolution_date'] = datetime.now().isoformat()
                self._save()
                print(f"Forecast #{event_id} resolved: {'YES' if outcome else 'NO'}")
                return True
        print(f"Forecast #{event_id} not found.")
        return False

    def bulk_resolve(self, resolutions):
        """
        Resolve multiple forecasts at once.

        Parameters
        ----------
        resolutions : dict
            Mapping of event_id to outcome (e.g., {1: 1, 2: 0, 3: 1}).
        """
        for event_id, outcome in resolutions.items():
            self.resolve(event_id, outcome)

    # ==========================================
    # QUERY METHODS
    # ==========================================

    def get_resolved(self, category=None, tags=None, min_date=None, max_date=None):
        """
        Get resolved forecasts with optional filters.

        Parameters
        ----------
        category : str or None
            Filter by category.
        tags : list of str or None
            Filter by tags (any match).
        min_date : str or None
            Minimum forecast date (ISO format).
        max_date : str or None
            Maximum forecast date (ISO format).

        Returns
        -------
        list of dict : Filtered resolved forecasts.
        """
        resolved = [e for e in self.log if e['outcome'] is not None]

        if category:
            resolved = [e for e in resolved if e['category'] == category]

        if tags:
            resolved = [e for e in resolved
                       if any(t in e.get('tags', []) for t in tags)]

        if min_date:
            resolved = [e for e in resolved if e['forecast_date'] >= min_date]

        if max_date:
            resolved = [e for e in resolved if e['forecast_date'] <= max_date]

        return resolved

    def get_pending(self):
        """Get all unresolved forecasts."""
        return [e for e in self.log if e['outcome'] is None]

    def summary(self):
        """Print a summary of the forecast log."""
        total = len(self.log)
        resolved = len([e for e in self.log if e['outcome'] is not None])
        pending = total - resolved
        categories = set(e['category'] for e in self.log)

        print("=== Forecast Log Summary ===")
        print(f"Total forecasts: {total}")
        print(f"Resolved: {resolved}")
        print(f"Pending: {pending}")
        print(f"Categories: {', '.join(sorted(categories))}")

        if resolved > 0:
            outcomes = [e['outcome'] for e in self.log if e['outcome'] is not None]
            print(f"Base rate (share resolved YES): {np.mean(outcomes):.1%}")

    # ==========================================
    # CALIBRATION METRICS
    # ==========================================

    def _compute_ece(self, predictions, outcomes, n_bins=10):
        """Compute Expected Calibration Error."""
        predictions = np.array(predictions, dtype=float)
        outcomes = np.array(outcomes, dtype=float)

        bin_edges = np.linspace(0, 1, n_bins + 1)
        ece = 0.0
        bin_data = []

        for i in range(n_bins):
            if i < n_bins - 1:
                mask = (predictions >= bin_edges[i]) & (predictions < bin_edges[i + 1])
            else:
                mask = (predictions >= bin_edges[i]) & (predictions <= bin_edges[i + 1])

            count = mask.sum()
            if count > 0:
                mean_pred = predictions[mask].mean()
                mean_out = outcomes[mask].mean()
                ece += count * abs(mean_pred - mean_out)
                bin_data.append({
                    'bin_center': (bin_edges[i] + bin_edges[i + 1]) / 2,
                    'mean_pred': mean_pred,
                    'mean_out': mean_out,
                    'count': int(count),
                    'error': abs(mean_pred - mean_out),
                })

        ece /= len(predictions)
        return ece, bin_data

    def _compute_brier(self, predictions, outcomes):
        """Compute Brier score."""
        return np.mean((np.array(predictions) - np.array(outcomes)) ** 2)

    def _compute_bss(self, predictions, outcomes):
        """Compute Brier Skill Score against base rate."""
        bs = self._compute_brier(predictions, outcomes)
        base_rate = np.mean(outcomes)
        bs_ref = base_rate * (1 - base_rate)
        if bs_ref == 0:
            return 0.0
        return 1 - bs / bs_ref

    def _murphy_decomposition(self, predictions, outcomes, n_bins=10):
        """Compute Murphy decomposition of Brier score."""
        predictions = np.array(predictions, dtype=float)
        outcomes = np.array(outcomes, dtype=float)
        N = len(predictions)
        base_rate = outcomes.mean()

        bin_edges = np.linspace(0, 1, n_bins + 1)
        reliability = 0.0
        resolution = 0.0

        for i in range(n_bins):
            if i < n_bins - 1:
                mask = (predictions >= bin_edges[i]) & (predictions < bin_edges[i + 1])
            else:
                mask = (predictions >= bin_edges[i]) & (predictions <= bin_edges[i + 1])

            count = mask.sum()
            if count > 0:
                mean_pred = predictions[mask].mean()
                mean_out = outcomes[mask].mean()
                reliability += count * (mean_pred - mean_out) ** 2
                resolution += count * (mean_out - base_rate) ** 2

        reliability /= N
        resolution /= N
        uncertainty = base_rate * (1 - base_rate)

        return {
            'reliability': reliability,
            'resolution': resolution,
            'uncertainty': uncertainty,
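            # Binned reconstruction; it matches the raw Brier score exactly only
            # when every forecast within a bin has the same value.
            'brier_score': reliability - resolution + uncertainty,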
        }

    # ==========================================
    # REPORTING
    # ==========================================

    def report(self, category=None, n_bins=5):
        """
        Generate a comprehensive calibration report.

        Parameters
        ----------
        category : str or None
            Filter by category. None for overall report.
        n_bins : int
            Number of bins for calibration analysis.

        Returns
        -------
        str : Formatted report text.
        """
        resolved = self.get_resolved(category=category)

        if len(resolved) < 10:
            msg = (f"Not enough resolved forecasts for a meaningful report "
                   f"({len(resolved)} resolved, need at least 10).")
            print(msg)
            return msg

        predictions = [e['probability'] for e in resolved]
        outcomes = [e['outcome'] for e in resolved]

        brier = self._compute_brier(predictions, outcomes)
        bss = self._compute_bss(predictions, outcomes)
        ece, bin_data = self._compute_ece(predictions, outcomes, n_bins)
        decomp = self._murphy_decomposition(predictions, outcomes, n_bins)
        sharpness = np.mean(np.abs(np.array(predictions) - 0.5))

        lines = []
        lines.append("=" * 55)
        lines.append("       PERSONAL CALIBRATION REPORT")
        lines.append("=" * 55)
        lines.append(f"  Date:       {datetime.now().strftime('%Y-%m-%d %H:%M')}")
        lines.append(f"  Category:   {category or 'All categories'}")
        lines.append(f"  Forecasts:  {len(resolved)} resolved")
        lines.append("")
        lines.append("--- Scoring Metrics ---")
        lines.append(f"  Brier Score:     {brier:.4f}")
        lines.append(f"  BSS:             {bss:.4f}")
        lines.append(f"  Base rate:       {np.mean(outcomes):.4f}")
        lines.append("")
        lines.append("--- Calibration Metrics ---")
        lines.append(f"  ECE:             {ece:.4f}")
        lines.append(f"  Reliability:     {decomp['reliability']:.6f}")
        lines.append(f"  Resolution:      {decomp['resolution']:.6f}")
        lines.append(f"  Uncertainty:     {decomp['uncertainty']:.6f}")
        lines.append(f"  Sharpness (MAD): {sharpness:.4f}")
        lines.append("")
        lines.append("--- Calibration by Bin ---")
        lines.append(f"  {'Predicted':>10} {'Observed':>10} {'Count':>7} {'Error':>8}")

        for bd in bin_data:
            lines.append(f"  {bd['mean_pred']:>10.3f} {bd['mean_out']:>10.3f} "
                        f"{bd['count']:>7d} {bd['error']:>8.3f}")

        # Diagnosis
        lines.append("")
        lines.append("--- Diagnosis ---")
        lines.extend(self._diagnose(predictions, outcomes, ece, bss, sharpness, bin_data))

        report_text = '\n'.join(lines)
        print(report_text)
        return report_text

    def _diagnose(self, predictions, outcomes, ece, bss, sharpness, bin_data):
        """Generate diagnostic advice based on metrics."""
        advice = []

        # Overall calibration assessment
        if ece < 0.02:
            advice.append("  [CALIBRATION] Excellent calibration (ECE < 0.02).")
        elif ece < 0.05:
            advice.append("  [CALIBRATION] Good calibration (ECE < 0.05).")
        elif ece < 0.10:
            advice.append("  [CALIBRATION] Mediocre calibration. Review systematic biases.")
        else:
            advice.append("  [CALIBRATION] Poor calibration. Significant improvement needed.")

        # Overconfidence/underconfidence detection
        high_bins = [b for b in bin_data if b['mean_pred'] > 0.6]
        low_bins = [b for b in bin_data if b['mean_pred'] < 0.4]

        if high_bins:
            high_error = np.mean([b['mean_pred'] - b['mean_out'] for b in high_bins])
            if high_error > 0.05:
                advice.append("  [BIAS] Overconfident for high-probability events. "
                            "Consider reducing extreme high predictions by "
                            "5-10 percentage points.")
            elif high_error < -0.05:
                advice.append("  [BIAS] Underconfident for high-probability events. "
                            "You can be more confident in your strong beliefs.")

        if low_bins:
            low_error = np.mean([b['mean_pred'] - b['mean_out'] for b in low_bins])
            if low_error < -0.05:
                advice.append("  [BIAS] Overconfident for low-probability events. "
                            "Events you dismiss happen more than you expect.")
            elif low_error > 0.05:
                advice.append("  [BIAS] Underconfident for low-probability events. "
                            "You are assigning too much probability to unlikely events.")

        # Sharpness assessment
        if sharpness < 0.10:
            advice.append("  [SHARPNESS] Low sharpness. You cluster near 50%. "
                        "Practice making more extreme predictions.")
        elif sharpness > 0.30:
            advice.append("  [SHARPNESS] High sharpness. Ensure this is matched "
                        "by correspondingly good calibration at the extremes.")

        # BSS assessment
        if bss < 0:
            advice.append("  [SKILL] Negative BSS! You would do better predicting "
                        "the base rate. Strongly consider recalibration.")
        elif bss < 0.05:
            advice.append("  [SKILL] Marginal skill. Your predictions add little "
                        "value beyond the base rate.")
        elif bss < 0.15:
            advice.append("  [SKILL] Good skill. You are meaningfully outperforming "
                        "the base rate.")
        else:
            advice.append("  [SKILL] Excellent skill. Strong forecasting performance.")

        return advice

    # ==========================================
    # MARKET COMPARISON
    # ==========================================

    def compare_to_market(self, category=None):
        """
        Compare personal calibration to market prices.

        Shows whether you add value beyond market consensus.
        """
        resolved = self.get_resolved(category=category)
        with_market = [e for e in resolved if e['market_price'] is not None]

        if len(with_market) < 10:
            print("Not enough forecasts with market prices for comparison.")
            return

        my_preds = [e['probability'] for e in with_market]
        market_preds = [e['market_price'] for e in with_market]
        outcomes = [e['outcome'] for e in with_market]

        my_brier = self._compute_brier(my_preds, outcomes)
        market_brier = self._compute_brier(market_preds, outcomes)
        my_ece, _ = self._compute_ece(my_preds, outcomes, n_bins=5)
        market_ece, _ = self._compute_ece(market_preds, outcomes, n_bins=5)

        # BSS of personal forecast relative to market
        bss_vs_market = 1 - my_brier / market_brier if market_brier > 0 else 0

        print("=" * 55)
        print("       YOU vs. THE MARKET")
        print("=" * 55)
        print(f"  Forecasts compared: {len(with_market)}")
        print(f"  Category: {category or 'All'}")
        print("")
        print(f"  {'Metric':<20} {'You':>10} {'Market':>10}")
        print(f"  {'-'*40}")
        print(f"  {'Brier Score':<20} {my_brier:>10.4f} {market_brier:>10.4f}")
        print(f"  {'ECE':<20} {my_ece:>10.4f} {market_ece:>10.4f}")
        print("")
        print(f"  BSS (you vs. market): {bss_vs_market:.4f}")

        if bss_vs_market > 0.02:
            print("  >> You are outperforming the market. Keep it up!")
        elif bss_vs_market > -0.02:
            print("  >> You are roughly matching the market.")
        else:
            print("  >> The market is outperforming you. Consider following "
                  "market prices more closely.")

    # ==========================================
    # ROLLING ANALYSIS
    # ==========================================

    def rolling_calibration(self, window_size=50, category=None):
        """
        Compute rolling ECE over resolved forecasts.

        Parameters
        ----------
        window_size : int
            Number of forecasts in each rolling window.
        category : str or None
            Filter by category.

        Returns
        -------
        list of dict : Rolling ECE values with timestamps.
        """
        resolved = self.get_resolved(category=category)
        resolved.sort(key=lambda e: e['resolution_date'] or e['forecast_date'])

        if len(resolved) < window_size:
            print(f"Need at least {window_size} resolved forecasts for rolling analysis.")
            return []

        rolling_data = []
        for i in range(window_size, len(resolved) + 1):
            window = resolved[i - window_size:i]
            preds = [e['probability'] for e in window]
            outs = [e['outcome'] for e in window]

            ece, _ = self._compute_ece(preds, outs, n_bins=5)
            bss = self._compute_bss(preds, outs)

            rolling_data.append({
                'end_index': i,
                'end_date': window[-1]['resolution_date'] or window[-1]['forecast_date'],
                'ece': ece,
                'bss': bss,
                'n': len(window),
            })

        return rolling_data

    # ==========================================
    # CATEGORY BREAKDOWN
    # ==========================================

    def category_breakdown(self, min_forecasts=10):
        """
        Show calibration metrics broken down by category.

        Parameters
        ----------
        min_forecasts : int
            Minimum resolved forecasts per category to include.
        """
        categories = set(e['category'] for e in self.log)

        print("=" * 65)
        print("       CALIBRATION BY CATEGORY")
        print("=" * 65)
        print(f"  {'Category':<15} {'N':>5} {'ECE':>8} {'BSS':>8} {'Brier':>8} {'Sharp':>8}")
        print(f"  {'-' * 53}")

        for cat in sorted(categories):
            resolved = self.get_resolved(category=cat)
            if len(resolved) < min_forecasts:
                continue

            preds = [e['probability'] for e in resolved]
            outs = [e['outcome'] for e in resolved]

            ece, _ = self._compute_ece(preds, outs, n_bins=5)
            bss = self._compute_bss(preds, outs)
            brier = self._compute_brier(preds, outs)
            sharpness = np.mean(np.abs(np.array(preds) - 0.5))

            print(f"  {cat:<15} {len(resolved):>5} {ece:>8.4f} {bss:>8.4f} "
                  f"{brier:>8.4f} {sharpness:>8.4f}")

    # ==========================================
    # GOAL SETTING AND TRACKING
    # ==========================================

    def set_goals(self, ece_target=0.03, bss_target=0.10, sharpness_target=0.20):
        """
        Set calibration improvement goals.

        Parameters
        ----------
        ece_target : float
            Target ECE (lower is better).
        bss_target : float
            Target BSS (higher is better).
        sharpness_target : float
            Target sharpness MAD (higher is better, subject to calibration).
        """
        self.goals = {
            'ece_target': ece_target,
            'bss_target': bss_target,
            'sharpness_target': sharpness_target,
        }

        resolved = self.get_resolved()
        if len(resolved) < 10:
            print("Goals set. Resolve at least 10 forecasts to track progress.")
            return

        preds = [e['probability'] for e in resolved]
        outs = [e['outcome'] for e in resolved]

        # Use 5 bins so the goal check matches the ECE shown by report().
        current_ece, _ = self._compute_ece(preds, outs, n_bins=5)
        current_bss = self._compute_bss(preds, outs)
        current_sharpness = np.mean(np.abs(np.array(preds) - 0.5))

        print("=" * 55)
        print("       CALIBRATION GOALS")
        print("=" * 55)
        print(f"  {'Metric':<15} {'Current':>10} {'Target':>10} {'Status':>10}")
        print(f"  {'-' * 45}")

        ece_status = 'MET' if current_ece <= ece_target else 'NOT MET'
        bss_status = 'MET' if current_bss >= bss_target else 'NOT MET'
        sharp_status = 'MET' if current_sharpness >= sharpness_target else 'NOT MET'

        print(f"  {'ECE':<15} {current_ece:>10.4f} {ece_target:>10.4f} {ece_status:>10}")
        print(f"  {'BSS':<15} {current_bss:>10.4f} {bss_target:>10.4f} {bss_status:>10}")
        print(f"  {'Sharpness':<15} {current_sharpness:>10.4f} {sharpness_target:>10.4f} "
              f"{sharp_status:>10}")
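
To see how the metric methods fit together, it helps to work a small example by hand. The sketch below uses made-up data (eight forecasts, not real results) and plain Python rather than the class methods. Grouping by exact forecast value stands in for binning, which is the case where the Murphy identity (Brier = reliability - resolution + uncertainty) holds exactly.

```python
from collections import defaultdict

# Illustrative data only: four forecasts at 20% and four at 80%.
predictions = [0.2, 0.2, 0.2, 0.2, 0.8, 0.8, 0.8, 0.8]
outcomes = [0, 0, 0, 1, 1, 1, 1, 0]
n = len(predictions)

brier = sum((p - o) ** 2 for p, o in zip(predictions, outcomes)) / n
base_rate = sum(outcomes) / n
uncertainty = base_rate * (1 - base_rate)

# Group outcomes by forecast value (each distinct value acts as its own bin).
groups = defaultdict(list)
for p, o in zip(predictions, outcomes):
    groups[p].append(o)

reliability = resolution = ece = 0.0
for p, outs in groups.items():
    observed = sum(outs) / len(outs)
    weight = len(outs) / n
    reliability += weight * (p - observed) ** 2
    resolution += weight * (observed - base_rate) ** 2
    ece += weight * abs(p - observed)

bss = 1 - brier / uncertainty

print(f"Brier={brier:.4f}  REL={reliability:.4f}  RES={resolution:.4f}  UNC={uncertainty:.4f}")
print(f"REL - RES + UNC = {reliability - resolution + uncertainty:.4f}")
print(f"ECE={ece:.4f}  BSS={bss:.4f}")
```

Here the forecaster says 20% when the event happens 25% of the time and 80% when it happens 75% of the time, so ECE is 0.05, the Brier score is 0.19, and BSS is 0.24. With the tracker's fixed-width bins, forecasts inside a bin usually differ, so the reconstructed Brier score only approximates the raw one.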

Part 3: Usage Walkthrough

Starting the Tracker

# Initialize the tracker
tracker = CalibrationTracker('my_trading_forecasts.json')

# Log some forecasts
tracker.add_forecast(
    "Will the Fed cut rates by 50bp at March meeting?",
    probability=0.25,
    market_price=0.22,
    category="economics",
    tags=["fed", "rates"],
    notes="Inflation data still above target, but labor market cooling"
)

tracker.add_forecast(
    "Will candidate X win the primary?",
    probability=0.60,
    market_price=0.55,
    category="politics",
    tags=["election", "primary"],
    notes="Strong polling lead, good ground game"
)

tracker.add_forecast(
    "Will ETH exceed $5000 by end of Q1?",
    probability=0.35,
    market_price=0.40,
    category="crypto",
    tags=["ethereum", "price"],
    notes="Bullish sentiment but resistance at $4500"
)

Resolving Forecasts

# As events resolve, record outcomes
tracker.resolve(1, outcome=0)  # Fed did not cut by 50bp
tracker.resolve(2, outcome=1)  # Candidate X won
tracker.resolve(3, outcome=0)  # ETH did not reach $5000

Generating Reports

# After accumulating enough resolved forecasts...
tracker.report()
tracker.compare_to_market()
tracker.category_breakdown()
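
With only three resolved forecasts, `report()` and `compare_to_market()` print a not-enough-data message instead of metrics (both need at least 10 resolved forecasts). The arithmetic behind the market comparison can still be traced by hand on the three walkthrough forecasts. The sketch below does so in plain Python; a sample this small says nothing about skill.

```python
# (your probability, market price, outcome) for the three walkthrough forecasts
forecasts = [
    (0.25, 0.22, 0),  # Fed 50bp cut
    (0.60, 0.55, 1),  # Candidate X primary
    (0.35, 0.40, 0),  # ETH above $5000
]


def brier(pairs):
    """Mean squared error between probabilities and 0/1 outcomes."""
    return sum((p - o) ** 2 for p, o in pairs) / len(pairs)


my_brier = brier([(mine, o) for mine, _, o in forecasts])
market_brier = brier([(mkt, o) for _, mkt, o in forecasts])
bss_vs_market = 1 - my_brier / market_brier

print(f"Your Brier:    {my_brier:.4f}")       # 0.1150
print(f"Market Brier:  {market_brier:.4f}")   # 0.1370
print(f"BSS vs market: {bss_vs_market:.4f}")  # 0.1604
```

A positive value means your Brier score is lower than the market's on the same events, which is the quantity `compare_to_market()` reports as "BSS (you vs. market)".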

Part 4: Simulating a Learning Journey

To demonstrate the tracker's analytical capabilities, we simulate a trader who makes 500 forecasts over time, gradually improving their calibration through feedback.

def simulate_learning_trader(n_forecasts=500, initial_bias=0.15, learning_rate=0.003):
    """
    Simulate a trader who starts overconfident and gradually improves.

    Parameters
    ----------
    n_forecasts : int
        Number of forecasts to simulate.
    initial_bias : float
        Initial overconfidence bias (predictions are scaled by 1 + bias away from 0.5).
    learning_rate : float
        Amount subtracted from the bias after each forecast (floored at 0).
    """
    tracker = CalibrationTracker('simulated_learning.json')

    # Clear any existing data
    tracker.log = []

    categories = ['politics', 'sports', 'crypto', 'economics', 'science']
    bias = initial_bias

    for i in range(n_forecasts):
        # Generate a "true" probability
        true_prob = np.random.beta(2, 2)

        # Apply current bias (overconfidence: push away from 0.5)
        biased_prob = 0.5 + (1 + bias) * (true_prob - 0.5)
        biased_prob = np.clip(biased_prob, 0.02, 0.98)

        # Simulate market price (true_prob + small noise)
        market_price = np.clip(true_prob + np.random.normal(0, 0.03), 0.02, 0.98)

        # Generate outcome
        outcome = np.random.binomial(1, true_prob)

        # Choose category
        cat = np.random.choice(categories, p=[0.3, 0.25, 0.2, 0.15, 0.1])

        # Add and immediately resolve (for simulation purposes)
        entry = {
            'id': i + 1,
            'event': f'Simulated event {i+1}',
            'probability': round(float(biased_prob), 4),
            'market_price': round(float(market_price), 4),
            'category': cat,
            'tags': [],
            'notes': '',
            'confidence_level': 'moderate',
            'forecast_date': (datetime.now() - timedelta(days=n_forecasts - i)).isoformat(),
            'resolution_date': (datetime.now() - timedelta(days=n_forecasts - i - 1)).isoformat(),
            'outcome': int(outcome),
        }
        tracker.log.append(entry)

        # Learning: reduce bias over time (simulating calibration feedback)
        bias = max(0, bias - learning_rate)

    tracker._save()
    return tracker

# Run the simulation
tracker = simulate_learning_trader()

# Generate reports
print("\n--- FULL REPORT ---")
tracker.report(n_bins=5)

print("\n--- CATEGORY BREAKDOWN ---")
tracker.category_breakdown(min_forecasts=20)

print("\n--- VS. MARKET ---")
tracker.compare_to_market()
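
Before reading the simulated reports, it is worth checking what the default parameters imply. The sketch below reproduces only the bias update and the distortion formula from `simulate_learning_trader`, with no randomness; `bias_schedule` and `apply_bias` are hypothetical helper names introduced here.

```python
def bias_schedule(n_forecasts=500, initial_bias=0.15, learning_rate=0.003):
    """Bias applied to each forecast, following the simulation's update rule."""
    schedule = []
    bias = initial_bias
    for _ in range(n_forecasts):
        schedule.append(bias)
        bias = max(0, bias - learning_rate)
    return schedule


def apply_bias(true_prob, bias):
    """Push a probability away from 0.5 by a factor of (1 + bias), then clip."""
    biased = 0.5 + (1 + bias) * (true_prob - 0.5)
    return min(max(biased, 0.02), 0.98)


schedule = bias_schedule()
# Zero-based index of the first forecast made with (numerically) zero bias.
first_unbiased = next(i for i, b in enumerate(schedule) if b < 1e-9)
print(f"First unbiased forecast index: {first_unbiased}")
print(f"True 80% at full bias: {apply_bias(0.8, schedule[0]):.3f}")
```

With the defaults, the bias reaches zero after 50 forecasts, so the remaining 450 simulated forecasts are unbiased, and the largest distortion is modest (a true 80% becomes 84.5%). A smaller `learning_rate`, for example 0.0003, spreads the improvement across the whole run.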

Analyzing the Learning Curve

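def plot_learning_curve(tracker, window_size=50):
    """Plot how rolling ECE and BSS change over time."""
    # Imported here so the tracker itself does not depend on matplotlib.
    import matplotlib.pyplot as plt
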
    rolling = tracker.rolling_calibration(window_size=window_size)

    if not rolling:
        print("Not enough data for rolling analysis.")
        return

    indices = [r['end_index'] for r in rolling]
    eces = [r['ece'] for r in rolling]
    bsses = [r['bss'] for r in rolling]

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8), sharex=True)

    ax1.plot(indices, eces, color='#d62728', linewidth=2)
    ax1.axhline(y=0.03, color='green', linestyle='--', alpha=0.7, label='Goal (ECE < 0.03)')
    ax1.set_ylabel('ECE (lower is better)', fontsize=12)
    ax1.set_title('Calibration Improvement Over Time', fontsize=14)
    ax1.legend(fontsize=10)
    ax1.grid(True, alpha=0.3)

    ax2.plot(indices, bsses, color='#1f77b4', linewidth=2)
    ax2.axhline(y=0.10, color='green', linestyle='--', alpha=0.7, label='Goal (BSS > 0.10)')
    ax2.set_xlabel('Number of Resolved Forecasts', fontsize=12)
    ax2.set_ylabel('BSS (higher is better)', fontsize=12)
    ax2.legend(fontsize=10)
    ax2.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig('learning_curve.png', dpi=150, bbox_inches='tight')
    plt.show()

# Requires matplotlib
# plot_learning_curve(tracker)
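
The requirements in Part 1 mention reliability diagrams, which the code above does not draw. As a dependency-free sketch, the function below renders one as text from a list shaped like the `bin_data` that `_compute_ece` returns (dicts with `mean_pred`, `mean_out`, and `count`). The name `text_reliability_diagram` is hypothetical, and the sample bins are made up.

```python
def text_reliability_diagram(bin_data, width=40):
    """Render predicted vs. observed frequency per bin as text.

    'P' marks the mean prediction, 'O' the observed frequency; '*' means
    the two fall on the same column (well calibrated at this resolution).
    """
    lines = []
    for b in bin_data:
        row = [" "] * (width + 1)
        p_col = round(b["mean_pred"] * width)
        o_col = round(b["mean_out"] * width)
        row[p_col] = "P"
        row[o_col] = "*" if o_col == p_col else "O"
        lines.append(f"{b['mean_pred']:.2f} |{''.join(row)}| n={b['count']}")
    return "\n".join(lines)


# Made-up bins in the same shape as the tracker's bin_data.
sample_bins = [
    {"mean_pred": 0.10, "mean_out": 0.20, "count": 12},
    {"mean_pred": 0.50, "mean_out": 0.50, "count": 30},
    {"mean_pred": 0.90, "mean_out": 0.75, "count": 8},
]
print(text_reliability_diagram(sample_bins))
```

An 'O' to the right of 'P' means the event happened more often than predicted; an 'O' to the left means it happened less often.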

Part 5: Improvement Strategies

The diagnostic output suggests four improvement strategies:

Strategy 1: Overconfidence Correction

If the tracker detects overconfidence (high-probability events occur less than predicted):

  1. Before each forecast, explicitly list three reasons the event might NOT happen.
  2. Apply a mechanical correction: Reduce extreme predictions by 5-10 percentage points.
  3. Track the correction: Log both your "gut" probability and your "adjusted" probability. Over time, see which is better calibrated.
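
Steps 2 and 3 can be sketched in a few lines. The example is illustrative only: `shrink_extremes`, the 0.80 threshold, and the ten forecasts are assumptions made up for the demonstration, not results from real trading.

```python
def shrink_extremes(p, shift=0.10, threshold=0.80):
    """Pull extreme probabilities toward 50% by `shift` (in probability units)."""
    if p >= threshold:
        return p - shift
    if p <= 1 - threshold:
        return p + shift
    return p


def brier(predictions, outcomes):
    """Mean squared error between probabilities and 0/1 outcomes."""
    return sum((p - o) ** 2 for p, o in zip(predictions, outcomes)) / len(outcomes)


# Made-up log of an overconfident forecaster: 90% calls hit 4 of 5 times,
# 10% calls happen 1 of 5 times.
gut = [0.9] * 5 + [0.1] * 5
outcomes = [1, 1, 1, 1, 0, 0, 0, 0, 0, 1]
adjusted = [shrink_extremes(p) for p in gut]

print(f"Gut Brier:      {brier(gut, outcomes):.4f}")       # 0.1700
print(f"Adjusted Brier: {brier(adjusted, outcomes):.4f}")  # 0.1600
```

In practice both probabilities would be stored with each entry (for example in `notes`) so the comparison can be repeated as forecasts resolve.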

Strategy 2: Sharpness Improvement

If the tracker detects low sharpness (predictions cluster near 50%):

  1. Force yourself to use the full probability scale. Practice making predictions at 15%, 25%, 75%, 85%.
  2. Identify your strongest beliefs and push them to more extreme values.
  3. Use the "confidence scale": Before any forecast, rate your confidence as "very low / low / medium / high / very high" and map these to specific probability ranges.
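
One way to make the confidence scale in step 3 concrete is a lookup table. The ranges below are arbitrary placeholders chosen for illustration (the text does not prescribe any), and `probability_range` is a hypothetical helper; replace the numbers with ranges that match your own track record.

```python
# Hypothetical mapping from confidence label to a probability range for the
# side you favor. The numbers are placeholders, not recommendations.
CONFIDENCE_RANGES = {
    "very low": (0.50, 0.55),
    "low": (0.55, 0.65),
    "medium": (0.65, 0.75),
    "high": (0.75, 0.85),
    "very high": (0.85, 0.95),
}


def probability_range(confidence, leaning_yes=True):
    """Return the (low, high) probability of YES for a confidence label."""
    low, high = CONFIDENCE_RANGES[confidence]
    if leaning_yes:
        return (low, high)
    # Mirror the range around 50% when you favor NO.
    return (round(1 - high, 2), round(1 - low, 2))


print(probability_range("high"))                     # (0.75, 0.85)
print(probability_range("high", leaning_yes=False))  # (0.15, 0.25)
```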

Strategy 3: Category-Specific Improvement

If the tracker reveals miscalibration in specific categories:

  1. Focus learning on your weakest category. Read more, track base rates, study outcomes.
  2. Temporarily avoid trading in categories where your BSS is negative.
  3. Consider using market prices as your forecast in weak categories until your calibration improves.
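
Step 2 needs a list of categories with negative skill. Here is a minimal sketch, assuming entries shaped like the data model in Part 1 and using a hypothetical helper `weak_categories`; the sample log is made up.

```python
from collections import defaultdict


def weak_categories(resolved, min_forecasts=10):
    """Return {category: BSS} for categories whose BSS is negative."""
    by_cat = defaultdict(list)
    for e in resolved:
        by_cat[e["category"]].append((e["probability"], e["outcome"]))

    weak = {}
    for cat, pairs in by_cat.items():
        if len(pairs) < min_forecasts:
            continue  # too few forecasts to judge
        n = len(pairs)
        brier = sum((p - o) ** 2 for p, o in pairs) / n
        base_rate = sum(o for _, o in pairs) / n
        # Brier score of always predicting the category's base rate.
        reference = base_rate * (1 - base_rate)
        if reference == 0:
            continue  # all outcomes identical; BSS is undefined
        bss = 1 - brier / reference
        if bss < 0:
            weak[cat] = bss
    return weak


# Made-up log: confident and often wrong in crypto, reasonable in politics.
log = (
    [{"category": "crypto", "probability": 0.9, "outcome": o} for o in [0, 1] * 5]
    + [{"category": "politics", "probability": p, "outcome": o}
       for p, o in [(0.8, 1), (0.2, 0)] * 5]
)
print(weak_categories(log))
```

In this made-up log only crypto is flagged: its BSS is about -0.64, while politics scores about 0.84.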

Strategy 4: Base Rate Anchoring

For all categories:

  1. Before each forecast, estimate the base rate for this type of event.
  2. Adjust from the base rate based on specific evidence. Do not anchor to arbitrary round numbers.
  3. Track your base rate estimates and compare them to actual base rates over time.
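
Step 3 amounts to a simple comparison once enough forecasts have resolved. The sketch below assumes you record your own base-rate estimate per category somewhere (the `estimates` dict is hypothetical, as is the helper name `base_rate_gaps`) and uses made-up outcomes.

```python
from collections import defaultdict


def base_rate_gaps(resolved, estimates):
    """Compare estimated base rates with observed YES frequencies per category.

    Returns {category: (estimate, observed, estimate - observed)}.
    """
    outcomes_by_cat = defaultdict(list)
    for e in resolved:
        outcomes_by_cat[e["category"]].append(e["outcome"])

    gaps = {}
    for cat, estimate in estimates.items():
        outs = outcomes_by_cat.get(cat)
        if not outs:
            continue  # nothing resolved yet in this category
        observed = sum(outs) / len(outs)
        gaps[cat] = (estimate, observed, estimate - observed)
    return gaps


# Made-up example: you believed 40% of sports events resolve YES; 3 of 10 did.
resolved = [{"category": "sports", "outcome": o}
            for o in [1, 1, 1, 0, 0, 0, 0, 0, 0, 0]]
for cat, (est, obs, gap) in base_rate_gaps(resolved, {"sports": 0.40}).items():
    print(f"{cat}: estimated {est:.0%}, observed {obs:.0%}, gap {gap:+.0%}")
```

A persistent positive gap means you overestimate how often events of that type happen; a negative gap means you underestimate it.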

Part 6: Exporting and Sharing

def export_to_csv(tracker, filepath='forecasts_export.csv'):
    """Export forecast log to CSV for analysis in other tools."""
    import csv

    resolved = tracker.get_resolved()
    if not resolved:
        print("No resolved forecasts to export.")
        return

    fieldnames = ['id', 'event', 'probability', 'market_price', 'category',
                  'forecast_date', 'resolution_date', 'outcome']

    with open(filepath, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
        writer.writeheader()
        writer.writerows(resolved)

    print(f"Exported {len(resolved)} forecasts to {filepath}")
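
Values in a CSV file come back as strings, so whoever reads the export has to restore the types. The sketch below round-trips a single made-up row through an in-memory buffer using the same columns as `export_to_csv`; `load_forecasts_csv` is a hypothetical helper.

```python
import csv
import io

FIELDNAMES = ['id', 'event', 'probability', 'market_price', 'category',
              'forecast_date', 'resolution_date', 'outcome']


def load_forecasts_csv(f):
    """Read rows written by export_to_csv and convert numeric columns back."""
    rows = []
    for row in csv.DictReader(f):
        row['id'] = int(row['id'])
        row['probability'] = float(row['probability'])
        # DictWriter writes a None market price as an empty string.
        row['market_price'] = float(row['market_price']) if row['market_price'] else None
        row['outcome'] = int(row['outcome'])
        rows.append(row)
    return rows


# Write one made-up row to an in-memory buffer, then read it back.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=FIELDNAMES, extrasaction='ignore')
writer.writeheader()
writer.writerow({'id': 1, 'event': 'Example event', 'probability': 0.7,
                 'market_price': None, 'category': 'politics', 'tags': [],
                 'forecast_date': '2026-01-15T10:30:00',
                 'resolution_date': '2026-02-01T09:00:00', 'outcome': 1})
buffer.seek(0)
print(load_forecasts_csv(buffer))
```

Note that `tags`, `notes`, and `confidence_level` are not among the exported columns, so they cannot be recovered from the CSV.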

Conclusions

This case study demonstrated how to build a practical calibration tracker. The key features are:

  1. Persistent storage in JSON format for easy portability.
  2. Comprehensive metrics including ECE, BSS, Murphy decomposition, and sharpness.
  3. Automated diagnosis that identifies specific calibration problems and provides actionable advice.
  4. Market comparison to assess whether your personal forecasts add value beyond market consensus.
  5. Rolling analysis to track improvement over time.
  6. Category breakdown to identify domain-specific strengths and weaknesses.

The most important takeaway is that calibration is a skill that improves with deliberate practice and feedback. The tracker provides the feedback; the improvement comes from consistently applying the diagnostic insights to your forecasting process.

Start logging your forecasts today. Even 50 resolved predictions can reveal actionable patterns in your calibration. The earlier you start, the sooner you can identify and correct your systematic biases.


The complete code for this case study is available in code/case-study-code.py.