Learning Objectives

  • Architect a complete prediction market trading system from scratch
  • Build data pipelines that ingest from multiple prediction market APIs
  • Train and deploy ensemble ML models for probability estimation
  • Implement Kelly-criterion-based portfolio construction
  • Create a backtesting framework with realistic transaction cost modeling
  • Deploy a live trading bot with order execution and position management
  • Build a risk management module with circuit breakers and exposure limits
  • Construct a monitoring dashboard with real-time alerting
  • Understand deployment, operations, and regulatory considerations

Chapter 42: Capstone — Building a Complete Trading System

"The whole is greater than the sum of its parts." — Aristotle

You have arrived at the final chapter. Over the course of 41 chapters, you have studied probability theory, market mechanics, scoring rules, trading strategies, machine learning, market design, blockchain technology, and regulatory frameworks. Each chapter gave you a piece of the puzzle. Now it is time to assemble those pieces into a single, cohesive, production-grade prediction market trading system.

This capstone is not a toy project. By the end of this chapter, you will have built a system that can:

  • Ingest data from multiple prediction market platforms in real time
  • Engineer features and train calibrated ensemble models
  • Generate trading signals with proper position sizing
  • Execute trades through platform APIs
  • Manage risk with hard limits and circuit breakers
  • Monitor everything through a live dashboard with alerts

We will reference specific concepts from earlier chapters throughout, so keep those mental bookmarks ready. Let us begin.


42.1 System Architecture Overview

Every serious trading system starts with architecture. Before writing a single line of trading logic, you need a blueprint that defines how data flows, where decisions are made, and how components communicate.

42.1.1 High-Level Architecture

Our system follows a layered pipeline architecture. Data flows from left to right, with feedback loops for monitoring and risk management:

+------------------------------------------------------------------+
|                 PREDICTION MARKET TRADING SYSTEM                  |
+------------------------------------------------------------------+
|                                                                  |
|  +-----------+    +----------+    +----------+    +-----------+  |
|  |   DATA    |--->|  FEATURE |--->| STRATEGY |--->|   ORDER   |  |
|  |  PIPELINE |    |  ENGINE  |    |  ENGINE  |    | EXECUTION |  |
|  +-----------+    +----------+    +----------+    +-----------+  |
|        |               |               |                |        |
|        v               v               v                v        |
|  +-----------+    +----------+    +----------+    +-----------+  |
|  |   DATA    |    |   MODEL  |    | PORTFOLIO|    |  POSITION |  |
|  |   STORE   |    | REGISTRY |    |  STATE   |    |  TRACKER  |  |
|  +-----------+    +----------+    +----------+    +-----------+  |
|        |               |               |                |        |
|        +---------------+-------+-------+----------------+        |
|                                |                                 |
|                        +-------v-------+                         |
|                        |  RISK MANAGER |                         |
|                        +-------+-------+                         |
|                                |                                 |
|                        +-------v-------+                         |
|                        |   MONITORING  |                         |
|                        |   DASHBOARD   |                         |
|                        +---------------+                         |
+------------------------------------------------------------------+

This architecture reflects the separation of concerns we discussed in Chapter 4 (Market Mechanics) and the modular design principles from Chapter 34 (Blockchain Basics). Each module has a single responsibility, communicates through well-defined interfaces, and can be tested independently.

42.1.2 Design Principles

Our system adheres to several core principles:

  1. Modularity: Each component is a standalone module with clear inputs and outputs. You can swap the model engine without touching the data pipeline.

  2. Idempotency: Every operation can be safely retried. If the system crashes mid-execution, restarting it will not create duplicate orders or corrupt state.

  3. Observability: Every decision the system makes is logged with full context. When a trade goes wrong, you can trace exactly why.

  4. Fail-Safe Defaults: When in doubt, the system does nothing. Network timeout? Skip the cycle. Model returns NaN? Hold position. Uncertainty too high? Reduce size. (A minimal guard implementing this idea is sketched just after this list.)

  5. Regulatory Awareness: As covered in Chapters 38-39, prediction market trading exists in a complex legal landscape. Our system includes compliance hooks at every layer.
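
To make principle 4 concrete, here is a minimal sketch of such a guard. The function name and placement are illustrative rather than part of the system we build below, where the same checks live inside the strategy engine:

import math
from typing import Optional

def safe_probability(raw: Optional[float]) -> Optional[float]:
    """Guard a model output before it reaches position sizing.

    Returns None — meaning "do nothing this cycle" — for missing or
    non-finite values, and clips everything else into [0.01, 0.99].
    """
    if raw is None or not math.isfinite(raw):
        return None  # Fail-safe default: no valid prediction, no trade
    return min(max(raw, 0.01), 0.99)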

42.1.3 Technology Stack

+-------------------+------------------------------------------+
| Component         | Technology                               |
+-------------------+------------------------------------------+
| Language          | Python 3.11+                             |
| Data Storage      | SQLite (dev) / PostgreSQL (prod)         |
| Task Scheduling   | APScheduler / cron                       |
| ML Framework      | scikit-learn + XGBoost                   |
| HTTP Client       | httpx (async)                            |
| Configuration     | YAML + environment variables             |
| Logging           | Python logging + structlog               |
| Monitoring        | Custom dashboard + email/Slack alerts    |
| Deployment        | Docker + systemd                         |
+-------------------+------------------------------------------+

42.1.4 Configuration Management

All system parameters live in a YAML configuration file. Hardcoded values are the enemy of maintainability:

import yaml
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class DatabaseConfig:
    """Database connection configuration."""
    url: str = "sqlite:///trading_system.db"
    pool_size: int = 5
    echo: bool = False

@dataclass
class TradingConfig:
    """Core trading parameters."""
    max_position_size: float = 1000.0      # Maximum USD per position
    max_portfolio_exposure: float = 10000.0 # Maximum total exposure
    kelly_fraction: float = 0.25           # Fractional Kelly (Ch 15)
    min_edge_threshold: float = 0.03       # Minimum edge to trade
    max_markets: int = 50                  # Maximum concurrent markets
    rebalance_interval_minutes: int = 60   # How often to rebalance

@dataclass
class RiskConfig:
    """Risk management parameters."""
    max_daily_loss: float = 500.0          # Daily stop-loss
    max_drawdown_pct: float = 0.15         # Maximum drawdown before halt
    position_limit_pct: float = 0.10       # Max % of portfolio in one market
    correlation_limit: float = 0.70        # Max correlation between positions
    circuit_breaker_cooldown: int = 3600   # Seconds to pause after trigger

@dataclass
class SystemConfig:
    """Top-level system configuration."""
    database: DatabaseConfig = field(default_factory=DatabaseConfig)
    trading: TradingConfig = field(default_factory=TradingConfig)
    risk: RiskConfig = field(default_factory=RiskConfig)
    log_level: str = "INFO"
    dry_run: bool = True                   # Paper trading by default

def load_config(path: str = "config.yaml") -> SystemConfig:
    """Load configuration from YAML file with environment overrides."""
    with open(path, "r") as f:
        raw = yaml.safe_load(f) or {}
    # Build config with defaults for missing keys
    return SystemConfig(
        database=DatabaseConfig(**raw.get("database", {})),
        trading=TradingConfig(**raw.get("trading", {})),
        risk=RiskConfig(**raw.get("risk", {})),
        log_level=raw.get("log_level", "INFO"),
        dry_run=raw.get("dry_run", True),
    )

Notice the dry_run=True default. This is a fail-safe: you must explicitly enable live trading. We learned this lesson from the risk management principles in Chapter 16 (Risk Management Essentials).
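
For reference, a config.yaml for this loader might look like the sketch below. All keys are optional — anything omitted falls back to the dataclass defaults, and the values shown are illustrative:

trading:
  max_position_size: 500.0
  kelly_fraction: 0.25
  min_edge_threshold: 0.05
risk:
  max_daily_loss: 250.0
  max_drawdown_pct: 0.10
log_level: INFO
dry_run: true   # Set to false explicitly — and deliberately — for live trading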


42.2 Data Pipeline

The data pipeline is the foundation of everything. Bad data leads to bad models leads to bad trades. As the saying goes: garbage in, garbage out.

42.2.1 API Client Layer

We need clients for multiple prediction market platforms. Recall from Chapter 7 (Order Books and AMMs) and Chapter 8 (Platform Survey) that different platforms have different architectures — some use order books, others use AMMs, and the APIs reflect these differences.

import httpx
import asyncio
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

logger = logging.getLogger(__name__)

@dataclass
class MarketSnapshot:
    """Standardized market data across all platforms.

    This normalization layer is critical — it lets our strategy engine
    treat all platforms uniformly, regardless of their internal mechanics.
    (See Chapter 8 for platform-specific differences.)
    """
    platform: str
    market_id: str
    question: str
    category: str
    yes_price: float           # Current YES price (0-1)
    no_price: float            # Current NO price (0-1)
    spread: float              # Bid-ask spread
    volume_24h: float          # 24-hour volume in USD
    liquidity: float           # Available liquidity in USD
    end_date: Optional[datetime] = None
    resolution_source: Optional[str] = None
    timestamp: Optional[datetime] = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.utcnow()
        # The implied probability is the midpoint (Ch 3, Ch 9)
        self.implied_probability = (self.yes_price + (1.0 - self.no_price)) / 2.0


class BasePlatformClient(ABC):
    """Abstract base class for all platform API clients.

    Each platform client normalizes raw API data into MarketSnapshot
    objects. This follows the Adapter pattern — the rest of our system
    never needs to know about platform-specific data formats.
    """

    def __init__(self, base_url: str, api_key: Optional[str] = None):
        self.base_url = base_url
        self.api_key = api_key
        self._client = httpx.AsyncClient(
            base_url=base_url,
            timeout=30.0,
            headers={"Authorization": f"Bearer {api_key}"} if api_key else {},
        )

    @abstractmethod
    async def fetch_markets(self) -> list[MarketSnapshot]:
        """Fetch all active markets from the platform."""
        ...

    @abstractmethod
    async def fetch_market(self, market_id: str) -> MarketSnapshot:
        """Fetch a single market by ID."""
        ...

    @abstractmethod
    async def get_order_book(self, market_id: str) -> dict:
        """Fetch order book depth for a market."""
        ...

    async def close(self):
        """Clean up HTTP client resources."""
        await self._client.aclose()


class PolymarketClient(BasePlatformClient):
    """Client for the Polymarket CLOB API.

    Polymarket uses a Central Limit Order Book (CLOB) as discussed
    in Chapter 7. Prices are determined by the best bid/ask, and
    liquidity is provided by market makers and retail traders.

    The platform operates on Polygon (Ch 35-36) and settles trades
    using conditional tokens (Ch 34).
    """

    def __init__(self, api_key: Optional[str] = None):
        super().__init__(
            base_url="https://clob.polymarket.com",
            api_key=api_key,
        )

    async def fetch_markets(self) -> list[MarketSnapshot]:
        """Fetch all active Polymarket markets."""
        markets = []
        next_cursor = None
        while True:
            params = {"active": "true", "limit": 100}
            if next_cursor:
                params["next_cursor"] = next_cursor
            response = await self._client.get("/markets", params=params)
            response.raise_for_status()
            data = response.json()
            for raw in data.get("data", []):
                try:
                    snapshot = self._parse_market(raw)
                    if snapshot is not None:
                        markets.append(snapshot)
                except (KeyError, ValueError) as e:
                    logger.warning(f"Skipping malformed market: {e}")
            next_cursor = data.get("next_cursor")
            if not next_cursor:
                break
        logger.info(f"Fetched {len(markets)} active markets from Polymarket")
        return markets

    async def fetch_market(self, market_id: str) -> MarketSnapshot:
        """Fetch a single Polymarket market."""
        response = await self._client.get(f"/markets/{market_id}")
        response.raise_for_status()
        return self._parse_market(response.json())

    async def get_order_book(self, market_id: str) -> dict:
        """Fetch order book for a Polymarket market.

        Returns bids and asks with price levels and sizes.
        This is the raw order book data from Chapter 7.
        """
        response = await self._client.get(
            f"/book", params={"token_id": market_id}
        )
        response.raise_for_status()
        return response.json()

    def _parse_market(self, raw: dict) -> Optional[MarketSnapshot]:
        """Parse raw API response into a MarketSnapshot."""
        tokens = raw.get("tokens", [])
        if len(tokens) < 2:
            return None
        yes_token = next((t for t in tokens if t.get("outcome") == "Yes"), None)
        no_token = next((t for t in tokens if t.get("outcome") == "No"), None)
        if not yes_token or not no_token:
            return None
        yes_price = float(yes_token.get("price", 0.5))
        no_price = float(no_token.get("price", 0.5))
        return MarketSnapshot(
            platform="polymarket",
            market_id=raw["condition_id"],
            question=raw.get("question", ""),
            category=raw.get("category", "unknown"),
            yes_price=yes_price,
            no_price=no_price,
            spread=abs(yes_price + no_price - 1.0),  # Parity deviation as a spread proxy
            volume_24h=float(raw.get("volume_24hr", 0)),
            liquidity=float(raw.get("liquidity", 0)),
            end_date=datetime.fromisoformat(raw["end_date_iso"])
            if raw.get("end_date_iso") else None,
            resolution_source=raw.get("resolution_source"),
        )


class MetaculusClient(BasePlatformClient):
    """Client for the Metaculus API.

    Metaculus uses a different model — continuous probability distributions
    rather than binary order books (Ch 8). We discretize their community
    median into a point estimate for our system.
    """

    def __init__(self, api_key: Optional[str] = None):
        super().__init__(
            base_url="https://www.metaculus.com/api2",
            api_key=api_key,
        )

    async def fetch_markets(self) -> list[MarketSnapshot]:
        """Fetch active Metaculus questions."""
        markets = []
        response = await self._client.get(
            "/questions/",
            params={"status": "open", "type": "binary", "limit": 200},
        )
        response.raise_for_status()
        data = response.json()
        for raw in data.get("results", []):
            try:
                snapshot = self._parse_question(raw)
                if snapshot is not None:
                    markets.append(snapshot)
            except (KeyError, ValueError) as e:
                logger.warning(f"Skipping malformed question: {e}")
        logger.info(f"Fetched {len(markets)} active questions from Metaculus")
        return markets

    async def fetch_market(self, market_id: str) -> MarketSnapshot:
        """Fetch a single Metaculus question."""
        response = await self._client.get(f"/questions/{market_id}/")
        response.raise_for_status()
        return self._parse_question(response.json())

    async def get_order_book(self, market_id: str) -> dict:
        """Metaculus does not have an order book — return empty."""
        return {"bids": [], "asks": []}

    def _parse_question(self, raw: dict) -> Optional[MarketSnapshot]:
        """Parse a Metaculus question into a MarketSnapshot."""
        community_prediction = raw.get("community_prediction", {})
        prob = community_prediction.get("full", {}).get("q2")
        if prob is None:
            return None
        return MarketSnapshot(
            platform="metaculus",
            market_id=str(raw["id"]),
            question=raw.get("title", ""),
            category=raw.get("group", {}).get("name", "unknown")
            if raw.get("group") else "unknown",
            yes_price=float(prob),
            no_price=1.0 - float(prob),
            spread=0.0,  # No spread on Metaculus
            volume_24h=float(raw.get("number_of_predictions", 0)),
            liquidity=0.0,  # Not applicable
            end_date=datetime.fromisoformat(raw["resolve_time"])
            if raw.get("resolve_time") else None,
            resolution_source=raw.get("resolution_criteria", ""),
        )

42.2.2 Data Aggregator

The aggregator pulls from all configured platforms and normalizes the data into a unified stream. This is where the concepts from Chapter 8 (Platform Survey) become practical — we need to handle the differences between CLOB platforms, AMM platforms, and forecasting platforms:

class DataAggregator:
    """Aggregates market data from multiple platforms into a unified stream.

    This is the single entry point for all market data in the system.
    It handles platform-specific quirks and normalizes everything into
    MarketSnapshot objects.
    """

    def __init__(self, clients: list[BasePlatformClient]):
        self.clients = clients
        self._cache: dict[str, MarketSnapshot] = {}

    async def fetch_all_markets(self) -> list[MarketSnapshot]:
        """Fetch markets from all platforms concurrently."""
        tasks = [client.fetch_markets() for client in self.clients]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        all_markets = []
        for client, result in zip(self.clients, results):
            if isinstance(result, Exception):
                logger.error(
                    f"Failed to fetch from {client.__class__.__name__}: {result}"
                )
                continue
            all_markets.extend(result)
        # Update cache
        for market in all_markets:
            cache_key = f"{market.platform}:{market.market_id}"
            self._cache[cache_key] = market
        logger.info(f"Aggregated {len(all_markets)} markets from "
                     f"{len(self.clients)} platforms")
        return all_markets

    def get_cached(self, platform: str, market_id: str) -> Optional[MarketSnapshot]:
        """Retrieve a market from the local cache."""
        return self._cache.get(f"{platform}:{market_id}")
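
Wiring the aggregator is then a few lines. A minimal sketch, assuming the two client classes above and no API keys:

async def refresh_markets() -> list[MarketSnapshot]:
    """Fetch a unified market list from both example platforms."""
    aggregator = DataAggregator([PolymarketClient(), MetaculusClient()])
    markets = await aggregator.fetch_all_markets()
    for client in aggregator.clients:
        await client.close()  # Release HTTP connections
    return markets

markets = asyncio.run(refresh_markets())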

42.2.3 Data Storage

Raw data must be persisted for backtesting (Chapter 17), model training, and audit trails. We use SQLite for development and PostgreSQL for production, accessed through a thin abstraction layer:

import sqlite3
import json
from contextlib import contextmanager

class DataStore:
    """Persistent storage for market snapshots, trades, and model outputs.

    Uses SQLite for simplicity (swap to PostgreSQL for production).
    Snapshot writes are idempotent — inserting the same
    (platform, market_id, timestamp) twice replaces the existing
    record rather than creating a duplicate.
    """

    def __init__(self, db_path: str = "trading_system.db"):
        self.db_path = db_path
        self._init_schema()

    def _init_schema(self):
        """Create tables if they do not exist."""
        with self._connect() as conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS market_snapshots (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    platform TEXT NOT NULL,
                    market_id TEXT NOT NULL,
                    question TEXT,
                    yes_price REAL,
                    no_price REAL,
                    spread REAL,
                    implied_probability REAL,
                    volume_24h REAL,
                    liquidity REAL,
                    timestamp TEXT NOT NULL,
                    raw_json TEXT,
                    UNIQUE(platform, market_id, timestamp)
                );

                CREATE TABLE IF NOT EXISTS trades (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    platform TEXT NOT NULL,
                    market_id TEXT NOT NULL,
                    side TEXT NOT NULL,
                    quantity REAL NOT NULL,
                    price REAL NOT NULL,
                    timestamp TEXT NOT NULL,
                    order_id TEXT,
                    status TEXT DEFAULT 'pending',
                    pnl REAL DEFAULT 0.0
                );

                CREATE TABLE IF NOT EXISTS model_predictions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    platform TEXT NOT NULL,
                    market_id TEXT NOT NULL,
                    model_name TEXT NOT NULL,
                    predicted_probability REAL NOT NULL,
                    confidence REAL,
                    features_json TEXT,
                    timestamp TEXT NOT NULL
                );

                CREATE TABLE IF NOT EXISTS portfolio_state (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    total_value REAL NOT NULL,
                    cash REAL NOT NULL,
                    positions_json TEXT NOT NULL,
                    daily_pnl REAL DEFAULT 0.0,
                    drawdown REAL DEFAULT 0.0
                );

                CREATE INDEX IF NOT EXISTS idx_snapshots_market
                    ON market_snapshots(platform, market_id);
                CREATE INDEX IF NOT EXISTS idx_snapshots_time
                    ON market_snapshots(timestamp);
                CREATE INDEX IF NOT EXISTS idx_trades_market
                    ON trades(platform, market_id);
            """)

    @contextmanager
    def _connect(self):
        """Context manager for database connections."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def save_snapshot(self, snapshot: MarketSnapshot):
        """Save a market snapshot, updating if it already exists."""
        with self._connect() as conn:
            conn.execute("""
                INSERT OR REPLACE INTO market_snapshots
                (platform, market_id, question, yes_price, no_price,
                 spread, implied_probability, volume_24h, liquidity, timestamp)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                snapshot.platform, snapshot.market_id, snapshot.question,
                snapshot.yes_price, snapshot.no_price, snapshot.spread,
                snapshot.implied_probability, snapshot.volume_24h,
                snapshot.liquidity, snapshot.timestamp.isoformat(),
            ))

    def save_trade(self, trade: dict):
        """Record a trade execution."""
        with self._connect() as conn:
            conn.execute("""
                INSERT INTO trades
                (platform, market_id, side, quantity, price,
                 timestamp, order_id, status)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                trade["platform"], trade["market_id"], trade["side"],
                trade["quantity"], trade["price"],
                trade["timestamp"], trade.get("order_id", ""),
                trade.get("status", "pending"),
            ))

    def get_historical_prices(
        self, platform: str, market_id: str, limit: int = 1000
    ) -> list[dict]:
        """Retrieve historical price snapshots for a market."""
        with self._connect() as conn:
            rows = conn.execute("""
                SELECT * FROM market_snapshots
                WHERE platform = ? AND market_id = ?
                ORDER BY timestamp DESC LIMIT ?
            """, (platform, market_id, limit)).fetchall()
        return [dict(row) for row in rows]
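
A short usage sketch, persisting the aggregated snapshots from the previous section and reading one market's history back (assumes the fetch returned at least one market):

store = DataStore("trading_system.db")
for snapshot in markets:   # `markets` from the aggregator sketch above
    store.save_snapshot(snapshot)

first = markets[0]
history = store.get_historical_prices(first.platform, first.market_id)
print(f"{len(history)} snapshots stored for: {first.question[:60]}")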

42.2.4 Supplementary Data Sources

Prediction markets do not exist in a vacuum. To build good models, we need external data — news, polls, economic indicators. Recall from Chapter 21 (Feature Engineering) that the best features often come from outside the market itself:

class SupplementaryDataFetcher:
    """Fetches external data to enrich market features.

    Combines news sentiment, polling data, and other external signals.
    These are the 'alternative data' sources from Chapter 21.
    """

    def __init__(self, news_api_key: Optional[str] = None):
        self._client = httpx.AsyncClient(timeout=30.0)
        self.news_api_key = news_api_key

    async def fetch_news_sentiment(self, query: str) -> dict:
        """Fetch news articles and compute aggregate sentiment."""
        # In production, use a news API like NewsAPI, GDELT, or similar.
        # NewsAPI requires an API key; without one, the request fails and
        # we fall back to neutral sentiment below.
        response = await self._client.get(
            "https://newsapi.org/v2/everything",
            params={
                "q": query,
                "sortBy": "publishedAt",
                "pageSize": 20,
                "apiKey": self.news_api_key or "",
            },
        )
        if response.status_code != 200:
            return {"sentiment_score": 0.0, "article_count": 0}
        articles = response.json().get("articles", [])
        # Simple sentiment: ratio of positive to negative keywords
        positive = sum(
            1 for a in articles
            if any(w in (a.get("title", "") + a.get("description", "")).lower()
                   for w in ["surge", "win", "gain", "positive", "strong"])
        )
        negative = sum(
            1 for a in articles
            if any(w in (a.get("title", "") + a.get("description", "")).lower()
                   for w in ["crash", "lose", "decline", "negative", "weak"])
        )
        total = max(positive + negative, 1)
        return {
            "sentiment_score": (positive - negative) / total,
            "article_count": len(articles),
            "positive_count": positive,
            "negative_count": negative,
        }

    async def close(self):
        """Clean up HTTP client."""
        await self._client.aclose()
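
Usage is straightforward — note that the NewsAPI key below is a placeholder you must supply yourself:

async def market_sentiment(question: str) -> dict:
    """Fetch aggregate news sentiment for a market question."""
    fetcher = SupplementaryDataFetcher(news_api_key="YOUR_NEWSAPI_KEY")
    try:
        return await fetcher.fetch_news_sentiment(question)
    finally:
        await fetcher.close()

sentiment = asyncio.run(market_sentiment("Federal Reserve rate cut"))
print(sentiment["sentiment_score"], sentiment["article_count"])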

42.3 Feature Engineering and Model Training

With data flowing in, we can now build the intelligence layer. This section draws heavily on the ML chapters: Chapter 20 (ML Fundamentals), Chapter 21 (Feature Engineering), Chapter 22 (Calibration), and Chapter 23 (Ensemble Methods).

42.3.1 Feature Engineering

Good features are the difference between a model that works and one that does not. We engineer features at three levels: market-level, cross-market, and temporal:

import numpy as np
from dataclasses import dataclass

@dataclass
class FeatureVector:
    """Engineered features for a single market at a point in time.

    Each feature is motivated by concepts from earlier chapters:
    - Price features: Chapter 3 (probability), Chapter 7 (order books)
    - Volume features: Chapter 13 (market signals)
    - Time features: Chapter 14 (temporal patterns)
    - Cross-market features: Chapter 19 (portfolio strategies)
    """
    # Price-based features (Ch 3, Ch 7)
    current_price: float
    price_momentum_1h: float      # Price change over last hour
    price_momentum_24h: float     # Price change over last 24 hours
    price_volatility_24h: float   # Standard deviation of hourly prices
    bid_ask_spread: float         # Current spread (Ch 7)
    price_vs_vwap: float          # Price relative to volume-weighted avg

    # Volume features (Ch 13)
    volume_24h: float
    volume_change_pct: float      # Volume change vs previous period
    liquidity_depth: float        # Available liquidity in USD

    # Time features (Ch 14)
    days_to_resolution: float     # Days until market resolves
    market_age_days: float        # How long the market has been open
    time_decay_factor: float      # Exponential decay as resolution nears

    # Sentiment features (Ch 21)
    news_sentiment: float         # Aggregate news sentiment score
    news_volume: float            # Number of recent articles

    # Cross-market features (Ch 19)
    platform_consensus: float     # Average price across platforms
    platform_divergence: float    # Std dev of prices across platforms
    category_avg_price: float     # Average price for same-category markets

    def to_array(self) -> np.ndarray:
        """Convert to numpy array for model input."""
        return np.array([
            self.current_price, self.price_momentum_1h,
            self.price_momentum_24h, self.price_volatility_24h,
            self.bid_ask_spread, self.price_vs_vwap,
            self.volume_24h, self.volume_change_pct, self.liquidity_depth,
            self.days_to_resolution, self.market_age_days,
            self.time_decay_factor,
            self.news_sentiment, self.news_volume,
            self.platform_consensus, self.platform_divergence,
            self.category_avg_price,
        ])

    @staticmethod
    def feature_names() -> list[str]:
        """Return ordered feature names for model interpretation."""
        return [
            "current_price", "price_momentum_1h",
            "price_momentum_24h", "price_volatility_24h",
            "bid_ask_spread", "price_vs_vwap",
            "volume_24h", "volume_change_pct", "liquidity_depth",
            "days_to_resolution", "market_age_days",
            "time_decay_factor",
            "news_sentiment", "news_volume",
            "platform_consensus", "platform_divergence",
            "category_avg_price",
        ]


class FeatureEngineer:
    """Transforms raw market data into feature vectors.

    Implements the feature engineering pipeline from Chapter 21.
    Features are designed to capture the key signals that predict
    whether a market is mispriced.
    """

    def __init__(self, data_store: DataStore):
        self.data_store = data_store

    def compute_features(
        self,
        snapshot: MarketSnapshot,
        all_snapshots: list[MarketSnapshot],
        news_data: dict,
    ) -> FeatureVector:
        """Compute the full feature vector for a market."""
        history = self.data_store.get_historical_prices(
            snapshot.platform, snapshot.market_id, limit=168  # 7 days hourly
        )

        # Price features
        prices = [h["yes_price"] for h in history] if history else [snapshot.yes_price]
        volumes = [h["volume_24h"] for h in history] if history else [0.0]

        momentum_1h = (snapshot.yes_price - prices[0]) if len(prices) > 0 else 0.0
        momentum_24h = (snapshot.yes_price - prices[min(23, len(prices) - 1)])
        volatility = float(np.std(prices[:24])) if len(prices) > 1 else 0.0

        # VWAP calculation (Ch 13)
        total_volume = sum(volumes[:24]) or 1.0
        vwap = sum(p * v for p, v in zip(prices[:24], volumes[:24])) / total_volume

        # Time features
        days_to_res = 365.0  # Default if no end date
        if snapshot.end_date:
            delta = snapshot.end_date - datetime.utcnow()
            days_to_res = max(delta.total_seconds() / 86400.0, 0.001)
        time_decay = np.exp(-0.05 * days_to_res)

        # Cross-market features
        same_category = [
            s for s in all_snapshots if s.category == snapshot.category
        ]
        category_avg = (
            np.mean([s.yes_price for s in same_category])
            if same_category else snapshot.yes_price
        )

        # Platform consensus: same question across platforms
        same_question = [
            s for s in all_snapshots
            if s.market_id != snapshot.market_id
            and self._questions_match(s.question, snapshot.question)
        ]
        if same_question:
            consensus_prices = [s.yes_price for s in same_question]
            consensus_prices.append(snapshot.yes_price)
            consensus = float(np.mean(consensus_prices))
            divergence = float(np.std(consensus_prices))
        else:
            consensus = snapshot.yes_price
            divergence = 0.0

        return FeatureVector(
            current_price=snapshot.yes_price,
            price_momentum_1h=momentum_1h,
            price_momentum_24h=momentum_24h,
            price_volatility_24h=volatility,
            bid_ask_spread=snapshot.spread,
            price_vs_vwap=snapshot.yes_price - vwap,
            volume_24h=snapshot.volume_24h,
            volume_change_pct=(
                (snapshot.volume_24h - volumes[0]) / max(volumes[0], 1.0)
                if volumes else 0.0
            ),
            liquidity_depth=snapshot.liquidity,
            days_to_resolution=days_to_res,
            market_age_days=len(history) / 24.0,
            time_decay_factor=time_decay,
            news_sentiment=news_data.get("sentiment_score", 0.0),
            news_volume=float(news_data.get("article_count", 0)),
            platform_consensus=consensus,
            platform_divergence=divergence,
            category_avg_price=category_avg,
        )

    @staticmethod
    def _questions_match(q1: str, q2: str) -> bool:
        """Simple heuristic to check if two questions are about the same event."""
        words1 = set(q1.lower().split())
        words2 = set(q2.lower().split())
        overlap = len(words1 & words2)
        total = min(len(words1), len(words2))
        return (overlap / max(total, 1)) > 0.6

42.3.2 Model Training: Ensemble Approach

Following Chapter 23 (Ensemble Methods), we combine a logistic regression model (for interpretability and calibration) with an XGBoost model (for capturing nonlinear patterns). The ensemble prediction is a weighted average:

from sklearn.linear_model import LogisticRegression
from sklearn.calibration import CalibratedClassifierCV
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import brier_score_loss, log_loss
import xgboost as xgb
import pickle

class EnsembleModel:
    """Ensemble of logistic regression and XGBoost for probability estimation.

    The ensemble combines:
    1. Logistic Regression — well-calibrated baseline (Ch 20, Ch 22)
    2. XGBoost — captures nonlinear interactions (Ch 23)

    Final prediction is a weighted average. The weights are fixed
    hyperparameters here (0.4/0.6 by default); in practice, tune them
    on a validation set using Brier score (Ch 9, Ch 12).
    """

    def __init__(self, lr_weight: float = 0.4, xgb_weight: float = 0.6):
        self.lr_weight = lr_weight
        self.xgb_weight = xgb_weight
        self.lr_model = CalibratedClassifierCV(
            LogisticRegression(max_iter=1000, C=0.1),
            cv=3, method="isotonic"  # Isotonic calibration (Ch 22)
        )
        self.xgb_model = xgb.XGBClassifier(
            n_estimators=200,
            max_depth=5,
            learning_rate=0.05,
            subsample=0.8,
            colsample_bytree=0.8,
            reg_alpha=0.1,
            reg_lambda=1.0,
            eval_metric="logloss",
        )
        self._is_trained = False

    def train(self, X: np.ndarray, y: np.ndarray) -> dict:
        """Train both models using time-series cross-validation.

        We use TimeSeriesSplit (not random splits) because our data
        is temporal — using future data to predict the past would
        introduce look-ahead bias (Ch 17, Ch 24).

        Returns a dictionary of evaluation metrics.
        """
        tscv = TimeSeriesSplit(n_splits=5)
        lr_scores, xgb_scores, ensemble_scores = [], [], []

        for train_idx, val_idx in tscv.split(X):
            X_train, X_val = X[train_idx], X[val_idx]
            y_train, y_val = y[train_idx], y[val_idx]

            # Train logistic regression
            self.lr_model.fit(X_train, y_train)
            lr_probs = self.lr_model.predict_proba(X_val)[:, 1]
            lr_scores.append(brier_score_loss(y_val, lr_probs))

            # Train XGBoost
            self.xgb_model.fit(
                X_train, y_train,
                eval_set=[(X_val, y_val)],
                verbose=False,
            )
            xgb_probs = self.xgb_model.predict_proba(X_val)[:, 1]
            xgb_scores.append(brier_score_loss(y_val, xgb_probs))

            # Ensemble
            ensemble_probs = (
                self.lr_weight * lr_probs + self.xgb_weight * xgb_probs
            )
            ensemble_scores.append(brier_score_loss(y_val, ensemble_probs))

        # Final training on all data
        self.lr_model.fit(X, y)
        self.xgb_model.fit(X, y, verbose=False)
        self._is_trained = True

        metrics = {
            "lr_brier_mean": float(np.mean(lr_scores)),
            "xgb_brier_mean": float(np.mean(xgb_scores)),
            "ensemble_brier_mean": float(np.mean(ensemble_scores)),
            "lr_brier_std": float(np.std(lr_scores)),
            "xgb_brier_std": float(np.std(xgb_scores)),
            "ensemble_brier_std": float(np.std(ensemble_scores)),
            "n_samples": len(y),
            "n_features": X.shape[1],
        }
        logger.info(f"Model training complete: {metrics}")
        return metrics

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Generate calibrated probability estimates.

        Returns the ensemble probability for each input.
        Probabilities are clipped to [0.01, 0.99] to avoid
        extreme predictions that could cause Kelly sizing issues (Ch 15).
        """
        if not self._is_trained:
            raise RuntimeError("Model must be trained before prediction")
        lr_probs = self.lr_model.predict_proba(X)[:, 1]
        xgb_probs = self.xgb_model.predict_proba(X)[:, 1]
        ensemble = self.lr_weight * lr_probs + self.xgb_weight * xgb_probs
        return np.clip(ensemble, 0.01, 0.99)

    def save(self, path: str):
        """Serialize the trained model to disk."""
        with open(path, "wb") as f:
            pickle.dump({
                "lr_model": self.lr_model,
                "xgb_model": self.xgb_model,
                "lr_weight": self.lr_weight,
                "xgb_weight": self.xgb_weight,
            }, f)

    @classmethod
    def load(cls, path: str) -> "EnsembleModel":
        """Load a trained model from disk."""
        with open(path, "rb") as f:
            data = pickle.load(f)
        model = cls(data["lr_weight"], data["xgb_weight"])
        model.lr_model = data["lr_model"]
        model.xgb_model = data["xgb_model"]
        model._is_trained = True
        return model
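
Training and persisting the ensemble then looks like the sketch below, where X is a (n_samples, 17) matrix of FeatureVector.to_array() rows, y holds the resolved binary outcomes, and X_live is a batch of current features — all three are assumed to have been assembled from the data store:

model = EnsembleModel()
metrics = model.train(X, y)   # X, y: historical features and outcomes
print(f"Ensemble Brier: {metrics['ensemble_brier_mean']:.4f}")
model.save("ensemble_v1.pkl")

# Later, in the live loop:
model = EnsembleModel.load("ensemble_v1.pkl")
probs = model.predict(X_live)  # Calibrated probabilities in [0.01, 0.99]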

42.3.3 Calibration Verification

A model that outputs probabilities of 70% should be correct about 70% of the time. This is calibration, the topic of Chapter 9 (Scoring Rules) and Chapter 12 (Calibration Training). We verify calibration before deploying any model:

def verify_calibration(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    n_bins: int = 10,
) -> dict:
    """Verify model calibration using reliability diagram data.

    A well-calibrated model has points close to the diagonal.
    We compute the Expected Calibration Error (ECE) from Ch 12.

    Returns bin-level calibration data and summary statistics.
    """
    bin_edges = np.linspace(0, 1, n_bins + 1)
    bin_data = []
    total_ece = 0.0

    for i in range(n_bins):
        mask = (y_pred >= bin_edges[i]) & (y_pred < bin_edges[i + 1])
        if i == n_bins - 1:
            # Make the last bin inclusive so predictions of exactly 1.0 count
            mask = (y_pred >= bin_edges[i]) & (y_pred <= bin_edges[i + 1])
        if not np.any(mask):
            continue
        bin_pred = y_pred[mask].mean()
        bin_true = y_true[mask].mean()
        bin_count = int(mask.sum())
        bin_data.append({
            "bin_lower": float(bin_edges[i]),
            "bin_upper": float(bin_edges[i + 1]),
            "avg_predicted": float(bin_pred),
            "avg_actual": float(bin_true),
            "count": bin_count,
            "calibration_error": abs(float(bin_pred - bin_true)),
        })
        total_ece += abs(bin_pred - bin_true) * bin_count

    ece = total_ece / len(y_true) if len(y_true) > 0 else 0.0
    brier = float(brier_score_loss(y_true, y_pred))

    return {
        "expected_calibration_error": ece,
        "brier_score": brier,
        "n_bins": n_bins,
        "n_samples": len(y_true),
        "bins": bin_data,
    }
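
A quick sanity check on synthetic data: if outcomes are drawn at exactly the predicted rate, the ECE should be near zero.

rng = np.random.default_rng(42)
y_pred = rng.uniform(0.05, 0.95, size=10_000)
# Draw each outcome with probability equal to its prediction
y_true = (rng.uniform(size=10_000) < y_pred).astype(float)

report = verify_calibration(y_true, y_pred)
print(f"ECE:   {report['expected_calibration_error']:.4f}")  # Close to 0
print(f"Brier: {report['brier_score']:.4f}")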

42.4 Strategy Engine

The strategy engine takes model predictions and converts them into trading decisions. This is where probability estimation meets position sizing meets portfolio construction — the core of Chapters 13-19.

42.4.1 Edge Calculation

First, we identify where our model disagrees with the market. The difference between our estimated probability and the market price is our "edge":

@dataclass
class TradingSignal:
    """A trading signal generated by the strategy engine.

    Combines model predictions with market prices to compute
    edge and optimal position size using Kelly criterion (Ch 15).
    """
    platform: str
    market_id: str
    question: str
    market_price: float            # Current market price
    model_probability: float       # Our estimated probability
    edge: float                    # model_prob - market_price
    kelly_fraction: float          # Optimal Kelly bet fraction
    recommended_size: float        # Dollar amount to trade
    side: str                      # "YES" or "NO"
    confidence: float              # Model confidence in the prediction
    expected_value: float          # Expected profit per dollar risked

    @property
    def is_actionable(self) -> bool:
        """Whether this signal should generate an order."""
        # The 0.03 floor mirrors TradingConfig.min_edge_threshold's default;
        # the strategy engine also enforces the configured value.
        return abs(self.edge) > 0.03 and self.recommended_size > 0


class StrategyEngine:
    """Converts model predictions into trading signals.

    Implements the Kelly criterion (Ch 15) for position sizing and
    the portfolio construction techniques from Chapter 19.
    """

    def __init__(self, config: TradingConfig):
        self.config = config

    def generate_signals(
        self,
        snapshots: list[MarketSnapshot],
        predictions: dict[str, float],
        current_positions: dict[str, float],
        portfolio_value: float,
    ) -> list[TradingSignal]:
        """Generate trading signals for all markets.

        For each market where we have a prediction, compute the edge
        and optimal position size. Filter out signals below our
        minimum edge threshold.
        """
        signals = []
        for snapshot in snapshots:
            key = f"{snapshot.platform}:{snapshot.market_id}"
            if key not in predictions:
                continue

            model_prob = predictions[key]
            market_price = snapshot.yes_price

            # Compute edge (Ch 13)
            edge = model_prob - market_price

            # Determine trade side
            if edge > 0:
                side = "YES"
                win_prob = model_prob
                win_payout = 1.0 / market_price - 1.0
                loss_amount = 1.0
            else:
                side = "NO"
                win_prob = 1.0 - model_prob
                win_payout = 1.0 / (1.0 - market_price) - 1.0
                loss_amount = 1.0
                edge = -edge  # Make edge positive

            # Kelly criterion (Ch 15):
            # f* = (p * b - q) / b
            # where p = win probability, b = odds, q = 1 - p
            b = win_payout
            p = win_prob
            q = 1.0 - p
            kelly_full = (p * b - q) / b if b > 0 else 0.0
            kelly_full = max(kelly_full, 0.0)

            # Fractional Kelly for risk reduction (Ch 15, Ch 16)
            kelly_adjusted = kelly_full * self.config.kelly_fraction

            # Position sizing
            # Size is the smaller of the Kelly stake and the hard dollar cap;
            # the per-market percentage cap is applied later by the
            # PortfolioConstructor using RiskConfig.position_limit_pct.
            max_by_kelly = kelly_adjusted * portfolio_value
            max_by_limit = self.config.max_position_size
            recommended = min(max_by_kelly, max_by_limit)

            # Adjust for existing position
            current = current_positions.get(key, 0.0)
            recommended = recommended - abs(current)
            recommended = max(recommended, 0.0)

            # Expected value per dollar
            ev = p * win_payout - q * loss_amount

            signal = TradingSignal(
                platform=snapshot.platform,
                market_id=snapshot.market_id,
                question=snapshot.question,
                market_price=market_price,
                model_probability=model_prob,
                edge=edge,
                kelly_fraction=kelly_adjusted,
                recommended_size=recommended,
                side=side,
                confidence=abs(model_prob - 0.5) * 2,  # Distance from 50/50, scaled to [0, 1]
                expected_value=ev,
            )
            # Enforce the configured minimum edge threshold
            if signal.is_actionable and signal.edge >= self.config.min_edge_threshold:
                signals.append(signal)

        # Sort by expected value (best opportunities first)
        signals.sort(key=lambda s: s.expected_value, reverse=True)
        # Limit to max number of markets
        signals = signals[:self.config.max_markets]
        return signals
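
To make the sizing math concrete, suppose the market prices YES at 0.40 while our model says 0.48 — an eight-point edge. The numbers below are illustrative:

# Worked example of the Kelly sizing above
market_price, model_prob, bankroll = 0.40, 0.48, 10_000.0
b = 1.0 / market_price - 1.0              # Net odds: 1.5
p, q = model_prob, 1.0 - model_prob
kelly_full = (p * b - q) / b              # (0.72 - 0.52) / 1.5 ≈ 0.133
stake = kelly_full * 0.25 * bankroll      # Quarter Kelly ≈ $333
print(f"Full Kelly: {kelly_full:.3f}, stake: ${stake:.0f}")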

42.4.2 Portfolio Construction

Individual position sizing is not enough. We need to consider the portfolio as a whole, accounting for correlations between markets (Chapter 19):

class PortfolioConstructor:
    """Constructs a diversified portfolio from individual signals.

    Implements the portfolio optimization concepts from Chapter 19,
    including correlation-based position limits and sector exposure caps.
    """

    def __init__(self, config: TradingConfig, risk_config: RiskConfig):
        self.config = config
        self.risk_config = risk_config

    def optimize_portfolio(
        self,
        signals: list[TradingSignal],
        portfolio_value: float,
        current_positions: dict[str, float],
    ) -> list[TradingSignal]:
        """Apply portfolio-level constraints to trading signals.

        Steps:
        1. Check total exposure does not exceed limit
        2. Apply per-position size limits
        3. Cap per-category exposure as a coarse proxy for correlation limits
        4. Return the adjusted signal list
        """
        remaining_capacity = (
            self.config.max_portfolio_exposure
            - sum(abs(v) for v in current_positions.values())
        )
        optimized = []
        total_allocated = 0.0
        categories_seen: dict[str, float] = {}

        for signal in signals:
            # Per-position limit
            max_position = min(
                signal.recommended_size,
                portfolio_value * self.risk_config.position_limit_pct,
            )
            # Category diversification — cap exposure per category.
            # Using the question's first word is a crude placeholder;
            # in production, carry MarketSnapshot.category on the signal.
            category = signal.question.split()[0] if signal.question else "unknown"
            category_exposure = categories_seen.get(category, 0.0)
            max_category = portfolio_value * 0.30  # 30% per category
            max_position = min(max_position, max_category - category_exposure)
            max_position = max(max_position, 0.0)

            # Total exposure limit
            if total_allocated + max_position > remaining_capacity:
                max_position = max(remaining_capacity - total_allocated, 0.0)

            if max_position > 10.0:  # Minimum trade size
                adjusted_signal = TradingSignal(
                    platform=signal.platform,
                    market_id=signal.market_id,
                    question=signal.question,
                    market_price=signal.market_price,
                    model_probability=signal.model_probability,
                    edge=signal.edge,
                    kelly_fraction=signal.kelly_fraction,
                    recommended_size=max_position,
                    side=signal.side,
                    confidence=signal.confidence,
                    expected_value=signal.expected_value,
                )
                optimized.append(adjusted_signal)
                total_allocated += max_position
                categories_seen[category] = category_exposure + max_position

        logger.info(
            f"Portfolio optimization: {len(optimized)} positions, "
            f"${total_allocated:.2f} total exposure"
        )
        return optimized

42.5 Backtesting Framework

Before risking real money, we must validate our strategy on historical data. Chapter 17 (Backtesting) taught us the pitfalls: look-ahead bias, survivorship bias, and overfitting. Our backtesting framework is designed to avoid all of these:

from dataclasses import field

@dataclass
class BacktestResult:
    """Results of a backtest simulation.

    Captures all the metrics needed to evaluate a strategy,
    following the framework from Chapter 17.
    """
    total_return: float
    annualized_return: float
    sharpe_ratio: float
    max_drawdown: float
    win_rate: float
    profit_factor: float
    total_trades: int
    avg_trade_pnl: float
    avg_holding_period_hours: float
    brier_score: float
    calibration_error: float
    daily_returns: list[float] = field(default_factory=list)
    equity_curve: list[float] = field(default_factory=list)
    trade_log: list[dict] = field(default_factory=list)


class BacktestEngine:
    """Simulates strategy performance on historical data.

    Key design decisions to avoid common backtesting mistakes (Ch 17):
    1. Walk-forward: Model is retrained periodically, never on future data
    2. Transaction costs: All trades incur realistic fees and slippage
    3. Liquidity constraints: Cannot trade more than available liquidity
    4. Fill simulation: Orders fill at realistic prices, not idealized ones
    """

    def __init__(
        self,
        data_store: DataStore,
        strategy_engine: StrategyEngine,
        model: EnsembleModel,
        feature_engineer: FeatureEngineer,
        fee_rate: float = 0.02,       # 2% round-trip transaction cost
        slippage_bps: float = 50.0,   # 50 basis points of slippage
    ):
        self.data_store = data_store
        self.strategy = strategy_engine
        self.model = model
        self.feature_engineer = feature_engineer
        self.fee_rate = fee_rate
        self.slippage_bps = slippage_bps

    def run(
        self,
        start_date: str,
        end_date: str,
        initial_capital: float = 10000.0,
        retrain_interval_days: int = 30,
    ) -> BacktestResult:
        """Run the full backtest simulation.

        Iterates through historical data day by day, generating
        signals and simulating trade execution with realistic
        transaction costs and fill assumptions.
        """
        portfolio_value = initial_capital
        cash = initial_capital
        positions: dict[str, dict] = {}  # market_key -> {side, size, entry_price}
        equity_curve = [initial_capital]
        daily_returns = []
        trade_log = []
        all_predictions = []
        all_outcomes = []

        # In a real implementation, iterate through historical timestamps
        # For illustration, we show the core backtest loop structure:
        logger.info(
            f"Starting backtest: {start_date} to {end_date}, "
            f"capital=${initial_capital}"
        )

        # Simulated daily loop (simplified for illustration)
        prev_value = initial_capital
        n_days = 252  # Approximate trading days in a year

        for day in range(n_days):
            # 1. Get market snapshots for this day
            # (In production, load from data_store by timestamp)
            snapshots = []  # Would load historical data here

            # 2. Check for resolved markets and close positions
            for key in list(positions.keys()):
                pos = positions[key]
                # Simulate resolution with some probability
                if np.random.random() < 0.01:  # ~1% resolve per day
                    outcome = np.random.random() < pos.get("model_prob", 0.5)
                    pnl = self._calculate_pnl(pos, outcome)
                    cash += pos["size"] + pnl
                    trade_log.append({
                        "day": day,
                        "market": key,
                        "side": pos["side"],
                        "entry_price": pos["entry_price"],
                        "pnl": pnl,
                        "outcome": outcome,
                    })
                    all_predictions.append(pos.get("model_prob", 0.5))
                    all_outcomes.append(float(outcome))
                    del positions[key]

            # 3. Generate new signals (skip if model not trained)
            # In production: compute features, run model, generate signals

            # 4. Update portfolio value
            position_value = sum(p["size"] for p in positions.values())
            portfolio_value = cash + position_value
            equity_curve.append(portfolio_value)

            daily_ret = (portfolio_value - prev_value) / prev_value
            daily_returns.append(daily_ret)
            prev_value = portfolio_value

        # Compute summary statistics
        returns_arr = np.array(daily_returns)
        total_return = (portfolio_value - initial_capital) / initial_capital
        ann_return = (1 + total_return) ** (252 / max(n_days, 1)) - 1
        sharpe = (
            np.mean(returns_arr) / np.std(returns_arr) * np.sqrt(252)
            if np.std(returns_arr) > 0 else 0.0
        )

        # Maximum drawdown
        peak = initial_capital
        max_dd = 0.0
        for val in equity_curve:
            peak = max(peak, val)
            dd = (peak - val) / peak
            max_dd = max(max_dd, dd)

        # Win rate and profit factor
        wins = [t for t in trade_log if t["pnl"] > 0]
        losses = [t for t in trade_log if t["pnl"] <= 0]
        win_rate = len(wins) / max(len(trade_log), 1)
        gross_profit = sum(t["pnl"] for t in wins)
        gross_loss = abs(sum(t["pnl"] for t in losses)) or 1.0
        profit_factor = gross_profit / gross_loss

        # Calibration metrics (Ch 9, Ch 12)
        brier = (
            brier_score_loss(all_outcomes, all_predictions)
            if all_predictions else 0.0
        )

        return BacktestResult(
            total_return=total_return,
            annualized_return=ann_return,
            sharpe_ratio=sharpe,
            max_drawdown=max_dd,
            win_rate=win_rate,
            profit_factor=profit_factor,
            total_trades=len(trade_log),
            avg_trade_pnl=(
                np.mean([t["pnl"] for t in trade_log]) if trade_log else 0.0
            ),
            avg_holding_period_hours=0.0,  # Computed from actual timestamps
            brier_score=brier,
            calibration_error=0.0,
            daily_returns=daily_returns,
            equity_curve=equity_curve,
            trade_log=trade_log,
        )

    def _calculate_pnl(self, position: dict, outcome: bool) -> float:
        """Calculate profit/loss for a resolved position.

        Accounts for transaction costs and slippage (Ch 17).
        """
        entry_price = position["entry_price"]
        size = position["size"]
        side = position["side"]

        if side == "YES":
            if outcome:
                gross_pnl = size * (1.0 / entry_price - 1.0)
            else:
                gross_pnl = -size
        else:  # NO
            if not outcome:
                gross_pnl = size * (1.0 / (1.0 - entry_price) - 1.0)
            else:
                gross_pnl = -size

        # Subtract transaction costs
        fees = size * self.fee_rate
        slippage = size * self.slippage_bps / 10000.0
        net_pnl = gross_pnl - fees - slippage
        return net_pnl
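
Running the backtest and reading the headline numbers is then a sketch like the one below — store, config, and the saved model file come from the earlier sections, and the date range is illustrative:

engine = BacktestEngine(
    data_store=store,
    strategy_engine=StrategyEngine(config.trading),
    model=EnsembleModel.load("ensemble_v1.pkl"),
    feature_engineer=FeatureEngineer(store),
)
result = engine.run("2024-01-01", "2024-12-31", initial_capital=10_000.0)
print(f"Return: {result.total_return:+.1%}  Sharpe: {result.sharpe_ratio:.2f}")
print(f"Max DD: {result.max_drawdown:.1%}   Win rate: {result.win_rate:.1%}")
print(f"Brier:  {result.brier_score:.4f} over {result.total_trades} trades")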

42.6 Live Trading Bot

With a backtested strategy, we can now build the live trading bot. This is where theory meets reality. The bot must handle network failures, partial fills, stale data, and all the other challenges of live markets that Chapter 18 (Paper to Live Trading) warned us about.

42.6.1 Order Execution

from enum import Enum
from typing import Optional

class OrderStatus(Enum):
    """Lifecycle of an order."""
    PENDING = "pending"
    SUBMITTED = "submitted"
    PARTIALLY_FILLED = "partially_filled"
    FILLED = "filled"
    CANCELLED = "cancelled"
    REJECTED = "rejected"
    EXPIRED = "expired"

@dataclass
class Order:
    """Represents a single order in the system."""
    order_id: str
    platform: str
    market_id: str
    side: str           # "YES" or "NO"
    quantity: float     # Number of shares
    price: float        # Limit price
    order_type: str     # "limit" or "market"
    status: OrderStatus = OrderStatus.PENDING
    filled_quantity: float = 0.0
    filled_price: float = 0.0
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    def __post_init__(self):
        now = datetime.utcnow()
        if self.created_at is None:
            self.created_at = now
        if self.updated_at is None:
            self.updated_at = now


class OrderExecutor:
    """Handles order submission, tracking, and lifecycle management.

    Implements the execution strategies from Chapter 18:
    - Limit orders to control slippage
    - Order timeout and cancellation
    - Retry logic for transient failures
    - Position reconciliation
    """

    def __init__(
        self,
        clients: dict[str, BasePlatformClient],
        data_store: DataStore,
        dry_run: bool = True,
    ):
        self.clients = clients
        self.data_store = data_store
        self.dry_run = dry_run
        self.active_orders: dict[str, Order] = {}
        self._order_counter = 0

    async def submit_order(self, signal: TradingSignal) -> Order:
        """Submit an order based on a trading signal.

        In dry_run mode, simulates execution without sending to the platform.
        In live mode, submits through the platform API.
        """
        self._order_counter += 1
        order_id = f"ORD-{self._order_counter:06d}"

        # Calculate limit price with slippage buffer
        if signal.side == "YES":
            limit_price = min(signal.market_price + 0.01, 0.99)
        else:
            limit_price = max(signal.market_price - 0.01, 0.01)

        # Compute quantity from dollar size and price
        quantity = signal.recommended_size / limit_price

        order = Order(
            order_id=order_id,
            platform=signal.platform,
            market_id=signal.market_id,
            side=signal.side,
            quantity=quantity,
            price=limit_price,
            order_type="limit",
        )

        if self.dry_run:
            # Simulate immediate fill in paper trading
            order.status = OrderStatus.FILLED
            order.filled_quantity = quantity
            order.filled_price = limit_price
            logger.info(
                f"[DRY RUN] Order filled: {order.side} {order.quantity:.2f} "
                f"@ {order.filled_price:.4f} on {order.platform}:{order.market_id}"
            )
        else:
            # Submit to platform API
            client = self.clients.get(signal.platform)
            if client is None:
                order.status = OrderStatus.REJECTED
                logger.error(f"No client for platform: {signal.platform}")
                return order

            try:
                # Platform-specific order submission goes here; each
                # client wraps its own order endpoint. This sketch only
                # marks the order as submitted.
                order.status = OrderStatus.SUBMITTED
                logger.info(
                    f"Order submitted: {order.order_id} {order.side} "
                    f"{order.quantity:.2f} @ {order.price:.4f}"
                )
            except Exception as e:
                order.status = OrderStatus.REJECTED
                logger.error(f"Order submission failed: {e}")

        self.active_orders[order_id] = order
        # Record in database
        self.data_store.save_trade({
            "platform": order.platform,
            "market_id": order.market_id,
            "side": order.side,
            "quantity": order.filled_quantity,
            "price": order.filled_price,
            "timestamp": order.created_at.isoformat(),
            "order_id": order.order_id,
            "status": order.status.value,
        })
        return order

    async def check_order_status(self, order_id: str) -> OrderStatus:
        """Check the current status of a submitted order."""
        order = self.active_orders.get(order_id)
        if order is None:
            return OrderStatus.REJECTED
        if self.dry_run:
            return order.status
        # In live mode, poll the platform API for fills here; this
        # sketch returns the cached status.
        return order.status

    async def cancel_order(self, order_id: str) -> bool:
        """Cancel an open order."""
        order = self.active_orders.get(order_id)
        if order is None or order.status in (
            OrderStatus.FILLED, OrderStatus.CANCELLED
        ):
            return False
        order.status = OrderStatus.CANCELLED
        order.updated_at = datetime.utcnow()
        logger.info(f"Order cancelled: {order_id}")
        return True

    async def cancel_all_orders(self):
        """Emergency: cancel all active orders."""
        for order_id, order in self.active_orders.items():
            if order.status in (OrderStatus.PENDING, OrderStatus.SUBMITTED):
                await self.cancel_order(order_id)
        logger.warning("All active orders cancelled")

42.6.2 Position Manager

@dataclass
class Position:
    """Represents an open position in a market."""
    platform: str
    market_id: str
    side: str
    quantity: float
    avg_entry_price: float
    current_price: float
    unrealized_pnl: float
    opened_at: datetime


class PositionManager:
    """Tracks and manages all open positions.

    Responsibilities:
    - Track position sizes and entry prices
    - Calculate unrealized P&L
    - Reconcile positions with platform state
    - Provide portfolio summary for the strategy engine
    """

    def __init__(self, data_store: DataStore):
        self.data_store = data_store
        self.positions: dict[str, Position] = {}

    def update_position(self, order: Order):
        """Update positions based on a filled order."""
        if order.status != OrderStatus.FILLED:
            return
        key = f"{order.platform}:{order.market_id}"
        if key in self.positions:
            pos = self.positions[key]
            if pos.side == order.side:
                # Adding to existing position
                total_qty = pos.quantity + order.filled_quantity
                pos.avg_entry_price = (
                    (pos.avg_entry_price * pos.quantity
                     + order.filled_price * order.filled_quantity)
                    / total_qty
                )
                pos.quantity = total_qty
            else:
                # Reducing or flipping position
                if order.filled_quantity >= pos.quantity:
                    remaining = order.filled_quantity - pos.quantity
                    if remaining > 0:
                        pos.side = order.side
                        pos.quantity = remaining
                        pos.avg_entry_price = order.filled_price
                    else:
                        del self.positions[key]
                        return
                else:
                    pos.quantity -= order.filled_quantity
        else:
            self.positions[key] = Position(
                platform=order.platform,
                market_id=order.market_id,
                side=order.side,
                quantity=order.filled_quantity,
                avg_entry_price=order.filled_price,
                current_price=order.filled_price,
                unrealized_pnl=0.0,
                opened_at=datetime.utcnow(),
            )

    def update_market_prices(self, snapshots: list[MarketSnapshot]):
        """Update unrealized P&L based on current market prices."""
        for snapshot in snapshots:
            key = f"{snapshot.platform}:{snapshot.market_id}"
            if key in self.positions:
                pos = self.positions[key]
                if pos.side == "YES":
                    pos.current_price = snapshot.yes_price
                    pos.unrealized_pnl = (
                        (snapshot.yes_price - pos.avg_entry_price) * pos.quantity
                    )
                else:
                    pos.current_price = snapshot.no_price
                    pos.unrealized_pnl = (
                        (snapshot.no_price - pos.avg_entry_price) * pos.quantity
                    )

    def get_portfolio_summary(self) -> dict:
        """Return a summary of all open positions."""
        total_exposure = sum(
            p.quantity * p.current_price for p in self.positions.values()
        )
        total_unrealized = sum(p.unrealized_pnl for p in self.positions.values())
        return {
            "num_positions": len(self.positions),
            "total_exposure": total_exposure,
            "total_unrealized_pnl": total_unrealized,
            "positions": {
                k: {
                    "side": p.side,
                    "quantity": p.quantity,
                    "avg_entry": p.avg_entry_price,
                    "current_price": p.current_price,
                    "unrealized_pnl": p.unrealized_pnl,
                }
                for k, p in self.positions.items()
            },
        }

    def get_position_sizes(self) -> dict[str, float]:
        """Return dollar-denominated position sizes for strategy engine."""
        return {
            k: p.quantity * p.current_price
            for k, p in self.positions.items()
        }
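
As a quick check of the averaging logic in update_position: adding 50 filled shares at 0.46 to an existing 100-share YES position entered at 0.40 gives a new average entry of (0.40 × 100 + 0.46 × 50) / 150 = 0.42, with a quantity of 150.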

42.6.3 The Main Trading Loop

The trading bot ties everything together into a single execution loop:

class TradingBot:
    """The main trading bot that orchestrates the entire system.

    Runs in a continuous loop:
    1. Fetch latest market data
    2. Compute features and model predictions
    3. Generate trading signals
    4. Apply portfolio-level optimization
    5. Execute trades through risk checks
    6. Update positions and portfolio state
    7. Log everything for monitoring

    This is the integration point for all components built
    throughout this book.
    """

    def __init__(self, config: SystemConfig):
        self.config = config
        self.is_running = False
        self._cycle_count = 0

        # Initialize all components
        self.data_store = DataStore(config.database.url)
        self.aggregator = DataAggregator([
            PolymarketClient(),
            MetaculusClient(),
        ])
        self.feature_engineer = FeatureEngineer(self.data_store)
        self.model = EnsembleModel()
        self.strategy = StrategyEngine(config.trading)
        self.portfolio_constructor = PortfolioConstructor(
            config.trading, config.risk
        )
        self.executor = OrderExecutor(
            clients={}, data_store=self.data_store, dry_run=config.dry_run
        )
        self.position_manager = PositionManager(self.data_store)
        self.risk_manager = None  # Initialized in Section 42.7

    async def run_cycle(self):
        """Execute one complete trading cycle."""
        self._cycle_count += 1
        logger.info(f"=== Trading Cycle {self._cycle_count} ===")
        cycle_start = datetime.utcnow()

        try:
            # Step 1: Fetch market data
            snapshots = await self.aggregator.fetch_all_markets()
            for s in snapshots:
                self.data_store.save_snapshot(s)

            # Step 2: Update position prices
            self.position_manager.update_market_prices(snapshots)

            # Step 3: Risk check before generating signals
            if self.risk_manager and not self.risk_manager.allow_trading():
                logger.warning("Risk manager blocked trading this cycle")
                return

            # Step 4: Compute features and predictions
            predictions = {}
            for snapshot in snapshots:
                key = f"{snapshot.platform}:{snapshot.market_id}"
                features = self.feature_engineer.compute_features(
                    snapshot, snapshots, {}
                )
                X = features.to_array().reshape(1, -1)
                try:
                    pred = self.model.predict(X)[0]
                    predictions[key] = float(pred)
                except Exception as e:
                    logger.warning(f"Prediction failed for {key}: {e}")

            # Step 5: Generate trading signals
            current_positions = self.position_manager.get_position_sizes()
            portfolio = self.position_manager.get_portfolio_summary()
            portfolio_value = (
                self.config.trading.max_portfolio_exposure  # Simplified
            )

            signals = self.strategy.generate_signals(
                snapshots, predictions, current_positions, portfolio_value
            )

            # Step 6: Portfolio optimization
            signals = self.portfolio_constructor.optimize_portfolio(
                signals, portfolio_value, current_positions
            )

            # Step 7: Execute trades
            for signal in signals:
                if self.risk_manager:
                    approved = self.risk_manager.approve_order(
                        signal, portfolio
                    )
                    if not approved:
                        continue
                order = await self.executor.submit_order(signal)
                self.position_manager.update_position(order)

            # Step 8: Log cycle results
            cycle_time = (datetime.utcnow() - cycle_start).total_seconds()
            logger.info(
                f"Cycle {self._cycle_count} complete in {cycle_time:.1f}s: "
                f"{len(signals)} signals, "
                f"{len(self.position_manager.positions)} open positions"
            )

        except Exception as e:
            logger.error(f"Cycle {self._cycle_count} failed: {e}", exc_info=True)

    async def start(self):
        """Start the trading bot main loop."""
        self.is_running = True
        logger.info("Trading bot started")
        interval = self.config.trading.rebalance_interval_minutes * 60
        while self.is_running:
            await self.run_cycle()
            await asyncio.sleep(interval)

    async def stop(self):
        """Gracefully stop the trading bot."""
        self.is_running = False
        await self.executor.cancel_all_orders()
        # Close every platform client, not just the first one
        for client in self.aggregator.clients:
            await client.close()
        logger.info("Trading bot stopped")

42.7 Risk Management Module

Risk management is not an afterthought — it is the difference between survival and ruin. Chapter 16 (Risk Management Essentials) taught us that the primary goal of any trading system is to survive long enough for its edge to materialize. This section implements those lessons.

42.7.1 Circuit Breakers and Limits

import time

class RiskManager:
    """Enforces risk limits and circuit breakers across the system.

    Implements the risk management framework from Chapter 16:
    - Daily loss limits (stop-loss)
    - Maximum drawdown circuit breaker
    - Per-position and per-category exposure limits
    - Correlation-based portfolio constraints
    - Velocity checks (too many trades per period)
    """

    def __init__(self, config: RiskConfig, data_store: DataStore):
        self.config = config
        self.data_store = data_store
        self.daily_pnl = 0.0
        self.peak_portfolio_value = 0.0
        self.current_portfolio_value = 0.0
        self._circuit_breaker_active = False
        self._circuit_breaker_until = 0.0
        self._trades_this_hour = 0
        self._hour_start = time.time()

    def allow_trading(self) -> bool:
        """Check if trading is currently allowed.

        Returns False if any circuit breaker is active.
        """
        # Check circuit breaker cooldown
        if self._circuit_breaker_active:
            if time.time() < self._circuit_breaker_until:
                logger.warning("Circuit breaker active — trading blocked")
                return False
            else:
                self._circuit_breaker_active = False
                logger.info("Circuit breaker cooldown expired — trading resumed")

        # Check daily loss limit
        if self.daily_pnl < -self.config.max_daily_loss:
            logger.warning(
                f"Daily loss limit hit: ${self.daily_pnl:.2f} "
                f"(limit: -${self.config.max_daily_loss:.2f})"
            )
            self._trigger_circuit_breaker("daily_loss_limit")
            return False

        # Check drawdown limit
        if self.peak_portfolio_value > 0:
            drawdown = (
                (self.peak_portfolio_value - self.current_portfolio_value)
                / self.peak_portfolio_value
            )
            if drawdown > self.config.max_drawdown_pct:
                logger.warning(
                    f"Drawdown limit hit: {drawdown:.1%} "
                    f"(limit: {self.config.max_drawdown_pct:.1%})"
                )
                self._trigger_circuit_breaker("max_drawdown")
                return False

        return True

    def approve_order(
        self, signal: TradingSignal, portfolio: dict
    ) -> bool:
        """Approve or reject a specific order based on risk rules.

        Checks:
        1. Position size vs portfolio percentage limit
        2. Total exposure vs maximum
        3. Trade velocity (max trades per hour)
        """
        # Position size check
        total_exposure = portfolio.get("total_exposure", 0.0)
        if signal.recommended_size + total_exposure > (
            self.config.max_drawdown_pct * self.peak_portfolio_value * 10
        ):
            logger.warning(f"Order rejected: would exceed exposure limit")
            return False

        # Trade velocity check (max 20 trades per hour)
        now = time.time()
        if now - self._hour_start > 3600:
            self._trades_this_hour = 0
            self._hour_start = now
        if self._trades_this_hour >= 20:
            logger.warning("Order rejected: trade velocity limit reached")
            return False
        self._trades_this_hour += 1

        return True

    def update_pnl(self, realized_pnl: float, portfolio_value: float):
        """Update the daily P&L and portfolio tracking."""
        self.daily_pnl += realized_pnl
        self.current_portfolio_value = portfolio_value
        if portfolio_value > self.peak_portfolio_value:
            self.peak_portfolio_value = portfolio_value

    def reset_daily(self):
        """Reset daily counters (call at start of each trading day)."""
        self.daily_pnl = 0.0
        self._trades_this_hour = 0
        logger.info("Daily risk counters reset")

    def _trigger_circuit_breaker(self, reason: str):
        """Activate the circuit breaker."""
        self._circuit_breaker_active = True
        self._circuit_breaker_until = (
            time.time() + self.config.circuit_breaker_cooldown
        )
        logger.critical(
            f"CIRCUIT BREAKER TRIGGERED: {reason}. "
            f"Trading halted for {self.config.circuit_breaker_cooldown}s"
        )

    def get_risk_report(self) -> dict:
        """Generate a comprehensive risk report."""
        drawdown = 0.0
        if self.peak_portfolio_value > 0:
            drawdown = (
                (self.peak_portfolio_value - self.current_portfolio_value)
                / self.peak_portfolio_value
            )
        return {
            "daily_pnl": self.daily_pnl,
            "daily_loss_limit": self.config.max_daily_loss,
            "daily_loss_remaining": self.config.max_daily_loss + self.daily_pnl,
            "current_drawdown": drawdown,
            "max_drawdown_limit": self.config.max_drawdown_pct,
            "circuit_breaker_active": self._circuit_breaker_active,
            "trades_this_hour": self._trades_this_hour,
            "portfolio_value": self.current_portfolio_value,
            "peak_value": self.peak_portfolio_value,
        }
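
A quick demonstration of the loss-limit breaker in isolation (RiskConfig defaults are assumed for the fields not shown, as in the unit tests of Section 42.11):

rm = RiskManager(RiskConfig(max_daily_loss=100.0), DataStore(":memory:"))
rm.update_pnl(realized_pnl=-150.0, portfolio_value=9850.0)
print(rm.allow_trading())  # False: daily loss limit breached, breaker trips
print(rm.get_risk_report()["circuit_breaker_active"])  # True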

42.7.2 Compliance Checks

As discussed extensively in Chapters 38 (Legal Landscape) and 39 (Compliance Frameworks), prediction market trading carries regulatory obligations. Our system includes compliance hooks:

class ComplianceChecker:
    """Enforces regulatory compliance rules.

    Based on the frameworks discussed in Chapters 38-39:
    - Geographic restrictions (some platforms restricted by jurisdiction)
    - Market type restrictions (some event types may have legal issues)
    - Position reporting thresholds
    - Anti-manipulation safeguards
    """

    def __init__(self, jurisdiction: str = "US"):
        self.jurisdiction = jurisdiction
        self.restricted_categories = self._load_restricted_categories()
        self.position_report_threshold = 10000.0  # USD

    def check_market_eligibility(self, snapshot: MarketSnapshot) -> bool:
        """Check if a market is eligible for trading in our jurisdiction."""
        # Check category restrictions
        if snapshot.category.lower() in self.restricted_categories:
            logger.info(
                f"Market {snapshot.market_id} restricted in {self.jurisdiction}: "
                f"category '{snapshot.category}'"
            )
            return False

        # Check platform availability in jurisdiction
        platform_restrictions = {
            "polymarket": ["US"],  # Simplified — actual rules vary
        }
        restricted_in = platform_restrictions.get(snapshot.platform, [])
        if self.jurisdiction in restricted_in:
            logger.info(
                f"Platform {snapshot.platform} restricted in {self.jurisdiction}"
            )
            return False

        return True

    def check_position_reporting(self, position_size: float) -> bool:
        """Check if a position requires regulatory reporting."""
        if position_size > self.position_report_threshold:
            logger.warning(
                f"Position ${position_size:.2f} exceeds reporting threshold "
                f"(${self.position_report_threshold:.2f})"
            )
            return True
        return False

    def _load_restricted_categories(self) -> set:
        """Load restricted market categories for the jurisdiction."""
        # In production, load from a configuration file or database
        return {"assassination", "terrorism", "illegal_activity"}

42.8 Monitoring and Alerting Dashboard

A trading system without monitoring is a ticking time bomb. You need to know what is happening at all times. This section implements the observability layer.

42.8.1 Metrics Collection

from collections import defaultdict

class MetricsCollector:
    """Collects and aggregates system metrics for monitoring.

    Tracks three categories of metrics:
    1. Trading metrics: P&L, win rate, position sizes
    2. System metrics: Cycle times, API latencies, error rates
    3. Model metrics: Prediction accuracy, calibration drift
    """

    def __init__(self):
        self.counters: dict[str, int] = defaultdict(int)
        self.gauges: dict[str, float] = {}
        self.histograms: dict[str, list[float]] = defaultdict(list)
        self._start_time = datetime.utcnow()

    def increment(self, name: str, value: int = 1):
        """Increment a counter metric."""
        self.counters[name] += value

    def set_gauge(self, name: str, value: float):
        """Set a gauge metric to a specific value."""
        self.gauges[name] = value

    def observe(self, name: str, value: float):
        """Record a value in a histogram."""
        self.histograms[name].append(value)
        # Keep only last 1000 observations
        if len(self.histograms[name]) > 1000:
            self.histograms[name] = self.histograms[name][-1000:]

    def get_summary(self) -> dict:
        """Generate a summary of all current metrics."""
        uptime = (datetime.utcnow() - self._start_time).total_seconds()
        summary = {
            "uptime_seconds": uptime,
            "counters": dict(self.counters),
            "gauges": dict(self.gauges),
            "histograms": {},
        }
        for name, values in self.histograms.items():
            if values:
                arr = np.array(values)
                summary["histograms"][name] = {
                    "count": len(values),
                    "mean": float(np.mean(arr)),
                    "std": float(np.std(arr)),
                    "min": float(np.min(arr)),
                    "max": float(np.max(arr)),
                    "p50": float(np.percentile(arr, 50)),
                    "p95": float(np.percentile(arr, 95)),
                    "p99": float(np.percentile(arr, 99)),
                }
        return summary
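
Instrumenting the system is then a matter of a few calls at the right points; the metric names below are arbitrary conventions, not requirements:

metrics = MetricsCollector()
metrics.increment("cycles")
metrics.set_gauge("portfolio_value", 10_250.0)
metrics.observe("cycle_time_seconds", 4.2)
summary = metrics.get_summary()
print(summary["histograms"]["cycle_time_seconds"]["p95"])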

42.8.2 Alerting System

import smtplib
from email.mime.text import MIMEText

class AlertManager:
    """Sends alerts when critical conditions are detected.

    Alert levels:
    - INFO: Informational, logged only
    - WARNING: Requires attention, sent via configured channels
    - CRITICAL: Requires immediate action, sent to all channels
    """

    def __init__(
        self,
        email_config: Optional[dict] = None,
        webhook_url: Optional[str] = None,
    ):
        self.email_config = email_config
        self.webhook_url = webhook_url
        self._alert_history: list[dict] = []
        self._suppression_cache: dict[str, float] = {}
        self._suppression_interval = 300.0  # 5 minutes between repeated alerts

    async def send_alert(
        self, level: str, title: str, message: str, data: Optional[dict] = None
    ):
        """Send an alert through configured channels."""
        now = time.time()

        # Check suppression — do not spam the same alert
        cache_key = f"{level}:{title}"
        last_sent = self._suppression_cache.get(cache_key, 0)
        if now - last_sent < self._suppression_interval:
            return
        self._suppression_cache[cache_key] = now

        alert = {
            "level": level,
            "title": title,
            "message": message,
            "data": data or {},
            "timestamp": datetime.utcnow().isoformat(),
        }
        self._alert_history.append(alert)
        logger.log(
            getattr(logging, level.upper(), logging.INFO),
            f"ALERT [{level}] {title}: {message}"
        )

        if level in ("WARNING", "CRITICAL"):
            if self.webhook_url:
                await self._send_webhook(alert)
            if level == "CRITICAL" and self.email_config:
                self._send_email(alert)

    async def _send_webhook(self, alert: dict):
        """Send alert via webhook (Slack, Discord, etc.)."""
        try:
            async with httpx.AsyncClient() as client:
                payload = {
                    "text": (
                        f"*[{alert['level']}]* {alert['title']}\n"
                        f"{alert['message']}"
                    ),
                }
                await client.post(self.webhook_url, json=payload)
        except Exception as e:
            logger.error(f"Webhook alert failed: {e}")

    def _send_email(self, alert: dict):
        """Send alert via email."""
        if not self.email_config:
            return
        try:
            msg = MIMEText(
                f"Level: {alert['level']}\n"
                f"Title: {alert['title']}\n"
                f"Message: {alert['message']}\n"
                f"Time: {alert['timestamp']}\n"
                f"Data: {json.dumps(alert.get('data', {}), indent=2)}"
            )
            msg["Subject"] = f"[Trading Alert] {alert['title']}"
            msg["From"] = self.email_config["from"]
            msg["To"] = self.email_config["to"]
            with smtplib.SMTP(
                self.email_config["smtp_host"],
                self.email_config.get("smtp_port", 587),
            ) as server:
                server.starttls()
                server.login(
                    self.email_config["username"],
                    self.email_config["password"],
                )
                server.send_message(msg)
        except Exception as e:
            logger.error(f"Email alert failed: {e}")

42.8.3 Dashboard Renderer

For a simple but effective monitoring view, we build a text-based dashboard that can run in a terminal or be served via a minimal web interface:

class DashboardRenderer:
    """Renders a monitoring dashboard to the terminal or HTML.

    Displays:
    - Portfolio summary (value, P&L, positions)
    - Risk status (limits, circuit breakers)
    - Model performance (recent predictions vs outcomes)
    - System health (uptime, error rates, API status)
    """

    def __init__(
        self,
        metrics: MetricsCollector,
        risk_manager: RiskManager,
        position_manager: PositionManager,
    ):
        self.metrics = metrics
        self.risk_manager = risk_manager
        self.position_manager = position_manager

    def render_text(self) -> str:
        """Render the dashboard as formatted text."""
        portfolio = self.position_manager.get_portfolio_summary()
        risk = self.risk_manager.get_risk_report()
        system = self.metrics.get_summary()

        lines = [
            "=" * 70,
            "  PREDICTION MARKET TRADING SYSTEM — DASHBOARD",
            "=" * 70,
            "",
            "--- PORTFOLIO ---",
            f"  Total Exposure:     ${portfolio['total_exposure']:>12,.2f}",
            f"  Unrealized P&L:     ${portfolio['total_unrealized_pnl']:>12,.2f}",
            f"  Open Positions:     {portfolio['num_positions']:>12d}",
            "",
            "--- RISK STATUS ---",
            f"  Daily P&L:          ${risk['daily_pnl']:>12,.2f}",
            f"  Daily Loss Remain:  ${risk['daily_loss_remaining']:>12,.2f}",
            f"  Current Drawdown:   {risk['current_drawdown']:>12.1%}",
            f"  Circuit Breaker:    {'ACTIVE' if risk['circuit_breaker_active'] else 'OFF':>12}",
            f"  Trades This Hour:   {risk['trades_this_hour']:>12d}",
            "",
            "--- SYSTEM HEALTH ---",
            f"  Uptime:             {system['uptime_seconds'] / 3600:>12.1f} hrs",
            f"  Total Cycles:       {system['counters'].get('cycles', 0):>12d}",
            f"  Errors:             {system['counters'].get('errors', 0):>12d}",
            "",
            "--- POSITIONS ---",
        ]

        for key, pos in portfolio.get("positions", {}).items():
            pnl_str = f"${pos['unrealized_pnl']:+.2f}"
            lines.append(
                f"  {key[:40]:<40} {pos['side']:>4} "
                f"{pos['quantity']:>8.2f} @ {pos['avg_entry']:>.4f}  {pnl_str:>10}"
            )

        if not portfolio.get("positions"):
            lines.append("  (no open positions)")

        lines.extend(["", "=" * 70])
        return "\n".join(lines)

42.9 Deployment and Operations

Building the system is only half the battle. Deploying and operating it reliably in production is the other half.

42.9.1 Docker Deployment

We containerize the system for reproducible deployments:

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc libpq-dev && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Run as non-root user
RUN useradd -m trader
USER trader

# Health check (placeholder that always passes; replace with a real
# liveness probe, e.g. one that verifies a heartbeat file the bot updates)
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
    CMD python -c "import sys; sys.exit(0)"

ENTRYPOINT ["python", "-m", "trading_system.main"]
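
Building and running the image is then a standard Docker workflow; the image name, env file, and volume path below are illustrative:

docker build -t pm-trading-system .
docker run -d --name pm-bot \
    --env-file .env \
    -v "$(pwd)/data:/app/data" \
    pm-trading-system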

42.9.2 Deployment Checklist

Before going live, run through this checklist:

PRE-DEPLOYMENT CHECKLIST
========================

[ ] Configuration
    [ ] All API keys set via environment variables (not in code)
    [ ] dry_run=True for initial deployment
    [ ] Risk limits configured conservatively
    [ ] Alert channels configured and tested

[ ] Model
    [ ] Model trained on latest data
    [ ] Calibration verified (ECE < 0.05)
    [ ] Backtest results reviewed and acceptable
    [ ] Model file versioned and backed up

[ ] Infrastructure
    [ ] Database migrations applied
    [ ] Log rotation configured
    [ ] Disk space sufficient for 30+ days of data
    [ ] Network connectivity to all platform APIs verified

[ ] Risk Management
    [ ] Daily loss limit set
    [ ] Maximum drawdown limit set
    [ ] Per-position size limit set
    [ ] Circuit breaker cooldown configured
    [ ] Emergency shutdown procedure documented

[ ] Monitoring
    [ ] Dashboard accessible
    [ ] Alert channels (email/Slack) verified
    [ ] Log aggregation configured
    [ ] Metrics retention policy set

[ ] Compliance (Ch 38-39)
    [ ] Jurisdiction restrictions implemented
    [ ] Restricted market categories configured
    [ ] Position reporting thresholds set
    [ ] Terms of service for each platform reviewed

[ ] Testing
    [ ] Ran 24 hours in paper trading mode
    [ ] Verified order execution in dry-run
    [ ] Tested circuit breaker triggers
    [ ] Tested alert delivery
    [ ] Performed manual reconciliation of paper trades

42.9.3 Operational Runbook

A runbook is a document that tells operators how to handle common situations. Here are the key procedures:

Scenario: System starts but makes no trades

  1. Check the dashboard — is the circuit breaker active?
  2. Check model predictions — are all edges below the minimum threshold?
  3. Check API connectivity — can the system reach platform APIs?
  4. Check logs for errors in the data pipeline or feature engineering stages.

Scenario: Unexpected large loss

  1. Verify the circuit breaker activated. If not, manually halt trading.
  2. Review the trade log — identify which positions caused the loss.
  3. Check if the model was making extreme predictions (calibration drift).
  4. Review market data for anomalies (flash crashes, data errors).
  5. Do not resume trading until the root cause is understood.

Scenario: API rate limiting

  1. Check the backoff configuration — are we respecting rate limits?
  2. Reduce the polling frequency in the configuration.
  3. Check whether multiple instances are running (each extra instance multiplies the request rate).
  4. Contact the platform if persistent — rate limits may have changed.
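
One more procedure worth scripting in advance is emergency shutdown. A minimal Unix-only sketch that converts SIGINT/SIGTERM into a graceful stop (the helper is illustrative and must be called from inside the running event loop):

import asyncio
import signal

def install_shutdown_handlers(bot: TradingBot) -> None:
    """Stop the bot gracefully on SIGINT/SIGTERM (Unix only)."""
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(
            sig, lambda: asyncio.ensure_future(bot.stop())
        )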

42.9.4 Scheduled Maintenance

class MaintenanceScheduler:
    """Handles periodic maintenance tasks.

    Tasks run at different intervals:
    - Every hour: Compact metrics, check disk space
    - Every day: Retrain model, reset daily counters, backup database
    - Every week: Full calibration check, performance report
    """

    def __init__(self, bot: TradingBot):
        self.bot = bot
        self.last_daily_reset = datetime.utcnow().date()
        self.last_weekly_report = datetime.utcnow().date()

    async def run_hourly(self):
        """Hourly maintenance tasks."""
        logger.info("Running hourly maintenance")
        # Compact old metrics
        # Check disk space
        # Verify API connectivity

    async def run_daily(self):
        """Daily maintenance tasks."""
        today = datetime.utcnow().date()
        if today == self.last_daily_reset:
            return
        self.last_daily_reset = today
        logger.info("Running daily maintenance")

        # Reset daily risk counters
        if self.bot.risk_manager:
            self.bot.risk_manager.reset_daily()

        # Backup database
        # Retrain model with latest data (if enough new data)

    async def run_weekly(self):
        """Weekly maintenance tasks."""
        today = datetime.utcnow().date()
        days_since_report = (today - self.last_weekly_report).days
        if days_since_report < 7:
            return
        self.last_weekly_report = today
        logger.info("Running weekly maintenance")

        # Generate performance report
        # Full calibration check
        # Review risk parameters

42.10 Putting It All Together

Let us now see the complete system initialization and startup:

async def main():
    """Entry point for the complete trading system."""
    # Load configuration
    config = load_config("config.yaml")

    # Configure logging
    logging.basicConfig(
        level=getattr(logging, config.log_level),
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler("trading_system.log"),
        ],
    )
    logger.info("Starting Prediction Market Trading System")
    logger.info(f"Mode: {'DRY RUN' if config.dry_run else 'LIVE'}")

    # Initialize the trading bot
    bot = TradingBot(config)

    # Initialize risk manager
    bot.risk_manager = RiskManager(config.risk, bot.data_store)
    # Seed the peak with configured capital (simplification)
    bot.risk_manager.peak_portfolio_value = config.trading.max_portfolio_exposure

    # Initialize monitoring
    metrics = MetricsCollector()
    alerts = AlertManager(webhook_url=None)  # configure webhook/email in production
    dashboard = DashboardRenderer(metrics, bot.risk_manager, bot.position_manager)

    # Load or train model
    try:
        bot.model = EnsembleModel.load("model.pkl")
        logger.info("Loaded existing model from disk")
    except FileNotFoundError:
        logger.warning("No trained model found — system will need training data")

    # Start the maintenance scheduler
    maintenance = MaintenanceScheduler(bot)

    # Main loop
    try:
        while True:
            await bot.run_cycle()
            metrics.increment("cycles")

            # Maintenance tasks
            await maintenance.run_hourly()
            await maintenance.run_daily()
            await maintenance.run_weekly()

            # Update and display dashboard
            print(dashboard.render_text())

            # Wait for next cycle
            await asyncio.sleep(
                config.trading.rebalance_interval_minutes * 60
            )
    except KeyboardInterrupt:
        logger.info("Shutdown signal received")
    finally:
        await bot.stop()
        logger.info("System shutdown complete")


if __name__ == "__main__":
    asyncio.run(main())

42.11 Testing the System

No system is complete without tests. We take a layered testing approach:

Unit Tests

Each module has focused unit tests:

import unittest

class TestEdgeCalculation(unittest.TestCase):
    """Test the strategy engine's edge calculation."""

    def test_positive_edge_yes(self):
        """When model predicts higher than market, edge is positive YES."""
        config = TradingConfig()
        engine = StrategyEngine(config)
        snapshot = MarketSnapshot(
            platform="test", market_id="m1", question="Test?",
            category="test", yes_price=0.40, no_price=0.60,
            spread=0.0, volume_24h=1000, liquidity=5000,
        )
        signals = engine.generate_signals(
            [snapshot], {"test:m1": 0.60}, {}, 10000.0
        )
        self.assertEqual(len(signals), 1)
        self.assertEqual(signals[0].side, "YES")
        self.assertAlmostEqual(signals[0].edge, 0.20, places=2)

    def test_no_edge_no_signal(self):
        """When model agrees with market, no signal is generated."""
        config = TradingConfig()
        engine = StrategyEngine(config)
        snapshot = MarketSnapshot(
            platform="test", market_id="m1", question="Test?",
            category="test", yes_price=0.50, no_price=0.50,
            spread=0.0, volume_24h=1000, liquidity=5000,
        )
        signals = engine.generate_signals(
            [snapshot], {"test:m1": 0.51}, {}, 10000.0
        )
        # Edge of 0.01 is below min_edge_threshold of 0.03
        self.assertEqual(len(signals), 0)


class TestRiskManager(unittest.TestCase):
    """Test risk management circuit breakers."""

    def test_daily_loss_limit(self):
        """Trading halts when daily loss limit is exceeded."""
        config = RiskConfig(max_daily_loss=100.0)
        rm = RiskManager(config, DataStore(":memory:"))
        rm.peak_portfolio_value = 10000.0
        rm.current_portfolio_value = 10000.0
        rm.update_pnl(-150.0, 9850.0)
        self.assertFalse(rm.allow_trading())

    def test_drawdown_limit(self):
        """Trading halts when drawdown exceeds limit."""
        config = RiskConfig(max_drawdown_pct=0.10)
        rm = RiskManager(config, DataStore(":memory:"))
        rm.peak_portfolio_value = 10000.0
        rm.current_portfolio_value = 8500.0
        self.assertFalse(rm.allow_trading())
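
Run the suite with python -m unittest discover from the project root. Both test classes construct their dependencies in memory, so no network access is required.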

Integration Tests

Test that modules work together correctly:

class TestTradingPipeline(unittest.TestCase):
    """Integration test for the full trading pipeline."""

    def test_full_cycle_dry_run(self):
        """Verify a complete trading cycle works in dry-run mode."""
        config = SystemConfig(dry_run=True)
        # This would test the full flow from data to execution
        # using mock API responses and verifying the output
        self.assertTrue(config.dry_run)
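
A more substantive integration test drives the executor and position manager together in dry-run mode; a sketch using mocks for the data layer (the signal fields mirror those consumed by submit_order):

from unittest.mock import MagicMock

class TestDryRunOrderFlow(unittest.IsolatedAsyncioTestCase):
    """Exercise OrderExecutor and PositionManager end to end."""

    async def test_order_flow_updates_positions(self):
        executor = OrderExecutor(
            clients={}, data_store=MagicMock(), dry_run=True
        )
        position_manager = PositionManager(data_store=MagicMock())
        signal = MagicMock(
            platform="test", market_id="m1", side="YES",
            market_price=0.40, recommended_size=100.0,
        )
        order = await executor.submit_order(signal)
        position_manager.update_position(order)
        self.assertEqual(order.status, OrderStatus.FILLED)
        self.assertEqual(len(position_manager.positions), 1)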

42.12 Reflections on the Learning Journey

You have now built a complete prediction market trading system from the ground up. Let us reflect on how every part of this book contributed to what you have built.

Part I: Foundations (Chapters 1-6) gave you the conceptual framework. You learned what prediction markets are, why they work, and the probability theory underpinning them. Without Chapter 3's treatment of probability, you could not have built the model training pipeline. Without Chapter 5's economic theory, you would not understand why mispricings exist for your strategy to exploit.

Part II: Market Mechanics (Chapters 7-12) taught you how markets actually operate. The API clients you built in Section 42.2 directly apply the order book and AMM concepts from Chapter 7. The calibration verification in Section 42.3 implements the scoring rules from Chapter 9 and the calibration techniques from Chapter 12.

Part III: Trading Strategies (Chapters 13-19) is the intellectual core of the strategy engine. Your edge calculation comes from Chapter 13, Kelly sizing from Chapter 15, risk management from Chapter 16, backtesting from Chapter 17, the transition from paper to live trading from Chapter 18, and portfolio construction from Chapter 19.

Part IV: Data Science and ML (Chapters 20-27) powers the intelligence layer. The feature engineering pipeline implements Chapter 21's techniques. The ensemble model combines Chapter 20's logistic regression with Chapter 23's gradient boosting. The time-series cross-validation follows Chapter 24's warnings about temporal leakage.

Part V: Market Design (Chapters 28-33) informs how you interact with different platforms. Understanding market maker incentives (Chapter 30), liquidity dynamics (Chapter 31), and manipulation risks (Chapter 32) helps you navigate the ecosystem intelligently.

Part VI: Blockchain and Technology (Chapters 34-37) underpins the Polymarket integration. The conditional tokens, Polygon settlement, and on-chain resolution your system interacts with all trace back to these chapters.

Part VII: Regulation, Ethics, and the Future (Chapters 38-41) keeps your system legal and ethical. The compliance checker in Section 42.7 implements the regulatory frameworks from Chapter 38 and the compliance patterns from Chapter 39.

Next Steps

This system is a foundation, not a ceiling. Here are paths for continued growth:

  1. Advanced Models: Replace the logistic regression + XGBoost ensemble with transformer-based models that can process market text directly (Chapter 26 explored NLP techniques).

  2. Multi-Asset Arbitrage: Extend the system to exploit price discrepancies across platforms for the same event (Case Study 2 explores this in detail).

  3. Market Making: Instead of only taking positions, provide liquidity and earn the bid-ask spread. This requires the market-making concepts from Chapter 30.

  4. On-Chain Integration: Deploy your own smart contracts for automated settlement, building on the Solidity and blockchain concepts from Chapters 34-37.

  5. Social Forecasting: Incorporate wisdom-of-crowds signals from forecasting communities, combining the human judgment concepts from Chapter 11 with your ML pipeline.

  6. Real-Time Streaming: Replace the polling-based data pipeline with WebSocket streams for sub-second reaction times.

  7. Reinforcement Learning: Train an RL agent to optimize the trading policy directly, rather than decomposing the problem into prediction and sizing steps.

The prediction market ecosystem is young and growing rapidly. The skills you have developed throughout this book — probability reasoning, market analysis, ML engineering, system design — will serve you well regardless of how the space evolves.

Welcome to the frontier. Build wisely.


Summary

This capstone chapter integrated concepts from all 41 previous chapters into a production-grade prediction market trading system. The system comprises seven major modules:

  1. Data Pipeline: API clients for multiple platforms, data normalization, persistent storage, and supplementary data ingestion.

  2. Feature Engineering and Model Training: 17 engineered features across price, volume, time, sentiment, and cross-market dimensions, fed into a calibrated logistic regression + XGBoost ensemble.

  3. Strategy Engine: Edge calculation, Kelly criterion position sizing, and portfolio-level optimization with diversification constraints.

  4. Backtesting Framework: Walk-forward simulation with realistic transaction costs, slippage modeling, and temporal cross-validation.

  5. Live Trading Bot: Asynchronous order execution, position tracking, and a continuous main loop orchestrating all components.

  6. Risk Management: Daily loss limits, maximum drawdown circuit breakers, per-position and per-category exposure caps, trade velocity limits, and regulatory compliance checks.

  7. Monitoring and Alerting: Metrics collection, multi-channel alerting with suppression, and a real-time dashboard renderer.

The system is deployed using Docker, configured via YAML, and operated according to a detailed runbook and pre-deployment checklist.