Scheduling and Logging Automation Jobs

A script that runs only when you remember to start it is not automation — it is manual work with extra steps. The gap between a working script and a reliable unattended job comes down to three concerns: triggering it on a schedule, recording what happened, and recovering (or alerting) when something breaks. This guide covers all three for document and data pipelines.

Prerequisites

# pip install schedule tenacity
python -m venv .venv
source .venv/bin/activate          # Windows: .venv\Scripts\activate
pip install schedule tenacity

# Verify
python -c "import schedule, tenacity; print('OK')"

For email alerts you also need access to an SMTP server (or a free transactional relay such as SendGrid). For webhook alerts, any HTTP endpoint works — Slack incoming webhooks are the most common choice.

Diagnostic: Confirm Your Runtime Environment

Before scheduling anything, confirm the Python and file paths the scheduler will actually see. Cron and Task Scheduler both run with a minimal environment; your interactive shell's PATH and activated virtualenv are absent.

# pip install (none — stdlib only)
"""
Run this script from the scheduler (not your terminal) and check the log file.
It captures the runtime environment so you can diagnose PATH/venv issues before
adding real pipeline logic.
"""
import sys
import os
import logging
from pathlib import Path

LOG_PATH = Path("/tmp/env_check.log")

logging.basicConfig(
    filename=LOG_PATH,
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)s %(message)s",
)

logging.info("Python: %s", sys.executable)
logging.info("Version: %s", sys.version)
logging.info("CWD: %s", Path.cwd())
logging.info("PATH: %s", os.environ.get("PATH", "(not set)"))
logging.info("VIRTUAL_ENV: %s", os.environ.get("VIRTUAL_ENV", "(not set)"))
logging.info("Script location: %s", Path(__file__).resolve())

Schedule this script first. If the log file shows the wrong Python executable, fix your scheduler invocation (absolute path to the venv interpreter) before proceeding.

Core Implementation

Step 1 — Wrap Your Job in a Callable

Every scheduler expects a zero-argument callable. Wrap your pipeline logic in a function that accepts no positional arguments and returns on success or raises on failure.

# pip install pdfplumber pandas openpyxl
"""
Step 1: Wrap the pipeline as a zero-argument callable.
Pairs with the extracting-PDF-data step that feeds data into pandas.
"""
import logging
from pathlib import Path

import pdfplumber
import pandas as pd

INPUT_DIR = Path("/data/incoming")
OUTPUT_PATH = Path("/data/reports/daily_summary.xlsx")

logger = logging.getLogger(__name__)


def run_daily_pipeline() -> None:
    """Extract tables from every PDF in INPUT_DIR and write a combined Excel report."""
    frames = []
    pdf_files = list(INPUT_DIR.glob("*.pdf"))
    if not pdf_files:
        logger.warning("No PDFs found in %s", INPUT_DIR)
        return

    for pdf_path in pdf_files:
        try:
            with pdfplumber.open(pdf_path) as pdf:
                for page in pdf.pages:
                    table = page.extract_table()
                    if table:
                        df = pd.DataFrame(table[1:], columns=table[0])
                        df["_source"] = pdf_path.name
                        frames.append(df)
        except Exception:
            logger.exception("Failed to parse %s", pdf_path.name)

    if not frames:
        logger.warning("No tables extracted — nothing to write")
        return

    combined = pd.concat(frames, ignore_index=True)
    OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    combined.to_excel(OUTPUT_PATH, index=False)
    logger.info("Wrote %d rows to %s", len(combined), OUTPUT_PATH)

The same wrapping pattern applies whether your pipeline uses pdfplumber to pull raw tables or goes through the full Extracting PDF Data into pandas workflow first.

Step 2 — Add Structured Logging

The stdlib logging module is sufficient for most pipelines. Use a RotatingFileHandler so logs do not grow unbounded, and emit at the right level so production logs stay readable.

