Case Study 1: StreamFlow --- Assembling the Full Preprocessing Pipeline


Background

Over the last five chapters, the StreamFlow data science team has built individual preprocessing components: a SQL extraction query (Chapter 5), feature engineering logic (Chapter 6), categorical encoding (Chapter 7), missing data imputation (Chapter 8), and feature selection (Chapter 9). Each component lives in a separate notebook. Each was validated independently.

Now the team needs to assemble them into a single pipeline for production deployment. The goal is straightforward: given a raw subscriber DataFrame from the data warehouse, produce churn predictions. One input, one output, no manual steps in between.

The integration should take a day. It takes two weeks.


The Integration Problem

The first attempt is to run the notebooks in sequence. The data engineer writes a script that calls each notebook's core function in order:

# Attempt 1: sequential function calls
df_raw = extract_subscriber_data(conn, prediction_date='2025-02-01')
df_features = engineer_features(df_raw)
df_encoded = encode_categoricals(df_features)
df_imputed = impute_missing(df_encoded)
df_selected = select_features(df_imputed, y_train)
predictions = model.predict(df_selected)

This breaks immediately. The issues:

Issue 1: Column name mismatch. The encode_categoricals function expects a column called plan_tier, but engineer_features renamed it to plan_type during feature engineering. Nobody noticed because the notebooks were always run in the same session with the same variable names.

Issue 2: Fit/transform leakage. The impute_missing function calls SimpleImputer().fit_transform(df) every time. During development, this was fine --- the imputer was always fitted on the training data. In the sequential pipeline, it is fitted on whatever data is passed in, including test data.

Issue 3: Feature ordering. The select_features function returns columns in a different order than the model expects. The model was trained on alphabetically sorted features. The selection function returns features in importance-ranked order.

Issue 4: Stale state. The encode_categoricals function uses a OneHotEncoder that was fitted during Chapter 7's development. It does not know about the enterprise plan type that was added to the product last month. One-hot encoding fails at prediction time for subscribers on the new plan.

Each of these issues is trivial to fix individually. Together, they represent a systemic problem: the preprocessing logic is distributed across five separate stateful components with implicit dependencies. There is no single object that encodes the correct sequence of operations and the correct fitted parameters.
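
Issue 2 is the easiest of the four to miss, because the code runs without error either way. The following is a minimal sketch of the difference, using toy numbers that are assumptions for illustration rather than StreamFlow data. An imputer fitted once on training data fills gaps with the training median. Calling fit_transform on each incoming batch quietly re-learns the median from that batch.

```python
import numpy as np
from sklearn.impute import SimpleImputer

# Toy data (illustrative values only): one numeric feature with gaps.
X_train = np.array([[1.0], [2.0], [3.0], [np.nan]])
X_new = np.array([[100.0], [200.0], [np.nan]])

# Correct: learn the median from training data, then reuse it.
imputer = SimpleImputer(strategy='median')
imputer.fit(X_train)
filled_correct = imputer.transform(X_new)

# Leaky: re-fitting on the incoming batch learns that batch's median.
filled_leaky = SimpleImputer(strategy='median').fit_transform(X_new)

print(filled_correct[-1, 0])  # 2.0, the training median
print(filled_leaky[-1, 0])    # 150.0, the median of the new batch
```

Both calls return a fully imputed array, so nothing in the output shape or dtype reveals which statistic was used.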


The Pipeline Solution

The senior data scientist rewrites the entire preprocessing workflow as a single scikit-learn Pipeline.

Step 1: Define Custom Transformers

The domain-specific logic from Chapters 6 and 8 becomes custom transformer classes:

import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class StreamFlowFeatureEngineer(BaseEstimator, TransformerMixin):
    """Create StreamFlow domain features from raw subscriber data.

    Computes engagement ratios, trend features, and recency scores
    as designed in Chapter 6.
    """

    def __init__(self, fill_value=0.0):
        self.fill_value = fill_value

    def fit(self, X, y=None):
        self.feature_names_in_ = list(X.columns)
        return self

    def transform(self, X):
        X = pd.DataFrame(X, columns=self.feature_names_in_).copy()

        # Engagement ratios
        X['hours_per_session'] = np.where(
            X['sessions_last_7d'] > 0,
            X['total_hours_last_30d'] / X['sessions_last_7d'],
            self.fill_value
        )

        X['tickets_per_tenure_month'] = np.where(
            X['tenure_months'] > 0,
            X['support_tickets_last_90d'] / X['tenure_months'],
            self.fill_value
        )

        X['charge_per_hour'] = np.where(
            X['total_hours_last_30d'] > 0,
            X['monthly_charge'] / X['total_hours_last_30d'],
            self.fill_value
        )

        # Recency score (0 = logged in today, 1 = 30+ days since login)
        X['recency_score'] = (X['days_since_last_login'].clip(0, 30) / 30.0)

        return X

    def get_feature_names_out(self, input_features=None):
        return (list(self.feature_names_in_) +
                ['hours_per_session', 'tickets_per_tenure_month',
                 'charge_per_hour', 'recency_score'])


class MissingIndicatorAdder(BaseEstimator, TransformerMixin):
    """Add binary missing indicators for high-missingness features.

    Learns which columns have missingness at or above the threshold during fit.
    At transform time, adds indicator columns for those features.
    """

    def __init__(self, threshold=0.01):
        self.threshold = threshold

    def fit(self, X, y=None):
        X = pd.DataFrame(X)
        missing_rates = X.isna().mean()
        self.missing_columns_ = missing_rates[
            missing_rates >= self.threshold
        ].index.tolist()
        self.feature_names_in_ = list(X.columns)
        return self

    def transform(self, X):
        X = pd.DataFrame(X, columns=self.feature_names_in_).copy()
        for col in self.missing_columns_:
            X[f'{col}_missing'] = X[col].isna().astype(int)
        return X

    def get_feature_names_out(self, input_features=None):
        indicators = [f'{col}_missing' for col in self.missing_columns_]
        return list(self.feature_names_in_) + indicators
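
The engagement ratios in StreamFlowFeatureEngineer all use the same np.where guard, and it may help to see how that guard treats missing values. The sketch below uses toy values (assumed for illustration) to show the four cases. A missing denominator fails the > 0 test and receives fill_value, while a missing numerator with a valid denominator passes through as NaN for a later imputation step to handle.

```python
import numpy as np
import pandas as pd

# Toy values (assumed for illustration): four subscribers.
hours = pd.Series([8.0, 3.0, 5.0, np.nan])
sessions = pd.Series([4.0, 0.0, np.nan, 2.0])

fill_value = 0.0
ratio = np.where(sessions > 0, hours / sessions, fill_value)

# row 0: 8 / 4 = 2.0
# row 1: zero sessions, so the guard returns fill_value
# row 2: missing sessions, NaN > 0 is False, so the guard returns fill_value
# row 3: missing hours with valid sessions, so NaN passes through
print(ratio)
```

One consequence worth checking against the intended design: in row 2 the information that sessions was missing is replaced by fill_value in the ratio column, so only the source column still carries it.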

Step 2: Define the ColumnTransformer

The numeric and categorical pipelines are defined separately and combined:

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.feature_selection import SelectKBest, f_classif

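# Column definitions
# The three *_missing names assume MissingIndicatorAdder flags these source
# columns at fit time. Indicators created for any other column are not listed
# here, so remainder='drop' discards them.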
num_cols = [
    'tenure_months', 'monthly_charge', 'total_hours_last_30d',
    'sessions_last_7d', 'support_tickets_last_90d',
    'days_since_last_login', 'hours_per_session',
    'tickets_per_tenure_month', 'charge_per_hour', 'recency_score',
    'total_hours_last_30d_missing', 'sessions_last_7d_missing',
    'days_since_last_login_missing'
]
cat_cols = ['plan_type', 'device_type']

numeric_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

categorical_pipeline = Pipeline([
    ('encoder', OneHotEncoder(
        sparse_output=False,
        handle_unknown='ignore',
        min_frequency=50  # Collapse rare categories
    ))
])

preprocessor = ColumnTransformer([
    ('num', numeric_pipeline, num_cols),
    ('cat', categorical_pipeline, cat_cols)
], remainder='drop')

Step 3: Assemble the Full Pipeline

from sklearn.linear_model import LogisticRegression

streamflow_pipeline = Pipeline([
    ('feature_engineer', StreamFlowFeatureEngineer(fill_value=0.0)),
    ('missing_indicators', MissingIndicatorAdder(threshold=0.01)),
    ('preprocessor', preprocessor),
    ('selector', SelectKBest(f_classif, k=15)),
    ('model', LogisticRegression(random_state=42, max_iter=1000))
])
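
Once a pipeline like this is fitted, it can be useful to inspect what actually reaches the final estimator. The sketch below uses a smaller stand-in pipeline on toy data (both are assumptions for illustration, not the StreamFlow pipeline). Slicing with [:-1] gives every step except the model, and named_steps gives access to a single fitted step.

```python
import numpy as np
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Toy data (assumed): 200 rows, 4 numeric features, a few missing values.
rng = np.random.default_rng(0)
X = rng.normal(size=(200, 4))
y = (X[:, 0] + 0.5 * X[:, 1] > 0).astype(int)
X[rng.random(X.shape) < 0.05] = np.nan

toy_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
    ('selector', SelectKBest(f_classif, k=2)),
    ('model', LogisticRegression(max_iter=1000)),
])
toy_pipeline.fit(X, y)

# Everything except the final estimator: the matrix the model actually sees.
model_input = toy_pipeline[:-1].transform(X)
print(model_input.shape)

# Boolean mask over the selector's input columns (True = kept).
kept = toy_pipeline.named_steps['selector'].get_support()
print(kept)
```

The same two calls should work on streamflow_pipeline after fitting, which gives a quick way to confirm that the selector receives the expected number of columns.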

Step 4: Validate

from sklearn.model_selection import cross_val_score, train_test_split

# Simulate full StreamFlow dataset
np.random.seed(42)
n = 50000

df = pd.DataFrame({
    'tenure_months': np.random.exponential(18, n).clip(1, 120).astype(int),
    'monthly_charge': np.random.uniform(9.99, 49.99, n).round(2),
    'total_hours_last_30d': np.where(
        np.random.random(n) < 0.12, np.nan,
        np.random.exponential(15, n).round(1)
    ),
    'sessions_last_7d': np.where(
        np.random.random(n) < 0.12, np.nan,
        np.random.poisson(5, n).astype(float)
    ),
    'support_tickets_last_90d': np.random.poisson(0.8, n),
    'days_since_last_login': np.where(
        np.random.random(n) < 0.05, np.nan,
        np.random.exponential(5, n).round(0)
    ),
    'plan_type': np.random.choice(
        ['basic', 'standard', 'premium'], n, p=[0.5, 0.35, 0.15]
    ),
    'device_type': np.random.choice(
        ['mobile', 'desktop', 'tablet', 'smart_tv'], n, p=[0.4, 0.3, 0.2, 0.1]
    ),
})

# Synthetic churn target
df['churned'] = (
    (df['total_hours_last_30d'].fillna(0) < 5).astype(int) * 0.3
    + (df['support_tickets_last_90d'] > 2).astype(int) * 0.2
    + (df['tenure_months'] < 6).astype(int) * 0.15
    + (df['days_since_last_login'].fillna(30) > 14).astype(int) * 0.15
    + np.random.random(n) * 0.2
) > 0.5
df['churned'] = df['churned'].astype(int)

X = df.drop('churned', axis=1)
y = df['churned']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Cross-validation
cv_scores = cross_val_score(
    streamflow_pipeline, X_train, y_train,
    cv=5, scoring='roc_auc'
)

print("StreamFlow Pipeline v1 --- Cross-Validation Results")
print("=" * 55)
print(f"  CV AUC (mean): {cv_scores.mean():.4f}")
print(f"  CV AUC (std):  {cv_scores.std():.4f}")
print(f"  Per-fold:      {cv_scores.round(4)}")

StreamFlow Pipeline v1 --- Cross-Validation Results
=======================================================
  CV AUC (mean): 0.8143
  CV AUC (std):  0.0045
  Per-fold:      [0.8112 0.8189 0.8097 0.8201 0.8116]
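
Passing the whole pipeline to cross-validation matters because every step is re-fitted on each fold's training portion. A minimal sketch of how to observe that follows, using a two-step stand-in pipeline and toy data (both assumptions for illustration). cross_validate with return_estimator=True hands back one fitted pipeline per fold, and each one carries the imputer median learned from its own training rows.

```python
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_validate
from sklearn.pipeline import Pipeline

# Toy data (assumed): one informative feature with roughly 10% missing values.
rng = np.random.default_rng(42)
X = rng.normal(size=(500, 1))
y = (X[:, 0] + rng.normal(scale=0.5, size=500) > 0).astype(int)
X[rng.random(500) < 0.10, 0] = np.nan

toy_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('model', LogisticRegression(max_iter=1000)),
])

results = cross_validate(
    toy_pipeline, X, y, cv=5, scoring='roc_auc', return_estimator=True
)

# One fitted pipeline per fold, each with its own learned median.
fold_medians = [
    est.named_steps['imputer'].statistics_[0] for est in results['estimator']
]
print(fold_medians)
print(results['test_score'].round(3))
```

No fold's held-out rows contribute to the median used to score them, which is the property the sequential function approach could not guarantee.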

Step 5: Fit and Serialize

import joblib

# Fit on full training data
streamflow_pipeline.fit(X_train, y_train)

# Evaluate on holdout
from sklearn.metrics import roc_auc_score, classification_report

y_proba = streamflow_pipeline.predict_proba(X_test)[:, 1]
y_pred = streamflow_pipeline.predict(X_test)

print(f"\nHoldout AUC: {roc_auc_score(y_test, y_proba):.4f}")
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

# Save
joblib.dump(streamflow_pipeline, 'streamflow_pipeline_v1.joblib')

import sklearn

metadata = {
    'version': 'v1',
    'sklearn_version': sklearn.__version__,  # record the version actually used
    'trained_date': '2025-02-01',
    'training_rows': len(X_train),
    'features': list(X_train.columns),
    'cv_auc_mean': float(cv_scores.mean()),
    'cv_auc_std': float(cv_scores.std()),
    'random_state': 42,
}
joblib.dump(metadata, 'streamflow_pipeline_v1_meta.joblib')

print("\nPipeline and metadata saved.")

Holdout AUC: 0.8178

Classification Report:
              precision    recall  f1-score   support

           0       0.80      0.82      0.81      5247
           1       0.73      0.70      0.71      4753

    accuracy                           0.76     10000
   macro avg       0.76      0.76      0.76     10000
weighted avg       0.76      0.76      0.76     10000

Pipeline and metadata saved.
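
The saved sklearn_version entry is only useful if something reads it back. scikit-learn does not guarantee that a serialized estimator loads correctly under a different library version, so one option is to compare versions before using a loaded pipeline. The helper below is a hypothetical sketch (check_sklearn_version is not part of scikit-learn or joblib) that assumes the metadata keys shown above.

```python
def check_sklearn_version(metadata, runtime_version):
    """Compare the training-time scikit-learn version with the runtime one.

    Hypothetical helper, not part of scikit-learn or joblib. Returns a
    warning string when the versions differ, otherwise None.
    """
    trained_with = metadata.get('sklearn_version')
    if trained_with is None:
        return "Metadata has no 'sklearn_version' entry; cannot verify."
    if trained_with != runtime_version:
        return (
            f"Pipeline was trained with scikit-learn {trained_with} "
            f"but is being loaded under {runtime_version}."
        )
    return None


# Illustrative metadata with the same keys as the dict saved above.
example_metadata = {'version': 'v1', 'sklearn_version': '1.5.2'}

same = check_sklearn_version(example_metadata, '1.5.2')
different = check_sklearn_version(example_metadata, '1.6.0')
print(same)
print(different)
```

In a serving process the second argument would typically be sklearn.__version__, and whether a mismatch should log a warning or block the load is a team decision.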

Step 6: Verify Serialization Round-Trip

# Load and verify
loaded_pipeline = joblib.load('streamflow_pipeline_v1.joblib')
loaded_proba = loaded_pipeline.predict_proba(X_test)[:, 1]

assert np.allclose(y_proba, loaded_proba), "Predictions do not match!"
print("Serialization round-trip verified: predictions are identical.")

Serialization round-trip verified: predictions are identical.

What Changed

The two weeks of integration work produced a single object that replaces the five notebooks:

Before (5 Notebooks)                                 | After (1 Pipeline)
-----------------------------------------------------+-------------------------------------------------------
5 separate functions with implicit ordering          | 1 Pipeline with enforced ordering
5 fitted objects to serialize separately             | 1 joblib file
Column names tracked manually                        | Column names tracked by transformers
Fit/transform correctness depends on the developer   | Fit/transform correctness enforced by the Pipeline API
Cross-validation requires manual fold management     | cross_val_score(pipeline, X, y)
New team member needs to read 5 notebooks            | New team member reads the pipeline definition
Production deployment requires orchestrating 5 steps | Production loads 1 file and calls .predict()

Lessons Learned

  1. Integration is where reproducibility dies. Each individual component can be correct, but the connections between them --- column names, feature ordering, fit/transform state --- are implicit contracts that break silently. A Pipeline makes those contracts explicit and enforceable.

  2. Custom transformers are the bridge between domain knowledge and the scikit-learn API. The engagement ratio calculations and missing indicators from Chapters 6 and 8 were originally ad-hoc pandas operations. Wrapping them in BaseEstimator / TransformerMixin classes makes them testable, reusable, and compatible with cross-validation.

  3. Setting handle_unknown='ignore' is not optional in production. The enterprise plan type broke the sequential approach because the encoder had never seen it. In the pipeline, handle_unknown='ignore' encodes unseen categories as all-zero vectors. The model degrades gracefully instead of crashing.

  4. Metadata is as important as the model. The version, training date, feature list, and CV score saved alongside the pipeline make it possible to audit, compare, and debug the model months later. The pipeline file answers "what does this model do." The metadata file answers "when was it trained, on what, and how well."

  5. The pipeline is the contract between data science and engineering. The data engineer does not need to understand imputation strategies or engagement ratio formulas. They need to call pipeline.predict(df) and get predictions. The pipeline encapsulates the complexity.
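
The contract described in lessons 1 and 5 can also be checked at the boundary, before the data reaches pipeline.predict. The following is a minimal sketch, assuming a hypothetical helper named align_columns and a shortened column list chosen for illustration. It fails loudly on a renamed column such as plan_tier, drops unexpected extras, and returns the columns in a fixed order.

```python
import pandas as pd

# Assumed subset of the raw input columns, for illustration only.
EXPECTED_COLUMNS = ['tenure_months', 'monthly_charge', 'plan_type']


def align_columns(df, expected):
    """Hypothetical helper: validate and order input columns.

    Raises ValueError if any expected column is absent. Extra columns are
    dropped and the result follows the order given in `expected`.
    """
    missing = [col for col in expected if col not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    return df[expected]


# Shuffled order plus an extra column: accepted and normalized.
good = pd.DataFrame({
    'plan_type': ['basic'], 'internal_id': [7],
    'monthly_charge': [9.99], 'tenure_months': [12],
})
aligned = align_columns(good, EXPECTED_COLUMNS)
print(list(aligned.columns))

# The renamed column from Issue 1: rejected with a clear message.
bad = good.rename(columns={'plan_type': 'plan_tier'})
try:
    align_columns(bad, EXPECTED_COLUMNS)
    error_message = None
except ValueError as exc:
    error_message = str(exc)
print(error_message)
```

A check like this turns a silent mismatch into an immediate, named error at the point where engineering hands data to the pipeline.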


Discussion Questions

  1. The team discovered that the sequential function approach had a fit/transform leakage bug that inflated the AUC by approximately 0.03. In a production churn prediction system, what are the business consequences of this overestimation?

  2. The custom StreamFlowFeatureEngineer transformer computes recency_score as days_since_last_login / 30.0, clipped to [0, 1]. This normalization is hardcoded, not learned from data. Is this a problem? When would you want to learn the normalization range from training data instead?

  3. The pipeline uses SelectKBest(k=15), which was chosen as a reasonable default. How would you determine the optimal k? What pipeline feature (covered later in Chapter 18) would you use?

  4. The MissingIndicatorAdder transformer learns which columns have missingness above the threshold during fit. What happens if a column that had 2% missingness during training suddenly has 0% missingness in production data? Does the pipeline still work? Should you be concerned?


This case study supports Chapter 10: Building Reproducible Data Pipelines. Return to the chapter for the full discussion.