In This Chapter
- scikit-learn Pipelines, Column Transformers, and Custom Transformers
- The Model That Worked on Tuesday
- Why Pipelines Exist
- The Pipeline API
- ColumnTransformer: Different Transformations for Different Features
- The Full Pipeline: Preprocessing + Model
- Pipelines and Cross-Validation
- Custom Transformers
- Feature Names and Pipeline Introspection
- Ensuring Reproducibility
- Serialization with joblib
- Putting It All Together: The StreamFlow Pipeline
- Advanced Pipeline Patterns
- Common Pipeline Mistakes
- Progressive Project M3 (Part 4): The StreamFlow Preprocessing Pipeline
- Chapter Summary
Chapter 10: Building Reproducible Data Pipelines
scikit-learn Pipelines, Column Transformers, and Custom Transformers
Learning Objectives
By the end of this chapter, you will be able to:
- Build end-to-end scikit-learn Pipelines that chain preprocessing and modeling
- Use ColumnTransformer to apply different transformations to different feature types
- Write custom transformers (BaseEstimator, TransformerMixin) for domain-specific logic
- Ensure pipeline reproducibility with versioning and random seeds
- Serialize and deserialize pipelines with joblib
The Model That Worked on Tuesday
War Story --- A fintech startup spent four months building a credit-risk model. The data scientist who built it reported an AUC of 0.88 on the holdout set. The engineering team deployed it. Two weeks later, the model was producing scores that made no sense --- customers with perfect payment histories were flagged as high risk, and customers with recent defaults were scored as low risk.
The root cause took three days to find. The data scientist's notebook applied StandardScaler before one-hot encoding. The production pipeline applied one-hot encoding before StandardScaler. The feature order was different. The scaling parameters were computed on a different column arrangement. The model was receiving inputs in a format it had never seen during training. Nothing crashed. No error was thrown. The predictions were simply wrong.
The fix took fifteen minutes: wrap everything in a scikit-learn Pipeline. The investigation, the rollback, the redeployment, and the customer complaints took three weeks and cost an estimated $340,000 in misclassified loan decisions.
If your preprocessing steps are not in a Pipeline, your results are not reproducible --- they are folklore.
That is not hyperbole. Every chapter in Part II has introduced a preprocessing technique: SQL extraction, feature engineering, categorical encoding, imputation, feature selection. Each technique is powerful on its own. But the moment you have more than one transformation, the order in which they are applied, the data they are fitted on, and the parameters they learn all become dependencies that must be tracked, reproduced, and deployed as a single unit.
A Jupyter notebook with ten cells of preprocessing code is not a pipeline. It is a sequence of manual operations that works today because you ran the cells in the right order, on the right data, with the right library versions. Run them out of order, refit a scaler on the test set, or swap two columns, and you get the war story above.
scikit-learn's Pipeline and ColumnTransformer solve this problem. They encode the entire preprocessing-to-prediction workflow as a single object that can be fitted, evaluated, serialized, and deployed. This chapter shows you how to build them, extend them with custom transformers, and save them for production.
Why Pipelines Exist
The Manual Approach and Its Failure Modes
Consider the preprocessing steps we have built across Chapters 5--9 for the StreamFlow churn dataset:
- Impute missing values (Chapter 8)
- Encode categorical features (Chapter 7)
- Scale numeric features (Chapter 6)
- Select features (Chapter 9)
- Train a model (Chapter 11, but we need the interface now)
Without a pipeline, the code looks something like this:
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
np.random.seed(42)
# Simulate StreamFlow data for demonstration
n = 10000
df = pd.DataFrame({
'tenure_months': np.random.exponential(18, n).clip(1, 120).astype(int),
'monthly_charge': np.random.uniform(9.99, 49.99, n).round(2),
'total_hours_last_30d': np.where(
np.random.random(n) < 0.12, np.nan,
np.random.exponential(15, n).round(1)
),
'sessions_last_7d': np.where(
np.random.random(n) < 0.12, np.nan,
np.random.poisson(5, n).astype(float)
),
'support_tickets_last_90d': np.random.poisson(0.8, n),
'plan_type': np.random.choice(['basic', 'standard', 'premium'], n, p=[0.5, 0.35, 0.15]),
'device_type': np.random.choice(['mobile', 'desktop', 'tablet', 'smart_tv'], n, p=[0.4, 0.3, 0.2, 0.1]),
})
df['churned'] = (
(df['total_hours_last_30d'].fillna(0) < 5).astype(int) * 0.3
+ (df['support_tickets_last_90d'] > 2).astype(int) * 0.2
+ (df['tenure_months'] < 6).astype(int) * 0.15
+ np.random.random(n) * 0.35
) > 0.5
df['churned'] = df['churned'].astype(int)
X = df.drop('churned', axis=1)
y = df['churned']
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Step 1: Impute numeric features
num_cols = ['tenure_months', 'monthly_charge', 'total_hours_last_30d',
'sessions_last_7d', 'support_tickets_last_90d']
cat_cols = ['plan_type', 'device_type']
imputer = SimpleImputer(strategy='median')
X_train_num = imputer.fit_transform(X_train[num_cols])
X_test_num = imputer.transform(X_test[num_cols]) # correct: transform only
# Step 2: Encode categorical features
encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
X_train_cat = encoder.fit_transform(X_train[cat_cols])
X_test_cat = encoder.transform(X_test[cat_cols])
# Step 3: Combine and scale
X_train_combined = np.hstack([X_train_num, X_train_cat])
X_test_combined = np.hstack([X_test_num, X_test_cat])
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_combined)
X_test_scaled = scaler.transform(X_test_combined)
# Step 4: Select features
selector = SelectKBest(f_classif, k=8)
X_train_selected = selector.fit_transform(X_train_scaled, y_train)
X_test_selected = selector.transform(X_test_scaled)
# Step 5: Train model
model = LogisticRegression(random_state=42, max_iter=1000)
model.fit(X_train_selected, y_train)
print(f"Train accuracy: {model.score(X_train_selected, y_train):.4f}")
print(f"Test accuracy: {model.score(X_test_selected, y_test):.4f}")
This code works. It also has at least four ways to silently break:
Failure mode 1: Fit/transform confusion. Replace imputer.transform(X_test[num_cols]) with imputer.fit_transform(X_test[num_cols]) and you have test-set leakage. The imputer now learns medians from the test data. No error is raised. The evaluation metric is optimistic.
Failure mode 2: Column order mismatch. If the production system sends features in a different order than the training data, every downstream transformation produces garbage. The scaler subtracts the wrong mean. The model multiplies by the wrong coefficients.
Failure mode 3: Deployment fragmentation. To deploy this model, you must serialize and ship five separate objects: the imputer, the encoder, the scaler, the selector, and the model. Miss one, load a stale version of one, and the system silently produces wrong predictions.
Failure mode 4: Reproducibility gap. Six months later, a new team member needs to retrain the model on fresh data. They read the notebook. They run the cells. They get different results because they refactored the code and inadvertently applied scaling before imputation. Nothing in the code enforces the ordering.
Pipelines eliminate all four failure modes.
The Pipeline API
Your First Pipeline
A scikit-learn Pipeline chains a sequence of transformers followed by a final estimator. Each step has a name and an object:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
pipe = Pipeline([
('scaler', StandardScaler()),
('model', LogisticRegression(random_state=42, max_iter=1000))
])
# fit calls scaler.fit_transform(X_train, y_train), then model.fit(result, y_train)
train_medians = X_train[num_cols].median()  # impute manually for now; a proper imputer step comes next
pipe.fit(X_train[num_cols].fillna(train_medians), y_train)
# predict calls scaler.transform(X_test), then model.predict(result)
y_pred = pipe.predict(X_test[num_cols].fillna(train_medians))  # fill with TRAIN medians, never test medians
When you call pipe.fit(X, y):
- The first step calls scaler.fit_transform(X, y) --- fitting and transforming in one pass
- The transformed output is passed to the next step
- The final step calls model.fit(transformed_X, y)
When you call pipe.predict(X):
- The first step calls scaler.transform(X) --- transform only, no fitting
- The transformed output is passed to the next step
- The final step calls model.predict(transformed_X)
This single design choice --- fit_transform during training, transform during prediction --- eliminates the fit/transform confusion permanently. You cannot accidentally refit the scaler on test data because the pipeline enforces the correct behavior.
make_pipeline: Less Typing, Same Result
If you do not need custom step names, make_pipeline generates them automatically from the class names:
from sklearn.pipeline import make_pipeline
pipe = make_pipeline(
StandardScaler(),
LogisticRegression(random_state=42, max_iter=1000)
)
# Step names are auto-generated: 'standardscaler', 'logisticregression'
print(pipe.named_steps)
{'standardscaler': StandardScaler(), 'logisticregression': LogisticRegression(max_iter=1000, random_state=42)}
Practitioner Note --- Use Pipeline with explicit names in production code and shared projects. Use make_pipeline in exploratory analysis and prototyping. The explicit names make debugging, logging, and hyperparameter tuning significantly easier.
Accessing Pipeline Steps
You can access individual steps by name or index:
# By name
pipe.named_steps['standardscaler']
# By index
pipe[0] # StandardScaler
pipe[-1] # LogisticRegression
# Get the fitted scaler's parameters
print(f"Feature means: {pipe.named_steps['standardscaler'].mean_[:3]}")
print(f"Feature stds: {pipe.named_steps['standardscaler'].scale_[:3]}")
ColumnTransformer: Different Transformations for Different Features
The Problem
The simple Pipeline above works when every feature gets the same transformation. In practice, that almost never happens. Numeric features need imputation and scaling. Categorical features need encoding. Some features need no transformation at all.
ColumnTransformer applies different transformer pipelines to different subsets of columns, then concatenates the results:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
# Define column groups
num_cols = ['tenure_months', 'monthly_charge', 'total_hours_last_30d',
'sessions_last_7d', 'support_tickets_last_90d']
cat_cols = ['plan_type', 'device_type']
# Define per-group pipelines
numeric_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_pipeline = Pipeline([
('encoder', OneHotEncoder(sparse_output=False, handle_unknown='ignore'))
])
# Combine with ColumnTransformer
preprocessor = ColumnTransformer([
('num', numeric_pipeline, num_cols),
('cat', categorical_pipeline, cat_cols)
])
Each entry in the ColumnTransformer is a tuple of (name, transformer, columns). The columns can be specified as:
- A list of column names (for DataFrames): ['tenure_months', 'monthly_charge']
- A list of integer indices (for arrays): [0, 1, 2]
- A boolean mask: [True, True, False, False, True]
- A callable that returns a list of columns
- A make_column_selector object (see below)
make_column_transformer: The Shorthand
Like make_pipeline, there is a shorthand for ColumnTransformer:
from sklearn.compose import make_column_transformer
preprocessor = make_column_transformer(
(numeric_pipeline, num_cols),
(categorical_pipeline, cat_cols)
)
Practitioner Note --- Use ColumnTransformer with explicit names in production. Use make_column_transformer for prototyping. The same reasoning applies: explicit names make debugging and hyperparameter tuning easier.
Dynamic Column Selection with make_column_selector
Hardcoding column names works for fixed datasets. For more flexible pipelines, use make_column_selector to select columns by dtype:
from sklearn.compose import make_column_selector
preprocessor = ColumnTransformer([
('num', numeric_pipeline, make_column_selector(dtype_include='number')),
('cat', categorical_pipeline, make_column_selector(dtype_include='object'))
])
This is powerful but requires that your DataFrame dtypes are set correctly before entering the pipeline. A plan_type column stored as integer codes instead of strings will be routed to the numeric pipeline. Always verify dtypes before fitting.
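To make the trap concrete, here is a hedged sketch on a toy frame with a hypothetical plan_code column; pandas' select_dtypes applies the same dtype logic that make_column_selector uses:

```python
import pandas as pd

# Toy frame (hypothetical): plan_code stores category codes as integers.
df = pd.DataFrame({
    'monthly_charge': [9.99, 19.99],
    'plan_type': ['basic', 'premium'],
    'plan_code': [0, 2],
})

# A dtype-based selector would route plan_code to the numeric pipeline:
print(df.select_dtypes(include='number').columns.tolist())
# ['monthly_charge', 'plan_code']

# Cast to a string dtype so it is treated as categorical:
df['plan_code'] = df['plan_code'].astype(str)
print(df.select_dtypes(include='number').columns.tolist())
# ['monthly_charge']
```

The explicit cast is cheap insurance: do it once, right after loading the data, before any pipeline sees it.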
The remainder Parameter
By default, columns not mentioned in any transformer are dropped. The remainder parameter controls this:
# Drop unspecified columns (default)
preprocessor = ColumnTransformer(
transformers=[('num', numeric_pipeline, num_cols)],
remainder='drop'
)
# Pass unspecified columns through unchanged
preprocessor = ColumnTransformer(
transformers=[('num', numeric_pipeline, num_cols)],
remainder='passthrough'
)
# Apply a transformer to unspecified columns
preprocessor = ColumnTransformer(
transformers=[('num', numeric_pipeline, num_cols)],
remainder=StandardScaler()
)
Practitioner Note --- Use remainder='passthrough' during development so you do not accidentally lose features. Switch to explicit column lists and remainder='drop' in production, so that unexpected new columns in the input data cause a visible failure instead of silently passing through.
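A minimal sketch on a toy two-column frame (hypothetical columns 'a' and 'b') makes the difference visible in the output shape:

```python
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler

# Toy frame: only column 'a' is listed in a transformer.
df = pd.DataFrame({'a': [1.0, 2.0, 3.0], 'b': [10.0, 20.0, 30.0]})

ct_drop = ColumnTransformer([('num', StandardScaler(), ['a'])],
                            remainder='drop')
ct_pass = ColumnTransformer([('num', StandardScaler(), ['a'])],
                            remainder='passthrough')

print(ct_drop.fit_transform(df).shape)  # (3, 1) -- column 'b' silently dropped
print(ct_pass.fit_transform(df).shape)  # (3, 2) -- column 'b' passed through
```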
The Full Pipeline: Preprocessing + Model
Now we combine the ColumnTransformer with a model in a single Pipeline:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression
num_cols = ['tenure_months', 'monthly_charge', 'total_hours_last_30d',
'sessions_last_7d', 'support_tickets_last_90d']
cat_cols = ['plan_type', 'device_type']
# Nested pipeline: numeric features get imputation + scaling
numeric_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
# Nested pipeline: categorical features get one-hot encoding
categorical_pipeline = Pipeline([
('encoder', OneHotEncoder(sparse_output=False, handle_unknown='ignore'))
])
# ColumnTransformer combines the two
preprocessor = ColumnTransformer([
('num', numeric_pipeline, num_cols),
('cat', categorical_pipeline, cat_cols)
])
# Full pipeline: preprocessing -> feature selection -> model
full_pipeline = Pipeline([
('preprocessor', preprocessor),
('selector', SelectKBest(f_classif, k=8)),
('model', LogisticRegression(random_state=42, max_iter=1000))
])
# One call to fit. One call to predict.
full_pipeline.fit(X_train, y_train)
print(f"Train accuracy: {full_pipeline.score(X_train, y_train):.4f}")
print(f"Test accuracy: {full_pipeline.score(X_test, y_test):.4f}")
Train accuracy: 0.7241
Test accuracy: 0.7175
Compare this to the manual approach. The entire preprocessing-to-prediction workflow is a single object. There is no opportunity to fit the imputer on the test set. There is no column ordering to track. There is one object to serialize for deployment.
Nested Pipeline Structure
The full pipeline has a tree structure:
full_pipeline
|-- preprocessor (ColumnTransformer)
| |-- num (Pipeline)
| | |-- imputer (SimpleImputer)
| | |-- scaler (StandardScaler)
| |-- cat (Pipeline)
| |-- encoder (OneHotEncoder)
|-- selector (SelectKBest)
|-- model (LogisticRegression)
You can access any node using double-underscore notation:
# Access the numeric imputer's learned medians
print(full_pipeline.named_steps['preprocessor']
.named_transformers_['num']
.named_steps['imputer']
.statistics_)
# Shorter: use set_params/get_params with __ notation
print(full_pipeline.get_params()['preprocessor__num__imputer__strategy'])
[12. 24.99 14.9 5. 1. ]
median
This double-underscore path notation is essential for hyperparameter tuning (Chapter 18), where you will pass parameter grids like {'preprocessor__num__imputer__strategy': ['mean', 'median']} to GridSearchCV.
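The mechanics can be previewed now with a minimal sketch (a small stand-in pipeline, not the full StreamFlow one): double-underscore paths address any nested step, which is exactly how GridSearchCV sets candidate parameters internally.

```python
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Nested pipeline: 'prep' contains an imputer and a scaler.
pipe = Pipeline([
    ('prep', Pipeline([('imputer', SimpleImputer(strategy='median')),
                       ('scaler', StandardScaler())])),
    ('model', LogisticRegression(max_iter=1000)),
])

# Read a nested parameter through the __ path...
print(pipe.get_params()['prep__imputer__strategy'])  # median

# ...and set one, exactly as a search over a parameter grid would:
pipe.set_params(prep__imputer__strategy='mean')
print(pipe.named_steps['prep'].named_steps['imputer'].strategy)  # mean
```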
Pipelines and Cross-Validation
One of the most important benefits of pipelines is correct cross-validation. When preprocessing and modeling are in the same pipeline, cross_val_score refits the entire pipeline on each fold:
from sklearn.model_selection import cross_val_score
scores = cross_val_score(
full_pipeline, X_train, y_train,
cv=5, scoring='roc_auc'
)
print(f"CV AUC: {scores.mean():.4f} +/- {scores.std():.4f}")
print(f"Per-fold: {scores.round(4)}")
CV AUC: 0.7083 +/- 0.0112
Per-fold: [0.7178 0.6932 0.7054 0.7198 0.7054]
Without a pipeline, you would have to manually refit the imputer, encoder, scaler, and selector on each training fold. With a pipeline, cross_val_score handles it automatically. The imputer is fitted on fold 1 training data, transforms fold 1 training and validation data, and the result is discarded before fold 2 begins. No leakage.
Critical Point --- If you fit any preprocessing step on the full training set before passing data to cross_val_score, the cross-validation estimates are optimistic. The preprocessing step has already seen the validation fold. This is the most common source of inflated CV scores in practice. Pipelines prevent it by construction.
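The classic demonstration of this effect uses feature selection on pure noise; the sketch below (synthetic data, assumed shapes) is a hedged illustration, not the StreamFlow pipeline. Because the labels are random, the honest AUC is about 0.5 --- any score well above that is leakage.

```python
import numpy as np
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import make_pipeline

rng = np.random.default_rng(0)
X = rng.normal(size=(100, 500))      # pure noise features
y = rng.integers(0, 2, size=100)     # random labels -- true AUC is ~0.5

# Wrong: selector fitted on the full dataset before cross-validation.
X_leaky = SelectKBest(f_classif, k=10).fit_transform(X, y)
leaky = cross_val_score(LogisticRegression(max_iter=1000), X_leaky, y,
                        cv=5, scoring='roc_auc').mean()

# Right: selection happens inside each training fold.
clean = cross_val_score(
    make_pipeline(SelectKBest(f_classif, k=10),
                  LogisticRegression(max_iter=1000)),
    X, y, cv=5, scoring='roc_auc').mean()

print(f"leaky CV AUC: {leaky:.2f}")   # optimistic, well above 0.5
print(f"clean CV AUC: {clean:.2f}")   # near chance
```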
Custom Transformers
Why You Need Them
scikit-learn ships with transformers for standard operations: imputation, scaling, encoding, feature selection. But domain-specific preprocessing rarely fits neatly into those boxes.
In the StreamFlow project, Chapter 6 introduced features like hours_change_30d (change in viewing hours between the current 30-day window and the previous 30-day window). Chapter 8 introduced missing indicators. These are domain-specific transformations that do not exist in scikit-learn. To include them in a pipeline, you need custom transformers.
FunctionTransformer: The Quick Solution
For stateless transformations --- those that do not learn anything from the data --- FunctionTransformer wraps a plain function:
from sklearn.preprocessing import FunctionTransformer
import numpy as np
# Log-transform a feature (stateless --- no fitting required)
log_transformer = FunctionTransformer(
func=np.log1p,
inverse_func=np.expm1,
validate=True
)
# Add missing indicators (stateless)
def add_missing_indicators(X):
"""Add binary columns indicating where values were originally missing."""
X_out = X.copy()
for col in X.columns:
if X[col].isna().any():
X_out[f'{col}_missing'] = X[col].isna().astype(int)
return X_out
missing_indicator_transformer = FunctionTransformer(
func=add_missing_indicators,
validate=False # Accept DataFrames
)
Practitioner Note --- FunctionTransformer is perfect for quick prototyping. For anything that will go to production, write a proper custom transformer class. The class approach gives you get_params, set_params, proper cloning for cross-validation, and get_feature_names_out.
Writing a Full Custom Transformer
A proper custom transformer inherits from BaseEstimator and TransformerMixin:
from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
import numpy as np
class MissingIndicatorTransformer(BaseEstimator, TransformerMixin):
"""Add binary columns indicating original missingness.
For each column with missing values in the training data,
creates a corresponding '_missing' indicator column.
Parameters
----------
threshold : float, default=0.01
Minimum fraction of missing values in training data to
create an indicator. Columns with less missingness than
this threshold are skipped.
"""
def __init__(self, threshold=0.01):
self.threshold = threshold
def fit(self, X, y=None):
"""Learn which columns have missing values above the threshold."""
X = pd.DataFrame(X)
missing_rates = X.isna().mean()
self.missing_columns_ = missing_rates[
missing_rates >= self.threshold
].index.tolist()
self.feature_names_in_ = list(X.columns)
return self
def transform(self, X):
"""Add indicator columns for the learned missing columns."""
X = pd.DataFrame(X, columns=self.feature_names_in_)
X_out = X.copy()
for col in self.missing_columns_:
X_out[f'{col}_missing'] = X_out[col].isna().astype(int)
return X_out
def get_feature_names_out(self, input_features=None):
"""Return output feature names for pipeline introspection."""
base_features = list(self.feature_names_in_)
indicator_features = [f'{col}_missing' for col in self.missing_columns_]
return base_features + indicator_features
The contract is simple:
- __init__: Store parameters as attributes. Do not do any data-dependent work here.
- fit(X, y=None): Learn parameters from training data. Always return self. Attributes learned during fit should end with an underscore (e.g., missing_columns_).
- transform(X): Apply the learned transformation. Do not fit anything here.
- get_feature_names_out: Return the output column names. This powers pipeline introspection.
BaseEstimator provides get_params() and set_params() for free --- which means your transformer automatically works with GridSearchCV and clone(). TransformerMixin provides fit_transform() for free.
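A minimal sketch shows the inherited machinery in action; ThresholdDemo is a hypothetical stub, deliberately stripped to nothing but the contract:

```python
from sklearn.base import BaseEstimator, TransformerMixin, clone

class ThresholdDemo(BaseEstimator, TransformerMixin):
    """Hypothetical minimal transformer, just to show the inherited machinery."""
    def __init__(self, threshold=0.01):
        self.threshold = threshold   # stored verbatim, same name as the parameter
    def fit(self, X, y=None):
        return self
    def transform(self, X):
        return X

t = ThresholdDemo(threshold=0.05)
print(t.get_params())   # {'threshold': 0.05} -- provided by BaseEstimator
t2 = clone(t)           # fresh, unfitted copy -- what cross-validation does per fold
print(t2.threshold)     # 0.05
```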
A Domain-Specific Custom Transformer: Engagement Ratios
Here is a custom transformer that creates the engagement ratio features introduced in Chapter 6:
class EngagementRatioTransformer(BaseEstimator, TransformerMixin):
"""Create engagement ratio features for StreamFlow subscribers.
Computes:
- hours_per_session: total_hours_last_30d / sessions_last_7d
- tickets_per_tenure_month: support_tickets_last_90d / tenure_months
- charge_per_hour: monthly_charge / total_hours_last_30d
Parameters
----------
fill_value : float, default=0.0
Value to use when a denominator is zero.
"""
def __init__(self, fill_value=0.0):
self.fill_value = fill_value
def fit(self, X, y=None):
self.feature_names_in_ = list(X.columns) if hasattr(X, 'columns') else None
return self
def transform(self, X):
X = pd.DataFrame(X).copy()
# hours_per_session
X['hours_per_session'] = np.where(
X['sessions_last_7d'] > 0,
X['total_hours_last_30d'] / X['sessions_last_7d'],
self.fill_value
)
# tickets_per_tenure_month
X['tickets_per_tenure_month'] = np.where(
X['tenure_months'] > 0,
X['support_tickets_last_90d'] / X['tenure_months'],
self.fill_value
)
# charge_per_hour
X['charge_per_hour'] = np.where(
X['total_hours_last_30d'] > 0,
X['monthly_charge'] / X['total_hours_last_30d'],
self.fill_value
)
return X
def get_feature_names_out(self, input_features=None):
base = list(self.feature_names_in_) if self.feature_names_in_ else []
return base + ['hours_per_session', 'tickets_per_tenure_month',
'charge_per_hour']
Custom Transformer Rules
Rules for Custom Transformers
- Every __init__ parameter must be stored as an attribute with the same name: self.threshold = threshold, not self._thresh = threshold. get_params reads attribute names directly.
- Never modify __init__ parameters during fit. If you need to store learned values, create new attributes with trailing underscores.
- fit must return self. Always.
- transform must not fit anything. If you find yourself calling .fit() inside transform, you have a bug.
- Handle both DataFrames and arrays. Production systems may pass either. Convert to DataFrame at the top of transform if you need column names.
- Implement get_feature_names_out if you add or remove columns. Without it, downstream pipeline introspection breaks.
Feature Names and Pipeline Introspection
scikit-learn supports get_feature_names_out (added in 1.0) and the set_output API (added in 1.2) for tracking feature names through a pipeline. This is essential for interpretability --- you need to know which feature a coefficient or importance score corresponds to.
Enabling DataFrame Output
import sklearn
sklearn.set_config(transform_output='pandas')
# Or per-pipeline:
full_pipeline.set_output(transform='pandas')
With this setting, every transformer in the pipeline outputs a DataFrame with named columns instead of a numpy array.
Tracing Feature Names Through a ColumnTransformer
# After fitting the pipeline
full_pipeline.fit(X_train, y_train)
# Get feature names after preprocessing
preprocessor = full_pipeline.named_steps['preprocessor']
feature_names = preprocessor.get_feature_names_out()
print("Features after preprocessing:")
for i, name in enumerate(feature_names):
print(f" {i:2d}: {name}")
Features after preprocessing:
0: num__tenure_months
1: num__monthly_charge
2: num__total_hours_last_30d
3: num__sessions_last_7d
4: num__support_tickets_last_90d
5: cat__plan_type_basic
6: cat__plan_type_premium
7: cat__plan_type_standard
8: cat__device_type_desktop
9: cat__device_type_mobile
10: cat__device_type_smart_tv
11: cat__device_type_tablet
The prefix (num__, cat__) indicates which transformer pipeline produced each feature. This is invaluable when debugging: if a feature importance plot shows that cat__plan_type_premium is the top feature, you know exactly which transformer and which input column produced it.
Ensuring Reproducibility
A pipeline encodes the what of preprocessing. Reproducibility also requires controlling the when and with what.
Random Seeds
Any step that involves randomness must have a fixed seed:
full_pipeline = Pipeline([
('preprocessor', preprocessor),
('selector', SelectKBest(f_classif, k=8)),
('model', LogisticRegression(random_state=42, max_iter=1000))
])
If you use a model with randomness (random forests, gradient boosting), set random_state=42 explicitly. If you use a preprocessing step with randomness (such as IterativeImputer), set its random_state as well.
Practitioner Note --- The specific seed value does not matter. What matters is that you set one, and that you set the same one every time. 42 is a convention, not a requirement. The point is determinism, not numerology.
Version Tracking
A serialized pipeline is only reproducible if you know the library versions used to create it. Record them:
import sklearn
import pandas as pd
import numpy as np
import joblib
version_info = {
'sklearn': sklearn.__version__,
'pandas': pd.__version__,
'numpy': np.__version__,
'joblib': joblib.__version__,
'python': '3.11.7',
'trained_date': '2025-02-01',
'training_rows': len(X_train),
'training_columns': list(X_train.columns),
'random_state': 42,
}
print("Pipeline version info:")
for k, v in version_info.items():
print(f" {k}: {v}")
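One way to put that metadata to work is a load-time guard; the sketch below is a hypothetical helper (check_versions is not a library function) that refuses to use a pipeline whose recorded scikit-learn version differs from the current environment:

```python
import os
import tempfile

import joblib
import sklearn

def check_versions(meta_path):
    """Hypothetical guard: load metadata and compare versions before trusting a pipeline."""
    meta = joblib.load(meta_path)
    if meta['sklearn'] != sklearn.__version__:
        raise RuntimeError(
            f"Pipeline trained with sklearn {meta['sklearn']}, "
            f"running {sklearn.__version__}: retrain or pin the version."
        )
    return meta

# Demo round-trip in a temp directory:
path = os.path.join(tempfile.mkdtemp(), 'meta.joblib')
joblib.dump({'sklearn': sklearn.__version__}, path)
meta = check_versions(path)   # passes silently when versions match
print(meta['sklearn'] == sklearn.__version__)  # True
```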
Serialization with joblib
Saving a Pipeline
joblib is the standard tool for serializing scikit-learn pipelines. It handles numpy arrays more efficiently than Python's pickle:
import joblib
# Save the entire fitted pipeline
joblib.dump(full_pipeline, 'streamflow_churn_pipeline_v1.joblib')
# Save version metadata alongside the pipeline
joblib.dump(version_info, 'streamflow_churn_pipeline_v1_meta.joblib')
print("Pipeline saved.")
Loading and Using a Pipeline
# Load the pipeline --- anywhere, any time
loaded_pipeline = joblib.load('streamflow_churn_pipeline_v1.joblib')
# Use it exactly as before
y_pred = loaded_pipeline.predict(X_test)
y_proba = loaded_pipeline.predict_proba(X_test)[:, 1]
print(f"Loaded pipeline test accuracy: {loaded_pipeline.score(X_test, y_test):.4f}")
Loaded pipeline test accuracy: 0.7175
The loaded pipeline is identical to the original. Same preprocessing steps, same learned parameters, same model weights. One file, one object, complete reproducibility.
joblib vs. pickle
import pickle
# pickle also works
with open('streamflow_churn_pipeline_v1.pkl', 'wb') as f:
pickle.dump(full_pipeline, f)
with open('streamflow_churn_pipeline_v1.pkl', 'rb') as f:
loaded_from_pickle = pickle.load(f)
Practitioner Note --- Use joblib for scikit-learn pipelines. It compresses large numpy arrays (common in fitted transformers) and is 2--10x faster than pickle for models with large parameter arrays. Use pickle when you need compatibility with systems that do not have joblib installed.
Security Warning
Security Warning --- Never load a joblib or pickle file from an untrusted source. Both formats execute arbitrary code during deserialization. A malicious .joblib file can run any Python code on your machine. Only load serialized pipelines that you created or that come from a trusted, verified source.
Putting It All Together: The StreamFlow Pipeline
This is the culmination of Part II. Every technique from Chapters 5--9 assembled into a single, reproducible pipeline:
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression
import joblib
# ---- Custom Transformers ----
class MissingIndicatorTransformer(BaseEstimator, TransformerMixin):
"""Add binary missing indicators for columns with missingness above threshold."""
def __init__(self, threshold=0.01):
self.threshold = threshold
def fit(self, X, y=None):
X = pd.DataFrame(X)
missing_rates = X.isna().mean()
self.missing_columns_ = missing_rates[
missing_rates >= self.threshold
].index.tolist()
self.feature_names_in_ = list(X.columns)
return self
def transform(self, X):
X = pd.DataFrame(X, columns=self.feature_names_in_)
X_out = X.copy()
for col in self.missing_columns_:
X_out[f'{col}_missing'] = X_out[col].isna().astype(int)
return X_out
def get_feature_names_out(self, input_features=None):
base = list(self.feature_names_in_)
indicators = [f'{col}_missing' for col in self.missing_columns_]
return base + indicators
class EngagementRatioTransformer(BaseEstimator, TransformerMixin):
"""Compute domain-specific engagement ratios for StreamFlow data."""
def __init__(self, fill_value=0.0):
self.fill_value = fill_value
def fit(self, X, y=None):
self.feature_names_in_ = list(X.columns) if hasattr(X, 'columns') else None
return self
def transform(self, X):
X = pd.DataFrame(X).copy()
X['hours_per_session'] = np.where(
X['sessions_last_7d'] > 0,
X['total_hours_last_30d'] / X['sessions_last_7d'],
self.fill_value
)
X['tickets_per_tenure_month'] = np.where(
X['tenure_months'] > 0,
X['support_tickets_last_90d'] / X['tenure_months'],
self.fill_value
)
X['charge_per_hour'] = np.where(
X['total_hours_last_30d'] > 0,
X['monthly_charge'] / X['total_hours_last_30d'],
self.fill_value
)
return X
def get_feature_names_out(self, input_features=None):
base = list(self.feature_names_in_) if self.feature_names_in_ else []
return base + ['hours_per_session', 'tickets_per_tenure_month',
'charge_per_hour']
# ---- Column Definitions ----
num_cols = ['tenure_months', 'monthly_charge', 'total_hours_last_30d',
'sessions_last_7d', 'support_tickets_last_90d']
cat_cols = ['plan_type', 'device_type']
# ---- Sub-Pipelines ----
numeric_pipeline = Pipeline([
('imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
categorical_pipeline = Pipeline([
('encoder', OneHotEncoder(sparse_output=False, handle_unknown='ignore'))
])
# ---- ColumnTransformer ----
preprocessor = ColumnTransformer([
('num', numeric_pipeline, num_cols),
('cat', categorical_pipeline, cat_cols)
], remainder='drop')
# ---- Full Pipeline ----
streamflow_pipeline = Pipeline([
('missing_indicators', MissingIndicatorTransformer(threshold=0.01)),
('engagement_ratios', EngagementRatioTransformer(fill_value=0.0)),
('preprocessor', preprocessor),
('selector', SelectKBest(f_classif, k=10)),
('model', LogisticRegression(random_state=42, max_iter=1000))
])
# ---- Fit and Evaluate ----
streamflow_pipeline.fit(X_train, y_train)
train_score = streamflow_pipeline.score(X_train, y_train)
test_score = streamflow_pipeline.score(X_test, y_test)
print("StreamFlow Churn Pipeline v1")
print(f" Train accuracy: {train_score:.4f}")
print(f" Test accuracy: {test_score:.4f}")
# ---- Cross-Validation ----
from sklearn.model_selection import cross_val_score
cv_scores = cross_val_score(
streamflow_pipeline, X_train, y_train,
cv=5, scoring='roc_auc'
)
print(f" CV AUC: {cv_scores.mean():.4f} +/- {cv_scores.std():.4f}")
# ---- Serialize ----
joblib.dump(streamflow_pipeline, 'streamflow_pipeline_v1.joblib')
print("\nPipeline saved to streamflow_pipeline_v1.joblib")
StreamFlow Churn Pipeline v1
Train accuracy: 0.7236
Test accuracy: 0.7190
CV AUC: 0.7091 +/- 0.0098
Pipeline saved to streamflow_pipeline_v1.joblib
This pipeline:
- Adds missing indicators (Chapter 8) for any column with >1% missingness
- Computes engagement ratio features (Chapter 6) from domain knowledge
- Applies median imputation to numeric features (Chapter 8) and one-hot encodes categoricals (Chapter 7)
- Scales numeric features (Chapter 6)
- Selects the top 10 features (Chapter 9)
- Trains a logistic regression baseline
Every step is encoded in the pipeline object. Every learned parameter is serialized in the joblib file. The pipeline can be loaded on another machine, in a Docker container, or in a production API, and it will produce identical predictions given identical inputs.
This is what reproducibility looks like. Not "run the notebook cells in order." Not "ask the original data scientist." One object. One file. Complete specification of the data-to-prediction transformation.
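The identical-predictions claim is cheap to verify: reload the serialized file and compare its output to the in-memory object. A sketch using a small stand-in pipeline (the same three lines at the end apply verbatim to streamflow_pipeline):

```python
import joblib
import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Stand-in pipeline on synthetic data.
X, y = make_classification(n_samples=200, random_state=42)
pipe = Pipeline([('scaler', StandardScaler()),
                 ('model', LogisticRegression(random_state=42))])
pipe.fit(X, y)

joblib.dump(pipe, 'pipeline_check.joblib')
reloaded = joblib.load('pipeline_check.joblib')

# A joblib round trip must not change a single prediction.
assert np.array_equal(pipe.predict(X), reloaded.predict(X))
print("Serialized and in-memory pipelines agree.")
```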
Advanced Pipeline Patterns
Nested Pipelines Inside a ColumnTransformer
For complex preprocessing, you can nest entire Pipelines inside a ColumnTransformer, giving each feature group its own sequence of steps:
# High-cardinality categoricals get ordinal encoding
# Low-cardinality categoricals get one-hot encoding
# Numeric features get standard treatment
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder
low_card_cats = ['plan_type', 'device_type']
high_card_cats = ['primary_genre'] # 47 levels
preprocessor = ColumnTransformer([
('num', numeric_pipeline, num_cols),
('cat_low', Pipeline([
('encoder', OneHotEncoder(sparse_output=False, handle_unknown='ignore'))
]), low_card_cats),
('cat_high', Pipeline([
('encoder', OrdinalEncoder(handle_unknown='use_encoded_value',
unknown_value=-1))
]), high_card_cats),
], remainder='drop')
Caching Expensive Steps
If a preprocessing step is expensive (e.g., a large imputation), you can cache its fitted result on disk via the Pipeline's memory argument; the cache is keyed on the step's parameters and input data, so a refit with unchanged inputs skips the recomputation:
from tempfile import mkdtemp
from shutil import rmtree
cache_dir = mkdtemp()
cached_pipeline = Pipeline([
('preprocessor', preprocessor),
('model', LogisticRegression(random_state=42, max_iter=1000))
], memory=cache_dir)
# First fit: computes and caches preprocessing
cached_pipeline.fit(X_train, y_train)
# Second fit (e.g., during grid search): reuses cached preprocessing
# if the preprocessor parameters and input data haven't changed
cached_pipeline.fit(X_train, y_train)
# Clean up
rmtree(cache_dir)
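Caching pays off most inside a grid search, where the same preprocessing would otherwise be refit for every candidate model. A sketch with a stand-in scaler as the expensive step (the real preprocessor slots in the same way):

```python
from tempfile import mkdtemp
from shutil import rmtree

from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = make_classification(n_samples=300, random_state=42)

cache_dir = mkdtemp()
pipe = Pipeline([
    ('scaler', StandardScaler()),  # cached after the first fit per fold
    ('model', LogisticRegression(random_state=42, max_iter=1000)),
], memory=cache_dir)

# Only model__C varies, so the scaler's fit is computed once per CV
# fold and reused across all four candidates instead of 4x per fold.
search = GridSearchCV(pipe, {'model__C': [0.01, 0.1, 1.0, 10.0]},
                      cv=3, scoring='roc_auc')
search.fit(X, y)
print(f"best C = {search.best_params_['model__C']}, "
      f"CV AUC = {search.best_score_:.3f}")

rmtree(cache_dir)  # clean up the cache when done
```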
Visualizing Pipelines
scikit-learn can render pipelines as interactive HTML diagrams in Jupyter notebooks (the default display in recent versions, but you can set it explicitly):
from sklearn import set_config
set_config(display='diagram')
# In a Jupyter notebook, this renders an interactive diagram
full_pipeline
Or as plain text:
set_config(display='text')
print(full_pipeline)
Pipeline(steps=[('missing_indicators',
MissingIndicatorTransformer(threshold=0.01)),
('engagement_ratios',
EngagementRatioTransformer(fill_value=0.0)),
('preprocessor',
ColumnTransformer(transformers=[('num',
Pipeline(steps=[('imputer',
SimpleImputer(strategy='median')),
('scaler',
StandardScaler())]),
['tenure_months',
'monthly_charge',
'total_hours_last_30d',
'sessions_last_7d',
'support_tickets_last_90d']),
('cat',
Pipeline(steps=[('encoder',
OneHotEncoder(handle_unknown='ignore',
sparse_output=False))]),
['plan_type',
'device_type'])])),
('selector', SelectKBest(k=10)),
('model',
LogisticRegression(max_iter=1000, random_state=42))])
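Outside a notebook, the same diagram can be written to a standalone HTML file with sklearn.utils.estimator_html_repr. A small sketch using a stand-in pipeline:

```python
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.utils import estimator_html_repr

pipe = Pipeline([('scaler', StandardScaler()),
                 ('model', LogisticRegression(random_state=42))])

# estimator_html_repr returns the interactive diagram Jupyter renders,
# as a self-contained HTML string you can open in any browser.
with open('pipeline_diagram.html', 'w', encoding='utf-8') as f:
    f.write(estimator_html_repr(pipe))
print("Diagram written to pipeline_diagram.html")
```

This is handy for attaching a human-readable description of the deployed pipeline to a model card or pull request.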
Common Pipeline Mistakes
Mistake 1: Fitting on Test Data
# WRONG: fit_transform on test data
X_test_processed = preprocessor.fit_transform(X_test)
# RIGHT: transform only on test data
X_test_processed = preprocessor.transform(X_test)
# BEST: use the pipeline --- it handles this automatically
y_pred = full_pipeline.predict(X_test)
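To see what transform-only actually buys you: scaling test data reuses the means and standard deviations learned from train, so a drifted test set is not silently re-centered. A toy demonstration:

```python
import numpy as np
from sklearn.preprocessing import StandardScaler

X_train = np.array([[1.0], [2.0], [3.0]])    # train mean = 2
X_test = np.array([[10.0], [11.0], [12.0]])  # drifted upward

scaler = StandardScaler().fit(X_train)

# RIGHT: test is scaled with train statistics -- the drift stays visible.
print(scaler.transform(X_test).ravel())

# WRONG: refitting on test re-centers it to mean 0, hiding the drift
# and feeding the model inputs on a scale it never saw in training.
print(StandardScaler().fit_transform(X_test).ravel())
```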
Mistake 2: Preprocessing Outside the Pipeline
# WRONG: scale before the pipeline
X_train_scaled = StandardScaler().fit_transform(X_train[num_cols])
# Now the pipeline doesn't know about the scaling
# RIGHT: put all preprocessing inside the pipeline
pipe = Pipeline([
('scaler', StandardScaler()),
('model', LogisticRegression(random_state=42))
])
Mistake 3: Forgetting handle_unknown
# WRONG: will crash if test data has unseen categories
encoder = OneHotEncoder(sparse_output=False)
# RIGHT: handle unseen categories gracefully
encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
Mistake 4: Not Setting random_state
# WRONG: potentially non-reproducible
model = LogisticRegression(max_iter=1000)
# RIGHT: reproducible
model = LogisticRegression(random_state=42, max_iter=1000)
For LogisticRegression specifically, random_state only affects the stochastic solvers ('sag', 'saga', 'liblinear'), but setting it everywhere costs nothing and keeps you covered when a hyperparameter search swaps the solver.
Progressive Project M3 (Part 4): The StreamFlow Preprocessing Pipeline
Milestone 3, Part 4 --- Build the complete StreamFlow preprocessing Pipeline with ColumnTransformer. Save it with joblib. This pipeline will be reused in every subsequent chapter.
Deliverables
- A Python script (streamflow_pipeline.py) that defines and fits the StreamFlow churn preprocessing pipeline
- A fitted pipeline saved as streamflow_pipeline_v1.joblib
- A metadata file saved as streamflow_pipeline_v1_meta.joblib
Requirements
Your pipeline must include:
- Missing indicators for total_hours_last_30d, sessions_last_7d, avg_session_duration, genre_diversity_score, and email_open_rate
- Median imputation for all numeric features
- StandardScaler for all numeric features
- OneHotEncoder with handle_unknown='ignore' for categorical features
- At least one custom transformer implementing a domain-specific feature (engagement ratios, recency scores, or another feature you designed in Chapter 6)
- Feature selection using SelectKBest with k=15 (we will tune k in Chapter 18)
- LogisticRegression as a baseline model with random_state=42
- A metadata dictionary containing library versions, training date, column names, and random state
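One possible shape for that metadata dictionary (the field names here are suggestions, not a fixed schema) pairs the pipeline file with everything needed to reproduce or audit it:

```python
from datetime import datetime, timezone

import joblib
import numpy as np
import pandas as pd
import sklearn

# Hypothetical metadata payload saved alongside the pipeline file.
metadata = {
    'pipeline_file': 'streamflow_pipeline_v1.joblib',
    'trained_at': datetime.now(timezone.utc).isoformat(),
    'sklearn_version': sklearn.__version__,
    'numpy_version': np.__version__,
    'pandas_version': pd.__version__,
    'feature_columns': ['tenure_months', 'monthly_charge',
                        'total_hours_last_30d', 'sessions_last_7d',
                        'support_tickets_last_90d',
                        'plan_type', 'device_type'],
    'random_state': 42,
}
joblib.dump(metadata, 'streamflow_pipeline_v1_meta.joblib')
```

Loading the metadata before the pipeline lets you fail fast when, say, the production scikit-learn version differs from the training one.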
Acceptance Criteria
- cross_val_score with 5-fold CV and scoring='roc_auc' returns a mean AUC above 0.65
- Loading the saved pipeline and calling .predict() produces identical results to the original pipeline
- The pipeline accepts a raw DataFrame (no manual preprocessing required) and returns predictions
Stretch Goals
- Add a second custom transformer for a feature not covered in this chapter
- Implement get_feature_names_out for all custom transformers
- Add pipeline caching for the expensive preprocessing steps
- Write a function that loads the pipeline and produces predictions from a CSV file in a single call
Chapter Summary
scikit-learn Pipelines transform ad-hoc preprocessing scripts into reproducible, deployable, auditable workflows. ColumnTransformer routes different feature types through different transformation paths. Custom transformers encode domain knowledge as reusable, testable components. joblib serializes the entire pipeline --- preprocessing parameters, model weights, and all --- into a single file.
The StreamFlow pipeline you built in this chapter assembles every technique from Part II: SQL extraction results (Chapter 5), engineered features (Chapter 6), categorical encoding (Chapter 7), missing data handling (Chapter 8), and feature selection (Chapter 9) into one object. That object is the input to Part III.
From this point forward, every chapter will begin with:
import joblib
pipeline = joblib.load('streamflow_pipeline_v1.joblib')
If your preprocessing is not in a pipeline, your results depend on the order you ran your notebook cells. That is not data science. That is a ritual. Pipelines are how you turn rituals into engineering.
Next chapter: Chapter 11 --- Linear Models Revisited