Appendix A: Python RegTech Reference
A practical coding reference for compliance professionals working with Python in RegTech contexts. This appendix collects the patterns, idioms, and snippets most relevant to the code examples in this textbook. It assumes you are comfortable writing Python and want quick, authoritative answers to RegTech-specific implementation questions — not a tutorial from first principles.
Requires: Python 3.10+. Optional dependencies noted per section.
A.1 Core Patterns
Dataclasses for Compliance Data Models
Dataclasses are the standard way to model compliance entities in this textbook. They provide typed fields, an automatic __repr__, and — when declared with @dataclass(slots=True) on Python 3.10+ — slot-based memory efficiency, all useful when processing large regulatory datasets. (The examples below omit slots=True for brevity; add it when instance counts are large.)
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from enum import Enum
class RiskTier(Enum):
    """Customer risk classification driving review frequency and EDD requirements."""
    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"
    VERY_HIGH = "VERY_HIGH"
    PROHIBITED = "PROHIBITED"  # No acceptable review cycle (see is_overdue_for_review)
class AlertStatus(Enum):
    """Lifecycle states of a compliance alert, from creation to disposition."""
    OPEN = "OPEN"
    UNDER_REVIEW = "UNDER_REVIEW"
    ESCALATED = "ESCALATED"
    CLOSED_SAR = "CLOSED_SAR"  # Closed with a SAR filed
    CLOSED_NO_ACTION = "CLOSED_NO_ACTION"  # Closed after review, no filing
@dataclass
class CustomerRecord:
    """Core KYC customer record."""
    customer_id: str
    full_name: str
    date_of_birth: str  # ISO 8601: YYYY-MM-DD
    nationality: str  # ISO 3166-1 alpha-2
    country_of_residence: str  # ISO 3166-1 alpha-2
    risk_tier: RiskTier
    onboarded_at: datetime
    last_reviewed_at: Optional[datetime]  # None if never reviewed
    lei: Optional[str] = None  # Legal Entity Identifier (corporate)
    is_pep: bool = False
    is_sanctioned: bool = False
    screening_reference: Optional[str] = None  # Link to sanctions check record
    tags: list[str] = field(default_factory=list)

    def requires_enhanced_due_diligence(self) -> bool:
        """Returns True if EDD is required under standard frameworks."""
        # PEP status triggers EDD regardless of tier.
        if self.is_pep:
            return True
        return self.risk_tier in {RiskTier.HIGH, RiskTier.VERY_HIGH}

    def is_overdue_for_review(self, as_of: datetime) -> bool:
        """
        Checks if the customer is overdue for periodic review.
        Review frequency is risk-tier dependent.
        """
        # Never reviewed → always due.
        if self.last_reviewed_at is None:
            return True
        # Maximum days allowed between reviews, keyed by tier.
        interval_days = {
            RiskTier.LOW: 365 * 3,    # 3 years
            RiskTier.MEDIUM: 365,     # 1 year
            RiskTier.HIGH: 180,       # 6 months
            RiskTier.VERY_HIGH: 90,   # 3 months
            RiskTier.PROHIBITED: 0,   # Always due
        }[self.risk_tier]
        elapsed_days = (as_of - self.last_reviewed_at).days
        return elapsed_days >= interval_days
@dataclass
class TransactionRecord:
    """A single financial transaction for monitoring purposes."""
    transaction_id: str
    customer_id: str
    # NOTE(review): float for monetary amounts risks rounding error; Decimal
    # would be safer for ledger-grade arithmetic — confirm downstream users.
    amount: float
    currency: str  # ISO 4217 code
    instrument: str  # CASH | WIRE | ACH | CARD | CRYPTO
    tx_type: str  # DEPOSIT | WITHDRAWAL | TRANSFER | PAYMENT
    timestamp: datetime
    destination_country: Optional[str]  # ISO 3166-1 alpha-2; None for domestic
    counterparty_id: Optional[str]
    reference: Optional[str]
    amount_usd: Optional[float] = None  # FX-converted amount; populated by pipeline
    # Computed velocity fields — populated by feature engineering pipeline
    # (see compute_velocity_features); defaults of 0 mean "not yet computed".
    velocity_24h: int = 0
    velocity_7d: int = 0
    amount_24h_usd: float = 0.0
    amount_30d_usd: float = 0.0
@dataclass
class ComplianceAlert:
    """A compliance alert generated by a monitoring system (rule-based or ML)."""
    alert_id: str
    customer_id: str
    transaction_id: Optional[str]  # None when not tied to a single transaction
    alert_type: str  # AML | SANCTIONS | KYC_OVERDUE | etc.
    risk_score: float  # 0.0–1.0
    status: AlertStatus
    generated_at: datetime
    assigned_to: Optional[str]  # Analyst ID
    rule_ids: list[str] = field(default_factory=list)  # Rules that contributed
    model_id: Optional[str] = None  # ML model that generated this
    narrative: str = ""
    closed_at: Optional[datetime] = None  # Set when status reaches a CLOSED_* state — TODO confirm
    sar_filed: bool = False
Enums for Regulatory Categories
Enums prevent string-comparison errors — one of the most common sources of bugs in regulatory reporting pipelines.
from enum import Enum, auto
class InstrumentType(Enum):
    """Payment/transaction instrument categories used in monitoring."""
    CASH = "CASH"
    WIRE = "WIRE"
    ACH = "ACH"
    CARD = "CARD"
    CRYPTO = "CRYPTO"
    CHECK = "CHECK"
    INTERNAL = "INTERNAL"  # Book transfer within the institution — TODO confirm
class SanctionsListType(Enum):
    """Sanctions list sources supported for screening."""
    OFAC_SDN = "OFAC_SDN"  # US OFAC Specially Designated Nationals
    OFAC_CONS = "OFAC_CONS"  # US OFAC consolidated (non-SDN) list
    EU_CONSOLIDATED = "EU_CONSOLIDATED"
    UN_CONSOLIDATED = "UN_CONSOLIDATED"
    HMT_UK = "HMT_UK"  # UK HM Treasury / OFSI list
    CUSTOM = "CUSTOM"  # Institution-maintained internal list
class DocumentType(Enum):
    """Identity and corporate documents accepted for KYC verification."""
    PASSPORT = "PASSPORT"
    NATIONAL_ID = "NATIONAL_ID"
    DRIVERS_LICENSE = "DRIVERS_LICENSE"
    RESIDENCE_PERMIT = "RESIDENCE_PERMIT"
    UTILITY_BILL = "UTILITY_BILL"  # Proof of address
    COMPANY_REG = "COMPANY_REG"
    ARTICLES_OF_INCORP = "ARTICLES_OF_INCORP"
class FilingStatus(Enum):
    """Status of a regulatory filing (SAR, CTR, STR, etc.)."""
    DRAFT = "DRAFT"
    PENDING_APPROVAL = "PENDING_APPROVAL"
    APPROVED = "APPROVED"
    FILED = "FILED"
    ACKNOWLEDGED = "ACKNOWLEDGED"  # presumably regulator confirmed receipt — verify workflow
    REJECTED = "REJECTED"
    AMENDED = "AMENDED"
# Pattern: use match/case with enums for clean dispatch
def get_review_period_days(tier: RiskTier) -> int:
    """Return the periodic-review interval in days for a risk tier.

    Mirrors the interval table in CustomerRecord.is_overdue_for_review
    (1095 = 365 * 3). The wildcard arm is defensive: it fires only for
    non-RiskTier values.
    """
    match tier:
        case RiskTier.LOW:
            return 1095  # 3 years
        case RiskTier.MEDIUM:
            return 365
        case RiskTier.HIGH:
            return 180
        case RiskTier.VERY_HIGH:
            return 90
        case RiskTier.PROHIBITED:
            return 0
        case _:
            raise ValueError(f"Unknown risk tier: {tier}")
Type Hints and Optional Fields
Regulatory data frequently has fields that are required for some customer types and absent for others. Handle this explicitly — do not treat missing and unknown the same way.
from typing import Optional, Union, Literal
from dataclasses import dataclass
# PATTERN: distinguish "not provided" from "confirmed absent"
# - Optional[str] = None means we don't know (value may exist but wasn't collected)
# - "" (empty string) is ambiguous — avoid it for regulatory fields
# - An explicit sentinel makes the state machine-readable and auditable
NOT_COLLECTED = "NOT_COLLECTED"    # Value has not yet been requested/obtained
CONFIRMED_NONE = "CONFIRMED_NONE"  # Value was checked and is genuinely absent
@dataclass
class BeneficialOwnerRecord:
    """
    Beneficial owner for corporate KYC.

    Fields that are Optional may be None because:
      (a) the data was not collected yet, or
      (b) the field genuinely does not apply to this entity type.
    Use ownership_pct = None for individuals, NOT 0.0 (which implies 0% ownership).
    """
    owner_id: str
    full_name: str
    nationality: str
    ownership_pct: Optional[float]  # None for control persons without equity
    is_control_person: bool  # NOTE(review): presumably control via means other than equity — confirm
    date_of_birth: Optional[str]  # None if not yet collected
    document_type: Optional[DocumentType]
    document_number: Optional[str]
    document_expiry: Optional[str]  # presumably ISO 8601 date — TODO confirm format
    verification_status: Literal[
        "NOT_STARTED", "IN_PROGRESS", "VERIFIED", "FAILED", "EXPIRED"
    ]
