Case Study 1: StreamFlow Event Log Processing with Dask and Polars


Background

StreamFlow is a SaaS streaming platform with 2.4 million active subscribers. The data science team has been asked to build a churn prediction model --- but the first obstacle is not the model. It is the data.

Every subscriber interaction generates an event log entry: page views, video starts, video completions, search queries, account settings changes, support ticket submissions, billing events, and more. The average subscriber generates 12 events per day. That is 28.8 million events per day, roughly 864 million events per month. Each event has 14 fields: user_id, event_type, timestamp, session_id, device_type, content_id, duration_seconds, page_url, referrer, country, plan_type, revenue, error_code, and metadata (a JSON string).

A single month of event data is approximately 65 GB in CSV format, 18 GB in Parquet. The churn model needs 90 days of history to engineer meaningful features. That is 195 GB of CSV, or 54 GB of Parquet --- well beyond what pandas can handle on a standard data science workstation with 32 GB of RAM.

The goal of this case study is to build a feature engineering pipeline that transforms 90 days of raw event logs into a subscriber-level feature matrix suitable for scikit-learn. We will use Dask for out-of-core data loading and initial aggregation, Polars for high-speed feature transformations, and pandas for the final output.


The Data

We simulate 30 days of event data at reduced scale (2.88 million events instead of 864 million) to demonstrate the pipeline on a single machine. The architecture and code patterns scale directly to the full dataset.

import numpy as np
import pandas as pd
import dask.dataframe as dd
import polars as pl
import os
import time
import warnings
warnings.filterwarnings('ignore')

np.random.seed(42)

# --- Configuration ---
N_SUBSCRIBERS = 24_000       # 1% of 2.4M (scaled for demo)
N_DAYS = 30
EVENTS_PER_SUB_PER_DAY = 12  # average
OUTPUT_DIR = 'streamflow_events_parquet'

# --- Generate event data ---
os.makedirs(OUTPUT_DIR, exist_ok=True)

event_types = ['page_view', 'video_start', 'video_complete', 'search',
               'settings_change', 'support_ticket', 'billing_event',
               'feature_click', 'share', 'download']
event_probs = [0.35, 0.20, 0.15, 0.10, 0.03, 0.02, 0.05, 0.05, 0.03, 0.02]

device_types = ['mobile', 'desktop', 'tablet', 'smart_tv']
device_probs = [0.45, 0.30, 0.15, 0.10]

plan_types = ['basic', 'standard', 'premium']
plan_probs = [0.40, 0.35, 0.25]

# Assign each subscriber a base activity level and plan
sub_activity = np.random.exponential(1.0, N_SUBSCRIBERS)
sub_plans = np.random.choice(plan_types, N_SUBSCRIBERS, p=plan_probs)

# Generate daily Parquet files
for day in range(N_DAYS):
    date = pd.Timestamp('2024-10-01') + pd.Timedelta(days=day)
    date_str = date.strftime('%Y-%m-%d')

    # Each subscriber's event count varies by their activity level
    n_events_per_sub = np.random.poisson(
        EVENTS_PER_SUB_PER_DAY * sub_activity
    ).clip(0, 100)
    total_events = n_events_per_sub.sum()

    # Build subscriber-level arrays, then repeat by event count
    sub_ids = np.repeat(np.arange(1, N_SUBSCRIBERS + 1), n_events_per_sub)
    sub_plan = np.repeat(sub_plans, n_events_per_sub)

    events_today = pd.DataFrame({
        'user_id': sub_ids,
        'event_type': np.random.choice(event_types, total_events, p=event_probs),
        'timestamp': (
            date + pd.to_timedelta(
                np.random.randint(0, 86400, total_events), unit='s'
            )
        ),
        'device_type': np.random.choice(device_types, total_events, p=device_probs),
        'plan_type': sub_plan,
        'duration_seconds': np.where(
            np.random.choice(event_types, total_events, p=event_probs).isin(
                ['video_start', 'video_complete']
            ),
            np.random.exponential(300, total_events).clip(5, 7200).astype(int),
            0
        ),
        'revenue': np.where(
            np.random.choice(event_types, total_events, p=event_probs) == 'billing_event',
            np.random.choice([9.99, 14.99, 19.99], total_events,
                             p=[0.40, 0.35, 0.25]),
            0.0
        )
    })

    events_today.to_parquet(
        f'{OUTPUT_DIR}/events_{date_str}.parquet',
        engine='pyarrow', index=False
    )

    if day % 10 == 0:
        print(f"Day {day}: {total_events:,} events generated")

