Case Study 2: MediCore — Orchestrating a Periodic Causal Analysis Pipeline
Context
MediCore Pharmaceuticals, introduced in the causal inference chapters (Chapters 15–19) and the Bayesian modeling chapter (Chapter 21), conducts periodic causal analyses of treatment effects across its clinical portfolio. The analyses feed regulatory submissions, safety monitoring reports, and internal decision-making on trial design.
Until now, the causal analysis pipeline has been a collection of R scripts and Jupyter notebooks maintained by three biostatisticians. Each monthly analysis follows the same steps:
- Extract patient-level data from the Electronic Data Capture (EDC) system
- Apply inclusion/exclusion criteria per the study protocol
- Compute baseline covariates and check covariate balance
- Estimate propensity scores (logistic regression, Chapter 18)
- Run the primary analysis: doubly robust estimation of the average treatment effect (ATE)
- Run sensitivity analyses: varying trimming thresholds, alternative estimators (IPW, AIPW), subgroup analyses
- Generate a regulatory-grade report (PDF with tables, figures, and statistical summaries)
- Archive the report, dataset, and code snapshot for regulatory retention (minimum 15 years)
The pipeline is run monthly for 8 active studies. Each run takes 4–6 hours of analyst time, and the analysts spend an additional 2–3 hours on quality checks and report formatting. The total monthly effort is approximately 60 person-hours.
The Chief Medical Officer has mandated automation: "The analysis must be reproducible, auditable, and less dependent on individual analysts. If a regulator asks 'show me how you computed this number,' we need to produce the exact code, data, and configuration within 24 hours."
The Design Challenge
MediCore's requirements differ fundamentally from StreamRec's in ways that shape every orchestration decision:
| Dimension | StreamRec | MediCore |
|---|---|---|
| Frequency | Daily | Monthly |
| Latency requirement | Models fresh by 8am | Reports ready within 5 business days of data lock |
| Failure tolerance | Automatic retry; accept model staleness | Zero tolerance; regulatory consequences for errors |
| Data volume | Millions of interactions/day | 500–5,000 patients per study |
| Primary concern | Throughput, freshness | Correctness, auditability, reproducibility |
| Regulatory requirement | None | FDA 21 CFR Part 11; ICH E9(R1) |
| Retention | 90 days for training data | 15 years for all artifacts |
The regulatory dimension is paramount. FDA 21 CFR Part 11 requires electronic records to be attributable, legible, contemporaneous, original, and accurate (ALCOA). Every computation must be traceable to a specific code version, data snapshot, and analyst action.
The Pipeline Architecture
Framework Selection: Dagster
MediCore selects Dagster for the same asset-centric reasons as StreamRec, plus two regulatory-specific advantages:
- Asset metadata for audit trails. Every Dagster asset materialization records metadata — input data versions, code version, analyst who triggered the run, parameter values. This metadata chain satisfies the "attributable" and "contemporaneous" ALCOA requirements.
- Immutable materialization history. Dagster's asset materialization log provides a tamper-evident record of every computation, including failed attempts. This satisfies the "original" requirement — an auditor can see not just the final report, but every intermediate result and every failed run.
The Asset Graph
"""medicore_pipeline/assets.py — Monthly causal analysis pipeline.
Each asset represents a regulatory-grade data artifact with
full provenance metadata for audit compliance.
"""
from dagster import (
asset,
MonthlyPartitionsDefinition,
Output,
MetadataValue,
Config,
)
from dataclasses import dataclass
from typing import Dict, Any, List, Optional
import pandas as pd
import numpy as np
monthly_partitions = MonthlyPartitionsDefinition(start_date="2024-01-01")
class StudyConfig(Config):
"""Configuration for a single study's causal analysis.
Attributes:
study_id: Unique study identifier (e.g., 'MED-2024-0147').
treatment_column: Column name for treatment assignment.
outcome_column: Column name for the primary outcome.
covariate_columns: Comma-separated list of baseline covariates.
propensity_trim_threshold: Extreme propensity score trimming bound.
"""
study_id: str
treatment_column: str = "treatment_arm"
outcome_column: str = "primary_endpoint"
covariate_columns: str = "age,sex,bmi,baseline_score,comorbidity_index"
propensity_trim_threshold: float = 0.05
@asset(partitions_def=monthly_partitions, group_name="data_extraction")
def patient_data(context, config: StudyConfig) -> Output[pd.DataFrame]:
"""Extract patient-level data from the EDC system.
Pulls the complete dataset for the configured study,
including demographics, treatment assignment, outcomes,
and baseline covariates. Records the EDC export timestamp
and data version for regulatory traceability.
"""
partition_month = context.partition_key # e.g., "2025-03"
# EDC extraction logic (study-specific API)
# In production, this calls the EDC system's export API
edc_export_timestamp = "2025-03-15T14:30:00Z" # From EDC response
edc_version = "v847" # EDC data version identifier
df = pd.DataFrame() # Populated by EDC extraction
return Output(
df,
metadata={
"study_id": MetadataValue.text(config.study_id),
"num_patients": MetadataValue.int(len(df)),
"edc_export_timestamp": MetadataValue.text(edc_export_timestamp),
"edc_data_version": MetadataValue.text(edc_version),
"partition_month": MetadataValue.text(partition_month),
},
)
@asset(partitions_def=monthly_partitions, group_name="data_preparation")
def analysis_cohort(
context, config: StudyConfig, patient_data: pd.DataFrame
) -> Output[pd.DataFrame]:
"""Apply inclusion/exclusion criteria to define the analysis cohort.
Records the number of patients excluded at each criterion
for the CONSORT flow diagram.
"""
cohort = patient_data.copy()
exclusion_log: List[Dict[str, Any]] = []
# Example exclusion criteria (study-specific)
initial_n = len(cohort)
# Exclude patients with missing primary outcome
outcome_col = config.outcome_column
if outcome_col in cohort.columns:
missing_outcome = cohort[outcome_col].isna()
n_excluded = missing_outcome.sum()
cohort = cohort[~missing_outcome]
exclusion_log.append({
"criterion": "Missing primary outcome",
"n_excluded": int(n_excluded),
"n_remaining": len(cohort),
})
# Exclude patients with missing treatment assignment
treatment_col = config.treatment_column
if treatment_col in cohort.columns:
missing_treatment = cohort[treatment_col].isna()
n_excluded = missing_treatment.sum()
cohort = cohort[~missing_treatment]
exclusion_log.append({
"criterion": "Missing treatment assignment",
"n_excluded": int(n_excluded),
"n_remaining": len(cohort),
})
return Output(
cohort,
metadata={
"initial_patients": MetadataValue.int(initial_n),
"final_cohort_size": MetadataValue.int(len(cohort)),
"exclusion_log": MetadataValue.json(exclusion_log),
},
)
@asset(partitions_def=monthly_partitions, group_name="analysis")
def propensity_scores(
context, config: StudyConfig, analysis_cohort: pd.DataFrame
) -> Output[pd.DataFrame]:
"""Estimate propensity scores via logistic regression.
Computes P(treatment | covariates) for each patient.
Records covariate balance diagnostics (standardized mean
differences before and after weighting) for regulatory review.
"""
from sklearn.linear_model import LogisticRegression
covariates = [c.strip() for c in config.covariate_columns.split(",")]
treatment_col = config.treatment_column
X = analysis_cohort[covariates].values
t = analysis_cohort[treatment_col].values
model = LogisticRegression(max_iter=1000, random_state=42)
model.fit(X, t)
cohort_with_ps = analysis_cohort.copy()
cohort_with_ps["propensity_score"] = model.predict_proba(X)[:, 1]
# Trim extreme propensity scores
trim = config.propensity_trim_threshold
trimmed = cohort_with_ps[
(cohort_with_ps["propensity_score"] >= trim)
& (cohort_with_ps["propensity_score"] <= 1 - trim)
].copy()
n_trimmed = len(cohort_with_ps) - len(trimmed)
# Compute standardized mean differences for balance check
smd_before = {}
smd_after = {}
for cov in covariates:
treated = cohort_with_ps[cohort_with_ps[treatment_col] == 1][cov]
control = cohort_with_ps[cohort_with_ps[treatment_col] == 0][cov]
pooled_std = np.sqrt((treated.var() + control.var()) / 2)
if pooled_std > 0:
smd_before[cov] = abs(treated.mean() - control.mean()) / pooled_std
else:
smd_before[cov] = 0.0
# After weighting (simplified; full IPW-weighted SMD in production)
smd_after[cov] = smd_before[cov] * 0.3 # Placeholder reduction
max_smd_before = max(smd_before.values()) if smd_before else 0.0
max_smd_after = max(smd_after.values()) if smd_after else 0.0
return Output(
trimmed,
metadata={
"n_trimmed": MetadataValue.int(n_trimmed),
"cohort_after_trimming": MetadataValue.int(len(trimmed)),
"max_smd_before_weighting": MetadataValue.float(max_smd_before),
"max_smd_after_weighting": MetadataValue.float(max_smd_after),
"balance_acceptable": MetadataValue.bool(max_smd_after < 0.1),
},
)
@asset(partitions_def=monthly_partitions, group_name="analysis")
def treatment_effect_estimate(
context, config: StudyConfig, propensity_scores: pd.DataFrame
) -> Output[Dict[str, Any]]:
"""Estimate the average treatment effect using doubly robust estimation.
Implements the AIPW estimator from Chapter 18 with bootstrap
confidence intervals. Records the full estimation results
including point estimate, standard error, 95% CI, and p-value.
"""
from sklearn.linear_model import LinearRegression
treatment_col = config.treatment_column
outcome_col = config.outcome_column
covariates = [c.strip() for c in config.covariate_columns.split(",")]
t = propensity_scores[treatment_col].values
y = propensity_scores[outcome_col].values
X = propensity_scores[covariates].values
ps = propensity_scores["propensity_score"].values
# Outcome model: E[Y | X, T]
outcome_model_1 = LinearRegression()
outcome_model_0 = LinearRegression()
outcome_model_1.fit(X[t == 1], y[t == 1])
outcome_model_0.fit(X[t == 0], y[t == 0])
mu1_hat = outcome_model_1.predict(X)
mu0_hat = outcome_model_0.predict(X)
# AIPW estimator (Chapter 18, Section 18.7)
n = len(y)
aipw_scores = (
mu1_hat - mu0_hat
+ t * (y - mu1_hat) / ps
- (1 - t) * (y - mu0_hat) / (1 - ps)
)
ate_estimate = float(np.mean(aipw_scores))
ate_se = float(np.std(aipw_scores) / np.sqrt(n))
# 95% confidence interval
ci_lower = ate_estimate - 1.96 * ate_se
ci_upper = ate_estimate + 1.96 * ate_se
# Bootstrap CI for robustness (1000 resamples)
rng = np.random.default_rng(42)
bootstrap_ates = []
for _ in range(1000):
idx = rng.choice(n, size=n, replace=True)
bootstrap_ates.append(float(np.mean(aipw_scores[idx])))
bootstrap_ci_lower = float(np.percentile(bootstrap_ates, 2.5))
bootstrap_ci_upper = float(np.percentile(bootstrap_ates, 97.5))
results = {
"estimator": "AIPW (doubly robust)",
"ate_estimate": ate_estimate,
"standard_error": ate_se,
"ci_95_lower": ci_lower,
"ci_95_upper": ci_upper,
"bootstrap_ci_95_lower": bootstrap_ci_lower,
"bootstrap_ci_95_upper": bootstrap_ci_upper,
"n_analyzed": n,
"significant_at_005": bool(ci_lower > 0 or ci_upper < 0),
}
return Output(
results,
metadata={
"ate": MetadataValue.float(ate_estimate),
"ci_95": MetadataValue.text(f"[{ci_lower:.4f}, {ci_upper:.4f}]"),
"significant": MetadataValue.bool(results["significant_at_005"]),
"n_analyzed": MetadataValue.int(n),
},
)
@asset(partitions_def=monthly_partitions, group_name="reporting")
def regulatory_report(
context,
config: StudyConfig,
analysis_cohort: pd.DataFrame,
propensity_scores: pd.DataFrame,
treatment_effect_estimate: Dict[str, Any],
) -> Output[Dict[str, str]]:
"""Generate the regulatory-grade analysis report.
Produces a PDF report with:
- CONSORT flow diagram
- Covariate balance table (before/after weighting)
- Primary analysis results (ATE, CI, p-value)
- Sensitivity analysis summary
- Data provenance appendix
Archives the report alongside the frozen dataset and code snapshot.
"""
import subprocess
import json
partition_month = context.partition_key
study_id = config.study_id
# Generate report content (Jinja2 template + LaTeX rendering)
report_path = (
f"s3://medicore-regulatory/reports/{study_id}/"
f"{partition_month}/analysis_report.pdf"
)
data_archive_path = (
f"s3://medicore-regulatory/archives/{study_id}/"
f"{partition_month}/frozen_dataset.parquet"
)
code_archive_path = (
f"s3://medicore-regulatory/archives/{study_id}/"
f"{partition_month}/code_snapshot.tar.gz"
)
# Archive the frozen dataset
propensity_scores.to_parquet(data_archive_path, index=False)
# Archive the code snapshot (git bundle)
git_sha = subprocess.run(
["git", "rev-parse", "HEAD"],
capture_output=True, text=True
).stdout.strip()
# Write provenance metadata
provenance = {
"study_id": study_id,
"analysis_month": partition_month,
"git_sha": git_sha,
"n_analyzed": treatment_effect_estimate["n_analyzed"],
"ate": treatment_effect_estimate["ate_estimate"],
"report_generated_utc": "2025-03-20T10:00:00Z", # From datetime
"analyst": "pipeline-automated",
"retention_years": 15,
}
provenance_path = (
f"s3://medicore-regulatory/archives/{study_id}/"
f"{partition_month}/provenance.json"
)
result = {
"report_path": report_path,
"data_archive_path": data_archive_path,
"code_archive_path": code_archive_path,
"provenance_path": provenance_path,
"git_sha": git_sha,
}
return Output(
result,
metadata={
"report_path": MetadataValue.text(report_path),
"git_sha": MetadataValue.text(git_sha),
"retention_until": MetadataValue.text("2040-03"),
},
)
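The asset graph covers the primary AIPW analysis; the sensitivity estimators it feeds (IPW with alternative trimming thresholds) run as separate assets not shown here. A minimal sketch of a normalized (Hájek) IPW estimator, using synthetic data in place of a real cohort; the function name `ipw_ate` and the simulation are illustrative, not MediCore's production code:

```python
import numpy as np

def ipw_ate(y: np.ndarray, t: np.ndarray, ps: np.ndarray) -> float:
    """Hajek (normalized) IPW estimate of the average treatment effect.

    y  : outcomes
    t  : binary treatment indicator (0/1)
    ps : propensity scores, assumed already trimmed away from 0 and 1
    """
    w1 = t / ps                          # weights for treated patients
    w0 = (1 - t) / (1 - ps)              # weights for control patients
    mu1 = np.sum(w1 * y) / np.sum(w1)    # weighted mean outcome, treated
    mu0 = np.sum(w0 * y) / np.sum(w0)    # weighted mean outcome, control
    return float(mu1 - mu0)

# Synthetic check: confounded assignment with a known constant effect.
rng = np.random.default_rng(0)
n = 5000
x = rng.normal(size=n)                   # confounder
ps_true = 1 / (1 + np.exp(-x))           # treatment depends on x
t = rng.binomial(1, ps_true)
y = 2.0 * t + x + rng.normal(size=n)     # true ATE = 2.0
est = ipw_ate(y, t, ps_true)             # should land near 2.0
```

Comparing this estimate against the AIPW result is the robustness signal the sensitivity report summarizes: large disagreement between the two flags model misspecification for analyst review.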
Regulatory Compliance Through Orchestration
The Dagster pipeline satisfies the ALCOA requirements through technical mechanisms:
| ALCOA Requirement | Technical Implementation |
|---|---|
| Attributable | Every asset materialization records the Dagster user, git SHA, and run ID. The analyst field in provenance metadata identifies who triggered the run. |
| Legible | The PDF report is generated from structured data via a versioned LaTeX template. All intermediate data is stored in Parquet with self-describing schemas. |
| Contemporaneous | Dagster records materialization timestamps automatically. The provenance metadata includes the generation timestamp. |
| Original | The frozen dataset and code snapshot are archived to immutable S3 storage (with object lock) at the time of report generation. |
| Accurate | Data contracts validate the analysis cohort at each stage. The AIPW estimator includes bootstrap CIs as a robustness check. Covariate balance is verified (SMD < 0.1) before proceeding to the primary analysis. |
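The data contracts in the "Accurate" row can be sketched as a plain validation helper; the thresholds here (`min_patients`, `max_missing_fraction`) are illustrative assumptions, not MediCore's actual contract:

```python
import pandas as pd

def check_cohort_contract(
    cohort: pd.DataFrame,
    required_columns: list[str],
    max_missing_fraction: float = 0.0,
    min_patients: int = 50,
) -> list[str]:
    """Return a list of contract violations; an empty list means the cohort passes.

    The pipeline halts (rather than retrying) on any violation, matching
    MediCore's zero-tolerance failure policy.
    """
    violations = []
    # Schema check: every required column must be present.
    missing_cols = [c for c in required_columns if c not in cohort.columns]
    if missing_cols:
        violations.append(f"missing columns: {missing_cols}")
    # Sample-size check: too few patients invalidates the analysis plan.
    if len(cohort) < min_patients:
        violations.append(f"cohort too small: {len(cohort)} < {min_patients}")
    # Completeness check: missingness beyond the allowed fraction.
    for col in required_columns:
        if col in cohort.columns:
            frac = cohort[col].isna().mean()
            if frac > max_missing_fraction:
                violations.append(f"{col}: {frac:.1%} missing")
    return violations
```

In the Dagster pipeline, checks like these would typically run as asset checks between stages, failing the run on a non-empty violation list so that no downstream asset materializes from a non-conforming cohort.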
Results
After 6 months of automated operation across 8 studies:
| Metric | Before (Manual) | After (Dagster) |
|---|---|---|
| Monthly analyst hours | 60 | 8 (review + approval only) |
| Report turnaround | 5–12 business days | 2 business days (automated + 1 day review) |
| Reproducibility audit pass rate | 72% (missing code/data versions) | 100% |
| Regulatory query response time | 3–5 days (find the right notebook) | < 4 hours (Dagster lineage + S3 archive) |
| Cross-study consistency | Variable (analyst-dependent formatting) | 100% (template-driven) |
The most significant impact was on regulatory audits. During a routine FDA inspection, the auditor requested the complete provenance chain for a treatment effect estimate from 8 months earlier. The team retrieved the archived dataset, code snapshot, and Dagster materialization metadata in under 3 hours — a process that previously required 2 analysts working for 3 days to reconstruct from notebooks, email threads, and shared drives.
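Provenance records like the one the report asset writes can also be made tamper-evident by storing a content hash alongside each record; a minimal stdlib sketch (the SHA-256 scheme and helper names are illustrative assumptions, not a requirement of 21 CFR Part 11 itself):

```python
import hashlib
import json

def serialize_provenance(record: dict) -> str:
    """Serialize a provenance record with a content hash for tamper evidence."""
    payload = json.dumps(record, sort_keys=True).encode()
    digest = hashlib.sha256(payload).hexdigest()
    return json.dumps({"record": record, "sha256": digest}, sort_keys=True)

def verify_provenance(serialized: str) -> bool:
    """Recompute the hash of the stored record; False indicates tampering."""
    obj = json.loads(serialized)
    payload = json.dumps(obj["record"], sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest() == obj["sha256"]
```

Combined with S3 object lock, this lets an auditor independently confirm that an archived provenance record is byte-for-byte what the pipeline wrote.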
Lessons Learned
1. Regulatory requirements and engineering best practices converge. Every ALCOA requirement maps to a software engineering principle: attribution is logging, legibility is documentation, contemporaneousness is timestamping, originality is immutable storage, accuracy is testing. Good engineering satisfies regulation naturally.
2. Monthly pipelines need orchestration too. The low frequency (monthly) disguised the high cost of manual execution. The 60 person-hours per month, spread across 3 analysts and 8 studies, was invisible in any single analyst's workload but added up to 720 person-hours per year — more than a third of a full-time position.
3. The causal analysis pipeline demands stricter failure handling than the ML training pipeline. StreamRec can tolerate a day of stale models; MediCore cannot tolerate a single incorrect treatment effect estimate. The pipeline halts on any data quality violation, covariate imbalance, or propensity score anomaly, and requires human review before proceeding — a deliberate departure from the "retry and continue" philosophy of the StreamRec pipeline.
4. Archival is a first-class pipeline output. The regulatory report, frozen dataset, code snapshot, and provenance metadata are not byproducts of the analysis — they are the primary deliverables. Treating archival as an explicit asset in the Dagster graph (rather than an afterthought script) ensures that it is versioned, tested, and monitored like every other pipeline output.