In This Chapter
- 34.1 Introduction: The Gap Between Model and Product
- 34.2 The ML Lifecycle
- 34.3 Experiment Tracking with Weights & Biases
- 34.4 Model Versioning
- 34.5 Data Versioning with DVC
- 34.6 CI/CD for ML Pipelines
- 34.7 Model Monitoring and Observability
- 34.8 Drift Detection
- 34.9 A/B Testing for Models
- 34.10 LLMOps: Operating Large Language Models
- 34.11 Infrastructure as Code for ML
- 34.12 Cost Management
- 34.13 Putting It All Together: An MLOps Architecture
- 34.14 Summary
Chapter 34: MLOps and LLMOps
Operationalizing Machine Learning and Large Language Models for Production
"Everyone wants to build ML models, but no one wants to operate them." --- Attributed to countless ML platform engineers
34.1 Introduction: The Gap Between Model and Product
Building a machine learning model that achieves impressive metrics on a held-out test set is, paradoxically, the easiest part of delivering ML-powered products. The hard part---the part that consumes 80--90% of engineering effort in mature organizations---is everything that happens around the model: data pipelines, experiment tracking, reproducibility, deployment, monitoring, retraining, and governance.
MLOps (Machine Learning Operations) is the discipline that bridges this gap. It borrows principles from DevOps---continuous integration, continuous delivery, infrastructure as code, monitoring---and adapts them for the unique challenges of machine learning systems. With the rise of large language models, a specialized subset has emerged: LLMOps, which addresses the distinct operational challenges of deploying, evaluating, and governing LLM-powered applications.
This chapter provides a comprehensive, hands-on guide to both MLOps and LLMOps. We will cover the full ML lifecycle, from experiment tracking and data versioning to production monitoring and cost management. By the end, you will have the knowledge and tools to take any model from a Jupyter notebook to a production-grade, observable, continuously improving system.
Why MLOps Matters
Consider what happens without MLOps:
- Reproducibility crisis: "It worked on my machine" becomes "it worked in my notebook three months ago, but I don't remember which dataset version I used."
- Deployment friction: Models sit in notebooks for weeks or months before reaching production.
- Silent failures: Models degrade in production due to data drift, and nobody notices until a customer complains.
- Compliance risks: Auditors ask which model version generated a prediction, and nobody can answer.
- Wasted compute: Teams retrain models unnecessarily because they cannot track what has already been tried.
MLOps solves these problems systematically. Let us begin with the foundational concept: the ML lifecycle.
34.2 The ML Lifecycle
The ML lifecycle is not a linear pipeline but a continuous loop. Understanding its phases is essential for designing robust operational processes.
34.2.1 Phases of the ML Lifecycle
┌────────────────────────────────────────────────────────────────────────┐
│                              ML LIFECYCLE                              │
│                                                                        │
│  ┌────────────┐   ┌────────────┐   ┌────────────┐   ┌────────────┐     │
│  │  Problem   │──>│    Data    │──>│   Model    │──>│   Model    │     │
│  │  Framing   │   │  Pipeline  │   │  Training  │   │ Evaluation │     │
│  └────────────┘   └────────────┘   └────────────┘   └────────────┘     │
│        ^                                                   │           │
│        │                                                   v           │
│  ┌────────────┐   ┌────────────┐   ┌────────────┐   ┌────────────┐     │
│  │  Retrain   │<──│  Monitor   │<──│   Serve    │<──│   Deploy   │     │
│  │  / Update  │   │  & Alert   │   │   Model    │   │   Model    │     │
│  └────────────┘   └────────────┘   └────────────┘   └────────────┘     │
└────────────────────────────────────────────────────────────────────────┘
Phase 1: Problem Framing. Define the business problem, success metrics, and constraints. This phase determines everything downstream---the data you collect, the models you consider, and the serving requirements.
Phase 2: Data Pipeline. Ingest, clean, validate, transform, and version your data. Data quality is the single largest determinant of model quality.
Phase 3: Model Training. Experiment with architectures, hyperparameters, and training strategies. Track every experiment rigorously.
Phase 4: Model Evaluation. Evaluate models against business metrics, fairness criteria, and robustness requirements. Go beyond simple accuracy.
Phase 5: Model Deployment. Package the model, deploy it to the serving infrastructure, and route traffic.
Phase 6: Model Serving. Serve predictions with low latency, high throughput, and proper error handling.
Phase 7: Monitoring and Alerting. Continuously monitor model performance, data quality, and system health.
Phase 8: Retraining. When performance degrades or new data becomes available, retrain and redeploy.
34.2.2 MLOps Maturity Levels
Google's MLOps maturity model provides a useful framework:
| Level | Description | Characteristics |
|---|---|---|
| Level 0 | Manual | Jupyter notebooks, manual deployment, no monitoring |
| Level 1 | ML Pipeline Automation | Automated training pipelines, experiment tracking, basic monitoring |
| Level 2 | CI/CD for ML | Automated testing, continuous training, A/B testing, full observability |
Most organizations are at Level 0. The goal of this chapter is to move you toward Level 2.
34.3 Experiment Tracking with Weights & Biases
Experiment tracking is the foundation of reproducible ML. Without it, you cannot answer basic questions: "Which hyperparameters produced the best model?" "What data was used?" "Can we reproduce this result?"
34.3.1 Why Experiment Tracking
Every training run produces a rich set of artifacts:
- Hyperparameters: Learning rate, batch size, architecture choices
- Metrics: Loss, accuracy, F1, custom business metrics
- Artifacts: Model weights, predictions, confusion matrices
- Environment: Python version, library versions, hardware
- Data: Which dataset version, preprocessing steps, splits
Manual tracking (spreadsheets, notebooks) breaks down quickly. Dedicated tools like Weights & Biases (W&B), MLflow, and Neptune provide structured, searchable, and shareable experiment records.
34.3.2 Weights & Biases Integration
W&B has become the de facto standard for experiment tracking in deep learning. Here is how to integrate it into a PyTorch training loop:
"""Experiment tracking with Weights & Biases.
Demonstrates logging metrics, hyperparameters, and artifacts
during a PyTorch training loop.
"""
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import wandb
torch.manual_seed(42)
def create_synthetic_data(
n_samples: int = 1000,
n_features: int = 20,
n_classes: int = 5,
) -> tuple[TensorDataset, TensorDataset]:
"""Create synthetic classification data for demonstration.
Args:
n_samples: Number of training samples.
n_features: Number of input features.
n_classes: Number of output classes.
Returns:
Tuple of (train_dataset, val_dataset).
"""
X = torch.randn(n_samples, n_features)
y = torch.randint(0, n_classes, (n_samples,))
split = int(0.8 * n_samples)
train_ds = TensorDataset(X[:split], y[:split])
val_ds = TensorDataset(X[split:], y[split:])
return train_ds, val_ds
class SimpleClassifier(nn.Module):
"""A simple feedforward classifier for demonstration."""
def __init__(self, n_features: int, n_hidden: int, n_classes: int) -> None:
super().__init__()
self.net = nn.Sequential(
nn.Linear(n_features, n_hidden),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(n_hidden, n_hidden),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(n_hidden, n_classes),
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Forward pass."""
return self.net(x)
def train_with_wandb(config: dict | None = None) -> None:
    """Train a model with W&B experiment tracking.

    Args:
        config: Default hyperparameters. May be None when the function is
            launched by a W&B sweep agent, in which case hyperparameters
            are supplied through the sweep configuration.
    """
    # Initialize W&B run
    run = wandb.init(
        project="mlops-textbook",
        config=config,
        tags=["chapter-34", "experiment-tracking"],
    )
    # Sweep-provided values (if any) override the passed defaults
    config = run.config
# Create data
train_ds, val_ds = create_synthetic_data(
n_features=config["n_features"],
n_classes=config["n_classes"],
)
train_loader = DataLoader(train_ds, batch_size=config["batch_size"], shuffle=True)
val_loader = DataLoader(val_ds, batch_size=config["batch_size"])
# Create model
model = SimpleClassifier(
n_features=config["n_features"],
n_hidden=config["n_hidden"],
n_classes=config["n_classes"],
)
# Log model architecture
wandb.watch(model, log="all", log_freq=100)
optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"])
criterion = nn.CrossEntropyLoss()
best_val_loss = float("inf")
for epoch in range(config["epochs"]):
# Training
model.train()
train_loss = 0.0
correct = 0
total = 0
for X_batch, y_batch in train_loader:
optimizer.zero_grad()
logits = model(X_batch)
loss = criterion(logits, y_batch)
loss.backward()
optimizer.step()
train_loss += loss.item() * X_batch.size(0)
correct += (logits.argmax(dim=1) == y_batch).sum().item()
total += y_batch.size(0)
train_loss /= total
train_acc = correct / total
# Validation
model.eval()
val_loss = 0.0
val_correct = 0
val_total = 0
with torch.no_grad():
for X_batch, y_batch in val_loader:
logits = model(X_batch)
loss = criterion(logits, y_batch)
val_loss += loss.item() * X_batch.size(0)
val_correct += (logits.argmax(dim=1) == y_batch).sum().item()
val_total += y_batch.size(0)
val_loss /= val_total
val_acc = val_correct / val_total
# Log metrics to W&B
wandb.log({
"epoch": epoch,
"train/loss": train_loss,
"train/accuracy": train_acc,
"val/loss": val_loss,
"val/accuracy": val_acc,
"learning_rate": optimizer.param_groups[0]["lr"],
})
# Save best model as artifact
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), "best_model.pt")
artifact = wandb.Artifact(
name="best-model",
type="model",
description=f"Best model at epoch {epoch}",
metadata={"val_loss": val_loss, "val_acc": val_acc},
)
artifact.add_file("best_model.pt")
run.log_artifact(artifact)
wandb.finish()
if __name__ == "__main__":
config = {
"n_features": 20,
"n_classes": 5,
"n_hidden": 128,
"batch_size": 32,
"learning_rate": 1e-3,
"epochs": 50,
}
train_with_wandb(config)
34.3.3 Hyperparameter Sweeps
W&B Sweeps automate hyperparameter search:
"""W&B Sweep configuration for hyperparameter optimization."""
sweep_config = {
"method": "bayes", # Bayesian optimization
"metric": {"name": "val/loss", "goal": "minimize"},
"parameters": {
"learning_rate": {
"distribution": "log_uniform_values",
"min": 1e-5,
"max": 1e-2,
},
"n_hidden": {"values": [64, 128, 256, 512]},
"batch_size": {"values": [16, 32, 64]},
"epochs": {"value": 50},
"n_features": {"value": 20},
"n_classes": {"value": 5},
},
}
# Launch sweep
# sweep_id = wandb.sweep(sweep_config, project="mlops-textbook")
# wandb.agent(sweep_id, function=train_with_wandb, count=20)
34.3.4 Comparing Experiment Tracking Tools
| Feature | W&B | MLflow | Neptune | CometML |
|---|---|---|---|---|
| Hosting | Cloud + Self-hosted | Self-hosted + Cloud | Cloud | Cloud |
| UI Quality | Excellent | Good | Excellent | Good |
| Artifact Tracking | Yes | Yes | Yes | Yes |
| Hyperparameter Sweeps | Built-in | Limited | Limited | Built-in |
| Team Collaboration | Excellent | Good | Good | Good |
| Pricing | Free tier + Paid | Open source | Paid | Free tier + Paid |
| LLM Support | W&B Prompts | MLflow AI Gateway | Yes | Yes |
34.4 Model Versioning
Model versioning goes beyond saving model weights. A truly versioned model includes:
- Model weights (the serialized parameters)
- Model code (the architecture definition)
- Training configuration (hyperparameters, optimizer settings)
- Data version (which data produced this model)
- Environment (dependencies, hardware)
- Evaluation results (metrics on standard benchmarks)
34.4.1 Model Registry
A model registry is a centralized store for managing model versions and their lifecycle stages:
"""Model registry implementation using a structured approach.
Demonstrates model versioning, staging, and lifecycle management.
"""
import json
import hashlib
from dataclasses import dataclass, field, asdict
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any
import torch
import torch.nn as nn
torch.manual_seed(42)
class ModelStage(Enum):
"""Model lifecycle stages."""
DEVELOPMENT = "development"
STAGING = "staging"
PRODUCTION = "production"
ARCHIVED = "archived"
@dataclass
class ModelVersion:
"""Represents a versioned model in the registry.
Attributes:
name: Model name.
version: Semantic version string.
stage: Current lifecycle stage.
metrics: Evaluation metrics dictionary.
parameters: Training hyperparameters.
data_version: Hash or tag of the training data.
created_at: Timestamp of creation.
description: Human-readable description.
tags: Arbitrary key-value tags.
"""
name: str
version: str
stage: ModelStage = ModelStage.DEVELOPMENT
metrics: dict[str, float] = field(default_factory=dict)
parameters: dict[str, Any] = field(default_factory=dict)
data_version: str = ""
created_at: str = field(
default_factory=lambda: datetime.now().isoformat()
)
description: str = ""
tags: dict[str, str] = field(default_factory=dict)
class ModelRegistry:
"""A simple file-based model registry.
In production, use MLflow Model Registry, W&B Model Registry,
or a cloud-native solution (SageMaker, Vertex AI).
Args:
registry_path: Path to the registry directory.
"""
def __init__(self, registry_path: str = "./model_registry") -> None:
self.registry_path = Path(registry_path)
self.registry_path.mkdir(parents=True, exist_ok=True)
self.manifest_path = self.registry_path / "manifest.json"
self.manifest: dict[str, list[dict]] = self._load_manifest()
def _load_manifest(self) -> dict[str, list[dict]]:
"""Load the registry manifest from disk."""
if self.manifest_path.exists():
with open(self.manifest_path, "r") as f:
return json.load(f)
return {}
def _save_manifest(self) -> None:
"""Persist the registry manifest to disk."""
with open(self.manifest_path, "w") as f:
json.dump(self.manifest, f, indent=2, default=str)
def register_model(
self,
model: nn.Module,
model_version: ModelVersion,
) -> str:
"""Register a model version in the registry.
Args:
model: The PyTorch model to register.
model_version: Metadata for this version.
Returns:
The path where the model was saved.
"""
model_dir = (
self.registry_path / model_version.name / model_version.version
)
model_dir.mkdir(parents=True, exist_ok=True)
# Save model weights
model_path = model_dir / "model.pt"
torch.save(model.state_dict(), model_path)
# Compute checksum
with open(model_path, "rb") as f:
checksum = hashlib.sha256(f.read()).hexdigest()
model_version.tags["checksum"] = checksum
# Save metadata
metadata = asdict(model_version)
metadata["stage"] = model_version.stage.value
with open(model_dir / "metadata.json", "w") as f:
json.dump(metadata, f, indent=2)
# Update manifest
if model_version.name not in self.manifest:
self.manifest[model_version.name] = []
self.manifest[model_version.name].append(metadata)
self._save_manifest()
print(
f"Registered {model_version.name} v{model_version.version} "
f"[{model_version.stage.value}]"
)
return str(model_path)
def promote_model(
self,
name: str,
version: str,
target_stage: ModelStage,
) -> None:
"""Promote a model to a new lifecycle stage.
Args:
name: Model name.
version: Version to promote.
target_stage: Target stage.
"""
if name not in self.manifest:
raise ValueError(f"Model '{name}' not found in registry.")
for entry in self.manifest[name]:
if entry["version"] == version:
old_stage = entry["stage"]
entry["stage"] = target_stage.value
self._save_manifest()
print(
f"Promoted {name} v{version}: "
f"{old_stage} -> {target_stage.value}"
)
return
raise ValueError(f"Version '{version}' not found for model '{name}'.")
def get_production_model(self, name: str) -> dict | None:
"""Get the current production model metadata.
Args:
name: Model name.
Returns:
Metadata dict or None if no production model exists.
"""
if name not in self.manifest:
return None
for entry in reversed(self.manifest[name]):
if entry["stage"] == ModelStage.PRODUCTION.value:
return entry
return None
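To make the registry concrete, here is a short usage sketch that continues the module above. The model, metric values, and data tag are illustrative placeholders, not outputs from a real training run.
if __name__ == "__main__":
    # Illustrative end-to-end flow: register a model, promote it, look it up.
    registry = ModelRegistry("./model_registry")
    model = nn.Linear(20, 5)  # placeholder model for demonstration
    version = ModelVersion(
        name="churn-classifier",
        version="1.0.0",
        metrics={"val_accuracy": 0.91, "val_f1": 0.88},  # illustrative numbers
        parameters={"learning_rate": 1e-3, "epochs": 50},
        data_version="dvc:v1.0",  # hypothetical data tag
        description="Baseline feedforward classifier.",
    )
    registry.register_model(model, version)
    # After validation, promote through staging to production.
    registry.promote_model("churn-classifier", "1.0.0", ModelStage.STAGING)
    registry.promote_model("churn-classifier", "1.0.0", ModelStage.PRODUCTION)
    prod = registry.get_production_model("churn-classifier")
    print(json.dumps(prod, indent=2))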
34.5 Data Versioning with DVC
Data is the foundation of every ML model, yet it is often the least well-managed artifact. DVC (Data Version Control) solves this by extending Git to handle large files and datasets.
34.5.1 Why Version Data
Data versioning answers critical questions:
- "Which dataset did we use to train model v2.3?"
- "What changed in the data between the last two training runs?"
- "Can we reproduce the training run from six months ago?"
34.5.2 DVC Basics
DVC works alongside Git. Git tracks code and small metadata files; DVC tracks large data files and models in remote storage (S3, GCS, Azure Blob, etc.).
# Initialize DVC in an existing Git repo
dvc init
# Track a large dataset
dvc add data/training_data.parquet
# This creates data/training_data.parquet.dvc (small metadata file)
# and adds data/training_data.parquet to .gitignore
# Commit the DVC metadata to Git
git add data/training_data.parquet.dvc data/.gitignore
git commit -m "Track training data with DVC"
# Push data to remote storage
dvc remote add -d myremote s3://my-bucket/dvc-storage
dvc push
34.5.3 DVC Pipelines
DVC pipelines define reproducible ML workflows:
# dvc.yaml - Defines the ML pipeline stages
stages:
prepare:
cmd: python src/prepare_data.py
deps:
- src/prepare_data.py
- data/raw/
outs:
- data/processed/
params:
- prepare.split_ratio
- prepare.seed
train:
cmd: python src/train.py
deps:
- src/train.py
- data/processed/
outs:
- models/model.pt
params:
- train.learning_rate
- train.batch_size
- train.epochs
metrics:
- metrics/train_metrics.json:
cache: false
evaluate:
cmd: python src/evaluate.py
deps:
- src/evaluate.py
- models/model.pt
- data/processed/test/
metrics:
- metrics/eval_metrics.json:
cache: false
plots:
- metrics/confusion_matrix.csv:
x: predicted
y: actual
# params.yaml - Centralized parameters
prepare:
split_ratio: 0.8
seed: 42
train:
learning_rate: 0.001
batch_size: 32
epochs: 50
34.5.4 Reproducing Experiments with DVC
# Run the full pipeline
dvc repro
# Compare metrics between Git branches
dvc metrics diff main
# Show pipeline DAG
dvc dag
# Switch to a different dataset version
git checkout v1.0 -- data/training_data.parquet.dvc
dvc checkout
34.6 CI/CD for ML Pipelines
Continuous Integration and Continuous Delivery (CI/CD) for ML extends traditional software CI/CD with ML-specific checks.
34.6.1 What to Test in ML Pipelines
┌──────────────────────────────────────────────────────────────┐
│                      ML CI/CD Pipeline                       │
│                                                              │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐      │
│  │  Code Tests  │   │  Data Tests  │   │ Model Tests  │      │
│  ├──────────────┤   ├──────────────┤   ├──────────────┤      │
│  │ Unit tests   │   │ Schema       │   │ Performance  │      │
│  │ Lint/type    │   │ validation   │   │ regression   │      │
│  │ Integration  │   │ Distribution │   │ Fairness     │      │
│  │ Smoke tests  │   │ Freshness    │   │ Latency      │      │
│  └──────────────┘   └──────────────┘   └──────────────┘      │
│                                                              │
│  ┌────────────────────────────────────────────────────────┐  │
│  │                  Infrastructure Tests                  │  │
│  │  Dockerfile builds   |   Serving endpoint health       │  │
│  │  Resource limits     |   Rollback procedures           │  │
│  └────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────┘
34.6.2 Data Validation
Data validation catches data quality issues before they corrupt your models:
"""Data validation framework for ML pipelines.
Implements schema validation, distribution checks, and freshness
monitoring for training data.
"""
from dataclasses import dataclass
from typing import Any
import torch
torch.manual_seed(42)
@dataclass
class ColumnSchema:
"""Schema definition for a single data column.
Attributes:
name: Column identifier.
dtype: Expected data type.
min_value: Minimum allowed value (numeric columns).
max_value: Maximum allowed value (numeric columns).
nullable: Whether null values are permitted.
allowed_values: Set of allowed categorical values.
"""
name: str
dtype: str
min_value: float | None = None
max_value: float | None = None
nullable: bool = False
allowed_values: set[str] | None = None
@dataclass
class ValidationResult:
"""Result of a data validation check.
Attributes:
passed: Whether the validation passed.
check_name: Name of the validation check.
details: Human-readable details.
"""
passed: bool
check_name: str
details: str
class DataValidator:
"""Validates data against a defined schema and statistical expectations.
Args:
schema: List of column schema definitions.
"""
def __init__(self, schema: list[ColumnSchema]) -> None:
self.schema = {col.name: col for col in schema}
def validate_schema(
self,
data: dict[str, torch.Tensor],
) -> list[ValidationResult]:
"""Validate data against the defined schema.
Args:
data: Dictionary mapping column names to tensor values.
Returns:
List of validation results.
"""
results: list[ValidationResult] = []
# Check for missing columns
for col_name in self.schema:
if col_name not in data:
results.append(ValidationResult(
passed=False,
check_name=f"column_exists:{col_name}",
details=f"Expected column '{col_name}' not found in data.",
))
else:
results.append(ValidationResult(
passed=True,
check_name=f"column_exists:{col_name}",
details=f"Column '{col_name}' present.",
))
# Check for unexpected columns
for col_name in data:
if col_name not in self.schema:
results.append(ValidationResult(
passed=False,
check_name=f"unexpected_column:{col_name}",
details=f"Unexpected column '{col_name}' found in data.",
))
# Validate value ranges
for col_name, col_schema in self.schema.items():
if col_name not in data:
continue
tensor = data[col_name]
if col_schema.min_value is not None:
min_val = tensor.min().item()
passed = min_val >= col_schema.min_value
results.append(ValidationResult(
passed=passed,
check_name=f"min_value:{col_name}",
                    details=(
                        f"Min value {min_val:.4f} >= {col_schema.min_value}"
                        if passed else
                        f"Min value {min_val:.4f} < {col_schema.min_value}"
                    ),
))
if col_schema.max_value is not None:
max_val = tensor.max().item()
passed = max_val <= col_schema.max_value
results.append(ValidationResult(
passed=passed,
check_name=f"max_value:{col_name}",
details=(
f"Max value {max_val:.4f} <= {col_schema.max_value}"
if passed else
f"Max value {max_val:.4f} > {col_schema.max_value}"
),
))
# Check for NaN values
if not col_schema.nullable:
has_nan = torch.isnan(tensor).any().item()
results.append(ValidationResult(
passed=not has_nan,
check_name=f"no_nan:{col_name}",
details=(
f"No NaN values in '{col_name}'."
if not has_nan else
f"NaN values detected in non-nullable column "
f"'{col_name}'."
),
))
return results
def validate_distribution(
self,
reference: dict[str, torch.Tensor],
current: dict[str, torch.Tensor],
threshold: float = 0.1,
) -> list[ValidationResult]:
"""Compare distributions between reference and current data.
Uses a simple mean/std comparison. In production, consider
using KL divergence, KS test, or PSI.
Args:
reference: Reference (training) data.
current: Current (serving) data.
threshold: Maximum allowed relative difference.
Returns:
List of validation results.
"""
results: list[ValidationResult] = []
for col_name in reference:
if col_name not in current:
continue
ref_mean = reference[col_name].float().mean().item()
cur_mean = current[col_name].float().mean().item()
ref_std = reference[col_name].float().std().item()
cur_std = current[col_name].float().std().item()
# Relative mean shift
if abs(ref_mean) > 1e-8:
mean_shift = abs(cur_mean - ref_mean) / abs(ref_mean)
else:
mean_shift = abs(cur_mean - ref_mean)
results.append(ValidationResult(
passed=mean_shift <= threshold,
check_name=f"mean_drift:{col_name}",
details=(
f"Mean shift: {mean_shift:.4f} "
f"(ref={ref_mean:.4f}, cur={cur_mean:.4f})"
),
))
# Relative std shift
if abs(ref_std) > 1e-8:
std_shift = abs(cur_std - ref_std) / abs(ref_std)
else:
std_shift = abs(cur_std - ref_std)
results.append(ValidationResult(
passed=std_shift <= threshold,
check_name=f"std_drift:{col_name}",
details=(
f"Std shift: {std_shift:.4f} "
f"(ref={ref_std:.4f}, cur={cur_std:.4f})"
),
))
return results
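A brief usage sketch shows how the validator above might gate a pipeline step. The schema, column names, and synthetic tensors are illustrative assumptions.
if __name__ == "__main__":
    # Illustrative check: validate a synthetic feature batch against a schema.
    schema = [
        ColumnSchema(name="age", dtype="float32", min_value=0.0, max_value=120.0),
        ColumnSchema(name="income", dtype="float32", min_value=0.0),
    ]
    validator = DataValidator(schema)
    batch = {
        "age": torch.rand(256) * 90 + 18,     # plausible ages
        "income": torch.rand(256) * 150_000,  # plausible incomes
    }
    results = validator.validate_schema(batch)
    failures = [r for r in results if not r.passed]
    for r in failures:
        print(f"[FAIL] {r.check_name}: {r.details}")
    print(f"{len(results) - len(failures)}/{len(results)} checks passed.")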
34.6.3 GitHub Actions for ML
Here is a production-grade GitHub Actions workflow for an ML pipeline:
# .github/workflows/ml-pipeline.yml
name: ML Pipeline CI/CD
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
jobs:
code-quality:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install dependencies
run: pip install ruff mypy pytest
- name: Lint
run: ruff check src/
- name: Type check
run: mypy src/ --ignore-missing-imports
- name: Unit tests
run: pytest tests/unit/ -v
data-validation:
runs-on: ubuntu-latest
needs: code-quality
steps:
- uses: actions/checkout@v4
- name: Install DVC
run: pip install dvc[s3]
- name: Pull data
run: dvc pull
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Validate data
run: python src/validate_data.py
train-and-evaluate:
runs-on: ubuntu-latest
needs: data-validation
steps:
- uses: actions/checkout@v4
- name: Pull data
run: |
pip install dvc[s3]
dvc pull
- name: Train model
run: python src/train.py --config configs/ci.yaml
- name: Evaluate model
run: python src/evaluate.py
- name: Check performance regression
run: python src/check_regression.py --threshold 0.95
- name: Upload model artifact
uses: actions/upload-artifact@v4
with:
name: model
path: models/
deploy:
runs-on: ubuntu-latest
needs: train-and-evaluate
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v4
- name: Deploy to staging
run: ./scripts/deploy.sh staging
- name: Run integration tests
run: pytest tests/integration/ -v
- name: Deploy to production
run: ./scripts/deploy.sh production
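The workflow above invokes src/check_regression.py with a --threshold flag, but the chapter does not list that script. A minimal sketch of such a gate appears below; the metric file paths and the "accuracy" key are assumptions, and a real gate should compare whichever metrics matter for your model.
"""Hypothetical performance-regression gate invoked by the CI workflow above.

Fails the build (non-zero exit) if the candidate model's accuracy falls
below `threshold` times the stored baseline. File paths and the metric
key are assumptions for illustration.
"""
import argparse
import json
import sys
from pathlib import Path


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--threshold", type=float, default=0.95)
    parser.add_argument("--candidate", default="metrics/eval_metrics.json")
    parser.add_argument("--baseline", default="metrics/baseline_metrics.json")
    args = parser.parse_args()

    candidate = json.loads(Path(args.candidate).read_text())
    baseline = json.loads(Path(args.baseline).read_text())

    cand_acc = candidate["accuracy"]
    base_acc = baseline["accuracy"]
    if cand_acc < args.threshold * base_acc:
        print(f"REGRESSION: {cand_acc:.4f} < {args.threshold} * {base_acc:.4f}")
        sys.exit(1)  # non-zero exit fails the CI job
    print(f"OK: {cand_acc:.4f} >= {args.threshold} * {base_acc:.4f}")


if __name__ == "__main__":
    main()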
34.7 Model Monitoring and Observability
Once a model is in production, monitoring becomes critical. Unlike traditional software, ML models can fail silently---they continue returning predictions, but the predictions gradually become wrong.
34.7.1 What to Monitor
Model Performance Metrics:
- Prediction accuracy (when ground truth is available)
- Prediction confidence distributions
- Prediction latency (p50, p95, p99)
- Throughput (requests per second)
- Error rates
Data Quality Metrics:
- Input feature distributions
- Missing value rates
- Feature correlations
- Input volume
System Metrics:
- CPU/GPU utilization
- Memory usage
- Disk I/O
- Network latency
34.7.2 Building a Monitoring System
"""Model monitoring system for production ML.
Tracks predictions, detects drift, and generates alerts.
"""
import time
import json
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Callable
import torch
torch.manual_seed(42)
@dataclass
class PredictionLog:
"""A single prediction record.
Attributes:
timestamp: Unix timestamp of the prediction.
input_features: Input feature values.
prediction: Model output.
confidence: Prediction confidence score.
latency_ms: Inference latency in milliseconds.
model_version: Version of the serving model.
"""
timestamp: float
input_features: dict[str, float]
prediction: Any
confidence: float
latency_ms: float
model_version: str
class ModelMonitor:
"""Monitors model performance and data quality in production.
Args:
model_name: Name of the model being monitored.
window_size: Number of recent predictions to track.
alert_threshold: Confidence below which to alert.
"""
def __init__(
self,
model_name: str,
window_size: int = 1000,
alert_threshold: float = 0.5,
) -> None:
self.model_name = model_name
self.window_size = window_size
self.alert_threshold = alert_threshold
# Rolling window of predictions
self.predictions: deque[PredictionLog] = deque(maxlen=window_size)
# Reference statistics (from training data)
self.reference_stats: dict[str, dict[str, float]] = {}
# Alert callbacks
        self.alert_handlers: list[Callable[[dict[str, Any]], None]] = []
def set_reference_stats(
self,
feature_stats: dict[str, dict[str, float]],
) -> None:
"""Set reference statistics from training data.
Args:
feature_stats: Dict mapping feature names to
{"mean": float, "std": float, "min": float, "max": float}.
"""
self.reference_stats = feature_stats
def log_prediction(self, log: PredictionLog) -> None:
"""Log a prediction and check for anomalies.
Args:
log: The prediction record to log.
"""
self.predictions.append(log)
# Check for low confidence
if log.confidence < self.alert_threshold:
self._alert(
level="WARNING",
message=(
f"Low confidence prediction: {log.confidence:.4f} "
f"(threshold: {self.alert_threshold})"
),
)
# Check latency
if log.latency_ms > 1000:
self._alert(
level="WARNING",
message=f"High latency: {log.latency_ms:.1f}ms",
)
def compute_drift_score(
self,
feature_name: str,
) -> float | None:
"""Compute drift score for a specific feature.
Uses Population Stability Index (PSI) approximation.
Args:
feature_name: Name of the feature to check.
Returns:
PSI score, or None if insufficient data.
"""
if feature_name not in self.reference_stats:
return None
if len(self.predictions) < 100:
return None
# Gather current values
current_values = torch.tensor([
p.input_features.get(feature_name, 0.0)
for p in self.predictions
if feature_name in p.input_features
])
if len(current_values) == 0:
return None
ref = self.reference_stats[feature_name]
cur_mean = current_values.mean().item()
cur_std = current_values.std().item()
# Simplified PSI using normal distribution assumption
if ref["std"] < 1e-8 or cur_std < 1e-8:
return float("inf") if abs(cur_mean - ref["mean"]) > 0.01 else 0.0
psi = (
(cur_mean - ref["mean"]) ** 2 / (2 * ref["std"] ** 2)
+ 0.5 * ((cur_std / ref["std"]) ** 2
+ (ref["std"] / cur_std) ** 2 - 2)
)
return psi
def get_summary(self) -> dict[str, Any]:
"""Get a summary of current monitoring state.
Returns:
Dictionary with monitoring metrics.
"""
if not self.predictions:
return {"status": "no_data"}
confidences = torch.tensor([p.confidence for p in self.predictions])
latencies = torch.tensor([p.latency_ms for p in self.predictions])
summary = {
"model_name": self.model_name,
"total_predictions": len(self.predictions),
"confidence": {
"mean": confidences.mean().item(),
"std": confidences.std().item(),
"p5": confidences.quantile(0.05).item(),
"p50": confidences.quantile(0.50).item(),
"p95": confidences.quantile(0.95).item(),
},
"latency_ms": {
"mean": latencies.mean().item(),
"p50": latencies.quantile(0.50).item(),
"p95": latencies.quantile(0.95).item(),
"p99": latencies.quantile(0.99).item(),
},
"low_confidence_rate": (
(confidences < self.alert_threshold).float().mean().item()
),
}
# Compute drift for each tracked feature
drift_scores = {}
for feature_name in self.reference_stats:
score = self.compute_drift_score(feature_name)
if score is not None:
drift_scores[feature_name] = score
summary["drift_scores"] = drift_scores
return summary
def _alert(self, level: str, message: str) -> None:
"""Trigger an alert.
Args:
level: Alert severity (INFO, WARNING, CRITICAL).
message: Alert message.
"""
alert_data = {
"level": level,
"model": self.model_name,
"message": message,
"timestamp": time.time(),
}
for handler in self.alert_handlers:
handler(alert_data)
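The sketch below exercises the monitor with synthetic traffic so its behavior is visible end to end; the feature statistics, confidence values, and print-based alert handler are illustrative.
if __name__ == "__main__":
    # Illustrative monitoring loop over synthetic traffic.
    monitor = ModelMonitor(model_name="churn-classifier", window_size=500)
    monitor.set_reference_stats({
        "age": {"mean": 45.0, "std": 12.0, "min": 18.0, "max": 95.0},
    })
    monitor.alert_handlers.append(lambda alert: print(f"ALERT: {alert['message']}"))
    for _ in range(600):
        monitor.log_prediction(PredictionLog(
            timestamp=time.time(),
            input_features={"age": 45.0 + 12.0 * torch.randn(1).item()},
            prediction="churn",
            # Uniform confidences: some fall below the alert threshold,
            # which exercises the alert handler above.
            confidence=float(torch.rand(1).item()),
            latency_ms=float(20 + 30 * torch.rand(1).item()),
            model_version="1.0.0",
        ))
    print(json.dumps(monitor.get_summary(), indent=2))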
34.8 Drift Detection
Drift is the phenomenon where the statistical properties of data change over time, causing model performance to degrade. Understanding and detecting drift is one of the most important MLOps capabilities.
34.8.1 Types of Drift
Data Drift (Covariate Shift): The distribution of input features changes. For example, a credit scoring model trained on pre-pandemic data sees dramatically different income distributions post-pandemic.
$$P_{\text{train}}(X) \neq P_{\text{prod}}(X)$$
Concept Drift: The relationship between inputs and outputs changes. The features look the same, but the correct predictions are different.
$$P_{\text{train}}(Y|X) \neq P_{\text{prod}}(Y|X)$$
Prediction Drift: The distribution of model outputs changes, even if the input distribution seems stable.
$$P_{\text{train}}(\hat{Y}) \neq P_{\text{prod}}(\hat{Y})$$
34.8.2 Drift Detection Methods
Population Stability Index (PSI):
$$\text{PSI} = \sum_{i=1}^{k} (p_i - q_i) \ln \frac{p_i}{q_i}$$
where $p_i$ is the proportion in bucket $i$ for the reference distribution and $q_i$ is the proportion for the current distribution.
| PSI Value | Interpretation |
|---|---|
| < 0.1 | No significant drift |
| 0.1 -- 0.2 | Moderate drift, investigate |
| > 0.2 | Significant drift, action needed |
Kolmogorov-Smirnov (KS) Test:
$$D = \sup_x |F_{\text{ref}}(x) - F_{\text{cur}}(x)|$$
The KS test measures the maximum distance between two cumulative distribution functions.
"""Drift detection implementations.
Provides PSI, KS test, and windowed drift detection.
"""
import torch
from scipy import stats
torch.manual_seed(42)
def compute_psi(
reference: torch.Tensor,
current: torch.Tensor,
n_bins: int = 10,
eps: float = 1e-4,
) -> float:
"""Compute Population Stability Index between two distributions.
Args:
reference: Reference distribution values.
current: Current distribution values.
n_bins: Number of histogram bins.
eps: Small constant to avoid division by zero.
Returns:
PSI value (0 = identical distributions).
"""
# Create bins from reference distribution
min_val = min(reference.min().item(), current.min().item())
max_val = max(reference.max().item(), current.max().item())
bins = torch.linspace(min_val, max_val, n_bins + 1)
# Compute histograms
ref_hist = torch.histogram(reference.float(), bins=bins).hist
cur_hist = torch.histogram(current.float(), bins=bins).hist
# Normalize to proportions
ref_proportions = ref_hist / ref_hist.sum() + eps
cur_proportions = cur_hist / cur_hist.sum() + eps
# Compute PSI
psi = ((cur_proportions - ref_proportions)
* torch.log(cur_proportions / ref_proportions)).sum()
return psi.item()
def ks_drift_test(
reference: torch.Tensor,
current: torch.Tensor,
significance_level: float = 0.05,
) -> dict[str, float | bool]:
"""Perform Kolmogorov-Smirnov test for drift detection.
Args:
reference: Reference distribution values.
current: Current distribution values.
significance_level: P-value threshold for drift.
Returns:
Dictionary with statistic, p_value, and drift_detected.
"""
statistic, p_value = stats.ks_2samp(
reference.numpy(),
current.numpy(),
)
return {
"statistic": float(statistic),
"p_value": float(p_value),
"drift_detected": p_value < significance_level,
}
class WindowedDriftDetector:
"""Detects drift using sliding windows over streaming data.
Maintains a reference window and compares incoming data
against it periodically.
Args:
reference_data: Reference distribution (e.g., training data).
window_size: Size of the comparison window.
psi_threshold: PSI threshold for drift alert.
"""
def __init__(
self,
reference_data: torch.Tensor,
window_size: int = 500,
psi_threshold: float = 0.2,
) -> None:
self.reference_data = reference_data
self.window_size = window_size
self.psi_threshold = psi_threshold
self.current_window: list[float] = []
self.drift_history: list[dict] = []
def add_observation(self, value: float) -> dict | None:
"""Add a new observation and check for drift.
Args:
value: New observed value.
Returns:
Drift report if window is full, None otherwise.
"""
self.current_window.append(value)
if len(self.current_window) >= self.window_size:
current_tensor = torch.tensor(self.current_window)
psi = compute_psi(self.reference_data, current_tensor)
ks_result = ks_drift_test(self.reference_data, current_tensor)
report = {
"window_size": len(self.current_window),
"psi": psi,
"ks_statistic": ks_result["statistic"],
"ks_p_value": ks_result["p_value"],
"drift_detected": (
psi > self.psi_threshold
or ks_result["drift_detected"]
),
}
self.drift_history.append(report)
self.current_window = [] # Reset window
return report
return None
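To see the detector respond to a shifting stream, the following sketch feeds it synthetically drifting values; the magnitudes are arbitrary and chosen only to trip the thresholds.
if __name__ == "__main__":
    # Illustrative stream: values drift upward relative to the reference,
    # so the detector should eventually flag drift.
    reference = torch.randn(5_000)
    detector = WindowedDriftDetector(reference, window_size=500)
    for step in range(3_000):
        value = torch.randn(1).item() + step * 0.001  # gradual mean shift
        report = detector.add_observation(value)
        if report is not None and report["drift_detected"]:
            print(
                f"Drift at step {step}: PSI={report['psi']:.3f}, "
                f"KS p-value={report['ks_p_value']:.4f}"
            )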
34.9 A/B Testing for Models
A/B testing is the gold standard for evaluating whether a new model actually improves business outcomes in production.
34.9.1 Why A/B Test Models
Offline metrics (accuracy, F1, BLEU) do not always correlate with business outcomes. A model that is 2% more accurate might generate 10% more revenue---or it might not. A/B testing measures the actual impact.
34.9.2 Traffic Splitting Strategies
"""A/B testing framework for ML models.
Implements traffic splitting, metric collection, and
statistical significance testing.
"""
import hashlib
import time
from dataclasses import dataclass, field
from typing import Any
import torch
from scipy import stats as scipy_stats
torch.manual_seed(42)
@dataclass
class ExperimentConfig:
"""Configuration for an A/B test experiment.
Attributes:
name: Experiment identifier.
control_model: Control model identifier.
treatment_model: Treatment model identifier.
traffic_split: Fraction of traffic to treatment (0-1).
min_samples: Minimum samples before significance test.
significance_level: P-value threshold.
"""
name: str
control_model: str
treatment_model: str
traffic_split: float = 0.5
min_samples: int = 1000
significance_level: float = 0.05
@dataclass
class ExperimentResult:
"""Results of an A/B test experiment.
Attributes:
control_metric: Mean metric for control group.
treatment_metric: Mean metric for treatment group.
relative_improvement: Relative improvement (%).
p_value: Statistical significance.
is_significant: Whether result is statistically significant.
control_n: Number of control observations.
treatment_n: Number of treatment observations.
"""
control_metric: float
treatment_metric: float
relative_improvement: float
p_value: float
is_significant: bool
control_n: int
treatment_n: int
class ABTestManager:
"""Manages A/B test experiments for ML models.
Args:
config: Experiment configuration.
"""
def __init__(self, config: ExperimentConfig) -> None:
self.config = config
self.control_outcomes: list[float] = []
self.treatment_outcomes: list[float] = []
def assign_variant(self, user_id: str) -> str:
"""Deterministically assign a user to control or treatment.
Uses consistent hashing so the same user always gets
the same variant, even across sessions.
Args:
user_id: Unique user identifier.
Returns:
'control' or 'treatment'.
"""
hash_value = hashlib.md5(
f"{self.config.name}:{user_id}".encode()
).hexdigest()
hash_fraction = int(hash_value[:8], 16) / 0xFFFFFFFF
if hash_fraction < self.config.traffic_split:
return "treatment"
return "control"
def record_outcome(self, variant: str, metric_value: float) -> None:
"""Record an outcome for a variant.
Args:
variant: 'control' or 'treatment'.
metric_value: The observed metric value.
"""
if variant == "control":
self.control_outcomes.append(metric_value)
elif variant == "treatment":
self.treatment_outcomes.append(metric_value)
else:
raise ValueError(f"Unknown variant: {variant}")
def analyze(self) -> ExperimentResult | None:
"""Analyze the experiment results.
Returns:
ExperimentResult if enough data, None otherwise.
"""
if (len(self.control_outcomes) < self.config.min_samples
or len(self.treatment_outcomes) < self.config.min_samples):
return None
control_tensor = torch.tensor(self.control_outcomes)
treatment_tensor = torch.tensor(self.treatment_outcomes)
control_mean = control_tensor.mean().item()
treatment_mean = treatment_tensor.mean().item()
if abs(control_mean) > 1e-8:
relative_improvement = (
(treatment_mean - control_mean) / abs(control_mean) * 100
)
else:
relative_improvement = 0.0
# Welch's t-test
t_stat, p_value = scipy_stats.ttest_ind(
self.control_outcomes,
self.treatment_outcomes,
equal_var=False,
)
return ExperimentResult(
control_metric=control_mean,
treatment_metric=treatment_mean,
relative_improvement=relative_improvement,
p_value=float(p_value),
is_significant=p_value < self.config.significance_level,
control_n=len(self.control_outcomes),
treatment_n=len(self.treatment_outcomes),
)
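A usage sketch of the manager follows, with a simulated conversion-rate lift for the treatment arm; the rates and model names are invented for illustration.
if __name__ == "__main__":
    # Illustrative experiment with a simulated lift for the treatment arm.
    config = ExperimentConfig(
        name="ranker-v2-rollout",
        control_model="ranker-v1",
        treatment_model="ranker-v2",
    )
    ab_test = ABTestManager(config)
    for i in range(10_000):
        variant = ab_test.assign_variant(user_id=f"user-{i}")
        conversion_rate = 0.10 if variant == "control" else 0.12  # invented rates
        outcome = 1.0 if torch.rand(1).item() < conversion_rate else 0.0
        ab_test.record_outcome(variant, outcome)
    result = ab_test.analyze()
    if result is not None:
        print(
            f"Lift: {result.relative_improvement:+.1f}% "
            f"(p={result.p_value:.4f}, significant={result.is_significant})"
        )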
34.9.3 Multi-Armed Bandits
When you want to minimize regret during experimentation, multi-armed bandits adaptively allocate traffic:
class ThompsonSamplingBandit:
"""Thompson Sampling for adaptive model selection.
Uses Beta distributions to model the probability that each
model is the best, and samples accordingly.
Args:
model_names: List of model identifiers.
"""
def __init__(self, model_names: list[str]) -> None:
self.models = model_names
# Beta distribution parameters (successes, failures)
self.alpha = {name: 1.0 for name in model_names}
self.beta_ = {name: 1.0 for name in model_names}
def select_model(self) -> str:
"""Select a model using Thompson Sampling.
Returns:
Name of the selected model.
"""
samples = {}
for name in self.models:
# Sample from Beta posterior
sample = torch.distributions.Beta(
torch.tensor(self.alpha[name]),
torch.tensor(self.beta_[name]),
).sample()
samples[name] = sample.item()
return max(samples, key=samples.get)
def update(self, model_name: str, reward: float) -> None:
"""Update beliefs after observing a reward.
Args:
model_name: Which model was used.
reward: Observed reward (0 or 1 for binary).
"""
self.alpha[model_name] += reward
self.beta_[model_name] += 1.0 - reward
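The sketch below runs the bandit against two simulated models with invented success rates; over enough pulls, traffic should concentrate on the better one.
if __name__ == "__main__":
    # Illustrative bandit loop with invented success rates.
    true_rates = {"model-a": 0.10, "model-b": 0.14}  # unknown to the bandit
    bandit = ThompsonSamplingBandit(list(true_rates))
    pulls = {name: 0 for name in true_rates}
    for _ in range(5_000):
        chosen = bandit.select_model()
        pulls[chosen] += 1
        reward = 1.0 if torch.rand(1).item() < true_rates[chosen] else 0.0
        bandit.update(chosen, reward)
    print(pulls)  # traffic should concentrate on model-b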
34.10 LLMOps: Operating Large Language Models
LLMOps extends MLOps with practices specific to large language models. While the foundational principles are the same, LLMs introduce unique operational challenges.
34.10.1 How LLMOps Differs from Traditional MLOps
| Aspect | Traditional MLOps | LLMOps |
|---|---|---|
| Training | Train from scratch or fine-tune | Often use pre-trained API models |
| Inputs | Structured features | Unstructured text prompts |
| Outputs | Classes/numbers | Free-form text |
| Versioning | Model weights | Prompts + configurations |
| Evaluation | Clear metrics | Subjective quality, safety |
| Cost | Compute per training run | Cost per token, ongoing |
| Failure modes | Wrong prediction | Hallucination, toxicity, jailbreak |
| Latency | Low (ms) | High (seconds) |
34.10.2 Prompt Versioning and Management
Prompts are the "code" of LLM applications. They must be versioned, tested, and managed with the same rigor as software code.
"""Prompt versioning and management system.
Provides structured prompt templates with version control,
A/B testing, and performance tracking.
"""
import json
import hashlib
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
import torch
torch.manual_seed(42)
@dataclass
class PromptTemplate:
"""A versioned prompt template.
Attributes:
name: Template identifier.
version: Semantic version.
system_prompt: System/instruction prompt.
user_template: User message template with {placeholders}.
model: Target model identifier.
temperature: Sampling temperature.
max_tokens: Maximum output tokens.
metadata: Additional configuration.
"""
name: str
version: str
system_prompt: str
user_template: str
model: str = "gpt-4"
temperature: float = 0.7
max_tokens: int = 1024
metadata: dict[str, Any] = field(default_factory=dict)
@property
def content_hash(self) -> str:
"""Compute a content hash for change detection."""
content = f"{self.system_prompt}:{self.user_template}"
return hashlib.sha256(content.encode()).hexdigest()[:12]
def render(self, **kwargs: Any) -> dict[str, str]:
"""Render the template with provided variables.
Args:
**kwargs: Template variable values.
Returns:
Dictionary with 'system' and 'user' messages.
Raises:
KeyError: If a required template variable is missing.
"""
return {
"system": self.system_prompt,
"user": self.user_template.format(**kwargs),
}
class PromptRegistry:
"""Registry for managing prompt templates.
Args:
storage_path: Path to store prompt versions.
"""
def __init__(self, storage_path: str = "./prompt_registry") -> None:
self.storage_path = Path(storage_path)
self.storage_path.mkdir(parents=True, exist_ok=True)
self.prompts: dict[str, dict[str, PromptTemplate]] = {}
def register(self, template: PromptTemplate) -> None:
"""Register a new prompt template version.
Args:
template: The prompt template to register.
"""
if template.name not in self.prompts:
self.prompts[template.name] = {}
self.prompts[template.name][template.version] = template
# Persist to disk
prompt_dir = self.storage_path / template.name
prompt_dir.mkdir(exist_ok=True)
metadata = {
"name": template.name,
"version": template.version,
"system_prompt": template.system_prompt,
"user_template": template.user_template,
"model": template.model,
"temperature": template.temperature,
"max_tokens": template.max_tokens,
"content_hash": template.content_hash,
"registered_at": datetime.now().isoformat(),
"metadata": template.metadata,
}
with open(prompt_dir / f"v{template.version}.json", "w") as f:
json.dump(metadata, f, indent=2)
def get(
self,
name: str,
version: str | None = None,
) -> PromptTemplate | None:
"""Retrieve a prompt template.
Args:
name: Template name.
version: Specific version, or None for latest.
Returns:
The prompt template, or None if not found.
"""
if name not in self.prompts:
return None
if version is not None:
return self.prompts[name].get(version)
# Return latest version
versions = sorted(self.prompts[name].keys())
return self.prompts[name][versions[-1]] if versions else None
def list_versions(self, name: str) -> list[str]:
"""List all versions of a prompt template.
Args:
name: Template name.
Returns:
Sorted list of version strings.
"""
if name not in self.prompts:
return []
return sorted(self.prompts[name].keys())
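A short usage sketch of the registry; the prompt name, wording, and settings are illustrative.
if __name__ == "__main__":
    # Illustrative registration and retrieval of a versioned prompt.
    registry = PromptRegistry("./prompt_registry")
    registry.register(PromptTemplate(
        name="support-summarizer",
        version="1.0.0",
        system_prompt="You are a concise customer-support summarizer.",
        user_template="Summarize this ticket in three bullet points:\n{ticket_text}",
        temperature=0.3,
    ))
    template = registry.get("support-summarizer")  # latest version
    if template is not None:
        messages = template.render(ticket_text="My invoice total looks wrong.")
        print(messages["user"])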
34.10.3 LLM Evaluation Pipelines
Evaluating LLMs is fundamentally different from evaluating traditional ML models. Outputs are free-form text, and quality is often subjective. A comprehensive evaluation pipeline combines automated metrics, model-based judges, and human evaluation.
"""LLM evaluation pipeline.
Implements automated evaluation for LLM outputs using
multiple strategies: exact match, semantic similarity,
and LLM-as-judge.
"""
from dataclasses import dataclass
from typing import Any
import torch
import torch.nn.functional as F
torch.manual_seed(42)
@dataclass
class EvalExample:
"""A single evaluation example.
Attributes:
input_text: The input prompt.
expected_output: Expected/reference output.
actual_output: Model-generated output.
metadata: Additional context.
"""
input_text: str
expected_output: str
actual_output: str
    metadata: dict[str, Any] | None = None
@dataclass
class EvalResult:
"""Evaluation result for a single example.
Attributes:
scores: Dictionary of metric scores.
feedback: Human-readable feedback.
passed: Whether the example passed all criteria.
"""
scores: dict[str, float]
feedback: str
passed: bool
class LLMEvaluator:
"""Evaluates LLM outputs using multiple strategies.
Args:
passing_threshold: Minimum average score to pass.
"""
def __init__(self, passing_threshold: float = 0.7) -> None:
self.passing_threshold = passing_threshold
self.results: list[EvalResult] = []
def evaluate_exact_match(self, example: EvalExample) -> float:
"""Check for exact string match.
Args:
example: The evaluation example.
Returns:
1.0 if exact match, 0.0 otherwise.
"""
return 1.0 if example.actual_output.strip() == example.expected_output.strip() else 0.0
def evaluate_contains(
self,
example: EvalExample,
required_phrases: list[str],
) -> float:
"""Check if output contains required phrases.
Args:
example: The evaluation example.
required_phrases: Phrases that must appear in output.
Returns:
Fraction of required phrases found.
"""
if not required_phrases:
return 1.0
found = sum(
1 for phrase in required_phrases
if phrase.lower() in example.actual_output.lower()
)
return found / len(required_phrases)
def evaluate_length(
self,
example: EvalExample,
min_words: int = 10,
max_words: int = 500,
) -> float:
"""Check if output length is within bounds.
Args:
example: The evaluation example.
min_words: Minimum word count.
max_words: Maximum word count.
Returns:
1.0 if within bounds, penalty otherwise.
"""
word_count = len(example.actual_output.split())
if min_words <= word_count <= max_words:
return 1.0
elif word_count < min_words:
return word_count / min_words
else:
return max_words / word_count
def evaluate_no_hallucination_keywords(
self,
example: EvalExample,
forbidden_phrases: list[str],
) -> float:
"""Check that output does not contain forbidden phrases.
Args:
example: The evaluation example.
forbidden_phrases: Phrases that must NOT appear.
Returns:
1.0 if no forbidden phrases found, penalized otherwise.
"""
if not forbidden_phrases:
return 1.0
violations = sum(
1 for phrase in forbidden_phrases
if phrase.lower() in example.actual_output.lower()
)
return max(0.0, 1.0 - violations / len(forbidden_phrases))
def run_evaluation(
self,
examples: list[EvalExample],
required_phrases_map: dict[int, list[str]] | None = None,
forbidden_phrases_map: dict[int, list[str]] | None = None,
) -> dict[str, Any]:
"""Run full evaluation pipeline on a set of examples.
Args:
examples: List of evaluation examples.
required_phrases_map: Map from example index to required phrases.
forbidden_phrases_map: Map from example index to forbidden phrases.
Returns:
Summary of evaluation results.
"""
self.results = []
for i, example in enumerate(examples):
scores = {}
scores["exact_match"] = self.evaluate_exact_match(example)
scores["length"] = self.evaluate_length(example)
if required_phrases_map and i in required_phrases_map:
scores["contains"] = self.evaluate_contains(
example, required_phrases_map[i]
)
if forbidden_phrases_map and i in forbidden_phrases_map:
scores["no_hallucination"] = (
self.evaluate_no_hallucination_keywords(
example, forbidden_phrases_map[i]
)
)
avg_score = sum(scores.values()) / len(scores)
passed = avg_score >= self.passing_threshold
result = EvalResult(
scores=scores,
feedback=self._generate_feedback(scores),
passed=passed,
)
self.results.append(result)
# Summary
pass_rate = sum(1 for r in self.results if r.passed) / len(self.results)
avg_scores = {}
for key in self.results[0].scores:
values = [r.scores.get(key, 0.0) for r in self.results]
avg_scores[key] = sum(values) / len(values)
return {
"pass_rate": pass_rate,
"total_examples": len(examples),
"passed": sum(1 for r in self.results if r.passed),
"failed": sum(1 for r in self.results if not r.passed),
"average_scores": avg_scores,
}
def _generate_feedback(self, scores: dict[str, float]) -> str:
"""Generate human-readable feedback from scores.
Args:
scores: Dictionary of metric scores.
Returns:
Feedback string.
"""
feedback_parts = []
for metric, score in scores.items():
if score < 0.5:
feedback_parts.append(f"FAIL {metric}: {score:.2f}")
elif score < self.passing_threshold:
feedback_parts.append(f"WARN {metric}: {score:.2f}")
else:
feedback_parts.append(f"PASS {metric}: {score:.2f}")
return " | ".join(feedback_parts)
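A small usage sketch with two hand-written examples; the inputs, expected outputs, and required phrases are illustrative.
if __name__ == "__main__":
    # Illustrative evaluation over two hand-written examples.
    examples = [
        EvalExample(
            input_text="What is the capital of France?",
            expected_output="The capital of France is Paris.",
            actual_output="Paris is the capital of France, on the Seine.",
        ),
        EvalExample(
            input_text="Summarize the refund policy.",
            expected_output="Refunds are available within 30 days of purchase.",
            actual_output="Refunds are available within 30 days of purchase.",
        ),
    ]
    evaluator = LLMEvaluator(passing_threshold=0.7)
    summary = evaluator.run_evaluation(
        examples,
        required_phrases_map={0: ["Paris"], 1: ["30 days"]},
    )
    print(summary)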
34.10.4 Guardrails for LLM Applications
Guardrails are runtime safety checks that prevent LLMs from generating harmful, off-topic, or policy-violating content.
"""Guardrails framework for LLM applications.
Provides input validation, output filtering, and safety checks
for production LLM deployments.
"""
import re
from dataclasses import dataclass
from enum import Enum
from typing import Any
import torch
torch.manual_seed(42)
class GuardrailAction(Enum):
"""Action to take when a guardrail is triggered."""
ALLOW = "allow"
BLOCK = "block"
WARN = "warn"
MODIFY = "modify"
@dataclass
class GuardrailResult:
"""Result of a guardrail check.
Attributes:
action: The recommended action.
rule_name: Which rule was triggered.
details: Human-readable explanation.
modified_text: Modified text (if action is MODIFY).
"""
action: GuardrailAction
rule_name: str
details: str
modified_text: str | None = None
class LLMGuardrails:
"""Guardrails for LLM input and output.
Checks for PII, prompt injection, toxicity indicators,
and topic restrictions.
"""
def __init__(self) -> None:
# PII patterns
        self.pii_patterns = {
            "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
            "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
            "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
            "credit_card": r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
        }
# Prompt injection indicators
self.injection_patterns = [
r"ignore\s+(previous|all|above)\s+instructions",
r"you\s+are\s+now\s+(?:a|an|in)\s+",
r"disregard\s+(?:your|all|any)\s+",
r"system\s*prompt",
r"jailbreak",
]
# Topic restrictions (customize per application)
self.blocked_topics: list[str] = []
def check_input(self, text: str) -> list[GuardrailResult]:
"""Check user input against guardrails.
Args:
text: User input text.
Returns:
List of guardrail results.
"""
results: list[GuardrailResult] = []
# Check for prompt injection
for pattern in self.injection_patterns:
if re.search(pattern, text, re.IGNORECASE):
results.append(GuardrailResult(
action=GuardrailAction.BLOCK,
rule_name="prompt_injection",
details=f"Potential prompt injection detected: {pattern}",
))
# Check for PII in input
for pii_type, pattern in self.pii_patterns.items():
if re.search(pattern, text):
results.append(GuardrailResult(
action=GuardrailAction.WARN,
rule_name=f"input_pii:{pii_type}",
details=f"PII detected in input: {pii_type}",
))
# Check input length
if len(text) > 10000:
results.append(GuardrailResult(
action=GuardrailAction.BLOCK,
rule_name="input_length",
details=f"Input too long: {len(text)} characters (max 10000)",
))
if not results:
results.append(GuardrailResult(
action=GuardrailAction.ALLOW,
rule_name="input_check",
details="All input checks passed.",
))
return results
def check_output(self, text: str) -> list[GuardrailResult]:
"""Check LLM output against guardrails.
Args:
text: LLM-generated output text.
Returns:
List of guardrail results.
"""
results: list[GuardrailResult] = []
        # Check for PII leakage in output, redacting cumulatively so that
        # multiple PII types are all removed from the final text.
        redacted = text
        for pii_type, pattern in self.pii_patterns.items():
            matches = re.findall(pattern, redacted)
            if matches:
                redacted = re.sub(
                    pattern, f"[REDACTED_{pii_type.upper()}]", redacted
                )
                results.append(GuardrailResult(
                    action=GuardrailAction.MODIFY,
                    rule_name=f"output_pii:{pii_type}",
                    details=f"PII found in output: {pii_type} ({len(matches)} instances)",
                    modified_text=redacted,
                ))
# Check for blocked topics in output
for topic in self.blocked_topics:
if topic.lower() in text.lower():
results.append(GuardrailResult(
action=GuardrailAction.BLOCK,
rule_name=f"blocked_topic:{topic}",
details=f"Output contains blocked topic: {topic}",
))
# Check output length
if len(text.split()) < 3:
results.append(GuardrailResult(
action=GuardrailAction.WARN,
rule_name="output_too_short",
details="Output suspiciously short (< 3 words).",
))
if not results:
results.append(GuardrailResult(
action=GuardrailAction.ALLOW,
rule_name="output_check",
details="All output checks passed.",
))
return results
def apply_guardrails(
self,
input_text: str,
output_text: str,
) -> dict[str, Any]:
"""Apply all guardrails to an input/output pair.
Args:
input_text: User input.
output_text: LLM output.
Returns:
Dictionary with guardrail results and final action.
"""
input_results = self.check_input(input_text)
output_results = self.check_output(output_text)
all_results = input_results + output_results
# Determine final action (most restrictive wins)
if any(r.action == GuardrailAction.BLOCK for r in all_results):
final_action = GuardrailAction.BLOCK
final_text = "I'm sorry, I cannot help with that request."
elif any(r.action == GuardrailAction.MODIFY for r in all_results):
final_action = GuardrailAction.MODIFY
            # Redactions in check_output are cumulative, so the most recent
            # MODIFY result already carries every modification.
            final_text = output_text
            for r in all_results:
                if r.action == GuardrailAction.MODIFY and r.modified_text:
                    final_text = r.modified_text
else:
final_action = GuardrailAction.ALLOW
final_text = output_text
return {
"final_action": final_action.value,
"final_text": final_text,
"input_results": [
{"action": r.action.value, "rule": r.rule_name, "details": r.details}
for r in input_results
],
"output_results": [
{"action": r.action.value, "rule": r.rule_name, "details": r.details}
for r in output_results
],
}
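Finally, a usage sketch that runs a request/response pair containing an email address through the guardrails; the texts are invented for illustration.
if __name__ == "__main__":
    # Illustrative pass over a response containing an email address.
    guardrails = LLMGuardrails()
    verdict = guardrails.apply_guardrails(
        input_text="Please send the report to my work address.",
        output_text="Sure, I will email it to jane.doe@example.com today.",
    )
    print(verdict["final_action"])  # "modify": the email address gets redacted
    print(verdict["final_text"])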
34.11 Infrastructure as Code for ML
Infrastructure as Code (IaC) ensures that ML infrastructure is reproducible, version-controlled, and auditable. The two most common tools are Terraform for cloud infrastructure and Docker for application packaging.
34.11.1 Dockerizing ML Applications
# Dockerfile for ML model serving
FROM python:3.11-slim
# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV MODEL_PATH=/app/models/model.pt
# Install system dependencies (curl is required by the HEALTHCHECK below)
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt
# Copy application code
COPY src/ /app/src/
COPY models/ /app/models/
WORKDIR /app
# Health check
HEALTHCHECK --interval=30s --timeout=5s \
CMD curl -f http://localhost:8000/health || exit 1
# Run the serving application
EXPOSE 8000
CMD ["uvicorn", "src.serve:app", "--host", "0.0.0.0", "--port", "8000"]
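The CMD above points at src/serve.py, which the chapter does not show. A minimal FastAPI sketch exposing the /health endpoint that the HEALTHCHECK and the Kubernetes probes expect might look like the following; the request schema and placeholder scoring logic are assumptions, not the chapter's canonical implementation.
"""Minimal serving skeleton assumed by the Dockerfile's CMD above.

This is a sketch, not the chapter's canonical implementation: it exposes
the /health endpoint the HEALTHCHECK and Kubernetes probes expect, plus a
placeholder /predict endpoint. The request schema and scoring logic are
assumptions.
"""
import os

import torch
from fastapi import FastAPI
from pydantic import BaseModel

MODEL_PATH = os.environ.get("MODEL_PATH", "/app/models/model.pt")

app = FastAPI(title="ml-model-serving")


class PredictRequest(BaseModel):
    features: list[float]


@app.get("/health")
def health() -> dict[str, str]:
    """Liveness/readiness probe target."""
    return {"status": "ok", "model_path": MODEL_PATH}


@app.post("/predict")
def predict(request: PredictRequest) -> dict[str, float]:
    """Placeholder inference: a real service would load the model weights
    at startup and run a forward pass here."""
    x = torch.tensor(request.features)
    return {"score": float(torch.sigmoid(x.mean()))}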
34.11.2 Kubernetes Deployment
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: ml-model-serving
labels:
app: ml-model
spec:
replicas: 3
selector:
matchLabels:
app: ml-model
template:
metadata:
labels:
app: ml-model
spec:
containers:
- name: model-server
image: myregistry/ml-model:v1.2.3
ports:
- containerPort: 8000
resources:
requests:
memory: "2Gi"
cpu: "1000m"
nvidia.com/gpu: "1"
limits:
memory: "4Gi"
cpu: "2000m"
nvidia.com/gpu: "1"
env:
- name: MODEL_VERSION
value: "v1.2.3"
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 10
periodSeconds: 5
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
name: ml-model-service
spec:
selector:
app: ml-model
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: LoadBalancer
34.11.3 Terraform for ML Infrastructure
# main.tf - ML training infrastructure on AWS
provider "aws" {
region = "us-east-1"
}
# S3 bucket for model artifacts
resource "aws_s3_bucket" "model_artifacts" {
  bucket = "ml-model-artifacts-prod"

  tags = {
    Environment = "production"
    Team        = "ml-platform"
  }
}

# Versioning and encryption are configured via separate resources (AWS provider v4+)
resource "aws_s3_bucket_versioning" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  versioning_configuration {
    status = "Enabled"
  }
}

resource "aws_s3_bucket_server_side_encryption_configuration" "model_artifacts" {
  bucket = aws_s3_bucket.model_artifacts.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}
# ECR repository for model serving containers
resource "aws_ecr_repository" "model_serving" {
name = "ml-model-serving"
image_tag_mutability = "IMMUTABLE"
image_scanning_configuration {
scan_on_push = true
}
}
# SageMaker endpoint configuration for model serving (paired with an
# aws_sagemaker_endpoint resource, not shown here)
resource "aws_sagemaker_endpoint_configuration" "model_config" {
name = "ml-model-config"
production_variants {
variant_name = "primary"
model_name = aws_sagemaker_model.model.name
initial_instance_count = 2
instance_type = "ml.g4dn.xlarge"
initial_variant_weight = 1.0
}
}
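Once a corresponding aws_sagemaker_endpoint resource exists, applications can call it through the SageMaker runtime API. A brief sketch follows; the endpoint name ml-model-endpoint and the JSON payload shape are assumptions for illustration, not outputs of the Terraform above.
# invoke_endpoint.py -- illustrative client call against a deployed endpoint
import json

import boto3

runtime = boto3.client("sagemaker-runtime", region_name="us-east-1")

# "ml-model-endpoint" and the feature payload are hypothetical examples.
response = runtime.invoke_endpoint(
    EndpointName="ml-model-endpoint",
    ContentType="application/json",
    Body=json.dumps({"features": [0.1, 0.2, 0.3]}),
)
prediction = json.loads(response["Body"].read())
print(prediction)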
34.12 Cost Management
ML workloads can be extraordinarily expensive. Effective cost management is a critical MLOps capability.
34.12.1 Training Cost Optimization
"""ML cost tracking and optimization utilities.
Provides cost estimation, tracking, and optimization
recommendations for ML workloads.
"""
from dataclasses import dataclass
from typing import Any
# Approximate GPU costs per hour (on-demand, major cloud providers, 2024)
GPU_COSTS_PER_HOUR = {
"nvidia-t4": 0.526,
"nvidia-a10g": 1.006,
"nvidia-l4": 0.81,
"nvidia-a100-40gb": 3.67,
"nvidia-a100-80gb": 5.12,
"nvidia-h100": 8.28,
}
# Approximate LLM API costs per 1M tokens (2024)
LLM_API_COSTS_PER_1M_TOKENS = {
"gpt-4-turbo": {"input": 10.0, "output": 30.0},
"gpt-4o": {"input": 2.50, "output": 10.0},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"claude-3-5-sonnet": {"input": 3.0, "output": 15.0},
"claude-3-haiku": {"input": 0.25, "output": 1.25},
}
@dataclass
class TrainingCostEstimate:
"""Estimated cost for a training run.
Attributes:
gpu_type: GPU identifier.
num_gpus: Number of GPUs.
estimated_hours: Estimated training time.
cost_per_hour: Cost per GPU per hour.
total_cost: Total estimated cost.
recommendations: Cost optimization suggestions.
"""
gpu_type: str
num_gpus: int
estimated_hours: float
cost_per_hour: float
total_cost: float
recommendations: list[str]
def estimate_training_cost(
model_params_millions: float,
dataset_size_gb: float,
epochs: int,
gpu_type: str = "nvidia-a100-40gb",
num_gpus: int = 1,
) -> TrainingCostEstimate:
"""Estimate the cost of a training run.
Uses rough heuristics based on model size and data volume.
Actual costs vary significantly based on implementation details.
Args:
model_params_millions: Model size in millions of parameters.
dataset_size_gb: Dataset size in gigabytes.
epochs: Number of training epochs.
gpu_type: Type of GPU to use.
num_gpus: Number of GPUs.
Returns:
Cost estimate with recommendations.
"""
cost_per_hour = GPU_COSTS_PER_HOUR.get(gpu_type, 5.0)
# Rough estimate: 1M params * 1GB data * 1 epoch ~ 0.1 GPU-hours
# This is a very rough approximation
gpu_hours = (
model_params_millions * dataset_size_gb * epochs * 0.1 / num_gpus
)
total_cost = gpu_hours * cost_per_hour * num_gpus
recommendations = []
if gpu_type in ("nvidia-a100-80gb", "nvidia-h100"):
recommendations.append(
"Consider using spot/preemptible instances for up to 70% savings."
)
if num_gpus == 1 and model_params_millions > 500:
recommendations.append(
"Consider multi-GPU training with DDP for faster training."
)
if epochs > 10:
recommendations.append(
"Consider early stopping to avoid wasted compute on "
"diminishing returns."
)
if model_params_millions > 100:
recommendations.append(
"Consider mixed-precision training (fp16/bf16) for ~2x speedup."
)
return TrainingCostEstimate(
gpu_type=gpu_type,
num_gpus=num_gpus,
estimated_hours=gpu_hours,
cost_per_hour=cost_per_hour,
total_cost=total_cost,
recommendations=recommendations,
)
def estimate_llm_api_cost(
model: str,
input_tokens: int,
output_tokens: int,
num_requests: int = 1,
) -> dict[str, float]:
"""Estimate the cost of LLM API usage.
Args:
model: Model identifier.
input_tokens: Tokens per input.
output_tokens: Tokens per output.
num_requests: Number of requests.
Returns:
Dictionary with cost breakdown.
"""
costs = LLM_API_COSTS_PER_1M_TOKENS.get(model)
if costs is None:
return {"error": f"Unknown model: {model}"}
total_input_tokens = input_tokens * num_requests
total_output_tokens = output_tokens * num_requests
input_cost = total_input_tokens / 1_000_000 * costs["input"]
output_cost = total_output_tokens / 1_000_000 * costs["output"]
return {
"model": model,
"total_input_tokens": total_input_tokens,
"total_output_tokens": total_output_tokens,
"input_cost": input_cost,
"output_cost": output_cost,
"total_cost": input_cost + output_cost,
"cost_per_request": (input_cost + output_cost) / num_requests,
}
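For illustration, the estimators above might be used as follows; the model size, dataset, and traffic figures are arbitrary examples, not recommendations.
# Example usage of the cost estimators defined above (figures are illustrative).
training = estimate_training_cost(
    model_params_millions=340,   # roughly BERT-large scale
    dataset_size_gb=5,
    epochs=3,
    gpu_type="nvidia-a100-40gb",
    num_gpus=4,
)
print(f"Estimated wall-clock hours: {training.estimated_hours:.1f}")
print(f"Estimated total cost: ${training.total_cost:,.2f}")
for tip in training.recommendations:
    print(f"  - {tip}")

api = estimate_llm_api_cost(
    model="gpt-4o-mini",
    input_tokens=800,
    output_tokens=200,
    num_requests=100_000,
)
print(
    f"API cost for 100k requests: ${api['total_cost']:.2f} "
    f"(${api['cost_per_request']:.6f} per request)"
)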
34.12.2 Cost Management Best Practices
- Use spot/preemptible instances for training workloads (60--90% savings). Implement checkpointing to handle interruptions.
- Right-size your GPUs. Do not use an H100 when an A10G suffices. Profile your workload's GPU utilization.
- Implement auto-scaling for serving infrastructure. Scale to zero when there is no traffic.
- Cache LLM responses. Identical prompts should return cached results; see the sketch after this list.
- Monitor cost per prediction. Track this metric alongside model performance.
- Set budget alerts. Cloud providers offer billing alerts; use them aggressively.
- Use reserved instances for stable, predictable serving workloads.
- Optimize model size. Distillation, pruning, and quantization can dramatically reduce serving costs.
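As referenced in the caching item above, here is a minimal sketch of an in-process LLM response cache. The call_llm callable is a hypothetical stand-in for whatever client your application uses; a production version would typically back this with a shared store such as Redis and add a TTL rather than an in-memory dict.
# Minimal prompt-response cache sketch; call_llm is a hypothetical stand-in
# for your actual LLM client call.
import hashlib
import json
from typing import Callable

_cache: dict[str, str] = {}

def cached_completion(
    prompt: str,
    model: str,
    call_llm: Callable[[str, str], str],
) -> str:
    """Return a cached response for identical (model, prompt) pairs."""
    key = hashlib.sha256(
        json.dumps({"model": model, "prompt": prompt}, sort_keys=True).encode()
    ).hexdigest()
    if key not in _cache:
        _cache[key] = call_llm(prompt, model)  # only the first call is billed
    return _cache[key]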
34.13 Putting It All Together: An MLOps Architecture
Here is a reference architecture that combines all the concepts in this chapter:
┌───────────────────────────────────────────────────────────────────┐
│ MLOps Reference Architecture │
│ │
│ ┌─────────┐ ┌──────────┐ ┌──────────┐ ┌───────────────────┐ │
│ │ Feature │ │ Training │ │ Model │ │ Serving │ │
│ │ Store │──│ Pipeline │──│ Registry │──│ Infrastructure │ │
│ └─────────┘ └──────────┘ └──────────┘ └───────────────────┘ │
│ │ │ │ │ │
│ v v v v │
│ ┌─────────┐ ┌──────────┐ ┌──────────┐ ┌───────────────────┐ │
│ │ Data │ │Experiment│ │ CI/CD │ │ Monitoring & │ │
│ │ Store │ │ Tracking │ │ Pipeline │ │ Observability │ │
│ │ (DVC) │ │ (W&B) │ │(GH Actn)│ │ (Prometheus/ │ │
│ │ │ │ │ │ │ │ Grafana) │ │
│ └─────────┘ └──────────┘ └──────────┘ └───────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Infrastructure Layer │ │
│ │ Kubernetes | Docker | Terraform | Cloud Provider │ │
│ └────────────────────────────────────────────────────────────┘ │
└───────────────────────────────────────────────────────────────────┘
34.13.1 Decision Framework
When designing your MLOps stack, consider:
- Team size: Small teams should use managed services; large teams may benefit from self-hosted solutions.
- Model type: Traditional ML, deep learning, and LLM applications have different operational needs.
- Regulatory environment: Healthcare, finance, and government have strict audit and reproducibility requirements.
- Scale: The difference between serving 100 predictions/day and 100 million is not just engineering---it is a different architecture.
- Budget: Start simple. You do not need every tool from day one.
34.14 Summary
MLOps and LLMOps are not optional luxuries---they are essential engineering disciplines for any organization deploying ML in production. In this chapter, we covered:
- The ML lifecycle is a continuous loop, not a linear pipeline. Each phase requires operational tooling and processes.
- Experiment tracking (with tools like W&B) ensures reproducibility and enables informed decisions about model development.
- Data versioning (with DVC) ensures you can always reproduce a training run and trace a model back to its training data.
- Model versioning and registries manage the lifecycle of models from development through production.
- CI/CD for ML extends software CI/CD with data validation, model testing, and performance regression checks.
- Monitoring and observability catch silent model failures before they impact users.
- Drift detection identifies when production data diverges from training data, signaling the need for retraining.
- A/B testing measures the actual business impact of model changes.
- LLMOps adds prompt versioning, evaluation pipelines, and guardrails to the MLOps toolkit.
- Infrastructure as Code makes ML infrastructure reproducible and auditable.
- Cost management prevents ML workloads from becoming prohibitively expensive.
The key insight is that MLOps is not a one-time setup---it is a continuous practice that evolves with your models, data, and business needs. Start simple, automate what hurts most, and iterate.
In the next chapter, we will explore distributed training and scaling, which is essential for training the large models that modern ML applications demand.