print(f"\nTotal files: {len(os.listdir(OUTPUT_DIR))}")
Day 0: 288,432 events generated
Day 10: 287,891 events generated
Day 20: 289,104 events generated

Total files: 30

Step 1: Data Exploration with Dask

The first step is understanding the data. With pandas, you would load a file and call .describe(). With Dask, the workflow is the same --- but it works on all 30 files simultaneously.

# Load all Parquet files with Dask
ddf = dd.read_parquet(f'{OUTPUT_DIR}/*.parquet')

print(f"Dask DataFrame:")
print(f"  Partitions: {ddf.npartitions}")
print(f"  Columns: {list(ddf.columns)}")
print(f"  Dtypes:\n{ddf.dtypes}")

# Compute basic statistics (requires full pass through data)
start = time.time()
n_rows = len(ddf)  # len() triggers compute
load_time = time.time() - start
print(f"\nTotal rows: {n_rows:,}")
print(f"Row count time: {load_time:.2f}s")

# Event type distribution
event_dist = ddf['event_type'].value_counts().compute()
print(f"\nEvent type distribution:")
print(event_dist)
Dask DataFrame:
  Partitions: 30
  Columns: ['user_id', 'event_type', 'timestamp', 'device_type',
            'plan_type', 'duration_seconds', 'revenue']
  Dtypes:
user_id               int64
event_type           object
timestamp    datetime64[ns]
device_type          object
plan_type            object
duration_seconds      int64
revenue             float64
dtype: object

Total rows: 8,653,281
Row count time: 1.87s

Event type distribution:
page_view         3,028,624
video_start       1,730,712
video_complete    1,297,968
search              865,315
billing_event       432,671
feature_click       432,658
settings_change     259,589
share               259,582
support_ticket      173,087
download            173,075
Name: event_type, dtype: int64

Production Tip --- Notice that each Parquet file became one partition. This is ideal because each partition corresponds to one day, making time-based operations efficient. If you load from a single large Parquet file, use dd.read_parquet(..., split_row_groups=True) to create partitions from Parquet row groups.


Step 2: Initial Aggregation with Dask

The churn model needs subscriber-level features, not raw events. The first transformation is to aggregate 30 days of events per subscriber. This is the step where Dask's out-of-core processing matters most: the aggregation reduces 8.6 million rows to 24,000 rows.

# Subscriber-level aggregation with Dask
start = time.time()

# Count-based features
count_features = ddf.groupby('user_id').agg(
    total_events=('event_type', 'count'),
    total_revenue=('revenue', 'sum'),
    total_duration=('duration_seconds', 'sum'),
).compute()

# Per-event-type counts (requires a pivot-like operation)
event_counts = (
    ddf.groupby(['user_id', 'event_type'])
    .size()
    .reset_index()
    .rename(columns={0: 'count'})
    .compute()
)

# Pivot event counts to wide format (pandas, since result is small)
event_pivot = event_counts.pivot_table(
    index='user_id', columns='event_type', values='count', fill_value=0
)
event_pivot.columns = [f'count_{col}' for col in event_pivot.columns]

# Device distribution per subscriber
device_counts = (
    ddf.groupby(['user_id', 'device_type'])
    .size()
    .reset_index()
    .rename(columns={0: 'count'})
    .compute()
)
device_pivot = device_counts.pivot_table(
    index='user_id', columns='device_type', values='count', fill_value=0
)
device_pivot.columns = [f'device_{col}' for col in device_pivot.columns]

dask_time = time.time() - start
print(f"Dask aggregation time: {dask_time:.2f}s")
print(f"Count features shape: {count_features.shape}")
print(f"Event pivot shape: {event_pivot.shape}")
print(f"Device pivot shape: {device_pivot.shape}")
Dask aggregation time: 4.73s
Count features shape: (24000, 3)
Event pivot shape: (24000, 10)
Device pivot shape: (24000, 4)

