Appendix C: Python ML Ecosystem Reference

This appendix serves as a practitioner's desk reference for the Python libraries used throughout this textbook. For each library, you will find its purpose, key API patterns, integration points with other tools in the stack, and the gotchas that cost experienced engineers hours of debugging. This is not a replacement for official documentation — it is a guide to the patterns that matter most in production data science.


C.1 scikit-learn: Advanced Patterns

Purpose: The backbone of classical machine learning in Python. While introductory courses cover fit/predict, production work demands mastery of pipelines, custom transformers, and model selection infrastructure.

Key API Patterns

Custom Transformers with BaseEstimator and TransformerMixin

Every custom preprocessing step should be a proper transformer so it integrates with Pipeline, GridSearchCV, and cross_val_score:

from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np

class LogTransformer(BaseEstimator, TransformerMixin):
    """Log-transform skewed features with automatic offset for zeros."""

    def __init__(self, offset=1.0):
        self.offset = offset

    def fit(self, X, y=None):
        self.n_features_in_ = X.shape[1]
        return self

    def transform(self, X):
        return np.log(X + self.offset)

    def get_feature_names_out(self, input_features=None):
        if input_features is None:
            input_features = [f"x{i}" for i in range(self.n_features_in_)]
        return [f"log_{f}" for f in input_features]

Column-Specific Pipelines with ColumnTransformer

Production datasets have mixed types. Never manually split and recombine columns:

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression

numeric_features = ["age", "income", "credit_score"]
categorical_features = ["state", "product_type"]

preprocessor = ColumnTransformer(
    transformers=[
        ("num", Pipeline([
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]), numeric_features),
        ("cat", Pipeline([
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
        ]), categorical_features),
    ],
    remainder="drop",
)

full_pipeline = Pipeline([
    ("preprocessor", preprocessor),
    ("classifier", LogisticRegression(max_iter=1000)),
])

Nested Cross-Validation for Honest Model Selection

Single-loop cross-validation produces optimistically biased estimates when you also use it for hyperparameter tuning. Nest the loops:

from sklearn.model_selection import cross_val_score, GridSearchCV, StratifiedKFold

inner_cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
outer_cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=99)

param_grid = {"classifier__C": [0.01, 0.1, 1.0, 10.0]}

inner_search = GridSearchCV(
    full_pipeline, param_grid, cv=inner_cv, scoring="roc_auc", n_jobs=-1
)

nested_scores = cross_val_score(
    inner_search, X, y, cv=outer_cv, scoring="roc_auc", n_jobs=1
)
print(f"Nested CV AUC: {nested_scores.mean():.3f} ± {nested_scores.std():.3f}")

Common Gotchas

  • Data leakage through preprocessing: Fitting a scaler on the full dataset before cross-validation leaks information. Always put preprocessing inside the pipeline.
  • sparse_output vs. sparse: In scikit-learn >= 1.2, OneHotEncoder uses sparse_output instead of sparse. The old parameter was deprecated in 1.2 and later removed, so code using the legacy name will break on upgrade.
  • n_jobs=-1 in nested loops: Setting n_jobs=-1 in both inner and outer loops multiplies the process count — each outer worker spawns its own set of inner workers, oversubscribing your cores. Set the outer loop to n_jobs=1 and let the inner loop parallelize.
  • set_output(transform="pandas"): Call this on your pipeline to preserve DataFrame column names through transformations — essential for debugging and for libraries like SHAP that use feature names.
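
The first gotcha can be demonstrated without scikit-learn at all. The sketch below (stdlib only, toy numbers) shows how fitting scaling statistics on train + test lets a single extreme test point shift every scaled training value:

```python
# Conceptual sketch of preprocessing leakage: statistics fit on the
# full dataset vs. on the training data alone.
from statistics import mean, stdev

train = [1.0, 2.0, 3.0, 4.0]
test = [100.0]  # one extreme test point

# Wrong: scaler statistics computed over train + test
full = train + test
mu_leaky, sd_leaky = mean(full), stdev(full)

# Right: scaler statistics computed over train only
mu_clean, sd_clean = mean(train), stdev(train)

leaky_train = [(x - mu_leaky) / sd_leaky for x in train]
clean_train = [(x - mu_clean) / sd_clean for x in train]

print(mu_leaky, mu_clean)  # 22.0 vs. 2.5: the test point moved the mean
```

Putting the scaler inside the pipeline guarantees that cross-validation refits it on each training fold, so the clean path happens automatically.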

C.2 DoWhy and EconML: Causal Inference

Purpose: DoWhy provides the causal reasoning framework (graph specification, identification, estimation, refutation). EconML provides the machine-learning-based estimators (DML, causal forests, meta-learners). They integrate tightly: DoWhy defines the problem, EconML solves it.

Key API Patterns

End-to-End Causal Workflow with DoWhy

from dowhy import CausalModel
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.linear_model import LassoCV

model = CausalModel(
    data=df,
    treatment="marketing_spend",
    outcome="revenue",
    common_causes=["region", "season", "prior_revenue"],
    instruments=["ad_auction_rank"],
    graph=None,  # or provide a DOT/GML string for explicit graph
)

identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)

estimate = model.estimate_effect(
    identified_estimand,
    method_name="backdoor.econml.dml.DML",
    method_params={
        "init_params": {
            "model_y": GradientBoostingRegressor(),
            "model_t": GradientBoostingRegressor(),
            "model_final": LassoCV(),
            "discrete_treatment": False,
        },
        "fit_params": {},
    },
)

refutation = model.refute_estimate(
    identified_estimand,
    estimate,
    method_name="random_common_cause",
    num_simulations=100,
)
print(refutation)

Heterogeneous Treatment Effects with EconML

from econml.dml import CausalForestDML
from sklearn.ensemble import GradientBoostingRegressor, GradientBoostingClassifier

est = CausalForestDML(
    model_y=GradientBoostingRegressor(n_estimators=200),
    model_t=GradientBoostingClassifier(n_estimators=200),
    discrete_treatment=True,  # binary treatment, hence a classifier for model_t
    n_estimators=1000,
    min_samples_leaf=10,
    random_state=42,
)
est.fit(Y=df["outcome"], T=df["treatment"], X=df[effect_modifiers], W=df[confounders])

# Individual treatment effects
ite = est.effect(df[effect_modifiers])

# Confidence intervals
lb, ub = est.effect_interval(df[effect_modifiers], alpha=0.05)

Common Gotchas

  • Graph specification matters: If you omit the graph parameter, DoWhy infers a naive graph from common_causes and instruments. For serious analysis, always provide an explicit causal graph.
  • Refutation is not optional: An estimate without refutation is an anecdote. Always run at least random_common_cause, placebo_treatment_refuter, and data_subset_refuter.
  • EconML's W vs. X: W are confounders (adjusted for but no heterogeneity estimated). X are effect modifiers (the dimensions across which treatment effects vary). Confusing these produces meaningless CATE estimates.
  • Discrete vs. continuous treatment: Many estimators behave differently. CausalForestDML requires you to specify discrete_treatment=True for binary treatments and use a classifier for model_t.
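
The W-versus-X gotcha can be made concrete with a toy simulation (hypothetical data-generating process, stdlib only): the true effect is +2 only when the effect modifier x is 1, and even a naive group-mean estimator recovers that heterogeneity when x is treated as an effect modifier:

```python
# Toy CATE sketch: group means stand in for EconML's ML-based estimators.
import random

random.seed(0)
rows = []
for _ in range(4000):
    x = random.randint(0, 1)  # effect modifier (EconML's X)
    t = random.randint(0, 1)  # binary treatment
    y = 1.0 + 2.0 * t * x + random.gauss(0, 0.1)  # effect is +2 only if x == 1
    rows.append((x, t, y))

def mean_y(x, t):
    vals = [yi for (xi, ti, yi) in rows if xi == x and ti == t]
    return sum(vals) / len(vals)

cate_x0 = mean_y(0, 1) - mean_y(0, 0)  # ~0: no effect for x == 0
cate_x1 = mean_y(1, 1) - mean_y(1, 0)  # ~2: strong effect for x == 1
```

Had x been passed as W instead of X, the estimator would average over it and report a single effect near 1, hiding the heterogeneity entirely.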

C.3 PyMC and ArviZ: Bayesian Methods

Purpose: PyMC is the probabilistic programming library for specifying and fitting Bayesian models. ArviZ provides diagnostics, visualization, and model comparison. Together they form the Bayesian workflow: specify, sample, diagnose, compare.

Key API Patterns

Hierarchical Model with PyMC

import pymc as pm
import arviz as az

with pm.Model() as hierarchical_model:
    # Hyperpriors
    mu_alpha = pm.Normal("mu_alpha", mu=0, sigma=10)
    sigma_alpha = pm.HalfNormal("sigma_alpha", sigma=5)

    # Group-level parameters
    alpha = pm.Normal("alpha", mu=mu_alpha, sigma=sigma_alpha, shape=n_groups)
    beta = pm.Normal("beta", mu=0, sigma=5)
    sigma = pm.HalfNormal("sigma", sigma=5)

    # Likelihood
    mu = alpha[group_idx] + beta * x
    y_obs = pm.Normal("y_obs", mu=mu, sigma=sigma, observed=y)

    # Sample
    trace = pm.sample(
        draws=2000,
        tune=1000,
        chains=4,
        cores=4,
        target_accept=0.95,
        random_seed=42,
        return_inferencedata=True,
    )

Diagnostics with ArviZ

# Convergence diagnostics
print(az.summary(trace, var_names=["mu_alpha", "sigma_alpha", "beta", "sigma"]))

# R-hat should be < 1.01, ESS should be > 400
az.plot_trace(trace, var_names=["mu_alpha", "sigma_alpha"])
az.plot_rank(trace, var_names=["beta"])  # rank plots catch issues trace plots miss

# Forest plot of group-level parameters
az.plot_forest(trace, var_names=["alpha"], combined=True, hdi_prob=0.94)

# LOO-CV for model comparison (needs pointwise log-likelihood; run
# pm.compute_log_likelihood(trace) first if that group is missing)
loo_result = az.loo(trace, pointwise=True)
print(loo_result)

Common Gotchas

  • return_inferencedata=True: This is the default in modern PyMC, but keep it explicit in shared code. The legacy MultiTrace object is deprecated, and ArviZ functions expect InferenceData.
  • Divergences: If you see divergent transitions, increase target_accept (e.g., to 0.99) or reparameterize. Non-centered parameterizations often fix hierarchical model divergences:

        alpha_offset = pm.Normal("alpha_offset", mu=0, sigma=1, shape=n_groups)
        alpha = pm.Deterministic("alpha", mu_alpha + sigma_alpha * alpha_offset)

  • Prior predictive checks: Always run pm.sample_prior_predictive() before fitting. If your priors produce absurd predictions (e.g., negative heights, revenues of 10^15), your model is misspecified.
  • PyMC v5 API changes: pm.Model() context managers are now required. The functional API from PyMC3 is gone. Distributions use dims instead of shape when working with labeled coordinates.
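
The non-centered parameterization in the divergences bullet relies on the identity that mu + sigma * z with z ~ N(0, 1) is distributed N(mu, sigma). A stdlib Monte Carlo check with toy values:

```python
# Centered vs. non-centered draws target the same distribution.
import random

random.seed(1)
mu, sigma, n = 3.0, 2.0, 200_000

centered = [random.gauss(mu, sigma) for _ in range(n)]
noncentered = [mu + sigma * random.gauss(0.0, 1.0) for _ in range(n)]

mean_c = sum(centered) / n
mean_nc = sum(noncentered) / n
# Both means land near 3.0; the reparameterization changes only the
# geometry the sampler sees, not the model itself.
```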

C.4 HuggingFace Transformers

Purpose: The standard library for pretrained transformer models. Covers text classification, generation, question answering, summarization, translation, image classification, and multimodal tasks. The pipeline API handles 90% of inference use cases; the Trainer API handles fine-tuning.

Key API Patterns

Inference with pipeline

from transformers import pipeline

classifier = pipeline(
    "text-classification",
    model="distilbert-base-uncased-finetuned-sst-2-english",
    device=0,  # GPU index, or -1 for CPU
)

results = classifier(
    ["This product exceeded my expectations.", "Terrible quality, broke immediately."],
    batch_size=32,
    truncation=True,
)

Fine-Tuning with Trainer

from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
)
from datasets import load_dataset
import numpy as np
from sklearn.metrics import accuracy_score, f1_score

dataset = load_dataset("imdb")
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

def tokenize_fn(batch):
    return tokenizer(batch["text"], padding="max_length", truncation=True, max_length=256)

tokenized = dataset.map(tokenize_fn, batched=True, remove_columns=["text"])

model = AutoModelForSequenceClassification.from_pretrained(
    "distilbert-base-uncased", num_labels=2
)

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    preds = np.argmax(logits, axis=-1)
    return {
        "accuracy": accuracy_score(labels, preds),
        "f1": f1_score(labels, preds, average="weighted"),
    }

training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=3,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=64,
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    fp16=True,
    dataloader_num_workers=4,
    report_to="mlflow",
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized["train"],
    eval_dataset=tokenized["test"],
    compute_metrics=compute_metrics,
)
trainer.train()

Common Gotchas

  • Tokenizer/model mismatch: Always load the tokenizer from the same checkpoint as the model. Mismatched tokenizers produce garbage silently.
  • padding and truncation: For training, use padding="max_length" with a fixed max_length. For inference, padding=True (dynamic padding) is faster but requires careful batching.
  • GPU memory with large models: Use model.half() for half precision, or 8-bit quantization via bitsandbytes (passing quantization_config=BitsAndBytesConfig(load_in_8bit=True) to from_pretrained) for models that do not fit in VRAM at full precision.
  • device_map="auto": For models larger than a single GPU, this distributes layers across GPUs and CPU automatically. Requires accelerate installed.
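
The padding trade-off in the second gotcha is easy to quantify. A stdlib sketch with hypothetical token-id lists (pad id 0):

```python
# Fixed-length padding pads every sequence to max_length; dynamic
# padding pads each batch only to its own longest member.
seqs = [[1, 2], [1, 2, 3, 4, 5], [1], [1, 2, 3]]
MAX_LENGTH = 256

def pad(batch, length, pad_id=0):
    return [s + [pad_id] * (length - len(s)) for s in batch]

fixed = pad(seqs, MAX_LENGTH)                   # every row is 256 tokens
dynamic = pad(seqs, max(len(s) for s in seqs))  # every row is 5 tokens

tokens_fixed = sum(len(s) for s in fixed)       # 1024 tokens processed
tokens_dynamic = sum(len(s) for s in dynamic)   # 20 tokens processed
```

For training, the fixed shape buys compiler-friendly static batches; for inference, the 50x reduction here is why dynamic padding wins.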

C.5 Ray: Distributed Computing

Purpose: Ray is a framework for scaling Python workloads across multiple cores or machines. In ML, it is used for distributed hyperparameter tuning (Ray Tune), distributed training (Ray Train), serving (Ray Serve), and large-scale data processing (Ray Data).

Key API Patterns

Distributed Hyperparameter Tuning with Ray Tune

from ray import tune
from ray.tune.schedulers import ASHAScheduler
from ray.tune.search.optuna import OptunaSearch

def train_model(config):
    model = build_model(
        lr=config["lr"],
        hidden_dim=config["hidden_dim"],
        dropout=config["dropout"],
    )
    for epoch in range(50):
        loss, accuracy = train_one_epoch(model, train_loader)
        val_loss, val_acc = evaluate(model, val_loader)
        tune.report({"val_loss": val_loss, "val_accuracy": val_acc})

search_space = {
    "lr": tune.loguniform(1e-5, 1e-2),
    "hidden_dim": tune.choice([64, 128, 256, 512]),
    "dropout": tune.uniform(0.1, 0.5),
}

tuner = tune.Tuner(
    train_model,
    param_space=search_space,
    tune_config=tune.TuneConfig(
        metric="val_loss",
        mode="min",
        scheduler=ASHAScheduler(max_t=50, grace_period=5, reduction_factor=2),
        search_alg=OptunaSearch(),
        num_samples=100,
    ),
    run_config=tune.RunConfig(
        storage_path="/tmp/ray_results",
        name="experiment_001",
    ),
)
results = tuner.fit()
best_result = results.get_best_result(metric="val_loss", mode="min")
print(f"Best config: {best_result.config}")

Common Gotchas

  • Object store memory: Ray serializes objects into a shared-memory object store. Large DataFrames passed to remote functions are copied there. If your object store fills up, Ray crashes silently. Monitor with ray.cluster_resources().
  • Pickling failures: Ray uses cloudpickle to serialize functions. Lambda functions, closures over database connections, and objects with open file handles will fail. Restructure code to avoid these.
  • Head node resources: By default, Ray schedules tasks on the head node too. For production clusters, set num_cpus=0 on the head node so it only coordinates.
  • ASHA scheduler + warmup: ASHA aggressively kills bad runs. If your model has a long warmup phase, increase grace_period so promising runs are not terminated prematurely.
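
ASHA builds on successive halving. The stdlib sketch below (hypothetical loss curve loss(q, t) = q + 1/t, where q is a config's asymptotic loss) shows the core idea: evaluate everything cheaply, keep the best half, and double the budget for survivors:

```python
# Minimal successive halving: the scheduling idea behind ASHA.
def successive_halving(configs, score_fn, rungs=3):
    survivors = list(configs)
    budget = 1
    for _ in range(rungs):
        scored = sorted(survivors, key=lambda c: score_fn(c, budget))
        survivors = scored[: max(1, len(scored) // 2)]  # keep the best half
        budget *= 2  # survivors train twice as long at the next rung
    return survivors[0]

def loss(q, t):
    return q + 1.0 / t  # hypothetical: loss decays toward asymptote q

best = successive_halving([0.5, 0.2, 0.9, 0.1], loss, rungs=3)
# best == 0.1, the config with the lowest asymptotic loss
```

A long warmup corresponds to loss curves that cross late — exactly the case where grace_period must be raised so promising configs survive the first rung.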

C.6 PySpark: Large-Scale Data Processing

Purpose: PySpark is the Python interface to Apache Spark, used for data processing at scales where pandas cannot fit data in memory. In ML pipelines, it handles feature engineering, joins across billion-row tables, and integration with Delta Lake and cloud data warehouses.

Key API Patterns

Feature Engineering at Scale

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("FeatureEngineering") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

df = spark.read.parquet("s3a://data-lake/transactions/")

user_window = Window.partitionBy("user_id").orderBy("timestamp")
# rangeBetween requires a numeric ordering column, so cast to epoch seconds
seconds_window = Window.partitionBy("user_id").orderBy(F.col("timestamp").cast("long"))
rolling_7d = seconds_window.rangeBetween(-7 * 86400, 0)  # 7 days in seconds

features = df.withColumn(
    "txn_count_7d", F.count("*").over(rolling_7d)
).withColumn(
    "avg_amount_7d", F.avg("amount").over(rolling_7d)
).withColumn(
    "txn_rank", F.row_number().over(user_window)
).withColumn(
    "hour_of_day", F.hour("timestamp")
).withColumn(
    "is_weekend", F.dayofweek("timestamp").isin([1, 7]).cast("int")
)

features.write.mode("overwrite").parquet("s3a://feature-store/user_txn_features/")

Common Gotchas

  • Small file problem: Writing partitioned data can produce thousands of tiny files. Use .coalesce(n) before writing, or enable Spark's adaptive query execution (spark.sql.adaptive.enabled=true).
  • Shuffle partitions: The default 200 shuffle partitions is too low for large datasets and too high for small ones. Set it based on data size: roughly 128 MB per partition.
  • toPandas() on large DataFrames: This collects all data to the driver. For large DataFrames, use pandas_api() (the pandas API on Spark) instead.
  • UDF performance: Python UDFs serialize data row by row between the JVM and Python. Use pandas_udf (vectorized UDFs) for 10-100x speedup, or better yet, use built-in Spark functions whenever possible.
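
The shuffle-partition sizing rule from the second gotcha is simple arithmetic; a stdlib helper (hypothetical sizing, 128 MB target per partition):

```python
# Ceil-divide total shuffle size by the per-partition target.
def shuffle_partitions(data_size_bytes, target_bytes=128 * 1024 * 1024):
    return max(1, -(-data_size_bytes // target_bytes))  # ceiling division

# A 100 GB shuffle at 128 MB per partition:
parts = shuffle_partitions(100 * 1024**3)  # 800 partitions
```

The result would feed spark.sql.shuffle.partitions — or be left to adaptive query execution, which coalesces undersized partitions at runtime.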

C.7 Airflow, Dagster, and Prefect: Orchestration

Purpose: Orchestrators schedule, monitor, and manage the execution order of tasks in ML pipelines. Airflow is the most widely deployed; Dagster offers a modern asset-centric model; Prefect emphasizes developer experience and dynamic workflows.

Key API Patterns

Airflow: TaskFlow API (Airflow 2.x)

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule="0 6 * * *",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["ml-pipeline"],
)
def daily_training_pipeline():

    @task()
    def extract_features() -> str:
        output_path = run_feature_pipeline()
        return output_path

    @task()
    def train_model(feature_path: str) -> str:
        model_uri = train_and_log(feature_path)
        return model_uri

    @task()
    def validate_model(model_uri: str) -> bool:
        metrics = evaluate_model(model_uri)
        return metrics["auc"] > 0.85

    @task()
    def deploy_model(model_uri: str, is_valid: bool):
        if is_valid:
            promote_to_production(model_uri)

    features = extract_features()
    model = train_model(features)
    valid = validate_model(model)
    deploy_model(model, valid)

daily_training_pipeline()

Dagster: Software-Defined Assets

from dagster import asset, AssetExecutionContext, DailyPartitionsDefinition
import pandas as pd

daily_partitions = DailyPartitionsDefinition(start_date="2025-01-01")

@asset(partitions_def=daily_partitions)
def raw_transactions(context: AssetExecutionContext) -> pd.DataFrame:
    partition_date = context.partition_key
    return fetch_transactions(date=partition_date)

@asset(partitions_def=daily_partitions)
def transaction_features(context: AssetExecutionContext, raw_transactions: pd.DataFrame) -> pd.DataFrame:
    features = compute_features(raw_transactions)
    context.log.info(f"Computed {len(features)} feature rows")
    return features

@asset()
def trained_model(transaction_features: pd.DataFrame) -> dict:
    model, metrics = train(transaction_features)
    return {"model_path": save_model(model), "metrics": metrics}

Common Gotchas

  • Airflow is not a data processor: Airflow orchestrates tasks — it should not process data itself. Heavy computation should happen in Spark, a database, or a container. The Airflow worker only triggers and monitors.
  • DAG parsing performance: Airflow parses all DAG files every dag_dir_list_interval seconds. Imports at the top level of a DAG file execute during parsing. Never import heavy libraries (PyTorch, TensorFlow) at the top of a DAG file.
  • Dagster I/O managers: By default, Dagster materializes assets using PickleIOManager. For production, configure S3PickleIOManager, BigQueryPandasIOManager, or a custom I/O manager. Otherwise, assets are stored on the local filesystem and lost on restart.
  • Prefect task retries: Prefect tasks are not retried by default. Always set retries=3 and retry_delay_seconds=60 for tasks that call external services.
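
The retry behavior the Prefect bullet asks for boils down to a small wrapper; a stdlib sketch (hypothetical flaky_fetch task that fails twice, then succeeds):

```python
import time

def with_retries(fn, retries=3, delay_seconds=0):
    """Re-invoke fn until it succeeds or retries are exhausted."""
    def wrapper(*args, **kwargs):
        for attempt in range(retries + 1):
            try:
                return fn(*args, **kwargs)
            except Exception:
                if attempt == retries:
                    raise  # budget exhausted: surface the failure
                time.sleep(delay_seconds)
    return wrapper

calls = {"n": 0}

def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

result = with_retries(flaky_fetch, retries=3)()  # succeeds on attempt 3
```

Orchestrators add the crucial extras on top of this loop: persistence across process crashes, backoff policies, and observability.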

C.8 Great Expectations: Data Validation

Purpose: Great Expectations validates data at every stage of a pipeline — on ingestion, after transformation, and before model training. It catches data drift, schema changes, and quality issues before they corrupt models.

Key API Patterns

import great_expectations as gx

context = gx.get_context()

data_source = context.data_sources.add_pandas("transactions_source")
data_asset = data_source.add_dataframe_asset(name="daily_transactions")
batch_definition = data_asset.add_batch_definition_whole_dataframe("full_batch")

batch = batch_definition.get_batch(batch_parameters={"dataframe": df})

suite = gx.ExpectationSuite(name="transactions_quality")
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="user_id"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeBetween(
    column="amount", min_value=0, max_value=1_000_000
))
suite.add_expectation(gx.expectations.ExpectColumnDistinctValuesToBeInSet(
    column="currency", value_set=["USD", "EUR", "GBP", "JPY"]
))
suite.add_expectation(gx.expectations.ExpectTableRowCountToBeBetween(
    min_value=10_000, max_value=10_000_000
))
suite = context.suites.add(suite)

validation_definition = gx.ValidationDefinition(
    name="daily_txn_validation",
    data=batch_definition,
    suite=suite,
)
validation_definition = context.validation_definitions.add(validation_definition)

checkpoint = gx.Checkpoint(
    name="daily_checkpoint",
    validation_definitions=[validation_definition],
)
checkpoint = context.checkpoints.add(checkpoint)
result = checkpoint.run()

if not result.success:
    raise ValueError(f"Data validation failed: {result}")

Common Gotchas

  • GX Cloud vs. GX OSS: The open-source version stores expectations as JSON files. GX Cloud stores them in a managed backend. The API is the same, but deployment differs.
  • Row-level vs. aggregate expectations: Row-level expectations (like ExpectColumnValuesToBeUnique) scan every row and can be slow on large datasets. Aggregate expectations (like ExpectColumnMeanToBeBetween) are fast.
  • Integration with Airflow: Use the GreatExpectationsOperator in Airflow, or simply call GX from within a Python task. The operator provides better logging but adds a dependency.
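
Stripped of its persistence and reporting machinery, an expectation suite is a set of named predicates over the data. A stdlib sketch mirroring the suite above (toy rows):

```python
# Each expectation is a named predicate; the suite passes if all do.
rows = [
    {"user_id": "u1", "amount": 25.0, "currency": "USD"},
    {"user_id": "u2", "amount": 40.0, "currency": "EUR"},
]

expectations = [
    ("user_id not null", lambda r: r["user_id"] is not None),
    ("amount in [0, 1e6]", lambda r: 0 <= r["amount"] <= 1_000_000),
    ("currency in set", lambda r: r["currency"] in {"USD", "EUR", "GBP", "JPY"}),
]

results = {name: all(check(r) for r in rows) for name, check in expectations}
success = all(results.values())  # True for these rows
```

What GX adds on top is batch management, result persistence, data docs, and the aggregate expectations that avoid row-by-row scans.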

C.9 Optuna: Hyperparameter Optimization

Purpose: Optuna provides Bayesian hyperparameter optimization with pruning, multi-objective optimization, and distributed search. It replaces grid search and random search with more sample-efficient algorithms.

Key API Patterns

import optuna
import torch

def objective(trial: optuna.Trial) -> float:
    lr = trial.suggest_float("lr", 1e-5, 1e-2, log=True)
    n_layers = trial.suggest_int("n_layers", 2, 6)
    hidden_dim = trial.suggest_categorical("hidden_dim", [64, 128, 256, 512])
    dropout = trial.suggest_float("dropout", 0.1, 0.5)
    weight_decay = trial.suggest_float("weight_decay", 1e-6, 1e-2, log=True)

    model = build_model(n_layers=n_layers, hidden_dim=hidden_dim, dropout=dropout)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)

    for epoch in range(50):
        train_loss = train_one_epoch(model, optimizer, train_loader)
        val_loss = evaluate(model, val_loader)

        trial.report(val_loss, epoch)
        if trial.should_prune():
            raise optuna.TrialPruned()

    return val_loss

study = optuna.create_study(
    direction="minimize",
    study_name="nn_tuning",
    storage="sqlite:///optuna.db",
    load_if_exists=True,
    pruner=optuna.pruners.MedianPruner(n_startup_trials=10, n_warmup_steps=5),
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=200, n_jobs=4, show_progress_bar=True)

print(f"Best val_loss: {study.best_value:.4f}")
print(f"Best params: {study.best_params}")

optuna.visualization.plot_param_importances(study).show()
optuna.visualization.plot_optimization_history(study).show()

Common Gotchas

  • Pruning + warmup: Aggressive pruning kills trials before models converge. Set n_warmup_steps in the pruner to match your model's warmup period.
  • SQLite concurrency: The SQLite backend does not handle concurrent writes well. For parallel studies (n_jobs > 1), use PostgreSQL or MySQL as the storage backend.
  • Search space consistency: If you change the search space between runs with load_if_exists=True, old trials have parameters from the old space. Optuna handles this gracefully, but it can confuse the sampler. Start a new study when the search space changes significantly.
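
The MedianPruner's decision rule is small enough to sketch in stdlib Python (toy loss history):

```python
# Prune a trial whose intermediate value is worse than the median of
# previously completed trials at the same step (for minimization).
from statistics import median

history = {0: [0.9, 0.7, 0.8]}  # completed trials' losses at step 0

def should_prune(step, value):
    completed = history.get(step, [])
    return bool(completed) and value > median(completed)

pruned = should_prune(0, 0.95)  # True: worse than the median of 0.8
kept = should_prune(0, 0.75)    # False: better than the median
```

n_startup_trials and n_warmup_steps simply disable this check until enough history exists — which is exactly how they guard against premature pruning.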

C.10 MLflow: Experiment Tracking and Model Registry

Purpose: MLflow tracks experiments (parameters, metrics, artifacts), manages model versions (staging, production, archived), and serves models via a REST API. It is the connective tissue between experimentation and production.

Key API Patterns

import mlflow
import mlflow.pytorch

mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("churn-prediction-v2")

with mlflow.start_run(run_name="causal_forest_v3") as run:
    mlflow.log_params({
        "n_estimators": 1000,
        "min_samples_leaf": 10,
        "model_type": "CausalForestDML",
    })

    model, metrics = train_and_evaluate()

    mlflow.log_metrics({
        "ate": metrics["ate"],
        "ate_ci_lower": metrics["ci"][0],
        "ate_ci_upper": metrics["ci"][1],
        "refutation_p_value": metrics["refutation_p"],
    })

    mlflow.log_artifact("feature_importance.png")
    mlflow.log_artifact("shap_summary.html")

    mlflow.pytorch.log_model(
        model,
        artifact_path="model",
        registered_model_name="churn-causal-model",
        input_example=sample_input,
        signature=mlflow.models.infer_signature(X_train, predictions),
    )

# Model registry operations
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
    name="churn-causal-model",
    version=3,
    stage="Production",
    archive_existing_versions=True,
)

Common Gotchas

  • Artifact storage vs. tracking server: The tracking server stores metadata (params, metrics). Artifacts (models, plots) go to a separate artifact store (S3, GCS, Azure Blob). Configure both.
  • Model signatures: Always log models with a signature (infer_signature). Without it, the model serving endpoint cannot validate input schemas, and you will get cryptic errors in production.
  • Autologging conflicts: mlflow.autolog() is convenient but can conflict with custom logging. If you log a metric manually and autolog logs the same metric, you get duplicates. Disable autolog for the specific framework if you are logging manually.
  • Nested runs: Use mlflow.start_run(nested=True) for child runs (e.g., individual cross-validation folds). This keeps the experiment view clean.
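
Conceptually, a tracking run is structured storage keyed by run id; a stdlib sketch (hypothetical in-memory store, not MLflow's actual schema):

```python
import uuid

runs = {}  # run_id -> {"params": ..., "metrics": ..., "artifacts": ...}

def start_run():
    run_id = uuid.uuid4().hex
    runs[run_id] = {"params": {}, "metrics": {}, "artifacts": []}
    return run_id

def log_param(run_id, key, value):
    runs[run_id]["params"][key] = value

def log_metric(run_id, key, value, step=0):
    # Metrics are time series: each key accumulates (step, value) pairs
    runs[run_id]["metrics"].setdefault(key, []).append((step, value))

rid = start_run()
log_param(rid, "n_estimators", 1000)
log_metric(rid, "ate", 0.12)
```

MLflow's value is everything around this core: the UI, the artifact store, model flavors, signatures, and the registry's version lifecycle.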

C.11 FastAPI: Model Serving

Purpose: FastAPI is the standard Python framework for building ML model serving APIs. It provides automatic request validation (via Pydantic), async support, and OpenAPI documentation out of the box.

Key API Patterns

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
import mlflow.pyfunc
import numpy as np
from contextlib import asynccontextmanager

model = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global model
    model = mlflow.pyfunc.load_model("models:/churn-model/Production")
    yield
    model = None

app = FastAPI(title="Churn Prediction API", version="2.0", lifespan=lifespan)

class PredictionRequest(BaseModel):
    user_id: str = Field(..., description="Unique user identifier")
    age: int = Field(..., ge=18, le=120)
    tenure_months: int = Field(..., ge=0)
    monthly_spend: float = Field(..., ge=0)
    support_tickets_90d: int = Field(..., ge=0)

class PredictionResponse(BaseModel):
    user_id: str
    churn_probability: float
    risk_tier: str

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    features = np.array([[
        request.age,
        request.tenure_months,
        request.monthly_spend,
        request.support_tickets_90d,
    ]])

    try:
        prob = model.predict(features)[0]
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Prediction failed: {str(e)}")

    risk_tier = "high" if prob > 0.7 else "medium" if prob > 0.3 else "low"

    return PredictionResponse(
        user_id=request.user_id,
        churn_probability=round(float(prob), 4),
        risk_tier=risk_tier,
    )

@app.get("/health")
async def health():
    return {"status": "healthy", "model_loaded": model is not None}

Common Gotchas

  • Model loading at startup: Load models in the lifespan context manager, not at import time. This ensures the server starts cleanly and handles model loading failures gracefully.
  • Async and CPU-bound inference: FastAPI is async, but model inference is CPU-bound. For CPU-bound prediction, it runs synchronously by default (which is fine for single requests). For high throughput, use a thread pool or process pool executor.
  • Input validation vs. model validation: Pydantic validates input types and ranges. But the model itself may have additional constraints (e.g., certain feature combinations are invalid). Validate both layers.
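
The thread-pool pattern from the second gotcha, as a self-contained stdlib sketch (hypothetical cpu_bound_predict standing in for model.predict):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

def cpu_bound_predict(x):
    # Stand-in for model inference: pure CPU work
    return sum(i * i for i in range(x)) % 97

async def handler(x):
    # Offload to the pool so the event loop keeps serving other requests
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, cpu_bound_predict, x)

result = asyncio.run(handler(10_000))
```

Inside FastAPI, declaring the endpoint with def instead of async def achieves a similar effect, since sync endpoints run in the framework's own thread pool.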

C.12 Fairlearn: Responsible AI

Purpose: Fairlearn provides tools for assessing and mitigating fairness issues in ML models. It implements fairness metrics (demographic parity, equalized odds) and mitigation algorithms (threshold optimization, exponentiated gradient).

Key API Patterns

from fairlearn.metrics import MetricFrame, selection_rate, false_positive_rate
from fairlearn.postprocessing import ThresholdOptimizer
from sklearn.metrics import accuracy_score, recall_score

# Assessment
metric_frame = MetricFrame(
    metrics={
        "accuracy": accuracy_score,
        "selection_rate": selection_rate,
        "false_positive_rate": false_positive_rate,
        "recall": recall_score,
    },
    y_true=y_test,
    y_pred=y_pred,
    sensitive_features=sensitive_test,
)
print(metric_frame.by_group)
print(f"Demographic parity difference: {metric_frame.difference()['selection_rate']:.3f}")

# Mitigation via threshold optimization
postprocessor = ThresholdOptimizer(
    estimator=base_model,
    constraints="equalized_odds",
    objective="accuracy_score",
    prefit=True,
)
postprocessor.fit(X_train, y_train, sensitive_features=sensitive_train)
y_pred_fair = postprocessor.predict(X_test, sensitive_features=sensitive_test)

Common Gotchas

  • Fairness metrics require sensitive features at prediction time: ThresholdOptimizer.predict() needs sensitive_features. In production, you may not have access to these features due to legal constraints. Plan for this early.
  • Fairness-accuracy tradeoff: Mitigation almost always reduces overall accuracy. Quantify the tradeoff explicitly and document the decision with stakeholders.
  • Intersectional groups: Default analysis looks at one sensitive feature at a time. For intersectional analysis (e.g., race × gender), create a combined feature: df["race_gender"] = df["race"] + "_" + df["gender"].
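
The demographic parity difference reported by MetricFrame reduces to a gap in selection rates; a stdlib sketch with toy predictions:

```python
# Selection rate per group, then the max-min gap across groups.
y_pred = [1, 0, 1, 1, 0, 0, 1, 0]
group = ["a", "a", "a", "a", "b", "b", "b", "b"]

def selection_rate(preds):
    return sum(preds) / len(preds)

rates = {
    g: selection_rate([p for p, gi in zip(y_pred, group) if gi == g])
    for g in set(group)
}
dp_diff = max(rates.values()) - min(rates.values())
# group "a" selects 3/4, group "b" selects 1/4, so the difference is 0.5
```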

C.13 Opacus: Differential Privacy

Purpose: Opacus adds differential privacy guarantees to PyTorch training. It clips per-sample gradients and adds calibrated noise, ensuring that no single training example can be reverse-engineered from the trained model.

Key API Patterns

from opacus import PrivacyEngine
from opacus.validators import ModuleValidator
import torch

model = build_model()

# Validate model compatibility (BatchNorm is not supported; use GroupNorm)
errors = ModuleValidator.validate(model, strict=False)
if errors:
    model = ModuleValidator.fix(model)  # replaces BatchNorm with GroupNorm

optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
criterion = torch.nn.CrossEntropyLoss()
data_loader = torch.utils.data.DataLoader(dataset, batch_size=256)

privacy_engine = PrivacyEngine()
model, optimizer, data_loader = privacy_engine.make_private_with_epsilon(
    module=model,
    optimizer=optimizer,
    data_loader=data_loader,
    epochs=20,
    target_epsilon=3.0,
    target_delta=1e-5,
    max_grad_norm=1.0,
)

for epoch in range(20):
    for batch in data_loader:
        optimizer.zero_grad()
        loss = criterion(model(batch[0]), batch[1])
        loss.backward()
        optimizer.step()

    epsilon = privacy_engine.get_epsilon(delta=1e-5)
    print(f"Epoch {epoch}: ε = {epsilon:.2f}")

Common Gotchas

  • BatchNorm incompatibility: Opacus cannot clip per-sample gradients through BatchNorm layers because they compute statistics across the batch. Use GroupNorm or LayerNorm instead. ModuleValidator.fix() handles this automatically, but verify that the replacement does not degrade model quality.
  • Privacy budget is cumulative: Each epoch of training consumes privacy budget. The reported epsilon grows monotonically. If you resume training, the new epsilon includes all previous training.
  • Noise vs. utility: Smaller epsilon means more privacy but more noise. For practical applications, epsilon values between 1 and 10 are typical. Epsilon below 1 often degrades model utility too much for real-world use.
  • Batch size matters: Larger batch sizes improve the privacy-utility tradeoff because the noise-to-signal ratio decreases. Use the largest batch size your GPU memory allows.
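
The mechanics behind make_private_with_epsilon can be sketched in stdlib Python: clip each per-sample gradient to max_grad_norm, sum, then add Gaussian noise (toy two-dimensional gradients):

```python
import math
import random

random.seed(0)

def clip(grad, max_norm):
    """Scale a per-sample gradient so its L2 norm is at most max_norm."""
    norm = math.sqrt(sum(g * g for g in grad))
    scale = min(1.0, max_norm / norm) if norm > 0 else 1.0
    return [g * scale for g in grad]

per_sample_grads = [[3.0, 4.0], [0.3, 0.4]]  # L2 norms 5.0 and 0.5
max_grad_norm = 1.0
noise_multiplier = 1.0

clipped = [clip(g, max_grad_norm) for g in per_sample_grads]  # norms 1.0, 0.5
summed = [sum(gs) for gs in zip(*clipped)]
noisy = [s + random.gauss(0.0, noise_multiplier * max_grad_norm) for s in summed]
```

Clipping bounds any one example's influence on the update; the noise scale relative to that bound is what the privacy accountant converts into epsilon.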

C.14 Captum: Model Interpretability

Purpose: Captum provides attribution methods for PyTorch models. It answers "which input features drove this prediction?" using techniques from integrated gradients to SHAP to layer conductance.

Key API Patterns

import torch
from captum.attr import IntegratedGradients, LayerConductance

model.eval()

ig = IntegratedGradients(model)

# Attribution for a single input
input_tensor = torch.tensor(sample, dtype=torch.float32).unsqueeze(0)
input_tensor.requires_grad = True
baseline = torch.zeros_like(input_tensor)

attributions, delta = ig.attribute(
    input_tensor,
    baselines=baseline,
    target=1,  # class index
    return_convergence_delta=True,
    n_steps=200,
)

print(f"Convergence delta: {delta.item():.6f}")  # should be close to 0
print(f"Attribution shape: {attributions.shape}")

# Layer-level attribution
lc = LayerConductance(model, model.hidden_layers[2])
layer_attr = lc.attribute(input_tensor, baselines=baseline, target=1)

Common Gotchas

  • Baseline selection: Integrated gradients results depend heavily on the baseline. A zero baseline is standard for images but may be wrong for tabular data (where zero might be a meaningful value). Use the training set mean or a domain-specific "absence" value.
  • Convergence delta: Always check the convergence delta. If it is large (greater than 5% of the prediction difference), increase n_steps or investigate numerical issues.
  • Computational cost: Attribution methods require multiple forward and backward passes (e.g., n_steps=200 means 200 passes for integrated gradients). For large models, this is slow. Use internal_batch_size to control memory usage.
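
For a one-dimensional function, integrated gradients and its convergence delta fit in a few stdlib lines. For f(x) = x**2 the attribution should equal f(x) - f(baseline); the Riemann-sum error is the delta:

```python
# Integrated gradients as a Riemann sum along the baseline-to-input path.
def f(x):
    return x * x

def grad_f(x):
    return 2 * x

def integrated_gradients(x, baseline=0.0, n_steps=200):
    total = 0.0
    for k in range(1, n_steps + 1):
        point = baseline + (k / n_steps) * (x - baseline)
        total += grad_f(point)
    return (x - baseline) * total / n_steps

attr = integrated_gradients(3.0, baseline=0.0, n_steps=200)
delta = attr - (f(3.0) - f(0.0))  # shrinks toward 0 as n_steps grows
```

Captum reports exactly this kind of residual as the convergence delta, which is why a large delta signals too few steps or numerical trouble.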

Quick Reference: When to Use What

  Task                            Library                        Key Entry Point
  Classical ML pipelines          scikit-learn                   Pipeline, ColumnTransformer
  Causal effect estimation        DoWhy + EconML                 CausalModel, CausalForestDML
  Bayesian modeling               PyMC + ArviZ                   pm.Model(), az.summary()
  NLP / pretrained models         HuggingFace Transformers       pipeline(), Trainer
  Distributed tuning              Ray Tune                       tune.Tuner
  Large-scale data                PySpark                        SparkSession, DataFrame
  Pipeline orchestration          Airflow / Dagster / Prefect    @dag / @asset / @flow
  Data validation                 Great Expectations             ExpectationSuite, Checkpoint
  Hyperparameter search           Optuna                         study.optimize()
  Experiment tracking             MLflow                         mlflow.start_run()
  Model serving                   FastAPI                        @app.post("/predict")
  Fairness assessment             Fairlearn                      MetricFrame, ThresholdOptimizer
  Differential privacy            Opacus                         PrivacyEngine
  Attribution / interpretability  Captum                         IntegratedGradients

Cross-references: Appendix D covers environment setup for these libraries. Appendix F covers the SQL and infrastructure patterns that feed data into these Python tools. Individual chapters provide deeper worked examples for each library.