Case Study 1: Reading Polymarket Data from the Blockchain
Overview
In this case study, we connect directly to the Polygon blockchain to read data from Polymarket's smart contracts. Polymarket is the largest decentralized prediction market by volume, processing billions of dollars in trades. While Polymarket provides APIs for convenient data access, reading directly from the blockchain builds fundamental skills (connecting over RPC, working with contract ABIs, and decoding event logs) and exposes raw, unfiltered data that the APIs may not surface.
We will:
1. Connect to Polygon via a public RPC endpoint
2. Identify and interact with Polymarket's core smart contracts
3. Fetch market data from the Conditional Token Framework (CTF)
4. Track trading activity through event logs
5. Analyze on-chain activity patterns with Python
Background: Polymarket's Architecture
Polymarket operates on the Polygon PoS sidechain and uses Gnosis's Conditional Token Framework (CTF) for its core token mechanics. The key contracts are:
- Conditional Token Framework (CTF): 0x4D97DCd97eC945f40cF65F87097ACe5EA0476045 on Polygon. This is an ERC-1155 contract that manages all outcome tokens. Each market condition has associated outcome tokens identified by token IDs derived from the condition ID (a sketch of this derivation follows the list).
- USDC (Polygon): 0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174 (bridged USDC.e) or 0x3c499c542cEF5E3811e1192ce70d8cC03d5c3359 (native USDC). This is the collateral token used for betting.
- Exchange/Router contracts: Handle order matching and trade execution via a Central Limit Order Book (CLOB) mechanism. These contracts facilitate the buying and selling of outcome tokens.
- Neg Risk contracts: Handle markets with negatively correlated outcomes (e.g., "Who will win the election?" where only one outcome can win).
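To make the link between a market and its tokens concrete, here is a minimal sketch of the condition ID derivation. In the Gnosis Conditional Tokens implementation, the condition ID is the keccak256 hash of the packed oracle address, question ID, and outcome slot count; the oracle address and question ID below are hypothetical placeholders, not real Polymarket values. (Outcome token IDs add a further collection-ID step that is easiest to obtain from the contract's own helper views or Polymarket's APIs.)
# Sketch: deriving a CTF condition ID off-chain.
# conditionId = keccak256(abi.encodePacked(oracle, questionId, outcomeSlotCount))
from web3 import Web3

oracle = Web3.to_checksum_address(
    '0x0000000000000000000000000000000000000001'  # hypothetical oracle address
)
question_id = bytes.fromhex('11' * 32)  # hypothetical bytes32 question ID
outcome_slot_count = 2                  # binary market: Yes / No

condition_id = Web3.solidity_keccak(
    ['address', 'bytes32', 'uint256'],
    [oracle, question_id, outcome_slot_count]
)
print("Derived condition ID:", condition_id.hex())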
Step 1: Setting Up the Connection
"""
Case Study 1: Reading Polymarket Data from the Blockchain
Connects to Polygon and reads Polymarket smart contract data.
"""
from web3 import Web3
import json
import time
from datetime import datetime, timedelta
from collections import defaultdict
# Connect to Polygon
# Public RPC endpoints (for production, use Alchemy/Infura with API key)
POLYGON_RPC_URLS = [
'https://polygon-rpc.com',
'https://rpc-mainnet.matic.quiknode.pro',
'https://polygon-mainnet.g.alchemy.com/v2/YOUR_API_KEY',
]
def connect_to_polygon(rpc_urls=POLYGON_RPC_URLS):
"""Try connecting to Polygon via available RPC endpoints."""
for url in rpc_urls:
try:
w3 = Web3(Web3.HTTPProvider(url, request_kwargs={'timeout': 10}))
if w3.is_connected():
chain_id = w3.eth.chain_id
if chain_id == 137:
print(f"Connected to Polygon via {url}")
print(f"Chain ID: {chain_id}")
print(f"Latest block: {w3.eth.block_number}")
return w3
except Exception as e:
print(f"Failed to connect to {url}: {e}")
continue
raise ConnectionError("Could not connect to any Polygon RPC endpoint")
w3 = connect_to_polygon()
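Public RPC endpoints differ in how much history they retain and how large a log query they will serve. Before fetching events in Step 4, a quick optional check like the sketch below confirms the endpoint can serve blocks from the roughly 10,000-block lookback window used later.
# Optional sanity check: confirm the endpoint serves blocks far enough back
# for the ~10,000-block lookback used in Step 4.
lookback_check = 10000
latest = w3.eth.block_number
try:
    old_block = w3.eth.get_block(latest - lookback_check)
    age_hours = (w3.eth.get_block(latest)['timestamp']
                 - old_block['timestamp']) / 3600
    print(f"Block {latest - lookback_check} is available "
          f"(~{age_hours:.1f} hours old); historical queries should work")
except Exception as e:
    print(f"Warning: could not fetch a historical block: {e}")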
Step 2: Defining Contract Interfaces
# Polymarket CTF contract address on Polygon
CTF_ADDRESS = Web3.to_checksum_address(
'0x4D97DCd97eC945f40cF65F87097ACe5EA0476045'
)
# USDC.e (bridged) on Polygon
USDC_ADDRESS = Web3.to_checksum_address(
'0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174'
)
# Minimal CTF ABI for reading data
CTF_ABI = [
{
"anonymous": False,
"inputs": [
{"indexed": True, "name": "conditionId", "type": "bytes32"},
{"indexed": True, "name": "oracle", "type": "address"},
{"indexed": True, "name": "questionId", "type": "bytes32"},
{"indexed": False, "name": "outcomeSlotCount", "type": "uint256"}
],
"name": "ConditionPreparation",
"type": "event"
},
{
"anonymous": False,
"inputs": [
{"indexed": True, "name": "conditionId", "type": "bytes32"},
{"indexed": True, "name": "oracle", "type": "address"},
{"indexed": True, "name": "questionId", "type": "bytes32"},
{"indexed": False, "name": "outcomeSlotCount", "type": "uint256"},
{"indexed": False, "name": "payoutNumerators", "type": "uint256[]"}
],
"name": "ConditionResolution",
"type": "event"
},
{
"anonymous": False,
"inputs": [
{"indexed": True, "name": "stakeholder", "type": "address"},
{"indexed": False, "name": "collateralToken", "type": "address"},
{"indexed": True, "name": "parentCollectionId", "type": "bytes32"},
{"indexed": True, "name": "conditionId", "type": "bytes32"},
{"indexed": False, "name": "partition", "type": "uint256[]"},
{"indexed": False, "name": "amount", "type": "uint256"}
],
"name": "PositionSplit",
"type": "event"
},
{
"anonymous": False,
"inputs": [
{"indexed": True, "name": "stakeholder", "type": "address"},
{"indexed": False, "name": "collateralToken", "type": "address"},
{"indexed": True, "name": "parentCollectionId", "type": "bytes32"},
{"indexed": True, "name": "conditionId", "type": "bytes32"},
{"indexed": False, "name": "partition", "type": "uint256[]"},
{"indexed": False, "name": "amount", "type": "uint256"}
],
"name": "PositionsMerge",
"type": "event"
},
{
"anonymous": False,
"inputs": [
{"indexed": True, "name": "operator", "type": "address"},
{"indexed": True, "name": "from", "type": "address"},
{"indexed": True, "name": "to", "type": "address"},
{"indexed": False, "name": "id", "type": "uint256"},
{"indexed": False, "name": "value", "type": "uint256"}
],
"name": "TransferSingle",
"type": "event"
},
{
"inputs": [
{"name": "conditionId", "type": "bytes32"}
],
"name": "getOutcomeSlotCount",
"outputs": [
{"name": "", "type": "uint256"}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{"name": "conditionId", "type": "bytes32"}
],
"name": "payoutDenominator",
"outputs": [
{"name": "", "type": "uint256"}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{"name": "conditionId", "type": "bytes32"},
{"name": "index", "type": "uint256"}
],
"name": "payoutNumerators",
"outputs": [
{"name": "", "type": "uint256"}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{"name": "account", "type": "address"},
{"name": "id", "type": "uint256"}
],
"name": "balanceOf",
"outputs": [
{"name": "", "type": "uint256"}
],
"stateMutability": "view",
"type": "function"
}
]
# Standard ERC-20 ABI for USDC
ERC20_ABI = [
{
"inputs": [{"name": "account", "type": "address"}],
"name": "balanceOf",
"outputs": [{"name": "", "type": "uint256"}],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "decimals",
"outputs": [{"name": "", "type": "uint8"}],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "symbol",
"outputs": [{"name": "", "type": "string"}],
"stateMutability": "view",
"type": "function"
},
{
"anonymous": False,
"inputs": [
{"indexed": True, "name": "from", "type": "address"},
{"indexed": True, "name": "to", "type": "address"},
{"indexed": False, "name": "value", "type": "uint256"}
],
"name": "Transfer",
"type": "event"
}
]
# Create contract instances
ctf_contract = w3.eth.contract(address=CTF_ADDRESS, abi=CTF_ABI)
usdc_contract = w3.eth.contract(address=USDC_ADDRESS, abi=ERC20_ABI)
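As an optional sanity check, we can confirm that contract bytecode actually exists at both addresses; w3.eth.get_code returns empty bytes for an address with no deployed contract, which would indicate a wrong address or network.
# Verify that deployed bytecode exists at both addresses.
for name, address in [('CTF', CTF_ADDRESS), ('USDC', USDC_ADDRESS)]:
    code = w3.eth.get_code(address)
    status = 'contract found' if len(code) > 0 else 'NO CODE - check address/network'
    print(f"{name} at {address}: {status} ({len(code)} bytes)")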
Step 3: Reading Market Condition Data
def get_condition_info(ctf, condition_id_hex):
"""
Read information about a market condition from the CTF contract.
Parameters:
ctf: The CTF contract instance
condition_id_hex: The condition ID as a hex string (bytes32)
Returns:
Dictionary with condition information
"""
condition_id = bytes.fromhex(condition_id_hex.replace('0x', ''))
# Get the number of outcome slots
outcome_count = ctf.functions.getOutcomeSlotCount(condition_id).call()
# Check if resolved (payout denominator > 0 means resolved)
payout_denominator = ctf.functions.payoutDenominator(condition_id).call()
is_resolved = payout_denominator > 0
result = {
'condition_id': condition_id_hex,
'outcome_count': outcome_count,
'is_resolved': is_resolved,
'payout_denominator': payout_denominator,
}
# If resolved, get payout numerators for each outcome
if is_resolved and outcome_count > 0:
payouts = []
for i in range(outcome_count):
numerator = ctf.functions.payoutNumerators(
condition_id, i
).call()
payouts.append(numerator)
result['payout_numerators'] = payouts
result['payout_fractions'] = [
n / payout_denominator for n in payouts
]
return result
# Example: Query a known condition
# (Replace with an actual Polymarket condition ID for live testing)
EXAMPLE_CONDITION_ID = (
'0x'
+ '0' * 64 # Placeholder - replace with real condition ID
)
# For demonstration, we will search for recent conditions instead
print("Searching for recent market conditions...")
Step 4: Tracking Market Activity via Events
def get_recent_splits(w3, ctf, from_block, to_block, batch_size=2000):
"""
Fetch recent PositionSplit events (market entries).
When a user enters a prediction market, they "split" collateral
into outcome tokens. Tracking splits shows market entry activity.
"""
all_events = []
current = from_block
while current <= to_block:
end = min(current + batch_size - 1, to_block)
try:
event_filter = ctf.events.PositionSplit.create_filter(
from_block=current,
to_block=end
)
events = event_filter.get_all_entries()
all_events.extend(events)
print(f" Blocks {current}-{end}: {len(events)} splits")
except Exception as e:
print(f" Error at blocks {current}-{end}: {e}")
if batch_size > 500:
batch_size = batch_size // 2
continue
current = end + 1
return all_events
def get_recent_token_transfers(w3, ctf, from_block, to_block,
batch_size=2000):
"""
Fetch recent TransferSingle events (ERC-1155 token transfers).
These represent trades of outcome tokens between addresses.
"""
all_events = []
current = from_block
while current <= to_block:
end = min(current + batch_size - 1, to_block)
try:
event_filter = ctf.events.TransferSingle.create_filter(
from_block=current,
to_block=end
)
events = event_filter.get_all_entries()
all_events.extend(events)
print(f" Blocks {current}-{end}: {len(events)} transfers")
except Exception as e:
print(f" Error at blocks {current}-{end}: {e}")
if batch_size > 500:
batch_size = batch_size // 2
continue
current = end + 1
return all_events
def get_recent_resolutions(w3, ctf, from_block, to_block,
batch_size=5000):
"""
Fetch recent ConditionResolution events (market settlements).
"""
all_events = []
current = from_block
while current <= to_block:
end = min(current + batch_size - 1, to_block)
try:
event_filter = ctf.events.ConditionResolution.create_filter(
from_block=current,
to_block=end
)
events = event_filter.get_all_entries()
all_events.extend(events)
except Exception as e:
if batch_size > 500:
batch_size = batch_size // 2
continue
current = end + 1
return all_events
# Fetch recent activity: the last ~10,000 blocks (Polygon's ~2-second block
# time makes this roughly 5-6 hours of data)
latest_block = w3.eth.block_number
lookback_blocks = 10000
from_block = latest_block - lookback_blocks
print(f"\nFetching data from block {from_block} to {latest_block}")
print(f"(approximately {lookback_blocks * 2 / 3600:.1f} hours of data)\n")
print("Fetching PositionSplit events...")
splits = get_recent_splits(w3, ctf_contract, from_block, latest_block)
print(f"\nFetching TransferSingle events...")
transfers = get_recent_token_transfers(
w3, ctf_contract, from_block, latest_block
)
print(f"\nFetching ConditionResolution events...")
resolutions = get_recent_resolutions(
w3, ctf_contract, from_block, latest_block
)
print(f"\n--- Summary ---")
print(f"Position splits (market entries): {len(splits)}")
print(f"Token transfers (trades): {len(transfers)}")
print(f"Market resolutions: {len(resolutions)}")
Step 5: Analyzing On-Chain Activity
def analyze_splits(splits, usdc_decimals=6):
"""Analyze PositionSplit events to understand market entry patterns."""
if not splits:
print("No split events found in the time range.")
return
# Aggregate by condition (market)
markets = defaultdict(lambda: {
'count': 0,
'total_amount': 0,
'unique_users': set(),
'amounts': []
})
for event in splits:
args = event['args']
condition_id = args['conditionId'].hex()
amount = args['amount']
user = args['stakeholder']
markets[condition_id]['count'] += 1
markets[condition_id]['total_amount'] += amount
markets[condition_id]['unique_users'].add(user)
markets[condition_id]['amounts'].append(amount)
print(f"\n{'='*70}")
print(f"POSITION SPLIT ANALYSIS")
print(f"{'='*70}")
print(f"Total splits: {len(splits)}")
print(f"Unique markets: {len(markets)}")
# Top markets by volume
sorted_markets = sorted(
markets.items(),
key=lambda x: x[1]['total_amount'],
reverse=True
)
print(f"\nTop 10 Markets by Entry Volume:")
print(f"{'Condition ID (short)':<24} {'Entries':>8} {'Users':>6} "
f"{'Total (USDC)':>14} {'Avg Entry':>12}")
print('-' * 70)
for cid, data in sorted_markets[:10]:
total_usdc = data['total_amount'] / 10**usdc_decimals
avg_entry = total_usdc / data['count'] if data['count'] > 0 else 0
print(f"{cid[:22]}.. {data['count']:>8} {len(data['unique_users']):>6} "
f"${total_usdc:>13,.2f} ${avg_entry:>11,.2f}")
# Overall statistics
all_amounts = []
all_users = set()
total_volume = 0
for data in markets.values():
all_amounts.extend(data['amounts'])
all_users.update(data['unique_users'])
total_volume += data['total_amount']
total_volume_usdc = total_volume / 10**usdc_decimals
print(f"\nOverall Statistics:")
print(f" Total entry volume: ${total_volume_usdc:,.2f}")
print(f" Unique participants: {len(all_users)}")
print(f" Average entry size: "
f"${total_volume_usdc / len(splits) if splits else 0:,.2f}")
if all_amounts:
amounts_usdc = [a / 10**usdc_decimals for a in all_amounts]
amounts_usdc.sort()
median_idx = len(amounts_usdc) // 2
print(f" Median entry size: ${amounts_usdc[median_idx]:,.2f}")
print(f" Largest entry: ${max(amounts_usdc):,.2f}")
print(f" Smallest entry: ${min(amounts_usdc):,.2f}")
return markets
def analyze_transfers(transfers):
"""Analyze TransferSingle events for trading patterns."""
if not transfers:
print("No transfer events found in the time range.")
return
# Exclude mints (from zero address) and burns (to zero address)
zero_addr = '0x0000000000000000000000000000000000000000'
trades = [t for t in transfers
if t['args']['from'] != zero_addr
and t['args']['to'] != zero_addr]
mints = [t for t in transfers if t['args']['from'] == zero_addr]
burns = [t for t in transfers if t['args']['to'] == zero_addr]
print(f"\n{'='*70}")
print(f"TOKEN TRANSFER ANALYSIS")
print(f"{'='*70}")
print(f"Total transfers: {len(transfers)}")
print(f" Mints (new positions): {len(mints)}")
print(f" Burns (position exits): {len(burns)}")
print(f" Trades (peer-to-peer): {len(trades)}")
# Active token IDs (markets)
token_ids = defaultdict(int)
for t in transfers:
token_id = t['args']['id']
token_ids[token_id] += 1
print(f"\nUnique token IDs traded: {len(token_ids)}")
# Top traded tokens
sorted_tokens = sorted(token_ids.items(), key=lambda x: x[1],
reverse=True)
print(f"\nTop 10 Most Actively Traded Token IDs:")
for token_id, count in sorted_tokens[:10]:
# Token ID is a large number; show abbreviated
tid_str = str(token_id)
if len(tid_str) > 20:
tid_str = tid_str[:10] + '...' + tid_str[-10:]
print(f" Token {tid_str}: {count} transfers")
# Unique traders
all_traders = set()
for t in trades:
all_traders.add(t['args']['from'])
all_traders.add(t['args']['to'])
print(f"\nUnique trading addresses: {len(all_traders)}")
return trades
def analyze_resolutions(resolutions):
"""Analyze ConditionResolution events."""
if not resolutions:
print("\nNo market resolutions found in the time range.")
return
print(f"\n{'='*70}")
print(f"MARKET RESOLUTION ANALYSIS")
print(f"{'='*70}")
print(f"Markets resolved: {len(resolutions)}")
for i, event in enumerate(resolutions[:10]):
args = event['args']
cid = args['conditionId'].hex()
outcome_count = args['outcomeSlotCount']
payouts = args['payoutNumerators']
print(f"\n Resolution {i+1}:")
print(f" Condition: {cid[:22]}...")
print(f" Outcomes: {outcome_count}")
print(f" Payouts: {payouts}")
# Determine winning outcome
if payouts:
total = sum(payouts)
if total > 0:
for j, p in enumerate(payouts):
pct = p / total * 100
marker = " <-- WINNER" if p == max(payouts) else ""
print(f" Outcome {j}: {pct:.1f}%{marker}")
# Run the analysis
market_data = analyze_splits(splits)
trade_data = analyze_transfers(transfers)
analyze_resolutions(resolutions)
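The trades returned by analyze_transfers can be mined further. As a small follow-up (and a preview of the whale-tracking extension listed at the end of this case study), the sketch below counts the most active peer-to-peer trading addresses in the sample.
# Count the most active peer-to-peer trading addresses in the window.
if trade_data:
    address_counts = defaultdict(int)
    for t in trade_data:
        address_counts[t['args']['from']] += 1
        address_counts[t['args']['to']] += 1
    most_active = sorted(address_counts.items(), key=lambda x: x[1],
                         reverse=True)
    print("\nMost active trading addresses:")
    for addr, count in most_active[:5]:
        print(f"  {addr}: involved in {count} transfers")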
Step 6: Checking Individual Token Balances
def check_token_balance(ctf, address, token_id):
"""
Check the balance of a specific outcome token for an address.
Parameters:
ctf: CTF contract instance
address: Wallet address to check
token_id: The ERC-1155 token ID for the outcome token
Returns:
Token balance (raw units)
"""
balance = ctf.functions.balanceOf(
Web3.to_checksum_address(address),
token_id
).call()
return balance
def check_usdc_in_ctf(w3, usdc_contract, ctf_address):
"""
Check total USDC locked in the CTF contract.
This represents the total collateral backing all active markets.
"""
balance = usdc_contract.functions.balanceOf(ctf_address).call()
decimals = usdc_contract.functions.decimals().call()
symbol = usdc_contract.functions.symbol().call()
return balance / 10**decimals, symbol
# Check total USDC locked in Polymarket's CTF
usdc_locked, symbol = check_usdc_in_ctf(w3, usdc_contract, CTF_ADDRESS)
print(f"\n{'='*70}")
print(f"COLLATERAL ANALYSIS")
print(f"{'='*70}")
print(f"Total {symbol} locked in CTF: ${usdc_locked:,.2f}")
Step 7: Block Timestamp Analysis
def analyze_activity_timing(w3, events, sample_size=100):
"""
Analyze when prediction market activity occurs by mapping
events to timestamps.
"""
if not events:
print("No events to analyze.")
return
# Sample events if there are too many (to avoid excessive RPC calls)
sample = events[:sample_size] if len(events) > sample_size else events
timestamps = []
for event in sample:
try:
block = w3.eth.get_block(event['blockNumber'])
timestamps.append(block['timestamp'])
except Exception:
continue
if not timestamps:
print("Could not retrieve timestamps.")
return
# Convert to datetime
datetimes = [datetime.utcfromtimestamp(ts) for ts in timestamps]
# Activity by hour
hour_counts = defaultdict(int)
for dt in datetimes:
hour_counts[dt.hour] += 1
print(f"\n{'='*70}")
print(f"ACTIVITY TIMING (UTC) - Sample of {len(datetimes)} events")
print(f"{'='*70}")
# Time range
earliest = min(datetimes)
latest = max(datetimes)
print(f"Time range: {earliest.strftime('%Y-%m-%d %H:%M')} to "
f"{latest.strftime('%Y-%m-%d %H:%M')} UTC")
# Hourly distribution
print(f"\nHourly Distribution:")
max_count = max(hour_counts.values()) if hour_counts else 1
for hour in range(24):
count = hour_counts.get(hour, 0)
bar = '#' * int(count / max_count * 40) if max_count > 0 else ''
print(f" {hour:02d}:00 {bar} ({count})")
# Activity rate
time_span = (latest - earliest).total_seconds()
if time_span > 0:
rate_per_minute = len(datetimes) / (time_span / 60)
rate_per_hour = len(datetimes) / (time_span / 3600)
print(f"\nActivity rate:")
print(f" {rate_per_minute:.1f} events/minute")
print(f" {rate_per_hour:.0f} events/hour")
# Analyze timing of split events
if splits:
analyze_activity_timing(w3, splits)
Key Takeaways from This Case Study
- Direct blockchain access provides raw, unfiltered data about prediction market activity. Every trade, every market entry, and every settlement is verifiable on-chain.
- Event logs are the primary mechanism for tracking historical activity. The CTF contract emits PositionSplit, PositionsMerge, TransferSingle, ConditionPreparation, and ConditionResolution events that together tell the complete story of market activity.
- Pagination is essential when querying historical data. RPC providers limit the block range per query, requiring batch processing.
- The CTF contract is the heart of Polymarket's on-chain architecture. Understanding its events and state allows you to reconstruct any market's history.
- USDC locked in the CTF serves as a proxy for total value locked (the collateral backing all outstanding positions) across Polymarket's active markets.
- On-chain data analysis reveals patterns in trading activity, market participation, and resolution outcomes that may not be visible through Polymarket's front-end interface.
Extensions
- Real-time monitoring: Use WebSocket subscriptions to track events in real time (a simple polling-based starting point is sketched after this list)
- Price reconstruction: Use trade events combined with order book data to reconstruct price histories
- Whale tracking: Identify and monitor large position holders
- Cross-market correlation: Analyze how trading activity in one market correlates with activity in related markets
- Oracle analysis: Track which oracle addresses resolve markets and their resolution patterns
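For the real-time monitoring extension, true WebSocket subscriptions require an async provider; as a lighter-weight starting point, the sketch below simply polls for new blocks over the existing HTTP connection and reuses get_recent_splits from Step 4. Treat it as a minimal polling sketch rather than a production monitor.
# Minimal polling-based monitor (HTTP polling, not WebSocket subscriptions):
# reuses get_recent_splits from Step 4 to report new market entries as they land.
def monitor_splits(w3, ctf, poll_interval=15, iterations=4):
    last_seen = w3.eth.block_number
    for _ in range(iterations):
        time.sleep(poll_interval)
        latest = w3.eth.block_number
        if latest > last_seen:
            new_splits = get_recent_splits(w3, ctf, last_seen + 1, latest)
            print(f"Blocks {last_seen + 1}-{latest}: "
                  f"{len(new_splits)} new splits")
            last_seen = latest

# monitor_splits(w3, ctf_contract)  # uncomment for a short live demo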