Case Study 33.2: Python Data Pipeline from DB2 to Data Lake

Background

Meridian National Bank's data engineering team needs to build an automated pipeline that extracts data from DB2 (the system of record) and loads it into a data lake (cloud object storage) for use by the analytics, data science, and regulatory reporting teams.

The source system is DB2 for z/OS, containing:

  - ACCOUNTS: 4.2 million rows, 45 columns
  - TRANSACTIONS: 450 million rows, 22 columns (7 years of history, growing at 200,000 rows/day)
  - CUSTOMERS: 1.8 million rows, 38 columns
  - LOANS: 320,000 rows, 52 columns

The pipeline must run daily (incremental) with weekly full snapshots.

Challenges

Challenge 1: Data Volume

The TRANSACTIONS table is 450 million rows. Loading this entirely into a pandas DataFrame would require approximately 80 GB of memory — far beyond the 32 GB available on the pipeline server.
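
That estimate can be sanity-checked with simple arithmetic; the ~190 bytes per row is an assumed average in-memory width for 22 mixed-type columns (an illustration, not a measurement):

```python
# Rough memory estimate for loading all of TRANSACTIONS into pandas.
# bytes_per_row is an assumed average for 22 mixed-type columns once
# string/object columns are materialized in memory.
rows = 450_000_000
bytes_per_row = 190
estimated_gb = rows * bytes_per_row / 1024**3
print(f"~{estimated_gb:.0f} GB needed vs. 32 GB available")  # ~80 GB
```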

Challenge 2: DB2 z/OS Resource Impact

Long-running queries against the production DB2 subsystem risk:

  - Holding locks that interfere with batch processing
  - Consuming CPU (MSU) that increases the monthly software bill
  - Filling up the DB2 buffer pool with pipeline data, evicting hot OLTP pages

Challenge 3: Data Type Fidelity

DB2 DECIMAL columns must preserve exact precision through the pipeline. Floating-point conversion (common in pandas with numpy) would corrupt financial amounts.
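
A short demonstration of why this matters, using only the standard library:

```python
from decimal import Decimal

# Binary float64 cannot represent most decimal fractions exactly,
# so repeated float arithmetic drifts away from the true amount.
amounts_float = [0.10, 0.10, 0.10]
amounts_exact = [Decimal("0.10")] * 3

print(sum(amounts_float))                     # 0.30000000000000004
print(sum(amounts_exact) == Decimal("0.30"))  # True
```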

Challenge 4: Incremental Extraction

The daily pipeline must extract only new and changed data. TRANSACTIONS has no explicit "last modified" timestamp — the team must use the transaction date column.
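
One way to frame this is as a simple date watermark: record the last business date that loaded successfully, then extract the following day. The helper below is a hypothetical sketch (the qmark `?` marker matches the parameter style the ibm_db driver expects):

```python
from datetime import date, timedelta

def next_window(last_loaded: date) -> tuple:
    """Return the predicate and bind values for the next daily window.

    TXN_DATE stands in for the missing "last modified" column, so the
    watermark is simply the last successfully loaded business date.
    (Hypothetical helper; only the column name comes from the schema.)
    """
    extract_dt = last_loaded + timedelta(days=1)
    return "TXN_DATE = ?", (str(extract_dt),)

clause, binds = next_window(date(2024, 3, 14))
print(clause, binds)  # TXN_DATE = ? ('2024-03-15',)
```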

Solution Architecture

DB2 for z/OS (Production)
        │
        │  JDBC/DRDA (Type 4)
        ▼
Python Pipeline Server
  ├── ibm_db (direct extraction)
  ├── pandas (transformation)
  └── pyarrow (Parquet output)
        │
        │  Object Storage API
        ▼
Data Lake (S3-compatible)
  ├── raw/          (exact DB2 extract)
  ├── curated/      (cleaned, partitioned)
  └── analytics/    (aggregated summaries)

Implementation

Database Connection Layer

import ibm_db        # low-level IBM driver
import ibm_db_dbi    # DB-API wrapper; the SQLAlchemy dialect builds on it
import pandas as pd
from sqlalchemy import create_engine  # db2+ibm_db dialect requires ibm_db_sa
from datetime import date, timedelta
import pyarrow as pa
import pyarrow.parquet as pq
import logging
import time

logger = logging.getLogger(__name__)


class DB2Extractor:
    """Extracts data from DB2 with resource-conscious patterns."""

    def __init__(self, config):
        self.config = config
        self.engine = create_engine(
            f"db2+ibm_db://{config['user']}:{config['password']}@"
            f"{config['host']}:{config['port']}/{config['database']}",
            pool_size=3,           # Only 3 connections — be gentle on DB2
            max_overflow=2,
            pool_recycle=900,      # 15 minutes
            pool_pre_ping=True,
            connect_args={
                "CURRENTSCHEMA": "MERIDIAN",
                "QUERYTIMEOUT": "600",  # 10 minute query timeout
            }
        )

    def extract_incremental_transactions(self, extract_date):
        """Extract one day of transactions using chunked read."""
        sql = """
            SELECT TXN_ID, ACCOUNT_NUM, TXN_TYPE, TXN_AMOUNT,
                   TXN_DESC, TXN_DATE, TXN_TIMESTAMP,
                   NEW_BALANCE, BRANCH_ID, TELLER_ID,
                   CHANNEL, REFERENCE_NUM
            FROM MERIDIAN.TRANSACTIONS
            WHERE TXN_DATE = ?
            ORDER BY TXN_ID
            WITH UR
        """
        # WITH UR: Uncommitted Read isolation — no locks held,
        # minimal impact on OLTP workload.
        # The ibm_db driver uses qmark (?) parameter markers, so the
        # extract date is bound positionally rather than by name.

        chunks = []
        total_rows = 0

        for chunk in pd.read_sql(
            sql, self.engine,
            params=(str(extract_date),),
            chunksize=50000
        ):
            chunks.append(chunk)
            total_rows += len(chunk)
            logger.info(f"  Fetched chunk: {len(chunk)} rows "
                       f"(total: {total_rows})")

        if chunks:
            df = pd.concat(chunks, ignore_index=True)
        else:
            df = pd.DataFrame()

        logger.info(f"Extracted {total_rows} transactions "
                   f"for {extract_date}")
        return df

    def extract_full_table(self, table_name, order_col):
        """Extract full table with chunking for large tables."""
        sql = f"""
            SELECT * FROM MERIDIAN.{table_name}
            ORDER BY {order_col}
            WITH UR
        """

        chunks = []
        total_rows = 0

        for chunk in pd.read_sql(
            sql, self.engine,
            chunksize=100000
        ):
            chunks.append(chunk)
            total_rows += len(chunk)
            if total_rows % 500000 == 0:
                logger.info(f"  {table_name}: {total_rows} rows loaded")

        df = pd.concat(chunks, ignore_index=True)
        logger.info(f"Extracted {total_rows} rows from {table_name}")
        return df

    def extract_account_snapshot(self):
        """Extract current account state."""
        return self.extract_full_table("ACCOUNTS", "ACCOUNT_NUM")

    def extract_customer_snapshot(self):
        """Extract current customer state."""
        return self.extract_full_table("CUSTOMERS", "CUST_ID")

Data Transformation Layer

class DataTransformer:
    """Transforms extracted data for the data lake."""

    @staticmethod
    def ensure_decimal_precision(df, decimal_columns):
        """Ensure financial amounts maintain exact decimal precision.

        pandas with numpy uses float64 by default, which cannot
        represent all decimal values exactly. Convert to Python
        Decimal or string for precision-critical columns.
        """
        for col in decimal_columns:
            if col in df.columns:
                # Convert to string to preserve exact decimal repr
                df[col] = df[col].apply(
                    lambda x: str(x) if pd.notna(x) else None
                )
        return df

    @staticmethod
    def add_derived_columns(txn_df):
        """Add computed columns for analytics."""
        # Day of week (0=Monday)
        txn_df['TXN_DAY_OF_WEEK'] = pd.to_datetime(
            txn_df['TXN_DATE']).dt.dayofweek

        # Hour of day from timestamp
        txn_df['TXN_HOUR'] = pd.to_datetime(
            txn_df['TXN_TIMESTAMP']).dt.hour

        # Transaction size bucket
        txn_df['AMOUNT_BUCKET'] = pd.cut(
            txn_df['TXN_AMOUNT'].astype(float),
            bins=[0, 100, 1000, 10000, 50000, float('inf')],
            labels=['MICRO', 'SMALL', 'MEDIUM', 'LARGE', 'JUMBO']
        )

        # Is weekend flag
        txn_df['IS_WEEKEND'] = txn_df['TXN_DAY_OF_WEEK'].isin([5, 6])

        return txn_df

    @staticmethod
    def generate_daily_summary(txn_df):
        """Generate daily transaction summary by branch and type."""
        summary = txn_df.groupby(
            ['TXN_DATE', 'BRANCH_ID', 'TXN_TYPE', 'CHANNEL']
        ).agg(
            TXN_COUNT=('TXN_ID', 'count'),
            TOTAL_AMOUNT=('TXN_AMOUNT', lambda x: x.astype(float).sum()),
            AVG_AMOUNT=('TXN_AMOUNT', lambda x: x.astype(float).mean()),
            MAX_AMOUNT=('TXN_AMOUNT', lambda x: x.astype(float).max()),
            MIN_AMOUNT=('TXN_AMOUNT', lambda x: x.astype(float).min()),
        ).reset_index()

        return summary
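
As a quick illustration of the bucketing step, here is the same pd.cut call from add_derived_columns applied to a handful of sample amounts (values assumed):

```python
import pandas as pd

# Same bins/labels as add_derived_columns, on sample values.
# pd.cut uses right-closed intervals, so 100.0 would fall in MICRO.
amounts = pd.Series([50.0, 500.0, 5000.0, 25000.0, 100000.0])
buckets = pd.cut(
    amounts,
    bins=[0, 100, 1000, 10000, 50000, float('inf')],
    labels=['MICRO', 'SMALL', 'MEDIUM', 'LARGE', 'JUMBO'],
)
print(list(buckets))  # ['MICRO', 'SMALL', 'MEDIUM', 'LARGE', 'JUMBO']
```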

Data Lake Writer

class DataLakeWriter:
    """Writes data to the data lake in Parquet format."""

    def __init__(self, base_path):
        self.base_path = base_path

    def write_partitioned(self, df, table_name, partition_cols,
                          extract_date):
        """Write DataFrame as partitioned Parquet files."""
        output_path = (
            f"{self.base_path}/curated/{table_name}"
        )

        # Convert to PyArrow Table for efficient Parquet writing
        table = pa.Table.from_pandas(df)

        # The new dataset writer is the default in current pyarrow;
        # the old use_legacy_dataset flag has been removed.
        pq.write_to_dataset(
            table,
            root_path=output_path,
            partition_cols=partition_cols,
        )

        logger.info(f"Wrote {len(df)} rows to {output_path}")

    def write_snapshot(self, df, table_name, extract_date):
        """Write a full table snapshot as a single Parquet file."""
        output_path = (
            f"{self.base_path}/raw/{table_name}/"
            f"{table_name}_{extract_date}.parquet"
        )

        df.to_parquet(output_path, index=False, engine='pyarrow')
        logger.info(f"Wrote snapshot: {output_path} "
                   f"({len(df)} rows)")

    def write_summary(self, df, summary_name, extract_date):
        """Write a summary/aggregate dataset."""
        output_path = (
            f"{self.base_path}/analytics/{summary_name}/"
            f"{summary_name}_{extract_date}.parquet"
        )

        df.to_parquet(output_path, index=False, engine='pyarrow')
        logger.info(f"Wrote summary: {output_path}")

Pipeline Orchestrator

class DailyPipeline:
    """Orchestrates the daily DB2-to-data-lake pipeline."""

    def __init__(self, db2_config, lake_base_path):
        self.extractor = DB2Extractor(db2_config)
        self.transformer = DataTransformer()
        self.writer = DataLakeWriter(lake_base_path)

    def run_daily(self, extract_date=None):
        """Run the daily incremental pipeline."""
        if extract_date is None:
            extract_date = date.today() - timedelta(days=1)

        logger.info(f"=== DAILY PIPELINE START: {extract_date} ===")
        start_time = time.time()
        metrics = {}

        try:
            # Step 1: Extract transactions
            logger.info("Step 1: Extracting transactions...")
            txn_df = self.extractor.extract_incremental_transactions(
                extract_date)
            metrics['transactions_extracted'] = len(txn_df)

            if len(txn_df) == 0:
                logger.warning(f"No transactions for {extract_date}")
                return metrics

            # Step 2: Transform
            logger.info("Step 2: Transforming data...")
            txn_df = self.transformer.ensure_decimal_precision(
                txn_df, ['TXN_AMOUNT', 'NEW_BALANCE'])
            txn_df = self.transformer.add_derived_columns(txn_df)

            # Step 3: Generate summaries
            logger.info("Step 3: Generating summaries...")
            daily_summary = self.transformer.generate_daily_summary(
                txn_df)
            metrics['summary_rows'] = len(daily_summary)

            # Step 4: Write to data lake
            logger.info("Step 4: Writing to data lake...")
            self.writer.write_snapshot(
                txn_df, "transactions", extract_date)
            self.writer.write_summary(
                daily_summary, "daily_txn_summary", extract_date)

            # Step 5: Write metrics back to DB2
            logger.info("Step 5: Recording pipeline metrics...")
            self._record_metrics(extract_date, metrics)

            elapsed = time.time() - start_time
            metrics['elapsed_seconds'] = round(elapsed, 1)
            logger.info(
                f"=== DAILY PIPELINE COMPLETE: {elapsed:.1f}s ===")
            logger.info(f"Metrics: {metrics}")

            return metrics

        except Exception as e:
            logger.error(f"Pipeline failed: {e}")
            self._record_failure(extract_date, str(e))
            raise

    def run_weekly_snapshot(self):
        """Run the weekly full snapshot pipeline."""
        extract_date = date.today()
        logger.info(f"=== WEEKLY SNAPSHOT START: {extract_date} ===")

        # Account snapshot
        logger.info("Extracting account snapshot...")
        acct_df = self.extractor.extract_account_snapshot()
        acct_df = self.transformer.ensure_decimal_precision(
            acct_df,
            ['BALANCE', 'INTEREST_RATE', 'ACCRUED_INTEREST']
        )
        self.writer.write_snapshot(acct_df, "accounts", extract_date)

        # Customer snapshot
        logger.info("Extracting customer snapshot...")
        cust_df = self.extractor.extract_customer_snapshot()
        self.writer.write_snapshot(cust_df, "customers", extract_date)

        logger.info("=== WEEKLY SNAPSHOT COMPLETE ===")

    def _record_metrics(self, extract_date, metrics):
        """Write pipeline metrics to DB2 tracking table."""
        metrics_df = pd.DataFrame([{
            'PIPELINE_NAME': 'DAILY_TXN_EXTRACT',
            'EXTRACT_DATE': extract_date,
            'ROWS_EXTRACTED': metrics.get('transactions_extracted', 0),
            'SUMMARY_ROWS': metrics.get('summary_rows', 0),
            'STATUS': 'SUCCESS',
            'RUN_TIMESTAMP': pd.Timestamp.now(),
        }])

        metrics_df.to_sql(
            'PIPELINE_METRICS',
            self.extractor.engine,
            schema='MERIDIAN',
            if_exists='append',
            index=False,
        )

    def _record_failure(self, extract_date, error_msg):
        """Record pipeline failure in DB2 tracking table."""
        try:
            failure_df = pd.DataFrame([{
                'PIPELINE_NAME': 'DAILY_TXN_EXTRACT',
                'EXTRACT_DATE': extract_date,
                'ROWS_EXTRACTED': 0,
                'SUMMARY_ROWS': 0,
                'STATUS': 'FAILED',
                'ERROR_MSG': error_msg[:500],
                'RUN_TIMESTAMP': pd.Timestamp.now(),
            }])

            failure_df.to_sql(
                'PIPELINE_METRICS',
                self.extractor.engine,
                schema='MERIDIAN',
                if_exists='append',
                index=False,
            )
        except Exception:
            # Don't mask the original failure; log with traceback and move on.
            logger.exception("Failed to record pipeline failure to DB2")
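
Wiring the pipeline together might look like the following; the host, credentials, and lake path are placeholders, and the commented-out calls require a reachable DB2 subsystem plus the ibm_db_sa dialect:

```python
# Hypothetical scheduler entry point (all connection values are placeholders).
db2_config = {
    "user": "PIPEUSER",
    "password": "********",
    "host": "db2prod.meridian.example",
    "port": 446,
    "database": "DSNP",
}

# pipeline = DailyPipeline(db2_config, "s3://meridian-lake")
# metrics = pipeline.run_daily()   # defaults to yesterday's business date
# pipeline.run_weekly_snapshot()   # typically scheduled for the weekend
print(sorted(db2_config))  # ['database', 'host', 'password', 'port', 'user']
```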

Performance Results

Daily Pipeline (200,000 transactions)

Phase                Duration      Notes
Extract from DB2     45 seconds    4 chunks of 50,000 rows
Transform            8 seconds     Derived columns, precision fix
Generate summaries   3 seconds     GroupBy aggregation
Write to data lake   12 seconds    Parquet with snappy compression
Record metrics       1 second      Single INSERT to DB2
Total                69 seconds

Weekly Full Snapshot

Table                 Rows         Duration      Parquet Size
ACCOUNTS              4,200,000    12 minutes    1.2 GB
CUSTOMERS             1,800,000    6 minutes     890 MB
TRANSACTIONS (7 yr)   Not extracted weekly
Total                              18 minutes

DB2 Resource Impact

Metric             Without WITH UR             With WITH UR
DB2 CPU consumed   4.2 seconds                 3.8 seconds
Locks held         200,000 row locks           0 locks
Impact on OLTP     Lock waits detected         Zero impact
Data consistency   Point-in-time consistent    May read uncommitted

The WITH UR isolation level is appropriate for this pipeline because:

  - The data lake tolerates minor inconsistencies (uncommitted rows)
  - The pipeline runs against yesterday's data, which is fully committed
  - Zero lock impact on production OLTP workloads

Lessons Learned

  1. Chunked reading is essential. Loading 450M rows into memory fails. Using chunksize=50000 with read_sql keeps memory under 2 GB at all times.

  2. WITH UR is your friend for extracts. Read-only extract pipelines should almost always use Uncommitted Read to avoid impacting production workloads.

  3. Decimal precision requires attention. pandas uses numpy float64 by default, which cannot represent all DECIMAL values exactly. For financial data, convert to string or use Python's Decimal type before writing to Parquet.

  4. Parquet is the right output format. Columnar storage with snappy compression reduces the 4.2 million account snapshot from ~8 GB (CSV) to 1.2 GB (Parquet), while supporting efficient column-level reads for analytics queries.

  5. Limit concurrent connections. The pipeline uses only 3-5 DB2 connections. More connections would not make the extract faster (DB2 is the bottleneck, not the network) and would consume scarce z/OS resources.

  6. Write pipeline metrics to DB2. Storing pipeline run status in a DB2 table allows the operations team to monitor pipeline health using existing monitoring tools.

Discussion Questions

  1. How would you handle a scenario where the daily pipeline fails midway? Should it be idempotent (re-runnable without duplicates)?
  2. What changes would you make if the pipeline needed to capture UPDATE and DELETE operations (not just new INSERTs)?
  3. How would you modify this pipeline to run in real-time (streaming) instead of daily batch?
  4. What are the trade-offs between extracting from DB2 directly vs. reading from DB2's transaction log (log-based CDC)?