DEV Community

German Yamil
German Yamil

Posted on

How to Structure a Python Project for Long-Running LLM Batch Jobs

LLM batch jobs are not like web requests. They run for hours, they call expensive APIs, and they fail partway through. If your project structure doesn't account for this from the start, you'll spend more time recovering from crashes than writing code. This article covers a directory layout, config validation, logging, graceful shutdown, and environment variable handling that hold up in production.

Directory Layout

Start with a layout that separates concerns clearly:

my_pipeline/
├── src/
│   ├── __init__.py
│   ├── pipeline.py       # main orchestration
│   ├── llm_client.py     # API calls
│   ├── parser.py         # output parsing
│   └── storage.py        # file I/O and checkpoints
├── checkpoints/          # progress state (gitignore this)
├── outputs/              # final generated files
│   ├── chapters/
│   └── covers/
├── logs/                 # rotating log files
├── config.py             # Config dataclass
├── main.py               # entrypoint
├── .env                  # secrets (never commit)
├── .env.example          # committed template
├── requirements.txt
└── .gitignore
Enter fullscreen mode Exit fullscreen mode

The checkpoints/ directory is the most important one. It stores progress state so a 3-hour job that crashes at chapter 18 of 25 can resume from chapter 18, not chapter 1.

Config Dataclass with Validation

Avoid scattered os.getenv() calls throughout your code. Centralise all config in one place and validate it at startup — fail fast, fail loudly.

# config.py
import os
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class PipelineConfig:
    """All pipeline settings in one place, validated at construction time."""

    # Paths
    output_dir: Path = field(default_factory=lambda: Path("outputs"))
    checkpoint_dir: Path = field(default_factory=lambda: Path("checkpoints"))
    log_dir: Path = field(default_factory=lambda: Path("logs"))

    # LLM settings
    # Use .get() with a "" fallback: os.environ["..."] would raise an opaque
    # KeyError here, before _validate() can produce the descriptive ValueError
    # below — which defeats the whole point of centralised validation.
    api_key: str = field(default_factory=lambda: os.environ.get("ANTHROPIC_API_KEY", ""))
    model: str = "claude-opus-4-5"
    max_tokens: int = 4096
    temperature: float = 0.7

    # Batch settings
    batch_size: int = 5
    delay_between_calls: float = 2.0

    def __post_init__(self):
        # Fail fast on bad settings, then make sure all output dirs exist.
        self._validate()
        self._create_directories()

    def _validate(self):
        """Raise ValueError with a specific message for any invalid setting."""
        if not self.api_key:
            raise ValueError("ANTHROPIC_API_KEY is not set")
        if not 0.0 <= self.temperature <= 1.0:
            raise ValueError(f"temperature must be 0.0-1.0, got {self.temperature}")
        if self.max_tokens < 256:
            raise ValueError(f"max_tokens too low: {self.max_tokens}")
        if self.batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {self.batch_size}")
        if self.delay_between_calls < 0:
            raise ValueError(f"delay_between_calls must be >= 0, got {self.delay_between_calls}")

    def _create_directories(self):
        """Create output/checkpoint/log directories (no-op if they exist)."""
        for d in (self.output_dir, self.checkpoint_dir, self.log_dir):
            d.mkdir(parents=True, exist_ok=True)


def load_config(**overrides) -> PipelineConfig:
    """Load config from environment, then apply any overrides."""
    return PipelineConfig(**overrides)

Logging: Rotating File Handler + Console + JSON

Use Python's standard logging module with two handlers: a rotating file handler for persistent logs and a console handler for live feedback. For machine-readable logs (useful if you ever pipe to a log aggregator), add JSON formatting.

# src/logging_setup.py
import json
import logging
import sys
from logging.handlers import RotatingFileHandler
from pathlib import Path


