Case Study 1: StreamRec RAG — Natural Language Content Search

Context

StreamRec's recommendation team has built a content catalog with 200,000 items — articles, videos, podcasts, and interactive features. Each item has a title, a text description (20-150 words), a category label, release date metadata, and engagement metrics. In Chapter 8, the team built 1D CNN embeddings to capture item-level semantics. In this case study, they deploy a RAG-based natural language search system that lets users query the catalog conversationally.

The motivation is clear: keyword search fails on intent. A user who types "something to watch with my kids about space" gets no results because no item description contains that exact phrase. The catalog has dozens of relevant items — children's astronomy shows, family-friendly sci-fi, NASA documentaries — but the keyword index cannot connect the user's intent to the available content.

The team proposes a RAG system: embed the entire catalog into a vector space, retrieve the most relevant items for any natural language query, and use an LLM to generate a structured response summarizing the retrieved items with explanations of why each matches the query.

Data Preparation and Chunking

The first design decision is chunking strategy. Each catalog item is relatively short (a title plus a description), so the team treats each item as a single chunk rather than splitting items across multiple chunks. This ensures that retrieval returns complete items, not fragments.

For items with longer descriptions (reviews, editorial summaries), the team concatenates the title, category, and description into a single text block, capping at 256 tokens. Metadata (release year, content rating, average engagement score) is appended as structured text to enable metadata-aware retrieval.

import numpy as np
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass, field


@dataclass
class CatalogItem:
    """A StreamRec content catalog item."""
    item_id: str
    title: str
    description: str
    category: str
    year: int
    content_rating: str
    avg_engagement: float  # 0-1 scale

    def to_search_text(self) -> str:
        """Combine fields into a single searchable text block."""
        return (
            f"Title: {self.title}. "
            f"Category: {self.category}. "
            f"Year: {self.year}. "
            f"Rating: {self.content_rating}. "
            f"Description: {self.description}"
        )


def build_synthetic_catalog(n_items: int = 5000, seed: int = 42) -> List[CatalogItem]:
    """Generate a synthetic StreamRec catalog for demonstration.

    Creates items across 10 categories with characteristic vocabulary
    and metadata distributions.

    Args:
        n_items: Number of items to generate.
        seed: Random seed for reproducibility.

    Returns:
        List of CatalogItem objects.
    """
    rng = np.random.RandomState(seed)

    categories = [
        "documentary", "drama", "comedy", "sci-fi", "thriller",
        "children", "news", "sports", "music", "education",
    ]
    ratings = ["G", "PG", "PG-13", "R"]

    # Category-specific description templates
    templates = {
        "documentary": [
            "An in-depth exploration of {topic} featuring interviews with experts.",
            "This documentary examines {topic} through archival footage and analysis.",
            "A visual journey into {topic} with stunning cinematography.",
        ],
        "drama": [
            "A gripping story about {topic} set in {setting}.",
            "This drama follows characters navigating {topic} in {setting}.",
            "An emotional portrait of {topic} spanning decades in {setting}.",
        ],
        "comedy": [
            "A hilarious take on {topic} featuring an ensemble cast.",
            "This comedy explores {topic} with sharp wit and humor.",
            "A lighthearted look at {topic} perfect for family viewing.",
        ],
        "sci-fi": [
            "Set in {setting}, this sci-fi epic explores {topic}.",
            "A futuristic thriller about {topic} in {setting}.",
            "This science fiction series imagines {topic} in {setting}.",
        ],
        "children": [
            "An animated adventure about {topic} for young viewers.",
            "This colorful series teaches children about {topic}.",
            "A fun and educational show exploring {topic} for kids.",
        ],
    }
    # Simplified: use generic templates for remaining categories
    for cat in categories:
        if cat not in templates:
            templates[cat] = [
                f"A {cat} program exploring {{topic}} in {{setting}}.",
                f"This {cat} series covers {{topic}} with expert commentary.",
            ]

    topics = [
        "climate change", "space exploration", "ocean ecosystems",
        "artificial intelligence", "ancient civilizations", "modern art",
        "urban development", "family dynamics", "political intrigue",
        "musical innovation", "athletic achievement", "scientific discovery",
        "wildlife conservation", "culinary traditions", "technological disruption",
        "social justice", "renewable energy", "human migration",
    ]
    settings = [
        "a small coastal town", "a futuristic metropolis", "the Amazon rainforest",
        "rural Appalachia", "a space station orbiting Mars", "1920s Paris",
        "modern-day Tokyo", "a post-apocalyptic world", "Silicon Valley",
    ]

    items = []
    for i in range(n_items):
        cat = categories[rng.randint(len(categories))]
        topic = topics[rng.randint(len(topics))]
        setting = settings[rng.randint(len(settings))]
        template_list = templates[cat]
        desc = template_list[rng.randint(len(template_list))].format(
            topic=topic, setting=setting
        )
        year = rng.randint(2010, 2026)
        rating = ratings[rng.randint(len(ratings))]
        engagement = rng.beta(2, 5)  # Right-skewed engagement distribution

        items.append(CatalogItem(
            item_id=f"SR-{i:05d}",
            title=f"{topic.title()} {''.join(rng.choice(list('ABCDEFGHIJKLMNOPQRSTUVWXYZ'), 2))}",
            description=desc,
            category=cat,
            year=year,
            content_rating=rating,
            avg_engagement=round(engagement, 3),
        ))
    return items


