Appendix A: Python RegTech Reference

A practical coding reference for compliance professionals working with Python in RegTech contexts. This appendix collects the patterns, idioms, and snippets most relevant to the code examples in this textbook. It assumes you are comfortable writing Python and want quick, authoritative answers to RegTech-specific implementation questions — not a tutorial from first principles.

Requires: Python 3.10+. Optional dependencies noted per section.


A.1 Core Patterns

Dataclasses for Compliance Data Models

Dataclasses are the standard way to model compliance entities in this textbook. They provide typed fields, an automatic __repr__, and generated __init__ and __eq__ methods; passing slots=True to the decorator adds slot-based memory efficiency, useful when processing large regulatory datasets.

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from enum import Enum


class RiskTier(Enum):
    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"
    VERY_HIGH = "VERY_HIGH"
    PROHIBITED = "PROHIBITED"


class AlertStatus(Enum):
    OPEN = "OPEN"
    UNDER_REVIEW = "UNDER_REVIEW"
    ESCALATED = "ESCALATED"
    CLOSED_SAR = "CLOSED_SAR"
    CLOSED_NO_ACTION = "CLOSED_NO_ACTION"


@dataclass
class CustomerRecord:
    """Core KYC customer record."""
    customer_id: str
    full_name: str
    date_of_birth: str                        # ISO 8601: YYYY-MM-DD
    nationality: str                           # ISO 3166-1 alpha-2
    country_of_residence: str                  # ISO 3166-1 alpha-2
    risk_tier: RiskTier
    onboarded_at: datetime
    last_reviewed_at: Optional[datetime]       # None if never reviewed
    lei: Optional[str] = None                  # Legal Entity Identifier (corporate)
    is_pep: bool = False
    is_sanctioned: bool = False
    screening_reference: Optional[str] = None  # Link to sanctions check record
    tags: list[str] = field(default_factory=list)

    def requires_enhanced_due_diligence(self) -> bool:
        """Returns True if EDD is required under standard frameworks."""
        return (
            self.risk_tier in {RiskTier.HIGH, RiskTier.VERY_HIGH}
            or self.is_pep
        )

    def is_overdue_for_review(self, as_of: datetime) -> bool:
        """
        Checks if the customer is overdue for periodic review.
        Review frequency is risk-tier dependent.
        """
        if self.last_reviewed_at is None:
            return True

        review_intervals = {
            RiskTier.LOW: 365 * 3,    # 3 years
            RiskTier.MEDIUM: 365,     # 1 year
            RiskTier.HIGH: 180,       # 6 months
            RiskTier.VERY_HIGH: 90,   # 3 months
            RiskTier.PROHIBITED: 0,
        }
        days_since_review = (as_of - self.last_reviewed_at).days
        threshold = review_intervals[self.risk_tier]
        return days_since_review >= threshold


@dataclass
class TransactionRecord:
    """A single financial transaction for monitoring purposes."""
    transaction_id: str
    customer_id: str
    amount: float                              # Float for readability here; prefer decimal.Decimal in production
    currency: str                              # ISO 4217 code
    instrument: str                            # CASH | WIRE | ACH | CARD | CRYPTO
    tx_type: str                               # DEPOSIT | WITHDRAWAL | TRANSFER | PAYMENT
    timestamp: datetime
    destination_country: Optional[str]         # ISO 3166-1 alpha-2; None for domestic
    counterparty_id: Optional[str]
    reference: Optional[str]
    amount_usd: Optional[float] = None         # FX-converted amount; populated by pipeline
    # Computed velocity fields — populated by feature engineering pipeline
    velocity_24h: int = 0
    velocity_7d: int = 0
    amount_24h_usd: float = 0.0
    amount_30d_usd: float = 0.0


@dataclass
class ComplianceAlert:
    """A compliance alert generated by a monitoring system."""
    alert_id: str
    customer_id: str
    transaction_id: Optional[str]
    alert_type: str                            # AML | SANCTIONS | KYC_OVERDUE | etc.
    risk_score: float                          # 0.0–1.0
    status: AlertStatus
    generated_at: datetime
    assigned_to: Optional[str]                 # Analyst ID
    rule_ids: list[str] = field(default_factory=list)
    model_id: Optional[str] = None             # ML model that generated this
    narrative: str = ""
    closed_at: Optional[datetime] = None
    sar_filed: bool = False
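
A usage sketch of the review logic above (the class is condensed to a minimal stand-in so the snippet runs on its own; MiniCustomer and its field subset are illustrative, not part of the textbook's models):

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class RiskTier(Enum):
    LOW = "LOW"
    HIGH = "HIGH"


@dataclass
class MiniCustomer:
    # Condensed stand-in for CustomerRecord, for illustration only
    risk_tier: RiskTier
    is_pep: bool
    last_reviewed_at: Optional[datetime]

    def requires_edd(self) -> bool:
        return self.risk_tier is RiskTier.HIGH or self.is_pep

    def is_overdue(self, as_of: datetime, interval_days: int) -> bool:
        if self.last_reviewed_at is None:
            return True
        return (as_of - self.last_reviewed_at).days >= interval_days


now = datetime(2024, 6, 1, tzinfo=timezone.utc)
cust = MiniCustomer(
    risk_tier=RiskTier.HIGH,
    is_pep=False,
    last_reviewed_at=datetime(2023, 11, 1, tzinfo=timezone.utc),
)
print(cust.requires_edd())          # True — HIGH tier triggers EDD
print(cust.is_overdue(now, 180))    # True — 213 days since last review
```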

Enums for Regulatory Categories

Enums prevent string-comparison errors — one of the most common sources of bugs in regulatory reporting pipelines.

from enum import Enum


class InstrumentType(Enum):
    CASH = "CASH"
    WIRE = "WIRE"
    ACH = "ACH"
    CARD = "CARD"
    CRYPTO = "CRYPTO"
    CHECK = "CHECK"
    INTERNAL = "INTERNAL"


class SanctionsListType(Enum):
    OFAC_SDN = "OFAC_SDN"
    OFAC_CONS = "OFAC_CONS"
    EU_CONSOLIDATED = "EU_CONSOLIDATED"
    UN_CONSOLIDATED = "UN_CONSOLIDATED"
    HMT_UK = "HMT_UK"
    CUSTOM = "CUSTOM"


class DocumentType(Enum):
    PASSPORT = "PASSPORT"
    NATIONAL_ID = "NATIONAL_ID"
    DRIVERS_LICENSE = "DRIVERS_LICENSE"
    RESIDENCE_PERMIT = "RESIDENCE_PERMIT"
    UTILITY_BILL = "UTILITY_BILL"           # Proof of address
    COMPANY_REG = "COMPANY_REG"
    ARTICLES_OF_INCORP = "ARTICLES_OF_INCORP"


class FilingStatus(Enum):
    """Status of a regulatory filing (SAR, CTR, STR, etc.)."""
    DRAFT = "DRAFT"
    PENDING_APPROVAL = "PENDING_APPROVAL"
    APPROVED = "APPROVED"
    FILED = "FILED"
    ACKNOWLEDGED = "ACKNOWLEDGED"
    REJECTED = "REJECTED"
    AMENDED = "AMENDED"


# Pattern: use match/case with enums for clean dispatch
def get_review_period_days(tier: RiskTier) -> int:
    match tier:
        case RiskTier.LOW:
            return 1095   # 3 years
        case RiskTier.MEDIUM:
            return 365
        case RiskTier.HIGH:
            return 180
        case RiskTier.VERY_HIGH:
            return 90
        case RiskTier.PROHIBITED:
            return 0
        case _:
            raise ValueError(f"Unknown risk tier: {tier}")
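A companion pattern: at ingestion boundaries, raw strings should be converted to enums explicitly rather than compared as text. A small helper sketch (parse_risk_tier is our own name, and the enum is condensed so the snippet runs standalone):

```python
from enum import Enum
from typing import Optional


class RiskTier(Enum):
    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"


def parse_risk_tier(raw: str) -> Optional[RiskTier]:
    """Parse a raw string into a RiskTier; return None for unknown values."""
    try:
        return RiskTier(raw.strip().upper())   # Lookup by enum value
    except ValueError:
        return None


print(parse_risk_tier(" high "))   # RiskTier.HIGH
print(parse_risk_tier("INVALID"))  # None
```

Returning None (rather than raising) lets the caller decide whether an unknown value is a data-quality error or a new category to onboard.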

Type Hints and Optional Fields

Regulatory data frequently has fields that are required for some customer types and absent for others. Handle this explicitly — do not treat missing and unknown the same way.

from typing import Optional, Literal
from dataclasses import dataclass


# PATTERN: distinguish "not provided" from "confirmed absent"
# Using Optional[str] means we don't know
# Using "" (empty string) is ambiguous — avoid it for regulatory fields
# Using a sentinel value makes the state explicit

NOT_COLLECTED = "NOT_COLLECTED"
CONFIRMED_NONE = "CONFIRMED_NONE"


@dataclass
class BeneficialOwnerRecord:
    """
    Beneficial owner for corporate KYC.

    Fields that are Optional may be None because:
    (a) the data was not collected yet, or
    (b) the field genuinely does not apply to this entity type.
    Use ownership_pct = None for individuals, NOT 0.0 (which implies 0% ownership).
    """
    owner_id: str
    full_name: str
    nationality: str
    ownership_pct: Optional[float]              # None for control persons without equity
    is_control_person: bool
    date_of_birth: Optional[str]                # None if not yet collected
    document_type: Optional[DocumentType]
    document_number: Optional[str]
    document_expiry: Optional[str]
    verification_status: Literal[
        "NOT_STARTED", "IN_PROGRESS", "VERIFIED", "FAILED", "EXPIRED"
    ]


# PATTERN: typed helper for loading from raw dicts with safe defaults
def parse_optional_float(value: object) -> Optional[float]:
    """
    Convert a raw value to Optional[float].
    Returns None for None, empty strings, and null-like sentinel strings
    ("null", "none", "n/a", "na"). Raises ValueError for other unparseable
    strings and TypeError for unsupported input types.
    """
    if value is None:
        return None
    if isinstance(value, float | int):
        return float(value)
    if isinstance(value, str):
        stripped = value.strip()
        if stripped == "" or stripped.lower() in {"null", "none", "n/a", "na"}:
            return None
        return float(stripped)      # Deliberately raises ValueError for bad data
    raise TypeError(f"Cannot parse {type(value).__name__} as float")


# Usage
raw_ownership = {"pct": "24.5"}
pct = parse_optional_float(raw_ownership.get("pct"))  # → 24.5

raw_missing = {"pct": None}
pct = parse_optional_float(raw_missing.get("pct"))    # → None

A.2 Data Quality and Validation

Validating Regulatory Identifiers

The LEI is a 20-character alphanumeric code. Characters 19–20 are a MOD 97-10 checksum (same algorithm as IBAN).

import re


def validate_lei(lei: str) -> bool:
    """
    Validate a Legal Entity Identifier (LEI) per ISO 17442.

    Structure:
      Characters 1-4:   LOU (Local Operating Unit) prefix
      Characters 5-18:  Entity-specific code
      Characters 19-20: Two-digit MOD 97-10 checksum

    Returns True if the LEI is structurally valid.
    This does NOT confirm the LEI is registered — use the GLEIF API for that.
    """
    if not isinstance(lei, str):
        return False

    lei = lei.upper().strip()

    # Must be exactly 20 alphanumeric characters
    if not re.fullmatch(r'[A-Z0-9]{18}[0-9]{2}', lei):
        return False

    # MOD 97-10 checksum (ISO 7064)
    # Convert letters to digits: A=10, B=11, ..., Z=35
    numeric = ""
    for ch in lei:
        if ch.isdigit():
            numeric += ch
        else:
            numeric += str(ord(ch) - ord('A') + 10)

    return int(numeric) % 97 == 1


# Examples (synthetic values with check digits computed per ISO 7064; not registered LEIs)
assert validate_lei("ABCDEF0123456789AB85") is True    # Valid checksum
assert validate_lei("ABCDEF0123456789AB84") is False   # Bad checksum
assert validate_lei("INVALID") is False
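
Going the other way (computing the two check digits for an 18-character base) follows from the same ISO 7064 definition: append "00", take the remainder mod 97, and subtract from 98. This helper is a sketch of that rule, not an official API:

```python
def compute_lei_check_digits(base18: str) -> str:
    """
    Compute the two MOD 97-10 check digits for an 18-character LEI base.
    The returned digits make the full 20-character string validate.
    """
    if len(base18) != 18 or not base18.isalnum():
        raise ValueError("LEI base must be 18 alphanumeric characters")
    numeric = ""
    for ch in base18.upper() + "00":            # "00" = placeholder check digits
        numeric += ch if ch.isdigit() else str(ord(ch) - ord('A') + 10)
    return f"{98 - int(numeric) % 97:02d}"      # Zero-padded to two digits


# A synthetic base, not a registered LEI
print(compute_lei_check_digits("ABCDEF0123456789AB"))  # 85
```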


def validate_isin(isin: str) -> bool:
    """
    Validate an ISIN (International Securities Identification Number) per ISO 6166.

    Structure:
      Characters 1-2:   Country code (ISO 3166-1 alpha-2)
      Characters 3-11:  National security identifier (9 chars)
      Character 12:     Luhn check digit

    Returns True if structurally valid.
    """
    if not isinstance(isin, str):
        return False

    isin = isin.upper().strip()

    if not re.fullmatch(r'[A-Z]{2}[A-Z0-9]{9}[0-9]', isin):
        return False

    # Luhn algorithm with letter expansion
    # Convert each letter to two digits: A=10, B=11, ..., Z=35
    expanded = ""
    for ch in isin[:-1]:            # Exclude check digit
        if ch.isdigit():
            expanded += ch
        else:
            expanded += str(ord(ch) - ord('A') + 10)
    expanded += isin[-1]            # Add check digit back

    total = 0
    for i, digit in enumerate(reversed(expanded)):
        n = int(digit)
        if i % 2 == 1:              # Every second digit from right, doubled
            n *= 2
            if n > 9:
                n -= 9
        total += n

    return total % 10 == 0
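
For a concrete check-digit walkthrough, Apple's widely published ISIN works as a test case. The Luhn logic is restated compactly below so the snippet runs standalone (expanding the check digit along with the other characters is equivalent, since digits map to themselves):

```python
import re


def isin_checksum_ok(isin: str) -> bool:
    """Compact restatement of the ISIN structure + Luhn check."""
    if not re.fullmatch(r'[A-Z]{2}[A-Z0-9]{9}[0-9]', isin):
        return False
    # Expand letters to two digits each: A=10, ..., Z=35
    expanded = "".join(
        ch if ch.isdigit() else str(ord(ch) - ord('A') + 10) for ch in isin
    )
    total = 0
    for i, d in enumerate(reversed(expanded)):
        n = int(d)
        if i % 2 == 1:                  # Double every second digit from right
            n = n * 2 - 9 if n * 2 > 9 else n * 2
        total += n
    return total % 10 == 0


print(isin_checksum_ok("US0378331005"))  # Apple Inc. — True
print(isin_checksum_ok("US0378331006"))  # Corrupted check digit — False
```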


def validate_bic(bic: str) -> bool:
    """
    Validate a BIC/SWIFT code per ISO 9362.

    Structure:
      4 chars: Business party prefix (alpha here; the 2022 revision of the
               standard also permits digits, so relax to [A-Z0-9]{4} if needed)
      2 chars: Country code (alpha, ISO 3166-1)
      2 chars: Location code (alphanumeric)
      3 chars: Branch code (optional; 'XXX' = primary office)
    """
    if not isinstance(bic, str):
        return False
    bic = bic.upper().strip()
    return bool(re.fullmatch(r'[A-Z]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?', bic))


def validate_mic(mic: str) -> bool:
    """
    Validate a Market Identifier Code per ISO 10383.
    A MIC is exactly 4 uppercase alphanumeric characters (e.g. XLON, 360T).
    """
    if not isinstance(mic, str):
        return False
    return bool(re.fullmatch(r'[A-Z0-9]{4}', mic.upper().strip()))
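
The same patterns in action. DEUTDEFF is Deutsche Bank's widely published head-office BIC and XLON the London Stock Exchange MIC; the regexes are repeated here so the snippet is standalone, with the MIC pattern written as alphanumeric to admit codes such as 360T:

```python
import re

BIC_RE = re.compile(r'[A-Z]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?')
MIC_RE = re.compile(r'[A-Z0-9]{4}')

print(bool(BIC_RE.fullmatch("DEUTDEFF")))     # 8-char BIC → True
print(bool(BIC_RE.fullmatch("DEUTDEFF500")))  # 11-char with branch code → True
print(bool(BIC_RE.fullmatch("DEUT")))         # Too short → False
print(bool(MIC_RE.fullmatch("XLON")))         # London Stock Exchange → True
```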

Date and Time in Regulatory Contexts

Regulatory systems must be precise about time zones. A transaction timestamped "2024-03-15 23:58:00" means different things in Tokyo, London, and New York. Always store and process datetimes as timezone-aware.

from datetime import datetime, timezone, timedelta, date
from zoneinfo import ZoneInfo
import re


# RULE: always store timestamps in UTC; convert to local time only for display
def utc_now() -> datetime:
    """Returns the current UTC datetime, timezone-aware."""
    return datetime.now(tz=timezone.utc)


def to_utc(dt: datetime, source_tz: str) -> datetime:
    """
    Convert a naive datetime from a known timezone to UTC.
    source_tz: IANA timezone string, e.g. "America/New_York", "Europe/London"
    """
    if dt.tzinfo is not None:
        raise ValueError("datetime is already timezone-aware; use astimezone(timezone.utc)")
    local_tz = ZoneInfo(source_tz)
    localized = dt.replace(tzinfo=local_tz)
    return localized.astimezone(timezone.utc)


def parse_iso8601(s: str) -> datetime:
    """
    Parse an ISO 8601 datetime string to a timezone-aware datetime.
    Accepts: '2024-03-15T14:30:00Z', '2024-03-15T14:30:00+00:00', '2024-03-15'
    """
    s = s.strip()
    # Date only → midnight UTC
    if re.fullmatch(r'\d{4}-\d{2}-\d{2}', s):
        return datetime.fromisoformat(s).replace(tzinfo=timezone.utc)
    # With trailing Z
    if s.endswith('Z'):
        s = s[:-1] + '+00:00'
    dt = datetime.fromisoformat(s)
    if dt.tzinfo is None:
        raise ValueError(f"Ambiguous timestamp (no timezone): {s!r}")
    return dt
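
The strictness of parse_iso8601 about naive timestamps is deliberate: Python refuses even to compare naive and aware datetimes, so a single naive value can crash a monitoring query at runtime:

```python
from datetime import datetime, timezone

aware = datetime(2024, 3, 15, 14, 30, tzinfo=timezone.utc)
naive = datetime(2024, 3, 15, 14, 30)

try:
    _ = naive < aware                  # Mixing naive and aware raises
except TypeError as exc:
    print(f"Comparison failed: {exc}")
```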


def calculate_regulatory_deadline(
    base_date: date,
    calendar_days: int,
    exclude_weekends: bool = False,
    reference: str = ""
) -> date:
    """
    Calculate a regulatory deadline from a base date.

    Most regulatory deadlines are expressed in calendar days (not business days),
    but some (e.g., certain MiFID II reporting windows) exclude weekends.
    Always confirm with the specific regulation.

    Args:
        base_date:        The event date the clock starts from.
        calendar_days:    Number of days allowed.
        exclude_weekends: If True, skip Saturdays and Sundays.
        reference:        Regulatory citation (for audit trail; not used in calc).

    Returns: The deadline date.
    """
    if not exclude_weekends:
        return base_date + timedelta(days=calendar_days)

    current = base_date
    added = 0
    while added < calendar_days:
        current += timedelta(days=1)
        if current.weekday() < 5:    # Monday=0, Friday=4
            added += 1
    return current


# Common regulatory deadlines
def sar_filing_deadline(detection_date: date) -> date:
    """US BSA/FinCEN: SAR must be filed within 30 calendar days of detection."""
    return calculate_regulatory_deadline(detection_date, 30)


def ctr_filing_deadline(transaction_date: date) -> date:
    """US CTR: must be filed within 15 calendar days of the transaction."""
    return calculate_regulatory_deadline(transaction_date, 15)


def mifid_trade_report_deadline(trade_date: date) -> date:
    """MiFID II: trade reports due by end of next business day (T+1)."""
    return calculate_regulatory_deadline(trade_date, 1, exclude_weekends=True)

Missing Data Patterns

Catch missing data at the ingestion boundary rather than discovering it downstream in a filing. The patterns below validate field presence per record and audit missingness across a DataFrame.

import pandas as pd


# PATTERN: validate required vs optional fields at ingestion boundary

def validate_customer_fields(record: dict) -> tuple[bool, list[str]]:
    """
    Validate a raw customer record at the ingestion boundary.
    Returns (is_valid, list_of_errors).

    Required fields must be present and non-empty.
    Optional fields may be None but must be the correct type if present.
    """
    errors: list[str] = []

    REQUIRED_FIELDS = {
        "customer_id": str,
        "full_name": str,
        "date_of_birth": str,
        "nationality": str,
        "country_of_residence": str,
    }

    OPTIONAL_FIELDS = {
        "lei": str,
        "tax_id": str,
        "phone": str,
        "email": str,
    }

    for field, expected_type in REQUIRED_FIELDS.items():
        value = record.get(field)
        if value is None or (isinstance(value, str) and not value.strip()):
            errors.append(f"Required field missing or empty: {field!r}")
        elif not isinstance(value, expected_type):
            errors.append(
                f"Field {field!r}: expected {expected_type.__name__}, "
                f"got {type(value).__name__}"
            )

    for field, expected_type in OPTIONAL_FIELDS.items():
        value = record.get(field)
        if value is not None and not isinstance(value, expected_type):
            errors.append(
                f"Optional field {field!r}: expected {expected_type.__name__} or None, "
                f"got {type(value).__name__}"
            )

    return len(errors) == 0, errors


# PATTERN: pandas — tracking missingness in regulatory DataFrames
def audit_missing_data(df: pd.DataFrame, required_cols: list[str]) -> pd.DataFrame:
    """
    Produce a missingness audit report for a regulatory DataFrame.
    Returns a DataFrame with one row per column showing counts and percentages.

    Dependencies: pandas
    """
    report_rows = []
    total = len(df)

    for col in df.columns:
        null_count = df[col].isna().sum()
        empty_count = (df[col] == "").sum() if df[col].dtype == object else 0
        missing = null_count + empty_count

        report_rows.append({
            "column": col,
            "is_required": col in required_cols,
            "total_rows": total,
            "null_count": int(null_count),
            "empty_count": int(empty_count),
            "missing_total": int(missing),
            "missing_pct": round(missing / total * 100, 2) if total > 0 else 0.0,
        })

    report = pd.DataFrame(report_rows)
    # Flag required fields with any missingness as a compliance concern
    report["requires_attention"] = (
        report["is_required"] & (report["missing_total"] > 0)
    )
    return report.sort_values(["requires_attention", "missing_pct"],
                               ascending=[False, False])

A.3 Financial Crime Patterns

Transaction Feature Engineering

Dependencies: pandas, numpy

import pandas as pd
import numpy as np
from datetime import timedelta


def compute_velocity_features(
    transactions: pd.DataFrame,
    customer_id_col: str = "customer_id",
    amount_col: str = "amount_usd",
    timestamp_col: str = "timestamp",
    windows: list[int] | None = None,
) -> pd.DataFrame:
    """
    Compute rolling velocity features for AML transaction monitoring.

    For each transaction, calculates the count and total amount of the same
    customer's transactions within lookback windows ending at (but not including)
    the current transaction.

    Args:
        transactions: DataFrame sorted by timestamp (oldest first).
        windows:      Lookback windows in hours. Default: [24, 168, 720]
                      (1 day, 7 days, 30 days).

    Returns: transactions DataFrame with added velocity columns.

    Note: This implementation scans each customer's prior transactions in a
    Python loop — simple but O(n^2) per customer. For large DataFrames consider
    a merge-asof or rolling-window approach; for real-time scoring, use a
    rolling window cache (Redis, DynamoDB) instead.
    """
    if windows is None:
        windows = [24, 168, 720]

    df = transactions.copy()
    df[timestamp_col] = pd.to_datetime(df[timestamp_col], utc=True)
    df = df.sort_values([customer_id_col, timestamp_col]).reset_index(drop=True)

    for window_hours in windows:
        window_td = pd.Timedelta(hours=window_hours)
        col_count = f"velocity_{window_hours}h_count"
        col_amount = f"velocity_{window_hours}h_amount"

        counts = []
        amounts = []

        # Group by customer for efficiency
        for _, group in df.groupby(customer_id_col, sort=False):
            group_counts = []
            group_amounts = []

            for i in range(len(group)):
                ts = group.iloc[i][timestamp_col]
                cutoff = ts - window_td
                # Prior transactions within window (exclude current)
                prior_mask = (group[timestamp_col] >= cutoff) & (group[timestamp_col] < ts)
                group_counts.append(int(prior_mask.sum()))
                group_amounts.append(float(group.loc[prior_mask, amount_col].sum()))

            counts.extend(group_counts)
            amounts.extend(group_amounts)

        df[col_count] = counts
        df[col_amount] = amounts

    return df


def compute_amount_percentile(
    transactions: pd.DataFrame,
    customer_id_col: str = "customer_id",
    amount_col: str = "amount_usd",
    history_df: pd.DataFrame | None = None,
) -> pd.DataFrame:
    """
    Score each transaction's amount against that customer's historical distribution.
    Returns a percentile score from 0.0 (lowest) to 1.0 (highest in history).

    If history_df is provided, percentile is computed against historical data
    (not the current batch) — important for avoiding look-ahead bias.
    """
    df = transactions.copy()
    base = history_df if history_df is not None else df

    def pct_rank(group_df: pd.DataFrame, ref_df: pd.DataFrame) -> pd.Series:
        cid = group_df[customer_id_col].iloc[0]
        ref_amounts = ref_df[ref_df[customer_id_col] == cid][amount_col]
        if len(ref_amounts) < 5:
            return pd.Series([np.nan] * len(group_df), index=group_df.index)
        return group_df[amount_col].apply(
            lambda x: float((ref_amounts < x).mean())
        )

    df["amount_percentile"] = (
        df.groupby(customer_id_col, group_keys=False)
          .apply(lambda g: pct_rank(g, base))
    )
    return df

Scoring and Thresholding

from dataclasses import dataclass, field
from typing import Callable
import logging

logger = logging.getLogger(__name__)


@dataclass
class ScoringRule:
    """A single rule contributing to an aggregate risk score."""
    rule_id: str
    description: str
    weight: float                              # Contribution weight (0.0–1.0)
    evaluator: Callable[..., float]            # Returns 0.0–1.0
    category: str                              # VELOCITY | GEOGRAPHY | AMOUNT | BEHAVIOR


@dataclass
class ScoringConfig:
    """
    Threshold configuration for a scoring system.
    Threshold values should be reviewed and approved by compliance leadership.
    Changes should be documented in the model governance log.
    """
    alert_threshold: float = 0.65             # Score at or above this → alert
    sar_threshold: float = 0.85               # Score at or above this → SAR review
    suppression_threshold: float = 0.30       # Score below this → suppress alert
    version: str = "1.0"
    approved_by: str = ""
    approved_at: str = ""


class RiskScorer:
    """
    Configurable risk scoring engine for compliance monitoring.

    Design principles:
    - Weights must sum to 1.0 across all active rules.
    - Scores are normalized to [0.0, 1.0].
    - All score components are logged for audit purposes.
    """

    def __init__(self, rules: list[ScoringRule], config: ScoringConfig):
        self.rules = rules
        self.config = config
        self._validate_weights()

    def _validate_weights(self) -> None:
        total = sum(r.weight for r in self.rules)
        if not (0.999 <= total <= 1.001):
            raise ValueError(
                f"Rule weights must sum to 1.0; got {total:.4f}. "
                f"Adjust weights before deploying."
            )

    def score(self, record: dict) -> dict:
        """
        Score a transaction or customer record.

        Returns a dict with:
          - 'score': float (0.0–1.0)
          - 'components': list of (rule_id, raw_score, weighted_contribution)
          - 'recommendation': str
          - 'threshold_version': str
        """
        components = []
        total_score = 0.0

        for rule in self.rules:
            try:
                raw = float(rule.evaluator(record))
                raw = max(0.0, min(1.0, raw))    # Clamp to [0, 1]
                contribution = raw * rule.weight
                total_score += contribution
                components.append({
                    "rule_id": rule.rule_id,
                    "description": rule.description,
                    "raw_score": round(raw, 4),
                    "weight": rule.weight,
                    "contribution": round(contribution, 4),
                    "category": rule.category,
                })
            except Exception as exc:
                logger.error(
                    "Rule %s failed on record %s: %s",
                    rule.rule_id,
                    record.get("transaction_id", "UNKNOWN"),
                    exc,
                )
                # Fail-safe: treat failed rule as zero contribution
                components.append({
                    "rule_id": rule.rule_id,
                    "description": rule.description,
                    "raw_score": 0.0,
                    "weight": rule.weight,
                    "contribution": 0.0,
                    "category": rule.category,
                    "error": str(exc),
                })

        total_score = round(total_score, 4)

        if total_score >= self.config.sar_threshold:
            recommendation = "SAR_REVIEW"
        elif total_score >= self.config.alert_threshold:
            recommendation = "ALERT"
        elif total_score <= self.config.suppression_threshold:
            recommendation = "SUPPRESS"
        else:
            recommendation = "QUEUE"

        return {
            "score": total_score,
            "components": components,
            "recommendation": recommendation,
            "threshold_version": self.config.version,
        }
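
Boiled down, the engine is a clamped weighted sum checked against thresholds. A condensed standalone sketch (rule ids, weights, divisors, and country codes are illustrative values, not recommendations):

```python
from typing import Callable

# (rule_id, weight, evaluator); weights sum to 1.0, evaluators return [0, 1]
rules: list[tuple[str, float, Callable[[dict], float]]] = [
    ("R1_VELOCITY", 0.5, lambda r: min(r["velocity_24h"] / 10, 1.0)),
    ("R2_AMOUNT",   0.3, lambda r: min(r["amount_usd"] / 50_000, 1.0)),
    ("R3_GEO",      0.2, lambda r: 1.0 if r["destination_country"] in {"IR", "KP"} else 0.0),
]


def score(record: dict) -> float:
    total = 0.0
    for _, weight, evaluator in rules:
        raw = max(0.0, min(1.0, float(evaluator(record))))  # Clamp to [0, 1]
        total += raw * weight
    return round(total, 4)


tx = {"velocity_24h": 8, "amount_usd": 25_000, "destination_country": "GB"}
print(score(tx))  # 0.5 * 0.8 + 0.3 * 0.5 + 0.2 * 0.0 = 0.55
```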

Alert Deduplication

Financial institutions often run multiple monitoring systems that may generate overlapping alerts for the same underlying activity.

from dataclasses import dataclass
from datetime import datetime, timedelta
import hashlib
import json


@dataclass
class AlertSignature:
    """Key fields used to identify duplicate alerts."""
    customer_id: str
    alert_type: str
    primary_transaction_id: str | None


def compute_alert_fingerprint(
    customer_id: str,
    alert_type: str,
    transaction_id: str | None,
    amount_bucket: str,            # Bucketed, not exact, to catch near-duplicates
) -> str:
    """
    Compute a deterministic fingerprint for duplicate detection.
    Two alerts with the same fingerprint are candidates for deduplication.
    """
    payload = {
        "customer_id": customer_id,
        "alert_type": alert_type,
        "transaction_id": transaction_id or "",
        "amount_bucket": amount_bucket,
    }
    canonical = json.dumps(payload, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()[:16]


def bucket_amount(amount_usd: float) -> str:
    """
    Bucket a dollar amount for fingerprinting purposes.
    This allows near-duplicate detection across amounts that differ by rounding.
    """
    # Round to nearest $500 bucket
    bucket = round(amount_usd / 500) * 500
    return f"${bucket:,.0f}"


def deduplicate_alerts(
    alerts: list[dict],
    lookback_hours: int = 24,
    as_of: datetime | None = None,
) -> tuple[list[dict], list[dict]]:
    """
    Identify duplicate alerts across monitoring systems.

    Deduplication logic:
    - Same customer + alert type + transaction ID within lookback window → duplicate
    - Where transaction ID is missing, also match on amount bucket

    Returns: (unique_alerts, duplicate_alerts)

    Note: This is a simple in-memory deduplicator for small batches.
    Production systems use a distributed cache (Redis) or database index.
    """
    as_of = as_of or datetime.now().astimezone()   # Timezone-aware current time
    cutoff = as_of - timedelta(hours=lookback_hours)

    seen_fingerprints: dict[str, str] = {}  # fingerprint → first alert_id
    unique: list[dict] = []
    duplicates: list[dict] = []

    for alert in sorted(alerts, key=lambda a: a["generated_at"]):
        generated_at = alert["generated_at"]
        if isinstance(generated_at, str):
            generated_at = datetime.fromisoformat(generated_at)

        if generated_at < cutoff:
            unique.append(alert)   # Outside window — treat as unique
            continue

        fp = compute_alert_fingerprint(
            customer_id=alert["customer_id"],
            alert_type=alert["alert_type"],
            transaction_id=alert.get("transaction_id"),
            amount_bucket=bucket_amount(alert.get("amount_usd", 0.0)),
        )

        if fp in seen_fingerprints:
            alert["duplicate_of"] = seen_fingerprints[fp]
            duplicates.append(alert)
        else:
            seen_fingerprints[fp] = alert["alert_id"]
            unique.append(alert)

    return unique, duplicates
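
A quick demonstration of why bucketed amounts catch near-duplicates that exact amounts would miss (helpers repeated in condensed form so the snippet runs standalone):

```python
import hashlib
import json


def bucket_amount(amount_usd: float) -> str:
    # Round to the nearest $500 bucket (same logic as above)
    return f"${round(amount_usd / 500) * 500:,.0f}"


def fingerprint(customer_id: str, alert_type: str, amount_bucket: str) -> str:
    payload = {"customer_id": customer_id, "alert_type": alert_type,
               "amount_bucket": amount_bucket}
    return hashlib.sha256(
        json.dumps(payload, sort_keys=True).encode()
    ).hexdigest()[:16]


# Two systems report the same activity with slightly different amounts
a = fingerprint("C-1001", "AML", bucket_amount(12_480.00))
b = fingerprint("C-1001", "AML", bucket_amount(12_520.00))
print(bucket_amount(12_480.00))  # $12,500
print(a == b)                    # True: both land in the same bucket
```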

A.4 Regulatory Reporting

Data Transformation Patterns

Dependencies: pandas

import pandas as pd
from datetime import datetime


# Field mapping: internal name → regulatory submission name
MIFID_FIELD_MAP = {
    "transaction_id":     "TransactionIdentification",
    "timestamp":          "ExecutionTimestamp",
    "instrument_isin":    "ISIN",
    "amount":             "Quantity",
    "price":              "Price",
    "currency":           "CurrencyCode",
    "trader_id":          "TraderId",
    "venue_mic":          "TradingVenue",
    "counterparty_lei":   "CounterpartyLEI",
    "buy_sell":           "BuySellIndicator",
}

BSA_CTR_FIELD_MAP = {
    "transaction_id":          "TRAN_ID",
    "customer_id":             "PERSON_ID",
    "full_name":               "PERSON_NAME",
    "amount":                  "CASH_AMT",
    "instrument":              "TRAN_TYPE",
    "timestamp":               "TRAN_DATE",
    "account_number":          "ACCT_NUM",
    "branch_id":               "BRANCH_NUM",
}


def apply_field_mapping(
    df: pd.DataFrame,
    field_map: dict[str, str],
    drop_unmapped: bool = True,
) -> pd.DataFrame:
    """
    Rename DataFrame columns for regulatory submission.

    Args:
        drop_unmapped: If True (default), drop columns not in the mapping.
                       Set False to retain all columns (useful for debugging).
    """
    # Only rename columns that exist in the DataFrame
    actual_map = {k: v for k, v in field_map.items() if k in df.columns}
    result = df.rename(columns=actual_map)

    if drop_unmapped:
        result = result[list(actual_map.values())]

    return result
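
A usage sketch with a toy DataFrame (requires pandas; the mapping dict is a two-field subset of MIFID_FIELD_MAP, and the rename-then-select line restates what apply_field_mapping does):

```python
import pandas as pd

# Two-field subset of the MiFID map, for illustration
field_map = {"transaction_id": "TransactionIdentification", "price": "Price"}

df = pd.DataFrame({
    "transaction_id": ["T1", "T2"],
    "price": [101.5, 99.25],
    "internal_note": ["ok", "ok"],     # Not in the map: dropped on submission
})

actual_map = {k: v for k, v in field_map.items() if k in df.columns}
result = df.rename(columns=actual_map)[list(actual_map.values())]
print(list(result.columns))
```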


def coerce_regulatory_types(df: pd.DataFrame, schema: dict[str, str]) -> pd.DataFrame:
    """
    Coerce DataFrame columns to types required by a regulatory schema.

    schema: dict mapping column name → type spec
      Type specs: 'str', 'float', 'int', 'date:YYYY-MM-DD', 'datetime:ISO8601'

    Returns a copy of df with types coerced.
    Raises ValueError if a required column is missing.
    """
    df = df.copy()

    for col, type_spec in schema.items():
        if col not in df.columns:
            raise ValueError(f"Required column missing from DataFrame: {col!r}")

        if type_spec == "str":
            df[col] = df[col].astype(str).str.strip()
        elif type_spec == "float":
            df[col] = pd.to_numeric(df[col], errors="coerce")
        elif type_spec == "int":
            df[col] = pd.to_numeric(df[col], errors="coerce").astype("Int64")
        elif type_spec.startswith("date:"):
            fmt = type_spec.split(":", 1)[1]
            df[col] = pd.to_datetime(df[col]).dt.strftime(fmt)
        elif type_spec == "datetime:ISO8601":
            df[col] = pd.to_datetime(df[col], utc=True).dt.strftime(
                "%Y-%m-%dT%H:%M:%SZ"
            )

    return df

XBRL Basic Output

XBRL (eXtensible Business Reporting Language) is used for structured financial regulatory reporting, including COREP/FINREP under EU CRD and Solvency II reporting. The example below generates a minimal XBRL instance-document stub (inline XBRL, which embeds facts in XHTML, is a separate format).

from xml.etree.ElementTree import Element, SubElement, tostring, indent
import xml.etree.ElementTree as ET
from datetime import date


# XBRL namespaces (simplified — full submissions require taxonomy imports)
XBRL_NAMESPACES = {
    "xbrli":  "http://www.xbrl.org/2003/instance",
    "link":   "http://www.xbrl.org/2003/linkbase",
    "xlink":  "http://www.w3.org/1999/xlink",
    "iso4217": "http://www.xbrl.org/2003/iso4217",
    "corep":  "http://www.eba.europa.eu/xbrl/crr/dict/met",
}


def generate_xbrl_report(
    entity_lei: str,
    reporting_date: date,
    facts: dict[str, tuple[float, str]],       # metric → (value, unit)
    period_start: date | None = None,
) -> str:
    """
    Generate a minimal XBRL instance document for a regulatory report.

    Args:
        entity_lei:     Reporting entity's LEI.
        reporting_date: The period end date.
        facts:          Dict mapping XBRL concept name to (value, ISO 4217 currency).
                        Example: {"corep:OwnFunds": (5_000_000.0, "EUR")}
        period_start:   Period start date for duration contexts (optional).

    Returns: XBRL XML as a string.

    NOTE: This is a structural template. Actual regulatory submissions require
    taxonomy-specific element names, dimensional coordinates, and schema imports
    that vary by jurisdiction and report type. Always validate against the
    relevant taxonomy before submission.
    """
    # Copy the namespace map so callers can extend it per taxonomy
    ns_map = dict(XBRL_NAMESPACES)

    root = Element("xbrli:xbrl")
    for prefix, uri in ns_map.items():
        root.set(f"xmlns:{prefix}", uri)

    # Context: instant (for balance sheet items)
    ctx_instant = SubElement(root, "xbrli:context", id="ctx_instant")
    entity = SubElement(ctx_instant, "xbrli:entity")
    identifier = SubElement(entity, "xbrli:identifier", scheme="http://standards.iso.org/iso/17442")
    identifier.text = entity_lei
    period = SubElement(ctx_instant, "xbrli:period")
    instant_el = SubElement(period, "xbrli:instant")
    instant_el.text = reporting_date.isoformat()

    # Context: duration (for P&L items)
    if period_start:
        ctx_duration = SubElement(root, "xbrli:context", id="ctx_duration")
        entity2 = SubElement(ctx_duration, "xbrli:entity")
        identifier2 = SubElement(entity2, "xbrli:identifier",
                                  scheme="http://standards.iso.org/iso/17442")
        identifier2.text = entity_lei
        period2 = SubElement(ctx_duration, "xbrli:period")
        start_el = SubElement(period2, "xbrli:startDate")
        start_el.text = period_start.isoformat()
        end_el = SubElement(period2, "xbrli:endDate")
        end_el.text = reporting_date.isoformat()

    # Units
    seen_units: set[str] = set()
    for _, (_, unit) in facts.items():
        if unit not in seen_units:
            unit_el = SubElement(root, "xbrli:unit", id=f"unit_{unit}")
            measure = SubElement(unit_el, "xbrli:measure")
            measure.text = f"iso4217:{unit}"
            seen_units.add(unit)

    # Facts (this stub attaches every fact to the instant context;
    # P&L facts would reference ctx_duration in a full implementation)
    for concept, (value, unit) in facts.items():
        fact_el = SubElement(root, concept)
        fact_el.set("contextRef", "ctx_instant")
        fact_el.set("unitRef", f"unit_{unit}")
        fact_el.set("decimals", "2")
        fact_el.text = f"{value:.2f}"

    indent(root, space="  ")
    return '<?xml version="1.0" encoding="UTF-8"?>\n' + tostring(root, encoding="unicode")


# Usage example
if __name__ == "__main__":
    xml_output = generate_xbrl_report(
        entity_lei="2138004YJJDDEJN2B130",
        reporting_date=date(2024, 12, 31),
        period_start=date(2024, 1, 1),
        facts={
            "corep:OwnFunds":          (125_000_000.0, "EUR"),
            "corep:Tier1Capital":       (95_000_000.0, "EUR"),
            "corep:TotalRiskExposure": (980_000_000.0, "EUR"),
        },
    )
    print(xml_output[:800])    # First 800 chars for illustration

CSV Generation for Regulatory Submissions

import csv
import io
from datetime import date, datetime
from typing import Any


def write_regulatory_csv(
    records: list[dict],
    field_order: list[str],
    output_path: str | None = None,
    date_format: str = "%Y-%m-%d",
    datetime_format: str = "%Y-%m-%dT%H:%M:%SZ",
    encoding: str = "utf-8-sig",           # utf-8-sig adds BOM for Excel compatibility
    delimiter: str = ",",
    quoting: int = csv.QUOTE_ALL,          # Quote all fields for regulatory safety
) -> str:
    """
    Write a regulatory submission CSV with consistent formatting.

    Key differences from general-purpose CSV:
    - All fields quoted (avoids ambiguity with commas in names/references)
    - Dates formatted per the specific regulation's requirements
    - BOM included for Windows/Excel compatibility (common in regulatory portals)
    - UTF-8 encoding (required by most modern regulatory systems)
    - Explicit CRLF line endings, independent of platform defaults

    Returns the CSV as a string. If output_path is provided, also writes the file.
    """
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer,
        fieldnames=field_order,
        delimiter=delimiter,
        quoting=quoting,
        extrasaction="ignore",          # Ignore extra keys not in field_order
        lineterminator="\r\n",          # CRLF: required by many regulatory portals
    )

    writer.writeheader()

    for record in records:
        row: dict[str, Any] = {}
        for field in field_order:
            value = record.get(field)
            if isinstance(value, datetime):
                row[field] = value.strftime(datetime_format)
            elif isinstance(value, date):
                row[field] = value.strftime(date_format)
            elif value is None:
                row[field] = ""
            elif isinstance(value, float):
                row[field] = f"{value:.2f}"
            else:
                row[field] = str(value)
        writer.writerow(row)

    content = buffer.getvalue()

    if output_path:
        with open(output_path, "w", encoding=encoding, newline="") as f:
            f.write(content)

    return content
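A quick standalone check of what QUOTE_ALL plus CRLF produce on the wire (the field values are made up):

```python
import csv
import io

buffer = io.StringIO()
writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator="\r\n")

# A comma inside a field is unambiguous when every field is quoted.
writer.writerow(["TXN-001", "Acme Holdings, Ltd", "1500.00"])

print(repr(buffer.getvalue()))
# '"TXN-001","Acme Holdings, Ltd","1500.00"\r\n'
```

Inspecting the output with repr() is the fastest way to confirm line terminators before uploading to a portal that rejects bare LF files.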

A.5 NLP for Regulatory Text

Regex Patterns for Regulatory Documents

import re
from typing import Iterator


# ── Date extraction ───────────────────────────────────────────────────────────

# ISO 8601 dates
DATE_ISO = re.compile(r'\b(\d{4})-(\d{2})-(\d{2})\b')

# Written dates: "15 March 2024", "March 15, 2024", "15th March 2024"
DATE_WRITTEN = re.compile(
    r'\b(\d{1,2})(?:st|nd|rd|th)?\s+'
    r'(January|February|March|April|May|June|July|August|'
    r'September|October|November|December)\s+(\d{4})\b'
    r'|'
    r'\b(January|February|March|April|May|June|July|August|'
    r'September|October|November|December)\s+(\d{1,2})(?:st|nd|rd|th)?,?\s+(\d{4})\b',
    re.IGNORECASE
)

# Quarters: Q1 2024, Q3/2024
DATE_QUARTER = re.compile(r'\bQ([1-4])\s*[/\-]?\s*(\d{4})\b', re.IGNORECASE)


# ── Monetary amounts ──────────────────────────────────────────────────────────

# "$1,234,567.89", "USD 1.2m", "EUR 50bn", "£10,000"
AMOUNT_PATTERN = re.compile(
    r'(?:'
    r'(?P<symbol>[$£€¥])\s*'
    r'|(?P<currency>[A-Z]{3})\s+'
    r')'
    r'(?P<amount>[\d,]+(?:\.\d+)?)'
    r'(?:\s*(?P<multiplier>[kmb](?:illion|n)?))?\b',
    re.IGNORECASE
)


def extract_monetary_amounts(text: str) -> list[dict]:
    """
    Extract monetary amounts from regulatory text.
    Returns list of dicts with keys: raw, amount, currency.
    """
    MULTIPLIERS = {"k": 1_000, "m": 1_000_000, "mn": 1_000_000,
                   "b": 1_000_000_000, "bn": 1_000_000_000,
                   "billion": 1_000_000_000, "million": 1_000_000}

    SYMBOLS = {"$": "USD", "£": "GBP", "€": "EUR", "¥": "JPY"}

    results = []
    for match in AMOUNT_PATTERN.finditer(text):
        raw_amount = float(match.group("amount").replace(",", ""))
        multiplier_str = (match.group("multiplier") or "").lower()
        multiplier = MULTIPLIERS.get(multiplier_str, 1)
        actual_amount = raw_amount * multiplier

        symbol = match.group("symbol") or ""
        currency = SYMBOLS.get(symbol) or match.group("currency") or "UNKNOWN"

        results.append({
            "raw": match.group(0),
            "amount": actual_amount,
            "currency": currency,
        })

    return results
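The multiplier expansion can be checked in isolation. A standalone sketch of the same normalization (the helper name and inputs are illustrative, not part of the function above):

```python
# Expand shorthand multipliers the same way extract_monetary_amounts does.
MULTIPLIERS = {"k": 1_000, "m": 1_000_000, "mn": 1_000_000,
               "million": 1_000_000, "b": 1_000_000_000,
               "bn": 1_000_000_000, "billion": 1_000_000_000}


def expand_amount(amount: str, suffix: str = "") -> float:
    """Turn '1.2' plus 'bn' into 1_200_000_000.0; unknown suffixes are ignored."""
    return float(amount.replace(",", "")) * MULTIPLIERS.get(suffix.lower(), 1)


print(expand_amount("1.2", "bn"))       # 1200000000.0
print(expand_amount("10,500"))          # 10500.0
```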


# ── Regulatory references ─────────────────────────────────────────────────────

# Article references: "Article 9", "Art. 14(3)(b)", "Articles 9 to 15"
ARTICLE_REF = re.compile(
    r'\bArt(?:icle)?s?\.?\s*(\d+(?:\(\w+\))*(?:\s*(?:to|and|,)\s*\d+(?:\(\w+\))*)*)\b',
    re.IGNORECASE
)

# Regulation references: "Regulation (EU) 2024/1689", "2022/2554/EU"
EU_REGULATION = re.compile(
    r'Regulation\s+\(EU\)\s+(\d{4}/\d+)'
    r'|(\d{4}/\d+/EU)',
    re.IGNORECASE
)

# Directive references: "Directive 2014/65/EU", "MiFID II"
EU_DIRECTIVE = re.compile(
    r'Directive\s+\d{4}/\d+/EU'
    r'|(?:MiFID|AIFMD|UCITS|CRD|BRRD|PSD)\s*(?:II|IV|V|VI|\d+)?',
    re.IGNORECASE
)

# US CFR references: "12 CFR Part 1005", "12 C.F.R. § 226.5"
US_CFR = re.compile(
    r'(\d+)\s+C\.?F\.?R\.?\s+(?:Part\s+)?(?:§\s*)?(\d+)(?:\.(\d+))?',
    re.IGNORECASE
)


def extract_regulatory_references(text: str) -> dict[str, list[str]]:
    """Extract all regulatory cross-references from a block of text."""
    return {
        "articles":         [m.group(0) for m in ARTICLE_REF.finditer(text)],
        "eu_regulations":   [m.group(0) for m in EU_REGULATION.finditer(text)],
        "eu_directives":    [m.group(0) for m in EU_DIRECTIVE.finditer(text)],
        "us_cfr":           [m.group(0) for m in US_CFR.finditer(text)],
    }

Text Preprocessing

import re
import string
from collections import Counter


# Regulatory stopwords — extend the standard NLP list with domain terms
# that appear frequently but carry little discriminative meaning
REGULATORY_STOPWORDS = {
    # Standard
    "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
    "of", "with", "by", "from", "as", "is", "are", "was", "were", "be",
    "been", "being", "have", "has", "had", "do", "does", "did", "will",
    "would", "could", "should", "may", "might", "shall", "that", "this",
    "these", "those", "it", "its", "which", "who", "whom", "whose",
    # Regulatory boilerplate
    "pursuant", "accordance", "herein", "thereof", "hereof", "whereas",
    "provided", "notwithstanding", "subject", "within", "without",
    "including", "including", "inter", "alia", "etc", "such", "any",
    "all", "each", "every", "following", "above", "below", "paragraph",
    "article", "section", "subsection", "clause", "annex", "schedule",
    "regulation", "directive", "act", "law", "rule", "requirement",
    "obligation", "provision", "measure", "member", "state", "states",
    "competent", "authority", "authorities", "institution", "institutions",
}


def preprocess_regulatory_text(
    text: str,
    remove_stopwords: bool = True,
    normalize_whitespace: bool = True,
    lowercase: bool = True,
    remove_punctuation: bool = False,      # Preserve for regulatory text by default
) -> list[str]:
    """
    Tokenize and preprocess regulatory text.

    Returns a list of tokens. Preserves hyphenated terms (e.g., "risk-based"),
    roman numerals (MiFID II), and numeric identifiers (2024/1689) by default.
    """
    if normalize_whitespace:
        text = re.sub(r'\s+', ' ', text).strip()

    if lowercase:
        text = text.lower()

    # Split on whitespace (preserve hyphens and slashes within tokens)
    tokens = re.split(r'[\s]+', text)

    # Remove leading/trailing punctuation from each token (but not internal)
    cleaned_tokens = []
    for tok in tokens:
        tok = tok.strip(string.punctuation)
        if tok:
            cleaned_tokens.append(tok)

    if remove_punctuation:
        cleaned_tokens = [
            t for t in cleaned_tokens
            if not all(c in string.punctuation for c in t)
        ]

    if remove_stopwords:
        cleaned_tokens = [
            t for t in cleaned_tokens
            if t.lower() not in REGULATORY_STOPWORDS
        ]

    return cleaned_tokens
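The edge-only punctuation strip is what preserves hyphenated terms and numeric identifiers. A standalone check of that behavior (the tokens are examples):

```python
import string

# str.strip with a character set removes only leading and trailing characters,
# so internal hyphens and slashes survive intact.
tokens = ["risk-based,", "(2024/1689)", '"mifid', "ii."]
cleaned = [t.strip(string.punctuation) for t in tokens]

print(cleaned)   # ['risk-based', '2024/1689', 'mifid', 'ii']
```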


def compute_term_frequencies(
    texts: list[str],
    top_n: int = 50,
) -> list[tuple[str, int]]:
    """
    Compute term frequencies across a corpus of regulatory texts.
    Useful for identifying domain vocabulary and building custom stopword lists.
    """
    all_tokens: list[str] = []
    for text in texts:
        all_tokens.extend(preprocess_regulatory_text(text, remove_stopwords=True))

    counter = Counter(all_tokens)
    return counter.most_common(top_n)

Basic Text Classification

import re
from dataclasses import dataclass


@dataclass
class DocumentClassification:
    document_type: str
    confidence: str                  # HIGH | MEDIUM | LOW
    matched_signals: list[str]


# Signal-based classifier for common regulatory document types
DOCUMENT_SIGNALS: dict[str, list[str]] = {
    "SUSPICIOUS_ACTIVITY_REPORT": [
        r"suspicious activity report",
        r"\bSAR\b",
        r"suspicious transaction",
        r"FinCEN Form 111",
        r"NCA SARs",
    ],
    "CURRENCY_TRANSACTION_REPORT": [
        r"currency transaction report",
        r"\bCTR\b",
        r"FinCEN Form 112",
        r"cash transaction report",
    ],
    "KNOW_YOUR_CUSTOMER": [
        r"\bKYC\b",
        r"know your customer",
        r"customer due diligence",
        r"\bCDD\b",
        r"enhanced due diligence",
        r"\bEDD\b",
    ],
    "SANCTIONS_SCREENING": [
        r"sanctions screen",
        r"\bOFAC\b",
        r"SDN list",
        r"designated person",
        r"asset freeze",
        r"\bPEP\b.*screen",
    ],
    "MODEL_VALIDATION": [
        r"model validation",
        r"model risk",
        r"\bSR 11-7\b",
        r"conceptual soundness",
        r"back-test",
        r"challenger model",
    ],
    "REGULATORY_REPORT": [
        r"\bCOREP\b",
        r"\bFINREP\b",
        r"\bMiFID\b.*report",
        r"regulatory capital",
        r"\bXBRL\b",
        r"prudential report",
    ],
}


def classify_regulatory_document(
    text: str,
    threshold: int = 2,
) -> DocumentClassification:
    """
    Classify a regulatory document by type using signal matching.

    Args:
        text:      The document text (or a substantial excerpt).
        threshold: Minimum number of signals to trigger a HIGH confidence match.

    Returns: DocumentClassification with type and confidence level.
    """
    scores: dict[str, list[str]] = {}

    for doc_type, patterns in DOCUMENT_SIGNALS.items():
        matched = []
        for pattern in patterns:
            if re.search(pattern, text, re.IGNORECASE):
                matched.append(pattern)
        if matched:
            scores[doc_type] = matched

    if not scores:
        return DocumentClassification(
            document_type="UNKNOWN",
            confidence="LOW",
            matched_signals=[],
        )

    # Pick the document type with the most signal matches
    best_type = max(scores, key=lambda t: len(scores[t]))
    match_count = len(scores[best_type])

    confidence = (
        "HIGH" if match_count >= threshold
        else "MEDIUM" if match_count > 1
        else "LOW"
    )

    return DocumentClassification(
        document_type=best_type,
        confidence=confidence,
        matched_signals=scores[best_type],
    )

A.6 Machine Learning Patterns

Train/Test Split with Temporal Data

Standard random train/test splits are incorrect for time-series compliance data. Using a random split allows the model to "see the future" during training, leading to inflated performance metrics that will not hold in production.

import pandas as pd
import numpy as np
from datetime import datetime


def temporal_train_test_split(
    df: pd.DataFrame,
    timestamp_col: str,
    test_size: float = 0.2,
    gap_days: int = 0,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Split a time-series DataFrame into train and test sets using a temporal cutoff.

    Unlike sklearn's train_test_split, this preserves temporal order:
    - All training data comes BEFORE the cutoff.
    - All test data comes AFTER the cutoff.
    - An optional gap prevents look-ahead through features derived
      from future transactions (e.g., 30-day rolling aggregates).

    Args:
        df:            DataFrame with a timestamp column.
        timestamp_col: Name of the timestamp column.
        test_size:     Proportion of data to use for testing (from the end).
        gap_days:      Days to exclude around the cutoff to prevent leakage.

    Returns: (train_df, test_df)

    WHY THIS MATTERS:
    If you randomly split, a training sample might use context (velocity features,
    peer-group comparisons) that was computed using transactions that appear in
    the test set. This inflates AUC and gives a false sense of model performance.
    The right approach is always temporal: train on past, evaluate on future.
    """
    df = df.copy()
    df[timestamp_col] = pd.to_datetime(df[timestamp_col], utc=True)
    df = df.sort_values(timestamp_col)

    n = len(df)
    cutoff_idx = int(n * (1 - test_size))
    cutoff_time = df.iloc[cutoff_idx][timestamp_col]

    gap = pd.Timedelta(days=gap_days)
    train = df[df[timestamp_col] < (cutoff_time - gap)]
    test = df[df[timestamp_col] >= (cutoff_time + gap)]

    print(f"Train: {len(train):,} records "
          f"({df[timestamp_col].min().date()} — {train[timestamp_col].max().date()})")
    print(f"Gap:   {gap_days} days around {cutoff_time.date()}")
    print(f"Test:  {len(test):,} records "
          f"({test[timestamp_col].min().date()} — {df[timestamp_col].max().date()})")

    return train, test
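The ordering guarantee can be sanity-checked on synthetic data. This is a minimal inline version of the cutoff logic, not the full function above, and the column names are made up:

```python
import pandas as pd

# 100 daily transactions, already in time order.
df = pd.DataFrame({
    "txn_ts": pd.date_range("2024-01-01", periods=100, freq="D", tz="UTC"),
    "amount": range(100),
})

cutoff_time = df["txn_ts"].iloc[int(len(df) * 0.8)]   # 80/20 temporal split
train = df[df["txn_ts"] < cutoff_time]
test = df[df["txn_ts"] >= cutoff_time]

# Every training record precedes every test record, so no look-ahead is possible.
assert train["txn_ts"].max() < test["txn_ts"].min()
print(len(train), len(test))   # 80 20
```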

Handling Class Imbalance

import numpy as np
import pandas as pd
from sklearn.utils.class_weight import compute_class_weight


def compute_compliance_class_weights(y: np.ndarray) -> dict[int, float]:
    """
    Compute class weights for imbalanced compliance datasets.

    In AML and fraud detection, suspicious cases are typically 0.1%–2% of
    transactions. Standard ML treats all errors equally; compliance requires
    that false negatives (missed suspicious activity) are penalized more
    heavily than false positives (excessive alerts).

    Pass the result to the class_weight parameter of sklearn estimators.
    """
    classes = np.unique(y)
    weights = compute_class_weight(class_weight="balanced", classes=classes, y=y)
    weight_dict = dict(zip(classes, weights))

    print("Class distribution:")
    for cls in classes:
        n = (y == cls).sum()
        label = "Suspicious" if cls == 1 else "Legitimate"
        print(f"  Class {cls} ({label}): {n:,} samples — weight: {weight_dict[cls]:.2f}")

    return weight_dict
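The "balanced" heuristic is plain arithmetic: weight = n_samples / (n_classes * n_in_class). A standalone check without sklearn (the class counts are illustrative):

```python
import numpy as np

# 990 legitimate (class 0) and 10 suspicious (class 1) samples.
y = np.array([0] * 990 + [1] * 10)

n_samples = len(y)
classes, counts = np.unique(y, return_counts=True)
weights = {int(c): n_samples / (len(classes) * n) for c, n in zip(classes, counts)}

# The rare suspicious class receives roughly a 100x larger weight.
print(weights)   # {0: 0.50505..., 1: 50.0}
```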


def optimize_threshold_for_precision(
    y_true: np.ndarray,
    y_prob: np.ndarray,
    target_precision: float = 0.50,
) -> tuple[float, float, float]:
    """
    Find the probability threshold that achieves a target precision.

    In compliance contexts, you often need to maintain a minimum precision
    (alert quality) to keep analyst workloads manageable. This function
    finds the lowest threshold that achieves your target precision.

    Returns: (threshold, achieved_precision, achieved_recall)

    Example: target_precision=0.50 means at least 50% of alerts should be
    genuine suspicious activity. Anything below this creates too much
    analyst burden and is operationally unsustainable.
    """
    from sklearn.metrics import precision_recall_curve

    precisions, recalls, thresholds = precision_recall_curve(y_true, y_prob)

    # Find thresholds that meet the precision target
    valid_mask = precisions[:-1] >= target_precision
    if not valid_mask.any():
        print(f"WARNING: Target precision {target_precision:.0%} is not achievable.")
        print(f"Maximum achievable precision: {precisions.max():.2%}")
        return (1.0, float(precisions.max()), 0.0)

    # Among valid thresholds, take the lowest (maximizes recall)
    valid_thresholds = thresholds[valid_mask]
    valid_recalls = recalls[:-1][valid_mask]
    valid_precisions = precisions[:-1][valid_mask]

    best_idx = np.argmin(valid_thresholds)
    return (
        float(valid_thresholds[best_idx]),
        float(valid_precisions[best_idx]),
        float(valid_recalls[best_idx]),
    )

SHAP Values — Interpretation in Compliance Contexts

Dependencies: shap, scikit-learn

import numpy as np
import pandas as pd


def compute_shap_explanation(
    model,
    X: pd.DataFrame,
    instance_idx: int,
    feature_names: list[str] | None = None,
) -> dict:
    """
    Compute SHAP values for a single prediction and return a structured
    explanation suitable for adverse action notices or audit documentation.

    Dependencies: shap (pip install shap)

    Args:
        model:        A fitted sklearn-compatible model.
        X:            The feature DataFrame used for scoring.
        instance_idx: Index of the instance to explain.
        feature_names: Optional list of human-readable feature names.

    Returns: dict with base_value, prediction, and ranked feature contributions.

    COMPLIANCE NOTE: SHAP values explain individual predictions in terms of
    feature contributions. This is required for:
    - Adverse action notices under ECOA/Reg B (US)
    - Explainability under EU AI Act Article 13 (high-risk AI)
    - SR 11-7 model validation (model transparency)

    SHAP values are additive: base_value + sum(shap_values) = model output.
    A positive SHAP value means that feature INCREASED the risk score.
    A negative SHAP value means it DECREASED the risk score.
    """
    try:
        import shap
    except ImportError:
        raise ImportError("Install shap: pip install shap")

    names = feature_names or list(X.columns)

    # TreeExplainer for tree-based models; KernelExplainer is model-agnostic
    try:
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X)
        # For binary tree classifiers expected_value may be [class0, class1]
        base_value = float(np.ravel(explainer.expected_value)[-1])
    except Exception:
        # Fall back to KernelExplainer for non-tree models
        background = shap.sample(X, min(100, len(X)))
        explainer = shap.KernelExplainer(model.predict_proba, background)
        shap_values = explainer.shap_values(X)[1]   # Class 1 (suspicious)
        base_value = float(explainer.expected_value[1])

    # For binary classifiers, shap_values may be a list [class0, class1]
    if isinstance(shap_values, list):
        instance_shap = shap_values[1][instance_idx]    # Suspicious class
    else:
        instance_shap = shap_values[instance_idx]

    instance_features = X.iloc[instance_idx]

    contributions = []
    for name, shap_val, feature_val in zip(names, instance_shap, instance_features):
        contributions.append({
            "feature": name,
            "feature_value": float(feature_val),
            "shap_contribution": round(float(shap_val), 4),
            "direction": "INCREASES_RISK" if shap_val > 0 else "DECREASES_RISK",
        })

    # Sort by absolute contribution (most impactful first)
    contributions.sort(key=lambda x: abs(x["shap_contribution"]), reverse=True)

    prediction = float(model.predict_proba(X.iloc[[instance_idx]])[0][1])

    return {
        "instance_idx": instance_idx,
        "base_value": round(base_value, 4),
        "prediction": round(prediction, 4),
        "shap_sum": round(float(sum(c["shap_contribution"] for c in contributions)), 4),
        "top_factors": contributions[:5],           # Top 5 for adverse action notices
        "all_factors": contributions,
        "interpretation": (
            f"Model base rate: {base_value:.1%}. "
            f"This transaction scored {prediction:.1%}. "
            f"The top driver was '{contributions[0]['feature']}' "
            f"({contributions[0]['direction'].replace('_', ' ').lower()}, "
            f"contribution: {contributions[0]['shap_contribution']:+.4f})."
        ),
    }

PSI Calculation

Population Stability Index (PSI) measures how much a model's input distribution has shifted between development and deployment. PSI > 0.25 typically triggers model review under SR 11-7.

import numpy as np
import pandas as pd


def calculate_psi(
    expected: np.ndarray,
    actual: np.ndarray,
    n_bins: int = 10,
    min_bin_pct: float = 0.0001,
) -> dict:
    """
    Calculate Population Stability Index (PSI) for a single feature or model score.

    PSI = sum((Actual% - Expected%) * ln(Actual% / Expected%))

    Interpretation (SR 11-7 / industry standard):
      PSI < 0.10:  No significant change. Model stable.
      PSI 0.10–0.25: Moderate change. Investigate if sustained.
      PSI > 0.25:  Major shift. Model review required.

    Args:
        expected: Distribution at model development time (reference population).
        actual:   Current distribution (monitoring population).
        n_bins:   Number of equal-width bins.
        min_bin_pct: Floor value to avoid log(0) errors.

    Returns: dict with psi, verdict, and per-bin breakdown.
    """
    # Create equal-width bins spanning both distributions
    min_val = min(expected.min(), actual.min())
    max_val = max(expected.max(), actual.max())
    bin_edges = np.linspace(min_val, max_val, n_bins + 1)

    expected_counts, _ = np.histogram(expected, bins=bin_edges)
    actual_counts, _   = np.histogram(actual,   bins=bin_edges)

    expected_pct = expected_counts / len(expected)
    actual_pct   = actual_counts   / len(actual)

    # Apply floor to avoid division by zero and log(0)
    expected_pct = np.clip(expected_pct, min_bin_pct, None)
    actual_pct   = np.clip(actual_pct,   min_bin_pct, None)

    psi_per_bin = (actual_pct - expected_pct) * np.log(actual_pct / expected_pct)
    psi = float(psi_per_bin.sum())

    if psi < 0.10:
        verdict = "STABLE"
    elif psi < 0.25:
        verdict = "MODERATE_SHIFT"
    else:
        verdict = "MAJOR_SHIFT_REVIEW_REQUIRED"

    bin_breakdown = []
    for i in range(n_bins):
        bin_breakdown.append({
            "bin": i + 1,
            "range": f"[{bin_edges[i]:.3f}, {bin_edges[i+1]:.3f})",
            "expected_pct": round(float(expected_pct[i]) * 100, 2),
            "actual_pct":   round(float(actual_pct[i])   * 100, 2),
            "psi_contribution": round(float(psi_per_bin[i]), 5),
        })

    return {
        "psi": round(psi, 5),
        "verdict": verdict,
        "n_bins": n_bins,
        "expected_n": len(expected),
        "actual_n": len(actual),
        "bin_breakdown": bin_breakdown,
    }
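A sanity check on the PSI arithmetic: identical populations yield exactly zero, and any shift raises it. This is a minimal inline computation mirroring calculate_psi, on synthetic score distributions:

```python
import numpy as np

rng = np.random.default_rng(42)
reference = rng.normal(0.3, 0.1, 10_000)   # scores at model development
shifted = reference + 0.05                  # a uniformly shifted population


def psi(expected, actual, n_bins=10, floor=1e-4):
    """Population Stability Index over equal-width bins spanning both arrays."""
    edges = np.linspace(min(expected.min(), actual.min()),
                        max(expected.max(), actual.max()), n_bins + 1)
    e = np.clip(np.histogram(expected, bins=edges)[0] / len(expected), floor, None)
    a = np.clip(np.histogram(actual, bins=edges)[0] / len(actual), floor, None)
    return float(((a - e) * np.log(a / e)).sum())


print(psi(reference, reference))        # 0.0 (identical populations)
print(psi(reference, shifted) > 0)      # True (any shift raises PSI)
```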


def monitor_model_stability(
    model,
    X_dev: pd.DataFrame,
    X_current: pd.DataFrame,
    feature_names: list[str] | None = None,
    score_psi_only: bool = False,
) -> pd.DataFrame:
    """
    Run PSI monitoring across model output scores and (optionally) all input features.

    Returns a DataFrame with PSI and verdict for each monitored variable.
    Variables with MAJOR_SHIFT should be escalated to the model risk team.
    """
    results = []
    names = feature_names or list(X_dev.columns)

    # Always compute PSI on model output score
    scores_dev = model.predict_proba(X_dev)[:, 1]
    scores_cur = model.predict_proba(X_current)[:, 1]
    score_psi = calculate_psi(scores_dev, scores_cur)
    results.append({
        "variable": "MODEL_OUTPUT_SCORE",
        "psi": score_psi["psi"],
        "verdict": score_psi["verdict"],
    })

    if not score_psi_only:
        for name, col in zip(names, X_dev.columns):
            feature_psi = calculate_psi(
                X_dev[col].values.astype(float),
                X_current[col].values.astype(float),
            )
            results.append({
                "variable": name,
                "psi": feature_psi["psi"],
                "verdict": feature_psi["verdict"],
            })

    report = pd.DataFrame(results).sort_values("psi", ascending=False)
    return report

A.7 API Integration

REST API Client Pattern

import time
import logging
from typing import Any
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

logger = logging.getLogger(__name__)


class RegTechAPIClient:
    """
    Base REST API client for RegTech integrations.

    Features:
    - Automatic retry with exponential backoff
    - Rate limiting (configurable requests per second)
    - Structured error handling
    - Request/response logging for audit purposes

    Extend this class for specific vendor APIs (sanctions providers,
    LEI registries, identity verification services, etc.).
    """

    def __init__(
        self,
        base_url: str,
        api_key: str,
        timeout: int = 30,
        max_retries: int = 3,
        rate_limit_rps: float = 10.0,
    ):
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.timeout = timeout
        self._min_interval = 1.0 / rate_limit_rps
        self._last_request_time = 0.0

        # Configure session with retry logic
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        })

        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,                  # Wait 1s, 2s, 4s between retries
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)

    def _rate_limit(self) -> None:
        """Enforce rate limiting between requests."""
        elapsed = time.monotonic() - self._last_request_time
        if elapsed < self._min_interval:
            time.sleep(self._min_interval - elapsed)
        self._last_request_time = time.monotonic()

    def _request(
        self,
        method: str,
        endpoint: str,
        **kwargs: Any,
    ) -> dict:
        """
        Make an HTTP request with rate limiting and error handling.
        Raises requests.HTTPError for 4xx/5xx responses.
        """
        self._rate_limit()
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        logger.debug("%s %s", method.upper(), url)
        response = self.session.request(
            method,
            url,
            timeout=self.timeout,
            **kwargs,
        )

        if not response.ok:
            logger.error(
                "API error %s %s: %d %s",
                method.upper(), url, response.status_code, response.text[:500]
            )
            response.raise_for_status()

        return response.json()

    def get(self, endpoint: str, params: dict | None = None) -> dict:
        return self._request("GET", endpoint, params=params)

    def post(self, endpoint: str, payload: dict) -> dict:
        return self._request("POST", endpoint, json=payload)
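The rate-limiting logic can be exercised in isolation to confirm the spacing it enforces. A standalone sketch of the same minimum-interval approach (class name and interval values are illustrative):

```python
import time


class MinIntervalLimiter:
    """Block until at least min_interval seconds since the previous call."""

    def __init__(self, rate_limit_rps: float):
        self.min_interval = 1.0 / rate_limit_rps
        self.last_call = 0.0

    def wait(self) -> None:
        elapsed = time.monotonic() - self.last_call
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_call = time.monotonic()


limiter = MinIntervalLimiter(rate_limit_rps=50.0)   # 20 ms between calls
start = time.monotonic()
for _ in range(3):
    limiter.wait()
elapsed = time.monotonic() - start

# The first call runs immediately; the next two each enforce a >= 20 ms gap.
print(elapsed >= 0.04)   # True
```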


class GLEIFClient(RegTechAPIClient):
    """
    Client for the GLEIF (Global LEI Foundation) public API.
    Base URL: https://api.gleif.org/api/v1

    No API key required for the public GLEIF API.
    """

    def __init__(self):
        super().__init__(
            base_url="https://api.gleif.org/api/v1",
            api_key="",    # Public API
            rate_limit_rps=5.0,
        )
        self.session.headers.pop("Authorization", None)

    def lookup_lei(self, lei: str) -> dict:
        """Retrieve full LEI record including entity name and registration status."""
        response = self.get(f"/lei-records/{lei}")
        data = response.get("data", {})
        attributes = data.get("attributes", {})
        entity = attributes.get("entity", {})

        return {
            "lei": lei,
            "legal_name": entity.get("legalName", {}).get("name"),
            "status": attributes.get("registration", {}).get("status"),
            "jurisdiction": entity.get("jurisdiction"),
            "category": entity.get("category"),
        }

    def search_by_name(self, name: str, limit: int = 10) -> list[dict]:
        """Search for LEI records by entity name."""
        response = self.get("/lei-records", params={
            "filter[entity.names]": name,
            "page[size]": limit,
        })
        return [
            {
                "lei": item["id"],
                "legal_name": (item.get("attributes", {})
                               .get("entity", {})
                               .get("legalName", {})
                               .get("name")),
            }
            for item in response.get("data", [])
        ]

Webhook Handler Pattern

Dependencies: flask

import hashlib
import hmac
import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


def create_webhook_app(secret_key: str, alert_processor):
    """
    Create a Flask webhook receiver for compliance alerts.

    Args:
        secret_key:      HMAC secret shared with the sending system.
        alert_processor: Callable that processes a validated alert dict.

    Returns: A Flask application.

    Security notes:
    - Always validate the HMAC signature before processing.
    - Return 200 quickly; do heavy processing asynchronously.
    - Log all received webhooks for audit purposes.

    Dependencies: flask (pip install flask)
    """
    try:
        from flask import Flask, request, jsonify
    except ImportError:
        raise ImportError("Install flask: pip install flask")

    app = Flask(__name__)

    def verify_signature(payload_bytes: bytes, signature_header: str) -> bool:
        """Verify HMAC-SHA256 signature from the sending system."""
        if not signature_header:
            return False
        expected = hmac.new(
            secret_key.encode(),
            payload_bytes,
            hashlib.sha256
        ).hexdigest()
        # Use constant-time comparison to prevent timing attacks
        return hmac.compare_digest(
            f"sha256={expected}",
            signature_header
        )

    @app.route("/webhooks/alerts", methods=["POST"])
    def receive_alert():
        """Webhook endpoint for incoming compliance alerts."""
        payload_bytes = request.get_data()
        signature = request.headers.get("X-Signature", "")

        # Always validate signature before processing
        if not verify_signature(payload_bytes, signature):
            logger.warning(
                "Webhook signature validation failed from %s",
                request.remote_addr
            )
            return jsonify({"error": "Invalid signature"}), 401

        try:
            alert = json.loads(payload_bytes)
        except json.JSONDecodeError as exc:
            logger.error("Webhook payload is not valid JSON: %s", exc)
            return jsonify({"error": "Invalid JSON"}), 400

        # Log receipt for audit trail
        logger.info(
            "Webhook received: alert_id=%s type=%s customer=%s at=%s",
            alert.get("alert_id", "UNKNOWN"),
            alert.get("alert_type", "UNKNOWN"),
            alert.get("customer_id", "UNKNOWN"),
            datetime.now(tz=timezone.utc).isoformat(),
        )

        # In production, enqueue the alert to a task queue here and
        # return 200 immediately; this example processes inline for brevity
        try:
            alert_processor(alert)
        except Exception as exc:
            logger.error("Alert processing failed: %s", exc, exc_info=True)
            # Still return 200 to prevent retry storms; route failed
            # alerts to a dead-letter queue for reprocessing

        return jsonify({"status": "accepted"}), 200

    return app
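
For testing the receiver above, the sending side's signature can be reproduced locally. A minimal sketch of the signing convention that verify_signature expects (the sha256= prefix and X-Signature header name match the receiver; sign_payload is an illustrative helper, not part of any library):

```python
import hashlib
import hmac
import json


def sign_payload(secret_key: str, payload_bytes: bytes) -> str:
    """Compute the X-Signature header value for an outgoing webhook."""
    digest = hmac.new(secret_key.encode(), payload_bytes, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


secret = "shared-secret"
body = json.dumps({"alert_id": "ALT-2024-001", "alert_type": "TM"}).encode()
headers = {
    "Content-Type": "application/json",
    "X-Signature": sign_payload(secret, body),
}
```

Sign the exact bytes you transmit: re-serializing the JSON on either side can reorder keys or change whitespace and break the digest.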

Authentication Patterns

import time
import threading
from dataclasses import dataclass, field
import requests


@dataclass
class OAuthToken:
    """An OAuth 2.0 access token with expiry tracking."""
    access_token: str
    token_type: str
    expires_at: float                          # Unix timestamp
    scope: str = ""

    def is_expired(self, buffer_seconds: int = 60) -> bool:
        """Returns True if the token has expired or is within buffer_seconds of expiry."""
        return time.time() >= (self.expires_at - buffer_seconds)


class OAuth2ClientCredentials:
    """
    OAuth 2.0 client credentials flow token manager for RegTech APIs.

    Handles token caching, automatic refresh, and thread-safe access.
    Suitable for server-to-server API integrations (no user interaction).

    Common use cases:
    - Sanctions list provider APIs (Refinitiv World-Check, Dow Jones)
    - Identity verification APIs (Jumio, Onfido, Trulioo)
    - Regulatory data feeds

    Thread safety: This class is safe for concurrent use from multiple threads.
    """

    def __init__(
        self,
        token_url: str,
        client_id: str,
        client_secret: str,
        scope: str = "",
    ):
        self.token_url = token_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self._token: OAuthToken | None = None
        self._lock = threading.Lock()

    def _fetch_token(self) -> OAuthToken:
        """Fetch a fresh access token from the authorization server."""
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        }
        if self.scope:
            payload["scope"] = self.scope

        response = requests.post(
            self.token_url,
            data=payload,
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()

        return OAuthToken(
            access_token=data["access_token"],
            token_type=data.get("token_type", "Bearer"),
            expires_at=time.time() + data.get("expires_in", 3600),
            scope=data.get("scope", self.scope),
        )

    def get_token(self) -> str:
        """
        Returns a valid access token, refreshing if necessary.
        Thread-safe.
        """
        with self._lock:
            if self._token is None or self._token.is_expired():
                self._token = self._fetch_token()
            return self._token.access_token

    def get_headers(self) -> dict[str, str]:
        """Returns Authorization headers for API requests."""
        return {"Authorization": f"Bearer {self.get_token()}"}
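
The 60-second buffer in is_expired deserves a note: without it, a token validated just before expiry can expire mid-request and produce spurious 401s. A standalone sketch of the same check (needs_refresh is an illustrative name, not part of the class above):

```python
import time


def needs_refresh(expires_at: float, buffer_seconds: int = 60) -> bool:
    # Mirrors OAuthToken.is_expired: refresh once the token is within
    # buffer_seconds of expiry, not only after it has actually expired
    return time.time() >= (expires_at - buffer_seconds)


now = time.time()
fresh = needs_refresh(now + 3600)    # False: plenty of validity left
stale = needs_refresh(now + 30)      # True: inside the 60-second buffer
expired = needs_refresh(now - 10)    # True: already past expiry
```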

A.8 Audit Trail Patterns

Immutable Audit Log

import hashlib
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class AuditEntry:
    """A single immutable entry in a compliance audit log."""
    sequence: int
    timestamp: str                             # ISO 8601 UTC
    actor: str                                 # User ID or system identifier
    action: str                                # SCREEN | SCORE | ALERT | CLOSE | OVERRIDE
    entity_type: str                           # CUSTOMER | TRANSACTION | ALERT
    entity_id: str
    payload: dict                              # Action-specific data
    previous_hash: str                         # Hash of previous entry (chain)
    entry_hash: str = field(init=False)

    def __post_init__(self):
        self.entry_hash = self._compute_hash()

    def _compute_hash(self) -> str:
        """
        Compute SHA-256 hash of this entry's content.
        The hash includes the previous entry's hash, creating a tamper-evident chain.
        """
        content = {
            "sequence": self.sequence,
            "timestamp": self.timestamp,
            "actor": self.actor,
            "action": self.action,
            "entity_type": self.entity_type,
            "entity_id": self.entity_id,
            "payload": self.payload,
            "previous_hash": self.previous_hash,
        }
        canonical = json.dumps(content, sort_keys=True, default=str)
        return hashlib.sha256(canonical.encode()).hexdigest()

    def to_dict(self) -> dict:
        return {
            "sequence": self.sequence,
            "timestamp": self.timestamp,
            "actor": self.actor,
            "action": self.action,
            "entity_type": self.entity_type,
            "entity_id": self.entity_id,
            "payload": self.payload,
            "previous_hash": self.previous_hash,
            "entry_hash": self.entry_hash,
        }


class ImmutableAuditLog:
    """
    Append-only audit log with hash-chaining for tamper detection.

    Each entry contains the hash of the previous entry. Any modification
    to a historical entry will invalidate all subsequent hashes, making
    tampering detectable.

    In production, persist entries to a write-once store (e.g., AWS WORM S3,
    Azure Immutable Blob Storage, or an append-only database table) and
    regularly archive the hash chain for regulatory evidence.

    Regulatory relevance:
    - GDPR Article 5(1)(f): integrity and confidentiality of processing
    - FCA SYSC 10A: audit trail requirements for MiFID firms
    - BSA: five-year record retention for SARs and CTRs
    - DORA Article 12: ICT-related incident log integrity
    """

    GENESIS_HASH = "0" * 64    # Starting hash for the first entry

    def __init__(self):
        self._entries: list[AuditEntry] = []

    @property
    def last_hash(self) -> str:
        if not self._entries:
            return self.GENESIS_HASH
        return self._entries[-1].entry_hash

    def append(
        self,
        actor: str,
        action: str,
        entity_type: str,
        entity_id: str,
        payload: dict,
    ) -> AuditEntry:
        """
        Append a new entry to the audit log.
        Once appended, entries cannot be modified.
        """
        entry = AuditEntry(
            sequence=len(self._entries),
            timestamp=datetime.now(tz=timezone.utc).isoformat(),
            actor=actor,
            action=action,
            entity_type=entity_type,
            entity_id=entity_id,
            payload=payload,
            previous_hash=self.last_hash,
        )
        self._entries.append(entry)
        return entry

    def verify_integrity(self) -> tuple[bool, int | None]:
        """
        Verify the hash chain is intact.
        Returns (is_valid, first_broken_sequence).
        """
        previous_hash = self.GENESIS_HASH
        for entry in self._entries:
            if entry.previous_hash != previous_hash:
                return False, entry.sequence
            expected_hash = entry._compute_hash()
            if entry.entry_hash != expected_hash:
                return False, entry.sequence
            previous_hash = entry.entry_hash
        return True, None

    def entries_for_entity(self, entity_id: str) -> list[dict]:
        """Retrieve all audit entries for a specific entity."""
        return [
            e.to_dict() for e in self._entries
            if e.entity_id == entity_id
        ]

Decision Recording

from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class ComplianceDecision:
    """
    A fully recorded compliance decision for audit purposes.

    Captures everything a regulator would need to reconstruct
    why a decision was made: who made it, on what data, using
    what rules or model, and what the outcome was.

    This is distinct from a brief audit log entry — it preserves
    the full decision context, including the model version and
    all input features used at decision time.
    """
    decision_id: str
    decision_type: str                         # ALERT_TRIAGE | SAR_DECISION | KYC_APPROVAL
    entity_id: str
    entity_type: str                           # CUSTOMER | TRANSACTION | ALERT
    decided_by: str                            # Analyst ID or "SYSTEM" for automated
    decided_at: datetime
    outcome: str                               # ESCALATE | NO_ACTION | SAR_FILE | APPROVE
    rationale: str                             # Free text justification
    risk_score: float | None
    model_version: str | None
    input_features: dict                       # Snapshot of all model inputs
    rule_results: list[dict]                   # Results of each rule evaluation
    supporting_documents: list[str] = field(default_factory=list)
    overrides: list[dict] = field(default_factory=list)  # Any threshold overrides

    def to_audit_payload(self) -> dict:
        """Serialize to a dict suitable for the audit log payload."""
        return {
            "decision_id": self.decision_id,
            "decision_type": self.decision_type,
            "entity_id": self.entity_id,
            "entity_type": self.entity_type,
            "decided_by": self.decided_by,
            "decided_at": self.decided_at.isoformat(),
            "outcome": self.outcome,
            "rationale": self.rationale,
            "risk_score": self.risk_score,
            "model_version": self.model_version,
            "input_features": self.input_features,
            "rule_results": self.rule_results,
            "supporting_documents": self.supporting_documents,
            "overrides": self.overrides,
        }


def record_triage_decision(
    alert: "ComplianceAlert",
    analyst_id: str,
    outcome: str,
    rationale: str,
    audit_log: "ImmutableAuditLog",
    risk_score_breakdown: dict | None = None,
) -> ComplianceDecision:
    """
    Record the full context of an analyst's alert triage decision.

    Args:
        alert:                  The alert being triaged.
        analyst_id:             ID of the analyst making the decision.
        outcome:                One of: ESCALATE | SAR_FILE | NO_ACTION | MONITOR
        rationale:              Analyst's narrative justification (required).
        audit_log:              The system audit log.
        risk_score_breakdown:   Score component breakdown from the scoring engine.

    Returns: The recorded ComplianceDecision.
    """
    if not rationale.strip():
        raise ValueError(
            "Rationale is required for all compliance decisions. "
            "A blank rationale is not acceptable for regulatory purposes."
        )

    decision = ComplianceDecision(
        decision_id=f"DEC-{alert.alert_id}-{int(datetime.now(tz=timezone.utc).timestamp())}",
        decision_type="ALERT_TRIAGE",
        entity_id=alert.alert_id,
        entity_type="ALERT",
        decided_by=analyst_id,
        decided_at=datetime.now(tz=timezone.utc),
        outcome=outcome,
        rationale=rationale,
        risk_score=alert.risk_score,
        model_version=alert.model_id,
        input_features={"alert_type": alert.alert_type, "rule_ids": alert.rule_ids},
        rule_results=risk_score_breakdown.get("components", []) if risk_score_breakdown else [],
    )

    audit_log.append(
        actor=analyst_id,
        action="ALERT_TRIAGE",
        entity_type="ALERT",
        entity_id=alert.alert_id,
        payload=decision.to_audit_payload(),
    )

    return decision

A.9 Quick Reference

Common Regulatory Data Formats

| Field | Format | Validation Rule | Example |
| --- | --- | --- | --- |
| LEI | 20-char alphanumeric | MOD 97-10 checksum | 2138004YJJDDEJN2B130 |
| ISIN | 12-char: 2 alpha + 9 alphanum + 1 digit | Luhn checksum | GB0002634946 |
| BIC/SWIFT | 8 or 11 chars: 4 alpha + 2 alpha + 2 alnum + (3 alnum) | ISO 9362 format | HBUKGB4B |
| MIC | 4 uppercase alpha | Exact 4 chars | XLON |
| ISO 4217 | 3 uppercase alpha | ISO 4217 list | USD, EUR, GBP |
| ISO 3166-1 | 2 uppercase alpha | ISO 3166-1 list | US, GB, DE |
| Date (ISO 8601) | YYYY-MM-DD | Valid calendar date | 2024-03-15 |
| Datetime (UTC) | YYYY-MM-DDTHH:MM:SSZ | Timezone-aware UTC | 2024-03-15T14:30:00Z |
| NCA Reference | Varies by jurisdiction | Regex per jurisdiction | FCA/2024/001234 |
| CUSIP | 9-char: 6 alphanum + 2 alphanum + 1 digit | Luhn-based checksum | 594918104 |
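
The LEI checksum in the table is ISO 7064 MOD 97-10: map A–Z to 10–35, read the 20 characters as one large integer, and require a remainder of 1 modulo 97. A sketch (compute_lei_check_digits is an illustrative helper, not a library function):

```python
import re

_LEI_RE = re.compile(r"[A-Z0-9]{20}")


def _lei_as_int(lei: str) -> int:
    # ISO 7064 MOD 97-10 expansion: digits stay as-is, A=10 ... Z=35
    return int("".join(str(int(ch, 36)) for ch in lei))


def validate_lei(lei: str) -> bool:
    """Structural and checksum validation for a Legal Entity Identifier."""
    lei = lei.strip().upper()
    if not _LEI_RE.fullmatch(lei):
        return False
    return _lei_as_int(lei) % 97 == 1


def compute_lei_check_digits(base18: str) -> str:
    """Derive the two check digits for an 18-character LEI prefix."""
    return f"{98 - _lei_as_int(base18.upper() + '00') % 97:02d}"
```

The same MOD 97-10 scheme underlies IBAN validation; ISIN and CUSIP use Luhn-style check digits and need separate validators.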

Package Quick Reference

| Use Case | Package | Key Functions / Classes |
| --- | --- | --- |
| Data manipulation | pandas | DataFrame, read_csv, groupby, merge, to_datetime |
| Numerical computing | numpy | ndarray, histogram, clip, where, unique |
| Machine learning | scikit-learn | GradientBoostingClassifier, train_test_split, roc_auc_score |
| Class imbalance | scikit-learn | compute_class_weight, precision_recall_curve |
| Explainability | shap | TreeExplainer, KernelExplainer, shap_values |
| HTTP / API calls | requests | Session, HTTPAdapter, Retry |
| Webhook server | flask | Flask, request, jsonify |
| Date/time | datetime, zoneinfo | datetime, timezone, ZoneInfo, timedelta |
| Data validation | built-in | dataclass, Enum, type hints |
| Regex | re | compile, fullmatch, search, finditer |
| Hashing / audit | hashlib, hmac | sha256, new, compare_digest |
| XML / XBRL | xml.etree.ElementTree | Element, SubElement, tostring, indent |
| CSV output | csv | DictWriter, QUOTE_ALL |
| JSON | json | dumps, loads |
| Logging | logging | getLogger, basicConfig |

Common Pitfalls

  1. Naive datetimes in regulatory data. A datetime without timezone information is ambiguous. Always store timestamps as UTC-aware (datetime.now(tz=timezone.utc)). Naive datetimes will cause subtle, hard-to-debug comparison errors when records arrive from different time zones.

  2. Using random train/test split on time-series data. Standard sklearn.model_selection.train_test_split uses a random shuffle, which allows the model to learn from "future" data during training. Always use a temporal split — train on earlier data, test on later data.

  3. String comparison for regulatory identifiers. Comparing LEIs or ISINs with == on raw strings silently misses matches that differ only in case or in leading/trailing whitespace. Always normalize with .upper().strip() before comparison.

  4. Treating 0.0 and None as equivalent. In regulatory data, ownership_pct=0.0 (verified zero ownership) is legally distinct from ownership_pct=None (not yet collected). Do not coerce missing values to 0.

  5. Alert threshold as a technical decision. Setting the model score threshold that triggers an alert is a compliance and legal decision — not a data science optimization. The threshold determines what the institution "has reason to suspect" under the BSA and equivalent statutes. Document threshold decisions with compliance sign-off.

  6. Logging sensitive personal data. Logging full customer names, document numbers, or account details in plaintext violates GDPR Article 5 and most data protection laws. Log entity IDs and reference numbers only.

  7. Not validating checksums on regulatory identifiers. Accepting any 20-character string as an LEI, or any 12-character string as an ISIN, without checksum validation will cause reporting errors that can result in regulatory penalties.

  8. Mutable default arguments. Python evaluates default values once, so a function default like tags=[] shares a single list across every call, leading to subtle data corruption in batch processing. Dataclasses refuse bare mutable defaults with a ValueError at class definition time, which is why the models in this appendix use field(default_factory=list); keep that pattern rather than working around the error with None sentinels.

  9. Ignoring PSI drift until it is catastrophic. PSI monitoring should run on every model scoring cycle, not quarterly. A PSI that creeps from 0.05 to 0.20 over three months represents a model that is quietly degrading. Automate PSI alerts.

  10. Suppressing exceptions in scoring pipelines. Catching all exceptions and returning a default score (except Exception: return 0.0) silently converts system failures into "all-clear" outcomes — exactly the wrong behavior for a safety-critical compliance system. Log all failures, route to a dead-letter queue, and escalate persistent errors.
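
Pitfall 2 in code: a temporal split needs only a timestamp cutoff, not train_test_split. A sketch on synthetic time-ordered data (all names and the 80% cutoff are illustrative):

```python
import numpy as np

rng = np.random.default_rng(seed=42)
n = 1_000
timestamps = np.sort(rng.integers(0, 365, size=n))   # day offsets, time-ordered
X = rng.random((n, 4))                               # illustrative feature matrix
y = (rng.random(n) > 0.95).astype(int)               # rare positive class

# Train on the earliest 80% of the observation window, test on the rest
cutoff = np.quantile(timestamps, 0.8)
train_mask = timestamps <= cutoff
X_train, y_train = X[train_mask], y[train_mask]
X_test, y_test = X[~train_mask], y[~train_mask]

# Guarantee: no training observation is later than any test observation
assert timestamps[train_mask].max() < timestamps[~train_mask].min()
```

With ties at the cutoff value, the split ratio will not be exactly 80/20; what matters is that the boundary is temporal, not random.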