# PATTERN: typed helper for loading from raw dicts with safe defaults
def parse_optional_float(value: object) -> Optional[float]:
"""
Convert a raw value to Optional[float].
Returns None for None, empty strings, and non-numeric values.
Raises ValueError for strings that look like numbers but can't be parsed.
"""
if value is None:
return None
if isinstance(value, float | int):
return float(value)
if isinstance(value, str):
stripped = value.strip()
if stripped == "" or stripped.lower() in {"null", "none", "n/a", "na"}:
return None
return float(stripped) # Deliberately raises ValueError for bad data
raise TypeError(f"Cannot parse {type(value).__name__} as float")
# Usage (module-level examples; the second assignment overwrites pct)
raw_ownership = {"pct": "24.5"}
pct = parse_optional_float(raw_ownership.get("pct"))  # → 24.5
raw_missing = {"pct": None}
pct = parse_optional_float(raw_missing.get("pct"))  # → None
A.2 Data Quality and Validation
Validating Regulatory Identifiers
LEI (Legal Entity Identifier) — ISO 17442
The LEI is a 20-character alphanumeric code. Characters 19–20 are a MOD 97-10 checksum (same algorithm as IBAN).
import re
def validate_lei(lei: str) -> bool:
"""
Validate a Legal Entity Identifier (LEI) per ISO 17442.
Structure:
Characters 1-4: LOU (Local Operating Unit) prefix
Characters 5-18: Entity-specific code
Characters 19-20: Two-digit MOD 97-10 checksum
Returns True if the LEI is structurally valid.
This does NOT confirm the LEI is registered — use the GLEIF API for that.
"""
if not isinstance(lei, str):
return False
lei = lei.upper().strip()
# Must be exactly 20 alphanumeric characters
if not re.fullmatch(r'[A-Z0-9]{18}[0-9]{2}', lei):
return False
# MOD 97-10 checksum (ISO 7064)
# Convert letters to digits: A=10, B=11, ..., Z=35
numeric = ""
for ch in lei:
if ch.isdigit():
numeric += ch
else:
numeric += str(ord(ch) - ord('A') + 10)
return int(numeric) % 97 == 1
# Examples — run at import time as lightweight self-tests
assert validate_lei("2138004YJJDDEJN2B130") is True  # HSBC UK — valid
assert validate_lei("2138004YJJDDEJN2B131") is False  # Bad checksum
assert validate_lei("INVALID") is False
def validate_isin(isin: str) -> bool:
    """
    Validate an ISIN (International Securities Identification Number) per ISO 6166.

    Structure:
        Characters 1-2:  Country code (ISO 3166-1 alpha-2)
        Characters 3-11: National security identifier (9 chars)
        Character 12:    Luhn check digit

    Returns True if structurally valid.
    """
    if not isinstance(isin, str):
        return False
    candidate = isin.upper().strip()
    if not re.fullmatch(r'[A-Z]{2}[A-Z0-9]{9}[0-9]', candidate):
        return False
    # Expand letters to two digits (A=10 ... Z=35); the final check digit
    # is appended unexpanded, then the whole string is Luhn-checked.
    body = "".join(
        ch if ch.isdigit() else str(ord(ch) - ord('A') + 10)
        for ch in candidate[:-1]
    ) + candidate[-1]
    checksum = 0
    for position, ch in enumerate(reversed(body)):
        digit = int(ch)
        # Double every second digit from the right; 9-subtract carries.
        if position % 2 == 1:
            digit *= 2
            if digit > 9:
                digit -= 9
        checksum += digit
    return checksum % 10 == 0
def validate_bic(bic: str) -> bool:
    """
    Validate a BIC/SWIFT code per ISO 9362:2022.

    Structure:
        4 chars: Institution code (alpha)
        2 chars: Country code (alpha, ISO 3166-1)
        2 chars: Location code (alphanumeric)
        3 chars: Branch code (optional; 'XXX' = primary office)
    """
    if not isinstance(bic, str):
        return False
    normalized = bic.upper().strip()
    # 8 characters (no branch) or 11 (with branch); nothing in between.
    pattern = r'[A-Z]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?'
    return re.fullmatch(pattern, normalized) is not None
def validate_mic(mic: str) -> bool:
    """
    Validate a Market Identifier Code per ISO 10383.
    A MIC is exactly 4 uppercase alphabetic characters.
    """
    if not isinstance(mic, str):
        return False
    candidate = mic.upper().strip()
    # Regex (not str.isalpha) so only ASCII A–Z is accepted.
    return re.fullmatch(r'[A-Z]{4}', candidate) is not None
