Case Study 1: StreamFlow Feature Extraction Pipeline — From Schema to Model-Ready Table
The Situation
StreamFlow's data science team has been asked to build a churn prediction model. The model needs to score all 2.4 million active subscribers nightly, producing a probability of cancellation within 30 days. The marketing team will use these scores to trigger retention interventions — personalized offers, proactive support outreach, and re-engagement campaigns.
The problem is not the model. The problem is the features.
StreamFlow's data lives in a PostgreSQL database designed by application engineers for the subscription platform. Five normalized tables. No feature columns. No pre-computed aggregates. The data science team's first task is to build a SQL pipeline that extracts 20+ features from these tables and produces a flat, model-ready table with one row per subscriber.
The team lead assigns the work to Priya, a data scientist who joined StreamFlow three months ago. She has strong Python skills and decent SQL fundamentals. She has never written a window function in production.
This case study follows Priya as she builds the pipeline from scratch, makes mistakes, fixes them, and delivers a feature set that the churn model will use for the next 18 months.
Phase 1: Understanding the Data
Priya starts by exploring the schema. She runs basic counts and distributions:
-- Table sizes
SELECT 'subscribers' AS tbl, COUNT(*) AS rows FROM subscribers
UNION ALL
SELECT 'usage_events', COUNT(*) FROM usage_events
UNION ALL
SELECT 'support_tickets', COUNT(*) FROM support_tickets
UNION ALL
SELECT 'plan_changes', COUNT(*) FROM plan_changes
UNION ALL
SELECT 'billing_events', COUNT(*) FROM billing_events;
tbl | rows
-----------------+------------
subscribers | 2,400,000
usage_events | 847,000,000
support_tickets | 340,000
plan_changes | 890,000
billing_events | 28,800,000
The usage_events table is the monster: 847 million rows. Every SELECT on this table must be filtered by date or subscriber, or it will take minutes.
She checks the plan distribution and churn rate:
SELECT
plan_type,
COUNT(*) AS total,
COUNT(*) FILTER (WHERE cancel_date IS NOT NULL) AS canceled,
ROUND(COUNT(*) FILTER (WHERE cancel_date IS NOT NULL)::NUMERIC / COUNT(*), 3) AS churn_rate
FROM subscribers
GROUP BY plan_type
ORDER BY churn_rate DESC;
plan_type | total | canceled | churn_rate
------------+---------+----------+------------
free | 480,000 | 67,200 | 0.140
basic | 960,000 | 100,800 | 0.105
premium | 720,000 | 43,200 | 0.060
enterprise | 240,000 | 7,200 | 0.030
Free-tier subscribers churn at 14%. Enterprise subscribers churn at 3%. Plan type is clearly a strong feature — but not the only one.
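The FILTER pattern in that query — one pass over the table, with a conditional count per group — can be mirrored in plain Python. A minimal sketch with illustrative data (not StreamFlow's real rows):

```python
from collections import defaultdict

# Illustrative subscriber rows: (plan_type, cancel_date or None).
subs = [
    ("free", "2024-03-01"), ("free", None),
    ("basic", None), ("basic", None),
]

total = defaultdict(int)
canceled = defaultdict(int)
for plan, cancel_date in subs:
    total[plan] += 1               # COUNT(*)
    if cancel_date is not None:    # COUNT(*) FILTER (WHERE cancel_date IS NOT NULL)
        canceled[plan] += 1

churn_rate = {plan: round(canceled[plan] / total[plan], 3) for plan in total}
print(churn_rate)  # {'free': 0.5, 'basic': 0.0}
```

The key property, in SQL as in the sketch: both the total and the conditional count come from a single scan, rather than one query per condition.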
Phase 2: The Naive Approach (And Why It Fails)
Priya's first instinct is to compute everything in pandas:
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine('postgresql://priya:***@warehouse:5432/streamflow')
# Load usage events for the last 90 days
usage = pd.read_sql("""
SELECT subscriber_id, event_date, session_minutes, feature_used, device_type
FROM usage_events
WHERE event_date >= CURRENT_DATE - INTERVAL '90 days'
""", engine)
print(f"Loaded {len(usage):,} rows, {usage.memory_usage(deep=True).sum() / 1e9:.1f} GB")
Loaded 214,000,000 rows, 18.7 GB
Her laptop has 16 GB of RAM. The kernel crashes.
She requests a larger instance — 64 GB — and the query takes 8 minutes to transfer over the network. The pandas groupby().shift() operations for LAG-equivalent features take another 12 minutes. Total pipeline runtime: 23 minutes.
Her team lead looks at the code and says: "Do this in SQL. The database has indexes and 256 GB of RAM. Your laptop does not."
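Her lead's advice — aggregate in the database and ship only the result — can be sketched with Python's built-in `sqlite3` standing in for the Postgres warehouse (table and values here are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE usage_events (subscriber_id INTEGER, event_date TEXT, session_minutes INTEGER)"
)
conn.executemany(
    "INSERT INTO usage_events VALUES (?, ?, ?)",
    [(1, "2024-05-01", 30), (1, "2024-05-02", 45), (2, "2024-05-01", 10)],
)

# Anti-pattern: pull every raw row, aggregate client-side (what crashed the laptop).
raw = conn.execute("SELECT subscriber_id, session_minutes FROM usage_events").fetchall()

# Better: let the database aggregate; one row per subscriber crosses the wire.
agg = conn.execute(
    """
    SELECT subscriber_id, SUM(session_minutes)
    FROM usage_events
    GROUP BY subscriber_id
    ORDER BY subscriber_id
    """
).fetchall()

print(len(raw), agg)  # 3 [(1, 75), (2, 10)]
```

At StreamFlow scale the difference is 214 million raw rows versus 2.4 million aggregated ones — the transfer cost, not the computation, is what the naive approach pays for.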
Phase 3: Building the CTE Pipeline
Priya restructures the pipeline into SQL. She starts with the simplest features and adds complexity one CTE at a time.
CTE 1: Subscriber Tenure
WITH tenure AS (
SELECT
subscriber_id,
plan_type,
plan_price,
country,
signup_date,
cancel_date,
EXTRACT(EPOCH FROM (COALESCE(cancel_date, CURRENT_DATE) - signup_date))
/ (86400 * 30.44) AS tenure_months,
CASE
WHEN EXTRACT(EPOCH FROM (COALESCE(cancel_date, CURRENT_DATE) - signup_date))
/ 86400 <= 30 THEN 'new'
WHEN EXTRACT(EPOCH FROM (COALESCE(cancel_date, CURRENT_DATE) - signup_date))
/ 86400 <= 180 THEN 'growing'
WHEN EXTRACT(EPOCH FROM (COALESCE(cancel_date, CURRENT_DATE) - signup_date))
/ 86400 <= 365 THEN 'established'
ELSE 'mature'
END AS tenure_segment
FROM subscribers
)
SELECT tenure_segment, COUNT(*) AS count, ROUND(AVG(tenure_months)::NUMERIC, 1) AS avg_months
FROM tenure
GROUP BY tenure_segment;
tenure_segment | count | avg_months
----------------+---------+-----------
new | 187,000 | 0.5
growing | 612,000 | 3.4
established | 843,000 | 8.7
mature | 758,000 | 22.1
She validates: the numbers add up to 2.4M. The distribution makes sense for a 4-year-old product.
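The CASE ladder above is order-dependent: each branch implicitly means "and greater than the previous threshold". A hypothetical Python mirror makes the bucket boundaries explicit:

```python
def tenure_segment(tenure_days: int) -> str:
    # Mirrors the SQL CASE: the first matching branch wins, so each
    # test implicitly excludes the earlier buckets.
    if tenure_days <= 30:
        return "new"
    if tenure_days <= 180:
        return "growing"
    if tenure_days <= 365:
        return "established"
    return "mature"

print([tenure_segment(d) for d in (10, 90, 300, 400)])
# ['new', 'growing', 'established', 'mature']
```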
CTE 2: Usage Features with Window Functions
This is where Priya encounters her first real challenge. She needs the usage trend — how is each subscriber's usage changing over time? LAG is the tool.
WITH monthly_usage AS (
SELECT
subscriber_id,
DATE_TRUNC('month', event_date) AS usage_month,
SUM(session_minutes) AS total_minutes,
COUNT(DISTINCT event_date) AS active_days,
COUNT(DISTINCT feature_used) AS features_used
FROM usage_events
-- anchor to a month boundary so the oldest lagged month is complete
WHERE event_date >= DATE_TRUNC('month', CURRENT_DATE) - INTERVAL '4 months'
GROUP BY subscriber_id, DATE_TRUNC('month', event_date)
),
usage_with_trend AS (
SELECT
subscriber_id,
usage_month,
total_minutes,
active_days,
features_used,
LAG(total_minutes, 1) OVER w AS prev_1m_minutes,
LAG(total_minutes, 2) OVER w AS prev_2m_minutes,
LAG(total_minutes, 3) OVER w AS prev_3m_minutes
FROM monthly_usage
WINDOW w AS (PARTITION BY subscriber_id ORDER BY usage_month)
)
SELECT
subscriber_id,
total_minutes AS current_month_minutes,
active_days AS current_month_active_days,
features_used AS current_month_features,
CASE WHEN prev_1m_minutes > 0
-- cast before dividing: SUM over an integer column yields BIGINT,
-- and BIGINT / BIGINT would floor the ratio to zero
THEN ROUND((total_minutes - prev_1m_minutes)::NUMERIC / prev_1m_minutes * 100, 1)
ELSE NULL
END AS mom_pct_change,
-- 3-month trend direction
CASE
WHEN prev_1m_minutes IS NOT NULL
AND prev_2m_minutes IS NOT NULL
AND total_minutes < prev_1m_minutes
AND prev_1m_minutes < prev_2m_minutes
THEN 'declining'
WHEN prev_1m_minutes IS NOT NULL
AND prev_2m_minutes IS NOT NULL
AND total_minutes > prev_1m_minutes
AND prev_1m_minutes > prev_2m_minutes
THEN 'growing'
ELSE 'mixed_or_insufficient'
END AS usage_trend_direction
FROM usage_with_trend
WHERE usage_month = DATE_TRUNC('month', CURRENT_DATE - INTERVAL '1 month');
She runs this and gets results in 47 seconds. The same computation in pandas — after the 8-minute data transfer — took 12 minutes.
Common Mistake — Priya's first version used `CURRENT_DATE` in the month filter instead of `CURRENT_DATE - INTERVAL '1 month'`. The current month is incomplete, so its usage totals are artificially low. Every subscriber appeared to be declining. Always use the most recent complete month for trend features.
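The same "last complete month" anchor shows up in any client-side check of the pipeline. A small helper (hypothetical, standard library only) that matches `DATE_TRUNC('month', CURRENT_DATE - INTERVAL '1 month')`:

```python
from datetime import date, timedelta

def last_complete_month(today: date) -> date:
    # First day of the most recent complete month: step back past the
    # 1st of the current month, then truncate to that month's first day.
    last_day_prev = today.replace(day=1) - timedelta(days=1)
    return last_day_prev.replace(day=1)

print(last_complete_month(date(2024, 6, 15)))  # 2024-05-01
print(last_complete_month(date(2024, 1, 3)))   # 2023-12-01
```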
CTE 3: Engagement Regularity
WITH session_dates AS (
SELECT DISTINCT subscriber_id, event_date
FROM usage_events
WHERE event_date >= CURRENT_DATE - INTERVAL '90 days'
),
gaps AS (
SELECT
subscriber_id,
event_date,
LEAD(event_date) OVER (
PARTITION BY subscriber_id ORDER BY event_date
) AS next_date,
LEAD(event_date) OVER (
PARTITION BY subscriber_id ORDER BY event_date
) - event_date AS gap_days
FROM session_dates
)
SELECT
subscriber_id,
COUNT(*) + 1 AS session_count_90d,
ROUND(AVG(gap_days)::NUMERIC, 1) AS avg_gap_days,
MAX(gap_days) AS max_gap_days,
-- the last session's row is filtered out (its gap_days is NULL), so the
-- true last session date is MAX(next_date), not MAX(event_date)
CURRENT_DATE - MAX(next_date) AS days_since_last_session
FROM gaps
WHERE gap_days IS NOT NULL
GROUP BY subscriber_id;
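The LEAD-based gap logic is easy to sanity-check against a handful of dates in Python. A sketch with hypothetical session dates (like the SQL, it produces nothing for subscribers with fewer than two sessions):

```python
from datetime import date

def gap_features(session_dates):
    # Distinct, sorted dates mirror the session_dates CTE; consecutive
    # differences mirror LEAD(event_date) - event_date.
    ds = sorted(set(session_dates))
    gaps = [(nxt - cur).days for cur, nxt in zip(ds, ds[1:])]
    if not gaps:
        return None  # fewer than 2 sessions: dropped, as in the SQL
    return {
        "session_count_90d": len(ds),
        "avg_gap_days": round(sum(gaps) / len(gaps), 1),
        "max_gap_days": max(gaps),
    }

feats = gap_features([date(2024, 5, 1), date(2024, 5, 4), date(2024, 5, 10)])
print(feats)  # {'session_count_90d': 3, 'avg_gap_days': 4.5, 'max_gap_days': 6}
```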
CTEs 4–6: Support, Plan, and Billing Features
Priya adds three more CTEs following the patterns from the chapter — conditional aggregation for support tickets, FILTER clause for plan change types, and billing failure counts. Each CTE is independently testable. She runs each one in isolation before assembling the final query.
Phase 4: The Assembly Join
With 6 CTEs computed, Priya assembles the final feature table:
-- Final assembly: 6 CTEs joined on subscriber_id
SELECT
t.subscriber_id,
t.plan_type,
t.plan_price,
t.country,
t.tenure_months,
t.tenure_segment,
COALESCE(u.current_month_minutes, 0) AS current_month_minutes,
COALESCE(u.current_month_active_days, 0) AS current_month_active_days,
COALESCE(u.current_month_features, 0) AS current_month_features_used,
u.mom_pct_change,
COALESCE(u.usage_trend_direction, 'mixed_or_insufficient') AS usage_trend_direction,
COALESCE(e.avg_gap_days, 90) AS avg_gap_days,
COALESCE(e.max_gap_days, 90) AS max_gap_days,
COALESCE(e.days_since_last_session, 90) AS days_since_last_session,
COALESCE(ts.tickets_90d, 0) AS tickets_90d,
COALESCE(ts.complaint_tickets_90d, 0) AS complaint_tickets_90d,
COALESCE(ts.has_unresolved_ticket, FALSE) AS has_unresolved_ticket,
COALESCE(pf.downgrades, 0) AS plan_downgrades,
COALESCE(pf.upgrades, 0) AS plan_upgrades,
COALESCE(pf.recent_downgrade, FALSE) AS recent_downgrade,
COALESCE(bf.failed_payments_90d, 0) AS failed_payments_90d,
COALESCE(bf.total_refunds, 0) AS total_refunds,
-- Domain-informed flags
CASE WHEN pf.recent_downgrade AND t.tenure_months > 3 THEN 1 ELSE 0 END
AS downgrade_risk_flag,
CASE WHEN COALESCE(e.days_since_last_session, 90) > 21
AND t.tenure_months > 3 THEN 1 ELSE 0 END
AS dormant_flag,
-- Target
CASE WHEN t.cancel_date IS NOT NULL
AND t.cancel_date BETWEEN CURRENT_DATE AND CURRENT_DATE + INTERVAL '30 days'
THEN 1 ELSE 0
END AS churned_within_30d
FROM tenure t
LEFT JOIN usage_with_trend u ON t.subscriber_id = u.subscriber_id
LEFT JOIN engagement e ON t.subscriber_id = e.subscriber_id
LEFT JOIN ticket_features ts ON t.subscriber_id = ts.subscriber_id
LEFT JOIN plan_features pf ON t.subscriber_id = pf.subscriber_id
LEFT JOIN billing_features bf ON t.subscriber_id = bf.subscriber_id
WHERE t.cancel_date IS NULL
OR t.cancel_date >= CURRENT_DATE - INTERVAL '30 days';
She runs EXPLAIN ANALYZE. Total execution time: 2 minutes 14 seconds. The bottleneck is a sequential scan on usage_events.
Phase 5: Optimization
Priya adds two indexes:
CREATE INDEX idx_usage_sub_date ON usage_events (subscriber_id, event_date);
CREATE INDEX idx_tickets_sub_date ON support_tickets (subscriber_id, created_date);
After the indexes build (12 minutes for the 847M-row usage_events table), she re-runs the query. Execution time: 38 seconds. A 3.5x improvement.
She wraps the entire query in a materialized view:
CREATE MATERIALIZED VIEW mv_churn_features AS
-- ... (the full query above)
WITH DATA;
CREATE UNIQUE INDEX idx_mv_churn_subscriber
ON mv_churn_features (subscriber_id);
Refresh time: 38 seconds. Read time after materialization: instant.
Phase 6: Validation
Priya checks the output:
import pandas as pd
from sqlalchemy import create_engine, text
engine = create_engine('postgresql://priya:***@warehouse:5432/streamflow')
df = pd.read_sql(text("SELECT * FROM mv_churn_features"), engine)
print(f"Shape: {df.shape}")
print(f"\nNull counts (top 5):")
print(df.isnull().sum().sort_values(ascending=False).head())
print(f"\nTarget distribution:")
print(df['churned_within_30d'].value_counts(normalize=True))
print(f"\nUsage trend distribution:")
print(df['usage_trend_direction'].value_counts(normalize=True))
Shape: (2387000, 25)
Null counts (top 5):
mom_pct_change 312456
days_since_last_ticket 1847234
avg_gap_days 8923
max_gap_days 8923
days_since_last_session 0
Target distribution:
0 0.918
1 0.082
Name: churned_within_30d, dtype: float64
Usage trend distribution:
mixed_or_insufficient 0.421
declining 0.312
growing 0.267
Name: usage_trend_direction, dtype: float64
Key observations:
- 312K subscribers have NULL month-over-month change — these are subscribers with less than 2 months of data. The NULL is informative: it means "new subscriber, insufficient trend data."
- 1.85M subscribers have NULL days_since_last_ticket — they have never filed a ticket. This is the majority of subscribers, which means "no ticket history" is the normal case.
- 8,923 subscribers have NULL engagement gaps — they had fewer than 2 sessions in the last 90 days. Nearly dormant accounts.
- 31.2% of subscribers have declining usage — this is the core risk population.
- The 8.2% churn rate matches expectations from the business metrics.
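One common way to keep an informative NULL like `mom_pct_change` usable downstream (one of several strategies, not necessarily the one the team chose) is to pair a missing-indicator column with a neutral fill value:

```python
# Hypothetical mom_pct_change values; None marks subscribers with
# fewer than two months of history.
mom_pct_change = [12.5, None, -8.0, None, 3.1]

# The indicator preserves the "insufficient history" signal ...
is_new_subscriber = [1 if v is None else 0 for v in mom_pct_change]
# ... while a neutral fill (0 = "flat trend") keeps the column numeric.
mom_filled = [0.0 if v is None else v for v in mom_pct_change]

print(is_new_subscriber)  # [0, 1, 0, 1, 0]
print(mom_filled)         # [12.5, 0.0, -8.0, 0.0, 3.1]
```

The indicator column lets a model treat "no trend data" differently from "genuinely flat usage", which a bare fill would conflate.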
Phase 7: The Handoff
Priya delivers three artifacts:
- The SQL query — saved as `src/features/extract_features.sql`, version-controlled in Git
- The materialized view — `mv_churn_features`, refreshed nightly at 2 AM via an Airflow DAG
- A data dictionary — documenting each feature's name, type, computation logic, and expected null pattern
The data science team loads the features via pd.read_sql() and begins model development in Chapter 6.
Analysis Questions
- Priya's pandas approach crashed with 18.7 GB of data on a 16 GB laptop. But the SQL approach uses the same data. Why doesn't the database run out of memory? What are the fundamental differences in how a database processes aggregations versus how pandas does it?
- The `mom_pct_change` feature is NULL for 312K subscribers. What are three strategies for handling this NULL in the modeling phase? Which would you choose and why?
- Priya used `COALESCE(e.days_since_last_session, 90)` to replace NULL with 90 for subscribers with no recent sessions. Is 90 the right default? What implicit assumption does this choice encode? How would you validate it?
- The query execution dropped from 2 minutes 14 seconds to 38 seconds after adding two indexes. But those indexes took 12 minutes to build and will slow down every INSERT into `usage_events` and `support_tickets`. How would you decide whether the indexes are worth the write-performance cost?
- The materialized view refreshes nightly. This means the features are up to 24 hours stale. For a churn model that scores subscribers once a day, is this acceptable? What if the model needed to trigger real-time interventions — would you change the architecture?
- Priya built domain-informed flags like `downgrade_risk_flag` and `dormant_flag` directly in SQL. An alternative is to let the ML model learn these interactions from the raw features (e.g., a gradient boosting model can learn that `recent_downgrade = TRUE AND tenure_months > 3` predicts churn). What are the tradeoffs between pre-computing interaction features in SQL versus letting the model discover them?