Case Study 2: Migrating from JSON Files to PostgreSQL
Overview
In Chapter 6, you built a task manager that stored data in JSON files. That approach was perfect for learning -- zero setup, instant feedback, and no external dependencies. But as the application grew, cracks appeared: concurrent access corrupted files, searching tasks required loading the entire dataset into memory, and there was no way to enforce data integrity beyond what the application code checked.
This case study walks through the complete process of migrating the Chapter 6 task manager from JSON file storage to a proper PostgreSQL database. You will see how to analyze an existing data format, design a relational schema, write a migration script, and update the application code -- all while preserving every piece of existing user data.
The Starting Point
The Chapter 6 task manager stores data in a tasks.json file with this structure:
{
"tasks": [
{
"id": 1,
"title": "Buy groceries",
"description": "Milk, eggs, bread, and cheese",
"completed": false,
"priority": "high",
"created_at": "2025-01-15T09:30:00",
"tags": ["personal", "shopping"],
"due_date": "2025-01-16"
},
{
"id": 2,
"title": "Write Chapter 18",
"description": "Database design and data modeling",
"completed": true,
"priority": "high",
"created_at": "2025-01-10T14:00:00",
"completed_at": "2025-01-14T16:30:00",
"tags": ["work", "writing"],
"due_date": "2025-01-15"
}
],
"next_id": 3
}
And the original Python data access code looks something like this:
import json
from datetime import datetime
from pathlib import Path
DATA_FILE = Path("tasks.json")
def load_tasks() -> dict:
if DATA_FILE.exists():
return json.loads(DATA_FILE.read_text())
return {"tasks": [], "next_id": 1}
def save_tasks(data: dict) -> None:
DATA_FILE.write_text(json.dumps(data, indent=2, default=str))
def add_task(title: str, description: str = "", priority: str = "medium",
tags: list[str] | None = None, due_date: str | None = None) -> dict:
data = load_tasks()
task = {
"id": data["next_id"],
"title": title,
"description": description,
"completed": False,
"priority": priority,
"created_at": datetime.now().isoformat(),
"tags": tags or [],
"due_date": due_date,
}
data["tasks"].append(task)
data["next_id"] += 1
save_tasks(data)
return task
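Note the read-modify-write pattern in `add_task()`: it loads the whole file, mutates it in memory, and writes everything back. Two concurrent callers that load the same snapshot will silently lose one update. The self-contained simulation below (using a throwaway temp file and illustrative task titles, not the Chapter 6 code itself) makes the lost update concrete:

```python
import json
import tempfile
from pathlib import Path

# A throwaway file standing in for tasks.json
data_file = Path(tempfile.mkdtemp()) / "tasks.json"
data_file.write_text(json.dumps({"tasks": [], "next_id": 1}))

# Both "processes" load the same snapshot before either saves
snapshot_a = json.loads(data_file.read_text())
snapshot_b = json.loads(data_file.read_text())

snapshot_a["tasks"].append({"id": 1, "title": "Task from process A"})
snapshot_a["next_id"] = 2
data_file.write_text(json.dumps(snapshot_a))  # A saves first

snapshot_b["tasks"].append({"id": 1, "title": "Task from process B"})
snapshot_b["next_id"] = 2
data_file.write_text(json.dumps(snapshot_b))  # B overwrites A's save

final = json.loads(data_file.read_text())
print([t["title"] for t in final["tasks"]])  # A's task is gone
```

No error is raised anywhere; the data is simply gone. A database resolves exactly this with transactions and row-level locking.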
Problems with the JSON Approach
Before migrating, it is worth documenting exactly what problems we are solving:
- No concurrent access safety. If two processes call `save_tasks()` simultaneously, one write overwrites the other. Data is lost silently.
- No querying capability. Finding all high-priority incomplete tasks requires loading the entire file, parsing JSON, and filtering in Python. With 10,000 tasks, this is noticeably slow.
- No referential integrity. Tags are stored as plain strings. There is no way to enforce that a tag name is valid, rename a tag across all tasks, or prevent duplicate spellings ("work" vs "Work" vs "WORK").
- No data validation at the storage level. A bug in the application could write `"priority": 42` or `"completed": "maybe"` and the JSON file would accept it without complaint.
- No migration path. Changing the data format (adding a field, renaming a field) requires writing custom scripts to transform every JSON file.
- Backup and recovery are manual. There is no transaction log, no point-in-time recovery, no automated backups.
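Of these, the querying problem is the easiest to quantify. With file storage, every filter is a full scan in Python, performed after loading and parsing the entire file. A small sketch over synthetic in-memory data (the task shapes mirror the JSON format above; the distribution is made up for illustration):

```python
# Synthetic dataset mimicking the JSON task records
tasks = [
    {"id": i, "title": f"Task {i}", "completed": i % 3 == 0,
     "priority": "high" if i % 5 == 0 else "low"}
    for i in range(1, 10_001)
]

# "All high-priority incomplete tasks": O(n), touching every record,
# repeated from scratch for every single query.
high_priority_pending = [
    t for t in tasks if t["priority"] == "high" and not t["completed"]
]
print(len(high_priority_pending))
```

With indexed columns, the same question becomes an index scan that the database answers without reading every row.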
Step 1: Design the Relational Schema
Analyzing the JSON structure, we identify the following entities:
- Tasks: The core entity with title, description, status, priority, dates
- Tags: Currently embedded as string arrays, should be a separate table
- Task-Tag relationship: Many-to-many (a task can have multiple tags, a tag can be on multiple tasks)
SQLAlchemy Models
from datetime import datetime, date
from sqlalchemy import (
String, Text, Boolean, Integer, Date, ForeignKey,
Index, CheckConstraint, func, create_engine,
)
from sqlalchemy.orm import (
DeclarativeBase, Mapped, mapped_column, relationship, Session,
)
class Base(DeclarativeBase):
pass
class Task(Base):
__tablename__ = "tasks"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200), nullable=False)
description: Mapped[str] = mapped_column(Text, server_default="")
completed: Mapped[bool] = mapped_column(Boolean, server_default="false")
priority: Mapped[str] = mapped_column(
String(10), server_default="medium"
)
created_at: Mapped[datetime] = mapped_column(server_default=func.now())
completed_at: Mapped[datetime | None] = mapped_column(default=None)
due_date: Mapped[date | None] = mapped_column(Date, default=None)
tags: Mapped[list["Tag"]] = relationship(
secondary="task_tags", back_populates="tasks"
)
__table_args__ = (
Index("ix_tasks_completed", "completed"),
Index("ix_tasks_priority", "priority"),
Index("ix_tasks_due_date", "due_date"),
CheckConstraint(
"priority IN ('low', 'medium', 'high')",
name="valid_priority",
),
)
def __repr__(self) -> str:
return f"<Task(id={self.id}, title='{self.title}')>"
def to_dict(self) -> dict:
"""Convert to dictionary for API responses."""
return {
"id": self.id,
"title": self.title,
"description": self.description,
"completed": self.completed,
"priority": self.priority,
"created_at": self.created_at.isoformat(),
"completed_at": (
self.completed_at.isoformat() if self.completed_at else None
),
"due_date": self.due_date.isoformat() if self.due_date else None,
"tags": [tag.name for tag in self.tags],
}
class Tag(Base):
__tablename__ = "tags"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
tasks: Mapped[list["Task"]] = relationship(
secondary="task_tags", back_populates="tags"
)
def __repr__(self) -> str:
return f"<Tag(id={self.id}, name='{self.name}')>"
# Junction table for many-to-many relationship
from sqlalchemy import Table, Column
task_tags = Table(
"task_tags",
Base.metadata,
Column("task_id", Integer, ForeignKey("tasks.id", ondelete="CASCADE"),
primary_key=True),
Column("tag_id", Integer, ForeignKey("tags.id", ondelete="CASCADE"),
primary_key=True),
)
Design Decisions
Several design decisions differ from the JSON format:
- Tags are a separate table. This normalizes the data, prevents duplicate tag names, and enables efficient queries like "find all tasks with tag X" using a simple JOIN instead of scanning every task's tag array.
- Priority is constrained. A CHECK constraint ensures only valid priority values are stored. The JSON file accepted any string.
- Dates use proper types. `created_at` is a TIMESTAMP, `due_date` is a DATE. The JSON file stored these as strings, making date comparisons unreliable.
- The `next_id` counter is gone. The database handles auto-incrementing IDs automatically.
- Indexes support common queries. Filters on `completed`, `priority`, and `due_date` are the most common operations, so those columns are indexed.
Step 2: Write the Migration Script
The migration script reads the JSON file, transforms the data, and inserts it into the database:
"""Migration script: JSON tasks to PostgreSQL."""
import json
from datetime import datetime, date
from pathlib import Path
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session
from models import Base, Task, Tag, task_tags
def parse_date(date_str: str | None) -> date | None:
"""Parse a date string, handling various formats gracefully."""
if not date_str:
return None
try:
return date.fromisoformat(date_str)
except (ValueError, TypeError):
print(f" Warning: Could not parse date '{date_str}', skipping")
return None
def parse_datetime(dt_str: str | None) -> datetime | None:
"""Parse a datetime string, handling various formats."""
if not dt_str:
return None
try:
return datetime.fromisoformat(dt_str)
except (ValueError, TypeError):
print(f" Warning: Could not parse datetime '{dt_str}', skipping")
return None
def normalize_priority(priority: str | None) -> str:
"""Normalize priority values to valid enum values."""
if not priority:
return "medium"
normalized = priority.strip().lower()
if normalized in ("low", "medium", "high"):
return normalized
print(f" Warning: Unknown priority '{priority}', defaulting to 'medium'")
return "medium"
def normalize_tag(tag: str) -> str:
"""Normalize tag names to lowercase, stripped."""
return tag.strip().lower()
def migrate_json_to_db(
json_path: str,
database_url: str,
dry_run: bool = False,
) -> dict:
"""
Migrate tasks from a JSON file to a PostgreSQL database.
Args:
json_path: Path to the JSON tasks file.
database_url: SQLAlchemy database URL.
dry_run: If True, rolls back instead of committing.
Returns:
Dictionary with migration statistics.
"""
stats = {
"tasks_read": 0,
"tasks_migrated": 0,
"tasks_skipped": 0,
"tags_created": 0,
"warnings": [],
}
# Read the JSON file
json_file = Path(json_path)
if not json_file.exists():
raise FileNotFoundError(f"JSON file not found: {json_path}")
data = json.loads(json_file.read_text())
tasks_data = data.get("tasks", [])
stats["tasks_read"] = len(tasks_data)
print(f"Read {len(tasks_data)} tasks from {json_path}")
# Set up the database
engine = create_engine(database_url)
Base.metadata.create_all(engine)
with Session(engine) as session:
# Phase 1: Collect and create all unique tags
all_tags: set[str] = set()
for task_data in tasks_data:
for tag in task_data.get("tags", []):
all_tags.add(normalize_tag(tag))
tag_objects: dict[str, Tag] = {}
for tag_name in sorted(all_tags):
# Check if tag already exists
existing = session.scalars(
select(Tag).where(Tag.name == tag_name)
).first()
if existing:
tag_objects[tag_name] = existing
else:
tag_obj = Tag(name=tag_name)
session.add(tag_obj)
tag_objects[tag_name] = tag_obj
stats["tags_created"] += 1
session.flush() # Get tag IDs
print(f"Created {stats['tags_created']} tags: {sorted(all_tags)}")
# Phase 2: Migrate tasks
for task_data in tasks_data:
task_id = task_data.get("id")
title = task_data.get("title", "").strip()
if not title:
msg = f"Task {task_id} has no title, skipping"
print(f" Warning: {msg}")
stats["warnings"].append(msg)
stats["tasks_skipped"] += 1
continue
# Check for duplicate IDs
existing = session.get(Task, task_id)
if existing:
msg = f"Task with ID {task_id} already exists, skipping"
print(f" Warning: {msg}")
stats["warnings"].append(msg)
stats["tasks_skipped"] += 1
continue
task = Task(
id=task_id,
title=title[:200], # Enforce max length
description=task_data.get("description", ""),
completed=bool(task_data.get("completed", False)),
priority=normalize_priority(task_data.get("priority")),
created_at=parse_datetime(task_data.get("created_at"))
or datetime.now(),
completed_at=parse_datetime(task_data.get("completed_at")),
due_date=parse_date(task_data.get("due_date")),
)
# Attach tags
for tag_name in task_data.get("tags", []):
normalized = normalize_tag(tag_name)
if normalized in tag_objects:
task.tags.append(tag_objects[normalized])
session.add(task)
stats["tasks_migrated"] += 1
if dry_run:
print("\nDRY RUN: Rolling back all changes")
session.rollback()
else:
session.commit()
print(f"\nMigration complete: {stats['tasks_migrated']} tasks migrated")
return stats
Key Design Decisions in the Migration Script
- Data normalization during migration. Tags are lowercased and stripped. Priorities are validated against the allowed values. This cleans up inconsistencies that accumulated in the JSON file.
- Graceful error handling. Invalid dates and unknown priorities produce warnings but do not stop the migration. The `stats` dictionary tracks exactly what happened.
- Dry-run mode. The `dry_run=True` flag lets you test the migration without actually modifying the database. Always test migrations this way first.
- Duplicate detection. The script checks for existing records before inserting, making it safe to run multiple times (idempotent).
- Two-phase approach. Tags are created first (Phase 1), then tasks with their tag associations (Phase 2). This ensures all foreign key references are valid.
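One PostgreSQL-specific gap in the script above: because tasks are inserted with their original explicit `id` values, the sequence behind the auto-incrementing primary key is never advanced, so the first `INSERT` after migration would collide with an existing id. A post-migration fix-up along these lines closes the gap (PostgreSQL only; the function name is our addition):

```python
from sqlalchemy import create_engine, text

def reset_task_id_sequence(database_url: str) -> None:
    """Advance the tasks.id sequence past the highest migrated id.

    Without this, the sequence still starts at 1 after a migration that
    inserted rows with explicit ids, causing duplicate-key errors.
    """
    engine = create_engine(database_url)
    with engine.begin() as conn:
        conn.execute(text(
            "SELECT setval("
            "pg_get_serial_sequence('tasks', 'id'), "
            "COALESCE((SELECT MAX(id) FROM tasks), 1))"
        ))
```

Run it once, right after a successful (non-dry-run) migration.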
Step 3: Verification
After migration, verify that the data is correct:
def verify_migration(json_path: str, database_url: str) -> bool:
"""
Verify that all JSON data was correctly migrated to the database.
Returns True if verification passes, False otherwise.
"""
data = json.loads(Path(json_path).read_text())
tasks_data = data.get("tasks", [])
engine = create_engine(database_url)
errors = []
with Session(engine) as session:
for task_data in tasks_data:
task_id = task_data["id"]
db_task = session.get(Task, task_id)
if db_task is None:
errors.append(f"Task {task_id} not found in database")
continue
# Verify core fields
            if db_task.title != task_data["title"].strip()[:200]:
errors.append(
f"Task {task_id}: title mismatch: "
f"'{db_task.title}' vs '{task_data['title']}'"
)
if db_task.completed != bool(task_data.get("completed", False)):
errors.append(
f"Task {task_id}: completed mismatch: "
f"{db_task.completed} vs {task_data.get('completed')}"
)
# Verify tags
db_tags = sorted(tag.name for tag in db_task.tags)
json_tags = sorted(
normalize_tag(t) for t in task_data.get("tags", [])
)
if db_tags != json_tags:
errors.append(
f"Task {task_id}: tag mismatch: {db_tags} vs {json_tags}"
)
# Verify total counts
db_count = session.query(Task).count()
json_count = len([t for t in tasks_data if t.get("title", "").strip()])
if db_count != json_count:
errors.append(
f"Count mismatch: {db_count} in DB vs {json_count} in JSON"
)
if errors:
print(f"Verification FAILED with {len(errors)} errors:")
for error in errors:
print(f" - {error}")
return False
else:
print(f"Verification PASSED: {len(tasks_data)} tasks verified")
return True
Step 4: Update the Application Code
The final step replaces the JSON-based functions with database-backed equivalents:
"""Updated task manager using PostgreSQL instead of JSON files."""
from datetime import datetime, date
from sqlalchemy import create_engine, select, func
from sqlalchemy.orm import Session, selectinload
from models import Base, Task, Tag
DATABASE_URL = "postgresql://user:password@localhost:5432/taskmanager"
engine = create_engine(DATABASE_URL)
def get_session() -> Session:
"""Create a new database session."""
return Session(engine)
def add_task(
title: str,
description: str = "",
priority: str = "medium",
tags: list[str] | None = None,
due_date: date | None = None,
) -> dict:
"""Add a new task to the database."""
with get_session() as session:
with session.begin():
task = Task(
title=title,
description=description,
priority=priority,
due_date=due_date,
)
if tags:
for tag_name in tags:
normalized = tag_name.strip().lower()
tag = session.scalars(
select(Tag).where(Tag.name == normalized)
).first()
if not tag:
tag = Tag(name=normalized)
session.add(tag)
task.tags.append(tag)
session.add(task)
session.flush()
return task.to_dict()
def list_tasks(
completed: bool | None = None,
priority: str | None = None,
tag: str | None = None,
limit: int = 50,
) -> list[dict]:
"""List tasks with optional filters."""
with get_session() as session:
stmt = select(Task).options(selectinload(Task.tags))
if completed is not None:
stmt = stmt.where(Task.completed == completed)
if priority:
stmt = stmt.where(Task.priority == priority)
if tag:
stmt = stmt.join(Task.tags).where(Tag.name == tag.lower())
stmt = stmt.order_by(Task.created_at.desc()).limit(limit)
tasks = session.scalars(stmt).all()
return [task.to_dict() for task in tasks]
def complete_task(task_id: int) -> dict | None:
"""Mark a task as completed."""
with get_session() as session:
with session.begin():
task = session.get(Task, task_id)
if not task:
return None
task.completed = True
task.completed_at = datetime.now()
return task.to_dict()
def delete_task(task_id: int) -> bool:
"""Delete a task by ID."""
with get_session() as session:
with session.begin():
task = session.get(Task, task_id)
if not task:
return False
session.delete(task)
return True
def search_tasks(query: str) -> list[dict]:
"""Search tasks by title or description."""
with get_session() as session:
stmt = (
select(Task)
.options(selectinload(Task.tags))
.where(
Task.title.ilike(f"%{query}%")
| Task.description.ilike(f"%{query}%")
)
.order_by(Task.created_at.desc())
)
tasks = session.scalars(stmt).all()
return [task.to_dict() for task in tasks]
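One caveat with `search_tasks()`: `%` and `_` are LIKE wildcards, so a user searching for the literal string "50%" matches far more than intended. A small escaping helper (our addition, not part of the original code) handles this; pass its output to `ilike()` together with `escape="\\"`:

```python
def escape_like(term: str) -> str:
    """Escape LIKE/ILIKE wildcards so user input matches literally."""
    return (
        term.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
    )

# Usage inside search_tasks(), e.g.:
#   Task.title.ilike(f"%{escape_like(query)}%", escape="\\")
print(escape_like("50%_done"))
```

Without this, a query of `"_"` would match every task with a non-empty title.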
def get_stats() -> dict:
"""Get task statistics."""
with get_session() as session:
total = session.scalar(select(func.count(Task.id)))
completed = session.scalar(
select(func.count(Task.id)).where(Task.completed == True)
)
by_priority = session.execute(
select(Task.priority, func.count(Task.id))
.group_by(Task.priority)
).all()
return {
"total": total,
"completed": completed,
"pending": total - completed,
"by_priority": {p: c for p, c in by_priority},
}
Comparing Before and After
| Aspect | JSON File | PostgreSQL |
|---|---|---|
| Concurrent access | Unsafe -- data corruption | Safe -- ACID transactions |
| Search performance | O(n) -- scan all tasks | O(log n) -- uses indexes |
| Data validation | Application code only | Database constraints + application |
| Filtering by tag | Load all, filter in Python | SQL JOIN with index |
| Statistics | Load all, count in Python | SQL aggregation (instant) |
| Backup/recovery | Manual file copies | Automated, point-in-time recovery |
| Storage at 100K tasks | ~50MB JSON file, loaded entirely | ~20MB on disk, queried efficiently |
Lessons Learned
- Migration is not just copying data. The hardest part is data normalization -- converting embedded arrays to proper tables, cleaning inconsistent values, handling edge cases in date parsing.
- The verification step is non-negotiable. Without systematic verification comparing source and destination, you cannot know whether the migration was successful. Write verification code before the migration code.
- Dry-run mode prevents disasters. Always test the migration without committing first. This catches errors in your transformation logic before they affect real data.
- The application code becomes simpler. Notice how `list_tasks()` with filters is cleaner with SQLAlchemy than with manual JSON filtering. The database handles the heavy lifting.
- Searching becomes trivial. What required loading the entire JSON file and scanning in Python becomes a single SQL query with ILIKE.
- Statistics go from expensive to free. COUNT and GROUP BY in SQL are orders of magnitude faster than loading all data into Python and counting manually.
- Start with SQLite, finish with PostgreSQL. During migration development, you can use SQLite to iterate quickly. Switch to PostgreSQL for the final migration and production deployment. SQLAlchemy makes this switch a one-line change to the database URL.
- Keep the old code path available. During the transition period, keep the JSON reader functional so you can verify data and roll back if needed. Only remove it once the database is proven reliable in production.
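The SQLite-first workflow in the last two points is easiest when the database URL comes from configuration rather than a hard-coded constant. A minimal sketch (the environment variable name and the SQLite fallback path are our choices):

```python
import os

from sqlalchemy import create_engine

# Default to a local SQLite file for development; point DATABASE_URL at
# PostgreSQL (e.g. postgresql://user:password@localhost:5432/taskmanager)
# for the real migration and for production.
DATABASE_URL = os.environ.get("DATABASE_URL", "sqlite:///tasks.db")
engine = create_engine(DATABASE_URL)
```

This also keeps credentials out of source control, which the hard-coded `DATABASE_URL` in Step 4 does not.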