# Build the catalog
catalog = build_synthetic_catalog(n_items=5000)
print(f"Catalog size: {len(catalog)} items")
print(f"Categories: {set(item.category for item in catalog)}")
print(f"\nSample item:")
print(f"  ID: {catalog[0].item_id}")
print(f"  Title: {catalog[0].title}")
print(f"  Category: {catalog[0].category}")
print(f"  Year: {catalog[0].year}")
print(f"  Description: {catalog[0].description}")
print(f"  Search text: {catalog[0].to_search_text()}")
Catalog size: 5000 items
Categories: {'comedy', 'drama', 'documentary', 'sci-fi', 'children', 'news', 'sports', 'music', 'education', 'thriller'}

Sample item:
  ID: SR-00000
  Title: Climate Change LK
  Category: documentary
  Year: 2015
  Description: An in-depth exploration of climate change featuring interviews with experts.
  Search text: Title: Climate Change LK. Category: documentary. Year: 2015. Rating: PG. Description: An in-depth exploration of climate change featuring interviews with experts.

Building the Vector Index

The team uses a sentence transformer embedding model to encode each item's search text into a 384-dimensional dense vector. These embeddings capture semantic meaning: items about similar topics cluster together in the embedding space, regardless of exact wording.

class CatalogEmbedder:
    """Embed catalog items for vector search.

    Uses a simplified random-projection model for demonstration.
    In production, replace with sentence-transformers:
        from sentence_transformers import SentenceTransformer
        model = SentenceTransformer('BAAI/bge-small-en-v1.5')
        embeddings = model.encode(texts, normalize_embeddings=True)

    Args:
        dim: Embedding dimension.
        seed: Random seed.
    """

    def __init__(self, dim: int = 384, seed: int = 42) -> None:
        self.dim = dim
        self.rng = np.random.RandomState(seed)
        self.vocab: Dict[str, int] = {}
        self.projection: Optional[np.ndarray] = None

    def fit(self, texts: List[str]) -> "CatalogEmbedder":
        """Build vocabulary and projection matrix from texts."""
        for text in texts:
            for word in text.lower().split():
                token = word.strip(".,;:!?()[]\"'")
                if token and token not in self.vocab:
                    self.vocab[token] = len(self.vocab)
        vocab_size = len(self.vocab)
        self.projection = self.rng.randn(vocab_size, self.dim) / np.sqrt(self.dim)
        return self

    def encode(self, texts: List[str]) -> np.ndarray:
        """Encode texts to normalized dense vectors."""
        embeddings = np.zeros((len(texts), self.dim))
        for i, text in enumerate(texts):
            bow = np.zeros(len(self.vocab))
            for word in text.lower().split():
                token = word.strip(".,;:!?()[]\"'")
                if token in self.vocab:
                    bow[self.vocab[token]] += 1
            if np.sum(bow) > 0:
                bow /= np.sum(bow)
            embeddings[i] = bow @ self.projection
        # L2 normalize
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        embeddings = embeddings / np.maximum(norms, 1e-8)
        return embeddings


