Case Study 2: Batch vs. Real-Time --- Choosing the Right Deployment Pattern
Background
MedScore is a health insurance company with 4.2 million members. Their data science team has built a hospital readmission risk model that predicts, within 24 hours of discharge, whether a patient will be readmitted within 30 days. The model is an XGBoost classifier with an AUC of 0.81, trained on claims data, diagnosis codes, length of stay, prior admissions, and demographic features.
The model works. The question is how to deploy it.
The business has three use cases for readmission predictions:
- Discharge planning (real-time). When a care coordinator reviews a patient's discharge plan, the coordinator portal should display the readmission risk score. The coordinator is sitting with the patient. Latency matters.
- Post-discharge outreach (batch). Every morning, the care management team receives a list of patients discharged in the past 24 hours, ranked by readmission risk. High-risk patients get a follow-up call within 48 hours.
- Population health reporting (batch). Every quarter, the analytics team generates a report showing readmission risk by geography, diagnosis group, and demographic segment. This feeds into contract negotiations with hospital systems.
Each use case has different latency requirements, different data freshness needs, and different cost profiles. Deploying all three through a single real-time API would work but would be expensive and unnecessarily complex. Deploying all three as batch jobs would work but would make the discharge planning use case unusable (the coordinator cannot wait until tomorrow's batch run).
This case study follows MedScore's ML engineer, David, as he designs a deployment architecture that serves all three use cases efficiently.
Analyzing the Requirements
David starts by mapping each use case to its constraints:
| Use Case | Latency | Data Freshness | Volume | Frequency |
|---|---|---|---|---|
| Discharge planning | < 500 ms | Real-time (current patient) | 1 request at a time | ~2,000/day (working hours) |
| Post-discharge outreach | Hours OK | Last 24 hours | ~800 patients per batch | Once daily at 6 AM |
| Population health reporting | Days OK | Quarterly snapshot | All 4.2M members | Quarterly |
The differences are stark. The first use case needs a real-time API. The third use case needs a batch job that runs on a schedule. The second use case is in between --- it could be real-time or batch, but batch is simpler and cheaper because the morning list can be pre-computed overnight.
The Architecture
David designs a two-track architecture:
+-------------------+
| Model Artifact |
| (S3 / Registry) |
+--------+----------+
|
+--------------+--------------+
| |
+---------v---------+ +------------v-----------+
| Real-Time Track | | Batch Track |
| FastAPI + Docker | | Airflow DAG |
| ECS Fargate | | Fargate Spot Task |
+---------+---------+ +------------+-----------+
| |
+---------v---------+ +------------v-----------+
| Discharge Portal | | care_management_scores |
| (coordinator UI) | | (database table) |
+-------------------+ +------------------------+
| population_health_ |
| report (quarterly CSV) |
+------------------------+
Both tracks load the same model artifact from the same registry. The model is trained once, versioned once, and served twice through different interfaces. This is a critical principle: one model, multiple serving paths.
Track 1: Real-Time API for Discharge Planning
The real-time track is a FastAPI application following the pattern from the chapter. David's implementation has a few medical-domain-specific considerations.
The Request Schema
from pydantic import BaseModel, Field
from typing import Literal
class ReadmissionRequest(BaseModel):
patient_age: int = Field(..., ge=0, le=120)
length_of_stay_days: int = Field(..., ge=0, le=365)
num_prior_admissions_12m: int = Field(
..., ge=0,
description="Hospital admissions in the past 12 months"
)
num_chronic_conditions: int = Field(
..., ge=0,
description="Count of active chronic conditions"
)
primary_diagnosis_group: Literal[
"cardiac", "respiratory", "digestive", "musculoskeletal",
"neurological", "endocrine", "other"
] = Field(..., description="Primary diagnosis category")
discharge_disposition: Literal[
"home", "home_with_care", "skilled_nursing", "rehab", "other"
] = Field(..., description="Where the patient is being discharged to")
insurance_type: Literal[
"commercial", "medicare", "medicaid", "dual_eligible"
] = Field(...)
emergency_admission: bool = Field(
..., description="Whether the admission was through the ED"
)
num_medications_at_discharge: int = Field(
..., ge=0, description="Number of medications prescribed at discharge"
)
has_follow_up_scheduled: bool = Field(
..., description="Whether a follow-up appointment is scheduled"
)
Practical Tip --- Notice that David uses Literal types for categorical fields. In a healthcare context, this is especially important. If a client sends "diagnosis_group": "cardac" (a typo), the API rejects it immediately instead of encoding it as "other" and returning a misleading prediction. In healthcare, a misleading prediction is not a minor inconvenience --- it affects patient care.
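The rejection behavior is easy to demonstrate with a trimmed-down stand-in for the request schema (one field instead of the full ReadmissionRequest; the class name is illustrative):

```python
# A one-field stand-in for the request schema, showing how Literal
# validation stops a typo at the API boundary.
from typing import Literal

from pydantic import BaseModel, ValidationError


class DiagnosisCheck(BaseModel):
    primary_diagnosis_group: Literal["cardiac", "respiratory", "other"]


DiagnosisCheck(primary_diagnosis_group="cardiac")  # accepted

try:
    DiagnosisCheck(primary_diagnosis_group="cardac")  # typo
except ValidationError:
    # The bad value is rejected before it ever reaches the model.
    print("rejected at the API boundary")
```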
The Response
class ReadmissionResponse(BaseModel):
readmission_probability: float = Field(..., ge=0, le=1)
risk_tier: Literal["low", "moderate", "high", "very_high"]
recommended_actions: list[str] = Field(
...,
description="Suggested interventions based on risk factors"
)
model_version: str
disclaimer: str = Field(
default="This score is a decision-support tool. "
"Clinical judgment should guide all care decisions.",
description="Required clinical disclaimer"
)
The disclaimer field is a legal requirement, not a nice-to-have. In healthcare ML, every prediction must be accompanied by a statement that the model is advisory, not prescriptive. David bakes it into the schema so it cannot be accidentally omitted by the frontend.
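A toy stand-in shows why a schema-level default cannot be forgotten (field subset and class name are illustrative):

```python
# Trimmed-down stand-in for ReadmissionResponse: the disclaimer has a
# default, so every serialized response carries it even when no caller
# ever sets the field explicitly.
from pydantic import BaseModel, Field


class ToyResponse(BaseModel):
    readmission_probability: float
    disclaimer: str = Field(
        default="This score is a decision-support tool. "
        "Clinical judgment should guide all care decisions."
    )


resp = ToyResponse(readmission_probability=0.31)
print(resp.disclaimer)  # present despite never being set by the backend
```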
The recommended_actions field translates risk factors into actionable interventions:
def generate_recommendations(
request: ReadmissionRequest, probability: float
) -> list[str]:
actions = []
if probability > 0.5 and not request.has_follow_up_scheduled:
actions.append(
"Schedule follow-up appointment within 7 days of discharge"
)
if request.num_medications_at_discharge > 8:
actions.append(
"Medication reconciliation review recommended"
)
if request.num_prior_admissions_12m >= 2:
actions.append(
"Consider referral to care management program"
)
if request.discharge_disposition == "home" and probability > 0.4:
actions.append(
"Evaluate need for home health services"
)
if not actions:
actions.append("Standard discharge protocol appropriate")
return actions
These recommendations are not model outputs --- they are business rules layered on top of the model output. The model provides the probability; the business logic translates it into interventions. This separation is deliberate. When the clinical team changes intervention thresholds, they modify the business rules without retraining the model.
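The risk_tier field follows the same philosophy. One plausible shape for it, using the same cut points as the batch job's pd.cut bins (0.2 / 0.4 / 0.6; the function name is illustrative):

```python
# Tier thresholds mirror the batch job's pd.cut bins. Changing a cut
# point is a business-rule edit, not a model retrain.
def probability_to_tier(p: float) -> str:
    if p <= 0.2:
        return "low"
    if p <= 0.4:
        return "moderate"
    if p <= 0.6:
        return "high"
    return "very_high"
```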
Track 2: Batch Scoring for Post-Discharge Outreach
The batch track runs every morning at 6 AM via an Airflow DAG. It loads the same model, scores all patients discharged in the past 24 hours, and writes the results to a database table.
# batch_scorer.py --- Nightly readmission risk scoring
import pandas as pd
import joblib
import boto3
from datetime import datetime, timedelta
from sqlalchemy import create_engine
# Load the same model artifact used by the API
model = joblib.load("model/readmission_model.joblib")
# Connect to the data warehouse
engine = create_engine("postgresql://user:pass@warehouse:5432/medscore")
# Query patients discharged in the last 24 hours
query = """
SELECT
patient_id,
patient_age,
length_of_stay_days,
num_prior_admissions_12m,
num_chronic_conditions,
primary_diagnosis_group,
discharge_disposition,
insurance_type,
emergency_admission,
num_medications_at_discharge,
has_follow_up_scheduled,
discharge_timestamp
FROM discharge_events
WHERE discharge_timestamp >= NOW() - INTERVAL '24 hours'
"""
discharges = pd.read_sql(query, engine)
print(f"Scoring {len(discharges)} patients discharged in last 24 hours")
# Preprocess (same encoding as the API)
feature_cols = [
"patient_age", "length_of_stay_days", "num_prior_admissions_12m",
"num_chronic_conditions", "primary_diagnosis_group",
"discharge_disposition", "insurance_type", "emergency_admission",
"num_medications_at_discharge", "has_follow_up_scheduled",
]
# Encoding maps (identical to the API's preprocessing module)
DIAGNOSIS_MAP = {
"cardiac": 0, "respiratory": 1, "digestive": 2,
"musculoskeletal": 3, "neurological": 4, "endocrine": 5, "other": 6,
}
DISPOSITION_MAP = {
"home": 0, "home_with_care": 1, "skilled_nursing": 2,
"rehab": 3, "other": 4,
}
INSURANCE_MAP = {
"commercial": 0, "medicare": 1, "medicaid": 2, "dual_eligible": 3,
}
features = discharges.copy()
features["primary_diagnosis_group"] = features["primary_diagnosis_group"].map(
DIAGNOSIS_MAP
)
features["discharge_disposition"] = features["discharge_disposition"].map(
DISPOSITION_MAP
)
features["insurance_type"] = features["insurance_type"].map(INSURANCE_MAP)
features["emergency_admission"] = features["emergency_admission"].astype(int)
features["has_follow_up_scheduled"] = features["has_follow_up_scheduled"].astype(int)
# Vectorized prediction (all patients at once)
X = features[feature_cols].values
probabilities = model.predict_proba(X)[:, 1]
# Build output
results = pd.DataFrame({
"patient_id": discharges["patient_id"],
"discharge_timestamp": discharges["discharge_timestamp"],
"readmission_probability": probabilities.round(4),
"risk_tier": pd.cut(
probabilities,
bins=[0, 0.2, 0.4, 0.6, 1.0],
labels=["low", "moderate", "high", "very_high"],
),
"scored_at": datetime.utcnow(),
"model_version": "v1.2.0",
})
# Sort by risk (highest first) for the care management team
results = results.sort_values("readmission_probability", ascending=False)
# Write to the care management table
results.to_sql(
"care_management_scores",
engine,
if_exists="append",
index=False,
)
print(f"Scored {len(results)} patients")
print(f"High/Very High risk: {(probabilities > 0.4).sum()}")
print(f"Top 10 risk scores: {sorted(probabilities, reverse=True)[:10]}")
Why Batch, Not API?
David could score these patients by calling the real-time API 800 times in a loop. Here is why he does not:
| Factor | API Loop (800 calls) | Batch Job (1 call) |
|---|---|---|
| Total time | ~800 x 50 ms = 40 seconds | ~2 seconds (vectorized) |
| Network overhead | 800 HTTP round trips | 0 (runs in the same VPC as the database) |
| SHAP computation | 800 SHAP calls (expensive) | Not needed (outreach team uses probability only) |
| Cost | API infrastructure must be scaled | Runs on a Fargate Spot task for pennies |
| Failure handling | Must handle 800 potential failures | One job succeeds or fails atomically |
The batch job is 20x faster, cheaper, simpler, and more reliable. The outreach team does not need SHAP explanations or sub-second latency. They need a ranked list of patients by 7 AM. The batch job delivers that.
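The vectorization half of that argument is easy to verify on any sklearn-style model. In this sketch, a LogisticRegression on synthetic data stands in for the real XGBoost classifier:

```python
# Vectorized scoring vs. a per-row loop. The scores are identical; only
# the cost differs --- and the loop here does not even pay the HTTP
# round-trip overhead an API loop would add.
import numpy as np
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(0)
X_train = rng.normal(size=(200, 4))
y_train = (X_train[:, 0] > 0).astype(int)
model = LogisticRegression().fit(X_train, y_train)

X_batch = rng.normal(size=(800, 4))  # one morning's discharges

# One vectorized call: the batch job's approach.
batch_probs = model.predict_proba(X_batch)[:, 1]

# 800 single-row calls: what an API loop amounts to.
loop_probs = np.array(
    [model.predict_proba(row.reshape(1, -1))[0, 1] for row in X_batch]
)

assert np.allclose(batch_probs, loop_probs)
```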
Track 3: Population Health Reporting
The quarterly report is even simpler. It runs as a scheduled Airflow DAG that:
- Loads all 4.2 million member records from the data warehouse
- Scores them in batches of 50,000 (to fit in memory)
- Aggregates by geography, diagnosis group, and demographic segment
- Writes summary statistics to a reporting table
- Generates a PDF report with matplotlib
# population_health_scorer.py (simplified)
import pandas as pd
import joblib
import numpy as np
model = joblib.load("model/readmission_model.joblib")
# Process in chunks to manage memory
chunk_size = 50_000
all_results = []
for chunk in pd.read_sql(
"SELECT * FROM member_features",
engine,
chunksize=chunk_size,
):
X = preprocess(chunk) # Same encoding as above
probs = model.predict_proba(X)[:, 1]
chunk["readmission_probability"] = probs
all_results.append(chunk[["member_id", "region", "diagnosis_group",
"age_group", "readmission_probability"]])
results = pd.concat(all_results, ignore_index=True)
# Aggregate for the report
summary = results.groupby(["region", "diagnosis_group"]).agg(
mean_risk=("readmission_probability", "mean"),
high_risk_count=("readmission_probability", lambda x: (x > 0.4).sum()),
total_members=("readmission_probability", "count"),
).reset_index()
summary["high_risk_pct"] = (
summary["high_risk_count"] / summary["total_members"] * 100
).round(1)
print(summary.head(10))
This job takes 15 minutes to score 4.2 million members. It runs quarterly. Nobody is waiting for it. A real-time API would be absurd here.
The Shared Model Contract
The most important design decision is not the deployment pattern --- it is the shared preprocessing contract. All three tracks (real-time API, daily batch, quarterly report) must encode features identically. If the API maps "cardiac" to 0 but the batch job maps it to 1, the predictions will be silently wrong.
David solves this by extracting the encoding logic into a shared Python package:
medscore-ml/
medscore_ml/
__init__.py
preprocessing.py # Shared encoding logic
schemas.py # Pydantic models
api/
app.py # FastAPI (imports medscore_ml.preprocessing)
batch/
daily_scorer.py # Daily batch (imports medscore_ml.preprocessing)
quarterly_scorer.py # Quarterly batch (imports medscore_ml.preprocessing)
model/
readmission_model.joblib
pyproject.toml
# medscore_ml/preprocessing.py
# Single source of truth for feature encoding
DIAGNOSIS_MAP = {
"cardiac": 0, "respiratory": 1, "digestive": 2,
"musculoskeletal": 3, "neurological": 4, "endocrine": 5, "other": 6,
}
DISPOSITION_MAP = {
"home": 0, "home_with_care": 1, "skilled_nursing": 2,
"rehab": 3, "other": 4,
}
INSURANCE_MAP = {
"commercial": 0, "medicare": 1, "medicaid": 2, "dual_eligible": 3,
}
FEATURE_ORDER = [
"patient_age", "length_of_stay_days", "num_prior_admissions_12m",
"num_chronic_conditions", "primary_diagnosis_group",
"discharge_disposition", "insurance_type", "emergency_admission",
"num_medications_at_discharge", "has_follow_up_scheduled",
]
def encode_dataframe(df: pd.DataFrame) -> pd.DataFrame:
"""Apply the standard encoding to a DataFrame.
Used by both the API and the batch jobs.
"""
encoded = df.copy()
encoded["primary_diagnosis_group"] = encoded[
"primary_diagnosis_group"
].map(DIAGNOSIS_MAP)
encoded["discharge_disposition"] = encoded[
"discharge_disposition"
].map(DISPOSITION_MAP)
encoded["insurance_type"] = encoded["insurance_type"].map(INSURANCE_MAP)
encoded["emergency_admission"] = encoded["emergency_admission"].astype(int)
encoded["has_follow_up_scheduled"] = encoded[
"has_follow_up_scheduled"
].astype(int)
return encoded[FEATURE_ORDER]
The API imports encode_dataframe. The batch jobs import encode_dataframe. When the encoding changes (a new diagnosis category, a renamed feature), the change is made once in preprocessing.py, tested once, and deployed to all three tracks.
Warning --- The most dangerous bug in a multi-track deployment is a preprocessing mismatch between the real-time and batch paths. The predictions will be wrong, and nothing will crash. No error, no exception, no alert. Just silently incorrect predictions served to clinicians. A shared preprocessing module prevents this.
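One cheap defense is a contract test that runs in CI for every track. A sketch, with a trimmed-down copy of the encoding inlined for illustration (the real test would import DIAGNOSIS_MAP and encode_dataframe from medscore_ml.preprocessing):

```python
# Contract tests guarding the shared encoding. Inlined map/function are
# a trimmed-down illustration of the medscore_ml.preprocessing module.
import pandas as pd

DIAGNOSIS_MAP = {"cardiac": 0, "respiratory": 1, "other": 6}


def encode(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out["primary_diagnosis_group"] = out["primary_diagnosis_group"].map(
        DIAGNOSIS_MAP
    )
    return out


def test_single_record_matches_bulk():
    record = {"primary_diagnosis_group": "cardiac"}
    api_style = encode(pd.DataFrame([record]))        # API path: one row
    batch_style = encode(pd.DataFrame([record] * 3))  # batch path: many rows
    assert api_style.iloc[0].equals(batch_style.iloc[0])


def test_unknown_category_does_not_pass_silently():
    bad = pd.DataFrame({"primary_diagnosis_group": ["cardac"]})  # typo
    # .map() turns unknown categories into NaN; this test makes sure we
    # notice at test time rather than score garbage in production.
    assert encode(bad)["primary_diagnosis_group"].isna().all()
```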
Decision Framework
David summarizes the decision criteria for future models:
| Question | If Yes --> Real-Time | If Yes --> Batch |
|---|---|---|
| Is a human waiting for the result? | X | |
| Is the result needed for all records on a schedule? | | X |
| Does latency need to be < 1 second? | X | |
| Are we scoring > 10,000 records at once? | | X |
| Does the downstream system speak HTTP? | X | |
| Does the downstream system read from a database? | | X |
| Do we need SHAP explanations per record? | X (with caching) | |
| Is cost a primary concern? | | X |
Many real-world systems use both: a real-time API for interactive use cases and a batch job for scheduled bulk scoring. The two are not in competition. They are complementary.
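The table can even be encoded as a first-pass triage helper for future models (a toy heuristic, not anything in MedScore's codebase; argument names and precedence are illustrative):

```python
# Toy triage heuristic encoding the decision table above. Real-time
# signals win ties because a waiting human is the hardest constraint.
def suggest_pattern(
    human_waiting: bool,
    needs_sub_second_latency: bool,
    scheduled_bulk_scoring: bool,
) -> str:
    if human_waiting or needs_sub_second_latency:
        return "real-time"
    if scheduled_bulk_scoring:
        return "batch"
    return "either (default to batch for cost)"
```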
Lessons Learned
- One model, multiple serving paths. The same model artifact powers three use cases through two deployment patterns. Training happens once. Serving happens through whichever interface matches the use case.
- Batch is not a lesser deployment pattern. It is the correct pattern when nobody is waiting and the volume is high. A batch job scoring 4.2 million records for $0.50 on Fargate Spot is better engineering than an API handling 4.2 million requests at $150/month.
- The preprocessing contract is the most important artifact. More important than the model, more important than the API code. If the encoding is wrong, the predictions are wrong, and nobody will notice until a clinician makes a decision based on a bad score.
- Healthcare requires a disclaimer on every prediction. This is not optional. Bake it into the response schema so it cannot be forgotten.
- The business requirement determines the deployment pattern, not the model complexity. A simple logistic regression and a complex XGBoost ensemble both face the same deployment question: who needs the prediction, how fast, and how often?
This case study supports Chapter 31: Model Deployment. Return to the chapter for the foundational concepts.