Case Study 2: Evaluating Platform Quality — A Data-Driven Approach

Overview

Not all prediction market platforms are created equal. Some have deeper liquidity, faster resolution, better calibration, or more diverse market coverage. But how do you measure "quality" objectively?

In this case study, we will develop a quantitative Platform Quality Index (PQI) that scores prediction market platforms across multiple dimensions. We will define metrics, collect data, compute scores, and produce a rigorous comparison.


The Challenge

You are advising a research organization that wants to incorporate prediction market data into its decision-making process. The organization needs to decide which platform(s) to rely on. They want a data-driven framework — not just opinions.

Your task: Build a scoring system that evaluates platforms across measurable dimensions of quality, then apply it to Polymarket, Kalshi, Metaculus, and Manifold Markets.


Part 1: Defining Quality Dimensions

A good platform quality framework should capture multiple aspects of what makes a platform useful. We propose six dimensions:

Dimension 1: Market Breadth

What it measures: How many topics and markets does the platform cover?

Metrics:

  - Total number of active markets
  - Number of distinct categories/topics
  - Rate of new market creation (markets per week)
  - Coverage of "important" events (do major world events have markets?)

Data Collection: Use each platform's API to count active markets and categorize them.

Dimension 2: Liquidity Depth

What it measures: How easy is it to trade meaningful amounts without moving the price?

Metrics:

  - Median trading volume per market (in USD or equivalent)
  - Average bid-ask spread (for CLOB platforms)
  - Depth within 5 cents of midpoint (dollars available)
  - Percentage of markets with volume > $10,000

Data Collection: Fetch order book data and volume statistics via API.

Dimension 3: Forecast Accuracy (Calibration)

What it measures: How well do the platform's prices/forecasts predict actual outcomes?

Metrics:

  - Calibration error: average |predicted probability - actual frequency| across probability buckets
  - Brier score on resolved markets
  - Log score on resolved markets
  - Comparison to a naive baseline (e.g., always predicting 50%)

Data Collection: Gather resolved markets with their final prices and actual outcomes.

Dimension 4: Resolution Quality

What it measures: How reliably and quickly does the platform resolve markets?

Metrics:

  - Average time from event occurrence to market resolution (hours)
  - Number of disputed resolutions as a fraction of total resolutions
  - Frequency of N/A or voided resolutions
  - Transparency of resolution criteria

Data Collection: Compare event timestamps to resolution timestamps; review dispute records.
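
Where an API exposes both a scheduled close time and a resolution timestamp, the gap between them is a usable proxy for resolution speed (true event-occurrence times are rarely available programmatically). The sketch below applies this idea to Manifold's resolved markets; it assumes the closeTime and resolutionTime fields (epoch milliseconds) are present in the search response, which is not guaranteed for every market.

"""
Resolution-lag proxy (sketch).

Assumes Manifold's resolved markets expose closeTime and resolutionTime
as epoch milliseconds; treats the close-to-resolution lag as a proxy for
resolution speed.
"""

import statistics

import httpx


def manifold_resolution_lag_hours(limit: int = 200):
    """Median hours between scheduled close and resolution on Manifold."""
    url = "https://api.manifold.markets/v0/search-markets"
    params = {"limit": limit, "filter": "resolved", "sort": "close-date"}

    response = httpx.get(url, params=params, timeout=30.0)
    response.raise_for_status()

    lags = []
    for m in response.json():
        close_ms = m.get("closeTime")
        resolved_ms = m.get("resolutionTime")
        # Skip markets missing either timestamp, and markets resolved
        # before their scheduled close (early resolutions).
        if close_ms and resolved_ms and resolved_ms >= close_ms:
            lags.append((resolved_ms - close_ms) / 3_600_000)  # ms -> hours

    return statistics.median(lags) if lags else None


if __name__ == "__main__":
    lag = manifold_resolution_lag_hours()
    if lag is not None:
        print(f"Median close-to-resolution lag: {lag:.1f} hours")
    else:
        print("No usable resolution timestamps found.")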

Dimension 5: Data Accessibility

What it measures: How easy is it to access platform data programmatically?

Metrics:

  - API availability (public? authenticated? WebSocket?)
  - Rate limits (requests per minute)
  - Data completeness (what fraction of fields are available?)
  - Documentation quality (1-5 subjective rating)
  - Historical data availability
  - Open source status

Data Collection: Review API documentation; test endpoints.

Dimension 6: User Experience and Trust

What it measures: How trustworthy and usable is the platform?

Metrics:

  - Regulatory status (score: 3=CFTC-regulated, 2=offshore entity, 1=play money, 0=fully decentralized)
  - Uptime reliability (downtime incidents in past year)
  - Withdrawal speed (hours to withdraw funds)
  - Customer support responsiveness
  - Transparency of operations

Data Collection: A combination of automated monitoring and manual review.


Part 2: Scoring Methodology

Normalization

Each metric needs to be normalized to a 0-100 scale so metrics can be combined across dimensions. We use min-max normalization within each metric:

$$\text{Score}_i = \frac{x_i - x_{\min}}{x_{\max} - x_{\min}} \times 100$$

where $x_i$ is the platform's raw value, and $x_{\min}$ and $x_{\max}$ are the smallest and largest values of that metric across the platforms being compared.

For metrics where lower is better (e.g., resolution time, calibration error), invert the score:

$$\text{Score}_i = \left(1 - \frac{x_i - x_{\min}}{x_{\max} - x_{\min}}\right) \times 100$$

Dimension Scores

Each dimension score is the average of its constituent metric scores:

$$D_j = \frac{1}{|M_j|} \sum_{m \in M_j} \text{Score}_m$$

where $M_j$ is the set of metrics in dimension $j$.

Composite PQI

The Platform Quality Index is a weighted average of dimension scores:

$$\text{PQI} = \sum_{j=1}^{6} w_j \cdot D_j$$

where $\sum w_j = 1$.

Default weights (adjustable based on use case):

Dimension               Default Weight   Research Weight   Trading Weight
---------------------   --------------   ---------------   --------------
Market Breadth          0.15             0.20              0.10
Liquidity Depth         0.20             0.10              0.30
Forecast Accuracy       0.25             0.30              0.20
Resolution Quality      0.15             0.15              0.15
Data Accessibility      0.15             0.20              0.10
User Experience/Trust   0.10             0.05              0.15
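
To make the scoring pipeline concrete, the sketch below applies min-max normalization, dimension averaging, and the weighted composite to two dimensions of made-up raw values. The platform names, metric values, and metric choices in the example data are placeholders for illustration, not measured results.

"""
Platform Quality Index: Scoring Sketch.

Illustrates min-max normalization, dimension averaging, and the weighted
composite PQI. All raw values below are hypothetical placeholders.
"""


def min_max_scores(raw: dict, higher_is_better: bool = True) -> dict:
    """Normalize raw metric values to a 0-100 scale across platforms."""
    lo, hi = min(raw.values()), max(raw.values())
    if hi == lo:
        # No spread between platforms: give everyone the midpoint.
        return {platform: 50.0 for platform in raw}
    scores = {p: (v - lo) / (hi - lo) * 100 for p, v in raw.items()}
    if not higher_is_better:
        # Invert for metrics where lower raw values are better.
        scores = {p: 100 - s for p, s in scores.items()}
    return scores


# Hypothetical raw metrics: dimension -> list of (platform -> value, higher_is_better).
RAW_METRICS = {
    "Market Breadth": [
        ({"Platform A": 900, "Platform B": 700, "Platform C": 3000, "Platform D": 5000}, True),
    ],
    "Forecast Accuracy": [
        # Mean absolute calibration error: lower is better.
        ({"Platform A": 0.04, "Platform B": 0.05, "Platform C": 0.03, "Platform D": 0.06}, False),
    ],
}

# Subset of the default weights; renormalized inside composite_pqi().
WEIGHTS = {"Market Breadth": 0.15, "Forecast Accuracy": 0.25}


def dimension_scores(raw_metrics: dict) -> dict:
    """Average the normalized metric scores within each dimension."""
    dims = {}
    for dim, metrics in raw_metrics.items():
        per_platform = {}
        for raw, higher in metrics:
            for platform, score in min_max_scores(raw, higher).items():
                per_platform.setdefault(platform, []).append(score)
        dims[dim] = {p: sum(s) / len(s) for p, s in per_platform.items()}
    return dims


def composite_pqi(dims: dict, weights: dict) -> dict:
    """Weighted average of dimension scores (weights renormalized to sum to 1)."""
    total_w = sum(weights.values())
    platforms = next(iter(dims.values())).keys()
    return {
        p: sum(weights[d] * dims[d][p] for d in weights) / total_w
        for p in platforms
    }


if __name__ == "__main__":
    dims = dimension_scores(RAW_METRICS)
    for dim, scores in dims.items():
        print(dim, {p: round(s, 1) for p, s in scores.items()})
    print("PQI:", {p: round(s, 1) for p, s in composite_pqi(dims, WEIGHTS).items()})
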

Part 3: Data Collection Exercise

Question 3.1: Market Breadth Data

Using each platform's API, collect the following data:

"""
Platform Quality Index — Market Breadth Data Collection

Collects market count and category data from each platform.
"""

import httpx
from collections import Counter


def count_polymarket_markets() -> dict:
    """Count active markets on Polymarket."""
    url = "https://gamma-api.polymarket.com/markets"

    all_markets = []
    offset = 0
    limit = 100

    while True:
        params = {
            "limit": limit,
            "offset": offset,
            "active": True,
            "closed": False,
        }

        try:
            response = httpx.get(url, params=params, timeout=30.0)
            response.raise_for_status()
            batch = response.json()
        except Exception as e:
            print(f"  Error at offset {offset}: {e}")
            break

        if not batch:
            break

        all_markets.extend(batch)
        offset += limit

        if len(batch) < limit:
            break

    # Categorize
    categories = Counter()
    for m in all_markets:
        cat = m.get("category", "uncategorized") or "uncategorized"
        categories[cat] += 1

    return {
        "total_active": len(all_markets),
        "categories": dict(categories),
        "num_categories": len(categories),
    }


def count_manifold_markets() -> dict:
    """Count active markets on Manifold."""
    url = "https://api.manifold.markets/v0/search-markets"

    # The search endpoint returns at most 1000 markets per request, so this
    # samples the most recently created markets rather than exhaustively
    # counting every active market.
    params = {"limit": 1000, "sort": "created-time", "order": "desc"}

    try:
        response = httpx.get(url, params=params, timeout=30.0)
        response.raise_for_status()
        markets = response.json()
    except Exception as e:
        print(f"  Error: {e}")
        return {"total_active": 0, "categories": {}, "num_categories": 0}

    # Count by group/topic
    categories = Counter()
    for m in markets:
        groups = m.get("groupSlugs", [])
        if groups:
            categories[groups[0]] += 1
        else:
            categories["uncategorized"] += 1

    return {
        "total_active": len(markets),
        "categories": dict(categories.most_common(20)),
        "num_categories": len(categories),
    }


def count_metaculus_questions() -> dict:
    """Count open questions on Metaculus."""
    url = "https://www.metaculus.com/api2/questions/"

    params = {
        "limit": 100,
        "status": "open",
        "order_by": "-activity",
    }

    try:
        response = httpx.get(url, params=params, timeout=30.0)
        response.raise_for_status()
        data = response.json()
    except Exception as e:
        print(f"  Error: {e}")
        return {"total_active": 0, "categories": {}, "num_categories": 0}

    total = data.get("count", len(data.get("results", [])))
    questions = data.get("results", [])

    categories = Counter()
    for q in questions:
        # Metaculus uses tags/categories
        cats = q.get("categories", [])
        if cats:
            for c in cats:
                if isinstance(c, dict):
                    categories[c.get("name", "uncategorized")] += 1
                elif isinstance(c, str):
                    categories[c] += 1
        else:
            categories["uncategorized"] += 1

    return {
        "total_active": total,
        "categories": dict(categories.most_common(20)),
        "num_categories": len(categories),
    }


if __name__ == "__main__":
    print("Collecting Market Breadth Data...\n")

    print("Polymarket:")
    poly = count_polymarket_markets()
    print(f"  Active Markets: {poly['total_active']}")
    print(f"  Categories: {poly['num_categories']}")
    for cat, count in sorted(
        poly['categories'].items(), key=lambda x: -x[1]
    )[:10]:
        print(f"    {cat}: {count}")
    print()

    print("Manifold:")
    mani = count_manifold_markets()
    print(f"  Active Markets: {mani['total_active']}")
    print(f"  Categories: {mani['num_categories']}")
    for cat, count in sorted(
        mani['categories'].items(), key=lambda x: -x[1]
    )[:10]:
        print(f"    {cat}: {count}")
    print()

    print("Metaculus:")
    meta = count_metaculus_questions()
    print(f"  Open Questions: {meta['total_active']}")
    print(f"  Categories: {meta['num_categories']}")
    for cat, count in sorted(
        meta['categories'].items(), key=lambda x: -x[1]
    )[:10]:
        print(f"    {cat}: {count}")

Run this script and record the results.

Question 3.2: Liquidity Analysis

For Polymarket markets, calculate the following metrics:

"""
Platform Quality Index — Liquidity Analysis

Analyzes liquidity metrics for Polymarket markets.
"""

import httpx
import statistics


def analyze_polymarket_liquidity(num_markets: int = 50) -> dict:
    """
    Analyze liquidity across Polymarket markets.

    Returns:
        Dictionary of liquidity metrics.
    """
    url = "https://gamma-api.polymarket.com/markets"
    params = {
        "limit": num_markets,
        "active": True,
        "closed": False,
    }

    response = httpx.get(url, params=params, timeout=30.0)
    response.raise_for_status()
    markets = response.json()

    volumes = []
    liquidities = []

    for m in markets:
        try:
            vol = float(m.get("volume", 0))
            liq = float(m.get("liquidity", 0))
            volumes.append(vol)
            liquidities.append(liq)
        except (ValueError, TypeError):
            continue

    if not volumes:
        return {}

    return {
        "num_markets_analyzed": len(volumes),
        "total_volume": sum(volumes),
        "median_volume": statistics.median(volumes),
        "mean_volume": statistics.mean(volumes),
        "max_volume": max(volumes),
        "min_volume": min(volumes),
        "total_liquidity": sum(liquidities),
        "median_liquidity": statistics.median(liquidities),
        "pct_above_10k_volume": sum(
            1 for v in volumes if v > 10000
        ) / len(volumes) * 100,
        "pct_above_100k_volume": sum(
            1 for v in volumes if v > 100000
        ) / len(volumes) * 100,
    }


if __name__ == "__main__":
    print("Analyzing Polymarket Liquidity...\n")
    metrics = analyze_polymarket_liquidity(50)

    for key, value in metrics.items():
        if isinstance(value, float):
            print(f"  {key}: {value:,.2f}")
        else:
            print(f"  {key}: {value}")

Question 3.3: Calibration Assessment

To assess calibration, we need resolved markets. Manifold Markets provides the easiest access to historical resolution data:

"""
Platform Quality Index — Calibration Assessment

Assesses forecast calibration using resolved Manifold markets.
"""

import httpx
import statistics
from collections import defaultdict


def assess_manifold_calibration(num_markets: int = 500) -> dict:
    """
    Assess calibration of Manifold Markets by examining
    resolved binary markets.

    Returns:
        Calibration metrics dictionary.
    """
    url = "https://api.manifold.markets/v0/search-markets"
    params = {
        "limit": min(num_markets, 1000),
        "sort": "close-date",
        "filter": "resolved",
    }

    response = httpx.get(url, params=params, timeout=60.0)
    response.raise_for_status()
    markets = response.json()

    # Group by probability bucket
    buckets = defaultdict(list)
    bucket_size = 0.10  # 10% buckets

    valid_count = 0

    for m in markets:
        # Only binary markets
        if m.get("outcomeType") != "BINARY":
            continue

        prob = m.get("probability")
        resolution = m.get("resolution")

        if prob is None or resolution is None:
            continue

        # Convert resolution to numeric
        if resolution == "YES":
            outcome = 1.0
        elif resolution == "NO":
            outcome = 0.0
        else:
            continue  # Skip CANCEL, N/A, etc.

        # Prefer a recorded close-time probability if such a field is present;
        # otherwise fall back to the market's final probability.
        close_prob = m.get("closeProb", prob)

        # Assign to bucket
        bucket = min(int(close_prob / bucket_size), 9)
        bucket_label = f"{bucket * 10}-{(bucket + 1) * 10}%"
        buckets[bucket_label].append(outcome)
        valid_count += 1

    # Calculate calibration
    calibration = {}
    calibration_errors = []

    for bucket_label in sorted(buckets.keys()):
        outcomes = buckets[bucket_label]
        actual_rate = statistics.mean(outcomes)
        # Extract predicted midpoint from label
        low = int(bucket_label.split("-")[0]) / 100
        high = int(bucket_label.split("-")[1].rstrip("%")) / 100
        predicted_mid = (low + high) / 2

        error = abs(actual_rate - predicted_mid)
        calibration_errors.append(error)

        calibration[bucket_label] = {
            "count": len(outcomes),
            "actual_yes_rate": actual_rate,
            "predicted_midpoint": predicted_mid,
            "error": error,
        }

    mean_calibration_error = (
        statistics.mean(calibration_errors) if calibration_errors else None
    )

    return {
        "total_resolved_analyzed": valid_count,
        "buckets": calibration,
        "mean_absolute_calibration_error": mean_calibration_error,
    }


if __name__ == "__main__":
    print("Assessing Manifold Markets Calibration...\n")
    result = assess_manifold_calibration(500)

    print(f"Markets analyzed: {result['total_resolved_analyzed']}\n")
    print(f"{'Bucket':<12} {'Count':>6} {'Actual':>8} {'Predicted':>10} {'Error':>8}")
    print("-" * 48)

    for bucket, data in sorted(result['buckets'].items()):
        print(
            f"{bucket:<12} {data['count']:>6} "
            f"{data['actual_yes_rate']:>8.1%} "
            f"{data['predicted_midpoint']:>10.1%} "
            f"{data['error']:>8.1%}"
        )

    mace = result['mean_absolute_calibration_error']
    if mace is not None:
        print(f"\nMean Absolute Calibration Error: {mace:.3f}")

Part 4: Computing the Platform Quality Index

Question 4.1: Gather Raw Data

Using the scripts above (and manual research where API data is unavailable), fill in the following raw data table. Some cells will need to be estimated or marked as "N/A":

Metric                            Polymarket   Kalshi    Metaculus   Manifold
-------------------------------   ----------   -------   ---------   ---------
Active markets                    ?            ?         ?           ?
Number of categories              ?            ?         ?           ?
Median volume per market (USD)    ?            ?         N/A         ? (Mana)
Pct markets > $10K volume         ?            ?         N/A         N/A
Mean calibration error            ?            ?         ?           ?
Avg resolution time (hours)       ?            ?         ?           ?
Disputed resolutions (%)          ?            ?         ?           ?
API rate limit (req/min)          ~60          ~600      ~30         ~100
Historical data available         Yes          Yes       Yes         Yes
Open source                       Partial      No        Partial     Yes
Regulatory score (0-3)            1            3         1           1
Withdrawal speed (hours)          ~1           ~48       N/A         N/A

Question 4.2: Normalize and Score

Apply min-max normalization to each metric. For metrics where higher is better (market count, volume, API rate limit), use standard normalization. For metrics where lower is better (calibration error, resolution time), use inverted normalization.

Compute dimension scores and the composite PQI using the default weights.

Question 4.3: Sensitivity Analysis

Recalculate the PQI using:

  1. Research weights (emphasize accuracy and data access)
  2. Trading weights (emphasize liquidity and trust)
  3. Equal weights (all dimensions weighted equally)

Does the ranking of platforms change? Which platform is most robust to weight changes?
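
One way to run this check, once dimension scores are in hand, is to recompute the composite under each weight profile and compare the resulting rankings. In the sketch below, the weight profiles mirror the table in Part 2, but the dimension scores are arbitrary placeholders; substitute the values computed in Question 4.2.

"""
Weight-sensitivity check (sketch).

The weight profiles mirror the table in Part 2; the dimension scores are
arbitrary placeholders to be replaced with the Question 4.2 results.
"""

DIMENSIONS = (
    "Market Breadth", "Liquidity Depth", "Forecast Accuracy",
    "Resolution Quality", "Data Accessibility", "User Experience/Trust",
)

WEIGHT_PROFILES = {
    "default": dict(zip(DIMENSIONS, (0.15, 0.20, 0.25, 0.15, 0.15, 0.10))),
    "research": dict(zip(DIMENSIONS, (0.20, 0.10, 0.30, 0.15, 0.20, 0.05))),
    "trading": dict(zip(DIMENSIONS, (0.10, 0.30, 0.20, 0.15, 0.10, 0.15))),
    "equal": {d: 1 / 6 for d in DIMENSIONS},
}

# Placeholder dimension scores (0-100) per platform; replace with real results.
DIMENSION_SCORES = {
    "Platform A": dict(zip(DIMENSIONS, (70, 95, 80, 75, 70, 60))),
    "Platform B": dict(zip(DIMENSIONS, (55, 75, 78, 85, 80, 90))),
    "Platform C": dict(zip(DIMENSIONS, (80, 10, 90, 70, 75, 70))),
    "Platform D": dict(zip(DIMENSIONS, (95, 30, 70, 60, 90, 50))),
}


def rank(weights: dict, scores: dict) -> list:
    """Rank platforms by their weighted composite under one weight profile."""
    pqi = {p: sum(weights[d] * s[d] for d in DIMENSIONS) for p, s in scores.items()}
    return sorted(pqi.items(), key=lambda kv: -kv[1])


if __name__ == "__main__":
    for name, weights in WEIGHT_PROFILES.items():
        ranking = ", ".join(f"{p} ({score:.1f})" for p, score in rank(weights, DIMENSION_SCORES))
        print(f"{name:>8}: {ranking}")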


Part 5: Interpreting Results

Question 5.1: Comparative Analysis

Based on your computed PQI scores, write a comparative analysis addressing:

  1. Which platform scores highest overall? Is this the platform you expected?
  2. Which platform has the most uneven profile (high in some dimensions, low in others)?
  3. What is the single biggest weakness of each platform?
  4. If you could combine features from different platforms, what would the "ideal" prediction market look like?

Question 5.2: Limitations of the Framework

Discuss the limitations of this quantitative approach:

  1. What important qualities are not captured by our metrics? (e.g., community culture, innovation speed, alignment with user values)
  2. How stable are these scores over time? Would the rankings change significantly if measured six months later?
  3. Does the framework unfairly favor certain types of platforms? (e.g., real-money platforms score higher on liquidity, play-money platforms score higher on breadth)
  4. How should we handle N/A values? When a metric is not applicable (e.g., volume for Metaculus), how should it be scored?

Question 5.3: Actionable Recommendations

Based on your analysis, provide specific recommendations for the following stakeholders:

  1. A hedge fund wanting to use prediction market data as an input signal.
  2. A journalist wanting to cite prediction market probabilities in articles.
  3. A government agency wanting to use forecasting for policy planning.
  4. A startup wanting to build a prediction market aggregator product.

For each, recommend: which platform(s) to use, what data to focus on, and what caveats to keep in mind.


Part 6: Building an Automated Quality Monitor

Question 6.1: Monitoring Script

Design a Python script that runs daily and tracks platform quality metrics over time. The script should:

  1. Fetch current market counts, volume data, and (where available) recent resolution data from each platform.
  2. Store the results in a local CSV or SQLite database.
  3. Compute rolling averages for each metric.
  4. Flag significant changes (e.g., "Polymarket active market count dropped by 20% this week").

Outline the architecture and implement the core data collection loop.

"""
Platform Quality Monitor — Daily Collection Script (Outline)

This script collects platform quality metrics daily and stores them.
"""

import sqlite3
import statistics
import time
from datetime import datetime, timezone

import httpx


DB_PATH = "platform_quality.db"


def init_db():
    """Create the database table if it does not exist."""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS daily_metrics (
            date TEXT NOT NULL,
            platform TEXT NOT NULL,
            active_markets INTEGER,
            total_volume REAL,
            median_volume REAL,
            api_latency_ms REAL,
            PRIMARY KEY (date, platform)
        )
    """)
    conn.commit()
    conn.close()


def measure_api_latency(url: str, params: dict = None) -> float:
    """
    Measure API response time in milliseconds.

    Args:
        url: The API endpoint to test.
        params: Query parameters.

    Returns:
        Response time in milliseconds.
    """
    start = time.perf_counter()
    try:
        response = httpx.get(url, params=params or {}, timeout=30.0)
        response.raise_for_status()
    except Exception:
        return -1.0
    end = time.perf_counter()

    return (end - start) * 1000


def collect_polymarket_metrics() -> dict:
    """Collect daily metrics from Polymarket."""
    url = "https://gamma-api.polymarket.com/markets"

    latency = measure_api_latency(url, {"limit": 1})

    # Fetch a larger batch for stats
    try:
        response = httpx.get(
            url,
            params={"limit": 100, "active": True, "closed": False},
            timeout=30.0,
        )
        response.raise_for_status()
        markets = response.json()
    except Exception:
        markets = []

    volumes = []
    for m in markets:
        try:
            volumes.append(float(m.get("volume", 0)))
        except (ValueError, TypeError):
            pass

    return {
        "platform": "Polymarket",
        "active_markets": len(markets),
        "total_volume": sum(volumes) if volumes else 0,
        "median_volume": statistics.median(volumes) if volumes else 0,
        "api_latency_ms": latency,
    }


def collect_manifold_metrics() -> dict:
    """Collect daily metrics from Manifold."""
    url = "https://api.manifold.markets/v0/search-markets"

    latency = measure_api_latency(url, {"limit": 1})

    try:
        response = httpx.get(
            url,
            params={"limit": 100, "sort": "liquidity", "order": "desc"},
            timeout=30.0,
        )
        response.raise_for_status()
        markets = response.json()
    except Exception:
        markets = []

    volumes = []
    for m in markets:
        try:
            volumes.append(float(m.get("volume", 0)))
        except (ValueError, TypeError):
            pass

    return {
        "platform": "Manifold",
        "active_markets": len(markets),
        "total_volume": sum(volumes) if volumes else 0,
        "median_volume": statistics.median(volumes) if volumes else 0,
        "api_latency_ms": latency,
    }


def store_metrics(metrics: dict):
    """Store a metrics record in the database."""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    cursor.execute(
        """
        INSERT OR REPLACE INTO daily_metrics
        (date, platform, active_markets, total_volume, median_volume, api_latency_ms)
        VALUES (?, ?, ?, ?, ?, ?)
        """,
        (
            today,
            metrics["platform"],
            metrics["active_markets"],
            metrics["total_volume"],
            metrics["median_volume"],
            metrics["api_latency_ms"],
        ),
    )

    conn.commit()
    conn.close()


def run_daily_collection():
    """Run the daily metrics collection."""
    init_db()

    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    print(f"Collecting metrics for {today}...\n")

    collectors = [
        collect_polymarket_metrics,
        collect_manifold_metrics,
    ]

    for collector in collectors:
        try:
            metrics = collector()
            store_metrics(metrics)
            print(f"  [{metrics['platform']}]")
            print(f"    Active markets: {metrics['active_markets']}")
            print(f"    Total volume: ${metrics['total_volume']:,.0f}")
            print(f"    Median volume: ${metrics['median_volume']:,.0f}")
            print(f"    API latency: {metrics['api_latency_ms']:.0f}ms")
            print()
        except Exception as e:
            print(f"  Error with collector: {e}")

    print("Done.")


if __name__ == "__main__":
    run_daily_collection()
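
The outline above covers collection and storage (items 1 and 2). Items 3 and 4, rolling averages and change flags, can be layered on top of the same daily_metrics table; the sketch below does this for active market counts, with an arbitrary 20% threshold and 7-day window.

"""
Rolling averages and change flags (sketch).

Reads the daily_metrics table written by the collection script above.
The 20% threshold and 7-day window are arbitrary example choices.
"""

import sqlite3

DB_PATH = "platform_quality.db"


def weekly_change_flags(threshold: float = 0.20) -> list:
    """Flag platforms whose latest active-market count deviates from their
    trailing 7-day average by more than `threshold`."""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    cursor.execute(
        """
        SELECT platform, AVG(active_markets), MAX(date)
        FROM daily_metrics
        WHERE date >= date('now', '-7 days')
        GROUP BY platform
        """
    )
    rows = cursor.fetchall()

    flags = []
    for platform, avg_7d, latest_date in rows:
        cursor.execute(
            "SELECT active_markets FROM daily_metrics WHERE platform = ? AND date = ?",
            (platform, latest_date),
        )
        row = cursor.fetchone()
        if not row or not avg_7d:
            continue
        latest = row[0]
        change = (latest - avg_7d) / avg_7d
        if abs(change) >= threshold:
            flags.append(
                f"{platform}: active markets {latest} vs 7-day avg {avg_7d:.0f} ({change:+.0%})"
            )

    conn.close()
    return flags


if __name__ == "__main__":
    alerts = weekly_change_flags()
    if alerts:
        for alert in alerts:
            print("ALERT:", alert)
    else:
        print("No significant changes detected.")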

Key Takeaways

  1. Platform quality is multidimensional: No single metric captures overall quality. A framework that considers breadth, liquidity, accuracy, resolution, accessibility, and trust provides a more complete picture.

  2. The "best" platform depends on context: Different use cases (trading, research, forecasting practice) weight quality dimensions differently.

  3. Calibration is the ultimate test: Regardless of other qualities, a platform that produces well-calibrated probabilities is the most valuable for decision-making.

  4. Data accessibility matters: Platforms with good APIs and open data enable a rich ecosystem of tools, research, and applications built on top of them.

  5. Quality changes over time: The prediction market landscape is evolving rapidly. Any quality assessment is a snapshot — continuous monitoring is essential.

  6. Combining platforms is often optimal: Using data from multiple platforms (and understanding each one's biases and strengths) typically produces better-informed views than relying on any single platform.


End of Case Study 2