LLM batch jobs are not like web requests. They run for hours, they call expensive APIs, and they fail partway through. If your project structure doesn't account for this from the start, you'll spend more time recovering from crashes than writing code. This article covers a directory layout, config validation, logging, graceful shutdown, and environment variable handling that hold up in production.
Directory Layout
Start with a layout that separates concerns clearly:
my_pipeline/
├── src/
│ ├── __init__.py
│ ├── pipeline.py # main orchestration
│ ├── llm_client.py # API calls
│ ├── parser.py # output parsing
│ └── storage.py # file I/O and checkpoints
├── checkpoints/ # progress state (gitignore this)
├── outputs/ # final generated files
│ ├── chapters/
│ └── covers/
├── logs/ # rotating log files
├── config.py # Config dataclass
├── main.py # entrypoint
├── .env # secrets (never commit)
├── .env.example # committed template
├── requirements.txt
└── .gitignore
The checkpoints/ directory is the most important one. It stores progress state so a 3-hour job that crashes at chapter 18 of 25 can resume from chapter 18, not chapter 1.
Config Dataclass with Validation
Avoid scattered os.getenv() calls throughout your code. Centralise all config in one place and validate it at startup — fail fast, fail loudly.
# config.py
import os
from dataclasses import dataclass, field
from pathlib import Path
@dataclass
class PipelineConfig:
    """All pipeline settings in one place, validated at construction time.

    Defaults come from the environment (API key) or sensible constants.
    Output/checkpoint/log directories are created eagerly so later code
    can assume they exist.

    Raises:
        ValueError: from __post_init__ when any setting is missing or
            out of range (fail fast, fail loudly).
    """

    # Paths
    output_dir: Path = field(default_factory=lambda: Path("outputs"))
    checkpoint_dir: Path = field(default_factory=lambda: Path("checkpoints"))
    log_dir: Path = field(default_factory=lambda: Path("logs"))

    # LLM settings
    # Use .get (not os.environ[...]) so a missing key falls through to
    # _validate() and raises a clear ValueError instead of a bare KeyError
    # thrown from inside the default factory.
    api_key: str = field(default_factory=lambda: os.environ.get("ANTHROPIC_API_KEY", ""))
    model: str = "claude-opus-4-5"
    max_tokens: int = 4096
    temperature: float = 0.7

    # Batch settings
    batch_size: int = 5
    delay_between_calls: float = 2.0

    def __post_init__(self) -> None:
        # Validate before touching the filesystem: bad config should stop
        # the process before any side effects.
        self._validate()
        self._create_directories()

    def _validate(self) -> None:
        """Raise ValueError for any missing or out-of-range setting."""
        if not self.api_key:
            raise ValueError("ANTHROPIC_API_KEY is not set")
        if not 0.0 <= self.temperature <= 1.0:
            raise ValueError(f"temperature must be 0.0-1.0, got {self.temperature}")
        if self.max_tokens < 256:
            raise ValueError(f"max_tokens too low: {self.max_tokens}")
        if self.batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {self.batch_size}")

    def _create_directories(self) -> None:
        """Ensure output, checkpoint, and log directories exist."""
        for d in (self.output_dir, self.checkpoint_dir, self.log_dir):
            d.mkdir(parents=True, exist_ok=True)
def load_config(**overrides) -> PipelineConfig:
    """Build the pipeline configuration.

    Environment-derived defaults apply first; any keyword *overrides*
    replace them field by field before validation runs.
    """
    config = PipelineConfig(**overrides)
    return config
Logging: Rotating File Handler + Console + JSON
Use Python's standard logging module with two handlers: a rotating file handler for persistent logs and a console handler for live feedback. For machine-readable logs (useful if you ever pipe to a log aggregator), add JSON formatting.
# src/logging_setup.py
import json
import logging
import sys
from logging.handlers import RotatingFileHandler
from pathlib import Path
class JSONFormatter(logging.Formatter):
    """Render each record as a single-line JSON object (log-aggregator friendly)."""

    def format(self, record: logging.LogRecord) -> str:
        log_obj = {
            "time": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            # Fold the traceback in so an error line is self-contained.
            log_obj["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_obj)


def setup_logging(log_dir: Path, level: int = logging.INFO) -> logging.Logger:
    """Configure the "pipeline" logger with a JSON file handler and a console handler.

    Args:
        log_dir: directory that will hold the rotating log files.
        level: console verbosity; the file always captures DEBUG and up.

    Returns:
        The configured "pipeline" logger.
    """
    logger = logging.getLogger("pipeline")
    # The logger must let DEBUG records through, otherwise the file
    # handler's DEBUG level below can never see them — per-handler
    # levels do the actual filtering.
    logger.setLevel(logging.DEBUG)
    logger.handlers.clear()  # idempotent: calling setup twice won't double-log
    logger.propagate = False  # don't echo records through the root logger

    # Rotating file handler: 10 MB per file, keep 5 backups
    log_file = log_dir / "pipeline.log"
    file_handler = RotatingFileHandler(log_file, maxBytes=10 * 1024 * 1024, backupCount=5)
    file_handler.setFormatter(JSONFormatter())
    file_handler.setLevel(logging.DEBUG)

    # Console handler: human-readable, at the caller-chosen verbosity
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)-8s %(message)s", datefmt="%H:%M:%S")
    )
    console_handler.setLevel(level)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger
Graceful Shutdown: Save Checkpoint Before Exiting
Long-running jobs must handle SIGINT (Ctrl+C) and SIGTERM (kill signal from a scheduler or container orchestrator) without losing progress. The trick is a shared flag that the main loop checks each iteration.
# src/shutdown.py
import signal
import logging
log = logging.getLogger("pipeline")


class GracefulShutdown:
    """Set shutdown.requested = True when SIGINT or SIGTERM is received.

    The flag is only ever flipped to True; the main loop is expected to
    poll it between work items and stop cleanly.
    """

    def __init__(self) -> None:
        self.requested = False
        # Register the same handler for both interactive interrupt and
        # orchestrator-issued termination.
        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, self._handler)

    def _handler(self, signum, frame) -> None:
        sig_name = signal.Signals(signum).name
        log.warning(f"Signal {sig_name} received — finishing current item then stopping")
        self.requested = True
Use it in your main loop:
# src/pipeline.py
import json
import os
from pathlib import Path
def run(items: list, config, logger, shutdown, process=None) -> None:
    """Process *items* with checkpoint/resume and graceful-shutdown support.

    Args:
        items: work items; each must carry a unique, JSON-serializable "id".
        config: object exposing ``checkpoint_dir`` (a Path).
        logger: logger for progress messages.
        shutdown: object whose ``requested`` flag is polled between items.
        process: worker invoked as ``process(item, config, logger)``;
            defaults to the module-level ``process_item``.
    """
    if process is None:
        process = process_item
    checkpoint_file = config.checkpoint_dir / "progress.json"

    # Resume from a previous run if a checkpoint exists.
    completed = set()
    if checkpoint_file.exists():
        state = json.loads(checkpoint_file.read_text())
        completed = set(state.get("completed", []))
        logger.info(f"Resuming: {len(completed)}/{len(items)} already done")

    try:
        for item in items:
            if shutdown.requested:
                logger.info("Shutdown requested — stopping loop")
                break
            if item["id"] in completed:
                logger.info(f"Skipping {item['id']} (already done)")
                continue
            process(item, config, logger)
            completed.add(item["id"])
            # Save checkpoint after every item so a crash loses at most
            # the single in-progress item.
            _save_checkpoint(checkpoint_file, completed)
    finally:
        # Save once more here so the log line below is always true, even
        # when the loop exited via an exception mid-item.
        _save_checkpoint(checkpoint_file, completed)
        logger.info(f"Checkpoint saved: {len(completed)}/{len(items)} complete")


def _save_checkpoint(path, completed) -> None:
    """Atomically persist the completed-id set.

    Writes to a sibling temp file then renames over the target, so a crash
    mid-write can never leave a truncated/corrupt progress.json behind.
    """
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(json.dumps({"completed": list(completed)}))
    os.replace(tmp, path)  # atomic on both POSIX and Windows
Environment Variable Loading with python-dotenv
# main.py
from dotenv import load_dotenv
load_dotenv() # loads .env before anything else imports os.environ
from config import load_config
from src.logging_setup import setup_logging
from src.shutdown import GracefulShutdown
from src.pipeline import run
def main() -> None:
    """Wire up config, logging, and signal handling, then run the pipeline."""
    config = load_config()
    logger = setup_logging(config.log_dir)
    shutdown = GracefulShutdown()

    logger.info(f"Pipeline starting — model={config.model}, batch_size={config.batch_size}")

    work_items = load_work_items()  # your data source
    run(work_items, config, logger, shutdown)

    logger.info("Pipeline finished")


if __name__ == "__main__":
    main()
The .env.example file should list every required variable with placeholder values:
# .env.example
ANTHROPIC_API_KEY=sk-ant-...
Copy it to .env, fill in real values, and never commit .env. Add it to .gitignore.
Why This Structure Pays Off
When your 4-hour generation job crashes at chapter 22, you lose one in-progress item instead of all 25. When a teammate clones the repo, they get a ValueError at startup telling them exactly which env var is missing. When something goes wrong at 2am, the rotating JSON logs give you a timestamp-accurate audit trail.
This is unglamorous infrastructure work, but it's what separates a script from a pipeline.
Full pipeline + source code: germy5.gumroad.com/l/xhxkzz — $19.99, 30-day refund.
Top comments (0)