Scheduling and Logging Automation Jobs
A script that runs only when you remember to start it is not automation — it is manual work with extra steps. The gap between a working script and a reliable unattended job comes down to three concerns: triggering it on a schedule, recording what happened, and recovering (or alerting) when something breaks. This guide covers all three for document and data pipelines.
Prerequisites
# pip install schedule tenacity
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install schedule tenacity
# Verify
python -c "import schedule, tenacity; print('OK')"
For email alerts you also need access to an SMTP server (or a free transactional relay such as SendGrid). For webhook alerts, any HTTP endpoint works — Slack incoming webhooks are the most common choice.
Diagnostic: Confirm Your Runtime Environment
Before scheduling anything, confirm the Python and file paths the scheduler will actually see. Cron and Task Scheduler both run with a minimal environment; your interactive shell's PATH and activated virtualenv are absent.
# pip install (none — stdlib only)
"""
Run this script from the scheduler (not your terminal) and check the log file.
It captures the runtime environment so you can diagnose PATH/venv issues before
adding real pipeline logic.
"""
import sys
import os
import logging
from pathlib import Path
LOG_PATH = Path("/tmp/env_check.log")
logging.basicConfig(
filename=LOG_PATH,
level=logging.DEBUG,
format="%(asctime)s %(levelname)s %(message)s",
)
logging.info("Python: %s", sys.executable)
logging.info("Version: %s", sys.version)
logging.info("CWD: %s", Path.cwd())
logging.info("PATH: %s", os.environ.get("PATH", "(not set)"))
logging.info("VIRTUAL_ENV: %s", os.environ.get("VIRTUAL_ENV", "(not set)"))
logging.info("Script location: %s", Path(__file__).resolve())
Schedule this script first. If the log file shows the wrong Python executable, fix your scheduler invocation (absolute path to the venv interpreter) before proceeding.
Core Implementation
Step 1 — Wrap Your Job in a Callable
Every scheduler expects a zero-argument callable. Wrap your pipeline logic in a function that accepts no positional arguments and returns on success or raises on failure.
# pip install pdfplumber pandas openpyxl
"""
Step 1: Wrap the pipeline as a zero-argument callable.
Pairs with the extracting-PDF-data step that feeds data into pandas.
"""
import logging
from pathlib import Path
import pdfplumber
import pandas as pd
INPUT_DIR = Path("/data/incoming")
OUTPUT_PATH = Path("/data/reports/daily_summary.xlsx")
logger = logging.getLogger(__name__)
def run_daily_pipeline() -> None:
    """Extract tables from every PDF in INPUT_DIR and write a combined Excel report."""
    frames = []
    pdf_files = list(INPUT_DIR.glob("*.pdf"))
    if not pdf_files:
        logger.warning("No PDFs found in %s", INPUT_DIR)
        return
    for pdf_path in pdf_files:
        try:
            with pdfplumber.open(pdf_path) as pdf:
                for page in pdf.pages:
                    table = page.extract_table()
                    if table:
                        df = pd.DataFrame(table[1:], columns=table[0])
                        df["_source"] = pdf_path.name
                        frames.append(df)
        except Exception:
            logger.exception("Failed to parse %s", pdf_path.name)
    if not frames:
        logger.warning("No tables extracted — nothing to write")
        return
    combined = pd.concat(frames, ignore_index=True)
    OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    combined.to_excel(OUTPUT_PATH, index=False)
    logger.info("Wrote %d rows to %s", len(combined), OUTPUT_PATH)
The same wrapping pattern applies whether your pipeline uses pdfplumber to pull raw tables or goes through the full Extracting PDF Data into pandas workflow first.
Step 2 — Add Structured Logging
The stdlib logging module is sufficient for most pipelines. Use a RotatingFileHandler so logs do not grow unbounded, and emit at the right level so production logs stay readable.
# pip install (none — stdlib only)
"""
Step 2: Configure structured, rotating file logging.
Call configure_logging() once at the top of your entry-point script.
"""
import logging
import logging.handlers
from pathlib import Path
LOG_DIR = Path("/var/log/doc-pipeline")
def configure_logging(name: str, level: int = logging.INFO) -> logging.Logger:
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    log_file = LOG_DIR / f"{name}.log"
    handler = logging.handlers.RotatingFileHandler(
        log_file,
        maxBytes=5 * 1024 * 1024,  # 5 MB per file
        backupCount=7,             # keep 7 rotated files (rotation is by size, not by day)
        encoding="utf-8",
    )
    formatter = logging.Formatter(
        fmt="%(asctime)s %(name)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )
    handler.setFormatter(formatter)
    root = logging.getLogger()
    root.setLevel(level)
    root.addHandler(handler)
    # Also write WARNING+ to stderr so cron can capture it in its mail
    stderr_handler = logging.StreamHandler()
    stderr_handler.setLevel(logging.WARNING)
    stderr_handler.setFormatter(formatter)
    root.addHandler(stderr_handler)
    return logging.getLogger(name)
Log at DEBUG inside inner loops, INFO at job boundaries (started / finished N rows), WARNING for recoverable anomalies (empty input, skipped file), and ERROR/CRITICAL for failures that need attention.
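As a rough illustration of that level guidance, the sketch below logs a toy job at each level. The process_files function, the logger name, and the file names are hypothetical and stand in for real pipeline logic.

```python
import logging

logger = logging.getLogger("pipeline.demo")  # hypothetical logger name


def process_files(names: list[str]) -> int:
    """Pretend to process files, logging at the levels suggested above."""
    logger.info("Job started: %d file(s) queued", len(names))  # job boundary
    done = 0
    for name in names:
        logger.debug("Inspecting %s", name)  # inner-loop detail
        if not name.endswith(".pdf"):
            logger.warning("Skipping non-PDF file %s", name)  # recoverable anomaly
            continue
        done += 1
    if names and done == 0:
        logger.error("No usable input among %d file(s)", len(names))  # needs attention
    logger.info("Job finished: %d file(s) processed", done)  # job boundary
    return done


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(message)s")
    process_files(["a.pdf", "notes.txt"])
```

With the root level left at INFO in production, the DEBUG lines disappear without any code change.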
Step 3 — Add Retry and Backoff
Transient failures — network timeouts, locked files, momentarily unavailable APIs — should not abort the whole job. Use tenacity for declarative retry logic, or a hand-rolled decorator when you want zero extra dependencies.
# pip install tenacity
"""
Step 3: Retry with exponential backoff using tenacity.
Apply @retry_transient to any function that calls a network or file-system resource.
"""
import logging
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
before_sleep_log,
)
logger = logging.getLogger(__name__)
# Tenacity decorator: up to 4 attempts with exponential backoff; each wait is clamped to 2-30 s
retry_transient = retry(
    reraise=True,
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    retry=retry_if_exception_type((IOError, TimeoutError, ConnectionError)),
    before_sleep=before_sleep_log(logger, logging.WARNING),
)


@retry_transient
def fetch_remote_report(url: str) -> bytes:
    import urllib.request
    with urllib.request.urlopen(url, timeout=15) as resp:
        return resp.read()


# Hand-rolled alternative (zero deps). Named simple_retry so it does not shadow
# the tenacity retry imported above. Unlike retry_transient, it retries on any Exception.
import functools
import time


def simple_retry(attempts: int = 3, delay: float = 2.0, backoff: float = 2.0):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as exc:
                    if attempt == attempts:
                        raise  # out of attempts: surface the last error
                    logger.warning(
                        "%s attempt %d/%d failed: %s — retrying in %.1fs",
                        fn.__name__, attempt, attempts, exc, wait,
                    )
                    time.sleep(wait)
                    wait *= backoff
        return wrapper
    return decorator
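To see how long a job can stall before giving up, it helps to work out the sleep schedule. The sketch below mirrors the loop of the hand-rolled decorator (sleep only between attempts, multiply the wait each time); backoff_delays is a hypothetical helper written for this illustration, not part of tenacity.

```python
def backoff_delays(attempts: int = 3, delay: float = 2.0, backoff: float = 2.0) -> list[float]:
    """Return the sleep durations a retry loop would use between attempts."""
    waits = []
    wait = delay
    for _ in range(attempts - 1):  # no sleep after the final attempt
        waits.append(wait)
        wait *= backoff
    return waits


print(backoff_delays())                 # [2.0, 4.0]
print(backoff_delays(attempts=4))       # [2.0, 4.0, 8.0]
print(sum(backoff_delays(attempts=4)))  # 14.0
```

The sum is the worst-case time spent sleeping; add the per-attempt timeout to estimate how long one failing call can hold up the job.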
Step 4 — Schedule the Job
Pick the scheduler that matches your environment.
cron (Linux/macOS)
# Run daily_pipeline.py every weekday at 06:30 using the venv interpreter
crontab -e
30 6 * * 1-5 /data/.venv/bin/python /data/scripts/daily_pipeline.py >> /var/log/doc-pipeline/cron.log 2>&1
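Always use the absolute path to the venv's python, not python3 or python. The >> ... 2>&1 redirect appends both stdout and stderr (including tracebacks from uncaught exceptions) to cron.log. Because the redirect leaves cron nothing to capture, cron sends no mail for this job; drop the redirect if you rely on cron's mail. The log directory must already exist, or the redirect fails before Python starts.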
Windows Task Scheduler
rem Trigger: Daily at 06:30
rem Program/script: C:\data\.venv\Scripts\python.exe
rem Arguments: C:\data\scripts\daily_pipeline.py
rem Start in: C:\data\scripts
Set "Run whether user is logged on or not" and "Run with highest privileges" if the script writes to protected paths.
GitHub Actions (CI/cloud)
# .github/workflows/daily_pipeline.yml
name: Daily Document Pipeline
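on:
  schedule:
    - cron: "30 6 * * 1-5"  # UTC
  workflow_dispatch:  # allow manual trigger
jobs:
  run-pipeline:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install deps
        run: pip install -r requirements.txt
      - name: Run pipeline
        env:
          SMTP_HOST: ${{ secrets.SMTP_HOST }}
          SMTP_PASS: ${{ secrets.SMTP_PASS }}
          # The script's default /var/log and /data locations are typically not
          # writable on hosted runners, so point its log and ledger at /tmp.
          PIPELINE_LOG_DIR: /tmp/doc-pipeline-logs
          PIPELINE_LEDGER: /tmp/pipeline_ledger.db
        # --output is a directory; the script writes a dated .xlsx file inside it
        run: python scripts/daily_pipeline.py --output /tmp/reports
      - name: Upload report
        uses: actions/upload-artifact@v4
        with:
          name: daily-report
          path: /tmp/reports/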
schedule library (long-running process)
# pip install schedule
"""
Use the schedule library when your process runs continuously (e.g., inside a container
or a systemd service) rather than being launched fresh by cron.
"""
import time
import logging
import schedule
logger = logging.getLogger(__name__)
def job_with_guard() -> None:
    logger.info("Job started")
    try:
        run_daily_pipeline()  # from Step 1
        logger.info("Job finished OK")
    except Exception:
        logger.exception("Job failed")


schedule.every().day.at("06:30").do(job_with_guard)

if __name__ == "__main__":
    logger.info("Scheduler started — waiting for next run")
    while True:
        schedule.run_pending()
        time.sleep(30)
Edge Cases and Variants
Overlapping Runs and Lock Files
If your job takes longer than its scheduling interval, two instances can run simultaneously and corrupt shared output files. A lock file prevents this.
# pip install (none — stdlib only)
"""
Lock file guard — place at the start of your entry-point script.
Uses O_CREAT | O_EXCL so that creation is atomic on POSIX systems.
"""
import os
import sys
import logging
from pathlib import Path
LOCK_PATH = Path("/tmp/doc_pipeline.lock")
logger = logging.getLogger(__name__)
def acquire_lock() -> bool:
    try:
        fd = os.open(LOCK_PATH, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        os.write(fd, str(os.getpid()).encode())
        os.close(fd)
        return True
    except FileExistsError:
        return False


def release_lock() -> None:
    try:
        LOCK_PATH.unlink()
    except FileNotFoundError:
        pass


if __name__ == "__main__":
    if not acquire_lock():
        logger.warning("Another instance is running — exiting")
        sys.exit(0)
    try:
        run_daily_pipeline()  # your job from Step 1
    finally:
        release_lock()
On Windows use msvcrt.locking or a named mutex instead of O_EXCL.
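A lock file left behind by a crashed run blocks every later run. One possible staleness check, sketched here with the standard library only and assuming a POSIX system, reads the PID stored in the lock file and probes it with signal 0. The helpers pid_is_alive and clear_stale_lock are hypothetical names introduced for this sketch.

```python
import os
from pathlib import Path


def pid_is_alive(pid: int) -> bool:
    """Return True if a process with this PID exists (POSIX only)."""
    if pid <= 0:
        return False  # 0 and negative values address process groups, not one process
    try:
        os.kill(pid, 0)  # signal 0 checks for existence without sending a signal
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def clear_stale_lock(lock_path: Path) -> bool:
    """Delete lock_path if its recorded PID is dead or unreadable.

    Returns True when a stale lock was removed.
    """
    try:
        pid = int(lock_path.read_text().strip())
    except FileNotFoundError:
        return False  # no lock to clear
    except ValueError:
        pid = None  # empty or corrupt lock file: treat as stale
    if pid is not None and pid_is_alive(pid):
        return False
    lock_path.unlink(missing_ok=True)
    return True
```

Calling clear_stale_lock(LOCK_PATH) just before acquire_lock() would let a new run recover from a crash. PIDs can be reused by unrelated processes, so this check can occasionally mistake a stale lock for a live one; it never removes a lock whose recorded process is still running.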
Failure Alerting (Email and Webhook)
# pip install (none: stdlib only; the webhook uses urllib.request)
"""
Send an alert on job failure. Call send_email_alert() or send_webhook_alert()
inside your except block.
"""
import json
import logging
import os
import smtplib
import urllib.request
from email.message import EmailMessage

logger = logging.getLogger(__name__)

SMTP_HOST = os.environ.get("SMTP_HOST", "smtp.example.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "587"))
SMTP_USER = os.environ.get("SMTP_USER", "alerts@example.com")  # placeholder address
SMTP_PASS = os.environ.get("SMTP_PASS", "")
ALERT_TO = os.environ.get("ALERT_TO", "ops@example.com")  # placeholder address
WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")


def send_email_alert(subject: str, body: str) -> None:
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = SMTP_USER
    msg["To"] = ALERT_TO
    msg.set_content(body)
    try:
        # A timeout keeps an unreachable mail server from hanging the job
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=15) as s:
            s.starttls()
            s.login(SMTP_USER, SMTP_PASS)
            s.send_message(msg)
    except Exception:
        # An alerting failure must never mask the original job failure
        logger.exception("Failed to send alert email")


def send_webhook_alert(text: str) -> None:
    if not WEBHOOK_URL:
        return
    payload = json.dumps({"text": text}).encode()
    req = urllib.request.Request(
        WEBHOOK_URL,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=10):
            pass
    except Exception:
        logger.exception("Failed to send webhook alert")
Store credentials in environment variables, never in source code. On GitHub Actions they go in repository Secrets; on a server use a .env file owned by the service account.
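Cron does not source a .env file on its own, so the script has to load it. A minimal stdlib-only sketch, assuming plain KEY=VALUE lines; load_env_file is a hypothetical helper, and the python-dotenv package handles more of the syntax if you prefer a dependency.

```python
import os
from pathlib import Path


def load_env_file(path: Path) -> dict[str, str]:
    """Parse KEY=VALUE lines and add them to os.environ without overriding existing values."""
    loaded = {}
    for raw in path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments, and malformed lines
        key, _, value = line.partition("=")
        key, value = key.strip(), value.strip().strip("'\"")
        loaded[key] = value
        os.environ.setdefault(key, value)  # the real environment wins over the file
    return loaded
```

Call it before any module reads its settings with os.environ.get, and restrict the file's permissions (for example chmod 600) so only the service account can read it.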
Idempotency
A job that can safely re-run without duplicating output is far easier to operate. Track processed files in a small SQLite ledger.
# pip install (none — stdlib only)
"""
Idempotency ledger: record each processed file by content hash.
Re-running the job skips files already in the ledger.
"""
import hashlib
import sqlite3
from datetime import datetime, timezone
from pathlib import Path

LEDGER_PATH = Path("/data/pipeline_ledger.db")


def _get_db() -> sqlite3.Connection:
    conn = sqlite3.connect(LEDGER_PATH)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed "
        "(file_hash TEXT PRIMARY KEY, path TEXT, processed_at TEXT)"
    )
    conn.commit()
    return conn


def file_hash(path: Path) -> str:
    h = hashlib.sha256()
    h.update(path.read_bytes())
    return h.hexdigest()


def already_processed(path: Path) -> bool:
    fh = file_hash(path)
    conn = _get_db()
    row = conn.execute(
        "SELECT 1 FROM processed WHERE file_hash = ?", (fh,)
    ).fetchone()
    conn.close()
    return row is not None


def mark_processed(path: Path) -> None:
    fh = file_hash(path)
    conn = _get_db()
    conn.execute(
        "INSERT OR IGNORE INTO processed (file_hash, path, processed_at) VALUES (?, ?, ?)",
        (fh, str(path), datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()
    conn.close()
Validation and Health Check
After the job runs, check that the output is sane before publishing it: write the report to a staging path, validate that file, and only then move it to its final destination.
# pip install pandas openpyxl
"""
Validation step: check row count, required columns, and absence of all-null rows.
Raises ValueError on any anomaly so the caller can catch and alert.
"""
import pandas as pd
from pathlib import Path
def validate_output(path: Path, min_rows: int = 1) -> None:
    df = pd.read_excel(path)
    if len(df) < min_rows:
        raise ValueError(f"Output has {len(df)} rows — expected at least {min_rows}")
    required_cols = {"date", "amount", "_source"}
    missing = required_cols - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    if df.isnull().all(axis=1).any():
        raise ValueError("Output contains fully null rows")
The same validation pattern applies when Generating Reports from Pipeline Data — check the report's row count and spot-sample a few cells before emailing it.
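One way to keep a bad report from ever reaching its final location is to validate a staged copy and move it only on success. The sketch below assumes a validator that raises on bad output, as validate_output does; publish_if_valid is a hypothetical helper name.

```python
import os
from pathlib import Path
from typing import Callable


def publish_if_valid(staging: Path, final: Path, validate: Callable[[Path], None]) -> None:
    """Run validate(staging); move the file to final only if it does not raise."""
    validate(staging)  # raises (for example ValueError) on bad output
    final.parent.mkdir(parents=True, exist_ok=True)
    os.replace(staging, final)  # atomic when both paths are on the same filesystem
```

If validation raises, the staged file stays where it is for inspection and the previous final report is untouched, so the caller can catch the exception and alert.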
Performance and Scale Notes
- Memory: if INPUT_DIR holds hundreds of large PDFs, do not load all frames into a list. Write each parsed DataFrame to a staging Parquet file, then concatenate from disk at the end.
- Chunking: for large CSVs fed into the pipeline, use pd.read_csv(..., chunksize=10_000) and process each chunk inside the retry-wrapped function.
- Parallelism: concurrent.futures.ProcessPoolExecutor works well for CPU-bound OCR batches; keep pool size to os.cpu_count() - 1 to leave headroom for the OS.
- GitHub Actions minutes: the free tier gives 2,000 minutes/month for private repositories. A 5-minute daily pipeline costs ~150 minutes/month, well within budget. Caching pip downloads with actions/cache shortens the install step.
- schedule library timing: the schedule library runs jobs inside your own process. A job can start up to one sleep interval late (30 s in the loop above), runs missed while the process is down are not caught up, and interval jobs can drift because the library does not account for how long each run takes. For strict timing, prefer cron; GitHub Actions scheduled runs can themselves be delayed when the service is busy.
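The pool-size advice can be sketched as follows. The page_cost function is a hypothetical stand-in for a CPU-bound task such as OCR on one page; a real pipeline would submit its own work function instead.

```python
import os
from concurrent.futures import ProcessPoolExecutor


def pool_size() -> int:
    """Leave one core for the OS, but never drop below one worker."""
    return max(1, (os.cpu_count() or 2) - 1)  # cpu_count() may return None


def page_cost(n: int) -> int:
    """Stand-in for a CPU-bound task (hypothetical)."""
    return sum(i * i for i in range(n))


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=pool_size()) as pool:
        results = list(pool.map(page_cost, [10_000, 20_000, 30_000]))
    print(len(results), "task(s) finished")
```

The max(1, ...) guard matters on single-core runners, where cpu_count() - 1 would otherwise ask for zero workers.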
Troubleshooting
| Symptom | Root cause | Fix |
|---|---|---|
| ModuleNotFoundError in cron but not in terminal | Cron uses /usr/bin/python3, not your venv | Use the absolute path to .venv/bin/python in the crontab line |
| Script runs in terminal but not in Task Scheduler | "Start in" directory not set; relative paths resolve against C:\Windows\System32 | Set "Start in" to the script's directory; use Path(__file__).parent for relative paths |
| FileNotFoundError for a dependency binary (e.g., Tesseract, Java) | Cron's PATH is /usr/bin:/bin; system-installed tools may be in /usr/local/bin | Add PATH=/usr/local/bin:/usr/bin:/bin at the top of the crontab, or use absolute paths in subprocess.run() |
| Lock file left behind after a crash | Previous run died before calling release_lock() | Add a staleness check: if the PID in the lock file is not in psutil.pids(), delete it and proceed |
| GitHub Actions job silently skips | Cron schedule is parsed in UTC; your intended local time differs | Convert your local time to UTC explicitly; add workflow_dispatch: for manual testing |
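For the UTC conversion in the last row, a small sketch with the standard zoneinfo module can compute the cron fields. It assumes time zone data is available on the machine; utc_cron_time is a hypothetical helper, and Europe/Berlin is only an example zone.

```python
from datetime import date, datetime, time, timezone
from zoneinfo import ZoneInfo


def utc_cron_time(local: time, tz_name: str, on_day: date) -> str:
    """Return the 'minute hour' cron fields in UTC for a local wall-clock time on on_day."""
    local_dt = datetime.combine(on_day, local, tzinfo=ZoneInfo(tz_name))
    utc_dt = local_dt.astimezone(timezone.utc)
    return f"{utc_dt.minute} {utc_dt.hour}"


print(utc_cron_time(time(6, 30), "Europe/Berlin", date(2024, 1, 15)))  # 30 5
print(utc_cron_time(time(6, 30), "Europe/Berlin", date(2024, 7, 15)))  # 30 4
```

The two results differ because of daylight saving time: a fixed UTC cron line runs an hour earlier or later in local terms for part of the year. A conversion that crosses midnight also shifts the day-of-week field, which this sketch does not handle.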
Complete Working Script
# pip install pdfplumber pandas openpyxl tenacity schedule
"""
daily_pipeline.py: self-contained document pipeline with scheduling, logging,
lock-file guard, retry, idempotency, and failure alerting.
Usage:
python daily_pipeline.py # run once and exit
python daily_pipeline.py --daemon # run on internal schedule (06:30 daily)
python daily_pipeline.py --input /path # override input directory
"""
import argparse
import hashlib
import logging
import logging.handlers
import os
import sqlite3
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
import pdfplumber
import pandas as pd
import schedule
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
before_sleep_log,
)
# ── Config (override via CLI or environment) ──────────────────────────────────
INPUT_DIR = Path(os.environ.get("PIPELINE_INPUT_DIR", "/data/incoming"))
OUTPUT_DIR = Path(os.environ.get("PIPELINE_OUTPUT_DIR", "/data/reports"))
LOG_DIR = Path(os.environ.get("PIPELINE_LOG_DIR", "/var/log/doc-pipeline"))
LEDGER_PATH = Path(os.environ.get("PIPELINE_LEDGER", "/data/pipeline_ledger.db"))
LOCK_PATH = Path("/tmp/doc_pipeline.lock")
SMTP_HOST = os.environ.get("SMTP_HOST", "")
SMTP_USER = os.environ.get("SMTP_USER", "")
SMTP_PASS = os.environ.get("SMTP_PASS", "")
ALERT_TO = os.environ.get("ALERT_TO", "")
WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")
# ── Logging setup ─────────────────────────────────────────────────────────────
def configure_logging() -> logging.Logger:
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    handler = logging.handlers.RotatingFileHandler(
        LOG_DIR / "pipeline.log",
        maxBytes=5 * 1024 * 1024,
        backupCount=7,
        encoding="utf-8",
    )
    fmt = logging.Formatter(
        "%(asctime)s %(name)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )
    handler.setFormatter(fmt)
    stderr = logging.StreamHandler()
    stderr.setLevel(logging.WARNING)
    stderr.setFormatter(fmt)
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(handler)
    root.addHandler(stderr)
    return logging.getLogger("pipeline")


logger = configure_logging()
# ── Lock file ─────────────────────────────────────────────────────────────────
def acquire_lock() -> bool:
    try:
        fd = os.open(LOCK_PATH, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        os.write(fd, str(os.getpid()).encode())
        os.close(fd)
        return True
    except FileExistsError:
        return False


def release_lock() -> None:
    try:
        LOCK_PATH.unlink()
    except FileNotFoundError:
        pass
# ── Idempotency ledger ────────────────────────────────────────────────────────
def _db() -> sqlite3.Connection:
    conn = sqlite3.connect(LEDGER_PATH)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed "
        "(file_hash TEXT PRIMARY KEY, path TEXT, processed_at TEXT)"
    )
    conn.commit()
    return conn


def _hash(path: Path) -> str:
    return hashlib.sha256(path.read_bytes()).hexdigest()


def already_done(path: Path) -> bool:
    fh = _hash(path)
    conn = _db()
    found = conn.execute(
        "SELECT 1 FROM processed WHERE file_hash = ?", (fh,)
    ).fetchone()
    conn.close()
    return found is not None


def mark_done(path: Path) -> None:
    conn = _db()
    conn.execute(
        "INSERT OR IGNORE INTO processed (file_hash, path, processed_at) VALUES (?, ?, ?)",
        (_hash(path), str(path), datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()
    conn.close()
# ── Retry decorator ───────────────────────────────────────────────────────────
retry_io = retry(
reraise=True,
stop=stop_after_attempt(4),
wait=wait_exponential(multiplier=1, min=2, max=30),
retry=retry_if_exception_type((IOError, TimeoutError)),
before_sleep=before_sleep_log(logger, logging.WARNING),
)
# ── Alert helpers ─────────────────────────────────────────────────────────────
def _alert(subject: str, body: str) -> None:
    import json
    import smtplib
    import urllib.request
    from email.message import EmailMessage

    if SMTP_HOST and ALERT_TO:
        try:
            msg = EmailMessage()
            msg["Subject"] = subject
            msg["From"] = SMTP_USER
            msg["To"] = ALERT_TO
            msg.set_content(body)
            with smtplib.SMTP(SMTP_HOST, 587) as s:
                s.starttls()
                s.login(SMTP_USER, SMTP_PASS)
                s.send_message(msg)
        except Exception:
            logger.exception("Email alert failed")
    if WEBHOOK_URL:
        payload = json.dumps({"text": f"*{subject}*\n{body}"}).encode()
        req = urllib.request.Request(
            WEBHOOK_URL,
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=10):
                pass
        except Exception:
            logger.exception("Webhook alert failed")
# ── Core extraction ───────────────────────────────────────────────────────────
@retry_io
def _extract_pdf(pdf_path: Path) -> list[pd.DataFrame]:
    frames = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            table = page.extract_table()
            if table and len(table) > 1:
                df = pd.DataFrame(table[1:], columns=table[0])
                df["_source"] = pdf_path.name
                frames.append(df)
    return frames
# ── Main job ──────────────────────────────────────────────────────────────────
def run_job(input_dir: Path, output_dir: Path) -> None:
    logger.info("Job started — scanning %s", input_dir)
    pdf_files = [p for p in input_dir.glob("*.pdf") if not already_done(p)]
    if not pdf_files:
        logger.info("No new PDFs to process")
        return
    frames: list[pd.DataFrame] = []
    parsed: list[Path] = []
    for pdf_path in pdf_files:
        try:
            extracted = _extract_pdf(pdf_path)
            frames.extend(extracted)
            parsed.append(pdf_path)
            logger.info("Parsed %s — %d table(s)", pdf_path.name, len(extracted))
        except Exception:
            logger.exception("Skipping %s after retries exhausted", pdf_path.name)
    if frames:
        combined = pd.concat(frames, ignore_index=True)
        output_dir.mkdir(parents=True, exist_ok=True)
        # Timestamp to the second so a later run on the same day cannot overwrite this report
        stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        out_path = output_dir / f"pipeline_{stamp}.xlsx"
        combined.to_excel(out_path, index=False)
        logger.info("Wrote %d rows to %s", len(combined), out_path)
    else:
        logger.warning("No tables extracted from %d file(s)", len(pdf_files))
    # Record files in the ledger only after the report is on disk; if the write
    # fails, the same files are picked up again on the next run.
    for pdf_path in parsed:
        mark_done(pdf_path)


def run_with_guard(input_dir: Path, output_dir: Path) -> bool:
    """Run the job under the lock file; return False only if the job raised."""
    if not acquire_lock():
        logger.warning("Another instance is running — skipping this run")
        return True
    try:
        run_job(input_dir, output_dir)
        return True
    except Exception as exc:
        logger.exception("Job failed with unhandled exception")
        _alert(
            subject="[doc-pipeline] Job FAILED",
            body=f"Exception: {exc}\nSee {LOG_DIR / 'pipeline.log'} for details.",
        )
        return False  # the caller decides whether to exit; daemon mode keeps running
    finally:
        release_lock()


# ── Entry point ───────────────────────────────────────────────────────────────
def main() -> None:
    parser = argparse.ArgumentParser(description="Document extraction pipeline")
    parser.add_argument("--input", type=Path, default=INPUT_DIR)
    parser.add_argument("--output", type=Path, default=OUTPUT_DIR)
    parser.add_argument(
        "--daemon",
        action="store_true",
        help="Run on internal schedule (06:30 daily) instead of once",
    )
    args = parser.parse_args()
    if args.daemon:
        logger.info("Daemon mode — scheduling daily at 06:30")
        schedule.every().day.at("06:30").do(
            run_with_guard, input_dir=args.input, output_dir=args.output
        )
        while True:
            schedule.run_pending()
            time.sleep(30)
    elif not run_with_guard(input_dir=args.input, output_dir=args.output):
        sys.exit(1)  # non-zero exit so cron or CI can see the failure


if __name__ == "__main__":
    main()
Related
- Extracting PDF Data into pandas — the upstream step that feeds raw tables into the pipeline this guide schedules
- Generating Reports from Pipeline Data — downstream step: turn pipeline output into formatted Excel or PDF reports
- Automating Excel Report Generation — production scheduling and logging patterns for Excel-specific pipelines
- Automating PDF Extraction & Generation — covers the production-hardening section of the PDF extraction workflow
Part of Automating Document & Data Pipelines.