> "A prediction market that cannot handle the moment everyone wants to trade is like a fire station that closes during fires." --- Anonymous Infrastructure Engineer
In This Chapter
- 33.1 Why Scaling Matters
- 33.2 Event Sourcing for Prediction Markets
- 33.3 CQRS: Command Query Responsibility Segregation
- 33.4 Database Optimization
- 33.5 Caching Strategies
- 33.6 Message Queues and Async Processing
- 33.7 Horizontal Scaling
- 33.8 Monitoring and Alerting
- 33.9 Security and DDoS Mitigation
- 33.10 Disaster Recovery and High Availability
- 33.11 Performance Testing
- 33.12 Chapter Summary
- What's Next
Chapter 33: Scaling, Performance, and Operations
"A prediction market that cannot handle the moment everyone wants to trade is like a fire station that closes during fires." --- Anonymous Infrastructure Engineer
Prediction markets face a unique operational challenge: the events that generate the most valuable information are precisely the events that generate the most traffic. Election nights, breaking news, unexpected geopolitical events --- these moments produce both the highest demand for market access and the highest information content in prices. A platform that buckles under this load fails at the exact moment it matters most.
This chapter provides an exhaustive treatment of the engineering disciplines required to build production-grade prediction market infrastructure. We cover event sourcing and CQRS as architectural foundations, database optimization, caching, message queues, horizontal scaling, monitoring, security, disaster recovery, and performance testing. Every section includes concrete Python implementations that can serve as starting points for real systems.
By the end of this chapter, you will understand not just how to build a prediction market matching engine (covered in earlier chapters), but how to operate one reliably at scale.
33.1 Why Scaling Matters
33.1.1 The Bursty Nature of Prediction Market Traffic
Prediction market traffic is among the most bursty of any application category. Consider the traffic pattern for a market on a U.S. presidential election:
- Normal day: 500--2,000 trades per hour, a few hundred concurrent users.
- Debate night: 10,000--50,000 trades per hour, several thousand concurrent users.
- Election night: 500,000+ trades per hour, tens of thousands of concurrent users, with sub-second price updates critical.
The ratio between peak and normal traffic can easily exceed 100:1. This is far more extreme than typical e-commerce (Black Friday is roughly 10:1) and approaches the burstiness of live sports betting platforms.
Definition 33.1 (Peak-to-Normal Ratio). For a prediction market platform, the peak-to-normal ratio $R$ is defined as:
$$R = \frac{T_{\text{peak}}}{T_{\text{normal}}}$$
where $T_{\text{peak}}$ is the maximum sustained throughput during the highest-traffic event over a time period, and $T_{\text{normal}}$ is the average throughput during non-event periods.
For well-known prediction market platforms, $R$ values between 50 and 500 are common.
33.1.2 Why Traditional Approaches Fail
A monolithic application backed by a single relational database can typically handle a few hundred requests per second. This may be adequate for normal operations but is catastrophically insufficient during peak events. The failure modes are predictable:
- Database connection exhaustion: The database runs out of available connections, causing new requests to queue or fail.
- Lock contention: Multiple trades affecting the same market create lock contention in the order book tables.
- Memory pressure: In-memory data structures (order books, session data) exceed available RAM.
- Network saturation: WebSocket connections for real-time price feeds consume all available bandwidth.
- Cascading failures: When one component slows down, back-pressure causes upstream components to fail, leading to total system collapse.
33.1.3 Scaling Dimensions
We can scale along multiple dimensions, each with different costs and trade-offs:
| Dimension | Approach | Benefit | Limitation |
|---|---|---|---|
| Vertical | Bigger machines | Simple, no code changes | Hardware ceiling, single point of failure |
| Horizontal | More machines | Near-linear scaling | Requires stateless design, adds complexity |
| Functional | Separate services | Independent scaling | Inter-service communication overhead |
| Data | Sharding/partitioning | Distributes load | Cross-shard queries are expensive |
| Temporal | Caching | Reduces repeated work | Staleness, invalidation complexity |
The remainder of this chapter addresses each of these dimensions with architectures and code specifically designed for prediction market workloads.
33.1.4 Service Level Objectives
Before scaling, we must define what "fast enough" means. Here are typical SLOs for a production prediction market:
| Metric | Target | Rationale |
|---|---|---|
| Order submission latency (p50) | < 50 ms | Traders expect near-instant confirmation |
| Order submission latency (p99) | < 200 ms | Tail latency must be bounded |
| Price feed latency | < 100 ms | Stale prices cause bad trades |
| API availability | 99.95% | ~4.4 hours downtime per year |
| Market resolution time | < 5 minutes | After outcome is known |
| Data consistency | Exactly-once | No duplicate or lost trades |
These SLOs drive every architectural decision in this chapter.
33.2 Event Sourcing for Prediction Markets
33.2.1 The Core Idea
Traditional applications store the current state of data. An order book table might contain only the current set of open orders. When an order is filled, the row is updated or deleted. The history of how we arrived at the current state is lost (or, at best, stored in a separate audit log that may drift from reality).
Event sourcing inverts this model: instead of storing current state, we store the sequence of events that produced the current state. The current state is a derived quantity, computed by replaying the event log.
Definition 33.2 (Event Sourcing). An event-sourced system persists all changes to application state as an immutable, append-only sequence of events. The current state at time $t$ is obtained by applying a fold (left reduction) over all events up to time $t$:
$$S(t) = \text{fold}(f, S_0, [e_1, e_2, \ldots, e_n])$$
where $S_0$ is the initial state, $e_1, \ldots, e_n$ are the events with timestamps $\leq t$, and $f: (S, e) \to S'$ is the state transition function.
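Concretely, the fold is just functools.reduce over the event list. A minimal sketch, assuming a toy transition function that overlays each event's data onto the state:
from functools import reduce

def transition(state: dict, event: dict) -> dict:
    # Toy transition function f(S, e): overlay the event's data onto S.
    return {**state, **event["data"]}

events = [
    {"data": {"status": "open"}},
    {"data": {"last_price": 0.62}},
]

# S(t) = fold(f, S0, [e1, ..., en]) with S0 = {}
state = reduce(transition, events, {})
print(state)  # {'status': 'open', 'last_price': 0.62}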
33.2.2 Event Types for Prediction Markets
A prediction market platform produces several categories of events:
Market Lifecycle Events:
- MarketCreated --- a new market is opened for trading
- MarketSuspended --- trading is temporarily halted
- MarketResumed --- trading resumes after suspension
- MarketResolved --- the outcome is determined and the market closes
- MarketVoided --- the market is cancelled and all positions are refunded
Order Events:
- OrderPlaced --- a new order enters the system
- OrderMatched --- two orders are matched (partially or fully)
- OrderCancelled --- a trader cancels an open order
- OrderExpired --- a time-limited order reaches its expiry
Account Events:
- FundsDeposited --- a trader adds funds to their account
- FundsWithdrawn --- a trader removes funds
- PositionOpened --- a new market position is created
- PositionClosed --- a position is settled after market resolution
- PayoutIssued --- funds are disbursed to winning positions
33.2.3 Benefits for Prediction Markets
Event sourcing is particularly well-suited to prediction markets for several reasons:
- Regulatory compliance: Many jurisdictions require a complete, tamper-evident audit trail of all trades. An event log provides this natively.
- Dispute resolution: When traders dispute a fill price or claim an order was not executed, the event log provides an authoritative record.
- Temporal queries: "What was the price of this market at 9:47 PM on election night?" is trivially answerable by replaying events up to that timestamp.
- Replay and debugging: When bugs are discovered, the production event stream can be replayed against fixed code to verify the correction.
- Analytics without impact: Historical analysis can run against a read-only copy of the event log without affecting production performance.
- Event-driven architecture: Downstream systems (notifications, analytics, settlement) can subscribe to the event stream and process events independently.
33.2.4 Implementation Considerations
Event Schema Evolution: Over time, event schemas will change. A MarketCreated event in version 1 might lack fields that version 2 requires. Solutions include:
- Upcasting: Transform old events to the latest schema on read (see the sketch after this list).
- Versioned schemas: Include a version field in each event and handle each version.
- Weak schema: Use a flexible format (JSON) that tolerates missing fields with sensible defaults.
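A minimal upcasting sketch, assuming a hypothetical v1 MarketCreated payload that predates a later-added close_time field:
def upcast_market_created(event_data: dict) -> dict:
    """Transform an old-schema event to the latest schema on read."""
    version = event_data.get("schema_version", 1)
    if version < 2:
        # v2 added close_time (hypothetical); old events default to None.
        event_data = {**event_data, "close_time": None, "schema_version": 2}
    return event_data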
Snapshotting: Replaying millions of events to compute current state is expensive. Periodic snapshots capture state at a point in time, and replay needs only to process events after the snapshot.
Compaction: For very long-lived streams, events before a snapshot can be archived to cold storage to keep the hot event store manageable.
33.2.5 Python Implementation
The following implementation provides a complete event sourcing system for a prediction market. See code/example-01-event-sourcing.py for the full runnable version.
import json
import time
import uuid
import hashlib
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Optional, Any, Callable
from enum import Enum
from datetime import datetime, timezone
class EventType(Enum):
MARKET_CREATED = "market_created"
ORDER_PLACED = "order_placed"
ORDER_MATCHED = "order_matched"
ORDER_CANCELLED = "order_cancelled"
MARKET_RESOLVED = "market_resolved"
FUNDS_DEPOSITED = "funds_deposited"
FUNDS_WITHDRAWN = "funds_withdrawn"
@dataclass(frozen=True)
class Event:
event_id: str
event_type: EventType
aggregate_id: str # e.g., market_id or account_id
timestamp: float
version: int # aggregate version for optimistic concurrency
data: Dict[str, Any]
metadata: Dict[str, Any] = field(default_factory=dict)
@property
def checksum(self) -> str:
"""Tamper-evident checksum for audit purposes."""
payload = json.dumps({
"event_id": self.event_id,
"event_type": self.event_type.value,
"aggregate_id": self.aggregate_id,
"timestamp": self.timestamp,
"version": self.version,
"data": self.data,
}, sort_keys=True)
return hashlib.sha256(payload.encode()).hexdigest()
class EventStore:
"""Append-only event store with optimistic concurrency control."""
def __init__(self):
self._events: List[Event] = []
self._aggregate_versions: Dict[str, int] = {}
self._subscribers: List[Callable[[Event], None]] = []
def append(self, event: Event) -> None:
agg_id = event.aggregate_id
current_version = self._aggregate_versions.get(agg_id, 0)
if event.version != current_version + 1:
raise ConcurrencyError(
f"Expected version {current_version + 1} "
f"for aggregate {agg_id}, got {event.version}"
)
self._events.append(event)
self._aggregate_versions[agg_id] = event.version
for subscriber in self._subscribers:
subscriber(event)
def get_events(
self,
aggregate_id: str,
after_version: int = 0
) -> List[Event]:
return [
e for e in self._events
if e.aggregate_id == aggregate_id
and e.version > after_version
]
def get_all_events(
self,
after_timestamp: float = 0.0
) -> List[Event]:
return [
e for e in self._events
if e.timestamp > after_timestamp
]
def subscribe(self, handler: Callable[[Event], None]) -> None:
self._subscribers.append(handler)
class ConcurrencyError(Exception):
pass
This event store enforces two critical properties: (1) events are immutable and append-only, and (2) optimistic concurrency control prevents conflicting updates to the same aggregate.
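A quick usage sketch illustrating both properties: appending an event, then triggering the optimistic concurrency check with a stale version.
store = EventStore()

def make_event(version: int) -> Event:
    return Event(
        event_id=str(uuid.uuid4()),
        event_type=EventType.MARKET_CREATED,
        aggregate_id="market-1",
        timestamp=time.time(),
        version=version,
        data={"question": "Will it rain tomorrow?", "outcomes": ["Yes", "No"]},
    )

store.append(make_event(version=1))      # succeeds: version 0 -> 1
try:
    store.append(make_event(version=1))  # stale writer: expected version 2
except ConcurrencyError as e:
    print(e)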
33.2.6 Rebuilding State from Events
The power of event sourcing lies in the ability to reconstruct state at any point in time:
class MarketAggregate:
"""Rebuilds market state from events."""
def __init__(self, market_id: str):
self.market_id = market_id
self.status = None
self.question = None
self.outcomes = []
self.orders = {}
self.trades = []
self.resolution = None
self.version = 0
def apply(self, event: Event) -> None:
handler = getattr(
self, f"_apply_{event.event_type.value}", None
)
if handler:
handler(event)
self.version = event.version
def _apply_market_created(self, event: Event) -> None:
self.status = "open"
self.question = event.data["question"]
self.outcomes = event.data["outcomes"]
def _apply_order_placed(self, event: Event) -> None:
order_id = event.data["order_id"]
self.orders[order_id] = {
"trader_id": event.data["trader_id"],
"outcome": event.data["outcome"],
"side": event.data["side"],
"price": event.data["price"],
"quantity": event.data["quantity"],
"remaining": event.data["quantity"],
"status": "open",
}
def _apply_order_matched(self, event: Event) -> None:
buy_id = event.data["buy_order_id"]
sell_id = event.data["sell_order_id"]
qty = event.data["quantity"]
price = event.data["price"]
self.orders[buy_id]["remaining"] -= qty
self.orders[sell_id]["remaining"] -= qty
if self.orders[buy_id]["remaining"] == 0:
self.orders[buy_id]["status"] = "filled"
if self.orders[sell_id]["remaining"] == 0:
self.orders[sell_id]["status"] = "filled"
self.trades.append({
"buy_order_id": buy_id,
"sell_order_id": sell_id,
"quantity": qty,
"price": price,
"timestamp": event.timestamp,
})
def _apply_market_resolved(self, event: Event) -> None:
self.status = "resolved"
self.resolution = event.data["winning_outcome"]
@classmethod
def rebuild(cls, market_id: str, event_store: EventStore):
"""Rebuild market state from the event log."""
aggregate = cls(market_id)
for event in event_store.get_events(market_id):
aggregate.apply(event)
return aggregate
33.2.7 Snapshots for Performance
When markets accumulate thousands of events, replaying all of them becomes slow. Snapshots solve this:
@dataclass
class Snapshot:
aggregate_id: str
version: int
state: Dict[str, Any]
timestamp: float
class SnapshotStore:
def __init__(self):
self._snapshots: Dict[str, Snapshot] = {}
def save(self, aggregate: MarketAggregate) -> None:
self._snapshots[aggregate.market_id] = Snapshot(
aggregate_id=aggregate.market_id,
version=aggregate.version,
state={
"status": aggregate.status,
"question": aggregate.question,
"outcomes": aggregate.outcomes,
"orders": aggregate.orders,
"trades": aggregate.trades,
"resolution": aggregate.resolution,
},
timestamp=time.time(),
)
def load(self, aggregate_id: str) -> Optional[Snapshot]:
return self._snapshots.get(aggregate_id)
With snapshots, state reconstruction only needs to replay events after the snapshot, reducing startup time from minutes to milliseconds for busy markets.
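Combining the two stores, a snapshot-aware rebuild is a short sketch: restore the captured state, then replay only the events appended since.
def rebuild_with_snapshot(
    market_id: str,
    event_store: EventStore,
    snapshot_store: SnapshotStore,
) -> MarketAggregate:
    aggregate = MarketAggregate(market_id)
    after_version = 0
    snapshot = snapshot_store.load(market_id)
    if snapshot:
        # Snapshot state keys mirror the aggregate's attribute names.
        for key, value in snapshot.state.items():
            setattr(aggregate, key, value)
        aggregate.version = snapshot.version
        after_version = snapshot.version
    for event in event_store.get_events(market_id, after_version):
        aggregate.apply(event)
    return aggregate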
33.3 CQRS: Command Query Responsibility Segregation
33.3.1 The Problem with Unified Models
In a traditional architecture, the same data model serves both writes (placing orders, creating markets) and reads (displaying order books, showing market prices, generating leaderboards). This creates a fundamental tension:
- Write-optimized schemas use normalization to avoid update anomalies, but normalized schemas require expensive joins for reads.
- Read-optimized schemas use denormalization for fast queries, but denormalized schemas create update anomalies and consistency challenges.
Prediction markets feel this tension acutely. The write path (order matching) requires strict consistency and low latency. The read path (displaying prices, positions, history) requires high throughput and flexible querying. Trying to serve both from the same model forces compromises on both sides.
33.3.2 The CQRS Pattern
Definition 33.3 (CQRS). Command Query Responsibility Segregation is an architectural pattern that uses separate models for updating data (commands) and reading data (queries). Commands mutate state and return only success or failure. Queries return data and never mutate state.
In a prediction market context:
Command Side (Write Model):
- Receives order submissions, cancellations, market creation requests.
- Validates business rules (sufficient funds, market is open, price within bounds).
- Applies changes to the event store.
- Optimized for consistency and correctness.
Query Side (Read Model):
- Maintains denormalized views optimized for specific query patterns.
- Updated asynchronously by subscribing to the event stream.
- Can have multiple read models, each optimized for different use cases.
- Eventual consistency is acceptable (typically milliseconds of lag).
33.3.3 Read Models for Prediction Markets
A prediction market platform needs several read models:
- Order Book View: For each market, the current bids and asks at each price level. Optimized for the trading UI.
- Market Summary View: Current price, volume, number of traders, price history. Optimized for market listing pages.
- Portfolio View: Each trader's positions, PnL, and open orders. Optimized for portfolio dashboards.
- Leaderboard View: Top traders by profit, accuracy, or volume. Optimized for competitive features.
- Analytics View: Time-series data for charting, volume analysis, and market microstructure research.
Each view is a projection of the same underlying event stream, denormalized for its specific access pattern.
33.3.4 Consistency Considerations
CQRS introduces eventual consistency between the write model and read models. The write model is always authoritative, but read models may lag behind by a small amount of time. This has practical implications:
- A trader who places an order may not immediately see it in their portfolio view.
- The displayed price might be a few milliseconds behind the latest trade.
- Two users viewing the same market might momentarily see different prices.
For prediction markets, this is generally acceptable. The critical invariant --- that orders are matched correctly and funds are transferred accurately --- is maintained by the write model. The read models only need to be eventually consistent.
Theorem 33.1 (Consistency Bound). In a CQRS system with event propagation latency $\delta$ and read model projection processing time $\pi$, the maximum staleness of any read model is bounded by:
$$\text{staleness}_{\max} = \delta + \pi$$
In practice, with in-process event subscribers and efficient projection logic, $\delta + \pi < 10\text{ms}$ is achievable, which is imperceptible to human users.
33.3.5 Python CQRS Implementation
import time
import uuid
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from collections import defaultdict
# --- Command Side ---
class CommandHandler:
"""Processes commands and emits events."""
def __init__(self, event_store: EventStore):
self.event_store = event_store
def handle_place_order(
self,
market_id: str,
trader_id: str,
outcome: str,
side: str,
price: float,
quantity: int,
) -> str:
# Validate
if not 0 < price < 1:
raise ValueError("Price must be between 0 and 1")
if quantity <= 0:
raise ValueError("Quantity must be positive")
# Load current state to check market is open
market = MarketAggregate.rebuild(market_id, self.event_store)
if market.status != "open":
raise ValueError(f"Market is {market.status}, not open")
# Create event
order_id = str(uuid.uuid4())
event = Event(
event_id=str(uuid.uuid4()),
event_type=EventType.ORDER_PLACED,
aggregate_id=market_id,
timestamp=time.time(),
version=market.version + 1,
data={
"order_id": order_id,
"trader_id": trader_id,
"outcome": outcome,
"side": side,
"price": price,
"quantity": quantity,
},
)
self.event_store.append(event)
return order_id
# --- Query Side ---
class OrderBookProjection:
"""Read model: order book for each market."""
def __init__(self):
self.books: Dict[str, Dict] = defaultdict(
lambda: {"bids": defaultdict(int), "asks": defaultdict(int)}
)
def handle_event(self, event: Event) -> None:
if event.event_type == EventType.ORDER_PLACED:
market_id = event.aggregate_id
side = "bids" if event.data["side"] == "buy" else "asks"
price = event.data["price"]
qty = event.data["quantity"]
self.books[market_id][side][price] += qty
        elif event.event_type == EventType.ORDER_MATCHED:
            market_id = event.aggregate_id
            price = event.data["price"]
            qty = event.data["quantity"]
            # Simplification: assumes the matched orders rested at the
            # execution price; a full book would look the orders up by ID.
            self.books[market_id]["bids"][price] -= qty
            self.books[market_id]["asks"][price] -= qty
def get_order_book(self, market_id: str) -> Dict:
book = self.books[market_id]
return {
"bids": sorted(
[(p, q) for p, q in book["bids"].items() if q > 0],
key=lambda x: -x[0]
),
"asks": sorted(
[(p, q) for p, q in book["asks"].items() if q > 0],
key=lambda x: x[0]
),
}
class MarketSummaryProjection:
"""Read model: summary stats for each market."""
def __init__(self):
self.summaries: Dict[str, Dict] = {}
def handle_event(self, event: Event) -> None:
if event.event_type == EventType.MARKET_CREATED:
self.summaries[event.aggregate_id] = {
"market_id": event.aggregate_id,
"question": event.data["question"],
"outcomes": event.data["outcomes"],
"status": "open",
"last_price": None,
"total_volume": 0,
"trade_count": 0,
"unique_traders": set(),
}
        elif event.event_type == EventType.ORDER_MATCHED:
            mid = event.aggregate_id
            if mid in self.summaries:
                self.summaries[mid]["last_price"] = event.data["price"]
                self.summaries[mid]["total_volume"] += (
                    event.data["quantity"] * event.data["price"]
                )
                self.summaries[mid]["trade_count"] += 1
                # Count distinct traders when the match event carries them.
                for key in ("buy_trader_id", "sell_trader_id"):
                    trader = event.data.get(key)
                    if trader:
                        self.summaries[mid]["unique_traders"].add(trader)
elif event.event_type == EventType.MARKET_RESOLVED:
mid = event.aggregate_id
if mid in self.summaries:
self.summaries[mid]["status"] = "resolved"
self.summaries[mid]["resolution"] = (
event.data["winning_outcome"]
)
def get_summary(self, market_id: str) -> Optional[Dict]:
summary = self.summaries.get(market_id)
if summary:
result = dict(summary)
result["unique_traders"] = len(result["unique_traders"])
return result
return None
33.3.6 Wiring It Together
The command handler and projections are connected via the event store's subscription mechanism:
# Setup
event_store = EventStore()
command_handler = CommandHandler(event_store)
order_book_view = OrderBookProjection()
summary_view = MarketSummaryProjection()
# Wire projections to event store
event_store.subscribe(order_book_view.handle_event)
event_store.subscribe(summary_view.handle_event)
# Now, when a command is processed:
# 1. CommandHandler validates and appends event to store
# 2. EventStore notifies all subscribers
# 3. Projections update their denormalized views
# 4. Reads go directly to projections (no joins, no aggregation)
This separation allows us to scale reads and writes independently. The command handler can run on a single, powerful machine (writes are inherently serial for a given market), while read replicas can be scaled horizontally across many machines.
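To see the whole cycle, here is a minimal end-to-end run. The MarketCreated event is appended by hand, since the command handler sketched above only covers order placement:
# Create a market by appending its creation event directly.
event_store.append(Event(
    event_id=str(uuid.uuid4()),
    event_type=EventType.MARKET_CREATED,
    aggregate_id="market-1",
    timestamp=time.time(),
    version=1,
    data={"question": "Will candidate X win?", "outcomes": ["Yes", "No"]},
))

# Command side: validate and record an order.
command_handler.handle_place_order(
    market_id="market-1", trader_id="trader-9",
    outcome="Yes", side="buy", price=0.62, quantity=100,
)

# Query side: projections were updated via the subscription hooks.
print(order_book_view.get_order_book("market-1"))
print(summary_view.get_summary("market-1"))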
33.4 Database Optimization
33.4.1 Schema Design for Market Data
Prediction market data has specific patterns that inform schema design. The most frequently accessed data is:
- Current market prices --- queried on every page load and WebSocket tick.
- Open orders for a market --- queried by the matching engine on every new order.
- A trader's positions --- queried on every portfolio page load.
- Recent trades --- queried for chart rendering and market activity feeds.
A well-designed schema separates these access patterns:
-- Markets table: one row per market, accessed by ID
CREATE TABLE markets (
market_id UUID PRIMARY KEY,
question TEXT NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'open',
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
resolved_at TIMESTAMP WITH TIME ZONE,
resolution VARCHAR(100),
metadata JSONB DEFAULT '{}'
);
-- Orders table: filtered by status via a partial index (see 33.4.2)
CREATE TABLE orders (
order_id UUID PRIMARY KEY,
market_id UUID NOT NULL REFERENCES markets(market_id),
trader_id UUID NOT NULL,
outcome VARCHAR(100) NOT NULL,
side VARCHAR(4) NOT NULL CHECK (side IN ('buy', 'sell')),
price DECIMAL(10, 4) NOT NULL CHECK (price > 0 AND price < 1),
quantity INTEGER NOT NULL CHECK (quantity > 0),
remaining INTEGER NOT NULL CHECK (remaining >= 0),
status VARCHAR(20) NOT NULL DEFAULT 'open',
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
-- Trades table: append-only, partitioned by time
CREATE TABLE trades (
trade_id UUID PRIMARY KEY,
market_id UUID NOT NULL REFERENCES markets(market_id),
buy_order_id UUID NOT NULL REFERENCES orders(order_id),
sell_order_id UUID NOT NULL REFERENCES orders(order_id),
price DECIMAL(10, 4) NOT NULL,
quantity INTEGER NOT NULL,
executed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
-- Positions table: materialized view of net positions
CREATE TABLE positions (
trader_id UUID NOT NULL,
market_id UUID NOT NULL,
outcome VARCHAR(100) NOT NULL,
net_quantity INTEGER NOT NULL DEFAULT 0,
avg_price DECIMAL(10, 4),
PRIMARY KEY (trader_id, market_id, outcome)
);
33.4.2 Indexing Strategies
Indexes must be designed for actual query patterns, not hypothetical ones. Here are the critical indexes for a prediction market:
-- Fast lookup of open orders for matching engine
CREATE INDEX idx_orders_market_open
ON orders (market_id, outcome, side, price)
WHERE status = 'open';
-- Fast lookup of a trader's orders
CREATE INDEX idx_orders_trader
ON orders (trader_id, status);
-- Fast lookup of recent trades for a market (chart data)
CREATE INDEX idx_trades_market_time
ON trades (market_id, executed_at DESC);
-- Fast lookup of active markets
CREATE INDEX idx_markets_status
ON markets (status)
WHERE status = 'open';
-- Full-text search on market questions
CREATE INDEX idx_markets_question_search
ON markets USING gin(to_tsvector('english', question));
The partial index on orders WHERE status = 'open' is critical: in a mature platform, the vast majority of orders are filled or cancelled, but the matching engine only cares about open orders. A partial index keeps the index small and fast.
33.4.3 Connection Pooling
Database connections are expensive to establish (TCP handshake, TLS negotiation, authentication). A connection pool maintains a set of ready-to-use connections:
import asyncpg
import asyncio
class DatabasePool:
"""Manages a pool of database connections."""
def __init__(
self,
dsn: str,
min_size: int = 10,
max_size: int = 50,
):
self.dsn = dsn
self.min_size = min_size
self.max_size = max_size
self._pool = None
async def initialize(self):
self._pool = await asyncpg.create_pool(
self.dsn,
min_size=self.min_size,
max_size=self.max_size,
command_timeout=30,
max_inactive_connection_lifetime=300,
)
async def execute(self, query: str, *args):
async with self._pool.acquire() as conn:
return await conn.execute(query, *args)
async def fetch(self, query: str, *args):
async with self._pool.acquire() as conn:
return await conn.fetch(query, *args)
async def fetchrow(self, query: str, *args):
async with self._pool.acquire() as conn:
return await conn.fetchrow(query, *args)
async def close(self):
if self._pool:
await self._pool.close()
Sizing the pool: A common formula is:
$$\text{pool\_size} = \frac{C \times (Q_{\text{avg}} + L_{\text{avg}})}{1000}$$
where $C$ is the expected request rate in requests per second, $Q_{\text{avg}}$ is the average query time in milliseconds, and $L_{\text{avg}}$ is the average network latency in milliseconds. This is Little's law: the connections needed equal the arrival rate times the connection hold time. For 1,000 requests per second with 5ms average query time and 1ms latency, this gives a pool size of 6. In practice, add headroom: a pool of 20--50 connections serves most prediction market workloads well.
33.4.4 Read Replicas
For read-heavy workloads (which prediction markets are --- far more price checks than trades), read replicas distribute the query load:
class ReadReplicaRouter:
"""Routes queries to primary or replica based on freshness needs."""
def __init__(
self,
primary_pool: DatabasePool,
replica_pools: List[DatabasePool],
):
self.primary = primary_pool
self.replicas = replica_pools
self._replica_index = 0
def _next_replica(self) -> DatabasePool:
"""Round-robin across replicas."""
pool = self.replicas[self._replica_index]
self._replica_index = (
(self._replica_index + 1) % len(self.replicas)
)
return pool
async def query(
self,
sql: str,
*args,
require_fresh: bool = False,
):
if require_fresh or not self.replicas:
return await self.primary.fetch(sql, *args)
return await self._next_replica().fetch(sql, *args)
33.4.5 Table Partitioning
The trades table grows indefinitely and is best partitioned by time:
-- Range partition trades by month
CREATE TABLE trades (
trade_id UUID NOT NULL,
market_id UUID NOT NULL,
price DECIMAL(10, 4) NOT NULL,
quantity INTEGER NOT NULL,
executed_at TIMESTAMP WITH TIME ZONE NOT NULL
) PARTITION BY RANGE (executed_at);
CREATE TABLE trades_2026_01 PARTITION OF trades
FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');
CREATE TABLE trades_2026_02 PARTITION OF trades
FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
-- ... and so on, created automatically by a cron job
Queries that include a time range predicate automatically target only the relevant partitions, dramatically reducing I/O for historical queries.
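The cron job itself can be a few lines. A sketch, assuming the asyncpg-backed DatabasePool from Section 33.4.3:
from datetime import date

def next_partition_ddl(today: date) -> str:
    """Build the DDL for next month's trades partition."""
    first = date(today.year + (today.month == 12), today.month % 12 + 1, 1)
    after = date(first.year + (first.month == 12), first.month % 12 + 1, 1)
    name = f"trades_{first.year}_{first.month:02d}"
    return (
        f"CREATE TABLE IF NOT EXISTS {name} PARTITION OF trades "
        f"FOR VALUES FROM ('{first}') TO ('{after}')"
    )

async def ensure_next_partition(db: DatabasePool) -> None:
    # Idempotent: IF NOT EXISTS makes re-runs safe.
    await db.execute(next_partition_ddl(date.today()))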
33.5 Caching Strategies
33.5.1 What to Cache
Not all data benefits equally from caching. The caching value of a data item depends on three factors:
$$V_{\text{cache}} = \frac{f_{\text{access}} \times c_{\text{compute}}}{r_{\text{change}}}$$
where $f_{\text{access}}$ is the access frequency, $c_{\text{compute}}$ is the computation cost, and $r_{\text{change}}$ is the rate of change. Data with high access frequency, high computation cost, and low change rate benefits most from caching.
| Data | Access Freq | Compute Cost | Change Rate | Cache Value |
|---|---|---|---|---|
| Market list | Very high | Medium | Low | Very high |
| Market prices | Very high | Low | High | Medium (short TTL) |
| Order book depth | High | Medium | High | Medium (short TTL) |
| User portfolio | Medium | High | Medium | High |
| Leaderboard | High | Very high | Low | Very high |
| Trade history | Medium | Low | Append-only | High |
33.5.2 Redis Caching Layer
import json
import time
import hashlib
from typing import Any, Optional, Callable
class PredictionMarketCache:
"""Caching layer for prediction market data."""
def __init__(self, redis_client):
self.redis = redis_client
self.hit_count = 0
self.miss_count = 0
async def get_or_compute(
self,
key: str,
compute_fn: Callable,
ttl_seconds: int = 60,
) -> Any:
"""Get from cache or compute and store."""
cached = await self.redis.get(key)
if cached is not None:
self.hit_count += 1
return json.loads(cached)
self.miss_count += 1
value = await compute_fn()
await self.redis.setex(
key,
ttl_seconds,
json.dumps(value, default=str),
)
return value
async def get_market_price(
self,
market_id: str,
compute_fn: Callable,
) -> dict:
"""Cache market prices with short TTL."""
return await self.get_or_compute(
f"market:price:{market_id}",
compute_fn,
ttl_seconds=2, # 2 seconds for actively traded markets
)
async def get_market_list(
self,
compute_fn: Callable,
) -> list:
"""Cache the market list with medium TTL."""
return await self.get_or_compute(
"markets:active",
compute_fn,
ttl_seconds=30,
)
async def get_leaderboard(
self,
compute_fn: Callable,
) -> list:
"""Cache leaderboard with long TTL."""
return await self.get_or_compute(
"leaderboard:top100",
compute_fn,
ttl_seconds=300, # 5 minutes
)
async def invalidate_market(self, market_id: str) -> None:
"""Invalidate all caches for a market after a trade."""
keys = [
f"market:price:{market_id}",
f"market:orderbook:{market_id}",
f"market:summary:{market_id}",
]
await self.redis.delete(*keys)
@property
def hit_rate(self) -> float:
total = self.hit_count + self.miss_count
return self.hit_count / total if total > 0 else 0.0
33.5.3 Cache Invalidation Strategies
Cache invalidation is famously one of the two hard problems in computer science (along with naming things and off-by-one errors). For prediction markets, we use a hybrid strategy:
- TTL-based expiration: Every cached item has a time-to-live. This provides a safety net --- even if explicit invalidation fails, stale data is bounded.
- Event-driven invalidation: When events occur (trades, order placements), we explicitly invalidate affected cache entries. This keeps latency low for active markets.
- Write-through for critical data: Market prices are written to cache simultaneously with the event store, ensuring the cache is always up-to-date for this critical data point.
class CacheInvalidator:
"""Subscribes to events and invalidates relevant caches."""
def __init__(self, cache: PredictionMarketCache):
self.cache = cache
async def handle_event(self, event: Event) -> None:
if event.event_type == EventType.ORDER_MATCHED:
await self.cache.invalidate_market(event.aggregate_id)
# Also invalidate both traders' portfolio caches
buy_trader = event.data.get("buy_trader_id")
sell_trader = event.data.get("sell_trader_id")
if buy_trader:
await self.cache.redis.delete(
f"portfolio:{buy_trader}"
)
if sell_trader:
await self.cache.redis.delete(
f"portfolio:{sell_trader}"
)
elif event.event_type == EventType.ORDER_PLACED:
await self.cache.redis.delete(
f"market:orderbook:{event.aggregate_id}"
)
elif event.event_type == EventType.MARKET_RESOLVED:
await self.cache.invalidate_market(event.aggregate_id)
await self.cache.redis.delete("markets:active")
await self.cache.redis.delete("leaderboard:top100")
33.5.4 Multi-Level Caching
For the highest performance, use multiple cache levels:
- L1: In-process cache (Python dict or LRU cache): Nanosecond access, limited by process memory, not shared across instances.
- L2: Redis: Microsecond access over network, shared across all instances, persists across restarts.
- L3: Database: Millisecond access, authoritative source of truth.
import json
import time
class MultiLevelCache:
def __init__(self, redis_client):
self.redis = redis_client
self._l1 = {}
self._l1_expiry = {}
async def get(self, key: str, compute_fn, l1_ttl=1, l2_ttl=60):
# L1: in-process
now = time.time()
if key in self._l1 and self._l1_expiry[key] > now:
return self._l1[key]
# L2: Redis
cached = await self.redis.get(key)
if cached is not None:
value = json.loads(cached)
self._l1[key] = value
self._l1_expiry[key] = now + l1_ttl
return value
# L3: compute from database
value = await compute_fn()
await self.redis.setex(key, l2_ttl, json.dumps(value, default=str))
self._l1[key] = value
self._l1_expiry[key] = now + l1_ttl
return value
33.6 Message Queues and Async Processing
33.6.1 Why Decouple?
A synchronous request-response cycle for order placement might involve: validating the order, checking funds, inserting into the order book, attempting to match, updating positions for both traders, sending WebSocket notifications, updating analytics, and recording audit logs. If all of this happens synchronously, the trader waits for all steps to complete before receiving confirmation.
Message queues decouple the critical path (validate, check funds, match) from secondary operations (notifications, analytics, audit). This reduces perceived latency and improves reliability: if the notification service is temporarily down, orders still process.
33.6.2 Queue Architecture for Prediction Markets
+------------------+
| API Gateway |
+--------+---------+
|
+--------v---------+
| Command Handler |
+--------+---------+
|
+--------------+--------------+
| | |
+--------v-----+ +----v------+ +----v-------+
| Order Queue | | Notif. | | Analytics |
| (high prio) | | Queue | | Queue |
+--------+------+ +----+-----+ +----+-------+
| | |
+--------v------+ +---v------+ +----v-------+
| Matching | | Notif. | | Analytics |
| Engine | | Worker | | Worker |
+--------+------+ +---------+ +------------+
|
+--------v------+
| Settlement |
| Queue |
+--------+------+
|
+--------v------+
| Settlement |
| Worker |
+----------------+
33.6.3 Python Message Queue Implementation
import asyncio
import time
import uuid
from typing import Dict, List, Callable, Any
from dataclasses import dataclass
from enum import Enum
from collections import defaultdict
class QueuePriority(Enum):
CRITICAL = 0 # Order matching, fund transfers
HIGH = 1 # Price updates, position updates
NORMAL = 2 # Notifications, analytics
LOW = 3 # Reports, cleanup tasks
@dataclass
class QueueMessage:
message_id: str
queue_name: str
priority: QueuePriority
payload: Dict[str, Any]
timestamp: float
retry_count: int = 0
max_retries: int = 3
class InMemoryMessageQueue:
"""
Simple priority message queue for demonstration.
In production, replace with Redis Streams or RabbitMQ.
"""
    def __init__(self):
        self._queues: Dict[str, asyncio.PriorityQueue] = {}
        self._handlers: Dict[str, Callable] = {}
        self._dead_letter: List[QueueMessage] = []
        self._processed_count: Dict[str, int] = defaultdict(int)
        self._seq = 0  # tiebreaker: QueueMessage itself is not orderable
def declare_queue(self, name: str, handler: Callable) -> None:
self._queues[name] = asyncio.PriorityQueue()
self._handlers[name] = handler
async def publish(self, message: QueueMessage) -> None:
queue = self._queues.get(message.queue_name)
if queue is None:
raise ValueError(f"Unknown queue: {message.queue_name}")
        self._seq += 1
        await queue.put(
            (message.priority.value, message.timestamp, self._seq, message)
        )
async def consume(self, queue_name: str) -> None:
"""Consume messages from a queue indefinitely."""
queue = self._queues[queue_name]
handler = self._handlers[queue_name]
while True:
            _, _, _, message = await queue.get()
try:
await handler(message)
self._processed_count[queue_name] += 1
except Exception as e:
message.retry_count += 1
if message.retry_count <= message.max_retries:
await self.publish(message)
else:
self._dead_letter.append(message)
print(
f"Message {message.message_id} "
f"moved to dead letter queue: {e}"
)
finally:
queue.task_done()
class OrderProcessingPipeline:
"""Orchestrates order processing through queues."""
def __init__(self, message_queue: InMemoryMessageQueue):
self.mq = message_queue
self._setup_queues()
def _setup_queues(self):
self.mq.declare_queue("orders", self._handle_order)
self.mq.declare_queue("notifications", self._handle_notification)
self.mq.declare_queue("analytics", self._handle_analytics)
self.mq.declare_queue("settlement", self._handle_settlement)
async def submit_order(self, order_data: dict) -> str:
message_id = str(uuid.uuid4())
await self.mq.publish(QueueMessage(
message_id=message_id,
queue_name="orders",
priority=QueuePriority.CRITICAL,
payload=order_data,
timestamp=time.time(),
))
return message_id
async def _handle_order(self, message: QueueMessage) -> None:
"""Process an order: validate, match, emit follow-up messages."""
order = message.payload
# ... matching logic ...
# Emit notification
await self.mq.publish(QueueMessage(
message_id=str(uuid.uuid4()),
queue_name="notifications",
priority=QueuePriority.NORMAL,
payload={
"type": "order_confirmed",
"trader_id": order["trader_id"],
"order_id": order["order_id"],
},
timestamp=time.time(),
))
# Emit analytics event
await self.mq.publish(QueueMessage(
message_id=str(uuid.uuid4()),
queue_name="analytics",
priority=QueuePriority.LOW,
payload={
"type": "order_processed",
"market_id": order["market_id"],
"price": order["price"],
"quantity": order["quantity"],
},
timestamp=time.time(),
))
async def _handle_notification(self, message: QueueMessage) -> None:
"""Send notification to trader (email, push, WebSocket)."""
payload = message.payload
print(
f"Notifying trader {payload['trader_id']}: "
f"{payload['type']}"
)
async def _handle_analytics(self, message: QueueMessage) -> None:
"""Record analytics event."""
payload = message.payload
print(f"Analytics: {payload['type']} for {payload.get('market_id')}")
async def _handle_settlement(self, message: QueueMessage) -> None:
"""Settle a resolved market."""
payload = message.payload
print(f"Settling market {payload['market_id']}")
33.6.4 Dead Letter Queues and Retry Strategies
Messages that fail repeatedly should not be retried forever. A dead letter queue (DLQ) captures these failures for manual inspection. The retry strategy should use exponential backoff:
$$t_{\text{retry}}(n) = t_{\text{base}} \cdot 2^{n-1} + \text{jitter}$$
where $n$ is the retry attempt number and $\text{jitter}$ is a random value to prevent thundering herd effects. For prediction markets, $t_{\text{base}} = 1\text{s}$ and $\max(n) = 5$ gives retry delays of 1s, 2s, 4s, 8s, 16s before the message is moved to the DLQ.
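Applied to the in-memory queue above, the consumer's immediate republish could be replaced with a delayed one. A sketch (the 500ms jitter bound is an arbitrary choice):
import asyncio
import random

async def republish_with_backoff(
    mq: InMemoryMessageQueue,
    message: QueueMessage,
    t_base: float = 1.0,
) -> None:
    # retry_count was already incremented by the consumer, so it is n.
    n = message.retry_count
    delay = t_base * 2 ** (n - 1) + random.uniform(0, 0.5)
    await asyncio.sleep(delay)
    await mq.publish(message)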
33.7 Horizontal Scaling
33.7.1 Stateless API Servers
The first step in horizontal scaling is making API servers stateless. No server-local state should be required to process a request:
- Session data goes to Redis, not server memory.
- File uploads go to object storage (S3, GCS), not local disk.
- Cached data goes to shared Redis, not in-process caches (or use in-process caches only as L1 with short TTLs).
Once servers are stateless, scaling is a matter of adding more instances behind a load balancer.
33.7.2 Load Balancing
For prediction markets, the load balancing strategy must account for WebSocket connections:
class LoadBalancerConfig:
"""Configuration for prediction market load balancing."""
def __init__(self):
self.rules = {
# REST API: round-robin across all instances
"api": {
"algorithm": "round_robin",
"health_check": "/health",
"health_interval_seconds": 10,
"max_connections_per_server": 10000,
},
# WebSocket: sticky sessions (IP hash)
# because WS connections are long-lived
"websocket": {
"algorithm": "ip_hash",
"health_check": "/ws/health",
"health_interval_seconds": 5,
"max_connections_per_server": 50000,
},
# Matching engine: route by market_id
# All orders for a given market go to the same instance
"matching": {
"algorithm": "consistent_hash",
"hash_key": "market_id",
"health_check": "/matching/health",
"health_interval_seconds": 3,
},
}
33.7.3 Sharding the Matching Engine
The matching engine is the most latency-sensitive and hardest-to-scale component. Since orders for a given market must be processed sequentially (to maintain order book integrity), we cannot simply load-balance matching requests.
The solution is sharding by market: each matching engine instance is responsible for a subset of markets. This is a form of data parallelism where each shard operates independently.
import hashlib
from typing import Dict
class MarketShardRouter:
"""Routes market operations to the correct shard."""
def __init__(self, num_shards: int):
self.num_shards = num_shards
self._shard_map: Dict[str, int] = {}
def get_shard(self, market_id: str) -> int:
"""Consistent hashing to determine shard assignment."""
if market_id not in self._shard_map:
hash_val = int(
hashlib.md5(market_id.encode()).hexdigest(), 16
)
self._shard_map[market_id] = hash_val % self.num_shards
return self._shard_map[market_id]
def rebalance(self, new_num_shards: int) -> Dict[str, int]:
"""
Returns a migration plan when changing shard count.
In production, use consistent hashing with virtual nodes
to minimize data movement.
"""
migrations = {}
old_num = self.num_shards
self.num_shards = new_num_shards
for market_id, old_shard in self._shard_map.items():
new_shard = int(
hashlib.md5(market_id.encode()).hexdigest(), 16
) % new_num_shards
if new_shard != old_shard:
migrations[market_id] = {
"from": old_shard,
"to": new_shard
}
self._shard_map[market_id] = new_shard
return migrations
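As the docstring notes, production systems typically use consistent hashing with virtual nodes, so that changing the shard count moves only about 1/N of the markets. A minimal ring sketch:
import bisect
import hashlib
from typing import List

class ConsistentHashRing:
    """Each shard owns many virtual points on a hash ring."""

    def __init__(self, shard_ids: List[str], vnodes: int = 64):
        self._ring: List[tuple] = []
        for shard in shard_ids:
            for v in range(vnodes):
                self._ring.append((self._hash(f"{shard}:vn{v}"), shard))
        self._ring.sort()
        self._points = [h for h, _ in self._ring]

    @staticmethod
    def _hash(value: str) -> int:
        return int(hashlib.md5(value.encode()).hexdigest(), 16)

    def get_shard(self, market_id: str) -> str:
        # The first virtual point clockwise from the key's hash owns it.
        idx = bisect.bisect(self._points, self._hash(market_id))
        return self._ring[idx % len(self._ring)][1]

ring = ConsistentHashRing(["shard-0", "shard-1", "shard-2"])
print(ring.get_shard("market-abc"))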
33.7.4 Scaling Architecture Overview
A fully scaled prediction market platform looks like this:
+-------------------+
| CDN / Edge |
| (static assets, |
| rate limiting) |
+---------+---------+
|
+---------v---------+
| Load Balancer |
| (L7, TLS term.) |
+---------+---------+
|
+--------------------+--------------------+
| | |
+---------v-------+ +--------v--------+ +--------v--------+
| API Server 1 | | API Server 2 | | API Server N |
| (stateless) | | (stateless) | | (stateless) |
+---------+-------+ +--------+--------+ +--------+--------+
| | |
+---------+----------+---------+---------+
| |
+---------v-------+ +---------v-------+
| Matching Shard 1| | Matching Shard M|
| (markets A-F) | | (markets G-Z) |
+---------+-------+ +---------+-------+
| |
+---------v--------------------v--------+
| Event Store (Primary) |
+--------+---+---+---+---------+--------+
| | | | |
v v v v v
Read Replicas Redis Cluster
33.7.5 Auto-Scaling Policies
Auto-scaling rules should be based on leading indicators, not lagging ones:
class AutoScalingPolicy:
"""Defines when to scale API servers up or down."""
def __init__(self):
self.rules = {
"scale_up": {
"cpu_utilization_above": 70, # percent
"memory_utilization_above": 80, # percent
"request_latency_p99_above": 150, # milliseconds
"queue_depth_above": 1000, # messages
"evaluation_period": 120, # seconds
"cooldown_period": 300, # seconds
"scale_increment": 2, # add 2 instances
},
"scale_down": {
"cpu_utilization_below": 30,
"memory_utilization_below": 40,
"request_latency_p99_below": 50,
"queue_depth_below": 10,
"evaluation_period": 600, # 10 min (slower)
"cooldown_period": 600,
"scale_decrement": 1,
},
"limits": {
"min_instances": 3,
"max_instances": 50,
},
}
def evaluate(self, metrics: dict) -> str:
"""Returns 'scale_up', 'scale_down', or 'no_change'."""
up = self.rules["scale_up"]
down = self.rules["scale_down"]
if (metrics.get("cpu_utilization", 0) > up["cpu_utilization_above"]
or metrics.get("request_latency_p99", 0)
> up["request_latency_p99_above"]
or metrics.get("queue_depth", 0)
> up["queue_depth_above"]):
return "scale_up"
if (metrics.get("cpu_utilization", 0)
< down["cpu_utilization_below"]
and metrics.get("request_latency_p99", 0)
< down["request_latency_p99_below"]
and metrics.get("queue_depth", 0)
< down["queue_depth_below"]):
return "scale_down"
return "no_change"
33.8 Monitoring and Alerting
33.8.1 The Four Golden Signals
Google's Site Reliability Engineering book identifies four golden signals that every service should monitor:
- Latency: The time it takes to serve a request. Distinguish between the latency of successful requests and failed requests (a fast error is still an error).
- Traffic: The demand on the system. For prediction markets: requests per second, orders per second, WebSocket messages per second.
- Errors: The rate of requests that fail. Include both explicit errors (HTTP 5xx) and implicit errors (HTTP 200 but with incorrect content).
- Saturation: How "full" the system is. CPU usage, memory usage, disk I/O, database connection pool utilization, message queue depth.
33.8.2 Prediction Market-Specific Metrics
Beyond the four golden signals, prediction markets have domain-specific metrics that require monitoring:
from dataclasses import dataclass, field
from typing import Dict, List
import time
import statistics
@dataclass
class MarketMetrics:
"""Metrics specific to prediction market operations."""
# Matching engine performance
orders_per_second: float = 0.0
trades_per_second: float = 0.0
matching_latency_ms: List[float] = field(default_factory=list)
# Market health
active_markets: int = 0
markets_with_no_trades_24h: int = 0
total_open_orders: int = 0
total_positions: int = 0
# Price feed health
price_update_latency_ms: List[float] = field(default_factory=list)
websocket_connections: int = 0
stale_price_feeds: int = 0 # feeds not updated in > 5 seconds
# Financial metrics
total_volume_24h: float = 0.0
total_fees_collected_24h: float = 0.0
pending_settlements: int = 0
# Queue health
order_queue_depth: int = 0
notification_queue_depth: int = 0
settlement_queue_depth: int = 0
def summary(self) -> Dict:
return {
"orders_per_second": self.orders_per_second,
"trades_per_second": self.trades_per_second,
"matching_latency_p50": (
statistics.median(self.matching_latency_ms)
if self.matching_latency_ms else 0
),
"matching_latency_p99": (
(
sorted(self.matching_latency_ms)[
int(len(self.matching_latency_ms) * 0.99)
]
) if self.matching_latency_ms else 0
),
"active_markets": self.active_markets,
"websocket_connections": self.websocket_connections,
"order_queue_depth": self.order_queue_depth,
}
33.8.3 Prometheus-Style Metrics Collection
import time
from typing import Dict, List, Optional
from collections import defaultdict
import threading
class MetricsCollector:
"""
Collects and exposes metrics in Prometheus format.
Thread-safe for use in multi-threaded servers.
"""
def __init__(self):
self._counters: Dict[str, float] = defaultdict(float)
self._gauges: Dict[str, float] = defaultdict(float)
self._histograms: Dict[str, List[float]] = defaultdict(list)
self._lock = threading.Lock()
def increment_counter(
self, name: str, value: float = 1.0, labels: Dict = None
) -> None:
key = self._make_key(name, labels)
with self._lock:
self._counters[key] += value
def set_gauge(
self, name: str, value: float, labels: Dict = None
) -> None:
key = self._make_key(name, labels)
with self._lock:
self._gauges[key] = value
def observe_histogram(
self, name: str, value: float, labels: Dict = None
) -> None:
key = self._make_key(name, labels)
with self._lock:
self._histograms[key].append(value)
# Keep only last 10000 observations to bound memory
if len(self._histograms[key]) > 10000:
self._histograms[key] = self._histograms[key][-5000:]
def _make_key(self, name: str, labels: Optional[Dict]) -> str:
if labels:
label_str = ",".join(
f'{k}="{v}"' for k, v in sorted(labels.items())
)
return f"{name}{{{label_str}}}"
return name
def export_prometheus(self) -> str:
"""Export metrics in Prometheus text format."""
lines = []
with self._lock:
for key, value in self._counters.items():
lines.append(f"{key} {value}")
for key, value in self._gauges.items():
lines.append(f"{key} {value}")
for key, values in self._histograms.items():
if values:
sorted_vals = sorted(values)
n = len(sorted_vals)
lines.append(
f"{key}_p50 {sorted_vals[int(n * 0.5)]}"
)
lines.append(
f"{key}_p95 {sorted_vals[int(n * 0.95)]}"
)
lines.append(
f"{key}_p99 {sorted_vals[min(int(n * 0.99), n - 1)]}"
)
lines.append(
f"{key}_count {n}"
)
return "\n".join(lines)
# Global metrics instance
metrics = MetricsCollector()
class TimingContext:
"""Context manager for timing operations."""
def __init__(self, metric_name: str, labels: Dict = None):
self.metric_name = metric_name
self.labels = labels
self.start_time = None
def __enter__(self):
self.start_time = time.perf_counter()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
elapsed_ms = (time.perf_counter() - self.start_time) * 1000
metrics.observe_histogram(
self.metric_name, elapsed_ms, self.labels
)
if exc_type:
metrics.increment_counter(
f"{self.metric_name}_errors", labels=self.labels
)
return False
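Usage is a one-liner around any hot path. A sketch timing a placeholder matching call:
def match_order(order: dict) -> None:
    # Placeholder for the real matching-engine call.
    time.sleep(0.001)

def process_order(order: dict) -> None:
    metrics.increment_counter("orders_received_total")
    with TimingContext("matching_latency_ms", {"shard": "0"}):
        match_order(order)

process_order({"order_id": "abc"})
print(metrics.export_prometheus())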
33.8.4 Health Checks
Health checks serve two purposes: load balancer routing and automated alerting. A prediction market platform needs both shallow and deep health checks:
import asyncio
import time
from typing import Dict
class HealthChecker:
"""
Multi-level health checks for prediction market services.
"""
def __init__(self, db_pool, redis_client, event_store):
self.db = db_pool
self.redis = redis_client
self.event_store = event_store
async def shallow_check(self) -> Dict:
"""
Fast check for load balancer.
Returns 200 if the process is running and accepting connections.
Should complete in < 1ms.
"""
return {"status": "ok", "timestamp": time.time()}
async def deep_check(self) -> Dict:
"""
Comprehensive check of all dependencies.
Used for alerting, not for load balancer routing.
"""
checks = {}
# Database connectivity
try:
start = time.perf_counter()
await self.db.fetchrow("SELECT 1")
latency = (time.perf_counter() - start) * 1000
checks["database"] = {
"status": "ok",
"latency_ms": round(latency, 2),
}
except Exception as e:
checks["database"] = {
"status": "error",
"error": str(e),
}
# Redis connectivity
try:
start = time.perf_counter()
await self.redis.ping()
latency = (time.perf_counter() - start) * 1000
checks["redis"] = {
"status": "ok",
"latency_ms": round(latency, 2),
}
except Exception as e:
checks["redis"] = {
"status": "error",
"error": str(e),
}
# Event store lag
try:
recent_events = self.event_store.get_all_events(
after_timestamp=time.time() - 60
)
checks["event_store"] = {
"status": "ok",
"events_last_60s": len(recent_events),
}
except Exception as e:
checks["event_store"] = {
"status": "error",
"error": str(e),
}
overall = "ok" if all(
c["status"] == "ok" for c in checks.values()
) else "degraded"
return {
"status": overall,
"checks": checks,
"timestamp": time.time(),
}
33.8.5 Alerting Rules
Alerts should be actionable. A good alert answers: what is broken, how badly, and what should be done about it.
@dataclass
class AlertRule:
name: str
condition: str # Human-readable description
metric: str
threshold: float
comparison: str # "above" or "below"
duration_seconds: int # Must be true for this long
severity: str # "critical", "warning", "info"
runbook_url: str # Link to remediation steps
PREDICTION_MARKET_ALERTS = [
AlertRule(
name="high_matching_latency",
condition="Matching engine p99 latency exceeds 200ms",
metric="matching_latency_ms_p99",
threshold=200,
comparison="above",
duration_seconds=120,
severity="critical",
runbook_url="/runbooks/high-matching-latency",
),
AlertRule(
name="order_queue_backing_up",
condition="Order queue depth exceeds 500 messages",
metric="order_queue_depth",
threshold=500,
comparison="above",
duration_seconds=60,
severity="critical",
runbook_url="/runbooks/order-queue-backup",
),
AlertRule(
name="high_error_rate",
condition="API error rate exceeds 5%",
metric="api_error_rate_percent",
threshold=5.0,
comparison="above",
duration_seconds=300,
severity="warning",
runbook_url="/runbooks/high-error-rate",
),
AlertRule(
name="database_connections_high",
condition="Database connection pool > 80% utilized",
metric="db_pool_utilization_percent",
threshold=80,
comparison="above",
duration_seconds=300,
severity="warning",
runbook_url="/runbooks/db-connections",
),
AlertRule(
name="stale_prices",
condition="More than 10 markets with stale prices (>30s old)",
metric="stale_price_feeds",
threshold=10,
comparison="above",
duration_seconds=60,
severity="warning",
runbook_url="/runbooks/stale-prices",
),
AlertRule(
name="settlement_delayed",
condition="Pending settlements older than 10 minutes",
metric="oldest_pending_settlement_seconds",
threshold=600,
comparison="above",
duration_seconds=60,
severity="critical",
runbook_url="/runbooks/settlement-delayed",
),
]
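A minimal evaluator for these rules, given a dict of current metric values (the duration_seconds requirement --- the condition must hold continuously --- is omitted here for brevity):
from typing import Dict, List

def evaluate_alerts(current: Dict[str, float]) -> List[str]:
    fired = []
    for rule in PREDICTION_MARKET_ALERTS:
        value = current.get(rule.metric)
        if value is None:
            continue
        breached = (
            value > rule.threshold
            if rule.comparison == "above"
            else value < rule.threshold
        )
        if breached:
            fired.append(
                f"[{rule.severity}] {rule.name}: {rule.condition} "
                f"(runbook: {rule.runbook_url})"
            )
    return fired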
33.9 Security and DDoS Mitigation
33.9.1 Threat Model
Prediction markets face unique security threats:
- Market manipulation: Attackers flood the order book with fake orders to move prices, then trade on the manipulated price.
- DDoS for profit: Attackers prevent legitimate trading during an event, hoping their existing positions benefit from the information vacuum.
- Data exfiltration: Stealing user data, trading history, or internal market parameters.
- Account takeover: Gaining control of accounts to place unauthorized trades or withdraw funds.
- API abuse: Automated scraping of market data, excessive API usage, or brute-force attacks.
33.9.2 Rate Limiting
Rate limiting is the first line of defense against abuse. A prediction market needs multiple rate limiting tiers:
import time
from typing import Dict, Tuple, Optional
from collections import defaultdict
from dataclasses import dataclass
@dataclass
class RateLimitConfig:
requests_per_second: float
burst_size: int
penalty_duration_seconds: int = 60
class TokenBucketRateLimiter:
"""
Token bucket rate limiter with per-user and per-IP limits.
"""
def __init__(self):
self._buckets: Dict[str, Tuple[float, float]] = {}
self._configs: Dict[str, RateLimitConfig] = {
"api_general": RateLimitConfig(
requests_per_second=10,
burst_size=20,
),
"order_placement": RateLimitConfig(
requests_per_second=5,
burst_size=10,
),
"market_data": RateLimitConfig(
requests_per_second=30,
burst_size=60,
),
"authentication": RateLimitConfig(
requests_per_second=1,
burst_size=5,
penalty_duration_seconds=300,
),
}
self._penalties: Dict[str, float] = {}
def check(
self,
identifier: str,
limit_type: str = "api_general",
) -> Tuple[bool, Dict]:
"""
Returns (allowed, info_dict).
info_dict contains remaining tokens and retry-after if blocked.
"""
now = time.time()
# Check penalty box
penalty_key = f"{identifier}:penalty"
if penalty_key in self._penalties:
if now < self._penalties[penalty_key]:
retry_after = self._penalties[penalty_key] - now
return False, {
"reason": "rate_limit_penalty",
"retry_after": retry_after,
}
else:
del self._penalties[penalty_key]
config = self._configs.get(limit_type)
if not config:
return True, {"remaining": -1}
bucket_key = f"{identifier}:{limit_type}"
if bucket_key in self._buckets:
tokens, last_time = self._buckets[bucket_key]
# Add tokens based on elapsed time
elapsed = now - last_time
tokens = min(
config.burst_size,
tokens + elapsed * config.requests_per_second,
)
else:
tokens = config.burst_size
if tokens >= 1:
self._buckets[bucket_key] = (tokens - 1, now)
return True, {"remaining": int(tokens - 1)}
else:
self._buckets[bucket_key] = (tokens, now)
return False, {
"reason": "rate_limit_exceeded",
"retry_after": (1 - tokens) / config.requests_per_second,
}
def penalize(self, identifier: str, limit_type: str) -> None:
"""Put an identifier in the penalty box."""
config = self._configs.get(limit_type)
if config:
penalty_key = f"{identifier}:penalty"
self._penalties[penalty_key] = (
time.time() + config.penalty_duration_seconds
)
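As a minimal sketch, here is how a request handler might consult the limiter; the handler, identifier scheme, and status codes are illustrative:
limiter = TokenBucketRateLimiter()
def handle_order_request(user_id: str, client_ip: str) -> dict:
    # Check both the per-user and per-IP budgets; either can block.
    for identifier in (f"user:{user_id}", f"ip:{client_ip}"):
        allowed, info = limiter.check(identifier, "order_placement")
        if not allowed:
            # A real API would return HTTP 429 with a Retry-After header.
            return {"status": 429, "retry_after": info.get("retry_after")}
    return {"status": 200}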
33.9.3 Input Validation and SQL Injection Prevention
Every input from the user must be validated. Never trust client-side validation alone.
import re
from decimal import Decimal, InvalidOperation
class InputValidator:
"""Validates all user inputs for the prediction market API."""
# Only alphanumeric, spaces, and basic punctuation in market questions
QUESTION_PATTERN = re.compile(r'^[\w\s\?\.\,\-\:\;\!\'\"\(\)]{10,500}$')
# UUIDs
UUID_PATTERN = re.compile(
r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
)
@staticmethod
def validate_price(price_str: str) -> Decimal:
"""Validate and parse a price value."""
try:
price = Decimal(price_str)
except (InvalidOperation, TypeError, ValueError):
raise ValueError("Price must be a valid decimal number")
if price <= 0 or price >= 1:
raise ValueError("Price must be between 0 and 1 (exclusive)")
# Limit precision to 4 decimal places
if price != round(price, 4):
raise ValueError("Price precision limited to 4 decimal places")
return price
@staticmethod
def validate_quantity(quantity_str: str) -> int:
"""Validate and parse a quantity value."""
try:
quantity = int(quantity_str)
except (TypeError, ValueError):
raise ValueError("Quantity must be an integer")
if quantity <= 0:
raise ValueError("Quantity must be positive")
if quantity > 1_000_000:
raise ValueError("Quantity exceeds maximum (1,000,000)")
return quantity
@classmethod
def validate_uuid(cls, value: str) -> str:
"""Validate a UUID string."""
if not cls.UUID_PATTERN.match(value.lower()):
raise ValueError("Invalid UUID format")
return value.lower()
@classmethod
def validate_question(cls, question: str) -> str:
"""Validate a market question."""
question = question.strip()
if not cls.QUESTION_PATTERN.match(question):
raise ValueError(
"Question must be 10-500 characters, "
"alphanumeric with basic punctuation"
)
return question
@staticmethod
def validate_outcome(outcome: str) -> str:
"""Validate an outcome name."""
outcome = outcome.strip()
if not outcome or len(outcome) > 100:
raise ValueError("Outcome must be 1-100 characters")
if not outcome.replace(' ', '').replace('-', '').isalnum():
raise ValueError("Outcome must be alphanumeric")
return outcome
@staticmethod
def validate_side(side: str) -> str:
"""Validate order side."""
side = side.lower().strip()
if side not in ("buy", "sell"):
raise ValueError("Side must be 'buy' or 'sell'")
return side
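Validation narrows what reaches the database, but the data layer must also use parameterized queries rather than string interpolation. A minimal sketch with asyncpg, where the table and column names are illustrative:
import asyncpg
async def fetch_open_orders(pool: asyncpg.Pool, market_id: str):
    market_id = InputValidator.validate_uuid(market_id)
    async with pool.acquire() as conn:
        # $1 is bound server-side; the value is never spliced into SQL text.
        return await conn.fetch(
            "SELECT order_id, side, price, quantity "
            "FROM orders WHERE market_id = $1 AND status = 'open'",
            market_id,
        )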
33.9.4 API Key Management
API keys authenticate programmatic access. Only a hash of each key is ever stored; the raw key is displayed exactly once, at creation time.
import hashlib
import secrets
import time
from dataclasses import dataclass
from typing import Dict, Optional, Set
@dataclass
class APIKey:
key_hash: str # We never store the raw key
user_id: str
permissions: Set[str] # e.g., {"read", "trade", "admin"}
created_at: float
expires_at: Optional[float]
rate_limit_tier: str # "standard", "premium", "institutional"
is_active: bool = True
class APIKeyManager:
"""Manages API keys for programmatic access."""
def __init__(self):
self._keys: Dict[str, APIKey] = {} # hash -> APIKey
def create_key(
self,
user_id: str,
permissions: Set[str],
rate_limit_tier: str = "standard",
expires_in_days: Optional[int] = 365,
) -> str:
"""
Create a new API key. Returns the raw key (only shown once).
"""
raw_key = f"pm_{secrets.token_urlsafe(32)}"
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
expires_at = None
if expires_in_days:
expires_at = time.time() + (expires_in_days * 86400)
self._keys[key_hash] = APIKey(
key_hash=key_hash,
user_id=user_id,
permissions=permissions,
created_at=time.time(),
expires_at=expires_at,
rate_limit_tier=rate_limit_tier,
)
return raw_key
def validate_key(
self,
raw_key: str,
required_permission: str,
) -> Optional[APIKey]:
"""Validate an API key and check permissions."""
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
api_key = self._keys.get(key_hash)
if not api_key:
return None
if not api_key.is_active:
return None
if api_key.expires_at and time.time() > api_key.expires_at:
api_key.is_active = False
return None
if required_permission not in api_key.permissions:
return None
return api_key
def revoke_key(self, raw_key: str) -> bool:
"""Revoke an API key."""
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
if key_hash in self._keys:
self._keys[key_hash].is_active = False
return True
return False
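Illustrative usage, with the permission names taken from the dataclass comment above:
manager = APIKeyManager()
raw = manager.create_key("user-42", permissions={"read", "trade"})
print(raw)  # shown once; the caller must store it securely
key = manager.validate_key(raw, required_permission="trade")
assert key is not None and key.user_id == "user-42"
manager.revoke_key(raw)
assert manager.validate_key(raw, required_permission="read") is None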
33.9.5 DDoS Mitigation Strategies
A layered defense approach:
Layer 1: Edge / CDN
- Absorb volumetric attacks at the network edge.
- Cache static assets and read-only API responses at CDN points of presence.
- Use Anycast routing to distribute attack traffic globally.
Layer 2: Application Load Balancer
- TLS termination (the TLS handshake itself can be used as a DDoS vector).
- Connection rate limiting per IP.
- Geographic blocking if the platform only operates in specific regions.
Layer 3: Application
- Token bucket rate limiting (per user, per IP, per API key).
- Request validation (reject malformed requests early, before they reach business logic).
- Adaptive throttling: under load, reduce rate limits dynamically.
Layer 4: Infrastructure
- Auto-scaling to absorb legitimate traffic spikes (which may resemble DDoS).
- Circuit breakers to prevent cascading failures.
- Graceful degradation: during extreme load, disable non-essential features.
The adaptive throttling mentioned in Layer 3 can be sketched as a load-aware rate limiter:
import random
class AdaptiveThrottler:
"""
Adjusts rate limits based on system load.
Under normal conditions, allows full rate.
Under high load, progressively reduces allowed rates.
"""
def __init__(self, base_rate: float = 10.0):
self.base_rate = base_rate
self.current_load = 0.0 # 0.0 to 1.0
def update_load(self, cpu_percent: float, queue_depth: int) -> None:
"""Update current load estimate."""
cpu_load = cpu_percent / 100.0
queue_load = min(queue_depth / 1000.0, 1.0)
self.current_load = max(cpu_load, queue_load)
    @property
    def effective_rate(self) -> float:
        """
        Reduce the rate limit as load increases.
        Below 50% load: full rate.
        At 75% load: ~75% of full rate.
        At 90% load: ~37% of full rate.
        At 100% load: 1% of full rate (emergency mode).
        """
        if self.current_load < 0.5:
            return self.base_rate
        # Quadratic reduction above 50% load, floored at 1% of base rate
        reduction = ((self.current_load - 0.5) / 0.5) ** 2
        return max(
            self.base_rate * (1 - reduction * 0.99),
            self.base_rate * 0.01,
        )
    def should_allow(self, identifier: str) -> bool:
        """
        Probabilistic admission control. The identifier is accepted so
        callers can layer per-tier priority on top; this minimal
        version treats all callers equally.
        """
        effective = self.effective_rate
        if effective >= self.base_rate:
            return True
        # Admit with probability proportional to the remaining rate
        probability = effective / self.base_rate
        return random.random() < probability
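Illustrative usage; the printed value follows directly from the quadratic formula in effective_rate:
throttler = AdaptiveThrottler(base_rate=10.0)
throttler.update_load(cpu_percent=85.0, queue_depth=400)
print(round(throttler.effective_rate, 1))  # ~5.1 req/s at 85% load
if throttler.should_allow("user-42"):
    pass  # process the request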
33.9.6 Circuit Breaker Pattern
When a downstream service is failing, continuing to send it requests makes the problem worse. A circuit breaker stops requests to failing services and periodically tests if they have recovered:
class CircuitBreaker:
"""
Circuit breaker for downstream service calls.
States:
- CLOSED: normal operation, requests flow through
- OPEN: service is failing, requests are rejected immediately
- HALF_OPEN: testing if service has recovered
"""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 30.0,
half_open_max_calls: int = 3,
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self.state = "CLOSED"
self.failure_count = 0
self.last_failure_time = 0.0
self.half_open_calls = 0
def can_proceed(self) -> bool:
if self.state == "CLOSED":
return True
if self.state == "OPEN":
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "HALF_OPEN"
self.half_open_calls = 0
return True
return False
if self.state == "HALF_OPEN":
if self.half_open_calls < self.half_open_max_calls:
self.half_open_calls += 1
return True
return False
return False
def record_success(self) -> None:
if self.state == "HALF_OPEN":
self.state = "CLOSED"
self.failure_count = 0
def record_failure(self) -> None:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"
if self.state == "HALF_OPEN":
self.state = "OPEN"
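A minimal sketch of wrapping a downstream call with the breaker; the settlement-service call is a stand-in:
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30.0)
def call_settlement_service(payload: dict) -> dict:
    if not breaker.can_proceed():
        raise RuntimeError("settlement service circuit is open")
    try:
        response = {"ok": True}  # stand-in for the real network call
    except Exception:
        breaker.record_failure()
        raise
    breaker.record_success()
    return response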
33.10 Disaster Recovery and High Availability
33.10.1 Failure Modes
Understanding potential failure modes is the first step in designing for resilience:
| Failure Mode | Probability | Impact | Mitigation |
|---|---|---|---|
| Single server crash | High | Low (if redundant) | Auto-restart, health checks, load balancer drains |
| Database primary failure | Medium | High | Automated failover to standby |
| Network partition | Low | High | Multi-AZ deployment, split-brain detection |
| Entire AZ failure | Low | Very High | Multi-AZ, cross-region replication |
| Data corruption | Very Low | Critical | Point-in-time recovery, event sourcing replay |
| Security breach | Low | Critical | Incident response plan, key rotation |
33.10.2 Backup Strategies
class BackupPolicy:
    """
    Defines the backup strategy for prediction market data.
    The policies are plain class-level dicts, so no dataclass
    machinery is needed.
    """
# Event store: the most critical data
event_store_backup = {
"type": "continuous_replication",
"method": "streaming_replication",
"replicas": 2,
"cross_region": True,
"retention_days": 365 * 7, # 7 years for regulatory compliance
}
# Database: point-in-time recovery
database_backup = {
"full_backup_interval_hours": 24,
"incremental_interval_hours": 1,
"wal_archiving": True, # Write-Ahead Log for PITR
"retention_days": 90,
"cross_region_copy": True,
}
# Redis: less critical (can be rebuilt from event store)
redis_backup = {
"rdb_interval_hours": 6,
"aof_enabled": True,
"retention_days": 7,
}
# Configuration and secrets
config_backup = {
"method": "version_controlled",
"secrets_method": "encrypted_vault",
}
33.10.3 Failover Mechanisms
Automated failover for the database:
import asyncio
from typing import List
import asyncpg
class DatabaseFailoverManager:
"""Manages automated database failover."""
def __init__(
self,
primary_dsn: str,
standby_dsns: List[str],
health_check_interval: float = 5.0,
failure_threshold: int = 3,
):
self.primary_dsn = primary_dsn
self.standby_dsns = standby_dsns
self.health_check_interval = health_check_interval
self.failure_threshold = failure_threshold
self.consecutive_failures = 0
self.is_primary_healthy = True
async def health_check_loop(self):
"""Continuously monitor primary health."""
while True:
try:
healthy = await self._check_primary()
if healthy:
self.consecutive_failures = 0
self.is_primary_healthy = True
else:
self.consecutive_failures += 1
if (self.consecutive_failures
>= self.failure_threshold):
await self._initiate_failover()
except Exception:
self.consecutive_failures += 1
if (self.consecutive_failures
>= self.failure_threshold):
await self._initiate_failover()
await asyncio.sleep(self.health_check_interval)
async def _check_primary(self) -> bool:
"""Check if primary database is responsive."""
try:
# Attempt a simple query with timeout
pool = await asyncpg.create_pool(
self.primary_dsn, min_size=1, max_size=1
)
async with pool.acquire() as conn:
await asyncio.wait_for(
conn.fetchval("SELECT 1"), timeout=5.0
)
await pool.close()
return True
except Exception:
return False
async def _initiate_failover(self):
"""Promote standby to primary."""
self.is_primary_healthy = False
print(
f"ALERT: Primary database failed after "
f"{self.consecutive_failures} checks. "
f"Initiating failover..."
)
# In a real system, this would:
# 1. Promote the standby to primary
# 2. Update DNS or connection strings
# 3. Notify on-call engineers
# 4. Log the failover event
if self.standby_dsns:
self.primary_dsn = self.standby_dsns.pop(0)
self.consecutive_failures = 0
print(f"Failover complete. New primary: {self.primary_dsn}")
33.10.4 Multi-Region Deployment
For prediction markets serving a global audience, multi-region deployment provides both performance (lower latency for geographically distributed users) and resilience (survives regional outages).
Key considerations:
- Single writer, multiple readers: The event store has one primary writer in one region. Read replicas exist in all regions (a routing sketch follows this list).
- Latency budget: A user in Europe placing an order on a platform with its primary in the US will experience at least 70--100ms of network latency. This is acceptable for most prediction market use cases.
- Conflict resolution: If the primary region fails and a standby is promoted, any writes that were acknowledged by the old primary but not yet replicated are potentially lost. Event sourcing mitigates this: clients can resend unacknowledged commands.
- Regulatory considerations: Some jurisdictions require data to remain within geographic boundaries. This constrains replication topology.
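A minimal sketch of region-aware connection routing under the single-writer model; the DSNs and region names are illustrative:
PRIMARY_DSN = "postgres://primary.us-east-1.internal/pm"
REGION_REPLICAS = {
    "us-east-1": "postgres://replica.us-east-1.internal/pm",
    "eu-west-1": "postgres://replica.eu-west-1.internal/pm",
}
def dsn_for(operation: str, region: str) -> str:
    """Writes always target the single primary; reads stay local."""
    if operation == "write":
        return PRIMARY_DSN
    # Fall back to the primary if no local replica exists.
    return REGION_REPLICAS.get(region, PRIMARY_DSN)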
33.10.5 Runbook Template
Every alert should link to a runbook. Here is a template:
## Runbook: [Alert Name]
### Symptoms
- What the alert looks like
- What users are experiencing
### Impact
- Which services are affected
- Revenue impact estimate
### Diagnosis
1. Check [specific dashboard]
2. Run [specific query/command]
3. Look for [specific log pattern]
### Resolution
#### Immediate Mitigation
1. [Step to reduce immediate impact]
#### Root Cause Fix
1. [Step to fix the underlying issue]
### Escalation
- On-call: [team/person]
- Escalate to [team] if not resolved in [time]
### Post-Incident
- [ ] Write incident report
- [ ] Update monitoring if needed
- [ ] Schedule follow-up work
33.11 Performance Testing
33.11.1 Why Performance Testing Matters
The worst time to discover your system cannot handle the load is during the load. Performance testing should be continuous, automated, and realistic.
33.11.2 Types of Performance Tests
- Load testing: Verify the system handles expected peak load. Run the system at 2x expected peak for a sustained period.
- Stress testing: Find the breaking point. Gradually increase load until the system fails. Document how it fails (gracefully or catastrophically).
- Soak testing: Run at moderate load for extended periods (24--72 hours). Detect memory leaks, connection pool exhaustion, log rotation failures.
- Spike testing: Simulate sudden traffic spikes (e.g., breaking news). Verify auto-scaling responds quickly enough.
33.11.3 Load Testing with Locust
"""
Load test suite for prediction market API.
Run with: locust -f load_test.py --host=http://localhost:8000
"""
import random
import string
import json
import uuid
# Note: In practice, import from locust
# from locust import HttpUser, task, between, events
class PredictionMarketLoadTest:
"""
Simulates realistic prediction market usage patterns.
Can be adapted for use with locust or any load testing framework.
"""
def __init__(self, base_url: str = "http://localhost:8000"):
self.base_url = base_url
self.market_ids = []
self.api_key = None
self.wait_time_min = 1
self.wait_time_max = 5
def on_start(self):
"""Setup: create account and get API key."""
# Register
username = f"loadtest_{''.join(random.choices(string.ascii_lowercase, k=8))}"
response = self._post("/api/register", {
"username": username,
"password": "LoadTest123!",
})
self.api_key = response.get("api_key")
# Deposit funds
self._post("/api/account/deposit", {
"amount": 100000,
})
# Get available markets
markets = self._get("/api/markets")
self.market_ids = [m["market_id"] for m in markets.get("markets", [])]
def task_browse_markets(self):
"""Most common action: browsing markets."""
self._get("/api/markets")
def task_view_market(self):
"""View a specific market's details."""
if self.market_ids:
market_id = random.choice(self.market_ids)
self._get(f"/api/markets/{market_id}")
self._get(f"/api/markets/{market_id}/orderbook")
self._get(f"/api/markets/{market_id}/trades")
def task_place_order(self):
"""Place a limit order."""
if self.market_ids:
market_id = random.choice(self.market_ids)
self._post(f"/api/markets/{market_id}/orders", {
"outcome": "Yes",
"side": random.choice(["buy", "sell"]),
"price": round(random.uniform(0.1, 0.9), 2),
"quantity": random.randint(1, 100),
})
def task_check_portfolio(self):
"""Check portfolio and positions."""
self._get("/api/portfolio")
self._get("/api/portfolio/positions")
self._get("/api/portfolio/orders")
def task_cancel_order(self):
"""Cancel a random open order."""
orders = self._get("/api/portfolio/orders")
open_orders = [
o for o in orders.get("orders", [])
if o.get("status") == "open"
]
if open_orders:
order = random.choice(open_orders)
self._delete(
f"/api/markets/{order['market_id']}"
f"/orders/{order['order_id']}"
)
def _get(self, path: str) -> dict:
"""Simulated GET request."""
# In a real load test, this would make an HTTP request
print(f"GET {self.base_url}{path}")
return {}
def _post(self, path: str, data: dict) -> dict:
"""Simulated POST request."""
print(f"POST {self.base_url}{path} {json.dumps(data)}")
return {}
def _delete(self, path: str) -> dict:
"""Simulated DELETE request."""
print(f"DELETE {self.base_url}{path}")
return {}
class LoadTestScenarios:
"""Predefined load test scenarios."""
@staticmethod
def normal_day():
"""Simulate a normal trading day."""
return {
"users": 100,
"spawn_rate": 10, # users per second
"duration_minutes": 30,
"task_weights": {
"browse_markets": 40,
"view_market": 30,
"place_order": 15,
"check_portfolio": 10,
"cancel_order": 5,
},
}
@staticmethod
def election_night():
"""Simulate election night traffic."""
return {
"users": 10000,
"spawn_rate": 500,
"duration_minutes": 240,
"task_weights": {
"browse_markets": 10,
"view_market": 25,
"place_order": 45,
"check_portfolio": 15,
"cancel_order": 5,
},
}
@staticmethod
def breaking_news_spike():
"""Simulate a sudden breaking news event."""
return {
"phases": [
{
"name": "calm_before",
"users": 200,
"duration_minutes": 5,
},
{
"name": "spike",
"users": 5000,
"spawn_rate": 1000,
"duration_minutes": 10,
},
{
"name": "sustained",
"users": 3000,
"duration_minutes": 30,
},
{
"name": "cooldown",
"users": 500,
"duration_minutes": 15,
},
],
}
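For reference, a minimal sketch of the same workload expressed directly in Locust, assuming the locust package is installed (run with: locust -f load_test.py --host=http://localhost:8000); the @task weights mirror the normal_day scenario and the market id is a placeholder:
from locust import HttpUser, task, between
class NormalDayUser(HttpUser):
    """Browses, views, and trades with normal-day task weights."""
    wait_time = between(1, 5)
    @task(40)
    def browse_markets(self):
        self.client.get("/api/markets")
    @task(30)
    def view_market(self):
        # A real test would collect market ids in on_start.
        self.client.get("/api/markets/example-market-id")
    @task(15)
    def place_order(self):
        self.client.post("/api/markets/example-market-id/orders", json={
            "outcome": "Yes", "side": "buy", "price": 0.55, "quantity": 10,
        })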
33.11.4 Capacity Planning
Capacity planning translates load test results into infrastructure requirements:
class CapacityPlanner:
"""
Estimates required infrastructure based on
expected traffic and load test results.
"""
def __init__(self):
self.benchmarks = {
"api_server": {
"max_rps_per_instance": 500,
"cpu_cores": 4,
"memory_gb": 8,
},
"matching_engine": {
"max_orders_per_second": 10000,
"cpu_cores": 8,
"memory_gb": 16,
},
"database": {
"max_connections": 200,
"max_tps": 5000,
"storage_per_trade_bytes": 256,
},
"redis": {
"max_ops_per_second": 100000,
"memory_per_market_bytes": 10240,
},
}
def plan_for_event(
self,
expected_peak_rps: int,
expected_peak_orders_per_second: int,
active_markets: int,
headroom_factor: float = 1.5,
) -> dict:
"""Generate capacity plan for an expected event."""
plan = {}
# API servers
api_benchmark = self.benchmarks["api_server"]
api_instances = int(
(expected_peak_rps * headroom_factor)
/ api_benchmark["max_rps_per_instance"]
) + 1
plan["api_servers"] = {
"instances": max(api_instances, 3), # minimum 3 for HA
"cpu_cores_each": api_benchmark["cpu_cores"],
"memory_gb_each": api_benchmark["memory_gb"],
}
# Matching engine shards
me_benchmark = self.benchmarks["matching_engine"]
me_shards = int(
(expected_peak_orders_per_second * headroom_factor)
/ me_benchmark["max_orders_per_second"]
) + 1
plan["matching_engine"] = {
"shards": max(me_shards, 2),
"cpu_cores_each": me_benchmark["cpu_cores"],
"memory_gb_each": me_benchmark["memory_gb"],
}
# Database
db_benchmark = self.benchmarks["database"]
plan["database"] = {
"primary_instances": 1,
"read_replicas": max(api_instances // 5, 2),
"connection_pool_size": min(
api_instances * 10,
db_benchmark["max_connections"]
),
}
# Redis
redis_benchmark = self.benchmarks["redis"]
redis_memory = (
active_markets
* redis_benchmark["memory_per_market_bytes"]
/ (1024 ** 3)
)
plan["redis"] = {
"instances": max(int(redis_memory / 8) + 1, 3),
"memory_gb_each": 8,
}
return plan
# Example usage:
planner = CapacityPlanner()
election_plan = planner.plan_for_event(
expected_peak_rps=50000,
expected_peak_orders_per_second=5000,
active_markets=500,
headroom_factor=2.0,
)
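Under the benchmark numbers above, this plan works out to roughly 201 API server instances, 2 matching engine shards, 40 read replicas behind a 200-connection pool, and a 3-node Redis cluster; the large API tier is a direct consequence of applying the 2.0 headroom factor to 50,000 peak requests per second.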
33.11.5 Bottleneck Identification
Performance bottlenecks in prediction markets typically appear in this order:
- Database connections --- the first thing to saturate.
- Lock contention --- multiple orders for the same market compete for the same locks.
- Network bandwidth --- WebSocket price feeds to many clients.
- CPU on matching engine --- sorting and matching order books.
- Memory --- holding large order books in memory.
Each bottleneck has a characteristic signature in the metrics:
| Bottleneck | Latency Pattern | CPU | Memory | Queue Depth |
|---|---|---|---|---|
| DB connections | Sudden spike, then timeouts | Low | Normal | Grows rapidly |
| Lock contention | Gradual increase, specific markets | Medium | Normal | Moderate |
| Network | Normal server latency, high client latency | Low | Normal | Normal |
| CPU (matching) | Gradual increase, all markets | High | Normal | Grows slowly |
| Memory | Normal until OOM, then crash | Normal | Growing | Normal until crash |
33.12 Chapter Summary
This chapter covered the full spectrum of production operations for prediction markets:
- Event sourcing provides an immutable, auditable record of all market activity. The event log is the source of truth, and current state is always derivable from events. This is particularly valuable for prediction markets, where regulatory compliance and dispute resolution require a complete audit trail.
- CQRS separates the write path (order submission, matching) from the read path (market prices, portfolios, analytics). Each side can be optimized independently, and read models can be scaled horizontally without affecting the consistency of writes.
- Database optimization through careful indexing (especially partial indexes for open orders), connection pooling, read replicas, and time-based partitioning keeps the data layer responsive under load.
- Caching at multiple levels (in-process, Redis, CDN) reduces the load on the database and speeds up the most common queries. Cache invalidation is driven by events, with TTL as a safety net.
- Message queues decouple the critical trading path from secondary operations like notifications and analytics. Dead letter queues and retry strategies ensure reliability.
- Horizontal scaling is enabled by stateless API servers, market-sharded matching engines, and read replicas. Auto-scaling policies based on leading indicators allow the platform to respond to traffic changes.
- Monitoring and alerting based on the four golden signals (latency, traffic, errors, saturation) plus domain-specific metrics (matching latency, queue depths, stale prices) provides operational visibility. Every alert links to a runbook.
- Security and DDoS mitigation uses layered defenses: rate limiting, input validation, API key management, adaptive throttling, and circuit breakers. The threat model is specific to prediction markets, where attacks may be financially motivated.
- Disaster recovery through continuous replication, automated failover, and multi-region deployment ensures the platform survives infrastructure failures. Event sourcing simplifies recovery by making state reconstructible from events.
- Performance testing with realistic scenarios (normal day, election night, breaking news spike) validates capacity plans and identifies bottlenecks before they affect real users.
Together, these practices transform a prediction market from a prototype into a production system that can reliably serve thousands of concurrent traders during the most important events.
What's Next
With Part V complete, we have covered the full breadth of market design and mechanism engineering: from matching engines and automated market makers (Chapters 29--30), through fee structures and liquidity provision (Chapters 31--32), to the production infrastructure in this chapter.
Part VI: Blockchain and Decentralized Markets takes these concepts into the decentralized world. Chapter 34 introduces blockchain fundamentals and smart contract platforms. Chapter 35 explores how prediction markets work on Ethereum, Gnosis Chain, and other networks. Chapter 36 examines oracle mechanisms --- the critical problem of getting real-world outcomes onto the blockchain. And Chapter 37 addresses the unique challenges of decentralized governance and dispute resolution.
The architectural patterns from this chapter --- event sourcing, CQRS, monitoring --- apply equally to decentralized systems, where they are implemented through smart contract event logs, The Graph (a decentralized query layer), and on-chain monitoring. The transition from centralized to decentralized infrastructure is not a wholesale replacement but an evolution of the same principles.
End of Chapter 33