In This Chapter
- Learning Objectives
- 28.1 Code Without Tests Is Legacy Code — Doubly True for ML
- 28.2 Data Validation with Great Expectations
- 28.3 Schema Validation with Pandera
- 28.4 Statistical Data Validation
- 28.5 Data Contracts
- 28.6 Contract Testing Between Pipeline Stages
- 28.7 Behavioral Testing for ML Models: The CheckList Framework
- 28.8 Building a Behavioral Test Suite for StreamRec
- 28.9 Behavioral Testing for Credit Scoring
- 28.10 Model Validation Gates
- 28.11 Shadow Evaluation and Staged Validation
- 28.12 Comprehensive ML Testing Strategy
- 28.13 Progressive Project M12: StreamRec Testing Infrastructure
- 28.14 Summary
- References
Chapter 28: ML Testing and Validation Infrastructure — Data Contracts, Behavioral Testing, and Great Expectations
"Code without tests is legacy code. I don't care how beautiful or well-designed it is. Without tests, we don't know if it works. With tests, we can change the system's behavior quickly and verifiably." — Michael Feathers, Working Effectively with Legacy Code (2004)
Learning Objectives
By the end of this chapter, you will be able to:
- Implement data validation pipelines using Great Expectations and Pandera
- Design behavioral tests for ML models (invariance, directional, minimum functionality)
- Build data contracts between pipeline stages to catch silent data quality issues
- Implement model validation gates that prevent bad models from reaching production
- Design a comprehensive ML testing strategy covering data, features, models, and infrastructure
28.1 Code Without Tests Is Legacy Code — Doubly True for ML
Michael Feathers' definition of legacy code — code without tests — applies with amplified force to machine learning systems. In traditional software, a function either produces the correct output or it does not. In ML, a model can produce outputs that are syntactically correct, numerically plausible, and completely wrong. A recommendation model that returns item IDs from the catalog is "working" by every software metric, even if it is recommending children's content to adults or stale content from three years ago. A credit scoring model that returns a probability between 0 and 1 is "working," even if a data pipeline change caused the income feature to be reported in cents rather than dollars, silently shifting every score.
Traditional software has a testing playbook refined over decades: unit tests verify individual functions, integration tests verify component interactions, end-to-end tests verify user-visible behavior. This playbook is necessary for ML systems, but it is not sufficient. ML systems introduce three additional testing dimensions that have no analogue in traditional software:
Data testing. The "input" to an ML system is not a function argument — it is a dataset that may contain billions of rows, change daily, and degrade silently. A column that was 99.8% non-null yesterday may be 40% null today because an upstream service changed its logging format. A feature that was standardized to mean zero during training may arrive at serving time with mean 147 because the normalization step was skipped. These failures produce no exceptions. The pipeline runs. The model scores. The predictions are garbage.
Model behavioral testing. Traditional tests check exact outputs: assert add(2, 3) == 5. ML models are stochastic, approximate, and context-dependent. We cannot assert that the model produces a specific recommendation for a specific user. But we can assert behavioral properties: changing a user's name should not change their recommendations (invariance). Increasing a user's engagement with science fiction should increase the model's science fiction recommendations (directionality). The model should outperform a popularity baseline on every user segment (minimum functionality). These behavioral tests, formalized by Ribeiro et al. (2020) in the CheckList framework, provide the testing vocabulary that ML has been missing.
Model validation gates. In traditional software, a new version either passes its test suite or it does not. In ML, a new model may pass all behavioral tests and still be worse than the current production model — because the training data was slightly different, the hyperparameters were slightly off, or the random seed was slightly unlucky. Model validation gates compare the challenger model against the champion on held-out data, on sliced performance across user segments, and on business metrics in shadow mode before permitting deployment.
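Of these three dimensions, behavioral testing is the most concrete to preview before its full treatment later in the chapter. The sketch below uses a toy stand-in model (the `recommend` function and its `genre_affinity` input are hypothetical, not part of the CheckList library) to show what an invariance assertion and a directional assertion look like in code:

```python
from typing import Dict, List

def recommend(user: Dict) -> List[str]:
    """Toy stand-in for a recommender: top-2 genres by affinity score."""
    affinity: Dict[str, float] = user["genre_affinity"]
    return sorted(affinity, key=affinity.get, reverse=True)[:2]

# Invariance: changing the user's name must not change recommendations.
alice = {"name": "Alice",
         "genre_affinity": {"scifi": 0.9, "drama": 0.4, "comedy": 0.1}}
renamed = {**alice, "name": "Bob"}
assert recommend(alice) == recommend(renamed)

# Directionality: boosting sci-fi engagement should surface sci-fi.
base = {"name": "Ann",
        "genre_affinity": {"scifi": 0.2, "drama": 0.4, "comedy": 0.5}}
boosted = {**base, "genre_affinity": {**base["genre_affinity"], "scifi": 0.9}}
assert "scifi" not in recommend(base)
assert "scifi" in recommend(boosted)
```

Neither assertion pins down an exact output; both pin down a behavioral property that must hold across inputs.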
Production ML = Software Engineering: This chapter operationalizes the theme of Part V: production ML is software engineering with additional dimensions. Every testing practice from software engineering applies — and then ML adds data validation, behavioral testing, and model validation gates on top. The organizations that ship reliable ML systems are the ones that invest in this infrastructure before the first model reaches production.
The chapter proceeds in four movements. Sections 28.2-28.4 cover data validation: ensuring that the data entering your pipeline matches the contract you expect, using Great Expectations and Pandera. Sections 28.5-28.6 cover data contracts: formalizing the agreements between pipeline stages so that silent data quality regressions become loud failures. Sections 28.7-28.9 cover behavioral testing: the CheckList framework with its invariance, directional, and minimum functionality tests. Sections 28.10-28.11 cover model validation gates: the automated checks that stand between a trained model and production traffic. Section 28.12 synthesizes these into a comprehensive ML testing strategy, which Section 28.13 applies to the StreamRec progressive project.
28.2 Data Validation with Great Expectations
Great Expectations (GE) is an open-source Python framework for data validation, documentation, and profiling. Its core abstraction is the expectation — a declarative assertion about a dataset that can be evaluated against any batch of data. An expectation suite is a collection of expectations that together define the "contract" a dataset must satisfy.
Core Concepts
| Concept | Definition |
|---|---|
| Expectation | A declarative assertion about data (e.g., "column user_id is never null") |
| Expectation Suite | A named collection of expectations applied to a specific data asset |
| Validator | The engine that evaluates expectations against a batch of data |
| Checkpoint | An orchestration unit that runs a validator against a data source and triggers actions on success/failure |
| Data Docs | Auto-generated HTML documentation of expectations and validation results |
| Data Source | A connection to a data backend (Pandas, Spark, SQL) |
| Data Asset | A specific table, file, or query within a data source |
The power of Great Expectations is that expectations are declarative and data-backend agnostic. The same expectation suite can validate a Pandas DataFrame during local development, a Spark DataFrame during batch processing, and a SQL table in production — with no code changes. Expectations are also self-documenting: every expectation generates a human-readable description that becomes part of the data documentation.
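The declarative idea is worth internalizing independently of GE's machinery. The following is a minimal sketch, in plain Python over lists of dicts, of what "an expectation is data, not code" means; the `evaluate` function and its dict format are illustrative, not GE's actual implementation:

```python
from typing import Any, Dict, List

Row = Dict[str, Any]

def evaluate(expectation: Dict[str, Any], batch: List[Row]) -> Dict[str, Any]:
    """Evaluate one declarative expectation against a batch of rows."""
    etype, kw = expectation["type"], expectation["kwargs"]
    if etype == "expect_column_values_to_not_be_null":
        failed = [r for r in batch if r.get(kw["column"]) is None]
    elif etype == "expect_column_values_to_be_in_set":
        failed = [r for r in batch
                  if r.get(kw["column"]) not in kw["value_set"]]
    else:
        raise ValueError(f"unknown expectation type: {etype}")
    return {"type": etype,
            "success": not failed,
            "unexpected_count": len(failed)}

# The suite is pure data: it can be stored, versioned, and rendered
# as documentation without touching the evaluation engine.
suite = [
    {"type": "expect_column_values_to_not_be_null",
     "kwargs": {"column": "user_id"}},
    {"type": "expect_column_values_to_be_in_set",
     "kwargs": {"column": "platform",
                "value_set": {"ios", "android", "web"}}},
]

batch = [
    {"user_id": "u1", "platform": "ios"},
    {"user_id": None, "platform": "desktop"},  # violates both expectations
]

results = [evaluate(e, batch) for e in suite]
```

Because the suite is plain data, swapping the evaluation backend (Pandas, Spark, SQL) changes only `evaluate`, never the suite itself.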
Building an Expectation Suite
Consider the StreamRec user event data — the click stream that feeds the recommendation pipeline. Every row represents a user interaction: a view, a click, a completion, or a skip. The schema is:
| Column | Type | Description |
|---|---|---|
| event_id | string (UUID) | Unique event identifier |
| user_id | string | User identifier |
| item_id | string | Content item identifier |
| event_type | string | One of: view, click, complete, skip |
| timestamp | datetime | Event timestamp (UTC) |
| session_id | string | Session identifier |
| platform | string | One of: ios, android, web |
| duration_seconds | float | Time spent on item (nullable for non-view events) |
We build a Great Expectations suite that codifies every assumption the downstream pipeline makes about this data:
import great_expectations as gx
from great_expectations.core import ExpectationSuite, ExpectationConfiguration
from typing import List, Dict, Any
def build_streamrec_event_suite() -> ExpectationSuite:
"""Build a Great Expectations suite for StreamRec user event data.
The suite encodes schema expectations, completeness requirements,
value range constraints, distributional properties, and freshness
checks for the click stream data that feeds the recommendation
pipeline.
Returns:
ExpectationSuite: Named suite 'streamrec_user_events'.
"""
suite = ExpectationSuite(name="streamrec_user_events")
# --- Schema expectations ---
required_columns = [
"event_id", "user_id", "item_id", "event_type",
"timestamp", "session_id", "platform", "duration_seconds",
]
suite.add_expectation(
ExpectationConfiguration(
type="expect_table_columns_to_match_set",
kwargs={"column_set": required_columns, "exact_match": True},
)
)
# --- Uniqueness ---
suite.add_expectation(
ExpectationConfiguration(
type="expect_column_values_to_be_unique",
kwargs={"column": "event_id"},
)
)
# --- Non-null constraints ---
for col in ["event_id", "user_id", "item_id", "event_type",
"timestamp", "session_id", "platform"]:
suite.add_expectation(
ExpectationConfiguration(
type="expect_column_values_to_not_be_null",
kwargs={"column": col},
)
)
# --- Categorical value sets ---
suite.add_expectation(
ExpectationConfiguration(
type="expect_column_values_to_be_in_set",
kwargs={
"column": "event_type",
"value_set": ["view", "click", "complete", "skip"],
},
)
)
suite.add_expectation(
ExpectationConfiguration(
type="expect_column_values_to_be_in_set",
kwargs={
"column": "platform",
"value_set": ["ios", "android", "web"],
},
)
)
# --- Numeric ranges ---
suite.add_expectation(
ExpectationConfiguration(
type="expect_column_values_to_be_between",
kwargs={
"column": "duration_seconds",
"min_value": 0.0,
"max_value": 14400.0, # 4 hours max
"mostly": 0.999,
},
)
)
# --- Volume expectations ---
suite.add_expectation(
ExpectationConfiguration(
type="expect_table_row_count_to_be_between",
kwargs={
"min_value": 500_000,
"max_value": 50_000_000,
},
)
)
# --- Statistical expectations ---
    # Ratio of distinct users to total events should stay in its
    # historical band (catches bot floods and logging duplication)
suite.add_expectation(
ExpectationConfiguration(
type="expect_column_proportion_of_unique_values_to_be_between",
kwargs={
"column": "user_id",
"min_value": 0.01,
"max_value": 0.80,
},
)
)
# --- Freshness: most recent timestamp should be within 2 hours ---
# (Custom expectation; illustrative of the pattern)
suite.add_expectation(
ExpectationConfiguration(
type="expect_column_max_to_be_between",
kwargs={
"column": "timestamp",
"min_value": "{{current_time_minus_2h}}",
"max_value": "{{current_time}}",
"parse_strings_as_datetimes": True,
},
)
)
return suite
Several patterns in this suite deserve attention:
The mostly parameter. The duration_seconds expectation uses mostly=0.999, meaning it tolerates up to 0.1% of values outside the range. This is critical. Real-world data is messy: a user leaves a tab open for 6 hours, a clock synchronization issue produces a negative duration, a mobile app reports a NaN. Setting mostly=1.0 on every expectation guarantees false alarms; setting it too low defeats the purpose. Calibrating mostly is an empirical exercise: run the suite against 30 days of historical data, observe the per-day pass rates, and set mostly just below the worst observed rate, loose enough that normal variance passes but tight enough that a genuine regression fails.
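One simple way to operationalize that calibration is to derive the threshold from the observed per-day pass rates. The helper below is a sketch; the function name, margin, and history values are illustrative, not part of Great Expectations:

```python
from typing import List

def calibrate_mostly(
    daily_pass_rates: List[float],
    safety_margin: float = 0.0005,
    floor: float = 0.95,
) -> float:
    """Suggest a `mostly` value from historical per-day pass rates.

    Picks a threshold just below the worst observed day, so every
    historical batch would have passed, clamped to a sanity floor.
    """
    worst = min(daily_pass_rates)
    return round(max(floor, min(worst - safety_margin, 1.0)), 4)

# Observed in-range rates for duration_seconds over recent days (made up):
history = [0.9991, 0.9987, 0.9995, 0.9990, 0.9989]
suggested = calibrate_mostly(history)  # just below the worst day, 0.9987
```

Re-running the calibration periodically keeps the threshold honest as the data's normal variance evolves.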
Volume expectations. The row count expectation (500_000 to 50_000_000) is a coarse but powerful check. If the daily batch normally contains 5-15 million events and today's batch contains 200,000, something is catastrophically wrong upstream — a logging service crashed, an ETL job failed silently, or a schema change filtered out 97% of events. This single expectation would have caught several real incidents at StreamRec during its prototype phase.
Freshness. The timestamp expectation verifies that the data is recent. A pipeline that processes "yesterday's data" but actually processes data from three days ago (because the extraction job silently failed and was backfilled with stale data) will train a model on outdated patterns. Freshness checks catch this.
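The freshness pattern reduces to comparing the newest timestamp in the batch against a maximum allowed lag. A minimal stdlib sketch (the `is_fresh` helper is hypothetical, shown here instead of GE's templated expectation):

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def is_fresh(
    newest_event: datetime,
    max_lag: timedelta = timedelta(hours=2),
    now: Optional[datetime] = None,
) -> bool:
    """True when the newest event timestamp is within the allowed lag."""
    now = now or datetime.now(timezone.utc)
    return (now - newest_event) <= max_lag

# Frozen "now" so the example is deterministic.
now = datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc)
fresh = is_fresh(datetime(2024, 6, 1, 11, 30, tzinfo=timezone.utc), now=now)
stale = is_fresh(datetime(2024, 5, 29, 12, 0, tzinfo=timezone.utc), now=now)
```

Passing `now` explicitly matters for testability: a freshness check that reads the wall clock internally cannot itself be tested deterministically.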
Running Validation with Checkpoints
An expectation suite defines what to check. A checkpoint defines when and how to check it:
import great_expectations as gx
from great_expectations.checkpoint import Checkpoint
from typing import Dict, Any
def create_streamrec_checkpoint(
context: gx.DataContext,
suite_name: str = "streamrec_user_events",
datasource_name: str = "streamrec_delta_lake",
data_asset_name: str = "daily_events",
) -> Checkpoint:
"""Create a GE checkpoint for daily StreamRec event validation.
The checkpoint validates the daily event batch against the
expectation suite and triggers Slack/PagerDuty alerts on failure.
Args:
context: Great Expectations DataContext.
suite_name: Name of the expectation suite to use.
datasource_name: Name of the configured data source.
data_asset_name: Name of the data asset to validate.
Returns:
Checkpoint: Configured checkpoint ready for execution.
"""
checkpoint = Checkpoint(
name="streamrec_daily_validation",
data_context=context,
validations=[
{
"batch_request": {
"datasource_name": datasource_name,
"data_asset_name": data_asset_name,
"options": {"partition_date": "{{current_date}}"},
},
"expectation_suite_name": suite_name,
},
],
action_list=[
# Store validation results for Data Docs
{
"name": "store_validation_result",
"action": {
"class_name": "StoreValidationResultAction",
},
},
# Update Data Docs site
{
"name": "update_data_docs",
"action": {
"class_name": "UpdateDataDocsAction",
},
},
# Send Slack alert on failure
{
"name": "send_slack_on_failure",
"action": {
"class_name": "SlackNotificationAction",
"slack_webhook": "${SLACK_WEBHOOK_URL}",
"notify_on": "failure",
"renderer": {
"module_name": "great_expectations.render.renderer."
"slack_renderer",
"class_name": "SlackRenderer",
},
},
},
],
)
return checkpoint
def run_validation(context: gx.DataContext) -> Dict[str, Any]:
"""Execute the StreamRec daily validation checkpoint.
Returns:
Dict containing validation success status and result details.
"""
checkpoint = context.get_checkpoint("streamrec_daily_validation")
result = checkpoint.run()
return {
"success": result.success,
"run_id": str(result.run_id),
"statistics": {
"evaluated_expectations": result.statistics[
"evaluated_expectations"
],
"successful_expectations": result.statistics[
"successful_expectations"
],
"unsuccessful_expectations": result.statistics[
"unsuccessful_expectations"
],
"success_percent": result.statistics["success_percent"],
},
}
The checkpoint orchestrates the full validation workflow: fetch the data, evaluate expectations, store results, update documentation, and alert on failure. In the StreamRec pipeline (Chapter 27), this checkpoint runs as the first step of the daily training DAG — before any feature computation or model training begins. If validation fails, the pipeline halts, and the on-call engineer receives a Slack message with the specific expectations that failed.
Data Docs: Living Documentation
Great Expectations' Data Docs feature auto-generates HTML documentation from expectation suites and validation results. Each expectation becomes a row in a human-readable table: what was checked, what was expected, what was observed, and whether it passed. Over time, Data Docs accumulate a history of validation results that serves as an audit trail.
For the StreamRec team, Data Docs became the single source of truth for data quality. When a product manager asked "has our click stream data quality changed in the last month?", the answer was a URL, not a SQL query. When an auditor at Meridian Financial (Case Study 2) asked "how do you ensure the training data meets regulatory requirements?", the answer was the same: a URL showing 100% pass rates on all 47 expectations for the last 90 days.
28.3 Schema Validation with Pandera
While Great Expectations excels at batch validation — checking a dataset after it has been produced — Pandera excels at inline validation — checking DataFrames as they flow through Python code. Pandera defines schemas as Python classes, integrates with type checkers, and raises exceptions at the point where invalid data enters the pipeline rather than at a downstream checkpoint.
Schema Definition
import pandera as pa
from pandera import Column, Check, Index
from pandera.typing import DataFrame, Series
import pandas as pd
from typing import Optional
class UserEventSchema(pa.DataFrameModel):
"""Pandera schema for StreamRec user event DataFrames.
Validates structure, types, value ranges, and cross-column
constraints at the point where data enters a processing function.
This schema enforces the same contract as the Great Expectations
suite (Section 28.2) but operates inline — raising a SchemaError
immediately when invalid data is encountered rather than producing
a validation report after the fact.
"""
event_id: Series[str] = pa.Field(unique=True, nullable=False)
user_id: Series[str] = pa.Field(nullable=False)
item_id: Series[str] = pa.Field(nullable=False)
event_type: Series[str] = pa.Field(
isin=["view", "click", "complete", "skip"],
nullable=False,
)
timestamp: Series[pd.Timestamp] = pa.Field(nullable=False)
session_id: Series[str] = pa.Field(nullable=False)
platform: Series[str] = pa.Field(
isin=["ios", "android", "web"],
nullable=False,
)
duration_seconds: Optional[Series[float]] = pa.Field(
ge=0.0,
le=14400.0,
nullable=True,
)
class Config:
"""Schema configuration."""
strict = True # Reject extra columns
coerce = False # Do not silently coerce types
    @pa.check("duration_seconds", name="durations_nonnegative_when_present")
    def durations_nonnegative_when_present(
        cls, series: Series[float]
    ) -> bool:
        """Column-level check: duration must be non-negative when present.
        Note: Pandera evaluates column-level checks against the column
        series alone, so the cross-column rule "view events must have a
        duration" lives in the dataframe-level check below.
        """
        return series.dropna().ge(0).all()
@pa.dataframe_check
def view_events_have_duration_df(cls, df: pd.DataFrame) -> bool:
"""Cross-column check: view events must have non-null duration."""
view_mask = df["event_type"] == "view"
return df.loc[view_mask, "duration_seconds"].notna().all()
@pa.dataframe_check
def timestamps_are_ordered_within_sessions(
cls, df: pd.DataFrame
) -> bool:
"""Timestamps should be non-decreasing within each session."""
for _, group in df.groupby("session_id"):
if not group["timestamp"].is_monotonic_increasing:
return False
return True
Using Pandera as a Function Decorator
Pandera's decorator syntax integrates validation into the function signature. The schema is checked automatically when the function is called:
import pandera as pa
from pandera.typing import DataFrame
import pandas as pd
from typing import Dict
@pa.check_types
def compute_session_features(
events: DataFrame[UserEventSchema],
) -> pd.DataFrame:
"""Compute session-level features from validated event data.
Args:
events: User event DataFrame, validated against UserEventSchema
on function entry.
Returns:
DataFrame with one row per session containing aggregated features.
Raises:
pa.errors.SchemaError: If input fails schema validation.
"""
session_features = events.groupby("session_id").agg(
user_id=("user_id", "first"),
n_events=("event_id", "count"),
n_clicks=("event_type", lambda s: (s == "click").sum()),
n_completions=("event_type", lambda s: (s == "complete").sum()),
total_duration=("duration_seconds", "sum"),
n_unique_items=("item_id", "nunique"),
session_start=("timestamp", "min"),
session_end=("timestamp", "max"),
).reset_index()
session_features["click_rate"] = (
session_features["n_clicks"] / session_features["n_events"]
)
session_features["completion_rate"] = (
session_features["n_completions"] / session_features["n_events"]
)
session_features["session_duration_minutes"] = (
(session_features["session_end"] - session_features["session_start"])
.dt.total_seconds() / 60.0
)
return session_features
When compute_session_features is called with a DataFrame that violates the schema — a missing column, a null user_id, an unexpected event_type value — Pandera raises a SchemaError with a detailed message specifying exactly which check failed and on which rows. The error surfaces at the function boundary, not three pipeline stages later when a model produces nonsensical predictions.
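Pandera implements this fail-fast pattern for you; to make the underlying idea concrete, here is the same pattern hand-rolled in plain Python. The `check_rows` decorator and `SchemaError` class below are hypothetical stand-ins for intuition, not Pandera's API:

```python
import functools
from typing import Any, Callable, Dict, List

class SchemaError(ValueError):
    """Raised at the function boundary when input rows break the schema."""

def check_rows(required: Dict[str, type]) -> Callable:
    """Decorator: type-check list-of-dict input before the body runs."""
    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(rows: List[Dict[str, Any]], *args, **kwargs):
            for i, row in enumerate(rows):
                for col, typ in required.items():
                    # Fail at the boundary, naming the exact row and column.
                    if not isinstance(row.get(col), typ):
                        raise SchemaError(
                            f"row {i}: {col!r} missing or not {typ.__name__}"
                        )
            return fn(rows, *args, **kwargs)
        return wrapper
    return decorator

@check_rows({"session_id": str, "duration_seconds": float})
def total_duration(rows: List[Dict[str, Any]]) -> float:
    return sum(r["duration_seconds"] for r in rows)
```

The value is in the error's location: the exception names the offending row and column at the call site, rather than surfacing as a confusing downstream failure.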
Great Expectations vs. Pandera: When to Use Which
| Dimension | Great Expectations | Pandera |
|---|---|---|
| Validation point | After data is produced (checkpoint) | During data processing (inline) |
| Integration | Orchestration systems (Airflow, Dagster) | Python function signatures |
| Backend support | Pandas, Spark, SQL | Pandas (primary), Polars, Dask |
| Statistical checks | Rich library (distribution, correlation) | Basic (custom checks via functions) |
| Documentation | Data Docs (auto-generated HTML) | Schema as code (docstrings) |
| Best for | Pipeline stage gates, audit trails | Library/function input validation |
The two tools are complementary. In the StreamRec pipeline, Great Expectations validates raw data at ingestion (the checkpoint in Section 28.2), while Pandera validates DataFrames at function boundaries within the feature engineering code. A data quality issue caught by Great Expectations prevents the pipeline from running at all; an issue caught by Pandera pinpoints the exact function where the contract is violated.
28.4 Statistical Data Validation
Schema validation catches structural problems: wrong types, missing columns, null values. But many data quality issues are distributional: the data has the right schema but the wrong statistics. A feature that normally has mean 0.5 suddenly has mean 0.05 because an upstream normalization changed. A column that normally has 3% nulls suddenly has 40% nulls because a mobile app update broke event logging for Android users. These issues require statistical validation.
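The Android-logging scenario above is exactly the kind of drift a null-rate monitor catches. A minimal stdlib sketch, with the function name and thresholds chosen for illustration:

```python
from typing import List, Optional, Tuple

def null_rate_check(
    values: List[Optional[float]],
    reference_rate: float,
    tolerance: float = 0.05,
) -> Tuple[float, bool]:
    """Return (current null rate, alert?) for one column.

    Alerts when the null rate drifts more than `tolerance` (absolute)
    from the reference period's rate.
    """
    if not values:
        return 0.0, True  # an empty column is itself an anomaly
    rate = sum(1 for v in values if v is None) / len(values)
    return rate, abs(rate - reference_rate) > tolerance

# 3% nulls historically vs. 40% today: the alert fires.
rate, alert = null_rate_check([None] * 4 + [1.0] * 6, reference_rate=0.03)
```

This passes schema validation (nulls are allowed) while still flagging the distributional break.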
Distribution Shift Detection
The Population Stability Index (PSI) measures how much a feature's distribution has shifted between a reference period and the current period:
$$\text{PSI} = \sum_{i=1}^{B} (p_i - q_i) \cdot \ln\left(\frac{p_i}{q_i}\right)$$
where $p_i$ is the proportion of observations in bin $i$ for the current data and $q_i$ is the proportion in bin $i$ for the reference data. PSI is the symmetrized (Jeffreys) form of the KL divergence: the sum above equals $D_{\mathrm{KL}}(p \| q) + D_{\mathrm{KL}}(q \| p)$. The standard interpretation:
| PSI Value | Interpretation |
|---|---|
| $< 0.1$ | No significant shift |
| $0.1 - 0.25$ | Moderate shift — investigate |
| $> 0.25$ | Significant shift — action required |
import numpy as np
from typing import Tuple
def compute_psi(
reference: np.ndarray,
current: np.ndarray,
n_bins: int = 10,
epsilon: float = 1e-6,
) -> Tuple[float, np.ndarray]:
"""Compute Population Stability Index between two distributions.
PSI measures the shift between a reference distribution and a
current distribution using binned proportions. It is widely used
in credit scoring (where regulators expect PSI monitoring) and
is equally applicable to any feature in an ML pipeline.
Args:
reference: Reference distribution (e.g., training data).
current: Current distribution (e.g., today's serving data).
n_bins: Number of equal-width bins.
epsilon: Small constant to avoid division by zero / log(0).
Returns:
Tuple of (total PSI, per-bin PSI contributions).
"""
    # Create equal-width bins spanning the combined range of both samples
breakpoints = np.linspace(
min(reference.min(), current.min()),
max(reference.max(), current.max()),
n_bins + 1,
)
# Compute bin proportions
ref_counts = np.histogram(reference, bins=breakpoints)[0]
cur_counts = np.histogram(current, bins=breakpoints)[0]
ref_proportions = ref_counts / ref_counts.sum() + epsilon
cur_proportions = cur_counts / cur_counts.sum() + epsilon
# Compute PSI per bin
psi_per_bin = (cur_proportions - ref_proportions) * np.log(
cur_proportions / ref_proportions
)
return float(psi_per_bin.sum()), psi_per_bin
Feature Importance Stability
PSI detects shift in individual features. But a feature can shift without affecting the model (if the model does not rely on it) and the model can degrade without any single feature shifting (if a subtle joint distribution change affects the decision boundary). Feature importance stability measures whether the model's reliance on features has changed:
import numpy as np
from dataclasses import dataclass, field
from typing import Dict, List, Optional
@dataclass
class FeatureImportanceStability:
"""Track feature importance stability across model versions.
Compares permutation importance rankings between the reference
model and the current model to detect changes in feature reliance
that may indicate data or concept drift.
Attributes:
reference_importances: Feature importance dict from reference model.
current_importances: Feature importance dict from current model.
feature_names: Ordered list of feature names.
"""
reference_importances: Dict[str, float]
current_importances: Dict[str, float]
feature_names: List[str] = field(default_factory=list)
def __post_init__(self) -> None:
if not self.feature_names:
self.feature_names = sorted(self.reference_importances.keys())
def rank_correlation(self) -> float:
"""Compute Spearman rank correlation between importance rankings.
A correlation near 1.0 indicates stable feature reliance.
A correlation below 0.8 suggests the model has changed which
features it relies on, warranting investigation.
Returns:
Spearman rho between reference and current importance rankings.
"""
ref_values = np.array(
[self.reference_importances[f] for f in self.feature_names]
)
cur_values = np.array(
[self.current_importances[f] for f in self.feature_names]
)
# Compute ranks (scipy-free implementation)
ref_ranks = _rank_array(ref_values)
cur_ranks = _rank_array(cur_values)
n = len(ref_ranks)
d_squared = np.sum((ref_ranks - cur_ranks) ** 2)
rho = 1.0 - (6.0 * d_squared) / (n * (n ** 2 - 1))
return float(rho)
def top_k_overlap(self, k: int = 10) -> float:
        """Jaccard similarity between the top-k feature sets of both models.
Args:
k: Number of top features to compare.
Returns:
Jaccard similarity of top-k feature sets (0.0 to 1.0).
"""
ref_sorted = sorted(
self.reference_importances.items(),
key=lambda x: x[1],
reverse=True,
)
cur_sorted = sorted(
self.current_importances.items(),
key=lambda x: x[1],
reverse=True,
)
ref_top_k = {name for name, _ in ref_sorted[:k]}
cur_top_k = {name for name, _ in cur_sorted[:k]}
intersection = ref_top_k & cur_top_k
union = ref_top_k | cur_top_k
return len(intersection) / len(union)
def importance_shift_report(
self, threshold: float = 0.05
) -> List[Dict[str, float]]:
"""Identify features with large importance changes.
Args:
threshold: Minimum absolute change in normalized importance
to flag a feature.
Returns:
List of dicts with feature name, reference importance,
current importance, and absolute change.
"""
# Normalize importances to sum to 1
ref_total = sum(self.reference_importances.values())
cur_total = sum(self.current_importances.values())
shifted_features = []
for feat in self.feature_names:
ref_norm = self.reference_importances[feat] / ref_total
cur_norm = self.current_importances[feat] / cur_total
abs_change = abs(cur_norm - ref_norm)
if abs_change >= threshold:
shifted_features.append({
"feature": feat,
"reference_importance": round(ref_norm, 4),
"current_importance": round(cur_norm, 4),
"absolute_change": round(abs_change, 4),
})
return sorted(
shifted_features, key=lambda x: x["absolute_change"], reverse=True
)
def _rank_array(arr: np.ndarray) -> np.ndarray:
    """Compute ranks of array elements (1-based, ascending).
    Note: ties are broken by position rather than averaged, which is
    adequate for importance scores that are rarely exactly equal.
    Args:
        arr: 1D numpy array.
    Returns:
        Array of ranks.
    """
order = arr.argsort()
ranks = np.empty_like(order, dtype=float)
ranks[order] = np.arange(1, len(arr) + 1, dtype=float)
return ranks
Integrating Statistical Validation into GE
Great Expectations supports custom expectations. The following registers PSI as a native expectation:
from great_expectations.expectations.expectation import BatchExpectation
from great_expectations.core import ExpectationConfiguration
from typing import Dict, Any, Optional
import numpy as np
class ExpectColumnPsiToBeLessThan(BatchExpectation):
"""Custom GE expectation: PSI of a column must be below a threshold.
Compares the current batch's distribution of a column against a
stored reference distribution using the Population Stability Index.
Args (via kwargs):
column: Column name.
reference_distribution: List of reference values or path to stored ref.
threshold: Maximum PSI value (default 0.25).
n_bins: Number of bins for PSI computation (default 10).
"""
expectation_type = "expect_column_psi_to_be_less_than"
default_kwarg_values = {
"threshold": 0.25,
"n_bins": 10,
}
def _validate(
self,
metrics: Dict[str, Any],
runtime_configuration: Optional[Dict[str, Any]] = None,
result_format: Optional[str] = None,
) -> Dict[str, Any]:
column_values = metrics["column_values.nonnull"]
reference = np.array(self.configuration.kwargs["reference_distribution"])
threshold = self.configuration.kwargs.get("threshold", 0.25)
n_bins = self.configuration.kwargs.get("n_bins", 10)
current = np.array(column_values)
psi_value, _ = compute_psi(reference, current, n_bins=n_bins)
return {
"success": psi_value < threshold,
"result": {
"observed_value": round(psi_value, 4),
"threshold": threshold,
"details": {
"interpretation": (
"no significant shift" if psi_value < 0.1
else "moderate shift" if psi_value < 0.25
else "significant shift"
),
},
},
}
28.5 Data Contracts
A data contract is a formal agreement between a data producer and a data consumer about the structure, semantics, quality, and service level of a data asset. It is the ML analogue of an API contract in software engineering.
Why Data Contracts Matter
In the StreamRec pipeline (Chapter 24), the recommendation model depends on data produced by at least five upstream teams:
- Client engineering produces click stream events
- Content engineering produces item metadata (title, genre, duration)
- User engineering produces user profile data (registration date, preferences)
- Data engineering produces computed features (engagement rates, trending scores)
- Trust & safety produces content moderation labels
When any of these teams changes their data output — adds a column, renames a field, changes the encoding of a categorical variable, modifies the logging frequency — the recommendation pipeline may break. Without data contracts, these changes are silent. The upstream team does not know that the recommendation pipeline depends on the exact format of their output, because the dependency is implicit.
Data contracts make these dependencies explicit:
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set
from enum import Enum
from datetime import timedelta
class DataType(Enum):
"""Supported data types in data contracts."""
STRING = "string"
INTEGER = "integer"
FLOAT = "float"
BOOLEAN = "boolean"
TIMESTAMP = "timestamp"
ARRAY = "array"
MAP = "map"
class SLALevel(Enum):
"""Service level for data delivery."""
BEST_EFFORT = "best_effort"
STANDARD = "standard" # 99% on-time
CRITICAL = "critical" # 99.9% on-time
@dataclass
class ColumnContract:
"""Contract for a single column in a data asset.
Attributes:
name: Column name.
data_type: Expected data type.
nullable: Whether null values are permitted.
description: Human-readable description of the column.
valid_values: For categoricals, the set of allowed values.
min_value: For numerics, the minimum allowed value.
max_value: For numerics, the maximum allowed value.
pii: Whether this column contains personally identifiable info.
deprecated: Whether this column is scheduled for removal.
"""
name: str
data_type: DataType
nullable: bool = False
description: str = ""
valid_values: Optional[Set[str]] = None
min_value: Optional[float] = None
max_value: Optional[float] = None
pii: bool = False
deprecated: bool = False
@dataclass
class DataContract:
"""Formal agreement between a data producer and consumer.
A data contract specifies the schema, quality requirements,
delivery SLA, and ownership for a data asset. It serves as the
enforceable interface between pipeline stages.
Attributes:
name: Contract identifier (e.g., 'streamrec_click_events_v2').
version: Semantic version of the contract.
producer: Team or service that produces the data.
consumers: Teams or services that consume the data.
description: Purpose and contents of the data asset.
columns: List of column-level contracts.
delivery_sla: Expected delivery frequency and lateness tolerance.
freshness_requirement: Maximum age of data at consumption time.
min_row_count: Minimum expected rows per delivery.
max_row_count: Maximum expected rows per delivery.
quality_expectations: GE expectation suite name for validation.
owner_email: Contact email for the data owner.
breaking_change_policy: How breaking changes are communicated.
"""
name: str
version: str
producer: str
consumers: List[str]
description: str
columns: List[ColumnContract]
delivery_sla: SLALevel = SLALevel.STANDARD
freshness_requirement: timedelta = timedelta(hours=24)
min_row_count: int = 0
max_row_count: int = 100_000_000
quality_expectations: str = ""
owner_email: str = ""
breaking_change_policy: str = "30 days notice to all consumers"
def validate_schema(self, actual_columns: Dict[str, str]) -> List[str]:
"""Validate that actual data schema matches the contract.
Args:
actual_columns: Dict mapping column name to actual data type.
Returns:
List of violation messages (empty if schema matches).
"""
violations = []
expected_names = {col.name for col in self.columns
if not col.deprecated}
actual_names = set(actual_columns.keys())
# Missing columns
for missing in expected_names - actual_names:
violations.append(
f"Missing required column: '{missing}'"
)
        # Unexpected columns (reported as violations here; a non-strict mode could downgrade these to warnings)
for extra in actual_names - expected_names:
violations.append(
f"Unexpected column not in contract: '{extra}'"
)
# Type mismatches
for col in self.columns:
if col.name in actual_columns:
expected_type = col.data_type.value
actual_type = actual_columns[col.name]
if expected_type != actual_type:
violations.append(
f"Column '{col.name}': expected type "
f"'{expected_type}', got '{actual_type}'"
)
return violations
def to_great_expectations_suite(self) -> Dict:
"""Generate a Great Expectations suite from this contract.
Returns:
Dict representation of a GE expectation suite.
"""
expectations = []
# Table-level: column set
expectations.append({
"expectation_type": "expect_table_columns_to_match_set",
"kwargs": {
"column_set": [
c.name for c in self.columns if not c.deprecated
],
},
})
# Row count bounds
if self.min_row_count > 0 or self.max_row_count < 100_000_000:
expectations.append({
"expectation_type": (
"expect_table_row_count_to_be_between"
),
"kwargs": {
"min_value": self.min_row_count,
"max_value": self.max_row_count,
},
})
# Column-level expectations
for col in self.columns:
if col.deprecated:
continue
if not col.nullable:
expectations.append({
"expectation_type": (
"expect_column_values_to_not_be_null"
),
"kwargs": {"column": col.name},
})
if col.valid_values:
expectations.append({
"expectation_type": (
"expect_column_values_to_be_in_set"
),
"kwargs": {
"column": col.name,
"value_set": sorted(col.valid_values),
},
})
if col.min_value is not None or col.max_value is not None:
kwargs = {"column": col.name}
if col.min_value is not None:
kwargs["min_value"] = col.min_value
if col.max_value is not None:
kwargs["max_value"] = col.max_value
expectations.append({
"expectation_type": (
"expect_column_values_to_be_between"
),
"kwargs": kwargs,
})
return {
"expectation_suite_name": f"{self.name}_contract",
"expectations": expectations,
}
Defining the StreamRec Click Events Contract
streamrec_events_contract = DataContract(
name="streamrec_click_events",
version="2.1.0",
producer="client-engineering",
consumers=["ml-recommendations", "analytics", "trust-safety"],
description=(
"User interaction events from StreamRec clients. "
"One row per user-content interaction. "
"Used for recommendation model training, engagement analytics, "
"and content moderation signal aggregation."
),
columns=[
ColumnContract(
name="event_id", data_type=DataType.STRING,
nullable=False, description="UUID v4 event identifier",
),
ColumnContract(
name="user_id", data_type=DataType.STRING,
nullable=False, description="Stable user identifier",
pii=True,
),
ColumnContract(
name="item_id", data_type=DataType.STRING,
nullable=False, description="Content item identifier",
),
ColumnContract(
name="event_type", data_type=DataType.STRING,
nullable=False, description="Interaction type",
valid_values={"view", "click", "complete", "skip"},
),
ColumnContract(
name="timestamp", data_type=DataType.TIMESTAMP,
nullable=False, description="UTC event timestamp",
),
ColumnContract(
name="session_id", data_type=DataType.STRING,
nullable=False, description="Client session identifier",
),
ColumnContract(
name="platform", data_type=DataType.STRING,
nullable=False, description="Client platform",
valid_values={"ios", "android", "web"},
),
ColumnContract(
name="duration_seconds", data_type=DataType.FLOAT,
nullable=True, description="Time spent on item",
min_value=0.0, max_value=14400.0,
),
],
delivery_sla=SLALevel.CRITICAL,
freshness_requirement=timedelta(hours=2),
min_row_count=500_000,
max_row_count=50_000_000,
quality_expectations="streamrec_user_events",
owner_email="client-eng-data@streamrec.example.com",
breaking_change_policy=(
"30 days notice via #data-contracts Slack channel. "
"All consumers must acknowledge before deployment."
),
)
Schema Evolution and Backward Compatibility
Data contracts must support evolution. New columns are added, old columns are deprecated, and value sets expand. The key principle is backward compatibility: a change is backward-compatible if existing consumers continue to work without modification.
| Change Type | Backward Compatible? | Policy |
|---|---|---|
| Add nullable column | Yes | No notice required |
| Add non-nullable column | No | Breaking: 30-day notice |
| Remove column | No | Breaking: deprecate first, remove after 90 days |
| Expand valid value set | Usually yes | Notify consumers |
| Narrow valid value set | No | Breaking: 30-day notice |
| Change column type | No | Breaking: new contract version |
| Change column semantics | No | Breaking: new contract version |
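The policy table can itself be enforced in code. The sketch below is a minimal, standalone compatibility checker encoding three of the rules above (removed column, changed type, narrowed value set are breaking; adding a nullable column is not); columns are represented as plain dicts rather than the full `ColumnContract` class, and all names are illustrative:

```python
def breaking_changes(old_cols, new_cols):
    """Compare two contract versions; return a list of breaking changes.

    old_cols/new_cols map column name -> {"type": str, "nullable": bool,
    "valid_values": set | None}.
    """
    breaks = []
    for name, old in old_cols.items():
        if name not in new_cols:
            breaks.append(f"removed column '{name}'")
            continue
        new = new_cols[name]
        if new["type"] != old["type"]:
            breaks.append(f"changed type of '{name}'")
        if old["valid_values"] and new["valid_values"]:
            narrowed = old["valid_values"] - new["valid_values"]
            if narrowed:
                breaks.append(
                    f"narrowed value set of '{name}': {sorted(narrowed)}"
                )
    for name, new in new_cols.items():
        # Adding a non-nullable column forces existing producers to change.
        if name not in old_cols and not new["nullable"]:
            breaks.append(f"added non-nullable column '{name}'")
    return breaks

v1 = {"event_type": {"type": "string", "nullable": False,
                     "valid_values": {"view", "click", "skip"}}}
v2 = {"event_type": {"type": "string", "nullable": False,
                     "valid_values": {"view", "click"}},   # narrowed: breaking
      "ab_bucket": {"type": "string", "nullable": True,
                    "valid_values": None}}                 # nullable add: OK

assert breaking_changes(v1, v2) == [
    "narrowed value set of 'event_type': ['skip']"
]
```

A CI job running this comparison on every contract change would block merges that require the 30-day notice process.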
28.6 Contract Testing Between Pipeline Stages
Data contracts define the interface. Contract tests verify that the interface is honored at runtime. A contract test runs at the boundary between two pipeline stages and verifies that the output of stage $N$ matches the input contract of stage $N+1$.
from dataclasses import dataclass
from typing import Dict, List, Callable, Any, Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class ContractTestResult:
"""Result of a contract test execution.
Attributes:
contract_name: Name of the data contract being tested.
producer_stage: Pipeline stage that produced the data.
consumer_stage: Pipeline stage that will consume the data.
passed: Whether all contract checks passed.
violations: List of violation messages.
metadata: Additional context (row count, timestamp, etc.).
"""
contract_name: str
producer_stage: str
consumer_stage: str
passed: bool
violations: List[str]
metadata: Dict[str, Any]
class PipelineContractEnforcer:
"""Enforce data contracts between pipeline stages.
Sits at the boundary between pipeline stages and validates that
the output of each stage matches the input contract of the next.
Integrates with pipeline orchestrators (Dagster, Airflow) via
the validate_handoff method.
Attributes:
contracts: Dict mapping contract name to DataContract.
test_results: History of contract test executions.
"""
def __init__(self) -> None:
self.contracts: Dict[str, DataContract] = {}
self.test_results: List[ContractTestResult] = []
def register_contract(self, contract: DataContract) -> None:
"""Register a data contract.
Args:
contract: DataContract instance to register.
"""
self.contracts[contract.name] = contract
logger.info(
"Registered contract '%s' v%s (producer: %s, consumers: %s)",
contract.name, contract.version, contract.producer,
contract.consumers,
)
def validate_handoff(
self,
contract_name: str,
producer_stage: str,
consumer_stage: str,
actual_columns: Dict[str, str],
row_count: int,
custom_checks: Optional[List[Callable[[], bool]]] = None,
) -> ContractTestResult:
"""Validate a data handoff between pipeline stages.
Called at the boundary between stages — after the producer
writes its output and before the consumer reads its input.
Args:
contract_name: Name of the contract to validate against.
producer_stage: Name of the producing pipeline stage.
consumer_stage: Name of the consuming pipeline stage.
actual_columns: Dict mapping column name to data type string.
row_count: Number of rows in the output.
custom_checks: Optional list of callable checks returning bool.
Returns:
ContractTestResult with pass/fail status and violation details.
Raises:
KeyError: If contract_name is not registered.
"""
contract = self.contracts[contract_name]
violations = []
# Schema validation
schema_violations = contract.validate_schema(actual_columns)
violations.extend(schema_violations)
# Row count validation
if row_count < contract.min_row_count:
violations.append(
f"Row count {row_count:,} below minimum "
f"{contract.min_row_count:,}"
)
if row_count > contract.max_row_count:
violations.append(
f"Row count {row_count:,} above maximum "
f"{contract.max_row_count:,}"
)
# Custom checks
if custom_checks:
for i, check in enumerate(custom_checks):
try:
if not check():
violations.append(
f"Custom check {i + 1} failed"
)
except Exception as e:
violations.append(
f"Custom check {i + 1} raised exception: {e}"
)
result = ContractTestResult(
contract_name=contract_name,
producer_stage=producer_stage,
consumer_stage=consumer_stage,
passed=len(violations) == 0,
violations=violations,
metadata={
"contract_version": contract.version,
"row_count": row_count,
"n_columns": len(actual_columns),
},
)
self.test_results.append(result)
if not result.passed:
logger.error(
"Contract '%s' VIOLATED at %s -> %s: %s",
contract_name, producer_stage, consumer_stage,
"; ".join(violations),
)
else:
logger.info(
"Contract '%s' PASSED at %s -> %s (%d rows)",
contract_name, producer_stage, consumer_stage,
row_count,
)
return result
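How does a failed contract test actually stop the pipeline? The standalone sketch below shows the gating pattern in miniature: the downstream step runs only if a validation callable returns no violations. In the real pipeline this role is played by `PipelineContractEnforcer.validate_handoff` wired into the orchestrator; the functions here (`guarded_handoff`, `validate_events`, `compute_features`) are simplified and illustrative:

```python
def guarded_handoff(validate, downstream):
    """Run downstream(data) only if validate(data) returns no violations."""
    def run(data):
        violations = validate(data)
        if violations:
            # In production: alert producer and consumer teams, halt the DAG.
            return {"status": "blocked", "violations": violations}
        return {"status": "ok", "result": downstream(data)}
    return run

def validate_events(rows):
    required = {"event_id", "user_id", "item_id"}
    missing = required - set(rows[0].keys()) if rows else required
    return [f"missing column: '{m}'" for m in sorted(missing)]

def compute_features(rows):
    return {"n_events": len(rows)}

pipeline = guarded_handoff(validate_events, compute_features)
good = [{"event_id": "e1", "user_id": "u1", "item_id": "i1"}]
bad = [{"event_id": "e1", "user_id": "u1"}]  # item_id dropped upstream

assert pipeline(good) == {"status": "ok", "result": {"n_events": 1}}
assert pipeline(bad)["status"] == "blocked"
```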
In the StreamRec Dagster pipeline (Chapter 27), contract tests run as sensor-triggered checks between each asset materialization. If the click events contract test fails, the downstream feature computation asset does not execute, and the pipeline sends an alert to both the producer team (client engineering) and the consumer team (ML recommendations).
28.7 Behavioral Testing for ML Models: The CheckList Framework
In 2020, Ribeiro, Wu, Guestrin, and Singh published "Beyond Accuracy: Behavioral Testing of NLP Models with CheckList" at ACL. The paper introduced a testing methodology inspired by behavioral (black-box) testing in software engineering, organized around model capabilities: rather than measuring aggregate accuracy on a held-out set, test the model's behavior under systematic perturbations. The paper demonstrated that models achieving state-of-the-art accuracy on standard benchmarks failed spectacularly on simple behavioral tests — for example, a sentiment analysis model that achieved 92% accuracy on SST-2 predicted negative sentiment for "the food was not bad" 62% of the time.
The CheckList framework defines three types of behavioral tests:
Minimum Functionality Tests (MFT)
An MFT is the ML equivalent of a unit test: a small, focused test set that checks a specific capability. The model must achieve a minimum performance threshold on the test set.
For StreamRec's recommendation model:
- Popularity baseline. The model must outperform a popularity-based recommender (recommend the globally most popular items) on every user segment. This is the absolute floor — a model that cannot beat "show everyone the same popular items" is worse than useless.
- Per-segment performance. The model must achieve Recall@20 $\geq 0.10$ on every user segment (defined by tenure: new, medium, long-term), every platform (iOS, Android, web), and every primary content category.
- Cold-start viability. For users with fewer than 5 interactions, the model must achieve Recall@20 $\geq 0.05$ — worse than warm users, but better than random.
Invariance Tests (INV)
An invariance test perturbs the input in a way that should not change the output. If the model's predictions change, the model has learned a spurious correlation.
For StreamRec:
- Name invariance. Changing a user's display name should not change their recommendations. If it does, the model is encoding name-based features (which may correlate with demographics) as a proxy for preference.
- Timestamp jitter. Shifting all event timestamps by $\pm 5$ minutes should not significantly change recommendations. If it does, the model is overfitting to exact timing rather than sequential patterns.
- Platform invariance. A user who watches the same content on iOS and Android should receive similar recommendations on both platforms. (Similar, not identical — some platform-specific effects are legitimate, such as shorter content on mobile.)
Directional Expectation Tests (DIR)
A directional test perturbs the input in a way that should change the output in a predictable direction. If the model's predictions do not move in the expected direction, the model has failed to learn a basic relationship.
For StreamRec:
- Genre affinity. Adding 10 science fiction completions to a user's history should increase the model's science fiction recommendation scores. The magnitude is unspecified — but the direction must be correct.
- Recency. A user who watched a cooking show 1 hour ago should receive a higher score for cooking-related content than a user who watched the same show 30 days ago. The model should exhibit recency sensitivity.
- Engagement signal. Changing a user's interaction with an item from "skip" to "complete" should increase the model's score for similar items. Completions are stronger positive signals than skips.
Implementation
from dataclasses import dataclass, field
from typing import (
Callable, Dict, List, Optional, Set, Tuple, Any, Protocol,
)
from enum import Enum
import numpy as np
import logging
logger = logging.getLogger(__name__)
class TestType(Enum):
"""Types of behavioral tests in the CheckList framework."""
MINIMUM_FUNCTIONALITY = "MFT"
INVARIANCE = "INV"
DIRECTIONAL = "DIR"
@dataclass
class BehavioralTestCase:
"""A single test case for behavioral testing.
Attributes:
name: Human-readable test name.
test_type: MFT, INV, or DIR.
description: What capability this test verifies.
input_original: Original input to the model.
input_perturbed: Perturbed input (for INV/DIR tests).
expected_direction: For DIR tests, 'increase' or 'decrease'.
minimum_threshold: For MFT tests, minimum acceptable metric.
tolerance: For INV tests, maximum acceptable change.
"""
name: str
test_type: TestType
description: str
input_original: Any = None
input_perturbed: Any = None
expected_direction: Optional[str] = None # 'increase' or 'decrease'
minimum_threshold: Optional[float] = None
tolerance: Optional[float] = None
@dataclass
class BehavioralTestResult:
"""Result of executing a behavioral test case.
Attributes:
test_case: The test case that was executed.
passed: Whether the test passed.
metric_value: The observed metric value.
details: Additional context about the result.
"""
test_case: BehavioralTestCase
passed: bool
metric_value: float
details: str = ""
class RecommenderModel(Protocol):
"""Protocol for recommendation models under behavioral testing."""
def recommend(
self, user_features: Dict[str, Any], k: int = 20
) -> List[Tuple[str, float]]:
"""Return top-k (item_id, score) recommendations."""
...
class BehavioralTestSuite:
"""A suite of behavioral tests for an ML model.
Implements the CheckList framework (Ribeiro et al., 2020) for
recommendation models. Tests are organized by type (MFT, INV, DIR)
and can be run individually or as a complete suite.
Attributes:
name: Suite name.
tests: List of registered test cases.
results: Results from the most recent suite execution.
"""
def __init__(self, name: str) -> None:
self.name = name
self.tests: List[BehavioralTestCase] = []
self.results: List[BehavioralTestResult] = []
def add_test(self, test_case: BehavioralTestCase) -> None:
"""Register a behavioral test case.
Args:
test_case: Test case to add to the suite.
"""
self.tests.append(test_case)
def run_invariance_test(
self,
model: RecommenderModel,
test_case: BehavioralTestCase,
k: int = 20,
) -> BehavioralTestResult:
"""Execute an invariance test.
Checks that the model's recommendations do not change
significantly when the input is perturbed in a way that
should be irrelevant.
The metric is Jaccard similarity between the top-k
recommendation sets before and after perturbation.
Args:
model: The recommendation model to test.
test_case: INV test case with original and perturbed inputs.
k: Number of recommendations to compare.
Returns:
BehavioralTestResult with Jaccard similarity metric.
"""
original_recs = model.recommend(test_case.input_original, k=k)
perturbed_recs = model.recommend(test_case.input_perturbed, k=k)
original_items = {item_id for item_id, _ in original_recs}
perturbed_items = {item_id for item_id, _ in perturbed_recs}
intersection = original_items & perturbed_items
union = original_items | perturbed_items
jaccard = len(intersection) / len(union) if union else 1.0
tolerance = test_case.tolerance or 0.8
passed = jaccard >= tolerance
return BehavioralTestResult(
test_case=test_case,
passed=passed,
metric_value=jaccard,
details=(
f"Jaccard similarity: {jaccard:.3f} "
f"(threshold: {tolerance:.3f}). "
f"Original: {len(original_items)} items, "
f"Perturbed: {len(perturbed_items)} items, "
f"Overlap: {len(intersection)} items."
),
)
def run_directional_test(
self,
model: RecommenderModel,
test_case: BehavioralTestCase,
target_items: Set[str],
k: int = 20,
) -> BehavioralTestResult:
"""Execute a directional expectation test.
Checks that the model's scores for target items move in the
expected direction when the input is perturbed.
Args:
model: The recommendation model to test.
test_case: DIR test case with original, perturbed inputs,
and expected direction.
target_items: Set of item IDs whose scores should change.
k: Number of recommendations to retrieve.
Returns:
BehavioralTestResult with fraction of items moving in
the expected direction.
"""
original_recs = model.recommend(test_case.input_original, k=k)
perturbed_recs = model.recommend(test_case.input_perturbed, k=k)
original_scores = {
item_id: score for item_id, score in original_recs
}
perturbed_scores = {
item_id: score for item_id, score in perturbed_recs
}
# For items in both recommendation sets, check direction
common_items = (
set(original_scores.keys())
& set(perturbed_scores.keys())
& target_items
)
if not common_items:
return BehavioralTestResult(
test_case=test_case,
passed=False,
metric_value=0.0,
details="No target items found in both recommendation sets.",
)
correct_direction = 0
for item_id in common_items:
delta = perturbed_scores[item_id] - original_scores[item_id]
if test_case.expected_direction == "increase" and delta > 0:
correct_direction += 1
elif test_case.expected_direction == "decrease" and delta < 0:
correct_direction += 1
fraction_correct = correct_direction / len(common_items)
threshold = 0.7 # At least 70% of items should move correctly
passed = fraction_correct >= threshold
return BehavioralTestResult(
test_case=test_case,
passed=passed,
metric_value=fraction_correct,
details=(
f"{correct_direction}/{len(common_items)} target items "
f"moved in the expected direction "
f"('{test_case.expected_direction}'). "
f"Fraction: {fraction_correct:.3f} "
f"(threshold: {threshold:.3f})."
),
)
def run_mft(
self,
metric_value: float,
test_case: BehavioralTestCase,
) -> BehavioralTestResult:
"""Execute a minimum functionality test.
Checks that a pre-computed metric meets the minimum threshold.
Args:
metric_value: The observed metric value.
test_case: MFT test case with minimum threshold.
Returns:
BehavioralTestResult with pass/fail based on threshold.
"""
threshold = test_case.minimum_threshold or 0.0
passed = metric_value >= threshold
return BehavioralTestResult(
test_case=test_case,
passed=passed,
metric_value=metric_value,
details=(
f"Observed: {metric_value:.4f}, "
f"Threshold: {threshold:.4f}."
),
)
def summary(self) -> Dict[str, Any]:
"""Generate a summary of the most recent suite execution.
Returns:
Dict with counts by test type and overall pass rate.
"""
by_type: Dict[str, Dict[str, int]] = {}
for result in self.results:
ttype = result.test_case.test_type.value
if ttype not in by_type:
by_type[ttype] = {"passed": 0, "failed": 0, "total": 0}
by_type[ttype]["total"] += 1
if result.passed:
by_type[ttype]["passed"] += 1
else:
by_type[ttype]["failed"] += 1
total = len(self.results)
total_passed = sum(1 for r in self.results if r.passed)
return {
"suite_name": self.name,
"total_tests": total,
"total_passed": total_passed,
"total_failed": total - total_passed,
"pass_rate": total_passed / total if total > 0 else 0.0,
"by_type": by_type,
"failed_tests": [
{
"name": r.test_case.name,
"type": r.test_case.test_type.value,
"metric": r.metric_value,
"details": r.details,
}
for r in self.results
if not r.passed
],
}
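To see the invariance machinery end to end, here is a toy run with the Jaccard metric inlined so the snippet is standalone (the suite above wraps the same logic in `run_invariance_test`). The toy model (`toy_recommend`, illustrative only) keys purely on `interaction_history`, so renaming the user must not change its output:

```python
def toy_recommend(user, k=3):
    # Deterministic stand-in "model": derive items from the history only.
    base = sorted(user["interaction_history"])
    return [(f"{item}_similar", 1.0 - 0.1 * i)
            for i, item in enumerate(base)][:k]

original = {"display_name": "Alice Johnson",
            "interaction_history": ["item_sci_fi_01", "item_sci_fi_02"]}
perturbed = {"display_name": "Xiang Wei",
             "interaction_history": ["item_sci_fi_01", "item_sci_fi_02"]}

a = {item for item, _ in toy_recommend(original)}
b = {item for item, _ in toy_recommend(perturbed)}
jaccard = len(a & b) / len(a | b) if a | b else 1.0
assert jaccard == 1.0  # name change is irrelevant to this model: INV passes
```

A model that had memorized name-correlated features would produce `jaccard < 1.0` here, and a value below the test's tolerance fails the suite.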
28.8 Building a Behavioral Test Suite for StreamRec
With the framework in place, we construct the full behavioral test suite for the StreamRec recommendation model. This suite runs as part of the model validation pipeline (Section 28.10) — after training completes and before the model is promoted to production.
def build_streamrec_behavioral_suite() -> BehavioralTestSuite:
"""Construct the complete behavioral test suite for StreamRec.
Returns:
BehavioralTestSuite with MFT, INV, and DIR tests covering
the core capabilities of the recommendation model.
"""
suite = BehavioralTestSuite(name="streamrec_recommendation_v2")
# --- Minimum Functionality Tests ---
suite.add_test(BehavioralTestCase(
name="mft_popularity_baseline_overall",
test_type=TestType.MINIMUM_FUNCTIONALITY,
description=(
"Model must outperform popularity baseline on Recall@20 "
"across all users."
),
minimum_threshold=0.15, # Popularity baseline is ~0.08
))
suite.add_test(BehavioralTestCase(
name="mft_popularity_baseline_new_users",
test_type=TestType.MINIMUM_FUNCTIONALITY,
description=(
"Model must outperform popularity baseline on Recall@20 "
"for new users (< 5 interactions)."
),
minimum_threshold=0.05,
))
suite.add_test(BehavioralTestCase(
name="mft_popularity_baseline_ios",
test_type=TestType.MINIMUM_FUNCTIONALITY,
description=(
"Model must outperform popularity baseline on Recall@20 "
"for iOS users."
),
minimum_threshold=0.12,
))
suite.add_test(BehavioralTestCase(
name="mft_popularity_baseline_android",
test_type=TestType.MINIMUM_FUNCTIONALITY,
description=(
"Model must outperform popularity baseline on Recall@20 "
"for Android users."
),
minimum_threshold=0.12,
))
suite.add_test(BehavioralTestCase(
name="mft_popularity_baseline_web",
test_type=TestType.MINIMUM_FUNCTIONALITY,
description=(
"Model must outperform popularity baseline on Recall@20 "
"for web users."
),
minimum_threshold=0.10,
))
suite.add_test(BehavioralTestCase(
name="mft_ndcg_minimum",
test_type=TestType.MINIMUM_FUNCTIONALITY,
description=(
"Model must achieve NDCG@20 >= 0.12 on the holdout set."
),
minimum_threshold=0.12,
))
# --- Invariance Tests ---
suite.add_test(BehavioralTestCase(
name="inv_user_name_change",
test_type=TestType.INVARIANCE,
description=(
"Changing a user's display name must not change their "
"recommendations. Tests that the model does not encode "
"name-based features as preference proxies."
),
input_original={
"user_id": "u_test_001",
"display_name": "Alice Johnson",
"interaction_history": ["item_sci_fi_01", "item_sci_fi_02"],
},
input_perturbed={
"user_id": "u_test_001",
"display_name": "Xiang Wei",
"interaction_history": ["item_sci_fi_01", "item_sci_fi_02"],
},
tolerance=0.95, # 95% overlap required
))
suite.add_test(BehavioralTestCase(
name="inv_timestamp_jitter",
test_type=TestType.INVARIANCE,
description=(
"Shifting event timestamps by +/- 5 minutes must not "
"significantly change recommendations."
),
input_original={
"user_id": "u_test_002",
"timestamps": ["2026-01-15T10:00:00", "2026-01-15T10:30:00"],
},
input_perturbed={
"user_id": "u_test_002",
"timestamps": ["2026-01-15T10:03:00", "2026-01-15T10:27:00"],
},
tolerance=0.85,
))
suite.add_test(BehavioralTestCase(
name="inv_platform_switch",
test_type=TestType.INVARIANCE,
description=(
"A user with the same viewing history on iOS and Android "
"should receive similar (not identical) recommendations."
),
input_original={
"user_id": "u_test_003",
"platform": "ios",
"interaction_history": ["item_comedy_01", "item_comedy_02"],
},
input_perturbed={
"user_id": "u_test_003",
"platform": "android",
"interaction_history": ["item_comedy_01", "item_comedy_02"],
},
tolerance=0.70, # Lower tolerance: some platform effects OK
))
# --- Directional Expectation Tests ---
suite.add_test(BehavioralTestCase(
name="dir_scifi_affinity",
test_type=TestType.DIRECTIONAL,
description=(
"Adding 10 science fiction completions to a user's history "
"should increase science fiction recommendation scores."
),
input_original={
"user_id": "u_test_004",
"interaction_history": ["item_general_01", "item_general_02"],
},
input_perturbed={
"user_id": "u_test_004",
"interaction_history": [
"item_general_01", "item_general_02",
"item_sci_fi_01", "item_sci_fi_02", "item_sci_fi_03",
"item_sci_fi_04", "item_sci_fi_05", "item_sci_fi_06",
"item_sci_fi_07", "item_sci_fi_08", "item_sci_fi_09",
"item_sci_fi_10",
],
},
expected_direction="increase",
))
suite.add_test(BehavioralTestCase(
name="dir_recency_effect",
test_type=TestType.DIRECTIONAL,
description=(
"A recent interaction should produce higher relevance "
"scores for similar content than an old interaction."
),
input_original={
"user_id": "u_test_005",
"interaction_history": [
{"item": "item_cooking_01", "days_ago": 30},
],
},
input_perturbed={
"user_id": "u_test_005",
"interaction_history": [
{"item": "item_cooking_01", "days_ago": 1},
],
},
expected_direction="increase",
))
suite.add_test(BehavioralTestCase(
name="dir_completion_vs_skip",
test_type=TestType.DIRECTIONAL,
description=(
"A 'complete' signal should produce higher scores for "
"similar content than a 'skip' signal."
),
input_original={
"user_id": "u_test_006",
"interaction_history": [
{"item": "item_drama_01", "event_type": "skip"},
],
},
input_perturbed={
"user_id": "u_test_006",
"interaction_history": [
{"item": "item_drama_01", "event_type": "complete"},
],
},
expected_direction="increase",
))
return suite
The test suite encodes domain knowledge that no amount of aggregate accuracy measurement can capture. A model with Recall@20 of 0.22 on the holdout set might be an excellent model — or it might be a model that achieves 0.30 on long-term iOS users and 0.03 on new Android users, or a model that recommends action movies to everyone regardless of their history, or a model that is sensitive to user names. Behavioral tests surface these pathologies.
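The directional tests in the suite can be illustrated with a standalone toy: a model whose genre scores are simple history proportions should score science fiction higher after sci-fi completions are added. This mirrors the core of `run_directional_test`; the scorer (`genre_scores`) and item names are illustrative, not the StreamRec model:

```python
def genre_scores(history):
    """Toy scorer: genre score = fraction of history in that genre."""
    scores = {"sci_fi": 0.0, "general": 0.0}
    for item in history:
        if "sci_fi" in item:
            scores["sci_fi"] += 1.0
        else:
            scores["general"] += 1.0
    total = sum(scores.values()) or 1.0
    return {g: s / total for g, s in scores.items()}

before = genre_scores(["item_general_01", "item_general_02"])
after = genre_scores(["item_general_01", "item_general_02"]
                     + [f"item_sci_fi_{i:02d}" for i in range(1, 11)])

delta = after["sci_fi"] - before["sci_fi"]
assert delta > 0  # expected_direction="increase" is satisfied
```

The test asserts only the sign of `delta`, matching the chapter's point that directional tests pin down direction, not magnitude.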
28.9 Behavioral Testing for Credit Scoring
The StreamRec behavioral tests verify recommendation quality. For the Meridian Financial credit scoring model, behavioral tests verify regulatory compliance and fairness — a qualitatively different but structurally identical testing problem.
Know How Your Model Is Wrong: Credit scoring models are subject to the Equal Credit Opportunity Act (ECOA), the Fair Credit Reporting Act (FCRA), and model risk management guidance (SR 11-7 / OCC 2011-12). Regulators do not ask "what is your model's AUC?" They ask: "Does your model discriminate on the basis of race, sex, age, or national origin? Does it produce consistent results? Can you explain its decisions?" Behavioral tests translate these regulatory questions into executable code.
def build_credit_scoring_behavioral_suite() -> BehavioralTestSuite:
"""Construct behavioral tests for Meridian Financial credit model.
Tests encode regulatory requirements (ECOA, FCRA), model risk
management expectations (SR 11-7), and fair lending principles.
Returns:
BehavioralTestSuite for the credit scoring model.
"""
suite = BehavioralTestSuite(name="meridian_credit_scoring_v3")
# --- Minimum Functionality Tests ---
suite.add_test(BehavioralTestCase(
name="mft_auc_overall",
test_type=TestType.MINIMUM_FUNCTIONALITY,
description="AUC must meet regulatory minimum on holdout set.",
minimum_threshold=0.78,
))
suite.add_test(BehavioralTestCase(
name="mft_auc_by_income_quartile",
test_type=TestType.MINIMUM_FUNCTIONALITY,
description=(
"AUC must exceed 0.72 for each income quartile. "
"Prevents a model that performs well overall but fails "
"on low-income applicants."
),
minimum_threshold=0.72,
))
suite.add_test(BehavioralTestCase(
name="mft_calibration",
test_type=TestType.MINIMUM_FUNCTIONALITY,
description=(
"Hosmer-Lemeshow calibration p-value must exceed 0.05. "
"Ensures predicted probabilities match observed default rates."
),
minimum_threshold=0.05,
))
# --- Invariance Tests (Fair Lending) ---
suite.add_test(BehavioralTestCase(
name="inv_gender_invariance",
test_type=TestType.INVARIANCE,
description=(
"Changing applicant gender (with all else equal) must not "
"change the credit decision. ECOA prohibits discrimination "
"on the basis of sex."
),
input_original={
"income": 75000, "credit_score": 720,
"employment_years": 5, "gender": "male",
"debt_to_income": 0.28,
},
input_perturbed={
"income": 75000, "credit_score": 720,
"employment_years": 5, "gender": "female",
"debt_to_income": 0.28,
},
tolerance=0.99, # Near-perfect invariance required
))
suite.add_test(BehavioralTestCase(
name="inv_zip_code_invariance",
test_type=TestType.INVARIANCE,
description=(
"Changing zip code (a proxy for race/ethnicity) with all "
"financial factors equal must not change the credit "
"decision by more than 0.02 in probability."
),
input_original={
"income": 60000, "credit_score": 680,
"zip_code": "10001", # Manhattan
},
input_perturbed={
"income": 60000, "credit_score": 680,
"zip_code": "48205", # Detroit
},
tolerance=0.98,
        # Per the description above: tolerance = 1 - max allowed probability delta (0.02)
))
# --- Directional Expectation Tests ---
suite.add_test(BehavioralTestCase(
name="dir_higher_income_lower_risk",
test_type=TestType.DIRECTIONAL,
description=(
"Higher income (with all else equal) should decrease "
"predicted default probability."
),
input_original={
"income": 50000, "credit_score": 700,
"employment_years": 3,
},
input_perturbed={
"income": 100000, "credit_score": 700,
"employment_years": 3,
},
expected_direction="decrease",
))
suite.add_test(BehavioralTestCase(
name="dir_higher_credit_score_lower_risk",
test_type=TestType.DIRECTIONAL,
description=(
"Higher credit bureau score (with all else equal) should "
"decrease predicted default probability."
),
input_original={
"income": 70000, "credit_score": 620,
"employment_years": 4,
},
input_perturbed={
"income": 70000, "credit_score": 780,
"employment_years": 4,
},
expected_direction="decrease",
))
suite.add_test(BehavioralTestCase(
name="dir_higher_dti_higher_risk",
test_type=TestType.DIRECTIONAL,
description=(
"Higher debt-to-income ratio (with all else equal) should "
"increase predicted default probability."
),
input_original={
"income": 80000, "debt_to_income": 0.15,
"credit_score": 710,
},
input_perturbed={
"income": 80000, "debt_to_income": 0.45,
"credit_score": 710,
},
expected_direction="increase",
))
return suite
The credit scoring suite differs from the StreamRec suite in three ways. First, invariance tolerances are much higher (0.98-0.99 vs. 0.70-0.95) because regulatory requirements demand near-perfect invariance to protected attributes. Second, directional tests encode economic relationships that regulators expect the model to respect — a model that assigns higher risk to higher income is not just wrong; it is a regulatory finding. Third, the test suite serves as documentation for model risk management: when an examiner asks "how do you verify that the model does not discriminate?", the answer is the invariance test suite, its results, and its execution history.
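The counterfactual structure of the fair-lending invariance tests can be sketched in a few lines: score the same applicant twice with only the protected attribute flipped, and require the probability delta to stay under a strict epsilon. The logistic scorer below is a toy that (correctly) ignores gender entirely; the coefficients and field names are illustrative, not Meridian's model:

```python
import math

def default_probability(applicant):
    # Toy logistic scorer over financial features only; gender is unused.
    z = (-0.00001 * applicant["income"]
         - 0.005 * (applicant["credit_score"] - 650)
         + 2.0 * applicant["debt_to_income"])
    return 1.0 / (1.0 + math.exp(-z))

base = {"income": 75000, "credit_score": 720,
        "debt_to_income": 0.28, "gender": "male"}
counterfactual = dict(base, gender="female")

delta = abs(default_probability(base) - default_probability(counterfactual))
assert delta < 0.02  # within the probability tolerance described above
```

A model that used gender (directly or via a learned proxy reachable by this perturbation) would produce a nonzero delta, and the test's execution history becomes the audit artifact shown to examiners.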
28.10 Model Validation Gates
A model validation gate is an automated check that stands between a trained model and production deployment. The gate evaluates the candidate model against multiple criteria and produces a binary decision: promote (the model may proceed to the next deployment stage) or block (the model is rejected).
The Champion-Challenger Pattern
In production, there is always a champion — the model currently serving traffic. A newly trained model is a challenger. The validation gate compares the challenger against the champion on held-out data, behavioral tests, and operational criteria before permitting the challenger to serve any traffic.
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
from enum import Enum
import logging
logger = logging.getLogger(__name__)
class GateDecision(Enum):
"""Outcome of a model validation gate."""
PROMOTE = "promote"
BLOCK = "block"
MANUAL_REVIEW = "manual_review"
@dataclass
class MetricComparison:
"""Comparison of a single metric between champion and challenger.
Attributes:
metric_name: Name of the metric.
champion_value: Champion model's metric value.
challenger_value: Challenger model's metric value.
minimum_absolute: Absolute floor the challenger must exceed.
maximum_regression: Maximum allowed regression from champion.
passed: Whether this comparison passed.
reason: Explanation of the result.
"""
metric_name: str
champion_value: float
challenger_value: float
minimum_absolute: float
maximum_regression: float
passed: bool = False
reason: str = ""
def __post_init__(self) -> None:
# Check absolute minimum
if self.challenger_value < self.minimum_absolute:
self.passed = False
self.reason = (
f"{self.metric_name}: challenger ({self.challenger_value:.4f}) "
f"below absolute minimum ({self.minimum_absolute:.4f})"
)
return
# Check regression from champion
regression = self.champion_value - self.challenger_value
if regression > self.maximum_regression:
self.passed = False
self.reason = (
f"{self.metric_name}: regression of {regression:.4f} "
f"exceeds maximum allowed ({self.maximum_regression:.4f})"
)
return
self.passed = True
improvement = self.challenger_value - self.champion_value
self.reason = (
f"{self.metric_name}: challenger ({self.challenger_value:.4f}) "
f"vs champion ({self.champion_value:.4f}), "
f"{'improvement' if improvement >= 0 else 'regression'} "
f"of {abs(improvement):.4f}"
)
@dataclass
class ValidationGateConfig:
"""Configuration for a model validation gate.
Attributes:
gate_name: Identifier for this validation gate.
metrics: Dict mapping metric name to (absolute_min, max_regression).
behavioral_test_suite: Name of the behavioral test suite to run.
require_all_behavioral_tests: If True, all behavioral tests must pass.
min_behavioral_pass_rate: Minimum fraction of behavioral tests passing.
max_latency_p99_ms: Maximum p99 inference latency in milliseconds.
max_model_size_mb: Maximum model artifact size in megabytes.
require_feature_parity: Challenger must use same feature set.
sliced_metrics: Dict of (slice_name -> (metric, min_value)).
"""
gate_name: str
metrics: Dict[str, Tuple[float, float]] = field(default_factory=dict)
behavioral_test_suite: str = ""
require_all_behavioral_tests: bool = True
min_behavioral_pass_rate: float = 1.0
max_latency_p99_ms: float = 100.0
max_model_size_mb: float = 500.0
require_feature_parity: bool = True
sliced_metrics: Dict[str, Tuple[str, float]] = field(
default_factory=dict
)
@dataclass
class GateResult:
"""Full result of a model validation gate evaluation.
Attributes:
gate_name: Name of the validation gate.
decision: PROMOTE, BLOCK, or MANUAL_REVIEW.
metric_comparisons: List of metric comparison results.
behavioral_test_passed: Whether behavioral tests passed.
behavioral_pass_rate: Fraction of behavioral tests that passed.
latency_check_passed: Whether inference latency is acceptable.
size_check_passed: Whether model size is acceptable.
sliced_metric_results: Results of per-slice metric checks.
blocking_reasons: List of reasons for BLOCK decision.
"""
gate_name: str
decision: GateDecision
metric_comparisons: List[MetricComparison]
behavioral_test_passed: bool
behavioral_pass_rate: float
latency_check_passed: bool
size_check_passed: bool
sliced_metric_results: Dict[str, bool] = field(default_factory=dict)
blocking_reasons: List[str] = field(default_factory=list)
class ModelValidationGate:
"""Automated model validation gate for production deployment.
Implements the champion-challenger pattern: compares a newly
trained challenger model against the current production champion
on metrics, behavioral tests, latency, size, and per-slice
performance before permitting deployment.
Attributes:
config: Gate configuration.
"""
def __init__(self, config: ValidationGateConfig) -> None:
self.config = config
def evaluate(
self,
champion_metrics: Dict[str, float],
challenger_metrics: Dict[str, float],
behavioral_results: Optional[Dict[str, Any]] = None,
challenger_latency_p99_ms: float = 0.0,
challenger_size_mb: float = 0.0,
sliced_challenger_metrics: Optional[Dict[str, float]] = None,
) -> GateResult:
"""Evaluate a challenger model against the champion.
Args:
champion_metrics: Dict of metric_name -> value for champion.
challenger_metrics: Dict of metric_name -> value for challenger.
behavioral_results: Output of BehavioralTestSuite.summary().
challenger_latency_p99_ms: Challenger's p99 inference latency.
challenger_size_mb: Challenger's artifact size in MB.
sliced_challenger_metrics: Per-slice metric values.
Returns:
GateResult with decision and detailed comparison results.
"""
blocking_reasons: List[str] = []
metric_comparisons: List[MetricComparison] = []
# --- Metric comparisons ---
for metric_name, (abs_min, max_reg) in self.config.metrics.items():
champion_val = champion_metrics.get(metric_name, 0.0)
challenger_val = challenger_metrics.get(metric_name, 0.0)
comparison = MetricComparison(
metric_name=metric_name,
champion_value=champion_val,
challenger_value=challenger_val,
minimum_absolute=abs_min,
maximum_regression=max_reg,
)
metric_comparisons.append(comparison)
if not comparison.passed:
blocking_reasons.append(comparison.reason)
# --- Behavioral tests ---
behavioral_passed = True
behavioral_pass_rate = 1.0
if behavioral_results:
behavioral_pass_rate = behavioral_results.get(
"pass_rate", 0.0
)
if self.config.require_all_behavioral_tests:
behavioral_passed = behavioral_pass_rate == 1.0
else:
behavioral_passed = (
behavioral_pass_rate
>= self.config.min_behavioral_pass_rate
)
if not behavioral_passed:
failed_tests = behavioral_results.get("failed_tests", [])
test_names = [t["name"] for t in failed_tests[:5]]
blocking_reasons.append(
f"Behavioral tests: {behavioral_pass_rate:.1%} pass rate "
f"(failed: {', '.join(test_names)})"
)
# --- Latency check ---
latency_passed = (
challenger_latency_p99_ms <= self.config.max_latency_p99_ms
)
if not latency_passed:
blocking_reasons.append(
f"Latency p99: {challenger_latency_p99_ms:.1f}ms "
f"exceeds limit {self.config.max_latency_p99_ms:.1f}ms"
)
# --- Size check ---
size_passed = (
challenger_size_mb <= self.config.max_model_size_mb
)
if not size_passed:
blocking_reasons.append(
f"Model size: {challenger_size_mb:.1f}MB "
f"exceeds limit {self.config.max_model_size_mb:.1f}MB"
)
# --- Sliced metric checks ---
sliced_results: Dict[str, bool] = {}
if sliced_challenger_metrics and self.config.sliced_metrics:
for slice_name, (metric, min_val) in (
self.config.sliced_metrics.items()
):
actual = sliced_challenger_metrics.get(slice_name, 0.0)
passed = actual >= min_val
sliced_results[slice_name] = passed
if not passed:
blocking_reasons.append(
f"Sliced metric '{slice_name}': {actual:.4f} "
f"below minimum {min_val:.4f}"
)
# --- Final decision ---
if blocking_reasons:
decision = GateDecision.BLOCK
else:
decision = GateDecision.PROMOTE
result = GateResult(
gate_name=self.config.gate_name,
decision=decision,
metric_comparisons=metric_comparisons,
behavioral_test_passed=behavioral_passed,
behavioral_pass_rate=behavioral_pass_rate,
latency_check_passed=latency_passed,
size_check_passed=size_passed,
sliced_metric_results=sliced_results,
blocking_reasons=blocking_reasons,
)
if decision == GateDecision.PROMOTE:
logger.info(
"Gate '%s': PROMOTE (all checks passed)",
self.config.gate_name,
)
else:
logger.warning(
"Gate '%s': BLOCK (%d reasons: %s)",
self.config.gate_name,
len(blocking_reasons),
"; ".join(blocking_reasons),
)
return result
Configuring the StreamRec Validation Gate
streamrec_gate_config = ValidationGateConfig(
gate_name="streamrec_recommendation_gate_v2",
metrics={
# (absolute_minimum, max_regression_from_champion)
"recall_at_20": (0.15, 0.02),
"ndcg_at_20": (0.12, 0.015),
"hit_rate_at_10": (0.30, 0.03),
},
behavioral_test_suite="streamrec_recommendation_v2",
require_all_behavioral_tests=True,
max_latency_p99_ms=45.0, # Ranking model's portion of 200ms budget
max_model_size_mb=250.0,
sliced_metrics={
"recall_new_users": ("recall_at_20", 0.05),
"recall_ios": ("recall_at_20", 0.12),
"recall_android": ("recall_at_20", 0.12),
"recall_web": ("recall_at_20", 0.10),
},
)
The gate configuration encodes two types of constraints: absolute floors (the model must achieve Recall@20 $\geq 0.15$ regardless of the champion) and regression limits (the model must not regress more than 0.02 from the champion). This dual-threshold design prevents both catastrophic failure and gradual degradation. A model that barely beats the popularity baseline is blocked even if it is slightly better than a poorly performing champion. A model that regresses 0.025 from a strong champion is also blocked, even though it exceeds the absolute floor.
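The dual-threshold logic can be seen in isolation with a standalone sketch — a simplified `dual_threshold_check` function, not the `MetricComparison` class above — using the StreamRec Recall@20 settings (floor 0.15, maximum regression 0.02):

```python
def dual_threshold_check(
    champion: float,
    challenger: float,
    absolute_min: float,
    max_regression: float,
) -> bool:
    """Pass only if the challenger clears the floor AND stays close to the champion."""
    if challenger < absolute_min:
        return False  # catastrophic failure: below the absolute floor
    if champion - challenger > max_regression:
        return False  # gradual degradation: regressed too far from the champion
    return True


# Recall@20 gate with the StreamRec settings: floor 0.15, max regression 0.02
print(dual_threshold_check(0.22, 0.21, 0.15, 0.02))  # True: 0.01 regression, above floor
print(dual_threshold_check(0.22, 0.19, 0.15, 0.02))  # False: regression of 0.03 > 0.02
print(dual_threshold_check(0.16, 0.14, 0.15, 0.02))  # False: below the 0.15 floor
```

The third case illustrates why both thresholds matter: the challenger regresses only 0.02 from a weak champion — within the regression limit — yet is still blocked by the absolute floor.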
28.11 Shadow Evaluation and Staged Validation
The validation gate (Section 28.10) evaluates the challenger on held-out data — a necessary but insufficient check. Held-out data is retrospective: it tells you how the model would have performed on past traffic. Shadow evaluation tells you how the model actually performs on live traffic without affecting users.
In shadow mode, both the champion and the challenger receive every production request. The champion's response is returned to the user; the challenger's response is logged but discarded. After a shadow period (typically 1-7 days), the team compares the two models' responses on every request.
from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Any
import numpy as np
@dataclass
class ShadowEvaluationResult:
"""Result of a shadow evaluation comparing champion and challenger.
Attributes:
n_requests: Total number of requests evaluated.
champion_metrics: Aggregated metrics for the champion.
challenger_metrics: Aggregated metrics for the challenger.
agreement_rate: Fraction of requests where both models return
the same top-1 recommendation.
per_segment_comparison: Per-segment metric comparison.
latency_comparison: Latency distribution comparison.
"""
n_requests: int
champion_metrics: Dict[str, float]
challenger_metrics: Dict[str, float]
agreement_rate: float
per_segment_comparison: Dict[str, Dict[str, float]] = field(
default_factory=dict
)
latency_comparison: Dict[str, Dict[str, float]] = field(
default_factory=dict
)
def analyze_shadow_results(
champion_responses: List[Dict[str, Any]],
challenger_responses: List[Dict[str, Any]],
ground_truth: List[Dict[str, Any]],
k: int = 20,
) -> ShadowEvaluationResult:
"""Analyze shadow evaluation results.
Compares champion and challenger responses on the same production
requests against ground truth (actual user interactions observed
after the recommendation was served).
Args:
champion_responses: List of champion model responses.
challenger_responses: List of challenger model responses.
ground_truth: Actual user interactions for each request.
k: Number of recommendations to evaluate.
Returns:
ShadowEvaluationResult with comprehensive comparison.
"""
n = len(champion_responses)
assert len(challenger_responses) == n == len(ground_truth)
champion_hits = 0
challenger_hits = 0
agreements = 0
champion_ndcg_values = []
challenger_ndcg_values = []
for i in range(n):
champ_items = [
r["item_id"] for r in champion_responses[i]["recommendations"][:k]
]
chall_items = [
r["item_id"]
for r in challenger_responses[i]["recommendations"][:k]
]
actual_items = set(ground_truth[i].get("interacted_items", []))
# Hit rate
if set(champ_items) & actual_items:
champion_hits += 1
if set(chall_items) & actual_items:
challenger_hits += 1
# Agreement: same top-1
if champ_items and chall_items and champ_items[0] == chall_items[0]:
agreements += 1
# NDCG
champion_ndcg_values.append(
_compute_ndcg(champ_items, actual_items, k)
)
challenger_ndcg_values.append(
_compute_ndcg(chall_items, actual_items, k)
)
return ShadowEvaluationResult(
n_requests=n,
champion_metrics={
"hit_rate": champion_hits / n,
"ndcg": float(np.mean(champion_ndcg_values)),
},
challenger_metrics={
"hit_rate": challenger_hits / n,
"ndcg": float(np.mean(challenger_ndcg_values)),
},
agreement_rate=agreements / n,
)
def _compute_ndcg(
recommended: List[str], relevant: set, k: int
) -> float:
"""Compute NDCG@k for a single recommendation list.
Args:
recommended: Ordered list of recommended item IDs.
relevant: Set of relevant item IDs.
k: Cutoff rank.
Returns:
NDCG@k value between 0.0 and 1.0.
"""
dcg = 0.0
for i, item in enumerate(recommended[:k]):
if item in relevant:
dcg += 1.0 / np.log2(i + 2) # i+2 because rank starts at 1
# Ideal DCG: all relevant items at the top
n_relevant = min(len(relevant), k)
idcg = sum(1.0 / np.log2(i + 2) for i in range(n_relevant))
return dcg / idcg if idcg > 0 else 0.0
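As a quick sanity check of the formula — reimplemented inline here so it runs standalone — consider two relevant items appearing at ranks 2 and 3 of a three-item list:

```python
import math


def ndcg_at_k(recommended, relevant, k):
    """NDCG@k with binary relevance, mirroring the _compute_ndcg logic above."""
    dcg = sum(
        1.0 / math.log2(i + 2)
        for i, item in enumerate(recommended[:k])
        if item in relevant
    )
    idcg = sum(1.0 / math.log2(i + 2) for i in range(min(len(relevant), k)))
    return dcg / idcg if idcg > 0 else 0.0


# Relevant items at ranks 2 and 3: DCG = 1/log2(3) + 1/log2(4) ~= 1.1309
# Ideal ordering puts them at ranks 1 and 2: IDCG = 1/log2(2) + 1/log2(3) ~= 1.6309
print(round(ndcg_at_k(["a", "b", "c"], {"b", "c"}, k=3), 4))  # 0.6934
```

Reordering the same list to put the relevant items first yields an NDCG of 1.0, which is the sense in which the metric rewards ranking quality rather than mere retrieval.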
The Staged Validation Pipeline
The complete validation pipeline for StreamRec proceeds through four stages:
| Stage | What Is Checked | Duration | Blocking? |
|---|---|---|---|
| 1. Offline evaluation | Held-out metrics, sliced metrics | Minutes | Yes |
| 2. Behavioral tests | MFT, INV, DIR test suites | Minutes | Yes |
| 3. Shadow evaluation | Live traffic comparison | 1-7 days | Yes |
| 4. Canary deployment | 5-10% traffic, real user impact | 3 days | Yes |
Each stage is a gate. A model that passes stage 1 but fails stage 2 (a behavioral test reveals name sensitivity) is blocked before it consumes shadow evaluation resources. A model that passes stages 1-3 but shows degraded engagement during canary is rolled back automatically. This staged approach minimizes both the risk of deploying a bad model and the cost of evaluating a good one.
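The stage-by-stage short-circuiting can be sketched as a simple loop. The stage names and check callables below are hypothetical placeholders; in practice each stage is its own pipeline step with its own infrastructure:

```python
from typing import Callable, List, Tuple


def run_staged_validation(
    stages: List[Tuple[str, Callable[[], bool]]],
) -> Tuple[bool, List[str]]:
    """Run validation stages in order; stop at the first failure."""
    completed: List[str] = []
    for name, check in stages:
        if not check():
            return False, completed  # blocked: later stages never run
        completed.append(name)
    return True, completed


# Hypothetical checks: offline eval passes, behavioral tests fail.
promoted, ran = run_staged_validation([
    ("offline_evaluation", lambda: True),
    ("behavioral_tests", lambda: False),   # e.g. a name-invariance failure
    ("shadow_evaluation", lambda: True),   # never reached
    ("canary_deployment", lambda: True),   # never reached
])
print(promoted, ran)  # False ['offline_evaluation']
```

Because the stages are ordered from cheapest to most expensive, the short-circuit saves the most where it matters: a behavioral-test failure costs minutes, not the days a shadow evaluation would consume.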
28.12 Comprehensive ML Testing Strategy
We now synthesize data validation, data contracts, behavioral testing, and model validation gates into a unified testing strategy. The following table maps testing types to their purpose, tooling, and position in the ML lifecycle:
The ML Test Taxonomy
| Test Type | What It Validates | When It Runs | Tool |
|---|---|---|---|
| Schema test | Column names, types, constraints | Data ingestion | Great Expectations, Pandera |
| Completeness test | Null rates, row counts, freshness | Data ingestion | Great Expectations |
| Statistical test | Distribution shift (PSI), outliers | Data ingestion, pre-training | Custom GE expectations |
| Contract test | Producer-consumer agreement | Pipeline stage boundary | PipelineContractEnforcer |
| Unit test | Feature engineering functions | Code commit (CI) | pytest |
| Smoke test | Pipeline runs end-to-end | Code commit (CI) | pytest, pipeline runner |
| Regression test | Performance vs. known baseline | Post-training | pytest, model registry |
| Behavioral test (MFT) | Per-segment minimum performance | Post-training | BehavioralTestSuite |
| Behavioral test (INV) | Invariance to irrelevant perturbations | Post-training | BehavioralTestSuite |
| Behavioral test (DIR) | Directional response to perturbations | Post-training | BehavioralTestSuite |
| Model validation gate | Champion vs. challenger comparison | Pre-deployment | ModelValidationGate |
| Shadow evaluation | Live traffic comparison | Pre-deployment | Shadow pipeline |
| Integration test | Full serving path correctness | Pre-deployment | End-to-end test harness |
| Drift detection | Feature/prediction distribution shift | Continuous (post-deployment) | Monitoring pipeline |
The ML Test Score
Breck et al. (2017) at Google proposed the ML Test Score — a rubric for evaluating the maturity of an ML system's testing infrastructure. The rubric covers four categories:
- Tests for features and data (0-5 points): Schema validation, feature importance correlation, data pipeline unit tests, feature coverage, data monitoring
- Tests for model development (0-5 points): Training reproducibility, model quality on held-out data, model quality on important subsets, model staleness, faster-to-test proxy metrics
- Tests for ML infrastructure (0-5 points): Training infrastructure tests, serving infrastructure tests, ML API integration tests, model reproducibility from stored artifacts, data pipeline latency tests
- Monitoring tests for ML (0-5 points): Model quality tracking, feature distribution monitoring, model age tracking, inference latency monitoring, prediction bias tracking
Each category awards 0.5 points per test implemented. A total score below 5 indicates critical gaps; 5-10 indicates a functional but immature system; 10-15 indicates a mature, production-ready system; above 15 indicates an advanced system with comprehensive testing infrastructure.
from dataclasses import dataclass, field
from typing import Dict, List
@dataclass
class MLTestScoreCategory:
"""One category of the ML Test Score rubric.
Attributes:
name: Category name.
tests: Dict mapping test name to whether it is implemented.
weight_per_test: Points awarded per implemented test.
"""
name: str
tests: Dict[str, bool] = field(default_factory=dict)
weight_per_test: float = 0.5
@property
def score(self) -> float:
"""Total score for this category."""
return sum(
self.weight_per_test for implemented in self.tests.values()
if implemented
)
@property
def max_score(self) -> float:
"""Maximum possible score for this category."""
return len(self.tests) * self.weight_per_test
@dataclass
class MLTestScore:
"""ML Test Score rubric (after Breck et al., 2017).
Evaluates the maturity of an ML system's testing infrastructure
across four categories.
Attributes:
system_name: Name of the ML system being evaluated.
categories: List of scoring categories.
"""
system_name: str
categories: List[MLTestScoreCategory] = field(default_factory=list)
@property
def total_score(self) -> float:
"""Total ML Test Score across all categories."""
return sum(cat.score for cat in self.categories)
@property
def max_score(self) -> float:
"""Maximum possible score."""
return sum(cat.max_score for cat in self.categories)
def maturity_level(self) -> str:
"""Classify system maturity based on total score."""
score = self.total_score
if score < 5:
return "Critical gaps — high risk of production failures"
elif score < 10:
return "Functional but immature — significant blind spots"
elif score < 15:
return "Mature — most risks are covered"
else:
return "Advanced — comprehensive testing infrastructure"
def report(self) -> str:
"""Generate a human-readable test score report."""
lines = [
f"ML Test Score Report: {self.system_name}",
f"{'=' * 50}",
f"Total Score: {self.total_score:.1f} / {self.max_score:.1f}",
f"Maturity: {self.maturity_level()}",
"",
]
for cat in self.categories:
lines.append(
f"{cat.name}: {cat.score:.1f} / {cat.max_score:.1f}"
)
for test_name, implemented in cat.tests.items():
status = "[x]" if implemented else "[ ]"
lines.append(f" {status} {test_name}")
lines.append("")
return "\n".join(lines)
def evaluate_streamrec_test_score() -> MLTestScore:
"""Evaluate the StreamRec system's ML Test Score.
Assumes M12 progressive project milestones have been completed.
Returns:
MLTestScore for the StreamRec recommendation system.
"""
score = MLTestScore(system_name="StreamRec Recommendation System v2")
# Category 1: Features and Data
score.categories.append(MLTestScoreCategory(
name="Tests for Features and Data",
tests={
"Schema validation (GE suite)": True,
"Feature importance correlation": True,
"Data pipeline unit tests": True,
"Feature coverage monitoring": True,
"Data freshness monitoring": True,
"Statistical distribution checks (PSI)": True,
"Data contract enforcement": True,
"Cross-feature consistency checks": True,
"Training-serving skew detection": True,
"Upstream data source health checks": True,
},
))
# Category 2: Model Development
score.categories.append(MLTestScoreCategory(
name="Tests for Model Development",
tests={
"Training reproducibility": True,
"Model quality on held-out data": True,
"Model quality on important slices": True,
"Model staleness detection": True,
"Proxy metric correlation": True,
"Behavioral tests (MFT)": True,
"Behavioral tests (INV)": True,
"Behavioral tests (DIR)": True,
"Hyperparameter sensitivity": False,
"Ablation studies": False,
},
))
# Category 3: ML Infrastructure
score.categories.append(MLTestScoreCategory(
name="Tests for ML Infrastructure",
tests={
"Training infra tests": True,
"Serving infra tests": True,
"API integration tests": True,
"Model artifact reproducibility": True,
"Pipeline latency tests": True,
"Feature store consistency tests": True,
"Rollback procedure tests": True,
"Load/stress tests": False,
},
))
# Category 4: Monitoring
score.categories.append(MLTestScoreCategory(
name="Monitoring Tests for ML",
tests={
"Model quality tracking": True,
"Feature distribution monitoring": True,
"Model age tracking": True,
"Inference latency monitoring": True,
"Prediction distribution monitoring": True,
"Engagement metric tracking": True,
"Fairness metric monitoring": False,
"Alerting and escalation": True,
},
))
return score
Running this evaluation for StreamRec after completing M12 yields a score of 16.0 out of 18.0 — an "Advanced" rating. The four gaps (hyperparameter sensitivity, ablation studies, load/stress tests, and fairness monitoring) identify the next investments. This is a strategic tool: rather than guessing where to invest engineering effort, the ML Test Score identifies the specific gaps with the highest risk.
28.13 Progressive Project M12: StreamRec Testing Infrastructure
Milestone M12 builds on M9 (system architecture, Chapter 24), M10 (feature store, Chapter 25), and M11 (pipeline orchestration, Chapter 27). By the end of this milestone, the StreamRec system has automated data validation, behavioral testing, and model validation gates.
Task 1: Great Expectations Suite for User Events
Implement the full expectation suite from Section 28.2 against the StreamRec clickstream data. Calibrate the `mostly` tolerance thresholds using 30 days of historical data. Configure the checkpoint to run as the first step of the Dagster training pipeline.
Acceptance criteria:
- Suite contains $\geq 15$ expectations covering schema, completeness, value ranges, volume, and freshness
- Checkpoint runs successfully on 30 days of historical data with $\leq 2$ false alarms
- Pipeline halts if validation fails, with Slack notification
Task 2: Behavioral Test Suite
Implement the behavioral test suite from Section 28.8 with at least:
- 6 minimum functionality tests (overall, per-platform, new users, NDCG floor)
- 3 invariance tests (name, timestamp jitter, platform)
- 3 directional tests (genre affinity, recency, engagement signal)
Run the suite against the current production model and verify all tests pass.
Acceptance criteria:
- Suite passes on current production model
- Each test has a documented rationale linking to a product requirement or known failure mode
- Suite execution completes in $< 10$ minutes
Task 3: Model Validation Gate
Implement the validation gate from Section 28.10. Configure it with the StreamRec gate parameters. Integrate it into the training pipeline so that a newly trained model must pass the gate before being registered in MLflow as a candidate for shadow evaluation.
Acceptance criteria:
- Gate blocks a model with Recall@20 below the absolute minimum
- Gate blocks a model that regresses more than 0.02 from the champion
- Gate blocks a model that fails any behavioral test
- Gate blocks a model with p99 latency exceeding 45ms
- Gate logs all decisions with full comparison details
Track A (Complete): Implement Tasks 1-3 with the provided configurations.
Track B (Extended): Add Pandera schemas to the feature engineering functions. Implement the PSI-based custom expectation (Section 28.4) and run it on 3 features with known historical drift.
Track C (Production): Add contract tests between all pipeline stages. Implement shadow evaluation analysis (Section 28.11). Compute the ML Test Score for your system and identify the top 3 gaps.
28.14 Summary
Testing ML systems requires testing at four layers: data, features, model behavior, and model validation. At the data layer, Great Expectations provides batch validation with expectation suites, checkpoints, and auto-generated documentation; Pandera provides inline validation at function boundaries. At the feature layer, data contracts formalize producer-consumer agreements and contract tests enforce them at pipeline stage boundaries. At the model behavior layer, the CheckList framework's three test types — minimum functionality, invariance, and directional expectation — check that the model has learned the right things and has not learned the wrong things. At the model validation layer, automated gates compare challengers against champions on metrics, behavioral tests, latency, and sliced performance before permitting deployment.
The ML Test Score (Breck et al., 2017) provides a systematic rubric for evaluating the maturity of this infrastructure and identifying the highest-priority gaps. For StreamRec, M12 builds the testing infrastructure that stands between a trained model and production traffic — the safety net that catches the problems that accuracy metrics miss.
The next chapter (Chapter 29) takes the validated model and deploys it: CI/CD for ML, canary deployments, shadow mode, progressive rollout, and automatic rollback. The testing infrastructure built here is the foundation that makes safe, automated deployment possible.
References
Breck, Eric, Shanqing Cai, Eric Nielsen, Michael Salib, and D. Sculley. "The ML Test Score: A Rubric for ML Production Readiness and Technical Debt Reduction." In Proceedings of IEEE Big Data, 2017.
Feathers, Michael. Working Effectively with Legacy Code. Prentice Hall, 2004.
Ribeiro, Marco Tulio, Tongshuang Wu, Carlos Guestrin, and Sameer Singh. "Beyond Accuracy: Behavioral Testing of NLP Models with CheckList." In Proceedings of the 58th Annual Meeting of the Association for Computational Linguistics (ACL), 2020.
Sculley, D., Gary Holt, Daniel Golovin, Eugene Davydov, Todd Phillips, Dietmar Ebner, Vinay Chaudhary, Michael Young, Jean-Francois Crespo, and Dan Dennison. "Hidden Technical Debt in Machine Learning Systems." In Advances in Neural Information Processing Systems (NeurIPS), 2015.
Great Expectations. "Great Expectations Documentation." https://docs.greatexpectations.io/.
Pandera. "Pandera Documentation." https://pandera.readthedocs.io/.