Appendix G: Python Surveillance Analytics

Computational Tools for Understanding Watching and Being Watched


Getting Started

This appendix provides working Python code for analyzing surveillance-related data. Each section connects to specific chapters and is designed to develop both programming skills and conceptual understanding of surveillance phenomena.

Installing Required Libraries

All code in this appendix requires Python 3.8 or higher. Install required libraries using pip:

pip install pandas matplotlib networkx numpy scipy

For a clean environment, consider using a virtual environment:

# Create virtual environment
python -m venv surveillance-env

# Activate (Mac/Linux)
source surveillance-env/bin/activate

# Activate (Windows)
surveillance-env\Scripts\activate

# Install libraries
pip install pandas matplotlib networkx numpy scipy

For Jupyter notebooks (recommended for interactive exploration):

pip install jupyter
jupyter notebook

A Note on Data Ethics

The code examples in this appendix either use synthetic (artificially generated) data or describe how to work with public datasets. When you extend these examples to real data:

  1. Obtain data through legitimate, legal means with appropriate permissions.
  2. Anonymize or de-identify data when not working with your own data.
  3. Be aware that even "anonymous" data can often be re-identified (this is discussed conceptually throughout the textbook).
  4. The tools in this appendix are presented for analytical and educational purposes — for understanding how surveillance works, not for conducting surveillance.
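
Point 3 is easy to demonstrate computationally: a home/work location pair acts as a quasi-identifier, because even after coordinates are coarsened, few people share the same pair. The sketch below uses a purely synthetic population with arbitrary coordinates; the exact percentage will vary with population density and rounding precision.

```python
import random
from collections import Counter

random.seed(0)

# Synthetic population: each person gets a random home point and a random
# work point inside roughly a 10 km x 10 km area (coordinates arbitrary).
people = [
    ((random.uniform(40.70, 40.79), random.uniform(-74.05, -73.96)),
     (random.uniform(40.70, 40.79), random.uniform(-74.05, -73.96)))
    for _ in range(10_000)
]

def coarsen(pair, places=2):
    """'Anonymize' by rounding both points to ~1 km grid cells."""
    (hlat, hlon), (wlat, wlon) = pair
    return (round(hlat, places), round(hlon, places),
            round(wlat, places), round(wlon, places))

counts = Counter(coarsen(p) for p in people)
unique = sum(1 for c in counts.values() if c == 1)
print(f"{unique / len(people):.0%} of 10,000 people have a unique "
      f"coarsened home/work pair")
```

Even at kilometre-scale rounding, a substantial share of this synthetic population is uniquely identified by the pair alone; at the ~100 m precision used later in this appendix, uniqueness in this setup approaches 100%.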

Section 1: Location Data Analysis

Connecting to Chapter 18 — Smartphone Tracking

Why This Matters

Your smartphone continuously generates location data. If you have an Android device with Google services enabled, Google's Location History (accessible through "Timeline") maintains a record of your movements. Apple similarly maintains location data. This data is among the most sensitive in existence — it can reveal where you live, where you work, your religious practice, your medical appointments, your political associations, and your romantic relationships.

Google Takeout allows you to download your own location history as JSON files. This section demonstrates how to analyze such data — both to understand your own surveillance exposure and to understand how intelligence analysts and data brokers analyze location data.
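
If you have your own export, the loading step is short. The sketch below is hedged: Takeout file names and field names vary by export version (newer exports use a file such as Records.json and sometimes an ISO timestamp field instead of timestampMs), so inspect your download and adjust the keys accordingly.

```python
import json

def load_takeout_history(path):
    """Load a Google Takeout location file, keeping only entries that use
    the timestampMs/latitudeE7 schema assumed by the code in this section."""
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    usable = [
        e for e in data.get("locations", [])
        if "timestampMs" in e and "latitudeE7" in e and "longitudeE7" in e
    ]
    return {"locations": usable}

# Usage sketch (filename depends on your export version):
# location_json = load_takeout_history("Records.json")
```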

"""
Section 1: Location Data Analysis
Connects to Chapter 18: The Smartphone in Your Pocket

This code demonstrates:
- Loading and parsing Google Takeout location history
- Inferring "home" and "work" locations from frequency/timing
- Visualizing movement patterns
- Calculating time spent in different locations

For demonstration, we generate synthetic location data resembling
Google Takeout format. Substitute real data from Google Takeout
(export available at takeout.google.com under "Location History")
if you have it.
"""

import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
from datetime import datetime, timedelta
import random
import math

# ----------------------------------------------------------------
# PART 1: Generate Synthetic Location History (Resembles Google Takeout)
# ----------------------------------------------------------------

def generate_synthetic_location_history(days=30, seed=42):
    """
    Generate synthetic location data resembling Google Takeout format.

    In real Google Takeout data, the structure is:
    {
        "locations": [
            {
                "timestampMs": "1580000000000",
                "latitudeE7": 407589540,   # latitude * 1e7
                "longitudeE7": -739794560, # longitude * 1e7
                "accuracy": 15
            },
            ...
        ]
    }

    We simulate a person commuting between home and work with
    occasional other locations (shops, restaurants, gym).
    """
    random.seed(seed)
    np.random.seed(seed)

    # Define key locations for our synthetic person
    # Coordinates are illustrative (they fall in and around Manhattan)
    home_lat, home_lon = 40.7128, -74.0060       # Home
    work_lat, work_lon = 40.7589, -73.9851        # Workplace
    gym_lat, gym_lon = 40.7450, -73.9910         # Gym
    grocery_lat, grocery_lon = 40.7200, -74.0010 # Grocery store
    coffee_lat, coffee_lon = 40.7580, -73.9870   # Coffee shop near work

    locations = []

    # Base timestamp: 30 days ago
    base_time = datetime.now() - timedelta(days=days)

    for day in range(days):
        current_date = base_time + timedelta(days=day)
        is_weekday = current_date.weekday() < 5

        # Simulate a day of movements
        if is_weekday:
            # Home in the morning (6am-8am)
            for hour in [6, 7]:
                timestamp_ms = int((current_date + timedelta(hours=hour)).timestamp() * 1000)
                lat = home_lat + random.gauss(0, 0.0005)
                lon = home_lon + random.gauss(0, 0.0005)
                locations.append({
                    "timestampMs": str(timestamp_ms),
                    "latitudeE7": int(lat * 1e7),
                    "longitudeE7": int(lon * 1e7),
                    "accuracy": random.randint(5, 30)
                })

            # Commute (8am) — points between home and work
            for step in range(5):
                fraction = step / 4
                timestamp_ms = int((current_date + timedelta(hours=8, minutes=step*8)).timestamp() * 1000)
                lat = home_lat + fraction * (work_lat - home_lat) + random.gauss(0, 0.001)
                lon = home_lon + fraction * (work_lon - home_lon) + random.gauss(0, 0.001)
                locations.append({
                    "timestampMs": str(timestamp_ms),
                    "latitudeE7": int(lat * 1e7),
                    "longitudeE7": int(lon * 1e7),
                    "accuracy": random.randint(20, 100)  # Lower accuracy during transit
                })

            # Coffee shop (8:40am)
            if random.random() < 0.6:
                timestamp_ms = int((current_date + timedelta(hours=8, minutes=40)).timestamp() * 1000)
                lat = coffee_lat + random.gauss(0, 0.0003)
                lon = coffee_lon + random.gauss(0, 0.0003)
                locations.append({
                    "timestampMs": str(timestamp_ms),
                    "latitudeE7": int(lat * 1e7),
                    "longitudeE7": int(lon * 1e7),
                    "accuracy": random.randint(5, 15)
                })

            # At work (9am-5pm)
            for hour in range(9, 17):
                timestamp_ms = int((current_date + timedelta(hours=hour)).timestamp() * 1000)
                lat = work_lat + random.gauss(0, 0.0002)
                lon = work_lon + random.gauss(0, 0.0002)
                locations.append({
                    "timestampMs": str(timestamp_ms),
                    "latitudeE7": int(lat * 1e7),
                    "longitudeE7": int(lon * 1e7),
                    "accuracy": random.randint(5, 20)
                })

            # Evening: gym or home
            if random.random() < 0.4:
                # Gym (5:30pm)
                timestamp_ms = int((current_date + timedelta(hours=17, minutes=30)).timestamp() * 1000)
                lat = gym_lat + random.gauss(0, 0.0003)
                lon = gym_lon + random.gauss(0, 0.0003)
                locations.append({
                    "timestampMs": str(timestamp_ms),
                    "latitudeE7": int(lat * 1e7),
                    "longitudeE7": int(lon * 1e7),
                    "accuracy": random.randint(5, 15)
                })

            # Home in the evening (7pm-11pm)
            for hour in [19, 20, 21, 22]:
                timestamp_ms = int((current_date + timedelta(hours=hour)).timestamp() * 1000)
                lat = home_lat + random.gauss(0, 0.0003)
                lon = home_lon + random.gauss(0, 0.0003)
                locations.append({
                    "timestampMs": str(timestamp_ms),
                    "latitudeE7": int(lat * 1e7),
                    "longitudeE7": int(lon * 1e7),
                    "accuracy": random.randint(5, 20)
                })

        else:
            # Weekend: mostly home, grocery run
            for hour in [9, 10, 11, 14, 15, 19, 20]:
                if random.random() < 0.7:
                    timestamp_ms = int((current_date + timedelta(hours=hour)).timestamp() * 1000)
                    lat = home_lat + random.gauss(0, 0.0005)
                    lon = home_lon + random.gauss(0, 0.0005)
                    locations.append({
                        "timestampMs": str(timestamp_ms),
                        "latitudeE7": int(lat * 1e7),
                        "longitudeE7": int(lon * 1e7),
                        "accuracy": random.randint(5, 20)
                    })

            # Grocery (Saturday)
            if current_date.weekday() == 5 and random.random() < 0.7:
                timestamp_ms = int((current_date + timedelta(hours=11, minutes=30)).timestamp() * 1000)
                lat = grocery_lat + random.gauss(0, 0.0003)
                lon = grocery_lon + random.gauss(0, 0.0003)
                locations.append({
                    "timestampMs": str(timestamp_ms),
                    "latitudeE7": int(lat * 1e7),
                    "longitudeE7": int(lon * 1e7),
                    "accuracy": random.randint(5, 15)
                })

    return {"locations": sorted(locations, key=lambda x: int(x["timestampMs"]))}


# ----------------------------------------------------------------
# PART 2: Parse and Analyze Location Data
# ----------------------------------------------------------------

def parse_location_data(location_json):
    """
    Parse location JSON into a pandas DataFrame.
    Normalizes coordinates from E7 format to decimal degrees.
    """
    records = []
    for entry in location_json["locations"]:
        records.append({
            "timestamp": datetime.fromtimestamp(int(entry["timestampMs"]) / 1000),
            "latitude": entry["latitudeE7"] / 1e7,
            "longitude": entry["longitudeE7"] / 1e7,
            "accuracy": entry.get("accuracy", None)
        })

    df = pd.DataFrame(records)
    df["hour"] = df["timestamp"].dt.hour
    df["day_of_week"] = df["timestamp"].dt.day_name()
    df["is_weekday"] = df["timestamp"].dt.weekday < 5
    return df


def haversine_distance(lat1, lon1, lat2, lon2):
    """
    Calculate distance in meters between two GPS coordinates.
    Uses the Haversine formula.
    """
    R = 6371000  # Earth's radius in meters
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi/2)**2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda/2)**2
    return 2 * R * math.asin(math.sqrt(a))


def infer_home_and_work(df, home_hours=(22, 6), work_hours=(9, 17)):
    """
    Infer home and work locations from temporal patterns.

    Logic (mirrors what data brokers actually do):
    - "Home" = most common location during late night/early morning hours
    - "Work" = most common location during business hours on weekdays

    This demonstrates how 'home' and 'work' can be inferred from location
    data without the user ever explicitly providing this information.
    """
    # Night hours: 10pm to 6am (likely home)
    night_data = df[
        (df["hour"] >= home_hours[0]) | (df["hour"] <= home_hours[1])
    ].copy()

    # Business hours on weekdays (likely work)
    work_data = df[
        df["is_weekday"] &
        (df["hour"] >= work_hours[0]) &
        (df["hour"] <= work_hours[1])
    ].copy()

    # Round to ~100m precision for clustering
    def round_coord(x, precision=3):
        return round(x, precision)

    # Infer home: most frequent rounded coordinate during night
    if len(night_data) > 0:
        night_data["lat_r"] = night_data["latitude"].apply(lambda x: round_coord(x))
        night_data["lon_r"] = night_data["longitude"].apply(lambda x: round_coord(x))
        home_counts = Counter(zip(night_data["lat_r"], night_data["lon_r"]))
        home_lat, home_lon = home_counts.most_common(1)[0][0]
    else:
        home_lat, home_lon = None, None

    # Infer work: most frequent rounded coordinate during work hours
    if len(work_data) > 0:
        work_data["lat_r"] = work_data["latitude"].apply(lambda x: round_coord(x))
        work_data["lon_r"] = work_data["longitude"].apply(lambda x: round_coord(x))
        work_counts = Counter(zip(work_data["lat_r"], work_data["lon_r"]))
        work_lat, work_lon = work_counts.most_common(1)[0][0]
    else:
        work_lat, work_lon = None, None

    return {
        "home": (home_lat, home_lon),
        "work": (work_lat, work_lon)
    }


def calculate_time_at_locations(df, locations, radius_meters=150):
    """
    Calculate time spent near each known location.
    Demonstrates how location data enables detailed behavioral profiling.
    """
    df_sorted = df.sort_values("timestamp").reset_index(drop=True)
    time_at_location = {name: timedelta(0) for name in locations}

    for i in range(len(df_sorted) - 1):
        row = df_sorted.iloc[i]
        time_gap = df_sorted.iloc[i+1]["timestamp"] - row["timestamp"]

        # Cap long gaps at 2 hours so sparse sampling (e.g. overnight)
        # doesn't attribute an entire multi-hour gap to a single point
        if time_gap > timedelta(hours=2):
            time_gap = timedelta(hours=2)

        for name, (loc_lat, loc_lon) in locations.items():
            dist = haversine_distance(row["latitude"], row["longitude"], loc_lat, loc_lon)
            if dist <= radius_meters:
                time_at_location[name] += time_gap

    return {name: td.total_seconds() / 3600 for name, td in time_at_location.items()}


def visualize_location_patterns(df):
    """
    Create visualizations of movement and temporal patterns.
    """
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    fig.suptitle("Location Data Analysis — What Your Location History Reveals", fontsize=13)

    # Plot 1: Scatter of all locations
    ax1 = axes[0]
    weekday_data = df[df["is_weekday"]]
    weekend_data = df[~df["is_weekday"]]

    ax1.scatter(weekend_data["longitude"], weekend_data["latitude"],
                alpha=0.3, s=10, color="steelblue", label="Weekend")
    ax1.scatter(weekday_data["longitude"], weekday_data["latitude"],
                alpha=0.3, s=10, color="salmon", label="Weekday")
    ax1.set_xlabel("Longitude")
    ax1.set_ylabel("Latitude")
    ax1.set_title("All Location Points (30 Days)")
    ax1.legend()

    # Plot 2: Hourly activity pattern
    ax2 = axes[1]
    hourly_counts = df.groupby("hour").size()
    hours = range(24)
    counts = [hourly_counts.get(h, 0) for h in hours]

    colors = ["navy" if (h >= 22 or h <= 6) else
              "darkorange" if (9 <= h <= 17) else
              "gray" for h in hours]

    ax2.bar(hours, counts, color=colors, alpha=0.7, edgecolor="white")
    ax2.set_xlabel("Hour of Day")
    ax2.set_ylabel("Number of Location Points")
    ax2.set_title("When Location Data Is Generated\n(Navy=Night, Orange=Work Hours)")
    ax2.set_xticks(range(0, 24, 2))

    plt.tight_layout()
    plt.savefig("location_analysis.png", dpi=150, bbox_inches="tight")
    plt.show()
    print("Chart saved as location_analysis.png")


# ----------------------------------------------------------------
# PART 3: Run the Analysis
# ----------------------------------------------------------------

def run_location_analysis():
    print("=" * 60)
    print("LOCATION DATA ANALYSIS DEMONSTRATION")
    print("Connecting to Chapter 18: The Smartphone in Your Pocket")
    print("=" * 60)

    # Generate synthetic data
    print("\n[1] Generating 30 days of synthetic location history...")
    location_json = generate_synthetic_location_history(days=30)
    print(f"    Generated {len(location_json['locations'])} location points")

    # Parse data
    df = parse_location_data(location_json)
    print(f"\n[2] Data summary:")
    print(f"    Date range: {df['timestamp'].min().date()} to {df['timestamp'].max().date()}")
    print(f"    Total points: {len(df)}")
    print(f"    Lat range: {df['latitude'].min():.4f} to {df['latitude'].max():.4f}")
    print(f"    Lon range: {df['longitude'].min():.4f} to {df['longitude'].max():.4f}")

    # Infer home and work
    print("\n[3] Inferring 'home' and 'work' from temporal patterns...")
    print("    (This is how data brokers infer sensitive attributes)")
    inferred = infer_home_and_work(df)
    print(f"    Inferred HOME:  lat={inferred['home'][0]:.4f}, lon={inferred['home'][1]:.4f}")
    print(f"    Inferred WORK:  lat={inferred['work'][0]:.4f}, lon={inferred['work'][1]:.4f}")

    # Known locations (in real analysis, compared to inferred)
    known_locations = {
        "Home": (40.7128, -74.0060),
        "Workplace": (40.7589, -73.9851),
        "Gym": (40.7450, -73.9910),
        "Grocery Store": (40.7200, -74.0010),
        "Coffee Shop": (40.7580, -73.9870)
    }

    # Calculate time at locations
    print("\n[4] Time spent at key locations:")
    time_at = calculate_time_at_locations(df, known_locations)
    for loc, hours in sorted(time_at.items(), key=lambda x: -x[1]):
        print(f"    {loc:<20} {hours:.1f} hours over 30 days")

    # Visualize
    print("\n[5] Generating visualizations...")
    visualize_location_patterns(df)

    print("\n--- SURVEILLANCE IMPLICATIONS ---")
    print("From 30 days of location data, an analyst can determine:")
    print("  - Where you live (home address)")
    print("  - Where you work (employer)")
    print("  - Your exercise habits (gym attendance)")
    print("  - Your shopping patterns (grocery store)")
    print("  - Your daily routine with hourly precision")
    print("  - Deviations from routine (medical appointments, unusual travel)")
    print("\nThis data is commercially available from your phone's apps,")
    print("sold by data brokers, and accessible to law enforcement.")

    return df


if __name__ == "__main__":
    df = run_location_analysis()

Sample Output (abridged):

LOCATION DATA ANALYSIS DEMONSTRATION
    Generated 1847 location points
    Inferred HOME:  lat=40.7130, lon=-74.0060
    Inferred WORK:  lat=40.7590, lon=-73.9850
    Home                 312.4 hours over 30 days
    Workplace            198.7 hours over 30 days
    Gym                  14.2 hours over 30 days
Discussion Questions:

  1. The inference of "home" and "work" from temporal patterns requires no explicit disclosure — the person never tells the app where they live. What other attributes could be inferred from location patterns?
  2. This analysis uses only 30 days of data. How would the analysis change with six months? Five years?
  3. Who currently has access to data like this about you? Check your phone's location permission settings.


Section 2: Urban Foot Traffic Analysis

Connecting to Chapter 25 — Smart Cities

Why This Matters

Smart city infrastructure collects ambient data about urban movement. Many cities publish anonymized pedestrian count data from sensors. This section demonstrates how to analyze such data — and how temporal pattern analysis reveals city rhythms that are commercially and governmentally valuable.
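
Before the synthetic generator below, it is worth seeing how little adaptation real data needs. Several cities publish hourly pedestrian counts as CSV (Melbourne's pedestrian counting system is a well-known example). The column names in this sketch (Date_Time, Sensor_Name, Hourly_Counts) are assumptions; check the data dictionary for your city's dataset and rename accordingly.

```python
import pandas as pd

def load_city_counts(path):
    """Reshape a published pedestrian-count CSV into the column layout
    used by the analysis functions in this section."""
    raw = pd.read_csv(path, parse_dates=["Date_Time"])
    df = pd.DataFrame({
        "timestamp": raw["Date_Time"],
        "sensor": raw["Sensor_Name"],
        "count": raw["Hourly_Counts"],
    })
    # Derive the same temporal features the synthetic generator produces
    df["hour"] = df["timestamp"].dt.hour
    df["is_weekday"] = df["timestamp"].dt.weekday < 5
    df["day_of_week"] = df["timestamp"].dt.strftime("%A")
    df["date"] = df["timestamp"].dt.date
    return df
```

With real data loaded this way, analyze_temporal_patterns() below runs unchanged.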

"""
Section 2: Urban Foot Traffic Analysis
Connects to Chapter 25: The Smart City

This code demonstrates:
- Loading pedestrian sensor count data
- Temporal pattern analysis (hourly, daily, weekly)
- Anomaly detection
- What public movement data reveals about urban life
"""

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime, timedelta
import random
import math

def generate_pedestrian_data(days=90, seed=123):
    """
    Generate synthetic pedestrian count data resembling smart city sensor output.
    Simulates multiple sensors at different location types.
    """
    random.seed(seed)
    np.random.seed(seed)

    # Location types with different traffic patterns
    sensors = {
        "Downtown_Business_District": {
            "weekday_peak_hour": 12,   # Lunch rush
            "weekday_morning_peak": 8, # Commute
            "weekday_evening_peak": 17,
            "base_weekday": 200,
            "base_weekend": 80,
            "morning_factor": 0.8,
            "lunch_factor": 1.5,
            "evening_factor": 1.2
        },
        "Transit_Hub": {
            "weekday_peak_hour": 8,
            "weekday_morning_peak": 8,
            "weekday_evening_peak": 17,
            "base_weekday": 500,
            "base_weekend": 200,
            "morning_factor": 2.5,
            "lunch_factor": 1.0,
            "evening_factor": 2.0
        },
        "Residential_Street": {
            "weekday_peak_hour": 17,
            "weekday_morning_peak": 7,
            "weekday_evening_peak": 18,
            "base_weekday": 50,
            "base_weekend": 70,
            "morning_factor": 1.2,
            "lunch_factor": 0.8,
            "evening_factor": 1.8
        }
    }

    records = []
    base_date = datetime.now() - timedelta(days=days)

    for day in range(days):
        current_date = base_date + timedelta(days=day)
        is_weekday = current_date.weekday() < 5
        is_holiday = (current_date.month == 12 and current_date.day in [25, 26])

        # Random event (protest, concert) on ~2% of days
        special_event = random.random() < 0.02
        event_sensor = random.choice(list(sensors.keys())) if special_event else None

        for hour in range(24):
            for sensor_name, params in sensors.items():
                base = params["base_weekday"] if is_weekday else params["base_weekend"]

                # Holiday effect
                if is_holiday:
                    base *= 0.3

                # Hour-of-day adjustment
                if is_weekday:
                    if hour == params["weekday_morning_peak"]:
                        count = base * params["morning_factor"]
                    elif hour == params["weekday_peak_hour"]:
                        count = base * params["lunch_factor"]
                    elif hour == params["weekday_evening_peak"]:
                        count = base * params["evening_factor"]
                    elif 0 <= hour <= 5:
                        count = base * 0.05  # Late night
                    elif 6 <= hour <= 22:
                        count = base * (0.5 + 0.5 * math.sin(math.pi * (hour - 6) / 16))
                    else:
                        count = base * 0.1
                else:
                    # Weekend: flatter curve, midday peak
                    if 10 <= hour <= 16:
                        count = base * 1.2
                    elif 0 <= hour <= 6:
                        count = base * 0.05
                    else:
                        count = base * 0.6

                # Special event: spike at that sensor
                if special_event and sensor_name == event_sensor and 14 <= hour <= 20:
                    count *= random.uniform(3.0, 6.0)

                # Add noise
                count = max(0, int(count + random.gauss(0, count * 0.15)))

                records.append({
                    "timestamp": current_date + timedelta(hours=hour),
                    "sensor": sensor_name,
                    "count": count,
                    "is_weekday": is_weekday,
                    "hour": hour,
                    "day_of_week": current_date.strftime("%A"),
                    "date": current_date.date()
                })

    return pd.DataFrame(records)


def analyze_temporal_patterns(df):
    """Analyze and visualize temporal traffic patterns."""

    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    fig.suptitle("Urban Foot Traffic Patterns — What Smart City Data Reveals", fontsize=13)

    sensors = df["sensor"].unique()
    colors = ["steelblue", "coral", "mediumseagreen"]

    # Plot 1: Average hourly pattern by sensor (weekdays)
    ax1 = axes[0, 0]
    weekday_df = df[df["is_weekday"]]
    hourly = weekday_df.groupby(["sensor", "hour"])["count"].mean().reset_index()

    for sensor, color in zip(sensors, colors):
        sensor_data = hourly[hourly["sensor"] == sensor]
        label = sensor.replace("_", " ")
        ax1.plot(sensor_data["hour"], sensor_data["count"],
                 label=label, color=color, linewidth=2, marker="o", markersize=3)

    ax1.set_xlabel("Hour of Day")
    ax1.set_ylabel("Average Pedestrian Count")
    ax1.set_title("Weekday Hourly Patterns by Location")
    ax1.legend(fontsize=8)
    ax1.set_xticks(range(0, 24, 2))
    ax1.grid(True, alpha=0.3)

    # Plot 2: Day of week pattern
    ax2 = axes[0, 1]
    day_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
    daily = df.groupby(["sensor", "day_of_week"])["count"].mean().reset_index()

    x = np.arange(7)
    width = 0.25
    for i, (sensor, color) in enumerate(zip(sensors, colors)):
        sensor_data = daily[daily["sensor"] == sensor]
        day_means = [sensor_data[sensor_data["day_of_week"] == d]["count"].values[0]
                     if d in sensor_data["day_of_week"].values else 0 for d in day_order]
        ax2.bar(x + i*width, day_means, width, label=sensor.replace("_", " "),
                color=color, alpha=0.8, edgecolor="white")

    ax2.set_xticks(x + width)
    ax2.set_xticklabels([d[:3] for d in day_order])
    ax2.set_ylabel("Average Hourly Count")
    ax2.set_title("Average Traffic by Day of Week")
    ax2.legend(fontsize=8)

    # Plot 3: Anomaly detection (z-score)
    ax3 = axes[1, 0]
    transit_daily = df[df["sensor"] == "Transit_Hub"].groupby("date")["count"].sum().reset_index()
    transit_daily["z_score"] = (transit_daily["count"] - transit_daily["count"].mean()) / transit_daily["count"].std()

    normal = transit_daily[transit_daily["z_score"].abs() <= 2]
    anomalies = transit_daily[transit_daily["z_score"].abs() > 2]

    ax3.plot(transit_daily["date"], transit_daily["z_score"],
             color="gray", linewidth=1, alpha=0.7, label="Daily z-score")
    ax3.scatter(normal["date"], normal["z_score"],
                color="steelblue", s=15, alpha=0.5, zorder=3)
    ax3.scatter(anomalies["date"], anomalies["z_score"],
                color="red", s=80, zorder=5, label=f"Anomalies (n={len(anomalies)})")
    ax3.axhline(y=2, color="red", linestyle="--", alpha=0.4)
    ax3.axhline(y=-2, color="red", linestyle="--", alpha=0.4)
    ax3.set_xlabel("Date")
    ax3.set_ylabel("Z-Score (Standard Deviations from Mean)")
    ax3.set_title("Anomaly Detection: Transit Hub Daily Traffic")
    ax3.legend()

    # Plot 4: Heatmap of weekday vs. hour
    ax4 = axes[1, 1]
    pivot = df[df["sensor"] == "Downtown_Business_District"].pivot_table(
        values="count", index="day_of_week", columns="hour", aggfunc="mean"
    )
    # Reorder days
    dow_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
    pivot = pivot.reindex([d for d in dow_order if d in pivot.index])

    im = ax4.imshow(pivot.values, aspect="auto", cmap="YlOrRd")
    ax4.set_xticks(range(0, 24, 2))
    ax4.set_xticklabels(range(0, 24, 2), fontsize=8)
    ax4.set_yticks(range(len(pivot.index)))
    ax4.set_yticklabels([d[:3] for d in pivot.index], fontsize=8)
    ax4.set_title("Downtown Traffic Heatmap (Day × Hour)")
    ax4.set_xlabel("Hour")
    plt.colorbar(im, ax=ax4, label="Avg Count")

    plt.tight_layout()
    plt.savefig("foot_traffic_analysis.png", dpi=150, bbox_inches="tight")
    plt.show()
    print("Chart saved as foot_traffic_analysis.png")


def run_foot_traffic_analysis():
    print("=" * 60)
    print("URBAN FOOT TRAFFIC ANALYSIS")
    print("Connecting to Chapter 25: The Smart City")
    print("=" * 60)

    print("\n[1] Generating 90 days of synthetic foot traffic data...")
    df = generate_pedestrian_data(days=90)
    print(f"    Generated {len(df):,} sensor readings")
    print(f"    Sensors: {', '.join(df['sensor'].unique())}")

    print("\n[2] Summary statistics:")
    summary = df.groupby("sensor")["count"].agg(["mean", "max", "sum"])
    for sensor in df["sensor"].unique():
        row = summary.loc[sensor]
        print(f"    {sensor.replace('_', ' ')}")
        print(f"      Avg hourly: {row['mean']:.0f} | Peak hourly: {row['max']:.0f}")

    print("\n[3] Generating temporal pattern analysis...")
    analyze_temporal_patterns(df)

    print("\n--- SMART CITY SURVEILLANCE IMPLICATIONS ---")
    print("Foot traffic data can reveal:")
    print("  - Religious attendance patterns (Friday/Saturday/Sunday spikes near houses of worship)")
    print("  - Political assembly (anomalous gatherings near government buildings)")
    print("  - Medical patterns (traffic at clinic locations)")
    print("  - Economic activity patterns")
    print("  - Individual routine inference when combined with ID systems")

    return df


if __name__ == "__main__":
    df = run_foot_traffic_analysis()

Discussion Questions:

  1. The anomaly detection identifies days with unusual traffic. What events might cause anomalies? Who might want to know about them?
  2. How does aggregated, "anonymized" foot traffic data differ in privacy implications from individual-level location tracking?
  3. What would you need to add to foot traffic analysis to identify individuals rather than crowds?
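
Question 2 can be made concrete. A standard disclosure-control step for aggregate releases is to suppress small counts before publication, since a reading of 1 or 2 at an odd hour can single out an individual. A minimal sketch of a k-threshold rule, assuming the DataFrame layout produced by generate_pedestrian_data():

```python
import pandas as pd

def suppress_small_counts(df, k=10):
    """Replace counts below k with NaN before release -- a simple
    k-threshold rule used in many official-statistics products."""
    out = df.copy()
    # mask() sets values to NaN wherever the condition holds
    out["count"] = out["count"].mask(out["count"] < k)
    return out

# Usage sketch:
# safe = suppress_small_counts(df, k=10)
# print(f"{safe['count'].isna().mean():.1%} of readings suppressed")
```

Note that suppression protects individuals in quiet hours but also erases exactly the low-traffic observations that Question 1's anomaly detection might care about; that trade-off is the point.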


Section 3: Algorithmic Management Simulation

Connecting to Chapter 28 — Algorithmic Management

Why This Matters

Amazon, Uber, and dozens of other companies use automated systems to set productivity targets, assign work, and issue discipline. This simulation demonstrates how algorithmic management works and how small biases can compound over time.

"""
Section 3: Algorithmic Management Simulation
Connects to Chapter 28: The Algorithmic Supervisor

Demonstrates:
- How productivity tracking creates feedback loops
- How initial small biases compound
- Why "neutral" algorithms produce unequal outcomes
"""

import random
import statistics
import matplotlib.pyplot as plt
import numpy as np

class AlgorithmicManagementSystem:
    """
    Simulates a simplified algorithmic task management system.

    Workers are assigned tasks, monitored for speed and accuracy,
    and scored. High scorers get better task assignments (easier tasks,
    better routes); low scorers get harder tasks and more scrutiny.
    This feedback loop demonstrates self-reinforcing bias.
    """

    def __init__(self):
        self.workers = {}
        self.history = {}

    def add_worker(self, worker_id, baseline_speed, baseline_accuracy,
                   adjustment_factor=1.0, name=None):
        """
        Add a worker to the system.

        adjustment_factor simulates structural disadvantages:
        < 1.0 = disadvantaged (language barrier, older equipment,
                 longer commute to tasks, discriminatory traffic stops)
        = 1.0 = baseline
        > 1.0 = advantaged (faster equipment, better task areas)
        """
        self.workers[worker_id] = {
            "name": name or worker_id,
            "baseline_speed": baseline_speed,
            "baseline_accuracy": baseline_accuracy,
            "adjustment_factor": adjustment_factor,
            "current_score": 1.0,
            "score_history": [1.0],
            "task_difficulty": 1.0,  # Task difficulty assigned based on score
            "warnings": 0,
            "total_tasks": 0
        }
        self.history[worker_id] = []

    def simulate_task(self, worker_id, week):
        """Simulate completing one task."""
        worker = self.workers[worker_id]

        # High-scoring workers get easier task assignments
        # Low-scoring workers get harder tasks (more scrutiny, worse assignments)
        task_difficulty = worker["task_difficulty"]

        # Performance = baseline ability × adjustment factor × task difficulty effects
        # + random noise (real-world variation)
        speed_performance = (
            worker["baseline_speed"] *
            worker["adjustment_factor"] /
            task_difficulty *  # Harder tasks take longer
            random.gauss(1.0, 0.15)  # Random variation
        )

        accuracy_performance = (
            worker["baseline_accuracy"] *
            worker["adjustment_factor"] *
            random.gauss(1.0, 0.1)
        )

        # Clamp values to reasonable range
        speed_performance = max(0.3, min(2.0, speed_performance))
        accuracy_performance = max(0.4, min(1.0, accuracy_performance))

        # Algorithm scores: speed weighted 60%, accuracy 40%
        task_score = 0.6 * speed_performance + 0.4 * accuracy_performance

        # Update worker's running score (exponential moving average)
        # Recent performance counts more than historical
        alpha = 0.3  # Learning rate / recency weight
        worker["current_score"] = (1 - alpha) * worker["current_score"] + alpha * task_score
        worker["score_history"].append(worker["current_score"])
        worker["total_tasks"] += 1

        # Algorithmic consequences of score
        # Scores above 1.0: get preferential assignments (easier tasks)
        # Scores below 0.8: get warning; below 0.7: get harder tasks
        if worker["current_score"] >= 1.1:
            worker["task_difficulty"] = max(0.8, worker["task_difficulty"] - 0.02)
        elif worker["current_score"] >= 0.9:
            worker["task_difficulty"] = 1.0  # Neutral
        elif worker["current_score"] >= 0.8:
            worker["task_difficulty"] = min(1.3, worker["task_difficulty"] + 0.02)
        else:
            # Below threshold: warning issued, task difficulty increases
            worker["task_difficulty"] = min(1.5, worker["task_difficulty"] + 0.05)
            if week % 4 == 0 and random.random() < 0.3:
                worker["warnings"] += 1

        self.history[worker_id].append({
            "week": week,
            "task_score": task_score,
            "running_score": worker["current_score"],
            "task_difficulty": task_difficulty
        })

        return task_score


def run_algorithmic_management_simulation(weeks=52, tasks_per_week=50):
    """
    Run the full simulation and analyze results.
    """
    print("=" * 60)
    print("ALGORITHMIC MANAGEMENT SIMULATION")
    print("Connecting to Chapter 28: The Algorithmic Supervisor")
    print("=" * 60)

    sim = AlgorithmicManagementSystem()

    # Add workers with different structural positions
    # All workers have identical baseline ability (speed=1.0, accuracy=0.85)
    # BUT different structural adjustment factors

    sim.add_worker("W1", baseline_speed=1.0, baseline_accuracy=0.85,
                   adjustment_factor=1.0, name="Worker A (no disadvantage)")

    sim.add_worker("W2", baseline_speed=1.0, baseline_accuracy=0.85,
                   adjustment_factor=0.90, name="Worker B (10% disadvantage)")
    # Represents: older device, longer routes to task locations,
    # more frequent stops (traffic enforcement in their area)

    sim.add_worker("W3", baseline_speed=1.0, baseline_accuracy=0.85,
                   adjustment_factor=0.80, name="Worker C (20% disadvantage)")
    # Represents: language barriers in customer interactions,
    # less reliable transportation, housing insecurity affecting consistency

    sim.add_worker("W4", baseline_speed=1.0, baseline_accuracy=0.85,
                   adjustment_factor=1.10, name="Worker D (10% advantage)")
    # Represents: works in premium neighborhood with easier tasks,
    # newer equipment, lives close to high-density task areas

    print("\n[1] All workers start with identical underlying ability.")
    print("    Differences are STRUCTURAL (equipment, geography, context).")
    print("    The algorithm does not know about structural factors.")
    print("    Watch what happens over 52 weeks...\n")

    # Run simulation
    for week in range(1, weeks + 1):
        for _ in range(tasks_per_week):
            for worker_id in sim.workers:
                sim.simulate_task(worker_id, week)

    # Analysis
    print("[2] Results after", weeks, "weeks:")
    print("-" * 55)
    print(f"{'Worker':<35} {'Final Score':>12} {'Warnings':>10} {'Avg Difficulty':>15}")
    print("-" * 55)

    for worker_id, worker in sim.workers.items():
        final_score = worker["current_score"]
        warnings = worker["warnings"]
        avg_difficulty = statistics.mean([h["task_difficulty"] for h in sim.history[worker_id]])
        print(f"{worker['name']:<35} {final_score:>12.3f} {warnings:>10} {avg_difficulty:>15.3f}")

    print("\n[3] Generating trajectory chart...")

    # Plot score trajectories
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    fig.suptitle("Algorithmic Management: How Structural Disadvantage Compounds", fontsize=12)

    colors = {"W1": "steelblue", "W2": "darkorange", "W3": "firebrick", "W4": "mediumseagreen"}

    ax1 = axes[0]
    for worker_id, worker in sim.workers.items():
        scores = worker["score_history"]
        # Downsample for visibility
        x = range(0, len(scores), tasks_per_week)
        y = [scores[i] for i in x]
        ax1.plot(range(len(y)), y,
                 label=worker["name"], color=colors[worker_id],
                 linewidth=2, alpha=0.8)

    ax1.axhline(y=0.8, color="red", linestyle="--", alpha=0.5, label="Warning Threshold (0.8)")
    ax1.axhline(y=1.0, color="gray", linestyle="--", alpha=0.4, label="Baseline (1.0)")
    ax1.set_xlabel("Week")
    ax1.set_ylabel("Algorithmic Performance Score")
    ax1.set_title("Score Trajectories: Identical Ability,\nDifferent Structural Position")
    ax1.legend(fontsize=7)
    ax1.grid(True, alpha=0.3)

    ax2 = axes[1]
    final_scores = [sim.workers[w]["current_score"] for w in sorted(sim.workers.keys())]
    worker_names = [sim.workers[w]["name"].split("(")[0].strip() for w in sorted(sim.workers.keys())]
    bar_colors = [colors[w] for w in sorted(sim.workers.keys())]

    bars = ax2.bar(worker_names, final_scores, color=bar_colors, alpha=0.8, edgecolor="white", width=0.6)
    ax2.axhline(y=0.8, color="red", linestyle="--", alpha=0.6, label="Warning Threshold")
    ax2.axhline(y=1.0, color="gray", linestyle="--", alpha=0.6, label="Baseline")
    ax2.set_ylabel("Final Algorithmic Score")
    ax2.set_title("Final Scores After 52 Weeks\n(All workers started equal)")
    ax2.legend()

    for bar, score in zip(bars, final_scores):
        ax2.text(bar.get_x() + bar.get_width()/2., bar.get_height() + 0.01,
                 f"{score:.2f}", ha="center", va="bottom", fontsize=9)

    plt.tight_layout()
    plt.savefig("algorithmic_management.png", dpi=150, bbox_inches="tight")
    plt.show()
    print("Chart saved as algorithmic_management.png")

    print("\n--- KEY INSIGHT ---")
    print("All four workers began with IDENTICAL underlying ability.")
    print("Differences in outcome were caused entirely by structural factors")
    print("the algorithm could not see and was not designed to address.")
    print("The algorithm compounded initial disadvantage through feedback loops:")
    print("  low score → harder tasks → lower score → more warnings → job loss risk")
    print("\nThis dynamic occurs in real algorithmic management systems.")

    return sim


if __name__ == "__main__":
    sim = run_algorithmic_management_simulation()

Discussion Questions:

1. All workers in this simulation have identical underlying ability. What does this reveal about the "objectivity" of algorithmic performance scoring?
2. The feedback loop (low score → harder tasks → lower score) mirrors dynamics described in Chapter 28. Can you identify real-world parallels?
3. What would a "fair" algorithmic management system need to do to address structural disadvantage?
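The heart of the feedback loop above is the exponential moving average update, `score = (1 - alpha) * score + alpha * task_score`. A minimal sketch, stripped of the simulation machinery (the helper `run_ema` is illustrative, not part of the simulation code), shows why a persistent 10% structural deficit drags the running score toward 0.9 no matter where it started:

```python
# Minimal sketch of the EMA scoring loop used above, with no randomness.
# A worker who consistently performs at 90% of baseline converges to a
# running score of 0.9 regardless of their starting score.
def run_ema(task_score, alpha=0.3, start=1.0, n_tasks=50):
    score = start
    history = [score]
    for _ in range(n_tasks):
        score = (1 - alpha) * score + alpha * task_score
        history.append(score)
    return history

trajectory = run_ema(task_score=0.9)
print(f"Score after  5 tasks: {trajectory[5]:.3f}")
print(f"Score after 50 tasks: {trajectory[50]:.3f}")  # converges to 0.900
```

Because each update keeps only 70% of the previous score, the influence of the starting value decays geometrically; after a few dozen tasks the score is effectively determined by recent (structurally disadvantaged) performance alone.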


Section 4: Resume Screening Bias Demonstration

Connecting to Chapter 29 — HR Analytics

"""
Section 4: Resume Screening Bias Demonstration
Connects to Chapter 29: Hiring, Firing, and Algorithmic HR

Demonstrates:
- How keyword-based screening creates disparate impact
- How "neutral" criteria can encode demographic bias
- Disparate impact calculation
"""

import pandas as pd
import numpy as np
import random
import matplotlib.pyplot as plt

def generate_candidate_pool(n=500, seed=99):
    """Generate a pool of job candidates with varying qualifications."""
    random.seed(seed)
    np.random.seed(seed)

    # Relevant skills for a data analyst position
    technical_skills = ["Python", "SQL", "Excel", "Tableau", "R", "Statistics", "Machine Learning"]
    soft_skills = ["communication", "teamwork", "problem-solving", "leadership"]

    # Prestigious vs. non-prestigious schools (correlates with socioeconomic status, not ability)
    prestigious_schools = ["Harvard", "MIT", "Stanford", "Yale", "Princeton", "Columbia"]
    state_schools = ["State University", "City College", "Regional University",
                     "Community College Transfer", "Online University"]

    candidates = []
    for i in range(n):
        # Background factors (socioeconomic proxy)
        is_advantaged = random.random() < 0.35  # 35% from advantaged backgrounds

        # ABILITY: Equally distributed regardless of background
        true_ability = random.gauss(0.70, 0.15)
        true_ability = max(0.3, min(1.0, true_ability))

        # School: Correlated with background, NOT ability
        if is_advantaged:
            school = random.choice(prestigious_schools)
            gpa = random.gauss(3.5, 0.3)
        else:
            school = random.choice(state_schools)
            gpa = random.gauss(3.4, 0.3)  # Essentially equal GPA distribution

        gpa = max(2.0, min(4.0, gpa))

        # Work experience (slightly correlated with background due to networks)
        years_experience = max(0, int(random.gauss(3, 2) + (1 if is_advantaged else 0)))
        internship = (random.random() < (0.70 if is_advantaged else 0.45))

        # Technical skills: Equal across groups
        n_skills = random.randint(2, len(technical_skills))
        skills = random.sample(technical_skills, n_skills)

        # Resume language: advantaged candidates use more "prestige" keywords
        # This reflects cultural capital, not ability
        power_keywords = []
        if is_advantaged:
            if random.random() < 0.6: power_keywords.append("led")
            if random.random() < 0.5: power_keywords.append("managed")
            if random.random() < 0.4: power_keywords.append("drove growth")
            if internship: power_keywords.extend(["Goldman", "McKinsey", "Big Tech intern"])
        else:
            if random.random() < 0.3: power_keywords.append("helped")
            if random.random() < 0.4: power_keywords.append("assisted")
            if internship: power_keywords.append("local business intern")

        candidates.append({
            "id": f"CAND_{i:04d}",
            "true_ability": true_ability,
            "is_advantaged": is_advantaged,
            "school": school,
            "prestigious_school": school in prestigious_schools,
            "gpa": round(gpa, 2),
            "years_experience": years_experience,
            "internship": internship,
            "n_technical_skills": len(skills),
            "technical_skills": "|".join(skills),
            "power_keywords": len(power_keywords),
            "has_prestige_internship": any(k in ["Goldman", "McKinsey", "Big Tech intern"]
                                            for k in power_keywords)
        })

    return pd.DataFrame(candidates)


def apply_screening_algorithm(df):
    """
    Apply a simple keyword/credential-based screening algorithm.
    Points are awarded for credentials and keywords.
    """
    scores = pd.Series(0.0, index=df.index)

    # Prestigious school: +20 points (neutral-seeming but class-correlated)
    scores += df["prestigious_school"].astype(int) * 20

    # GPA: +0-15 points based on GPA
    scores += (df["gpa"] - 2.0) / 2.0 * 15

    # Years experience: +3 per year (caps at 5 years = 15 pts)
    scores += df["years_experience"].clip(0, 5) * 3

    # Internship: +10 points
    scores += df["internship"].astype(int) * 10

    # Prestige internship: additional +15 points
    scores += df["has_prestige_internship"].astype(int) * 15

    # Technical skills: +5 per skill
    scores += df["n_technical_skills"] * 5

    # Power/action keywords: +3 each
    scores += df["power_keywords"] * 3

    return scores


def calculate_disparate_impact(df, threshold):
    """
    Calculate disparate impact ratio.
    Disparate impact: selection rate for disadvantaged group / selection rate for advantaged group
    EEOC 4/5ths rule: ratio below 0.8 suggests adverse impact.
    """
    selected = df["algorithm_score"] >= threshold

    advantaged_rate = selected[df["is_advantaged"]].mean()
    disadvantaged_rate = selected[~df["is_advantaged"]].mean()

    ratio = disadvantaged_rate / advantaged_rate if advantaged_rate > 0 else 0

    return {
        "advantaged_selection_rate": advantaged_rate,
        "disadvantaged_selection_rate": disadvantaged_rate,
        "disparate_impact_ratio": ratio,
        "n_selected_advantaged": selected[df["is_advantaged"]].sum(),
        "n_selected_disadvantaged": selected[~df["is_advantaged"]].sum(),
        "threshold": threshold
    }


def run_resume_screening_analysis():
    print("=" * 60)
    print("RESUME SCREENING BIAS DEMONSTRATION")
    print("Connecting to Chapter 29: Algorithmic HR")
    print("=" * 60)

    print("\n[1] Generating pool of 500 job candidates...")
    df = generate_candidate_pool(n=500)

    print(f"\n    Candidate pool composition:")
    print(f"    Advantaged background: {df['is_advantaged'].sum()} ({df['is_advantaged'].mean()*100:.0f}%)")
    print(f"    Non-advantaged background: {(~df['is_advantaged']).sum()} ({(~df['is_advantaged']).mean()*100:.0f}%)")
    print(f"\n    Average TRUE ABILITY by background:")
    print(f"    Advantaged: {df[df['is_advantaged']]['true_ability'].mean():.3f}")
    print(f"    Non-advantaged: {df[~df['is_advantaged']]['true_ability'].mean():.3f}")
    print(f"    (Note: Nearly identical underlying ability)")

    # Apply algorithm
    df["algorithm_score"] = apply_screening_algorithm(df)

    print(f"\n[2] Algorithm screening scores by background:")
    print(f"    Advantaged avg score:     {df[df['is_advantaged']]['algorithm_score'].mean():.1f}")
    print(f"    Non-advantaged avg score: {df[~df['is_advantaged']]['algorithm_score'].mean():.1f}")

    # Disparate impact at different thresholds
    print(f"\n[3] Disparate Impact Analysis (EEOC 4/5ths rule: ratio < 0.8 = adverse impact):")
    print(f"    {'Threshold':>10} {'Adv. Rate':>12} {'Disadv. Rate':>14} {'DI Ratio':>10} {'Adverse?':>10}")
    print("    " + "-" * 60)

    for threshold in [50, 60, 70, 80]:
        di = calculate_disparate_impact(df, threshold)
        adverse = "YES" if di["disparate_impact_ratio"] < 0.8 else "no"
        print(f"    {threshold:>10} {di['advantaged_selection_rate']:>12.1%} "
              f"{di['disadvantaged_selection_rate']:>14.1%} "
              f"{di['disparate_impact_ratio']:>10.3f} {adverse:>10}")

    # Correlation: algorithm score vs. true ability
    corr = df[["algorithm_score", "true_ability"]].corr().iloc[0, 1]
    print(f"\n[4] Correlation between algorithm score and TRUE ABILITY: {corr:.3f}")
    print(f"    The algorithm is {'weakly' if abs(corr) < 0.5 else 'moderately'} correlated with actual ability.")

    # Visualization
    fig, axes = plt.subplots(1, 2, figsize=(13, 5))
    fig.suptitle("Resume Screening: Why 'Neutral' Algorithms Produce Biased Outcomes", fontsize=11)

    ax1 = axes[0]
    advantaged = df[df["is_advantaged"]]
    disadvantaged = df[~df["is_advantaged"]]
    ax1.hist(advantaged["algorithm_score"], bins=20, alpha=0.6,
             color="steelblue", label="Advantaged background", density=True)
    ax1.hist(disadvantaged["algorithm_score"], bins=20, alpha=0.6,
             color="coral", label="Non-advantaged background", density=True)
    ax1.axvline(x=70, color="black", linestyle="--", linewidth=1.5, label="Threshold (70)")
    ax1.set_xlabel("Algorithm Score")
    ax1.set_ylabel("Density")
    ax1.set_title("Score Distribution by Background\n(Same underlying ability)")
    ax1.legend()

    ax2 = axes[1]
    ax2.scatter(disadvantaged["true_ability"], disadvantaged["algorithm_score"],
                alpha=0.4, color="coral", s=15, label="Non-advantaged")
    ax2.scatter(advantaged["true_ability"], advantaged["algorithm_score"],
                alpha=0.4, color="steelblue", s=15, label="Advantaged")
    ax2.axhline(y=70, color="black", linestyle="--", linewidth=1, alpha=0.7, label="Threshold (70)")
    ax2.set_xlabel("True Ability (Equal Between Groups)")
    ax2.set_ylabel("Algorithm Score")
    ax2.set_title(f"Algorithm Score vs. True Ability\n(Correlation: {corr:.2f})")
    ax2.legend()

    plt.tight_layout()
    plt.savefig("resume_screening.png", dpi=150, bbox_inches="tight")
    plt.show()
    print("\nChart saved as resume_screening.png")

    return df


if __name__ == "__main__":
    df = run_resume_screening_analysis()

Discussion Questions:

1. The algorithm uses no explicitly discriminatory criteria — no race, gender, or class markers. Yet it produces disparate impact. What does this tell us about the concept of "neutral" algorithms?
2. The algorithm correlates imperfectly with true ability. What would a better hiring process look like?
3. How do the concepts of "proxy discrimination" (Section 5 of Appendix B, Study 20) apply to this simulation?
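The four-fifths rule applied in `calculate_disparate_impact` is easy to verify by hand. A minimal sketch using hypothetical selection counts (the numbers below are invented for illustration, not drawn from the simulation):

```python
# Toy check of the EEOC 4/5ths (80%) rule used above.
# Hypothetical counts: 100 applicants per group.
advantaged_selected, advantaged_total = 40, 100
disadvantaged_selected, disadvantaged_total = 25, 100

adv_rate = advantaged_selected / advantaged_total        # 0.40
dis_rate = disadvantaged_selected / disadvantaged_total  # 0.25
di_ratio = dis_rate / adv_rate                           # 0.625

print(f"Selection rates: {adv_rate:.0%} (advantaged) vs {dis_rate:.0%} (disadvantaged)")
print(f"Disparate impact ratio: {di_ratio:.3f}")
print("Adverse impact" if di_ratio < 0.8 else "Within 4/5ths rule")
```

Here the disadvantaged group is selected at 62.5% of the advantaged group's rate, well below the 80% threshold, so the screen would be flagged for adverse impact even though no protected attribute appears anywhere in the scoring criteria.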


Section 5: Differential Privacy Introduction

Connecting to Chapter 39 — Designing for Privacy

"""
Section 5: Differential Privacy — The Laplace Mechanism
Connects to Chapter 39: Privacy by Design

Demonstrates:
- How differential privacy works mathematically
- The epsilon parameter and privacy-accuracy tradeoff
- Why adding noise can protect individuals without destroying aggregate insight
"""

import numpy as np
import matplotlib.pyplot as plt


def laplace_mechanism(true_value, sensitivity, epsilon):
    """
    Apply the Laplace mechanism for differential privacy.

    Parameters:
    - true_value: The true query result (e.g., actual count)
    - sensitivity: How much one individual's data can change the result (typically 1 for counting queries)
    - epsilon: Privacy parameter. Smaller epsilon = more noise = more privacy = less accuracy.

    Returns: Privatized result
    """
    # Scale of Laplace noise = sensitivity / epsilon
    noise_scale = sensitivity / epsilon
    noise = np.random.laplace(0, noise_scale)
    return true_value + noise


def demonstrate_epsilon_tradeoff(true_count=150, sensitivity=1, n_simulations=1000):
    """
    Show how different epsilon values trade privacy for accuracy.
    """
    epsilons = [0.1, 0.5, 1.0, 2.0, 10.0]
    results = {}

    for eps in epsilons:
        # Run many simulations to show distribution of noisy answers
        noisy_counts = [laplace_mechanism(true_count, sensitivity, eps)
                        for _ in range(n_simulations)]
        results[eps] = noisy_counts

    return results


def run_differential_privacy_demo():
    print("=" * 60)
    print("DIFFERENTIAL PRIVACY: LAPLACE MECHANISM")
    print("Connecting to Chapter 39: Privacy by Design")
    print("=" * 60)

    # Scenario: a hospital wants to report how many patients tested positive
    # for a sensitive condition, without revealing whether any specific
    # individual is in the dataset.

    TRUE_COUNT = 47  # True number of patients with condition
    POPULATION = 1000

    print(f"\nScenario: Hospital has data on {POPULATION} patients.")
    print(f"True count with sensitive condition: {TRUE_COUNT}")
    print(f"Goal: Report aggregate statistics while protecting individual privacy.")
    print(f"\nThe Laplace mechanism adds calibrated random noise.")
    print(f"'Epsilon' controls the privacy-accuracy tradeoff.")

    print("\n[1] Effect of epsilon on answer accuracy (10,000 queries):")
    print(f"    {'Epsilon':>10} {'Noise Scale':>12} {'Mean Answer':>13} {'Std Dev':>10} {'Privacy Level':>15}")
    print("    " + "-" * 65)

    results = demonstrate_epsilon_tradeoff(TRUE_COUNT, n_simulations=10000)

    for eps, noisy_answers in sorted(results.items()):
        noise_scale = 1.0 / eps
        mean_answer = np.mean(noisy_answers)
        std_dev = np.std(noisy_answers)

        if eps <= 0.1:
            privacy_level = "Strong Privacy"
        elif eps <= 1.0:
            privacy_level = "Moderate Privacy"
        elif eps <= 2.0:
            privacy_level = "Limited Privacy"
        else:
            privacy_level = "Weak Privacy"

        print(f"    {eps:>10.1f} {noise_scale:>12.1f} {mean_answer:>13.1f} {std_dev:>10.1f} {privacy_level:>15}")

    print(f"\n    True value: {TRUE_COUNT}")
    print(f"    All mechanisms are unbiased (mean ≈ true value).")
    print(f"    Privacy comes at the cost of increased variance (uncertainty).")

    # Visualization
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    fig.suptitle("Differential Privacy: The Privacy-Accuracy Tradeoff", fontsize=12)

    colors = plt.cm.RdYlGn(np.linspace(0.1, 0.9, len(results)))

    ax1 = axes[0]
    for (eps, noisy_answers), color in zip(sorted(results.items()), colors):
        ax1.hist(noisy_answers, bins=50, alpha=0.6,
                 label=f"ε = {eps}", color=color, density=True)
    ax1.axvline(x=TRUE_COUNT, color="black", linestyle="--", linewidth=2, label=f"True value ({TRUE_COUNT})")
    ax1.set_xlabel("Noisy Answer")
    ax1.set_ylabel("Density")
    ax1.set_title("Distribution of Privatized Answers\nby Epsilon Value")
    ax1.legend(fontsize=8)
    ax1.set_xlim([TRUE_COUNT - 60, TRUE_COUNT + 60])

    ax2 = axes[1]
    epsilons = sorted(results.keys())
    std_devs = [np.std(results[e]) for e in epsilons]
    ax2.plot(epsilons, std_devs, "o-", color="firebrick", linewidth=2, markersize=8)
    ax2.fill_between(epsilons, std_devs, alpha=0.2, color="firebrick")
    ax2.set_xlabel("Epsilon (Privacy Parameter)")
    ax2.set_ylabel("Standard Deviation of Answer")
    ax2.set_title("Privacy-Accuracy Tradeoff\nLower epsilon = more privacy, less accuracy")

    # Annotate regions
    ax2.axvspan(0, 1, alpha=0.1, color="green", label="Strong-Moderate Privacy")
    ax2.axvspan(1, 5, alpha=0.1, color="orange", label="Limited Privacy")
    ax2.axvspan(5, 11, alpha=0.1, color="red", label="Weak Privacy")
    ax2.legend(fontsize=8)
    ax2.set_xlim([0, 10.5])

    plt.tight_layout()
    plt.savefig("differential_privacy.png", dpi=150, bbox_inches="tight")
    plt.show()
    print("\nChart saved as differential_privacy.png")

    print("\n--- KEY INSIGHT ---")
    print("Differential privacy provides a mathematical guarantee:")
    print("No adversary can determine, from the published answer, whether")
    print("any specific individual is in the dataset.")
    print("This is privacy-by-design: built into the release mechanism,")
    print("not dependent on the data analyst's good intentions.")
    print("\nUsed by: Apple (keyboard usage stats), Google (Chrome), US Census (2020).")


if __name__ == "__main__":
    run_differential_privacy_demo()
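One property the demo above does not show is composition: every query spends privacy budget, and under basic sequential composition the epsilons of repeated queries on the same data add up. A minimal budget-accounting sketch (the `PrivacyBudget` class and its method names are illustrative, not from any real differential privacy library):

```python
import numpy as np

class PrivacyBudget:
    """Illustrative tracker for basic sequential composition:
    total privacy loss is the sum of per-query epsilons."""

    def __init__(self, total_epsilon):
        self.total = total_epsilon
        self.spent = 0.0

    def query(self, true_value, epsilon, sensitivity=1):
        # Refuse queries that would exceed the overall budget.
        if self.spent + epsilon > self.total:
            raise RuntimeError("Privacy budget exhausted")
        self.spent += epsilon
        # Laplace mechanism, as in laplace_mechanism() above.
        return true_value + np.random.laplace(0, sensitivity / epsilon)

budget = PrivacyBudget(total_epsilon=1.0)
a = budget.query(47, epsilon=0.5)  # remaining budget: 0.5
b = budget.query(47, epsilon=0.5)  # remaining budget: 0.0
print(f"Total epsilon spent: {budget.spent}")
# A third query would raise RuntimeError: the budget is exhausted.
```

This is why deployed systems (such as the 2020 US Census release) must fix a total epsilon in advance: an analyst who could ask the same question repeatedly and average the noisy answers would otherwise erode the guarantee one query at a time.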

Section 6: Social Network Analysis

Connecting to Chapter 13 — Social Media Surveillance

"""
Section 6: Social Network Analysis
Connects to Chapter 13: The Platform Knows Who You Know

Demonstrates:
- What social graph data reveals about individuals
- Community detection
- How metadata reveals more than content
"""

import networkx as nx
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import random
import numpy as np
from collections import defaultdict


def create_social_graph(seed=42):
    """
    Create a synthetic social graph with distinct communities.
    Each community represents a social context (family, work, political org, etc.)
    """
    random.seed(seed)
    np.random.seed(seed)

    G = nx.Graph()

    # Define communities (node IDs and labels)
    communities = {
        "Family": list(range(0, 8)),
        "Work Colleagues": list(range(8, 20)),
        "Political Organization": list(range(20, 30)),
        "Religious Community": list(range(30, 38)),
        "Online Forum": list(range(38, 48))
    }

    node_community = {}
    for community_name, nodes in communities.items():
        for node in nodes:
            node_community[node] = community_name
            G.add_node(node, community=community_name)

    # Add intra-community edges (dense connections within communities)
    for community_name, nodes in communities.items():
        for i in range(len(nodes)):
            for j in range(i+1, len(nodes)):
                if random.random() < 0.5:
                    G.add_edge(nodes[i], nodes[j])

    # Add inter-community edges (sparse connections between communities)
    all_nodes = list(G.nodes())
    n_cross_edges = 15
    for _ in range(n_cross_edges):
        a = random.choice(all_nodes)
        b = random.choice(all_nodes)
        if a != b and node_community[a] != node_community[b]:
            G.add_edge(a, b)

    # Identify "subject" — a node in political organization
    # that government might target
    subject_node = communities["Political Organization"][0]
    G.nodes[subject_node]["is_subject"] = True

    return G, node_community, subject_node


def analyze_what_metadata_reveals(G, node_community, subject_node):
    """
    Demonstrate what social graph analysis reveals about individuals
    without access to message content.
    """
    print("\n[2] What metadata analysis reveals about the 'subject' node:")
    print("    (No message content accessed — only who-talks-to-whom)")

    subject_neighbors = list(G.neighbors(subject_node))
    neighbor_communities = [node_community[n] for n in subject_neighbors]

    from collections import Counter
    community_counts = Counter(neighbor_communities)

    print(f"\n    Subject's direct connections: {len(subject_neighbors)}")
    print(f"    Community affiliations revealed:")
    for community, count in community_counts.most_common():
        print(f"      {community:<30} {count} connections")

    # Identify sensitive affiliations
    sensitive = [c for c in community_counts if c in ["Political Organization", "Religious Community"]]
    print(f"\n    Sensitive affiliations detectable from metadata: {sensitive}")
    print(f"    (Without reading a single message)")

    # Centrality analysis
    betweenness = nx.betweenness_centrality(G)
    degree = nx.degree_centrality(G)

    print(f"\n    Subject's degree centrality: {degree[subject_node]:.3f}")
    print(f"    Subject's betweenness centrality: {betweenness[subject_node]:.3f}")

    # Two-hop inference
    two_hop_neighbors = set()
    for neighbor in subject_neighbors:
        for nn in G.neighbors(neighbor):
            if nn != subject_node and nn not in subject_neighbors:
                two_hop_neighbors.add(nn)

    two_hop_communities = Counter([node_community[n] for n in two_hop_neighbors])
    print(f"\n    Two-hop connections (associates' associates): {len(two_hop_neighbors)}")
    print(f"    This expands the surveillance picture to include:")
    for community, count in two_hop_communities.most_common(3):
        print(f"      {community}: {count} additional people")


def run_social_network_analysis():
    print("=" * 60)
    print("SOCIAL NETWORK ANALYSIS: WHAT METADATA REVEALS")
    print("Connecting to Chapter 13: Social Media Surveillance")
    print("=" * 60)

    print("\n[1] Creating synthetic social graph (48 nodes, multiple communities)...")
    G, node_community, subject_node = create_social_graph()

    print(f"    Nodes: {G.number_of_nodes()} | Edges: {G.number_of_edges()}")
    print(f"    Communities: {list(set(node_community.values()))}")

    analyze_what_metadata_reveals(G, node_community, subject_node)

    print("\n[3] Visualizing social graph...")

    # Color nodes by their known (ground-truth) communities. A real analyst
    # would recover these labels from edge structure alone, e.g. with
    # Louvain community detection.
    community_colors = {
        "Family": "#4ECDC4",
        "Work Colleagues": "#45B7D1",
        "Political Organization": "#FF6B6B",
        "Religious Community": "#95E1D3",
        "Online Forum": "#F38181"
    }

    fig, axes = plt.subplots(1, 2, figsize=(15, 6))
    fig.suptitle("Social Graph Analysis: What 'Metadata' Reveals", fontsize=12)

    # Layout
    pos = nx.spring_layout(G, seed=42, k=0.8)

    # Plot 1: Full graph colored by community
    ax1 = axes[0]
    node_colors = [community_colors[node_community[n]] for n in G.nodes()]
    node_sizes = [300 if n == subject_node else 80 for n in G.nodes()]
    node_edge_colors = ["red" if n == subject_node else "white" for n in G.nodes()]

    nx.draw_networkx_edges(G, pos, ax=ax1, alpha=0.2, edge_color="gray")
    nx.draw_networkx_nodes(G, pos, ax=ax1, node_color=node_colors,
                           node_size=node_sizes, edgecolors=node_edge_colors,
                           linewidths=[3 if n == subject_node else 0.5 for n in G.nodes()])

    # Mark the subject
    ax1.annotate("SUBJECT\n(surveillance target)",
                 xy=pos[subject_node], fontsize=8, color="red",
                 xytext=(pos[subject_node][0] + 0.15, pos[subject_node][1] + 0.15),
                 arrowprops=dict(arrowstyle="->", color="red"))

    patches = [mpatches.Patch(color=color, label=community)
               for community, color in community_colors.items()]
    ax1.legend(handles=patches, loc="lower left", fontsize=7)
    ax1.set_title("Social Graph Colored by Community\n(No content accessed)")
    ax1.axis("off")

    # Plot 2: Subject's ego network
    ax2 = axes[1]
    ego_graph = nx.ego_graph(G, subject_node, radius=2)
    ego_pos = nx.spring_layout(ego_graph, seed=42)

    ego_colors = []
    ego_sizes = []
    for n in ego_graph.nodes():
        if n == subject_node:
            ego_colors.append("red")
            ego_sizes.append(400)
        elif n in G.neighbors(subject_node):
            ego_colors.append(community_colors[node_community[n]])
            ego_sizes.append(150)
        else:
            ego_colors.append("#CCCCCC")
            ego_sizes.append(60)

    nx.draw_networkx_edges(ego_graph, ego_pos, ax=ax2, alpha=0.3, edge_color="gray")
    nx.draw_networkx_nodes(ego_graph, ego_pos, ax=ax2, node_color=ego_colors, node_size=ego_sizes)
    ax2.set_title(f"Subject's Ego Network (1-2 hops)\n{ego_graph.number_of_nodes()} people revealed")
    ax2.axis("off")

    plt.tight_layout()
    plt.savefig("social_network_analysis.png", dpi=150, bbox_inches="tight")
    plt.show()
    print("Chart saved as social_network_analysis.png")

    print("\n--- KEY INSIGHT ---")
    print("Intelligence analysts famously noted: 'We kill people based on metadata.'")
    print("Social graph analysis without any message content reveals:")
    print("  - Community affiliations (political, religious, social)")
    print("  - Organizational structure (who is central?)")
    print("  - Association with designated 'subjects'")
    print("  - Scope of surveillance expansion (two-hop rule)")
    print("\nEvery social media platform and telecom provider has this data.")

    return G


if __name__ == "__main__":
    G = run_social_network_analysis()
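The visualization above colors nodes using their known community labels, which a real analyst would not have. NetworkX ships a Louvain implementation (`nx.community.louvain_communities`, available in networkx 3.0 and later) that recovers communities from edge structure alone. A minimal sketch on a small synthetic graph rather than the full one above:

```python
import networkx as nx

# Two dense 5-node cliques joined by a single bridge edge.
# Louvain should recover the two groups from structure alone.
G = nx.Graph()
G.add_edges_from((a, b) for a in range(5) for b in range(a + 1, 5))       # clique 0-4
G.add_edges_from((a, b) for a in range(5, 10) for b in range(a + 1, 10))  # clique 5-9
G.add_edge(4, 5)  # bridge between the cliques

communities = nx.community.louvain_communities(G, seed=42)
print(f"Detected {len(communities)} communities:")
for c in sorted(communities, key=min):
    print(sorted(c))
```

Applied to `create_social_graph()`'s output, the same call would let you compare detected communities against the ground-truth labels — a useful exercise in how reliably "anonymous" group structure can be recovered from metadata.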

Running All Sections Together

"""
Run all surveillance analytics demonstrations in sequence.
"""

if __name__ == "__main__":
    print("\n" + "=" * 60)
    print("THE ARCHITECTURE OF SURVEILLANCE — PYTHON ANALYTICS")
    print("=" * 60)
    print("\nThis module demonstrates six forms of surveillance analysis.")
    print("Each section connects to specific chapters in the textbook.\n")

    print("Available demonstrations:")
    print("  Section 1: Location Data Analysis (Chapter 18)")
    print("  Section 2: Urban Foot Traffic Analysis (Chapter 25)")
    print("  Section 3: Algorithmic Management Simulation (Chapter 28)")
    print("  Section 4: Resume Screening Bias (Chapter 29)")
    print("  Section 5: Differential Privacy (Chapter 39)")
    print("  Section 6: Social Network Analysis (Chapter 13)")
    print("\nRun each section independently by importing its function.")
    print("See individual section files for standalone execution.")

For extensions and advanced projects, students should explore: the OpenWPM web measurement framework (for tracking web surveillance); the Fairlearn library (for algorithmic fairness analysis); and NetworkX's community detection algorithms. The EFF's Atlas of Surveillance data is publicly available for geographic analysis of surveillance deployment.