Chapter 20: Data Collection and Web Scraping

"Without data, you're just another person with an opinion." -- W. Edwards Deming

Every profitable prediction market strategy begins long before a single trade is placed. It begins with data. The quality, breadth, and timeliness of the data you collect determine the upper bound of what your models can achieve. A perfectly designed forecasting algorithm fed with incomplete, stale, or corrupted data will underperform a simple heuristic backed by clean, comprehensive market information. This chapter is your guide to gathering, storing, and maintaining the data infrastructure that makes serious prediction market analysis possible.

We will work through the entire data collection lifecycle: discovering what data is available across the prediction market ecosystem, accessing it through APIs and web scraping, transforming it into usable formats, storing it in well-designed databases, validating its quality, and doing all of this responsibly. By the end of this chapter, you will have the skills to build production-grade data pipelines that continuously feed your analytical and trading systems.


20.1 The Data Landscape for Prediction Markets

20.1.1 Types of Available Data

The prediction market ecosystem generates several distinct categories of data, each valuable for different analytical purposes.

Price and Probability Data. The most fundamental data type is the current and historical price of contracts. On binary markets, prices directly represent implied probabilities (a contract trading at $0.65 implies a 65% probability of the event occurring). For multi-outcome markets, the vector of prices across all outcomes characterizes the full probability distribution. Price data is available at various granularities --- from tick-by-tick trade prices to hourly or daily snapshots.
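
To make the multi-outcome case concrete, the short sketch below (with invented prices) renormalizes a price vector so it sums to one before treating it as a probability distribution:

# Hypothetical multi-outcome prices; they sum to slightly more than 1
# because market prices often embed a small overround.
prices = {"Candidate A": 0.48, "Candidate B": 0.39, "Candidate C": 0.16}

total = sum(prices.values())
implied_probs = {name: p / total for name, p in prices.items()}

for name, prob in implied_probs.items():
    print(f"{name}: {prob:.3f}")  # normalized implied probability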

Volume and Liquidity Data. Trading volume tells you how much conviction backs a given price. A market trading at $0.70 with $5 million in volume carries very different information than one at $0.70 with $500. Volume data helps identify which markets are liquid enough for meaningful analysis and which price movements are significant versus noise.

Order Book Data. The order book reveals the depth of supply and demand at different price levels. It shows not just where the market is, but where participants are willing to trade. Order book snapshots capture bid and ask prices, sizes, and the spread. This data is essential for understanding market microstructure, estimating transaction costs, and building execution algorithms.

Resolution Data. How markets resolve --- the actual outcome --- is the ground truth against which all forecasts are measured. Resolution data, combined with historical prices, allows you to compute calibration curves, Brier scores, and other accuracy metrics. This is the data that lets you ask: "Was the market right?"
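
For a binary market, the Brier score is the mean squared difference between the market's final probability and the realized outcome (1 if the event occurred, 0 otherwise). A minimal sketch with invented numbers:

# (predicted_probability, actual_outcome) pairs from resolved markets
# (values here are purely illustrative)
resolved = [(0.80, 1), (0.65, 1), (0.30, 0), (0.55, 0), (0.90, 1)]

brier = sum((p - y) ** 2 for p, y in resolved) / len(resolved)
print(f"Brier score: {brier:.4f}")  # 0.0 is perfect; always guessing 0.5 scores 0.25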

Market Metadata. Beyond prices, each market carries structured metadata: the question text, resolution criteria, category, creation date, close date, resolution date, and the rules governing how the contract settles. This metadata is crucial for filtering relevant markets, grouping them for analysis, and ensuring you understand what each contract actually represents.

User and Community Data. Some platforms expose data about forecasters: their track records, prediction histories, and reputation scores. Metaculus, for instance, publishes community prediction distributions. Manifold Markets provides individual trader histories. This data enables analyses of forecaster skill and the wisdom-of-crowds effect.

20.1.2 Primary Data Sources

Platform APIs. Most major prediction market platforms provide programmatic access to their data through REST APIs. These are the preferred data sources --- they are structured, documented, and designed for machine consumption. Polymarket, Kalshi, Metaculus, and Manifold Markets all offer APIs of varying completeness.

Web Scraping. When APIs are incomplete or unavailable, web scraping fills the gap. Some platforms do not expose all data through their API, and scraping the web interface can capture what the API misses. Scraping requires more engineering effort and is more fragile, but it remains an essential skill.

Third-Party Datasets. Several organizations aggregate and redistribute prediction market data. These include academic datasets (such as those from the Iowa Electronic Markets), data aggregators (like the historical PredictIt data on various research repositories), and commercial data providers.

Academic Datasets. Researchers have published datasets from prediction markets in various domains. The Good Judgment Project data, the Iowa Electronic Markets dataset, and various contest datasets from forecasting tournaments are available for academic use.

Blockchain Data. For decentralized prediction markets like Polymarket (built on Polygon), on-chain data provides a complete transaction history. Tools like Dune Analytics, The Graph, and direct RPC calls can access this data, offering a level of transparency unavailable on centralized platforms.

20.1.3 Data Freshness Requirements

Different analytical tasks require different data freshness:

Use Case                 Required Freshness             Typical Source
Real-time trading        Milliseconds to seconds        WebSocket feeds, order book APIs
Intraday analysis        Minutes to hours               REST API polling
Daily modeling           End of day                     Scheduled API calls
Research / backtesting   Historical (days to years)     Bulk downloads, database queries
Calibration studies      Post-resolution                Resolution data feeds

Understanding your freshness requirements shapes every subsequent decision about architecture, storage, and pipeline design.


20.2 Working with REST APIs

20.2.1 HTTP Fundamentals for Data Collection

REST (Representational State Transfer) APIs communicate over HTTP, the same protocol your browser uses. Understanding HTTP is essential for working with any API.

HTTP Methods. The four primary methods relevant to data collection are:

  • GET: Retrieve data. This is what you will use 95% of the time for data collection. GET /markets fetches a list of markets.
  • POST: Submit data. Used for authentication (sending credentials) and sometimes for complex queries. POST /auth/token might obtain an access token.
  • PUT/PATCH: Update data. Rarely used in data collection, but relevant if you are submitting forecasts or managing orders.
  • DELETE: Remove data. Almost never used in data collection contexts.

HTTP Status Codes. The server communicates the result of your request through status codes:

200 OK              - Success. Data is in the response body.
201 Created         - Resource created successfully.
301 Moved           - Resource has moved permanently. Follow the redirect.
400 Bad Request     - Your request is malformed. Check parameters.
401 Unauthorized    - Authentication required or credentials invalid.
403 Forbidden       - You don't have permission to access this resource.
404 Not Found       - The resource doesn't exist. Check the URL.
429 Too Many Reqs   - You've hit the rate limit. Back off and retry.
500 Internal Error  - Server-side problem. Retry after a delay.
503 Service Unavail - Server is temporarily unavailable. Retry later.

JSON Responses. Nearly all modern APIs return data in JSON (JavaScript Object Notation) format. JSON maps naturally to Python dictionaries and lists:

# A typical API response for a prediction market
{
    "id": "market_12345",
    "question": "Will the Fed raise rates in March 2026?",
    "category": "Economics",
    "outcomes": [
        {"name": "Yes", "price": 0.72, "volume": 1500000},
        {"name": "No", "price": 0.28, "volume": 1500000}
    ],
    "created_at": "2025-12-01T00:00:00Z",
    "close_date": "2026-03-15T18:00:00Z",
    "resolved": false
}

20.2.2 Making API Requests in Python

The requests library is the standard tool for HTTP requests in Python. For more advanced use cases, httpx provides async support and HTTP/2.

import requests
import time

# Basic GET request
response = requests.get("https://api.example.com/markets")

# Check status
if response.status_code == 200:
    data = response.json()  # Parse JSON response
    print(f"Retrieved {len(data)} markets")
else:
    print(f"Error: {response.status_code} - {response.text}")

# GET with query parameters
params = {
    "category": "politics",
    "status": "active",
    "limit": 100,
    "offset": 0
}
response = requests.get("https://api.example.com/markets", params=params)

# GET with authentication headers
headers = {
    "Authorization": "Bearer YOUR_API_KEY",
    "Accept": "application/json"
}
response = requests.get("https://api.example.com/markets", headers=headers)

20.2.3 Handling Pagination

APIs typically limit the number of results per request. To retrieve all data, you must handle pagination:

def fetch_all_pages(base_url, params=None, headers=None, page_size=100):
    """
    Fetch all pages from a paginated API endpoint.

    Handles both offset-based and cursor-based pagination.
    """
    if params is None:
        params = {}

    all_results = []
    params["limit"] = page_size
    offset = 0

    while True:
        params["offset"] = offset
        response = requests.get(base_url, params=params, headers=headers)
        response.raise_for_status()

        data = response.json()

        # Handle different response structures
        if isinstance(data, list):
            results = data
        elif "results" in data:
            results = data["results"]
        elif "data" in data:
            results = data["data"]
        else:
            results = data

        if not results:
            break

        all_results.extend(results)
        offset += page_size

        # Respect rate limits
        time.sleep(0.5)

        # Safety check to avoid infinite loops
        if len(results) < page_size:
            break

    return all_results
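
A quick usage sketch for the helper above; the endpoint URL, parameter names, and API key are placeholders:

headers = {"Authorization": "Bearer YOUR_API_KEY"}
markets = fetch_all_pages(
    "https://api.example.com/markets",   # placeholder endpoint
    params={"status": "active"},
    headers=headers,
)
print(f"Fetched {len(markets)} markets across all pages")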

20.2.4 Rate Limiting and Retry Logic

Every API imposes rate limits --- restrictions on how many requests you can make in a given time window. Violating rate limits results in 429 Too Many Requests responses and may get your API key suspended.

import time
from functools import wraps

class RateLimiter:
    """
    Simple fixed-interval rate limiter for API requests.

    Parameters
    ----------
    calls_per_second : float
        Maximum number of calls per second.
    """

    def __init__(self, calls_per_second: float):
        self.min_interval = 1.0 / calls_per_second
        self.last_call_time = 0.0

    def wait(self):
        """Block until it's safe to make another request."""
        elapsed = time.time() - self.last_call_time
        wait_time = self.min_interval - elapsed
        if wait_time > 0:
            time.sleep(wait_time)
        self.last_call_time = time.time()


def retry_with_backoff(max_retries=5, base_delay=1.0, max_delay=60.0):
    """
    Decorator that retries failed requests with exponential backoff.

    Uses exponential backoff with jitter to avoid thundering herd
    problems when rate limits are hit.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            import random

            for attempt in range(max_retries):
                try:
                    response = func(*args, **kwargs)

                    if response.status_code == 429:
                        # Rate limited - read Retry-After header if present
                        retry_after = response.headers.get("Retry-After")
                        if retry_after:
                            delay = float(retry_after)
                        else:
                            delay = min(base_delay * (2 ** attempt), max_delay)
                            delay += random.uniform(0, delay * 0.1)  # Jitter

                        print(f"Rate limited. Waiting {delay:.1f}s "
                              f"(attempt {attempt + 1}/{max_retries})")
                        time.sleep(delay)
                        continue

                    if response.status_code >= 500:
                        # Server error - retry
                        delay = min(base_delay * (2 ** attempt), max_delay)
                        print(f"Server error {response.status_code}. "
                              f"Retrying in {delay:.1f}s")
                        time.sleep(delay)
                        continue

                    return response

                except requests.exceptions.ConnectionError:
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    print(f"Connection error. Retrying in {delay:.1f}s")
                    time.sleep(delay)

            raise Exception(f"Max retries ({max_retries}) exceeded")

        return wrapper
    return decorator
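
The two pieces compose naturally: throttle every call through the limiter and wrap the request function in the retry decorator. A minimal sketch against a placeholder endpoint:

limiter = RateLimiter(calls_per_second=2)

@retry_with_backoff(max_retries=5, base_delay=1.0)
def fetch_markets(url, params=None):
    limiter.wait()  # throttle before every request
    return requests.get(url, params=params, timeout=30)

response = fetch_markets("https://api.example.com/markets",
                         params={"status": "active"})
print(response.status_code)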

20.2.5 Session Management and Connection Pooling

For making many requests to the same API, use a session object to reuse TCP connections:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_robust_session(
    max_retries=3,
    backoff_factor=0.5,
    status_forcelist=(500, 502, 503, 504)
):
    """
    Create a requests Session with automatic retry and connection pooling.
    """
    session = requests.Session()

    retry_strategy = Retry(
        total=max_retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist
    )

    adapter = HTTPAdapter(
        max_retries=retry_strategy,
        pool_connections=10,
        pool_maxsize=20
    )

    session.mount("https://", adapter)
    session.mount("http://", adapter)

    return session

# Usage
session = create_robust_session()
session.headers.update({
    "Authorization": "Bearer YOUR_API_KEY",
    "User-Agent": "PredictionMarketResearch/1.0"
})

# All requests reuse the same connection pool
markets = session.get("https://api.example.com/markets").json()
prices = session.get("https://api.example.com/prices").json()

20.3 Polymarket API Deep Dive

20.3.1 Platform Overview

Polymarket is a decentralized prediction market built on the Polygon blockchain. It uses a central limit order book (CLOB) for price discovery, with settlement handled through smart contracts. The platform provides two main APIs: the Gamma API for market metadata and the CLOB API for trading and order book data.

20.3.2 Gamma API: Market Metadata and Prices

The Gamma API provides information about markets, events, and current prices. It does not require authentication for read-only access.

Key Endpoints:

Base URL: https://gamma-api.polymarket.com

GET /markets          - List all markets with filtering
GET /markets/{id}     - Get a specific market by condition ID
GET /events           - List events (groups of related markets)
GET /events/{id}      - Get a specific event

Fetching Active Markets:

import requests
import time
from datetime import datetime
from typing import Optional

class PolymarketGammaClient:
    """
    Client for the Polymarket Gamma API.

    Provides access to market metadata, current prices,
    and event information.
    """

    BASE_URL = "https://gamma-api.polymarket.com"

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            "Accept": "application/json",
            "User-Agent": "PredictionMarketResearch/1.0"
        })

    def get_markets(
        self,
        active: bool = True,
        closed: bool = False,
        limit: int = 100,
        offset: int = 0,
        order: str = "volume24hr",
        ascending: bool = False
    ) -> list:
        """
        Fetch markets with optional filtering and sorting.

        Parameters
        ----------
        active : bool
            Whether to include active (open) markets.
        closed : bool
            Whether to include closed markets.
        limit : int
            Number of results per page (max 100).
        offset : int
            Pagination offset.
        order : str
            Sort field (e.g., 'volume24hr', 'liquidity', 'startDate').
        ascending : bool
            Sort direction.

        Returns
        -------
        list
            List of market dictionaries.
        """
        params = {
            "active": str(active).lower(),
            "closed": str(closed).lower(),
            "limit": limit,
            "offset": offset,
            "order": order,
            "ascending": str(ascending).lower()
        }

        response = self.session.get(f"{self.BASE_URL}/markets", params=params)
        response.raise_for_status()
        return response.json()

    def get_all_active_markets(self) -> list:
        """
        Fetch all active markets, handling pagination automatically.
        """
        all_markets = []
        offset = 0
        limit = 100

        while True:
            markets = self.get_markets(
                active=True,
                closed=False,
                limit=limit,
                offset=offset
            )

            if not markets:
                break

            all_markets.extend(markets)
            offset += limit

            if len(markets) < limit:
                break

            time.sleep(0.25)  # Rate limiting courtesy

        return all_markets

    def get_market(self, condition_id: str) -> dict:
        """Fetch a specific market by its condition ID."""
        response = self.session.get(f"{self.BASE_URL}/markets/{condition_id}")
        response.raise_for_status()
        return response.json()

    def get_events(
        self,
        limit: int = 100,
        offset: int = 0,
        slug: Optional[str] = None
    ) -> list:
        """
        Fetch events (groups of related markets).

        Parameters
        ----------
        limit : int
            Number of results per page.
        offset : int
            Pagination offset.
        slug : str, optional
            Filter by event slug (URL-friendly name).
        """
        params = {"limit": limit, "offset": offset}
        if slug:
            params["slug"] = slug

        response = self.session.get(f"{self.BASE_URL}/events", params=params)
        response.raise_for_status()
        return response.json()

    def get_event(self, event_id: str) -> dict:
        """Fetch a specific event with all its markets."""
        response = self.session.get(f"{self.BASE_URL}/events/{event_id}")
        response.raise_for_status()
        return response.json()

    def search_markets(self, query: str, limit: int = 20) -> list:
        """
        Search for markets by keyword.

        Note: this filters a single page of markets client-side;
        it is a convenience helper, not a full-text search.

        Parameters
        ----------
        query : str
            Search term.
        limit : int
            Maximum number of results.
        """
        markets = self.get_markets(limit=100)
        query_lower = query.lower()
        matches = [
            m for m in markets
            if query_lower in m.get("question", "").lower()
            or query_lower in m.get("description", "").lower()
        ]
        return matches[:limit]
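
A short usage sketch for the Gamma client. The "question" field appears in the example response earlier in this chapter; "volume24hr" is assumed from the sort options above and may be named differently in live responses:

gamma = PolymarketGammaClient()

# Top 10 active markets by 24-hour volume
top_markets = gamma.get_markets(limit=10, order="volume24hr")
for m in top_markets:
    print(m.get("question"), "-", m.get("volume24hr"))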

20.3.3 CLOB API: Order Book and Trading Data

The CLOB (Central Limit Order Book) API provides access to order book data and trading functionality.

import hmac
import hashlib
import time

class PolymarketCLOBClient:
    """
    Client for the Polymarket CLOB API.

    Provides access to order book data, trade history,
    and price time series.
    """

    BASE_URL = "https://clob.polymarket.com"

    def __init__(self, api_key: Optional[str] = None,
                 api_secret: Optional[str] = None):
        self.api_key = api_key
        self.api_secret = api_secret
        self.session = requests.Session()
        self.session.headers.update({
            "Accept": "application/json",
            "User-Agent": "PredictionMarketResearch/1.0"
        })

    def _sign_request(self, timestamp: str) -> str:
        """Generate HMAC signature for authenticated requests."""
        if not self.api_secret:
            raise ValueError("API secret required for authenticated endpoints")

        message = f"GET{timestamp}"
        signature = hmac.new(
            self.api_secret.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()
        return signature

    def get_order_book(self, token_id: str) -> dict:
        """
        Fetch the current order book for a market token.

        Parameters
        ----------
        token_id : str
            The token ID for the specific outcome.

        Returns
        -------
        dict
            Order book with 'bids' and 'asks' arrays.
        """
        params = {"token_id": token_id}
        response = self.session.get(
            f"{self.BASE_URL}/book", params=params
        )
        response.raise_for_status()
        return response.json()

    def get_midpoint(self, token_id: str) -> float:
        """Get the midpoint price for a token."""
        params = {"token_id": token_id}
        response = self.session.get(
            f"{self.BASE_URL}/midpoint", params=params
        )
        response.raise_for_status()
        data = response.json()
        return float(data.get("mid", 0))

    def get_price(self, token_id: str, side: str = "buy") -> float:
        """
        Get the current best price for a token.

        Parameters
        ----------
        token_id : str
            The token ID.
        side : str
            'buy' for best ask, 'sell' for best bid.
        """
        params = {"token_id": token_id, "side": side}
        response = self.session.get(
            f"{self.BASE_URL}/price", params=params
        )
        response.raise_for_status()
        data = response.json()
        return float(data.get("price", 0))

    def get_prices_history(
        self,
        token_id: str,
        interval: str = "1d",
        fidelity: int = 60
    ) -> list:
        """
        Fetch historical price data for a token.

        Parameters
        ----------
        token_id : str
            The token ID.
        interval : str
            Time range ('1d', '1w', '1m', '3m', '1y', 'all').
        fidelity : int
            Number of data points to return.

        Returns
        -------
        list
            List of {timestamp, price} dictionaries.
        """
        params = {
            "token_id": token_id,
            "interval": interval,
            "fidelity": fidelity
        }
        response = self.session.get(
            f"{self.BASE_URL}/prices-history", params=params
        )
        response.raise_for_status()
        return response.json().get("history", [])

    def get_trades(
        self,
        token_id: Optional[str] = None,
        limit: int = 100,
        cursor: Optional[str] = None
    ) -> dict:
        """
        Fetch recent trades, optionally filtered by token.

        Returns
        -------
        dict
            Contains 'data' (list of trades) and 'next_cursor'.
        """
        params = {"limit": limit}
        if token_id:
            params["token_id"] = token_id
        if cursor:
            params["cursor"] = cursor

        response = self.session.get(
            f"{self.BASE_URL}/trades", params=params
        )
        response.raise_for_status()
        return response.json()
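
With a token ID in hand (typically found in the market metadata), the CLOB client can inspect liquidity. The sketch below assumes each order book level is a dict with a "price" field, which should be verified against a live response; the token ID is a placeholder:

clob = PolymarketCLOBClient()

token_id = "YOUR_TOKEN_ID"  # placeholder
book = clob.get_order_book(token_id)

bids = book.get("bids", [])
asks = book.get("asks", [])
if bids and asks:
    best_bid = max(float(b["price"]) for b in bids)
    best_ask = min(float(a["price"]) for a in asks)
    print(f"Best bid: {best_bid:.3f}, best ask: {best_ask:.3f}, "
          f"spread: {best_ask - best_bid:.3f}")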

20.3.4 On-Chain Data Access

For the most complete Polymarket data, you can query the Polygon blockchain directly:

from web3 import Web3

class PolymarketOnChainClient:
    """
    Access Polymarket data directly from the Polygon blockchain.

    Provides complete transaction history and settlement data
    that may not be available through REST APIs.
    """

    # Polymarket CTF Exchange contract on Polygon
    CTF_EXCHANGE = "0x4bFb41d5B3570DeFd03C39a9A4D8dE6Bd8B8982E"

    def __init__(self, rpc_url: str = "https://polygon-rpc.com"):
        self.w3 = Web3(Web3.HTTPProvider(rpc_url))

    def get_block_timestamp(self, block_number: int) -> datetime:
        """Get the timestamp of a specific block."""
        block = self.w3.eth.get_block(block_number)
        return datetime.utcfromtimestamp(block["timestamp"])

    def get_recent_transactions(self, count: int = 10) -> list:
        """
        Fetch recent transactions to the CTF Exchange contract.

        Note: For production use, consider using an indexing service
        like The Graph or Dune Analytics for better performance.
        """
        latest_block = self.w3.eth.block_number
        transactions = []
        block = latest_block

        while len(transactions) < count and block > latest_block - 1000:
            block_data = self.w3.eth.get_block(block, full_transactions=True)
            for tx in block_data.transactions:
                if tx.get("to", "").lower() == self.CTF_EXCHANGE.lower():
                    transactions.append({
                        "hash": tx["hash"].hex(),
                        "from": tx["from"],
                        "value": self.w3.from_wei(tx["value"], "ether"),
                        "block": block,
                        "timestamp": datetime.utcfromtimestamp(
                            block_data["timestamp"]
                        )
                    })
            block -= 1

        return transactions[:count]

20.4 Kalshi API Deep Dive

20.4.1 Platform Overview

Kalshi is a CFTC-regulated prediction market exchange based in the United States. Unlike Polymarket, it operates as a traditional centralized exchange under federal oversight. Its API provides comprehensive access to market data, though trading requires a fully verified account.

20.4.2 Authentication

Kalshi authenticates requests with an API key plus a signed timestamp. In production the signature is generated with an RSA private key; the simplified example below substitutes an HMAC signature, as noted in the code:

import hashlib
import hmac
import time
import base64

class KalshiAuth:
    """
    Authentication handler for the Kalshi API.

    Generates signed headers required for authenticated requests.
    """

    def __init__(self, api_key: str, private_key_path: str):
        self.api_key = api_key
        with open(private_key_path, "r") as f:
            self.private_key = f.read()

    def get_auth_headers(self, method: str, path: str) -> dict:
        """
        Generate authentication headers for a request.

        Parameters
        ----------
        method : str
            HTTP method (GET, POST, etc.).
        path : str
            API endpoint path.

        Returns
        -------
        dict
            Headers including timestamp and signature.
        """
        timestamp = str(int(time.time() * 1000))
        message = f"{timestamp}{method}{path}"

        # In production, use proper RSA signing with the private key
        # This is a simplified example
        signature = base64.b64encode(
            hmac.new(
                self.private_key.encode(),
                message.encode(),
                hashlib.sha256
            ).digest()
        ).decode()

        return {
            "KALSHI-ACCESS-KEY": self.api_key,
            "KALSHI-ACCESS-SIGNATURE": signature,
            "KALSHI-ACCESS-TIMESTAMP": timestamp,
            "Content-Type": "application/json"
        }

20.4.3 Market Data Client

class KalshiClient:
    """
    Client for the Kalshi API.

    Provides access to events, markets, historical data,
    and portfolio information.
    """

    BASE_URL = "https://api.elections.kalshi.com/trade-api/v2"

    def __init__(self, api_key: Optional[str] = None,
                 private_key_path: Optional[str] = None):
        self.session = requests.Session()
        self.auth = None

        if api_key and private_key_path:
            self.auth = KalshiAuth(api_key, private_key_path)

        self.session.headers.update({
            "Accept": "application/json",
            "User-Agent": "PredictionMarketResearch/1.0"
        })
        self.rate_limiter = RateLimiter(calls_per_second=2)

    def _request(self, method: str, path: str,
                 params: Optional[dict] = None) -> dict:
        """Make an authenticated request to the Kalshi API."""
        self.rate_limiter.wait()

        url = f"{self.BASE_URL}{path}"
        headers = {}

        if self.auth:
            headers = self.auth.get_auth_headers(method, path)

        response = self.session.request(
            method, url, params=params, headers=headers
        )
        response.raise_for_status()
        return response.json()

    def get_events(
        self,
        status: Optional[str] = None,
        series_ticker: Optional[str] = None,
        limit: int = 100,
        cursor: Optional[str] = None
    ) -> dict:
        """
        Fetch events from Kalshi.

        Parameters
        ----------
        status : str, optional
            Filter by status ('open', 'closed', 'settled').
        series_ticker : str, optional
            Filter by event series.
        limit : int
            Maximum results per page.
        cursor : str, optional
            Pagination cursor.

        Returns
        -------
        dict
            Contains 'events' list and 'cursor' for pagination.
        """
        params = {"limit": limit}
        if status:
            params["status"] = status
        if series_ticker:
            params["series_ticker"] = series_ticker
        if cursor:
            params["cursor"] = cursor

        return self._request("GET", "/events", params)

    def get_event(self, event_ticker: str) -> dict:
        """Fetch a specific event by ticker."""
        return self._request("GET", f"/events/{event_ticker}")

    def get_markets(
        self,
        event_ticker: Optional[str] = None,
        status: Optional[str] = None,
        limit: int = 100,
        cursor: Optional[str] = None
    ) -> dict:
        """
        Fetch markets, optionally filtered by event.

        Returns
        -------
        dict
            Contains 'markets' list and 'cursor'.
        """
        params = {"limit": limit}
        if event_ticker:
            params["event_ticker"] = event_ticker
        if status:
            params["status"] = status
        if cursor:
            params["cursor"] = cursor

        return self._request("GET", "/markets", params)

    def get_market(self, ticker: str) -> dict:
        """Fetch a specific market by ticker."""
        return self._request("GET", f"/markets/{ticker}")

    def get_market_history(
        self,
        ticker: str,
        limit: int = 1000,
        min_ts: Optional[int] = None,
        max_ts: Optional[int] = None
    ) -> dict:
        """
        Fetch historical candlestick data for a market.

        Parameters
        ----------
        ticker : str
            Market ticker symbol.
        limit : int
            Number of data points.
        min_ts : int, optional
            Minimum timestamp (Unix epoch seconds).
        max_ts : int, optional
            Maximum timestamp (Unix epoch seconds).
        """
        params = {"limit": limit}
        if min_ts:
            params["min_ts"] = min_ts
        if max_ts:
            params["max_ts"] = max_ts

        return self._request("GET", f"/markets/{ticker}/history", params)

    def get_market_orderbook(self, ticker: str, depth: int = 10) -> dict:
        """
        Fetch the order book for a market.

        Parameters
        ----------
        ticker : str
            Market ticker symbol.
        depth : int
            Number of price levels to include.
        """
        params = {"depth": depth}
        return self._request("GET", f"/markets/{ticker}/orderbook", params)

    def get_series(self, series_ticker: str) -> dict:
        """Fetch an event series (e.g., all Fed rate decision markets)."""
        return self._request("GET", f"/series/{series_ticker}")

    def get_trades(
        self,
        ticker: Optional[str] = None,
        limit: int = 100,
        cursor: Optional[str] = None,
        min_ts: Optional[int] = None,
        max_ts: Optional[int] = None
    ) -> dict:
        """
        Fetch trade history.

        Parameters
        ----------
        ticker : str, optional
            Filter by market ticker.
        limit : int
            Maximum results per page.
        cursor : str, optional
            Pagination cursor.
        min_ts : int, optional
            Minimum timestamp.
        max_ts : int, optional
            Maximum timestamp.
        """
        params = {"limit": limit}
        if ticker:
            params["ticker"] = ticker
        if cursor:
            params["cursor"] = cursor
        if min_ts:
            params["min_ts"] = min_ts
        if max_ts:
            params["max_ts"] = max_ts

        return self._request("GET", "/trades", params)

    def get_all_events(self, status: str = "open") -> list:
        """
        Fetch all events with automatic pagination.
        """
        all_events = []
        cursor = None

        while True:
            result = self.get_events(
                status=status, limit=100, cursor=cursor
            )
            events = result.get("events", [])
            all_events.extend(events)

            cursor = result.get("cursor")
            if not cursor or not events:
                break

        return all_events
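
As a brief usage sketch, the client can be instantiated without credentials for market-data calls; whether a particular endpoint actually requires authentication should be confirmed against Kalshi's documentation, and the ticker below is a placeholder:

kalshi = KalshiClient()

open_events = kalshi.get_all_events(status="open")
print(f"Open events: {len(open_events)}")

# Order book for a single market (ticker is a placeholder)
book = kalshi.get_market_orderbook("EXAMPLE-TICKER", depth=5)
print(book)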

20.4.4 Working with Kalshi Event Series

Kalshi organizes markets into event series --- recurring events like monthly jobs reports, Fed meetings, or weekly weather forecasts. This structure is valuable for building models that learn from past instances of the same event type.

def analyze_series_history(client: KalshiClient,
                           series_ticker: str) -> dict:
    """
    Analyze the historical accuracy of a Kalshi event series.

    Computes calibration metrics across all resolved markets
    in the series.

    Parameters
    ----------
    client : KalshiClient
        Authenticated Kalshi API client.
    series_ticker : str
        The series ticker to analyze.

    Returns
    -------
    dict
        Analysis results including calibration data.
    """
    # Fetch all events in the series
    result = client.get_events(series_ticker=series_ticker)
    events = result.get("events", [])

    calibration_data = []

    for event in events:
        markets_result = client.get_markets(
            event_ticker=event["event_ticker"],
            status="settled"
        )

        for market in markets_result.get("markets", []):
            if market.get("result") is not None:
                # Get the last traded price before close
                history = client.get_market_history(
                    market["ticker"], limit=1
                )

                if history.get("history"):
                    last_price = history["history"][-1].get("yes_price", 0)
                    actual_outcome = 1 if market["result"] == "yes" else 0

                    calibration_data.append({
                        "ticker": market["ticker"],
                        "predicted_prob": last_price / 100,
                        "actual_outcome": actual_outcome,
                        "close_date": market.get("close_time")
                    })

            time.sleep(0.5)  # Rate limiting

    return {
        "series_ticker": series_ticker,
        "total_markets": len(calibration_data),
        "calibration_data": calibration_data
    }

20.5 Metaculus and Manifold APIs

20.5.1 Metaculus API

Metaculus is a community forecasting platform focused on calibrated probability estimation. Unlike betting markets, Metaculus does not involve real money --- forecasters are motivated by track record and reputation. The platform exposes rich data about community predictions.

class MetaculusClient:
    """
    Client for the Metaculus API.

    Provides access to questions, community forecasts,
    and forecaster track records.
    """

    BASE_URL = "https://www.metaculus.com/api2"

    def __init__(self, api_key: Optional[str] = None):
        self.session = requests.Session()
        self.session.headers.update({
            "Accept": "application/json",
            "User-Agent": "PredictionMarketResearch/1.0"
        })
        if api_key:
            self.session.headers["Authorization"] = f"Token {api_key}"

        self.rate_limiter = RateLimiter(calls_per_second=1)

    def get_questions(
        self,
        search: Optional[str] = None,
        status: Optional[str] = None,
        question_type: Optional[str] = None,
        limit: int = 20,
        offset: int = 0,
        order_by: str = "-activity"
    ) -> dict:
        """
        Fetch questions from Metaculus.

        Parameters
        ----------
        search : str, optional
            Search query.
        status : str, optional
            Question status ('open', 'closed', 'resolved').
        question_type : str, optional
            Type filter ('binary', 'numeric', 'multiple_choice').
        limit : int
            Results per page.
        offset : int
            Pagination offset.
        order_by : str
            Sort field ('-activity', '-publish_time', '-resolve_time').

        Returns
        -------
        dict
            Contains 'results' list and pagination info.
        """
        self.rate_limiter.wait()

        params = {
            "limit": limit,
            "offset": offset,
            "order_by": order_by
        }
        if search:
            params["search"] = search
        if status:
            params["status"] = status
        if question_type:
            params["type"] = question_type

        response = self.session.get(
            f"{self.BASE_URL}/questions/", params=params
        )
        response.raise_for_status()
        return response.json()

    def get_question(self, question_id: int) -> dict:
        """
        Fetch a specific question with full details.

        Includes community prediction, metaculus prediction,
        and resolution details.
        """
        self.rate_limiter.wait()

        response = self.session.get(
            f"{self.BASE_URL}/questions/{question_id}/"
        )
        response.raise_for_status()
        return response.json()

    def get_question_predictions(self, question_id: int) -> dict:
        """
        Fetch the prediction history for a question.

        Returns the time series of community and Metaculus
        predictions over the question's lifetime.
        """
        self.rate_limiter.wait()

        response = self.session.get(
            f"{self.BASE_URL}/questions/{question_id}/predictions/"
        )
        response.raise_for_status()
        return response.json()

    def get_all_questions(
        self,
        status: str = "resolved",
        max_questions: int = 1000
    ) -> list:
        """
        Fetch all questions matching the filter with pagination.
        """
        all_questions = []
        offset = 0
        limit = 100

        while len(all_questions) < max_questions:
            result = self.get_questions(
                status=status, limit=limit, offset=offset
            )
            questions = result.get("results", [])

            if not questions:
                break

            all_questions.extend(questions)
            offset += limit

            if not result.get("next"):
                break

        return all_questions[:max_questions]

    def extract_community_prediction(self, question: dict) -> Optional[float]:
        """
        Extract the current community prediction from a question.

        Parameters
        ----------
        question : dict
            Full question data from the API.

        Returns
        -------
        float or None
            Community prediction probability (0-1) for binary questions,
            or None if not available.
        """
        prediction = question.get("community_prediction")
        if prediction and isinstance(prediction, dict):
            return prediction.get("full", {}).get("q2")
        elif isinstance(prediction, (int, float)):
            return float(prediction)
        return None
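
As a usage sketch, the client can assemble (community prediction, outcome) pairs from resolved binary questions. The "resolution" field name is an assumption to verify against real responses:

meta = MetaculusClient()

resolved = meta.get_all_questions(status="resolved", max_questions=200)

pairs = []
for q in resolved:
    prob = meta.extract_community_prediction(q)
    resolution = q.get("resolution")  # assumed field name
    if prob is not None and resolution in (0, 1):
        pairs.append((prob, resolution))

print(f"Collected {len(pairs)} (prediction, outcome) pairs")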

20.5.2 Manifold Markets API

Manifold Markets is a play-money prediction market that has become popular in the effective altruism and rationalist communities. Its API is well-documented and permissive, making it an excellent platform for data collection practice.

class ManifoldClient:
    """
    Client for the Manifold Markets API.

    Provides access to markets, bets, comments,
    and user data on this play-money prediction platform.
    """

    BASE_URL = "https://api.manifold.markets/v0"

    def __init__(self, api_key: Optional[str] = None):
        self.session = requests.Session()
        self.session.headers.update({
            "Accept": "application/json",
            "User-Agent": "PredictionMarketResearch/1.0"
        })
        if api_key:
            self.session.headers["Authorization"] = f"Key {api_key}"

        self.rate_limiter = RateLimiter(calls_per_second=5)

    def get_markets(
        self,
        limit: int = 500,
        sort: str = "liquidity",
        order: str = "desc",
        before: Optional[str] = None
    ) -> list:
        """
        Fetch markets sorted by the specified criteria.

        Parameters
        ----------
        limit : int
            Number of markets to return (max 1000).
        sort : str
            Sort field ('created-time', 'updated-time',
            'last-bet-time', 'liquidity').
        order : str
            Sort direction ('asc' or 'desc').
        before : str, optional
            Cursor for pagination (market ID).
        """
        self.rate_limiter.wait()

        params = {
            "limit": limit,
            "sort": sort,
            "order": order
        }
        if before:
            params["before"] = before

        response = self.session.get(
            f"{self.BASE_URL}/markets", params=params
        )
        response.raise_for_status()
        return response.json()

    def get_market(self, market_id: str) -> dict:
        """Fetch a specific market by ID."""
        self.rate_limiter.wait()

        response = self.session.get(
            f"{self.BASE_URL}/market/{market_id}"
        )
        response.raise_for_status()
        return response.json()

    def get_market_by_slug(self, slug: str) -> dict:
        """Fetch a market by its URL slug."""
        self.rate_limiter.wait()

        response = self.session.get(
            f"{self.BASE_URL}/slug/{slug}"
        )
        response.raise_for_status()
        return response.json()

    def get_bets(
        self,
        market_id: Optional[str] = None,
        user_id: Optional[str] = None,
        limit: int = 1000,
        before: Optional[str] = None
    ) -> list:
        """
        Fetch bet history.

        Parameters
        ----------
        market_id : str, optional
            Filter bets to a specific market.
        user_id : str, optional
            Filter bets to a specific user.
        limit : int
            Maximum bets to return.
        before : str, optional
            Pagination cursor (bet ID).
        """
        self.rate_limiter.wait()

        params = {"limit": limit}
        if market_id:
            params["contractId"] = market_id
        if user_id:
            params["userId"] = user_id
        if before:
            params["before"] = before

        response = self.session.get(
            f"{self.BASE_URL}/bets", params=params
        )
        response.raise_for_status()
        return response.json()

    def get_user(self, username: str) -> dict:
        """Fetch user profile by username."""
        self.rate_limiter.wait()

        response = self.session.get(
            f"{self.BASE_URL}/user/{username}"
        )
        response.raise_for_status()
        return response.json()

    def search_markets(self, query: str, limit: int = 20) -> list:
        """Search for markets by text query."""
        self.rate_limiter.wait()

        params = {"term": query, "limit": limit}
        response = self.session.get(
            f"{self.BASE_URL}/search-markets", params=params
        )
        response.raise_for_status()
        return response.json()

    def get_market_positions(self, market_id: str) -> list:
        """Fetch all positions in a market."""
        self.rate_limiter.wait()

        response = self.session.get(
            f"{self.BASE_URL}/market/{market_id}/positions"
        )
        response.raise_for_status()
        return response.json()

    def get_all_resolved_markets(self, max_markets: int = 5000) -> list:
        """
        Fetch all resolved binary markets for calibration analysis.
        """
        all_markets = []
        before = None

        while len(all_markets) < max_markets:
            markets = self.get_markets(
                limit=500,
                sort="created-time",
                order="desc",
                before=before
            )

            if not markets:
                break

            resolved = [
                m for m in markets
                if m.get("isResolved") and m.get("outcomeType") == "BINARY"
            ]
            all_markets.extend(resolved)

            before = markets[-1].get("id")

        return all_markets[:max_markets]
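
The resolved-market helper above feeds directly into a simple calibration check. The "probability" and "resolution" field names below are assumptions to verify against real responses:

manifold = ManifoldClient()

markets = manifold.get_all_resolved_markets(max_markets=1000)

records = []
for m in markets:
    prob = m.get("probability")       # assumed: closing probability
    resolution = m.get("resolution")  # assumed: "YES" or "NO"
    if prob is not None and resolution in ("YES", "NO"):
        records.append((prob, 1 if resolution == "YES" else 0))

brier = sum((p - y) ** 2 for p, y in records) / max(len(records), 1)
print(f"{len(records)} markets, Brier score {brier:.4f}")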

20.6 Web Scraping Fundamentals

20.6.1 When to Scrape

Web scraping should be your fallback strategy, not your first choice. Use scraping when:

  1. No API is available for the data you need.
  2. The API is incomplete --- it does not expose certain data visible on the web interface.
  3. Historical data is only available through web archives such as the Wayback Machine.
  4. Cross-platform comparison requires data from platforms without APIs.
  5. One-time data collection where building an API integration is not worth the effort.

Always check for an API first. APIs are more reliable, faster, and less likely to break when the website is redesigned.

20.6.2 BeautifulSoup for HTML Parsing

BeautifulSoup is the workhorse library for parsing HTML in Python. It transforms raw HTML into a navigable tree structure.

import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from datetime import datetime
import time
import re

class PredictionMarketScraper:
    """
    General-purpose scraper for prediction market websites.

    Uses BeautifulSoup for HTML parsing with built-in
    rate limiting and error handling.
    """

    def __init__(self, base_url: str, delay: float = 2.0):
        """
        Parameters
        ----------
        base_url : str
            Base URL of the website to scrape.
        delay : float
            Minimum delay between requests in seconds.
        """
        self.base_url = base_url.rstrip("/")
        self.delay = delay
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/120.0.0.0 Safari/537.36"
            ),
            "Accept": "text/html,application/xhtml+xml",
            "Accept-Language": "en-US,en;q=0.9"
        })
        self.last_request_time = 0

    def _throttle(self):
        """Enforce minimum delay between requests."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.delay:
            time.sleep(self.delay - elapsed)
        self.last_request_time = time.time()

    def fetch_page(self, path: str = "") -> BeautifulSoup:
        """
        Fetch a page and return parsed HTML.

        Parameters
        ----------
        path : str
            Path relative to the base URL.

        Returns
        -------
        BeautifulSoup
            Parsed HTML document.
        """
        self._throttle()

        url = urljoin(self.base_url, path)
        response = self.session.get(url, timeout=30)
        response.raise_for_status()

        return BeautifulSoup(response.text, "html.parser")

    def extract_table_data(self, soup: BeautifulSoup,
                           table_selector: str) -> list:
        """
        Extract data from an HTML table.

        Parameters
        ----------
        soup : BeautifulSoup
            Parsed HTML document.
        table_selector : str
            CSS selector for the table element.

        Returns
        -------
        list
            List of dictionaries, one per row, with column headers
            as keys.
        """
        table = soup.select_one(table_selector)
        if not table:
            return []

        # Extract headers
        headers = []
        header_row = table.select_one("thead tr") or table.select_one("tr")
        if header_row:
            headers = [
                th.get_text(strip=True)
                for th in header_row.select("th, td")
            ]

        # Extract rows
        rows = []
        body = table.select_one("tbody") or table
        row_elements = body.select("tr")
        if not table.select_one("thead"):
            # No <thead>: the first row served as the header, so skip it
            row_elements = row_elements[1:]
        for tr in row_elements:
            cells = [td.get_text(strip=True) for td in tr.select("td")]
            if cells and len(cells) == len(headers):
                rows.append(dict(zip(headers, cells)))

        return rows

    def extract_links(self, soup: BeautifulSoup,
                      pattern: str = "") -> list:
        """
        Extract links matching a URL pattern.

        Parameters
        ----------
        soup : BeautifulSoup
            Parsed HTML document.
        pattern : str
            Regex pattern to filter URLs.

        Returns
        -------
        list
            List of absolute URLs matching the pattern.
        """
        links = []
        for a_tag in soup.select("a[href]"):
            href = a_tag["href"]
            absolute_url = urljoin(self.base_url, href)

            if pattern and not re.search(pattern, absolute_url):
                continue

            links.append({
                "url": absolute_url,
                "text": a_tag.get_text(strip=True),
                "title": a_tag.get("title", "")
            })

        return links

    def scrape_market_list(self, path: str,
                           market_selector: str,
                           name_selector: str,
                           price_selector: str) -> list:
        """
        Scrape a list of markets from a page.

        Parameters
        ----------
        path : str
            Page path to scrape.
        market_selector : str
            CSS selector for market containers.
        name_selector : str
            CSS selector for market name within container.
        price_selector : str
            CSS selector for price within container.

        Returns
        -------
        list
            List of market dictionaries with name and price.
        """
        soup = self.fetch_page(path)
        markets = []

        for container in soup.select(market_selector):
            name_elem = container.select_one(name_selector)
            price_elem = container.select_one(price_selector)

            if name_elem and price_elem:
                price_text = price_elem.get_text(strip=True)
                # Extract numeric price from text like "$0.72" or "72%"
                price_match = re.search(r"[\d.]+", price_text)

                markets.append({
                    "name": name_elem.get_text(strip=True),
                    "price_text": price_text,
                    "price": float(price_match.group())
                             if price_match else None,
                    "url": urljoin(
                        self.base_url,
                        name_elem.get("href", "")
                    ),
                    "scraped_at": datetime.utcnow().isoformat()
                })

        return markets
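
A usage sketch with entirely hypothetical selectors; real selectors have to be read off the target site's HTML, and scraping should only proceed after the robots.txt check described in Section 20.6.4:

scraper = PredictionMarketScraper("https://example-prediction-market.com",
                                  delay=2.0)

markets = scraper.scrape_market_list(
    path="/markets",
    market_selector="div.market-card",   # hypothetical selectors
    name_selector="a.market-title",
    price_selector="span.market-price",
)
print(f"Scraped {len(markets)} markets")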

20.6.3 Handling JavaScript-Rendered Pages

Many modern prediction market websites use JavaScript frameworks (React, Vue, Angular) that render content dynamically. BeautifulSoup only sees the initial HTML, which may be empty. For these sites, you need a browser automation tool like Selenium.

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException

class DynamicPageScraper:
    """
    Scraper for JavaScript-rendered prediction market pages.

    Uses Selenium with headless Chrome to render dynamic content
    before parsing with BeautifulSoup.
    """

    def __init__(self, headless: bool = True):
        """
        Parameters
        ----------
        headless : bool
            Whether to run Chrome in headless mode (no visible window).
        """
        chrome_options = Options()
        if headless:
            chrome_options.add_argument("--headless=new")

        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--disable-gpu")
        chrome_options.add_argument(
            "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36"
        )

        self.driver = webdriver.Chrome(options=chrome_options)
        self.wait = WebDriverWait(self.driver, 15)

    def fetch_dynamic_page(self, url: str,
                           wait_selector: str = "body") -> BeautifulSoup:
        """
        Load a page and wait for dynamic content to render.

        Parameters
        ----------
        url : str
            Full URL to load.
        wait_selector : str
            CSS selector to wait for before parsing.
            The page is considered loaded when this element appears.

        Returns
        -------
        BeautifulSoup
            Parsed HTML after JavaScript rendering.
        """
        self.driver.get(url)

        try:
            self.wait.until(
                EC.presence_of_element_located((By.CSS_SELECTOR, wait_selector))
            )
        except TimeoutException:
            print(f"Timeout waiting for {wait_selector} on {url}")

        # Additional wait for AJAX requests to complete
        time.sleep(2)

        html = self.driver.page_source
        return BeautifulSoup(html, "html.parser")

    def scroll_and_collect(self, url: str, item_selector: str,
                           max_items: int = 100) -> list:
        """
        Scroll through an infinite-scroll page and collect items.

        Parameters
        ----------
        url : str
            Page URL.
        item_selector : str
            CSS selector for items to collect.
        max_items : int
            Maximum number of items to collect.

        Returns
        -------
        list
            Collected item texts.
        """
        self.driver.get(url)
        time.sleep(3)

        collected = set()
        last_count = 0
        no_change_count = 0

        while len(collected) < max_items and no_change_count < 5:
            # Collect current items
            elements = self.driver.find_elements(
                By.CSS_SELECTOR, item_selector
            )
            for elem in elements:
                collected.add(elem.text)

            if len(collected) == last_count:
                no_change_count += 1
            else:
                no_change_count = 0
                last_count = len(collected)

            # Scroll down
            self.driver.execute_script(
                "window.scrollTo(0, document.body.scrollHeight);"
            )
            time.sleep(2)

        return list(collected)[:max_items]

    def close(self):
        """Close the browser."""
        self.driver.quit()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
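
Because the class implements the context-manager protocol, browser cleanup is automatic. The URL and selectors below are placeholders:

with DynamicPageScraper(headless=True) as scraper:
    soup = scraper.fetch_dynamic_page(
        "https://example-prediction-market.com/markets",
        wait_selector="div.market-card",  # hypothetical selector
    )
    titles = [el.get_text(strip=True)
              for el in soup.select("h3.market-title")]
    print(titles[:5])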

20.6.4 Respecting robots.txt

Before scraping any website, check its robots.txt file to understand what the site owner permits:

from urllib.robotparser import RobotFileParser
from urllib.parse import urlparse

class RobotsChecker:
    """
    Check robots.txt compliance before scraping.

    Always respect robots.txt directives. Scraping pages
    that are disallowed is both unethical and may have
    legal consequences.
    """

    def __init__(self):
        self._parsers = {}

    def can_fetch(self, url: str,
                  user_agent: str = "PredictionMarketResearch") -> bool:
        """
        Check if a URL can be fetched according to robots.txt.

        Parameters
        ----------
        url : str
            The URL to check.
        user_agent : str
            The user agent string to check against.

        Returns
        -------
        bool
            True if scraping is allowed, False otherwise.
        """
        parsed = urlparse(url)
        domain = f"{parsed.scheme}://{parsed.netloc}"

        if domain not in self._parsers:
            parser = RobotFileParser()
            parser.set_url(f"{domain}/robots.txt")
            try:
                parser.read()
            except Exception:
                # If robots.txt cannot be read, assume allowed
                return True
            self._parsers[domain] = parser

        return self._parsers[domain].can_fetch(user_agent, url)

    def get_crawl_delay(self, url: str,
                        user_agent: str = "*") -> Optional[float]:
        """
        Get the crawl delay specified in robots.txt.

        Returns
        -------
        float or None
            Crawl delay in seconds, or None if not specified.
        """
        parsed = urlparse(url)
        domain = f"{parsed.scheme}://{parsed.netloc}"

        if domain in self._parsers:
            delay = self._parsers[domain].crawl_delay(user_agent)
            return float(delay) if delay else None

        return None


# Usage example
robots = RobotsChecker()

url = "https://example-prediction-market.com/markets"
if robots.can_fetch(url):
    delay = robots.get_crawl_delay(url)
    print(f"Scraping allowed. Crawl delay: {delay or 'not specified'}")
else:
    print("Scraping NOT allowed by robots.txt. Do not proceed.")

20.7 Building Robust Data Pipelines

20.7.1 ETL Architecture

ETL (Extract, Transform, Load) is the fundamental pattern for data pipelines. Each phase has a distinct responsibility:

Extract: Pull raw data from source systems (APIs, web pages, files).
Transform: Clean, validate, normalize, and reshape the data.
Load: Write the processed data to the target storage system.

┌──────────┐    ┌──────────────┐    ┌──────────┐
│ Extract  │───>│  Transform   │───>│   Load   │
│          │    │              │    │          │
│ API calls│    │ Clean        │    │ Database │
│ Scraping │    │ Validate     │    │ Files    │
│ Files    │    │ Normalize    │    │ Cache    │
└──────────┘    └──────────────┘    └──────────┘

20.7.2 Pipeline Implementation

import json
import logging
import sqlite3
from abc import ABC, abstractmethod
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import List, Optional, Any
from pathlib import Path

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger("pipeline")


@dataclass
class MarketRecord:
    """Normalized market record used across all platforms."""
    platform: str
    market_id: str
    question: str
    category: Optional[str]
    yes_price: float
    no_price: Optional[float]
    volume: Optional[float]
    liquidity: Optional[float]
    status: str        # 'open', 'closed', 'resolved'
    resolution: Optional[str]  # 'yes', 'no', None
    created_at: Optional[str]
    close_date: Optional[str]
    fetched_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    metadata: dict = field(default_factory=dict)


class BaseExtractor(ABC):
    """Base class for data extractors."""

    @abstractmethod
    def extract(self) -> List[dict]:
        """Extract raw data from the source."""
        pass

    @property
    @abstractmethod
    def platform_name(self) -> str:
        """Name of the platform being extracted."""
        pass


class BaseTransformer(ABC):
    """Base class for data transformers."""

    @abstractmethod
    def transform(self, raw_data: List[dict]) -> List[MarketRecord]:
        """Transform raw data into normalized MarketRecords."""
        pass


class BaseLoader(ABC):
    """Base class for data loaders."""

    @abstractmethod
    def load(self, records: List[MarketRecord]) -> int:
        """
        Load records into the target system.

        Returns the number of records loaded.
        """
        pass


class ManifoldExtractor(BaseExtractor):
    """Extract market data from Manifold Markets."""

    platform_name = "manifold"

    def __init__(self, client: ManifoldClient, limit: int = 500):
        self.client = client
        self.limit = limit

    def extract(self) -> List[dict]:
        logger.info(f"Extracting up to {self.limit} markets from Manifold")
        try:
            markets = self.client.get_markets(limit=self.limit)
            logger.info(f"Extracted {len(markets)} markets from Manifold")
            return markets
        except Exception as e:
            logger.error(f"Manifold extraction failed: {e}")
            return []


class ManifoldTransformer(BaseTransformer):
    """Transform Manifold Markets data into normalized records."""

    def transform(self, raw_data: List[dict]) -> List[MarketRecord]:
        records = []

        for market in raw_data:
            try:
                # Skip non-binary markets for simplicity
                if market.get("outcomeType") != "BINARY":
                    continue

                probability = market.get("probability", 0)

                record = MarketRecord(
                    platform="manifold",
                    market_id=market.get("id", ""),
                    question=market.get("question", ""),
                    category=market.get("groupSlugs", [None])[0]
                        if market.get("groupSlugs") else None,
                    yes_price=round(probability, 4),
                    no_price=round(1 - probability, 4),
                    volume=market.get("volume"),
                    liquidity=market.get("totalLiquidity"),
                    status="resolved" if market.get("isResolved")
                           else "open",
                    resolution=market.get("resolution"),
                    created_at=datetime.fromtimestamp(
                        market["createdTime"] / 1000,
                        tz=timezone.utc
                    ).isoformat() if market.get("createdTime") else None,
                    close_date=datetime.fromtimestamp(
                        market["closeTime"] / 1000,
                        tz=timezone.utc
                    ).isoformat() if market.get("closeTime") else None,
                    metadata={
                        "slug": market.get("slug"),
                        "url": market.get("url"),
                        "creator": market.get("creatorUsername")
                    }
                )
                records.append(record)

            except (KeyError, TypeError, ValueError) as e:
                logger.warning(
                    f"Failed to transform Manifold market "
                    f"{market.get('id', '?')}: {e}"
                )

        logger.info(
            f"Transformed {len(records)} records from "
            f"{len(raw_data)} raw entries"
        )
        return records


class SQLiteLoader(BaseLoader):
    """Load market records into a SQLite database."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_database()

    def _init_database(self):
        """Create tables if they don't exist."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS markets (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                platform TEXT NOT NULL,
                market_id TEXT NOT NULL,
                question TEXT NOT NULL,
                category TEXT,
                status TEXT NOT NULL,
                resolution TEXT,
                created_at TEXT,
                close_date TEXT,
                metadata TEXT,
                UNIQUE(platform, market_id)
            )
        """)

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS price_snapshots (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                platform TEXT NOT NULL,
                market_id TEXT NOT NULL,
                yes_price REAL NOT NULL,
                no_price REAL,
                volume REAL,
                liquidity REAL,
                fetched_at TEXT NOT NULL,
                FOREIGN KEY (platform, market_id)
                    REFERENCES markets(platform, market_id)
            )
        """)

        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_price_snapshots_market
            ON price_snapshots(platform, market_id, fetched_at)
        """)

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS pipeline_runs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                platform TEXT NOT NULL,
                started_at TEXT NOT NULL,
                completed_at TEXT,
                records_extracted INTEGER,
                records_loaded INTEGER,
                status TEXT NOT NULL,
                error_message TEXT
            )
        """)

        conn.commit()
        conn.close()

    def load(self, records: List[MarketRecord]) -> int:
        """Load records, updating markets and inserting price snapshots."""
        if not records:
            return 0

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        loaded = 0

        try:
            for record in records:
                # Upsert market
                cursor.execute("""
                    INSERT INTO markets
                        (platform, market_id, question, category,
                         status, resolution, created_at, close_date,
                         metadata)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ON CONFLICT(platform, market_id) DO UPDATE SET
                        status = excluded.status,
                        resolution = excluded.resolution,
                        metadata = excluded.metadata
                """, (
                    record.platform, record.market_id, record.question,
                    record.category, record.status, record.resolution,
                    record.created_at, record.close_date,
                    json.dumps(record.metadata)
                ))

                # Insert price snapshot
                cursor.execute("""
                    INSERT INTO price_snapshots
                        (platform, market_id, yes_price, no_price,
                         volume, liquidity, fetched_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """, (
                    record.platform, record.market_id, record.yes_price,
                    record.no_price, record.volume, record.liquidity,
                    record.fetched_at
                ))

                loaded += 1

            conn.commit()
            logger.info(f"Loaded {loaded} records into SQLite")

        except Exception as e:
            conn.rollback()
            logger.error(f"Load failed: {e}")
            raise
        finally:
            conn.close()

        return loaded


class DataPipeline:
    """
    Orchestrates the ETL pipeline for prediction market data.

    Combines extractors, transformers, and loaders into
    a complete data collection system.
    """

    def __init__(self, loader: BaseLoader, db_path: Optional[str] = None):
        self.loader = loader
        self.db_path = db_path
        self.pipelines = []  # List of (extractor, transformer) tuples

    def add_source(self, extractor: BaseExtractor,
                   transformer: BaseTransformer):
        """Register a data source with its transformer."""
        self.pipelines.append((extractor, transformer))

    def run(self) -> dict:
        """
        Execute the full ETL pipeline for all registered sources.

        Returns
        -------
        dict
            Summary of the pipeline run including record counts
            and any errors.
        """
        summary = {
            "started_at": datetime.now(timezone.utc).isoformat(),
            "sources": {},
            "total_extracted": 0,
            "total_loaded": 0,
            "errors": []
        }

        for extractor, transformer in self.pipelines:
            platform = extractor.platform_name
            logger.info(f"--- Running pipeline for {platform} ---")

            source_summary = {
                "extracted": 0,
                "transformed": 0,
                "loaded": 0,
                "errors": []
            }

            try:
                # Extract
                raw_data = extractor.extract()
                source_summary["extracted"] = len(raw_data)

                # Transform
                records = transformer.transform(raw_data)
                source_summary["transformed"] = len(records)

                # Load
                loaded = self.loader.load(records)
                source_summary["loaded"] = loaded

                summary["total_extracted"] += len(raw_data)
                summary["total_loaded"] += loaded

            except Exception as e:
                error_msg = f"{platform}: {str(e)}"
                source_summary["errors"].append(error_msg)
                summary["errors"].append(error_msg)
                logger.error(f"Pipeline failed for {platform}: {e}")

            summary["sources"][platform] = source_summary

        summary["completed_at"] = datetime.now(timezone.utc).isoformat()

        # Log pipeline run to database
        if self.db_path:
            self._log_run(summary)

        return summary

    def _log_run(self, summary: dict):
        """Log the pipeline run to the pipeline_runs table."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        for platform, source in summary["sources"].items():
            cursor.execute("""
                INSERT INTO pipeline_runs
                    (platform, started_at, completed_at,
                     records_extracted, records_loaded,
                     status, error_message)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                platform,
                summary["started_at"],
                summary.get("completed_at"),
                source["extracted"],
                source["loaded"],
                "error" if source["errors"] else "success",
                "; ".join(source["errors"]) if source["errors"] else None
            ))

        conn.commit()
        conn.close()

20.7.3 Scheduling Pipelines

For continuous data collection, you need to schedule your pipeline to run at regular intervals:

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger

def setup_scheduled_pipeline(db_path: str):
    """
    Set up a scheduled data collection pipeline.

    Runs every hour to collect fresh market data from
    all configured platforms.
    """

    def run_pipeline():
        """Execute the pipeline (called by scheduler)."""
        logger.info("Scheduled pipeline run starting")

        # Initialize clients
        manifold = ManifoldClient()

        # Initialize pipeline components
        loader = SQLiteLoader(db_path)
        pipeline = DataPipeline(loader, db_path)

        # Register sources
        pipeline.add_source(
            ManifoldExtractor(manifold, limit=500),
            ManifoldTransformer()
        )

        # Run
        summary = pipeline.run()
        logger.info(
            f"Pipeline complete: {summary['total_loaded']} records loaded"
        )

    # Create scheduler
    scheduler = BlockingScheduler()

    # Run every hour
    scheduler.add_job(
        run_pipeline,
        IntervalTrigger(hours=1),
        id="hourly_collection",
        name="Hourly Market Data Collection",
        misfire_grace_time=300  # Allow 5 min grace period
    )

    # Also run once immediately
    scheduler.add_job(
        run_pipeline,
        "date",  # Run once at the specified date (now)
        id="initial_collection",
        name="Initial Data Collection"
    )

    logger.info("Starting scheduled pipeline")
    scheduler.start()
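
If you would rather not keep a long-running Python process alive, the operating system's scheduler works just as well: wrap the same wiring in a one-shot script and let cron invoke it. The script below is a sketch; the file paths in the example crontab line are assumptions.

# collect_once.py -- one-shot pipeline run suitable for cron.
# Example crontab entry (paths are illustrative):
#   0 * * * * /usr/bin/python3 /opt/pipelines/collect_once.py >> /var/log/markets.log 2>&1

if __name__ == "__main__":
    db_path = "prediction_markets.db"   # assumed database location

    loader = SQLiteLoader(db_path)
    pipeline = DataPipeline(loader, db_path)
    pipeline.add_source(
        ManifoldExtractor(ManifoldClient(), limit=500),
        ManifoldTransformer()
    )

    summary = pipeline.run()
    logger.info(f"One-shot run loaded {summary['total_loaded']} records")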

20.8 Database Design for Market Data

20.8.1 Relational Schema Design

A well-designed database schema is the backbone of your data infrastructure. The schema must capture the hierarchical structure of prediction market data while supporting efficient queries.

Entity-Relationship Model:

┌──────────────┐     ┌──────────────────┐     ┌──────────────────┐
│  platforms   │     │     markets      │     │  price_snapshots │
├──────────────┤     ├──────────────────┤     ├──────────────────┤
│ id (PK)      │──┐  │ id (PK)          │──┐  │ id (PK)          │
│ name         │  └─>│ platform_id (FK) │  └─>│ market_id (FK)   │
│ api_base_url │     │ external_id      │     │ yes_price        │
│ type         │     │ question         │     │ no_price         │
└──────────────┘     │ category         │     │ volume           │
                     │ status           │     │ spread           │
                     │ created_at       │     │ fetched_at       │
                     │ close_date       │     └──────────────────┘
                     │ resolution       │
                     │ resolution_date  │     ┌──────────────────┐
                     └──────────────────┘     │     trades       │
                                              ├──────────────────┤
                                              │ id (PK)          │
                                              │ market_id (FK)   │
                                              │ price            │
                                              │ size             │
                                              │ side             │
                                              │ traded_at        │
                                              └──────────────────┘

20.8.2 SQLAlchemy ORM Implementation

from sqlalchemy import (
    create_engine, Column, Integer, String, Float, DateTime,
    Text, Boolean, ForeignKey, Index, UniqueConstraint,
    JSON, Enum
)
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from sqlalchemy.sql import func
import enum

Base = declarative_base()


class MarketStatus(enum.Enum):
    OPEN = "open"
    CLOSED = "closed"
    RESOLVED = "resolved"
    CANCELLED = "cancelled"


class Platform(Base):
    """Prediction market platform."""
    __tablename__ = "platforms"

    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True, nullable=False)
    display_name = Column(String(100))
    api_base_url = Column(String(500))
    platform_type = Column(String(20))  # 'real_money', 'play_money', 'forecast'

    markets = relationship("Market", back_populates="platform")

    def __repr__(self):
        return f"<Platform(name='{self.name}')>"


class Market(Base):
    """A single prediction market or question."""
    __tablename__ = "markets"
    __table_args__ = (
        UniqueConstraint("platform_id", "external_id",
                         name="uq_market_platform_external"),
        Index("idx_market_status", "status"),
        Index("idx_market_category", "category"),
        Index("idx_market_close_date", "close_date"),
    )

    id = Column(Integer, primary_key=True)
    platform_id = Column(Integer, ForeignKey("platforms.id"), nullable=False)
    external_id = Column(String(255), nullable=False)
    question = Column(Text, nullable=False)
    description = Column(Text)
    category = Column(String(100))
    outcome_type = Column(String(20), default="binary")
    status = Column(String(20), default="open")
    resolution = Column(String(50))
    created_at = Column(DateTime)
    close_date = Column(DateTime)
    resolution_date = Column(DateTime)
    url = Column(String(1000))
    metadata_json = Column(JSON)

    # Timestamps
    first_seen = Column(DateTime, server_default=func.now())
    last_updated = Column(DateTime, onupdate=func.now())

    # Relationships
    platform = relationship("Platform", back_populates="markets")
    price_snapshots = relationship(
        "PriceSnapshot", back_populates="market",
        order_by="PriceSnapshot.fetched_at"
    )
    trades = relationship(
        "Trade", back_populates="market",
        order_by="Trade.traded_at"
    )

    def __repr__(self):
        return f"<Market(id={self.id}, question='{self.question[:50]}...')>"

    @property
    def latest_price(self):
        """Get the most recent price snapshot."""
        if self.price_snapshots:
            return self.price_snapshots[-1]
        return None


class PriceSnapshot(Base):
    """Point-in-time price observation for a market."""
    __tablename__ = "price_snapshots"
    __table_args__ = (
        Index("idx_price_market_time", "market_id", "fetched_at"),
    )

    id = Column(Integer, primary_key=True)
    market_id = Column(Integer, ForeignKey("markets.id"), nullable=False)
    yes_price = Column(Float, nullable=False)
    no_price = Column(Float)
    spread = Column(Float)
    volume_24h = Column(Float)
    total_volume = Column(Float)
    liquidity = Column(Float)
    num_traders = Column(Integer)
    fetched_at = Column(DateTime, nullable=False, server_default=func.now())

    market = relationship("Market", back_populates="price_snapshots")

    def __repr__(self):
        return (f"<PriceSnapshot(market_id={self.market_id}, "
                f"yes={self.yes_price}, at={self.fetched_at})>")


class Trade(Base):
    """Individual trade on a market."""
    __tablename__ = "trades"
    __table_args__ = (
        Index("idx_trade_market_time", "market_id", "traded_at"),
    )

    id = Column(Integer, primary_key=True)
    market_id = Column(Integer, ForeignKey("markets.id"), nullable=False)
    external_trade_id = Column(String(255))
    price = Column(Float, nullable=False)
    size = Column(Float)
    side = Column(String(10))  # 'buy', 'sell'
    outcome = Column(String(50))  # 'yes', 'no'
    traded_at = Column(DateTime, nullable=False)
    trader_id = Column(String(255))

    market = relationship("Market", back_populates="trades")


class Resolution(Base):
    """Resolution record for a settled market."""
    __tablename__ = "resolutions"

    id = Column(Integer, primary_key=True)
    market_id = Column(Integer, ForeignKey("markets.id"),
                       unique=True, nullable=False)
    resolution = Column(String(50), nullable=False)
    resolution_date = Column(DateTime, nullable=False)
    final_price = Column(Float)
    total_volume = Column(Float)
    notes = Column(Text)

    market = relationship("Market")


def create_database(db_url: str = "sqlite:///prediction_markets.db"):
    """
    Create the database and all tables.

    Parameters
    ----------
    db_url : str
        SQLAlchemy database URL.
        Examples:
        - 'sqlite:///local.db'
        - 'postgresql://user:pass@localhost/markets'

    Returns
    -------
    tuple
        (engine, SessionFactory) for database operations.
    """
    engine = create_engine(db_url, echo=False)
    Base.metadata.create_all(engine)
    SessionFactory = sessionmaker(bind=engine)

    return engine, SessionFactory
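
A short usage sketch for the ORM layer: create the schema, register a platform, and insert one market with an initial price snapshot. All field values here are illustrative.

engine, SessionFactory = create_database("sqlite:///prediction_markets.db")
session = SessionFactory()

try:
    # Register a platform (illustrative values)
    manifold = Platform(
        name="manifold",
        display_name="Manifold Markets",
        api_base_url="https://api.manifold.markets/v0",
        platform_type="play_money"
    )
    session.add(manifold)
    session.flush()  # assigns manifold.id without committing yet

    # Attach a market and an initial price snapshot
    market = Market(
        platform_id=manifold.id,
        external_id="abc123",
        question="Will example event X happen by 2026?",
        category="politics",
        status="open"
    )
    market.price_snapshots.append(
        PriceSnapshot(yes_price=0.62, no_price=0.38, total_volume=1500.0)
    )
    session.add(market)
    session.commit()
finally:
    session.close()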

20.8.3 Database Query Patterns

from sqlalchemy import and_, or_, desc, func, case
from datetime import datetime, timedelta

class MarketDatabase:
    """
    High-level interface for querying prediction market data.

    Wraps SQLAlchemy queries in domain-specific methods.
    """

    def __init__(self, session_factory):
        self.SessionFactory = session_factory

    def get_active_markets(self, platform: str = None,
                           category: str = None,
                           min_volume: float = 0) -> list:
        """
        Fetch all currently active markets with optional filters.
        """
        session = self.SessionFactory()
        try:
            query = session.query(Market).filter(
                Market.status == "open"
            )

            if platform:
                query = query.join(Platform).filter(
                    Platform.name == platform
                )
            if category:
                query = query.filter(Market.category == category)

            # Filter by volume using the latest price snapshot
            if min_volume > 0:
                subquery = session.query(
                    PriceSnapshot.market_id,
                    func.max(PriceSnapshot.fetched_at).label("latest")
                ).group_by(PriceSnapshot.market_id).subquery()

                query = query.join(
                    subquery, Market.id == subquery.c.market_id
                ).join(
                    PriceSnapshot,
                    and_(
                        PriceSnapshot.market_id == subquery.c.market_id,
                        PriceSnapshot.fetched_at == subquery.c.latest
                    )
                ).filter(
                    PriceSnapshot.total_volume >= min_volume
                )

            return query.all()
        finally:
            session.close()

    def get_price_history(self, market_id: int,
                          start_date: datetime = None,
                          end_date: datetime = None) -> list:
        """
        Fetch price history for a specific market.
        """
        session = self.SessionFactory()
        try:
            query = session.query(PriceSnapshot).filter(
                PriceSnapshot.market_id == market_id
            )

            if start_date:
                query = query.filter(
                    PriceSnapshot.fetched_at >= start_date
                )
            if end_date:
                query = query.filter(
                    PriceSnapshot.fetched_at <= end_date
                )

            return query.order_by(PriceSnapshot.fetched_at).all()
        finally:
            session.close()

    def get_resolved_markets_for_calibration(
        self,
        platform: str = None,
        min_volume: float = 0,
        after_date: datetime = None
    ) -> list:
        """
        Fetch resolved markets with their final prices
        for calibration analysis.

        Returns markets paired with their last price before
        resolution.
        """
        session = self.SessionFactory()
        try:
            query = session.query(Market, Resolution).join(
                Resolution, Market.id == Resolution.market_id
            ).filter(
                Market.status == "resolved"
            )

            if platform:
                query = query.join(Platform).filter(
                    Platform.name == platform
                )
            if after_date:
                query = query.filter(
                    Resolution.resolution_date >= after_date
                )

            results = []
            for market, resolution in query.all():
                results.append({
                    "market_id": market.id,
                    "question": market.question,
                    "final_price": resolution.final_price,
                    "resolution": resolution.resolution,
                    "resolution_date": resolution.resolution_date,
                    "total_volume": resolution.total_volume
                })

            return results
        finally:
            session.close()

    def get_data_quality_report(self) -> dict:
        """
        Generate a data quality report across all platforms.
        """
        session = self.SessionFactory()
        try:
            report = {}

            # Total counts
            report["total_markets"] = session.query(Market).count()
            report["total_snapshots"] = session.query(PriceSnapshot).count()
            report["total_trades"] = session.query(Trade).count()

            # Per-platform breakdown
            platform_stats = session.query(
                Platform.name,
                func.count(Market.id),
                func.sum(
                    case(
                        (Market.status == "resolved", 1),
                        else_=0
                    )
                )
            ).join(Market).group_by(Platform.name).all()

            report["platforms"] = {
                name: {"total_markets": total, "resolved": resolved}
                for name, total, resolved in platform_stats
            }

            # Data freshness
            latest_snapshot = session.query(
                func.max(PriceSnapshot.fetched_at)
            ).scalar()
            report["latest_data"] = str(latest_snapshot) if latest_snapshot else None

            # Missing data
            markets_without_prices = session.query(Market).outerjoin(
                PriceSnapshot
            ).filter(PriceSnapshot.id == None).count()
            report["markets_without_prices"] = markets_without_prices

            return report
        finally:
            session.close()
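
A brief usage sketch, assuming the engine and session factory produced by create_database above:

engine, SessionFactory = create_database("sqlite:///prediction_markets.db")
db = MarketDatabase(SessionFactory)

# Active political markets with at least $10,000 of volume
active = db.get_active_markets(platform="manifold",
                               category="politics",
                               min_volume=10_000)
for market in active[:5]:
    print(market.question)

# Quick health check on the stored data
report = db.get_data_quality_report()
print(report["total_markets"], "markets,",
      report["markets_without_prices"], "without prices")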

20.9 Data Quality and Validation

20.9.1 Common Data Quality Issues

Prediction market data suffers from several recurring quality problems:

  1. Missing values: API responses may omit fields, especially for inactive or low-volume markets.
  2. Timezone inconsistencies: Different platforms report timestamps in different timezones, sometimes without explicit zone information.
  3. Stale prices: Markets with no recent trades may report the last traded price, which could be hours or days old.
  4. Duplicate records: Running pipelines multiple times or across overlapping time windows can create duplicates.
  5. Price anomalies: Flash crashes, fat-finger errors, or thin liquidity can produce prices that do not reflect genuine market beliefs.
  6. Resolution ambiguity: Some markets resolve in unexpected ways (N/A, voided, or partial resolution) that do not map cleanly to binary outcomes.

20.9.2 Data Validation Framework

from dataclasses import dataclass
from typing import Callable
from datetime import datetime, timezone, timedelta
import statistics

@dataclass
class ValidationResult:
    """Result of a data validation check."""
    check_name: str
    passed: bool
    message: str
    severity: str  # 'error', 'warning', 'info'
    affected_records: int = 0
    total_records: int = 0

    @property
    def pass_rate(self) -> float:
        if self.total_records == 0:
            return 1.0
        return 1.0 - (self.affected_records / self.total_records)


class DataValidator:
    """
    Validates prediction market data quality.

    Runs a suite of checks on market records and produces
    a comprehensive quality report.
    """

    def __init__(self):
        self.checks = []

    def add_check(self, name: str, check_fn: Callable,
                  severity: str = "warning"):
        """Register a validation check."""
        self.checks.append({
            "name": name,
            "fn": check_fn,
            "severity": severity
        })

    def validate(self, records: list) -> list:
        """
        Run all validation checks on the records.

        Parameters
        ----------
        records : list
            List of MarketRecord objects or dictionaries.

        Returns
        -------
        list
            List of ValidationResult objects.
        """
        results = []
        for check in self.checks:
            try:
                result = check["fn"](records)
                result.severity = check["severity"]
                results.append(result)
            except Exception as e:
                results.append(ValidationResult(
                    check_name=check["name"],
                    passed=False,
                    message=f"Check failed with error: {e}",
                    severity="error"
                ))
        return results

    def print_report(self, results: list):
        """Print a formatted validation report."""
        print("\n" + "=" * 60)
        print("DATA QUALITY REPORT")
        print("=" * 60)

        for result in results:
            status = "PASS" if result.passed else "FAIL"
            icon = "[OK]" if result.passed else "[!!]"

            print(f"\n{icon} {result.check_name} - {status}")
            print(f"    {result.message}")
            if result.total_records > 0:
                print(f"    Pass rate: {result.pass_rate:.1%} "
                      f"({result.affected_records}/{result.total_records} "
                      f"affected)")

        # Summary
        passed = sum(1 for r in results if r.passed)
        total = len(results)
        errors = sum(1 for r in results
                    if not r.passed and r.severity == "error")

        print(f"\n{'=' * 60}")
        print(f"SUMMARY: {passed}/{total} checks passed, "
              f"{errors} errors")
        print("=" * 60)


def create_standard_validators() -> DataValidator:
    """
    Create a DataValidator with standard prediction market checks.
    """
    validator = DataValidator()

    def check_price_range(records):
        """Prices should be between 0 and 1."""
        invalid = [
            r for r in records
            if not (0 <= r.yes_price <= 1)
        ]
        return ValidationResult(
            check_name="Price Range [0, 1]",
            passed=len(invalid) == 0,
            message=f"{len(invalid)} records have prices outside [0, 1]",
            severity="error",
            affected_records=len(invalid),
            total_records=len(records)
        )

    def check_price_complement(records):
        """Yes + No prices should approximately sum to 1."""
        invalid = [
            r for r in records
            if r.no_price is not None
            and abs(r.yes_price + r.no_price - 1.0) > 0.05
        ]
        return ValidationResult(
            check_name="Price Complement (Yes + No ~ 1.0)",
            passed=len(invalid) == 0,
            message=f"{len(invalid)} records where yes + no price "
                    f"deviates from 1.0 by more than 5%",
            severity="warning",
            affected_records=len(invalid),
            total_records=len(records)
        )

    def check_missing_questions(records):
        """All records should have a non-empty question."""
        invalid = [
            r for r in records
            if not r.question or r.question.strip() == ""
        ]
        return ValidationResult(
            check_name="Non-empty Question Text",
            passed=len(invalid) == 0,
            message=f"{len(invalid)} records have empty questions",
            severity="error",
            affected_records=len(invalid),
            total_records=len(records)
        )

    def check_timestamp_validity(records):
        """Timestamps should be parseable and reasonable."""
        invalid = []
        for r in records:
            try:
                if r.fetched_at:
                    dt = datetime.fromisoformat(
                        r.fetched_at.replace("Z", "+00:00")
                    )
                    # Check for obviously wrong dates
                    if dt.year < 2014 or dt.year > 2030:
                        invalid.append(r)
            except (ValueError, TypeError):
                invalid.append(r)

        return ValidationResult(
            check_name="Valid Timestamps",
            passed=len(invalid) == 0,
            message=f"{len(invalid)} records have invalid timestamps",
            severity="warning",
            affected_records=len(invalid),
            total_records=len(records)
        )

    def check_duplicate_markets(records):
        """Check for duplicate market IDs within a platform."""
        seen = set()
        duplicates = 0
        for r in records:
            key = (r.platform, r.market_id)
            if key in seen:
                duplicates += 1
            seen.add(key)

        return ValidationResult(
            check_name="No Duplicate Market IDs",
            passed=duplicates == 0,
            message=f"{duplicates} duplicate market IDs found",
            severity="warning",
            affected_records=duplicates,
            total_records=len(records)
        )

    def check_price_staleness(records):
        """Flag markets where the price hasn't changed recently."""
        # This check is more relevant for time-series data
        # Here we just verify fetched_at is recent
        now = datetime.now(timezone.utc)
        stale_threshold = timedelta(hours=24)
        stale = 0

        for r in records:
            try:
                fetched = datetime.fromisoformat(
                    r.fetched_at.replace("Z", "+00:00")
                )
                if now - fetched > stale_threshold:
                    stale += 1
            except (ValueError, TypeError, AttributeError):
                pass

        return ValidationResult(
            check_name="Data Freshness (< 24 hours)",
            passed=stale < len(records) * 0.1,
            message=f"{stale} records are more than 24 hours old",
            severity="info",
            affected_records=stale,
            total_records=len(records)
        )

    # Register all checks
    validator.add_check("Price Range", check_price_range, "error")
    validator.add_check("Price Complement", check_price_complement, "warning")
    validator.add_check("Question Text", check_missing_questions, "error")
    validator.add_check("Timestamps", check_timestamp_validity, "warning")
    validator.add_check("Duplicates", check_duplicate_markets, "warning")
    validator.add_check("Freshness", check_price_staleness, "info")

    return validator
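
Wiring the validator into the pipeline from Section 20.7.2 looks roughly like this (a sketch; the gating policy at the end is one reasonable choice, not the only one):

raw = ManifoldExtractor(ManifoldClient(), limit=200).extract()
records = ManifoldTransformer().transform(raw)

validator = create_standard_validators()
results = validator.validate(records)
validator.print_report(results)

# Refuse to load data when any error-level check fails
hard_failures = [r for r in results
                 if not r.passed and r.severity == "error"]
if hard_failures:
    logger.error("Validation failed; skipping load")
else:
    SQLiteLoader("prediction_markets.db").load(records)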

20.9.3 Timezone Normalization

Timezone handling is a persistent source of bugs in data pipelines. The golden rule is: store everything in UTC, convert only for display.

from datetime import datetime, timezone, timedelta
from zoneinfo import ZoneInfo  # Python 3.9+

class TimestampNormalizer:
    """
    Normalize timestamps from various prediction market platforms
    to a consistent UTC format.
    """

    # Known platform timezone behaviors
    PLATFORM_TIMEZONES = {
        "kalshi": "America/New_York",   # Kalshi reports in ET
        "polymarket": "UTC",            # Polymarket uses UTC
        "metaculus": "UTC",             # Metaculus uses UTC
        "manifold": "UTC",             # Manifold uses UTC (Unix ms)
        "predictit": "America/New_York" # PredictIt reports in ET
    }

    @staticmethod
    def to_utc(timestamp_str: str,
               source_timezone: str = "UTC") -> Optional[datetime]:
        """
        Parse a timestamp string and convert to UTC.

        Handles various common timestamp formats found in
        prediction market APIs.
        """
        if not timestamp_str:
            return None

        # Try ISO format first
        try:
            dt = datetime.fromisoformat(
                timestamp_str.replace("Z", "+00:00")
            )
            if dt.tzinfo is None:
                tz = ZoneInfo(source_timezone)
                dt = dt.replace(tzinfo=tz)
            return dt.astimezone(timezone.utc)
        except (ValueError, TypeError):
            pass

        # Try Unix timestamp (seconds)
        try:
            ts = float(timestamp_str)
            if ts > 1e12:  # Milliseconds
                ts /= 1000
            return datetime.fromtimestamp(ts, tz=timezone.utc)
        except (ValueError, TypeError):
            pass

        # Try common date formats
        formats = [
            "%Y-%m-%d %H:%M:%S",
            "%Y-%m-%dT%H:%M:%S",
            "%m/%d/%Y %H:%M:%S",
            "%Y-%m-%d",
        ]

        for fmt in formats:
            try:
                dt = datetime.strptime(timestamp_str, fmt)
                tz = ZoneInfo(source_timezone)
                dt = dt.replace(tzinfo=tz)
                return dt.astimezone(timezone.utc)
            except ValueError:
                continue

        raise ValueError(
            f"Cannot parse timestamp: '{timestamp_str}'"
        )

    @classmethod
    def normalize_for_platform(cls, timestamp_str: str,
                                platform: str) -> datetime:
        """
        Normalize a timestamp using the known timezone behavior
        of a specific platform.
        """
        source_tz = cls.PLATFORM_TIMEZONES.get(platform, "UTC")
        return cls.to_utc(timestamp_str, source_tz)
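
A few illustrative conversions. The input strings are made up, but the formats mirror what the platforms listed above typically return:

normalizer = TimestampNormalizer()

# ISO timestamp with explicit UTC marker
print(normalizer.to_utc("2024-11-05T14:30:00Z"))
# -> 2024-11-05 14:30:00+00:00

# Naive Eastern-time string from a platform that reports in ET
print(normalizer.normalize_for_platform("2024-11-05 09:30:00", "kalshi"))
# -> 2024-11-05 14:30:00+00:00  (converted from America/New_York)

# Unix epoch in milliseconds, as Manifold uses
print(normalizer.normalize_for_platform("1730817000000", "manifold"))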

20.10 Alternative Data Sources

20.10.1 Why Alternative Data Matters

Prediction markets price events based on available information. If you can systematically access information before it is reflected in market prices, you have an edge. Alternative data sources --- news, social media, economic indicators, weather data --- provide the raw material for building informational advantages.

20.10.2 News Data

class NewsDataFetcher:
    """
    Fetch news data from various APIs for prediction market analysis.

    News sentiment and coverage can be leading indicators for
    market movements on political, economic, and social events.
    """

    def __init__(self, newsapi_key: Optional[str] = None,
                 gdelt_enabled: bool = True):
        self.newsapi_key = newsapi_key
        self.gdelt_enabled = gdelt_enabled
        self.session = requests.Session()

    def search_newsapi(self, query: str,
                       from_date: str = None,
                       to_date: str = None,
                       language: str = "en",
                       page_size: int = 100) -> list:
        """
        Search for news articles using the NewsAPI.

        Parameters
        ----------
        query : str
            Search keywords.
        from_date : str
            Start date (YYYY-MM-DD).
        to_date : str
            End date (YYYY-MM-DD).
        language : str
            Language filter.
        page_size : int
            Number of articles to return.

        Returns
        -------
        list
            List of article dictionaries.
        """
        if not self.newsapi_key:
            raise ValueError("NewsAPI key required")

        params = {
            "q": query,
            "language": language,
            "pageSize": page_size,
            "apiKey": self.newsapi_key,
            "sortBy": "publishedAt"
        }
        if from_date:
            params["from"] = from_date
        if to_date:
            params["to"] = to_date

        response = self.session.get(
            "https://newsapi.org/v2/everything",
            params=params
        )
        response.raise_for_status()
        data = response.json()

        return data.get("articles", [])

    def fetch_gdelt_mentions(self, query: str,
                             timespan: str = "24h") -> dict:
        """
        Query GDELT for global media mentions and tone.

        GDELT monitors news media worldwide and provides
        real-time data about events, mentions, and sentiment.

        Parameters
        ----------
        query : str
            Search query.
        timespan : str
            Time window ('24h', '7d', '30d').

        Returns
        -------
        dict
            GDELT response with articles and analytics.
        """
        params = {
            "query": query,
            "mode": "artlist",
            "maxrecords": 250,
            "format": "json",
            "timespan": timespan
        }

        response = self.session.get(
            "https://api.gdeltproject.org/api/v2/doc/doc",
            params=params
        )
        response.raise_for_status()
        return response.json()
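
A usage sketch for the news fetcher. The API key, query strings, and dates are placeholders:

fetcher = NewsDataFetcher(newsapi_key="YOUR_NEWSAPI_KEY")

# Recent coverage of a topic tied to a market you follow
articles = fetcher.search_newsapi(
    "Federal Reserve rate decision",
    from_date="2024-11-01",
    to_date="2024-11-07"
)
for article in articles[:5]:
    print(article.get("publishedAt"), article.get("title"))

# Global mention volume from GDELT (no key required)
gdelt = fetcher.fetch_gdelt_mentions('"interest rate"', timespan="7d")
print(len(gdelt.get("articles", [])), "GDELT articles in the last 7 days")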

20.10.3 Economic Data

class EconomicDataFetcher:
    """
    Fetch economic indicators from FRED (Federal Reserve
    Economic Data) and other sources.

    Economic data is essential for markets related to
    inflation, employment, GDP, and monetary policy.
    """

    def __init__(self, fred_api_key: Optional[str] = None):
        self.fred_api_key = fred_api_key
        self.session = requests.Session()

    def get_fred_series(self, series_id: str,
                        start_date: str = None,
                        end_date: str = None) -> list:
        """
        Fetch a time series from FRED.

        Parameters
        ----------
        series_id : str
            FRED series ID (e.g., 'UNRATE' for unemployment rate,
            'CPIAUCSL' for CPI, 'GDP' for GDP).
        start_date : str
            Start date (YYYY-MM-DD).
        end_date : str
            End date (YYYY-MM-DD).

        Returns
        -------
        list
            List of {date, value} observations.
        """
        if not self.fred_api_key:
            raise ValueError("FRED API key required")

        params = {
            "series_id": series_id,
            "api_key": self.fred_api_key,
            "file_type": "json"
        }
        if start_date:
            params["observation_start"] = start_date
        if end_date:
            params["observation_end"] = end_date

        response = self.session.get(
            "https://api.stlouisfed.org/fred/series/observations",
            params=params
        )
        response.raise_for_status()
        data = response.json()

        return [
            {
                "date": obs["date"],
                "value": float(obs["value"])
                         if obs["value"] != "." else None
            }
            for obs in data.get("observations", [])
        ]

    # Useful FRED series for prediction markets
    SERIES_MAP = {
        "unemployment_rate": "UNRATE",
        "cpi": "CPIAUCSL",
        "gdp": "GDP",
        "fed_funds_rate": "FEDFUNDS",
        "treasury_10y": "DGS10",
        "sp500": "SP500",
        "housing_starts": "HOUST",
        "consumer_sentiment": "UMCSENT",
        "inflation_expectations": "MICH",
        "initial_claims": "ICSA"
    }

    def get_economic_dashboard(self) -> dict:
        """
        Fetch latest values for key economic indicators.

        Returns
        -------
        dict
            Dictionary mapping indicator names to their latest values.
        """
        dashboard = {}

        for name, series_id in self.SERIES_MAP.items():
            try:
                observations = self.get_fred_series(series_id)
                if observations:
                    latest = observations[-1]
                    dashboard[name] = {
                        "value": latest["value"],
                        "date": latest["date"],
                        "series_id": series_id
                    }
            except Exception as e:
                dashboard[name] = {"error": str(e)}

            time.sleep(0.5)  # FRED rate limit

        return dashboard
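
A usage sketch for the FRED client; the API key is a placeholder:

fred = EconomicDataFetcher(fred_api_key="YOUR_FRED_API_KEY")

# Unemployment rate since 2020, e.g. for jobs-report markets
unrate = fred.get_fred_series("UNRATE", start_date="2020-01-01")
print(unrate[-1])   # latest observation as {"date": ..., "value": ...}

# One-call snapshot of every indicator in SERIES_MAP
dashboard = fred.get_economic_dashboard()
print(dashboard.get("cpi"))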

20.10.4 Polling Data

class PollingDataFetcher:
    """
    Fetch polling data for political prediction markets.

    Polling data is the single most important alternative
    data source for election-related prediction markets.
    """

    def __init__(self):
        self.session = requests.Session()

    def fetch_fivethirtyeight_polls(self, poll_type: str = "president_general") -> list:
        """
        Fetch polling data from FiveThirtyEight's public datasets.

        Parameters
        ----------
        poll_type : str
            Type of polls to fetch:
            - 'president_general': Presidential general election
            - 'president_primary': Presidential primary
            - 'senate': Senate races
            - 'house': House races
            - 'governor': Governor races
            - 'generic_ballot': Generic congressional ballot

        Returns
        -------
        list
            List of poll dictionaries.
        """
        base_url = "https://projects.fivethirtyeight.com/polls/data"
        url_map = {
            "president_general": f"{base_url}/president_polls.csv",
            "president_primary": f"{base_url}/president_primary_polls.csv",
            "senate": f"{base_url}/senate_polls.csv",
            "house": f"{base_url}/house_polls.csv",
            "governor": f"{base_url}/governor_polls.csv",
            "generic_ballot": f"{base_url}/generic_ballot_polls.csv"
        }

        url = url_map.get(poll_type)
        if not url:
            raise ValueError(f"Unknown poll type: {poll_type}")

        response = self.session.get(url)
        response.raise_for_status()

        # Parse CSV
        import csv
        from io import StringIO

        reader = csv.DictReader(StringIO(response.text))
        polls = [row for row in reader]

        return polls

    def fetch_rcp_average(self, race: str) -> dict:
        """
        Fetch RealClearPolitics polling averages.

        Note: RCP does not have a public API, so this
        would require scraping (see Section 20.6).
        This is a placeholder demonstrating the interface.
        """
        # In practice, you would scrape the RCP website
        # or use a third-party dataset
        return {
            "race": race,
            "note": "RCP data requires web scraping - see Section 20.6"
        }
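
A quick usage sketch. Column names in the downloaded CSV vary by dataset, so inspect the header before building logic on top of it:

fetcher = PollingDataFetcher()
polls = fetcher.fetch_fivethirtyeight_polls("generic_ballot")
print(f"Downloaded {len(polls)} poll rows")

# Fields such as 'pollster', 'end_date', and 'pct' are typical for
# these datasets but are not guaranteed; check the actual header.
if polls:
    print(sorted(polls[0].keys())[:10])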

20.10.5 Weather Data

class WeatherDataFetcher:
    """
    Fetch weather data for weather-related prediction markets.

    Kalshi, for example, offers markets on temperature extremes,
    precipitation, and hurricane activity.
    """

    def __init__(self):
        self.session = requests.Session()

    def fetch_noaa_forecast(self, latitude: float,
                            longitude: float) -> dict:
        """
        Fetch weather forecast from NOAA's Weather API.

        Parameters
        ----------
        latitude : float
            Location latitude.
        longitude : float
            Location longitude.

        Returns
        -------
        dict
            Forecast data including temperature, precipitation,
            and other meteorological variables.
        """
        # Step 1: Get the grid point
        response = self.session.get(
            f"https://api.weather.gov/points/{latitude},{longitude}",
            headers={"User-Agent": "PredictionMarketResearch/1.0"}
        )
        response.raise_for_status()
        grid_data = response.json()

        # Step 2: Get the forecast
        forecast_url = grid_data["properties"]["forecast"]
        response = self.session.get(
            forecast_url,
            headers={"User-Agent": "PredictionMarketResearch/1.0"}
        )
        response.raise_for_status()

        return response.json()

    def fetch_open_meteo_historical(
        self,
        latitude: float,
        longitude: float,
        start_date: str,
        end_date: str,
        variables: list = None
    ) -> dict:
        """
        Fetch historical weather data from Open-Meteo (free, no API key).

        Parameters
        ----------
        latitude : float
            Location latitude.
        longitude : float
            Location longitude.
        start_date : str
            Start date (YYYY-MM-DD).
        end_date : str
            End date (YYYY-MM-DD).
        variables : list
            Weather variables to fetch. Defaults to temperature
            and precipitation.

        Returns
        -------
        dict
            Historical weather observations.
        """
        if variables is None:
            variables = [
                "temperature_2m_max",
                "temperature_2m_min",
                "precipitation_sum",
                "windspeed_10m_max"
            ]

        params = {
            "latitude": latitude,
            "longitude": longitude,
            "start_date": start_date,
            "end_date": end_date,
            "daily": ",".join(variables),
            "timezone": "UTC"
        }

        response = self.session.get(
            "https://archive-api.open-meteo.com/v1/archive",
            params=params
        )
        response.raise_for_status()
        return response.json()
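
A usage sketch for both weather sources, using Central Park in New York as an illustrative location:

weather = WeatherDataFetcher()

# NWS forecast for Central Park, New York (approx. 40.78 N, 73.97 W)
forecast = weather.fetch_noaa_forecast(40.78, -73.97)
first_period = forecast["properties"]["periods"][0]
print(first_period["name"], first_period["temperature"],
      first_period["shortForecast"])

# Daily highs, lows, and precipitation for the same location last July
history = weather.fetch_open_meteo_historical(
    40.78, -73.97,
    start_date="2024-07-01",
    end_date="2024-07-31"
)
print(history["daily"]["temperature_2m_max"][:5])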

20.11 Legal and Ethical Considerations

20.11.1 Terms of Service Compliance

Every website and API has terms of service (ToS) that govern how their data may be used. Violating these terms can result in legal action, account termination, and reputational damage.

Key principles:

  1. Read the ToS before scraping or using an API. Many platforms explicitly prohibit scraping, automated access, or redistribution of their data.
  2. Respect rate limits. Even if you can technically make more requests, staying within published limits is both polite and protects the platform's infrastructure.
  3. Do not redistribute raw data unless the ToS explicitly permits it. Aggregated analyses and derived datasets are generally safer.
  4. Attribute sources. When publishing research based on platform data, credit the source.

20.11.2 Responsible Scraping Practices

Even when scraping is technically permitted, follow these guidelines; a minimal "polite fetcher" sketch that ties several of the DO items together appears after the lists:

DO:
  - Check robots.txt before scraping
  - Use reasonable delays between requests (2+ seconds)
  - Identify your scraper with a descriptive User-Agent
  - Cache responses to avoid redundant requests
  - Scrape during off-peak hours when possible
  - Contact the site owner if you plan large-scale scraping

DO NOT:
  - Ignore robots.txt directives
  - Make hundreds of requests per second
  - Disguise your scraper as a regular browser
  - Scrape behind login walls without explicit permission
  - Store or redistribute personal user data
  - Overwhelm small websites with traffic
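
To make the DO list concrete, here is a minimal sketch of a polite fetcher that combines the RobotsChecker from Section 20.6.4 with a descriptive User-Agent, a fixed delay, and a simple in-memory cache. The delay value and User-Agent string are illustrative; adjust them to the site you are scraping.

import time
import requests

class PoliteFetcher:
    """Minimal polite HTTP fetcher: robots.txt, User-Agent, delay, cache."""

    def __init__(self,
                 user_agent: str = "PredictionMarketResearch/1.0 (contact@example.com)",
                 min_delay: float = 2.0):
        self.user_agent = user_agent
        self.min_delay = min_delay
        self.robots = RobotsChecker()          # from Section 20.6.4
        self.session = requests.Session()
        self.session.headers["User-Agent"] = user_agent
        self._cache = {}
        self._last_request = 0.0

    def get(self, url: str) -> str:
        if url in self._cache:                 # avoid redundant requests
            return self._cache[url]
        if not self.robots.can_fetch(url, self.user_agent):
            raise PermissionError(f"robots.txt disallows {url}")

        # Honour the larger of our default delay and any Crawl-delay directive
        delay = max(self.min_delay, self.robots.get_crawl_delay(url) or 0)
        wait = delay - (time.time() - self._last_request)
        if wait > 0:
            time.sleep(wait)

        response = self.session.get(url, timeout=30)
        self._last_request = time.time()
        response.raise_for_status()
        self._cache[url] = response.text
        return response.text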

20.11.3 Data Privacy and GDPR

If you collect data that includes personal information (usernames, prediction histories, user profiles), you must consider privacy regulations:

GDPR (General Data Protection Regulation) applies to data about EU residents. Key requirements:

  • Lawful basis: You need a legal justification for processing personal data. Legitimate interest (academic research) may apply, but consult a lawyer.
  • Data minimization: Collect only what you need. If you only need aggregate predictions, do not store individual user data.
  • Right to erasure: If someone asks you to delete their data, you must comply.
  • Data protection by design: Build privacy considerations into your data pipeline from the start.

Practical guidelines for prediction market research:

  1. Prefer aggregate data over individual data. Community predictions are more useful and less sensitive than individual forecasts.
  2. Anonymize user data. If you must store user-level data, replace usernames with hashed identifiers (a short salted-hash sketch follows this list).
  3. Secure your database. Use encryption, access controls, and regular backups.
  4. Document your data practices. Keep a record of what data you collect, why, and how long you retain it.
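
One simple way to implement the anonymization guideline is a salted hash of the username. This is a sketch; the salt handling shown (an environment variable) is an assumption, and you should choose a storage strategy that fits your own threat model.

import hashlib
import os

# Assumed convention: a per-project salt kept out of the published data,
# e.g. in an environment variable. Hashed IDs stay stable across runs
# but cannot be linked back to raw usernames without the salt.
SALT = os.environ.get("ANON_SALT", "change-me")

def anonymize_username(username: str) -> str:
    """Replace a username with a salted, irreversible identifier."""
    digest = hashlib.sha256((SALT + username).encode("utf-8")).hexdigest()
    return digest[:16]   # truncated for readability

print(anonymize_username("some_trader"))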

20.11.4 Platform-Specific Considerations

Platform     API Terms                             Scraping Policy                  Data Redistribution
----------   -----------------------------------   ------------------------------   -----------------------------
Polymarket   Open API, no key required for reads   Generally permissive             On-chain data is public
Kalshi       API key required, rate limits apply   ToS restricts scraping           Prohibited without permission
Metaculus    API available, reasonable use         Robots.txt should be respected   CC-BY licensed for some data
Manifold     Open API, generous rate limits        Generally permissive             Data is open

20.11.5 Ethical Use of Data Advantages

Having better data than other market participants creates a responsibility:

  1. Do not use insider information. If you have access to non-public information about how a market will resolve, trading on it is unethical and potentially illegal.
  2. Do not manipulate markets. Using data to create misleading signals or manipulate prices harms other participants.
  3. Consider systemic effects. If your data pipeline and trading algorithm become large enough to move markets, consider whether your presence improves or degrades market quality.
  4. Share your findings. Consider publishing your research (with appropriate lags) to contribute to the forecasting community's collective knowledge.

20.12 Chapter Summary

This chapter has covered the complete data collection lifecycle for prediction market analysis:

APIs are your primary data source. We built Python clients for four major platforms --- Polymarket, Kalshi, Metaculus, and Manifold Markets --- each with proper authentication, pagination, rate limiting, and error handling. These clients form the foundation of any data collection infrastructure.

Web scraping fills the gaps. When APIs are incomplete, BeautifulSoup handles static HTML and Selenium handles JavaScript-rendered content. Always check robots.txt first and scrape responsibly.

ETL pipelines automate collection. The Extract-Transform-Load pattern separates concerns cleanly: extractors pull raw data, transformers normalize it into a common schema, and loaders persist it to storage. Scheduling with APScheduler or cron keeps your data fresh.

Database design matters. A well-designed relational schema with tables for platforms, markets, price snapshots, trades, and resolutions supports efficient queries and clean data organization. SQLAlchemy provides a Pythonic ORM layer.

Data quality requires active validation. Price range checks, complement verification, timestamp normalization, deduplication, and freshness monitoring catch problems before they corrupt your analyses.

Alternative data sources provide edge. News, economic indicators, polling data, and weather observations supplement market data and can provide informational advantages for forecasting.

Ethics and legality are non-negotiable. Respect terms of service, rate limits, and privacy regulations. The prediction market community benefits when participants act responsibly.

The data infrastructure you build in this chapter is the foundation upon which all subsequent modeling chapters rest. A model is only as good as the data that feeds it.


What's Next

In Chapter 21: Feature Engineering for Prediction Markets, we will transform the raw data collected in this chapter into features suitable for machine learning models. We will cover:

  • Extracting time-series features from price histories (momentum, volatility, mean reversion signals)
  • Computing market microstructure features from order book data
  • Building cross-market features that capture correlations between related questions
  • Creating calendar and event-based features
  • Handling the unique challenges of prediction market feature engineering, including non-stationarity and survivorship bias

The transition from raw data to predictive features is where domain expertise meets data science --- and where the most valuable analytical insights emerge.