Appendix G: Python Data Ethics Toolkit -- Consolidated Code Reference
This appendix consolidates all Python code from the seven Python chapters (Chapters 10, 14, 15, 22, 27, 29, and 34) plus the GovernanceSimulator from Chapter 39 into a single, self-contained reference. Each section provides complete working code, docstrings, example usage, and expected output.
Requirements: Python 3.8 or later. All code uses only the Python standard library plus three widely available packages:
pip install pandas numpy matplotlib
The dataclasses module (used extensively) is part of the standard library from Python 3.7 onward.
Section 1: Setup and Installation
1.1 Environment Setup
Create a virtual environment and install dependencies:
# Create a virtual environment
python -m venv data-ethics-env
# Activate it (macOS/Linux)
source data-ethics-env/bin/activate
# Activate it (Windows)
data-ethics-env\Scripts\activate
# Install dependencies
pip install pandas numpy matplotlib
1.2 Common Imports
The following imports are used across multiple sections. Include them at the top of your scripts:
from dataclasses import dataclass, field
from datetime import datetime, date
from typing import Optional
import pandas as pd
import numpy as np
1.3 Verifying Your Setup
import sys
print(f"Python version: {sys.version}")
print(f"pandas version: {pd.__version__}")
print(f"numpy version: {np.__version__}")
# Verify dataclasses work
@dataclass
class TestClass:
name: str
value: int = 0
t = TestClass("setup_check", 42)
print(f"Dataclass test: {t}")
# Expected: Dataclass test: TestClass(name='setup_check', value=42)
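Beyond the interactive check above, scripts built on this toolkit can fail fast on unsupported interpreters. A minimal guard (a convenience sketch, not part of the chapter code):

```python
import sys

# Abort early if the interpreter predates the toolkit's stated minimum.
if sys.version_info < (3, 8):
    raise SystemExit("This toolkit requires Python 3.8 or later.")

print("Python version OK:", sys.version.split()[0])
```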
Section 2: k-Anonymity Checker (Chapter 10)
The k-anonymity checker determines the level of anonymization a dataset achieves by examining how many records share each unique combination of quasi-identifier values.
2.1 Core Function
import pandas as pd
def check_k_anonymity(df: pd.DataFrame, quasi_identifiers: list) -> int:
"""
Check the k-anonymity level of a DataFrame.
Groups records by the specified quasi-identifier columns and returns
the minimum group size. This minimum is the k-value the dataset
achieves: every individual is indistinguishable from at least
(k - 1) other individuals on the quasi-identifier attributes.
Args:
df: pandas DataFrame containing the dataset
quasi_identifiers: list of column names to use as quasi-identifiers
Returns:
int: the minimum group size (k-value). A value of 1 means at least
one record is uniquely identifiable.
Example:
>>> df = pd.DataFrame({
... 'age': [25, 25, 30, 30],
... 'zip': ['10001', '10001', '10002', '10002'],
... 'diagnosis': ['Flu', 'Cold', 'Flu', 'Diabetes']
... })
>>> check_k_anonymity(df, ['age', 'zip'])
2
"""
group_sizes = df.groupby(quasi_identifiers).size()
return int(group_sizes.min())
def k_anonymity_report(df: pd.DataFrame, quasi_identifiers: list) -> str:
"""
Generate a detailed k-anonymity report showing all equivalence classes.
Args:
df: pandas DataFrame containing the dataset
quasi_identifiers: list of column names to use as quasi-identifiers
Returns:
str: formatted report showing each equivalence class and its size
"""
group_sizes = df.groupby(quasi_identifiers).size().reset_index(name='count')
group_sizes = group_sizes.sort_values('count')
k = int(group_sizes['count'].min())
total_groups = len(group_sizes)
vulnerable = len(group_sizes[group_sizes['count'] < 3])
lines = [
"K-ANONYMITY REPORT",
"=" * 50,
f"Dataset size: {len(df)} records",
f"Quasi-identifiers: {quasi_identifiers}",
f"Number of equivalence classes: {total_groups}",
f"Achieved k-anonymity level: {k}",
f"Classes with fewer than 3 records: {vulnerable}",
"",
"Equivalence Classes (sorted by size):",
"-" * 50
]
for _, row in group_sizes.iterrows():
qi_values = [f"{col}={row[col]}" for col in quasi_identifiers]
lines.append(f" {', '.join(qi_values)} -> {int(row['count'])} records")
return "\n".join(lines)
2.2 Generalization Helper
def generalize_age(df: pd.DataFrame, column: str = 'age',
bin_size: int = 10) -> pd.DataFrame:
"""
Generalize an age column into ranges to improve k-anonymity.
Replaces exact ages with range labels (e.g., 27 -> "20-29").
Args:
df: DataFrame to modify (returns a copy)
column: name of the age column
bin_size: width of each age range
Returns:
pd.DataFrame: copy of df with the age column generalized
"""
result = df.copy()
result[column] = result[column].apply(
lambda age: f"{(age // bin_size) * bin_size}-"
f"{(age // bin_size) * bin_size + bin_size - 1}"
)
return result
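A quick sanity check confirms the binning arithmetic. The helper is repeated here so the snippet runs standalone; the toy ages are illustrative:

```python
import pandas as pd

def generalize_age(df: pd.DataFrame, column: str = 'age',
                   bin_size: int = 10) -> pd.DataFrame:
    # Same logic as Section 2.2: replace exact ages with range labels.
    result = df.copy()
    result[column] = result[column].apply(
        lambda age: f"{(age // bin_size) * bin_size}-"
                    f"{(age // bin_size) * bin_size + bin_size - 1}"
    )
    return result

toy = pd.DataFrame({'age': [27, 34, 40]})
print(generalize_age(toy)['age'].tolist())
# ['20-29', '30-39', '40-49']
```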
2.3 l-Diversity Checker
def check_l_diversity(df: pd.DataFrame, quasi_identifiers: list,
sensitive_attribute: str) -> int:
"""
Check the l-diversity level of a DataFrame.
l-diversity requires that each equivalence class (defined by
quasi-identifiers) contains at least l distinct values of the
sensitive attribute.
Args:
df: pandas DataFrame
quasi_identifiers: list of quasi-identifier column names
sensitive_attribute: name of the sensitive attribute column
Returns:
int: minimum number of distinct sensitive values across all
equivalence classes (the l-diversity level)
"""
diversity = df.groupby(quasi_identifiers)[sensitive_attribute].nunique()
return int(diversity.min())
2.4 Example Usage
# Create test dataset
data = {
'age': [25, 25, 30, 30, 30, 35, 35, 35, 40, 40],
'zipcode': ['10001', '10001', '10002', '10002', '10002',
'10003', '10003', '10003', '10004', '10004'],
'gender': ['M', 'M', 'F', 'F', 'F', 'M', 'M', 'F', 'F', 'F'],
'diagnosis': ['Flu', 'Cold', 'Flu', 'Diabetes', 'Cold',
'Flu', 'Flu', 'Cancer', 'Cold', 'Flu']
}
df = pd.DataFrame(data)
# Check k-anonymity
quasi_ids = ['age', 'zipcode', 'gender']
print(k_anonymity_report(df, quasi_ids))
# Check l-diversity
l = check_l_diversity(df, quasi_ids, 'diagnosis')
print(f"\nl-diversity level: {l}")
# Apply generalization and re-check
df_gen = generalize_age(df, 'age', 10)
k_new = check_k_anonymity(df_gen, ['age', 'zipcode', 'gender'])
print(f"\nAfter age generalization (bin_size=10): k = {k_new}")
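The chapter code measures k-anonymity but does not enforce it. A common complement, sketched here as an illustration (the `suppress_small_classes` helper is not part of the chapter code), is suppression: drop every equivalence class smaller than a target k.

```python
import pandas as pd

def suppress_small_classes(df: pd.DataFrame, quasi_identifiers: list,
                           k: int = 2) -> pd.DataFrame:
    # Keep only records whose quasi-identifier combination occurs
    # at least k times; smaller classes are dropped (suppressed).
    sizes = df.groupby(quasi_identifiers)[quasi_identifiers[0]].transform('size')
    return df[sizes >= k].reset_index(drop=True)

toy = pd.DataFrame({
    'age': [25, 25, 30],
    'zip': ['10001', '10001', '10002'],
})
safe = suppress_small_classes(toy, ['age', 'zip'], k=2)
print(len(safe))  # 2 -- the singleton (30, '10002') record is suppressed
```

Suppression raises k at the cost of discarding records, so report how many rows were dropped whenever you apply it.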
Section 3: Bias Auditor (Chapter 14)
The BiasAuditor class assesses algorithmic bias by computing selection rates across groups and checking for disparate impact using the four-fifths rule.
3.1 Core Class
from dataclasses import dataclass, field
@dataclass
class BiasAuditor:
"""
Audit an algorithmic system for disparate impact bias.
The auditor computes selection rates for each group and checks
whether the ratio between the lowest and highest selection rates
falls below the four-fifths (80%) threshold -- a widely used
standard for identifying potential discriminatory impact.
Attributes:
predictions: list of binary predictions (1 = selected, 0 = not)
groups: list of group labels (same length as predictions)
Usage:
>>> auditor = BiasAuditor(
... predictions=[1, 0, 1, 1, 0, 1, 0, 0],
... groups=['A', 'A', 'A', 'A', 'B', 'B', 'B', 'B']
... )
>>> print(auditor.audit_report())
"""
predictions: list
groups: list
def selection_rates(self) -> dict:
"""Calculate selection rate for each group."""
group_data = {}
for pred, group in zip(self.predictions, self.groups):
if group not in group_data:
group_data[group] = {'total': 0, 'selected': 0}
group_data[group]['total'] += 1
group_data[group]['selected'] += pred
return {
group: data['selected'] / data['total']
for group, data in group_data.items()
}
def disparate_impact_ratio(self) -> float:
"""
Calculate the disparate impact ratio.
Returns the ratio of the lowest selection rate to the highest.
A ratio below 0.8 (four-fifths) indicates potential disparate impact.
"""
rates = self.selection_rates()
if not rates:
return 1.0
min_rate = min(rates.values())
max_rate = max(rates.values())
if max_rate == 0:
return 1.0
return min_rate / max_rate
def four_fifths_test(self) -> bool:
"""
Apply the four-fifths rule.
Returns True if the system passes (ratio >= 0.8).
Returns False if the system fails (ratio < 0.8).
"""
return self.disparate_impact_ratio() >= 0.8
def audit_report(self) -> str:
"""Generate a formatted audit report."""
rates = self.selection_rates()
ratio = self.disparate_impact_ratio()
passes = self.four_fifths_test()
lines = [
"BIAS AUDIT REPORT",
"=" * 50,
f"Total records: {len(self.predictions)}",
f"Number of groups: {len(rates)}",
"",
"Selection Rates by Group:",
"-" * 30,
]
for group in sorted(rates.keys()):
count = sum(1 for g in self.groups if g == group)
selected = sum(1 for p, g in zip(self.predictions, self.groups)
if g == group and p == 1)
lines.append(
f" {group}: {rates[group]:.3f} "
f"({selected}/{count})"
)
lines.extend([
"",
f"Disparate Impact Ratio: {ratio:.3f}",
            "Four-Fifths Threshold: 0.800",
f"Result: {'PASS' if passes else 'FAIL -- Potential Disparate Impact'}",
])
if not passes:
most = max(rates, key=rates.get)
least = min(rates, key=rates.get)
lines.append(
f"\nMost selected group: {most} ({rates[most]:.3f})"
)
lines.append(
f"Least selected group: {least} ({rates[least]:.3f})"
)
return "\n".join(lines)
3.2 Example Usage
# Hiring scenario: 100 applicants from Group A, 80 from Group B
import random
random.seed(42)
preds_a = [1 if random.random() < 0.60 else 0 for _ in range(100)]
preds_b = [1 if random.random() < 0.35 else 0 for _ in range(80)]
auditor = BiasAuditor(
predictions=preds_a + preds_b,
groups=['Group A'] * 100 + ['Group B'] * 80
)
print(auditor.audit_report())
# Expected output: FAIL -- disparate impact detected
# Group A selection rate ~ 0.60
# Group B selection rate ~ 0.35
# Disparate impact ratio ~ 0.58 (below 0.80 threshold)
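Because the hiring example is randomized, it helps to verify the four-fifths arithmetic on hand-countable numbers. The core of `disparate_impact_ratio` reduces to a few lines (restated standalone; the counts are hypothetical):

```python
# Hypothetical counts: Group A selects 4 of 6, Group B selects 2 of 6.
rates = {'A': 4 / 6, 'B': 2 / 6}

# Ratio of lowest to highest selection rate, as in disparate_impact_ratio.
ratio = min(rates.values()) / max(rates.values())
print(f"{ratio:.3f}")  # 0.500 -- below 0.8, so the four-fifths test fails
```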
Section 4: Fairness Calculator (Chapter 15)
The FairnessCalculator computes and compares multiple fairness metrics across groups, including demographic parity, equalized odds, and calibration.
4.1 Core Dataclass
from dataclasses import dataclass
@dataclass
class FairnessCalculator:
"""
Calculate and compare fairness metrics across demographic groups.
Computes confusion matrix components, per-group metrics, and
assesses three fairness criteria: demographic parity, equalized
odds, and calibration.
Attributes:
predictions: list of binary predictions (1/0)
actuals: list of actual outcomes (1/0)
groups: list of group labels
The fairness criteria assessed:
- Demographic Parity: equal selection rates across groups
- Equalized Odds: equal TPR and FPR across groups
- Calibration: equal PPV (positive predictive value) across groups
"""
predictions: list
actuals: list
groups: list
def _group_indices(self) -> dict:
"""Return dict mapping group labels to lists of indices."""
indices = {}
for i, g in enumerate(self.groups):
indices.setdefault(g, []).append(i)
return indices
def metrics_by_group(self) -> dict:
"""
Compute confusion matrix and derived metrics for each group.
Returns:
dict mapping group labels to metric dictionaries containing:
n, tp, fp, tn, fn, base_rate, selection_rate, tpr, fpr, ppv
"""
results = {}
for group, idx in self._group_indices().items():
preds = [self.predictions[i] for i in idx]
acts = [self.actuals[i] for i in idx]
tp = sum(1 for p, a in zip(preds, acts) if p == 1 and a == 1)
fp = sum(1 for p, a in zip(preds, acts) if p == 1 and a == 0)
tn = sum(1 for p, a in zip(preds, acts) if p == 0 and a == 0)
fn = sum(1 for p, a in zip(preds, acts) if p == 0 and a == 1)
n = len(idx)
pos = tp + fn
neg = fp + tn
results[group] = {
'n': n, 'tp': tp, 'fp': fp, 'tn': tn, 'fn': fn,
'base_rate': pos / n if n > 0 else 0,
'selection_rate': (tp + fp) / n if n > 0 else 0,
'tpr': tp / pos if pos > 0 else 0,
'fpr': fp / neg if neg > 0 else 0,
'ppv': tp / (tp + fp) if (tp + fp) > 0 else 0,
}
return results
def check_demographic_parity(self, threshold: float = 0.05) -> dict:
"""Check if selection rates are approximately equal."""
m = self.metrics_by_group()
rates = {g: v['selection_rate'] for g, v in m.items()}
diff = max(rates.values()) - min(rates.values())
return {
'rates': rates,
'max_difference': diff,
'satisfied': diff <= threshold
}
def check_equalized_odds(self, threshold: float = 0.05) -> dict:
"""Check if TPR and FPR are approximately equal across groups."""
m = self.metrics_by_group()
tprs = {g: v['tpr'] for g, v in m.items()}
fprs = {g: v['fpr'] for g, v in m.items()}
tpr_diff = max(tprs.values()) - min(tprs.values())
fpr_diff = max(fprs.values()) - min(fprs.values())
return {
'tprs': tprs, 'fprs': fprs,
'tpr_difference': tpr_diff,
'fpr_difference': fpr_diff,
'satisfied': tpr_diff <= threshold and fpr_diff <= threshold
}
def check_calibration(self, threshold: float = 0.05) -> dict:
"""Check if PPV is approximately equal across groups."""
m = self.metrics_by_group()
ppvs = {g: v['ppv'] for g, v in m.items()}
diff = max(ppvs.values()) - min(ppvs.values())
return {
'ppvs': ppvs,
'max_difference': diff,
'satisfied': diff <= threshold
}
def full_report(self) -> str:
"""Generate a comprehensive fairness report."""
m = self.metrics_by_group()
dp = self.check_demographic_parity()
eo = self.check_equalized_odds()
cal = self.check_calibration()
lines = [
"FAIRNESS REPORT",
"=" * 60
]
for g in sorted(m.keys()):
v = m[g]
lines.extend([
f"\nGroup: {g} (n={v['n']})",
f" Base rate: {v['base_rate']:.3f}",
f" Selection rate: {v['selection_rate']:.3f}",
f" TPR: {v['tpr']:.3f}",
f" FPR: {v['fpr']:.3f}",
f" PPV: {v['ppv']:.3f}",
])
status = lambda s: "SATISFIED" if s else "VIOLATED"
lines.extend([
"\n" + "-" * 60,
"FAIRNESS CRITERIA ASSESSMENT",
"-" * 60,
f"\n1. Demographic Parity: {status(dp['satisfied'])}",
f" Max selection rate difference: {dp['max_difference']:.3f}",
f"\n2. Equalized Odds: {status(eo['satisfied'])}",
f" TPR difference: {eo['tpr_difference']:.3f}",
f" FPR difference: {eo['fpr_difference']:.3f}",
f"\n3. Calibration: {status(cal['satisfied'])}",
f" Max PPV difference: {cal['max_difference']:.3f}",
])
return "\n".join(lines)
4.2 Example Usage
# COMPAS-like scenario: different base rates, calibrated predictions
import random
random.seed(42)
# Group A: 500 people, 40% base rate
# Group B: 500 people, 20% base rate
def generate_group(n, base_rate, threshold=0.5):
preds, acts = [], []
for _ in range(n):
actual = 1 if random.random() < base_rate else 0
# Score correlated with actual outcome
score = random.gauss(0.7 if actual else 0.3, 0.2)
pred = 1 if score > threshold else 0
preds.append(pred)
acts.append(actual)
return preds, acts
preds_a, acts_a = generate_group(500, 0.40)
preds_b, acts_b = generate_group(500, 0.20)
fc = FairnessCalculator(
predictions=preds_a + preds_b,
actuals=acts_a + acts_b,
groups=['A'] * 500 + ['B'] * 500
)
print(fc.full_report())
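As in Section 3, the example is randomized, so spot-check the confusion-matrix bookkeeping from `metrics_by_group` on four hand-labeled records (a standalone restatement; the data is hypothetical):

```python
# One record in each confusion-matrix cell.
preds = [1, 0, 1, 0]
acts  = [1, 1, 0, 0]

# The same counting used inside metrics_by_group.
tp = sum(1 for p, a in zip(preds, acts) if p == 1 and a == 1)
fp = sum(1 for p, a in zip(preds, acts) if p == 1 and a == 0)
tn = sum(1 for p, a in zip(preds, acts) if p == 0 and a == 0)
fn = sum(1 for p, a in zip(preds, acts) if p == 0 and a == 1)

print(tp, fp, tn, fn)                    # 1 1 1 1
print(tp / (tp + fn), fp / (fp + tn))    # TPR = 0.5, FPR = 0.5
```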
Section 5: Data Quality Auditor (Chapter 22)
The DataQualityAuditor class assesses data quality against the six standard dimensions, providing automated checks for completeness, uniqueness, validity, and consistency.
5.1 Core Class
from dataclasses import dataclass, field
import pandas as pd
from datetime import datetime
@dataclass
class DataQualityAuditor:
"""
    Audit a DataFrame for data quality across the six standard dimensions:
    accuracy, completeness, consistency, timeliness, validity, uniqueness.
    Programmatic checks are provided for completeness, uniqueness, validity,
    and consistency; accuracy and timeliness require external reference
    data and are not automated here.
Attributes:
df: the DataFrame to audit
name: descriptive name for the dataset
"""
df: pd.DataFrame
name: str = "Unnamed Dataset"
def completeness_score(self) -> dict:
"""
Calculate completeness (percentage of non-null values) per column.
Returns:
dict with per-column scores and overall score
"""
scores = {}
for col in self.df.columns:
non_null = self.df[col].notna().sum()
scores[col] = non_null / len(self.df)
overall = sum(scores.values()) / len(scores) if scores else 0
return {'per_column': scores, 'overall': overall}
def uniqueness_score(self, key_column: str) -> float:
"""
Calculate uniqueness score for a key column.
Returns: ratio of unique values to total values (1.0 = all unique)
"""
unique_count = self.df[key_column].nunique()
total_count = len(self.df)
return unique_count / total_count if total_count > 0 else 0
def validity_check(self, column: str, rule_name: str,
rule_func) -> dict:
"""
Check validity of a column against a custom rule.
Args:
column: column name to check
rule_name: descriptive name of the validation rule
rule_func: function that takes a value and returns True/False
Returns:
            dict with pass_rate, failing_count, and failing_indices.
            Null values are skipped (treated as passing); missing data
            is measured separately by completeness_score.
"""
mask = self.df[column].apply(
lambda x: rule_func(x) if pd.notna(x) else True
)
failing = self.df[~mask].index.tolist()
pass_rate = mask.sum() / len(self.df)
return {
'rule': rule_name,
'column': column,
'pass_rate': pass_rate,
'failing_count': len(failing),
'failing_indices': failing[:10] # First 10 for inspection
}
def consistency_check(self, col1: str, col2: str,
check_func) -> dict:
"""
Check consistency between two columns using a custom function.
Args:
col1, col2: columns to compare
check_func: function(val1, val2) -> bool
Returns:
dict with consistency rate and failing records
"""
results = self.df.apply(
lambda row: check_func(row[col1], row[col2])
if pd.notna(row[col1]) and pd.notna(row[col2]) else True,
axis=1
)
return {
'columns': (col1, col2),
'consistency_rate': results.sum() / len(self.df),
'inconsistent_count': (~results).sum()
}
def generate_report(self, key_column: str) -> str:
"""Generate a comprehensive data quality report."""
comp = self.completeness_score()
uniq = self.uniqueness_score(key_column)
lines = [
f"DATA QUALITY REPORT: {self.name}",
"=" * 60,
f"Total records: {len(self.df)}",
f"Total columns: {len(self.df.columns)}",
"",
"COMPLETENESS",
"-" * 40,
]
for col, score in comp['per_column'].items():
bar = '#' * int(score * 20)
lines.append(f" {col:20s} {score:.1%} {bar}")
lines.extend([
f" Overall: {comp['overall']:.1%}",
"",
"UNIQUENESS",
"-" * 40,
f" Key column '{key_column}': {uniq:.1%}",
f" Duplicate keys: {len(self.df) - self.df[key_column].nunique()}",
])
# Summary rating
scores = list(comp['per_column'].values()) + [uniq]
min_score = min(scores)
if min_score >= 0.95:
rating = "EXCELLENT"
elif min_score >= 0.85:
rating = "ACCEPTABLE"
elif min_score >= 0.70:
rating = "NEEDS IMPROVEMENT"
else:
rating = "CRITICAL"
lines.extend([
"",
f"OVERALL RATING: {rating}",
f"Lowest dimension score: {min_score:.1%}"
])
return "\n".join(lines)
5.2 Example Usage
import random
random.seed(42)
# Create sample dataset with quality issues
n = 200
data = {
'customer_id': list(range(1, n + 1)),
'name': [f"Customer_{i}" for i in range(1, n + 1)],
'email': [f"user{i}@example.com" if random.random() > 0.1
else None for i in range(1, n + 1)],
'age': [random.randint(18, 85) if random.random() > 0.05
else -5 for _ in range(n)],
}
# Add some duplicate IDs
data['customer_id'][195] = data['customer_id'][10]
data['customer_id'][196] = data['customer_id'][20]
df = pd.DataFrame(data)
auditor = DataQualityAuditor(df, "Customer Records")
print(auditor.generate_report('customer_id'))
# Validity check for age
result = auditor.validity_check('age', 'positive_age', lambda x: x > 0)
print(f"\nAge validity: {result['pass_rate']:.1%} pass rate")
print(f" Failing records: {result['failing_count']}")
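The example above exercises validity but not `consistency_check`. Its row-wise rule can be demonstrated standalone on a hypothetical pair of columns where one value should never precede the other:

```python
import pandas as pd

# Hypothetical orders: shipping should never happen before ordering.
orders = pd.DataFrame({'order_day': [1, 5, 9], 'ship_day': [3, 2, 9]})

# The same row-wise rule consistency_check applies internally.
ok = orders.apply(lambda row: row['ship_day'] >= row['order_day'], axis=1)
print(f"Consistency rate: {ok.sum() / len(orders):.1%}")  # 66.7%
print(f"Inconsistent records: {int((~ok).sum())}")        # 1
```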
Section 6: Data Lineage Tracker (Chapter 27)
The DataLineageTracker records the provenance, transformations, and access history of a data asset.
6.1 Supporting Dataclasses and Core Class
from dataclasses import dataclass, field
from datetime import datetime, date
from typing import Optional
@dataclass
class TransformationRecord:
"""Record of a data transformation operation."""
operation: str
performed_by: str
timestamp: datetime
description: str
rows_before: int
rows_after: int
columns_affected: list = field(default_factory=list)
@dataclass
class AccessRecord:
"""Record of a data access event."""
user: str
access_type: str # read, write, export, delete
timestamp: datetime
approved: bool
purpose: str
@dataclass
class DataLineageTracker:
"""
Track the lineage, transformations, and access history of a data asset.
Provides a complete audit trail from data source through all
transformations and access events, supporting governance requirements
for accountability and transparency.
Attributes:
asset_name: human-readable name of the data asset
source: origin of the data (e.g., "clinic intake forms")
classification: sensitivity level (public/internal/confidential/restricted)
storage_location: where the data is currently stored
retention_policy: human-readable retention rule
retention_expiry: date when the data should be deleted or reviewed
"""
asset_name: str
source: str
classification: str
storage_location: str
retention_policy: str
retention_expiry: Optional[date] = None
created_at: datetime = field(default_factory=datetime.now)
transformations: list = field(default_factory=list)
access_log: list = field(default_factory=list)
def add_transformation(self, record: TransformationRecord) -> None:
"""Log a transformation applied to this data asset."""
self.transformations.append(record)
def log_access(self, record: AccessRecord) -> None:
"""Log an access event for this data asset."""
self.access_log.append(record)
def check_retention(self) -> dict:
"""
Check retention status against the expiry date.
Returns:
dict with status ('active', 'expiring_soon', 'expired', 'no_expiry'),
days_remaining (if applicable), and a recommendation
"""
if self.retention_expiry is None:
return {
'status': 'no_expiry',
'recommendation': 'Set a retention expiry date.'
}
today = date.today()
days_remaining = (self.retention_expiry - today).days
if days_remaining < 0:
return {
'status': 'expired',
'days_overdue': abs(days_remaining),
'recommendation': 'Data should be reviewed for deletion.'
}
elif days_remaining <= 90:
return {
'status': 'expiring_soon',
'days_remaining': days_remaining,
'recommendation': 'Begin retention review process.'
}
else:
return {
'status': 'active',
'days_remaining': days_remaining,
'recommendation': 'No action needed.'
}
def unapproved_access_count(self) -> int:
"""Count access events that were not approved."""
return sum(1 for a in self.access_log if not a.approved)
def generate_report(self) -> str:
"""Generate a comprehensive lineage report."""
retention = self.check_retention()
unapproved = self.unapproved_access_count()
lines = [
            "DATA LINEAGE REPORT",
"=" * 60,
f"Asset: {self.asset_name}",
f"Source: {self.source}",
f"Classification: {self.classification}",
f"Storage: {self.storage_location}",
f"Created: {self.created_at.strftime('%Y-%m-%d')}",
f"Retention: {self.retention_policy}",
f"Expiry: {self.retention_expiry or 'Not set'}",
f"Retention status: {retention['status']}",
f" -> {retention['recommendation']}",
"",
f"TRANSFORMATIONS ({len(self.transformations)})",
"-" * 40,
]
for i, t in enumerate(self.transformations, 1):
lines.extend([
f" {i}. {t.operation}",
f" By: {t.performed_by} on "
f"{t.timestamp.strftime('%Y-%m-%d %H:%M')}",
f" Rows: {t.rows_before} -> {t.rows_after}",
f" {t.description}",
])
lines.extend([
"",
f"ACCESS LOG ({len(self.access_log)} entries, "
f"{unapproved} unapproved)",
"-" * 40,
])
for a in self.access_log:
status = "APPROVED" if a.approved else "** UNAPPROVED **"
lines.append(
f" {a.timestamp.strftime('%Y-%m-%d %H:%M')} | "
f"{a.access_type:6s} | {a.user:15s} | {status}"
)
# Ethical review notes
lines.extend(["", "ETHICAL REVIEW NOTES", "-" * 40])
if unapproved > 0:
lines.append(
f" WARNING: {unapproved} unapproved access event(s) detected."
)
if self.classification in ('restricted', 'confidential'):
exports = sum(1 for a in self.access_log
if a.access_type == 'export')
if exports > 0:
lines.append(
f" NOTICE: {exports} export(s) of {self.classification} "
f"data recorded."
)
if retention['status'] == 'expired':
lines.append(" ALERT: Data has exceeded its retention period.")
return "\n".join(lines)
6.2 Example Usage
tracker = DataLineageTracker(
asset_name="Patient Demographics v2",
source="Clinic intake forms (47 clinics)",
classification="restricted",
storage_location="vitramed-prod-db-east",
retention_policy="7 years from last clinical visit",
retention_expiry=date(2032, 12, 31)
)
tracker.add_transformation(TransformationRecord(
operation="De-identification",
performed_by="Dr. Khoury",
timestamp=datetime(2025, 6, 15, 10, 30),
description="Removed SSN, full name; generalized DOB to year",
rows_before=15420, rows_after=15420,
columns_affected=['ssn', 'full_name', 'dob']
))
tracker.log_access(AccessRecord(
user="Mira Chakravarti",
access_type="read",
timestamp=datetime(2025, 7, 1, 9, 15),
approved=True,
purpose="Governance framework audit"
))
tracker.log_access(AccessRecord(
user="External Analyst",
access_type="export",
timestamp=datetime(2025, 7, 10, 16, 45),
approved=False,
purpose="Third-party analytics"
))
print(tracker.generate_report())
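The retention section of this report depends on the date you run it. The branching inside `check_retention` is simple date arithmetic, restated standalone here with a deliberately past expiry (hypothetical) so the outcome is deterministic:

```python
from datetime import date

retention_expiry = date(2020, 1, 1)  # hypothetical, already in the past
days_remaining = (retention_expiry - date.today()).days

# Mirrors the branching in check_retention.
if days_remaining < 0:
    status = 'expired'
elif days_remaining <= 90:
    status = 'expiring_soon'
else:
    status = 'active'

print(status, abs(days_remaining))  # 'expired', plus the days overdue
```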
Section 7: Model Card Generator (Chapter 29)
The ModelCard dataclass documents a machine learning model's purpose, performance, limitations, and ethical considerations.
7.1 Core Dataclass
from dataclasses import dataclass, field
@dataclass
class ModelCard:
"""
Standardized documentation for a machine learning model.
Based on Mitchell et al. (2019), "Model Cards for Model Reporting."
Documents the model's purpose, performance, and ethical considerations
to support transparency and accountability.
"""
model_name: str
version: str
description: str
model_type: str
intended_use: str
out_of_scope_uses: list
training_data_summary: str
evaluation_data_summary: str
evaluation_metrics: dict
disaggregated_metrics: dict
ethical_considerations: list
limitations: list
recommendations: list = field(default_factory=list)
last_updated: str = ""
contact: str = ""
def generate_report(self) -> str:
"""Generate a formatted model card report."""
lines = [
f"{'=' * 60}",
f"MODEL CARD: {self.model_name}",
f"Version: {self.version}",
f"Last Updated: {self.last_updated}",
f"Contact: {self.contact}",
f"{'=' * 60}",
"",
"1. MODEL DETAILS",
f" Type: {self.model_type}",
f" Description: {self.description}",
"",
"2. INTENDED USE",
f" {self.intended_use}",
"",
"3. OUT-OF-SCOPE USES",
]
for use in self.out_of_scope_uses:
lines.append(f" - {use}")
lines.extend([
"",
"4. TRAINING DATA",
f" {self.training_data_summary}",
"",
"5. EVALUATION DATA",
f" {self.evaluation_data_summary}",
"",
"6. OVERALL PERFORMANCE METRICS",
])
for metric, value in self.evaluation_metrics.items():
lines.append(f" {metric}: {value}")
lines.extend(["", "7. DISAGGREGATED PERFORMANCE"])
for group, metrics in self.disaggregated_metrics.items():
lines.append(f" {group}:")
for metric, value in metrics.items():
lines.append(f" {metric}: {value}")
lines.extend(["", "8. ETHICAL CONSIDERATIONS"])
for consideration in self.ethical_considerations:
lines.append(f" - {consideration}")
lines.extend(["", "9. KNOWN LIMITATIONS"])
for limitation in self.limitations:
lines.append(f" - {limitation}")
if self.recommendations:
lines.extend(["", "10. RECOMMENDATIONS"])
for rec in self.recommendations:
lines.append(f" - {rec}")
lines.append(f"\n{'=' * 60}")
return "\n".join(lines)
7.2 Example: VitraMed Patient Risk Model
vitramed_card = ModelCard(
model_name="VitraMed Patient Risk Predictor",
version="3.1",
description="Gradient boosted model predicting 30-day hospital "
"readmission risk for discharged patients.",
model_type="XGBoost (gradient boosted decision trees)",
intended_use="Flag high-risk patients for nurse follow-up calls "
"within 48 hours of discharge.",
out_of_scope_uses=[
"Denial or limitation of insurance coverage",
"Automated discharge decisions without clinician review",
"Risk scoring for populations not in the training data"
],
training_data_summary="EHR data from 47 partner clinics, "
"2019-2024, n=89,400 discharge episodes.",
evaluation_data_summary="Hold-out test set of 15,200 episodes "
"from 2024, stratified by clinic and demographics.",
evaluation_metrics={
'AUC-ROC': 0.84, 'Accuracy': 0.78,
'Precision': 0.72, 'Recall': 0.69
},
disaggregated_metrics={
'White patients (n=8,200)': {'AUC': 0.86, 'FPR': 0.14},
'Black patients (n=3,100)': {'AUC': 0.79, 'FPR': 0.22},
'Hispanic patients (n=2,800)': {'AUC': 0.81, 'FPR': 0.19},
'Patients age 65+ (n=5,400)': {'AUC': 0.76, 'FPR': 0.26},
},
ethical_considerations=[
"Performance gap for Black patients (AUC 0.79 vs 0.86) "
"requires monitoring and mitigation.",
"Model uses healthcare utilization features that may encode "
"access disparities as health differences.",
"Patients over 65 have significantly higher FPR, risking "
"unnecessary anxiety from false positive flags.",
"Training data underrepresents rural clinics."
],
limitations=[
"Not validated for pediatric populations.",
"Performance degrades for patients with fewer than 3 prior visits.",
"Does not account for social determinants of health."
],
recommendations=[
"Re-evaluate every 6 months for performance drift.",
"Conduct community advisory review before expanding to new regions.",
"All flagged patients should receive human clinical review."
],
last_updated="2025-09-15",
contact="Mira Chakravarti, VitraMed Data Governance"
)
print(vitramed_card.generate_report())
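Because ModelCard is a plain dataclass, it can also be exported in machine-readable form with `dataclasses.asdict` plus `json` -- a common extension, not part of the chapter code. Illustrated on a trimmed-down stand-in so the snippet is self-contained:

```python
import json
from dataclasses import dataclass, asdict

@dataclass
class MiniCard:
    # Trimmed-down stand-in for ModelCard, for illustration only.
    model_name: str
    version: str
    evaluation_metrics: dict

card = MiniCard("Demo Risk Model", "0.1", {'AUC-ROC': 0.84})
print(json.dumps(asdict(card), indent=2))
```

The same `asdict` call works on a full ModelCard instance, which makes it easy to version model cards alongside model artifacts.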
Section 8: Carbon Estimator (Chapter 34)
The CarbonEstimator estimates the carbon footprint of AI model training from GPU type and count, training duration, cloud region, and data-center efficiency (PUE).
8.1 Core Class
from dataclasses import dataclass
@dataclass
class CarbonEstimator:
"""
Estimate the carbon emissions of AI model training.
Uses GPU power consumption, training duration, regional carbon
intensity, and PUE to estimate total energy use and CO2 emissions.
Attributes:
gpu_type: GPU model identifier (A100, H100, V100, T4)
num_gpus: number of GPUs used for training
training_hours: total training duration in hours
cloud_region: cloud provider region identifier
pue: Power Usage Effectiveness (default 1.1 for modern facilities)
"""
gpu_type: str
num_gpus: int
training_hours: float
cloud_region: str
pue: float = 1.1
    # Lookup tables are populated in __post_init__ because dataclasses
    # disallow mutable (dict) defaults; the None placeholders are
    # replaced on construction.
    # GPU thermal design power in watts
    GPU_POWER_WATTS: dict = None
    # Regional carbon intensity in gCO2 per kWh
    CARBON_INTENSITY: dict = None
# Reference values
FLIGHT_KG_CO2: float = 900 # One transatlantic round-trip
CAR_YEAR_KG_CO2: float = 4600 # Average car per year
HOME_YEAR_KG_CO2: float = 7500 # Average US home per year
def __post_init__(self):
self.GPU_POWER_WATTS = {
'T4': 70, 'V100': 300, 'A100': 250,
'A100-80GB': 300, 'H100': 350, 'H200': 400,
}
self.CARBON_INTENSITY = {
'us-east': 380, 'us-west': 210, 'us-central': 440,
'canada-central': 30, 'eu-west': 270, 'eu-north': 50,
'uk-south': 230, 'asia-southeast': 490,
'india-central': 700, 'australia-east': 660,
'brazil-south': 75, 'japan-east': 460,
}
def total_energy_kwh(self) -> float:
"""
Calculate total energy consumption in kilowatt-hours.
Formula: (GPU_watts * num_GPUs * hours * PUE) / 1000
"""
gpu_watts = self.GPU_POWER_WATTS.get(self.gpu_type, 250)
return (gpu_watts * self.num_gpus * self.training_hours
* self.pue) / 1000
def total_carbon_kg(self) -> float:
"""
Calculate total carbon emissions in kilograms of CO2.
Formula: energy_kWh * carbon_intensity_gCO2/kWh / 1000
"""
intensity = self.CARBON_INTENSITY.get(self.cloud_region, 400)
return self.total_energy_kwh() * intensity / 1000
def equivalents(self) -> dict:
"""Express carbon emissions in familiar equivalents."""
carbon = self.total_carbon_kg()
return {
'transatlantic_flights': carbon / self.FLIGHT_KG_CO2,
'car_years': carbon / self.CAR_YEAR_KG_CO2,
'home_months': (carbon / self.HOME_YEAR_KG_CO2) * 12,
}
def compare_regions(self) -> str:
"""Compare emissions across all available regions."""
lines = [
f"REGIONAL COMPARISON: {self.num_gpus}x {self.gpu_type} "
f"for {self.training_hours}h",
"=" * 60,
f"{'Region':<20s} {'gCO2/kWh':>10s} {'Total kg CO2':>12s} "
f"{'Flights':>8s}",
"-" * 60,
]
results = []
for region, intensity in sorted(self.CARBON_INTENSITY.items(),
key=lambda x: x[1]):
est = CarbonEstimator(
self.gpu_type, self.num_gpus,
self.training_hours, region, self.pue
)
results.append((region, intensity, est.total_carbon_kg()))
for region, intensity, carbon in results:
flights = carbon / self.FLIGHT_KG_CO2
lines.append(
f" {region:<18s} {intensity:>10d} {carbon:>12.1f} "
f"{flights:>8.2f}"
)
if results:
min_c = min(r[2] for r in results)
max_c = max(r[2] for r in results)
lines.append(f"\nRatio (highest/lowest): {max_c/min_c:.1f}x")
return "\n".join(lines)
def report(self) -> str:
"""Generate a complete carbon footprint report."""
energy = self.total_energy_kwh()
carbon = self.total_carbon_kg()
equiv = self.equivalents()
return "\n".join([
"CARBON FOOTPRINT ESTIMATE",
"=" * 50,
            "Configuration:",
f" GPU: {self.num_gpus}x {self.gpu_type}",
f" Duration: {self.training_hours} hours",
f" Region: {self.cloud_region}",
f" PUE: {self.pue}",
"",
f"Total energy: {energy:,.1f} kWh",
f"Total carbon: {carbon:,.1f} kg CO2",
"",
"Equivalents:",
f" Transatlantic flights: {equiv['transatlantic_flights']:.2f}",
f" Car-years: {equiv['car_years']:.2f}",
f" Home-months: {equiv['home_months']:.1f}",
])
8.2 Example Usage
# Estimate carbon for VitraMed's model training
est = CarbonEstimator(
gpu_type='A100', num_gpus=8,
training_hours=72, cloud_region='us-east'
)
print(est.report())
print()
print(est.compare_regions())
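The arithmetic behind `total_energy_kwh` and `total_carbon_kg` can be checked by hand. This sketch uses illustrative round numbers rather than the class constants:

```python
# Hand-check of the energy and carbon formulas with illustrative numbers
# (300 W per GPU, 8 GPUs, 10 hours, PUE 1.1, grid intensity 400 gCO2/kWh)
gpu_watts, num_gpus, hours, pue = 300, 8, 10, 1.1
intensity_g_per_kwh = 400

energy_kwh = gpu_watts * num_gpus * hours * pue / 1000
carbon_kg = energy_kwh * intensity_g_per_kwh / 1000

print(f"{energy_kwh:.1f} kWh, {carbon_kg:.2f} kg CO2")
# -> 26.4 kWh, 10.56 kg CO2
```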
Section 9: Governance Simulator (Chapter 39)
The GovernanceSimulator models how different governance structures distribute benefits, privacy protection, and voice across stakeholders in a community data ecosystem.
9.1 Supporting Dataclasses and Core Class
from dataclasses import dataclass, field
import random
@dataclass
class Stakeholder:
"""A member of a data governance community."""
name: str
group: str # e.g., "resident", "business", "government"
data_contribution: int # 0-100: how much data they contribute
technical_literacy: int # 0-100: ability to engage with technical systems
political_influence: int # 0-100: power to shape governance decisions
privacy_preference: int # 0-100: how much they value privacy
    benefit_threshold: float  # 0-1: minimum acceptable benefit, as a fraction of the 0-100 benefit scale
@dataclass
class GovernanceOutcome:
"""Results of a governance simulation for one model."""
model_name: str
benefits: dict # stakeholder name -> benefit value
privacy_scores: dict # stakeholder name -> privacy protection (0-1)
voice_scores: dict # stakeholder name -> voice in governance (0-1)
gini_coefficient: float # inequality in benefit distribution
below_threshold: list # stakeholders below their benefit threshold
@dataclass
class GovernanceSimulator:
"""
Simulate and compare different data governance models.
Models how corporate, regulatory, cooperative, and open commons
governance structures distribute benefits, privacy protection,
and voice across a community of stakeholders.
"""
community: list # list of Stakeholder objects
seed: int = 42
def __post_init__(self):
random.seed(self.seed)
def _gini(self, values: list) -> float:
"""Calculate the Gini coefficient of a list of values."""
sorted_vals = sorted(values)
n = len(sorted_vals)
if n == 0 or sum(sorted_vals) == 0:
return 0.0
cumulative = sum((2 * (i + 1) - n - 1) * v
for i, v in enumerate(sorted_vals))
return cumulative / (n * sum(sorted_vals))
def corporate_centralized(self) -> GovernanceOutcome:
"""Corporate governance: benefits proportional to influence."""
        total_influence = sum(s.political_influence for s in self.community)
        benefits, privacy, voice = {}, {}, {}
        for s in self.community:
            inf_share = s.political_influence / total_influence
            benefits[s.name] = inf_share * 100 + random.gauss(0, 2)
            # Privacy tracks political influence: powerful actors can
            # negotiate protections that ordinary contributors cannot
            privacy[s.name] = max(0, min(1,
                0.3 + (s.political_influence / 100) * 0.5 +
                random.gauss(0, 0.05)))
            voice[s.name] = max(0, min(1,
                s.political_influence / 100 + random.gauss(0, 0.05)))
vals = list(benefits.values())
below = [s.name for s in self.community
if benefits[s.name] < s.benefit_threshold * 100]
return GovernanceOutcome(
"Corporate Centralized", benefits, privacy, voice,
self._gini(vals), below
)
def regulatory_standard(self) -> GovernanceOutcome:
"""Regulatory governance: minimum standards enforced."""
benefits, privacy, voice = {}, {}, {}
min_privacy = 0.6 # Regulatory floor
for s in self.community:
# Benefits based on a mix of contribution and influence
benefits[s.name] = (
s.data_contribution * 0.4 +
s.political_influence * 0.3 +
s.technical_literacy * 0.2 +
random.gauss(0, 3)
)
            # Privacy has a regulatory floor (and is capped at 1.0)
            privacy[s.name] = min(1, max(min_privacy,
                0.5 + (s.privacy_preference / 100) * 0.4 +
                random.gauss(0, 0.05)))
# Voice through regulatory processes
voice[s.name] = max(0, min(1,
0.3 + (s.technical_literacy / 100) * 0.3 +
(s.political_influence / 100) * 0.3 +
random.gauss(0, 0.05)))
vals = list(benefits.values())
below = [s.name for s in self.community
if benefits[s.name] < s.benefit_threshold * 100]
return GovernanceOutcome(
"Regulatory Standard", benefits, privacy, voice,
self._gini(vals), below
)
def cooperative_democratic(self) -> GovernanceOutcome:
"""Cooperative governance: democratic, equal voice, coordination costs."""
coordination_cost = 0.85 # 15% efficiency loss
n = len(self.community)
benefits, privacy, voice = {}, {}, {}
total_data = sum(s.data_contribution for s in self.community)
for s in self.community:
# Benefits more equally distributed
equal_share = 100 / n
contribution_bonus = (s.data_contribution / total_data) * 20
benefits[s.name] = (
(equal_share + contribution_bonus) * coordination_cost +
random.gauss(0, 2)
)
# Privacy respects individual preferences
privacy[s.name] = max(0, min(1,
0.5 + (s.privacy_preference / 100) * 0.45 +
random.gauss(0, 0.05)))
# Voice is roughly equal (democratic)
voice[s.name] = max(0, min(1,
0.7 + random.gauss(0, 0.1)))
vals = list(benefits.values())
below = [s.name for s in self.community
if benefits[s.name] < s.benefit_threshold * 100]
return GovernanceOutcome(
"Cooperative Democratic", benefits, privacy, voice,
self._gini(vals), below
)
def open_commons(self) -> GovernanceOutcome:
"""Open commons: minimal governance, benefits to technically literate."""
benefits, privacy, voice = {}, {}, {}
for s in self.community:
# Benefits proportional to technical ability to extract value
benefits[s.name] = (
s.technical_literacy * 0.6 +
s.data_contribution * 0.3 +
random.gauss(0, 5)
)
# Privacy is low -- data is open
privacy[s.name] = max(0, min(1,
0.2 + random.gauss(0, 0.1)))
# Voice proportional to technical engagement
voice[s.name] = max(0, min(1,
s.technical_literacy / 100 + random.gauss(0, 0.1)))
vals = list(benefits.values())
below = [s.name for s in self.community
if benefits[s.name] < s.benefit_threshold * 100]
return GovernanceOutcome(
"Open Commons", benefits, privacy, voice,
self._gini(vals), below
)
def run_all_models(self) -> list:
"""Run all governance models and return results."""
random.seed(self.seed)
return [
self.corporate_centralized(),
self.regulatory_standard(),
self.cooperative_democratic(),
self.open_commons(),
]
def display_comparison(self, results: list) -> str:
"""Generate a formatted comparison of governance model outcomes."""
lines = [
"GOVERNANCE MODEL COMPARISON",
"=" * 70,
]
for r in results:
avg_benefit = sum(r.benefits.values()) / len(r.benefits)
avg_privacy = sum(r.privacy_scores.values()) / len(r.privacy_scores)
avg_voice = sum(r.voice_scores.values()) / len(r.voice_scores)
lines.extend([
f"\n{r.model_name}",
"-" * 40,
f" Avg benefit: {avg_benefit:.1f}",
f" Gini coefficient: {r.gini_coefficient:.3f}",
f" Avg privacy: {avg_privacy:.3f}",
f" Avg voice: {avg_voice:.3f}",
f" Below threshold: {len(r.below_threshold)} stakeholders",
])
if r.below_threshold:
lines.append(f" -> {', '.join(r.below_threshold)}")
return "\n".join(lines)
def create_example_community() -> list:
"""Create a representative community for simulation."""
return [
Stakeholder("Tech Company", "corporate", 90, 95, 85, 20, 0.3),
Stakeholder("Small Business", "business", 40, 50, 30, 50, 0.4),
Stakeholder("City Government", "government", 60, 60, 75, 40, 0.2),
Stakeholder("University", "institution", 70, 85, 55, 60, 0.3),
Stakeholder("Community Org", "civil_society", 30, 40, 25, 80, 0.5),
Stakeholder("Senior Resident", "resident", 20, 15, 10, 90, 0.6),
Stakeholder("Young Professional", "resident", 50, 75, 20, 45, 0.4),
Stakeholder("Gig Worker", "worker", 45, 35, 15, 70, 0.5),
]
9.2 Example Usage
community = create_example_community()
sim = GovernanceSimulator(community, seed=42)
results = sim.run_all_models()
print(sim.display_comparison(results))
Expected output shows that the corporate model concentrates benefits among high-influence stakeholders, producing the greatest inequality (highest Gini coefficient), while the cooperative model produces the most equal distribution at the cost of total benefit lost to coordination overhead. The regulatory model enforces minimum privacy standards that the corporate model does not. The open commons model strongly favors technically literate stakeholders.
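The Gini coefficient reported above runs from 0 (perfect equality) toward 1 (total concentration). The formula used by the `_gini` method can be verified in isolation; this standalone sketch duplicates its logic as a free function and checks two easy cases:

```python
def gini(values: list) -> float:
    """Discrete Gini coefficient: 0 = perfect equality, approaching 1 = concentration."""
    sorted_vals = sorted(values)
    n = len(sorted_vals)
    if n == 0 or sum(sorted_vals) == 0:
        return 0.0
    # Standard formula over rank-ordered values
    cumulative = sum((2 * (i + 1) - n - 1) * v
                     for i, v in enumerate(sorted_vals))
    return cumulative / (n * sum(sorted_vals))

print(gini([25, 25, 25, 25]))  # 0.0  -- everyone gets the same share
print(gini([0, 0, 0, 100]))    # 0.75 -- one stakeholder captures everything
```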
Using This Toolkit
This appendix is designed as a reference, not a tutorial. For learning to write this code from scratch, work through the exercises in the relevant chapters. For building on this code for your capstone projects, start with the working implementations here and extend them.
Suggested extension projects:
1. Combine the BiasAuditor and FairnessCalculator into a comprehensive algorithmic audit pipeline
2. Add the ModelCard and DataLineageTracker to create an end-to-end documentation system
3. Extend the GovernanceSimulator with your own governance model
4. Build a dashboard that combines the CarbonEstimator with the DataQualityAuditor to report on both data quality and environmental cost
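For the GovernanceSimulator extension, a new governance model is just another method that scores each stakeholder and returns a `GovernanceOutcome`. The sketch below shows only the scoring logic for a hypothetical hybrid model (cooperative benefit split plus a regulatory privacy floor); the `Member` class, weights, and names are illustrative stand-ins, not from the chapters:

```python
from dataclasses import dataclass

@dataclass
class Member:  # pared-down stand-in for the Stakeholder dataclass
    name: str
    data_contribution: int   # 0-100
    privacy_preference: int  # 0-100

def hybrid_scores(community: list) -> dict:
    """Hypothetical hybrid: cooperative benefit split + regulatory privacy floor."""
    n = len(community)
    total_data = sum(m.data_contribution for m in community) or 1
    scores = {}
    for m in community:
        # Half the benefit pool split equally, half proportional to data contributed
        benefit = (100 / n) * 0.5 + (m.data_contribution / total_data) * 50
        # Regulatory floor of 0.6, raised by privacy preference, capped at 1.0
        privacy = min(1.0, max(0.6, 0.5 + m.privacy_preference / 100 * 0.4))
        scores[m.name] = (round(benefit, 1), round(privacy, 2))
    return scores

print(hybrid_scores([Member("A", 80, 20), Member("B", 20, 90)]))
# -> {'A': (65.0, 0.6), 'B': (35.0, 0.86)}
```

To plug this into the simulator, wrap the same loop in a method that also computes voice scores and the below-threshold list, then append it to `run_all_models`.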
All code in this appendix is released for educational use. Adapt it, extend it, and use it as a foundation for your own data ethics practice.