# pip install (none — stdlib only)
"""
Step 2: Configure structured, rotating file logging.
Call configure_logging() once at the top of your entry-point script.
"""
import logging
import logging.handlers
from pathlib import Path

LOG_DIR = Path("/var/log/doc-pipeline")


def configure_logging(name: str, level: int = logging.INFO) -> logging.Logger:
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    log_file = LOG_DIR / f"{name}.log"

    handler = logging.handlers.RotatingFileHandler(
        log_file,
        maxBytes=5 * 1024 * 1024,   # 5 MB per file
        backupCount=7,               # keep one week of rotations
        encoding="utf-8",
    )
    formatter = logging.Formatter(
        fmt="%(asctime)s %(name)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )
    handler.setFormatter(formatter)

    root = logging.getLogger()
    root.setLevel(level)
    root.addHandler(handler)

    # Also write WARNING+ to stderr so cron can capture it in its mail
    stderr_handler = logging.StreamHandler()
    stderr_handler.setLevel(logging.WARNING)
    stderr_handler.setFormatter(formatter)
    root.addHandler(stderr_handler)

    return logging.getLogger(name)

Log at DEBUG inside inner loops, INFO at job boundaries (started / finished N rows), WARNING for recoverable anomalies (empty input, skipped file), and ERROR/CRITICAL for failures that need attention.

Step 3 — Add Retry and Backoff

Transient failures — network timeouts, locked files, momentarily unavailable APIs — should not abort the whole job. Use tenacity for declarative retry logic, or a hand-rolled decorator when you want zero extra dependencies.

# pip install tenacity
"""
Step 3: Retry with exponential backoff using tenacity.
Apply @retry_transient to any function that calls a network or file-system resource.
"""
import logging
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
)

logger = logging.getLogger(__name__)

# Tenacity decorator — 4 attempts, 2 s → 4 s → 8 s backoff
retry_transient = retry(
    reraise=True,
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    retry=retry_if_exception_type((IOError, TimeoutError, ConnectionError)),
    before_sleep=before_sleep_log(logger, logging.WARNING),
)


@retry_transient
def fetch_remote_report(url: str) -> bytes:
    import urllib.request
    with urllib.request.urlopen(url, timeout=15) as resp:
        return resp.read()


# Hand-rolled alternative (zero deps)
import functools
import time


def retry(attempts: int = 3, delay: float = 2.0, backoff: float = 2.0):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as exc:
                    if attempt == attempts:
                        raise
                    logger.warning(
                        "%s attempt %d/%d failed: %s — retrying in %.1fs",
                        fn.__name__, attempt, attempts, exc, wait,
                    )
                    time.sleep(wait)
                    wait *= backoff
        return wrapper
    return decorator

Step 4 — Schedule the Job

Pick the scheduler that matches your environment.

cron (Linux/macOS)

# Run daily_pipeline.py every weekday at 06:30 using the venv interpreter
crontab -e
30 6 * * 1-5 /data/.venv/bin/python /data/scripts/daily_pipeline.py >> /var/log/doc-pipeline/cron.log 2>&1

Always use the absolute path to the venv's python, not python3 or python. The >> ... 2>&1 captures any stderr (uncaught exceptions) alongside cron's own output mail.

Windows Task Scheduler

rem Trigger: Daily at 06:30
rem Program/script: C:\data\.venv\Scripts\python.exe
rem Arguments:    C:\data\scripts\daily_pipeline.py
rem Start in:     C:\data\scripts

Set "Run whether user is logged on or not" and "Run with highest privileges" if the script writes to protected paths.

GitHub Actions (CI/cloud)

# .github/workflows/daily_pipeline.yml
name: Daily Document Pipeline

on:
  schedule:
    - cron: "30 6 * * 1-5"   # UTC
  workflow_dispatch:           # allow manual trigger