Date and Time in Regulatory Contexts
Regulatory systems must be precise about time zones. A transaction timestamped "2024-03-15 23:58:00" means different things in Tokyo, London, and New York. Always store and process datetimes as timezone-aware.
from datetime import datetime, timezone, timedelta, date
from zoneinfo import ZoneInfo
import re
# RULE: always store timestamps in UTC; convert to local time only for display
def utc_now() -> datetime:
    """Current moment as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)
def to_utc(dt: datetime, source_tz: str) -> datetime:
    """
    Convert a naive datetime from a known timezone to UTC.

    source_tz: IANA timezone name, e.g. "America/New_York", "Europe/London".
    Raises ValueError if `dt` already carries tzinfo.
    """
    if dt.tzinfo is not None:
        raise ValueError("datetime is already timezone-aware; use astimezone(timezone.utc)")
    # Attach the source zone, then shift to UTC.
    return dt.replace(tzinfo=ZoneInfo(source_tz)).astimezone(timezone.utc)
def parse_iso8601(s: str) -> datetime:
    """
    Parse an ISO 8601 datetime string to a timezone-aware datetime.
    Accepts: '2024-03-15T14:30:00Z', '2024-03-15T14:30:00+00:00', '2024-03-15'
    Raises ValueError for timestamps that carry no timezone information.
    """
    text = s.strip()
    # Bare date → midnight UTC.
    if re.fullmatch(r'\d{4}-\d{2}-\d{2}', text):
        return datetime.fromisoformat(text).replace(tzinfo=timezone.utc)
    # Normalize a trailing 'Z' (Zulu) suffix to an explicit UTC offset.
    normalized = text[:-1] + '+00:00' if text.endswith('Z') else text
    parsed = datetime.fromisoformat(normalized)
    if parsed.tzinfo is None:
        raise ValueError(f"Ambiguous timestamp (no timezone): {normalized!r}")
    return parsed
def calculate_regulatory_deadline(
    base_date: date,
    calendar_days: int,
    exclude_weekends: bool = False,
    reference: str = ""
) -> date:
    """
    Calculate a regulatory deadline from a base date.

    Most regulatory deadlines are expressed in calendar days (not business
    days), but some (e.g., certain MiFID II reporting windows) exclude
    weekends. Always confirm with the specific regulation.

    Args:
        base_date: The event date the clock starts from.
        calendar_days: Number of days allowed.
        exclude_weekends: If True, skip Saturdays and Sundays.
        reference: Regulatory citation (for audit trail; not used in calc).

    Returns: The deadline date.
    """
    if not exclude_weekends:
        return base_date + timedelta(days=calendar_days)
    # Business-day counting: advance one day at a time, counting only Mon–Fri.
    deadline = base_date
    remaining = calendar_days
    one_day = timedelta(days=1)
    while remaining > 0:
        deadline += one_day
        if deadline.weekday() < 5:  # Monday=0 ... Friday=4
            remaining -= 1
    return deadline
# Common regulatory deadlines
def sar_filing_deadline(detection_date: date) -> date:
    """US BSA/FinCEN: SAR must be filed within 30 calendar days of detection."""
    # Calendar days — weekends count toward the deadline.
    return calculate_regulatory_deadline(detection_date, 30)
def ctr_filing_deadline(transaction_date: date) -> date:
    """US CTR: must be filed within 15 calendar days of the transaction."""
    # Calendar days — weekends count toward the deadline.
    return calculate_regulatory_deadline(transaction_date, 15)
def mifid_trade_report_deadline(trade_date: date) -> date:
    """MiFID II: trade reports due by end of next business day (T+1)."""
    # Business days only — Saturdays/Sundays are skipped.
    return calculate_regulatory_deadline(trade_date, 1, exclude_weekends=True)
Missing Data Patterns
from dataclasses import dataclass
from typing import Optional
import pandas as pd
# PATTERN: validate required vs optional fields at ingestion boundary
def validate_customer_fields(record: dict) -> tuple[bool, list[str]]:
    """
    Validate a raw customer record at the ingestion boundary.
    Returns (is_valid, list_of_errors).

    Required fields must be present and non-empty.
    Optional fields may be None but must be the correct type if present.
    """
    required: dict[str, type] = {
        "customer_id": str,
        "full_name": str,
        "date_of_birth": str,
        "nationality": str,
        "country_of_residence": str,
    }
    optional: dict[str, type] = {
        "lei": str,
        "tax_id": str,
        "phone": str,
        "email": str,
    }
    errors: list[str] = []
    for name, expected in required.items():
        value = record.get(name)
        # Whitespace-only strings count as empty.
        is_blank = isinstance(value, str) and not value.strip()
        if value is None or is_blank:
            errors.append(f"Required field missing or empty: {name!r}")
        elif not isinstance(value, expected):
            errors.append(
                f"Field {name!r}: expected {expected.__name__}, "
                f"got {type(value).__name__}"
            )
    for name, expected in optional.items():
        value = record.get(name)
        if value is not None and not isinstance(value, expected):
            errors.append(
                f"Optional field {name!r}: expected {expected.__name__} or None, "
                f"got {type(value).__name__}"
            )
    return not errors, errors
# PATTERN: pandas — tracking missingness in regulatory DataFrames
def audit_missing_data(df: pd.DataFrame, required_cols: list[str]) -> pd.DataFrame:
    """
    Produce a missingness audit report for a regulatory DataFrame.

    Returns one row per column with null counts, empty-string counts (object
    columns only), combined totals and percentages, sorted so that columns
    needing attention come first.
    Dependencies: pandas
    """
    total = len(df)
    rows = []
    for col in df.columns:
        nulls = int(df[col].isna().sum())
        # Empty strings only make sense for object-typed columns.
        empties = int((df[col] == "").sum()) if df[col].dtype == object else 0
        missing = nulls + empties
        pct = round(missing / total * 100, 2) if total > 0 else 0.0
        rows.append({
            "column": col,
            "is_required": col in required_cols,
            "total_rows": total,
            "null_count": nulls,
            "empty_count": empties,
            "missing_total": missing,
            "missing_pct": pct,
        })
    report = pd.DataFrame(rows)
    # Any missingness in a required column is a compliance concern.
    report["requires_attention"] = (
        report["is_required"] & (report["missing_total"] > 0)
    )
    return report.sort_values(
        ["requires_attention", "missing_pct"], ascending=[False, False]
    )
A.3 Financial Crime Patterns
Transaction Feature Engineering
Dependencies: pandas, numpy
import pandas as pd
import numpy as np
from datetime import timedelta
def compute_velocity_features(
transactions: pd.DataFrame,
customer_id_col: str = "customer_id",
amount_col: str = "amount_usd",
timestamp_col: str = "timestamp",
windows: list[int] | None = None,
) -> pd.DataFrame:
"""
Compute rolling velocity features for AML transaction monitoring.
For each transaction, calculates the count and total amount of the same
customer's transactions within lookback windows ending at (but not including)
the current transaction.
Args:
transactions: DataFrame sorted by timestamp (oldest first).
windows: Lookback windows in hours. Default: [24, 168, 720]
(1 day, 7 days, 30 days).
Returns: transactions DataFrame with added velocity columns.
Note: This implementation uses a merge-asof approach for large DataFrames.
For real-time scoring, use a rolling window cache (Redis, DynamoDB) instead.
"""
if windows is None:
windows = [24, 168, 720]
df = transactions.copy()
df[timestamp_col] = pd.to_datetime(df[timestamp_col], utc=True)
df = df.sort_values([customer_id_col, timestamp_col]).reset_index(drop=True)
for window_hours in windows:
window_td = pd.Timedelta(hours=window_hours)
col_count = f"velocity_{window_hours}h_count"
col_amount = f"velocity_{window_hours}h_amount"
counts = []
amounts = []
# Group by customer for efficiency
for _, group in df.groupby(customer_id_col, sort=False):
timestamps = group[timestamp_col].values
amts = group[amount_col].values
group_counts = []
group_amounts = []
for i in range(len(group)):
ts = group.iloc[i][timestamp_col]
cutoff = ts - window_td
# Prior transactions within window (exclude current)
prior_mask = (group[timestamp_col] >= cutoff) & (group[timestamp_col] < ts)
group_counts.append(int(prior_mask.sum()))
group_amounts.append(float(group.loc[prior_mask, amount_col].sum()))
counts.extend(group_counts)
amounts.extend(group_amounts)
df[col_count] = counts
df[col_amount] = amounts
return df
def compute_amount_percentile(
    transactions: pd.DataFrame,
    customer_id_col: str = "customer_id",
    amount_col: str = "amount_usd",
    history_df: pd.DataFrame | None = None,
) -> pd.DataFrame:
    """
    Score each transaction's amount against that customer's historical distribution.

    Adds an 'amount_percentile' column: the fraction of the customer's
    reference amounts strictly below the transaction amount (0.0 = lowest;
    ties do not count toward the percentile). Customers with fewer than 5
    reference rows receive NaN (insufficient history to rank against).

    If history_df is provided, percentile is computed against historical data
    (not the current batch) — important for avoiding look-ahead bias.
    """
    df = transactions.copy()
    # Reference distribution: external history if given, else the batch itself.
    base = history_df if history_df is not None else df
    def pct_rank(group_df: pd.DataFrame, ref_df: pd.DataFrame) -> pd.Series:
        # All rows of group_df share one customer id (groupby invariant).
        cid = group_df[customer_id_col].iloc[0]
        ref_amounts = ref_df[ref_df[customer_id_col] == cid][amount_col]
        if len(ref_amounts) < 5:
            return pd.Series([np.nan] * len(group_df), index=group_df.index)
        return group_df[amount_col].apply(
            lambda x: float((ref_amounts < x).mean())
        )
    # NOTE(review): DataFrameGroupBy.apply over frames that include the
    # grouping column emits a DeprecationWarning on pandas >= 2.2 — confirm
    # the pinned pandas version before relying on this in production.
    df["amount_percentile"] = (
        df.groupby(customer_id_col, group_keys=False)
        .apply(lambda g: pct_rank(g, base))
    )
    return df
Scoring and Thresholding
from dataclasses import dataclass, field
from typing import Callable
import logging

logger = logging.getLogger(__name__)


@dataclass
class ScoringRule:
    """A single rule contributing to an aggregate risk score."""
    rule_id: str
    description: str
    weight: float  # Contribution weight (0.0–1.0)
    evaluator: Callable[..., float]  # Returns 0.0–1.0
    category: str  # VELOCITY | GEOGRAPHY | AMOUNT | BEHAVIOR


@dataclass
class ScoringConfig:
    """
    Threshold configuration for a scoring system.

    Threshold values should be reviewed and approved by compliance leadership.
    Changes should be documented in the model governance log.
    """
    alert_threshold: float = 0.65        # Score at or above this → alert
    sar_threshold: float = 0.85          # Score at or above this → SAR review
    suppression_threshold: float = 0.30  # Score below this → suppress alert
    version: str = "1.0"
    approved_by: str = ""
    approved_at: str = ""


class RiskScorer:
    """
    Configurable risk scoring engine for compliance monitoring.

    Design principles:
    - Weights must sum to 1.0 across all active rules.
    - Scores are normalized to [0.0, 1.0].
    - All score components are logged for audit purposes.
    """

    def __init__(self, rules: list[ScoringRule], config: ScoringConfig):
        self.rules = rules
        self.config = config
        self._validate_weights()

    def _validate_weights(self) -> None:
        """Reject rule sets whose weights do not sum to 1.0 (±0.001 tolerance)."""
        total = sum(r.weight for r in self.rules)
        if not (0.999 <= total <= 1.001):
            raise ValueError(
                f"Rule weights must sum to 1.0; got {total:.4f}. "
                f"Adjust weights before deploying."
            )

    def score(self, record: dict) -> dict:
        """
        Score a transaction or customer record.

        Returns a dict with:
            - 'score': float (0.0–1.0), rounded to 4 decimal places
            - 'components': list of per-rule dicts sharing one schema
              (rule_id, description, raw_score, weight, contribution,
              category), plus an 'error' key when a rule raised
            - 'recommendation': SAR_REVIEW | ALERT | SUPPRESS | QUEUE
            - 'threshold_version': str
        """
        components = []
        total_score = 0.0
        for rule in self.rules:
            try:
                raw = float(rule.evaluator(record))
                raw = max(0.0, min(1.0, raw))  # Clamp to [0, 1]
                contribution = raw * rule.weight
                total_score += contribution
                components.append({
                    "rule_id": rule.rule_id,
                    "description": rule.description,
                    "raw_score": round(raw, 4),
                    "weight": rule.weight,
                    "contribution": round(contribution, 4),
                    "category": rule.category,
                })
            except Exception as exc:
                logger.error(
                    "Rule %s failed on record %s: %s",
                    rule.rule_id,
                    record.get("transaction_id", "UNKNOWN"),
                    exc,
                )
                # Fail-safe: a failed rule contributes zero. BUGFIX: include
                # 'description' and 'category' so every component shares the
                # same schema whether or not the rule succeeded.
                components.append({
                    "rule_id": rule.rule_id,
                    "description": rule.description,
                    "raw_score": 0.0,
                    "weight": rule.weight,
                    "contribution": 0.0,
                    "category": rule.category,
                    "error": str(exc),
                })
        total_score = round(total_score, 4)
        if total_score >= self.config.sar_threshold:
            recommendation = "SAR_REVIEW"
        elif total_score >= self.config.alert_threshold:
            recommendation = "ALERT"
        elif total_score <= self.config.suppression_threshold:
            recommendation = "SUPPRESS"
        else:
            recommendation = "QUEUE"
        return {
            "score": total_score,
            "components": components,
            "recommendation": recommendation,
            "threshold_version": self.config.version,
        }
Alert Deduplication
Financial institutions often run multiple monitoring systems that may generate overlapping alerts for the same underlying activity.
from dataclasses import dataclass
from datetime import datetime, timedelta
import hashlib
import json
@dataclass
class AlertSignature:
    """Key fields used to identify duplicate alerts.

    NOTE(review): documentation-only in this appendix — the deduplication
    below operates on fingerprint strings, not on this dataclass.
    """
    customer_id: str
    alert_type: str
    primary_transaction_id: str | None
def compute_alert_fingerprint(
customer_id: str,
alert_type: str,
transaction_id: str | None,
amount_bucket: str, # Bucketed, not exact, to catch near-duplicates
) -> str:
"""
Compute a deterministic fingerprint for duplicate detection.
Two alerts with the same fingerprint are candidates for deduplication.
"""
payload = {
"customer_id": customer_id,
"alert_type": alert_type,
"transaction_id": transaction_id or "",
"amount_bucket": amount_bucket,
}
canonical = json.dumps(payload, sort_keys=True)
return hashlib.sha256(canonical.encode()).hexdigest()[:16]
def bucket_amount(amount_usd: float) -> str:
    """
    Bucket a dollar amount for fingerprinting purposes.
    This allows near-duplicate detection across amounts that differ by rounding.
    """
    # Snap to the nearest $500 (Python's round: half-to-even).
    nearest_500 = 500 * round(amount_usd / 500)
    return f"${nearest_500:,.0f}"
def deduplicate_alerts(
    alerts: list[dict],
    lookback_hours: int = 24,
    as_of: datetime | None = None,
) -> tuple[list[dict], list[dict]]:
    """
    Identify duplicate alerts across monitoring systems.

    Deduplication logic:
      - Same customer + alert type + transaction ID within lookback window → duplicate
      - Where transaction ID is missing, also match on amount bucket

    Args:
        alerts: Alert dicts with at least alert_id, customer_id, alert_type,
            generated_at (datetime or ISO 8601 string); optional
            transaction_id and amount_usd (defaults to 0.0 for bucketing).
        lookback_hours: Alerts older than this window are passed through as
            unique without fingerprinting.
        as_of: Reference "now"; defaults to the current UTC time (aware).

    Returns: (unique_alerts, duplicate_alerts). Each duplicate gains a
        'duplicate_of' key pointing at the first matching alert's alert_id.

    Note: This is a simple in-memory deduplicator for small batches.
    Production systems use a distributed cache (Redis) or database index.
    """
    # BUGFIX: the original reached timezone.utc via an __import__("datetime")
    # hack because this section's header imports only datetime/timedelta.
    from datetime import timezone

    as_of = as_of or datetime.now(tz=timezone.utc)
    cutoff = as_of - timedelta(hours=lookback_hours)
    seen_fingerprints: dict[str, str] = {}  # fingerprint → first alert_id
    unique: list[dict] = []
    duplicates: list[dict] = []
    for alert in sorted(alerts, key=lambda a: a["generated_at"]):
        generated_at = alert["generated_at"]
        if isinstance(generated_at, str):
            generated_at = datetime.fromisoformat(generated_at)
        # NOTE(review): comparing a naive generated_at against the aware
        # cutoff raises TypeError — callers must supply aware timestamps.
        if generated_at < cutoff:
            unique.append(alert)  # Outside window — treat as unique
            continue
        fp = compute_alert_fingerprint(
            customer_id=alert["customer_id"],
            alert_type=alert["alert_type"],
            transaction_id=alert.get("transaction_id"),
            amount_bucket=bucket_amount(alert.get("amount_usd", 0.0)),
        )
        if fp in seen_fingerprints:
            alert["duplicate_of"] = seen_fingerprints[fp]
            duplicates.append(alert)
        else:
            seen_fingerprints[fp] = alert["alert_id"]
            unique.append(alert)
    return unique, duplicates
A.4 Regulatory Reporting
Data Transformation Patterns
Dependencies: pandas
import pandas as pd
from datetime import datetime
# Field mapping: internal column name → regulatory submission field name.
# Keys are this codebase's canonical names; values are the names required
# by the receiving regime's schema (consumed by apply_field_mapping below).
MIFID_FIELD_MAP = {
    "transaction_id": "TransactionIdentification",
    "timestamp": "ExecutionTimestamp",
    "instrument_isin": "ISIN",
    "amount": "Quantity",
    "price": "Price",
    "currency": "CurrencyCode",
    "trader_id": "TraderId",
    "venue_mic": "TradingVenue",
    "counterparty_lei": "CounterpartyLEI",
    "buy_sell": "BuySellIndicator",
}

# US BSA CTR (Currency Transaction Report) style field names.
BSA_CTR_FIELD_MAP = {
    "transaction_id": "TRAN_ID",
    "customer_id": "PERSON_ID",
    "full_name": "PERSON_NAME",
    "amount": "CASH_AMT",
    "instrument": "TRAN_TYPE",
    "timestamp": "TRAN_DATE",
    "account_number": "ACCT_NUM",
    "branch_id": "BRANCH_NUM",
}
def apply_field_mapping(
    df: pd.DataFrame,
    field_map: dict[str, str],
    drop_unmapped: bool = True,
) -> pd.DataFrame:
    """
    Rename DataFrame columns for regulatory submission.

    Args:
        drop_unmapped: If True (default), drop columns not in the mapping.
            Set False to retain all columns (useful for debugging).
    """
    # Restrict the mapping to columns actually present in the DataFrame.
    applicable = {
        internal: external
        for internal, external in field_map.items()
        if internal in df.columns
    }
    renamed = df.rename(columns=applicable)
    if not drop_unmapped:
        return renamed
    return renamed[list(applicable.values())]
def coerce_regulatory_types(df: pd.DataFrame, schema: dict[str, str]) -> pd.DataFrame:
    """
    Coerce DataFrame columns to types required by a regulatory schema.

    schema: dict mapping column name → type spec.
    Type specs:
        'str'               — stringify and strip surrounding whitespace
        'float'             — numeric; unparseable values become NaN
        'int'               — nullable pandas Int64; unparseable become <NA>
        'date:<fmt>'        — <fmt> is a strftime format, e.g. 'date:%Y-%m-%d'
        'datetime:ISO8601'  — UTC ISO 8601 with a trailing 'Z'

    Note: the text after 'date:' is passed directly to strftime, so it must
    be a %-style format string ('date:%Y-%m-%d'), NOT a pattern like
    'YYYY-MM-DD' (which would be emitted literally). Unknown type specs are
    silently left unmodified.

    Returns a copy of df with types coerced.
    Raises ValueError if a required column is missing.
    """
    df = df.copy()
    for col, type_spec in schema.items():
        if col not in df.columns:
            raise ValueError(f"Required column missing from DataFrame: {col!r}")
        if type_spec == "str":
            df[col] = df[col].astype(str).str.strip()
        elif type_spec == "float":
            df[col] = pd.to_numeric(df[col], errors="coerce")
        elif type_spec == "int":
            df[col] = pd.to_numeric(df[col], errors="coerce").astype("Int64")
        elif type_spec.startswith("date:"):
            fmt = type_spec.split(":", 1)[1]
            df[col] = pd.to_datetime(df[col]).dt.strftime(fmt)
        elif type_spec == "datetime:ISO8601":
            df[col] = pd.to_datetime(df[col], utc=True).dt.strftime(
                "%Y-%m-%dT%H:%M:%SZ"
            )
    return df
XBRL Basic Output
XBRL (eXtensible Business Reporting Language) is used for structured financial regulatory reporting, including COREP/FINREP under EU CRD and Solvency II reporting. The example below generates an inline XBRL stub.
from xml.etree.ElementTree import Element, SubElement, tostring, indent
import xml.etree.ElementTree as ET
from datetime import date
# XBRL namespaces (simplified — full submissions require taxonomy imports)
XBRL_NAMESPACES = {
"xbrli": "http://www.xbrl.org/2003/instance",
"link": "http://www.xbrl.org/2003/linkbase",
"xlink": "http://www.w3.org/1999/xlink",
"iso4217": "http://www.xbrl.org/2003/iso4217",
"corep": "http://www.eba.europa.eu/xbrl/crr/dict/met",
}
def generate_xbrl_report(
entity_lei: str,
reporting_date: date,
facts: dict[str, tuple[float, str]], # metric → (value, unit)
period_start: date | None = None,
) -> str:
"""
Generate a minimal XBRL instance document for a regulatory report.
Args:
entity_lei: Reporting entity's LEI.
reporting_date: The period end date.
facts: Dict mapping XBRL concept name to (value, ISO 4217 currency).
Example: {"corep:OwnFunds": (5_000_000.0, "EUR")}
period_start: Period start date for duration contexts (optional).
Returns: XBRL XML as a string.
NOTE: This is a structural template. Actual regulatory submissions require
taxonomy-specific element names, dimensional coordinates, and schema imports
that vary by jurisdiction and report type. Always validate against the
relevant taxonomy before submission.
"""
# Build namespace map
ns_map = {k: v for k, v in XBRL_NAMESPACES.items()}
root = Element("xbrli:xbrl")
for prefix, uri in ns_map.items():
root.set(f"xmlns:{prefix}", uri)
# Context: instant (for balance sheet items)
ctx_instant = SubElement(root, "xbrli:context", id="ctx_instant")
entity = SubElement(ctx_instant, "xbrli:entity")
identifier = SubElement(entity, "xbrli:identifier", scheme="http://standards.iso.org/iso/17442")
identifier.text = entity_lei
period = SubElement(ctx_instant, "xbrli:period")
instant_el = SubElement(period, "xbrli:instant")
instant_el.text = reporting_date.isoformat()
# Context: duration (for P&L items)
if period_start:
ctx_duration = SubElement(root, "xbrli:context", id="ctx_duration")
entity2 = SubElement(ctx_duration, "xbrli:entity")
identifier2 = SubElement(entity2, "xbrli:identifier",
scheme="http://standards.iso.org/iso/17442")
identifier2.text = entity_lei
period2 = SubElement(ctx_duration, "xbrli:period")
start_el = SubElement(period2, "xbrli:startDate")
start_el.text = period_start.isoformat()
end_el = SubElement(period2, "xbrli:endDate")
end_el.text = reporting_date.isoformat()
# Units
seen_units: set[str] = set()
for _, (_, unit) in facts.items():
if unit not in seen_units:
unit_el = SubElement(root, "xbrli:unit", id=f"unit_{unit}")
measure = SubElement(unit_el, "xbrli:measure")
measure.text = f"iso4217:{unit}"
seen_units.add(unit)
# Facts
for concept, (value, unit) in facts.items():
fact_el = SubElement(root, concept)
fact_el.set("contextRef", "ctx_instant")
fact_el.set("unitRef", f"unit_{unit}")
fact_el.set("decimals", "2")
fact_el.text = f"{value:.2f}"
indent(root, space=" ")
return '<?xml version="1.0" encoding="UTF-8"?>\n' + tostring(root, encoding="unicode")
# Usage example — runs only when this module is executed directly.
if __name__ == "__main__":
    xml_output = generate_xbrl_report(
        entity_lei="2138004YJJDDEJN2B130",
        reporting_date=date(2024, 12, 31),
        period_start=date(2024, 1, 1),
        facts={
            "corep:OwnFunds": (125_000_000.0, "EUR"),
            "corep:Tier1Capital": (95_000_000.0, "EUR"),
            "corep:TotalRiskExposure": (980_000_000.0, "EUR"),
        },
    )
    print(xml_output[:800])  # First 800 chars for illustration
CSV Generation for Regulatory Submissions
import csv
import io
from datetime import date, datetime
from typing import Any
def write_regulatory_csv(
records: list[dict],
field_order: list[str],
output_path: str | None = None,
date_format: str = "%Y-%m-%d",
datetime_format: str = "%Y-%m-%dT%H:%M:%SZ",
encoding: str = "utf-8-sig", # utf-8-sig adds BOM for Excel compatibility
delimiter: str = ",",
quoting: int = csv.QUOTE_ALL, # Quote all fields for regulatory safety
) -> str:
"""
Write a regulatory submission CSV with consistent formatting.
Key differences from general-purpose CSV:
- All fields quoted (avoids ambiguity with commas in names/references)
- Dates formatted per the specific regulation's requirements
- BOM included for Windows/Excel compatibility (common in regulatory portals)
- UTF-8 encoding (required by most modern regulatory systems)
- No trailing newline issues
Returns the CSV as a string. If output_path is provided, also writes the file.
"""
buffer = io.StringIO()
writer = csv.DictWriter(
buffer,
fieldnames=field_order,
delimiter=delimiter,
quoting=quoting,
extrasaction="ignore", # Ignore extra keys not in field_order
lineterminator="\r\n", # CRLF: required by many regulatory portals
)
writer.writeheader()
for record in records:
row: dict[str, Any] = {}
for field in field_order:
value = record.get(field)
if isinstance(value, datetime):
row[field] = value.strftime(datetime_format)
elif isinstance(value, date):
row[field] = value.strftime(date_format)
elif value is None:
row[field] = ""
elif isinstance(value, float):
row[field] = f"{value:.2f}"
else:
row[field] = str(value)
writer.writerow(row)
content = buffer.getvalue()
if output_path:
with open(output_path, "w", encoding=encoding, newline="") as f:
f.write(content)
return content
A.5 NLP for Regulatory Text
Regex Patterns for Regulatory Documents
import re
from typing import Iterator
# ── Date extraction ───────────────────────────────────────────────────────────
# ISO 8601 dates
DATE_ISO = re.compile(r'\b(\d{4})-(\d{2})-(\d{2})\b')
# Written dates: "15 March 2024", "March 15, 2024", "15th March 2024"
DATE_WRITTEN = re.compile(
    r'\b(\d{1,2})(?:st|nd|rd|th)?\s+'
    r'(January|February|March|April|May|June|July|August|'
    r'September|October|November|December)\s+(\d{4})\b'
    r'|'
    r'\b(January|February|March|April|May|June|July|August|'
    r'September|October|November|December)\s+(\d{1,2})(?:st|nd|rd|th)?,?\s+(\d{4})\b',
    re.IGNORECASE
)
# Quarters: Q1 2024, Q3/2024
DATE_QUARTER = re.compile(r'\bQ([1-4])\s*[/\-]?\s*(\d{4})\b', re.IGNORECASE)

# ── Monetary amounts ──────────────────────────────────────────────────────────
# "$1,234,567.89", "USD 1.2m", "EUR 50bn", "£10,000"
AMOUNT_PATTERN = re.compile(
    r'(?:'
    r'(?P<symbol>[$£€¥])\s*'
    r'|(?P<currency>[A-Z]{3})\s+'
    r')'
    r'(?P<amount>[\d,]+(?:\.\d+)?)'
    r'(?:\s*(?P<multiplier>[kmb](?:illion|n)?))?\b',
    re.IGNORECASE
)


def extract_monetary_amounts(text: str) -> list[dict]:
    """
    Extract monetary amounts from regulatory text.

    Returns a list of dicts, one per match, each with keys:
        raw        -- the matched substring as it appeared in the text
        amount     -- numeric value with any k/m/bn multiplier applied
        currency   -- ISO 4217 code (always uppercase), or "UNKNOWN"
        multiplier -- the numeric multiplier that was applied (1 if none)
    """
    MULTIPLIERS = {"k": 1_000, "m": 1_000_000, "mn": 1_000_000,
                   "b": 1_000_000_000, "bn": 1_000_000_000,
                   "billion": 1_000_000_000, "million": 1_000_000}
    SYMBOLS = {"$": "USD", "£": "GBP", "€": "EUR", "¥": "JPY"}
    results = []
    for match in AMOUNT_PATTERN.finditer(text):
        raw_amount = float(match.group("amount").replace(",", ""))
        multiplier_str = (match.group("multiplier") or "").lower()
        multiplier = MULTIPLIERS.get(multiplier_str, 1)
        actual_amount = raw_amount * multiplier
        symbol = match.group("symbol") or ""
        # AMOUNT_PATTERN is case-insensitive, so a textual currency code may
        # have been matched in lowercase ("usd 1.2m"); normalize to uppercase
        # so downstream currency lookups behave consistently.
        code = match.group("currency")
        currency = SYMBOLS.get(symbol) or (code.upper() if code else "UNKNOWN")
        results.append({
            "raw": match.group(0),
            "amount": actual_amount,
            "currency": currency,
            "multiplier": multiplier,
        })
    return results
# ── Regulatory references ─────────────────────────────────────────────────────
# Article references: "Article 9", "Art. 14(3)(b)", "Articles 9 to 15"
ARTICLE_REF = re.compile(
r'\bArt(?:icle)?s?\.\s*(\d+(?:\(\w+\))*(?:\s*(?:to|and|,)\s*\d+(?:\(\w+\))*)*)\b',
re.IGNORECASE
)
# Regulation references: "Regulation (EU) 2024/1689", "2022/2554/EU"
EU_REGULATION = re.compile(
r'Regulation\s+\(EU\)\s+(\d{4}/\d+)'
r'|(\d{4}/\d+/EU)',
re.IGNORECASE
)
# Directive references: "Directive 2014/65/EU", "MiFID II"
EU_DIRECTIVE = re.compile(
r'Directive\s+\d{4}/\d+/EU'
r'|(?:MiFID|AIFMD|UCITS|CRD|BRRD|PSD)\s*(?:II|IV|V|VI|\d+)?',
re.IGNORECASE
)
# US CFR references: "12 CFR Part 1005", "12 C.F.R. § 226.5"
US_CFR = re.compile(
r'(\d+)\s+C\.?F\.?R\.?\s+(?:Part\s+)?(\d+)(?:\.(\d+))?',
re.IGNORECASE
)
def extract_regulatory_references(text: str) -> dict[str, list[str]]:
"""Extract all regulatory cross-references from a block of text."""
return {
"articles": [m.group(0) for m in ARTICLE_REF.finditer(text)],
"eu_regulations": [m.group(0) for m in EU_REGULATION.finditer(text)],
"eu_directives": [m.group(0) for m in EU_DIRECTIVE.finditer(text)],
"us_cfr": [m.group(0) for m in US_CFR.finditer(text)],
}
Text Preprocessing
import re
import string
from collections import Counter
# Regulatory stopwords — extend the standard NLP list with domain terms
# that appear frequently but carry little discriminative meaning.
# (Fixed: the set literal previously listed "including" twice.)
REGULATORY_STOPWORDS = {
    # Standard
    "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
    "of", "with", "by", "from", "as", "is", "are", "was", "were", "be",
    "been", "being", "have", "has", "had", "do", "does", "did", "will",
    "would", "could", "should", "may", "might", "shall", "that", "this",
    "these", "those", "it", "its", "which", "who", "whom", "whose",
    # Regulatory boilerplate
    "pursuant", "accordance", "herein", "thereof", "hereof", "whereas",
    "provided", "notwithstanding", "subject", "within", "without",
    "including", "inter", "alia", "etc", "such", "any",
    "all", "each", "every", "following", "above", "below", "paragraph",
    "article", "section", "subsection", "clause", "annex", "schedule",
    "regulation", "directive", "act", "law", "rule", "requirement",
    "obligation", "provision", "measure", "member", "state", "states",
    "competent", "authority", "authorities", "institution", "institutions",
}
def preprocess_regulatory_text(
    text: str,
    remove_stopwords: bool = True,
    normalize_whitespace: bool = True,
    lowercase: bool = True,
    remove_punctuation: bool = False,  # Preserve for regulatory text by default
) -> list[str]:
    """
    Tokenize and preprocess regulatory text.

    Returns a list of tokens. Hyphenated terms ("risk-based"), roman
    numerals (MiFID II) and numeric identifiers (2024/1689) survive
    tokenization because punctuation is stripped only at token edges.
    """
    if normalize_whitespace:
        # Collapse runs of whitespace and trim the ends.
        text = " ".join(text.split())
    if lowercase:
        text = text.lower()
    # Whitespace tokenization, then strip punctuation at the edges of each
    # token only, keeping internal hyphens and slashes intact.
    tokens = [
        stripped
        for stripped in (raw.strip(string.punctuation) for raw in text.split())
        if stripped
    ]
    if remove_punctuation:
        # Drop tokens made entirely of punctuation (rare after edge-stripping).
        tokens = [
            t for t in tokens
            if any(ch not in string.punctuation for ch in t)
        ]
    if remove_stopwords:
        tokens = [t for t in tokens if t.lower() not in REGULATORY_STOPWORDS]
    return tokens
def compute_term_frequencies(
    texts: list[str],
    top_n: int = 50,
) -> list[tuple[str, int]]:
    """
    Count token frequencies across a corpus of regulatory texts.

    Stopwords are removed before counting. Useful for identifying domain
    vocabulary and building custom stopword lists.
    """
    counter: Counter[str] = Counter()
    for document in texts:
        counter.update(preprocess_regulatory_text(document, remove_stopwords=True))
    return counter.most_common(top_n)
Basic Text Classification
import re
from dataclasses import dataclass
@dataclass
class DocumentClassification:
    """Outcome of signal-based regulatory document classification."""

    document_type: str  # A key of DOCUMENT_SIGNALS, or "UNKNOWN"
    confidence: str  # HIGH | MEDIUM | LOW
    matched_signals: list[str]  # The regex signals that fired for that type


# Signal-based classifier for common regulatory document types.
# Each entry maps a document type to regex signals searched case-insensitively.
DOCUMENT_SIGNALS: dict[str, list[str]] = {
    "SUSPICIOUS_ACTIVITY_REPORT": [
        r"suspicious activity report",
        r"\bSAR\b",
        r"suspicious transaction",
        r"FinCEN Form 111",
        r"NCA SARs",
    ],
    "CURRENCY_TRANSACTION_REPORT": [
        r"currency transaction report",
        r"\bCTR\b",
        r"FinCEN Form 112",
        r"cash transaction report",
    ],
    "KNOW_YOUR_CUSTOMER": [
        r"\bKYC\b",
        r"know your customer",
        r"customer due diligence",
        r"\bCDD\b",
        r"enhanced due diligence",
        r"\bEDD\b",
    ],
    "SANCTIONS_SCREENING": [
        r"sanctions screen",
        r"\bOFAC\b",
        r"SDN list",
        r"designated person",
        r"asset freeze",
        r"\bPEP\b.*screen",
    ],
    "MODEL_VALIDATION": [
        r"model validation",
        r"model risk",
        r"\bSR 11-7\b",
        r"conceptual soundness",
        r"back-test",
        r"challenger model",
    ],
    "REGULATORY_REPORT": [
        r"\bCOREP\b",
        r"\bFINREP\b",
        r"\bMiFID\b.*report",
        r"regulatory capital",
        r"\bXBRL\b",
        r"prudential report",
    ],
}


def classify_regulatory_document(
    text: str,
    threshold: int = 2,
) -> DocumentClassification:
    """
    Classify a regulatory document by type using signal matching.

    Args:
        text: The document text (or a substantial excerpt).
        threshold: Minimum number of matched signals for a HIGH confidence call.

    Returns: DocumentClassification with type, confidence, and matched signals.

    Confidence is HIGH when the best-scoring type has >= threshold signal
    matches, MEDIUM for any smaller positive number of matches, and LOW only
    when nothing matched at all (document_type == "UNKNOWN").
    """
    # (Removed an unused `text_lower` local: matching relies on re.IGNORECASE.)
    scores: dict[str, list[str]] = {}
    for doc_type, patterns in DOCUMENT_SIGNALS.items():
        matched = [p for p in patterns if re.search(p, text, re.IGNORECASE)]
        if matched:
            scores[doc_type] = matched
    if not scores:
        return DocumentClassification(
            document_type="UNKNOWN",
            confidence="LOW",
            matched_signals=[],
        )
    # Pick the document type with the most signal matches
    best_type = max(scores, key=lambda t: len(scores[t]))
    match_count = len(scores[best_type])
    # BUGFIX: the previous ladder graded exactly 1 match as MEDIUM but any
    # count in 2..threshold-1 as LOW. Any positive count below threshold is
    # now MEDIUM, which also matches the documented contract.
    confidence = "HIGH" if match_count >= threshold else "MEDIUM"
    return DocumentClassification(
        document_type=best_type,
        confidence=confidence,
        matched_signals=scores[best_type],
    )
A.6 Machine Learning Patterns
Train/Test Split with Temporal Data
Standard random train/test splits are incorrect for time-series compliance data. Using a random split allows the model to "see the future" during training, leading to inflated performance metrics that will not hold in production.
import pandas as pd
import numpy as np
from datetime import datetime
def temporal_train_test_split(
    df: pd.DataFrame,
    timestamp_col: str,
    test_size: float = 0.2,
    gap_days: int = 0,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Split a time-series DataFrame into train and test sets using a temporal cutoff.

    Unlike sklearn's train_test_split, this preserves temporal order:
    - All training data comes strictly BEFORE the cutoff (minus the gap).
    - All test data comes ON or AFTER the cutoff (plus the gap).
    - An optional gap prevents look-ahead through features derived
      from future transactions (e.g., 30-day rolling aggregates).

    Args:
        df: DataFrame with a timestamp column.
        timestamp_col: Name of the timestamp column.
        test_size: Proportion of data to use for testing (from the end).
        gap_days: Days to exclude around the cutoff to prevent leakage.

    Returns: (train_df, test_df)

    WHY THIS MATTERS:
    If you randomly split, a training sample might use context (velocity features,
    peer-group comparisons) that was computed using transactions that appear in
    the test set. This inflates AUC and gives a false sense of model performance.
    The right approach is always temporal: train on past, evaluate on future.
    """
    df = df.copy()
    df[timestamp_col] = pd.to_datetime(df[timestamp_col], utc=True)
    df = df.sort_values(timestamp_col)
    n = len(df)
    if not n:
        # Nothing to split — return two empty frames rather than raising.
        return df, df.copy()
    cutoff_idx = int(n * (1 - test_size))
    cutoff_time = df.iloc[cutoff_idx][timestamp_col]
    gap = pd.Timedelta(days=gap_days)
    train = df[df[timestamp_col] < (cutoff_time - gap)]
    # BUGFIX: use >= (not >) on the test side so that with gap_days=0 the
    # record exactly at the cutoff lands in the test set instead of being
    # silently dropped from both splits.
    test = df[df[timestamp_col] >= (cutoff_time + gap)]
    # Guard the summary prints: with a large gap either split can be empty,
    # and .min()/.max() on an empty selection would raise.
    if len(train):
        print(f"Train: {len(train):,} records "
              f"({train[timestamp_col].min().date()} — {train[timestamp_col].max().date()})")
    print(f"Gap: {gap_days} days around {cutoff_time.date()}")
    if len(test):
        print(f"Test: {len(test):,} records "
              f"({test[timestamp_col].min().date()} — {test[timestamp_col].max().date()})")
    return train, test
