Case Study 1: Refactoring the StreamFlow Churn Notebook


Background

StreamFlow's churn prediction model was built over the course of Chapters 1--19 in a series of Jupyter notebooks. The model works. It achieves an AUC of 0.847 on held-out data. The stakeholders are satisfied with the predictions. The data science team is proud of the feature engineering.

There is one problem: the model lives in a notebook named churn_model_final_v2_ACTUALLY_FINAL_v3.ipynb. It has 147 cells. It takes 22 minutes to run top-to-bottom. Cells 34 through 41 must be run in a specific order that is not the order they appear in the notebook. Cell 89 contains a hardcoded file path to a CSV on the original author's laptop. Cell 112 has a comment that says # TODO: fix this before production dated eight months ago.

The VP of Engineering has asked the data science team to deploy the model as an API (Chapter 31). The engineering team looked at the notebook and sent a one-line response: "We cannot deploy a notebook."

This case study walks through the complete refactoring of the StreamFlow churn notebook into a production-quality Python package.


The Notebook: An Honest Inventory

Before refactoring, we audit the notebook to understand what it actually does. Every cell falls into one of five categories:

# Audit the notebook structure
# (This analysis was done by reading the notebook manually,
# but here's the conceptual breakdown)

notebook_cells = {
    "data_loading": {
        "cells": [1, 2, 3, 4, 5, 6],
        "description": "Load CSVs, merge tables, basic dtypes",
        "destination": "src/data/make_dataset.py",
    },
    "exploration": {
        "cells": list(range(7, 34)),
        "description": "EDA: distributions, correlations, missing values, plots",
        "destination": "notebooks/01-eda.ipynb (keep as notebook)",
    },
    "feature_engineering": {
        "cells": list(range(34, 72)),
        "description": "Recency, frequency, monetary, engagement, plan features",
        "destination": "src/features/",
    },
    "modeling": {
        "cells": list(range(72, 112)),
        "description": "Train/test split, baseline, tuning, final model",
        "destination": "src/models/",
    },
    "evaluation": {
        "cells": list(range(112, 148)),
        "description": "Metrics, ROC curve, confusion matrix, SHAP",
        "destination": "src/evaluation/",
    },
}

total_cells = sum(len(v["cells"]) for v in notebook_cells.values())
exploration_cells = len(notebook_cells["exploration"]["cells"])

print(f"Total cells: {total_cells}")  # 147
print(f"Cells that are production code: {total_cells - exploration_cells}")  # 120
print(f"Cells that are exploration (keep in notebook): {exploration_cells}")  # 27
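Because a .ipynb file is plain JSON, parts of this audit can be scripted without any Jupyter dependency. A minimal sketch (stdlib only; the smell patterns are illustrative, not the ones used for the actual audit):

```python
# audit_notebook.py (sketch): scan a notebook's code cells for common smells.
import json
import re
from pathlib import Path


def audit(path: Path) -> dict:
    """Count code cells and flag hardcoded paths and TODO comments.

    Cell numbers in the report count code cells only, starting at 1.
    """
    nb = json.loads(path.read_text())
    code_cells = [c for c in nb["cells"] if c["cell_type"] == "code"]

    smells = []
    for i, cell in enumerate(code_cells, start=1):
        source = "".join(cell["source"])
        # Illustrative patterns: Windows user dirs and Linux home dirs.
        if re.search(r"[A-Za-z]:[/\\]Users|/home/\w+", source):
            smells.append((i, "hardcoded path"))
        if "TODO" in source:
            smells.append((i, "TODO comment"))

    return {"code_cells": len(code_cells), "smells": smells}
```

Running this against the StreamFlow notebook would surface the hardcoded paths and the eight-month-old TODO automatically, though categorizing cells into the five buckets above still requires a human read.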

The audit reveals several problems:

| Problem | Location | Severity | Why it matters |
| --- | --- | --- | --- |
| Hardcoded file paths | Cells 1, 2, 89 | High | Breaks on any other machine |
| Out-of-order cell dependency | Cells 34-41 | High | Wrong execution order produces wrong features |
| Global variable `reference_date` | Cell 5 | Medium | Implicit dependency, not passed to functions |
| Copy-pasted feature code | Cells 45, 52, 61 | Medium | Three versions of the rolling window logic |
| Unused imports | Cells 1, 7, 72 | Low | Clutters the namespace |
| Magic number threshold `0.42` | Cell 130 | Medium | No explanation, not configurable |
| `# TODO: fix this` comment | Cell 112 | Medium | Undone for eight months |

Step 1: Create the Project Structure

mkdir -p streamflow-churn/{config,data/{raw,interim,processed},models,notebooks}
mkdir -p streamflow-churn/src/{config,data,features,models,evaluation}
mkdir -p streamflow-churn/tests

# Create __init__.py files
touch streamflow-churn/src/__init__.py
touch streamflow-churn/src/{config,data,features,models,evaluation}/__init__.py
touch streamflow-churn/tests/__init__.py

The resulting layout:

streamflow-churn/
    config/
        model_config.yaml
    data/
        raw/
            subscribers.csv
            events.parquet
        interim/
        processed/
    models/
    notebooks/
        01-eda.ipynb
    src/
        __init__.py
        config/
            __init__.py
            loader.py
        data/
            __init__.py
            make_dataset.py
            validate.py
        features/
            __init__.py
            build_features.py
            recency.py
            frequency.py
            monetary.py
            engagement.py
            plan_features.py
        models/
            __init__.py
            train_model.py
            predict_model.py
        evaluation/
            __init__.py
            evaluate.py
    tests/
        __init__.py
        conftest.py
        test_data.py
        test_features.py
        test_models.py
        test_pipeline.py
    pyproject.toml
    Makefile
    .pre-commit-config.yaml
    .gitignore
    TECH_DEBT.md
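The tree lists a pyproject.toml that is never shown. A plausible minimal version (package names, versions, and tool sections are assumptions; the `>=3.10` floor follows from the `X | None` union syntax used throughout the code):

```toml
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"

[project]
name = "streamflow-churn"
version = "0.1.0"
requires-python = ">=3.10"   # the code uses `X | None` unions
dependencies = [
    "pandas",
    "pyarrow",               # Parquet I/O
    "scikit-learn",
    "pyyaml",                # config loading
]

[project.optional-dependencies]
dev = ["pytest", "black", "ruff", "mypy"]
```

With the dev extra installed (`pip install -e ".[dev]"`), every tool used in Step 6 is pinned to the project rather than to someone's global environment.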

Step 2: Extract Data Loading

The notebook loads data from hardcoded paths and performs basic cleaning inline. We extract this into src/data/make_dataset.py.

Before (notebook cells 1--6):

# Cell 1
import pandas as pd
events = pd.read_csv('C:/Users/sarah/Desktop/streamflow/events_2024.csv')

# Cell 2
subs = pd.read_csv('C:/Users/sarah/Desktop/streamflow/subscribers.csv')

# Cell 3
events['timestamp'] = pd.to_datetime(events['timestamp'])
subs['signup_date'] = pd.to_datetime(subs['signup_date'])

# Cell 4
events = events.dropna(subset=['user_id'])
events['user_id'] = events['user_id'].astype(int)

# Cell 5
reference_date = pd.Timestamp('2024-10-31')  # Global variable

# Cell 6
df = events.merge(subs[['user_id', 'plan_type', 'signup_date', 'monthly_revenue']], on='user_id')

After (src/data/make_dataset.py):

"""Load, clean, and merge raw StreamFlow data.

This module reads raw event logs and subscriber metadata,
performs basic cleaning (type casting, null removal), and
produces merged datasets for downstream feature engineering.
"""

import logging
from pathlib import Path

import pandas as pd

logger = logging.getLogger(__name__)


def load_events(path: Path) -> pd.DataFrame:
    """Load raw event log from Parquet file.

    Args:
        path: Path to the events Parquet file.

    Returns:
        DataFrame with columns: user_id, event_type, timestamp,
        device_type, duration_seconds, revenue, plan_type.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If required columns are missing.
    """
    logger.info(f"Loading events from {path}")
    events = pd.read_parquet(path)

    required_columns = ["user_id", "event_type", "timestamp"]
    missing = set(required_columns) - set(events.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    return events


def load_subscribers(path: Path) -> pd.DataFrame:
    """Load raw subscriber metadata from CSV.

    Args:
        path: Path to the subscribers CSV file.

    Returns:
        DataFrame with columns: user_id, signup_date, plan_type,
        monthly_revenue, churned.
    """
    logger.info(f"Loading subscribers from {path}")
    subs = pd.read_csv(path)

    required_columns = ["user_id", "signup_date", "plan_type",
                        "monthly_revenue", "churned"]
    missing = set(required_columns) - set(subs.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    return subs


def clean_events(events: pd.DataFrame) -> pd.DataFrame:
    """Clean and type-cast the raw event DataFrame.

    - Drops rows with null user_id
    - Casts user_id to int
    - Parses timestamp to datetime
    - Sorts by user_id and timestamp

    Args:
        events: Raw event DataFrame from load_events.

    Returns:
        Cleaned event DataFrame.
    """
    n_before = len(events)
    events = events.dropna(subset=["user_id"]).copy()
    n_after = len(events)
    if n_before != n_after:
        logger.warning(f"Dropped {n_before - n_after} rows with null user_id")

    events["user_id"] = events["user_id"].astype(int)
    events["timestamp"] = pd.to_datetime(events["timestamp"])
    events = events.sort_values(["user_id", "timestamp"]).reset_index(drop=True)
    return events


def clean_subscribers(subs: pd.DataFrame) -> pd.DataFrame:
    """Clean and type-cast the raw subscriber DataFrame.

    - Parses signup_date to datetime
    - Ensures churned is int (0 or 1)

    Args:
        subs: Raw subscriber DataFrame from load_subscribers.

    Returns:
        Cleaned subscriber DataFrame.
    """
    subs = subs.copy()
    subs["signup_date"] = pd.to_datetime(subs["signup_date"])
    subs["churned"] = subs["churned"].astype(int)
    return subs


def make_dataset(
    events_path: Path,
    subscribers_path: Path,
    output_path: Path,
) -> pd.DataFrame:
    """Full data loading and cleaning pipeline.

    Loads raw data, cleans it, merges events with subscriber metadata,
    and saves the merged dataset to the output path.

    Args:
        events_path: Path to raw events file.
        subscribers_path: Path to raw subscribers file.
        output_path: Path to write the cleaned, merged dataset.

    Returns:
        Merged and cleaned DataFrame.
    """
    events = load_events(events_path)
    events = clean_events(events)

    subs = load_subscribers(subscribers_path)
    subs = clean_subscribers(subs)

    merge_cols = ["user_id", "plan_type", "signup_date", "monthly_revenue"]
    merged = events.merge(subs[merge_cols], on="user_id", how="inner")

    logger.info(
        f"Merged dataset: {len(merged)} rows, "
        f"{merged['user_id'].nunique()} unique users"
    )

    output_path.parent.mkdir(parents=True, exist_ok=True)
    merged.to_parquet(output_path, index=False)
    logger.info(f"Saved merged dataset to {output_path}")

    return merged


def main() -> None:
    """Entry point for `python -m src.data.make_dataset`."""
    logging.basicConfig(level=logging.INFO)

    project_root = Path(__file__).resolve().parents[2]
    events_path = project_root / "data" / "raw" / "events.parquet"
    subscribers_path = project_root / "data" / "raw" / "subscribers.csv"
    output_path = project_root / "data" / "interim" / "merged_events.parquet"

    make_dataset(events_path, subscribers_path, output_path)


if __name__ == "__main__":
    main()

What Changed --- The hardcoded paths are gone: file paths are parameters, not constants. The raw events now live as Parquet under data/raw/ instead of a CSV on one person's laptop. The reference date is no longer embedded in data loading --- it is a feature engineering concern and is passed explicitly where needed. Every function has type hints, a docstring, and explicit inputs and outputs. The main() function provides a command-line entry point, and logging replaces print statements.
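The tree also lists src/data/validate.py, which has no counterpart in the notebook at all --- validation is something the notebook never did. A possible sketch (the specific checks are assumptions):

```python
# src/data/validate.py (sketch; the checks shown are illustrative)
"""Lightweight sanity checks for the merged dataset."""

import pandas as pd


def validate_merged(df: pd.DataFrame) -> list[str]:
    """Return a list of validation failure messages (empty if clean)."""
    failures: list[str] = []

    if df.empty:
        failures.append("Merged dataset is empty.")
        return failures

    if df["user_id"].isna().any():
        failures.append("Null user_id values present after cleaning.")

    if not pd.api.types.is_datetime64_any_dtype(df["timestamp"]):
        failures.append("timestamp column is not datetime.")

    if (df["monthly_revenue"] < 0).any():
        failures.append("Negative monthly_revenue values found.")

    return failures
```

Returning messages instead of raising lets the caller decide whether a failure is fatal (training) or merely logged (ad-hoc analysis).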


Step 3: Extract Feature Engineering

The notebook's feature engineering spans 38 cells with three copy-pasted versions of rolling window logic. We extract each feature family into its own module.

# src/features/recency.py

"""Recency features: days since last event of each type."""

import pandas as pd


def compute_recency_features(
    events: pd.DataFrame,
    reference_date: pd.Timestamp,
) -> pd.DataFrame:
    """Compute recency features per user.

    Features computed:
    - days_since_last_event: days since any event
    - days_since_last_video: days since video_start or video_complete
    - days_since_last_search: days since a search event
    - days_since_last_billing: days since a billing_event

    Args:
        events: Cleaned event DataFrame with columns
                ['user_id', 'timestamp', 'event_type'].
        reference_date: The observation cutoff date.

    Returns:
        DataFrame indexed by user_id with recency columns.
    """
    def _days_since(subset: pd.DataFrame, col_name: str) -> pd.DataFrame:
        if subset.empty:
            return pd.DataFrame(columns=["user_id", col_name])
        last = subset.groupby("user_id")["timestamp"].max().reset_index()
        last[col_name] = (reference_date - last["timestamp"]).dt.days
        return last[["user_id", col_name]]

    overall = _days_since(events, "days_since_last_event")
    video = _days_since(
        events[events["event_type"].isin(["video_start", "video_complete"])],
        "days_since_last_video",
    )
    search = _days_since(
        events[events["event_type"] == "search"],
        "days_since_last_search",
    )
    billing = _days_since(
        events[events["event_type"] == "billing_event"],
        "days_since_last_billing",
    )

    result = overall
    for df in [video, search, billing]:
        if not df.empty:
            result = result.merge(df, on="user_id", how="left")

    return result

# src/features/frequency.py

"""Frequency features: event counts and activity patterns."""

import pandas as pd


def compute_frequency_features(
    events: pd.DataFrame,
    reference_date: pd.Timestamp,
    windows: list[int] | None = None,
) -> pd.DataFrame:
    """Compute frequency features per user over multiple time windows.

    Features computed per window:
    - event_count_{w}d: total events in the last w days
    - active_days_{w}d: distinct days with at least one event
    - engagement_rate_{w}d: active_days / window_days
    - sessions_{w}d: distinct sessions (approximated by distinct hours)

    Args:
        events: Cleaned event DataFrame.
        reference_date: The observation cutoff date.
        windows: List of lookback windows in days. Defaults to [7, 14, 30, 60, 90].

    Returns:
        DataFrame indexed by user_id with frequency columns.
    """
    if windows is None:
        windows = [7, 14, 30, 60, 90]

    all_users = events["user_id"].unique()
    result = pd.DataFrame({"user_id": all_users})

    for w in windows:
        window_start = reference_date - pd.Timedelta(days=w)
        windowed = events[
            (events["timestamp"] >= window_start)
            & (events["timestamp"] < reference_date)
        ]

        if windowed.empty:
            result[f"event_count_{w}d"] = 0
            result[f"active_days_{w}d"] = 0
            result[f"engagement_rate_{w}d"] = 0.0
            result[f"sessions_{w}d"] = 0
            continue

        event_count = (
            windowed.groupby("user_id")
            .size()
            .reset_index(name=f"event_count_{w}d")
        )

        windowed_copy = windowed.copy()
        windowed_copy["date"] = windowed_copy["timestamp"].dt.date
        active_days = (
            windowed_copy.groupby("user_id")["date"]
            .nunique()
            .reset_index(name=f"active_days_{w}d")
        )

        windowed_copy["hour_bucket"] = (
            windowed_copy["timestamp"].dt.floor("h")
        )
        sessions = (
            windowed_copy.groupby("user_id")["hour_bucket"]
            .nunique()
            .reset_index(name=f"sessions_{w}d")
        )

        window_features = event_count.merge(active_days, on="user_id")
        window_features = window_features.merge(sessions, on="user_id")
        window_features[f"engagement_rate_{w}d"] = (
            window_features[f"active_days_{w}d"] / w
        ).clip(upper=1.0)

        result = result.merge(window_features, on="user_id", how="left")
        # Fill NaN for users with no events in this window
        for col in [f"event_count_{w}d", f"active_days_{w}d",
                     f"sessions_{w}d"]:
            result[col] = result[col].fillna(0).astype(int)
        result[f"engagement_rate_{w}d"] = (
            result[f"engagement_rate_{w}d"].fillna(0.0)
        )

    return result

# src/features/build_features.py

"""Orchestrate all feature engineering into a single feature matrix."""

import logging
from pathlib import Path

import pandas as pd

from src.features.recency import compute_recency_features
from src.features.frequency import compute_frequency_features
from src.features.monetary import compute_monetary_features
from src.features.plan_features import compute_plan_features

logger = logging.getLogger(__name__)


def build_feature_matrix(
    events: pd.DataFrame,
    subscribers: pd.DataFrame,
    reference_date: pd.Timestamp | None = None,
) -> tuple[pd.DataFrame, pd.Series]:
    """Build the complete feature matrix for churn prediction.

    Calls each feature module and merges results into a single
    DataFrame aligned with the subscriber table.

    Args:
        events: Cleaned event log.
        subscribers: Cleaned subscriber metadata with 'churned' column.
        reference_date: Observation cutoff. Defaults to max event timestamp.

    Returns:
        Tuple of (X, y) where X is the feature matrix and y is the
        binary churn target.

    Raises:
        ValueError: If the subscribers DataFrame has only one class.
    """
    if reference_date is None:
        reference_date = events["timestamp"].max()

    logger.info(f"Building features with reference date: {reference_date}")

    # Compute each feature family
    recency = compute_recency_features(events, reference_date)
    frequency = compute_frequency_features(events, reference_date)
    monetary = compute_monetary_features(events, reference_date)
    plan = compute_plan_features(subscribers, reference_date)

    # Merge all features onto the subscriber table
    features = subscribers[["user_id", "churned"]].copy()
    for feature_df in [recency, frequency, monetary, plan]:
        features = features.merge(feature_df, on="user_id", how="left")

    # Separate target from features
    y = features["churned"]
    X = features.drop(columns=["user_id", "churned"])

    # Fill remaining NaN with 0 (users with no events in a window)
    X = X.fillna(0)

    logger.info(f"Feature matrix: {X.shape[0]} rows, {X.shape[1]} columns")

    n_classes = y.nunique()
    if n_classes < 2:
        raise ValueError(
            f"Target has only {n_classes} class(es). "
            "Cannot train a classifier with a single class."
        )

    return X, y


def main() -> None:
    """Entry point for `python -m src.features.build_features`."""
    logging.basicConfig(level=logging.INFO)

    project_root = Path(__file__).resolve().parents[2]
    events = pd.read_parquet(
        project_root / "data" / "interim" / "merged_events.parquet"
    )
    subscribers = pd.read_csv(
        project_root / "data" / "raw" / "subscribers.csv"
    )
    subscribers["signup_date"] = pd.to_datetime(subscribers["signup_date"])

    X, y = build_feature_matrix(events, subscribers)

    output_dir = project_root / "data" / "processed"
    output_dir.mkdir(parents=True, exist_ok=True)
    X.to_parquet(output_dir / "features.parquet", index=False)
    y.to_frame().to_parquet(output_dir / "target.parquet", index=False)

    logger.info("Feature matrix and target saved to data/processed/")


if __name__ == "__main__":
    main()

What Changed --- The three copy-pasted rolling window blocks became a single compute_frequency_features function with a configurable windows parameter. The recency, frequency, monetary, and plan features live in separate modules. The build_feature_matrix function orchestrates all of them and is the single entry point for feature engineering in both training and prediction.
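The monetary and plan modules follow the same shape as recency and frequency. A hedged sketch of src/features/monetary.py (the feature names and the single 90-day window are illustrative; the real module would mirror the notebook's cells):

```python
# src/features/monetary.py (sketch; column names follow the event schema above)
"""Monetary features: revenue totals per user."""

import pandas as pd


def compute_monetary_features(
    events: pd.DataFrame,
    reference_date: pd.Timestamp,
) -> pd.DataFrame:
    """Compute revenue-based features per user.

    Assumes a numeric 'revenue' column populated on billing events.
    """
    billing = events[events["event_type"] == "billing_event"]
    if billing.empty:
        return pd.DataFrame(
            columns=["user_id", "total_revenue_90d", "billing_events_90d"]
        )

    window_start = reference_date - pd.Timedelta(days=90)
    recent = billing[
        (billing["timestamp"] >= window_start)
        & (billing["timestamp"] < reference_date)
    ]
    return (
        recent.groupby("user_id")
        .agg(
            total_revenue_90d=("revenue", "sum"),
            billing_events_90d=("revenue", "size"),
        )
        .reset_index()
    )
```

Note the same empty-input guard and (user_id, feature) output shape as the other modules, which is what lets build_feature_matrix merge them all uniformly.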


Step 4: Extract Model Training

The modeling cells (72--111) become src/models/train_model.py. Hyperparameters and the feature list move out of the code and into config/model_config.yaml, loaded through a ModelConfig object.

# src/models/train_model.py

"""Train the StreamFlow churn prediction model."""

import logging
import pickle
from pathlib import Path

import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split

from src.config.loader import ModelConfig

logger = logging.getLogger(__name__)


def train_churn_model(
    X: pd.DataFrame,
    y: pd.Series,
    config: ModelConfig | None = None,
    test_size: float = 0.2,
    random_state: int = 42,
) -> tuple[GradientBoostingClassifier, pd.DataFrame, pd.Series, pd.DataFrame, pd.Series]:
    """Train a gradient boosting churn classifier.

    Args:
        X: Feature matrix.
        y: Binary target (0 = retained, 1 = churned).
        config: Model configuration. Loads default if None.
        test_size: Fraction of data for test set.
        random_state: Random seed for reproducibility.

    Returns:
        Tuple of (model, X_train, y_train, X_test, y_test).

    Raises:
        ValueError: If y has fewer than 2 classes.
    """
    if y.nunique() < 2:
        raise ValueError(
            f"Target has only {y.nunique()} class(es). "
            "Cannot train a classifier with a single class."
        )

    if config is None:
        config = ModelConfig.from_yaml(
            Path(__file__).resolve().parents[2] / "config" / "model_config.yaml"
        )

    # Select features specified in config
    feature_cols = [c for c in config.include_features if c in X.columns]
    excluded = set(config.include_features) - set(feature_cols)
    if excluded:
        logger.warning(f"Features in config but not in data: {excluded}")

    X_selected = X[feature_cols] if feature_cols else X

    X_train, X_test, y_train, y_test = train_test_split(
        X_selected, y, test_size=test_size, random_state=random_state, stratify=y
    )

    logger.info(
        f"Train: {len(X_train)} rows ({y_train.mean():.1%} churn rate), "
        f"Test: {len(X_test)} rows ({y_test.mean():.1%} churn rate)"
    )

    model = GradientBoostingClassifier(
        n_estimators=config.n_estimators,
        max_depth=config.max_depth,
        learning_rate=config.learning_rate,
        min_samples_leaf=config.min_samples_leaf,
        subsample=config.subsample,
        random_state=random_state,
    )
    model.fit(X_train, y_train)

    logger.info("Model training complete.")
    return model, X_train, y_train, X_test, y_test


def save_model(model: GradientBoostingClassifier, path: Path) -> None:
    """Serialize a trained model to disk."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "wb") as f:
        pickle.dump(model, f)
    logger.info(f"Model saved to {path}")


def main() -> None:
    """Entry point for `python -m src.models.train_model`."""
    logging.basicConfig(level=logging.INFO)

    project_root = Path(__file__).resolve().parents[2]
    X = pd.read_parquet(project_root / "data" / "processed" / "features.parquet")
    y = pd.read_parquet(project_root / "data" / "processed" / "target.parquet")["churned"]

    model, X_train, y_train, X_test, y_test = train_churn_model(X, y)

    save_model(model, project_root / "models" / "churn_model.pkl")

    # Save test set for evaluation
    X_test.to_parquet(project_root / "data" / "processed" / "X_test.parquet")
    y_test.to_frame().to_parquet(project_root / "data" / "processed" / "y_test.parquet")


if __name__ == "__main__":
    main()
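
train_churn_model depends on a ModelConfig loaded from config/model_config.yaml, which is not shown above. One possible implementation of src/config/loader.py --- the fields referenced in train_model.py are taken from that code, while decision_threshold (a home for the notebook's magic 0.42) and the default values are assumptions:

```python
# src/config/loader.py (sketch; default values are illustrative)
"""Load model hyperparameters from a YAML config file."""

from dataclasses import dataclass, field
from pathlib import Path

import yaml


@dataclass
class ModelConfig:
    """Hyperparameters and feature list for the churn model."""

    n_estimators: int = 300
    max_depth: int = 4
    learning_rate: float = 0.05
    min_samples_leaf: int = 20
    subsample: float = 0.8
    decision_threshold: float = 0.42  # formerly the unexplained magic number
    include_features: list[str] = field(default_factory=list)

    @classmethod
    def from_yaml(cls, path: Path) -> "ModelConfig":
        """Read a YAML file; keys not present fall back to the defaults."""
        with open(path) as f:
            raw = yaml.safe_load(f) or {}
        return cls(**raw)
```

A dataclass keeps the config typed and discoverable: misspelled keys in the YAML fail loudly at load time instead of silently training the wrong model.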

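The project tree also lists src/models/predict_model.py, the scoring counterpart to training. A minimal sketch (function names are illustrative; the paths mirror train_model above):

```python
# src/models/predict_model.py (sketch)
"""Score users with the trained churn model."""

import pickle
from pathlib import Path

import pandas as pd


def load_model(path: Path):
    """Deserialize a model saved by train_model.save_model."""
    with open(path, "rb") as f:
        return pickle.load(f)


def predict_churn_proba(model, X: pd.DataFrame) -> pd.Series:
    """Return churn probabilities aligned with X's index.

    Assumes X was built by the same build_feature_matrix used in training,
    so the columns match what the model saw at fit time.
    """
    proba = model.predict_proba(X)[:, 1]
    return pd.Series(proba, index=X.index, name="churn_probability")
```

Because both training and prediction go through build_feature_matrix, there is a single place where features are defined --- the classic train/serve skew bug has nowhere to hide.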
Step 5: Write Tests

Shared fixtures live in tests/conftest.py, so every test file can request realistic sample data by name.

# tests/conftest.py

import pandas as pd
import numpy as np
import pytest


@pytest.fixture
def sample_events():
    """Small but realistic event DataFrame for testing."""
    np.random.seed(42)
    n = 200
    users = np.random.choice([1, 2, 3, 4, 5], n)
    event_types = np.random.choice(
        ["page_view", "video_start", "video_complete", "search",
         "billing_event", "support_ticket"],
        n, p=[0.35, 0.2, 0.15, 0.15, 0.1, 0.05],
    )
    timestamps = pd.date_range("2024-09-01", periods=n, freq="4h")

    return pd.DataFrame({
        "user_id": users,
        "event_type": event_types,
        "timestamp": timestamps,
        "duration_seconds": np.random.randint(0, 300, n),
        "revenue": np.where(event_types == "billing_event",
                            np.round(np.random.uniform(5, 50, n), 2), 0.0),
    })


@pytest.fixture
def sample_subscribers():
    """Small subscriber table for testing."""
    return pd.DataFrame({
        "user_id": [1, 2, 3, 4, 5],
        "signup_date": pd.to_datetime([
            "2023-01-15", "2023-06-01", "2024-01-01",
            "2024-03-15", "2024-05-20",
        ]),
        "plan_type": ["premium", "basic", "standard", "basic", "premium"],
        "monthly_revenue": [14.99, 9.99, 12.99, 9.99, 14.99],
        "churned": [0, 1, 0, 1, 0],
    })


@pytest.fixture
def reference_date():
    """Standard reference date for testing."""
    return pd.Timestamp("2024-10-31")

# tests/test_features.py

import pandas as pd

from src.features.recency import compute_recency_features
from src.features.frequency import compute_frequency_features


class TestRecencyFeatures:

    def test_basic_recency(self, sample_events, reference_date):
        result = compute_recency_features(sample_events, reference_date)
        assert "days_since_last_event" in result.columns
        assert "user_id" in result.columns
        assert result["days_since_last_event"].ge(0).all()

    def test_empty_events(self, reference_date):
        empty = pd.DataFrame(columns=["user_id", "timestamp", "event_type"])
        result = compute_recency_features(empty, reference_date)
        assert len(result) == 0

    def test_no_video_events(self, reference_date):
        events = pd.DataFrame({
            "user_id": [1, 1],
            "timestamp": pd.to_datetime(["2024-10-25", "2024-10-26"]),
            "event_type": ["page_view", "search"],
        })
        result = compute_recency_features(events, reference_date)
        assert result["days_since_last_event"].iloc[0] == 5


class TestFrequencyFeatures:

    def test_default_windows(self, sample_events, reference_date):
        result = compute_frequency_features(sample_events, reference_date)
        for w in [7, 14, 30, 60, 90]:
            assert f"event_count_{w}d" in result.columns
            assert f"engagement_rate_{w}d" in result.columns

    def test_custom_windows(self, sample_events, reference_date):
        result = compute_frequency_features(
            sample_events, reference_date, windows=[7, 30]
        )
        assert "event_count_7d" in result.columns
        assert "event_count_30d" in result.columns
        assert "event_count_14d" not in result.columns

    def test_engagement_rate_bounded(self, sample_events, reference_date):
        result = compute_frequency_features(sample_events, reference_date)
        for col in result.columns:
            if "engagement_rate" in col:
                assert result[col].between(0, 1).all()

# tests/test_pipeline.py

import pandas as pd
import pytest

from src.features.build_features import build_feature_matrix


class TestPipeline:

    def test_full_pipeline(self, sample_events, sample_subscribers, reference_date):
        X, y = build_feature_matrix(
            sample_events, sample_subscribers, reference_date
        )
        assert X.shape[0] == y.shape[0]
        assert X.shape[1] > 0
        assert not X.isnull().any().any()
        assert set(y.unique()).issubset({0, 1})

    def test_single_class_raises(self, sample_events, reference_date):
        subs = pd.DataFrame({
            "user_id": [1, 2, 3],
            "signup_date": pd.to_datetime(["2023-01-01"] * 3),
            "plan_type": ["basic"] * 3,
            "monthly_revenue": [9.99] * 3,
            "churned": [0, 0, 0],
        })
        with pytest.raises(ValueError, match="single class"):
            build_feature_matrix(sample_events, subs, reference_date)

# Run the test suite
$ pytest tests/ -v --tb=short

tests/test_features.py::TestRecencyFeatures::test_basic_recency PASSED
tests/test_features.py::TestRecencyFeatures::test_empty_events PASSED
tests/test_features.py::TestRecencyFeatures::test_no_video_events PASSED
tests/test_features.py::TestFrequencyFeatures::test_default_windows PASSED
tests/test_features.py::TestFrequencyFeatures::test_custom_windows PASSED
tests/test_features.py::TestFrequencyFeatures::test_engagement_rate_bounded PASSED
tests/test_pipeline.py::TestPipeline::test_full_pipeline PASSED
tests/test_pipeline.py::TestPipeline::test_single_class_raises PASSED

8 passed in 1.24s

Step 6: Add Code Quality Tooling

# Format everything
$ black src/ tests/
reformatted src/features/monetary.py
reformatted src/data/make_dataset.py
All done! 2 files reformatted, 11 files left unchanged.

# Lint everything
$ ruff check src/ tests/
src/features/monetary.py:7:1: F401 [*] `numpy` imported but unused
src/evaluation/evaluate.py:42:9: F841 Local variable `temp` is assigned to but never used
Found 2 errors.
[*] 1 fixable with the `--fix` option.

# Fix what can be auto-fixed
$ ruff check --fix src/ tests/
Found 2 errors (1 fixed, 1 remaining).

# Type check
$ mypy src/ --ignore-missing-imports
src/models/train_model.py:67: note: Revealed type is "builtins.float"
Success: no issues found in 12 source files
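The .pre-commit-config.yaml in the project tree wires these tools into every commit so they cannot be skipped. A plausible configuration (pin rev to the versions you actually install):

```yaml
# .pre-commit-config.yaml (sketch)
repos:
  - repo: https://github.com/psf/black
    rev: 24.4.2        # pin to your installed version
    hooks:
      - id: black
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.4.4        # pin to your installed version
    hooks:
      - id: ruff
        args: [--fix]
```

After `pre-commit install`, a commit that fails formatting or linting is rejected before it ever reaches review.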

Step 7: The Makefile

Each pipeline stage becomes a Makefile target, so the entire build is a single command. (Note that make requires recipe lines to be indented with a tab character, not spaces.)

.PHONY: data features train evaluate test lint format all clean

data:
    python -m src.data.make_dataset

features:
    python -m src.features.build_features

train:
    python -m src.models.train_model

evaluate:
    python -m src.evaluation.evaluate

test:
    pytest tests/ -v --tb=short

lint:
    ruff check src/ tests/
    mypy src/ --ignore-missing-imports

format:
    black src/ tests/
    ruff check --fix src/ tests/

all: data features train evaluate

clean:
    rm -rf data/interim/* data/processed/* models/*.pkl

# The acid test: reproduce everything from scratch
$ make clean && make all && make test

Results

| Metric | Before (notebook) | After (package) |
| --- | --- | --- |
| Files | 1 notebook (147 cells) | 15 Python modules + 4 test files |
| Lines of code | ~1,800 (estimated, mixed with outputs) | ~1,200 (pure code) |
| Test count | 0 | 8 (+ parametrized variants) |
| Time to onboard a new developer | ~2 weeks | ~2 hours |
| Reproducibility | "Run cells in this order" | `make clean && make all` |
| Model AUC | 0.847 | 0.847 (identical --- refactoring does not change behavior) |

The model did not improve. The AUC did not change. The stakeholders cannot tell the difference. But the engineering team can now deploy the model, a new hire can onboard in hours instead of weeks, and the feature engineering can be tested, versioned, and reused. And nobody will ever need to open churn_model_final_v2_ACTUALLY_FINAL_v3.ipynb again.


Case Study 1 accompanies Chapter 29: Software Engineering for Data Scientists. See case-study-02.md for the technical debt crisis case study.