Case Study 1: Building a Prediction Market Data Warehouse
Overview
In this case study, you will design and build a SQLite-based data warehouse that collects, normalizes, and stores prediction market data from three platforms: Manifold Markets, Metaculus, and Polymarket. You will implement ETL pipelines for each platform, schedule automated collection, run the system for a simulated week, and analyze the resulting data quality.
This exercise ties together nearly every concept from Chapter 20: API clients, data transformation, database design, pipeline orchestration, scheduling, and data validation.
Objectives
- Design a database schema capable of storing heterogeneous data from three different platforms
- Implement platform-specific extractors and transformers
- Build a unified data pipeline that handles errors gracefully
- Schedule the pipeline to run at regular intervals
- Analyze the collected data for quality issues and completeness
- Produce a data quality report after a simulated week of collection
Part 1: Schema Design
Requirements Analysis
Each platform provides different data structures:
Manifold Markets returns:
- Market ID, question, description, creator username
- Probability (0-1), volume (play money), liquidity
- Creation time (Unix milliseconds), close time
- Outcome type, resolution status, resolution value
- Tags/groups

Metaculus returns:
- Question ID, title, description, categories
- Community prediction (median, quartiles)
- Number of forecasters, comment count
- Publication date, close date, resolution date
- Resolution value, question type

Polymarket returns:
- Condition ID, question, description
- Token IDs for each outcome
- Current prices from the CLOB
- Volume, liquidity
- End date, active status
Unified Schema
Design your schema to accommodate all three platforms while maintaining a consistent structure. The key insight is to separate what changes rarely (market metadata) from what changes frequently (prices, volumes).
-- Core tables
CREATE TABLE platforms (
    id INTEGER PRIMARY KEY,
    name TEXT UNIQUE NOT NULL,
    display_name TEXT,
    api_type TEXT,  -- 'rest', 'graphql', 'blockchain'
    base_url TEXT,
    created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE markets (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    platform_id INTEGER NOT NULL,
    external_id TEXT NOT NULL,
    question TEXT NOT NULL,
    description TEXT,
    category TEXT,
    outcome_type TEXT DEFAULT 'binary',
    status TEXT DEFAULT 'open',  -- open, closed, resolved, cancelled
    resolution TEXT,             -- yes, no, partial, N/A
    created_at TEXT,
    close_date TEXT,
    resolution_date TEXT,
    url TEXT,
    metadata_json TEXT,          -- Platform-specific extra fields
    first_seen TEXT DEFAULT (datetime('now')),
    last_updated TEXT DEFAULT (datetime('now')),
    UNIQUE(platform_id, external_id),
    FOREIGN KEY (platform_id) REFERENCES platforms(id)
);
CREATE TABLE price_snapshots (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    market_id INTEGER NOT NULL,
    yes_price REAL NOT NULL,
    no_price REAL,
    volume REAL,
    liquidity REAL,
    num_forecasters INTEGER,
    fetched_at TEXT NOT NULL DEFAULT (datetime('now')),
    FOREIGN KEY (market_id) REFERENCES markets(id)
);
-- Indexes for common query patterns
CREATE INDEX idx_markets_platform_status ON markets(platform_id, status);
CREATE INDEX idx_markets_category ON markets(category);
CREATE INDEX idx_markets_close_date ON markets(close_date);
CREATE INDEX idx_snapshots_market_time ON price_snapshots(market_id, fetched_at);
CREATE INDEX idx_snapshots_fetched ON price_snapshots(fetched_at);
-- Pipeline monitoring
CREATE TABLE pipeline_runs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    platform_id INTEGER NOT NULL,
    run_type TEXT NOT NULL,  -- 'scheduled', 'manual', 'backfill'
    started_at TEXT NOT NULL,
    completed_at TEXT,
    records_extracted INTEGER DEFAULT 0,
    records_transformed INTEGER DEFAULT 0,
    records_loaded INTEGER DEFAULT 0,
    records_failed INTEGER DEFAULT 0,
    status TEXT NOT NULL,    -- 'running', 'success', 'partial', 'failed'
    error_message TEXT,
    duration_seconds REAL,
    FOREIGN KEY (platform_id) REFERENCES platforms(id)
);
-- Data quality tracking
CREATE TABLE quality_checks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    pipeline_run_id INTEGER NOT NULL,
    check_name TEXT NOT NULL,
    check_result TEXT NOT NULL,  -- 'pass', 'fail', 'warning'
    details TEXT,
    affected_records INTEGER DEFAULT 0,
    total_records INTEGER DEFAULT 0,
    checked_at TEXT DEFAULT (datetime('now')),
    FOREIGN KEY (pipeline_run_id) REFERENCES pipeline_runs(id)
);
Task 1.1
Implement the schema creation as a Python function. Seed the platforms table with entries for Manifold, Metaculus, and Polymarket.
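A minimal sketch of one way to do this, assuming the CREATE statements above are stored in a module-level string named SCHEMA_SQL and noting that the base URLs shown are illustrative placeholders:

import sqlite3

# Seed rows for the platforms table. The IDs match the PLATFORM_ID constants
# used by the transformers later in this case study; URLs are placeholders.
PLATFORM_SEED = [
    (1, "manifold", "Manifold Markets", "rest", "https://api.manifold.markets"),
    (2, "metaculus", "Metaculus", "rest", "https://www.metaculus.com"),
    (3, "polymarket", "Polymarket", "rest", "https://clob.polymarket.com"),
]

def create_warehouse(db_path: str) -> None:
    """Create all warehouse tables and seed the platforms table."""
    conn = sqlite3.connect(db_path)
    try:
        # SCHEMA_SQL holds the CREATE TABLE / CREATE INDEX statements above;
        # add IF NOT EXISTS if you need to rerun this against an existing file.
        conn.executescript(SCHEMA_SQL)
        conn.executemany(
            "INSERT OR IGNORE INTO platforms "
            "(id, name, display_name, api_type, base_url) VALUES (?, ?, ?, ?, ?)",
            PLATFORM_SEED,
        )
        conn.commit()
    finally:
        conn.close()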
Part 2: Implementing Extractors
Manifold Extractor
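The code in this and the following parts assumes a shared set of imports and a module-level logger along the lines of the snippet below (an assumption about module layout; ManifoldClient is the API client built earlier in the chapter):

import json
import logging
import sqlite3
import time
from datetime import datetime, timedelta, timezone

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("warehouse")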
class ManifoldWarehouseExtractor:
    """Extract market data from Manifold Markets."""

    def __init__(self, limit_per_run: int = 500):
        self.client = ManifoldClient()
        self.limit = limit_per_run

    def extract_active_markets(self) -> list:
        """Fetch currently active binary markets."""
        markets = self.client.get_markets(
            limit=self.limit,
            sort="liquidity",
            order="desc"
        )
        # Filter to binary markets only
        return [m for m in markets if m.get("outcomeType") == "BINARY"]

    def extract_recently_resolved(self, hours: int = 24) -> list:
        """Fetch markets that resolved in the last N hours."""
        cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
        cutoff_ms = int(cutoff.timestamp() * 1000)

        all_markets = self.client.get_markets(
            limit=self.limit,
            sort="updated-time",
            order="desc"
        )
        return [
            m for m in all_markets
            if m.get("isResolved")
            and m.get("resolutionTime", 0) >= cutoff_ms
            and m.get("outcomeType") == "BINARY"
        ]
Task 2.1
Implement extractors for Metaculus and Polymarket following the same pattern. Each extractor should:
- Have both extract_active_markets() and extract_recently_resolved() methods
- Handle API errors gracefully with try/except blocks
- Log the number of records extracted
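A skeleton for the Metaculus side showing the expected error-handling and logging pattern; MetaculusClient and its get_questions method are placeholders for whatever client you build, and the Polymarket extractor follows the same shape:

class MetaculusWarehouseExtractor:
    """Extract question data from Metaculus (skeleton)."""

    def __init__(self, limit_per_run: int = 500):
        self.client = MetaculusClient()  # placeholder for your API client
        self.limit = limit_per_run

    def extract_active_markets(self) -> list:
        """Fetch currently open questions; return [] if the API call fails."""
        try:
            questions = self.client.get_questions(status="open", limit=self.limit)
        except Exception as e:
            logger.error(f"Metaculus extraction failed: {e}")
            return []
        logger.info(f"Extracted {len(questions)} open Metaculus questions")
        return questions

    def extract_recently_resolved(self, hours: int = 24) -> list:
        """Fetch questions resolved in the last N hours."""
        try:
            questions = self.client.get_questions(status="resolved", limit=self.limit)
        except Exception as e:
            logger.error(f"Metaculus extraction failed: {e}")
            return []
        cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
        # TODO: filter `questions` on the resolution timestamp field your
        # client exposes, keeping only those resolved after `cutoff`
        logger.info(f"Extracted {len(questions)} resolved Metaculus questions")
        return questions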
Part 3: Implementing Transformers
Each platform returns data in a different format. Transformers normalize this into our unified schema.
class ManifoldWarehouseTransformer:
    """Transform Manifold data into warehouse format."""

    PLATFORM_ID = 1  # Manifold's ID in the platforms table

    def transform_market(self, raw: dict) -> dict:
        """Transform a single Manifold market into warehouse format."""
        probability = raw.get("probability", 0)

        created_at = None
        if raw.get("createdTime"):
            created_at = datetime.fromtimestamp(
                raw["createdTime"] / 1000, tz=timezone.utc
            ).isoformat()

        close_date = None
        if raw.get("closeTime"):
            close_date = datetime.fromtimestamp(
                raw["closeTime"] / 1000, tz=timezone.utc
            ).isoformat()

        resolution_date = None
        if raw.get("resolutionTime"):
            resolution_date = datetime.fromtimestamp(
                raw["resolutionTime"] / 1000, tz=timezone.utc
            ).isoformat()

        return {
            "market": {
                "platform_id": self.PLATFORM_ID,
                "external_id": raw.get("id", ""),
                "question": raw.get("question", ""),
                # Guard against an explicit null description
                "description": (raw.get("textDescription") or "")[:2000],
                "category": (raw.get("groupSlugs") or [None])[0],
                "outcome_type": "binary",
                "status": "resolved" if raw.get("isResolved") else "open",
                "resolution": raw.get("resolution"),
                "created_at": created_at,
                "close_date": close_date,
                "resolution_date": resolution_date,
                "url": raw.get("url", ""),
                "metadata_json": json.dumps({
                    "slug": raw.get("slug"),
                    "creator": raw.get("creatorUsername"),
                    "mechanism": raw.get("mechanism"),
                    "uniqueBettorCount": raw.get("uniqueBettorCount")
                })
            },
            "snapshot": {
                "yes_price": round(probability, 6),
                "no_price": round(1 - probability, 6),
                "volume": raw.get("volume"),
                "liquidity": raw.get("totalLiquidity"),
                "num_forecasters": raw.get("uniqueBettorCount"),
                "fetched_at": datetime.now(timezone.utc).isoformat()
            }
        }

    def transform_batch(self, raw_markets: list) -> list:
        """Transform a batch of markets, skipping failures."""
        results = []
        failed = 0
        for raw in raw_markets:
            try:
                transformed = self.transform_market(raw)
                results.append(transformed)
            except Exception as e:
                failed += 1
                logger.warning(
                    f"Transform failed for Manifold market "
                    f"{raw.get('id', '?')}: {e}"
                )
        logger.info(
            f"Transformed {len(results)}/{len(raw_markets)} "
            f"Manifold markets ({failed} failed)"
        )
        return results
Task 3.1
Implement transformers for Metaculus and Polymarket. Pay special attention to:
- Metaculus community predictions (extract the median from the distribution)
- Polymarket prices (convert from the CLOB format)
- Consistent status mapping across platforms
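For the last point, one option is a small shared lookup that every transformer uses. A sketch follows; the platform-specific keys are illustrative and should be adjusted to the raw status values you actually observe:

# Map raw platform status values onto the warehouse vocabulary:
# open, closed, resolved, cancelled
STATUS_MAP = {
    "metaculus": {"open": "open", "closed": "closed", "resolved": "resolved"},
    "polymarket": {"active": "open", "closed": "closed", "resolved": "resolved"},
}

def map_status(platform: str, raw_status: str) -> str:
    """Return the unified status, defaulting to 'open' for unknown values."""
    mapped = STATUS_MAP.get(platform, {}).get((raw_status or "").lower())
    if mapped is None:
        logger.warning(f"Unknown {platform} status {raw_status!r}; defaulting to 'open'")
        return "open"
    return mapped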
Part 4: Building the Loader
class WarehouseLoader:
    """Load transformed data into the SQLite warehouse."""

    def __init__(self, db_path: str):
        self.db_path = db_path

    def load_batch(self, transformed_records: list) -> dict:
        """
        Load a batch of transformed records.
        Returns summary with counts of inserted, updated, and failed.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        summary = {"inserted": 0, "updated": 0, "failed": 0}

        try:
            for record in transformed_records:
                market_data = record["market"]
                snapshot_data = record["snapshot"]

                # Check whether the market already exists so we can tell
                # inserts from updates after the upsert
                cursor.execute("""
                    SELECT id FROM markets
                    WHERE platform_id = ? AND external_id = ?
                """, (market_data["platform_id"], market_data["external_id"]))
                existing = cursor.fetchone()

                # Upsert market
                cursor.execute("""
                    INSERT INTO markets (
                        platform_id, external_id, question, description,
                        category, outcome_type, status, resolution,
                        created_at, close_date, resolution_date,
                        url, metadata_json, last_updated
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))
                    ON CONFLICT(platform_id, external_id) DO UPDATE SET
                        status = excluded.status,
                        resolution = excluded.resolution,
                        resolution_date = excluded.resolution_date,
                        metadata_json = excluded.metadata_json,
                        last_updated = datetime('now')
                """, (
                    market_data["platform_id"],
                    market_data["external_id"],
                    market_data["question"],
                    market_data.get("description"),
                    market_data.get("category"),
                    market_data.get("outcome_type", "binary"),
                    market_data.get("status", "open"),
                    market_data.get("resolution"),
                    market_data.get("created_at"),
                    market_data.get("close_date"),
                    market_data.get("resolution_date"),
                    market_data.get("url"),
                    market_data.get("metadata_json")
                ))

                # Get market_id for snapshot
                cursor.execute("""
                    SELECT id FROM markets
                    WHERE platform_id = ? AND external_id = ?
                """, (market_data["platform_id"], market_data["external_id"]))
                row = cursor.fetchone()

                if row:
                    market_id = row[0]
                    # Insert price snapshot
                    cursor.execute("""
                        INSERT INTO price_snapshots (
                            market_id, yes_price, no_price,
                            volume, liquidity, num_forecasters, fetched_at
                        ) VALUES (?, ?, ?, ?, ?, ?, ?)
                    """, (
                        market_id,
                        snapshot_data["yes_price"],
                        snapshot_data.get("no_price"),
                        snapshot_data.get("volume"),
                        snapshot_data.get("liquidity"),
                        snapshot_data.get("num_forecasters"),
                        snapshot_data["fetched_at"]
                    ))

                    # Count based on whether the market existed before the upsert
                    if existing is None:
                        summary["inserted"] += 1
                    else:
                        summary["updated"] += 1

            conn.commit()
        except Exception as e:
            conn.rollback()
            summary["failed"] = len(transformed_records)
            logger.error(f"Batch load failed: {e}")
            raise
        finally:
            conn.close()

        return summary
Task 4.1
Add error handling to the loader so that individual record failures do not prevent the rest of the batch from loading. Track failed records and their error messages.
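One possible shape for the solution, assuming the market upsert and snapshot insert above are factored into a hypothetical _load_one helper: give each record its own SAVEPOINT and try/except so a bad record is rolled back and recorded without aborting the batch.

# Drop-in replacement for WarehouseLoader.load_batch (sketch)
def load_batch(self, transformed_records: list) -> dict:
    """Load a batch, isolating failures to individual records."""
    conn = sqlite3.connect(self.db_path)
    cursor = conn.cursor()
    summary = {"inserted": 0, "updated": 0, "failed": 0, "errors": []}
    try:
        for record in transformed_records:
            external_id = record["market"].get("external_id", "?")
            try:
                cursor.execute("SAVEPOINT record_load")
                # _load_one is a hypothetical helper wrapping the upsert and
                # snapshot insert; it returns True if the market was new
                inserted = self._load_one(cursor, record)
                cursor.execute("RELEASE SAVEPOINT record_load")
                summary["inserted" if inserted else "updated"] += 1
            except Exception as e:
                cursor.execute("ROLLBACK TO SAVEPOINT record_load")
                cursor.execute("RELEASE SAVEPOINT record_load")
                summary["failed"] += 1
                summary["errors"].append(
                    {"external_id": external_id, "error": str(e)}
                )
                logger.warning(f"Load failed for {external_id}: {e}")
        conn.commit()
    finally:
        conn.close()
    return summary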
Part 5: Pipeline Orchestration
Wire together the extractors, transformers, and loader into a complete pipeline:
class WarehousePipeline:
    """Orchestrate the full ETL pipeline for the data warehouse."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self.loader = WarehouseLoader(db_path)
        self.sources = []

    def register_source(self, name, extractor, transformer, platform_id):
        """Register a data source."""
        self.sources.append({
            "name": name,
            "extractor": extractor,
            "transformer": transformer,
            "platform_id": platform_id
        })

    def run(self, run_type: str = "scheduled") -> dict:
        """Execute the pipeline for all registered sources."""
        overall_summary = {
            "started_at": datetime.now(timezone.utc).isoformat(),
            "sources": {},
            "total_loaded": 0,
            "total_failed": 0
        }

        for source in self.sources:
            name = source["name"]
            logger.info(f"=== Processing {name} ===")

            # Log pipeline run start
            run_id = self._log_run_start(
                source["platform_id"], run_type
            )

            source_summary = {
                "extracted": 0,
                "transformed": 0,
                "loaded": 0,
                "failed": 0,
                "error": None
            }

            try:
                # Extract
                raw_data = source["extractor"].extract_active_markets()
                source_summary["extracted"] = len(raw_data)

                # Transform
                transformed = source["transformer"].transform_batch(raw_data)
                source_summary["transformed"] = len(transformed)

                # Load (count both new and updated markets as loaded)
                load_result = self.loader.load_batch(transformed)
                source_summary["loaded"] = (
                    load_result["inserted"] + load_result["updated"]
                )
                source_summary["failed"] = load_result["failed"]
                overall_summary["total_loaded"] += source_summary["loaded"]
                overall_summary["total_failed"] += load_result["failed"]

                # Validate
                self._run_quality_checks(run_id, transformed)

                # Log success
                self._log_run_complete(
                    run_id, source_summary, "success"
                )
            except Exception as e:
                source_summary["error"] = str(e)
                overall_summary["total_failed"] += source_summary["extracted"]
                self._log_run_complete(
                    run_id, source_summary, "failed", str(e)
                )
                logger.error(f"Pipeline failed for {name}: {e}")

            overall_summary["sources"][name] = source_summary

        overall_summary["completed_at"] = datetime.now(timezone.utc).isoformat()
        return overall_summary

    def _log_run_start(self, platform_id, run_type):
        """Log the start of a pipeline run."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO pipeline_runs (platform_id, run_type, started_at, status)
            VALUES (?, ?, datetime('now'), 'running')
        """, (platform_id, run_type))
        run_id = cursor.lastrowid
        conn.commit()
        conn.close()
        return run_id

    def _log_run_complete(self, run_id, summary, status, error=None):
        """Log the completion of a pipeline run."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            UPDATE pipeline_runs SET
                completed_at = datetime('now'),
                records_extracted = ?,
                records_transformed = ?,
                records_loaded = ?,
                records_failed = ?,
                status = ?,
                error_message = ?,
                duration_seconds = (
                    julianday(datetime('now')) - julianday(started_at)
                ) * 86400
            WHERE id = ?
        """, (
            summary["extracted"],
            summary["transformed"],
            summary["loaded"],
            summary["failed"],
            status,
            error,
            run_id
        ))
        conn.commit()
        conn.close()

    def _run_quality_checks(self, run_id, records):
        """Run data quality checks and log results."""
        # Implement checks from Section 20.9
        pass
Task 5.1
Complete the _run_quality_checks method. Implement at least five validation checks (price range, yes/no price complement, non-empty question text, plausible timestamps, duplicate detection) and log the results to the quality_checks table.
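A partial sketch covering two of the required checks (price range and yes/no complement); the question-text, timestamp, and duplicate checks follow the same pattern:

# Partial implementation of WarehousePipeline._run_quality_checks (sketch)
def _run_quality_checks(self, run_id, records):
    """Run data quality checks on transformed records and log results."""
    checks = []

    # Check 1: yes_price must lie in [0, 1]
    bad_range = [
        r for r in records
        if not 0.0 <= r["snapshot"]["yes_price"] <= 1.0
    ]
    checks.append(
        ("price_range", "fail" if bad_range else "pass",
         f"{len(bad_range)} snapshots outside [0, 1]", len(bad_range))
    )

    # Check 2: yes + no should sum to roughly 1.0
    bad_sum = [
        r for r in records
        if r["snapshot"].get("no_price") is not None
        and abs(r["snapshot"]["yes_price"] + r["snapshot"]["no_price"] - 1.0) > 0.01
    ]
    checks.append(
        ("price_complement", "warning" if bad_sum else "pass",
         f"{len(bad_sum)} snapshots where yes + no deviates from 1.0", len(bad_sum))
    )

    conn = sqlite3.connect(self.db_path)
    try:
        conn.executemany("""
            INSERT INTO quality_checks
                (pipeline_run_id, check_name, check_result, details,
                 affected_records, total_records)
            VALUES (?, ?, ?, ?, ?, ?)
        """, [
            (run_id, name, result, details, affected, len(records))
            for name, result, details, affected in checks
        ])
        conn.commit()
    finally:
        conn.close()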
Part 6: Simulated Week of Collection
Task 6.1
Set up the scheduled pipeline to run every hour. Since we cannot actually wait a week, simulate it:
def simulate_week(db_path: str, runs_per_day: int = 24):
    """
    Simulate a week of pipeline runs.

    In a real deployment, this would be handled by the scheduler.
    For the case study, we run the pipeline multiple times
    to approximate a week of data collection.
    """
    pipeline = setup_pipeline(db_path)
    total_runs = runs_per_day * 7  # 168 runs for a week

    logger.info(f"Simulating {total_runs} pipeline runs over 7 days")

    for run_number in range(total_runs):
        day = run_number // runs_per_day + 1
        hour = run_number % runs_per_day
        logger.info(f"--- Day {day}, Hour {hour:02d}:00 ---")

        summary = pipeline.run(run_type="scheduled")
        logger.info(
            f"Run {run_number + 1}/{total_runs}: "
            f"loaded={summary['total_loaded']}, "
            f"failed={summary['total_failed']}"
        )

        # Brief pause between runs (in real life this would be 1 hour)
        time.sleep(1)
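simulate_week relies on a setup_pipeline helper that is not shown above; a minimal sketch follows, together with one way to satisfy the hourly requirement of Task 6.1 in a real deployment using the third-party schedule package (any scheduler covered in the chapter works equally well). The platform IDs match the seed data from Task 1.1, and the Metaculus and Polymarket classes are the ones you implement in Tasks 2.1 and 3.1.

import schedule  # third-party package: pip install schedule

def setup_pipeline(db_path: str) -> WarehousePipeline:
    """Register all three sources on a fresh pipeline."""
    pipeline = WarehousePipeline(db_path)
    pipeline.register_source(
        "manifold", ManifoldWarehouseExtractor(),
        ManifoldWarehouseTransformer(), platform_id=1
    )
    pipeline.register_source(
        "metaculus", MetaculusWarehouseExtractor(),
        MetaculusWarehouseTransformer(), platform_id=2
    )
    pipeline.register_source(
        "polymarket", PolymarketWarehouseExtractor(),
        PolymarketWarehouseTransformer(), platform_id=3
    )
    return pipeline

def run_hourly(db_path: str) -> None:
    """Run the pipeline every hour until interrupted (Task 6.1)."""
    pipeline = setup_pipeline(db_path)
    schedule.every().hour.do(pipeline.run, run_type="scheduled")
    while True:
        schedule.run_pending()
        time.sleep(60)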
Task 6.2
After the simulation, generate a comprehensive data quality report:
- Completeness: How many markets from each platform? What percentage of expected snapshots are present?
- Freshness: What is the average and maximum age of the most recent snapshot for each market?
- Consistency: Do yes + no prices sum to approximately 1.0? Are there any negative prices?
- Pipeline Health: What percentage of runs succeeded? What are the most common errors?
- Growth: How does the number of tracked markets change over the simulated week?
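Two example queries to get you started on the completeness and consistency sections (sketches against the schema above; extend them for the remaining dimensions):

COMPLETENESS_SQL = """
    SELECT p.name,
           COUNT(DISTINCT m.id) AS markets,
           COUNT(s.id)          AS snapshots
    FROM platforms p
    LEFT JOIN markets m ON m.platform_id = p.id
    LEFT JOIN price_snapshots s ON s.market_id = m.id
    GROUP BY p.name
"""

CONSISTENCY_SQL = """
    SELECT COUNT(*) AS suspect_snapshots
    FROM price_snapshots
    WHERE yes_price < 0
       OR no_price < 0
       OR (no_price IS NOT NULL
           AND ABS(yes_price + no_price - 1.0) > 0.01)
"""

def print_report_section(db_path: str, title: str, sql: str) -> None:
    """Run one report query and print its rows."""
    conn = sqlite3.connect(db_path)
    try:
        print(f"== {title} ==")
        for row in conn.execute(sql):
            print(row)
    finally:
        conn.close()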
Deliverables
- A working Python script that creates the database, runs the pipeline, and generates the quality report
- The SQLite database file after the simulated week
- A written analysis (500-1000 words) discussing:
  - Which platform had the best data quality and why
  - What data quality issues you encountered
  - How you would improve the pipeline for production use
  - What additional data you would want to collect
Evaluation Criteria
- Schema Design (20%): Appropriate normalization, indexes, constraints
- ETL Implementation (30%): Correct extraction, transformation, and loading with proper error handling
- Data Quality (25%): Comprehensive validation checks and meaningful quality metrics
- Pipeline Robustness (15%): Graceful error handling, logging, monitoring
- Analysis (10%): Insightful interpretation of the collected data