jobs:
  run-pipeline:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install deps
        run: pip install -r requirements.txt
      - name: Run pipeline
        env:
          SMTP_HOST: ${{ secrets.SMTP_HOST }}
          SMTP_PASS: ${{ secrets.SMTP_PASS }}
        run: python scripts/daily_pipeline.py --output /tmp/report.xlsx
      - name: Upload report
        uses: actions/upload-artifact@v4
        with:
          name: daily-report
          path: /tmp/report.xlsx

schedule library (long-running process)

# pip install schedule
"""
Use the schedule library when your process runs continuously (e.g., inside a container
or a systemd service) rather than being launched fresh by cron.
"""
import time
import logging
import schedule

logger = logging.getLogger(__name__)


def job_with_guard() -> None:
    logger.info("Job started")
    try:
        run_daily_pipeline()          # from Step 1
        logger.info("Job finished OK")
    except Exception:
        logger.exception("Job failed")


schedule.every().day.at("06:30").do(job_with_guard)

if __name__ == "__main__":
    logger.info("Scheduler started — waiting for next run")
    while True:
        schedule.run_pending()
        time.sleep(30)

Scheduling and logging job flow A scheduler trigger fires a job run. The job writes to a log. On failure it retries with backoff; on persistent failure it sends an alert. On success it writes output and exits 0. Scheduler cron / Actions Lock check skip if running Job run logging + retry/backoff Success exit 0, write output Failure alert email / webhook Rotating log .log + backups already running → skip

Edge Cases and Variants

Overlapping Runs and Lock Files

If your job takes longer than its scheduling interval, two instances can run simultaneously and corrupt shared output files. A lock file prevents this.

# pip install (none — stdlib only)
"""
Lock file guard — place at the start of your entry-point script.
Uses O_CREAT | O_EXCL so that creation is atomic on POSIX systems.
"""
import os
import sys
import logging
from pathlib import Path

LOCK_PATH = Path("/tmp/doc_pipeline.lock")
logger = logging.getLogger(__name__)


