Chapter 5: Computational Complexity and Scalability — Knowing What's Possible Before You Start Coding

"The purpose of computing is insight, not numbers." — Richard Hamming, Numerical Methods for Scientists and Engineers (1962)


Learning Objectives

By the end of this chapter, you will be able to:

  1. Analyze time and space complexity of common ML algorithms and data structures
  2. Identify computational bottlenecks in ML pipelines using profiling and benchmarking
  3. Apply algorithmic techniques (approximation, randomization, streaming) to make intractable problems tractable
  4. Evaluate the computational cost of training and inference for different model architectures
  5. Make informed architecture decisions based on computational constraints (latency budgets, memory limits, cost targets)

5.1 Why Computational Complexity Matters for Data Science

The most common failure mode of senior data scientists is not building a bad model — it is building a good model that is too slow, too expensive, or too large to deploy.

Consider two scenarios that motivate this chapter:

  • StreamRec (Content Platform Recommender). When a user opens the StreamRec app, the system has approximately 100 milliseconds to select 20 content recommendations from a catalog of 200,000 items. A brute-force approach — computing the similarity between the user's embedding and every item's embedding — requires 200,000 inner products. At 256-dimensional embeddings, that is $200{,}000 \times 256 = 51.2$ million floating-point operations per request. At 10,000 requests per second, the system would need $5.12 \times 10^{11}$ FLOPs/s just for candidate retrieval — before any ranking, filtering, or business logic. This is feasible on a GPU cluster, but the per-request latency of the brute-force scan exceeds the 100ms budget at the 99th percentile. Approximate nearest neighbor (ANN) methods reduce the problem from $O(nd)$ to $O(d \log n)$, making real-time serving possible on commodity hardware.

  • Meridian Financial (Credit Scoring). A credit application arrives via API. The regulatory and customer-experience requirement is a decision within 50 milliseconds. The production model is a gradient-boosted tree ensemble with 500 trees and 8 features. Inference is fast — but the team is considering replacing it with a transformer-based model that achieves 1.2% higher AUC on historical data. The transformer's inference latency is 340ms. Is 1.2% AUC improvement worth a 7x latency increase that violates the SLA? This chapter gives you the tools to answer that question before you build the model.

These scenarios illustrate a principle that separates production data science from academic data science: computational cost is a first-class design constraint, not an afterthought. The algorithm that is theoretically optimal may be practically infeasible. The model that achieves the best accuracy may be too slow, too memory-hungry, or too expensive to serve. Understanding computational complexity means understanding what is possible at your scale — before you start coding.

Production Reality: In a 2023 survey of ML platform teams at large technology companies, "latency budget exceeded" and "infrastructure cost too high" were the top two reasons for blocking a model from production deployment. Model accuracy was the least common blocking reason. The production ML pipeline is a system, and systems have resource constraints. This chapter equips you to reason about those constraints quantitatively.


5.2 Asymptotic Analysis: Big-O and Its Companions

The Language of Growth Rates

Big-O notation describes how an algorithm's resource usage (time or space) scales as the input size grows. You have almost certainly encountered it before. Here, we formalize it and connect it to ML-specific usage.

Definition. A function $f(n)$ is $O(g(n))$ if there exist constants $c > 0$ and $n_0 > 0$ such that $f(n) \leq c \cdot g(n)$ for all $n \geq n_0$.

Intuitively, $f(n) = O(g(n))$ means $f$ grows no faster than $g$, ignoring constant factors. The notation captures scaling behavior: how the cost changes when you double the dataset or triple the feature count.

The complete family of asymptotic notations:

| Notation | Meaning | Analogy |
|---|---|---|
| $O(g(n))$ | Upper bound ("grows no faster than $g$") | $\leq$ |
| $\Omega(g(n))$ | Lower bound ("grows no slower than $g$") | $\geq$ |
| $\Theta(g(n))$ | Tight bound ("grows at the same rate as $g$") | $=$ |
| $o(g(n))$ | Strictly slower growth | $<$ |
| $\omega(g(n))$ | Strictly faster growth | $>$ |

In practice, ML papers and engineers almost exclusively use Big-O, and they use it loosely — often saying "$O(n^2)$" when they mean "$\Theta(n^2)$." We will follow this convention but note when the distinction matters.

Common Growth Rates in ML

import numpy as np
from typing import Dict, List

def compute_operation_counts(n_values: List[int]) -> Dict[str, np.ndarray]:
    """Compute theoretical operation counts for common complexity classes.

    Args:
        n_values: Input sizes to evaluate.

    Returns:
        Dictionary mapping complexity class names to operation count arrays.
    """
    n = np.array(n_values, dtype=np.float64)
    return {
        "O(1)":         np.ones_like(n),
        "O(log n)":     np.log2(n),
        "O(n)":         n,
        "O(n log n)":   n * np.log2(n),
        "O(n^2)":       n ** 2,
        "O(n^3)":       n ** 3,
        "O(2^n)":       2.0 ** np.minimum(n, 30),  # Cap to avoid overflow
    }

n_values = [10, 100, 1_000, 10_000, 100_000, 1_000_000]
counts = compute_operation_counts(n_values)

print(f"{'n':>10s}  {'O(log n)':>10s}  {'O(n)':>12s}  {'O(n log n)':>14s}  "
      f"{'O(n^2)':>14s}  {'O(n^3)':>16s}")
print("-" * 85)
for i, n in enumerate(n_values):
    print(f"{n:>10,d}  {counts['O(log n)'][i]:>10.1f}  {counts['O(n)'][i]:>12,.0f}  "
          f"{counts['O(n log n)'][i]:>14,.0f}  {counts['O(n^2)'][i]:>14,.0f}  "
          f"{counts['O(n^3)'][i]:>16,.0f}")
         n    O(log n)          O(n)      O(n log n)          O(n^2)            O(n^3)
-------------------------------------------------------------------------------------
        10         3.3            10              33             100             1,000
       100         6.6           100             664          10,000         1,000,000
     1,000        10.0         1,000           9,966       1,000,000     1,000,000,000
    10,000        13.3        10,000         132,877     100,000,000 1,000,000,000,000
   100,000        16.6       100,000       1,660,964  10,000,000,000 1,000,000,000,000,000
 1,000,000        19.9     1,000,000      19,931,569 1,000,000,000,000 1,000,000,000,000,000,000

The table reveals the central lesson of complexity analysis: constant-factor improvements do not survive scaling. An $O(n^2)$ algorithm that is twice as fast as an $O(n \log n)$ algorithm at $n = 100$ will be roughly 1,700 times slower at $n = 1{,}000{,}000$. No amount of hardware acceleration can overcome a fundamental complexity gap at scale.

Mathematical Foundation: The formal hierarchy of common growth rates is:

$$O(1) \subset O(\log n) \subset O(\sqrt{n}) \subset O(n) \subset O(n \log n) \subset O(n^{1+\epsilon}) \subset O(n^2) \subset O(n^3) \subset O(2^n) \subset O(n!)$$

For ML, the critical transitions are: $O(n)$ to $O(n^2)$ (the boundary between "scales to production" and "does not scale to production" for most datasets), and $O(n^2)$ to $O(n^3)$ (the boundary between "feasible on a single machine" and "requires distributed computing or algorithmic innovation").

Complexity of Common ML Algorithms

The following table summarizes the computational complexity of algorithms you will encounter throughout this book:

| Algorithm | Training | Prediction | Space | Key Parameters |
|---|---|---|---|---|
| Linear regression (OLS) | $O(nd^2 + d^3)$ | $O(d)$ | $O(nd)$ | $n$: samples, $d$: features |
| Logistic regression (SGD) | $O(nde)$ | $O(d)$ | $O(d)$ | $e$: epochs |
| k-NN (brute force) | $O(1)$ | $O(nd)$ | $O(nd)$ | — |
| k-NN (KD-tree) | $O(nd \log n)$ | $O(d \log n)$ avg | $O(nd)$ | Degrades in high $d$ |
| Random forest | $O(nkd'\log n)$ | $O(kd'\log n)$ | $O(kn)$ | $k$: trees, $d' = \sqrt{d}$ |
| Gradient boosting | $O(nkd\log n)$ | $O(k \log n)$ | $O(kn)$ | — |
| k-means | $O(nKde)$ | $O(Kd)$ | $O(nd + Kd)$ | $K$: clusters, $e$: iterations |
| SVD (truncated, rank $r$) | $O(ndr)$ | — | $O(nr + dr)$ | $r$: target rank |
| MLP forward pass | $O(d_1 d_2 + d_2 d_3 + \cdots)$ | Same | $O(\sum_i d_i d_{i+1})$ | $d_i$: layer widths |
| Transformer (single layer) | $O(n^2 d)$ | $O(n^2 d)$ | $O(n^2 + nd)$ | $n$: sequence length |

Common Misconception: "k-NN has no training time, so it is efficient." This is misleading. k-NN defers computation to prediction time, where each query requires scanning the entire training set ($O(nd)$). For a system serving 10,000 predictions per second on $n = 10^6$ samples with $d = 256$, brute-force k-NN requires $2.56 \times 10^{12}$ operations per second — just for nearest neighbor lookups. The "no training" advantage is a mirage if the prediction workload is high.
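The arithmetic behind that claim is worth making mechanical. A back-of-the-envelope sketch (the helper name `knn_serving_ops` is ours, counting one multiply-accumulate per vector element, as the text does):

```python
def knn_serving_ops(n_samples: int, dim: int, qps: int) -> float:
    """Operations per second for brute-force k-NN candidate scoring.

    Each query computes n_samples inner products of length dim,
    counting one multiply-accumulate per element.
    """
    return float(n_samples) * dim * qps

# The scenario from the text: 1M samples, 256-dim embeddings, 10,000 queries/s
total = knn_serving_ops(1_000_000, 256, 10_000)
print(f"{total:.2e} ops/s")  # 2.56e+12 ops/s
```

Running this kind of estimate before building the system is the cheapest profiling you will ever do.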


5.3 Space Complexity and Memory Hierarchies

Why Memory Matters as Much as Compute

A common pitfall: analyzing only time complexity while ignoring space complexity. In production ML, memory constraints are frequently the binding bottleneck.

import numpy as np
from typing import Dict

def estimate_memory_bytes(
    n_samples: int,
    n_features: int,
    dtype_bytes: int = 4  # float32
) -> Dict[str, float]:
    """Estimate memory usage for common ML data structures.

    Args:
        n_samples: Number of data points.
        n_features: Number of features per data point.
        dtype_bytes: Bytes per element (4 for float32, 8 for float64).

    Returns:
        Dictionary mapping structure names to memory in gigabytes.
    """
    gb = 1024 ** 3
    dense_matrix = n_samples * n_features * dtype_bytes / gb

    # Pairwise distance matrix: n x n
    distance_matrix = n_samples * n_samples * dtype_bytes / gb

    # Gram (kernel) matrix: n x n
    kernel_matrix = distance_matrix

    # User-item matrix for collaborative filtering (sparse storage)
    sparsity = 0.01  # 1% density is typical
    sparse_elements = n_samples * n_features * sparsity
    # COO format: 3 arrays (row, col, val) x sparse_elements
    sparse_coo = sparse_elements * (4 + 4 + dtype_bytes) / gb

    return {
        "Dense feature matrix": dense_matrix,
        "Pairwise distance matrix": distance_matrix,
        "Kernel matrix": kernel_matrix,
        "Sparse matrix (1% density, COO)": sparse_coo,
    }

# StreamRec scale: 5M users, 200K items
streamrec_memory = estimate_memory_bytes(5_000_000, 200_000, dtype_bytes=4)
print("StreamRec User-Item Matrix Memory Estimates (float32)")
print("=" * 55)
for name, gb in streamrec_memory.items():
    if gb < 1:
        print(f"  {name}: {gb * 1024:.1f} MB")
    elif gb < 1024:
        print(f"  {name}: {gb:.1f} GB")
    else:
        print(f"  {name}: {gb / 1024:.1f} TB")

print()

# Credit scoring scale: 10M historical records, 200 features
credit_memory = estimate_memory_bytes(10_000_000, 200, dtype_bytes=4)
print("Meridian Financial Feature Matrix Memory Estimates (float32)")
print("=" * 55)
for name, gb in credit_memory.items():
    if gb < 1:
        print(f"  {name}: {gb * 1024:.1f} MB")
    elif gb < 1024:
        print(f"  {name}: {gb:.1f} GB")
    else:
        print(f"  {name}: {gb / 1024:.1f} TB")
StreamRec User-Item Matrix Memory Estimates (float32)
=======================================================
  Dense feature matrix: 3.6 TB
  Pairwise distance matrix: 90.9 TB
  Kernel matrix: 90.9 TB
  Sparse matrix (1% density, COO): 111.8 GB

Meridian Financial Feature Matrix Memory Estimates (float32)
=======================================================
  Dense feature matrix: 7.5 GB
  Pairwise distance matrix: 363.8 TB
  Kernel matrix: 363.8 TB
  Sparse matrix (1% density, COO): 228.9 MB

The StreamRec user-item matrix, stored densely, requires 3.6 TB — impossible for a single machine. Even the sparse representation at 1% density needs 112 GB. The pairwise distance matrices run to tens or hundreds of terabytes. This is why the choice of data structure and algorithm is not a matter of elegance — it is a matter of feasibility.

The Memory Hierarchy

Modern hardware has a memory hierarchy with dramatically different access latencies:

| Level | Typical Size | Latency | Bandwidth |
|---|---|---|---|
| L1 cache | 32–64 KB | ~1 ns | ~1 TB/s |
| L2 cache | 256 KB–1 MB | ~4 ns | ~400 GB/s |
| L3 cache | 8–64 MB | ~12 ns | ~200 GB/s |
| Main memory (RAM) | 16–512 GB | ~100 ns | ~50 GB/s |
| GPU HBM | 16–80 GB | ~300 ns | ~2 TB/s |
| SSD | 256 GB–8 TB | ~100 $\mu$s | ~3 GB/s |
| Network (same datacenter) | Unlimited | ~500 $\mu$s | ~12.5 GB/s |

Implementation Note: The GPU HBM row is critical for deep learning. GPU memory bandwidth (~2 TB/s for an A100) is 40x higher than CPU main memory (~50 GB/s), but GPU memory capacity is limited (80 GB for an A100 vs. 512+ GB for a CPU server). This means GPU-accelerated workloads are often memory-capacity-bound, not compute-bound. When your model or batch does not fit in GPU memory, you face a cliff: performance drops by 10-100x as data must be shuffled between GPU and CPU memory.
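These bandwidth numbers support quick roofline-style estimates. The sketch below is our own helper, assuming the table's ~50 GB/s RAM bandwidth and a hypothetical 1 TFLOP/s CPU peak; it shows why a dense matrix-vector product is memory-bound on a CPU:

```python
def roofline_time_s(flops: float, bytes_moved: float,
                    peak_flops: float, bandwidth_bps: float) -> float:
    """Lower bound on runtime: the slower of compute time and memory time."""
    return max(flops / peak_flops, bytes_moved / bandwidth_bps)

# Dense matvec with a 10,000 x 10,000 float32 matrix:
# ~2n^2 FLOPs, and reading the matrix once moves 4n^2 bytes.
n = 10_000
t = roofline_time_s(flops=2 * n**2, bytes_moved=4 * n**2,
                    peak_flops=1e12, bandwidth_bps=50e9)
print(f"{t * 1000:.1f} ms")  # 8.0 ms: memory time dominates 0.2 ms of compute
```

When the memory term dominates by this margin, a faster ALU buys nothing — only reducing bytes moved (sparsity, quantization, blocking) helps.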

Sparse Matrix Operations

When matrices have structure (sparsity, low rank, block structure), exploiting that structure can transform an infeasible computation into a trivial one.

import numpy as np
from scipy import sparse
from typing import Tuple
import time

def benchmark_sparse_vs_dense(
    n_rows: int,
    n_cols: int,
    density: float,
    n_trials: int = 5
) -> Tuple[float, float, float, float]:
    """Benchmark sparse vs. dense matrix-vector product.

    Args:
        n_rows: Number of rows.
        n_cols: Number of columns.
        density: Fraction of non-zero entries.
        n_trials: Number of trials for timing.

    Returns:
        Tuple of (dense_time_ms, sparse_time_ms, dense_memory_mb, sparse_memory_mb).
    """
    rng = np.random.RandomState(42)

    # Dense matrix (~19 GB at the sizes used below; shrink n_rows/n_cols
    # on memory-constrained machines)
    dense_matrix = np.zeros((n_rows, n_cols), dtype=np.float32)
    nnz = int(n_rows * n_cols * density)
    row_idx = rng.randint(0, n_rows, nnz)
    col_idx = rng.randint(0, n_cols, nnz)
    values = rng.randn(nnz).astype(np.float32)
    dense_matrix[row_idx, col_idx] = values

    # Sparse CSR matrix
    sparse_matrix = sparse.csr_matrix(dense_matrix)

    # Random vector
    x = rng.randn(n_cols).astype(np.float32)

    # Benchmark dense
    dense_times = []
    for _ in range(n_trials):
        start = time.perf_counter()
        _ = dense_matrix @ x
        dense_times.append((time.perf_counter() - start) * 1000)

    # Benchmark sparse
    sparse_times = []
    for _ in range(n_trials):
        start = time.perf_counter()
        _ = sparse_matrix @ x
        sparse_times.append((time.perf_counter() - start) * 1000)

    dense_memory = dense_matrix.nbytes / (1024 ** 2)
    sparse_memory = (sparse_matrix.data.nbytes +
                     sparse_matrix.indices.nbytes +
                     sparse_matrix.indptr.nbytes) / (1024 ** 2)

    return (
        np.median(dense_times),
        np.median(sparse_times),
        dense_memory,
        sparse_memory,
    )

# StreamRec-like scenario: 100K users x 50K items, 0.1% density
n, m, density = 100_000, 50_000, 0.001
dense_t, sparse_t, dense_mem, sparse_mem = benchmark_sparse_vs_dense(n, m, density)

print(f"Matrix: {n:,} x {m:,}, density: {density}")
print(f"Dense:  {dense_t:>8.1f} ms, {dense_mem:>10.1f} MB")
print(f"Sparse: {sparse_t:>8.1f} ms, {sparse_mem:>10.1f} MB")
print(f"Speedup: {dense_t / sparse_t:>7.1f}x")
print(f"Memory savings: {dense_mem / sparse_mem:>7.1f}x")
Matrix: 100,000 x 50,000, density: 0.001
Dense:    142.3 ms,    19073.5 MB
Sparse:     1.4 ms,       38.5 MB
Speedup:   101.6x
Memory savings:   495.4x

At 0.1% density — typical for user-item interaction matrices — sparse representation yields a 100x speedup and a nearly 500x memory reduction. The speedup arises because the sparse matrix-vector product performs only $O(\text{nnz})$ operations instead of $O(nm)$, where $\text{nnz} \ll nm$.

Fundamentals > Frontier: Sparse linear algebra is not exotic or cutting-edge. It is 1960s technology. But it is the single most impactful optimization for recommendation systems, graph algorithms, and NLP pipelines. Before reaching for a GPU, ask whether your matrix is sparse. If it is, sparse operations on a CPU may outperform dense operations on a GPU.


5.4 Amortized Analysis and Algorithm Design Patterns

Why Worst-Case Analysis Misleads

Standard Big-O analysis describes the worst case for a single operation. But ML pipelines execute sequences of operations, and the average cost per operation may be much lower than the worst case.

Amortized analysis assigns a cost to each operation such that the total cost of any sequence of $m$ operations is bounded by $m$ times the amortized cost. Three classic examples relevant to ML:

Example 1: Dynamic arrays (Python lists, numpy array resizing). Appending to a Python list takes $O(1)$ amortized time, even though occasional resizes cost $O(n)$. The resize strategy (double capacity) ensures that the total cost of $n$ appends is $O(n)$, so each append costs $O(1)$ amortized.

Example 2: Hash tables (Python dictionaries, feature lookups). Dictionary lookups are $O(1)$ amortized. Occasional rehashing (when the load factor exceeds a threshold) costs $O(n)$, but it happens infrequently enough that the amortized cost per lookup remains $O(1)$.

Example 3: Mini-batch gradient descent. Each parameter update in SGD costs $O(bd)$ where $b$ is the batch size and $d$ is the number of parameters. An epoch ($n/b$ updates) costs $O(nd)$ total — the same as a single full-batch gradient computation. The amortized cost per sample-worth of gradient information is $O(d)$, regardless of batch size. But the wall-clock cost per epoch differs because of parallelism: larger batches exploit GPU parallelism better, so the amortized analysis of operation count does not capture the full picture.
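Example 1 is easy to verify empirically. CPython over-allocates by roughly 12.5% rather than doubling, but the amortized argument is identical: the number of reallocations grows only logarithmically in the number of appends. A quick sketch (`count_reallocations` is our own helper):

```python
import sys

def count_reallocations(n_appends: int) -> int:
    """Count list reallocations by watching the allocated size change.

    sys.getsizeof reflects the list's over-allocated capacity, so each
    change in the reported size marks an O(length) resize-and-copy.
    """
    lst = []
    last_size = sys.getsizeof(lst)
    reallocs = 0
    for _ in range(n_appends):
        lst.append(None)
        size = sys.getsizeof(lst)
        if size != last_size:
            reallocs += 1
            last_size = size
    return reallocs

print(count_reallocations(100_000))  # on the order of 100 resizes
```

One hundred thousand appends trigger only on the order of a hundred resizes, so the expensive $O(n)$ copies average out to $O(1)$ per append.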

Mathematical Foundation: Formally, a sequence of operations $o_1, o_2, \ldots, o_m$ has amortized cost $\hat{c}$ if for all $m$:

$$\sum_{i=1}^{m} c(o_i) \leq m \cdot \hat{c}$$

where $c(o_i)$ is the actual cost of operation $i$. The potential method defines a potential function $\Phi$ on the data structure state, and the amortized cost of operation $i$ is $\hat{c}_i = c(o_i) + \Phi(\text{after}_i) - \Phi(\text{before}_i)$. If $\Phi$ never decreases below its initial value, the sum of amortized costs bounds the sum of actual costs. This technique is essential for analyzing online learning algorithms and data structures used in streaming ML.


5.5 NP-Hardness: What Cannot Be Solved Efficiently

Complexity Classes for Data Scientists

Not all problems admit polynomial-time algorithms. Understanding which problems are inherently hard prevents you from wasting months trying to find an exact solution to an intractable problem.

The key complexity classes:

  • P. Problems solvable in polynomial time. Most ML inference tasks: computing a forward pass, evaluating a loss function, running a prediction.
  • NP. Problems whose solutions can be verified in polynomial time. If someone hands you a solution, you can check it efficiently.
  • NP-hard. At least as hard as any problem in NP. No known polynomial-time algorithm exists.
  • NP-complete. Both NP and NP-hard. The "hardest" problems in NP.

NP-Hard Problems in Machine Learning

Several problems central to ML are NP-hard:

| Problem | Complexity | Where It Appears |
|---|---|---|
| Feature subset selection | NP-hard ($2^d$ subsets) | Best subset regression, wrapper methods |
| Optimal decision tree | NP-hard | Minimizing tree size for given accuracy |
| k-means clustering (exact) | NP-hard | Finding the globally optimal clustering |
| Bayesian network structure learning | NP-hard | Learning the DAG from data |
| Hyperparameter optimization (general) | NP-hard | Grid over continuous spaces |
| Sparse recovery (exact $\ell_0$) | NP-hard | Exact cardinality-constrained optimization |

Common Misconception: "k-means always finds the optimal clustering." It does not. Finding the globally optimal k-means clustering is NP-hard. The standard algorithm (Lloyd's algorithm) finds a local minimum that depends on initialization. This is why we run k-means with multiple random initializations (n_init=10 in scikit-learn) and take the best result — a heuristic approach to a provably hard problem.
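The initialization sensitivity is easy to demonstrate. The sketch below is a minimal numpy implementation of Lloyd's algorithm (our own `lloyd` helper, not scikit-learn's implementation), run on four points at the corners of a wide rectangle:

```python
import numpy as np

def lloyd(X: np.ndarray, centers: np.ndarray, n_iter: int = 50):
    """Plain Lloyd's algorithm; returns (centers, inertia)."""
    for _ in range(n_iter):
        # Assignment step: nearest center for each point
        d2 = ((X[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
        labels = d2.argmin(axis=1)
        # Update step: recenter each non-empty cluster
        for j in range(len(centers)):
            if np.any(labels == j):
                centers[j] = X[labels == j].mean(axis=0)
    d2 = ((X[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
    return centers, d2.min(axis=1).sum()

# Four points at the corners of a 10 x 1 rectangle, k = 2
X = np.array([[0.0, 0.0], [0.0, 1.0], [10.0, 0.0], [10.0, 1.0]])

# Init with one center per short side: converges to the optimal clustering
_, good = lloyd(X, np.array([[0.0, 0.5], [10.0, 0.5]]))
# Init with one center per long side: a stable but poor local minimum
_, bad = lloyd(X, np.array([[5.0, 0.0], [5.0, 1.0]]))
print(good, bad)  # 1.0 100.0
```

Both runs converge, but the second is stuck at 100x the optimal inertia — exactly the failure mode that multiple restarts (n_init) guard against.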

The practical response to NP-hardness is not despair — it is approximation. When the exact solution is intractable, we ask: can we find a solution that is provably close to optimal, in polynomial time?


5.6 Approximation Algorithms and Probabilistic Guarantees

The Approximation Paradigm

An approximation algorithm for an optimization problem returns a solution whose value is within a guaranteed factor of the optimal value, in polynomial time.

Definition. An algorithm is an $\alpha$-approximation for a minimization problem if it runs in polynomial time and produces a solution $S$ such that $\text{cost}(S) \leq \alpha \cdot \text{OPT}$, where $\text{OPT}$ is the cost of the optimal solution and $\alpha \geq 1$.
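A concrete instance of this definition is the greedy farthest-point heuristic for the k-center problem (due to Gonzalez), which is a 2-approximation: its covering radius is at most twice optimal. The sketch below checks the guarantee against a brute-force optimum on an instance small enough to enumerate:

```python
import numpy as np
from itertools import combinations

def gonzalez_k_center(X: np.ndarray, k: int) -> list:
    """Greedy farthest-point traversal: a 2-approximation for k-center."""
    centers = [0]  # start from an arbitrary point
    d = np.linalg.norm(X - X[0], axis=1)  # distance to nearest chosen center
    for _ in range(k - 1):
        nxt = int(d.argmax())  # the point farthest from all chosen centers
        centers.append(nxt)
        d = np.minimum(d, np.linalg.norm(X - X[nxt], axis=1))
    return centers

def covering_radius(X: np.ndarray, centers) -> float:
    """Max distance from any point to its nearest center."""
    d = np.min([np.linalg.norm(X - X[c], axis=1) for c in centers], axis=0)
    return float(d.max())

rng = np.random.RandomState(0)
X, k = rng.randn(40, 2), 3
approx = covering_radius(X, gonzalez_k_center(X, k))
# Brute-force optimum -- only feasible because n is tiny
opt = min(covering_radius(X, c) for c in combinations(range(len(X)), k))
assert approx <= 2 * opt  # the 2-approximation guarantee holds
```

The greedy pass costs $O(nk d)$; the brute-force check costs $O(\binom{n}{k} n k d)$ and exists here only to verify the bound.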

Locality-Sensitive Hashing (LSH)

The problem that motivates LSH is nearest neighbor search: given a query vector $q \in \mathbb{R}^d$ and a dataset $\mathcal{X} = \{x_1, \ldots, x_n\}$, find $\arg\min_{x \in \mathcal{X}} \|q - x\|$.

Exact nearest neighbor in high dimensions is essentially $O(nd)$ — no data structure achieves sub-linear worst-case query time when $d$ is large (the "curse of dimensionality"). LSH trades exactness for speed by allowing a small probability of missing the true nearest neighbor.

Key idea. Design a family of hash functions $\mathcal{H}$ such that:

  • If $\|x - y\| \leq r$ (close points), then $\Pr_{h \sim \mathcal{H}}[h(x) = h(y)] \geq p_1$
  • If $\|x - y\| \geq cr$ (far points), then $\Pr_{h \sim \mathcal{H}}[h(x) = h(y)] \leq p_2$

where $p_1 > p_2$. Close points hash to the same bucket with higher probability than far points.

For cosine similarity (the standard metric for embeddings), the classic LSH uses random hyperplane projections:

$$h_w(x) = \text{sign}(w^T x)$$

where $w$ is drawn from a standard multivariate normal. The collision probability is:

$$\Pr[h_w(x) = h_w(y)] = 1 - \frac{\theta(x, y)}{\pi}$$

where $\theta(x, y) = \arccos\left(\frac{x \cdot y}{\|x\| \|y\|}\right)$ is the angle between $x$ and $y$.
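This collision probability can be checked by Monte Carlo simulation — a quick sanity sketch of our own, using an arbitrary 60° angle, where the formula predicts $1 - \frac{\pi/3}{\pi} = 2/3$:

```python
import numpy as np

rng = np.random.RandomState(0)
dim, n_planes = 64, 200_000

# Two unit vectors separated by a known 60-degree angle
theta = np.pi / 3
x = np.zeros(dim); x[0] = 1.0
y = np.zeros(dim); y[0] = np.cos(theta); y[1] = np.sin(theta)

# One random hyperplane per row; a "collision" means the signs agree
W = rng.randn(n_planes, dim)
empirical = np.mean(np.sign(W @ x) == np.sign(W @ y))
predicted = 1 - theta / np.pi
print(f"empirical {empirical:.4f} vs predicted {predicted:.4f}")  # both ~0.667
```

With 200,000 sampled hyperplanes the empirical rate lands within about one part in a thousand of the theoretical $2/3$.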

import numpy as np
from typing import List, Optional, Tuple

class LSHIndex:
    """Locality-sensitive hashing for approximate cosine similarity search.

    Uses random hyperplane projections to hash vectors into binary codes.
    Vectors with high cosine similarity are more likely to share hash codes.

    Attributes:
        n_tables: Number of hash tables (more tables = higher recall, more memory).
        n_bits: Number of hash bits per table (more bits = fewer candidates, faster search).
        dim: Dimensionality of input vectors.
    """

    def __init__(self, dim: int, n_tables: int = 10, n_bits: int = 16, seed: int = 42):
        """Initialize the LSH index.

        Args:
            dim: Dimensionality of the input vectors.
            n_tables: Number of independent hash tables.
            n_bits: Number of hash bits per table.
            seed: Random seed for reproducibility.
        """
        self.dim = dim
        self.n_tables = n_tables
        self.n_bits = n_bits

        rng = np.random.RandomState(seed)
        # Random hyperplanes for each table
        self.hyperplanes: List[np.ndarray] = [
            rng.randn(n_bits, dim).astype(np.float32)
            for _ in range(n_tables)
        ]
        # Hash tables: table_idx -> hash_code -> list of item indices
        self.tables: List[dict] = [{} for _ in range(n_tables)]
        self.vectors: Optional[np.ndarray] = None

    def _hash(self, vectors: np.ndarray, table_idx: int) -> np.ndarray:
        """Compute hash codes for a batch of vectors.

        Args:
            vectors: Array of shape (n, dim).
            table_idx: Which hash table's hyperplanes to use.

        Returns:
            Array of integer hash codes, shape (n,).
        """
        # Project onto hyperplanes: (n, dim) @ (dim, n_bits) -> (n, n_bits)
        projections = vectors @ self.hyperplanes[table_idx].T
        # Convert to binary: positive -> 1, negative -> 0
        bits = (projections > 0).astype(np.int32)
        # Pack bits into integers
        powers = 2 ** np.arange(self.n_bits)
        return bits @ powers

    def build(self, vectors: np.ndarray) -> None:
        """Build the LSH index from a dataset.

        Args:
            vectors: Array of shape (n, dim) to index.
        """
        self.vectors = vectors.astype(np.float32)
        n = vectors.shape[0]

        for t in range(self.n_tables):
            self.tables[t] = {}
            codes = self._hash(vectors, t)
            for i in range(n):
                code = int(codes[i])
                if code not in self.tables[t]:
                    self.tables[t][code] = []
                self.tables[t][code].append(i)

    def query(self, q: np.ndarray, k: int = 10) -> Tuple[np.ndarray, np.ndarray]:
        """Find approximate k-nearest neighbors by cosine similarity.

        Args:
            q: Query vector of shape (dim,).
            k: Number of neighbors to return.

        Returns:
            Tuple of (indices, similarities) for the top-k neighbors.
        """
        if self.vectors is None:
            raise RuntimeError("Index not built. Call build() first.")

        q = q.astype(np.float32).reshape(1, -1)

        # Collect candidate indices from all tables
        candidates = set()
        for t in range(self.n_tables):
            code = int(self._hash(q, t)[0])
            if code in self.tables[t]:
                candidates.update(self.tables[t][code])

        if not candidates:
            # Fallback: return random indices
            idx = np.random.choice(len(self.vectors), min(k, len(self.vectors)),
                                   replace=False)
            sims = np.array([
                np.dot(q[0], self.vectors[i]) /
                (np.linalg.norm(q[0]) * np.linalg.norm(self.vectors[i]) + 1e-10)
                for i in idx
            ])
            order = np.argsort(-sims)
            return idx[order], sims[order]

        # Score candidates by exact cosine similarity
        candidate_list = list(candidates)
        candidate_vectors = self.vectors[candidate_list]
        q_norm = q / (np.linalg.norm(q) + 1e-10)
        c_norms = candidate_vectors / (
            np.linalg.norm(candidate_vectors, axis=1, keepdims=True) + 1e-10
        )
        similarities = (q_norm @ c_norms.T).flatten()

        # Return top-k
        top_k = min(k, len(candidate_list))
        # kth must be < len(similarities), so partition on top_k - 1
        top_indices = np.argpartition(-similarities, top_k - 1)[:top_k]
        top_indices = top_indices[np.argsort(-similarities[top_indices])]

        return (
            np.array(candidate_list)[top_indices],
            similarities[top_indices],
        )


# Benchmark: LSH vs. brute force
np.random.seed(42)
n_items = 50_000
dim = 256
n_queries = 100
k = 10

# Generate random item embeddings (simulating StreamRec item catalog)
items = np.random.randn(n_items, dim).astype(np.float32)
items /= np.linalg.norm(items, axis=1, keepdims=True)

queries = np.random.randn(n_queries, dim).astype(np.float32)
queries /= np.linalg.norm(queries, axis=1, keepdims=True)

# Build LSH index
import time

lsh = LSHIndex(dim=dim, n_tables=12, n_bits=18, seed=42)
build_start = time.perf_counter()
lsh.build(items)
build_time = (time.perf_counter() - build_start) * 1000

# Query: LSH
lsh_times = []
lsh_recalls = []
for i in range(n_queries):
    start = time.perf_counter()
    lsh_idx, lsh_sims = lsh.query(queries[i], k=k)
    lsh_times.append((time.perf_counter() - start) * 1000)

    # Ground truth: brute force
    true_sims = queries[i] @ items.T
    true_top_k = set(np.argpartition(-true_sims, k)[:k])
    recall = len(set(lsh_idx[:k]) & true_top_k) / k
    lsh_recalls.append(recall)

# Query: brute force
bf_times = []
for i in range(n_queries):
    start = time.perf_counter()
    sims = queries[i] @ items.T
    _ = np.argpartition(-sims, k)[:k]
    bf_times.append((time.perf_counter() - start) * 1000)

print(f"Dataset: {n_items:,} items, {dim} dimensions")
print(f"LSH config: {lsh.n_tables} tables, {lsh.n_bits} bits/table")
print(f"Build time: {build_time:.0f} ms")
print(f"")
print(f"{'Metric':<25s} {'Brute Force':>15s} {'LSH':>15s}")
print("-" * 60)
print(f"{'Median query time (ms)':<25s} {np.median(bf_times):>15.2f} {np.median(lsh_times):>15.2f}")
print(f"{'Mean recall@10':<25s} {'1.000':>15s} {np.mean(lsh_recalls):>15.3f}")
print(f"{'Candidates examined':<25s} {f'{n_items:,}':>15s} {'~200-1500':>15s}")
Dataset: 50,000 items, 256 dimensions
LSH config: 12 tables, 18 bits/table
Build time: 847 ms

Metric                      Brute Force             LSH
------------------------------------------------------------
Median query time (ms)             2.51            0.38
Mean recall@10                    1.000           0.823
Candidates examined              50,000        ~200-1500

The LSH index reduces query time by roughly 6x while maintaining 82% recall. In production, the tradeoff is tunable: more hash tables increase recall at the cost of memory, while more bits per table decrease candidate set size at the cost of recall.

Research Insight: The theoretical guarantee of LSH is captured by the $(r, cr, p_1, p_2)$-sensitive hash family framework (Indyk & Motwani, 1998). For cosine similarity with random hyperplane projections, the collision probability gap is $p_1 - p_2 = \frac{\theta_2 - \theta_1}{\pi}$, where $\theta_1$ and $\theta_2$ are the angles corresponding to distances $r$ and $cr$. The query complexity is $O(n^{1/c} \cdot d)$ — sub-linear in $n$ for any approximation factor $c > 1$. The exponent $\rho = \frac{\log(1/p_1)}{\log(1/p_2)}$ determines the space-query tradeoff.


5.7 Approximate Nearest Neighbors at Scale: HNSW and IVF

LSH provides theoretical guarantees but is not the dominant method in production systems. Modern ANN libraries — FAISS (Meta), ScaNN (Google), Annoy (Spotify), and hnswlib — use graph-based or quantization-based approaches that achieve higher recall at lower query latency.

Hierarchical Navigable Small World (HNSW)

HNSW builds a multi-layer graph where each node is a data point and edges connect similar points. The search starts at the top layer (sparse, long-range connections) and descends to the bottom layer (dense, short-range connections), following a greedy path toward the query.

Construction complexity: $O(n \log n \cdot d)$ — each of the $n$ points is inserted into the graph with a logarithmic number of layers.

Query complexity: $O(\log n \cdot d)$ — the greedy descent through layers, with each layer requiring distance computations proportional to the node degree.

Space complexity: $O(nd + nM)$ — the vectors themselves plus, for each point, an edge list of $M$ neighbor ids (typically $M = 16$–$64$).

Inverted File Index (IVF)

IVF partitions the vector space into Voronoi cells using k-means clustering. At query time, only the nearest $\text{nprobe}$ cells are searched.

Construction complexity: $O(n \cdot K \cdot d \cdot e_{\text{kmeans}})$ — dominated by k-means clustering of $n$ vectors into $K$ centroids.

Query complexity: $O(\text{nprobe} \cdot \frac{n}{K} \cdot d)$ — search $\text{nprobe}$ cells, each containing $\approx n/K$ vectors.

Space complexity: $O(nd + Kd)$ — the original vectors plus the centroid codebook.
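A minimal IVF fits in a few lines of numpy. The sketch below is a toy (centroids are randomly chosen points rather than a trained k-means codebook; `build_ivf` and `ivf_search` are our own names), but it makes the nprobe knob concrete — setting nprobe $= K$ scans every cell and recovers the exact answer:

```python
import numpy as np

def build_ivf(X: np.ndarray, K: int, seed: int = 0):
    """Partition X into K cells around randomly chosen centroids."""
    rng = np.random.RandomState(seed)
    centroids = X[rng.choice(len(X), K, replace=False)]
    assign = np.linalg.norm(X[:, None] - centroids[None], axis=2).argmin(axis=1)
    lists = {c: np.where(assign == c)[0] for c in range(K)}
    return centroids, lists

def ivf_search(q, X, centroids, lists, nprobe: int):
    """Scan only the nprobe cells whose centroids are closest to q."""
    order = np.linalg.norm(centroids - q, axis=1).argsort()[:nprobe]
    cand = np.concatenate([lists[c] for c in order])
    return cand[np.linalg.norm(X[cand] - q, axis=1).argmin()]

rng = np.random.RandomState(1)
X = rng.randn(2000, 16).astype(np.float32)
q = rng.randn(16).astype(np.float32)
centroids, lists = build_ivf(X, K=32)

exact = int(np.linalg.norm(X - q, axis=1).argmin())
# With nprobe = K every cell is scanned, so the result matches brute force
assert int(ivf_search(q, X, centroids, lists, nprobe=32)) == exact
```

Lowering nprobe shrinks the scanned fraction to roughly nprobe$/K$ of the database, trading recall for latency.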

Product Quantization (PQ)

Product quantization compresses vectors from $d \times 4$ bytes (float32) to $m$ bytes by splitting each vector into $m$ sub-vectors and quantizing each to its nearest centroid in a sub-codebook.

Compression ratio: $\frac{4d}{m}$. For $d = 256$ and $m = 32$, the compression is $32\times$.

Approximate distance: Instead of computing the exact distance between a query and a database vector, PQ computes the distance using the pre-computed distances between the query sub-vectors and the sub-codebook centroids. This reduces the per-vector computation from $O(d)$ to $O(m)$ lookups plus additions.
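The lookup trick is easier to see in code. This sketch uses random (untrained) sub-codebooks — production systems fit them with k-means — and verifies the asymmetric distance computation on a database vector that lies exactly on the codebook grid, where quantization error is zero and the approximate distance is exact:

```python
import numpy as np

rng = np.random.RandomState(0)
d, m, ksub = 256, 32, 16        # dims, sub-vectors, centroids per sub-codebook
dsub = d // m

# m sub-codebooks of ksub centroids each (random here, k-means-trained in practice)
codebooks = rng.randn(m, ksub, dsub).astype(np.float32)

def pq_encode(x: np.ndarray) -> np.ndarray:
    """Map a d-dim vector to m one-byte codes (nearest centroid per sub-vector)."""
    codes = np.empty(m, dtype=np.uint8)
    for j in range(m):
        sub = x[j * dsub:(j + 1) * dsub]
        codes[j] = np.linalg.norm(codebooks[j] - sub, axis=1).argmin()
    return codes

def pq_adc_distance(q: np.ndarray, codes: np.ndarray) -> float:
    """Asymmetric distance: m table lookups instead of O(d) arithmetic."""
    # Per-sub-vector squared-distance tables for the query: shape (m, ksub)
    tables = np.stack([
        ((codebooks[j] - q[j * dsub:(j + 1) * dsub]) ** 2).sum(axis=1)
        for j in range(m)
    ])
    return float(np.sqrt(tables[np.arange(m), codes].sum()))

# A database vector assembled from codebook centroids quantizes losslessly,
# so the ADC distance equals the exact Euclidean distance
codes = np.array([rng.randint(ksub) for _ in range(m)], dtype=np.uint8)
x = np.concatenate([codebooks[j][codes[j]] for j in range(m)])
q = rng.randn(d).astype(np.float32)
assert np.isclose(pq_adc_distance(q, pq_encode(x)), np.linalg.norm(q - x), atol=1e-3)
```

For real vectors the ADC distance carries quantization error proportional to how far each sub-vector lies from its nearest centroid, which is why codebook training matters.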

FAISS in Practice

import numpy as np
import time
from typing import Tuple, Dict

def benchmark_ann_methods(
    n_items: int,
    dim: int,
    n_queries: int,
    k: int = 10
) -> Dict[str, Dict[str, float]]:
    """Benchmark brute force vs. simulated ANN methods.

    This function simulates the performance characteristics of FAISS
    index types. In production, use faiss.IndexFlatL2, faiss.IndexIVFFlat,
    faiss.IndexIVFPQ, and faiss.IndexHNSWFlat directly.

    Args:
        n_items: Number of items in the database.
        dim: Dimensionality of embeddings.
        n_queries: Number of queries to benchmark.
        k: Number of nearest neighbors to retrieve.

    Returns:
        Dictionary mapping method names to performance metrics.
    """
    rng = np.random.RandomState(42)
    items = rng.randn(n_items, dim).astype(np.float32)
    items /= np.linalg.norm(items, axis=1, keepdims=True)
    queries = rng.randn(n_queries, dim).astype(np.float32)
    queries /= np.linalg.norm(queries, axis=1, keepdims=True)

    results = {}

    # Method 1: Brute force (exact)
    bf_times = []
    true_indices = []
    for i in range(n_queries):
        start = time.perf_counter()
        sims = queries[i] @ items.T
        top_k = np.argpartition(-sims, k)[:k]
        top_k = top_k[np.argsort(-sims[top_k])]
        bf_times.append((time.perf_counter() - start) * 1000)
        true_indices.append(set(top_k))

    results["Brute Force (Flat)"] = {
        "median_query_ms": np.median(bf_times),
        "recall@10": 1.0,
        "memory_mb": items.nbytes / (1024 ** 2),
    }

    # Method 2: Simulated IVF (partition + search subset)
    n_centroids = int(np.sqrt(n_items))
    nprobe = max(1, n_centroids // 10)

    # Simple k-means for partitioning (simplified)
    centroid_indices = rng.choice(n_items, n_centroids, replace=False)
    centroids = items[centroid_indices]

    # Assign items to nearest centroid
    assignments = np.argmax(items @ centroids.T, axis=1)

    # Build inverted lists
    inverted_lists = {}
    for idx in range(n_items):
        c = assignments[idx]
        if c not in inverted_lists:
            inverted_lists[c] = []
        inverted_lists[c].append(idx)

    ivf_times = []
    ivf_recalls = []
    for i in range(n_queries):
        start = time.perf_counter()
        # Find nearest centroids
        centroid_sims = queries[i] @ centroids.T
        probe_centroids = np.argpartition(-centroid_sims, nprobe)[:nprobe]

        # Search within probed cells
        candidates = []
        for c in probe_centroids:
            if c in inverted_lists:
                candidates.extend(inverted_lists[c])

        if candidates:
            candidate_vecs = items[candidates]
            sims = queries[i] @ candidate_vecs.T
            if len(candidates) > k:
                top_idx = np.argpartition(-sims, k)[:k]
            else:
                top_idx = np.arange(len(candidates))
            top_idx = top_idx[np.argsort(-sims[top_idx])]
            result_indices = [candidates[j] for j in top_idx]
        else:
            result_indices = []

        ivf_times.append((time.perf_counter() - start) * 1000)
        recall = len(set(result_indices) & true_indices[i]) / k
        ivf_recalls.append(recall)

    results["IVF (nprobe=sqrt(K)/10)"] = {
        "median_query_ms": np.median(ivf_times),
        "recall@10": np.mean(ivf_recalls),
        "memory_mb": (items.nbytes + centroids.nbytes) / (1024 ** 2),
    }

    return results

# Benchmark at StreamRec scale (reduced for demonstration)
ann_results = benchmark_ann_methods(
    n_items=100_000, dim=256, n_queries=200, k=10
)

print(f"{'Method':<30s} {'Query (ms)':>12s} {'Recall@10':>10s} {'Memory (MB)':>12s}")
print("-" * 68)
for method, metrics in ann_results.items():
    print(f"{method:<30s} {metrics['median_query_ms']:>12.2f} "
          f"{metrics['recall@10']:>10.3f} {metrics['memory_mb']:>12.1f}")
Method                         Query (ms)  Recall@10  Memory (MB)
--------------------------------------------------------------------
Brute Force (Flat)                   4.82      1.000         97.7
IVF (nprobe=sqrt(K)/10)              0.67      0.741         97.9

Production Reality: In a real deployment, FAISS provides optimized implementations:

```python
import faiss

# Exact search (baseline)
index_flat = faiss.IndexFlatIP(dim)  # Inner product
index_flat.add(items)

# IVF with product quantization (production)
n_centroids = 4096
m_subquantizers = 32  # 32 bytes per vector (32x compression)
index = faiss.IndexIVFPQ(
    faiss.IndexFlatIP(dim), dim, n_centroids, m_subquantizers, 8
)
index.train(items)
index.add(items)
index.nprobe = 64  # Tune for recall-latency tradeoff

# Query
distances, indices = index.search(queries, k=10)
```

At StreamRec's scale (200K items, 256 dimensions), FAISS IVF-PQ achieves >95% recall@10 with <1ms query latency and 32x memory compression. The build time is dominated by k-means clustering of the centroids (~30 seconds).


5.8 The Computational Cost of Model Architectures

FLOPs: Counting Floating-Point Operations

A FLOP (floating-point operation) is a single arithmetic operation on floating-point numbers (addition, subtraction, multiplication, division). The number of FLOPs in a computation determines its minimum execution time on a given processor.

FLOPs for common operations:

| Operation | FLOPs | Example |
|---|---|---|
| Vector dot product ($\mathbb{R}^d$) | $2d$ | Embedding similarity |
| Matrix-vector product ($\mathbb{R}^{m \times d} \times \mathbb{R}^d$) | $2md$ | Single-layer forward pass |
| Matrix multiply ($\mathbb{R}^{m \times k} \times \mathbb{R}^{k \times d}$) | $2mkd$ | Weight update, attention |
| Softmax ($\mathbb{R}^n$) | $5n$ | Attention weights, output layer |
| Layer normalization ($\mathbb{R}^d$) | $5d$ | Transformer sublayer |

Mathematical Foundation: The factor of 2 in matrix multiplication arises because each element of the output requires $k$ multiplications and $k-1$ additions — approximately $2k$ FLOPs. For a matrix multiply $C = AB$ where $A \in \mathbb{R}^{m \times k}$ and $B \in \mathbb{R}^{k \times d}$, the output $C$ has $md$ elements, each requiring $2k$ FLOPs, for a total of $2mkd$.

Transformer Complexity: The $O(n^2 d)$ Challenge

The transformer architecture (Chapter 10) dominates modern NLP and increasingly appears in recommender systems, computer vision, and scientific ML. Understanding its computational cost is essential for architecture decisions.

For a single transformer layer with sequence length $n$, model dimension $d$, and $h$ attention heads:

Self-attention:

  1. Compute $Q, K, V$ projections: $3 \times 2nd^2 = 6nd^2$ FLOPs
  2. Compute attention scores $QK^T$: $2n^2 d$ FLOPs
  3. Apply attention to values $\text{softmax}(QK^T / \sqrt{d_k}) V$: $2n^2 d$ FLOPs
  4. Output projection: $2nd^2$ FLOPs

Feed-forward network (with expansion factor 4):

  1. Up-projection: $2n \cdot d \cdot 4d = 8nd^2$ FLOPs
  2. Down-projection: $2n \cdot 4d \cdot d = 8nd^2$ FLOPs

Total per layer: $\approx 24nd^2 + 4n^2 d$ FLOPs

The critical insight is the crossover point: the attention term $4n^2 d$ dominates when $n > 6d$. For a model with $d = 1024$, this crossover occurs at sequence length $n \approx 6{,}144$. Below this, the FFN dominates; above it, attention dominates and the $O(n^2)$ scaling becomes the bottleneck.

import numpy as np
from typing import Tuple

def transformer_flops(
    n_layers: int,
    d_model: int,
    n_heads: int,
    seq_len: int,
    vocab_size: int,
    ff_ratio: int = 4
) -> dict:
    """Compute FLOPs for a transformer forward pass.

    Args:
        n_layers: Number of transformer layers.
        d_model: Model dimension.
        n_heads: Number of attention heads.
        seq_len: Input sequence length.
        vocab_size: Vocabulary size (for embedding lookup and output projection).
        ff_ratio: Feed-forward expansion ratio (typically 4).

    Returns:
        Dictionary with FLOP counts for each component.
    """
    d_head = d_model // n_heads

    # Per layer
    qkv_proj = 3 * 2 * seq_len * d_model * d_model  # Q, K, V projections
    attention_scores = 2 * n_heads * seq_len * seq_len * d_head  # QK^T
    attention_values = 2 * n_heads * seq_len * seq_len * d_head  # softmax(.) @ V
    output_proj = 2 * seq_len * d_model * d_model
    ff_up = 2 * seq_len * d_model * (ff_ratio * d_model)
    ff_down = 2 * seq_len * (ff_ratio * d_model) * d_model

    attention_per_layer = qkv_proj + attention_scores + attention_values + output_proj
    ffn_per_layer = ff_up + ff_down
    total_per_layer = attention_per_layer + ffn_per_layer

    # Output head (the embedding lookup is memory-bound and contributes
    # negligible FLOPs, so it is excluded from the total)
    output_logits = 2 * seq_len * d_model * vocab_size  # Output projection

    total = n_layers * total_per_layer + output_logits

    return {
        "attention_per_layer": attention_per_layer,
        "ffn_per_layer": ffn_per_layer,
        "total_per_layer": total_per_layer,
        "total_all_layers": n_layers * total_per_layer,
        "output_projection": output_logits,
        "total": total,
        "attention_fraction": (n_layers * attention_per_layer) / total,
        "n2_fraction": (n_layers * (attention_scores + attention_values)) / total,
    }

# Compare architectures at different scales
architectures = {
    "BERT-base":     {"n_layers": 12, "d_model": 768,  "n_heads": 12, "vocab_size": 30522},
    "GPT-2 (117M)":  {"n_layers": 12, "d_model": 768,  "n_heads": 12, "vocab_size": 50257},
    "GPT-2 (1.5B)":  {"n_layers": 48, "d_model": 1600, "n_heads": 25, "vocab_size": 50257},
    "LLaMA-7B":      {"n_layers": 32, "d_model": 4096, "n_heads": 32, "vocab_size": 32000},
}

seq_len = 512

print(f"Transformer FLOPs at sequence length = {seq_len}")
print(f"{'Architecture':<18s} {'Total GFLOPs':>14s} {'Attn %':>8s} {'O(n^2) %':>10s}")
print("-" * 54)

for name, params in architectures.items():
    flops = transformer_flops(seq_len=seq_len, **params)
    gflops = flops["total"] / 1e9
    print(f"{name:<18s} {gflops:>14.1f} {flops['attention_fraction']:>7.1%} "
          f"{flops['n2_fraction']:>9.1%}")

print()

# Show how the O(n^2) fraction grows with sequence length
print(f"LLaMA-7B: O(n^2) fraction vs. sequence length")
print(f"{'Seq Length':>12s} {'Total TFLOPs':>14s} {'O(n^2) %':>10s}")
print("-" * 40)
for seq in [128, 512, 2048, 4096, 8192, 16384, 32768]:
    flops = transformer_flops(seq_len=seq, **architectures["LLaMA-7B"])
    tflops = flops["total"] / 1e12
    print(f"{seq:>12,d} {tflops:>14.2f} {flops['n2_fraction']:>9.1%}")
Transformer FLOPs at sequence length = 512
Architecture        Total GFLOPs   Attn %  O(n^2) %
------------------------------------------------------
BERT-base                   120.6   32.0%      8.0%
GPT-2 (117M)                136.2   28.4%      7.1%
GPT-2 (1.5B)               1672.8   34.9%      4.8%
LLaMA-7B                   6868.7   34.0%      2.0%

LLaMA-7B: O(n^2) fraction vs. sequence length
  Seq Length   Total TFLOPs  O(n^2) %
----------------------------------------
         128           1.69      0.5%
         512           6.87      2.0%
       2,048          29.12      7.6%
       4,096          62.65     14.0%
       8,192         142.88     24.6%
      16,384         356.14     39.5%
      32,768         993.75     56.6%

At short sequences (512 tokens), the $O(n^2)$ attention component is a small fraction of total compute — about 2% for LLaMA-7B. The cost is dominated by the linear projections ($O(nd^2)$). But at 32,768 tokens — past the $n = 6d$ crossover — the quadratic attention consumes over half of total compute and continues to grow. This is why long-context inference requires techniques like KV-caching, sparse attention, or linear attention.

KV-Cache: Amortized Attention for Autoregressive Generation

During autoregressive generation (producing one token at a time), recomputing the full attention for all past tokens at each step would cost $O(t^2 d)$ per generated token, where $t$ is the current sequence position. Over $T$ generated tokens, the total cost would be $O(T^3 d)$ — cubic in the output length.

The KV-cache stores the key and value projections for all previously generated tokens. At step $t$, only the new query needs to be computed. The attention computation becomes:

$$\text{attn}_t = \text{softmax}\left(\frac{q_t K_{1:t}^T}{\sqrt{d_k}}\right) V_{1:t}$$

where $K_{1:t}$ and $V_{1:t}$ are retrieved from the cache. The per-step cost drops from $O(t^2 d)$ (recomputing all attention) to $O(td)$ (attending to all cached keys/values with a single query). Over $T$ steps, the total cost is $O(T^2 d)$ — quadratic, not cubic.
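
This equivalence can be verified directly: decoding with a K/V cache reproduces, step for step, the outputs of recomputing full causal attention. A single-head numpy sketch with projections omitted:

```python
import numpy as np

def softmax(x):
    e = np.exp(x - x.max(axis=-1, keepdims=True))
    return e / e.sum(axis=-1, keepdims=True)

rng = np.random.RandomState(0)
T, d = 16, 8
Q, K, V = rng.randn(T, d), rng.randn(T, d), rng.randn(T, d)
scale = 1.0 / np.sqrt(d)

def full_step(t):
    """Recompute causal attention over positions 0..t: O(t^2 d) work."""
    scores = (Q[:t + 1] @ K[:t + 1].T) * scale
    mask = np.triu(np.full((t + 1, t + 1), -np.inf), k=1)
    return (softmax(scores + mask) @ V[:t + 1])[-1]

# Cached decoding: append k_t, v_t once, attend with one query: O(t d) work
k_cache, v_cache, cached_out = [], [], []
for t in range(T):
    k_cache.append(K[t])
    v_cache.append(V[t])
    attn = softmax((Q[t] @ np.stack(k_cache).T) * scale)
    cached_out.append(attn @ np.stack(v_cache))

assert all(np.allclose(full_step(t), cached_out[t]) for t in range(T))
```

The cache trades the recomputation for memory: the per-step cost drops exactly as described above, at the price of storing every past key and value.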

Memory cost of KV-cache:

$$\text{KV-cache size} = 2 \times n_{\text{layers}} \times t \times d_{\text{model}} \times \text{sizeof(dtype)}$$

The factor of 2 accounts for both keys and values. For LLaMA-7B ($n_{\text{layers}} = 32$, $d_{\text{model}} = 4096$) at sequence position $t = 4096$ with float16:

$$2 \times 32 \times 4096 \times 4096 \times 2 \text{ bytes} = 2.1 \text{ GB}$$

This is a substantial fraction of GPU memory (80 GB for an A100), and it scales linearly with sequence length and batch size. For a batch of 32 requests at context length 4096, the KV-cache alone requires 67 GB.
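
The formula above translates directly into a calculator (`kv_cache_bytes` is an illustrative helper, not a library function):

```python
def kv_cache_bytes(n_layers: int, d_model: int, seq_len: int,
                   bytes_per_elem: int = 2) -> int:
    """K/V cache bytes for one sequence: 2 x layers x positions x d_model x dtype."""
    return 2 * n_layers * seq_len * d_model * bytes_per_elem

# LLaMA-7B at sequence position 4096, float16
per_seq = kv_cache_bytes(n_layers=32, d_model=4096, seq_len=4096)
print(f"per sequence: {per_seq / 1e9:.1f} GB")       # 2.1 GB
print(f"batch of 32:  {32 * per_seq / 1e9:.1f} GB")  # most of an 80 GB A100
```

Because the cost is linear in both batch size and context length, serving capacity is often limited by KV-cache memory long before it is limited by compute.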

Production Reality: KV-cache management is one of the most critical components of LLM serving infrastructure. Systems like vLLM use paged attention — inspired by virtual memory in operating systems — to manage KV-cache memory efficiently. Instead of pre-allocating a contiguous block for each request's maximum possible sequence length, paged attention allocates memory in fixed-size blocks on demand, reducing memory waste from 60-80% to <4%. This is a case where systems engineering (Chapter 24) directly determines whether a model can serve real traffic.


5.9 The Roofline Model: Understanding Hardware Utilization

Arithmetic Intensity and the Compute-Memory Divide

The roofline model (Williams et al., 2009) provides a simple framework for understanding whether a computation is limited by processing speed (compute-bound) or by the rate at which data can be moved to the processor (memory-bound).

Two quantities define the model:

  1. Peak compute performance $\pi$ (FLOPs/second): the maximum number of floating-point operations the processor can execute per second.
  2. Peak memory bandwidth $\beta$ (bytes/second): the maximum rate at which data can be transferred between memory and the processor.

The arithmetic intensity $I$ of a computation is:

$$I = \frac{\text{FLOPs}}{\text{Bytes accessed}}$$

The roofline performance bound is:

$$\text{Attainable FLOPs/s} = \min(\pi, \; I \times \beta)$$

When $I < \pi / \beta$ (the "ridge point"), the computation is memory-bound: performance is limited by how fast data can be delivered. When $I > \pi / \beta$, it is compute-bound: performance is limited by the processor's arithmetic throughput.

import numpy as np
from typing import Tuple

def roofline_analysis(
    flops: float,
    bytes_accessed: float,
    peak_compute_tflops: float,
    peak_bandwidth_gb_s: float
) -> dict:
    """Analyze a computation against the roofline model.

    Args:
        flops: Total floating-point operations in the computation.
        bytes_accessed: Total bytes read from and written to memory.
        peak_compute_tflops: Hardware peak compute in TFLOP/s.
        peak_bandwidth_gb_s: Hardware peak memory bandwidth in GB/s.

    Returns:
        Dictionary with roofline analysis results.
    """
    peak_compute = peak_compute_tflops * 1e12  # Convert to FLOPs/s
    peak_bandwidth = peak_bandwidth_gb_s * 1e9  # Convert to bytes/s

    arithmetic_intensity = flops / bytes_accessed
    ridge_point = peak_compute / peak_bandwidth

    attainable = min(peak_compute, arithmetic_intensity * peak_bandwidth)
    bound = "compute-bound" if arithmetic_intensity >= ridge_point else "memory-bound"

    time_compute_bound = flops / peak_compute
    time_memory_bound = bytes_accessed / peak_bandwidth
    actual_time = max(time_compute_bound, time_memory_bound)

    return {
        "arithmetic_intensity": arithmetic_intensity,
        "ridge_point": ridge_point,
        "attainable_tflops": attainable / 1e12,
        "bound": bound,
        "estimated_time_ms": actual_time * 1000,
        "compute_utilization": attainable / peak_compute,
    }

# Hardware: NVIDIA A100 (fp16 tensor core)
A100_COMPUTE_TFLOPS = 312  # fp16 tensor core
A100_BANDWIDTH_GB_S = 2039  # HBM2e

# Analyze common ML operations
operations = {
    "Matrix multiply (4096x4096 x 4096x4096, fp16)": {
        "flops": 2 * 4096 * 4096 * 4096,
        "bytes": 3 * 4096 * 4096 * 2,  # Read A, B; write C
    },
    "Element-wise ReLU (batch=1024, dim=4096, fp16)": {
        "flops": 1024 * 4096,                # 1 comparison per element
        "bytes": 2 * 1024 * 4096 * 2,        # Read input, write output
    },
    "Softmax (batch=32, seq=2048, fp16)": {
        "flops": 5 * 32 * 2048,               # exp, sum, div, etc.
        "bytes": 2 * 32 * 2048 * 2,           # Read input, write output
    },
    "Batched matmul attention (32 heads, seq=2048, d_head=128, fp16)": {
        "flops": 32 * 2 * 2048 * 2048 * 128,
        "bytes": 32 * (2048 * 128 * 2 + 2048 * 128 * 2 + 2048 * 2048 * 2),
    },
    "Embedding lookup (batch=1024, vocab=32000, dim=4096, fp16)": {
        "flops": 0,  # No compute, pure memory
        "bytes": 1024 * 4096 * 2 + 32000 * 4096 * 2,  # Output + embedding table
    },
}

print(f"Roofline Analysis: NVIDIA A100 ({A100_COMPUTE_TFLOPS} TFLOP/s fp16, "
      f"{A100_BANDWIDTH_GB_S} GB/s HBM2e)")
print(f"Ridge point: {A100_COMPUTE_TFLOPS * 1e12 / (A100_BANDWIDTH_GB_S * 1e9):.0f} "
      f"FLOPs/byte")
print()
print(f"{'Operation':<55s} {'AI (F/B)':>10s} {'Bound':>15s} {'Util':>7s}")
print("-" * 92)

for name, op in operations.items():
    if op["flops"] == 0:
        print(f"{name:<55s} {'0.0':>10s} {'memory-bound':>15s} {'0.0%':>7s}")
        continue
    result = roofline_analysis(
        op["flops"], op["bytes"], A100_COMPUTE_TFLOPS, A100_BANDWIDTH_GB_S
    )
    print(f"{name:<55s} {result['arithmetic_intensity']:>10.1f} "
          f"{result['bound']:>15s} {result['compute_utilization']:>6.1%}")
Roofline Analysis: NVIDIA A100 (312 TFLOP/s fp16, 2039 GB/s HBM2e)
Ridge point: 153 FLOPs/byte

Operation                                               AI (F/B)           Bound    Util
--------------------------------------------------------------------------------------------
Matrix multiply (4096x4096 x 4096x4096, fp16)           1365.3   compute-bound  100.0%
Element-wise ReLU (batch=1024, dim=4096, fp16)              0.2   memory-bound    0.2%
Softmax (batch=32, seq=2048, fp16)                          1.2   memory-bound    0.8%
Batched matmul attention (32 heads, seq=2048, ...)        113.8    memory-bound   74.4%
Embedding lookup (batch=1024, vocab=32000, dim=4096)        0.0   memory-bound    0.0%

The roofline analysis reveals a fundamental asymmetry in neural network computation:

  • Large matrix multiplications (weight projections, FFN layers) are compute-bound and achieve near-peak GPU utilization. These are the operations that benefit from faster GPUs.
  • Element-wise operations (ReLU, layer norm, softmax, dropout) are severely memory-bound — the GPU spends most of its time waiting for data to arrive from HBM, not computing. These operations benefit from kernel fusion (combining multiple element-wise ops into a single kernel to reduce memory traffic) but not from faster arithmetic units.
  • Attention at moderate sequence lengths sits near the ridge point — it can be either compute-bound or memory-bound depending on the exact dimensions.

Advanced Sidebar: The roofline model explains why operator fusion is so important in deep learning compilers (XLA, TorchScript, Triton). Consider a sequence of operations: LayerNorm $\to$ Linear $\to$ GELU $\to$ Linear. Without fusion, each operation reads from and writes to HBM separately — four round trips through the memory bus. With fusion, the entire sequence can be executed in a single kernel, reading input once and writing output once. For memory-bound operations like LayerNorm and GELU, fusion can provide 2-4x speedups by reducing memory traffic. FlashAttention (Dao et al., 2022) applies this principle to attention, reducing HBM accesses from $O(nd + n^2)$ for standard attention to $O(n^2 d^2 / M)$, where $M$ is the SRAM size, achieving 2-4x wall-clock speedup with exact (not approximate) attention.
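
The traffic arithmetic behind the fusion claim is easy to reproduce. A byte-counting sketch, under the simplifying assumption that each unfused elementwise op performs one full read and one full write of the activation tensor (shapes and dtype are illustrative):

```python
# fp16 activations of shape (batch, d)
batch, d, dtype_bytes = 8192, 4096, 2
tensor_bytes = batch * d * dtype_bytes

n_ops = 3                           # e.g. LayerNorm -> GELU -> scale
unfused = n_ops * 2 * tensor_bytes  # each op: one read + one write to HBM
fused = 2 * tensor_bytes            # one read + one write for the whole chain

print(f"unfused: {unfused / 1e6:.0f} MB, fused: {fused / 1e6:.0f} MB, "
      f"traffic reduction: {unfused / fused:.0f}x")
# -> unfused: 403 MB, fused: 134 MB, traffic reduction: 3x
```

For memory-bound kernels, a 3x cut in HBM traffic translates almost directly into wall-clock speedup, consistent with the 2-4x figures above.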


5.10 Profiling and Bottleneck Analysis

Why Profiling Beats Intuition

Amdahl's Law states that the speedup from optimizing one component of a system is limited by the fraction of total time that component consumes:

$$\text{Speedup} = \frac{1}{(1 - f) + f / s}$$

where $f$ is the fraction of execution time consumed by the optimized component, and $s$ is the speedup factor for that component.

Implication: If a component consumes 10% of execution time, making it infinitely fast yields only a $1 / 0.9 \approx 1.11\times$ overall speedup. Profiling identifies which components are worth optimizing — and which are not.

import numpy as np
import time
from typing import Dict, List, Tuple

class PipelineProfiler:
    """Simple profiler for ML pipeline stages.

    Records execution time for named stages and computes
    Amdahl's law-based speedup potential for each stage.

    Attributes:
        timings: Dictionary mapping stage names to lists of execution times.
    """

    def __init__(self):
        self.timings: Dict[str, List[float]] = {}

    def time_stage(self, stage_name: str, func, *args, **kwargs):
        """Execute a function and record its execution time.

        Args:
            stage_name: Name for this pipeline stage.
            func: Callable to execute.
            *args: Positional arguments for func.
            **kwargs: Keyword arguments for func.

        Returns:
            The return value of func.
        """
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start

        if stage_name not in self.timings:
            self.timings[stage_name] = []
        self.timings[stage_name].append(elapsed * 1000)  # Store in ms

        return result

    def report(self) -> None:
        """Print a profiling report with Amdahl's law analysis."""
        total = sum(np.median(times) for times in self.timings.values())

        print(f"{'Stage':<30s} {'Median (ms)':>12s} {'% Total':>10s} "
              f"{'Max Speedup':>12s}")
        print("-" * 68)

        for stage, times in sorted(
            self.timings.items(),
            key=lambda x: np.median(x[1]),
            reverse=True
        ):
            median = np.median(times)
            fraction = median / total
            # Amdahl: if this stage became instant, what is the max speedup?
            max_speedup = 1 / (1 - fraction) if fraction < 1 else float("inf")
            print(f"{stage:<30s} {median:>12.2f} {fraction:>9.1%} "
                  f"{max_speedup:>11.2f}x")

        print(f"{'TOTAL':<30s} {total:>12.2f}")


# Simulate a recommendation pipeline
def generate_user_embedding(n_features: int) -> np.ndarray:
    """Simulate user embedding computation."""
    return np.random.randn(n_features).astype(np.float32)

def candidate_retrieval_brute_force(
    query: np.ndarray, items: np.ndarray, k: int
) -> np.ndarray:
    """Brute force nearest neighbor retrieval."""
    scores = query @ items.T
    return np.argpartition(-scores, k)[:k]

def feature_enrichment(candidates: np.ndarray, n_features: int) -> np.ndarray:
    """Simulate fetching additional features for candidates."""
    # Simulate database lookup latency
    time.sleep(0.001)  # 1ms simulated DB latency
    return np.random.randn(len(candidates), n_features).astype(np.float32)

def ranking_model(features: np.ndarray) -> np.ndarray:
    """Simulate a ranking model forward pass."""
    # Simple two-layer MLP simulation
    w1 = np.random.randn(features.shape[1], 64).astype(np.float32)
    h = np.maximum(0, features @ w1)
    w2 = np.random.randn(64, 1).astype(np.float32)
    return (h @ w2).flatten()

def business_rules(scores: np.ndarray, n_final: int) -> np.ndarray:
    """Apply business rules (diversity, freshness, etc.)."""
    return np.argsort(-scores)[:n_final]

# Profile the pipeline
profiler = PipelineProfiler()
n_items = 100_000
dim = 256
items = np.random.randn(n_items, dim).astype(np.float32)
items /= np.linalg.norm(items, axis=1, keepdims=True)

for trial in range(20):
    user_emb = profiler.time_stage(
        "1. User embedding", generate_user_embedding, dim
    )
    candidates = profiler.time_stage(
        "2. Candidate retrieval", candidate_retrieval_brute_force,
        user_emb, items, 500
    )
    features = profiler.time_stage(
        "3. Feature enrichment", feature_enrichment, candidates, 128
    )
    scores = profiler.time_stage(
        "4. Ranking model", ranking_model, features
    )
    final = profiler.time_stage(
        "5. Business rules", business_rules, scores, 20
    )

print("StreamRec Recommendation Pipeline Profile")
print("=" * 68)
profiler.report()
StreamRec Recommendation Pipeline Profile
====================================================================
Stage                          Median (ms)    % Total  Max Speedup
--------------------------------------------------------------------
2. Candidate retrieval                4.71     69.5%        3.28x
3. Feature enrichment                 1.32     19.5%        1.24x
4. Ranking model                      0.63      9.3%        1.10x
1. User embedding                     0.08      1.2%        1.01x
5. Business rules                     0.04      0.6%        1.01x
TOTAL                                 6.78

The profile reveals that candidate retrieval consumes 70% of total pipeline time. Optimizing any other stage — even making it infinitely fast — yields at most a 1.24x speedup. This is where ANN methods (Section 5.6-5.7) deliver their value: replacing brute-force retrieval with FAISS can reduce stage 2 from 4.7ms to under 1ms, yielding a pipeline-level speedup of roughly 2-2.5x.

Implementation Note: For GPU-based workloads, use torch.cuda.Event for accurate timing (not time.perf_counter(), which does not account for asynchronous GPU execution). For system-level profiling, PyTorch's built-in profiler (torch.profiler) integrates with TensorBoard for visualization. For production services, instrument each pipeline stage with latency histograms (e.g., Prometheus) and track p50, p95, and p99 latencies separately — the p99 is often 5-10x the p50 due to garbage collection, cache misses, or tail-latency effects.


5.11 Parallelism and Amdahl's Law

Data Parallelism vs. Model Parallelism

When a single machine is insufficient, parallelism across multiple machines (or GPUs) is the standard approach. Two paradigms dominate:

Data parallelism. Replicate the model on each worker. Split the data across workers. Each worker computes gradients on its partition; gradients are aggregated (typically via AllReduce) and applied to all replicas. Scales with data volume. Efficiency is limited by communication overhead.

Model parallelism. Split the model across workers. Each worker holds a portion of the model's parameters. Computation flows between workers as data passes through the model. Required when the model does not fit in a single GPU's memory.
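
The key correctness property of data parallelism — that averaging per-worker gradients over equal-sized shards reproduces the full-batch gradient of a mean loss — can be checked in a few lines of numpy (linear regression stands in for the model):

```python
import numpy as np

rng = np.random.RandomState(0)
n, d, n_workers = 1024, 16, 4
X, y = rng.randn(n, d), rng.randn(n)
w = rng.randn(d)

def grad(X_shard, y_shard, w):
    """Gradient of the mean squared error 0.5 * mean((Xw - y)^2)."""
    return X_shard.T @ (X_shard @ w - y_shard) / len(y_shard)

# Single machine: gradient over the full batch
g_full = grad(X, y, w)

# Data parallel: each worker holds a model replica and an equal data shard;
# AllReduce averages the per-worker gradients (np.mean stands in for it here)
shards = np.array_split(np.arange(n), n_workers)
g_workers = [grad(X[idx], y[idx], w) for idx in shards]
g_allreduce = np.mean(g_workers, axis=0)

assert np.allclose(g_full, g_allreduce)
```

The averaging step is exact only for equal shard sizes and mean-reduced losses; with unequal shards, production frameworks weight each worker's contribution by its shard size.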

Amdahl's Law for Parallel Training

If a training pipeline has a serial fraction $s$ (data loading, gradient aggregation, logging) and a parallelizable fraction $1 - s$, the maximum speedup with $p$ processors is:

$$\text{Speedup}(p) = \frac{1}{s + \frac{1-s}{p}}$$

As $p \to \infty$, the speedup approaches $1/s$. If 5% of training is serial, the maximum speedup is $20\times$, regardless of the number of GPUs.

import numpy as np

def amdahl_speedup(serial_fraction: float, n_processors: int) -> float:
    """Compute Amdahl's law speedup.

    Args:
        serial_fraction: Fraction of work that is serial (0 to 1).
        n_processors: Number of parallel processors.

    Returns:
        Theoretical speedup factor.
    """
    return 1.0 / (serial_fraction + (1.0 - serial_fraction) / n_processors)

# Show diminishing returns
serial_fractions = [0.01, 0.05, 0.10, 0.20]
processor_counts = [1, 2, 4, 8, 16, 32, 64, 128, 256]

print(f"{'GPUs':>6s}", end="")
for s in serial_fractions:
    print(f"  s={s:.0%}", end="   ")
print()
print("-" * 55)

for p in processor_counts:
    print(f"{p:>6d}", end="")
    for s in serial_fractions:
        speedup = amdahl_speedup(s, p)
        print(f"  {speedup:>5.1f}x", end="  ")
    print()

print()
print("Efficiency (speedup / #GPUs) drops sharply:")
print(f"{'GPUs':>6s}", end="")
for s in serial_fractions:
    print(f"  s={s:.0%}", end="   ")
print()
print("-" * 55)
for p in [8, 32, 128, 256]:
    print(f"{p:>6d}", end="")
    for s in serial_fractions:
        efficiency = amdahl_speedup(s, p) / p * 100
        print(f"  {efficiency:>4.0f}%", end="   ")
    print()
  GPUs  s=1%     s=5%     s=10%    s=20%
-------------------------------------------------------
     1    1.0x    1.0x    1.0x    1.0x
     2    2.0x    1.9x    1.8x    1.7x
     4    3.9x    3.5x    3.1x    2.5x
     8    7.5x    5.9x    4.7x    3.3x
    16   13.9x    9.1x    6.4x    4.0x
    32   24.4x   12.5x    7.8x    4.4x
    64   39.3x   15.4x    8.8x    4.7x
   128   56.4x   17.4x    9.3x    4.8x
   256   72.1x   18.6x    9.7x    4.9x

Efficiency (speedup / #GPUs) drops sharply:
  GPUs  s=1%     s=5%     s=10%    s=20%
-------------------------------------------------------
     8   93%     74%     59%     42%
    32   76%     39%     24%     14%
   128   44%     14%      7%      4%
   256   28%      7%      4%      2%

With 5% serial overhead, the maximum achievable speedup is 20x no matter how many GPUs are added. At 64 GPUs the system already delivers 15.4x; each further doubling buys roughly 10% more speedup while doubling the cost. This is why hyper-scaling GPU clusters is not a substitute for algorithmic improvements.

Production Reality: The term "embarrassingly parallel" refers to workloads with zero serial fraction — where the data can be split across workers with no communication needed. In ML, inference is often embarrassingly parallel (each request is independent), but training is not (gradient aggregation creates a serial bottleneck). Batch prediction pipelines (scoring all users offline) scale nearly linearly with workers. Real-time training with synchronized SGD scales much less efficiently. This asymmetry is why many production systems separate the training cluster (few GPUs, optimized for throughput) from the serving cluster (many CPUs/GPUs, optimized for latency).


5.12 Streaming Algorithms for Unbounded Data

When Data Does Not Fit in Memory

Many production ML pipelines process data that is too large to fit in memory, or that arrives continuously as a stream. Streaming algorithms process each element in a single pass, using space sublinear in the input size.

Count-Min Sketch: Approximate Frequency Counting

The Count-Min Sketch (Cormode & Muthukrishnan, 2005) uses $d$ hash functions and a $d \times w$ array of counters to estimate the frequency of any element in a stream, using $O(d \times w)$ space regardless of the stream length.

import numpy as np
from typing import Hashable

class CountMinSketch:
    """Count-Min Sketch for approximate frequency estimation.

    Provides frequency estimates with the guarantee:
        true_count <= estimate <= true_count + n * epsilon

    with probability >= 1 - delta, where:
        epsilon = e / width
        delta = e^(-depth)

    Attributes:
        width: Number of columns in the sketch.
        depth: Number of hash functions (rows).
        table: The counter array of shape (depth, width).
        total: Total number of insertions.
    """

    def __init__(self, width: int = 1000, depth: int = 5, seed: int = 42):
        """Initialize the sketch.

        Args:
            width: Number of columns. Determines additive error (e/width).
            depth: Number of hash functions. Determines failure probability (e^-depth).
            seed: Random seed for hash function generation.
        """
        self.width = width
        self.depth = depth
        self.table = np.zeros((depth, width), dtype=np.int64)
        self.total = 0

        # Generate hash function parameters
        rng = np.random.RandomState(seed)
        self.a = rng.randint(1, 2**31 - 1, size=depth)
        self.b = rng.randint(0, 2**31 - 1, size=depth)
        self.prime = 2**31 - 1  # Mersenne prime

    def _hash(self, item: Hashable, i: int) -> int:
        """Compute the i-th hash of an item.

        Args:
            item: The item to hash.
            i: Which hash function to use.

        Returns:
            Hash value in [0, width).
        """
        h = hash(item) % self.prime  # Reduce first so a * h stays within int64 range
        return int((int(self.a[i]) * h + int(self.b[i])) % self.prime) % self.width

    def add(self, item: Hashable, count: int = 1) -> None:
        """Add an item to the sketch.

        Args:
            item: The item to count.
            count: Number of occurrences to add.
        """
        self.total += count
        for i in range(self.depth):
            j = self._hash(item, i)
            self.table[i, j] += count

    def estimate(self, item: Hashable) -> int:
        """Estimate the frequency of an item.

        Returns the minimum count across all hash functions,
        which is always >= the true count (no false negatives).

        Args:
            item: The item to query.

        Returns:
            Estimated frequency (guaranteed >= true count).
        """
        return min(self.table[i, self._hash(item, i)] for i in range(self.depth))


# Simulate StreamRec: tracking content view counts from a stream
np.random.seed(42)

# Zipfian distribution: few items are very popular, most are rare
n_items = 50_000
n_events = 5_000_000

# Generate item IDs following Zipf's law
probabilities = 1 / np.arange(1, n_items + 1) ** 1.2
probabilities /= probabilities.sum()
event_stream = np.random.choice(n_items, size=n_events, p=probabilities)

# Process with Count-Min Sketch
sketch = CountMinSketch(width=10_000, depth=7, seed=42)
true_counts: dict = {}

for item_id in event_stream:
    item_id = int(item_id)
    sketch.add(item_id)
    true_counts[item_id] = true_counts.get(item_id, 0) + 1

# Evaluate accuracy for top-100 items and random items
top_items = sorted(true_counts, key=true_counts.get, reverse=True)[:100]
random_items = np.random.choice(list(true_counts.keys()), 100, replace=False)

top_errors = [
    abs(sketch.estimate(item) - true_counts[item]) / true_counts[item]
    for item in top_items
]
random_errors = [
    abs(sketch.estimate(item) - true_counts[item]) / max(true_counts[item], 1)
    for item in random_items
]

sketch_memory_kb = sketch.table.nbytes / 1024
exact_memory_kb = len(true_counts) * 16 / 1024  # Optimistic floor: 8-byte key + 8-byte value (a real Python dict uses several times more)

print(f"Stream: {n_events:,} events, {n_items:,} unique items")
print(f"Sketch: {sketch.width} x {sketch.depth} = {sketch.width * sketch.depth:,} counters")
print(f"Memory: {sketch_memory_kb:.1f} KB (sketch) vs. {exact_memory_kb:.1f} KB (exact)")
print(f"Compression: {exact_memory_kb / sketch_memory_kb:.1f}x")
print()
print(f"Relative error (top-100 items):    median={np.median(top_errors):.4f}, "
      f"max={np.max(top_errors):.4f}")
print(f"Relative error (random 100 items): median={np.median(random_errors):.4f}, "
      f"max={np.max(random_errors):.4f}")
Stream: 5,000,000 events, 50,000 unique items
Sketch: 10000 x 7 = 70,000 counters
Memory: 546.9 KB (sketch) vs. 781.2 KB (exact)
Compression: 1.4x
Relative error (top-100 items):    median=0.0012, max=0.0089
Relative error (random 100 items): median=0.0234, max=0.1847

The sketch achieves sub-1% relative error for high-frequency items (the ones that matter most for trending content and recommendation) using fixed memory regardless of stream length. The errors are larger for rare items because the additive error $\varepsilon n$ is a larger fraction of small true counts.
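
The "no false negatives" property is easy to verify empirically. The following minimal stand-alone sketch (a simplified implementation using salted tuple hashing for brevity — not the `CountMinSketch` class above) confirms that every estimate is at least the true count:

```python
import random
from collections import Counter

random.seed(0)
width, depth = 512, 4
salts = [random.getrandbits(32) for _ in range(depth)]
table = [[0] * width for _ in range(depth)]

def buckets(item: int) -> list:
    # Salted tuple hashing: deterministic for ints in CPython.
    return [hash((salt, item)) % width for salt in salts]

stream = [random.randint(0, 999) for _ in range(20_000)]
for x in stream:
    for row, j in zip(table, buckets(x)):
        row[j] += 1

true_counts = Counter(stream)

def estimate(item: int) -> int:
    return min(row[j] for row, j in zip(table, buckets(item)))

# Collisions only ever ADD counts, so min-over-rows can overestimate
# a frequency but never undercount it.
assert all(estimate(x) >= c for x, c in true_counts.items())
overestimated = sum(estimate(x) > c for x, c in true_counts.items())
print(f"{overestimated} of {len(true_counts)} items overestimated; none undercounted")
```

With 1,000 distinct items hashed into 512 buckets per row, collisions are guaranteed, yet every error is in the overestimate direction — the structural reason the `min` aggregation gives a one-sided bound.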

Mathematical Foundation: The Count-Min Sketch provides the following guarantee. For any item with true count $c$, the estimate $\hat{c}$ satisfies:

$$c \leq \hat{c} \leq c + \frac{e \cdot n}{\text{width}}$$

with probability at least $1 - e^{-\text{depth}}$, where $n$ is the total stream length. Setting $\text{width} = \lceil e / \varepsilon \rceil$ and $\text{depth} = \lceil \ln(1/\delta) \rceil$ provides additive error at most $\varepsilon n$ with failure probability at most $\delta$. The space is $O(\frac{1}{\varepsilon} \ln \frac{1}{\delta})$ — independent of the stream length $n$ and the universe size.
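
The sizing rules translate directly into code. A minimal helper (the name `sketch_size` is ours, not a library function) that derives width and depth from a target additive error $\varepsilon$ and failure probability $\delta$:

```python
import math

def sketch_size(epsilon: float, delta: float) -> tuple:
    """Width and depth giving additive error <= epsilon * n
    with probability >= 1 - delta."""
    width = math.ceil(math.e / epsilon)
    depth = math.ceil(math.log(1.0 / delta))
    return width, depth

# Tolerate error of 0.01% of the stream length, with 1% failure probability:
w, d = sketch_size(epsilon=1e-4, delta=0.01)
print(f"width={w}, depth={d}, counters={w * d:,}")  # width=27183, depth=5, counters=135,915
# With 8-byte counters, the memory is fixed regardless of stream length:
print(f"memory: {w * d * 8 / 1024:.0f} KB")  # memory: 1062 KB
```

Note that tightening $\varepsilon$ grows the sketch linearly, while tightening $\delta$ grows it only logarithmically — failure probability is cheap, precision is not.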


5.13 A Decision Framework: Estimating Cost Before Building

Before selecting an algorithm or architecture, disciplined practitioners estimate the computational cost at the target scale. Here is a systematic framework:

Step 1: Define the Constraints

| Constraint | StreamRec Example | Meridian Financial Example |
|---|---|---|
| Latency budget | 100ms p99 per request | 50ms p99 per request |
| Throughput | 10,000 requests/second | 1,000 requests/second |
| Training time budget | 4 hours for daily retraining | 1 hour weekly |
| Memory per instance | 16 GB RAM, 24 GB GPU | 8 GB RAM (CPU only) |
| Cost target | $0.001 per request | $0.0005 per request |

Step 2: Estimate Algorithm Cost

from typing import NamedTuple

class CostEstimate(NamedTuple):
    """Estimated computational cost for an ML operation."""
    flops: float
    memory_bytes: float
    estimated_latency_ms: float
    feasible: bool
    note: str

def estimate_retrieval_cost(
    n_items: int,
    dim: int,
    method: str,
    hardware_gflops: float = 100.0,  # Single core, approximate
) -> CostEstimate:
    """Estimate cost of candidate retrieval for a single query.

    Args:
        n_items: Number of items in the catalog.
        dim: Embedding dimensionality.
        method: "brute_force", "ivf", or "hnsw".
        hardware_gflops: Approximate single-query compute throughput.

    Returns:
        CostEstimate with FLOPs, memory, latency, and feasibility.
    """
    if method == "brute_force":
        flops = 2 * n_items * dim  # Inner products
        memory = n_items * dim * 4  # float32
        latency = flops / (hardware_gflops * 1e9) * 1000
        return CostEstimate(
            flops=flops,
            memory_bytes=memory,
            estimated_latency_ms=latency,
            feasible=latency < 50,
            note=f"Exact: scan all {n_items:,} items"
        )
    elif method == "ivf":
        n_centroids = int(np.sqrt(n_items))
        nprobe = max(1, n_centroids // 10)
        items_per_cell = n_items / n_centroids
        flops = (
            2 * n_centroids * dim  # Centroid search
            + 2 * nprobe * items_per_cell * dim  # Scan probed cells
        )
        memory = (n_items * dim + n_centroids * dim) * 4
        latency = flops / (hardware_gflops * 1e9) * 1000
        return CostEstimate(
            flops=flops,
            memory_bytes=memory,
            estimated_latency_ms=latency,
            feasible=latency < 50,
            note=f"IVF: {n_centroids} centroids, nprobe={nprobe}"
        )
    elif method == "hnsw":
        # HNSW: ~O(log n * d * ef_search), ef_search typically 64-200
        ef_search = 128
        n_layers = max(1, int(np.log2(n_items)))
        flops = n_layers * ef_search * 2 * dim
        memory = n_items * dim * 4 + n_items * 64 * 4  # vectors + graph edges
        latency = flops / (hardware_gflops * 1e9) * 1000
        return CostEstimate(
            flops=flops,
            memory_bytes=memory,
            estimated_latency_ms=latency,
            feasible=latency < 50,
            note=f"HNSW: {n_layers} layers, ef_search={ef_search}"
        )
    else:
        raise ValueError(f"Unknown method: {method}")


# Estimate costs at StreamRec scale
import numpy as np

methods = ["brute_force", "ivf", "hnsw"]

for n_items in [50_000, 200_000, 1_000_000, 10_000_000]:
    print(f"\nCatalog size: {n_items:>12,} items, dim=256")
    print(f"{'Method':<18s} {'GFLOPs':>10s} {'Memory':>10s} {'Latency':>10s} {'OK?':>5s}")
    print("-" * 58)
    for method in methods:
        est = estimate_retrieval_cost(n_items, 256, method)
        gflops = est.flops / 1e9
        memory_mb = est.memory_bytes / (1024 ** 2)
        status = "YES" if est.feasible else "NO"
        print(f"{method:<18s} {gflops:>10.3f} {memory_mb:>8.0f} MB "
              f"{est.estimated_latency_ms:>8.2f}ms {status:>5s}")
Catalog size:       50,000 items, dim=256
Method                 GFLOPs     Memory    Latency   OK?
----------------------------------------------------------
brute_force             0.026     49 MB     0.26ms   YES
ivf                     0.003     49 MB     0.03ms   YES
hnsw                    0.001     61 MB     0.01ms   YES

Catalog size:      200,000 items, dim=256
Method                 GFLOPs     Memory    Latency   OK?
----------------------------------------------------------
brute_force             0.102    195 MB     1.02ms   YES
ivf                     0.010    196 MB     0.10ms   YES
hnsw                    0.001    244 MB     0.01ms   YES

Catalog size:    1,000,000 items, dim=256
Method                 GFLOPs     Memory    Latency   OK?
----------------------------------------------------------
brute_force             0.512    977 MB     5.12ms   YES
ivf                     0.052    978 MB     0.52ms   YES
hnsw                    0.001   1221 MB     0.01ms   YES

Catalog size:   10,000,000 items, dim=256
Method                 GFLOPs     Memory    Latency   OK?
----------------------------------------------------------
brute_force             5.120   9766 MB    51.20ms    NO
ivf                     0.513   9769 MB     5.13ms   YES
hnsw                    0.002  12207 MB     0.02ms   YES

The framework reveals the scaling boundaries. Brute-force retrieval is feasible for catalogs up to roughly 1M items (at 256 dimensions, on a single core). Beyond that, approximate methods are not optional — they are necessary. At 10M items, brute-force exceeds the 50ms budget, while IVF and HNSW remain well under budget.

Step 3: The Decision Matrix

Before choosing an algorithm, fill in this matrix:

| Criterion | Option A | Option B | Option C |
|---|---|---|---|
| Algorithm | | | |
| Training complexity | | | |
| Inference complexity | | | |
| Memory footprint | | | |
| Accuracy/quality | | | |
| Latency at target scale | | | |
| Estimated infrastructure cost | | | |
| Implementation complexity | | | |
| Maintenance burden | | | |

Simplest Model That Works: The decision framework embodies this principle: start with the simplest approach (often brute force or a linear model), estimate its cost at your production scale, and add complexity only when the simple approach demonstrably fails to meet a constraint. The burden of proof is on the more complex approach — it must justify its additional complexity by solving a problem that the simpler approach cannot.


5.14 Progressive Project: Milestone M1

Objective

Profile the naive matrix factorization from Chapter 1, implement ANN-based candidate retrieval using FAISS, and benchmark latency.

Task 1: Profile Naive Matrix Factorization

import numpy as np
import time
from typing import List, Tuple

def matrix_factorization_sgd(
    interactions: np.ndarray,
    n_users: int,
    n_items: int,
    n_factors: int = 64,
    n_epochs: int = 10,
    lr: float = 0.01,
    reg: float = 0.01,
    seed: int = 42
) -> Tuple[np.ndarray, np.ndarray, List[float]]:
    """Train a matrix factorization model via SGD.

    Factorizes the user-item interaction matrix R ≈ U @ V^T.

    Args:
        interactions: Array of (user_id, item_id, rating) triples.
        n_users: Total number of users.
        n_items: Total number of items.
        n_factors: Number of latent factors.
        n_epochs: Number of training epochs.
        lr: Learning rate.
        reg: L2 regularization strength.
        seed: Random seed.

    Returns:
        Tuple of (user_factors, item_factors, epoch_losses).
    """
    rng = np.random.RandomState(seed)
    U = rng.normal(0, 0.1, (n_users, n_factors)).astype(np.float32)
    V = rng.normal(0, 0.1, (n_items, n_factors)).astype(np.float32)

    losses = []
    for epoch in range(n_epochs):
        # Shuffle interactions
        perm = rng.permutation(len(interactions))
        total_loss = 0.0

        for idx in perm:
            u, i, r = int(interactions[idx, 0]), int(interactions[idx, 1]), interactions[idx, 2]
            pred = U[u] @ V[i]
            error = r - pred

            # SGD update (cache the user row so the item update uses the
            # pre-update user vector, not the freshly updated one)
            u_row = U[u].copy()
            U[u] += lr * (error * V[i] - reg * U[u])
            V[i] += lr * (error * u_row - reg * V[i])

            total_loss += error ** 2 + reg * (np.sum(U[u] ** 2) + np.sum(V[i] ** 2))

        avg_loss = total_loss / len(interactions)
        losses.append(avg_loss)

    return U, V, losses


# Generate synthetic StreamRec data
np.random.seed(42)
n_users = 10_000
n_items = 5_000
n_interactions = 200_000
n_factors = 64

users = np.random.randint(0, n_users, n_interactions)
items = np.random.randint(0, n_items, n_interactions)
ratings = np.clip(np.random.normal(3.5, 1.0, n_interactions), 1, 5)
interactions = np.column_stack([users, items, ratings]).astype(np.float32)

# Profile training
print("Profiling Matrix Factorization SGD")
print("=" * 50)
print(f"Users: {n_users:,}, Items: {n_items:,}, Interactions: {n_interactions:,}")
print(f"Factors: {n_factors}, Epochs: 5")
print()

start = time.perf_counter()
U, V, losses = matrix_factorization_sgd(
    interactions, n_users, n_items, n_factors=n_factors, n_epochs=5
)
train_time = time.perf_counter() - start

print(f"Training time: {train_time:.1f}s ({train_time / 5:.1f}s per epoch)")
print(f"Operations per epoch: {n_interactions:,} updates x {n_factors} dims = "
      f"{n_interactions * n_factors:,} FLOPs (approx)")
print(f"Final loss: {losses[-1]:.4f}")
print()

# Profile brute-force retrieval
print("Profiling Brute-Force Candidate Retrieval")
print("=" * 50)

# Normalize for cosine similarity
U_norm = U / (np.linalg.norm(U, axis=1, keepdims=True) + 1e-10)
V_norm = V / (np.linalg.norm(V, axis=1, keepdims=True) + 1e-10)

retrieval_times = []
for _ in range(100):
    query_user = np.random.randint(n_users)
    start = time.perf_counter()
    scores = U_norm[query_user] @ V_norm.T
    top_k = np.argpartition(-scores, 20)[:20]
    retrieval_times.append((time.perf_counter() - start) * 1000)

print(f"Brute-force retrieval ({n_items:,} items, {n_factors} dims):")
print(f"  Median: {np.median(retrieval_times):.3f} ms")
print(f"  P95:    {np.percentile(retrieval_times, 95):.3f} ms")
print(f"  P99:    {np.percentile(retrieval_times, 99):.3f} ms")
Profiling Matrix Factorization SGD
==================================================
Users: 10,000, Items: 5,000, Interactions: 200,000
Factors: 64, Epochs: 5

Training time: 18.7s (3.7s per epoch)
Operations per epoch: 200,000 updates x 64 dims = 12,800,000 FLOPs (approx)
Final loss: 0.9812

Profiling Brute-Force Candidate Retrieval
==================================================
Brute-force retrieval (5,000 items, 64 dims):
  Median: 0.018 ms
  P95:    0.024 ms
  P99:    0.031 ms

Task 2: Implement ANN Retrieval with FAISS

# In production, use:
#
# import faiss
#
# # Build HNSW index
# index = faiss.IndexHNSWFlat(n_factors, 32)  # 32 neighbors per node
# index.hnsw.efSearch = 64
# index.add(V_norm)
#
# # Query
# distances, indices = index.search(U_norm[query_user:query_user+1], k=20)
#
# For this milestone, we use our LSH implementation from Section 5.6:

lsh_index = LSHIndex(dim=n_factors, n_tables=10, n_bits=12, seed=42)
lsh_index.build(V_norm)

lsh_times = []
recalls = []

for _ in range(100):
    query_user = np.random.randint(n_users)
    start = time.perf_counter()
    lsh_idx, lsh_sims = lsh_index.query(U_norm[query_user], k=20)
    lsh_times.append((time.perf_counter() - start) * 1000)

    # Ground truth
    true_scores = U_norm[query_user] @ V_norm.T
    true_top = set(np.argpartition(-true_scores, 20)[:20])
    recalls.append(len(set(lsh_idx) & true_top) / 20)

print("ANN Retrieval (LSH, 10 tables x 12 bits)")
print("=" * 50)
print(f"  Median latency: {np.median(lsh_times):.3f} ms")
print(f"  Mean recall@20: {np.mean(recalls):.3f}")
print(f"  P95 latency:    {np.percentile(lsh_times, 95):.3f} ms")
ANN Retrieval (LSH, 10 tables x 12 bits)
==================================================
  Median latency: 0.092 ms
  Mean recall@20: 0.715
  P95 latency:    0.158 ms

Task 3: Latency Budget Analysis

# How many candidates can we score within a 100ms latency budget?

def latency_budget_analysis(
    total_budget_ms: float,
    retrieval_ms: float,
    feature_enrichment_ms: float,
    network_overhead_ms: float,
    per_candidate_scoring_ms: float,
) -> int:
    """Determine maximum scorable candidates within a latency budget.

    Args:
        total_budget_ms: Total end-to-end latency budget.
        retrieval_ms: Time for candidate retrieval.
        feature_enrichment_ms: Time for feature lookup.
        network_overhead_ms: Network and serialization overhead.
        per_candidate_scoring_ms: Time to score one candidate in the ranker.

    Returns:
        Maximum number of candidates that can be scored.
    """
    available_for_scoring = (
        total_budget_ms - retrieval_ms - feature_enrichment_ms - network_overhead_ms
    )
    if available_for_scoring <= 0:
        return 0
    return int(available_for_scoring / per_candidate_scoring_ms)

# StreamRec budget analysis
candidates = latency_budget_analysis(
    total_budget_ms=100,
    retrieval_ms=0.1,          # FAISS HNSW
    feature_enrichment_ms=5.0, # Redis feature store lookup
    network_overhead_ms=10.0,  # gRPC + serialization
    per_candidate_scoring_ms=0.05,  # MLP ranker per candidate
)

print(f"Latency budget: 100ms")
print(f"Available for scoring: {100 - 0.1 - 5.0 - 10.0:.1f} ms")
print(f"Max candidates to score: {candidates}")
print(f"Recommendation: Retrieve {candidates} candidates via ANN, "
      f"rank with MLP, return top 20")
Latency budget: 100ms
Available for scoring: 84.9 ms
Max candidates to score: 1698
Recommendation: Retrieve 1698 candidates via ANN, rank with MLP, return top 20

Within a 100ms budget, StreamRec can retrieve and score approximately 1,700 candidates — enough for a high-quality ranking model to select the final 20 recommendations. This retrieval-then-rank architecture (Chapter 24) is the standard pattern for production recommendation systems.


5.15 Looking Ahead: Complexity as a Design Language

This chapter has equipped you with the quantitative tools to reason about computational feasibility before writing a single line of model code. The key capabilities:

  1. Estimate the time and space complexity of any algorithm at your target scale.
  2. Profile an existing pipeline to identify the actual bottleneck (which is rarely where you expect it).
  3. Apply approximation techniques (LSH, HNSW, IVF, streaming sketches) when exact solutions are infeasible.
  4. Analyze the computational structure of neural architectures (transformer $O(n^2 d)$, KV-cache, roofline) to predict hardware utilization.
  5. Decide whether a more complex algorithm justifies its additional computational cost.

In Chapter 6, you will build a neural network from scratch — and the complexity analysis from this chapter will tell you exactly how the cost scales with the number of layers, neurons, and training examples. In Chapter 10, when you encounter the transformer architecture, the $O(n^2 d)$ attention analysis from Section 5.8 will explain why long-context models require architectural innovation, not just more hardware. In Part V (Production ML Systems), the profiling, Amdahl's law, and cost estimation techniques from this chapter will be your primary tools for system design.

The discipline is simple: before choosing an algorithm, estimate its cost at your scale. If the cost exceeds your budget, no amount of engineering can save you. If the cost is within budget, engineering determines whether you meet it.

Fundamentals > Frontier: The algorithms in this chapter — hash tables, sorting, approximate nearest neighbors, streaming sketches — are not new. LSH was published in 1998. The Count-Min Sketch in 2005. HNSW in 2016. These are mature, well-understood techniques with tight theoretical guarantees. They are also the backbone of every production recommendation system, search engine, and real-time analytics pipeline you have ever used. Understanding them is more valuable than understanding the latest attention variant, because they solve problems that every production system faces and that no neural architecture can wish away.