Case Study 33.2: Python Data Pipeline from DB2 to Data Lake
Background
Meridian National Bank's data engineering team needs to build an automated pipeline that extracts data from DB2 (the system of record) and loads it into a data lake (cloud object storage) for use by the analytics, data science, and regulatory reporting teams.
The source system is DB2 for z/OS, containing:
- ACCOUNTS: 4.2 million rows, 45 columns
- TRANSACTIONS: 450 million rows, 22 columns (7 years of history, growing at 200,000 rows/day)
- CUSTOMERS: 1.8 million rows, 38 columns
- LOANS: 320,000 rows, 52 columns
The pipeline must run daily (incremental) with weekly full snapshots.
Challenges
Challenge 1: Data Volume
The TRANSACTIONS table is 450 million rows. Loading this entirely into a pandas DataFrame would require approximately 80 GB of memory — far beyond the 32 GB available on the pipeline server.
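The 80 GB figure is easy to sanity-check. A rough estimate, assuming an average in-memory footprint of about 180 bytes per row (an illustrative number for 22 mixed-dtype columns, not a measurement of the real table):

```python
# Back-of-envelope memory estimate for loading TRANSACTIONS into pandas.
# The ~180 bytes/row figure is an assumption for 22 columns of mixed
# numeric and object dtypes, not a measurement.
rows = 450_000_000
bytes_per_row = 180
total_gb = rows * bytes_per_row / 1024**3
print(f"Estimated DataFrame size: {total_gb:.0f} GB")
```

Even generous rounding keeps the estimate in the 70-80 GB range, well past the server's 32 GB.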
Challenge 2: DB2 z/OS Resource Impact
Long-running queries against the production DB2 subsystem risk:
- Holding locks that interfere with batch processing
- Consuming CPU (MSU) that increases the monthly software bill
- Filling up the DB2 buffer pool with pipeline data, evicting hot OLTP pages
Challenge 3: Data Type Fidelity
DB2 DECIMAL columns must preserve exact precision through the pipeline. Floating-point conversion (common in pandas with numpy) would corrupt financial amounts.
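A short demonstration of why binary floating point is unsafe for money; the amounts here are illustrative:

```python
from decimal import Decimal

# float64 cannot represent many decimal fractions exactly; the error
# surfaces as soon as amounts are summed.
amount = 0.10
print(f"{sum([amount] * 3):.20f}")   # not exactly 0.30

# Decimal preserves the exact value DB2 stored
exact = Decimal("0.10")
print(sum([exact] * 3))              # 0.30, exactly
```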
Challenge 4: Incremental Extraction
The daily pipeline must extract only new and changed data. TRANSACTIONS has no explicit "last modified" timestamp — the team must use the transaction date column.
Solution Architecture
```
DB2 for z/OS (Production)
        │
        │ JDBC/DRDA (Type 4)
        ▼
Python Pipeline Server
        ├── ibm_db (direct extraction)
        ├── pandas (transformation)
        └── pyarrow (Parquet output)
        │
        │ Object Storage API
        ▼
Data Lake (S3-compatible)
        ├── raw/       (exact DB2 extract)
        ├── curated/   (cleaned, partitioned)
        └── analytics/ (aggregated summaries)
```
Implementation
Database Connection Layer
```python
import ibm_db
import ibm_db_dbi
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import date, timedelta
import pyarrow as pa
import pyarrow.parquet as pq
import logging
import time

logger = logging.getLogger(__name__)


class DB2Extractor:
    """Extracts data from DB2 with resource-conscious patterns."""

    def __init__(self, config):
        self.config = config
        self.engine = create_engine(
            f"db2+ibm_db://{config['user']}:{config['password']}@"
            f"{config['host']}:{config['port']}/{config['database']}",
            pool_size=3,        # Only 3 connections -- be gentle on DB2
            max_overflow=2,
            pool_recycle=900,   # 15 minutes
            pool_pre_ping=True,
            connect_args={
                "CURRENTSCHEMA": "MERIDIAN",
                "QUERYTIMEOUT": "600",  # 10-minute query timeout
            },
        )

    def extract_incremental_transactions(self, extract_date):
        """Extract one day of transactions using chunked reads."""
        # text() is required for SQLAlchemy to bind the :extract_dt
        # named parameter
        sql = text("""
            SELECT TXN_ID, ACCOUNT_NUM, TXN_TYPE, TXN_AMOUNT,
                   TXN_DESC, TXN_DATE, TXN_TIMESTAMP,
                   NEW_BALANCE, BRANCH_ID, TELLER_ID,
                   CHANNEL, REFERENCE_NUM
            FROM MERIDIAN.TRANSACTIONS
            WHERE TXN_DATE = :extract_dt
            ORDER BY TXN_ID
            WITH UR
        """)
        # WITH UR: Uncommitted Read isolation -- no locks held,
        # minimal impact on the OLTP workload
        chunks = []
        total_rows = 0
        for chunk in pd.read_sql(
            sql, self.engine,
            params={"extract_dt": str(extract_date)},
            chunksize=50_000,
        ):
            chunks.append(chunk)
            total_rows += len(chunk)
            logger.info(f"  Fetched chunk: {len(chunk)} rows "
                        f"(total: {total_rows})")

        if chunks:
            df = pd.concat(chunks, ignore_index=True)
        else:
            df = pd.DataFrame()

        logger.info(f"Extracted {total_rows} transactions "
                    f"for {extract_date}")
        return df

    def extract_full_table(self, table_name, order_col):
        """Extract a full table with chunking for large tables."""
        sql = f"""
            SELECT * FROM MERIDIAN.{table_name}
            ORDER BY {order_col}
            WITH UR
        """
        chunks = []
        total_rows = 0
        for chunk in pd.read_sql(sql, self.engine, chunksize=100_000):
            chunks.append(chunk)
            total_rows += len(chunk)
            if total_rows % 500_000 == 0:
                logger.info(f"  {table_name}: {total_rows} rows loaded")

        # Guard against an empty table: concat of an empty list raises
        df = pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()
        logger.info(f"Extracted {total_rows} rows from {table_name}")
        return df

    def extract_account_snapshot(self):
        """Extract the current account state."""
        return self.extract_full_table("ACCOUNTS", "ACCOUNT_NUM")

    def extract_customer_snapshot(self):
        """Extract the current customer state."""
        return self.extract_full_table("CUSTOMERS", "CUST_ID")
```
Data Transformation Layer
```python
class DataTransformer:
    """Transforms extracted data for the data lake."""

    @staticmethod
    def ensure_decimal_precision(df, decimal_columns):
        """Ensure financial amounts maintain exact decimal precision.

        pandas with numpy uses float64 by default, which cannot
        represent all decimal values exactly. Convert to Python
        Decimal or string for precision-critical columns.
        """
        for col in decimal_columns:
            if col in df.columns:
                # Convert to string to preserve the exact decimal repr
                df[col] = df[col].apply(
                    lambda x: str(x) if pd.notna(x) else None
                )
        return df

    @staticmethod
    def add_derived_columns(txn_df):
        """Add computed columns for analytics."""
        # Day of week (0 = Monday)
        txn_df['TXN_DAY_OF_WEEK'] = pd.to_datetime(
            txn_df['TXN_DATE']).dt.dayofweek
        # Hour of day from the timestamp
        txn_df['TXN_HOUR'] = pd.to_datetime(
            txn_df['TXN_TIMESTAMP']).dt.hour
        # Transaction size bucket
        txn_df['AMOUNT_BUCKET'] = pd.cut(
            txn_df['TXN_AMOUNT'].astype(float),
            bins=[0, 100, 1000, 10000, 50000, float('inf')],
            labels=['MICRO', 'SMALL', 'MEDIUM', 'LARGE', 'JUMBO'],
        )
        # Weekend flag (Saturday = 5, Sunday = 6)
        txn_df['IS_WEEKEND'] = txn_df['TXN_DAY_OF_WEEK'].isin([5, 6])
        return txn_df

    @staticmethod
    def generate_daily_summary(txn_df):
        """Generate a daily transaction summary by branch and type."""
        summary = txn_df.groupby(
            ['TXN_DATE', 'BRANCH_ID', 'TXN_TYPE', 'CHANNEL']
        ).agg(
            TXN_COUNT=('TXN_ID', 'count'),
            TOTAL_AMOUNT=('TXN_AMOUNT', lambda x: x.astype(float).sum()),
            AVG_AMOUNT=('TXN_AMOUNT', lambda x: x.astype(float).mean()),
            MAX_AMOUNT=('TXN_AMOUNT', lambda x: x.astype(float).max()),
            MIN_AMOUNT=('TXN_AMOUNT', lambda x: x.astype(float).min()),
        ).reset_index()
        return summary
```
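The derived-column logic is easy to check on a tiny sample. The snippet below repeats the same transformations outside the class so it runs standalone; the two sample rows are invented (a Monday morning and a Saturday afternoon transaction):

```python
import pandas as pd

# Two invented sample rows: 2024-03-04 is a Monday, 2024-03-09 a Saturday
txn_df = pd.DataFrame({
    'TXN_DATE': ['2024-03-04', '2024-03-09'],
    'TXN_TIMESTAMP': ['2024-03-04 09:15:00', '2024-03-09 14:02:00'],
    'TXN_AMOUNT': ['250.00', '12500.00'],   # decimals carried as strings
})

# Same derivations as DataTransformer.add_derived_columns
txn_df['TXN_DAY_OF_WEEK'] = pd.to_datetime(txn_df['TXN_DATE']).dt.dayofweek
txn_df['TXN_HOUR'] = pd.to_datetime(txn_df['TXN_TIMESTAMP']).dt.hour
txn_df['AMOUNT_BUCKET'] = pd.cut(
    txn_df['TXN_AMOUNT'].astype(float),
    bins=[0, 100, 1000, 10000, 50000, float('inf')],
    labels=['MICRO', 'SMALL', 'MEDIUM', 'LARGE', 'JUMBO'])
txn_df['IS_WEEKEND'] = txn_df['TXN_DAY_OF_WEEK'].isin([5, 6])

print(list(txn_df['AMOUNT_BUCKET']))   # ['SMALL', 'LARGE']
print(list(txn_df['IS_WEEKEND']))      # [False, True]
```

Note that the bucketing casts the string amounts to float only for binning; the precision-critical string columns themselves are left untouched.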
Data Lake Writer
```python
class DataLakeWriter:
    """Writes data to the data lake in Parquet format."""

    def __init__(self, base_path):
        self.base_path = base_path

    def write_partitioned(self, df, table_name, partition_cols,
                          extract_date):
        """Write a DataFrame as partitioned Parquet files."""
        output_path = f"{self.base_path}/curated/{table_name}"
        # Convert to a PyArrow Table for efficient Parquet writing.
        # (The deprecated use_legacy_dataset flag is omitted; it was
        # removed in recent pyarrow releases.)
        table = pa.Table.from_pandas(df)
        pq.write_to_dataset(
            table,
            root_path=output_path,
            partition_cols=partition_cols,
        )
        logger.info(f"Wrote {len(df)} rows to {output_path}")

    def write_snapshot(self, df, table_name, extract_date):
        """Write a full table snapshot as a single Parquet file."""
        output_path = (
            f"{self.base_path}/raw/{table_name}/"
            f"{table_name}_{extract_date}.parquet"
        )
        df.to_parquet(output_path, index=False, engine='pyarrow')
        logger.info(f"Wrote snapshot: {output_path} "
                    f"({len(df)} rows)")

    def write_summary(self, df, summary_name, extract_date):
        """Write a summary/aggregate dataset."""
        output_path = (
            f"{self.base_path}/analytics/{summary_name}/"
            f"{summary_name}_{extract_date}.parquet"
        )
        df.to_parquet(output_path, index=False, engine='pyarrow')
        logger.info(f"Wrote summary: {output_path}")
```
Pipeline Orchestrator
```python
class DailyPipeline:
    """Orchestrates the daily DB2-to-data-lake pipeline."""

    def __init__(self, db2_config, lake_base_path):
        self.extractor = DB2Extractor(db2_config)
        self.transformer = DataTransformer()
        self.writer = DataLakeWriter(lake_base_path)

    def run_daily(self, extract_date=None):
        """Run the daily incremental pipeline."""
        if extract_date is None:
            extract_date = date.today() - timedelta(days=1)

        logger.info(f"=== DAILY PIPELINE START: {extract_date} ===")
        start_time = time.time()
        metrics = {}

        try:
            # Step 1: Extract transactions
            logger.info("Step 1: Extracting transactions...")
            txn_df = self.extractor.extract_incremental_transactions(
                extract_date)
            metrics['transactions_extracted'] = len(txn_df)

            if len(txn_df) == 0:
                logger.warning(f"No transactions for {extract_date}")
                return metrics

            # Step 2: Transform
            logger.info("Step 2: Transforming data...")
            txn_df = self.transformer.ensure_decimal_precision(
                txn_df, ['TXN_AMOUNT', 'NEW_BALANCE'])
            txn_df = self.transformer.add_derived_columns(txn_df)

            # Step 3: Generate summaries
            logger.info("Step 3: Generating summaries...")
            daily_summary = self.transformer.generate_daily_summary(txn_df)
            metrics['summary_rows'] = len(daily_summary)

            # Step 4: Write to the data lake
            logger.info("Step 4: Writing to data lake...")
            self.writer.write_snapshot(txn_df, "transactions", extract_date)
            self.writer.write_summary(
                daily_summary, "daily_txn_summary", extract_date)

            # Step 5: Write metrics back to DB2
            logger.info("Step 5: Recording pipeline metrics...")
            self._record_metrics(extract_date, metrics)

            elapsed = time.time() - start_time
            metrics['elapsed_seconds'] = round(elapsed, 1)
            logger.info(f"=== DAILY PIPELINE COMPLETE: {elapsed:.1f}s ===")
            logger.info(f"Metrics: {metrics}")
            return metrics

        except Exception as e:
            logger.error(f"Pipeline failed: {e}")
            self._record_failure(extract_date, str(e))
            raise

    def run_weekly_snapshot(self):
        """Run the weekly full-snapshot pipeline."""
        extract_date = date.today()
        logger.info(f"=== WEEKLY SNAPSHOT START: {extract_date} ===")

        # Account snapshot
        logger.info("Extracting account snapshot...")
        acct_df = self.extractor.extract_account_snapshot()
        acct_df = self.transformer.ensure_decimal_precision(
            acct_df, ['BALANCE', 'INTEREST_RATE', 'ACCRUED_INTEREST'])
        self.writer.write_snapshot(acct_df, "accounts", extract_date)

        # Customer snapshot
        logger.info("Extracting customer snapshot...")
        cust_df = self.extractor.extract_customer_snapshot()
        self.writer.write_snapshot(cust_df, "customers", extract_date)

        logger.info("=== WEEKLY SNAPSHOT COMPLETE ===")

    def _record_metrics(self, extract_date, metrics):
        """Write pipeline metrics to the DB2 tracking table."""
        metrics_df = pd.DataFrame([{
            'PIPELINE_NAME': 'DAILY_TXN_EXTRACT',
            'EXTRACT_DATE': extract_date,
            'ROWS_EXTRACTED': metrics.get('transactions_extracted', 0),
            'SUMMARY_ROWS': metrics.get('summary_rows', 0),
            'STATUS': 'SUCCESS',
            'RUN_TIMESTAMP': pd.Timestamp.now(),
        }])
        metrics_df.to_sql(
            'PIPELINE_METRICS',
            self.extractor.engine,
            schema='MERIDIAN',
            if_exists='append',
            index=False,
        )

    def _record_failure(self, extract_date, error_msg):
        """Record a pipeline failure in the DB2 tracking table."""
        try:
            failure_df = pd.DataFrame([{
                'PIPELINE_NAME': 'DAILY_TXN_EXTRACT',
                'EXTRACT_DATE': extract_date,
                'ROWS_EXTRACTED': 0,
                'SUMMARY_ROWS': 0,
                'STATUS': 'FAILED',
                'ERROR_MSG': error_msg[:500],
                'RUN_TIMESTAMP': pd.Timestamp.now(),
            }])
            failure_df.to_sql(
                'PIPELINE_METRICS',
                self.extractor.engine,
                schema='MERIDIAN',
                if_exists='append',
                index=False,
            )
        except Exception:
            logger.error("Failed to record pipeline failure to DB2")
```
Performance Results
Daily Pipeline (200,000 transactions)
| Phase | Duration | Notes |
|---|---|---|
| Extract from DB2 | 45 seconds | 4 chunks of 50,000 rows |
| Transform | 8 seconds | Derived columns, precision fix |
| Generate summaries | 3 seconds | GroupBy aggregation |
| Write to data lake | 12 seconds | Parquet with snappy compression |
| Record metrics | 1 second | Single INSERT to DB2 |
| Total | 69 seconds | |
Weekly Full Snapshot
| Table | Rows | Duration | Parquet Size |
|---|---|---|---|
| ACCOUNTS | 4,200,000 | 12 minutes | 1.2 GB |
| CUSTOMERS | 1,800,000 | 6 minutes | 890 MB |
| TRANSACTIONS (7 yr) | Not extracted weekly | — | — |
| Total | 6,000,000 | 18 minutes | ~2.1 GB |
DB2 Resource Impact
| Metric | Without WITH UR | With WITH UR |
|---|---|---|
| DB2 CPU consumed | 4.2 seconds | 3.8 seconds |
| Locks held | 200,000 row locks | 0 locks |
| Impact on OLTP | Lock waits detected | Zero impact |
| Data consistency | Point-in-time consistent | May read uncommitted |
The WITH UR isolation level is appropriate for this pipeline because:
- The data lake tolerates minor inconsistencies (uncommitted rows)
- The pipeline runs against yesterday's data, which is fully committed
- Zero lock impact on production OLTP workloads
Lessons Learned
- **Chunked reading is essential.** Loading 450M rows into memory fails. Using `chunksize=50000` with `read_sql` keeps memory under 2 GB at all times.
- **`WITH UR` is your friend for extracts.** Read-only extract pipelines should almost always use Uncommitted Read to avoid impacting production workloads.
- **Decimal precision requires attention.** pandas uses numpy float64 by default, which cannot represent all DECIMAL values exactly. For financial data, convert to string or use Python's Decimal type before writing to Parquet.
- **Parquet is the right output format.** Columnar storage with snappy compression reduces the 4.2-million-row account snapshot from ~8 GB (CSV) to 1.2 GB (Parquet), while supporting efficient column-level reads for analytics queries.
- **Limit concurrent connections.** The pipeline uses only 3-5 DB2 connections. More connections would not make the extract faster (DB2 is the bottleneck, not the network) and would consume scarce z/OS resources.
- **Write pipeline metrics to DB2.** Storing pipeline run status in a DB2 table allows the operations team to monitor pipeline health using existing monitoring tools.
Discussion Questions
- How would you handle a scenario where the daily pipeline fails midway? Should it be idempotent (re-runnable without duplicates)?
- What changes would you make if the pipeline needed to capture UPDATE and DELETE operations (not just new INSERTs)?
- How would you modify this pipeline to run in real-time (streaming) instead of daily batch?
- What are the trade-offs between extracting from DB2 directly vs. reading from DB2's transaction log (log-based CDC)?