Chapter 34: MLOps and LLMOps

Operationalizing Machine Learning and Large Language Models for Production

"Everyone wants to build ML models, but no one wants to operate them." --- Attributed to countless ML platform engineers

34.1 Introduction: The Gap Between Model and Product

Building a machine learning model that achieves impressive metrics on a held-out test set is, paradoxically, the easiest part of delivering ML-powered products. The hard part---the part that consumes 80--90% of engineering effort in mature organizations---is everything that happens around the model: data pipelines, experiment tracking, reproducibility, deployment, monitoring, retraining, and governance.

MLOps (Machine Learning Operations) is the discipline that bridges this gap. It borrows principles from DevOps---continuous integration, continuous delivery, infrastructure as code, monitoring---and adapts them for the unique challenges of machine learning systems. With the rise of large language models, a specialized subset has emerged: LLMOps, which addresses the distinct operational challenges of deploying, evaluating, and governing LLM-powered applications.

This chapter provides a comprehensive, hands-on guide to both MLOps and LLMOps. We will cover the full ML lifecycle, from experiment tracking and data versioning to production monitoring and cost management. By the end, you will have the knowledge and tools to take any model from a Jupyter notebook to a production-grade, observable, continuously improving system.

Why MLOps Matters

Consider what happens without MLOps:

  • Reproducibility crisis: "It worked on my machine" becomes "it worked in my notebook three months ago, but I don't remember which dataset version I used."
  • Deployment friction: Models sit in notebooks for weeks or months before reaching production.
  • Silent failures: Models degrade in production due to data drift, and nobody notices until a customer complains.
  • Compliance risks: Auditors ask which model version generated a prediction, and nobody can answer.
  • Wasted compute: Teams retrain models unnecessarily because they cannot track what has already been tried.

MLOps solves these problems systematically. Let us begin with the foundational concept: the ML lifecycle.


34.2 The ML Lifecycle

The ML lifecycle is not a linear pipeline but a continuous loop. Understanding its phases is essential for designing robust operational processes.

34.2.1 Phases of the ML Lifecycle

┌─────────────────────────────────────────────────────────────────┐
│                        ML LIFECYCLE                             │
│                                                                 │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐ │
│   │ Problem  │───>│   Data   │───>│  Model   │───>│  Model   │ │
│   │ Framing  │    │ Pipeline │    │ Training │    │Evaluation│ │
│   └──────────┘    └──────────┘    └──────────┘    └──────────┘ │
│        ^                                               │        │
│        │                                               v        │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐ │
│   │ Retrain  │<───│ Monitor  │<───│  Serve   │<───│  Deploy  │ │
│   │ / Update │    │ & Alert  │    │  Model   │    │  Model   │ │
│   └──────────┘    └──────────┘    └──────────┘    └──────────┘ │
└─────────────────────────────────────────────────────────────────┘

Phase 1: Problem Framing. Define the business problem, success metrics, and constraints. This phase determines everything downstream---the data you collect, the models you consider, and the serving requirements.

Phase 2: Data Pipeline. Ingest, clean, validate, transform, and version your data. Data quality is the single largest determinant of model quality.

Phase 3: Model Training. Experiment with architectures, hyperparameters, and training strategies. Track every experiment rigorously.

Phase 4: Model Evaluation. Evaluate models against business metrics, fairness criteria, and robustness requirements. Go beyond simple accuracy.

Phase 5: Model Deployment. Package the model, deploy it to the serving infrastructure, and route traffic.

Phase 6: Model Serving. Serve predictions with low latency, high throughput, and proper error handling.

Phase 7: Monitoring and Alerting. Continuously monitor model performance, data quality, and system health.

Phase 8: Retraining. When performance degrades or new data becomes available, retrain and redeploy.

34.2.2 MLOps Maturity Levels

Google's MLOps maturity model provides a useful framework:

Level     Description               Characteristics
Level 0   Manual                    Jupyter notebooks, manual deployment, no monitoring
Level 1   ML Pipeline Automation    Automated training pipelines, experiment tracking, basic monitoring
Level 2   CI/CD for ML              Automated testing, continuous training, A/B testing, full observability

Most organizations are at Level 0. The goal of this chapter is to move you toward Level 2.


34.3 Experiment Tracking with Weights & Biases

Experiment tracking is the foundation of reproducible ML. Without it, you cannot answer basic questions: "Which hyperparameters produced the best model?" "What data was used?" "Can we reproduce this result?"

34.3.1 Why Experiment Tracking

Every training run produces a rich set of artifacts:

  • Hyperparameters: Learning rate, batch size, architecture choices
  • Metrics: Loss, accuracy, F1, custom business metrics
  • Artifacts: Model weights, predictions, confusion matrices
  • Environment: Python version, library versions, hardware
  • Data: Which dataset version, preprocessing steps, splits

Manual tracking (spreadsheets, notebooks) breaks down quickly. Dedicated tools like Weights & Biases (W&B), MLflow, and Neptune provide structured, searchable, and shareable experiment records.

34.3.2 Weights & Biases Integration

W&B has become the de facto standard for experiment tracking in deep learning. Here is how to integrate it into a PyTorch training loop:

"""Experiment tracking with Weights & Biases.

Demonstrates logging metrics, hyperparameters, and artifacts
during a PyTorch training loop.
"""

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import wandb

torch.manual_seed(42)


def create_synthetic_data(
    n_samples: int = 1000,
    n_features: int = 20,
    n_classes: int = 5,
) -> tuple[TensorDataset, TensorDataset]:
    """Create synthetic classification data for demonstration.

    Args:
        n_samples: Number of training samples.
        n_features: Number of input features.
        n_classes: Number of output classes.

    Returns:
        Tuple of (train_dataset, val_dataset).
    """
    X = torch.randn(n_samples, n_features)
    y = torch.randint(0, n_classes, (n_samples,))

    split = int(0.8 * n_samples)
    train_ds = TensorDataset(X[:split], y[:split])
    val_ds = TensorDataset(X[split:], y[split:])
    return train_ds, val_ds


class SimpleClassifier(nn.Module):
    """A simple feedforward classifier for demonstration."""

    def __init__(self, n_features: int, n_hidden: int, n_classes: int) -> None:
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_features, n_hidden),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(n_hidden, n_hidden),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(n_hidden, n_classes),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass."""
        return self.net(x)


def train_with_wandb(config: dict) -> None:
    """Train a model with W&B experiment tracking.

    Args:
        config: Dictionary of hyperparameters.
    """
    # Initialize W&B run
    run = wandb.init(
        project="mlops-textbook",
        config=config,
        tags=["chapter-34", "experiment-tracking"],
    )

    # Create data
    train_ds, val_ds = create_synthetic_data(
        n_features=config["n_features"],
        n_classes=config["n_classes"],
    )
    train_loader = DataLoader(train_ds, batch_size=config["batch_size"], shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=config["batch_size"])

    # Create model
    model = SimpleClassifier(
        n_features=config["n_features"],
        n_hidden=config["n_hidden"],
        n_classes=config["n_classes"],
    )

    # Log model architecture
    wandb.watch(model, log="all", log_freq=100)

    optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"])
    criterion = nn.CrossEntropyLoss()

    best_val_loss = float("inf")

    for epoch in range(config["epochs"]):
        # Training
        model.train()
        train_loss = 0.0
        correct = 0
        total = 0

        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            logits = model(X_batch)
            loss = criterion(logits, y_batch)
            loss.backward()
            optimizer.step()

            train_loss += loss.item() * X_batch.size(0)
            correct += (logits.argmax(dim=1) == y_batch).sum().item()
            total += y_batch.size(0)

        train_loss /= total
        train_acc = correct / total

        # Validation
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                logits = model(X_batch)
                loss = criterion(logits, y_batch)
                val_loss += loss.item() * X_batch.size(0)
                val_correct += (logits.argmax(dim=1) == y_batch).sum().item()
                val_total += y_batch.size(0)

        val_loss /= val_total
        val_acc = val_correct / val_total

        # Log metrics to W&B
        wandb.log({
            "epoch": epoch,
            "train/loss": train_loss,
            "train/accuracy": train_acc,
            "val/loss": val_loss,
            "val/accuracy": val_acc,
            "learning_rate": optimizer.param_groups[0]["lr"],
        })

        # Save best model as artifact
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), "best_model.pt")
            artifact = wandb.Artifact(
                name="best-model",
                type="model",
                description=f"Best model at epoch {epoch}",
                metadata={"val_loss": val_loss, "val_acc": val_acc},
            )
            artifact.add_file("best_model.pt")
            run.log_artifact(artifact)

    wandb.finish()


if __name__ == "__main__":
    config = {
        "n_features": 20,
        "n_classes": 5,
        "n_hidden": 128,
        "batch_size": 32,
        "learning_rate": 1e-3,
        "epochs": 50,
    }
    train_with_wandb(config)

34.3.3 Hyperparameter Sweeps

W&B Sweeps automate hyperparameter search:

"""W&B Sweep configuration for hyperparameter optimization."""

sweep_config = {
    "method": "bayes",  # Bayesian optimization
    "metric": {"name": "val/loss", "goal": "minimize"},
    "parameters": {
        "learning_rate": {
            "distribution": "log_uniform_values",
            "min": 1e-5,
            "max": 1e-2,
        },
        "n_hidden": {"values": [64, 128, 256, 512]},
        "batch_size": {"values": [16, 32, 64]},
        "epochs": {"value": 50},
        "n_features": {"value": 20},
        "n_classes": {"value": 5},
    },
}

# Launch sweep
# sweep_id = wandb.sweep(sweep_config, project="mlops-textbook")
# wandb.agent(sweep_id, function=train_with_wandb, count=20)

34.3.4 Comparing Experiment Tracking Tools

Feature                 W&B                   MLflow                Neptune     CometML
Hosting                 Cloud + Self-hosted   Self-hosted + Cloud   Cloud       Cloud
UI Quality              Excellent             Good                  Excellent   Good
Artifact Tracking       Yes                   Yes                   Yes         Yes
Hyperparameter Sweeps   Built-in              Limited               Limited     Built-in
Team Collaboration      Excellent             Good                  Good        Good
Pricing                 Free tier + Paid      Open source           Paid        Free tier + Paid
LLM Support             W&B Prompts           MLflow AI Gateway     Yes         Yes
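
If your stack standardizes on MLflow rather than W&B, the same logging pattern translates almost line for line. The sketch below is illustrative only: the experiment name, run name, and per-epoch loss values are placeholders standing in for your real training loop.

"""Minimal MLflow logging sketch, shown for comparison with the W&B loop above."""

import mlflow

config = {"learning_rate": 1e-3, "batch_size": 32, "epochs": 50}

mlflow.set_experiment("mlops-textbook")  # placeholder experiment name

with mlflow.start_run(run_name="baseline-classifier"):
    # Log hyperparameters once at the start of the run
    mlflow.log_params(config)

    for epoch in range(config["epochs"]):
        # In a real pipeline these come from your training/validation steps;
        # here they are dummy values so the sketch runs on its own.
        train_loss = 1.0 / (epoch + 1)
        val_loss = 1.2 / (epoch + 1)
        mlflow.log_metric("train_loss", train_loss, step=epoch)
        mlflow.log_metric("val_loss", val_loss, step=epoch)

    # Persist model weights (or any file) as a run artifact
    # mlflow.log_artifact("best_model.pt")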

34.4 Model Versioning

Model versioning goes beyond saving model weights. A truly versioned model includes:

  1. Model weights (the serialized parameters)
  2. Model code (the architecture definition)
  3. Training configuration (hyperparameters, optimizer settings)
  4. Data version (which data produced this model)
  5. Environment (dependencies, hardware)
  6. Evaluation results (metrics on standard benchmarks)

34.4.1 Model Registry

A model registry is a centralized store for managing model versions and their lifecycle stages:

"""Model registry implementation using a structured approach.

Demonstrates model versioning, staging, and lifecycle management.
"""

import json
import hashlib
from dataclasses import dataclass, field, asdict
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any

import torch
import torch.nn as nn

torch.manual_seed(42)


class ModelStage(Enum):
    """Model lifecycle stages."""
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"
    ARCHIVED = "archived"


@dataclass
class ModelVersion:
    """Represents a versioned model in the registry.

    Attributes:
        name: Model name.
        version: Semantic version string.
        stage: Current lifecycle stage.
        metrics: Evaluation metrics dictionary.
        parameters: Training hyperparameters.
        data_version: Hash or tag of the training data.
        created_at: Timestamp of creation.
        description: Human-readable description.
        tags: Arbitrary key-value tags.
    """
    name: str
    version: str
    stage: ModelStage = ModelStage.DEVELOPMENT
    metrics: dict[str, float] = field(default_factory=dict)
    parameters: dict[str, Any] = field(default_factory=dict)
    data_version: str = ""
    created_at: str = field(
        default_factory=lambda: datetime.now().isoformat()
    )
    description: str = ""
    tags: dict[str, str] = field(default_factory=dict)


class ModelRegistry:
    """A simple file-based model registry.

    In production, use MLflow Model Registry, W&B Model Registry,
    or a cloud-native solution (SageMaker, Vertex AI).

    Args:
        registry_path: Path to the registry directory.
    """

    def __init__(self, registry_path: str = "./model_registry") -> None:
        self.registry_path = Path(registry_path)
        self.registry_path.mkdir(parents=True, exist_ok=True)
        self.manifest_path = self.registry_path / "manifest.json"
        self.manifest: dict[str, list[dict]] = self._load_manifest()

    def _load_manifest(self) -> dict[str, list[dict]]:
        """Load the registry manifest from disk."""
        if self.manifest_path.exists():
            with open(self.manifest_path, "r") as f:
                return json.load(f)
        return {}

    def _save_manifest(self) -> None:
        """Persist the registry manifest to disk."""
        with open(self.manifest_path, "w") as f:
            json.dump(self.manifest, f, indent=2, default=str)

    def register_model(
        self,
        model: nn.Module,
        model_version: ModelVersion,
    ) -> str:
        """Register a model version in the registry.

        Args:
            model: The PyTorch model to register.
            model_version: Metadata for this version.

        Returns:
            The path where the model was saved.
        """
        model_dir = (
            self.registry_path / model_version.name / model_version.version
        )
        model_dir.mkdir(parents=True, exist_ok=True)

        # Save model weights
        model_path = model_dir / "model.pt"
        torch.save(model.state_dict(), model_path)

        # Compute checksum
        with open(model_path, "rb") as f:
            checksum = hashlib.sha256(f.read()).hexdigest()

        model_version.tags["checksum"] = checksum

        # Save metadata
        metadata = asdict(model_version)
        metadata["stage"] = model_version.stage.value
        with open(model_dir / "metadata.json", "w") as f:
            json.dump(metadata, f, indent=2)

        # Update manifest
        if model_version.name not in self.manifest:
            self.manifest[model_version.name] = []
        self.manifest[model_version.name].append(metadata)
        self._save_manifest()

        print(
            f"Registered {model_version.name} v{model_version.version} "
            f"[{model_version.stage.value}]"
        )
        return str(model_path)

    def promote_model(
        self,
        name: str,
        version: str,
        target_stage: ModelStage,
    ) -> None:
        """Promote a model to a new lifecycle stage.

        Args:
            name: Model name.
            version: Version to promote.
            target_stage: Target stage.
        """
        if name not in self.manifest:
            raise ValueError(f"Model '{name}' not found in registry.")

        for entry in self.manifest[name]:
            if entry["version"] == version:
                old_stage = entry["stage"]
                entry["stage"] = target_stage.value
                self._save_manifest()
                print(
                    f"Promoted {name} v{version}: "
                    f"{old_stage} -> {target_stage.value}"
                )
                return

        raise ValueError(f"Version '{version}' not found for model '{name}'.")

    def get_production_model(self, name: str) -> dict | None:
        """Get the current production model metadata.

        Args:
            name: Model name.

        Returns:
            Metadata dict or None if no production model exists.
        """
        if name not in self.manifest:
            return None

        for entry in reversed(self.manifest[name]):
            if entry["stage"] == ModelStage.PRODUCTION.value:
                return entry
        return None
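
The registry above is deliberately minimal, but it is enough to exercise the full register/promote/lookup flow. A short usage sketch, continuing the listing above (the model name, metrics, and data tag are illustrative):

if __name__ == "__main__":
    registry = ModelRegistry(registry_path="./model_registry")

    model = nn.Sequential(nn.Linear(20, 64), nn.ReLU(), nn.Linear(64, 5))
    version = ModelVersion(
        name="churn-classifier",
        version="1.0.0",
        metrics={"val_accuracy": 0.83},
        parameters={"learning_rate": 1e-3, "batch_size": 32},
        data_version="dvc:abc123",  # placeholder tag for the training data
        description="Baseline feedforward classifier.",
    )

    registry.register_model(model, version)
    registry.promote_model("churn-classifier", "1.0.0", ModelStage.STAGING)
    registry.promote_model("churn-classifier", "1.0.0", ModelStage.PRODUCTION)

    production_entry = registry.get_production_model("churn-classifier")
    print(json.dumps(production_entry, indent=2))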

34.5 Data Versioning with DVC

Data is the foundation of every ML model, yet it is often the least well-managed artifact. DVC (Data Version Control) solves this by extending Git to handle large files and datasets.

34.5.1 Why Version Data

Data versioning answers critical questions:

  • "Which dataset did we use to train model v2.3?"
  • "What changed in the data between the last two training runs?"
  • "Can we reproduce the training run from six months ago?"

34.5.2 DVC Basics

DVC works alongside Git. Git tracks code and small metadata files; DVC tracks large data files and models in remote storage (S3, GCS, Azure Blob, etc.).

# Initialize DVC in an existing Git repo
dvc init

# Track a large dataset
dvc add data/training_data.parquet

# This creates data/training_data.parquet.dvc (small metadata file)
# and adds data/training_data.parquet to .gitignore

# Commit the DVC metadata to Git
git add data/training_data.parquet.dvc data/.gitignore
git commit -m "Track training data with DVC"

# Push data to remote storage
dvc remote add -d myremote s3://my-bucket/dvc-storage
dvc push

34.5.3 DVC Pipelines

DVC pipelines define reproducible ML workflows:

# dvc.yaml - Defines the ML pipeline stages
stages:
  prepare:
    cmd: python src/prepare_data.py
    deps:
      - src/prepare_data.py
      - data/raw/
    outs:
      - data/processed/
    params:
      - prepare.split_ratio
      - prepare.seed

  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/processed/
    outs:
      - models/model.pt
    params:
      - train.learning_rate
      - train.batch_size
      - train.epochs
    metrics:
      - metrics/train_metrics.json:
          cache: false

  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.pt
      - data/processed/test/
    metrics:
      - metrics/eval_metrics.json:
          cache: false
    plots:
      - metrics/confusion_matrix.csv:
          x: predicted
          y: actual

# params.yaml - Centralized parameters
prepare:
  split_ratio: 0.8
  seed: 42

train:
  learning_rate: 0.001
  batch_size: 32
  epochs: 50

34.5.4 Reproducing Experiments with DVC

# Run the full pipeline
dvc repro

# Compare metrics between Git branches
dvc metrics diff main

# Show pipeline DAG
dvc dag

# Switch to a different dataset version
git checkout v1.0 -- data/training_data.parquet.dvc
dvc checkout
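
Versioned data can also be consumed programmatically. The dvc.api interface lets training code read exactly the dataset revision it needs; in the sketch below the repository URL, file path, and revision are placeholders:

"""Read a specific dataset revision from a DVC-tracked repository."""

import dvc.api

# Stream the exact dataset version tied to a Git tag or commit.
with dvc.api.open(
    "data/training_data.parquet",
    repo="https://github.com/example/ml-project",  # placeholder repository
    rev="v1.0",                                     # Git tag or commit hash
    mode="rb",
) as f:
    raw_bytes = f.read()

print(f"Loaded {len(raw_bytes)} bytes from the v1.0 dataset.")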

34.6 CI/CD for ML Pipelines

Continuous Integration and Continuous Delivery (CI/CD) for ML extends traditional software CI/CD with ML-specific checks.

34.6.1 What to Test in ML Pipelines

┌──────────────────────────────────────────────────────────────┐
│                   ML CI/CD Pipeline                          │
│                                                              │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐             │
│  │   Code     │  │   Data     │  │   Model    │             │
│  │   Tests    │  │   Tests    │  │   Tests    │             │
│  ├────────────┤  ├────────────┤  ├────────────┤             │
│  │ Unit tests │  │ Schema     │  │ Performance│             │
│  │ Lint/type  │  │ validation │  │ regression │             │
│  │ Integration│  │Distribution│  │ Fairness   │             │
│  │ Smoke tests│  │ Freshness  │  │ Latency    │             │
│  └────────────┘  └────────────┘  └────────────┘             │
│                                                              │
│  ┌────────────────────────────────────────────────────────┐  │
│  │              Infrastructure Tests                      │  │
│  │  Dockerfile builds | Serving endpoint health           │  │
│  │  Resource limits   | Rollback procedures               │  │
│  └────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────┘

34.6.2 Data Validation

Data validation catches data quality issues before they corrupt your models:

"""Data validation framework for ML pipelines.

Implements schema validation, distribution checks, and freshness
monitoring for training data.
"""

from dataclasses import dataclass
from typing import Any

import torch

torch.manual_seed(42)


@dataclass
class ColumnSchema:
    """Schema definition for a single data column.

    Attributes:
        name: Column identifier.
        dtype: Expected data type.
        min_value: Minimum allowed value (numeric columns).
        max_value: Maximum allowed value (numeric columns).
        nullable: Whether null values are permitted.
        allowed_values: Set of allowed categorical values.
    """
    name: str
    dtype: str
    min_value: float | None = None
    max_value: float | None = None
    nullable: bool = False
    allowed_values: set[str] | None = None


@dataclass
class ValidationResult:
    """Result of a data validation check.

    Attributes:
        passed: Whether the validation passed.
        check_name: Name of the validation check.
        details: Human-readable details.
    """
    passed: bool
    check_name: str
    details: str


class DataValidator:
    """Validates data against a defined schema and statistical expectations.

    Args:
        schema: List of column schema definitions.
    """

    def __init__(self, schema: list[ColumnSchema]) -> None:
        self.schema = {col.name: col for col in schema}

    def validate_schema(
        self,
        data: dict[str, torch.Tensor],
    ) -> list[ValidationResult]:
        """Validate data against the defined schema.

        Args:
            data: Dictionary mapping column names to tensor values.

        Returns:
            List of validation results.
        """
        results: list[ValidationResult] = []

        # Check for missing columns
        for col_name in self.schema:
            if col_name not in data:
                results.append(ValidationResult(
                    passed=False,
                    check_name=f"column_exists:{col_name}",
                    details=f"Expected column '{col_name}' not found in data.",
                ))
            else:
                results.append(ValidationResult(
                    passed=True,
                    check_name=f"column_exists:{col_name}",
                    details=f"Column '{col_name}' present.",
                ))

        # Check for unexpected columns
        for col_name in data:
            if col_name not in self.schema:
                results.append(ValidationResult(
                    passed=False,
                    check_name=f"unexpected_column:{col_name}",
                    details=f"Unexpected column '{col_name}' found in data.",
                ))

        # Validate value ranges
        for col_name, col_schema in self.schema.items():
            if col_name not in data:
                continue

            tensor = data[col_name]

            if col_schema.min_value is not None:
                min_val = tensor.min().item()
                passed = min_val >= col_schema.min_value
                results.append(ValidationResult(
                    passed=passed,
                    check_name=f"min_value:{col_name}",
                    details=(
                        f"Min value {min_val:.4f} >= {col_schema.min_value}"
                        if passed else
                        f"Min value {min_val:.4f} < {col_schema.min_value}"
                    ),
                ))

            if col_schema.max_value is not None:
                max_val = tensor.max().item()
                passed = max_val <= col_schema.max_value
                results.append(ValidationResult(
                    passed=passed,
                    check_name=f"max_value:{col_name}",
                    details=(
                        f"Max value {max_val:.4f} <= {col_schema.max_value}"
                        if passed else
                        f"Max value {max_val:.4f} > {col_schema.max_value}"
                    ),
                ))

            # Check for NaN values
            if not col_schema.nullable:
                has_nan = torch.isnan(tensor).any().item()
                results.append(ValidationResult(
                    passed=not has_nan,
                    check_name=f"no_nan:{col_name}",
                    details=(
                        f"No NaN values in '{col_name}'."
                        if not has_nan else
                        f"NaN values detected in non-nullable column "
                        f"'{col_name}'."
                    ),
                ))

        return results

    def validate_distribution(
        self,
        reference: dict[str, torch.Tensor],
        current: dict[str, torch.Tensor],
        threshold: float = 0.1,
    ) -> list[ValidationResult]:
        """Compare distributions between reference and current data.

        Uses a simple mean/std comparison. In production, consider
        using KL divergence, KS test, or PSI.

        Args:
            reference: Reference (training) data.
            current: Current (serving) data.
            threshold: Maximum allowed relative difference.

        Returns:
            List of validation results.
        """
        results: list[ValidationResult] = []

        for col_name in reference:
            if col_name not in current:
                continue

            ref_mean = reference[col_name].float().mean().item()
            cur_mean = current[col_name].float().mean().item()
            ref_std = reference[col_name].float().std().item()
            cur_std = current[col_name].float().std().item()

            # Relative mean shift
            if abs(ref_mean) > 1e-8:
                mean_shift = abs(cur_mean - ref_mean) / abs(ref_mean)
            else:
                mean_shift = abs(cur_mean - ref_mean)

            results.append(ValidationResult(
                passed=mean_shift <= threshold,
                check_name=f"mean_drift:{col_name}",
                details=(
                    f"Mean shift: {mean_shift:.4f} "
                    f"(ref={ref_mean:.4f}, cur={cur_mean:.4f})"
                ),
            ))

            # Relative std shift
            if abs(ref_std) > 1e-8:
                std_shift = abs(cur_std - ref_std) / abs(ref_std)
            else:
                std_shift = abs(cur_std - ref_std)

            results.append(ValidationResult(
                passed=std_shift <= threshold,
                check_name=f"std_drift:{col_name}",
                details=(
                    f"Std shift: {std_shift:.4f} "
                    f"(ref={ref_std:.4f}, cur={cur_std:.4f})"
                ),
            ))

        return results
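
A short usage sketch, continuing the listing above. The schema, thresholds, and synthetic tensors are illustrative; the shifted "income" column is there to show a distribution check failing:

if __name__ == "__main__":
    schema = [
        ColumnSchema(name="age", dtype="float", min_value=0.0, max_value=120.0),
        ColumnSchema(name="income", dtype="float", min_value=0.0),
    ]
    validator = DataValidator(schema)

    reference = {
        "age": torch.rand(1000) * 60 + 18,
        "income": torch.rand(1000) * 80_000 + 20_000,
    }
    current = {
        "age": torch.rand(1000) * 60 + 19,
        "income": torch.rand(1000) * 80_000 + 35_000,  # simulated upward shift
    }

    for result in validator.validate_schema(current):
        status = "PASS" if result.passed else "FAIL"
        print(f"[{status}] {result.check_name}: {result.details}")

    for result in validator.validate_distribution(reference, current):
        status = "PASS" if result.passed else "FAIL"
        print(f"[{status}] {result.check_name}: {result.details}")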

34.6.3 GitHub Actions for ML

Here is a production-grade GitHub Actions workflow for an ML pipeline:

# .github/workflows/ml-pipeline.yml
name: ML Pipeline CI/CD

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  code-quality:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install dependencies
        run: pip install ruff mypy pytest
      - name: Lint
        run: ruff check src/
      - name: Type check
        run: mypy src/ --ignore-missing-imports
      - name: Unit tests
        run: pytest tests/unit/ -v

  data-validation:
    runs-on: ubuntu-latest
    needs: code-quality
    steps:
      - uses: actions/checkout@v4
      - name: Install DVC
        run: pip install dvc[s3]
      - name: Pull data
        run: dvc pull
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
      - name: Validate data
        run: python src/validate_data.py

  train-and-evaluate:
    runs-on: ubuntu-latest
    needs: data-validation
    steps:
      - uses: actions/checkout@v4
      - name: Pull data
        run: |
          pip install dvc[s3]
          dvc pull
      - name: Train model
        run: python src/train.py --config configs/ci.yaml
      - name: Evaluate model
        run: python src/evaluate.py
      - name: Check performance regression
        run: python src/check_regression.py --threshold 0.95
      - name: Upload model artifact
        uses: actions/upload-artifact@v4
        with:
          name: model
          path: models/

  deploy:
    runs-on: ubuntu-latest
    needs: train-and-evaluate
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Deploy to staging
        run: ./scripts/deploy.sh staging
      - name: Run integration tests
        run: pytest tests/integration/ -v
      - name: Deploy to production
        run: ./scripts/deploy.sh production

34.7 Model Monitoring and Observability

Once a model is in production, monitoring becomes critical. Unlike traditional software, ML models can fail silently---they continue returning predictions, but the predictions gradually become wrong.

34.7.1 What to Monitor

Model Performance Metrics:

  • Prediction accuracy (when ground truth is available)
  • Prediction confidence distributions
  • Prediction latency (p50, p95, p99)
  • Throughput (requests per second)
  • Error rates

Data Quality Metrics:

  • Input feature distributions
  • Missing value rates
  • Feature correlations
  • Input volume

System Metrics:

  • CPU/GPU utilization
  • Memory usage
  • Disk I/O
  • Network latency

34.7.2 Building a Monitoring System

"""Model monitoring system for production ML.

Tracks predictions, detects drift, and generates alerts.
"""

import time
import json
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Callable

import torch

torch.manual_seed(42)


@dataclass
class PredictionLog:
    """A single prediction record.

    Attributes:
        timestamp: Unix timestamp of the prediction.
        input_features: Input feature values.
        prediction: Model output.
        confidence: Prediction confidence score.
        latency_ms: Inference latency in milliseconds.
        model_version: Version of the serving model.
    """
    timestamp: float
    input_features: dict[str, float]
    prediction: Any
    confidence: float
    latency_ms: float
    model_version: str


class ModelMonitor:
    """Monitors model performance and data quality in production.

    Args:
        model_name: Name of the model being monitored.
        window_size: Number of recent predictions to track.
        alert_threshold: Confidence below which to alert.
    """

    def __init__(
        self,
        model_name: str,
        window_size: int = 1000,
        alert_threshold: float = 0.5,
    ) -> None:
        self.model_name = model_name
        self.window_size = window_size
        self.alert_threshold = alert_threshold

        # Rolling window of predictions
        self.predictions: deque[PredictionLog] = deque(maxlen=window_size)

        # Reference statistics (from training data)
        self.reference_stats: dict[str, dict[str, float]] = {}

        # Alert callbacks
        self.alert_handlers: list[Callable[[dict[str, Any]], None]] = []

    def set_reference_stats(
        self,
        feature_stats: dict[str, dict[str, float]],
    ) -> None:
        """Set reference statistics from training data.

        Args:
            feature_stats: Dict mapping feature names to
                {"mean": float, "std": float, "min": float, "max": float}.
        """
        self.reference_stats = feature_stats

    def log_prediction(self, log: PredictionLog) -> None:
        """Log a prediction and check for anomalies.

        Args:
            log: The prediction record to log.
        """
        self.predictions.append(log)

        # Check for low confidence
        if log.confidence < self.alert_threshold:
            self._alert(
                level="WARNING",
                message=(
                    f"Low confidence prediction: {log.confidence:.4f} "
                    f"(threshold: {self.alert_threshold})"
                ),
            )

        # Check latency
        if log.latency_ms > 1000:
            self._alert(
                level="WARNING",
                message=f"High latency: {log.latency_ms:.1f}ms",
            )

    def compute_drift_score(
        self,
        feature_name: str,
    ) -> float | None:
        """Compute drift score for a specific feature.

        Uses Population Stability Index (PSI) approximation.

        Args:
            feature_name: Name of the feature to check.

        Returns:
            PSI score, or None if insufficient data.
        """
        if feature_name not in self.reference_stats:
            return None

        if len(self.predictions) < 100:
            return None

        # Gather current values
        current_values = torch.tensor([
            p.input_features.get(feature_name, 0.0)
            for p in self.predictions
            if feature_name in p.input_features
        ])

        if len(current_values) == 0:
            return None

        ref = self.reference_stats[feature_name]
        cur_mean = current_values.mean().item()
        cur_std = current_values.std().item()

        # Simplified PSI using normal distribution assumption
        if ref["std"] < 1e-8 or cur_std < 1e-8:
            return float("inf") if abs(cur_mean - ref["mean"]) > 0.01 else 0.0

        psi = (
            (cur_mean - ref["mean"]) ** 2 / (2 * ref["std"] ** 2)
            + 0.5 * ((cur_std / ref["std"]) ** 2
                      + (ref["std"] / cur_std) ** 2 - 2)
        )

        return psi

    def get_summary(self) -> dict[str, Any]:
        """Get a summary of current monitoring state.

        Returns:
            Dictionary with monitoring metrics.
        """
        if not self.predictions:
            return {"status": "no_data"}

        confidences = torch.tensor([p.confidence for p in self.predictions])
        latencies = torch.tensor([p.latency_ms for p in self.predictions])

        summary = {
            "model_name": self.model_name,
            "total_predictions": len(self.predictions),
            "confidence": {
                "mean": confidences.mean().item(),
                "std": confidences.std().item(),
                "p5": confidences.quantile(0.05).item(),
                "p50": confidences.quantile(0.50).item(),
                "p95": confidences.quantile(0.95).item(),
            },
            "latency_ms": {
                "mean": latencies.mean().item(),
                "p50": latencies.quantile(0.50).item(),
                "p95": latencies.quantile(0.95).item(),
                "p99": latencies.quantile(0.99).item(),
            },
            "low_confidence_rate": (
                (confidences < self.alert_threshold).float().mean().item()
            ),
        }

        # Compute drift for each tracked feature
        drift_scores = {}
        for feature_name in self.reference_stats:
            score = self.compute_drift_score(feature_name)
            if score is not None:
                drift_scores[feature_name] = score
        summary["drift_scores"] = drift_scores

        return summary

    def _alert(self, level: str, message: str) -> None:
        """Trigger an alert.

        Args:
            level: Alert severity (INFO, WARNING, CRITICAL).
            message: Alert message.
        """
        alert_data = {
            "level": level,
            "model": self.model_name,
            "message": message,
            "timestamp": time.time(),
        }
        for handler in self.alert_handlers:
            handler(alert_data)
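
A usage sketch, continuing the listing above. The model name, reference statistics, and simulated traffic are illustrative; a simple print handler stands in for a real alerting integration (PagerDuty, Slack, and so on):

if __name__ == "__main__":
    monitor = ModelMonitor(model_name="churn-classifier", alert_threshold=0.5)
    monitor.alert_handlers.append(lambda alert: print(f"ALERT: {alert}"))
    monitor.set_reference_stats({
        "age": {"mean": 40.0, "std": 12.0, "min": 18.0, "max": 90.0},
    })

    for i in range(200):
        monitor.log_prediction(PredictionLog(
            timestamp=time.time(),
            input_features={"age": 40.0 + 12.0 * torch.randn(1).item()},
            prediction="churn" if i % 3 == 0 else "retain",
            confidence=0.4 + 0.6 * torch.rand(1).item(),
            latency_ms=20.0 + 80.0 * torch.rand(1).item(),
            model_version="1.0.0",
        ))

    print(json.dumps(monitor.get_summary(), indent=2))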

34.8 Drift Detection

Drift is the phenomenon where the statistical properties of data change over time, causing model performance to degrade. Understanding and detecting drift is one of the most important MLOps capabilities.

34.8.1 Types of Drift

Data Drift (Covariate Shift): The distribution of input features changes. For example, a credit scoring model trained on pre-pandemic data sees dramatically different income distributions post-pandemic.

$$P_{\text{train}}(X) \neq P_{\text{prod}}(X)$$

Concept Drift: The relationship between inputs and outputs changes. The features look the same, but the correct predictions are different.

$$P_{\text{train}}(Y|X) \neq P_{\text{prod}}(Y|X)$$

Prediction Drift: The distribution of model outputs changes, even if the input distribution seems stable.

$$P_{\text{train}}(\hat{Y}) \neq P_{\text{prod}}(\hat{Y})$$

34.8.2 Drift Detection Methods

Population Stability Index (PSI):

$$\text{PSI} = \sum_{i=1}^{k} (p_i - q_i) \ln \frac{p_i}{q_i}$$

where $p_i$ is the proportion in bucket $i$ for the reference distribution and $q_i$ is the proportion for the current distribution.

PSI Value     Interpretation
< 0.1         No significant drift
0.1 -- 0.2    Moderate drift, investigate
> 0.2         Significant drift, action needed

Kolmogorov-Smirnov (KS) Test:

$$D = \sup_x |F_{\text{ref}}(x) - F_{\text{cur}}(x)|$$

The KS test measures the maximum distance between two cumulative distribution functions.

"""Drift detection implementations.

Provides PSI, KS test, and windowed drift detection.
"""

import torch
from scipy import stats

torch.manual_seed(42)


def compute_psi(
    reference: torch.Tensor,
    current: torch.Tensor,
    n_bins: int = 10,
    eps: float = 1e-4,
) -> float:
    """Compute Population Stability Index between two distributions.

    Args:
        reference: Reference distribution values.
        current: Current distribution values.
        n_bins: Number of histogram bins.
        eps: Small constant to avoid division by zero.

    Returns:
        PSI value (0 = identical distributions).
    """
    # Create shared bins spanning both the reference and current distributions
    min_val = min(reference.min().item(), current.min().item())
    max_val = max(reference.max().item(), current.max().item())
    bins = torch.linspace(min_val, max_val, n_bins + 1)

    # Compute histograms
    ref_hist = torch.histogram(reference.float(), bins=bins).hist
    cur_hist = torch.histogram(current.float(), bins=bins).hist

    # Normalize to proportions
    ref_proportions = ref_hist / ref_hist.sum() + eps
    cur_proportions = cur_hist / cur_hist.sum() + eps

    # Compute PSI
    psi = ((cur_proportions - ref_proportions)
           * torch.log(cur_proportions / ref_proportions)).sum()

    return psi.item()


def ks_drift_test(
    reference: torch.Tensor,
    current: torch.Tensor,
    significance_level: float = 0.05,
) -> dict[str, float | bool]:
    """Perform Kolmogorov-Smirnov test for drift detection.

    Args:
        reference: Reference distribution values.
        current: Current distribution values.
        significance_level: P-value threshold for drift.

    Returns:
        Dictionary with statistic, p_value, and drift_detected.
    """
    statistic, p_value = stats.ks_2samp(
        reference.numpy(),
        current.numpy(),
    )

    return {
        "statistic": float(statistic),
        "p_value": float(p_value),
        "drift_detected": p_value < significance_level,
    }


class WindowedDriftDetector:
    """Detects drift using sliding windows over streaming data.

    Maintains a reference window and compares incoming data
    against it periodically.

    Args:
        reference_data: Reference distribution (e.g., training data).
        window_size: Size of the comparison window.
        psi_threshold: PSI threshold for drift alert.
    """

    def __init__(
        self,
        reference_data: torch.Tensor,
        window_size: int = 500,
        psi_threshold: float = 0.2,
    ) -> None:
        self.reference_data = reference_data
        self.window_size = window_size
        self.psi_threshold = psi_threshold
        self.current_window: list[float] = []
        self.drift_history: list[dict] = []

    def add_observation(self, value: float) -> dict | None:
        """Add a new observation and check for drift.

        Args:
            value: New observed value.

        Returns:
            Drift report if window is full, None otherwise.
        """
        self.current_window.append(value)

        if len(self.current_window) >= self.window_size:
            current_tensor = torch.tensor(self.current_window)
            psi = compute_psi(self.reference_data, current_tensor)
            ks_result = ks_drift_test(self.reference_data, current_tensor)

            report = {
                "window_size": len(self.current_window),
                "psi": psi,
                "ks_statistic": ks_result["statistic"],
                "ks_p_value": ks_result["p_value"],
                "drift_detected": (
                    psi > self.psi_threshold
                    or ks_result["drift_detected"]
                ),
            }

            self.drift_history.append(report)
            self.current_window = []  # Reset window
            return report

        return None
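
A usage sketch for the detectors above: the first window is drawn from the same distribution as the reference, while the second is shifted upward, so both PSI and the KS test should flag drift. The window size and threshold are illustrative.

if __name__ == "__main__":
    reference = torch.randn(2000)  # reference distribution: N(0, 1)
    detector = WindowedDriftDetector(
        reference_data=reference,
        window_size=500,
        psi_threshold=0.2,
    )

    # Two windows of streaming data: stable, then shifted by +1.0.
    stream = torch.cat([torch.randn(500), torch.randn(500) + 1.0])

    for value in stream.tolist():
        report = detector.add_observation(value)
        if report is not None:
            print(
                f"PSI={report['psi']:.3f}  "
                f"KS p-value={report['ks_p_value']:.4f}  "
                f"drift_detected={report['drift_detected']}"
            )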

34.9 A/B Testing for Models

A/B testing is the gold standard for evaluating whether a new model actually improves business outcomes in production.

34.9.1 Why A/B Test Models

Offline metrics (accuracy, F1, BLEU) do not always correlate with business outcomes. A model that is 2% more accurate might generate 10% more revenue---or it might not. A/B testing measures the actual impact.

34.9.2 Traffic Splitting Strategies

"""A/B testing framework for ML models.

Implements traffic splitting, metric collection, and
statistical significance testing.
"""

import hashlib
import time
from dataclasses import dataclass, field
from typing import Any

import torch
from scipy import stats as scipy_stats

torch.manual_seed(42)


@dataclass
class ExperimentConfig:
    """Configuration for an A/B test experiment.

    Attributes:
        name: Experiment identifier.
        control_model: Control model identifier.
        treatment_model: Treatment model identifier.
        traffic_split: Fraction of traffic to treatment (0-1).
        min_samples: Minimum samples before significance test.
        significance_level: P-value threshold.
    """
    name: str
    control_model: str
    treatment_model: str
    traffic_split: float = 0.5
    min_samples: int = 1000
    significance_level: float = 0.05


@dataclass
class ExperimentResult:
    """Results of an A/B test experiment.

    Attributes:
        control_metric: Mean metric for control group.
        treatment_metric: Mean metric for treatment group.
        relative_improvement: Relative improvement (%).
        p_value: Statistical significance.
        is_significant: Whether result is statistically significant.
        control_n: Number of control observations.
        treatment_n: Number of treatment observations.
    """
    control_metric: float
    treatment_metric: float
    relative_improvement: float
    p_value: float
    is_significant: bool
    control_n: int
    treatment_n: int


class ABTestManager:
    """Manages A/B test experiments for ML models.

    Args:
        config: Experiment configuration.
    """

    def __init__(self, config: ExperimentConfig) -> None:
        self.config = config
        self.control_outcomes: list[float] = []
        self.treatment_outcomes: list[float] = []

    def assign_variant(self, user_id: str) -> str:
        """Deterministically assign a user to control or treatment.

        Uses consistent hashing so the same user always gets
        the same variant, even across sessions.

        Args:
            user_id: Unique user identifier.

        Returns:
            'control' or 'treatment'.
        """
        hash_value = hashlib.md5(
            f"{self.config.name}:{user_id}".encode()
        ).hexdigest()
        hash_fraction = int(hash_value[:8], 16) / 0xFFFFFFFF

        if hash_fraction < self.config.traffic_split:
            return "treatment"
        return "control"

    def record_outcome(self, variant: str, metric_value: float) -> None:
        """Record an outcome for a variant.

        Args:
            variant: 'control' or 'treatment'.
            metric_value: The observed metric value.
        """
        if variant == "control":
            self.control_outcomes.append(metric_value)
        elif variant == "treatment":
            self.treatment_outcomes.append(metric_value)
        else:
            raise ValueError(f"Unknown variant: {variant}")

    def analyze(self) -> ExperimentResult | None:
        """Analyze the experiment results.

        Returns:
            ExperimentResult if enough data, None otherwise.
        """
        if (len(self.control_outcomes) < self.config.min_samples
                or len(self.treatment_outcomes) < self.config.min_samples):
            return None

        control_tensor = torch.tensor(self.control_outcomes)
        treatment_tensor = torch.tensor(self.treatment_outcomes)

        control_mean = control_tensor.mean().item()
        treatment_mean = treatment_tensor.mean().item()

        if abs(control_mean) > 1e-8:
            relative_improvement = (
                (treatment_mean - control_mean) / abs(control_mean) * 100
            )
        else:
            relative_improvement = 0.0

        # Welch's t-test
        t_stat, p_value = scipy_stats.ttest_ind(
            self.control_outcomes,
            self.treatment_outcomes,
            equal_var=False,
        )

        return ExperimentResult(
            control_metric=control_mean,
            treatment_metric=treatment_mean,
            relative_improvement=relative_improvement,
            p_value=float(p_value),
            is_significant=p_value < self.config.significance_level,
            control_n=len(self.control_outcomes),
            treatment_n=len(self.treatment_outcomes),
        )
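
A usage sketch, continuing the listing above. The simulated click-through rates (10% for control, 12% for treatment) are illustrative; in production the outcomes would come from your event stream:

if __name__ == "__main__":
    config = ExperimentConfig(
        name="ranker-v2-rollout",
        control_model="ranker-v1",
        treatment_model="ranker-v2",
        traffic_split=0.5,
        min_samples=1000,
    )
    ab_test = ABTestManager(config)

    for i in range(5000):
        variant = ab_test.assign_variant(user_id=f"user-{i}")
        base_rate = 0.12 if variant == "treatment" else 0.10
        outcome = float(torch.rand(1).item() < base_rate)
        ab_test.record_outcome(variant, outcome)

    result = ab_test.analyze()
    if result is not None:
        print(
            f"control={result.control_metric:.4f}  "
            f"treatment={result.treatment_metric:.4f}  "
            f"lift={result.relative_improvement:+.1f}%  "
            f"p={result.p_value:.4f}  "
            f"significant={result.is_significant}"
        )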

34.9.3 Multi-Armed Bandits

When you want to minimize regret during experimentation, multi-armed bandits adaptively allocate traffic:

import torch

torch.manual_seed(42)


class ThompsonSamplingBandit:
    """Thompson Sampling for adaptive model selection.

    Uses Beta distributions to model the probability that each
    model is the best, and samples accordingly.

    Args:
        model_names: List of model identifiers.
    """

    def __init__(self, model_names: list[str]) -> None:
        self.models = model_names
        # Beta distribution parameters (successes, failures)
        self.alpha = {name: 1.0 for name in model_names}
        self.beta_ = {name: 1.0 for name in model_names}

    def select_model(self) -> str:
        """Select a model using Thompson Sampling.

        Returns:
            Name of the selected model.
        """
        samples = {}
        for name in self.models:
            # Sample from Beta posterior
            sample = torch.distributions.Beta(
                torch.tensor(self.alpha[name]),
                torch.tensor(self.beta_[name]),
            ).sample()
            samples[name] = sample.item()

        return max(samples, key=samples.get)

    def update(self, model_name: str, reward: float) -> None:
        """Update beliefs after observing a reward.

        Args:
            model_name: Which model was used.
            reward: Observed reward (0 or 1 for binary).
        """
        self.alpha[model_name] += reward
        self.beta_[model_name] += 1.0 - reward
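
A usage sketch: with two candidate models and simulated binary rewards, the bandit shifts most of the traffic toward the stronger model as its posterior sharpens. The success rates below are assumed purely for the simulation.

if __name__ == "__main__":
    bandit = ThompsonSamplingBandit(["model-a", "model-b"])
    true_success_rates = {"model-a": 0.10, "model-b": 0.14}  # simulation only

    selections = {"model-a": 0, "model-b": 0}
    for _ in range(2000):
        choice = bandit.select_model()
        selections[choice] += 1
        reward = float(torch.rand(1).item() < true_success_rates[choice])
        bandit.update(choice, reward)

    posterior_means = {
        name: bandit.alpha[name] / (bandit.alpha[name] + bandit.beta_[name])
        for name in bandit.models
    }
    print(f"Traffic allocation after 2000 rounds: {selections}")
    print(f"Posterior success-rate estimates: {posterior_means}")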

34.10 LLMOps: Operating Large Language Models

LLMOps extends MLOps with practices specific to large language models. While the foundational principles are the same, LLMs introduce unique operational challenges.

34.10.1 How LLMOps Differs from Traditional MLOps

Aspect          Traditional MLOps                 LLMOps
Training        Train from scratch or fine-tune   Often use pre-trained API models
Inputs          Structured features               Unstructured text prompts
Outputs         Classes/numbers                   Free-form text
Versioning      Model weights                     Prompts + configurations
Evaluation      Clear metrics                     Subjective quality, safety
Cost            Compute per training run          Cost per token, ongoing
Failure modes   Wrong prediction                  Hallucination, toxicity, jailbreak
Latency         Low (ms)                          High (seconds)

34.10.2 Prompt Versioning and Management

Prompts are the "code" of LLM applications. They must be versioned, tested, and managed with the same rigor as software code.

"""Prompt versioning and management system.

Provides structured prompt templates with version control,
A/B testing, and performance tracking.
"""

import json
import hashlib
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any

import torch

torch.manual_seed(42)


@dataclass
class PromptTemplate:
    """A versioned prompt template.

    Attributes:
        name: Template identifier.
        version: Semantic version.
        system_prompt: System/instruction prompt.
        user_template: User message template with {placeholders}.
        model: Target model identifier.
        temperature: Sampling temperature.
        max_tokens: Maximum output tokens.
        metadata: Additional configuration.
    """
    name: str
    version: str
    system_prompt: str
    user_template: str
    model: str = "gpt-4"
    temperature: float = 0.7
    max_tokens: int = 1024
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def content_hash(self) -> str:
        """Compute a content hash for change detection."""
        content = f"{self.system_prompt}:{self.user_template}"
        return hashlib.sha256(content.encode()).hexdigest()[:12]

    def render(self, **kwargs: Any) -> dict[str, str]:
        """Render the template with provided variables.

        Args:
            **kwargs: Template variable values.

        Returns:
            Dictionary with 'system' and 'user' messages.

        Raises:
            KeyError: If a required template variable is missing.
        """
        return {
            "system": self.system_prompt,
            "user": self.user_template.format(**kwargs),
        }


class PromptRegistry:
    """Registry for managing prompt templates.

    Args:
        storage_path: Path to store prompt versions.
    """

    def __init__(self, storage_path: str = "./prompt_registry") -> None:
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.prompts: dict[str, dict[str, PromptTemplate]] = {}

    def register(self, template: PromptTemplate) -> None:
        """Register a new prompt template version.

        Args:
            template: The prompt template to register.
        """
        if template.name not in self.prompts:
            self.prompts[template.name] = {}

        self.prompts[template.name][template.version] = template

        # Persist to disk
        prompt_dir = self.storage_path / template.name
        prompt_dir.mkdir(exist_ok=True)

        metadata = {
            "name": template.name,
            "version": template.version,
            "system_prompt": template.system_prompt,
            "user_template": template.user_template,
            "model": template.model,
            "temperature": template.temperature,
            "max_tokens": template.max_tokens,
            "content_hash": template.content_hash,
            "registered_at": datetime.now().isoformat(),
            "metadata": template.metadata,
        }

        with open(prompt_dir / f"v{template.version}.json", "w") as f:
            json.dump(metadata, f, indent=2)

    def get(
        self,
        name: str,
        version: str | None = None,
    ) -> PromptTemplate | None:
        """Retrieve a prompt template.

        Args:
            name: Template name.
            version: Specific version, or None for latest.

        Returns:
            The prompt template, or None if not found.
        """
        if name not in self.prompts:
            return None

        if version is not None:
            return self.prompts[name].get(version)

        # Return latest version
        versions = sorted(self.prompts[name].keys())
        return self.prompts[name][versions[-1]] if versions else None

    def list_versions(self, name: str) -> list[str]:
        """List all versions of a prompt template.

        Args:
            name: Template name.

        Returns:
            Sorted list of version strings.
        """
        if name not in self.prompts:
            return []
        return sorted(self.prompts[name].keys())
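
A usage sketch, continuing the listing above. The prompt names, wording, and versions are illustrative; the point is that every change to a prompt produces a new, retrievable version with its own content hash:

if __name__ == "__main__":
    registry = PromptRegistry(storage_path="./prompt_registry")

    registry.register(PromptTemplate(
        name="support-summarizer",
        version="1.0.0",
        system_prompt="You are a concise assistant that summarizes support tickets.",
        user_template="Summarize this ticket in two sentences:\n\n{ticket_text}",
    ))
    registry.register(PromptTemplate(
        name="support-summarizer",
        version="1.1.0",
        system_prompt=(
            "You are a concise assistant that summarizes support tickets "
            "and always states the customer's requested action."
        ),
        user_template="Summarize this ticket in two sentences:\n\n{ticket_text}",
        temperature=0.3,
    ))

    latest = registry.get("support-summarizer")
    print(latest.version, latest.content_hash)
    print(latest.render(ticket_text="My invoice is wrong; please refund the overcharge."))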

34.10.3 LLM Evaluation Pipelines

Evaluating LLMs is fundamentally different from evaluating traditional ML models. Outputs are free-form text, and quality is often subjective. A comprehensive evaluation pipeline combines automated metrics, model-based judges, and human evaluation.

"""LLM evaluation pipeline.

Implements automated evaluation for LLM outputs using
multiple strategies: exact match, semantic similarity,
and LLM-as-judge.
"""

from dataclasses import dataclass
from typing import Any

import torch
import torch.nn.functional as F

torch.manual_seed(42)


@dataclass
class EvalExample:
    """A single evaluation example.

    Attributes:
        input_text: The input prompt.
        expected_output: Expected/reference output.
        actual_output: Model-generated output.
        metadata: Additional context.
    """
    input_text: str
    expected_output: str
    actual_output: str
    metadata: dict[str, Any] | None = None


@dataclass
class EvalResult:
    """Evaluation result for a single example.

    Attributes:
        scores: Dictionary of metric scores.
        feedback: Human-readable feedback.
        passed: Whether the example passed all criteria.
    """
    scores: dict[str, float]
    feedback: str
    passed: bool


class LLMEvaluator:
    """Evaluates LLM outputs using multiple strategies.

    Args:
        passing_threshold: Minimum average score to pass.
    """

    def __init__(self, passing_threshold: float = 0.7) -> None:
        self.passing_threshold = passing_threshold
        self.results: list[EvalResult] = []

    def evaluate_exact_match(self, example: EvalExample) -> float:
        """Check for exact string match.

        Args:
            example: The evaluation example.

        Returns:
            1.0 if exact match, 0.0 otherwise.
        """
        expected = example.expected_output.strip()
        actual = example.actual_output.strip()
        return 1.0 if actual == expected else 0.0

    def evaluate_contains(
        self,
        example: EvalExample,
        required_phrases: list[str],
    ) -> float:
        """Check if output contains required phrases.

        Args:
            example: The evaluation example.
            required_phrases: Phrases that must appear in output.

        Returns:
            Fraction of required phrases found.
        """
        if not required_phrases:
            return 1.0

        found = sum(
            1 for phrase in required_phrases
            if phrase.lower() in example.actual_output.lower()
        )
        return found / len(required_phrases)

    def evaluate_length(
        self,
        example: EvalExample,
        min_words: int = 10,
        max_words: int = 500,
    ) -> float:
        """Check if output length is within bounds.

        Args:
            example: The evaluation example.
            min_words: Minimum word count.
            max_words: Maximum word count.

        Returns:
            1.0 if within bounds, penalty otherwise.
        """
        word_count = len(example.actual_output.split())

        if min_words <= word_count <= max_words:
            return 1.0
        elif word_count < min_words:
            return word_count / min_words
        else:
            return max_words / word_count

    def evaluate_no_hallucination_keywords(
        self,
        example: EvalExample,
        forbidden_phrases: list[str],
    ) -> float:
        """Check that output does not contain forbidden phrases.

        Args:
            example: The evaluation example.
            forbidden_phrases: Phrases that must NOT appear.

        Returns:
            1.0 if no forbidden phrases found, penalized otherwise.
        """
        if not forbidden_phrases:
            return 1.0

        violations = sum(
            1 for phrase in forbidden_phrases
            if phrase.lower() in example.actual_output.lower()
        )
        return max(0.0, 1.0 - violations / len(forbidden_phrases))

    def run_evaluation(
        self,
        examples: list[EvalExample],
        required_phrases_map: dict[int, list[str]] | None = None,
        forbidden_phrases_map: dict[int, list[str]] | None = None,
    ) -> dict[str, Any]:
        """Run full evaluation pipeline on a set of examples.

        Args:
            examples: List of evaluation examples.
            required_phrases_map: Map from example index to required phrases.
            forbidden_phrases_map: Map from example index to forbidden phrases.

        Returns:
            Summary of evaluation results.
        """
        self.results = []

        for i, example in enumerate(examples):
            scores = {}

            scores["exact_match"] = self.evaluate_exact_match(example)
            scores["length"] = self.evaluate_length(example)

            if required_phrases_map and i in required_phrases_map:
                scores["contains"] = self.evaluate_contains(
                    example, required_phrases_map[i]
                )

            if forbidden_phrases_map and i in forbidden_phrases_map:
                scores["no_hallucination"] = (
                    self.evaluate_no_hallucination_keywords(
                        example, forbidden_phrases_map[i]
                    )
                )

            avg_score = sum(scores.values()) / len(scores)
            passed = avg_score >= self.passing_threshold

            result = EvalResult(
                scores=scores,
                feedback=self._generate_feedback(scores),
                passed=passed,
            )
            self.results.append(result)

        # Summary: aggregate across all examples and every metric that
        # appears in any result (some metrics apply only to some examples).
        pass_rate = sum(1 for r in self.results if r.passed) / len(self.results)
        all_metrics = {key for r in self.results for key in r.scores}
        avg_scores = {}
        for key in sorted(all_metrics):
            values = [r.scores[key] for r in self.results if key in r.scores]
            avg_scores[key] = sum(values) / len(values)

        return {
            "pass_rate": pass_rate,
            "total_examples": len(examples),
            "passed": sum(1 for r in self.results if r.passed),
            "failed": sum(1 for r in self.results if not r.passed),
            "average_scores": avg_scores,
        }

    def _generate_feedback(self, scores: dict[str, float]) -> str:
        """Generate human-readable feedback from scores.

        Args:
            scores: Dictionary of metric scores.

        Returns:
            Feedback string.
        """
        feedback_parts = []
        for metric, score in scores.items():
            if score < 0.5:
                feedback_parts.append(f"FAIL {metric}: {score:.2f}")
            elif score < self.passing_threshold:
                feedback_parts.append(f"WARN {metric}: {score:.2f}")
            else:
                feedback_parts.append(f"PASS {metric}: {score:.2f}")
        return " | ".join(feedback_parts)

34.10.4 Guardrails for LLM Applications

Guardrails are runtime safety checks that prevent LLMs from generating harmful, off-topic, or policy-violating content.

"""Guardrails framework for LLM applications.

Provides input validation, output filtering, and safety checks
for production LLM deployments.
"""

import re
from dataclasses import dataclass
from enum import Enum
from typing import Any


class GuardrailAction(Enum):
    """Action to take when a guardrail is triggered."""
    ALLOW = "allow"
    BLOCK = "block"
    WARN = "warn"
    MODIFY = "modify"


@dataclass
class GuardrailResult:
    """Result of a guardrail check.

    Attributes:
        action: The recommended action.
        rule_name: Which rule was triggered.
        details: Human-readable explanation.
        modified_text: Modified text (if action is MODIFY).
    """
    action: GuardrailAction
    rule_name: str
    details: str
    modified_text: str | None = None


class LLMGuardrails:
    """Guardrails for LLM input and output.

    Checks for PII, prompt injection, toxicity indicators,
    and topic restrictions.
    """

    def __init__(self) -> None:
        # PII patterns
        self.pii_patterns = {
            "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
            "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
            "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
            "credit_card": r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
        }

        # Prompt injection indicators
        self.injection_patterns = [
            r"ignore\s+(previous|all|above)\s+instructions",
            r"you\s+are\s+now\s+(?:a|an|in)\s+",
            r"disregard\s+(?:your|all|any)\s+",
            r"system\s*prompt",
            r"jailbreak",
        ]

        # Topic restrictions (customize per application)
        self.blocked_topics: list[str] = []

    def check_input(self, text: str) -> list[GuardrailResult]:
        """Check user input against guardrails.

        Args:
            text: User input text.

        Returns:
            List of guardrail results.
        """
        results: list[GuardrailResult] = []

        # Check for prompt injection
        for pattern in self.injection_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                results.append(GuardrailResult(
                    action=GuardrailAction.BLOCK,
                    rule_name="prompt_injection",
                    details=f"Potential prompt injection detected: {pattern}",
                ))

        # Check for PII in input
        for pii_type, pattern in self.pii_patterns.items():
            if re.search(pattern, text):
                results.append(GuardrailResult(
                    action=GuardrailAction.WARN,
                    rule_name=f"input_pii:{pii_type}",
                    details=f"PII detected in input: {pii_type}",
                ))

        # Check input length
        if len(text) > 10000:
            results.append(GuardrailResult(
                action=GuardrailAction.BLOCK,
                rule_name="input_length",
                details=f"Input too long: {len(text)} characters (max 10000)",
            ))

        if not results:
            results.append(GuardrailResult(
                action=GuardrailAction.ALLOW,
                rule_name="input_check",
                details="All input checks passed.",
            ))

        return results

    def check_output(self, text: str) -> list[GuardrailResult]:
        """Check LLM output against guardrails.

        Args:
            text: LLM-generated output text.

        Returns:
            List of guardrail results.
        """
        results: list[GuardrailResult] = []

        # Check for PII leakage in output; redactions accumulate so the
        # last MODIFY result carries the fully redacted text.
        redacted_text = text
        for pii_type, pattern in self.pii_patterns.items():
            matches = re.findall(pattern, redacted_text)
            if matches:
                redacted_text = re.sub(
                    pattern, f"[REDACTED_{pii_type.upper()}]", redacted_text
                )
                results.append(GuardrailResult(
                    action=GuardrailAction.MODIFY,
                    rule_name=f"output_pii:{pii_type}",
                    details=f"PII found in output: {pii_type} ({len(matches)} instances)",
                    modified_text=redacted_text,
                ))

        # Check for blocked topics in output
        for topic in self.blocked_topics:
            if topic.lower() in text.lower():
                results.append(GuardrailResult(
                    action=GuardrailAction.BLOCK,
                    rule_name=f"blocked_topic:{topic}",
                    details=f"Output contains blocked topic: {topic}",
                ))

        # Check output length
        if len(text.split()) < 3:
            results.append(GuardrailResult(
                action=GuardrailAction.WARN,
                rule_name="output_too_short",
                details="Output suspiciously short (< 3 words).",
            ))

        if not results:
            results.append(GuardrailResult(
                action=GuardrailAction.ALLOW,
                rule_name="output_check",
                details="All output checks passed.",
            ))

        return results

    def apply_guardrails(
        self,
        input_text: str,
        output_text: str,
    ) -> dict[str, Any]:
        """Apply all guardrails to an input/output pair.

        Args:
            input_text: User input.
            output_text: LLM output.

        Returns:
            Dictionary with guardrail results and final action.
        """
        input_results = self.check_input(input_text)
        output_results = self.check_output(output_text)

        all_results = input_results + output_results

        # Determine final action (most restrictive wins)
        if any(r.action == GuardrailAction.BLOCK for r in all_results):
            final_action = GuardrailAction.BLOCK
            final_text = "I'm sorry, I cannot help with that request."
        elif any(r.action == GuardrailAction.MODIFY for r in all_results):
            final_action = GuardrailAction.MODIFY
            # Redactions accumulate in check_output, so the last MODIFY
            # result carries the fully redacted output text.
            final_text = output_text
            for r in all_results:
                if r.action == GuardrailAction.MODIFY and r.modified_text:
                    final_text = r.modified_text
        else:
            final_action = GuardrailAction.ALLOW
            final_text = output_text

        return {
            "final_action": final_action.value,
            "final_text": final_text,
            "input_results": [
                {"action": r.action.value, "rule": r.rule_name, "details": r.details}
                for r in input_results
            ],
            "output_results": [
                {"action": r.action.value, "rule": r.rule_name, "details": r.details}
                for r in output_results
            ],
        }
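
A brief usage sketch of the guardrails defined above; the input and output strings are illustrative.

# Example usage (illustrative strings)
guardrails = LLMGuardrails()

decision = guardrails.apply_guardrails(
    input_text="What is the capital of France?",
    output_text="The capital of France is Paris. Reach me at test@example.com.",
)

print(decision["final_action"])  # "modify" -- the email address is redacted
print(decision["final_text"])    # "... Reach me at [REDACTED_EMAIL]."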

34.11 Infrastructure as Code for ML

Infrastructure as Code (IaC) ensures that ML infrastructure is reproducible, version-controlled, and auditable. The two most common tools are Terraform for cloud infrastructure and Docker for application packaging.

34.11.1 Dockerizing ML Applications

# Dockerfile for ML model serving
FROM python:3.11-slim

# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV MODEL_PATH=/app/models/model.pt

# Install system dependencies (curl is required by the HEALTHCHECK below)
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt

# Copy application code
COPY src/ /app/src/
COPY models/ /app/models/

WORKDIR /app

# Health check
HEALTHCHECK --interval=30s --timeout=5s \
    CMD curl -f http://localhost:8000/health || exit 1

# Run the serving application
EXPOSE 8000
CMD ["uvicorn", "src.serve:app", "--host", "0.0.0.0", "--port", "8000"]

34.11.2 Kubernetes Deployment

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-serving
  labels:
    app: ml-model
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model
  template:
    metadata:
      labels:
        app: ml-model
    spec:
      containers:
        - name: model-server
          image: myregistry/ml-model:v1.2.3
          ports:
            - containerPort: 8000
          resources:
            requests:
              memory: "2Gi"
              cpu: "1000m"
              nvidia.com/gpu: "1"
            limits:
              memory: "4Gi"
              cpu: "2000m"
              nvidia.com/gpu: "1"
          env:
            - name: MODEL_VERSION
              value: "v1.2.3"
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 5
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer

34.11.3 Terraform for ML Infrastructure

# main.tf - ML training infrastructure on AWS
provider "aws" {
  region = "us-east-1"
}

# S3 bucket for model artifacts
resource "aws_s3_bucket" "model_artifacts" {
  bucket = "ml-model-artifacts-prod"

  tags = {
    Environment = "production"
    Team        = "ml-platform"
  }
}

# Versioning and encryption are separate resources in AWS provider v4+
resource "aws_s3_bucket_versioning" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

# ECR repository for model serving containers
resource "aws_ecr_repository" "model_serving" {
  name                 = "ml-model-serving"
  image_tag_mutability = "IMMUTABLE"

  image_scanning_configuration {
    scan_on_push = true
  }
}

# SageMaker endpoint for model serving
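# Note: aws_sagemaker_model.model referenced below is assumed to be
# defined elsewhere in this configuration.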
resource "aws_sagemaker_endpoint_configuration" "model_config" {
  name = "ml-model-config"

  production_variants {
    variant_name           = "primary"
    model_name             = aws_sagemaker_model.model.name
    initial_instance_count = 2
    instance_type          = "ml.g4dn.xlarge"
    initial_variant_weight = 1.0
  }
}

34.12 Cost Management

ML workloads can be extraordinarily expensive. Effective cost management is a critical MLOps capability.

34.12.1 Training Cost Optimization

"""ML cost tracking and optimization utilities.

Provides cost estimation, tracking, and optimization
recommendations for ML workloads.
"""

from dataclasses import dataclass
from typing import Any


# Approximate GPU costs per hour (on-demand, major cloud providers, 2024)
GPU_COSTS_PER_HOUR = {
    "nvidia-t4": 0.526,
    "nvidia-a10g": 1.006,
    "nvidia-l4": 0.81,
    "nvidia-a100-40gb": 3.67,
    "nvidia-a100-80gb": 5.12,
    "nvidia-h100": 8.28,
}

# Approximate LLM API costs per 1M tokens (2024)
LLM_API_COSTS_PER_1M_TOKENS = {
    "gpt-4-turbo": {"input": 10.0, "output": 30.0},
    "gpt-4o": {"input": 2.50, "output": 10.0},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-3-5-sonnet": {"input": 3.0, "output": 15.0},
    "claude-3-haiku": {"input": 0.25, "output": 1.25},
}


@dataclass
class TrainingCostEstimate:
    """Estimated cost for a training run.

    Attributes:
        gpu_type: GPU identifier.
        num_gpus: Number of GPUs.
        estimated_hours: Estimated training time.
        cost_per_hour: Cost per GPU per hour.
        total_cost: Total estimated cost.
        recommendations: Cost optimization suggestions.
    """
    gpu_type: str
    num_gpus: int
    estimated_hours: float
    cost_per_hour: float
    total_cost: float
    recommendations: list[str]


def estimate_training_cost(
    model_params_millions: float,
    dataset_size_gb: float,
    epochs: int,
    gpu_type: str = "nvidia-a100-40gb",
    num_gpus: int = 1,
) -> TrainingCostEstimate:
    """Estimate the cost of a training run.

    Uses rough heuristics based on model size and data volume.
    Actual costs vary significantly based on implementation details.

    Args:
        model_params_millions: Model size in millions of parameters.
        dataset_size_gb: Dataset size in gigabytes.
        epochs: Number of training epochs.
        gpu_type: Type of GPU to use.
        num_gpus: Number of GPUs.

    Returns:
        Cost estimate with recommendations.
    """
    cost_per_hour = GPU_COSTS_PER_HOUR.get(gpu_type, 5.0)

    # Very rough heuristic: 1M params * 1 GB data * 1 epoch ~ 0.1 GPU-hours.
    # Dividing by num_gpus approximates wall-clock hours (assuming
    # near-linear scaling); total_cost below multiplies back by num_gpus.
    gpu_hours = (
        model_params_millions * dataset_size_gb * epochs * 0.1 / num_gpus
    )

    total_cost = gpu_hours * cost_per_hour * num_gpus

    recommendations = []

    if gpu_type in ("nvidia-a100-80gb", "nvidia-h100"):
        recommendations.append(
            "Consider using spot/preemptible instances for up to 70% savings."
        )

    if num_gpus == 1 and model_params_millions > 500:
        recommendations.append(
            "Consider multi-GPU training with DDP for faster training."
        )

    if epochs > 10:
        recommendations.append(
            "Consider early stopping to avoid wasted compute on "
            "diminishing returns."
        )

    if model_params_millions > 100:
        recommendations.append(
            "Consider mixed-precision training (fp16/bf16) for ~2x speedup."
        )

    return TrainingCostEstimate(
        gpu_type=gpu_type,
        num_gpus=num_gpus,
        estimated_hours=gpu_hours,
        cost_per_hour=cost_per_hour,
        total_cost=total_cost,
        recommendations=recommendations,
    )


def estimate_llm_api_cost(
    model: str,
    input_tokens: int,
    output_tokens: int,
    num_requests: int = 1,
) -> dict[str, Any]:
    """Estimate the cost of LLM API usage.

    Args:
        model: Model identifier.
        input_tokens: Tokens per input.
        output_tokens: Tokens per output.
        num_requests: Number of requests.

    Returns:
        Dictionary with cost breakdown.
    """
    costs = LLM_API_COSTS_PER_1M_TOKENS.get(model)
    if costs is None:
        return {"error": f"Unknown model: {model}"}

    total_input_tokens = input_tokens * num_requests
    total_output_tokens = output_tokens * num_requests

    input_cost = total_input_tokens / 1_000_000 * costs["input"]
    output_cost = total_output_tokens / 1_000_000 * costs["output"]

    return {
        "model": model,
        "total_input_tokens": total_input_tokens,
        "total_output_tokens": total_output_tokens,
        "input_cost": input_cost,
        "output_cost": output_cost,
        "total_cost": input_cost + output_cost,
        "cost_per_request": (input_cost + output_cost) / num_requests,
    }
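
A quick usage sketch of the two estimators above; the workload numbers are illustrative and the resulting figures are ballpark estimates only.

# Example usage (illustrative workload numbers)
train_estimate = estimate_training_cost(
    model_params_millions=350,
    dataset_size_gb=2.0,
    epochs=3,
    gpu_type="nvidia-a100-40gb",
    num_gpus=2,
)
print(f"~{train_estimate.estimated_hours:.0f} wall-clock hours, "
      f"~${train_estimate.total_cost:,.0f} total")
for rec in train_estimate.recommendations:
    print(f"- {rec}")

api_estimate = estimate_llm_api_cost(
    model="gpt-4o-mini",
    input_tokens=1200,
    output_tokens=400,
    num_requests=50_000,
)
print(f"Estimated API cost: ~${api_estimate['total_cost']:,.2f}")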

34.12.2 Cost Management Best Practices

  1. Use spot/preemptible instances for training workloads (60--90% savings). Implement checkpointing to handle interruptions.

  2. Right-size your GPUs. Do not use an H100 when an A10G suffices. Profile your workload's GPU utilization.

  3. Implement auto-scaling for serving infrastructure. Scale to zero when there is no traffic.

  4. Cache LLM responses. Identical prompts should return cached results; a minimal caching sketch follows this list.

  5. Monitor cost per prediction. Track this metric alongside model performance.

  6. Set budget alerts. Cloud providers offer billing alerts; use them aggressively.

  7. Use reserved instances for stable, predictable serving workloads.

  8. Optimize model size. Distillation, pruning, and quantization can dramatically reduce serving costs.
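
To make the caching recommendation concrete, here is a minimal in-memory sketch keyed on a hash of the prompt and generation parameters. The generate_fn callable is a hypothetical stand-in for your model or API client; production systems typically use a shared store such as Redis with a TTL and an invalidation policy.

import hashlib
import json
from typing import Callable


class ResponseCache:
    """Minimal in-memory LLM response cache (sketch).

    Keys on a hash of the prompt plus generation parameters so that
    identical requests reuse a previously generated response.
    """

    def __init__(self) -> None:
        self._store: dict[str, str] = {}

    @staticmethod
    def _key(prompt: str, params: dict) -> str:
        payload = json.dumps({"prompt": prompt, "params": params}, sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def get_or_generate(
        self,
        prompt: str,
        generate_fn: Callable[[str], str],
        **params,
    ) -> str:
        """Return a cached response, generating (and caching) it on a miss."""
        key = self._key(prompt, params)
        if key not in self._store:
            # Cache miss: pay for exactly one generation, then reuse it.
            self._store[key] = generate_fn(prompt)
        return self._store[key]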


34.13 Putting It All Together: An MLOps Architecture

Here is a reference architecture that combines all the concepts in this chapter:

┌─────────────────────────────────────────────────────────────────────┐
│                    MLOps Reference Architecture                     │
│                                                                     │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────────────┐  │
│  │ Feature  │   │ Training │   │  Model   │   │     Serving      │  │
│  │  Store   │──>│ Pipeline │──>│ Registry │──>│  Infrastructure  │  │
│  └──────────┘   └──────────┘   └──────────┘   └──────────────────┘  │
│       │              │              │                  │            │
│       v              v              v                  v            │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────────────┐  │
│  │   Data   │   │Experiment│   │  CI/CD   │   │   Monitoring &   │  │
│  │  Store   │   │ Tracking │   │ Pipeline │   │  Observability   │  │
│  │  (DVC)   │   │  (W&B)   │   │ (GitHub  │   │  (Prometheus/    │  │
│  │          │   │          │   │ Actions) │   │    Grafana)      │  │
│  └──────────┘   └──────────┘   └──────────┘   └──────────────────┘  │
│                                                                     │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                     Infrastructure Layer                      │  │
│  │       Kubernetes | Docker | Terraform | Cloud Provider        │  │
│  └───────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────┘

34.13.1 Decision Framework

When designing your MLOps stack, consider:

  • Team size: Small teams should use managed services; large teams may benefit from self-hosted solutions.
  • Model type: Traditional ML, deep learning, and LLM applications have different operational needs.
  • Regulatory environment: Healthcare, finance, and government have strict audit and reproducibility requirements.
  • Scale: The difference between serving 100 predictions/day and 100 million is not just engineering---it is a different architecture.
  • Budget: Start simple. You do not need every tool from day one.

34.14 Summary

MLOps and LLMOps are not optional luxuries---they are essential engineering disciplines for any organization deploying ML in production. In this chapter, we covered:

  1. The ML lifecycle is a continuous loop, not a linear pipeline. Each phase requires operational tooling and processes.

  2. Experiment tracking (with tools like W&B) ensures reproducibility and enables informed decisions about model development.

  3. Data versioning (with DVC) ensures you can always reproduce a training run and trace a model back to its training data.

  4. Model versioning and registries manage the lifecycle of models from development through production.

  5. CI/CD for ML extends software CI/CD with data validation, model testing, and performance regression checks.

  6. Monitoring and observability catch silent model failures before they impact users.

  7. Drift detection identifies when production data diverges from training data, signaling the need for retraining.

  8. A/B testing measures the actual business impact of model changes.

  9. LLMOps adds prompt versioning, evaluation pipelines, and guardrails to the MLOps toolkit.

  10. Infrastructure as Code makes ML infrastructure reproducible and auditable.

  11. Cost management prevents ML workloads from becoming prohibitively expensive.

The key insight is that MLOps is not a one-time setup---it is a continuous practice that evolves with your models, data, and business needs. Start simple, automate what hurts most, and iterate.

In the next chapter, we will explore distributed training and scaling, which is essential for training the large models that modern ML applications demand.