Case Study 1: Building a Prediction Market Data Warehouse

Overview

In this case study, you will design and build a SQLite-based data warehouse that collects, normalizes, and stores prediction market data from three platforms: Manifold Markets, Metaculus, and Polymarket. You will implement ETL pipelines for each platform, schedule automated collection, run the system for a simulated week, and analyze the resulting data quality.

This exercise ties together nearly every concept from Chapter 20: API clients, data transformation, database design, pipeline orchestration, scheduling, and data validation.

Objectives

  1. Design a database schema capable of storing heterogeneous data from three different platforms
  2. Implement platform-specific extractors and transformers
  3. Build a unified data pipeline that handles errors gracefully
  4. Schedule the pipeline to run at regular intervals
  5. Analyze the collected data for quality issues and completeness
  6. Produce a data quality report after a simulated week of collection

Part 1: Schema Design

Requirements Analysis

Each platform provides different data structures:

Manifold Markets returns:

  • Market ID, question, description, creator username
  • Probability (0-1), volume (play money), liquidity
  • Creation time (Unix milliseconds), close time
  • Outcome type, resolution status, resolution value
  • Tags/groups

Metaculus returns:

  • Question ID, title, description, categories
  • Community prediction (median, quartiles)
  • Number of forecasters, comment count
  • Publication date, close date, resolution date
  • Resolution value, question type

Polymarket returns:

  • Condition ID, question, description
  • Token IDs for each outcome
  • Current prices from the CLOB
  • Volume, liquidity
  • End date, active status

Unified Schema

Design your schema to accommodate all three platforms while maintaining a consistent structure. The key insight is to separate what changes rarely (market metadata) from what changes frequently (prices, volumes).

-- Core tables
CREATE TABLE platforms (
    id INTEGER PRIMARY KEY,
    name TEXT UNIQUE NOT NULL,
    display_name TEXT,
    api_type TEXT,         -- 'rest', 'graphql', 'blockchain'
    base_url TEXT,
    created_at TEXT DEFAULT (datetime('now'))
);

CREATE TABLE markets (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    platform_id INTEGER NOT NULL,
    external_id TEXT NOT NULL,
    question TEXT NOT NULL,
    description TEXT,
    category TEXT,
    outcome_type TEXT DEFAULT 'binary',
    status TEXT DEFAULT 'open',   -- open, closed, resolved, cancelled
    resolution TEXT,              -- yes, no, partial, N/A
    created_at TEXT,
    close_date TEXT,
    resolution_date TEXT,
    url TEXT,
    metadata_json TEXT,           -- Platform-specific extra fields
    first_seen TEXT DEFAULT (datetime('now')),
    last_updated TEXT DEFAULT (datetime('now')),
    UNIQUE(platform_id, external_id),
    FOREIGN KEY (platform_id) REFERENCES platforms(id)
);

CREATE TABLE price_snapshots (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    market_id INTEGER NOT NULL,
    yes_price REAL NOT NULL,
    no_price REAL,
    volume REAL,
    liquidity REAL,
    num_forecasters INTEGER,
    fetched_at TEXT NOT NULL DEFAULT (datetime('now')),
    FOREIGN KEY (market_id) REFERENCES markets(id)
);

-- Indexes for common query patterns
CREATE INDEX idx_markets_platform_status ON markets(platform_id, status);
CREATE INDEX idx_markets_category ON markets(category);
CREATE INDEX idx_markets_close_date ON markets(close_date);
CREATE INDEX idx_snapshots_market_time ON price_snapshots(market_id, fetched_at);
CREATE INDEX idx_snapshots_fetched ON price_snapshots(fetched_at);

-- Pipeline monitoring
CREATE TABLE pipeline_runs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    platform_id INTEGER NOT NULL,
    run_type TEXT NOT NULL,          -- 'scheduled', 'manual', 'backfill'
    started_at TEXT NOT NULL,
    completed_at TEXT,
    records_extracted INTEGER DEFAULT 0,
    records_transformed INTEGER DEFAULT 0,
    records_loaded INTEGER DEFAULT 0,
    records_failed INTEGER DEFAULT 0,
    status TEXT NOT NULL,            -- 'running', 'success', 'partial', 'failed'
    error_message TEXT,
    duration_seconds REAL,
    FOREIGN KEY (platform_id) REFERENCES platforms(id)
);

-- Data quality tracking
CREATE TABLE quality_checks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    pipeline_run_id INTEGER NOT NULL,
    check_name TEXT NOT NULL,
    check_result TEXT NOT NULL,      -- 'pass', 'fail', 'warning'
    details TEXT,
    affected_records INTEGER DEFAULT 0,
    total_records INTEGER DEFAULT 0,
    checked_at TEXT DEFAULT (datetime('now')),
    FOREIGN KEY (pipeline_run_id) REFERENCES pipeline_runs(id)
);

Task 1.1

Implement the schema creation as a Python function. Seed the platforms table with entries for Manifold, Metaculus, and Polymarket.
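
A minimal sketch of what this function could look like, assuming the CREATE statements above are collected into a single schema_sql string (the function name and base URLs are illustrative):

import sqlite3


def initialize_warehouse(db_path: str, schema_sql: str) -> None:
    """Create all tables and indexes, then seed the platforms table."""
    conn = sqlite3.connect(db_path)
    try:
        # Run the CREATE TABLE / CREATE INDEX statements from Part 1
        conn.executescript(schema_sql)

        # Seed the three platforms; INSERT OR IGNORE makes re-runs safe
        # because platforms.name is UNIQUE
        conn.executemany("""
            INSERT OR IGNORE INTO platforms (name, display_name, api_type, base_url)
            VALUES (?, ?, ?, ?)
        """, [
            ("manifold", "Manifold Markets", "rest", "https://api.manifold.markets"),
            ("metaculus", "Metaculus", "rest", "https://www.metaculus.com/api2"),
            ("polymarket", "Polymarket", "rest", "https://clob.polymarket.com"),
        ])
        conn.commit()
    finally:
        conn.close()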

Part 2: Implementing Extractors

Manifold Extractor

class ManifoldWarehouseExtractor:
    """Extract market data from Manifold Markets."""

    def __init__(self, limit_per_run: int = 500):
        self.client = ManifoldClient()
        self.limit = limit_per_run

    def extract_active_markets(self) -> list:
        """Fetch currently active binary markets."""
        markets = self.client.get_markets(
            limit=self.limit,
            sort="liquidity",
            order="desc"
        )
        # Keep only open (unresolved) binary markets
        return [
            m for m in markets
            if m.get("outcomeType") == "BINARY" and not m.get("isResolved")
        ]

    def extract_recently_resolved(self, hours: int = 24) -> list:
        """Fetch markets that resolved in the last N hours."""
        cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
        cutoff_ms = int(cutoff.timestamp() * 1000)

        all_markets = self.client.get_markets(
            limit=self.limit,
            sort="updated-time",
            order="desc"
        )

        return [
            m for m in all_markets
            if m.get("isResolved")
            and m.get("resolutionTime", 0) >= cutoff_ms
            and m.get("outcomeType") == "BINARY"
        ]

Task 2.1

Implement extractors for Metaculus and Polymarket following the same pattern. Each extractor should:

  • Have both extract_active_markets() and extract_recently_resolved() methods
  • Handle API errors gracefully with try/except blocks
  • Log the number of records extracted
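
As a starting point, a Metaculus skeleton might look like the sketch below. It assumes a MetaculusClient-style wrapper injected as client with a get_questions() method, and field names such as resolve_time are placeholders to verify against real responses rather than the actual Metaculus API surface:

import logging
from datetime import datetime, timedelta, timezone

logger = logging.getLogger(__name__)


class MetaculusWarehouseExtractor:
    """Extract question data from Metaculus (illustrative sketch)."""

    def __init__(self, client, limit_per_run: int = 200):
        # `client` is assumed to expose get_questions(status=..., limit=...)
        # returning a list of question dicts; adapt to the client you built.
        self.client = client
        self.limit = limit_per_run

    def extract_active_markets(self) -> list:
        """Fetch currently open questions, or [] if the API call fails."""
        try:
            questions = self.client.get_questions(status="open", limit=self.limit)
        except Exception as e:
            logger.error(f"Metaculus extraction failed: {e}")
            return []
        logger.info(f"Extracted {len(questions)} open Metaculus questions")
        return questions

    def extract_recently_resolved(self, hours: int = 24) -> list:
        """Fetch questions that resolved within the last N hours."""
        cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
        try:
            questions = self.client.get_questions(status="resolved", limit=self.limit)
        except Exception as e:
            logger.error(f"Metaculus extraction failed: {e}")
            return []
        # Assumes resolve_time is an ISO-8601 UTC string, so string comparison
        # against the cutoff is equivalent to a date comparison
        recent = [q for q in questions if (q.get("resolve_time") or "") >= cutoff]
        logger.info(f"Extracted {len(recent)} recently resolved Metaculus questions")
        return recent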

Part 3: Implementing Transformers

Each platform returns data in a different format. Transformers normalize this into our unified schema.

class ManifoldWarehouseTransformer:
    """Transform Manifold data into warehouse format."""

    PLATFORM_ID = 1  # Manifold's ID in the platforms table

    def transform_market(self, raw: dict) -> dict:
        """Transform a single Manifold market into warehouse format."""
        probability = raw.get("probability", 0)

        created_at = None
        if raw.get("createdTime"):
            created_at = datetime.fromtimestamp(
                raw["createdTime"] / 1000, tz=timezone.utc
            ).isoformat()

        close_date = None
        if raw.get("closeTime"):
            close_date = datetime.fromtimestamp(
                raw["closeTime"] / 1000, tz=timezone.utc
            ).isoformat()

        resolution_date = None
        if raw.get("resolutionTime"):
            resolution_date = datetime.fromtimestamp(
                raw["resolutionTime"] / 1000, tz=timezone.utc
            ).isoformat()

        return {
            "market": {
                "platform_id": self.PLATFORM_ID,
                "external_id": raw.get("id", ""),
                "question": raw.get("question", ""),
                "description": raw.get("textDescription", "")[:2000],
                "category": (raw.get("groupSlugs") or [None])[0],
                "outcome_type": "binary",
                "status": "resolved" if raw.get("isResolved") else "open",
                "resolution": raw.get("resolution"),
                "created_at": created_at,
                "close_date": close_date,
                "resolution_date": resolution_date,
                "url": raw.get("url", ""),
                "metadata_json": json.dumps({
                    "slug": raw.get("slug"),
                    "creator": raw.get("creatorUsername"),
                    "mechanism": raw.get("mechanism"),
                    "uniqueBettorCount": raw.get("uniqueBettorCount")
                })
            },
            "snapshot": {
                "yes_price": round(probability, 6),
                "no_price": round(1 - probability, 6),
                "volume": raw.get("volume"),
                "liquidity": raw.get("totalLiquidity"),
                "num_forecasters": raw.get("uniqueBettorCount"),
                "fetched_at": datetime.now(timezone.utc).isoformat()
            }
        }

    def transform_batch(self, raw_markets: list) -> list:
        """Transform a batch of markets, skipping failures."""
        results = []
        failed = 0

        for raw in raw_markets:
            try:
                transformed = self.transform_market(raw)
                results.append(transformed)
            except Exception as e:
                failed += 1
                logger.warning(
                    f"Transform failed for Manifold market "
                    f"{raw.get('id', '?')}: {e}"
                )

        logger.info(
            f"Transformed {len(results)}/{len(raw_markets)} "
            f"Manifold markets ({failed} failed)"
        )
        return results

Task 3.1

Implement transformers for Metaculus and Polymarket. Pay special attention to:

  • Metaculus community predictions (extract the median from the distribution)
  • Polymarket prices (convert from the CLOB format)
  • Consistent status mapping across platforms
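
For the Metaculus transformer, two helpers you will likely need are sketched below; the community_prediction / q2 field layout is an assumption about the response shape and should be verified against actual API output:

from typing import Optional

# Unified status vocabulary used by the markets table
STATUS_MAP = {
    "open": "open",
    "active": "open",
    "closed": "closed",
    "resolved": "resolved",
    "cancelled": "cancelled",
}


def extract_community_median(raw: dict) -> Optional[float]:
    """Pull the community median probability out of a raw question dict."""
    prediction = raw.get("community_prediction") or {}
    full = prediction.get("full") or {}
    median = full.get("q2")  # q2 = second quartile = median (assumed layout)
    if median is None:
        return None
    return round(float(median), 6)


def map_status(raw_status: str) -> str:
    """Map a platform-specific status string onto the unified vocabulary."""
    return STATUS_MAP.get((raw_status or "").lower(), "open")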

Part 4: Building the Loader

class WarehouseLoader:
    """Load transformed data into the SQLite warehouse."""

    def __init__(self, db_path: str):
        self.db_path = db_path

    def load_batch(self, transformed_records: list) -> dict:
        """
        Load a batch of transformed records.

        Returns summary with counts of inserted, updated, and failed.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        summary = {"inserted": 0, "updated": 0, "failed": 0}

        try:
            for record in transformed_records:
                market_data = record["market"]
                snapshot_data = record["snapshot"]

                # Check whether this market already exists, so the summary can
                # distinguish inserts from updates (rowcount cannot tell them
                # apart after an upsert)
                cursor.execute("""
                    SELECT 1 FROM markets
                    WHERE platform_id = ? AND external_id = ?
                """, (market_data["platform_id"], market_data["external_id"]))
                already_exists = cursor.fetchone() is not None

                # Upsert market
                cursor.execute("""
                    INSERT INTO markets (
                        platform_id, external_id, question, description,
                        category, outcome_type, status, resolution,
                        created_at, close_date, resolution_date,
                        url, metadata_json, last_updated
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))
                    ON CONFLICT(platform_id, external_id) DO UPDATE SET
                        status = excluded.status,
                        resolution = excluded.resolution,
                        resolution_date = excluded.resolution_date,
                        metadata_json = excluded.metadata_json,
                        last_updated = datetime('now')
                """, (
                    market_data["platform_id"],
                    market_data["external_id"],
                    market_data["question"],
                    market_data.get("description"),
                    market_data.get("category"),
                    market_data.get("outcome_type", "binary"),
                    market_data.get("status", "open"),
                    market_data.get("resolution"),
                    market_data.get("created_at"),
                    market_data.get("close_date"),
                    market_data.get("resolution_date"),
                    market_data.get("url"),
                    market_data.get("metadata_json")
                ))

                # Get market_id for snapshot
                cursor.execute("""
                    SELECT id FROM markets
                    WHERE platform_id = ? AND external_id = ?
                """, (market_data["platform_id"], market_data["external_id"]))

                row = cursor.fetchone()
                if row:
                    market_id = row[0]

                    # Insert price snapshot
                    cursor.execute("""
                        INSERT INTO price_snapshots (
                            market_id, yes_price, no_price,
                            volume, liquidity, num_forecasters, fetched_at
                        ) VALUES (?, ?, ?, ?, ?, ?, ?)
                    """, (
                        market_id,
                        snapshot_data["yes_price"],
                        snapshot_data.get("no_price"),
                        snapshot_data.get("volume"),
                        snapshot_data.get("liquidity"),
                        snapshot_data.get("num_forecasters"),
                        snapshot_data["fetched_at"]
                    ))

                    if already_exists:
                        summary["updated"] += 1
                    else:
                        summary["inserted"] += 1

            conn.commit()

        except Exception as e:
            conn.rollback()
            summary["failed"] = len(transformed_records)
            logger.error(f"Batch load failed: {e}")
            raise

        finally:
            conn.close()

        return summary

Task 4.1

Add error handling to the loader so that individual record failures do not prevent the rest of the batch from loading. Track failed records and their error messages.
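
One way to approach this is to give each record its own try/except and a SQLite savepoint, so a bad record is rolled back on its own while the rest of the batch still commits. A sketch of the revised loop body follows; _upsert_market and _insert_snapshot are hypothetical refactorings of the inline SQL shown above:

# Inside load_batch, replacing the single try/except around the whole batch
summary = {"inserted": 0, "updated": 0, "failed": 0, "errors": []}

for record in transformed_records:
    external_id = record["market"].get("external_id", "?")
    try:
        cursor.execute("SAVEPOINT record_load")       # isolate this record
        self._upsert_market(cursor, record["market"])
        self._insert_snapshot(cursor, record["market"], record["snapshot"])
        cursor.execute("RELEASE SAVEPOINT record_load")
        summary["inserted"] += 1  # splitting inserted vs. updated works as in Part 4
    except Exception as e:
        cursor.execute("ROLLBACK TO SAVEPOINT record_load")
        cursor.execute("RELEASE SAVEPOINT record_load")
        summary["failed"] += 1
        summary["errors"].append({"external_id": external_id, "error": str(e)})
        logger.warning(f"Load failed for {external_id}: {e}")

conn.commit()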

Part 5: Pipeline Orchestration

Wire together the extractors, transformers, and loader into a complete pipeline:

class WarehousePipeline:
    """Orchestrate the full ETL pipeline for the data warehouse."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self.loader = WarehouseLoader(db_path)
        self.sources = []

    def register_source(self, name, extractor, transformer, platform_id):
        """Register a data source."""
        self.sources.append({
            "name": name,
            "extractor": extractor,
            "transformer": transformer,
            "platform_id": platform_id
        })

    def run(self, run_type: str = "scheduled") -> dict:
        """Execute the pipeline for all registered sources."""
        overall_summary = {
            "started_at": datetime.now(timezone.utc).isoformat(),
            "sources": {},
            "total_loaded": 0,
            "total_failed": 0
        }

        for source in self.sources:
            name = source["name"]
            logger.info(f"=== Processing {name} ===")

            # Log pipeline run start
            run_id = self._log_run_start(
                source["platform_id"], run_type
            )

            source_summary = {
                "extracted": 0,
                "transformed": 0,
                "loaded": 0,
                "failed": 0,
                "error": None
            }

            try:
                # Extract
                raw_data = source["extractor"].extract_active_markets()
                source_summary["extracted"] = len(raw_data)

                # Transform
                transformed = source["transformer"].transform_batch(raw_data)
                source_summary["transformed"] = len(transformed)

                # Load
                load_result = self.loader.load_batch(transformed)
                source_summary["loaded"] = load_result["inserted"]
                source_summary["failed"] = load_result["failed"]

                overall_summary["total_loaded"] += load_result["inserted"]
                overall_summary["total_failed"] += load_result["failed"]

                # Validate
                self._run_quality_checks(run_id, transformed)

                # Log success
                self._log_run_complete(
                    run_id, source_summary, "success"
                )

            except Exception as e:
                source_summary["error"] = str(e)
                overall_summary["total_failed"] += source_summary["extracted"]
                self._log_run_complete(
                    run_id, source_summary, "failed", str(e)
                )
                logger.error(f"Pipeline failed for {name}: {e}")

            overall_summary["sources"][name] = source_summary

        overall_summary["completed_at"] = datetime.now(timezone.utc).isoformat()
        return overall_summary

    def _log_run_start(self, platform_id, run_type):
        """Log the start of a pipeline run."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO pipeline_runs (platform_id, run_type, started_at, status)
            VALUES (?, ?, datetime('now'), 'running')
        """, (platform_id, run_type))
        run_id = cursor.lastrowid
        conn.commit()
        conn.close()
        return run_id

    def _log_run_complete(self, run_id, summary, status, error=None):
        """Log the completion of a pipeline run."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            UPDATE pipeline_runs SET
                completed_at = datetime('now'),
                records_extracted = ?,
                records_transformed = ?,
                records_loaded = ?,
                records_failed = ?,
                status = ?,
                error_message = ?,
                duration_seconds = (
                    julianday(datetime('now')) - julianday(started_at)
                ) * 86400
            WHERE id = ?
        """, (
            summary["extracted"],
            summary["transformed"],
            summary["loaded"],
            summary["failed"],
            status,
            error,
            run_id
        ))
        conn.commit()
        conn.close()

    def _run_quality_checks(self, run_id, records):
        """Run data quality checks and log results."""
        # Implement checks from Section 20.9
        pass

Task 5.1

Complete the _run_quality_checks method. Implement at least five validation checks (price range, yes/no price complement, non-empty question text, timestamp sanity, duplicate snapshots) and log the results to the quality_checks table.
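
A partial sketch covering two of the five checks is shown below (the remaining checks follow the same pattern); it runs against the transformed records and writes one row per check to quality_checks:

    # A possible body for WarehousePipeline._run_quality_checks
    def _run_quality_checks(self, run_id, records):
        """Run data quality checks on transformed records and log results."""
        snapshots = [r["snapshot"] for r in records]
        total = len(snapshots)
        checks = []

        # Check 1: yes_price must lie in [0, 1]
        out_of_range = sum(
            1 for s in snapshots if not (0.0 <= s["yes_price"] <= 1.0)
        )
        checks.append(("price_range", out_of_range, "yes_price outside [0, 1]"))

        # Check 2: yes + no should sum to ~1.0 when both prices are present
        bad_complement = sum(
            1 for s in snapshots
            if s.get("no_price") is not None
            and abs(s["yes_price"] + s["no_price"] - 1.0) > 0.01
        )
        checks.append(("price_complement", bad_complement, "yes + no deviates from 1.0"))

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        for name, affected, details in checks:
            result = "pass" if affected == 0 else "fail"
            cursor.execute("""
                INSERT INTO quality_checks (
                    pipeline_run_id, check_name, check_result,
                    details, affected_records, total_records
                ) VALUES (?, ?, ?, ?, ?, ?)
            """, (run_id, name, result, details, affected, total))
        conn.commit()
        conn.close()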

Part 6: Simulated Week of Collection

Task 6.1

Set up the scheduled pipeline to run every hour. Since we cannot actually wait a week, simulate it:

def simulate_week(db_path: str, runs_per_day: int = 24):
    """
    Simulate a week of pipeline runs.

    In a real deployment, this would be handled by the scheduler.
    For the case study, we run the pipeline multiple times
    to approximate a week of data collection.
    """
    pipeline = setup_pipeline(db_path)

    total_runs = runs_per_day * 7  # 168 runs for a week
    logger.info(f"Simulating {total_runs} pipeline runs over 7 days")

    for run_number in range(total_runs):
        day = run_number // runs_per_day + 1
        hour = run_number % runs_per_day

        logger.info(f"--- Day {day}, Hour {hour:02d}:00 ---")

        summary = pipeline.run(run_type="scheduled")

        logger.info(
            f"Run {run_number + 1}/{total_runs}: "
            f"loaded={summary['total_loaded']}, "
            f"failed={summary['total_failed']}"
        )

        # Brief pause between runs (in real life this would be 1 hour)
        time.sleep(1)
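
For reference, a real hourly deployment could look like the sketch below, here using the third-party schedule package (any scheduling approach from the chapter works equally well; run_hourly is an illustrative name):

import logging
import time

import schedule  # third-party: pip install schedule

logger = logging.getLogger(__name__)


def run_hourly(db_path: str):
    """Run the warehouse pipeline once per hour until interrupted."""
    pipeline = setup_pipeline(db_path)

    def job():
        summary = pipeline.run(run_type="scheduled")
        logger.info(
            f"Hourly run: loaded={summary['total_loaded']}, "
            f"failed={summary['total_failed']}"
        )

    schedule.every().hour.do(job)  # register the job to run every hour

    while True:
        schedule.run_pending()
        time.sleep(30)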

Task 6.2

After the simulation, generate a comprehensive data quality report:

  1. Completeness: How many markets from each platform? What percentage of expected snapshots are present?
  2. Freshness: What is the average and maximum age of the most recent snapshot for each market?
  3. Consistency: Do yes + no prices sum to approximately 1.0? Are there any negative prices?
  4. Pipeline Health: What percentage of runs succeeded? What are the most common errors?
  5. Growth: How does the number of tracked markets change over the simulated week?
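
A few starter queries for the consistency and pipeline-health sections are sketched below, assuming the schema from Part 1; the other sections can be computed the same way:

import sqlite3

# Consistency: negative prices and yes/no pairs that do not sum to ~1.0
CONSISTENCY_SQL = """
    SELECT
        COUNT(*) AS snapshots,
        SUM(CASE WHEN yes_price < 0 OR no_price < 0 THEN 1 ELSE 0 END) AS negative_prices,
        SUM(CASE WHEN no_price IS NOT NULL
                 AND ABS(yes_price + no_price - 1.0) > 0.01
                 THEN 1 ELSE 0 END) AS bad_complements
    FROM price_snapshots
"""

# Pipeline health: share of runs by final status
PIPELINE_HEALTH_SQL = """
    SELECT
        status,
        COUNT(*) AS runs,
        ROUND(100.0 * COUNT(*) / (SELECT COUNT(*) FROM pipeline_runs), 1) AS pct
    FROM pipeline_runs
    GROUP BY status
"""


def report_section(db_path: str, sql: str) -> list:
    """Run one report query and return its rows."""
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute(sql).fetchall()
    finally:
        conn.close()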

Deliverables

  1. A working Python script that creates the database, runs the pipeline, and generates the quality report
  2. The SQLite database file after the simulated week
  3. A written analysis (500-1000 words) discussing:
     • Which platform had the best data quality and why
     • What data quality issues you encountered
     • How you would improve the pipeline for production use
     • What additional data you would want to collect

Evaluation Criteria

  • Schema Design (20%): Appropriate normalization, indexes, constraints
  • ETL Implementation (30%): Correct extraction, transformation, and loading with proper error handling
  • Data Quality (25%): Comprehensive validation checks and meaningful quality metrics
  • Pipeline Robustness (15%): Graceful error handling, logging, monitoring
  • Analysis (10%): Insightful interpretation of the collected data