> "Give me six hours to chop down a tree and I will spend the first four sharpening the axe."
In This Chapter
- 6.1 Python Environment Setup
- 6.2 Essential Libraries Overview
- 6.3 Jupyter Notebooks vs Scripts
- 6.4 Connecting to Prediction Market APIs
- 6.5 Building the Prediction Markets Utility Module
- 6.6 Data Storage and Management
- 6.7 Visualization Toolkit
- 6.8 Configuration and Secrets Management
- 6.9 Logging and Debugging
- 6.10 Testing Your Setup
- 6.11 Version Control with Git
- 6.12 Chapter Summary
- What's Next
Chapter 6: Setting Up Your Python Toolkit
"Give me six hours to chop down a tree and I will spend the first four sharpening the axe." — Attributed to Abraham Lincoln
In the preceding chapters, we explored probability theory, the mechanics of prediction markets, and how markets aggregate information into prices. We examined these ideas conceptually and mathematically. Now it is time to get our hands dirty. This chapter marks the transition from understanding prediction markets in the abstract to building the software tools that will let you interact with them directly.
A carpenter would not begin a project without organizing their workshop. A chef would not attempt a complex dish without mise en place — everything in its place. Similarly, we will not attempt to build trading strategies, analyze market data, or evaluate forecasting accuracy without first establishing a clean, reliable, and well-organized Python environment.
This chapter is deliberately thorough. You may be tempted to skip ahead to the more exciting material on strategy and analysis. Resist that temptation. Every hour you invest here will save you ten hours of frustration later. Debugging an API connection at 2 AM when a market is about to resolve is not where you want to discover that your environment is misconfigured.
By the end of this chapter, you will have:
- A properly configured Python environment with all necessary libraries
- A robust API client capable of connecting to major prediction market platforms
- A reusable utility module (pmtools) that we will use throughout the rest of this book
- A local database for storing market data
- A visualization toolkit tuned for prediction market analysis
- Proper configuration management, logging, and testing infrastructure
- Version control set up for your prediction market projects
Let us begin.
6.1 Python Environment Setup
Why Python?
Python has become the lingua franca of data analysis, quantitative finance, and machine learning. Its ecosystem of libraries — NumPy for numerical computing, pandas for data manipulation, matplotlib for visualization, and requests for HTTP communication — makes it the natural choice for prediction market work. More importantly, every major prediction market platform provides Python SDKs or has community-maintained Python wrappers for their APIs.
We will use Python 3.9 or later throughout this book. Python 3.9 introduced dictionary union operators and updated type hinting syntax that simplifies our code. If you are using Python 3.10 or later, you will have access to structural pattern matching and even more expressive type hints, but 3.9 is our minimum requirement.
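As a quick illustration of those 3.9 features (the values below are arbitrary):
defaults = {"max_retries": 3, "timeout": 30}
overrides = {"timeout": 60}
settings = defaults | overrides  # dictionary union operator, new in 3.9

def normalize(prices: list[float]) -> dict[str, float]:
    """Built-in generics in type hints (list[...], dict[...]) also arrived in 3.9."""
    total = sum(prices)
    return {f"p{i}": p / total for i, p in enumerate(prices)}

print(settings)                     # {'max_retries': 3, 'timeout': 60}
print(normalize([0.6, 0.3, 0.1]))   # {'p0': 0.6, 'p1': 0.3, 'p2': 0.1}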
Installing Python
On macOS:
The recommended approach is to use Homebrew:
brew install python@3.11
Verify the installation:
python3 --version
# Python 3.11.x
On Windows:
Download the official installer from python.org. During installation, check the box that says "Add Python to PATH" — this is critical. Alternatively, install via the Windows Store or use the winget package manager:
winget install Python.Python.3.11
Verify:
python --version
# Python 3.11.x
Note: On Windows, the command may be python rather than python3. Throughout this book, we will use python3 in examples; substitute python if that is what your system requires.
On Linux (Ubuntu/Debian):
sudo apt update
sudo apt install python3.11 python3.11-venv python3-pip
Virtual Environments: Isolation Is Essential
A virtual environment is an isolated Python installation that keeps your project's dependencies separate from the system Python and from other projects. This is not optional — it is a fundamental best practice.
Consider what happens without virtual environments: you install version 1.4 of a library for Project A, then Project B requires version 2.0 of the same library. You upgrade, and Project A breaks. Virtual environments eliminate this problem entirely.
Using venv (built into Python):
# Create a new virtual environment
python3 -m venv ~/prediction-markets-env
# Activate it
# On macOS/Linux:
source ~/prediction-markets-env/bin/activate
# On Windows:
# ~/prediction-markets-env/Scripts/activate
# Your prompt should now show the environment name
(prediction-markets-env) $
When activated, python and pip commands will use the virtual environment's copies, not the system ones. When you are done working, deactivate with:
deactivate
Using Conda:
Conda is popular in data science because it manages not just Python packages but also non-Python dependencies (like C libraries that NumPy relies on). If you prefer Conda:
# Install Miniconda (lighter than full Anaconda)
# Download from https://docs.conda.io/en/latest/miniconda.html
# Create environment
conda create -n prediction-markets python=3.11
# Activate
conda activate prediction-markets
# Deactivate
conda deactivate
Which should you choose? If you are comfortable with Python and pip, venv is simpler and has no additional dependencies. If you work heavily with scientific computing libraries or want an all-in-one solution, Conda is excellent. Both work well for our purposes.
Project Directory Structure
Organization matters. Here is the directory structure we will build throughout this book:
prediction-markets/
├── .env # API keys and secrets (NEVER commit this)
├── .gitignore # Files to exclude from version control
├── README.md # Project description
├── requirements.txt # Python dependencies
├── setup.py # Package configuration (optional)
│
├── pmtools/ # Our utility module (Section 6.5)
│ ├── __init__.py
│ ├── api_client.py # API client base class and implementations
│ ├── data_models.py # Data structures for markets, orders, etc.
│ ├── visualization.py # Plotting functions
│ ├── probability.py # Probability utilities
│ └── database.py # Database helper functions
│
├── notebooks/ # Jupyter notebooks for exploration
│ ├── 01_market_overview.ipynb
│ ├── 02_price_analysis.ipynb
│ └── ...
│
├── scripts/ # Production scripts
│ ├── collect_data.py # Data collection pipeline
│ ├── analyze_markets.py # Market analysis
│ └── ...
│
├── data/ # Data storage
│ ├── raw/ # Raw data from APIs
│ ├── processed/ # Cleaned and transformed data
│ └── markets.db # SQLite database
│
├── tests/ # Test files
│ ├── test_api_client.py
│ ├── test_data_models.py
│ └── ...
│
├── configs/ # Configuration files
│ └── logging.yaml # Logging configuration
│
└── logs/ # Log files
└── app.log
Create this structure now:
mkdir -p prediction-markets/{pmtools,notebooks,scripts,data/{raw,processed},tests,configs,logs}
touch prediction-markets/.env
touch prediction-markets/.gitignore
touch prediction-markets/requirements.txt
touch prediction-markets/pmtools/__init__.py
Managing Dependencies with requirements.txt
Our requirements.txt file constrains library versions to keep the environment reproducible:
# Core scientific computing
numpy>=1.24.0,<2.0.0
pandas>=2.0.0,<3.0.0
scipy>=1.10.0,<2.0.0
# Visualization
matplotlib>=3.7.0,<4.0.0
seaborn>=0.12.0,<1.0.0
# HTTP and API interaction
requests>=2.28.0,<3.0.0
httpx>=0.24.0,<1.0.0
# Data storage
sqlalchemy>=2.0.0,<3.0.0
# Configuration and environment
python-dotenv>=1.0.0,<2.0.0
pyyaml>=6.0,<7.0
# Jupyter
jupyterlab>=4.0.0,<5.0.0
ipywidgets>=8.0.0,<9.0.0
# Testing
pytest>=7.3.0,<9.0.0
# Type checking (development)
mypy>=1.3.0,<2.0.0
# Utilities
tqdm>=4.65.0,<5.0.0
python-dateutil>=2.8.0,<3.0.0
Install everything:
cd prediction-markets
pip install -r requirements.txt
A note on version pinning: we use bounded version ranges (>=X.Y.Z,<(X+1).0.0) rather than exact pins (==X.Y.Z). This allows minor and patch updates (bug fixes, security patches) while blocking major version changes that might break our code. For truly reproducible environments, you can generate an exact lock file:
pip freeze > requirements-lock.txt
6.2 Essential Libraries Overview
Let us examine each library we installed and understand its role in our prediction market toolkit.
NumPy: Numerical Computing Foundation
NumPy provides the array data structure and mathematical operations that underpin virtually all scientific Python. When we calculate expected values, perform Monte Carlo simulations, or compute portfolio statistics, NumPy does the heavy lifting.
import numpy as np
# Example: Calculate expected value of a prediction market position
probabilities = np.array([0.6, 0.4]) # Win/lose probabilities
payoffs = np.array([0.40, -0.60]) # Profit/loss for each outcome
expected_value = np.dot(probabilities, payoffs)
print(f"Expected value: ${expected_value:.2f}") # $0.00
pandas: Data Manipulation and Analysis
pandas is our primary tool for working with structured data. Market prices, trade histories, and portfolio positions are all naturally represented as DataFrames — two-dimensional labeled data structures that support powerful filtering, grouping, and transformation operations.
import pandas as pd
# Example: Load and analyze market price history
prices = pd.DataFrame({
'timestamp': pd.date_range('2024-01-01', periods=100, freq='h'),
'yes_price': np.random.uniform(0.3, 0.7, 100).cumsum() / 100 + 0.4,
'volume': np.random.poisson(50, 100)
})
prices.set_index('timestamp', inplace=True)
# Rolling average price
prices['price_ma_24h'] = prices['yes_price'].rolling(24).mean()
print(prices.describe())
matplotlib and seaborn: Visualization
matplotlib is the foundational plotting library. seaborn builds on top of it with statistical visualizations and better default aesthetics. We will use both extensively for visualizing market prices, probability distributions, and calibration plots.
import matplotlib.pyplot as plt
import seaborn as sns
# Set a clean style for our work
sns.set_theme(style="whitegrid", palette="muted")
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(prices.index, prices['yes_price'], label='Yes Price', linewidth=1.5)
ax.plot(prices.index, prices['price_ma_24h'], label='24h Moving Average',
linewidth=2, linestyle='--')
ax.set_ylabel('Price ($)')
ax.set_title('Market Price History')
ax.legend()
plt.tight_layout()
plt.savefig('price_history.png', dpi=150)
SciPy: Scientific Computing
SciPy extends NumPy with optimization, statistics, and signal processing. We will use it for statistical tests (is a market well-calibrated?), optimization (what is the optimal portfolio?), and distribution fitting.
from scipy import stats
# Example: Test if market prices are well-calibrated
# (Are events priced at 70% actually happening 70% of the time?)
observed_frequency = 0.65 # 65% actually occurred
predicted_probability = 0.70
n_observations = 200
# Binomial test
result = stats.binomtest(
k=int(observed_frequency * n_observations),
n=n_observations,
p=predicted_probability
)
print(f"p-value: {result.pvalue:.4f}")
requests and httpx: HTTP Communication
requests is Python's most popular HTTP library — simple and reliable. httpx is a modern alternative that supports async operations, which matters when we need to poll multiple markets simultaneously.
import requests
import httpx
# Synchronous request with requests
response = requests.get(
"https://api.example.com/markets",
headers={"Authorization": "Bearer YOUR_API_KEY"},
timeout=30
)
# Async request with httpx (for concurrent API calls)
import asyncio
async def fetch_multiple_markets(market_ids):
async with httpx.AsyncClient() as client:
tasks = [
client.get(f"https://api.example.com/markets/{mid}")
for mid in market_ids
]
responses = await asyncio.gather(*tasks)
return responses
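To call the coroutine above from a synchronous script, hand it to an event loop; a brief sketch with placeholder market IDs:
market_ids = ["market-1", "market-2", "market-3"]  # placeholders
responses = asyncio.run(fetch_multiple_markets(market_ids))
data = [r.json() for r in responses if r.status_code == 200]
print(f"Fetched {len(data)} of {len(market_ids)} markets")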
Installation Verification
Run this script to verify all libraries are installed correctly:
#!/usr/bin/env python3
"""Verify all required libraries are installed and report their versions."""
import sys
def check_library(name, import_name=None):
"""Check if a library is importable and return its version."""
import_name = import_name or name
try:
module = __import__(import_name)
version = getattr(module, '__version__', 'unknown')
return True, version
except ImportError:
return False, None
libraries = [
("numpy", "numpy"),
("pandas", "pandas"),
("matplotlib", "matplotlib"),
("seaborn", "seaborn"),
("scipy", "scipy"),
("requests", "requests"),
("httpx", "httpx"),
("sqlalchemy", "sqlalchemy"),
("dotenv", "dotenv"),
("yaml", "yaml"),
("pytest", "pytest"),
("tqdm", "tqdm"),
]
print(f"Python version: {sys.version}")
print(f"{'Library':<20} {'Status':<10} {'Version':<15}")
print("-" * 45)
all_ok = True
for display_name, import_name in libraries:
ok, version = check_library(display_name, import_name)
status = "OK" if ok else "MISSING"
version_str = version if version else "N/A"
if not ok:
all_ok = False
print(f"{display_name:<20} {status:<10} {version_str:<15}")
print()
if all_ok:
print("All libraries installed successfully!")
else:
print("Some libraries are missing. Run: pip install -r requirements.txt")
6.3 Jupyter Notebooks vs Scripts
One of the most common questions in data-oriented Python work is: when should I use a Jupyter notebook, and when should I write a plain Python script? The answer is straightforward once you understand the strengths of each.
When to Use Jupyter Notebooks
Notebooks excel at exploration and communication:
- Exploratory data analysis: When you are first examining a new market, investigating price patterns, or testing a hypothesis, notebooks let you execute code in small chunks, see results immediately, and iterate quickly.
- Visualization development: Building and refining charts is much faster when you can see the output inline.
- Documentation and explanation: Notebooks interleave code, output, and Markdown text, making them ideal for explaining your analysis to others (or to your future self).
- Prototyping: When you are not sure what approach will work, notebooks let you try things rapidly.
When to Use Scripts
Scripts excel at automation and reliability:
- Data collection pipelines: A script that runs every hour to fetch market data should be a .py file, not a notebook.
- Production trading logic: Anything that executes trades should be a properly tested script with error handling.
- Reusable modules: Code that multiple notebooks or scripts import should live in .py files.
- Scheduled tasks: Cron jobs and task schedulers run scripts, not notebooks.
Setting Up Jupyter
If you installed from our requirements.txt, JupyterLab is already available:
# Launch JupyterLab
jupyter lab
# Or classic Jupyter Notebook
jupyter notebook
JupyterLab will open in your browser, typically at http://localhost:8888.
VS Code Integration
Visual Studio Code has excellent Jupyter support built in. With the Python extension installed, you can:
- Open .ipynb files directly in VS Code
- Create new notebooks from the command palette
- Run cells with Shift+Enter
- Use the variable explorer to inspect DataFrames
- Get full IntelliSense (autocomplete) in notebook cells
This is our recommended workflow: use VS Code as your primary editor for both scripts and notebooks. The integrated terminal, debugger, and Git support make it a powerful all-in-one environment.
Notebook Best Practices for Market Analysis
1. Start with imports and configuration at the top:
# Cell 1: Always the same structure
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
# Our utility module
import pmtools
# Configuration
sns.set_theme(style="whitegrid")
pd.set_option('display.max_columns', 50)
pd.set_option('display.float_format', '{:.4f}'.format)
%matplotlib inline
2. Use descriptive Markdown cells between code sections:
Do not write a notebook that is 50 consecutive code cells. Explain what you are doing and why.
3. Keep cells short and focused:
Each cell should do one thing. If a cell scrolls off the screen, it is too long.
4. Restart and run all before sharing:
Notebooks maintain state between cells, which means they can contain hidden dependencies on execution order. Before sharing a notebook or relying on its results, restart the kernel and run all cells from top to bottom. If it fails, fix it.
5. Never put secrets in notebooks:
Notebooks are often shared or committed to version control. Load API keys from environment variables:
import os
api_key = os.environ.get('POLYMARKET_API_KEY')
if not api_key:
raise ValueError("Set POLYMARKET_API_KEY environment variable")
6.4 Connecting to Prediction Market APIs
Now we arrive at the heart of this chapter: connecting to real prediction market platforms. Every major platform provides a REST API (and sometimes WebSocket connections for real-time data). The patterns are similar across platforms, even though the details differ.
Authentication Patterns
Prediction market APIs use several authentication methods:
API Key Authentication is the simplest. You register for an account, generate an API key, and include it in your HTTP headers:
headers = {
"Authorization": "Bearer your-api-key-here",
"Content-Type": "application/json"
}
response = requests.get("https://api.example.com/markets", headers=headers)
OAuth 2.0 is used by platforms that integrate with existing identity providers. The flow involves redirecting the user to a login page, receiving an authorization code, and exchanging it for an access token. This is more complex but more secure for applications that act on behalf of users.
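The details vary by platform, but the final step of that flow usually looks like the sketch below; the token endpoint and field names here are hypothetical.
import requests

def exchange_code_for_token(code: str, client_id: str, client_secret: str,
                            redirect_uri: str) -> dict:
    """Exchange an OAuth 2.0 authorization code for an access token."""
    response = requests.post(
        "https://auth.example.com/oauth/token",  # hypothetical token endpoint
        data={
            "grant_type": "authorization_code",
            "code": code,
            "client_id": client_id,
            "client_secret": client_secret,
            "redirect_uri": redirect_uri,
        },
        timeout=30,
    )
    response.raise_for_status()
    return response.json()  # typically contains access_token and expires_in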
Crypto Wallet Signing is used by blockchain-based platforms like Polymarket. Instead of a username/password, you prove your identity by signing a message with your Ethereum private key:
from eth_account.messages import encode_defunct
from web3 import Web3
w3 = Web3()
message = "Sign this message to authenticate with Polymarket"
encoded = encode_defunct(text=message)
signed = w3.eth.account.sign_message(encoded, private_key="0x...")
signature = signed.signature.hex()
Rate Limiting and Retry Logic
Every API enforces rate limits — restrictions on how many requests you can make per second or per minute. Exceeding these limits results in HTTP 429 (Too Many Requests) responses. A production-quality client must handle this gracefully.
The standard approach uses exponential backoff: when you receive a rate limit error, wait for a short time and retry. If it fails again, wait longer. The wait time grows exponentially (1 second, 2 seconds, 4 seconds, and so on) up to a maximum.
import time
import random
def request_with_retry(url, headers, max_retries=5, base_delay=1.0):
"""Make an HTTP request with exponential backoff retry logic."""
for attempt in range(max_retries):
response = requests.get(url, headers=headers, timeout=30)
if response.status_code == 200:
return response.json()
elif response.status_code == 429:
# Rate limited — back off
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
print(f"Rate limited. Waiting {delay:.1f}s (attempt {attempt + 1})")
time.sleep(delay)
elif response.status_code >= 500:
# Server error — retry
delay = base_delay * (2 ** attempt)
print(f"Server error {response.status_code}. Retrying in {delay:.1f}s")
time.sleep(delay)
else:
# Client error — do not retry
response.raise_for_status()
raise Exception(f"Failed after {max_retries} retries")
The addition of random.uniform(0, 1) is called jitter. If multiple clients hit the rate limit simultaneously and all retry after exactly the same delay, they will collide again. Jitter staggers the retries.
Error Handling
API calls can fail in many ways. Robust error handling distinguishes a toy project from a reliable tool:
import requests
from requests.exceptions import (
ConnectionError, Timeout, HTTPError, RequestException
)
def safe_api_call(url, headers=None, params=None):
"""Make an API call with comprehensive error handling."""
try:
response = requests.get(
url, headers=headers, params=params, timeout=30
)
response.raise_for_status()
return {"success": True, "data": response.json()}
except ConnectionError:
return {"success": False, "error": "Cannot connect to server"}
except Timeout:
return {"success": False, "error": "Request timed out"}
except HTTPError as e:
return {
"success": False,
"error": f"HTTP {e.response.status_code}: {e.response.text}"
}
except RequestException as e:
return {"success": False, "error": f"Request failed: {str(e)}"}
except ValueError:
return {"success": False, "error": "Invalid JSON response"}
Building a Robust API Client Base Class
Rather than repeating retry logic, error handling, and authentication in every API call, we encapsulate them in a base class. This follows the Template Method pattern from software engineering: the base class defines the structure, and subclasses fill in the details.
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Optional
import time
import logging
import requests
logger = logging.getLogger(__name__)
@dataclass
class APIResponse:
"""Standardized API response wrapper."""
success: bool
data: Optional[Any] = None
error: Optional[str] = None
status_code: Optional[int] = None
class PredictionMarketClient(ABC):
"""
Base class for prediction market API clients.
Provides retry logic, rate limiting, error handling,
and logging. Subclasses implement platform-specific details.
"""
def __init__(
self,
base_url: str,
api_key: Optional[str] = None,
max_retries: int = 3,
base_delay: float = 1.0,
requests_per_second: float = 5.0
):
self.base_url = base_url.rstrip('/')
self.api_key = api_key
self.max_retries = max_retries
self.base_delay = base_delay
self.min_request_interval = 1.0 / requests_per_second
self._last_request_time = 0.0
self.session = requests.Session()
self._setup_session()
@abstractmethod
def _setup_session(self):
"""Configure session headers, authentication, etc."""
pass
@abstractmethod
def _parse_markets(self, raw_data: Any) -> list:
"""Parse platform-specific market data into standard format."""
pass
def _throttle(self):
"""Enforce rate limiting between requests."""
elapsed = time.time() - self._last_request_time
if elapsed < self.min_request_interval:
time.sleep(self.min_request_interval - elapsed)
self._last_request_time = time.time()
def _request(
self,
method: str,
endpoint: str,
params: Optional[dict] = None,
json_data: Optional[dict] = None
) -> APIResponse:
"""Make an HTTP request with retry logic and error handling."""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
for attempt in range(self.max_retries):
self._throttle()
try:
response = self.session.request(
method=method,
url=url,
params=params,
json=json_data,
timeout=30
)
if response.status_code == 200:
return APIResponse(
success=True,
data=response.json(),
status_code=200
)
elif response.status_code == 429:
delay = self.base_delay * (2 ** attempt)
logger.warning(
f"Rate limited on {endpoint}. "
f"Retrying in {delay:.1f}s "
f"(attempt {attempt + 1}/{self.max_retries})"
)
time.sleep(delay)
elif response.status_code >= 500:
delay = self.base_delay * (2 ** attempt)
logger.warning(
f"Server error {response.status_code} on {endpoint}. "
f"Retrying in {delay:.1f}s"
)
time.sleep(delay)
else:
return APIResponse(
success=False,
error=response.text,
status_code=response.status_code
)
except requests.exceptions.ConnectionError:
logger.error(f"Connection error on {endpoint}")
if attempt < self.max_retries - 1:
time.sleep(self.base_delay * (2 ** attempt))
except requests.exceptions.Timeout:
logger.error(f"Timeout on {endpoint}")
if attempt < self.max_retries - 1:
time.sleep(self.base_delay)
return APIResponse(
success=False,
error=f"Failed after {self.max_retries} attempts"
)
def get(self, endpoint: str, params: Optional[dict] = None) -> APIResponse:
"""HTTP GET request."""
return self._request("GET", endpoint, params=params)
def post(self, endpoint: str, data: Optional[dict] = None) -> APIResponse:
"""HTTP POST request."""
return self._request("POST", endpoint, json_data=data)
def get_markets(self, **kwargs) -> APIResponse:
"""Fetch markets from the platform."""
response = self.get(self._markets_endpoint(), params=kwargs)
if response.success:
response.data = self._parse_markets(response.data)
return response
@abstractmethod
def _markets_endpoint(self) -> str:
"""Return the API endpoint for fetching markets."""
pass
We will build complete subclasses for specific platforms in the code examples (see example-02-api-client.py). The key insight here is that 90% of the work — retry logic, rate limiting, error handling, logging — is the same regardless of which platform you connect to. The base class captures that commonality.
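As a preview, here is a minimal sketch of what a platform subclass looks like; the endpoint path and response fields are hypothetical, and the real implementations appear in example-02-api-client.py.
class ExampleMarketClient(PredictionMarketClient):
    """Client for a hypothetical platform exposing a simple REST API."""

    def _setup_session(self):
        # Attach API-key authentication to every request in the session
        if self.api_key:
            self.session.headers.update({
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            })

    def _markets_endpoint(self) -> str:
        return "/markets"

    def _parse_markets(self, raw_data):
        # Normalize the platform payload into plain dictionaries
        return [
            {
                "id": m["id"],
                "title": m.get("title", ""),
                "yes_price": float(m.get("yes_price", 0.0)),
                "no_price": float(m.get("no_price", 0.0)),
            }
            for m in raw_data.get("markets", [])
        ]

# Usage:
# client = ExampleMarketClient("https://api.example.com", api_key="...")
# markets = client.get_markets(limit=50)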
6.5 Building the Prediction Markets Utility Module
Throughout this book, we will reference a utility module called pmtools. Rather than duplicating code in every chapter, we define it once here. This section walks through each component.
Module Structure
pmtools/
├── __init__.py # Package initialization, convenience imports
├── api_client.py # The base class from Section 6.4, plus implementations
├── data_models.py # Data classes for markets, orders, positions
├── visualization.py # Reusable plotting functions
├── probability.py # Probability calculations and conversions
└── database.py # SQLite database helper
__init__.py: Package Initialization
"""
pmtools — Prediction Markets Toolkit
=====================================
A utility library for prediction market analysis, used throughout
"Learning Prediction Markets: From Concepts to Strategies."
Usage:
import pmtools
client = pmtools.PolymarketClient(api_key="...")
markets = client.get_markets()
"""
__version__ = "0.1.0"
from .probability import (
implied_probability,
probability_to_price,
expected_value,
kelly_fraction,
brier_score,
log_score,
)
from .data_models import Market, Order, Position, TradeRecord
from .visualization import (
plot_price_history,
plot_probability_fan,
plot_calibration,
plot_volume_bars,
setup_plot_style,
)
from .database import MarketDatabase
probability.py: Probability Calculations
This module contains the mathematical functions we developed in Chapters 3 and 4, now implemented as clean, tested Python functions:
"""Probability calculations for prediction market analysis."""
import numpy as np
from typing import Optional, Sequence
def implied_probability(
yes_price: float,
no_price: Optional[float] = None
) -> float:
"""
Calculate implied probability from market prices.
In a simple binary market, the yes price IS the implied probability
if prices are normalized. With an overround (vig), we need to adjust.
Args:
yes_price: Price of a "Yes" share (0 to 1)
no_price: Price of a "No" share (0 to 1). If None, assumes 1 - yes_price.
Returns:
Implied probability (0 to 1)
"""
if no_price is None:
return yes_price
# Remove overround (vig)
total = yes_price + no_price
if total == 0:
return 0.5
return yes_price / total
def probability_to_price(
probability: float,
overround: float = 0.0
) -> tuple[float, float]:
"""
Convert a probability to yes/no prices with optional overround.
Args:
probability: True probability (0 to 1)
overround: Market overround/vig (e.g., 0.02 for 2%)
Returns:
Tuple of (yes_price, no_price)
"""
half_vig = overround / 2
yes_price = probability + half_vig
no_price = (1 - probability) + half_vig
return (yes_price, no_price)
def expected_value(
probability: float,
price: float,
side: str = "yes"
) -> float:
"""
Calculate expected value of a prediction market position.
Args:
probability: Your estimated true probability
price: Current market price
side: "yes" or "no"
Returns:
Expected value per dollar risked
"""
if side == "yes":
# Buy yes at `price`, win (1 - price) if yes, lose price if no
ev = probability * (1 - price) - (1 - probability) * price
else:
# Buy no at (1 - price), win price if no, lose (1 - price) if yes
ev = (1 - probability) * price - probability * (1 - price)
return ev
def kelly_fraction(
probability: float,
price: float,
side: str = "yes",
fractional: float = 1.0
) -> float:
"""
Calculate Kelly criterion bet size.
The Kelly criterion gives the optimal fraction of bankroll to wager
for maximum long-run growth rate.
Args:
probability: Your estimated true probability
price: Current market price
side: "yes" or "no"
fractional: Kelly fraction (0.5 = half-Kelly, more conservative)
Returns:
Fraction of bankroll to bet (0 means do not bet)
"""
if side == "yes":
# Odds: win (1-price)/price to 1
b = (1 - price) / price # Odds ratio
p = probability
else:
b = price / (1 - price)
p = 1 - probability
# Kelly formula: f* = (bp - q) / b where q = 1 - p
q = 1 - p
if b <= 0:
return 0.0
f = (b * p - q) / b
return max(0.0, f * fractional)
def brier_score(
probabilities: Sequence[float],
outcomes: Sequence[int]
) -> float:
"""
Calculate the Brier score for a set of predictions.
The Brier score measures the accuracy of probabilistic predictions.
Lower is better. Perfect predictions score 0, worst possible is 2.
Args:
probabilities: Predicted probabilities (0 to 1)
outcomes: Actual outcomes (0 or 1)
Returns:
Brier score (0 to 1)
"""
probs = np.array(probabilities)
outs = np.array(outcomes)
return float(np.mean((probs - outs) ** 2))
def log_score(
probabilities: Sequence[float],
outcomes: Sequence[int],
epsilon: float = 1e-10
) -> float:
"""
Calculate the logarithmic scoring rule.
More sensitive to confident wrong predictions than the Brier score.
More negative is worse. Perfect predictions approach 0.
Args:
probabilities: Predicted probabilities (0 to 1)
outcomes: Actual outcomes (0 or 1)
epsilon: Small value to avoid log(0)
Returns:
Mean log score (negative, closer to 0 is better)
"""
probs = np.clip(probabilities, epsilon, 1 - epsilon)
outs = np.array(outcomes)
scores = outs * np.log(probs) + (1 - outs) * np.log(1 - probs)
return float(np.mean(scores))
def overround(yes_price: float, no_price: float) -> float:
"""
Calculate the overround (vig) in a market.
A fair market has an overround of 0. Positive overround means
the market maker takes a cut.
Args:
yes_price: Price of Yes shares
no_price: Price of No shares
Returns:
Overround as a fraction (e.g., 0.02 = 2%)
"""
return yes_price + no_price - 1.0
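A short usage example ties these functions together; the prices and the 65% belief below are made-up numbers.
from pmtools.probability import implied_probability, expected_value, kelly_fraction

p_market = implied_probability(yes_price=0.60, no_price=0.42)      # 0.60 / 1.02 ≈ 0.588
ev = expected_value(probability=0.65, price=0.60, side="yes")      # 0.65*0.40 - 0.35*0.60 = 0.05
stake = kelly_fraction(probability=0.65, price=0.60, side="yes",
                       fractional=0.5)                             # half-Kelly ≈ 0.0625
print(f"Implied: {p_market:.3f}  EV per $1: {ev:+.3f}  Bet fraction: {stake:.3f}")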
data_models.py: Data Structures
We use Python dataclasses to define clean, typed data structures:
"""Data models for prediction market entities."""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from enum import Enum
class MarketStatus(Enum):
OPEN = "open"
CLOSED = "closed"
RESOLVED = "resolved"
class OrderSide(Enum):
YES = "yes"
NO = "no"
class OrderType(Enum):
MARKET = "market"
LIMIT = "limit"
@dataclass
class Market:
"""Represents a prediction market."""
id: str
title: str
description: str
status: MarketStatus
yes_price: float
no_price: float
volume: float
created_at: datetime
close_date: Optional[datetime] = None
resolved_at: Optional[datetime] = None
resolution: Optional[bool] = None # True=Yes, False=No, None=unresolved
category: str = ""
platform: str = ""
url: str = ""
@property
def implied_probability(self) -> float:
"""Implied probability from current prices."""
total = self.yes_price + self.no_price
if total == 0:
return 0.5
return self.yes_price / total
@property
def overround(self) -> float:
"""Market overround (vig)."""
return self.yes_price + self.no_price - 1.0
@property
def is_active(self) -> bool:
"""Whether the market is currently tradeable."""
return self.status == MarketStatus.OPEN
@dataclass
class Order:
"""Represents an order to buy or sell shares."""
market_id: str
side: OrderSide
order_type: OrderType
quantity: float
price: Optional[float] = None # Required for limit orders
timestamp: Optional[datetime] = None
def __post_init__(self):
if self.order_type == OrderType.LIMIT and self.price is None:
raise ValueError("Limit orders must have a price")
if self.timestamp is None:
self.timestamp = datetime.now()
@dataclass
class Position:
"""Represents a current position in a market."""
market_id: str
side: OrderSide
quantity: float
average_price: float
current_price: float = 0.0
@property
def unrealized_pnl(self) -> float:
"""Unrealized profit/loss."""
if self.side == OrderSide.YES:
return (self.current_price - self.average_price) * self.quantity
else:
return (self.average_price - self.current_price) * self.quantity
@property
def market_value(self) -> float:
"""Current market value of position."""
return self.current_price * self.quantity
@dataclass
class TradeRecord:
"""Record of an executed trade."""
trade_id: str
market_id: str
side: OrderSide
quantity: float
price: float
timestamp: datetime
fees: float = 0.0
@property
def total_cost(self) -> float:
"""Total cost including fees."""
return self.price * self.quantity + self.fees
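A brief usage example with illustrative values shows how the properties behave:
from datetime import datetime
from pmtools.data_models import Market, MarketStatus, Order, OrderSide, OrderType

market = Market(
    id="example-123",
    title="Will it rain in London tomorrow?",
    description="Resolves Yes if measurable rain is recorded.",
    status=MarketStatus.OPEN,
    yes_price=0.62,
    no_price=0.40,
    volume=15_000.0,
    created_at=datetime(2024, 1, 1),
)
print(market.implied_probability)  # 0.62 / 1.02 ≈ 0.608
print(market.overround)            # 0.62 + 0.40 - 1.0 ≈ 0.02
print(market.is_active)            # True

order = Order(market_id=market.id, side=OrderSide.YES,
              order_type=OrderType.LIMIT, quantity=100, price=0.61)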
visualization.py: Plotting Functions
Section 6.7 covers this in detail. The key functions are plot_price_history, plot_probability_fan, plot_calibration, and plot_volume_bars.
database.py: Database Helper
Section 6.6 covers this in detail. The MarketDatabase class wraps SQLite operations for storing and retrieving market data.
The complete pmtools module, assembled as a single reference file, is available in example-03-pmtools-module.py.
6.6 Data Storage and Management
Prediction market analysis generates data: price histories, trade records, portfolio snapshots, and analysis results. You need a reliable, structured way to store this data. We will use a three-tier approach:
- SQLite for structured, queryable data (market metadata, trade history)
- pandas DataFrames for in-memory analysis
- CSV/Parquet files for data exchange and archival
SQLite: Your Local Database
SQLite is a database engine that stores everything in a single file. It requires no server, no configuration, and no separate installation — Python includes it in the standard library. For a single-user analysis workflow, it is ideal.
Here is our database schema for market data:
-- Markets table: core market information
CREATE TABLE IF NOT EXISTS markets (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT,
platform TEXT NOT NULL,
category TEXT,
status TEXT DEFAULT 'open',
created_at TIMESTAMP,
close_date TIMESTAMP,
resolved_at TIMESTAMP,
resolution INTEGER, -- 1=Yes, 0=No, NULL=unresolved
url TEXT
);
-- Price snapshots: periodic price recordings
CREATE TABLE IF NOT EXISTS price_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
market_id TEXT NOT NULL,
timestamp TIMESTAMP NOT NULL,
yes_price REAL NOT NULL,
no_price REAL NOT NULL,
volume REAL,
FOREIGN KEY (market_id) REFERENCES markets(id)
);
-- Trades: your personal trade history
CREATE TABLE IF NOT EXISTS trades (
id TEXT PRIMARY KEY,
market_id TEXT NOT NULL,
side TEXT NOT NULL, -- 'yes' or 'no'
quantity REAL NOT NULL,
price REAL NOT NULL,
timestamp TIMESTAMP NOT NULL,
fees REAL DEFAULT 0,
FOREIGN KEY (market_id) REFERENCES markets(id)
);
-- Create indexes for common queries
CREATE INDEX IF NOT EXISTS idx_price_snapshots_market
ON price_snapshots(market_id, timestamp);
CREATE INDEX IF NOT EXISTS idx_trades_market
ON trades(market_id);
CREATE INDEX IF NOT EXISTS idx_trades_timestamp
ON trades(timestamp);
Python Database Helper
"""Database helper for prediction market data storage."""
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Optional
import pandas as pd
import logging
logger = logging.getLogger(__name__)
class MarketDatabase:
"""SQLite database wrapper for prediction market data."""
def __init__(self, db_path: str = "data/markets.db"):
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._init_db()
def _get_connection(self) -> sqlite3.Connection:
"""Create a database connection with proper settings."""
conn = sqlite3.connect(
str(self.db_path),
detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES
)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL") # Better concurrent access
conn.execute("PRAGMA foreign_keys=ON")
return conn
def _init_db(self):
"""Initialize database schema."""
conn = self._get_connection()
try:
conn.executescript("""
CREATE TABLE IF NOT EXISTS markets (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT,
platform TEXT NOT NULL,
category TEXT,
status TEXT DEFAULT 'open',
created_at TIMESTAMP,
close_date TIMESTAMP,
resolved_at TIMESTAMP,
resolution INTEGER,
url TEXT
);
CREATE TABLE IF NOT EXISTS price_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
market_id TEXT NOT NULL,
timestamp TIMESTAMP NOT NULL,
yes_price REAL NOT NULL,
no_price REAL NOT NULL,
volume REAL,
FOREIGN KEY (market_id) REFERENCES markets(id)
);
CREATE TABLE IF NOT EXISTS trades (
id TEXT PRIMARY KEY,
market_id TEXT NOT NULL,
side TEXT NOT NULL,
quantity REAL NOT NULL,
price REAL NOT NULL,
timestamp TIMESTAMP NOT NULL,
fees REAL DEFAULT 0,
FOREIGN KEY (market_id) REFERENCES markets(id)
);
CREATE INDEX IF NOT EXISTS idx_snapshots_market_time
ON price_snapshots(market_id, timestamp);
CREATE INDEX IF NOT EXISTS idx_trades_market
ON trades(market_id);
""")
conn.commit()
logger.info(f"Database initialized at {self.db_path}")
finally:
conn.close()
def save_market(self, market: dict):
"""Save or update a market record."""
conn = self._get_connection()
try:
conn.execute("""
INSERT OR REPLACE INTO markets
(id, title, description, platform, category, status,
created_at, close_date, resolved_at, resolution, url)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
market['id'], market['title'], market.get('description', ''),
market.get('platform', 'unknown'), market.get('category', ''),
market.get('status', 'open'), market.get('created_at'),
market.get('close_date'), market.get('resolved_at'),
market.get('resolution'), market.get('url', '')
))
conn.commit()
finally:
conn.close()
def save_price_snapshot(
self,
market_id: str,
yes_price: float,
no_price: float,
volume: Optional[float] = None,
timestamp: Optional[datetime] = None
):
"""Record a price snapshot."""
if timestamp is None:
timestamp = datetime.now()
conn = self._get_connection()
try:
conn.execute("""
INSERT INTO price_snapshots
(market_id, timestamp, yes_price, no_price, volume)
VALUES (?, ?, ?, ?, ?)
""", (market_id, timestamp, yes_price, no_price, volume))
conn.commit()
finally:
conn.close()
def get_price_history(
self,
market_id: str,
start: Optional[datetime] = None,
end: Optional[datetime] = None
) -> pd.DataFrame:
"""Get price history as a pandas DataFrame."""
conn = self._get_connection()
try:
query = """
SELECT timestamp, yes_price, no_price, volume
FROM price_snapshots
WHERE market_id = ?
"""
params = [market_id]
if start:
query += " AND timestamp >= ?"
params.append(start)
if end:
query += " AND timestamp <= ?"
params.append(end)
query += " ORDER BY timestamp"
df = pd.read_sql_query(query, conn, params=params,
parse_dates=['timestamp'])
if not df.empty:
df.set_index('timestamp', inplace=True)
return df
finally:
conn.close()
def get_all_markets(self, status: Optional[str] = None) -> pd.DataFrame:
"""Get all markets as a DataFrame."""
conn = self._get_connection()
try:
query = "SELECT * FROM markets"
params = []
if status:
query += " WHERE status = ?"
params.append(status)
return pd.read_sql_query(query, conn, params=params)
finally:
conn.close()
def save_trade(self, trade: dict):
"""Record a trade."""
conn = self._get_connection()
try:
conn.execute("""
INSERT INTO trades (id, market_id, side, quantity, price,
timestamp, fees)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
trade['id'], trade['market_id'], trade['side'],
trade['quantity'], trade['price'], trade['timestamp'],
trade.get('fees', 0)
))
conn.commit()
finally:
conn.close()
def get_trades(self, market_id: Optional[str] = None) -> pd.DataFrame:
"""Get trade history as a DataFrame."""
conn = self._get_connection()
try:
if market_id:
query = "SELECT * FROM trades WHERE market_id = ? ORDER BY timestamp"
return pd.read_sql_query(query, conn, params=[market_id],
parse_dates=['timestamp'])
else:
query = "SELECT * FROM trades ORDER BY timestamp"
return pd.read_sql_query(query, conn, parse_dates=['timestamp'])
finally:
conn.close()
pandas DataFrames for Analysis
Once data is in a DataFrame, the full power of pandas is available:
# Load price history from database
db = MarketDatabase("data/markets.db")
prices = db.get_price_history("market-123")
# Calculate daily returns
prices['daily_return'] = prices['yes_price'].pct_change()
# Rolling volatility (20-period)
prices['volatility'] = prices['daily_return'].rolling(20).std()
# Find the biggest price moves
big_moves = prices[prices['daily_return'].abs() > 0.05]
print(f"Found {len(big_moves)} price moves > 5%")
CSV vs Parquet
For data exchange, CSV is universal but inefficient for large datasets. Parquet is a columnar format that is smaller (compressed), faster to read, and preserves data types. Use Parquet for internal storage and CSV when sharing with non-Python tools:
# Save to Parquet (recommended for internal use)
prices.to_parquet("data/processed/market_123_prices.parquet")
# Save to CSV (for sharing)
prices.to_csv("data/processed/market_123_prices.csv")
# Read back
prices_from_parquet = pd.read_parquet("data/processed/market_123_prices.parquet")
prices_from_csv = pd.read_csv("data/processed/market_123_prices.csv",
parse_dates=['timestamp'], index_col='timestamp')
6.7 Visualization Toolkit
Clear, consistent visualizations are essential for understanding market behavior. In this section, we establish the plotting style for the entire book and build reusable functions for the chart types we will use most frequently.
Style Configuration
We define a consistent visual style:
"""Visualization toolkit for prediction market analysis."""
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib.ticker as mticker
import seaborn as sns
import numpy as np
import pandas as pd
from typing import Optional, Sequence
# Book style configuration
BOOK_STYLE = {
'figure.figsize': (10, 5),
'figure.dpi': 150,
'axes.titlesize': 14,
'axes.labelsize': 12,
'xtick.labelsize': 10,
'ytick.labelsize': 10,
'legend.fontsize': 10,
'lines.linewidth': 1.5,
'axes.grid': True,
'grid.alpha': 0.3,
'axes.spines.top': False,
'axes.spines.right': False,
}
# Color palette for prediction market charts
COLORS = {
'yes': '#2196F3', # Blue for Yes
'no': '#F44336', # Red for No
'neutral': '#9E9E9E', # Gray for reference lines
'positive': '#4CAF50', # Green for profit
'negative': '#FF5722', # Orange-red for loss
'highlight': '#FFC107', # Amber for highlights
}
def setup_plot_style():
"""Apply the book's standard plotting style."""
plt.rcParams.update(BOOK_STYLE)
sns.set_theme(style="whitegrid", palette="muted")
Price History Plot
The most common chart in prediction market analysis:
def plot_price_history(
df: pd.DataFrame,
title: str = "Market Price History",
yes_col: str = "yes_price",
no_col: Optional[str] = "no_price",
volume_col: Optional[str] = "volume",
show_events: Optional[list] = None,
figsize: tuple = (12, 6),
save_path: Optional[str] = None
) -> plt.Figure:
"""
Plot prediction market price history with optional volume bars.
Args:
df: DataFrame with DatetimeIndex and price columns
title: Chart title
yes_col: Column name for Yes prices
no_col: Column name for No prices (None to skip)
volume_col: Column name for volume (None to skip)
show_events: List of (datetime, label) tuples for event markers
figsize: Figure size
save_path: Path to save the figure (None for display only)
Returns:
matplotlib Figure object
"""
setup_plot_style()
has_volume = volume_col and volume_col in df.columns
nrows = 2 if has_volume else 1
height_ratios = [3, 1] if has_volume else [1]
fig, axes = plt.subplots(
nrows, 1, figsize=figsize,
gridspec_kw={'height_ratios': height_ratios},
sharex=True
)
if not has_volume:
axes = [axes]
# Price plot
ax_price = axes[0]
ax_price.plot(df.index, df[yes_col], color=COLORS['yes'],
label='Yes Price', linewidth=1.5)
if no_col and no_col in df.columns:
ax_price.plot(df.index, df[no_col], color=COLORS['no'],
label='No Price', linewidth=1.5, alpha=0.7)
# Reference line at 0.5
ax_price.axhline(y=0.5, color=COLORS['neutral'], linestyle='--',
alpha=0.5, label='50%')
ax_price.set_ylabel('Price ($)')
ax_price.set_title(title, fontsize=14, fontweight='bold')
ax_price.set_ylim(-0.02, 1.02)
ax_price.yaxis.set_major_formatter(mticker.FormatStrFormatter('$%.2f'))
ax_price.legend(loc='upper left')
# Add event markers
if show_events:
for event_time, event_label in show_events:
ax_price.axvline(x=event_time, color=COLORS['highlight'],
linestyle=':', alpha=0.8)
ax_price.annotate(
event_label, xy=(event_time, 0.95),
fontsize=8, rotation=45, ha='right',
color=COLORS['highlight']
)
# Volume plot
if has_volume:
ax_vol = axes[1]
ax_vol.bar(df.index, df[volume_col], color=COLORS['neutral'],
alpha=0.6, width=0.8)
ax_vol.set_ylabel('Volume')
ax_vol.set_xlabel('Date')
plt.tight_layout()
if save_path:
fig.savefig(save_path, dpi=150, bbox_inches='tight')
return fig
Probability Fan Chart
A fan chart shows a range of possible future probability paths, useful for visualizing uncertainty:
def plot_probability_fan(
current_prob: float,
hours_ahead: int = 168,
volatility: float = 0.02,
n_simulations: int = 1000,
title: str = "Probability Fan Chart",
figsize: tuple = (10, 6),
save_path: Optional[str] = None
) -> plt.Figure:
"""
Plot a fan chart showing possible future probability paths.
Uses geometric Brownian motion-like simulation bounded to [0, 1].
Args:
current_prob: Current market probability
hours_ahead: Number of hours to project forward
volatility: Per-step volatility (standard deviation)
n_simulations: Number of simulation paths
title: Chart title
figsize: Figure size
save_path: Path to save figure
Returns:
matplotlib Figure object
"""
setup_plot_style()
# Simulate paths using logit-normal random walk
np.random.seed(42)
logit_current = np.log(current_prob / (1 - current_prob))
paths = np.zeros((n_simulations, hours_ahead + 1))
paths[:, 0] = current_prob
for t in range(1, hours_ahead + 1):
logit_values = np.log(paths[:, t-1] / (1 - paths[:, t-1]))
logit_values += np.random.normal(0, volatility, n_simulations)
paths[:, t] = 1 / (1 + np.exp(-logit_values))
# Calculate percentiles
hours = np.arange(hours_ahead + 1)
p5 = np.percentile(paths, 5, axis=0)
p25 = np.percentile(paths, 25, axis=0)
p50 = np.percentile(paths, 50, axis=0)
p75 = np.percentile(paths, 75, axis=0)
p95 = np.percentile(paths, 95, axis=0)
fig, ax = plt.subplots(figsize=figsize)
# Fan bands
ax.fill_between(hours, p5, p95, alpha=0.15, color=COLORS['yes'],
label='5th-95th percentile')
ax.fill_between(hours, p25, p75, alpha=0.3, color=COLORS['yes'],
label='25th-75th percentile')
ax.plot(hours, p50, color=COLORS['yes'], linewidth=2, label='Median')
# Reference lines
ax.axhline(y=0.5, color=COLORS['neutral'], linestyle='--', alpha=0.5)
ax.axhline(y=current_prob, color=COLORS['highlight'], linestyle=':',
alpha=0.7, label=f'Current ({current_prob:.0%})')
ax.set_xlabel('Hours Ahead')
ax.set_ylabel('Probability')
ax.set_title(title, fontsize=14, fontweight='bold')
ax.set_ylim(0, 1)
ax.yaxis.set_major_formatter(mticker.PercentFormatter(1.0))
ax.legend()
plt.tight_layout()
if save_path:
fig.savefig(save_path, dpi=150, bbox_inches='tight')
return fig
Calibration Plot
A calibration plot shows how well predicted probabilities match observed frequencies — the heart of forecast evaluation:
def plot_calibration(
predicted: Sequence[float],
actual: Sequence[int],
n_bins: int = 10,
title: str = "Calibration Plot",
figsize: tuple = (8, 8),
save_path: Optional[str] = None
) -> plt.Figure:
"""
Plot a calibration curve comparing predicted probabilities to outcomes.
Args:
predicted: Predicted probabilities (0 to 1)
actual: Actual outcomes (0 or 1)
n_bins: Number of bins for calibration
title: Chart title
figsize: Figure size
save_path: Path to save figure
Returns:
matplotlib Figure object
"""
setup_plot_style()
predicted = np.array(predicted)
actual = np.array(actual)
# Bin predictions and calculate observed frequency in each bin
bin_edges = np.linspace(0, 1, n_bins + 1)
bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
bin_counts = np.zeros(n_bins)
bin_correct = np.zeros(n_bins)
for i in range(n_bins):
mask = (predicted >= bin_edges[i]) & (predicted < bin_edges[i + 1])
bin_counts[i] = mask.sum()
if bin_counts[i] > 0:
bin_correct[i] = actual[mask].mean()
else:
bin_correct[i] = np.nan
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=figsize,
gridspec_kw={'height_ratios': [3, 1]})
# Calibration curve
valid = ~np.isnan(bin_correct)
ax1.plot(bin_centers[valid], bin_correct[valid], 'o-',
color=COLORS['yes'], linewidth=2, markersize=8,
label='Model')
ax1.plot([0, 1], [0, 1], '--', color=COLORS['neutral'],
label='Perfect calibration')
ax1.set_xlabel('Predicted Probability')
ax1.set_ylabel('Observed Frequency')
ax1.set_title(title, fontsize=14, fontweight='bold')
ax1.set_xlim(-0.02, 1.02)
ax1.set_ylim(-0.02, 1.02)
ax1.set_aspect('equal')
ax1.legend()
# Histogram of predictions
ax2.bar(bin_centers, bin_counts, width=1/n_bins * 0.8,
color=COLORS['yes'], alpha=0.6, edgecolor='white')
ax2.set_xlabel('Predicted Probability')
ax2.set_ylabel('Count')
plt.tight_layout()
if save_path:
fig.savefig(save_path, dpi=150, bbox_inches='tight')
return fig
Volume Bars
def plot_volume_bars(
df: pd.DataFrame,
volume_col: str = "volume",
title: str = "Trading Volume",
resample_freq: str = "D",
figsize: tuple = (12, 4),
save_path: Optional[str] = None
) -> plt.Figure:
"""
Plot trading volume as a bar chart, optionally resampled.
Args:
df: DataFrame with DatetimeIndex
volume_col: Column name for volume data
title: Chart title
resample_freq: Resampling frequency ('h', 'D', 'W')
figsize: Figure size
save_path: Path to save figure
Returns:
matplotlib Figure object
"""
setup_plot_style()
if resample_freq:
volume_data = df[volume_col].resample(resample_freq).sum()
else:
volume_data = df[volume_col]
fig, ax = plt.subplots(figsize=figsize)
ax.bar(volume_data.index, volume_data.values,
color=COLORS['yes'], alpha=0.7, edgecolor='white')
ax.set_ylabel('Volume')
ax.set_title(title, fontsize=14, fontweight='bold')
ax.xaxis.set_major_formatter(mdates.DateFormatter('%b %d'))
plt.xticks(rotation=45)
plt.tight_layout()
if save_path:
fig.savefig(save_path, dpi=150, bbox_inches='tight')
return fig
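Putting the pieces together: the sketch below loads a stored price history from the Section 6.6 database and plots it. The market ID and output path are illustrative.
from pmtools.database import MarketDatabase
from pmtools.visualization import plot_price_history

db = MarketDatabase("data/markets.db")
history = db.get_price_history("market-123")   # hypothetical market ID
if not history.empty:
    plot_price_history(history, title="Market 123 Price History",
                       save_path="data/processed/market_123_prices.png")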
6.8 Configuration and Secrets Management
One of the most common mistakes beginners make is hardcoding API keys, passwords, or other secrets directly in their source code. This is dangerous: if you commit that code to a Git repository (especially a public one), your credentials are exposed to the world. Automated bots scan GitHub for leaked API keys and exploit them within minutes.
The solution is to store secrets in environment variables and load them at runtime.
Using python-dotenv
The python-dotenv library reads key-value pairs from a .env file and makes them available as environment variables:
.env file (never commit this):
# Prediction Market API Keys
POLYMARKET_API_KEY=pk_live_abc123def456
KALSHI_API_KEY=kalshi_prod_xyz789
KALSHI_SECRET=s3cr3t_k3y_here
# Database
DATABASE_PATH=data/markets.db
# Logging
LOG_LEVEL=INFO
Loading in Python:
import os
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Access them
polymarket_key = os.environ.get('POLYMARKET_API_KEY')
kalshi_key = os.environ.get('KALSHI_API_KEY')
db_path = os.environ.get('DATABASE_PATH', 'data/markets.db')
log_level = os.environ.get('LOG_LEVEL', 'INFO')
# Validate required keys
if not polymarket_key:
raise ValueError(
"POLYMARKET_API_KEY not set. "
"Add it to your .env file or set the environment variable."
)
Configuration File Pattern
For non-secret configuration, use a YAML file:
configs/settings.yaml:
# Application settings (no secrets here!)
data:
database_path: "data/markets.db"
raw_data_dir: "data/raw"
processed_data_dir: "data/processed"
api:
max_retries: 3
base_delay: 1.0
requests_per_second: 5
collection:
snapshot_interval_minutes: 15
markets_to_track:
- "presidential-election-2024"
- "fed-rate-decision-march"
visualization:
default_figsize: [10, 5]
dpi: 150
style: "whitegrid"
logging:
level: "INFO"
file: "logs/app.log"
max_bytes: 10485760 # 10 MB
backup_count: 5
Loading configuration:
import yaml
from pathlib import Path
def load_config(config_path: str = "configs/settings.yaml") -> dict:
"""Load application configuration from YAML file."""
path = Path(config_path)
if not path.exists():
raise FileNotFoundError(f"Config file not found: {config_path}")
with open(path, 'r') as f:
config = yaml.safe_load(f)
return config
# Usage
config = load_config()
db_path = config['data']['database_path']
max_retries = config['api']['max_retries']
.gitignore Setup
Your .gitignore file should prevent secrets, data files, and generated artifacts from being committed:
# Secrets - NEVER commit these
.env
.env.*
*.pem
*.key
# Data files (too large for git, may contain sensitive info)
data/
*.db
*.sqlite
*.parquet
*.csv
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
*.egg-info/
dist/
build/
.eggs/
# Virtual environments
venv/
env/
.venv/
*.env
# Jupyter
.ipynb_checkpoints/
# IDE
.vscode/
.idea/
*.swp
*.swo
# OS
.DS_Store
Thumbs.db
# Logs
logs/
*.log
The Configuration Hierarchy
In practice, configuration comes from multiple sources. Here is the order of precedence (highest to lowest):
- Command-line arguments (most specific)
- Environment variables (including .env)
- Configuration files (e.g., settings.yaml)
- Default values in code (most general)
import os
import argparse
import yaml
def get_config():
"""Build configuration from all sources."""
# 1. Defaults
config = {
'db_path': 'data/markets.db',
'log_level': 'INFO',
'max_retries': 3,
}
# 2. Config file
try:
with open('configs/settings.yaml', 'r') as f:
file_config = yaml.safe_load(f)
if file_config:
config.update(_flatten_dict(file_config))
except FileNotFoundError:
pass
# 3. Environment variables
env_mapping = {
'DATABASE_PATH': 'db_path',
'LOG_LEVEL': 'log_level',
'MAX_RETRIES': 'max_retries',
}
for env_key, config_key in env_mapping.items():
value = os.environ.get(env_key)
if value is not None:
config[config_key] = value
# 4. Command-line arguments (parsed elsewhere)
return config
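The listing above leaves two pieces out: the _flatten_dict helper it calls and the command-line layer. Minimal sketches of both follow; the flag names are illustrative.
def _flatten_dict(d: dict, parent_key: str = "", sep: str = "_") -> dict:
    """Flatten nested dicts, e.g. {'data': {'database_path': x}} -> {'data_database_path': x}."""
    items = {}
    for key, value in d.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            items.update(_flatten_dict(value, new_key, sep=sep))
        else:
            items[new_key] = value
    return items

def apply_cli_overrides(config: dict) -> dict:
    """Apply command-line arguments, the highest-precedence layer."""
    parser = argparse.ArgumentParser(description="Prediction market tools")
    parser.add_argument("--db-path", help="Override the database path")
    parser.add_argument("--log-level", help="Override the log level")
    args = parser.parse_args()
    if args.db_path:
        config['db_path'] = args.db_path
    if args.log_level:
        config['log_level'] = args.log_level
    return config

# config = apply_cli_overrides(get_config())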
6.9 Logging and Debugging
When things go wrong — and they will — logging is your best friend. Print statements are tempting but inadequate for any serious work: they cannot be selectively silenced, they are awkward to redirect to a log file, and they carry no timestamps or severity levels.
Python Logging Setup
import logging
import logging.handlers
from pathlib import Path
def setup_logging(
level: str = "INFO",
log_file: str = "logs/app.log",
max_bytes: int = 10_485_760, # 10 MB
backup_count: int = 5
):
"""
Configure application logging.
Args:
level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
log_file: Path to log file
max_bytes: Maximum log file size before rotation
backup_count: Number of backup log files to keep
"""
# Create logs directory
log_path = Path(log_file)
log_path.parent.mkdir(parents=True, exist_ok=True)
# Root logger configuration
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, level.upper()))
# Format
formatter = logging.Formatter(
'%(asctime)s | %(name)-20s | %(levelname)-8s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
# File handler with rotation
file_handler = logging.handlers.RotatingFileHandler(
log_file,
maxBytes=max_bytes,
backupCount=backup_count
)
file_handler.setLevel(logging.DEBUG) # File receives everything the root logger lets through
file_handler.setFormatter(formatter)
# Add handlers
root_logger.addHandler(console_handler)
root_logger.addHandler(file_handler)
# Reduce noise from third-party libraries
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('requests').setLevel(logging.WARNING)
logging.getLogger('httpx').setLevel(logging.WARNING)
logging.info(f"Logging initialized at {level} level")
Using Logging in Your Code
import logging
# Create a logger for this module
logger = logging.getLogger(__name__)
def fetch_market_data(market_id: str) -> dict:
"""Fetch market data with proper logging."""
logger.info(f"Fetching data for market {market_id}")
try:
response = api_client.get(f"/markets/{market_id}")
logger.debug(f"Raw response: {response.status_code}")
if response.success:
logger.info(f"Successfully fetched {market_id}: "
f"price={response.data['yes_price']}")
return response.data
else:
logger.warning(f"Failed to fetch {market_id}: {response.error}")
return None
except Exception as e:
logger.error(f"Exception fetching {market_id}: {e}", exc_info=True)
raise
Debug vs Production Logging
In development, you want to see everything:
# Development
setup_logging(level="DEBUG")
In production, you want only important messages:
# Production
setup_logging(level="WARNING")
The beauty of the logging module is that DEBUG-level log statements remain in your code but produce no output when the level is set to WARNING or above. You never need to add or remove debug statements — just change the log level.
Common Debugging Patterns for API Work
1. Log request and response details:
def debug_request(response):
"""Log detailed request/response information for debugging."""
logger.debug(f"Request: {response.request.method} {response.request.url}")
logger.debug(f"Request headers: {dict(response.request.headers)}")
logger.debug(f"Response status: {response.status_code}")
logger.debug(f"Response headers: {dict(response.headers)}")
logger.debug(f"Response body (first 500 chars): {response.text[:500]}")
2. Time your API calls:
import time
import requests
def timed_request(url, **kwargs):
"""Make a request and log how long it took."""
start = time.perf_counter()
response = requests.get(url, **kwargs)
elapsed = time.perf_counter() - start
logger.debug(f"Request to {url} took {elapsed:.3f}s")
return response
3. Save failed responses for inspection:
def save_debug_response(response, filename="debug_response.json"):
"""Save a problematic response to disk for inspection."""
    import json
    from datetime import datetime
debug_data = {
'url': str(response.url),
'status_code': response.status_code,
'headers': dict(response.headers),
'body': response.text,
'timestamp': datetime.now().isoformat()
}
with open(filename, 'w') as f:
json.dump(debug_data, f, indent=2)
logger.info(f"Debug response saved to {filename}")
Error Tracking
For longer-running scripts (like data collection pipelines), track errors over time:
from collections import Counter
from datetime import datetime
class ErrorTracker:
"""Track and summarize errors over time."""
def __init__(self, max_errors: int = 1000):
self.errors: list = []
self.error_counts = Counter()
self.max_errors = max_errors
def record(self, error_type: str, message: str):
"""Record an error occurrence."""
self.errors.append({
'timestamp': datetime.now(),
'type': error_type,
'message': message
})
self.error_counts[error_type] += 1
# Prevent unbounded growth
if len(self.errors) > self.max_errors:
self.errors = self.errors[-self.max_errors:]
def summary(self) -> str:
"""Generate error summary."""
lines = ["Error Summary:"]
for error_type, count in self.error_counts.most_common():
lines.append(f" {error_type}: {count} occurrences")
lines.append(f"Total errors: {sum(self.error_counts.values())}")
return "\n".join(lines)
def recent(self, n: int = 10) -> list:
"""Get the n most recent errors."""
return self.errors[-n:]
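Here is a sketch of how the tracker might sit inside a collection loop; the market IDs are placeholders and fetch_market_data is the example function from earlier in this section.
tracker = ErrorTracker()

for market_id in ["PRES-2024", "FED-MAR-CUT"]:   # placeholder market IDs
    try:
        fetch_market_data(market_id)
    except Exception as e:
        tracker.record(type(e).__name__, str(e))

print(tracker.summary())
for err in tracker.recent(5):
    print(err['timestamp'], err['type'], err['message'])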
6.10 Testing Your Setup
Now that we have built all the components, we need to verify that everything works together. Testing is not just about finding bugs — it is about building confidence that your tools are reliable before you depend on them.
Comprehensive Environment Test Script
#!/usr/bin/env python3
"""
Comprehensive test script for the prediction markets toolkit.
Run this after setting up your environment to verify everything works.
"""
import sys
import os
def test_python_version():
"""Verify Python version is 3.9+."""
version = sys.version_info
assert version >= (3, 9), f"Python 3.9+ required, got {version}"
print(f"[PASS] Python version: {sys.version}")
def test_imports():
"""Verify all required libraries can be imported."""
libraries = {
'numpy': 'numpy',
'pandas': 'pandas',
'matplotlib': 'matplotlib',
'seaborn': 'seaborn',
'scipy': 'scipy',
'requests': 'requests',
'httpx': 'httpx',
'sqlalchemy': 'sqlalchemy',
'dotenv': 'dotenv',
'yaml': 'yaml',
'tqdm': 'tqdm',
}
for name, import_name in libraries.items():
try:
__import__(import_name)
print(f" [PASS] {name}")
except ImportError:
print(f" [FAIL] {name} — not installed")
def test_database():
"""Test SQLite database operations."""
import sqlite3
import tempfile
import os
db_path = os.path.join(tempfile.gettempdir(), "test_pm.db")
try:
conn = sqlite3.connect(db_path)
conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)")
conn.execute("INSERT INTO test VALUES (1, 'hello')")
result = conn.execute("SELECT value FROM test WHERE id = 1").fetchone()
assert result[0] == 'hello'
conn.close()
print("[PASS] SQLite database operations")
finally:
if os.path.exists(db_path):
os.remove(db_path)
def test_data_analysis():
"""Test basic data analysis pipeline."""
import numpy as np
import pandas as pd
# Create sample market data
np.random.seed(42)
dates = pd.date_range('2024-01-01', periods=100, freq='h')
prices = pd.DataFrame({
'timestamp': dates,
'yes_price': np.clip(np.cumsum(np.random.normal(0, 0.01, 100)) + 0.5,
0.01, 0.99),
'volume': np.random.poisson(50, 100)
})
prices.set_index('timestamp', inplace=True)
# Test operations
rolling_mean = prices['yes_price'].rolling(24).mean()
assert not rolling_mean.dropna().empty
daily_vol = prices['yes_price'].pct_change().std()
assert daily_vol > 0
print("[PASS] Data analysis pipeline")
def test_visualization():
"""Test that plotting works (generates but does not display)."""
import matplotlib
matplotlib.use('Agg') # Non-interactive backend
import matplotlib.pyplot as plt
import numpy as np
fig, ax = plt.subplots()
ax.plot([0, 1], [0, 1])
ax.set_title("Test Plot")
# Save to temporary file
import tempfile
import os
tmp_path = os.path.join(tempfile.gettempdir(), "test_plot.png")
fig.savefig(tmp_path)
plt.close(fig)
assert os.path.exists(tmp_path)
os.remove(tmp_path)
print("[PASS] Visualization system")
def test_http():
"""Test HTTP request capability."""
import requests
try:
response = requests.get("https://httpbin.org/get", timeout=10)
assert response.status_code == 200
print("[PASS] HTTP requests")
except Exception as e:
print(f"[WARN] HTTP requests — {e} (may be a network issue)")
def test_probability_functions():
"""Test our probability utility functions."""
import numpy as np
# Implied probability
prob = 0.6 / (0.6 + 0.45)
assert abs(prob - 0.5714) < 0.001
# Expected value
ev = 0.6 * (1 - 0.5) - 0.4 * 0.5
assert abs(ev - 0.10) < 0.001
# Brier score
predictions = [0.8, 0.3, 0.6]
outcomes = [1, 0, 1]
brier = np.mean([(p - o) ** 2 for p, o in zip(predictions, outcomes)])
assert brier < 0.15
print("[PASS] Probability functions")
if __name__ == "__main__":
print("=" * 50)
print("Prediction Markets Toolkit — Environment Test")
print("=" * 50)
print()
test_python_version()
print()
print("Library imports:")
test_imports()
print()
test_database()
test_data_analysis()
test_visualization()
test_http()
test_probability_functions()
print()
print("=" * 50)
print("All tests complete!")
print("=" * 50)
Using pytest for Module Testing
For the pmtools module, write proper unit tests:
# tests/test_probability.py
import pytest
import numpy as np
from pmtools.probability import (
implied_probability, expected_value, kelly_fraction,
brier_score, log_score, overround
)
class TestImpliedProbability:
def test_fair_market(self):
"""Fair market: yes=0.6, no=0.4 -> prob=0.6"""
assert implied_probability(0.6, 0.4) == pytest.approx(0.6)
def test_market_with_overround(self):
"""Market with vig: yes=0.62, no=0.42 -> prob~0.596"""
prob = implied_probability(0.62, 0.42)
assert prob == pytest.approx(0.596, abs=0.001)
def test_no_price_omitted(self):
"""When no_price is omitted, yes_price IS the probability."""
assert implied_probability(0.7) == 0.7
class TestExpectedValue:
def test_positive_ev(self):
"""Underpriced market should have positive EV."""
ev = expected_value(probability=0.7, price=0.5, side="yes")
assert ev > 0
def test_negative_ev(self):
"""Overpriced market should have negative EV."""
ev = expected_value(probability=0.3, price=0.5, side="yes")
assert ev < 0
def test_fair_price_zero_ev(self):
"""Fair price should have zero EV."""
ev = expected_value(probability=0.6, price=0.6, side="yes")
assert ev == pytest.approx(0.0)
class TestKellyFraction:
def test_no_edge_no_bet(self):
"""No edge means Kelly says don't bet."""
f = kelly_fraction(0.5, 0.5)
assert f == pytest.approx(0.0)
def test_positive_edge(self):
"""With edge, Kelly recommends a positive bet."""
f = kelly_fraction(0.7, 0.5)
assert f > 0
def test_half_kelly(self):
"""Half-Kelly should be half of full Kelly."""
full = kelly_fraction(0.7, 0.5, fractional=1.0)
half = kelly_fraction(0.7, 0.5, fractional=0.5)
assert half == pytest.approx(full / 2)
class TestBrierScore:
def test_perfect_predictions(self):
"""Perfect predictions should score 0."""
score = brier_score([1.0, 0.0, 1.0], [1, 0, 1])
assert score == pytest.approx(0.0)
def test_worst_predictions(self):
"""Maximally wrong predictions should score 1."""
score = brier_score([0.0, 1.0, 0.0], [1, 0, 1])
assert score == pytest.approx(1.0)
class TestOverround:
def test_fair_market(self):
"""Fair market has 0% overround."""
assert overround(0.6, 0.4) == pytest.approx(0.0)
def test_typical_overround(self):
"""Typical market with some vig."""
vig = overround(0.52, 0.52)
assert vig == pytest.approx(0.04)
Run tests with:
cd prediction-markets
python -m pytest tests/ -v
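pytest can also target a single file, class, or test, which is handy when one calculation is misbehaving:
# Run only the Kelly criterion tests
python -m pytest tests/test_probability.py::TestKellyFraction -v

# Stop at the first failure and show local variables
python -m pytest tests/ -x -l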
6.11 Version Control with Git
Version control is not optional for any serious software project, and prediction market analysis is no exception. Git tracks every change you make, lets you experiment without fear, and enables collaboration.
Initial Setup
cd prediction-markets
# Initialize repository
git init
# Create .gitignore (see Section 6.8 for full contents)
# We already created this file
# Initial commit
git add .gitignore requirements.txt README.md
git add pmtools/
git add configs/
git commit -m "Initial project setup with pmtools module"
Basic Git Workflow
The daily workflow for prediction market development:
# 1. Check what has changed
git status
git diff
# 2. Stage specific files (never use git add .)
git add pmtools/probability.py
git add notebooks/01_market_overview.ipynb
# 3. Commit with a meaningful message
git commit -m "Add Kelly criterion calculation to probability module"
# 4. View history
git log --oneline -10
Branching for Experiments
When you want to try a new strategy or approach without risking your working code:
# Create and switch to a new branch
git checkout -b experiment/momentum-strategy
# Work on your experiment...
# If it works, merge back:
git checkout main
git merge experiment/momentum-strategy
# If it fails, just switch back:
git checkout main
# The experimental branch still exists if you want it later
What to Commit and What Not To
Always commit:
- Source code (.py files)
- Configuration templates (not the actual .env)
- Requirements files
- Tests
- Notebooks (but be mindful of size — clear output before committing; see the command after these lists)
- Documentation
Never commit:
- .env files (API keys, secrets)
- Data files (.csv, .parquet, .db) — too large and potentially sensitive
- Log files
- Virtual environment directories
- __pycache__ directories
- IDE-specific files (.vscode/settings.json with personal settings)
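For the notebook advice above, one way to clear output before committing is nbconvert, which ships with Jupyter. The notebook path is an example:
jupyter nbconvert --clear-output --inplace notebooks/01_market_overview.ipynb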
Remote Repository
To back up your work and enable collaboration, push to a remote repository:
# Add remote (GitHub, GitLab, etc.)
git remote add origin https://github.com/yourusername/prediction-markets.git
# Push
git push -u origin main
Double-check your .gitignore before the first push. Once secrets are pushed to a public repository, consider them compromised — even if you delete them later, they remain in Git history.
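Two quick sanity checks before that first push, assuming your secrets live in the .env file set up earlier:
# Confirm .env is matched by a .gitignore rule (prints the matching rule if so)
git check-ignore -v .env

# List everything Git is actually tracking and scan for surprises
git ls-files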
6.12 Chapter Summary
You now have a complete, professional-grade Python toolkit for prediction market analysis. Let us review what we have built:
Setup Checklist
Use this checklist to verify your environment is complete:
- [ ] Python 3.9+ installed and verified
- [ ] Virtual environment created and activated
- [ ] All libraries installed from requirements.txt
- [ ] Project directory structure created
- [ ] .env file created with API keys (not committed to Git)
- [ ] .gitignore configured to exclude secrets and data
- [ ] pmtools module created with all submodules:
  - [ ] probability.py — probability calculations
  - [ ] data_models.py — data structures
  - [ ] visualization.py — plotting functions
  - [ ] database.py — SQLite helper
  - [ ] api_client.py — API client base class
- [ ] Logging configured
- [ ] Configuration management set up
- [ ] Git repository initialized
- [ ] Environment test script passes all checks
Directory Structure Reference
prediction-markets/
├── .env # API keys (NEVER commit)
├── .gitignore # Git exclusions
├── README.md # Project description
├── requirements.txt # Python dependencies
├── pmtools/ # Utility module
│ ├── __init__.py
│ ├── api_client.py
│ ├── data_models.py
│ ├── database.py
│ ├── probability.py
│ └── visualization.py
├── notebooks/ # Jupyter notebooks
├── scripts/ # Production scripts
├── data/ # Data storage
│ ├── raw/
│ ├── processed/
│ └── markets.db
├── tests/ # Test files
├── configs/ # Configuration
│ └── settings.yaml
└── logs/ # Log files
Key Concepts Reviewed
| Concept | Purpose | Tool/Library |
|---|---|---|
| Virtual environments | Dependency isolation | venv or conda |
| API client base class | Reusable HTTP logic | requests, httpx |
| Data models | Type-safe data structures | dataclasses |
| Database storage | Persistent market data | sqlite3, pandas |
| Visualization | Consistent, clear charts | matplotlib, seaborn |
| Configuration | Flexible settings | python-dotenv, yaml |
| Logging | Debugging and monitoring | logging module |
| Testing | Reliability assurance | pytest |
| Version control | Change tracking | git |
What's Next
With your toolkit assembled and tested, you are ready to move into Part II: How Markets Work. In Chapter 7, we will put these tools to immediate use as we explore how prediction market prices form and evolve. You will:
- Connect to a live prediction market API using the client base class we built here
- Fetch real market data and store it in your SQLite database
- Create your first visualizations of real market prices
- Begin to recognize patterns that will inform the strategies we develop in later chapters
The foundation is solid. The axe is sharp. It is time to start chopping.