In This Chapter
- 19.1 From Backtest to Live Trading
- 19.2 Building a Trading Bot Architecture
- 19.3 API Integration and Order Management
- 19.4 Rate Limiting and Reliability
- 19.5 Pre-Trade Risk Checks
- 19.6 Execution Quality
- 19.7 Position Tracking and P&L Monitoring
- 19.8 Logging, Monitoring, and Alerts
- 19.9 Trading Psychology and Discipline
- 19.10 Paper Trading and Simulation
- 19.11 Operational Best Practices
- 19.12 Chapter Summary
- What's Next
Chapter 19: Live Trading, Execution, and Operational Discipline
"In theory, there is no difference between theory and practice. In practice, there is." -- Yogi Berra
Every chapter before this one has been, in some sense, theoretical. We have built models, tested strategies, and measured edges on historical data. Now comes the moment of truth: executing those strategies in real markets, with real capital, against real counterparties, in real time. The gap between a profitable backtest and a profitable live system is vast, and most traders who fail do so not because their models were wrong but because their operational infrastructure was inadequate.
This chapter covers the full stack of live trading in prediction markets: building a bot architecture, integrating with platform APIs, managing orders, controlling risk, tracking positions, monitoring performance, and -- perhaps most importantly -- maintaining the psychological discipline to let a well-designed system do its work. By the end, you will have a complete, deployable trading infrastructure and the operational practices to run it safely.
19.1 From Backtest to Live Trading
19.1.1 The Gap Between Theory and Practice
A backtest tells you what your strategy would have done under idealized conditions. Live trading reveals all the assumptions you did not know you were making. Here are the most common sources of divergence:
Execution assumptions. Backtests typically assume you can trade at the last observed price. In reality, you face bid-ask spreads, your orders move the market, and fills are never guaranteed. A strategy that earns 2 cents per trade in backtesting may lose money live if average slippage exceeds 2 cents.
Latency. Your backtest processes data instantaneously. Your live system has latency at every stage: receiving market data, computing signals, transmitting orders, receiving fills. In fast-moving markets, the price you decided to trade at may be gone by the time your order arrives.
Data quality. Historical data is cleaned and complete. Live data has gaps, duplicates, stale quotes, and format changes. A bot that never encountered a null value in backtesting will crash the first time the API returns one.
Survivorship bias in market selection. Your backtest may have focused on markets that resolved cleanly. Live trading means encountering markets with ambiguous resolution criteria, delayed resolution, or mid-stream rule changes.
Psychological factors. A backtest has no emotions. You do. Watching real money fluctuate changes how you interpret signals, manage risk, and make decisions.
19.1.2 Paper Trading as a Bridge
Paper trading (also called simulated trading or demo trading) provides a middle ground. You run your strategy against live market data but without placing real orders. This validates:
- Data pipeline reliability. Does your system handle the full range of live data conditions?
- Signal generation timing. Do signals fire at the right moments with live data?
- Order logic correctness. Would the orders your system generates make sense?
- System stability. Does your bot run for hours or days without crashing?
Paper trading does not validate execution quality, market impact, or psychological resilience. These require live trading, initially with small positions.
19.1.3 The Staged Deployment Approach
We recommend a four-stage deployment:
| Stage | Description | Duration | Capital |
|---|---|---|---|
| 1. Unit Testing | Test each component in isolation | 1-2 weeks | $0 |
| 2. Paper Trading | Full system, live data, simulated execution | 2-4 weeks | $0 |
| 3. Small Live | Real orders, minimal position sizes | 2-4 weeks | 5-10% of target |
| 4. Full Live | Target position sizes | Ongoing | 100% of target |
Each stage has explicit criteria for advancement. Do not skip stages. Do not accelerate the timeline because results look good -- short runs prove nothing.
19.1.4 Mental Preparation for Live Trading
Before going live, honestly answer these questions:
- Can I afford to lose this capital? If losing your entire trading balance would cause financial hardship, you are trading with money you cannot afford to risk.
- Have I defined my risk limits in advance? Maximum position size, maximum daily loss, maximum drawdown before stopping -- these must be set before the first trade, not after the first loss.
- Am I prepared for drawdowns? Even a strategy with a genuine edge will have losing streaks. A strategy with a 60% win rate loses any given run of 5 trades in a row with probability 0.4^5, about 1%. Over hundreds of trades, at least one such streak is close to certain.
- Do I have a process for reviewing and improving? Live trading is not "set and forget." You need a regular cadence of performance review, bug fixing, and strategy refinement.
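The losing-streak arithmetic can be made precise with a short dynamic program over the length of the current losing run. The function name and parameters below are ours, for illustration, not part of the bot:

```python
def prob_loss_streak(n_trades: int, streak_len: int, p_loss: float) -> float:
    """Probability of at least one run of streak_len consecutive
    losses somewhere in n_trades independent trades."""
    p_win = 1.0 - p_loss
    # state[i] = P(no qualifying run yet, current losing run has length i)
    state = [1.0] + [0.0] * (streak_len - 1)
    p_hit = 0.0
    for _ in range(n_trades):
        nxt = [0.0] * streak_len
        nxt[0] = sum(state) * p_win              # a win resets the run
        for i in range(streak_len - 1):
            nxt[i + 1] = state[i] * p_loss       # a loss extends the run
        p_hit += state[streak_len - 1] * p_loss  # the run reaches streak_len
        state = nxt
    return p_hit
```

For a 60% win rate, any particular 5-trade stretch is all losses with probability 0.4^5, about 1%, yet over a few hundred trades the chance of hitting at least one 5-loss streak exceeds one half.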
19.1.5 Common Surprises When Going Live
Experienced traders report these consistent surprises when moving from paper to live:
- Fills are worse than expected. Market impact is real, especially in thin prediction markets.
- The system needs more babysitting than expected. APIs change, markets get added or removed, edge cases emerge.
- Emotional reactions are stronger than expected. Even traders who think they are disciplined discover new emotional responses when real money is at stake.
- Infrastructure failures happen more often than expected. Internet outages, API downtime, server crashes -- these are not rare events over a trading career.
- Strategy decay is real. An edge that worked last month may not work this month. Markets adapt.
19.2 Building a Trading Bot Architecture
19.2.1 Component Overview
A well-architected trading bot separates concerns into distinct components, each with a clear responsibility:
┌─────────────────────────────────────────────────────┐
│ Trading Bot System │
│ │
│ ┌──────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Data Feed │──>│ Signal │──>│ Risk │ │
│ │ Handler │ │ Generator │ │ Manager │ │
│ └──────────┘ └──────────────┘ └──────┬───────┘ │
│ │ │ │
│ │ ┌──────────────────────┘ │
│ │ │ │
│ │ v │
│ │ ┌──────────┐ ┌──────────────────┐ │
│ │ │ Order │──>│ Position │ │
│ │ │ Executor │ │ Tracker │ │
│ │ └──────────┘ └──────────────────┘ │
│ │ │ │ │
│ v v v │
│ ┌──────────────────────────────────────────────┐ │
│ │ Logger / Monitor │ │
│ └──────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
Data Feed Handler. Connects to market data sources (platform APIs, websockets, or polling endpoints). Normalizes data into a consistent internal format. Detects and handles data quality issues.
Signal Generator. Implements your trading strategy. Takes market data as input, produces trading signals as output. This is where your models from earlier chapters live.
Risk Manager. Validates proposed trades against risk limits before they are sent to the market. This is your last line of defense against catastrophic errors.
Order Executor. Translates trading decisions into API calls. Manages the lifecycle of orders from submission to fill or cancellation.
Position Tracker. Maintains the current state of all positions, including unrealized P&L, margin requirements, and exposure by category.
Logger / Monitor. Records everything that happens in the system. Generates alerts when things go wrong.
19.2.2 Message Flow Between Components
The components communicate through a well-defined message flow:
- Data Feed publishes MarketUpdate events.
- Signal Generator consumes MarketUpdate events, emits Signal events.
- Risk Manager consumes Signal events, performs validation. If approved, emits OrderRequest events. If rejected, emits RiskRejection events.
- Order Executor consumes OrderRequest events, submits to the platform. Emits OrderStatus events (submitted, filled, partially filled, rejected, canceled).
- Position Tracker consumes OrderStatus events (specifically fills), updates positions. Emits PositionUpdate events.
- Logger consumes all events from all components.
This event-driven architecture has several advantages:
- Components are loosely coupled and can be tested independently.
- The system is extensible -- you can add new consumers without modifying producers.
- The event log provides a complete audit trail.
19.2.3 Python Bot Skeleton
Here is the core architecture. Each component is a class with a well-defined interface:
import time
import logging
import threading
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Callable
from enum import Enum
from datetime import datetime
from collections import deque
class Side(Enum):
BUY = "BUY"
SELL = "SELL"
class OrderStatus(Enum):
PENDING = "PENDING"
SUBMITTED = "SUBMITTED"
PARTIALLY_FILLED = "PARTIALLY_FILLED"
FILLED = "FILLED"
CANCELED = "CANCELED"
REJECTED = "REJECTED"
@dataclass
class MarketUpdate:
"""A snapshot of a market's current state."""
market_id: str
timestamp: datetime
best_bid: Optional[float] = None
best_ask: Optional[float] = None
last_price: Optional[float] = None
volume_24h: float = 0.0
open_interest: float = 0.0
@dataclass
class Signal:
"""A trading signal produced by the signal generator."""
market_id: str
timestamp: datetime
side: Side
target_price: float
confidence: float # 0.0 to 1.0
model_name: str = ""
metadata: dict = field(default_factory=dict)
@dataclass
class OrderRequest:
"""A validated order request ready for execution."""
market_id: str
side: Side
quantity: float
price: float
order_type: str = "LIMIT"
time_in_force: str = "GTC"
signal: Optional[Signal] = None
@dataclass
class Fill:
"""A completed trade."""
order_id: str
market_id: str
side: Side
quantity: float
price: float
timestamp: datetime
fees: float = 0.0
class EventBus:
"""Simple publish-subscribe event bus for component communication."""
def __init__(self):
self._subscribers: Dict[str, List[Callable]] = {}
self._lock = threading.Lock()
def subscribe(self, event_type: str, callback: Callable):
with self._lock:
if event_type not in self._subscribers:
self._subscribers[event_type] = []
self._subscribers[event_type].append(callback)
def publish(self, event_type: str, data):
with self._lock:
subscribers = self._subscribers.get(event_type, []).copy()
for callback in subscribers:
try:
callback(data)
except Exception as e:
logging.error(f"Error in subscriber for {event_type}: {e}")
class DataFeedHandler:
"""Connects to market data sources and publishes updates."""
def __init__(self, event_bus: EventBus, api_client, poll_interval: float = 5.0):
self.event_bus = event_bus
self.api_client = api_client
self.poll_interval = poll_interval
self.watched_markets: List[str] = []
self._running = False
self._thread: Optional[threading.Thread] = None
self.logger = logging.getLogger("DataFeed")
def add_market(self, market_id: str):
if market_id not in self.watched_markets:
self.watched_markets.append(market_id)
self.logger.info(f"Now watching market: {market_id}")
def remove_market(self, market_id: str):
if market_id in self.watched_markets:
self.watched_markets.remove(market_id)
self.logger.info(f"Stopped watching market: {market_id}")
def start(self):
self._running = True
self._thread = threading.Thread(target=self._poll_loop, daemon=True)
self._thread.start()
self.logger.info("Data feed started")
def stop(self):
self._running = False
if self._thread:
self._thread.join(timeout=10)
self.logger.info("Data feed stopped")
def _poll_loop(self):
while self._running:
for market_id in self.watched_markets:
try:
data = self.api_client.get_market(market_id)
update = MarketUpdate(
market_id=market_id,
timestamp=datetime.utcnow(),
best_bid=data.get("best_bid"),
best_ask=data.get("best_ask"),
last_price=data.get("last_price"),
volume_24h=data.get("volume_24h", 0),
open_interest=data.get("open_interest", 0),
)
self.event_bus.publish("market_update", update)
except Exception as e:
self.logger.error(f"Error fetching {market_id}: {e}")
time.sleep(self.poll_interval)
class SignalGenerator:
"""Generates trading signals from market data."""
def __init__(self, event_bus: EventBus, models: Dict[str, Callable] = None):
self.event_bus = event_bus
self.models = models or {}
self.market_history: Dict[str, deque] = {}
self.logger = logging.getLogger("SignalGen")
# Subscribe to market updates
self.event_bus.subscribe("market_update", self.on_market_update)
def register_model(self, name: str, model_fn: Callable):
"""Register a model that takes market history and returns a signal or None."""
self.models[name] = model_fn
self.logger.info(f"Registered model: {name}")
def on_market_update(self, update: MarketUpdate):
# Maintain rolling history
if update.market_id not in self.market_history:
self.market_history[update.market_id] = deque(maxlen=1000)
self.market_history[update.market_id].append(update)
# Run each model
history = list(self.market_history[update.market_id])
for model_name, model_fn in self.models.items():
try:
result = model_fn(update, history)
if result is not None:
signal = Signal(
market_id=update.market_id,
timestamp=datetime.utcnow(),
side=result["side"],
target_price=result["price"],
confidence=result["confidence"],
model_name=model_name,
metadata=result.get("metadata", {}),
)
self.logger.info(
f"Signal: {signal.side.value} {signal.market_id} "
f"@ {signal.target_price:.4f} "
f"(confidence={signal.confidence:.2f})"
)
self.event_bus.publish("signal", signal)
except Exception as e:
self.logger.error(f"Model {model_name} error: {e}")
The TradingBot class ties everything together (see the full implementation in code/example-01-trading-bot.py).
19.2.4 Design Principles for Trading Systems
Several principles should guide your bot architecture:
Fail safe, not fail fast. When something goes wrong, the system should move to a safe state (no open orders, positions within limits) rather than crashing immediately. A crash while orders are outstanding can leave you with unintended exposure.
Idempotency. Operations should be safe to retry. If you are unsure whether an order was submitted, you need a way to check before resubmitting. Duplicate orders can be very expensive.
Determinism. Given the same inputs, your signal generator should produce the same outputs. Avoid hidden state that makes behavior unpredictable.
Separation of concerns. Your signal generation logic should know nothing about API details. Your API integration should know nothing about your trading strategy. This makes each component independently testable and replaceable.
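As a sketch of the idempotency principle above: orders can carry a deterministic client-generated ID so that a retry is recognized instead of duplicated. The names here (IdempotentSubmitter, the submit_fn signature) are illustrative, not a platform API:

```python
import hashlib

def client_order_id(market_id, side, price, quantity, signal_ts):
    """Deterministic ID: the same intended order always hashes to the
    same ID, so a retry can be detected instead of duplicated."""
    key = f"{market_id}|{side}|{price:.4f}|{quantity:.4f}|{signal_ts}"
    return hashlib.sha256(key.encode()).hexdigest()[:16]

class IdempotentSubmitter:
    """Wraps a submit function so repeated submissions of the same
    intended order become no-ops."""
    def __init__(self, submit_fn):
        self.submit_fn = submit_fn   # e.g. a place_order wrapper
        self.seen = {}               # client_id -> platform response

    def submit(self, market_id, side, price, quantity, signal_ts):
        cid = client_order_id(market_id, side, price, quantity, signal_ts)
        if cid in self.seen:
            return self.seen[cid]    # retry of a known order: no-op
        result = self.submit_fn(cid, market_id, side, price, quantity)
        self.seen[cid] = result
        return result
```

Platforms that accept a client order ID field make this airtight, because the exchange itself deduplicates; otherwise the local seen map at least guards against retries within one process.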
19.3 API Integration and Order Management
19.3.1 Connecting to Platform APIs
Most prediction market platforms provide REST APIs for trading. Some also offer WebSocket connections for real-time data. The typical workflow:
- Obtain API credentials. Usually an API key and secret, sometimes with additional parameters like a passphrase.
- Authenticate requests. Typically via HMAC signatures or bearer tokens.
- Respect rate limits. Every platform has limits on how many requests you can make per second or per minute.
import hashlib
import hmac
import time
import requests
class PredictionMarketAPI:
"""Generic API client for a prediction market platform."""
def __init__(self, base_url: str, api_key: str, api_secret: str):
self.base_url = base_url
self.api_key = api_key
self.api_secret = api_secret
self.session = requests.Session()
self.session.headers.update({
"Content-Type": "application/json",
"User-Agent": "TradingBot/1.0",
})
def _sign_request(self, method: str, path: str, body: str = "") -> dict:
"""Create authentication headers."""
timestamp = str(int(time.time()))
message = timestamp + method.upper() + path + body
signature = hmac.new(
self.api_secret.encode(),
message.encode(),
hashlib.sha256,
).hexdigest()
return {
"X-API-Key": self.api_key,
"X-Timestamp": timestamp,
"X-Signature": signature,
}
def get_market(self, market_id: str) -> dict:
path = f"/api/v1/markets/{market_id}"
headers = self._sign_request("GET", path)
response = self.session.get(
self.base_url + path, headers=headers
)
response.raise_for_status()
return response.json()
def place_order(self, market_id: str, side: str,
quantity: float, price: float,
order_type: str = "LIMIT") -> dict:
path = "/api/v1/orders"
import json
body = json.dumps({
"market_id": market_id,
"side": side,
"quantity": quantity,
"price": price,
"order_type": order_type,
})
headers = self._sign_request("POST", path, body)
response = self.session.post(
self.base_url + path, headers=headers, data=body
)
response.raise_for_status()
return response.json()
def cancel_order(self, order_id: str) -> dict:
path = f"/api/v1/orders/{order_id}"
headers = self._sign_request("DELETE", path)
response = self.session.delete(
self.base_url + path, headers=headers
)
response.raise_for_status()
return response.json()
def get_order_status(self, order_id: str) -> dict:
path = f"/api/v1/orders/{order_id}"
headers = self._sign_request("GET", path)
response = self.session.get(
self.base_url + path, headers=headers
)
response.raise_for_status()
return response.json()
def get_positions(self) -> list:
path = "/api/v1/positions"
headers = self._sign_request("GET", path)
response = self.session.get(
self.base_url + path, headers=headers
)
response.raise_for_status()
return response.json()
def get_balance(self) -> dict:
path = "/api/v1/balance"
headers = self._sign_request("GET", path)
response = self.session.get(
self.base_url + path, headers=headers
)
response.raise_for_status()
return response.json()
19.3.2 Order State Machine
An order goes through a well-defined sequence of states. Understanding this state machine is critical for correct order management:
┌─────────┐
│ CREATED │
└────┬────┘
│
┌────v────┐
┌────│SUBMITTED│────┐
│ └────┬────┘ │
│ │ │
┌────v───┐ │ ┌────v────┐
│REJECTED│ │ │CANCELED │
└────────┘ │ └─────────┘
│
┌──────v──────┐
│ PARTIALLY │───────┐
│ FILLED │ │
└──────┬──────┘ │
│ ┌────v────┐
│ │CANCELED │
┌────v───┐ │(partial)│
│ FILLED │ └─────────┘
└────────┘
Key rules:
- An order can only be canceled if it is in SUBMITTED or PARTIALLY_FILLED state.
- A REJECTED order was never live in the market -- no risk was taken.
- A PARTIALLY_FILLED order that is then canceled results in a partial position.
- Never assume an order is in a state -- always check with the platform.
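These rules can be enforced mechanically. This sketch encodes the diagram's legal transitions in a table (plus client-side CREATED-to-CANCELED and CREATED-to-REJECTED for orders that never reach the exchange); the transition helper is ours, for illustration:

```python
from enum import Enum

class OrderState(Enum):
    CREATED = "CREATED"
    SUBMITTED = "SUBMITTED"
    PARTIALLY_FILLED = "PARTIALLY_FILLED"
    FILLED = "FILLED"
    CANCELED = "CANCELED"
    REJECTED = "REJECTED"

# Legal transitions per the state diagram, plus client-side handling
# of orders that are canceled or rejected before submission succeeds.
ALLOWED = {
    OrderState.CREATED: {OrderState.SUBMITTED, OrderState.CANCELED,
                         OrderState.REJECTED},
    OrderState.SUBMITTED: {OrderState.PARTIALLY_FILLED, OrderState.FILLED,
                           OrderState.CANCELED, OrderState.REJECTED},
    OrderState.PARTIALLY_FILLED: {OrderState.PARTIALLY_FILLED,
                                  OrderState.FILLED, OrderState.CANCELED},
    OrderState.FILLED: set(),     # terminal
    OrderState.CANCELED: set(),   # terminal
    OrderState.REJECTED: set(),   # terminal
}

def transition(current: OrderState, new: OrderState) -> OrderState:
    """Raise rather than silently enter an illegal state."""
    if new not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.value} -> {new.value}")
    return new
```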
19.3.3 The OrderManager Class
The OrderManager tracks all orders through their lifecycle:
import uuid
from datetime import datetime
from dataclasses import dataclass, field
from typing import Dict, Optional, List
from enum import Enum
import threading
import logging
class OrderState(Enum):
CREATED = "CREATED"
SUBMITTED = "SUBMITTED"
PARTIALLY_FILLED = "PARTIALLY_FILLED"
FILLED = "FILLED"
CANCELED = "CANCELED"
REJECTED = "REJECTED"
@dataclass
class Order:
"""Tracks the full lifecycle of a single order."""
internal_id: str
market_id: str
side: str # "BUY" or "SELL"
quantity: float
price: float
order_type: str = "LIMIT"
state: OrderState = OrderState.CREATED
exchange_id: Optional[str] = None
filled_quantity: float = 0.0
average_fill_price: float = 0.0
created_at: datetime = field(default_factory=datetime.utcnow)
submitted_at: Optional[datetime] = None
filled_at: Optional[datetime] = None
canceled_at: Optional[datetime] = None
fills: List[dict] = field(default_factory=list)
error_message: str = ""
@property
def remaining_quantity(self) -> float:
return self.quantity - self.filled_quantity
@property
def is_terminal(self) -> bool:
return self.state in (
OrderState.FILLED, OrderState.CANCELED, OrderState.REJECTED
)
def add_fill(self, qty: float, price: float, timestamp: datetime):
self.fills.append({
"quantity": qty, "price": price, "timestamp": timestamp
})
total_cost = self.average_fill_price * self.filled_quantity
self.filled_quantity += qty
total_cost += price * qty
self.average_fill_price = total_cost / self.filled_quantity
if abs(self.filled_quantity - self.quantity) < 1e-9:
self.state = OrderState.FILLED
self.filled_at = timestamp
else:
self.state = OrderState.PARTIALLY_FILLED
class OrderManager:
"""Manages the lifecycle of all orders."""
def __init__(self, api_client, event_bus):
self.api_client = api_client
self.event_bus = event_bus
self.orders: Dict[str, Order] = {}
self._lock = threading.Lock()
self.logger = logging.getLogger("OrderManager")
def create_order(self, market_id: str, side: str,
quantity: float, price: float,
order_type: str = "LIMIT") -> Order:
internal_id = str(uuid.uuid4())[:8]
order = Order(
internal_id=internal_id,
market_id=market_id,
side=side,
quantity=quantity,
price=price,
order_type=order_type,
)
with self._lock:
self.orders[internal_id] = order
self.logger.info(
f"Created order {internal_id}: {side} {quantity} "
f"{market_id} @ {price}"
)
return order
def submit_order(self, internal_id: str) -> bool:
with self._lock:
order = self.orders.get(internal_id)
if order is None:
self.logger.error(f"Order {internal_id} not found")
return False
if order.state != OrderState.CREATED:
self.logger.error(
f"Cannot submit order {internal_id} in state {order.state}"
)
return False
try:
response = self.api_client.place_order(
market_id=order.market_id,
side=order.side,
quantity=order.quantity,
price=order.price,
order_type=order.order_type,
)
order.exchange_id = response.get("order_id")
order.state = OrderState.SUBMITTED
order.submitted_at = datetime.utcnow()
self.logger.info(
f"Submitted order {internal_id} -> exchange {order.exchange_id}"
)
self.event_bus.publish("order_submitted", order)
return True
except Exception as e:
order.state = OrderState.REJECTED
order.error_message = str(e)
self.logger.error(f"Order {internal_id} rejected: {e}")
self.event_bus.publish("order_rejected", order)
return False
def cancel_order(self, internal_id: str) -> bool:
with self._lock:
order = self.orders.get(internal_id)
if order is None or order.is_terminal:
return False
if order.exchange_id is None:
order.state = OrderState.CANCELED
order.canceled_at = datetime.utcnow()
return True
try:
self.api_client.cancel_order(order.exchange_id)
order.state = OrderState.CANCELED
order.canceled_at = datetime.utcnow()
self.logger.info(f"Canceled order {internal_id}")
self.event_bus.publish("order_canceled", order)
return True
except Exception as e:
self.logger.error(f"Failed to cancel {internal_id}: {e}")
return False
def cancel_all(self, market_id: Optional[str] = None):
"""Cancel all open orders, optionally filtered by market."""
with self._lock:
open_orders = [
o for o in self.orders.values()
if not o.is_terminal
and (market_id is None or o.market_id == market_id)
]
canceled = 0
for order in open_orders:
if self.cancel_order(order.internal_id):
canceled += 1
self.logger.info(
f"Canceled {canceled}/{len(open_orders)} open orders"
)
return canceled
def sync_order_status(self, internal_id: str):
"""Sync local order state with the exchange."""
with self._lock:
order = self.orders.get(internal_id)
if order is None or order.exchange_id is None:
return
try:
status = self.api_client.get_order_status(order.exchange_id)
exchange_state = status.get("status", "").upper()
if exchange_state == "FILLED" and order.state != OrderState.FILLED:
fill_qty = float(status.get("filled_quantity", 0))
fill_price = float(status.get("average_price", 0))
if fill_qty > order.filled_quantity:
new_qty = fill_qty - order.filled_quantity
order.add_fill(new_qty, fill_price, datetime.utcnow())
self.event_bus.publish("order_filled", order)
elif exchange_state == "CANCELED":
order.state = OrderState.CANCELED
order.canceled_at = datetime.utcnow()
self.event_bus.publish("order_canceled", order)
except Exception as e:
self.logger.error(
f"Failed to sync order {internal_id}: {e}"
)
def get_open_orders(self, market_id: Optional[str] = None) -> List[Order]:
with self._lock:
return [
o for o in self.orders.values()
if not o.is_terminal
and (market_id is None or o.market_id == market_id)
]
def get_order_summary(self) -> dict:
with self._lock:
summary = {}
for state in OrderState:
summary[state.value] = sum(
1 for o in self.orders.values() if o.state == state
)
return summary
19.3.4 Handling API Errors Gracefully
API errors fall into several categories, each requiring different handling:
| Error Type | HTTP Code | Handling |
|---|---|---|
| Authentication | 401, 403 | Stop trading, alert operator |
| Rate limit | 429 | Back off, retry after delay |
| Bad request | 400 | Log, do not retry (fix the bug) |
| Server error | 500, 502, 503 | Retry with backoff |
| Timeout | -- | Retry, but check order status first |
| Network error | -- | Retry with backoff |
The most dangerous error is a timeout on an order submission. You do not know whether the order was placed or not. Before retrying, you must check whether the order exists on the exchange. Blindly retrying could result in duplicate orders.
def safe_submit_order(order_manager, order, max_retries=3):
    """Submit an order with safe retry logic.

    Note: for the TimeoutError branch below to fire, the underlying
    submit_order must let timeout errors propagate rather than
    swallowing them and marking the order REJECTED.
    """
for attempt in range(max_retries):
try:
success = order_manager.submit_order(order.internal_id)
if success:
return True
# Submission failed but no exception -- order was rejected
return False
except TimeoutError:
# DANGER: order may or may not have been placed
# Check exchange before retrying
if order.exchange_id:
order_manager.sync_order_status(order.internal_id)
if order.state == OrderState.SUBMITTED:
return True # Order was placed
time.sleep(2 ** attempt)
except Exception as e:
logging.error(f"Submit attempt {attempt + 1} failed: {e}")
time.sleep(2 ** attempt)
return False
19.4 Rate Limiting and Reliability
19.4.1 Understanding Rate Limits
Every platform imposes rate limits to prevent abuse and ensure fair access. Common structures:
- Requests per second (RPS). Example: 10 requests per second.
- Requests per minute (RPM). Example: 300 requests per minute.
- Weighted limits. Different endpoints have different costs. A market data query might cost 1 point, while an order submission costs 5 points, from a budget of 100 points per minute.
- Burst limits. You can send up to N requests in a burst, then must slow down.
Exceeding rate limits typically results in HTTP 429 responses and possibly temporary bans.
19.4.2 Token Bucket Rate Limiter
The token bucket algorithm is the standard approach to rate limiting on the client side:
import time
import threading
class RateLimiter:
"""Token bucket rate limiter."""
def __init__(self, rate: float, burst: int = 1):
"""
Args:
rate: Number of tokens added per second.
burst: Maximum number of tokens (bucket size).
"""
self.rate = rate
self.burst = burst
self.tokens = float(burst)
self.last_refill = time.monotonic()
self._lock = threading.Lock()
def _refill(self):
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
self.last_refill = now
def acquire(self, tokens: int = 1, timeout: float = 30.0) -> bool:
"""
Acquire tokens, blocking until available or timeout.
Returns True if tokens were acquired, False on timeout.
"""
deadline = time.monotonic() + timeout
while True:
with self._lock:
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
if time.monotonic() >= deadline:
return False
# Sleep for the estimated time until enough tokens
with self._lock:
wait_time = (tokens - self.tokens) / self.rate
time.sleep(min(wait_time, 0.1))
def try_acquire(self, tokens: int = 1) -> bool:
"""Try to acquire tokens without blocking."""
with self._lock:
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
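For platforms with weighted limits, the same bucket can charge a different token cost per endpoint. The Bucket class below is a condensed stand-in for the RateLimiter above (same try_acquire interface) so the sketch runs on its own, and the cost numbers are illustrative:

```python
import time

# Condensed stand-in for the RateLimiter above, so this runs alone.
class Bucket:
    def __init__(self, rate: float, burst: int):
        self.rate, self.burst = rate, float(burst)
        self.tokens, self.last = float(burst), time.monotonic()

    def try_acquire(self, tokens: int = 1) -> bool:
        now = time.monotonic()
        self.tokens = min(self.burst,
                          self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

# Illustrative per-endpoint costs: order placement draws more from
# the budget than a market data poll.
COSTS = {"get_market": 1, "place_order": 5, "cancel_order": 1}

def guarded_call(bucket, endpoint, fn, *args, **kwargs):
    """Skip the call rather than exceed the platform's budget;
    the caller decides whether to queue or drop."""
    if not bucket.try_acquire(COSTS.get(endpoint, 1)):
        return None
    return fn(*args, **kwargs)
```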
19.4.3 Retry Logic with Exponential Backoff
When requests fail, retry with increasing delays to avoid overwhelming the server:
import random
import functools
import logging
class RetryHandler:
"""Configurable retry logic with exponential backoff and jitter."""
def __init__(
self,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True,
retryable_exceptions: tuple = (
ConnectionError, TimeoutError, IOError
),
):
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.exponential_base = exponential_base
self.jitter = jitter
self.retryable_exceptions = retryable_exceptions
self.logger = logging.getLogger("RetryHandler")
def calculate_delay(self, attempt: int) -> float:
"""Calculate delay for a given attempt number."""
delay = self.base_delay * (self.exponential_base ** attempt)
delay = min(delay, self.max_delay)
if self.jitter:
delay *= random.uniform(0.5, 1.5)
return delay
def execute(self, func, *args, **kwargs):
"""Execute a function with retry logic."""
last_exception = None
for attempt in range(self.max_retries + 1):
try:
return func(*args, **kwargs)
except self.retryable_exceptions as e:
last_exception = e
if attempt < self.max_retries:
delay = self.calculate_delay(attempt)
self.logger.warning(
f"Attempt {attempt + 1}/{self.max_retries + 1} "
f"failed: {e}. Retrying in {delay:.1f}s"
)
time.sleep(delay)
else:
self.logger.error(
f"All {self.max_retries + 1} attempts failed: {e}"
)
raise last_exception
def decorator(self, func):
"""Use as a decorator for automatic retries."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
return self.execute(func, *args, **kwargs)
return wrapper
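To see the schedule this produces, the following helper mirrors calculate_delay (the function itself is ours, for illustration):

```python
import random

def backoff_schedule(max_retries=5, base=1.0, factor=2.0, cap=60.0,
                     jitter=False, seed=None):
    """Delays before each retry, mirroring RetryHandler.calculate_delay."""
    rng = random.Random(seed)
    delays = []
    for attempt in range(max_retries):
        d = min(base * factor ** attempt, cap)
        if jitter:
            d *= rng.uniform(0.5, 1.5)   # spread retries across clients
        delays.append(d)
    return delays
```

With the defaults above, backoff_schedule(7) gives waits of 1, 2, 4, 8, 16, 32, then 60 seconds, the cap preventing unbounded delays; jitter spreads each wait over half to one-and-a-half times its nominal value so many clients do not retry in lockstep.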
19.4.4 Circuit Breakers
A circuit breaker prevents your system from repeatedly hitting a failing service. After a threshold of failures, the circuit "opens" and all requests fail immediately, giving the service time to recover:
import time
import threading
from enum import Enum
class CircuitState(Enum):
CLOSED = "CLOSED" # Normal operation
OPEN = "OPEN" # Failing, reject all requests
HALF_OPEN = "HALF_OPEN" # Testing if service recovered
class CircuitBreaker:
"""Circuit breaker pattern for API reliability."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 60.0,
success_threshold: int = 2,
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.success_threshold = success_threshold
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = 0.0
self._lock = threading.Lock()
self.logger = logging.getLogger("CircuitBreaker")
def can_execute(self) -> bool:
with self._lock:
if self.state == CircuitState.CLOSED:
return True
elif self.state == CircuitState.OPEN:
if time.monotonic() - self.last_failure_time >= self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.success_count = 0
self.logger.info("Circuit half-open, testing...")
return True
return False
else: # HALF_OPEN
return True
def record_success(self):
with self._lock:
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.failure_count = 0
self.logger.info("Circuit closed (recovered)")
else:
self.failure_count = 0
def record_failure(self):
with self._lock:
self.failure_count += 1
self.last_failure_time = time.monotonic()
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.OPEN
self.logger.warning("Circuit re-opened (recovery failed)")
elif self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
self.logger.warning(
f"Circuit opened after {self.failure_count} failures"
)
def execute(self, func, *args, **kwargs):
if not self.can_execute():
raise ConnectionError(
"Circuit breaker is OPEN -- service unavailable"
)
try:
result = func(*args, **kwargs)
self.record_success()
return result
except Exception as e:
self.record_failure()
raise
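Usage looks like the following. MiniBreaker is a condensed stand-in for the CircuitBreaker above (it skips the HALF_OPEN success counting) so the example is self-contained:

```python
import time

# Condensed stand-in for the chapter's CircuitBreaker, for this sketch.
class MiniBreaker:
    def __init__(self, failure_threshold=3, recovery_timeout=60.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at = None  # None while the circuit is closed

    def execute(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.recovery_timeout:
                raise ConnectionError("circuit OPEN -- failing fast")
            self.opened_at = None  # timeout elapsed: allow a probe call
        try:
            result = fn(*args, **kwargs)
            self.failures = 0
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise

def flaky_api_call():
    raise TimeoutError("simulated API outage")

breaker = MiniBreaker(failure_threshold=3)
for _ in range(3):                 # three real failures trip the breaker
    try:
        breaker.execute(flaky_api_call)
    except TimeoutError:
        pass
# The next call now fails fast with ConnectionError instead of
# hammering the unhealthy API.
```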
19.4.5 Connection Loss Recovery
Network connections fail. Your bot must handle disconnections gracefully:
- Detect the failure. Monitor heartbeats or response times.
- Move to a safe state. Cancel outstanding orders if possible.
- Attempt reconnection. With exponential backoff.
- Reconcile state. After reconnecting, sync your local state with the exchange.
- Resume operation. Only after state is consistent.
class ConnectionMonitor:
"""Monitors API connection health via heartbeat."""
def __init__(self, api_client, check_interval: float = 10.0,
max_failures: int = 3):
self.api_client = api_client
self.check_interval = check_interval
self.max_failures = max_failures
self.consecutive_failures = 0
self.is_connected = True
self._callbacks_on_disconnect = []
self._callbacks_on_reconnect = []
self._running = False
self.logger = logging.getLogger("ConnMonitor")
def on_disconnect(self, callback):
self._callbacks_on_disconnect.append(callback)
def on_reconnect(self, callback):
self._callbacks_on_reconnect.append(callback)
def start(self):
self._running = True
self._thread = threading.Thread(
target=self._monitor_loop, daemon=True
)
self._thread.start()
def stop(self):
self._running = False
def _monitor_loop(self):
while self._running:
try:
# Simple health check -- fetch balance or server time
self.api_client.get_balance()
self.consecutive_failures = 0
if not self.is_connected:
self.is_connected = True
self.logger.info("Connection restored")
for cb in self._callbacks_on_reconnect:
try:
cb()
except Exception as e:
self.logger.error(f"Reconnect callback error: {e}")
except Exception as e:
self.consecutive_failures += 1
self.logger.warning(
f"Health check failed ({self.consecutive_failures}/"
f"{self.max_failures}): {e}"
)
if (self.consecutive_failures >= self.max_failures
and self.is_connected):
self.is_connected = False
self.logger.error("Connection lost!")
for cb in self._callbacks_on_disconnect:
try:
cb()
except Exception as exc:
self.logger.error(
f"Disconnect callback error: {exc}"
)
time.sleep(self.check_interval)
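The reconnection step deserves its own helper. Below is a minimal sketch of reconnect-with-backoff; `connect` here stands in for whatever zero-argument callable re-establishes your session (an assumption, not part of the ConnectionMonitor above):

```python
import random
import time


def reconnect_with_backoff(connect, max_attempts: int = 8,
                           base_delay: float = 1.0,
                           max_delay: float = 60.0) -> bool:
    """Retry `connect` with exponential backoff and jitter.

    `connect` is any zero-argument callable that raises on failure.
    Returns True once it succeeds, False after max_attempts failures.
    """
    for attempt in range(max_attempts):
        try:
            connect()
            return True
        except Exception:
            # Delay doubles each attempt, capped at max_delay, with
            # jitter so multiple bots don't reconnect in lockstep.
            delay = min(base_delay * (2 ** attempt), max_delay)
            time.sleep(delay * random.uniform(0.5, 1.0))
    return False
```

Only after this returns True should you reconcile state and resume trading.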
19.5 Pre-Trade Risk Checks
19.5.1 Why Pre-Trade Checks Matter
A single bad trade can wipe out months of profits. Pre-trade risk checks are your last line of defense between a trading signal and a live order. They should be:
- Always on. Never bypass risk checks, even "just this once."
- Conservative. When in doubt, reject the trade.
- Fast. They run on every order, so they must not add significant latency.
- Logged. Every rejection should be recorded with the reason.
19.5.2 Essential Pre-Trade Checks
Position limits. No single position should exceed a maximum size, both in absolute terms and as a percentage of your portfolio.
$$\text{Position Check: } |q_{\text{current}} + q_{\text{new}}| \leq q_{\max}$$
Daily loss limits. If your realized + unrealized losses for the day exceed a threshold, stop trading.
$$\text{Daily Loss Check: } \text{PnL}_{\text{today}} + \text{UnrealizedPnL} > -L_{\max}$$
Order size limits. No single order should be larger than a maximum size.
$$\text{Order Size Check: } q_{\text{order}} \leq q_{\text{order\_max}}$$
Concentration limits. Your exposure to any single category or correlated group of markets should be bounded.
$$\text{Concentration Check: } \sum_{i \in \text{category}} |V_i| \leq V_{\text{category\_max}}$$
Liquidity checks. Do not place orders that are a large fraction of the market's daily volume or visible depth.
$$\text{Liquidity Check: } q_{\text{order}} \leq \alpha \cdot \text{ADV}$$
where $\alpha$ is typically 0.01 to 0.10 (1-10% of average daily volume).
Price reasonableness. An order at a price far from the current market is probably a bug.
$$\text{Price Check: } |p_{\text{order}} - p_{\text{market}}| \leq \delta_{\max}$$
Fat finger check. An order whose notional value is unusually large relative to typical orders should trigger additional confirmation.
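The fat finger check is simple enough to express as a pure function. A sketch follows; the $500 typical-notional figure and the 10x multiple are illustrative assumptions to be calibrated against your own order flow:

```python
from typing import Tuple


def fat_finger_check(quantity: float, price: float,
                     typical_notional: float = 500.0,
                     multiple: float = 10.0) -> Tuple[bool, str]:
    """Flag orders whose notional value is an unusual multiple of typical size.

    Returns (ok, reason); a flagged order should require extra
    confirmation rather than being silently executed.
    """
    notional = quantity * price
    if notional > multiple * typical_notional:
        return False, (f"Notional ${notional:.2f} exceeds "
                       f"{multiple:.0f}x typical (${typical_notional:.2f})")
    return True, ""
```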
19.5.3 Python PreTradeRiskManager
from dataclasses import dataclass
from typing import Tuple
import logging

# OrderRequest, Side, and the event bus are defined in the bot
# architecture and order management sections earlier in this chapter.
@dataclass
class RiskLimits:
"""Configuration for all pre-trade risk limits."""
max_position_size: float = 1000.0 # Max shares in any market
max_position_value: float = 5000.0 # Max dollar value per position
max_order_size: float = 200.0 # Max shares per order
max_daily_loss: float = 500.0 # Max daily loss before halt
max_portfolio_exposure: float = 20000.0 # Max total portfolio exposure
max_category_exposure: float = 5000.0 # Max exposure per category
max_price_deviation: float = 0.15 # Max deviation from market price
max_volume_fraction: float = 0.05 # Max fraction of daily volume
min_spread: float = 0.001 # Min bid-ask spread to trade
class PreTradeRiskManager:
"""Validates all orders against risk limits before execution."""
def __init__(self, limits: RiskLimits, position_tracker, event_bus):
self.limits = limits
self.position_tracker = position_tracker
self.event_bus = event_bus
self.daily_pnl = 0.0
self.daily_order_count = 0
self.is_halted = False
self.halt_reason = ""
self.logger = logging.getLogger("RiskManager")
# Subscribe to signals
self.event_bus.subscribe("signal", self.on_signal)
def on_signal(self, signal):
"""Process a trading signal through risk checks."""
# Convert signal to order request
order_request = OrderRequest(
market_id=signal.market_id,
side=signal.side,
quantity=self._calculate_order_size(signal),
price=signal.target_price,
signal=signal,
)
approved, reason = self.check_order(order_request)
if approved:
self.logger.info(
f"Risk approved: {order_request.side.value} "
f"{order_request.quantity} {order_request.market_id}"
)
self.event_bus.publish("order_request", order_request)
else:
self.logger.warning(
f"Risk rejected: {order_request.side.value} "
f"{order_request.quantity} {order_request.market_id} "
f"-- {reason}"
)
self.event_bus.publish("risk_rejection", {
"order": order_request, "reason": reason
})
def _calculate_order_size(self, signal) -> float:
"""Size the order based on signal confidence and limits."""
base_size = self.limits.max_order_size * signal.confidence
return round(base_size, 2)
def check_order(self, order: OrderRequest) -> Tuple[bool, str]:
"""
Run all pre-trade risk checks.
Returns (approved, reason).
"""
checks = [
self._check_halt_status,
self._check_order_size,
self._check_position_limit,
self._check_daily_loss,
self._check_portfolio_exposure,
self._check_price_reasonableness,
self._check_liquidity,
]
for check in checks:
approved, reason = check(order)
if not approved:
return False, reason
return True, "All checks passed"
def _check_halt_status(self, order: OrderRequest) -> Tuple[bool, str]:
if self.is_halted:
return False, f"Trading halted: {self.halt_reason}"
return True, ""
def _check_order_size(self, order: OrderRequest) -> Tuple[bool, str]:
if order.quantity > self.limits.max_order_size:
return False, (
f"Order size {order.quantity} exceeds limit "
f"{self.limits.max_order_size}"
)
if order.quantity <= 0:
return False, "Order quantity must be positive"
return True, ""
def _check_position_limit(self, order: OrderRequest) -> Tuple[bool, str]:
current_position = self.position_tracker.get_position(
order.market_id
)
current_qty = current_position.get("quantity", 0) if current_position else 0
if order.side == Side.BUY:
new_qty = current_qty + order.quantity
else:
new_qty = current_qty - order.quantity
if abs(new_qty) > self.limits.max_position_size:
return False, (
f"Resulting position {new_qty} exceeds limit "
f"{self.limits.max_position_size}"
)
new_value = abs(new_qty * order.price)
if new_value > self.limits.max_position_value:
return False, (
f"Resulting position value ${new_value:.2f} exceeds limit "
f"${self.limits.max_position_value:.2f}"
)
return True, ""
def _check_daily_loss(self, order: OrderRequest) -> Tuple[bool, str]:
total_pnl = (
self.daily_pnl
+ self.position_tracker.get_unrealized_pnl()
)
if total_pnl < -self.limits.max_daily_loss:
self.halt_trading(
f"Daily loss limit reached: ${total_pnl:.2f}"
)
return False, f"Daily loss ${total_pnl:.2f} exceeds limit"
return True, ""
def _check_portfolio_exposure(
self, order: OrderRequest
) -> Tuple[bool, str]:
current_exposure = self.position_tracker.get_total_exposure()
additional_exposure = order.quantity * order.price
new_exposure = current_exposure + additional_exposure
if new_exposure > self.limits.max_portfolio_exposure:
return False, (
f"Portfolio exposure ${new_exposure:.2f} exceeds limit "
f"${self.limits.max_portfolio_exposure:.2f}"
)
return True, ""
    def _check_price_reasonableness(
        self, order: OrderRequest
    ) -> Tuple[bool, str]:
        # Prediction market prices must lie strictly between 0 and 1.
        # A full implementation would also compare order.price against the
        # live market price, rejecting deviations beyond
        # limits.max_price_deviation.
        if not (0.0 < order.price < 1.0):
            return False, (
                f"Price {order.price} outside valid range (0, 1)"
            )
        return True, ""
    def _check_liquidity(self, order: OrderRequest) -> Tuple[bool, str]:
        # Placeholder: a full implementation would cap order.quantity at
        # limits.max_volume_fraction of the market's average daily volume.
        return True, ""
def halt_trading(self, reason: str):
self.is_halted = True
self.halt_reason = reason
self.logger.critical(f"TRADING HALTED: {reason}")
self.event_bus.publish("trading_halted", {"reason": reason})
def resume_trading(self):
self.is_halted = False
self.halt_reason = ""
self.logger.info("Trading resumed")
self.event_bus.publish("trading_resumed", {})
def reset_daily_limits(self):
"""Call at the start of each trading day."""
self.daily_pnl = 0.0
self.daily_order_count = 0
if self.is_halted and "daily" in self.halt_reason.lower():
self.resume_trading()
self.logger.info("Daily limits reset")
19.5.4 The Pre-Trade Checklist
Beyond automated checks, maintain a manual pre-trading checklist:
- Are all systems running and healthy?
- Is the data feed current (not stale)?
- Have daily risk limits been reset?
- Are there any known platform maintenance windows?
- Are there any major news events that could cause extreme volatility?
- Is your internet connection stable?
- Do you have a way to manually cancel all orders if the bot fails?
19.6 Execution Quality
19.6.1 Measuring Execution Quality
Execution quality is the difference between the price you intended to trade at and the price you actually traded at. The key metrics:
Slippage is the difference between your target price and your actual fill price:
$$\text{Slippage} = \begin{cases} p_{\text{fill}} - p_{\text{target}} & \text{for buys} \\ p_{\text{target}} - p_{\text{fill}} & \text{for sells} \end{cases}$$
Positive slippage means you got a worse price than expected. In prediction markets where contracts trade between 0 and 1, even a cent of slippage is significant for many strategies.
Fill rate is the percentage of orders that get filled:
$$\text{Fill Rate} = \frac{\text{Number of orders filled}}{\text{Number of orders submitted}}$$
For limit orders, fill rate trades off against execution price. More aggressive limits fill more often but at worse prices.
Latency is the time between deciding to trade and receiving a fill:
$$\text{Latency} = t_{\text{fill}} - t_{\text{signal}}$$
This includes signal processing time, order construction, API request time, exchange processing, and fill reporting.
Implementation shortfall captures the total cost of your execution process versus the theoretical ideal:
$$\text{IS} = \frac{V_{\text{paper}} - V_{\text{actual}}}{V_{\text{paper}}}$$
where $V_{\text{paper}}$ is the P&L if you had traded at the signal price and $V_{\text{actual}}$ is your real P&L.
19.6.2 Improving Execution Quality
Limit orders vs. market orders. In prediction markets, always prefer limit orders. Market orders in thin books can experience extreme slippage. A limit order guarantees your price but not your fill.
Order splitting. For larger orders, split into smaller pieces to reduce market impact:
$$q_i = \frac{Q}{n}, \quad i = 1, 2, \ldots, n$$
where $Q$ is the total desired quantity and $n$ is the number of slices. In practice, use a participation-rate approach:
$$q_i = \alpha \cdot V_{\text{interval}}$$
where $V_{\text{interval}}$ is the market's volume in the interval and $\alpha$ is your target participation rate (typically 5-10%).
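The participation-rate formula can be sketched as a slicing function; `interval_volumes` is assumed to come from your market data feed, one entry per trading interval:

```python
from typing import List


def participation_slices(total_qty: float,
                         interval_volumes: List[float],
                         alpha: float = 0.05) -> List[float]:
    """Split an order into per-interval slices, each capped at a
    fraction `alpha` of that interval's observed volume.

    Stops once the full quantity is placed; any remainder after the
    last interval is left unexecuted (to be re-queued or dropped).
    """
    slices = []
    remaining = total_qty
    for vol in interval_volumes:
        if remaining <= 0:
            break
        q = min(alpha * vol, remaining)
        slices.append(q)
        remaining -= q
    return slices
```

Note that a low participation rate may leave part of the order unfilled; that is the intended trade-off against market impact.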
Timing. Some prediction markets have predictable volume patterns. Trading during high-volume periods typically results in better execution.
Passive vs. aggressive pricing. Posting at the best bid (for buys) is passive and earns the spread but may not fill. Crossing the spread to trade at the ask is aggressive and fills immediately but pays the spread.
The optimal approach depends on your time horizon and signal decay:
- If your signal is valid for hours, be passive and earn the spread.
- If your signal decays in minutes, be aggressive and accept slippage.
19.6.3 Execution Quality Analyzer
from dataclasses import dataclass
from typing import List, Dict
from datetime import datetime, timedelta
import statistics
import logging
@dataclass
class ExecutionRecord:
"""Record of a single execution for quality analysis."""
order_id: str
market_id: str
side: str
intended_price: float
fill_price: float
quantity: float
signal_time: datetime
submit_time: datetime
fill_time: datetime
was_filled: bool = True
@property
def slippage(self) -> float:
if self.side == "BUY":
return self.fill_price - self.intended_price
else:
return self.intended_price - self.fill_price
@property
def slippage_bps(self) -> float:
"""Slippage in basis points."""
if self.intended_price == 0:
return 0
return (self.slippage / self.intended_price) * 10000
@property
def total_latency_ms(self) -> float:
return (self.fill_time - self.signal_time).total_seconds() * 1000
@property
def submission_latency_ms(self) -> float:
return (self.submit_time - self.signal_time).total_seconds() * 1000
@property
def exchange_latency_ms(self) -> float:
return (self.fill_time - self.submit_time).total_seconds() * 1000
class ExecutionQualityAnalyzer:
"""Analyzes execution quality across all trades."""
def __init__(self):
self.records: List[ExecutionRecord] = []
self.rejected_count = 0 # Orders never submitted
self.unfilled_count = 0 # Orders submitted but not filled
self.logger = logging.getLogger("ExecQuality")
def add_record(self, record: ExecutionRecord):
self.records.append(record)
def record_unfilled(self):
self.unfilled_count += 1
def record_rejected(self):
self.rejected_count += 1
def get_summary(self, lookback_hours: float = 24) -> dict:
"""Get execution quality summary for recent trades."""
cutoff = datetime.utcnow() - timedelta(hours=lookback_hours)
recent = [
r for r in self.records if r.fill_time > cutoff
]
if not recent:
return {"message": "No recent trades to analyze"}
slippages = [r.slippage for r in recent]
slippages_bps = [r.slippage_bps for r in recent]
latencies = [r.total_latency_ms for r in recent]
        # Note: unfilled_count is cumulative, so this fill rate is approximate
        total_orders = len(recent) + self.unfilled_count
return {
"period_hours": lookback_hours,
"num_trades": len(recent),
"fill_rate": len(recent) / total_orders if total_orders > 0 else 0,
"slippage": {
"mean": statistics.mean(slippages),
"median": statistics.median(slippages),
"std": statistics.stdev(slippages) if len(slippages) > 1 else 0,
"worst": max(slippages),
"best": min(slippages),
"mean_bps": statistics.mean(slippages_bps),
},
"latency_ms": {
"mean": statistics.mean(latencies),
"median": statistics.median(latencies),
"p95": sorted(latencies)[int(0.95 * len(latencies))]
if len(latencies) > 1 else latencies[0],
"max": max(latencies),
},
"implementation_shortfall": self._calc_implementation_shortfall(
recent
),
}
def _calc_implementation_shortfall(
self, records: List[ExecutionRecord]
) -> float:
"""
Calculate implementation shortfall as the fraction of
theoretical P&L lost to execution costs.
"""
paper_pnl = 0.0
actual_pnl = 0.0
for r in records:
if r.side == "BUY":
# For a buy, we wanted to buy at intended_price
# Theoretical profit if we sell at 1 (market resolves YES)
paper_pnl += (1.0 - r.intended_price) * r.quantity
actual_pnl += (1.0 - r.fill_price) * r.quantity
else:
paper_pnl += r.intended_price * r.quantity
actual_pnl += r.fill_price * r.quantity
if paper_pnl == 0:
return 0.0
return (paper_pnl - actual_pnl) / paper_pnl
def get_by_market(self) -> Dict[str, dict]:
"""Break down execution quality by market."""
by_market = {}
for r in self.records:
if r.market_id not in by_market:
by_market[r.market_id] = []
by_market[r.market_id].append(r)
summaries = {}
for market_id, records in by_market.items():
slippages = [r.slippage for r in records]
summaries[market_id] = {
"num_trades": len(records),
"mean_slippage": statistics.mean(slippages),
"total_slippage_cost": sum(
r.slippage * r.quantity for r in records
),
}
return summaries
def print_report(self, lookback_hours: float = 24):
"""Print a formatted execution quality report."""
summary = self.get_summary(lookback_hours)
if "message" in summary:
print(summary["message"])
return
print(f"\n{'='*60}")
print(f" Execution Quality Report (last {lookback_hours}h)")
print(f"{'='*60}")
print(f" Trades: {summary['num_trades']}")
print(f" Fill Rate: {summary['fill_rate']:.1%}")
print(f"\n Slippage:")
s = summary["slippage"]
print(f" Mean: {s['mean']:.4f} ({s['mean_bps']:.1f} bps)")
print(f" Median: {s['median']:.4f}")
print(f" Worst: {s['worst']:.4f}")
print(f" Best: {s['best']:.4f}")
print(f"\n Latency (ms):")
lat = summary["latency_ms"]
print(f" Mean: {lat['mean']:.0f}")
print(f" Median: {lat['median']:.0f}")
print(f" p95: {lat['p95']:.0f}")
print(f" Max: {lat['max']:.0f}")
print(f"\n Implementation Shortfall: "
f"{summary['implementation_shortfall']:.2%}")
print(f"{'='*60}\n")
19.7 Position Tracking and P&L Monitoring
19.7.1 Real-Time Position Tracking
Your position tracker is the single source of truth for what you own. It must be:
- Accurate. Every fill must be recorded. Missing a fill means your risk calculations are wrong.
- Reconciled. Periodically compare your local positions with the exchange's records.
- Real-time. P&L and exposure should update immediately on every fill and price change.
19.7.2 P&L Calculations
For prediction market contracts, P&L has two components:
Unrealized P&L for an open position:
$$\text{Unrealized PnL} = \begin{cases} (p_{\text{current}} - p_{\text{avg\_entry}}) \times q & \text{for long positions} \\ (p_{\text{avg\_entry}} - p_{\text{current}}) \times |q| & \text{for short positions} \end{cases}$$
Realized P&L from closed trades:
$$\text{Realized PnL} = \sum_{\text{closed trades}} (p_{\text{exit}} - p_{\text{entry}}) \times q_{\text{closed}} - \text{fees}$$
For prediction markets specifically, there is an additional component: resolution P&L. When a market resolves, YES contracts are worth $1 and NO contracts are worth $0 (or vice versa). This can produce large P&L events:
$$\text{Resolution PnL} = \begin{cases} (1 - p_{\text{avg\_entry}}) \times q & \text{if you hold YES and market resolves YES} \\ -p_{\text{avg\_entry}} \times q & \text{if you hold YES and market resolves NO} \end{cases}$$
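The resolution cases reduce to one line of arithmetic. A sketch for a YES position, matching the formula above:

```python
def resolution_pnl(avg_entry: float, quantity: float,
                   resolved_yes: bool) -> float:
    """P&L at resolution for a YES position, per the cases above.

    A YES contract pays $1 if the market resolves YES, $0 otherwise.
    """
    payout = 1.0 if resolved_yes else 0.0
    return (payout - avg_entry) * quantity


# Example: 100 YES shares at an average entry of $0.60 gain about
# $40 on a YES resolution and lose about $60 on a NO resolution.
```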
19.7.3 Python PositionTracker
from dataclasses import dataclass, field
from typing import Dict, Optional, List
from datetime import datetime
import threading
import logging

# MarketUpdate is the market data event type defined earlier in this chapter.
@dataclass
class Position:
"""Tracks a position in a single market."""
market_id: str
quantity: float = 0.0 # Positive = long, negative = short
average_entry_price: float = 0.0
total_cost: float = 0.0 # Total capital deployed
realized_pnl: float = 0.0
total_fees: float = 0.0
last_price: float = 0.0
trades: List[dict] = field(default_factory=list)
@property
def is_open(self) -> bool:
return abs(self.quantity) > 1e-9
@property
def unrealized_pnl(self) -> float:
if not self.is_open:
return 0.0
if self.quantity > 0:
return (self.last_price - self.average_entry_price) * self.quantity
else:
return (self.average_entry_price - self.last_price) * abs(self.quantity)
@property
def total_pnl(self) -> float:
return self.realized_pnl + self.unrealized_pnl - self.total_fees
@property
def market_value(self) -> float:
return abs(self.quantity * self.last_price)
@property
def exposure(self) -> float:
"""Dollar exposure of this position."""
return abs(self.quantity * self.last_price)
def apply_fill(self, side: str, quantity: float, price: float,
fees: float = 0.0, timestamp: datetime = None):
"""Apply a fill to this position."""
self.total_fees += fees
timestamp = timestamp or datetime.utcnow()
fill_qty = quantity if side == "BUY" else -quantity
# Check if this fill reduces or increases the position
if self.quantity == 0:
# Opening a new position
self.quantity = fill_qty
self.average_entry_price = price
self.total_cost = abs(fill_qty) * price
elif (self.quantity > 0 and fill_qty > 0) or \
(self.quantity < 0 and fill_qty < 0):
# Adding to existing position
total_cost = abs(self.quantity) * self.average_entry_price
total_cost += abs(fill_qty) * price
self.quantity += fill_qty
self.average_entry_price = total_cost / abs(self.quantity)
self.total_cost = total_cost
else:
# Reducing or flipping position
close_qty = min(abs(fill_qty), abs(self.quantity))
if self.quantity > 0:
# Closing long position
self.realized_pnl += (price - self.average_entry_price) * close_qty
else:
# Closing short position
self.realized_pnl += (self.average_entry_price - price) * close_qty
remaining_new = abs(fill_qty) - close_qty
self.quantity += fill_qty
if abs(self.quantity) < 1e-9:
# Position fully closed
self.quantity = 0.0
self.average_entry_price = 0.0
self.total_cost = 0.0
elif remaining_new > 0:
# Position flipped
self.average_entry_price = price
self.total_cost = abs(self.quantity) * price
self.trades.append({
"side": side,
"quantity": quantity,
"price": price,
"fees": fees,
"timestamp": timestamp,
})
class PositionTracker:
"""Tracks all positions and provides portfolio-level metrics."""
def __init__(self, event_bus):
self.event_bus = event_bus
self.positions: Dict[str, Position] = {}
        # Reentrant lock: get_all_positions calls get_position while holding it
        self._lock = threading.RLock()
self.logger = logging.getLogger("PositionTracker")
# Subscribe to events
self.event_bus.subscribe("order_filled", self._on_fill)
self.event_bus.subscribe("market_update", self._on_price_update)
def _on_fill(self, order):
"""Handle a filled order."""
with self._lock:
if order.market_id not in self.positions:
self.positions[order.market_id] = Position(
market_id=order.market_id
)
position = self.positions[order.market_id]
position.apply_fill(
side=order.side,
quantity=order.filled_quantity,
price=order.average_fill_price,
)
self.logger.info(
f"Position updated: {order.market_id} "
f"qty={position.quantity:.2f} "
f"avg_entry={position.average_entry_price:.4f} "
f"unrealized={position.unrealized_pnl:.2f}"
)
self.event_bus.publish("position_update", position)
def _on_price_update(self, update: MarketUpdate):
"""Update last price for P&L calculations."""
with self._lock:
if update.market_id in self.positions:
mid_price = None
if update.best_bid and update.best_ask:
mid_price = (update.best_bid + update.best_ask) / 2
elif update.last_price:
mid_price = update.last_price
if mid_price is not None:
self.positions[update.market_id].last_price = mid_price
def get_position(self, market_id: str) -> Optional[dict]:
with self._lock:
pos = self.positions.get(market_id)
if pos is None:
return None
return {
"market_id": pos.market_id,
"quantity": pos.quantity,
"average_entry_price": pos.average_entry_price,
"unrealized_pnl": pos.unrealized_pnl,
"realized_pnl": pos.realized_pnl,
"total_pnl": pos.total_pnl,
"market_value": pos.market_value,
}
def get_all_positions(self) -> List[dict]:
with self._lock:
return [
self.get_position(mid)
for mid in self.positions
if self.positions[mid].is_open
]
def get_unrealized_pnl(self) -> float:
with self._lock:
return sum(
p.unrealized_pnl for p in self.positions.values()
)
def get_realized_pnl(self) -> float:
with self._lock:
return sum(
p.realized_pnl for p in self.positions.values()
)
def get_total_pnl(self) -> float:
with self._lock:
return sum(p.total_pnl for p in self.positions.values())
def get_total_exposure(self) -> float:
with self._lock:
return sum(p.exposure for p in self.positions.values())
def get_portfolio_summary(self) -> dict:
with self._lock:
open_positions = [
p for p in self.positions.values() if p.is_open
]
return {
"num_open_positions": len(open_positions),
"total_exposure": sum(p.exposure for p in open_positions),
"unrealized_pnl": sum(
p.unrealized_pnl for p in open_positions
),
"realized_pnl": sum(
p.realized_pnl for p in self.positions.values()
),
"total_pnl": sum(
p.total_pnl for p in self.positions.values()
),
"total_fees": sum(
p.total_fees for p in self.positions.values()
),
"largest_position": max(
(p.exposure for p in open_positions), default=0
),
}
def reconcile(self, exchange_positions: List[dict]):
"""
Compare local positions with exchange records.
Returns list of discrepancies.
"""
discrepancies = []
with self._lock:
exchange_by_market = {
p["market_id"]: p for p in exchange_positions
}
# Check each local position against exchange
for market_id, local_pos in self.positions.items():
if not local_pos.is_open:
continue
exchange_pos = exchange_by_market.get(market_id)
if exchange_pos is None:
discrepancies.append({
"market_id": market_id,
"type": "local_only",
"local_qty": local_pos.quantity,
"exchange_qty": 0,
})
elif abs(local_pos.quantity - exchange_pos["quantity"]) > 1e-6:
discrepancies.append({
"market_id": market_id,
"type": "quantity_mismatch",
"local_qty": local_pos.quantity,
"exchange_qty": exchange_pos["quantity"],
})
# Check for exchange positions we don't have locally
for market_id, ex_pos in exchange_by_market.items():
if market_id not in self.positions or \
not self.positions[market_id].is_open:
if abs(ex_pos["quantity"]) > 1e-6:
discrepancies.append({
"market_id": market_id,
"type": "exchange_only",
"local_qty": 0,
"exchange_qty": ex_pos["quantity"],
})
if discrepancies:
self.logger.warning(
f"Reconciliation found {len(discrepancies)} discrepancies"
)
for d in discrepancies:
self.logger.warning(f" {d}")
else:
self.logger.info("Reconciliation: all positions match")
return discrepancies
19.7.4 Portfolio Dashboard
A portfolio dashboard should display at minimum:
- Current positions with entry prices, current prices, and P&L.
- Total portfolio P&L (realized + unrealized).
- Total exposure and utilization of risk limits.
- Open orders and their status.
- Recent trades with execution quality metrics.
In a production system, this would be a web dashboard or terminal UI. The get_portfolio_summary() method provides the data; presentation is left to the deployment environment.
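As a starting point, a minimal terminal rendering of that summary might look like the sketch below; the dictionary keys match those produced by get_portfolio_summary():

```python
def render_summary(summary: dict) -> str:
    """Format a portfolio summary dict (as produced by
    PositionTracker.get_portfolio_summary) as a terminal block."""
    lines = [
        "=" * 44,
        " Portfolio Summary",
        "=" * 44,
        f" Open positions:   {summary['num_open_positions']}",
        f" Total exposure:   ${summary['total_exposure']:,.2f}",
        f" Unrealized P&L:   ${summary['unrealized_pnl']:,.2f}",
        f" Realized P&L:     ${summary['realized_pnl']:,.2f}",
        f" Total P&L:        ${summary['total_pnl']:,.2f}",
        f" Fees paid:        ${summary['total_fees']:,.2f}",
        "=" * 44,
    ]
    return "\n".join(lines)
```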
19.8 Logging, Monitoring, and Alerts
19.8.1 Structured Logging for Trading Systems
Trading system logs serve three purposes:
- Debugging. When something goes wrong, logs are your primary tool for understanding what happened.
- Audit trail. Every order decision should be traceable from signal to execution.
- Analysis. Post-trade analysis relies on complete, structured logs.
Standard print statements are insufficient. Use structured logging with consistent fields:
import logging
import json
from datetime import datetime
class TradingLogger:
"""Structured logging for trading systems."""
def __init__(self, name: str, log_file: str = "trading.log",
level: int = logging.DEBUG):
self.logger = logging.getLogger(name)
self.logger.setLevel(level)
# File handler with structured JSON output
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(JsonFormatter())
# Console handler with human-readable output
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(
logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
)
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def log_signal(self, signal, extra: dict = None):
data = {
"event": "signal",
"market_id": signal.market_id,
"side": signal.side.value,
"target_price": signal.target_price,
"confidence": signal.confidence,
"model": signal.model_name,
}
if extra:
data.update(extra)
self.logger.info(json.dumps(data))
def log_order(self, order, action: str, extra: dict = None):
data = {
"event": f"order_{action}",
"internal_id": order.internal_id,
"exchange_id": order.exchange_id,
"market_id": order.market_id,
"side": order.side,
"quantity": order.quantity,
"price": order.price,
"state": order.state.value,
}
if extra:
data.update(extra)
self.logger.info(json.dumps(data))
def log_fill(self, fill_data: dict):
data = {"event": "fill"}
data.update(fill_data)
self.logger.info(json.dumps(data))
def log_risk_rejection(self, order, reason: str):
data = {
"event": "risk_rejection",
"market_id": order.market_id,
"side": order.side.value if hasattr(order.side, 'value') else order.side,
"quantity": order.quantity,
"price": order.price,
"reason": reason,
}
self.logger.warning(json.dumps(data))
def log_error(self, component: str, error: str,
extra: dict = None):
data = {
"event": "error",
"component": component,
"error": error,
}
if extra:
data.update(extra)
self.logger.error(json.dumps(data))
def log_pnl_snapshot(self, portfolio_summary: dict):
data = {"event": "pnl_snapshot"}
data.update(portfolio_summary)
self.logger.info(json.dumps(data))
class JsonFormatter(logging.Formatter):
"""Formats log records as JSON lines."""
def format(self, record):
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
return json.dumps(log_data)
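One payoff of JSON-lines output is that post-trade analysis can replay the log with the standard library alone. A sketch, assuming each line's "message" field carries the JSON payloads TradingLogger emits:

```python
import json
from collections import Counter
from typing import Iterable


def summarize_log(lines: Iterable[str]) -> Counter:
    """Count structured events in a JSON-lines trading log.

    Each line is expected to be a JSON object whose "message" field is
    itself a JSON payload with an "event" key; malformed lines are
    skipped rather than aborting the analysis.
    """
    counts = Counter()
    for line in lines:
        try:
            record = json.loads(line)
            payload = json.loads(record.get("message", "{}"))
            counts[payload.get("event", "unknown")] += 1
        except (json.JSONDecodeError, TypeError):
            continue
    return counts
```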
19.8.2 Critical Alerts
Some events require immediate human attention. Your monitoring system should alert on:
| Alert Level | Condition | Response |
|---|---|---|
| CRITICAL | Trading halted due to risk limit | Investigate immediately |
| CRITICAL | Connection lost for > 1 minute | Check infrastructure |
| CRITICAL | Position reconciliation mismatch | Resolve before continuing |
| HIGH | Daily loss > 50% of limit | Review strategy |
| HIGH | Fill rate < 50% | Check order pricing |
| HIGH | Unusual slippage | Check market conditions |
| MEDIUM | Order rejected by exchange | Review order parameters |
| MEDIUM | API rate limit hit | Adjust polling frequency |
| LOW | Partial fill on order | Normal, informational |
import smtplib
import time
import logging
from email.mime.text import MIMEText
from datetime import datetime
from typing import Dict, List
class AlertManager:
"""Sends alerts via multiple channels."""
def __init__(self, config: dict):
self.config = config
self.alert_history: List[dict] = []
self.logger = logging.getLogger("AlertManager")
# Rate limit alerts to avoid spam
self._last_alert_time: Dict[str, float] = {}
self._alert_cooldown = 300 # 5 minutes between same alerts
def alert(self, level: str, title: str, message: str,
alert_type: str = "general"):
"""Send an alert if not in cooldown."""
now = time.time()
key = f"{level}:{alert_type}"
if key in self._last_alert_time:
if now - self._last_alert_time[key] < self._alert_cooldown:
return # In cooldown
self._last_alert_time[key] = now
self.alert_history.append({
"timestamp": datetime.utcnow().isoformat(),
"level": level,
"title": title,
"message": message,
})
self.logger.warning(f"ALERT [{level}]: {title} - {message}")
# Send to configured channels
if level in ("CRITICAL", "HIGH"):
self._send_email(level, title, message)
# Could also send to Slack, SMS, etc.
def _send_email(self, level: str, title: str, message: str):
"""Send alert via email."""
if "email" not in self.config:
return
try:
msg = MIMEText(
f"Level: {level}\n"
f"Time: {datetime.utcnow().isoformat()}\n\n"
f"{message}"
)
msg["Subject"] = f"[Trading Alert - {level}] {title}"
msg["From"] = self.config["email"]["from"]
msg["To"] = self.config["email"]["to"]
with smtplib.SMTP(
self.config["email"]["smtp_host"],
self.config["email"]["smtp_port"],
) as server:
server.starttls()
server.login(
self.config["email"]["username"],
self.config["email"]["password"],
)
server.send_message(msg)
except Exception as e:
self.logger.error(f"Failed to send alert email: {e}")
19.8.3 Monitoring Dashboard Metrics
Track these metrics continuously:
- System health: CPU usage, memory, network latency.
- Data freshness: Age of most recent market data per watched market.
- Order pipeline: Orders pending, submitted, filled in the last hour.
- P&L curve: Real-time chart of cumulative P&L.
- Risk utilization: Current exposure vs. limits, as a percentage.
- Error rate: Number of errors per component per hour.
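Data freshness in particular is cheap to check mechanically. A sketch follows; the 30-second default is an illustrative threshold, not a recommendation:

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional


def stale_markets(last_update: Dict[str, datetime],
                  max_age: timedelta = timedelta(seconds=30),
                  now: Optional[datetime] = None) -> List[str]:
    """Return the market IDs whose most recent update is older than max_age.

    `last_update` maps market_id -> timestamp of the latest tick received;
    any market returned should be treated as having no usable price.
    """
    now = now or datetime.now(timezone.utc)
    return [mid for mid, ts in last_update.items() if now - ts > max_age]
```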
19.9 Trading Psychology and Discipline
19.9.1 Why Psychology Matters Even with a Bot
You might think that automating your trading eliminates psychological factors. It does not. You still face psychological pressure at several points:
- Deciding to start or stop the bot. After a losing streak, the temptation to turn off the bot is enormous -- right when the edge may be about to reassert itself.
- Modifying parameters. "If I just widen the spread a bit, I'll catch more trades." This kind of on-the-fly adjustment, driven by recent results rather than analysis, is a form of emotional trading.
- Overriding the bot. "The bot wants to sell, but I feel like this market is going up." Overriding a systematic strategy with discretionary judgment undermines the entire point of systematic trading.
- Sizing decisions. After a big win, the temptation to increase size. After a big loss, the temptation to reduce size or add to losers. Both can be destructive.
19.9.2 Common Emotional Pitfalls
FOMO (Fear of Missing Out). You see a market moving rapidly and want to jump in, even though your model has no signal. Or your bot did not act on a signal in time, and you want to manually chase the move. FOMO trades are almost always losers because you are reacting to past price action, not to an informational edge.
Revenge trading. After a loss, you feel the need to "make it back" immediately. This leads to larger position sizes, more aggressive pricing, and trading in markets where you have no edge.
Tilt. Borrowed from poker, tilt is a state of emotionally compromised decision-making. Signs of tilt:
- You are angry about recent losses.
- You are trading more frequently than usual.
- You are not following your pre-defined rules.
- You are checking your P&L every few minutes.
- You feel that the market is "against" you personally.
Overconfidence after wins. A winning streak feels like skill; a losing streak feels like bad luck. In reality, both are often noise. Overconfidence leads to:
- Increasing position sizes beyond what the strategy dictates.
- Trading in new markets without proper research.
- Relaxing risk limits.
Anchoring. You bought at $0.60 and the price dropped to $0.45. You hold because "it will come back to my entry price." Your entry price is irrelevant to whether the current price represents value. The market does not know or care what you paid.
19.9.3 Building Emotional Discipline
Pre-commitment. Write down your trading rules before you start. Specify:
- Position sizing formula.
- Entry and exit criteria.
- Maximum loss before stopping for the day.
- Conditions for overriding the bot (there should be very few).
The cooling-off rule. After any manual intervention, wait at least 1 hour before making another manual change. This prevents cascading emotional decisions.
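The cooling-off rule can be enforced mechanically rather than by willpower. A sketch, with an injectable clock for testing; the class and method names are assumptions, not part of the chapter's bot:

```python
import time
from typing import Optional

class CoolingOffGuard:
    """Blocks a second manual intervention inside the cooling-off window."""

    def __init__(self, cooldown_sec: float = 3600.0, clock=time.time):
        self.cooldown_sec = cooldown_sec
        self.clock = clock  # injectable for testing
        self._last_action: Optional[float] = None

    def may_intervene(self) -> bool:
        """True if no manual action happened within the window."""
        if self._last_action is None:
            return True
        return self.clock() - self._last_action >= self.cooldown_sec

    def record_intervention(self) -> None:
        self._last_action = self.clock()

guard = CoolingOffGuard(cooldown_sec=3600)
guard.record_intervention()
print(guard.may_intervene())  # False: still inside the window
```

Wiring the bot's manual-override entry point through `may_intervene()` turns the rule from a resolution into a hard constraint.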
Regular review cadence. Review performance weekly or monthly, not daily. Daily review amplifies noise and emotional reactions.
Separate monitoring from decision-making. It is fine to monitor your bot frequently. But any decision to change parameters, add markets, or modify risk limits should happen during scheduled review sessions, not in real-time.
19.9.4 When to Override the Bot
There are legitimate reasons to intervene:
- Technical failure. The bot is clearly malfunctioning -- submitting orders at wrong prices, not canceling orders it should, etc.
- Stale data. The data feed is stuck and the bot is trading on old information.
- External information. You have reliable information that the market will undergo a structural change (platform rule change, market cancellation, etc.) that the bot cannot know about.
- Risk limit breach. An external event has changed the risk profile of your positions in a way the bot's risk checks do not capture.
Illegitimate reasons to intervene:
- "I feel like this market is going to move."
- "The bot missed a trade I would have taken."
- "The bot is losing money so it must be broken."
- "This position is too big / too small for my comfort."
19.9.5 The Decision Journal
Keep a decision journal that records every manual intervention and its outcome:
Date: 2025-03-15
Action: Manually closed position in Market XYZ
Reason: "Felt the model was wrong about this market"
Outcome: Market resolved as model predicted. Lost $47 in
opportunity cost.
Lesson: Trust the model unless I have specific new information.
After 50 entries, review the journal. Most traders discover that their manual interventions have negative expected value. This data provides the conviction to trust the system.
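The review itself can be automated once each journal entry records a dollar estimate of the intervention's impact relative to letting the system run. A sketch; the `impact_vs_system` field name is illustrative:

```python
def review_journal(entries: list) -> dict:
    """Summarize manual interventions: count, total and average impact
    versus doing nothing (negative values mean the intervention hurt)."""
    impacts = [e["impact_vs_system"] for e in entries]
    n = len(impacts)
    total = sum(impacts)
    return {
        "n_interventions": n,
        "total_impact": total,
        "avg_impact": (total / n) if n else 0.0,
        "pct_harmful": (100.0 * sum(1 for x in impacts if x < 0) / n) if n else 0.0,
    }

entries = [
    {"date": "2025-03-15", "impact_vs_system": -47.0},
    {"date": "2025-03-22", "impact_vs_system": 12.0},
    {"date": "2025-04-02", "impact_vs_system": -20.0},
]
print(review_journal(entries))
```

A persistently negative `avg_impact` is exactly the evidence most traders need to stop overriding the system.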
19.10 Paper Trading and Simulation
19.10.1 Building a Paper Trading Environment
A paper trading engine simulates order execution against live market data. It should be realistic enough to validate your strategy but honest about its limitations:
What paper trading validates:
- Data pipeline correctness
- Signal generation logic
- Risk check behavior
- Order management lifecycle
- System stability over time
What paper trading does not validate:
- Market impact (your orders are not hitting the real book)
- Fill probability (real fills depend on queue position and order flow)
- Psychological responses (no real money at risk)
- Latency effects (simulated fills are instantaneous)
19.10.2 Realistic Simulation
To make paper trading more realistic, simulate these effects:
Order book dynamics. Do not assume limit orders fill at their price. Simulate fills based on whether the market price crosses your limit:
$$\text{Buy limit fills if: } p_{\text{market}} \leq p_{\text{limit}}$$ $$\text{Sell limit fills if: } p_{\text{market}} \geq p_{\text{limit}}$$
Partial fills. Not all orders fill completely. Simulate partial fills based on market volume:
$$q_{\text{filled}} = \min\left(q_{\text{order}}, \alpha \cdot V_{\text{interval}}\right)$$
where $\alpha$ is a participation rate (typically 5-20% of available volume).
Slippage. Add random slippage to simulate market impact:
$$p_{\text{fill}} = p_{\text{limit}} + \epsilon, \quad \epsilon \sim \mathcal{N}(0, \sigma_{\text{slip}})$$
where $\sigma_{\text{slip}}$ is calibrated from actual execution data (start with 0.5-1% of the bid-ask spread).
Latency. Add simulated latency between signal generation and order "submission":
$$t_{\text{submit}} = t_{\text{signal}} + \Delta t, \quad \Delta t \sim \text{LogNormal}(\mu_{\text{lat}}, \sigma_{\text{lat}})$$
19.10.3 Python Paper Trading Engine
import random
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import logging
@dataclass
class PaperOrder:
"""A simulated order in the paper trading engine."""
order_id: str
market_id: str
side: str
quantity: float
price: float
order_type: str = "LIMIT"
status: str = "SUBMITTED"
filled_quantity: float = 0.0
average_fill_price: float = 0.0
created_at: datetime = field(default_factory=datetime.utcnow)
fills: List[dict] = field(default_factory=list)
class PaperTradingEngine:
"""
Simulates order execution against live market data.
Drop-in replacement for the real API client.
"""
def __init__(
self,
initial_balance: float = 10000.0,
fill_probability: float = 0.8,
slippage_std: float = 0.002,
latency_mean_ms: float = 100.0,
partial_fill_probability: float = 0.2,
):
self.balance = initial_balance
self.initial_balance = initial_balance
self.fill_probability = fill_probability
self.slippage_std = slippage_std
self.latency_mean_ms = latency_mean_ms
self.partial_fill_probability = partial_fill_probability
self.orders: Dict[str, PaperOrder] = {}
self.positions: Dict[str, float] = {} # market_id -> quantity
self.entry_prices: Dict[str, float] = {}
self.trade_history: List[dict] = []
self.current_prices: Dict[str, dict] = {}
self._order_counter = 0
self.logger = logging.getLogger("PaperTrader")
def update_market_data(self, market_id: str, bid: float, ask: float,
last: float, volume: float = 0):
"""Feed live market data into the simulator."""
self.current_prices[market_id] = {
"best_bid": bid,
"best_ask": ask,
"last_price": last,
"volume_24h": volume,
}
# Check if any pending orders should fill
self._check_fills(market_id)
def get_market(self, market_id: str) -> dict:
"""API-compatible market data getter."""
return self.current_prices.get(market_id, {
"best_bid": None,
"best_ask": None,
"last_price": None,
"volume_24h": 0,
})
def place_order(self, market_id: str, side: str, quantity: float,
price: float, order_type: str = "LIMIT") -> dict:
"""Simulate placing an order."""
# Simulate submission latency in seconds, scaled to the configured
# mean (lognormvariate(0, 0.5) has median 1). Note: the value is
# recorded but not applied as a real delay; a fuller simulator
# would defer fill checks by `latency`.
latency = (self.latency_mean_ms / 1000.0) * random.lognormvariate(0.0, 0.5)
self._order_counter += 1
order_id = f"PAPER-{self._order_counter:06d}"
# Check balance for buy orders
if side == "BUY":
cost = quantity * price
if cost > self.balance:
return {"order_id": order_id, "status": "REJECTED",
"reason": "Insufficient balance"}
order = PaperOrder(
order_id=order_id,
market_id=market_id,
side=side,
quantity=quantity,
price=price,
order_type=order_type,
)
self.orders[order_id] = order
self.logger.info(
f"Paper order placed: {order_id} {side} {quantity} "
f"{market_id} @ {price}"
)
# For market orders, fill immediately with slippage
if order_type == "MARKET":
self._fill_market_order(order)
else:
# Check if limit order can fill now
self._check_fills(market_id)
return {
"order_id": order_id,
"status": order.status,
}
def cancel_order(self, order_id: str) -> dict:
"""Simulate canceling an order."""
order = self.orders.get(order_id)
if order is None:
return {"status": "NOT_FOUND"}
if order.status in ("FILLED", "CANCELED"):
return {"status": order.status}
order.status = "CANCELED"
self.logger.info(f"Paper order canceled: {order_id}")
return {"status": "CANCELED"}
def get_order_status(self, order_id: str) -> dict:
"""Get status of a paper order."""
order = self.orders.get(order_id)
if order is None:
return {"status": "NOT_FOUND"}
return {
"order_id": order.order_id,
"status": order.status,
"filled_quantity": order.filled_quantity,
"average_price": order.average_fill_price,
}
def get_positions(self) -> list:
"""Get all open positions."""
return [
{
"market_id": mid,
"quantity": qty,
"entry_price": self.entry_prices.get(mid, 0),
}
for mid, qty in self.positions.items()
if abs(qty) > 1e-9
]
def get_balance(self) -> dict:
"""Get current balance."""
return {
"available": self.balance,
"total": self.balance + self._positions_value(),
}
def _positions_value(self) -> float:
"""Calculate total value of open positions."""
total = 0.0
for mid, qty in self.positions.items():
if mid in self.current_prices:
price = self.current_prices[mid].get("last_price", 0)
if price:
total += qty * price
return total
def _check_fills(self, market_id: str):
"""Check if any pending orders for this market should fill."""
market_data = self.current_prices.get(market_id)
if market_data is None:
return
for order in self.orders.values():
if (order.market_id != market_id or
order.status not in ("SUBMITTED", "PARTIALLY_FILLED")):
continue
should_fill = False
if order.side == "BUY":
# Buy limit fills if ask <= limit price
if market_data.get("best_ask") and \
market_data["best_ask"] <= order.price:
should_fill = True
else:
# Sell limit fills if bid >= limit price
if market_data.get("best_bid") and \
market_data["best_bid"] >= order.price:
should_fill = True
if should_fill:
# Random fill probability check
if random.random() < self.fill_probability:
self._simulate_fill(order, market_data)
def _fill_market_order(self, order: PaperOrder):
"""Fill a market order with slippage."""
market_data = self.current_prices.get(order.market_id, {})
if order.side == "BUY":
base_price = market_data.get("best_ask", order.price)
else:
base_price = market_data.get("best_bid", order.price)
slippage = abs(random.gauss(0, self.slippage_std))
if order.side == "BUY":
fill_price = base_price + slippage
else:
fill_price = base_price - slippage
fill_price = max(0.01, min(0.99, fill_price))
self._execute_fill(order, order.quantity, fill_price)
def _simulate_fill(self, order: PaperOrder, market_data: dict):
"""Simulate a fill with optional partial fill and slippage."""
remaining = order.quantity - order.filled_quantity
# Determine fill quantity (possible partial fill)
if random.random() < self.partial_fill_probability:
fill_qty = remaining * random.uniform(0.3, 0.8)
fill_qty = round(fill_qty, 2)
else:
fill_qty = remaining
# Add slippage
slippage = random.gauss(0, self.slippage_std)
if order.side == "BUY":
fill_price = order.price + abs(slippage)
else:
fill_price = order.price - abs(slippage)
fill_price = max(0.01, min(0.99, fill_price))
self._execute_fill(order, fill_qty, fill_price)
def _execute_fill(self, order: PaperOrder, quantity: float,
price: float):
"""Execute a fill and update positions."""
# Update order
prev_filled = order.filled_quantity
prev_cost = order.average_fill_price * prev_filled
order.filled_quantity += quantity
order.average_fill_price = (
(prev_cost + price * quantity) / order.filled_quantity
)
order.fills.append({
"quantity": quantity,
"price": price,
"timestamp": datetime.utcnow(),
})
if abs(order.filled_quantity - order.quantity) < 1e-9:
order.status = "FILLED"
else:
order.status = "PARTIALLY_FILLED"
# Update position
current_qty = self.positions.get(order.market_id, 0)
if order.side == "BUY":
new_qty = current_qty + quantity
self.balance -= quantity * price
else:
new_qty = current_qty - quantity
self.balance += quantity * price
self.positions[order.market_id] = new_qty
# Track entry price (simplified)
if abs(new_qty) > 1e-9:
self.entry_prices[order.market_id] = price
# Record trade
self.trade_history.append({
"timestamp": datetime.utcnow(),
"order_id": order.order_id,
"market_id": order.market_id,
"side": order.side,
"quantity": quantity,
"price": price,
})
self.logger.info(
f"Paper fill: {order.order_id} {order.side} {quantity} "
f"{order.market_id} @ {price:.4f}"
)
def get_performance_report(self) -> dict:
"""Generate a performance summary of paper trading."""
total_value = self.balance + self._positions_value()
total_return = (total_value - self.initial_balance) / self.initial_balance
num_trades = len(self.trade_history)
buys = [t for t in self.trade_history if t["side"] == "BUY"]
sells = [t for t in self.trade_history if t["side"] == "SELL"]
return {
"initial_balance": self.initial_balance,
"current_balance": self.balance,
"positions_value": self._positions_value(),
"total_value": total_value,
"total_return": total_return,
"num_trades": num_trades,
"num_buys": len(buys),
"num_sells": len(sells),
"num_open_positions": sum(
1 for q in self.positions.values() if abs(q) > 1e-9
),
}
19.10.4 Validating Paper Trading Results
Before trusting paper trading results, validate:
- Compare with backtest. Paper trading results should be in the same ballpark as the backtest over the same period. Large divergences indicate a bug.
- Check fill realism. What percentage of limit orders filled? If your paper trader fills 100% of limit orders, it is unrealistic. Real fill rates for non-aggressive limits are typically 40-70%.
- Verify signal timing. Are signals generated at the same timestamps as you would expect from the backtest?
- Run for sufficient duration. A few days of paper trading proves nothing. Run for at least the minimum duration needed to observe 50-100 trades.
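The fill-realism check can be computed directly from order records. A sketch, assuming each record carries `order_type`, `quantity`, and `filled_quantity` fields like the `PaperOrder` dataclass above:

```python
def limit_fill_stats(orders: list) -> dict:
    """Fraction of limit orders fully filled, partially filled, unfilled."""
    limits = [o for o in orders if o.get("order_type") == "LIMIT"]
    n = len(limits)
    if n == 0:
        return {"n": 0, "full": 0.0, "partial": 0.0, "none": 0.0}
    full = sum(1 for o in limits if o["filled_quantity"] >= o["quantity"])
    none = sum(1 for o in limits if o["filled_quantity"] == 0)
    return {
        "n": n,
        "full": full / n,
        "partial": (n - full - none) / n,
        "none": none / n,
    }

orders = [
    {"order_type": "LIMIT", "quantity": 10, "filled_quantity": 10},
    {"order_type": "LIMIT", "quantity": 10, "filled_quantity": 4},
    {"order_type": "LIMIT", "quantity": 10, "filled_quantity": 0},
    {"order_type": "MARKET", "quantity": 5, "filled_quantity": 5},
]
print(limit_fill_stats(orders))
```

If `full` comes back near 1.0 over a meaningful sample, your simulator's fill model is too optimistic and its P&L should be discounted accordingly.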
19.11 Operational Best Practices
19.11.1 Deployment Checklist
Before deploying a trading bot to production:
Code Review
- [ ] All code reviewed by at least one other person (or by your future self after a 48-hour break)
- [ ] Unit tests pass with >90% coverage of critical paths
- [ ] Integration tests pass against paper trading environment
- [ ] No hardcoded credentials in source code
- [ ] All configuration is externalized (config file or environment variables)
Risk Configuration
- [ ] All risk limits set conservatively (you can always loosen later)
- [ ] Daily loss limit is set and tested
- [ ] Position limits are set per market and per portfolio
- [ ] Kill switch is tested and accessible
Infrastructure
- [ ] Server has reliable internet connection with failover
- [ ] Monitoring and alerting are configured and tested
- [ ] Log rotation is configured (logs can grow very fast)
- [ ] Backup procedure for configuration and state files
- [ ] Firewall rules restrict access to necessary services only
Operational
- [ ] Runbook documents startup, shutdown, and emergency procedures
- [ ] Contact information for platform support is accessible
- [ ] Calendar marked with platform maintenance windows
- [ ] Team members know how to shut down the bot in an emergency
19.11.2 API Key Security
API keys are the keys to your capital. Treat them accordingly:
Never commit API keys to version control. Use environment variables or encrypted configuration files. Even in private repositories, credentials in git history are a security risk.
import os
# CORRECT: Read from environment
API_KEY = os.environ.get("TRADING_API_KEY")
API_SECRET = os.environ.get("TRADING_API_SECRET")
if not API_KEY or not API_SECRET:
raise EnvironmentError(
"TRADING_API_KEY and TRADING_API_SECRET must be set"
)
Use the minimum necessary permissions. If the API supports permission scoping, grant only trading and reading permissions. Do not grant withdrawal permissions to a trading bot's API key.
IP whitelisting. If the platform supports it, restrict API key usage to your server's IP address. This prevents misuse even if the key is compromised.
Key rotation. Rotate API keys periodically (at least quarterly) and immediately if you suspect compromise.
Encrypted storage. If you must store keys in files, encrypt them:
from cryptography.fernet import Fernet
import json
import os
class SecureConfig:
"""Encrypted configuration storage."""
def __init__(self, key_file: str = ".config_key"):
if os.path.exists(key_file):
with open(key_file, "rb") as f:
self.key = f.read()
else:
self.key = Fernet.generate_key()
with open(key_file, "wb") as f:
f.write(self.key)
os.chmod(key_file, 0o600) # Owner read/write only
self.cipher = Fernet(self.key)
def save_config(self, config: dict, filepath: str):
"""Encrypt and save configuration."""
data = json.dumps(config).encode()
encrypted = self.cipher.encrypt(data)
with open(filepath, "wb") as f:
f.write(encrypted)
def load_config(self, filepath: str) -> dict:
"""Load and decrypt configuration."""
with open(filepath, "rb") as f:
encrypted = f.read()
data = self.cipher.decrypt(encrypted)
return json.loads(data.decode())
19.11.3 Version Control for Strategies
Treat your trading strategies like production software:
- Use git (or another VCS) for all code and configuration.
- Tag deployments so you can always identify which version of the strategy was running at any point in time.
- Never modify production code directly. Make changes in a development branch, test in paper trading, then deploy.
- Keep a changelog that documents every modification to the strategy.
19.11.4 Disaster Recovery
Plan for these scenarios:
Server crash. Your bot should be able to restart and recover its state. This means:
- Positions are stored persistently (not just in memory).
- On startup, the bot reconciles local state with the exchange.
- The bot cancels any stale orders from the previous session.
import json
import os
class StateManager:
"""Persists bot state for crash recovery."""
def __init__(self, state_file: str = "bot_state.json"):
self.state_file = state_file
self.backup_file = state_file + ".bak"
def save_state(self, state: dict):
"""Atomically save state to disk."""
# Write the new state to the backup file first
with open(self.backup_file, "w") as f:
json.dump(state, f, indent=2, default=str)
# os.replace overwrites atomically; unlike remove-then-rename,
# there is no crash window in which neither file exists
os.replace(self.backup_file, self.state_file)
def load_state(self) -> dict:
"""Load state from disk."""
if os.path.exists(self.state_file):
with open(self.state_file, "r") as f:
return json.load(f)
elif os.path.exists(self.backup_file):
with open(self.backup_file, "r") as f:
return json.load(f)
return {}
def state_exists(self) -> bool:
return (os.path.exists(self.state_file) or
os.path.exists(self.backup_file))
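On restart, the recovered local state must be reconciled against what the exchange actually reports before trading resumes. A minimal sketch of the comparison, where position dicts map market id to signed quantity and the function name is an assumption:

```python
def reconcile_positions(local: dict, exchange: dict,
                        tol: float = 1e-9) -> dict:
    """Return per-market discrepancies between saved and live positions
    (exchange minus local). Any non-empty result should block trading
    until the difference is resolved manually."""
    diffs = {}
    for market_id in set(local) | set(exchange):
        delta = exchange.get(market_id, 0.0) - local.get(market_id, 0.0)
        if abs(delta) > tol:
            diffs[market_id] = delta
    return diffs

saved = {"MKT-A": 100.0, "MKT-B": -50.0}
live = {"MKT-A": 100.0, "MKT-B": -40.0, "MKT-C": 5.0}
print(reconcile_positions(saved, live))
```

Treating any discrepancy as a hard stop is deliberately conservative: a mismatch usually means either missed fills during the outage or corrupted state, and both demand human review.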
API outage. If the trading platform is down:
1. Cancel all pending orders (if possible).
2. Record the current state.
3. Wait for recovery with exponential backoff.
4. On recovery, reconcile state before resuming trading.
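The exponential-backoff wait can be generated with capped doubling plus jitter, so that many bots recovering at once do not hammer the API in lockstep. A sketch with illustrative constants:

```python
import random

def backoff_delays(base: float = 1.0, cap: float = 300.0, n: int = 8):
    """Yield n capped exponential-backoff delays (seconds) with full
    jitter: each wait is uniform in [0, current_delay]."""
    delay = base
    for _ in range(n):
        yield random.uniform(0.0, delay)
        delay = min(cap, delay * 2)

delays = list(backoff_delays())
print(len(delays))  # 8
```

In the recovery loop, each yielded value is passed to `time.sleep()` before the next reconnection attempt; once the cap is reached the bot keeps polling at roughly that interval until the platform responds.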
Internet outage. Similar to API outage, but you also cannot cancel orders. This is why position size limits are critical -- they bound your worst-case exposure during outages.
Strategy bug. A bug that causes incorrect trading:
1. Detect via monitoring (unusual P&L, unexpected positions).
2. Stop the bot immediately.
3. Cancel all open orders.
4. Manually review all positions.
5. Fix the bug.
6. Re-run paper trading validation.
7. Deploy the fix with fresh paper trading before going live again.
19.11.5 Regular Maintenance Tasks
| Frequency | Task |
|---|---|
| Daily | Review P&L and risk metrics |
| Daily | Check for any alerts or errors in logs |
| Weekly | Review execution quality metrics |
| Weekly | Check for API changes or platform updates |
| Monthly | Full strategy performance review |
| Monthly | Review and update risk limits if needed |
| Quarterly | Rotate API keys |
| Quarterly | Review and update the disaster recovery plan |
19.12 Chapter Summary
This chapter has covered the complete operational stack for live trading in prediction markets. The key themes:
Architecture matters. A well-designed bot with clear component separation is easier to build, test, debug, and extend than a monolithic script. The event-driven architecture with separate data feed, signal generator, risk manager, order executor, and position tracker provides the right level of modularity.
Risk management is non-negotiable. Pre-trade risk checks are your last line of defense. They should never be bypassed, and they should be conservative. It is far better to miss a good trade than to take a catastrophic one.
Execution quality is edge. In prediction markets with thin liquidity, the difference between good and bad execution can be the difference between a profitable and an unprofitable strategy. Measure slippage, fill rates, and latency continuously.
Reliability is a feature. Rate limiting, retry logic, circuit breakers, and connection monitoring are not optional extras -- they are core requirements for any system that manages real capital.
Logging everything is essential. You cannot improve what you cannot measure. Structured logging with every signal, order, fill, and risk decision provides the foundation for all post-trade analysis.
Psychology is the weakest link. Even with a fully automated system, human judgment remains in the loop for critical decisions. Emotional discipline, pre-commitment to rules, and a decision journal help maintain rationality under pressure.
Paper trading is necessary but not sufficient. It validates your system but cannot validate execution quality, market impact, or your own psychological responses. Use it as a stage in a graduated deployment process.
Operations are ongoing. Live trading is not a "deploy and forget" activity. It requires daily monitoring, weekly review, and continuous improvement.
This chapter completes Part III of the book. You now have the full toolkit for developing, testing, and deploying trading strategies in prediction markets: from understanding market mechanics (Part I) and building quantitative models (Part II), through strategy development (Chapters 13-18) and now live execution.
What's Next
Part IV of the book shifts focus to Data Science and Advanced Analytics. With your trading infrastructure in place and generating data, we will explore how to extract deeper insights from trading data:
- Chapter 20: Data Collection and Web Scraping -- Gathering market data from platform APIs, building scrapers, and managing databases.
- Chapter 21: Exploratory Data Analysis of Market Data -- Analyzing price time-series, volume profiles, and detecting market regimes.
- Chapter 22: Statistical Modeling -- Regression and Time Series -- Applying logistic regression, ARIMA, and walk-forward validation to market data.
- Chapter 23: Machine Learning for Probability Estimation -- Using XGBoost, neural networks, and calibration techniques to improve your forecasts.
The infrastructure you have built in this chapter -- the data pipelines, the logging, the position tracking -- will serve as the foundation for the data science work ahead. Every signal you generate, every trade you execute, and every outcome you observe is data that feeds back into model improvement.