In This Chapter
- Learning Objectives
- 29.1 The Deployment Gap
- 29.2 MLOps Maturity Levels
- 29.3 CI/CD for ML: The Three-Artifact Pipeline
- 29.4 Artifact Lineage and Reproducibility
- 29.5 Shadow Mode: Evaluation Without Risk
- 29.6 Canary Deployments: Gradual Traffic Exposure
- 29.7 Progressive Rollout: From Canary to Full Traffic
- 29.8 Continuous Training: Keeping Models Fresh
- 29.9 Retraining Pipeline Integration
- 29.10 Model Rollback: The Safety Net
- 29.11 Model Serving Infrastructure
- 29.12 Progressive Project: M13 — StreamRec Deployment Pipeline
- 29.13 Deployment in Regulated Environments
- 29.14 Summary
Chapter 29: Continuous Training and Deployment — CI/CD for ML, Canary Deployments, Shadow Mode, and Progressive Rollout
"Deploying a model is not the finish line. It is the starting gun." — Chip Huyen, Designing Machine Learning Systems (2022)
Learning Objectives
By the end of this chapter, you will be able to:
- Design CI/CD pipelines for ML that handle three artifact types — code, data, and model — with appropriate versioning, testing, and promotion strategies for each
- Implement canary deployments with gradual traffic shifts, statistical significance testing, and automatic promotion or rollback decisions
- Use shadow mode to evaluate a challenger model on production traffic without affecting user experience, and design the metrics comparison framework that translates shadow results into deployment decisions
- Design retraining triggers — drift-based, performance-based, and scheduled — that balance model freshness against computational cost and deployment risk
- Implement model rollback procedures that restore the previous champion within seconds, including the infrastructure, automation, and organizational processes that make rollback safe and reliable
29.1 The Deployment Gap
Chapter 27 built the orchestration pipeline that trains a model. Chapter 28 built the testing infrastructure that validates it. This chapter answers the question that follows: how does a validated model reach production traffic safely, and how does it stay fresh once it gets there?
The question is harder than it appears. In traditional software deployment, the artifact is a binary — a Docker image, a JAR file, a compiled executable. The CI/CD pipeline is well-understood: commit code, run tests, build artifact, deploy to staging, run integration tests, deploy to production. If the deployment fails, roll back to the previous binary. The entire process is deterministic: the same code produces the same binary, and the same binary produces the same behavior.
ML deployment breaks this determinism in three ways.
Three artifacts, not one. A traditional deployment ships code. An ML deployment ships code (the serving infrastructure, feature engineering logic, and API contracts), data (the training dataset, feature schemas, and preprocessing parameters), and a model artifact (the serialized weights, architecture definition, and metadata). Each artifact has its own versioning scheme, its own testing requirements, and its own failure modes. A code change that does not touch the model can break serving. A data change that does not touch the code can degrade predictions. A model retrained on the same code and same data can produce different results due to non-deterministic training. The CI/CD pipeline must version, test, and promote all three artifacts together.
No ground truth at serving time. When a traditional service deploys a new version, integration tests can verify that the output is correct: an API that should return a 200 status code either returns it or does not. When a recommendation model deploys a new version, there is no immediate signal of correctness. The model returns a list of item IDs. Those IDs are valid — they exist in the catalog. But whether the recommendations are good requires observing user behavior over hours or days. The deployment pipeline must evaluate model quality through proxy metrics (latency, error rate, prediction distribution) and delayed outcome metrics (click-through rate, completion rate, engagement) with different time horizons.
Continuous retraining. Traditional software deploys when code changes. ML systems must also deploy when data changes — because the world changes. A recommendation model trained on last month's data does not know about this month's trending content. A credit scoring model trained before a recession does not reflect the new default patterns. The deployment pipeline must support not only on-demand deployment (triggered by code or architecture changes) but also continuous training — scheduled or triggered retraining that keeps the model current without human intervention.
Production ML = Software Engineering: The deployment gap is not a machine learning problem — it is a software engineering problem with ML-specific constraints. Every principle from software deployment applies: immutable artifacts, staged rollouts, automated rollback, observability. ML adds the three-artifact challenge, the delayed-feedback challenge, and the continuous-retraining challenge on top of the standard deployment playbook. The organizations that deploy ML systems reliably are the ones that recognize this: they build ML deployment on top of mature software deployment infrastructure, not from scratch.
This chapter proceeds in five movements. Sections 29.2-29.4 cover the three-artifact CI/CD pipeline: how to version, test, and promote code, data, and models together. Sections 29.5-29.7 cover deployment strategies: shadow mode, canary deployments, and progressive rollout. Sections 29.8-29.9 cover continuous training: retraining triggers, scheduling, and the automation that keeps models fresh. Sections 29.10-29.11 cover rollback and serving: how to detect a bad deployment, restore the previous model safely, and serve it reliably. Section 29.12 synthesizes these into the StreamRec deployment pipeline for progressive project milestone M13.
29.2 MLOps Maturity Levels
Before designing a CI/CD pipeline, it is useful to understand where your organization falls on the MLOps maturity spectrum. Google's MLOps maturity framework defines four levels:
Level 0: Manual Process
| Aspect | Description |
|---|---|
| Training | Manual, in notebooks |
| Deployment | Manual, by the data scientist |
| Monitoring | None |
| Retraining | When someone remembers |
| Testing | Manual eyeballing of metrics |
At Level 0, the data scientist trains a model in a Jupyter notebook, exports the weights to a file, copies the file to a server, and updates a configuration file to point to the new model. Deployment is a human process with no automation, no versioning, and no rollback capability. Most ML projects start here. Many never leave.
Level 1: ML Pipeline Automation
| Aspect | Description |
|---|---|
| Training | Automated pipeline (Dagster, Airflow) |
| Deployment | Manual promotion from registry |
| Monitoring | Basic metrics (latency, error rate) |
| Retraining | Scheduled (e.g., weekly) |
| Testing | Automated data validation, model evaluation |
At Level 1, the training pipeline from Chapter 27 runs automatically. Data validation (Chapter 28) catches quality issues. Trained models are registered in MLflow with metrics and metadata. But deployment is still manual: an engineer reviews the metrics, decides to promote, and triggers deployment. This is where the StreamRec pipeline stood at the end of Chapter 28.
Level 2: CI/CD Pipeline Automation
| Aspect | Description |
|---|---|
| Training | Automated pipeline with CT triggers |
| Deployment | Automated CI/CD with staged rollout |
| Monitoring | Model performance + data drift |
| Retraining | Event-driven (drift, degradation, schedule) |
| Testing | Full ML test suite (data, behavioral, validation gate) |
At Level 2, the entire path from code commit to production traffic is automated. Code changes trigger CI tests. Successful training triggers model validation. Validated models enter a staged deployment pipeline (shadow mode, canary, progressive rollout). Monitoring detects issues and triggers rollback or retraining. This is the target of this chapter.
Level 3: Automated ML System
| Aspect | Description |
|---|---|
| Training | Automated with hyperparameter optimization |
| Deployment | Fully automated with zero-touch rollout |
| Monitoring | Automated anomaly detection, root cause analysis |
| Retraining | Automated trigger → train → validate → deploy |
| Testing | Automated test generation, adversarial testing |
At Level 3, the entire system is self-healing. Drift triggers retraining. Retraining triggers validation. Validation triggers deployment. Deployment is monitored. Monitoring triggers rollback or further retraining. The human is notified but not required. Few organizations reach Level 3 for all models — most operate at Level 2 for high-impact models and Level 1 for lower-impact models.
Where should you aim? Level 2 for any model that serves production traffic. Level 1 for experimental or low-impact models. Level 3 only if the model serves high-volume, time-sensitive traffic (e.g., real-time recommendations, fraud detection) where human-in-the-loop deployment introduces unacceptable latency. The jump from Level 0 to Level 1 saves the most engineering hours per unit of investment. The jump from Level 1 to Level 2 saves the most production incidents.
29.3 CI/CD for ML: The Three-Artifact Pipeline
Traditional CI/CD pipelines assume a single artifact type: code. The pipeline checks out code, runs tests, builds a binary, and deploys it. ML CI/CD must handle three artifact types, each with distinct versioning and testing requirements.
The Three Artifacts
| Artifact | Versioning | Testing | Promotion Trigger |
|---|---|---|---|
| Code | Git (commit SHA) | Unit tests, integration tests, linting | Pull request merge |
| Data | Data version (DVC hash, Delta Lake version, partition date) | Schema validation, statistical tests, freshness | Pipeline schedule or sensor |
| Model | Model registry version (MLflow run ID) | Behavioral tests, validation gate, shadow evaluation | Automated or manual approval |
The key insight is that these three artifacts change on different timelines. Code changes when engineers push commits — perhaps daily. Data changes when new events arrive — perhaps hourly. Models change when retraining completes — perhaps weekly. A CI/CD pipeline that triggers only on code changes misses the data and model dimensions entirely.
Pipeline Architecture
A complete ML CI/CD pipeline has three parallel trigger paths that converge at the deployment stage:
Code Change Path:
git push → lint + type check → unit tests → integration tests → build Docker image → push to registry
Data Change Path:
new partition arrives → schema validation (GE) → statistical tests (PSI) → data contract check → trigger retraining if thresholds met
Model Change Path:
training completes → offline evaluation → behavioral tests → validation gate (champion-challenger) → register in MLflow → trigger deployment
Deployment Path (converges):
model registered → shadow mode → canary (10%) → canary evaluation → progressive rollout (25%, 50%, 100%) → promotion to champion
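The three trigger paths above can be wired as a small event dispatcher. This is an illustrative sketch; `TriggerType` and `PipelineDispatcher` are hypothetical names, not from the StreamRec codebase:

```python
from enum import Enum
from typing import Callable, Dict, List


class TriggerType(Enum):
    CODE_PUSH = "code_push"                # git push → CI → Docker image
    DATA_ARRIVAL = "data_arrival"          # new partition → validation → maybe retrain
    MODEL_REGISTERED = "model_registered"  # training done → staged deployment


class PipelineDispatcher:
    """Routes incoming events to the pipeline that handles them.

    A CI/CD system that listens only for CODE_PUSH misses two of the
    three artifact dimensions; registering handlers for all three
    trigger types is what makes the pipeline ML-aware.
    """

    def __init__(self) -> None:
        self._handlers: Dict[TriggerType, List[Callable[[dict], None]]] = {
            t: [] for t in TriggerType
        }

    def on(self, trigger: TriggerType, handler: Callable[[dict], None]) -> None:
        self._handlers[trigger].append(handler)

    def dispatch(self, trigger: TriggerType, payload: dict) -> int:
        """Invoke every handler registered for this trigger; return the count."""
        for handler in self._handlers[trigger]:
            handler(payload)
        return len(self._handlers[trigger])


# Wire the three paths from the diagram above.
dispatcher = PipelineDispatcher()
events: List[str] = []
dispatcher.on(TriggerType.CODE_PUSH, lambda p: events.append(f"ci:{p['sha']}"))
dispatcher.on(TriggerType.DATA_ARRIVAL, lambda p: events.append(f"validate:{p['partition']}"))
dispatcher.on(TriggerType.MODEL_REGISTERED, lambda p: events.append(f"deploy:v{p['version']}"))

dispatcher.dispatch(TriggerType.CODE_PUSH, {"sha": "abc123"})
dispatcher.dispatch(TriggerType.MODEL_REGISTERED, {"version": 17})
```

The point of the sketch is the shape, not the mechanism: in production the dispatcher role is played by the orchestrator's sensors and the CI system's webhooks, but each of the three artifact types still needs its own registered path.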
Implementing the Code Path
The code path uses standard CI/CD tooling — GitHub Actions, GitLab CI, or Jenkins — with ML-specific additions.
"""
streamrec_ci.py — CI pipeline configuration for StreamRec serving code.
Implements the code-path CI/CD pipeline as a Python-defined GitHub Actions
workflow configuration. In practice, this would be a YAML workflow file;
the Python representation here makes the logic explicit and testable.
"""
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
class CIStage(Enum):
"""Stages in the CI pipeline."""
LINT = "lint"
TYPE_CHECK = "type_check"
UNIT_TEST = "unit_test"
INTEGRATION_TEST = "integration_test"
BUILD = "build"
PUSH = "push"
TRIGGER_DEPLOYMENT = "trigger_deployment"
@dataclass
class CIStep:
"""A single step in the CI pipeline.
Attributes:
name: Human-readable step name.
stage: Pipeline stage this step belongs to.
command: Shell command to execute.
timeout_minutes: Maximum execution time.
retry_count: Number of retries on failure.
required: Whether pipeline fails if this step fails.
"""
name: str
stage: CIStage
command: str
timeout_minutes: int = 10
retry_count: int = 0
required: bool = True
@dataclass
class MLCIPipeline:
"""CI pipeline for ML serving code.
Defines the steps for linting, testing, building, and pushing
the serving Docker image. The pipeline runs on every push to
the main branch and on every pull request.
Attributes:
project_name: Name of the ML project.
python_version: Python version for the CI environment.
docker_registry: Docker registry URL.
steps: Ordered list of CI steps.
"""
project_name: str
python_version: str = "3.11"
docker_registry: str = "us-central1-docker.pkg.dev/streamrec/models"
steps: List[CIStep] = field(default_factory=list)
def __post_init__(self) -> None:
if not self.steps:
self.steps = self._default_steps()
def _default_steps(self) -> List[CIStep]:
"""Generate the default CI pipeline steps."""
return [
CIStep(
name="Lint (ruff)",
stage=CIStage.LINT,
command="ruff check src/ tests/ --select E,W,F,I",
timeout_minutes=5,
),
CIStep(
name="Type check (mypy)",
stage=CIStage.TYPE_CHECK,
command="mypy src/ --strict --ignore-missing-imports",
timeout_minutes=10,
),
CIStep(
name="Unit tests",
stage=CIStage.UNIT_TEST,
command=(
"pytest tests/unit/ -v --cov=src --cov-report=xml "
"--cov-fail-under=85"
),
timeout_minutes=15,
),
CIStep(
name="Integration tests (serving)",
stage=CIStage.INTEGRATION_TEST,
command=(
"pytest tests/integration/ -v -m 'not gpu' "
"--timeout=300"
),
timeout_minutes=30,
retry_count=1,
),
            CIStep(
                name="Build Docker image",
                stage=CIStage.BUILD,
                command=(
                    f"docker build -t {self.docker_registry}/"
                    f"{self.project_name}-serving:${{GIT_SHA}} "
                    "-f Dockerfile.serving ."
                ),
                timeout_minutes=20,
            ),
            CIStep(
                name="Push Docker image",
                stage=CIStage.PUSH,
                command=(
                    f"docker push {self.docker_registry}/"
                    f"{self.project_name}-serving:${{GIT_SHA}}"
                ),
                timeout_minutes=10,
            ),
]
def validate_pipeline(self) -> Dict[str, bool]:
"""Validate pipeline configuration.
Returns:
Dictionary mapping check names to pass/fail status.
"""
checks: Dict[str, bool] = {}
# Every stage must have at least one step
stages_covered = {step.stage for step in self.steps}
required_stages = {CIStage.LINT, CIStage.UNIT_TEST, CIStage.BUILD}
checks["required_stages_covered"] = required_stages.issubset(
stages_covered
)
# No step should have timeout > 60 minutes
checks["reasonable_timeouts"] = all(
step.timeout_minutes <= 60 for step in self.steps
)
# At least one required step per stage
for stage in required_stages:
stage_steps = [s for s in self.steps if s.stage == stage]
checks[f"{stage.value}_has_required_step"] = any(
s.required for s in stage_steps
)
return checks
def estimated_duration_minutes(self) -> int:
"""Estimate total pipeline duration assuming sequential execution."""
return sum(step.timeout_minutes for step in self.steps)
# ── Instantiate the StreamRec CI pipeline ────────────────────────────────
streamrec_ci = MLCIPipeline(project_name="streamrec")
validation = streamrec_ci.validate_pipeline()
assert all(validation.values()), f"CI pipeline validation failed: {validation}"
# Estimated duration: ~90 minutes worst case (sequential), ~35 minutes (parallel)
Model Versioning and the Model Registry
The model registry is the bridge between training and deployment. Chapter 27 registered models in MLflow at the end of the Dagster pipeline. Here, we formalize the registry's role in CI/CD.
A model registry must support five operations:
- Register: Store a model artifact with metadata (metrics, parameters, data version, code version, training date, feature schema).
- Version: Assign a monotonically increasing version number within a model name.
- Stage transition: Move a model between stages: None → Staging → Production → Archived.
- Query: Retrieve the current production model, or any historical version.
- Lineage: Given a model version, retrieve the exact code commit, data version, and hyperparameters that produced it.
"""
model_registry.py — Model versioning and stage management for CI/CD.
Wraps MLflow's model registry with deployment-specific logic:
stage transitions, promotion rules, and artifact lineage.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from datetime import datetime
from enum import Enum
import hashlib
import json
class ModelStage(Enum):
"""Model lifecycle stages in the registry."""
NONE = "None"
STAGING = "Staging"
SHADOW = "Shadow"
CANARY = "Canary"
PRODUCTION = "Production"
ARCHIVED = "Archived"
VALID_TRANSITIONS: Dict[ModelStage, List[ModelStage]] = {
ModelStage.NONE: [ModelStage.STAGING],
ModelStage.STAGING: [ModelStage.SHADOW, ModelStage.ARCHIVED],
ModelStage.SHADOW: [ModelStage.CANARY, ModelStage.ARCHIVED],
ModelStage.CANARY: [ModelStage.PRODUCTION, ModelStage.ARCHIVED],
ModelStage.PRODUCTION: [ModelStage.ARCHIVED],
ModelStage.ARCHIVED: [],
}
@dataclass
class ModelArtifact:
"""A versioned model artifact with full lineage.
Attributes:
model_name: Registered model name (e.g., 'streamrec-retrieval').
version: Monotonically increasing version number.
stage: Current lifecycle stage.
metrics: Evaluation metrics from offline validation.
params: Hyperparameters used during training.
code_version: Git commit SHA of the training code.
data_version: Hash or version of the training data.
feature_schema: Schema of input features (column names and types).
training_date: Timestamp when training completed.
artifact_uri: URI to the serialized model artifact.
tags: Additional metadata tags.
"""
model_name: str
version: int
stage: ModelStage = ModelStage.NONE
metrics: Dict[str, float] = field(default_factory=dict)
params: Dict[str, Any] = field(default_factory=dict)
code_version: str = ""
data_version: str = ""
feature_schema: Dict[str, str] = field(default_factory=dict)
training_date: datetime = field(default_factory=datetime.utcnow)
artifact_uri: str = ""
tags: Dict[str, str] = field(default_factory=dict)
@property
def lineage_hash(self) -> str:
"""Compute a deterministic hash of the model's full lineage.
This hash uniquely identifies the combination of code, data,
and hyperparameters that produced this model. Two models with
the same lineage hash were trained from identical inputs
(though non-deterministic training may produce different weights).
"""
lineage = {
"code_version": self.code_version,
"data_version": self.data_version,
"params": self.params,
}
lineage_str = json.dumps(lineage, sort_keys=True)
return hashlib.sha256(lineage_str.encode()).hexdigest()[:16]
@dataclass
class ModelRegistry:
"""In-memory model registry for CI/CD pipeline demonstration.
In production, this wraps MLflow's Model Registry API. The in-memory
implementation here makes the stage transition logic explicit.
Attributes:
models: Mapping from (model_name, version) to ModelArtifact.
"""
models: Dict[tuple, ModelArtifact] = field(default_factory=dict)
def register(self, artifact: ModelArtifact) -> ModelArtifact:
"""Register a new model version.
Args:
artifact: The model artifact to register.
Returns:
The registered artifact with assigned version.
"""
# Assign next version number
existing_versions = [
v for (name, v) in self.models
if name == artifact.model_name
]
artifact.version = max(existing_versions, default=0) + 1
self.models[(artifact.model_name, artifact.version)] = artifact
return artifact
def transition_stage(
self,
model_name: str,
version: int,
target_stage: ModelStage,
reason: str = "",
) -> ModelArtifact:
"""Transition a model to a new lifecycle stage.
Enforces valid transitions: None→Staging→Shadow→Canary→Production.
Any stage can transition to Archived (rollback or retirement).
Args:
model_name: Registered model name.
version: Model version number.
target_stage: Target lifecycle stage.
reason: Human-readable reason for the transition.
Returns:
Updated model artifact.
Raises:
ValueError: If the transition is not valid.
KeyError: If the model version does not exist.
"""
key = (model_name, version)
if key not in self.models:
raise KeyError(
f"Model {model_name} version {version} not found."
)
artifact = self.models[key]
valid_targets = VALID_TRANSITIONS.get(artifact.stage, [])
if target_stage not in valid_targets:
raise ValueError(
f"Invalid transition: {artifact.stage.value} → "
f"{target_stage.value}. Valid targets: "
f"{[s.value for s in valid_targets]}"
)
# If promoting to Production, archive the current Production model
if target_stage == ModelStage.PRODUCTION:
            for m in self.models.values():
                if (
                    m.model_name == model_name
                    and m.version != version
                    and m.stage == ModelStage.PRODUCTION
                ):
                    m.stage = ModelStage.ARCHIVED
                    m.tags["archived_reason"] = (
                        f"Replaced by version {version}"
                    )
artifact.stage = target_stage
artifact.tags["stage_transition_reason"] = reason
artifact.tags["stage_transition_time"] = (
datetime.utcnow().isoformat()
)
return artifact
def get_production_model(
self, model_name: str
) -> Optional[ModelArtifact]:
"""Retrieve the current production model.
Args:
model_name: Registered model name.
Returns:
The production model artifact, or None if no model is in
production.
"""
for (name, _), artifact in self.models.items():
if name == model_name and artifact.stage == ModelStage.PRODUCTION:
return artifact
return None
def get_model_history(
self, model_name: str
) -> List[ModelArtifact]:
"""Retrieve all versions of a model, ordered by version.
Args:
model_name: Registered model name.
Returns:
List of model artifacts sorted by version (ascending).
"""
artifacts = [
artifact
for (name, _), artifact in self.models.items()
if name == model_name
]
return sorted(artifacts, key=lambda a: a.version)
The stage transition enforcement is critical. A model cannot jump from None to Production — it must pass through Staging, Shadow, and Canary first. This encoding of the deployment process into the registry ensures that no model reaches production without completing every evaluation stage, even if an engineer attempts to bypass the pipeline.
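The enforcement can be compressed into a standalone sketch. The names here (`Stage`, `VALID`, `can_reach_production`) are illustrative, mirroring the `VALID_TRANSITIONS` table above:

```python
from enum import Enum


class Stage(Enum):
    NONE = "None"
    STAGING = "Staging"
    SHADOW = "Shadow"
    CANARY = "Canary"
    PRODUCTION = "Production"
    ARCHIVED = "Archived"


# Same transition table as the registry above: forward one stage at a
# time, with Archived reachable from every stage past None.
VALID = {
    Stage.NONE: {Stage.STAGING},
    Stage.STAGING: {Stage.SHADOW, Stage.ARCHIVED},
    Stage.SHADOW: {Stage.CANARY, Stage.ARCHIVED},
    Stage.CANARY: {Stage.PRODUCTION, Stage.ARCHIVED},
    Stage.PRODUCTION: {Stage.ARCHIVED},
    Stage.ARCHIVED: set(),
}


def can_reach_production(start: Stage) -> list:
    """Walk the only forward path from `start` to Production."""
    path = [start]
    current = start
    while current != Stage.PRODUCTION:
        forward = [s for s in VALID[current] if s != Stage.ARCHIVED]
        if not forward:
            raise ValueError(f"No path to Production from {start.value}")
        current = forward[0]
        path.append(current)
    return path


# A new model must traverse every evaluation stage:
path = can_reach_production(Stage.NONE)
# An attempt to jump straight to Production is simply not in the table:
jump_allowed = Stage.PRODUCTION in VALID[Stage.NONE]
```

Because the transition table has exactly one non-Archived successor per stage, the only route to Production is None → Staging → Shadow → Canary → Production; bypassing any evaluation stage is unrepresentable, not merely discouraged.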
29.4 Artifact Lineage and Reproducibility
A model in production must be fully reproducible: given the model's version number, an engineer must be able to reconstruct the exact training environment — the code, the data, the hyperparameters, and the random seed — that produced it. This is not merely a best practice; in regulated industries (credit scoring, healthcare, insurance), it is a legal requirement.
Artifact lineage connects three version identifiers:
Model v17 (MLflow)
├── Code: git commit abc123def (GitHub)
├── Data: delta_version=47, partition_date=2025-03-14 (Delta Lake)
├── Features: feast_feature_set v3.2 (Feast)
├── Parameters: lr=0.001, batch_size=256, epochs=15
├── Environment: Docker image streamrec-train:abc123def
└── Evaluation: Recall@20=0.214, NDCG@20=0.186, AUC=0.847
When a production incident occurs, lineage enables rapid root cause analysis: was the regression caused by a code change (compare the git diffs), a data change (compare the data versions), or a training artifact (compare the hyperparameters and random seeds)?
"""
lineage.py — Artifact lineage tracking for ML deployments.
"""
from dataclasses import dataclass, field
from typing import Dict, Optional, List
from datetime import datetime
import json
@dataclass
class ArtifactLineage:
"""Complete lineage record for a deployed model.
Captures every input that contributed to a model artifact,
enabling full reproducibility and root cause analysis.
Attributes:
model_name: Registered model name.
model_version: Model version in the registry.
code_commit: Git commit SHA.
code_branch: Git branch name.
code_repo: Git repository URL.
data_source: Data lake or warehouse identifier.
data_version: Data version or partition identifier.
data_hash: Hash of the training dataset.
data_row_count: Number of training examples.
data_date_range: Date range of training data.
feature_set_version: Version of the feature engineering pipeline.
feature_names: List of feature names used.
hyperparameters: Training hyperparameters.
random_seed: Random seed for reproducibility.
docker_image: Docker image used for training.
gpu_type: GPU type used for training.
training_duration_seconds: Wall-clock training time.
evaluation_metrics: Offline evaluation metrics.
validation_results: Results from validation gate (Chapter 28).
created_at: Timestamp of lineage record creation.
"""
model_name: str
model_version: int
code_commit: str
code_branch: str = "main"
code_repo: str = ""
data_source: str = ""
data_version: str = ""
data_hash: str = ""
data_row_count: int = 0
data_date_range: str = ""
feature_set_version: str = ""
feature_names: List[str] = field(default_factory=list)
hyperparameters: Dict[str, float] = field(default_factory=dict)
random_seed: int = 42
docker_image: str = ""
gpu_type: str = ""
training_duration_seconds: float = 0.0
evaluation_metrics: Dict[str, float] = field(default_factory=dict)
validation_results: Dict[str, bool] = field(default_factory=dict)
created_at: datetime = field(default_factory=datetime.utcnow)
def to_json(self) -> str:
"""Serialize lineage to JSON for storage."""
data = {
"model_name": self.model_name,
"model_version": self.model_version,
"code": {
"commit": self.code_commit,
"branch": self.code_branch,
"repo": self.code_repo,
},
"data": {
"source": self.data_source,
"version": self.data_version,
"hash": self.data_hash,
"row_count": self.data_row_count,
"date_range": self.data_date_range,
},
"features": {
"set_version": self.feature_set_version,
"names": self.feature_names,
},
"training": {
"hyperparameters": self.hyperparameters,
"random_seed": self.random_seed,
"docker_image": self.docker_image,
"gpu_type": self.gpu_type,
"duration_seconds": self.training_duration_seconds,
},
"evaluation": self.evaluation_metrics,
"validation": self.validation_results,
"created_at": self.created_at.isoformat(),
}
return json.dumps(data, indent=2)
def diff(self, other: "ArtifactLineage") -> Dict[str, Dict[str, str]]:
"""Compare two lineage records to identify differences.
Useful for root cause analysis: when a model regresses,
comparing the lineage of the current and previous versions
identifies what changed.
Args:
other: Another lineage record to compare against.
Returns:
Dictionary of changed fields with old and new values.
"""
changes: Dict[str, Dict[str, str]] = {}
if self.code_commit != other.code_commit:
changes["code_commit"] = {
"old": other.code_commit,
"new": self.code_commit,
}
if self.data_version != other.data_version:
changes["data_version"] = {
"old": other.data_version,
"new": self.data_version,
}
if self.data_row_count != other.data_row_count:
changes["data_row_count"] = {
"old": str(other.data_row_count),
"new": str(self.data_row_count),
}
if self.hyperparameters != other.hyperparameters:
changes["hyperparameters"] = {
"old": json.dumps(other.hyperparameters),
"new": json.dumps(self.hyperparameters),
}
if self.feature_set_version != other.feature_set_version:
changes["feature_set_version"] = {
"old": other.feature_set_version,
"new": self.feature_set_version,
}
return changes
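Stripped of the dataclass plumbing, the diff logic reduces to a field-by-field comparison. A standalone sketch, with `lineage_diff` as a hypothetical helper operating on plain dictionaries:

```python
import json
from typing import Any, Dict


def lineage_diff(new: Dict[str, Any], old: Dict[str, Any]) -> Dict[str, Dict[str, str]]:
    """Return {field: {"old": ..., "new": ...}} for every field that changed.

    Values are JSON-serialized so that nested structures (hyperparameter
    dicts, feature lists) compare and print uniformly.
    """
    changes: Dict[str, Dict[str, str]] = {}
    for key in sorted(set(new) | set(old)):
        if new.get(key) != old.get(key):
            changes[key] = {
                "old": json.dumps(old.get(key), sort_keys=True),
                "new": json.dumps(new.get(key), sort_keys=True),
            }
    return changes


# Root cause analysis in one call: the regression coincided with a data
# refresh and a learning-rate change, not a code change.
v17 = {"code_commit": "abc123", "data_version": "47", "hyperparameters": {"lr": 0.001}}
v18 = {"code_commit": "abc123", "data_version": "48", "hyperparameters": {"lr": 0.01}}
changed = lineage_diff(v18, v17)
```

Here `changed` contains entries for `data_version` and `hyperparameters` but not `code_commit`, which immediately narrows the investigation to the data and training dimensions.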
29.5 Shadow Mode: Evaluation Without Risk
Shadow mode is the first deployment stage after the validation gate. In shadow mode, the challenger model receives a copy of every production request, computes its prediction, and logs the result — but the prediction is never served to users. The champion model continues to serve all traffic. Shadow mode evaluates the challenger on real production data without any risk to user experience.
Why Shadow Mode?
Offline evaluation (holdout sets, cross-validation) tests a model on historical data. Shadow mode tests a model on current data — the actual requests arriving in production right now. The gap between historical and current data can be substantial:
- Distribution shift: User behavior changes over time. A model evaluated on last week's data may perform differently on today's traffic.
- Serving-training skew: The features computed during training may differ subtly from the features computed during serving, due to different code paths, latency constraints, or data freshness. Shadow mode tests the actual serving path, not the training path.
- Edge cases: Production traffic includes edge cases that the holdout set may not: new users with no history, items just added to the catalog, users on unusual devices, bots.
- Latency: Offline evaluation does not measure serving latency. Shadow mode measures how long the model takes to produce predictions under production load.
Architecture
Production Traffic
│
├──────────────────────► Champion Model ──► User Response
│ │
└──── (async copy) ──► Shadow Model ──► Metrics Logger (no user impact)
│
▼
Shadow Dashboard
(latency, predictions,
distribution comparison)
The shadow infrastructure must satisfy three requirements:
- Zero user impact: Shadow predictions are never returned to users, even if the shadow model is faster or "better."
- Identical inputs: The shadow model receives the exact same features as the champion model for every request. Any difference in inputs would confound the comparison.
- Bounded resource cost: Shadow mode doubles the inference compute. The shadow model should run on a separate resource pool so that it cannot degrade the champion's latency.
Implementation
"""
shadow_mode.py — Shadow deployment for model evaluation on production traffic.
The ShadowRouter sends every production request to both the champion
and challenger models. The champion's response is returned to the user;
the challenger's response is logged for offline comparison.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime, timedelta
import statistics
import math

def _compute_ranks(scores: List[float]) -> List[float]:
    """Rank scores in descending order (rank 1 = highest score), averaging ties.

    Module-level helper used by ShadowEvaluator.compute_rank_correlation.
    """
    order = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    ranks = [0.0] * len(scores)
    i = 0
    while i < len(order):
        # Find the run of tied scores starting at position i.
        j = i
        while j + 1 < len(order) and scores[order[j + 1]] == scores[order[i]]:
            j += 1
        avg_rank = (i + j) / 2.0 + 1.0
        for k in range(i, j + 1):
            ranks[order[k]] = avg_rank
        i = j + 1
    return ranks
@dataclass
class ShadowPrediction:
"""A single prediction from both champion and challenger.
Attributes:
request_id: Unique request identifier.
timestamp: When the request was received.
features: Input features (shared by both models).
champion_scores: Champion model's predicted scores.
challenger_scores: Challenger model's predicted scores.
champion_latency_ms: Champion inference latency.
challenger_latency_ms: Challenger inference latency.
"""
request_id: str
timestamp: datetime
features: Dict[str, Any]
champion_scores: List[float]
challenger_scores: List[float]
champion_latency_ms: float
challenger_latency_ms: float
@dataclass
class ShadowEvaluator:
"""Evaluates shadow mode results to decide whether to promote.
Collects shadow predictions over a configurable window and
computes comparison metrics: rank correlation, latency comparison,
score distribution divergence, and (when outcomes arrive) quality
metrics.
Attributes:
model_name: Name of the model being evaluated.
champion_version: Current champion model version.
challenger_version: Challenger model version in shadow.
evaluation_window_hours: Duration of the shadow evaluation.
min_requests: Minimum predictions required for evaluation.
max_latency_p99_ms: Maximum acceptable p99 latency for challenger.
max_latency_ratio: Maximum ratio of challenger/champion p99 latency.
predictions: Collected shadow predictions.
"""
model_name: str
champion_version: int
challenger_version: int
evaluation_window_hours: int = 168 # 1 week
min_requests: int = 100_000
max_latency_p99_ms: float = 50.0
max_latency_ratio: float = 1.5
predictions: List[ShadowPrediction] = field(default_factory=list)
def add_prediction(self, prediction: ShadowPrediction) -> None:
"""Record a shadow prediction."""
self.predictions.append(prediction)
def compute_latency_comparison(self) -> Dict[str, float]:
"""Compare latency distributions of champion and challenger.
Returns:
Dictionary with p50, p95, p99 latencies for both models
and the p99 ratio.
"""
if not self.predictions:
return {}
champ_latencies = sorted(
p.champion_latency_ms for p in self.predictions
)
chall_latencies = sorted(
p.challenger_latency_ms for p in self.predictions
)
def percentile(data: List[float], pct: float) -> float:
idx = int(len(data) * pct / 100)
return data[min(idx, len(data) - 1)]
champ_p99 = percentile(champ_latencies, 99)
chall_p99 = percentile(chall_latencies, 99)
return {
"champion_p50_ms": percentile(champ_latencies, 50),
"champion_p95_ms": percentile(champ_latencies, 95),
"champion_p99_ms": champ_p99,
"challenger_p50_ms": percentile(chall_latencies, 50),
"challenger_p95_ms": percentile(chall_latencies, 95),
"challenger_p99_ms": chall_p99,
"p99_ratio": chall_p99 / champ_p99 if champ_p99 > 0 else 0.0,
}
def compute_rank_correlation(self) -> float:
"""Compute Spearman rank correlation between champion and challenger.
High correlation (>0.95) indicates the challenger ranks items
similarly to the champion. Low correlation indicates substantially
different behavior — which may be desirable (better model) or
concerning (broken model).
Returns:
Spearman rank correlation coefficient.
"""
if len(self.predictions) < 2:
return 0.0
# Compare the top-K ranking for each request
rank_diffs_squared: List[float] = []
for pred in self.predictions:
n = min(len(pred.champion_scores), len(pred.challenger_scores))
if n < 2:
continue
# Rank the scores (higher score = rank 1)
champ_ranks = _compute_ranks(pred.champion_scores[:n])
chall_ranks = _compute_ranks(pred.challenger_scores[:n])
for i in range(n):
diff = champ_ranks[i] - chall_ranks[i]
rank_diffs_squared.append(diff * diff)
if not rank_diffs_squared:
return 0.0
n_pairs = len(rank_diffs_squared)
# Approximate items-per-request for Spearman formula
items_per_req = n_pairs / max(len(self.predictions), 1)
d_sq_sum = sum(rank_diffs_squared)
# Spearman: 1 - 6 * sum(d^2) / (n * (n^2 - 1))
denominator = items_per_req * (items_per_req ** 2 - 1)
if denominator == 0:
return 0.0
rho = 1.0 - (6.0 * d_sq_sum) / (len(self.predictions) * denominator)
return max(-1.0, min(1.0, rho))
def compute_score_divergence(self) -> Dict[str, float]:
"""Compute distributional divergence between champion and challenger.
        Returns summary statistics of the per-item absolute score
        differences: mean, median, max, and standard deviation.
Returns:
Dictionary with divergence metrics.
"""
if not self.predictions:
return {}
abs_diffs: List[float] = []
for pred in self.predictions:
n = min(len(pred.champion_scores), len(pred.challenger_scores))
for i in range(n):
abs_diffs.append(
abs(pred.champion_scores[i] - pred.challenger_scores[i])
)
return {
"mean_absolute_score_diff": statistics.mean(abs_diffs),
"median_absolute_score_diff": statistics.median(abs_diffs),
"max_absolute_score_diff": max(abs_diffs),
"std_absolute_score_diff": (
statistics.stdev(abs_diffs) if len(abs_diffs) > 1 else 0.0
),
}
def evaluate(self) -> Dict[str, Any]:
"""Run the full shadow evaluation and return a promotion decision.
Returns:
Dictionary with evaluation results and a promote/reject/extend
decision.
"""
n_predictions = len(self.predictions)
if n_predictions < self.min_requests:
return {
"decision": "extend",
"reason": (
f"Insufficient predictions: {n_predictions} < "
f"{self.min_requests}"
),
"n_predictions": n_predictions,
}
latency = self.compute_latency_comparison()
rank_corr = self.compute_rank_correlation()
divergence = self.compute_score_divergence()
# Decision logic
issues: List[str] = []
# Check latency
if latency.get("challenger_p99_ms", 0) > self.max_latency_p99_ms:
issues.append(
f"Challenger p99 latency {latency['challenger_p99_ms']:.1f}ms "
f"> {self.max_latency_p99_ms}ms threshold"
)
if latency.get("p99_ratio", 0) > self.max_latency_ratio:
issues.append(
f"Latency ratio {latency['p99_ratio']:.2f} > "
f"{self.max_latency_ratio} threshold"
)
        # Check for anomalous divergence (very different predictions)
        mean_diff = divergence.get("mean_absolute_score_diff", 0)
        if mean_diff > 0.3:
            issues.append(
                f"Mean score divergence {mean_diff:.3f} > 0.3 threshold — "
                f"challenger produces substantially different predictions"
            )
        # Check rank agreement against the shadow-stage threshold
        if rank_corr < 0.70:
            issues.append(
                f"Rank correlation {rank_corr:.3f} < 0.70 threshold; "
                f"challenger ranks items very differently from the champion"
            )
        decision = "reject" if issues else "promote"
return {
"decision": decision,
"issues": issues,
"n_predictions": n_predictions,
"latency": latency,
"rank_correlation": rank_corr,
"score_divergence": divergence,
"champion_version": self.champion_version,
"challenger_version": self.challenger_version,
}
def _compute_ranks(scores: List[float]) -> List[float]:
    """Compute ranks for a list of scores (highest score = rank 1).
    Args:
        scores: List of numeric scores.
    Returns:
        List of ranks (1-indexed, with average ranks for ties).
    """
    indexed = sorted(enumerate(scores), key=lambda x: -x[1])
    ranks = [0.0] * len(scores)
    i = 0
    while i < len(indexed):
        # Extend j across the run of scores tied with position i
        j = i
        while j + 1 < len(indexed) and indexed[j + 1][1] == indexed[i][1]:
            j += 1
        # Tied items share the average of the ranks they occupy
        avg_rank = (i + j + 2) / 2.0
        for k in range(i, j + 1):
            ranks[indexed[k][0]] = avg_rank
        i = j + 1
    return ranks
Shadow Mode Duration
How long should shadow mode run? The answer depends on three factors:
- Traffic volume. Higher traffic produces statistically significant comparisons faster. StreamRec, serving 20 million requests per day, accumulates 100,000 shadow predictions in minutes. A lower-traffic service might need days.
- Outcome delay. If the evaluation metric depends on user outcomes (e.g., did the user click?), shadow mode must run long enough for outcomes to arrive. For StreamRec, click outcomes arrive within seconds; completion outcomes may take hours; subscription-level outcomes may take weeks.
- Temporal coverage. A shadow evaluation should cover at least one full business cycle. For StreamRec, traffic patterns differ between weekdays and weekends, so a minimum 7-day shadow period ensures the challenger is evaluated across both patterns.
For StreamRec, the standard shadow evaluation window is 7 days with a minimum of 100,000 predictions. The evaluation focuses on latency, rank correlation, and score distribution divergence. Quality metrics (CTR comparison) are computed after outcomes arrive but do not gate the shadow → canary transition; they are tracked as metadata.
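These three factors reduce to a simple sizing rule: the shadow window must be at least as long as the slowest of the three constraints. A back-of-envelope sketch (the helper name and its defaults are illustrative, not part of StreamRec's codebase):

```python
def shadow_window_hours(
    daily_requests: int,
    min_requests: int,
    outcome_delay_hours: float,
    business_cycle_hours: float = 168.0,  # one week, per the third factor
) -> float:
    """Minimum shadow window: the longest of the three constraints."""
    # Factor 1: time to accumulate enough predictions at current traffic
    hours_for_volume = min_requests / (daily_requests / 24.0)
    # Factors 2 and 3: outcome arrival delay and one full business cycle
    return max(hours_for_volume, outcome_delay_hours, business_cycle_hours)

# At StreamRec's scale, volume is never the bottleneck: 100,000
# predictions arrive in about 0.12 hours, so the one-week business
# cycle dominates the window.
window = shadow_window_hours(20_000_000, 100_000, outcome_delay_hours=2.0)
```

For a low-traffic service, the first term dominates instead, which is why the same 100,000-prediction floor can translate into a window of days rather than minutes.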
29.6 Canary Deployments: Gradual Traffic Exposure
A canary deployment routes a small percentage of production traffic to the new model while the majority continues to be served by the champion. Unlike shadow mode, canary predictions are served to real users — the canary model's output determines what users see. This makes canary evaluation more realistic (it measures actual user behavior, not predicted behavior) but also more risky (a bad canary model affects real users).
The Canary Pattern
                         ┌──── 90% ────► Champion Model ──► User Response
Production Traffic ──────┤
                         └──── 10% ────► Canary Model ──► User Response
                                              │
                                              ▼
                                      Canary Dashboard
                                 (CTR, latency, errors,
                                  engagement comparison)
The traffic split is controlled by a feature flag or a load balancer configuration. The critical requirement is that the split is consistent per user: a user should see recommendations from the same model throughout the canary period. If users bounce between champion and canary, the comparison is confounded.
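Per-user consistency is easy to achieve with deterministic hashing rather than random assignment. A minimal sketch (the function name and salt are illustrative): hash the user ID with a per-deployment salt, map the digest to [0, 1), and compare against the canary fraction.

```python
import hashlib

def assign_model(
    user_id: str,
    canary_fraction: float,
    salt: str = "rollout-v13",  # hypothetical per-deployment salt
) -> str:
    """Deterministically route a user to 'canary' or 'champion'.

    The same (user_id, salt) pair always lands in the same bucket, so a
    user sees one model for the whole canary period. Changing the salt
    on the next deployment reshuffles users across buckets.
    """
    digest = hashlib.sha256(f"{salt}:{user_id}".encode()).hexdigest()
    # Map the first 8 hex digits to a uniform value in [0, 1)
    fraction = int(digest[:8], 16) / 0x100000000
    return "canary" if fraction < canary_fraction else "champion"
```

Because assignment is a pure function of the user ID, every serving replica computes the same answer with no shared state, and ramping the rollout only requires raising `canary_fraction`: users already in the canary stay there.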
Statistical Evaluation of Canary Results
A canary deployment is a controlled experiment: two groups (champion and canary), one treatment (the new model), and one or more outcome metrics. The evaluation must account for statistical significance — a small difference in CTR between champion and canary may be noise.
"""
canary.py — Canary deployment with statistical evaluation.
Implements the canary deployment pattern: route a configurable
percentage of traffic to the challenger model, collect outcome
metrics, and evaluate with a two-proportion z-test plus guardrail checks.
"""
from dataclasses import dataclass
from typing import Dict, List, Tuple
from enum import Enum
import math
class CanaryDecision(Enum):
"""Possible outcomes of canary evaluation."""
PROMOTE = "promote"
ROLLBACK = "rollback"
CONTINUE = "continue"
@dataclass
class CanaryMetrics:
"""Outcome metrics for a canary deployment.
Attributes:
champion_impressions: Number of requests served by champion.
canary_impressions: Number of requests served by canary.
champion_clicks: Number of clicks on champion recommendations.
canary_clicks: Number of clicks on canary recommendations.
champion_completions: Number of content completions (champion).
canary_completions: Number of content completions (canary).
champion_latency_p99_ms: Champion p99 serving latency.
canary_latency_p99_ms: Canary p99 serving latency.
champion_error_rate: Fraction of champion requests with errors.
canary_error_rate: Fraction of canary requests with errors.
"""
champion_impressions: int = 0
canary_impressions: int = 0
champion_clicks: int = 0
canary_clicks: int = 0
champion_completions: int = 0
canary_completions: int = 0
champion_latency_p99_ms: float = 0.0
canary_latency_p99_ms: float = 0.0
champion_error_rate: float = 0.0
canary_error_rate: float = 0.0
@property
def champion_ctr(self) -> float:
"""Click-through rate for champion model."""
if self.champion_impressions == 0:
return 0.0
return self.champion_clicks / self.champion_impressions
@property
def canary_ctr(self) -> float:
"""Click-through rate for canary model."""
if self.canary_impressions == 0:
return 0.0
return self.canary_clicks / self.canary_impressions
@property
def ctr_lift(self) -> float:
"""Relative CTR lift of canary over champion."""
if self.champion_ctr == 0:
return 0.0
return (self.canary_ctr - self.champion_ctr) / self.champion_ctr
@property
def champion_completion_rate(self) -> float:
"""Content completion rate for champion model."""
if self.champion_impressions == 0:
return 0.0
return self.champion_completions / self.champion_impressions
@property
def canary_completion_rate(self) -> float:
"""Content completion rate for canary model."""
if self.canary_impressions == 0:
return 0.0
return self.canary_completions / self.canary_impressions
@dataclass
class CanaryConfig:
"""Configuration for a canary deployment.
Attributes:
model_name: Name of the model.
champion_version: Current champion version.
challenger_version: Challenger version in canary.
canary_percentage: Fraction of traffic to route to canary (0-1).
min_canary_impressions: Minimum impressions before evaluation.
max_canary_duration_hours: Maximum canary duration before forced decision.
min_ctr_lift: Minimum CTR lift to promote (-0.01 means accept
up to 1% regression).
max_latency_increase_ms: Maximum acceptable p99 latency increase.
max_error_rate: Maximum acceptable error rate for canary.
significance_level: Alpha for hypothesis testing.
"""
model_name: str
champion_version: int
challenger_version: int
canary_percentage: float = 0.10
min_canary_impressions: int = 500_000
max_canary_duration_hours: int = 72
min_ctr_lift: float = -0.01
max_latency_increase_ms: float = 10.0
max_error_rate: float = 0.005
significance_level: float = 0.05
def evaluate_canary(
config: CanaryConfig,
metrics: CanaryMetrics,
) -> Dict[str, object]:
"""Evaluate canary deployment and recommend a decision.
Uses a two-proportion z-test for CTR comparison and checks
guardrail metrics (latency, error rate).
Args:
config: Canary deployment configuration.
metrics: Collected canary metrics.
Returns:
Dictionary with decision, statistical results, and reasoning.
"""
# Check minimum sample size
if metrics.canary_impressions < config.min_canary_impressions:
return {
"decision": CanaryDecision.CONTINUE,
"reason": (
f"Insufficient canary impressions: "
f"{metrics.canary_impressions:,} < "
f"{config.min_canary_impressions:,}"
),
"metrics_summary": _metrics_summary(metrics),
}
# Guardrail checks (immediate rollback if violated)
guardrail_issues: List[str] = []
if metrics.canary_error_rate > config.max_error_rate:
guardrail_issues.append(
f"Error rate {metrics.canary_error_rate:.4f} > "
f"{config.max_error_rate} threshold"
)
latency_increase = (
metrics.canary_latency_p99_ms - metrics.champion_latency_p99_ms
)
if latency_increase > config.max_latency_increase_ms:
guardrail_issues.append(
f"Latency increase {latency_increase:.1f}ms > "
f"{config.max_latency_increase_ms}ms threshold"
)
if guardrail_issues:
return {
"decision": CanaryDecision.ROLLBACK,
"reason": "Guardrail violation(s): " + "; ".join(guardrail_issues),
"guardrail_issues": guardrail_issues,
"metrics_summary": _metrics_summary(metrics),
}
# Statistical test: two-proportion z-test for CTR
z_stat, p_value = _two_proportion_z_test(
successes_a=metrics.champion_clicks,
trials_a=metrics.champion_impressions,
successes_b=metrics.canary_clicks,
trials_b=metrics.canary_impressions,
)
is_significant = p_value < config.significance_level
ctr_lift = metrics.ctr_lift
# Decision logic
if is_significant and ctr_lift < config.min_ctr_lift:
decision = CanaryDecision.ROLLBACK
reason = (
f"Statistically significant CTR regression: "
f"{ctr_lift:+.3%} (p={p_value:.4f})"
)
elif is_significant and ctr_lift >= 0:
decision = CanaryDecision.PROMOTE
reason = (
f"Statistically significant CTR improvement: "
f"{ctr_lift:+.3%} (p={p_value:.4f})"
)
elif not is_significant:
# Not significant — promote if within tolerance (non-inferiority)
if ctr_lift >= config.min_ctr_lift:
decision = CanaryDecision.PROMOTE
reason = (
f"CTR difference not significant (p={p_value:.4f}), "
f"but within tolerance ({ctr_lift:+.3%} >= "
f"{config.min_ctr_lift:+.3%}). Promoting on non-inferiority."
)
else:
decision = CanaryDecision.CONTINUE
reason = (
f"CTR difference not significant (p={p_value:.4f}) "
f"and below tolerance. Continuing canary."
)
    else:
        # Significant regression, but within the configured tolerance;
        # accept on non-inferiority (mirrors the branch above)
        decision = CanaryDecision.PROMOTE
        reason = (
            f"Statistically significant but tolerable CTR regression: "
            f"{ctr_lift:+.3%} (p={p_value:.4f}), within tolerance "
            f"{config.min_ctr_lift:+.3%}. Promoting on non-inferiority."
        )
return {
"decision": decision,
"reason": reason,
"z_statistic": z_stat,
"p_value": p_value,
"is_significant": is_significant,
"ctr_lift": ctr_lift,
"metrics_summary": _metrics_summary(metrics),
}
def _two_proportion_z_test(
successes_a: int,
trials_a: int,
successes_b: int,
trials_b: int,
) -> Tuple[float, float]:
"""Two-proportion z-test for comparing conversion rates.
Tests H0: p_a = p_b against H1: p_a != p_b (two-sided).
Args:
successes_a: Number of successes in group A.
trials_a: Number of trials in group A.
successes_b: Number of successes in group B.
trials_b: Number of trials in group B.
Returns:
Tuple of (z-statistic, p-value).
"""
if trials_a == 0 or trials_b == 0:
return 0.0, 1.0
p_a = successes_a / trials_a
p_b = successes_b / trials_b
# Pooled proportion under H0
p_pool = (successes_a + successes_b) / (trials_a + trials_b)
if p_pool == 0 or p_pool == 1:
return 0.0, 1.0
se = math.sqrt(p_pool * (1 - p_pool) * (1 / trials_a + 1 / trials_b))
if se == 0:
return 0.0, 1.0
z = (p_b - p_a) / se
# Two-sided p-value using normal CDF approximation
p_value = 2.0 * _normal_cdf(-abs(z))
return z, p_value
def _normal_cdf(x: float) -> float:
    """Standard normal CDF computed via the error function.
    Uses the exact identity Phi(x) = 0.5 * (1 + erf(x / sqrt(2))).
    Args:
        x: Value at which to evaluate the CDF.
    Returns:
        P(Z <= x) for Z ~ N(0,1).
    """
    return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))
def _metrics_summary(metrics: CanaryMetrics) -> Dict[str, float]:
"""Summarize canary metrics for reporting."""
return {
"champion_ctr": metrics.champion_ctr,
"canary_ctr": metrics.canary_ctr,
"ctr_lift": metrics.ctr_lift,
"champion_completion_rate": metrics.champion_completion_rate,
"canary_completion_rate": metrics.canary_completion_rate,
"champion_p99_ms": metrics.champion_latency_p99_ms,
"canary_p99_ms": metrics.canary_latency_p99_ms,
"champion_error_rate": metrics.champion_error_rate,
"canary_error_rate": metrics.canary_error_rate,
"champion_impressions": metrics.champion_impressions,
"canary_impressions": metrics.canary_impressions,
}
Canary Duration and Traffic Percentage
The canary configuration involves a tradeoff between risk and speed:
| Parameter | Conservative | Moderate | Aggressive |
|---|---|---|---|
| Canary % | 1-5% | 5-10% | 10-25% |
| Duration | 7 days | 3 days | 1 day |
| Min impressions | 1,000,000 | 500,000 | 100,000 |
| Use case | Financial, healthcare | General | Low-risk, high-traffic |
StreamRec uses the moderate configuration: 10% canary traffic for 3 days, with a minimum of 500,000 canary impressions. At 20 million daily requests, 10% canary produces 2 million impressions per day — well above the 500,000 threshold within the first day. The 3-day duration ensures coverage of weekday and weekend patterns.
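The minimum-impressions figures in the table are not arbitrary: they follow from the power calculation for the two-proportion z-test. A rough per-arm sample-size sketch, with z-values fixed for a two-sided alpha of 0.05 and 80% power (the 5% base CTR used in the example is an assumed figure for illustration):

```python
import math

def min_impressions_per_arm(base_ctr: float, relative_lift: float) -> int:
    """Approximate per-arm sample size to detect a relative CTR lift
    with a two-proportion z-test at alpha=0.05 (two-sided), power=0.80."""
    p1 = base_ctr
    p2 = base_ctr * (1.0 + relative_lift)
    z_alpha, z_beta = 1.96, 0.8416  # fixed for alpha=0.05, power=0.80
    p_bar = (p1 + p2) / 2.0
    n = (
        (z_alpha * math.sqrt(2.0 * p_bar * (1.0 - p_bar))
         + z_beta * math.sqrt(p1 * (1.0 - p1) + p2 * (1.0 - p2))) ** 2
        / (p2 - p1) ** 2
    )
    return math.ceil(n)

# Detecting a 2% relative lift on a 5% base CTR needs roughly 750k
# impressions per arm; a larger 5% lift needs far fewer.
n_small_lift = min_impressions_per_arm(0.05, 0.02)
n_large_lift = min_impressions_per_arm(0.05, 0.05)
```

At 2 million canary impressions per day, even the small-lift case becomes detectable within the first day, which is why the 500,000-impression floor functions as a lower bound rather than the full power requirement.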
29.7 Progressive Rollout: From Canary to Full Traffic
If the canary evaluation succeeds, the deployment pipeline does not immediately switch all traffic to the new model. Instead, it executes a progressive rollout — a staged increase in traffic percentage with monitoring at each stage.
Rollout Schedule
A typical progressive rollout for StreamRec:
| Stage | Traffic % | Duration | Monitoring Focus | Rollback Trigger |
|---|---|---|---|---|
| Shadow | 0% (parallel) | 7 days | Latency, rank correlation, prediction distribution | p99 > 50ms, rank corr < 0.7 |
| Canary | 10% | 3 days | CTR, completion rate, error rate, latency | CTR < -2%, errors > 0.5%, p99 > +10ms |
| Stage 1 | 25% | 1 day | Same as canary | Same as canary |
| Stage 2 | 50% | 1 day | Same + engagement duration | Engagement < -3% |
| Full | 100% | -- | Full monitoring (Chapter 30) | Any monitoring alert |
Each stage is a checkpoint: the deployment pipeline evaluates metrics against thresholds before advancing to the next stage. If any threshold is violated, the pipeline automatically rolls back to the previous stage — not all the way to the champion, but one stage back. If the violation persists after rollback, the pipeline rolls back further.
"""
progressive_rollout.py — Progressive deployment from canary to full traffic.
Implements the staged rollout pattern: shadow → canary → 25% → 50% → 100%,
with monitoring and automatic rollback at each stage.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from enum import Enum
class RolloutStage(Enum):
"""Stages in a progressive rollout."""
SHADOW = "shadow"
CANARY = "canary"
STAGE_25 = "stage_25"
STAGE_50 = "stage_50"
FULL = "full"
ROLLED_BACK = "rolled_back"
STAGE_ORDER = [
RolloutStage.SHADOW,
RolloutStage.CANARY,
RolloutStage.STAGE_25,
RolloutStage.STAGE_50,
RolloutStage.FULL,
]
STAGE_TRAFFIC = {
RolloutStage.SHADOW: 0.0,
RolloutStage.CANARY: 0.10,
RolloutStage.STAGE_25: 0.25,
RolloutStage.STAGE_50: 0.50,
RolloutStage.FULL: 1.00,
}
@dataclass
class RolloutThreshold:
"""Threshold for a single metric at a rollout stage.
Attributes:
metric_name: Name of the metric to monitor.
min_value: Minimum acceptable value (None = no lower bound).
max_value: Maximum acceptable value (None = no upper bound).
comparison_type: 'absolute' for raw values, 'relative' for
comparison against champion baseline.
"""
metric_name: str
min_value: Optional[float] = None
max_value: Optional[float] = None
comparison_type: str = "absolute"
@dataclass
class RolloutStageConfig:
"""Configuration for a single rollout stage.
Attributes:
stage: The rollout stage.
traffic_percentage: Fraction of traffic for this stage.
min_duration_hours: Minimum time to spend at this stage.
thresholds: Metrics thresholds that must be satisfied to advance.
"""
stage: RolloutStage
traffic_percentage: float
min_duration_hours: int
thresholds: List[RolloutThreshold] = field(default_factory=list)
@dataclass
class RolloutState:
"""Current state of a progressive rollout.
Attributes:
model_name: Name of the model being deployed.
challenger_version: Version being rolled out.
champion_version: Current production version.
current_stage: Current rollout stage.
stage_started_at: When the current stage began.
stage_history: History of stage transitions.
rollback_count: Number of rollbacks during this deployment.
"""
model_name: str
challenger_version: int
champion_version: int
current_stage: RolloutStage = RolloutStage.SHADOW
stage_started_at: datetime = field(default_factory=datetime.utcnow)
stage_history: List[Dict[str, str]] = field(default_factory=list)
rollback_count: int = 0
def record_transition(
self,
from_stage: RolloutStage,
to_stage: RolloutStage,
reason: str,
) -> None:
"""Record a stage transition in the history."""
self.stage_history.append({
"from": from_stage.value,
"to": to_stage.value,
"reason": reason,
"timestamp": datetime.utcnow().isoformat(),
})
@dataclass
class ProgressiveRolloutController:
"""Controller for progressive model rollout.
Manages the state machine that advances (or rolls back) a model
deployment through shadow → canary → 25% → 50% → 100%.
Attributes:
rollout_state: Current deployment state.
stage_configs: Configuration for each stage.
"""
rollout_state: RolloutState
stage_configs: Dict[RolloutStage, RolloutStageConfig] = field(
default_factory=dict
)
def __post_init__(self) -> None:
if not self.stage_configs:
self.stage_configs = self._default_configs()
def _default_configs(self) -> Dict[RolloutStage, RolloutStageConfig]:
"""Default StreamRec rollout configuration."""
return {
RolloutStage.SHADOW: RolloutStageConfig(
stage=RolloutStage.SHADOW,
traffic_percentage=0.0,
min_duration_hours=168, # 7 days
thresholds=[
RolloutThreshold("challenger_p99_ms", max_value=50.0),
RolloutThreshold("rank_correlation", min_value=0.70),
RolloutThreshold(
"mean_score_divergence", max_value=0.30
),
],
),
RolloutStage.CANARY: RolloutStageConfig(
stage=RolloutStage.CANARY,
traffic_percentage=0.10,
min_duration_hours=72, # 3 days
thresholds=[
RolloutThreshold(
"ctr_lift", min_value=-0.02,
comparison_type="relative",
),
RolloutThreshold("canary_error_rate", max_value=0.005),
RolloutThreshold(
"latency_increase_ms", max_value=10.0,
comparison_type="relative",
),
],
),
RolloutStage.STAGE_25: RolloutStageConfig(
stage=RolloutStage.STAGE_25,
traffic_percentage=0.25,
min_duration_hours=24,
thresholds=[
RolloutThreshold(
"ctr_lift", min_value=-0.02,
comparison_type="relative",
),
RolloutThreshold("error_rate", max_value=0.005),
RolloutThreshold(
"latency_increase_ms", max_value=10.0,
comparison_type="relative",
),
],
),
RolloutStage.STAGE_50: RolloutStageConfig(
stage=RolloutStage.STAGE_50,
traffic_percentage=0.50,
min_duration_hours=24,
thresholds=[
RolloutThreshold(
"ctr_lift", min_value=-0.02,
comparison_type="relative",
),
RolloutThreshold(
"engagement_duration_lift", min_value=-0.03,
comparison_type="relative",
),
RolloutThreshold("error_rate", max_value=0.005),
],
),
}
def check_advancement(
self,
current_metrics: Dict[str, float],
) -> Dict[str, object]:
"""Check whether the rollout should advance to the next stage.
Evaluates all thresholds for the current stage. If all pass
and the minimum duration has elapsed, recommends advancement.
If any threshold is violated, recommends rollback.
Args:
current_metrics: Dictionary of current metric values.
Returns:
Dictionary with recommendation and details.
"""
state = self.rollout_state
stage = state.current_stage
if stage == RolloutStage.FULL:
return {"action": "complete", "reason": "Already at full traffic"}
if stage == RolloutStage.ROLLED_BACK:
return {
"action": "stopped",
"reason": "Rollout has been rolled back",
}
config = self.stage_configs.get(stage)
if config is None:
return {"action": "advance", "reason": "No config for stage"}
# Check minimum duration
elapsed = datetime.utcnow() - state.stage_started_at
min_duration = timedelta(hours=config.min_duration_hours)
if elapsed < min_duration:
remaining = min_duration - elapsed
return {
"action": "wait",
"reason": (
f"Minimum duration not met: {elapsed} < {min_duration}. "
f"Remaining: {remaining}"
),
}
# Check thresholds
violations: List[str] = []
for threshold in config.thresholds:
value = current_metrics.get(threshold.metric_name)
if value is None:
violations.append(
f"Missing metric: {threshold.metric_name}"
)
continue
if threshold.min_value is not None and value < threshold.min_value:
violations.append(
f"{threshold.metric_name}={value:.4f} < "
f"min={threshold.min_value}"
)
if threshold.max_value is not None and value > threshold.max_value:
violations.append(
f"{threshold.metric_name}={value:.4f} > "
f"max={threshold.max_value}"
)
if violations:
return {
"action": "rollback",
"reason": "Threshold violations: " + "; ".join(violations),
"violations": violations,
}
# All thresholds pass, minimum duration met — advance
current_idx = STAGE_ORDER.index(stage)
next_stage = STAGE_ORDER[current_idx + 1]
return {
"action": "advance",
"next_stage": next_stage.value,
"next_traffic_percentage": STAGE_TRAFFIC[next_stage],
"reason": "All thresholds pass, minimum duration met",
}
def advance(self, reason: str = "Thresholds met") -> RolloutStage:
"""Advance to the next rollout stage.
Args:
reason: Reason for advancement.
Returns:
The new rollout stage.
"""
state = self.rollout_state
current_idx = STAGE_ORDER.index(state.current_stage)
if current_idx >= len(STAGE_ORDER) - 1:
return state.current_stage # Already at full
next_stage = STAGE_ORDER[current_idx + 1]
state.record_transition(state.current_stage, next_stage, reason)
state.current_stage = next_stage
state.stage_started_at = datetime.utcnow()
return next_stage
def rollback(self, reason: str = "Threshold violation") -> RolloutStage:
"""Roll back to the previous stage, or fully roll back.
If this is the first rollback, go back one stage.
If this is the second rollback, fully roll back to champion.
Args:
reason: Reason for rollback.
Returns:
The new rollout stage.
"""
state = self.rollout_state
state.rollback_count += 1
if state.rollback_count >= 2:
# Two rollbacks — abort the deployment
state.record_transition(
state.current_stage,
RolloutStage.ROLLED_BACK,
f"Full rollback (attempt {state.rollback_count}): {reason}",
)
state.current_stage = RolloutStage.ROLLED_BACK
return RolloutStage.ROLLED_BACK
# First rollback — go back one stage
current_idx = STAGE_ORDER.index(state.current_stage)
if current_idx == 0:
state.record_transition(
state.current_stage,
RolloutStage.ROLLED_BACK,
f"Rollback from first stage: {reason}",
)
state.current_stage = RolloutStage.ROLLED_BACK
return RolloutStage.ROLLED_BACK
prev_stage = STAGE_ORDER[current_idx - 1]
state.record_transition(
state.current_stage, prev_stage, f"Rollback: {reason}"
)
state.current_stage = prev_stage
state.stage_started_at = datetime.utcnow()
return prev_stage
Blue-Green Deployment
Before discussing retraining, it is worth noting an alternative to canary: blue-green deployment. In blue-green, two identical environments ("blue" and "green") exist in parallel. The current production model runs in blue. The new model is deployed to green. All traffic is switched from blue to green instantaneously via a load balancer or DNS change.
| Aspect | Blue-Green | Canary |
|---|---|---|
| Traffic transition | Instantaneous (0% → 100%) | Gradual (0% → 10% → 25% → 50% → 100%) |
| Rollback speed | Instantaneous (switch back to blue) | Fast (reduce canary %, route to champion) |
| Risk exposure | All users see the new model immediately | Only canary users see the new model |
| Infrastructure cost | 2x (two full environments) | 1x + canary fraction |
| Statistical evaluation | Post-deployment only | During deployment |
| Best for | Stateless services, low-risk changes | ML models, high-risk changes |
Blue-green is simpler but riskier: if the new model is bad, all users are affected before the rollback occurs. Canary deployment limits blast radius at the cost of a longer deployment timeline. For ML models, where quality is uncertain and feedback is delayed, canary deployment is almost always preferred. Blue-green may be appropriate for infrastructure changes (e.g., upgrading the serving framework) where the model itself does not change.
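The mechanics of the instantaneous switch can be illustrated with a minimal router sketch (the class is hypothetical; plain callables stand in for the two serving environments):

```python
import threading
from typing import Callable, Dict

class BlueGreenRouter:
    """Minimal blue-green switch: every request reads a single pointer
    to the live environment, so cutover (and rollback) is one atomic
    assignment rather than a gradual traffic shift."""

    def __init__(
        self,
        blue: Callable[[str], str],
        green: Callable[[str], str],
    ) -> None:
        self._envs: Dict[str, Callable[[str], str]] = {
            "blue": blue, "green": green,
        }
        self._live = "blue"
        self._lock = threading.Lock()

    def serve(self, request: str) -> str:
        # All traffic follows the current pointer
        return self._envs[self._live](request)

    def cutover(self, target: str) -> None:
        if target not in self._envs:
            raise ValueError(f"Unknown environment: {target}")
        with self._lock:
            self._live = target
```

Rollback is simply `cutover("blue")`: the old environment is still running, so recovery is as fast as the original switch. The 2x infrastructure cost in the table comes from keeping both environments warm.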
29.8 Continuous Training: Keeping Models Fresh
A model deployed to production today will degrade over time. The world changes: user preferences shift, new content is added, seasonal patterns rotate, competitor actions alter behavior. A recommendation model trained on January data does not know about February's trending content. A credit scoring model trained before a recession does not reflect the new economic reality. This temporal degradation is called concept drift (Chapter 30 covers detection in detail), and the remedy is continuous training — retraining the model on fresh data at regular intervals or in response to detected drift.
Three Retraining Triggers
Continuous training is triggered by one of three mechanisms, each with different tradeoffs:
1. Scheduled Retraining
The simplest approach: retrain every N hours/days/weeks on the most recent data window.
| Schedule | Use Case | Pros | Cons |
|---|---|---|---|
| Hourly | Fraud detection, real-time bidding | Captures rapid distribution shifts | High compute cost, deployment overhead |
| Daily | Recommendations, search ranking | Good freshness/cost balance | May miss intra-day shifts |
| Weekly | Credit scoring, churn prediction | Low cost, stable models | Slow to adapt, stale features |
| Monthly | Regulatory models, clinical scoring | Minimal cost | Significant staleness risk |
StreamRec uses weekly retraining: every Sunday at 02:00 UTC, the Dagster pipeline from Chapter 27 triggers. The training window is 30 days of interaction data with 7 days of holdout. The weekly cadence balances freshness (new content appears in recommendations within one week) against cost (one training run per week on 4x A100s costs approximately $150 in cloud compute).
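The drift-based trigger in the listing that follows monitors the Population Stability Index (PSI) per feature. For reference, PSI compares a feature's current binned distribution against a reference (training-time) distribution. A minimal computation over pre-binned fractions might look like this (the helper is a sketch; the 0.1 / 0.25 interpretation thresholds in the docstring are the conventional rule of thumb):

```python
import math
from typing import List

def compute_psi(
    reference: List[float],
    current: List[float],
    epsilon: float = 1e-6,
) -> float:
    """Population Stability Index between two binned distributions.

    Both inputs are per-bin fractions over the same bins (each summing
    to ~1.0). Rule of thumb: PSI < 0.1 is stable, 0.1-0.25 is a
    moderate shift, > 0.25 is a significant shift.
    """
    psi = 0.0
    for ref, cur in zip(reference, current):
        # Clamp empty bins to avoid log(0) and division by zero
        ref = max(ref, epsilon)
        cur = max(cur, epsilon)
        psi += (cur - ref) * math.log(cur / ref)
    return psi
```

An identical distribution gives PSI = 0, and the index grows as probability mass moves between bins; values like these are what the drift trigger compares against its per-feature thresholds.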
"""
retraining_triggers.py — Retraining trigger logic for continuous training.
Implements three trigger types: scheduled (cron-based), drift-based
(PSI threshold), and performance-based (metric degradation).
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timedelta
from enum import Enum
import math
class TriggerType(Enum):
"""Types of retraining triggers."""
SCHEDULED = "scheduled"
DRIFT = "drift"
PERFORMANCE = "performance"
MANUAL = "manual"
@dataclass
class TriggerEvent:
"""A single retraining trigger event.
Attributes:
trigger_type: What caused the trigger.
triggered_at: When the trigger fired.
reason: Human-readable description.
metadata: Additional context (metric values, drift scores, etc.).
priority: Priority level (1=urgent, 2=standard, 3=low).
"""
trigger_type: TriggerType
triggered_at: datetime
reason: str
metadata: Dict[str, float] = field(default_factory=dict)
priority: int = 2
@dataclass
class ScheduledTrigger:
"""Cron-based retraining trigger.
Attributes:
cron_expression: Cron schedule (e.g., '0 2 * * 0' for Sunday 2am).
last_triggered: When the trigger last fired.
model_name: Name of the model to retrain.
"""
cron_expression: str
last_triggered: Optional[datetime] = None
model_name: str = ""
def should_trigger(self, now: datetime) -> Optional[TriggerEvent]:
"""Check if the scheduled trigger should fire.
This is a simplified check: in production, use a proper
cron parser (e.g., croniter).
Args:
now: Current timestamp.
Returns:
TriggerEvent if the trigger should fire, None otherwise.
"""
if self.last_triggered is None:
self.last_triggered = now
return TriggerEvent(
trigger_type=TriggerType.SCHEDULED,
triggered_at=now,
reason=f"Initial scheduled trigger: {self.cron_expression}",
priority=3,
)
# Simplified: trigger if more than 7 days since last trigger
# (for the weekly cron '0 2 * * 0')
if now - self.last_triggered >= timedelta(days=7):
self.last_triggered = now
return TriggerEvent(
trigger_type=TriggerType.SCHEDULED,
triggered_at=now,
reason=(
f"Weekly scheduled retraining: "
f"{self.cron_expression}"
),
priority=3,
)
return None
@dataclass
class DriftTrigger:
"""Data drift-based retraining trigger.
Monitors PSI (Population Stability Index) for key features.
Triggers retraining when PSI exceeds the threshold for any
monitored feature.
Attributes:
feature_thresholds: Mapping from feature name to PSI threshold.
psi_alert_threshold: Default PSI threshold for unspecified features.
model_name: Name of the model to retrain.
"""
feature_thresholds: Dict[str, float] = field(default_factory=dict)
psi_alert_threshold: float = 0.25
model_name: str = ""
def check_drift(
self,
current_psi_values: Dict[str, float],
) -> Optional[TriggerEvent]:
"""Check if any feature's PSI exceeds its threshold.
Args:
current_psi_values: Mapping from feature name to current PSI.
Returns:
TriggerEvent if drift is detected, None otherwise.
"""
violations: List[str] = []
max_psi = 0.0
for feature, psi in current_psi_values.items():
threshold = self.feature_thresholds.get(
feature, self.psi_alert_threshold
)
if psi > threshold:
violations.append(
f"{feature}: PSI={psi:.3f} > {threshold:.3f}"
)
max_psi = max(max_psi, psi)
if violations:
return TriggerEvent(
trigger_type=TriggerType.DRIFT,
triggered_at=datetime.utcnow(),
reason=(
f"Data drift detected in {len(violations)} feature(s): "
+ "; ".join(violations)
),
metadata=current_psi_values,
priority=1 if max_psi > 0.5 else 2,
)
return None
@dataclass
class PerformanceTrigger:
"""Performance degradation-based retraining trigger.
Monitors production metrics and triggers retraining when
performance drops below thresholds.
Attributes:
metric_thresholds: Mapping from metric name to
(absolute_floor, relative_decline_from_baseline).
baseline_metrics: Baseline metric values from the last deployment.
model_name: Name of the model to retrain.
"""
metric_thresholds: Dict[str, Tuple[float, float]] = field(
default_factory=dict
)
baseline_metrics: Dict[str, float] = field(default_factory=dict)
model_name: str = ""
def check_performance(
self,
current_metrics: Dict[str, float],
) -> Optional[TriggerEvent]:
"""Check if any production metric has degraded below threshold.
Args:
current_metrics: Current production metric values.
Returns:
TriggerEvent if degradation is detected, None otherwise.
"""
violations: List[str] = []
is_critical = False
for metric, (absolute_floor, max_decline) in (
self.metric_thresholds.items()
):
current = current_metrics.get(metric)
baseline = self.baseline_metrics.get(metric)
if current is None:
continue
# Check absolute floor
if current < absolute_floor:
violations.append(
f"{metric}={current:.4f} < floor={absolute_floor:.4f}"
)
is_critical = True
# Check relative decline from baseline
if baseline is not None and baseline > 0:
decline = (baseline - current) / baseline
if decline > max_decline:
violations.append(
f"{metric} declined {decline:.1%} from baseline "
f"{baseline:.4f} (max allowed: {max_decline:.1%})"
)
if violations:
return TriggerEvent(
trigger_type=TriggerType.PERFORMANCE,
triggered_at=datetime.utcnow(),
reason=(
f"Performance degradation: "
+ "; ".join(violations)
),
metadata=current_metrics,
priority=1 if is_critical else 2,
)
return None
@dataclass
class RetrainingTriggerManager:
"""Orchestrates multiple retraining triggers for a model.
Combines scheduled, drift, and performance triggers. When any
trigger fires, the manager decides whether to initiate retraining
(with deduplication to prevent redundant retraining).
Attributes:
model_name: Name of the model.
scheduled: Scheduled retraining trigger.
drift: Data drift trigger.
performance: Performance degradation trigger.
min_retrain_interval_hours: Minimum time between retraining runs.
last_retrain_time: When the last retraining was initiated.
trigger_history: History of trigger events.
"""
model_name: str
scheduled: ScheduledTrigger
drift: DriftTrigger
performance: PerformanceTrigger
min_retrain_interval_hours: int = 24
last_retrain_time: Optional[datetime] = None
trigger_history: List[TriggerEvent] = field(default_factory=list)
def evaluate(
self,
now: datetime,
psi_values: Dict[str, float],
production_metrics: Dict[str, float],
) -> Optional[TriggerEvent]:
"""Evaluate all triggers and return the highest-priority event.
Deduplicates: if retraining was triggered recently (within
min_retrain_interval_hours), suppress non-critical triggers.
Args:
now: Current timestamp.
psi_values: Current PSI values for monitored features.
production_metrics: Current production metric values.
Returns:
The highest-priority TriggerEvent, or None if no trigger fires.
"""
events: List[TriggerEvent] = []
# Check each trigger
scheduled_event = self.scheduled.should_trigger(now)
if scheduled_event:
events.append(scheduled_event)
drift_event = self.drift.check_drift(psi_values)
if drift_event:
events.append(drift_event)
perf_event = self.performance.check_performance(production_metrics)
if perf_event:
events.append(perf_event)
if not events:
return None
# Sort by priority (lower = higher priority)
events.sort(key=lambda e: e.priority)
best_event = events[0]
# Deduplication: suppress non-critical triggers if recent retrain
if self.last_retrain_time is not None:
elapsed = now - self.last_retrain_time
min_interval = timedelta(hours=self.min_retrain_interval_hours)
if elapsed < min_interval and best_event.priority > 1:
return None # Suppress non-critical trigger
self.trigger_history.append(best_event)
self.last_retrain_time = now
return best_event
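The `ScheduledTrigger` above approximates its weekly cron with a fixed seven-day delta. For reference, the exact next fire time of a weekly expression such as `'0 2 * * 0'` can be computed with the standard library alone — a sketch; a real system should still prefer a cron parser such as croniter:

```python
from datetime import datetime, timedelta

def next_weekly_fire(after: datetime, weekday: int = 6, hour: int = 2) -> datetime:
    """Next fire time for a weekly cron like '0 2 * * 0' (Sunday 02:00).

    Python's Monday=0 convention puts Sunday at weekday 6.
    """
    candidate = after.replace(hour=hour, minute=0, second=0, microsecond=0)
    # Advance to the target weekday, then skip a week if already past
    candidate += timedelta(days=(weekday - candidate.weekday()) % 7)
    if candidate <= after:
        candidate += timedelta(days=7)
    return candidate
```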
2. Drift-Based Retraining
When the monitoring system (Chapter 30) detects that input data has shifted significantly — PSI exceeds the threshold for one or more key features — it triggers retraining. Drift-based triggers are reactive: they retrain only when the data distribution has actually changed, avoiding unnecessary retraining when the world is stable.
The tradeoff is detection latency. PSI requires accumulating enough data to compute a reliable distribution estimate — typically one day's worth. If the drift is sudden (a schema change, a logging bug), drift detection may lag by 24 hours, during which the model serves predictions on shifted data. Performance-based triggers can catch this faster if the drift affects model quality directly.
3. Performance-Based Retraining
When a production metric (CTR, NDCG@20, AUC) drops below an absolute floor or declines more than a relative threshold from the baseline, the system triggers retraining. Performance-based triggers are the most direct: they retrain when the model is actually underperforming, not when the data has shifted (which may or may not affect model quality).
The tradeoff is that performance metrics are lagging indicators: they require user outcomes, which may arrive hours or days after the predictions are served. A recommendation model's CTR can be measured within hours; a credit scoring model's default rate cannot be measured for months.
StreamRec Retraining Configuration
StreamRec uses all three triggers in combination:
# StreamRec retraining trigger configuration
streamrec_triggers = RetrainingTriggerManager(
model_name="streamrec-retrieval",
scheduled=ScheduledTrigger(
cron_expression="0 2 * * 0", # Sunday 2am UTC
model_name="streamrec-retrieval",
),
drift=DriftTrigger(
feature_thresholds={
"user_engagement_rate": 0.20,
"item_popularity_score": 0.25,
"session_length_minutes": 0.25,
"platform_distribution": 0.20,
},
psi_alert_threshold=0.25,
model_name="streamrec-retrieval",
),
performance=PerformanceTrigger(
metric_thresholds={
"recall_at_20": (0.15, 0.10), # floor=0.15, max 10% decline
"ndcg_at_20": (0.12, 0.10), # floor=0.12, max 10% decline
"ctr": (0.03, 0.15), # floor=3%, max 15% decline
"completion_rate": (0.08, 0.15), # floor=8%, max 15% decline
},
baseline_metrics={
"recall_at_20": 0.214,
"ndcg_at_20": 0.186,
"ctr": 0.052,
"completion_rate": 0.134,
},
model_name="streamrec-retrieval",
),
min_retrain_interval_hours=24,
)
The scheduled trigger provides a guaranteed weekly refresh. The drift trigger catches distribution shifts between scheduled retraining windows. The performance trigger catches quality degradation that drift detection might miss (because the drift affects features not monitored by PSI, or because the model is sensitive to small shifts that fall below the PSI threshold). The 24-hour minimum interval prevents redundant retraining when multiple triggers fire simultaneously.
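The deduplication rule inside `RetrainingTriggerManager.evaluate` is worth isolating, because it encodes the policy that only critical (priority 1) events may preempt the minimum retrain interval. A standalone restatement of that rule:

```python
from datetime import datetime, timedelta
from typing import Optional

def should_suppress(
    last_retrain: Optional[datetime],
    now: datetime,
    priority: int,
    min_retrain_interval_hours: int = 24,
) -> bool:
    """Suppress non-critical triggers (priority > 1) fired within the
    minimum retrain interval; critical triggers always pass through."""
    if last_retrain is None:
        return False
    within_interval = (now - last_retrain) < timedelta(
        hours=min_retrain_interval_hours
    )
    return within_interval and priority > 1
```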
29.9 Retraining Pipeline Integration
When a retraining trigger fires, the system must execute the full pipeline: extract data, validate data, compute features, train, evaluate, validate, and deploy. This is the same Dagster pipeline from Chapter 27, triggered automatically rather than manually.
The integration between the trigger manager and the orchestration pipeline is:
Trigger Manager
│
├── fires TriggerEvent (type=scheduled|drift|performance)
│
▼
Dagster Sensor
│
├── reads trigger event from message queue (e.g., SQS, Pub/Sub)
├── creates a Dagster run with trigger metadata as run tags
│
▼
Dagster Pipeline (Chapter 27)
│
├── extract → validate → features → train → evaluate → register
│
▼
Model Registry
│
├── new model version registered in Staging
│
▼
Deployment Pipeline (this chapter)
│
├── shadow → canary → progressive rollout → production
│
▼
Monitoring (Chapter 30)
│
├── update baseline metrics for next trigger evaluation
The critical design decision is whether the trigger-to-production path is fully automated or requires human approval. For StreamRec, the configuration is:
- Scheduled retraining: Fully automated (shadow → canary → rollout proceeds without approval).
- Drift-triggered retraining: Automated training and validation; human approval required before canary deployment (because drift may indicate a data quality issue, not a model staleness issue).
- Performance-triggered retraining: Automated training; human approval required (because performance degradation may have a root cause that retraining cannot fix — a feature pipeline bug, an upstream schema change, or a business context change).
This tiered automation ensures that routine retraining happens without human bottlenecks while preserving human judgment for anomalous situations.
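The tiered policy can be captured as data rather than buried in pipeline code, so that changing an approval requirement is a reviewed configuration change. The trigger-type names mirror the `TriggerType` values used earlier; the mapping itself is a sketch:

```python
# Does canary deployment require human approval, per trigger type?
APPROVAL_REQUIRED = {
    "scheduled": False,    # routine refresh: fully automated
    "drift": True,         # may indicate a data-quality issue
    "performance": True,   # root cause may not be model staleness
}

def requires_approval(trigger_type: str) -> bool:
    """Unknown trigger types fail safe: approval required."""
    return APPROVAL_REQUIRED.get(trigger_type, True)
```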
29.10 Model Rollback: The Safety Net
Every deployment must have a rollback plan. For ML models, rollback means restoring the previous champion model — the model that was serving production traffic before the current deployment. Rollback must be:
- Fast: The model must be restored within seconds, not minutes. A degraded model serving production traffic for 10 minutes at 20 million requests per day affects 140,000 requests.
- Safe: The rollback itself must not introduce additional risk. The previous champion model must still be available, its serving infrastructure must still be running, and the rollback mechanism must be tested regularly.
- Automated: Rollback should not require a human to log in, identify the problem, find the previous model version, and manually switch traffic. The monitoring system (Chapter 30) or the canary evaluator (Section 29.6) should trigger rollback automatically when thresholds are violated.
Rollback Architecture
The standard rollback architecture keeps the previous champion model deployed and ready to serve:
Load Balancer / Feature Flag
│
├── traffic_weight=1.0 ──► Current Champion (v18)
├── traffic_weight=0.0 ──► Previous Champion (v17) [warm standby]
└── traffic_weight=0.0 ──► Canary (v19) [if active]
Rollback:
Set v18 traffic_weight = 0.0
Set v17 traffic_weight = 1.0
Duration: < 5 seconds (load balancer config change)
The previous champion remains deployed (warm standby) throughout the current champion's lifetime. Its serving infrastructure is allocated but idle, consuming resources but enabling instantaneous rollback. When the current champion is retired (replaced by a new champion that has completed full progressive rollout), the warm standby is deallocated.
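The weight swap in the diagram above reduces to a pure function over the routing table — a sketch, with the version labels as illustrative keys:

```python
from typing import Dict

def rollback_weights(
    weights: Dict[str, float], standby: str
) -> Dict[str, float]:
    """Route 100% of traffic to the warm standby; every other entry
    (current champion, any active canary) drops to zero."""
    if standby not in weights:
        raise KeyError(f"standby {standby!r} is not a routable target")
    return {name: (1.0 if name == standby else 0.0) for name in weights}
```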
"""
rollback.py — Automated model rollback with health checks.
Implements the rollback procedure: detect issue, verify rollback
target health, switch traffic, confirm, and notify.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
class RollbackReason(Enum):
"""Reasons for triggering a model rollback."""
CANARY_FAILURE = "canary_threshold_violation"
LATENCY_SPIKE = "latency_p99_exceeded"
ERROR_RATE = "error_rate_exceeded"
METRIC_DEGRADATION = "business_metric_degradation"
DATA_QUALITY = "upstream_data_quality_issue"
MANUAL = "manual_trigger"
class RollbackStatus(Enum):
"""Status of a rollback operation."""
INITIATED = "initiated"
HEALTH_CHECK = "health_check"
TRAFFIC_SWITCHING = "traffic_switching"
VERIFYING = "verifying"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class RollbackTarget:
"""The model to roll back to.
Attributes:
model_name: Registered model name.
version: Version number to restore.
artifact_uri: URI to the serialized model.
serving_endpoint: URL of the warm standby endpoint.
last_health_check: When the standby was last verified healthy.
is_warm: Whether the model is loaded and ready to serve.
"""
model_name: str
version: int
artifact_uri: str
serving_endpoint: str
last_health_check: Optional[datetime] = None
is_warm: bool = True
@dataclass
class RollbackProcedure:
"""Automated rollback procedure.
Executes: detect → health check → switch traffic → verify → notify.
Attributes:
current_champion: The current (potentially degraded) model.
rollback_target: The previous champion to restore.
reason: Why the rollback was triggered.
status: Current status of the rollback.
started_at: When the rollback was initiated.
completed_at: When the rollback completed.
steps: Log of rollback steps.
"""
current_champion: Dict[str, object]
rollback_target: RollbackTarget
reason: RollbackReason
status: RollbackStatus = RollbackStatus.INITIATED
started_at: datetime = field(default_factory=datetime.utcnow)
completed_at: Optional[datetime] = None
steps: List[Dict[str, str]] = field(default_factory=list)
def execute(self) -> Dict[str, object]:
"""Execute the full rollback procedure.
Returns:
Dictionary with rollback outcome and timing.
"""
# Step 1: Health check on rollback target
self.status = RollbackStatus.HEALTH_CHECK
self._log_step("health_check", "Verifying rollback target health")
if not self.rollback_target.is_warm:
self.status = RollbackStatus.FAILED
self._log_step(
"health_check_failed",
"Rollback target is not warm — cannot proceed",
)
return self._result("failed", "Rollback target not available")
# Step 2: Switch traffic
self.status = RollbackStatus.TRAFFIC_SWITCHING
self._log_step(
"traffic_switch",
f"Routing 100% traffic to v{self.rollback_target.version}",
)
# In production, this calls the load balancer API or updates
# the feature flag service. The actual traffic switch is
# typically < 1 second.
# Step 3: Verify
self.status = RollbackStatus.VERIFYING
self._log_step(
"verify",
"Verifying rollback target is serving correctly",
)
# In production, this sends health check requests and verifies
# that the rollback target returns valid predictions.
# Step 4: Complete
self.status = RollbackStatus.COMPLETED
self.completed_at = datetime.utcnow()
duration = self.completed_at - self.started_at
self._log_step(
"completed",
f"Rollback completed in {duration.total_seconds():.1f}s",
)
return self._result("success", f"Rolled back in {duration}")
def _log_step(self, step_name: str, description: str) -> None:
"""Log a rollback step."""
self.steps.append({
"step": step_name,
"description": description,
"timestamp": datetime.utcnow().isoformat(),
"status": self.status.value,
})
def _result(
self, outcome: str, message: str
) -> Dict[str, object]:
"""Construct the rollback result."""
return {
"outcome": outcome,
"message": message,
"reason": self.reason.value,
"rolled_back_from": self.current_champion,
"rolled_back_to": {
"model_name": self.rollback_target.model_name,
"version": self.rollback_target.version,
},
"started_at": self.started_at.isoformat(),
"completed_at": (
self.completed_at.isoformat()
if self.completed_at
else None
),
"steps": self.steps,
}
Rollback Testing
A rollback procedure that has never been tested is not a rollback procedure — it is a hope. StreamRec tests rollback in three ways:
1. Monthly rollback drill. Once per month, the on-call engineer executes a planned rollback during a low-traffic window. The drill verifies that the warm standby is healthy, that the traffic switch completes in under 5 seconds, and that the restored model serves predictions correctly. The drill is logged and reviewed.
2. Chaos engineering. The team uses a chaos engineering framework to inject failures: kill a model server pod, inject latency into the feature store, corrupt a model artifact. The rollback automation must detect the failure and restore service without human intervention.
3. Staging environment testing. Every deployment to production is first tested in a staging environment that mirrors production. The staging deployment includes a forced rollback after 10 minutes: deploy the new model, verify it serves correctly, roll back to the previous model, verify the rollback completes. Only after the staging rollback succeeds does the production deployment proceed.
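The staging forced-rollback check can be scripted as a single drill function. The three callables below are hypothetical hooks into a staging environment, each returning True on success:

```python
from typing import Callable

def forced_rollback_drill(
    deploy: Callable[[], bool],
    verify: Callable[[], bool],
    rollback: Callable[[], bool],
) -> str:
    """Deploy the challenger, verify it serves, force a rollback,
    then verify the restored champion. Returns 'ok' or the first
    failing step, so CI can gate the production deployment on it."""
    if not deploy():
        return "deploy_failed"
    if not verify():
        return "challenger_unhealthy"
    if not rollback():
        return "rollback_failed"
    return "ok" if verify() else "post_rollback_unhealthy"
```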
29.11 Model Serving Infrastructure
The deployment pipeline described in this chapter assumes a model serving infrastructure that supports traffic splitting, health checks, and rapid rollback. Three common serving frameworks:
Comparison of Serving Frameworks
| Feature | TorchServe | Seldon Core | BentoML |
|---|---|---|---|
| Framework | PyTorch native | Framework-agnostic (K8s) | Framework-agnostic |
| Deployment | Standalone or K8s | Kubernetes-native | Docker, K8s, cloud |
| Traffic splitting | Via load balancer | Native (Istio) | Via load balancer |
| A/B testing | Manual | Native | Via router |
| Canary | Via Istio/Argo | Native | Via cloud providers |
| Auto-scaling | K8s HPA | K8s HPA + custom | K8s HPA, Yatai |
| Model format | .mar (TorchScript, eager) | ONNX, TF, PyTorch, custom | Any Python callable |
| Batching | Built-in (dynamic batching) | Via inference server | Built-in (adaptive) |
| Multi-model | Yes | Yes (multi-armed bandit) | Yes |
| Best for | PyTorch-only shops | K8s-native ML platform teams | Rapid prototyping → production |
For StreamRec, the architecture uses BentoML for model packaging (creating a self-contained "Bento" that includes the model, preprocessing code, and API definition) deployed on Kubernetes. Traffic splitting for canary and progressive rollout is handled by Istio service mesh, which provides fine-grained traffic routing at the network level:
Istio VirtualService Configuration:
route:
- destination: streamrec-champion (v18)
weight: 90
- destination: streamrec-canary (v19)
weight: 10
Updating the traffic weight requires a single Kubernetes API call to modify the Istio VirtualService resource. The change propagates to all proxies within seconds, making both canary promotion and rollback near-instantaneous.
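That API call amounts to patching the route weights in the VirtualService spec. A helper that builds the patch body — service and subset names here are illustrative; the field names follow the Istio VirtualService schema:

```python
from typing import Dict, List

def virtualservice_route_patch(
    host: str,
    champion_subset: str,
    canary_subset: str,
    canary_weight: int,
) -> Dict[str, object]:
    """Build a merge patch shifting `canary_weight` percent of traffic
    to the canary subset; the remainder stays on the champion."""
    if not 0 <= canary_weight <= 100:
        raise ValueError("canary_weight must be in [0, 100]")
    route: List[Dict[str, object]] = [
        {"destination": {"host": host, "subset": champion_subset},
         "weight": 100 - canary_weight},
        {"destination": {"host": host, "subset": canary_subset},
         "weight": canary_weight},
    ]
    return {"spec": {"http": [{"route": route}]}}
```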
Containerized Model Serving
Every model is packaged as a Docker container with pinned dependencies:
Dockerfile.serving
├── Base image: python:3.11-slim
├── System dependencies: libgomp (for LightGBM), libgl1 (for OpenCV)
├── Python dependencies: requirements-serving.txt (pinned versions)
├── Model artifact: /app/model/ (copied from registry)
├── Serving code: /app/src/ (feature preprocessing, prediction logic)
├── Config: /app/config/serving.yaml (batch size, timeout, health check)
└── Entry point: bentoml serve streamrec_service:svc --port 3000
The container is the immutable artifact of deployment. The same container that passes integration tests in CI is the container that runs in staging and production. No environment-specific modifications, no runtime downloads, no "works on my machine" failures.
29.12 Progressive Project: M13 — StreamRec Deployment Pipeline
Milestone M13 builds the complete deployment pipeline for StreamRec, integrating the CI/CD, shadow mode, canary deployment, progressive rollout, retraining triggers, and rollback infrastructure developed throughout this chapter.
M13 Requirements
- Weekly retraining (Sunday 2am UTC) with the Dagster pipeline from Chapter 27
- Shadow mode evaluation (7 days, 100,000 minimum predictions)
- Canary deployment (10% traffic, 3 days, statistical evaluation)
- Progressive rollout (25% → 50% → 100%, 1 day per stage)
- Automatic rollback on guardrail violation (error rate > 0.5%, CTR decline > 2%)
- Drift-based retraining trigger (PSI > 0.25 on key features)
- Performance-based retraining trigger (Recall@20 < 0.15 or >10% decline)
- Full artifact lineage (code commit, data version, hyperparameters, evaluation metrics)
M13 Implementation
"""
m13_streamrec_deployment.py — Complete deployment pipeline for StreamRec.
Progressive Project Milestone M13: integrates CI/CD, shadow mode,
canary deployment, progressive rollout, retraining triggers, and
rollback into a single orchestrated deployment pipeline.
This module ties together the components developed throughout Chapter 29:
- ModelRegistry and ModelArtifact (Section 29.3)
- ArtifactLineage (Section 29.4)
- ShadowEvaluator (Section 29.5)
- CanaryConfig and evaluate_canary (Section 29.6)
- ProgressiveRolloutController (Section 29.7)
- RetrainingTriggerManager (Section 29.8)
- RollbackProcedure (Section 29.10)
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
from enum import Enum
class PipelinePhase(Enum):
"""Phases of the StreamRec deployment pipeline."""
TRAINING = "training"
VALIDATION = "validation"
SHADOW = "shadow"
CANARY = "canary"
PROGRESSIVE_ROLLOUT = "progressive_rollout"
PRODUCTION = "production"
ROLLED_BACK = "rolled_back"
@dataclass
class DeploymentPipelineConfig:
"""Configuration for the StreamRec deployment pipeline.
Consolidates all deployment parameters into a single configuration
object that can be version-controlled and audited.
Attributes:
model_name: Name of the model (e.g., 'streamrec-retrieval').
retraining_schedule: Cron expression for scheduled retraining.
shadow_duration_hours: Duration of shadow evaluation.
shadow_min_predictions: Minimum predictions for shadow evaluation.
shadow_max_latency_p99_ms: Maximum acceptable p99 latency during shadow mode.
canary_percentage: Fraction of traffic for canary.
canary_duration_hours: Maximum canary duration.
canary_min_impressions: Minimum impressions for canary evaluation.
rollout_stages: Traffic percentages for progressive rollout.
rollout_stage_duration_hours: Minimum hours per rollout stage.
max_latency_p99_ms: Maximum acceptable p99 latency.
max_error_rate: Maximum acceptable serving error rate.
min_ctr_lift: Minimum CTR lift for canary promotion.
drift_psi_threshold: PSI threshold for drift-triggered retraining.
performance_floor: Absolute metric floors for performance trigger.
performance_max_decline: Maximum relative decline for performance trigger.
"""
model_name: str = "streamrec-retrieval"
retraining_schedule: str = "0 2 * * 0" # Sunday 2am UTC
# Shadow mode
shadow_duration_hours: int = 168 # 7 days
shadow_min_predictions: int = 100_000
shadow_max_latency_p99_ms: float = 50.0
# Canary
canary_percentage: float = 0.10
canary_duration_hours: int = 72 # 3 days
canary_min_impressions: int = 500_000
# Progressive rollout
rollout_stages: List[float] = field(
default_factory=lambda: [0.25, 0.50, 1.00]
)
rollout_stage_duration_hours: int = 24
# Guardrails
max_latency_p99_ms: float = 50.0
max_error_rate: float = 0.005
min_ctr_lift: float = -0.02
# Retraining triggers
drift_psi_threshold: float = 0.25
performance_floor: Dict[str, float] = field(
default_factory=lambda: {
"recall_at_20": 0.15,
"ndcg_at_20": 0.12,
"ctr": 0.03,
}
)
performance_max_decline: Dict[str, float] = field(
default_factory=lambda: {
"recall_at_20": 0.10,
"ndcg_at_20": 0.10,
"ctr": 0.15,
}
)
@dataclass
class DeploymentPipelineState:
"""Tracks the current state of a deployment pipeline run.
Attributes:
run_id: Unique identifier for this pipeline run.
model_name: Name of the model being deployed.
challenger_version: Version of the model being deployed.
champion_version: Current production model version.
phase: Current pipeline phase.
started_at: When the pipeline run started.
phase_started_at: When the current phase started.
trigger_event: What triggered this pipeline run.
phase_history: Log of phase transitions.
metrics: Collected metrics at each phase.
"""
run_id: str
model_name: str
challenger_version: int
champion_version: int
phase: PipelinePhase = PipelinePhase.TRAINING
started_at: datetime = field(default_factory=datetime.utcnow)
phase_started_at: datetime = field(default_factory=datetime.utcnow)
trigger_event: Optional[Dict[str, str]] = None
phase_history: List[Dict[str, str]] = field(default_factory=list)
metrics: Dict[str, Dict[str, float]] = field(default_factory=dict)
def transition_to(
self, new_phase: PipelinePhase, reason: str
) -> None:
"""Record a phase transition.
Args:
new_phase: The phase to transition to.
reason: Why the transition occurred.
"""
self.phase_history.append({
"from_phase": self.phase.value,
"to_phase": new_phase.value,
"reason": reason,
"timestamp": datetime.utcnow().isoformat(),
})
self.phase = new_phase
self.phase_started_at = datetime.utcnow()
def record_metrics(
self, phase: str, metrics: Dict[str, float]
) -> None:
"""Record metrics for a pipeline phase.
Args:
phase: Phase name.
metrics: Dictionary of metric values.
"""
self.metrics[phase] = metrics
@dataclass
class StreamRecDeploymentPipeline:
"""The complete StreamRec deployment pipeline.
Orchestrates the full lifecycle:
trigger → train → validate → shadow → canary → rollout → production
This class is the top-level coordinator that references the
components from earlier sections. In production, each phase
would be implemented as a Dagster asset or job; here, the
logic is consolidated for clarity.
Attributes:
config: Pipeline configuration.
state: Current pipeline state.
"""
config: DeploymentPipelineConfig
state: Optional[DeploymentPipelineState] = None
def initiate_deployment(
self,
run_id: str,
challenger_version: int,
champion_version: int,
trigger_type: str = "scheduled",
trigger_reason: str = "",
) -> DeploymentPipelineState:
"""Start a new deployment pipeline run.
Args:
run_id: Unique run identifier.
challenger_version: Model version to deploy.
champion_version: Current production version.
trigger_type: What initiated this deployment.
trigger_reason: Detailed reason for the trigger.
Returns:
Initial pipeline state.
"""
self.state = DeploymentPipelineState(
run_id=run_id,
model_name=self.config.model_name,
challenger_version=challenger_version,
champion_version=champion_version,
trigger_event={
"type": trigger_type,
"reason": trigger_reason,
},
)
return self.state
def evaluate_phase(
self, current_metrics: Dict[str, float]
) -> Dict[str, Any]:
"""Evaluate whether the current phase should advance, wait, or rollback.
Applies the appropriate evaluation logic based on the current phase.
Args:
current_metrics: Current metrics for the active phase.
Returns:
Dictionary with action ('advance', 'wait', 'rollback') and details.
"""
if self.state is None:
return {"action": "error", "reason": "No active deployment"}
phase = self.state.phase
if phase == PipelinePhase.SHADOW:
return self._evaluate_shadow(current_metrics)
elif phase == PipelinePhase.CANARY:
return self._evaluate_canary(current_metrics)
elif phase == PipelinePhase.PROGRESSIVE_ROLLOUT:
return self._evaluate_rollout(current_metrics)
else:
return {
"action": "wait",
"reason": f"Phase {phase.value} not evaluable",
}
def _evaluate_shadow(
self, metrics: Dict[str, float]
) -> Dict[str, Any]:
"""Evaluate shadow mode metrics."""
assert self.state is not None
elapsed = datetime.utcnow() - self.state.phase_started_at
min_duration = timedelta(hours=self.config.shadow_duration_hours)
if elapsed < min_duration:
return {
"action": "wait",
"reason": f"Shadow duration: {elapsed} < {min_duration}",
}
n_predictions = int(metrics.get("n_predictions", 0))
if n_predictions < self.config.shadow_min_predictions:
return {
"action": "wait",
"reason": (
f"Predictions: {n_predictions:,} < "
f"{self.config.shadow_min_predictions:,}"
),
}
p99 = metrics.get("challenger_p99_ms", 0.0)
if p99 > self.config.shadow_max_latency_p99_ms:
return {
"action": "rollback",
"reason": (
f"Shadow p99 latency {p99:.1f}ms > "
f"{self.config.shadow_max_latency_p99_ms}ms"
),
}
self.state.record_metrics("shadow", metrics)
return {"action": "advance", "next_phase": "canary"}
def _evaluate_canary(
self, metrics: Dict[str, float]
) -> Dict[str, Any]:
"""Evaluate canary deployment metrics."""
assert self.state is not None
elapsed = datetime.utcnow() - self.state.phase_started_at
min_duration = timedelta(hours=self.config.canary_duration_hours)
# Immediate rollback on guardrail violation
error_rate = metrics.get("canary_error_rate", 0.0)
if error_rate > self.config.max_error_rate:
return {
"action": "rollback",
"reason": (
f"Error rate {error_rate:.4f} > "
f"{self.config.max_error_rate}"
),
}
ctr_lift = metrics.get("ctr_lift", 0.0)
is_significant = metrics.get("is_significant", False)
if is_significant and ctr_lift < self.config.min_ctr_lift:
return {
"action": "rollback",
"reason": (
f"Significant CTR regression: {ctr_lift:+.3%}"
),
}
if elapsed < min_duration:
return {
"action": "wait",
"reason": f"Canary duration: {elapsed} < {min_duration}",
}
impressions = int(metrics.get("canary_impressions", 0))
if impressions < self.config.canary_min_impressions:
return {
"action": "wait",
"reason": (
f"Impressions: {impressions:,} < "
f"{self.config.canary_min_impressions:,}"
),
}
# Canary passed — advance to progressive rollout
self.state.record_metrics("canary", metrics)
return {"action": "advance", "next_phase": "progressive_rollout"}
def _evaluate_rollout(
self, metrics: Dict[str, float]
) -> Dict[str, Any]:
"""Evaluate progressive rollout metrics."""
assert self.state is not None
# Check guardrails
error_rate = metrics.get("error_rate", 0.0)
if error_rate > self.config.max_error_rate:
return {
"action": "rollback",
"reason": f"Error rate {error_rate:.4f} exceeded",
}
elapsed = datetime.utcnow() - self.state.phase_started_at
min_duration = timedelta(
hours=self.config.rollout_stage_duration_hours
)
if elapsed < min_duration:
return {
"action": "wait",
"reason": (
f"Rollout stage duration: {elapsed} < {min_duration}"
),
}
self.state.record_metrics("rollout", metrics)
return {"action": "advance", "next_phase": "next_stage"}
def generate_pipeline_report(self) -> Dict[str, Any]:
"""Generate a summary report of the deployment pipeline run.
Returns:
Dictionary with complete pipeline run summary.
"""
if self.state is None:
return {"error": "No active deployment"}
total_duration = datetime.utcnow() - self.state.started_at
return {
"run_id": self.state.run_id,
"model_name": self.state.model_name,
"challenger_version": self.state.challenger_version,
"champion_version": self.state.champion_version,
"current_phase": self.state.phase.value,
"total_duration": str(total_duration),
"trigger": self.state.trigger_event,
"phase_history": self.state.phase_history,
"metrics": self.state.metrics,
}
# ── StreamRec M13 Configuration ─────────────────────────────────────────
streamrec_config = DeploymentPipelineConfig(
model_name="streamrec-retrieval",
retraining_schedule="0 2 * * 0",
shadow_duration_hours=168,
shadow_min_predictions=100_000,
shadow_max_latency_p99_ms=50.0,
canary_percentage=0.10,
canary_duration_hours=72,
canary_min_impressions=500_000,
rollout_stages=[0.25, 0.50, 1.00],
rollout_stage_duration_hours=24,
max_latency_p99_ms=50.0,
max_error_rate=0.005,
min_ctr_lift=-0.02,
drift_psi_threshold=0.25,
performance_floor={
"recall_at_20": 0.15,
"ndcg_at_20": 0.12,
"ctr": 0.03,
},
performance_max_decline={
"recall_at_20": 0.10,
"ndcg_at_20": 0.10,
"ctr": 0.15,
},
)
pipeline = StreamRecDeploymentPipeline(config=streamrec_config)
M13 Timeline
For a typical weekly deployment:
| Day | Phase | Traffic | Key Metrics |
|---|---|---|---|
| Sunday 02:00 | Retraining triggered | 0% | Pipeline starts |
| Sunday 05:30 | Training completes, validation passes | 0% | Recall@20=0.218, NDCG@20=0.189 |
| Sunday 06:00 | Shadow mode begins | 0% (parallel) | Latency, rank correlation |
| Sunday 06:00 (week 2) | Shadow evaluation passes | 0% | p99=32ms, rank_corr=0.94 |
| Sunday 07:00 (week 2) | Canary begins | 10% | CTR, completion rate |
| Wednesday 07:00 | Canary evaluation passes | 10% | CTR +0.8% (p=0.03) |
| Wednesday 08:00 | Progressive rollout stage 1 | 25% | CTR, engagement |
| Thursday 08:00 | Progressive rollout stage 2 | 50% | CTR, engagement |
| Friday 08:00 | Full rollout | 100% | Promoted to champion |
Total deployment time: approximately 12 days from retraining trigger to full production rollout. This is deliberately conservative — the 7-day shadow period and 3-day canary period prioritize safety over speed. For urgent deployments (e.g., fixing a critical model bug), the pipeline supports a "fast track" mode that reduces shadow to 24 hours and canary to 24 hours, with explicit human approval.
29.13 Deployment in Regulated Environments
The deployment pipeline described for StreamRec operates in a low-regulation environment: recommendations are not life-critical, and a bad model degrades engagement metrics but does not violate regulations. Credit scoring, healthcare, insurance, and financial trading operate under regulatory frameworks that impose additional deployment constraints.
Model Risk Management (SR 11-7)
The Federal Reserve's SR 11-7 guidance requires financial institutions to maintain a model risk management (MRM) framework that covers model development, validation, and ongoing monitoring. For deployment, this means:
- Independent validation. The model validation gate (Chapter 28) must be executed by a team independent of the model development team. The data scientist who trained the model cannot approve its deployment.
- Documentation. Every deployment must produce auditable documentation: the model's purpose, methodology, assumptions, limitations, validation results, and approval signatures.
- Change management. Model deployments are classified by materiality: material changes (new architecture, new feature set, new training data source) require full re-validation; non-material changes (retraining on fresh data with the same architecture) require abbreviated validation.
- Approval workflow. Before a model reaches production, it must be approved by the model owner, the MRM team, and (for material changes) the model risk committee. This approval is not automated — it is a human judgment.
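The materiality classification from the change-management requirement can be captured as a simple rule. The change-type labels below are illustrative, not a regulatory taxonomy:

```python
# Change types that SR 11-7-style frameworks typically treat as material.
MATERIAL_CHANGES = {"architecture", "feature_set", "training_data_source"}

def validation_scope(changed: set) -> str:
    """Material changes trigger full re-validation; others get abbreviated review."""
    return "full" if changed & MATERIAL_CHANGES else "abbreviated"

assert validation_scope({"architecture"}) == "full"
assert validation_scope({"retraining_on_fresh_data"}) == "abbreviated"
```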
The deployment pipeline for Meridian Financial's credit scoring model adds these regulatory stages:
Standard Pipeline:
train → validate → shadow → canary → rollout
Regulated Pipeline (Meridian Financial):
train → validate → [MRM REVIEW] → [MRM APPROVAL] → shadow → canary → [BUSINESS APPROVAL] → rollout
The MRM review stage typically adds 2-5 business days to the deployment timeline, and the business approval stage adds 1-2 more. A deployment that takes 12 days at StreamRec takes roughly 15-19 days at Meridian Financial.
Feature Flags for Regulatory Compliance
Feature flags (also called feature toggles) provide fine-grained control over model deployment without infrastructure changes. A feature flag is a runtime configuration that determines which model serves which traffic:
| Flag | Type | Use Case |
|---|---|---|
| `model_version` | String | Select model version for serving |
| `canary_percentage` | Float | Control canary traffic split |
| `enable_new_features` | Boolean | Gate new feature engineering code |
| `fallback_model` | String | Model to use if primary fails |
| `kill_switch` | Boolean | Instantly disable a model and serve fallback |
The kill switch is the most critical feature flag. It provides a single-click mechanism to disable any model and route all traffic to a known-safe fallback. For Meridian Financial, the kill switch routes all scoring to the previous approved model — ensuring that credit decisions are never interrupted.
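The kill-switch routing logic is small enough to sketch in full. The flag store below is a hypothetical in-memory stand-in for whatever flag service (LaunchDarkly, Unleash, a config database) is actually in use:

```python
from dataclasses import dataclass, field

@dataclass
class FlagStore:
    """Minimal in-memory feature-flag store (stand-in for a real flag service)."""
    flags: dict = field(default_factory=dict)

    def get(self, name, default=None):
        return self.flags.get(name, default)

def route_model(flags: FlagStore) -> str:
    """Resolve which model version serves traffic, honoring the kill switch first."""
    if flags.get("kill_switch", False):
        return flags.get("fallback_model", "champion-previous")
    return flags.get("model_version", "champion-current")

flags = FlagStore({"model_version": "credit-v42", "fallback_model": "credit-v41"})
assert route_model(flags) == "credit-v42"

flags.flags["kill_switch"] = True  # the single-click disable
assert route_model(flags) == "credit-v41"
```

Checking `kill_switch` before anything else is the point: no matter what state the rest of the configuration is in, flipping one flag deterministically routes all traffic to the known-safe fallback.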
29.14 Summary
Continuous training and deployment extends the ML pipeline from "model trained and validated" to "model serving production traffic safely and staying fresh over time." The key ideas:
- Three artifacts, not one. ML deployments manage code, data, and model artifacts with different versioning, testing, and promotion strategies. The CI/CD pipeline must handle all three.
- Staged deployment. Shadow mode (zero risk), canary (limited risk), and progressive rollout (controlled risk) provide increasing confidence that the new model is safe before full traffic exposure.
- Continuous training. Scheduled, drift-based, and performance-based triggers keep models fresh. The trigger → train → validate → deploy pipeline runs automatically, with human gates for anomalous situations.
- Rollback as infrastructure. Rollback is not an emergency procedure — it is a designed, tested, and automated capability. The previous champion remains warm and ready to serve at all times.
- MLOps maturity. The path from manual notebooks (Level 0) to automated ML systems (Level 3) is a progression of engineering investment. Level 2 — automated CI/CD with staged rollout — is the target for production models.
Production ML = Software Engineering: The deployment pipeline in this chapter is fundamentally a software engineering artifact. It uses the same patterns — CI/CD, immutable artifacts, staged rollouts, feature flags, chaos engineering, rollback testing — that mature software organizations have refined over decades. The ML-specific additions (three-artifact versioning, shadow mode, retraining triggers, model validation gates) build on top of this software engineering foundation, not in place of it. The organizations that deploy ML systems reliably are the ones that recognize this.
Chapter 30 builds the monitoring and observability infrastructure that closes the loop: detecting data drift, model degradation, and system failures in production, and feeding that information back to the retraining triggers and rollback procedures developed in this chapter.