Case Study 2: Migrating from JSON Files to PostgreSQL

Overview

In Chapter 6, you built a task manager that stored data in JSON files. That approach was perfect for learning -- zero setup, instant feedback, and no external dependencies. But as the application grew, cracks appeared: concurrent access corrupted files, searching tasks required loading the entire dataset into memory, and there was no way to enforce data integrity beyond what the application code checked.

This case study walks through the complete process of migrating the Chapter 6 task manager from JSON file storage to a proper PostgreSQL database. You will see how to analyze an existing data format, design a relational schema, write a migration script, and update the application code -- all while preserving every piece of existing user data.

The Starting Point

The Chapter 6 task manager stores data in a tasks.json file with this structure:

{
  "tasks": [
    {
      "id": 1,
      "title": "Buy groceries",
      "description": "Milk, eggs, bread, and cheese",
      "completed": false,
      "priority": "high",
      "created_at": "2025-01-15T09:30:00",
      "tags": ["personal", "shopping"],
      "due_date": "2025-01-16"
    },
    {
      "id": 2,
      "title": "Write Chapter 18",
      "description": "Database design and data modeling",
      "completed": true,
      "priority": "high",
      "created_at": "2025-01-10T14:00:00",
      "completed_at": "2025-01-14T16:30:00",
      "tags": ["work", "writing"],
      "due_date": "2025-01-15"
    }
  ],
  "next_id": 3
}

And the original Python data access code looks something like this:

import json
from datetime import datetime
from pathlib import Path

DATA_FILE = Path("tasks.json")


def load_tasks() -> dict:
    if DATA_FILE.exists():
        return json.loads(DATA_FILE.read_text())
    return {"tasks": [], "next_id": 1}


def save_tasks(data: dict) -> None:
    DATA_FILE.write_text(json.dumps(data, indent=2, default=str))


def add_task(title: str, description: str = "", priority: str = "medium",
             tags: list[str] | None = None, due_date: str | None = None) -> dict:
    data = load_tasks()
    task = {
        "id": data["next_id"],
        "title": title,
        "description": description,
        "completed": False,
        "priority": priority,
        "created_at": datetime.now().isoformat(),
        "tags": tags or [],
        "due_date": due_date,
    }
    data["tasks"].append(task)
    data["next_id"] += 1
    save_tasks(data)
    return task

Problems with the JSON Approach

Before migrating, it is worth documenting exactly what problems we are solving:

  1. No concurrent access safety. If two processes call save_tasks() simultaneously, one write overwrites the other. Data is lost silently.

  2. No querying capability. Finding all high-priority incomplete tasks requires loading the entire file, parsing JSON, and filtering in Python. With 10,000 tasks, this is noticeably slow.

  3. No referential integrity. Tags are stored as plain strings. There is no way to enforce that a tag name is valid, rename a tag across all tasks, or prevent duplicate spellings ("work" vs "Work" vs "WORK").

  4. No data validation at the storage level. A bug in the application could write "priority": 42 or "completed": "maybe" and the JSON file would accept it without complaint.

  5. No migration path. Changing the data format (adding a field, renaming a field) requires writing custom scripts to transform every JSON file.

  6. Backup and recovery are manual. There is no transaction log, no point-in-time recovery, no automated backups.
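
Problem 1 is easy to reproduce. The sketch below simulates the lost-update race sequentially: two "processes" load the same snapshot, each appends a task and saves, and the second save silently discards the first writer's change (the file location and task titles are illustrative):

```python
import json
import tempfile
from pathlib import Path

# Simulate the lost-update race: both writers load the same snapshot,
# and the second save clobbers the first.
data_file = Path(tempfile.mkdtemp()) / "tasks.json"
data_file.write_text(json.dumps({"tasks": [], "next_id": 1}))

snapshot_a = json.loads(data_file.read_text())  # process A loads
snapshot_b = json.loads(data_file.read_text())  # process B loads the same state

snapshot_a["tasks"].append({"id": snapshot_a["next_id"], "title": "A's task"})
snapshot_a["next_id"] += 1
data_file.write_text(json.dumps(snapshot_a))  # A saves

snapshot_b["tasks"].append({"id": snapshot_b["next_id"], "title": "B's task"})
snapshot_b["next_id"] += 1
data_file.write_text(json.dumps(snapshot_b))  # B saves, overwriting A's write

final = json.loads(data_file.read_text())
print([t["title"] for t in final["tasks"]])  # A's task is gone: ["B's task"]
```

A database serializes these writes inside transactions, so neither update can be lost this way.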

Step 1: Design the Relational Schema

Analyzing the JSON structure, we identify the following entities:

  • Tasks: The core entity with title, description, status, priority, dates
  • Tags: Currently embedded as string arrays, should be a separate table
  • Task-Tag relationship: Many-to-many (a task can have multiple tags, a tag can be on multiple tasks)

SQLAlchemy Models

The models below use SQLAlchemy 2.0's typed declarative style (Mapped and mapped_column):

from datetime import datetime, date
from sqlalchemy import (
    String, Text, Boolean, Integer, Date, ForeignKey,
    Index, CheckConstraint, func, create_engine,
)
from sqlalchemy.orm import (
    DeclarativeBase, Mapped, mapped_column, relationship, Session,
)


class Base(DeclarativeBase):
    pass


class Task(Base):
    __tablename__ = "tasks"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200), nullable=False)
    description: Mapped[str] = mapped_column(Text, server_default="")
    completed: Mapped[bool] = mapped_column(Boolean, server_default="false")
    priority: Mapped[str] = mapped_column(
        String(10), server_default="medium"
    )
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    completed_at: Mapped[datetime | None] = mapped_column(default=None)
    due_date: Mapped[date | None] = mapped_column(Date, default=None)

    tags: Mapped[list["Tag"]] = relationship(
        secondary="task_tags", back_populates="tasks"
    )

    __table_args__ = (
        Index("ix_tasks_completed", "completed"),
        Index("ix_tasks_priority", "priority"),
        Index("ix_tasks_due_date", "due_date"),
        CheckConstraint(
            "priority IN ('low', 'medium', 'high')",
            name="valid_priority",
        ),
    )

    def __repr__(self) -> str:
        return f"<Task(id={self.id}, title='{self.title}')>"

    def to_dict(self) -> dict:
        """Convert to dictionary for API responses."""
        return {
            "id": self.id,
            "title": self.title,
            "description": self.description,
            "completed": self.completed,
            "priority": self.priority,
            "created_at": self.created_at.isoformat(),
            "completed_at": (
                self.completed_at.isoformat() if self.completed_at else None
            ),
            "due_date": self.due_date.isoformat() if self.due_date else None,
            "tags": [tag.name for tag in self.tags],
        }


class Tag(Base):
    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)

    tasks: Mapped[list["Task"]] = relationship(
        secondary="task_tags", back_populates="tags"
    )

    def __repr__(self) -> str:
        return f"<Tag(id={self.id}, name='{self.name}')>"


# Junction table for the many-to-many relationship. (Table and Column are
# imported here for narrative clarity; in a real module, fold them into the
# import block at the top.)
from sqlalchemy import Table, Column

task_tags = Table(
    "task_tags",
    Base.metadata,
    Column("task_id", Integer, ForeignKey("tasks.id", ondelete="CASCADE"),
           primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id", ondelete="CASCADE"),
           primary_key=True),
)

Design Decisions

Several design decisions differ from the JSON format:

  1. Tags are a separate table. This normalizes the data, prevents duplicate tag names, and enables efficient queries like "find all tasks with tag X" using a simple JOIN instead of scanning every task's tag array.

  2. Priority is constrained. A CHECK constraint ensures only valid priority values are stored. The JSON file accepted any string.

  3. Dates use proper types. created_at is a TIMESTAMP, due_date is a DATE. The JSON file stored these as strings, making date comparisons unreliable.

  4. The next_id counter is gone. The database handles auto-incrementing IDs automatically.

  5. Indexes support common queries. Filtering by completed, priority, and due_date are the most common operations, so those columns are indexed.
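
To make decisions 2 and 5 concrete, here is a hedged sketch of the kind of DDL these models imply, run against in-memory SQLite from the stdlib so the CHECK constraint can be seen firing. The column list is abbreviated, and real PostgreSQL DDL differs in type names (SERIAL, TIMESTAMP):

```python
import sqlite3

# Abbreviated DDL implied by the Task model, for illustration only.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE tasks (
    id INTEGER PRIMARY KEY,
    title VARCHAR(200) NOT NULL,
    completed BOOLEAN DEFAULT 0,
    priority VARCHAR(10) DEFAULT 'medium'
        CONSTRAINT valid_priority CHECK (priority IN ('low', 'medium', 'high'))
);
CREATE INDEX ix_tasks_priority ON tasks (priority);
""")
conn.execute("INSERT INTO tasks (title, priority) VALUES (?, ?)", ("ok", "high"))
try:
    conn.execute("INSERT INTO tasks (title, priority) VALUES (?, ?)", ("bad", "urgent"))
except sqlite3.IntegrityError as exc:
    print("rejected:", exc)  # the CHECK constraint fires at the storage level
```

This is exactly the validation the JSON file could not provide: the bad row never reaches disk, no matter what the application code does.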

Step 2: Write the Migration Script

The migration script reads the JSON file, transforms the data, and inserts it into the database:

"""Migration script: JSON tasks to PostgreSQL."""
import json
from datetime import datetime, date
from pathlib import Path

from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session

from models import Base, Task, Tag, task_tags


def parse_date(date_str: str | None) -> date | None:
    """Parse a date string, handling various formats gracefully."""
    if not date_str:
        return None
    try:
        return date.fromisoformat(date_str)
    except (ValueError, TypeError):
        print(f"  Warning: Could not parse date '{date_str}', skipping")
        return None


def parse_datetime(dt_str: str | None) -> datetime | None:
    """Parse a datetime string, handling various formats."""
    if not dt_str:
        return None
    try:
        return datetime.fromisoformat(dt_str)
    except (ValueError, TypeError):
        print(f"  Warning: Could not parse datetime '{dt_str}', skipping")
        return None


def normalize_priority(priority: str | None) -> str:
    """Normalize priority values to valid enum values."""
    if not priority:
        return "medium"
    normalized = priority.strip().lower()
    if normalized in ("low", "medium", "high"):
        return normalized
    print(f"  Warning: Unknown priority '{priority}', defaulting to 'medium'")
    return "medium"


def normalize_tag(tag: str) -> str:
    """Normalize tag names to lowercase, stripped."""
    return tag.strip().lower()


def migrate_json_to_db(
    json_path: str,
    database_url: str,
    dry_run: bool = False,
) -> dict:
    """
    Migrate tasks from a JSON file to a PostgreSQL database.

    Args:
        json_path: Path to the JSON tasks file.
        database_url: SQLAlchemy database URL.
        dry_run: If True, rolls back instead of committing.

    Returns:
        Dictionary with migration statistics.
    """
    stats = {
        "tasks_read": 0,
        "tasks_migrated": 0,
        "tasks_skipped": 0,
        "tags_created": 0,
        "warnings": [],
    }

    # Read the JSON file
    json_file = Path(json_path)
    if not json_file.exists():
        raise FileNotFoundError(f"JSON file not found: {json_path}")

    data = json.loads(json_file.read_text())
    tasks_data = data.get("tasks", [])
    stats["tasks_read"] = len(tasks_data)

    print(f"Read {len(tasks_data)} tasks from {json_path}")

    # Set up the database
    engine = create_engine(database_url)
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        # Phase 1: Collect and create all unique tags
        all_tags: set[str] = set()
        for task_data in tasks_data:
            for tag in task_data.get("tags", []):
                all_tags.add(normalize_tag(tag))

        tag_objects: dict[str, Tag] = {}
        for tag_name in sorted(all_tags):
            # Check if tag already exists
            existing = session.scalars(
                select(Tag).where(Tag.name == tag_name)
            ).first()
            if existing:
                tag_objects[tag_name] = existing
            else:
                tag_obj = Tag(name=tag_name)
                session.add(tag_obj)
                tag_objects[tag_name] = tag_obj
                stats["tags_created"] += 1

        session.flush()  # Assign tag IDs before linking tasks
        print(
            f"Ensured {len(all_tags)} tags exist "
            f"({stats['tags_created']} newly created)"
        )

        # Phase 2: Migrate tasks
        for task_data in tasks_data:
            task_id = task_data.get("id")
            title = task_data.get("title", "").strip()

            if not title:
                msg = f"Task {task_id} has no title, skipping"
                print(f"  Warning: {msg}")
                stats["warnings"].append(msg)
                stats["tasks_skipped"] += 1
                continue

            # Check for duplicate IDs
            existing = session.get(Task, task_id)
            if existing:
                msg = f"Task with ID {task_id} already exists, skipping"
                print(f"  Warning: {msg}")
                stats["warnings"].append(msg)
                stats["tasks_skipped"] += 1
                continue

            task = Task(
                id=task_id,
                title=title[:200],  # Enforce max length
                description=task_data.get("description", ""),
                completed=bool(task_data.get("completed", False)),
                priority=normalize_priority(task_data.get("priority")),
                created_at=parse_datetime(task_data.get("created_at"))
                or datetime.now(),
                completed_at=parse_datetime(task_data.get("completed_at")),
                due_date=parse_date(task_data.get("due_date")),
            )

            # Attach tags
            for tag_name in task_data.get("tags", []):
                normalized = normalize_tag(tag_name)
                if normalized in tag_objects:
                    task.tags.append(tag_objects[normalized])

            session.add(task)
            stats["tasks_migrated"] += 1

        if dry_run:
            print("\nDRY RUN: Rolling back all changes")
            session.rollback()
        else:
            session.commit()
            print(f"\nMigration complete: {stats['tasks_migrated']} tasks migrated")

    return stats

Key Design Decisions in the Migration Script

  1. Data normalization during migration. Tags are lowercased and stripped. Priorities are validated against the allowed values. This cleans up inconsistencies that accumulated in the JSON file.

  2. Graceful error handling. Invalid dates and unknown priorities produce warnings but do not stop the migration. The stats dictionary tracks exactly what happened.

  3. Dry-run mode. The dry_run=True flag lets you test the migration without committing any rows -- the transaction is rolled back at the end. (Note that Base.metadata.create_all() still creates empty tables; the dry run protects the data, not the schema.) Always test migrations this way first.

  4. Duplicate detection. The script checks for existing records before inserting, making it safe to run multiple times (idempotent).

  5. Two-phase approach. Tags are created first (Phase 1), then tasks with their tag associations (Phase 2). This ensures all foreign key references are valid.
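
The rollback-instead-of-commit pattern behind point 3 is worth seeing in isolation. A minimal stdlib sqlite3 sketch of the same idea:

```python
import sqlite3

# Dry-run pattern in miniature: do all the work inside a transaction,
# then roll back so the data is left untouched.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT)")
conn.commit()

dry_run = True
conn.execute("INSERT INTO tasks (title) VALUES ('would be migrated')")
if dry_run:
    conn.rollback()  # discard everything; nothing was written
else:
    conn.commit()

count = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
print(count)  # 0
```

Because every insert happens inside one transaction, the dry run exercises the full transformation logic while guaranteeing the tables stay empty.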

Step 3: Verification

After migration, verify that the data is correct:

def verify_migration(json_path: str, database_url: str) -> bool:
    """
    Verify that all JSON data was correctly migrated to the database.

    Returns True if verification passes, False otherwise.
    """
    data = json.loads(Path(json_path).read_text())
    tasks_data = data.get("tasks", [])

    engine = create_engine(database_url)
    errors = []

    with Session(engine) as session:
        for task_data in tasks_data:
            task_id = task_data["id"]
            db_task = session.get(Task, task_id)

            if db_task is None:
                errors.append(f"Task {task_id} not found in database")
                continue

            # Verify core fields, mirroring the strip + truncation applied
            # during migration
            if db_task.title != task_data["title"].strip()[:200]:
                errors.append(
                    f"Task {task_id}: title mismatch: "
                    f"'{db_task.title}' vs '{task_data['title']}'"
                )

            if db_task.completed != bool(task_data.get("completed", False)):
                errors.append(
                    f"Task {task_id}: completed mismatch: "
                    f"{db_task.completed} vs {task_data.get('completed')}"
                )

            # Verify tags
            db_tags = sorted(tag.name for tag in db_task.tags)
            json_tags = sorted(
                normalize_tag(t) for t in task_data.get("tags", [])
            )
            if db_tags != json_tags:
                errors.append(
                    f"Task {task_id}: tag mismatch: {db_tags} vs {json_tags}"
                )

        # Verify total counts
        db_count = session.query(Task).count()
        json_count = len([t for t in tasks_data if t.get("title", "").strip()])
        if db_count != json_count:
            errors.append(
                f"Count mismatch: {db_count} in DB vs {json_count} in JSON"
            )

    if errors:
        print(f"Verification FAILED with {len(errors)} errors:")
        for error in errors:
            print(f"  - {error}")
        return False
    else:
        print(f"Verification PASSED: {len(tasks_data)} tasks verified")
        return True

Step 4: Update the Application Code

The final step replaces the JSON-based functions with database-backed equivalents:

"""Updated task manager using PostgreSQL instead of JSON files."""
from datetime import datetime, date
from sqlalchemy import create_engine, select, func
from sqlalchemy.orm import Session, selectinload

from models import Base, Task, Tag

# In production, load the URL from configuration or an environment variable
# instead of hardcoding credentials.
DATABASE_URL = "postgresql://user:password@localhost:5432/taskmanager"
engine = create_engine(DATABASE_URL)


def get_session() -> Session:
    """Create a new database session."""
    return Session(engine)


def add_task(
    title: str,
    description: str = "",
    priority: str = "medium",
    tags: list[str] | None = None,
    due_date: date | None = None,
) -> dict:
    """Add a new task to the database."""
    with get_session() as session:
        with session.begin():
            task = Task(
                title=title,
                description=description,
                priority=priority,
                due_date=due_date,
            )
            if tags:
                for tag_name in tags:
                    normalized = tag_name.strip().lower()
                    tag = session.scalars(
                        select(Tag).where(Tag.name == normalized)
                    ).first()
                    if not tag:
                        tag = Tag(name=normalized)
                        session.add(tag)
                    task.tags.append(tag)
            session.add(task)
            session.flush()
            return task.to_dict()


def list_tasks(
    completed: bool | None = None,
    priority: str | None = None,
    tag: str | None = None,
    limit: int = 50,
) -> list[dict]:
    """List tasks with optional filters."""
    with get_session() as session:
        stmt = select(Task).options(selectinload(Task.tags))

        if completed is not None:
            stmt = stmt.where(Task.completed == completed)
        if priority:
            stmt = stmt.where(Task.priority == priority)
        if tag:
            stmt = stmt.join(Task.tags).where(Tag.name == tag.lower())

        stmt = stmt.order_by(Task.created_at.desc()).limit(limit)
        tasks = session.scalars(stmt).all()
        return [task.to_dict() for task in tasks]


def complete_task(task_id: int) -> dict | None:
    """Mark a task as completed."""
    with get_session() as session:
        with session.begin():
            task = session.get(Task, task_id)
            if not task:
                return None
            task.completed = True
            task.completed_at = datetime.now()
            return task.to_dict()


def delete_task(task_id: int) -> bool:
    """Delete a task by ID."""
    with get_session() as session:
        with session.begin():
            task = session.get(Task, task_id)
            if not task:
                return False
            session.delete(task)
            return True


def search_tasks(query: str) -> list[dict]:
    """Search tasks by title or description."""
    with get_session() as session:
        stmt = (
            select(Task)
            .options(selectinload(Task.tags))
            .where(
                Task.title.ilike(f"%{query}%")
                | Task.description.ilike(f"%{query}%")
            )
            .order_by(Task.created_at.desc())
        )
        tasks = session.scalars(stmt).all()
        return [task.to_dict() for task in tasks]


def get_stats() -> dict:
    """Get task statistics."""
    with get_session() as session:
        total = session.scalar(select(func.count(Task.id)))
        completed = session.scalar(
            select(func.count(Task.id)).where(Task.completed.is_(True))
        )
        by_priority = session.execute(
            select(Task.priority, func.count(Task.id))
            .group_by(Task.priority)
        ).all()

        return {
            "total": total,
            "completed": completed,
            "pending": total - completed,
            "by_priority": {p: c for p, c in by_priority},
        }

Comparing Before and After

Aspect                 | JSON File                          | PostgreSQL
-----------------------+------------------------------------+-----------------------------------
Concurrent access      | Unsafe -- data corruption          | Safe -- ACID transactions
Search performance     | O(n) -- scan all tasks             | O(log n) -- uses indexes
Data validation        | Application code only              | Database constraints + application
Filtering by tag       | Load all, filter in Python         | SQL JOIN with index
Statistics             | Load all, count in Python          | SQL aggregation (instant)
Backup/recovery        | Manual file copies                 | Automated, point-in-time recovery
Storage at 100K tasks  | ~50MB JSON file, loaded entirely   | ~20MB on disk, queried efficiently

Lessons Learned

  1. Migration is not just copying data. The hardest part is data normalization -- converting embedded arrays to proper tables, cleaning inconsistent values, handling edge cases in date parsing.

  2. The verification step is non-negotiable. Without systematic verification comparing source and destination, you cannot know whether the migration was successful. Write verification code before the migration code.

  3. Dry-run mode prevents disasters. Always test the migration without committing first. This catches errors in your transformation logic before they affect real data.

  4. The application code becomes simpler. Notice how list_tasks() with filters is cleaner with SQLAlchemy than with manual JSON filtering. The database handles the heavy lifting.

  5. Searching becomes trivial. What required loading the entire JSON file and scanning in Python becomes a single SQL query with ILIKE.

  6. Statistics go from expensive to free. COUNT and GROUP BY in SQL are orders of magnitude faster than loading all data into Python and counting manually.

  7. Start with SQLite, finish with PostgreSQL. During the migration development, you can use SQLite to iterate quickly. Switch to PostgreSQL for the final migration and production deployment. SQLAlchemy makes this switch a one-line change to the database URL.

  8. Keep the old code path available. During the transition period, keep the JSON reader functional so you can verify data and roll back if needed. Only remove it once the database is proven reliable in production.