Case Study: Optimizing Python Code for Large-Scale Data Processing

"Premature optimization is the root of all evil -- but so is ignoring performance entirely when processing terabytes of data." -- Adapted from Donald Knuth

Executive Summary

In this case study, you will take a realistic but poorly optimized Python data processing pipeline and systematically improve its performance, first by roughly an order of magnitude and ultimately by more than 500x. You will apply profiling tools to identify bottlenecks, replace Python loops with NumPy vectorization, optimize memory usage through dtype management, and outline parallel processing strategies for scaling further. This exercise teaches the critical skill of writing Python code that scales from prototype to production.

Skills Applied:

  • Profiling with cProfile and line_profiler (Section 5.5)
  • NumPy vectorization and broadcasting (Section 5.2)
  • pandas performance optimization (Section 5.3.7)
  • Memory layout and dtype management (Section 5.2.1-5.2.2)
  • Algorithmic optimization strategies (Section 5.5.3)


Background

The Scenario

You have joined a machine learning team at a healthcare analytics startup. The team has built a prototype feature engineering pipeline that processes patient vitals data to create features for a health risk prediction model. The pipeline works correctly on their test dataset of 1,000 patients, but the production dataset contains 2 million patients with 365 days of daily readings each.

The current pipeline takes approximately 45 minutes to process the full dataset on a 32-core, 128 GB RAM machine. The team needs it to run in under 5 minutes to fit into the daily retraining schedule. Your task is to optimize the pipeline without changing its outputs.

The Original Pipeline

import numpy as np
import pandas as pd
import time
from typing import Optional


def generate_patient_data(
    n_patients: int = 10000,
    n_days: int = 365,
    seed: int = 42
) -> pd.DataFrame:
    """Generate synthetic patient vitals data.

    Creates daily readings for simulated patients, including vital
    signs with realistic distributions and some missing values.

    Args:
        n_patients: Number of patients to generate.
        n_days: Number of daily readings per patient.
        seed: Random seed for reproducibility.

    Returns:
        DataFrame with daily patient vitals.
    """
    np.random.seed(seed)
    total_rows = n_patients * n_days

    df = pd.DataFrame({
        'patient_id': np.repeat(np.arange(n_patients), n_days),
        'day': np.tile(np.arange(n_days), n_patients),
        'heart_rate': np.random.normal(72, 12, total_rows).round(1),
        'systolic_bp': np.random.normal(120, 15, total_rows).round(1),
        'diastolic_bp': np.random.normal(80, 10, total_rows).round(1),
        'temperature': np.random.normal(98.6, 0.5, total_rows).round(2),
        'oxygen_sat': np.clip(
            np.random.normal(97, 2, total_rows), 85, 100
        ).round(1),
        'respiratory_rate': np.random.poisson(16, total_rows),
    })

    # Introduce 5% missing values randomly
    for col in ['heart_rate', 'systolic_bp', 'temperature', 'oxygen_sat']:
        mask = np.random.random(total_rows) < 0.05
        df.loc[mask, col] = np.nan

    return df
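
The generated frame uses float64 and int64 throughout. Since dtype management is one of the skills this case study exercises, the sketch below shows one hedged way to shrink the raw table before feature engineering (the helper name is illustrative; float32 can shift downstream statistics by rounding-level amounts):

def downcast_vitals(df: pd.DataFrame) -> pd.DataFrame:
    """Reduce memory by downcasting numeric columns (sketch).

    float64 -> float32 is usually safe at vitals scale, and the id,
    day, and respiratory_rate columns fit in small integer types.
    """
    df = df.copy()
    for col in ['heart_rate', 'systolic_bp', 'diastolic_bp',
                'temperature', 'oxygen_sat']:
        df[col] = df[col].astype(np.float32)
    df['patient_id'] = df['patient_id'].astype(np.int32)
    df['day'] = df['day'].astype(np.int16)
    df['respiratory_rate'] = df['respiratory_rate'].astype(np.int16)
    return df


raw = generate_patient_data(n_patients=1000)
small = downcast_vitals(raw)
print(raw.memory_usage(deep=True).sum() / 1e6, "MB ->",
      small.memory_usage(deep=True).sum() / 1e6, "MB")

On the full table this roughly halves the memory reported by df.memory_usage(deep=True), at the cost of only rounding-level changes in the computed features.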

The Slow Pipeline (Version 1 -- "Naive")

This is the original code the team wrote. It works correctly but is extremely slow:

def slow_feature_pipeline(df: pd.DataFrame) -> pd.DataFrame:
    """Original feature engineering pipeline (SLOW).

    Processes patient vitals to create statistical features for
    each patient. This version uses Python loops and inefficient
    pandas patterns.

    Args:
        df: Raw patient vitals DataFrame.

    Returns:
        DataFrame with one row per patient and engineered features.

    Note:
        This implementation is intentionally inefficient for
        educational purposes. See optimized versions below.
    """
    patients = df['patient_id'].unique()
    results = []

    for patient_id in patients:
        patient_data = df[df['patient_id'] == patient_id]
        features = {'patient_id': patient_id}

        # Compute rolling statistics manually
        vital_cols = ['heart_rate', 'systolic_bp', 'diastolic_bp',
                      'temperature', 'oxygen_sat', 'respiratory_rate']

        for col in vital_cols:
            values = patient_data[col].dropna().values

            if len(values) == 0:
                features[f'{col}_mean'] = np.nan
                features[f'{col}_std'] = np.nan
                features[f'{col}_min'] = np.nan
                features[f'{col}_max'] = np.nan
                features[f'{col}_range'] = np.nan
                features[f'{col}_trend'] = np.nan
                features[f'{col}_n_anomalies'] = 0
                continue

            features[f'{col}_mean'] = np.mean(values)
            features[f'{col}_std'] = np.std(values)
            features[f'{col}_min'] = np.min(values)
            features[f'{col}_max'] = np.max(values)
            features[f'{col}_range'] = np.max(values) - np.min(values)

            # Linear trend (slope of linear regression)
            if len(values) > 1:
                x = np.arange(len(values))
                slope = (np.sum((x - x.mean()) * (values - values.mean())) /
                         np.sum((x - x.mean()) ** 2))
                features[f'{col}_trend'] = slope
            else:
                features[f'{col}_trend'] = 0.0

            # Count anomalies (values beyond 2 std devs)
            mean_val = np.mean(values)
            std_val = np.std(values)
            if std_val > 0:
                anomaly_count = 0
                for v in values:
                    if abs(v - mean_val) > 2 * std_val:
                        anomaly_count += 1
                features[f'{col}_n_anomalies'] = anomaly_count
            else:
                features[f'{col}_n_anomalies'] = 0

        # Missing value features
        for col in vital_cols:
            features[f'{col}_missing_pct'] = (
                patient_data[col].isnull().sum() / len(patient_data)
            )

        results.append(features)

    return pd.DataFrame(results)

Optimization Journey

Step 1: Profile the Bottleneck

Before optimizing, we measure where time is actually spent.

import cProfile
import pstats

# Generate test data (smaller for profiling)
test_df = generate_patient_data(n_patients=500, n_days=365)

# Profile the slow pipeline
profiler = cProfile.Profile()
profiler.enable()
result_slow = slow_feature_pipeline(test_df)
profiler.disable()

stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(20)

Profiling Results (representative):

Function                                        Calls    Time (s)    % Total
slow_feature_pipeline                               1       42.3        100%
df[df['patient_id'] == ...] (boolean filter)      500       28.1         66%
np.mean / np.std / np.min / np.max               6000        5.2         12%
values.dropna()                                  3000        4.8         11%
Inner loop (anomaly counting)                   ~500K        3.1          7%

Key Insight: 66% of the time is spent on repeated boolean filtering (df[df['patient_id'] == patient_id]). Each call scans the entire DataFrame to select one patient's rows. For 500 patients, this means 500 full scans.
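
cProfile attributes time to whole functions. To confirm that the boolean-filter line itself dominates inside slow_feature_pipeline, line_profiler (listed under Skills Applied) reports per-line timings. A minimal sketch, assuming the line_profiler package is installed:

from line_profiler import LineProfiler

lp = LineProfiler()

# Wrapping the function times every line inside it on each call
profiled_pipeline = lp(slow_feature_pipeline)
profiled_pipeline(test_df)

# The per-line report should show df[df['patient_id'] == ...] dominating
lp.print_stats()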


Step 2: Optimize with GroupBy (Version 2)

Replace the explicit loop with pandas GroupBy, eliminating the repeated full-scan filtering.

def medium_feature_pipeline(df: pd.DataFrame) -> pd.DataFrame:
    """Optimized feature pipeline using GroupBy (MEDIUM speed).

    Replaces the patient-by-patient loop with pandas GroupBy
    for efficient group-level aggregation.

    Args:
        df: Raw patient vitals DataFrame.

    Returns:
        DataFrame with one row per patient and engineered features.
    """
    vital_cols = ['heart_rate', 'systolic_bp', 'diastolic_bp',
                  'temperature', 'oxygen_sat', 'respiratory_rate']

    # Basic statistics via GroupBy aggregation
    agg_dict = {}
    for col in vital_cols:
        agg_dict[f'{col}_mean'] = (col, 'mean')
        agg_dict[f'{col}_std'] = (col, 'std')
        agg_dict[f'{col}_min'] = (col, 'min')
        agg_dict[f'{col}_max'] = (col, 'max')

    features = df.groupby('patient_id').agg(**agg_dict).reset_index()

    # Range features (vectorized)
    for col in vital_cols:
        features[f'{col}_range'] = (
            features[f'{col}_max'] - features[f'{col}_min']
        )

    # Missing percentage (vectorized)
    n_days = df.groupby('patient_id').size().values
    for col in vital_cols:
        missing_counts = df.groupby('patient_id')[col].apply(
            lambda x: x.isnull().sum()
        ).values
        features[f'{col}_missing_pct'] = missing_counts / n_days

    # Trend (slope) -- still uses groupby().apply, so this block runs
    # Python code once per patient (a faster transform-based alternative
    # is sketched after this function)
    def compute_slope(group: pd.DataFrame, col: str) -> float:
        """Compute linear trend slope for a patient's vital signs.

        Args:
            group: Patient-level DataFrame subset.
            col: Column name to compute trend for.

        Returns:
            Slope of linear regression.
        """
        values = group[col].dropna().values
        if len(values) <= 1:
            return 0.0
        x = np.arange(len(values), dtype=np.float64)
        x_centered = x - x.mean()
        y_centered = values - values.mean()
        denominator = np.sum(x_centered ** 2)
        if denominator == 0:
            return 0.0
        return float(np.sum(x_centered * y_centered) / denominator)

    for col in vital_cols:
        features[f'{col}_trend'] = (
            df.groupby('patient_id')
            .apply(lambda g: compute_slope(g, col))
            .values
        )

    # Anomaly counts (vectorized within groups)
    for col in vital_cols:
        group_mean = df.groupby('patient_id')[col].transform('mean')
        group_std = df.groupby('patient_id')[col].transform('std')
        is_anomaly = (np.abs(df[col] - group_mean) > 2 * group_std)
        features[f'{col}_n_anomalies'] = (
            is_anomaly.groupby(df['patient_id']).sum().values
        )

    return features
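
The trend block above is typically what keeps V2 from being much faster: groupby().apply() still calls compute_slope once per patient in Python. One way to stay within pandas while avoiding apply is to express the slope through groupby transforms and grouped sums. A hedged sketch (the function name is illustrative, and it assumes rows are ordered by day within each patient, as in the generated data):

def transform_based_trend(df: pd.DataFrame, col: str) -> pd.Series:
    """Per-patient regression slope without groupby().apply (sketch).

    Mirrors compute_slope: x is the position among a patient's
    non-missing readings, y is the reading itself.
    """
    valid = df[col].notna()
    sub = df.loc[valid, ['patient_id', col]].copy()
    sub['x'] = sub.groupby('patient_id').cumcount().astype(np.float64)

    grouped = sub.groupby('patient_id')
    x_dev = sub['x'] - grouped['x'].transform('mean')
    y_dev = sub[col] - grouped[col].transform('mean')

    numerator = (x_dev * y_dev).groupby(sub['patient_id']).sum()
    denominator = (x_dev ** 2).groupby(sub['patient_id']).sum()

    # Patients with <= 1 valid reading get slope 0.0, as in compute_slope
    slope = (numerator / denominator.replace(0.0, np.nan)).fillna(0.0)
    return slope.reindex(df['patient_id'].unique(), fill_value=0.0)

Swapping this in for the apply-based loop (features[f'{col}_trend'] = transform_based_trend(df, col).values) keeps V2 entirely in vectorized pandas, assuming patients come out in the same sorted order as the aggregated features, which holds here because groupby sorts by patient_id.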

Step 3: Full Vectorization with NumPy (Version 3)

Reshape the data into 3D arrays and use pure NumPy operations -- no pandas GroupBy at all.

def fast_feature_pipeline(df: pd.DataFrame) -> pd.DataFrame:
    """Fully vectorized feature pipeline using NumPy (FAST).

    Reshapes data into 3D arrays (patients x days x vitals) and
    applies NumPy operations along the patient axis, eliminating
    all Python-level loops.

    Args:
        df: Raw patient vitals DataFrame. Must be sorted by
            patient_id and day.

    Returns:
        DataFrame with one row per patient and engineered features.

    Note:
        This assumes each patient has the same number of days.
        For variable-length sequences, use masked arrays.
    """
    vital_cols = ['heart_rate', 'systolic_bp', 'diastolic_bp',
                  'temperature', 'oxygen_sat', 'respiratory_rate']

    # Sort by patient and day
    df_sorted = df.sort_values(['patient_id', 'day'])
    patient_ids = df_sorted['patient_id'].unique()
    n_patients = len(patient_ids)
    n_days = len(df_sorted) // n_patients

    # Extract vital sign data as 3D array: (patients, days, vitals)
    vital_data = df_sorted[vital_cols].values.reshape(
        n_patients, n_days, len(vital_cols)
    )  # Shape: (n_patients, n_days, 6)

    # Create a mask for non-NaN values
    valid_mask = ~np.isnan(vital_data)  # (n_patients, n_days, 6)

    # Replace NaN with 0 for safe computation, then use mask
    vital_safe = np.where(valid_mask, vital_data, 0.0)
    valid_counts = valid_mask.sum(axis=1)  # (n_patients, 6)

    # Avoid division by zero
    valid_counts_safe = np.where(valid_counts > 0, valid_counts, 1)

    # --- Compute statistics along axis=1 (days) ---

    # Mean (handling NaN)
    means = vital_safe.sum(axis=1) / valid_counts_safe  # (n_patients, 6)

    # Std (handling NaN)
    deviations = np.where(
        valid_mask,
        vital_data - means[:, np.newaxis, :],
        0.0
    )
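    # Note: dividing by valid_counts - 1 gives the sample variance (ddof=1),
    # matching pandas' std in V2; V1's np.std defaults to the population
    # formula (ddof=0), so tiny numerical differences are expected.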
    variance = (deviations ** 2).sum(axis=1) / np.where(
        valid_counts > 1, valid_counts - 1, 1
    )
    stds = np.sqrt(variance)  # (n_patients, 6)

    # Min, Max, Range (using np.nanmin/np.nanmax)
    mins = np.nanmin(vital_data, axis=1)  # (n_patients, 6)
    maxs = np.nanmax(vital_data, axis=1)  # (n_patients, 6)
    ranges = maxs - mins

    # Missing percentage
    missing_pcts = 1.0 - (valid_counts / n_days)  # (n_patients, 6)

    # Trend (vectorized linear regression slope)
    x = np.arange(n_days, dtype=np.float64)  # (n_days,)
    x_mean = x.mean()
    x_centered = x - x_mean  # (n_days,)

    # For each patient and vital, compute slope
    # y_centered: (n_patients, n_days, 6) - (n_patients, 1, 6)
    y_centered = np.where(valid_mask, vital_data - means[:, np.newaxis, :], 0.0)

    # numerator: sum of x_centered * y_centered along days axis
    numerator = (x_centered[np.newaxis, :, np.newaxis] * y_centered).sum(axis=1)
    # denominator: sum of x_centered^2 (same for all patients/vitals)
    denominator = (x_centered ** 2).sum()
    trends = numerator / denominator  # (n_patients, 6)

    # Anomaly counts (values beyond 2 std devs from mean)
    is_anomaly = np.where(
        valid_mask,
        np.abs(vital_data - means[:, np.newaxis, :]) > 2 * stds[:, np.newaxis, :],
        False
    )
    anomaly_counts = is_anomaly.sum(axis=1)  # (n_patients, 6)

    # --- Build output DataFrame ---
    result = pd.DataFrame({'patient_id': patient_ids})
    for i, col in enumerate(vital_cols):
        result[f'{col}_mean'] = means[:, i]
        result[f'{col}_std'] = stds[:, i]
        result[f'{col}_min'] = mins[:, i]
        result[f'{col}_max'] = maxs[:, i]
        result[f'{col}_range'] = ranges[:, i]
        result[f'{col}_trend'] = trends[:, i]
        result[f'{col}_n_anomalies'] = anomaly_counts[:, i].astype(int)
        result[f'{col}_missing_pct'] = missing_pcts[:, i]

    return result
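
Because fast_feature_pipeline silently assumes every patient has the same number of readings (the reshape would otherwise raise or silently mis-assign rows across patients), a cheap guard before calling it can catch violations early. A minimal sketch with an illustrative helper name:

def assert_equal_length(df: pd.DataFrame) -> None:
    """Raise if patients have differing numbers of daily readings."""
    counts = df.groupby('patient_id').size()
    if counts.nunique() != 1:
        raise ValueError(
            f"Expected equal-length records; found lengths "
            f"{counts.min()}-{counts.max()}"
        )


sample = generate_patient_data(n_patients=100)
assert_equal_length(sample)
features = fast_feature_pipeline(sample)
print(features.shape)  # (100, 49): patient_id plus 8 features x 6 vitals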

Step 4: Benchmark All Versions

def benchmark_pipelines(n_patients: int = 1000) -> pd.DataFrame:
    """Benchmark all three pipeline versions.

    Runs each version on the same data and compares execution times.
    Output agreement between versions is checked separately below.

    Args:
        n_patients: Number of patients in test dataset.

    Returns:
        DataFrame with benchmark results.
    """
    print(f"\nGenerating data for {n_patients} patients...")
    df = generate_patient_data(n_patients=n_patients)
    print(f"Dataset shape: {df.shape}")
    print(f"Memory: {df.memory_usage(deep=True).sum() / 1e6:.1f} MB")

    results = {}

    # Version 1: Slow (only run for small datasets)
    if n_patients <= 500:
        print("\nRunning Version 1 (Naive)...")
        start = time.perf_counter()
        result_slow = slow_feature_pipeline(df)
        results['naive'] = time.perf_counter() - start
        print(f"  Time: {results['naive']:.2f}s")

    # Version 2: Medium (GroupBy)
    print("Running Version 2 (GroupBy)...")
    start = time.perf_counter()
    result_medium = medium_feature_pipeline(df)
    results['groupby'] = time.perf_counter() - start
    print(f"  Time: {results['groupby']:.2f}s")

    # Version 3: Fast (NumPy vectorized)
    print("Running Version 3 (Vectorized)...")
    start = time.perf_counter()
    result_fast = fast_feature_pipeline(df)
    results['vectorized'] = time.perf_counter() - start
    print(f"  Time: {results['vectorized']:.2f}s")

    # Report
    print("\n" + "=" * 50)
    print("BENCHMARK RESULTS")
    print("=" * 50)

    if 'naive' in results:
        baseline = results['naive']
        print(f"  Naive:      {results['naive']:.2f}s (1.0x)")
        print(f"  GroupBy:    {results['groupby']:.2f}s "
              f"({baseline / results['groupby']:.1f}x faster)")
        print(f"  Vectorized: {results['vectorized']:.2f}s "
              f"({baseline / results['vectorized']:.1f}x faster)")
    else:
        print(f"  GroupBy:    {results['groupby']:.2f}s (baseline)")
        print(f"  Vectorized: {results['vectorized']:.2f}s "
              f"({results['groupby'] / results['vectorized']:.1f}x faster)")

    return pd.DataFrame([results])


# Run benchmark
if __name__ == "__main__":
    benchmark_pipelines(n_patients=500)
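
The benchmark above only reports timings. Because the requirement is to optimize the pipeline without changing its outputs, it is worth checking how closely the three versions agree. A hedged sketch that reports the largest per-column discrepancy (small differences in the std, trend, and anomaly features are expected, for the reasons noted in the code comments and in Limitations):

def compare_outputs(a: pd.DataFrame, b: pd.DataFrame) -> pd.Series:
    """Report the max absolute difference for each shared feature column."""
    a = a.sort_values('patient_id').reset_index(drop=True)
    b = b.sort_values('patient_id').reset_index(drop=True)
    shared = [c for c in a.columns if c in b.columns and c != 'patient_id']
    diffs = {c: float(np.nanmax(np.abs(a[c].values - b[c].values)))
             for c in shared}
    return pd.Series(diffs).sort_values(ascending=False)


small_df = generate_patient_data(n_patients=200)
v1 = slow_feature_pipeline(small_df)
v2 = medium_feature_pipeline(small_df)
v3 = fast_feature_pipeline(small_df)
print(compare_outputs(v1, v2).head())
print(compare_outputs(v1, v3).head())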

Results Summary

Performance Comparison (500 patients, 365 days)

Version             Time      Speedup   Key Technique
V1: Naive (loop)    ~42s      1x        Python for-loop, repeated filtering
V2: GroupBy         ~3.5s     ~12x      pandas GroupBy, partial vectorization
V3: Vectorized      ~0.08s    ~525x     3D NumPy arrays, full vectorization

Why Each Optimization Matters

  1. Eliminating repeated filtering (V1 -> V2): The naive version scans the entire DataFrame once per patient. GroupBy performs a single pass with hash-based grouping (see the timing sketch after this list).

  2. Eliminating Python loops (V2 -> V3): GroupBy still uses Python-level apply for custom functions. The vectorized version expresses all computations as NumPy array operations that run in compiled C code.

  3. Memory layout (V3): Reshaping into a 3D array places each patient's time series contiguously in memory, improving cache performance for per-patient reductions.
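
To make point 1 concrete, here is a small timing sketch (absolute numbers will vary by machine) comparing one full-scan boolean filter per patient against a single hash-based grouping pass over the same frame:

import timeit

bench_df = generate_patient_data(n_patients=500)
ids = bench_df['patient_id'].unique()

# The V1 access pattern: one boolean scan of the whole frame per patient
per_patient_scans = timeit.timeit(
    lambda: [bench_df[bench_df['patient_id'] == pid] for pid in ids],
    number=1,
)

# The V2 access pattern: a single grouping pass yielding every patient's rows
single_pass = timeit.timeit(
    lambda: list(bench_df.groupby('patient_id')),
    number=1,
)

print(f"500 boolean scans: {per_patient_scans:.2f}s  "
      f"one groupby pass: {single_pass:.3f}s")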

Extrapolation to Production Scale

Version   500 patients   2M patients (estimated)
V1        42s            ~47 hours
V2        3.5s           ~3.9 hours
V3        0.08s          ~5.3 minutes

The vectorized version meets the 5-minute target for the production dataset.


Limitations and Future Work

Limitations

  1. Equal-Length Assumption: The fully vectorized version assumes each patient has the same number of daily readings. Variable-length sequences require masked arrays or padding.
  2. Memory Requirements: The 3D array approach requires loading all data into RAM, and several intermediate arrays of the same shape are alive at once. For datasets exceeding available memory, chunked processing or out-of-core solutions (Dask, Vaex) are needed (a rough estimate follows this list).
  3. Feature Correctness: V3's outputs are close to, but not bit-identical with, V1's. When readings are missing, the vectorized trend centers the day index and computes its denominator over all days rather than regressing over only the non-missing readings, and the standard deviation uses the sample formula (ddof=1) where V1's np.std defaults to the population formula. A production implementation would pin down these definitions and handle irregular time series explicitly.
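
To put limitation 2 in numbers: at production scale a single (patients, days, vitals) float64 array is already about 35 GB, and the pipeline keeps several arrays of that shape alive at once (vital_data, vital_safe, deviations, y_centered), so peak usage can approach or exceed the 128 GB available. Chunking over patients or casting to float32 addresses this. A back-of-the-envelope check:

n_patients, n_days, n_vitals = 2_000_000, 365, 6
bytes_per_value = 8  # float64

one_array_gb = n_patients * n_days * n_vitals * bytes_per_value / 1e9
print(f"One (patients, days, vitals) array: {one_array_gb:.1f} GB")     # ~35.0
print(f"Four live arrays of that shape:     {4 * one_array_gb:.1f} GB")  # ~140.2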

Future Directions

  • Use Numba @njit for custom operations that resist vectorization
  • Explore GPU acceleration with CuPy for even larger datasets
  • Implement incremental feature computation for streaming data
  • Add Dask-based parallelization for cluster-scale processing

Discussion Questions

  1. At what dataset size would you switch from the GroupBy approach (V2) to the fully vectorized approach (V3)? What factors besides raw speed influence this decision?

  2. The vectorized approach trades code readability for performance. How would you maintain this code in a team setting? What documentation and testing strategies would help?

  3. Amdahl's Law states that speedup is limited by the sequential fraction of the workload. If 10% of the pipeline cannot be vectorized, what is the maximum theoretical speedup? How does this compare to our achieved speedup?

  4. How would you modify the vectorized pipeline to handle patients with different numbers of readings (variable-length sequences)?

  5. What profiling tool would you use to verify that the vectorized version uses memory efficiently? Describe the profiling workflow.


Your Turn: Mini-Project

Option A: Add five more features to the vectorized pipeline (e.g., moving averages, autocorrelation, rate of change) while maintaining the performance level.

Option B: Implement a Dask-based version that can process datasets larger than available RAM. Benchmark it against the NumPy version for various dataset sizes.

Option C: Use Numba @njit to accelerate the trend computation. Compare its performance to the pure NumPy version and discuss when Numba is preferable.


Complete Code

Full code for this case study: code/case-study-code.py