Handling Class Imbalance
import numpy as np
import pandas as pd
from sklearn.utils.class_weight import compute_class_weight
def compute_compliance_class_weights(y: np.ndarray) -> dict[int, float]:
    """
    Compute "balanced" class weights for imbalanced compliance datasets.

    In AML and fraud detection, suspicious cases are typically 0.1%–2% of
    transactions. Standard ML treats all errors equally; compliance requires
    that false negatives (missed suspicious activity) are penalized more
    heavily than false positives (excessive alerts).

    Uses the standard "balanced" heuristic — identical to sklearn's
    compute_class_weight(class_weight="balanced", ...):

        weight_c = n_samples / (n_classes * n_samples_in_class_c)

    computed here with plain numpy, so no sklearn import is required.

    Pass the result to the class_weight parameter of sklearn estimators.

    Args:
        y: 1-D array of class labels (1 is treated as "Suspicious" in the
           printed summary; any label set works for the weights themselves).

    Returns: mapping of class label -> weight.
    """
    classes, counts = np.unique(y, return_counts=True)
    # Rarer classes receive proportionally larger weights.
    weights = len(y) / (len(classes) * counts)
    weight_dict = dict(zip(classes, weights))
    print("Class distribution:")
    for cls in classes:
        n = (y == cls).sum()
        label = "Suspicious" if cls == 1 else "Legitimate"
        print(f"  Class {cls} ({label}): {n:,} samples — weight: {weight_dict[cls]:.2f}")
    return weight_dict
