# Chapter 28 Exercises: Working with Large Datasets
## Exercise 1: Dtype Detective (Conceptual + Code)
You receive a dataset with the following columns and sample values:
| Column | Sample Values | pandas dtype | Unique Values |
|---|---|---|---|
| customer_id | 10001, 10002, ... | int64 | 2,400,000 |
| plan_type | "basic", "premium", "enterprise" | object | 3 |
| monthly_revenue | 9.99, 14.99, 29.99, 49.99 | float64 | 4 |
| signup_date | "2020-03-15", "2021-07-22" | object | ~730 |
| is_active | True, False | bool | 2 |
| country_code | "US", "CA", "GB", ... | object | 47 |
| last_login_days_ago | 0, 1, 2, ..., 365 | int64 | 366 |
| lifetime_events | 0, 1, ..., 50000 | int64 | ~35,000 |
The dataset has 2.4 million rows and currently consumes 1.8 GB in memory.
a) For each column, recommend the optimal dtype. Justify each choice, explaining the tradeoff between precision and memory.
b) Write a function optimize_streamflow(df) that applies your optimizations. Estimate the memory savings as a percentage.
c) The monthly_revenue column has only 4 unique values. Would you convert it to category? What are the pros and cons of treating a numeric column as categorical?
d) A colleague argues that dtype optimization is "premature optimization" and not worth the effort. Under what circumstances is this argument valid? Under what circumstances is it wrong?
e) After optimization, the dataset fits comfortably in pandas. But your pipeline also needs to compute a 90-day rolling average of lifetime_events per customer, sorted by signup date. Does the dtype-optimized DataFrame change your tool recommendation? Why or why not?
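As a starting point for part (b), here is one possible sketch. The specific downcasts (int32, int16, category, float32) are choices you should be able to defend in part (a), not the only correct answer; `demo` is a tiny stand-in for the 2.4M-row frame.

```python
import pandas as pd

def optimize_streamflow(df: pd.DataFrame) -> pd.DataFrame:
    """One dtype-optimization pass over the schema above (a sketch)."""
    out = df.copy()
    # 2.4M customer IDs fit comfortably in 32 bits (max << 2**31)
    out['customer_id'] = out['customer_id'].astype('int32')
    # Low-cardinality strings (3 and 47 uniques) -> category codes
    out['plan_type'] = out['plan_type'].astype('category')
    out['country_code'] = out['country_code'].astype('category')
    # Parse date strings into a real datetime64 column (8 bytes/row)
    out['signup_date'] = pd.to_datetime(out['signup_date'])
    # 0..365 fits in int16; 0..50,000 does NOT (int16 max is 32,767)
    out['last_login_days_ago'] = out['last_login_days_ago'].astype('int16')
    out['lifetime_events'] = out['lifetime_events'].astype('int32')
    # float32 halves memory at the cost of ~7 significant digits
    out['monthly_revenue'] = out['monthly_revenue'].astype('float32')
    return out

# Tiny demo frame mirroring the table above (1,000 rows)
demo = pd.DataFrame({
    'customer_id': [10001, 10002] * 500,
    'plan_type': ['basic', 'premium'] * 500,
    'monthly_revenue': [9.99, 29.99] * 500,
    'signup_date': ['2020-03-15', '2021-07-22'] * 500,
    'is_active': [True, False] * 500,
    'country_code': ['US', 'CA'] * 500,
    'last_login_days_ago': [3, 120] * 500,
    'lifetime_events': [42, 50000] * 500,
})
before = demo.memory_usage(deep=True).sum()
after = optimize_streamflow(demo).memory_usage(deep=True).sum()
print(f'{before:,} bytes -> {after:,} bytes')
```

Note that bool columns are already 1 byte per row, so `is_active` is left alone; the big wins come from the object (string) columns.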
## Exercise 2: Dask Task Graphs (Code)
Run the following code and answer the questions.
```python
import dask.dataframe as dd
import pandas as pd
import numpy as np

np.random.seed(42)
n = 10_000_000

# Create a large DataFrame and save as partitioned Parquet.
# Draw event_type once and reuse it: if revenue were based on a second,
# independent draw, purchase rows would not line up with nonzero revenue.
event_type = np.random.choice(['click', 'view', 'purchase', 'signup'], n,
                              p=[0.5, 0.35, 0.1, 0.05])
df = pd.DataFrame({
    'user_id': np.random.randint(1, 50001, n),
    'event_type': event_type,
    'revenue': np.where(event_type == 'purchase',
                        np.round(np.random.exponential(25, n), 2),
                        0.0),
    'timestamp': pd.date_range('2024-01-01', periods=n, freq='s')
})
df.to_parquet('streamflow_events/', partition_cols=['event_type'], engine='pyarrow')

# Load with Dask
ddf = dd.read_parquet('streamflow_events/')
```
a) How many partitions does the Dask DataFrame have? Why does the number of partitions match the number of unique event_type values?
b) Write a Dask pipeline that computes the total revenue per user for purchase events only. Chain the operations lazily (filter, groupby, sum) and then call .compute(). How many tasks are in the task graph before computation?
c) Rewrite the same pipeline but call .compute() after the filter step and again after the groupby. Compare the wall-clock time of the two approaches. Why is the single-compute version faster?
d) Use .map_partitions() to add a column hour_of_day extracted from timestamp. Why is map_partitions preferred over .apply(lambda row: row['timestamp'].hour, axis=1) in Dask?
e) A Dask DataFrame with 200 partitions runs a groupby-aggregation. The result has 50,000 unique groups. Explain what happens during the "shuffle" phase of this operation. Why does the number of partitions affect shuffle performance?
## Exercise 3: Polars Expression Translation (Code)
Translate each of the following pandas operations into Polars. For each, write both the eager and lazy versions.
```python
import pandas as pd
import numpy as np

np.random.seed(42)
n = 1_000_000
df = pd.DataFrame({
    'turbine_id': np.random.randint(1, 1201, n),
    'reading_type': np.random.choice(['vibration', 'temperature', 'pressure'], n),
    'value': np.round(np.random.normal(50, 10, n), 2),
    'quality_flag': np.random.choice(['good', 'suspect', 'bad'], n,
                                     p=[0.9, 0.07, 0.03]),
    'timestamp': pd.date_range('2024-01-01', periods=n, freq='s')
})
```
a) Filter rows where quality_flag == 'good' and value > 60.
b) Group by turbine_id and reading_type, compute mean and standard deviation of value, and sort by mean descending.
c) Add a column z_score defined as (value - mean_value) / std_value where mean and std are computed per reading_type group.
d) Pivot the data so that each row is a turbine_id and each column is a reading_type, with cells containing the mean value.
e) For each translation, run both the pandas and Polars versions on the full 1M rows and record the execution times. Which operations show the largest speedup? Hypothesize why.
## Exercise 4: SQL Push-Down (Code + Analysis)
You have access to a PostgreSQL database (or SQLite for local testing) with the following schema:
```sql
CREATE TABLE events (
    event_id    BIGINT PRIMARY KEY,
    user_id     INTEGER NOT NULL,
    event_type  VARCHAR(20) NOT NULL,
    revenue     DECIMAL(10,2),
    created_at  TIMESTAMP NOT NULL
);
-- 50 million rows, covering 2023-01-01 to 2024-12-31
```
a) Write two versions of a query that computes "monthly revenue by event type for 2024": one that pulls all data to pandas and aggregates there, one that does the aggregation in SQL. Explain why the SQL version is faster even if the database and Python are on the same machine.
b) The query SELECT * FROM events WHERE user_id = 12345 takes 8 seconds. After creating an index on user_id, it takes 0.003 seconds. Explain this roughly 2,700x speedup in terms of how the database scans data with and without an index.
c) Write a query that computes the 7-day rolling average of daily revenue. Can this be done entirely in SQL (using window functions), or do you need to pull data to Python?
d) Your manager asks for "all events from users who made a purchase in December 2024." Write this as a subquery and as a JOIN. Which is likely faster, and why?
e) The database has 50 million rows, and you need to train a model on all of them. Pulling them all to Python at once causes a memory error. Describe two strategies for handling this: one using SQL-side sampling, one using chunked fetching with pd.read_sql(..., chunksize=...).
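A runnable sketch of the push-down vs. chunked-fetch contrast from parts (a) and (e), using an in-memory SQLite database as a stand-in for PostgreSQL; column names follow the schema above, and the 50,000-row table is a stand-in for the 50M-row one.

```python
import sqlite3
import numpy as np
import pandas as pd

np.random.seed(42)
n = 50_000  # small stand-in for the 50M-row table
conn = sqlite3.connect(':memory:')
events = pd.DataFrame({
    'event_id': np.arange(n),
    'user_id': np.random.randint(1, 2001, n),
    'event_type': np.random.choice(['click', 'view', 'purchase'], n),
    'revenue': np.round(np.random.exponential(20, n), 2),
})
events.to_sql('events', conn, index=False)

# Push-down: the database does the aggregation and returns only
# one row per event_type, not 50,000 raw rows
agg = pd.read_sql(
    "SELECT event_type, SUM(revenue) AS total "
    "FROM events GROUP BY event_type", conn)

# Chunked fetch: stream raw rows in pieces, combining partial sums,
# so peak memory is bounded by the chunk size
totals = {}
for chunk in pd.read_sql("SELECT event_type, revenue FROM events",
                         conn, chunksize=10_000):
    for etype, s in chunk.groupby('event_type')['revenue'].sum().items():
        totals[etype] = totals.get(etype, 0.0) + s
print(agg)
```

Both routes produce identical totals; the difference is where the work happens and how many rows ever cross the database-to-Python boundary.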
## Exercise 5: Tool Selection Decision (Scenario-Based)
For each scenario, recommend the primary tool (pandas, Polars, Dask, SQL, or a combination) and justify your choice.
a) Scenario A: 200 MB CSV file, need to compute mean and median of 5 numeric columns grouped by 2 categorical columns. One-off analysis.
b) Scenario B: 15 GB of Parquet files (one per day for 365 days), each with 20 columns. Need to filter on 3 columns, aggregate by 1, and produce a summary DataFrame of ~500 rows. Runs nightly in production.
c) Scenario C: 800 MB CSV, but the analysis requires training a Random Forest with scikit-learn on all rows. The feature engineering involves 3 rolling-window calculations and 2 merge operations.
d) Scenario D: A PostgreSQL table with 2 billion rows and 50 columns. You need 5 columns for 3 months of data (~125 million rows), grouped into hourly averages (~2,160 rows of output).
e) Scenario E: 40 GB of JSON log files, semi-structured (varying fields per record). Need to extract 8 fields, parse timestamps, filter by date range, and compute session-level aggregations.
f) For each scenario, describe what would go wrong if you used plain pandas with pd.read_csv() or pd.read_sql("SELECT * ...").
## Exercise 6: Memory Profiling (Code)
Write a memory profiling script that demonstrates the memory behavior of pandas, Polars, and chunked reading on the same dataset.
```python
import tracemalloc
import pandas as pd
import polars as pl
import numpy as np

np.random.seed(42)

def generate_test_data(n_rows, filepath):
    """Generate a test CSV file with n_rows rows."""
    df = pd.DataFrame({
        'id': np.arange(n_rows),
        'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], n_rows),
        'value_1': np.round(np.random.normal(100, 25, n_rows), 2),
        'value_2': np.round(np.random.exponential(50, n_rows), 2),
        'label': np.random.choice(['pos', 'neg'], n_rows)
    })
    df.to_csv(filepath, index=False)
    return df

# Generate test data
generate_test_data(5_000_000, 'memory_test.csv')
```
a) Use tracemalloc to measure peak memory usage when loading the CSV with pandas, with pandas + dtype optimization, and with Polars. Report the peak memory for each and the ratio.
b) Implement chunked reading that computes value_1.mean() grouped by category. Measure peak memory during chunked processing vs. loading the full DataFrame. What is the peak memory ratio?
c) A colleague claims "Polars uses less memory because it does not copy the data." Test this claim by loading the same Parquet file with both pandas (default) and Polars, measuring memory with tracemalloc. Is the claim accurate? What is the actual explanation for the memory difference?
d) Load the CSV into pandas, convert category and label to categorical dtype, and measure memory again. Then convert value_1 and value_2 to float32. At what point do the diminishing returns of dtype optimization make it not worth the complexity?
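One way to structure the measurement for parts (a) and (d). This is a sketch that writes a smaller stand-in file (the name memory_demo.csv is arbitrary) so it runs quickly; the same peak_mb helper applies unchanged to the 5M-row file.

```python
import tracemalloc
import numpy as np
import pandas as pd

np.random.seed(42)
n = 200_000  # small stand-in; the pattern is the same at 5M rows
pd.DataFrame({
    'id': np.arange(n),
    'category': np.random.choice(['A', 'B', 'C', 'D', 'E'], n),
    'value_1': np.round(np.random.normal(100, 25, n), 2),
}).to_csv('memory_demo.csv', index=False)

def peak_mb(fn):
    """Run fn and return its peak traced allocation in MB."""
    tracemalloc.start()
    fn()
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return peak / 1e6

plain = peak_mb(lambda: pd.read_csv('memory_demo.csv'))
# Declaring dtypes at read time avoids ever building the wide version
optimized = peak_mb(lambda: pd.read_csv(
    'memory_demo.csv',
    dtype={'id': 'int32', 'category': 'category', 'value_1': 'float32'}))
print(f'plain: {plain:.1f} MB, optimized: {optimized:.1f} MB')
```

Note that tracemalloc only sees allocations made through Python's allocator; it captures pandas and NumPy buffers, but a process-level view (e.g. resident set size) can differ, which is worth mentioning in your part (c) answer.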
## Exercise 7: End-to-End Pipeline (Project)
Build a complete data pipeline for the following scenario.
StreamFlow has 2.4 million subscribers. Each subscriber generates an average of 12 events per day (page views, feature clicks, support tickets, billing events). That is approximately 28.8 million events per day, stored in daily Parquet files.
Your task: build a pipeline that, given 30 days of event data (~864 million events), produces a churn risk feature matrix with one row per subscriber and 15 engineered features.
a) Design the feature list. Include at least:
   - 3 count-based features (e.g., total events, support tickets, billing failures)
   - 3 recency features (e.g., days since last login, days since last purchase)
   - 3 frequency features (e.g., daily average events, weekly session count)
   - 3 monetary features (e.g., total revenue, average transaction value)
   - 3 behavioral features (e.g., feature adoption rate, engagement trend)
b) Implement the pipeline using Dask for data loading and initial aggregation, Polars for the feature engineering transformations, and pandas for the final output (compatible with scikit-learn).
c) Add error handling: what happens if a daily Parquet file is missing? If a subscriber has zero events in the 30-day window?
d) Benchmark the pipeline: measure wall-clock time and peak memory for the full dataset and for a 1% sample. Report the scaling factor.
e) Write a short memo (5-7 sentences) to a non-technical stakeholder explaining why this pipeline uses three different tools and why it cannot simply use pandas.
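For part (c), a sketch of tolerant daily-file loading. The events_YYYY-MM-DD filename pattern is an assumption, and the demo uses CSV as a stand-in so the snippet runs without pyarrow; in production you would pass reader=pd.read_parquet and suffix='parquet'.

```python
import tempfile
from pathlib import Path
import pandas as pd

def load_daily_files(directory, dates, reader=pd.read_csv, suffix='csv'):
    """Load whichever daily files exist, skipping gaps with a warning."""
    frames, missing = [], []
    for d in dates:
        path = Path(directory) / f'events_{d:%Y-%m-%d}.{suffix}'
        if path.exists():
            frames.append(reader(path))
        else:
            missing.append(path.name)
    if not frames:
        # All 30 days missing is a hard error, not a warning
        raise FileNotFoundError('no daily files in the window')
    if missing:
        print(f'warning: skipped {len(missing)} missing day(s): {missing}')
    return pd.concat(frames, ignore_index=True)

# Demo: a 3-day window where the third day is deliberately absent
tmp = Path(tempfile.mkdtemp())
days = pd.date_range('2024-01-01', periods=3)
for d in days[:2]:
    pd.DataFrame({'subscriber_id': [1, 2], 'events': [5, 7]}).to_csv(
        tmp / f'events_{d:%Y-%m-%d}.csv', index=False)
combined = load_daily_files(tmp, days)
```

The zero-event subscriber case from part (c) is the mirror image: after aggregation, reindex the feature matrix against the full subscriber list and fill the resulting NaN rows with explicit defaults rather than dropping them.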
## Exercise 8: Polars vs. Dask Head-to-Head (Benchmark)
Design and run a benchmark comparing Polars and Dask on the following operations, using datasets of 1M, 10M, and 50M rows.
```python
import numpy as np
import pandas as pd

def generate_benchmark_data(n_rows):
    np.random.seed(42)
    return pd.DataFrame({
        'group_key': np.random.randint(0, 1000, n_rows),
        'secondary_key': np.random.choice(['alpha', 'beta', 'gamma', 'delta'], n_rows),
        'float_col': np.random.normal(0, 1, n_rows),
        'int_col': np.random.randint(0, 10000, n_rows),
        'string_col': np.random.choice([f'val_{i}' for i in range(100)], n_rows)
    })
```
a) Filter: Select rows where float_col > 1.5 and int_col < 5000.
b) GroupBy-Agg: Group by group_key and compute mean, std, min, max of float_col.
c) Multi-key GroupBy: Group by both group_key and secondary_key, compute count and sum.
d) String operation: Filter rows where string_col starts with "val_5" (string matching).
e) Join: Self-join the DataFrame on group_key (limit to 10M rows for the join to avoid memory issues).
For each operation and dataset size, record wall-clock time for pandas, Polars (eager), Polars (lazy), and Dask. Present results in a table and identify which tool wins for each operation type and scale.
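A harness sketch for the timing protocol. It re-creates a trimmed version of the generator so the snippet runs standalone, and times only the pandas column; Polars and Dask callables slot into bench the same way, provided each lambda includes the final .collect() or .compute(), otherwise you measure graph construction rather than execution.

```python
import time
import numpy as np
import pandas as pd

def generate_benchmark_data(n_rows):
    np.random.seed(42)
    return pd.DataFrame({
        'group_key': np.random.randint(0, 1000, n_rows),
        'float_col': np.random.normal(0, 1, n_rows),
        'int_col': np.random.randint(0, 10000, n_rows),
    })

def bench(fn, repeats=3):
    """Return best-of-N wall-clock seconds; best-of damps scheduler noise."""
    times = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()  # for lazy engines, fn must include .collect() / .compute()
        times.append(time.perf_counter() - start)
    return min(times)

df = generate_benchmark_data(100_000)  # scale to 1M/10M/50M for the real run
results = {
    'filter': bench(lambda: df[(df['float_col'] > 1.5)
                               & (df['int_col'] < 5000)]),
    'groupby-agg': bench(lambda: df.groupby('group_key')['float_col']
                                   .agg(['mean', 'std', 'min', 'max'])),
}
for op, secs in results.items():
    print(f'{op:12s} {secs * 1000:8.2f} ms')
```

Keep data generation outside the timed callable, and run each engine in a fresh process for the 50M-row sizes so one tool's leftover memory does not slow the next.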
Solutions to selected exercises appear in the appendix. Full solutions with benchmarking results are available in the course repository.