Case Study 02: From Procedural to Object-Oriented with AI

Background

DataFlow Analytics is a mid-sized data consultancy that has been using a set of Python scripts to process client data since 2016. What started as a single 300-line script for parsing CSV files and generating summary reports has grown into a collection of 12 scripts totaling 8,500 lines of procedural Python. These scripts are the backbone of the company's data pipeline, processing 200+ client files per day.

The scripts were originally written by the company's founder, a domain expert with self-taught programming skills. They work — and work well — but maintaining them has become increasingly painful as the company has grown from 3 to 25 employees.

The Codebase

The main processing script, process_data.py, is 2,400 lines long and contains 47 functions. It follows a strictly procedural style:

# process_data.py (excerpt - representative of the style)
import csv
import os
import json
import sqlite3
from datetime import datetime

CONFIG = {
    "db_path": "/data/analytics.db",
    "input_dir": "/data/input",
    "output_dir": "/data/reports",
    "max_rows": 1000000,
    "log_file": "/var/log/dataflow.log",
}

errors = []
warnings = []
stats = {"rows_processed": 0, "rows_skipped": 0, "files_completed": 0}


def log_message(level, message):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{timestamp}] [{level}] {message}\n"
    with open(CONFIG["log_file"], "a") as f:
        f.write(line)
    if level == "ERROR":
        errors.append(message)
    elif level == "WARNING":
        warnings.append(message)


def connect_db():
    return sqlite3.connect(CONFIG["db_path"])


def read_csv_file(filepath):
    rows = []
    with open(filepath, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            rows.append(row)
    return rows


def validate_row(row, rules):
    for field, rule in rules.items():
        if field not in row:
            log_message("ERROR", f"Missing field: {field}")
            return False
        if rule == "numeric" and not row[field].replace(".", "").isdigit():
            log_message("WARNING", f"Non-numeric value in {field}: {row[field]}")
            return False
        if rule == "required" and not row[field].strip():
            log_message("ERROR", f"Empty required field: {field}")
            return False
    return True


def transform_row(row, mappings):
    result = {}
    for target, source in mappings.items():
        if callable(source):
            result[target] = source(row)
        elif source in row:
            result[target] = row[source]
        else:
            result[target] = None
    return result


def save_to_db(rows, table_name):
    conn = connect_db()
    cursor = conn.cursor()
    for row in rows:
        columns = ", ".join(row.keys())
        placeholders = ", ".join(["?" for _ in row])
        cursor.execute(
            f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})",
            list(row.values())
        )
    conn.commit()
    conn.close()


def generate_summary(rows, group_by_field):
    groups = {}
    for row in rows:
        key = row.get(group_by_field, "unknown")
        if key not in groups:
            groups[key] = {"count": 0, "total": 0.0}
        groups[key]["count"] += 1
        if "amount" in row:
            try:
                groups[key]["total"] += float(row["amount"])
            except (ValueError, TypeError):
                pass
    return groups


def write_report(summary, output_path, report_format):
    if report_format == "csv":
        with open(output_path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["Group", "Count", "Total"])
            for group, data in summary.items():
                writer.writerow([group, data["count"], data["total"]])
    elif report_format == "json":
        with open(output_path, "w") as f:
            json.dump(summary, f, indent=2)
    else:
        log_message("ERROR", f"Unknown format: {report_format}")


# ... 39 more functions following the same pattern ...


def main():
    log_message("INFO", "Starting data processing")
    files = os.listdir(CONFIG["input_dir"])
    for filepath in files:
        if not filepath.endswith(".csv"):
            continue
        full_path = os.path.join(CONFIG["input_dir"], filepath)
        rows = read_csv_file(full_path)
        valid_rows = []
        for row in rows:
            if validate_row(row, VALIDATION_RULES):
                transformed = transform_row(row, FIELD_MAPPINGS)
                valid_rows.append(transformed)
                stats["rows_processed"] += 1
            else:
                stats["rows_skipped"] += 1
        if valid_rows:
            save_to_db(valid_rows, "processed_data")
            summary = generate_summary(valid_rows, "category")
            output_path = os.path.join(
                CONFIG["output_dir"],
                filepath.replace(".csv", "_report.json")
            )
            write_report(summary, output_path, "json")
        stats["files_completed"] += 1
    log_message("INFO", f"Processing complete. Stats: {stats}")

The Problems

  1. Global mutable state. The CONFIG, errors, warnings, and stats dictionaries are module-level globals modified by functions throughout the script. Behavior depends on call order, and the functions cannot be tested in isolation.

  2. No separation of concerns. File I/O, validation, transformation, database access, and reporting are all interleaved. Adding a new data source (for example, JSON or XML files) requires modifying the same functions that handle CSV.

  3. No testability. Functions depend on file system access, database connections, and global state. There are no tests, and adding tests would require substantial refactoring.

  4. Code duplication. Similar patterns (read file, validate, transform, save) are repeated across the 12 scripts with slight variations. Bug fixes in one script are not propagated to others.

  5. No error recovery. If processing fails halfway through a file, there is no way to resume. The entire file must be reprocessed.

  6. Configuration is hardcoded. Database paths, output directories, and other settings are embedded in the source code, making it impossible to run in different environments without modifying the scripts.
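Problem 1 is easy to see in miniature. The sketch below is not code from the scripts; the names are invented to illustrate the pattern:

```python
# Miniature of the script's shared-state pattern (names invented for
# illustration): every call appends to module-level state.
errors = []

def validate(value):
    if not value.strip():
        errors.append("empty value")
        return False
    return True

validate("")   # first "independent" call
validate("")   # second call inherits the first call's residue
assert len(errors) == 2   # state accumulates; tests must reset it by hand
```

Two calls that look independent end up coupled through the module, which is exactly why the original functions resisted unit testing.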

The Refactoring Approach

The team assigned a mid-level developer, guided by an AI coding assistant, to refactor the procedural scripts into a clean object-oriented design over eight weeks. The developer had experience with OOP but had not performed a large-scale refactoring before.

Week 1: Understanding and Characterization

AI-Assisted Analysis. The developer fed the main script to the AI assistant:

Prompt: "Analyze this 2,400-line procedural Python script.
Identify:
1. All the distinct responsibilities it handles
2. The data flow from input to output
3. Global state and its effects
4. Code that is duplicated or nearly duplicated
5. Functions that do more than one thing"

The AI identified seven distinct responsibilities: file reading, data validation, data transformation, database persistence, report generation, logging, and orchestration. It also flagged that save_to_db was vulnerable to SQL injection through the dynamically constructed table and column names.
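The injection risk comes from interpolating identifiers (the table name, plus column names taken from row keys) into the SQL string; SQLite's `?` placeholders can bind values but not identifiers. A common mitigation, sketched here with an assumed allowlist rather than the project's actual fix, is to validate identifiers before interpolation:

```python
import sqlite3

ALLOWED_TABLES = {"processed_data", "sales"}  # assumed allowlist for illustration

def save_to_db_safe(conn, rows, table_name):
    """Like save_to_db, but refuses identifiers outside the allowlist."""
    if table_name not in ALLOWED_TABLES:
        raise ValueError(f"Unknown table: {table_name}")
    cursor = conn.cursor()
    for row in rows:
        # Column names come from untrusted input (the CSV header), so
        # restrict them to plain identifiers.
        if not all(col.isidentifier() for col in row):
            raise ValueError("Invalid column name")
        columns = ", ".join(row.keys())
        placeholders = ", ".join("?" for _ in row)
        cursor.execute(
            f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})",
            list(row.values()),
        )
    conn.commit()
```

Values still travel through `?` placeholders as before; only the identifiers need the extra check.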

Characterization Tests. Using AI, the developer generated characterization tests for the core processing pipeline. Since the functions depended on files and databases, the tests used temporary directories and in-memory SQLite databases:

def test_validate_row_captures_behavior():
    """Characterization: validate_row with standard rules."""
    rules = {"name": "required", "amount": "numeric"}

    assert validate_row({"name": "Alice", "amount": "100"}, rules) is True
    assert validate_row({"name": "", "amount": "100"}, rules) is False
    assert validate_row({"name": "Alice", "amount": "abc"}, rules) is False
    assert validate_row({"name": "Alice"}, rules) is False  # missing field

The developer created 43 characterization tests covering the main processing functions, giving coverage of precisely the code paths the refactoring would touch.
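For file-dependent functions like read_csv_file, the tests pointed the function at a throwaway directory. A standalone sketch of the approach using tempfile (the project's suite uses pytest fixtures such as tmp_path for the same effect):

```python
import csv
import tempfile
from pathlib import Path

def read_csv_file(filepath):
    # Local copy of the original function under test.
    rows = []
    with open(filepath, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            rows.append(row)
    return rows

def test_read_csv_file_characterization():
    # A temporary directory stands in for the production input dir.
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "sample.csv"
        path.write_text("name,amount\nAlice,100\n")
        rows = read_csv_file(path)
        assert rows == [{"name": "Alice", "amount": "100"}]

test_read_csv_file_characterization()
```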

Weeks 2-3: Designing the Object-Oriented Architecture

The developer asked the AI to propose a class hierarchy:

Prompt: "Given the seven responsibilities identified in this
procedural script (file reading, validation, transformation,
persistence, reporting, logging, orchestration), design an
object-oriented architecture using these design patterns where
appropriate: Strategy, Template Method, Repository, and
Pipeline. Show me the class diagram and explain the
responsibilities of each class."

The AI proposed the following architecture:

DataProcessor (orchestrator)
├── DataReader (abstract) ← CSVReader, JSONReader, XMLReader
├── DataValidator
│   └── ValidationRule (abstract) ← RequiredRule, NumericRule, etc.
├── DataTransformer
│   └── FieldMapping (abstract) ← DirectMapping, ComputedMapping
├── DataRepository (abstract) ← SQLiteRepository, PostgresRepository
├── ReportGenerator (abstract) ← CSVReportGenerator, JSONReportGenerator
├── ProcessingLogger
└── ProcessingConfig

The developer discussed this design with the AI, iterating on questions like "Should ValidationRule be a class or a function?" and "Is the Repository pattern overkill for this use case?" After two rounds of refinement, they settled on the design.

Weeks 3-5: Incremental Extraction

The refactoring proceeded one class at a time, starting from the lowest-level components (fewest dependencies) and working up to the orchestrator.

Step 1: Extract ProcessingConfig. Replace global CONFIG with a configuration class:

from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class ProcessingConfig:
    """Configuration for the data processing pipeline."""

    db_path: Path = Path("/data/analytics.db")
    output_dir: Path = Path("/data/reports")
    input_dir: Path = Path("/data/input")
    max_rows: int = 1_000_000
    log_file: Path = Path("/var/log/dataflow.log")
    report_format: str = "json"

    @classmethod
    def from_file(cls, config_path: Path) -> "ProcessingConfig":
        """Load configuration from a JSON file."""
        import json
        with open(config_path) as f:
            data = json.load(f)
        return cls(**{k: Path(v) if k.endswith(("_path", "_dir", "_file"))
                      else v for k, v in data.items()})

    @classmethod
    def from_env(cls) -> "ProcessingConfig":
        """Load configuration from environment variables."""
        import os
        return cls(
            db_path=Path(os.getenv("DF_DB_PATH", "/data/analytics.db")),
            output_dir=Path(os.getenv("DF_OUTPUT_DIR", "/data/reports")),
            input_dir=Path(os.getenv("DF_INPUT_DIR", "/data/input")),
            max_rows=int(os.getenv("DF_MAX_ROWS", "1000000")),
            log_file=Path(os.getenv("DF_LOG_FILE", "/var/log/dataflow.log")),
        )

After each extraction, the developer ran the characterization tests to verify nothing broke. The original script was updated to use ProcessingConfig while maintaining identical behavior.
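One way to keep the original script running mid-transition, sketched here as an assumed bridging step rather than the project's recorded history, is to rebuild the old CONFIG dict from the new dataclass so untouched functions keep reading CONFIG["..."] unchanged:

```python
from dataclasses import asdict, dataclass

@dataclass
class ProcessingConfig:
    # Trimmed copy of the full class, with strings instead of Path
    # values for brevity.
    db_path: str = "/data/analytics.db"
    output_dir: str = "/data/reports"

# Transitional bridge: CONFIG becomes a plain-dict view over the
# dataclass, so legacy functions see identical keys and values.
_config = ProcessingConfig()
CONFIG = asdict(_config)

assert CONFIG["db_path"] == "/data/analytics.db"
```

Once every function takes the config as a parameter, the module-level CONFIG can be deleted.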

Step 2: Extract ProcessingLogger. Replace global errors, warnings, and the log_message function:

import logging
from dataclasses import dataclass, field


@dataclass
class ProcessingStats:
    """Tracks processing statistics."""

    rows_processed: int = 0
    rows_skipped: int = 0
    files_completed: int = 0
    errors: list[str] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)


class ProcessingLogger:
    """Structured logging for the data processing pipeline."""

    def __init__(self, log_file: str | None = None):
        self.logger = logging.getLogger("dataflow")
        # Without an explicit level, the root default (WARNING) would
        # silently drop info() messages.
        self.logger.setLevel(logging.INFO)
        if log_file:
            handler = logging.FileHandler(log_file)
            handler.setFormatter(
                logging.Formatter("[%(asctime)s] [%(levelname)s] %(message)s")
            )
            self.logger.addHandler(handler)
        self.stats = ProcessingStats()

    def error(self, message: str) -> None:
        self.logger.error(message)
        self.stats.errors.append(message)

    def warning(self, message: str) -> None:
        self.logger.warning(message)
        self.stats.warnings.append(message)

    def info(self, message: str) -> None:
        self.logger.info(message)

Step 3: Extract DataReader with Strategy Pattern. The original read_csv_file became one strategy in a family of readers:

import csv
from abc import ABC, abstractmethod
from pathlib import Path


class DataReader(ABC):
    """Abstract base class for reading data from files."""

    @abstractmethod
    def read(self, filepath: Path) -> list[dict[str, str]]:
        """Read data from a file and return a list of row dictionaries."""
        ...

    @abstractmethod
    def can_handle(self, filepath: Path) -> bool:
        """Return True if this reader can handle the given file type."""
        ...


class CSVReader(DataReader):
    """Reads data from CSV files."""

    def read(self, filepath: Path) -> list[dict[str, str]]:
        rows = []
        with open(filepath, "r", newline="") as f:
            reader = csv.DictReader(f)
            for row in reader:
                rows.append(dict(row))
        return rows

    def can_handle(self, filepath: Path) -> bool:
        return filepath.suffix.lower() == ".csv"

Steps 4-7: Similar extractions for DataValidator, DataTransformer, DataRepository, and ReportGenerator.
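The validator components are not shown in the write-up, but their shape can be inferred from the unit tests in Weeks 6-7. A sketch consistent with that usage (signatures inferred, not the project's exact code):

```python
from abc import ABC, abstractmethod

class ValidationRule(ABC):
    @abstractmethod
    def check(self, row: dict[str, str]) -> bool:
        """Return True if the row satisfies this rule."""

class RequiredFieldRule(ValidationRule):
    def __init__(self, field: str):
        self.field = field

    def check(self, row: dict[str, str]) -> bool:
        return bool(row.get(self.field, "").strip())

class NumericFieldRule(ValidationRule):
    def __init__(self, field: str):
        self.field = field

    def check(self, row: dict[str, str]) -> bool:
        try:
            float(row.get(self.field, ""))
            return True
        except ValueError:
            return False

class DataValidator:
    """Applies a list of rules; a row passes only if all rules pass."""

    def __init__(self, rules: list[ValidationRule]):
        self.rules = rules

    def validate(self, row: dict[str, str]) -> bool:
        return all(rule.check(row) for rule in self.rules)
```

Note that float() is more permissive than the original isdigit() check (it accepts negatives and exponents), which is the kind of behavioral delta the characterization tests were there to catch.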

Weeks 5-6: Building the Pipeline Orchestrator

With all components extracted, the developer created the orchestrator:

class DataProcessor:
    """Orchestrates the data processing pipeline."""

    def __init__(
        self,
        config: ProcessingConfig,
        reader: DataReader,
        validator: DataValidator,
        transformer: DataTransformer,
        repository: DataRepository,
        report_generator: ReportGenerator,
        logger: ProcessingLogger,
    ):
        self.config = config
        self.reader = reader
        self.validator = validator
        self.transformer = transformer
        self.repository = repository
        self.report_generator = report_generator
        self.logger = logger

    def process_file(self, filepath: Path) -> ProcessingResult:
        """Process a single data file through the pipeline."""
        self.logger.info(f"Processing: {filepath}")

        rows = self.reader.read(filepath)
        valid_rows = []

        for row in rows:
            if self.validator.validate(row):
                transformed = self.transformer.transform(row)
                valid_rows.append(transformed)
                self.logger.stats.rows_processed += 1
            else:
                self.logger.stats.rows_skipped += 1

        if valid_rows:
            self.repository.save_batch(valid_rows)
            summary = self.report_generator.generate(valid_rows)
            output_path = (
                self.config.output_dir
                / f"{filepath.stem}_report.json"  # matches the original "_report.json" naming
            )
            self.report_generator.write(summary, output_path)

        self.logger.stats.files_completed += 1
        return ProcessingResult(
            filepath=filepath,
            rows_processed=len(valid_rows),
            rows_skipped=len(rows) - len(valid_rows),
        )

    def process_directory(self, directory: Path) -> list[ProcessingResult]:
        """Process all supported files in a directory."""
        results = []
        for filepath in sorted(directory.iterdir()):
            if self.reader.can_handle(filepath):
                try:
                    result = self.process_file(filepath)
                    results.append(result)
                except Exception as e:
                    self.logger.error(f"Failed to process {filepath}: {e}")
        return results

Weeks 6-7: Testing the New Architecture

With the OOP architecture in place, the developer wrote proper unit tests for each component:

class TestDataValidator:
    def test_required_field_present(self):
        validator = DataValidator([RequiredFieldRule("name")])
        assert validator.validate({"name": "Alice"}) is True

    def test_required_field_missing(self):
        validator = DataValidator([RequiredFieldRule("name")])
        assert validator.validate({"age": "30"}) is False

    def test_numeric_field_valid(self):
        validator = DataValidator([NumericFieldRule("amount")])
        assert validator.validate({"amount": "100.50"}) is True

    def test_numeric_field_invalid(self):
        validator = DataValidator([NumericFieldRule("amount")])
        assert validator.validate({"amount": "abc"}) is False


class TestDataProcessor:
    def test_process_file_happy_path(self, tmp_path):
        """Integration test: full pipeline with in-memory components."""
        csv_file = tmp_path / "test.csv"
        csv_file.write_text("name,amount\nAlice,100\nBob,200\n")

        processor = DataProcessor(
            config=ProcessingConfig(output_dir=tmp_path),
            reader=CSVReader(),
            validator=DataValidator([RequiredFieldRule("name")]),
            transformer=DataTransformer([DirectMapping("name", "name")]),
            repository=InMemoryRepository(),
            report_generator=JSONReportGenerator(),
            logger=ProcessingLogger(),
        )

        result = processor.process_file(csv_file)
        assert result.rows_processed == 2
        assert result.rows_skipped == 0

Test coverage for the refactored code reached 91%.

Week 8: Migration and Cleanup

The team updated all 12 scripts to use the new OOP architecture. Most scripts shrank to thin configuration wrappers around the shared library:

# process_sales_data.py (after refactoring)
from dataflow import (
    DataProcessor, ProcessingConfig, CSVReader,
    DataValidator, RequiredFieldRule, NumericFieldRule,
    DataTransformer, DirectMapping, ComputedMapping,
    SQLiteRepository, JSONReportGenerator, ProcessingLogger,
)

config = ProcessingConfig.from_env()
processor = DataProcessor(
    config=config,
    reader=CSVReader(),
    validator=DataValidator([
        RequiredFieldRule("customer_id"),
        RequiredFieldRule("product"),
        NumericFieldRule("amount"),
    ]),
    transformer=DataTransformer([
        DirectMapping("customer_id", "customer_id"),
        DirectMapping("product", "product"),
        ComputedMapping("amount_cents", lambda r: int(float(r["amount"]) * 100)),
    ]),
    repository=SQLiteRepository(config.db_path, "sales"),
    report_generator=JSONReportGenerator(group_by="product"),
    logger=ProcessingLogger(str(config.log_file)),
)

if __name__ == "__main__":
    results = processor.process_directory(config.input_dir)
    print(f"Processed {len(results)} files")

Results

Metric                         Before                      After
-----------------------------  --------------------------  ----------------------------------------
Total lines of code            8,500 (12 scripts)          4,200 (shared library + 12 thin scripts)
Test coverage                  0%                          91%
Code duplication               ~40% across scripts         < 5%
Time to add new file format    2-3 days                    2-3 hours (new DataReader subclass)
Time to add new output format  1-2 days                    1-2 hours (new ReportGenerator subclass)
Bug fix propagation            Manual across 12 scripts    Automatic (shared library)
New developer onboarding       2 weeks                     2 days

Key Lessons

  1. Procedural code is not inherently bad. The original scripts worked correctly for years. The refactoring was justified not because the procedural style was wrong, but because the code needed to evolve in ways that the procedural structure could not accommodate easily (new file formats, new output types, better error handling).

  2. AI as a design partner. The most valuable AI interactions were not about writing code but about discussing design trade-offs. Questions like "Is this class too abstract?" and "Should validation be a separate class or just methods on the transformer?" led to a better design than either the developer or the AI would have produced alone.

  3. Extract from the bottom up. Starting with the lowest-level components (configuration, logging) and working up to the orchestrator meant that each extraction had a stable foundation. The characterization tests caught issues at each step.

  4. Keep the original working until the end. Throughout the refactoring, the original script continued to work. The team could fall back to it at any point. The switch to the new architecture happened only after comprehensive testing confirmed identical behavior.

  5. OOP is not the goal — clarity is. Some functions (like validate_row) remained as simple functions rather than becoming classes because a function was the clearest expression of that logic. The design patterns (Strategy, Repository) were applied where they added value, not everywhere.

  6. Global state was the root cause of most problems. Replacing the global CONFIG, errors, warnings, and stats with encapsulated class instances solved testing difficulties, made the code thread-safe, and eliminated an entire category of bugs where one function inadvertently affected another through shared state.

  7. The 12-to-1 consolidation was the biggest win. Having 12 scripts with duplicated logic was the source of most bugs. Consolidating into a shared library with thin script wrappers meant that bug fixes and improvements applied everywhere automatically.