Case Study 2: Scaling a Data Pipeline from 1K to 1M Records

Performance Optimization for a Data Processing Pipeline


Background

GreenMetrics is a sustainability analytics company that collects energy consumption data from IoT sensors installed in commercial buildings. Their data pipeline ingests raw sensor readings, validates and cleans them, computes building-level aggregations, generates anomaly alerts, and stores results in a PostgreSQL database for dashboard queries.

When GreenMetrics launched with 20 pilot buildings and approximately 1,000 sensor readings per batch, the pipeline ran in 12 seconds — perfectly acceptable for their 15-minute processing interval. But after signing contracts with a regional property management company, their sensor count grew to 2,000 buildings producing 1 million readings per batch. The pipeline now took 47 minutes, far exceeding the 15-minute window. Batches overlapped, queues grew, and the dashboard showed increasingly stale data.

The engineering team needed to process 1,000x the records within a window only 75x as long: from 1K records in 12 seconds to 1M records within 15 minutes (900 seconds), roughly a 13x increase in required throughput.

The Original Pipeline

The pipeline was a straightforward sequential Python script:

import json
import psycopg2
from datetime import datetime

def run_pipeline(readings: list[dict]) -> dict:
    """Process a batch of sensor readings."""
    conn = psycopg2.connect(dsn=DATABASE_URL)
    cursor = conn.cursor()
    stats = {"processed": 0, "errors": 0, "alerts": 0}

    for reading in readings:
        try:
            # Step 1: Validate
            if not validate_reading(reading):
                stats["errors"] += 1
                continue

            # Step 2: Clean and normalize
            cleaned = clean_reading(reading)

            # Step 3: Check for anomalies
            historical = get_historical_readings(
                cursor, cleaned["sensor_id"], hours=24
            )
            if is_anomalous(cleaned, historical):
                create_alert(cursor, cleaned)
                stats["alerts"] += 1

            # Step 4: Store the reading
            store_reading(cursor, cleaned)

            # Step 5: Update building aggregations
            update_building_aggregation(cursor, cleaned["building_id"])

            stats["processed"] += 1
        except Exception as e:
            stats["errors"] += 1
            log_error(reading, e)

    conn.commit()
    conn.close()
    return stats

Each helper function was straightforward:

def get_historical_readings(cursor, sensor_id: str, hours: int) -> list[dict]:
    """Fetch the last N hours of readings for a sensor."""
    # Note: make_interval() lets the hours value be bound as a real parameter;
    # a %s placeholder inside a quoted literal like INTERVAL '%s hours' is a
    # classic psycopg2 pitfall.
    cursor.execute(
        """SELECT value, timestamp FROM readings
           WHERE sensor_id = %s
           AND timestamp > NOW() - make_interval(hours => %s)
           ORDER BY timestamp""",
        (sensor_id, hours)
    )
    return [{"value": row[0], "timestamp": row[1]} for row in cursor.fetchall()]

def store_reading(cursor, reading: dict) -> None:
    """Insert a single reading into the database."""
    cursor.execute(
        """INSERT INTO readings (sensor_id, building_id, value, unit, timestamp)
           VALUES (%s, %s, %s, %s, %s)""",
        (reading["sensor_id"], reading["building_id"],
         reading["value"], reading["unit"], reading["timestamp"])
    )

def update_building_aggregation(cursor, building_id: str) -> None:
    """Recalculate building-level aggregations."""
    cursor.execute(
        """UPDATE building_stats SET
           avg_consumption = (
               SELECT AVG(value) FROM readings
               WHERE building_id = %s AND timestamp > NOW() - INTERVAL '1 hour'
           ),
           max_consumption = (
               SELECT MAX(value) FROM readings
               WHERE building_id = %s AND timestamp > NOW() - INTERVAL '1 hour'
           ),
           updated_at = NOW()
           WHERE building_id = %s""",
        (building_id, building_id, building_id)
    )
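
The validation and cleaning helpers themselves are not shown in the case study. A minimal sketch, assuming readings carry the five fields used elsewhere in the pipeline and that consumption values must be numeric and non-negative (both assumptions, not details from the original), might look like:

```python
REQUIRED_FIELDS = ("sensor_id", "building_id", "value", "unit", "timestamp")

def validate_reading(reading: dict) -> bool:
    """Reject readings with missing fields or non-numeric/negative values."""
    if not all(field in reading for field in REQUIRED_FIELDS):
        return False
    try:
        return float(reading["value"]) >= 0
    except (TypeError, ValueError):
        return False

def clean_reading(reading: dict) -> dict:
    """Normalize field types and strip whitespace from identifiers."""
    return {
        "sensor_id": str(reading["sensor_id"]).strip(),
        "building_id": str(reading["building_id"]).strip(),
        "value": float(reading["value"]),
        "unit": str(reading["unit"]).strip().lower(),
        "timestamp": reading["timestamp"],
    }
```

Whatever the real rules were, keeping these helpers free of I/O is what later allows them to run in worker processes.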

Phase 1: Profiling the Pipeline

Timing Breakdown

The team instrumented the pipeline with detailed timing:

import time
from contextlib import contextmanager

timings = {}

@contextmanager
def timed_section(name):
    """Context manager that accumulates elapsed time per section name."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[name] = timings.get(name, 0) + time.perf_counter() - start

Running with 10,000 readings (a manageable subset) produced this breakdown:

Step                 Time      % of Total   Calls
Validation           0.8s      0.6%         10,000
Cleaning             1.2s      0.9%         9,800
Historical query     89.4s     67.3%        9,800
Anomaly detection    2.1s      1.6%         9,800
Store reading        18.7s     14.1%        9,800
Update aggregation   20.3s     15.3%        9,800
Other                0.4s      0.3%         -
Total                132.9s    100%         -

At this rate, 1M readings would take approximately 13,290 seconds (3.7 hours). Hitting the 900-second target required roughly a 15x speedup.

Bottleneck Analysis

The team identified four major bottlenecks:

  1. Historical query (67.3%): Fetching 24 hours of historical data for each sensor, one sensor at a time. With 9,800 calls, each averaging 9.1ms, this dominated execution.

  2. Update aggregation (15.3%): Recalculating building-level aggregations for every single reading. If a building has 100 sensors, the aggregation was recomputed 100 times per batch — 99 of which were wasted.

  3. Store reading (14.1%): Individual INSERT statements, one per reading. Each roundtrip to the database added latency.

  4. Sequential processing: All 1M readings processed one at a time, even though many operations were independent.
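
The per-record overhead can be made concrete with a back-of-the-envelope model. The 1 ms round-trip cost below is an illustrative assumption, not a measurement from the case study:

```python
def per_record_overhead_seconds(
    n_records: int, round_trip_ms: float, statements_per_record: int
) -> float:
    """Fixed network/parse overhead paid once per statement, regardless of payload."""
    return n_records * statements_per_record * round_trip_ms / 1000.0

# The original pipeline issues roughly three statements per reading
# (historical SELECT, reading INSERT, aggregation UPDATE), ignoring alerts.
overhead = per_record_overhead_seconds(
    1_000_000, round_trip_ms=1.0, statements_per_record=3
)
# Even before any real query work, the fixed per-statement overhead alone
# would exceed the entire 900-second budget.
```

This is why batching dominates the optimization plan that follows: it amortizes the fixed cost over thousands of records per statement.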

Phase 2: Optimization Strategy

The team formulated a five-part optimization plan, ordered by expected impact.

Optimization 1: Batch Inserts (14.1% of time)

Instead of 10,000 individual INSERT statements, use a single batch insert:

from psycopg2.extras import execute_values

def store_readings_batch(cursor, readings: list[dict]) -> None:
    """Insert readings in a single batch operation."""
    values = [
        (r["sensor_id"], r["building_id"], r["value"],
         r["unit"], r["timestamp"])
        for r in readings
    ]
    execute_values(
        cursor,
        """INSERT INTO readings (sensor_id, building_id, value, unit, timestamp)
           VALUES %s""",
        values,
        page_size=5000,
    )

Result for 10K records: Store step dropped from 18.7s to 0.3s (62x faster). Network round-trips reduced from 9,800 to 2.

Optimization 2: Deduplicate Building Aggregations (15.3% of time)

Instead of recalculating per reading, collect unique buildings and update each once:

def update_building_aggregations_batch(
    cursor, building_ids: set[str]
) -> None:
    """Update aggregations for each building exactly once."""
    for building_id in building_ids:
        cursor.execute(
            """UPDATE building_stats SET
               avg_consumption = (
                   SELECT AVG(value) FROM readings
                   WHERE building_id = %s
                   AND timestamp > NOW() - INTERVAL '1 hour'
               ),
               max_consumption = (
                   SELECT MAX(value) FROM readings
                   WHERE building_id = %s
                   AND timestamp > NOW() - INTERVAL '1 hour'
               ),
               updated_at = NOW()
               WHERE building_id = %s""",
            (building_id, building_id, building_id)
        )

At full scale, with 2,000 unique buildings instead of 1M readings, the aggregation step would run 500x fewer queries.

Result: Aggregation dropped from 20.3s to 4.1s for 10K records (and the ratio would improve dramatically at scale since building count stays constant).
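
The per-building loop above still pays one round trip per building. A set-based variant that refreshes every listed building in a single statement would remove even that overhead; the sketch below (schema assumed from the examples above) returns the SQL and parameters separately so it can be inspected without a database, then passed to cursor.execute():

```python
def build_aggregation_update(building_ids: set[str]) -> tuple[str, tuple]:
    """Build one UPDATE ... FROM statement refreshing all buildings at once."""
    sql = """
        UPDATE building_stats bs SET
            avg_consumption = agg.avg_value,
            max_consumption = agg.max_value,
            updated_at = NOW()
        FROM (
            SELECT building_id,
                   AVG(value) AS avg_value,
                   MAX(value) AS max_value
            FROM readings
            WHERE building_id = ANY(%s)
            AND timestamp > NOW() - INTERVAL '1 hour'
            GROUP BY building_id
        ) agg
        WHERE bs.building_id = agg.building_id
    """
    # psycopg2 adapts a Python list to a PostgreSQL array for ANY(%s)
    return sql, (list(building_ids),)
```

A single GROUP BY over the recent readings also lets PostgreSQL scan that data once instead of once per building.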

Optimization 3: Preload Historical Data (67.3% of time)

Instead of querying historical data per sensor, preload all recent data in a single query and build an in-memory lookup:

from collections import defaultdict

def preload_historical_data(
    cursor, sensor_ids: set[str], hours: int = 24
) -> dict[str, list[dict]]:
    """Load historical data for all sensors in one query."""
    # make_interval() binds the hours value as a real parameter instead of
    # relying on %s substitution inside a quoted INTERVAL literal
    cursor.execute(
        """SELECT sensor_id, value, timestamp
           FROM readings
           WHERE sensor_id = ANY(%s)
           AND timestamp > NOW() - make_interval(hours => %s)
           ORDER BY sensor_id, timestamp""",
        (list(sensor_ids), hours)
    )

    history = defaultdict(list)
    for row in cursor.fetchall():
        history[row[0]].append({"value": row[1], "timestamp": row[2]})
    return dict(history)

This replaced 9,800 queries with 1. However, loading 24 hours of data for all sensors could be large. The team added an index to make the query fast:

CREATE INDEX idx_readings_sensor_timestamp
ON readings(sensor_id, timestamp DESC);

Result: Historical data loading dropped from 89.4s to 2.8s for 10K records (32x faster).

Optimization 4: Parallel Processing with Multiprocessing

Validation and cleaning are CPU-bound operations that benefit from parallelism. The team restructured the pipeline into stages:

from concurrent.futures import ProcessPoolExecutor
from itertools import islice

def chunk_list(lst: list, chunk_size: int):
    """Split a list into chunks."""
    it = iter(lst)
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:
            break
        yield chunk

def validate_and_clean_chunk(readings: list[dict]) -> list[dict]:
    """Validate and clean a chunk of readings (CPU-bound)."""
    results = []
    for reading in readings:
        if validate_reading(reading):
            cleaned = clean_reading(reading)
            results.append(cleaned)
    return results

def parallel_validate_and_clean(
    readings: list[dict], workers: int = 8, chunk_size: int = 10_000
) -> list[dict]:
    """Process validation and cleaning across multiple CPU cores."""
    chunks = list(chunk_list(readings, chunk_size))

    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = executor.map(validate_and_clean_chunk, chunks)

    return [item for sublist in results for item in sublist]

Result: Validation and cleaning (combined 2.0s) dropped to 0.4s with 8 workers (5x faster). The improvement would be more significant at 1M records.

Optimization 5: Chunked Anomaly Detection with Batch Alerts

Instead of creating alerts one at a time, collect all anomalies and batch-insert them:

def detect_anomalies_batch(
    readings: list[dict],
    historical: dict[str, list[dict]],
) -> list[dict]:
    """Detect anomalies across all readings using preloaded history."""
    anomalies = []
    for reading in readings:
        sensor_history = historical.get(reading["sensor_id"], [])
        if is_anomalous(reading, sensor_history):
            anomalies.append(reading)
    return anomalies
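
The is_anomalous check itself is never shown in the case study. A minimal sketch, assuming a simple z-score rule with a hypothetical 3-sigma threshold, could be:

```python
import statistics

def is_anomalous(reading: dict, history: list[dict], threshold: float = 3.0) -> bool:
    """Flag a reading that deviates more than `threshold` standard
    deviations from the sensor's recent mean."""
    if len(history) < 2:
        return False  # not enough data to establish a baseline
    values = [h["value"] for h in history]
    mean = statistics.fmean(values)
    stdev = statistics.stdev(values)
    if stdev == 0:
        return reading["value"] != mean
    return abs(reading["value"] - mean) / stdev > threshold
```

Because it touches only the preloaded in-memory history, a check like this stays CPU-bound and could run in the same worker pool as validation.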

def create_alerts_batch(cursor, anomalies: list[dict]) -> None:
    """Create alert records in a single batch insert."""
    if not anomalies:
        return
    values = [
        (a["sensor_id"], a["building_id"], a["value"],
         a["timestamp"], "anomaly_detected")
        for a in anomalies
    ]
    execute_values(
        cursor,
        """INSERT INTO alerts
           (sensor_id, building_id, value, timestamp, alert_type)
           VALUES %s""",
        values,
    )

Phase 3: The Optimized Pipeline

The restructured pipeline separated concerns into clear stages:

def run_optimized_pipeline(readings: list[dict]) -> dict:
    """Optimized pipeline for processing sensor readings."""
    stats = {"received": len(readings), "processed": 0,
             "errors": 0, "alerts": 0}

    # Stage 1: Parallel validation and cleaning (CPU-bound, multi-process)
    cleaned_readings = parallel_validate_and_clean(readings)
    stats["errors"] = len(readings) - len(cleaned_readings)

    # Stage 2: Preload historical data (single query)
    conn = psycopg2.connect(dsn=DATABASE_URL)
    cursor = conn.cursor()

    sensor_ids = {r["sensor_id"] for r in cleaned_readings}
    historical = preload_historical_data(cursor, sensor_ids)

    # Stage 3: Anomaly detection (CPU-bound, can parallelize)
    anomalies = detect_anomalies_batch(cleaned_readings, historical)
    stats["alerts"] = len(anomalies)

    # Stage 4: Batch store readings
    store_readings_batch(cursor, cleaned_readings)
    stats["processed"] = len(cleaned_readings)

    # Stage 5: Batch create alerts
    create_alerts_batch(cursor, anomalies)

    # Stage 6: Update building aggregations (once per building)
    building_ids = {r["building_id"] for r in cleaned_readings}
    update_building_aggregations_batch(cursor, building_ids)

    conn.commit()
    conn.close()
    return stats

Phase 4: Benchmarking at Scale

Progressive Scale Testing

The team tested at increasing scales to verify the pipeline's scaling behavior:

Records     Original           Optimized   Speedup   Target Met?
1,000       12s                0.8s        15x       Yes
10,000      133s               4.2s        32x       Yes
100,000     ~1,330s (est.)     38s         35x       Yes
500,000     ~6,650s (est.)     187s        36x       Yes
1,000,000   ~13,290s (est.)    384s        35x       Yes (< 900s)

The 1M record batch completed in 384 seconds (6.4 minutes), well within the 15-minute window. The original pipeline would have taken approximately 3.7 hours.

Scaling Characteristics

The optimized pipeline exhibited near-linear scaling because the dominant operations (batch inserts, batch queries) scale linearly with data size. The building aggregation step remained constant regardless of batch size (always 2,000 buildings), and the historical data preload scaled with the number of unique sensors, not the number of readings.
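
The near-linear claim can be sanity-checked directly from the benchmark table: per-record time should approach a constant as batch size grows. Using the optimized timings from the table above:

```python
# (records, optimized seconds) from the progressive scale tests
benchmarks = [
    (1_000, 0.8),
    (10_000, 4.2),
    (100_000, 38),
    (500_000, 187),
    (1_000_000, 384),
]

per_record_ms = [seconds / records * 1000 for records, seconds in benchmarks]
# roughly [0.80, 0.42, 0.38, 0.374, 0.384] ms per record

# At small batches, fixed costs (connection setup, process-pool spin-up)
# dominate; from 100K records upward the per-record cost is essentially
# flat, which is what near-linear scaling looks like in practice.
```

The elevated per-record cost at 1K records also explains why the small-batch speedup (15x) lags the large-batch speedup (35x).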

Bottleneck Breakdown at 1M Records

Step                           Time    % of Total
Parallel validation/cleaning   42s     10.9%
Preload historical data        78s     20.3%
Anomaly detection              34s     8.9%
Batch store readings           156s    40.6%
Batch create alerts            8s      2.1%
Building aggregations          62s     16.1%
Other (overhead, commit)       4s      1.0%
Total                          384s    100%

The new bottleneck was batch insertion (40.6%), which is expected since writing 1M records to disk is inherently I/O-intensive. Further optimization could use PostgreSQL's COPY command for even faster bulk loading, but 384 seconds was well within the target.

Phase 5: Additional Optimizations for Headroom

To provide headroom for future growth, the team implemented two additional improvements.

Database Partitioning

The readings table was growing rapidly. The team implemented time-based partitioning:

CREATE TABLE readings (
    id BIGSERIAL,
    sensor_id VARCHAR(50) NOT NULL,
    building_id VARCHAR(50) NOT NULL,
    value DOUBLE PRECISION NOT NULL,
    unit VARCHAR(20) NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL
) PARTITION BY RANGE (timestamp);

-- Monthly partitions
CREATE TABLE readings_2024_01 PARTITION OF readings
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
-- ... additional partitions

Partitioning improved query performance for time-range queries (like the historical data preload) because the database only scans relevant partitions.
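
Writing monthly partition DDL by hand gets tedious and error-prone. A small helper that generates it for any month (a sketch following the naming convention in the example above) keeps the partitions consistent:

```python
from datetime import date

def monthly_partition_ddl(year: int, month: int) -> str:
    """Generate CREATE TABLE ... PARTITION OF DDL for one month of readings."""
    start = date(year, month, 1)
    # the upper bound of a RANGE partition is exclusive, so use the
    # first day of the following month
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    return (
        f"CREATE TABLE readings_{start:%Y_%m} PARTITION OF readings\n"
        f"    FOR VALUES FROM ('{start}') TO ('{end}');"
    )
```

A scheduled job could run this a month ahead so inserts never land in a missing partition.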

COPY-Based Bulk Loading

For the batch insert step, switching from execute_values to PostgreSQL's COPY protocol provided an additional 3x speedup:

from io import StringIO

def store_readings_copy(cursor, readings: list[dict]) -> None:
    """Ultra-fast bulk insert using PostgreSQL COPY."""
    buffer = StringIO()
    for r in readings:
        buffer.write(
            f"{r['sensor_id']}\t{r['building_id']}\t{r['value']}\t"
            f"{r['unit']}\t{r['timestamp']}\n"
        )
    buffer.seek(0)
    cursor.copy_from(
        buffer, "readings",
        columns=("sensor_id", "building_id", "value", "unit", "timestamp"),
    )

Result: Batch insertion dropped from 156s to 52s. Together with partitioning's gains on the remaining time-range queries, total pipeline time for 1M records fell to 232 seconds (3.9 minutes).
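
One caveat: the StringIO approach above silently corrupts rows whose values contain tab, newline, or backslash characters. A defensive variant escapes each field per PostgreSQL's COPY text format (sketch; field names taken from the example above):

```python
def copy_escape(value) -> str:
    """Escape one value for PostgreSQL COPY text format (tab-separated)."""
    if value is None:
        return r"\N"  # COPY's NULL marker
    return (
        str(value)
        .replace("\\", "\\\\")
        .replace("\t", "\\t")
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )

def copy_line(reading: dict) -> str:
    """Render one reading as a COPY text-format line."""
    fields = ("sensor_id", "building_id", "value", "unit", "timestamp")
    return "\t".join(copy_escape(reading.get(f)) for f in fields) + "\n"
```

Swapping the raw f-string for copy_line() in store_readings_copy keeps the 3x speedup while tolerating messy sensor identifiers.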

Lessons Learned

1. The Bottleneck Kept Shifting

Initially the bottleneck was historical data queries (67% of runtime). After fixing that, it was batch insertion (41%). After the COPY optimization, it was the historical preload again (34%). Each optimization revealed the next bottleneck, and the team stopped only when they had comfortable headroom (3.9 minutes against the 15-minute target).

2. Algorithmic Changes Beat Concurrency

The single biggest improvement came from replacing per-record historical queries with a single preloaded query — a change in algorithm, not in parallelism. Multiprocessing for validation/cleaning provided a modest 5x speedup, while the query optimization provided a 32x speedup. Always fix the algorithm first.

3. Batch Operations Are Critical at Scale

Every per-record database operation (INSERT, SELECT, UPDATE) has a fixed overhead of network round-trip and query parsing. At 1M records, these fixed costs dominate. Batching reduces per-record overhead to near zero.

4. Profiling at Representative Scale Matters

The team initially profiled at 10K records and optimized based on those proportions. But at 1M records, the proportions shifted because some operations scale linearly and others are constant. The team learned to verify their optimizations at progressively larger scales.

5. Headroom Is Essential

The team could have stopped at 384 seconds (within the 15-minute target) but pushed to 232 seconds. This headroom proved valuable when the customer added 500 more buildings three months later, increasing batch sizes to 1.5M records — still well within the 15-minute window.

Summary of Optimizations

Optimization               Technique                                          Impact
Batch inserts              execute_values replacing individual INSERTs        62x faster writes
Deduplicate aggregations   Update per building, not per reading               500x fewer aggregation queries
Preload historical data    Single ANY() query replacing per-sensor queries    32x faster historical lookups
Parallel validation        ProcessPoolExecutor for CPU-bound steps            5x faster validation
COPY bulk loading          PostgreSQL COPY protocol for inserts               3x faster than execute_values
Database partitioning      Time-based partitioning of the readings table      2x faster time-range queries
Strategic indexing         Composite index on (sensor_id, timestamp)          3x faster historical queries

Overall result: 1M records processed in 232 seconds, down from an estimated 13,290 seconds — a 57x improvement achieved through systematic measurement and targeted optimization.