Chapter 27: Exercises

Exercises are graded by difficulty:

  - One star (*): Apply the technique from the chapter to a new dataset or scenario
  - Two stars (**): Extend the technique or combine it with a previous chapter's methods
  - Three stars (***): Derive a result, implement from scratch, or design a system component
  - Four stars (****): Research-level problems that connect to open questions in the field


DAGs and Pipeline Structure

Exercise 27.1 (*)

Consider the following ML pipeline for a fraud detection system:

  1. Extract transaction logs from a data warehouse
  2. Extract user profile data from a user service API
  3. Join transaction logs with user profiles
  4. Compute features (transaction velocity, spending deviation, time-of-day patterns)
  5. Train a fraud classification model (XGBoost)
  6. Evaluate on a held-out set
  7. Register the model if it passes quality gates
  8. Update the online feature store with new feature values

(a) Draw the DAG for this pipeline. Identify which tasks can run in parallel.

(b) Using the PipelineDAG class from Section 27.2, implement this DAG in Python and call get_parallel_stages() to verify your answer from (a).

(c) The pipeline currently runs every task sequentially and takes 4 hours end-to-end. The two extraction tasks each take 45 minutes. How much time does running the two extractions in parallel save?
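
A hand-drawn DAG can be cross-checked by computing its parallel stages mechanically. The sketch below does not use the chapter's PipelineDAG class, whose API is not reproduced here. It assumes Python 3.9+ and the standard-library graphlib module, and it runs on a hypothetical four-task toy graph rather than on the fraud pipeline itself.

```python
from graphlib import TopologicalSorter


def parallel_stages(predecessors: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into stages; tasks in the same stage have no mutual dependencies."""
    sorter = TopologicalSorter(predecessors)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every task whose predecessors are done
        stages.append(ready)
        sorter.done(*ready)
    return stages


# Hypothetical toy graph (not the fraud pipeline): task -> set of upstream tasks.
toy = {
    "load_a": set(),
    "load_b": set(),
    "merge": {"load_a", "load_b"},
    "report": {"merge"},
}
stages = parallel_stages(toy)
print(stages)  # [['load_a', 'load_b'], ['merge'], ['report']]
```

Any stage with more than one task marks an opportunity for parallel execution. The output of get_parallel_stages() may order tasks within a stage differently.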


Exercise 27.2 (*)

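Explain the difference between a task's logical date (the data interval it processes) and its execution date (the wall-clock time at which the run actually starts). Note that Airflow versions before 2.2 used the name execution_date for what is now called the logical date; this exercise uses "execution date" in the wall-clock sense. For each of the following scenarios, specify the logical date and the execution date: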

  1. A daily pipeline scheduled at 2am UTC processes yesterday's data. Today is March 15.
  2. A weekly pipeline scheduled for Monday at 6am processes the previous Monday–Sunday. This Monday is March 17.
  3. A backfill run on March 20 reprocesses data for March 10.
  4. A sensor-triggered pipeline runs at 11:47am when an upstream file arrives. The file contains data for the previous day, March 14.

Exercise 27.3 (*)

The topological_sort() method in Section 27.2 uses Kahn's algorithm. Trace the algorithm execution on the StreamRec training pipeline DAG, showing the state of the queue and the result list after each iteration.
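
As a warm-up for the trace, here is a minimal sketch of Kahn's algorithm that prints the queue and the result list after each iteration. It is an independent illustration, not the chapter's topological_sort() implementation. The toy graph and the alphabetical tie-breaking among initial zero-in-degree nodes are assumptions, so the order in which ties are resolved may differ from the chapter's version.

```python
from collections import deque


def kahn_trace(edges: dict[str, list[str]]) -> list[str]:
    """Topologically sort a DAG, printing the queue and result after each step.

    `edges` maps each task to the tasks that depend on it (its downstream tasks).
    """
    in_degree = {node: 0 for node in edges}
    for downstream in edges.values():
        for node in downstream:
            in_degree[node] = in_degree.get(node, 0) + 1

    # Start with every task that has no upstream dependencies.
    queue = deque(sorted(n for n, d in in_degree.items() if d == 0))
    result = []
    while queue:
        node = queue.popleft()
        result.append(node)
        for child in edges.get(node, []):
            in_degree[child] -= 1
            if in_degree[child] == 0:  # all of child's upstream tasks are done
                queue.append(child)
        print(f"popped={node!r} queue={list(queue)} result={result}")

    if len(result) != len(in_degree):
        raise ValueError("graph contains a cycle")
    return result


# Hypothetical toy graph, not the StreamRec DAG.
order = kahn_trace({"a": ["c"], "b": ["c"], "c": ["d"], "d": []})
```

Each printed line corresponds to one row of the trace table the exercise asks for.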


Exercise 27.4 (**)

Extend the PipelineDAG class to support conditional edges — dependencies that should be followed only if a specific condition is met (e.g., "run register_models only if at least one model passed evaluation").

(a) Add a condition attribute to the PipelineTask class: a callable that takes a dictionary of upstream task results and returns a boolean.

(b) Modify get_parallel_stages() to skip tasks whose conditions are not met, propagating the skip status to downstream tasks.

(c) Demonstrate your implementation with a pipeline where model registration is conditional on evaluation results.


Framework Comparison

Exercise 27.5 (*)

For each of the following operational requirements, identify which framework feature addresses it and explain how:

  1. "The pipeline must wait until the upstream data ingestion DAG completes before starting extraction."
  2. "No more than 4 GPU training jobs should run concurrently across all pipelines."
  3. "Small metadata (file paths, row counts, evaluation metrics) must be passed between tasks."
  4. "The pipeline must use different storage backends in development (local) and production (S3)."
  5. "When the evaluation task fails, the registration task should still run (to register whichever models passed)."

For each, provide the solution in both Airflow and Dagster.


Exercise 27.6 (**)

Implement the StreamRec pipeline's validate_data task in all three frameworks (Airflow, Dagster, Prefect), ensuring that the core validation logic is shared across all implementations. Use the strategy of separating business logic (validation) from orchestration concerns (context access, XCom, IO managers).
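
The separation strategy can be sketched without importing any orchestrator. In the example below, the column name user_id, the min_rows rule, and the names ValidationResult, validate_interactions, and run_validation_task are all hypothetical; the chapter's validate_data task may check different things. The point is only the shape: a pure function that holds the business logic, plus a thin adapter that each framework would replace with its own context handling.

```python
from dataclasses import dataclass, field


@dataclass
class ValidationResult:
    passed: bool
    errors: list[str] = field(default_factory=list)


def validate_interactions(rows: list[dict], min_rows: int = 1) -> ValidationResult:
    """Pure validation logic: no orchestrator imports, no I/O."""
    errors = []
    if len(rows) < min_rows:
        errors.append(f"expected at least {min_rows} rows, got {len(rows)}")
    for i, row in enumerate(rows):
        if row.get("user_id") is None:
            errors.append(f"row {i}: missing user_id")
    return ValidationResult(passed=not errors, errors=errors)


def run_validation_task(load_rows, publish) -> bool:
    """Thin adapter: each framework supplies its own `load_rows` and `publish`."""
    result = validate_interactions(load_rows())
    publish({"passed": result.passed, "n_errors": len(result.errors)})
    return result.passed


# Stand-in wiring; an Airflow task might publish via XCom, a Dagster asset via metadata.
published = {}
ok = run_validation_task(lambda: [{"user_id": 1}, {"user_id": None}], published.update)
print(ok, published)
```

Because validate_interactions has no framework dependencies, it can be unit-tested once and reused unchanged by all three implementations.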


Exercise 27.7 (**)

The chapter notes that Airflow's XCom has a default size limit of 48KB. Design a pattern for passing large intermediate datasets (e.g., a 2GB training DataFrame) between Airflow tasks without using XCom for the data itself. Your solution should:

(a) Store the data in an external store (e.g., S3) with a deterministic, partition-keyed path.

(b) Pass only the path through XCom.

(c) Ensure idempotency: if the upstream task is rerun, the downstream task reads the correct version.

(d) Implement this pattern with a helper function that both tasks use.
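
One possible shape for the shared helper in (d) is a pure function that maps a dataset name and logical date to a storage path. The function name partition_uri, the bucket name, and the date=YYYY-MM-DD layout are illustrative assumptions, not a convention taken from the chapter.

```python
from datetime import date


def partition_uri(bucket: str, dataset: str, logical_date: date, fmt: str = "parquet") -> str:
    """Build a deterministic S3 URI keyed only by dataset and logical date.

    Because the URI contains no timestamp or run ID, a rerun for the same
    logical date targets the same object instead of creating a new one.
    """
    return (
        f"s3://{bucket}/{dataset}/"
        f"date={logical_date.isoformat()}/data.{fmt}"
    )


# Producer and consumer call the same helper, so only the short URI
# (or just the logical date) needs to travel through XCom.
uri = partition_uri("example-bucket", "training_features", date(2025, 3, 14))
print(uri)  # s3://example-bucket/training_features/date=2025-03-14/data.parquet
```

This sketch covers only path construction. Whether a rerun should overwrite the object in place or write a new version that the downstream task selects explicitly is the design question raised in (c).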


Exercise 27.8 (***)

Evaluate the three orchestration frameworks against the following criteria for a company with 50 data scientists, 200 pipelines, and strict compliance requirements (SOC 2, HIPAA). Score each framework 1-5 on each criterion and justify your scores.

| Criterion | Airflow | Dagster | Prefect |
|---|---|---|---|
| Enterprise maturity | | | |
| Data lineage and auditing | | | |
| Role-based access control | | | |
| Kubernetes-native deployment | | | |
| Testing and CI/CD integration | | | |
| Operational observability | | | |
| Team onboarding effort | | | |

Idempotency

Exercise 27.9 (*)

For each of the following pipeline tasks, determine whether it is idempotent. If not, explain why and propose a modification to make it idempotent.

  1. A task that reads from a Delta Lake table using a specific version number.
  2. A task that appends training metrics to a Postgres table.
  3. A task that uploads a model artifact to S3 at a path containing the current timestamp.
  4. A task that sends a Slack notification when training completes.
  5. A task that increments a "pipeline runs" counter in Redis.
  6. A task that writes a Parquet file to a path determined by the logical date.

Exercise 27.10 (**)

Implement the idempotent_write pattern from Section 27.6 for S3 using boto3. S3 has no atomic rename operation, so your implementation should instead use the "write to temporary key, then copy to final key, then delete temporary key" pattern.

```python
import boto3


def idempotent_s3_write(
    data: bytes,
    bucket: str,
    final_key: str,
) -> str:
    """Write data to S3 idempotently.

    Uses a temporary key with copy-and-delete to simulate
    atomic writes on S3.

    Args:
        data: Bytes to write.
        bucket: S3 bucket name.
        final_key: Final S3 key for the object.

    Returns:
        S3 URI of the written object.
    """
    # Your implementation here
    pass
```
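
For comparison, here is a minimal sketch of the local-filesystem pattern that the S3 version imitates. It is not the chapter's idempotent_write; the function name atomic_local_write is hypothetical. On a local filesystem, os.replace provides the atomic rename that S3 lacks, which is why the S3 variant has to substitute a copy followed by a delete (boto3's put_object, copy_object, and delete_object client calls are the likely building blocks).

```python
import os
import tempfile


def atomic_local_write(data: bytes, final_path: str) -> str:
    """Write `data` so that readers see either the old file or the complete new one."""
    directory = os.path.dirname(os.path.abspath(final_path))
    # Create the temp file in the destination directory so the rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(data)
        os.replace(tmp_path, final_path)  # atomic on POSIX; overwrites an existing file
    except BaseException:
        # Clean up the partial temp file so failed attempts leave no debris.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return final_path


with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, "features.bin")
    atomic_local_write(b"v1", target)
    atomic_local_write(b"v1", target)  # rerunning leaves the same final state
    with open(target, "rb") as handle:
        content = handle.read()
    leftovers = os.listdir(workdir)
print(content, leftovers)
```

When adapting this to S3, consider what a reader observes if the process dies between the copy and the delete, and whether a rerun cleans up the orphaned temporary key.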

Exercise 27.11 (**)

A pipeline computes daily user engagement features and writes them to a feature table. The feature computation depends on a rolling 7-day window of user interactions. Is this task idempotent? Under what conditions could re-running it produce different results? Design a strategy that ensures idempotency even for window-based computations.


Failure Handling

Exercise 27.12 (*)

A pipeline has 5 sequential tasks. Each attempt of each task fails independently with probability 10%. Tasks have a retry policy with a maximum of 2 retries (3 total attempts per task).

(a) What is the probability that a single task succeeds after up to 3 attempts?

(b) What is the probability that the entire pipeline succeeds in a single run (all 5 tasks succeed with retries)?

(c) If the pipeline runs daily for a year (365 days), how many days is it expected to fail?
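
The reasoning behind (a) through (c) can be written as three small functions. This is a sketch under the exercise's independence assumption: every attempt fails independently with the same probability. The example call deliberately uses different numbers from the exercise so that the answers are not given away.

```python
def task_success_probability(p_fail: float, attempts: int) -> float:
    """A task fails overall only if every one of its attempts fails."""
    return 1.0 - p_fail ** attempts


def pipeline_success_probability(p_fail: float, attempts: int, n_tasks: int) -> float:
    """Sequential tasks must all succeed; independence lets the probabilities multiply."""
    return task_success_probability(p_fail, attempts) ** n_tasks


def expected_failed_runs(p_fail: float, attempts: int, n_tasks: int, n_runs: int) -> float:
    """Expected number of failed runs out of `n_runs` independent runs."""
    return n_runs * (1.0 - pipeline_success_probability(p_fail, attempts, n_tasks))


# Deliberately different numbers from the exercise: 20% failure, 2 attempts, 3 tasks.
p_task = task_success_probability(0.2, 2)
p_pipe = pipeline_success_probability(0.2, 2, 3)
failed = expected_failed_runs(0.2, 2, 3, 100)
print(round(p_task, 4), round(p_pipe, 4), round(failed, 2))
```

In practice, failures are often correlated (for example, an outage in a shared dependency), so these figures are best read as optimistic estimates.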


Exercise 27.13 (**)

Extend the RetryConfig class from Section 27.7 to support circuit breaker behavior: if a task fails $n$ consecutive times across different pipeline runs (not retries within a single run), the task is automatically disabled and a P0 alert is sent.

(a) Add a CircuitBreakerConfig dataclass with failure_threshold, reset_timeout_minutes, and half_open_max_attempts attributes.

(b) Implement the circuit breaker state machine (CLOSED, OPEN, HALF-OPEN) as a class.

(c) Integrate the circuit breaker with the retry policy: retries happen within a single run (inner loop), while the circuit breaker operates across runs (outer loop).


Exercise 27.14 (**)

Design an alerting strategy for the following scenario: the StreamRec pipeline runs daily, and on average produces one P3 (low-severity) alert per week, one P2 (medium) per month, and one P1 (high) per quarter.

(a) For each severity level, specify the notification channel (email, Slack, PagerDuty), the expected response time, and the escalation policy if the alert is not acknowledged.

(b) The team is experiencing alert fatigue from P3 alerts. Propose two strategies to reduce alert fatigue without hiding genuine issues.

(c) Implement an AlertThrottler class that suppresses duplicate alerts within a configurable window (e.g., do not re-alert for the same task failure within 2 hours).


Exercise 27.15 (**)

Implement the DeadLetterQueue class from Section 27.7 with the following extension: the DLQ should support automatic reprocessing. Records in the DLQ are retried after a configurable delay, and if they fail again, they are moved to a permanent failure store.


Backfill

Exercise 27.16 (*)

The StreamRec pipeline was down from March 8 to March 14 (7 days). Each daily run takes 3.5 hours on average and requires 4 A100 GPUs. The team has access to 12 A100 GPUs total.

(a) How long would a sequential backfill take?

(b) How long would a parallel backfill with max_parallel=3 take?

(c) The team chooses a prioritized strategy (newest first, parallel with max_parallel=3). After processing March 14, 13, and 12 in the first batch, they discover that the data for March 10 is corrupted and cannot be processed. How should they handle this?
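
The arithmetic for (a) and (b) follows a simple wave model, sketched below with hypothetical helper names. It assumes that every run takes exactly the average duration, that runs for different days are independent, and that parallelism is limited only by GPU count. The example uses different numbers from the exercise.

```python
import math


def sequential_backfill_hours(n_days: int, hours_per_run: float) -> float:
    """One run after another: total time is the sum of the run durations."""
    return n_days * hours_per_run


def parallel_backfill_hours(n_days: int, hours_per_run: float, max_parallel: int) -> float:
    """Runs proceed in waves of up to `max_parallel`; each wave lasts one run duration."""
    waves = math.ceil(n_days / max_parallel)
    return waves * hours_per_run


def max_parallel_for_gpus(total_gpus: int, gpus_per_run: int) -> int:
    """How many runs fit on the available GPUs at the same time."""
    return total_gpus // gpus_per_run


# Deliberately different numbers from the exercise: 5 days, 2-hour runs, 8 GPUs, 4 per run.
k = max_parallel_for_gpus(8, 4)
seq_hours = sequential_backfill_hours(5, 2.0)
par_hours = parallel_backfill_hours(5, 2.0, k)
print(k, seq_hours, par_hours)  # 2 10.0 6.0
```

Note that the last wave may be only partially filled, which is why the parallel speedup is generally less than max_parallel.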


Exercise 27.17 (**)

Design a backfill safety checklist — a procedure that the team follows before executing any backfill. The checklist should include at least 8 items covering: scope verification, resource planning, output isolation, comparison testing, rollback planning, communication, and monitoring.


Exercise 27.18 (***)

The StreamRec feature computation uses a rolling 7-day window. When backfilling March 8–14, the features for March 8 depend on data from March 1–7 (which was processed correctly before the outage). But the features for March 12 depend on data from March 5–11, which includes dates that are themselves being backfilled.

(a) Explain why parallel backfill would produce incorrect features in this scenario.

(b) Design a backfill strategy that handles rolling window dependencies correctly. Specify the execution order and the minimum number of sequential batches required.

(c) Generalize: for a rolling window of $w$ days and a backfill of $n$ days, what is the minimum number of sequential batches required, and what is the maximum parallelism per batch?


Pipeline Versioning and Artifacts

Exercise 27.19 (*)

Given a model currently serving in production, list all the artifacts and metadata you would need to fully reproduce the model from scratch. Organize your list into categories: data artifacts, code artifacts, configuration artifacts, and environment artifacts.


Exercise 27.20 (**)

Implement a PipelineRunMetadata tracker that integrates with both MLflow and Dagster. The tracker should:

(a) Capture the git SHA of the pipeline code at the start of the run.

(b) Record the Delta Lake version of each input data table.

(c) Link the MLflow run IDs for each trained model to the pipeline run.

(d) Write the complete metadata record as a JSON artifact in both MLflow and the Dagster asset metadata.


Exercise 27.21 (***)

Design a pipeline diff tool that compares two pipeline runs (e.g., yesterday's run and today's run) and reports:

  1. Which tasks changed (new tasks, removed tasks, modified task code)
  2. Which configuration parameters changed
  3. Which input data changed (row count, schema, value distribution statistics)
  4. Which output metrics changed

Implement the comparison for the data and metrics dimensions using pandas.


Testing

Exercise 27.22 (*)

Write unit tests for the DataContract class from Section 27.10. Your test suite should cover:

  1. A DataFrame that passes all contract checks
  2. A DataFrame missing a required column
  3. A DataFrame with an incorrect column dtype
  4. A DataFrame with null values in a non-null column
  5. A DataFrame below the minimum row count
  6. A DataFrame that violates multiple contract clauses simultaneously

Exercise 27.23 (**)

The integration test in Section 27.10 uses unittest.mock.patch to mock external dependencies (Feast, MLflow). Identify three risks of excessive mocking in integration tests and propose a testing strategy that uses test containers (lightweight Docker containers) for Feast and MLflow instead of mocks.


Exercise 27.24 (***)

Implement a pipeline smoke test that runs the full StreamRec training pipeline on a 1% sample of production data. The smoke test should:

(a) Sample 1% of the most recent day's interactions deterministically (same seed produces same sample).

(b) Run all pipeline stages with reduced resource requirements (1 GPU instead of 4, 1 epoch instead of 10).

(c) Verify that all inter-stage data contracts are satisfied.

(d) Complete in under 15 minutes.

(e) Run automatically before every merge to the main branch (CI integration).

Provide the implementation as a pytest test file with Dagster's build_asset_context for testing assets without running the full Dagster runtime.
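
For requirement (a), one common approach to deterministic sampling is to hash each record's identifier together with the seed and keep the records whose hash falls below the sampling rate. The sketch below assumes each interaction has a stable string ID; the function name in_sample and the ID format are hypothetical. It uses hashlib rather than Python's built-in hash(), which is randomized per process for strings by default.

```python
import hashlib


def in_sample(record_id: str, seed: int, rate: float = 0.01) -> bool:
    """Return True for a stable pseudo-random subset of IDs of roughly size `rate`."""
    digest = hashlib.sha256(f"{seed}:{record_id}".encode("utf-8")).digest()
    # Map the first 8 bytes of the hash to a number in [0, 1).
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return bucket < rate


ids = [f"interaction-{i}" for i in range(100_000)]
sample_a = [i for i in ids if in_sample(i, seed=42)]
sample_b = [i for i in ids if in_sample(i, seed=42)]
print(len(sample_a), sample_a == sample_b)
```

Unlike drawing a fixed number of rows with a random generator, this approach does not depend on row order, so the same record is selected no matter how the data is partitioned or sorted. The sample size is only approximately 1%.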


Design and Architecture

Exercise 27.25 (**)

The MediCore pharmaceutical pipeline (Case Study 2) requires running a causal analysis pipeline monthly. The pipeline:

  1. Extracts clinical trial data from an EDC (Electronic Data Capture) system
  2. Applies inclusion/exclusion criteria
  3. Computes covariates and propensity scores
  4. Runs doubly robust estimation (Chapter 18)
  5. Generates a regulatory report

Design the pipeline DAG. For each task, specify: (a) the timeout, (b) the retry policy, (c) the idempotency strategy, and (d) the data contract with the next task.


Exercise 27.26 (***)

You are migrating a team from Airflow to Dagster. The team has 15 Airflow DAGs with 180 total tasks. Design a migration plan that:

(a) Identifies which DAGs to migrate first (scoring criteria).

(b) Runs Airflow and Dagster in parallel during the transition.

(c) Validates that the Dagster pipeline produces identical outputs to the Airflow pipeline for the same data interval.

(d) Handles the case where some DAGs depend on other DAGs across frameworks.


Exercise 27.27 (***)

Design a multi-tenant orchestration platform for an organization with 10 ML teams, each running 5–20 pipelines. The platform must provide:

  1. Resource isolation (one team's pipeline cannot starve another's)
  2. Shared infrastructure (GPU pool, feature store, model registry)
  3. Self-service pipeline deployment (teams deploy without platform team involvement)
  4. Cost attribution (track compute cost per team per pipeline)
  5. Common observability (unified alerting and monitoring)

Specify the architecture, Kubernetes resource configuration, and Dagster/Airflow configuration that achieves these requirements.


Exercise 27.28 (****)

The chapter treats pipeline orchestration as a DAG scheduling problem. Formally model the problem as follows:

(a) Define the pipeline scheduling problem: given a DAG $G = (V, E)$ with task durations $d_v$ for each $v \in V$, $k$ parallel execution slots, and a deadline $T$, determine whether all tasks can complete by $T$.

(b) Prove that this problem is NP-hard by reduction from multiprocessor scheduling.

(c) Describe the longest-processing-time-first (LPT) greedy heuristic and analyze its approximation ratio for the makespan minimization objective. Contrast it with Airflow's scheduler, which orders runnable tasks by priority_weight rather than by estimated duration.

(d) Propose a practical improvement to Airflow's scheduler that accounts for task duration estimates. How would you obtain reliable duration estimates?
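
As a starting point for (c) and (d), here is a minimal sketch of the LPT heuristic for the special case of independent tasks (that is, $E = \emptyset$), using a heap to track the least-loaded execution slot. The function name lpt_makespan and the durations are hypothetical; extending the idea to respect precedence edges is part of the exercise.

```python
import heapq


def lpt_makespan(durations: list[float], k: int) -> tuple[float, list[list[float]]]:
    """Greedy LPT: sort tasks by decreasing duration, give each to the least-loaded slot."""
    # Heap entries are (current load, slot index); the smallest load is popped first.
    heap = [(0.0, slot) for slot in range(k)]
    assignment = [[] for _ in range(k)]
    for d in sorted(durations, reverse=True):
        load, slot = heapq.heappop(heap)
        assignment[slot].append(d)
        heapq.heappush(heap, (load + d, slot))
    makespan = max(load for load, _ in heap)
    return makespan, assignment


# Hypothetical durations in minutes, scheduled on 2 slots.
makespan, assignment = lpt_makespan([3, 3, 2, 2, 2], k=2)
print(makespan, assignment)  # 7.0 [[3, 2, 2], [3, 2]]
```

For this input the greedy schedule finishes at 7, while grouping the two 3-minute tasks on one slot and the three 2-minute tasks on the other finishes at 6. Instances like this one are useful when reasoning about how far LPT can be from optimal.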


Progressive Project: Milestone 11

Exercise 27.29 (***)

M11: Build the StreamRec Training Pipeline in Dagster

Implement the complete StreamRec training pipeline as a set of Dagster software-defined assets. Your implementation must include:

  1. Assets: raw_interactions, validated_interactions, training_features, retrieval_model, ranking_model, retrieval_evaluation, ranking_evaluation, registered_models
  2. Partitioning: Daily partitions with backfill support
  3. IO Manager: S3-backed (use local filesystem for development via resource configuration)
  4. Retry policies: Exponential backoff on training tasks (max 2 retries)
  5. Data contracts: Validate the output schema of each asset before passing to the next
  6. Metadata: Log row counts, column statistics, MLflow run IDs, and evaluation metrics as Dagster metadata
  7. Tests: At least 5 unit tests for individual asset logic and 1 integration test for the full pipeline

Use the Dagster implementation from Section 27.4 as your starting point. Extend it with the testing patterns from Section 27.10 and the failure handling patterns from Section 27.7.

Deliverable: A working Dagster project that can be run with dagster dev on synthetic data.


Exercise 27.30 (****)

M11 Extension: Multi-Framework Comparison

Implement the same pipeline in both Airflow and Prefect (in addition to the Dagster implementation from Exercise 27.29). Run all three implementations on the same synthetic dataset and compare:

  1. Lines of code (framework boilerplate vs. business logic)
  2. Time to implement (track your hours)
  3. Ease of testing (can you run unit tests without the orchestrator runtime?)
  4. Backfill experience (backfill 7 days and report the workflow)
  5. Debugging experience (introduce a deliberate bug in the validation logic and compare how each framework reports it)

Write a 2-page comparison report with a recommendation for which framework you would choose for a team of 5 ML engineers building 10 pipelines.