In This Chapter
- Learning Objectives
- 5.1 Why Computational Complexity Matters for Data Science
- 5.2 Asymptotic Analysis: Big-O and Its Companions
- 5.3 Space Complexity and Memory Hierarchies
- 5.4 Amortized Analysis and Algorithm Design Patterns
- 5.5 NP-Hardness: What Cannot Be Solved Efficiently
- 5.6 Approximation Algorithms and Probabilistic Guarantees
- 5.7 Approximate Nearest Neighbors at Scale: HNSW and IVF
- 5.8 The Computational Cost of Model Architectures
- 5.9 The Roofline Model: Understanding Hardware Utilization
- 5.10 Profiling and Bottleneck Analysis
- 5.11 Parallelism and Amdahl's Law
- 5.12 Streaming Algorithms for Unbounded Data
- 5.13 A Decision Framework: Estimating Cost Before Building
- 5.14 Progressive Project: Milestone M1
- 5.15 Looking Ahead: Complexity as a Design Language
Chapter 5: Computational Complexity and Scalability — Knowing What's Possible Before You Start Coding
"The purpose of computing is insight, not numbers." — Richard Hamming, Numerical Methods for Scientists and Engineers (1962)
Learning Objectives
By the end of this chapter, you will be able to:
- Analyze time and space complexity of common ML algorithms and data structures
- Identify computational bottlenecks in ML pipelines using profiling and benchmarking
- Apply algorithmic techniques (approximation, randomization, streaming) to make intractable problems tractable
- Evaluate the computational cost of training and inference for different model architectures
- Make informed architecture decisions based on computational constraints (latency budgets, memory limits, cost targets)
5.1 Why Computational Complexity Matters for Data Science
The most common failure mode of senior data scientists is not building a bad model — it is building a good model that is too slow, too expensive, or too large to deploy.
Consider two scenarios that motivate this chapter:
- StreamRec (Content Platform Recommender). When a user opens the StreamRec app, the system has approximately 100 milliseconds to select 20 content recommendations from a catalog of 200,000 items. A brute-force approach — computing the similarity between the user's embedding and every item's embedding — requires 200,000 inner products. At 256-dimensional embeddings, that is $200{,}000 \times 256 = 51.2$ million floating-point operations per request. At 10,000 requests per second, the system would need $5.12 \times 10^{11}$ FLOPs/s just for candidate retrieval — before any ranking, filtering, or business logic. This is feasible on a GPU cluster, but the per-request latency of the brute-force scan exceeds the 100ms budget at the 99th percentile. Approximate nearest neighbor (ANN) methods reduce the problem from $O(nd)$ to $O(d \log n)$, making real-time serving possible on commodity hardware.
- Meridian Financial (Credit Scoring). A credit application arrives via API. The regulatory and customer-experience requirement is a decision within 50 milliseconds. The production model is a gradient-boosted tree ensemble with 500 trees and 8 features. Inference is fast — but the team is considering replacing it with a transformer-based model that achieves 1.2% higher AUC on historical data. The transformer's inference latency is 340ms. Is 1.2% AUC improvement worth a 7x latency increase that violates the SLA? This chapter gives you the tools to answer that question before you build the model.
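Back-of-envelope arithmetic like this is worth scripting before any model code exists. A minimal sketch of the StreamRec estimate (the catalog size, embedding width, and request rate come from the scenario above; the variable names are illustrative):

```python
# Back-of-envelope FLOP estimate for brute-force candidate retrieval (StreamRec).
n_items = 200_000          # catalog size
dim = 256                  # embedding dimensionality
requests_per_sec = 10_000

flops_per_request = n_items * dim                     # one inner product per item
flops_per_sec = flops_per_request * requests_per_sec

print(f"FLOPs per request: {flops_per_request / 1e6:.1f}M")   # 51.2M
print(f"FLOPs per second:  {flops_per_sec:.2e}")              # 5.12e+11
```

Running this kind of estimate takes minutes; discovering the same number after the retrieval service is built takes weeks.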
These scenarios illustrate a principle that separates production data science from academic data science: computational cost is a first-class design constraint, not an afterthought. The algorithm that is theoretically optimal may be practically infeasible. The model that achieves the best accuracy may be too slow, too memory-hungry, or too expensive to serve. Understanding computational complexity means understanding what is possible at your scale — before you start coding.
Production Reality: In a 2023 survey of ML platform teams at large technology companies, "latency budget exceeded" and "infrastructure cost too high" were the top two reasons for blocking a model from production deployment. Model accuracy was the least common blocking reason. The production ML pipeline is a system, and systems have resource constraints. This chapter equips you to reason about those constraints quantitatively.
5.2 Asymptotic Analysis: Big-O and Its Companions
The Language of Growth Rates
Big-O notation describes how an algorithm's resource usage (time or space) scales as the input size grows. You have almost certainly encountered it before. Here, we formalize it and connect it to ML-specific usage.
Definition. A function $f(n)$ is $O(g(n))$ if there exist constants $c > 0$ and $n_0 > 0$ such that $f(n) \leq c \cdot g(n)$ for all $n \geq n_0$.
Intuitively, $f(n) = O(g(n))$ means $f$ grows no faster than $g$, ignoring constant factors. The notation captures scaling behavior: how the cost changes when you double the dataset or triple the feature count.
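To make the definition concrete: $f(n) = 3n^2 + 5n$ is $O(n^2)$ with witnesses $c = 4$ and $n_0 = 5$, since $3n^2 + 5n \leq 4n^2$ exactly when $5n \leq n^2$, i.e., $n \geq 5$. A quick numerical spot-check (the function name is illustrative):

```python
def f(n: int) -> int:
    return 3 * n**2 + 5 * n

c, n0 = 4, 5
# The definition requires f(n) <= c * n^2 for all n >= n0; spot-check a range.
assert all(f(n) <= c * n**2 for n in range(n0, 10_000))
# Below n0 the bound can fail, which is exactly why n0 exists:
assert f(2) > c * 2**2   # 22 > 16
```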
The complete family of asymptotic notations:
| Notation | Meaning | Analogy |
|---|---|---|
| $O(g(n))$ | Upper bound ("grows no faster than $g$") | $\leq$ |
| $\Omega(g(n))$ | Lower bound ("grows no slower than $g$") | $\geq$ |
| $\Theta(g(n))$ | Tight bound ("grows at the same rate as $g$") | $=$ |
| $o(g(n))$ | Strictly slower growth | $<$ |
| $\omega(g(n))$ | Strictly faster growth | $>$ |
In practice, ML papers and engineers almost exclusively use Big-O, and they use it loosely — often saying "$O(n^2)$" when they mean "$\Theta(n^2)$." We will follow this convention but note when the distinction matters.
Common Growth Rates in ML
```python
import numpy as np
from typing import Dict, List

def compute_operation_counts(n_values: List[int]) -> Dict[str, np.ndarray]:
    """Compute theoretical operation counts for common complexity classes.

    Args:
        n_values: Input sizes to evaluate.

    Returns:
        Dictionary mapping complexity class names to operation count arrays.
    """
    n = np.array(n_values, dtype=np.float64)
    return {
        "O(1)": np.ones_like(n),
        "O(log n)": np.log2(n),
        "O(n)": n,
        "O(n log n)": n * np.log2(n),
        "O(n^2)": n ** 2,
        "O(n^3)": n ** 3,
        "O(2^n)": 2.0 ** np.minimum(n, 30),  # Cap to avoid overflow
    }

n_values = [10, 100, 1_000, 10_000, 100_000, 1_000_000]
counts = compute_operation_counts(n_values)

print(f"{'n':>10s} {'O(log n)':>10s} {'O(n)':>12s} {'O(n log n)':>14s} "
      f"{'O(n^2)':>14s} {'O(n^3)':>16s}")
print("-" * 85)
for i, n in enumerate(n_values):
    print(f"{n:>10,d} {counts['O(log n)'][i]:>10.1f} {counts['O(n)'][i]:>12,.0f} "
          f"{counts['O(n log n)'][i]:>14,.0f} {counts['O(n^2)'][i]:>14,.0f} "
          f"{counts['O(n^3)'][i]:>16,.0f}")
```
```
         n   O(log n)         O(n)     O(n log n)         O(n^2)           O(n^3)
-------------------------------------------------------------------------------------
        10        3.3           10             33            100            1,000
       100        6.6          100            664         10,000        1,000,000
     1,000       10.0        1,000          9,966      1,000,000    1,000,000,000
    10,000       13.3       10,000        132,877    100,000,000  1,000,000,000,000
   100,000       16.6      100,000      1,660,964  10,000,000,000  1,000,000,000,000,000
 1,000,000       19.9    1,000,000     19,931,569  1,000,000,000,000  1,000,000,000,000,000,000
```
The table reveals the central lesson of complexity analysis: constant-factor improvements do not survive scaling. An $O(n^2)$ algorithm that is twice as fast as an $O(n \log n)$ algorithm at $n = 100$ will be roughly 1,700 times slower at $n = 1{,}000{,}000$. No amount of hardware acceleration can overcome a fundamental complexity gap at scale.
Mathematical Foundation: The formal hierarchy of common growth rates is:
$$O(1) \subset O(\log n) \subset O(\sqrt{n}) \subset O(n) \subset O(n \log n) \subset O(n^{1+\epsilon}) \subset O(n^2) \subset O(n^3) \subset O(2^n) \subset O(n!)$$
For ML, the critical transitions are: $O(n)$ to $O(n^2)$ (the boundary between "scales to production" and "does not scale to production" for most datasets), and $O(n^2)$ to $O(n^3)$ (the boundary between "feasible on a single machine" and "requires distributed computing or algorithmic innovation").
Complexity of Common ML Algorithms
The following table summarizes the computational complexity of algorithms you will encounter throughout this book:
| Algorithm | Training | Prediction | Space | Key Parameters |
|---|---|---|---|---|
| Linear regression (OLS) | $O(nd^2 + d^3)$ | $O(d)$ | $O(nd)$ | $n$: samples, $d$: features |
| Logistic regression (SGD) | $O(nde)$ | $O(d)$ | $O(d)$ | $e$: epochs |
| k-NN (brute force) | $O(1)$ | $O(nd)$ | $O(nd)$ | — |
| k-NN (KD-tree) | $O(n d \log n)$ | $O(d \log n)$ avg | $O(nd)$ | Degrades in high $d$ |
| Random forest | $O(nkd'\log n)$ | $O(kd'\log n)$ | $O(kn)$ | $k$: trees, $d' = \sqrt{d}$ |
| Gradient boosting | $O(nkd\log n)$ | $O(k \log n)$ | $O(kn)$ | — |
| k-means | $O(nKde)$ | $O(Kd)$ | $O(nd + Kd)$ | $K$: clusters, $e$: iterations |
| SVD (truncated, rank $r$) | $O(ndr)$ | — | $O(nr + dr)$ | $r$: target rank |
| MLP forward pass | $O(d_1 d_2 + d_2 d_3 + \cdots)$ | Same | $O(\sum d_i d_{i+1})$ | $d_i$: layer widths |
| Transformer (single layer) | $O(n^2 d)$ | $O(n^2 d)$ | $O(n^2 + nd)$ | $n$: sequence length |
Common Misconception: "k-NN has no training time, so it is efficient." This is misleading. k-NN defers computation to prediction time, where each query requires scanning the entire training set ($O(nd)$). For a system serving 10,000 predictions per second on $n = 10^6$ samples with $d = 256$, brute-force k-NN requires $2.56 \times 10^{12}$ operations per second — just for nearest neighbor lookups. The "no training" advantage is a mirage if the prediction workload is high.
5.3 Space Complexity and Memory Hierarchies
Why Memory Matters as Much as Compute
A common pitfall: analyzing only time complexity while ignoring space complexity. In production ML, memory constraints are frequently the binding bottleneck.
```python
import numpy as np
from typing import Dict

def estimate_memory_bytes(
    n_samples: int,
    n_features: int,
    dtype_bytes: int = 4  # float32
) -> Dict[str, float]:
    """Estimate memory usage for common ML data structures.

    Args:
        n_samples: Number of data points.
        n_features: Number of features per data point.
        dtype_bytes: Bytes per element (4 for float32, 8 for float64).

    Returns:
        Dictionary mapping structure names to memory in gigabytes.
    """
    gb = 1024 ** 3
    dense_matrix = n_samples * n_features * dtype_bytes / gb
    # Pairwise distance matrix: n x n
    distance_matrix = n_samples * n_samples * dtype_bytes / gb
    # Gram (kernel) matrix: n x n
    kernel_matrix = distance_matrix
    # User-item matrix for collaborative filtering (sparse storage)
    sparsity = 0.01  # 1% density is typical
    sparse_elements = n_samples * n_features * sparsity
    # COO format: 3 arrays (row, col, val) x sparse_elements
    sparse_coo = sparse_elements * (4 + 4 + dtype_bytes) / gb
    return {
        "Dense feature matrix": dense_matrix,
        "Pairwise distance matrix": distance_matrix,
        "Kernel matrix": kernel_matrix,
        "Sparse matrix (1% density, COO)": sparse_coo,
    }

def print_estimates(title: str, estimates: Dict[str, float]) -> None:
    """Pretty-print memory estimates with MB/GB/TB units."""
    print(title)
    print("=" * 55)
    for name, gb in estimates.items():
        if gb < 1:
            print(f"  {name}: {gb * 1024:.1f} MB")
        elif gb < 1024:
            print(f"  {name}: {gb:.1f} GB")
        else:
            print(f"  {name}: {gb / 1024:.1f} TB")
    print()

# StreamRec scale: 5M users, 200K items
print_estimates(
    "StreamRec User-Item Matrix Memory Estimates (float32)",
    estimate_memory_bytes(5_000_000, 200_000, dtype_bytes=4),
)

# Credit scoring scale: 10M historical records, 200 features
print_estimates(
    "Meridian Financial Feature Matrix Memory Estimates (float32)",
    estimate_memory_bytes(10_000_000, 200, dtype_bytes=4),
)
```
```
StreamRec User-Item Matrix Memory Estimates (float32)
=======================================================
  Dense feature matrix: 3.6 TB
  Pairwise distance matrix: 90.9 TB
  Kernel matrix: 90.9 TB
  Sparse matrix (1% density, COO): 111.8 GB

Meridian Financial Feature Matrix Memory Estimates (float32)
=======================================================
  Dense feature matrix: 7.5 GB
  Pairwise distance matrix: 363.8 TB
  Kernel matrix: 363.8 TB
  Sparse matrix (1% density, COO): 228.9 MB
```
The StreamRec user-item matrix, stored densely, requires 3.6 TB — impossible for a single machine. Even the sparse representation at 1% density needs 112 GB. The pairwise distance matrix would occupy over 90 TB. This is why the choice of data structure and algorithm is not a matter of elegance — it is a matter of feasibility.
The Memory Hierarchy
Modern hardware has a memory hierarchy with dramatically different access latencies:
| Level | Typical Size | Latency | Bandwidth |
|---|---|---|---|
| L1 cache | 32–64 KB | ~1 ns | ~1 TB/s |
| L2 cache | 256 KB–1 MB | ~4 ns | ~400 GB/s |
| L3 cache | 8–64 MB | ~12 ns | ~200 GB/s |
| Main memory (RAM) | 16–512 GB | ~100 ns | ~50 GB/s |
| GPU HBM | 16–80 GB | ~300 ns | ~2 TB/s |
| SSD | 256 GB–8 TB | ~100 $\mu$s | ~3 GB/s |
| Network (same datacenter) | Unlimited | ~500 $\mu$s | ~12.5 GB/s |
Implementation Note: The GPU HBM row is critical for deep learning. GPU memory bandwidth (~2 TB/s for an A100) is 40x higher than CPU main memory (~50 GB/s), but GPU memory capacity is limited (80 GB for an A100 vs. 512+ GB for a CPU server). This means GPU-accelerated workloads are often memory-capacity-bound, not compute-bound. When your model or batch does not fit in GPU memory, you face a cliff: performance drops by 10-100x as data must be shuffled between GPU and CPU memory.
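A rough feasibility check follows directly from this accounting. The sketch below estimates whether a training job fits in GPU memory, assuming float32 weights plus gradients plus Adam's two moment buffers (four parameter-sized arrays) and a caller-supplied activation estimate. The function name and the 80 GB default are illustrative, not any library's API:

```python
def fits_in_gpu(n_params: float, batch_activation_bytes: float,
                gpu_gb: float = 80.0) -> bool:
    """Crude training-memory check: weights + grads + Adam state + activations."""
    bytes_per_param = 4          # float32
    # weights + gradients + Adam first/second moments = 4 copies of the parameters
    state = 4 * n_params * bytes_per_param
    total = state + batch_activation_bytes
    return total <= gpu_gb * 1024**3

# A 7B-parameter model in float32 needs 4 * 7e9 * 4 B ~ 112 GB of optimizer
# state alone, so it cannot train on one 80 GB card without sharding or
# mixed precision.
print(fits_in_gpu(7e9, batch_activation_bytes=0))            # False
print(fits_in_gpu(1e9, batch_activation_bytes=8 * 1024**3))  # True
```

The same four-copies heuristic explains why mixed-precision training and optimizer-state sharding are the first levers teams reach for when a model stops fitting.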
Sparse Matrix Operations
When matrices have structure (sparsity, low rank, block structure), exploiting that structure can transform an infeasible computation into a trivial one.
```python
import numpy as np
from scipy import sparse
from typing import Tuple
import time

def benchmark_sparse_vs_dense(
    n_rows: int,
    n_cols: int,
    density: float,
    n_trials: int = 5
) -> Tuple[float, float, float, float]:
    """Benchmark sparse vs. dense matrix-vector product.

    Args:
        n_rows: Number of rows.
        n_cols: Number of columns.
        density: Fraction of non-zero entries.
        n_trials: Number of trials for timing.

    Returns:
        Tuple of (dense_time_ms, sparse_time_ms, dense_memory_mb, sparse_memory_mb).
    """
    rng = np.random.RandomState(42)
    # Dense matrix
    dense_matrix = np.zeros((n_rows, n_cols), dtype=np.float32)
    nnz = int(n_rows * n_cols * density)
    row_idx = rng.randint(0, n_rows, nnz)
    col_idx = rng.randint(0, n_cols, nnz)
    values = rng.randn(nnz).astype(np.float32)
    dense_matrix[row_idx, col_idx] = values
    # Sparse CSR matrix
    sparse_matrix = sparse.csr_matrix(dense_matrix)
    # Random vector
    x = rng.randn(n_cols).astype(np.float32)
    # Benchmark dense
    dense_times = []
    for _ in range(n_trials):
        start = time.perf_counter()
        _ = dense_matrix @ x
        dense_times.append((time.perf_counter() - start) * 1000)
    # Benchmark sparse
    sparse_times = []
    for _ in range(n_trials):
        start = time.perf_counter()
        _ = sparse_matrix @ x
        sparse_times.append((time.perf_counter() - start) * 1000)
    dense_memory = dense_matrix.nbytes / (1024 ** 2)
    sparse_memory = (sparse_matrix.data.nbytes +
                     sparse_matrix.indices.nbytes +
                     sparse_matrix.indptr.nbytes) / (1024 ** 2)
    return (
        np.median(dense_times),
        np.median(sparse_times),
        dense_memory,
        sparse_memory,
    )

# StreamRec-like scenario: 100K users x 50K items, 0.1% density
n, m, density = 100_000, 50_000, 0.001
dense_t, sparse_t, dense_mem, sparse_mem = benchmark_sparse_vs_dense(n, m, density)

print(f"Matrix: {n:,} x {m:,}, density: {density}")
print(f"Dense:  {dense_t:>8.1f} ms, {dense_mem:>10.1f} MB")
print(f"Sparse: {sparse_t:>8.1f} ms, {sparse_mem:>10.1f} MB")
print(f"Speedup: {dense_t / sparse_t:>7.1f}x")
print(f"Memory savings: {dense_mem / sparse_mem:>7.1f}x")
```
```
Matrix: 100,000 x 50,000, density: 0.001
Dense:      142.3 ms,    18626.5 MB
Sparse:       1.4 ms,       71.5 MB
Speedup:   101.6x
Memory savings:   260.4x
```
At 0.1% density — typical for user-item interaction matrices — sparse representation yields a 100x speedup and a 260x memory reduction. The speedup arises because the sparse matrix-vector product performs only $O(\text{nnz})$ operations instead of $O(nm)$, where $\text{nnz} \ll nm$.
Fundamentals > Frontier: Sparse linear algebra is not exotic or cutting-edge. It is 1960s technology. But it is the single most impactful optimization for recommendation systems, graph algorithms, and NLP pipelines. Before reaching for a GPU, ask whether your matrix is sparse. If it is, sparse operations on a CPU may outperform dense operations on a GPU.
5.4 Amortized Analysis and Algorithm Design Patterns
Why Worst-Case Analysis Misleads
Standard Big-O analysis describes the worst case for a single operation. But ML pipelines execute sequences of operations, and the average cost per operation may be much lower than the worst case.
Amortized analysis assigns a cost to each operation such that the total cost of any sequence of $m$ operations is bounded by $m$ times the amortized cost. Three classic examples relevant to ML:
Example 1: Dynamic arrays (Python lists, numpy array resizing). Appending to a Python list takes $O(1)$ amortized time, even though occasional resizes cost $O(n)$. The resize strategy (double capacity) ensures that the total cost of $n$ appends is $O(n)$, so each append costs $O(1)$ amortized.
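The doubling argument is easy to verify by counting writes. A minimal simulation (illustrative, not CPython's actual growth policy, which over-allocates by a smaller factor):

```python
def total_copy_cost(n_appends: int) -> int:
    """Count element writes (including resize copies) for n appends
    to a dynamic array that doubles its capacity when full."""
    capacity, size, cost = 1, 0, 0
    for _ in range(n_appends):
        if size == capacity:     # full: allocate 2x capacity and copy everything
            cost += size
            capacity *= 2
        size += 1
        cost += 1                # write the new element
    return cost

for n in [10, 1_000, 1_000_000]:
    print(n, total_copy_cost(n), total_copy_cost(n) / n)  # ratio stays below 3
```

The resize copies form a geometric series $1 + 2 + 4 + \cdots < 2n$, so the total work for $n$ appends is under $3n$, i.e., $O(1)$ amortized per append.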
Example 2: Hash tables (Python dictionaries, feature lookups). Dictionary lookups are $O(1)$ amortized. Occasional rehashing (when the load factor exceeds a threshold) costs $O(n)$, but it happens infrequently enough that the amortized cost per lookup remains $O(1)$.
Example 3: Mini-batch gradient descent. Each parameter update in SGD costs $O(bd)$ where $b$ is the batch size and $d$ is the number of parameters. An epoch ($n/b$ updates) costs $O(nd)$ total — the same as a single full-batch gradient computation. The amortized cost per sample-worth of gradient information is $O(d)$, regardless of batch size. But the wall-clock cost per epoch differs because of parallelism: larger batches exploit GPU parallelism better, so the amortized analysis of operation count does not capture the full picture.
Mathematical Foundation: Formally, a sequence of operations $o_1, o_2, \ldots, o_m$ has amortized cost $\hat{c}$ if for all $m$:
$$\sum_{i=1}^{m} c(o_i) \leq m \cdot \hat{c}$$
where $c(o_i)$ is the actual cost of operation $i$. The potential method defines a potential function $\Phi$ on the data structure state, and the amortized cost of operation $i$ is $\hat{c}_i = c(o_i) + \Phi(\text{after}_i) - \Phi(\text{before}_i)$. If $\Phi$ never decreases below its initial value, the sum of amortized costs bounds the sum of actual costs. This technique is essential for analyzing online learning algorithms and data structures used in streaming ML.
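As a worked instance of the potential method, apply it to the doubling dynamic array with $\Phi = 2\,\text{size} - \text{capacity}$:

```latex
% Potential method for the doubling dynamic array.
% State: (size, capacity); potential \Phi = 2 \cdot \text{size} - \text{capacity}.
%
% Cheap append (size < capacity): actual cost c = 1, size increases by 1, so
%   \Delta\Phi = 2 and \hat{c} = 1 + 2 = 3.
%
% Expensive append (size = capacity = n): copy n elements, then insert one.
%   Actual cost c = n + 1. Before: \Phi = 2n - n = n.
%   After: size = n + 1, capacity = 2n, so \Phi = 2(n+1) - 2n = 2.
%   \hat{c} = (n + 1) + (2 - n) = 3.
%
% Every append has amortized cost 3 = O(1), matching the counting argument.
```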
5.5 NP-Hardness: What Cannot Be Solved Efficiently
Complexity Classes for Data Scientists
Not all problems admit polynomial-time algorithms. Understanding which problems are inherently hard prevents you from wasting months trying to find an exact solution to an intractable problem.
The key complexity classes:
- P. Problems solvable in polynomial time. Most ML inference tasks: computing a forward pass, evaluating a loss function, running a prediction.
- NP. Problems whose solutions can be verified in polynomial time. If someone hands you a solution, you can check it efficiently.
- NP-hard. At least as hard as any problem in NP. No known polynomial-time algorithm exists.
- NP-complete. Both NP and NP-hard. The "hardest" problems in NP.
NP-Hard Problems in Machine Learning
Several problems central to ML are NP-hard:
| Problem | Complexity | Where It Appears |
|---|---|---|
| Feature subset selection | NP-hard ($2^d$ subsets) | Best subset regression, wrapper methods |
| Optimal decision tree | NP-hard | Minimizing tree size for given accuracy |
| k-means clustering (exact) | NP-hard | Finding the globally optimal clustering |
| Bayesian network structure learning | NP-hard | Learning the DAG from data |
| Hyperparameter optimization (general) | NP-hard | Grid over continuous spaces |
| Sparse recovery (exact $\ell_0$) | NP-hard | Exact cardinality-constrained optimization |
Common Misconception: "k-means always finds the optimal clustering." It does not. Finding the globally optimal k-means clustering is NP-hard. The standard algorithm (Lloyd's algorithm) finds a local minimum that depends on initialization. This is why we run k-means with multiple random initializations (n_init=10 in scikit-learn) and take the best result — a heuristic approach to a provably hard problem.
The practical response to NP-hardness is not despair — it is approximation. When the exact solution is intractable, we ask: can we find a solution that is provably close to optimal, in polynomial time?
5.6 Approximation Algorithms and Probabilistic Guarantees
The Approximation Paradigm
An approximation algorithm for an optimization problem returns a solution whose value is within a guaranteed factor of the optimal value, in polynomial time.
Definition. An algorithm is an $\alpha$-approximation for a minimization problem if it runs in polynomial time and produces a solution $S$ such that $\text{cost}(S) \leq \alpha \cdot \text{OPT}$, where $\text{OPT}$ is the cost of the optimal solution and $\alpha \geq 1$.
Locality-Sensitive Hashing (LSH)
The problem that motivates LSH is nearest neighbor search: given a query vector $q \in \mathbb{R}^d$ and a dataset $\mathcal{X} = \{x_1, \ldots, x_n\}$, find $\arg\min_{x \in \mathcal{X}} \|q - x\|$.
Exact nearest neighbor in high dimensions is essentially $O(nd)$ — no data structure achieves sub-linear worst-case query time when $d$ is large (the "curse of dimensionality"). LSH trades exactness for speed by allowing a small probability of missing the true nearest neighbor.
Key idea. Design a family of hash functions $\mathcal{H}$ such that:
- If $\|x - y\| \leq r$ (close points), then $\Pr_{h \sim \mathcal{H}}[h(x) = h(y)] \geq p_1$
- If $\|x - y\| \geq cr$ (far points), then $\Pr_{h \sim \mathcal{H}}[h(x) = h(y)] \leq p_2$
where $p_1 > p_2$. Close points hash to the same bucket with higher probability than far points.
For cosine similarity (the standard metric for embeddings), the classic LSH uses random hyperplane projections:
$$h_w(x) = \text{sign}(w^T x)$$
where $w$ is drawn from a standard multivariate normal. The collision probability is:
$$\Pr[h_w(x) = h_w(y)] = 1 - \frac{\theta(x, y)}{\pi}$$
where $\theta(x, y) = \arccos\left(\frac{x \cdot y}{\|x\| \|y\|}\right)$ is the angle between $x$ and $y$.
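This collision probability is easy to verify empirically. The sketch below draws many random hyperplanes and compares the observed collision rate for two unit vectors at a $60°$ angle against $1 - \theta/\pi = 2/3$ (the variable names are illustrative):

```python
import numpy as np

rng = np.random.RandomState(0)
dim, n_planes = 64, 200_000

# Two unit vectors at a known angle theta = pi/3 (60 degrees).
theta = np.pi / 3
x = np.zeros(dim); x[0] = 1.0
y = np.zeros(dim); y[0] = np.cos(theta); y[1] = np.sin(theta)

w = rng.randn(n_planes, dim)          # random hyperplane normals
collisions = np.mean(np.sign(w @ x) == np.sign(w @ y))

predicted = 1 - theta / np.pi         # = 2/3
print(f"empirical {collisions:.3f} vs predicted {predicted:.3f}")
```

With 200,000 hyperplanes the empirical rate lands within a few thousandths of the predicted $2/3$, which is the guarantee the index below exploits.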
```python
import time
import numpy as np
from typing import List, Optional, Tuple

class LSHIndex:
    """Locality-sensitive hashing for approximate cosine similarity search.

    Uses random hyperplane projections to hash vectors into binary codes.
    Vectors with high cosine similarity are more likely to share hash codes.

    Attributes:
        n_tables: Number of hash tables (more tables = higher recall, more memory).
        n_bits: Number of hash bits per table (more bits = fewer candidates,
            faster search).
        dim: Dimensionality of input vectors.
    """

    def __init__(self, dim: int, n_tables: int = 10, n_bits: int = 16, seed: int = 42):
        """Initialize the LSH index.

        Args:
            dim: Dimensionality of the input vectors.
            n_tables: Number of independent hash tables.
            n_bits: Number of hash bits per table.
            seed: Random seed for reproducibility.
        """
        self.dim = dim
        self.n_tables = n_tables
        self.n_bits = n_bits
        rng = np.random.RandomState(seed)
        # Random hyperplanes for each table
        self.hyperplanes: List[np.ndarray] = [
            rng.randn(n_bits, dim).astype(np.float32)
            for _ in range(n_tables)
        ]
        # Hash tables: table_idx -> hash_code -> list of vector indices
        self.tables: List[dict] = [{} for _ in range(n_tables)]
        self.vectors: Optional[np.ndarray] = None

    def _hash(self, vectors: np.ndarray, table_idx: int) -> np.ndarray:
        """Compute hash codes for a batch of vectors.

        Args:
            vectors: Array of shape (n, dim).
            table_idx: Which hash table's hyperplanes to use.

        Returns:
            Array of integer hash codes, shape (n,).
        """
        # Project onto hyperplanes: (n, dim) @ (dim, n_bits) -> (n, n_bits)
        projections = vectors @ self.hyperplanes[table_idx].T
        # Convert to binary: positive -> 1, negative -> 0
        bits = (projections > 0).astype(np.int32)
        # Pack bits into integers
        powers = 2 ** np.arange(self.n_bits)
        return bits @ powers

    def build(self, vectors: np.ndarray) -> None:
        """Build the LSH index from a dataset.

        Args:
            vectors: Array of shape (n, dim) to index.
        """
        self.vectors = vectors.astype(np.float32)
        n = vectors.shape[0]
        for t in range(self.n_tables):
            self.tables[t] = {}
            codes = self._hash(self.vectors, t)
            for i in range(n):
                code = int(codes[i])
                if code not in self.tables[t]:
                    self.tables[t][code] = []
                self.tables[t][code].append(i)

    def query(self, q: np.ndarray, k: int = 10) -> Tuple[np.ndarray, np.ndarray]:
        """Find approximate k-nearest neighbors by cosine similarity.

        Args:
            q: Query vector of shape (dim,).
            k: Number of neighbors to return.

        Returns:
            Tuple of (indices, similarities) for the top-k neighbors.
        """
        if self.vectors is None:
            raise RuntimeError("Index not built. Call build() first.")
        q = q.astype(np.float32).reshape(1, -1)
        # Collect candidate indices from all tables
        candidates = set()
        for t in range(self.n_tables):
            code = int(self._hash(q, t)[0])
            if code in self.tables[t]:
                candidates.update(self.tables[t][code])
        if not candidates:
            # Fallback: return random indices
            idx = np.random.choice(len(self.vectors), min(k, len(self.vectors)),
                                   replace=False)
            sims = np.array([
                np.dot(q[0], self.vectors[i]) /
                (np.linalg.norm(q[0]) * np.linalg.norm(self.vectors[i]) + 1e-10)
                for i in idx
            ])
            order = np.argsort(-sims)
            return idx[order], sims[order]
        # Score candidates by exact cosine similarity
        candidate_list = list(candidates)
        candidate_vectors = self.vectors[candidate_list]
        q_norm = q / (np.linalg.norm(q) + 1e-10)
        c_norms = candidate_vectors / (
            np.linalg.norm(candidate_vectors, axis=1, keepdims=True) + 1e-10
        )
        similarities = (q_norm @ c_norms.T).flatten()
        # Return top-k (guard the case k >= number of candidates, where
        # argpartition's kth index would be out of range)
        top_k = min(k, len(candidate_list))
        if top_k < len(candidate_list):
            top_indices = np.argpartition(-similarities, top_k)[:top_k]
        else:
            top_indices = np.arange(top_k)
        top_indices = top_indices[np.argsort(-similarities[top_indices])]
        return (
            np.array(candidate_list)[top_indices],
            similarities[top_indices],
        )

# Benchmark: LSH vs. brute force
np.random.seed(42)
n_items = 50_000
dim = 256
n_queries = 100
k = 10

# Generate random item embeddings (simulating StreamRec item catalog)
items = np.random.randn(n_items, dim).astype(np.float32)
items /= np.linalg.norm(items, axis=1, keepdims=True)
queries = np.random.randn(n_queries, dim).astype(np.float32)
queries /= np.linalg.norm(queries, axis=1, keepdims=True)

# Build LSH index
lsh = LSHIndex(dim=dim, n_tables=12, n_bits=18, seed=42)
build_start = time.perf_counter()
lsh.build(items)
build_time = (time.perf_counter() - build_start) * 1000

# Query: LSH
lsh_times = []
lsh_recalls = []
for i in range(n_queries):
    start = time.perf_counter()
    lsh_idx, lsh_sims = lsh.query(queries[i], k=k)
    lsh_times.append((time.perf_counter() - start) * 1000)
    # Ground truth: brute force
    true_sims = queries[i] @ items.T
    true_top_k = set(np.argpartition(-true_sims, k)[:k])
    recall = len(set(lsh_idx[:k]) & true_top_k) / k
    lsh_recalls.append(recall)

# Query: brute force
bf_times = []
for i in range(n_queries):
    start = time.perf_counter()
    sims = queries[i] @ items.T
    _ = np.argpartition(-sims, k)[:k]
    bf_times.append((time.perf_counter() - start) * 1000)

print(f"Dataset: {n_items:,} items, {dim} dimensions")
print(f"LSH config: {lsh.n_tables} tables, {lsh.n_bits} bits/table")
print(f"Build time: {build_time:.0f} ms")
print()
print(f"{'Metric':<25s} {'Brute Force':>15s} {'LSH':>15s}")
print("-" * 60)
print(f"{'Median query time (ms)':<25s} {np.median(bf_times):>15.2f} "
      f"{np.median(lsh_times):>15.2f}")
print(f"{'Mean recall@10':<25s} {'1.000':>15s} {np.mean(lsh_recalls):>15.3f}")
print(f"{'Candidates examined':<25s} {f'{n_items:,}':>15s} {'~200-1500':>15s}")
```
```
Dataset: 50,000 items, 256 dimensions
LSH config: 12 tables, 18 bits/table
Build time: 847 ms

Metric                        Brute Force             LSH
------------------------------------------------------------
Median query time (ms)               2.51            0.38
Mean recall@10                      1.000           0.823
Candidates examined                50,000       ~200-1500
```
The LSH index reduces query time by roughly 6x while maintaining 82% recall. In production, the tradeoff is tunable: more hash tables increase recall at the cost of memory, while more bits per table decrease candidate set size at the cost of recall.
Research Insight: The theoretical guarantee of LSH is captured by the $(r, cr, p_1, p_2)$-sensitive hash family framework (Indyk & Motwani, 1998). For cosine similarity with random hyperplane projections, the collision probability gap is $p_1 - p_2 = \frac{\theta_2 - \theta_1}{\pi}$, where $\theta_1$ and $\theta_2$ are the angles corresponding to distances $r$ and $cr$. The query complexity is $O(n^{1/c} \cdot d)$ — sub-linear in $n$ for any approximation factor $c > 1$. The exponent $\rho = \frac{\log(1/p_1)}{\log(1/p_2)}$ determines the space-query tradeoff.
5.7 Approximate Nearest Neighbors at Scale: HNSW and IVF
LSH provides theoretical guarantees but is not the dominant method in production systems. Modern ANN libraries — FAISS (Meta), ScaNN (Google), Annoy (Spotify), and HNSW (Hnswlib) — use graph-based or quantization-based approaches that achieve higher recall at lower query latency.
Hierarchical Navigable Small World (HNSW)
HNSW builds a multi-layer graph where each node is a data point and edges connect similar points. The search starts at the top layer (sparse, long-range connections) and descends to the bottom layer (dense, short-range connections), following a greedy path toward the query.
Construction complexity: $O(n \log n \cdot d)$ — each of the $n$ points is inserted into the graph with a logarithmic number of layers.
Query complexity: $O(\log n \cdot d)$ — the greedy descent through layers, with each layer requiring distance computations proportional to the node degree.
Space complexity: $O(nd + nM)$ — each point stores its $d$-dimensional vector plus edges to $M$ neighbors (typically $M = 16$–$64$).
Inverted File Index (IVF)
IVF partitions the vector space into Voronoi cells using k-means clustering. At query time, only the nearest $\text{nprobe}$ cells are searched.
Construction complexity: $O(n \cdot K \cdot d \cdot e_{\text{kmeans}})$ — dominated by k-means clustering of $n$ vectors into $K$ centroids.
Query complexity: $O(\text{nprobe} \cdot \frac{n}{K} \cdot d)$ — search $\text{nprobe}$ cells, each containing $\approx n/K$ vectors.
Space complexity: $O(nd + Kd)$ — the original vectors plus the centroid codebook.
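The two query terms trade off: more centroids shrink each cell but lengthen the centroid scan. Minimizing $Kd + \text{nprobe} \cdot (n/K) \cdot d$ over $K$ gives $K^* = \sqrt{\text{nprobe} \cdot n}$, which is why $K \approx \sqrt{n}$ is the common rule of thumb. A quick numeric check with illustrative sizes:

```python
import numpy as np

n, d, nprobe = 1_000_000, 256, 8

def ivf_query_ops(K: int) -> float:
    """Distance computations per IVF query: centroid scan + candidate scan."""
    return K * d + nprobe * (n / K) * d

Ks = [100, 500, 1_000, 2_828, 5_000, 10_000, 50_000]
for K in Ks:
    print(f"K={K:>6d}  ops={ivf_query_ops(K) / 1e6:.1f}M")
# The minimum lands near K* = sqrt(nprobe * n) = sqrt(8e6) ~ 2,828.
```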
Product Quantization (PQ)
Product quantization compresses vectors from $d \times 4$ bytes (float32) to $m$ bytes by splitting each vector into $m$ sub-vectors and quantizing each to its nearest centroid in a sub-codebook.
Compression ratio: $\frac{4d}{m}$. For $d = 256$ and $m = 32$, the compression is $32\times$.
Approximate distance: Instead of computing the exact distance between a query and a database vector, PQ computes the distance using the pre-computed distances between the query sub-vectors and the sub-codebook centroids. This reduces the per-vector computation from $O(d)$ to $O(m)$ lookups plus additions.
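A minimal product quantizer makes the compression arithmetic concrete. The sketch below uses random sub-codebooks rather than k-means-trained ones (an illustrative simplification that hurts reconstruction quality but not the memory accounting):

```python
import numpy as np

rng = np.random.RandomState(0)
n, d, m = 1_000, 256, 32             # m sub-vectors, each of dimension d // m
sub_dim = d // m
codebook_size = 256                  # 256 centroids -> one byte per code

X = rng.randn(n, d).astype(np.float32)
# One codebook per sub-space; in real PQ these come from k-means on training data.
codebooks = rng.randn(m, codebook_size, sub_dim).astype(np.float32)

def encode(X: np.ndarray) -> np.ndarray:
    """Map each vector to m uint8 codes (nearest sub-codebook centroid)."""
    codes = np.empty((X.shape[0], m), dtype=np.uint8)
    for j in range(m):
        sub = X[:, j * sub_dim:(j + 1) * sub_dim]                 # (n, sub_dim)
        dists = ((sub[:, None, :] - codebooks[j][None]) ** 2).sum(-1)
        codes[:, j] = dists.argmin(axis=1)
    return codes

codes = encode(X)
print(f"raw bytes/vector:     {X.itemsize * d}")        # 1024
print(f"encoded bytes/vector: {codes.shape[1]}")        # 32
print(f"compression ratio:    {X.itemsize * d // m}x")  # 32x
```

Decoding replaces each code with its centroid, so the stored index shrinks by the same 32x that the $\frac{4d}{m}$ formula predicts.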
FAISS in Practice
import numpy as np
import time
from typing import Tuple, Dict
def benchmark_ann_methods(
n_items: int,
dim: int,
n_queries: int,
k: int = 10
) -> Dict[str, Dict[str, float]]:
"""Benchmark brute force vs. simulated ANN methods.
This function simulates the performance characteristics of FAISS
index types. In production, use faiss.IndexFlatL2, faiss.IndexIVFFlat,
faiss.IndexIVFPQ, and faiss.IndexHNSWFlat directly.
Args:
n_items: Number of items in the database.
dim: Dimensionality of embeddings.
n_queries: Number of queries to benchmark.
k: Number of nearest neighbors to retrieve.
Returns:
Dictionary mapping method names to performance metrics.
"""
rng = np.random.RandomState(42)
items = rng.randn(n_items, dim).astype(np.float32)
items /= np.linalg.norm(items, axis=1, keepdims=True)
queries = rng.randn(n_queries, dim).astype(np.float32)
queries /= np.linalg.norm(queries, axis=1, keepdims=True)
results = {}
# Method 1: Brute force (exact)
bf_times = []
true_indices = []
for i in range(n_queries):
start = time.perf_counter()
sims = queries[i] @ items.T
top_k = np.argpartition(-sims, k)[:k]
top_k = top_k[np.argsort(-sims[top_k])]
bf_times.append((time.perf_counter() - start) * 1000)
true_indices.append(set(top_k))
results["Brute Force (Flat)"] = {
"median_query_ms": np.median(bf_times),
"recall@10": 1.0,
"memory_mb": items.nbytes / (1024 ** 2),
}
# Method 2: Simulated IVF (partition + search subset)
n_centroids = int(np.sqrt(n_items))
nprobe = max(1, n_centroids // 10)
# Simple k-means for partitioning (simplified)
centroid_indices = rng.choice(n_items, n_centroids, replace=False)
centroids = items[centroid_indices]
# Assign items to nearest centroid
assignments = np.argmax(items @ centroids.T, axis=1)
# Build inverted lists
inverted_lists = {}
for idx in range(n_items):
c = assignments[idx]
if c not in inverted_lists:
inverted_lists[c] = []
inverted_lists[c].append(idx)
ivf_times = []
ivf_recalls = []
for i in range(n_queries):
start = time.perf_counter()
# Find nearest centroids
centroid_sims = queries[i] @ centroids.T
probe_centroids = np.argpartition(-centroid_sims, nprobe)[:nprobe]
# Search within probed cells
candidates = []
for c in probe_centroids:
if c in inverted_lists:
candidates.extend(inverted_lists[c])
if candidates:
candidate_vecs = items[candidates]
sims = queries[i] @ candidate_vecs.T
            top_local = min(k, len(candidates))
            if top_local < len(candidates):
                top_idx = np.argpartition(-sims, top_local)[:top_local]
            else:
                # argpartition requires kth < len; keep all candidates instead
                top_idx = np.arange(len(candidates))
            top_idx = top_idx[np.argsort(-sims[top_idx])]
            result_indices = [candidates[j] for j in top_idx[:k]]
else:
result_indices = []
ivf_times.append((time.perf_counter() - start) * 1000)
recall = len(set(result_indices) & true_indices[i]) / k
ivf_recalls.append(recall)
results["IVF (nprobe=sqrt(K)/10)"] = {
"median_query_ms": np.median(ivf_times),
"recall@10": np.mean(ivf_recalls),
"memory_mb": (items.nbytes + centroids.nbytes) / (1024 ** 2),
}
return results
# Benchmark at StreamRec scale (reduced for demonstration)
ann_results = benchmark_ann_methods(
n_items=100_000, dim=256, n_queries=200, k=10
)
print(f"{'Method':<30s} {'Query (ms)':>12s} {'Recall@10':>10s} {'Memory (MB)':>12s}")
print("-" * 68)
for method, metrics in ann_results.items():
print(f"{method:<30s} {metrics['median_query_ms']:>12.2f} "
f"{metrics['recall@10']:>10.3f} {metrics['memory_mb']:>12.1f}")
Method Query (ms) Recall@10 Memory (MB)
--------------------------------------------------------------------
Brute Force (Flat) 4.82 1.000 97.7
IVF (nprobe=sqrt(K)/10) 0.67 0.741 97.9
Production Reality: In a real deployment, FAISS provides optimized implementations:
```python
import faiss

# Exact search (baseline)
index_flat = faiss.IndexFlatIP(dim)  # Inner product
index_flat.add(items)

# IVF with product quantization (production)
n_centroids = 4096
m_subquantizers = 32  # 32 bytes per vector (32x compression)
index = faiss.IndexIVFPQ(
    faiss.IndexFlatIP(dim), dim, n_centroids, m_subquantizers, 8
)
index.train(items)
index.add(items)
index.nprobe = 64  # Tune for recall-latency tradeoff

# Query
distances, indices = index.search(queries, k=10)
```
At StreamRec's scale (200K items, 256 dimensions), FAISS IVF-PQ achieves >95% recall@10 with <1ms query latency and 32x memory compression. The build time is dominated by k-means clustering of the centroids (~30 seconds).
5.8 The Computational Cost of Model Architectures
FLOPs: Counting Floating-Point Operations
A FLOP (floating-point operation) is a single arithmetic operation on floating-point numbers (addition, subtraction, multiplication, division). The FLOP count of a computation, divided by the processor's peak FLOP rate, gives a lower bound on its execution time.
FLOPs for common operations:
| Operation | FLOPs | Example |
|---|---|---|
| Vector dot product ($\mathbb{R}^d$) | $2d$ | Embedding similarity |
| Matrix-vector product ($\mathbb{R}^{m \times d} \times \mathbb{R}^d$) | $2md$ | Single-layer forward pass |
| Matrix multiply ($\mathbb{R}^{m \times k} \times \mathbb{R}^{k \times d}$) | $2mkd$ | Weight update, attention |
| Softmax ($\mathbb{R}^n$) | $5n$ | Attention weights, output layer |
| Layer normalization ($\mathbb{R}^d$) | $5d$ | Transformer sublayer |
Mathematical Foundation: The factor of 2 in matrix multiplication arises because each element of the output requires $k$ multiplications and $k-1$ additions — approximately $2k$ FLOPs. For a matrix multiply $C = AB$ where $A \in \mathbb{R}^{m \times k}$ and $B \in \mathbb{R}^{k \times d}$, the output $C$ has $md$ elements, each requiring $2k$ FLOPs, for a total of $2mkd$.
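These counts are mechanical enough to script. A minimal set of helpers (our own names, following the factor-of-2 multiply-add convention from the table):

```python
def dot_flops(d: int) -> int:
    """d multiplications + (d - 1) additions, conventionally rounded to 2d."""
    return 2 * d

def matvec_flops(m: int, d: int) -> int:
    """m output elements, each a length-d dot product."""
    return 2 * m * d

def matmul_flops(m: int, k: int, d: int) -> int:
    """m*d output elements, each a length-k dot product."""
    return 2 * m * k * d

# Example: one attention-score matmul QK^T with n = 2048 and d_head = 128
print(f"{matmul_flops(2048, 128, 2048) / 1e9:.2f} GFLOPs")  # 1.07 GFLOPs
```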
Transformer Complexity: The $O(n^2 d)$ Challenge
The transformer architecture (Chapter 10) dominates modern NLP and increasingly appears in recommender systems, computer vision, and scientific ML. Understanding its computational cost is essential for architecture decisions.
For a single transformer layer with sequence length $n$, model dimension $d$, and $h$ attention heads:
Self-attention:
- Compute $Q, K, V$ projections: $3 \times 2nd^2 = 6nd^2$ FLOPs
- Compute attention scores $QK^T$: $2n^2 d$ FLOPs
- Apply attention to values $\text{softmax}(QK^T / \sqrt{d_k}) V$: $2n^2 d$ FLOPs
- Output projection: $2nd^2$ FLOPs
Feed-forward network (with expansion factor 4):
- Up-projection: $2n \cdot d \cdot 4d = 8nd^2$ FLOPs
- Down-projection: $2n \cdot 4d \cdot d = 8nd^2$ FLOPs
Total per layer: $\approx 24nd^2 + 4n^2 d$ FLOPs
The critical insight is the crossover point: the attention term $4n^2 d$ dominates when $n > 6d$. For a model with $d = 1024$, this crossover occurs at sequence length $n \approx 6{,}144$. Below this, the FFN dominates; above it, attention dominates and the $O(n^2)$ scaling becomes the bottleneck.
import numpy as np
from typing import Tuple
def transformer_flops(
n_layers: int,
d_model: int,
n_heads: int,
seq_len: int,
vocab_size: int,
ff_ratio: int = 4
) -> dict:
"""Compute FLOPs for a transformer forward pass.
Args:
n_layers: Number of transformer layers.
d_model: Model dimension.
n_heads: Number of attention heads.
seq_len: Input sequence length.
vocab_size: Vocabulary size (for embedding lookup and output projection).
ff_ratio: Feed-forward expansion ratio (typically 4).
Returns:
Dictionary with FLOP counts for each component.
"""
d_head = d_model // n_heads
# Per layer
qkv_proj = 3 * 2 * seq_len * d_model * d_model # Q, K, V projections
attention_scores = 2 * n_heads * seq_len * seq_len * d_head # QK^T
attention_values = 2 * n_heads * seq_len * seq_len * d_head # softmax(.) @ V
output_proj = 2 * seq_len * d_model * d_model
ff_up = 2 * seq_len * d_model * (ff_ratio * d_model)
ff_down = 2 * seq_len * (ff_ratio * d_model) * d_model
attention_per_layer = qkv_proj + attention_scores + attention_values + output_proj
ffn_per_layer = ff_up + ff_down
total_per_layer = attention_per_layer + ffn_per_layer
    # Output projection; the embedding lookup itself is indexing, not
    # arithmetic — it contributes memory traffic but no FLOPs, so it is
    # excluded from the total.
    output_logits = 2 * seq_len * d_model * vocab_size
    total = n_layers * total_per_layer + output_logits
return {
"attention_per_layer": attention_per_layer,
"ffn_per_layer": ffn_per_layer,
"total_per_layer": total_per_layer,
"total_all_layers": n_layers * total_per_layer,
"output_projection": output_logits,
"total": total,
"attention_fraction": (n_layers * attention_per_layer) / total,
"n2_fraction": (n_layers * (attention_scores + attention_values)) / total,
}
# Compare architectures at different scales
architectures = {
"BERT-base": {"n_layers": 12, "d_model": 768, "n_heads": 12, "vocab_size": 30522},
"GPT-2 (117M)": {"n_layers": 12, "d_model": 768, "n_heads": 12, "vocab_size": 50257},
"GPT-2 (1.5B)": {"n_layers": 48, "d_model": 1600, "n_heads": 25, "vocab_size": 50257},
"LLaMA-7B": {"n_layers": 32, "d_model": 4096, "n_heads": 32, "vocab_size": 32000},
}
seq_len = 512
print(f"Transformer FLOPs at sequence length = {seq_len}")
print(f"{'Architecture':<18s} {'Total GFLOPs':>14s} {'Attn %':>8s} {'O(n^2) %':>10s}")
print("-" * 54)
for name, params in architectures.items():
flops = transformer_flops(seq_len=seq_len, **params)
gflops = flops["total"] / 1e9
print(f"{name:<18s} {gflops:>14.1f} {flops['attention_fraction']:>7.1%} "
f"{flops['n2_fraction']:>9.1%}")
print()
# Show how the O(n^2) fraction grows with sequence length
print(f"LLaMA-7B: O(n^2) fraction vs. sequence length")
print(f"{'Seq Length':>12s} {'Total TFLOPs':>14s} {'O(n^2) %':>10s}")
print("-" * 40)
for seq in [128, 512, 2048, 4096, 8192, 16384, 32768]:
flops = transformer_flops(seq_len=seq, **architectures["LLaMA-7B"])
tflops = flops["total"] / 1e12
print(f"{seq:>12,d} {tflops:>14.2f} {flops['n2_fraction']:>9.1%}")
Transformer FLOPs at sequence length = 512
Architecture         Total GFLOPs   Attn %   O(n^2) %
------------------------------------------------------
BERT-base                   120.6    32.0%       8.0%
GPT-2 (117M)                136.2    28.4%       7.1%
GPT-2 (1.5B)               1672.8    34.9%       4.8%
LLaMA-7B                   6868.7    34.0%       2.0%

LLaMA-7B: O(n^2) fraction vs. sequence length
  Seq Length   Total TFLOPs   O(n^2) %
----------------------------------------
         128           1.69       0.5%
         512           6.87       2.0%
       2,048          29.12       7.6%
       4,096          62.65      14.0%
       8,192         142.88      24.6%
      16,384         356.14      39.5%
      32,768         993.75      56.6%
At short sequences (512 tokens), the $O(n^2)$ attention component is a small fraction of total compute — about 2% for LLaMA-7B. The cost is dominated by the linear projections ($O(nd^2)$). But at 32,768 tokens, the quadratic attention consumes over half of total compute and continues to grow. This is why long-context inference requires techniques like KV-caching, sparse attention, or linear attention.
KV-Cache: Amortized Attention for Autoregressive Generation
During autoregressive generation (producing one token at a time), recomputing the full attention for all past tokens at each step would cost $O(t^2 d)$ per generated token, where $t$ is the current sequence position. Over $T$ generated tokens, the total cost would be $O(T^3 d)$ — cubic in the output length.
The KV-cache stores the key and value projections for all previously generated tokens. At step $t$, only the new query needs to be computed. The attention computation becomes:
$$\text{attn}_t = \text{softmax}\left(\frac{q_t K_{1:t}^T}{\sqrt{d_k}}\right) V_{1:t}$$
where $K_{1:t}$ and $V_{1:t}$ are retrieved from the cache. The per-step cost drops from $O(t^2 d)$ (recomputing all attention) to $O(td)$ (attending to all cached keys/values with a single query). Over $T$ steps, the total cost is $O(T^2 d)$ — quadratic, not cubic.
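A single-head NumPy sketch of one decode step makes the bookkeeping concrete (illustrative shapes and names; production implementations batch heads, layers, and requests):

```python
import numpy as np

def decode_step(q_t, k_t, v_t, K_cache, V_cache):
    """One autoregressive step with a KV-cache.

    Args:
        q_t, k_t, v_t: Projections for the new token, each shape (d_k,).
        K_cache, V_cache: Cached projections for prior tokens, shape (t-1, d_k).
    Returns:
        (attention output of shape (d_k,), updated K_cache, updated V_cache)
    """
    # Append the new token's key/value: the cache grows by one row per step
    K = np.vstack([K_cache, k_t[None, :]])
    V = np.vstack([V_cache, v_t[None, :]])
    d_k = q_t.shape[0]
    # O(t * d_k): one query attends to all t cached keys
    scores = K @ q_t / np.sqrt(d_k)
    weights = np.exp(scores - scores.max())
    weights /= weights.sum()
    return weights @ V, K, V

# Generate 4 tokens; the cache starts empty
rng = np.random.RandomState(0)
d_k = 8
K = np.zeros((0, d_k)); V = np.zeros((0, d_k))
for t in range(4):
    q, k, v = rng.randn(3, d_k)
    out, K, V = decode_step(q, k, v, K, V)
print(K.shape)  # (4, 8)
```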
Memory cost of KV-cache:
$$\text{KV-cache size} = 2 \times n_{\text{layers}} \times t \times d_{\text{model}} \times \text{sizeof(dtype)}$$
The factor of 2 accounts for both keys and values. For LLaMA-7B ($n_{\text{layers}} = 32$, $d_{\text{model}} = 4096$) at sequence position $t = 4096$ with float16:
$$2 \times 32 \times 4096 \times 4096 \times 2 \text{ bytes} = 2.1 \text{ GB}$$
This is a substantial fraction of GPU memory (80 GB for an A100), and it scales linearly with sequence length and batch size. For a batch of 32 requests at context length 4096, the KV-cache alone requires 67 GB.
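This arithmetic is worth scripting when planning batch sizes and context windows. A back-of-envelope helper (our own; dtype size in bytes, fp16 = 2):

```python
def kv_cache_bytes(n_layers: int, seq_len: int, d_model: int,
                   batch_size: int = 1, dtype_bytes: int = 2) -> int:
    """2 (keys and values) x layers x tokens x d_model x bytes x batch."""
    return 2 * n_layers * seq_len * d_model * dtype_bytes * batch_size

# LLaMA-7B (32 layers, d_model=4096) at context length 4096, float16
per_request = kv_cache_bytes(32, 4096, 4096)
print(f"{per_request / 1e9:.1f} GB per request")  # 2.1 GB
print(f"{kv_cache_bytes(32, 4096, 4096, batch_size=32) / 1e9:.1f} GB for a batch of 32")
```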
Production Reality: KV-cache management is one of the most critical components of LLM serving infrastructure. Systems like vLLM use paged attention — inspired by virtual memory in operating systems — to manage KV-cache memory efficiently. Instead of pre-allocating a contiguous block for each request's maximum possible sequence length, paged attention allocates memory in fixed-size blocks on demand, reducing memory waste from 60-80% to <4%. This is a case where systems engineering (Chapter 24) directly determines whether a model can serve real traffic.
5.9 The Roofline Model: Understanding Hardware Utilization
Arithmetic Intensity and the Compute-Memory Divide
The roofline model (Williams et al., 2009) provides a simple framework for understanding whether a computation is limited by processing speed (compute-bound) or by the rate at which data can be moved to the processor (memory-bound).
Two quantities define the model:
- Peak compute performance $\pi$ (FLOPs/second): the maximum number of floating-point operations the processor can execute per second.
- Peak memory bandwidth $\beta$ (bytes/second): the maximum rate at which data can be transferred between memory and the processor.
The arithmetic intensity $I$ of a computation is:
$$I = \frac{\text{FLOPs}}{\text{Bytes accessed}}$$
The roofline performance bound is:
$$\text{Attainable FLOPs/s} = \min(\pi, \; I \times \beta)$$
When $I < \pi / \beta$ (the "ridge point"), the computation is memory-bound: performance is limited by how fast data can be delivered. When $I > \pi / \beta$, it is compute-bound: performance is limited by the processor's arithmetic throughput.
import numpy as np
from typing import Tuple
def roofline_analysis(
flops: float,
bytes_accessed: float,
peak_compute_tflops: float,
peak_bandwidth_gb_s: float
) -> dict:
"""Analyze a computation against the roofline model.
Args:
flops: Total floating-point operations in the computation.
bytes_accessed: Total bytes read from and written to memory.
peak_compute_tflops: Hardware peak compute in TFLOP/s.
peak_bandwidth_gb_s: Hardware peak memory bandwidth in GB/s.
Returns:
Dictionary with roofline analysis results.
"""
peak_compute = peak_compute_tflops * 1e12 # Convert to FLOPs/s
peak_bandwidth = peak_bandwidth_gb_s * 1e9 # Convert to bytes/s
arithmetic_intensity = flops / bytes_accessed
ridge_point = peak_compute / peak_bandwidth
attainable = min(peak_compute, arithmetic_intensity * peak_bandwidth)
bound = "compute-bound" if arithmetic_intensity >= ridge_point else "memory-bound"
time_compute_bound = flops / peak_compute
time_memory_bound = bytes_accessed / peak_bandwidth
actual_time = max(time_compute_bound, time_memory_bound)
return {
"arithmetic_intensity": arithmetic_intensity,
"ridge_point": ridge_point,
"attainable_tflops": attainable / 1e12,
"bound": bound,
"estimated_time_ms": actual_time * 1000,
"compute_utilization": attainable / peak_compute,
}
# Hardware: NVIDIA A100 (fp16 tensor core)
A100_COMPUTE_TFLOPS = 312 # fp16 tensor core
A100_BANDWIDTH_GB_S = 2039 # HBM2e
# Analyze common ML operations
operations = {
"Matrix multiply (4096x4096 x 4096x4096, fp16)": {
"flops": 2 * 4096 * 4096 * 4096,
"bytes": 3 * 4096 * 4096 * 2, # Read A, B; write C
},
"Element-wise ReLU (batch=1024, dim=4096, fp16)": {
"flops": 1024 * 4096, # 1 comparison per element
"bytes": 2 * 1024 * 4096 * 2, # Read input, write output
},
"Softmax (batch=32, seq=2048, fp16)": {
"flops": 5 * 32 * 2048, # exp, sum, div, etc.
"bytes": 2 * 32 * 2048 * 2, # Read input, write output
},
"Batched matmul attention (32 heads, seq=2048, d_head=128, fp16)": {
"flops": 32 * 2 * 2048 * 2048 * 128,
"bytes": 32 * (2048 * 128 * 2 + 2048 * 128 * 2 + 2048 * 2048 * 2),
},
"Embedding lookup (batch=1024, vocab=32000, dim=4096, fp16)": {
"flops": 0, # No compute, pure memory
"bytes": 1024 * 4096 * 2 + 32000 * 4096 * 2, # Output + embedding table
},
}
print(f"Roofline Analysis: NVIDIA A100 ({A100_COMPUTE_TFLOPS} TFLOP/s fp16, "
f"{A100_BANDWIDTH_GB_S} GB/s HBM2e)")
print(f"Ridge point: {A100_COMPUTE_TFLOPS * 1e12 / (A100_BANDWIDTH_GB_S * 1e9):.0f} "
f"FLOPs/byte")
print()
print(f"{'Operation':<55s} {'AI (F/B)':>10s} {'Bound':>15s} {'Util':>7s}")
print("-" * 92)
for name, op in operations.items():
if op["flops"] == 0:
print(f"{name:<55s} {'0.0':>10s} {'memory-bound':>15s} {'0.0%':>7s}")
continue
result = roofline_analysis(
op["flops"], op["bytes"], A100_COMPUTE_TFLOPS, A100_BANDWIDTH_GB_S
)
print(f"{name:<55s} {result['arithmetic_intensity']:>10.1f} "
f"{result['bound']:>15s} {result['compute_utilization']:>6.1%}")
Roofline Analysis: NVIDIA A100 (312 TFLOP/s fp16, 2039 GB/s HBM2e)
Ridge point: 153 FLOPs/byte
Operation AI (F/B) Bound Util
--------------------------------------------------------------------------------------------
Matrix multiply (4096x4096 x 4096x4096, fp16) 1365.3 compute-bound 100.0%
Element-wise ReLU (batch=1024, dim=4096, fp16) 0.2 memory-bound 0.2%
Softmax (batch=32, seq=2048, fp16) 1.2 memory-bound 0.8%
Batched matmul attention (32 heads, seq=2048, ...)         113.8    memory-bound  74.4%
Embedding lookup (batch=1024, vocab=32000, dim=4096) 0.0 memory-bound 0.0%
The roofline analysis reveals a fundamental asymmetry in neural network computation:
- Large matrix multiplications (weight projections, FFN layers) are compute-bound and achieve near-peak GPU utilization. These are the operations that benefit from faster GPUs.
- Element-wise operations (ReLU, layer norm, softmax, dropout) are severely memory-bound — the GPU spends most of its time waiting for data to arrive from HBM, not computing. These operations benefit from kernel fusion (combining multiple element-wise ops into a single kernel to reduce memory traffic) but not from faster arithmetic units.
- Attention at moderate sequence lengths sits near the ridge point — it can be either compute-bound or memory-bound depending on the exact dimensions.
Advanced Sidebar: The roofline model explains why operator fusion is so important in deep learning compilers (XLA, TorchScript, Triton). Consider a sequence of operations: LayerNorm $\to$ Linear $\to$ GELU $\to$ Linear. Without fusion, each operation reads from and writes to HBM separately — four round trips through the memory bus. With fusion, the entire sequence can be executed in a single kernel, reading input once and writing output once. For memory-bound operations like LayerNorm and GELU, fusion can provide 2-4x speedups by reducing memory traffic. FlashAttention (Dao et al., 2022) applies this principle to attention, reducing HBM I/O from $O(n^2)$ to $O(n^2 d / M)$ where $M$ is the SRAM size, achieving 2-4x wall-clock speedup with exact (not approximate) attention.
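The traffic savings from fusion follow from the same bookkeeping. A rough sketch (our own estimate, assuming fp16 activations, that a fused kernel reads the input once and writes the output once, and that unfused element-wise kernels each round-trip through HBM):

```python
def memory_traffic_bytes(n_elements: int, n_ops: int, fused: bool,
                         dtype_bytes: int = 2) -> int:
    """HBM traffic for a chain of element-wise ops over n_elements values."""
    if fused:
        # One read of the input, one write of the output, for the whole chain
        return 2 * n_elements * dtype_bytes
    # Each op separately reads its input from HBM and writes its result back
    return 2 * n_elements * dtype_bytes * n_ops

# LayerNorm -> GELU over a (batch=32, seq=2048, d=4096) fp16 activation
n = 32 * 2048 * 4096
unfused = memory_traffic_bytes(n, n_ops=2, fused=False)
fused = memory_traffic_bytes(n, n_ops=2, fused=True)
print(f"unfused: {unfused / 1e9:.2f} GB, fused: {fused / 1e9:.2f} GB "
      f"({unfused / fused:.0f}x less traffic)")
# unfused: 2.15 GB, fused: 1.07 GB (2x less traffic)
```

For memory-bound chains, wall-clock time tracks this traffic almost directly, which is where the 2-4x fusion speedups quoted above come from.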
5.10 Profiling and Bottleneck Analysis
Why Profiling Beats Intuition
Amdahl's Law states that the speedup from optimizing one component of a system is limited by the fraction of total time that component consumes:
$$\text{Speedup} = \frac{1}{(1 - f) + f / s}$$
where $f$ is the fraction of execution time consumed by the optimized component, and $s$ is the speedup factor for that component.
Implication: If a component consumes 10% of execution time, making it infinitely fast yields only a $1 / 0.9 \approx 1.11\times$ overall speedup. Profiling identifies which components are worth optimizing — and which are not.
import numpy as np
import time
from typing import Dict, List, Tuple
class PipelineProfiler:
"""Simple profiler for ML pipeline stages.
Records execution time for named stages and computes
Amdahl's law-based speedup potential for each stage.
Attributes:
timings: Dictionary mapping stage names to lists of execution times.
"""
def __init__(self):
self.timings: Dict[str, List[float]] = {}
def time_stage(self, stage_name: str, func, *args, **kwargs):
"""Execute a function and record its execution time.
Args:
stage_name: Name for this pipeline stage.
func: Callable to execute.
*args: Positional arguments for func.
**kwargs: Keyword arguments for func.
Returns:
The return value of func.
"""
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
if stage_name not in self.timings:
self.timings[stage_name] = []
self.timings[stage_name].append(elapsed * 1000) # Store in ms
return result
def report(self) -> None:
"""Print a profiling report with Amdahl's law analysis."""
total = sum(np.median(times) for times in self.timings.values())
print(f"{'Stage':<30s} {'Median (ms)':>12s} {'% Total':>10s} "
f"{'Max Speedup':>12s}")
print("-" * 68)
for stage, times in sorted(
self.timings.items(),
key=lambda x: np.median(x[1]),
reverse=True
):
median = np.median(times)
fraction = median / total
# Amdahl: if this stage became instant, what is the max speedup?
max_speedup = 1 / (1 - fraction) if fraction < 1 else float("inf")
print(f"{stage:<30s} {median:>12.2f} {fraction:>9.1%} "
f"{max_speedup:>11.2f}x")
print(f"{'TOTAL':<30s} {total:>12.2f}")
# Simulate a recommendation pipeline
def generate_user_embedding(n_features: int) -> np.ndarray:
"""Simulate user embedding computation."""
return np.random.randn(n_features).astype(np.float32)
def candidate_retrieval_brute_force(
query: np.ndarray, items: np.ndarray, k: int
) -> np.ndarray:
"""Brute force nearest neighbor retrieval."""
scores = query @ items.T
return np.argpartition(-scores, k)[:k]
def feature_enrichment(candidates: np.ndarray, n_features: int) -> np.ndarray:
"""Simulate fetching additional features for candidates."""
# Simulate database lookup latency
time.sleep(0.001) # 1ms simulated DB latency
return np.random.randn(len(candidates), n_features).astype(np.float32)
def ranking_model(features: np.ndarray) -> np.ndarray:
"""Simulate a ranking model forward pass."""
# Simple two-layer MLP simulation
w1 = np.random.randn(features.shape[1], 64).astype(np.float32)
h = np.maximum(0, features @ w1)
w2 = np.random.randn(64, 1).astype(np.float32)
return (h @ w2).flatten()
def business_rules(scores: np.ndarray, n_final: int) -> np.ndarray:
"""Apply business rules (diversity, freshness, etc.)."""
return np.argsort(-scores)[:n_final]
# Profile the pipeline
profiler = PipelineProfiler()
n_items = 100_000
dim = 256
items = np.random.randn(n_items, dim).astype(np.float32)
items /= np.linalg.norm(items, axis=1, keepdims=True)
for trial in range(20):
user_emb = profiler.time_stage(
"1. User embedding", generate_user_embedding, dim
)
candidates = profiler.time_stage(
"2. Candidate retrieval", candidate_retrieval_brute_force,
user_emb, items, 500
)
features = profiler.time_stage(
"3. Feature enrichment", feature_enrichment, candidates, 128
)
scores = profiler.time_stage(
"4. Ranking model", ranking_model, features
)
final = profiler.time_stage(
"5. Business rules", business_rules, scores, 20
)
print("StreamRec Recommendation Pipeline Profile")
print("=" * 68)
profiler.report()
StreamRec Recommendation Pipeline Profile
====================================================================
Stage Median (ms) % Total Max Speedup
--------------------------------------------------------------------
2. Candidate retrieval 4.71 69.5% 3.28x
3. Feature enrichment 1.32 19.5% 1.24x
4. Ranking model 0.63 9.3% 1.10x
1. User embedding 0.08 1.2% 1.01x
5. Business rules 0.04 0.5% 1.01x
TOTAL 6.78
The profile reveals that candidate retrieval consumes 70% of total pipeline time. Optimizing any other stage — even making it infinitely fast — yields at most a 1.24x speedup. This is where ANN methods (Section 5.6-5.7) deliver their value: replacing brute-force retrieval with FAISS can reduce stage 2 from 4.7ms to <1ms, yielding a pipeline-level speedup of approximately 2.5x.
Implementation Note: For GPU-based workloads, use torch.cuda.Event for accurate timing (not time.perf_counter(), which does not account for asynchronous GPU execution). For system-level profiling, PyTorch's built-in profiler (torch.profiler) integrates with TensorBoard for visualization. For production services, instrument each pipeline stage with latency histograms (e.g., Prometheus) and track p50, p95, and p99 latencies separately — the p99 is often 5-10x the p50 due to garbage collection, cache misses, or tail-latency effects.
5.11 Parallelism and Amdahl's Law
Data Parallelism vs. Model Parallelism
When a single machine is insufficient, parallelism across multiple machines (or GPUs) is the standard approach. Two paradigms dominate:
Data parallelism. Replicate the model on each worker. Split the data across workers. Each worker computes gradients on its partition; gradients are aggregated (typically via AllReduce) and applied to all replicas. Scales with data volume. Efficiency is limited by communication overhead.
Model parallelism. Split the model across workers. Each worker holds a portion of the model's parameters. Computation flows between workers as data passes through the model. Required when the model does not fit in a single GPU's memory.
Amdahl's Law for Parallel Training
If a training pipeline has a serial fraction $s$ (data loading, gradient aggregation, logging) and a parallelizable fraction $1 - s$, the maximum speedup with $p$ processors is:
$$\text{Speedup}(p) = \frac{1}{s + \frac{1-s}{p}}$$
As $p \to \infty$, the speedup approaches $1/s$. If 5% of training is serial, the maximum speedup is $20\times$, regardless of the number of GPUs.
import numpy as np
def amdahl_speedup(serial_fraction: float, n_processors: int) -> float:
"""Compute Amdahl's law speedup.
Args:
serial_fraction: Fraction of work that is serial (0 to 1).
n_processors: Number of parallel processors.
Returns:
Theoretical speedup factor.
"""
return 1.0 / (serial_fraction + (1.0 - serial_fraction) / n_processors)
# Show diminishing returns
serial_fractions = [0.01, 0.05, 0.10, 0.20]
processor_counts = [1, 2, 4, 8, 16, 32, 64, 128, 256]
print(f"{'GPUs':>6s}", end="")
for s in serial_fractions:
print(f" s={s:.0%}", end=" ")
print()
print("-" * 55)
for p in processor_counts:
print(f"{p:>6d}", end="")
for s in serial_fractions:
speedup = amdahl_speedup(s, p)
efficiency = speedup / p
print(f" {speedup:>5.1f}x", end=" ")
print()
print()
print("Efficiency (speedup / #GPUs) drops sharply:")
print(f"{'GPUs':>6s}", end="")
for s in serial_fractions:
print(f" s={s:.0%}", end=" ")
print()
print("-" * 55)
for p in [8, 32, 128, 256]:
print(f"{p:>6d}", end="")
for s in serial_fractions:
efficiency = amdahl_speedup(s, p) / p * 100
print(f" {efficiency:>4.0f}%", end=" ")
print()
  GPUs    s=1%     s=5%    s=10%    s=20%
-------------------------------------------------------
     1    1.0x    1.0x    1.0x    1.0x
     2    2.0x    1.9x    1.8x    1.7x
     4    3.9x    3.5x    3.1x    2.5x
     8    7.5x    5.9x    4.7x    3.3x
    16   13.9x    9.1x    6.4x    4.0x
    32   24.4x   12.5x    7.8x    4.4x
    64   39.3x   15.4x    8.8x    4.7x
   128   56.4x   17.4x    9.3x    4.8x
   256   72.1x   18.6x    9.7x    4.9x
Efficiency (speedup / #GPUs) drops sharply:
  GPUs   s=1%   s=5%  s=10%  s=20%
-------------------------------------------------------
     8    93%    74%    59%    42%
    32    76%    39%    24%    14%
   128    44%    14%     7%     4%
   256    28%     7%     4%     2%
With 5% serial overhead, the speedup saturates quickly: scaling from 32 to 256 GPUs — an 8x increase in cost — improves throughput by less than 50%, and no number of GPUs can push past the $1/s = 20\times$ asymptotic limit. This is why hyper-scaling GPU clusters is not a substitute for algorithmic improvements.
Production Reality: The term "embarrassingly parallel" refers to workloads with zero serial fraction — where the data can be split across workers with no communication needed. In ML, inference is often embarrassingly parallel (each request is independent), but training is not (gradient aggregation creates a serial bottleneck). Batch prediction pipelines (scoring all users offline) scale nearly linearly with workers. Real-time training with synchronized SGD scales much less efficiently. This asymmetry is why many production systems separate the training cluster (few GPUs, optimized for throughput) from the serving cluster (many CPUs/GPUs, optimized for latency).
5.12 Streaming Algorithms for Unbounded Data
When Data Does Not Fit in Memory
Many production ML pipelines process data that is too large to fit in memory, or that arrives continuously as a stream. Streaming algorithms process each element in a single pass, using space sublinear in the input size.
Count-Min Sketch: Approximate Frequency Counting
The Count-Min Sketch (Cormode & Muthukrishnan, 2005) uses $d$ hash functions and a $d \times w$ array of counters to estimate the frequency of any element in a stream, using $O(d \times w)$ space regardless of the stream length.
import numpy as np
from typing import Hashable
class CountMinSketch:
"""Count-Min Sketch for approximate frequency estimation.
Provides frequency estimates with the guarantee:
true_count <= estimate <= true_count + n * epsilon
with probability >= 1 - delta, where:
epsilon = e / width
delta = e^(-depth)
Attributes:
width: Number of columns in the sketch.
depth: Number of hash functions (rows).
table: The counter array of shape (depth, width).
total: Total number of insertions.
"""
def __init__(self, width: int = 1000, depth: int = 5, seed: int = 42):
"""Initialize the sketch.
Args:
width: Number of columns. Determines additive error (e/width).
depth: Number of hash functions. Determines failure probability (e^-depth).
seed: Random seed for hash function generation.
"""
self.width = width
self.depth = depth
self.table = np.zeros((depth, width), dtype=np.int64)
self.total = 0
# Generate hash function parameters
rng = np.random.RandomState(seed)
self.a = rng.randint(1, 2**31 - 1, size=depth)
self.b = rng.randint(0, 2**31 - 1, size=depth)
self.prime = 2**31 - 1 # Mersenne prime
def _hash(self, item: Hashable, i: int) -> int:
"""Compute the i-th hash of an item.
Args:
item: The item to hash.
i: Which hash function to use.
Returns:
Hash value in [0, width).
"""
h = hash(item)
return ((self.a[i] * h + self.b[i]) % self.prime) % self.width
def add(self, item: Hashable, count: int = 1) -> None:
"""Add an item to the sketch.
Args:
item: The item to count.
count: Number of occurrences to add.
"""
self.total += count
for i in range(self.depth):
j = self._hash(item, i)
self.table[i, j] += count
def estimate(self, item: Hashable) -> int:
"""Estimate the frequency of an item.
Returns the minimum count across all hash functions,
which is always >= the true count (no false negatives).
Args:
item: The item to query.
Returns:
Estimated frequency (guaranteed >= true count).
"""
return min(self.table[i, self._hash(item, i)] for i in range(self.depth))
# Simulate StreamRec: tracking content view counts from a stream
np.random.seed(42)
# Zipfian distribution: few items are very popular, most are rare
n_items = 50_000
n_events = 5_000_000
# Generate item IDs following Zipf's law
probabilities = 1 / np.arange(1, n_items + 1) ** 1.2
probabilities /= probabilities.sum()
event_stream = np.random.choice(n_items, size=n_events, p=probabilities)
# Process with Count-Min Sketch
sketch = CountMinSketch(width=10_000, depth=7, seed=42)
true_counts: dict = {}
for item_id in event_stream:
item_id = int(item_id)
sketch.add(item_id)
true_counts[item_id] = true_counts.get(item_id, 0) + 1
# Evaluate accuracy for top-100 items and random items
top_items = sorted(true_counts, key=true_counts.get, reverse=True)[:100]
random_items = np.random.choice(list(true_counts.keys()), 100, replace=False)
top_errors = [
abs(sketch.estimate(item) - true_counts[item]) / true_counts[item]
for item in top_items
]
random_errors = [
abs(sketch.estimate(item) - true_counts[item]) / max(true_counts[item], 1)
for item in random_items
]
sketch_memory_kb = sketch.table.nbytes / 1024
exact_memory_kb = len(true_counts) * 16 / 1024 # Approximate: 8 bytes key + 8 bytes value
print(f"Stream: {n_events:,} events, {n_items:,} unique items")
print(f"Sketch: {sketch.width} x {sketch.depth} = {sketch.width * sketch.depth:,} counters")
print(f"Memory: {sketch_memory_kb:.1f} KB (sketch) vs. {exact_memory_kb:.1f} KB (exact)")
print(f"Compression: {exact_memory_kb / sketch_memory_kb:.1f}x")
print()
print(f"Relative error (top-100 items): median={np.median(top_errors):.4f}, "
f"max={np.max(top_errors):.4f}")
print(f"Relative error (random 100 items): median={np.median(random_errors):.4f}, "
f"max={np.max(random_errors):.4f}")
Stream: 5,000,000 events, 50,000 unique items
Sketch: 10000 x 7 = 70,000 counters
Memory: 546.9 KB (sketch) vs. 781.2 KB (exact)
Compression: 1.4x
Relative error (top-100 items): median=0.0012, max=0.0089
Relative error (random 100 items): median=0.0234, max=0.1847
The sketch achieves sub-1% relative error for high-frequency items (the ones that matter most for trending content and recommendation) using fixed memory regardless of stream length. The errors are larger for rare items because the additive error $\varepsilon n$ is a larger fraction of small true counts.
Mathematical Foundation: The Count-Min Sketch provides the following guarantee. For any item with true count $c$, the estimate $\hat{c}$ satisfies:
$$c \leq \hat{c} \leq c + \frac{n}{\text{width}}$$
with probability at least $1 - e^{-\text{depth}}$, where $n$ is the total stream length. Setting $\text{width} = \lceil e / \varepsilon \rceil$ and $\text{depth} = \lceil \ln(1/\delta) \rceil$ provides additive error at most $\varepsilon n$ with failure probability at most $\delta$. The space is $O(\frac{1}{\varepsilon} \ln \frac{1}{\delta})$ — independent of the stream length $n$ and the universe size.
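Inverting the guarantee gives a sizing rule: choose the width from the error target and the depth from the failure probability. A small helper (the standard formulas from the bound above):

```python
import math

def cms_dimensions(epsilon: float, delta: float) -> tuple:
    """Width and depth for additive error <= epsilon*n with prob >= 1 - delta."""
    width = math.ceil(math.e / epsilon)
    depth = math.ceil(math.log(1 / delta))
    return width, depth

# 0.1% additive error with a one-in-a-million failure probability
width, depth = cms_dimensions(epsilon=1e-3, delta=1e-6)
print(width, depth)  # 2719 14
print(f"{width * depth * 8 / 1024:.0f} KiB of int64 counters")  # 297 KiB
```

Note that the memory depends only on the accuracy targets, never on the stream length — the defining property of a sketch.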
5.13 A Decision Framework: Estimating Cost Before Building
Before selecting an algorithm or architecture, disciplined practitioners estimate the computational cost at the target scale. Here is a systematic framework:
Step 1: Define the Constraints
| Constraint | StreamRec Example | Meridian Financial Example |
|---|---|---|
| Latency budget | 100ms p99 per request | 50ms p99 per request |
| Throughput | 10,000 requests/second | 1,000 requests/second |
| Training time budget | 4 hours for daily retraining | 1 hour weekly |
| Memory per instance | 16 GB RAM, 24 GB GPU | 8 GB RAM (CPU only) |
| Cost target | $0.001 per request | $0.0005 per request |
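Once the constraints are tabulated, a useful first sanity check is to convert per-request work and throughput into the aggregate compute the fleet must sustain, as Section 5.1 did for StreamRec. A minimal sketch (the helper name is ours):

```python
def required_flops_per_s(flops_per_request: float, throughput_rps: float) -> float:
    """Aggregate sustained compute needed for one pipeline stage."""
    return flops_per_request * throughput_rps

# StreamRec brute-force retrieval from Section 5.1: 200,000 items x 256-dim embeddings
per_request = 200_000 * 256  # 51.2 MFLOPs per query
print(f"{required_flops_per_s(per_request, 10_000):.2e} FLOPs/s")  # 5.12e+11
```

If this number exceeds what the budgeted hardware can deliver, the constraint table has already ruled out the approach before any code is written.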
Step 2: Estimate Algorithm Cost
import numpy as np
from typing import NamedTuple
class CostEstimate(NamedTuple):
"""Estimated computational cost for an ML operation."""
flops: float
memory_bytes: float
estimated_latency_ms: float
feasible: bool
note: str
def estimate_retrieval_cost(
n_items: int,
dim: int,
method: str,
hardware_gflops: float = 100.0, # Single core, approximate
) -> CostEstimate:
"""Estimate cost of candidate retrieval for a single query.
Args:
n_items: Number of items in the catalog.
dim: Embedding dimensionality.
        method: "brute_force", "ivf", or "hnsw".
hardware_gflops: Approximate single-query compute throughput.
Returns:
CostEstimate with FLOPs, memory, latency, and feasibility.
"""
if method == "brute_force":
flops = 2 * n_items * dim # Inner products
memory = n_items * dim * 4 # float32
latency = flops / (hardware_gflops * 1e9) * 1000
return CostEstimate(
flops=flops,
memory_bytes=memory,
estimated_latency_ms=latency,
feasible=latency < 50,
note=f"Exact: scan all {n_items:,} items"
)
elif method == "ivf":
n_centroids = int(np.sqrt(n_items))
nprobe = max(1, n_centroids // 10)
items_per_cell = n_items / n_centroids
flops = (
2 * n_centroids * dim # Centroid search
+ 2 * nprobe * items_per_cell * dim # Scan probed cells
)
memory = (n_items * dim + n_centroids * dim) * 4
latency = flops / (hardware_gflops * 1e9) * 1000
return CostEstimate(
flops=flops,
memory_bytes=memory,
estimated_latency_ms=latency,
feasible=latency < 50,
note=f"IVF: {n_centroids} centroids, nprobe={nprobe}"
)
elif method == "hnsw":
# HNSW: ~O(log n * d * ef_search), ef_search typically 64-200
ef_search = 128
n_layers = max(1, int(np.log2(n_items)))
flops = n_layers * ef_search * 2 * dim
memory = n_items * dim * 4 + n_items * 64 * 4 # vectors + graph edges
latency = flops / (hardware_gflops * 1e9) * 1000
return CostEstimate(
flops=flops,
memory_bytes=memory,
estimated_latency_ms=latency,
feasible=latency < 50,
note=f"HNSW: {n_layers} layers, ef_search={ef_search}"
)
else:
raise ValueError(f"Unknown method: {method}")
# Estimate costs at StreamRec scale
import numpy as np
methods = ["brute_force", "ivf", "hnsw"]
for n_items in [50_000, 200_000, 1_000_000, 10_000_000]:
print(f"\nCatalog size: {n_items:>12,} items, dim=256")
print(f"{'Method':<18s} {'GFLOPs':>10s} {'Memory':>10s} {'Latency':>10s} {'OK?':>5s}")
print("-" * 58)
for method in methods:
est = estimate_retrieval_cost(n_items, 256, method)
gflops = est.flops / 1e9
memory_mb = est.memory_bytes / (1024 ** 2)
status = "YES" if est.feasible else "NO"
print(f"{method:<18s} {gflops:>10.3f} {memory_mb:>8.0f} MB "
f"{est.estimated_latency_ms:>8.2f}ms {status:>5s}")
Catalog size:       50,000 items, dim=256
Method                 GFLOPs     Memory    Latency   OK?
----------------------------------------------------------
brute_force             0.026       49 MB     0.26ms   YES
ivf                     0.003       49 MB     0.03ms   YES
hnsw                    0.001       61 MB     0.01ms   YES
Catalog size:      200,000 items, dim=256
Method                 GFLOPs     Memory    Latency   OK?
----------------------------------------------------------
brute_force             0.102      195 MB     1.02ms   YES
ivf                     0.010      196 MB     0.10ms   YES
hnsw                    0.001      244 MB     0.01ms   YES
Catalog size:    1,000,000 items, dim=256
Method                 GFLOPs     Memory    Latency   OK?
----------------------------------------------------------
brute_force             0.512      977 MB     5.12ms   YES
ivf                     0.052      978 MB     0.52ms   YES
hnsw                    0.001     1221 MB     0.01ms   YES
Catalog size:   10,000,000 items, dim=256
Method                 GFLOPs     Memory    Latency   OK?
----------------------------------------------------------
brute_force             5.120     9766 MB    51.20ms    NO
ivf                     0.513     9769 MB     5.13ms   YES
hnsw                    0.002    12207 MB     0.02ms   YES
The framework reveals the scaling boundaries. Brute-force retrieval is feasible for catalogs up to roughly 1M items (at 256 dimensions, on a single core). Beyond that, approximate methods are not optional — they are necessary. At 10M items, brute-force exceeds the 50ms budget, while IVF and HNSW remain well under budget.
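The brute-force boundary can also be found in closed form: the largest catalog whose full scan still fits the latency budget. A back-of-envelope helper (ours) under the same assumptions as `estimate_retrieval_cost` (100 GFLOP/s per query, $2nd$ FLOPs per scan):

```python
def max_bruteforce_items(budget_ms: float, dim: int, hardware_gflops: float = 100.0) -> int:
    """Largest catalog a brute-force scan can cover within the latency budget."""
    flops_budget = budget_ms / 1000 * hardware_gflops * 1e9
    return int(flops_budget / (2 * dim))

print(f"{max_bruteforce_items(50, 256):,}")  # 9,765,625 items at a 50 ms budget
```

The boundary sits just under 10M items at these settings, which is why the 10M row is the first to fail.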
Step 3: The Decision Matrix
Before choosing an algorithm, fill in this matrix:
| Criterion | Option A | Option B | Option C |
|---|---|---|---|
| Algorithm | | | |
| Training complexity | | | |
| Inference complexity | | | |
| Memory footprint | | | |
| Accuracy/quality | | | |
| Latency at target scale | | | |
| Estimated infrastructure cost | | | |
| Implementation complexity | | | |
| Maintenance burden | | | |
Simplest Model That Works: The decision framework embodies this principle: start with the simplest approach (often brute force or a linear model), estimate its cost at your production scale, and add complexity only when the simple approach demonstrably fails to meet a constraint. The burden of proof is on the more complex approach — it must justify its additional complexity by solving a problem that the simpler approach cannot.
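As a minimal illustration of the burden-of-proof rule (the function and its ANN latency figure are ours for illustration; the brute-force numbers come from Step 2):

```python
def choose(simple_latency_ms: float, complex_latency_ms: float, budget_ms: float) -> str:
    """Prefer the simple option; escalate only when it misses the budget."""
    if simple_latency_ms <= budget_ms:
        return "simple"
    if complex_latency_ms <= budget_ms:
        return "complex"
    return "neither"

# 1M items: brute force fits the 50 ms budget (5.12 ms), so keep it.
print(choose(5.12, 1.0, 50.0))  # simple
# 10M items: brute force misses (51.2 ms), so an ANN index earns its complexity.
print(choose(51.2, 1.0, 50.0))  # complex
```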
5.14 Progressive Project: Milestone M1
Objective
Profile the naive matrix factorization from Chapter 1, implement ANN-based candidate retrieval using FAISS, and benchmark latency.
Task 1: Profile Naive Matrix Factorization
import numpy as np
import time
from typing import List, Tuple
def matrix_factorization_sgd(
interactions: np.ndarray,
n_users: int,
n_items: int,
n_factors: int = 64,
n_epochs: int = 10,
lr: float = 0.01,
reg: float = 0.01,
seed: int = 42
) -> Tuple[np.ndarray, np.ndarray, List[float]]:
"""Train a matrix factorization model via SGD.
Factorizes the user-item interaction matrix R ≈ U @ V^T.
Args:
interactions: Array of (user_id, item_id, rating) triples.
n_users: Total number of users.
n_items: Total number of items.
n_factors: Number of latent factors.
n_epochs: Number of training epochs.
lr: Learning rate.
reg: L2 regularization strength.
seed: Random seed.
Returns:
Tuple of (user_factors, item_factors, epoch_losses).
"""
rng = np.random.RandomState(seed)
U = rng.normal(0, 0.1, (n_users, n_factors)).astype(np.float32)
V = rng.normal(0, 0.1, (n_items, n_factors)).astype(np.float32)
losses = []
for epoch in range(n_epochs):
# Shuffle interactions
perm = rng.permutation(len(interactions))
total_loss = 0.0
for idx in perm:
u, i, r = int(interactions[idx, 0]), int(interactions[idx, 1]), interactions[idx, 2]
pred = U[u] @ V[i]
error = r - pred
            # SGD update: cache the user vector so V's update uses the pre-update value
            U_u = U[u].copy()
            U[u] += lr * (error * V[i] - reg * U_u)
            V[i] += lr * (error * U_u - reg * V[i])
total_loss += error ** 2 + reg * (np.sum(U[u] ** 2) + np.sum(V[i] ** 2))
avg_loss = total_loss / len(interactions)
losses.append(avg_loss)
return U, V, losses
# Generate synthetic StreamRec data
np.random.seed(42)
n_users = 10_000
n_items = 5_000
n_interactions = 200_000
n_factors = 64
users = np.random.randint(0, n_users, n_interactions)
items = np.random.randint(0, n_items, n_interactions)
ratings = np.clip(np.random.normal(3.5, 1.0, n_interactions), 1, 5)
interactions = np.column_stack([users, items, ratings]).astype(np.float32)
# Profile training
print("Profiling Matrix Factorization SGD")
print("=" * 50)
print(f"Users: {n_users:,}, Items: {n_items:,}, Interactions: {n_interactions:,}")
print(f"Factors: {n_factors}, Epochs: 5")
print()
start = time.perf_counter()
U, V, losses = matrix_factorization_sgd(
interactions, n_users, n_items, n_factors=n_factors, n_epochs=5
)
train_time = time.perf_counter() - start
print(f"Training time: {train_time:.1f}s ({train_time / 5:.1f}s per epoch)")
print(f"Operations per epoch: {n_interactions:,} updates x {n_factors} dims = "
f"{n_interactions * n_factors:,} FLOPs (approx)")
print(f"Final loss: {losses[-1]:.4f}")
print()
# Profile brute-force retrieval
print("Profiling Brute-Force Candidate Retrieval")
print("=" * 50)
# Normalize for cosine similarity
U_norm = U / (np.linalg.norm(U, axis=1, keepdims=True) + 1e-10)
V_norm = V / (np.linalg.norm(V, axis=1, keepdims=True) + 1e-10)
retrieval_times = []
for _ in range(100):
query_user = np.random.randint(n_users)
start = time.perf_counter()
scores = U_norm[query_user] @ V_norm.T
top_k = np.argpartition(-scores, 20)[:20]
retrieval_times.append((time.perf_counter() - start) * 1000)
print(f"Brute-force retrieval ({n_items:,} items, {n_factors} dims):")
print(f" Median: {np.median(retrieval_times):.3f} ms")
print(f" P95: {np.percentile(retrieval_times, 95):.3f} ms")
print(f" P99: {np.percentile(retrieval_times, 99):.3f} ms")
Profiling Matrix Factorization SGD
==================================================
Users: 10,000, Items: 5,000, Interactions: 200,000
Factors: 64, Epochs: 5
Training time: 18.7s (3.7s per epoch)
Operations per epoch: 200,000 updates x 64 dims = 12,800,000 FLOPs (approx)
Final loss: 0.9812
Profiling Brute-Force Candidate Retrieval
==================================================
Brute-force retrieval (5,000 items, 64 dims):
Median: 0.018 ms
P95: 0.024 ms
P99: 0.031 ms
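A quick sanity check on these timings, using the approximate FLOP count printed above (the true constant per update is a small multiple of `n_factors`, so treat this as an order-of-magnitude estimate): the pure-Python loop achieves a tiny fraction of the hardware's throughput, which means the bottleneck is interpreter overhead, not arithmetic.

```python
flops_per_epoch = 200_000 * 64  # approximate useful work per epoch, as printed above
seconds_per_epoch = 3.7         # measured above
achieved = flops_per_epoch / seconds_per_epoch
print(f"{achieved / 1e6:.1f} MFLOP/s")  # ~3.5 MFLOP/s, vs ~100 GFLOP/s peak for one core
```

A gap of four orders of magnitude is the signature of an interpreter-bound loop; vectorizing the updates (e.g., mini-batch SGD or ALS expressed as NumPy matrix operations) is the standard remedy.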
Task 2: Implement ANN Retrieval with FAISS
# In production, use:
#
# import faiss
#
# # Build HNSW index
# index = faiss.IndexHNSWFlat(n_factors, 32) # 32 neighbors per node
# index.hnsw.efSearch = 64
# index.add(V_norm)
#
# # Query
# distances, indices = index.search(U_norm[query_user:query_user+1], k=20)
#
# For this milestone, we use our LSH implementation from Section 5.6:
lsh_index = LSHIndex(dim=n_factors, n_tables=10, n_bits=12, seed=42)
lsh_index.build(V_norm)
lsh_times = []
recalls = []
for _ in range(100):
query_user = np.random.randint(n_users)
start = time.perf_counter()
lsh_idx, lsh_sims = lsh_index.query(U_norm[query_user], k=20)
lsh_times.append((time.perf_counter() - start) * 1000)
# Ground truth
true_scores = U_norm[query_user] @ V_norm.T
true_top = set(np.argpartition(-true_scores, 20)[:20])
recalls.append(len(set(lsh_idx) & true_top) / 20)
print("ANN Retrieval (LSH, 10 tables x 12 bits)")
print("=" * 50)
print(f" Median latency: {np.median(lsh_times):.3f} ms")
print(f" Mean recall@20: {np.mean(recalls):.3f}")
print(f" P95 latency: {np.percentile(lsh_times, 95):.3f} ms")
ANN Retrieval (LSH, 10 tables x 12 bits)
==================================================
Median latency: 0.092 ms
Mean recall@20: 0.715
P95 latency: 0.158 ms
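One caveat worth making explicit (a back-of-envelope extrapolation, not a measurement at other scales): at this small catalog the brute-force scan (0.018 ms median, from Task 1) actually beats LSH (0.092 ms), because the ANN index has fixed per-query bookkeeping. Extrapolating the brute-force cost linearly in catalog size suggests where the crossover sits at these settings:

```python
brute_ms_per_item = 0.018 / 5_000  # median brute-force latency per item, from Task 1
lsh_ms = 0.092                     # LSH median latency, roughly flat in catalog size
print(f"{lsh_ms / brute_ms_per_item:,.0f} items")  # ~25,556: beyond this, LSH wins
```

This is the same lesson as Section 5.13 at smaller scale: approximate methods earn their complexity only past a measurable catalog size.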
Task 3: Latency Budget Analysis
# How many candidates can we score within a 100ms latency budget?
def latency_budget_analysis(
total_budget_ms: float,
retrieval_ms: float,
feature_enrichment_ms: float,
network_overhead_ms: float,
per_candidate_scoring_ms: float,
) -> int:
"""Determine maximum scorable candidates within a latency budget.
Args:
total_budget_ms: Total end-to-end latency budget.
retrieval_ms: Time for candidate retrieval.
feature_enrichment_ms: Time for feature lookup.
network_overhead_ms: Network and serialization overhead.
per_candidate_scoring_ms: Time to score one candidate in the ranker.
Returns:
Maximum number of candidates that can be scored.
"""
available_for_scoring = (
total_budget_ms - retrieval_ms - feature_enrichment_ms - network_overhead_ms
)
if available_for_scoring <= 0:
return 0
return int(available_for_scoring / per_candidate_scoring_ms)
# StreamRec budget analysis
candidates = latency_budget_analysis(
total_budget_ms=100,
retrieval_ms=0.1, # FAISS HNSW
feature_enrichment_ms=5.0, # Redis feature store lookup
network_overhead_ms=10.0, # gRPC + serialization
per_candidate_scoring_ms=0.05, # MLP ranker per candidate
)
print(f"Latency budget: 100ms")
print(f"Available for scoring: {100 - 0.1 - 5.0 - 10.0:.1f} ms")
print(f"Max candidates to score: {candidates}")
print(f"Recommendation: Retrieve {candidates} candidates via ANN, "
f"rank with MLP, return top 20")
Latency budget: 100ms
Available for scoring: 84.9 ms
Max candidates to score: 1698
Recommendation: Retrieve 1698 candidates via ANN, rank with MLP, return top 20
Within a 100ms budget, StreamRec can retrieve and score approximately 1,700 candidates — enough for a high-quality ranking model to select the final 20 recommendations. This retrieval-then-rank architecture (Chapter 24) is the standard pattern for production recommendation systems.
5.15 Looking Ahead: Complexity as a Design Language
This chapter has equipped you with the quantitative tools to reason about computational feasibility before writing a single line of model code. The key capabilities:
- Estimate the time and space complexity of any algorithm at your target scale.
- Profile an existing pipeline to identify the actual bottleneck (which is rarely where you expect it).
- Apply approximation techniques (LSH, HNSW, IVF, streaming sketches) when exact solutions are infeasible.
- Analyze the computational structure of neural architectures (transformer $O(n^2 d)$, KV-cache, roofline) to predict hardware utilization.
- Decide whether a more complex algorithm justifies its additional computational cost.
In Chapter 6, you will build a neural network from scratch — and the complexity analysis from this chapter will tell you exactly how the cost scales with the number of layers, neurons, and training examples. In Chapter 10, when you encounter the transformer architecture, the $O(n^2 d)$ attention analysis from Section 5.8 will explain why long-context models require architectural innovation, not just more hardware. In Part V (Production ML Systems), the profiling, Amdahl's law, and cost estimation techniques from this chapter will be your primary tools for system design.
The discipline is simple: before choosing an algorithm, estimate its cost at your scale. If the cost exceeds your budget, no amount of engineering can save you. If the cost is within budget, engineering determines whether you meet it.
Fundamentals > Frontier: The algorithms in this chapter — hash tables, sorting, approximate nearest neighbors, streaming sketches — are not new. LSH was published in 1998. Count-Min Sketch in 2003. HNSW in 2016. These are mature, well-understood techniques with tight theoretical guarantees. They are also the backbone of every production recommendation system, search engine, and real-time analytics pipeline you have ever used. Understanding them is more valuable than understanding the latest attention variant, because they solve problems that every production system faces and that no neural architecture can wish away.