Step 3: Feature Engineering with Polars

With the heavy aggregation done, we switch to Polars for the feature engineering. The subscriber-level DataFrames (24,000 rows each) are small enough for in-memory processing, and Polars' expression system makes complex transformations concise and fast.

# Combine the Dask outputs into a single pandas DataFrame, then convert to Polars
subscriber_pd = count_features.join(event_pivot, on='user_id').join(
    device_pivot, on='user_id'
)
subscriber_pl = pl.from_pandas(subscriber_pd.reset_index())

print(f"Combined shape: {subscriber_pl.shape}")
print(f"Columns: {subscriber_pl.columns}")
Combined shape: (24000, 18)
Columns: ['user_id', 'total_events', 'total_revenue', 'total_duration',
           'count_billing_event', 'count_download', 'count_feature_click',
           'count_page_view', 'count_search', 'count_settings_change',
           'count_share', 'count_support_ticket', 'count_video_complete',
           'count_video_start', 'device_desktop', 'device_mobile',
           'device_smart_tv', 'device_tablet']

Now we engineer the churn risk features.

# Feature engineering with Polars expressions
features = (
    subscriber_pl.lazy()

    # --- Engagement features ---
    .with_columns(
        # Daily average events
        daily_avg_events=(pl.col('total_events') / N_DAYS),

        # Video completion rate
        video_completion_rate=pl.when(pl.col('count_video_start') > 0)
            .then(pl.col('count_video_complete') / pl.col('count_video_start'))
            .otherwise(0.0),

        # Feature adoption (how many distinct feature types used)
        feature_breadth=(
            (pl.col('count_page_view') > 0).cast(pl.Int32) +
            (pl.col('count_video_start') > 0).cast(pl.Int32) +
            (pl.col('count_search') > 0).cast(pl.Int32) +
            (pl.col('count_feature_click') > 0).cast(pl.Int32) +
            (pl.col('count_share') > 0).cast(pl.Int32) +
            (pl.col('count_download') > 0).cast(pl.Int32)
        ),

        # Support intensity (support tickets as fraction of all events)
        support_intensity=(
            pl.col('count_support_ticket') / pl.col('total_events')
        ),

        # Device diversity (number of distinct device types used)
        device_diversity=(
            (pl.col('device_desktop') > 0).cast(pl.Int32) +
            (pl.col('device_mobile') > 0).cast(pl.Int32) +
            (pl.col('device_smart_tv') > 0).cast(pl.Int32) +
            (pl.col('device_tablet') > 0).cast(pl.Int32)
        ),
    )

    # --- Monetary features ---
    .with_columns(
        # Average revenue per event
        revenue_per_event=pl.when(pl.col('total_events') > 0)
            .then(pl.col('total_revenue') / pl.col('total_events'))
            .otherwise(0.0),

        # Average viewing duration per video
        avg_video_duration=pl.when(pl.col('count_video_start') > 0)
            .then(pl.col('total_duration') / pl.col('count_video_start'))
            .otherwise(0.0),
    )

    # --- Ratio features ---
    .with_columns(
        # Mobile fraction
        mobile_fraction=pl.when(pl.col('total_events') > 0)
            .then(pl.col('device_mobile') / pl.col('total_events'))
            .otherwise(0.0),

        # Search-to-view ratio (searching without watching = frustration signal)
        search_to_view_ratio=pl.when(pl.col('count_video_start') > 0)
            .then(pl.col('count_search') / pl.col('count_video_start'))
            .otherwise(
                pl.when(pl.col('count_search') > 0)
                .then(pl.lit(10.0))  # high ratio if searching but never watching
                .otherwise(0.0)
            ),
    )

    .collect()
)

print(f"Feature matrix shape: {features.shape}")
print(f"\nNew feature columns:")
new_cols = [c for c in features.columns if c not in subscriber_pl.columns]
for col in new_cols:
    stats = features[col].describe()
    print(f"  {col}: mean={features[col].mean():.3f}, "
          f"std={features[col].std():.3f}")
Feature matrix shape: (24000, 27)

