Case Study 2: Meridian Financial — Feature Lineage for Regulatory Compliance

Context

Meridian Financial processes 15,000 credit card applications per day (introduced in Case Study 2 of Chapter 24). Under ECOA (Equal Credit Opportunity Act) and FCRA (Fair Credit Reporting Act), every credit denial must include an adverse action notice listing the principal factors that contributed to the denial. Regulators can — and do — audit these decisions, sometimes years after the fact. The question they ask is precise:

"For application #A-2025-03-15-7842, submitted by applicant Jane Doe on March 15, 2025, at 2:47 PM ET: what data sources were consulted, what feature values were computed, which model version produced the score, what threshold was applied, and what factors were cited in the adverse action notice?"

Answering this question requires complete data lineage — the ability to trace a single decision backward through every layer of the data infrastructure, from the denial letter to the model score to the feature values to the raw data sources.

The Challenge

Meridian's existing infrastructure could not answer the regulatory question. The credit scoring model used 200 features from 5 data sources, but the feature computation pipeline was a monolithic Spark job that:

  1. Read from 5 data sources in parallel.
  2. Computed all 200 features in a single DataFrame transformation chain.
  3. Wrote the features and scores to a PostgreSQL table.
  4. Overwrote the table daily (no history).

Three specific failures motivated the redesign:

Audit failure 1: A regulator asked for the feature values used to deny application #A-2024-11-08-3291. The team could produce the current feature values for the applicant but not the values that existed on November 8, 2024. The PostgreSQL table had been overwritten 130 times since then. The team reconstructed the features by replaying the Spark pipeline on historical data — but could not guarantee that the reconstructed values matched the original values, because the pipeline code had been modified 14 times since November 8.

Audit failure 2: A regulator asked which credit bureau data was used for a specific application. The Spark pipeline fetched credit bureau data via an API call but did not log the raw response. The team could show the computed features (e.g., credit_utilization = 0.73) but could not demonstrate the raw inputs (total_balance = $18,250, total_limit = $25,000). The regulator considered this a documentation deficiency.

Audit failure 3: The regulator requested evidence that the model version in production on January 15, 2025, was the same model that produced the scores logged for that day's applications. The team had no model registry with deployment timestamps. They could show the current model but not prove which model was active on a specific historical date.
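The capability missing in audit failure 3 is a lookup from a timestamp to the model that was serving at that moment. A minimal sketch, assuming a registry that records one deployment window per model version; the `ModelDeployment` class, the `active_model_at` function, and the sample versions and dates are hypothetical, not Meridian's actual registry:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass(frozen=True)
class ModelDeployment:
    """One deployment window for a model version (hypothetical schema)."""
    model_version: str
    deployed_at: datetime
    retired_at: Optional[datetime] = None  # None means still in production


def active_model_at(
    deployments: List[ModelDeployment], when: datetime
) -> Optional[str]:
    """Return the model version serving at `when`, or None if none was."""
    for d in deployments:
        # The window is half-open: [deployed_at, retired_at)
        if d.deployed_at <= when and (d.retired_at is None or when < d.retired_at):
            return d.model_version
    return None


# Illustrative registry contents; versions and dates are made up.
registry = [
    ModelDeployment("credit-xgb-v4.1.0", datetime(2024, 9, 1), datetime(2025, 2, 1)),
    ModelDeployment("credit-xgb-v4.2.1", datetime(2025, 2, 1)),
]

print(active_model_at(registry, datetime(2025, 1, 15, 9, 30)))
```

With deployment and retirement timestamps recorded at the time of each rollout, the question "which model was active on January 15, 2025?" becomes a lookup rather than a reconstruction.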

The Solution: Immutable Feature Store with Full Lineage

Architecture

Meridian redesigned the credit scoring data infrastructure around three principles: immutability (nothing is overwritten; every version is preserved), traceability (every output links to its inputs), and auditability (any historical state is reconstructable).

from dataclasses import dataclass, field
from typing import Dict, List
from datetime import datetime


@dataclass(frozen=True)
class CreditDecisionRecord:
    """Immutable record of a single credit decision with full lineage.

    Every field needed to reconstruct and explain the decision
    is stored in this record. Records are append-only (never updated)
    and retained for 7 years per FCRA requirements.

    Attributes:
        application_id: Unique application identifier.
        applicant_id: Anonymized applicant identifier.
        decision_timestamp: When the decision was made.
        decision: The credit decision (approved, denied, referred).
        model_version: Exact model version (registry ID + hash).
        model_score: The model's output score.
        decision_threshold: The threshold applied to the score.
        feature_values: Complete feature vector at decision time.
        feature_versions: Version of each feature definition used.
        data_source_snapshots: Snapshot IDs for each raw data source.
        adverse_action_factors: Top factors for denial (if applicable).
        pipeline_version: Git commit hash of the scoring pipeline.
        infrastructure_metadata: Runtime metadata (node, container, etc.).
    """
    application_id: str
    applicant_id: str
    decision_timestamp: datetime
    decision: str
    model_version: str
    model_score: float
    decision_threshold: float
    feature_values: Dict[str, float]
    feature_versions: Dict[str, str]
    data_source_snapshots: Dict[str, str]
    adverse_action_factors: List[str] = field(default_factory=list)
    pipeline_version: str = ""
    infrastructure_metadata: Dict[str, str] = field(default_factory=dict)

    @property
    def is_denial(self) -> bool:
        """Check if this decision was a denial."""
        return self.decision == "denied"

    @property
    def score_margin(self) -> float:
        """Distance between the score and the decision threshold."""
        return self.model_score - self.decision_threshold

    def regulatory_summary(self) -> str:
        """Generate a regulatory-compliant summary of the decision.

        Returns:
            Formatted string suitable for regulatory audit response.
        """
        lines = [
            f"Application: {self.application_id}",
            f"Decision: {self.decision.upper()}",
            f"Timestamp: {self.decision_timestamp.isoformat()}",
            f"Model: {self.model_version}",
            f"Score: {self.model_score:.4f} "
            f"(threshold: {self.decision_threshold:.4f})",
            f"Pipeline: {self.pipeline_version[:12]}",
            f"Features used: {len(self.feature_values)}",
            f"Data sources: {', '.join(self.data_source_snapshots.keys())}",
        ]
        if self.is_denial and self.adverse_action_factors:
            lines.append("Adverse action factors:")
            for i, factor in enumerate(self.adverse_action_factors, 1):
                lines.append(f"  {i}. {factor}")
        return "\n".join(lines)

    def source_traceability(self) -> Dict[str, str]:
        """Map each data source to its retrievable snapshot.

        Returns:
            Dictionary mapping source name to snapshot identifier.
        """
        # Return a copy so callers cannot mutate the record's own mapping.
        return dict(self.data_source_snapshots)


# Example: a denied application with full lineage
denied_application = CreditDecisionRecord(
    application_id="A-2025-03-15-7842",
    applicant_id="ANON-8a3f2c",
    decision_timestamp=datetime(2025, 3, 15, 14, 47, 23),
    decision="denied",
    model_version="credit-xgb-v4.2.1-sha256:a8f3c1e2b1d4",
    model_score=0.312,
    decision_threshold=0.45,
    feature_values={
        "credit_utilization": 0.73,
        "payment_history_score": 0.61,
        "income_to_debt_ratio": 0.28,
        "delinquency_count_24m": 3.0,
        "credit_age_months": 18.0,
        "inquiry_count_6m": 7.0,
        "employment_tenure_months": 8.0,
        "bank_balance_stability": 0.42,
    },
    feature_versions={
        "credit_utilization": "v2.1",
        "payment_history_score": "v3.0",
        "income_to_debt_ratio": "v1.2",
        "delinquency_count_24m": "v1.0",
        "credit_age_months": "v1.0",
        "inquiry_count_6m": "v1.1",
        "employment_tenure_months": "v2.0",
        "bank_balance_stability": "v1.3",
    },
    data_source_snapshots={
        "application_form": "app-snap-2025-03-15T14:47:00Z",
        "credit_bureau_experian": "cb-snap-2025-03-15T14:47:05Z",
        "bank_transactions": "bt-snap-2025-03-15T06:00:00Z",
        "employment_verification": "ev-snap-2025-03-14T22:15:00Z",
    },
    adverse_action_factors=[
        "High credit utilization ratio (73%)",
        "Multiple recent credit inquiries (7 in 6 months)",
        "Recent delinquencies (3 in 24 months)",
        "Low income-to-debt ratio",
    ],
    pipeline_version="git:7a2b3c4d5e6f",
    infrastructure_metadata={
        "compute_node": "scoring-prod-us-east-1a-i-0abc123",
        "container_image": "scoring:4.2.1-20250314",
        "latency_ms": "1247",
    },
)

print(denied_application.regulatory_summary())

Output:

Application: A-2025-03-15-7842
Decision: DENIED
Timestamp: 2025-03-15T14:47:23
Model: credit-xgb-v4.2.1-sha256:a8f3c1e2b1d4
Score: 0.3120 (threshold: 0.4500)
Pipeline: git:7a2b3c4d
Features used: 8
Data sources: application_form, credit_bureau_experian, bank_transactions, employment_verification
Adverse action factors:
  1. High credit utilization ratio (73%)
  2. Multiple recent credit inquiries (7 in 6 months)
  3. Recent delinquencies (3 in 24 months)
  4. Low income-to-debt ratio
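The append-only guarantee in the docstring has to be enforced by the store, not only by convention. A minimal in-memory sketch of that behaviour, assuming a hypothetical `AppendOnlyLog` class keyed by application ID (a production system would enforce this at the table level instead):

```python
from types import MappingProxyType
from typing import Any, Dict, Mapping


class AppendOnlyLog:
    """In-memory stand-in for an append-only decision table (hypothetical)."""

    def __init__(self) -> None:
        self._records: Dict[str, Mapping[str, Any]] = {}

    def append(self, application_id: str, record: Dict[str, Any]) -> None:
        """Store a record once; a second write for the same key is rejected."""
        if application_id in self._records:
            raise ValueError(f"{application_id} already recorded; overwrite refused")
        # Copy, then wrap in a read-only view so callers cannot mutate it later.
        self._records[application_id] = MappingProxyType(dict(record))

    def get(self, application_id: str) -> Mapping[str, Any]:
        """Return the read-only record for an application."""
        return self._records[application_id]

    def __len__(self) -> int:
        return len(self._records)


log = AppendOnlyLog()
log.append("A-2025-03-15-7842", {"decision": "denied", "model_score": 0.312})

try:
    log.append("A-2025-03-15-7842", {"decision": "approved", "model_score": 0.9})
except ValueError as exc:
    print(f"Rejected: {exc}")

print(log.get("A-2025-03-15-7842")["decision"])
```

The second `append` is refused and the original decision is still the one returned, which is the property an auditor relies on.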

Data Source Snapshotting

The key architectural decision: every raw data source response is stored as an immutable snapshot, referenced by a unique snapshot ID. When the pipeline fetches credit bureau data for application #A-2025-03-15-7842, the raw API response is written to the immutable store before any feature computation begins.

from dataclasses import dataclass
from typing import Any, Dict
from datetime import datetime, timezone
import hashlib
import json


@dataclass(frozen=True)
class DataSourceSnapshot:
    """Immutable snapshot of a raw data source response.

    Stored in an append-only table (Snowflake in Meridian's
    deployment) for regulatory auditability. Never updated or
    deleted within the retention period (7 years for FCRA).

    Attributes:
        snapshot_id: Unique identifier for this snapshot.
        source_name: Name of the data source.
        application_id: The application this data was fetched for.
        fetched_at: When the data was fetched.
        raw_response: The raw response (serialized JSON).
        response_hash: SHA-256 hash of the raw response (integrity check).
        api_version: Version of the data source API.
        latency_ms: Time taken to fetch the data.
    """
    snapshot_id: str
    source_name: str
    application_id: str
    fetched_at: datetime
    raw_response: str
    response_hash: str
    api_version: str = ""
    latency_ms: float = 0.0

    @classmethod
    def create(
        cls,
        source_name: str,
        application_id: str,
        raw_response: Dict[str, Any],
        api_version: str = "",
        latency_ms: float = 0.0,
    ) -> "DataSourceSnapshot":
        """Create a snapshot from a raw API response.

        Args:
            source_name: Data source name.
            application_id: Application identifier.
            raw_response: Raw response dictionary.
            api_version: API version string.
            latency_ms: Fetch latency.

        Returns:
            A new DataSourceSnapshot with computed hash and ID.
        """
        # Serialize with sorted keys so the same response always hashes the same.
        response_json = json.dumps(raw_response, sort_keys=True)
        response_hash = hashlib.sha256(response_json.encode()).hexdigest()
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time.
        now = datetime.now(timezone.utc)
        snapshot_id = (
            f"{source_name.replace(' ', '-')}-snap-"
            f"{now.strftime('%Y-%m-%dT%H:%M:%SZ')}"
        )

        return cls(
            snapshot_id=snapshot_id,
            source_name=source_name,
            application_id=application_id,
            fetched_at=now,
            raw_response=response_json,
            response_hash=response_hash,
            api_version=api_version,
            latency_ms=latency_ms,
        )

    def verify_integrity(self) -> bool:
        """Verify that the stored response has not been tampered with.

        Returns:
            True if the hash matches the stored response.
        """
        computed = hashlib.sha256(self.raw_response.encode()).hexdigest()
        return computed == self.response_hash


# Example: snapshot a credit bureau response
credit_bureau_response = {
    "applicant_ssn_hash": "sha256:redacted",
    "credit_score": 642,
    "total_accounts": 8,
    "total_balance": 18250.00,
    "total_credit_limit": 25000.00,
    "utilization_ratio": 0.73,
    "delinquencies_24m": 3,
    "inquiries_6m": 7,
    "oldest_account_months": 54,
    "report_date": "2025-03-15",
}

snapshot = DataSourceSnapshot.create(
    source_name="credit_bureau_experian",
    application_id="A-2025-03-15-7842",
    raw_response=credit_bureau_response,
    api_version="v3.2",
    latency_ms=847.0,
)

print(f"Snapshot ID: {snapshot.snapshot_id}")
print(f"Source: {snapshot.source_name}")
print(f"Hash: {snapshot.response_hash[:20]}...")
print(f"Integrity check: {snapshot.verify_integrity()}")
print(f"Latency: {snapshot.latency_ms:.0f}ms")

Example output (the snapshot ID reflects the fetch time, and the hash shown is illustrative):

Snapshot ID: credit_bureau_experian-snap-2025-03-15T14:47:05Z
Source: credit_bureau_experian
Hash: 7b2a4c6e8f0a1b3d5e7f...
Integrity check: True
Latency: 847ms
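With the raw response preserved, the team can show the inputs behind a computed feature, which is what audit failure 2 lacked. A sketch of that check, assuming utilization is defined as total balance divided by total credit limit, rounded to two decimals; the `recompute_utilization` helper and the rounding rule are assumptions, not Meridian's documented feature definition:

```python
import json

# Serialized raw response as it would be read back from the snapshot store.
# Field names follow the credit bureau example above.
stored_raw_response = json.dumps(
    {"total_balance": 18250.00, "total_credit_limit": 25000.00},
    sort_keys=True,
)

# Feature value recorded at decision time.
logged_credit_utilization = 0.73


def recompute_utilization(raw_response_json: str) -> float:
    """Recompute credit utilization from a stored raw response."""
    raw = json.loads(raw_response_json)
    return round(raw["total_balance"] / raw["total_credit_limit"], 2)


recomputed = recompute_utilization(stored_raw_response)
print(f"Recomputed: {recomputed}")
print(f"Matches logged value: {recomputed == logged_credit_utilization}")
```

Here 18,250 / 25,000 = 0.73, so the recomputed value agrees with the feature value stored in the decision record.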

Storage Architecture

Meridian chose a data warehouse (Snowflake) for the feature store's offline layer rather than a lakehouse. The rationale:

  1. Regulatory requirement for strong consistency. Snowflake provides ACID transactions, and every statement sees only committed data (READ COMMITTED isolation). Auditors need to query a single, consistent view of the data.
  2. All data is structured. Credit features are tabular. There are no unstructured data types that would require a data lake.
  3. Time Travel is built in. Snowflake retains historical data for 1 day by default, configurable up to 90 days for permanent tables on Enterprise Edition and higher. Time Travel covers short-term recovery and point-in-time queries; the full 7-year audit trail comes from the immutable snapshot tables themselves.
  4. SQL-native governance. Row-level access policies, dynamic data masking, and audit logging are native Snowflake features, simplifying FCRA compliance.

The immutable decision records and data source snapshots are stored in separate, append-only Snowflake tables with a 7-year retention policy. No maintenance job deletes or rewrites historical records in these tables.
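The retention policy can be expressed as a simple date rule on each record. A sketch, assuming the 7-year period runs from the decision timestamp; the `retention_expiry` and `may_purge` helpers and the leap-day handling are assumptions for illustration:

```python
from datetime import datetime

RETENTION_YEARS = 7  # retention period stated in the text


def retention_expiry(decision_timestamp: datetime) -> datetime:
    """Earliest moment a record may be purged (hypothetical rule)."""
    target_year = decision_timestamp.year + RETENTION_YEARS
    try:
        return decision_timestamp.replace(year=target_year)
    except ValueError:
        # Feb 29 has no counterpart in a non-leap target year; use Mar 1
        # so the record is kept slightly longer rather than shorter.
        return decision_timestamp.replace(year=target_year, month=3, day=1)


def may_purge(decision_timestamp: datetime, now: datetime) -> bool:
    """True once the retention period for the record has elapsed."""
    return now >= retention_expiry(decision_timestamp)


decided = datetime(2025, 3, 15, 14, 47, 23)
print(retention_expiry(decided).isoformat())
print(may_purge(decided, datetime(2030, 1, 1)))
```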

Lineage Graph

The team implemented a lineage graph that connects every decision to its inputs:

-- Audit query: reconstruct the complete decision trail
-- for application #A-2025-03-15-7842
WITH decision AS (
    SELECT *
    FROM credit_decisions.decision_records
    WHERE application_id = 'A-2025-03-15-7842'
),
feature_snapshot AS (
    SELECT
        d.application_id,
        d.decision_timestamp,
        f.feature_name,
        f.feature_value,
        f.feature_version,
        f.computation_timestamp
    FROM decision d
    JOIN credit_decisions.feature_audit_log f
        ON d.application_id = f.application_id
        AND d.decision_timestamp = f.decision_timestamp
),
source_snapshots AS (
    SELECT
        d.application_id,
        s.source_name,
        s.snapshot_id,
        s.fetched_at,
        s.api_version,
        s.response_hash
    FROM decision d
    JOIN credit_decisions.data_source_snapshots s
        ON d.application_id = s.application_id
),
model_metadata AS (
    SELECT
        m.model_version,
        m.registered_at,
        m.training_data_snapshot,
        m.validation_auc,
        m.deployed_at,
        m.retired_at
    FROM credit_decisions.model_registry m
    WHERE m.model_version = (
        SELECT model_version FROM decision
    )
)
SELECT
    d.application_id,
    d.decision,
    d.model_score,
    d.decision_threshold,
    d.decision_timestamp,
    mm.model_version,
    mm.registered_at AS model_registered,
    mm.deployed_at AS model_deployed,
    mm.retired_at AS model_retired,
    mm.validation_auc AS model_auc,
    mm.training_data_snapshot,
    fs.feature_name,
    fs.feature_value,
    fs.feature_version,
    ss.source_name,
    ss.snapshot_id,
    ss.fetched_at AS source_fetched_at,
    ss.response_hash AS source_integrity_hash
FROM decision d
CROSS JOIN model_metadata mm
LEFT JOIN feature_snapshot fs
    ON d.application_id = fs.application_id
LEFT JOIN source_snapshots ss
    ON d.application_id = ss.application_id
ORDER BY fs.feature_name, ss.source_name;

This single query returns the complete audit trail: the decision, the model version with its training provenance, every feature value with its version, and every raw data source with its integrity hash. Because features and sources are each joined on application_id alone, the result contains one row per (feature, source) pair. The regulator's question is answered in one query.
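A client usually wants the query result as separate feature and source lists rather than as repeated joined rows. A sketch of that reshaping, assuming rows arrive as dictionaries keyed by the query's column names; the `split_audit_rows` helper and the sample rows are hypothetical:

```python
from typing import Any, Dict, List, Tuple


def split_audit_rows(
    rows: List[Dict[str, Any]],
) -> Tuple[Dict[str, float], Dict[str, str]]:
    """Collapse joined audit rows into {feature: value} and {source: snapshot_id}."""
    features: Dict[str, float] = {}
    sources: Dict[str, str] = {}
    for row in rows:
        # LEFT JOINs can yield NULLs (None) when one side has no matches.
        if row.get("feature_name") is not None:
            features[row["feature_name"]] = row["feature_value"]
        if row.get("source_name") is not None:
            sources[row["source_name"]] = row["snapshot_id"]
    return features, sources


# Hypothetical result rows: 2 features x 2 sources = 4 joined rows.
rows = [
    {"feature_name": f, "feature_value": v, "source_name": s, "snapshot_id": sid}
    for f, v in [("credit_utilization", 0.73), ("inquiry_count_6m", 7.0)]
    for s, sid in [
        ("application_form", "app-snap-2025-03-15T14:47:00Z"),
        ("credit_bureau_experian", "cb-snap-2025-03-15T14:47:05Z"),
    ]
]

features, sources = split_audit_rows(rows)
print(len(rows), len(features), len(sources))
```

Four joined rows collapse to two features and two sources, so the repeated values in the raw result do not inflate the audit report.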

Results

After deploying the lineage-enabled feature store:

| Metric | Before | After | Change |
|---|---|---|---|
| Audit response time | 2-4 weeks (manual reconstruction) | < 1 hour (automated query) | -98% |
| Audit findings (documentation deficiency) | 3 per year | 0 | -100% |
| Feature reconstruction accuracy | "Best effort" (unknown) | Exact (hash-verified) | N/A |
| Storage cost (7-year retention) | N/A (data was overwritten) | $14,200/year | New cost |
| Regulatory confidence score | "Satisfactory" | "Strong" | 1 tier improvement |

The $14,200/year storage cost for 7-year retention is negligible relative to the regulatory risk: a single adverse finding can result in fines of $10,000-$100,000 per violation under ECOA, and at 15,000 applications per day a systematic documentation gap would touch many decisions at once.

Lessons Learned

  1. Immutability is the foundation of auditability. The decision to never overwrite data — only append — made the audit trail structurally reliable. There is no question of "was the data modified after the fact" because the system does not support modification.

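  2. Hash-based integrity verification is cheap insurance. Storing a SHA-256 hash of every raw data source response costs almost nothing in compute and negligible storage, and any later change to the stored response produces a different hash. This is strong evidence that the data has not been altered since collection, provided the hash itself is protected from modification (here, by the append-only store).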

  3. Lineage is not a reporting tool — it is a debugging tool. The team originally built lineage for regulatory compliance but found its greatest operational value in debugging: when a model's denial rate spiked by 4% in one week, the lineage graph traced the root cause to a credit bureau API change that altered the scale of a utilization ratio field from [0, 100] to [0, 1].

  4. The feature store and the lineage system must be integrated, not bolted on. Early designs treated lineage as a separate logging system. This created gaps — the lineage log could fall out of sync with the feature store. The final design makes lineage a first-class property of the feature store: every feature computation writes its lineage record as part of the same transaction that writes the feature value.

  5. Compliance drives good engineering. The regulatory requirements forced Meridian to build infrastructure — immutable storage, data versioning, lineage tracking, schema evolution — that would have improved the system even without regulatory pressure. Compliance was the forcing function; better engineering was the side effect.
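Lesson 4 can be illustrated with a small sketch: the feature value and its lineage record are written in one transaction, so neither can exist without the other. This uses Python's built-in sqlite3 module as a stand-in for the real store; the table names, columns, and the `write_feature_with_lineage` helper are hypothetical:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE feature_values ("
    "application_id TEXT, feature_name TEXT, feature_value REAL)"
)
conn.execute(
    "CREATE TABLE feature_lineage ("
    "application_id TEXT, feature_name TEXT, "
    "feature_version TEXT, snapshot_id TEXT NOT NULL)"
)
conn.commit()


def write_feature_with_lineage(conn, application_id, name, value, version, snapshot_id):
    """Write a feature value and its lineage row atomically."""
    # `with conn` commits on success and rolls back if any statement raises.
    with conn:
        conn.execute(
            "INSERT INTO feature_values VALUES (?, ?, ?)",
            (application_id, name, value),
        )
        conn.execute(
            "INSERT INTO feature_lineage VALUES (?, ?, ?, ?)",
            (application_id, name, version, snapshot_id),
        )


write_feature_with_lineage(
    conn, "A-2025-03-15-7842", "credit_utilization", 0.73, "v2.1",
    "cb-snap-2025-03-15T14:47:05Z",
)

# A lineage row with a missing snapshot ID violates NOT NULL, so the
# feature value written just before it is rolled back as well.
try:
    write_feature_with_lineage(
        conn, "A-2025-03-15-7842", "inquiry_count_6m", 7.0, "v1.1", None
    )
except sqlite3.IntegrityError:
    print("Rolled back: lineage write failed")

n_values = conn.execute("SELECT COUNT(*) FROM feature_values").fetchone()[0]
n_lineage = conn.execute("SELECT COUNT(*) FROM feature_lineage").fetchone()[0]
print(n_values, n_lineage)
```

Both tables end with exactly one row: the failed lineage write also discarded its feature value, so the two can never drift out of sync the way a separate logging system could.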