In This Chapter
- Dask, Polars, SQL Optimization, and When pandas Isn't Enough
- The Dirty Secret of Data Science Bootcamps
- Why pandas Breaks: Memory and Speed
- Dask: Out-of-Core and Parallel DataFrames
- Polars: Speed Through Rust and Apache Arrow
- SQL Optimization: Make the Database Do the Work
- Chunked Reading: When You Must Use pandas
- Sampling Strategies: When You Do Not Need All the Data
- Memory-Mapped Files and Apache Arrow
- Choosing the Right Tool: A Decision Framework
- Putting It All Together: A Multi-Tool Pipeline
- File Formats for Large Data
- Chapter Summary
Chapter 28: Working with Large Datasets
Dask, Polars, SQL Optimization, and When pandas Isn't Enough
Learning Objectives
By the end of this chapter, you will be able to:
- Identify when pandas is the bottleneck (memory, speed, or both)
- Use Dask for out-of-core and parallel DataFrame operations
- Apply Polars for high-performance in-memory analytics
- Optimize SQL queries for large-scale data extraction
- Choose the right tool for the dataset size and operation type
The Dirty Secret of Data Science Bootcamps
War Story --- A data scientist at TurbineTech was asked to build an anomaly detection pipeline for vibration sensor data across 1,200 wind turbines. Each turbine generates approximately 86,400 readings per day (one per second). That is 103 million rows per day, 37.8 billion rows per year. She opened her Jupyter notebook, typed pd.read_csv('sensor_data_2024.csv'), and waited. Forty minutes later, her laptop's 32 GB of RAM was exhausted, the kernel crashed, and her only output was MemoryError. She had never encountered a dataset that did not fit in memory, because no bootcamp, no Kaggle competition, and no textbook exercise had ever given her one.
The dirty secret of data science education is this: they never give you a dataset that does not fit in RAM. Every exercise, every tutorial, every competition dataset is carefully sized so that pd.read_csv() works without complaint. The result is an entire generation of data scientists whose instinct when facing a large dataset is to get a bigger machine --- or worse, to sample randomly and hope the sample is representative.
This chapter teaches you what to do instead. We cover three tools that handle data beyond pandas' comfort zone: Dask (out-of-core parallel computation), Polars (a blazing-fast DataFrame library built on Apache Arrow), and SQL optimization (making your database do the heavy lifting before data ever reaches Python). We also cover the often-overlooked technique of data type optimization --- the act of making pandas itself more efficient before reaching for a different tool.
The decision framework is straightforward:
| Dataset Size | Tool | Rationale |
|---|---|---|
| < 1 GB | pandas | Fast enough, familiar API, rich ecosystem |
| 1 -- 100 GB | Polars or Dask | Polars for single-machine speed; Dask for larger-than-memory data or parallelism across machines |
| > 100 GB | Spark (or distributed Dask) | True distributed computing, cluster required |
Spark is mentioned for context but is not covered in this chapter. Our focus is the 1--100 GB range --- the territory where most real-world data science happens and where the right tool choice can mean the difference between a 30-second pipeline and a 3-hour one.
Why pandas Breaks: Memory and Speed
Before reaching for a new tool, you need to understand why pandas struggles. The answer is usually one of two things: memory or speed. Often both.
The Memory Problem
pandas loads the entire dataset into RAM. Every column, every row, every value. A CSV file that is 2 GB on disk can consume 6--10 GB in memory once parsed into a DataFrame, because pandas stores strings as Python objects (each with 50+ bytes of overhead) and often infers wider dtypes than necessary.
import pandas as pd
import numpy as np
np.random.seed(42)
# Simulate a typical sensor reading dataset
n_rows = 5_000_000 # 5 million rows -- still within pandas' range
sensor_df = pd.DataFrame({
    'turbine_id': np.random.randint(1, 1201, n_rows),
    'timestamp': pd.date_range('2024-01-01', periods=n_rows, freq='s'),
    'vibration_mm_s': np.round(np.random.normal(2.5, 0.4, n_rows), 3),
    'temperature_c': np.round(np.random.normal(55.0, 3.2, n_rows), 1),
    'status': np.random.choice(['normal', 'warning', 'critical'], n_rows,
                               p=[0.97, 0.025, 0.005])
})
# Memory usage before optimization
print("Memory usage by column:")
print(sensor_df.memory_usage(deep=True) / 1e6)
print(f"\nTotal memory: {sensor_df.memory_usage(deep=True).sum() / 1e6:.1f} MB")
Memory usage by column:
Index 128.0
turbine_id 40.0
timestamp 40.0
vibration_mm_s 40.0
temperature_c 40.0
status 310.3
Total memory: 598.3 MB
Five million rows consume 598 MB. That status column --- just three unique string values repeated five million times --- accounts for 310 MB, more than half the total. This is pandas' object dtype: each string is a separate Python object with its own memory allocation. Now imagine TurbineTech's actual dataset: 103 million rows per day, with 30 sensor columns. That is not 598 MB. At the same per-row cost, a single day is roughly 12 GB for these five columns alone; with 30 sensor columns, a few days of history is over 100 GB.
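A rough sizing calculation makes the scale concrete. The sketch below is a back-of-envelope estimate, not a measurement: the helper name `estimate_gb` is hypothetical, and the per-value sizes (8 bytes per 64-bit value, about 62 bytes per short Python string, in line with the status column above) are assumptions.

```python
# Back-of-envelope memory estimate for a pandas DataFrame.
# All per-value byte sizes are assumptions, not measurements.

TURBINES = 1_200
READINGS_PER_DAY = 86_400  # one reading per second


def estimate_gb(n_rows, n_numeric_cols, n_string_cols,
                bytes_per_numeric=8, bytes_per_string=62):
    """Approximate in-memory size in GB (hypothetical helper)."""
    bytes_per_row = (n_numeric_cols * bytes_per_numeric
                     + n_string_cols * bytes_per_string)
    return n_rows * bytes_per_row / 1e9


rows_per_day = TURBINES * READINGS_PER_DAY
rows_per_year = rows_per_day * 365

# turbine_id, timestamp, and 30 sensor columns as 64-bit values, plus status
one_day_gb = estimate_gb(rows_per_day, n_numeric_cols=32, n_string_cols=1)

print(f"Rows per day:  {rows_per_day:,}")
print(f"Rows per year: {rows_per_year:,}")
print(f"One day in pandas (est.): {one_day_gb:.1f} GB")
```

Under these assumptions a single day already exceeds the 32 GB laptop from the war story, which is why dtype choices matter before any tool change.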
Data Type Optimization: The First Line of Defense
Before switching tools, try making pandas more efficient. The principle is simple: use the smallest dtype that can represent your data without loss of information.
def optimize_dtypes(df):
    """Downcast numeric columns and convert low-cardinality strings to categories."""
    optimized = df.copy()
    # Integers: pick the narrowest type whose range covers [col_min, col_max]
    for col in optimized.select_dtypes(include=['int64']).columns:
        col_min = optimized[col].min()
        col_max = optimized[col].max()
        if col_min >= 0:
            if col_max <= 255:
                optimized[col] = optimized[col].astype('uint8')
            elif col_max <= 65535:
                optimized[col] = optimized[col].astype('uint16')
            elif col_max <= 4294967295:
                optimized[col] = optimized[col].astype('uint32')
        else:
            if col_min >= -128 and col_max <= 127:
                optimized[col] = optimized[col].astype('int8')
            elif col_min >= -32768 and col_max <= 32767:
                optimized[col] = optimized[col].astype('int16')
            elif col_min >= -2147483648 and col_max <= 2147483647:
                optimized[col] = optimized[col].astype('int32')
    # Floats: let pandas downcast float64 to float32 where it can
    for col in optimized.select_dtypes(include=['float64']).columns:
        optimized[col] = pd.to_numeric(optimized[col], downcast='float')
    # Strings: low-cardinality object columns become categoricals
    for col in optimized.select_dtypes(include=['object']).columns:
        n_unique = optimized[col].nunique()
        n_total = len(optimized[col])
        if n_unique / n_total < 0.5:  # fewer than 50% unique values
            optimized[col] = optimized[col].astype('category')
    return optimized
sensor_optimized = optimize_dtypes(sensor_df)
print("Memory after optimization:")
print(sensor_optimized.memory_usage(deep=True) / 1e6)
print(f"\nTotal memory: {sensor_optimized.memory_usage(deep=True).sum() / 1e6:.1f} MB")
print(f"Reduction: {(1 - sensor_optimized.memory_usage(deep=True).sum() / "
f"sensor_df.memory_usage(deep=True).sum()) * 100:.1f}%")
Memory after optimization:
Index 128.0
turbine_id 10.0
timestamp 40.0
vibration_mm_s 20.0
temperature_c 20.0
status 5.0
Total memory: 223.0 MB
Reduction: 62.7%
A 62.7% reduction by changing dtypes alone. The status column collapsed from 310 MB to 5 MB by converting to a categorical. The turbine_id column shrank to a quarter of its size by downcasting from 64-bit to 16-bit integers, and the two float columns halved by downcasting from 64-bit to 32-bit.
Production Tip --- Always run dtype optimization immediately after loading data. For datasets in the 1--5 GB range, this single step can be the difference between "fits in RAM" and "kernel crash." Save optimized DataFrames to Parquet rather than CSV --- Parquet preserves dtypes, compresses well, and loads much faster.
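Optimizing after loading still requires the wide columns to exist in memory once. A complementary approach is to declare dtypes at parse time. The sketch below assumes the column names from the running example and uses a tiny in-memory CSV (via io.StringIO) as a stand-in for a real file.

```python
import io

import pandas as pd

# A tiny in-memory CSV stands in for a real file (illustrative data).
csv_text = """turbine_id,timestamp,vibration_mm_s,temperature_c,status
1,2024-01-01 00:00:00,2.513,55.1,normal
2,2024-01-01 00:00:01,2.481,54.8,normal
3,2024-01-01 00:00:02,3.902,61.3,critical
"""

# Declaring dtypes up front means pandas never builds int64/float64/object
# columns for these fields in the first place.
dtypes = {
    "turbine_id": "uint16",
    "vibration_mm_s": "float32",
    "temperature_c": "float32",
    "status": "category",
}

df = pd.read_csv(
    io.StringIO(csv_text),
    dtype=dtypes,
    parse_dates=["timestamp"],
)

print(df.dtypes)
```

This only works when you already know safe ranges for each column, so it pairs well with a one-time inspection on a sample.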
The Speed Problem
Even when data fits in memory, pandas can be slow. pandas is fundamentally single-threaded for most operations. A groupby().agg() on 100 million rows uses one CPU core while the other 7 (or 15, or 63) sit idle. String operations are Python-level loops under the hood. And pandas' eager evaluation means every intermediate step materializes a full DataFrame in memory, even if you only need the final result.
import time
# Benchmarking a common aggregation on 5M rows
start = time.time()
result = sensor_df.groupby('turbine_id').agg(
    mean_vibration=('vibration_mm_s', 'mean'),
    max_temperature=('temperature_c', 'max'),
    n_readings=('turbine_id', 'count'),
    critical_count=('status', lambda x: (x == 'critical').sum())
).reset_index()
pandas_time = time.time() - start
print(f"pandas groupby on 5M rows: {pandas_time:.2f}s")
print(f"Result shape: {result.shape}")
pandas groupby on 5M rows: 2.41s
Result shape: (1200, 5)
2.4 seconds for 5 million rows. Extrapolate to 103 million rows per day, and that single groupby takes nearly a minute. Chain five of them in a pipeline, add some merges and string operations, and your "quick analysis" takes 20 minutes. This is where Dask and Polars earn their place.
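Before switching tools, one inexpensive fix is worth trying on a groupby like the one above: the lambda passed to agg is called in Python once per group, while built-in aggregations run in compiled code. The sketch below, on a smaller synthetic frame with assumed column names, computes the boolean once and then uses a built-in sum. How much time this saves depends on the data and the pandas version, so no speedup figure is claimed here.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
n = 200_000  # small enough to run quickly; illustrative only
df = pd.DataFrame({
    "turbine_id": rng.integers(1, 1201, n),
    "vibration_mm_s": rng.normal(2.5, 0.4, n),
    "status": rng.choice(["normal", "warning", "critical"], n,
                         p=[0.97, 0.025, 0.005]),
})

# Slow path: the lambda is called once per group in Python.
slow = df.groupby("turbine_id").agg(
    critical_count=("status", lambda s: (s == "critical").sum())
)

# Faster path: compute the boolean once, vectorized, then use a built-in sum.
fast = (
    df.assign(is_critical=df["status"].eq("critical"))
      .groupby("turbine_id")
      .agg(critical_count=("is_critical", "sum"))
)

same = (slow["critical_count"].to_numpy()
        == fast["critical_count"].to_numpy()).all()
print(f"Results match: {same}")
```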
Dask: Out-of-Core and Parallel DataFrames
Dask is the tool you reach for when your data is too large for memory or your computation is too slow for a single core. It mirrors the pandas API closely enough that the learning curve is gentle, but under the hood it does two fundamentally different things: lazy evaluation and partitioned computation.
Core Concepts
Partitions. A Dask DataFrame is not one DataFrame --- it is a collection of smaller pandas DataFrames called partitions. Each partition fits in memory. When you load a 50 GB CSV with Dask, it does not read the file into a single 50 GB DataFrame. It reads it in chunks (for CSV, byte ranges controlled by the blocksize parameter; for Parquet, typically one partition per file or row group), and each chunk is a regular pandas DataFrame.
Lazy evaluation. When you write ddf.groupby('turbine_id').mean(), Dask does not compute the result immediately. It builds a task graph --- a directed acyclic graph (DAG) of operations. The computation only happens when you call .compute(). This allows Dask to optimize the execution plan: it can fuse operations, avoid unnecessary intermediate DataFrames, and parallelize independent tasks.
Task graph. The graph tracks every operation and its dependencies. A groupby-aggregation on a partitioned DataFrame generates a graph where each partition is grouped independently, then the partial results are combined. Dask's scheduler decides how to execute this graph --- locally with threads, with processes, or across a distributed cluster.
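To make the three concepts concrete, here is a toy task graph in plain Python. It is not Dask's scheduler and none of these function names come from Dask; it is a sketch, loosely modeled on the idea of a dictionary of tasks, that shows how building the graph does no work and how a partitioned mean is assembled from per-partition (sum, count) pairs.

```python
# Toy task graph: NOT Dask's scheduler, just the idea behind it.
# Each entry maps a key to a (function, *arguments) tuple, where string
# arguments that match another key are treated as dependencies.


def load_partition(i):
    """Pretend to load partition i as (turbine_id, vibration) pairs."""
    data = {
        0: [(1, 2.0), (2, 3.0), (1, 4.0)],
        1: [(2, 5.0), (3, 1.0), (1, 6.0)],
    }
    return data[i]


def partial_agg(rows):
    """Per-partition groupby: turbine_id -> (sum, count)."""
    out = {}
    for tid, vib in rows:
        s, c = out.get(tid, (0.0, 0))
        out[tid] = (s + vib, c + 1)
    return out


def combine(*partials):
    """Merge partial (sum, count) pairs and finish with the mean."""
    merged = {}
    for part in partials:
        for tid, (s, c) in part.items():
            ms, mc = merged.get(tid, (0.0, 0))
            merged[tid] = (ms + s, mc + c)
    return {tid: s / c for tid, (s, c) in merged.items()}


# Building the graph runs nothing; it only records what depends on what.
graph = {
    "load-0": (load_partition, 0),
    "load-1": (load_partition, 1),
    "agg-0": (partial_agg, "load-0"),
    "agg-1": (partial_agg, "load-1"),
    "mean": (combine, "agg-0", "agg-1"),
}


def compute(graph, key):
    """Recursively evaluate a key, resolving its dependencies first."""
    func, *args = graph[key]
    resolved = [compute(graph, a) if isinstance(a, str) and a in graph else a
                for a in args]
    return func(*resolved)


result = compute(graph, "mean")
print(result)
```

The two agg tasks share no dependencies, which is exactly what lets a real scheduler run them on different cores or machines.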
import dask.dataframe as dd
# Save the pandas DataFrame to a single Parquet file (a real pipeline would write one file per partition)
sensor_df.to_parquet('turbinetech_sensors.parquet', engine='pyarrow')
# Load with Dask
ddf = dd.read_parquet('turbinetech_sensors.parquet')
# Inspect -- no computation happens yet
print(f"Type: {type(ddf)}")
print(f"Number of partitions: {ddf.npartitions}")
print(f"Columns: {list(ddf.columns)}")
print(f"Dtypes:\n{ddf.dtypes}")
Type: <class 'dask.dataframe.core.DataFrame'>
Number of partitions: 1
Columns: ['turbine_id', 'timestamp', 'vibration_mm_s', 'temperature_c', 'status']
Dtypes:
turbine_id int64
timestamp datetime64[ns]
vibration_mm_s float64
temperature_c float64
status object
dtype: object
Lazy Evaluation in Practice
The key mental model shift: in pandas, every line executes immediately. In Dask, operations are recorded, not executed.
# These lines build a task graph -- nothing is computed
filtered = ddf[ddf['status'] == 'critical']
grouped = filtered.groupby('turbine_id').agg(
    critical_count=('vibration_mm_s', 'count'),
    max_vibration=('vibration_mm_s', 'max')
)
# Inspect the task graph
print(f"Type of 'grouped': {type(grouped)}")
print(f"Task graph has {len(grouped.__dask_graph__())} tasks")
# NOW compute
result = grouped.compute()
print(f"\nResult type: {type(result)}")
print(f"Result shape: {result.shape}")
print(result.head(10))
Type of 'grouped': <class 'dask.dataframe.core.DataFrame'>
Task graph has 14 tasks
Result type: <class 'pandas.core.frame.DataFrame'>
Result shape: (1175, 2)
critical_count max_vibration
turbine_id
1 23 3.628
2 19 3.514
3 27 3.711
5 18 3.419
6 20 3.582
7 25 3.691
8 22 3.545
9 21 3.603
10 24 3.667
11 19 3.498
After .compute(), the result is a regular pandas DataFrame. This is the Dask pattern: build the graph lazily, compute once at the end.
Reading Large Files in Chunks
Dask's real power appears when data does not fit in memory. Here we simulate reading a dataset that is too large by using Dask's blocksize parameter.
# Read a large CSV in chunks (blocksize controls partition size)
# In practice, this would be a multi-gigabyte file
ddf_chunked = dd.read_csv(
    'large_sensor_data_*.csv',  # glob pattern for multiple files
    blocksize='64MB',           # each partition is ~64 MB
    dtype={
        'turbine_id': 'int32',
        'vibration_mm_s': 'float32',
        'temperature_c': 'float32',
        'status': 'category'
    }
)
# Or read from multiple Parquet files (preferred)
# ddf_parquet = dd.read_parquet('sensor_data_partitioned/')
Production Tip --- Always prefer Parquet over CSV for large datasets. Parquet is columnar (so reading 3 of 30 columns reads only 10% of the data), compressed (3--10x smaller than CSV), and preserves dtypes (no re-inference on load). Dask reads Parquet partitions in parallel, making it significantly faster than CSV for large files.
Dask Delayed: Custom Parallel Functions
Not everything fits the DataFrame API. dask.delayed lets you parallelize any Python function.
import dask
@dask.delayed
def process_turbine(turbine_id, df):
    """Compute rolling statistics for a single turbine."""
    turbine_data = df[df['turbine_id'] == turbine_id].sort_values('timestamp')
    turbine_data['rolling_mean_vib'] = turbine_data['vibration_mm_s'].rolling(
        window=60, min_periods=30
    ).mean()
    turbine_data['rolling_std_vib'] = turbine_data['vibration_mm_s'].rolling(
        window=60, min_periods=30
    ).std()
    turbine_data['anomaly_flag'] = (
        turbine_data['vibration_mm_s'] >
        turbine_data['rolling_mean_vib'] + 3 * turbine_data['rolling_std_vib']
    )
    return turbine_data[turbine_data['anomaly_flag']].shape[0]
# Build a list of delayed computations (one per turbine)
turbine_ids = sensor_df['turbine_id'].unique()[:50] # first 50 for demo
delayed_results = [process_turbine(tid, sensor_df) for tid in turbine_ids]
# Compute all in parallel
anomaly_counts = dask.compute(*delayed_results)
print(f"Anomaly counts for first 10 turbines: {anomaly_counts[:10]}")
Anomaly counts for first 10 turbines: (7, 5, 8, 6, 4, 9, 7, 5, 6, 8)
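Each process_turbine call runs independently, so Dask can schedule them in parallel. The speedup depends on the scheduler: the default threaded scheduler helps only where pandas releases the GIL, so on an 8-core machine expect a fraction of the ideal 8x over a serial loop rather than the full factor.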
Dask Gotchas
Dask mirrors pandas, but it is not a drop-in replacement. Several operations behave differently:
# 1. No row-wise positional indexing with .iloc
# ddf.iloc[0] # This FAILS -- Dask only supports .iloc for selecting columns, e.g. ddf.iloc[:, 0]
# 2. Sorting is expensive (requires shuffling data across partitions)
# ddf.sort_values('vibration_mm_s') # Works but triggers full data shuffle
# 3. Unique and nunique require a full pass
n_unique = ddf['turbine_id'].nunique().compute() # Must call .compute()
# 4. apply() with axis=1 is slow (falls back to row-wise Python loops)
# Use map_partitions for partition-level operations instead
def add_risk_score(partition):
    """Apply a function to each partition (which is a pandas DataFrame)."""
    # assign returns a new DataFrame instead of mutating the input partition
    return partition.assign(
        risk_score=partition['vibration_mm_s'] * 0.6
        + partition['temperature_c'] * 0.4
    )
ddf_scored = ddf.map_partitions(add_risk_score)
print(ddf_scored.head())
Common Mistake --- Calling .compute() too early or too often. Every .compute() triggers a full pass through the data. Chain your operations, build the full task graph, and compute once at the end. If you call .compute() after every operation, you lose all the benefits of lazy evaluation and may actually be slower than pandas.
Polars: Speed Through Rust and Apache Arrow
Polars is a DataFrame library written in Rust that uses Apache Arrow as its memory format. If Dask's value proposition is "pandas but parallel and out-of-core," Polars' value proposition is "pandas but 10--50x faster, with less memory usage, on a single machine."
Why Polars Is Fast
Three architectural decisions explain Polars' speed:
- Apache Arrow memory format. Arrow is a columnar, zero-copy memory layout. Data stored in Arrow format can be passed between processes and libraries without serialization. This eliminates the copying overhead that plagues pandas when interacting with other tools.
- Rust execution engine. All computation happens in compiled Rust code, not interpreted Python. String operations, groupbys, joins --- everything runs at native speed with no Python GIL overhead.
- Lazy evaluation with query optimization. Like Dask, Polars supports lazy execution. But Polars' optimizer goes further: it pushes predicates down (filters before joins), projects columns early (reads only needed columns), and eliminates common subexpressions. The optimizer understands the semantics of your query, not just the execution graph.
import polars as pl
# Read the same data with Polars
sensor_pl = pl.read_parquet('turbinetech_sensors.parquet')
print(f"Type: {type(sensor_pl)}")
print(f"Shape: {sensor_pl.shape}")
print(f"Schema:\n{sensor_pl.schema}")
print(f"\nEstimated memory: {sensor_pl.estimated_size('mb'):.1f} MB")
Type: <class 'polars.dataframe.frame.DataFrame'>
Shape: (5000000, 5)
Schema:
{'turbine_id': Int64, 'timestamp': Datetime(time_unit='ns', time_zone=None),
'vibration_mm_s': Float64, 'temperature_c': Float64, 'status': String}
Estimated memory: 232.8 MB
The Polars Expression System
Polars does not use the pandas index/column API. Instead, it uses an expression system built around pl.col(), pl.when(), pl.lit(), and chained method calls. This takes adjustment, but it enables the query optimizer to understand and rewrite your logic.
# pandas equivalent:
# sensor_df[sensor_df['status'] == 'critical'].groupby('turbine_id')['vibration_mm_s'].mean()
# Polars expression syntax
result_pl = (
    sensor_pl
    .filter(pl.col('status') == 'critical')
    .group_by('turbine_id')
    .agg(
        pl.col('vibration_mm_s').mean().alias('mean_vibration'),
        pl.col('vibration_mm_s').max().alias('max_vibration'),
        pl.col('vibration_mm_s').count().alias('critical_count')
    )
    .sort('turbine_id')
)
print(f"Result shape: {result_pl.shape}")
print(result_pl.head(10))
Result shape: (1175, 4)
shape: (10, 4)
+-----------+----------------+---------------+----------------+
| turbine_id| mean_vibration | max_vibration | critical_count |
| --- | --- | --- | --- |
| i64 | f64 | f64 | u32 |
+-----------+----------------+---------------+----------------+
| 1 | 2.497 | 3.628 | 23 |
| 2 | 2.523 | 3.514 | 19 |
| 3 | 2.481 | 3.711 | 27 |
| 5 | 2.512 | 3.419 | 18 |
| 6 | 2.505 | 3.582 | 20 |
| 7 | 2.498 | 3.691 | 25 |
| 8 | 2.519 | 3.545 | 22 |
| 9 | 2.507 | 3.603 | 21 |
| 10 | 2.483 | 3.667 | 24 |
| 11 | 2.531 | 3.498 | 19 |
+-----------+----------------+---------------+----------------+
LazyFrame: Polars' Killer Feature
The LazyFrame is where Polars' optimizer does its best work. Instead of executing eagerly, a LazyFrame records your operations and optimizes the entire query plan before execution.
# Lazy mode: build the query plan, then collect
lazy_result = (
    sensor_pl.lazy()
    .filter(pl.col('vibration_mm_s') > 3.0)
    .filter(pl.col('temperature_c') > 58.0)
    .with_columns(
        risk_score=(
            pl.col('vibration_mm_s') * 0.6 +
            pl.col('temperature_c') * 0.01
        )
    )
    .group_by('turbine_id')
    .agg(
        pl.col('risk_score').mean().alias('avg_risk'),
        pl.col('risk_score').max().alias('max_risk'),
        pl.len().alias('high_risk_count')
    )
    .sort('avg_risk', descending=True)
)
# Inspect the optimized query plan
print("Optimized plan:")
print(lazy_result.explain())
# Execute
result = lazy_result.collect()
print(f"\nResult shape: {result.shape}")
print(result.head(10))
Optimized plan:
SORT BY [col("avg_risk")]
AGGREGATE
[col("risk_score").mean().alias("avg_risk"),
col("risk_score").max().alias("max_risk"),
len().alias("high_risk_count")]
BY [col("turbine_id")]
FROM
WITH_COLUMNS
[(col("vibration_mm_s") * 0.6) + (col("temperature_c") * 0.01)]
FROM
FILTER [(col("vibration_mm_s") > 3.0) & (col("temperature_c") > 58.0)]
FROM
DF ["turbine_id", "timestamp", "vibration_mm_s", ...]; ...
Result shape: (1082, 4)
shape: (10, 4)
+-----------+----------+----------+------------------+
| turbine_id| avg_risk | max_risk | high_risk_count |
| --- | --- | --- | --- |
| i64 | f64 | f64 | u32 |
+-----------+----------+----------+------------------+
| 483 | 2.651 | 2.824 | 6 |
| 1072 | 2.638 | 2.779 | 5 |
| 721 | 2.631 | 2.801 | 7 |
| 394 | 2.625 | 2.756 | 4 |
| 88 | 2.618 | 2.793 | 8 |
| 556 | 2.612 | 2.768 | 5 |
| 1134 | 2.607 | 2.741 | 6 |
| 267 | 2.601 | 2.789 | 7 |
| 945 | 2.596 | 2.735 | 4 |
| 612 | 2.591 | 2.772 | 6 |
+-----------+----------+----------+------------------+
Notice how the optimizer combined the two .filter() calls into a single predicate with &. Because the filter sits below with_columns in the plan, the risk score is only computed for rows that pass both filters; had the filters been written after with_columns, predicate pushdown could move them ahead of it in the same way, since they do not depend on the new column. With pandas, both filters and the column computation would execute sequentially, materializing intermediate DataFrames at each step.
Polars vs. pandas Speed Comparison
import time
# --- pandas ---
start = time.time()
pandas_result = (
    sensor_df[sensor_df['status'] != 'normal']
    .groupby('turbine_id')
    .agg(
        mean_vib=('vibration_mm_s', 'mean'),
        std_vib=('vibration_mm_s', 'std'),
        max_temp=('temperature_c', 'max'),
        count=('turbine_id', 'count')
    )
    .sort_values('mean_vib', ascending=False)
)
pandas_time = time.time() - start
# --- Polars (eager) ---
start = time.time()
polars_result = (
    sensor_pl
    .filter(pl.col('status') != 'normal')
    .group_by('turbine_id')
    .agg(
        pl.col('vibration_mm_s').mean().alias('mean_vib'),
        pl.col('vibration_mm_s').std().alias('std_vib'),
        pl.col('temperature_c').max().alias('max_temp'),
        pl.len().alias('count')
    )
    .sort('mean_vib', descending=True)
)
polars_time = time.time() - start
print(f"pandas: {pandas_time:.3f}s")
print(f"Polars: {polars_time:.3f}s")
print(f"Speedup: {pandas_time / polars_time:.1f}x")
pandas: 1.872s
Polars: 0.089s
Speedup: 21.0x
21x faster on 5 million rows in this run. On 100 million rows the gap typically widens, because Polars' parallelism scales with core count while pandas remains single-threaded.
Converting Between pandas and Polars
You will often need to pass data between Polars and pandas-dependent libraries (scikit-learn, matplotlib, statsmodels).
# Polars to pandas
pandas_from_polars = result.to_pandas()
print(f"Type: {type(pandas_from_polars)}")
# pandas to Polars
polars_from_pandas = pl.from_pandas(sensor_df)
print(f"Type: {type(polars_from_pandas)}")
# Direct from/to Arrow (zero-copy when possible)
arrow_table = sensor_pl.to_arrow()
back_to_polars = pl.from_arrow(arrow_table)
Production Tip --- Use Polars for data loading, cleaning, and aggregation. Convert to pandas only at the boundary where you need a pandas-dependent library. This gives you Polars' speed for the heavy lifting and pandas' ecosystem compatibility for modeling and visualization.
SQL Optimization: Make the Database Do the Work
The fastest way to process large data in Python is to not process it in Python. If your data lives in a database --- PostgreSQL, MySQL, BigQuery, Snowflake, Redshift --- the database engine is almost always faster at filtering, aggregating, and joining than pandas. The problem is that many data scientists treat SQL as a data retrieval tool ("give me all the rows") rather than a computation tool ("give me the answer").
The Anti-Pattern: SELECT * FROM everything
# DON'T DO THIS
# This pulls the entire table into Python, then filters and aggregates in pandas
import sqlalchemy
engine = sqlalchemy.create_engine('postgresql://user:pass@host:5432/turbinetech')
# Anti-pattern: pull everything, process in pandas
df_all = pd.read_sql("SELECT * FROM sensor_readings", engine) # 100M+ rows
df_critical = df_all[df_all['status'] == 'critical']
result = df_critical.groupby('turbine_id')['vibration_mm_s'].mean()
This query transfers 100 million rows across the network, loads them into RAM, and then filters 99.5% of them away (only 0.5% of readings are critical). The database could have done the filtering and aggregation in milliseconds.
The Correct Pattern: Push Computation to the Database
# DO THIS INSTEAD
# Let the database filter and aggregate -- only transfer the result
query = """
SELECT
turbine_id,
AVG(vibration_mm_s) AS mean_vibration,
MAX(vibration_mm_s) AS max_vibration,
COUNT(*) AS critical_count
FROM sensor_readings
WHERE status = 'critical'
AND timestamp >= '2024-01-01'
AND timestamp < '2025-01-01'
GROUP BY turbine_id
ORDER BY mean_vibration DESC
"""
result = pd.read_sql(query, engine)
print(f"Rows transferred: {len(result)}") # ~1,200 rows instead of 100M
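The same pattern can be tried end to end without a database server. The sketch below uses an in-memory SQLite database from the standard library as a stand-in for the production database (an assumption; the table is a trimmed version of sensor_readings with made-up rows). It also passes filter values as bound parameters rather than formatting them into the SQL string, which pd.read_sql supports through its params argument, with placeholder syntax that depends on the database driver.

```python
import sqlite3

# In-memory SQLite stands in for the production database (assumption).
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE sensor_readings (
        turbine_id INTEGER, timestamp TEXT,
        vibration_mm_s REAL, status TEXT
    )
""")
rows = [
    (1, "2024-03-01 00:00:00", 2.4, "normal"),
    (1, "2024-03-01 00:00:01", 3.9, "critical"),
    (1, "2024-03-01 00:00:02", 4.1, "critical"),
    (2, "2024-03-01 00:00:00", 2.6, "normal"),
    (2, "2023-12-31 23:59:59", 4.5, "critical"),  # outside the date range
    (3, "2024-06-15 12:00:00", 3.7, "critical"),
]
conn.executemany("INSERT INTO sensor_readings VALUES (?, ?, ?, ?)", rows)

# Filter values are passed as bound parameters, never string-formatted in.
query = """
    SELECT turbine_id,
           AVG(vibration_mm_s) AS mean_vibration,
           COUNT(*)            AS critical_count
    FROM sensor_readings
    WHERE status = ?
      AND timestamp >= ?
      AND timestamp <  ?
    GROUP BY turbine_id
    ORDER BY turbine_id
"""
params = ("critical", "2024-01-01", "2025-01-01")
result = conn.execute(query, params).fetchall()

# Only the aggregated rows ever reach Python.
for turbine_id, mean_vib, n in result:
    print(turbine_id, round(mean_vib, 2), n)
conn.close()
```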
SQL Optimization Techniques
Indexing. The single most impactful optimization. An index on a column is like the index of a book: it lets the database jump directly to matching rows instead of scanning every row.
-- Create indexes on columns used in WHERE and JOIN clauses
CREATE INDEX idx_sensor_status ON sensor_readings (status);
CREATE INDEX idx_sensor_timestamp ON sensor_readings (timestamp);
CREATE INDEX idx_sensor_turbine ON sensor_readings (turbine_id);
-- Composite index for queries that filter on multiple columns
CREATE INDEX idx_sensor_status_timestamp
ON sensor_readings (status, timestamp);
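You can watch an index change the query plan without a production database. The sketch below uses SQLite from the standard library as a stand-in (an assumption: SQLite's planner and its EXPLAIN QUERY PLAN output differ from PostgreSQL's, and the table holds synthetic rows), but the before-and-after effect is the same in spirit.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE sensor_readings (
        turbine_id INTEGER, timestamp TEXT,
        vibration_mm_s REAL, status TEXT
    )
""")
# Synthetic rows: every 200th reading is marked critical.
conn.executemany(
    "INSERT INTO sensor_readings VALUES (?, ?, ?, ?)",
    [(i % 50, f"2024-01-01 00:{(i // 60) % 60:02d}:{i % 60:02d}", 2.5,
      "critical" if i % 200 == 0 else "normal") for i in range(5_000)],
)

query = """
    SELECT turbine_id, vibration_mm_s
    FROM sensor_readings
    WHERE status = 'critical' AND timestamp >= '2024-01-01'
"""


def plan(sql):
    """Return SQLite's query plan as one string (detail is the 4th column)."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql).fetchall()
    return " | ".join(row[3] for row in rows)


plan_before = plan(query)  # no index yet: expect a full table scan
conn.execute(
    "CREATE INDEX idx_sensor_status_timestamp "
    "ON sensor_readings (status, timestamp)"
)
plan_after = plan(query)   # the composite index should now be chosen

print("Before:", plan_before)
print("After: ", plan_after)
conn.close()
```

The exact wording of the plan varies by SQLite version, so treat the printed text as a diagnostic to read rather than a format to parse.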
Partitioning. For very large tables, database-level partitioning splits the table into physical segments (by date range, by key, or by hash). Queries that filter on the partition key only scan the relevant partitions.
-- PostgreSQL range partitioning by month
CREATE TABLE sensor_readings (
id BIGSERIAL,
turbine_id INTEGER NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
vibration_mm_s REAL,
temperature_c REAL,
status VARCHAR(10)
) PARTITION BY RANGE (timestamp);
-- Create monthly partitions
CREATE TABLE sensor_readings_2024_01
PARTITION OF sensor_readings
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE sensor_readings_2024_02
PARTITION OF sensor_readings
FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
-- ... and so on
EXPLAIN ANALYZE. The database's own diagnostic tool. It shows you exactly how the database plans to execute your query and how long each step takes.
EXPLAIN ANALYZE
SELECT turbine_id, AVG(vibration_mm_s) AS mean_vib
FROM sensor_readings
WHERE timestamp >= '2024-06-01'
AND timestamp < '2024-07-01'
AND status = 'critical'
GROUP BY turbine_id;
GroupAggregate (cost=1245.23..1267.89 rows=1200 width=12)
(actual time=12.4..14.1 rows=1175 loops=1)
Group Key: turbine_id
-> Index Scan using idx_sensor_status_timestamp on sensor_readings
(cost=0.56..1234.67 rows=2450 width=8)
(actual time=0.04..11.2 rows=2387 loops=1)
Index Cond: ((status = 'critical') AND (timestamp >= '2024-06-01')
AND (timestamp < '2024-07-01'))
Planning Time: 0.3 ms
Execution Time: 14.5 ms
14.5 milliseconds. The database used the composite index to jump directly to critical readings in June 2024, scanned only 2,387 rows, grouped them, and returned 1,175 aggregated results. Compare that to transferring 100 million rows to Python.
Common SQL Performance Killers
# These patterns are slow and should be rewritten:
# 1. SELECT * -- reads all columns, even those you don't need
# Fix: SELECT only the columns you need
bad = "SELECT * FROM sensor_readings WHERE status = 'critical'"
good = "SELECT turbine_id, vibration_mm_s FROM sensor_readings WHERE status = 'critical'"
# 2. Functions on indexed columns usually prevent the index from being used
# (YEAR() is MySQL syntax; PostgreSQL uses EXTRACT(YEAR FROM timestamp))
# Fix: rewrite to avoid wrapping the indexed column in a function
bad = "SELECT * FROM sensor_readings WHERE YEAR(timestamp) = 2024"
good = "SELECT * FROM sensor_readings WHERE timestamp >= '2024-01-01' AND timestamp < '2025-01-01'"
# 3. OR on different columns often prevents efficient index use
# (some planners can combine two indexes; confirm with EXPLAIN)
# Fix: try UNION ALL instead
bad = "SELECT * FROM sensor_readings WHERE turbine_id = 847 OR status = 'critical'"
good = """
SELECT * FROM sensor_readings WHERE turbine_id = 847
UNION ALL
SELECT * FROM sensor_readings WHERE status = 'critical' AND turbine_id != 847
"""
# 4. No LIMIT on exploratory queries
# Fix: always LIMIT during exploration
bad = "SELECT * FROM sensor_readings ORDER BY timestamp DESC"
good = "SELECT * FROM sensor_readings ORDER BY timestamp DESC LIMIT 1000"
Production Tip --- For analytical queries on large tables, consider creating materialized views for frequently-used aggregations. A materialized view pre-computes and stores the result. Querying it is as fast as querying a small table. Refresh it periodically (e.g., nightly) rather than recomputing on every query.
Chunked Reading: When You Must Use pandas
Sometimes you cannot use Dask or Polars --- perhaps you depend on a pandas-only library, or your production pipeline is deeply pandas-integrated. In these cases, chunked reading lets you process data in manageable pieces.
# Process a large CSV in chunks
chunk_results = []
for chunk in pd.read_csv('large_sensor_data.csv', chunksize=500_000):
    # Optimize dtypes per chunk
    chunk['turbine_id'] = chunk['turbine_id'].astype('int16')
    chunk['vibration_mm_s'] = chunk['vibration_mm_s'].astype('float32')
    chunk['temperature_c'] = chunk['temperature_c'].astype('float32')
    chunk['status'] = chunk['status'].astype('category')
    # Compute partial aggregation
    partial = chunk.groupby('turbine_id').agg(
        sum_vib=('vibration_mm_s', 'sum'),
        count=('vibration_mm_s', 'count'),
        max_temp=('temperature_c', 'max')
    )
    chunk_results.append(partial)
# Combine partial results
combined = pd.concat(chunk_results).groupby(level=0).agg(
    sum_vib=('sum_vib', 'sum'),
    count=('count', 'sum'),
    max_temp=('max_temp', 'max')
)
combined['mean_vib'] = combined['sum_vib'] / combined['count']
print(combined[['mean_vib', 'max_temp', 'count']].head())
Warning --- Chunked reading works for decomposable aggregations (sum, count, max, min), where partial results from each chunk can be combined into the exact answer. It does not work directly for operations that need the full dataset at once, such as sorting, an exact median, or training most machine learning models. For those, you need Dask, Polars, or sampling.
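The distinction is easy to check on a toy example. The sketch below uses made-up readings split into three chunks: the standard deviation can be rebuilt exactly from per-chunk counts, sums, and sums of squares, while the median of per-chunk medians does not match the true median. The sum-of-squares formula is used here for brevity; it can lose precision on very large or tightly clustered data, where a streaming method such as Welford's algorithm is the safer choice.

```python
import statistics

# Three "chunks" of vibration readings (illustrative values).
chunks = [
    [2.1, 2.4, 2.6],
    [2.5, 9.0],
    [2.7, 2.8, 2.9, 3.0],
]
all_values = [v for chunk in chunks for v in chunk]

# Decomposable: variance can be rebuilt from per-chunk (n, sum, sum of squares).
n = 0
total = 0.0
total_sq = 0.0
for chunk in chunks:
    n += len(chunk)
    total += sum(chunk)
    total_sq += sum(v * v for v in chunk)

mean = total / n
variance = (total_sq - n * mean * mean) / (n - 1)  # sample variance
std_from_chunks = variance ** 0.5

# Not decomposable: the median of chunk medians is generally not the median.
median_of_medians = statistics.median(statistics.median(c) for c in chunks)
true_median = statistics.median(all_values)

print(f"std from partials: {std_from_chunks:.4f}")
print(f"std on full data:  {statistics.stdev(all_values):.4f}")
print(f"median of medians: {median_of_medians}, true median: {true_median}")
```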
Sampling Strategies: When You Do Not Need All the Data
Sometimes the right answer is not "use a bigger tool" but "use less data." Many analyses --- exploratory visualization, feature distribution checks, model prototyping --- do not require the full dataset. A well-chosen sample gives you the same insights in a fraction of the time.
# Strategy 1: Simple random sampling
# Good for: EDA, distribution checks, quick prototyping
sample_random = sensor_df.sample(n=100_000, random_state=42)
# Strategy 2: Stratified sampling (preserving group proportions)
# Good for: when you need representative coverage across categories
from sklearn.model_selection import train_test_split
# Stratify by status to preserve the 97%/2.5%/0.5% distribution
sample_stratified, _ = train_test_split(
    sensor_df, train_size=100_000, stratify=sensor_df['status'],
    random_state=42
)
print("Original distribution:")
print(sensor_df['status'].value_counts(normalize=True).round(4))
print("\nStratified sample distribution:")
print(sample_stratified['status'].value_counts(normalize=True).round(4))
# Strategy 3: Time-based sampling (every Nth row from sorted data)
# Good for: time series where you want temporal coverage
sample_temporal = sensor_df.iloc[::50] # every 50th row
print(f"\nTemporal sample: {len(sample_temporal)} rows")
Original distribution:
normal 0.9700
warning 0.0250
critical 0.0050
Name: status, dtype: float64
Stratified sample distribution:
normal 0.9700
warning 0.0250
critical 0.0050
Name: status, dtype: float64
Temporal sample: 100000 rows
Production Tip --- Develop on a sample, validate on the full dataset. Build your pipeline on 1% of the data where iteration is fast. Once the logic is correct, switch to Dask or Polars for the full run. This is faster than debugging on 100 million rows.
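The strategies above all start from a DataFrame that is already in memory. When the source itself is too large to load, one option not covered above is reservoir sampling, which draws a uniform random sample in a single pass with memory proportional to the sample size. The sketch below is a minimal version of the classic algorithm; the function names and the generator standing in for a file are hypothetical.

```python
import random


def reservoir_sample(rows, k, seed=42):
    """Return k items chosen uniformly at random from an iterable.

    Classic reservoir sampling: one pass, O(k) memory, and the total
    number of rows does not need to be known in advance.
    """
    rng = random.Random(seed)
    reservoir = []
    for i, row in enumerate(rows):
        if i < k:
            reservoir.append(row)      # fill the reservoir first
        else:
            j = rng.randint(0, i)      # inclusive on both ends
            if j < k:
                reservoir[j] = row     # kept with probability k / (i + 1)
    return reservoir


def fake_readings(n):
    """Generator standing in for a file read line by line (illustrative)."""
    for i in range(n):
        yield {"row": i, "turbine_id": i % 1200}


sample = reservoir_sample(fake_readings(200_000), k=1_000)
print(len(sample), sample[0])
```

Like simple random sampling, this gives no guarantee about rare classes, so a status level that makes up 0.5% of rows may be thinly represented in a small sample.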
Memory-Mapped Files and Apache Arrow
For datasets that are too large for RAM but need random access (not just sequential processing), memory-mapped files offer an alternative to Dask's partitioned approach.
Memory-Mapped Files
A memory-mapped file maps a file on disk into virtual memory. The operating system loads only the pages you access, not the entire file. This lets you work with files larger than RAM as if they were in-memory arrays.
# Create a memory-mapped NumPy array
np.random.seed(42)
n_readings = 10_000_000
# Write to disk
fp = np.memmap('sensor_mmap.dat', dtype='float32', mode='w+',
shape=(n_readings, 3))
fp[:, 0] = np.random.normal(2.5, 0.4, n_readings) # vibration
fp[:, 1] = np.random.normal(55.0, 3.2, n_readings) # temperature
fp[:, 2] = np.random.randint(0, 1200, n_readings) # turbine_id
fp.flush()
# Read back with memory mapping (opening the file does NOT load it into RAM)
sensor_mmap = np.memmap('sensor_mmap.dat', dtype='float32', mode='r',
                        shape=(n_readings, 3))
# Boolean filtering scans every row, so the OS pages the whole file through
# memory once; only the matching rows are copied into a new in-RAM array
turbine_847 = sensor_mmap[sensor_mmap[:, 2] == 847]
print(f"Turbine 847 readings: {len(turbine_847)}")
print(f"Mean vibration: {turbine_847[:, 0].mean():.3f}")
Turbine 847 readings: 8342
Mean vibration: 2.499
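A filter over the whole memory map also builds a boolean mask with one entry per row. If even that is too much, the same result can be computed block by block. The following is a minimal sketch, assuming the same three-column float32 layout as above; the helper name `blockwise_mean_for_id`, the block size, and the small temporary file are hypothetical choices for illustration.

```python
import os
import tempfile

import numpy as np


def blockwise_mean_for_id(mm, target_id, block_rows=100_000):
    """Mean of column 0 over rows whose column 2 equals target_id, one block at a time."""
    total = 0.0
    count = 0
    for start in range(0, mm.shape[0], block_rows):
        block = mm[start:start + block_rows]          # only this block's pages are touched
        hits = block[block[:, 2] == target_id, 0]     # matching vibration values
        total += float(hits.sum(dtype=np.float64))
        count += hits.size
    mean = total / count if count else float("nan")
    return mean, count


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "demo_mmap.dat")
    rng = np.random.default_rng(0)
    n = 200_000

    # Write a small demo file with the same column layout
    writer = np.memmap(path, dtype="float32", mode="w+", shape=(n, 3))
    writer[:, 0] = rng.normal(2.5, 0.4, n)      # vibration
    writer[:, 1] = rng.normal(55.0, 3.2, n)     # temperature
    writer[:, 2] = rng.integers(0, 1200, n)     # turbine_id
    writer.flush()
    del writer

    reader = np.memmap(path, dtype="float32", mode="r", shape=(n, 3))
    mean_vib, n_hits = blockwise_mean_for_id(reader, 847, block_rows=50_000)

    # Reference answer computed with a single full-length mask
    mask = reader[:, 2] == 847
    reference_mean = float(reader[mask, 0].mean(dtype=np.float64))
    reference_count = int(mask.sum())
    del reader                                   # release the file before cleanup

print(f"Blockwise: {n_hits} rows, mean vibration {mean_vib:.3f}")
```

The block size trades peak memory for loop overhead. Larger blocks mean fewer Python-level iterations but a bigger temporary mask.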
Apache Arrow: The Lingua Franca of Columnar Data
Apache Arrow is a columnar memory format that both Polars and Dask can use. Because libraries agree on the layout, Arrow data can be shared between Polars, pandas (via ArrowDtype), and DuckDB with little or no copying, and it is the interchange format Spark uses to move data to and from Python.
import pandas as pd
import polars as pl
import pyarrow as pa
import pyarrow.parquet as pq
# Read a Parquet file as an Arrow Table
arrow_table = pq.read_table('turbinetech_sensors.parquet')
print(f"Type: {type(arrow_table)}")
print(f"Schema:\n{arrow_table.schema}")
print(f"Num rows: {arrow_table.num_rows}")
# Convert to Polars (zero-copy for most column types)
polars_df = pl.from_arrow(arrow_table)
# Convert to pandas with Arrow-backed dtypes (much less memory than default)
pandas_arrow = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)
print(f"\npandas with Arrow dtypes memory: "
f"{pandas_arrow.memory_usage(deep=True).sum() / 1e6:.1f} MB")
print(f"pandas default memory: "
f"{sensor_df.memory_usage(deep=True).sum() / 1e6:.1f} MB")
Type: <class 'pyarrow.lib.Table'>
Schema:
turbine_id: int64
timestamp: timestamp[ns]
vibration_mm_s: double
temperature_c: double
status: string
Num rows: 5000000
pandas with Arrow dtypes memory: 248.5 MB
pandas default memory: 598.3 MB
Arrow-backed pandas dtypes cut memory usage by more than half, because strings are stored in Arrow's compact binary format rather than as individual Python objects. In pandas 2.0+, you can request Arrow-backed dtypes directly when reading a file:
# pandas 2.0+: ask the reader for Arrow-backed dtypes
df_arrow = pd.read_parquet('turbinetech_sensors.parquet', dtype_backend='pyarrow')
Choosing the Right Tool: A Decision Framework
With four tools (pandas, Dask, Polars, SQL) and several techniques (dtype optimization, chunked reading, sampling, memory mapping), the choice can feel overwhelming. Here is a practical decision tree.
Step 1: Can the Database Do It?
If your data lives in a database and your operation is a filter, aggregation, or join, push it to SQL first. The database is almost always faster for these operations. Only pull data into Python when you need something SQL cannot do: machine learning, complex statistical tests, custom visualizations.
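The difference between pulling rows and pushing the work down can be sketched with the standard library's `sqlite3` module. The table below is a small synthetic stand-in for `sensor_readings`, not the TurbineTech database; both approaches compute the same per-turbine means, but the pushed-down query returns one row per turbine instead of every reading.

```python
import random
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sensor_readings "
    "(turbine_id INTEGER, vibration_mm_s REAL, status TEXT)"
)

# Synthetic readings for 20 hypothetical turbines
rng = random.Random(42)
rows = [
    (rng.randrange(20), rng.gauss(2.5, 0.4),
     rng.choice(["normal", "warning", "critical"]))
    for _ in range(10_000)
]
conn.executemany("INSERT INTO sensor_readings VALUES (?, ?, ?)", rows)

# Approach A: pull every row into Python, then filter and aggregate there
all_rows = conn.execute(
    "SELECT turbine_id, vibration_mm_s, status FROM sensor_readings"
).fetchall()
sums = {}
for turbine_id, vib, status in all_rows:
    if status != "normal":
        total, count = sums.get(turbine_id, (0.0, 0))
        sums[turbine_id] = (total + vib, count + 1)
python_means = {tid: total / count for tid, (total, count) in sums.items()}

# Approach B: let the database filter and aggregate, transfer only the result
pushed = conn.execute(
    """
    SELECT turbine_id, AVG(vibration_mm_s), COUNT(*)
    FROM sensor_readings
    WHERE status != 'normal'
    GROUP BY turbine_id
    """
).fetchall()
sql_means = {tid: mean for tid, mean, _ in pushed}

print(f"Rows transferred, approach A: {len(all_rows)}")
print(f"Rows transferred, approach B: {len(pushed)}")
```

On a production database the gap is usually wider, because approach A also pays for network transfer and for building Python objects for every row.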
Step 2: Does It Fit in RAM?
After dtype optimization and smart SQL queries, ask: does the data I actually need fit in RAM?
- Yes, comfortably (< 1 GB): Use pandas. It is the best-documented, best-integrated tool in the Python data ecosystem. The ecosystem advantage (scikit-learn, matplotlib, seaborn, statsmodels) is real and significant.
- Yes, but tight (1--5 GB): Use pandas with Arrow-backed dtypes, or use Polars for speed-critical operations and convert to pandas at the modeling boundary.
- No (> 5 GB): Use Dask or Polars depending on the operation.
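Before loading anything, a back-of-the-envelope estimate can tell you which tier you are in. The sketch below multiplies row count by bytes per row for fixed-width dtypes and applies the 1 GB and 5 GB thresholds from the list above. The helper names, the per-dtype sizes for `category`, and the "optimized" schema are assumptions for illustration; real usage is higher once you account for intermediate copies.

```python
# Approximate bytes per value for fixed-width dtypes
BYTES_PER_VALUE = {
    "int64": 8,
    "float64": 8,
    "datetime64[ns]": 8,
    "int16": 2,
    "float32": 4,
    "category": 1,  # assumes int8 codes, i.e. only a handful of distinct values
}


def estimate_gb(n_rows, column_dtypes):
    """Rough in-memory size in GB: rows times bytes per row, ignoring overhead."""
    bytes_per_row = sum(BYTES_PER_VALUE[dtype] for dtype in column_dtypes.values())
    return n_rows * bytes_per_row / 1e9


def ram_tier(size_gb):
    """Map a size estimate onto the three tiers used in this step."""
    if size_gb < 1:
        return "comfortable"
    if size_gb <= 5:
        return "tight"
    return "does not fit"


# One day of readings: 1,200 turbines at one reading per second
rows_per_day = 1_200 * 86_400

default_schema = {
    "turbine_id": "int64",
    "timestamp": "datetime64[ns]",
    "vibration_mm_s": "float64",
    "temperature_c": "float64",
    "status": "category",
}
optimized_schema = {
    "turbine_id": "int16",
    "timestamp": "datetime64[ns]",
    "vibration_mm_s": "float32",
    "temperature_c": "float32",
    "status": "category",
}

for label, schema in [("default", default_schema), ("optimized", optimized_schema)]:
    size = estimate_gb(rows_per_day, schema)
    print(f"{label:>9}: {size:.2f} GB per day -> {ram_tier(size)}")

week = estimate_gb(7 * rows_per_day, default_schema)
print(f"     week: {week:.2f} GB -> {ram_tier(week)}")
```

Under these assumptions a single day lands in the "tight" tier with either schema (roughly 3.4 GB versus 2.0 GB), while a full week with default dtypes does not fit.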
Step 3: Dask or Polars?
| Criterion | Dask | Polars |
|---|---|---|
| API | pandas-like | Own expression system |
| Learning curve | Lower (if you know pandas) | Higher (new syntax) |
| Single-machine speed | Good | Excellent (10--50x faster than pandas) |
| Out-of-core | Yes (native) | Limited (streaming mode) |
| Distributed | Yes (Dask Distributed) | No (single machine) |
| Lazy evaluation | Yes | Yes (with query optimizer) |
| Ecosystem | Integrates with scikit-learn, XGBoost | Growing, but less integrated |
| Best for | Larger-than-RAM data, distributed clusters | Single-machine speed, complex transformations |
Use Dask when:
- Data does not fit on one machine (distributed computing needed)
- You want to parallelize existing pandas code with minimal changes
- You need lazy evaluation for a complex multi-step pipeline
- Your team already knows pandas and needs to move fast
Use Polars when:
- Data fits on one machine but pandas is too slow
- You need maximum single-machine performance
- You are building a new pipeline (no legacy pandas code to maintain)
- Your query involves complex transformations that benefit from the optimizer
The Complete Picture
def recommend_tool(data_size_gb, operation):
    """Recommend a tool based on data size (in GB) and operation type."""
    if data_size_gb < 1:
        print("Recommendation: pandas")
        print(" - Familiar API, rich ecosystem, fast enough")
        return "pandas"
    elif data_size_gb < 10:
        if operation in ('ml_training', 'statsmodels'):
            print("Recommendation: Polars for preprocessing, pandas for modeling")
            return "polars_to_pandas"
        # groupby, filter, join, transform, and any operation not listed above
        print("Recommendation: Polars")
        print(" - 10-50x faster than pandas, query optimizer")
        return "polars"
    elif data_size_gb < 100:
        if operation in ('groupby', 'filter', 'transform'):
            print("Recommendation: Dask (parallel, out-of-core)")
            return "dask"
        elif operation == 'speed_critical':
            print("Recommendation: Polars with streaming/lazy")
            return "polars_lazy"
        else:
            print("Recommendation: Dask with dask-ml")
            return "dask"
    else:
        print("Recommendation: Spark (beyond this chapter's scope)")
        print(" - Or: Dask Distributed on a cluster")
        return "spark"

# Example usage
recommend_tool(0.5, 'groupby')
print()
recommend_tool(8, 'groupby')
print()
recommend_tool(50, 'filter')
Recommendation: pandas
- Familiar API, rich ecosystem, fast enough
Recommendation: Polars
- 10-50x faster than pandas, query optimizer
Recommendation: Dask (parallel, out-of-core)
Putting It All Together: A Multi-Tool Pipeline
Real-world pipelines often combine multiple tools. Here is a pattern from TurbineTech's production system.
import polars as pl
import pandas as pd
from sklearn.ensemble import IsolationForest
# Step 1: SQL extracts only what we need (hypothetical query)
sql_query = """
SELECT turbine_id, timestamp, vibration_mm_s, temperature_c, status
FROM sensor_readings
WHERE timestamp >= '2024-06-01'
AND timestamp < '2024-07-01'
AND turbine_id IN (SELECT turbine_id FROM turbines WHERE region = 'west')
"""
# result = pd.read_sql(sql_query, engine)
# Step 2: Polars for heavy transformation (simulating with our data)
sensor_pl = pl.read_parquet('turbinetech_sensors.parquet')
features = (
sensor_pl.lazy()
.filter(pl.col('status') != 'normal')
.group_by('turbine_id')
.agg(
pl.col('vibration_mm_s').mean().alias('mean_vib'),
pl.col('vibration_mm_s').std().alias('std_vib'),
pl.col('vibration_mm_s').max().alias('max_vib'),
pl.col('temperature_c').mean().alias('mean_temp'),
pl.col('temperature_c').max().alias('max_temp'),
pl.len().alias('alert_count')
)
.with_columns(
vib_cv=(pl.col('std_vib') / pl.col('mean_vib')),
temp_range=(pl.col('max_temp') - pl.col('mean_temp'))
)
.collect()
)
# Step 3: Convert to pandas for scikit-learn
features_pd = features.to_pandas().set_index('turbine_id')
# Step 4: Anomaly detection with scikit-learn
iso_forest = IsolationForest(
n_estimators=100, contamination=0.05, random_state=42
)
feature_cols = ['mean_vib', 'std_vib', 'max_vib', 'mean_temp',
'max_temp', 'alert_count', 'vib_cv', 'temp_range']
features_pd['anomaly'] = iso_forest.fit_predict(features_pd[feature_cols])
anomalous_turbines = features_pd[features_pd['anomaly'] == -1]
print(f"Anomalous turbines detected: {len(anomalous_turbines)}")
print("Top 5 by alert count:")
print(anomalous_turbines.nlargest(5, 'alert_count')[
['mean_vib', 'alert_count', 'vib_cv']
])
Anomalous turbines detected: 59
Top 5 by alert count:
mean_vib alert_count vib_cv
turbine_id
483 2.72 112 0.198
721 2.68 108 0.185
88 2.65 105 0.192
1072 2.61 103 0.177
267 2.58 101 0.189
The pipeline uses each tool where it is strongest: SQL for extraction, Polars for transformation, pandas for scikit-learn integration. No single tool does everything, but together they handle the full workflow efficiently.
File Formats for Large Data
The choice of file format has a dramatic impact on load time and memory usage.
| Format | Read Speed | Write Speed | Compression | Column Selection | pandas | Polars | Dask |
|---|---|---|---|---|---|---|---|
| CSV | Slow | Slow | None | No (reads all) | Yes | Yes | Yes |
| Parquet | Fast | Moderate | Excellent | Yes (columnar) | Yes | Yes | Yes |
| Feather/Arrow IPC | Very fast | Very fast | Good | Yes | Yes | Yes | Yes |
| HDF5 | Fast | Fast | Good | Partial | Yes | No | Yes |
| ORC | Fast | Moderate | Excellent | Yes | Limited | Via PyArrow | Limited |
import time
# Benchmark: CSV vs. Parquet vs. Feather for 5M rows
# Write
sensor_df.to_csv('benchmark.csv', index=False)
sensor_df.to_parquet('benchmark.parquet', engine='pyarrow')
sensor_df.to_feather('benchmark.feather')
# Read (full); perf_counter is the high-resolution clock intended for timing
start = time.perf_counter()
_ = pd.read_csv('benchmark.csv')
csv_time = time.perf_counter() - start
start = time.perf_counter()
_ = pd.read_parquet('benchmark.parquet')
parquet_time = time.perf_counter() - start
start = time.perf_counter()
_ = pd.read_feather('benchmark.feather')
feather_time = time.perf_counter() - start
print(f"CSV: {csv_time:.2f}s")
print(f"Parquet: {parquet_time:.2f}s")
print(f"Feather: {feather_time:.2f}s")
# Read (column selection -- Parquet and Feather only)
start = time.perf_counter()
_ = pd.read_parquet('benchmark.parquet',
                    columns=['turbine_id', 'vibration_mm_s'])
parquet_cols_time = time.perf_counter() - start
print(f"\nParquet (2 of 5 columns): {parquet_cols_time:.2f}s")
CSV: 8.34s
Parquet: 0.91s
Feather: 0.67s
Parquet (2 of 5 columns): 0.38s
Production Tip --- For data that will be read many times and written once (which is most analytical data), always store as Parquet. For data that is read and written frequently in the same session (intermediate pipeline results), use Feather for maximum speed. Never use CSV for large datasets unless you must for compatibility with external systems.
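Single-run timings like the ones in the benchmark above are sensitive to disk caching and background load. A common way to reduce that noise is to repeat each read and keep the best time. The helper below is a minimal sketch; the name `best_of` is hypothetical, and the lambda is a placeholder workload standing in for a call such as `pd.read_parquet(...)`.

```python
import time


def best_of(fn, repeats=3):
    """Run fn several times; return (best elapsed seconds, result of the last run)."""
    best = float("inf")
    result = None
    for _ in range(repeats):
        start = time.perf_counter()
        result = fn()
        best = min(best, time.perf_counter() - start)
    return best, result


# Placeholder workload; in the benchmark this would be a file read
elapsed, total = best_of(lambda: sum(range(1_000_000)))
print(f"best of 3: {elapsed:.4f}s")
```

Taking the minimum rather than the mean reports the run with the least interference. Note that after the first run the file is likely in the operating system's cache, so the result reflects warm-cache performance.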
Chapter Summary
pandas is a brilliant tool for datasets that fit comfortably in memory. But the real world does not always cooperate, and the transition from "my data fits in RAM" to "it does not" is one of the most common friction points in a data science career. The tools in this chapter --- Dask, Polars, SQL optimization, and the supporting techniques of dtype optimization, chunked reading, sampling, and smart file formats --- give you the vocabulary and the skills to handle that transition gracefully.
The key insight is not that pandas is bad. It is that pandas is one tool in a toolkit, and a mature data scientist knows when to reach for a different one. That decision is not about memorizing benchmarks. It is about understanding the nature of your bottleneck --- is it memory, speed, or both? --- and matching the tool to the constraint.
Next up: Part VI takes your models from Jupyter notebooks to production systems. The datasets get larger, the stakes get higher, and the tools you learned in this chapter become essential infrastructure.