Capstone Project 02: Build a Regulatory Reporting Pipeline
Practitioner Role: Regulatory Technology Lead, Cornerstone Financial Group, reporting to the Head of Regulatory Affairs
Regulatory Obligation: MiFIR Article 26 transaction reporting; submission via Approved Reporting Mechanism (ARM) to the FCA
Source Systems: Three — equity trading (Oracle DB), fixed income (REST API/JSON), derivatives (CSV flat files)
Task: Design and implement a regulatory reporting pipeline that extracts, transforms, validates, and outputs MiFIR-compliant transaction reports
Scenario Context
Cornerstone Financial Group's MiFIR transaction reporting has been a persistent headache. The equity system has been producing raw transaction records since MiFID II's implementation in January 2018, but they have never been automatically transformed into RTS 22 format. The fixed income desk switched to a new REST API-based system eighteen months ago, and the derivatives system has used the same flat-file format for six years. Three systems, three data models, three sets of data quality problems, and a single daily submission deadline.
The Head of Regulatory Affairs, Wei Zhang, has been living with manual reconciliation for too long. Two weeks ago, the FCA's Market Oversight team sent a data quality letter noting inconsistencies in Cornerstone's LEI population rates across report types. Wei has tasked the RegTech lead with building a proper pipeline by end of quarter. The clock is running.
Part A: Regulatory Requirements
A.1 MiFIR Article 26 — Core Requirements
Article 26 of the Markets in Financial Instruments Regulation (MiFIR, Regulation (EU) No 600/2014, as retained and amended in UK law) requires investment firms to report complete and accurate details of transactions in financial instruments to their competent authority as quickly as possible, and no later than the close of the following working day.
The report must cover any transaction in a financial instrument admitted to trading or traded on a trading venue, or for which a request for admission to trading has been made, where the firm is executing on its own account, executing orders on behalf of clients, or transmitting orders for execution elsewhere.
Key fields under RTS 22 (Commission Delegated Regulation (EU) 2017/590, as retained in UK law) cover:
- The transaction itself (identification, timestamp, venue, instrument)
- The parties (buyer and seller, identified by LEI where they are legal entities)
- The financial terms (quantity, price, currency, net amount)
- The report metadata (status — new, cancel, or amendment)
The simplified field subset used in this capstone is drawn from the full Annex 1 field list and is sufficient to demonstrate all key data engineering challenges.
A.2 Data Quality Requirements
MiFIR reporting is subject to three dimensions of quality, all of which carry regulatory consequence:
Completeness: All required fields must be populated. Conditional fields (buyer LEI, seller LEI) must be populated whenever the condition is met — a legal entity counterparty. The FCA's data quality framework flags reports where LEI population rates fall below expected levels, which is precisely the issue that triggered Wei's letter from Market Oversight.
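The population rate that Market Oversight monitors can be tracked internally before submission. A minimal sketch, assuming dict records keyed by the buyer_id/seller_id names used in the canonical model in Part C, and that every slot should carry an LEI (retail counterparties would need excluding upstream):

```python
def lei_population_rate(records: list[dict]) -> float:
    """Proportion of buyer/seller slots populated with an LEI.

    Each record contributes two slots (buyer, seller); a slot counts as
    populated when a non-empty LEI string is present.
    """
    if not records:
        return 0.0
    populated = sum(
        1
        for record in records
        for side in ("buyer_id", "seller_id")
        if record.get(side)  # non-empty string only
    )
    return populated / (2 * len(records))

sample = [
    {"buyer_id": "213800MBWEIJDM5CU638", "seller_id": None},
    {"buyer_id": "XKZZ2JZF41MRHTR1V493", "seller_id": "213800MBWEIJDM5CU638"},
]
lei_population_rate(sample)  # 3 of 4 slots populated -> 0.75
```

A per-source, per-report-type breakdown of exactly this number is what a data quality letter tends to reference.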
Accuracy: Field values must conform to the specified formats and represent the actual transaction. An ISIN that fails the check digit validation is not merely a formatting error — it may indicate that the wrong instrument has been reported. A net amount that is materially inconsistent with price multiplied by quantity suggests a data mapping error in the transformation layer.
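The check digit is computable: ISO 6166 expands each letter to its two-digit value (A=10 ... Z=35) and applies the Luhn formula to the resulting digit string. A sketch of full check-digit validation, which the Part C validator deliberately leaves out in favour of structural checks:

```python
import re

def isin_check_digit_valid(isin: str) -> bool:
    """ISO 6166 check-digit validation for a 12-character ISIN."""
    if not re.fullmatch(r"[A-Z]{2}[A-Z0-9]{9}[0-9]", isin.upper()):
        return False
    # Expand each character to its base-36 value: '0'-'9' -> 0-9, 'A'-'Z' -> 10-35
    expanded = "".join(str(int(ch, 36)) for ch in isin.upper())
    total = 0
    # Luhn: from the right, double every second digit, subtracting 9
    # whenever the doubled value exceeds 9
    for i, digit in enumerate(int(ch) for ch in reversed(expanded)):
        if i % 2 == 1:
            digit *= 2
            if digit > 9:
                digit -= 9
        total += digit
    return total % 10 == 0

isin_check_digit_valid("US0378331005")  # True  -- widely cited valid example
isin_check_digit_valid("US0378331004")  # False -- corrupted check digit
```

A transposed or mistyped digit that still matches the structural regex is exactly the class of error this catches.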
Timeliness: Reports must be submitted by close of business on the day following execution (T+1). A firm that misses the submission window is in breach even if the data is accurate. The pipeline must therefore complete its extract, transform, validate, and load cycle with sufficient margin before the ARM's submission cut-off.
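The T+1 window can be computed mechanically; a minimal sketch, assuming weekends are the only non-working days (a production deadline calendar would also need UK public holidays):

```python
from datetime import date, timedelta

def t_plus_one_deadline(execution_date: date) -> date:
    """Next working day after execution; weekends only, no holiday calendar."""
    deadline = execution_date + timedelta(days=1)
    while deadline.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        deadline += timedelta(days=1)
    return deadline

t_plus_one_deadline(date(2024, 11, 14))  # Thu -> Fri 2024-11-15
t_plus_one_deadline(date(2024, 11, 15))  # Fri -> Mon 2024-11-18
```

The pipeline's internal cut-off should sit well inside this date, leaving room for quarantine review and the ARM's own processing window.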
A.3 Consequences of Late, Incomplete, or Inaccurate Reports
FCA regulatory action: The FCA can impose unlimited financial penalties for persistent transaction reporting failures. Systemic data quality failures — not isolated errors but structural problems in the reporting pipeline — are treated as evidence of inadequate governance.
Market manipulation detection failure: MiFIR transaction reports are the FCA's primary tool for detecting market abuse. Reports that are late, inaccurate, or incomplete undermine the regulator's ability to perform market surveillance. Firms are not merely filing administrative returns — they are contributing to the integrity of UK markets.
Reputational and commercial consequences: A data quality letter from FCA Market Oversight (as Cornerstone has received) is an early warning signal. Firms that do not address data quality issues promptly face escalation to formal supervisory action, which becomes public.
ARM penalties: The Approved Reporting Mechanism through which Cornerstone submits may impose rejection fees or restrict access for persistent data quality failures, creating an operational risk alongside the regulatory one.
Part B: Pipeline Architecture Design
B.1 Four Pipeline Stages
Stage 1 — Extract: Retrieve raw transaction data from all three source systems for the reporting day. The equity system is accessed by direct database query (Oracle JDBC connection); the fixed income system is called via REST API with authentication; the derivatives system produces a flat file that is deposited in a shared location by end of trading. The extract stage normalises nothing — it retrieves data in native format and passes it downstream.
Stage 2 — Transform: Convert each source system's native format into the canonical MiFIRTransaction data model. This is the most complex stage: each source system uses different field names, different date/time formats, different representations of instrument type, and different approaches to counterparty identification. The transform stage also derives fields where possible — for example, calculating net_amount from quantity and price where the source system does not directly provide it.
Stage 3 — Validate: Apply regulatory validation rules to every transformed record. Validation is not the same as transformation — it does not fix errors, it identifies them. Every validation failure is recorded as a ValidationError with a field reference, an error code, a message, and the transaction identifier. Valid records proceed to output; invalid records are quarantined and routed to the data quality workflow.
Stage 4 — Load/Output: Generate the submission file in the format required by Cornerstone's ARM (CSV with specific column ordering and header conventions). Generate the data quality report for the compliance and operations teams. Log the pipeline run result for audit purposes.
B.2 Data Flow Diagram
MIFIR REPORTING PIPELINE — DATA FLOW
=======================================================

 SOURCE SYSTEMS                 EXTRACT STAGE
 ------------------             ----------------------
 [Equity Trading                from_equity_system()
  Oracle DB]     --raw-dict-->    maps Oracle rows to
  200 fields                      list[dict]
  ~500 txns/day                        |
                                       v
 [Fixed Income   --raw-json-->  from_fixed_income_api()
  REST API                        maps JSON to
  JSON format]                    list[dict]
  ~150 txns/day                        |
                                       v
 [Derivatives    --raw-csv--->  from_derivatives_csv()
  Flat File                       maps CSV lines to
  CSV format]                     list[dict]
  ~200 txns/day                        |
                                       v
                                TRANSFORM STAGE
                                ----------------------
                                Canonical MiFIRTransaction
                                objects created from each
                                source's dict representation
                                Field mapping applied
                                Derived fields calculated
                                Type coercions performed
                                       |
                                       v
                                VALIDATE STAGE
                                ----------------------
                                MiFIRTransactionValidator
                                validates each record
                                ValidationErrors collected
                                       |
                        _______________|_______________
                       |                               |
                       v                               v
               [VALID RECORDS]                 [INVALID RECORDS]
               submission_ready                quarantine queue
                       |                               |
                       v                               v
            LOAD/OUTPUT STAGE             DATA QUALITY WORKFLOW
            ----------------------        ----------------------
            generate_submission_file()    generate_quality_report()
                       |                  Compliance team review
                       v                  Manual correction or
            ARM submission CSV            escalation to source
                       |                  system owners
                       v
            ARM submission
            (T+1 deadline)
=======================================================
B.3 Key Design Choices
Canonical data model as the centrepiece: Rather than transforming each source system's format directly into the output format, the pipeline routes everything through a canonical MiFIRTransaction object. This means adding a fourth source system in the future requires only a new extractor method — the validator and output generator require no changes.
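To make that concrete, here is a hypothetical fourth extractor for an FX source, sketched against a trimmed stand-in for the canonical model (the field names fxDealRef and fxIsin are invented for illustration):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class CanonicalRecord:
    """Trimmed stand-in for the MiFIRTransaction model in Part C."""
    transaction_id: str
    instrument_id: str
    source_system: Optional[str] = None

def from_fx_system(raw_rows: list[dict]) -> list[CanonicalRecord]:
    """Hypothetical fourth extractor: map FX-system field names
    (invented here) onto the canonical model."""
    return [
        CanonicalRecord(
            transaction_id=str(row.get("fxDealRef", "")),
            instrument_id=str(row.get("fxIsin", "")),
            source_system="FX",
        )
        for row in raw_rows
    ]
```

Because the validator and output generator consume only the canonical type, neither needs to know the new source exists.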
Validate, don't fix: The pipeline quarantines invalid records rather than attempting automated correction. Automated correction of regulatory data is a governance risk — it may silently introduce errors. Human review of data quality failures ensures that corrections are deliberate and documented.
Fail loudly on schema errors, fail softly on validation errors: If a source system produces data in an unexpected format (e.g., the equity system adds a new column or changes a field name), the pipeline should raise an exception and halt rather than silently mis-mapping fields. By contrast, records that fail validation rules should not halt the pipeline — valid records from the same run should still be submitted.
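The "fail loudly" half can be implemented as an explicit schema gate run before field mapping; a sketch, with the key set taken from the equity mapping in Part C and the exception type chosen for this example:

```python
EXPECTED_EQUITY_KEYS = {
    "EQ_TXN_ID", "EQ_EXEC_DT", "EQ_VENUE_CD", "EQ_ISIN", "EQ_QTY",
    "EQ_PRC", "EQ_CCY", "EQ_NET_AMT", "EQ_BUYER_LEI", "EQ_SELLER_LEI",
    "EQ_RPT_STS",
}

class SchemaError(RuntimeError):
    """Raised when a source system's layout has drifted; halts the pipeline."""

def check_equity_schema(rows: list[dict]) -> None:
    """Raise SchemaError on the first row whose keys deviate from the
    expected equity layout; return None when all rows conform."""
    for i, row in enumerate(rows):
        missing = EXPECTED_EQUITY_KEYS - row.keys()
        unexpected = row.keys() - EXPECTED_EQUITY_KEYS
        if missing or unexpected:
            raise SchemaError(
                f"Equity row {i}: missing={sorted(missing)} "
                f"unexpected={sorted(unexpected)}"
            )
```

Running this ahead of from_equity_system() turns silent mis-mapping into an immediate, investigable halt, while per-record validation failures downstream still leave the rest of the batch submittable.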
Part C: Python Implementation
"""
mifir_pipeline.py — MiFIR Article 26 Transaction Reporting Pipeline
Cornerstone Financial Group — Regulatory Technology
This module implements the complete Extract-Transform-Validate-Output pipeline
for MiFIR transaction reports. It handles three source system formats and
produces an ARM-ready submission file.
Regulatory basis:
- MiFIR Article 26 (as retained in UK law)
- RTS 22 Annex 1 (simplified 12-field subset)
- FCA Transaction Reporting User Pack (TRUP)
"""
import csv
import io
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
# ---------------------------------------------------------------------------
# Enumerations
# ---------------------------------------------------------------------------
class InstrumentType(Enum):
EQUITY = "EQTY"
BOND = "BOND"
DERIVATIVE = "DERIV"
class ReportStatus(Enum):
NEW = "NEW"
CANCEL = "CANC"
AMEND = "AMND"
# ---------------------------------------------------------------------------
# Data Models
# ---------------------------------------------------------------------------
@dataclass
class MiFIRTransaction:
"""
Canonical MiFIR transaction report record.
Field definitions follow RTS 22 Annex 1 (simplified 12-field subset).
All fields must be populated except buyer_id and seller_id, which are
conditional on the counterparty being a legal entity.
"""
transaction_id: str
execution_timestamp: datetime
trading_venue: str # 4-character MIC code (ISO 10383)
instrument_id: str # ISIN (ISO 6166) — 12 characters
instrument_type: InstrumentType
quantity: float
price: float
price_currency: str # ISO 4217 — 3 characters
net_amount: float
report_status: ReportStatus
buyer_id: Optional[str] = None # LEI — 20 characters alphanumeric
seller_id: Optional[str] = None # LEI — 20 characters alphanumeric
source_system: Optional[str] = None # Internal tracking — not submitted
@dataclass
class ValidationError:
"""Records a single validation failure for a transaction field."""
field: str
error_code: str
message: str
transaction_id: str
# ---------------------------------------------------------------------------
# Validator
# ---------------------------------------------------------------------------
# ISO 4217 active currency codes (abbreviated subset for this implementation)
VALID_CURRENCIES = {
"GBP", "USD", "EUR", "JPY", "CHF", "AUD", "CAD", "HKD", "SEK",
"NOK", "DKK", "NZD", "SGD", "ZAR", "MXN", "BRL", "CNY", "INR",
"KRW", "TRY", "RUB", "PLN", "CZK", "HUF", "ILS", "AED", "SAR",
}
class MiFIRTransactionValidator:
"""
Validates MiFIRTransaction records against RTS 22 regulatory rules.
Validation rules implemented:
- V001: Required fields not null
- V002: ISIN format (2-letter country + 9 alphanumeric + 1 check digit structure)
- V003: LEI format (20 alphanumeric characters)
- V004: MIC code format (exactly 4 uppercase alphabetic characters)
- V005: Currency code valid (ISO 4217 3-letter code)
- V006: Quantity must be positive
- V007: Price must be positive
- V008: Net amount must be within 1% of quantity * price
- V009: Execution timestamp must be timezone-aware
- V010: Transaction ID must not be empty or whitespace-only
"""
NET_AMOUNT_TOLERANCE = 0.01 # 1% tolerance
def validate(self, txn: MiFIRTransaction) -> list[ValidationError]:
"""
Validate a single MiFIRTransaction.
Returns a list of ValidationError objects; an empty list indicates
that the transaction passed all validation rules.
"""
errors: list[ValidationError] = []
# V001 — Required fields not null
required_fields = [
("transaction_id", txn.transaction_id),
("execution_timestamp", txn.execution_timestamp),
("trading_venue", txn.trading_venue),
("instrument_id", txn.instrument_id),
("instrument_type", txn.instrument_type),
("quantity", txn.quantity),
("price", txn.price),
("price_currency", txn.price_currency),
("net_amount", txn.net_amount),
("report_status", txn.report_status),
]
for fname, fval in required_fields:
if fval is None:
errors.append(ValidationError(
field=fname,
error_code="V001",
message=f"Required field '{fname}' is null.",
transaction_id=txn.transaction_id or "UNKNOWN",
))
# V010 — Transaction ID not blank
if txn.transaction_id is not None and not txn.transaction_id.strip():
errors.append(ValidationError(
field="transaction_id",
error_code="V010",
message="transaction_id must not be empty or whitespace.",
transaction_id="[BLANK]",
))
# V002 — ISIN format: 2 alpha + 9 alphanumeric + 1 check digit
# Full Luhn-style check digit validation is omitted here in favour of
# structural validation, which catches the most common mapping errors.
if txn.instrument_id is not None:
isin_pattern = re.compile(r'^[A-Z]{2}[A-Z0-9]{9}[0-9]$')
if not isin_pattern.match(txn.instrument_id):
errors.append(ValidationError(
field="instrument_id",
error_code="V002",
message=(
f"instrument_id '{txn.instrument_id}' does not conform to "
"ISIN format (2 alpha country code + 9 alphanumeric + 1 numeric "
"check digit = 12 characters total)."
),
transaction_id=txn.transaction_id or "UNKNOWN",
))
# V003 — LEI format: 20 alphanumeric characters
lei_pattern = re.compile(r'^[A-Z0-9]{20}$')
for lei_field, lei_value in [("buyer_id", txn.buyer_id), ("seller_id", txn.seller_id)]:
if lei_value is not None:
if not lei_pattern.match(lei_value):
errors.append(ValidationError(
field=lei_field,
error_code="V003",
message=(
f"{lei_field} '{lei_value}' does not conform to LEI format "
"(20 uppercase alphanumeric characters)."
),
transaction_id=txn.transaction_id or "UNKNOWN",
))
# V004 — MIC code: exactly 4 uppercase alphabetic characters
if txn.trading_venue is not None:
mic_pattern = re.compile(r'^[A-Z]{4}$')
if not mic_pattern.match(txn.trading_venue):
errors.append(ValidationError(
field="trading_venue",
error_code="V004",
message=(
f"trading_venue '{txn.trading_venue}' does not conform to "
"MIC code format (4 uppercase alphabetic characters)."
),
transaction_id=txn.transaction_id or "UNKNOWN",
))
# V005 — Currency code valid
        if txn.price_currency is not None:
            # Strict match: a lowercase code would pass a case-insensitive
            # check here but still reach the submission file unconverted.
            if txn.price_currency not in VALID_CURRENCIES:
                errors.append(ValidationError(
                    field="price_currency",
                    error_code="V005",
                    message=(
                        f"price_currency '{txn.price_currency}' is not a recognised "
                        "ISO 4217 active currency code (3 uppercase letters)."
                    ),
                    transaction_id=txn.transaction_id or "UNKNOWN",
                ))
# V006 — Quantity > 0
if txn.quantity is not None and txn.quantity <= 0:
errors.append(ValidationError(
field="quantity",
error_code="V006",
message=f"quantity must be positive; got {txn.quantity}.",
transaction_id=txn.transaction_id or "UNKNOWN",
))
# V007 — Price > 0
if txn.price is not None and txn.price <= 0:
errors.append(ValidationError(
field="price",
error_code="V007",
message=f"price must be positive; got {txn.price}.",
transaction_id=txn.transaction_id or "UNKNOWN",
))
# V008 — Net amount within 1% of quantity * price
if (
txn.quantity is not None and txn.quantity > 0
and txn.price is not None and txn.price > 0
and txn.net_amount is not None
):
expected = txn.quantity * txn.price
if expected != 0:
deviation = abs(txn.net_amount - expected) / expected
if deviation > self.NET_AMOUNT_TOLERANCE:
errors.append(ValidationError(
field="net_amount",
error_code="V008",
message=(
f"net_amount {txn.net_amount:.4f} deviates from "
f"quantity * price ({expected:.4f}) by "
f"{deviation * 100:.2f}%, exceeding 1% tolerance. "
"Check for currency conversion or accrued interest mapping."
),
transaction_id=txn.transaction_id or "UNKNOWN",
))
# V009 — Execution timestamp must be timezone-aware
if txn.execution_timestamp is not None:
if txn.execution_timestamp.tzinfo is None:
errors.append(ValidationError(
field="execution_timestamp",
error_code="V009",
message=(
"execution_timestamp is timezone-naive. RTS 22 requires "
"timestamps to be in UTC (ISO 8601 with Z suffix)."
),
transaction_id=txn.transaction_id or "UNKNOWN",
))
return errors
def validate_batch(self, transactions: list[MiFIRTransaction]) -> dict:
"""
Validate a list of MiFIRTransaction objects.
Returns a summary dictionary:
{
'total': int,
'valid': int,
'invalid': int,
'error_rate': float, # proportion of invalid records
'errors_by_code': dict[str, int], # count by error code
'errors_by_field': dict[str, int],# count by field name
'all_errors': list[ValidationError],
'valid_transactions': list[MiFIRTransaction],
'invalid_transactions': list[MiFIRTransaction],
}
"""
all_errors: list[ValidationError] = []
valid_txns: list[MiFIRTransaction] = []
invalid_txns: list[MiFIRTransaction] = []
for txn in transactions:
txn_errors = self.validate(txn)
all_errors.extend(txn_errors)
if txn_errors:
invalid_txns.append(txn)
else:
valid_txns.append(txn)
errors_by_code: dict[str, int] = {}
errors_by_field: dict[str, int] = {}
for err in all_errors:
errors_by_code[err.error_code] = errors_by_code.get(err.error_code, 0) + 1
errors_by_field[err.field] = errors_by_field.get(err.field, 0) + 1
total = len(transactions)
invalid = len(invalid_txns)
return {
"total": total,
"valid": len(valid_txns),
"invalid": invalid,
"error_rate": invalid / total if total > 0 else 0.0,
"errors_by_code": errors_by_code,
"errors_by_field": errors_by_field,
"all_errors": all_errors,
"valid_transactions": valid_txns,
"invalid_transactions": invalid_txns,
}
# ---------------------------------------------------------------------------
# Source System Extractor
# ---------------------------------------------------------------------------
class SourceSystemExtractor:
"""
Extracts transaction data from three heterogeneous source systems and
transforms each into the canonical MiFIRTransaction format.
Field mapping notes:
EQUITY SYSTEM (Oracle DB, dict keys use Oracle column name conventions):
EQ_TXN_ID → transaction_id
EQ_EXEC_DT → execution_timestamp (format: "YYYY-MM-DD HH:MM:SS", UTC assumed)
EQ_VENUE_CD → trading_venue (already MIC code)
EQ_ISIN → instrument_id
EQ_QTY → quantity
EQ_PRC → price
EQ_CCY → price_currency
EQ_NET_AMT → net_amount
EQ_BUYER_LEI → buyer_id
EQ_SELLER_LEI → seller_id
EQ_RPT_STS → report_status ("N"→NEW, "C"→CANC, "A"→AMND)
(instrument type is always EQUITY for this system)
FIXED INCOME API (JSON, camelCase keys):
transactionId → transaction_id
executionTime → execution_timestamp (ISO 8601 with offset)
mic → trading_venue
isin → instrument_id
nominalQuantity → quantity
cleanPrice → price
currency → price_currency
consideration → net_amount (this is the settlement amount; used as net_amount)
buyerLei → buyer_id
sellerLei → seller_id
amendFlag → derives report_status ("true" → AMEND; else NEW)
(instrument type is always BOND for this system)
DERIVATIVES SYSTEM (CSV, positional columns):
Column 0: txn_ref → transaction_id
Column 1: exec_ts → execution_timestamp (format: "DD/MM/YYYY HH:MM:SS UTC")
Column 2: venue → trading_venue
Column 3: isin → instrument_id
Column 4: qty → quantity
Column 5: strike_price → price (used as the reported price field)
Column 6: ccy → price_currency
Column 7: notional → net_amount
Column 8: buyer → buyer_id (blank if retail/individual)
Column 9: seller → seller_id (blank if retail/individual)
Column 10: status → report_status ("NEW"/"CANC"/"AMND")
(instrument type is always DERIVATIVE for this system)
"""
# ------------------------------------------------------------------
# Equity system
# ------------------------------------------------------------------
def from_equity_system(self, raw_data: list[dict]) -> list[MiFIRTransaction]:
"""Convert equity trading system Oracle DB records to canonical form."""
transactions: list[MiFIRTransaction] = []
status_map = {"N": ReportStatus.NEW, "C": ReportStatus.CANCEL, "A": ReportStatus.AMEND}
for row in raw_data:
try:
# Parse timestamp — equity system stores in UTC without offset marker
ts_str = row.get("EQ_EXEC_DT", "")
try:
ts = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S").replace(
tzinfo=timezone.utc
)
except (ValueError, TypeError):
ts = None # Will fail V009 validation — intentional
# Map status code
status_code = row.get("EQ_RPT_STS", "N")
status = status_map.get(status_code, ReportStatus.NEW)
# LEI — empty string treated as None (no legal entity counterparty)
buyer_lei = row.get("EQ_BUYER_LEI") or None
seller_lei = row.get("EQ_SELLER_LEI") or None
txn = MiFIRTransaction(
transaction_id=str(row.get("EQ_TXN_ID", "")),
execution_timestamp=ts,
trading_venue=str(row.get("EQ_VENUE_CD", "")),
instrument_id=str(row.get("EQ_ISIN", "")),
instrument_type=InstrumentType.EQUITY,
quantity=float(row.get("EQ_QTY", 0)),
price=float(row.get("EQ_PRC", 0)),
price_currency=str(row.get("EQ_CCY", "")),
net_amount=float(row.get("EQ_NET_AMT", 0)),
report_status=status,
buyer_id=buyer_lei,
seller_id=seller_lei,
source_system="EQUITY",
)
transactions.append(txn)
except (KeyError, ValueError, TypeError) as exc:
# Log and skip — do not silently swallow; in production this
# would write to the pipeline error log
print(f"[EQUITY] Skipping row due to extraction error: {exc} | Row: {row}")
return transactions
# ------------------------------------------------------------------
# Fixed income API
# ------------------------------------------------------------------
def from_fixed_income_api(self, json_data: list[dict]) -> list[MiFIRTransaction]:
"""Convert fixed income REST API JSON response to canonical form."""
transactions: list[MiFIRTransaction] = []
for record in json_data:
try:
# Parse ISO 8601 timestamp — may include UTC offset
ts_str = record.get("executionTime", "")
try:
# Handle both 'Z' suffix and explicit offset
ts_str_norm = ts_str.replace("Z", "+00:00")
ts = datetime.fromisoformat(ts_str_norm)
except (ValueError, TypeError):
ts = None
# Derive report_status from amendFlag boolean
amend_flag = record.get("amendFlag", False)
if isinstance(amend_flag, str):
amend_flag = amend_flag.lower() == "true"
status = ReportStatus.AMEND if amend_flag else ReportStatus.NEW
buyer_lei = record.get("buyerLei") or None
seller_lei = record.get("sellerLei") or None
txn = MiFIRTransaction(
transaction_id=str(record.get("transactionId", "")),
execution_timestamp=ts,
trading_venue=str(record.get("mic", "")),
instrument_id=str(record.get("isin", "")),
instrument_type=InstrumentType.BOND,
quantity=float(record.get("nominalQuantity", 0)),
price=float(record.get("cleanPrice", 0)),
price_currency=str(record.get("currency", "")),
net_amount=float(record.get("consideration", 0)),
report_status=status,
buyer_id=buyer_lei,
seller_id=seller_lei,
source_system="FIXED_INCOME",
)
transactions.append(txn)
except (KeyError, ValueError, TypeError) as exc:
print(f"[FIXED_INCOME] Skipping record due to extraction error: {exc} | Record: {record}")
return transactions
# ------------------------------------------------------------------
# Derivatives CSV
# ------------------------------------------------------------------
def from_derivatives_csv(self, csv_lines: list[str]) -> list[MiFIRTransaction]:
"""Convert derivatives flat-file CSV lines to canonical form."""
transactions: list[MiFIRTransaction] = []
status_map = {
"NEW": ReportStatus.NEW,
"CANC": ReportStatus.CANCEL,
"AMND": ReportStatus.AMEND,
}
reader = csv.reader(csv_lines)
# Skip header row
try:
header = next(reader)
except StopIteration:
return transactions
for line_num, row in enumerate(reader, start=2):
if len(row) < 11:
print(f"[DERIVATIVES] Skipping line {line_num}: insufficient columns ({len(row)} found, 11 required)")
continue
try:
# Parse timestamp — derivatives system uses DD/MM/YYYY HH:MM:SS UTC
ts_str = row[1].strip()
try:
ts = datetime.strptime(ts_str, "%d/%m/%Y %H:%M:%S UTC").replace(
tzinfo=timezone.utc
)
except ValueError:
ts = None
status_str = row[10].strip().upper()
status = status_map.get(status_str, ReportStatus.NEW)
buyer_lei = row[8].strip() or None
seller_lei = row[9].strip() or None
txn = MiFIRTransaction(
transaction_id=row[0].strip(),
execution_timestamp=ts,
trading_venue=row[2].strip(),
instrument_id=row[3].strip(),
instrument_type=InstrumentType.DERIVATIVE,
quantity=float(row[4].strip()),
price=float(row[5].strip()),
price_currency=row[6].strip(),
net_amount=float(row[7].strip()),
report_status=status,
buyer_id=buyer_lei,
seller_id=seller_lei,
source_system="DERIVATIVES",
)
transactions.append(txn)
except (IndexError, ValueError, TypeError) as exc:
print(f"[DERIVATIVES] Skipping line {line_num} due to extraction error: {exc}")
return transactions
# ---------------------------------------------------------------------------
# Pipeline Orchestrator
# ---------------------------------------------------------------------------
class ReportingPipelineOrchestrator:
"""
Coordinates the full Extract-Transform-Validate-Output pipeline for
MiFIR Article 26 transaction reports.
Usage:
orchestrator = ReportingPipelineOrchestrator()
result = orchestrator.run(
equity_data=equity_rows,
fi_json_data=fi_records,
deriv_csv=csv_lines,
report_date=datetime(2024, 11, 14, tzinfo=timezone.utc),
)
submission_csv = orchestrator.generate_submission_file(result["submission_ready"])
quality_report = orchestrator.generate_quality_report(result)
"""
def __init__(self):
self.extractor = SourceSystemExtractor()
self.validator = MiFIRTransactionValidator()
def run(
self,
equity_data: list[dict],
fi_json_data: list[dict],
deriv_csv: list[str],
report_date: datetime,
) -> dict:
"""
Run the full pipeline.
Returns:
{
'report_date': datetime,
'transactions': list[MiFIRTransaction], # all extracted records
'validation_errors': list[ValidationError],
'summary': {
'total': int,
'valid': int,
'invalid': int,
'error_rate': float,
'by_source': {
'EQUITY': {'total': int, 'valid': int, 'invalid': int},
'FIXED_INCOME': {'total': int, 'valid': int, 'invalid': int},
'DERIVATIVES': {'total': int, 'valid': int, 'invalid': int},
},
'errors_by_code': dict[str, int],
'errors_by_field': dict[str, int],
},
'submission_ready': list[MiFIRTransaction], # valid records only
}
"""
print(f"[PIPELINE] Starting run for report date: {report_date.date()}")
# -- EXTRACT -------------------------------------------------------
print("[PIPELINE] Stage 1: Extract")
equity_txns = self.extractor.from_equity_system(equity_data)
fi_txns = self.extractor.from_fixed_income_api(fi_json_data)
deriv_txns = self.extractor.from_derivatives_csv(deriv_csv)
print(f" Equity: {len(equity_txns)} records extracted")
print(f" Fixed Income: {len(fi_txns)} records extracted")
print(f" Derivatives: {len(deriv_txns)} records extracted")
all_transactions = equity_txns + fi_txns + deriv_txns
print(f" Total: {len(all_transactions)} records")
# -- VALIDATE ------------------------------------------------------
print("[PIPELINE] Stage 2: Validate")
batch_result = self.validator.validate_batch(all_transactions)
print(f" Valid: {batch_result['valid']} records")
print(f" Invalid: {batch_result['invalid']} records")
print(f" Error rate: {batch_result['error_rate'] * 100:.1f}%")
# -- SUMMARISE BY SOURCE -------------------------------------------
        def source_stats(source_name: str, txns: list[MiFIRTransaction]) -> dict:
            # Count via the source_system tag rather than transaction_id
            # membership: IDs are not guaranteed unique across source
            # systems, and a collision would miscount.
            valid_count = sum(
                1
                for t in batch_result["valid_transactions"]
                if t.source_system == source_name
            )
            return {
                "total": len(txns),
                "valid": valid_count,
                "invalid": len(txns) - valid_count,
            }
summary = {
"total": batch_result["total"],
"valid": batch_result["valid"],
"invalid": batch_result["invalid"],
"error_rate": batch_result["error_rate"],
"by_source": {
"EQUITY": source_stats("EQUITY", equity_txns),
"FIXED_INCOME": source_stats("FIXED_INCOME", fi_txns),
"DERIVATIVES": source_stats("DERIVATIVES", deriv_txns),
},
"errors_by_code": batch_result["errors_by_code"],
"errors_by_field": batch_result["errors_by_field"],
}
return {
"report_date": report_date,
"transactions": all_transactions,
"validation_errors": batch_result["all_errors"],
"summary": summary,
"submission_ready": batch_result["valid_transactions"],
}
def generate_submission_file(self, transactions: list[MiFIRTransaction]) -> str:
"""
Generate CSV output in ARM submission format.
Column order follows the simplified RTS 22 Annex 1 field sequence.
The output string can be written directly to the ARM's file submission endpoint.
"""
output = io.StringIO()
writer = csv.writer(output, lineterminator="\r\n")
# Header row — field names as per ARM specification
writer.writerow([
"TRANSACTION_ID",
"EXECUTION_TIMESTAMP",
"TRADING_VENUE",
"INSTRUMENT_ID",
"INSTRUMENT_TYPE",
"BUYER_ID",
"SELLER_ID",
"QUANTITY",
"PRICE",
"PRICE_CURRENCY",
"NET_AMOUNT",
"REPORT_STATUS",
])
for txn in transactions:
            # Normalise to UTC before formatting: fixed income timestamps may
            # carry a non-UTC offset, and the Z suffix asserts UTC
            ts_str = (
                txn.execution_timestamp.astimezone(timezone.utc)
                .strftime("%Y-%m-%dT%H:%M:%SZ")
                if txn.execution_timestamp
                else ""
            )
writer.writerow([
txn.transaction_id,
ts_str,
txn.trading_venue,
txn.instrument_id,
txn.instrument_type.value if txn.instrument_type else "",
txn.buyer_id or "",
txn.seller_id or "",
f"{txn.quantity:.6f}",
f"{txn.price:.6f}",
txn.price_currency,
f"{txn.net_amount:.2f}",
txn.report_status.value if txn.report_status else "",
])
return output.getvalue()
def generate_quality_report(self, run_result: dict) -> str:
"""
Generate a human-readable data quality report for the compliance and
operations teams.
This report is retained as part of the audit trail for the submission.
"""
s = run_result["summary"]
report_date = run_result["report_date"]
lines = [
"=" * 65,
"MIFIR TRANSACTION REPORTING — DATA QUALITY REPORT",
f"Report Date: {report_date.date()}",
f"Generated: {datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')}",
"=" * 65,
"",
"PIPELINE SUMMARY",
"-" * 40,
f"Total records extracted: {s['total']:>6}",
f"Valid (submission-ready): {s['valid']:>6}",
f"Invalid (quarantined): {s['invalid']:>6}",
f"Overall error rate: {s['error_rate'] * 100:>5.1f}%",
"",
"RESULTS BY SOURCE SYSTEM",
"-" * 40,
]
for source, stats in s["by_source"].items():
error_rate = (
stats["invalid"] / stats["total"] * 100 if stats["total"] > 0 else 0.0
)
lines.append(
f" {source:<20} Total: {stats['total']:>4} "
f"Valid: {stats['valid']:>4} "
f"Invalid: {stats['invalid']:>4} "
f"Error rate: {error_rate:>5.1f}%"
)
if s["errors_by_code"]:
lines += [
"",
"VALIDATION ERRORS BY ERROR CODE",
"-" * 40,
]
code_descriptions = {
"V001": "Required field null",
"V002": "ISIN format invalid",
"V003": "LEI format invalid",
"V004": "MIC code format invalid",
"V005": "Currency code not recognised",
"V006": "Quantity not positive",
"V007": "Price not positive",
"V008": "Net amount deviation > 1%",
"V009": "Timestamp timezone-naive",
"V010": "Transaction ID blank",
}
for code, count in sorted(s["errors_by_code"].items()):
desc = code_descriptions.get(code, "Unknown error")
lines.append(f" {code} ({desc:<35}) {count:>4} occurrences")
if s["errors_by_field"]:
lines += [
"",
"VALIDATION ERRORS BY FIELD",
"-" * 40,
]
for field_name, count in sorted(
s["errors_by_field"].items(), key=lambda x: -x[1]
):
lines.append(f" {field_name:<25} {count:>4} occurrences")
# Recommendation
lines += [
"",
"RECOMMENDED ACTIONS",
"-" * 40,
]
if s["invalid"] == 0:
lines.append(" All records passed validation. Submission file is ready.")
else:
lines.append(
f" {s['invalid']} records are quarantined and require manual review."
)
if "V003" in s["errors_by_code"]:
lines.append(
" Action: LEI validation failures (V003) suggest counterparty data "
"quality issues in source systems. Escalate to Front Office Data team."
)
if "V002" in s["errors_by_code"]:
lines.append(
" Action: ISIN format failures (V002) suggest instrument reference "
"data mapping errors. Escalate to Instrument Reference Data team."
)
if "V008" in s["errors_by_code"]:
lines.append(
" Action: Net amount deviation failures (V008) may indicate "
"currency conversion or accrued interest being included in the "
"consideration field. Review mapping logic for affected source system."
)
lines.append(
" Quarantined records will NOT be included in today's ARM submission."
)
lines.append(
" Manual corrections should be resubmitted as AMND reports once resolved."
)
lines += [
"",
"=" * 65,
"END OF DATA QUALITY REPORT",
"=" * 65,
]
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Sample Data
# ---------------------------------------------------------------------------
# Equity system — Oracle DB rows as list of dicts (2 valid, 1 with data quality issue)
SAMPLE_EQUITY_DATA = [
{
"EQ_TXN_ID": "EQ-2024-001",
"EQ_EXEC_DT": "2024-11-14 09:15:32",
"EQ_VENUE_CD": "XLON",
"EQ_ISIN": "GB0002634946", # Valid ISIN (Barclays, illustrative)
"EQ_QTY": 10000,
"EQ_PRC": 1.8550,
"EQ_CCY": "GBP",
"EQ_NET_AMT": 18550.00,
"EQ_BUYER_LEI": "213800MBWEIJDM5CU638", # Valid LEI format
"EQ_SELLER_LEI": "XKZZ2JZF41MRHTR1V493", # Valid LEI format
"EQ_RPT_STS": "N",
},
{
"EQ_TXN_ID": "EQ-2024-002",
"EQ_EXEC_DT": "2024-11-14 10:44:07",
"EQ_VENUE_CD": "XLON",
"EQ_ISIN": "GB0031348658", # Valid ISIN (Lloyds, illustrative)
"EQ_QTY": 25000,
"EQ_PRC": 0.4420,
"EQ_CCY": "GBP",
"EQ_NET_AMT": 11050.00,
"EQ_BUYER_LEI": "", # Individual counterparty — blank is valid
"EQ_SELLER_LEI": "XKZZ2JZF41MRHTR1V493",
"EQ_RPT_STS": "N",
},
{
"EQ_TXN_ID": "EQ-2024-003",
"EQ_EXEC_DT": "2024-11-14 11:22:45",
"EQ_VENUE_CD": "XLON",
"EQ_ISIN": "INVALID-ISIN", # DATA QUALITY ISSUE — will fail V002
"EQ_QTY": 5000,
"EQ_PRC": 2.1100,
"EQ_CCY": "GBP",
"EQ_NET_AMT": 10550.00,
"EQ_BUYER_LEI": "213800MBWEIJDM5CU638",
"EQ_SELLER_LEI": "",
"EQ_RPT_STS": "N",
},
]
# Fixed income API — JSON records (2 valid, 1 with LEI issue)
SAMPLE_FI_DATA = [
{
"transactionId": "FI-2024-001",
"executionTime": "2024-11-14T09:30:00Z",
"mic": "XLON",
"isin": "GB00B84ZW083", # Valid ISIN (UK Gilt, illustrative)
"nominalQuantity": 500000,
"cleanPrice": 98.75,
"currency": "GBP",
"consideration": 493750.00,
"buyerLei": "213800MBWEIJDM5CU638",
"sellerLei": "XKZZ2JZF41MRHTR1V493",
"amendFlag": False,
},
{
"transactionId": "FI-2024-002",
"executionTime": "2024-11-14T11:05:22Z",
"mic": "XOFF", # Off-venue (SI)
"isin": "GB00B84ZW083",
"nominalQuantity": 1000000,
"cleanPrice": 98.60,
"currency": "GBP",
"consideration": 986000.00,
"buyerLei": "XKZZ2JZF41MRHTR1V493",
"sellerLei": "213800MBWEIJDM5CU638",
"amendFlag": True,
},
{
"transactionId": "FI-2024-003",
"executionTime": "2024-11-14T14:20:00Z",
"mic": "XLON",
"isin": "US912810TD00", # Valid ISIN (US Treasury, illustrative)
"nominalQuantity": 250000,
"cleanPrice": 99.10,
"currency": "USD",
"consideration": 247750.00,
"buyerLei": "SHORTLEI", # DATA QUALITY ISSUE — will fail V003
"sellerLei": "213800MBWEIJDM5CU638",
"amendFlag": False,
},
]
# Derivatives CSV — flat file lines including header (2 valid, 1 with data quality issue)
SAMPLE_DERIV_CSV = [
"txn_ref,exec_ts,venue,isin,qty,strike_price,ccy,notional,buyer,seller,status",
"DV-2024-001,14/11/2024 08:45:00 UTC,XEUR,DE000C21EMZ1,100,105.50,EUR,10550.00,213800MBWEIJDM5CU638,XKZZ2JZF41MRHTR1V493,NEW",
"DV-2024-002,14/11/2024 09:15:00 UTC,XEUR,DE000C21EMZ1,50,104.75,EUR,5237.50,XKZZ2JZF41MRHTR1V493,,NEW",
# Data quality issue: net_amount is materially different from qty * price
"DV-2024-003,14/11/2024 10:30:00 UTC,XEUR,DE000C21EMZ1,200,110.00,EUR,5000.00,,213800MBWEIJDM5CU638,NEW",
]
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
"""
Demonstrates the complete MiFIR reporting pipeline using sample data
from all three source systems.
"""
report_date = datetime(2024, 11, 14, tzinfo=timezone.utc)
print("\n" + "=" * 65)
print("CORNERSTONE FINANCIAL GROUP")
print("MiFIR Article 26 Transaction Reporting Pipeline")
print(f"Report Date: {report_date.date()}")
print("=" * 65 + "\n")
orchestrator = ReportingPipelineOrchestrator()
# Run the pipeline
result = orchestrator.run(
equity_data=SAMPLE_EQUITY_DATA,
fi_json_data=SAMPLE_FI_DATA,
deriv_csv=SAMPLE_DERIV_CSV,
report_date=report_date,
)
# Generate outputs
submission_csv = orchestrator.generate_submission_file(result["submission_ready"])
quality_report = orchestrator.generate_quality_report(result)
print("\n" + quality_report)
    print("\n--- SUBMISSION FILE (header + first 5 records) ---")
    for line in submission_csv.splitlines()[:6]:
        print(line)
    print("\n[PIPELINE] Complete.")
print(f" Submission-ready records: {len(result['submission_ready'])}")
print(f" Quarantined records: {result['summary']['invalid']}")
print(f" Validation errors logged: {len(result['validation_errors'])}")
return result
if __name__ == "__main__":
main()
Part D: Governance Documentation
D.1 Pipeline Architecture Description (For Technical Auditors)
The MiFIR transaction reporting pipeline is a Python-based ETL system that runs daily, triggered by a scheduled task at 18:00 UK time (T+0, the day of execution). The submission file must reach the ARM's ingestion endpoint by 22:00 UK time to meet the T+1 regulatory deadline with adequate margin.
Data sources: Three source systems contribute transaction data. The equity trading system (Oracle DB) is accessed by a read-only service account via JDBC connection; no writes are made to the source database. The fixed income system is accessed via REST API using an API key stored in the secrets manager (not in source code or configuration files). The derivatives system produces a flat file written to a network share; the pipeline reads from a designated input directory and archives processed files to a separate directory.
Canonical model: All source records are transformed into the MiFIRTransaction dataclass before any validation or output processing. This design ensures that validation rules are source-system-agnostic — a change to one source system's format requires changes only in the corresponding extractor method.
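As an illustration of this design, a canonical record of the kind described might be sketched as follows. This is a minimal, assumed shape — field names and types are illustrative, and the production `MiFIRTransaction` dataclass carries the full RTS 22 field set:

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

@dataclass
class CanonicalTransaction:
    """Illustrative canonical record. The production MiFIRTransaction
    dataclass is larger; this sketch shows only the fields referenced
    by the validation rules in D.2."""
    transaction_id: str
    execution_timestamp: datetime   # must be timezone-aware UTC (V009)
    trading_venue: str              # ISO 10383 MIC (V004)
    instrument_id: str              # ISO 6166 ISIN (V002)
    quantity: float                 # must be > 0 (V006)
    price: float                    # must be > 0 (V007)
    price_currency: str             # ISO 4217 code (V005)
    net_amount: float               # within 1% of qty x price (V008)
    buyer_id: Optional[str]         # ISO 17442 LEI; blank for natural persons
    seller_id: Optional[str]
```

Because every extractor emits this one shape, the validator never needs to know whether a record originated as an Oracle row, a JSON payload, or a CSV line.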
Record segregation: Records that fail validation are quarantined — they are not included in the submission file. They are written to a quarantine log (JSON format) for manual review by the data quality team. Quarantined records are tracked to resolution: either corrected and resubmitted as AMND reports or confirmed as not reportable with documented rationale.
Audit trail: Every pipeline run produces three artefacts that are retained for five years: the raw submission file, the data quality report, and the quarantine log. These are written to immutable object storage (versioned S3 bucket) immediately upon generation. The pipeline run itself is logged to a structured log (including record counts, error rates, and submission timestamp) that feeds into the regulatory operations dashboard.
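The run-level structured log entry described above might be assembled as in the following sketch. The keys are illustrative assumptions, not the production dashboard schema; the summary dict shape matches the pipeline's `run_result["summary"]`:

```python
import json
from datetime import datetime, timezone

def build_run_log_entry(summary: dict, submitted_at: datetime) -> str:
    """Serialise one pipeline run as a single structured (JSON) log line.
    Keys are illustrative; the production schema is owned by the
    regulatory operations dashboard."""
    entry = {
        "event": "mifir_pipeline_run",
        "submitted_at": submitted_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "records_extracted": summary["total"],
        "records_valid": summary["valid"],
        "records_quarantined": summary["invalid"],
        "error_rate": round(summary["error_rate"], 4),
    }
    return json.dumps(entry, sort_keys=True)
```

A JSON line per run keeps the log machine-parseable for the dashboard while remaining readable during an audit.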
D.2 Validation Rules Reference (For Compliance Review)
| Rule Code | Field | Rule Description | Regulatory Basis |
|---|---|---|---|
| V001 | All required fields | Required fields must not be null | RTS 22 Annex 1 mandatory field designation |
| V002 | instrument_id | ISIN must conform to ISO 6166 structural format: 2-char alpha country code + 9 alphanumeric chars + 1 numeric check digit | RTS 22 Annex 1, Table 2 Field 2 |
| V003 | buyer_id, seller_id | LEI must be 20 uppercase alphanumeric characters conforming to ISO 17442 | RTS 22 Annex 1, Table 2 Fields 10–11; FCA TRUP guidance on LEI population |
| V004 | trading_venue | MIC code must be 4 uppercase alphabetic characters per ISO 10383 | RTS 22 Annex 1, Table 2 Field 36 |
| V005 | price_currency | Currency code must be a recognised ISO 4217 active currency code | RTS 22 Annex 1, Table 2 |
| V006 | quantity | Quantity must be greater than zero | Commercial logic; zero-quantity reports indicate data mapping error |
| V007 | price | Price must be greater than zero | Commercial logic; zero-price reports indicate data mapping error |
| V008 | net_amount | Net amount must be within 1% of quantity × price | Internal data quality control; material deviations indicate currency conversion mapping or incorrect field selection |
| V009 | execution_timestamp | Timestamp must be UTC with timezone offset | RTS 22 Annex 1 requires UTC; timezone-naive timestamps are rejected by ARM |
| V010 | transaction_id | Transaction ID must not be blank or whitespace | Unique identification requirement; blank IDs indicate extraction failure |
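The structural rules in the table (V002–V004, V008) reduce to small, testable checks. The sketch below covers structure only — it deliberately does not verify the ISIN check digit or the LEI ISO 17442 checksum, which are stricter follow-on checks:

```python
import re

# Structural patterns per the validation rules table.
ISIN_RE = re.compile(r"^[A-Z]{2}[A-Z0-9]{9}[0-9]$")  # V002, ISO 6166
LEI_RE = re.compile(r"^[A-Z0-9]{20}$")               # V003, ISO 17442
MIC_RE = re.compile(r"^[A-Z]{4}$")                   # V004, ISO 10383

def within_tolerance(net_amount: float, quantity: float,
                     price: float, tol: float = 0.01) -> bool:
    """V008: net amount must be within 1% of quantity x price."""
    expected = quantity * price
    return expected != 0 and abs(net_amount - expected) / abs(expected) <= tol
```

Keeping each rule a pure function of the canonical field makes the rules trivially unit-testable, which D.4 relies on.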
D.3 Error Handling and Escalation Procedure
Level 1 — Automated handling: Validation errors that affect individual records. Records are quarantined; submission proceeds with valid records; data quality report is generated and sent to the data quality team inbox automatically.
Level 2 — Data quality team review (same day): The data quality team reviews the quarantine log and quality report by 20:00. For V003 LEI failures: contact the Front Office data team to obtain correct LEIs. For V002 ISIN failures: escalate to Instrument Reference Data. For V008 net amount deviations: review field mapping logic with the pipeline engineering team.
Level 3 — Compliance escalation (within 24 hours): If the error rate exceeds 10% for any source system, or if more than 20 records are quarantined in a single run, the Head of Regulatory Affairs is notified. The team must assess whether the errors indicate a systemic issue that requires reporting to the FCA.
Level 4 — Regulatory notification: If Cornerstone determines that a material number of transactions were not reported on time, or were reported inaccurately, the Regulatory Affairs team assesses whether voluntary disclosure to the FCA is appropriate. The firm's legal obligation to report transaction reporting failures is documented in the AML and Market Conduct Policy.
Pipeline failure (run does not complete by 21:00): The engineering on-call is paged and the Head of Regulatory Affairs is notified. A manual submission process (direct file upload to the ARM portal) is initiated. If the deadline cannot be met, the FCA is notified by phone before close of business on T+1.
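The Level 3 thresholds above can be expressed as a small predicate over the run summary, so the escalation decision is itself testable. This is a sketch using the thresholds from the procedure (10% per source system, 20 quarantined records); the notification mechanism is out of scope here:

```python
def requires_compliance_escalation(summary: dict,
                                   max_source_error_rate: float = 0.10,
                                   max_quarantined: int = 20) -> bool:
    """Level 3 check: escalate to the Head of Regulatory Affairs if any
    source system's error rate exceeds 10%, or if more than 20 records
    are quarantined in a single run."""
    if summary["invalid"] > max_quarantined:
        return True
    for stats in summary["by_source"].values():
        if stats["total"] > 0 and stats["invalid"] / stats["total"] > max_source_error_rate:
            return True
    return False
```

Building the thresholds into code (rather than leaving them in the procedure document alone) means a threshold breach cannot be missed by a tired reviewer at 20:00.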
D.4 Testing Approach for Regulatory Changes
When RTS 22 is updated (for example, following FCA consultation on field changes), the following testing sequence applies:
- Rule mapping review: The Regulatory Affairs team maps each changed field or validation rule to the corresponding pipeline component (extractor, validator, output generator).
- Unit tests updated: Each validation rule has a corresponding unit test covering both valid and invalid inputs. New or changed rules require new test coverage before deployment.
- Sample data testing: The pipeline is run against a synthetic test dataset that includes all known edge cases (including the deliberate data quality issues in the sample data above). Expected outputs are pre-calculated.
- ARM acceptance testing: Before the production go-live of any changed field, the updated submission file is submitted to the ARM's test environment for format validation.
- Parallel run: For one week following deployment, the new pipeline runs in parallel with the previous version. Output differences are reviewed and reconciled before the previous version is decommissioned.
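A unit test in the style described might look like the following. The function names are illustrative stand-ins — the real tests would target the `MiFIRTransactionValidator` methods directly — but the shape (one test per rule, covering both valid and invalid inputs) is the point:

```python
# Illustrative pytest-style tests for rule V006 (quantity must be > 0).
# check_v006 is a stand-in; production tests call the validator itself.

def check_v006(quantity: float) -> bool:
    """Stand-alone stand-in for the validator's V006 rule."""
    return quantity > 0

def test_v006_accepts_positive_quantity():
    assert check_v006(10000)

def test_v006_rejects_zero_and_negative_quantity():
    assert not check_v006(0)
    assert not check_v006(-500)
```

Because each of the ten rules has this pair of tests, a regulatory field change fails loudly in CI before it can reach the ARM test environment.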
Part E: Deliverables and Assessment Rubric
Deliverables
- Working Python pipeline: A complete, runnable implementation of the `MiFIRTransactionValidator`, `SourceSystemExtractor`, and `ReportingPipelineOrchestrator` classes, including the `main()` function, with all methods fully implemented (not stubbed).
- Sample data and test output: Three sets of sample data covering at least two valid records and one deliberate data quality issue per source system. The `main()` function should run against this data and produce both a submission file and a quality report.
- Validation rules documentation: A reference table covering all implemented validation rules, their regulatory basis, and an example of a valid and an invalid value for each rule.
- Pipeline architecture document: A written description of the pipeline architecture suitable for a technical auditor, covering data sources, transformation logic, error handling, and the audit trail.
- Error handling and escalation procedure: A documented procedure describing what happens when validation errors occur, including escalation thresholds and the approach to regulatory notification.
Assessment Rubric
| Criterion | 1 — Insufficient | 2 — Developing | 3 — Competent | 4 — Proficient | 5 — Expert |
|---|---|---|---|---|---|
| Regulatory Accuracy | MiFIR requirements misunderstood or absent; validation rules do not reflect RTS 22 | Core field requirements identified; validation logic incomplete or incorrect for 2+ rules | All 10 validation rules implemented with correct regulatory basis; field formats accurate | Validation logic is correct and complete; edge cases handled (e.g., timezone-naive timestamps, blank LEIs for individuals) | Validation logic is correct, complete, and annotated with specific regulatory references; nuances (e.g., LEI optional for natural persons) explicitly handled |
| Code Correctness | Code does not run; fundamental errors in class structure or method signatures | Code runs but produces incorrect outputs for 2+ validation scenarios; significant bugs | Code runs and produces correct outputs for standard scenarios; minor bugs in edge cases | Code runs correctly for all sample data; edge cases handled; output formats correct | Code is robust, well-structured, Pythonic; no bugs in provided scenarios; defensive coding against unexpected input; type hints throughout |
| Architecture Coherence | No pipeline structure; extraction and validation mixed together | Basic pipeline stages present but transformation logic conflated with validation | Four stages clearly separated; canonical model used; extractor/validator/orchestrator separation maintained | Architecture is extensible (adding a new source system requires only a new extractor method); design choices explicitly justified | Architecture could be handed to an engineering team as a production specification; all design trade-offs acknowledged; production considerations (scheduling, secrets management, logging) addressed |
| Documentation Quality | No governance documentation produced | Basic description of what the code does; no regulatory context | Governance documentation covers architecture, validation rules, and error handling | Documentation is written for its intended audience (technical auditor, compliance reviewer); complete and accurate | Documentation is publication-ready; each section addresses the question its reader will actually ask; regulatory obligations traced throughout |
| Error Handling | No error handling; pipeline crashes on bad input | Try/except blocks present but exceptions silently swallowed; no error logging | Extraction errors logged and skipped; validation errors collected as ValidationError objects; quarantine logic implemented | All error scenarios produce useful diagnostic output; the quality report enables the data quality team to take action without reading the code | Error handling covers all realistic failure modes (schema changes, network errors, timezone edge cases); escalation thresholds built into orchestrator; audit trail complete |
This capstone draws on material from Part 3 (Regulatory Reporting), Part 1 (Data Architecture), and Part 4 (Trading Compliance). Students should review Chapters 13, 14, and 5 before beginning. The pipeline architecture here deliberately leaves several production-readiness gaps — secrets management, database connection pooling, retry logic for API calls — that students are invited to identify and address as extension work.