Appendix B: The bluekit Defender's Toolkit

Throughout the book you build bluekit, a small Python package a working defender can actually use. Each chapter's Project Checkpoint adds one module; they are collected here in order. As everywhere in this book, the code is illustrative — every example shows its hand-traced expected output and is meant to be read and adapted, not run blindly in production.

Chapter 1 — What Is Cybersecurity

# bluekit/riskcalc.py  — Chapter 1 increment (the toolkit's first module)
"""Qualitative risk scoring: the first tool in the defender's kit.

Risk = likelihood x impact, on a 1 (very low) to 5 (very high) scale.
Extended with quantitative methods (single/annualized loss expectancy) in Chapter 27.

All bluekit code is illustrative and hand-traced: every runnable block shows its
expected output as a comment. Nothing here is executed during authoring.
"""


def risk_score(likelihood: int, impact: int) -> int:
    """Return a qualitative risk score (1-25) from 1-5 likelihood and impact."""
    for name, v in (("likelihood", likelihood), ("impact", impact)):
        if not 1 <= v <= 5:
            raise ValueError(f"{name} must be 1-5, got {v}")
    return likelihood * impact


def band(score: int) -> str:
    """Map a 1-25 score to an action band a board will understand."""
    if score >= 15:
        return "CRITICAL"   # fix now
    if score >= 8:
        return "HIGH"       # fix this quarter
    if score >= 4:
        return "MEDIUM"     # plan it
    return "LOW"            # accept or monitor


if __name__ == "__main__":
    findings = [
        ("Weak passwords on online-banking portal", 4, 5),
        ("Unpatched internal printer server",        2, 1),
        ("Active account for departed contractor",   3, 4),
    ]
    for desc, likelihood, impact in sorted(
            findings, key=lambda f: risk_score(f[1], f[2]), reverse=True):
        s = risk_score(likelihood, impact)
        print(f"{s:2d}  {band(s):8s}  {desc}")

# Expected output:
# 20  CRITICAL  Weak passwords on online-banking portal
# 12  HIGH      Active account for departed contractor
#  2  LOW       Unpatched internal printer server

Chapter 2 — Threat Landscape

# bluekit/threatmodel.py  — Chapter 2 increment (the toolkit's second module)
"""Map security events to kill-chain stages, and size an attack surface.

kill_chain_stage() is a teaching classifier (keyword-based), not a detection
engine — Chapter 22 builds the real thing. attack_surface() turns Chapter 1's
qualitative idea into a countable metric to track over time (Chapter 36).

All bluekit code is illustrative and hand-traced: nothing here is executed
during authoring; every runnable block shows its expected output as a comment.
"""

# minimal keyword -> kill-chain stage map (lowercased substring match)
_SIGNS = {
    "port scan": "Reconnaissance", "vuln scan": "Reconnaissance",
    "phishing": "Delivery", "malicious link": "Delivery",
    "exploit": "Exploitation", "credential use": "Exploitation",
    "new service": "Installation", "scheduled task": "Installation",
    "beacon": "Command and Control", "c2": "Command and Control",
    "exfil": "Actions on Objectives", "ransomware": "Actions on Objectives",
}


def kill_chain_stage(event: str) -> str:
    """Return the kill-chain stage for an event description, or 'Unknown'."""
    e = event.lower()
    for sign, stage in _SIGNS.items():
        if sign in e:
            return stage
    return "Unknown"


def attack_surface(assets: dict) -> int:
    """Sum exposure points: {asset: {'ports': n, 'apis': n, 'users': n}}."""
    return sum(a.get("ports", 0) + a.get("apis", 0) + a.get("users", 0)
               for a in assets.values())


if __name__ == "__main__":
    for ev in ["Port scan from 203.0.113.9", "Phishing email reported",
               "Beacon to unknown domain every 60s", "Ransomware note found"]:
        print(f"{kill_chain_stage(ev):22s} <- {ev}")
    meridian = {"portal": {"ports": 2, "apis": 5, "users": 0},
                "ad": {"ports": 3, "apis": 0, "users": 1800}}
    print("attack surface points:", attack_surface(meridian))

# Expected output:
# Reconnaissance         <- Port scan from 203.0.113.9
# Delivery               <- Phishing email reported
# Command and Control    <- Beacon to unknown domain every 60s
# Actions on Objectives  <- Ransomware note found
# attack surface points: 1810
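
Because kill_chain_stage() matches plain substrings, a short key such as "c2" also fires inside unrelated words ("ec2" contains it). One way to tighten this is sketched below, assuming word-boundary regular expressions are acceptable for a teaching classifier. The name stage_by_word and the trimmed three-entry map are hypothetical and not part of bluekit.

```python
import re

# Hypothetical, trimmed copy of the keyword map (illustration only).
SIGNS = {
    "beacon": "Command and Control",
    "c2": "Command and Control",
    "exfil": "Actions on Objectives",
}

# One compiled pattern per keyword. The leading \b requires the keyword to
# START at a word boundary, so "c2" matches "c2 callback" but not "ec2".
# There is no trailing \b, so "exfil" still matches "exfiltration".
_PATTERNS = [(re.compile(r"\b" + re.escape(sign)), stage)
             for sign, stage in SIGNS.items()]


def stage_by_word(event: str) -> str:
    """Return the stage of the first keyword that starts at a word boundary."""
    text = event.lower()
    for pattern, stage in _PATTERNS:
        if pattern.search(text):
            return stage
    return "Unknown"


if __name__ == "__main__":
    print(stage_by_word("C2 callback to 203.0.113.9"))
    print(stage_by_word("New EC2 instance launched"))
    print(stage_by_word("Exfiltration of 2 GB detected"))

# Expected output:
# Command and Control
# Unknown
# Actions on Objectives
```

The plain substring version would label the EC2 line "Command and Control"; anchoring the start of each keyword removes that false positive without changing the map itself.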

Chapter 3 — Security Principles

# bluekit/controls.py  — Chapter 3 increment (the toolkit's control classifier)
"""Classify a security control on two axes: FUNCTION x NATURE (sec 3.3).

function: preventive | detective | corrective | compensating
nature:   administrative | technical | physical

Keeps Meridian's control framework consistently labelled so gaps are visible.
All bluekit code is illustrative and hand-traced: the expected output is shown
as a comment. Nothing here is executed during authoring.
"""

FUNCTIONS = {"preventive", "detective", "corrective", "compensating"}
NATURES = {"administrative", "technical", "physical"}


def classify(control: dict) -> tuple[str, str]:
    """Return (function, nature) for a control dict; validate both axes."""
    function = control.get("function", "").lower()
    nature = control.get("nature", "").lower()
    if function not in FUNCTIONS:
        raise ValueError(f"function must be one of {sorted(FUNCTIONS)}, got {function!r}")
    if nature not in NATURES:
        raise ValueError(f"nature must be one of {sorted(NATURES)}, got {nature!r}")
    return (function, nature)


if __name__ == "__main__":
    framework = [
        {"name": "Secure email gateway",       "function": "preventive", "nature": "technical"},
        {"name": "Security-awareness training", "function": "preventive", "nature": "administrative"},
        {"name": "FIDO2 security key (MFA)",    "function": "preventive", "nature": "technical"},
        {"name": "SOC log review",              "function": "detective",  "nature": "technical"},
        {"name": "Restore from backup",         "function": "corrective", "nature": "technical"},
    ]
    for c in framework:
        fn, nat = classify(c)
        print(f"{c['name']:30s} -> {fn:12s} / {nat}")

# Expected output:
# Secure email gateway           -> preventive   / technical
# Security-awareness training    -> preventive   / administrative
# FIDO2 security key (MFA)       -> preventive   / technical
# SOC log review                 -> detective    / technical
# Restore from backup            -> corrective   / technical
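
The docstring promises that consistent labels make gaps visible. A minimal sketch of that step follows, assuming a "gap" simply means a (function, nature) cell that no control occupies. coverage_gaps is a hypothetical helper, not part of controls.py, and the sample labels are the five pairs printed above.

```python
from collections import Counter
from itertools import product

FUNCTIONS = ("preventive", "detective", "corrective", "compensating")
NATURES = ("administrative", "technical", "physical")


def coverage_gaps(labels):
    """Return the (function, nature) cells that no control occupies.

    labels: iterable of (function, nature) tuples, such as classify() returns.
    """
    counts = Counter(labels)
    return [cell for cell in product(FUNCTIONS, NATURES) if counts[cell] == 0]


if __name__ == "__main__":
    labels = [
        ("preventive", "technical"), ("preventive", "administrative"),
        ("preventive", "technical"), ("detective", "technical"),
        ("corrective", "technical"),
    ]
    gaps = coverage_gaps(labels)
    print(len(gaps), "of 12 cells empty")
    print("first gap:", gaps[0])

# Expected output:
# 8 of 12 cells empty
# first gap: ('preventive', 'physical')
```

An empty cell is a prompt for a question, not automatically a finding: some cells (a physical corrective control, say) may be legitimately out of scope for a given system.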

Chapter 4 — Cryptography Fundamentals

# bluekit/cryptutil.py  — Chapter 4 increment (the toolkit's crypto helpers)
"""Defensive crypto helpers built on the standard library's VETTED primitives.

We do NOT implement cryptography ourselves (the cardinal rule of sec 4.7): we wrap
hashlib / hmac / secrets so a defender can hash for integrity, authenticate a
message, and judge a secret's strength. Extended with TLS-config grading in Ch.5.

All bluekit code is illustrative and hand-traced: every runnable block shows its
expected output as a comment. Nothing here is executed during authoring, and the
hash/HMAC outputs are shown as clearly-illustrative placeholders (a one-way digest
cannot be predicted by hand -- that is exactly the point of sec 4.4).
"""
import hashlib
import hmac
import math


def sha256_hex(data: bytes) -> str:
    """Hex SHA-256 digest of bytes -- for INTEGRITY, never for password storage."""
    return hashlib.sha256(data).hexdigest()


def hmac_sign(key: bytes, msg: bytes) -> str:
    """HMAC-SHA256 tag: authenticity + integrity for a shared-key message."""
    return hmac.new(key, msg, hashlib.sha256).hexdigest()


def entropy_bits(charset_size: int, length: int) -> float:
    """Approx entropy (bits) of a uniformly random secret: length * log2(charset)."""
    if charset_size < 2 or length < 1:
        raise ValueError("charset_size >= 2 and length >= 1 required")
    return round(length * math.log2(charset_size), 1)


if __name__ == "__main__":
    print(sha256_hex(b"meridian"))                    # integrity digest (illustrative)
    print(hmac_sign(b"shared-key", b"transfer:500"))  # auth tag (illustrative)
    print(entropy_bits(94, 12))                       # 12-char full-keyboard password
    print(entropy_bits(2, 128))                       # a 128-bit random key

# Expected output (hashes shown TRUNCATED/FAKE -- real digests are 64 hex chars):
# 9c1185a5c5e9fc54612808977ee8f548b2258d31...   <- SHA-256 digest, 64 hex in reality
# 7f9a2b41c0d3e6f5a8b1c2d3e4f5061728394a5b...   <- HMAC tag, 64 hex chars
# 78.7      <- 12 * log2(94)  ~= 12 * 6.555 = 78.66 -> rounds to 78.7
# 128.0     <- 128 * log2(2)  = 128 * 1 = 128.0  (a full AES-128 key's worth)
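
hmac_sign() produces a tag, but the module does not show the receiving side. A minimal sketch of verification follows, assuming the receiver holds the same shared key. hmac_verify is a hypothetical companion function, not part of cryptutil.py. It compares tags with hmac.compare_digest, which runs in constant time, rather than with ==.

```python
import hashlib
import hmac
import secrets


def hmac_sign(key: bytes, msg: bytes) -> str:
    """HMAC-SHA256 tag as hex (same construction as the module above)."""
    return hmac.new(key, msg, hashlib.sha256).hexdigest()


def hmac_verify(key: bytes, msg: bytes, tag: str) -> bool:
    """Hypothetical companion: recompute the tag and compare in constant time."""
    return hmac.compare_digest(hmac_sign(key, msg), tag)


if __name__ == "__main__":
    key = secrets.token_bytes(32)              # fresh random 256-bit key
    tag = hmac_sign(key, b"transfer:500")
    print(hmac_verify(key, b"transfer:500", tag))   # untouched message
    print(hmac_verify(key, b"transfer:900", tag))   # tampered amount

# Expected output:
# True
# False
```

The key is random on every run, so the tag itself changes each time; only the True/False verdicts are stable.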

Chapter 5 — Applied Cryptography

# bluekit/cryptutil.py  — Chapter 5 increment (extends the Chapter 4 module)
"""Applied-crypto helper for the defender's toolkit: grade a TLS endpoint config.

Chapter 4 created cryptutil.py (sha256_hex, hmac_sign, entropy_bits). Chapter 5
adds tls_config_grade(), which turns a parsed TLS scan (§5.7) into a letter grade
and a list of findings, so scanning hundreds of Meridian endpoints produces a
ranked remediation queue instead of a wall of text.

All bluekit code is illustrative and hand-traced: the runnable block below shows
its expected output as a comment. Nothing here is executed during authoring.
"""


def tls_config_grade(params: dict) -> tuple[str, list[str]]:
    """Grade a TLS endpoint config. params keys:
       min_protocol (e.g. 'TLS1.0'..'TLS1.3'), forward_secrecy (bool),
       weak_ciphers (list[str]), cert_days_left (int).
    Returns (grade, findings). Obsolete protocol or expired cert => automatic F."""
    findings = []
    rank = {"SSLv3": 0, "TLS1.0": 1, "TLS1.1": 2, "TLS1.2": 3, "TLS1.3": 4}
    if rank.get(params["min_protocol"], 0) < rank["TLS1.2"]:
        findings.append(f"obsolete protocol allowed: {params['min_protocol']}")
    if not params["forward_secrecy"]:
        findings.append("no forward secrecy (non-ephemeral key exchange)")
    if params["weak_ciphers"]:
        findings.append("weak ciphers offered: " + ", ".join(params["weak_ciphers"]))
    if params["cert_days_left"] < 0:
        findings.append("certificate EXPIRED")
    elif params["cert_days_left"] < 14:
        findings.append(f"certificate expires in {params['cert_days_left']} days")
    if any("obsolete protocol" in f or "EXPIRED" in f for f in findings):
        return "F", findings
    grade = "ABCDF"[min(len(findings), 4)]
    return grade, findings


if __name__ == "__main__":
    endpoints = {
        "banking (good)":   dict(min_protocol="TLS1.2", forward_secrecy=True,
                                 weak_ciphers=[], cert_days_left=200),
        "legacy microsite": dict(min_protocol="TLS1.0", forward_secrecy=False,
                                 weak_ciphers=["RC4", "3DES"], cert_days_left=-42),
        "api (expiring)":   dict(min_protocol="TLS1.3", forward_secrecy=True,
                                 weak_ciphers=[], cert_days_left=9),
    }
    for name, cfg in endpoints.items():
        grade, why = tls_config_grade(cfg)
        print(f"{grade}  {name}: {'; '.join(why) or 'clean'}")

# Expected output:
# A  banking (good): clean
# F  legacy microsite: obsolete protocol allowed: TLS1.0; no forward secrecy (non-ephemeral key exchange); weak ciphers offered: RC4, 3DES; certificate EXPIRED
# B  api (expiring): certificate expires in 9 days
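
The docstring describes the goal as a ranked remediation queue. One possible ordering is sketched below, assuming "worst grade first, then most findings first" is the ranking you want. remediation_queue and the _SEVERITY table are hypothetical names, and the sample input is abbreviated from the output above.

```python
# Lower number = more urgent. Hypothetical ordering for illustration.
_SEVERITY = {"F": 0, "D": 1, "C": 2, "B": 3, "A": 4}


def remediation_queue(graded: dict) -> list:
    """graded: {endpoint: (grade, findings)} -> endpoint names, most urgent first."""
    return sorted(
        graded,
        key=lambda name: (_SEVERITY[graded[name][0]], -len(graded[name][1])),
    )


if __name__ == "__main__":
    graded = {
        "banking (good)":   ("A", []),
        "legacy microsite": ("F", ["obsolete protocol allowed: TLS1.0",
                                   "certificate EXPIRED"]),
        "api (expiring)":   ("B", ["certificate expires in 9 days"]),
    }
    for name in remediation_queue(graded):
        print(graded[name][0], name)

# Expected output:
# F legacy microsite
# B api (expiring)
# A banking (good)
```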

Chapter 6 — Network Security Fundamentals

# bluekit/netfilter.py  — Chapter 6 increment (the toolkit's first network module)
"""Parse firewall logs into structured records — the first network tool.

A firewall log line is the ground truth of what crossed (or was denied at) a
zone boundary. We turn the raw text into a dict a SOC can filter, count, and
alert on. Extended in Chapter 7 with rule_matches(pkt, rule) and
default_deny(rules) so the toolkit can reason about whole rulesets.

All bluekit code is illustrative and hand-traced: every runnable block shows its
expected output as a comment. Nothing here is executed during authoring.
"""

# Words a firewall might use for "blocked"; normalized to one `denied` flag.
_DENY_ACTIONS = ("DENY", "DROP", "BLOCK", "REJECT")


def parse_fw_log(line: str) -> dict:
    """Parse one space-delimited 'key=value' firewall log line into a dict.

    Expected fields include action, src, dst, dport, proto. Unknown keys are
    kept as-is. A line with no recognizable key=value pairs raises ValueError —
    a small integrity check on your own input, mirroring riskcalc.py in Ch.1.
    """
    fields = {}
    for token in line.split():
        if "=" in token:
            key, _, value = token.partition("=")
            fields[key] = value
    if not fields:
        raise ValueError(f"no key=value pairs found in: {line!r}")
    fields["denied"] = fields.get("action", "").upper() in _DENY_ACTIONS
    return fields


if __name__ == "__main__":
    log = [
        "action=ALLOW src=198.51.100.7 dst=192.0.2.10 dport=443 proto=tcp",
        "action=DENY  src=203.0.113.66 dst=192.0.2.10 dport=22  proto=tcp",
        "action=DROP  src=203.0.113.66 dst=192.0.2.10 dport=23  proto=tcp",
    ]
    for line in log:
        r = parse_fw_log(line)
        flag = "DENIED " if r["denied"] else "allowed"
        print(f"{flag} {r['src']:>15} -> {r['dst']}:{r['dport']}/{r['proto']}")

# Expected output:
# allowed    198.51.100.7 -> 192.0.2.10:443/tcp
# DENIED     203.0.113.66 -> 192.0.2.10:22/tcp
# DENIED     203.0.113.66 -> 192.0.2.10:23/tcp
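
The docstring says the parsed dict is something a SOC can filter, count, and alert on. A minimal sketch of the counting step follows, assuming records shaped like parse_fw_log() output (string values plus a boolean denied flag). denied_by_source and its threshold parameter are hypothetical.

```python
from collections import Counter


def denied_by_source(records, threshold=2):
    """Count denied records per source; return (src, count) at or above threshold."""
    counts = Counter(r["src"] for r in records if r.get("denied"))
    return [(src, n) for src, n in counts.most_common() if n >= threshold]


if __name__ == "__main__":
    # Hand-written records in the shape parse_fw_log() yields.
    records = [
        {"action": "ALLOW", "src": "198.51.100.7", "dport": "443", "denied": False},
        {"action": "DENY",  "src": "203.0.113.66", "dport": "22",  "denied": True},
        {"action": "DROP",  "src": "203.0.113.66", "dport": "23",  "denied": True},
    ]
    print(denied_by_source(records))

# Expected output:
# [('203.0.113.66', 2)]
```

Two denials on ports 22 and 23 from one address is the kind of pattern worth a second look; the threshold is a tuning knob, not a recommended value.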

Chapter 7 — Firewalls, IDS, IPS, and NAC

#!/usr/bin/env python3
# bluekit/netfilter.py  — Chapter 7 increment (extends Ch.6's parse_fw_log)
"""Evaluate packets against a default-deny ruleset (first match wins).

Canonical bluekit module name: netfilter.py. Chapter 6 added parse_fw_log(line)
to READ firewall logs; Chapter 7 adds the logic that DECIDES what those logs
should contain. Code is NEVER executed during authoring — output is hand-traced.
"""

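import ipaddress


def _field_matches(want, got) -> bool:
    """Exact match, except a CIDR string ('10.30.0.0/24') matches any address in it."""
    if isinstance(want, str) and "/" in want:
        try:
            return ipaddress.ip_address(got) in ipaddress.ip_network(want, strict=False)
        except (ValueError, TypeError):
            return False          # missing or malformed address never matches
    return got == want


def rule_matches(pkt: dict, rule: dict) -> bool:
    """True if pkt matches every field the rule specifies. '*' = wildcard."""
    for field in ("src", "dst", "proto", "dport"):
        want = rule.get(field, "*")
        if want != "*" and not _field_matches(want, pkt.get(field)):
            return False          # any specified field that differs -> no match
    return True                   # all specified fields matched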


def default_deny(pkt: dict, rules: list) -> str:
    """Return the action of the first matching rule; deny if none match."""
    for rule in rules:            # ordered: first match wins (see Section 7.2)
        if rule_matches(pkt, rule):
            return rule["action"]
    return "DENY"                 # the implicit final deny-all: fail safe


if __name__ == "__main__":
    cde_rules = [
        {"src": "10.30.0.40", "dst": "10.30.0.50", "proto": "tcp",
         "dport": 5432, "action": "ALLOW"},          # app -> CDE db only
        {"src": "10.20.9.10", "dst": "10.30.0.0/24", "proto": "tcp",
         "dport": 22, "action": "ALLOW"},            # jump host -> SSH only
    ]
    app_db   = {"src": "10.30.0.40", "dst": "10.30.0.50", "proto": "tcp", "dport": 5432}
    rogue_db = {"src": "10.20.5.55", "dst": "10.30.0.50", "proto": "tcp", "dport": 5432}
    for name, pkt in (("app->db", app_db), ("rogue->db", rogue_db)):
        print(f"{name:11s} -> {default_deny(pkt, cde_rules)}")

# Expected output:
# app->db     -> ALLOW
# rogue->db   -> DENY
#
# Hand trace: app_db matches rule 0 on every specified field -> ALLOW.
# rogue_db has the right dst/port but the WRONG src, so it fails rule 0 on `src`
# and does not match rule 1 either; it falls through to the final DENY. That last
# line is least privilege on the wire: anything not explicitly allowed is denied.

Chapter 8 — Wireless Security

# bluekit/wifiaudit.py  -- Chapter 8 increment (canonical module name: wifiaudit.py)
#
# Assess a wireless network configuration against Meridian's wireless policy.
# Input cfg is a dict describing one SSID/network. We return a list of finding
# strings (empty list == passes). Defensive: we only flag misconfigurations;
# we never attack anything.
#
# NEVER EXECUTED during authoring -- output is hand-traced below.

WEAK_PROTOCOLS = {"open", "wep", "wpa", "wpa-tkip"}   # prohibited or deprecated


def assess_wifi(cfg: dict) -> list[str]:
    """Return policy findings for one wireless network config."""
    findings = []
    proto = cfg.get("protocol", "").lower()
    if proto in WEAK_PROTOCOLS:                       # 8.2: WEP/WPA/open are findings
        findings.append(f"CRITICAL: weak/deprecated protocol '{proto}'")
    if proto == "wpa2-personal" and len(cfg.get("psk", "")) < 20:
        findings.append("HIGH: WPA2-PSK passphrase < 20 chars (offline-crackable)")
    if cfg.get("role") == "staff" and "enterprise" not in proto:
        findings.append("HIGH: staff network not using WPA-Enterprise (802.1X)")
    if proto.endswith("enterprise") and not cfg.get("server_cert_validation", False):
        findings.append("CRITICAL: enterprise EAP without server-cert validation (evil-twin risk)")
    if cfg.get("role") == "guest" and not cfg.get("isolated_from_internal", False):
        findings.append("CRITICAL: guest network not isolated from internal segments")
    if not cfg.get("pmf", False):                     # 8.4: 802.11w stops deauth
        findings.append("MEDIUM: Protected Management Frames (802.11w) not enforced")
    return findings


if __name__ == "__main__":
    bad_guest = {"protocol": "wpa2-personal", "psk": "guest", "role": "guest",
                 "isolated_from_internal": False, "pmf": False}
    good_staff = {"protocol": "wpa3-enterprise", "role": "staff",
                  "server_cert_validation": True, "pmf": True}
    for name, cfg in (("Branch-Guest", bad_guest), ("Branch-Staff", good_staff)):
        issues = assess_wifi(cfg)
        print(f"{name}: {'PASS' if not issues else str(len(issues)) + ' finding(s)'}")
        for i in issues:
            print(f"    - {i}")

# Expected output:
# Branch-Guest: 3 finding(s)
#     - HIGH: WPA2-PSK passphrase < 20 chars (offline-crackable)
#     - CRITICAL: guest network not isolated from internal segments
#     - MEDIUM: Protected Management Frames (802.11w) not enforced
# Branch-Staff: PASS

Chapter 9 — DNS, Email, and Web Security

# bluekit/dnsguard.py  — Chapter 9 increment (DNS & email-auth auditing)
"""Audit email-authentication DNS records and score domains for DGA-like randomness.

Three defender questions, answered from the records themselves:
  - check_spf(record):   is the domain's SPF actually enforcing (-all)?
  - check_dmarc(record): does the domain's DMARC protect, or merely watch?
  - dga_score(name):     does this name look machine-generated (high entropy)?

All bluekit code is illustrative and hand-traced: every runnable block shows its
expected output as a comment. Nothing here is executed during authoring.
"""
import math
from collections import Counter


def check_spf(record: str) -> str:
    """Grade an SPF TXT record by its terminating 'all' mechanism."""
    if not record.startswith("v=spf1"):
        return "INVALID: not an SPF record"
    if "-all" in record:
        return "STRONG: hard-fail (-all)"
    if "~all" in record:
        return "WEAK: soft-fail (~all) — tighten to -all"
    if "?all" in record or "+all" in record:
        return "DANGER: neutral/pass-all — record is decorative"
    return "INCOMPLETE: no 'all' mechanism — add -all"


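def check_dmarc(record: str) -> str:
    """Grade a DMARC TXT record by its enforcement policy (p=)."""
    if "v=DMARC1" not in record:
        return "INVALID: not a DMARC record"
    # Parse 'tag=value' pairs so the subdomain tag sp= is never mistaken for p=
    # (a plain substring test reads 'sp=reject' as 'p=reject').
    tags = {}
    for part in record.split(";"):
        tag, sep, value = part.partition("=")
        if sep:
            tags[tag.strip().lower()] = value.strip().lower()
    policy = tags.get("p", "")
    if policy == "reject":
        return "STRONG: enforcing (reject)"
    if policy == "quarantine":
        return "MODERATE: quarantine — ramp to reject"
    if policy == "none":
        return "MONITOR-ONLY: no protection yet — advance the ladder"
    return "INVALID: missing policy (p=)"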


def dga_score(name: str) -> float:
    """Shannon entropy (bits/char) of the leftmost label; higher = more DGA-like."""
    label = name.split(".")[0]
    counts = Counter(label)
    n = len(label)
    if n == 0:
        return 0.0
    return round(-sum((c / n) * math.log2(c / n) for c in counts.values()), 2)


if __name__ == "__main__":
    print(check_spf("v=spf1 ip4:203.0.113.10 -all"))
    print(check_dmarc("v=DMARC1; p=none; rua=mailto:dmarc@meridianbank.example"))
    print("loan ->", dga_score("loan.meridianbank.example"))   # human-readable
    print("dga  ->", dga_score("kq3v9xzplmw.example"))         # 11 distinct chars

# Expected output:
# STRONG: hard-fail (-all)
# MONITOR-ONLY: no protection yet — advance the ladder
# loan -> 2.0
# dga  -> 3.46
#
# Hand-trace of dga_score:
#   "loan" -> 4 distinct chars, each p=1/4 -> -sum(1/4 * log2(1/4)) = log2(4) = 2.0
#   "kq3v9xzplmw" -> 11 distinct chars, each p=1/11 -> log2(11) ~= 3.46

Chapter 10 — Network Monitoring

# bluekit/pktflow.py  — Chapter 10 increment (the network-monitoring tools)
"""Flow analysis for defenders: summarize, rank, and score for beaconing.

Input rows are simple dicts, as a NetFlow/IPFIX collector or a Zeek conn.log
would yield: {"src": ip, "dst": ip, "bytes": int, ...}. These three functions
are the network-visibility primitives the chapter builds toward:

    summarize_flows -> the baseline-building aggregation (per src/dst)
    top_talkers     -> the exfiltration first-look (rank by outbound bytes)
    beacon_score    -> the C2-beacon detector (regularity of check-in timing)

All bluekit code is illustrative and hand-traced: every runnable block shows its
expected output as a comment. Nothing here is executed during authoring.
"""
from collections import defaultdict


def summarize_flows(rows):
    """Aggregate flows by (src, dst) -> {bytes, flows}. Baseline primitive."""
    agg = defaultdict(lambda: {"bytes": 0, "flows": 0})
    for r in rows:
        key = (r["src"], r["dst"])
        agg[key]["bytes"] += r["bytes"]
        agg[key]["flows"] += 1
    return dict(agg)


def top_talkers(flows, n=3):
    """Return the n (src, dst) pairs with the most bytes — the exfil first-look."""
    items = [(k, v["bytes"]) for k, v in summarize_flows(flows).items()]
    return sorted(items, key=lambda kv: kv[1], reverse=True)[:n]


def beacon_score(timestamps):
    """Regularity of check-ins in [0,1]; high = beacon-like (low relative variance).

    Method: take the gaps between consecutive connection times and compute the
    coefficient of variation (std / mean). Constant gaps -> CV near 0 -> score
    near 1.0; irregular human traffic -> large CV -> low score. We require at
    least three connections to judge a rhythm at all.
    """
    ts = sorted(timestamps)
    if len(ts) < 3:
        return 0.0
    gaps = [b - a for a, b in zip(ts, ts[1:])]
    mean = sum(gaps) / len(gaps)
    if mean == 0:
        return 0.0
    var = sum((g - mean) ** 2 for g in gaps) / len(gaps)
    cv = (var ** 0.5) / mean
    return round(max(0.0, 1.0 - cv), 3)


if __name__ == "__main__":
    flows = [
        {"src": "10.20.4.55", "dst": "192.0.2.80",   "bytes": 2181},
        {"src": "10.20.4.55", "dst": "192.0.2.80",   "bytes": 2180},
        {"src": "10.20.4.55", "dst": "192.0.2.80",   "bytes": 2195},
        {"src": "10.20.9.10", "dst": "198.51.100.7", "bytes": 9_000_000},
    ]
    print("top talker:", top_talkers(flows, n=1)[0])
    beacon = [3600, 7201, 10799, 14402, 18000]   # hourly, small jitter -> regular
    print("beacon score:", beacon_score(beacon))
    human = [0, 700, 2000, 2700, 4000]           # bursty browsing -> irregular
    print("human score: ", beacon_score(human))

# Expected output:
# top talker: (('10.20.9.10', '198.51.100.7'), 9000000)
# beacon score: 0.999
# human score:  0.7
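
beacon_score() takes a bare list of timestamps, while the flow rows carry src and dst. A minimal sketch of joining the two follows, assuming each row also carries a connection time in a field named "ts" (a hypothetical field name; the module only specifies src, dst, and bytes). regularity() restates the same coefficient-of-variation idea using the statistics module, and score_pairs is likewise hypothetical.

```python
from collections import defaultdict
from statistics import mean, pstdev


def regularity(timestamps):
    """1 - (population std / mean) of the gaps, clamped to [0, 1]."""
    ts = sorted(timestamps)
    if len(ts) < 3:
        return 0.0                      # too few connections to judge a rhythm
    gaps = [b - a for a, b in zip(ts, ts[1:])]
    avg = mean(gaps)
    if avg == 0:
        return 0.0
    return round(max(0.0, 1.0 - pstdev(gaps) / avg), 3)


def score_pairs(rows):
    """Group the hypothetical 'ts' field by (src, dst) and score each pair."""
    times = defaultdict(list)
    for r in rows:
        times[(r["src"], r["dst"])].append(r["ts"])
    return {pair: regularity(ts) for pair, ts in times.items()}


if __name__ == "__main__":
    rows = [{"src": "10.20.4.55", "dst": "192.0.2.80", "ts": t}
            for t in (3600, 7201, 10799, 14402, 18000)]
    rows += [{"src": "10.20.9.10", "dst": "198.51.100.7", "ts": t}
             for t in (0, 700, 2000, 2700, 4000)]
    for pair, score in score_pairs(rows).items():
        print(pair, score)
```

For the first pair the gaps are 3601, 3598, 3603, 3598: mean 3600, population variance 18/4 = 4.5, so the coefficient of variation is about 0.0006 and the score rounds to 0.999. For the second pair the gaps alternate 700 and 1300: mean 1000, standard deviation 300, score 0.7.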

Chapter 11 — Operating System Security

# bluekit/harden.py  — Chapter 11 increment (the toolkit's host-hardening auditor)
"""Audit an observed host configuration against an approved baseline.

A baseline maps each setting to its required value. audit_baseline reports every
setting that is missing or non-compliant -- the 'drift' a hardening program exists
to catch. A standard nobody audits is a suggestion; this turns the written baseline
into a repeatable, automatic verdict per host.

All bluekit code is illustrative and hand-traced: the runnable block below shows its
expected output as a comment. Nothing here is executed during authoring.
Extended detection/vulnerability prioritization arrives in Chapters 22 and 23.
"""


def audit_baseline(settings: dict, baseline: dict) -> list[dict]:
    """Return one finding per setting that is missing or != its required value.

    settings : observed configuration of a host (key -> actual value)
    baseline : the approved standard            (key -> required value)
    returns  : list of {setting, required, actual} for each drifted setting
    """
    findings = []
    for key, required in baseline.items():
        actual = settings.get(key, "<MISSING>")
        if actual != required:
            findings.append({"setting": key, "required": required, "actual": actual})
    return findings


if __name__ == "__main__":
    baseline = {                      # Meridian Windows Server baseline (excerpt)
        "smbv1_enabled": False,
        "local_admin_enabled": False,        # LAPS-managed; account disabled
        "powershell_logging": True,
        "defender_tamper_protection": True,
        "host_firewall": "default_deny",
    }
    observed = {                      # the file server from this chapter's opening
        "smbv1_enabled": True,
        "local_admin_enabled": True,
        "powershell_logging": False,
        "defender_tamper_protection": False,
        "host_firewall": "default_deny",     # the one thing it got right
    }
    drift = audit_baseline(observed, baseline)
    print(f"{len(drift)} setting(s) drifted from baseline:")
    for f in drift:
        print(f"  {f['setting']:28s} required={f['required']!s:14s} actual={f['actual']!s}")

# Expected output:
# 4 setting(s) drifted from baseline:
#   smbv1_enabled                required=False          actual=True
#   local_admin_enabled          required=False          actual=True
#   powershell_logging           required=True           actual=False
#   defender_tamper_protection   required=True           actual=False

Chapter 12 — Application Security

# bluekit/appsec.py  — Chapter 12 increment (the toolkit's application-security module)
"""Application-security helpers for the bluekit toolkit.

Chapter 12 adds scan_dependencies(): a miniature software composition analysis (SCA) check.
Chapter 13 will add an illustrative, DEFENSIVE taint_demo(src, sink) for the source-to-sink
idea behind SAST. Keep these signatures stable so the toolkit composes (Appendix B assembles
the full package).

All bluekit code is illustrative and HAND-TRACED: every runnable block shows its expected
output as a comment. Nothing here is executed during authoring.

Why this exists: Log4Shell (CVE-2021-44228) taught the industry that the hard question in a
critical-dependency event is not "how does the exploit work?" but "do we use it, and WHERE?"
scan_dependencies answers that question against a (toy) advisory feed. A real SCA tool runs
the same idea over the FULL TRANSITIVE dependency tree and a LIVE vulnerability database.
"""

# A toy advisory feed: {package: (max_vulnerable_version_inclusive, advisory_id)}.
# A package at or below the listed version is considered vulnerable.
KNOWN_VULNERABLE = {
    "log4j-core": ("2.14.1", "CVE-2021-44228"),   # Log4Shell: fixed in 2.15.0+ (use 2.17.1+)
    "openssl":    ("1.0.2u", "CVE-2016-2107"),
}


def _ver_tuple(v):
    """Turn '2.14.1' into (2, 14, 1) for comparison; non-numeric parts -> 0."""
    return tuple(int(p) if p.isdigit() else 0 for p in v.split("."))


def scan_dependencies(reqs):
    """reqs: list of 'package==version' strings (a pinned requirements list).
    Return a list of (package, version, advisory_id) for any package at or below a
    known-vulnerable version. Unpinned lines are skipped (a finding of its own in practice).
    """
    findings = []
    for line in reqs:
        if "==" not in line:
            continue                                  # not pinned -> can't assess here
        pkg, ver = (p.strip() for p in line.split("==", 1))
        vuln = KNOWN_VULNERABLE.get(pkg.lower())
        if vuln and _ver_tuple(ver) <= _ver_tuple(vuln[0]):
            findings.append((pkg, ver, vuln[1]))      # (package, version, advisory)
    return findings


if __name__ == "__main__":
    requirements = [
        "log4j-core==2.14.1",   # vulnerable (Log4Shell)
        "log4j-core==2.17.1",   # patched -> not flagged
        "flask==2.0.1",         # not in our toy feed -> not flagged
        "openssl==1.0.2k",      # vulnerable (older than 1.0.2u)
    ]
    for pkg, ver, advisory in scan_dependencies(requirements):
        print(f"VULNERABLE  {pkg}=={ver}  ({advisory})")

# Expected output (hand-traced):
# VULNERABLE  log4j-core==2.14.1  (CVE-2021-44228)
# VULNERABLE  openssl==1.0.2k  (CVE-2016-2107)
#
# Trace:
#   log4j-core==2.14.1 -> (2,14,1) <= (2,14,1) is True  -> flagged
#   log4j-core==2.17.1 -> (2,17,1) <= (2,14,1) is False -> skipped
#   flask==2.0.1       -> not in feed                   -> skipped
#   openssl==1.0.2k    -> (1,0,0) <= (1,0,0) is True   -> flagged  ('2k' and '2u' are non-numeric parts -> 0, so the letter is ignored)
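
_ver_tuple() maps any part containing a letter to 0, so "1.0.2u" becomes (1, 0, 0). Two consequences follow: every 1.0.2 letter release compares equal, and an older numeric release such as 1.0.1 becomes (1, 0, 1), which compares as newer and is not flagged. A suffix-aware key is sketched below, assuming versions are dot-separated parts made of a leading integer and an optional trailing suffix. version_key is a hypothetical replacement, not part of appsec.py.

```python
def version_key(v: str) -> tuple:
    """Turn '1.0.2u' into ((1, ''), (0, ''), (2, 'u')) for ordering."""
    key = []
    for part in v.split("."):
        n = len(part) - len(part.lstrip("0123456789"))   # length of leading digit run
        key.append((int(part[:n] or 0), part[n:]))       # (number, suffix)
    return tuple(key)


if __name__ == "__main__":
    limit = version_key("1.0.2u")
    print(version_key("1.0.2k") <= limit)    # earlier letter release
    print(version_key("1.0.2v") <= limit)    # hypothetical later letter
    print(version_key("1.0.1") <= limit)     # older numeric release
    print(version_key("2.17.1") <= version_key("2.14.1"))

# Expected output:
# True
# False
# True
# False
```

This is still a toy: it sorts a pre-release tag such as "2.0.0-rc1" after "2.0.0", which is the wrong way round. Real SCA tools rely on each ecosystem's own version-ordering rules.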

Chapter 13 — Web Application Security

"""
bluekit/appsec.py — Chapter 13 increment (the Project Checkpoint).

DEFENSIVE TOOLKIT. This file extends `appsec.py` (created in Chapter 12 with
`scan_dependencies`) with `taint_demo(src, sink)`: an ILLUSTRATIVE taint-tracking
demonstrator.

Concept: data from an untrusted SOURCE that reaches a dangerous SINK *without*
passing through a SANITIZER is "tainted" -> a possible injection vulnerability.
This mirrors how static analysis (the SAST you met in Chapter 12) reasons about
injection. It is a TEACHING MODEL, not a real static analyzer.

The vulnerability is always a FLOW: untrusted data reaching a dangerous interpreter
without a boundary in between. Every fix in Chapter 13 inserts that boundary
(parameterize, encode, allowlist, escape).

Illustrative only; <= 40 lines of logic; NEVER executed during authoring.
"# Expected output:" is hand-derived (see the Chapter 13 Project Checkpoint, where
one case is deliberately traced by hand as a teaching trap).
"""

UNTRUSTED_SOURCES = {"request.args", "request.form", "request.body", "url_param"}
DANGEROUS_SINKS   = {"db.execute", "os.system", "subprocess(shell=True)", "innerHTML"}
SANITIZERS        = {"parameterize", "html.escape", "allowlist", "shlex.quote"}


def taint_demo(src: str, sink: str, via=()):
    """Return ('VULNERABLE'|'safe', explanation) for a data-flow from src to sink.

    `via` is the ordered list of steps the data passes through (e.g., a sanitizer).
    """
    if src not in UNTRUSTED_SOURCES:
        return ("safe", f"{src!r} is trusted input; no taint introduced")
    if any(step in SANITIZERS for step in via):
        used = [s for s in via if s in SANITIZERS]
        return ("safe", f"tainted data sanitized by {used} before {sink!r}")
    if sink in DANGEROUS_SINKS:
        return ("VULNERABLE", f"untrusted {src!r} reaches {sink!r} unsanitized")
    return ("safe", f"{sink!r} is not a dangerous sink")


if __name__ == "__main__":
    cases = [
        ("request.args", "db.execute", ()),                 # the §13.2 bug
        ("request.args", "db.execute", ("parameterize",)),  # the §13.2 fix
        ("request.form", "innerHTML", ()),                  # the §13.3 DOM bug
        ("request.form", "innerHTML", ("html.escape",)),    # encoded -> safe
        ("config.value", "os.system", ()),                  # trusted source -> safe
    ]
    for src, sink, via in cases:
        verdict, why = taint_demo(src, sink, via)
        print(f"{verdict:10s} {src:14s} -> {sink:24s} | {why}")

# Expected output (hand-traced):
# VULNERABLE request.args   -> db.execute               | untrusted 'request.args' reaches 'db.execute' unsanitized
# safe       request.args   -> db.execute               | tainted data sanitized by ['parameterize'] before 'db.execute'
# VULNERABLE request.form   -> innerHTML                | untrusted 'request.form' reaches 'innerHTML' unsanitized
# safe       request.form   -> innerHTML                | tainted data sanitized by ['html.escape'] before 'innerHTML'
# safe       config.value   -> os.system                | 'config.value' is trusted input; no taint introduced
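
One limit of the model is worth seeing in code: taint_demo accepts any entry from SANITIZERS as sufficient, whatever the sink. A flow through html.escape into db.execute is therefore reported safe, although HTML encoding does not neutralize SQL. The sketch below is one hedged way to tighten the model by pairing each sink with the sanitizers that fit it. SINK_SANITIZERS and sanitizer_fits are hypothetical names, not part of bluekit, and the pairing table is an illustrative assumption rather than a complete list.

```python
# Hypothetical extension of the teaching model: a sanitizer only counts when it
# fits the sink. The pairings below are illustrative assumptions, not a standard.
SINK_SANITIZERS = {
    "db.execute":             {"parameterize", "allowlist"},
    "innerHTML":              {"html.escape"},
    "os.system":              {"shlex.quote", "allowlist"},
    "subprocess(shell=True)": {"shlex.quote", "allowlist"},
}


def sanitizer_fits(sink: str, via=()) -> bool:
    """True if some step in `via` is a sanitizer appropriate for `sink`."""
    fitting = SINK_SANITIZERS.get(sink, set())
    return any(step in fitting for step in via)


print(sanitizer_fits("db.execute", ("html.escape",)))    # wrong boundary for SQL
print(sanitizer_fits("db.execute", ("parameterize",)))   # right boundary for SQL

# Expected output (hand-traced):
# False
# True
```

A caller could consult sanitizer_fits in place of the bare membership test on SANITIZERS; the rest of the flow logic would stay the same.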

Chapter 14 — Mobile and IoT Security

# bluekit/iotinv.py  — Chapter 14 increment
"""Device inventory + default-credential triage for a mixed mobile/IoT fleet.

You cannot secure what you have not inventoried (Ch.1). This turns a raw
device list into a triage view and flags the single most-exploited IoT
weakness: unchanged default credentials (Ch.14 sec.4).

NOT EXECUTED during authoring — output below is hand-traced.
"""

# A tiny, illustrative set of well-known default pairs. A real tool would
# load a maintained database; we keep it short and obvious for teaching.
DEFAULT_CREDS = {("admin", "admin"), ("admin", "password"),
                 ("root", "root"), ("admin", "")}


def default_cred_flag(dev: dict) -> bool:
    """True if the device's (user, password) is a known default pair."""
    return (dev.get("user", ""), dev.get("password", "")) in DEFAULT_CREDS


def inventory(devices: list) -> dict:
    """Summarize a fleet: totals, default-cred offenders, and unmanaged count."""
    flagged = [d["id"] for d in devices if default_cred_flag(d)]
    unmanaged = [d["id"] for d in devices if not d.get("managed", False)]
    return {"total": len(devices),
            "default_cred": flagged,        # log in with a guess -> fix first
            "unmanaged": unmanaged}         # candidates for the IoT segment


if __name__ == "__main__":
    fleet = [
        {"id": "cam-lobby-07", "user": "admin", "password": "admin",    "managed": False},
        {"id": "phone-kim",    "user": "",      "password": "",         "managed": True},
        {"id": "printer-br12", "user": "admin", "password": "password", "managed": False},
        {"id": "atm-0042",     "user": "svc",   "password": "x9$Kp2#m", "managed": True},
    ]
    report = inventory(fleet)
    print(f"total={report['total']}")
    print(f"default_cred={report['default_cred']}")
    print(f"unmanaged={report['unmanaged']}")

# Expected output:
# total=4
# default_cred=['cam-lobby-07', 'printer-br12']
# unmanaged=['cam-lobby-07', 'printer-br12']

Chapter 15 — Cloud Security

# bluekit/cloudpost.py  — Chapter 15 increment
"""Cloud posture checks: the two findings that cause most cloud breaches.

s3_public(acl)        -> True if a bucket ACL grants public access.
iam_overbroad(policy) -> True if an Allow statement pairs a wildcard action with a wildcard resource.
Inputs are parsed dicts (e.g., from boto3); we never call a live cloud here.

This is the canonical bluekit module for Chapter 15 (signatures are frozen so
later chapters compose with it). NEVER run posture tooling against an account
you do not own or are not authorized to assess.
"""

PUBLIC_GRANTEES = {"AllUsers", "AuthenticatedUsers"}  # AWS "everyone" / "any AWS account"


def s3_public(acl: dict) -> bool:
    """Return True if the bucket ACL contains any grant to a public grantee."""
    for grant in acl.get("Grants", []):
        grantee = grant.get("Grantee", {}).get("URI", "")
        # AWS encodes public grantees as .../groups/global/AllUsers etc.
        if any(g in grantee for g in PUBLIC_GRANTEES):
            return True
    return False


def iam_overbroad(policy: dict) -> bool:
    """Return True if any Allow statement uses a wildcard action AND resource."""
    statements = policy.get("Statement", [])
    if isinstance(statements, dict):       # a single-statement policy is a dict
        statements = [statements]
    for st in statements:
        if st.get("Effect") != "Allow":
            continue
        actions = st.get("Action", [])
        resources = st.get("Resource", [])
        actions = [actions] if isinstance(actions, str) else actions
        resources = [resources] if isinstance(resources, str) else resources
        if "*" in actions and "*" in resources:
            return True
    return False


if __name__ == "__main__":
    public_acl = {"Grants": [
        {"Grantee": {"URI": "http://acs.amazonaws.com/groups/global/AllUsers"},
         "Permission": "READ"}]}
    private_acl = {"Grants": [
        {"Grantee": {"ID": "ownerid"}, "Permission": "FULL_CONTROL"}]}
    admin_policy = {"Statement": [{"Effect": "Allow", "Action": "*", "Resource": "*"}]}
    scoped_policy = {"Statement": [{"Effect": "Allow",
        "Action": ["s3:GetObject"], "Resource": ["arn:aws:s3:::meridian-loan-docs/*"]}]}

    print("public bucket ACL   ->", s3_public(public_acl))
    print("private bucket ACL  ->", s3_public(private_acl))
    print("admin (*) policy    ->", iam_overbroad(admin_policy))
    print("scoped policy       ->", iam_overbroad(scoped_policy))

# Expected output:
# public bucket ACL   -> True
# private bucket ACL  -> False
# admin (*) policy    -> True
# scoped policy       -> False
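
iam_overbroad matches only the bare "*" action, so a statement that allows "s3:*" on every resource is reported as not overbroad. As a hedged companion check, the sketch below lists service-level wildcard actions. The name iam_service_wildcards is hypothetical, and the check is deliberately narrow: it looks only for actions ending in ":*" and ignores partial wildcards such as "s3:Get*".

```python
def iam_service_wildcards(policy: dict) -> list:
    """Return Allow-statement actions that end in ':*' (for example 's3:*')."""
    statements = policy.get("Statement", [])
    if isinstance(statements, dict):        # a single-statement policy is a dict
        statements = [statements]
    found = []
    for st in statements:
        if st.get("Effect") != "Allow":
            continue
        actions = st.get("Action", [])
        actions = [actions] if isinstance(actions, str) else actions
        found.extend(a for a in actions if a.endswith(":*"))
    return found


# Hypothetical policy: every S3 action on every resource, plus one scoped action.
policy = {"Statement": [{"Effect": "Allow",
                         "Action": ["s3:*", "ec2:DescribeInstances"],
                         "Resource": "*"}]}
print(iam_service_wildcards(policy))

# Expected output (hand-traced):
# ['s3:*']
```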

Chapter 16 — Authentication

# bluekit/authn.py — Chapter 16 increment (the toolkit's authentication module)
"""Authentication helpers: password strength and breach-prefix (k-anonymity).

All bluekit code is illustrative and hand-traced: every runnable block shows its
expected output as a comment. Nothing here is executed during authoring, and no
network call is made — `breached_prefix` only demonstrates the k-anonymity SHAPE
(send a hash prefix, never the password). Extended alongside the IAM modules in
Chapters 17-20.
"""

# A tiny stand-in for a real breached/common-password corpus (would be millions of entries).
COMMON = {"password", "123456", "qwerty", "letmein", "password1", "iloveyou"}


def password_strength(pw: str) -> str:
    """Crude strength band from length + variety + a common-password screen.
    Returns 'reject' | 'weak' | 'ok' | 'strong'. Screening beats composition rules."""
    if pw.lower() in COMMON or len(pw) < 8:
        return "reject"
    classes = sum(bool(s) for s in (
        any(c.islower() for c in pw), any(c.isupper() for c in pw),
        any(c.isdigit() for c in pw), any(not c.isalnum() for c in pw)))
    if len(pw) >= 16 or (len(pw) >= 12 and classes >= 3):
        return "strong"
    return "ok" if len(pw) >= 12 or classes >= 3 else "weak"


def breached_prefix(sha1_hex: str) -> str:
    """k-anonymity concept: you send only the first 5 hex chars of SHA-1(password)
    to a breach service, which returns all suffixes for that prefix to match LOCALLY.
    The full hash (and password) never leaves your side. (No network here.)
    Takes a precomputed SHA-1 hex digest so this stays a pure, deterministic helper."""
    return sha1_hex[:5].upper()


if __name__ == "__main__":
    for pw in ("password1", "hunter2", "correct horse battery", "Tr0ub4dor&3xtra!"):
        print(f"{pw!r:28} -> {password_strength(pw)}")
    # In real use: digest = hashlib.sha1(pw.encode()).hexdigest(); send only the prefix.
    # We pass an ILLUSTRATIVE (fake) digest here so the trace is deterministic without
    # executing a hash — the point is the SHAPE (send 5 chars), not this constant.
    illustrative_digest = "abf0f1d2e3c4b5a6978899aabbccddeeff001122"  # FAKE example
    print("send-prefix:", breached_prefix(illustrative_digest))

# Expected output:
# 'password1'                  -> reject
# 'hunter2'                    -> reject
# 'correct horse battery'      -> strong
# 'Tr0ub4dor&3xtra!'           -> strong
# send-prefix: ABF0F
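
breached_prefix shows only the sending half of the exchange. The sketch below illustrates the other half, the local match, under stated assumptions: the helper names split_digest and is_breached are hypothetical, and fake_response is a hand-built stand-in for the set of suffixes a breach service would return. No network call is made.

```python
import hashlib


def split_digest(password: str) -> tuple:
    """Return (5-char prefix, 35-char suffix) of the uppercase SHA-1 hex digest."""
    digest = hashlib.sha1(password.encode("utf-8")).hexdigest().upper()
    return digest[:5], digest[5:]


def is_breached(password: str, returned_suffixes: set) -> bool:
    """Match LOCALLY: is this password's suffix among those returned for its prefix?"""
    _prefix, suffix = split_digest(password)
    return suffix in returned_suffixes


prefix, suffix = split_digest("password1")
# Stand-in for a service response: the real suffix plus one decoy (both local).
fake_response = {suffix, "0" * 35}
print(len(prefix), len(suffix))
print(is_breached("password1", fake_response))

# Expected output (hand-traced):
# 5 35
# True
```

Only the five-character prefix would ever leave the machine; the comparison against the returned suffixes happens on your side.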

Chapter 17 — Authorization and Access Control

# bluekit/authz.py  — Chapter 17 increment (the toolkit's authorization core)
"""Two authorization primitives: an RBAC permission check and an ABAC policy
evaluation. rbac_check answers 'do this user's roles grant the action?';
abac_eval answers 'do the request's attributes satisfy the policy's
conditions?'. Real systems combine them: RBAC for the coarse 'what job
function', ABAC for the fine 'but only under these conditions'.

All bluekit code is illustrative and hand-traced: the runnable block below
shows its expected output as a comment. Nothing here is executed during
authoring. Extended into context-aware decisions in Chapter 32.
"""

# Role -> permissions catalog (excerpt of Meridian's branch roles, Fig 17.1).
ROLE_PERMS = {
    "Teller":        {"open_account", "accept_deposit", "read_balance"},
    "Senior_Teller": {"open_account", "accept_deposit", "read_balance", "reverse_txn"},
    "Wire_Operator": {"initiate_wire"},   # note: NOT approve_wire (segregation)
    "Wire_Approver": {"approve_wire"},    # note: NOT initiate_wire (segregation)
}


def rbac_check(user_roles: set[str], action: str) -> bool:
    """Permit iff ANY of the user's roles grants the requested action."""
    if not user_roles:
        return False
    granted = set().union(*(ROLE_PERMS.get(r, set()) for r in user_roles))
    return action in granted


def abac_eval(attrs: dict, policy: dict) -> bool:
    """Permit iff EVERY condition in policy is satisfied by attrs (fail-closed)."""
    for key, required in policy.items():
        if attrs.get(key) != required:
            return False          # any unmet/missing condition => deny
    return True


if __name__ == "__main__":
    # RBAC: a Wire_Operator may initiate but NOT approve (segregation of duties).
    op = {"Wire_Operator"}
    print("operator initiate_wire:", rbac_check(op, "initiate_wire"))  # True
    print("operator approve_wire :", rbac_check(op, "approve_wire"))   # False

    # ABAC: approving a wire also requires a managed device, on-network, MFA.
    policy = {"device": "managed", "network": "corp", "mfa": True}
    ok = {"device": "managed", "network": "corp", "mfa": True}
    bad = {"device": "managed", "network": "corp", "mfa": False}  # MFA not satisfied
    print("approve from compliant context:", abac_eval(ok, policy))   # True
    print("approve, MFA absent           :", abac_eval(bad, policy))  # False

# Expected output:
# operator initiate_wire: True
# operator approve_wire : False
# approve from compliant context: True
# approve, MFA absent           : False
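
The module docstring notes that real systems combine the two checks. A minimal sketch of that combination follows, assuming a hypothetical authorize wrapper that is not part of the frozen bluekit signatures. The two primitives are restated in shortened form, with a two-role excerpt of ROLE_PERMS, so the block stands alone.

```python
ROLE_PERMS = {                        # two-role excerpt, enough for the demo
    "Wire_Operator": {"initiate_wire"},
    "Wire_Approver": {"approve_wire"},
}


def rbac_check(user_roles: set, action: str) -> bool:
    """Permit iff any of the user's roles grants the action."""
    granted = set().union(*(ROLE_PERMS.get(r, set()) for r in user_roles))
    return action in granted


def abac_eval(attrs: dict, policy: dict) -> bool:
    """Permit iff every policy condition is met by attrs (fail-closed)."""
    return all(attrs.get(key) == required for key, required in policy.items())


def authorize(user_roles: set, action: str, attrs: dict, policy: dict) -> bool:
    """Hypothetical combiner: the coarse RBAC gate first, then the ABAC conditions."""
    return rbac_check(user_roles, action) and abac_eval(attrs, policy)


policy = {"device": "managed", "network": "corp", "mfa": True}
ok = {"device": "managed", "network": "corp", "mfa": True}
no_mfa = {"device": "managed", "network": "corp", "mfa": False}
print(authorize({"Wire_Approver"}, "approve_wire", ok, policy))
print(authorize({"Wire_Approver"}, "approve_wire", no_mfa, policy))
print(authorize({"Wire_Operator"}, "approve_wire", ok, policy))

# Expected output (hand-traced):
# True
# False
# False
```

The last case is the segregation-of-duties point: a fully compliant context does not rescue a role that lacks the permission.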

Chapter 18 — Identity Governance

# bluekit/idgov.py  — Chapter 18 increment (the toolkit's identity-governance module)
"""Identity governance: find orphaned accounts and review access grants.

orphan_accounts() reconciles directory users against the HR/roster source of
truth (the core §18.5 hunt). access_review() flags grants for certification.

All bluekit code is illustrative and hand-traced: the runnable block below shows
its expected output as a comment. Nothing here is executed during authoring.
"""


def orphan_accounts(users, hr):
    """Return enabled directory users with no active record in the source of truth.

    users: list of dicts {"name", "enabled"}.  hr: set of names of active people.
    The reconciliation at the heart of identity governance: directory (who CAN
    get in) minus systems of record (who SHOULD) = orphans.
    """
    return [u["name"] for u in users
            if u["enabled"] and u["name"] not in hr]


def access_review(grants):
    """Tag each access grant KEEP or REVOKE for a certification report.

    grants: list of dicts {"user", "active", "role_ok"}.
    Revoke if the user is not active (orphan) OR the grant mismatches their role
    (privilege creep / mover not reconciled).
    """
    report = []
    for g in grants:
        revoke = (not g["active"]) or (not g["role_ok"])
        report.append((g["user"], "REVOKE" if revoke else "KEEP"))
    return report


if __name__ == "__main__":
    users = [{"name": "loanofficer3", "enabled": True},
             {"name": "contractor_x", "enabled": True},   # gone, never disabled
             {"name": "pwozniak",     "enabled": True}]    # terminated ex-employee
    hr = {"loanofficer3", "loanofficer7"}                  # active people only
    print("orphans:", orphan_accounts(users, hr))

    grants = [{"user": "loanofficer3", "active": True,  "role_ok": True},
              {"user": "rkhan",        "active": True,  "role_ok": False},  # moved
              {"user": "contractor_x", "active": False, "role_ok": False}]  # orphan
    for user, dec in access_review(grants):
        print(f"{dec:6s} {user}")

# Expected output:
# orphans: ['contractor_x', 'pwozniak']
# KEEP   loanofficer3
# REVOKE rkhan
# REVOKE contractor_x
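
The same reconciliation can be run in the opposite direction. In the demo data, loanofficer7 is in the source of truth but has no directory account. The sketch below, using the hypothetical name unprovisioned, reports people who should have access but have no enabled account; it is offered as an illustration, not as part of the module.

```python
def unprovisioned(users, hr):
    """Return names in the source of truth with no enabled directory account."""
    enabled = {u["name"] for u in users if u["enabled"]}
    return sorted(hr - enabled)           # sorted for a stable, readable report


users = [{"name": "loanofficer3", "enabled": True},
         {"name": "contractor_x", "enabled": True},
         {"name": "pwozniak",     "enabled": True}]
hr = {"loanofficer3", "loanofficer7"}
print("unprovisioned:", unprovisioned(users, hr))

# Expected output (hand-traced):
# unprovisioned: ['loanofficer7']
```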

Chapter 19 — Privileged Access Management

# bluekit/pam.py  — Chapter 19 increment (privileged access management)
"""Privileged Access Management helpers: inventory triage and JIT windows.

privileged_inventory() flags privileged accounts whose hygiene is risky;
jit_window() computes the time-boxed grant for a just-in-time access request.

All bluekit code is illustrative and hand-traced: every runnable block shows its
expected output as a comment. Nothing here is executed during authoring.
Service-account *secrets* (vaulting/rotation of non-human credentials) are handled
fully in Chapter 20; here service accounts appear only as privileged accounts to flag.
"""
from datetime import datetime, timedelta

# Max JIT grant (minutes) by tier — tighter for the more sensitive tiers.
MAX_MINUTES = {0: 60, 1: 120, 2: 240}


def privileged_inventory(accounts):
    """Return (name, risk_flags) for accounts needing attention. Highest risk first.

    Flags model the §19.1 hygiene failures that turn a foothold into a takeover:
    standing shared secrets, no vault, stale credentials, shared-without-recording.
    """
    out = []
    for a in accounts:
        flags = []
        if not a.get("vaulted"):
            flags.append("NOT_VAULTED")
        if a.get("standing"):
            flags.append("STANDING_ACCESS")        # should be granted just-in-time, not held permanently
        if a.get("shared") and not a.get("recorded"):
            flags.append("SHARED_NO_RECORDING")     # shared account with no accountability
        if a.get("last_rotated_days", 999) > 90:
            flags.append("STALE_CREDENTIAL")
        if flags:
            out.append((a["name"], flags))
    return sorted(out, key=lambda t: len(t[1]), reverse=True)


def jit_window(req, now=None):
    """Time-box a JIT request to its tier's max; deny if Tier 0/1 approval is missing.

    Encodes §19.3: separation of duties (Tier 0/1 needs approval) and time-boxing
    (the requested duration is clamped to the tier maximum).
    """
    now = now or datetime(2026, 6, 14, 14, 0)          # fixed for a stable hand-trace
    tier = req["tier"]
    if tier <= 1 and not req.get("approved"):
        return {"granted": False, "reason": "approval required for Tier 0/1"}
    minutes = min(req.get("minutes", 60), MAX_MINUTES[tier])
    return {"granted": True, "start": now,
            "end": now + timedelta(minutes=minutes), "minutes": minutes}


if __name__ == "__main__":
    inv = [
        {"name": "DOMAIN\\admin",  "vaulted": False, "standing": True,
         "shared": True, "recorded": False, "last_rotated_days": 330},
        {"name": "svc-backup",     "vaulted": True,  "standing": True,
         "last_rotated_days": 400},
        {"name": "help-desk-adm",  "vaulted": True,  "standing": False,
         "last_rotated_days": 10},
    ]
    for name, flags in privileged_inventory(inv):
        print(f"{name:14s} {','.join(flags)}")
    print("---")
    print(jit_window({"tier": 0, "minutes": 240, "approved": False}))
    print(jit_window({"tier": 0, "minutes": 240, "approved": True}))

# Expected output:
# DOMAIN\admin   NOT_VAULTED,STANDING_ACCESS,SHARED_NO_RECORDING,STALE_CREDENTIAL
# svc-backup     STANDING_ACCESS,STALE_CREDENTIAL
# ---
# {'granted': False, 'reason': 'approval required for Tier 0/1'}
# {'granted': True, 'start': datetime.datetime(2026, 6, 14, 14, 0), 'end': datetime.datetime(2026, 6, 14, 15, 0), 'minutes': 60}
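
A time-boxed grant only helps if something checks the box. As a hedged sketch, the hypothetical helper jit_active below tests whether a grant shaped like jit_window's return value is in force at a given moment. Treating the end of the window as exclusive is an assumption made here, not something the chapter specifies.

```python
from datetime import datetime, timedelta


def jit_active(grant: dict, at: datetime) -> bool:
    """True only while `at` falls inside a granted window (end is exclusive)."""
    return bool(grant.get("granted")) and grant["start"] <= at < grant["end"]


start = datetime(2026, 6, 14, 14, 0)
grant = {"granted": True, "start": start,
         "end": start + timedelta(minutes=60), "minutes": 60}
denied = {"granted": False, "reason": "approval required for Tier 0/1"}

print(jit_active(grant, datetime(2026, 6, 14, 14, 59)))   # inside the window
print(jit_active(grant, datetime(2026, 6, 14, 15, 0)))    # at expiry
print(jit_active(denied, datetime(2026, 6, 14, 14, 30)))  # never granted

# Expected output (hand-traced):
# True
# False
# False
```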

Chapter 20 — Secrets And Machine Identity

# bluekit/secrets.py  — Chapter 20 increment
"""Find leaked secrets and flag soon-to-expire certificates.

Defensive use only: scan systems you own or are authorized to scan.
Uses ONLY obviously-fake placeholder secret values in examples.
Canonical bluekit signatures: scan_secrets(text), cert_days_left(not_after).
Code is NEVER executed during authoring; expected output is hand-traced.
"""
import re
from datetime import datetime, timezone

SECRET_PATTERNS = {
    "aws_access_key_id": re.compile(r"\bAKIA[0-9A-Z]{16}\b"),
    "github_pat":        re.compile(r"\bghp_[0-9A-Za-z]{36}\b"),
    "private_key_block": re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),
}


def scan_secrets(text: str) -> list[tuple[str, str]]:
    """Return (kind, value) for each suspected secret found in text."""
    out = []
    for kind, pat in SECRET_PATTERNS.items():
        out.extend((kind, m) for m in pat.findall(text))
    return out


def cert_days_left(not_after: str, now: datetime | None = None) -> int:
    """Days until an ISO-8601 UTC expiry; negative means already expired."""
    now = now or datetime.now(timezone.utc)
    return (datetime.fromisoformat(not_after) - now).days


if __name__ == "__main__":
    code = 'key = "AKIAIOSFODNN7EXAMPLE"\nok = "not a secret"'
    for kind, value in scan_secrets(code):
        print(f"LEAK {kind}: {value}")
    fixed_now = datetime(2026, 6, 14, tzinfo=timezone.utc)
    print("cert days left:", cert_days_left("2026-07-04T00:00:00+00:00", fixed_now))

# Expected output:
# LEAK aws_access_key_id: AKIAIOSFODNN7EXAMPLE
# cert days left: 20
#
# Trace: scan finds AKIA + 16 characters from [0-9A-Z] on line 1; line 2 matches no pattern.
#        cert_days_left: 2026-07-04 minus 2026-06-14 = 20 days.
# Remember the rule: a confirmed leak is ROTATED, never just deleted.
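
cert_days_left returns a number; flagging is left to the caller. One possible shape for that caller is sketched below. The name expiring_soon, the 30-day default threshold, and the hostnames are all hypothetical, and the day arithmetic is repeated inline so the block stands alone.

```python
from datetime import datetime, timezone


def expiring_soon(certs: dict, now: datetime, within_days: int = 30) -> list:
    """Return (name, days_left) for certs at or inside the threshold, soonest first."""
    out = []
    for name, not_after in certs.items():
        days = (datetime.fromisoformat(not_after) - now).days
        if days <= within_days:            # negative days = already expired
            out.append((name, days))
    return sorted(out, key=lambda t: t[1])


now = datetime(2026, 6, 14, tzinfo=timezone.utc)
certs = {                                  # hypothetical inventory
    "portal.example": "2026-07-04T00:00:00+00:00",
    "api.example":    "2027-01-01T00:00:00+00:00",
    "legacy.example": "2026-06-01T00:00:00+00:00",
}
print(expiring_soon(certs, now))

# Expected output (hand-traced):
# [('legacy.example', -13), ('portal.example', 20)]
```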

Chapter 21 — SIEM

# bluekit/siem.py  — Chapter 21 increment
"""Minimal SIEM core: normalize raw events to a common schema, then correlate.

Two operations at the heart of any SIEM:
  normalize(raw, source)  -> map a source-specific raw event onto the common schema
  correlate(events, rule) -> apply a simple threshold rule to normalized events

NEVER executed during authoring; expected output is hand-traced below.
"""


def normalize(raw: dict, source: str) -> dict:
    """Map a source-specific raw event onto the common schema."""
    field_map = {                       # which raw key holds each canonical field
        "vpn":          {"user": "u",  "src_ip": "ip", "outcome": "res"},
        "win_security": {"user": "TargetUserName", "src_ip": "IpAddress",
                         "outcome": "Status"},
    }
    m = field_map[source]
    res = raw.get(m["outcome"], "")
    outcome = "success" if str(res) in ("success", "0x0", "0") else "failure"
    return {"timestamp": raw["ts"], "source": source, "action": "login",
            "user": raw.get(m["user"]), "src_ip": raw.get(m["src_ip"]),
            "outcome": outcome}


def correlate(events: list, rule: dict) -> list:
    """Threshold rule: alert if >= rule['count'] matching events for one user."""
    counts = {}
    for e in events:
        if all(e.get(k) == v for k, v in rule["match"].items()):
            counts[e["user"]] = counts.get(e["user"], 0) + 1
    return [{"alert": rule["name"], "user": u, "n": n}
            for u, n in counts.items() if n >= rule["count"]]


if __name__ == "__main__":
    raw = [{"ts": "…02:01:10Z", "u": "mreyes", "ip": "203.0.113.77", "res": "fail"},
           {"ts": "…02:01:14Z", "u": "mreyes", "ip": "203.0.113.77", "res": "fail"},
           {"ts": "…02:01:19Z", "u": "mreyes", "ip": "203.0.113.77", "res": "fail"}]
    evs = [normalize(r, "vpn") for r in raw]
    rule = {"name": "vpn_brute_force", "count": 3,
            "match": {"action": "login", "outcome": "failure"}}
    print(evs[0])
    print(correlate(evs, rule))

# Expected output:
# {'timestamp': '…02:01:10Z', 'source': 'vpn', 'action': 'login', 'user': 'mreyes', 'src_ip': '203.0.113.77', 'outcome': 'failure'}
# [{'alert': 'vpn_brute_force', 'user': 'mreyes', 'n': 3}]
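
correlate counts matches per user with no notion of time, so three failures spread over a day score the same as three in ten seconds. The sketch below adds a time window as one hedged refinement. The name correlate_windowed, the rule key window_s, the users, and the full timestamps are hypothetical; timestamps are assumed to be ISO-8601 with an explicit offset.

```python
from datetime import datetime, timedelta


def correlate_windowed(events: list, rule: dict) -> list:
    """Alert per user when rule['count'] matching events fit in rule['window_s'] seconds."""
    window = timedelta(seconds=rule["window_s"])
    times_by_user = {}
    for e in events:
        if all(e.get(k) == v for k, v in rule["match"].items()):
            times_by_user.setdefault(e["user"], []).append(
                datetime.fromisoformat(e["timestamp"]))
    alerts = []
    need = rule["count"]
    for user, times in times_by_user.items():
        times.sort()
        # Slide over the sorted times: do `need` consecutive events span <= window?
        for i in range(len(times) - need + 1):
            if times[i + need - 1] - times[i] <= window:
                alerts.append({"alert": rule["name"], "user": user,
                               "window_start": times[i].isoformat()})
                break
    return alerts


def _fail(user: str, ts: str) -> dict:
    """Build a normalized failed-login event (hypothetical test data)."""
    return {"timestamp": ts, "action": "login", "user": user, "outcome": "failure"}


events = [
    _fail("burst_user", "2026-06-14T02:01:10+00:00"),
    _fail("burst_user", "2026-06-14T02:01:14+00:00"),
    _fail("burst_user", "2026-06-14T02:01:19+00:00"),
    _fail("slow_user",  "2026-06-14T02:00:00+00:00"),
    _fail("slow_user",  "2026-06-14T09:00:00+00:00"),
    _fail("slow_user",  "2026-06-14T17:00:00+00:00"),
]
rule = {"name": "vpn_brute_force", "count": 3, "window_s": 60,
        "match": {"action": "login", "outcome": "failure"}}
print(correlate_windowed(events, rule))

# Expected output (hand-traced):
# [{'alert': 'vpn_brute_force', 'user': 'burst_user', 'window_start': '2026-06-14T02:01:10+00:00'}]
```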

Chapter 22 — Threat Detection And Hunting

# bluekit/detect.py  — Chapter 22 increment (the toolkit's detection module)
"""Two faces of detection: indicator matching (bottom of the pyramid of pain)
and behavioral technique-tagging (top of the pyramid).

`ioc_match` is precise but brittle — it only fires on indicators you already
know are bad. `attack_technique` is robust to the adversary changing their
infrastructure, because it keys on the BEHAVIOR a technique requires.

All bluekit code is illustrative and hand-traced: every runnable block shows
its expected output as a comment. Nothing here is executed during authoring.
"""


def ioc_match(event: dict, iocs: dict) -> list[str]:
    """Return notes for any known-bad indicator present in the event.

    `iocs` maps an indicator-type (an event field name) to a set of known-bad
    values, e.g. {"dest_ip": {"203.0.113.77"}, "sha256": {"5f4dcc3b2ab7"}}.
    """
    hits = []
    for field, bad_values in iocs.items():
        value = event.get(field)
        if value is not None and value in bad_values:
            hits.append(f"IoC[{field}]={value}")
    return hits


def attack_technique(event: dict) -> str:
    """Map an event to the ATT&CK technique it most resembles (behavioral)."""
    parent = (event.get("parent_image") or "").lower()
    image = (event.get("image") or "").lower()
    if parent.endswith(("winword.exe", "excel.exe")) and "cmd" in image:
        return "T1059.003"      # Command and Scripting Interpreter: Windows Command Shell
    if (event.get("src_zone") == "server"
            and event.get("dest_category") == "external"
            and event.get("beacon_regular")):
        return "T1071.001"      # Application Layer Protocol: Web Protocols (C2 beacon)
    if "lsass" in (event.get("target_image") or "").lower():
        return "T1003.001"      # OS Credential Dumping: LSASS Memory
    return "unknown"


if __name__ == "__main__":
    iocs = {"dest_ip": {"203.0.113.77"}, "sha256": {"5f4dcc3b2ab7"}}
    e1 = {"dest_ip": "203.0.113.77", "src_zone": "server",
          "dest_category": "external", "beacon_regular": True}
    e2 = {"parent_image": r"C:\Office\winword.exe", "image": r"C:\Windows\cmd.exe"}
    print("e1 IoCs:    ", ioc_match(e1, iocs))
    print("e1 technique:", attack_technique(e1))
    print("e2 technique:", attack_technique(e2))

# Expected output:
# e1 IoCs:     ['IoC[dest_ip]=203.0.113.77']
# e1 technique: T1071.001
# e2 technique: T1059.003
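
The brittleness claim in the docstring can be shown directly. In the sketch below the adversary moves to a new address (a hypothetical one from a documentation range); the indicator match goes silent while the behavioral check still fires. ioc_hits and looks_like_beacon are condensed, hypothetical restatements of the module's logic, written so the block stands alone.

```python
def ioc_hits(event: dict, iocs: dict) -> list:
    """Indicator matching: fires only on values already known to be bad."""
    return [f"IoC[{field}]={event[field]}"
            for field, bad in iocs.items() if event.get(field) in bad]


def looks_like_beacon(event: dict) -> bool:
    """Behavioral check: a server talking outbound at a regular interval."""
    return (event.get("src_zone") == "server"
            and event.get("dest_category") == "external"
            and bool(event.get("beacon_regular")))


iocs = {"dest_ip": {"203.0.113.77"}}
before = {"dest_ip": "203.0.113.77", "src_zone": "server",
          "dest_category": "external", "beacon_regular": True}
after = dict(before, dest_ip="198.51.100.9")    # adversary rotates infrastructure

for label, ev in (("before", before), ("after", after)):
    print(label, ioc_hits(ev, iocs), looks_like_beacon(ev))

# Expected output (hand-traced):
# before ['IoC[dest_ip]=203.0.113.77'] True
# after [] True
```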

Chapter 23 — Vulnerability Management

# bluekit/vulnmgmt.py  — Chapter 23 increment (risk-based vulnerability prioritization)
"""Risk-based vulnerability prioritization and patch SLAs for the defender's kit.

priority(): rank a finding by CVSS + EPSS + KEV (the section 23.3 signals). KEV
            (active exploitation) dominates; high EPSS escalates; CVSS is the base.
            Asset context is folded in by the CALLER (it knows what's internet-facing).
patch_sla(): map a priority label to a remediation deadline in days (section 23.4).

All bluekit code is illustrative and hand-traced: every runnable block shows its
expected output as a comment. Nothing here is executed during authoring.
"""


def priority(cvss: float, kev: bool, epss: float) -> str:
    """Return a remediation priority. KEV or high exploit-probability beats raw CVSS."""
    if kev or epss >= 0.5:                 # actively exploited OR likely to be: jump the queue
        return "P1-EMERGENCY"
    if cvss >= 9.0 and epss >= 0.1:        # critical severity AND non-trivial exploit odds
        return "P2-CRITICAL"
    if cvss >= 7.0:                        # high severity, low exploit odds: scheduled
        return "P3-HIGH"
    return "P4-ROUTINE"                    # everything else: routine SLA


def patch_sla(sev: str) -> int:
    """Days-to-remediate for an (internet-facing) asset, by priority label."""
    return {"P1-EMERGENCY": 3, "P2-CRITICAL": 7, "P3-HIGH": 14, "P4-ROUTINE": 30}.get(sev, 30)


if __name__ == "__main__":
    # (label, cvss, kev, epss): Meridian findings A, C, D, and E from the section 23.3 table
    findings = [
        ("A Log4Shell on banking portal", 10.0, True,  0.94),
        ("C OpenSSL on marketing site",    9.8, False, 0.02),
        ("D Local priv-esc on workstation",7.8, False, 0.01),
        ("E Windows RCE (on KEV)",         8.1, True,  0.90),
    ]
    for name, cvss, kev, epss in findings:
        p = priority(cvss, kev, epss)
        print(f"{p:13s}  SLA {patch_sla(p):2d}d  {name}")

# Expected output:
# P1-EMERGENCY   SLA  3d  A Log4Shell on banking portal
# P3-HIGH        SLA 14d  C OpenSSL on marketing site
# P3-HIGH        SLA 14d  D Local priv-esc on workstation
# P1-EMERGENCY   SLA  3d  E Windows RCE (on KEV)
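
An SLA in days becomes actionable once it is turned into a calendar date. The sketch below is one way to do that, assuming hypothetical helpers due_date and is_overdue and a hypothetical discovery date. SLA_DAYS repeats the table inside patch_sla so the block stands alone.

```python
from datetime import date, timedelta

# Same days-to-remediate table as patch_sla, repeated for a self-contained example.
SLA_DAYS = {"P1-EMERGENCY": 3, "P2-CRITICAL": 7, "P3-HIGH": 14, "P4-ROUTINE": 30}


def due_date(found: date, label: str) -> date:
    """Remediation deadline: discovery date plus the SLA for the priority label."""
    return found + timedelta(days=SLA_DAYS.get(label, 30))


def is_overdue(found: date, label: str, today: date) -> bool:
    """True once `today` is past the deadline (the deadline day itself is on time)."""
    return today > due_date(found, label)


found = date(2026, 6, 14)                       # hypothetical discovery date
print(due_date(found, "P1-EMERGENCY"))
print(due_date(found, "P3-HIGH"))
print(is_overdue(found, "P1-EMERGENCY", date(2026, 6, 18)))

# Expected output (hand-traced):
# 2026-06-17
# 2026-06-28
# True
```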

Chapter 24 — Incident Response

# bluekit/ir.py  — Chapter 24 increment
"""Incident-response triage and containment helpers.

triage(alert):       signals -> (severity, action)   [the §24.3 decision tree]
containment(type):   incident type -> posture        [the §24.4 tradeoff logic]

Canonical bluekit module for Chapter 24. NEVER executed during authoring —
expected output is hand-traced below. Signatures are stable.
"""


def triage(alert: dict) -> tuple[str, str]:
    """Map alert signals to (severity, recommended action). Highest trigger wins."""
    data_or_core = alert.get("affects") in {"customer_data", "core_banking", "domain_controller"}
    if alert.get("ransomware") or data_or_core:
        return ("SEV-1", "DECLARE: assign IC, open war room, invoke playbook, notify per comms plan")
    if alert.get("privileged_account") or alert.get("lateral_movement"):
        return ("SEV-2", "ENGAGE IR lead; consider declaring; begin scoping")
    if alert.get("malware") or alert.get("account_compromise"):
        return ("SEV-3", "HANDLE in SOC; document; monitor")
    return ("SEV-4", "LOG and trend")


def containment(incident_type: str) -> str:
    """Return the containment posture for an incident type (§24.4 tradeoffs)."""
    fast_destructive = {"ransomware", "wiper", "active_exfiltration"}
    stealthy = {"apt", "persistent_intrusion", "insider_slow"}
    if incident_type in fast_destructive:
        return "IMMEDIATE aggressive containment: isolate now, scope in parallel (speed > stealth)"
    if incident_type in stealthy:
        return "QUIET thorough scoping first, then coordinated containment everywhere at once"
    return "PROPORTIONATE: contain the known footholds, keep scoping"


if __name__ == "__main__":
    print(triage({"ransomware": True, "affects": "core_banking"}))
    print(triage({"privileged_account": True}))
    print(triage({"malware": True}))
    print(containment("ransomware"))
    print(containment("apt"))

# Expected output:
# ('SEV-1', 'DECLARE: assign IC, open war room, invoke playbook, notify per comms plan')
# ('SEV-2', 'ENGAGE IR lead; consider declaring; begin scoping')
# ('SEV-3', 'HANDLE in SOC; document; monitor')
# IMMEDIATE aggressive containment: isolate now, scope in parallel (speed > stealth)
# QUIET thorough scoping first, then coordinated containment everywhere at once

Chapter 25 — Digital Forensics

# bluekit/forensics.py  — Chapter 25 increment
"""Two core DFIR operations: evidence-integrity verification and timeline merge.

evidence_hash(): confirm an image matches its source (integrity, ch.25 sec.2).
merge_timeline(): normalize multi-source events to one sorted timeline (sec.5).

NOT EXECUTED during authoring — outputs below are hand-traced. Hashes are
illustrative placeholders (a real SHA-256 digest is 64 hex chars).
"""
import hashlib


def evidence_hash(data: bytes, expected: str | None = None) -> dict:
    """Hash evidence bytes (SHA-256) and, if given, verify against expected."""
    digest = hashlib.sha256(data).hexdigest()
    verified = (expected is not None and digest == expected)
    return {"sha256": digest, "verified": verified}


def merge_timeline(sources: list[tuple[str, list[tuple[str, str]]]]) -> list[tuple]:
    """Merge (source_name, [(utc_timestamp, event), ...]) into one sorted list.

    Each input timestamp is assumed already normalized to UTC 'YYYY-MM-DD HH:MM:SS'.
    Returns rows (timestamp, source, event) sorted chronologically.
    """
    rows = [(ts, name, event)
            for name, events in sources
            for ts, event in events]
    return sorted(rows, key=lambda r: r[0])      # sort by UTC timestamp string


if __name__ == "__main__":
    h = evidence_hash(b"forensic image bytes",
                      expected=hashlib.sha256(b"forensic image bytes").hexdigest())
    print(f"verified={h['verified']}")
    tl = merge_timeline([
        ("VPN",    [("2025-04-15 02:02:50", "VPN connect user=jlopez")]),
        ("WinEvt", [("2025-04-15 02:14:07", "logon svc_backup"),
                    ("2025-04-15 03:02:10", "audit log cleared")]),
        ("MFT",    [("2025-04-15 02:15:02", "sh.exe created")]),
    ])
    for ts, src, event in tl:
        print(f"{ts}  {src:7s} {event}")

# Expected output:
# verified=True
# 2025-04-15 02:02:50  VPN     VPN connect user=jlopez
# 2025-04-15 02:14:07  WinEvt  logon svc_backup
# 2025-04-15 02:15:02  MFT     sh.exe created
# 2025-04-15 03:02:10  WinEvt  audit log cleared
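
merge_timeline sorts timestamp strings, which is only correct because every input is already in UTC and in the same format. The sketch below shows one hedged way to perform that normalization first. The helper name to_utc and the sample offsets are hypothetical; the chapter does not state which time zone each source recorded in.

```python
from datetime import datetime, timezone


def to_utc(ts_with_offset: str) -> str:
    """Convert an ISO-8601 timestamp with a UTC offset to 'YYYY-MM-DD HH:MM:SS' in UTC."""
    dt = datetime.fromisoformat(ts_with_offset)
    if dt.tzinfo is None:
        # A timestamp without an offset is ambiguous evidence: do not guess.
        raise ValueError("timestamp has no UTC offset; refusing to guess the zone")
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


print(to_utc("2025-04-14T22:14:07-04:00"))   # hypothetical source logging at UTC-4
print(to_utc("2025-04-15T02:15:02+00:00"))   # already UTC: unchanged apart from format

# Expected output (hand-traced):
# 2025-04-15 02:14:07
# 2025-04-15 02:15:02
```

Note that the first event falls on the previous calendar day in local time; sorting the raw local strings would have placed it on the wrong date.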

Chapter 26 — Security Governance

# bluekit/compliance.py  — Chapter 26 increment (the GRC module begins here)
"""Governance & compliance helpers.

Started in Ch.26 with coverage measurement; extended with crosswalk() in Ch.28
and vendor_risk() in Ch.29. Coverage = which framework items your controls satisfy.
All bluekit code is illustrative and hand-traced; nothing is executed at authoring.
"""


def policy_coverage(controls: dict, framework: list) -> dict:
    """Map controls to a framework's required items and report coverage.

    controls : {control_id: set_of_framework_ids_it_satisfies}
    framework: list of required framework item ids (e.g., CSF subcategories)
    returns  : {'covered': [...], 'gaps': [...], 'pct': float}
    """
    satisfied = set()
    for ids in controls.values():
        satisfied |= set(ids)                      # union of all coverage
    covered = [f for f in framework if f in satisfied]
    gaps    = [f for f in framework if f not in satisfied]
    pct = round(100 * len(covered) / len(framework), 1) if framework else 0.0
    return {"covered": covered, "gaps": gaps, "pct": pct}


if __name__ == "__main__":
    # A tiny slice of NIST CSF 2.0 Function/Category ids (illustrative).
    csf_slice = ["GV.RR", "ID.AM", "PR.AA", "PR.DS", "DE.CM", "RS.MA"]
    meridian_controls = {
        "Authentication Standard (Ch.16)": {"PR.AA"},          # identity/auth
        "Hardening Baselines (Ch.11)":     {"PR.DS"},          # data/system protection
        "SIEM + use cases (Ch.21)":        {"DE.CM"},          # continuous monitoring
        "IR Plan (Ch.24)":                 {"RS.MA"},          # response management
        "Asset Inventory (Ch.1)":          {"ID.AM"},          # asset management
        # NOTE: nothing yet maps to GV.RR (Govern: Roles & Responsibilities)!
    }
    result = policy_coverage(meridian_controls, csf_slice)
    print(f"Coverage: {result['pct']}%")
    print("Gaps    :", result["gaps"])

# Expected output:
# Coverage: 83.3%
# Gaps    : ['GV.RR']

Chapter 27 — Risk Management

# bluekit/riskcalc.py  — Chapter 27 increment (adds quantitative methods)
"""Risk scoring for the defender's kit.

Chapter 1 added the qualitative half: risk_score(likelihood, impact) and band(score).
Chapter 27 adds the quantitative half used in §27.3:
    ale(sle, aro)        -- annualized loss expectancy (dollars per year)
    prioritize(risks)    -- sort a register by ALE, highest first

The qualitative functions (risk_score, band) are reproduced UNCHANGED from Chapter 1
so the two views agree and a reader can use this single module either way.

All bluekit code is illustrative and hand-traced: every runnable block shows its
expected output as a comment. Nothing here is executed during authoring.
"""


def risk_score(likelihood: int, impact: int) -> int:
    """Chapter 1: qualitative score (1-25) from 1-5 likelihood and impact."""
    for name, v in (("likelihood", likelihood), ("impact", impact)):
        if not 1 <= v <= 5:
            raise ValueError(f"{name} must be 1-5, got {v}")
    return likelihood * impact


def band(score: int) -> str:
    """Chapter 1: map a 1-25 score to an action band a board will understand."""
    if score >= 15:
        return "CRITICAL"   # fix now
    if score >= 8:
        return "HIGH"       # fix this quarter
    if score >= 4:
        return "MEDIUM"     # plan it
    return "LOW"            # accept or monitor


def ale(sle: float, aro: float) -> float:
    """Chapter 27: annualized loss expectancy = single loss expectancy x annual rate.
    SLE is per-event dollars; ARO is events per year; ALE is dollars per year."""
    if sle < 0 or aro < 0:
        raise ValueError("SLE and ARO must be non-negative")
    return sle * aro


def prioritize(risks: list[dict]) -> list[dict]:
    """Chapter 27: sort a register (each {'id','sle','aro'}) by ALE, highest first.
    Annotates each risk dict IN PLACE with its computed 'ale' (the caller's register changes)."""
    for r in risks:
        r["ale"] = ale(r["sle"], r["aro"])
    return sorted(risks, key=lambda r: r["ale"], reverse=True)


if __name__ == "__main__":
    register = [
        {"id": "R1-credential-attack", "sle": 900_000,   "aro": 2.0},
        {"id": "R-ddos-outage",        "sle": 1_000_000, "aro": 2.0},
        {"id": "R-printer-server",     "sle": 5_000,     "aro": 0.5},
    ]
    for r in prioritize(register):
        print(f"${r['ale']:>12,.0f}  {r['id']}")

# Expected output:
# $   2,000,000  R-ddos-outage
# $   1,800,000  R1-credential-attack
# $       2,500  R-printer-server
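
prioritize() writes the 'ale' key into the dictionaries it is handed. Where the register must stay untouched (for instance, when it was loaded from a file that will be saved again), a copy-based variant is one option. This is a sketch under assumptions: `prioritized_copy` is a hypothetical name, and ALE is computed inline as sle * aro rather than through ale(), so the non-negative check is skipped here.

```python
def prioritized_copy(risks: list[dict]) -> list[dict]:
    """Return NEW dicts annotated with 'ale', highest first; inputs are not modified."""
    annotated = [{**r, "ale": r["sle"] * r["aro"]} for r in risks]
    return sorted(annotated, key=lambda r: r["ale"], reverse=True)


register = [
    {"id": "R1-credential-attack", "sle": 900_000,   "aro": 2.0},
    {"id": "R-ddos-outage",        "sle": 1_000_000, "aro": 2.0},
]
ranked = prioritized_copy(register)
print([r["id"] for r in ranked])
print("ale" in register[0])      # the original register was left alone

# Expected output:
# ['R-ddos-outage', 'R1-credential-attack']
# False
```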

Chapter 28 — Compliance Frameworks

# bluekit/compliance.py  — Chapter 28 increment (extends Ch.26's compliance module)
"""Control crosswalk: map one control to its requirement areas across frameworks.

A crosswalk lets one control + one artifact satisfy many obligations. We model it
as a dict of {control_id: {framework: requirement_area}} and answer two questions:
which frameworks a control covers, and where two frameworks overlap for a control.

All bluekit code is illustrative and hand-traced: the expected output below is
reasoned out by hand. Nothing here is executed during authoring. The crosswalk
proves controls are MAPPED across frameworks, never that they are GOOD against an
attacker — that judgment lives in the risk register (riskcalc.py, Ch.27).
"""

# Illustrative mapping. Requirement AREAS only (no invented exact requirement numbers).
CROSSWALK = {
    "mfa-admin": {
        "NIST_CSF":  "Protect: Identity Mgmt & Access Control",
        "ISO_27001": "Access control / authentication",
        "SOC2":      "Security: logical access",
        "PCI_DSS":   "Strong authentication into the CDE",
        "GLBA":      "Access controls on customer information",
    },
    "encrypt-transit": {
        "NIST_CSF":  "Protect: Data Security (in transit)",
        "ISO_27001": "Cryptography / secure transmission",
        "SOC2":      "Confidentiality",
        "PCI_DSS":   "Encrypt cardholder data in transit",
    },
}


def crosswalk(framework_a: str, framework_b: str) -> list[tuple[str, str, str]]:
    """Return (control_id, req_in_A, req_in_B) for every control mapped to BOTH."""
    rows = []
    for control_id, mappings in sorted(CROSSWALK.items()):
        if framework_a in mappings and framework_b in mappings:
            rows.append((control_id, mappings[framework_a], mappings[framework_b]))
    return rows


def frameworks_covered(control_id: str) -> list[str]:
    """List every framework a given control is mapped to (a small convenience helper)."""
    return sorted(CROSSWALK.get(control_id, {}).keys())


if __name__ == "__main__":
    for control_id, req_a, req_b in crosswalk("PCI_DSS", "NIST_CSF"):
        print(f"{control_id:16s} PCI: {req_a}")
        print(f"{'':16s} CSF: {req_b}")
    print("---")
    print("mfa-admin covers:", ", ".join(frameworks_covered("mfa-admin")))

# Expected output:
# encrypt-transit  PCI: Encrypt cardholder data in transit
#                  CSF: Protect: Data Security (in transit)
# mfa-admin        PCI: Strong authentication into the CDE
#                  CSF: Protect: Identity Mgmt & Access Control
# ---
# mfa-admin covers: GLBA, ISO_27001, NIST_CSF, PCI_DSS, SOC2

Chapter 29 — Third Party Supply Chain Risk

# bluekit/compliance.py  — Chapter 29 increment
"""Third-party/vendor risk scoring for the defender's kit.

Extends compliance.py (begun Ch.26 with policy_coverage; extended Ch.28 with
crosswalk) with vendor_risk(answers): weighted questionnaire scoring PLUS a
critical-control override, so a high average can never hide a missing admin MFA
(the §29.4 lesson). Returns (pct, tier, flags).

All bluekit code is illustrative and hand-traced: nothing is executed at authoring.
"""

CRITICAL = "critical"   # mark controls whose failure caps the tier regardless of average


def vendor_risk(answers, crit_threshold=2):
    """Score a vendor questionnaire into a tier.

    answers: list of (weight:int, score:0-4, kind) where kind == CRITICAL flags a
    control whose score below crit_threshold forces a HIGH-RISK tier regardless of
    the weighted average. Returns (pct:int, tier:str, flags:list of 1-based indices).
    """
    earned = sum(w * s for (w, s, _k) in answers)
    possible = sum(w * 4 for (w, s, _k) in answers)
    pct = round(100 * earned / possible) if possible else 0
    flags = [i for i, (w, s, k) in enumerate(answers, 1)
             if k == CRITICAL and s < crit_threshold]
    if flags:
        tier = "HIGH-RISK (critical control failed)"
    elif pct >= 85:
        tier = "LOW-RISK"
    elif pct >= 70:
        tier = "MODERATE-RISK (remediate gaps)"
    else:
        tier = "HIGH-RISK"
    return pct, tier, flags


if __name__ == "__main__":
    # The §29.4 document-management vendor (weights/scores from that table).
    doc_vendor = [(3, 4, CRITICAL), (3, 4, CRITICAL), (2, 3, ""), (2, 1, ""),
                  (3, 4, CRITICAL), (3, 3, CRITICAL), (2, 2, ""), (1, 3, ""),
                  (2, 2, ""), (1, 1, "")]
    print(vendor_risk(doc_vendor))
    # A vendor that FAILS admin MFA (a CRITICAL control); its 73% average alone would rate MODERATE-RISK:
    no_mfa = [(3, 0, CRITICAL), (3, 4, CRITICAL), (2, 4, ""), (2, 4, ""), (1, 4, "")]
    print(vendor_risk(no_mfa))

# Expected output:
# (74, 'MODERATE-RISK (remediate gaps)', [])
# (73, 'HIGH-RISK (critical control failed)', [1])
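
The two percentages are easier to trust with the arithmetic laid out. The sketch below recomputes only the weighted average, using (weight, score) pairs copied from the two vendors above; the helper name `weighted_pct` is hypothetical, and the critical-control override is deliberately left out.

```python
def weighted_pct(pairs):
    """Return (earned, possible, pct) for (weight, score) pairs on a 0-4 scale."""
    earned = sum(w * s for w, s in pairs)
    possible = sum(w * 4 for w, _s in pairs)     # every question can score at most 4
    return earned, possible, round(100 * earned / possible)


doc_vendor = [(3, 4), (3, 4), (2, 3), (2, 1), (3, 4), (3, 3), (2, 2), (1, 3), (2, 2), (1, 1)]
no_mfa = [(3, 0), (3, 4), (2, 4), (2, 4), (1, 4)]
print(weighted_pct(doc_vendor))   # 65 / 88 = 0.7386...
print(weighted_pct(no_mfa))       # 32 / 44 = 0.7272...

# Expected output:
# (65, 88, 74)
# (32, 44, 73)
```

The two averages sit one point apart, which is why the tier cannot rest on the average alone: only the override separates the vendor with no admin MFA from the one with ordinary gaps.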

Chapter 30 — Security Awareness Training

# bluekit/awareness.py  — Chapter 30 increment (the human-firewall metrics module)
"""Phishing-simulation metrics: turn raw campaign results into the numbers that
matter (click rate, report rate) for the human firewall.

Results dict keys: received, opened, clicked, submitted, reported.
Rates are computed against 'received'. The healthy goal is report rate > click
rate — a workforce that reports more than it clicks is on the right side of the
line (Chapter 30, sec 30.4).

All bluekit code is illustrative and hand-traced: every runnable block shows its
expected output as a comment. Nothing here is executed during authoring.
"""


def click_rate(results: dict) -> float:
    """Fraction of recipients who clicked the link, as a 0-1 float."""
    received = results.get("received", 0)
    if received <= 0:
        raise ValueError("received must be a positive count")
    return results.get("clicked", 0) / received


def report_rate(results: dict) -> float:
    """Fraction of recipients who reported the message, as a 0-1 float."""
    received = results.get("received", 0)
    if received <= 0:                       # same guard as click_rate: fail loud,
        raise ValueError("received must be a positive count")  # not ZeroDivisionError
    return results.get("reported", 0) / received


def health(results: dict) -> str:
    """A one-word read: report rate above click rate is the healthy goal."""
    return "HEALTHY" if report_rate(results) > click_rate(results) else "WATCH"


if __name__ == "__main__":
    campaign = {"received": 400, "opened": 220,
                "clicked": 48, "submitted": 12, "reported": 96}
    cr = click_rate(campaign)
    rr = report_rate(campaign)
    print(f"click_rate  = {cr:.0%}")
    print(f"report_rate = {rr:.0%}")
    print(f"status      = {health(campaign)}")

# Expected output:
# click_rate  = 12%
# report_rate = 24%
# status      = HEALTHY
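
The results dict also carries 'opened' and 'submitted', which the two functions above do not use. One way to report the whole funnel is a single keyed helper; this is a sketch, and `rate` is a hypothetical name rather than part of awareness.py.

```python
def rate(results: dict, key: str) -> float:
    """Fraction of recipients counted under `key`; a missing key counts as zero."""
    received = results.get("received", 0)
    if received <= 0:
        raise ValueError("received must be a positive count")
    return results.get(key, 0) / received


campaign = {"received": 400, "opened": 220,
            "clicked": 48, "submitted": 12, "reported": 96}
for key in ("opened", "clicked", "submitted", "reported"):
    print(f"{key:9s} = {rate(campaign, key):.0%}")

# Expected output:
# opened    = 55%
# clicked   = 12%
# submitted = 3%
# reported  = 24%
```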

Chapter 31 — DevSecOps

# bluekit/pipeline.py  — Chapter 31 increment (the toolkit's CI/CD gate module)
"""CI/CD security-gate logic: pass or fail a build by finding severity.

A gate inspects findings and decides. Fail only on findings AT OR ABOVE the
threshold severity, so low-severity issues warn without blocking delivery (31.3).

All bluekit code is illustrative and hand-traced: every runnable block shows its
expected output as a comment. Nothing here is executed during authoring.
"""

# Severity order, lowest to highest. Index = how serious.
_ORDER = ["INFO", "LOW", "MEDIUM", "HIGH", "CRITICAL"]


def ci_gate(findings: list[dict], threshold: str = "HIGH") -> dict:
    """Decide a build's fate. 'findings': dicts with a 'severity' key.

    Returns {'passed': bool, 'blocking': [...], 'warned': [...]}.
    A finding blocks iff its severity index >= the threshold's index.
    """
    if threshold not in _ORDER:
        raise ValueError(f"threshold must be one of {_ORDER}, got {threshold}")
    cut = _ORDER.index(threshold)
    blocking, warned = [], []
    for f in findings:
        sev = f.get("severity", "INFO")
        if sev not in _ORDER:
            raise ValueError(f"bad severity {sev!r}")
        (blocking if _ORDER.index(sev) >= cut else warned).append(f)
    return {"passed": len(blocking) == 0, "blocking": blocking, "warned": warned}


if __name__ == "__main__":
    findings = [
        {"id": "CKV_AWS_24", "severity": "CRITICAL"},  # open SSH to 0.0.0.0/0
        {"id": "CKV_AWS_20", "severity": "HIGH"},      # public S3 bucket
        {"id": "CKV_AWS_23", "severity": "MEDIUM"},    # missing description
    ]
    result = ci_gate(findings, threshold="HIGH")
    print(f"passed={result['passed']}  "
          f"blocking={len(result['blocking'])}  warned={len(result['warned'])}")

# Expected output:
# passed=False  blocking=2  warned=1
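
Choosing the threshold is a policy decision, and a quick sweep shows what each choice would block. The sketch below reuses the severity ordering from the listing but is otherwise standalone; `blocking_count` is a hypothetical helper that counts blockers instead of returning the full gate result.

```python
ORDER = ["INFO", "LOW", "MEDIUM", "HIGH", "CRITICAL"]   # same ordering as _ORDER above


def blocking_count(severities: list[str], threshold: str) -> int:
    """Count findings at or above the threshold; the gate passes only when this is 0."""
    cut = ORDER.index(threshold)
    return sum(1 for sev in severities if ORDER.index(sev) >= cut)


severities = ["CRITICAL", "HIGH", "MEDIUM"]             # the three findings above
sweep = {t: blocking_count(severities, t) for t in ORDER}
print(sweep)

# Expected output:
# {'INFO': 3, 'LOW': 3, 'MEDIUM': 3, 'HIGH': 2, 'CRITICAL': 1}
```

With these findings no threshold lets the build through, because the CRITICAL finding blocks at every setting.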

Chapter 32 — Zero Trust Architecture

# bluekit/zerotrust.py  — Chapter 32 increment
"""A miniature policy engine: never trust, always verify.

policy_decision blends three signals — identity, device posture, context —
and returns (verdict, reason). Mirrors the wire-approval policy of section 32.3.
Resources carry their own required group, device, and context rules.

Illustrative only; NOT executed during authoring (expected output hand-traced below).
Defensive: an access-CONTROL evaluator returning GRANT / STEP_UP / DENY. No offense.
"""


def policy_decision(subject: dict, resource: dict, context: dict) -> tuple:
    """Return (verdict, reason): 'GRANT', 'STEP_UP', or 'DENY'."""
    # 1. Identity signal: subject must be in the resource's required group.
    if resource["required_group"] not in subject.get("groups", []):
        return ("DENY", "identity: not in required group")
    # 2. Device signal: posture gate (managed + healthy) if the resource needs it.
    if resource.get("require_managed_device"):
        if not (subject.get("device_managed") and subject.get("device_healthy")):
            return ("DENY", "device: unmanaged or unhealthy")
    # 3. Context signal: location, then computed risk.
    if context.get("location") not in resource.get("allowed_locations", []):
        return ("STEP_UP", "context: location not pre-approved")
    if context.get("risk_score", 0) >= resource.get("risk_threshold", 100):
        return ("STEP_UP", "context: risk score too high")
    # All three signals pass -> least-privilege session.
    return ("GRANT", "least-privilege session granted")


if __name__ == "__main__":
    wire_app = {"required_group": "wire-approvers", "require_managed_device": True,
                "allowed_locations": ["in-country"], "risk_threshold": 70}
    base = {"groups": ["wire-approvers"], "device_managed": True, "device_healthy": True}
    print(policy_decision(base, wire_app, {"location": "in-country", "risk_score": 10}))
    print(policy_decision({**base, "device_managed": False}, wire_app,
                          {"location": "in-country", "risk_score": 10}))
    print(policy_decision(base, wire_app, {"location": "foreign", "risk_score": 90}))

# Expected output:
# ('GRANT', 'least-privilege session granted')
# ('DENY', 'device: unmanaged or unhealthy')
# ('STEP_UP', 'context: location not pre-approved')
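
None of the three calls above reaches the risk-score branch: the third request is stopped at the location check before its score of 90 is ever examined. The sketch below isolates step 3 to exercise that branch and its boundary. `context_verdict` is a hypothetical helper mirroring the context checks, and its GRANT reason string is invented for the sketch.

```python
def context_verdict(resource: dict, context: dict) -> tuple:
    """Step 3 of policy_decision in isolation: location first, then risk score."""
    if context.get("location") not in resource.get("allowed_locations", []):
        return ("STEP_UP", "context: location not pre-approved")
    if context.get("risk_score", 0) >= resource.get("risk_threshold", 100):
        return ("STEP_UP", "context: risk score too high")
    return ("GRANT", "context checks passed")


wire_app = {"allowed_locations": ["in-country"], "risk_threshold": 70}
print(context_verdict(wire_app, {"location": "in-country", "risk_score": 90}))
print(context_verdict(wire_app, {"location": "in-country", "risk_score": 70}))
print(context_verdict(wire_app, {"location": "in-country", "risk_score": 69}))

# Expected output:
# ('STEP_UP', 'context: risk score too high')
# ('STEP_UP', 'context: risk score too high')
# ('GRANT', 'context checks passed')
```

The comparison is inclusive, so a score exactly at the threshold steps up.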

Chapter 33 — Operational Technology Security

# bluekit/otsec.py  — Chapter 33 increment
"""Purdue-model zoning for OT asset inventory.

Classify an asset into a Purdue level (0-5, plus 3.5 IDMZ) and report
which security domain it belongs to. Reachability is control in OT, so
knowing an asset's zone is the first step in defending the boundary.

All bluekit code is illustrative and hand-traced: the runnable block below
shows its expected output as a comment. Nothing here is executed during authoring.
"""

# Canonical role -> Purdue level. Lower = closer to the physical process.
_ROLE_TO_LEVEL = {
    "sensor": 0, "actuator": 0,        # field devices (the metal)
    "plc": 1, "rtu": 1, "controller": 1,  # basic control
    "hmi": 2, "area_scada": 2,         # area supervisory control
    "scada_server": 3, "historian": 3, "eng_ws": 3,  # site operations
    "idmz_broker": 3.5, "jump_host": 3.5, "historian_replica": 3.5,  # IDMZ
    "mes": 4, "site_file": 4, "scheduling": 4,        # business logistics (IT)
    "email": 5, "ad": 5, "internet": 5,               # enterprise (IT)
}


def purdue_zone(asset: dict) -> dict:
    """Return {level, domain, is_boundary} for an OT/IT asset by its role."""
    role = asset.get("role", "").lower()
    if role not in _ROLE_TO_LEVEL:
        raise ValueError(f"unknown role {role!r}; add it to _ROLE_TO_LEVEL")
    level = _ROLE_TO_LEVEL[role]
    domain = "IT" if level >= 4 else ("IDMZ" if level == 3.5 else "OT")
    # The IDMZ is the only place an IT<->OT exchange may legitimately occur.
    return {"level": level, "domain": domain, "is_boundary": level == 3.5}


if __name__ == "__main__":
    assets = [
        {"name": "server-hall-temp", "role": "sensor"},
        {"name": "bms-plc-3",        "role": "plc"},
        {"name": "facilities-hmi",   "role": "hmi"},
        {"name": "bms-scada-01",     "role": "scada_server"},
        {"name": "facilities-jump",  "role": "jump_host"},
        {"name": "corp-email",       "role": "email"},
    ]
    for a in assets:
        z = purdue_zone(a)
        flag = "  <-- IT/OT boundary" if z["is_boundary"] else ""
        print(f"{a['name']:18s} L{z['level']:<3} {z['domain']:4s}{flag}")

# Expected output:
# server-hall-temp   L0   OT
# bms-plc-3          L1   OT
# facilities-hmi     L2   OT
# bms-scada-01       L3   OT
# facilities-jump    L3.5 IDMZ  <-- IT/OT boundary
# corp-email         L5   IT
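
Once every asset has a domain, the boundary rule stated in the code comment (IT and OT exchange data only through the IDMZ) can be checked mechanically. The sketch below is a deliberately coarse reading of that rule; `direct_flow_allowed` is a hypothetical helper, and a real site would usually restrict flows between OT levels as well.

```python
def direct_flow_allowed(src_domain: str, dst_domain: str) -> bool:
    """Allow a direct flow unless it joins IT and OT without stopping in the IDMZ."""
    return {src_domain, dst_domain} != {"IT", "OT"}


for src, dst in [("IT", "OT"), ("IT", "IDMZ"), ("IDMZ", "OT"), ("OT", "OT")]:
    verdict = "allow" if direct_flow_allowed(src, dst) else "DENY"
    print(f"{src:4s} -> {dst:4s} {verdict}")

# Expected output:
# IT   -> OT   DENY
# IT   -> IDMZ allow
# IDMZ -> OT   allow
# OT   -> OT   allow
```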

Chapter 34 — AI/ML in Security

# bluekit/mlsec.py  — Chapter 34 increment (the toolkit's anomaly primitive)
"""Transparent anomaly detection: the simplest honest detector.

zscore_anomaly(series) baselines a per-entity series (mean, population stddev)
and reports the z-score of the LAST point and whether it crosses a threshold.
Explainability first: a z-score is a number a human can defend in an IR review.

All bluekit code is illustrative and hand-traced: every runnable block shows its
expected output as a comment. Nothing here is executed during authoring.
Anomalous is NOT the same as malicious (sec 34.1) -- this flags the unusual,
not the malicious; a human + context must judge intent.
"""


def zscore_anomaly(series, threshold=3.0):
    """Baseline = all but the last value; score the last value's deviation.

    Returns (z, is_anomaly). Uses the population standard deviation. Raises if
    the baseline is degenerate (too short, or zero spread -> z is undefined).
    """
    if len(series) < 3:
        raise ValueError("need at least 3 points (>=2 baseline + 1 test)")
    *baseline, x = series
    n = len(baseline)
    mean = sum(baseline) / n
    var = sum((v - mean) ** 2 for v in baseline) / n      # population variance
    std = var ** 0.5
    if std == 0:
        raise ValueError("zero-variance baseline; z-score undefined")
    z = (x - mean) / std
    return z, abs(z) >= threshold


if __name__ == "__main__":
    # svc-reconcile nightly failed logins: 10-night baseline, then tonight = 9
    nightly = [2, 3, 2, 4, 3, 2, 3, 4, 2, 5, 9]
    z, flag = zscore_anomaly(nightly)
    print(f"z = {z:.1f}   anomaly = {flag}")
    # A quiet host whose new-domain count jumps from a baseline of ~10 to 22
    z2, flag2 = zscore_anomaly([10, 12, 8, 10, 10, 22])
    print(f"z = {z2:.2f}  anomaly = {flag2}")

# Expected output:
# z = 6.0   anomaly = True
# z = 9.49  anomaly = True
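
The first hand-trace can be cross-checked against the standard library, which also shows why the docstring insists on the population standard deviation. The sketch uses `statistics.fmean`, `statistics.pstdev`, and `statistics.stdev`; the variable names are illustrative.

```python
import statistics

baseline, tonight = [2, 3, 2, 4, 3, 2, 3, 4, 2, 5], 9

mean = statistics.fmean(baseline)                # 30 / 10 = 3.0
pop_std = statistics.pstdev(baseline)            # sqrt(10 / 10) = 1.0  (divide by n)
sample_std = statistics.stdev(baseline)          # sqrt(10 / 9)         (divide by n - 1)

z = (tonight - mean) / pop_std
z_sample = (tonight - mean) / sample_std
print(f"population z = {z:.1f}   sample z = {z_sample:.1f}")

# Expected output:
# population z = 6.0   sample z = 5.7
```

Both values clear the default threshold of 3.0 here, but on short baselines the two conventions can land on opposite sides of it, so a detector should state which one it uses.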

Chapter 35 — Emerging Threats

# bluekit/cryptutil.py  — Chapter 35 increment (extends Ch.4/Ch.5)
"""crypto_inventory(systems): first step of a post-quantum (PQC) migration.

Quantum risk is HIGH when a system uses ASYMMETRIC public-key crypto (RSA/ECC/
ECDSA/ECDHE) AND its data must stay confidential long enough to be exposed to
harvest-now-decrypt-later. Symmetric/hash algorithms (AES, SHA-2/3, argon2, bcrypt)
are minimally affected. Illustrative and hand-traced -- not executed at authoring.
"""
ASYMMETRIC = ("rsa", "ecc", "ecdsa", "ecdhe", "dh", "dsa")  # quantum-breakable (Shor)


def crypto_inventory(systems, long_life_years=3):
    """systems: list of (name, algorithm, data_life_years). Returns list of
    (name, risk, reason), risk in {HIGH, MEDIUM, LOW}, sorted highest-risk first."""
    rank = {"HIGH": 0, "MEDIUM": 1, "LOW": 2}
    out = []
    for name, algo, life in systems:
        asym = any(tag in algo.lower() for tag in ASYMMETRIC)
        if asym and life >= long_life_years:
            risk, why = "HIGH", "asymmetric + long-lived data (harvest-now-decrypt-later)"
        elif asym:
            risk, why = "MEDIUM", "asymmetric but short-lived data; still migrate key exchange"
        else:
            risk, why = "LOW", "symmetric/hash; minimally quantum-affected"
        out.append((name, risk, why))
    return sorted(out, key=lambda r: rank[r[1]])


if __name__ == "__main__":
    inv = [("customer-archive", "RSA-2048+AES-256", 10),
           ("online-banking-TLS", "ECDHE+AES-256", 0),
           ("password-store", "argon2", 0),
           ("code-signing", "ECDSA", 5)]
    for name, risk, why in crypto_inventory(inv):
        print(f"{risk:6s} {name:20s} {why}")

# Expected output:
# HIGH   customer-archive     asymmetric + long-lived data (harvest-now-decrypt-later)
# HIGH   code-signing         asymmetric + long-lived data (harvest-now-decrypt-later)
# MEDIUM online-banking-TLS   asymmetric but short-lived data; still migrate key exchange
# LOW    password-store       symmetric/hash; minimally quantum-affected
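
Because classification is a substring match against a fixed tag list, an algorithm label the list does not anticipate falls through to LOW. Ed25519 and X25519 are elliptic-curve schemes, yet neither name contains any of the six tags. The sketch below shows the miss and one possible extension. `EXTRA_TAGS` and `is_asymmetric` are hypothetical additions, not part of cryptutil.py.

```python
ASYMMETRIC = ("rsa", "ecc", "ecdsa", "ecdhe", "dh", "dsa")   # the tuple used above
EXTRA_TAGS = ("ed25519", "ed448", "x25519", "x448")          # assumed additions


def is_asymmetric(algo: str, tags=ASYMMETRIC) -> bool:
    """Same substring test as crypto_inventory, with the tag list made a parameter."""
    return any(tag in algo.lower() for tag in tags)


print(is_asymmetric("Ed25519"))                              # missed by the original tags
print(is_asymmetric("Ed25519", ASYMMETRIC + EXTRA_TAGS))     # caught once the list grows
print(is_asymmetric("argon2", ASYMMETRIC + EXTRA_TAGS))      # still not asymmetric

# Expected output:
# False
# True
# False
```

An inventory is only as good as its labels, so an unrecognized algorithm is arguably better reported as "unknown" than silently scored LOW.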

Chapter 36 — Security Metrics And Reporting

# bluekit/metrics.py — Chapter 36 increment
"""Operational security metrics: the numbers that roll up into the board pack.

mttd/mttr take incidents as (begin, detect, resolve) hour-offsets within a
window; coverage takes counts of protected vs. in-scope items. Illustrative;
hand-traced, never executed at authoring time. See Chapter 27 for risk scoring
that these metrics report alongside.
"""


def mttd(incidents):
    """Mean time to DETECT: average of (detect - begin) over incidents."""
    if not incidents:
        raise ValueError("need at least one incident to average")
    gaps = [detect - begin for (begin, detect, _resolve) in incidents]
    return round(sum(gaps) / len(gaps), 2)


def mttr(incidents):
    """Mean time to RESPOND: average of (resolve - detect) over incidents."""
    if not incidents:
        raise ValueError("need at least one incident to average")
    gaps = [resolve - detect for (_begin, detect, resolve) in incidents]
    return round(sum(gaps) / len(gaps), 2)


def coverage(protected, in_scope):
    """Control coverage as a percent; the denominator is everything."""
    if in_scope <= 0:
        raise ValueError("in_scope must be > 0 (you must know your denominator)")
    return round(100 * protected / in_scope, 1)


if __name__ == "__main__":
    # (begin, detect, resolve) as hour-offsets — the four Q1 incidents.
    incidents = [(0, 2.4, 3.4), (0, 0.5, 1.0), (0, 18.0, 42.0), (0, 1.0, 3.0)]
    print(f"MTTD = {mttd(incidents)} h")
    print(f"MTTR = {mttr(incidents)} h")
    print(f"EDR coverage      = {coverage(209, 220)} %")
    print(f"Crit-log coverage = {coverage(51, 60)} %")

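# Expected output:
# MTTD = 5.47 h
# MTTR = 6.88 h
# EDR coverage      = 95.0 %
# Crit-log coverage = 85.0 %
#
# Rounding note: the exact MTTD mean is 21.9 / 4 = 5.475 h, but the nearest binary
# float sits just below 5.475, so round(..., 2) gives 5.47. The MTTR mean is exactly
# 27.5 / 4 = 6.875 h, a true tie that Python rounds to the even digit: 6.88.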
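
Means that land on a rounding boundary are sensitive to binary floating point, which matters when a board pack quotes two decimal places. The sketch below recomputes the Q1 averages with the `decimal` module so that halfway cases round the way a hand calculation would. `mean_half_up` is a hypothetical helper, and the gaps are entered as decimal strings taken from the four incidents above.

```python
from decimal import Decimal, ROUND_HALF_UP

detect_gaps = ["2.4", "0.5", "18.0", "1.0"]     # detect - begin, hours
respond_gaps = ["1.0", "0.5", "24.0", "2.0"]    # resolve - detect, hours


def mean_half_up(values: list[str]) -> Decimal:
    """Exact decimal mean, rounded to 2 places with ties going up."""
    total = sum(Decimal(v) for v in values)
    return (total / len(values)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


float_mttd = round(sum(float(v) for v in detect_gaps) / len(detect_gaps), 2)
print(mean_half_up(detect_gaps), mean_half_up(respond_gaps))
print(float_mttd)                               # binary float, for comparison

# Expected output:
# 5.48 6.88
# 5.47
```

Either convention is defensible; what matters for reporting is that the same one is used every quarter.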

Chapter 37 — Building And Leading Security

"""
Chapter 37 — bluekit checkpoint (INTEGRATION, no new module).

Ch.37 integrates rather than adds a bluekit module. This script composes the
Ch.37 staffing helpers with metrics.py (Ch.36: mttd, mttr, coverage) to produce
the two numbers a CISO puts in front of a board: "can the team sustain the load?"
and "is our detection coverage improving?"

NEVER EXECUTED DURING AUTHORING -- hand-traced below. <=40 lines of logic.
Python 3.10+. (metrics.py is the Ch.36 module; staffing helpers are Example 01.)
"""
from bluekit import metrics                 # Ch.36: mttd(), mttr(), coverage()
from staffing import staffing_verdict       # Ch.37 / Example 01

# Ch.36 coverage(protected, in_scope) takes two COUNTS and returns a PERCENT (0-100).
# Illustrative inputs (hand-traced so we don't execute Ch.36 here):
ATTACK_TECHNIQUES = ["T1566", "T1078", "T1059", "T1486", "T1021"]  # 5 sampled
COVERED = ["T1566", "T1078", "T1059", "T1486"]                     # 4 of 5 detected


def soc_board_line(weekly_alerts: int, analysts: int,
                   covered, framework) -> str:
    # Pass counts, then scale the percent to a 0-1 fraction: 80.0 / 100 = 0.8
    cov = metrics.coverage(len(covered), len(framework)) / 100
    staff = staffing_verdict(weekly_alerts, analysts)
    health = "HEALTHY" if cov >= 0.75 and "SUSTAINABLE" in staff else "AT RISK"
    return (f"Detection coverage: {cov:.0%} | Staffing: {staff} | "
            f"SOC health: {health}")


if __name__ == "__main__":
    # Meridian AFTER the operating-model fix: ~960 alerts/week on 5 analysts.
    print(soc_board_line(960, 5, COVERED, ATTACK_TECHNIQUES))

# Expected output:
# Detection coverage: 80% | Staffing: SUSTAINABLE (960 vs cap 1250; 77% utilized) | SOC health: HEALTHY
#
# Hand-trace notes:
#   metrics.coverage(len(COVERED), len(ATTACK_TECHNIQUES)) = coverage(4, 5) = 80.0;
#     80.0 / 100 = 0.8 -> "80%".
#   staffing_verdict(960, 5): cap = 5*50*5 = 1250; 960/1250 = 0.768 -> not >1.0,
#     not >0.8 -> "SUSTAINABLE (960 vs cap 1250; 77% utilized)".
#   health: cov 0.8 >= 0.75 TRUE and "SUSTAINABLE" in staff TRUE -> "HEALTHY".
#   The board line tells BOTH halves of the story (Theme: process not product):
#     detection is good AND the team can sustain operating it. Either half alone
#     is a trap -- great coverage with a burned-out team (Case Study 2) still
#     gets breached; a rested team with poor coverage misses what it never sees.
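
The listing imports staffing_verdict from Example 01, which is not reproduced in this appendix. For readers who want to run the checkpoint, the sketch below is a hypothetical reconstruction that matches the hand-trace notes only. The reading of 5*50*5 as analysts x alerts per analyst per day x workdays, and the band names OVERLOADED and STRAINED, are assumptions and not the book's source.

```python
# ASSUMED constants: chosen so that 5 analysts give the cap of 1250 in the notes.
ALERTS_PER_ANALYST_PER_DAY = 50
WORKDAYS_PER_WEEK = 5


def staffing_verdict(weekly_alerts: int, analysts: int) -> str:
    """Compare weekly alert load to team capacity and name the band it falls in."""
    cap = analysts * ALERTS_PER_ANALYST_PER_DAY * WORKDAYS_PER_WEEK
    if cap <= 0:
        raise ValueError("analysts must be a positive count")
    used = weekly_alerts / cap
    if used > 1.0:
        label = "OVERLOADED"      # assumed name; must not contain "SUSTAINABLE"
    elif used > 0.8:
        label = "STRAINED"        # assumed name
    else:
        label = "SUSTAINABLE"
    return f"{label} ({weekly_alerts} vs cap {cap}; {used:.0%} utilized)"


print(staffing_verdict(960, 5))

# Expected output:
# SUSTAINABLE (960 vs cap 1250; 77% utilized)
```

Because soc_board_line tests `"SUSTAINABLE" in staff`, a band named UNSUSTAINABLE would wrongly read as healthy, so the other labels must avoid that substring.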

Chapter 38 — Capstone Security Program

# bluekit/program_dashboard.py  — Chapter 38 capstone (integrates the toolkit)
"""One-screen program health view, assembled from the modules built across the book.

Integrates: riskcalc (Ch.1/27 risk + ALE), metrics (Ch.36 MTTD/MTTR/coverage),
and a roadmap burndown. The capstone does in code what the chapter does in prose:
turn many components into one legible, board-ready summary.

Chapters 37-40 integrate rather than add a module; this is bluekit's capstone view.
All bluekit code is illustrative and hand-traced; nothing here is executed at authoring.

This file inlines the one dependency it needs (riskcalc.band, Ch.1) so it reads
standalone; in the assembled package it would `from bluekit.riskcalc import band`.
"""


def band(score: int) -> str:
    """Ch.1: map a 1-25 qualitative risk score to a board-legible action band."""
    if score >= 15:
        return "CRITICAL"   # fix now
    if score >= 8:
        return "HIGH"       # fix this quarter
    if score >= 4:
        return "MEDIUM"     # plan it
    return "LOW"            # accept or monitor


def program_dashboard(state: dict) -> str:
    """Render a program-on-a-page from integrated module outputs.

    state = {
      'risks':    [{'id','score'}],          # qualitative scores (Ch.1)
      'ale_now':  float, 'ale_target': float,# annualized loss $ (Ch.27)
      'coverage': float,                      # ATT&CK detection coverage 0-1 (Ch.36)
      'mttd_h':   float, 'mttr_h': float,     # mean time to detect/respond, hrs (Ch.36)
      'appetite': float,                      # board-set risk appetite, $ (Ch.27)
    }
    """
    crit = sum(1 for r in state["risks"] if band(r["score"]) == "CRITICAL")
    burned = state["ale_now"] - state["ale_target"]
    over_appetite = state["ale_target"] > state["appetite"]
    lines = [
        "MERIDIAN SECURITY PROGRAM — DASHBOARD",
        f"  Risks: {len(state['risks'])} tracked, {crit} CRITICAL",
        f"  Annualized risk: ${state['ale_now']/1e6:.1f}M -> "
        f"${state['ale_target']/1e6:.1f}M  (burning down ${burned/1e6:.1f}M)",
        f"  Detection coverage: {state['coverage']*100:.0f}%  |  "
        f"MTTD {state['mttd_h']:.0f}h  MTTR {state['mttr_h']:.0f}h",
        f"  Residual vs appetite: "
        f"{'OVER — board decision needed' if over_appetite else 'within appetite'}",
    ]
    return "\n".join(lines)


if __name__ == "__main__":
    meridian = {
        "risks": [{"id": "R1", "score": 20}, {"id": "R2", "score": 15},
                  {"id": "R3", "score": 9},  {"id": "R4", "score": 6}],
        "ale_now": 6_000_000.0, "ale_target": 900_000.0,
        "coverage": 0.72, "mttd_h": 4.0, "mttr_h": 12.0,
        "appetite": 1_000_000.0,
    }
    print(program_dashboard(meridian))

# Expected output:
# MERIDIAN SECURITY PROGRAM — DASHBOARD
#   Risks: 4 tracked, 2 CRITICAL
#   Annualized risk: $6.0M -> $0.9M  (burning down $5.1M)
#   Detection coverage: 72%  |  MTTD 4h  MTTR 12h
#   Residual vs appetite: within appetite

Chapter 39 — Cybersecurity Career

"""defender-checkpoint.py — Chapter 39 checkpoint

Chapter 39 adds NO bluekit module (career development is judgment, not code). This
checkpoint helper instead supports the chapter's deliverable: YOUR personal
development plan. It summarizes a five-part plan and flags whether it is complete and
honest — the same disciplined, incremental method you applied to Meridian's program,
now pointed at yourself.

Illustrative only; never executed during authoring — output is hand-traced. The plan
is "right" when it is honest and specific (a true gap named, a single next step), not
when it is impressive.
"""

REQUIRED_PARTS = ("neighborhood", "skills_gap", "cert_roadmap", "lab_portfolio", "ethics")


def plan_status(plan: dict) -> str:
    """Report which of the five development-plan parts are present."""
    missing = [p for p in REQUIRED_PARTS if not plan.get(p)]
    if missing:
        return "INCOMPLETE — add: " + ", ".join(missing)
    return "COMPLETE — five parts present"


def honesty_check(plan: dict) -> str:
    """A plan must name a real gap and defer something; 'everything' says nothing."""
    has_gap = "gap" in str(plan.get("skills_gap", "")).lower()
    defers = "not yet" in str(plan.get("cert_roadmap", "")).lower()
    if has_gap and defers:
        return "HONEST — names a gap and defers a premature cert"
    return "REVISIT — name a real gap and one cert you are NOT chasing yet"


if __name__ == "__main__":
    theo_plan = {
        "neighborhood": "Blue team -> detection engineering",
        "skills_gap":   "cloud_aws_logging = gap; detections = partial",
        "cert_roadmap": "next Security+; then CySA+; NOT YET CISSP",
        "lab_portfolio":"4-VM lab; one write-up/month",
        "ethics":       "own lab or written yes; unsure = no",
    }
    print(plan_status(theo_plan))
    print(honesty_check(theo_plan))

# Expected output:
# COMPLETE — five parts present
# HONEST — names a gap and defers a premature cert

Chapter 40 — Case Studies

# bluekit/defender_checkpoint.py  — Chapter 40 capstone (integration, no new module)
"""Final synthesis: map each landmark breach to the book's controls and the
chapter that owns each, then render a one-line readiness verdict for Meridian.

Draws on ideas from the toolkit you built (riskcalc Ch.1, purdue_zone Ch.33, the triage
pattern of ir.py Ch.24) without importing it: this is reflection-as-code, not a new module.
Illustrative and hand-traced; nothing here is executed during authoring.
"""

# Each breach -> (root-cause shape, highest-leverage control, owning chapter).
BREACH_LESSONS = {
    "SolarWinds":  ("trusted supply chain weaponized", "behavioral detection + segmentation", "10/22/29/31/32"),
    "Colonial":    ("one forgotten account, no MFA",    "MFA + identity governance + IT/OT seg", "16/18/24/33"),
    "Log4Shell":   ("invisible transitive dependency",  "SBOM + risk-based vuln management",     "12/23/29"),
}


def breach_lessons(name: str) -> str:
    """Return a one-line 'cause -> control (chapters)' mapping for a landmark breach."""
    cause, control, ch = BREACH_LESSONS[name]
    return f"{name}: {cause}  ->  {control}  (Ch.{ch})"


def meridian_ready(controls_in_place: set[str]) -> tuple[str, list[str]]:
    """Final posture check: do we have the highest-leverage control class for each case?"""
    needed = {"SolarWinds": "detection", "Colonial": "mfa", "Log4Shell": "sbom"}
    gaps = [b for b, c in needed.items() if c not in controls_in_place]
    verdict = "READY (residual risk remains — the work is never finished)" if not gaps \
        else f"GAPS in: {', '.join(gaps)}"
    return (verdict, gaps)


if __name__ == "__main__":
    for breach in BREACH_LESSONS:
        print(breach_lessons(breach))
    print(meridian_ready({"detection", "mfa", "sbom"}))

# Expected output:
# SolarWinds: trusted supply chain weaponized  ->  behavioral detection + segmentation  (Ch.10/22/29/31/32)
# Colonial: one forgotten account, no MFA  ->  MFA + identity governance + IT/OT seg  (Ch.16/18/24/33)
# Log4Shell: invisible transitive dependency  ->  SBOM + risk-based vuln management  (Ch.12/23/29)
# ('READY (residual risk remains — the work is never finished)', [])
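
The demo exercises only the READY path. The sketch below isolates the gap test to show the other path and one pitfall: membership is an exact, case-sensitive string match. `gaps_for` is a hypothetical helper that applies the same comparison as meridian_ready.

```python
NEEDED = {"SolarWinds": "detection", "Colonial": "mfa", "Log4Shell": "sbom"}


def gaps_for(controls_in_place: set[str]) -> list[str]:
    """Breaches whose highest-leverage control class is missing from the set."""
    return [breach for breach, control in NEEDED.items()
            if control not in controls_in_place]


print(gaps_for({"mfa"}))
print(gaps_for({"MFA", "detection", "sbom"}))   # "MFA" does not match "mfa"

# Expected output:
# ['SolarWinds', 'Log4Shell']
# ['Colonial']
```

Normalizing the input (for example with str.lower()) before the check would remove the second surprise.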