def optimize_threshold_for_precision(
    y_true: np.ndarray,
    y_prob: np.ndarray,
    target_precision: float = 0.50,
) -> tuple[float, float, float]:
    """
    Find the lowest classification threshold whose precision meets a target.

    Compliance teams usually need a minimum alert precision (alert quality)
    so analyst queues stay workable; among all thresholds that reach the
    target, the lowest is returned because it preserves the most recall.

    Returns: (threshold, achieved_precision, achieved_recall)

    Example: target_precision=0.50 means at least 50% of alerts should be
    genuine suspicious activity. Anything below this creates too much
    analyst burden and is operationally unsustainable.
    """
    from sklearn.metrics import precision_recall_curve

    precisions, recalls, thresholds = precision_recall_curve(y_true, y_prob)
    # precision_recall_curve returns len(thresholds) == len(precisions) - 1;
    # drop the final sentinel point so the arrays line up with thresholds.
    meets_target = precisions[:-1] >= target_precision
    if not meets_target.any():
        print(f"WARNING: Target precision {target_precision:.0%} is not achievable.")
        print(f"Maximum achievable precision: {precisions.max():.2%}")
        return (1.0, float(precisions.max()), 0.0)
    candidate_thresholds = thresholds[meets_target]
    candidate_precisions = precisions[:-1][meets_target]
    candidate_recalls = recalls[:-1][meets_target]
    # Lowest qualifying threshold maximizes recall.
    pick = np.argmin(candidate_thresholds)
    return (
        float(candidate_thresholds[pick]),
        float(candidate_precisions[pick]),
        float(candidate_recalls[pick]),
    )