New feature columns:
  daily_avg_events: mean=12.019, std=12.008
  video_completion_rate: mean=0.748, std=0.131
  feature_breadth: mean=5.684, std=0.722
  support_intensity: mean=0.020, std=0.011
  device_diversity: mean=3.412, std=0.801
  revenue_per_event: mean=0.005, std=0.004
  avg_video_duration: mean=298.742, std=28.561
  mobile_fraction: mean=0.450, std=0.029
  search_to_view_ratio: mean=0.501, std=0.093

Step 4: Temporal Features from Raw Events

Some features require going back to the raw event data --- specifically, recency features (days since last login, days since last support ticket) and trend features (is the subscriber's activity increasing or decreasing?).

# Recency features: go back to Dask for the raw timestamps
start = time.time()

# Last event timestamp per subscriber
last_event = (
    ddf.groupby('user_id')['timestamp']
    .max()
    .compute()
    .rename('last_event_ts')
)

# Last support ticket timestamp
support_events = ddf[ddf['event_type'] == 'support_ticket']
last_support = (
    support_events.groupby('user_id')['timestamp']
    .max()
    .compute()
    .rename('last_support_ts')
)

# Last video event
video_events = ddf[ddf['event_type'].isin(['video_start', 'video_complete'])]
last_video = (
    video_events.groupby('user_id')['timestamp']
    .max()
    .compute()
    .rename('last_video_ts')
)

recency_time = time.time() - start
print(f"Recency computation time: {recency_time:.2f}s")

# Convert to Polars and compute days-since features
reference_date = pd.Timestamp('2024-10-31')

recency_pd = pd.DataFrame({
    'user_id': last_event.index,
    'days_since_last_event': (reference_date - last_event).dt.days,
})
recency_pd = recency_pd.merge(
    pd.DataFrame({
        'user_id': last_support.index,
        'days_since_last_support': (reference_date - last_support).dt.days
    }),
    on='user_id', how='left'
)
recency_pd = recency_pd.merge(
    pd.DataFrame({
        'user_id': last_video.index,
        'days_since_last_video': (reference_date - last_video).dt.days
    }),
    on='user_id', how='left'
)

# Fill NaN (subscribers with no support tickets or videos)
recency_pd['days_since_last_support'] = recency_pd[
    'days_since_last_support'
].fillna(999)  # never submitted a ticket
recency_pd['days_since_last_video'] = recency_pd[
    'days_since_last_video'
].fillna(999)

recency_pl = pl.from_pandas(recency_pd)
print(f"\nRecency features:")
print(recency_pl.describe())
Recency computation time: 3.41s

Recency features:
shape: (9, 4)
+-----------+---------------------+--------------------------+------------------------+
| statistic | user_id             | days_since_last_event    | days_since_last_support|
| ---       | ---                 | ---                      | ---                    |
| str       | f64                 | f64                      | f64                    |
+-----------+---------------------+--------------------------+------------------------+
| count     | 24000.0             | 24000.0                  | 24000.0                |
| null_count| 0.0                 | 0.0                      | 0.0                    |
| mean      | 12000.5             | 0.482                    | 8.934                  |
| std       | 6928.5              | 0.731                    | 89.241                 |
| min       | 1.0                 | 0.0                      | 0.0                    |
| 25%       | 6000.0              | 0.0                      | 2.0                    |
| 50%       | 12000.0             | 0.0                      | 7.0                    |
| 75%       | 18000.0             | 1.0                      | 14.0                   |
| max       | 24000.0             | 8.0                      | 999.0                  |
+-----------+---------------------+--------------------------+------------------------+

Step 5: Activity Trend Features

A subscriber who logged in 50 times in week 1 and 5 times in week 4 is a very different risk profile from one with a steady 15 events per week. We compute the trend by comparing the first half of the window to the second half.

# Split the 30-day window into two halves
midpoint = pd.Timestamp('2024-10-16')

# First half counts (Dask)
first_half = ddf[ddf['timestamp'] < midpoint]
first_counts = (
    first_half.groupby('user_id')
    .agg(events_first_half=('event_type', 'count'))
    .compute()
)

# Second half counts
second_half = ddf[ddf['timestamp'] >= midpoint]
second_counts = (
    second_half.groupby('user_id')
    .agg(events_second_half=('event_type', 'count'))
    .compute()
)

# Merge and compute trend
trend_pd = first_counts.join(second_counts, how='outer').fillna(0)
trend_pl = pl.from_pandas(trend_pd.reset_index())

trend_features = (
    trend_pl.lazy()
    .with_columns(
        # Activity trend: positive = increasing, negative = declining
        activity_trend=pl.when(
            (pl.col('events_first_half') + pl.col('events_second_half')) > 0
        ).then(
            (pl.col('events_second_half') - pl.col('events_first_half')) /
            (pl.col('events_first_half') + pl.col('events_second_half'))
        ).otherwise(0.0),

        # Activity volatility: abs difference relative to total
        activity_volatility=(
            (pl.col('events_second_half') - pl.col('events_first_half')).abs() /
            (pl.col('events_first_half') + pl.col('events_second_half') + 1)
        ),
    )
    .select(['user_id', 'activity_trend', 'activity_volatility'])
    .collect()
)

print("Activity trend statistics:")
print(trend_features.describe())
Activity trend statistics:
shape: (9, 3)
+-----------+------------------+---------------------+
| statistic | activity_trend   | activity_volatility |
| ---       | ---              | ---                 |
| str       | f64              | f64                 |
+-----------+------------------+---------------------+
| count     | 24000.0          | 24000.0             |
| null_count| 0.0              | 0.0                 |
| mean      | -0.003           | 0.051               |
| std       | 0.072            | 0.041               |
| min       | -0.412           | 0.000               |
| 25%       | -0.048           | 0.021               |
| 50%       | -0.002           | 0.041               |
| 75%       | 0.043            | 0.070               |
| max       | 0.398            | 0.412               |
+-----------+------------------+---------------------+

Step 6: Assemble the Final Feature Matrix

Now we combine all feature groups into a single DataFrame and convert to pandas for scikit-learn.

# Join all feature groups in Polars
final_features = (
    features.lazy()
    .join(recency_pl.lazy(), on='user_id', how='left')
    .join(trend_features.lazy(), on='user_id', how='left')

    # Select only the engineered features (drop raw counts used as intermediates)
    .select([
        'user_id',
        # Engagement
        'daily_avg_events',
        'video_completion_rate',
        'feature_breadth',
        'support_intensity',
        'device_diversity',
        # Monetary
        'total_revenue',
        'revenue_per_event',
        'avg_video_duration',
        # Behavioral
        'mobile_fraction',
        'search_to_view_ratio',
        # Recency
        'days_since_last_event',
        'days_since_last_support',
        'days_since_last_video',
        # Trend
        'activity_trend',
        'activity_volatility',
    ])
    .collect()
)

print(f"Final feature matrix: {final_features.shape}")
print(f"\nFeature summary:")
print(final_features.describe())

# Convert to pandas for scikit-learn
features_pd = final_features.to_pandas().set_index('user_id')
print(f"\npandas DataFrame shape: {features_pd.shape}")
print(f"Memory usage: {features_pd.memory_usage(deep=True).sum() / 1e6:.1f} MB")
print(f"Any nulls: {features_pd.isnull().any().any()}")
Final feature matrix: (24000, 16)

Feature summary:
shape: (9, 16)
...

pandas DataFrame shape: (24000, 15)
Memory usage: 2.9 MB
Any nulls: False

Step 7: Validation and Sanity Checks

Before handing features to a model, verify they make sense.

# Sanity checks
print("Feature range validation:")
checks = {
    'video_completion_rate': (0.0, 1.0),
    'feature_breadth': (0, 6),
    'support_intensity': (0.0, 1.0),
    'device_diversity': (1, 4),
    'mobile_fraction': (0.0, 1.0),
    'activity_trend': (-1.0, 1.0),
}

all_passed = True
for col, (expected_min, expected_max) in checks.items():
    actual_min = features_pd[col].min()
    actual_max = features_pd[col].max()
    passed = actual_min >= expected_min and actual_max <= expected_max
    status = "PASS" if passed else "FAIL"
    print(f"  {col}: [{actual_min:.3f}, {actual_max:.3f}] "
          f"expected [{expected_min}, {expected_max}] -- {status}")
    if not passed:
        all_passed = False

print(f"\nAll checks passed: {all_passed}")

# Feature correlations (check for redundancy)
high_corr = features_pd.corr().abs()
np.fill_diagonal(high_corr.values, 0)
max_corr = high_corr.max().max()
max_corr_pair = high_corr.stack().idxmax()
print(f"\nHighest feature correlation: {max_corr:.3f} between "
      f"{max_corr_pair[0]} and {max_corr_pair[1]}")
Feature range validation:
  video_completion_rate: [0.000, 1.000] expected [0.0, 1.0] -- PASS
  feature_breadth: [1.000, 6.000] expected [0, 6] -- PASS
  support_intensity: [0.000, 0.091] expected [0.0, 1.0] -- PASS
  device_diversity: [1.000, 4.000] expected [1, 4] -- PASS
  mobile_fraction: [0.327, 0.571] expected [0.0, 1.0] -- PASS
  activity_trend: [-0.412, 0.398] expected [-1.0, 1.0] -- PASS

All checks passed: True

Highest feature correlation: 0.823 between daily_avg_events and total_revenue

Production Tip --- The high correlation between daily_avg_events and total_revenue is expected (more active users generate more billing events). In production, you might drop one or use PCA to combine them. But for a first model, keeping both and letting the model sort out importance is a reasonable starting point.


Pipeline Performance Summary

print("=== Pipeline Performance Summary ===")
print(f"Data: {N_SUBSCRIBERS:,} subscribers x {N_DAYS} days")
print(f"Raw events: {n_rows:,} rows across {N_DAYS} Parquet files")
print(f"Output: {features_pd.shape[0]:,} subscribers x {features_pd.shape[1]} features")
print(f"\nTool usage:")
print(f"  Dask:   Data loading + initial aggregation ({dask_time:.1f}s)")
print(f"  Polars: Feature engineering transformations")
print(f"  pandas: Final output for scikit-learn ({features_pd.memory_usage(deep=True).sum() / 1e6:.1f} MB)")
print(f"\nScaling estimate for full dataset (2.4M subscribers, 90 days):")
print(f"  Raw events: ~2.6 billion rows")
print(f"  Estimated Dask aggregation: ~15-25 minutes on 8-core machine")
print(f"  Polars feature engineering: ~2-5 seconds (output is 2.4M rows)")
print(f"  Final feature matrix: ~2.4M x 15 = ~290 MB in pandas")
=== Pipeline Performance Summary ===
Data: 24,000 subscribers x 30 days
Raw events: 8,653,281 rows across 30 Parquet files
Output: 24,000 subscribers x 15 features

Tool usage:
  Dask:   Data loading + initial aggregation (4.7s)
  Polars: Feature engineering transformations
  pandas: Final output for scikit-learn (2.9 MB)

Scaling estimate for full dataset (2.4M subscribers, 90 days):
  Raw events: ~2.6 billion rows
  Estimated Dask aggregation: ~15-25 minutes on 8-core machine
  Polars feature engineering: ~2-5 seconds (output is 2.4M rows)
  Final feature matrix: ~2.4M x 15 = ~290 MB in pandas

Lessons Learned

  1. The bottleneck was data volume, not computation complexity. The feature engineering logic is straightforward --- counts, ratios, date differences. The challenge was applying it to 8.6 million rows (or 2.6 billion at full scale) without running out of memory.

  2. Dask handled the out-of-core aggregation; Polars handled the fast transformations. Each tool was used where it is strongest. Trying to do everything in Dask would have been slower (Dask's expression system is less optimized than Polars'). Trying to do everything in Polars would have required fitting the full event log in memory.

  3. The final output is small. 2.6 billion raw events reduce to a 290 MB feature matrix. The lesson: the goal of feature engineering for large datasets is to reduce the data to a size where standard tools (pandas, scikit-learn) work perfectly well.

  4. Parquet was essential. Loading 30 daily Parquet files with Dask was fast because Parquet is columnar and compressed. The same data in CSV would have been 3-4x larger on disk and 5-8x slower to read.

  5. Develop on a sample, validate on full data. This case study used 1% of the subscribers. The pipeline patterns and code are identical at full scale --- only the computation time changes.


This case study demonstrates a multi-tool pipeline for large-scale feature engineering. Return to the chapter for the conceptual framework, or continue to Case Study 2 for SQL-heavy pipeline optimization with TurbineTech sensor data.