Building a Document Processing Pipeline with 0xPdf and Python
Most document workflows start simple and become painful fast.
You parse a few PDFs by hand, maybe write a quick script, and everything seems fine -- until volume grows. New vendors appear, layouts change, and your extraction breaks in production. Suddenly you're spending more time fixing parsing logic than building product features.
In this guide, I'll show a practical, production-style pipeline for document processing with Python and 0xPdf:
- Watch a folder for incoming PDFs
- Parse files into structured JSON with 0xPdf
- Store results in PostgreSQL
- Send Slack notifications
- Add retries and error handling
- Scale to async processing for bigger workloads
This is the pattern I'd use for internal ops automation, AP/finance workflows, and document-heavy backend services.
Why automate document processing
If your team deals with invoices, forms, contracts, or reports, you probably face one or more of these issues:
- Manual copy/paste into systems
- Fragile regex-heavy parsers
- OCR output that still needs custom post-processing
- No visibility when parsing fails
Automation solves this by giving you:
- Structured JSON output that downstream systems can use immediately
- Faster throughput with fewer human errors
- Reliable retries, logging, and monitoring hooks
- A scalable path from "small script" to "production workflow"
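To make "structured JSON output" concrete, here is a hypothetical example of what a parsed invoice might look like and how downstream code consumes it. The field names here are illustrative only; the actual shape depends on the schema template you configure.

```python
import json

# hypothetical invoice payload for an "invoice" schema template;
# real field names depend on your configured template
parsed = json.loads("""
{
  "invoice_number": "INV-2024-001",
  "vendor": "Acme Corp",
  "currency": "USD",
  "line_items": [
    {"description": "Consulting", "quantity": 10, "unit_price": 125.00}
  ]
}
""")

# downstream systems can consume this directly -- no regex post-processing
total = sum(i["quantity"] * i["unit_price"] for i in parsed["line_items"])
print(total)  # 1250.0
```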
Architecture overview
The pipeline flow is straightforward:
- A PDF lands in an input folder (./inbox)
- A file watcher detects the new file
- The parser sends it to 0xPdf
- Parsed output is written to PostgreSQL
- Slack receives success/failure notifications
- Retry logic handles transient API/network failures
Conceptually:
Incoming PDFs -> Watcher -> 0xPdf Parse -> PostgreSQL
-> Slack Notification
-> Retry / Error Handling
Setup
Create a virtual environment and install dependencies:
python -m venv .venv
source .venv/bin/activate
pip install requests watchdog sqlalchemy psycopg2-binary tenacity slack_sdk python-dotenv
Create your .env file:
OXPDF_API_KEY=your_api_key
OXPDF_BASE_URL=https://api.0xpdf.io/api/v1
DATABASE_URL=postgresql://user:pass@localhost:5432/documents
SLACK_BOT_TOKEN=xoxb-...
SLACK_CHANNEL_ID=C1234567890
Code walkthrough
1) File watcher (watchdog)
The watcher listens to a folder and triggers processing when a PDF appears.
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pathlib import Path
import time

INBOX_DIR = Path("./inbox")
INBOX_DIR.mkdir(exist_ok=True)

class PDFHandler(FileSystemEventHandler):
    def __init__(self, on_pdf):
        self.on_pdf = on_pdf

    def on_created(self, event):
        if event.is_directory:
            return
        path = Path(event.src_path)
        if path.suffix.lower() == ".pdf":
            # small delay in case the copy/write is still in progress
            time.sleep(0.5)
            self.on_pdf(path)

def start_watcher(on_pdf):
    observer = Observer()
    observer.schedule(PDFHandler(on_pdf), str(INBOX_DIR), recursive=False)
    observer.start()
    return observer
2) Parse with 0xPdf
Call the parse endpoint with your API key and a schema template.
import os
import requests

OXPDF_API_KEY = os.getenv("OXPDF_API_KEY")
OXPDF_BASE_URL = os.getenv("OXPDF_BASE_URL", "https://api.0xpdf.io/api/v1")

def parse_pdf(pdf_path: str, schema_template: str = "invoice") -> dict:
    url = f"{OXPDF_BASE_URL}/pdf/parse"
    headers = {"X-API-Key": OXPDF_API_KEY}
    params = {"schema_template": schema_template}
    with open(pdf_path, "rb") as f:
        files = {"file": (os.path.basename(pdf_path), f, "application/pdf")}
        response = requests.post(
            url,
            headers=headers,
            params=params,
            files=files,
            timeout=120,
        )
    response.raise_for_status()
    return response.json()
3) Store results in PostgreSQL
Persist both successful output and failure metadata for observability.
import os

from sqlalchemy import create_engine, Column, Integer, String, JSON, DateTime, func
from sqlalchemy.orm import declarative_base, sessionmaker

DATABASE_URL = os.getenv("DATABASE_URL")
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
Base = declarative_base()

class ParsedDocument(Base):
    __tablename__ = "parsed_documents"

    id = Column(Integer, primary_key=True)
    filename = Column(String, nullable=False)
    status = Column(String, nullable=False)  # success | failed
    schema_template = Column(String, nullable=False)
    payload = Column(JSON, nullable=True)
    error_message = Column(String, nullable=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

Base.metadata.create_all(bind=engine)

def save_result(filename: str, schema_template: str, status: str, payload=None, error_message=None):
    db = SessionLocal()
    try:
        row = ParsedDocument(
            filename=filename,
            schema_template=schema_template,
            status=status,
            payload=payload,
            error_message=error_message,
        )
        db.add(row)
        db.commit()
    finally:
        db.close()
4) Send Slack notification
Slack helps your team see pipeline health in real time.
import os

from slack_sdk import WebClient

slack = WebClient(token=os.getenv("SLACK_BOT_TOKEN"))
SLACK_CHANNEL_ID = os.getenv("SLACK_CHANNEL_ID")

def notify_slack(message: str):
    slack.chat_postMessage(channel=SLACK_CHANNEL_ID, text=message)
5) Handle errors and retries
Retries protect you from transient failures (network timeout, temporary upstream issues, etc.).
import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    # note: requests.HTTPError also matches 4xx client errors, which are
    # usually not transient -- consider filtering to 5xx/429 in production
    retry=retry_if_exception_type((requests.Timeout, requests.ConnectionError, requests.HTTPError)),
)
def parse_with_retry(pdf_path: str, schema_template: str = "invoice") -> dict:
    return parse_pdf(pdf_path, schema_template)
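One caveat with the exception-type approach: `requests.HTTPError` covers 4xx client errors, which a retry won't fix. A dependency-free sketch of a predicate-based alternative, retrying only failures that look transient (the transience rules here are my assumptions, not a standard):

```python
import time

def is_transient(exc: Exception) -> bool:
    # HTTP errors: retry only 5xx and 429 (rate limiting)
    status = getattr(getattr(exc, "response", None), "status_code", None)
    if status is not None:
        return status >= 500 or status == 429
    # in real use you'd also check requests.Timeout / requests.ConnectionError
    return isinstance(exc, (TimeoutError, ConnectionError))

def call_with_retry(fn, attempts: int = 3, base_delay: float = 1.0):
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception as exc:
            if attempt == attempts or not is_transient(exc):
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff
```

The same predicate can be plugged into tenacity via its exception-predicate retry condition instead of `retry_if_exception_type`.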
End-to-end processing function
Tie everything together in one worker function:
def process_pdf(path):
    filename = path.name
    schema_template = "invoice"
    try:
        result = parse_with_retry(str(path), schema_template=schema_template)
        save_result(filename, schema_template, "success", payload=result)
        notify_slack(f"Parsed `{filename}` successfully")
    except Exception as e:
        save_result(filename, schema_template, "failed", error_message=str(e))
        notify_slack(f"Failed parsing `{filename}`: {e}")
And run the watcher:
if __name__ == "__main__":
    observer = start_watcher(process_pdf)
    print("Watching ./inbox for new PDFs...")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
Drop a PDF into ./inbox, and it processes automatically.
Scaling with async mode (large batches)
For larger workloads, move from direct synchronous processing to an async job pipeline:
- Queue jobs with Celery/RQ/worker processes
- Process many PDFs in parallel
- Use an async/polling workflow where appropriate
- Batch writes to reduce DB overhead
- Send summary notifications instead of one Slack message per file
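Queue setups vary (Celery, RQ, plain worker processes), so here is the simplest dependency-free version of the fan-out idea: a thread pool that processes many files concurrently. `process_file` below is a hypothetical stand-in for the parse-store-notify worker from earlier; since the work is I/O-bound (API calls, DB writes), threads are a reasonable fit.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_file(path: str) -> tuple[str, str]:
    # stand-in for parse -> store -> notify; replace with the real worker
    return (path, "success")

def process_batch(paths: list[str], max_workers: int = 8) -> dict[str, str]:
    """Process many PDFs in parallel; return {path: status}."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(process_file, p): p for p in paths}
        for fut in as_completed(futures):
            path = futures[fut]
            try:
                _, status = fut.result()
            except Exception:
                status = "failed"
            results[path] = status
    return results
```

With a batch function like this, you can also send one summary Slack message per batch instead of one per file.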
Recommended production upgrades:
- Idempotency keys (avoid duplicate processing)
- Dead-letter queue for repeated failures
- Structured logging and alerting
- Backpressure controls + rate limiting
Final thoughts
A strong document pipeline is not just OCR -- it's reliability, recoverability, and clean structured output.
0xPdf helps by turning PDFs into structured JSON directly, so you can focus on building business workflows instead of writing brittle extraction logic for every new layout.
If you want, I can publish a full GitHub repo version of this exact pipeline (Docker Compose, migrations, and a ready-to-run starter).
Tags: python automation pdf api postgresql backend