SHAP Values — Interpretation in Compliance Contexts
Dependencies: shap, scikit-learn
import numpy as np
import pandas as pd
def compute_shap_explanation(
    model,
    X: pd.DataFrame,
    instance_idx: int,
    feature_names: list[str] | None = None,
) -> dict:
    """
    Compute SHAP values for a single prediction and return a structured
    explanation suitable for adverse action notices or audit documentation.

    Dependencies: shap (pip install shap)

    Args:
        model: A fitted sklearn-compatible model.
        X: The feature DataFrame used for scoring.
        instance_idx: Positional (``iloc``) index of the instance to explain.
        feature_names: Optional list of human-readable feature names.

    Returns: dict with base_value, prediction, and ranked feature contributions.

    COMPLIANCE NOTE: SHAP values explain individual predictions in terms of
    feature contributions. This is required for:
    - Adverse action notices under ECOA/Reg B (US)
    - Explainability under EU AI Act Article 13 (high-risk AI)
    - SR 11-7 model validation (model transparency)

    SHAP values are additive: base_value + sum(shap_values) = model output.
    A positive SHAP value means that feature INCREASED the risk score.
    A negative SHAP value means it DECREASED the risk score.
    """
    # shap is an optional dependency; fail with an actionable message.
    try:
        import shap
    except ImportError:
        raise ImportError("Install shap: pip install shap")
    names = feature_names or list(X.columns)
    # TreeExplainer for tree-based models; KernelExplainer is model-agnostic
    # NOTE(review): float(explainer.expected_value) assumes a scalar; some
    # shap versions return a per-class array here — confirm against the shap
    # version in use (a failure lands in the KernelExplainer fallback below).
    try:
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X)
        base_value = float(explainer.expected_value)
    except Exception:
        # Fall back to KernelExplainer for non-tree models. The background
        # sample is capped at 100 rows to keep KernelExplainer's runtime
        # (which scales with background size) manageable.
        background = shap.sample(X, min(100, len(X)))
        explainer = shap.KernelExplainer(model.predict_proba, background)
        shap_values = explainer.shap_values(X)[1]  # Class 1 (suspicious)
        base_value = float(explainer.expected_value[1])
    # For binary classifiers, shap_values may be a list [class0, class1]
    if isinstance(shap_values, list):
        instance_shap = shap_values[1][instance_idx]  # Suspicious class
    else:
        instance_shap = shap_values[instance_idx]
    instance_features = X.iloc[instance_idx]
    contributions = []
    for name, shap_val, feature_val in zip(names, instance_shap, instance_features):
        contributions.append({
            "feature": name,
            "feature_value": float(feature_val),  # assumes numeric features — TODO confirm
            "shap_contribution": round(float(shap_val), 4),
            # Sign convention: positive pushes the score toward "suspicious".
            "direction": "INCREASES_RISK" if shap_val > 0 else "DECREASES_RISK",
        })
    # Sort by absolute contribution (most impactful first)
    contributions.sort(key=lambda x: abs(x["shap_contribution"]), reverse=True)
    prediction = float(model.predict_proba(X.iloc[[instance_idx]])[0][1])
    return {
        "instance_idx": instance_idx,
        "base_value": round(base_value, 4),
        "prediction": round(prediction, 4),
        # Auditor sanity check: base_value + shap_sum ≈ model output (in the
        # explainer's output space — for probabilities only when the explainer
        # operates on probability output; verify for the model type used).
        "shap_sum": round(float(sum(c["shap_contribution"] for c in contributions)), 4),
        "top_factors": contributions[:5],  # Top 5 for adverse action notices
        "all_factors": contributions,
        "interpretation": (
            f"Model base rate: {base_value:.1%}. "
            f"This transaction scored {prediction:.1%}. "
            f"The top driver was '{contributions[0]['feature']}' "
            f"({contributions[0]['direction'].replace('_', ' ').lower()}, "
            f"contribution: {contributions[0]['shap_contribution']:+.4f})."
        ),
    }
PSI Calculation
Population Stability Index (PSI) measures how much a model's input distribution has shifted between development and deployment. PSI > 0.25 typically triggers model review under SR 11-7.
import numpy as np
import pandas as pd
def calculate_psi(
    expected: np.ndarray,
    actual: np.ndarray,
    n_bins: int = 10,
    min_bin_pct: float = 0.0001,
) -> dict:
    """
    Population Stability Index (PSI) for one feature or model score.

    PSI = sum((Actual% - Expected%) * ln(Actual% / Expected%))

    Interpretation (SR 11-7 / industry standard):
        PSI < 0.10:      No significant change. Model stable.
        PSI 0.10–0.25:   Moderate change. Investigate if sustained.
        PSI > 0.25:      Major shift. Model review required.

    Args:
        expected: Reference population (model development time).
        actual: Monitoring population (current).
        n_bins: Number of equal-width bins spanning both samples.
        min_bin_pct: Floor applied to bin proportions so ln() never sees zero.

    Returns: dict with psi, verdict, sample sizes, and per-bin breakdown.
    """
    # Equal-width bins over the combined range of both samples.
    lo = min(expected.min(), actual.min())
    hi = max(expected.max(), actual.max())
    edges = np.linspace(lo, hi, n_bins + 1)
    ref_pct = np.histogram(expected, bins=edges)[0] / len(expected)
    cur_pct = np.histogram(actual, bins=edges)[0] / len(actual)
    # Floor the proportions: an empty bin would otherwise hit log(0) or 0/0.
    ref_pct = np.clip(ref_pct, min_bin_pct, None)
    cur_pct = np.clip(cur_pct, min_bin_pct, None)
    contributions = (cur_pct - ref_pct) * np.log(cur_pct / ref_pct)
    psi = float(contributions.sum())
    if psi < 0.10:
        verdict = "STABLE"
    elif psi < 0.25:
        verdict = "MODERATE_SHIFT"
    else:
        verdict = "MAJOR_SHIFT_REVIEW_REQUIRED"
    breakdown = [
        {
            "bin": idx + 1,
            "range": f"[{edges[idx]:.3f}, {edges[idx + 1]:.3f})",
            "expected_pct": round(float(ref_pct[idx]) * 100, 2),
            "actual_pct": round(float(cur_pct[idx]) * 100, 2),
            "psi_contribution": round(float(contributions[idx]), 5),
        }
        for idx in range(n_bins)
    ]
    return {
        "psi": round(psi, 5),
        "verdict": verdict,
        "n_bins": n_bins,
        "expected_n": len(expected),
        "actual_n": len(actual),
        "bin_breakdown": breakdown,
    }
def monitor_model_stability(
    model,
    X_dev: pd.DataFrame,
    X_current: pd.DataFrame,
    feature_names: list[str] | None = None,
    score_psi_only: bool = False,
) -> pd.DataFrame:
    """
    Run PSI monitoring over the model output score and, optionally, every
    input feature.

    Returns a DataFrame (variable, psi, verdict) sorted by PSI descending.
    Variables with a MAJOR_SHIFT verdict should be escalated to the model
    risk team.
    """
    labels = feature_names or list(X_dev.columns)
    # The model output score is always monitored.
    dev_scores = model.predict_proba(X_dev)[:, 1]
    cur_scores = model.predict_proba(X_current)[:, 1]
    score_stats = calculate_psi(dev_scores, cur_scores)
    rows = [{
        "variable": "MODEL_OUTPUT_SCORE",
        "psi": score_stats["psi"],
        "verdict": score_stats["verdict"],
    }]
    if not score_psi_only:
        for label, column in zip(labels, X_dev.columns):
            feature_stats = calculate_psi(
                X_dev[column].values.astype(float),
                X_current[column].values.astype(float),
            )
            rows.append({
                "variable": label,
                "psi": feature_stats["psi"],
                "verdict": feature_stats["verdict"],
            })
    return pd.DataFrame(rows).sort_values("psi", ascending=False)
