Chapter 28 Exercises: Performance Optimization
These exercises are organized into five tiers of increasing difficulty, aligned with Bloom's taxonomy. Complete the exercises in order within each tier before moving to the next tier.
Tier 1: Remember and Understand (Exercises 1–6)
Exercise 1: Performance Vocabulary
Define each of the following terms in one or two sentences, and give a practical example of when each concept matters:
- Latency
- Throughput
- P95 response time
- Cache hit ratio
- Amdahl's Law
- Time complexity (Big-O)
Exercise 2: Profiling Tool Matching
Match each profiling scenario (left) to the most appropriate tool (right):
| Scenario | Tool Options |
|---|---|
| Identify which function consumes the most total time in a script | a. memory_profiler |
| Determine which line in a specific function is slowest | b. cProfile |
| Attach to a running production web server without restarting it | c. py-spy |
| Find which part of a data loading function allocates the most memory | d. line_profiler |
| Generate a visual flame graph of a long-running process | e. tracemalloc |
| Track memory allocations by source line over time | f. py-spy (record mode) |
Exercise 3: Big-O Classification
Classify the time complexity of each code snippet and explain your reasoning:
# Snippet A
def find_max(items: list[int]) -> int:
    current_max = items[0]
    for item in items:
        if item > current_max:
            current_max = item
    return current_max

# Snippet B
def has_duplicates(items: list[int]) -> bool:
    for i in range(len(items)):
        for j in range(i + 1, len(items)):
            if items[i] == items[j]:
                return True
    return False

# Snippet C
def binary_search(sorted_list: list[int], target: int) -> int:
    low, high = 0, len(sorted_list) - 1
    while low <= high:
        mid = (low + high) // 2
        if sorted_list[mid] == target:
            return mid
        elif sorted_list[mid] < target:
            low = mid + 1
        else:
            high = mid - 1
    return -1

# Snippet D
def get_first(items: list[int]) -> int:
    return items[0]

# Snippet E
def power_set(items: list[int]) -> list[list[int]]:
    if not items:
        return [[]]
    rest = power_set(items[1:])
    return rest + [[items[0]] + subset for subset in rest]
Exercise 4: Caching Concepts
Answer the following questions about caching:
- What is the difference between lru_cache and a TTL-based cache? When would you use each?
- What does "cache invalidation" mean, and why is it considered a hard problem?
- Explain the difference between in-process caching and distributed caching. Give an example of each.
- What is a "cache stampede" and how can it be prevented?
Exercise 5: Concurrency Model Selection
For each scenario, identify whether the workload is I/O-bound or CPU-bound, and recommend the best Python concurrency model (asyncio, threading, or multiprocessing). Explain your choice.
- Downloading 500 images from various URLs.
- Resizing 500 images to thumbnail size.
- Making 200 database queries using a synchronous database driver.
- Computing SHA-256 hashes of 10,000 large files.
- Sending 1,000 HTTP API requests and processing the JSON responses.
- Training a machine learning model on a large dataset.
Exercise 6: N+1 Query Identification
Read the following code and answer the questions below:
from flask import jsonify

@app.route("/api/classrooms")
def list_classrooms():
    classrooms = Classroom.query.all()
    result = []
    for classroom in classrooms:
        students = Student.query.filter_by(classroom_id=classroom.id).all()
        teacher = Teacher.query.get(classroom.teacher_id)
        result.append({
            "name": classroom.name,
            "student_count": len(students),
            "teacher_name": teacher.name,
        })
    return jsonify(result)
- How many database queries does this endpoint execute if there are 50 classrooms?
- Identify the N+1 pattern(s) in this code.
- Write the corrected version using JOINs or eager loading.
Tier 2: Apply (Exercises 7–12)
Exercise 7: Profile a Slow Function
The following function is intentionally slow. Use cProfile to profile it, identify the bottleneck, and then optimize it. Report the before and after timings.
import time
import random

def slow_analysis(data: list[dict]) -> dict:
    """Analyze sales data (intentionally slow)."""
    # Find top customer
    customer_totals = {}
    for record in data:
        cust = record["customer"]
        if cust not in customer_totals:
            customer_totals[cust] = 0
        customer_totals[cust] += record["amount"]
    top_customer = None
    top_amount = 0
    for cust, total in customer_totals.items():
        if total > top_amount:
            top_amount = total
            top_customer = cust
    # Find duplicate transactions (intentionally O(n^2))
    duplicates = []
    for i in range(len(data)):
        for j in range(i + 1, len(data)):
            if (data[i]["id"] == data[j]["id"]
                    and data[i]["amount"] == data[j]["amount"]):
                duplicates.append(data[i]["id"])
    return {
        "top_customer": top_customer,
        "top_amount": top_amount,
        "duplicate_count": len(duplicates),
    }

# Generate test data
data = [
    {
        "id": random.randint(1, 5000),
        "customer": f"customer_{random.randint(1, 100)}",
        "amount": round(random.uniform(10, 500), 2),
    }
    for _ in range(10000)
]
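One way to collect the "before" numbers is sketched below. It is a minimal harness, not the expected solution: `profile_call` and `busy_work` are hypothetical names introduced here, and `busy_work` only stands in for a call such as `slow_analysis(data)`.

```python
import cProfile
import io
import pstats


def profile_call(func, *args, top: int = 10, **kwargs) -> str:
    """Run func under cProfile and return the top entries by cumulative time."""
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        func(*args, **kwargs)
    finally:
        profiler.disable()

    # Render the statistics into a string instead of printing to stdout.
    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    stats.sort_stats("cumulative").print_stats(top)
    return buffer.getvalue()


def busy_work(n: int) -> int:
    """Hypothetical stand-in for the function under test."""
    return sum(i * i for i in range(n))


if __name__ == "__main__":
    print(profile_call(busy_work, 100_000))
```

Running the same harness before and after a change keeps the two reports comparable, since both are sorted and truncated the same way.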
Exercise 8: Implement a TTL Cache Decorator
Write a @ttl_cache decorator that:
1. Caches function results for a configurable duration (default 60 seconds).
2. Supports a configurable maximum cache size.
3. Evicts expired entries when the cache exceeds maxsize.
4. Provides cache_info() and cache_clear() methods on the decorated function.
5. Works with functions that take hashable arguments.
Write tests that verify expiration behavior and cache statistics.
Exercise 9: Async Web Scraper
Build an async web scraper that:
1. Takes a list of URLs.
2. Fetches all URLs concurrently using aiohttp.
3. Respects a configurable concurrency limit (e.g., max 20 simultaneous connections).
4. Handles errors gracefully (timeout, connection error, HTTP error).
5. Returns results as a list of dictionaries with URL, status code, content length, and elapsed time.
Compare the performance of your async implementation against a sequential implementation using requests for 50 URLs.
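The concurrency limit in requirement 3 is commonly enforced with a semaphore. The sketch below shows only that pattern and assumes nothing about your final design: `fetch_stub` is a hypothetical placeholder that sleeps instead of touching the network, so the aiohttp request, real status codes, and content lengths are left for you to add.

```python
import asyncio
import time


async def fetch_stub(url: str) -> dict:
    """Hypothetical stand-in for a real HTTP request (no network involved)."""
    start = time.perf_counter()
    await asyncio.sleep(0.01)  # simulated network latency
    return {"url": url, "status": 200, "elapsed": time.perf_counter() - start}


async def fetch_all(urls: list[str], limit: int = 20) -> list[dict]:
    """Fetch every URL concurrently, with at most `limit` requests in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def bounded(url: str) -> dict:
        async with semaphore:
            try:
                return await asyncio.wait_for(fetch_stub(url), timeout=1.0)
            except (asyncio.TimeoutError, OSError) as exc:
                # Report the failure as data instead of cancelling the batch.
                return {"url": url, "status": None, "error": repr(exc)}

    # gather() returns results in the same order as the input URLs.
    return await asyncio.gather(*(bounded(u) for u in urls))


if __name__ == "__main__":
    demo_urls = [f"https://example.com/{i}" for i in range(50)]
    results = asyncio.run(fetch_all(demo_urls, limit=5))
    print(len(results), "results")
```

Catching errors inside `bounded` means one failed URL produces an error record rather than aborting the other requests.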
Exercise 10: Database Query Optimization
Given this database schema:
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER NOT NULL,
    product_id INTEGER NOT NULL,
    quantity INTEGER NOT NULL,
    total_amount DECIMAL(10, 2) NOT NULL,
    status VARCHAR(20) NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(255) NOT NULL,
    tier VARCHAR(20) NOT NULL DEFAULT 'standard'
);

CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(200) NOT NULL,
    category VARCHAR(50) NOT NULL,
    price DECIMAL(10, 2) NOT NULL
);
Write optimized SQL queries (including any necessary indexes) for:
1. Find the top 10 customers by total spending in the last 30 days.
2. Find all orders for "premium" tier customers that are still "pending."
3. Find the most popular product in each category (by total quantity ordered).
4. Find customers who have not placed any orders in the last 90 days.
For each query, explain what indexes you would add and why.
Exercise 11: Generator-Based Data Pipeline
Rewrite the following memory-intensive data pipeline to use generators, so it can process a 10 GB CSV file without exhausting memory:
import csv
import json

def process_data(input_file: str, output_file: str):
    # Step 1: Read all rows
    with open(input_file, "r") as f:
        reader = csv.DictReader(f)
        rows = list(reader)
    # Step 2: Filter rows
    filtered = [row for row in rows if float(row["amount"]) > 100]
    # Step 3: Transform rows
    transformed = []
    for row in filtered:
        transformed.append({
            "id": int(row["id"]),
            "amount_cents": int(float(row["amount"]) * 100),
            "category": row["category"].upper(),
        })
    # Step 4: Write output
    with open(output_file, "w") as f:
        for record in transformed:
            f.write(json.dumps(record) + "\n")
Exercise 12: Microbenchmark Suite
Using timeit, create a benchmark suite that compares:
1. List append vs. deque append (for 100,000 items).
2. List membership test vs. set membership test (for 10,000 lookups in 100,000 items).
3. String concatenation vs. "".join() (for combining 10,000 strings).
4. sorted() vs. heapq.nlargest() for finding the top 10 items in 100,000.
5. Dictionary comprehension vs. manual loop for building a lookup table.
Present results in a formatted table showing the operation, approach, time, and speedup factor.
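A possible starting point for the suite is sketched here. The helper names `bench` and `format_row` are illustrative, and the data sizes are scaled down from the exercise so the sketch runs quickly; it covers one comparison only and prints one row of the table.

```python
import timeit


def bench(stmt, number: int = 5, repeat: int = 3) -> float:
    """Return the best per-call time in seconds for a zero-argument callable."""
    return min(timeit.repeat(stmt, number=number, repeat=repeat)) / number


def format_row(operation: str, baseline: str, t_base: float,
               candidate: str, t_cand: float) -> str:
    """Format one table row; speedup is baseline time over candidate time."""
    speedup = t_base / t_cand if t_cand > 0 else float("inf")
    return (f"{operation:<14}{baseline:<6}{t_base * 1e3:>10.3f} ms  "
            f"{candidate:<6}{t_cand * 1e3:>10.3f} ms  {speedup:>8.1f}x")


# Scaled-down sizes (assumption): 100 lookups in 10,000 items.
items = list(range(10_000))
as_list, as_set = items, set(items)
probes = range(0, 10_000, 100)

t_list = bench(lambda: [p in as_list for p in probes])
t_set = bench(lambda: [p in as_set for p in probes])
print(format_row("membership", "list", t_list, "set", t_set))
```

Taking the minimum over several repeats, rather than the mean, reduces the influence of unrelated system activity on the reported time.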
Tier 3: Analyze (Exercises 13–18)
Exercise 13: Profiling Output Analysis
Given the following cProfile output for a web API endpoint that takes 3.7 seconds:
1284562 function calls (1278901 primitive calls) in 3.712 seconds
Ordered by: cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.002 0.002 3.712 3.712 views.py:89(generate_dashboard)
25 0.001 0.000 2.891 0.116 models.py:45(get_metrics)
25 2.134 0.085 2.856 0.114 connection.py:178(execute)
1 0.521 0.521 0.521 0.521 charts.py:23(render_charts)
250000 0.145 0.000 0.198 0.000 models.py:78(to_dict)
1 0.034 0.034 0.067 0.067 templates.py:12(render_template)
25000 0.023 0.000 0.023 0.000 {method 'append' of 'list'}
1 0.001 0.001 0.001 0.001 {built-in method json.dumps}
Answer:
1. What is the primary bottleneck? What percentage of total time does it consume?
2. Why is connection.py:178(execute) called 25 times? What pattern does this suggest?
3. Why is models.py:78(to_dict) called 250,000 times? Is this a concern?
4. Propose three specific optimizations, ranked by expected impact.
5. After fixing the top bottleneck, what would you expect the new total time to be?
Exercise 14: Memory Profile Interpretation
Analyze this memory_profiler output and answer the questions:
Filename: data_processor.py
Line # Mem usage Increment Occurrences Line Contents
=============================================================
10 45.2 MiB 45.2 MiB 1 @profile
11 def process_file(filepath):
12 345.8 MiB 300.6 MiB 1 data = json.load(open(filepath))
13 645.2 MiB 299.4 MiB 1 records = [transform(r) for r in data]
14 645.3 MiB 0.1 MiB 1 filtered = [r for r in records if r["active"]]
15 645.3 MiB 0.0 MiB 1 result = aggregate(filtered)
16 645.3 MiB 0.0 MiB 1 return result
- What is the peak memory usage?
- Which line causes the largest memory allocation, and why?
- Why does the list comprehension on line 13 nearly double the memory?
- Propose a refactored version that uses less than 100 MB for the same input.
- What Python tools or techniques would you use to verify your optimization works?
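For the verification step, one option among several is the standard library's `tracemalloc`. The sketch below is illustrative only: `peak_memory_mib`, `eager_total`, and `lazy_total` are hypothetical names, and `tracemalloc` reports memory allocated through Python's allocator, which can differ from the process-level figures that memory_profiler shows.

```python
import tracemalloc


def peak_memory_mib(func, *args, **kwargs):
    """Call func and return (result, peak MiB traced during the call)."""
    tracemalloc.start()
    try:
        result = func(*args, **kwargs)
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, peak / (1024 * 1024)


def eager_total(n: int) -> int:
    return sum([i * 2 for i in range(n)])  # materializes the whole list


def lazy_total(n: int) -> int:
    return sum(i * 2 for i in range(n))    # yields one item at a time


_, eager_peak = peak_memory_mib(eager_total, 200_000)
_, lazy_peak = peak_memory_mib(lazy_total, 200_000)
print(f"eager: {eager_peak:.2f} MiB, lazy: {lazy_peak:.2f} MiB")
```

Comparing the two peaks for the same input gives a measured basis for claiming that a refactoring reduced memory use.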
Exercise 15: Concurrency Bottleneck Analysis
A developer reports that their threaded web scraper is not faster than the sequential version. Here is their code:
import threading
import requests

results = []
lock = threading.Lock()

def fetch_and_process(url):
    response = requests.get(url)
    data = response.json()
    processed = heavy_computation(data)  # CPU-intensive: ~500ms
    with lock:
        results.append(processed)

threads = []
for url in urls:  # 100 URLs
    t = threading.Thread(target=fetch_and_process, args=(url,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
- Explain why this code does not achieve the expected speedup.
- Identify two separate performance issues.
- Propose a redesigned solution that addresses both issues.
- Estimate the expected speedup of your solution compared to sequential execution.
Exercise 16: Cache Strategy Evaluation
An e-commerce application has these endpoints with the following characteristics:
| Endpoint | Calls/sec | DB Time | Changes How Often | Staleness Tolerance |
|---|---|---|---|---|
| GET /products | 500 | 200ms | Hourly | 5 minutes |
| GET /product/{id} | 1000 | 50ms | Daily | 1 minute |
| GET /cart/{user_id} | 200 | 30ms | Per request | 0 (real-time) |
| GET /recommendations/{user_id} | 100 | 2s | Hourly | 30 minutes |
| GET /search?q=... | 300 | 500ms | Continuous | 10 seconds |
For each endpoint, recommend:
1. Whether to cache or not.
2. What type of cache (in-process, Redis, HTTP, CDN).
3. TTL setting.
4. Cache invalidation strategy.
5. Expected performance improvement.
Justify each recommendation.
Exercise 17: Algorithm Comparison Analysis
A colleague proposes two solutions for finding common elements between two large lists. Analyze both:
# Solution A
def find_common_a(list_a: list[int], list_b: list[int]) -> list[int]:
    return [x for x in list_a if x in list_b]

# Solution B
def find_common_b(list_a: list[int], list_b: list[int]) -> list[int]:
    set_b = set(list_b)
    return [x for x in list_a if x in set_b]
- What is the time complexity of each solution?
- For len(list_a) = 10,000 and len(list_b) = 10,000, estimate the execution time ratio between Solution A and Solution B.
- When (if ever) would Solution A be faster than Solution B?
- Write a Solution C that handles the case where both lists may contain duplicates and you want to preserve duplicate counts in the output.
- Benchmark all three solutions and present the results.
Exercise 18: Load Test Results Interpretation
A Locust load test produced these results for an API:
| Users | RPS | Median (ms) | P95 (ms) | P99 (ms) | Error % |
|---|---|---|---|---|---|
| 10 | 45 | 22 | 89 | 145 | 0.0 |
| 50 | 180 | 28 | 112 | 234 | 0.0 |
| 100 | 310 | 35 | 178 | 456 | 0.1 |
| 200 | 420 | 89 | 890 | 2340 | 2.3 |
| 500 | 380 | 456 | 4500 | 12000 | 15.7 |
| 1000 | 290 | 2300 | 15000 | 30000 | 42.1 |
- At what user count does the system begin to degrade?
- What is the maximum sustainable throughput (RPS)?
- What pattern do you observe in how P95 and P99 scale compared to median?
- What likely causes the RPS to decrease at 500 and 1000 users?
- Propose three infrastructure or application changes to increase capacity.
Tier 4: Evaluate (Exercises 19–24)
Exercise 19: Optimization Strategy Debate
Your team is debating how to optimize a dashboard endpoint that aggregates data from five microservices, each taking 200-500ms. The current sequential implementation takes 1.8 seconds. Two proposals are on the table:
Proposal A: Use asyncio.gather() to fetch from all five services concurrently.
Proposal B: Pre-compute the dashboard data every 5 minutes and serve from cache.
Write a detailed evaluation of both proposals covering:
1. Expected performance improvement.
2. Implementation complexity.
3. Data freshness implications.
4. Failure mode behavior (what happens when one service is down).
5. Operational overhead.
6. Your recommendation and justification.
Exercise 20: Premature Optimization Judgment Calls
For each scenario, decide whether optimizing now is premature or justified. Defend your answer with specific reasoning.
- Your startup's MVP has 10 beta users. The main page loads in 3 seconds. A developer wants to spend a week adding Redis caching.
- Your company's ETL pipeline processes 10,000 records in 45 minutes. It runs nightly. The dataset grows 50% annually.
- An API endpoint responds in 800ms. It is called 10,000 times per day by paying customers. The SLA promises < 500ms.
- A developer proposes replacing all list comprehensions with generator expressions across the codebase "for memory efficiency."
- A background job sends email notifications. Each email takes 2 seconds to send. Currently there are 50 users, but the product is launching publicly next month expecting 10,000 users.
Exercise 21: Caching vs. Query Optimization Trade-off
An API endpoint queries a PostgreSQL database for aggregated sales statistics. The query takes 4 seconds. You have two optimization options:
Option A: Optimize the SQL query by adding indexes, rewriting subqueries as CTEs, and denormalizing a frequently joined table. Estimated result: 200ms query time.
Option B: Cache the query result in Redis with a 10-minute TTL. Estimated result: 2ms for cache hits (95% of requests), 4 seconds for cache misses (5% of requests).
Evaluate both options considering:
1. Average response time under each option.
2. Worst-case response time.
3. Implementation effort and risk.
4. Long-term maintenance burden.
5. Impact on data freshness.
6. Which would you implement first, and why?
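For the average response time, a hit-ratio-weighted mean is a reasonable first model. The helper below is a sketch under that assumption; `expected_latency_ms` is a hypothetical name, and the numbers in the usage line are made up for illustration rather than taken from the exercise.

```python
def expected_latency_ms(hit_ratio: float, hit_ms: float, miss_ms: float) -> float:
    """Weighted mean latency for a cache with the given hit ratio."""
    if not 0.0 <= hit_ratio <= 1.0:
        raise ValueError("hit_ratio must be between 0 and 1")
    return hit_ratio * hit_ms + (1.0 - hit_ratio) * miss_ms


# Illustrative values only: half of the requests hit the cache.
print(expected_latency_ms(0.5, 10.0, 110.0))  # 60.0
```

A mean hides the tail: the requests that miss still pay the full uncached cost, which is why the exercise asks about worst-case time separately.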
Exercise 22: Concurrency Model Evaluation
A data processing application needs to:
1. Read records from a REST API (I/O-bound, 50ms per request, 10,000 records).
2. Transform each record using a complex algorithm (CPU-bound, 20ms per record).
3. Write results to a database (I/O-bound, 5ms per write, batches of 100).
Design the optimal pipeline architecture. Consider:
- Which concurrency model to use for each stage.
- How to connect the stages (queues, buffers, etc.).
- How to handle backpressure (fast producer, slow consumer).
- Expected total processing time vs. sequential processing.
- Error handling and retry strategies.
Exercise 23: Performance Budget Allocation
Your team is building a new e-commerce checkout flow. The total end-to-end target is 2 seconds. The flow involves:
- Validate cart items (check stock availability).
- Calculate tax and shipping.
- Process payment (external API).
- Create order in database.
- Send confirmation email.
- Update inventory.
Allocate a performance budget (in milliseconds) to each step. For each allocation:
- Justify why that step gets that amount of time.
- Identify which steps can run in parallel.
- Describe what you would do if a step exceeds its budget.
- Identify which steps can be deferred (not blocking the user response).
Exercise 24: Code Review for Performance
Review the following Flask application code for performance issues. Identify at least 8 problems, explain why each is a problem, and provide the corrected code.
from flask import Flask, jsonify, request
import json
import time

app = Flask(__name__)

@app.route("/api/analytics/daily")
def daily_analytics():
    date = request.args.get("date", "2024-01-01")
    # Get all events
    events = db.query("SELECT * FROM events")
    # Filter by date in Python
    daily_events = [e for e in events if str(e["created_at"]).startswith(date)]
    # Count by type
    type_counts = {}
    for event in daily_events:
        event_type = event["type"]
        if event_type in type_counts:
            type_counts[event_type] = type_counts[event_type] + 1
        else:
            type_counts[event_type] = 1
    # Get user details for each event
    enriched_events = []
    for event in daily_events:
        user = db.query(
            "SELECT * FROM users WHERE id = " + str(event["user_id"])
        )
        event["user"] = user[0] if user else None
        enriched_events.append(event)
    # Sort by timestamp
    enriched_events.sort(key=lambda e: e["created_at"], reverse=True)
    # Convert to JSON and back to ensure serializable
    result = json.loads(json.dumps({
        "date": date,
        "total_events": len(daily_events),
        "type_counts": type_counts,
        "events": enriched_events,
    }, default=str))
    return jsonify(result)
Tier 5: Create (Exercises 25–30)
Exercise 25: Build a Performance Profiling Wrapper
Create a Python decorator/context manager library called perfwatch that:
1. Measures wall-clock time, CPU time, and peak memory usage for any function.
2. Supports nested measurements (function A calls function B, both measured).
3. Outputs a hierarchical timing report showing the call tree.
4. Supports configurable thresholds that trigger warnings for slow functions.
5. Can export results as JSON for integration with monitoring systems.
6. Has minimal overhead (< 5% when enabled, zero when disabled).
Write comprehensive tests and a usage example.
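The difference between wall-clock time and CPU time in requirement 1 can be seen with a few lines of standard library code. This is a small sketch, not the `perfwatch` design: `Timing` and `measure` are hypothetical names, and nesting, memory tracking, thresholds, and JSON export are all left out.

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass


@dataclass
class Timing:
    label: str
    wall_s: float = 0.0  # elapsed real time
    cpu_s: float = 0.0   # CPU time used by this process


@contextmanager
def measure(label: str):
    """Record wall-clock and process CPU time for the enclosed block."""
    timing = Timing(label)
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    try:
        yield timing
    finally:
        timing.wall_s = time.perf_counter() - wall_start
        timing.cpu_s = time.process_time() - cpu_start


with measure("sleep") as t:
    time.sleep(0.05)  # waiting consumes wall time but almost no CPU time
print(f"{t.label}: wall={t.wall_s:.3f}s cpu={t.cpu_s:.3f}s")
```

A large gap between the two figures suggests the block spent most of its time waiting rather than computing, which points toward I/O as the thing to investigate.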
Exercise 26: Implement a Multi-Level Cache
Build a MultiLevelCache class that:
1. Checks an L1 in-process cache first (fastest, smallest).
2. Falls back to an L2 Redis cache (fast, shared across processes).
3. Falls back to the original function call on cache miss.
4. Promotes results: L2 hit copies to L1, original call copies to both.
5. Supports independent TTLs for L1 and L2.
6. Provides statistics (hit rates, miss rates, latency per level).
7. Supports cache invalidation by key pattern.
Include a decorator interface: @multi_cache(l1_ttl=30, l2_ttl=300, l1_maxsize=1000).
Exercise 27: Build a Load Testing Framework
Create a simplified load testing framework inspired by Locust that:
1. Supports defining user scenarios as Python classes.
2. Ramps up users gradually over a configurable period.
3. Runs multiple request types with configurable weights.
4. Collects and reports latency percentiles (P50, P95, P99), throughput, and error rates.
5. Detects performance degradation automatically (e.g., P95 increases by more than 2x).
6. Outputs results as both a terminal summary and a JSON report.
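The percentile reporting and the degradation check could start from something like the sketch below. The function names are hypothetical, and the choice of the "inclusive" interpolation method in `statistics.quantiles` is an assumption; other percentile definitions give slightly different values on small samples.

```python
import statistics


def latency_percentiles(samples_ms: list[float]) -> dict[str, float]:
    """Return P50, P95, and P99 from raw latency samples (at least two)."""
    # n=100 yields 99 cut points; index k-1 is the k-th percentile.
    cuts = statistics.quantiles(samples_ms, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}


def p95_degraded(baseline_p95: float, current_p95: float, factor: float = 2.0) -> bool:
    """Flag degradation when P95 grows by more than `factor` over the baseline."""
    return current_p95 > baseline_p95 * factor


samples = [float(i) for i in range(101)]  # synthetic latencies: 0..100 ms
print(latency_percentiles(samples))
```

Keeping the raw samples until the end of a run lets the framework compute exact percentiles, at the cost of memory that grows with the number of requests.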
Exercise 28: Async Pipeline Framework
Build an async data processing pipeline framework that:
1. Allows defining pipeline stages as async functions.
2. Connects stages with bounded async queues for backpressure.
3. Supports configurable concurrency per stage.
4. Handles errors with configurable retry policies.
5. Provides real-time metrics (items/second, queue depths, error rates).
6. Supports graceful shutdown (drain queues, finish in-progress items).
Demonstrate it with a pipeline that: fetches URLs → parses HTML → extracts data → stores results.
Exercise 29: Database Query Analyzer
Build a tool that:
1. Intercepts all database queries made by a Python application.
2. Logs each query with its execution time, parameters, and caller location.
3. Detects N+1 query patterns automatically (similar queries repeated many times with different parameters).
4. Detects slow queries (above configurable threshold).
5. Detects missing indexes by analyzing query patterns.
6. Generates a summary report with recommendations.
This should work as a middleware or context manager, not requiring changes to existing query code.
Exercise 30: Full Performance Optimization Project
Take the following deliberately unoptimized web application and systematically optimize it to handle 10x the load. Document every step.
from flask import Flask, jsonify
import sqlite3
import time

app = Flask(__name__)

def get_db():
    return sqlite3.connect("store.db")

@app.route("/api/dashboard")
def dashboard():
    db = get_db()
    cursor = db.cursor()
    # Get all products
    products = cursor.execute("SELECT * FROM products").fetchall()
    product_list = []
    for product in products:
        # Get reviews for each product
        reviews = cursor.execute(
            "SELECT * FROM reviews WHERE product_id = ?", (product[0],)
        ).fetchall()
        # Calculate average rating
        if reviews:
            avg_rating = sum(r[3] for r in reviews) / len(reviews)
        else:
            avg_rating = 0
        # Get category name
        category = cursor.execute(
            "SELECT name FROM categories WHERE id = ?", (product[2],)
        ).fetchone()
        product_list.append({
            "id": product[0],
            "name": product[1],
            "category": category[0] if category else "Unknown",
            "avg_rating": round(avg_rating, 2),
            "review_count": len(reviews),
            "price": product[3],
        })
    # Sort by rating
    product_list.sort(key=lambda p: p["avg_rating"], reverse=True)
    db.close()
    return jsonify(product_list[:20])
Your deliverables:
1. Profile the original application and document bottlenecks.
2. Apply at least 5 distinct optimization techniques.
3. Write a load test using Locust.
4. Benchmark before and after each optimization.
5. Write a performance optimization report documenting the process, results, and trade-offs.
Submission Guidelines
For each exercise:
- Include all source code, fully runnable.
- Include profiling or benchmarking output showing measured improvements.
- Explain why each optimization works, not just what you changed.
- For analysis exercises, support your conclusions with data from the exercise.
- When asking an AI assistant for help, share measurement data rather than just code.