class JSONFormatter(logging.Formatter):
    """Render each record as one single-line JSON object (aggregator-friendly)."""

    def format(self, record: logging.LogRecord) -> str:
        log_obj = {
            "time": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            log_obj["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_obj)


def setup_logging(log_dir: Path, level: int = logging.INFO) -> logging.Logger:
    """Configure the "pipeline" logger: rotating JSON file + console handler.

    Args:
        log_dir: directory that will hold pipeline.log (must already exist).
        level: console verbosity; the file always captures DEBUG and up.

    Returns:
        The configured "pipeline" logger.
    """
    logger = logging.getLogger("pipeline")
    # The logger itself must sit at DEBUG: records are filtered by the logger
    # BEFORE they reach handlers, so an INFO-level logger would silently drop
    # every DEBUG record and the DEBUG file handler would never see them.
    logger.setLevel(logging.DEBUG)
    logger.propagate = False  # don't duplicate output through the root logger
    logger.handlers.clear()   # idempotent if setup_logging is called twice

    # Rotating file handler: 10 MB per file, keep 5 backups
    log_file = log_dir / "pipeline.log"
    file_handler = RotatingFileHandler(log_file, maxBytes=10 * 1024 * 1024, backupCount=5)
    file_handler.setFormatter(JSONFormatter())
    file_handler.setLevel(logging.DEBUG)

    # Console handler: human-readable, honours the caller-supplied level
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(
        logging.Formatter("%(asctime)s  %(levelname)-8s  %(message)s", datefmt="%H:%M:%S")
    )
    console_handler.setLevel(level)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger
Enter fullscreen mode Exit fullscreen mode

Graceful Shutdown: Save Checkpoint Before Exiting

Long-running jobs must handle SIGINT (Ctrl+C) and SIGTERM (kill signal from a scheduler or container orchestrator) without losing progress. The trick is a shared flag that the main loop checks each iteration.

# src/shutdown.py
import signal
import logging

log = logging.getLogger("pipeline")

class GracefulShutdown:
    """Set shutdown.requested = True when SIGINT or SIGTERM is received."""

    def __init__(self):
        # Flag polled by the main loop; flipped (never cleared) by _handler.
        self.requested = False
        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, self._handler)

    def _handler(self, signum, frame):
        name = signal.Signals(signum).name
        log.warning(f"Signal {name} received — finishing current item then stopping")
        self.requested = True
Enter fullscreen mode Exit fullscreen mode

Use it in your main loop:

# src/pipeline.py
import json
from pathlib import Path

def run(items: list, config, logger, shutdown) -> None:
    """Process every item, checkpointing after each so a crash can resume.

    Args:
        items: work items; each must be a dict with a unique, JSON-serializable "id".
        config: object exposing `checkpoint_dir` (a Path), e.g. PipelineConfig.
        logger: configured logging.Logger.
        shutdown: object exposing a boolean `requested` flag (GracefulShutdown).
    """
    checkpoint_file = config.checkpoint_dir / "progress.json"

    # Load existing checkpoint so already-completed items are skipped.
    completed = set()
    if checkpoint_file.exists():
        state = json.loads(checkpoint_file.read_text())
        completed = set(state.get("completed", []))
        logger.info(f"Resuming: {len(completed)}/{len(items)} already done")

    try:
        for item in items:
            if shutdown.requested:
                logger.info("Shutdown requested — stopping loop")
                break

            if item["id"] in completed:
                logger.info(f"Skipping {item['id']} (already done)")
                continue

            process_item(item, config, logger)
            completed.add(item["id"])

            # Save checkpoint after every item so a crash loses at most one.
            _save_checkpoint(checkpoint_file, completed)

    finally:
        # Save once more on the way out so the log line below is always true —
        # the original only logged here, so an exception raised inside
        # process_item could exit without the latest state on disk.
        _save_checkpoint(checkpoint_file, completed)
        logger.info(f"Checkpoint saved: {len(completed)}/{len(items)} complete")


def _save_checkpoint(checkpoint_file: Path, completed: set) -> None:
    """Atomically persist the completed-id set (write temp file, then rename).

    A plain write_text() can be torn if the process is killed mid-write,
    corrupting the checkpoint and breaking resume; rename is atomic on POSIX.
    """
    tmp = checkpoint_file.with_suffix(".json.tmp")
    tmp.write_text(json.dumps({"completed": list(completed)}))
    tmp.replace(checkpoint_file)
Enter fullscreen mode Exit fullscreen mode

Environment Variable Loading with python-dotenv

# main.py
from dotenv import load_dotenv

load_dotenv()  # populate os.environ from .env before any module reads it

from config import load_config
from src.logging_setup import setup_logging
from src.shutdown import GracefulShutdown
from src.pipeline import run

def main() -> None:
    """Wire up config, logging, and signal handling, then run the pipeline."""
    cfg = load_config()
    log = setup_logging(cfg.log_dir)
    stop_flag = GracefulShutdown()

    log.info(f"Pipeline starting — model={cfg.model}, batch_size={cfg.batch_size}")

    work_items = load_work_items()  # your data source
    run(work_items, cfg, log, stop_flag)
    log.info("Pipeline finished")


if __name__ == "__main__":
    main()
Enter fullscreen mode Exit fullscreen mode

The .env.example file should list every required variable with placeholder values:

# .env.example
ANTHROPIC_API_KEY=sk-ant-...
Enter fullscreen mode Exit fullscreen mode

Copy it to .env, fill in real values, and never commit .env. Add it to .gitignore.

Why This Structure Pays Off

When your 4-hour generation job crashes at chapter 22, you lose one in-progress item instead of all 25. When a teammate clones the repo, they get a ValueError at startup telling them exactly which env var is missing. When something goes wrong at 2am, the rotating JSON logs give you a timestamp-accurate audit trail.

This is unglamorous infrastructure work, but it's what separates a script from a pipeline.

Full pipeline + source code: germy5.gumroad.com/l/xhxkzz — $19.99, 30-day refund.

Top comments (0)