A.7 API Integration
REST API Client Pattern
import time
import logging
from typing import Any
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
logger = logging.getLogger(__name__)
class RegTechAPIClient:
    """
    Base REST client shared by RegTech vendor integrations.

    Provides:
    - retry with exponential backoff on transient HTTP failures
    - a simple requests-per-second throttle
    - structured error handling
    - request/response logging for audit purposes

    Subclass per vendor API (sanctions providers, LEI registries,
    identity verification services, etc.).
    """

    def __init__(
        self,
        base_url: str,
        api_key: str,
        timeout: int = 30,
        max_retries: int = 3,
        rate_limit_rps: float = 10.0,
    ):
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.timeout = timeout
        # Throttle bookkeeping: minimum spacing between requests, and when the
        # previous request went out (monotonic clock, immune to NTP jumps).
        self._min_interval = 1.0 / rate_limit_rps
        self._last_request_time = 0.0
        # Session carries auth headers and the retry-enabled adapters.
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        })
        retry_policy = Retry(
            total=max_retries,
            backoff_factor=1,  # sleeps 1s, 2s, 4s between attempts
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"],
        )
        for scheme in ("https://", "http://"):
            self.session.mount(scheme, HTTPAdapter(max_retries=retry_policy))

    def _rate_limit(self) -> None:
        """Sleep just long enough to honour the configured request rate."""
        wait = self._min_interval - (time.monotonic() - self._last_request_time)
        if wait > 0:
            time.sleep(wait)
        self._last_request_time = time.monotonic()

    def _request(
        self,
        method: str,
        endpoint: str,
        **kwargs: Any,
    ) -> dict:
        """
        Issue one HTTP request with rate limiting and error handling.

        Raises requests.HTTPError for 4xx/5xx responses.
        """
        self._rate_limit()
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        logger.debug("%s %s", method.upper(), url)
        response = self.session.request(
            method,
            url,
            timeout=self.timeout,
            **kwargs,
        )
        if not response.ok:
            # Truncate the body so a huge error page cannot flood the log.
            logger.error(
                "API error %s %s: %d %s",
                method.upper(), url, response.status_code, response.text[:500]
            )
            response.raise_for_status()
        return response.json()

    def get(self, endpoint: str, params: dict | None = None) -> dict:
        """GET helper; params become the query string."""
        return self._request("GET", endpoint, params=params)

    def post(self, endpoint: str, payload: dict) -> dict:
        """POST helper; payload is sent as a JSON body."""
        return self._request("POST", endpoint, json=payload)
class GLEIFClient(RegTechAPIClient):
    """
    Client for the public GLEIF (Global LEI Foundation) API.

    Base URL: https://api.gleif.org/api/v1
    The public API needs no credentials, so the Authorization header
    installed by the base class is removed after construction.
    """

    def __init__(self):
        super().__init__(
            base_url="https://api.gleif.org/api/v1",
            api_key="",  # Public API
            rate_limit_rps=5.0,
        )
        self.session.headers.pop("Authorization", None)

    def lookup_lei(self, lei: str) -> dict:
        """Fetch one LEI record and flatten the commonly needed fields."""
        payload = self.get(f"/lei-records/{lei}")
        attributes = payload.get("data", {}).get("attributes", {})
        entity = attributes.get("entity", {})
        registration = attributes.get("registration", {})
        return {
            "lei": lei,
            "legal_name": entity.get("legalName", {}).get("name"),
            "status": registration.get("status"),
            "jurisdiction": entity.get("jurisdiction"),
            "category": entity.get("category"),
        }

    def search_by_name(self, name: str, limit: int = 10) -> list[dict]:
        """Search LEI records by entity name; returns lei/legal_name pairs."""
        payload = self.get("/lei-records", params={
            "filter[entity.names]": name,
            "page[size]": limit,
        })
        hits = []
        for item in payload.get("data", []):
            legal_name = (item.get("attributes", {})
                          .get("entity", {})
                          .get("legalName", {})
                          .get("name"))
            hits.append({"lei": item["id"], "legal_name": legal_name})
        return hits
Webhook Handler Pattern
Dependencies: flask
import hashlib
import hmac
import json
import logging
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
def create_webhook_app(secret_key: str, alert_processor):
    """
    Create a Flask webhook receiver for compliance alerts.

    Args:
        secret_key: HMAC secret shared with the sending system.
        alert_processor: Callable that processes a validated alert dict.

    Returns: A Flask application.

    Security notes:
    - Always validate the HMAC signature before processing.
    - Return 200 quickly; do heavy processing asynchronously. (NOTE: this
      reference implementation calls alert_processor inline — see below.)
    - Log all received webhooks for audit purposes.

    Dependencies: flask (pip install flask)
    """
    # flask is an optional dependency; fail with an actionable message.
    try:
        from flask import Flask, request, jsonify
    except ImportError:
        raise ImportError("Install flask: pip install flask")
    app = Flask(__name__)

    def verify_signature(payload_bytes: bytes, signature_header: str) -> bool:
        """Verify HMAC-SHA256 signature from the sending system."""
        # Missing header -> reject without computing anything.
        if not signature_header:
            return False
        expected = hmac.new(
            secret_key.encode(),
            payload_bytes,
            hashlib.sha256
        ).hexdigest()
        # Use constant-time comparison to prevent timing attacks
        return hmac.compare_digest(
            f"sha256={expected}",
            signature_header
        )

    @app.route("/webhooks/alerts", methods=["POST"])
    def receive_alert():
        """Webhook endpoint for incoming compliance alerts."""
        # Raw bytes, not parsed JSON: the signature covers the exact payload.
        payload_bytes = request.get_data()
        signature = request.headers.get("X-Signature", "")
        # Always validate signature before processing
        if not verify_signature(payload_bytes, signature):
            logger.warning(
                "Webhook signature validation failed from %s",
                request.remote_addr
            )
            return jsonify({"error": "Invalid signature"}), 401
        try:
            alert = json.loads(payload_bytes)
        except json.JSONDecodeError as exc:
            logger.error("Webhook payload is not valid JSON: %s", exc)
            return jsonify({"error": "Invalid JSON"}), 400
        # Log receipt for audit trail
        logger.info(
            "Webhook received: alert_id=%s type=%s customer=%s at=%s",
            alert.get("alert_id", "UNKNOWN"),
            alert.get("alert_type", "UNKNOWN"),
            alert.get("customer_id", "UNKNOWN"),
            datetime.now(tz=timezone.utc).isoformat(),
        )
        # NOTE(review): despite the stated intent to return 200 immediately,
        # the processor is invoked synchronously here — a slow processor
        # delays the response. Hand heavy work to a queue/worker in production.
        try:
            alert_processor(alert)
        except Exception as exc:
            logger.error("Alert processing failed: %s", exc, exc_info=True)
            # Still return 200 to prevent retry storms;
            # failed alerts should be handled via dead-letter queue
        return jsonify({"status": "accepted"}), 200

    return app
Authentication Patterns
import time
import threading
from dataclasses import dataclass, field
import requests
@dataclass
class OAuthToken:
    """An OAuth 2.0 access token plus the wall-clock time it expires."""

    access_token: str
    token_type: str
    expires_at: float  # Unix timestamp
    scope: str = ""

    def is_expired(self, buffer_seconds: int = 60) -> bool:
        """
        True once the token has expired or will expire within buffer_seconds.

        The buffer avoids presenting a token that could die mid-request.
        """
        remaining = self.expires_at - time.time()
        return remaining <= buffer_seconds
class OAuth2ClientCredentials:
    """
    OAuth 2.0 client-credentials token manager for RegTech APIs.

    Caches the current token, refreshes it automatically shortly before
    expiry, and serializes access with a lock so concurrent callers never
    race on a refresh. Intended for server-to-server integrations (no user
    interaction), e.g.:
    - Sanctions list provider APIs (Refinitiv World-Check, Dow Jones)
    - Identity verification APIs (Jumio, Onfido, Trulioo)
    - Regulatory data feeds

    Thread safety: safe for concurrent use from multiple threads.
    """

    def __init__(
        self,
        token_url: str,
        client_id: str,
        client_secret: str,
        scope: str = "",
    ):
        self.token_url = token_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.scope = scope
        self._token: OAuthToken | None = None
        self._lock = threading.Lock()

    def _fetch_token(self) -> OAuthToken:
        """Request a fresh access token from the authorization server."""
        form = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        }
        if self.scope:
            form["scope"] = self.scope
        resp = requests.post(
            self.token_url,
            data=form,
            timeout=10,
        )
        resp.raise_for_status()
        body = resp.json()
        return OAuthToken(
            access_token=body["access_token"],
            token_type=body.get("token_type", "Bearer"),
            expires_at=time.time() + body.get("expires_in", 3600),
            scope=body.get("scope", self.scope),
        )

    def get_token(self) -> str:
        """
        Return a valid access token, refreshing it first if necessary.
        Thread-safe.
        """
        with self._lock:
            token = self._token
            if token is None or token.is_expired():
                token = self._fetch_token()
                self._token = token
            return token.access_token

    def get_headers(self) -> dict[str, str]:
        """Authorization headers ready to attach to an API request."""
        return {"Authorization": f"Bearer {self.get_token()}"}
