Case Study 2: Scraping Historical Data for Backtesting

Overview

Backtesting a prediction market trading strategy requires historical price data --- ideally, a complete time series of prices from market creation to resolution for many markets. However, no single API provides this data comprehensively. In this case study, you will collect historical price data through a combination of API access and web scraping, clean and normalize it across platforms, store it in a unified format, and validate its completeness.

This is a realistic data engineering challenge: the data you need exists but is fragmented across multiple sources, each with different formats, coverage, and limitations.

Objectives

  1. Identify available sources of historical prediction market data
  2. Collect historical price time series from multiple platforms
  3. Clean and normalize the data into a unified backtesting format
  4. Store the data efficiently for time-series queries
  5. Validate completeness and identify gaps
  6. Build a backtesting-ready dataset

Part 1: Data Source Inventory

Available Historical Data Sources

Before writing any code, catalog what historical data is available from each platform:

Manifold Markets API:

  • The /bets endpoint returns individual bet records with timestamps
  • Each bet record includes probBefore and probAfter --- the probability before and after the bet
  • This allows reconstruction of the full probability time series
  • Limitation: Large markets may have thousands of bets

Polymarket CLOB API:

  • The /prices-history endpoint returns historical price data
  • Supports intervals: 1d, 1w, 1m, 3m, 1y, all
  • Returns {timestamp, price} pairs at configurable fidelity
  • Limitation: Limited to active and recently resolved markets

Metaculus API:

  • Question detail includes the community prediction history
  • Prediction values at various time points
  • Limitation: Granularity varies by question activity

Web Archives (Wayback Machine):

  • Historical snapshots of prediction market pages
  • Useful for platforms that no longer exist or for data not in APIs
  • Limitation: Irregular snapshot frequency, requires HTML parsing

Task 1.1: Source Assessment Matrix

Create a matrix that rates each source on the following dimensions (a minimal skeleton is sketched below):

  • Coverage: What fraction of historical markets can be accessed?
  • Granularity: What is the finest time resolution available?
  • Reliability: How stable and consistent is the data?
  • Access difficulty: How hard is it to extract the data?
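
One way to record the matrix is a plain Python dictionary kept alongside your collection code. The ratings below are placeholders, not conclusions; replace them with what you actually find in Parts 2 and 3.

# Skeleton for the source assessment matrix. The entries are placeholders
# to be replaced with your own findings during data collection.
SOURCE_ASSESSMENT = {
    "manifold_bets": {
        "coverage": "TODO: fraction of historical markets reachable",
        "granularity": "per bet (irregular)",
        "reliability": "TODO",
        "access_difficulty": "low (REST API, pagination required)",
    },
    "polymarket_prices_history": {
        "coverage": "TODO",
        "granularity": "configurable fidelity",
        "reliability": "TODO",
        "access_difficulty": "medium (token IDs must come from the Gamma API)",
    },
    "metaculus_api": {
        "coverage": "TODO",
        "granularity": "varies with question activity",
        "reliability": "TODO",
        "access_difficulty": "low",
    },
    "wayback_machine": {
        "coverage": "only pages that were archived",
        "granularity": "irregular snapshot frequency",
        "reliability": "low (HTML structure changes over time)",
        "access_difficulty": "high (scraping, parsing, strict rate limits)",
    },
}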

Part 2: Collecting Historical Data from APIs

Manifold Historical Reconstruction

Manifold's bet history allows us to reconstruct the full probability time series:

import json
import re
import sqlite3
import time
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import List, Optional

import requests
from bs4 import BeautifulSoup

# ManifoldClient, PolymarketGammaClient, and PolymarketCLOBClient are the
# API client wrappers assumed to be available from your earlier work.

class ManifoldHistoryCollector:
    """
    Collect and reconstruct historical probability time series
    from Manifold Markets bet data.
    """

    def __init__(self, api_key: Optional[str] = None):
        self.client = ManifoldClient(api_key=api_key)

    def get_market_history(self, market_id: str) -> List[dict]:
        """
        Reconstruct the probability time series for a market
        from its bet history.

        Parameters
        ----------
        market_id : str
            Manifold market ID.

        Returns
        -------
        list
            Time-ordered list of {timestamp, probability} dicts.
        """
        all_bets = []
        before = None

        # Fetch all bets for this market
        while True:
            bets = self.client.get_bets(
                market_id=market_id,
                limit=1000,
                before=before
            )

            if not bets:
                break

            all_bets.extend(bets)
            before = bets[-1].get("id")

            if len(bets) < 1000:
                break

            time.sleep(0.2)

        if not all_bets:
            return []

        # Sort by timestamp
        all_bets.sort(key=lambda b: b.get("createdTime", 0))

        # Reconstruct probability time series
        history = []
        for bet in all_bets:
            if "probAfter" in bet and "createdTime" in bet:
                history.append({
                    "timestamp": datetime.fromtimestamp(
                        bet["createdTime"] / 1000, tz=timezone.utc
                    ).isoformat(),
                    "probability": bet["probAfter"],
                    "bet_amount": bet.get("amount", 0),
                    "outcome": bet.get("outcome", ""),
                    "source": "manifold_bets"
                })

        return history

    def collect_resolved_markets_history(
        self,
        max_markets: int = 100,
        min_bets: int = 10
    ) -> dict:
        """
        Collect historical data for resolved binary markets.

        Parameters
        ----------
        max_markets : int
            Maximum number of markets to collect.
        min_bets : int
            Minimum number of bets for a market to be included.

        Returns
        -------
        dict
            Mapping of market_id to {metadata, history}.
        """
        # Fetch resolved binary markets
        resolved = self.client.get_all_resolved_markets(
            max_markets=max_markets * 2  # Fetch extra to filter
        )

        dataset = {}
        collected = 0

        for market in resolved:
            if collected >= max_markets:
                break

            market_id = market.get("id")
            if not market_id:
                continue

            # Get history
            history = self.get_market_history(market_id)

            if len(history) < min_bets:
                continue

            dataset[market_id] = {
                "metadata": {
                    "question": market.get("question"),
                    "resolution": market.get("resolution"),
                    "created_at": datetime.fromtimestamp(
                        market["createdTime"] / 1000, tz=timezone.utc
                    ).isoformat() if market.get("createdTime") else None,
                    "resolved_at": datetime.fromtimestamp(
                        market["resolutionTime"] / 1000, tz=timezone.utc
                    ).isoformat() if market.get("resolutionTime") else None,
                    "total_volume": market.get("volume"),
                    "unique_bettors": market.get("uniqueBettorCount"),
                    "platform": "manifold"
                },
                "history": history
            }

            collected += 1
            if collected % 10 == 0:
                print(f"Collected {collected}/{max_markets} markets")

            time.sleep(0.3)

        return dataset
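
A short usage sketch, assuming the ManifoldClient wrapper used by the collector is available in your environment:

collector = ManifoldHistoryCollector()
manifold_dataset = collector.collect_resolved_markets_history(
    max_markets=100, min_bets=10
)

# Persist the raw collection so later steps do not have to re-fetch it
with open("manifold_history_raw.json", "w") as f:
    json.dump(manifold_dataset, f)

print(f"Collected {len(manifold_dataset)} Manifold markets")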

Polymarket Historical Data

class PolymarketHistoryCollector:
    """
    Collect historical price data from Polymarket.
    """

    def __init__(self):
        self.gamma_client = PolymarketGammaClient()
        self.clob_client = PolymarketCLOBClient()

    def get_market_history(
        self,
        condition_id: str,
        token_id: str,
        interval: str = "all"
    ) -> List[dict]:
        """
        Fetch historical prices for a Polymarket token.

        Parameters
        ----------
        condition_id : str
            Market condition ID (from Gamma API).
        token_id : str
            Token ID for the specific outcome.
        interval : str
            Time range ('1d', '1w', '1m', '3m', '1y', 'all').

        Returns
        -------
        list
            Time-ordered list of {timestamp, probability} dicts.
        """
        try:
            raw_history = self.clob_client.get_prices_history(
                token_id=token_id,
                interval=interval,
                fidelity=500  # resolution parameter (the public CLOB API takes this in minutes)
            )
        except Exception as e:
            print(f"Failed to fetch history for {token_id}: {e}")
            return []

        history = []
        for point in raw_history:
            try:
                ts = point.get("t", 0)
                price = float(point.get("p", 0))

                if isinstance(ts, (int, float)):
                    timestamp = datetime.fromtimestamp(
                        ts, tz=timezone.utc
                    ).isoformat()
                else:
                    timestamp = str(ts)

                history.append({
                    "timestamp": timestamp,
                    "probability": price,
                    "source": "polymarket_clob"
                })
            except (ValueError, TypeError):
                continue

        return sorted(history, key=lambda x: x["timestamp"])

    def collect_market_histories(
        self,
        max_markets: int = 50
    ) -> dict:
        """
        Collect historical data for Polymarket markets.
        """
        # Fetch markets (including closed ones)
        markets = self.gamma_client.get_markets(
            active=True, closed=True, limit=100
        )

        dataset = {}
        collected = 0

        for market in markets:
            if collected >= max_markets:
                break

            condition_id = market.get("conditionId") or market.get("condition_id")
            if not condition_id:
                continue

            # Get token IDs
            tokens = market.get("tokens", [])
            if not tokens:
                # Try extracting from clobTokenIds
                clob_token_ids = market.get("clobTokenIds")
                if clob_token_ids:
                    try:
                        if isinstance(clob_token_ids, str):
                            tokens = json.loads(clob_token_ids)
                        else:
                            tokens = clob_token_ids
                    except (json.JSONDecodeError, TypeError):
                        continue

            if not tokens:
                continue

            # Get the YES token (typically the first one)
            yes_token_id = tokens[0] if isinstance(tokens[0], str) \
                else tokens[0].get("token_id", "")

            if not yes_token_id:
                continue

            history = self.get_market_history(
                condition_id, yes_token_id, interval="all"
            )

            if len(history) < 5:
                continue

            end_date = market.get("endDate") or market.get("end_date_iso")
            question = market.get("question", "")

            dataset[condition_id] = {
                "metadata": {
                    "question": question,
                    "resolution": market.get("resolution"),
                    "created_at": market.get("startDate") or market.get("created_at"),
                    "resolved_at": end_date if market.get("resolved") else None,
                    "total_volume": market.get("volume"),
                    "platform": "polymarket"
                },
                "history": history
            }

            collected += 1
            time.sleep(0.5)

        return dataset

Task 2.1

Implement a similar history collector for Metaculus using their API. Extract the community prediction time series from the question detail endpoint.
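
A starting skeleton for such a collector is sketched below; it is not a complete solution. The response fields it accesses (community_prediction, history, t, x2) are assumptions based on older Metaculus API responses and must be verified against a live /api2/questions/{id}/ payload before use.

class MetaculusHistoryCollector:
    """
    Starting skeleton for Task 2.1 -- not a complete solution.

    The response fields accessed below ("community_prediction", "history",
    "t", "x2") are assumptions based on older Metaculus API responses;
    inspect a live question payload and adjust the extraction accordingly.
    """

    BASE_URL = "https://www.metaculus.com/api2"

    def __init__(self):
        self.session = requests.Session()

    def get_question_history(self, question_id: int) -> List[dict]:
        """Fetch a question and extract its community prediction history."""
        response = self.session.get(
            f"{self.BASE_URL}/questions/{question_id}/", timeout=30
        )
        response.raise_for_status()
        question = response.json()

        history = []
        raw = question.get("community_prediction") or {}
        for point in raw.get("history", []):
            ts = point.get("t")
            # Placeholder extraction -- the probability may live under a
            # different key depending on question type and API version.
            dist = point.get("x2") or point.get("x1") or {}
            prob = dist.get("avg") if isinstance(dist, dict) else None
            if ts is None or prob is None:
                continue
            history.append({
                "timestamp": datetime.fromtimestamp(
                    ts, tz=timezone.utc
                ).isoformat(),
                "probability": prob,
                "source": "metaculus_api",
            })
        return history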

Part 3: Web Scraping for Missing Data

Some historical data is only available through web scraping. For this case study, we will scrape historical snapshots from the Wayback Machine.

class WaybackHistoryScraper:
    """
    Scrape historical prediction market snapshots from the
    Internet Archive's Wayback Machine.

    This provides historical data for platforms that do not
    offer historical API endpoints.
    """

    WAYBACK_API = "https://web.archive.org/web"
    CDX_API = "https://web.archive.org/cdx/search/cdx"

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "PredictionMarketResearch/1.0 "
                          "(academic research project)"
        })

    def get_available_snapshots(
        self,
        url: str,
        from_date: str = None,
        to_date: str = None,
        limit: int = 100
    ) -> List[dict]:
        """
        Query the CDX API to find available snapshots of a URL.

        Parameters
        ----------
        url : str
            The URL to look up.
        from_date : str
            Start date (YYYYMMDD format).
        to_date : str
            End date (YYYYMMDD format).
        limit : int
            Maximum number of snapshots.

        Returns
        -------
        list
            List of snapshot records with timestamps and status codes.
        """
        params = {
            "url": url,
            "output": "json",
            "limit": limit,
            "fl": "timestamp,statuscode,digest,length"
        }
        if from_date:
            params["from"] = from_date
        if to_date:
            params["to"] = to_date

        response = self.session.get(self.CDX_API, params=params)
        response.raise_for_status()
        data = response.json()

        if len(data) < 2:
            return []

        headers = data[0]
        snapshots = []
        for row in data[1:]:
            record = dict(zip(headers, row))
            if record.get("statuscode") == "200":
                snapshots.append({
                    "timestamp": record["timestamp"],
                    "datetime": datetime.strptime(
                        record["timestamp"], "%Y%m%d%H%M%S"
                    ).replace(tzinfo=timezone.utc),
                    "url": (
                        f"https://web.archive.org/web/"
                        f"{record['timestamp']}/{url}"
                    )
                })

        return snapshots

    def fetch_snapshot(self, wayback_url: str) -> Optional[BeautifulSoup]:
        """
        Fetch and parse a Wayback Machine snapshot.

        Parameters
        ----------
        wayback_url : str
            Full Wayback Machine URL.

        Returns
        -------
        BeautifulSoup or None
            Parsed HTML, or None if fetch fails.
        """
        time.sleep(2)  # Be respectful of the Internet Archive

        try:
            response = self.session.get(wayback_url, timeout=30)
            if response.status_code == 200:
                return BeautifulSoup(response.text, "html.parser")
        except Exception as e:
            print(f"Failed to fetch {wayback_url}: {e}")

        return None

    def extract_prices_from_snapshot(
        self,
        soup: BeautifulSoup,
        platform: str
    ) -> dict:
        """
        Extract market prices from a parsed Wayback snapshot.

        Platform-specific extraction logic is needed because
        each platform structures its HTML differently.

        Parameters
        ----------
        soup : BeautifulSoup
            Parsed HTML snapshot.
        platform : str
            Platform name for platform-specific parsing.

        Returns
        -------
        dict
            Extracted market data.
        """
        # This is highly platform-specific and fragile
        # Each platform's HTML structure must be analyzed individually

        if platform == "predictit":
            return self._extract_predictit(soup)
        elif platform == "metaculus":
            return self._extract_metaculus_html(soup)
        else:
            return {}

    def _extract_predictit(self, soup: BeautifulSoup) -> dict:
        """
        Extract market data from a PredictIt page snapshot.

        PredictIt's HTML typically has market cards with
        class-based selectors for prices and market names.
        """
        markets = {}

        # PredictIt market containers (structure may vary by archive date)
        containers = soup.select(".market-card, .market-row, [class*='market']")

        for container in containers:
            name_elem = container.select_one(
                ".market-name, .market-title, h3, h4"
            )
            price_elem = container.select_one(
                ".market-price, .price, [class*='price']"
            )

            if name_elem and price_elem:
                name = name_elem.get_text(strip=True)
                price_text = price_elem.get_text(strip=True)

                price_match = re.search(r"(\d+)", price_text)
                if price_match:
                    price_cents = int(price_match.group(1))
                    markets[name] = {
                        "yes_price": price_cents / 100,
                        "price_text": price_text
                    }

        return markets

    def _extract_metaculus_html(self, soup: BeautifulSoup) -> dict:
        """Extract prediction data from a Metaculus page snapshot."""
        data = {}

        # Try to find embedded JSON data
        scripts = soup.select("script[type='application/json']")
        for script in scripts:
            try:
                json_data = json.loads(script.string)
                if "community_prediction" in str(json_data):
                    data["embedded_json"] = json_data
                    break
            except (json.JSONDecodeError, TypeError):
                continue

        # Fall back to HTML extraction
        prediction_elem = soup.select_one(
            ".community-prediction, [class*='prediction']"
        )
        if prediction_elem:
            text = prediction_elem.get_text(strip=True)
            pct_match = re.search(r"(\d+)%", text)
            if pct_match:
                data["community_prediction"] = int(pct_match.group(1)) / 100

        return data

    def collect_historical_snapshots(
        self,
        base_url: str,
        market_paths: List[str],
        platform: str,
        from_date: str = "20200101",
        to_date: str = None,
        max_snapshots_per_market: int = 20
    ) -> dict:
        """
        Collect historical data by scraping Wayback Machine snapshots.

        Parameters
        ----------
        base_url : str
            Platform's base URL.
        market_paths : list
            List of market URL paths to scrape.
        platform : str
            Platform name for extraction logic.
        from_date : str
            Start date (YYYYMMDD).
        to_date : str
            End date (YYYYMMDD), defaults to today.
        max_snapshots_per_market : int
            Maximum snapshots per market to fetch.

        Returns
        -------
        dict
            Mapping of market paths to time series data.
        """
        if to_date is None:
            to_date = datetime.now().strftime("%Y%m%d")

        dataset = {}

        for path in market_paths:
            full_url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"
            print(f"Looking up snapshots for: {full_url}")

            snapshots = self.get_available_snapshots(
                full_url, from_date, to_date,
                limit=max_snapshots_per_market
            )

            if not snapshots:
                print(f"  No snapshots found")
                continue

            print(f"  Found {len(snapshots)} snapshots")

            history = []
            for snap in snapshots:
                soup = self.fetch_snapshot(snap["url"])
                if soup:
                    data = self.extract_prices_from_snapshot(soup, platform)
                    if data:
                        history.append({
                            "timestamp": snap["datetime"].isoformat(),
                            "data": data,
                            "source": "wayback_machine"
                        })

            dataset[path] = {
                "url": full_url,
                "snapshots_found": len(snapshots),
                "snapshots_parsed": len(history),
                "history": history
            }

            time.sleep(1)

        return dataset

Task 3.1

Use the Wayback scraper to collect historical data for at least 5 prediction market pages. Document the challenges you encounter (missing snapshots, changed HTML structure, JavaScript-rendered content).
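
For example, a run against a handful of PredictIt market pages might look like the sketch below. The paths are hypothetical placeholders; substitute pages you have confirmed exist in the archive.

scraper = WaybackHistoryScraper()

# Hypothetical market paths -- replace with pages you have verified
predictit_paths = [
    "markets/detail/7456",
    "markets/detail/7057",
    "markets/detail/6867",
    "markets/detail/5883",
    "markets/detail/4614",
]

wayback_dataset = scraper.collect_historical_snapshots(
    base_url="https://www.predictit.org",
    market_paths=predictit_paths,
    platform="predictit",
    from_date="20200101",
    max_snapshots_per_market=20,
)

for path, result in wayback_dataset.items():
    print(f"{path}: {result['snapshots_found']} snapshots found, "
          f"{result['snapshots_parsed']} parsed")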

Part 4: Data Normalization and Unification

Unified Backtesting Format

All historical data, regardless of source, must be normalized into a single format:

@dataclass
class BacktestingRecord:
    """
    A single point in a market's price history,
    normalized for backtesting.
    """
    market_id: str
    platform: str
    question: str
    timestamp: datetime  # Always UTC
    probability: float   # Always 0-1
    volume_at_time: Optional[float]  # Cumulative volume at this point
    source: str          # 'api', 'scrape', 'interpolated'
    resolution: Optional[str]  # Market resolution (e.g. 'YES'/'NO'), if resolved
    resolution_date: Optional[datetime]


class BacktestingDataNormalizer:
    """
    Normalize historical data from multiple sources into a
    unified backtesting format.
    """

    def normalize_manifold(self, dataset: dict) -> List[BacktestingRecord]:
        """Normalize Manifold historical data."""
        records = []

        for market_id, data in dataset.items():
            metadata = data["metadata"]

            for point in data["history"]:
                records.append(BacktestingRecord(
                    market_id=f"manifold_{market_id}",
                    platform="manifold",
                    question=metadata["question"],
                    timestamp=datetime.fromisoformat(
                        point["timestamp"].replace("Z", "+00:00")
                    ),
                    probability=point["probability"],
                    volume_at_time=None,  # Not available per-bet
                    source="api",
                    resolution=metadata.get("resolution"),
                    resolution_date=datetime.fromisoformat(
                        metadata["resolved_at"].replace("Z", "+00:00")
                    ) if metadata.get("resolved_at") else None
                ))

        return records

    def normalize_polymarket(self, dataset: dict) -> List[BacktestingRecord]:
        """Normalize Polymarket historical data."""
        records = []

        for condition_id, data in dataset.items():
            metadata = data["metadata"]

            for point in data["history"]:
                records.append(BacktestingRecord(
                    market_id=f"polymarket_{condition_id}",
                    platform="polymarket",
                    question=metadata["question"],
                    timestamp=datetime.fromisoformat(
                        point["timestamp"].replace("Z", "+00:00")
                    ),
                    probability=point["probability"],
                    volume_at_time=None,
                    source="api",
                    resolution=metadata.get("resolution"),
                    resolution_date=datetime.fromisoformat(
                        metadata["resolved_at"].replace("Z", "+00:00")
                    ) if metadata.get("resolved_at") else None
                ))

        return records

    def resample_to_uniform_frequency(
        self,
        records: List[BacktestingRecord],
        frequency_hours: int = 1
    ) -> List[BacktestingRecord]:
        """
        Resample irregular time series to a uniform frequency.

        Uses forward-fill interpolation to create a regular
        time grid from irregular observation timestamps.

        Parameters
        ----------
        records : list
            Sorted list of BacktestingRecords for a single market.
        frequency_hours : int
            Desired output frequency in hours.

        Returns
        -------
        list
            Resampled records at uniform frequency.
        """
        if not records:
            return []

        # Sort by timestamp
        records = sorted(records, key=lambda r: r.timestamp)

        # Generate uniform time grid
        start = records[0].timestamp.replace(
            minute=0, second=0, microsecond=0
        )
        end = records[-1].timestamp.replace(
            minute=0, second=0, microsecond=0
        )

        resampled = []
        current_time = start
        record_idx = 0
        last_prob = records[0].probability

        while current_time <= end:
            # Advance record index to find the latest observation
            # at or before current_time
            while (record_idx < len(records) - 1 and
                   records[record_idx + 1].timestamp <= current_time):
                record_idx += 1
                last_prob = records[record_idx].probability

            # Determine if this is an observed or interpolated point
            is_observed = (
                record_idx < len(records) and
                abs((records[record_idx].timestamp - current_time).total_seconds())
                < frequency_hours * 3600
            )

            resampled.append(BacktestingRecord(
                market_id=records[0].market_id,
                platform=records[0].platform,
                question=records[0].question,
                timestamp=current_time,
                probability=last_prob,
                volume_at_time=None,
                source="api" if is_observed else "interpolated",
                resolution=records[0].resolution,
                resolution_date=records[0].resolution_date
            ))

            current_time += timedelta(hours=frequency_hours)

        return resampled

Task 4.1

Combine data from all three sources (Manifold API, Polymarket API, Wayback scraping) into a unified dataset. Resample all time series to hourly frequency using forward-fill interpolation.
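
A sketch of how the pieces fit together, assuming manifold_dataset and polymarket_dataset were produced by the collectors in Part 2 (Wayback snapshots need their own normalizer, which is left to you):

from collections import defaultdict

normalizer = BacktestingDataNormalizer()

all_records = (
    normalizer.normalize_manifold(manifold_dataset)
    + normalizer.normalize_polymarket(polymarket_dataset)
)

# Resampling works one market at a time, so group the records first
records_by_market = defaultdict(list)
for record in all_records:
    records_by_market[record.market_id].append(record)

hourly_records = []
for market_records in records_by_market.values():
    hourly_records.extend(
        normalizer.resample_to_uniform_frequency(
            market_records, frequency_hours=1
        )
    )

print(f"{len(records_by_market)} markets, {len(hourly_records)} hourly records")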

Part 5: Storage for Backtesting

class BacktestingDatabase:
    """
    SQLite database optimized for backtesting queries.

    Designed for efficient time-range queries on price history
    and cross-market analysis.
    """

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._create_schema()

    def _create_schema(self):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.executescript("""
            CREATE TABLE IF NOT EXISTS bt_markets (
                market_id TEXT PRIMARY KEY,
                platform TEXT NOT NULL,
                question TEXT NOT NULL,
                resolution TEXT,
                resolution_date TEXT,
                first_observation TEXT,
                last_observation TEXT,
                num_observations INTEGER DEFAULT 0,
                data_source TEXT
            );

            CREATE TABLE IF NOT EXISTS bt_prices (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                market_id TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                probability REAL NOT NULL,
                source TEXT NOT NULL,
                FOREIGN KEY (market_id) REFERENCES bt_markets(market_id)
            );

            CREATE INDEX IF NOT EXISTS idx_bt_prices_market_time
                ON bt_prices(market_id, timestamp);

            CREATE INDEX IF NOT EXISTS idx_bt_prices_time
                ON bt_prices(timestamp);

            CREATE TABLE IF NOT EXISTS bt_data_quality (
                market_id TEXT PRIMARY KEY,  -- required so INSERT OR REPLACE updates in place
                total_observations INTEGER,
                interpolated_pct REAL,
                max_gap_hours REAL,
                price_range_min REAL,
                price_range_max REAL,
                completeness_score REAL,
                FOREIGN KEY (market_id) REFERENCES bt_markets(market_id)
            );
        """)

        conn.commit()
        conn.close()

    def store_market(self, records: List[BacktestingRecord]):
        """Store a market's complete history."""
        if not records:
            return

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        first = records[0]
        last = records[-1]

        # Upsert market
        cursor.execute("""
            INSERT OR REPLACE INTO bt_markets (
                market_id, platform, question, resolution,
                resolution_date, first_observation, last_observation,
                num_observations, data_source
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            first.market_id, first.platform, first.question,
            first.resolution,
            first.resolution_date.isoformat() if first.resolution_date else None,
            first.timestamp.isoformat(),
            last.timestamp.isoformat(),
            len(records),
            ", ".join(set(r.source for r in records))
        ))

        # Remove any previously stored prices so re-running does not duplicate rows
        cursor.execute(
            "DELETE FROM bt_prices WHERE market_id = ?",
            (first.market_id,)
        )

        # Insert price records
        cursor.executemany("""
            INSERT INTO bt_prices (market_id, timestamp, probability, source)
            VALUES (?, ?, ?, ?)
        """, [
            (r.market_id, r.timestamp.isoformat(), r.probability, r.source)
            for r in records
        ])

        # Compute and store quality metrics
        self._compute_quality_metrics(cursor, first.market_id, records)

        conn.commit()
        conn.close()

    def _compute_quality_metrics(self, cursor, market_id, records):
        """Compute data quality metrics for a market."""
        total = len(records)
        interpolated = sum(1 for r in records if r.source == "interpolated")
        interpolated_pct = interpolated / total if total > 0 else 0

        # Compute maximum gap
        max_gap = 0
        for i in range(1, len(records)):
            gap = (records[i].timestamp - records[i-1].timestamp).total_seconds() / 3600
            max_gap = max(max_gap, gap)

        prices = [r.probability for r in records]
        price_min = min(prices)
        price_max = max(prices)

        # Completeness: ratio of observed to total points
        observed = sum(1 for r in records if r.source != "interpolated")
        completeness = observed / total if total > 0 else 0

        cursor.execute("""
            INSERT OR REPLACE INTO bt_data_quality (
                market_id, total_observations, interpolated_pct,
                max_gap_hours, price_range_min, price_range_max,
                completeness_score
            ) VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            market_id, total, interpolated_pct,
            max_gap, price_min, price_max, completeness
        ))

    def get_price_series(self, market_id: str,
                         start: str = None,
                         end: str = None) -> list:
        """
        Query price history for backtesting.

        Returns (timestamp, probability) tuples.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        query = "SELECT timestamp, probability FROM bt_prices WHERE market_id = ?"
        params = [market_id]

        if start:
            query += " AND timestamp >= ?"
            params.append(start)
        if end:
            query += " AND timestamp <= ?"
            params.append(end)

        query += " ORDER BY timestamp"

        cursor.execute(query, params)
        results = cursor.fetchall()
        conn.close()

        return results

    def get_quality_report(self) -> list:
        """Get data quality metrics for all markets."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            SELECT m.market_id, m.platform, m.question,
                   m.resolution, m.num_observations,
                   q.interpolated_pct, q.max_gap_hours,
                   q.completeness_score
            FROM bt_markets m
            LEFT JOIN bt_data_quality q ON m.market_id = q.market_id
            ORDER BY q.completeness_score ASC
        """)

        results = cursor.fetchall()
        conn.close()

        return [
            {
                "market_id": r[0], "platform": r[1],
                "question": r[2][:80], "resolution": r[3],
                "observations": r[4], "interpolated_pct": r[5],
                "max_gap_hours": r[6], "completeness": r[7]
            }
            for r in results
        ]

Task 5.1

Store all collected and normalized data in the backtesting database. Generate and analyze the quality report.
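
A sketch of the storage step, continuing from the hourly_records produced in the Task 4.1 sketch:

from collections import defaultdict

db = BacktestingDatabase("backtesting.db")

# store_market expects one market's records at a time
hourly_by_market = defaultdict(list)
for record in hourly_records:
    hourly_by_market[record.market_id].append(record)

for market_records in hourly_by_market.values():
    db.store_market(market_records)

# The report is ordered worst-first by completeness score
for row in db.get_quality_report():
    print(f"{row['platform']:<12} completeness={row['completeness']:.2f}  "
          f"{row['question']}")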

Part 6: Validation and Completeness Analysis

Task 6.1: Completeness Validation

For each market in your dataset, compute the following (a sketch of a grading helper appears after this list):

  1. The total time span (creation to resolution)
  2. The number of actual observations vs. expected observations (at hourly frequency)
  3. The percentage of interpolated points
  4. The maximum gap between consecutive observations
  5. A letter grade: A (>90% complete), B (>75%), C (>50%), D (>25%), F (<25%)
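
A minimal sketch of the grading helper, operating on one market's list of resampled BacktestingRecord objects. It measures the span from first to last observation; substitute the true creation and resolution dates if you have them in the metadata.

def completeness_grade(records: List[BacktestingRecord],
                       frequency_hours: int = 1) -> dict:
    """Compute completeness statistics and a letter grade for one market."""
    if not records:
        return {}

    records = sorted(records, key=lambda r: r.timestamp)
    span_hours = (
        records[-1].timestamp - records[0].timestamp
    ).total_seconds() / 3600
    expected = max(1, int(span_hours / frequency_hours) + 1)
    observed = sum(1 for r in records if r.source != "interpolated")
    interpolated_pct = 1 - observed / len(records)

    # Maximum gap between consecutive *observed* points (interpolated fill excluded)
    observed_times = [r.timestamp for r in records if r.source != "interpolated"]
    max_gap_hours = max(
        ((observed_times[i] - observed_times[i - 1]).total_seconds() / 3600
         for i in range(1, len(observed_times))),
        default=0.0,
    )

    completeness = observed / expected
    if completeness > 0.90:
        grade = "A"
    elif completeness > 0.75:
        grade = "B"
    elif completeness > 0.50:
        grade = "C"
    elif completeness > 0.25:
        grade = "D"
    else:
        grade = "F"

    return {
        "span_hours": span_hours,
        "expected_observations": expected,
        "observed_observations": observed,
        "interpolated_pct": interpolated_pct,
        "max_gap_hours": max_gap_hours,
        "completeness": completeness,
        "grade": grade,
    }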

Task 6.2: Cross-Platform Validation

If any markets exist on multiple platforms (a comparison sketch follows this list):

  1. Align their time series to the same timestamps
  2. Compute the correlation between platforms
  3. Measure the average absolute difference in probability
  4. Identify periods where the platforms diverged significantly
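
A sketch of the pairwise comparison, assuming both series have already been resampled to the same hourly grid. Note that statistics.correlation requires Python 3.10+ and raises StatisticsError when either input is constant.

from statistics import StatisticsError, correlation, mean

def compare_platforms(series_a: List[BacktestingRecord],
                      series_b: List[BacktestingRecord],
                      divergence_threshold: float = 0.10) -> dict:
    """Compare two aligned price series for the same underlying question."""
    probs_a = {r.timestamp: r.probability for r in series_a}
    probs_b = {r.timestamp: r.probability for r in series_b}
    shared = sorted(set(probs_a) & set(probs_b))
    if len(shared) < 2:
        return {"shared_points": len(shared)}

    a = [probs_a[t] for t in shared]
    b = [probs_b[t] for t in shared]
    diffs = [abs(x - y) for x, y in zip(a, b)]

    try:
        corr = correlation(a, b)
    except StatisticsError:
        corr = None  # one of the series is constant over the shared window

    return {
        "shared_points": len(shared),
        "correlation": corr,
        "mean_abs_diff": mean(diffs),
        # Timestamps where the platforms disagreed by more than the threshold
        "divergent_periods": [
            t.isoformat() for t, d in zip(shared, diffs)
            if d > divergence_threshold
        ],
    }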

Task 6.3: Fitness for Backtesting Assessment

Write a function that evaluates whether the dataset is suitable for backtesting a given strategy (a starting sketch follows this list). Consider:

  • Does the strategy require intraday data? (Check hourly completeness.)
  • Does the strategy trade across platforms? (Check cross-platform coverage.)
  • Does the strategy use volume data? (Check volume data availability.)
  • What is the minimum number of resolved markets needed?
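
A starting sketch of such a check, driven by the quality report from BacktestingDatabase. The thresholds (75% hourly completeness, 30 resolved markets) are illustrative defaults, not requirements.

def assess_backtest_fitness(db: BacktestingDatabase,
                            needs_intraday: bool,
                            needs_cross_platform: bool,
                            needs_volume: bool,
                            min_resolved_markets: int = 30) -> dict:
    """Rough check of whether the dataset can support a given strategy."""
    report = db.get_quality_report()
    resolved = [r for r in report if r["resolution"]]
    platforms = {r["platform"] for r in report}
    hourly_ok = [
        r for r in resolved
        if r["completeness"] is not None and r["completeness"] >= 0.75
    ]

    issues = []
    if len(resolved) < min_resolved_markets:
        issues.append(f"only {len(resolved)} resolved markets "
                      f"(need {min_resolved_markets})")
    if needs_intraday and len(hourly_ok) < min_resolved_markets:
        issues.append("too few markets with adequate hourly completeness")
    if needs_cross_platform and len(platforms) < 2:
        issues.append("dataset covers only one platform")
    if needs_volume:
        issues.append("volume_at_time is not populated in this dataset")

    return {"fit_for_backtesting": not issues, "issues": issues}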

Deliverables

  1. A complete Python script that collects, normalizes, stores, and validates historical data
  2. The SQLite database with all collected data
  3. A data quality report showing completeness metrics for each market
  4. A written assessment (500-1000 words) covering:
     • Which data sources provided the best historical coverage
     • The main challenges encountered in data collection
     • Gaps in the dataset and how they might affect backtesting results
     • Recommendations for improving historical data coverage

Evaluation Criteria

  • Collection Breadth (20%): Data collected from multiple platforms and sources
  • Normalization Quality (25%): Consistent format, correct timezone handling, proper interpolation
  • Storage Design (15%): Efficient schema for backtesting queries
  • Validation Rigor (25%): Comprehensive quality checks with meaningful metrics
  • Analysis (15%): Insightful assessment of data quality and its implications for backtesting