Case Study 1: Reading Polymarket Data from the Blockchain

Overview

In this case study, we connect directly to the Polygon blockchain to read data from Polymarket's smart contracts. Polymarket is the largest decentralized prediction market by volume, processing billions of dollars in trades. While Polymarket provides APIs for convenient data access, reading directly from the blockchain builds fundamental skills in working with smart contracts and event logs, and it provides access to raw, unfiltered data that APIs may not expose.

We will:

  1. Connect to Polygon via a public RPC endpoint
  2. Identify and interact with Polymarket's core smart contracts
  3. Fetch market data from the Conditional Token Framework (CTF)
  4. Track trading activity through event logs
  5. Analyze on-chain activity patterns with Python

Background: Polymarket's Architecture

Polymarket operates on the Polygon PoS sidechain and uses Gnosis's Conditional Token Framework (CTF) for its core token mechanics. The key contracts are:

  • Conditional Token Framework (CTF): 0x4D97DCd97eC945f40cF65F87097ACe5EA0476045 on Polygon. This is an ERC-1155 contract that manages all outcome tokens. Each market condition has associated outcome tokens whose token IDs are derived from the condition ID, the outcome index set, and the collateral token (see the sketch after this list).

  • USDC (Polygon): 0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174 (bridged USDC.e) or 0x3c499c542cEF5E3811e1192ce70d8cC03d5c3359 (native USDC). This is the collateral token used for betting.

  • Exchange/Router contracts: Settle outcome-token trades on-chain. Order matching itself happens off-chain through Polymarket's Central Limit Order Book (CLOB) operator; matched orders are then executed against these contracts, which move outcome tokens and USDC between the counterparties.

  • Neg Risk contracts: Handle markets with negatively correlated outcomes (e.g., "Who will win the election?" where only one outcome can win).
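
How those token IDs are derived is worth making concrete. In the Gnosis CTF, a condition ID is the keccak256 hash of (oracle, questionId, outcomeSlotCount); each outcome index set maps to a collection ID; and the ERC-1155 position (token) ID is the keccak256 hash of (collateralToken, collectionId). The sketch below is illustrative rather than drop-in code: the helper names are our own, the oracle and question values are placeholders, and it assumes the standard ConditionalTokens view functions getCollectionId and getPositionId, which are not part of the minimal ABI we define in Step 2.

# Sketch: deriving CTF token IDs (helper names are ours; oracle and
# questionId are placeholders you would replace with real values).
from web3 import Web3

def derive_condition_id(oracle, question_id, outcome_slot_count):
    """conditionId = keccak256(abi.encodePacked(oracle, questionId, slots))."""
    return Web3.solidity_keccak(
        ['address', 'bytes32', 'uint256'],
        [oracle, question_id, outcome_slot_count]
    )

# Collection IDs involve elliptic-curve math inside the CTF, so read them
# from the contract instead of recomputing them locally.
CTF_VIEW_ABI = [
    {"inputs": [{"name": "parentCollectionId", "type": "bytes32"},
                {"name": "conditionId", "type": "bytes32"},
                {"name": "indexSet", "type": "uint256"}],
     "name": "getCollectionId",
     "outputs": [{"name": "", "type": "bytes32"}],
     "stateMutability": "view", "type": "function"},
    {"inputs": [{"name": "collateralToken", "type": "address"},
                {"name": "collectionId", "type": "bytes32"}],
     "name": "getPositionId",
     "outputs": [{"name": "", "type": "uint256"}],
     "stateMutability": "pure", "type": "function"},
]

def derive_token_id(w3, ctf_address, collateral_address, condition_id,
                    index_set):
    """Map (conditionId, indexSet) to an ERC-1155 token ID via the CTF.

    For a binary market, index_set=1 is the first outcome and
    index_set=2 the second.
    """
    ctf_view = w3.eth.contract(address=ctf_address, abi=CTF_VIEW_ABI)
    collection_id = ctf_view.functions.getCollectionId(
        b'\x00' * 32,          # no parent collection
        condition_id,
        index_set
    ).call()
    return ctf_view.functions.getPositionId(
        collateral_address, collection_id
    ).call()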

Step 1: Setting Up the Connection

"""
Case Study 1: Reading Polymarket Data from the Blockchain
Connects to Polygon and reads Polymarket smart contract data.
"""

from web3 import Web3
import json
import time
from datetime import datetime, timezone
from collections import defaultdict

# Connect to Polygon
# Public RPC endpoints (for production, use Alchemy/Infura with API key)
POLYGON_RPC_URLS = [
    'https://polygon-rpc.com',
    'https://rpc-mainnet.matic.quiknode.pro',
    'https://polygon-mainnet.g.alchemy.com/v2/YOUR_API_KEY',
]

def connect_to_polygon(rpc_urls=POLYGON_RPC_URLS):
    """Try connecting to Polygon via available RPC endpoints."""
    for url in rpc_urls:
        try:
            w3 = Web3(Web3.HTTPProvider(url, request_kwargs={'timeout': 10}))
            if w3.is_connected():
                chain_id = w3.eth.chain_id
                if chain_id == 137:
                    print(f"Connected to Polygon via {url}")
                    print(f"Chain ID: {chain_id}")
                    print(f"Latest block: {w3.eth.block_number}")
                    return w3
        except Exception as e:
            print(f"Failed to connect to {url}: {e}")
            continue
    raise ConnectionError("Could not connect to any Polygon RPC endpoint")

w3 = connect_to_polygon()
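
One practical caveat before moving on: Polygon blocks carry more than 32 bytes of extraData, which web3.py's default block formatter rejects, so calls such as w3.eth.get_block (used in Step 7) can raise unless the proof-of-authority middleware is injected. A minimal sketch, hedged on the web3.py version you have installed:

# Inject the POA middleware so Polygon block reads do not fail on the
# oversized extraData field. The import name differs between web3.py versions.
try:
    from web3.middleware import ExtraDataToPOAMiddleware  # web3.py v7+
    w3.middleware_onion.inject(ExtraDataToPOAMiddleware, layer=0)
except ImportError:
    from web3.middleware import geth_poa_middleware       # web3.py v6
    w3.middleware_onion.inject(geth_poa_middleware, layer=0)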

Step 2: Defining Contract Interfaces

# Polymarket CTF contract address on Polygon
CTF_ADDRESS = Web3.to_checksum_address(
    '0x4D97DCd97eC945f40cF65F87097ACe5EA0476045'
)

# USDC.e (bridged) on Polygon
USDC_ADDRESS = Web3.to_checksum_address(
    '0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174'
)

# Minimal CTF ABI for reading data
CTF_ABI = [
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "name": "conditionId", "type": "bytes32"},
            {"indexed": True, "name": "oracle", "type": "address"},
            {"indexed": True, "name": "questionId", "type": "bytes32"},
            {"indexed": False, "name": "outcomeSlotCount", "type": "uint256"}
        ],
        "name": "ConditionPreparation",
        "type": "event"
    },
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "name": "conditionId", "type": "bytes32"},
            {"indexed": True, "name": "oracle", "type": "address"},
            {"indexed": True, "name": "questionId", "type": "bytes32"},
            {"indexed": False, "name": "outcomeSlotCount", "type": "uint256"},
            {"indexed": False, "name": "payoutNumerators", "type": "uint256[]"}
        ],
        "name": "ConditionResolution",
        "type": "event"
    },
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "name": "stakeholder", "type": "address"},
            {"indexed": False, "name": "collateralToken", "type": "address"},
            {"indexed": True, "name": "parentCollectionId", "type": "bytes32"},
            {"indexed": True, "name": "conditionId", "type": "bytes32"},
            {"indexed": False, "name": "partition", "type": "uint256[]"},
            {"indexed": False, "name": "amount", "type": "uint256"}
        ],
        "name": "PositionSplit",
        "type": "event"
    },
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "name": "stakeholder", "type": "address"},
            {"indexed": False, "name": "collateralToken", "type": "address"},
            {"indexed": True, "name": "parentCollectionId", "type": "bytes32"},
            {"indexed": True, "name": "conditionId", "type": "bytes32"},
            {"indexed": False, "name": "partition", "type": "uint256[]"},
            {"indexed": False, "name": "amount", "type": "uint256"}
        ],
        "name": "PositionsMerge",
        "type": "event"
    },
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "name": "operator", "type": "address"},
            {"indexed": True, "name": "from", "type": "address"},
            {"indexed": True, "name": "to", "type": "address"},
            {"indexed": False, "name": "id", "type": "uint256"},
            {"indexed": False, "name": "value", "type": "uint256"}
        ],
        "name": "TransferSingle",
        "type": "event"
    },
    {
        "inputs": [
            {"name": "conditionId", "type": "bytes32"}
        ],
        "name": "getOutcomeSlotCount",
        "outputs": [
            {"name": "", "type": "uint256"}
        ],
        "stateMutability": "view",
        "type": "function"
    },
    {
        "inputs": [
            {"name": "conditionId", "type": "bytes32"}
        ],
        "name": "payoutDenominator",
        "outputs": [
            {"name": "", "type": "uint256"}
        ],
        "stateMutability": "view",
        "type": "function"
    },
    {
        "inputs": [
            {"name": "conditionId", "type": "bytes32"},
            {"name": "index", "type": "uint256"}
        ],
        "name": "payoutNumerators",
        "outputs": [
            {"name": "", "type": "uint256"}
        ],
        "stateMutability": "view",
        "type": "function"
    },
    {
        "inputs": [
            {"name": "account", "type": "address"},
            {"name": "id", "type": "uint256"}
        ],
        "name": "balanceOf",
        "outputs": [
            {"name": "", "type": "uint256"}
        ],
        "stateMutability": "view",
        "type": "function"
    }
]

# Standard ERC-20 ABI for USDC
ERC20_ABI = [
    {
        "inputs": [{"name": "account", "type": "address"}],
        "name": "balanceOf",
        "outputs": [{"name": "", "type": "uint256"}],
        "stateMutability": "view",
        "type": "function"
    },
    {
        "inputs": [],
        "name": "decimals",
        "outputs": [{"name": "", "type": "uint8"}],
        "stateMutability": "view",
        "type": "function"
    },
    {
        "inputs": [],
        "name": "symbol",
        "outputs": [{"name": "", "type": "string"}],
        "stateMutability": "view",
        "type": "function"
    },
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "name": "from", "type": "address"},
            {"indexed": True, "name": "to", "type": "address"},
            {"indexed": False, "name": "value", "type": "uint256"}
        ],
        "name": "Transfer",
        "type": "event"
    }
]

# Create contract instances
ctf_contract = w3.eth.contract(address=CTF_ADDRESS, abi=CTF_ABI)
usdc_contract = w3.eth.contract(address=USDC_ADDRESS, abi=ERC20_ABI)
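
Before reading any state, it is worth a quick sanity check that bytecode actually exists at both addresses and that the collateral contract reports the metadata we expect. A short sketch using the instances just created:

# Sanity check: confirm contract code exists at both addresses and read
# basic metadata from the collateral token.
assert len(w3.eth.get_code(CTF_ADDRESS)) > 0, "No contract code at CTF address"
assert len(w3.eth.get_code(USDC_ADDRESS)) > 0, "No contract code at USDC address"
print(f"Collateral token: {usdc_contract.functions.symbol().call()} "
      f"({usdc_contract.functions.decimals().call()} decimals)")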

Step 3: Reading Market Condition Data

def get_condition_info(ctf, condition_id_hex):
    """
    Read information about a market condition from the CTF contract.

    Parameters:
        ctf: The CTF contract instance
        condition_id_hex: The condition ID as a hex string (bytes32)

    Returns:
        Dictionary with condition information
    """
    condition_id = bytes.fromhex(condition_id_hex.replace('0x', ''))

    # Get the number of outcome slots
    outcome_count = ctf.functions.getOutcomeSlotCount(condition_id).call()

    # Check if resolved (payout denominator > 0 means resolved)
    payout_denominator = ctf.functions.payoutDenominator(condition_id).call()
    is_resolved = payout_denominator > 0

    result = {
        'condition_id': condition_id_hex,
        'outcome_count': outcome_count,
        'is_resolved': is_resolved,
        'payout_denominator': payout_denominator,
    }

    # If resolved, get payout numerators for each outcome
    if is_resolved and outcome_count > 0:
        payouts = []
        for i in range(outcome_count):
            numerator = ctf.functions.payoutNumerators(
                condition_id, i
            ).call()
            payouts.append(numerator)
        result['payout_numerators'] = payouts
        result['payout_fractions'] = [
            n / payout_denominator for n in payouts
        ]

    return result


# Example: Query a known condition
# (Replace with an actual Polymarket condition ID for live testing)
EXAMPLE_CONDITION_ID = (
    '0x'
    + '0' * 64  # Placeholder - replace with real condition ID
)

# For demonstration, we will search for recent conditions instead
print("Searching for recent market conditions...")

Step 4: Tracking Market Activity via Events

def get_recent_splits(w3, ctf, from_block, to_block, batch_size=2000):
    """
    Fetch recent PositionSplit events (market entries).

    When a user enters a prediction market, they "split" collateral
    into outcome tokens. Tracking splits shows market entry activity.
    """
    all_events = []
    current = from_block

    while current <= to_block:
        end = min(current + batch_size - 1, to_block)
        try:
            event_filter = ctf.events.PositionSplit.create_filter(
                from_block=current,
                to_block=end
            )
            events = event_filter.get_all_entries()
            all_events.extend(events)
            print(f"  Blocks {current}-{end}: {len(events)} splits")
        except Exception as e:
            print(f"  Error at blocks {current}-{end}: {e}")
            if batch_size > 500:
                batch_size = batch_size // 2
                continue
        current = end + 1

    return all_events


def get_recent_token_transfers(w3, ctf, from_block, to_block,
                                batch_size=2000):
    """
    Fetch recent TransferSingle events (ERC-1155 token transfers).

    These represent trades of outcome tokens between addresses.
    """
    all_events = []
    current = from_block

    while current <= to_block:
        end = min(current + batch_size - 1, to_block)
        try:
            event_filter = ctf.events.TransferSingle.create_filter(
                from_block=current,
                to_block=end
            )
            events = event_filter.get_all_entries()
            all_events.extend(events)
            print(f"  Blocks {current}-{end}: {len(events)} transfers")
        except Exception as e:
            print(f"  Error at blocks {current}-{end}: {e}")
            if batch_size > 500:
                batch_size = batch_size // 2
                continue
        current = end + 1

    return all_events


def get_recent_resolutions(w3, ctf, from_block, to_block,
                            batch_size=5000):
    """
    Fetch recent ConditionResolution events (market settlements).
    """
    all_events = []
    current = from_block

    while current <= to_block:
        end = min(current + batch_size - 1, to_block)
        try:
            event_filter = ctf.events.ConditionResolution.create_filter(
                from_block=current,
                to_block=end
            )
            events = event_filter.get_all_entries()
            all_events.extend(events)
        except Exception as e:
            if batch_size > 500:
                batch_size = batch_size // 2
                continue
        current = end + 1

    return all_events


# Fetch recent activity (last ~10,000 blocks, approximately 5-6 hours)
latest_block = w3.eth.block_number
lookback_blocks = 10000
from_block = latest_block - lookback_blocks

print(f"\nFetching data from block {from_block} to {latest_block}")
print(f"(approximately {lookback_blocks * 2 / 3600:.1f} hours of data)\n")

print("Fetching PositionSplit events...")
splits = get_recent_splits(w3, ctf_contract, from_block, latest_block)

print(f"\nFetching TransferSingle events...")
transfers = get_recent_token_transfers(
    w3, ctf_contract, from_block, latest_block
)

print(f"\nFetching ConditionResolution events...")
resolutions = get_recent_resolutions(
    w3, ctf_contract, from_block, latest_block
)

print(f"\n--- Summary ---")
print(f"Position splits (market entries): {len(splits)}")
print(f"Token transfers (trades): {len(transfers)}")
print(f"Market resolutions: {len(resolutions)}")

Step 5: Analyzing On-Chain Activity

def analyze_splits(splits, usdc_decimals=6):
    """Analyze PositionSplit events to understand market entry patterns."""
    if not splits:
        print("No split events found in the time range.")
        return

    # Aggregate by condition (market)
    markets = defaultdict(lambda: {
        'count': 0,
        'total_amount': 0,
        'unique_users': set(),
        'amounts': []
    })

    for event in splits:
        args = event['args']
        condition_id = args['conditionId'].hex()
        amount = args['amount']
        user = args['stakeholder']

        markets[condition_id]['count'] += 1
        markets[condition_id]['total_amount'] += amount
        markets[condition_id]['unique_users'].add(user)
        markets[condition_id]['amounts'].append(amount)

    print(f"\n{'='*70}")
    print(f"POSITION SPLIT ANALYSIS")
    print(f"{'='*70}")
    print(f"Total splits: {len(splits)}")
    print(f"Unique markets: {len(markets)}")

    # Top markets by volume
    sorted_markets = sorted(
        markets.items(),
        key=lambda x: x[1]['total_amount'],
        reverse=True
    )

    print(f"\nTop 10 Markets by Entry Volume:")
    print(f"{'Condition ID (short)':<24} {'Entries':>8} {'Users':>6} "
          f"{'Total (USDC)':>14} {'Avg Entry':>12}")
    print('-' * 70)

    for cid, data in sorted_markets[:10]:
        total_usdc = data['total_amount'] / 10**usdc_decimals
        avg_entry = total_usdc / data['count'] if data['count'] > 0 else 0
        print(f"{cid[:22]}.. {data['count']:>8} {len(data['unique_users']):>6} "
              f"${total_usdc:>13,.2f} ${avg_entry:>11,.2f}")

    # Overall statistics
    all_amounts = []
    all_users = set()
    total_volume = 0

    for data in markets.values():
        all_amounts.extend(data['amounts'])
        all_users.update(data['unique_users'])
        total_volume += data['total_amount']

    total_volume_usdc = total_volume / 10**usdc_decimals

    print(f"\nOverall Statistics:")
    print(f"  Total entry volume: ${total_volume_usdc:,.2f}")
    print(f"  Unique participants: {len(all_users)}")
    print(f"  Average entry size: "
          f"${total_volume_usdc / len(splits) if splits else 0:,.2f}")

    if all_amounts:
        amounts_usdc = [a / 10**usdc_decimals for a in all_amounts]
        amounts_usdc.sort()
        median_idx = len(amounts_usdc) // 2
        print(f"  Median entry size: ${amounts_usdc[median_idx]:,.2f}")
        print(f"  Largest entry: ${max(amounts_usdc):,.2f}")
        print(f"  Smallest entry: ${min(amounts_usdc):,.2f}")

    return markets


def analyze_transfers(transfers):
    """Analyze TransferSingle events for trading patterns."""
    if not transfers:
        print("No transfer events found in the time range.")
        return

    # Exclude mints (from zero address) and burns (to zero address)
    zero_addr = '0x0000000000000000000000000000000000000000'

    trades = [t for t in transfers
              if t['args']['from'] != zero_addr
              and t['args']['to'] != zero_addr]

    mints = [t for t in transfers if t['args']['from'] == zero_addr]
    burns = [t for t in transfers if t['args']['to'] == zero_addr]

    print(f"\n{'='*70}")
    print(f"TOKEN TRANSFER ANALYSIS")
    print(f"{'='*70}")
    print(f"Total transfers: {len(transfers)}")
    print(f"  Mints (new positions): {len(mints)}")
    print(f"  Burns (position exits): {len(burns)}")
    print(f"  Trades (peer-to-peer): {len(trades)}")

    # Active token IDs (markets)
    token_ids = defaultdict(int)
    for t in transfers:
        token_id = t['args']['id']
        token_ids[token_id] += 1

    print(f"\nUnique token IDs traded: {len(token_ids)}")

    # Top traded tokens
    sorted_tokens = sorted(token_ids.items(), key=lambda x: x[1],
                           reverse=True)
    print(f"\nTop 10 Most Actively Traded Token IDs:")
    for token_id, count in sorted_tokens[:10]:
        # Token ID is a large number; show abbreviated
        tid_str = str(token_id)
        if len(tid_str) > 20:
            tid_str = tid_str[:10] + '...' + tid_str[-10:]
        print(f"  Token {tid_str}: {count} transfers")

    # Unique traders
    all_traders = set()
    for t in trades:
        all_traders.add(t['args']['from'])
        all_traders.add(t['args']['to'])
    print(f"\nUnique trading addresses: {len(all_traders)}")

    return trades


def analyze_resolutions(resolutions):
    """Analyze ConditionResolution events."""
    if not resolutions:
        print("\nNo market resolutions found in the time range.")
        return

    print(f"\n{'='*70}")
    print(f"MARKET RESOLUTION ANALYSIS")
    print(f"{'='*70}")
    print(f"Markets resolved: {len(resolutions)}")

    for i, event in enumerate(resolutions[:10]):
        args = event['args']
        cid = args['conditionId'].hex()
        outcome_count = args['outcomeSlotCount']
        payouts = args['payoutNumerators']

        print(f"\n  Resolution {i+1}:")
        print(f"    Condition: {cid[:22]}...")
        print(f"    Outcomes: {outcome_count}")
        print(f"    Payouts: {payouts}")

        # Determine winning outcome
        if payouts:
            total = sum(payouts)
            if total > 0:
                for j, p in enumerate(payouts):
                    pct = p / total * 100
                    marker = " <-- WINNER" if p == max(payouts) else ""
                    print(f"    Outcome {j}: {pct:.1f}%{marker}")


# Run the analysis
market_data = analyze_splits(splits)
trade_data = analyze_transfers(transfers)
analyze_resolutions(resolutions)

Step 6: Checking Individual Token Balances

def check_token_balance(ctf, address, token_id):
    """
    Check the balance of a specific outcome token for an address.

    Parameters:
        ctf: CTF contract instance
        address: Wallet address to check
        token_id: The ERC-1155 token ID for the outcome token

    Returns:
        Token balance (raw units)
    """
    balance = ctf.functions.balanceOf(
        Web3.to_checksum_address(address),
        token_id
    ).call()
    return balance


def check_usdc_in_ctf(w3, usdc_contract, ctf_address):
    """
    Check total USDC locked in the CTF contract.
    This represents the total collateral backing all active markets.
    """
    balance = usdc_contract.functions.balanceOf(ctf_address).call()
    decimals = usdc_contract.functions.decimals().call()
    symbol = usdc_contract.functions.symbol().call()
    return balance / 10**decimals, symbol


# Check total USDC locked in Polymarket's CTF
usdc_locked, symbol = check_usdc_in_ctf(w3, usdc_contract, CTF_ADDRESS)
print(f"\n{'='*70}")
print(f"COLLATERAL ANALYSIS")
print(f"{'='*70}")
print(f"Total {symbol} locked in CTF: ${usdc_locked:,.2f}")

Step 7: Block Timestamp Analysis

def analyze_activity_timing(w3, events, sample_size=100):
    """
    Analyze when prediction market activity occurs by mapping
    events to timestamps.
    """
    if not events:
        print("No events to analyze.")
        return

    # Sample events if there are too many (to avoid excessive RPC calls)
    sample = events[:sample_size] if len(events) > sample_size else events

    timestamps = []
    for event in sample:
        try:
            block = w3.eth.get_block(event['blockNumber'])
            timestamps.append(block['timestamp'])
        except Exception:
            continue

    if not timestamps:
        print("Could not retrieve timestamps.")
        return

    # Convert to datetime
    datetimes = [
        datetime.fromtimestamp(ts, tz=timezone.utc) for ts in timestamps
    ]

    # Activity by hour
    hour_counts = defaultdict(int)
    for dt in datetimes:
        hour_counts[dt.hour] += 1

    print(f"\n{'='*70}")
    print(f"ACTIVITY TIMING (UTC) - Sample of {len(datetimes)} events")
    print(f"{'='*70}")

    # Time range
    earliest = min(datetimes)
    latest = max(datetimes)
    print(f"Time range: {earliest.strftime('%Y-%m-%d %H:%M')} to "
          f"{latest.strftime('%Y-%m-%d %H:%M')} UTC")

    # Hourly distribution
    print(f"\nHourly Distribution:")
    max_count = max(hour_counts.values()) if hour_counts else 1
    for hour in range(24):
        count = hour_counts.get(hour, 0)
        bar = '#' * int(count / max_count * 40) if max_count > 0 else ''
        print(f"  {hour:02d}:00  {bar} ({count})")

    # Activity rate
    time_span = (latest - earliest).total_seconds()
    if time_span > 0:
        rate_per_minute = len(datetimes) / (time_span / 60)
        rate_per_hour = len(datetimes) / (time_span / 3600)
        print(f"\nActivity rate:")
        print(f"  {rate_per_minute:.1f} events/minute")
        print(f"  {rate_per_hour:.0f} events/hour")


# Analyze timing of split events
if splits:
    analyze_activity_timing(w3, splits)

Key Takeaways from This Case Study

  1. Direct blockchain access provides raw, unfiltered data about prediction market activity. Every trade, every market entry, and every settlement is verifiable on-chain.

  2. Event logs are the primary mechanism for tracking historical activity. The CTF contract emits PositionSplit, PositionsMerge, TransferSingle, ConditionPreparation, and ConditionResolution events that together tell the complete story of market activity.

  3. Pagination is essential when querying historical data. RPC providers limit the block range per query, requiring batch processing.

  4. The CTF contract is the heart of Polymarket's on-chain architecture. Understanding its events and state allows you to reconstruct any market's history.

  5. USDC locked in the CTF serves as a proxy for open interest, the total collateral at risk across all active Polymarket markets.

  6. On-chain data analysis reveals patterns in trading activity, market participation, and resolution outcomes that may not be visible through Polymarket's front-end interface.

Extensions

  • Real-time monitoring: Use WebSocket subscriptions (or a polling loop over an event filter, as sketched after this list) to track events in real time
  • Price reconstruction: Use trade events combined with order book data to reconstruct price histories
  • Whale tracking: Identify and monitor large position holders
  • Cross-market correlation: Analyze how trading activity in one market correlates with activity in related markets
  • Oracle analysis: Track which oracle addresses resolve markets and their resolution patterns
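
As a starting point for the first extension, here is a minimal polling sketch built on the filter API already used above; watch_splits is a helper name of our own. A WebSocket subscription through web3.py's async provider would push events with lower latency, but this version reuses the HTTP connection and contract instance from the case study.

# Near-real-time monitor: poll an event filter for new PositionSplit events.
def watch_splits(w3, ctf, poll_interval=5, iterations=12):
    split_filter = ctf.events.PositionSplit.create_filter(from_block='latest')
    for _ in range(iterations):
        for event in split_filter.get_new_entries():
            args = event['args']
            print(f"New split: condition "
                  f"{Web3.to_hex(args['conditionId'])[:18]}... "
                  f"amount {args['amount'] / 10**6:,.2f} USDC "
                  f"by {args['stakeholder']}")
        time.sleep(poll_interval)

# watch_splits(w3, ctf_contract)  # uncomment for a ~1 minute live watch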