A.8 Audit Trail Patterns
Immutable Audit Log
import hashlib
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
@dataclass
class AuditEntry:
"""A single immutable entry in a compliance audit log."""
sequence: int
timestamp: str # ISO 8601 UTC
actor: str # User ID or system identifier
action: str # SCREEN | SCORE | ALERT | CLOSE | OVERRIDE
entity_type: str # CUSTOMER | TRANSACTION | ALERT
entity_id: str
payload: dict # Action-specific data
previous_hash: str # Hash of previous entry (chain)
entry_hash: str = field(init=False)
def __post_init__(self):
self.entry_hash = self._compute_hash()
def _compute_hash(self) -> str:
"""
Compute SHA-256 hash of this entry's content.
The hash includes the previous entry's hash, creating a tamper-evident chain.
"""
content = {
"sequence": self.sequence,
"timestamp": self.timestamp,
"actor": self.actor,
"action": self.action,
"entity_type": self.entity_type,
"entity_id": self.entity_id,
"payload": self.payload,
"previous_hash": self.previous_hash,
}
canonical = json.dumps(content, sort_keys=True, default=str)
return hashlib.sha256(canonical.encode()).hexdigest()
def to_dict(self) -> dict:
return {
"sequence": self.sequence,
"timestamp": self.timestamp,
"actor": self.actor,
"action": self.action,
"entity_type": self.entity_type,
"entity_id": self.entity_id,
"payload": self.payload,
"previous_hash": self.previous_hash,
"entry_hash": self.entry_hash,
}
class ImmutableAuditLog:
"""
Append-only audit log with hash-chaining for tamper detection.
Each entry contains the hash of the previous entry. Any modification
to a historical entry will invalidate all subsequent hashes, making
tampering detectable.
In production, persist entries to a write-once store (e.g., AWS WORM S3,
Azure Immutable Blob Storage, or an append-only database table) and
regularly archive the hash chain for regulatory evidence.
Regulatory relevance:
- GDPR Article 5(1)(f): integrity and confidentiality of processing
- FCA SYSC 10A: audit trail requirements for MiFID firms
- BSA: five-year record retention for SARs and CTRs
- DORA Article 12: ICT-related incident log integrity
"""
GENESIS_HASH = "0" * 64 # Starting hash for the first entry
def __init__(self):
self._entries: list[AuditEntry] = []
@property
def last_hash(self) -> str:
if not self._entries:
return self.GENESIS_HASH
return self._entries[-1].entry_hash
def append(
self,
actor: str,
action: str,
entity_type: str,
entity_id: str,
payload: dict,
) -> AuditEntry:
"""
Append a new entry to the audit log.
Once appended, entries cannot be modified.
"""
entry = AuditEntry(
sequence=len(self._entries),
timestamp=datetime.now(tz=timezone.utc).isoformat(),
actor=actor,
action=action,
entity_type=entity_type,
entity_id=entity_id,
payload=payload,
previous_hash=self.last_hash,
)
self._entries.append(entry)
return entry
def verify_integrity(self) -> tuple[bool, int | None]:
"""
Verify the hash chain is intact.
Returns (is_valid, first_broken_sequence).
"""
previous_hash = self.GENESIS_HASH
for entry in self._entries:
if entry.previous_hash != previous_hash:
return False, entry.sequence
expected_hash = entry._compute_hash()
if entry.entry_hash != expected_hash:
return False, entry.sequence
previous_hash = entry.entry_hash
return True, None
def entries_for_entity(self, entity_id: str) -> list[dict]:
"""Retrieve all audit entries for a specific entity."""
return [
e.to_dict() for e in self._entries
if e.entity_id == entity_id
]
Decision Recording
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
import json
@dataclass
class ComplianceDecision:
"""
A fully recorded compliance decision for audit purposes.
Captures everything a regulator would need to reconstruct
why a decision was made: who made it, on what data, using
what rules or model, and what the outcome was.
This is distinct from a brief audit log entry — it preserves
the full decision context, including the model version and
all input features used at decision time.
"""
decision_id: str
decision_type: str # ALERT_TRIAGE | SAR_DECISION | KYC_APPROVAL
entity_id: str
entity_type: str # CUSTOMER | TRANSACTION | ALERT
decided_by: str # Analyst ID or "SYSTEM" for automated
decided_at: datetime
outcome: str # ESCALATE | NO_ACTION | SAR_FILE | APPROVE
rationale: str # Free text justification
risk_score: float | None
model_version: str | None
input_features: dict # Snapshot of all model inputs
rule_results: list[dict] # Results of each rule evaluation
supporting_documents: list[str] = field(default_factory=list)
overrides: list[dict] = field(default_factory=list) # Any threshold overrides
def to_audit_payload(self) -> dict:
"""Serialize to a dict suitable for the audit log payload."""
return {
"decision_id": self.decision_id,
"decision_type": self.decision_type,
"entity_id": self.entity_id,
"entity_type": self.entity_type,
"decided_by": self.decided_by,
"decided_at": self.decided_at.isoformat(),
"outcome": self.outcome,
"rationale": self.rationale,
"risk_score": self.risk_score,
"model_version": self.model_version,
"input_features": self.input_features,
"rule_results": self.rule_results,
"supporting_documents": self.supporting_documents,
"overrides": self.overrides,
}
def record_triage_decision(
    alert: "ComplianceAlert",
    analyst_id: str,
    outcome: str,
    rationale: str,
    audit_log: "ImmutableAuditLog",
    risk_score_breakdown: dict | None = None,
) -> ComplianceDecision:
    """
    Record the full context of an analyst's alert triage decision.

    Args:
        alert: The alert being triaged.
        analyst_id: ID of the analyst making the decision.
        outcome: One of: ESCALATE | SAR_FILE | NO_ACTION | MONITOR
        rationale: Analyst's narrative justification (required).
        audit_log: The system audit log.
        risk_score_breakdown: Score component breakdown from the scoring engine.

    Returns:
        The recorded ComplianceDecision.

    Raises:
        ValueError: If the rationale is blank, or the outcome is not one
            of the recognized triage outcomes.
    """
    if not rationale.strip():
        raise ValueError(
            "Rationale is required for all compliance decisions. "
            "A blank rationale is not acceptable for regulatory purposes."
        )
    # Enforce the documented closed set of outcomes instead of silently
    # recording arbitrary strings into the regulatory record.
    valid_outcomes = {"ESCALATE", "SAR_FILE", "NO_ACTION", "MONITOR"}
    if outcome not in valid_outcomes:
        raise ValueError(
            f"Invalid triage outcome {outcome!r}; expected one of "
            f"{sorted(valid_outcomes)}."
        )
    # Take a single UTC-aware timestamp and reuse it for both the
    # decision ID and decided_at: the original naive datetime.now()
    # violates this appendix's own "naive datetimes" pitfall, and two
    # separate now() calls could make the ID disagree with decided_at.
    decided_at = datetime.now(tz=timezone.utc)
    decision = ComplianceDecision(
        decision_id=f"DEC-{alert.alert_id}-{int(decided_at.timestamp())}",
        decision_type="ALERT_TRIAGE",
        entity_id=alert.alert_id,
        entity_type="ALERT",
        decided_by=analyst_id,
        decided_at=decided_at,
        outcome=outcome,
        rationale=rationale,
        risk_score=alert.risk_score,
        model_version=alert.model_id,
        input_features={"alert_type": alert.alert_type, "rule_ids": alert.rule_ids},
        rule_results=risk_score_breakdown.get("components", []) if risk_score_breakdown else [],
    )
    # Mirror the decision into the tamper-evident audit chain.
    audit_log.append(
        actor=analyst_id,
        action="ALERT_TRIAGE",
        entity_type="ALERT",
        entity_id=alert.alert_id,
        payload=decision.to_audit_payload(),
    )
    return decision
A.9 Quick Reference
Common Regulatory Data Formats
| Field | Format | Validation Rule | Example |
|---|---|---|---|
| LEI | 20-char alphanumeric | MOD 97-10 checksum | 2138004YJJDDEJN2B130 |
| ISIN | 12-char: 2 alpha + 9 alphanum + 1 digit | Luhn checksum | GB0002634946 |
| BIC/SWIFT | 8 or 11 chars: 4 alpha + 2 alpha + 2 alnum + (3 alnum) | ISO 9362 format | HBUKGB4B |
| MIC | 4 uppercase alpha | Exact 4 chars | XLON |
| ISO 4217 | 3 uppercase alpha | ISO 4217 list | USD, EUR, GBP |
| ISO 3166-1 | 2 uppercase alpha | ISO 3166-1 list | US, GB, DE |
| Date (ISO 8601) | YYYY-MM-DD | Valid calendar date | 2024-03-15 |
| Datetime (UTC) | YYYY-MM-DDTHH:MM:SSZ | Timezone-aware UTC | 2024-03-15T14:30:00Z |
| NCA Reference | Varies by jurisdiction | Regex per jurisdiction | FCA/2024/001234 |
| CUSIP | 9-char: 6 alphanum + 2 alphanum + 1 digit | Luhn-based checksum | 594918104 |
Package Quick Reference
| Use Case | Package | Key Functions / Classes |
|---|---|---|
| Data manipulation | pandas | DataFrame, read_csv, groupby, merge, to_datetime |
| Numerical computing | numpy | ndarray, histogram, clip, where, unique |
| Machine learning | scikit-learn | GradientBoostingClassifier, train_test_split, roc_auc_score |
| Class imbalance | scikit-learn | compute_class_weight, precision_recall_curve |
| Explainability | shap | TreeExplainer, KernelExplainer, shap_values |
| HTTP / API calls | requests | Session, HTTPAdapter, Retry |
| Webhook server | flask | Flask, request, jsonify |
| Date/time | datetime, zoneinfo | datetime, timezone, ZoneInfo, timedelta |
| Data validation | built-in | dataclass, Enum, type hints |
| Regex | re | compile, fullmatch, search, finditer |
| Hashing / audit | hashlib, hmac | sha256, new, compare_digest |
| XML / XBRL | xml.etree.ElementTree | Element, SubElement, tostring, indent |
| CSV output | csv | DictWriter, QUOTE_ALL |
| JSON | json | dumps, loads |
| Logging | logging | getLogger, basicConfig |
Common Pitfalls
- **Naive datetimes in regulatory data.** A `datetime` without timezone information is ambiguous. Always store timestamps as UTC-aware (`datetime.now(tz=timezone.utc)`). Naive datetimes will cause subtle, hard-to-debug comparison errors when records arrive from different time zones.
- **Using random train/test split on time-series data.** Standard `sklearn.model_selection.train_test_split` uses a random shuffle, which allows the model to learn from "future" data during training. Always use a temporal split — train on earlier data, test on later data.
- **String comparison for regulatory identifiers.** Comparing LEIs or ISINs with `==` on raw strings fails silently for case mismatches or leading/trailing whitespace. Always normalize to `.upper().strip()` before comparison.
- **Treating 0.0 and None as equivalent.** In regulatory data, `ownership_pct=0.0` (verified zero ownership) is legally distinct from `ownership_pct=None` (not yet collected). Do not coerce missing values to 0.
- **Alert threshold as a technical decision.** Setting the model score threshold that triggers an alert is a compliance and legal decision — not a data science optimization. The threshold determines what the institution "has reason to suspect" under the BSA and equivalent statutes. Document threshold decisions with compliance sign-off.
- **Logging sensitive personal data.** Logging full customer names, document numbers, or account details in plaintext violates GDPR Article 5 and most data protection laws. Log entity IDs and reference numbers only.
- **Not validating checksums on regulatory identifiers.** Accepting any 20-character string as an LEI, or any 12-character string as an ISIN, without checksum validation will cause reporting errors that can result in regulatory penalties.
- **Mutable default arguments in dataclasses.** Using `list` or `dict` as default values in dataclasses (without `field(default_factory=...)`) causes all instances to share the same list, leading to subtle data corruption in batch processing. Always use `field(default_factory=list)`.
- **Ignoring PSI drift until it is catastrophic.** PSI monitoring should run on every model scoring cycle, not quarterly. A PSI that creeps from 0.05 to 0.20 over three months represents a model that is quietly degrading. Automate PSI alerts.
- **Suppressing exceptions in scoring pipelines.** Catching all exceptions and returning a default score (`except Exception: return 0.0`) silently converts system failures into "all-clear" outcomes — exactly the wrong behavior for a safety-critical compliance system. Log all failures, route to a dead-letter queue, and escalate persistent errors.