In This Chapter
- Learning Objectives
- 25.1 The Plumbing Nobody Teaches
- 25.2 The Feature Store: Why It Exists
- 25.3 Feature Definitions and Feature Views
- 25.4 Point-in-Time Joins: The Feature That Prevents Data Leakage
- 25.5 Online Serving: The Feature Lookup Path
- 25.6 Data Warehouses, Data Lakes, and Lakehouses
- 25.7 Time-Travel and Reproducibility
- 25.8 Columnar Storage Formats: Parquet, ORC, and the I/O Cost of Feature Engineering
- 25.9 Feature Engineering at Scale: SQL Patterns
- 25.10 Data Contracts: The Agreement Nobody Writes Down
- 25.11 Schema Evolution: Changing Without Breaking
- 25.12 Data Lineage and the Data Catalog
- 25.13 Data Versioning with DVC
- 25.14 ETL vs. ELT and the Modern Data Stack
- 25.15 Data Mesh: Decentralized Data Ownership
- 25.16 The StreamRec Feature Store: Complete Design (M10)
- 25.17 Chapter Summary
Chapter 25: Data Infrastructure — Feature Stores, Data Warehouses, Lakehouses, and the Plumbing Nobody Teaches
"Data is the hardest part of ML and the most important piece to get right... Broken data is the most common cause of problems in production ML systems." — Andrej Karpathy, "Software 2.0" (Medium, 2017); extended in internal Tesla AI Day talks
Learning Objectives
By the end of this chapter, you will be able to:
- Design feature store architectures that ensure online-offline consistency
- Compare data warehouse, data lake, and lakehouse architectures and select the appropriate one for a given ML workload
- Implement feature engineering pipelines with proper versioning and lineage tracking
- Design data contracts between producers and consumers to prevent silent data quality issues
- Apply schema evolution strategies that do not break downstream ML pipelines
25.1 The Plumbing Nobody Teaches
Chapter 24 gave us the architectural blueprint: the multi-stage recommendation funnel, the feature store that feeds it, the training infrastructure that improves it, the monitoring system that watches it. But the blueprint left a critical question unanswered: where do the features come from, and how do you guarantee they are correct?
This is the plumbing question. It is the question that no graduate course covers, no Kaggle competition tests, and no research paper treats as interesting. It is also the question whose answer determines whether your production ML system works or silently degrades.
Consider a concrete scenario. The StreamRec ranking model uses a feature called user_7d_completion_rate — the fraction of content items the user completed in the last seven days. During training, this feature is computed from a historical event log using a SQL window function. During serving, it is retrieved from a Redis cache that a Flink streaming job updates in near-real-time. Two different code paths, two different systems, one feature name.
Now suppose the SQL window function uses a half-open interval [today - 7, today) while the Flink job uses a closed interval [now - 7 days, now]. The difference is subtle — at most one day of events — but it means the model sees systematically different feature values in training versus serving. The model was optimized for one distribution; it receives another. There is no error, no exception, no log message. The model simply makes slightly worse predictions for every user, every request, every day.
This is training-serving skew, and it is the single most common cause of production ML quality degradation (Sculley et al., 2015; Polyzotis et al., 2017). Feature stores exist to solve this problem. Data warehouses, data lakes, and lakehouses exist to store the raw and transformed data that feeds those feature stores. Data contracts exist to prevent the upstream data changes that break them. Schema evolution strategies exist to allow the inevitable changes without cascading failures.
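The interval mismatch described above is easy to reproduce. The following sketch (with illustrative event data) computes the same completion-rate feature through a training-style half-open day window and a serving-style closed rolling window:

```python
from datetime import datetime, timedelta

def completion_rate(evts):
    """Fraction of started items that were completed."""
    return sum(done for _, done in evts) / len(evts) if evts else 0.0

# Illustrative event log for one user: (start_timestamp, completed).
events = [
    (datetime(2025, 3, 9, 11, 0), True),
    (datetime(2025, 3, 12, 20, 0), False),
    (datetime(2025, 3, 15, 8, 0), True),   # event from "today"
]
now = datetime(2025, 3, 15, 14, 30)
today = now.replace(hour=0, minute=0, second=0, microsecond=0)

# Training path (SQL-style): half-open day window [today - 7d, today)
train_window = [e for e in events if today - timedelta(days=7) <= e[0] < today]

# Serving path (stream-style): closed window [now - 7d, now]
serve_window = [e for e in events if now - timedelta(days=7) <= e[0] <= now]

print(completion_rate(train_window))  # 0.5  (today's event excluded)
print(completion_rate(serve_window))  # 0.6666... (today's event included)
```

Neither path errors; they simply disagree whenever today's events exist, which is exactly the silent divergence the chapter describes.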
Production Reality: In a 2022 survey of ML engineers at 50+ companies, Tecton (a feature store company) found that 60% of production ML incidents traced back to data quality or feature computation issues — not model architecture, not training bugs, not serving infrastructure. The plumbing is where systems fail.
This chapter covers the plumbing. We proceed in five stages:
- Feature stores (Sections 25.2–25.5): the architecture that ensures online-offline consistency, the point-in-time join that prevents data leakage, and the Feast implementation for StreamRec.
- Storage architectures (Sections 25.6–25.8): warehouses, lakes, lakehouses, and the trade-offs that determine which to use.
- Feature engineering at scale (Section 25.9): SQL patterns for complex features, with versioning and lineage.
- Data contracts (Section 25.10): the agreements between producers and consumers that prevent silent breakage.
- Schema evolution (Section 25.11): how to change data structures without breaking ML pipelines.
By the end, you will have designed and implemented the feature store for StreamRec — the M10 progressive project milestone — and you will understand the data infrastructure decisions that underpin every production ML system.
25.2 The Feature Store: Why It Exists
A feature store is a centralized system for defining, computing, storing, and serving features for machine learning. The concept emerged from engineering teams at Uber (Michelangelo, 2017), Gojek in collaboration with Google Cloud (Feast, 2019), and Airbnb (Zipline, 2018), all of whom discovered the same problem independently: when training and serving compute features using different code paths, the features diverge, and the model degrades.
The Online-Offline Consistency Problem
To understand why feature stores are necessary, consider the two contexts in which an ML system uses features:
Offline (training). You are constructing a training dataset. For each historical example (user $u$ interacted with item $i$ at time $t$), you need the feature values that were available at time $t$. You cannot use features computed from data after $t$ — that would be data leakage. You compute features from a data warehouse using SQL or Spark, producing a static table:
$$\mathcal{D}_{\text{train}} = \{(\mathbf{x}_u^{(t)}, \mathbf{x}_i^{(t)}, y_{u,i}^{(t)})\}_{(u, i, t) \in \mathcal{E}}$$
where $\mathbf{x}_u^{(t)}$ is the user feature vector at time $t$, $\mathbf{x}_i^{(t)}$ is the item feature vector at time $t$, and $y_{u,i}^{(t)}$ is the label (e.g., completion).
Online (serving). A user opens the application. You need their current feature values — right now, in under 15 milliseconds (the feature lookup budget from Chapter 24's latency allocation). You fetch pre-computed features from a low-latency key-value store (Redis, DynamoDB, Bigtable).
The feature store bridges these two worlds by enforcing a single source of truth for feature definitions:
┌─────────────────────────────────────────────────────────────┐
│ FEATURE DEFINITIONS │
│ (single source of truth) │
│ │
│ Feature: user_7d_completion_rate │
│ Entity: user_id │
│ Type: Float64 │
│ Aggregation: count(completed) / count(started) │
│ Window: 7 days, half-open [t-7d, t) │
│ Default: 0.0 (cold-start users) │
│ SLA: online p95 < 5ms, offline freshness < 24h │
└─────────────┬───────────────────────┬───────────────────────┘
│ │
┌─────────▼────────┐ ┌────────▼─────────┐
│ OFFLINE STORE │ │ ONLINE STORE │
│ (Data Lake / │ │ (Redis / │
│ Warehouse) │ │ DynamoDB) │
│ │ │ │
│ Historical │ │ Latest values │
│ feature values │ │ per entity │
│ with timestamps │ │ Low-latency │
│ │ │ lookup │
│ Used for: │ │ Used for: │
│ - Training data │ │ - Real-time │
│ - Batch scoring │ │ - Online serving │
│ - Analysis │ │ - A/B test logging │
└──────────────────┘ └──────────────────┘
The critical guarantee: the offline store and online store are populated by the same computation pipeline, so the features a model trains on are identical (up to temporal freshness) to the features it sees in production.
What a Feature Store Is Not
A feature store is not:
- A database (it uses databases as backends but adds ML-specific semantics)
- A feature engineering library (it stores and serves computed features; the computation can happen anywhere)
- A data catalog (it catalogs features, but a data catalog covers all data assets, not just features)
- A replacement for a data warehouse (it sits downstream of the warehouse, which remains the source of raw and curated data)
The Landscape
The feature store ecosystem has matured significantly:
| System | Type | Online Store | Offline Store | Point-in-Time Join | Origin |
|---|---|---|---|---|---|
| Feast | Open source | Redis, DynamoDB, Bigtable | S3/GCS (Parquet), BigQuery, Redshift | Yes (via get_historical_features) | Gojek → Linux Foundation |
| Tecton | Commercial | DynamoDB | S3 (Parquet) | Yes (built-in) | Uber Michelangelo alumni |
| Hopsworks | Open source + commercial | RonDB | Hive / S3 | Yes | KTH research |
| Databricks Feature Store | Commercial | Cosmos DB, DynamoDB | Delta Lake | Yes | Databricks |
| SageMaker Feature Store | Commercial | DynamoDB (auto) | S3 (auto) | Via FeatureGroup API | AWS |
| Vertex AI Feature Store | Commercial | Bigtable (auto) | BigQuery (auto) | Yes | GCP |
For this chapter, we use Feast — it is open source, framework-agnostic, and its architecture exposes the core concepts without vendor abstraction.
25.3 Feature Definitions and Feature Views
In Feast, features are organized into feature views — logical groupings of features that share an entity, a data source, and a materialization schedule.
Entities
An entity is the key by which features are looked up. For StreamRec:
from feast import Entity, ValueType
# Entity: a unique user
user_entity = Entity(
name="user_id",
value_type=ValueType.INT64,
description="Unique identifier for a StreamRec user.",
)
# Entity: a unique content item
item_entity = Entity(
name="item_id",
value_type=ValueType.INT64,
description="Unique identifier for a StreamRec content item.",
)
Feature Views
A feature view binds a set of features to a data source and specifies the time-to-live (TTL) — how long a cached feature value remains valid before it must be refreshed.
from feast import FeatureView, Field
from feast.types import Float64, Int64, String, UnixTimestamp
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
SparkSource,
)
from datetime import timedelta
# Data source: batch-computed user features in Delta Lake
user_batch_source = SparkSource(
name="user_batch_features",
table="streamrec.features.user_daily",
timestamp_field="feature_timestamp",
description="Daily-computed user engagement features from Spark pipeline.",
)
# Feature view: batch user features (daily refresh)
user_batch_fv = FeatureView(
name="user_batch_features",
entities=[user_entity],
schema=[
Field(name="user_7d_completion_rate", dtype=Float64),
Field(name="user_30d_avg_session_length_min", dtype=Float64),
Field(name="user_lifetime_item_count", dtype=Int64),
Field(name="user_top_category", dtype=String),
Field(name="user_account_age_days", dtype=Int64),
Field(name="user_subscription_tier", dtype=String),
],
source=user_batch_source,
ttl=timedelta(days=1),
online=True, # Materialize to online store
tags={
"owner": "ml-platform",
"pipeline": "user_daily_features",
"freshness_sla": "24h",
},
)
The ttl=timedelta(days=1) means that if a feature value in the online store is more than one day old, the feature store returns the default value rather than a stale value. This is a safety mechanism: stale features are worse than missing features, because stale features silently degrade model quality while missing features trigger explicit handling.
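The TTL rule can be stated compactly. This is a minimal sketch of the freshness check, illustrative only and not Feast's internal implementation:

```python
from datetime import datetime, timedelta
from typing import Any, Optional

def resolve_feature(
    value: Any,
    feature_timestamp: Optional[datetime],
    ttl: timedelta,
    default: Any,
    now: datetime,
) -> Any:
    """Return the stored value if it is fresh, otherwise the default.

    Mirrors the TTL semantics described above: a value older than the
    feature view's TTL is treated as missing and replaced explicitly.
    """
    if feature_timestamp is None or now - feature_timestamp > ttl:
        return default
    return value

now = datetime(2025, 3, 16, 12, 0)
# Written 10 hours ago: within the 1-day TTL, so the value is served.
fresh = resolve_feature(0.72, datetime(2025, 3, 16, 2, 0), timedelta(days=1), 0.0, now)
# Written over 2 days ago: past the TTL, so the default is served.
stale = resolve_feature(0.72, datetime(2025, 3, 14, 2, 0), timedelta(days=1), 0.0, now)
print(fresh, stale)  # 0.72 0.0
```

The key design point is that expiry is explicit: the caller sees the default and can log it, rather than silently consuming an outdated value.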
Streaming Feature Views
Batch features capture long-horizon patterns (7-day completion rate, 30-day session length). But the A/B test in Chapter 24 showed that session-level features improve engagement by 12%. These features change every few seconds and cannot wait for a daily batch job.
Feast supports stream feature views backed by a streaming processing engine (Flink, Spark Structured Streaming, or a custom consumer):
from feast import StreamFeatureView
from feast.data_source import KafkaSource
from feast.data_format import ProtoFormat
# Data source: real-time click stream from Kafka
user_stream_source = KafkaSource(
    name="user_click_stream",
    kafka_bootstrap_servers="kafka-broker-1:9092,kafka-broker-2:9092",
    topic="streamrec.events.interactions",
    timestamp_field="event_timestamp",
    message_format=ProtoFormat(class_path="StreamRecInteractionEvent"),
    watermark_delay_threshold=timedelta(minutes=1),
    batch_source=user_batch_source,  # Fallback for historical data
    description="Real-time user interaction events from Kafka.",
)
# Stream feature view: real-time user session features
user_stream_fv = StreamFeatureView(
name="user_stream_features",
entities=[user_entity],
schema=[
Field(name="user_session_click_count", dtype=Int64),
Field(name="user_session_duration_sec", dtype=Float64),
Field(name="user_last_10_categories", dtype=String), # JSON-encoded list
Field(name="user_session_completion_rate", dtype=Float64),
Field(name="user_minutes_since_last_interaction", dtype=Float64),
],
source=user_stream_source,
ttl=timedelta(hours=2), # Session features expire quickly
online=True,
tags={
"owner": "ml-platform",
"pipeline": "user_stream_features",
"freshness_sla": "60s",
},
)
The batch_source parameter is important: it tells Feast that when constructing historical training data, the system should use the batch source (the data warehouse) rather than attempting to replay the Kafka topic. This is the mechanism that ensures offline and online features share the same semantic definition even though they are computed by different systems.
Production Reality: The hardest part of streaming features is not the streaming infrastructure — Flink and Spark Structured Streaming are mature. The hardest part is ensuring that the streaming computation and the batch computation produce identical results for the same time window. At Uber, the Michelangelo team reported spending more engineering time on batch-stream consistency testing than on any other feature store component (Hermann and Del Balso, 2017).
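A batch-stream consistency test of the kind described above can be sketched by running both computation styles over the same bounded window and comparing the results. All names and data here are illustrative:

```python
from datetime import datetime

# Bounded window of interaction events: (user_id, timestamp, clicked).
events = [
    (42, datetime(2025, 3, 15, 14, 0), True),
    (42, datetime(2025, 3, 15, 14, 5), True),
    (107, datetime(2025, 3, 15, 14, 2), True),
]

def batch_click_count(events, start, end):
    """Batch path: full recompute over the window, as a SQL job would do it."""
    counts = {}
    for uid, ts, clicked in events:
        if start <= ts < end and clicked:
            counts[uid] = counts.get(uid, 0) + 1
    return counts

def stream_click_count(events, start, end):
    """Stream path: one-at-a-time incremental updates, in event-time order."""
    counts = {}
    for uid, ts, clicked in sorted(events, key=lambda e: e[1]):
        if start <= ts < end and clicked:
            counts[uid] = counts.get(uid, 0) + 1
    return counts

start, end = datetime(2025, 3, 15, 14, 0), datetime(2025, 3, 15, 15, 0)
assert batch_click_count(events, start, end) == stream_click_count(events, start, end)
```

In production the two paths run on different engines (Spark vs. Flink), so a test like this is executed against replayed historical windows; any count mismatch flags a semantic divergence in window boundaries, late-event handling, or deduplication.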
25.4 Point-in-Time Joins: The Feature That Prevents Data Leakage
The most important feature of a feature store is not storage or serving — it is the point-in-time join. This is the operation that constructs training datasets with the feature values that were available at the time each training example was generated, preventing temporal data leakage.
The Problem
Suppose you are training a model to predict whether a user will complete a content item. Your training data includes an event: user 42 started item 789 at 2025-03-15 14:30:00. You need the feature user_7d_completion_rate for user 42 at that time.
A naive approach joins the event table with the feature table on user_id:
-- WRONG: naive join causes data leakage
SELECT
e.user_id,
e.item_id,
e.event_timestamp,
f.user_7d_completion_rate,
e.completed AS label
FROM events e
JOIN user_features f
ON e.user_id = f.user_id;
This join retrieves the latest feature value for user 42, which may have been computed from data that includes events after 2025-03-15 14:30:00 — including the very event you are trying to predict. The model trains on features that contain the answer, learns a spuriously strong signal, and performs brilliantly on the training set. In production, the features do not contain the answer, and performance collapses.
The Solution: Point-in-Time Correct Joins
A point-in-time join retrieves the feature value that was the most recent value before the event timestamp:
-- CORRECT: point-in-time join prevents leakage
SELECT
e.user_id,
e.item_id,
e.event_timestamp,
f.user_7d_completion_rate,
e.completed AS label
FROM events e
ASOF JOIN user_features f
ON e.user_id = f.user_id
AND f.feature_timestamp <= e.event_timestamp;
The ASOF JOIN (supported in DuckDB, ClickHouse, and some other systems) finds the most recent feature row where feature_timestamp <= event_timestamp. Not all SQL engines support ASOF JOIN natively, so the equivalent using window functions is:
-- Point-in-time join using window functions (standard SQL)
WITH ranked_features AS (
SELECT
e.user_id,
e.item_id,
e.event_timestamp,
e.completed AS label,
f.user_7d_completion_rate,
f.feature_timestamp,
ROW_NUMBER() OVER (
PARTITION BY e.user_id, e.item_id, e.event_timestamp
ORDER BY f.feature_timestamp DESC
) AS rn
FROM events e
JOIN user_features f
ON e.user_id = f.user_id
AND f.feature_timestamp <= e.event_timestamp
)
SELECT
user_id,
item_id,
event_timestamp,
user_7d_completion_rate,
label
FROM ranked_features
WHERE rn = 1;
This pattern — partition by the join keys plus the event timestamp, order by the feature timestamp descending, take row number 1 — is the standard implementation of a point-in-time join. It is expensive: the JOIN condition f.feature_timestamp <= e.event_timestamp produces a fan-out that the window function then collapses. For large tables, it requires careful indexing and partitioning.
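Outside SQL, the same backward-looking match is available in pandas via merge_asof, which pairs each event with the most recent feature row at or before it. A small sketch with illustrative data:

```python
import pandas as pd

events = pd.DataFrame({
    "user_id": [42, 42],
    "event_timestamp": pd.to_datetime(["2025-03-15 14:30", "2025-03-16 09:15"]),
    "label": [1, 0],
})
features = pd.DataFrame({
    "user_id": [42, 42],
    "feature_timestamp": pd.to_datetime(["2025-03-15 00:00", "2025-03-16 00:00"]),
    "user_7d_completion_rate": [0.72, 0.73],
})

# merge_asof requires both frames sorted on their time keys; direction
# "backward" selects the most recent feature row at or before each event,
# and by="user_id" scopes the match to each entity.
training_df = pd.merge_asof(
    events.sort_values("event_timestamp"),
    features.sort_values("feature_timestamp"),
    left_on="event_timestamp",
    right_on="feature_timestamp",
    by="user_id",
    direction="backward",
)
print(training_df["user_7d_completion_rate"].tolist())  # [0.72, 0.73]
```

This is handy for validating a feature store's point-in-time output on a sample: the merge_asof result should agree row-for-row with get_historical_features.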
Feast Implementation
Feast's get_historical_features handles the point-in-time join automatically:
from feast import FeatureStore
import pandas as pd
# Initialize the Feast feature store
store = FeatureStore(repo_path="./feature_repo")
# Training events: the spine of the training dataset
entity_df = pd.DataFrame({
"user_id": [42, 42, 107, 107, 256],
"item_id": [789, 1023, 456, 789, 1023],
"event_timestamp": pd.to_datetime([
"2025-03-15 14:30:00",
"2025-03-16 09:15:00",
"2025-03-15 20:00:00",
"2025-03-17 11:45:00",
"2025-03-16 16:30:00",
]),
"label": [1, 0, 1, 1, 0],
})
# Retrieve point-in-time correct features
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_batch_features:user_7d_completion_rate",
"user_batch_features:user_30d_avg_session_length_min",
"user_batch_features:user_lifetime_item_count",
"user_stream_features:user_session_click_count",
"item_batch_features:item_avg_completion_rate",
"item_batch_features:item_category",
],
).to_df()
print(f"Training dataset shape: {training_df.shape}")
print(f"Columns: {list(training_df.columns)}")
print(f"\nSample rows:")
print(training_df.head())
Training dataset shape: (5, 10)
Columns: ['user_id', 'item_id', 'event_timestamp', 'label',
'user_7d_completion_rate', 'user_30d_avg_session_length_min',
'user_lifetime_item_count', 'user_session_click_count',
'item_avg_completion_rate', 'item_category']
Sample rows:
user_id item_id event_timestamp label user_7d_completion_rate ...
0 42 789 2025-03-15 14:30:00 1 0.72 ...
1 42 1023 2025-03-16 09:15:00 0 0.73 ...
2 107 456 2025-03-15 20:00:00 1 0.45 ...
3 107 789 2025-03-17 11:45:00 1 0.48 ...
4 256 1023 2025-03-16 16:30:00 0 0.61 ...
Notice that user 42's user_7d_completion_rate is 0.72 for the March 15 event and 0.73 for the March 16 event — the feature value changed because the March 15 interaction itself became part of the trailing 7-day window. This is correct: the model should learn from the feature value that was actually available at prediction time.
The Mathematical Guarantee
Let $f_k(u, t)$ denote the value of feature $k$ for entity $u$ at time $t$, and let $\hat{f}_k(u, t)$ denote the feature value retrieved by the point-in-time join. The guarantee is:
$$\hat{f}_k(u, t) = f_k(u, t^{*}) \quad \text{where} \quad t^{*} = \max\{t' : t' \leq t, \; f_k(u, t') \text{ is materialized}\}$$
That is, the retrieved value is the most recent materialized feature value that does not use any information after time $t$. The residual discrepancy $f_k(u, t) - \hat{f}_k(u, t^{*})$ is bounded by the feature's staleness — the time between $t^{*}$ and $t$ — which is in turn bounded by the materialization frequency.
Mathematical Foundation: The point-in-time join is a form of filtration-respecting feature construction in the language of stochastic processes. The feature values at time $t$ depend only on the natural filtration $\mathcal{F}_t = \sigma(\{X_s : s \leq t\})$ — the sigma-algebra generated by all observations up to time $t$. This is the same condition that makes a feature a valid predictor in a time series context and that prevents "look-ahead bias" in backtesting financial strategies.
25.5 Online Serving: The Feature Lookup Path
When a user opens the StreamRec application, the serving system needs features within the 15ms budget allocated in Chapter 24. The online store is the mechanism.
Materialization
Materialization is the process of copying feature values from the offline store to the online store. For batch features, this happens on a schedule (e.g., after the daily Spark pipeline completes). For stream features, this happens continuously as events arrive.
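Conceptually, incremental materialization keeps a high-water mark and upserts only rows newer than it, retaining the latest value per entity. A toy sketch of that logic (illustrative, not Feast internals):

```python
from datetime import datetime

# Offline store rows: (entity_id, feature_timestamp, value).
offline_rows = [
    (42, datetime(2025, 3, 14), 0.70),
    (42, datetime(2025, 3, 15), 0.72),
    (107, datetime(2025, 3, 15), 0.45),
]

online_store = {}                        # entity_id -> (feature_timestamp, value)
last_watermark = datetime(2025, 3, 14)   # high-water mark from the previous run

def materialize_incremental(rows, watermark):
    """Copy rows newer than the watermark, keeping the latest per entity."""
    new_watermark = watermark
    for entity_id, ts, value in rows:
        if ts <= watermark:
            continue  # already materialized by an earlier run
        current = online_store.get(entity_id)
        if current is None or ts > current[0]:
            online_store[entity_id] = (ts, value)
        new_watermark = max(new_watermark, ts)
    return new_watermark

last_watermark = materialize_incremental(offline_rows, last_watermark)
print(online_store[42][1], online_store[107][1])  # 0.72 0.45
```

The watermark is what makes repeated runs cheap and idempotent: re-running over the same rows copies nothing new.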
from datetime import datetime, timedelta
# Materialize batch features: copy latest values to online store
store.materialize(
start_date=datetime(2025, 3, 14),
end_date=datetime(2025, 3, 16),
feature_views=["user_batch_features", "item_batch_features"],
)
# This writes the latest feature values for all entities
# to the online store (Redis) for low-latency serving.
# For incremental updates (production pattern):
store.materialize_incremental(
end_date=datetime(2025, 3, 16),
feature_views=["user_batch_features"],
)
# Only processes new rows since the last materialization.
Online Feature Retrieval
At serving time, the ranking model needs features for a specific user and a set of candidate items:
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
import time
@dataclass
class FeatureVector:
"""A feature vector retrieved from the online store.
Attributes:
entity_key: The entity identifier (e.g., user_id or item_id).
features: Dictionary mapping feature name to value.
retrieved_at: Unix timestamp of retrieval.
        staleness_ms: Retrieval latency in milliseconds (recorded here as
            a staleness proxy; this lookup path does not expose the
            feature's write timestamp).
used_default: List of feature names where default values were used.
"""
entity_key: Dict[str, Any]
features: Dict[str, Any]
retrieved_at: float
staleness_ms: float = 0.0
used_default: List[str] = field(default_factory=list)
@property
def has_defaults(self) -> bool:
"""Check if any features fell back to default values."""
return len(self.used_default) > 0
@dataclass
class OnlineFeatureService:
"""Service for retrieving features from the online store.
Wraps Feast's online retrieval with latency tracking, default
handling, and staleness monitoring.
Attributes:
store: The Feast FeatureStore instance.
feature_refs: List of feature references to retrieve.
latency_budget_ms: Maximum acceptable retrieval latency.
default_values: Default values for each feature when missing.
"""
store: Any # feast.FeatureStore
feature_refs: List[str] = field(default_factory=list)
latency_budget_ms: float = 15.0
default_values: Dict[str, Any] = field(default_factory=dict)
def get_user_features(self, user_id: int) -> FeatureVector:
"""Retrieve features for a single user.
Args:
user_id: The user identifier.
Returns:
FeatureVector with user features and metadata.
Raises:
TimeoutError: If retrieval exceeds latency budget.
"""
start = time.monotonic()
# Feast online retrieval
response = self.store.get_online_features(
features=self.feature_refs,
entity_rows=[{"user_id": user_id}],
)
result = response.to_dict()
elapsed_ms = (time.monotonic() - start) * 1000
if elapsed_ms > self.latency_budget_ms:
raise TimeoutError(
f"Feature retrieval took {elapsed_ms:.1f}ms, "
f"exceeding budget of {self.latency_budget_ms:.1f}ms."
)
# Extract features and track defaults
features = {}
used_default = []
for ref in self.feature_refs:
feature_name = ref.split(":")[-1]
value = result.get(feature_name, [None])[0]
if value is None:
value = self.default_values.get(feature_name, 0.0)
used_default.append(feature_name)
features[feature_name] = value
return FeatureVector(
entity_key={"user_id": user_id},
features=features,
retrieved_at=time.time(),
staleness_ms=elapsed_ms,
used_default=used_default,
)
def get_batch_item_features(
self, item_ids: List[int]
) -> List[FeatureVector]:
"""Retrieve features for multiple items in a single call.
Uses Feast's batched online retrieval for efficiency.
Args:
item_ids: List of item identifiers.
Returns:
List of FeatureVector instances, one per item.
"""
start = time.monotonic()
entity_rows = [{"item_id": iid} for iid in item_ids]
response = self.store.get_online_features(
features=self.feature_refs,
entity_rows=entity_rows,
)
result = response.to_dict()
elapsed_ms = (time.monotonic() - start) * 1000
vectors = []
for idx, iid in enumerate(item_ids):
features = {}
used_default = []
for ref in self.feature_refs:
feature_name = ref.split(":")[-1]
values = result.get(feature_name, [])
value = values[idx] if idx < len(values) else None
if value is None:
value = self.default_values.get(feature_name, 0.0)
used_default.append(feature_name)
features[feature_name] = value
vectors.append(FeatureVector(
entity_key={"item_id": iid},
features=features,
retrieved_at=time.time(),
staleness_ms=elapsed_ms / len(item_ids),
used_default=used_default,
))
return vectors
# Example: configure the online feature service for StreamRec
user_feature_refs = [
"user_batch_features:user_7d_completion_rate",
"user_batch_features:user_30d_avg_session_length_min",
"user_batch_features:user_lifetime_item_count",
"user_batch_features:user_top_category",
"user_stream_features:user_session_click_count",
"user_stream_features:user_session_duration_sec",
"user_stream_features:user_session_completion_rate",
]
user_defaults = {
"user_7d_completion_rate": 0.0,
"user_30d_avg_session_length_min": 0.0,
"user_lifetime_item_count": 0,
"user_top_category": "unknown",
"user_session_click_count": 0,
"user_session_duration_sec": 0.0,
"user_session_completion_rate": 0.0,
}
print("Feature references configured:", len(user_feature_refs))
print("Default values configured:", len(user_defaults))
Feature references configured: 7
Default values configured: 7
Default Value Strategy
When a feature is missing from the online store — because the user is new, the materialization is delayed, or the entity has never been seen — the feature store returns a default value. The choice of default matters:
| Strategy | When to Use | Risk |
|---|---|---|
| Zero | Numerical features where zero is a natural "no data" value | Biased predictions if zero is far from the population mean |
| Global mean | Numerical features where zero is misleading | Requires a recent estimate of the mean; still loses personalization |
| Category-level mean | When entity belongs to a known category | Requires maintaining category-level aggregates |
| Model-specific default | When the model was trained with explicit missing indicators | Safest, but requires feature store and model to coordinate |
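The strategy table above can be encoded as a small dispatch function. This is an illustrative sketch (all names and numbers are assumptions, except the 0.52 global mean cited below):

```python
from typing import Dict, Optional

def default_for(
    feature: str,
    strategy: str,
    global_means: Dict[str, float],
    category_means: Dict[str, Dict[str, float]],
    category: Optional[str] = None,
) -> float:
    """Pick a default value according to the strategies in the table above."""
    if strategy == "zero":
        return 0.0
    if strategy == "global_mean":
        return global_means[feature]
    if strategy == "category_mean" and category is not None:
        # Fall back to the global mean for unseen categories.
        return category_means[feature].get(category, global_means[feature])
    raise ValueError(f"Unknown strategy: {strategy}")

global_means = {"user_7d_completion_rate": 0.52}
category_means = {"user_7d_completion_rate": {"premium": 0.61}}

print(default_for("user_7d_completion_rate", "zero", global_means, category_means))         # 0.0
print(default_for("user_7d_completion_rate", "global_mean", global_means, category_means))  # 0.52
```

Centralizing this choice in one function keeps the default policy auditable and makes it easy to A/B test, as the example below illustrates.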
Production Reality: At StreamRec, cold-start users (no features in the online store) represent 15% of daily active users. Using zero defaults for user_7d_completion_rate biases the ranking model toward popular content (the same content that performs well for users with low completion rates). Using the global mean (0.52) instead improved new-user engagement by 8% in an A/B test — a single default value change, not a model change.
25.6 Data Warehouses, Data Lakes, and Lakehouses
The feature store is one layer of the data infrastructure stack. Below it sits the storage layer that holds the raw events, transformed tables, and historical features. The three dominant architectures — warehouse, lake, and lakehouse — represent different trade-offs between structure, cost, and analytical capability.
The Data Warehouse
A data warehouse is a structured, schema-on-write storage system optimized for analytical queries. The defining properties:
- Schema-on-write: data must conform to a predefined schema before ingestion
- Columnar storage: data is stored column-by-column (e.g., Parquet internally), enabling efficient analytical queries that scan subsets of columns
- ACID transactions: reads and writes are consistent, isolated, and durable
- SQL-native: the primary interface is SQL with advanced analytical functions
Modern cloud data warehouses (BigQuery, Snowflake, Redshift) add elastic compute scaling and separation of storage and compute.
Best for ML when: your features are derived from structured tables with well-defined schemas, your feature engineering is expressible in SQL, and you need strong consistency guarantees (e.g., financial data for credit scoring).
The Data Lake
A data lake is a schema-on-read storage system that stores data in its raw format — JSON, CSV, Parquet, Avro, images, video, audio — on cheap object storage (S3, GCS, ADLS).
- Schema-on-read: no schema enforcement at ingestion; the reader imposes structure at query time
- Any format: stores structured, semi-structured, and unstructured data
- Cheap storage: object storage costs $0.02–0.03 per GB/month (10-50x cheaper than warehouse storage)
- Processing variety: supports Spark, Presto/Trino, Dask, Ray, and direct file access for ML training
Best for ML when: your data is large, diverse (text, images, event logs), and you need direct file access for training frameworks like PyTorch's DataLoader.
The data lake's weakness: without transactions, concurrent readers and writers can see inconsistent data. Without schema enforcement, "data swamp" is a real risk — the lake accumulates files of unknown provenance, format, and quality.
The Lakehouse
A data lakehouse is a hybrid architecture that adds warehouse-like features (ACID transactions, schema enforcement, time-travel, fine-grained access control) to a data lake. The three dominant implementations:
| Lakehouse Format | Creator | Key Feature | Transaction Protocol |
|---|---|---|---|
| Delta Lake | Databricks | ACID transactions on Parquet via a JSON transaction log | Delta Log (ordered commit sequence) |
| Apache Iceberg | Netflix, then Apache | Hidden partitioning, schema evolution, partition evolution | Metadata layer with snapshot isolation |
| Apache Hudi | Uber, then Apache | Upserts, incremental processing, record-level updates | Timeline-based metadata |
All three store data as Parquet files on object storage, adding a metadata layer that provides:
- ACID transactions — concurrent reads and writes are safe
- Time-travel — query data as it existed at any point in the past
- Schema evolution — add, rename, or reorder columns without rewriting data
- Partition evolution — change partitioning schemes without rewriting data (Iceberg)
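The metadata layer's time-travel mechanism can be pictured as an ordered commit log of snapshots, each recording the set of data files valid at that version. This is a toy model of the idea, not any specific format's protocol:

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Snapshot:
    """One committed table version: the data files it comprises."""
    version: int
    files: List[str]

@dataclass
class ToyTableLog:
    """Append-only commit log, loosely modeled on lakehouse metadata layers."""
    snapshots: List[Snapshot] = field(default_factory=list)

    def commit(self, files: List[str]) -> int:
        """Atomically record a new table version and return its number."""
        version = len(self.snapshots)
        self.snapshots.append(Snapshot(version, files))
        return version

    def files_as_of(self, version: int) -> List[str]:
        """Time-travel read: the file list at a historical version."""
        return self.snapshots[version].files

log = ToyTableLog()
log.commit(["part-000.parquet"])                      # version 0
log.commit(["part-000.parquet", "part-001.parquet"])  # version 1

print(log.files_as_of(0))  # ['part-000.parquet']
print(log.files_as_of(1))  # ['part-000.parquet', 'part-001.parquet']
```

Because commits only append metadata and never mutate existing Parquet files, readers pinned to an old version see a consistent table even while writers land new data, which is the essence of the ACID and time-travel guarantees listed above.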
from dataclasses import dataclass
from typing import List, Optional
from enum import Enum
class StorageArchitecture(Enum):
"""Data storage architectures for ML systems."""
DATA_WAREHOUSE = "data_warehouse"
DATA_LAKE = "data_lake"
DATA_LAKEHOUSE = "data_lakehouse"
@dataclass
class StorageDecisionCriteria:
"""Decision criteria for selecting a storage architecture.
Evaluates the requirements of an ML workload against the
strengths of each storage architecture.
Attributes:
name: Name of the ML workload.
data_volume_tb: Total data volume in terabytes.
data_types: Types of data (structured, semi-structured, unstructured).
query_pattern: Primary query pattern (SQL analytics, file scan, both).
consistency_requirement: Level of consistency required.
budget_monthly_usd: Monthly budget for storage and compute.
requires_time_travel: Whether historical queries are needed.
ml_framework_direct_access: Whether ML frameworks need direct file access.
"""
name: str
data_volume_tb: float
data_types: List[str]
query_pattern: str
consistency_requirement: str
budget_monthly_usd: float
requires_time_travel: bool = False
ml_framework_direct_access: bool = False
def recommend(self) -> StorageArchitecture:
"""Recommend a storage architecture based on the criteria.
Returns:
The recommended StorageArchitecture.
"""
# Heuristic decision logic based on common patterns
has_unstructured = "unstructured" in self.data_types
needs_sql = "sql" in self.query_pattern.lower()
needs_file_access = self.ml_framework_direct_access
strict_consistency = self.consistency_requirement == "strong"
if has_unstructured and not needs_sql:
return StorageArchitecture.DATA_LAKE
elif strict_consistency and not has_unstructured and not needs_file_access:
return StorageArchitecture.DATA_WAREHOUSE
else:
return StorageArchitecture.DATA_LAKEHOUSE
def explain_recommendation(self) -> str:
"""Explain the reasoning behind the recommendation.
Returns:
Human-readable explanation string.
"""
rec = self.recommend()
reasons = []
if rec == StorageArchitecture.DATA_WAREHOUSE:
reasons.append(
"All data is structured with strong consistency needs."
)
reasons.append(
"SQL-based feature engineering is the primary workload."
)
reasons.append(
f"Data volume ({self.data_volume_tb:.1f} TB) is within "
"warehouse cost efficiency."
)
elif rec == StorageArchitecture.DATA_LAKE:
reasons.append("Unstructured data requires direct file access.")
reasons.append(
"ML frameworks (PyTorch, TensorFlow) need to read files "
"directly."
)
reasons.append(
f"Data volume ({self.data_volume_tb:.1f} TB) benefits from "
"object storage pricing."
)
else: # LAKEHOUSE
reasons.append(
"Mixed workload: SQL analytics and ML training on the same data."
)
if self.requires_time_travel:
reasons.append(
"Time-travel queries needed for point-in-time feature "
"construction."
)
reasons.append(
"ACID transactions protect against read-write conflicts "
"in concurrent pipelines."
)
return (
f"Recommendation for '{self.name}': {rec.value}\n"
+ "\n".join(f" - {r}" for r in reasons)
)
# StreamRec storage decision
streamrec_criteria = StorageDecisionCriteria(
name="StreamRec Feature Platform",
data_volume_tb=15.0,
data_types=["structured", "semi-structured"],
query_pattern="SQL analytics + direct Parquet access for training",
consistency_requirement="eventual",
budget_monthly_usd=12000,
requires_time_travel=True,
ml_framework_direct_access=True,
)
# Meridian Financial storage decision
meridian_criteria = StorageDecisionCriteria(
name="Meridian Financial Credit Features",
data_volume_tb=2.5,
data_types=["structured"],
query_pattern="SQL analytics",
consistency_requirement="strong",
budget_monthly_usd=8000,
requires_time_travel=True,
ml_framework_direct_access=False,
)
print(streamrec_criteria.explain_recommendation())
print()
print(meridian_criteria.explain_recommendation())
Recommendation for 'StreamRec Feature Platform': data_lakehouse
- Mixed workload: SQL analytics and ML training on the same data.
- Time-travel queries needed for point-in-time feature construction.
- ACID transactions protect against read-write conflicts in concurrent pipelines.
Recommendation for 'Meridian Financial Credit Features': data_warehouse
- All data is structured with strong consistency needs.
- SQL-based feature engineering is the primary workload.
- Data volume (2.5 TB) is within warehouse cost efficiency.
25.7 Time-Travel and Reproducibility
The lakehouse's time-travel capability is transformative for ML. It allows you to query data as it existed at any historical point, which enables:
- Reproducible training datasets: reconstruct the exact features that were available at training time, even months later
- Debugging production issues: if a model degraded on March 10, query the features as they were on March 10 to investigate
- Regulatory compliance: demonstrate that a credit decision was made using the features that were available at decision time (not features computed retroactively)
Delta Lake Time-Travel
-- Query the user features table as it existed on March 10
SELECT
user_id,
user_7d_completion_rate,
user_lifetime_item_count
FROM streamrec.features.user_daily
TIMESTAMP AS OF '2025-03-10 00:00:00'
WHERE user_id = 42;
-- Query by version number (each transaction creates a version)
SELECT COUNT(*) AS total_users
FROM streamrec.features.user_daily
VERSION AS OF 127;
-- View the transaction history
DESCRIBE HISTORY streamrec.features.user_daily;
Apache Iceberg Time-Travel
-- Iceberg: query by snapshot timestamp
SELECT
user_id,
user_7d_completion_rate
FROM streamrec.features.user_daily
FOR SYSTEM_TIME AS OF TIMESTAMP '2025-03-10 00:00:00+00:00';
-- Iceberg: query by snapshot ID
SELECT COUNT(*) AS total_users
FROM streamrec.features.user_daily
FOR SYSTEM_VERSION AS OF 5283940192336843948;
-- Iceberg: view snapshot history
SELECT
committed_at,
snapshot_id,
operation,
summary['added-records'] AS added_records,
summary['deleted-records'] AS deleted_records
FROM streamrec.features.user_daily.snapshots
ORDER BY committed_at DESC
LIMIT 10;
Connecting Time-Travel to Feature Stores
Time-travel enables a powerful pattern for feature store backfills: when you add a new feature to the feature store, you can compute its historical values by querying the source data at historical timestamps.
from dataclasses import dataclass
from typing import List
from datetime import datetime, timedelta
@dataclass
class FeatureBackfill:
"""Configuration for backfilling a new feature's historical values.
When a new feature is added to the feature store, historical values
must be computed for all entities at all relevant timestamps to
enable point-in-time correct training data construction.
Attributes:
feature_name: Name of the new feature.
source_table: Delta Lake / Iceberg table containing source data.
entity_key: Entity column name.
start_date: Earliest date to backfill from.
end_date: Latest date to backfill to.
computation_sql: SQL expression to compute the feature.
partition_dates: Generated list of partition dates.
"""
feature_name: str
source_table: str
entity_key: str
start_date: datetime
end_date: datetime
computation_sql: str
partition_dates: List[datetime] = None
def __post_init__(self):
"""Generate partition dates for the backfill."""
if self.partition_dates is None:
self.partition_dates = []
current = self.start_date
while current <= self.end_date:
self.partition_dates.append(current)
current += timedelta(days=1)
def generate_backfill_query(self, target_date: datetime) -> str:
"""Generate a SQL query to compute the feature at a specific date.
Uses time-travel to query source data as of the target date,
ensuring no future data leakage.
Args:
target_date: The date for which to compute feature values.
Returns:
SQL query string.
"""
date_str = target_date.strftime("%Y-%m-%d 00:00:00")
return f"""
INSERT INTO feature_store.{self.feature_name}_history
SELECT
{self.entity_key},
TIMESTAMP '{date_str}' AS feature_timestamp,
({self.computation_sql}) AS {self.feature_name}
FROM {self.source_table}
TIMESTAMP AS OF '{date_str}'
GROUP BY {self.entity_key}
"""
@property
def total_partitions(self) -> int:
"""Total number of daily partitions to backfill."""
return len(self.partition_dates)
@property
def estimated_cost_description(self) -> str:
"""Estimate the backfill scope."""
return (
f"Backfill '{self.feature_name}': {self.total_partitions} days "
f"from {self.start_date.date()} to {self.end_date.date()}"
)
# Example: backfill a new "user_category_entropy" feature
backfill = FeatureBackfill(
feature_name="user_category_entropy",
source_table="streamrec.events.interactions",
entity_key="user_id",
start_date=datetime(2025, 1, 1),
end_date=datetime(2025, 3, 15),
computation_sql="""
-SUM(
(category_count * 1.0 / total_count)
* LN(category_count * 1.0 / total_count)
)
""",
)
print(backfill.estimated_cost_description)
print(f"\nSample query for March 1:")
print(backfill.generate_backfill_query(datetime(2025, 3, 1)))
Backfill 'user_category_entropy': 74 days from 2025-01-01 to 2025-03-15
Sample query for March 1:
INSERT INTO feature_store.user_category_entropy_history
SELECT
user_id,
TIMESTAMP '2025-03-01 00:00:00' AS feature_timestamp,
(
-SUM(
(category_count * 1.0 / total_count)
* LN(category_count * 1.0 / total_count)
)
) AS user_category_entropy
FROM streamrec.events.interactions
TIMESTAMP AS OF '2025-03-01 00:00:00'
GROUP BY user_id
Common Misconception: "Time-travel is just for debugging — we don't need it in production." In fact, time-travel is the mechanism that makes feature backfills safe. Without it, backfilling a new feature requires either replaying the entire event stream from the beginning or accepting that historical feature values may include future data. Time-travel eliminates both problems by providing a consistent snapshot of the source data at any historical point.
25.8 Columnar Storage Formats: Parquet, ORC, and the I/O Cost of Feature Engineering
All three lakehouse formats (Delta, Iceberg, Hudi) store data as Parquet files on object storage. Understanding Parquet's internal structure is essential for writing efficient feature engineering queries.
Parquet Structure
A Parquet file organizes data in a columnar layout:
┌──────────────────────────────────────────┐
│ Parquet File │
├──────────────────────────────────────────┤
│ Row Group 1 (e.g., 128 MB) │
│ ┌──────────┬──────────┬──────────┐ │
│ │ Column A │ Column B │ Column C │ │
│ │ (chunk) │ (chunk) │ (chunk) │ │
│ │ [1,2,3] │[a,b,c] │[0.1,0.2] │ │
│ │ min: 1 │ min: a │ min: 0.1 │ │
│ │ max: 3 │ max: c │ max: 0.2 │ │
│ └──────────┴──────────┴──────────┘ │
│ │
│ Row Group 2 (e.g., 128 MB) │
│ ┌──────────┬──────────┬──────────┐ │
│ │ Column A │ Column B │ Column C │ │
│ │ (chunk) │ (chunk) │ (chunk) │ │
│ │ [4,5,6] │[d,e,f] │[0.3,0.4] │ │
│ │ min: 4 │ min: d │ min: 0.3 │ │
│ │ max: 6 │ max: f │ max: 0.4 │ │
│ └──────────┴──────────┴──────────┘ │
│ │
│ Footer: schema, row group metadata, │
│ column statistics, encoding info │
└──────────────────────────────────────────┘
Key properties for ML feature engineering:
- Column pruning: if a query only needs columns A and C, Parquet reads only those column chunks, skipping B entirely. A table with 200 columns but a query touching 10 reads ~5% of the data.
- Row group pruning: the min/max statistics in each row group's metadata enable predicate pushdown. If a query filters WHERE user_id = 42 and a row group's min/max for user_id is [1000, 2000], the entire row group is skipped.
- Encoding and compression: Parquet applies per-column encoding (dictionary, run-length, delta, bit-packing) and per-column-chunk compression (Snappy, ZSTD, LZ4). Dictionary encoding is especially effective for categorical features.
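The I/O arithmetic behind these pruning rules can be sketched directly. The following is an illustrative model, not a Parquet reader: the row-group layout, chunk sizes, and helper names (RowGroupStats, estimate_bytes_read) are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class RowGroupStats:
    """Size and min/max statistics for one column chunk in a row group."""
    size_bytes: int
    min_value: int
    max_value: int


def estimate_bytes_read(
    row_groups: List[List[RowGroupStats]],  # [row_group][column]
    needed_columns: List[int],
    predicate: Tuple[int, int, int],  # (column_index, lo, hi)
) -> int:
    """Estimate bytes read with column and row-group pruning.

    A row group is skipped entirely when the predicate range does not
    overlap the column chunk's [min, max] statistics; otherwise only
    the needed column chunks are read.
    """
    col_idx, lo, hi = predicate
    total = 0
    for rg in row_groups:
        stats = rg[col_idx]
        # Row-group pruning: skip when the predicate cannot match.
        if hi < stats.min_value or lo > stats.max_value:
            continue
        # Column pruning: read only the requested column chunks.
        total += sum(rg[c].size_bytes for c in needed_columns)
    return total


# Hypothetical table: 2 row groups x 3 columns, 10 MiB per chunk.
mb = 10 * 1024 * 1024
table = [
    [RowGroupStats(mb, 1, 999), RowGroupStats(mb, 0, 0), RowGroupStats(mb, 0, 0)],
    [RowGroupStats(mb, 1000, 2000), RowGroupStats(mb, 0, 0), RowGroupStats(mb, 0, 0)],
]
# WHERE user_id = 42 (column 0), reading columns 0 and 2:
read = estimate_bytes_read(table, needed_columns=[0, 2], predicate=(0, 42, 42))
full_scan = sum(c.size_bytes for rg in table for c in rg)
print(f"Read {read / mb:.0f} MB of {full_scan / mb:.0f} MB")  # Read 20 MB of 60 MB
```

The second row group (user_id in [1000, 2000]) is skipped entirely, and only two of three column chunks are read from the first, which is exactly the combined effect the two pruning mechanisms have on a real scan.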
Performance Implications for Feature Engineering
-- Efficient: only reads user_id and event_type columns
-- Predicate pushdown skips row groups without user_id = 42
SELECT
COUNT(*) FILTER (WHERE event_type = 'completion') AS completions,
COUNT(*) AS total_events
FROM streamrec.events.interactions
WHERE user_id = 42
AND event_date BETWEEN '2025-03-08' AND '2025-03-15';
-- Inefficient: SELECT * reads all columns
-- No predicate pushdown (function on column prevents it)
SELECT *
FROM streamrec.events.interactions
WHERE YEAR(event_timestamp) = 2025;
Implementation Note: Partition the events table by event_date (daily partitions) so that date-range queries only scan relevant partitions. For Iceberg, use hidden partitioning: define the partition as days(event_timestamp) and the engine automatically prunes partitions without requiring the user to include a partition column in queries.
Parquet vs. ORC
Both are columnar formats. ORC (Optimized Row Columnar) originated in the Hive ecosystem and uses a different encoding scheme with built-in indexes and Bloom filters. In practice:
- Parquet is the dominant format for ML workloads (PyTorch, TensorFlow, and Spark all have native Parquet readers)
- ORC has slightly better performance for complex Hive queries but narrower ecosystem support
- Lakehouse formats all support Parquet, making it the default choice
25.9 Feature Engineering at Scale: SQL Patterns
Feature engineering for ML at scale is primarily a SQL problem. The features that production models consume — aggregations, ratios, window functions, cross-entity joins — are most naturally expressed in SQL and computed by warehouse or lakehouse engines (Spark SQL, BigQuery, Snowflake, Trino).
Pattern 1: Trailing Window Aggregations
The most common feature pattern: aggregate events over a trailing time window.
-- User engagement features: trailing 7-day and 30-day windows
WITH daily_activity AS (
SELECT
user_id,
event_date,
COUNT(*) AS daily_events,
COUNT(*) FILTER (WHERE event_type = 'completion') AS daily_completions,
COUNT(DISTINCT item_id) AS daily_unique_items,
SUM(duration_sec) AS daily_duration_sec
FROM streamrec.events.interactions
WHERE event_date BETWEEN CURRENT_DATE - INTERVAL '30 days'
AND CURRENT_DATE - INTERVAL '1 day' -- Exclude today (incomplete)
GROUP BY user_id, event_date
)
SELECT
user_id,
CURRENT_DATE AS feature_timestamp,
-- 7-day features
SUM(CASE WHEN event_date >= CURRENT_DATE - INTERVAL '7 days'
THEN daily_completions ELSE 0 END)
* 1.0
/ NULLIF(SUM(CASE WHEN event_date >= CURRENT_DATE - INTERVAL '7 days'
THEN daily_events ELSE 0 END), 0)
AS user_7d_completion_rate,
COALESCE(
SUM(CASE WHEN event_date >= CURRENT_DATE - INTERVAL '7 days'
THEN daily_duration_sec ELSE 0 END)
/ NULLIF(
COUNT(CASE WHEN event_date >= CURRENT_DATE - INTERVAL '7 days'
THEN 1 END), 0)
/ 60.0, 0.0
) AS user_7d_avg_session_length_min,
-- 30-day features
SUM(daily_completions) * 1.0
/ NULLIF(SUM(daily_events), 0)
AS user_30d_completion_rate,
SUM(daily_unique_items) AS user_30d_unique_items,
-- Trend feature: 7d rate vs 30d rate (momentum signal)
COALESCE(
SUM(CASE WHEN event_date >= CURRENT_DATE - INTERVAL '7 days'
THEN daily_completions ELSE 0 END)
* 1.0
/ NULLIF(SUM(CASE WHEN event_date >= CURRENT_DATE - INTERVAL '7 days'
THEN daily_events ELSE 0 END), 0)
-
SUM(daily_completions) * 1.0
/ NULLIF(SUM(daily_events), 0),
0.0
) AS user_engagement_trend
FROM daily_activity
GROUP BY user_id;
Pattern 2: Slowly Changing Dimensions
User attributes (subscription tier, geographic region, age group) change infrequently but must be tracked over time for point-in-time correctness. This is the slowly changing dimension (SCD) pattern from data warehousing:
-- SCD Type 2: track historical values with valid_from / valid_to
CREATE TABLE streamrec.dimensions.user_profile_history (
user_id BIGINT NOT NULL,
subscription_tier VARCHAR(20) NOT NULL,
country_code VARCHAR(3) NOT NULL,
age_bucket VARCHAR(10) NOT NULL,
valid_from TIMESTAMP NOT NULL,
valid_to TIMESTAMP, -- NULL means current record
is_current BOOLEAN NOT NULL DEFAULT TRUE,
PRIMARY KEY (user_id, valid_from)
);
-- Point-in-time lookup: what was user 42's tier on March 10?
SELECT
user_id,
subscription_tier,
country_code,
age_bucket
FROM streamrec.dimensions.user_profile_history
WHERE user_id = 42
AND valid_from <= TIMESTAMP '2025-03-10 00:00:00'
AND (valid_to > TIMESTAMP '2025-03-10 00:00:00' OR valid_to IS NULL);
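The history table is maintained by a careful two-step upsert: close the current record, then open a new one. Below is a minimal in-memory sketch of the Type 2 update and the matching point-in-time lookup; the record shape mirrors the table above, but ProfileRecord and scd2_upsert are illustrative names, and in production this would be a transactional MERGE.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class ProfileRecord:
    """One row of user_profile_history (simplified to a single attribute)."""
    user_id: int
    subscription_tier: str
    valid_from: datetime
    valid_to: Optional[datetime] = None  # None means current record
    is_current: bool = True


def scd2_upsert(history: List[ProfileRecord], user_id: int,
                new_tier: str, change_time: datetime) -> None:
    """Apply an SCD Type 2 change: close the current row, open a new one."""
    for rec in history:
        if rec.user_id == user_id and rec.is_current:
            if rec.subscription_tier == new_tier:
                return  # No change; keep the current record open.
            rec.valid_to = change_time
            rec.is_current = False
    history.append(ProfileRecord(user_id, new_tier, valid_from=change_time))


def tier_as_of(history: List[ProfileRecord], user_id: int,
               ts: datetime) -> Optional[str]:
    """Point-in-time lookup, mirroring the SQL query above."""
    for rec in history:
        if (rec.user_id == user_id and rec.valid_from <= ts
                and (rec.valid_to is None or rec.valid_to > ts)):
            return rec.subscription_tier
    return None


history: List[ProfileRecord] = []
scd2_upsert(history, 42, "free", datetime(2025, 1, 1))
scd2_upsert(history, 42, "premium", datetime(2025, 3, 5))

print(tier_as_of(history, 42, datetime(2025, 3, 10)))  # premium
print(tier_as_of(history, 42, datetime(2025, 2, 1)))   # free
```

Note the half-open interval convention (valid_from inclusive, valid_to exclusive): it guarantees that every timestamp maps to exactly one record, which is what makes the point-in-time lookup unambiguous.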
Pattern 3: Cross-Entity Features
Features that combine information from multiple entities — user-item interaction features, item popularity within a user's preferred categories:
-- Cross-entity feature: how popular is this item among users
-- who share the same top category as the target user?
WITH user_categories AS (
SELECT
user_id,
user_top_category
FROM streamrec.features.user_daily
WHERE feature_timestamp = (
SELECT MAX(feature_timestamp) FROM streamrec.features.user_daily
)
),
category_item_popularity AS (
SELECT
uc.user_top_category,
e.item_id,
COUNT(DISTINCT e.user_id) AS category_users_engaged,
AVG(CASE WHEN e.event_type = 'completion' THEN 1.0 ELSE 0.0 END)
AS category_completion_rate
FROM streamrec.events.interactions e
JOIN user_categories uc ON e.user_id = uc.user_id
WHERE e.event_date >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY uc.user_top_category, e.item_id
)
SELECT
uc.user_id,
cip.item_id,
cip.category_users_engaged
AS item_same_category_users_7d,
cip.category_completion_rate
AS item_same_category_completion_rate_7d
FROM user_categories uc
JOIN category_item_popularity cip
ON uc.user_top_category = cip.user_top_category;
Pattern 4: Feature Versioning
When the definition of a feature changes (e.g., changing the completion rate window from 7 days to 14 days), you need to version the feature to avoid silently breaking models trained on the old definition:
from dataclasses import dataclass, field
from typing import Dict, Optional
from datetime import datetime
@dataclass
class FeatureDefinition:
"""Versioned definition of a feature.
Tracks the feature's computation logic, version history,
and compatibility information.
Attributes:
name: Feature name (stable across versions).
version: Semantic version string.
description: Human-readable description.
computation_sql: SQL expression for the feature.
entity_key: The entity this feature is keyed on.
dtype: Data type of the feature value.
default_value: Default when feature is missing.
created_at: When this version was created.
deprecated: Whether this version is deprecated.
breaking_change: Whether this version breaks backward compatibility.
changelog: Description of changes from previous version.
"""
name: str
version: str
description: str
computation_sql: str
entity_key: str
dtype: str
default_value: float = 0.0
created_at: datetime = None
deprecated: bool = False
breaking_change: bool = False
changelog: str = ""
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.now()
@property
def qualified_name(self) -> str:
"""Fully qualified feature name including version."""
return f"{self.name}_v{self.version.replace('.', '_')}"
@dataclass
class FeatureRegistry:
"""Registry of versioned feature definitions.
Maintains a mapping from feature names to their version history,
enabling backward compatibility and migration tracking.
Attributes:
features: Nested dict mapping feature_name -> version -> definition.
"""
features: Dict[str, Dict[str, FeatureDefinition]] = field(
default_factory=dict
)
def register(self, feature: FeatureDefinition) -> None:
"""Register a feature definition.
Args:
feature: The feature definition to register.
"""
if feature.name not in self.features:
self.features[feature.name] = {}
self.features[feature.name][feature.version] = feature
def get_latest(self, name: str) -> Optional[FeatureDefinition]:
"""Get the latest non-deprecated version of a feature.
Args:
name: The feature name.
Returns:
The latest FeatureDefinition, or None if not found.
"""
if name not in self.features:
return None
versions = self.features[name]
non_deprecated = {
v: d for v, d in versions.items() if not d.deprecated
}
if not non_deprecated:
return None
# Sort numerically so that "10.0" > "2.0" (string sort would not).
latest_version = max(non_deprecated, key=lambda v: tuple(int(p) for p in v.split(".")))
return non_deprecated[latest_version]
def get_version(self, name: str, version: str) -> Optional[FeatureDefinition]:
"""Get a specific version of a feature.
Args:
name: The feature name.
version: The version string.
Returns:
The FeatureDefinition, or None if not found.
"""
return self.features.get(name, {}).get(version)
# Example: version the completion rate feature
registry = FeatureRegistry()
registry.register(FeatureDefinition(
name="user_completion_rate",
version="1.0",
description="7-day completion rate, half-open window [t-7d, t).",
computation_sql=(
"COUNT(completed) / NULLIF(COUNT(started), 0) "
"WHERE event_date BETWEEN t - 7 AND t - 1"
),
entity_key="user_id",
dtype="float64",
default_value=0.0,
deprecated=True,
changelog="Initial version. Deprecated: window too short.",
))
registry.register(FeatureDefinition(
name="user_completion_rate",
version="2.0",
description="14-day completion rate, half-open window [t-14d, t).",
computation_sql=(
"COUNT(completed) / NULLIF(COUNT(started), 0) "
"WHERE event_date BETWEEN t - 14 AND t - 1"
),
entity_key="user_id",
dtype="float64",
default_value=0.0,
breaking_change=True,
changelog="Extended window from 7d to 14d. BREAKING: retrain required.",
))
latest = registry.get_latest("user_completion_rate")
print(f"Latest version: {latest.version}")
print(f"Qualified name: {latest.qualified_name}")
print(f"Breaking change: {latest.breaking_change}")
print(f"Changelog: {latest.changelog}")
Latest version: 2.0
Qualified name: user_completion_rate_v2_0
Breaking change: True
Changelog: Extended window from 7d to 14d. BREAKING: retrain required.
Production Reality: Feature versioning is the feature store equivalent of API versioning. Models trained on user_completion_rate v1.0 (7-day window) will degrade if served user_completion_rate v2.0 (14-day window) — the value distributions are different. The registry tracks which model versions depend on which feature versions, enabling safe rollout: train the new model on v2.0 features, A/B test it against the old model on v1.0 features, and only deprecate v1.0 after the new model is promoted.
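The dependency tracking described here can be sketched as a mapping from model versions to the feature versions they were trained on; a deprecation check then asks whether any live model still depends on a feature version. The class and method names (FeatureDependencyTracker, can_deprecate) are illustrative, not part of the registry code above.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple


@dataclass
class FeatureDependencyTracker:
    """Tracks which model versions depend on which feature versions."""
    # model_version -> set of (feature_name, feature_version)
    deps: Dict[str, Set[Tuple[str, str]]] = field(default_factory=dict)
    live_models: Set[str] = field(default_factory=set)

    def record_training(self, model: str,
                        features: List[Tuple[str, str]]) -> None:
        """Record the exact feature versions a model was trained on."""
        self.deps[model] = set(features)

    def promote(self, model: str) -> None:
        self.live_models.add(model)

    def retire(self, model: str) -> None:
        self.live_models.discard(model)

    def can_deprecate(self, feature: str, version: str) -> bool:
        """Safe to deprecate only when no live model was trained on it."""
        return not any(
            (feature, version) in self.deps.get(m, set())
            for m in self.live_models
        )


tracker = FeatureDependencyTracker()
tracker.record_training("ranker-v1", [("user_completion_rate", "1.0")])
tracker.record_training("ranker-v2", [("user_completion_rate", "2.0")])
tracker.promote("ranker-v1")

print(tracker.can_deprecate("user_completion_rate", "1.0"))  # False: v1 live
tracker.promote("ranker-v2")
tracker.retire("ranker-v1")
print(tracker.can_deprecate("user_completion_rate", "1.0"))  # True
```

The check is the gate in the rollout sequence: v1.0 features can only be deprecated once every model trained on them has been retired.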
25.10 Data Contracts: The Agreement Nobody Writes Down
A data contract is a formal agreement between a data producer and a data consumer about the schema, quality, semantics, and service-level expectations of a data asset. It is the data equivalent of an API contract.
Why Data Contracts Matter for ML
Consider the StreamRec event pipeline. The mobile engineering team produces interaction_events to a Kafka topic, and the ML team consumes these events to compute features. One day, the mobile team renames the field duration_seconds to watch_time_seconds to align with their internal naming convention, not knowing that the ML team depends on it. A Kafka schema registry, if one exists, catches the rename; without one, the streaming pipeline silently produces null values for the feature user_session_duration_sec, and the model degrades.
Data contracts prevent this by making the dependency explicit:
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from enum import Enum
from datetime import datetime
class DataType(Enum):
"""Supported data types in contracts."""
STRING = "string"
INT64 = "int64"
FLOAT64 = "float64"
BOOLEAN = "boolean"
TIMESTAMP = "timestamp"
ARRAY_STRING = "array<string>"
class CompatibilityMode(Enum):
"""Schema compatibility modes for evolution."""
BACKWARD = "backward" # New schema can read old data
FORWARD = "forward" # Old schema can read new data
FULL = "full" # Both directions
NONE = "none" # No compatibility guarantee
@dataclass
class FieldContract:
"""Contract for a single field in a data asset.
Attributes:
name: Field name.
dtype: Data type.
nullable: Whether null values are permitted.
description: Semantic description of the field.
pii: Whether the field contains personally identifiable information.
valid_range: Optional tuple of (min, max) for numeric fields.
valid_values: Optional list of allowed values for categorical fields.
"""
name: str
dtype: DataType
nullable: bool = False
description: str = ""
pii: bool = False
valid_range: Optional[tuple] = None
valid_values: Optional[List[str]] = None
@dataclass
class QualityExpectation:
"""A data quality expectation for a data contract.
Attributes:
name: Expectation name.
description: Human-readable description.
check_sql: SQL expression that evaluates to TRUE when the expectation is met.
severity: 'error' blocks the pipeline; 'warning' logs but continues.
"""
name: str
description: str
check_sql: str
severity: str = "error"
@dataclass
class DataContract:
"""A data contract between a producer and consumer.
Defines the schema, quality expectations, SLAs, and ownership
of a data asset. The contract is the authoritative reference
for what the data contains and how it should behave.
Attributes:
name: Contract name (typically matches the data asset name).
version: Contract version (semantic versioning).
producer: Team or service that produces the data.
consumers: Teams or services that consume the data.
description: Purpose and context of the data asset.
fields: Ordered list of field contracts.
quality_expectations: Data quality checks.
compatibility_mode: Schema evolution compatibility mode.
freshness_sla_minutes: Maximum acceptable staleness in minutes.
completeness_sla_percent: Minimum acceptable row completeness.
owner_email: Contact for contract violations.
created_at: When the contract was created.
"""
name: str
version: str
producer: str
consumers: List[str]
description: str
fields: List[FieldContract]
quality_expectations: List[QualityExpectation] = field(
default_factory=list
)
compatibility_mode: CompatibilityMode = CompatibilityMode.BACKWARD
freshness_sla_minutes: int = 60
completeness_sla_percent: float = 99.0
owner_email: str = ""
created_at: datetime = None
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.now()
@property
def field_names(self) -> List[str]:
"""List of field names in the contract."""
return [f.name for f in self.fields]
@property
def pii_fields(self) -> List[str]:
"""List of fields marked as PII."""
return [f.name for f in self.fields if f.pii]
def validate_schema(self, actual_columns: Dict[str, str]) -> List[str]:
"""Validate that actual data matches the contract schema.
Args:
actual_columns: Dict mapping column name to actual data type.
Returns:
List of violation messages (empty if valid).
"""
violations = []
for fc in self.fields:
if fc.name not in actual_columns:
# Nullability governs values, not column presence:
# even a nullable field must exist as a column.
violations.append(
f"Required field '{fc.name}' is missing."
)
else:
actual_type = actual_columns[fc.name]
expected_type = fc.dtype.value
if actual_type != expected_type:
violations.append(
f"Field '{fc.name}': expected type "
f"'{expected_type}', got '{actual_type}'."
)
return violations
# StreamRec interaction events data contract
interaction_contract = DataContract(
name="streamrec.events.interactions",
version="2.1",
producer="mobile-engineering",
consumers=["ml-platform", "analytics", "content-team"],
description=(
"User interaction events from the StreamRec mobile and web "
"applications. Each event represents a user engaging with a "
"content item (view, click, completion, rating)."
),
fields=[
FieldContract(
name="event_id",
dtype=DataType.STRING,
description="Unique event identifier (UUID v4).",
),
FieldContract(
name="user_id",
dtype=DataType.INT64,
description="Unique user identifier.",
pii=True,
),
FieldContract(
name="item_id",
dtype=DataType.INT64,
description="Unique content item identifier.",
),
FieldContract(
name="event_type",
dtype=DataType.STRING,
description="Type of interaction.",
valid_values=["view", "click", "completion", "rating"],
),
FieldContract(
name="event_timestamp",
dtype=DataType.TIMESTAMP,
description="UTC timestamp when the event occurred.",
),
FieldContract(
name="duration_seconds",
dtype=DataType.FLOAT64,
nullable=True,
description="Duration of engagement in seconds. "
"Null for instant events (click).",
valid_range=(0.0, 86400.0),
),
FieldContract(
name="device_type",
dtype=DataType.STRING,
description="Device category.",
valid_values=["mobile_ios", "mobile_android", "web", "tablet"],
),
FieldContract(
name="session_id",
dtype=DataType.STRING,
description="Session identifier grouping events in one visit.",
),
],
quality_expectations=[
QualityExpectation(
name="no_null_user_ids",
description="user_id must never be null.",
check_sql="SELECT COUNT(*) = 0 FROM {table} WHERE user_id IS NULL",
severity="error",
),
QualityExpectation(
name="valid_event_types",
description="event_type must be one of the allowed values.",
check_sql=(
"SELECT COUNT(*) = 0 FROM {table} "
"WHERE event_type NOT IN "
"('view', 'click', 'completion', 'rating')"
),
severity="error",
),
QualityExpectation(
name="duration_non_negative",
description="duration_seconds must be non-negative when present.",
check_sql=(
"SELECT COUNT(*) = 0 FROM {table} "
"WHERE duration_seconds < 0"
),
severity="error",
),
QualityExpectation(
name="event_freshness",
description="Events should arrive within 5 minutes of occurrence.",
check_sql=(
"SELECT PERCENTILE_CONT(0.95) WITHIN GROUP "
"(ORDER BY ingestion_timestamp - event_timestamp) < "
"INTERVAL '5 minutes' FROM {table}"
),
severity="warning",
),
],
compatibility_mode=CompatibilityMode.BACKWARD,
freshness_sla_minutes=5,
completeness_sla_percent=99.5,
owner_email="data-platform@streamrec.example.com",
)
print(f"Contract: {interaction_contract.name} v{interaction_contract.version}")
print(f"Producer: {interaction_contract.producer}")
print(f"Consumers: {interaction_contract.consumers}")
print(f"Fields: {len(interaction_contract.fields)}")
print(f"PII fields: {interaction_contract.pii_fields}")
print(f"Quality checks: {len(interaction_contract.quality_expectations)}")
print(f"Compatibility: {interaction_contract.compatibility_mode.value}")
# Test schema validation
actual = {
"event_id": "string",
"user_id": "int64",
"item_id": "int64",
"event_type": "string",
"event_timestamp": "timestamp",
"watch_time_seconds": "float64", # RENAMED! Contract says duration_seconds
"device_type": "string",
"session_id": "string",
}
violations = interaction_contract.validate_schema(actual)
print(f"\nSchema violations: {len(violations)}")
for v in violations:
print(f" - {v}")
Contract: streamrec.events.interactions v2.1
Producer: mobile-engineering
Consumers: ['ml-platform', 'analytics', 'content-team']
Fields: 8
PII fields: ['user_id']
Quality checks: 4
Compatibility: backward
Schema violations: 1
- Required field 'duration_seconds' is missing.
The schema validation caught the rename. In a production system, this check runs as part of a CI/CD pipeline: when the mobile engineering team submits a schema change, the contract validation fails, the pull request is blocked, and the team is directed to coordinate with the consumers.
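A minimal sketch of such a CI gate follows, using a simplified stand-in for the contract's field list; the proposed-schema format and the exit-code convention are assumptions for illustration, not a real CI integration.

```python
import sys
from typing import Dict, List

# Simplified stand-in for the fields of the DataContract defined earlier.
CONTRACT_FIELDS: Dict[str, str] = {
    "event_id": "string",
    "user_id": "int64",
    "duration_seconds": "float64",
}


def validate_proposed_schema(proposed: Dict[str, str]) -> List[str]:
    """Return contract violations for a producer's proposed schema."""
    violations = []
    for name, dtype in CONTRACT_FIELDS.items():
        if name not in proposed:
            violations.append(f"Required field '{name}' is missing.")
        elif proposed[name] != dtype:
            violations.append(
                f"Field '{name}': expected '{dtype}', got '{proposed[name]}'."
            )
    return violations


def ci_gate(proposed: Dict[str, str]) -> int:
    """CI entry point: a nonzero exit code blocks the pull request."""
    violations = validate_proposed_schema(proposed)
    for v in violations:
        print(f"CONTRACT VIOLATION: {v}", file=sys.stderr)
    return 1 if violations else 0


# The mobile team's proposed schema renames duration_seconds:
proposed = {"event_id": "string", "user_id": "int64",
            "watch_time_seconds": "float64"}
exit_code = ci_gate(proposed)
print(f"CI exit code: {exit_code}")  # 1 -> PR blocked
```

The key design point is that the check runs against the producer's proposed schema before it ships, so the violation surfaces in the producer's pull request rather than in the consumer's degraded model.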
Implementing Data Contracts with Great Expectations
The quality expectations in the data contract translate directly to Great Expectations (GX) test suites:
# Conceptual mapping: data contract → Great Expectations suite
# In production, this would run as part of the data pipeline.
from dataclasses import dataclass
from typing import List
@dataclass
class ExpectationResult:
"""Result of a single data quality expectation check.
Attributes:
expectation_name: Name of the expectation.
success: Whether the check passed.
observed_value: The actual value observed.
expected_description: What was expected.
"""
expectation_name: str
success: bool
observed_value: str
expected_description: str
@dataclass
class ContractValidationReport:
"""Report from validating a data asset against its contract.
Attributes:
contract_name: Name of the data contract.
contract_version: Version of the contract.
results: List of expectation results.
timestamp: When the validation ran.
"""
contract_name: str
contract_version: str
results: List[ExpectationResult]
timestamp: str = ""
@property
def all_passed(self) -> bool:
"""Whether all expectations passed."""
return all(r.success for r in self.results)
@property
def failure_count(self) -> int:
"""Number of failed expectations."""
return sum(1 for r in self.results if not r.success)
def summary(self) -> str:
"""Human-readable summary of validation results."""
status = "PASSED" if self.all_passed else "FAILED"
lines = [
f"Contract Validation: {self.contract_name} "
f"v{self.contract_version} — {status}",
f" Total checks: {len(self.results)}",
f" Passed: {len(self.results) - self.failure_count}",
f" Failed: {self.failure_count}",
]
if not self.all_passed:
lines.append(" Failures:")
for r in self.results:
if not r.success:
lines.append(
f" - {r.expectation_name}: "
f"observed {r.observed_value}, "
f"expected {r.expected_description}"
)
return "\n".join(lines)
# Example validation report (simulated)
report = ContractValidationReport(
contract_name="streamrec.events.interactions",
contract_version="2.1",
results=[
ExpectationResult(
"no_null_user_ids", True, "0 nulls", "0 nulls"
),
ExpectationResult(
"valid_event_types", True, "0 invalid", "0 invalid"
),
ExpectationResult(
"duration_non_negative", False,
"47 negative values",
"0 negative values"
),
ExpectationResult(
"event_freshness", True,
"p95 latency: 2.3 min", "p95 < 5 min"
),
],
timestamp="2025-03-16T06:00:00Z",
)
print(report.summary())
Contract Validation: streamrec.events.interactions v2.1 — FAILED
Total checks: 4
Passed: 3
Failed: 1
Failures:
- duration_non_negative: observed 47 negative values, expected 0 negative values
Production Reality: Data contracts are the organizational mechanism that makes feature stores reliable. Without them, the feature store is only as good as its inputs — and its inputs are controlled by teams who may not know or care that an ML model depends on them. Contracts make the dependency explicit, testable, and enforceable. Companies that adopt data contracts (GoCardless, PayPal, Saxo Bank) report 60-80% reductions in data-related incidents (Dehghani, 2022; Majchrzak, 2023).
25.11 Schema Evolution: Changing Without Breaking
Data schemas change. New features are added, old features are deprecated, types are widened, fields are renamed. Schema evolution is the discipline of managing these changes so that existing consumers — including ML training pipelines that read historical data — continue to work.
Compatibility Modes
The schema registry (Confluent Schema Registry for Kafka, Iceberg's schema evolution, Delta Lake's schema enforcement) enforces compatibility rules:
| Mode | Rule | Example |
|---|---|---|
| Backward compatible | New schema can read data written by old schema | Adding a nullable column, widening a type (int → long) |
| Forward compatible | Old schema can read data written by new schema | Removing a column, adding a column with a default |
| Full compatible | Both backward and forward | Adding a nullable column with a default |
| Breaking | Neither | Renaming a column, changing a type narrowly (long → int) |
For ML pipelines, backward compatibility is the minimum requirement: the training pipeline must be able to read historical data (written under old schemas) using the current schema. Forward compatibility is a bonus: it means the serving pipeline (using the current schema) can process data from producers who have not yet upgraded.
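These rules can be checked mechanically. The sketch below is illustrative, not any schema registry's actual API: `Column`, `SAFE_WIDENINGS`, and the dict-based schema model are invented for this example, and the forward check is deliberately conservative (it treats any type change as forward-incompatible).

```python
from typing import Dict, NamedTuple

class Column(NamedTuple):
    type: str
    nullable: bool

# Type widenings that a reader on the new schema can absorb without data loss.
SAFE_WIDENINGS = {("int", "long"), ("float", "double"), ("int", "double")}

def backward_compatible(old: Dict[str, Column], new: Dict[str, Column]) -> bool:
    """Can a reader on the NEW schema read data written under OLD?"""
    for name, col in new.items():
        if name not in old:
            if not col.nullable:
                return False  # new required column has no value in old files
        elif (old[name].type != col.type
              and (old[name].type, col.type) not in SAFE_WIDENINGS):
            return False  # incompatible type change
    return True

def forward_compatible(old: Dict[str, Column], new: Dict[str, Column]) -> bool:
    """Can a reader on the OLD schema read data written under NEW?"""
    for name, col in old.items():
        if name not in new:
            if not col.nullable:
                return False  # removed required column: old readers cannot fill it
        elif col.type != new[name].type:
            return False  # old readers expect the original physical type
    return True

old = {"user_id": Column("int", False), "duration": Column("double", False)}
add_col = dict(old, user_category_entropy=Column("double", True))
widen = {"user_id": Column("long", False), "duration": Column("double", False)}
rename = {"uid": Column("int", False), "duration": Column("double", False)}

print(backward_compatible(old, add_col))  # True: nullable add
print(backward_compatible(old, widen))    # True: int -> long
print(backward_compatible(old, rename))   # False: looks like a required add
```

Note that the rename fails both checks, which is exactly why the table classifies it as breaking.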
Schema Evolution in Iceberg
Apache Iceberg provides the most elegant schema evolution model among lakehouse formats:
-- Adding a new column (backward compatible)
ALTER TABLE streamrec.features.user_daily
ADD COLUMN user_category_entropy DOUBLE;
-- Old data files: column is read as NULL
-- New data files: column contains computed values
-- Renaming a column (Iceberg-specific: safe rename)
ALTER TABLE streamrec.features.user_daily
RENAME COLUMN user_top_category TO user_primary_category;
-- Iceberg tracks columns by ID, not name.
-- Old data files are read correctly without rewriting.
-- This is NOT supported in Delta Lake without rewriting data.
-- Widening a type (backward compatible)
ALTER TABLE streamrec.events.interactions
ALTER COLUMN user_id TYPE BIGINT; -- was INT
-- Reordering columns (no data rewrite)
ALTER TABLE streamrec.features.user_daily
ALTER COLUMN user_category_entropy AFTER user_7d_completion_rate;
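Why is the rename safe? Because Iceberg resolves columns in data files by field ID, not by name. The toy reader below is a simplification of that mechanism, with schemas and files modeled as plain dicts rather than Iceberg metadata: an old file written before the rename is still read correctly, because field ID 2 never changed.

```python
from typing import Dict, List

# A schema maps field ID -> current column name.
SCHEMA_V1 = {1: "user_id", 2: "user_top_category"}
SCHEMA_V2 = {1: "user_id", 2: "user_primary_category"}  # rename: same ID, new name

# A data file written under schema v1: values are keyed by field ID,
# mirroring how Iceberg tracks Parquet columns in table metadata.
old_file_rows: List[Dict[int, object]] = [
    {1: 42, 2: "documentary"},
    {1: 7, 2: "comedy"},
]

def read_with_schema(rows: List[Dict[int, object]],
                     schema: Dict[int, str]) -> List[Dict[str, object]]:
    """Project ID-keyed file rows into name-keyed rows via the CURRENT schema."""
    return [{schema[fid]: value for fid, value in row.items()} for row in rows]

# The same old file, read under both schema versions: no data rewrite needed.
print(read_with_schema(old_file_rows, SCHEMA_V1)[0])
# {'user_id': 42, 'user_top_category': 'documentary'}
print(read_with_schema(old_file_rows, SCHEMA_V2)[0])
# {'user_id': 42, 'user_primary_category': 'documentary'}
```

Formats that track columns by name must rewrite (or remap) every data file on rename, which is why the same operation is expensive elsewhere.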
Schema Evolution Strategy for ML Pipelines
from dataclasses import dataclass, field
from typing import List, Optional, Dict
from enum import Enum
from datetime import datetime
class EvolutionType(Enum):
"""Types of schema changes."""
ADD_COLUMN = "add_column"
DROP_COLUMN = "drop_column"
RENAME_COLUMN = "rename_column"
WIDEN_TYPE = "widen_type"
NARROW_TYPE = "narrow_type"
CHANGE_DEFAULT = "change_default"
@dataclass
class SchemaChange:
"""A single schema change in an evolution plan.
Attributes:
evolution_type: The type of schema change.
column_name: The column being changed.
new_name: New column name (for renames).
new_type: New data type (for type changes).
new_default: New default value (for default changes).
        is_backward_compatible: Whether readers on the new schema can
            still read data written under the old schema.
        is_forward_compatible: Whether readers on the old schema can
            read data written under the new schema.
requires_backfill: Whether historical data needs recomputation.
requires_model_retrain: Whether dependent models need retraining.
"""
evolution_type: EvolutionType
column_name: str
new_name: Optional[str] = None
new_type: Optional[str] = None
new_default: Optional[str] = None
is_backward_compatible: bool = True
is_forward_compatible: bool = True
requires_backfill: bool = False
requires_model_retrain: bool = False
@dataclass
class SchemaEvolutionPlan:
"""A plan for evolving a data schema safely.
Coordinates schema changes with dependent ML pipelines
to prevent silent breakage.
Attributes:
table_name: The table being evolved.
current_version: Current schema version.
target_version: Target schema version after evolution.
changes: List of schema changes to apply.
dependent_models: Model names that depend on this table.
dependent_pipelines: Pipeline names that read this table.
rollback_plan: Description of how to undo the changes.
"""
table_name: str
current_version: str
target_version: str
changes: List[SchemaChange]
dependent_models: List[str] = field(default_factory=list)
dependent_pipelines: List[str] = field(default_factory=list)
rollback_plan: str = ""
@property
def has_breaking_changes(self) -> bool:
"""Check if any change is not backward compatible."""
return any(
not c.is_backward_compatible for c in self.changes
)
@property
def requires_retrain(self) -> bool:
"""Check if any change requires model retraining."""
return any(c.requires_model_retrain for c in self.changes)
@property
def requires_backfill(self) -> bool:
"""Check if any change requires historical data backfill."""
return any(c.requires_backfill for c in self.changes)
    def migration_checklist(self) -> List[str]:
        """Generate a migration checklist for the schema evolution.

        Returns:
            Ordered list of steps to execute the migration safely.
        """
        steps: List[str] = []

        def add(step: str) -> None:
            # Number steps dynamically so conditional steps never leave gaps.
            steps.append(f"{len(steps) + 1}. {step}")

        # Pre-migration checks
        add(
            "Verify all dependent pipelines are paused or in "
            "maintenance mode."
        )
        add(f"Notify consumers: {', '.join(self.dependent_pipelines)}.")
        if self.has_breaking_changes:
            add(
                "[BREAKING] Create new table version; do NOT alter "
                "in place."
            )
            add(
                "[BREAKING] Run dual-write pipeline: write to both "
                "old and new tables."
            )
        else:
            add("Apply schema changes to existing table.")
        if self.requires_backfill:
            add("Backfill historical data for new/changed columns.")
        if self.requires_retrain:
            add(f"Retrain models: {', '.join(self.dependent_models)}.")
        add("Run contract validation against new schema.")
        add("Monitor feature distribution for 48 hours.")
        return steps
# Example: evolving the StreamRec user features table
evolution_plan = SchemaEvolutionPlan(
table_name="streamrec.features.user_daily",
current_version="3.1",
target_version="3.2",
changes=[
SchemaChange(
evolution_type=EvolutionType.ADD_COLUMN,
column_name="user_category_entropy",
new_type="float64",
new_default="NULL",
is_backward_compatible=True,
requires_backfill=True,
requires_model_retrain=False,
),
SchemaChange(
evolution_type=EvolutionType.RENAME_COLUMN,
column_name="user_top_category",
new_name="user_primary_category",
            is_backward_compatible=False,  # Consumers selecting the old name break
requires_model_retrain=True,
),
],
dependent_models=[
"streamrec-ranking-v3",
"streamrec-retrieval-v2",
],
dependent_pipelines=[
"daily-feature-pipeline",
"hourly-feature-refresh",
"training-data-generator",
],
rollback_plan=(
"Revert schema change via Iceberg table rollback "
"(rollback to snapshot before migration). "
"Re-materialize online store from pre-migration offline data."
),
)
print(f"Table: {evolution_plan.table_name}")
print(f"Version: {evolution_plan.current_version} → "
f"{evolution_plan.target_version}")
print(f"Breaking changes: {evolution_plan.has_breaking_changes}")
print(f"Requires retrain: {evolution_plan.requires_retrain}")
print(f"Requires backfill: {evolution_plan.requires_backfill}")
print("\nMigration checklist:")
for step in evolution_plan.migration_checklist():
print(f" {step}")
Table: streamrec.features.user_daily
Version: 3.1 → 3.2
Breaking changes: True
Requires retrain: True
Requires backfill: True
Migration checklist:
1. Verify all dependent pipelines are paused or in maintenance mode.
2. Notify consumers: daily-feature-pipeline, hourly-feature-refresh, training-data-generator.
3. [BREAKING] Create new table version; do NOT alter in place.
4. [BREAKING] Run dual-write pipeline: write to both old and new tables.
5. Backfill historical data for new/changed columns.
6. Retrain models: streamrec-ranking-v3, streamrec-retrieval-v2.
7. Run contract validation against new schema.
8. Monitor feature distribution for 48 hours.
Common Misconception: "Schema evolution is a database problem, not an ML problem." In traditional software, a schema change affects the application that reads the database. In ML, a schema change affects every model that was trained on data with the old schema, every feature pipeline that computes features from the changed table, and every monitoring pipeline that tracks feature distributions. The blast radius of a schema change in an ML system is much larger than in a traditional application.
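To see the ML-specific blast radius concretely, consider a model whose input vector is assembled by column name. This hypothetical snippet (feature names invented for illustration) shows how a rename breaks serving even though the underlying data is intact:

```python
# Feature names and their order, frozen at training time.
TRAINED_FEATURE_ORDER = [
    "user_7d_completion_rate",
    "user_top_category",  # renamed upstream AFTER the model was trained
]

def assemble_feature_vector(row: dict, feature_order: list) -> list:
    """Build the model input in the exact order used at training time."""
    return [row[name] for name in feature_order]

# A feature row served after the schema rename.
serving_row = {
    "user_7d_completion_rate": 0.62,
    "user_primary_category": "documentary",
}

try:
    assemble_feature_vector(serving_row, TRAINED_FEATURE_ORDER)
except KeyError as exc:
    print(f"Serving failed: missing feature {exc}")
# Serving failed: missing feature 'user_top_category'
```

The table still has every value it had before; only the name changed. Yet every model trained against the old name fails at lookup time, which is why renames require the coordinated migration checklist above rather than an in-place alter.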
25.12 Data Lineage and the Data Catalog
Data lineage tracks how data flows from source to destination — which raw events were used to compute which features, which features were used to train which models, which models serve which products. A data catalog provides a searchable inventory of all data assets, their schemas, owners, quality metrics, and lineage.
Why Lineage Matters for ML
- Debugging: when a model degrades, lineage traces the problem from prediction quality → feature values → feature pipeline → raw data → data source.
- Impact analysis: before changing a data source, lineage shows which downstream features, models, and products are affected.
- Regulatory compliance: credit scoring regulations (ECOA, FCRA) require demonstrating which data influenced each decision. Lineage provides this audit trail.
- Reproducibility: reconstructing a training dataset requires knowing which data sources, at which versions, with which transformations, produced the features.
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set
from datetime import datetime
@dataclass
class LineageNode:
"""A node in the data lineage graph.
Represents a data asset (table, feature, model, or endpoint)
and its relationships to upstream and downstream nodes.
Attributes:
node_id: Unique identifier.
node_type: Type of node (source, feature, model, endpoint).
name: Human-readable name.
owner: Team or individual responsible.
upstream: Set of node IDs that this node depends on.
downstream: Set of node IDs that depend on this node.
metadata: Additional metadata (schema version, quality score, etc.).
"""
node_id: str
node_type: str
name: str
owner: str = ""
upstream: Set[str] = field(default_factory=set)
downstream: Set[str] = field(default_factory=set)
metadata: Dict[str, str] = field(default_factory=dict)
@dataclass
class DataLineageGraph:
"""A lineage graph tracking data flow through the ML system.
Enables impact analysis, debugging, and regulatory compliance
by recording the full provenance of every data asset.
Attributes:
nodes: Dictionary mapping node_id to LineageNode.
"""
nodes: Dict[str, LineageNode] = field(default_factory=dict)
def add_node(self, node: LineageNode) -> None:
"""Add a node to the lineage graph."""
self.nodes[node.node_id] = node
def add_edge(self, upstream_id: str, downstream_id: str) -> None:
"""Add a directed edge from upstream to downstream.
Args:
upstream_id: The node that produces data.
downstream_id: The node that consumes data.
"""
if upstream_id in self.nodes:
self.nodes[upstream_id].downstream.add(downstream_id)
if downstream_id in self.nodes:
self.nodes[downstream_id].upstream.add(upstream_id)
    def impact_analysis(self, node_id: str) -> Set[str]:
        """Find all downstream nodes affected by a change to the given node.

        Performs a breadth-first traversal of the downstream graph.

        Args:
            node_id: The node to analyze.

        Returns:
            Set of all downstream node IDs (transitive).
        """
        affected: Set[str] = set()
        queue = [node_id]
        while queue:
            node = self.nodes.get(queue.pop(0))
            if node is None:
                continue
            for downstream_id in node.downstream:
                if downstream_id not in affected:
                    affected.add(downstream_id)
                    queue.append(downstream_id)
        return affected
def root_cause_trace(self, node_id: str) -> List[List[str]]:
"""Trace all paths from the given node back to source nodes.
Args:
node_id: The node to trace from.
Returns:
List of paths, each a list of node IDs from source to target.
"""
paths = []
def dfs(current: str, path: List[str]):
node = self.nodes.get(current)
if not node or not node.upstream:
paths.append(list(reversed(path)))
return
for parent in node.upstream:
dfs(parent, path + [parent])
dfs(node_id, [node_id])
return paths
# Build the StreamRec lineage graph
lineage = DataLineageGraph()
# Source nodes
lineage.add_node(LineageNode(
"kafka.interactions", "source", "Interaction Events",
owner="mobile-engineering"
))
lineage.add_node(LineageNode(
"postgres.item_metadata", "source", "Item Metadata",
owner="content-team"
))
# Feature nodes
lineage.add_node(LineageNode(
"feature.user_7d_completion_rate", "feature",
"User 7d Completion Rate", owner="ml-platform"
))
lineage.add_node(LineageNode(
"feature.user_session_clicks", "feature",
"User Session Click Count", owner="ml-platform"
))
lineage.add_node(LineageNode(
"feature.item_avg_completion", "feature",
"Item Avg Completion Rate", owner="ml-platform"
))
# Model nodes
lineage.add_node(LineageNode(
"model.ranking_v3", "model", "Ranking Model v3",
owner="ml-ranking"
))
# Endpoint nodes
lineage.add_node(LineageNode(
"endpoint.recommendations", "endpoint", "Recommendation API",
owner="backend"
))
# Edges
lineage.add_edge("kafka.interactions", "feature.user_7d_completion_rate")
lineage.add_edge("kafka.interactions", "feature.user_session_clicks")
lineage.add_edge("postgres.item_metadata", "feature.item_avg_completion")
lineage.add_edge("kafka.interactions", "feature.item_avg_completion")
lineage.add_edge("feature.user_7d_completion_rate", "model.ranking_v3")
lineage.add_edge("feature.user_session_clicks", "model.ranking_v3")
lineage.add_edge("feature.item_avg_completion", "model.ranking_v3")
lineage.add_edge("model.ranking_v3", "endpoint.recommendations")
# Impact analysis: what happens if Kafka interactions schema changes?
affected = lineage.impact_analysis("kafka.interactions")
print("Impact of changing 'kafka.interactions':")
for node_id in sorted(affected):
node = lineage.nodes[node_id]
print(f" [{node.node_type}] {node.name} ({node.owner})")
# Root cause trace: why might recommendations degrade?
print("\nRoot cause paths for 'endpoint.recommendations':")
paths = lineage.root_cause_trace("endpoint.recommendations")
for i, path in enumerate(paths):
names = [lineage.nodes[nid].name for nid in path]
print(f" Path {i + 1}: {' → '.join(names)}")
Impact of changing 'kafka.interactions':
[feature] Item Avg Completion Rate (ml-platform)
[feature] User 7d Completion Rate (ml-platform)
[feature] User Session Click Count (ml-platform)
[model] Ranking Model v3 (ml-ranking)
[endpoint] Recommendation API (backend)
Root cause paths for 'endpoint.recommendations':
Path 1: Interaction Events → User 7d Completion Rate → Ranking Model v3 → Recommendation API
Path 2: Interaction Events → User Session Click Count → Ranking Model v3 → Recommendation API
Path 3: Item Metadata → Item Avg Completion Rate → Ranking Model v3 → Recommendation API
Path 4: Interaction Events → Item Avg Completion Rate → Ranking Model v3 → Recommendation API
25.13 Data Versioning with DVC
While the feature store handles feature versioning and the lakehouse handles table versioning, the raw datasets used for model training also need version control. DVC (Data Version Control) extends Git to handle large data files and ML artifacts.
# Initialize DVC in the StreamRec project
cd /opt/streamrec-ml
dvc init
# Track a training dataset
dvc add data/training/2025-03-15/train.parquet
git add data/training/2025-03-15/train.parquet.dvc .gitignore
git commit -m "Track training dataset for March 15 run"
# Push data to remote storage (S3)
dvc remote add -d s3remote s3://streamrec-ml-data/dvc-store
dvc push
# Reproduce a specific training run: check out the data version
git checkout v3.1.0 # Model version tag
dvc checkout # Restore the exact data files for that version
# Compare datasets between model versions
dvc diff v3.0.0 v3.1.0
Modified:
data/training/2025-03-15/train.parquet
size: 4.2 GB -> 4.7 GB
hash: a8f3c1...e2b1d4 -> b7d2e5...f3a1c2
data/features/user_daily.parquet
size: 1.1 GB -> 1.3 GB
hash: c2d4e6...a1b3c5 -> d3e5f7...b2c4d6
Added:
data/features/user_category_entropy.parquet
size: 0.8 GB
hash: e4f6a8...c3d5e7
The key insight: git checkout v3.1.0 && dvc checkout restores the exact data that model v3.1.0 was trained on. Combined with the feature store's point-in-time joins and the lakehouse's time-travel, this provides end-to-end reproducibility from raw events to model predictions.
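The mechanism behind this is content addressing: the small pointer file that Git tracks identifies the large data file by a hash of its contents. The sketch below illustrates the idea only; it is not DVC's implementation (real `.dvc` files are YAML with more fields, and the `.ptr` name here is invented), but DVC does use MD5 content hashes by default.

```python
import hashlib
import json
import tempfile
from pathlib import Path

def content_hash(path: Path) -> str:
    """Hash a file's contents in chunks: the identity the pointer records."""
    h = hashlib.md5()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()

def write_pointer(data_path: Path) -> Path:
    """Write a tiny Git-trackable pointer file next to the large data file."""
    pointer = {
        "path": data_path.name,
        "md5": content_hash(data_path),
        "size": data_path.stat().st_size,
    }
    pointer_path = data_path.parent / (data_path.name + ".ptr")
    pointer_path.write_text(json.dumps(pointer, indent=2))
    return pointer_path

with tempfile.TemporaryDirectory() as d:
    data = Path(d) / "train.parquet"
    data.write_bytes(b"fake training data v1")
    print(write_pointer(data).read_text())
```

Because the pointer is a few bytes of text, Git versions it like any source file; checking out an old commit restores the old hash, and `dvc checkout` fetches the matching blob from the remote cache.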
25.14 ETL vs. ELT and the Modern Data Stack
Two paradigms for data transformation:
ETL (Extract, Transform, Load): data is transformed before loading into the target system. The traditional data warehousing approach. Transformations happen in a separate processing layer (Spark, custom scripts).
ELT (Extract, Load, Transform): data is loaded raw into the target system, then transformed in place. The modern cloud data warehouse approach. Transformations happen inside the warehouse using SQL (dbt, Dataform).
ETL:  Source → [Transform] → Warehouse
      (Spark, Airflow, custom code)

ELT:  Source → Warehouse → [Transform in SQL]
      (Fivetran/Airbyte → dbt → feature tables)
For ML feature engineering, the practical guidance:
| Consideration | ETL | ELT |
|---|---|---|
| Unstructured data | Required (warehouse cannot store raw images/text) | Not applicable |
| Complex transformations | Python/Spark for ML-specific logic | SQL for aggregations and joins |
| Iteration speed | Slow (deploy pipeline, run, check) | Fast (write SQL, run dbt, check) |
| Cost | Compute cluster (Spark) | Warehouse compute (pay per query) |
| Testability | Unit tests on transform functions | dbt tests on SQL models |
Production Reality: Most production ML systems use both. ELT (dbt on the warehouse) handles the structured feature engineering — aggregations, joins, window functions. ETL (Spark or Python scripts) handles the ML-specific transformations — embeddings, tokenization, feature crossing. The feature store sits downstream of both and provides a unified serving layer.
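The hand-off between the two can be sketched in a few lines. In this hypothetical example, rows as they might come out of a dbt model (the ELT side, simulated here as plain dicts) are passed to a Python step (the ETL side) for an ML-specific transform: hash-bucketing a category cross, which is one line in Python but awkward in SQL.

```python
import zlib
from typing import Dict, List

# Rows as a warehouse ELT model might emit them (simulated).
elt_rows: List[Dict[str, object]] = [
    {"user_id": 42, "user_primary_category": "documentary",
     "item_category": "documentary", "user_7d_completion_rate": 0.62},
    {"user_id": 7, "user_primary_category": "comedy",
     "item_category": "drama", "user_7d_completion_rate": 0.31},
]

def cross_category_feature(row: Dict[str, object]) -> Dict[str, object]:
    """ETL-side transform: hash the user x item category cross into a bucket.

    crc32 is used (rather than Python's built-in hash) so the bucket is
    deterministic across processes, which feature hashing requires.
    """
    cross = f"{row['user_primary_category']}_x_{row['item_category']}"
    return {**row, "category_cross_bucket": zlib.crc32(cross.encode()) % 1000}

features = [cross_category_feature(r) for r in elt_rows]
print(features[0]["category_cross_bucket"])
```

The output of this Python step is what gets registered in the feature store, downstream of both the warehouse and the script.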
25.15 Data Mesh: Decentralized Data Ownership
Data mesh (Dehghani, 2022) is an organizational architecture that treats data as a product, with decentralized ownership by domain teams rather than centralized ownership by a data engineering team. Its four principles:
- Domain ownership: the team that produces data owns it as a product, including quality, documentation, and SLAs.
- Data as a product: each data product has a discoverable interface, documented schema, quality guarantees, and an owner — just like a software product.
- Self-serve data platform: a shared platform provides the infrastructure (storage, compute, governance) so domain teams can publish data products without building infrastructure from scratch.
- Federated computational governance: global policies (privacy, security, interoperability) are enforced by the platform, while domain-specific decisions are made by domain teams.
For ML teams, data mesh means:
- Feature stores become data products that the ML platform team owns and publishes with contracts, SLAs, and documentation.
- Raw data assets are owned by domain teams (mobile engineering owns interaction events, content team owns item metadata) and published with contracts.
- The ML platform team is a consumer of domain data products and a producer of ML feature products.
This aligns precisely with the data contract pattern from Section 25.10: the contract is the interface between the domain data product and its ML consumer.
25.16 The StreamRec Feature Store: Complete Design (M10)
We now bring everything together in the M10 progressive project milestone: the complete feature store design for StreamRec.
Architecture Overview
┌─────────────────────────────────────────────────────────────────────┐
│ StreamRec Feature Platform │
│ │
│ ┌──────────────┐ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Mobile App │ │ Content CMS │ │ User Service │ │
│ │ (Kafka) │ │ (PostgreSQL CDC) │ │ (API) │ │
│ └──────┬───────┘ └────────┬─────────┘ └────────┬─────────┘ │
│ │ │ │ │
│ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ │
│ │ Kafka │ │ Kafka │ │ Kafka │ │
│ │ topic: │ │ topic: │ │ topic: │ │
│ │ events │ │ items │ │ users │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ┌────▼─────────────────────▼────────────────────────▼────┐ │
│ │ Flink Stream Processing │ │
│ │ - Session features (real-time) │ │
│ │ - Trending scores (real-time) │ │
│ │ - Event deduplication │ │
│ └────┬───────────────────────────────────┬───────────────┘ │
│ │ (streaming writes) │ (to data lake) │
│ ┌────▼────────┐ ┌───────▼───────────┐ │
│ │ Redis │ │ Delta Lake (S3) │ │
│ │ Online │ │ Offline Store │ │
│ │ Store │ │ + Raw Events │ │
│ └────┬────────┘ └───────┬───────────┘ │
│ │ │ │
│ │ ┌───────▼───────────┐ │
│ │ │ Spark Batch │ │
│ │ │ Pipeline (daily) │ │
│ │ │ - User features │ │
│ │ │ - Item features │ │
│ │ │ - Cross features │ │
│ │ └───────┬───────────┘ │
│ │ │ (materialize) │
│ │◄──────────────────────────────────┘ │
│ │ │
│ ┌────▼──────────────────────────────────┐ │
│ │ Feast Feature Server │ │
│ │ - Online serving (Redis backend) │ │
│ │ - Historical retrieval (Delta Lake) │ │
│ │ - Point-in-time joins │ │
│ └────┬──────────────────────────────────┘ │
│ │ │
│ ┌────▼────┐ ┌────────────┐ ┌──────────────┐ │
│ │ Ranking │ │ Training │ │ Monitoring │ │
│ │ Service │ │ Pipeline │ │ (feature │ │
│ │ (online)│ │ (offline) │ │ drift) │ │
│ └─────────┘ └────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Feature Table Summary
| Feature View | Entity | Features | Update Frequency | Online Store | TTL |
|---|---|---|---|---|---|
| user_batch_features | user_id | 7d/30d completion rates, session length, lifetime count, top category, subscription tier, account age | Daily (Spark) | Redis | 24h |
| user_stream_features | user_id | Session clicks, session duration, last 10 categories, session completion rate, minutes since last interaction | Real-time (Flink) | Redis | 2h |
| item_batch_features | item_id | Avg completion rate, category, content type, creator id, publication age, total impressions, trending score (batch) | Daily (Spark) | Redis | 24h |
| item_stream_features | item_id | Hourly impressions, hourly completions, real-time trending score | Real-time (Flink) | Redis | 1h |
| user_item_cross_features | user_id, item_id | Category affinity score, creator affinity score | Daily (Spark) | Not materialized (computed at serving time from user + item features) | N/A |
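The TTL column and the "not materialized" cross-feature row can be made concrete with a toy lookup path. The sketch below is a stand-in for the Redis-backed Feast lookup, not Feast's API; the class and feature names are invented. It shows the two behaviors the table implies: stream features that outlive their TTL are treated as missing, and the user-item cross feature is computed per request from the merged user features.

```python
import time
from typing import Dict, Optional, Tuple

class ToyOnlineStore:
    """A dict-backed stand-in for the Redis online store with per-view TTLs."""

    def __init__(self) -> None:
        # (view, entity_id) -> (write_timestamp, features)
        self._data: Dict[Tuple[str, str], Tuple[float, Dict]] = {}
        self.ttls = {"user_batch_features": 24 * 3600,
                     "user_stream_features": 2 * 3600}

    def write(self, view: str, entity_id: str, features: Dict) -> None:
        self._data[(view, entity_id)] = (time.time(), features)

    def read(self, view: str, entity_id: str) -> Optional[Dict]:
        entry = self._data.get((view, entity_id))
        if entry is None:
            return None
        written_at, features = entry
        if time.time() - written_at > self.ttls[view]:
            return None  # expired: treat as missing rather than serve stale data
        return features

def serve_features(store: ToyOnlineStore, user_id: str,
                   item_category: str) -> Dict:
    batch = store.read("user_batch_features", user_id) or {}
    stream = store.read("user_stream_features", user_id) or {}
    merged = {**batch, **stream}
    # Cross feature computed at request time, never materialized.
    affinity = merged.get("category_affinities", {}).get(item_category, 0.0)
    merged["user_item_category_affinity"] = affinity
    return merged

store = ToyOnlineStore()
store.write("user_batch_features", "u42",
            {"user_7d_completion_rate": 0.62,
             "category_affinities": {"documentary": 0.8}})
store.write("user_stream_features", "u42", {"session_clicks": 3})
print(serve_features(store, "u42", "documentary"))
```

Computing the cross at serving time trades a few microseconds of CPU per request for not storing a feature row per (user, item) pair, which would not fit in Redis at StreamRec's scale.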
Implementation Cost
| Component | Monthly Cost (Estimate) | Justification |
|---|---|---|
| Delta Lake on S3 (15 TB) | $450 | S3 Standard: $0.023/GB/month |
| Spark cluster (daily batch) | $3,200 | 4x r5.2xlarge, 6h/day |
| Flink cluster (streaming) | $2,800 | 3x c5.2xlarge, 24/7 |
| Redis cluster (online store) | $4,500 | 3x r6g.xlarge, 3 replicas |
| Feast server (API) | $600 | 2x c5.large, behind ALB |
| Total | $11,550 | |
This works out to roughly $139,000 per year, about 0.035% of StreamRec's $400M annual revenue, and a fraction of a fraction of the value the recommendation system generates.
Production ML = Software Engineering: The StreamRec feature store is not a research artifact — it is a distributed system with three storage backends (S3, Redis, Kafka), two compute engines (Spark, Flink), a serving layer (Feast), monitoring (feature drift detection), and a contract system. Building it requires software engineering skills — distributed systems, API design, monitoring, testing — not just data science skills. This is the reality of Theme 3: production ML is software engineering.
25.17 Chapter Summary
Data infrastructure is the plumbing of production ML. It is not glamorous, it does not appear in research papers, and it is the layer most likely to determine whether your model works in production.
The key ideas:
- Feature stores solve the online-offline consistency problem by providing a single source of truth for feature definitions. The online store serves features in milliseconds for inference; the offline store provides point-in-time correct features for training. The same computation pipeline populates both.
- Point-in-time joins prevent temporal data leakage in training data construction. They ensure that each training example uses only features that were available at the time the example was generated. Without them, training data contains the future, and the model learns a signal that does not exist in production.
- Lakehouses (Delta Lake, Iceberg, Hudi) combine the cheap storage of data lakes with the ACID transactions, schema evolution, and time-travel of data warehouses. For ML workloads that require both SQL analytics and direct file access, the lakehouse is the pragmatic choice.
- Data contracts formalize the agreement between data producers and consumers. They specify the schema, quality expectations, and SLAs that the ML platform depends on. Without contracts, upstream changes silently break downstream models.
- Schema evolution must be managed with the same discipline as API versioning. Backward-compatible changes (adding nullable columns) are safe; breaking changes (renames, type narrowing) require coordinated migration across all dependent models and pipelines.
- Data lineage enables impact analysis, debugging, and regulatory compliance by tracking the flow of data from source to model to prediction. When the model degrades, lineage traces the root cause. When a regulation requires auditability, lineage provides the trail.
The StreamRec feature store (M10) demonstrates all of these ideas in a concrete architecture: batch and streaming feature computation, Feast as the feature serving layer, Delta Lake as the offline store, Redis as the online store, and data contracts governing the interfaces between teams.
In Chapter 26, we turn to the compute infrastructure: how to train the models that consume these features at scale, using distributed training on multi-GPU clusters.