def acquire_lock() -> bool:
    try:
        fd = os.open(LOCK_PATH, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        os.write(fd, str(os.getpid()).encode())
        os.close(fd)
        return True
    except FileExistsError:
        return False


def release_lock() -> None:
    try:
        LOCK_PATH.unlink()
    except FileNotFoundError:
        pass


if __name__ == "__main__":
    if not acquire_lock():
        logger.warning("Another instance is running — exiting")
        sys.exit(0)
    try:
        run_daily_pipeline()   # your job from Step 1
    finally:
        release_lock()

On Windows use msvcrt.locking or a named mutex instead of O_EXCL.

Failure Alerting (Email and Webhook)

# pip install (none — stdlib only; requests for webhook)
"""
Send an alert on job failure. Call send_email_alert() or send_webhook_alert()
inside your except block.
"""
import smtplib
import json
import urllib.request
import os
from email.message import EmailMessage
from pathlib import Path

SMTP_HOST = os.environ.get("SMTP_HOST", "smtp.example.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "587"))
SMTP_USER = os.environ.get("SMTP_USER", "[email protected]")
SMTP_PASS = os.environ.get("SMTP_PASS", "")
ALERT_TO = os.environ.get("ALERT_TO", "[email protected]")
WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")


def send_email_alert(subject: str, body: str) -> None:
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = SMTP_USER
    msg["To"] = ALERT_TO
    msg.set_content(body)
    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as s:
            s.starttls()
            s.login(SMTP_USER, SMTP_PASS)
            s.send_message(msg)
    except Exception:
        import logging
        logging.getLogger(__name__).exception("Failed to send alert email")


def send_webhook_alert(text: str) -> None:
    if not WEBHOOK_URL:
        return
    payload = json.dumps({"text": text}).encode()
    req = urllib.request.Request(
        WEBHOOK_URL,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=10):
            pass
    except Exception:
        import logging
        logging.getLogger(__name__).exception("Failed to send webhook alert")

Store credentials in environment variables, never in source code. On GitHub Actions they go in repository Secrets; on a server use a .env file owned by the service account.

Idempotency

A job that can safely re-run without duplicating output is far easier to operate. Track processed files in a small SQLite ledger.

# pip install (none — stdlib only)
"""
Idempotency ledger: record each processed file by content hash.
Re-running the job skips files already in the ledger.
"""
import sqlite3
import hashlib
from pathlib import Path

LEDGER_PATH = Path("/data/pipeline_ledger.db")


def _get_db() -> sqlite3.Connection:
    conn = sqlite3.connect(LEDGER_PATH)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed "
        "(file_hash TEXT PRIMARY KEY, path TEXT, processed_at TEXT)"
    )
    conn.commit()
    return conn


def file_hash(path: Path) -> str:
    h = hashlib.sha256()
    h.update(path.read_bytes())
    return h.hexdigest()


def already_processed(path: Path) -> bool:
    fh = file_hash(path)
    conn = _get_db()
    row = conn.execute(
        "SELECT 1 FROM processed WHERE file_hash = ?", (fh,)
    ).fetchone()
    conn.close()
    return row is not None


def mark_processed(path: Path) -> None:
    from datetime import datetime, timezone
    fh = file_hash(path)
    conn = _get_db()
    conn.execute(
        "INSERT OR IGNORE INTO processed (file_hash, path, processed_at) VALUES (?, ?, ?)",
        (fh, str(path), datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()
    conn.close()

Validation and Health Check

After the job runs, assert the output is sane before writing it to its final destination.

# pip install pandas openpyxl
"""
Validation step: check row count, required columns, and absence of all-null rows.
Raises ValueError on any anomaly so the caller can catch and alert.
"""
import pandas as pd
from pathlib import Path


def validate_output(path: Path, min_rows: int = 1) -> None:
    df = pd.read_excel(path)
    if len(df) < min_rows:
        raise ValueError(f"Output has {len(df)} rows — expected at least {min_rows}")

    required_cols = {"date", "amount", "_source"}
    missing = required_cols - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {missing}")

    if df.isnull().all(axis=1).any():
        raise ValueError("Output contains fully null rows")

The same validation pattern applies when Generating Reports from Pipeline Data — check the report's row count and spot-sample a few cells before emailing it.

Performance and Scale Notes

  • Memory: if INPUT_DIR holds hundreds of large PDFs, do not load all frames into a list. Write each parsed DataFrame to a staging Parquet file, then concatenate from disk at the end.
  • Chunking: for large CSVs fed into the pipeline, use pd.read_csv(..., chunksize=10_000) and process each chunk inside the retry-wrapped function.
  • Parallelism: concurrent.futures.ProcessPoolExecutor works well for CPU-bound OCR batches; keep pool size to os.cpu_count() - 1 to leave headroom for the OS.
  • GitHub Actions minutes: free tier gives 2,000 minutes/month. A 5-minute daily pipeline costs ~150 minutes/month — well within budget. Cache the pip install step with actions/cache to cut run time by ~80%.
  • schedule library drift: the schedule library accumulates small timing drift over days. For production use with strict timing, prefer cron or GitHub Actions, which are backed by system clocks.

Troubleshooting

SymptomRoot causeFix
ModuleNotFoundError in cron but not in terminalCron uses /usr/bin/python3, not your venvUse the absolute path to .venv/bin/python in the crontab line
Script runs in terminal but not in Task Scheduler"Start in" directory not set; relative paths resolve against C:\Windows\System32Set "Start in" to the script's directory; use Path(__file__).parent for relative paths
FileNotFoundError for a dependency binary (e.g., Tesseract, Java)Cron's PATH is /usr/bin:/bin — system-installed tools may be in /usr/local/binAdd PATH=/usr/local/bin:/usr/bin:/bin at the top of the crontab, or use absolute paths in subprocess.run()
Lock file left behind after a crashPrevious run died before calling release_lock()Add a staleness check: if the PID in the lock file is not in psutil.pids(), delete it and proceed
GitHub Actions job silently skipsCron schedule is parsed in UTC; your intended local time differsConvert your local time to UTC explicitly; add workflow_dispatch: for manual testing

Complete Working Script

# pip install pdfplumber pandas openpyxl tenacity schedule
"""
daily_pipeline.py — self-contained document pipeline with scheduling, logging,
lock-file guard, retry, idempotency, validation, and failure alerting.

Usage:
  python daily_pipeline.py                 # run once and exit
  python daily_pipeline.py --daemon        # run on internal schedule (06:30 daily)
  python daily_pipeline.py --input /path   # override input directory
"""
import argparse
import hashlib
import logging
import logging.handlers
import os
import sqlite3
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

import pdfplumber
import pandas as pd
import schedule
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log,
)

# ── Config (override via CLI or environment) ──────────────────────────────────
INPUT_DIR = Path(os.environ.get("PIPELINE_INPUT_DIR", "/data/incoming"))
OUTPUT_DIR = Path(os.environ.get("PIPELINE_OUTPUT_DIR", "/data/reports"))
LOG_DIR = Path(os.environ.get("PIPELINE_LOG_DIR", "/var/log/doc-pipeline"))
LEDGER_PATH = Path(os.environ.get("PIPELINE_LEDGER", "/data/pipeline_ledger.db"))
LOCK_PATH = Path("/tmp/doc_pipeline.lock")
SMTP_HOST = os.environ.get("SMTP_HOST", "")
SMTP_USER = os.environ.get("SMTP_USER", "")
SMTP_PASS = os.environ.get("SMTP_PASS", "")
ALERT_TO = os.environ.get("ALERT_TO", "")
WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")

# ── Logging setup ─────────────────────────────────────────────────────────────

def configure_logging() -> logging.Logger:
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    handler = logging.handlers.RotatingFileHandler(
        LOG_DIR / "pipeline.log",
        maxBytes=5 * 1024 * 1024,
        backupCount=7,
        encoding="utf-8",
    )
    fmt = logging.Formatter(
        "%(asctime)s %(name)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )
    handler.setFormatter(fmt)
    stderr = logging.StreamHandler()
    stderr.setLevel(logging.WARNING)
    stderr.setFormatter(fmt)
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(handler)
    root.addHandler(stderr)
    return logging.getLogger("pipeline")


logger = configure_logging()

# ── Lock file ─────────────────────────────────────────────────────────────────

def acquire_lock() -> bool:
    try:
        fd = os.open(LOCK_PATH, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        os.write(fd, str(os.getpid()).encode())
        os.close(fd)
        return True
    except FileExistsError:
        return False


def release_lock() -> None:
    try:
        LOCK_PATH.unlink()
    except FileNotFoundError:
        pass

# ── Idempotency ledger ────────────────────────────────────────────────────────

def _db() -> sqlite3.Connection:
    conn = sqlite3.connect(LEDGER_PATH)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed "
        "(file_hash TEXT PRIMARY KEY, path TEXT, processed_at TEXT)"
    )
    conn.commit()
    return conn


def _hash(path: Path) -> str:
    return hashlib.sha256(path.read_bytes()).hexdigest()


def already_done(path: Path) -> bool:
    fh = _hash(path)
    conn = _db()
    found = conn.execute(
        "SELECT 1 FROM processed WHERE file_hash = ?", (fh,)
    ).fetchone()
    conn.close()
    return found is not None


def mark_done(path: Path) -> None:
    conn = _db()
    conn.execute(
        "INSERT OR IGNORE INTO processed (file_hash, path, processed_at) VALUES (?, ?, ?)",
        (_hash(path), str(path), datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()
    conn.close()

# ── Retry decorator ───────────────────────────────────────────────────────────

retry_io = retry(
    reraise=True,
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    retry=retry_if_exception_type((IOError, TimeoutError)),
    before_sleep=before_sleep_log(logger, logging.WARNING),
)

# ── Alert helpers ─────────────────────────────────────────────────────────────

def _alert(subject: str, body: str) -> None:
    import json
    import smtplib
    import urllib.request
    from email.message import EmailMessage

    if SMTP_HOST and ALERT_TO:
        try:
            msg = EmailMessage()
            msg["Subject"] = subject
            msg["From"] = SMTP_USER
            msg["To"] = ALERT_TO
            msg.set_content(body)
            with smtplib.SMTP(SMTP_HOST, 587) as s:
                s.starttls()
                s.login(SMTP_USER, SMTP_PASS)
                s.send_message(msg)
        except Exception:
            logger.exception("Email alert failed")

    if WEBHOOK_URL:
        payload = json.dumps({"text": f"*{subject}*\n{body}"}).encode()
        req = urllib.request.Request(
            WEBHOOK_URL,
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=10):
                pass
        except Exception:
            logger.exception("Webhook alert failed")

# ── Core extraction ───────────────────────────────────────────────────────────

@retry_io
def _extract_pdf(pdf_path: Path) -> list[pd.DataFrame]:
    frames = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            table = page.extract_table()
            if table and len(table) > 1:
                df = pd.DataFrame(table[1:], columns=table[0])
                df["_source"] = pdf_path.name
                frames.append(df)
    return frames

# ── Main job ──────────────────────────────────────────────────────────────────

def run_job(input_dir: Path, output_dir: Path) -> None:
    logger.info("Job started — scanning %s", input_dir)
    pdf_files = [p for p in input_dir.glob("*.pdf") if not already_done(p)]

    if not pdf_files:
        logger.info("No new PDFs to process")
        return

    frames: list[pd.DataFrame] = []
    for pdf_path in pdf_files:
        try:
            extracted = _extract_pdf(pdf_path)
            frames.extend(extracted)
            mark_done(pdf_path)
            logger.info("Parsed %s%d table(s)", pdf_path.name, len(extracted))
        except Exception:
            logger.exception("Skipping %s after retries exhausted", pdf_path.name)

    if not frames:
        logger.warning("No tables extracted from %d file(s)", len(pdf_files))
        return

    combined = pd.concat(frames, ignore_index=True)
    output_dir.mkdir(parents=True, exist_ok=True)
    today = datetime.now(timezone.utc).strftime("%Y%m%d")
    out_path = output_dir / f"pipeline_{today}.xlsx"
    combined.to_excel(out_path, index=False)
    logger.info("Wrote %d rows to %s", len(combined), out_path)


def run_with_guard(input_dir: Path, output_dir: Path) -> None:
    if not acquire_lock():
        logger.warning("Another instance is running — skipping this run")
        return
    try:
        run_job(input_dir, output_dir)
    except Exception as exc:
        logger.exception("Job failed with unhandled exception")
        _alert(
            subject="[doc-pipeline] Job FAILED",
            body=f"Exception: {exc}\nSee {LOG_DIR / 'pipeline.log'} for details.",
        )
        sys.exit(1)
    finally:
        release_lock()

# ── Entry point ───────────────────────────────────────────────────────────────

def main() -> None:
    parser = argparse.ArgumentParser(description="Document extraction pipeline")
    parser.add_argument("--input", type=Path, default=INPUT_DIR)
    parser.add_argument("--output", type=Path, default=OUTPUT_DIR)
    parser.add_argument(
        "--daemon",
        action="store_true",
        help="Run on internal schedule (06:30 daily) instead of once",
    )
    args = parser.parse_args()

    if args.daemon:
        logger.info("Daemon mode — scheduling daily at 06:30")
        schedule.every().day.at("06:30").do(
            run_with_guard, input_dir=args.input, output_dir=args.output
        )
        while True:
            schedule.run_pending()
            time.sleep(30)
    else:
        run_with_guard(input_dir=args.input, output_dir=args.output)


if __name__ == "__main__":
    main()

Part of Automating Document & Data Pipelines.