class CatalogSearchEngine:
    """Vector search engine for the StreamRec catalog.

    Supports embedding-based retrieval with optional metadata filtering.

    In production, replace the brute-force search with FAISS:
        import faiss
        index = faiss.IndexFlatIP(dim)  # Inner product (cosine on normalized vectors)
        index.add(embeddings)
        scores, indices = index.search(query_embedding, top_k)

    Args:
        dim: Embedding dimension.
    """

    def __init__(self, dim: int = 384) -> None:
        self.dim = dim
        self.embeddings: Optional[np.ndarray] = None
        self.items: List[CatalogItem] = []

    def index(self, items: List[CatalogItem], embeddings: np.ndarray) -> None:
        """Add items and their embeddings to the index."""
        self.items = items
        self.embeddings = embeddings

    def search(
        self,
        query_embedding: np.ndarray,
        top_k: int = 10,
        category_filter: Optional[str] = None,
        min_year: Optional[int] = None,
        max_year: Optional[int] = None,
    ) -> List[Tuple[CatalogItem, float]]:
        """Retrieve top-k items by cosine similarity with optional filters.

        Args:
            query_embedding: Normalized query vector of shape (dim,).
            top_k: Number of results.
            category_filter: Optional category to filter by.
            min_year: Optional minimum release year.
            max_year: Optional maximum release year.

        Returns:
            List of (item, similarity_score) tuples, sorted by score.
        """
        similarities = self.embeddings @ query_embedding

        # Apply metadata filters
        mask = np.ones(len(self.items), dtype=bool)
        for i, item in enumerate(self.items):
            if category_filter and item.category != category_filter:
                mask[i] = False
            if min_year and item.year < min_year:
                mask[i] = False
            if max_year and item.year > max_year:
                mask[i] = False

        # Mask out filtered items
        filtered_sims = np.where(mask, similarities, -np.inf)
        top_indices = np.argsort(filtered_sims)[::-1][:top_k]

        return [
            (self.items[idx], float(similarities[idx]))
            for idx in top_indices
            if mask[idx]
        ]


# Build the search engine
search_texts = [item.to_search_text() for item in catalog]
embedder = CatalogEmbedder(dim=384)
embedder.fit(search_texts)
item_embeddings = embedder.encode(search_texts)

engine = CatalogSearchEngine(dim=384)
engine.index(catalog, item_embeddings)

# Test queries
queries = [
    "documentaries about climate change from the last few years",
    "funny shows for a family movie night",
    "science fiction set in space",
    "educational content about artificial intelligence for beginners",
]

for query in queries:
    query_emb = embedder.encode([query])[0]
    results = engine.search(query_emb, top_k=3)
    print(f"\nQuery: '{query}'")
    for item, score in results:
        print(f"  [{score:.3f}] {item.title} ({item.category}, {item.year})")
Query: 'documentaries about climate change from the last few years'
  [0.523] Climate Change LK (documentary, 2015)
  [0.489] Renewable Energy OA (documentary, 2022)
  [0.451] Ocean Ecosystems PW (documentary, 2020)

Query: 'funny shows for a family movie night'
  [0.412] Family Dynamics QR (comedy, 2021)
  [0.398] Musical Innovation TX (comedy, 2019)
  [0.371] Social Justice BN (comedy, 2023)

Query: 'science fiction set in space'
  [0.534] Space Exploration MJ (sci-fi, 2024)
  [0.501] Scientific Discovery FG (sci-fi, 2018)
  [0.478] Technological Disruption KL (sci-fi, 2022)

Query: 'educational content about artificial intelligence for beginners'
  [0.487] Artificial Intelligence RV (education, 2023)
  [0.462] Technological Disruption HS (education, 2021)
  [0.441] Scientific Discovery WC (education, 2024)

Response Generation

After retrieval, the system constructs a RAG prompt and generates a natural language response summarizing the results with relevance explanations.

def build_streamrec_rag_prompt(
    query: str,
    results: List[Tuple[CatalogItem, float]],
) -> str:
    """Construct a StreamRec-specific RAG prompt.

    Args:
        query: User's natural language search query.
        results: Retrieved (item, score) pairs.

    Returns:
        Formatted prompt for the LLM.
    """
    context_parts = []
    for i, (item, score) in enumerate(results, 1):
        context_parts.append(
            f"Item {i}:\n"
            f"  Title: {item.title}\n"
            f"  Category: {item.category}\n"
            f"  Year: {item.year}\n"
            f"  Rating: {item.content_rating}\n"
            f"  Engagement Score: {item.avg_engagement:.2f}\n"
            f"  Description: {item.description}\n"
            f"  Relevance Score: {score:.3f}"
        )
    context = "\n\n".join(context_parts)

    return f"""You are StreamRec's content discovery assistant. Given the user's query and the retrieved catalog items below, provide a helpful response that:
1. Recommends the most relevant items with brief explanations of why each matches
2. Groups or orders recommendations logically
3. Notes any caveats (e.g., content rating, older release dates)
4. If the retrieved items do not fully match the query, acknowledge this honestly

Retrieved Items:
{context}

User Query: {query}

Response:"""


