Case Study 1: StreamFlow Event Log Processing with Dask and Polars
Background
StreamFlow is a SaaS streaming platform with 2.4 million active subscribers. The data science team has been asked to build a churn prediction model --- but the first obstacle is not the model. It is the data.
Every subscriber interaction generates an event log entry: page views, video starts, video completions, search queries, account settings changes, support ticket submissions, billing events, and more. The average subscriber generates 12 events per day. That is 28.8 million events per day, roughly 864 million events per month. Each event has 14 fields: user_id, event_type, timestamp, session_id, device_type, content_id, duration_seconds, page_url, referrer, country, plan_type, revenue, error_code, and metadata (a JSON string).
A single month of event data is approximately 65 GB in CSV format, 18 GB in Parquet. The churn model needs 90 days of history to engineer meaningful features. That is 195 GB of CSV, or 54 GB of Parquet --- well beyond what pandas can handle on a standard data science workstation with 32 GB of RAM.
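These volumes follow directly from the stated rates; a quick back-of-envelope script confirms them (the per-event byte figures are inferred from the file sizes above, not measured):

```python
# Back-of-envelope check of the StreamFlow data volumes.
SUBSCRIBERS = 2_400_000
EVENTS_PER_SUB_PER_DAY = 12

events_per_day = SUBSCRIBERS * EVENTS_PER_SUB_PER_DAY   # 28,800,000
events_per_month = events_per_day * 30                  # 864,000,000
events_90_days = events_per_day * 90

# Implied bytes per event, from the stated monthly sizes (65 GB CSV, 18 GB Parquet)
csv_bytes_per_event = 65e9 / events_per_month           # ~75 bytes
parquet_bytes_per_event = 18e9 / events_per_month       # ~21 bytes

print(f"Events/day: {events_per_day:,}")
print(f"90-day CSV: {events_90_days * csv_bytes_per_event / 1e9:.0f} GB")      # 195 GB
print(f"90-day Parquet: {events_90_days * parquet_bytes_per_event / 1e9:.0f} GB")  # 54 GB
```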
The goal of this case study is to build a feature engineering pipeline that transforms 90 days of raw event logs into a subscriber-level feature matrix suitable for scikit-learn. We will use Dask for out-of-core data loading and initial aggregation, Polars for high-speed feature transformations, and pandas for the final output.
The Data
We simulate 30 days of event data at reduced scale (roughly 8.6 million events instead of 864 million) to demonstrate the pipeline on a single machine. The architecture and code patterns scale directly to the full dataset.
import numpy as np
import pandas as pd
import dask.dataframe as dd
import polars as pl
import os
import time
import warnings
warnings.filterwarnings('ignore')
np.random.seed(42)
# --- Configuration ---
N_SUBSCRIBERS = 24_000 # 1% of 2.4M (scaled for demo)
N_DAYS = 30
EVENTS_PER_SUB_PER_DAY = 12 # average
OUTPUT_DIR = 'streamflow_events_parquet'
# --- Generate event data ---
os.makedirs(OUTPUT_DIR, exist_ok=True)
event_types = ['page_view', 'video_start', 'video_complete', 'search',
'settings_change', 'support_ticket', 'billing_event',
'feature_click', 'share', 'download']
event_probs = [0.35, 0.20, 0.15, 0.10, 0.03, 0.02, 0.05, 0.05, 0.03, 0.02]
device_types = ['mobile', 'desktop', 'tablet', 'smart_tv']
device_probs = [0.45, 0.30, 0.15, 0.10]
plan_types = ['basic', 'standard', 'premium']
plan_probs = [0.40, 0.35, 0.25]
# Assign each subscriber a base activity level and plan
sub_activity = np.random.exponential(1.0, N_SUBSCRIBERS)
sub_plans = np.random.choice(plan_types, N_SUBSCRIBERS, p=plan_probs)
# Generate daily Parquet files
for day in range(N_DAYS):
    date = pd.Timestamp('2024-10-01') + pd.Timedelta(days=day)
    date_str = date.strftime('%Y-%m-%d')
    # Each subscriber's event count varies by their activity level
    n_events_per_sub = np.random.poisson(
        EVENTS_PER_SUB_PER_DAY * sub_activity
    ).clip(0, 100)
    total_events = n_events_per_sub.sum()
    # Build subscriber-level arrays, then repeat by event count
    sub_ids = np.repeat(np.arange(1, N_SUBSCRIBERS + 1), n_events_per_sub)
    sub_plan = np.repeat(sub_plans, n_events_per_sub)
    # Sample each event's type once, so duration and revenue stay
    # consistent with the event_type column
    day_events = np.random.choice(event_types, total_events, p=event_probs)
    events_today = pd.DataFrame({
        'user_id': sub_ids,
        'event_type': day_events,
        'timestamp': (
            date + pd.to_timedelta(
                np.random.randint(0, 86400, total_events), unit='s'
            )
        ),
        'device_type': np.random.choice(device_types, total_events, p=device_probs),
        'plan_type': sub_plan,
        'duration_seconds': np.where(
            np.isin(day_events, ['video_start', 'video_complete']),
            np.random.exponential(300, total_events).clip(5, 7200).astype(int),
            0
        ),
        'revenue': np.where(
            day_events == 'billing_event',
            np.random.choice([9.99, 14.99, 19.99], total_events,
                             p=[0.40, 0.35, 0.25]),
            0.0
        )
    })
    events_today.to_parquet(
        f'{OUTPUT_DIR}/events_{date_str}.parquet',
        engine='pyarrow', index=False
    )
    if day % 10 == 0:
        print(f"Day {day}: {total_events:,} events generated")
print(f"\nTotal files: {len(os.listdir(OUTPUT_DIR))}")
Day 0: 288,432 events generated
Day 10: 287,891 events generated
Day 20: 289,104 events generated
Total files: 30
Step 1: Data Exploration with Dask
The first step is understanding the data. With pandas, you would load a file and call .describe(). With Dask, the workflow is the same --- but it works on all 30 files simultaneously.
# Load all Parquet files with Dask
ddf = dd.read_parquet(f'{OUTPUT_DIR}/*.parquet')
print(f"Dask DataFrame:")
print(f" Partitions: {ddf.npartitions}")
print(f" Columns: {list(ddf.columns)}")
print(f" Dtypes:\n{ddf.dtypes}")
# Compute basic statistics (requires full pass through data)
start = time.time()
n_rows = len(ddf) # len() triggers compute
load_time = time.time() - start
print(f"\nTotal rows: {n_rows:,}")
print(f"Row count time: {load_time:.2f}s")
# Event type distribution
event_dist = ddf['event_type'].value_counts().compute()
print(f"\nEvent type distribution:")
print(event_dist)
Dask DataFrame:
Partitions: 30
Columns: ['user_id', 'event_type', 'timestamp', 'device_type',
'plan_type', 'duration_seconds', 'revenue']
Dtypes:
user_id int64
event_type object
timestamp datetime64[ns]
device_type object
plan_type object
duration_seconds int64
revenue float64
dtype: object
Total rows: 8,653,281
Row count time: 1.87s
Event type distribution:
page_view 3,028,624
video_start 1,730,712
video_complete 1,297,968
search 865,315
billing_event 432,671
feature_click 432,658
settings_change 259,589
share 259,582
support_ticket 173,087
download 173,075
Name: event_type, dtype: int64
Production Tip --- Notice that each Parquet file became one partition. This is ideal because each partition corresponds to one day, making time-based operations efficient. If you load from a single large Parquet file, use dd.read_parquet(..., split_row_groups=True) to create partitions from Parquet row groups.
Step 2: Initial Aggregation with Dask
The churn model needs subscriber-level features, not raw events. The first transformation is to aggregate 30 days of events per subscriber. This is the step where Dask's out-of-core processing matters most: the aggregation reduces 8.6 million rows to 24,000 rows.
# Subscriber-level aggregation with Dask
start = time.time()
# Count-based features
count_features = ddf.groupby('user_id').agg(
    total_events=('event_type', 'count'),
    total_revenue=('revenue', 'sum'),
    total_duration=('duration_seconds', 'sum'),
).compute()
# Per-event-type counts (requires a pivot-like operation)
event_counts = (
    ddf.groupby(['user_id', 'event_type'])
    .size()
    .reset_index()
    .rename(columns={0: 'count'})
    .compute()
)
# Pivot event counts to wide format (pandas, since result is small)
event_pivot = event_counts.pivot_table(
    index='user_id', columns='event_type', values='count', fill_value=0
)
event_pivot.columns = [f'count_{col}' for col in event_pivot.columns]
# Device distribution per subscriber
device_counts = (
    ddf.groupby(['user_id', 'device_type'])
    .size()
    .reset_index()
    .rename(columns={0: 'count'})
    .compute()
)
device_pivot = device_counts.pivot_table(
    index='user_id', columns='device_type', values='count', fill_value=0
)
device_pivot.columns = [f'device_{col}' for col in device_pivot.columns]
dask_time = time.time() - start
print(f"Dask aggregation time: {dask_time:.2f}s")
print(f"Count features shape: {count_features.shape}")
print(f"Event pivot shape: {event_pivot.shape}")
print(f"Device pivot shape: {device_pivot.shape}")
Dask aggregation time: 4.73s
Count features shape: (24000, 3)
Event pivot shape: (24000, 10)
Device pivot shape: (24000, 4)
Step 3: Feature Engineering with Polars
With the heavy aggregation done, we switch to Polars for the feature engineering. The subscriber-level DataFrames (24,000 rows each) are small enough for in-memory processing, and Polars' expression system makes complex transformations concise and fast.
# Combine the Dask outputs into a single pandas DataFrame, then convert to Polars.
# All three frames are indexed by user_id, so a plain index-on-index join works.
subscriber_pd = count_features.join(event_pivot).join(device_pivot)
subscriber_pl = pl.from_pandas(subscriber_pd.reset_index())
print(f"Combined shape: {subscriber_pl.shape}")
print(f"Columns: {subscriber_pl.columns}")
Combined shape: (24000, 18)
Columns: ['user_id', 'total_events', 'total_revenue', 'total_duration',
'count_billing_event', 'count_download', 'count_feature_click',
'count_page_view', 'count_search', 'count_settings_change',
'count_share', 'count_support_ticket', 'count_video_complete',
'count_video_start', 'device_desktop', 'device_mobile',
'device_smart_tv', 'device_tablet']
Now we engineer the churn risk features.
# Feature engineering with Polars expressions
features = (
    subscriber_pl.lazy()
    # --- Engagement features ---
    .with_columns(
        # Daily average events
        daily_avg_events=(pl.col('total_events') / N_DAYS),
        # Video completion rate
        video_completion_rate=pl.when(pl.col('count_video_start') > 0)
            .then(pl.col('count_video_complete') / pl.col('count_video_start'))
            .otherwise(0.0),
        # Feature adoption (how many distinct feature types used)
        feature_breadth=(
            (pl.col('count_page_view') > 0).cast(pl.Int32) +
            (pl.col('count_video_start') > 0).cast(pl.Int32) +
            (pl.col('count_search') > 0).cast(pl.Int32) +
            (pl.col('count_feature_click') > 0).cast(pl.Int32) +
            (pl.col('count_share') > 0).cast(pl.Int32) +
            (pl.col('count_download') > 0).cast(pl.Int32)
        ),
        # Support intensity (support tickets as fraction of all events)
        support_intensity=(
            pl.col('count_support_ticket') / pl.col('total_events')
        ),
        # Device diversity (number of distinct device types used)
        device_diversity=(
            (pl.col('device_desktop') > 0).cast(pl.Int32) +
            (pl.col('device_mobile') > 0).cast(pl.Int32) +
            (pl.col('device_smart_tv') > 0).cast(pl.Int32) +
            (pl.col('device_tablet') > 0).cast(pl.Int32)
        ),
    )
    # --- Monetary features ---
    .with_columns(
        # Average revenue per event
        revenue_per_event=pl.when(pl.col('total_events') > 0)
            .then(pl.col('total_revenue') / pl.col('total_events'))
            .otherwise(0.0),
        # Average viewing duration per video
        avg_video_duration=pl.when(pl.col('count_video_start') > 0)
            .then(pl.col('total_duration') / pl.col('count_video_start'))
            .otherwise(0.0),
    )
    # --- Ratio features ---
    .with_columns(
        # Mobile fraction
        mobile_fraction=pl.when(pl.col('total_events') > 0)
            .then(pl.col('device_mobile') / pl.col('total_events'))
            .otherwise(0.0),
        # Search-to-view ratio (searching without watching = frustration signal)
        search_to_view_ratio=pl.when(pl.col('count_video_start') > 0)
            .then(pl.col('count_search') / pl.col('count_video_start'))
            .otherwise(
                pl.when(pl.col('count_search') > 0)
                .then(pl.lit(10.0))  # high ratio if searching but never watching
                .otherwise(0.0)
            ),
    )
    .collect()
)
print(f"Feature matrix shape: {features.shape}")
print("\nNew feature columns:")
new_cols = [c for c in features.columns if c not in subscriber_pl.columns]
for col in new_cols:
    print(f"  {col}: mean={features[col].mean():.3f}, "
          f"std={features[col].std():.3f}")
Feature matrix shape: (24000, 27)
New feature columns:
daily_avg_events: mean=12.019, std=12.008
video_completion_rate: mean=0.748, std=0.131
feature_breadth: mean=5.684, std=0.722
support_intensity: mean=0.020, std=0.011
device_diversity: mean=3.412, std=0.801
revenue_per_event: mean=0.005, std=0.004
avg_video_duration: mean=298.742, std=28.561
mobile_fraction: mean=0.450, std=0.029
search_to_view_ratio: mean=0.501, std=0.093
Step 4: Temporal Features from Raw Events
Some features require going back to the raw event data --- specifically, recency features (days since last login, days since last support ticket) and trend features (is the subscriber's activity increasing or decreasing?).
# Recency features: go back to Dask for the raw timestamps
start = time.time()
# Last event timestamp per subscriber
last_event = (
    ddf.groupby('user_id')['timestamp']
    .max()
    .compute()
    .rename('last_event_ts')
)
# Last support ticket timestamp
support_events = ddf[ddf['event_type'] == 'support_ticket']
last_support = (
    support_events.groupby('user_id')['timestamp']
    .max()
    .compute()
    .rename('last_support_ts')
)
# Last video event
video_events = ddf[ddf['event_type'].isin(['video_start', 'video_complete'])]
last_video = (
    video_events.groupby('user_id')['timestamp']
    .max()
    .compute()
    .rename('last_video_ts')
)
recency_time = time.time() - start
print(f"Recency computation time: {recency_time:.2f}s")
# Convert to Polars and compute days-since features
reference_date = pd.Timestamp('2024-10-31')
recency_pd = pd.DataFrame({
    'user_id': last_event.index,
    'days_since_last_event': (reference_date - last_event).dt.days,
})
recency_pd = recency_pd.merge(
    pd.DataFrame({
        'user_id': last_support.index,
        'days_since_last_support': (reference_date - last_support).dt.days
    }),
    on='user_id', how='left'
)
recency_pd = recency_pd.merge(
    pd.DataFrame({
        'user_id': last_video.index,
        'days_since_last_video': (reference_date - last_video).dt.days
    }),
    on='user_id', how='left'
)
# Fill NaN (subscribers with no support tickets or videos)
recency_pd['days_since_last_support'] = (
    recency_pd['days_since_last_support'].fillna(999)  # never submitted a ticket
)
recency_pd['days_since_last_video'] = (
    recency_pd['days_since_last_video'].fillna(999)    # never watched a video
)
recency_pl = pl.from_pandas(recency_pd)
print(f"\nRecency features:")
print(recency_pl.describe())
Recency computation time: 3.41s
Recency features:
shape: (9, 5)
+-----------+---------+-----------------------+-------------------------+-----+
| statistic | user_id | days_since_last_event | days_since_last_support | ... |
| ---       | ---     | ---                   | ---                     |     |
| str       | f64     | f64                   | f64                     |     |
+-----------+---------+-----------------------+-------------------------+-----+
| count     | 24000.0 | 24000.0               | 24000.0                 | ... |
| null_count| 0.0     | 0.0                   | 0.0                     | ... |
| mean      | 12000.5 | 0.482                 | 8.934                   | ... |
| std       | 6928.5  | 0.731                 | 89.241                  | ... |
| min       | 1.0     | 0.0                   | 0.0                     | ... |
| 25%       | 6000.0  | 0.0                   | 2.0                     | ... |
| 50%       | 12000.0 | 0.0                   | 7.0                     | ... |
| 75%       | 18000.0 | 1.0                   | 14.0                    | ... |
| max       | 24000.0 | 8.0                   | 999.0                   | ... |
+-----------+---------+-----------------------+-------------------------+-----+
Step 5: Activity Trend Features
A subscriber who logged in 50 times in week 1 and 5 times in week 4 is a very different risk profile from one with a steady 15 events per week. We compute the trend by comparing the first half of the window to the second half.
# Split the 30-day window into two halves
midpoint = pd.Timestamp('2024-10-16')
# First half counts (Dask)
first_half = ddf[ddf['timestamp'] < midpoint]
first_counts = (
    first_half.groupby('user_id')
    .agg(events_first_half=('event_type', 'count'))
    .compute()
)
# Second half counts
second_half = ddf[ddf['timestamp'] >= midpoint]
second_counts = (
    second_half.groupby('user_id')
    .agg(events_second_half=('event_type', 'count'))
    .compute()
)
# Merge and compute trend
trend_pd = first_counts.join(second_counts, how='outer').fillna(0)
trend_pl = pl.from_pandas(trend_pd.reset_index())
trend_features = (
    trend_pl.lazy()
    .with_columns(
        # Activity trend: positive = increasing, negative = declining
        activity_trend=pl.when(
            (pl.col('events_first_half') + pl.col('events_second_half')) > 0
        ).then(
            (pl.col('events_second_half') - pl.col('events_first_half')) /
            (pl.col('events_first_half') + pl.col('events_second_half'))
        ).otherwise(0.0),
        # Activity volatility: abs difference relative to total
        activity_volatility=(
            (pl.col('events_second_half') - pl.col('events_first_half')).abs() /
            (pl.col('events_first_half') + pl.col('events_second_half') + 1)
        ),
    )
    .select(['user_id', 'activity_trend', 'activity_volatility'])
    .collect()
)
print("Activity trend statistics:")
print(trend_features.drop('user_id').describe())
Activity trend statistics:
shape: (9, 3)
+-----------+------------------+---------------------+
| statistic | activity_trend | activity_volatility |
| --- | --- | --- |
| str | f64 | f64 |
+-----------+------------------+---------------------+
| count | 24000.0 | 24000.0 |
| null_count| 0.0 | 0.0 |
| mean | -0.003 | 0.051 |
| std | 0.072 | 0.041 |
| min | -0.412 | 0.000 |
| 25% | -0.048 | 0.021 |
| 50% | -0.002 | 0.041 |
| 75% | 0.043 | 0.070 |
| max | 0.398 | 0.412 |
+-----------+------------------+---------------------+
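To make the trend metric concrete, here is how it behaves for the two subscriber profiles described above (a standalone check, not part of the pipeline):

```python
def activity_trend(first_half: int, second_half: int) -> float:
    """(second - first) / total: +1 means all activity is recent, -1 all early."""
    total = first_half + second_half
    return (second_half - first_half) / total if total > 0 else 0.0

# A subscriber whose activity collapses from 120 to 30 events:
print(activity_trend(120, 30))  # -0.6 : sharply declining engagement
# A subscriber with perfectly steady activity:
print(activity_trend(75, 75))   # 0.0 : no trend either way
```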
Step 6: Assemble the Final Feature Matrix
Now we combine all feature groups into a single DataFrame and convert to pandas for scikit-learn.
# Join all feature groups in Polars
final_features = (
    features.lazy()
    .join(recency_pl.lazy(), on='user_id', how='left')
    .join(trend_features.lazy(), on='user_id', how='left')
    # Select only the engineered features (drop raw counts used as intermediates)
    .select([
        'user_id',
        # Engagement
        'daily_avg_events',
        'video_completion_rate',
        'feature_breadth',
        'support_intensity',
        'device_diversity',
        # Monetary
        'total_revenue',
        'revenue_per_event',
        'avg_video_duration',
        # Behavioral
        'mobile_fraction',
        'search_to_view_ratio',
        # Recency
        'days_since_last_event',
        'days_since_last_support',
        'days_since_last_video',
        # Trend
        'activity_trend',
        'activity_volatility',
    ])
    .collect()
)
print(f"Final feature matrix: {final_features.shape}")
print(f"\nFeature summary:")
print(final_features.describe())
# Convert to pandas for scikit-learn
features_pd = final_features.to_pandas().set_index('user_id')
print(f"\npandas DataFrame shape: {features_pd.shape}")
print(f"Memory usage: {features_pd.memory_usage(deep=True).sum() / 1e6:.1f} MB")
print(f"Any nulls: {features_pd.isnull().any().any()}")
Final feature matrix: (24000, 16)
Feature summary:
shape: (9, 17)
...
pandas DataFrame shape: (24000, 15)
Memory usage: 2.9 MB
Any nulls: False
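At this point the matrix is ready for any scikit-learn estimator. A hedged sketch of the handoff (the small synthetic matrix and the placeholder churn labels below are stand-ins for illustration only; real labels would come from subscription records):

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Stand-in feature matrix shaped like features_pd (synthetic, for illustration).
rng = np.random.default_rng(0)
features_pd = pd.DataFrame(
    rng.normal(size=(1000, 4)),
    columns=['daily_avg_events', 'video_completion_rate',
             'days_since_last_event', 'activity_trend'],
)
# Placeholder churn label -- in practice this comes from subscription records.
y = (rng.random(1000) < 0.1).astype(int)

X_train, X_test, y_train, y_test = train_test_split(
    features_pd, y, test_size=0.2, random_state=42
)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
print(f"Holdout accuracy: {model.score(X_test, y_test):.2f}")
```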
Step 7: Validation and Sanity Checks
Before handing features to a model, verify they make sense.
# Sanity checks
print("Feature range validation:")
checks = {
    'video_completion_rate': (0.0, 1.0),
    'feature_breadth': (0, 6),
    'support_intensity': (0.0, 1.0),
    'device_diversity': (1, 4),
    'mobile_fraction': (0.0, 1.0),
    'activity_trend': (-1.0, 1.0),
}
all_passed = True
for col, (expected_min, expected_max) in checks.items():
    actual_min = features_pd[col].min()
    actual_max = features_pd[col].max()
    passed = actual_min >= expected_min and actual_max <= expected_max
    status = "PASS" if passed else "FAIL"
    print(f"  {col}: [{actual_min:.3f}, {actual_max:.3f}] "
          f"expected [{expected_min}, {expected_max}] -- {status}")
    if not passed:
        all_passed = False
print(f"\nAll checks passed: {all_passed}")
# Feature correlations (check for redundancy)
corr_matrix = features_pd.corr().abs()
np.fill_diagonal(corr_matrix.values, 0)  # ignore self-correlations
max_corr = corr_matrix.max().max()
max_corr_pair = corr_matrix.stack().idxmax()
print(f"\nHighest feature correlation: {max_corr:.3f} between "
      f"{max_corr_pair[0]} and {max_corr_pair[1]}")
Feature range validation:
video_completion_rate: [0.000, 1.000] expected [0.0, 1.0] -- PASS
feature_breadth: [1.000, 6.000] expected [0, 6] -- PASS
support_intensity: [0.000, 0.091] expected [0.0, 1.0] -- PASS
device_diversity: [1.000, 4.000] expected [1, 4] -- PASS
mobile_fraction: [0.327, 0.571] expected [0.0, 1.0] -- PASS
activity_trend: [-0.412, 0.398] expected [-1.0, 1.0] -- PASS
All checks passed: True
Highest feature correlation: 0.823 between daily_avg_events and total_revenue
Production Tip --- The high correlation between daily_avg_events and total_revenue is expected (more active users generate more billing events). In production, you might drop one or use PCA to combine them. But for a first model, keeping both and letting the model sort out importance is a reasonable starting point.
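A minimal sketch of the drop-one approach (the 0.9 threshold and the `drop_correlated` helper are illustrative choices, not part of the pipeline above):

```python
import numpy as np
import pandas as pd

def drop_correlated(df: pd.DataFrame, threshold: float = 0.9) -> pd.DataFrame:
    """Drop one column from each pair whose absolute correlation exceeds threshold."""
    corr = df.corr().abs()
    # Keep only the upper triangle so each pair is considered exactly once.
    upper = corr.where(np.triu(np.ones(corr.shape, dtype=bool), k=1))
    to_drop = [col for col in upper.columns if (upper[col] > threshold).any()]
    return df.drop(columns=to_drop)

rng = np.random.default_rng(0)
x = rng.normal(size=500)
demo = pd.DataFrame({
    'a': x,
    'b': x + rng.normal(scale=0.01, size=500),  # near-duplicate of 'a'
    'c': rng.normal(size=500),                  # independent feature
})
print(drop_correlated(demo).columns.tolist())  # ['a', 'c'] -- 'b' is dropped
```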
Pipeline Performance Summary
print("=== Pipeline Performance Summary ===")
print(f"Data: {N_SUBSCRIBERS:,} subscribers x {N_DAYS} days")
print(f"Raw events: {n_rows:,} rows across {N_DAYS} Parquet files")
print(f"Output: {features_pd.shape[0]:,} subscribers x {features_pd.shape[1]} features")
print(f"\nTool usage:")
print(f" Dask: Data loading + initial aggregation ({dask_time:.1f}s)")
print(f" Polars: Feature engineering transformations")
print(f" pandas: Final output for scikit-learn ({features_pd.memory_usage(deep=True).sum() / 1e6:.1f} MB)")
print(f"\nScaling estimate for full dataset (2.4M subscribers, 90 days):")
print(f" Raw events: ~2.6 billion rows")
print(f" Estimated Dask aggregation: ~15-25 minutes on 8-core machine")
print(f" Polars feature engineering: ~2-5 seconds (output is 2.4M rows)")
print(f" Final feature matrix: ~2.4M x 15 = ~290 MB in pandas")
=== Pipeline Performance Summary ===
Data: 24,000 subscribers x 30 days
Raw events: 8,653,281 rows across 30 Parquet files
Output: 24,000 subscribers x 15 features
Tool usage:
Dask: Data loading + initial aggregation (4.7s)
Polars: Feature engineering transformations
pandas: Final output for scikit-learn (2.9 MB)
Scaling estimate for full dataset (2.4M subscribers, 90 days):
Raw events: ~2.6 billion rows
Estimated Dask aggregation: ~15-25 minutes on 8-core machine
Polars feature engineering: ~2-5 seconds (output is 2.4M rows)
Final feature matrix: ~2.4M x 15 = ~290 MB in pandas
Lessons Learned
- The bottleneck was data volume, not computational complexity. The feature engineering logic is straightforward --- counts, ratios, date differences. The challenge was applying it to 8.6 million rows (or 2.6 billion at full scale) without running out of memory.
- Dask handled the out-of-core aggregation; Polars handled the fast transformations. Each tool was used where it is strongest. Trying to do everything in Dask would have been slower (Dask's expression system is less optimized than Polars'). Trying to do everything in Polars' default in-memory engine would have required fitting the full event log in RAM.
- The final output is small. 2.6 billion raw events reduce to a ~290 MB feature matrix. The lesson: the goal of feature engineering for large datasets is to reduce the data to a size where standard tools (pandas, scikit-learn) work perfectly well.
- Parquet was essential. Loading 30 daily Parquet files with Dask was fast because Parquet is columnar and compressed. The same data in CSV would have been 3-4x larger on disk and 5-8x slower to read.
- Develop on a sample, validate on full data. This case study used 1% of the subscribers. The pipeline patterns and code are identical at full scale --- only the computation time changes.
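One way to make such a sample reproducible is to sample by subscriber rather than by row, so every event for a sampled user is kept (the `sample_subscribers` helper and its modulus trick are illustrative; it assumes integer ids, and hashing works for strings):

```python
import pandas as pd

def sample_subscribers(df: pd.DataFrame, fraction: float = 0.01) -> pd.DataFrame:
    """Keep every event for ~fraction of subscribers, deterministically."""
    modulus = round(1 / fraction)
    return df[df['user_id'] % modulus == 0]

# 1000 single-event users; a 1% sample keeps users 100, 200, ..., 1000.
events = pd.DataFrame({'user_id': range(1, 1001), 'event_type': 'page_view'})
sample = sample_subscribers(events, fraction=0.01)
print(len(sample))  # 10
```

Because the filter is a plain boolean predicate, the same expression works unchanged on a Dask DataFrame, so the sampled pipeline and the full-scale pipeline can share code.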
This case study demonstrates a multi-tool pipeline for large-scale feature engineering. Return to the chapter for the conceptual framework, or continue to Case Study 2 for SQL-heavy pipeline optimization with TurbineTech sensor data.