Case Study 2: Building a Simple Matching Engine
Overview
In this case study, we build a complete matching engine from the ground up, test it with simulated order flow, measure its performance, and handle a variety of edge cases. This is a hands-on engineering exercise that reinforces the concepts from Sections 7.5 and 7.6 of the chapter.
By the end of this case study, you will have: 1. A production-quality (though simplified) matching engine in Python 2. A test suite that covers normal operations and edge cases 3. Performance benchmarks 4. A simulation harness that generates realistic order flow
The complete code is available in code/case-study-code.py.
Step 1: Requirements and Design
Functional Requirements
Our matching engine must: 1. Accept limit orders (buy and sell) with GTC, IOC, and FOK time-in-force 2. Accept market orders (implemented as aggressive limit orders) 3. Match orders using price-time priority 4. Support order cancellation 5. Generate trade notifications 6. Maintain a consistent order book state 7. Handle prediction market price bounds ($0.01 to $0.99)
Design Decisions
- Language: Python 3.9+
- Data structures: Sorted dictionaries for price levels, lists for FIFO order queues
- Matching: Synchronous (no threading) -- process one order at a time
- IDs: Auto-generated sequential IDs
- Timestamps:
time.time()for ordering
Architecture
┌──────────────────────────────────────────┐
│ MatchingEngine │
│ ┌────────────────────────────────────┐ │
│ │ OrderValidator │ │
│ │ - Price bounds check │ │
│ │ - Quantity check │ │
│ │ - Duplicate ID check │ │
│ └──────────────┬─────────────────────┘ │
│ │ │
│ ┌──────────────▼─────────────────────┐ │
│ │ OrderBook │ │
│ │ ┌─────────┐ ┌──────────┐ │ │
│ │ │ Bids │ │ Asks │ │ │
│ │ │(sorted) │ │ (sorted) │ │ │
│ │ └────┬────┘ └────┬─────┘ │ │
│ │ │ │ │ │
│ │ ┌────▼──────────────▼─────┐ │ │
│ │ │ PriceLevel (FIFO) │ │ │
│ │ │ [Order, Order, ...] │ │ │
│ │ └─────────────────────────┘ │ │
│ └────────────────────────────────────┘ │
│ │ │
│ ┌──────────────▼─────────────────────┐ │
│ │ TradeLogger │ │
│ │ - Records all trades │ │
│ │ - Notifies callbacks │ │
│ └────────────────────────────────────┘ │
└──────────────────────────────────────────┘
Step 2: Core Implementation
The Order Class
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, List, Dict, Tuple, Callable
import time
import random
class Side(Enum):
BUY = "BUY"
SELL = "SELL"
class OrderStatus(Enum):
NEW = "NEW"
PARTIALLY_FILLED = "PARTIALLY_FILLED"
FILLED = "FILLED"
CANCELLED = "CANCELLED"
REJECTED = "REJECTED"
class TimeInForce(Enum):
GTC = "GTC"
IOC = "IOC"
FOK = "FOK"
@dataclass
class Order:
order_id: str
side: Side
price: float
quantity: int
time_in_force: TimeInForce = TimeInForce.GTC
timestamp: float = field(default_factory=time.time)
remaining_quantity: int = None
status: OrderStatus = OrderStatus.NEW
def __post_init__(self):
if self.remaining_quantity is None:
self.remaining_quantity = self.quantity
@property
def is_filled(self) -> bool:
return self.remaining_quantity == 0
@property
def filled_quantity(self) -> int:
return self.quantity - self.remaining_quantity
@dataclass
class Trade:
trade_id: str
buy_order_id: str
sell_order_id: str
price: float
quantity: int
timestamp: float = field(default_factory=time.time)
aggressor_side: Side = None # Which side was the incoming order
The PriceLevel Class
class PriceLevel:
"""Orders at a single price, maintained in FIFO order."""
def __init__(self, price: float):
self.price = price
self._orders: List[Order] = []
@property
def total_quantity(self) -> int:
return sum(o.remaining_quantity for o in self._orders)
@property
def order_count(self) -> int:
return len(self._orders)
@property
def is_empty(self) -> bool:
return len(self._orders) == 0
def add(self, order: Order):
self._orders.append(order)
def remove(self, order_id: str) -> Optional[Order]:
for i, order in enumerate(self._orders):
if order.order_id == order_id:
return self._orders.pop(i)
return None
def first(self) -> Optional[Order]:
return self._orders[0] if self._orders else None
def pop_first(self) -> Optional[Order]:
return self._orders.pop(0) if self._orders else None
def __iter__(self):
return iter(self._orders)
def __len__(self):
return len(self._orders)
The OrderBook Class
from sortedcontainers import SortedDict
class OrderBook:
"""Maintains bid and ask sides with price-level organization."""
def __init__(self):
# Bids: use negative keys so SortedDict gives highest price first
self._bids: SortedDict = SortedDict()
# Asks: normal order (lowest price first)
self._asks: SortedDict = SortedDict()
# Index for fast lookup by order_id
self._order_index: Dict[str, Order] = {}
def add_to_bids(self, order: Order):
key = -order.price
if key not in self._bids:
self._bids[key] = PriceLevel(order.price)
self._bids[key].add(order)
self._order_index[order.order_id] = order
def add_to_asks(self, order: Order):
key = order.price
if key not in self._asks:
self._asks[key] = PriceLevel(order.price)
self._asks[key].add(order)
self._order_index[order.order_id] = order
def remove_order(self, order_id: str) -> Optional[Order]:
if order_id not in self._order_index:
return None
order = self._order_index[order_id]
if order.side == Side.BUY:
key = -order.price
if key in self._bids:
self._bids[key].remove(order_id)
if self._bids[key].is_empty:
del self._bids[key]
else:
key = order.price
if key in self._asks:
self._asks[key].remove(order_id)
if self._asks[key].is_empty:
del self._asks[key]
del self._order_index[order_id]
return order
def best_bid_level(self) -> Optional[PriceLevel]:
if not self._bids:
return None
return self._bids.values()[0]
def best_ask_level(self) -> Optional[PriceLevel]:
if not self._asks:
return None
return self._asks.values()[0]
def best_bid(self) -> Optional[float]:
level = self.best_bid_level()
return level.price if level else None
def best_ask(self) -> Optional[float]:
level = self.best_ask_level()
return level.price if level else None
def spread(self) -> Optional[float]:
bb, ba = self.best_bid(), self.best_ask()
if bb is None or ba is None:
return None
return ba - bb
def get_bids(self, depth: int = 10) -> List[Tuple[float, int]]:
result = []
for level in self._bids.values():
result.append((level.price, level.total_quantity))
if len(result) >= depth:
break
return result
def get_asks(self, depth: int = 10) -> List[Tuple[float, int]]:
result = []
for level in self._asks.values():
result.append((level.price, level.total_quantity))
if len(result) >= depth:
break
return result
def get_order(self, order_id: str) -> Optional[Order]:
return self._order_index.get(order_id)
@property
def bid_depth(self) -> int:
return sum(l.total_quantity for l in self._bids.values())
@property
def ask_depth(self) -> int:
return sum(l.total_quantity for l in self._asks.values())
def clear_empty_levels(self):
"""Remove any empty price levels (maintenance)."""
empty_bids = [k for k, v in self._bids.items() if v.is_empty]
for k in empty_bids:
del self._bids[k]
empty_asks = [k for k, v in self._asks.items() if v.is_empty]
for k in empty_asks:
del self._asks[k]
The MatchingEngine Class
class SimpleMatchingEngine:
"""
A complete matching engine for prediction market contracts.
Features:
- Price-time priority matching
- Limit orders with GTC, IOC, FOK
- Market orders
- Order cancellation
- Trade callbacks
- Price validation for prediction markets ($0.01-$0.99)
"""
MIN_PRICE = 0.01
MAX_PRICE = 0.99
def __init__(self, instrument: str = "EVENT"):
self.instrument = instrument
self.book = OrderBook()
self.trades: List[Trade] = []
self._order_counter = 0
self._trade_counter = 0
self._callbacks: List[Callable] = []
# ── ID Generation ─────────────────────────────────
def _next_order_id(self) -> str:
self._order_counter += 1
return f"O{self._order_counter:08d}"
def _next_trade_id(self) -> str:
self._trade_counter += 1
return f"T{self._trade_counter:08d}"
# ── Callbacks ─────────────────────────────────────
def on_trade(self, callback: Callable):
"""Register a callback for trade events."""
self._callbacks.append(callback)
def _emit_trade(self, trade: Trade):
for cb in self._callbacks:
cb(trade)
# ── Validation ────────────────────────────────────
def _validate(self, side: Side, price: float, quantity: int):
if price < self.MIN_PRICE or price > self.MAX_PRICE:
raise ValueError(
f"Price ${price:.2f} out of bounds "
f"[${self.MIN_PRICE}, ${self.MAX_PRICE}]")
if quantity <= 0:
raise ValueError(f"Quantity must be positive, got {quantity}")
# ── Order Submission ──────────────────────────────
def submit_limit(self, side: Side, price: float, quantity: int,
tif: TimeInForce = TimeInForce.GTC
) -> Tuple[str, List[Trade]]:
"""Submit a limit order. Returns (order_id, list_of_trades)."""
self._validate(side, price, quantity)
order_id = self._next_order_id()
order = Order(
order_id=order_id,
side=side,
price=price,
quantity=quantity,
time_in_force=tif
)
# FOK pre-check: can we fill completely?
if tif == TimeInForce.FOK:
available = self._available_quantity(side, price)
if available < quantity:
order.status = OrderStatus.REJECTED
return order_id, []
# Attempt matching
new_trades = self._match(order)
# Post-match handling based on TIF
if order.remaining_quantity > 0:
if tif == TimeInForce.GTC:
# Rest in book
if order.side == Side.BUY:
self.book.add_to_bids(order)
else:
self.book.add_to_asks(order)
order.status = (OrderStatus.PARTIALLY_FILLED
if order.filled_quantity > 0
else OrderStatus.NEW)
elif tif == TimeInForce.IOC:
order.status = (OrderStatus.PARTIALLY_FILLED
if order.filled_quantity > 0
else OrderStatus.CANCELLED)
# FOK with remaining would not reach here due to pre-check
return order_id, new_trades
def submit_market(self, side: Side, quantity: int
) -> Tuple[str, List[Trade]]:
"""Submit a market order (IOC at extreme price)."""
price = self.MAX_PRICE if side == Side.BUY else self.MIN_PRICE
return self.submit_limit(side, price, quantity, TimeInForce.IOC)
def cancel(self, order_id: str) -> bool:
"""Cancel a resting order. Returns True if found and cancelled."""
order = self.book.remove_order(order_id)
if order is None:
return False
order.status = OrderStatus.CANCELLED
return True
# ── Matching Logic ────────────────────────────────
def _match(self, incoming: Order) -> List[Trade]:
"""Core matching algorithm: price-time priority."""
trades = []
while incoming.remaining_quantity > 0:
# Get the best level on the opposite side
if incoming.side == Side.BUY:
best_level = self.book.best_ask_level()
else:
best_level = self.book.best_bid_level()
if best_level is None:
break # No resting orders on the opposite side
# Check price compatibility
if incoming.side == Side.BUY:
if incoming.price < best_level.price:
break # Incoming bid is below best ask
else:
if incoming.price > best_level.price:
break # Incoming ask is above best bid
# Match against orders at this level (FIFO)
while incoming.remaining_quantity > 0 and not best_level.is_empty:
resting = best_level.first()
fill_qty = min(incoming.remaining_quantity,
resting.remaining_quantity)
trade_price = resting.price
# Create trade
trade = self._create_trade(incoming, resting,
trade_price, fill_qty)
trades.append(trade)
self.trades.append(trade)
self._emit_trade(trade)
# Update quantities
incoming.remaining_quantity -= fill_qty
resting.remaining_quantity -= fill_qty
# Remove fully filled resting order
if resting.is_filled:
resting.status = OrderStatus.FILLED
best_level.pop_first()
if resting.order_id in self.book._order_index:
del self.book._order_index[resting.order_id]
else:
resting.status = OrderStatus.PARTIALLY_FILLED
# Clean up empty level
if best_level.is_empty:
if incoming.side == Side.BUY:
key = best_level.price
if key in self.book._asks:
del self.book._asks[key]
else:
key = -best_level.price
if key in self.book._bids:
del self.book._bids[key]
# Update incoming order status
if incoming.is_filled:
incoming.status = OrderStatus.FILLED
return trades
def _create_trade(self, incoming: Order, resting: Order,
price: float, quantity: int) -> Trade:
"""Create a trade record."""
if incoming.side == Side.BUY:
buy_id, sell_id = incoming.order_id, resting.order_id
else:
buy_id, sell_id = resting.order_id, incoming.order_id
return Trade(
trade_id=self._next_trade_id(),
buy_order_id=buy_id,
sell_order_id=sell_id,
price=price,
quantity=quantity,
aggressor_side=incoming.side
)
def _available_quantity(self, side: Side, price: float) -> int:
"""How much can be filled at the given price or better."""
total = 0
if side == Side.BUY:
for level in self.book._asks.values():
if level.price > price:
break
total += level.total_quantity
else:
for level in self.book._bids.values():
if level.price < price:
break
total += level.total_quantity
return total
# ── Query Methods ─────────────────────────────────
def snapshot(self) -> dict:
"""Return a snapshot of the current state."""
return {
'instrument': self.instrument,
'best_bid': self.book.best_bid(),
'best_ask': self.book.best_ask(),
'spread': self.book.spread(),
'bids': self.book.get_bids(),
'asks': self.book.get_asks(),
'bid_depth': self.book.bid_depth,
'ask_depth': self.book.ask_depth,
'trade_count': len(self.trades),
'order_count': self._order_counter,
}
def display(self, levels: int = 5):
"""Print the current order book."""
bids = self.book.get_bids(levels)
asks = self.book.get_asks(levels)
print(f"\n{'=' * 50}")
print(f" {self.instrument} Order Book")
print(f"{'=' * 50}")
print(f" {'BIDS':<20} | {'ASKS':>20}")
print(f" {'─' * 18} | {'─' * 20}")
max_rows = max(len(bids), len(asks))
for i in range(max_rows):
left = f" ${bids[i][0]:>6.2f} {bids[i][1]:>7}" if i < len(bids) else " " * 18
right = f"${asks[i][0]:>6.2f} {asks[i][1]:>7}" if i < len(asks) else ""
print(f"{left} | {right}")
sp = self.book.spread()
print(f" {'─' * 18} | {'─' * 20}")
if sp is not None:
bb, ba = self.book.best_bid(), self.book.best_ask()
mid = (bb + ba) / 2
print(f" Spread: ${sp:.2f} Midpoint: ${mid:.4f}")
print(f" Trades: {len(self.trades)} "
f"Bid depth: {self.book.bid_depth} "
f"Ask depth: {self.book.ask_depth}")
print(f"{'=' * 50}\n")
Step 3: Testing the Matching Engine
Test Suite
def test_basic_limit_orders():
"""Test that non-crossing limit orders rest in the book."""
engine = SimpleMatchingEngine("TEST")
# Place bids and asks that don't cross
engine.submit_limit(Side.BUY, 0.50, 100)
engine.submit_limit(Side.BUY, 0.48, 200)
engine.submit_limit(Side.SELL, 0.55, 150)
engine.submit_limit(Side.SELL, 0.58, 100)
assert engine.book.best_bid() == 0.50
assert engine.book.best_ask() == 0.55
assert engine.book.spread() == 0.05
assert engine.book.bid_depth == 300
assert engine.book.ask_depth == 250
assert len(engine.trades) == 0
print("PASS: test_basic_limit_orders")
def test_crossing_limit_order():
"""Test that a crossing limit order generates a trade."""
engine = SimpleMatchingEngine("TEST")
engine.submit_limit(Side.SELL, 0.55, 100)
oid, trades = engine.submit_limit(Side.BUY, 0.55, 50)
assert len(trades) == 1
assert trades[0].price == 0.55
assert trades[0].quantity == 50
assert engine.book.best_ask() == 0.55 # 50 remaining
assert engine.book.best_bid() is None # Buy was fully filled
print("PASS: test_crossing_limit_order")
def test_aggressive_limit_order():
"""Test that a buy above the ask executes at the ask price."""
engine = SimpleMatchingEngine("TEST")
engine.submit_limit(Side.SELL, 0.55, 100)
oid, trades = engine.submit_limit(Side.BUY, 0.60, 50)
assert len(trades) == 1
assert trades[0].price == 0.55 # Executes at resting price
assert trades[0].quantity == 50
print("PASS: test_aggressive_limit_order")
def test_partial_fill_and_rest():
"""Test partial fill with remainder resting in book."""
engine = SimpleMatchingEngine("TEST")
engine.submit_limit(Side.SELL, 0.55, 50)
oid, trades = engine.submit_limit(Side.BUY, 0.55, 100)
assert len(trades) == 1
assert trades[0].quantity == 50
# Remaining 50 should rest as a bid
assert engine.book.best_bid() == 0.55
bids = engine.book.get_bids()
assert bids[0] == (0.55, 50)
print("PASS: test_partial_fill_and_rest")
def test_walking_the_book():
"""Test a large order matching across multiple price levels."""
engine = SimpleMatchingEngine("TEST")
engine.submit_limit(Side.SELL, 0.55, 100)
engine.submit_limit(Side.SELL, 0.57, 100)
engine.submit_limit(Side.SELL, 0.60, 100)
oid, trades = engine.submit_limit(Side.BUY, 0.60, 250)
assert len(trades) == 3
assert trades[0].price == 0.55
assert trades[0].quantity == 100
assert trades[1].price == 0.57
assert trades[1].quantity == 100
assert trades[2].price == 0.60
assert trades[2].quantity == 50
# 50 remain at 0.60 on the ask
assert engine.book.best_ask() == 0.60
print("PASS: test_walking_the_book")
def test_price_time_priority():
"""Test that earlier orders at the same price match first."""
engine = SimpleMatchingEngine("TEST")
oid1, _ = engine.submit_limit(Side.SELL, 0.55, 100)
time.sleep(0.001) # Ensure different timestamps
oid2, _ = engine.submit_limit(Side.SELL, 0.55, 100)
_, trades = engine.submit_limit(Side.BUY, 0.55, 50)
assert len(trades) == 1
assert trades[0].sell_order_id == oid1 # First order matched first
print("PASS: test_price_time_priority")
def test_market_order():
"""Test market order execution."""
engine = SimpleMatchingEngine("TEST")
engine.submit_limit(Side.SELL, 0.55, 100)
engine.submit_limit(Side.SELL, 0.60, 100)
oid, trades = engine.submit_market(Side.BUY, 150)
assert len(trades) == 2
assert trades[0].price == 0.55
assert trades[0].quantity == 100
assert trades[1].price == 0.60
assert trades[1].quantity == 50
print("PASS: test_market_order")
def test_ioc_partial_fill():
"""Test IOC: partial fill, remainder cancelled."""
engine = SimpleMatchingEngine("TEST")
engine.submit_limit(Side.SELL, 0.55, 50)
oid, trades = engine.submit_limit(Side.BUY, 0.55, 100, TimeInForce.IOC)
assert len(trades) == 1
assert trades[0].quantity == 50
# No remainder should rest in the book
assert engine.book.best_bid() is None
print("PASS: test_ioc_partial_fill")
def test_fok_success():
"""Test FOK: full fill succeeds."""
engine = SimpleMatchingEngine("TEST")
engine.submit_limit(Side.SELL, 0.55, 100)
oid, trades = engine.submit_limit(Side.BUY, 0.55, 100, TimeInForce.FOK)
assert len(trades) == 1
assert trades[0].quantity == 100
print("PASS: test_fok_success")
def test_fok_failure():
"""Test FOK: insufficient quantity means no fill at all."""
engine = SimpleMatchingEngine("TEST")
engine.submit_limit(Side.SELL, 0.55, 50)
oid, trades = engine.submit_limit(Side.BUY, 0.55, 100, TimeInForce.FOK)
assert len(trades) == 0
# The resting ask should be untouched
assert engine.book.best_ask() == 0.55
asks = engine.book.get_asks()
assert asks[0] == (0.55, 50)
print("PASS: test_fok_failure")
def test_cancel_order():
"""Test order cancellation."""
engine = SimpleMatchingEngine("TEST")
oid, _ = engine.submit_limit(Side.BUY, 0.50, 100)
assert engine.book.best_bid() == 0.50
result = engine.cancel(oid)
assert result is True
assert engine.book.best_bid() is None
# Cancel non-existent order
result = engine.cancel("FAKE_ID")
assert result is False
print("PASS: test_cancel_order")
def test_price_validation():
"""Test that out-of-bounds prices are rejected."""
engine = SimpleMatchingEngine("TEST")
try:
engine.submit_limit(Side.BUY, 0.00, 100)
assert False, "Should have raised ValueError"
except ValueError:
pass
try:
engine.submit_limit(Side.SELL, 1.00, 100)
assert False, "Should have raised ValueError"
except ValueError:
pass
try:
engine.submit_limit(Side.BUY, 0.50, -10)
assert False, "Should have raised ValueError"
except ValueError:
pass
print("PASS: test_price_validation")
def test_self_trade():
"""Test that the engine handles self-trading scenarios correctly."""
engine = SimpleMatchingEngine("TEST")
# In a simple engine, we don't prevent self-trades.
# This test just ensures no crash.
engine.submit_limit(Side.SELL, 0.55, 100)
oid, trades = engine.submit_limit(Side.BUY, 0.55, 100)
assert len(trades) == 1
print("PASS: test_self_trade")
def test_empty_book_market_order():
"""Test market order on empty book -- should produce no trades."""
engine = SimpleMatchingEngine("TEST")
oid, trades = engine.submit_market(Side.BUY, 100)
assert len(trades) == 0
print("PASS: test_empty_book_market_order")
def run_all_tests():
"""Run the complete test suite."""
print("\n" + "=" * 50)
print(" Running Matching Engine Test Suite")
print("=" * 50 + "\n")
test_basic_limit_orders()
test_crossing_limit_order()
test_aggressive_limit_order()
test_partial_fill_and_rest()
test_walking_the_book()
test_price_time_priority()
test_market_order()
test_ioc_partial_fill()
test_fok_success()
test_fok_failure()
test_cancel_order()
test_price_validation()
test_self_trade()
test_empty_book_market_order()
print(f"\nAll tests passed!")
print("=" * 50 + "\n")
Step 4: Simulating Realistic Order Flow
Now we stress-test the engine with simulated order flow that mimics a real prediction market.
Order Flow Generator
class OrderFlowSimulator:
"""
Generates realistic order flow for a prediction market.
Models three types of participants:
1. Market makers: continuously quote both sides
2. Informed traders: trade directionally based on private information
3. Noise traders: random small trades
"""
def __init__(self, engine: SimpleMatchingEngine,
true_prob: float = 0.50):
self.engine = engine
self.true_prob = true_prob # The "true" probability
self.mm_order_ids = [] # Track market maker orders
def market_maker_quote(self, spread: float = 0.04,
size: int = 200):
"""Place symmetric quotes around the true probability."""
# Cancel old quotes
for oid in self.mm_order_ids:
self.engine.cancel(oid)
self.mm_order_ids.clear()
mid = self.true_prob
bid_price = round(mid - spread / 2, 2)
ask_price = round(mid + spread / 2, 2)
# Clamp to valid range
bid_price = max(0.01, min(0.98, bid_price))
ask_price = max(0.02, min(0.99, ask_price))
oid_bid, _ = self.engine.submit_limit(Side.BUY, bid_price, size)
oid_ask, _ = self.engine.submit_limit(Side.SELL, ask_price, size)
self.mm_order_ids.extend([oid_bid, oid_ask])
def informed_trade(self, direction: str = "BUY",
size: int = None):
"""An informed trader buys or sells aggressively."""
if size is None:
size = random.randint(20, 100)
side = Side.BUY if direction == "BUY" else Side.SELL
self.engine.submit_market(side, size)
def noise_trade(self):
"""A noise trader places a random limit order near the midpoint."""
side = random.choice([Side.BUY, Side.SELL])
mid = self.true_prob
# Random price within a few cents of midpoint
offset = random.gauss(0, 0.02)
price = round(mid + offset, 2)
price = max(0.01, min(0.99, price))
qty = random.randint(10, 50)
self.engine.submit_limit(side, price, qty, TimeInForce.GTC)
def simulate_session(self, n_steps: int = 200,
shock_at: int = None,
shock_prob: float = None):
"""
Run a simulation session.
At each step, one of three things happens:
- 40% chance: market maker refreshes quotes
- 30% chance: noise trader places an order
- 20% chance: informed trader trades
- 10% chance: random cancellation
"""
midpoints = []
spreads = []
depths = []
for step in range(n_steps):
# Apply shock if specified
if shock_at and step == shock_at and shock_prob:
self.true_prob = shock_prob
print(f" [Step {step}] SHOCK: "
f"true probability shifted to {shock_prob:.2f}")
# Random action
roll = random.random()
if roll < 0.40:
# Market maker refreshes
spread_width = 0.04
if shock_at and abs(step - shock_at) < 10:
spread_width = 0.08 # Wider during shock
self.market_maker_quote(spread=spread_width)
elif roll < 0.70:
# Noise trader
self.noise_trade()
elif roll < 0.90:
# Informed trader
if self.true_prob > 0.55:
self.informed_trade("BUY", random.randint(20, 80))
elif self.true_prob < 0.45:
self.informed_trade("SELL", random.randint(20, 80))
else:
# No strong signal
self.noise_trade()
else:
# Random cancellation (if there are resting orders)
pass # In a full simulation, we would cancel a random order
# Record metrics
bb = self.engine.book.best_bid()
ba = self.engine.book.best_ask()
if bb and ba:
midpoints.append((bb + ba) / 2)
spreads.append(ba - bb)
else:
midpoints.append(midpoints[-1] if midpoints else 0.50)
spreads.append(spreads[-1] if spreads else 0.04)
depths.append(self.engine.book.bid_depth +
self.engine.book.ask_depth)
return {
'midpoints': midpoints,
'spreads': spreads,
'depths': depths,
'trades': self.engine.trades.copy()
}
Running the Simulation
def run_simulation_demo():
"""Run a complete simulation and display results."""
print("\n" + "=" * 60)
print(" Order Flow Simulation")
print("=" * 60)
engine = SimpleMatchingEngine("Candidate A Wins Election")
sim = OrderFlowSimulator(engine, true_prob=0.50)
# Run with a shock at step 100 (probability jumps to 0.65)
random.seed(42) # For reproducibility
results = sim.simulate_session(
n_steps=200,
shock_at=100,
shock_prob=0.65
)
# Display final state
engine.display()
# Summary statistics
midpoints = results['midpoints']
spreads = results['spreads']
trades = results['trades']
print(f" Simulation Summary:")
print(f" {'─' * 40}")
print(f" Total steps: 200")
print(f" Total trades: {len(trades)}")
print(f" Final midpoint: ${midpoints[-1]:.4f}")
print(f" Starting mid: ${midpoints[0]:.4f}")
print(f" Min spread: ${min(spreads):.4f}")
print(f" Max spread: ${max(spreads):.4f}")
print(f" Avg spread: ${sum(spreads)/len(spreads):.4f}")
if trades:
vwap = sum(t.price * t.quantity for t in trades) / \
sum(t.quantity for t in trades)
print(f" VWAP: ${vwap:.4f}")
print(f" Total volume: {sum(t.quantity for t in trades)}")
print(f" {'─' * 40}")
print()
return results
Step 5: Performance Benchmarks
How fast is our matching engine? Let us measure.
import time as time_module
def benchmark_matching_engine():
"""Measure operations per second for key operations."""
print("\n" + "=" * 60)
print(" Performance Benchmarks")
print("=" * 60 + "\n")
# Benchmark 1: Order insertion (no matching)
engine = SimpleMatchingEngine("BENCH")
n = 10000
start = time_module.time()
for i in range(n):
price = round(0.01 + (i % 98) * 0.01, 2)
side = Side.BUY if i % 2 == 0 else Side.SELL
if side == Side.BUY:
price = round(price * 0.45, 2) + 0.01 # Low bids
else:
price = round(price * 0.45, 2) + 0.55 # High asks
engine.submit_limit(side, max(0.01, min(0.99, price)), 10)
elapsed = time_module.time() - start
print(f" Order insertion (no match): "
f"{n} orders in {elapsed:.3f}s "
f"({n/elapsed:.0f} orders/sec)")
# Benchmark 2: Order cancellation
engine2 = SimpleMatchingEngine("BENCH")
order_ids = []
for i in range(n):
price = round(random.uniform(0.01, 0.40), 2)
oid, _ = engine2.submit_limit(Side.BUY, price, 10)
order_ids.append(oid)
start = time_module.time()
for oid in order_ids:
engine2.cancel(oid)
elapsed = time_module.time() - start
print(f" Order cancellation: "
f"{n} cancels in {elapsed:.3f}s "
f"({n/elapsed:.0f} cancels/sec)")
# Benchmark 3: Matching (many small trades)
engine3 = SimpleMatchingEngine("BENCH")
# Pre-load asks
for i in range(100):
price = round(0.50 + i * 0.005, 3)
price = min(0.99, price)
engine3.submit_limit(Side.SELL, price, 100)
n_match = 5000
start = time_module.time()
for i in range(n_match):
engine3.submit_limit(Side.BUY, 0.60, 1)
elapsed = time_module.time() - start
print(f" Matching (small orders): "
f"{n_match} orders in {elapsed:.3f}s "
f"({n_match/elapsed:.0f} orders/sec)")
print(f" Trades generated: {len(engine3.trades)}")
# Benchmark 4: Full simulation throughput
engine4 = SimpleMatchingEngine("BENCH")
sim = OrderFlowSimulator(engine4, true_prob=0.50)
n_sim = 1000
start = time_module.time()
sim.simulate_session(n_steps=n_sim)
elapsed = time_module.time() - start
print(f" Full simulation: "
f"{n_sim} steps in {elapsed:.3f}s "
f"({n_sim/elapsed:.0f} steps/sec)")
print(f"\n{'=' * 60}\n")
Expected Performance
On a typical machine (2020s laptop), you should see: - Order insertion: 50,000-200,000 orders/sec - Cancellation: 100,000-500,000 cancels/sec - Matching: 20,000-100,000 orders/sec - Simulation: 5,000-20,000 steps/sec
This is more than adequate for simulation and research purposes. A production matching engine in C++ would be 100-1000x faster.
Step 6: Edge Cases and Robustness
Edge Case 1: Empty Book Operations
def test_edge_empty_book():
engine = SimpleMatchingEngine("EDGE")
# Query empty book
assert engine.book.best_bid() is None
assert engine.book.best_ask() is None
assert engine.book.spread() is None
assert engine.book.bid_depth == 0
assert engine.book.ask_depth == 0
# Market order on empty book
_, trades = engine.submit_market(Side.BUY, 100)
assert len(trades) == 0
print("PASS: test_edge_empty_book")
Edge Case 2: Exact Price Match
def test_edge_exact_price():
engine = SimpleMatchingEngine("EDGE")
# Sell at exactly the bid price
engine.submit_limit(Side.BUY, 0.50, 100)
_, trades = engine.submit_limit(Side.SELL, 0.50, 100)
assert len(trades) == 1
assert trades[0].price == 0.50
assert trades[0].quantity == 100
print("PASS: test_edge_exact_price")
Edge Case 3: Single-Contract Orders
def test_edge_single_contract():
engine = SimpleMatchingEngine("EDGE")
engine.submit_limit(Side.SELL, 0.55, 1)
_, trades = engine.submit_limit(Side.BUY, 0.55, 1)
assert len(trades) == 1
assert trades[0].quantity == 1
print("PASS: test_edge_single_contract")
Edge Case 4: Price at Boundaries
def test_edge_boundary_prices():
engine = SimpleMatchingEngine("EDGE")
# Minimum and maximum valid prices
engine.submit_limit(Side.BUY, 0.01, 100)
engine.submit_limit(Side.SELL, 0.99, 100)
assert engine.book.best_bid() == 0.01
assert engine.book.best_ask() == 0.99
assert engine.book.spread() == 0.98
print("PASS: test_edge_boundary_prices")
Edge Case 5: Many Orders at Same Price
def test_edge_many_same_price():
engine = SimpleMatchingEngine("EDGE")
# 1000 orders at the same price
for i in range(1000):
engine.submit_limit(Side.SELL, 0.55, 1)
assert engine.book.ask_depth == 1000
# Buy all of them
_, trades = engine.submit_limit(Side.BUY, 0.55, 1000)
assert len(trades) == 1000 # Each 1-contract order is a separate fill
assert engine.book.ask_depth == 0
print("PASS: test_edge_many_same_price")
Edge Case 6: Rapid Cancel-Replace
def test_edge_cancel_replace():
engine = SimpleMatchingEngine("EDGE")
# Simulate rapid cancel-replace (common for market makers)
for i in range(100):
oid, _ = engine.submit_limit(Side.BUY, 0.50, 100)
engine.cancel(oid)
assert engine.book.bid_depth == 0
assert engine.book.ask_depth == 0
print("PASS: test_edge_cancel_replace")
Conclusions
What We Built
We constructed a fully functional matching engine that:
- Handles all standard order types: Limit orders with GTC/IOC/FOK, market orders, and cancellations.
- Implements price-time priority correctly: Verified through targeted tests.
- Validates prediction market constraints: Prices bounded to $0.01-$0.99.
- Supports trade callbacks: External systems can be notified of executions.
- Performs adequately: Tens of thousands of operations per second in Python.
- Handles edge cases: Empty books, boundary prices, single-contract orders, and high-frequency cancel-replace patterns.
What a Production System Would Add
Our matching engine is educational, not production-grade. A real exchange would add:
- Persistence: Orders and trades stored in a database with crash recovery.
- Networking: Accept orders over TCP/WebSocket connections.
- Authentication: Verify trader identity and permissions.
- Risk management: Pre-trade checks for margin, position limits, and credit.
- Self-trade prevention: Prevent a trader from matching against their own orders.
- Audit logging: Every event recorded for regulatory compliance.
- Performance: Implemented in C++ or Rust for microsecond latency.
- Redundancy: Hot standby systems for failover.
Key Takeaways
- The matching engine is conceptually simple but requires careful attention to edge cases and ordering guarantees.
- Price-time priority is straightforward to implement with sorted data structures and FIFO queues.
- Different time-in-force conditions (GTC, IOC, FOK) change post-match behavior, not the matching logic itself.
- Performance in Python is adequate for simulation and research, but production systems require lower-level languages.
- Simulated order flow reveals realistic market dynamics, including spread widening during information shocks and gradual convergence to new equilibrium prices.