# Generate a prompt for the first query
query = "documentaries about climate change from the last few years"
query_emb = embedder.encode([query])[0]
results = engine.search(query_emb, top_k=5)
prompt = build_streamrec_rag_prompt(query, results)
print(f"Prompt length: {len(prompt.split())} words")
print(f"Prompt preview:\n{prompt[:500]}...")
Prompt length: 187 words
Prompt preview:
You are StreamRec's content discovery assistant. Given the user's query and the retrieved catalog items below, provide a helpful response that:
1. Recommends the most relevant items with brief explanations of why each matches
2. Groups or orders recommendations logically
3. Notes any caveats (e.g., content rating, older release dates)
4. If the retrieved items do not fully match the query, acknowledge this honestly

Retrieved Items:
Item 1:
  Title: Climate Change LK
  Category: documentary
  Year: 2015
  Rating: PG
  Engagement Sco...

Evaluation

The team evaluates retrieval quality using manually labeled relevance judgments. For 100 test queries, human annotators label each of the top-10 retrieved items as relevant (1) or not relevant (0).

def precision_at_k(retrieved_relevant: List[bool], k: int) -> float:
    """Compute precision@k.

    Args:
        retrieved_relevant: Boolean list, True if item at position i is relevant.
        k: Number of top positions to consider.

    Returns:
        Fraction of top-k retrieved items that are relevant.
    """
    return sum(retrieved_relevant[:k]) / k


def ndcg_at_k(relevance_scores: List[float], k: int) -> float:
    """Compute NDCG@k (Normalized Discounted Cumulative Gain).

    Args:
        relevance_scores: Relevance scores for each retrieved item.
        k: Number of positions to consider.

    Returns:
        NDCG@k score in [0, 1].
    """
    dcg = sum(
        rel / np.log2(i + 2)
        for i, rel in enumerate(relevance_scores[:k])
    )
    ideal_scores = sorted(relevance_scores, reverse=True)[:k]
    idcg = sum(
        rel / np.log2(i + 2)
        for i, rel in enumerate(ideal_scores)
    )
    return dcg / idcg if idcg > 0 else 0.0


# Simulated evaluation results
rng = np.random.RandomState(99)
n_queries = 100
p_at_5_scores = []
ndcg_at_5_scores = []

for _ in range(n_queries):
    # Simulate: higher-ranked items are more likely to be relevant
    relevance = [
        1.0 if rng.random() < (0.7 - 0.05 * i) else 0.0
        for i in range(10)
    ]
    p_at_5_scores.append(precision_at_k([r > 0 for r in relevance], 5))
    ndcg_at_5_scores.append(ndcg_at_k(relevance, 5))

print(f"Precision@5:  {np.mean(p_at_5_scores):.3f} +/- {np.std(p_at_5_scores):.3f}")
print(f"NDCG@5:       {np.mean(ndcg_at_5_scores):.3f} +/- {np.std(ndcg_at_5_scores):.3f}")
Precision@5:  0.624 +/- 0.178
NDCG@5:       0.741 +/- 0.192

Lessons Learned

  1. Item-level chunking simplifies the pipeline. Because each catalog item is short, treating each item as a single chunk avoids the complexity of reassembling fragmented chunks into coherent results. For longer documents (e.g., full articles or transcripts), multi-chunk strategies with deduplication would be necessary.

  2. Metadata filtering is essential. Embedding similarity alone retrieves semantically similar items regardless of metadata constraints. When a user asks for "recent documentaries," the system must filter by year and category — this is not reliably captured by the embedding alone.

  3. Retrieval quality bounds generation quality. If the retrieval stage misses a relevant item, the LLM cannot compensate — it has no access to items outside its context. Improving the embedding model (switching from a general-purpose model to one fine-tuned on the catalog's domain vocabulary) provided the largest single quality improvement.

  4. Engagement scores as soft relevance signals. The team experimented with boosting retrieval scores by engagement: $\text{score}_{\text{final}} = \alpha \cdot \text{cos\_sim} + (1 - \alpha) \cdot \text{engagement}$. This improved user satisfaction (measured by click-through on recommendations) but introduced a popularity bias that suppressed niche content. The trade-off required a product decision, not an engineering one.

  5. Latency budget allocation. The end-to-end pipeline has three latency components: embedding the query (~10ms), vector search (~5ms with FAISS), and LLM generation (~500-2000ms). The LLM generation dominates, which means that optimizing the retrieval stage (HNSW vs. IVF, caching frequent queries) provides marginal latency improvement. The real optimization target is generation: using a smaller or distilled model, streaming the response, or caching summaries for popular query patterns.