In the previous four chapters, we studied market design principles, liquidity provision through automated market makers, combinatorial markets, and decision markets. All of that theory now converges into practice. In this chapter, we roll up our...
Learning Objectives
- Design (Create) a modular prediction market platform architecture using FastAPI with proper separation of concerns
- Implement (Apply) a continuous double auction order book engine with limit and market order support
- Implement (Apply) Hanson's LMSR automated market maker with correct cost and price functions
- Construct (Create) a RESTful API with endpoints for market creation, order submission, position tracking, and resolution
- Integrate (Apply) JWT-based user authentication and authorization into the platform
- Evaluate (Evaluate) the trade-offs between order book and AMM-based market mechanisms for different use cases
- Synthesize (Create) a basic frontend interface that consumes the prediction market API
In This Chapter
- 32.1 Platform Architecture and Project Setup
- 32.2 The Order Book Engine
- 32.3 The LMSR Automated Market Maker
- 32.4 REST API Design
- 32.5 User Authentication with JWT
- 32.6 Market Resolution and Payout Mechanics
- 32.7 Trading Service Integration
- 32.8 Basic Frontend Considerations
- 32.9 Putting It All Together
- 32.10 Chapter Summary
Chapter 32: Building a Prediction Market Platform from Scratch
In the previous four chapters, we studied market design principles, liquidity provision through automated market makers, combinatorial markets, and decision markets. All of that theory now converges into practice. In this chapter, we roll up our sleeves and build a fully functional prediction market platform—from the database schema to the REST API to a minimal web interface.
Building a platform is an exercise in engineering discipline. A prediction market must handle concurrent users placing orders, compute prices in real time, manage user funds, resolve markets fairly, and expose all of this through a clean API. We will use FastAPI as our web framework because of its native support for type hints, automatic OpenAPI documentation, and async capabilities. Our platform will support two pricing mechanisms: a traditional order book for price discovery among active traders, and Hanson's Logarithmic Market Scoring Rule (LMSR) for guaranteed liquidity.
By the end of this chapter, you will have a working codebase that you can run locally, test with HTTP requests, and extend into a production system (which is the focus of Chapter 33).
32.1 Platform Architecture and Project Setup
32.1.1 High-Level Architecture
A prediction market platform consists of several interacting subsystems. Before writing any code, we need to understand how they fit together.
┌─────────────────────────────────────────────────────────┐
│ Frontend (HTML/JS) │
│ or Mobile App / Third-party Client │
└──────────────────────────┬──────────────────────────────┘
│ HTTP/JSON
┌──────────────────────────▼──────────────────────────────┐
│ FastAPI Application │
│ ┌──────────┐ ┌──────────┐ ┌───────────┐ ┌────────┐ │
│ │ Auth │ │ Markets │ │ Trading │ │ Admin │ │
│ │ Router │ │ Router │ │ Router │ │ Router │ │
│ └────┬─────┘ └────┬─────┘ └─────┬─────┘ └───┬────┘ │
│ │ │ │ │ │
│ ┌────▼──────────────▼──────────────▼─────────────▼───┐ │
│ │ Service Layer │ │
│ │ ┌───────────┐ ┌────────────┐ ┌───────────────┐ │ │
│ │ │ User Svc │ │ Market Svc │ │ Trading Svc │ │ │
│ │ └───────────┘ └────────────┘ └───────┬───────┘ │ │
│ │ │ │ │
│ │ ┌──────────────┬────────────┘ │ │
│ │ ▼ ▼ │ │
│ │ ┌──────────────┐ ┌───────────┐ │ │
│ │ │ Order Book │ │ LMSR │ │ │
│ │ │ Engine │ │ AMM │ │ │
│ │ └──────────────┘ └───────────┘ │ │
│ └────────────────────────┬───────────────────────────┘ │
│ │ │
│ ┌────────────────────────▼───────────────────────────┐ │
│ │ Database (SQLite / PostgreSQL) │ │
│ └────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
The architecture follows a layered pattern:
- Routers handle HTTP request/response concerns (validation, serialization, status codes).
- Services contain business logic (creating markets, executing trades, resolving outcomes).
- Engines implement the core market mechanisms (order matching, AMM pricing).
- Models define the database schema and Pydantic schemas for data validation.
💡 Intuition: Think of the layered architecture like a restaurant. The routers are the waitstaff (taking orders and delivering food), the services are the kitchen management (coordinating what gets cooked), and the engines are the actual cooking techniques (the recipes and methods). Each layer has a clear responsibility.
32.1.2 Project Structure
We organize the codebase into a clean directory layout:
prediction_market/
├── main.py # FastAPI app entry point
├── config.py # Configuration and settings
├── database.py # Database connection and session
├── models/
│ ├── __init__.py
│ ├── database_models.py # SQLAlchemy ORM models
│ └── schemas.py # Pydantic request/response schemas
├── routers/
│ ├── __init__.py
│ ├── auth.py # Authentication endpoints
│ ├── markets.py # Market CRUD endpoints
│ └── trading.py # Order and trade endpoints
├── services/
│ ├── __init__.py
│ ├── auth_service.py # JWT and password hashing
│ ├── market_service.py # Market lifecycle management
│ └── trading_service.py # Order processing and execution
├── engines/
│ ├── __init__.py
│ ├── order_book.py # Continuous double auction
│ └── lmsr.py # LMSR automated market maker
└── tests/
├── __init__.py
├── test_order_book.py
└── test_lmsr.py
32.1.3 Setting Up FastAPI
FastAPI is a modern Python web framework built on top of Starlette and Pydantic. It provides automatic request validation, serialization, and interactive API documentation. Here is the entry point for our application:
"""Main application entry point for the prediction market platform."""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from database import engine, Base
from routers import auth, markets, trading
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Handle application startup and shutdown events.
Creates database tables on startup and performs cleanup on shutdown.
"""
Base.metadata.create_all(bind=engine)
yield
# Cleanup resources on shutdown if needed
app = FastAPI(
title="Prediction Market Platform",
description="A complete prediction market with order book and LMSR AMM support",
version="1.0.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Restrict in production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(auth.router, prefix="/api/v1/auth", tags=["Authentication"])
app.include_router(markets.router, prefix="/api/v1/markets", tags=["Markets"])
app.include_router(trading.router, prefix="/api/v1/trading", tags=["Trading"])
@app.get("/health")
async def health_check():
"""Return the health status of the platform."""
return {"status": "healthy", "version": "1.0.0"}
✅ Best Practice: Always version your API (
/api/v1/) from the start. When you need breaking changes, you can introduce/api/v2/without disrupting existing clients.
32.1.4 Database Configuration
We use SQLAlchemy as our ORM with SQLite for development. The same code works with PostgreSQL in production by simply changing the connection string.
"""Database connection and session management."""
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "sqlite:///./prediction_market.db"
engine = create_engine(
DATABASE_URL,
connect_args={"check_same_thread": False}, # SQLite-specific
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
"""Provide a database session for dependency injection.
Yields:
Session: A SQLAlchemy database session that is automatically
closed after the request completes.
"""
db = SessionLocal()
try:
yield db
finally:
db.close()
32.1.5 Database Models
Our platform needs to represent users, markets, outcomes, orders, trades, and positions. Here are the core SQLAlchemy models:
"""SQLAlchemy database models for the prediction market platform."""
import enum
from datetime import datetime
from sqlalchemy import (
Column, Integer, String, Float, DateTime, Boolean,
ForeignKey, Enum, Text
)
from sqlalchemy.orm import relationship
from database import Base
class MarketType(str, enum.Enum):
"""Enumeration of supported market mechanism types."""
ORDER_BOOK = "order_book"
LMSR = "lmsr"
class MarketStatus(str, enum.Enum):
"""Enumeration of market lifecycle states."""
OPEN = "open"
CLOSED = "closed"
RESOLVED = "resolved"
class OrderSide(str, enum.Enum):
"""Side of a trade order."""
BUY = "buy"
SELL = "sell"
class OrderType(str, enum.Enum):
"""Type of order execution."""
LIMIT = "limit"
MARKET = "market"
class OrderStatus(str, enum.Enum):
"""Current status of an order."""
PENDING = "pending"
PARTIALLY_FILLED = "partially_filled"
FILLED = "filled"
CANCELLED = "cancelled"
class User(Base):
"""Represents a platform user with authentication credentials and balance."""
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True, nullable=False)
email = Column(String(100), unique=True, index=True, nullable=False)
hashed_password = Column(String(255), nullable=False)
balance = Column(Float, default=1000.0) # Starting balance
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
orders = relationship("Order", back_populates="user")
positions = relationship("Position", back_populates="user")
class Market(Base):
"""Represents a prediction market with its configuration and state."""
__tablename__ = "markets"
id = Column(Integer, primary_key=True, index=True)
title = Column(String(200), nullable=False)
description = Column(Text)
market_type = Column(Enum(MarketType), nullable=False)
status = Column(Enum(MarketStatus), default=MarketStatus.OPEN)
creator_id = Column(Integer, ForeignKey("users.id"))
liquidity_parameter = Column(Float, default=100.0) # b for LMSR
resolution_source = Column(String(500))
closes_at = Column(DateTime, nullable=False)
resolved_at = Column(DateTime, nullable=True)
winning_outcome_id = Column(Integer, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
outcomes = relationship("Outcome", back_populates="market")
orders = relationship("Order", back_populates="market")
class Outcome(Base):
"""Represents one possible outcome of a prediction market."""
__tablename__ = "outcomes"
id = Column(Integer, primary_key=True, index=True)
market_id = Column(Integer, ForeignKey("markets.id"))
name = Column(String(100), nullable=False)
shares_outstanding = Column(Float, default=0.0) # For LMSR tracking
market = relationship("Market", back_populates="outcomes")
class Order(Base):
"""Represents a buy or sell order placed by a user."""
__tablename__ = "orders"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"))
market_id = Column(Integer, ForeignKey("markets.id"))
outcome_id = Column(Integer, ForeignKey("outcomes.id"))
side = Column(Enum(OrderSide), nullable=False)
order_type = Column(Enum(OrderType), nullable=False)
price = Column(Float, nullable=True) # Null for market orders
quantity = Column(Float, nullable=False)
filled_quantity = Column(Float, default=0.0)
status = Column(Enum(OrderStatus), default=OrderStatus.PENDING)
created_at = Column(DateTime, default=datetime.utcnow)
user = relationship("User", back_populates="orders")
market = relationship("Market", back_populates="orders")
class Trade(Base):
"""Records a completed trade between two orders or against the AMM."""
__tablename__ = "trades"
id = Column(Integer, primary_key=True, index=True)
market_id = Column(Integer, ForeignKey("markets.id"))
outcome_id = Column(Integer, ForeignKey("outcomes.id"))
buyer_order_id = Column(Integer, ForeignKey("orders.id"), nullable=True)
seller_order_id = Column(Integer, ForeignKey("orders.id"), nullable=True)
price = Column(Float, nullable=False)
quantity = Column(Float, nullable=False)
is_amm_trade = Column(Boolean, default=False)
executed_at = Column(DateTime, default=datetime.utcnow)
class Position(Base):
"""Tracks a user's share holdings in a particular outcome."""
__tablename__ = "positions"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"))
market_id = Column(Integer, ForeignKey("markets.id"))
outcome_id = Column(Integer, ForeignKey("outcomes.id"))
shares = Column(Float, default=0.0)
average_cost = Column(Float, default=0.0)
user = relationship("User", back_populates="positions")
⚠️ Common Pitfall: Using
Floatfor financial values introduces floating-point precision errors. In a production system, useDecimaltypes or store values as integers representing the smallest unit (e.g., cents). We useFloathere for clarity, but Chapter 33 covers the production-grade approach.
32.2 The Order Book Engine
An order book is the most traditional mechanism for price discovery. It collects buy orders (bids) and sell orders (asks), then matches them when prices cross. Our implementation supports both limit orders (specifying a maximum buy price or minimum sell price) and market orders (executing at the best available price).
32.2.1 Order Book Data Structures
The order book is organized as two sorted lists: bids sorted by price descending (highest bid first) and asks sorted by price ascending (lowest ask first). A trade occurs when the highest bid meets or exceeds the lowest ask.
"""Continuous double auction order book engine.
This module implements a price-time priority order book that supports
limit orders, market orders, and partial fills. It is designed to be
used as a pluggable engine within the prediction market platform.
"""
from dataclasses import dataclass, field
from typing import Optional
from collections import defaultdict
import heapq
import time
@dataclass
class BookOrder:
"""Represents an order in the order book.
Attributes:
order_id: Unique identifier for the order.
user_id: ID of the user who placed the order.
outcome_id: ID of the market outcome being traded.
side: Either 'buy' or 'sell'.
price: Limit price (None for market orders).
quantity: Number of shares.
remaining: Unfilled quantity.
timestamp: Time the order was placed (for priority).
"""
order_id: int
user_id: int
outcome_id: int
side: str
price: Optional[float]
quantity: float
remaining: float = 0.0
timestamp: float = field(default_factory=time.time)
def __post_init__(self):
if self.remaining == 0.0:
self.remaining = self.quantity
@dataclass
class TradeExecution:
"""Records the result of a matched trade.
Attributes:
buy_order_id: ID of the buy order.
sell_order_id: ID of the sell order.
price: Execution price.
quantity: Number of shares traded.
timestamp: Time the trade was executed.
"""
buy_order_id: int
sell_order_id: int
price: float
quantity: float
timestamp: float = field(default_factory=time.time)
32.2.2 The Matching Algorithm
The matching engine is the heart of the order book. It follows price-time priority: orders at better prices are matched first, and among orders at the same price, earlier orders take precedence.
class OrderBook:
"""A continuous double auction order book with price-time priority.
The order book maintains separate bid and ask sides for each outcome
in a market. When a new order arrives, it is matched against existing
orders on the opposite side. Any unmatched remainder is added to the book.
Attributes:
bids: Dictionary mapping outcome_id to list of buy orders (max-heap).
asks: Dictionary mapping outcome_id to list of sell orders (min-heap).
"""
def __init__(self):
"""Initialize an empty order book."""
# bids: max-heap (negate price for heapq), asks: min-heap
self.bids: dict[int, list[tuple]] = defaultdict(list)
self.asks: dict[int, list[tuple]] = defaultdict(list)
self._orders: dict[int, BookOrder] = {}
def submit_order(self, order: BookOrder) -> list[TradeExecution]:
"""Submit an order to the book and attempt to match it.
Args:
order: The order to submit.
Returns:
A list of TradeExecution objects for any matches that occurred.
Raises:
ValueError: If the order has invalid parameters.
"""
if order.quantity <= 0:
raise ValueError("Order quantity must be positive")
if order.side not in ("buy", "sell"):
raise ValueError("Order side must be 'buy' or 'sell'")
self._orders[order.order_id] = order
trades = self._match_order(order)
# If there is remaining quantity on a limit order, add it to the book
if order.remaining > 0 and order.price is not None:
self._add_to_book(order)
return trades
def _match_order(self, order: BookOrder) -> list[TradeExecution]:
"""Attempt to match an incoming order against the opposite side.
The matching follows price-time priority. For a buy order, it matches
against the lowest-priced asks. For a sell order, it matches against
the highest-priced bids.
Args:
order: The incoming order to match.
Returns:
List of trade executions from matching.
"""
trades: list[TradeExecution] = []
oid = order.outcome_id
if order.side == "buy":
opposite_book = self.asks[oid]
else:
opposite_book = self.bids[oid]
while order.remaining > 0 and opposite_book:
if order.side == "buy":
# asks is a min-heap: (price, timestamp, order_id)
best_price, best_time, best_id = opposite_book[0]
else:
# bids is a max-heap: (-price, timestamp, order_id)
neg_price, best_time, best_id = opposite_book[0]
best_price = -neg_price
# Check if the best_id still refers to a valid, active order
if best_id not in self._orders:
heapq.heappop(opposite_book)
continue
best_order = self._orders[best_id]
if best_order.remaining <= 0:
heapq.heappop(opposite_book)
continue
# Check price compatibility
if order.side == "buy":
# For a buy: can match if buy price >= ask price
# Market orders (price=None) match any ask
if order.price is not None and order.price < best_price:
break # No more matches possible
else:
# For a sell: can match if sell price <= bid price
# Market orders (price=None) match any bid
if order.price is not None and order.price > best_price:
break
# Determine trade quantity and price
trade_qty = min(order.remaining, best_order.remaining)
# Trade executes at the resting order's price (maker price)
trade_price = best_price
# Execute the trade
order.remaining -= trade_qty
best_order.remaining -= trade_qty
if order.side == "buy":
trade = TradeExecution(
buy_order_id=order.order_id,
sell_order_id=best_order.order_id,
price=trade_price,
quantity=trade_qty,
)
else:
trade = TradeExecution(
buy_order_id=best_order.order_id,
sell_order_id=order.order_id,
price=trade_price,
quantity=trade_qty,
)
trades.append(trade)
# Remove fully filled resting order
if best_order.remaining <= 0:
heapq.heappop(opposite_book)
return trades
def _add_to_book(self, order: BookOrder) -> None:
"""Add a limit order with remaining quantity to the book.
Args:
order: The limit order to add.
"""
oid = order.outcome_id
if order.side == "buy":
# Max-heap: negate price
heapq.heappush(
self.bids[oid],
(-order.price, order.timestamp, order.order_id)
)
else:
# Min-heap
heapq.heappush(
self.asks[oid],
(order.price, order.timestamp, order.order_id)
)
def cancel_order(self, order_id: int) -> bool:
"""Cancel an existing order by marking it as having zero remaining.
The order entry in the heap will be lazily removed during matching.
Args:
order_id: The ID of the order to cancel.
Returns:
True if the order was found and cancelled, False otherwise.
"""
if order_id in self._orders:
self._orders[order_id].remaining = 0
return True
return False
def get_best_bid(self, outcome_id: int) -> Optional[float]:
"""Return the highest bid price for an outcome.
Args:
outcome_id: The outcome to check.
Returns:
The best bid price, or None if no bids exist.
"""
book = self.bids[outcome_id]
while book:
neg_price, _, oid = book[0]
if oid in self._orders and self._orders[oid].remaining > 0:
return -neg_price
heapq.heappop(book)
return None
def get_best_ask(self, outcome_id: int) -> Optional[float]:
"""Return the lowest ask price for an outcome.
Args:
outcome_id: The outcome to check.
Returns:
The best ask price, or None if no asks exist.
"""
book = self.asks[outcome_id]
while book:
price, _, oid = book[0]
if oid in self._orders and self._orders[oid].remaining > 0:
return price
heapq.heappop(book)
return None
def get_spread(self, outcome_id: int) -> Optional[float]:
"""Return the bid-ask spread for an outcome.
Args:
outcome_id: The outcome to check.
Returns:
The spread (ask - bid), or None if either side is empty.
"""
bid = self.get_best_bid(outcome_id)
ask = self.get_best_ask(outcome_id)
if bid is not None and ask is not None:
return ask - bid
return None
def get_depth(self, outcome_id: int, levels: int = 5) -> dict:
"""Return the order book depth for an outcome.
Args:
outcome_id: The outcome to check.
levels: Number of price levels to return on each side.
Returns:
Dictionary with 'bids' and 'asks' lists of (price, quantity) tuples.
"""
bid_levels: list[tuple[float, float]] = []
ask_levels: list[tuple[float, float]] = []
# Collect bid levels
temp_bids = list(self.bids[outcome_id])
seen_prices: dict[float, float] = {}
for neg_price, _, oid in sorted(temp_bids):
if oid in self._orders and self._orders[oid].remaining > 0:
price = -neg_price
seen_prices[price] = seen_prices.get(price, 0) + self._orders[oid].remaining
for price in sorted(seen_prices.keys(), reverse=True)[:levels]:
bid_levels.append((price, seen_prices[price]))
# Collect ask levels
temp_asks = list(self.asks[outcome_id])
seen_prices = {}
for ask_price, _, oid in sorted(temp_asks):
if oid in self._orders and self._orders[oid].remaining > 0:
seen_prices[ask_price] = seen_prices.get(ask_price, 0) + self._orders[oid].remaining
for price in sorted(seen_prices.keys())[:levels]:
ask_levels.append((price, seen_prices[price]))
return {"bids": bid_levels, "asks": ask_levels}
32.2.3 How Matching Works
Let us trace through a concrete example. Suppose we have a market on whether it will rain tomorrow with two outcomes: "Yes" and "No." The "Yes" outcome has outcome_id = 1.
book = OrderBook()
# Alice posts a limit buy: 10 shares of "Yes" at $0.60
order1 = BookOrder(order_id=1, user_id=100, outcome_id=1,
side="buy", price=0.60, quantity=10)
trades1 = book.submit_order(order1)
# trades1 = [] (no matching ask exists)
# Bob posts a limit sell: 5 shares of "Yes" at $0.55
order2 = BookOrder(order_id=2, user_id=200, outcome_id=1,
side="sell", price=0.55, quantity=5)
trades2 = book.submit_order(order2)
# trades2 = [TradeExecution(buy_order_id=1, sell_order_id=2,
# price=0.60, quantity=5)]
# Trade price is the maker's (resting order's) price: $0.60
# Alice's remaining: 5 shares, Bob's order is fully filled.
The key insight is that the trade executes at the maker's price (the order that was resting in the book), not the taker's price. This rewards patience and encourages liquidity provision.
📊 Real-World Application: Real prediction markets like Polymarket use a variant of this order book model, but with a CLOB (Central Limit Order Book) running on-chain or off-chain with settlement on blockchain. The fundamental matching logic, however, is the same as what we implement here.
32.3 The LMSR Automated Market Maker
While order books require sufficient trading volume to provide liquidity, an automated market maker (AMM) always stands ready to trade. Hanson's Logarithmic Market Scoring Rule (LMSR) is the gold standard for prediction market AMMs because it has provable bounded loss and well-behaved price dynamics.
32.3.1 LMSR Mathematics
The LMSR works through a cost function $C$ that determines the total amount spent by all traders combined as a function of the outstanding shares. For a market with $n$ outcomes and a liquidity parameter $b$, the cost function is:
$$C(\mathbf{q}) = b \cdot \ln\!\left(\sum_{i=1}^{n} e^{q_i / b}\right)$$
where $\mathbf{q} = (q_1, q_2, \ldots, q_n)$ is the vector of shares outstanding for each outcome.
The price of outcome $i$ at any point in time is the partial derivative of the cost function:
$$p_i(\mathbf{q}) = \frac{\partial C}{\partial q_i} = \frac{e^{q_i / b}}{\sum_{j=1}^{n} e^{q_j / b}}$$
Notice that prices always sum to 1 across all outcomes, which is exactly what we want for a probability distribution.
The cost of a trade (buying $\Delta q_i$ additional shares of outcome $i$) is the difference in the cost function before and after the trade:
$$\text{cost} = C(\mathbf{q'}) - C(\mathbf{q})$$
where $\mathbf{q'}$ is the new share vector after adding $\Delta q_i$ to outcome $i$.
💡 Intuition: The cost function is like a running tab at a bar. Every time someone buys shares, the tab goes up. The amount the tab increases for any particular purchase depends on how many shares of that outcome are already outstanding. Buying shares of a popular outcome (one with many shares already sold) is more expensive than buying shares of an unpopular one. This is how the LMSR dynamically adjusts prices.
32.3.2 The Liquidity Parameter $b$
The parameter $b$ controls how sensitive prices are to trades. A larger $b$ means prices move less per share traded, providing more liquidity but exposing the market maker to a larger potential loss.
The maximum loss for the market maker is bounded by:
$$L_{\max} = b \cdot \ln(n)$$
where $n$ is the number of outcomes. For a binary market ($n = 2$), the worst case is $b \cdot \ln(2) \approx 0.693 \cdot b$. If $b = 100$, the market maker can lose at most approximately $69.30.
⚠️ Common Pitfall: Setting $b$ too low makes prices jumpy and creates large slippage for traders. Setting $b$ too high creates deep liquidity but increases the market maker's subsidy cost. A common starting point for binary markets is to set $b$ so that buying one full share moves the price by 1-2 percentage points. For binary markets with initial prices at 0.5, this means $b \approx 50$ to $100$.
32.3.3 LMSR Implementation
Here is a complete, numerically stable implementation:
"""Logarithmic Market Scoring Rule (LMSR) automated market maker.
This module implements Hanson's LMSR with numerical stability
improvements to prevent overflow in the exponential calculations.
"""
import math
from dataclasses import dataclass, field
@dataclass
class LMSRMarketMaker:
"""An LMSR automated market maker for a prediction market.
Attributes:
b: Liquidity parameter controlling price sensitivity.
num_outcomes: Number of possible outcomes in the market.
shares: Current shares outstanding for each outcome.
"""
b: float
num_outcomes: int
shares: list[float] = field(default_factory=list)
def __post_init__(self):
"""Initialize shares to zero for each outcome if not provided."""
if not self.shares:
self.shares = [0.0] * self.num_outcomes
def cost(self, shares: list[float] | None = None) -> float:
"""Compute the LMSR cost function value.
Uses the log-sum-exp trick for numerical stability:
C(q) = b * ln(sum(exp(q_i / b)))
= b * (max_val + ln(sum(exp(q_i / b - max_val))))
Args:
shares: Share vector to evaluate. Uses current shares if None.
Returns:
The cost function value.
"""
if shares is None:
shares = self.shares
scaled = [q / self.b for q in shares]
max_val = max(scaled)
# Log-sum-exp trick: subtract max before exponentiating
exp_sum = sum(math.exp(s - max_val) for s in scaled)
return self.b * (max_val + math.log(exp_sum))
def price(self, outcome_index: int, shares: list[float] | None = None) -> float:
"""Compute the current price (instantaneous probability) for an outcome.
p_i = exp(q_i / b) / sum(exp(q_j / b))
Uses the softmax formulation with the max-subtraction trick.
Args:
outcome_index: Index of the outcome (0-indexed).
shares: Share vector to evaluate. Uses current shares if None.
Returns:
The current price of the outcome (between 0 and 1).
Raises:
IndexError: If outcome_index is out of range.
"""
if shares is None:
shares = self.shares
if outcome_index < 0 or outcome_index >= len(shares):
raise IndexError(
f"Outcome index {outcome_index} out of range "
f"for {len(shares)} outcomes"
)
scaled = [q / self.b for q in shares]
max_val = max(scaled)
exp_values = [math.exp(s - max_val) for s in scaled]
total = sum(exp_values)
return exp_values[outcome_index] / total
def all_prices(self, shares: list[float] | None = None) -> list[float]:
"""Compute prices for all outcomes simultaneously.
Args:
shares: Share vector to evaluate. Uses current shares if None.
Returns:
List of prices for each outcome, summing to 1.0.
"""
if shares is None:
shares = self.shares
scaled = [q / self.b for q in shares]
max_val = max(scaled)
exp_values = [math.exp(s - max_val) for s in scaled]
total = sum(exp_values)
return [ev / total for ev in exp_values]
def compute_trade_cost(self, outcome_index: int, num_shares: float) -> float:
"""Compute the cost of buying (or selling) shares of an outcome.
cost = C(q') - C(q)
where q' is the share vector after adding num_shares to the
specified outcome.
A positive num_shares means buying; negative means selling.
Args:
outcome_index: Index of the outcome to trade.
num_shares: Number of shares to buy (positive) or sell (negative).
Returns:
The cost of the trade. Positive means the trader pays;
negative means the trader receives.
Raises:
IndexError: If outcome_index is out of range.
"""
if outcome_index < 0 or outcome_index >= self.num_outcomes:
raise IndexError(
f"Outcome index {outcome_index} out of range "
f"for {self.num_outcomes} outcomes"
)
current_cost = self.cost()
new_shares = list(self.shares)
new_shares[outcome_index] += num_shares
new_cost = self.cost(new_shares)
return new_cost - current_cost
def execute_trade(
self, outcome_index: int, num_shares: float
) -> tuple[float, list[float]]:
"""Execute a trade by updating the share vector.
Args:
outcome_index: Index of the outcome to trade.
num_shares: Number of shares to buy (positive) or sell (negative).
Returns:
A tuple of (trade_cost, new_prices) where trade_cost is the
amount the trader pays (or receives if negative), and new_prices
is the updated price vector.
"""
trade_cost = self.compute_trade_cost(outcome_index, num_shares)
self.shares[outcome_index] += num_shares
new_prices = self.all_prices()
return trade_cost, new_prices
def max_loss(self) -> float:
"""Compute the maximum possible loss for the market maker.
For LMSR, the worst-case loss is b * ln(n) where n is the
number of outcomes. This occurs when one outcome receives all
the shares.
Returns:
The maximum possible loss.
"""
return self.b * math.log(self.num_outcomes)
32.3.4 LMSR in Action
Let us walk through a binary market to see the LMSR at work:
# Binary market: Will the project ship on time? (Yes=0, No=1)
amm = LMSRMarketMaker(b=100.0, num_outcomes=2)
# Initial prices are equal
print(amm.all_prices())
# [0.5, 0.5]
# Alice buys 10 shares of "Yes"
cost, prices = amm.execute_trade(outcome_index=0, num_shares=10)
print(f"Alice pays: ${cost:.4f}")
print(f"New prices: Yes={prices[0]:.4f}, No={prices[1]:.4f}")
# Alice pays: $5.1244
# New prices: Yes=0.5250, No=0.4750
# Bob buys 20 shares of "No"
cost, prices = amm.execute_trade(outcome_index=1, num_shares=20)
print(f"Bob pays: ${cost:.4f}")
print(f"New prices: Yes={prices[0]:.4f}, No={prices[1]:.4f}")
# Bob pays: $9.7509
# New prices: Yes=0.4750, No=0.5250
Notice how buying shares of one outcome increases its price and decreases the price of other outcomes. The prices always sum to 1.0, maintaining their interpretation as probabilities.
🎓 Advanced: The LMSR belongs to a family of cost-function-based market makers. Other members include the Quadratic Market Scoring Rule (QSR) and the Constant Product Market Maker (CPMM, used by Uniswap). The LMSR is particularly well-suited for prediction markets because its prices directly represent probabilities. The CPMM, while popular in DeFi, does not have this property without additional normalization.
32.3.5 Numerical Stability Considerations
When working with exponentials, we must be careful about numerical overflow and underflow. If $q_i / b$ is very large (say, > 700), then $e^{q_i/b}$ will overflow a 64-bit float. The log-sum-exp trick prevents this by factoring out the maximum value:
$$\ln\!\left(\sum_i e^{x_i}\right) = x_{\max} + \ln\!\left(\sum_i e^{x_i - x_{\max}}\right)$$
Since $x_i - x_{\max} \leq 0$ for all $i$, the exponentials are always in $[0, 1]$, preventing overflow. This is already incorporated in our implementation above.
32.4 REST API Design
With the order book engine and LMSR AMM implemented, we now expose them through a RESTful API. Good API design makes the platform intuitive for frontend developers and third-party integrators.
32.4.1 API Endpoint Overview
| Method | Endpoint | Description |
|---|---|---|
POST |
/api/v1/auth/register |
Register a new user |
POST |
/api/v1/auth/login |
Authenticate and receive JWT |
GET |
/api/v1/markets |
List all markets |
POST |
/api/v1/markets |
Create a new market |
GET |
/api/v1/markets/{id} |
Get market details |
GET |
/api/v1/markets/{id}/prices |
Get current prices |
POST |
/api/v1/markets/{id}/resolve |
Resolve a market |
POST |
/api/v1/trading/orders |
Place an order |
DELETE |
/api/v1/trading/orders/{id} |
Cancel an order |
GET |
/api/v1/trading/positions |
Get user's positions |
GET |
/api/v1/trading/orders |
Get user's open orders |
GET |
/api/v1/trading/history |
Get user's trade history |
32.4.2 Pydantic Schemas
FastAPI uses Pydantic for request validation and response serialization. Defining clear schemas ensures that API consumers always know what to send and what to expect.
"""Pydantic schemas for API request validation and response serialization."""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field, field_validator
class UserCreate(BaseModel):
"""Schema for user registration requests."""
username: str = Field(..., min_length=3, max_length=50)
email: str = Field(..., pattern=r"^[\w\.-]+@[\w\.-]+\.\w+$")
password: str = Field(..., min_length=8, max_length=100)
class UserResponse(BaseModel):
"""Schema for user data in API responses."""
id: int
username: str
email: str
balance: float
is_active: bool
created_at: datetime
class Config:
from_attributes = True
class TokenResponse(BaseModel):
"""Schema for JWT authentication responses."""
access_token: str
token_type: str = "bearer"
class MarketCreate(BaseModel):
"""Schema for creating a new prediction market."""
title: str = Field(..., min_length=10, max_length=200)
description: str = Field(default="")
market_type: str = Field(..., pattern=r"^(order_book|lmsr)$")
outcomes: list[str] = Field(..., min_length=2, max_length=10)
liquidity_parameter: float = Field(default=100.0, gt=0)
resolution_source: str = Field(default="")
closes_at: datetime
@field_validator("outcomes")
@classmethod
def outcomes_must_be_unique(cls, v: list[str]) -> list[str]:
"""Validate that all outcome names are unique."""
if len(v) != len(set(v)):
raise ValueError("Outcome names must be unique")
return v
class MarketResponse(BaseModel):
"""Schema for market data in API responses."""
id: int
title: str
description: str
market_type: str
status: str
creator_id: int
liquidity_parameter: float
closes_at: datetime
created_at: datetime
outcomes: list[dict]
prices: list[float]
class Config:
from_attributes = True
class OrderCreate(BaseModel):
"""Schema for placing a new order."""
market_id: int
outcome_id: int
side: str = Field(..., pattern=r"^(buy|sell)$")
order_type: str = Field(..., pattern=r"^(limit|market)$")
price: Optional[float] = Field(default=None, gt=0, le=1.0)
quantity: float = Field(..., gt=0)
@field_validator("price")
@classmethod
def price_required_for_limit(cls, v, info):
"""Validate that limit orders have a price."""
if info.data.get("order_type") == "limit" and v is None:
raise ValueError("Price is required for limit orders")
return v
class OrderResponse(BaseModel):
"""Schema for order data in API responses."""
id: int
market_id: int
outcome_id: int
side: str
order_type: str
price: Optional[float]
quantity: float
filled_quantity: float
status: str
created_at: datetime
class Config:
from_attributes = True
class TradeResponse(BaseModel):
"""Schema for trade data in API responses."""
id: int
market_id: int
outcome_id: int
price: float
quantity: float
is_amm_trade: bool
executed_at: datetime
class Config:
from_attributes = True
class PositionResponse(BaseModel):
"""Schema for position data in API responses."""
market_id: int
outcome_id: int
outcome_name: str
shares: float
average_cost: float
current_price: float
unrealized_pnl: float
class Config:
from_attributes = True
class MarketResolution(BaseModel):
"""Schema for resolving a market."""
winning_outcome_id: int
32.4.3 Markets Router
The markets router handles creating, listing, and resolving prediction markets:
"""Market management endpoints."""
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from database import get_db
from models.schemas import MarketCreate, MarketResponse, MarketResolution
from services.auth_service import get_current_user
from services.market_service import MarketService
router = APIRouter()
@router.post("/", response_model=MarketResponse, status_code=201)
async def create_market(
market_data: MarketCreate,
current_user=Depends(get_current_user),
db: Session = Depends(get_db),
):
"""Create a new prediction market.
Args:
market_data: Market configuration including title, outcomes, and type.
current_user: The authenticated user creating the market.
db: Database session.
Returns:
The created market with its initial prices.
Raises:
HTTPException: If the market closing time is in the past.
"""
if market_data.closes_at <= datetime.utcnow():
raise HTTPException(
status_code=400,
detail="Market closing time must be in the future"
)
service = MarketService(db)
market = service.create_market(market_data, current_user.id)
return market
@router.get("/", response_model=list[MarketResponse])
async def list_markets(
status: Optional[str] = Query(None, pattern=r"^(open|closed|resolved)$"),
skip: int = Query(0, ge=0),
limit: int = Query(20, ge=1, le=100),
db: Session = Depends(get_db),
):
"""List prediction markets with optional filtering.
Args:
status: Filter by market status (open, closed, resolved).
skip: Number of markets to skip for pagination.
limit: Maximum number of markets to return.
db: Database session.
Returns:
List of markets matching the filter criteria.
"""
service = MarketService(db)
return service.list_markets(status=status, skip=skip, limit=limit)
@router.get("/{market_id}", response_model=MarketResponse)
async def get_market(market_id: int, db: Session = Depends(get_db)):
"""Get detailed information about a specific market.
Args:
market_id: The ID of the market to retrieve.
db: Database session.
Returns:
The market details including current prices.
Raises:
HTTPException: If the market is not found.
"""
service = MarketService(db)
market = service.get_market(market_id)
if not market:
raise HTTPException(status_code=404, detail="Market not found")
return market
@router.get("/{market_id}/prices")
async def get_market_prices(market_id: int, db: Session = Depends(get_db)):
"""Get the current prices for all outcomes in a market.
For LMSR markets, prices are computed from the cost function.
For order book markets, prices are the midpoint of best bid and ask.
Args:
market_id: The ID of the market.
db: Database session.
Returns:
Dictionary mapping outcome names to their current prices.
"""
service = MarketService(db)
return service.get_prices(market_id)
@router.post("/{market_id}/resolve", response_model=MarketResponse)
async def resolve_market(
market_id: int,
resolution: MarketResolution,
current_user=Depends(get_current_user),
db: Session = Depends(get_db),
):
"""Resolve a market by declaring the winning outcome.
Only the market creator or an admin can resolve a market.
Resolution triggers payout of winning positions.
Args:
market_id: The ID of the market to resolve.
resolution: The winning outcome specification.
current_user: The authenticated user.
db: Database session.
Returns:
The updated market with resolution information.
Raises:
HTTPException: If the user is not authorized or the market
cannot be resolved.
"""
service = MarketService(db)
market = service.get_market(market_id)
if not market:
raise HTTPException(status_code=404, detail="Market not found")
if market.creator_id != current_user.id:
raise HTTPException(
status_code=403,
detail="Only the market creator can resolve a market"
)
if market.status == "resolved":
raise HTTPException(
status_code=400,
detail="Market is already resolved"
)
return service.resolve_market(market_id, resolution.winning_outcome_id)
32.4.4 Trading Router
The trading router handles order placement, cancellation, and position queries:
"""Trading endpoints for order management and position tracking."""
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from database import get_db
from models.schemas import OrderCreate, OrderResponse, PositionResponse, TradeResponse
from services.auth_service import get_current_user
from services.trading_service import TradingService
router = APIRouter()
@router.post("/orders", response_model=OrderResponse, status_code=201)
async def place_order(
order_data: OrderCreate,
current_user=Depends(get_current_user),
db: Session = Depends(get_db),
):
"""Place a new order in a prediction market.
For order book markets, this submits a limit or market order.
For LMSR markets, this executes a trade against the AMM.
Args:
order_data: The order specification.
current_user: The authenticated user placing the order.
db: Database session.
Returns:
The created order with its execution status.
Raises:
HTTPException: If the market is not open or the user has
insufficient balance.
"""
service = TradingService(db)
try:
order = service.place_order(order_data, current_user.id)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return order
@router.delete("/orders/{order_id}")
async def cancel_order(
order_id: int,
current_user=Depends(get_current_user),
db: Session = Depends(get_db),
):
"""Cancel an open order.
Args:
order_id: The ID of the order to cancel.
current_user: The authenticated user.
db: Database session.
Returns:
Confirmation of cancellation.
Raises:
HTTPException: If the order is not found or does not belong
to the current user.
"""
service = TradingService(db)
success = service.cancel_order(order_id, current_user.id)
if not success:
raise HTTPException(
status_code=404,
detail="Order not found or cannot be cancelled"
)
return {"status": "cancelled", "order_id": order_id}
@router.get("/positions", response_model=list[PositionResponse])
async def get_positions(
current_user=Depends(get_current_user),
db: Session = Depends(get_db),
):
"""Get all positions held by the current user.
Returns positions with current prices and unrealized P&L.
Args:
current_user: The authenticated user.
db: Database session.
Returns:
List of the user's positions across all markets.
"""
service = TradingService(db)
return service.get_positions(current_user.id)
@router.get("/history", response_model=list[TradeResponse])
async def get_trade_history(
current_user=Depends(get_current_user),
db: Session = Depends(get_db),
):
"""Get the trade history for the current user.
Args:
current_user: The authenticated user.
db: Database session.
Returns:
List of executed trades involving the user.
"""
service = TradingService(db)
return service.get_trade_history(current_user.id)
✅ Best Practice: Notice how the router functions are thin wrappers that delegate all business logic to the service layer. This separation makes it easy to test the business logic independently and swap out the web framework if needed.
32.5 User Authentication with JWT
Security is a core requirement for any platform that manages user funds. We implement JSON Web Token (JWT) authentication, which is stateless and scales well.
32.5.1 How JWT Authentication Works
The JWT workflow is straightforward:
- A user registers with a username, email, and password. The password is hashed using bcrypt before storage.
- To log in, the user sends their credentials. If valid, the server returns a signed JWT.
- For subsequent requests, the client includes the JWT in the
Authorization: Bearer <token>header. - The server verifies the JWT signature and extracts the user identity without needing to query a session store.
A JWT consists of three parts: a header (algorithm and token type), a payload (user ID, expiration time), and a signature (HMAC-SHA256 over the header and payload using a secret key).
32.5.2 Authentication Service Implementation
"""Authentication service with JWT token management and password hashing."""
from datetime import datetime, timedelta
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from database import get_db
from models.database_models import User
# Configuration
SECRET_KEY = "your-secret-key-change-in-production" # Use env var in production
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
def hash_password(password: str) -> str:
"""Hash a password using bcrypt.
Args:
password: The plaintext password to hash.
Returns:
The bcrypt hash of the password.
"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash.
Args:
plain_password: The plaintext password to verify.
hashed_password: The stored bcrypt hash.
Returns:
True if the password matches, False otherwise.
"""
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(
data: dict, expires_delta: Optional[timedelta] = None
) -> str:
"""Create a JWT access token.
Args:
data: The payload data to encode in the token.
expires_delta: Optional custom expiration time.
Returns:
The encoded JWT string.
"""
to_encode = data.copy()
expire = datetime.utcnow() + (
expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db),
) -> User:
"""Extract and validate the current user from a JWT token.
This function is used as a FastAPI dependency to protect endpoints
that require authentication.
Args:
token: The JWT token from the Authorization header.
db: Database session.
Returns:
The authenticated User object.
Raises:
HTTPException: If the token is invalid or the user is not found.
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: int = payload.get("sub")
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise credentials_exception
return user
32.5.3 Authentication Router
"""Authentication endpoints for registration and login."""
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from database import get_db
from models.database_models import User
from models.schemas import UserCreate, UserResponse, TokenResponse
from services.auth_service import (
hash_password, verify_password, create_access_token, get_current_user
)
router = APIRouter()
@router.post("/register", response_model=UserResponse, status_code=201)
async def register(user_data: UserCreate, db: Session = Depends(get_db)):
"""Register a new user account.
Creates a new user with a hashed password and initial balance.
Args:
user_data: Registration information (username, email, password).
db: Database session.
Returns:
The created user profile.
Raises:
HTTPException: If the username or email already exists.
"""
# Check for existing user
existing = db.query(User).filter(
(User.username == user_data.username) | (User.email == user_data.email)
).first()
if existing:
raise HTTPException(
status_code=400,
detail="Username or email already registered"
)
user = User(
username=user_data.username,
email=user_data.email,
hashed_password=hash_password(user_data.password),
)
db.add(user)
db.commit()
db.refresh(user)
return user
@router.post("/login", response_model=TokenResponse)
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db),
):
"""Authenticate a user and return a JWT access token.
Args:
form_data: OAuth2 form with username and password fields.
db: Database session.
Returns:
JWT access token for authenticating subsequent requests.
Raises:
HTTPException: If credentials are invalid.
"""
user = db.query(User).filter(
User.username == form_data.username
).first()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(data={"sub": user.id})
return TokenResponse(access_token=access_token)
@router.get("/me", response_model=UserResponse)
async def get_me(current_user=Depends(get_current_user)):
"""Get the current authenticated user's profile.
Args:
current_user: The authenticated user (injected by dependency).
Returns:
The user's profile information.
"""
return current_user
⚠️ Common Pitfall: Never store the
SECRET_KEYin source code. In production, load it from an environment variable or a secret management service like HashiCorp Vault. The example above uses a hardcoded key purely for instructional clarity.
32.6 Market Resolution and Payout Mechanics
Market resolution is the critical moment when a prediction market determines the outcome and distributes funds to winning positions. Getting this right requires careful handling of edge cases and clear audit trails.
32.6.1 The Resolution Lifecycle
A market passes through these states:
OPEN ──(closes_at passes)──> CLOSED ──(admin resolves)──> RESOLVED
- Open: Users can place orders, prices update dynamically.
- Closed: No new orders are accepted. Existing positions are frozen. The market awaits resolution.
- Resolved: The winning outcome is declared. Winning positions pay out $1 per share. Losing positions pay out $0.
32.6.2 Resolution Service
"""Market resolution and payout service."""
from datetime import datetime
from sqlalchemy.orm import Session
from models.database_models import (
Market, MarketStatus, Outcome, Position, User, Trade
)
class ResolutionService:
"""Handles market resolution and payout distribution.
Attributes:
db: SQLAlchemy database session.
"""
def __init__(self, db: Session):
"""Initialize the resolution service.
Args:
db: Active database session.
"""
self.db = db
def resolve_market(
self, market_id: int, winning_outcome_id: int
) -> dict:
"""Resolve a market and distribute payouts.
This method performs the following steps:
1. Validates the market can be resolved.
2. Marks the market as resolved with the winning outcome.
3. Pays out $1 per share to holders of the winning outcome.
4. Sets losing positions to zero value.
5. Records the resolution in the audit trail.
Args:
market_id: The ID of the market to resolve.
winning_outcome_id: The ID of the winning outcome.
Returns:
Dictionary with resolution summary including total payouts.
Raises:
ValueError: If the market cannot be resolved or the outcome
is invalid.
"""
market = self.db.query(Market).filter(Market.id == market_id).first()
if not market:
raise ValueError(f"Market {market_id} not found")
if market.status == MarketStatus.RESOLVED:
raise ValueError(f"Market {market_id} is already resolved")
# Verify the winning outcome belongs to this market
outcome = self.db.query(Outcome).filter(
Outcome.id == winning_outcome_id,
Outcome.market_id == market_id
).first()
if not outcome:
raise ValueError(
f"Outcome {winning_outcome_id} not found in market {market_id}"
)
# Update market status
market.status = MarketStatus.RESOLVED
market.winning_outcome_id = winning_outcome_id
market.resolved_at = datetime.utcnow()
# Process payouts
total_payout = 0.0
winners_count = 0
# Get all positions for this market
positions = self.db.query(Position).filter(
Position.market_id == market_id,
Position.shares > 0
).all()
for position in positions:
user = self.db.query(User).filter(
User.id == position.user_id
).first()
if position.outcome_id == winning_outcome_id:
# Winning position: pay $1 per share
payout = position.shares * 1.0
user.balance += payout
total_payout += payout
winners_count += 1
# Losing positions get nothing; shares become worthless
self.db.commit()
return {
"market_id": market_id,
"winning_outcome": outcome.name,
"total_payout": total_payout,
"winners_count": winners_count,
"resolved_at": market.resolved_at.isoformat(),
}
def void_market(self, market_id: int) -> dict:
"""Void a market, refunding all participants.
This is used when a market cannot be properly resolved (e.g.,
ambiguous outcome, data source unavailable). All trades are
unwound and participants receive refunds based on their net
cost basis.
Args:
market_id: The ID of the market to void.
Returns:
Dictionary with void summary including total refunds.
"""
market = self.db.query(Market).filter(Market.id == market_id).first()
if not market:
raise ValueError(f"Market {market_id} not found")
total_refund = 0.0
positions = self.db.query(Position).filter(
Position.market_id == market_id,
Position.shares > 0
).all()
for position in positions:
user = self.db.query(User).filter(
User.id == position.user_id
).first()
# Refund the average cost basis
refund = position.shares * position.average_cost
user.balance += refund
total_refund += refund
position.shares = 0.0
market.status = MarketStatus.RESOLVED
market.resolved_at = datetime.utcnow()
self.db.commit()
return {
"market_id": market_id,
"status": "voided",
"total_refund": total_refund,
}
32.6.3 Edge Cases in Resolution
Market resolution involves several edge cases that require careful thought:
Partial resolution: Some markets may have outcomes that partially apply (e.g., "Will the temperature exceed 30C on at least 3 days this week?"). For binary markets, the resolution is straightforward, but for multi-outcome markets, you may need to implement proportional payouts.
Disputed resolution: What happens when participants disagree with the resolution? Production platforms implement appeal mechanisms, multi-oracle resolution (multiple independent judges), or community voting. Our implementation uses a single-resolver model for simplicity, but you should plan for disputes.
Orphaned positions: If a user's account is deactivated before resolution, their positions still need to be settled. The payout should be credited to their account regardless of active status.
📊 Real-World Application: Polymarket uses a multi-step resolution process involving a primary oracle (UMA's optimistic oracle), a dispute period, and a final arbitration mechanism. This multi-layered approach reduces the risk of incorrect resolution. Our simplified model captures the core logic, while production systems add these additional safeguards.
32.7 Trading Service Integration
The trading service is the glue that connects the API routers to the pricing engines. It determines whether to route orders through the order book or the LMSR AMM based on the market type.
32.7.1 Trading Service Implementation
"""Trading service that routes orders to the appropriate pricing engine."""
from datetime import datetime
from typing import Optional
from sqlalchemy.orm import Session
from models.database_models import (
Market, MarketType, MarketStatus, Order, OrderSide, OrderType,
OrderStatus, Trade, Position, User, Outcome
)
from engines.order_book import OrderBook, BookOrder
from engines.lmsr import LMSRMarketMaker
# In-memory order books (keyed by market_id)
# In production, use Redis or a persistent store
_order_books: dict[int, OrderBook] = {}
_lmsr_instances: dict[int, LMSRMarketMaker] = {}
class TradingService:
"""Manages order placement, execution, and position tracking.
Routes orders to either the order book engine or the LMSR AMM
based on the market's configured type.
Attributes:
db: SQLAlchemy database session.
"""
def __init__(self, db: Session):
"""Initialize the trading service.
Args:
db: Active database session.
"""
self.db = db
def place_order(self, order_data, user_id: int) -> Order:
"""Place an order and execute it against the appropriate engine.
Args:
order_data: Pydantic schema with order details.
user_id: The ID of the user placing the order.
Returns:
The created Order database record.
Raises:
ValueError: If the market is not open, the user has insufficient
balance, or the order parameters are invalid.
"""
# Validate market
market = self.db.query(Market).filter(
Market.id == order_data.market_id
).first()
if not market:
raise ValueError("Market not found")
if market.status != MarketStatus.OPEN:
raise ValueError("Market is not open for trading")
if market.closes_at <= datetime.utcnow():
raise ValueError("Market has passed its closing time")
# Validate user balance for buy orders
user = self.db.query(User).filter(User.id == user_id).first()
if order_data.side == "buy":
max_cost = self._estimate_max_cost(market, order_data)
if user.balance < max_cost:
raise ValueError(
f"Insufficient balance. Required: {max_cost:.2f}, "
f"Available: {user.balance:.2f}"
)
# Create the order record
db_order = Order(
user_id=user_id,
market_id=order_data.market_id,
outcome_id=order_data.outcome_id,
side=OrderSide(order_data.side),
order_type=OrderType(order_data.order_type),
price=order_data.price,
quantity=order_data.quantity,
)
self.db.add(db_order)
self.db.flush() # Get the order ID
# Route to the appropriate engine
if market.market_type == MarketType.LMSR:
self._execute_lmsr_trade(db_order, market, user)
else:
self._execute_order_book_trade(db_order, market, user)
self.db.commit()
return db_order
def _execute_lmsr_trade(
self, order: Order, market: Market, user: User
) -> None:
"""Execute a trade against the LMSR automated market maker.
Args:
order: The order to execute.
market: The market to trade in.
user: The user placing the order.
"""
# Get or create LMSR instance
if market.id not in _lmsr_instances:
outcomes = self.db.query(Outcome).filter(
Outcome.market_id == market.id
).order_by(Outcome.id).all()
shares = [o.shares_outstanding for o in outcomes]
_lmsr_instances[market.id] = LMSRMarketMaker(
b=market.liquidity_parameter,
num_outcomes=len(outcomes),
shares=shares,
)
amm = _lmsr_instances[market.id]
outcomes = self.db.query(Outcome).filter(
Outcome.market_id == market.id
).order_by(Outcome.id).all()
# Find outcome index
outcome_index = None
for i, o in enumerate(outcomes):
if o.id == order.outcome_id:
outcome_index = i
break
if outcome_index is None:
raise ValueError("Invalid outcome for this market")
# Determine shares to trade
shares_to_trade = order.quantity
if order.side == OrderSide.SELL:
shares_to_trade = -shares_to_trade
# Compute and execute trade
trade_cost = amm.compute_trade_cost(outcome_index, shares_to_trade)
# For buys, check that cost doesn't exceed limit price
if order.side == OrderSide.BUY and order.price is not None:
avg_price = trade_cost / order.quantity
if avg_price > order.price:
raise ValueError(
f"Trade cost per share ({avg_price:.4f}) exceeds "
f"limit price ({order.price})"
)
# Execute the trade
amm.shares[outcome_index] += shares_to_trade
# Update database
user.balance -= trade_cost
outcomes[outcome_index].shares_outstanding = amm.shares[outcome_index]
# Record the trade
avg_price = abs(trade_cost / order.quantity)
trade = Trade(
market_id=market.id,
outcome_id=order.outcome_id,
buyer_order_id=order.id if order.side == OrderSide.BUY else None,
seller_order_id=order.id if order.side == OrderSide.SELL else None,
price=avg_price,
quantity=order.quantity,
is_amm_trade=True,
)
self.db.add(trade)
# Update position
self._update_position(
user_id=order.user_id,
market_id=market.id,
outcome_id=order.outcome_id,
shares_delta=shares_to_trade,
cost=trade_cost,
)
order.filled_quantity = order.quantity
order.status = OrderStatus.FILLED
def _execute_order_book_trade(
self, order: Order, market: Market, user: User
) -> None:
"""Execute a trade through the order book matching engine.
Args:
order: The order to execute.
market: The market to trade in.
user: The user placing the order.
"""
if market.id not in _order_books:
_order_books[market.id] = OrderBook()
book = _order_books[market.id]
book_order = BookOrder(
order_id=order.id,
user_id=order.user_id,
outcome_id=order.outcome_id,
side=order.side.value,
price=order.price,
quantity=order.quantity,
)
executions = book.submit_order(book_order)
# Process each execution
total_filled = 0.0
for execution in executions:
trade = Trade(
market_id=market.id,
outcome_id=order.outcome_id,
buyer_order_id=execution.buy_order_id,
seller_order_id=execution.sell_order_id,
price=execution.price,
quantity=execution.quantity,
is_amm_trade=False,
)
self.db.add(trade)
total_filled += execution.quantity
# Update buyer position
self._update_position(
user_id=(
order.user_id
if order.side == OrderSide.BUY
else self._get_order_user(execution.buy_order_id)
),
market_id=market.id,
outcome_id=order.outcome_id,
shares_delta=execution.quantity,
cost=execution.price * execution.quantity,
)
# Update seller position
seller_user_id = (
order.user_id
if order.side == OrderSide.SELL
else self._get_order_user(execution.sell_order_id)
)
self._update_position(
user_id=seller_user_id,
market_id=market.id,
outcome_id=order.outcome_id,
shares_delta=-execution.quantity,
cost=-execution.price * execution.quantity,
)
# Transfer funds
buyer = self.db.query(User).filter(
User.id == (
order.user_id
if order.side == OrderSide.BUY
else self._get_order_user(execution.buy_order_id)
)
).first()
seller = self.db.query(User).filter(
User.id == seller_user_id
).first()
buyer.balance -= execution.price * execution.quantity
seller.balance += execution.price * execution.quantity
# Update order status
order.filled_quantity = total_filled
if total_filled >= order.quantity:
order.status = OrderStatus.FILLED
elif total_filled > 0:
order.status = OrderStatus.PARTIALLY_FILLED
else:
order.status = OrderStatus.PENDING
def _update_position(
self,
user_id: int,
market_id: int,
outcome_id: int,
shares_delta: float,
cost: float,
) -> None:
"""Update a user's position in an outcome.
Creates the position record if it doesn't exist, otherwise
updates the existing position with the new shares and
recalculates the average cost.
Args:
user_id: The user's ID.
market_id: The market ID.
outcome_id: The outcome ID.
shares_delta: Change in shares (positive for buy, negative for sell).
cost: Total cost of the trade.
"""
position = self.db.query(Position).filter(
Position.user_id == user_id,
Position.market_id == market_id,
Position.outcome_id == outcome_id,
).first()
if not position:
position = Position(
user_id=user_id,
market_id=market_id,
outcome_id=outcome_id,
shares=0.0,
average_cost=0.0,
)
self.db.add(position)
if shares_delta > 0:
# Buying: update average cost
total_cost = position.average_cost * position.shares + cost
position.shares += shares_delta
if position.shares > 0:
position.average_cost = total_cost / position.shares
else:
# Selling: reduce position
position.shares += shares_delta # shares_delta is negative
if position.shares <= 0:
position.shares = 0.0
position.average_cost = 0.0
def _get_order_user(self, order_id: int) -> int:
"""Get the user ID associated with an order.
Args:
order_id: The order's ID.
Returns:
The user ID who placed the order.
"""
order = self.db.query(Order).filter(Order.id == order_id).first()
return order.user_id if order else 0
def _estimate_max_cost(self, market: Market, order_data) -> float:
"""Estimate the maximum cost of an order for balance checking.
Args:
market: The market being traded.
order_data: The order specification.
Returns:
The estimated maximum cost.
"""
if order_data.order_type == "limit" and order_data.price:
return order_data.price * order_data.quantity
# For market orders, assume worst case of $1 per share
return order_data.quantity
def cancel_order(self, order_id: int, user_id: int) -> bool:
"""Cancel an open order.
Args:
order_id: The ID of the order to cancel.
user_id: The ID of the user requesting cancellation.
Returns:
True if the order was cancelled, False if not found or
not cancellable.
"""
order = self.db.query(Order).filter(
Order.id == order_id,
Order.user_id == user_id,
).first()
if not order:
return False
if order.status not in (OrderStatus.PENDING, OrderStatus.PARTIALLY_FILLED):
return False
market = self.db.query(Market).filter(Market.id == order.market_id).first()
if market.market_type == MarketType.ORDER_BOOK:
if market.id in _order_books:
_order_books[market.id].cancel_order(order_id)
order.status = OrderStatus.CANCELLED
self.db.commit()
return True
def get_positions(self, user_id: int) -> list[dict]:
"""Get all positions for a user with current prices and P&L.
Args:
user_id: The user's ID.
Returns:
List of position dictionaries with current market data.
"""
positions = self.db.query(Position).filter(
Position.user_id == user_id,
Position.shares > 0,
).all()
result = []
for pos in positions:
outcome = self.db.query(Outcome).filter(
Outcome.id == pos.outcome_id
).first()
market = self.db.query(Market).filter(
Market.id == pos.market_id
).first()
# Get current price
current_price = self._get_current_price(market, pos.outcome_id)
unrealized_pnl = (current_price - pos.average_cost) * pos.shares
result.append({
"market_id": pos.market_id,
"outcome_id": pos.outcome_id,
"outcome_name": outcome.name if outcome else "Unknown",
"shares": pos.shares,
"average_cost": pos.average_cost,
"current_price": current_price,
"unrealized_pnl": unrealized_pnl,
})
return result
def _get_current_price(self, market: Market, outcome_id: int) -> float:
"""Get the current price of an outcome.
Args:
market: The market to check.
outcome_id: The outcome to price.
Returns:
The current price of the outcome.
"""
if market.market_type == MarketType.LMSR:
if market.id in _lmsr_instances:
amm = _lmsr_instances[market.id]
outcomes = self.db.query(Outcome).filter(
Outcome.market_id == market.id
).order_by(Outcome.id).all()
for i, o in enumerate(outcomes):
if o.id == outcome_id:
return amm.price(i)
return 0.5 # Default if AMM not initialized
else:
if market.id in _order_books:
book = _order_books[market.id]
bid = book.get_best_bid(outcome_id)
ask = book.get_best_ask(outcome_id)
if bid is not None and ask is not None:
return (bid + ask) / 2
elif bid is not None:
return bid
elif ask is not None:
return ask
return 0.5
def get_trade_history(self, user_id: int) -> list[Trade]:
"""Get all trades involving a user.
Args:
user_id: The user's ID.
Returns:
List of Trade records.
"""
user_orders = self.db.query(Order.id).filter(
Order.user_id == user_id
).subquery()
trades = self.db.query(Trade).filter(
(Trade.buyer_order_id.in_(user_orders)) |
(Trade.seller_order_id.in_(user_orders))
).order_by(Trade.executed_at.desc()).all()
return trades
💡 Intuition: The trading service acts as a dispatcher. When an order arrives, it checks the market type label and sends the order to the right engine—either the order book or the LMSR. This is the Strategy Pattern from object-oriented design: the algorithm (pricing mechanism) is selected at runtime based on configuration.
32.8 Basic Frontend Considerations
While a full frontend framework (React, Vue, Svelte) is beyond our scope, we can build a functional interface using HTML, JavaScript, and FastAPI's built-in template rendering. This section covers the key patterns.
32.8.1 Serving Static Files and Templates
FastAPI can serve HTML templates using Jinja2:
"""Frontend template configuration."""
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
32.8.2 Market Dashboard Template
Here is a minimal HTML template that displays markets and their current prices:
<!-- templates/dashboard.html -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Prediction Market Platform</title>
<style>
body { font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
max-width: 1200px; margin: 0 auto; padding: 20px;
background-color: #f5f5f5; }
.market-card { background: white; border-radius: 8px;
padding: 20px; margin: 15px 0;
box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
.market-title { font-size: 1.3em; font-weight: bold;
color: #333; margin-bottom: 10px; }
.outcome-bar { display: flex; align-items: center;
margin: 8px 0; }
.outcome-name { width: 150px; font-weight: 500; }
.price-bar { flex: 1; height: 30px; background: #e0e0e0;
border-radius: 4px; overflow: hidden;
position: relative; }
.price-fill { height: 100%; background: linear-gradient(
90deg, #4CAF50, #8BC34A);
transition: width 0.3s ease; }
.price-label { position: absolute; right: 10px; top: 5px;
font-weight: bold; }
.trade-btn { padding: 8px 16px; border: none;
border-radius: 4px; cursor: pointer;
font-weight: bold; margin: 0 5px; }
.buy-btn { background: #4CAF50; color: white; }
.sell-btn { background: #f44336; color: white; }
.status-badge { display: inline-block; padding: 3px 10px;
border-radius: 12px; font-size: 0.8em;
font-weight: bold; }
.status-open { background: #E8F5E9; color: #2E7D32; }
.status-closed { background: #FFF3E0; color: #E65100; }
.status-resolved { background: #E3F2FD; color: #1565C0; }
</style>
</head>
<body>
<h1>Prediction Market Platform</h1>
<div id="markets-container">Loading markets...</div>
<script>
const API_BASE = '/api/v1';
let authToken = localStorage.getItem('token');
async function fetchMarkets() {
const response = await fetch(`${API_BASE}/markets`);
const markets = await response.json();
renderMarkets(markets);
}
function renderMarkets(markets) {
const container = document.getElementById('markets-container');
container.innerHTML = markets.map(market => `
<div class="market-card">
<div class="market-title">${market.title}
<span class="status-badge status-${market.status}">
${market.status.toUpperCase()}
</span>
</div>
<p>${market.description}</p>
${market.outcomes.map((outcome, i) => `
<div class="outcome-bar">
<span class="outcome-name">${outcome.name}</span>
<div class="price-bar">
<div class="price-fill"
style="width: ${(market.prices[i] * 100).toFixed(1)}%">
</div>
<span class="price-label">
${(market.prices[i] * 100).toFixed(1)}%
</span>
</div>
<button class="trade-btn buy-btn"
onclick="trade(${market.id}, ${outcome.id}, 'buy')">
Buy
</button>
<button class="trade-btn sell-btn"
onclick="trade(${market.id}, ${outcome.id}, 'sell')">
Sell
</button>
</div>
`).join('')}
<small>Closes: ${new Date(market.closes_at).toLocaleString()}</small>
</div>
`).join('');
}
async function trade(marketId, outcomeId, side) {
if (!authToken) {
alert('Please log in first');
return;
}
const quantity = prompt('How many shares?', '10');
if (!quantity) return;
const response = await fetch(`${API_BASE}/trading/orders`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${authToken}`
},
body: JSON.stringify({
market_id: marketId,
outcome_id: outcomeId,
side: side,
order_type: 'market',
quantity: parseFloat(quantity)
})
});
if (response.ok) {
alert('Order placed successfully!');
fetchMarkets(); // Refresh prices
} else {
const error = await response.json();
alert(`Error: ${error.detail}`);
}
}
// Load markets on page load
fetchMarkets();
// Refresh every 5 seconds
setInterval(fetchMarkets, 5000);
</script>
</body>
</html>
32.8.3 Frontend-Backend Communication Patterns
When building the frontend, keep these patterns in mind:
-
Token management: Store the JWT in
localStorage(acceptable for demos) orhttpOnlycookies (better for production). Include it in every API request'sAuthorizationheader. -
Optimistic updates: When a user places an order, immediately update the UI before the server confirms. If the server returns an error, roll back the UI change. This makes the interface feel responsive.
-
Real-time updates: For a production platform, use WebSockets to push price updates to clients instead of polling. FastAPI supports WebSockets natively:
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws/markets/{market_id}")
async def market_websocket(websocket: WebSocket, market_id: int):
"""Stream real-time price updates for a market.
Args:
websocket: The WebSocket connection.
market_id: The market to subscribe to.
"""
await websocket.accept()
try:
while True:
# In production, subscribe to a message broker
prices = get_current_prices(market_id)
await websocket.send_json({
"market_id": market_id,
"prices": prices,
"timestamp": datetime.utcnow().isoformat(),
})
await asyncio.sleep(1)
except WebSocketDisconnect:
pass
✅ Best Practice: Design your frontend with the API contract first. Use FastAPI's automatic OpenAPI documentation (available at
/docs) to explore and test endpoints before writing any frontend code. This approach is sometimes called "API-first development."
32.9 Putting It All Together
Let us walk through a complete end-to-end workflow that exercises every component of our platform.
32.9.1 End-to-End Demo
"""End-to-end demonstration of the prediction market platform."""
import requests
BASE_URL = "http://localhost:8000/api/v1"
# Step 1: Register two users
alice = requests.post(f"{BASE_URL}/auth/register", json={
"username": "alice",
"email": "alice@example.com",
"password": "securepassword123",
}).json()
print(f"Alice registered with ID: {alice['id']}")
bob = requests.post(f"{BASE_URL}/auth/register", json={
"username": "bob",
"email": "bob@example.com",
"password": "anotherpassword456",
}).json()
print(f"Bob registered with ID: {bob['id']}")
# Step 2: Login to get JWT tokens
alice_token = requests.post(f"{BASE_URL}/auth/login", data={
"username": "alice",
"password": "securepassword123",
}).json()["access_token"]
bob_token = requests.post(f"{BASE_URL}/auth/login", data={
"username": "bob",
"password": "anotherpassword456",
}).json()["access_token"]
# Step 3: Alice creates an LMSR market
market = requests.post(
f"{BASE_URL}/markets",
json={
"title": "Will Python 4.0 be released before 2030?",
"description": "Resolves YES if Python 4.0 is officially released.",
"market_type": "lmsr",
"outcomes": ["Yes", "No"],
"liquidity_parameter": 100.0,
"resolution_source": "https://python.org/downloads/",
"closes_at": "2029-12-31T23:59:59",
},
headers={"Authorization": f"Bearer {alice_token}"},
).json()
print(f"Market created: {market['title']}")
print(f"Initial prices: {market['prices']}")
# Step 4: Bob buys 20 shares of "Yes"
order = requests.post(
f"{BASE_URL}/trading/orders",
json={
"market_id": market["id"],
"outcome_id": market["outcomes"][0]["id"],
"side": "buy",
"order_type": "market",
"quantity": 20,
},
headers={"Authorization": f"Bearer {bob_token}"},
).json()
print(f"Bob's order status: {order['status']}")
# Step 5: Check updated prices
prices = requests.get(
f"{BASE_URL}/markets/{market['id']}/prices"
).json()
print(f"Updated prices: {prices}")
# Step 6: Check Bob's positions
positions = requests.get(
f"{BASE_URL}/trading/positions",
headers={"Authorization": f"Bearer {bob_token}"},
).json()
print(f"Bob's positions: {positions}")
# Step 7: Alice resolves the market (in the future, when outcome is known)
# resolution = requests.post(
# f"{BASE_URL}/markets/{market['id']}/resolve",
# json={"winning_outcome_id": market["outcomes"][0]["id"]},
# headers={"Authorization": f"Bearer {alice_token}"},
# ).json()
32.9.2 Testing the Platform
Automated testing is essential. Here is a test for the LMSR engine:
"""Tests for the LMSR automated market maker."""
import pytest
import math
from engines.lmsr import LMSRMarketMaker
class TestLMSR:
"""Test suite for the LMSR market maker."""
def test_initial_prices_are_uniform(self):
"""All outcomes should start with equal prices."""
amm = LMSRMarketMaker(b=100.0, num_outcomes=3)
prices = amm.all_prices()
for p in prices:
assert abs(p - 1/3) < 1e-10
def test_prices_sum_to_one(self):
"""Prices must always sum to 1.0."""
amm = LMSRMarketMaker(b=50.0, num_outcomes=4)
amm.execute_trade(0, 30)
amm.execute_trade(2, -10)
assert abs(sum(amm.all_prices()) - 1.0) < 1e-10
def test_buying_increases_price(self):
"""Buying shares of an outcome should increase its price."""
amm = LMSRMarketMaker(b=100.0, num_outcomes=2)
initial_price = amm.price(0)
amm.execute_trade(0, 10)
assert amm.price(0) > initial_price
def test_cost_is_positive_for_buy(self):
"""Buying shares should have a positive cost."""
amm = LMSRMarketMaker(b=100.0, num_outcomes=2)
cost = amm.compute_trade_cost(0, 10)
assert cost > 0
def test_max_loss_bound(self):
"""Maximum loss should be b * ln(n)."""
amm = LMSRMarketMaker(b=100.0, num_outcomes=3)
expected = 100.0 * math.log(3)
assert abs(amm.max_loss() - expected) < 1e-10
def test_roundtrip_trade(self):
"""Buying and selling the same amount should approximately cancel."""
amm = LMSRMarketMaker(b=100.0, num_outcomes=2)
cost_buy = amm.compute_trade_cost(0, 10)
amm.execute_trade(0, 10)
cost_sell = amm.compute_trade_cost(0, -10)
# Roundtrip should have small net cost due to price movement
assert abs(cost_buy + cost_sell) < 1.0 # Small but not zero
32.10 Chapter Summary
In this chapter, we built a complete prediction market platform from the ground up. Here is what we covered:
Architecture: We designed a layered architecture with clear separation between HTTP handling (routers), business logic (services), and pricing mechanisms (engines). This modular design makes the system maintainable and testable.
Order Book Engine: We implemented a continuous double auction with price-time priority matching. The engine supports limit orders, market orders, partial fills, and lazy cancellation through a heap-based data structure.
LMSR AMM: We implemented Hanson's Logarithmic Market Scoring Rule with numerical stability (log-sum-exp trick). The AMM provides guaranteed liquidity and produces prices that are directly interpretable as probabilities.
REST API: We designed a comprehensive RESTful API with endpoints for user management, market creation, order placement, position tracking, and market resolution. FastAPI's Pydantic integration provides automatic validation and documentation.
Authentication: We implemented JWT-based authentication with bcrypt password hashing. The dependency injection pattern in FastAPI makes it easy to protect endpoints while keeping the code clean.
Market Resolution: We built a resolution service that handles payout distribution, including edge cases like market voiding and orphaned positions.
Frontend: We created a minimal but functional web interface that communicates with the API using standard fetch requests.
Key Formulas
| Formula | Description |
|---|---|
| $C(\mathbf{q}) = b \cdot \ln\!\left(\sum_{i=1}^{n} e^{q_i / b}\right)$ | LMSR cost function |
| $p_i(\mathbf{q}) = \frac{e^{q_i / b}}{\sum_{j=1}^{n} e^{q_j / b}}$ | LMSR price function |
| $\text{cost} = C(\mathbf{q'}) - C(\mathbf{q})$ | Trade cost |
| $L_{\max} = b \cdot \ln(n)$ | Maximum market maker loss |
| $\text{spread} = p_{\text{ask}} - p_{\text{bid}}$ | Bid-ask spread |
| $\text{PnL} = (p_{\text{current}} - p_{\text{avg\_cost}}) \times \text{shares}$ | Unrealized profit/loss |
Decision Framework: Order Book vs. LMSR
| Factor | Order Book | LMSR |
|---|---|---|
| Liquidity source | Other traders | Market maker subsidy |
| Thin markets | Poor (wide spreads) | Good (always available) |
| Price discovery | Excellent | Good |
| Cost to operate | Low | Bounded by $b \cdot \ln(n)$ |
| Complexity | Medium | Low |
| Best for | Active markets, many traders | Internal forecasts, niche topics |
What's Next
In Chapter 33, Scaling, Performance, and Operations, we address the challenges of taking this platform to production. We will cover database migration from SQLite to PostgreSQL, adding Redis for caching and order book persistence, implementing rate limiting, deploying with Docker and Kubernetes, monitoring with Prometheus and Grafana, and handling the operational concerns (backups, incident response, compliance) that distinguish a demo from a real platform.