Chapter 33: Scaling, Performance, and Operations

"A prediction market that cannot handle the moment everyone wants to trade is like a fire station that closes during fires." --- Anonymous Infrastructure Engineer

Prediction markets face a unique operational challenge: the events that generate the most valuable information are precisely the events that generate the most traffic. Election nights, breaking news, unexpected geopolitical events --- these moments produce both the highest demand for market access and the highest information content in prices. A platform that buckles under this load fails at the exact moment it matters most.

This chapter provides an exhaustive treatment of the engineering disciplines required to build production-grade prediction market infrastructure. We cover event sourcing and CQRS as architectural foundations, database optimization, caching, message queues, horizontal scaling, monitoring, security, disaster recovery, and performance testing. Every section includes concrete Python implementations that can serve as starting points for real systems.

By the end of this chapter, you will understand not just how to build a prediction market matching engine (covered in earlier chapters), but how to operate one reliably at scale.


33.1 Why Scaling Matters

33.1.1 The Bursty Nature of Prediction Market Traffic

Prediction market traffic is among the most bursty of any application category. Consider the traffic pattern for a market on a U.S. presidential election:

  • Normal day: 500--2,000 trades per hour, a few hundred concurrent users.
  • Debate night: 10,000--50,000 trades per hour, several thousand concurrent users.
  • Election night: 500,000+ trades per hour, tens of thousands of concurrent users, with sub-second price updates critical.

The ratio between peak and normal traffic can easily exceed 100:1. This is far more extreme than typical e-commerce (Black Friday is roughly 10:1) and approaches the burstiness of live sports betting platforms.

Definition 33.1 (Peak-to-Normal Ratio). For a prediction market platform, the peak-to-normal ratio $R$ is defined as:

$$R = \frac{T_{\text{peak}}}{T_{\text{normal}}}$$

where $T_{\text{peak}}$ is the maximum sustained throughput during the highest-traffic event over a time period, and $T_{\text{normal}}$ is the average throughput during non-event periods.

For well-known prediction market platforms, $R$ values between 50 and 500 are common.
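
As a concrete illustration, $R$ can be estimated directly from throughput samples. The following sketch uses made-up hourly trade counts and an arbitrary cutoff to separate "event" hours from "normal" hours:

# A toy estimate of R from hourly trade counts. The data and the 10x-median
# cutoff for "normal" hours are illustrative assumptions, not real traffic.
hourly_trades = [1_200] * 160 + [480_000, 520_000, 350_000]

median = sorted(hourly_trades)[len(hourly_trades) // 2]
normal_hours = [t for t in hourly_trades if t < 10 * median]

t_peak = max(hourly_trades)
t_normal = sum(normal_hours) / len(normal_hours)

print(f"Peak-to-normal ratio R = {t_peak / t_normal:.0f}")  # R = 433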

33.1.2 Why Traditional Approaches Fail

A monolithic application backed by a single relational database can typically handle a few hundred requests per second. This may be adequate for normal operations but is catastrophically insufficient during peak events. The failure modes are predictable:

  1. Database connection exhaustion: The database runs out of available connections, causing new requests to queue or fail.
  2. Lock contention: Multiple trades affecting the same market create lock contention in the order book tables.
  3. Memory pressure: In-memory data structures (order books, session data) exceed available RAM.
  4. Network saturation: WebSocket connections for real-time price feeds consume all available bandwidth.
  5. Cascading failures: When one component slows down, back-pressure causes upstream components to fail, leading to total system collapse.

33.1.3 Scaling Dimensions

We can scale along multiple dimensions, each with different costs and trade-offs:

| Dimension  | Approach              | Benefit                 | Limitation                                 |
|------------|-----------------------|-------------------------|--------------------------------------------|
| Vertical   | Bigger machines       | Simple, no code changes | Hardware ceiling, single point of failure  |
| Horizontal | More machines         | Near-linear scaling     | Requires stateless design, adds complexity |
| Functional | Separate services     | Independent scaling     | Inter-service communication overhead       |
| Data       | Sharding/partitioning | Distributes load        | Cross-shard queries are expensive          |
| Temporal   | Caching               | Reduces repeated work   | Staleness, invalidation complexity         |

The remainder of this chapter addresses each of these dimensions with architectures and code specifically designed for prediction market workloads.

33.1.4 Service Level Objectives

Before scaling, we must define what "fast enough" means. Here are typical SLOs for a production prediction market:

| Metric                         | Target       | Rationale                                 |
|--------------------------------|--------------|-------------------------------------------|
| Order submission latency (p50) | < 50 ms      | Traders expect near-instant confirmation  |
| Order submission latency (p99) | < 200 ms     | Tail latency must be bounded              |
| Price feed latency             | < 100 ms     | Stale prices cause bad trades             |
| API availability               | 99.95%       | ~4.4 hours downtime per year              |
| Market resolution time         | < 5 minutes  | After outcome is known                    |
| Data consistency               | Exactly-once | No duplicate or lost trades               |

These SLOs drive every architectural decision in this chapter.
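
A sketch of how such targets might be checked programmatically; the metric names and the measured values here are illustrative, not a real monitoring API:

# Compare measured latencies against the SLO targets in the table above.
SLO_TARGETS_MS = {
    "order_latency_p50": 50,
    "order_latency_p99": 200,
    "price_feed_latency": 100,
}

def slo_violations(measured: dict) -> list:
    """Return the names of SLOs whose measured value exceeds the target."""
    return [
        name for name, target in SLO_TARGETS_MS.items()
        if measured.get(name, 0.0) > target
    ]

print(slo_violations({"order_latency_p99": 240.0}))
# ['order_latency_p99']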


33.2 Event Sourcing for Prediction Markets

33.2.1 The Core Idea

Traditional applications store the current state of data. An order book table might contain only the current set of open orders. When an order is filled, the row is updated or deleted. The history of how we arrived at the current state is lost (or, at best, stored in a separate audit log that may drift from reality).

Event sourcing inverts this model: instead of storing current state, we store the sequence of events that produced the current state. The current state is a derived quantity, computed by replaying the event log.

Definition 33.2 (Event Sourcing). An event-sourced system persists all changes to application state as an immutable, append-only sequence of events. The current state at time $t$ is obtained by applying a fold (left reduction) over all events up to time $t$:

$$S(t) = \text{fold}(f, S_0, [e_1, e_2, \ldots, e_n])$$

where $S_0$ is the initial state, $e_1, \ldots, e_n$ are the events with timestamps $\leq t$, and $f: (S, e) \to S'$ is the state transition function.

33.2.2 Event Types for Prediction Markets

A prediction market platform produces several categories of events:

Market Lifecycle Events:
  • MarketCreated --- a new market is opened for trading
  • MarketSuspended --- trading is temporarily halted
  • MarketResumed --- trading resumes after suspension
  • MarketResolved --- the outcome is determined and the market closes
  • MarketVoided --- the market is cancelled and all positions are refunded

Order Events:
  • OrderPlaced --- a new order enters the system
  • OrderMatched --- two orders are matched (partially or fully)
  • OrderCancelled --- a trader cancels an open order
  • OrderExpired --- a time-limited order reaches its expiry

Account Events:
  • FundsDeposited --- a trader adds funds to their account
  • FundsWithdrawn --- a trader removes funds
  • PositionOpened --- a new market position is created
  • PositionClosed --- a position is settled after market resolution
  • PayoutIssued --- funds are disbursed to winning positions

33.2.3 Benefits for Prediction Markets

Event sourcing is particularly well-suited to prediction markets for several reasons:

  1. Regulatory compliance: Many jurisdictions require a complete, tamper-evident audit trail of all trades. An event log provides this natively.

  2. Dispute resolution: When traders dispute a fill price or claim an order was not executed, the event log provides an authoritative record.

  3. Temporal queries: "What was the price of this market at 9:47 PM on election night?" is trivially answerable by replaying events up to that timestamp.

  4. Replay and debugging: When bugs are discovered, the production event stream can be replayed against fixed code to verify the correction.

  5. Analytics without impact: Historical analysis can run against a read-only copy of the event log without affecting production performance.

  6. Event-driven architecture: Downstream systems (notifications, analytics, settlement) can subscribe to the event stream and process events independently.

33.2.4 Implementation Considerations

Event Schema Evolution: Over time, event schemas will change. A MarketCreated event in version 1 might lack fields that version 2 requires. Solutions include:

  • Upcasting: Transform old events to the latest schema on read (see the sketch after this list).
  • Versioned schemas: Include a version field in each event and handle each version.
  • Weak schema: Use a flexible format (JSON) that tolerates missing fields with sensible defaults.
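
A minimal upcasting sketch is shown below. The fee_bps and title fields are hypothetical, invented purely to illustrate the transformation:

def upcast_market_created(data: dict, version: int) -> dict:
    """Bring an old MarketCreated payload up to a (hypothetical) v2 schema."""
    if version == 1:
        data = dict(data)  # never mutate the stored event
        # v2 added an explicit fee schedule; default old events to zero.
        data.setdefault("fee_bps", 0)
        # v2 renamed "title" to "question".
        if "title" in data:
            data["question"] = data.pop("title")
    return data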

Snapshotting: Replaying millions of events to compute current state is expensive. Periodic snapshots capture state at a point in time, and replay needs only to process events after the snapshot.

Compaction: For very long-lived streams, events before a snapshot can be archived to cold storage to keep the hot event store manageable.

33.2.5 Python Implementation

The following implementation provides a complete event sourcing system for a prediction market. See code/example-01-event-sourcing.py for the full runnable version.

import json
import time
import uuid
import hashlib
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any, Callable
from enum import Enum


class EventType(Enum):
    MARKET_CREATED = "market_created"
    ORDER_PLACED = "order_placed"
    ORDER_MATCHED = "order_matched"
    ORDER_CANCELLED = "order_cancelled"
    MARKET_RESOLVED = "market_resolved"
    FUNDS_DEPOSITED = "funds_deposited"
    FUNDS_WITHDRAWN = "funds_withdrawn"


@dataclass(frozen=True)
class Event:
    event_id: str
    event_type: EventType
    aggregate_id: str        # e.g., market_id or account_id
    timestamp: float
    version: int             # aggregate version for optimistic concurrency
    data: Dict[str, Any]
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def checksum(self) -> str:
        """Tamper-evident checksum for audit purposes."""
        payload = json.dumps({
            "event_id": self.event_id,
            "event_type": self.event_type.value,
            "aggregate_id": self.aggregate_id,
            "timestamp": self.timestamp,
            "version": self.version,
            "data": self.data,
        }, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()


class EventStore:
    """Append-only event store with optimistic concurrency control."""

    def __init__(self):
        self._events: List[Event] = []
        self._aggregate_versions: Dict[str, int] = {}
        self._subscribers: List[Callable[[Event], None]] = []

    def append(self, event: Event) -> None:
        agg_id = event.aggregate_id
        current_version = self._aggregate_versions.get(agg_id, 0)

        if event.version != current_version + 1:
            raise ConcurrencyError(
                f"Expected version {current_version + 1} "
                f"for aggregate {agg_id}, got {event.version}"
            )

        self._events.append(event)
        self._aggregate_versions[agg_id] = event.version

        for subscriber in self._subscribers:
            subscriber(event)

    def get_events(
        self,
        aggregate_id: str,
        after_version: int = 0
    ) -> List[Event]:
        return [
            e for e in self._events
            if e.aggregate_id == aggregate_id
            and e.version > after_version
        ]

    def get_all_events(
        self,
        after_timestamp: float = 0.0
    ) -> List[Event]:
        return [
            e for e in self._events
            if e.timestamp > after_timestamp
        ]

    def subscribe(self, handler: Callable[[Event], None]) -> None:
        self._subscribers.append(handler)


class ConcurrencyError(Exception):
    pass

This event store enforces two critical properties: (1) events are immutable and append-only, and (2) optimistic concurrency control prevents conflicting updates to the same aggregate.

33.2.6 Rebuilding State from Events

The power of event sourcing lies in the ability to reconstruct state at any point in time:

class MarketAggregate:
    """Rebuilds market state from events."""

    def __init__(self, market_id: str):
        self.market_id = market_id
        self.status = None
        self.question = None
        self.outcomes = []
        self.orders = {}
        self.trades = []
        self.resolution = None
        self.version = 0

    def apply(self, event: Event) -> None:
        handler = getattr(
            self, f"_apply_{event.event_type.value}", None
        )
        if handler:
            handler(event)
        self.version = event.version

    def _apply_market_created(self, event: Event) -> None:
        self.status = "open"
        self.question = event.data["question"]
        self.outcomes = event.data["outcomes"]

    def _apply_order_placed(self, event: Event) -> None:
        order_id = event.data["order_id"]
        self.orders[order_id] = {
            "trader_id": event.data["trader_id"],
            "outcome": event.data["outcome"],
            "side": event.data["side"],
            "price": event.data["price"],
            "quantity": event.data["quantity"],
            "remaining": event.data["quantity"],
            "status": "open",
        }

    def _apply_order_matched(self, event: Event) -> None:
        buy_id = event.data["buy_order_id"]
        sell_id = event.data["sell_order_id"]
        qty = event.data["quantity"]
        price = event.data["price"]

        self.orders[buy_id]["remaining"] -= qty
        self.orders[sell_id]["remaining"] -= qty

        if self.orders[buy_id]["remaining"] == 0:
            self.orders[buy_id]["status"] = "filled"
        if self.orders[sell_id]["remaining"] == 0:
            self.orders[sell_id]["status"] = "filled"

        self.trades.append({
            "buy_order_id": buy_id,
            "sell_order_id": sell_id,
            "quantity": qty,
            "price": price,
            "timestamp": event.timestamp,
        })

    def _apply_market_resolved(self, event: Event) -> None:
        self.status = "resolved"
        self.resolution = event.data["winning_outcome"]

    @classmethod
    def rebuild(cls, market_id: str, event_store: EventStore):
        """Rebuild market state from the event log."""
        aggregate = cls(market_id)
        for event in event_store.get_events(market_id):
            aggregate.apply(event)
        return aggregate

33.2.7 Snapshots for Performance

When markets accumulate thousands of events, replaying all of them becomes slow. Snapshots solve this:

@dataclass
class Snapshot:
    aggregate_id: str
    version: int
    state: Dict[str, Any]
    timestamp: float

class SnapshotStore:
    def __init__(self):
        self._snapshots: Dict[str, Snapshot] = {}

    def save(self, aggregate: MarketAggregate) -> None:
        self._snapshots[aggregate.market_id] = Snapshot(
            aggregate_id=aggregate.market_id,
            version=aggregate.version,
            state={
                "status": aggregate.status,
                "question": aggregate.question,
                "outcomes": aggregate.outcomes,
                "orders": aggregate.orders,
                "trades": aggregate.trades,
                "resolution": aggregate.resolution,
            },
            timestamp=time.time(),
        )

    def load(self, aggregate_id: str) -> Optional[Snapshot]:
        return self._snapshots.get(aggregate_id)

With snapshots, state reconstruction only needs to replay events after the snapshot, reducing startup time from minutes to milliseconds for busy markets.
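
The two stores combine naturally: restore from the latest snapshot, then replay only the events recorded afterward. A sketch gluing together the classes defined above:

def rebuild_with_snapshot(
    market_id: str,
    event_store: EventStore,
    snapshot_store: SnapshotStore,
) -> MarketAggregate:
    """Restore from the latest snapshot, then replay only newer events."""
    aggregate = MarketAggregate(market_id)
    snapshot = snapshot_store.load(market_id)
    if snapshot:
        # Copy the captured state directly onto the aggregate.
        for key, value in snapshot.state.items():
            setattr(aggregate, key, value)
        aggregate.version = snapshot.version
    # Only events appended after the snapshot need to be replayed.
    for event in event_store.get_events(
        market_id, after_version=aggregate.version
    ):
        aggregate.apply(event)
    return aggregate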


33.3 CQRS: Command Query Responsibility Segregation

33.3.1 The Problem with Unified Models

In a traditional architecture, the same data model serves both writes (placing orders, creating markets) and reads (displaying order books, showing market prices, generating leaderboards). This creates a fundamental tension:

  • Write-optimized schemas use normalization to avoid update anomalies, but normalized schemas require expensive joins for reads.
  • Read-optimized schemas use denormalization for fast queries, but denormalized schemas create update anomalies and consistency challenges.

Prediction markets feel this tension acutely. The write path (order matching) requires strict consistency and low latency. The read path (displaying prices, positions, history) requires high throughput and flexible querying. Trying to serve both from the same model forces compromises on both sides.

33.3.2 The CQRS Pattern

Definition 33.3 (CQRS). Command Query Responsibility Segregation is an architectural pattern that uses separate models for updating data (commands) and reading data (queries). Commands mutate state and return only success or failure. Queries return data and never mutate state.

In a prediction market context:

Command Side (Write Model):
  • Receives order submissions, cancellations, and market creation requests.
  • Validates business rules (sufficient funds, market is open, price within bounds).
  • Applies changes to the event store.
  • Optimized for consistency and correctness.

Query Side (Read Model):
  • Maintains denormalized views optimized for specific query patterns.
  • Updated asynchronously by subscribing to the event stream.
  • Can have multiple read models, each optimized for different use cases.
  • Eventual consistency is acceptable (typically milliseconds of lag).

33.3.3 Read Models for Prediction Markets

A prediction market platform needs several read models:

  1. Order Book View: For each market, the current bids and asks at each price level. Optimized for the trading UI.

  2. Market Summary View: Current price, volume, number of traders, price history. Optimized for market listing pages.

  3. Portfolio View: Each trader's positions, PnL, and open orders. Optimized for portfolio dashboards.

  4. Leaderboard View: Top traders by profit, accuracy, or volume. Optimized for competitive features.

  5. Analytics View: Time-series data for charting, volume analysis, and market microstructure research.

Each view is a projection of the same underlying event stream, denormalized for its specific access pattern.

33.3.4 Consistency Considerations

CQRS introduces eventual consistency between the write model and read models. The write model is always authoritative, but read models may lag behind by a small amount of time. This has practical implications:

  • A trader who places an order may not immediately see it in their portfolio view.
  • The displayed price might be a few milliseconds behind the latest trade.
  • Two users viewing the same market might momentarily see different prices.

For prediction markets, this is generally acceptable. The critical invariant --- that orders are matched correctly and funds are transferred accurately --- is maintained by the write model. The read models only need to be eventually consistent.

Theorem 33.1 (Consistency Bound). In a CQRS system with event propagation latency $\delta$ and read model projection processing time $\pi$, the maximum staleness of any read model is bounded by:

$$\text{staleness}_{\max} = \delta + \pi$$

In practice, with in-process event subscribers and efficient projection logic, $\delta + \pi < 10\text{ms}$ is achievable, which is imperceptible to human users.

33.3.5 Python CQRS Implementation

from typing import Dict, Optional
from collections import defaultdict


# --- Command Side ---

class CommandHandler:
    """Processes commands and emits events."""

    def __init__(self, event_store: EventStore):
        self.event_store = event_store

    def handle_place_order(
        self,
        market_id: str,
        trader_id: str,
        outcome: str,
        side: str,
        price: float,
        quantity: int,
    ) -> str:
        # Validate
        if not 0 < price < 1:
            raise ValueError("Price must be between 0 and 1")
        if quantity <= 0:
            raise ValueError("Quantity must be positive")

        # Load current state to check market is open
        market = MarketAggregate.rebuild(market_id, self.event_store)
        if market.status != "open":
            raise ValueError(f"Market is {market.status}, not open")

        # Create event
        order_id = str(uuid.uuid4())
        event = Event(
            event_id=str(uuid.uuid4()),
            event_type=EventType.ORDER_PLACED,
            aggregate_id=market_id,
            timestamp=time.time(),
            version=market.version + 1,
            data={
                "order_id": order_id,
                "trader_id": trader_id,
                "outcome": outcome,
                "side": side,
                "price": price,
                "quantity": quantity,
            },
        )
        self.event_store.append(event)
        return order_id


# --- Query Side ---

class OrderBookProjection:
    """Read model: order book for each market."""

    def __init__(self):
        self.books: Dict[str, Dict] = defaultdict(
            lambda: {"bids": defaultdict(int), "asks": defaultdict(int)}
        )

    def handle_event(self, event: Event) -> None:
        if event.event_type == EventType.ORDER_PLACED:
            market_id = event.aggregate_id
            side = "bids" if event.data["side"] == "buy" else "asks"
            price = event.data["price"]
            qty = event.data["quantity"]
            self.books[market_id][side][price] += qty

        elif event.event_type == EventType.ORDER_MATCHED:
            market_id = event.aggregate_id
            price = event.data["price"]
            qty = event.data["quantity"]
            # Simplification: assumes both matched orders rested at the
            # trade price; a production book tracks per-order price levels.
            self.books[market_id]["bids"][price] -= qty
            self.books[market_id]["asks"][price] -= qty

    def get_order_book(self, market_id: str) -> Dict:
        book = self.books[market_id]
        return {
            "bids": sorted(
                [(p, q) for p, q in book["bids"].items() if q > 0],
                key=lambda x: -x[0]
            ),
            "asks": sorted(
                [(p, q) for p, q in book["asks"].items() if q > 0],
                key=lambda x: x[0]
            ),
        }


class MarketSummaryProjection:
    """Read model: summary stats for each market."""

    def __init__(self):
        self.summaries: Dict[str, Dict] = {}

    def handle_event(self, event: Event) -> None:
        if event.event_type == EventType.MARKET_CREATED:
            self.summaries[event.aggregate_id] = {
                "market_id": event.aggregate_id,
                "question": event.data["question"],
                "outcomes": event.data["outcomes"],
                "status": "open",
                "last_price": None,
                "total_volume": 0,
                "trade_count": 0,
                "unique_traders": set(),
            }

        elif event.event_type == EventType.ORDER_MATCHED:
            mid = event.aggregate_id
            if mid in self.summaries:
                self.summaries[mid]["last_price"] = event.data["price"]
                self.summaries[mid]["total_volume"] += (
                    event.data["quantity"] * event.data["price"]
                )
                self.summaries[mid]["trade_count"] += 1
                # Track unique traders when the match event carries them.
                for key in ("buy_trader_id", "sell_trader_id"):
                    trader = event.data.get(key)
                    if trader:
                        self.summaries[mid]["unique_traders"].add(trader)

        elif event.event_type == EventType.MARKET_RESOLVED:
            mid = event.aggregate_id
            if mid in self.summaries:
                self.summaries[mid]["status"] = "resolved"
                self.summaries[mid]["resolution"] = (
                    event.data["winning_outcome"]
                )

    def get_summary(self, market_id: str) -> Optional[Dict]:
        summary = self.summaries.get(market_id)
        if summary:
            result = dict(summary)
            result["unique_traders"] = len(result["unique_traders"])
            return result
        return None

33.3.6 Wiring It Together

The command handler and projections are connected via the event store's subscription mechanism:

# Setup
event_store = EventStore()
command_handler = CommandHandler(event_store)
order_book_view = OrderBookProjection()
summary_view = MarketSummaryProjection()

# Wire projections to event store
event_store.subscribe(order_book_view.handle_event)
event_store.subscribe(summary_view.handle_event)

# Now, when a command is processed:
# 1. CommandHandler validates and appends event to store
# 2. EventStore notifies all subscribers
# 3. Projections update their denormalized views
# 4. Reads go directly to projections (no joins, no aggregation)

This separation allows us to scale reads and writes independently. The command handler can run on a single, powerful machine (writes are inherently serial for a given market), while read replicas can be scaled horizontally across many machines.
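
To make the flow concrete, here is a usage sketch continuing the setup above (the market question and trader ID are made up):

# 1. Create a market by appending a MarketCreated event directly.
market_id = str(uuid.uuid4())
event_store.append(Event(
    event_id=str(uuid.uuid4()),
    event_type=EventType.MARKET_CREATED,
    aggregate_id=market_id,
    timestamp=time.time(),
    version=1,
    data={"question": "Will it rain tomorrow?", "outcomes": ["yes", "no"]},
))

# 2. Place an order through the command side.
order_id = command_handler.handle_place_order(
    market_id, trader_id="alice", outcome="yes",
    side="buy", price=0.62, quantity=100,
)

# 3. Read from the query side; the projections were updated synchronously.
print(order_book_view.get_order_book(market_id))
# {'bids': [(0.62, 100)], 'asks': []}
print(summary_view.get_summary(market_id)["status"])  # open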


33.4 Database Optimization

33.4.1 Schema Design for Market Data

Prediction market data has specific patterns that inform schema design. The most frequently accessed data is:

  1. Current market prices --- queried on every page load and WebSocket tick.
  2. Open orders for a market --- queried by the matching engine on every new order.
  3. A trader's positions --- queried on every portfolio page load.
  4. Recent trades --- queried for chart rendering and market activity feeds.

A well-designed schema separates these access patterns:

-- Markets table: one row per market, accessed by ID
CREATE TABLE markets (
    market_id UUID PRIMARY KEY,
    question TEXT NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'open',
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    resolved_at TIMESTAMP WITH TIME ZONE,
    resolution VARCHAR(100),
    metadata JSONB DEFAULT '{}'
);

-- Orders table: partitioned by status for efficient querying
CREATE TABLE orders (
    order_id UUID PRIMARY KEY,
    market_id UUID NOT NULL REFERENCES markets(market_id),
    trader_id UUID NOT NULL,
    outcome VARCHAR(100) NOT NULL,
    side VARCHAR(4) NOT NULL CHECK (side IN ('buy', 'sell')),
    price DECIMAL(10, 4) NOT NULL CHECK (price > 0 AND price < 1),
    quantity INTEGER NOT NULL CHECK (quantity > 0),
    remaining INTEGER NOT NULL CHECK (remaining >= 0),
    status VARCHAR(20) NOT NULL DEFAULT 'open',
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

-- Trades table: append-only, partitioned by time
CREATE TABLE trades (
    trade_id UUID PRIMARY KEY,
    market_id UUID NOT NULL REFERENCES markets(market_id),
    buy_order_id UUID NOT NULL REFERENCES orders(order_id),
    sell_order_id UUID NOT NULL REFERENCES orders(order_id),
    price DECIMAL(10, 4) NOT NULL,
    quantity INTEGER NOT NULL,
    executed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

-- Positions table: materialized view of net positions
CREATE TABLE positions (
    trader_id UUID NOT NULL,
    market_id UUID NOT NULL,
    outcome VARCHAR(100) NOT NULL,
    net_quantity INTEGER NOT NULL DEFAULT 0,
    avg_price DECIMAL(10, 4),
    PRIMARY KEY (trader_id, market_id, outcome)
);

33.4.2 Indexing Strategies

Indexes must be designed for actual query patterns, not hypothetical ones. Here are the critical indexes for a prediction market:

-- Fast lookup of open orders for matching engine
CREATE INDEX idx_orders_market_open
    ON orders (market_id, outcome, side, price)
    WHERE status = 'open';

-- Fast lookup of a trader's orders
CREATE INDEX idx_orders_trader
    ON orders (trader_id, status);

-- Fast lookup of recent trades for a market (chart data)
CREATE INDEX idx_trades_market_time
    ON trades (market_id, executed_at DESC);

-- Fast lookup of active markets
CREATE INDEX idx_markets_status
    ON markets (status)
    WHERE status = 'open';

-- Full-text search on market questions
CREATE INDEX idx_markets_question_search
    ON markets USING gin(to_tsvector('english', question));

The partial index on orders WHERE status = 'open' is critical: in a mature platform, the vast majority of orders are filled or cancelled, but the matching engine only cares about open orders. A partial index keeps the index small and fast.

33.4.3 Connection Pooling

Database connections are expensive to establish (TCP handshake, TLS negotiation, authentication). A connection pool maintains a set of ready-to-use connections:

import asyncpg
import asyncio


class DatabasePool:
    """Manages a pool of database connections."""

    def __init__(
        self,
        dsn: str,
        min_size: int = 10,
        max_size: int = 50,
    ):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self._pool = None

    async def initialize(self):
        self._pool = await asyncpg.create_pool(
            self.dsn,
            min_size=self.min_size,
            max_size=self.max_size,
            command_timeout=30,
            max_inactive_connection_lifetime=300,
        )

    async def execute(self, query: str, *args):
        async with self._pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args):
        async with self._pool.acquire() as conn:
            return await conn.fetch(query, *args)

    async def fetchrow(self, query: str, *args):
        async with self._pool.acquire() as conn:
            return await conn.fetchrow(query, *args)

    async def close(self):
        if self._pool:
            await self._pool.close()

Sizing the pool: A common formula is:

$$\text{pool\_size} = \frac{C \times (Q_{\text{avg}} + L_{\text{avg}})}{1000}$$

where $C$ is the expected concurrent requests, $Q_{\text{avg}}$ is the average query time in milliseconds, and $L_{\text{avg}}$ is the average network latency in milliseconds. For 1,000 concurrent requests with 5ms average query time and 1ms latency, this gives a pool size of 6. In practice, add headroom: a pool of 20--50 connections serves most prediction market workloads well.
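
The formula translates directly into a small helper; the 3x headroom multiplier below is an assumption, not a universal rule:

import math

def suggested_pool_size(
    concurrent_requests: int,
    avg_query_ms: float,
    avg_latency_ms: float,
    headroom: float = 3.0,   # assumed safety multiplier
) -> int:
    """Apply the pool-sizing formula above, then add headroom."""
    base = concurrent_requests * (avg_query_ms + avg_latency_ms) / 1000
    return max(1, math.ceil(base * headroom))

print(suggested_pool_size(1000, 5, 1))  # 18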

33.4.4 Read Replicas

For read-heavy workloads (which prediction markets are --- far more price checks than trades), read replicas distribute the query load:

class ReadReplicaRouter:
    """Routes queries to primary or replica based on freshness needs."""

    def __init__(
        self,
        primary_pool: DatabasePool,
        replica_pools: List[DatabasePool],
    ):
        self.primary = primary_pool
        self.replicas = replica_pools
        self._replica_index = 0

    def _next_replica(self) -> DatabasePool:
        """Round-robin across replicas."""
        pool = self.replicas[self._replica_index]
        self._replica_index = (
            (self._replica_index + 1) % len(self.replicas)
        )
        return pool

    async def query(
        self,
        sql: str,
        *args,
        require_fresh: bool = False,
    ):
        if require_fresh or not self.replicas:
            return await self.primary.fetch(sql, *args)
        return await self._next_replica().fetch(sql, *args)

33.4.5 Table Partitioning

The trades table grows indefinitely and is best partitioned by time:

-- Range partition trades by month
CREATE TABLE trades (
    trade_id UUID NOT NULL,
    market_id UUID NOT NULL,
    price DECIMAL(10, 4) NOT NULL,
    quantity INTEGER NOT NULL,
    executed_at TIMESTAMP WITH TIME ZONE NOT NULL
) PARTITION BY RANGE (executed_at);

CREATE TABLE trades_2026_01 PARTITION OF trades
    FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');

CREATE TABLE trades_2026_02 PARTITION OF trades
    FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
-- ... and so on, created automatically by a cron job

Queries that include a time range predicate automatically target only the relevant partitions, dramatically reducing I/O for historical queries.
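
The "created automatically by a cron job" comment above can be a few lines of Python that emit the DDL for a given month. A sketch:

from datetime import date

def partition_ddl(year: int, month: int) -> str:
    """Generate the CREATE TABLE statement for one monthly trades partition."""
    start = date(year, month, 1)
    # Roll over to January of the next year after December.
    end = date(year + (month == 12), month % 12 + 1, 1)
    return (
        f"CREATE TABLE trades_{start:%Y_%m} PARTITION OF trades\n"
        f"    FOR VALUES FROM ('{start}') TO ('{end}');"
    )

print(partition_ddl(2026, 3))
# CREATE TABLE trades_2026_03 PARTITION OF trades
#     FOR VALUES FROM ('2026-03-01') TO ('2026-04-01');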


33.5 Caching Strategies

33.5.1 What to Cache

Not all data benefits equally from caching. The caching value of a data item depends on three factors:

$$V_{\text{cache}} = \frac{f_{\text{access}} \times c_{\text{compute}}}{r_{\text{change}}}$$

where $f_{\text{access}}$ is the access frequency, $c_{\text{compute}}$ is the computation cost, and $r_{\text{change}}$ is the rate of change. Data with high access frequency, high computation cost, and low change rate benefits most from caching.

| Data             | Access Freq | Compute Cost | Change Rate | Cache Value        |
|------------------|-------------|--------------|-------------|--------------------|
| Market list      | Very high   | Medium       | Low         | Very high          |
| Market prices    | Very high   | Low          | High        | Medium (short TTL) |
| Order book depth | High        | Medium       | High        | Medium (short TTL) |
| User portfolio   | Medium      | High         | Medium      | High               |
| Leaderboard      | High        | Very high    | Low         | Very high          |
| Trade history    | Medium      | Low          | Append-only | High               |

33.5.2 Redis Caching Layer

import json
import time
from typing import Any, Callable


class PredictionMarketCache:
    """Caching layer for prediction market data."""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.hit_count = 0
        self.miss_count = 0

    async def get_or_compute(
        self,
        key: str,
        compute_fn: Callable,
        ttl_seconds: int = 60,
    ) -> Any:
        """Get from cache or compute and store."""
        cached = await self.redis.get(key)
        if cached is not None:
            self.hit_count += 1
            return json.loads(cached)

        self.miss_count += 1
        value = await compute_fn()
        await self.redis.setex(
            key,
            ttl_seconds,
            json.dumps(value, default=str),
        )
        return value

    async def get_market_price(
        self,
        market_id: str,
        compute_fn: Callable,
    ) -> dict:
        """Cache market prices with short TTL."""
        return await self.get_or_compute(
            f"market:price:{market_id}",
            compute_fn,
            ttl_seconds=2,  # 2 seconds for actively traded markets
        )

    async def get_market_list(
        self,
        compute_fn: Callable,
    ) -> list:
        """Cache the market list with medium TTL."""
        return await self.get_or_compute(
            "markets:active",
            compute_fn,
            ttl_seconds=30,
        )

    async def get_leaderboard(
        self,
        compute_fn: Callable,
    ) -> list:
        """Cache leaderboard with long TTL."""
        return await self.get_or_compute(
            "leaderboard:top100",
            compute_fn,
            ttl_seconds=300,  # 5 minutes
        )

    async def invalidate_market(self, market_id: str) -> None:
        """Invalidate all caches for a market after a trade."""
        keys = [
            f"market:price:{market_id}",
            f"market:orderbook:{market_id}",
            f"market:summary:{market_id}",
        ]
        await self.redis.delete(*keys)

    @property
    def hit_rate(self) -> float:
        total = self.hit_count + self.miss_count
        return self.hit_count / total if total > 0 else 0.0

33.5.3 Cache Invalidation Strategies

Cache invalidation is famously one of the two hard problems in computer science (along with naming things and off-by-one errors). For prediction markets, we use a hybrid strategy:

  1. TTL-based expiration: Every cached item has a time-to-live. This provides a safety net --- even if explicit invalidation fails, stale data is bounded.

  2. Event-driven invalidation: When events occur (trades, order placements), we explicitly invalidate affected cache entries. This keeps latency low for active markets.

  3. Write-through for critical data: Market prices are written to cache simultaneously with the event store, ensuring the cache is always up-to-date for this critical data point. A sketch follows the invalidator code below.

class CacheInvalidator:
    """Subscribes to events and invalidates relevant caches."""

    def __init__(self, cache: PredictionMarketCache):
        self.cache = cache

    async def handle_event(self, event: Event) -> None:
        if event.event_type == EventType.ORDER_MATCHED:
            await self.cache.invalidate_market(event.aggregate_id)
            # Also invalidate both traders' portfolio caches
            buy_trader = event.data.get("buy_trader_id")
            sell_trader = event.data.get("sell_trader_id")
            if buy_trader:
                await self.cache.redis.delete(
                    f"portfolio:{buy_trader}"
                )
            if sell_trader:
                await self.cache.redis.delete(
                    f"portfolio:{sell_trader}"
                )

        elif event.event_type == EventType.ORDER_PLACED:
            await self.cache.redis.delete(
                f"market:orderbook:{event.aggregate_id}"
            )

        elif event.event_type == EventType.MARKET_RESOLVED:
            await self.cache.invalidate_market(event.aggregate_id)
            await self.cache.redis.delete("markets:active")
            await self.cache.redis.delete("leaderboard:top100")

33.5.4 Multi-Level Caching

For the highest performance, use multiple cache levels:

  • L1: In-process cache (Python dict or LRU cache): Nanosecond access, limited by process memory, not shared across instances.
  • L2: Redis: Microsecond access over network, shared across all instances, persists across restarts.
  • L3: Database: Millisecond access, authoritative source of truth.


class MultiLevelCache:
    def __init__(self, redis_client):
        self.redis = redis_client
        self._l1 = {}
        self._l1_expiry = {}

    async def get(self, key: str, compute_fn, l1_ttl=1, l2_ttl=60):
        # L1: in-process
        now = time.time()
        if key in self._l1 and self._l1_expiry[key] > now:
            return self._l1[key]

        # L2: Redis
        cached = await self.redis.get(key)
        if cached is not None:
            value = json.loads(cached)
            self._l1[key] = value
            self._l1_expiry[key] = now + l1_ttl
            return value

        # L3: compute from database
        value = await compute_fn()
        await self.redis.setex(key, l2_ttl, json.dumps(value, default=str))
        self._l1[key] = value
        self._l1_expiry[key] = now + l1_ttl
        return value

33.6 Message Queues and Async Processing

33.6.1 Why Decouple?

A synchronous request-response cycle for order placement might involve: validating the order, checking funds, inserting into the order book, attempting to match, updating positions for both traders, sending WebSocket notifications, updating analytics, and recording audit logs. If all of this happens synchronously, the trader waits for all steps to complete before receiving confirmation.

Message queues decouple the critical path (validate, check funds, match) from secondary operations (notifications, analytics, audit). This reduces perceived latency and improves reliability: if the notification service is temporarily down, orders still process.

33.6.2 Queue Architecture for Prediction Markets

                    +------------------+
                    |   API Gateway    |
                    +--------+---------+
                             |
                    +--------v---------+
                    |  Command Handler |
                    +--------+---------+
                             |
              +--------------+--------------+
              |              |              |
     +--------v------+ +-----v------+ +-----v------+
     |  Order Queue  | |  Notif.    | |  Analytics |
     |  (high prio)  | |  Queue     | |  Queue     |
     +--------+------+ +-----+------+ +-----+------+
              |              |              |
     +--------v------+ +-----v------+ +-----v------+
     |   Matching    | |  Notif.    | |  Analytics |
     |   Engine      | |  Worker    | |  Worker    |
     +--------+------+ +------------+ +------------+
              |
     +--------v------+
     |  Settlement   |
     |  Queue        |
     +--------+------+
              |
     +--------v------+
     |  Settlement   |
     |  Worker       |
     +---------------+

33.6.3 Python Message Queue Implementation

import asyncio
import itertools
import time
import uuid
from typing import Dict, List, Callable, Any
from dataclasses import dataclass
from enum import Enum
from collections import defaultdict


class QueuePriority(Enum):
    CRITICAL = 0   # Order matching, fund transfers
    HIGH = 1       # Price updates, position updates
    NORMAL = 2     # Notifications, analytics
    LOW = 3        # Reports, cleanup tasks


@dataclass
class QueueMessage:
    message_id: str
    queue_name: str
    priority: QueuePriority
    payload: Dict[str, Any]
    timestamp: float
    retry_count: int = 0
    max_retries: int = 3


class InMemoryMessageQueue:
    """
    Simple priority message queue for demonstration.
    In production, replace with Redis Streams or RabbitMQ.
    """

    def __init__(self):
        self._queues: Dict[str, asyncio.PriorityQueue] = {}
        self._handlers: Dict[str, Callable] = {}
        self._dead_letter: List[QueueMessage] = []
        self._processed_count: Dict[str, int] = defaultdict(int)
        # Monotonic counter used as a tie-breaker in the priority tuple,
        # preserving FIFO order within a priority level.
        self._sequence = itertools.count()

    def declare_queue(self, name: str, handler: Callable) -> None:
        self._queues[name] = asyncio.PriorityQueue()
        self._handlers[name] = handler

    async def publish(self, message: QueueMessage) -> None:
        queue = self._queues.get(message.queue_name)
        if queue is None:
            raise ValueError(f"Unknown queue: {message.queue_name}")
        # The sequence number breaks ties so the queue never falls back to
        # comparing QueueMessage objects (which are not orderable).
        await queue.put(
            (message.priority.value, next(self._sequence), message)
        )

    async def consume(self, queue_name: str) -> None:
        """Consume messages from a queue indefinitely."""
        queue = self._queues[queue_name]
        handler = self._handlers[queue_name]

        while True:
            _, _, message = await queue.get()
            try:
                await handler(message)
                self._processed_count[queue_name] += 1
            except Exception as e:
                message.retry_count += 1
                if message.retry_count <= message.max_retries:
                    await self.publish(message)
                else:
                    self._dead_letter.append(message)
                    print(
                        f"Message {message.message_id} "
                        f"moved to dead letter queue: {e}"
                    )
            finally:
                queue.task_done()


class OrderProcessingPipeline:
    """Orchestrates order processing through queues."""

    def __init__(self, message_queue: InMemoryMessageQueue):
        self.mq = message_queue
        self._setup_queues()

    def _setup_queues(self):
        self.mq.declare_queue("orders", self._handle_order)
        self.mq.declare_queue("notifications", self._handle_notification)
        self.mq.declare_queue("analytics", self._handle_analytics)
        self.mq.declare_queue("settlement", self._handle_settlement)

    async def submit_order(self, order_data: dict) -> str:
        message_id = str(uuid.uuid4())
        await self.mq.publish(QueueMessage(
            message_id=message_id,
            queue_name="orders",
            priority=QueuePriority.CRITICAL,
            payload=order_data,
            timestamp=time.time(),
        ))
        return message_id

    async def _handle_order(self, message: QueueMessage) -> None:
        """Process an order: validate, match, emit follow-up messages."""
        order = message.payload
        # ... matching logic ...

        # Emit notification
        await self.mq.publish(QueueMessage(
            message_id=str(uuid.uuid4()),
            queue_name="notifications",
            priority=QueuePriority.NORMAL,
            payload={
                "type": "order_confirmed",
                "trader_id": order["trader_id"],
                "order_id": order["order_id"],
            },
            timestamp=time.time(),
        ))

        # Emit analytics event
        await self.mq.publish(QueueMessage(
            message_id=str(uuid.uuid4()),
            queue_name="analytics",
            priority=QueuePriority.LOW,
            payload={
                "type": "order_processed",
                "market_id": order["market_id"],
                "price": order["price"],
                "quantity": order["quantity"],
            },
            timestamp=time.time(),
        ))

    async def _handle_notification(self, message: QueueMessage) -> None:
        """Send notification to trader (email, push, WebSocket)."""
        payload = message.payload
        print(
            f"Notifying trader {payload['trader_id']}: "
            f"{payload['type']}"
        )

    async def _handle_analytics(self, message: QueueMessage) -> None:
        """Record analytics event."""
        payload = message.payload
        print(f"Analytics: {payload['type']} for {payload.get('market_id')}")

    async def _handle_settlement(self, message: QueueMessage) -> None:
        """Settle a resolved market."""
        payload = message.payload
        print(f"Settling market {payload['market_id']}")

33.6.4 Dead Letter Queues and Retry Strategies

Messages that fail repeatedly should not be retried forever. A dead letter queue (DLQ) captures these failures for manual inspection. The retry strategy should use exponential backoff:

$$t_{\text{retry}}(n) = t_{\text{base}} \cdot 2^{n-1} + \text{jitter}$$

where $n$ is the retry attempt number and $\text{jitter}$ is a random value to prevent thundering herd effects. For prediction markets, $t_{\text{base}} = 1\text{s}$ and $\max(n) = 5$ gives retry delays of 1s, 2s, 4s, 8s, 16s before the message is moved to the DLQ.
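
A sketch of the schedule:

import random

def retry_delay(attempt: int, base_seconds: float = 1.0,
                max_jitter_seconds: float = 0.5) -> float:
    """Exponential backoff with jitter, per the formula above."""
    return base_seconds * 2 ** (attempt - 1) + random.uniform(
        0, max_jitter_seconds
    )

print([round(retry_delay(n), 2) for n in range(1, 6)])
# e.g. [1.31, 2.05, 4.42, 8.17, 16.29] -- exact values vary with jitter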


33.7 Horizontal Scaling

33.7.1 Stateless API Servers

The first step in horizontal scaling is making API servers stateless. No server-local state should be required to process a request:

  • Session data goes to Redis, not server memory (see the sketch below).
  • File uploads go to object storage (S3, GCS), not local disk.
  • Cached data goes to shared Redis, not in-process caches (or use in-process caches only as L1 with short TTLs).

Once servers are stateless, scaling is a matter of adding more instances behind a load balancer.
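
As an example of the first point, a Redis-backed session store keeps every API instance interchangeable. A minimal sketch, assuming an async Redis client; the key prefix and 30-minute TTL are assumptions:

class RedisSessionStore:
    """Sessions live in shared Redis, so any instance can serve any request."""

    def __init__(self, redis_client, ttl_seconds: int = 1800):
        self.redis = redis_client
        self.ttl = ttl_seconds

    async def save(self, session_id: str, data: dict) -> None:
        await self.redis.setex(
            f"session:{session_id}", self.ttl, json.dumps(data)
        )

    async def load(self, session_id: str) -> Optional[dict]:
        raw = await self.redis.get(f"session:{session_id}")
        return json.loads(raw) if raw else None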

33.7.2 Load Balancing

For prediction markets, the load balancing strategy must account for WebSocket connections:

class LoadBalancerConfig:
    """Configuration for prediction market load balancing."""

    def __init__(self):
        self.rules = {
            # REST API: round-robin across all instances
            "api": {
                "algorithm": "round_robin",
                "health_check": "/health",
                "health_interval_seconds": 10,
                "max_connections_per_server": 10000,
            },
            # WebSocket: sticky sessions (IP hash)
            # because WS connections are long-lived
            "websocket": {
                "algorithm": "ip_hash",
                "health_check": "/ws/health",
                "health_interval_seconds": 5,
                "max_connections_per_server": 50000,
            },
            # Matching engine: route by market_id
            # All orders for a given market go to the same instance
            "matching": {
                "algorithm": "consistent_hash",
                "hash_key": "market_id",
                "health_check": "/matching/health",
                "health_interval_seconds": 3,
            },
        }

33.7.3 Sharding the Matching Engine

The matching engine is the most latency-sensitive and hardest-to-scale component. Since orders for a given market must be processed sequentially (to maintain order book integrity), we cannot simply load-balance matching requests.

The solution is sharding by market: each matching engine instance is responsible for a subset of markets. This is a form of data parallelism where each shard operates independently.

import hashlib


class MarketShardRouter:
    """Routes market operations to the correct shard."""

    def __init__(self, num_shards: int):
        self.num_shards = num_shards
        self._shard_map: Dict[str, int] = {}

    def get_shard(self, market_id: str) -> int:
        """Consistent hashing to determine shard assignment."""
        if market_id not in self._shard_map:
            hash_val = int(
                hashlib.md5(market_id.encode()).hexdigest(), 16
            )
            self._shard_map[market_id] = hash_val % self.num_shards
        return self._shard_map[market_id]

    def rebalance(self, new_num_shards: int) -> Dict[str, int]:
        """
        Returns a migration plan when changing shard count.
        In production, use consistent hashing with virtual nodes
        to minimize data movement.
        """
        migrations = {}
        self.num_shards = new_num_shards

        for market_id, old_shard in self._shard_map.items():
            new_shard = int(
                hashlib.md5(market_id.encode()).hexdigest(), 16
            ) % new_num_shards
            if new_shard != old_shard:
                migrations[market_id] = {
                    "from": old_shard,
                    "to": new_shard
                }
            self._shard_map[market_id] = new_shard

        return migrations
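
The docstring above points at consistent hashing with virtual nodes; a minimal hash ring sketch follows. With it, adding a shard remaps only the markets whose hashes fall on the new shard's virtual nodes:

import bisect
import hashlib
from typing import List, Tuple


class ConsistentHashRing:
    """Minimal hash ring with virtual nodes (a sketch, not production code)."""

    def __init__(self, shards: List[str], vnodes_per_shard: int = 100):
        ring: List[Tuple[int, str]] = []
        for shard in shards:
            for i in range(vnodes_per_shard):
                h = int(hashlib.md5(f"{shard}:{i}".encode()).hexdigest(), 16)
                ring.append((h, shard))
        ring.sort()
        self._ring = ring
        self._hashes = [h for h, _ in ring]

    def get_shard(self, market_id: str) -> str:
        h = int(hashlib.md5(market_id.encode()).hexdigest(), 16)
        # First virtual node clockwise from the key's hash, wrapping around.
        idx = bisect.bisect(self._hashes, h) % len(self._ring)
        return self._ring[idx][1]


ring = ConsistentHashRing(["shard-0", "shard-1", "shard-2"])
print(ring.get_shard("market-abc123"))  # e.g. shard-1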

33.7.4 Scaling Architecture Overview

A fully scaled prediction market platform looks like this:

                         +-------------------+
                         |   CDN / Edge      |
                         | (static assets,   |
                         |  rate limiting)   |
                         +---------+---------+
                                   |
                         +---------v---------+
                         |   Load Balancer   |
                         | (L7, TLS term.)   |
                         +---------+---------+
                                   |
              +--------------------+--------------------+
              |                    |                    |
    +---------v-------+  +--------v--------+  +--------v--------+
    | API Server 1    |  | API Server 2    |  | API Server N    |
    | (stateless)     |  | (stateless)     |  | (stateless)     |
    +---------+-------+  +--------+--------+  +--------+--------+
              |                    |                    |
              +---------+----------+---------+---------+
                        |                    |
              +---------v-------+  +---------v-------+
              | Matching Shard 1|  | Matching Shard M|
              | (markets A-F)   |  | (markets G-Z)   |
              +---------+-------+  +---------+-------+
                        |                    |
              +---------v--------------------v--------+
              |          Event Store (Primary)        |
              +--------+---+---+---+---------+--------+
                       |   |   |   |         |
                       v   v   v   v         v
                    Read Replicas      Redis Cluster

33.7.5 Auto-Scaling Policies

Auto-scaling rules should be based on leading indicators, not lagging ones:

class AutoScalingPolicy:
    """Defines when to scale API servers up or down."""

    def __init__(self):
        self.rules = {
            "scale_up": {
                "cpu_utilization_above": 70,      # percent
                "memory_utilization_above": 80,    # percent
                "request_latency_p99_above": 150,  # milliseconds
                "queue_depth_above": 1000,         # messages
                "evaluation_period": 120,          # seconds
                "cooldown_period": 300,            # seconds
                "scale_increment": 2,              # add 2 instances
            },
            "scale_down": {
                "cpu_utilization_below": 30,
                "memory_utilization_below": 40,
                "request_latency_p99_below": 50,
                "queue_depth_below": 10,
                "evaluation_period": 600,          # 10 min (slower)
                "cooldown_period": 600,
                "scale_decrement": 1,
            },
            "limits": {
                "min_instances": 3,
                "max_instances": 50,
            },
        }

    def evaluate(self, metrics: dict) -> str:
        """Returns 'scale_up', 'scale_down', or 'no_change'."""
        up = self.rules["scale_up"]
        down = self.rules["scale_down"]

        if (metrics.get("cpu_utilization", 0) > up["cpu_utilization_above"]
                or metrics.get("memory_utilization", 0)
                > up["memory_utilization_above"]
                or metrics.get("request_latency_p99", 0)
                > up["request_latency_p99_above"]
                or metrics.get("queue_depth", 0)
                > up["queue_depth_above"]):
            return "scale_up"

        if (metrics.get("cpu_utilization", 0)
                < down["cpu_utilization_below"]
                and metrics.get("memory_utilization", 0)
                < down["memory_utilization_below"]
                and metrics.get("request_latency_p99", 0)
                < down["request_latency_p99_below"]
                and metrics.get("queue_depth", 0)
                < down["queue_depth_below"]):
            return "scale_down"

        return "no_change"

33.8 Monitoring and Alerting

33.8.1 The Four Golden Signals

Google's Site Reliability Engineering book identifies four golden signals that every service should monitor:

  1. Latency: The time it takes to serve a request. Distinguish between the latency of successful requests and failed requests (a fast error is still an error).

  2. Traffic: The demand on the system. For prediction markets: requests per second, orders per second, WebSocket messages per second.

  3. Errors: The rate of requests that fail. Include both explicit errors (HTTP 5xx) and implicit errors (HTTP 200 but with incorrect content).

  4. Saturation: How "full" the system is. CPU usage, memory usage, disk I/O, database connection pool utilization, message queue depth.

33.8.2 Prediction Market-Specific Metrics

Beyond the four golden signals, prediction markets have domain-specific metrics that require monitoring:

from dataclasses import dataclass, field
from typing import Dict, List
import time
import statistics


@dataclass
class MarketMetrics:
    """Metrics specific to prediction market operations."""

    # Matching engine performance
    orders_per_second: float = 0.0
    trades_per_second: float = 0.0
    matching_latency_ms: List[float] = field(default_factory=list)

    # Market health
    active_markets: int = 0
    markets_with_no_trades_24h: int = 0
    total_open_orders: int = 0
    total_positions: int = 0

    # Price feed health
    price_update_latency_ms: List[float] = field(default_factory=list)
    websocket_connections: int = 0
    stale_price_feeds: int = 0  # feeds not updated in > 5 seconds

    # Financial metrics
    total_volume_24h: float = 0.0
    total_fees_collected_24h: float = 0.0
    pending_settlements: int = 0

    # Queue health
    order_queue_depth: int = 0
    notification_queue_depth: int = 0
    settlement_queue_depth: int = 0

    def summary(self) -> Dict:
        return {
            "orders_per_second": self.orders_per_second,
            "trades_per_second": self.trades_per_second,
            "matching_latency_p50": (
                statistics.median(self.matching_latency_ms)
                if self.matching_latency_ms else 0
            ),
            "matching_latency_p99": (
                (
                    sorted(self.matching_latency_ms)[
                        int(len(self.matching_latency_ms) * 0.99)
                    ]
                ) if self.matching_latency_ms else 0
            ),
            "active_markets": self.active_markets,
            "websocket_connections": self.websocket_connections,
            "order_queue_depth": self.order_queue_depth,
        }

33.8.3 Prometheus-Style Metrics Collection

import time
from typing import Dict, List, Optional
from collections import defaultdict
import threading


class MetricsCollector:
    """
    Collects and exposes metrics in Prometheus format.
    Thread-safe for use in multi-threaded servers.
    """

    def __init__(self):
        self._counters: Dict[str, float] = defaultdict(float)
        self._gauges: Dict[str, float] = defaultdict(float)
        self._histograms: Dict[str, List[float]] = defaultdict(list)
        self._lock = threading.Lock()

    def increment_counter(
        self, name: str, value: float = 1.0, labels: Dict = None
    ) -> None:
        key = self._make_key(name, labels)
        with self._lock:
            self._counters[key] += value

    def set_gauge(
        self, name: str, value: float, labels: Dict = None
    ) -> None:
        key = self._make_key(name, labels)
        with self._lock:
            self._gauges[key] = value

    def observe_histogram(
        self, name: str, value: float, labels: Dict = None
    ) -> None:
        key = self._make_key(name, labels)
        with self._lock:
            self._histograms[key].append(value)
            # Keep only last 10000 observations to bound memory
            if len(self._histograms[key]) > 10000:
                self._histograms[key] = self._histograms[key][-5000:]

    def _make_key(self, name: str, labels: Optional[Dict]) -> str:
        if labels:
            label_str = ",".join(
                f'{k}="{v}"' for k, v in sorted(labels.items())
            )
            return f"{name}{{{label_str}}}"
        return name

    def export_prometheus(self) -> str:
        """Export metrics in Prometheus text format."""
        lines = []
        with self._lock:
            for key, value in self._counters.items():
                lines.append(f"{key} {value}")

            for key, value in self._gauges.items():
                lines.append(f"{key} {value}")

            for key, values in self._histograms.items():
                if values:
                    sorted_vals = sorted(values)
                    n = len(sorted_vals)
                    lines.append(
                        f"{key}_p50 {sorted_vals[int(n * 0.5)]}"
                    )
                    lines.append(
                        f"{key}_p95 {sorted_vals[int(n * 0.95)]}"
                    )
                    lines.append(
                        f"{key}_p99 {sorted_vals[min(int(n * 0.99), n - 1)]}"
                    )
                    lines.append(
                        f"{key}_count {n}"
                    )

        return "\n".join(lines)


# Global metrics instance
metrics = MetricsCollector()


class TimingContext:
    """Context manager for timing operations."""

    def __init__(self, metric_name: str, labels: Dict = None):
        self.metric_name = metric_name
        self.labels = labels
        self.start_time = None

    def __enter__(self):
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        elapsed_ms = (time.perf_counter() - self.start_time) * 1000
        metrics.observe_histogram(
            self.metric_name, elapsed_ms, self.labels
        )
        if exc_type:
            metrics.increment_counter(
                f"{self.metric_name}_errors", labels=self.labels
            )
        return False
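
A usage sketch: instrumenting the order path with the collector above. The engine.match call and order.market_id attribute are placeholders for whatever the real matching engine exposes.

def handle_order(engine, order):
    """Time the matching call and count orders/trades (illustrative)."""
    with TimingContext("matching_latency_ms",
                       labels={"market": order.market_id}):
        trades = engine.match(order)  # placeholder matching call
    metrics.increment_counter("orders_submitted_total")
    metrics.increment_counter("trades_executed_total", value=len(trades))
    return trades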

33.8.4 Health Checks

Health checks serve two purposes: load balancer routing and automated alerting. A prediction market platform needs both shallow and deep health checks:

import time
from typing import Dict


class HealthChecker:
    """
    Multi-level health checks for prediction market services.
    """

    def __init__(self, db_pool, redis_client, event_store):
        self.db = db_pool
        self.redis = redis_client
        self.event_store = event_store

    async def shallow_check(self) -> Dict:
        """
        Fast check for load balancer.
        Returns 200 if the process is running and accepting connections.
        Should complete in < 1ms.
        """
        return {"status": "ok", "timestamp": time.time()}

    async def deep_check(self) -> Dict:
        """
        Comprehensive check of all dependencies.
        Used for alerting, not for load balancer routing.
        """
        checks = {}

        # Database connectivity
        try:
            start = time.perf_counter()
            await self.db.fetchrow("SELECT 1")
            latency = (time.perf_counter() - start) * 1000
            checks["database"] = {
                "status": "ok",
                "latency_ms": round(latency, 2),
            }
        except Exception as e:
            checks["database"] = {
                "status": "error",
                "error": str(e),
            }

        # Redis connectivity
        try:
            start = time.perf_counter()
            await self.redis.ping()
            latency = (time.perf_counter() - start) * 1000
            checks["redis"] = {
                "status": "ok",
                "latency_ms": round(latency, 2),
            }
        except Exception as e:
            checks["redis"] = {
                "status": "error",
                "error": str(e),
            }

        # Event store lag
        try:
            recent_events = self.event_store.get_all_events(
                after_timestamp=time.time() - 60
            )
            checks["event_store"] = {
                "status": "ok",
                "events_last_60s": len(recent_events),
            }
        except Exception as e:
            checks["event_store"] = {
                "status": "error",
                "error": str(e),
            }

        overall = "ok" if all(
            c["status"] == "ok" for c in checks.values()
        ) else "degraded"

        return {
            "status": overall,
            "checks": checks,
            "timestamp": time.time(),
        }
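
Exposing the two checks over HTTP is straightforward. A sketch assuming FastAPI, with the checker's dependencies (db_pool, redis_client, event_store) constructed elsewhere in the application:

from fastapi import FastAPI

app = FastAPI()
health = HealthChecker(db_pool, redis_client, event_store)  # wired at startup

@app.get("/healthz")
async def healthz():
    # Shallow: polled frequently by the load balancer.
    return await health.shallow_check()

@app.get("/healthz/deep")
async def healthz_deep():
    # Deep: polled by the alerting system, never by the load balancer.
    return await health.deep_check()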

33.8.5 Alerting Rules

Alerts should be actionable. A good alert answers: what is broken, how badly, and what should be done about it.

from dataclasses import dataclass


@dataclass
class AlertRule:
    name: str
    condition: str        # Human-readable description
    metric: str
    threshold: float
    comparison: str       # "above" or "below"
    duration_seconds: int # Must be true for this long
    severity: str         # "critical", "warning", "info"
    runbook_url: str      # Link to remediation steps


PREDICTION_MARKET_ALERTS = [
    AlertRule(
        name="high_matching_latency",
        condition="Matching engine p99 latency exceeds 200ms",
        metric="matching_latency_ms_p99",
        threshold=200,
        comparison="above",
        duration_seconds=120,
        severity="critical",
        runbook_url="/runbooks/high-matching-latency",
    ),
    AlertRule(
        name="order_queue_backing_up",
        condition="Order queue depth exceeds 500 messages",
        metric="order_queue_depth",
        threshold=500,
        comparison="above",
        duration_seconds=60,
        severity="critical",
        runbook_url="/runbooks/order-queue-backup",
    ),
    AlertRule(
        name="high_error_rate",
        condition="API error rate exceeds 5%",
        metric="api_error_rate_percent",
        threshold=5.0,
        comparison="above",
        duration_seconds=300,
        severity="warning",
        runbook_url="/runbooks/high-error-rate",
    ),
    AlertRule(
        name="database_connections_high",
        condition="Database connection pool > 80% utilized",
        metric="db_pool_utilization_percent",
        threshold=80,
        comparison="above",
        duration_seconds=300,
        severity="warning",
        runbook_url="/runbooks/db-connections",
    ),
    AlertRule(
        name="stale_prices",
        condition="More than 10 markets with stale prices (>30s old)",
        metric="stale_price_feeds",
        threshold=10,
        comparison="above",
        duration_seconds=60,
        severity="warning",
        runbook_url="/runbooks/stale-prices",
    ),
    AlertRule(
        name="settlement_delayed",
        condition="Pending settlements older than 10 minutes",
        metric="oldest_pending_settlement_seconds",
        threshold=600,
        comparison="above",
        duration_seconds=60,
        severity="critical",
        runbook_url="/runbooks/settlement-delayed",
    ),
]
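
The rules above are declarative; something still has to evaluate them. A minimal evaluator sketch (assuming metric values are sampled periodically from the collector) shows how duration_seconds is enforced: a rule fires only after its condition has held continuously for the configured duration.

import time
from typing import Dict, List


class AlertEvaluator:
    """Evaluates alert rules against sampled metric values."""

    def __init__(self, rules: List[AlertRule]):
        self.rules = rules
        self._breach_since: Dict[str, float] = {}  # rule name -> first breach

    def evaluate(self, metric_values: Dict[str, float]) -> List[AlertRule]:
        """Call periodically; returns the rules that should fire now."""
        now = time.time()
        firing = []
        for rule in self.rules:
            value = metric_values.get(rule.metric)
            if value is None:
                continue  # metric not reported this cycle
            breached = (value > rule.threshold if rule.comparison == "above"
                        else value < rule.threshold)
            if not breached:
                self._breach_since.pop(rule.name, None)
                continue
            first = self._breach_since.setdefault(rule.name, now)
            if now - first >= rule.duration_seconds:
                firing.append(rule)
        return firing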

33.9 Security and DDoS Mitigation

33.9.1 Threat Model

Prediction markets face unique security threats:

  1. Market manipulation: Attackers flood the order book with fake orders to move prices, then trade on the manipulated price.
  2. DDoS for profit: Attackers prevent legitimate trading during an event, hoping their existing positions benefit from the information vacuum.
  3. Data exfiltration: Stealing user data, trading history, or internal market parameters.
  4. Account takeover: Gaining control of accounts to place unauthorized trades or withdraw funds.
  5. API abuse: Automated scraping of market data, excessive API usage, or brute-force attacks.

33.9.2 Rate Limiting

Rate limiting is the first line of defense against abuse. A prediction market needs multiple rate limiting tiers:

import time
from typing import Dict, Tuple, Optional
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class RateLimitConfig:
    requests_per_second: float
    burst_size: int
    penalty_duration_seconds: int = 60


class TokenBucketRateLimiter:
    """
    Token bucket rate limiter with per-user and per-IP limits.
    """

    def __init__(self):
        self._buckets: Dict[str, Tuple[float, float]] = {}
        self._configs: Dict[str, RateLimitConfig] = {
            "api_general": RateLimitConfig(
                requests_per_second=10,
                burst_size=20,
            ),
            "order_placement": RateLimitConfig(
                requests_per_second=5,
                burst_size=10,
            ),
            "market_data": RateLimitConfig(
                requests_per_second=30,
                burst_size=60,
            ),
            "authentication": RateLimitConfig(
                requests_per_second=1,
                burst_size=5,
                penalty_duration_seconds=300,
            ),
        }
        self._penalties: Dict[str, float] = {}

    def check(
        self,
        identifier: str,
        limit_type: str = "api_general",
    ) -> Tuple[bool, Dict]:
        """
        Returns (allowed, info_dict).
        info_dict contains remaining tokens and retry-after if blocked.
        """
        now = time.time()

        # Check penalty box
        penalty_key = f"{identifier}:penalty"
        if penalty_key in self._penalties:
            if now < self._penalties[penalty_key]:
                retry_after = self._penalties[penalty_key] - now
                return False, {
                    "reason": "rate_limit_penalty",
                    "retry_after": retry_after,
                }
            else:
                del self._penalties[penalty_key]

        config = self._configs.get(limit_type)
        if not config:
            return True, {"remaining": -1}

        bucket_key = f"{identifier}:{limit_type}"

        if bucket_key in self._buckets:
            tokens, last_time = self._buckets[bucket_key]
            # Add tokens based on elapsed time
            elapsed = now - last_time
            tokens = min(
                config.burst_size,
                tokens + elapsed * config.requests_per_second,
            )
        else:
            tokens = config.burst_size

        if tokens >= 1:
            self._buckets[bucket_key] = (tokens - 1, now)
            return True, {"remaining": int(tokens - 1)}
        else:
            self._buckets[bucket_key] = (tokens, now)
            return False, {
                "reason": "rate_limit_exceeded",
                "retry_after": (1 - tokens) / config.requests_per_second,
            }

    def penalize(self, identifier: str, limit_type: str) -> None:
        """Put an identifier in the penalty box."""
        config = self._configs.get(limit_type)
        if config:
            penalty_key = f"{identifier}:penalty"
            self._penalties[penalty_key] = (
                time.time() + config.penalty_duration_seconds
            )
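
In an API handler, the check result maps directly onto an HTTP 429 response. A framework-agnostic sketch (status-code/body tuples stand in for real response objects):

limiter = TokenBucketRateLimiter()

def handle_place_order(user_id: str, body: dict) -> tuple:
    """Gate order placement behind the per-user rate limit."""
    allowed, info = limiter.check(user_id, limit_type="order_placement")
    if not allowed:
        # Clients should honor Retry-After and back off.
        return 429, {
            "error": info["reason"],
            "retry_after_seconds": round(info["retry_after"], 2),
        }
    # ... validate inputs and submit the order ...
    return 200, {"status": "accepted"}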

33.9.3 Input Validation and SQL Injection Prevention

Every input from the user must be validated. Never trust client-side validation alone.

import re
from decimal import Decimal, InvalidOperation
from typing import Optional


class InputValidator:
    """Validates all user inputs for the prediction market API."""

    # Only alphanumeric, spaces, and basic punctuation in market questions
    QUESTION_PATTERN = re.compile(r'^[\w\s\?\.\,\-\:\;\!\'\"\(\)]{10,500}$')

    # UUIDs
    UUID_PATTERN = re.compile(
        r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
    )

    @staticmethod
    def validate_price(price_str: str) -> Decimal:
        """Validate and parse a price value."""
        try:
            price = Decimal(price_str)
        except (InvalidOperation, TypeError, ValueError):
            raise ValueError("Price must be a valid decimal number")

        if price <= 0 or price >= 1:
            raise ValueError("Price must be between 0 and 1 (exclusive)")

        # Limit precision to 4 decimal places
        if price != round(price, 4):
            raise ValueError("Price precision limited to 4 decimal places")

        return price

    @staticmethod
    def validate_quantity(quantity_str: str) -> int:
        """Validate and parse a quantity value."""
        try:
            quantity = int(quantity_str)
        except (TypeError, ValueError):
            raise ValueError("Quantity must be an integer")

        if quantity <= 0:
            raise ValueError("Quantity must be positive")

        if quantity > 1_000_000:
            raise ValueError("Quantity exceeds maximum (1,000,000)")

        return quantity

    @classmethod
    def validate_uuid(cls, value: str) -> str:
        """Validate a UUID string."""
        if not cls.UUID_PATTERN.match(value.lower()):
            raise ValueError("Invalid UUID format")
        return value.lower()

    @classmethod
    def validate_question(cls, question: str) -> str:
        """Validate a market question."""
        question = question.strip()
        if not cls.QUESTION_PATTERN.match(question):
            raise ValueError(
                "Question must be 10-500 characters, "
                "alphanumeric with basic punctuation"
            )
        return question

    @staticmethod
    def validate_outcome(outcome: str) -> str:
        """Validate an outcome name."""
        outcome = outcome.strip()
        if not outcome or len(outcome) > 100:
            raise ValueError("Outcome must be 1-100 characters")
        if not outcome.replace(' ', '').replace('-', '').isalnum():
            raise ValueError("Outcome must be alphanumeric")
        return outcome

    @staticmethod
    def validate_side(side: str) -> str:
        """Validate order side."""
        side = side.lower().strip()
        if side not in ("buy", "sell"):
            raise ValueError("Side must be 'buy' or 'sell'")
        return side
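
Validation handles semantic correctness; parameterized queries are what actually prevent SQL injection. User input is passed as bind parameters and never interpolated into SQL text. A sketch using asyncpg (table and column names are illustrative):

# Dangerous -- string interpolation invites SQL injection:
#   await conn.fetch(f"SELECT * FROM orders WHERE user_id = '{user_id}'")

async def get_open_orders(conn, user_id: str, market_id: str):
    """Fetch open orders using validated inputs and bind parameters."""
    user_id = InputValidator.validate_uuid(user_id)
    market_id = InputValidator.validate_uuid(market_id)
    return await conn.fetch(
        """
        SELECT order_id, side, price, quantity
        FROM orders
        WHERE user_id = $1 AND market_id = $2 AND status = 'open'
        """,
        user_id,
        market_id,
    )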

33.9.4 API Key Management
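
Programmatic traders authenticate with API keys. The manager below stores only a SHA-256 hash of each key, so a leaked key table does not expose usable credentials, and the raw key is returned exactly once, at creation time: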

import hashlib
import secrets
import time
from dataclasses import dataclass
from typing import Dict, Optional, Set


@dataclass
class APIKey:
    key_hash: str          # We never store the raw key
    user_id: str
    permissions: Set[str]  # e.g., {"read", "trade", "admin"}
    created_at: float
    expires_at: Optional[float]
    rate_limit_tier: str   # "standard", "premium", "institutional"
    is_active: bool = True


class APIKeyManager:
    """Manages API keys for programmatic access."""

    def __init__(self):
        self._keys: Dict[str, APIKey] = {}  # hash -> APIKey

    def create_key(
        self,
        user_id: str,
        permissions: Set[str],
        rate_limit_tier: str = "standard",
        expires_in_days: Optional[int] = 365,
    ) -> str:
        """
        Create a new API key. Returns the raw key (only shown once).
        """
        raw_key = f"pm_{secrets.token_urlsafe(32)}"
        key_hash = hashlib.sha256(raw_key.encode()).hexdigest()

        expires_at = None
        if expires_in_days:
            expires_at = time.time() + (expires_in_days * 86400)

        self._keys[key_hash] = APIKey(
            key_hash=key_hash,
            user_id=user_id,
            permissions=permissions,
            created_at=time.time(),
            expires_at=expires_at,
            rate_limit_tier=rate_limit_tier,
        )

        return raw_key

    def validate_key(
        self,
        raw_key: str,
        required_permission: str,
    ) -> Optional[APIKey]:
        """Validate an API key and check permissions."""
        key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
        api_key = self._keys.get(key_hash)

        if not api_key:
            return None

        if not api_key.is_active:
            return None

        if api_key.expires_at and time.time() > api_key.expires_at:
            api_key.is_active = False
            return None

        if required_permission not in api_key.permissions:
            return None

        return api_key

    def revoke_key(self, raw_key: str) -> bool:
        """Revoke an API key."""
        key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
        if key_hash in self._keys:
            self._keys[key_hash].is_active = False
            return True
        return False

33.9.5 DDoS Mitigation Strategies

A layered defense approach:

Layer 1: Edge / CDN
  - Absorb volumetric attacks at the network edge.
  - Cache static assets and read-only API responses at CDN points of presence.
  - Use Anycast routing to distribute attack traffic globally.

Layer 2: Application Load Balancer
  - TLS termination (the TLS handshake itself can be used as a DDoS vector).
  - Connection rate limiting per IP.
  - Geographic blocking if the platform only operates in specific regions.

Layer 3: Application
  - Token bucket rate limiting (per user, per IP, per API key).
  - Request validation (reject malformed requests early, before they reach business logic).
  - Adaptive throttling: under load, reduce rate limits dynamically.

Layer 4: Infrastructure
  - Auto-scaling to absorb legitimate traffic spikes (which may resemble DDoS).
  - Circuit breakers to prevent cascading failures.
  - Graceful degradation: during extreme load, disable non-essential features.

import secrets


class AdaptiveThrottler:
    """
    Adjusts rate limits based on system load.
    Under normal conditions, allows full rate.
    Under high load, progressively reduces allowed rates.
    """

    def __init__(self, base_rate: float = 10.0):
        self.base_rate = base_rate
        self.current_load = 0.0  # 0.0 to 1.0

    def update_load(self, cpu_percent: float, queue_depth: int) -> None:
        """Update current load estimate."""
        cpu_load = cpu_percent / 100.0
        queue_load = min(queue_depth / 1000.0, 1.0)
        self.current_load = max(cpu_load, queue_load)

    @property
    def effective_rate(self) -> float:
        """
        Reduce rate limit as load increases.
        At 50% load: full rate.
        At 75% load: 50% rate.
        At 90% load: 10% rate.
        At 100% load: 1% rate (emergency mode).
        """
        if self.current_load < 0.5:
            return self.base_rate

        # Exponential reduction above 50% load
        reduction = ((self.current_load - 0.5) / 0.5) ** 2
        return max(
            self.base_rate * (1 - reduction * 0.99),
            self.base_rate * 0.01,
        )

    def should_allow(self, identifier: str) -> bool:
        """
        Probabilistic admission control.
        Higher-tier users get priority.
        """
        effective = self.effective_rate
        if effective >= self.base_rate:
            return True

        # Admission probability
        probability = effective / self.base_rate
        return secrets.randbelow(1000) < int(probability * 1000)
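
A brief usage sketch, with the arithmetic spelled out (the CPU and queue readings are placeholders for real samples):

throttler = AdaptiveThrottler(base_rate=10.0)
throttler.update_load(cpu_percent=85.0, queue_depth=400)
# load = max(0.85, 0.4) = 0.85
# reduction = ((0.85 - 0.5) / 0.5) ** 2 = 0.49
# effective rate = 10 * (1 - 0.49 * 0.99) ~= 5.15 requests/second
if throttler.should_allow("user-123"):
    pass  # admit the request
else:
    pass  # shed load: respond 503 with a Retry-After header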

33.9.6 Circuit Breaker Pattern

When a downstream service is failing, continuing to send it requests makes the problem worse. A circuit breaker stops requests to failing services and periodically tests if they have recovered:

import time


class CircuitBreaker:
    """
    Circuit breaker for downstream service calls.

    States:
    - CLOSED: normal operation, requests flow through
    - OPEN: service is failing, requests are rejected immediately
    - HALF_OPEN: testing if service has recovered
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_max_calls: int = 3,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls

        self.state = "CLOSED"
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.half_open_calls = 0

    def can_proceed(self) -> bool:
        if self.state == "CLOSED":
            return True

        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
                self.half_open_calls = 0
                return True
            return False

        if self.state == "HALF_OPEN":
            if self.half_open_calls < self.half_open_max_calls:
                self.half_open_calls += 1
                return True
            return False

        return False

    def record_success(self) -> None:
        if self.state == "HALF_OPEN":
            self.state = "CLOSED"
        self.failure_count = 0

    def record_failure(self) -> None:
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"

        if self.state == "HALF_OPEN":
            self.state = "OPEN"

33.10 Disaster Recovery and High Availability

33.10.1 Failure Modes

Understanding potential failure modes is the first step in designing for resilience:

| Failure Mode | Probability | Impact | Mitigation |
|---|---|---|---|
| Single server crash | High | Low (if redundant) | Auto-restart, health checks, load balancer drains |
| Database primary failure | Medium | High | Automated failover to standby |
| Network partition | Low | High | Multi-AZ deployment, split-brain detection |
| Entire AZ failure | Low | Very High | Multi-AZ, cross-region replication |
| Data corruption | Very Low | Critical | Point-in-time recovery, event sourcing replay |
| Security breach | Low | Critical | Incident response plan, key rotation |

33.10.2 Backup Strategies

class BackupPolicy:
    """Defines backup strategy for prediction market data."""

    # Event store: the most critical data
    event_store_backup = {
        "type": "continuous_replication",
        "method": "streaming_replication",
        "replicas": 2,
        "cross_region": True,
        "retention_days": 365 * 7,  # 7 years for regulatory compliance
    }

    # Database: point-in-time recovery
    database_backup = {
        "full_backup_interval_hours": 24,
        "incremental_interval_hours": 1,
        "wal_archiving": True,  # Write-Ahead Log for PITR
        "retention_days": 90,
        "cross_region_copy": True,
    }

    # Redis: less critical (can be rebuilt from event store)
    redis_backup = {
        "rdb_interval_hours": 6,
        "aof_enabled": True,
        "retention_days": 7,
    }

    # Configuration and secrets
    config_backup = {
        "method": "version_controlled",
        "secrets_method": "encrypted_vault",
    }

33.10.3 Failover Mechanisms

Automated failover for the database:

import asyncio
from typing import List

import asyncpg


class DatabaseFailoverManager:
    """Manages automated database failover."""

    def __init__(
        self,
        primary_dsn: str,
        standby_dsns: List[str],
        health_check_interval: float = 5.0,
        failure_threshold: int = 3,
    ):
        self.primary_dsn = primary_dsn
        self.standby_dsns = standby_dsns
        self.health_check_interval = health_check_interval
        self.failure_threshold = failure_threshold
        self.consecutive_failures = 0
        self.is_primary_healthy = True

    async def health_check_loop(self):
        """Continuously monitor primary health."""
        while True:
            try:
                healthy = await self._check_primary()
                if healthy:
                    self.consecutive_failures = 0
                    self.is_primary_healthy = True
                else:
                    self.consecutive_failures += 1
                    if (self.consecutive_failures
                            >= self.failure_threshold):
                        await self._initiate_failover()
            except Exception:
                self.consecutive_failures += 1
                if (self.consecutive_failures
                        >= self.failure_threshold):
                    await self._initiate_failover()

            await asyncio.sleep(self.health_check_interval)

    async def _check_primary(self) -> bool:
        """Check if primary database is responsive."""
        try:
            # Attempt a simple query with timeout
            pool = await asyncpg.create_pool(
                self.primary_dsn, min_size=1, max_size=1
            )
            async with pool.acquire() as conn:
                await asyncio.wait_for(
                    conn.fetchval("SELECT 1"), timeout=5.0
                )
            await pool.close()
            return True
        except Exception:
            return False

    async def _initiate_failover(self):
        """Promote standby to primary."""
        self.is_primary_healthy = False
        print(
            f"ALERT: Primary database failed after "
            f"{self.consecutive_failures} checks. "
            f"Initiating failover..."
        )

        # In a real system, this would:
        # 1. Promote the standby to primary
        # 2. Update DNS or connection strings
        # 3. Notify on-call engineers
        # 4. Log the failover event
        if self.standby_dsns:
            self.primary_dsn = self.standby_dsns.pop(0)
            self.consecutive_failures = 0
            print(f"Failover complete. New primary: {self.primary_dsn}")

33.10.4 Multi-Region Deployment

For prediction markets serving a global audience, multi-region deployment provides both performance (lower latency for geographically distributed users) and resilience (survives regional outages).

Key considerations:

  1. Single writer, multiple readers: The event store has one primary writer in one region. Read replicas exist in all regions.

  2. Latency budget: A user in Europe placing an order on a platform with its primary in the US will experience at least 70--100ms of network latency. This is acceptable for most prediction market use cases.

  3. Conflict resolution: If the primary region fails and a standby is promoted, any writes that were acknowledged by the old primary but not yet replicated are potentially lost. Event sourcing mitigates this: clients can resend unacknowledged commands.

  4. Regulatory considerations: Some jurisdictions require data to remain within geographic boundaries. This constrains replication topology.

33.10.5 Runbook Template

Every alert should link to a runbook. Here is a template:

## Runbook: [Alert Name]

### Symptoms
- What the alert looks like
- What users are experiencing

### Impact
- Which services are affected
- Revenue impact estimate

### Diagnosis
1. Check [specific dashboard]
2. Run [specific query/command]
3. Look for [specific log pattern]

### Resolution
#### Immediate Mitigation
1. [Step to reduce immediate impact]

#### Root Cause Fix
1. [Step to fix the underlying issue]

### Escalation
- On-call: [team/person]
- Escalate to [team] if not resolved in [time]

### Post-Incident
- [ ] Write incident report
- [ ] Update monitoring if needed
- [ ] Schedule follow-up work

33.11 Performance Testing

33.11.1 Why Performance Testing Matters

The worst time to discover your system cannot handle the load is during the load. Performance testing should be continuous, automated, and realistic.

33.11.2 Types of Performance Tests

  1. Load testing: Verify the system handles expected peak load. Run the system at 2x expected peak for a sustained period.

  2. Stress testing: Find the breaking point. Gradually increase load until the system fails. Document how it fails (gracefully or catastrophically).

  3. Soak testing: Run at moderate load for extended periods (24--72 hours). Detect memory leaks, connection pool exhaustion, log rotation failures.

  4. Spike testing: Simulate sudden traffic spikes (e.g., breaking news). Verify auto-scaling responds quickly enough.

33.11.3 Load Testing with Locust

"""
Load test suite for prediction market API.
Run with: locust -f load_test.py --host=http://localhost:8000
"""

import random
import string
import json

# Note: In practice, import from locust
# from locust import HttpUser, task, between, events


class PredictionMarketLoadTest:
    """
    Simulates realistic prediction market usage patterns.
    Can be adapted for use with locust or any load testing framework.
    """

    def __init__(self, base_url: str = "http://localhost:8000"):
        self.base_url = base_url
        self.market_ids = []
        self.api_key = None
        self.wait_time_min = 1
        self.wait_time_max = 5

    def on_start(self):
        """Setup: create account and get API key."""
        # Register
        username = f"loadtest_{''.join(random.choices(string.ascii_lowercase, k=8))}"
        response = self._post("/api/register", {
            "username": username,
            "password": "LoadTest123!",
        })
        self.api_key = response.get("api_key")

        # Deposit funds
        self._post("/api/account/deposit", {
            "amount": 100000,
        })

        # Get available markets
        markets = self._get("/api/markets")
        self.market_ids = [m["market_id"] for m in markets.get("markets", [])]

    def task_browse_markets(self):
        """Most common action: browsing markets."""
        self._get("/api/markets")

    def task_view_market(self):
        """View a specific market's details."""
        if self.market_ids:
            market_id = random.choice(self.market_ids)
            self._get(f"/api/markets/{market_id}")
            self._get(f"/api/markets/{market_id}/orderbook")
            self._get(f"/api/markets/{market_id}/trades")

    def task_place_order(self):
        """Place a limit order."""
        if self.market_ids:
            market_id = random.choice(self.market_ids)
            self._post(f"/api/markets/{market_id}/orders", {
                "outcome": "Yes",
                "side": random.choice(["buy", "sell"]),
                "price": round(random.uniform(0.1, 0.9), 2),
                "quantity": random.randint(1, 100),
            })

    def task_check_portfolio(self):
        """Check portfolio and positions."""
        self._get("/api/portfolio")
        self._get("/api/portfolio/positions")
        self._get("/api/portfolio/orders")

    def task_cancel_order(self):
        """Cancel a random open order."""
        orders = self._get("/api/portfolio/orders")
        open_orders = [
            o for o in orders.get("orders", [])
            if o.get("status") == "open"
        ]
        if open_orders:
            order = random.choice(open_orders)
            self._delete(
                f"/api/markets/{order['market_id']}"
                f"/orders/{order['order_id']}"
            )

    def _get(self, path: str) -> dict:
        """Simulated GET request."""
        # In a real load test, this would make an HTTP request
        print(f"GET {self.base_url}{path}")
        return {}

    def _post(self, path: str, data: dict) -> dict:
        """Simulated POST request."""
        print(f"POST {self.base_url}{path} {json.dumps(data)}")
        return {}

    def _delete(self, path: str) -> dict:
        """Simulated DELETE request."""
        print(f"DELETE {self.base_url}{path}")
        return {}


class LoadTestScenarios:
    """Predefined load test scenarios."""

    @staticmethod
    def normal_day():
        """Simulate a normal trading day."""
        return {
            "users": 100,
            "spawn_rate": 10,  # users per second
            "duration_minutes": 30,
            "task_weights": {
                "browse_markets": 40,
                "view_market": 30,
                "place_order": 15,
                "check_portfolio": 10,
                "cancel_order": 5,
            },
        }

    @staticmethod
    def election_night():
        """Simulate election night traffic."""
        return {
            "users": 10000,
            "spawn_rate": 500,
            "duration_minutes": 240,
            "task_weights": {
                "browse_markets": 10,
                "view_market": 25,
                "place_order": 45,
                "check_portfolio": 15,
                "cancel_order": 5,
            },
        }

    @staticmethod
    def breaking_news_spike():
        """Simulate a sudden breaking news event."""
        return {
            "phases": [
                {
                    "name": "calm_before",
                    "users": 200,
                    "duration_minutes": 5,
                },
                {
                    "name": "spike",
                    "users": 5000,
                    "spawn_rate": 1000,
                    "duration_minutes": 10,
                },
                {
                    "name": "sustained",
                    "users": 3000,
                    "duration_minutes": 30,
                },
                {
                    "name": "cooldown",
                    "users": 500,
                    "duration_minutes": 15,
                },
            ],
        }
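
For reference, the same behavior expressed directly against Locust's API looks roughly like this (a sketch; the endpoint paths match the simulator above, and the market ID is a placeholder):

import random

from locust import HttpUser, task, between


class TraderUser(HttpUser):
    wait_time = between(1, 5)

    @task(40)
    def browse_markets(self):
        self.client.get("/api/markets")

    @task(30)
    def view_market(self):
        self.client.get("/api/markets/some-market-id")  # placeholder ID

    @task(15)
    def place_order(self):
        self.client.post("/api/markets/some-market-id/orders", json={
            "outcome": "Yes",
            "side": random.choice(["buy", "sell"]),
            "price": round(random.uniform(0.1, 0.9), 2),
            "quantity": random.randint(1, 100),
        })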

33.11.4 Capacity Planning

Capacity planning translates load test results into infrastructure requirements:

class CapacityPlanner:
    """
    Estimates required infrastructure based on
    expected traffic and load test results.
    """

    def __init__(self):
        self.benchmarks = {
            "api_server": {
                "max_rps_per_instance": 500,
                "cpu_cores": 4,
                "memory_gb": 8,
            },
            "matching_engine": {
                "max_orders_per_second": 10000,
                "cpu_cores": 8,
                "memory_gb": 16,
            },
            "database": {
                "max_connections": 200,
                "max_tps": 5000,
                "storage_per_trade_bytes": 256,
            },
            "redis": {
                "max_ops_per_second": 100000,
                "memory_per_market_bytes": 10240,
            },
        }

    def plan_for_event(
        self,
        expected_peak_rps: int,
        expected_peak_orders_per_second: int,
        active_markets: int,
        headroom_factor: float = 1.5,
    ) -> dict:
        """Generate capacity plan for an expected event."""
        plan = {}

        # API servers
        api_benchmark = self.benchmarks["api_server"]
        api_instances = int(
            (expected_peak_rps * headroom_factor)
            / api_benchmark["max_rps_per_instance"]
        ) + 1
        plan["api_servers"] = {
            "instances": max(api_instances, 3),  # minimum 3 for HA
            "cpu_cores_each": api_benchmark["cpu_cores"],
            "memory_gb_each": api_benchmark["memory_gb"],
        }

        # Matching engine shards
        me_benchmark = self.benchmarks["matching_engine"]
        me_shards = int(
            (expected_peak_orders_per_second * headroom_factor)
            / me_benchmark["max_orders_per_second"]
        ) + 1
        plan["matching_engine"] = {
            "shards": max(me_shards, 2),
            "cpu_cores_each": me_benchmark["cpu_cores"],
            "memory_gb_each": me_benchmark["memory_gb"],
        }

        # Database
        db_benchmark = self.benchmarks["database"]
        plan["database"] = {
            "primary_instances": 1,
            "read_replicas": max(api_instances // 5, 2),
            "connection_pool_size": min(
                api_instances * 10,
                db_benchmark["max_connections"]
            ),
        }

        # Redis
        redis_benchmark = self.benchmarks["redis"]
        redis_memory = (
            active_markets
            * redis_benchmark["memory_per_market_bytes"]
            / (1024 ** 3)
        )
        plan["redis"] = {
            "instances": max(int(redis_memory / 8) + 1, 3),
            "memory_gb_each": 8,
        }

        return plan


# Example usage:
planner = CapacityPlanner()
election_plan = planner.plan_for_event(
    expected_peak_rps=50000,
    expected_peak_orders_per_second=5000,
    active_markets=500,
    headroom_factor=2.0,
)
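
With the benchmark figures above, this plan works out to 201 API server instances (100,000 effective RPS at 500 RPS each, plus one), 2 matching engine shards, 40 read replicas, a connection pool capped at the database's 200-connection limit, and 3 Redis instances; the 500-market working set is tiny, so the high-availability minimum of three dominates.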

33.11.5 Bottleneck Identification

Performance bottlenecks in prediction markets typically appear in this order:

  1. Database connections --- the first thing to saturate.
  2. Lock contention --- multiple orders for the same market compete for the same locks.
  3. Network bandwidth --- WebSocket price feeds to many clients.
  4. CPU on matching engine --- sorting and matching order books.
  5. Memory --- holding large order books in memory.

Each bottleneck has a characteristic signature in the metrics:

| Bottleneck | Latency Pattern | CPU | Memory | Queue Depth |
|---|---|---|---|---|
| DB connections | Sudden spike, then timeouts | Low | Normal | Grows rapidly |
| Lock contention | Gradual increase, specific markets | Medium | Normal | Moderate |
| Network | Normal server latency, high client latency | Low | Normal | Normal |
| CPU (matching) | Gradual increase, all markets | High | Normal | Grows slowly |
| Memory | Normal until OOM, then crash | Normal | Growing | Normal until crash |

33.12 Chapter Summary

This chapter covered the full spectrum of production operations for prediction markets:

  1. Event sourcing provides an immutable, auditable record of all market activity. The event log is the source of truth, and current state is always derivable from events. This is particularly valuable for prediction markets, where regulatory compliance and dispute resolution require a complete audit trail.

  2. CQRS separates the write path (order submission, matching) from the read path (market prices, portfolios, analytics). Each side can be optimized independently, and read models can be scaled horizontally without affecting the consistency of writes.

  3. Database optimization through careful indexing (especially partial indexes for open orders), connection pooling, read replicas, and time-based partitioning keeps the data layer responsive under load.

  4. Caching at multiple levels (in-process, Redis, CDN) reduces the load on the database and speeds up the most common queries. Cache invalidation is driven by events, with TTL as a safety net.

  5. Message queues decouple the critical trading path from secondary operations like notifications and analytics. Dead letter queues and retry strategies ensure reliability.

  6. Horizontal scaling is enabled by stateless API servers, market-sharded matching engines, and read replicas. Auto-scaling policies based on leading indicators allow the platform to respond to traffic changes.

  7. Monitoring and alerting based on the four golden signals (latency, traffic, errors, saturation) plus domain-specific metrics (matching latency, queue depths, stale prices) provides operational visibility. Every alert links to a runbook.

  8. Security and DDoS mitigation uses layered defenses: rate limiting, input validation, API key management, adaptive throttling, and circuit breakers. The threat model is specific to prediction markets, where attacks may be financially motivated.

  9. Disaster recovery through continuous replication, automated failover, and multi-region deployment ensures the platform survives infrastructure failures. Event sourcing simplifies recovery by making state reconstructible from events.

  10. Performance testing with realistic scenarios (normal day, election night, breaking news spike) validates capacity plans and identifies bottlenecks before they affect real users.

Together, these practices transform a prediction market from a prototype into a production system that can reliably serve thousands of concurrent traders during the most important events.


What's Next

With Part V complete, we have covered the full breadth of market design and mechanism engineering: from matching engines and automated market makers (Chapters 29--30), through fee structures and liquidity provision (Chapters 31--32), to the production infrastructure in this chapter.

Part VI: Blockchain and Decentralized Markets takes these concepts into the decentralized world. Chapter 34 introduces blockchain fundamentals and smart contract platforms. Chapter 35 explores how prediction markets work on Ethereum, Gnosis Chain, and other networks. Chapter 36 examines oracle mechanisms --- the critical problem of getting real-world outcomes onto the blockchain. And Chapter 37 addresses the unique challenges of decentralized governance and dispute resolution.

The architectural patterns from this chapter --- event sourcing, CQRS, monitoring --- apply equally to decentralized systems, where they are implemented through smart contract event logs, The Graph (a decentralized query layer), and on-chain monitoring. The transition from centralized to decentralized infrastructure is not a wholesale replacement but an evolution of the same principles.


End of Chapter 33