Case Study 2: Building a Reproducible ML Pipeline

Overview

In this case study, we build a production-grade machine learning pipeline for a credit risk scoring application. The emphasis is not just on predictive performance but on reproducibility, maintainability, and leakage prevention---the qualities that distinguish a research prototype from a deployable system.

We demonstrate how to structure a pipeline that:

  • Handles heterogeneous data types cleanly.
  • Prevents data leakage at every stage.
  • Can be serialized, version-controlled, and deployed.
  • Supports hyperparameter tuning across all components.
  • Produces consistent results across runs.

This case study integrates concepts from Sections 9.7--9.9 and connects to evaluation strategies from Chapter 8.


The Problem

A lending company wants to predict whether a loan applicant will default within 12 months. The model's predictions will be used to set interest rates and approve or deny applications. Because the model affects real financial decisions, reproducibility and auditability are paramount.

Requirements

  1. The same code, data, and random seed must produce identical predictions.
  2. No information from the test set may influence any preprocessing step.
  3. The pipeline must handle new data---including unseen categories---gracefully.
  4. All feature engineering decisions must be documented and version-controlled.
  5. The model must be deployable as a single serialized artifact.

The Dataset

import numpy as np
import pandas as pd

np.random.seed(42)
n = 5000

df = pd.DataFrame({
    'applicant_id': range(n),
    'annual_income': np.random.lognormal(10.5, 0.8, n).round(2),
    'loan_amount': np.random.lognormal(9.5, 0.6, n).round(2),
    'employment_length': np.random.choice(
        ['< 1 year', '1 year', '2 years', '3 years', '5 years',
         '7 years', '10+ years'],
        n
    ),
    'home_ownership': np.random.choice(
        ['RENT', 'OWN', 'MORTGAGE', 'OTHER'], n, p=[0.4, 0.15, 0.4, 0.05]
    ),
    'purpose': np.random.choice(
        ['debt_consolidation', 'credit_card', 'home_improvement',
         'major_purchase', 'medical', 'car', 'small_business', 'other'],
        n
    ),
    'dti': np.random.uniform(0, 40, n).round(2),  # Debt-to-income
    'num_open_accounts': np.random.poisson(10, n),
    'num_derogatory': np.random.poisson(0.5, n),
    'credit_history_months': np.random.exponential(120, n).astype(int),
    'application_date': pd.date_range('2022-01-01', periods=n, freq='90min'),
    'free_text_description': np.random.choice([
        'Need to consolidate high interest debt',
        'Home renovation project funding',
        'Medical expenses not covered by insurance',
        'Starting a small business venture',
        'Purchasing a reliable vehicle for work',
        'Credit card balance transfer',
        'Emergency fund for unexpected expenses',
        'Education and professional development',
    ], n),
})

# Introduce missing values realistically
df.loc[np.random.choice(n, 300, replace=False), 'annual_income'] = np.nan
df.loc[np.random.choice(n, 200, replace=False), 'dti'] = np.nan
df.loc[np.random.choice(n, 150, replace=False), 'employment_length'] = np.nan
df.loc[np.random.choice(n, 100, replace=False), 'credit_history_months'] = np.nan

# Generate correlated target
default_prob = (
    0.15 * (df['dti'].fillna(20) > 25).astype(float) +
    0.10 * (df['num_derogatory'] > 1).astype(float) +
    0.10 * (df['annual_income'].fillna(50000) < 40000).astype(float) +
    0.08 * (df['loan_amount'] / df['annual_income'].fillna(50000) > 0.3).astype(float) +
    np.random.normal(0, 0.1, n)
).clip(0, 1)
df['default'] = np.random.binomial(1, default_prob)
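
A quick look at missingness and class balance confirms the synthetic data behaves as intended (exact counts vary with the seed):

# Inspect missingness and class balance before building anything
print(df.isna().sum().loc[lambda s: s > 0])
print(f"Default rate: {df['default'].mean():.1%}")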

Step 1: Define Column Groups

The first step in building a reproducible pipeline is to explicitly define which columns belong to which group. This serves as documentation and ensures consistency.

# Column definitions --- single source of truth
COLUMN_CONFIG = {
    'id_col': 'applicant_id',
    'target_col': 'default',
    'numerical_cols': [
        'annual_income', 'loan_amount', 'dti',
        'num_open_accounts', 'num_derogatory', 'credit_history_months'
    ],
    'categorical_cols': ['home_ownership', 'purpose'],
    'ordinal_cols': ['employment_length'],
    'ordinal_categories': [
        ['< 1 year', '1 year', '2 years', '3 years',
         '5 years', '7 years', '10+ years']
    ],
    'text_cols': ['free_text_description'],
    'datetime_cols': ['application_date'],
}
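
Since every downstream step reads from this dictionary, it is worth checking up front that the configuration matches the actual data. A minimal sanity check (our own addition, not part of the pipeline itself):

# Verify that every configured column exists in the DataFrame
configured_cols = (
    [COLUMN_CONFIG['id_col'], COLUMN_CONFIG['target_col']] +
    COLUMN_CONFIG['numerical_cols'] +
    COLUMN_CONFIG['categorical_cols'] +
    COLUMN_CONFIG['ordinal_cols'] +
    COLUMN_CONFIG['text_cols'] +
    COLUMN_CONFIG['datetime_cols']
)
missing_cols = set(configured_cols) - set(df.columns)
assert not missing_cols, f"In config but not in data: {missing_cols}"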

Step 2: Custom Transformers

Each custom transformation is encapsulated in a transformer class with proper fit/transform semantics.

from sklearn.base import BaseEstimator, TransformerMixin


class LoanFeatureEngineer(BaseEstimator, TransformerMixin):
    """Create domain-specific loan features.

    Derives ratio and interaction features that are known to be
    predictive in credit risk modeling.
    """

    def fit(self, X: pd.DataFrame, y=None):
        """No fitting required for deterministic feature creation.

        Args:
            X: Input DataFrame.
            y: Ignored.

        Returns:
            self
        """
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Create loan-specific engineered features.

        Args:
            X: DataFrame with raw loan features.

        Returns:
            DataFrame with additional engineered columns.
        """
        X = X.copy()

        # Loan-to-income ratio
        X['loan_to_income'] = X['loan_amount'] / X['annual_income'].clip(lower=1)

        # Derogatory marks per account
        X['derog_per_account'] = (
            X['num_derogatory'] / X['num_open_accounts'].clip(lower=1)
        )

        # Credit history in years
        X['credit_history_years'] = X['credit_history_months'] / 12

        # Log-transformed income (reduce skew)
        X['log_income'] = np.log1p(X['annual_income'])

        # Log-transformed loan amount
        X['log_loan'] = np.log1p(X['loan_amount'])

        return X


class MissingIndicatorTransformer(BaseEstimator, TransformerMixin):
    """Create binary missing value indicators.

    Learns which columns have missing values in training data and
    creates indicator columns for those in transform.

    Attributes:
        columns_with_missing: Columns that had missing values in training.
    """

    def fit(self, X: pd.DataFrame, y=None):
        """Identify columns with missing values.

        Args:
            X: Training DataFrame.
            y: Ignored.

        Returns:
            self
        """
        self.columns_with_missing_ = [
            col for col in X.columns
            if X[col].isna().any()
        ]
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Add binary missing indicators.

        Args:
            X: Input DataFrame.

        Returns:
            DataFrame with indicator columns appended.
        """
        X = X.copy()
        for col in self.columns_with_missing_:
            if col in X.columns:
                X[f'{col}_missing'] = X[col].isna().astype(int)
        return X


class DatetimeTransformer(BaseEstimator, TransformerMixin):
    """Extract features from datetime columns.

    Converts datetime columns to numerical features including
    cyclical encodings for periodic components.

    Attributes:
        datetime_cols: Names of datetime columns to process.
    """

    def __init__(self, datetime_cols: list[str] | None = None):
        # Store the parameter unmodified: scikit-learn's clone() requires
        # that __init__ not alter its arguments. None is handled in transform.
        self.datetime_cols = datetime_cols

    def fit(self, X: pd.DataFrame, y=None):
        """No fitting required.

        Args:
            X: Input DataFrame.
            y: Ignored.

        Returns:
            self
        """
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Extract datetime features.

        Args:
            X: DataFrame with datetime columns.

        Returns:
            DataFrame with datetime-derived features.
        """
        X = X.copy()
        for col in (self.datetime_cols or []):
            if col in X.columns:
                dt = pd.to_datetime(X[col])
                X[f'{col}_month'] = dt.dt.month
                X[f'{col}_dayofweek'] = dt.dt.dayofweek
                X[f'{col}_quarter'] = dt.dt.quarter
                X[f'{col}_month_sin'] = np.sin(2 * np.pi * dt.dt.month / 12)
                X[f'{col}_month_cos'] = np.cos(2 * np.pi * dt.dt.month / 12)
                X = X.drop(columns=[col])
        return X
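
Before wiring these into the full pipeline, each transformer can be smoke-tested in isolation. A minimal check of the expected output columns (verification code only, not part of the deployed pipeline):

# Smoke-test each custom transformer on the raw data
_fe = LoanFeatureEngineer().fit_transform(df)
assert 'loan_to_income' in _fe.columns

_mi = MissingIndicatorTransformer().fit_transform(df)
assert 'dti_missing' in _mi.columns

_dt = DatetimeTransformer(datetime_cols=['application_date']).fit_transform(df)
assert 'application_date_month_sin' in _dt.columns
assert 'application_date' not in _dt.columns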

Step 3: Build the Pipeline

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import (
    StandardScaler, OneHotEncoder, OrdinalEncoder
)
from sklearn.impute import SimpleImputer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import GradientBoostingClassifier

# Numerical preprocessing sub-pipeline
numerical_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])

# Categorical preprocessing sub-pipeline
categorical_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
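    # Note: with drop='first' and handle_unknown='ignore', unseen categories
    # encode as all zeros and collide with the dropped first category.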
    ('encoder', OneHotEncoder(drop='first', handle_unknown='ignore',
                               sparse_output=False)),
])

# Ordinal preprocessing sub-pipeline
ordinal_pipeline = Pipeline([
    ('imputer', SimpleImputer(strategy='constant', fill_value='< 1 year')),
    ('encoder', OrdinalEncoder(
        categories=COLUMN_CONFIG['ordinal_categories'],
        handle_unknown='use_encoded_value',
        unknown_value=-1
    )),
])

# Define which columns get which treatment AFTER feature engineering
# (LoanFeatureEngineer adds the five derived numerical columns below)
engineered_numerical = (
    COLUMN_CONFIG['numerical_cols'] +
    ['loan_to_income', 'derog_per_account', 'credit_history_years',
     'log_income', 'log_loan']
)

# Missing indicator columns (added by MissingIndicatorTransformer; the
# passthrough below assumes each column has at least one NaN at fit time)
missing_indicator_cols = [
    f'{col}_missing' for col in
    ['annual_income', 'dti', 'employment_length', 'credit_history_months']
]

# Full preprocessing with ColumnTransformer
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numerical_pipeline, engineered_numerical),
        ('cat', categorical_pipeline, COLUMN_CONFIG['categorical_cols']),
        ('ord', ordinal_pipeline, COLUMN_CONFIG['ordinal_cols']),
        ('text', TfidfVectorizer(max_features=100, stop_words='english'),
         'free_text_description'),
        ('missing', 'passthrough', missing_indicator_cols),
        ('datetime', 'passthrough', [
            'application_date_month', 'application_date_dayofweek',
            'application_date_quarter', 'application_date_month_sin',
            'application_date_month_cos'
        ]),
    ],
    remainder='drop',
    verbose_feature_names_out=True,
)

# Complete end-to-end pipeline
full_pipeline = Pipeline([
    ('feature_engineer', LoanFeatureEngineer()),
    ('missing_indicators', MissingIndicatorTransformer()),
    ('datetime_features', DatetimeTransformer(
        datetime_cols=COLUMN_CONFIG['datetime_cols']
    )),
    ('preprocessor', preprocessor),
    ('classifier', GradientBoostingClassifier(
        n_estimators=300,
        learning_rate=0.05,
        max_depth=4,
        subsample=0.8,
        min_samples_leaf=20,
        random_state=42,
    )),
])

Step 4: Evaluate with Cross-Validation

from sklearn.model_selection import (
    train_test_split, cross_val_score, StratifiedKFold
)
from sklearn.metrics import (
    roc_auc_score, classification_report, brier_score_loss
)

# Prepare data
feature_cols = (
    COLUMN_CONFIG['numerical_cols'] +
    COLUMN_CONFIG['categorical_cols'] +
    COLUMN_CONFIG['ordinal_cols'] +
    COLUMN_CONFIG['text_cols'] +
    COLUMN_CONFIG['datetime_cols']
)

X = df[feature_cols].copy()
y = df[COLUMN_CONFIG['target_col']].copy()

# Chronological split (since we have temporal data)
split_idx = int(0.8 * len(df))
X_train, X_test = X.iloc[:split_idx], X.iloc[split_idx:]
y_train, y_test = y.iloc[:split_idx], y.iloc[split_idx:]

# Cross-validate on training data
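# (Shuffled folds are acceptable here because rows are independent; for
# autocorrelated temporal data, a splitter like TimeSeriesSplit is safer.)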
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = cross_val_score(
    full_pipeline, X_train, y_train,
    cv=cv, scoring='roc_auc', n_jobs=-1
)
print(f"CV AUC: {cv_scores.mean():.4f} (+/- {cv_scores.std():.4f})")

# Final training and evaluation
full_pipeline.fit(X_train, y_train)
y_pred_proba = full_pipeline.predict_proba(X_test)[:, 1]
y_pred = full_pipeline.predict(X_test)

test_auc = roc_auc_score(y_test, y_pred_proba)
brier = brier_score_loss(y_test, y_pred_proba)
print(f"Test AUC: {test_auc:.4f}")
print(f"Brier Score: {brier:.4f}")
print(classification_report(y_test, y_pred))
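
Because every component here implements get_feature_names_out (this assumes scikit-learn >= 1.2, already implied by sparse_output above), the classifier's importances can be mapped back to readable names, which is a useful audit artifact:

# Map model importances back to human-readable feature names
feature_names = full_pipeline.named_steps['preprocessor'].get_feature_names_out()
importances = full_pipeline.named_steps['classifier'].feature_importances_
top_features = (
    pd.Series(importances, index=feature_names)
    .sort_values(ascending=False)
    .head(10)
)
print(top_features)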

Step 5: Hyperparameter Tuning

The pipeline's double-underscore notation enables tuning across all components:

from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import randint, uniform

param_distributions = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'preprocessor__text__max_features': [50, 100, 200],
    'classifier__n_estimators': randint(100, 500),
    'classifier__learning_rate': uniform(0.01, 0.19),
    'classifier__max_depth': randint(3, 8),
    'classifier__subsample': uniform(0.6, 0.4),
    'classifier__min_samples_leaf': randint(10, 50),
}

search = RandomizedSearchCV(
    full_pipeline,
    param_distributions,
    n_iter=50,
    cv=cv,
    scoring='roc_auc',
    random_state=42,
    n_jobs=-1,
    verbose=1,
)
search.fit(X_train, y_train)

print(f"Best CV AUC: {search.best_score_:.4f}")
print(f"Best params: {search.best_params_}")

Step 6: Serialization and Deployment

import joblib
from pathlib import Path

# Save the pipeline
model_dir = Path('models')
model_dir.mkdir(exist_ok=True)
model_path = model_dir / 'credit_risk_pipeline_v1.pkl'
joblib.dump(full_pipeline, model_path)
print(f"Pipeline saved to {model_path}")

# Verify reproducibility: load and predict
loaded_pipeline = joblib.load(model_path)
y_pred_loaded = loaded_pipeline.predict_proba(X_test)[:, 1]

# Confirm identical predictions
assert np.allclose(y_pred_proba, y_pred_loaded), "Predictions differ!"
print("Reproducibility verified: predictions are identical.")

# Simulate production prediction
new_application = X_test.iloc[[0]]
risk_score = loaded_pipeline.predict_proba(new_application)[:, 1][0]
print(f"Risk score for new application: {risk_score:.4f}")

Step 7: Pipeline Documentation

For auditability, document the pipeline configuration:

def document_pipeline(pipeline: Pipeline) -> dict:
    """Generate a documentation dictionary for a fitted pipeline.

    Args:
        pipeline: Fitted scikit-learn Pipeline.

    Returns:
        Dictionary describing each pipeline step and its parameters.
    """
    doc = {}
    for name, step in pipeline.named_steps.items():
        doc[name] = {
            'class': type(step).__name__,
            'params': step.get_params(),
        }
    return doc

pipeline_doc = document_pipeline(full_pipeline)
for step_name, info in pipeline_doc.items():
    print(f"\n--- {step_name} ---")
    print(f"  Class: {info['class']}")
    print(f"  Key params: {dict(list(info['params'].items())[:5])}")

Lessons Learned

  1. Explicit column definitions are essential. A single COLUMN_CONFIG dictionary serves as the source of truth for all column groupings, preventing inconsistencies.

  2. Custom transformers enforce the fit/transform contract. By implementing transformers as proper scikit-learn classes, we guarantee that no information leaks between train and test.

  3. Chronological splits matter for temporal data. Even though this is not a time series problem, application dates progress forward, and a chronological split more accurately simulates production conditions.

  4. Serialization captures the full pipeline. The saved .pkl file contains all fitted preprocessing parameters (scaler means, encoder categories, TF-IDF vocabulary), not just the model weights.

  5. The double-underscore notation enables end-to-end tuning. We tuned both preprocessing parameters (imputation strategy, TF-IDF vocabulary size) and model parameters in a single search.

  6. Reproducibility requires explicit random seeds. Seeding every stochastic component (random_state=42 throughout) ensures that re-running the code produces identical results.


Production Considerations

Beyond what we have built here, a production deployment would additionally require:

  • Input validation: Check that incoming data matches expected schemas, types, and value ranges (a minimal sketch follows this list).
  • Monitoring: Track prediction distributions over time to detect data drift (Chapter 16).
  • A/B testing: Compare model versions before full deployment.
  • Fallback logic: Define behavior when the model cannot make a prediction (e.g., unseen categories despite handle_unknown='ignore').
  • Audit logging: Record every prediction with its inputs for regulatory compliance.
  • Model versioning: Use tools like MLflow or DVC to track model versions alongside code and data versions.
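
As a flavor of the first item, a toy validation helper (validate_application is our own illustration; production systems would typically use a schema library such as pandera or pydantic):

def validate_application(row: pd.DataFrame) -> list[str]:
    """Return a list of schema problems for an incoming application."""
    problems = []
    for col in feature_cols:
        if col not in row.columns:
            problems.append(f"missing column: {col}")
    if 'dti' in row.columns and row['dti'].gt(100).any():
        problems.append("dti outside expected range [0, 100]")
    if 'loan_amount' in row.columns and row['loan_amount'].le(0).any():
        problems.append("loan_amount must be positive")
    return problems

print(validate_application(new_application) or "Application passed validation.")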

These topics are explored further in Part V (Deployment and Production).