
Chapter 29: Continuous Training and Deployment — CI/CD for ML, Canary Deployments, Shadow Mode, and Progressive Rollout

"Deploying a model is not the finish line. It is the starting gun." — Chip Huyen, Designing Machine Learning Systems (2022)


Learning Objectives

By the end of this chapter, you will be able to:

  1. Design CI/CD pipelines for ML that handle three artifact types — code, data, and model — with appropriate versioning, testing, and promotion strategies for each
  2. Implement canary deployments with gradual traffic shifts, statistical significance testing, and automatic promotion or rollback decisions
  3. Use shadow mode to evaluate a challenger model on production traffic without affecting user experience, and design the metrics comparison framework that translates shadow results into deployment decisions
  4. Design retraining triggers — drift-based, performance-based, and scheduled — that balance model freshness against computational cost and deployment risk
  5. Implement model rollback procedures that restore the previous champion within seconds, including the infrastructure, automation, and organizational processes that make rollback safe and reliable

29.1 The Deployment Gap

Chapter 27 built the orchestration pipeline that trains a model. Chapter 28 built the testing infrastructure that validates it. This chapter answers the question that follows: how does a validated model reach production traffic safely, and how does it stay fresh once it gets there?

The question is harder than it appears. In traditional software deployment, the artifact is a binary — a Docker image, a JAR file, a compiled executable. The CI/CD pipeline is well-understood: commit code, run tests, build artifact, deploy to staging, run integration tests, deploy to production. If the deployment fails, roll back to the previous binary. The entire process is deterministic: the same code produces the same binary, and the same binary produces the same behavior.

ML deployment breaks this determinism in three ways.

Three artifacts, not one. A traditional deployment ships code. An ML deployment ships code (the serving infrastructure, feature engineering logic, and API contracts), data (the training dataset, feature schemas, and preprocessing parameters), and a model artifact (the serialized weights, architecture definition, and metadata). Each artifact has its own versioning scheme, its own testing requirements, and its own failure modes. A code change that does not touch the model can break serving. A data change that does not touch the code can degrade predictions. A model retrained on the same code and same data can produce different results due to non-deterministic training. The CI/CD pipeline must version, test, and promote all three artifacts together.

No ground truth at serving time. When a traditional service deploys a new version, integration tests can verify that the output is correct: an API that should return a 200 status code either returns it or does not. When a recommendation model deploys a new version, there is no immediate signal of correctness. The model returns a list of item IDs. Those IDs are valid — they exist in the catalog. But whether the recommendations are good requires observing user behavior over hours or days. The deployment pipeline must evaluate model quality through proxy metrics (latency, error rate, prediction distribution) and delayed outcome metrics (click-through rate, completion rate, engagement) with different time horizons.

Continuous retraining. Traditional software deploys when code changes. ML systems must also deploy when data changes — because the world changes. A recommendation model trained on last month's data does not know about this month's trending content. A credit scoring model trained before a recession does not reflect the new default patterns. The deployment pipeline must support not only on-demand deployment (triggered by code or architecture changes) but also continuous training — scheduled or triggered retraining that keeps the model current without human intervention.
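The three-artifact coupling can be made concrete as a single release record that pins all three versions together. The sketch below is illustrative; the class and field names are hypothetical, not from any specific tool:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ReleaseManifest:
    """Pins the exact code, data, and model versions a deployment ships."""

    code_sha: str       # git commit of serving and feature code
    data_version: str   # e.g., a Delta Lake version or DVC hash
    model_version: int  # version number in the model registry

    def release_id(self) -> str:
        """Human-readable identifier naming all three artifacts."""
        return f"{self.code_sha[:7]}-{self.data_version}-v{self.model_version}"


manifest = ReleaseManifest(
    code_sha="abc123def456",
    data_version="delta-47",
    model_version=17,
)
print(manifest.release_id())  # abc123d-delta-47-v17
```

Rolling back then means re-deploying a previous manifest, not just a previous binary.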

Production ML = Software Engineering: The deployment gap is not a machine learning problem — it is a software engineering problem with ML-specific constraints. Every principle from software deployment applies: immutable artifacts, staged rollouts, automated rollback, observability. ML adds the three-artifact challenge, the delayed-feedback challenge, and the continuous-retraining challenge on top of the standard deployment playbook. The organizations that deploy ML systems reliably are the ones that recognize this: they build ML deployment on top of mature software deployment infrastructure, not from scratch.

This chapter proceeds in five movements. Sections 29.2-29.4 cover the three-artifact CI/CD pipeline: how to version, test, and promote code, data, and models together. Sections 29.5-29.7 cover deployment strategies: blue-green, canary, shadow mode, and progressive rollout. Sections 29.8-29.9 cover continuous training: retraining triggers, scheduling, and the automation that keeps models fresh. Sections 29.10-29.11 cover rollback: how to detect a bad deployment and restore the previous model safely. Section 29.12 synthesizes these into the StreamRec deployment pipeline for progressive project milestone M13.


29.2 MLOps Maturity Levels

Before designing a CI/CD pipeline, it is useful to understand where your organization falls on the MLOps maturity spectrum. Google's MLOps maturity framework defines four levels:

Level 0: Manual Process

| Aspect | Description |
| --- | --- |
| Training | Manual, in notebooks |
| Deployment | Manual, by the data scientist |
| Monitoring | None |
| Retraining | When someone remembers |
| Testing | Manual eyeballing of metrics |

At Level 0, the data scientist trains a model in a Jupyter notebook, exports the weights to a file, copies the file to a server, and updates a configuration file to point to the new model. Deployment is a human process with no automation, no versioning, and no rollback capability. Most ML projects start here. Many never leave.

Level 1: ML Pipeline Automation

| Aspect | Description |
| --- | --- |
| Training | Automated pipeline (Dagster, Airflow) |
| Deployment | Manual promotion from registry |
| Monitoring | Basic metrics (latency, error rate) |
| Retraining | Scheduled (e.g., weekly) |
| Testing | Automated data validation, model evaluation |

At Level 1, the training pipeline from Chapter 27 runs automatically. Data validation (Chapter 28) catches quality issues. Trained models are registered in MLflow with metrics and metadata. But deployment is still manual: an engineer reviews the metrics, decides to promote, and triggers deployment. This is where the StreamRec pipeline stood at the end of Chapter 28.

Level 2: CI/CD Pipeline Automation

| Aspect | Description |
| --- | --- |
| Training | Automated pipeline with CT triggers |
| Deployment | Automated CI/CD with staged rollout |
| Monitoring | Model performance + data drift |
| Retraining | Event-driven (drift, degradation, schedule) |
| Testing | Full ML test suite (data, behavioral, validation gate) |

At Level 2, the entire path from code commit to production traffic is automated. Code changes trigger CI tests. Successful training triggers model validation. Validated models enter a staged deployment pipeline (shadow mode, canary, progressive rollout). Monitoring detects issues and triggers rollback or retraining. This is the target of this chapter.

Level 3: Automated ML System

| Aspect | Description |
| --- | --- |
| Training | Automated with hyperparameter optimization |
| Deployment | Fully automated with zero-touch rollout |
| Monitoring | Automated anomaly detection, root cause analysis |
| Retraining | Automated trigger → train → validate → deploy |
| Testing | Automated test generation, adversarial testing |

At Level 3, the entire system is self-healing. Drift triggers retraining. Retraining triggers validation. Validation triggers deployment. Deployment is monitored. Monitoring triggers rollback or further retraining. The human is notified but not required. Few organizations reach Level 3 for all models — most operate at Level 2 for high-impact models and Level 1 for lower-impact models.

Where should you aim? Level 2 for any model that serves production traffic. Level 1 for experimental or low-impact models. Level 3 only if the model serves high-volume, time-sensitive traffic (e.g., real-time recommendations, fraud detection) where human-in-the-loop deployment introduces unacceptable latency. The jump from Level 0 to Level 1 saves the most engineering hours per unit of investment. The jump from Level 1 to Level 2 saves the most production incidents.
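These targeting heuristics fit in a few lines. The helper below is a hypothetical sketch of the decision rule just described, not part of any framework:

```python
def target_maturity_level(
    serves_production: bool,
    high_volume: bool,
    time_sensitive: bool,
) -> int:
    """Suggest an MLOps maturity target per the heuristics above."""
    if not serves_production:
        return 1  # experimental or low-impact: pipeline automation suffices
    if high_volume and time_sensitive:
        return 3  # human-in-the-loop deployment adds unacceptable latency
    return 2  # any model serving production traffic: full CI/CD automation


# A fraud-detection model: high-volume, time-sensitive production traffic.
print(target_maturity_level(True, True, True))  # 3
```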


29.3 CI/CD for ML: The Three-Artifact Pipeline

Traditional CI/CD pipelines assume a single artifact type: code. The pipeline checks out code, runs tests, builds a binary, and deploys it. ML CI/CD must handle three artifact types, each with distinct versioning and testing requirements.

The Three Artifacts

| Artifact | Versioning | Testing | Promotion Trigger |
| --- | --- | --- | --- |
| Code | Git (commit SHA) | Unit tests, integration tests, linting | Pull request merge |
| Data | Data version (DVC hash, Delta Lake version, partition date) | Schema validation, statistical tests, freshness | Pipeline schedule or sensor |
| Model | Model registry version (MLflow run ID) | Behavioral tests, validation gate, shadow evaluation | Automated or manual approval |

The key insight is that these three artifacts change on different timelines. Code changes when engineers push commits — perhaps daily. Data changes when new events arrive — perhaps hourly. Models change when retraining completes — perhaps weekly. A CI/CD pipeline that triggers only on code changes misses the data and model dimensions entirely.

Pipeline Architecture

A complete ML CI/CD pipeline has three parallel trigger paths that converge at the deployment stage:

Code Change Path:
  git push → lint + type check → unit tests → integration tests → build Docker image → push to registry

Data Change Path:
  new partition arrives → schema validation (GE) → statistical tests (PSI) → data contract check → trigger retraining if thresholds met

Model Change Path:
  training completes → offline evaluation → behavioral tests → validation gate (champion-challenger) → register in MLflow → trigger deployment

Deployment Path (converges):
  model registered → shadow mode → canary (10%) → canary evaluation → progressive rollout (25%, 50%, 100%) → promotion to champion

Implementing the Code Path

The code path uses standard CI/CD tooling — GitHub Actions, GitLab CI, or Jenkins — with ML-specific additions.

"""
streamrec_ci.py — CI pipeline configuration for StreamRec serving code.

Implements the code-path CI/CD pipeline as a Python-defined GitHub Actions
workflow configuration. In practice, this would be a YAML workflow file;
the Python representation here makes the logic explicit and testable.
"""

from dataclasses import dataclass, field
from typing import Dict, List
from enum import Enum


class CIStage(Enum):
    """Stages in the CI pipeline."""
    LINT = "lint"
    TYPE_CHECK = "type_check"
    UNIT_TEST = "unit_test"
    INTEGRATION_TEST = "integration_test"
    BUILD = "build"
    PUSH = "push"
    TRIGGER_DEPLOYMENT = "trigger_deployment"


@dataclass
class CIStep:
    """A single step in the CI pipeline.

    Attributes:
        name: Human-readable step name.
        stage: Pipeline stage this step belongs to.
        command: Shell command to execute.
        timeout_minutes: Maximum execution time.
        retry_count: Number of retries on failure.
        required: Whether pipeline fails if this step fails.
    """
    name: str
    stage: CIStage
    command: str
    timeout_minutes: int = 10
    retry_count: int = 0
    required: bool = True


@dataclass
class MLCIPipeline:
    """CI pipeline for ML serving code.

    Defines the steps for linting, testing, building, and pushing
    the serving Docker image. The pipeline runs on every push to
    the main branch and on every pull request.

    Attributes:
        project_name: Name of the ML project.
        python_version: Python version for the CI environment.
        docker_registry: Docker registry URL.
        steps: Ordered list of CI steps.
    """
    project_name: str
    python_version: str = "3.11"
    docker_registry: str = "us-central1-docker.pkg.dev/streamrec/models"
    steps: List[CIStep] = field(default_factory=list)

    def __post_init__(self) -> None:
        if not self.steps:
            self.steps = self._default_steps()

    def _default_steps(self) -> List[CIStep]:
        """Generate the default CI pipeline steps."""
        return [
            CIStep(
                name="Lint (ruff)",
                stage=CIStage.LINT,
                command="ruff check src/ tests/ --select E,W,F,I",
                timeout_minutes=5,
            ),
            CIStep(
                name="Type check (mypy)",
                stage=CIStage.TYPE_CHECK,
                command="mypy src/ --strict --ignore-missing-imports",
                timeout_minutes=10,
            ),
            CIStep(
                name="Unit tests",
                stage=CIStage.UNIT_TEST,
                command=(
                    "pytest tests/unit/ -v --cov=src --cov-report=xml "
                    "--cov-fail-under=85"
                ),
                timeout_minutes=15,
            ),
            CIStep(
                name="Integration tests (serving)",
                stage=CIStage.INTEGRATION_TEST,
                command=(
                    "pytest tests/integration/ -v -m 'not gpu' "
                    "--timeout=300"
                ),
                timeout_minutes=30,
                retry_count=1,
            ),
            CIStep(
                name="Build Docker image",
                stage=CIStage.BUILD,
                command=(
                    f"docker build "
                    f"-t {self.docker_registry}/{self.project_name}-serving:"
                    "${GIT_SHA} -f Dockerfile.serving ."
                ),
                timeout_minutes=20,
            ),
            CIStep(
                name="Push Docker image",
                stage=CIStage.PUSH,
                command=(
                    f"docker push "
                    f"{self.docker_registry}/{self.project_name}-serving:"
                    "${GIT_SHA}"
                ),
                timeout_minutes=10,
            ),
        ]

    def validate_pipeline(self) -> Dict[str, bool]:
        """Validate pipeline configuration.

        Returns:
            Dictionary mapping check names to pass/fail status.
        """
        checks: Dict[str, bool] = {}

        # Every stage must have at least one step
        stages_covered = {step.stage for step in self.steps}
        required_stages = {CIStage.LINT, CIStage.UNIT_TEST, CIStage.BUILD}
        checks["required_stages_covered"] = required_stages.issubset(
            stages_covered
        )

        # No step should have timeout > 60 minutes
        checks["reasonable_timeouts"] = all(
            step.timeout_minutes <= 60 for step in self.steps
        )

        # At least one required step per stage
        for stage in required_stages:
            stage_steps = [s for s in self.steps if s.stage == stage]
            checks[f"{stage.value}_has_required_step"] = any(
                s.required for s in stage_steps
            )

        return checks

    def estimated_duration_minutes(self) -> int:
        """Estimate total pipeline duration assuming sequential execution."""
        return sum(step.timeout_minutes for step in self.steps)


# ── Instantiate the StreamRec CI pipeline ────────────────────────────────
streamrec_ci = MLCIPipeline(project_name="streamrec")
validation = streamrec_ci.validate_pipeline()
assert all(validation.values()), f"CI pipeline validation failed: {validation}"
# Estimated duration: ~90 minutes worst case (sequential), ~35 minutes (parallel)

Model Versioning and the Model Registry

The model registry is the bridge between training and deployment. Chapter 27 registered models in MLflow at the end of the Dagster pipeline. Here, we formalize the registry's role in CI/CD.

A model registry must support five operations:

  1. Register: Store a model artifact with metadata (metrics, parameters, data version, code version, training date, feature schema).
  2. Version: Assign a monotonically increasing version number within a model name.
  3. Stage transition: Move a model between stages: None → Staging → Production → Archived.
  4. Query: Retrieve the current production model, or any historical version.
  5. Lineage: Given a model version, retrieve the exact code commit, data version, and hyperparameters that produced it.

"""
model_registry.py — Model versioning and stage management for CI/CD.

Wraps MLflow's model registry with deployment-specific logic:
stage transitions, promotion rules, and artifact lineage.
"""

from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from datetime import datetime
from enum import Enum
import hashlib
import json


class ModelStage(Enum):
    """Model lifecycle stages in the registry."""
    NONE = "None"
    STAGING = "Staging"
    SHADOW = "Shadow"
    CANARY = "Canary"
    PRODUCTION = "Production"
    ARCHIVED = "Archived"


VALID_TRANSITIONS: Dict[ModelStage, List[ModelStage]] = {
    ModelStage.NONE: [ModelStage.STAGING],
    ModelStage.STAGING: [ModelStage.SHADOW, ModelStage.ARCHIVED],
    ModelStage.SHADOW: [ModelStage.CANARY, ModelStage.ARCHIVED],
    ModelStage.CANARY: [ModelStage.PRODUCTION, ModelStage.ARCHIVED],
    ModelStage.PRODUCTION: [ModelStage.ARCHIVED],
    ModelStage.ARCHIVED: [],
}


@dataclass
class ModelArtifact:
    """A versioned model artifact with full lineage.

    Attributes:
        model_name: Registered model name (e.g., 'streamrec-retrieval').
        version: Monotonically increasing version number.
        stage: Current lifecycle stage.
        metrics: Evaluation metrics from offline validation.
        params: Hyperparameters used during training.
        code_version: Git commit SHA of the training code.
        data_version: Hash or version of the training data.
        feature_schema: Schema of input features (column names and types).
        training_date: Timestamp when training completed.
        artifact_uri: URI to the serialized model artifact.
        tags: Additional metadata tags.
    """
    model_name: str
    version: int
    stage: ModelStage = ModelStage.NONE
    metrics: Dict[str, float] = field(default_factory=dict)
    params: Dict[str, Any] = field(default_factory=dict)
    code_version: str = ""
    data_version: str = ""
    feature_schema: Dict[str, str] = field(default_factory=dict)
    training_date: datetime = field(default_factory=datetime.utcnow)
    artifact_uri: str = ""
    tags: Dict[str, str] = field(default_factory=dict)

    @property
    def lineage_hash(self) -> str:
        """Compute a deterministic hash of the model's full lineage.

        This hash uniquely identifies the combination of code, data,
        and hyperparameters that produced this model. Two models with
        the same lineage hash were trained from identical inputs
        (though non-deterministic training may produce different weights).
        """
        lineage = {
            "code_version": self.code_version,
            "data_version": self.data_version,
            "params": self.params,
        }
        lineage_str = json.dumps(lineage, sort_keys=True)
        return hashlib.sha256(lineage_str.encode()).hexdigest()[:16]


@dataclass
class ModelRegistry:
    """In-memory model registry for CI/CD pipeline demonstration.

    In production, this wraps MLflow's Model Registry API. The in-memory
    implementation here makes the stage transition logic explicit.

    Attributes:
        models: Mapping from (model_name, version) to ModelArtifact.
    """
    models: Dict[tuple, ModelArtifact] = field(default_factory=dict)

    def register(self, artifact: ModelArtifact) -> ModelArtifact:
        """Register a new model version.

        Args:
            artifact: The model artifact to register.

        Returns:
            The registered artifact with assigned version.
        """
        # Assign next version number
        existing_versions = [
            v for (name, v) in self.models
            if name == artifact.model_name
        ]
        artifact.version = max(existing_versions, default=0) + 1
        self.models[(artifact.model_name, artifact.version)] = artifact
        return artifact

    def transition_stage(
        self,
        model_name: str,
        version: int,
        target_stage: ModelStage,
        reason: str = "",
    ) -> ModelArtifact:
        """Transition a model to a new lifecycle stage.

        Enforces valid transitions: None→Staging→Shadow→Canary→Production.
        Any stage can transition to Archived (rollback or retirement).

        Args:
            model_name: Registered model name.
            version: Model version number.
            target_stage: Target lifecycle stage.
            reason: Human-readable reason for the transition.

        Returns:
            Updated model artifact.

        Raises:
            ValueError: If the transition is not valid.
            KeyError: If the model version does not exist.
        """
        key = (model_name, version)
        if key not in self.models:
            raise KeyError(
                f"Model {model_name} version {version} not found."
            )

        artifact = self.models[key]
        valid_targets = VALID_TRANSITIONS.get(artifact.stage, [])

        if target_stage not in valid_targets:
            raise ValueError(
                f"Invalid transition: {artifact.stage.value} → "
                f"{target_stage.value}. Valid targets: "
                f"{[s.value for s in valid_targets]}"
            )

        # If promoting to Production, archive the current Production model
        if target_stage == ModelStage.PRODUCTION:
            for m in self.models.values():
                if (
                    m.model_name == model_name
                    and m.stage == ModelStage.PRODUCTION
                ):
                    m.stage = ModelStage.ARCHIVED
                    m.tags["archived_reason"] = (
                        f"Replaced by version {version}"
                    )

        artifact.stage = target_stage
        artifact.tags["stage_transition_reason"] = reason
        artifact.tags["stage_transition_time"] = (
            datetime.utcnow().isoformat()
        )
        return artifact

    def get_production_model(
        self, model_name: str
    ) -> Optional[ModelArtifact]:
        """Retrieve the current production model.

        Args:
            model_name: Registered model name.

        Returns:
            The production model artifact, or None if no model is in
            production.
        """
        for (name, _), artifact in self.models.items():
            if name == model_name and artifact.stage == ModelStage.PRODUCTION:
                return artifact
        return None

    def get_model_history(
        self, model_name: str
    ) -> List[ModelArtifact]:
        """Retrieve all versions of a model, ordered by version.

        Args:
            model_name: Registered model name.

        Returns:
            List of model artifacts sorted by version (ascending).
        """
        artifacts = [
            artifact
            for (name, _), artifact in self.models.items()
            if name == model_name
        ]
        return sorted(artifacts, key=lambda a: a.version)

The stage transition enforcement is critical. A model cannot jump from None to Production — it must pass through Staging, Shadow, and Canary first. This encoding of the deployment process into the registry ensures that no model reaches production without completing every evaluation stage, even if an engineer attempts to bypass the pipeline.
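The enforcement idea can be demonstrated standalone with a trimmed-down transition table — a sketch mirroring the VALID_TRANSITIONS mapping above, using plain strings instead of the enum:

```python
VALID = {
    "None": {"Staging"},
    "Staging": {"Shadow", "Archived"},
    "Shadow": {"Canary", "Archived"},
    "Canary": {"Production", "Archived"},
    "Production": {"Archived"},
    "Archived": set(),
}


def transition(current: str, target: str) -> str:
    """Allow only legal stage transitions; raise on any bypass attempt."""
    if target not in VALID[current]:
        raise ValueError(f"Invalid transition: {current} -> {target}")
    return target


# The legal promotion path must pass through every evaluation stage.
stage = "None"
for nxt in ("Staging", "Shadow", "Canary", "Production"):
    stage = transition(stage, nxt)
assert stage == "Production"

# Jumping straight from None to Production is rejected.
try:
    transition("None", "Production")
except ValueError as exc:
    print(exc)  # Invalid transition: None -> Production
```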


29.4 Artifact Lineage and Reproducibility

A model in production must be fully reproducible: given the model's version number, an engineer must be able to reconstruct the exact training environment — the code, the data, the hyperparameters, and the random seed — that produced it. This is not merely a best practice; in regulated industries (credit scoring, healthcare, insurance), it is a legal requirement.

Artifact lineage connects three version identifiers:

Model v17 (MLflow)
  ├── Code: git commit abc123def (GitHub)
  ├── Data: delta_version=47, partition_date=2025-03-14 (Delta Lake)
  ├── Features: feast_feature_set v3.2 (Feast)
  ├── Parameters: lr=0.001, batch_size=256, epochs=15
  ├── Environment: Docker image streamrec-train:abc123def
  └── Evaluation: Recall@20=0.214, NDCG@20=0.186, AUC=0.847

When a production incident occurs, lineage enables rapid root cause analysis: was the regression caused by a code change (compare the git diffs), a data change (compare the data versions), or a training artifact (compare the hyperparameters and random seeds)?

"""
lineage.py — Artifact lineage tracking for ML deployments.
"""

from dataclasses import dataclass, field
from typing import Dict, List
from datetime import datetime
import json


@dataclass
class ArtifactLineage:
    """Complete lineage record for a deployed model.

    Captures every input that contributed to a model artifact,
    enabling full reproducibility and root cause analysis.

    Attributes:
        model_name: Registered model name.
        model_version: Model version in the registry.
        code_commit: Git commit SHA.
        code_branch: Git branch name.
        code_repo: Git repository URL.
        data_source: Data lake or warehouse identifier.
        data_version: Data version or partition identifier.
        data_hash: Hash of the training dataset.
        data_row_count: Number of training examples.
        data_date_range: Date range of training data.
        feature_set_version: Version of the feature engineering pipeline.
        feature_names: List of feature names used.
        hyperparameters: Training hyperparameters.
        random_seed: Random seed for reproducibility.
        docker_image: Docker image used for training.
        gpu_type: GPU type used for training.
        training_duration_seconds: Wall-clock training time.
        evaluation_metrics: Offline evaluation metrics.
        validation_results: Results from validation gate (Chapter 28).
        created_at: Timestamp of lineage record creation.
    """
    model_name: str
    model_version: int
    code_commit: str
    code_branch: str = "main"
    code_repo: str = ""
    data_source: str = ""
    data_version: str = ""
    data_hash: str = ""
    data_row_count: int = 0
    data_date_range: str = ""
    feature_set_version: str = ""
    feature_names: List[str] = field(default_factory=list)
    hyperparameters: Dict[str, float] = field(default_factory=dict)
    random_seed: int = 42
    docker_image: str = ""
    gpu_type: str = ""
    training_duration_seconds: float = 0.0
    evaluation_metrics: Dict[str, float] = field(default_factory=dict)
    validation_results: Dict[str, bool] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.utcnow)

    def to_json(self) -> str:
        """Serialize lineage to JSON for storage."""
        data = {
            "model_name": self.model_name,
            "model_version": self.model_version,
            "code": {
                "commit": self.code_commit,
                "branch": self.code_branch,
                "repo": self.code_repo,
            },
            "data": {
                "source": self.data_source,
                "version": self.data_version,
                "hash": self.data_hash,
                "row_count": self.data_row_count,
                "date_range": self.data_date_range,
            },
            "features": {
                "set_version": self.feature_set_version,
                "names": self.feature_names,
            },
            "training": {
                "hyperparameters": self.hyperparameters,
                "random_seed": self.random_seed,
                "docker_image": self.docker_image,
                "gpu_type": self.gpu_type,
                "duration_seconds": self.training_duration_seconds,
            },
            "evaluation": self.evaluation_metrics,
            "validation": self.validation_results,
            "created_at": self.created_at.isoformat(),
        }
        return json.dumps(data, indent=2)

    def diff(self, other: "ArtifactLineage") -> Dict[str, Dict[str, str]]:
        """Compare two lineage records to identify differences.

        Useful for root cause analysis: when a model regresses,
        comparing the lineage of the current and previous versions
        identifies what changed.

        Args:
            other: Another lineage record to compare against.

        Returns:
            Dictionary of changed fields with old and new values.
        """
        changes: Dict[str, Dict[str, str]] = {}

        if self.code_commit != other.code_commit:
            changes["code_commit"] = {
                "old": other.code_commit,
                "new": self.code_commit,
            }
        if self.data_version != other.data_version:
            changes["data_version"] = {
                "old": other.data_version,
                "new": self.data_version,
            }
        if self.data_row_count != other.data_row_count:
            changes["data_row_count"] = {
                "old": str(other.data_row_count),
                "new": str(self.data_row_count),
            }
        if self.hyperparameters != other.hyperparameters:
            changes["hyperparameters"] = {
                "old": json.dumps(other.hyperparameters),
                "new": json.dumps(self.hyperparameters),
            }
        if self.feature_set_version != other.feature_set_version:
            changes["feature_set_version"] = {
                "old": other.feature_set_version,
                "new": self.feature_set_version,
            }

        return changes
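As a standalone illustration of root-cause diffing, here is the same idea applied to plain dicts (the version values are hypothetical):

```python
def lineage_diff(new: dict, old: dict) -> dict:
    """Return {field: {'old': ..., 'new': ...}} for every changed field."""
    return {
        key: {"old": old[key], "new": new[key]}
        for key in new
        if new[key] != old[key]
    }


v16 = {"code_commit": "abc123", "data_version": "delta-46", "lr": 0.001}
v17 = {"code_commit": "abc123", "data_version": "delta-47", "lr": 0.001}

# Only the data version changed: inspect the new partition first.
print(lineage_diff(v17, v16))
# {'data_version': {'old': 'delta-46', 'new': 'delta-47'}}
```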

29.5 Shadow Mode: Evaluation Without Risk

Shadow mode is the first deployment stage after the validation gate. In shadow mode, the challenger model receives a copy of every production request, computes its prediction, and logs the result — but the prediction is never served to users. The champion model continues to serve all traffic. Shadow mode evaluates the challenger on real production data without any risk to user experience.

Why Shadow Mode?

Offline evaluation (holdout sets, cross-validation) tests a model on historical data. Shadow mode tests a model on current data — the actual requests arriving in production right now. The gap between historical and current data can be substantial:

  • Distribution shift: User behavior changes over time. A model evaluated on last week's data may perform differently on today's traffic.
  • Serving-training skew: The features computed during training may differ subtly from the features computed during serving, due to different code paths, latency constraints, or data freshness. Shadow mode tests the actual serving path, not the training path.
  • Edge cases: Production traffic includes edge cases that the holdout set may not: new users with no history, items just added to the catalog, users on unusual devices, bots.
  • Latency: Offline evaluation does not measure serving latency. Shadow mode measures how long the model takes to produce predictions under production load.

Architecture

Production Traffic
       │
       ├──────────────────────► Champion Model ──► User Response
       │                              │
       └──── (async copy) ──► Shadow Model ──► Metrics Logger (no user impact)
                                      │
                                      ▼
                               Shadow Dashboard
                             (latency, predictions,
                              distribution comparison)

The shadow infrastructure must satisfy three requirements:

  1. Zero user impact: Shadow predictions are never returned to users, even if the shadow model is faster or "better."
  2. Identical inputs: The shadow model receives the exact same features as the champion model for every request. Any difference in inputs would confound the comparison.
  3. Bounded resource cost: Shadow mode doubles the inference compute. The shadow model should run on a separate resource pool so that it cannot degrade the champion's latency.

Implementation

"""
shadow_mode.py — Shadow deployment for model evaluation on production traffic.

The ShadowRouter sends every production request to both the champion
and challenger models. The champion's response is returned to the user;
the challenger's response is logged for offline comparison.
"""

from dataclasses import dataclass, field
from typing import Dict, List, Any
from datetime import datetime
import statistics


@dataclass
class ShadowPrediction:
    """A single prediction from both champion and challenger.

    Attributes:
        request_id: Unique request identifier.
        timestamp: When the request was received.
        features: Input features (shared by both models).
        champion_scores: Champion model's predicted scores.
        challenger_scores: Challenger model's predicted scores.
        champion_latency_ms: Champion inference latency.
        challenger_latency_ms: Challenger inference latency.
    """
    request_id: str
    timestamp: datetime
    features: Dict[str, Any]
    champion_scores: List[float]
    challenger_scores: List[float]
    champion_latency_ms: float
    challenger_latency_ms: float


@dataclass
class ShadowEvaluator:
    """Evaluates shadow mode results to decide whether to promote.

    Collects shadow predictions over a configurable window and
    computes comparison metrics: rank correlation, latency comparison,
    score distribution divergence, and (when outcomes arrive) quality
    metrics.

    Attributes:
        model_name: Name of the model being evaluated.
        champion_version: Current champion model version.
        challenger_version: Challenger model version in shadow.
        evaluation_window_hours: Duration of the shadow evaluation.
        min_requests: Minimum predictions required for evaluation.
        max_latency_p99_ms: Maximum acceptable p99 latency for challenger.
        max_latency_ratio: Maximum ratio of challenger/champion p99 latency.
        predictions: Collected shadow predictions.
    """
    model_name: str
    champion_version: int
    challenger_version: int
    evaluation_window_hours: int = 168  # 1 week
    min_requests: int = 100_000
    max_latency_p99_ms: float = 50.0
    max_latency_ratio: float = 1.5
    predictions: List[ShadowPrediction] = field(default_factory=list)

    def add_prediction(self, prediction: ShadowPrediction) -> None:
        """Record a shadow prediction."""
        self.predictions.append(prediction)

    def compute_latency_comparison(self) -> Dict[str, float]:
        """Compare latency distributions of champion and challenger.

        Returns:
            Dictionary with p50, p95, p99 latencies for both models
            and the p99 ratio.
        """
        if not self.predictions:
            return {}

        champ_latencies = sorted(
            p.champion_latency_ms for p in self.predictions
        )
        chall_latencies = sorted(
            p.challenger_latency_ms for p in self.predictions
        )

        def percentile(data: List[float], pct: float) -> float:
            # Nearest-rank percentile on already-sorted data.
            idx = int(len(data) * pct / 100)
            return data[min(idx, len(data) - 1)]

        champ_p99 = percentile(champ_latencies, 99)
        chall_p99 = percentile(chall_latencies, 99)

        return {
            "champion_p50_ms": percentile(champ_latencies, 50),
            "champion_p95_ms": percentile(champ_latencies, 95),
            "champion_p99_ms": champ_p99,
            "challenger_p50_ms": percentile(chall_latencies, 50),
            "challenger_p95_ms": percentile(chall_latencies, 95),
            "challenger_p99_ms": chall_p99,
            "p99_ratio": chall_p99 / champ_p99 if champ_p99 > 0 else 0.0,
        }

    def compute_rank_correlation(self) -> float:
        """Compute the mean per-request Spearman rank correlation.

        For each request, the champion's and challenger's rankings of
        the same candidate list are compared; the per-request Spearman
        coefficients are then averaged. High correlation (>0.95)
        indicates the challenger ranks items similarly to the champion.
        Low correlation indicates substantially different behavior —
        which may be desirable (better model) or concerning (broken
        model).

        Returns:
            Mean Spearman rank correlation across requests (0.0 if no
            request has at least two scored items).
        """
        per_request_rho: List[float] = []

        for pred in self.predictions:
            n = min(len(pred.champion_scores), len(pred.challenger_scores))
            if n < 2:
                continue

            # Rank the scores (higher score = rank 1)
            champ_ranks = _compute_ranks(pred.champion_scores[:n])
            chall_ranks = _compute_ranks(pred.challenger_scores[:n])

            d_sq_sum = sum(
                (champ_ranks[i] - chall_ranks[i]) ** 2 for i in range(n)
            )

            # Spearman: rho = 1 - 6 * sum(d^2) / (n * (n^2 - 1))
            rho = 1.0 - (6.0 * d_sq_sum) / (n * (n ** 2 - 1))
            per_request_rho.append(max(-1.0, min(1.0, rho)))

        if not per_request_rho:
            return 0.0

        return statistics.mean(per_request_rho)

    def compute_score_divergence(self) -> Dict[str, float]:
        """Compute divergence between champion and challenger scores.

        Returns summary statistics of the absolute per-item score
        differences: mean, median, max, and standard deviation.

        Returns:
            Dictionary with divergence metrics (empty if no scores).
        """
        abs_diffs: List[float] = []
        for pred in self.predictions:
            n = min(len(pred.champion_scores), len(pred.challenger_scores))
            for i in range(n):
                abs_diffs.append(
                    abs(pred.champion_scores[i] - pred.challenger_scores[i])
                )

        if not abs_diffs:
            return {}

        return {
            "mean_absolute_score_diff": statistics.mean(abs_diffs),
            "median_absolute_score_diff": statistics.median(abs_diffs),
            "max_absolute_score_diff": max(abs_diffs),
            "std_absolute_score_diff": (
                statistics.stdev(abs_diffs) if len(abs_diffs) > 1 else 0.0
            ),
        }

    def evaluate(self) -> Dict[str, Any]:
        """Run the full shadow evaluation and return a promotion decision.

        Returns:
            Dictionary with evaluation results and a promote/reject/extend
            decision.
        """
        n_predictions = len(self.predictions)

        if n_predictions < self.min_requests:
            return {
                "decision": "extend",
                "reason": (
                    f"Insufficient predictions: {n_predictions} < "
                    f"{self.min_requests}"
                ),
                "n_predictions": n_predictions,
            }

        latency = self.compute_latency_comparison()
        rank_corr = self.compute_rank_correlation()
        divergence = self.compute_score_divergence()

        # Decision logic
        issues: List[str] = []

        # Check latency
        if latency.get("challenger_p99_ms", 0) > self.max_latency_p99_ms:
            issues.append(
                f"Challenger p99 latency {latency['challenger_p99_ms']:.1f}ms "
                f"> {self.max_latency_p99_ms}ms threshold"
            )

        if latency.get("p99_ratio", 0) > self.max_latency_ratio:
            issues.append(
                f"Latency ratio {latency['p99_ratio']:.2f} > "
                f"{self.max_latency_ratio} threshold"
            )

        # Check for anomalous divergence (very different predictions)
        mean_diff = divergence.get("mean_absolute_score_diff", 0)
        if mean_diff > 0.3:
            issues.append(
                f"Mean score divergence {mean_diff:.3f} > 0.3 threshold — "
                f"challenger produces substantially different predictions"
            )

        decision = "reject" if issues else "promote"

        return {
            "decision": decision,
            "issues": issues,
            "n_predictions": n_predictions,
            "latency": latency,
            "rank_correlation": rank_corr,
            "score_divergence": divergence,
            "champion_version": self.champion_version,
            "challenger_version": self.challenger_version,
        }


def _compute_ranks(scores: List[float]) -> List[float]:
    """Compute ranks for a list of scores (highest score = rank 1).

    Args:
        scores: List of numeric scores.

    Returns:
        List of ranks (1-indexed, with average ranks for ties).
    """
    indexed = sorted(enumerate(scores), key=lambda x: -x[1])
    ranks = [0.0] * len(scores)
    i = 0
    while i < len(indexed):
        # Find the run of tied scores starting at position i.
        j = i
        while j + 1 < len(indexed) and indexed[j + 1][1] == indexed[i][1]:
            j += 1
        # Tied items share the average of the ranks they would occupy.
        avg_rank = (i + j) / 2 + 1
        for k in range(i, j + 1):
            ranks[indexed[k][0]] = avg_rank
        i = j + 1
    return ranks
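
The listing above covers evaluation but not the routing layer that feeds it. Below is a minimal synchronous sketch of a shadow router; the `Model` and `Logger` callable types are illustrative assumptions (wiring `log` to `ShadowEvaluator.add_prediction` via a `ShadowPrediction` is a one-line adapter), not part of the StreamRec codebase.

```python
import time
import uuid
from typing import Any, Callable, Dict, List

# Assumed interfaces: a model maps a feature dict to a score list;
# a logger accepts one shadow record (a plain dict here).
Model = Callable[[Dict[str, Any]], List[float]]
Logger = Callable[[Dict[str, Any]], None]


class ShadowRouter:
    """Serves the champion's prediction; logs the challenger's."""

    def __init__(self, champion: Model, challenger: Model, log: Logger) -> None:
        self.champion = champion
        self.challenger = challenger
        self.log = log

    def handle(self, features: Dict[str, Any]) -> List[float]:
        start = time.perf_counter()
        champ_scores = self.champion(features)
        champ_ms = (time.perf_counter() - start) * 1000

        start = time.perf_counter()
        chall_scores = self.challenger(features)  # shadow call, never served
        chall_ms = (time.perf_counter() - start) * 1000

        self.log({
            "request_id": str(uuid.uuid4()),
            "features": features,
            "champion_scores": champ_scores,
            "challenger_scores": chall_scores,
            "champion_latency_ms": champ_ms,
            "challenger_latency_ms": chall_ms,
        })
        return champ_scores  # only the champion's output reaches the user
```

In a real deployment the challenger call and the log write would run asynchronously on a separate resource pool (requirement 3), so a slow challenger cannot inflate user-facing latency; the synchronous version above conflates the two.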

Shadow Mode Duration

How long should shadow mode run? The answer depends on three factors:

  1. Traffic volume. Higher traffic produces statistically significant comparisons faster. StreamRec, serving 20 million requests per day, accumulates 100,000 shadow predictions in minutes. A lower-traffic service might need days.

  2. Outcome delay. If the evaluation metric depends on user outcomes (e.g., did the user click?), shadow mode must run long enough for outcomes to arrive. For StreamRec, click outcomes arrive within seconds; completion outcomes may take hours; subscription-level outcomes may take weeks.

  3. Temporal coverage. A shadow evaluation should cover at least one full business cycle. For StreamRec, traffic patterns differ between weekdays and weekends, so a minimum 7-day shadow period ensures the challenger is evaluated across both patterns.

For StreamRec, the standard shadow evaluation window is 7 days with a minimum of 100,000 predictions. The evaluation focuses on latency, rank correlation, and score distribution divergence. Quality metrics (CTR comparison) are computed after outcomes arrive but do not gate the shadow → canary transition; they are tracked as metadata.
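
The interplay of factors 1 and 3 can be made concrete with a small helper; the function name and parameters are illustrative, not part of StreamRec's codebase:

```python
import math

def shadow_duration_hours(requests_per_day: float, shadow_fraction: float,
                          min_requests: int, min_cycle_hours: int) -> int:
    """Hours of shadow mode needed to satisfy both the sample-size
    floor and the temporal-coverage floor (one full business cycle)."""
    per_hour = requests_per_day * shadow_fraction / 24
    hours_for_samples = math.ceil(min_requests / per_hour)
    return max(hours_for_samples, min_cycle_hours)

# High traffic: samples accumulate almost instantly, so the
# 7-day business-cycle floor dominates.
print(shadow_duration_hours(20_000_000, 1.0, 100_000, 168))  # 168
# Low traffic: the sample-size floor dominates instead.
print(shadow_duration_hours(30_000, 1.0, 100_000, 24))       # 80
```
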


29.6 Canary Deployments: Gradual Traffic Exposure

A canary deployment routes a small percentage of production traffic to the new model while the majority continues to be served by the champion. Unlike shadow mode, canary predictions are served to real users — the canary model's output determines what users see. This makes canary evaluation more realistic (it measures actual user behavior, not predicted behavior) but also more risky (a bad canary model affects real users).

The Canary Pattern

                         ┌──── 90% ────► Champion Model ──► User Response
Production Traffic ──────┤
                         └──── 10% ────► Canary Model ──► User Response
                                              │
                                              ▼
                                    Canary Dashboard
                                  (CTR, latency, errors,
                                   engagement comparison)

The traffic split is controlled by a feature flag or a load balancer configuration. The critical requirement is that the split is consistent per user: a user should see recommendations from the same model throughout the canary period. If users bounce between champion and canary, the comparison is confounded.
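
One common way to get a consistent per-user split is to hash the user ID with a deployment-specific salt. The sketch below illustrates the idea; the function name and salt scheme are assumptions for illustration:

```python
import hashlib

def assign_to_canary(user_id: str, deployment_id: str,
                     canary_fraction: float) -> bool:
    """Deterministically assign a user to the canary arm.

    Hashing (deployment_id, user_id) keeps each user on the same
    model for the whole canary period; salting with the deployment
    ID reshuffles the cohort between deployments, so the same users
    are not always the canary group.
    """
    digest = hashlib.sha256(f"{deployment_id}:{user_id}".encode()).digest()
    bucket = int.from_bytes(digest[:8], "big") / 2 ** 64  # uniform in [0, 1)
    return bucket < canary_fraction
```

Because the assignment is a pure function of the inputs, every serving replica computes the same split with no shared state, and raising `canary_fraction` during progressive rollout only moves users from champion to canary, never the reverse.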

Statistical Evaluation of Canary Results

A canary deployment is a controlled experiment: two groups (champion and canary), one treatment (the new model), and one or more outcome metrics. The evaluation must account for statistical significance — a small difference in CTR between champion and canary may be noise.

"""
canary.py — Canary deployment with statistical evaluation.

Implements the canary deployment pattern: route a configurable
percentage of traffic to the challenger model, collect outcome
metrics, and evaluate using sequential hypothesis testing.
"""

from dataclasses import dataclass
from typing import Dict, List, Tuple
from enum import Enum
import math


class CanaryDecision(Enum):
    """Possible outcomes of canary evaluation."""
    PROMOTE = "promote"
    ROLLBACK = "rollback"
    CONTINUE = "continue"


@dataclass
class CanaryMetrics:
    """Outcome metrics for a canary deployment.

    Attributes:
        champion_impressions: Number of requests served by champion.
        canary_impressions: Number of requests served by canary.
        champion_clicks: Number of clicks on champion recommendations.
        canary_clicks: Number of clicks on canary recommendations.
        champion_completions: Number of content completions (champion).
        canary_completions: Number of content completions (canary).
        champion_latency_p99_ms: Champion p99 serving latency.
        canary_latency_p99_ms: Canary p99 serving latency.
        champion_error_rate: Fraction of champion requests with errors.
        canary_error_rate: Fraction of canary requests with errors.
    """
    champion_impressions: int = 0
    canary_impressions: int = 0
    champion_clicks: int = 0
    canary_clicks: int = 0
    champion_completions: int = 0
    canary_completions: int = 0
    champion_latency_p99_ms: float = 0.0
    canary_latency_p99_ms: float = 0.0
    champion_error_rate: float = 0.0
    canary_error_rate: float = 0.0

    @property
    def champion_ctr(self) -> float:
        """Click-through rate for champion model."""
        if self.champion_impressions == 0:
            return 0.0
        return self.champion_clicks / self.champion_impressions

    @property
    def canary_ctr(self) -> float:
        """Click-through rate for canary model."""
        if self.canary_impressions == 0:
            return 0.0
        return self.canary_clicks / self.canary_impressions

    @property
    def ctr_lift(self) -> float:
        """Relative CTR lift of canary over champion."""
        if self.champion_ctr == 0:
            return 0.0
        return (self.canary_ctr - self.champion_ctr) / self.champion_ctr

    @property
    def champion_completion_rate(self) -> float:
        """Content completion rate for champion model."""
        if self.champion_impressions == 0:
            return 0.0
        return self.champion_completions / self.champion_impressions

    @property
    def canary_completion_rate(self) -> float:
        """Content completion rate for canary model."""
        if self.canary_impressions == 0:
            return 0.0
        return self.canary_completions / self.canary_impressions


@dataclass
class CanaryConfig:
    """Configuration for a canary deployment.

    Attributes:
        model_name: Name of the model.
        champion_version: Current champion version.
        challenger_version: Challenger version in canary.
        canary_percentage: Fraction of traffic to route to canary (0-1).
        min_canary_impressions: Minimum impressions before evaluation.
        max_canary_duration_hours: Maximum canary duration before forced decision.
        min_ctr_lift: Minimum CTR lift to promote (-0.01 means accept
            up to 1% regression).
        max_latency_increase_ms: Maximum acceptable p99 latency increase.
        max_error_rate: Maximum acceptable error rate for canary.
        significance_level: Alpha for hypothesis testing.
    """
    model_name: str
    champion_version: int
    challenger_version: int
    canary_percentage: float = 0.10
    min_canary_impressions: int = 500_000
    max_canary_duration_hours: int = 72
    min_ctr_lift: float = -0.01
    max_latency_increase_ms: float = 10.0
    max_error_rate: float = 0.005
    significance_level: float = 0.05


def evaluate_canary(
    config: CanaryConfig,
    metrics: CanaryMetrics,
) -> Dict[str, object]:
    """Evaluate canary deployment and recommend a decision.

    Uses a two-proportion z-test for CTR comparison and checks
    guardrail metrics (latency, error rate).

    Args:
        config: Canary deployment configuration.
        metrics: Collected canary metrics.

    Returns:
        Dictionary with decision, statistical results, and reasoning.
    """
    # Check minimum sample size
    if metrics.canary_impressions < config.min_canary_impressions:
        return {
            "decision": CanaryDecision.CONTINUE,
            "reason": (
                f"Insufficient canary impressions: "
                f"{metrics.canary_impressions:,} < "
                f"{config.min_canary_impressions:,}"
            ),
            "metrics_summary": _metrics_summary(metrics),
        }

    # Guardrail checks (immediate rollback if violated)
    guardrail_issues: List[str] = []

    if metrics.canary_error_rate > config.max_error_rate:
        guardrail_issues.append(
            f"Error rate {metrics.canary_error_rate:.4f} > "
            f"{config.max_error_rate} threshold"
        )

    latency_increase = (
        metrics.canary_latency_p99_ms - metrics.champion_latency_p99_ms
    )
    if latency_increase > config.max_latency_increase_ms:
        guardrail_issues.append(
            f"Latency increase {latency_increase:.1f}ms > "
            f"{config.max_latency_increase_ms}ms threshold"
        )

    if guardrail_issues:
        return {
            "decision": CanaryDecision.ROLLBACK,
            "reason": "Guardrail violation(s): " + "; ".join(guardrail_issues),
            "guardrail_issues": guardrail_issues,
            "metrics_summary": _metrics_summary(metrics),
        }

    # Statistical test: two-proportion z-test for CTR
    z_stat, p_value = _two_proportion_z_test(
        successes_a=metrics.champion_clicks,
        trials_a=metrics.champion_impressions,
        successes_b=metrics.canary_clicks,
        trials_b=metrics.canary_impressions,
    )

    is_significant = p_value < config.significance_level
    ctr_lift = metrics.ctr_lift

    # Decision logic
    if is_significant and ctr_lift < config.min_ctr_lift:
        decision = CanaryDecision.ROLLBACK
        reason = (
            f"Statistically significant CTR regression: "
            f"{ctr_lift:+.3%} (p={p_value:.4f})"
        )
    elif is_significant and ctr_lift >= 0:
        decision = CanaryDecision.PROMOTE
        reason = (
            f"Statistically significant CTR improvement: "
            f"{ctr_lift:+.3%} (p={p_value:.4f})"
        )
    elif not is_significant:
        # Not significant — promote if within tolerance (non-inferiority)
        if ctr_lift >= config.min_ctr_lift:
            decision = CanaryDecision.PROMOTE
            reason = (
                f"CTR difference not significant (p={p_value:.4f}), "
                f"but within tolerance ({ctr_lift:+.3%} >= "
                f"{config.min_ctr_lift:+.3%}). Promoting on non-inferiority."
            )
        else:
            decision = CanaryDecision.CONTINUE
            reason = (
                f"CTR difference not significant (p={p_value:.4f}) "
                f"and below tolerance. Continuing canary."
            )
    else:
        # Remaining case: a statistically significant regression that is
        # still within tolerance (min_ctr_lift <= lift < 0). Gather more
        # data rather than promoting on a confirmed regression.
        decision = CanaryDecision.CONTINUE
        reason = (
            f"Statistically significant but tolerable CTR regression "
            f"({ctr_lift:+.3%}, p={p_value:.4f}). Continuing canary."
        )

    return {
        "decision": decision,
        "reason": reason,
        "z_statistic": z_stat,
        "p_value": p_value,
        "is_significant": is_significant,
        "ctr_lift": ctr_lift,
        "metrics_summary": _metrics_summary(metrics),
    }


def _two_proportion_z_test(
    successes_a: int,
    trials_a: int,
    successes_b: int,
    trials_b: int,
) -> Tuple[float, float]:
    """Two-proportion z-test for comparing conversion rates.

    Tests H0: p_a = p_b against H1: p_a != p_b (two-sided).

    Args:
        successes_a: Number of successes in group A.
        trials_a: Number of trials in group A.
        successes_b: Number of successes in group B.
        trials_b: Number of trials in group B.

    Returns:
        Tuple of (z-statistic, p-value).
    """
    if trials_a == 0 or trials_b == 0:
        return 0.0, 1.0

    p_a = successes_a / trials_a
    p_b = successes_b / trials_b

    # Pooled proportion under H0
    p_pool = (successes_a + successes_b) / (trials_a + trials_b)

    if p_pool == 0 or p_pool == 1:
        return 0.0, 1.0

    se = math.sqrt(p_pool * (1 - p_pool) * (1 / trials_a + 1 / trials_b))

    if se == 0:
        return 0.0, 1.0

    z = (p_b - p_a) / se

    # Two-sided p-value using normal CDF approximation
    p_value = 2.0 * _normal_cdf(-abs(z))
    return z, p_value


def _normal_cdf(x: float) -> float:
    """Approximate the standard normal CDF using the error function.

    Args:
        x: Value at which to evaluate the CDF.

    Returns:
        Approximate P(Z <= x) for Z ~ N(0,1).
    """
    return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))


def _metrics_summary(metrics: CanaryMetrics) -> Dict[str, float]:
    """Summarize canary metrics for reporting."""
    return {
        "champion_ctr": metrics.champion_ctr,
        "canary_ctr": metrics.canary_ctr,
        "ctr_lift": metrics.ctr_lift,
        "champion_completion_rate": metrics.champion_completion_rate,
        "canary_completion_rate": metrics.canary_completion_rate,
        "champion_p99_ms": metrics.champion_latency_p99_ms,
        "canary_p99_ms": metrics.canary_latency_p99_ms,
        "champion_error_rate": metrics.champion_error_rate,
        "canary_error_rate": metrics.canary_error_rate,
        "champion_impressions": metrics.champion_impressions,
        "canary_impressions": metrics.canary_impressions,
    }
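
To make the decision thresholds concrete, here is a worked example of the CTR z-test on illustrative numbers: a 10% canary whose CTR drops from 4.00% to 3.90%, a 2.5% relative regression. The helper re-derives the same two-proportion test as `_two_proportion_z_test` so the snippet is self-contained:

```python
import math

def two_proportion_z(clicks_a: int, n_a: int, clicks_b: int, n_b: int):
    """Two-sided two-proportion z-test (same math as _two_proportion_z_test)."""
    p_pool = (clicks_a + clicks_b) / (n_a + n_b)
    se = math.sqrt(p_pool * (1 - p_pool) * (1 / n_a + 1 / n_b))
    z = (clicks_b / n_b - clicks_a / n_a) / se
    p_value = 2 * 0.5 * (1 + math.erf(-abs(z) / math.sqrt(2)))
    return z, p_value

# Champion: 4.5M impressions at 4.00% CTR. Canary: 500k at 3.90% CTR.
z, p = two_proportion_z(180_000, 4_500_000, 19_500, 500_000)
# z is about -3.4 with p < 0.001: the -2.5% relative CTR drop is
# statistically significant and below min_ctr_lift = -0.01, so the
# evaluation returns ROLLBACK.
```

Note how sample size matters: the same -2.5% lift observed on only 5,000 canary impressions would not reach significance, and the evaluation would return CONTINUE instead.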

Canary Duration and Traffic Percentage

The canary configuration involves a tradeoff between risk and speed:

| Parameter       | Conservative          | Moderate | Aggressive             |
|-----------------|-----------------------|----------|------------------------|
| Canary %        | 1-5%                  | 5-10%    | 10-25%                 |
| Duration        | 7 days                | 3 days   | 1 day                  |
| Min impressions | 1,000,000             | 500,000  | 100,000                |
| Use case        | Financial, healthcare | General  | Low-risk, high-traffic |

StreamRec uses the moderate configuration: 10% canary traffic for 3 days, with a minimum of 500,000 canary impressions. At 20 million daily requests, 10% canary produces 2 million impressions per day — well above the 500,000 threshold within the first day. The 3-day duration ensures coverage of weekday and weekend patterns.
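
The minimum-impression column can be sanity-checked with the standard two-proportion sample-size formula. The helper below is a back-of-envelope sketch; the default z values (5% two-sided significance, 80% power) and the function name are assumptions for illustration:

```python
import math

def min_impressions_per_arm(baseline_ctr: float, rel_change: float,
                            z_alpha: float = 1.96, z_beta: float = 0.84) -> int:
    """Approximate per-arm impressions needed to detect a relative CTR
    change of `rel_change`, using the standard two-proportion formula.
    Defaults correspond to 5% two-sided significance and 80% power."""
    delta = baseline_ctr * abs(rel_change)           # absolute detectable change
    variance = 2 * baseline_ctr * (1 - baseline_ctr)  # pooled variance approx.
    return math.ceil((z_alpha + z_beta) ** 2 * variance / delta ** 2)
```

At a 4% baseline CTR, detecting a 2% relative change needs roughly 940,000 impressions per arm, which lines up with the conservative preset's 1,000,000 minimum; the aggressive preset's 100,000 can only resolve much larger effects.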


29.7 Progressive Rollout: From Canary to Full Traffic

If the canary evaluation succeeds, the deployment pipeline does not immediately switch all traffic to the new model. Instead, it executes a progressive rollout — a staged increase in traffic percentage with monitoring at each stage.

Rollout Schedule

A typical progressive rollout for StreamRec:

| Stage   | Traffic %     | Duration | Monitoring Focus                                   | Rollback Trigger                      |
|---------|---------------|----------|----------------------------------------------------|---------------------------------------|
| Shadow  | 0% (parallel) | 7 days   | Latency, rank correlation, prediction distribution | p99 > 50ms, rank corr < 0.7           |
| Canary  | 10%           | 3 days   | CTR, completion rate, error rate, latency          | CTR < -2%, errors > 0.5%, p99 > +10ms |
| Stage 1 | 25%           | 1 day    | Same as canary                                     | Same as canary                        |
| Stage 2 | 50%           | 1 day    | Same + engagement duration                         | Engagement < -3%                      |
| Full    | 100%          | --       | Full monitoring (Chapter 30)                       | Any monitoring alert                  |

Each stage is a checkpoint: the deployment pipeline evaluates metrics against thresholds before advancing to the next stage. If any threshold is violated, the pipeline automatically rolls back to the previous stage — not all the way to the champion, but one stage back. If the violation persists after rollback, the pipeline rolls back further.

"""
progressive_rollout.py — Progressive deployment from canary to full traffic.

Implements the staged rollout pattern: shadow → canary → 25% → 50% → 100%,
with monitoring and automatic rollback at each stage.
"""

from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from enum import Enum


class RolloutStage(Enum):
    """Stages in a progressive rollout."""
    SHADOW = "shadow"
    CANARY = "canary"
    STAGE_25 = "stage_25"
    STAGE_50 = "stage_50"
    FULL = "full"
    ROLLED_BACK = "rolled_back"


STAGE_ORDER = [
    RolloutStage.SHADOW,
    RolloutStage.CANARY,
    RolloutStage.STAGE_25,
    RolloutStage.STAGE_50,
    RolloutStage.FULL,
]

STAGE_TRAFFIC = {
    RolloutStage.SHADOW: 0.0,
    RolloutStage.CANARY: 0.10,
    RolloutStage.STAGE_25: 0.25,
    RolloutStage.STAGE_50: 0.50,
    RolloutStage.FULL: 1.00,
}


@dataclass
class RolloutThreshold:
    """Threshold for a single metric at a rollout stage.

    Attributes:
        metric_name: Name of the metric to monitor.
        min_value: Minimum acceptable value (None = no lower bound).
        max_value: Maximum acceptable value (None = no upper bound).
        comparison_type: 'absolute' for raw values, 'relative' for
            comparison against champion baseline.
    """
    metric_name: str
    min_value: Optional[float] = None
    max_value: Optional[float] = None
    comparison_type: str = "absolute"


@dataclass
class RolloutStageConfig:
    """Configuration for a single rollout stage.

    Attributes:
        stage: The rollout stage.
        traffic_percentage: Fraction of traffic for this stage.
        min_duration_hours: Minimum time to spend at this stage.
        thresholds: Metrics thresholds that must be satisfied to advance.
    """
    stage: RolloutStage
    traffic_percentage: float
    min_duration_hours: int
    thresholds: List[RolloutThreshold] = field(default_factory=list)


@dataclass
class RolloutState:
    """Current state of a progressive rollout.

    Attributes:
        model_name: Name of the model being deployed.
        challenger_version: Version being rolled out.
        champion_version: Current production version.
        current_stage: Current rollout stage.
        stage_started_at: When the current stage began.
        stage_history: History of stage transitions.
        rollback_count: Number of rollbacks during this deployment.
    """
    model_name: str
    challenger_version: int
    champion_version: int
    current_stage: RolloutStage = RolloutStage.SHADOW
    stage_started_at: datetime = field(default_factory=datetime.utcnow)
    stage_history: List[Dict[str, str]] = field(default_factory=list)
    rollback_count: int = 0

    def record_transition(
        self,
        from_stage: RolloutStage,
        to_stage: RolloutStage,
        reason: str,
    ) -> None:
        """Record a stage transition in the history."""
        self.stage_history.append({
            "from": from_stage.value,
            "to": to_stage.value,
            "reason": reason,
            "timestamp": datetime.utcnow().isoformat(),
        })


@dataclass
class ProgressiveRolloutController:
    """Controller for progressive model rollout.

    Manages the state machine that advances (or rolls back) a model
    deployment through shadow → canary → 25% → 50% → 100%.

    Attributes:
        rollout_state: Current deployment state.
        stage_configs: Configuration for each stage.
    """
    rollout_state: RolloutState
    stage_configs: Dict[RolloutStage, RolloutStageConfig] = field(
        default_factory=dict
    )

    def __post_init__(self) -> None:
        if not self.stage_configs:
            self.stage_configs = self._default_configs()

    def _default_configs(self) -> Dict[RolloutStage, RolloutStageConfig]:
        """Default StreamRec rollout configuration."""
        return {
            RolloutStage.SHADOW: RolloutStageConfig(
                stage=RolloutStage.SHADOW,
                traffic_percentage=0.0,
                min_duration_hours=168,  # 7 days
                thresholds=[
                    RolloutThreshold("challenger_p99_ms", max_value=50.0),
                    RolloutThreshold("rank_correlation", min_value=0.70),
                    RolloutThreshold(
                        "mean_score_divergence", max_value=0.30
                    ),
                ],
            ),
            RolloutStage.CANARY: RolloutStageConfig(
                stage=RolloutStage.CANARY,
                traffic_percentage=0.10,
                min_duration_hours=72,  # 3 days
                thresholds=[
                    RolloutThreshold(
                        "ctr_lift", min_value=-0.02,
                        comparison_type="relative",
                    ),
                    RolloutThreshold("canary_error_rate", max_value=0.005),
                    RolloutThreshold(
                        "latency_increase_ms", max_value=10.0,
                        comparison_type="relative",
                    ),
                ],
            ),
            RolloutStage.STAGE_25: RolloutStageConfig(
                stage=RolloutStage.STAGE_25,
                traffic_percentage=0.25,
                min_duration_hours=24,
                thresholds=[
                    RolloutThreshold(
                        "ctr_lift", min_value=-0.02,
                        comparison_type="relative",
                    ),
                    RolloutThreshold("error_rate", max_value=0.005),
                    RolloutThreshold(
                        "latency_increase_ms", max_value=10.0,
                        comparison_type="relative",
                    ),
                ],
            ),
            RolloutStage.STAGE_50: RolloutStageConfig(
                stage=RolloutStage.STAGE_50,
                traffic_percentage=0.50,
                min_duration_hours=24,
                thresholds=[
                    RolloutThreshold(
                        "ctr_lift", min_value=-0.02,
                        comparison_type="relative",
                    ),
                    RolloutThreshold(
                        "engagement_duration_lift", min_value=-0.03,
                        comparison_type="relative",
                    ),
                    RolloutThreshold("error_rate", max_value=0.005),
                ],
            ),
        }

    def check_advancement(
        self,
        current_metrics: Dict[str, float],
    ) -> Dict[str, object]:
        """Check whether the rollout should advance to the next stage.

        Evaluates all thresholds for the current stage. If all pass
        and the minimum duration has elapsed, recommends advancement.
        If any threshold is violated, recommends rollback.

        Args:
            current_metrics: Dictionary of current metric values.

        Returns:
            Dictionary with recommendation and details.
        """
        state = self.rollout_state
        stage = state.current_stage

        if stage == RolloutStage.FULL:
            return {"action": "complete", "reason": "Already at full traffic"}

        if stage == RolloutStage.ROLLED_BACK:
            return {
                "action": "stopped",
                "reason": "Rollout has been rolled back",
            }

        config = self.stage_configs.get(stage)
        if config is None:
            return {"action": "advance", "reason": "No config for stage"}

        # Check minimum duration
        elapsed = datetime.utcnow() - state.stage_started_at
        min_duration = timedelta(hours=config.min_duration_hours)
        if elapsed < min_duration:
            remaining = min_duration - elapsed
            return {
                "action": "wait",
                "reason": (
                    f"Minimum duration not met: {elapsed} < {min_duration}. "
                    f"Remaining: {remaining}"
                ),
            }

        # Check thresholds
        violations: List[str] = []
        for threshold in config.thresholds:
            value = current_metrics.get(threshold.metric_name)
            if value is None:
                violations.append(
                    f"Missing metric: {threshold.metric_name}"
                )
                continue

            if threshold.min_value is not None and value < threshold.min_value:
                violations.append(
                    f"{threshold.metric_name}={value:.4f} < "
                    f"min={threshold.min_value}"
                )
            if threshold.max_value is not None and value > threshold.max_value:
                violations.append(
                    f"{threshold.metric_name}={value:.4f} > "
                    f"max={threshold.max_value}"
                )

        if violations:
            return {
                "action": "rollback",
                "reason": "Threshold violations: " + "; ".join(violations),
                "violations": violations,
            }

        # All thresholds pass, minimum duration met — advance
        current_idx = STAGE_ORDER.index(stage)
        next_stage = STAGE_ORDER[current_idx + 1]

        return {
            "action": "advance",
            "next_stage": next_stage.value,
            "next_traffic_percentage": STAGE_TRAFFIC[next_stage],
            "reason": "All thresholds pass, minimum duration met",
        }

    def advance(self, reason: str = "Thresholds met") -> RolloutStage:
        """Advance to the next rollout stage.

        Args:
            reason: Reason for advancement.

        Returns:
            The new rollout stage.
        """
        state = self.rollout_state
        current_idx = STAGE_ORDER.index(state.current_stage)

        if current_idx >= len(STAGE_ORDER) - 1:
            return state.current_stage  # Already at full

        next_stage = STAGE_ORDER[current_idx + 1]
        state.record_transition(state.current_stage, next_stage, reason)
        state.current_stage = next_stage
        state.stage_started_at = datetime.utcnow()
        return next_stage

    def rollback(self, reason: str = "Threshold violation") -> RolloutStage:
        """Roll back to the previous stage, or fully roll back.

        If this is the first rollback, go back one stage.
        If this is the second rollback, fully roll back to champion.

        Args:
            reason: Reason for rollback.

        Returns:
            The new rollout stage.
        """
        state = self.rollout_state
        state.rollback_count += 1

        if state.rollback_count >= 2:
            # Two rollbacks — abort the deployment
            state.record_transition(
                state.current_stage,
                RolloutStage.ROLLED_BACK,
                f"Full rollback (attempt {state.rollback_count}): {reason}",
            )
            state.current_stage = RolloutStage.ROLLED_BACK
            return RolloutStage.ROLLED_BACK

        # First rollback — go back one stage
        current_idx = STAGE_ORDER.index(state.current_stage)
        if current_idx == 0:
            state.record_transition(
                state.current_stage,
                RolloutStage.ROLLED_BACK,
                f"Rollback from first stage: {reason}",
            )
            state.current_stage = RolloutStage.ROLLED_BACK
            return RolloutStage.ROLLED_BACK

        prev_stage = STAGE_ORDER[current_idx - 1]
        state.record_transition(
            state.current_stage, prev_stage, f"Rollback: {reason}"
        )
        state.current_stage = prev_stage
        state.stage_started_at = datetime.utcnow()
        return prev_stage

Blue-Green Deployment

Before discussing retraining, it is worth noting an alternative to canary: blue-green deployment. In blue-green, two identical environments ("blue" and "green") exist in parallel. The current production model runs in blue. The new model is deployed to green. All traffic is switched from blue to green instantaneously via a load balancer or DNS change.

| Aspect | Blue-Green | Canary |
|---|---|---|
| Traffic transition | Instantaneous (0% → 100%) | Gradual (0% → 10% → 25% → 50% → 100%) |
| Rollback speed | Instantaneous (switch back to blue) | Fast (reduce canary %, route to champion) |
| Risk exposure | All users see the new model immediately | Only canary users see the new model |
| Infrastructure cost | 2x (two full environments) | 1x + canary fraction |
| Statistical evaluation | Post-deployment only | During deployment |
| Best for | Stateless services, low-risk changes | ML models, high-risk changes |

Blue-green is simpler but riskier: if the new model is bad, all users are affected before the rollback occurs. Canary deployment limits blast radius at the cost of a longer deployment timeline. For ML models, where quality is uncertain and feedback is delayed, canary deployment is almost always preferred. Blue-green may be appropriate for infrastructure changes (e.g., upgrading the serving framework) where the model itself does not change.
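The difference between the two transitions can be sketched as a toy traffic router — a minimal illustration with a hypothetical `TrafficRouter` class, not any particular load balancer API:

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class TrafficRouter:
    """Toy router illustrating blue-green vs canary transitions."""
    weights: Dict[str, float] = field(default_factory=dict)

    def blue_green_switch(self, blue: str, green: str) -> None:
        # Blue-green: one atomic flip, 0% -> 100%.
        self.weights[blue] = 0.0
        self.weights[green] = 1.0

    def canary_step(self, champion: str, canary: str, pct: float) -> None:
        # Canary: move only a fraction of traffic at a time.
        self.weights[canary] = pct
        self.weights[champion] = 1.0 - pct


router = TrafficRouter(weights={"blue": 1.0, "green": 0.0})
router.blue_green_switch("blue", "green")
print(router.weights)  # {'blue': 0.0, 'green': 1.0}
```

The blue-green flip is a single atomic operation, which is exactly why its blast radius is total: there is no intermediate state in which only some users see the new model.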


29.8 Continuous Training: Keeping Models Fresh

A model deployed to production today will degrade over time. The world changes: user preferences shift, new content is added, seasonal patterns rotate, competitor actions alter behavior. A recommendation model trained on January data does not know about February's trending content. A credit scoring model trained before a recession does not reflect the new economic reality. This temporal degradation is called concept drift (Chapter 30 covers detection in detail), and the remedy is continuous training — retraining the model on fresh data at regular intervals or in response to detected drift.

Three Retraining Triggers

Continuous training is triggered by one of three mechanisms, each with different tradeoffs:

1. Scheduled Retraining

The simplest approach: retrain every N hours/days/weeks on the most recent data window.

| Schedule | Use Case | Pros | Cons |
|---|---|---|---|
| Hourly | Fraud detection, real-time bidding | Captures rapid distribution shifts | High compute cost, deployment overhead |
| Daily | Recommendations, search ranking | Good freshness/cost balance | May miss intra-day shifts |
| Weekly | Credit scoring, churn prediction | Low cost, stable models | Slow to adapt, stale features |
| Monthly | Regulatory models, clinical scoring | Minimal cost | Significant staleness risk |

StreamRec uses weekly retraining: every Sunday at 02:00 UTC, the Dagster pipeline from Chapter 27 is triggered. The training window is 30 days of interaction data with a 7-day holdout. The weekly cadence balances freshness (new content appears in recommendations within one week) against cost (one training run per week on 4x A100s costs approximately $150 in cloud compute).

"""
retraining_triggers.py — Retraining trigger logic for continuous training.

Implements three trigger types: scheduled (cron-based), drift-based
(PSI threshold), and performance-based (metric degradation).
"""

from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timedelta
from enum import Enum
import math


class TriggerType(Enum):
    """Types of retraining triggers."""
    SCHEDULED = "scheduled"
    DRIFT = "drift"
    PERFORMANCE = "performance"
    MANUAL = "manual"


@dataclass
class TriggerEvent:
    """A single retraining trigger event.

    Attributes:
        trigger_type: What caused the trigger.
        triggered_at: When the trigger fired.
        reason: Human-readable description.
        metadata: Additional context (metric values, drift scores, etc.).
        priority: Priority level (1=urgent, 2=standard, 3=low).
    """
    trigger_type: TriggerType
    triggered_at: datetime
    reason: str
    metadata: Dict[str, float] = field(default_factory=dict)
    priority: int = 2


@dataclass
class ScheduledTrigger:
    """Cron-based retraining trigger.

    Attributes:
        cron_expression: Cron schedule (e.g., '0 2 * * 0' for Sunday 2am).
        last_triggered: When the trigger last fired.
        model_name: Name of the model to retrain.
    """
    cron_expression: str
    last_triggered: Optional[datetime] = None
    model_name: str = ""

    def should_trigger(self, now: datetime) -> Optional[TriggerEvent]:
        """Check if the scheduled trigger should fire.

        This is a simplified check: in production, use a proper
        cron parser (e.g., croniter).

        Args:
            now: Current timestamp.

        Returns:
            TriggerEvent if the trigger should fire, None otherwise.
        """
        if self.last_triggered is None:
            self.last_triggered = now
            return TriggerEvent(
                trigger_type=TriggerType.SCHEDULED,
                triggered_at=now,
                reason=f"Initial scheduled trigger: {self.cron_expression}",
                priority=3,
            )

        # Simplified: trigger if more than 7 days since last trigger
        # (for the weekly cron '0 2 * * 0')
        if now - self.last_triggered >= timedelta(days=7):
            self.last_triggered = now
            return TriggerEvent(
                trigger_type=TriggerType.SCHEDULED,
                triggered_at=now,
                reason=(
                    f"Weekly scheduled retraining: "
                    f"{self.cron_expression}"
                ),
                priority=3,
            )

        return None


@dataclass
class DriftTrigger:
    """Data drift-based retraining trigger.

    Monitors PSI (Population Stability Index) for key features.
    Triggers retraining when PSI exceeds the threshold for any
    monitored feature.

    Attributes:
        feature_thresholds: Mapping from feature name to PSI threshold.
        psi_alert_threshold: Default PSI threshold for unspecified features.
        model_name: Name of the model to retrain.
    """
    feature_thresholds: Dict[str, float] = field(default_factory=dict)
    psi_alert_threshold: float = 0.25
    model_name: str = ""

    def check_drift(
        self,
        current_psi_values: Dict[str, float],
    ) -> Optional[TriggerEvent]:
        """Check if any feature's PSI exceeds its threshold.

        Args:
            current_psi_values: Mapping from feature name to current PSI.

        Returns:
            TriggerEvent if drift is detected, None otherwise.
        """
        violations: List[str] = []
        max_psi = 0.0

        for feature, psi in current_psi_values.items():
            threshold = self.feature_thresholds.get(
                feature, self.psi_alert_threshold
            )
            if psi > threshold:
                violations.append(
                    f"{feature}: PSI={psi:.3f} > {threshold:.3f}"
                )
            max_psi = max(max_psi, psi)

        if violations:
            return TriggerEvent(
                trigger_type=TriggerType.DRIFT,
                triggered_at=datetime.utcnow(),
                reason=(
                    f"Data drift detected in {len(violations)} feature(s): "
                    + "; ".join(violations)
                ),
                metadata=current_psi_values,
                priority=1 if max_psi > 0.5 else 2,
            )

        return None


@dataclass
class PerformanceTrigger:
    """Performance degradation-based retraining trigger.

    Monitors production metrics and triggers retraining when
    performance drops below thresholds.

    Attributes:
        metric_thresholds: Mapping from metric name to
            (absolute_floor, relative_decline_from_baseline).
        baseline_metrics: Baseline metric values from the last deployment.
        model_name: Name of the model to retrain.
    """
    metric_thresholds: Dict[str, Tuple[float, float]] = field(
        default_factory=dict
    )
    baseline_metrics: Dict[str, float] = field(default_factory=dict)
    model_name: str = ""

    def check_performance(
        self,
        current_metrics: Dict[str, float],
    ) -> Optional[TriggerEvent]:
        """Check if any production metric has degraded below threshold.

        Args:
            current_metrics: Current production metric values.

        Returns:
            TriggerEvent if degradation is detected, None otherwise.
        """
        violations: List[str] = []
        is_critical = False

        for metric, (absolute_floor, max_decline) in (
            self.metric_thresholds.items()
        ):
            current = current_metrics.get(metric)
            baseline = self.baseline_metrics.get(metric)

            if current is None:
                continue

            # Check absolute floor
            if current < absolute_floor:
                violations.append(
                    f"{metric}={current:.4f} < floor={absolute_floor:.4f}"
                )
                is_critical = True

            # Check relative decline from baseline
            if baseline is not None and baseline > 0:
                decline = (baseline - current) / baseline
                if decline > max_decline:
                    violations.append(
                        f"{metric} declined {decline:.1%} from baseline "
                        f"{baseline:.4f} (max allowed: {max_decline:.1%})"
                    )

        if violations:
            return TriggerEvent(
                trigger_type=TriggerType.PERFORMANCE,
                triggered_at=datetime.utcnow(),
                reason=(
                    "Performance degradation: "
                    + "; ".join(violations)
                ),
                metadata=current_metrics,
                priority=1 if is_critical else 2,
            )

        return None


@dataclass
class RetrainingTriggerManager:
    """Orchestrates multiple retraining triggers for a model.

    Combines scheduled, drift, and performance triggers. When any
    trigger fires, the manager decides whether to initiate retraining
    (with deduplication to prevent redundant retraining).

    Attributes:
        model_name: Name of the model.
        scheduled: Scheduled retraining trigger.
        drift: Data drift trigger.
        performance: Performance degradation trigger.
        min_retrain_interval_hours: Minimum time between retraining runs.
        last_retrain_time: When the last retraining was initiated.
        trigger_history: History of trigger events.
    """
    model_name: str
    scheduled: ScheduledTrigger
    drift: DriftTrigger
    performance: PerformanceTrigger
    min_retrain_interval_hours: int = 24
    last_retrain_time: Optional[datetime] = None
    trigger_history: List[TriggerEvent] = field(default_factory=list)

    def evaluate(
        self,
        now: datetime,
        psi_values: Dict[str, float],
        production_metrics: Dict[str, float],
    ) -> Optional[TriggerEvent]:
        """Evaluate all triggers and return the highest-priority event.

        Deduplicates: if retraining was triggered recently (within
        min_retrain_interval_hours), suppress non-critical triggers.

        Args:
            now: Current timestamp.
            psi_values: Current PSI values for monitored features.
            production_metrics: Current production metric values.

        Returns:
            The highest-priority TriggerEvent, or None if no trigger fires.
        """
        events: List[TriggerEvent] = []

        # Check each trigger
        scheduled_event = self.scheduled.should_trigger(now)
        if scheduled_event:
            events.append(scheduled_event)

        drift_event = self.drift.check_drift(psi_values)
        if drift_event:
            events.append(drift_event)

        perf_event = self.performance.check_performance(production_metrics)
        if perf_event:
            events.append(perf_event)

        if not events:
            return None

        # Sort by priority (lower = higher priority)
        events.sort(key=lambda e: e.priority)
        best_event = events[0]

        # Deduplication: suppress non-critical triggers if recent retrain
        if self.last_retrain_time is not None:
            elapsed = now - self.last_retrain_time
            min_interval = timedelta(hours=self.min_retrain_interval_hours)
            if elapsed < min_interval and best_event.priority > 1:
                return None  # Suppress non-critical trigger

        self.trigger_history.append(best_event)
        self.last_retrain_time = now
        return best_event

2. Drift-Based Retraining

When the monitoring system (Chapter 30) detects that input data has shifted significantly — PSI exceeds the threshold for one or more key features — it triggers retraining. Drift-based triggers are reactive: they retrain only when the data distribution has actually changed, avoiding unnecessary retraining when the world is stable.

The tradeoff is detection latency. PSI requires accumulating enough data to compute a reliable distribution estimate — typically one day's worth. If the drift is sudden (a schema change, a logging bug), drift detection may lag by 24 hours, during which the model serves predictions on shifted data. Performance-based triggers can catch this faster if the drift affects model quality directly.
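PSI itself is a simple sum over histogram bins: for each bin, the difference between the actual and expected proportions, weighted by the log of their ratio. A sketch of the computation that the monitoring system would feed into `DriftTrigger.check_drift` — the epsilon guard and the example bins are illustrative assumptions:

```python
import math
from typing import List


def compute_psi(expected: List[float], actual: List[float],
                epsilon: float = 1e-4) -> float:
    """Population Stability Index between two binned distributions.

    Both inputs are per-bin proportions summing to 1. The epsilon
    guards against log(0) when a bin is empty in one distribution.
    """
    psi = 0.0
    for e, a in zip(expected, actual):
        e = max(e, epsilon)
        a = max(a, epsilon)
        psi += (a - e) * math.log(a / e)
    return psi


# Identical distributions: PSI = 0 (no drift).
baseline = [0.25, 0.25, 0.25, 0.25]
print(compute_psi(baseline, baseline))  # 0.0

# A shifted distribution lands well above the 0.25 alert threshold.
shifted = [0.10, 0.15, 0.25, 0.50]
print(compute_psi(baseline, shifted) > 0.25)  # True
```

The conventional interpretation bands (PSI below 0.1 is stable, 0.1–0.25 is moderate shift, above 0.25 is significant shift) are why 0.25 appears as the default `psi_alert_threshold` above.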

3. Performance-Based Retraining

When a production metric (CTR, NDCG@20, AUC) drops below an absolute floor or declines more than a relative threshold from the baseline, the system triggers retraining. Performance-based triggers are the most direct: they retrain when the model is actually underperforming, not when the data has shifted (which may or may not affect model quality).

The tradeoff is that performance metrics are lagging indicators: they require user outcomes, which may arrive hours or days after the predictions are served. A recommendation model's CTR can be measured within hours; a credit scoring model's default rate cannot be measured for months.

StreamRec Retraining Configuration

StreamRec uses all three triggers in combination:

# StreamRec retraining trigger configuration
streamrec_triggers = RetrainingTriggerManager(
    model_name="streamrec-retrieval",
    scheduled=ScheduledTrigger(
        cron_expression="0 2 * * 0",  # Sunday 2am UTC
        model_name="streamrec-retrieval",
    ),
    drift=DriftTrigger(
        feature_thresholds={
            "user_engagement_rate": 0.20,
            "item_popularity_score": 0.25,
            "session_length_minutes": 0.25,
            "platform_distribution": 0.20,
        },
        psi_alert_threshold=0.25,
        model_name="streamrec-retrieval",
    ),
    performance=PerformanceTrigger(
        metric_thresholds={
            "recall_at_20": (0.15, 0.10),       # floor=0.15, max 10% decline
            "ndcg_at_20": (0.12, 0.10),          # floor=0.12, max 10% decline
            "ctr": (0.03, 0.15),                  # floor=3%, max 15% decline
            "completion_rate": (0.08, 0.15),      # floor=8%, max 15% decline
        },
        baseline_metrics={
            "recall_at_20": 0.214,
            "ndcg_at_20": 0.186,
            "ctr": 0.052,
            "completion_rate": 0.134,
        },
        model_name="streamrec-retrieval",
    ),
    min_retrain_interval_hours=24,
)

The scheduled trigger provides a guaranteed weekly refresh. The drift trigger catches distribution shifts between scheduled retraining windows. The performance trigger catches quality degradation that drift detection might miss (because the drift affects features not monitored by PSI, or because the model is sensitive to small shifts that fall below the PSI threshold). The 24-hour minimum interval prevents redundant retraining when multiple triggers fire simultaneously.


29.9 Retraining Pipeline Integration

When a retraining trigger fires, the system must execute the full pipeline: extract data, validate data, compute features, train, evaluate, validate, and deploy. This is the same Dagster pipeline from Chapter 27, triggered automatically rather than manually.

The integration between the trigger manager and the orchestration pipeline is:

Trigger Manager
     │
     ├── fires TriggerEvent (type=scheduled|drift|performance)
     │
     ▼
Dagster Sensor
     │
     ├── reads trigger event from message queue (e.g., SQS, Pub/Sub)
     ├── creates a Dagster run with trigger metadata as run tags
     │
     ▼
Dagster Pipeline (Chapter 27)
     │
     ├── extract → validate → features → train → evaluate → register
     │
     ▼
Model Registry
     │
     ├── new model version registered in Staging
     │
     ▼
Deployment Pipeline (this chapter)
     │
     ├── shadow → canary → progressive rollout → production
     │
     ▼
Monitoring (Chapter 30)
     │
     ├── update baseline metrics for next trigger evaluation

The critical design decision is whether the trigger-to-production path is fully automated or requires human approval. For StreamRec, the configuration is:

  • Scheduled retraining: Fully automated (shadow → canary → rollout proceeds without approval).
  • Drift-triggered retraining: Automated training and validation; human approval required before canary deployment (because drift may indicate a data quality issue, not a model staleness issue).
  • Performance-triggered retraining: Automated training; human approval required (because performance degradation may have a root cause that retraining cannot fix — a feature pipeline bug, an upstream schema change, or a business context change).

This tiered automation ensures that routine retraining happens without human bottlenecks while preserving human judgment for anomalous situations.
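The tiered policy reduces to a small lookup from trigger type to deployment gate. A sketch of how StreamRec's policy might be encoded — the `TriggerType` enum is restated from `retraining_triggers.py` for self-containedness, and the `requires_approval` helper is a hypothetical name:

```python
from enum import Enum


class TriggerType(Enum):
    """Restated from retraining_triggers.py."""
    SCHEDULED = "scheduled"
    DRIFT = "drift"
    PERFORMANCE = "performance"
    MANUAL = "manual"


# StreamRec's tiered automation: which trigger types may proceed
# from a validated model to canary without human sign-off.
APPROVAL_POLICY = {
    TriggerType.SCHEDULED: False,    # routine refresh: fully automated
    TriggerType.DRIFT: True,         # drift may be a data-quality issue
    TriggerType.PERFORMANCE: True,   # root cause may not be staleness
    TriggerType.MANUAL: True,
}


def requires_approval(trigger: TriggerType) -> bool:
    """True if a human must approve before canary deployment."""
    # Unknown trigger types default to the safe side.
    return APPROVAL_POLICY.get(trigger, True)


print(requires_approval(TriggerType.SCHEDULED))  # False
print(requires_approval(TriggerType.DRIFT))      # True
```

Defaulting unknown trigger types to "approval required" keeps the fail-safe property: automation must be earned explicitly, never inherited by accident.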


29.10 Model Rollback: The Safety Net

Every deployment must have a rollback plan. For ML models, rollback means restoring the previous champion model — the model that was serving production traffic before the current deployment. Rollback must be:

  1. Fast: The model must be restored within seconds, not minutes. A degraded model serving production traffic for 10 minutes at 20 million requests per day affects 140,000 requests.
  2. Safe: The rollback itself must not introduce additional risk. The previous champion model must still be available, its serving infrastructure must still be running, and the rollback mechanism must be tested regularly.
  3. Automated: Rollback should not require a human to log in, identify the problem, find the previous model version, and manually switch traffic. The monitoring system (Chapter 30) or the canary evaluator (Section 29.6) should trigger rollback automatically when thresholds are violated.

Rollback Architecture

The standard rollback architecture keeps the previous champion model deployed and ready to serve:

Load Balancer / Feature Flag
         │
         ├── traffic_weight=1.0 ──► Current Champion (v18)
         ├── traffic_weight=0.0 ──► Previous Champion (v17) [warm standby]
         └── traffic_weight=0.0 ──► Canary (v19) [if active]

Rollback:
  Set v18 traffic_weight = 0.0
  Set v17 traffic_weight = 1.0
  Duration: < 5 seconds (load balancer config change)

The previous champion remains deployed (warm standby) throughout the current champion's lifetime. Its serving infrastructure is allocated but idle, consuming resources but enabling instantaneous rollback. When the current champion is retired (replaced by a new champion that has completed full progressive rollout), the warm standby is deallocated.

"""
rollback.py — Automated model rollback with health checks.

Implements the rollback procedure: detect issue, verify rollback
target health, switch traffic, confirm, and notify.
"""

from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum


class RollbackReason(Enum):
    """Reasons for triggering a model rollback."""
    CANARY_FAILURE = "canary_threshold_violation"
    LATENCY_SPIKE = "latency_p99_exceeded"
    ERROR_RATE = "error_rate_exceeded"
    METRIC_DEGRADATION = "business_metric_degradation"
    DATA_QUALITY = "upstream_data_quality_issue"
    MANUAL = "manual_trigger"


class RollbackStatus(Enum):
    """Status of a rollback operation."""
    INITIATED = "initiated"
    HEALTH_CHECK = "health_check"
    TRAFFIC_SWITCHING = "traffic_switching"
    VERIFYING = "verifying"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class RollbackTarget:
    """The model to roll back to.

    Attributes:
        model_name: Registered model name.
        version: Version number to restore.
        artifact_uri: URI to the serialized model.
        serving_endpoint: URL of the warm standby endpoint.
        last_health_check: When the standby was last verified healthy.
        is_warm: Whether the model is loaded and ready to serve.
    """
    model_name: str
    version: int
    artifact_uri: str
    serving_endpoint: str
    last_health_check: Optional[datetime] = None
    is_warm: bool = True


@dataclass
class RollbackProcedure:
    """Automated rollback procedure.

    Executes: detect → health check → switch traffic → verify → notify.

    Attributes:
        current_champion: The current (potentially degraded) model.
        rollback_target: The previous champion to restore.
        reason: Why the rollback was triggered.
        status: Current status of the rollback.
        started_at: When the rollback was initiated.
        completed_at: When the rollback completed.
        steps: Log of rollback steps.
    """
    current_champion: Dict[str, object]
    rollback_target: RollbackTarget
    reason: RollbackReason
    status: RollbackStatus = RollbackStatus.INITIATED
    started_at: datetime = field(default_factory=datetime.utcnow)
    completed_at: Optional[datetime] = None
    steps: List[Dict[str, str]] = field(default_factory=list)

    def execute(self) -> Dict[str, object]:
        """Execute the full rollback procedure.

        Returns:
            Dictionary with rollback outcome and timing.
        """
        # Step 1: Health check on rollback target
        self.status = RollbackStatus.HEALTH_CHECK
        self._log_step("health_check", "Verifying rollback target health")

        if not self.rollback_target.is_warm:
            self.status = RollbackStatus.FAILED
            self._log_step(
                "health_check_failed",
                "Rollback target is not warm — cannot proceed",
            )
            return self._result("failed", "Rollback target not available")

        # Step 2: Switch traffic
        self.status = RollbackStatus.TRAFFIC_SWITCHING
        self._log_step(
            "traffic_switch",
            f"Routing 100% traffic to v{self.rollback_target.version}",
        )

        # In production, this calls the load balancer API or updates
        # the feature flag service. The actual traffic switch is
        # typically < 1 second.

        # Step 3: Verify
        self.status = RollbackStatus.VERIFYING
        self._log_step(
            "verify",
            "Verifying rollback target is serving correctly",
        )

        # In production, this sends health check requests and verifies
        # that the rollback target returns valid predictions.

        # Step 4: Complete
        self.status = RollbackStatus.COMPLETED
        self.completed_at = datetime.utcnow()
        duration = self.completed_at - self.started_at

        self._log_step(
            "completed",
            f"Rollback completed in {duration.total_seconds():.1f}s",
        )

        return self._result("success", f"Rolled back in {duration}")

    def _log_step(self, step_name: str, description: str) -> None:
        """Log a rollback step."""
        self.steps.append({
            "step": step_name,
            "description": description,
            "timestamp": datetime.utcnow().isoformat(),
            "status": self.status.value,
        })

    def _result(
        self, outcome: str, message: str
    ) -> Dict[str, object]:
        """Construct the rollback result."""
        return {
            "outcome": outcome,
            "message": message,
            "reason": self.reason.value,
            "rolled_back_from": self.current_champion,
            "rolled_back_to": {
                "model_name": self.rollback_target.model_name,
                "version": self.rollback_target.version,
            },
            "started_at": self.started_at.isoformat(),
            "completed_at": (
                self.completed_at.isoformat()
                if self.completed_at
                else None
            ),
            "steps": self.steps,
        }

Rollback Testing

A rollback procedure that has never been tested is not a rollback procedure — it is a hope. StreamRec tests rollback in three ways:

  1. Monthly rollback drill. Once per month, the on-call engineer executes a planned rollback during a low-traffic window. The drill verifies that the warm standby is healthy, that the traffic switch completes in under 5 seconds, and that the restored model serves predictions correctly. The drill is logged and reviewed.

  2. Chaos engineering. The team uses a chaos engineering framework to inject failures: kill a model server pod, inject latency into the feature store, corrupt a model artifact. The rollback automation must detect the failure and restore service without human intervention.

  3. Staging environment testing. Every deployment to production is first tested in a staging environment that mirrors production. The staging deployment includes a forced rollback after 10 minutes: deploy the new model, verify it serves correctly, roll back to the previous model, verify the rollback completes. Only after the staging rollback succeeds does the production deployment proceed.
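The monthly drill can be automated with a small harness that times the traffic switch and checks the restored model — a sketch under the assumption that the real routing and health-check calls are wrapped in injectable callables (both stubs here):

```python
import time
from typing import Callable, Dict


def run_rollback_drill(
    switch_traffic: Callable[[], None],
    health_check: Callable[[], bool],
    max_switch_seconds: float = 5.0,
) -> Dict[str, object]:
    """Execute a planned rollback drill and record the outcome.

    switch_traffic and health_check are injected so the same drill
    can run against staging or production routing.
    """
    start = time.monotonic()
    switch_traffic()                      # e.g., flip load balancer weights
    switch_duration = time.monotonic() - start

    healthy = health_check()              # restored model serves correctly
    passed = healthy and switch_duration <= max_switch_seconds
    return {
        "switch_seconds": switch_duration,
        "standby_healthy": healthy,
        "drill_passed": passed,
    }


# Drill against stubbed infrastructure:
result = run_rollback_drill(
    switch_traffic=lambda: None,
    health_check=lambda: True,
)
print(result["drill_passed"])  # True
```

Logging the drill result rather than merely asserting it is deliberate: the monthly review looks at the trend in `switch_seconds`, not just pass/fail.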


29.11 Model Serving Infrastructure

The deployment pipeline described in this chapter assumes a model serving infrastructure that supports traffic splitting, health checks, and rapid rollback. Three common serving frameworks:

Comparison of Serving Frameworks

| Feature | TorchServe | Seldon Core | BentoML |
|---|---|---|---|
| Framework | PyTorch native | Framework-agnostic (K8s) | Framework-agnostic |
| Deployment | Standalone or K8s | Kubernetes-native | Docker, K8s, cloud |
| Traffic splitting | Via load balancer | Native (Istio) | Via load balancer |
| A/B testing | Manual | Native | Via router |
| Canary | Via Istio/Argo | Native | Via cloud providers |
| Auto-scaling | K8s HPA | K8s HPA + custom | K8s HPA, Yatai |
| Model format | .mar (TorchScript, eager) | ONNX, TF, PyTorch, custom | Any Python callable |
| Batching | Built-in (dynamic batching) | Via inference server | Built-in (adaptive) |
| Multi-model | Yes | Yes (multi-armed bandit) | Yes |
| Best for | PyTorch-only shops | K8s-native ML platform teams | Rapid prototyping → production |

For StreamRec, the architecture uses BentoML for model packaging (creating a self-contained "Bento" that includes the model, preprocessing code, and API definition) deployed on Kubernetes. Traffic splitting for canary and progressive rollout is handled by Istio service mesh, which provides fine-grained traffic routing at the network level:

Istio VirtualService Configuration:
  route:
    - destination: streamrec-champion (v18)
      weight: 90
    - destination: streamrec-canary (v19)
      weight: 10

Updating the traffic weight requires a single Kubernetes API call to modify the Istio VirtualService resource. The change propagates to all proxies within seconds, making both canary promotion and rollback near-instantaneous.
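The weight update amounts to constructing a small spec patch and applying it to the VirtualService. A sketch of building that patch body in Python — the host names are illustrative, and in production the dict would be applied via the Kubernetes API or `kubectl patch`:

```python
from typing import Dict, List


def virtualservice_weight_patch(
    champion_host: str,
    canary_host: str,
    canary_weight: int,
) -> Dict[str, object]:
    """Build the spec patch that sets champion/canary traffic weights.

    Produces the merge-patch body for an Istio VirtualService:
    spec.http[].route[] entries with destination host and weight.
    """
    if not 0 <= canary_weight <= 100:
        raise ValueError("canary_weight must be a percentage in [0, 100]")
    routes: List[Dict[str, object]] = [
        {"destination": {"host": champion_host},
         "weight": 100 - canary_weight},
        {"destination": {"host": canary_host},
         "weight": canary_weight},
    ]
    return {"spec": {"http": [{"route": routes}]}}


patch = virtualservice_weight_patch(
    "streamrec-champion", "streamrec-canary", 10
)
print(patch["spec"]["http"][0]["route"][0]["weight"])  # 90
```

Because promotion, progressive rollout, and rollback all reduce to calling this with a different `canary_weight`, the entire traffic-control surface is one idempotent patch operation.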

Containerized Model Serving

Every model is packaged as a Docker container with pinned dependencies:

Dockerfile.serving
├── Base image: python:3.11-slim
├── System dependencies: libgomp (for LightGBM), libgl1 (for OpenCV)
├── Python dependencies: requirements-serving.txt (pinned versions)
├── Model artifact: /app/model/ (copied from registry)
├── Serving code: /app/src/ (feature preprocessing, prediction logic)
├── Config: /app/config/serving.yaml (batch size, timeout, health check)
└── Entry point: bentoml serve streamrec_service:svc --port 3000

The container is the immutable artifact of deployment. The same container that passes integration tests in CI is the container that runs in staging and production. No environment-specific modifications, no runtime downloads, no "works on my machine" failures.
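The "same container everywhere" guarantee can be enforced mechanically in CI by comparing image digests across environments. A sketch, assuming deployment manifests reference images by digest (`name@sha256:...`); the function names are illustrative:

```python
# Sketch: a CI guard that fails the pipeline if any environment would run
# a different container image than the one that passed integration tests.
def image_digest(image_ref: str) -> str:
    """Extract the sha256 digest from a digest-pinned image reference."""
    if "@sha256:" not in image_ref:
        raise ValueError(f"Image not pinned by digest: {image_ref}")
    return image_ref.split("@", 1)[1]


def assert_same_artifact(ci_image: str, staging_image: str,
                         prod_image: str) -> None:
    """Raise if CI, staging, and production reference different images."""
    digests = {image_digest(ref) for ref in (ci_image, staging_image, prod_image)}
    if len(digests) != 1:
        raise RuntimeError(f"Environments reference different images: {digests}")
```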


29.12 Progressive Project: M13 — StreamRec Deployment Pipeline

Milestone M13 builds the complete deployment pipeline for StreamRec, integrating the CI/CD, shadow mode, canary deployment, progressive rollout, retraining triggers, and rollback infrastructure developed throughout this chapter.

M13 Requirements

  1. Weekly retraining (Sunday 2am UTC) with the Dagster pipeline from Chapter 27
  2. Shadow mode evaluation (7 days, 100,000 minimum predictions)
  3. Canary deployment (10% traffic, 3 days, statistical evaluation)
  4. Progressive rollout (25% → 50% → 100%, 1 day per stage)
  5. Automatic rollback on guardrail violation (error rate > 0.5%, CTR decline > 2%)
  6. Drift-based retraining trigger (PSI > 0.25 on key features)
  7. Performance-based retraining trigger (Recall@20 < 0.15 or >10% decline)
  8. Full artifact lineage (code commit, data version, hyperparameters, evaluation metrics)
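Requirement 6's drift trigger relies on the Population Stability Index (PSI). A self-contained sketch of the computation, with bin edges assumed to come from the training (baseline) distribution; a small epsilon guards against empty bins:

```python
# Sketch: PSI between a baseline sample and a live sample over fixed bin
# edges. The binning scheme (fixed edges passed in) is an assumption.
from math import log
from typing import List, Sequence


def psi(expected: Sequence[float], actual: Sequence[float],
        edges: Sequence[float], eps: float = 1e-6) -> float:
    """Population Stability Index; > 0.25 triggers retraining for StreamRec."""
    def shares(values: Sequence[float]) -> List[float]:
        counts = [0] * (len(edges) + 1)
        for v in values:
            i = sum(v >= e for e in edges)  # index of the bin v falls into
            counts[i] += 1
        return [max(c / len(values), eps) for c in counts]

    e, a = shares(expected), shares(actual)
    return sum((ai - ei) * log(ai / ei) for ei, ai in zip(e, a))
```

An unchanged distribution yields a PSI near zero; a large shift (for example, a feature whose values all move up by half their range) pushes PSI well past the 0.25 threshold.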

M13 Implementation

"""
m13_streamrec_deployment.py — Complete deployment pipeline for StreamRec.

Progressive Project Milestone M13: integrates CI/CD, shadow mode,
canary deployment, progressive rollout, retraining triggers, and
rollback into a single orchestrated deployment pipeline.

This module ties together the components developed throughout Chapter 29:
- ModelRegistry and ModelArtifact (Section 29.3)
- ArtifactLineage (Section 29.4)
- ShadowEvaluator (Section 29.5)
- CanaryConfig and evaluate_canary (Section 29.6)
- ProgressiveRolloutController (Section 29.7)
- RetrainingTriggerManager (Section 29.8)
- RollbackProcedure (Section 29.10)
"""

from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
from enum import Enum


class PipelinePhase(Enum):
    """Phases of the StreamRec deployment pipeline."""
    TRAINING = "training"
    VALIDATION = "validation"
    SHADOW = "shadow"
    CANARY = "canary"
    PROGRESSIVE_ROLLOUT = "progressive_rollout"
    PRODUCTION = "production"
    ROLLED_BACK = "rolled_back"


@dataclass
class DeploymentPipelineConfig:
    """Configuration for the StreamRec deployment pipeline.

    Consolidates all deployment parameters into a single configuration
    object that can be version-controlled and audited.

    Attributes:
        model_name: Name of the model (e.g., 'streamrec-retrieval').
        retraining_schedule: Cron expression for scheduled retraining.
        shadow_duration_hours: Duration of shadow evaluation.
        shadow_min_predictions: Minimum predictions for shadow evaluation.
        canary_percentage: Fraction of traffic for canary.
        canary_duration_hours: Maximum canary duration.
        canary_min_impressions: Minimum impressions for canary evaluation.
        rollout_stages: Traffic percentages for progressive rollout.
        rollout_stage_duration_hours: Minimum hours per rollout stage.
        max_latency_p99_ms: Maximum acceptable p99 latency.
        max_error_rate: Maximum acceptable serving error rate.
        min_ctr_lift: Minimum CTR lift for canary promotion.
        drift_psi_threshold: PSI threshold for drift-triggered retraining.
        performance_floor: Absolute metric floors for performance trigger.
        performance_max_decline: Maximum relative decline for performance trigger.
    """
    model_name: str = "streamrec-retrieval"
    retraining_schedule: str = "0 2 * * 0"  # Sunday 2am UTC

    # Shadow mode
    shadow_duration_hours: int = 168  # 7 days
    shadow_min_predictions: int = 100_000
    shadow_max_latency_p99_ms: float = 50.0

    # Canary
    canary_percentage: float = 0.10
    canary_duration_hours: int = 72  # 3 days
    canary_min_impressions: int = 500_000

    # Progressive rollout
    rollout_stages: List[float] = field(
        default_factory=lambda: [0.25, 0.50, 1.00]
    )
    rollout_stage_duration_hours: int = 24

    # Guardrails
    max_latency_p99_ms: float = 50.0
    max_error_rate: float = 0.005
    min_ctr_lift: float = -0.02

    # Retraining triggers
    drift_psi_threshold: float = 0.25
    performance_floor: Dict[str, float] = field(
        default_factory=lambda: {
            "recall_at_20": 0.15,
            "ndcg_at_20": 0.12,
            "ctr": 0.03,
        }
    )
    performance_max_decline: Dict[str, float] = field(
        default_factory=lambda: {
            "recall_at_20": 0.10,
            "ndcg_at_20": 0.10,
            "ctr": 0.15,
        }
    )


@dataclass
class DeploymentPipelineState:
    """Tracks the current state of a deployment pipeline run.

    Attributes:
        run_id: Unique identifier for this pipeline run.
        model_name: Name of the model being deployed.
        challenger_version: Version of the model being deployed.
        champion_version: Current production model version.
        phase: Current pipeline phase.
        started_at: When the pipeline run started.
        phase_started_at: When the current phase started.
        trigger_event: What triggered this pipeline run.
        phase_history: Log of phase transitions.
        metrics: Collected metrics at each phase.
    """
    run_id: str
    model_name: str
    challenger_version: int
    champion_version: int
    phase: PipelinePhase = PipelinePhase.TRAINING
    started_at: datetime = field(default_factory=datetime.utcnow)
    phase_started_at: datetime = field(default_factory=datetime.utcnow)
    trigger_event: Optional[Dict[str, str]] = None
    phase_history: List[Dict[str, str]] = field(default_factory=list)
    metrics: Dict[str, Dict[str, float]] = field(default_factory=dict)

    def transition_to(
        self, new_phase: PipelinePhase, reason: str
    ) -> None:
        """Record a phase transition.

        Args:
            new_phase: The phase to transition to.
            reason: Why the transition occurred.
        """
        self.phase_history.append({
            "from_phase": self.phase.value,
            "to_phase": new_phase.value,
            "reason": reason,
            "timestamp": datetime.utcnow().isoformat(),
        })
        self.phase = new_phase
        self.phase_started_at = datetime.utcnow()

    def record_metrics(
        self, phase: str, metrics: Dict[str, float]
    ) -> None:
        """Record metrics for a pipeline phase.

        Args:
            phase: Phase name.
            metrics: Dictionary of metric values.
        """
        self.metrics[phase] = metrics


@dataclass
class StreamRecDeploymentPipeline:
    """The complete StreamRec deployment pipeline.

    Orchestrates the full lifecycle:
    trigger → train → validate → shadow → canary → rollout → production

    This class is the top-level coordinator that references the
    components from earlier sections. In production, each phase
    would be implemented as a Dagster asset or job; here, the
    logic is consolidated for clarity.

    Attributes:
        config: Pipeline configuration.
        state: Current pipeline state.
    """
    config: DeploymentPipelineConfig
    state: Optional[DeploymentPipelineState] = None

    def initiate_deployment(
        self,
        run_id: str,
        challenger_version: int,
        champion_version: int,
        trigger_type: str = "scheduled",
        trigger_reason: str = "",
    ) -> DeploymentPipelineState:
        """Start a new deployment pipeline run.

        Args:
            run_id: Unique run identifier.
            challenger_version: Model version to deploy.
            champion_version: Current production version.
            trigger_type: What initiated this deployment.
            trigger_reason: Detailed reason for the trigger.

        Returns:
            Initial pipeline state.
        """
        self.state = DeploymentPipelineState(
            run_id=run_id,
            model_name=self.config.model_name,
            challenger_version=challenger_version,
            champion_version=champion_version,
            trigger_event={
                "type": trigger_type,
                "reason": trigger_reason,
            },
        )
        return self.state

    def evaluate_phase(
        self, current_metrics: Dict[str, float]
    ) -> Dict[str, Any]:
        """Evaluate whether the current phase should advance, wait, or rollback.

        Applies the appropriate evaluation logic based on the current phase.

        Args:
            current_metrics: Current metrics for the active phase.

        Returns:
            Dictionary with action ('advance', 'wait', 'rollback') and details.
        """
        if self.state is None:
            return {"action": "error", "reason": "No active deployment"}

        phase = self.state.phase

        if phase == PipelinePhase.SHADOW:
            return self._evaluate_shadow(current_metrics)
        elif phase == PipelinePhase.CANARY:
            return self._evaluate_canary(current_metrics)
        elif phase == PipelinePhase.PROGRESSIVE_ROLLOUT:
            return self._evaluate_rollout(current_metrics)
        else:
            return {
                "action": "wait",
                "reason": f"Phase {phase.value} not evaluable",
            }

    def _evaluate_shadow(
        self, metrics: Dict[str, float]
    ) -> Dict[str, Any]:
        """Evaluate shadow mode metrics."""
        assert self.state is not None

        elapsed = datetime.utcnow() - self.state.phase_started_at
        min_duration = timedelta(hours=self.config.shadow_duration_hours)

        if elapsed < min_duration:
            return {
                "action": "wait",
                "reason": f"Shadow duration: {elapsed} < {min_duration}",
            }

        n_predictions = int(metrics.get("n_predictions", 0))
        if n_predictions < self.config.shadow_min_predictions:
            return {
                "action": "wait",
                "reason": (
                    f"Predictions: {n_predictions:,} < "
                    f"{self.config.shadow_min_predictions:,}"
                ),
            }

        p99 = metrics.get("challenger_p99_ms", 0.0)
        if p99 > self.config.shadow_max_latency_p99_ms:
            return {
                "action": "rollback",
                "reason": (
                    f"Shadow p99 latency {p99:.1f}ms > "
                    f"{self.config.shadow_max_latency_p99_ms}ms"
                ),
            }

        self.state.record_metrics("shadow", metrics)
        return {"action": "advance", "next_phase": "canary"}

    def _evaluate_canary(
        self, metrics: Dict[str, float]
    ) -> Dict[str, Any]:
        """Evaluate canary deployment metrics."""
        assert self.state is not None

        elapsed = datetime.utcnow() - self.state.phase_started_at
        min_duration = timedelta(hours=self.config.canary_duration_hours)

        # Immediate rollback on guardrail violation
        error_rate = metrics.get("canary_error_rate", 0.0)
        if error_rate > self.config.max_error_rate:
            return {
                "action": "rollback",
                "reason": (
                    f"Error rate {error_rate:.4f} > "
                    f"{self.config.max_error_rate}"
                ),
            }

        ctr_lift = metrics.get("ctr_lift", 0.0)
        is_significant = metrics.get("is_significant", False)

        if is_significant and ctr_lift < self.config.min_ctr_lift:
            return {
                "action": "rollback",
                "reason": (
                    f"Significant CTR regression: {ctr_lift:+.3%}"
                ),
            }

        if elapsed < min_duration:
            return {
                "action": "wait",
                "reason": f"Canary duration: {elapsed} < {min_duration}",
            }

        impressions = int(metrics.get("canary_impressions", 0))
        if impressions < self.config.canary_min_impressions:
            return {
                "action": "wait",
                "reason": (
                    f"Impressions: {impressions:,} < "
                    f"{self.config.canary_min_impressions:,}"
                ),
            }

        # Canary passed — advance to progressive rollout
        self.state.record_metrics("canary", metrics)
        return {"action": "advance", "next_phase": "progressive_rollout"}

    def _evaluate_rollout(
        self, metrics: Dict[str, float]
    ) -> Dict[str, Any]:
        """Evaluate progressive rollout metrics."""
        assert self.state is not None

        # Check guardrails
        error_rate = metrics.get("error_rate", 0.0)
        if error_rate > self.config.max_error_rate:
            return {
                "action": "rollback",
                "reason": f"Error rate {error_rate:.4f} exceeded",
            }

        elapsed = datetime.utcnow() - self.state.phase_started_at
        min_duration = timedelta(
            hours=self.config.rollout_stage_duration_hours
        )

        if elapsed < min_duration:
            return {
                "action": "wait",
                "reason": (
                    f"Rollout stage duration: {elapsed} < {min_duration}"
                ),
            }

        self.state.record_metrics("rollout", metrics)
        return {"action": "advance", "next_phase": "next_stage"}

    def generate_pipeline_report(self) -> Dict[str, Any]:
        """Generate a summary report of the deployment pipeline run.

        Returns:
            Dictionary with complete pipeline run summary.
        """
        if self.state is None:
            return {"error": "No active deployment"}

        total_duration = datetime.utcnow() - self.state.started_at

        return {
            "run_id": self.state.run_id,
            "model_name": self.state.model_name,
            "challenger_version": self.state.challenger_version,
            "champion_version": self.state.champion_version,
            "current_phase": self.state.phase.value,
            "total_duration": str(total_duration),
            "trigger": self.state.trigger_event,
            "phase_history": self.state.phase_history,
            "metrics": self.state.metrics,
        }


# ── StreamRec M13 Configuration ─────────────────────────────────────────

streamrec_config = DeploymentPipelineConfig(
    model_name="streamrec-retrieval",
    retraining_schedule="0 2 * * 0",
    shadow_duration_hours=168,
    shadow_min_predictions=100_000,
    shadow_max_latency_p99_ms=50.0,
    canary_percentage=0.10,
    canary_duration_hours=72,
    canary_min_impressions=500_000,
    rollout_stages=[0.25, 0.50, 1.00],
    rollout_stage_duration_hours=24,
    max_latency_p99_ms=50.0,
    max_error_rate=0.005,
    min_ctr_lift=-0.02,
    drift_psi_threshold=0.25,
    performance_floor={
        "recall_at_20": 0.15,
        "ndcg_at_20": 0.12,
        "ctr": 0.03,
    },
    performance_max_decline={
        "recall_at_20": 0.10,
        "ndcg_at_20": 0.10,
        "ctr": 0.15,
    },
)

pipeline = StreamRecDeploymentPipeline(config=streamrec_config)
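The canary evaluation above consumes `ctr_lift` and `is_significant` fields produced by the metrics layer. One plausible way to compute them is a two-sided two-proportion z-test on click counts; the function name and inputs below are assumptions for illustration, not components from earlier sections:

```python
# Sketch: computing the ctr_lift / is_significant fields consumed by
# _evaluate_canary with a two-sided two-proportion z-test (stdlib only).
from math import sqrt
from statistics import NormalDist


def canary_ctr_metrics(champion_clicks: int, champion_impressions: int,
                       canary_clicks: int, canary_impressions: int,
                       alpha: float = 0.05) -> dict:
    p_champ = champion_clicks / champion_impressions
    p_canary = canary_clicks / canary_impressions
    # Pooled proportion under the null hypothesis of equal CTRs.
    p_pool = (champion_clicks + canary_clicks) / (
        champion_impressions + canary_impressions
    )
    se = sqrt(p_pool * (1 - p_pool)
              * (1 / champion_impressions + 1 / canary_impressions))
    z = (p_canary - p_champ) / se
    p_value = 2 * (1 - NormalDist().cdf(abs(z)))  # two-sided
    return {
        "ctr_lift": (p_canary - p_champ) / p_champ,  # relative lift
        "p_value": p_value,
        "is_significant": p_value < alpha,
    }
```

A CTR of 3.1% on 100,000 canary impressions against 3.0% on 1,000,000 champion impressions produces a positive lift that is not yet significant at alpha = 0.05, which is exactly why `_evaluate_canary` keeps waiting until `canary_min_impressions` is reached.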

M13 Timeline

For a typical weekly deployment:

| Day | Phase | Traffic | Key metrics |
|---|---|---|---|
| Sunday 02:00 (day 1) | Retraining triggered | 0% | Pipeline starts |
| Sunday 05:30 (day 1) | Training completes, validation passes | 0% | Recall@20 = 0.218, NDCG@20 = 0.189 |
| Sunday 06:00 (day 1) | Shadow mode begins | 0% (parallel) | Latency, rank correlation |
| Sunday 06:00 (day 8) | Shadow evaluation passes | 0% | p99 = 32 ms, rank_corr = 0.94 |
| Sunday 07:00 (day 8) | Canary begins | 10% | CTR, completion rate |
| Wednesday 07:00 (day 11) | Canary evaluation passes | 10% | CTR +0.8% (p = 0.03) |
| Wednesday 08:00 (day 11) | Progressive rollout stage 1 | 25% | CTR, engagement |
| Thursday 08:00 (day 12) | Progressive rollout stage 2 | 50% | CTR, engagement |
| Friday 08:00 (day 13) | Full rollout | 100% | Promoted to champion |

Total deployment time: approximately 12 days from retraining trigger to full production rollout. This is deliberately conservative — the 7-day shadow period and 3-day canary period prioritize safety over speed. For urgent deployments (e.g., fixing a critical model bug), the pipeline supports a "fast track" mode that reduces shadow to 24 hours and canary to 24 hours, with explicit human approval.


29.13 Deployment in Regulated Environments

The deployment pipeline described for StreamRec operates in a low-regulation environment: recommendations are not life-critical, and a bad model degrades engagement metrics but does not violate regulations. Credit scoring, healthcare, insurance, and financial trading operate under regulatory frameworks that impose additional deployment constraints.

Model Risk Management (SR 11-7)

The Federal Reserve's SR 11-7 guidance requires financial institutions to maintain a model risk management (MRM) framework that covers model development, validation, and ongoing monitoring. For deployment, this means:

  1. Independent validation. The model validation gate (Chapter 28) must be executed by a team independent of the model development team. The data scientist who trained the model cannot approve its deployment.
  2. Documentation. Every deployment must produce auditable documentation: the model's purpose, methodology, assumptions, limitations, validation results, and approval signatures.
  3. Change management. Model deployments are classified by materiality: material changes (new architecture, new feature set, new training data source) require full re-validation; non-material changes (retraining on fresh data with the same architecture) require abbreviated validation.
  4. Approval workflow. Before a model reaches production, it must be approved by the model owner, the MRM team, and (for material changes) the model risk committee. This approval is not automated — it is a human judgment.
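The materiality rule in item 3 reduces to a small decision table. A sketch with illustrative change-type names (the real policy would enumerate these in the MRM documentation):

```python
# Sketch: classifying a proposed model change by materiality under an
# SR 11-7-style change-management policy. Change-type names are illustrative.
MATERIAL_CHANGES = {
    "new_architecture",
    "new_feature_set",
    "new_training_data_source",
}


def required_validation(change_types: set) -> str:
    """Return the validation tier a proposed change requires."""
    if change_types & MATERIAL_CHANGES:
        return "full_revalidation"   # independent review + committee approval
    return "abbreviated_validation"  # e.g., retraining on fresh data only
```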

The deployment pipeline for Meridian Financial's credit scoring model adds these regulatory stages:

Standard Pipeline:
  train → validate → shadow → canary → rollout

Regulated Pipeline (Meridian Financial):
  train → validate → [MRM REVIEW] → [MRM APPROVAL] → shadow → canary → [BUSINESS APPROVAL] → rollout

The MRM review stage typically adds 2-5 business days to the deployment timeline, and the business approval stage another 1-2 business days. A deployment that takes 12 days at StreamRec takes roughly 15-19 days at Meridian Financial.

Feature Flags for Regulatory Compliance

Feature flags (also called feature toggles) provide fine-grained control over model deployment without infrastructure changes. A feature flag is a runtime configuration that determines which model serves which traffic:

| Flag | Type | Use Case |
|---|---|---|
| model_version | String | Select model version for serving |
| canary_percentage | Float | Control canary traffic split |
| enable_new_features | Boolean | Gate new feature engineering code |
| fallback_model | String | Model to use if primary fails |
| kill_switch | Boolean | Instantly disable a model and serve fallback |

The kill switch is the most critical feature flag. It provides a single-click mechanism to disable any model and route all traffic to a known-safe fallback. For Meridian Financial, the kill switch routes all scoring to the previous approved model — ensuring that credit decisions are never interrupted.
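The routing logic behind these flags fits in a few lines. In this sketch the flag store is a plain dict (a runtime configuration service in production), and the model names and the `canary_model_version` flag are illustrative additions:

```python
# Sketch: flag-driven model routing with a kill switch. The kill switch
# is checked first so it overrides every other flag.
import random


def select_model(flags: dict, rng: random.Random) -> str:
    """Pick the model version to serve for a single request."""
    if flags.get("kill_switch"):
        return str(flags["fallback_model"])  # bypass everything else
    if rng.random() < float(flags.get("canary_percentage", 0.0)):
        return str(flags["canary_model_version"])
    return str(flags["model_version"])


flags = {
    "model_version": "streamrec-v18",
    "canary_model_version": "streamrec-v19",
    "canary_percentage": 0.10,
    "fallback_model": "streamrec-v17",
    "kill_switch": False,
}
```

Flipping `kill_switch` to `True` routes every request to the fallback on the next read of the flag store, with no redeployment and no infrastructure change.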


29.14 Summary

Continuous training and deployment extends the ML pipeline from "model trained and validated" to "model serving production traffic safely and staying fresh over time." The key ideas:

  1. Three artifacts, not one. ML deployments manage code, data, and model artifacts with different versioning, testing, and promotion strategies. The CI/CD pipeline must handle all three.

  2. Staged deployment. Shadow mode (zero risk), canary (limited risk), and progressive rollout (controlled risk) provide increasing confidence that the new model is safe before full traffic exposure.

  3. Continuous training. Scheduled, drift-based, and performance-based triggers keep models fresh. The trigger → train → validate → deploy pipeline runs automatically, with human gates for anomalous situations.

  4. Rollback as infrastructure. Rollback is not an emergency procedure — it is a designed, tested, and automated capability. The previous champion remains warm and ready to serve at all times.

  5. MLOps maturity. The path from manual notebooks (Level 0) to automated ML systems (Level 3) is a progression of engineering investment. Level 2 — automated CI/CD with staged rollout — is the target for production models.

Production ML = Software Engineering: The deployment pipeline in this chapter is fundamentally a software engineering artifact. It uses the same patterns — CI/CD, immutable artifacts, staged rollouts, feature flags, chaos engineering, rollback testing — that mature software organizations have refined over decades. The ML-specific additions (three-artifact versioning, shadow mode, retraining triggers, model validation gates) build on top of this software engineering foundation, not in place of it. The organizations that deploy ML systems reliably are the ones that recognize this.

Chapter 30 builds the monitoring and observability infrastructure that closes the loop: detecting data drift, model degradation, and system failures in production, and feeding that information back to the retraining triggers and rollback procedures developed in this chapter.