Case Study 1: Building a Market Data Collection Pipeline

Overview

In this case study, we build a complete data collection pipeline that connects to a prediction market API, fetches market data at regular intervals, stores it in a SQLite database, handles errors and rate limits gracefully, and produces summary analytics on the collected data. This exercise brings together nearly every concept from Chapter 6: API clients, database management, logging, configuration, and error handling.

By the end, you will have a production-quality script that can run unattended for days, collecting the data you need for the analyses in later chapters.

The Problem

You want to track the prices of 20 prediction markets over the next month. Markets trade 24/7, and prices can move significantly in response to news events. You need:

  1. Price snapshots every 15 minutes for each market
  2. Reliable storage that survives script restarts
  3. Graceful handling of API outages, rate limits, and network issues
  4. Logging so you can diagnose problems without being at your computer
  5. Summary statistics to verify data quality

Architecture

┌──────────────────┐     ┌───────────────┐     ┌───────────────┐
│  Scheduler       │────>│  Collector    │────>│  SQLite DB    │
│  (15 min loop)   │     │  (API Client) │     │  (markets.db) │
└──────────────────┘     └───────────────┘     └───────────────┘
                                 │                      │
                                 v                      v
                          ┌──────────┐          ┌───────────────┐
                          │  Logger  │          │  Analyzer     │
                          │  (file)  │          │  (pandas)     │
                          └──────────┘          └───────────────┘

Step 1: Configuration

We start by defining what we want to collect. Our configuration file specifies the markets, collection frequency, and API settings.

configs/collection_config.yaml:

collection:
  interval_minutes: 15
  max_markets: 20
  retry_on_failure: true

api:
  base_url: "https://api.example-market.com/v1"
  max_retries: 3
  requests_per_second: 2
  timeout_seconds: 30

database:
  path: "data/markets.db"

logging:
  level: "INFO"
  file: "logs/collector.log"

markets:
  # Can specify specific market IDs or use filters
  track_ids:
    - "presidential-election-2024"
    - "fed-rate-march-2024"
    - "sp500-above-5000"
  filters:
    min_volume: 1000
    status: "open"
    categories:
      - "politics"
      - "economics"
      - "finance"

Loading configuration:

import yaml
import os
from dotenv import load_dotenv
from pathlib import Path

def load_collection_config(
    config_path: str = "configs/collection_config.yaml"
) -> dict:
    """Load and validate collection configuration."""
    load_dotenv()

    with open(config_path, 'r') as f:
        config = yaml.safe_load(f)

    # Inject secrets from environment
    config['api']['api_key'] = os.environ.get('MARKET_API_KEY')
    if not config['api']['api_key']:
        raise ValueError(
            "MARKET_API_KEY not found in environment. "
            "Set it in your .env file."
        )

    return config
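
A quick way to sanity-check the loader before wiring it into the pipeline. This snippet assumes a .env file containing MARKET_API_KEY sits alongside the project and the YAML file above exists:

# Manual check of the configuration loader.
config = load_collection_config("configs/collection_config.yaml")

print(config['collection']['interval_minutes'])  # 15
print(config['api']['base_url'])                 # https://api.example-market.com/v1
print('api_key' in config['api'])                # True; value comes from .env, never from YAML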

Step 2: The API Client

We build a standalone MarketDataCollector client for the specific platform, applying the same patterns as our PredictionMarketClient base class: a shared requests session, throttling, and retrying requests. For this case study, we also build a mock client for testing without a real API.

import time
import random
import logging
import requests
from typing import Optional, Any
from dataclasses import dataclass

logger = logging.getLogger(__name__)

@dataclass
class APIResponse:
    success: bool
    data: Optional[Any] = None
    error: Optional[str] = None
    status_code: Optional[int] = None

class MarketDataCollector:
    """
    API client specialized for data collection.

    Handles authentication, rate limiting, pagination,
    and error recovery.
    """

    def __init__(self, config: dict):
        self.base_url = config['api']['base_url']
        self.api_key = config['api']['api_key']
        self.max_retries = config['api']['max_retries']
        self.timeout = config['api']['timeout_seconds']
        self.min_interval = 1.0 / config['api']['requests_per_second']
        self._last_request = 0.0

        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json',
            'User-Agent': 'PredictionMarketCollector/1.0'
        })

        # Metrics
        self.total_requests = 0
        self.successful_requests = 0
        self.failed_requests = 0
        self.rate_limited_count = 0

    def _throttle(self):
        """Enforce rate limiting."""
        elapsed = time.time() - self._last_request
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self._last_request = time.time()

    def _request(self, endpoint: str, params: dict = None) -> APIResponse:
        """Make a throttled, retrying request."""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        self.total_requests += 1

        for attempt in range(self.max_retries):
            self._throttle()

            try:
                response = self.session.get(
                    url, params=params, timeout=self.timeout
                )

                if response.status_code == 200:
                    self.successful_requests += 1
                    return APIResponse(
                        success=True,
                        data=response.json(),
                        status_code=200
                    )

                elif response.status_code == 429:
                    self.rate_limited_count += 1
                    retry_after = int(
                        response.headers.get('Retry-After', 5)
                    )
                    delay = max(retry_after, self.min_interval * (2 ** attempt))
                    logger.warning(
                        f"Rate limited on {endpoint}. "
                        f"Waiting {delay:.0f}s "
                        f"(attempt {attempt + 1}/{self.max_retries})"
                    )
                    time.sleep(delay + random.uniform(0, 1))

                elif response.status_code >= 500:
                    delay = 2 ** attempt + random.uniform(0, 1)
                    logger.warning(
                        f"Server error {response.status_code} on {endpoint}. "
                        f"Retrying in {delay:.1f}s"
                    )
                    time.sleep(delay)

                else:
                    self.failed_requests += 1
                    return APIResponse(
                        success=False,
                        error=f"HTTP {response.status_code}: {response.text[:200]}",
                        status_code=response.status_code
                    )

            except requests.exceptions.Timeout:
                logger.warning(f"Timeout on {endpoint} (attempt {attempt + 1})")
                time.sleep(2 ** attempt)

            except requests.exceptions.ConnectionError:
                logger.warning(
                    f"Connection error on {endpoint} (attempt {attempt + 1})"
                )
                time.sleep(5 * (attempt + 1))

        self.failed_requests += 1
        return APIResponse(
            success=False,
            error=f"Failed after {self.max_retries} attempts"
        )

    def get_markets(
        self,
        market_ids: list[str] = None,
        category: str = None,
        status: str = "open",
        limit: int = 100
    ) -> APIResponse:
        """Fetch market listings."""
        params = {'status': status, 'limit': limit}
        if category:
            params['category'] = category

        response = self._request('markets', params=params)

        if response.success and market_ids:
            # Filter to specific market IDs
            response.data = [
                m for m in response.data
                if m.get('id') in market_ids
            ]

        return response

    def get_market_prices(self, market_id: str) -> APIResponse:
        """Fetch current prices for a specific market."""
        return self._request(f'markets/{market_id}/prices')

    def get_market_history(
        self,
        market_id: str,
        start: str = None,
        end: str = None,
        resolution: str = "1h"
    ) -> APIResponse:
        """Fetch historical price data."""
        params = {'resolution': resolution}
        if start:
            params['start'] = start
        if end:
            params['end'] = end
        return self._request(f'markets/{market_id}/history', params=params)

    def get_stats(self) -> dict:
        """Return collection statistics."""
        return {
            'total_requests': self.total_requests,
            'successful': self.successful_requests,
            'failed': self.failed_requests,
            'rate_limited': self.rate_limited_count,
            'success_rate': (
                self.successful_requests / max(1, self.total_requests)
            )
        }
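
Used on its own, the collector looks like the sketch below. The endpoints and response fields here follow the hypothetical API in the config; your platform's actual paths and payloads will differ:

config = load_collection_config()
collector = MarketDataCollector(config)

# Fetch open markets, then current prices for the first one
markets = collector.get_markets(status="open", limit=10)
if markets.success and markets.data:
    first_id = markets.data[0]['id']
    prices = collector.get_market_prices(first_id)
    if prices.success:
        print(prices.data)

# Throttling and retries happen inside _request; here we only see the totals
print(collector.get_stats())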

Step 3: The Mock Client

For testing and development, we create a mock client that generates realistic data without hitting a real API:

import time
import random

import numpy as np
from datetime import datetime

class MockMarketCollector(MarketDataCollector):
    """Mock collector that generates synthetic market data."""

    def __init__(self, config: dict):
        # Skip parent __init__ (no real API connection needed)
        self.total_requests = 0
        self.successful_requests = 0
        self.failed_requests = 0
        self.rate_limited_count = 0
        self._markets = self._generate_mock_markets()
        self._price_state = {
            m['id']: m['yes_price'] for m in self._markets
        }

    def _generate_mock_markets(self) -> list[dict]:
        """Generate a set of realistic mock markets."""
        markets = [
            {
                'id': 'presidential-election-2024',
                'title': 'Will the incumbent win the 2024 presidential election?',
                'category': 'politics',
                'status': 'open',
                'yes_price': 0.52,
                'no_price': 0.50,
                'volume': 2500000,
                'created_at': '2024-01-15T00:00:00Z',
                'close_date': '2024-11-05T00:00:00Z',
            },
            {
                'id': 'fed-rate-march-2024',
                'title': 'Will the Fed cut rates in March 2024?',
                'category': 'economics',
                'status': 'open',
                'yes_price': 0.15,
                'no_price': 0.87,
                'volume': 800000,
                'created_at': '2024-01-01T00:00:00Z',
                'close_date': '2024-03-20T00:00:00Z',
            },
            {
                'id': 'sp500-above-5000',
                'title': 'Will the S&P 500 close above 5000 by end of Q1?',
                'category': 'finance',
                'status': 'open',
                'yes_price': 0.68,
                'no_price': 0.35,
                'volume': 450000,
                'created_at': '2024-01-10T00:00:00Z',
                'close_date': '2024-03-31T00:00:00Z',
            },
        ]

        # Generate additional markets
        topics = [
            ("tech-ipo-q1", "Will a major tech company IPO in Q1 2024?", "finance", 0.40),
            ("oscars-best-picture", "Will Oppenheimer win Best Picture?", "entertainment", 0.72),
            ("bitcoin-above-50k", "Will Bitcoin trade above $50,000 in February?", "crypto", 0.55),
            ("ukraine-ceasefire", "Will there be a Ukraine ceasefire by June?", "politics", 0.12),
            ("ai-regulation-bill", "Will Congress pass an AI regulation bill in 2024?", "politics", 0.25),
            ("nfl-superbowl-chiefs", "Will the Chiefs win Super Bowl LVIII?", "sports", 0.35),
            ("us-gdp-above-3", "Will US GDP growth exceed 3% in Q1?", "economics", 0.30),
            ("eu-interest-rate-cut", "Will the ECB cut rates before the Fed?", "economics", 0.45),
            ("spacex-starship-orbit", "Will Starship reach orbit by mid-2024?", "science", 0.60),
            ("twitter-rebrand-reverse", "Will X rebrand back to Twitter in 2024?", "tech", 0.08),
            ("oil-above-90", "Will oil prices exceed $90/barrel in Q1?", "commodities", 0.38),
            ("china-taiwan-escalation", "Will there be a military escalation in the Taiwan Strait?", "geopolitics", 0.07),
            ("ev-sales-record", "Will US EV sales set a monthly record in Q1?", "auto", 0.62),
            ("inflation-below-3", "Will US CPI fall below 3% by March?", "economics", 0.48),
            ("major-earthquake-pacific", "Will a magnitude 7+ earthquake hit the Pacific Ring in Q1?", "science", 0.22),
            ("oscar-host-controversy", "Will the Oscars host face a major controversy?", "entertainment", 0.15),
            ("tiktok-ban-progress", "Will TikTok face a US ban or forced sale in 2024?", "tech", 0.30),
        ]

        for id_slug, title, category, prob in topics:
            overround = random.uniform(0.01, 0.04)
            markets.append({
                'id': id_slug,
                'title': title,
                'category': category,
                'status': 'open',
                'yes_price': round(prob + overround / 2, 2),
                'no_price': round(1 - prob + overround / 2, 2),
                'volume': random.randint(10000, 1000000),
                'created_at': '2024-01-01T00:00:00Z',
                'close_date': '2024-06-30T00:00:00Z',
            })

        return markets

    def _evolve_price(self, market_id: str) -> float:
        """Simulate realistic price movement."""
        current = self._price_state[market_id]
        # Logit-space random walk
        logit = np.log(current / (1 - current))
        logit += np.random.normal(0, 0.02)
        new_price = 1 / (1 + np.exp(-logit))
        new_price = np.clip(new_price, 0.01, 0.99)
        self._price_state[market_id] = float(new_price)
        return float(new_price)

    def get_markets(self, **kwargs) -> APIResponse:
        self.total_requests += 1
        self.successful_requests += 1
        time.sleep(0.1)  # Simulate network latency
        return APIResponse(success=True, data=self._markets, status_code=200)

    def get_market_prices(self, market_id: str) -> APIResponse:
        self.total_requests += 1

        # Simulate occasional failures (5% chance)
        if random.random() < 0.05:
            self.failed_requests += 1
            return APIResponse(
                success=False,
                error="Simulated server error",
                status_code=500
            )

        self.successful_requests += 1
        new_price = self._evolve_price(market_id)
        overround = random.uniform(0.01, 0.04)

        return APIResponse(
            success=True,
            data={
                'market_id': market_id,
                'yes_price': round(new_price + overround / 2, 4),
                'no_price': round(1 - new_price + overround / 2, 4),
                'volume': random.randint(100, 10000),
                'timestamp': datetime.now().isoformat()
            },
            status_code=200
        )
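
A few calls against the mock show prices drifting in logit space, plus the occasional simulated failure. The mock ignores its config argument entirely, so an empty dict is enough:

mock = MockMarketCollector(config={})

for _ in range(3):
    resp = mock.get_market_prices('bitcoin-above-50k')
    if resp.success:
        print(resp.data['yes_price'])  # wanders slightly around 0.55
    else:
        print("simulated failure:", resp.error)

print(mock.get_stats())  # inherited from MarketDataCollector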

Step 4: The Collection Pipeline

Now we build the main collection loop that ties everything together:

import time
import logging
import json
from datetime import datetime
from pathlib import Path

logger = logging.getLogger(__name__)

class CollectionPipeline:
    """
    Orchestrates periodic data collection from prediction markets.

    Responsibilities:
    - Manages the collection schedule
    - Coordinates the API client, database, and logger
    - Tracks collection health metrics
    - Handles graceful shutdown
    """

    def __init__(self, config: dict, use_mock: bool = False):
        self.config = config
        self.interval = config['collection']['interval_minutes'] * 60
        self.max_markets = config['collection']['max_markets']

        # Initialize components
        if use_mock:
            self.collector = MockMarketCollector(config)
            logger.info("Using mock data collector")
        else:
            self.collector = MarketDataCollector(config)
            logger.info("Using live API collector")

        self.db = MarketDatabase(config['database']['path'])

        # Pipeline state
        self.running = False
        self.collection_count = 0
        self.error_count = 0
        self.start_time = None

    def collect_once(self) -> dict:
        """
        Run a single collection cycle.

        Returns:
            Dictionary with collection results
        """
        cycle_start = time.time()
        results = {
            'timestamp': datetime.now().isoformat(),
            'markets_attempted': 0,
            'markets_succeeded': 0,
            'markets_failed': 0,
            'errors': []
        }

        # Step 1: Get list of markets to track
        logger.info("Fetching market listings...")
        markets_response = self.collector.get_markets(
            status='open'
        )

        if not markets_response.success:
            error_msg = f"Failed to fetch markets: {markets_response.error}"
            logger.error(error_msg)
            results['errors'].append(error_msg)
            return results

        markets = markets_response.data[:self.max_markets]
        logger.info(f"Tracking {len(markets)} markets")

        # Step 2: Save market metadata
        for market in markets:
            try:
                self.db.save_market(market)
            except Exception as e:
                logger.error(f"Failed to save market {market.get('id')}: {e}")

        # Step 3: Fetch and store prices for each market
        for market in markets:
            market_id = market.get('id', 'unknown')
            results['markets_attempted'] += 1

            try:
                price_response = self.collector.get_market_prices(market_id)

                if price_response.success:
                    data = price_response.data
                    self.db.save_price_snapshot(
                        market_id=market_id,
                        yes_price=data['yes_price'],
                        no_price=data['no_price'],
                        volume=data.get('volume'),
                        timestamp=datetime.now()
                    )
                    results['markets_succeeded'] += 1
                    logger.debug(
                        f"  {market_id}: yes={data['yes_price']:.4f}, "
                        f"no={data['no_price']:.4f}"
                    )
                else:
                    results['markets_failed'] += 1
                    error_msg = f"{market_id}: {price_response.error}"
                    results['errors'].append(error_msg)
                    logger.warning(f"  Failed: {error_msg}")

            except Exception as e:
                results['markets_failed'] += 1
                error_msg = f"{market_id}: {str(e)}"
                results['errors'].append(error_msg)
                logger.error(f"  Exception: {error_msg}", exc_info=True)

        # Step 4: Log summary
        elapsed = time.time() - cycle_start
        logger.info(
            f"Collection cycle complete: "
            f"{results['markets_succeeded']}/{results['markets_attempted']} "
            f"succeeded in {elapsed:.1f}s"
        )

        self.collection_count += 1
        self.error_count += results['markets_failed']

        return results

    def run(self, max_cycles: int = None):
        """
        Run the collection pipeline continuously.

        Args:
            max_cycles: Maximum number of collection cycles (None for infinite)
        """
        self.running = True
        self.start_time = datetime.now()
        cycle = 0

        logger.info(
            f"Starting collection pipeline. "
            f"Interval: {self.config['collection']['interval_minutes']}min. "
            f"Max cycles: {max_cycles or 'unlimited'}"
        )

        try:
            while self.running:
                cycle += 1

                if max_cycles and cycle > max_cycles:
                    logger.info(f"Reached max cycles ({max_cycles}). Stopping.")
                    break

                logger.info(f"=== Collection Cycle {cycle} ===")

                results = self.collect_once()

                # Save cycle results to a log file
                self._save_cycle_log(cycle, results)

                # Check if we should continue
                if not self.running:
                    break

                # Wait for next cycle
                if max_cycles is None or cycle < max_cycles:
                    logger.info(
                        f"Next collection in "
                        f"{self.config['collection']['interval_minutes']} minutes"
                    )
                    self._interruptible_sleep(self.interval)

        except KeyboardInterrupt:
            logger.info("Received keyboard interrupt. Shutting down gracefully.")
        finally:
            self.running = False
            self._print_summary()

    def _interruptible_sleep(self, seconds: float):
        """Sleep in small increments so we can respond to interrupts."""
        increment = 1.0
        elapsed = 0.0
        while elapsed < seconds and self.running:
            time.sleep(min(increment, seconds - elapsed))
            elapsed += increment

    def _save_cycle_log(self, cycle: int, results: dict):
        """Save collection cycle results to a JSON log."""
        log_dir = Path("logs/collection")
        log_dir.mkdir(parents=True, exist_ok=True)

        log_file = log_dir / f"cycle_{cycle:05d}.json"
        with open(log_file, 'w') as f:
            json.dump(results, f, indent=2)

    def _print_summary(self):
        """Print a summary of the collection run."""
        duration = datetime.now() - self.start_time if self.start_time else None
        api_stats = self.collector.get_stats()

        summary = f"""
========================================
Collection Pipeline Summary
========================================
Duration:           {duration}
Collection cycles:  {self.collection_count}
Total errors:       {self.error_count}
API requests:       {api_stats['total_requests']}
Success rate:       {api_stats['success_rate']:.1%}
Rate limited:       {api_stats['rate_limited']}
========================================
"""
        logger.info(summary)
        print(summary)
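
The pipeline leans on the MarketDatabase class developed earlier in the chapter. If you are working through this case study standalone, a minimal stand-in with the four methods the pipeline and analysis code call is enough to get running; the schema and column names below are assumptions, not the chapter's exact implementation:

import sqlite3
from datetime import datetime
from pathlib import Path

import pandas as pd

class MarketDatabase:
    """Minimal SQLite wrapper exposing only the methods the pipeline calls."""

    def __init__(self, db_path: str):
        Path(db_path).parent.mkdir(parents=True, exist_ok=True)
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS markets ("
            "id TEXT PRIMARY KEY, title TEXT, category TEXT, "
            "status TEXT, close_date TEXT)"
        )
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS prices ("
            "market_id TEXT, timestamp TEXT, "
            "yes_price REAL, no_price REAL, volume REAL)"
        )
        self.conn.commit()

    def save_market(self, market: dict):
        """Upsert market metadata."""
        self.conn.execute(
            "INSERT OR REPLACE INTO markets VALUES (?, ?, ?, ?, ?)",
            (market.get('id'), market.get('title'), market.get('category'),
             market.get('status'), market.get('close_date'))
        )
        self.conn.commit()

    def save_price_snapshot(self, market_id, yes_price, no_price,
                            volume=None, timestamp=None):
        """Append one price observation."""
        ts = (timestamp or datetime.now()).isoformat()
        self.conn.execute(
            "INSERT INTO prices VALUES (?, ?, ?, ?, ?)",
            (market_id, ts, yes_price, no_price, volume)
        )
        self.conn.commit()

    def get_all_markets(self) -> pd.DataFrame:
        return pd.read_sql("SELECT * FROM markets", self.conn)

    def get_price_history(self, market_id: str) -> pd.DataFrame:
        df = pd.read_sql(
            "SELECT * FROM prices WHERE market_id = ? ORDER BY timestamp",
            self.conn, params=(market_id,)
        )
        if not df.empty:
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df = df.set_index('timestamp')
        return df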

Step 5: Data Analysis

After collection, we analyze the data to verify quality and extract insights:

import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

def analyze_collected_data(db_path: str = "data/markets.db"):
    """Analyze the collected market data."""
    db = MarketDatabase(db_path)

    # Get all markets
    markets = db.get_all_markets()
    print(f"Total markets in database: {len(markets)}")
    print(f"Markets by category:")
    if 'category' in markets.columns:
        print(markets['category'].value_counts().to_string())
    print()

    # Analyze price data completeness
    all_market_ids = markets['id'].tolist() if not markets.empty else []
    stats = []

    for market_id in all_market_ids:
        prices = db.get_price_history(market_id)
        if not prices.empty:
            stats.append({
                'market_id': market_id,
                'snapshots': len(prices),
                'first_snapshot': prices.index.min(),
                'last_snapshot': prices.index.max(),
                'avg_yes_price': prices['yes_price'].mean(),
                'price_std': prices['yes_price'].std(),
                'max_gap_hours': (
                    prices.index.to_series()
                    .diff()
                    .max()
                    .total_seconds() / 3600
                    if len(prices) > 1 else 0
                ),
            })

    if stats:
        stats_df = pd.DataFrame(stats)
        print("Data Collection Summary:")
        print(f"  Markets with price data: {len(stats_df)}")
        print(f"  Total snapshots: {stats_df['snapshots'].sum()}")
        print(f"  Avg snapshots per market: {stats_df['snapshots'].mean():.1f}")
        print(f"  Max gap between snapshots: "
              f"{stats_df['max_gap_hours'].max():.1f} hours")
        print()

        # Identify most volatile markets
        volatile = stats_df.nlargest(5, 'price_std')
        print("Most volatile markets:")
        for _, row in volatile.iterrows():
            print(f"  {row['market_id']}: std={row['price_std']:.4f}")
        print()

    return stats_df if stats else pd.DataFrame()

def plot_collection_overview(db_path: str = "data/markets.db"):
    """Generate overview plots of collected data."""
    db = MarketDatabase(db_path)
    markets = db.get_all_markets()

    if markets.empty:
        print("No data to plot.")
        return

    # Select up to 6 markets for the overview
    market_ids = markets['id'].tolist()[:6]

    fig, axes = plt.subplots(
        len(market_ids), 1,
        figsize=(12, 3 * len(market_ids)),
        sharex=True
    )

    if len(market_ids) == 1:
        axes = [axes]

    for ax, market_id in zip(axes, market_ids):
        prices = db.get_price_history(market_id)
        if not prices.empty:
            ax.plot(prices.index, prices['yes_price'],
                    color='#2196F3', linewidth=1.2)
            ax.axhline(y=0.5, color='gray', linestyle='--', alpha=0.5)
            ax.set_ylabel('Yes Price')
            ax.set_ylim(0, 1)
            ax.set_title(market_id, fontsize=10, loc='left')

    axes[-1].xaxis.set_major_formatter(mdates.DateFormatter('%m/%d %H:%M'))
    plt.xticks(rotation=45)
    plt.suptitle('Collected Market Prices', fontsize=14, fontweight='bold')
    plt.tight_layout()
    plt.savefig('data/collection_overview.png', dpi=150, bbox_inches='tight')
    print("Saved overview plot to data/collection_overview.png")

Step 6: Putting It All Together

The main entry point:

#!/usr/bin/env python3
"""
Market data collection pipeline.

Usage:
    # Collect using mock data (for testing):
    python collect_data.py --mock --cycles 5

    # Collect using live API:
    python collect_data.py

    # Collect and analyze:
    python collect_data.py --mock --cycles 10 --analyze
"""

import argparse
import logging
import sys

def main():
    parser = argparse.ArgumentParser(
        description="Prediction market data collection pipeline"
    )
    parser.add_argument(
        '--mock', action='store_true',
        help='Use mock data instead of live API'
    )
    parser.add_argument(
        '--cycles', type=int, default=None,
        help='Number of collection cycles (default: unlimited)'
    )
    parser.add_argument(
        '--analyze', action='store_true',
        help='Analyze collected data after collection'
    )
    parser.add_argument(
        '--config', type=str, default='configs/collection_config.yaml',
        help='Path to configuration file'
    )
    args = parser.parse_args()

    # Setup logging
    setup_logging(level="INFO", log_file="logs/collector.log")
    logger = logging.getLogger(__name__)

    try:
        # Load configuration
        config = load_collection_config(args.config)
        logger.info("Configuration loaded successfully")

        # Run pipeline
        pipeline = CollectionPipeline(config, use_mock=args.mock)
        pipeline.run(max_cycles=args.cycles)

        # Analyze if requested
        if args.analyze:
            print("\n--- Data Analysis ---\n")
            stats = analyze_collected_data(config['database']['path'])
            plot_collection_overview(config['database']['path'])

    except FileNotFoundError as e:
        logger.error(f"Configuration file not found: {e}")
        sys.exit(1)
    except ValueError as e:
        logger.error(f"Configuration error: {e}")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)
        sys.exit(1)

if __name__ == "__main__":
    main()
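
The main script calls the setup_logging helper from earlier in the chapter. If you do not have that module handy, a minimal stand-in with the same signature might look like this:

import logging
from pathlib import Path

def setup_logging(level: str = "INFO", log_file: str = None):
    """Configure root logging to the console and, optionally, a file."""
    handlers = [logging.StreamHandler()]
    if log_file:
        Path(log_file).parent.mkdir(parents=True, exist_ok=True)
        handlers.append(logging.FileHandler(log_file))
    logging.basicConfig(
        level=getattr(logging, level.upper(), logging.INFO),
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        handlers=handlers,
    )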

Key Lessons

1. Separation of Concerns

Each component has a single responsibility:

  - Configuration: loads and validates settings
  - API Client: handles HTTP communication
  - Database: handles data persistence
  - Pipeline: orchestrates the collection cycle
  - Analysis: processes collected data

This separation makes the code testable, maintainable, and adaptable.

2. Defensive Programming

The pipeline handles every expected failure mode:

  - API rate limiting (exponential backoff)
  - Network errors (retry with increasing delays)
  - Server errors (retry a limited number of times)
  - Invalid data (validation before storage)
  - Interrupted execution (graceful shutdown with summary)

3. Observability

Without logging and metrics, a long-running script is a black box. Our pipeline provides:

  - Real-time logging to console and file
  - Per-cycle JSON logs for post-hoc analysis
  - API request statistics
  - Collection health summary on shutdown
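
Because each cycle writes a small JSON file, you can check collection health after the fact without touching the database. A sketch, assuming at least one cycle has run and written to the logs/collection directory used by _save_cycle_log:

import json
from pathlib import Path

import pandas as pd

# Read every per-cycle log written by _save_cycle_log
records = [
    json.loads(p.read_text())
    for p in sorted(Path("logs/collection").glob("cycle_*.json"))
]
cycles = pd.DataFrame(records)

# Success rate per cycle; a sustained drop usually means API trouble
cycles['success_rate'] = (
    cycles['markets_succeeded'] / cycles['markets_attempted'].clip(lower=1)
)
print(cycles[['timestamp', 'markets_succeeded', 'markets_failed', 'success_rate']])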

4. Mock-Friendly Design

The mock collector follows the same interface as the real collector. This lets you develop, test, and debug without consuming API quota or requiring network access. When you are confident in the pipeline logic, switch to the live API by removing the --mock flag.

5. Scheduling

For true production use, you would run this script via a system scheduler:

# Linux/macOS cron (every 15 minutes)
*/15 * * * * cd /path/to/project && /path/to/venv/bin/python scripts/collect_data.py --cycles 1 >> logs/cron.log 2>&1

# Windows Task Scheduler
# Create a basic task that runs:
# C:\path\to\venv\Scripts\python.exe C:\path\to\project\scripts\collect_data.py --cycles 1

Alternatively, the script's built-in loop with --cycles omitted will run continuously, managing its own schedule.

Discussion Questions

  1. How would you modify this pipeline to handle markets that close (become untradeable) during collection?
  2. What additional data quality checks would you add to the analysis step?
  3. How would you extend this to collect from multiple platforms simultaneously?
  4. What monitoring or alerting would you add for a pipeline running in production for weeks?
  5. How would you handle a situation where the API changes its response format?