Case Study 2: MediCore — Orchestrating a Periodic Causal Analysis Pipeline

Context

MediCore Pharmaceuticals, introduced in the causal inference chapters (Chapters 15–19) and the Bayesian modeling chapter (Chapter 21), conducts periodic causal analyses of treatment effects across its clinical portfolio. The analyses feed regulatory submissions, safety monitoring reports, and internal decision-making on trial design.

Until now, the causal analysis pipeline has been a collection of R scripts and Jupyter notebooks maintained by three biostatisticians. Each monthly analysis follows the same steps:

  1. Extract patient-level data from the Electronic Data Capture (EDC) system
  2. Apply inclusion/exclusion criteria per the study protocol
  3. Compute baseline covariates and check covariate balance
  4. Estimate propensity scores (logistic regression, Chapter 18)
  5. Run the primary analysis: doubly robust estimation of the average treatment effect (ATE)
  6. Run sensitivity analyses: varying trimming thresholds, alternative estimators (IPW, AIPW), subgroup analyses
  7. Generate a regulatory-grade report (PDF with tables, figures, and statistical summaries)
  8. Archive the report, dataset, and code snapshot for regulatory retention (minimum 15 years)

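The eight steps are not strictly sequential: the primary analysis (step 5) and the sensitivity analyses (step 6) both depend on the propensity scores and can run in parallel. As a minimal sketch using Python's standard-library graphlib (step names here are illustrative, not MediCore's actual identifiers):

```python
from graphlib import TopologicalSorter

# Illustrative step names; dependencies mirror the eight steps above.
# Each node maps to the set of steps it depends on.
steps = {
    "extract_edc": set(),
    "apply_criteria": {"extract_edc"},
    "covariate_balance": {"apply_criteria"},
    "propensity_scores": {"covariate_balance"},
    "primary_analysis": {"propensity_scores"},
    "sensitivity_analyses": {"propensity_scores"},
    "generate_report": {"primary_analysis", "sensitivity_analyses"},
    "archive": {"generate_report"},
}

# static_order() yields a valid execution order: every step appears
# only after all of its dependencies.
order = list(TopologicalSorter(steps).static_order())
```

This dependency structure is exactly what the Dagster asset graph below encodes, with the added benefit that the orchestrator tracks materializations per step.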
The pipeline is run monthly for 8 active studies. Each run takes 4–6 hours of analyst time, and the analysts spend an additional 2–3 hours on quality checks and report formatting. The total monthly effort is approximately 60 person-hours.

The Chief Medical Officer has mandated automation: "The analysis must be reproducible, auditable, and less dependent on individual analysts. If a regulator asks 'show me how you computed this number,' we need to produce the exact code, data, and configuration within 24 hours."

The Design Challenge

MediCore's requirements differ fundamentally from StreamRec's in ways that shape every orchestration decision:

| Dimension | StreamRec | MediCore |
| --- | --- | --- |
| Frequency | Daily | Monthly |
| Latency requirement | Models fresh by 8am | Reports ready within 5 business days of data lock |
| Failure tolerance | Automatic retry; accept model staleness | Zero tolerance; regulatory consequences for errors |
| Data volume | Millions of interactions/day | 500–5,000 patients per study |
| Primary concern | Throughput, freshness | Correctness, auditability, reproducibility |
| Regulatory requirement | None | FDA 21 CFR Part 11; ICH E9(R1) |
| Retention | 90 days for training data | 15 years for all artifacts |

The regulatory dimension is paramount. FDA 21 CFR Part 11 requires electronic records to be attributable, legible, contemporaneous, original, and accurate (ALCOA). Every computation must be traceable to a specific code version, data snapshot, and analyst action.

The Pipeline Architecture

Framework Selection: Dagster

MediCore selects Dagster for the same asset-centric reasons as StreamRec, plus two regulatory-specific advantages:

  1. Asset metadata for audit trails. Every Dagster asset materialization records metadata — input data versions, code version, analyst who triggered the run, parameter values. This metadata chain satisfies the "attributable" and "contemporaneous" ALCOA requirements.
  2. Immutable materialization history. Dagster's asset materialization log provides a tamper-evident record of every computation, including failed attempts. This satisfies the "original" requirement — an auditor can see not just the final report, but every intermediate result and every failed run.
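
Dagster maintains this history internally, but the underlying idea of a tamper-evident, append-only record can be illustrated with a hash chain, where each entry commits to its predecessor (a conceptual sketch, not Dagster's actual storage format):

```python
import hashlib
import json

def append_entry(log: list, record: dict) -> None:
    """Append a record whose hash commits to the previous entry."""
    prev_hash = log[-1]["entry_hash"] if log else "0" * 64
    payload = json.dumps(record, sort_keys=True)
    entry_hash = hashlib.sha256((prev_hash + payload).encode()).hexdigest()
    log.append({"record": record, "prev_hash": prev_hash, "entry_hash": entry_hash})

def verify_chain(log: list) -> bool:
    """Recompute every hash; any edited entry breaks the chain."""
    prev_hash = "0" * 64
    for entry in log:
        payload = json.dumps(entry["record"], sort_keys=True)
        expected = hashlib.sha256((prev_hash + payload).encode()).hexdigest()
        if entry["prev_hash"] != prev_hash or entry["entry_hash"] != expected:
            return False
        prev_hash = entry["entry_hash"]
    return True

log: list = []
append_entry(log, {"asset": "patient_data", "status": "materialized"})
append_entry(log, {"asset": "analysis_cohort", "status": "failed"})
```

Because each hash depends on everything before it, retroactively editing or deleting an entry is detectable, which is the property an auditor relies on.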

The Asset Graph

"""medicore_pipeline/assets.py — Monthly causal analysis pipeline.

Each asset represents a regulatory-grade data artifact with
full provenance metadata for audit compliance.
"""
from dagster import (
    asset,
    MonthlyPartitionsDefinition,
    Output,
    MetadataValue,
    Config,
)
from typing import Any, Dict, List
import pandas as pd
import numpy as np


monthly_partitions = MonthlyPartitionsDefinition(start_date="2024-01-01")


class StudyConfig(Config):
    """Configuration for a single study's causal analysis.

    Attributes:
        study_id: Unique study identifier (e.g., 'MED-2024-0147').
        treatment_column: Column name for treatment assignment.
        outcome_column: Column name for the primary outcome.
        covariate_columns: Comma-separated list of baseline covariates.
        propensity_trim_threshold: Extreme propensity score trimming bound.
    """
    study_id: str
    treatment_column: str = "treatment_arm"
    outcome_column: str = "primary_endpoint"
    covariate_columns: str = "age,sex,bmi,baseline_score,comorbidity_index"
    propensity_trim_threshold: float = 0.05


@asset(partitions_def=monthly_partitions, group_name="data_extraction")
def patient_data(context, config: StudyConfig) -> Output[pd.DataFrame]:
    """Extract patient-level data from the EDC system.

    Pulls the complete dataset for the configured study,
    including demographics, treatment assignment, outcomes,
    and baseline covariates. Records the EDC export timestamp
    and data version for regulatory traceability.
    """
    partition_month = context.partition_key  # e.g., "2025-03"

    # EDC extraction logic (study-specific API)
    # In production, this calls the EDC system's export API
    edc_export_timestamp = "2025-03-15T14:30:00Z"  # From EDC response
    edc_version = "v847"  # EDC data version identifier

    df = pd.DataFrame()  # Populated by EDC extraction

    return Output(
        df,
        metadata={
            "study_id": MetadataValue.text(config.study_id),
            "num_patients": MetadataValue.int(len(df)),
            "edc_export_timestamp": MetadataValue.text(edc_export_timestamp),
            "edc_data_version": MetadataValue.text(edc_version),
            "partition_month": MetadataValue.text(partition_month),
        },
    )


@asset(partitions_def=monthly_partitions, group_name="data_preparation")
def analysis_cohort(
    context, config: StudyConfig, patient_data: pd.DataFrame
) -> Output[pd.DataFrame]:
    """Apply inclusion/exclusion criteria to define the analysis cohort.

    Records the number of patients excluded at each criterion
    for the CONSORT flow diagram.
    """
    cohort = patient_data.copy()
    exclusion_log: List[Dict[str, Any]] = []

    # Example exclusion criteria (study-specific)
    initial_n = len(cohort)

    # Exclude patients with missing primary outcome
    outcome_col = config.outcome_column
    if outcome_col in cohort.columns:
        missing_outcome = cohort[outcome_col].isna()
        n_excluded = missing_outcome.sum()
        cohort = cohort[~missing_outcome]
        exclusion_log.append({
            "criterion": "Missing primary outcome",
            "n_excluded": int(n_excluded),
            "n_remaining": len(cohort),
        })

    # Exclude patients with missing treatment assignment
    treatment_col = config.treatment_column
    if treatment_col in cohort.columns:
        missing_treatment = cohort[treatment_col].isna()
        n_excluded = missing_treatment.sum()
        cohort = cohort[~missing_treatment]
        exclusion_log.append({
            "criterion": "Missing treatment assignment",
            "n_excluded": int(n_excluded),
            "n_remaining": len(cohort),
        })

    return Output(
        cohort,
        metadata={
            "initial_patients": MetadataValue.int(initial_n),
            "final_cohort_size": MetadataValue.int(len(cohort)),
            "exclusion_log": MetadataValue.json(exclusion_log),
        },
    )


@asset(partitions_def=monthly_partitions, group_name="analysis")
def propensity_scores(
    context, config: StudyConfig, analysis_cohort: pd.DataFrame
) -> Output[pd.DataFrame]:
    """Estimate propensity scores via logistic regression.

    Computes P(treatment | covariates) for each patient.
    Records covariate balance diagnostics (standardized mean
    differences before and after weighting) for regulatory review.
    """
    from sklearn.linear_model import LogisticRegression

    covariates = [c.strip() for c in config.covariate_columns.split(",")]
    treatment_col = config.treatment_column

    X = analysis_cohort[covariates].values
    t = analysis_cohort[treatment_col].values

    model = LogisticRegression(max_iter=1000, random_state=42)
    model.fit(X, t)

    cohort_with_ps = analysis_cohort.copy()
    cohort_with_ps["propensity_score"] = model.predict_proba(X)[:, 1]

    # Trim extreme propensity scores
    trim = config.propensity_trim_threshold
    trimmed = cohort_with_ps[
        (cohort_with_ps["propensity_score"] >= trim)
        & (cohort_with_ps["propensity_score"] <= 1 - trim)
    ].copy()

    n_trimmed = len(cohort_with_ps) - len(trimmed)

    # Compute standardized mean differences for the balance check.
    # "After" uses IPW weights: 1/ps for treated, 1/(1 - ps) for control.
    t_vals = cohort_with_ps[treatment_col].to_numpy()
    ps_vals = cohort_with_ps["propensity_score"].to_numpy()
    ipw = np.where(t_vals == 1, 1.0 / ps_vals, 1.0 / (1.0 - ps_vals))
    mask_t, mask_c = t_vals == 1, t_vals == 0

    def weighted_smd(values: np.ndarray, weights: np.ndarray) -> float:
        m_t = np.average(values[mask_t], weights=weights[mask_t])
        m_c = np.average(values[mask_c], weights=weights[mask_c])
        v_t = np.average((values[mask_t] - m_t) ** 2, weights=weights[mask_t])
        v_c = np.average((values[mask_c] - m_c) ** 2, weights=weights[mask_c])
        pooled_std = np.sqrt((v_t + v_c) / 2)
        return float(abs(m_t - m_c) / pooled_std) if pooled_std > 0 else 0.0

    unweighted = np.ones_like(ipw)
    smd_before = {}
    smd_after = {}
    for cov in covariates:
        vals = cohort_with_ps[cov].to_numpy(dtype=float)
        smd_before[cov] = weighted_smd(vals, unweighted)
        smd_after[cov] = weighted_smd(vals, ipw)

    max_smd_before = max(smd_before.values()) if smd_before else 0.0
    max_smd_after = max(smd_after.values()) if smd_after else 0.0

    return Output(
        trimmed,
        metadata={
            "n_trimmed": MetadataValue.int(n_trimmed),
            "cohort_after_trimming": MetadataValue.int(len(trimmed)),
            "max_smd_before_weighting": MetadataValue.float(max_smd_before),
            "max_smd_after_weighting": MetadataValue.float(max_smd_after),
            "balance_acceptable": MetadataValue.bool(max_smd_after < 0.1),
        },
    )


@asset(partitions_def=monthly_partitions, group_name="analysis")
def treatment_effect_estimate(
    context, config: StudyConfig, propensity_scores: pd.DataFrame
) -> Output[Dict[str, Any]]:
    """Estimate the average treatment effect using doubly robust estimation.

    Implements the AIPW estimator from Chapter 18 with bootstrap
    confidence intervals. Records the full estimation results
    including point estimate, standard error, 95% CI, and p-value.
    """
    from sklearn.linear_model import LinearRegression

    treatment_col = config.treatment_column
    outcome_col = config.outcome_column
    covariates = [c.strip() for c in config.covariate_columns.split(",")]

    t = propensity_scores[treatment_col].values
    y = propensity_scores[outcome_col].values
    X = propensity_scores[covariates].values
    ps = propensity_scores["propensity_score"].values

    # Outcome model: E[Y | X, T]
    outcome_model_1 = LinearRegression()
    outcome_model_0 = LinearRegression()
    outcome_model_1.fit(X[t == 1], y[t == 1])
    outcome_model_0.fit(X[t == 0], y[t == 0])

    mu1_hat = outcome_model_1.predict(X)
    mu0_hat = outcome_model_0.predict(X)

    # AIPW estimator (Chapter 18, Section 18.7)
    n = len(y)
    aipw_scores = (
        mu1_hat - mu0_hat
        + t * (y - mu1_hat) / ps
        - (1 - t) * (y - mu0_hat) / (1 - ps)
    )
    ate_estimate = float(np.mean(aipw_scores))
    ate_se = float(np.std(aipw_scores, ddof=1) / np.sqrt(n))  # sample SE

    # 95% confidence interval
    ci_lower = ate_estimate - 1.96 * ate_se
    ci_upper = ate_estimate + 1.96 * ate_se

    # Bootstrap CI for robustness (1000 resamples)
    rng = np.random.default_rng(42)
    bootstrap_ates = []
    for _ in range(1000):
        idx = rng.choice(n, size=n, replace=True)
        bootstrap_ates.append(float(np.mean(aipw_scores[idx])))

    bootstrap_ci_lower = float(np.percentile(bootstrap_ates, 2.5))
    bootstrap_ci_upper = float(np.percentile(bootstrap_ates, 97.5))

    results = {
        "estimator": "AIPW (doubly robust)",
        "ate_estimate": ate_estimate,
        "standard_error": ate_se,
        "ci_95_lower": ci_lower,
        "ci_95_upper": ci_upper,
        "bootstrap_ci_95_lower": bootstrap_ci_lower,
        "bootstrap_ci_95_upper": bootstrap_ci_upper,
        "n_analyzed": n,
        "significant_at_005": bool(ci_lower > 0 or ci_upper < 0),
    }

    return Output(
        results,
        metadata={
            "ate": MetadataValue.float(ate_estimate),
            "ci_95": MetadataValue.text(f"[{ci_lower:.4f}, {ci_upper:.4f}]"),
            "significant": MetadataValue.bool(results["significant_at_005"]),
            "n_analyzed": MetadataValue.int(n),
        },
    )


@asset(partitions_def=monthly_partitions, group_name="reporting")
def regulatory_report(
    context,
    config: StudyConfig,
    analysis_cohort: pd.DataFrame,
    propensity_scores: pd.DataFrame,
    treatment_effect_estimate: Dict[str, Any],
) -> Output[Dict[str, str]]:
    """Generate the regulatory-grade analysis report.

    Produces a PDF report with:
    - CONSORT flow diagram
    - Covariate balance table (before/after weighting)
    - Primary analysis results (ATE, CI, p-value)
    - Sensitivity analysis summary
    - Data provenance appendix

    Archives the report alongside the frozen dataset and code snapshot.
    """
    import subprocess
    import json

    partition_month = context.partition_key
    study_id = config.study_id

    # Generate report content (Jinja2 template + LaTeX rendering)
    report_path = (
        f"s3://medicore-regulatory/reports/{study_id}/"
        f"{partition_month}/analysis_report.pdf"
    )
    data_archive_path = (
        f"s3://medicore-regulatory/archives/{study_id}/"
        f"{partition_month}/frozen_dataset.parquet"
    )
    code_archive_path = (
        f"s3://medicore-regulatory/archives/{study_id}/"
        f"{partition_month}/code_snapshot.tar.gz"
    )

    # Archive the frozen dataset
    propensity_scores.to_parquet(data_archive_path, index=False)

    # Archive the code snapshot (git bundle)
    git_sha = subprocess.run(
        ["git", "rev-parse", "HEAD"],
        capture_output=True, text=True
    ).stdout.strip()

    # Write provenance metadata
    from datetime import datetime, timezone

    provenance = {
        "study_id": study_id,
        "analysis_month": partition_month,
        "git_sha": git_sha,
        "n_analyzed": treatment_effect_estimate["n_analyzed"],
        "ate": treatment_effect_estimate["ate_estimate"],
        "report_generated_utc": datetime.now(timezone.utc).isoformat(),
        "analyst": "pipeline-automated",
        "retention_years": 15,
    }

    provenance_path = (
        f"s3://medicore-regulatory/archives/{study_id}/"
        f"{partition_month}/provenance.json"
    )

    # Persist the provenance record next to the archives.
    # fsspec resolves the s3:// URL (the same backend to_parquet relies on).
    import fsspec

    with fsspec.open(provenance_path, "w") as f:
        json.dump(provenance, f, indent=2)

    result = {
        "report_path": report_path,
        "data_archive_path": data_archive_path,
        "code_archive_path": code_archive_path,
        "provenance_path": provenance_path,
        "git_sha": git_sha,
    }

    return Output(
        result,
        metadata={
            "report_path": MetadataValue.text(report_path),
            "git_sha": MetadataValue.text(git_sha),
            "retention_until": MetadataValue.text("2040-03"),
        },
    )
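
The sensitivity-analysis asset (step 6) follows the same pattern as the primary analysis and is omitted above for space; its core is the IPW estimator evaluated across trimming thresholds. A standalone sketch on synthetic randomized data with a known true ATE of 2.0 (variable names and thresholds are illustrative):

```python
import numpy as np

def ipw_ate(y: np.ndarray, t: np.ndarray, ps: np.ndarray, trim: float = 0.05) -> float:
    """IPW estimate of the ATE, trimming extreme propensity scores."""
    keep = (ps >= trim) & (ps <= 1 - trim)
    y, t, ps = y[keep], t[keep], ps[keep]
    return float(np.mean(t * y / ps - (1 - t) * y / (1 - ps)))

# Synthetic check: randomized treatment with known effect.
rng = np.random.default_rng(0)
n = 20_000
t = rng.integers(0, 2, size=n)
y = 2.0 * t + rng.normal(size=n)   # true ATE = 2.0
ps = np.full(n, 0.5)               # known randomization probability

# Vary the trimming threshold, as the sensitivity analysis does.
sensitivity = {trim: ipw_ate(y, t, ps, trim) for trim in (0.01, 0.05, 0.10)}
```

In the real pipeline each threshold's estimate would be emitted as asset metadata, so a reviewer can see at a glance whether the conclusion is stable across specifications.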

Regulatory Compliance Through Orchestration

The Dagster pipeline satisfies the ALCOA requirements through technical mechanisms:

| ALCOA Requirement | Technical Implementation |
| --- | --- |
| Attributable | Every asset materialization records the Dagster user, git SHA, and run ID. The analyst field in provenance metadata identifies who triggered the run. |
| Legible | The PDF report is generated from structured data via a versioned LaTeX template. All intermediate data is stored in Parquet with self-describing schemas. |
| Contemporaneous | Dagster records materialization timestamps automatically. The provenance metadata includes the generation timestamp. |
| Original | The frozen dataset and code snapshot are archived to immutable S3 storage (with object lock) at the time of report generation. |
| Accurate | Data contracts validate the analysis cohort at each stage. The AIPW estimator includes bootstrap CIs as a robustness check. Covariate balance is verified (SMD < 0.1) before proceeding to the primary analysis. |
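
A data contract of the kind referenced in the "Accurate" row can be as simple as a validation function run before each stage (a sketch; the required columns and rules are illustrative, not MediCore's actual contract):

```python
import pandas as pd

# Hypothetical contract: columns every analysis cohort must carry.
REQUIRED_COLUMNS = {"treatment_arm", "primary_endpoint", "age", "sex"}

def validate_cohort(cohort: pd.DataFrame) -> list:
    """Return a list of contract violations; an empty list means the cohort passes."""
    violations = []
    missing = REQUIRED_COLUMNS - set(cohort.columns)
    if missing:
        violations.append(f"missing columns: {sorted(missing)}")
        return violations
    if cohort["primary_endpoint"].isna().any():
        violations.append("primary_endpoint contains missing values")
    if not set(cohort["treatment_arm"].unique()) <= {0, 1}:
        violations.append("treatment_arm must be binary (0/1)")
    if len(cohort) < 10:
        violations.append("cohort too small for analysis")
    return violations
```

Attaching the (empty) violation list to the asset's metadata turns every successful run into positive evidence that the contract was checked.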

Results

After 6 months of automated operation across 8 studies:

| Metric | Before (Manual) | After (Dagster) |
| --- | --- | --- |
| Monthly analyst hours | 60 | 8 (review + approval only) |
| Report turnaround | 5–12 business days | 2 business days (automated + 1 day review) |
| Reproducibility audit pass rate | 72% (missing code/data versions) | 100% |
| Regulatory query response time | 3–5 days (find the right notebook) | < 4 hours (Dagster lineage + S3 archive) |
| Cross-study consistency | Variable (analyst-dependent formatting) | 100% (template-driven) |

The most significant impact was on regulatory audits. During a routine FDA inspection, the auditor requested the complete provenance chain for a treatment effect estimate from 8 months earlier. The team retrieved the archived dataset, code snapshot, and Dagster materialization metadata in under 3 hours — a process that previously required 2 analysts working for 3 days to reconstruct from notebooks, email threads, and shared drives.

Lessons Learned

1. Regulatory requirements and engineering best practices converge. Every ALCOA requirement maps to a software engineering principle: attribution is logging, legibility is documentation, contemporaneousness is timestamping, originality is immutable storage, accuracy is testing. Good engineering satisfies regulation naturally.

2. Monthly pipelines need orchestration too. The low frequency (monthly) disguised the high cost of manual execution. The 60 person-hours per month, spread across 3 analysts and 8 studies, was invisible in any single analyst's workload but represented 720 person-hours per year — nearly half a full-time position.

3. The causal analysis pipeline demands stricter failure handling than the ML training pipeline. StreamRec can tolerate a day of stale models; MediCore cannot tolerate a single incorrect treatment effect estimate. The pipeline halts on any data quality violation, covariate imbalance, or propensity score anomaly, and requires human review before proceeding — a deliberate departure from the "retry and continue" philosophy of the StreamRec pipeline.
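
The halt-on-violation policy can be expressed as a gate that raises instead of retrying (an illustrative sketch; the exception and threshold names are hypothetical):

```python
class AnalysisHalted(Exception):
    """Raised to stop the pipeline pending human review; never retried."""

def balance_gate(max_smd_after: float, threshold: float = 0.1) -> None:
    """Halt the run if post-weighting covariate balance is unacceptable."""
    if max_smd_after >= threshold:
        raise AnalysisHalted(
            f"Covariate imbalance (max SMD {max_smd_after:.3f} >= {threshold}); "
            "human review required before the primary analysis."
        )
```

Because the orchestrator records the failed materialization, the halt itself becomes part of the audit trail rather than a silent retry.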

4. Archival is a first-class pipeline output. The regulatory report, frozen dataset, code snapshot, and provenance metadata are not byproducts of the analysis — they are the primary deliverables. Treating archival as an explicit asset in the Dagster graph (rather than an afterthought script) ensures that it is versioned, tested, and monitored like every other pipeline output.