
Chapter 22: Scheduling and Task Automation

"A system that requires a human to remember to run it is not a system — it is a chore."


Opening Scenario: The System That Ran Once

At the end of Chapter 21, Sandra Chen asked Priya Okonkwo to make the market intelligence pull run automatically every Friday morning.

For a moment, Priya imagined setting an alarm on her phone to remind herself to run the script every Friday. Then she imagined what happens when she's on vacation. Or sick. Or in back-to-back meetings until noon. Or simply forgets because it's a busy week.

A script that humans have to remember to run is just a faster spreadsheet. The real power of automation is the part where it happens without you.

This chapter is about scheduling: making Python scripts run automatically, reliably, on a defined timetable, whether or not you are at your desk.

By the end of this chapter, Priya's Monday report will generate itself. It will run at 7:45 AM every Monday, pull the latest data, produce the Excel file, and deposit it in Sandra's email inbox before the 9 AM staff meeting — without Priya touching a keyboard.

That is what production automation looks like.


22.1 Why Scheduling Matters

When you move from running scripts manually to running them on a schedule, you cross a threshold. You are no longer a person who runs programs; you are a person who builds systems.

The business case for scheduling is straightforward:

Consistency. A scheduled job runs at the same time every day, every week, every month. Manual execution doesn't — people are busy, distracted, or traveling. A Monday report that slips to 11 AM because you were in a meeting might as well not exist as far as the 9 AM Monday meeting is concerned.

Human capital. If Priya's job includes running a script every Monday morning, that's 15 minutes of her time and attention every week — time she cannot spend on analysis, on building better tools, or on anything else. Multiply that by 52 weeks and you have Priya spending a full workday every year starting a process that could start itself.

Reliability under adversity. Automated jobs run when their operator is sick, on leave, or no longer with the company. Critical business processes should not depend on any individual's availability.

Auditability. A scheduled job that logs its execution creates a record: what ran, when, whether it succeeded, what data it processed. Manual execution creates no such record unless the operator documents it themselves, which they almost never do.

Three Levels of Scheduling

You have three tools available, each appropriate for a different context:

  1. Python's schedule library — simple, pure-Python scheduling that runs inside a long-running Python process. Best for quick scripts you'll run in the background or on a personal machine.

  2. APScheduler — a more powerful Python library that supports cron expressions, persistent job stores, timezone-aware scheduling, and running multiple jobs concurrently. Best for more complex scheduling requirements within Python.

  3. OS-level scheduling — Windows Task Scheduler or Linux/macOS cron. These run Python scripts as external processes. Best for production deployments, server-based automation, and scripts that need to run even when no Python program is already running.


22.2 The schedule Library: Simple Python Scheduling

The schedule library uses a human-readable API that makes simple scheduling almost self-documenting.

Install it:

pip install schedule

Basic Usage

import schedule
import time

def generate_morning_report():
    print("Generating morning report...")
    # Your report logic here

def send_weekly_summary():
    print("Sending weekly summary email...")
    # Your email logic here

# Define the schedule
schedule.every().day.at("08:00").do(generate_morning_report)
schedule.every().monday.at("07:45").do(send_weekly_summary)

# Run the scheduler loop
while True:
    schedule.run_pending()
    time.sleep(60)  # Check every 60 seconds

The while True loop is the scheduler's heartbeat. It calls schedule.run_pending() to execute any jobs whose scheduled time has arrived, then sleeps for a short interval before checking again.

Schedule Syntax Reference

The schedule library's API is designed to read like English:

# Time-based scheduling
schedule.every().hour.do(job)
schedule.every(2).hours.do(job)
schedule.every().day.at("08:30").do(job)
schedule.every().day.at("17:00").do(job)
schedule.every(10).minutes.do(job)

# Day-based scheduling
schedule.every().monday.do(job)
schedule.every().monday.at("07:45").do(job)
schedule.every().tuesday.at("09:00").do(job)
schedule.every().wednesday.at("12:00").do(job)
schedule.every().thursday.do(job)
schedule.every().friday.at("16:30").do(job)
schedule.every().saturday.do(job)
schedule.every().sunday.do(job)

# Weekdays (Monday through Friday): schedule has no built-in
# weekday shortcut, so register the job once for each day
for day_job in (schedule.every().monday, schedule.every().tuesday,
                schedule.every().wednesday, schedule.every().thursday,
                schedule.every().friday):
    day_job.do(job)

# Interval scheduling
schedule.every(30).minutes.do(job)
schedule.every(2).hours.do(job)
schedule.every(3).days.do(job)
schedule.every().week.do(job)

Passing Arguments to Jobs

The do() method forwards any extra positional and keyword arguments straight to the job function, so the simplest approach is to pass them directly. functools.partial and lambda also work:

import schedule
from functools import partial

def generate_report(region, output_dir):
    print(f"Generating report for {region} → {output_dir}")

# Simplest: do() forwards extra arguments to the job function
schedule.every().monday.at("07:45").do(
    generate_report, region="Midwest", output_dir="/reports"
)

# Using functools.partial
northeast_report = partial(generate_report, region="Northeast", output_dir="/reports")
schedule.every().monday.at("07:50").do(northeast_report)

# Alternative: lambda
schedule.every().tuesday.at("09:00").do(
    lambda: generate_report("South", "/reports")
)

Canceling Jobs

# Cancel a specific job
my_job = schedule.every().day.at("08:00").do(daily_report)
schedule.cancel_job(my_job)

# Cancel all jobs
schedule.clear()

# Cancel all jobs with a specific tag
schedule.every().day.at("08:00").do(daily_report).tag("reports")
schedule.every().monday.do(weekly_summary).tag("reports")
schedule.clear("reports")  # Cancel all jobs tagged "reports"

Self-Canceling Jobs

A common pattern: let a job run on its schedule, but have it cancel itself the first time it fails:

import schedule
import time

def check_system_health():
    """Returns True on success, False on failure."""
    try:
        # ... health check logic ...
        return True
    except Exception as e:
        print(f"Health check failed: {e}")
        return False

def health_check_with_cancel():
    """Wrapper that cancels the job if the check fails repeatedly."""
    success = check_system_health()
    if not success:
        return schedule.CancelJob  # Special sentinel — schedule removes this job

schedule.every(5).minutes.do(health_check_with_cancel)

Returning schedule.CancelJob from a job function causes schedule to remove that job automatically.


22.3 Error Handling in Scheduled Jobs

This is where many scheduling implementations fail. A job that runs at 7:45 AM, crashes silently, and leaves no trace is not automation — it is a false sense of security.

The key rule: Errors in scheduled jobs must be caught and logged. They must never silently crash the job or, worse, crash the scheduler loop itself.

The Naive Problem

def generate_monday_report():
    # If this raises an exception, the scheduler might
    # crash the entire loop or silently skip future runs
    data = load_sales_data()   # FileNotFoundError?
    report = build_report(data) # Any exception?
    send_email(report)          # SMTP failure?

schedule.every().monday.at("07:45").do(generate_monday_report)

If load_sales_data() raises FileNotFoundError, the exception propagates out of schedule.run_pending() and crashes the scheduler loop itself — taking every other scheduled job down with it. And unless something logs the failure, you will not know until the report fails to arrive.

The Correct Pattern

Wrap every scheduled job in a try/except:

import schedule
import logging
import smtplib
import sys
from datetime import datetime

logger = logging.getLogger(__name__)

def generate_monday_report():
    """Generate and send the Monday executive report."""
    job_start_time = datetime.now()
    logger.info(f"Monday report job started at {job_start_time.strftime('%H:%M:%S')}")

    try:
        data = load_sales_data()
        report_path = build_report(data)
        send_email(report_path)
        elapsed = (datetime.now() - job_start_time).total_seconds()
        logger.info(f"Monday report completed successfully in {elapsed:.1f}s")

    except FileNotFoundError as e:
        logger.error(f"Monday report FAILED: Sales data file not found. {e}")
        send_failure_alert(
            subject="Monday Report Failed: Missing Data File",
            body=str(e),
        )
    except smtplib.SMTPException as e:
        logger.error(f"Monday report FAILED: Could not send email. {e}")
        # Report was built — save it locally so we don't lose the work
        save_report_locally(report_path)
    except Exception as e:
        # Catch-all for unexpected failures — log full traceback
        logger.exception(f"Monday report FAILED with unexpected error: {e}")
        send_failure_alert(
            subject="Monday Report Failed: Unexpected Error",
            body=f"{type(e).__name__}: {e}",
        )


def send_failure_alert(subject, body):
    """Send an email or Slack message when a scheduled job fails."""
    # Implementation depends on your notification setup
    # At minimum: print to stderr so the error is visible in logs
    print(f"ALERT: {subject}\n{body}", file=sys.stderr)
    logger.critical(f"JOB FAILURE ALERT: {subject}")

A Defensive Wrapper Decorator

Rather than duplicating the try/except in every job, write a decorator:

import functools
import logging
import traceback
from datetime import datetime

logger = logging.getLogger(__name__)


def scheduled_job(job_name=None):
    """
    Decorator that wraps a scheduled job function with error handling and logging.

    Prevents unhandled exceptions from crashing the scheduler.
    Logs job start, success, failure, and duration.

    Usage:
        @scheduled_job("Monday Report")
        def generate_monday_report():
            ...
    """
    def decorator(func):
        name = job_name or func.__name__

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = datetime.now()
            logger.info(f"[{name}] Job started")
            try:
                result = func(*args, **kwargs)
                elapsed = (datetime.now() - start_time).total_seconds()
                logger.info(f"[{name}] Job completed successfully in {elapsed:.1f}s")
                return result
            except Exception as e:
                elapsed = (datetime.now() - start_time).total_seconds()
                logger.error(
                    f"[{name}] Job FAILED after {elapsed:.1f}s: "
                    f"{type(e).__name__}: {e}\n"
                    f"{traceback.format_exc()}"
                )
                # Don't re-raise — the scheduler should continue
                # Return None to indicate failure
                return None
        return wrapper
    return decorator


# Usage:
@scheduled_job("Monday Executive Report")
def generate_monday_report():
    data = load_sales_data()
    report_path = build_report(data)
    send_email(report_path)

@scheduled_job("Daily Exchange Rate Pull")
def fetch_daily_exchange_rates():
    rates = get_exchange_rates()
    save_rates_to_csv(rates)


schedule.every().monday.at("07:45").do(generate_monday_report)
schedule.every().day.at("06:00").do(fetch_daily_exchange_rates)

22.4 Logging for Scheduled Tasks

Logging is to scheduled automation what an audit trail is to accounting: it tells you what happened, when, and why. A scheduled job without logging is a black box.

Setting Up File-Based Logging

import logging
import logging.handlers
from pathlib import Path
from datetime import datetime

def configure_scheduler_logging(log_dir: str = "logs", log_name: str = "scheduler"):
    """
    Set up rotating file logging for a scheduled job runner.

    Creates:
        logs/scheduler.log — current log file
        logs/scheduler.log.1, .2, ... — rotated archives (up to 30 days)

    Args:
        log_dir: Directory to store log files
        log_name: Base name for the log file
    """
    log_path = Path(log_dir)
    log_path.mkdir(exist_ok=True)

    log_file = log_path / f"{log_name}.log"

    # Root logger — captures everything
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    # Console handler — INFO and above to stdout
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s  %(levelname)-8s  %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    # File handler — DEBUG and above, rotates daily, keeps 30 days
    file_handler = logging.handlers.TimedRotatingFileHandler(
        filename=log_file,
        when="midnight",
        interval=1,
        backupCount=30,
        encoding="utf-8",
    )
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s  %(levelname)-8s  %(name)s  %(funcName)s  %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    root_logger.addHandler(console_handler)
    root_logger.addHandler(file_handler)

    logging.info(f"Logging initialized. Log file: {log_file}")
    return root_logger

What to Log

Every scheduled job should log:

def my_scheduled_job():
    job_id = datetime.now().strftime("%Y%m%d_%H%M%S")

    logger.info(f"Job started | job_id={job_id}")

    # At each significant milestone:
    logger.info(f"Loading data | source=sales_data.csv | job_id={job_id}")
    records = load_data()
    logger.info(f"Data loaded | records={len(records)} | job_id={job_id}")

    logger.info(f"Generating report | job_id={job_id}")
    report_path = generate_report(records)
    logger.info(f"Report generated | path={report_path} | job_id={job_id}")

    logger.info(f"Sending email | recipients=['sandra@acme.com'] | job_id={job_id}")
    send_email(report_path)
    logger.info(f"Email sent | job_id={job_id}")

    logger.info(f"Job completed | job_id={job_id} | status=SUCCESS")

The job_id pattern lets you trace a single execution through the log even when multiple jobs are running.
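Appending job_id by hand to every message is easy to forget. A small logging.LoggerAdapter subclass can inject it automatically — a sketch, not part of the pipeline code:

```python
import logging
from datetime import datetime

class JobIdAdapter(logging.LoggerAdapter):
    """Append ' | job_id=...' to every message passed through this adapter."""
    def process(self, msg, kwargs):
        return f"{msg} | job_id={self.extra['job_id']}", kwargs

base_logger = logging.getLogger("scheduler.jobs")

def my_scheduled_job():
    job_id = datetime.now().strftime("%Y%m%d_%H%M%S")
    log = JobIdAdapter(base_logger, {"job_id": job_id})
    log.info("Job started")  # logs e.g. "Job started | job_id=20241021_074500"
    # ... job steps ...
    log.info("Job completed | status=SUCCESS")
```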


22.5 APScheduler: Professional-Grade Python Scheduling

The schedule library is excellent for simple use cases. When you need cron-style scheduling, timezone support, persistent job stores, or background execution without a blocking loop, APScheduler (Advanced Python Scheduler) is the right tool.

Install it:

pip install apscheduler

Core Concepts

APScheduler has three main components:

  • Schedulers: manage and execute jobs (BackgroundScheduler, BlockingScheduler)
  • Job Stores: where job definitions are stored (in memory, SQLite, PostgreSQL)
  • Triggers: define when a job runs (IntervalTrigger, CronTrigger, DateTrigger)

BackgroundScheduler: Running Alongside Your Code

BackgroundScheduler runs jobs in background threads, so it doesn't block your main program:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def generate_daily_report():
    logger.info("Daily report job executing...")
    # ... report logic ...
    logger.info("Daily report complete")


def check_exchange_rates():
    logger.info("Exchange rate check executing...")
    # ... API call logic ...


# Create scheduler
scheduler = BackgroundScheduler()

# Add jobs with different trigger types
scheduler.add_job(
    func=generate_daily_report,
    trigger=CronTrigger(day_of_week="mon-fri", hour=8, minute=0),
    id="daily_report",
    name="Daily Executive Report",
    misfire_grace_time=300,  # Allow up to 5 minutes late if system was busy
    coalesce=True,  # If multiple runs were missed, only run once to catch up
)

scheduler.add_job(
    func=check_exchange_rates,
    trigger=IntervalTrigger(hours=6),
    id="exchange_rates",
    name="Exchange Rate Monitor",
)

# Start the background scheduler
scheduler.start()
logger.info("Scheduler started. Running in background.")

try:
    # Your main program continues here while jobs run in the background
    while True:
        time.sleep(60)
        logger.debug(f"Scheduler active. Jobs: {len(scheduler.get_jobs())}")

except (KeyboardInterrupt, SystemExit):
    logger.info("Shutting down scheduler...")
    scheduler.shutdown()
    logger.info("Scheduler shut down cleanly")

Trigger Types

CronTrigger — for cron-style scheduling:

from apscheduler.triggers.cron import CronTrigger

# Every weekday at 8:00 AM
CronTrigger(day_of_week="mon-fri", hour=8, minute=0)

# Every Monday at 7:45 AM
CronTrigger(day_of_week="mon", hour=7, minute=45)

# First day of every month at midnight
CronTrigger(day=1, hour=0, minute=0)

# Every 15 minutes during business hours on weekdays
CronTrigger(day_of_week="mon-fri", hour="9-17", minute="0,15,30,45")

# Every quarter (January, April, July, October) on the 1st at 6 AM
CronTrigger(month="1,4,7,10", day=1, hour=6)

# Cron string syntax (same as system crontab)
CronTrigger.from_crontab("45 7 * * 1")  # 7:45 AM every Monday

IntervalTrigger — for regular intervals:

from apscheduler.triggers.interval import IntervalTrigger

IntervalTrigger(seconds=30)      # Every 30 seconds
IntervalTrigger(minutes=15)      # Every 15 minutes
IntervalTrigger(hours=4)         # Every 4 hours
IntervalTrigger(days=1)          # Every 24 hours
IntervalTrigger(weeks=1)         # Every 7 days

# With a start time (don't run immediately — start at 9:00 AM tomorrow)
from datetime import datetime, timedelta
IntervalTrigger(
    hours=1,
    start_date=datetime.now().replace(hour=9, minute=0, second=0) + timedelta(days=1),
)

DateTrigger — run once at a specific time:

from apscheduler.triggers.date import DateTrigger
from datetime import datetime, timedelta

# Run once at a specific time (useful for one-time scheduled tasks)
DateTrigger(run_date=datetime(2025, 1, 1, 0, 0, 0))

# Run 10 minutes from now
DateTrigger(run_date=datetime.now() + timedelta(minutes=10))

Timezone-Aware Scheduling

For any business automation, timezone handling is important. A report scheduled for 8:00 AM should run at 8:00 AM in Chicago, regardless of where the server is located.

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import pytz

# Install pytz: pip install pytz

chicago_tz = pytz.timezone("America/Chicago")
new_york_tz = pytz.timezone("America/New_York")

scheduler = BackgroundScheduler(timezone=chicago_tz)

# All cron times are now interpreted in Chicago timezone
scheduler.add_job(
    func=generate_monday_report,
    trigger=CronTrigger(day_of_week="mon", hour=7, minute=45),
    id="monday_report",
)

Persistent Job Stores with SQLite

By default, APScheduler stores job definitions in memory — they are lost when the process restarts. For production use, persist jobs to a database:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

# Jobs are saved in a SQLite database file
job_stores = {
    "default": SQLAlchemyJobStore(url="sqlite:///scheduler_jobs.db")
}

scheduler = BackgroundScheduler(jobstores=job_stores)
scheduler.start()

# Jobs added now persist across restarts.
# replace_existing=True updates the stored job if the id already
# exists, so this add is safe to run on every startup:
scheduler.add_job(
    func=generate_monday_report,
    trigger=CronTrigger(day_of_week="mon", hour=7, minute=45),
    id="monday_report",
    replace_existing=True,
)

For this, you need SQLAlchemy:

pip install apscheduler sqlalchemy

22.6 OS-Level Scheduling

Python-based scheduling (schedule or APScheduler) requires a running Python process. OS-level scheduling launches Python as a fresh process on a defined schedule, which is more robust for production use.

Windows Task Scheduler

Windows Task Scheduler is built into every Windows installation. It can run any program — including a Python script — on a defined schedule, even when no one is logged in.

Via the GUI (Task Scheduler app):

  1. Open Task Scheduler (search for it in the Start menu)
  2. Click "Create Basic Task"
  3. Name: Acme Monday Report
  4. Trigger: Weekly, Monday, 7:45 AM
  5. Action: Start a program
  6. Program/script: C:\Users\priya\AppData\Local\Programs\Python\Python311\python.exe
  7. Arguments: C:\Acme\reports\monday_report.py
  8. Start in: C:\Acme\reports\

Via PowerShell (scriptable and repeatable):

# Create a scheduled task via PowerShell
$TaskAction = New-ScheduledTaskAction `
    -Execute "C:\Users\priya\AppData\Local\Programs\Python\Python311\python.exe" `
    -Argument "C:\Acme\reports\monday_report.py" `
    -WorkingDirectory "C:\Acme\reports\"

$TaskTrigger = New-ScheduledTaskTrigger `
    -Weekly `
    -DaysOfWeek Monday `
    -At "7:45AM"

$TaskSettings = New-ScheduledTaskSettingsSet `
    -ExecutionTimeLimit (New-TimeSpan -Hours 1) `
    -RestartCount 2 `
    -RestartInterval (New-TimeSpan -Minutes 5)

Register-ScheduledTask `
    -TaskName "Acme Monday Report" `
    -Action $TaskAction `
    -Trigger $TaskTrigger `
    -Settings $TaskSettings `
    -RunLevel Highest

Important Windows Task Scheduler settings:

  • Run whether user is logged on or not — required for the task to run unattended
  • Run with highest privileges — needed for scripts that write to system directories or manage other processes
  • Start in (Working Directory) — set this to the directory containing your script; relative file paths in your script resolve from here
  • Configure for — set to your Windows version
  • Execution time limit — prevents a hung script from blocking future runs
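A defensive complement to the "Start in" setting: resolve file paths relative to the script itself, so the job still finds its files no matter which directory the scheduler launches it from. A sketch — the data/log paths are placeholders:

```python
from pathlib import Path
import sys

def script_relative(*parts):
    """Build an absolute path relative to the running script's directory,
    so a wrong 'Start in' setting can't silently break file access."""
    base = Path(sys.argv[0]).resolve().parent if sys.argv and sys.argv[0] else Path.cwd()
    return base.joinpath(*parts)

DATA_FILE = script_relative("data", "sales.csv")
LOG_FILE = script_relative("logs", "report.log")
```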

Linux/macOS: Cron Jobs

cron is the standard Unix job scheduler. You configure it by editing a "crontab" file.

# Open your user crontab for editing
crontab -e

# View your current crontab
crontab -l

Crontab syntax:

MIN  HOUR  DAY  MONTH  WEEKDAY  command
 *    *     *     *       *      /path/to/command

| Field   | Values                 | Examples                             |
|---------|------------------------|--------------------------------------|
| MIN     | 0-59                   | 0 = top of hour, */15 = every 15 min |
| HOUR    | 0-23                   | 8 = 8 AM, */6 = every 6 hours        |
| DAY     | 1-31                   | 1 = 1st of month, * = every day      |
| MONTH   | 1-12                   | 1 = January, */3 = every quarter     |
| WEEKDAY | 0-7 (0 and 7 = Sunday) | 1 = Monday, 1-5 = weekdays           |

Examples:

# Every Monday at 7:45 AM
45 7 * * 1 /usr/bin/python3 /home/priya/reports/monday_report.py

# Every weekday at 8:00 AM
0 8 * * 1-5 /usr/bin/python3 /home/priya/reports/daily_report.py

# First day of every month at 6 AM
0 6 1 * * /usr/bin/python3 /home/priya/reports/monthly_summary.py

# Every 15 minutes during business hours on weekdays
*/15 9-17 * * 1-5 /usr/bin/python3 /home/priya/monitoring/health_check.py

# Every Friday at 4:30 PM
30 16 * * 5 /usr/bin/python3 /home/priya/reports/weekly_brief.py
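To build intuition for how a single field like */15 or 1-5 expands, here is a toy expander — illustrative only; real cron implementations handle more edge cases (names like "mon", combined ranges with steps, and so on):

```python
def expand_cron_field(field: str, lo: int, hi: int) -> set:
    """Expand one crontab field (e.g. '*/15', '1-5', '0,30') into the
    set of matching integer values within [lo, hi]."""
    values = set()
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_str = part.split("/")
            step = int(step_str)
        if part == "*":
            start, end = lo, hi
        elif "-" in part:
            a, b = part.split("-")
            start, end = int(a), int(b)
        else:
            start = end = int(part)
        values.update(range(start, end + 1, step))
    return values

print(sorted(expand_cron_field("*/15", 0, 59)))  # → [0, 15, 30, 45]
print(sorted(expand_cron_field("1-5", 0, 7)))    # → [1, 2, 3, 4, 5]
```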

Best practices for cron jobs:

# Redirect output to a log file to capture errors
45 7 * * 1 /usr/bin/python3 /home/priya/reports/monday_report.py >> /home/priya/logs/monday_report.log 2>&1

# Use full paths for everything — cron has a minimal PATH. Note that a
# crontab entry must fit on one line; cron has no backslash continuation
45 7 * * 1 /home/priya/.venv/bin/python /home/priya/reports/monday_report.py >> /home/priya/logs/monday_report.log 2>&1

# Set MAILTO to receive cron output by email
MAILTO=priya@acme.com
45 7 * * 1 /usr/bin/python3 /home/priya/reports/monday_report.py

Crontab validation tools:

Before deploying a cron expression, validate it with an online tool such as https://crontab.guru, which translates any cron expression into a plain-English description.


22.7 Building a Scheduled Reporting Pipeline

Now let's build something complete. This is a production-quality scheduled reporting pipeline that:

  1. Loads Acme's weekly sales data from CSV files
  2. Calculates key metrics
  3. Generates an Excel report with multiple sheets
  4. Emails the report to Sandra Chen

It is designed to run every Monday at 7:45 AM, but can also be triggered manually.

The code here is an overview — the full implementation is in code/report_pipeline.py.

Pipeline Architecture

monday_report_pipeline()
    ├── load_weekly_sales_data()
    │       └── Reads acme_sales_YYYY_Wnn.csv from data directory
    ├── calculate_weekly_metrics()
    │       ├── Total revenue, units, margin
    │       ├── Regional breakdown
    │       ├── Top products
    │       └── Week-over-week change
    ├── generate_excel_report()
    │       ├── Sheet 1: Executive Summary
    │       ├── Sheet 2: Regional Detail
    │       └── Sheet 3: Product Rankings
    └── send_report_email()
            └── Attaches Excel file and sends to Sandra

The Pipeline Runner

import schedule
import time
import logging
from datetime import datetime
from pathlib import Path

logger = logging.getLogger(__name__)

# Import pipeline functions (from report_pipeline.py)
from report_pipeline import (
    load_weekly_sales_data,
    calculate_weekly_metrics,
    generate_excel_report,
    send_report_email,
)


def run_monday_report_pipeline():
    """
    Full Monday executive report pipeline.

    This function is the entry point for the scheduled job.
    Each step is logged. Errors are caught and reported.
    """
    run_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    logger.info(f"=" * 55)
    logger.info(f"Monday Report Pipeline — {run_timestamp}")
    logger.info(f"=" * 55)

    try:
        # Step 1: Load data
        logger.info("Step 1/4: Loading weekly sales data...")
        sales_data = load_weekly_sales_data(
            data_directory="data/weekly_sales/",
        )
        logger.info(f"  Loaded {len(sales_data)} sales records")

        # Step 2: Calculate metrics
        logger.info("Step 2/4: Calculating metrics...")
        metrics = calculate_weekly_metrics(sales_data)
        logger.info(
            f"  Total revenue: ${metrics['total_revenue']:,.2f} | "
            f"WoW change: {metrics['wow_change_pct']:+.1f}%"
        )

        # Step 3: Generate Excel report
        logger.info("Step 3/4: Generating Excel report...")
        report_path = generate_excel_report(
            metrics=metrics,
            sales_data=sales_data,
            output_dir="reports/",
        )
        logger.info(f"  Report saved: {report_path}")

        # Step 4: Send email
        logger.info("Step 4/4: Sending email to Sandra Chen...")
        send_report_email(
            report_path=report_path,
            to_email="sandra.chen@acme.com",
            subject=f"Weekly Sales Report — Week of {metrics['week_label']}",
        )
        logger.info(f"  Email sent to sandra.chen@acme.com")

        logger.info(f"Pipeline COMPLETE — Monday report delivered successfully")
        return True

    except FileNotFoundError as e:
        logger.error(f"Pipeline FAILED: Missing data file. {e}")
        logger.error("Check that weekly_sales CSV files are present in data/ directory")
        return False

    except Exception as e:
        logger.exception(f"Pipeline FAILED with unexpected error: {e}")
        return False


# Schedule the pipeline
schedule.every().monday.at("07:45").do(run_monday_report_pipeline)

if __name__ == "__main__":
    configure_scheduler_logging()
    logger.info("Monday Report Scheduler — starting up")
    logger.info("Scheduled: Every Monday at 07:45")

    # Option: run immediately if it's Monday and past 7:45 (catch-up logic)
    if datetime.now().weekday() == 0 and datetime.now().hour >= 7:
        logger.info("It's Monday — running pipeline immediately for catch-up check")
        run_monday_report_pipeline()

    # Main loop
    while True:
        schedule.run_pending()
        time.sleep(30)

22.8 Deployment Considerations

A scheduler that only runs when you open a terminal window is not production automation.

Keeping the Process Running

On a personal Windows machine: Use the Windows Task Scheduler to launch your scheduler script at system startup, not at a specific time. The script runs continuously and handles its own scheduling:

Trigger: At startup
Action: python C:\Acme\scheduler\run_scheduler.py
Run in background: Yes

On a Windows server: Use Windows Services or NSSM (Non-Sucking Service Manager), a free utility that wraps any process as a Windows service.

On Linux: Use systemd to create a service:

# /etc/systemd/system/acme-scheduler.service
[Unit]
Description=Acme Corp Report Scheduler
After=network.target

[Service]
Type=simple
User=priya
WorkingDirectory=/home/priya/acme-reports
ExecStart=/home/priya/.venv/bin/python /home/priya/acme-reports/run_scheduler.py
Restart=always
RestartSec=30
StandardOutput=append:/var/log/acme-scheduler.log
StandardError=append:/var/log/acme-scheduler.log

[Install]
WantedBy=multi-user.target

Enable and start:

sudo systemctl enable acme-scheduler
sudo systemctl start acme-scheduler
sudo systemctl status acme-scheduler

The OS-Level vs. Python-Level Tradeoff

| Concern                  | Python scheduler              | OS scheduler                   |
|--------------------------|-------------------------------|--------------------------------|
| Setup complexity         | Low                           | Medium                         |
| Requires running process | Yes                           | No                             |
| Survives system restart  | Only if service is configured | Yes (built-in)                 |
| Logging                  | Managed by your code          | Managed by OS + your code      |
| Monitoring               | Your responsibility           | OS provides task history       |
| Multiple jobs            | Easy in one process           | One task per job definition    |
| Timezone handling        | Explicit with pytz            | OS handles via system timezone |

For a single script running on a personal machine: use schedule with Windows Task Scheduler or cron to launch it at startup.

For a server running multiple automated jobs: use APScheduler with a persistent job store.

For a few independent scripts on a server: use OS-level scheduling (cron or Task Scheduler) with each script logging to its own file.

Environment Variables on Scheduled Processes

This is a common gotcha. Environment variables set in your terminal are not automatically available to scheduled processes. The .env file approach from Chapter 21 is the cleanest solution for scheduled scripts:

# At the top of every script run by a scheduler
from dotenv import load_dotenv
import os

# Explicit path to .env file — don't rely on the working directory
load_dotenv(dotenv_path="/absolute/path/to/project/.env")

smtp_password = os.environ.get("SMTP_PASSWORD")
api_key = os.environ.get("NEWS_API_KEY")

For production servers, set environment variables at the system service level rather than using .env files.
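A startup guard makes a missing-variable failure loud instead of mysterious. A sketch — the variable names are the ones used above:

```python
import os
import sys

REQUIRED_ENV_VARS = ["SMTP_PASSWORD", "NEWS_API_KEY"]

def check_required_env(required=REQUIRED_ENV_VARS):
    """Return True if every required variable is set; otherwise report
    the missing names to stderr and return False. Call this first thing
    in any script a scheduler launches."""
    missing = [name for name in required if not os.environ.get(name)]
    if missing:
        print(f"FATAL: missing environment variables: {', '.join(missing)}",
              file=sys.stderr)
        return False
    return True
```

A scheduled script can exit immediately with a logged error when this returns False, rather than failing halfway through a pipeline.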


22.9 Complete End-to-End Example: Priya's Monday Report

Let's trace the complete implementation, from the raw data files on disk to Sandra Chen's inbox.

The Data

Acme's sales team drops a CSV file into data/weekly_sales/ every Sunday evening. The file is named acme_sales_2024_W43.csv (ISO week numbering). It has columns: date, region, product_category, product_sku, quantity, unit_price, cogs_per_unit.

The Full Configuration

# config.py — all configuration in one place
from pathlib import Path

# Paths
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / "data" / "weekly_sales"
REPORTS_DIR = BASE_DIR / "reports"
LOGS_DIR = BASE_DIR / "logs"

# Report configuration
REPORT_RECIPIENTS = ["sandra.chen@acme.com"]
REPORT_CC = ["marcus.webb@acme.com"]
REPORT_SENDER_NAME = "Acme Analytics"

# Schedule
MONDAY_REPORT_TIME = "07:45"
DAILY_EXCHANGE_RATE_TIME = "06:00"

# Ensure directories exist
DATA_DIR.mkdir(parents=True, exist_ok=True)
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
LOGS_DIR.mkdir(parents=True, exist_ok=True)

What the Schedule Looks Like

# run_scheduler.py
import schedule
import time
import logging
from config import MONDAY_REPORT_TIME, DAILY_EXCHANGE_RATE_TIME

from pipeline.monday_report import run_monday_report_pipeline
from pipeline.exchange_rates import run_daily_exchange_rate_pull
from pipeline.health_check import run_system_health_check

# Monday morning: full report pipeline
schedule.every().monday.at(MONDAY_REPORT_TIME).do(run_monday_report_pipeline)

# Every weekday morning: exchange rate update. schedule has no
# weekday shortcut, so register the job once for each day:
for day_job in (schedule.every().monday, schedule.every().tuesday,
                schedule.every().wednesday, schedule.every().thursday,
                schedule.every().friday):
    day_job.at(DAILY_EXCHANGE_RATE_TIME).do(run_daily_exchange_rate_pull)

# Every hour: health check (confirm data files are being deposited)
schedule.every().hour.do(run_system_health_check)

logging.info("Scheduler configured. Waiting for jobs...")
while True:
    schedule.run_pending()
    time.sleep(60)

22.10 Monitoring Your Scheduled Jobs

A scheduler that runs silently in the background is only trustworthy if you have a way to verify it is actually running.

Log Monitoring

The simplest monitoring: check the log file. Each job writes a start and end message. A missing entry means the job didn't run.

For Windows, use PowerShell to check the last job run:

# Show the last five log lines mentioning "Monday Report Pipeline"
Select-String -Path "logs\scheduler.log" -Pattern "Monday Report Pipeline" | Select-Object -Last 5

For Linux/macOS:

# Check the last 20 lines of the log
tail -20 logs/scheduler.log

# Find all failures in the last week
grep "FAILED\|ERROR" logs/scheduler.log | tail -30

Health Check Job

Add a "heartbeat" job that writes a timestamp to a known file. You can check whether the file is recent to confirm the scheduler is alive:

from pathlib import Path
from datetime import datetime

def scheduler_heartbeat():
    """Write current timestamp to a heartbeat file. Confirms scheduler is alive."""
    heartbeat_path = Path("logs/scheduler_heartbeat.txt")
    with open(heartbeat_path, "w") as f:
        f.write(datetime.now().isoformat())


schedule.every(5).minutes.do(scheduler_heartbeat)


def check_scheduler_alive(max_age_minutes=10):
    """
    Return True if the scheduler has run a heartbeat within the last max_age_minutes.
    Can be called from a separate monitoring script.
    """
    heartbeat_path = Path("logs/scheduler_heartbeat.txt")
    if not heartbeat_path.exists():
        return False

    last_modified_seconds = (
        datetime.now().timestamp() - heartbeat_path.stat().st_mtime
    )
    return last_modified_seconds < (max_age_minutes * 60)

Chapter Summary

Scheduling transforms scripts from tools into infrastructure. The key ideas:

  • The schedule library provides a simple, readable API for in-process job scheduling. Use it for scripts that run on a personal machine where a Python process can remain running.

  • APScheduler provides production-grade features: cron expressions, timezone awareness, persistent job stores, and background execution. Use it when your scheduling requirements outgrow schedule.

  • OS-level scheduling (Windows Task Scheduler / cron) is the most reliable approach for production: it launches Python as a fresh process, survives reboots, and doesn't require a running Python process.

  • Error handling in scheduled jobs is non-negotiable. Every job must catch its own exceptions, log them, and continue — a failed job should never crash the scheduler.

  • Logging is how you know what's working. Time-rotating log files, structured log messages with job IDs, and heartbeat files give you the visibility to trust your automation.

  • Deployment matters. A script that only runs when the terminal is open is not production automation. Configure a service, a startup task, or a system-level scheduler to ensure jobs run unattended.

Priya's Monday report now runs at 7:45 AM every Monday. Sandra receives it before the 9 AM meeting. Priya doesn't think about it anymore. That is the goal.


Next: Chapter 23 — Database Basics: SQL + Python with SQLite and PostgreSQL