Case Study 1: StreamFlow --- Assembling the Full Preprocessing Pipeline
Background
Over the last five chapters, the StreamFlow data science team has built individual preprocessing components: a SQL extraction query (Chapter 5), feature engineering logic (Chapter 6), categorical encoding (Chapter 7), missing data imputation (Chapter 8), and feature selection (Chapter 9). Each component lives in a separate notebook. Each was validated independently.
Now the team needs to assemble them into a single pipeline for production deployment. The goal is straightforward: given a raw subscriber DataFrame from the data warehouse, produce churn predictions. One input, one output, no manual steps in between.
The integration should take a day. It takes two weeks.
The Integration Problem
The first attempt is to run the notebooks in sequence. The data engineer writes a script that calls each notebook's core function in order:
# Attempt 1: sequential function calls
df_raw = extract_subscriber_data(conn, prediction_date='2025-02-01')
df_features = engineer_features(df_raw)
df_encoded = encode_categoricals(df_features)
df_imputed = impute_missing(df_encoded)
df_selected = select_features(df_imputed, y_train)
predictions = model.predict(df_selected)
This breaks immediately. The issues:
Issue 1: Column name mismatch. The encode_categoricals function expects a column called plan_tier, but engineer_features renamed it to plan_type during feature engineering. Nobody noticed because the notebooks were always run in the same session with the same variable names.
Issue 2: Fit/transform leakage. The impute_missing function calls SimpleImputer().fit_transform(df) every time. During development, this was fine --- the imputer was always fitted on the training data. In the sequential pipeline, it is fitted on whatever data is passed in, including test data.
Issue 3: Feature ordering. The select_features function returns columns in a different order than the model expects. The model was trained on alphabetically sorted features. The selection function returns features in importance-ranked order.
Issue 4: Stale state. The encode_categoricals function uses a OneHotEncoder that was fitted during Chapter 7's development. It does not know about the enterprise plan type that was added to the product last month. One-hot encoding fails at prediction time for subscribers on the new plan.
Each of these issues is trivial to fix individually. Together, they represent a systemic problem: the preprocessing logic is distributed across five separate stateful components with implicit dependencies. There is no single object that encodes the correct sequence of operations and the correct fitted parameters.
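Issue 2 is the most insidious, so it is worth seeing in miniature. A self-contained sketch (synthetic arrays, not the StreamFlow schema) contrasting the leaky fit_transform-on-incoming-data pattern with the fit-on-train, transform-on-test discipline:

```python
import numpy as np
from sklearn.impute import SimpleImputer

rng = np.random.default_rng(0)
X_train = rng.normal(50, 10, size=(100, 1))   # training distribution
X_test = rng.normal(80, 10, size=(20, 1))     # shifted production distribution
X_test[0, 0] = np.nan

# Leaky: impute_missing() called fit_transform on whatever it received,
# so the fill value is computed from the test data itself
leaky = SimpleImputer(strategy='mean').fit_transform(X_test)

# Correct: statistics learned once on training data, then reused
imputer = SimpleImputer(strategy='mean').fit(X_train)
correct = imputer.transform(X_test)

print(leaky[0, 0])    # ~80: the test-set mean leaked into the imputation
print(correct[0, 0])  # ~50: the training-set mean, as production would see
```

A Pipeline removes the opportunity for this bug: `pipeline.fit(X_train, y_train)` fits every step on training data only, and `pipeline.predict(X_test)` only ever calls `transform`.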
The Pipeline Solution
The senior data scientist rewrites the entire preprocessing workflow as a single scikit-learn Pipeline.
Step 1: Define Custom Transformers
The domain-specific logic from Chapters 6 and 8 becomes custom transformer classes:
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class StreamFlowFeatureEngineer(BaseEstimator, TransformerMixin):
    """Create StreamFlow domain features from raw subscriber data.

    Computes engagement ratios, trend features, and recency scores
    as designed in Chapter 6.
    """

    def __init__(self, fill_value=0.0):
        self.fill_value = fill_value

    def fit(self, X, y=None):
        self.feature_names_in_ = list(X.columns)
        return self

    def transform(self, X):
        X = pd.DataFrame(X, columns=self.feature_names_in_).copy()
        # Engagement ratios
        X['hours_per_session'] = np.where(
            X['sessions_last_7d'] > 0,
            X['total_hours_last_30d'] / X['sessions_last_7d'],
            self.fill_value
        )
        X['tickets_per_tenure_month'] = np.where(
            X['tenure_months'] > 0,
            X['support_tickets_last_90d'] / X['tenure_months'],
            self.fill_value
        )
        X['charge_per_hour'] = np.where(
            X['total_hours_last_30d'] > 0,
            X['monthly_charge'] / X['total_hours_last_30d'],
            self.fill_value
        )
        # Recency score (0 = logged in today, 1 = 30+ days since login)
        X['recency_score'] = X['days_since_last_login'].clip(0, 30) / 30.0
        return X

    def get_feature_names_out(self, input_features=None):
        return (list(self.feature_names_in_) +
                ['hours_per_session', 'tickets_per_tenure_month',
                 'charge_per_hour', 'recency_score'])

class MissingIndicatorAdder(BaseEstimator, TransformerMixin):
    """Add binary missing indicators for high-missingness features.

    Learns which columns have missingness above the threshold during fit.
    At transform time, adds indicator columns for those features.
    """

    def __init__(self, threshold=0.01):
        self.threshold = threshold

    def fit(self, X, y=None):
        X = pd.DataFrame(X)
        missing_rates = X.isna().mean()
        self.missing_columns_ = missing_rates[
            missing_rates >= self.threshold
        ].index.tolist()
        self.feature_names_in_ = list(X.columns)
        return self

    def transform(self, X):
        X = pd.DataFrame(X, columns=self.feature_names_in_).copy()
        for col in self.missing_columns_:
            X[f'{col}_missing'] = X[col].isna().astype(int)
        return X

    def get_feature_names_out(self, input_features=None):
        indicators = [f'{col}_missing' for col in self.missing_columns_]
        return list(self.feature_names_in_) + indicators
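A detail inside StreamFlowFeatureEngineer worth isolating: the np.where guard on each ratio. Because NaN > 0 evaluates to False, NaN denominators fall through to the fill value just like zeros. A standalone sketch of the pattern:

```python
import numpy as np

denom = np.array([4.0, 0.0, np.nan])
numer = np.array([20.0, 7.0, 3.0])

# NaN > 0 is False, so NaN denominators also receive the fill value
with np.errstate(divide='ignore', invalid='ignore'):
    ratio = np.where(denom > 0, numer / denom, 0.0)

print(ratio)  # [5. 0. 0.]
```

Note that np.where evaluates both branches, so the division still runs for zero or NaN denominators and would emit runtime warnings; np.errstate silences them without changing the result.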
Step 2: Define the ColumnTransformer
The numeric and categorical pipelines are defined separately and combined:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.feature_selection import SelectKBest, f_classif
# Column definitions. The *_missing columns are produced upstream by
# MissingIndicatorAdder, so this list is coupled to its fitted state:
# if a column's training missingness drops below the threshold, the
# indicator is not created and the ColumnTransformer raises at fit time.
num_cols = [
    'tenure_months', 'monthly_charge', 'total_hours_last_30d',
    'sessions_last_7d', 'support_tickets_last_90d',
    'days_since_last_login', 'hours_per_session',
    'tickets_per_tenure_month', 'charge_per_hour', 'recency_score',
    'total_hours_last_30d_missing', 'sessions_last_7d_missing',
    'days_since_last_login_missing'
]
cat_cols = ['plan_type', 'device_type']

numeric_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

categorical_pipeline = Pipeline([
    ('encoder', OneHotEncoder(
        sparse_output=False,
        handle_unknown='ignore',
        min_frequency=50  # Collapse rare categories
    ))
])

preprocessor = ColumnTransformer([
    ('num', numeric_pipeline, num_cols),
    ('cat', categorical_pipeline, cat_cols)
], remainder='drop')
Step 3: Assemble the Full Pipeline
from sklearn.linear_model import LogisticRegression
streamflow_pipeline = Pipeline([
    ('feature_engineer', StreamFlowFeatureEngineer(fill_value=0.0)),
    ('missing_indicators', MissingIndicatorAdder(threshold=0.01)),
    ('preprocessor', preprocessor),
    ('selector', SelectKBest(f_classif, k=15)),
    ('model', LogisticRegression(random_state=42, max_iter=1000))
])
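A side benefit of the single-object design: every step's hyperparameters become addressable through step__param names via get_params/set_params, which is the hook that grid search uses in Chapter 18. A stripped-down illustration (a two-step stand-in reusing the same step names, not the full StreamFlow pipeline):

```python
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline

pipe = Pipeline([
    ('selector', SelectKBest(f_classif, k=15)),
    ('model', LogisticRegression(max_iter=1000)),
])

print(pipe.get_params()['selector__k'])   # 15
pipe.set_params(selector__k=20)           # same names a grid search would use
print(pipe.named_steps['selector'].k)     # 20
```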
Step 4: Validate
from sklearn.model_selection import cross_val_score, train_test_split

# Simulate full StreamFlow dataset
np.random.seed(42)
n = 50000
df = pd.DataFrame({
    'tenure_months': np.random.exponential(18, n).clip(1, 120).astype(int),
    'monthly_charge': np.random.uniform(9.99, 49.99, n).round(2),
    'total_hours_last_30d': np.where(
        np.random.random(n) < 0.12, np.nan,
        np.random.exponential(15, n).round(1)
    ),
    'sessions_last_7d': np.where(
        np.random.random(n) < 0.12, np.nan,
        np.random.poisson(5, n).astype(float)
    ),
    'support_tickets_last_90d': np.random.poisson(0.8, n),
    'days_since_last_login': np.where(
        np.random.random(n) < 0.05, np.nan,
        np.random.exponential(5, n).round(0)
    ),
    'plan_type': np.random.choice(
        ['basic', 'standard', 'premium'], n, p=[0.5, 0.35, 0.15]
    ),
    'device_type': np.random.choice(
        ['mobile', 'desktop', 'tablet', 'smart_tv'], n, p=[0.4, 0.3, 0.2, 0.1]
    ),
})

# Synthetic churn target
df['churned'] = (
    (df['total_hours_last_30d'].fillna(0) < 5).astype(int) * 0.3
    + (df['support_tickets_last_90d'] > 2).astype(int) * 0.2
    + (df['tenure_months'] < 6).astype(int) * 0.15
    + (df['days_since_last_login'].fillna(30) > 14).astype(int) * 0.15
    + np.random.random(n) * 0.2
) > 0.5
df['churned'] = df['churned'].astype(int)

X = df.drop('churned', axis=1)
y = df['churned']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Cross-validation
cv_scores = cross_val_score(
    streamflow_pipeline, X_train, y_train,
    cv=5, scoring='roc_auc'
)
print("StreamFlow Pipeline v1 --- Cross-Validation Results")
print("=" * 55)
print(f" CV AUC (mean): {cv_scores.mean():.4f}")
print(f" CV AUC (std): {cv_scores.std():.4f}")
print(f" Per-fold: {cv_scores.round(4)}")
StreamFlow Pipeline v1 --- Cross-Validation Results
=======================================================
CV AUC (mean): 0.8143
CV AUC (std): 0.0045
Per-fold: [0.8112 0.8189 0.8097 0.8201 0.8116]
Step 5: Fit and Serialize
import joblib
# Fit on full training data
streamflow_pipeline.fit(X_train, y_train)
# Evaluate on holdout
from sklearn.metrics import roc_auc_score, classification_report
y_proba = streamflow_pipeline.predict_proba(X_test)[:, 1]
y_pred = streamflow_pipeline.predict(X_test)
print(f"\nHoldout AUC: {roc_auc_score(y_test, y_proba):.4f}")
print("\nClassification Report:")
print(classification_report(y_test, y_pred))
# Save
joblib.dump(streamflow_pipeline, 'streamflow_pipeline_v1.joblib')

metadata = {
    'version': 'v1',
    'sklearn_version': '1.5.2',
    'trained_date': '2025-02-01',
    'training_rows': len(X_train),
    'features': list(X_train.columns),
    'cv_auc_mean': float(cv_scores.mean()),
    'cv_auc_std': float(cv_scores.std()),
    'random_state': 42,
}
joblib.dump(metadata, 'streamflow_pipeline_v1_meta.joblib')
print("\nPipeline and metadata saved.")
Holdout AUC: 0.8178
Classification Report:
              precision    recall  f1-score   support

           0       0.80      0.82      0.81      5247
           1       0.73      0.70      0.71      4753

    accuracy                           0.76     10000
   macro avg       0.76      0.76      0.76     10000
weighted avg       0.76      0.76      0.76     10000
Pipeline and metadata saved.
Step 6: Verify Serialization Round-Trip
# Load and verify
loaded_pipeline = joblib.load('streamflow_pipeline_v1.joblib')
loaded_proba = loaded_pipeline.predict_proba(X_test)[:, 1]
assert np.allclose(y_proba, loaded_proba), "Predictions do not match!"
print("Serialization round-trip verified: predictions are identical.")
Serialization round-trip verified: predictions are identical.
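The metadata file saved in Step 5 earns its keep at load time. A hedged sketch (the dict is inlined here; in production it would come from joblib.load('streamflow_pipeline_v1_meta.joblib')) of a check that flags a scikit-learn version mismatch between training and serving environments:

```python
import warnings
import sklearn

# Inlined stand-in for joblib.load('streamflow_pipeline_v1_meta.joblib')
meta = {
    'version': 'v1',
    'sklearn_version': '1.5.2',
    'trained_date': '2025-02-01',
    'training_rows': 40000,
}

if meta['sklearn_version'] != sklearn.__version__:
    warnings.warn(
        f"Pipeline trained under scikit-learn {meta['sklearn_version']} "
        f"but loaded under {sklearn.__version__}; re-validate before serving."
    )

print(f"Loaded {meta['version']} trained {meta['trained_date']} "
      f"on {meta['training_rows']} rows")
```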
What Changed
The two-week integration was reduced to a single object:
| Before (5 Notebooks) | After (1 Pipeline) |
|---|---|
| 5 separate functions with implicit ordering | 1 Pipeline with enforced ordering |
| 5 fitted objects to serialize separately | 1 joblib file |
| Column names tracked manually | Column names tracked by transformers |
| Fit/transform correctness depends on the developer | Fit/transform correctness enforced by the Pipeline API |
| Cross-validation requires manual fold management | cross_val_score(pipeline, X, y) |
| New team member needs to read 5 notebooks | New team member reads the pipeline definition |
| Production deployment requires orchestrating 5 steps | Production loads 1 file and calls .predict() |
Lessons Learned
- Integration is where reproducibility dies. Each individual component can be correct, but the connections between them --- column names, feature ordering, fit/transform state --- are implicit contracts that break silently. A Pipeline makes those contracts explicit and enforceable.
- Custom transformers are the bridge between domain knowledge and the scikit-learn API. The engagement ratio calculations and missing indicators from Chapters 6 and 8 were originally ad-hoc pandas operations. Wrapping them in BaseEstimator/TransformerMixin classes makes them testable, reusable, and compatible with cross-validation.
- The handle_unknown='ignore' parameter is not optional. The enterprise plan type broke the sequential approach because the encoder had never seen it. In the pipeline, handle_unknown='ignore' encodes unseen categories as all-zero vectors. The model degrades gracefully instead of crashing.
- Metadata is as important as the model. The version, training date, feature list, and CV score saved alongside the pipeline make it possible to audit, compare, and debug the model months later. The pipeline file answers "what does this model do." The metadata file answers "when was it trained, on what, and how well."
- The pipeline is the contract between data science and engineering. The data engineer does not need to understand imputation strategies or engagement ratio formulas. They need to call pipeline.predict(df) and get predictions. The pipeline encapsulates the complexity.
Discussion Questions
- The team discovered that the sequential function approach had a fit/transform leakage bug that inflated the AUC by approximately 0.03. In a production churn prediction system, what are the business consequences of this overestimation?
- The custom StreamFlowFeatureEngineer transformer computes recency_score as days_since_last_login / 30.0, clipped to [0, 1]. This normalization is hardcoded, not learned from data. Is this a problem? When would you want to learn the normalization range from training data instead?
- The pipeline uses SelectKBest(k=15), which was chosen as a reasonable default. How would you determine the optimal k? What pipeline feature (covered later in Chapter 18) would you use?
- The MissingIndicatorAdder transformer learns which columns have missingness above the threshold during fit. What happens if a column that had 2% missingness during training suddenly has 0% missingness in production data? Does the pipeline still work? Should you be concerned?
This case study supports Chapter 10: Building Reproducible Data Pipelines. Return to the chapter for the full discussion.