risha-max
Building a Document Processing Pipeline with 0xPdf and Python


Most document workflows start simple and become painful fast.

You parse a few PDFs by hand, maybe write a quick script, and everything seems fine -- until volume grows. New vendors appear, layouts change, and your extraction breaks in production. Suddenly you're spending more time fixing parsing logic than building product features.

In this guide, I'll show a practical, production-style pipeline for document processing with Python and 0xPdf:

  • Watch a folder for incoming PDFs
  • Parse files into structured JSON with 0xPdf
  • Store results in PostgreSQL
  • Send Slack notifications
  • Add retries and error handling
  • Scale to async processing for bigger workloads

This is the pattern I'd use for internal ops automation, AP/finance workflows, and document-heavy backend services.


Why automate document processing

If your team deals with invoices, forms, contracts, or reports, you probably face one or more of these issues:

  • Manual copy/paste into systems
  • Fragile regex-heavy parsers
  • OCR output that still needs custom post-processing
  • No visibility when parsing fails

Automation solves this by giving you:

  • Structured JSON output that downstream systems can use immediately
  • Faster throughput with fewer human errors
  • Reliable retries, logging, and monitoring hooks
  • A scalable path from "small script" to "production workflow"

Architecture overview

The pipeline flow is straightforward:

  1. A PDF lands in an input folder (./inbox)
  2. A file watcher detects the new file
  3. The parser sends it to 0xPdf
  4. Parsed output is written to PostgreSQL
  5. Slack receives success/failure notifications
  6. Retry logic handles transient API/network failures

Conceptually:

Incoming PDFs -> Watcher -> 0xPdf Parse -> PostgreSQL
                                |-> Slack Notification
                                |-> Retry / Error Handling


Setup

Create a virtual environment and install dependencies:

python -m venv .venv
source .venv/bin/activate
pip install requests watchdog sqlalchemy psycopg2-binary tenacity slack_sdk python-dotenv

Create your .env file:

OXPDF_API_KEY=your_api_key
OXPDF_BASE_URL=https://api.0xpdf.io/api/v1
DATABASE_URL=postgresql://user:pass@localhost:5432/documents
SLACK_BOT_TOKEN=xoxb-...
SLACK_CHANNEL_ID=C1234567890
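Since python-dotenv is in the dependency list, you can call `load_dotenv()` at startup to pull the `.env` file into the environment. Either way, failing fast on missing configuration beats debugging a `None` API key deep inside a request. A minimal stdlib sketch, using the variable names from the `.env` above:

```python
import os

# required settings from the .env file above
REQUIRED_VARS = ["OXPDF_API_KEY", "DATABASE_URL", "SLACK_BOT_TOKEN", "SLACK_CHANNEL_ID"]

def missing_config(env=os.environ):
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]

missing = missing_config()
if missing:
    print(f"Warning: missing config: {', '.join(missing)}")
```

Run this once before starting the watcher so a misconfigured deployment fails loudly instead of at the first parse.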

Code walkthrough

1) File watcher (watchdog)

The watcher listens to a folder and triggers processing when a PDF appears.

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pathlib import Path
import time

INBOX_DIR = Path("./inbox")
INBOX_DIR.mkdir(exist_ok=True)

class PDFHandler(FileSystemEventHandler):
    def __init__(self, on_pdf):
        self.on_pdf = on_pdf

    def on_created(self, event):
        if event.is_directory:
            return
        path = Path(event.src_path)
        if path.suffix.lower() == ".pdf":
            # small delay if copy/write is still in progress
            time.sleep(0.5)
            self.on_pdf(path)

def start_watcher(on_pdf):
    observer = Observer()
    observer.schedule(PDFHandler(on_pdf), str(INBOX_DIR), recursive=False)
    observer.start()
    return observer
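The fixed `time.sleep(0.5)` works for small files but can fire while a large copy is still in progress. A more robust (still simple) alternative is to wait until the file size stops changing before handing the path to the parser; a standard-library sketch:

```python
import time
from pathlib import Path

def wait_for_stable_file(path: Path, checks: int = 3, interval: float = 0.2,
                         timeout: float = 10.0) -> bool:
    """Return True once the file size has been unchanged for `checks` polls,
    False if the timeout expires first (e.g. the file vanished mid-copy)."""
    deadline = time.monotonic() + timeout
    last_size = -1
    stable = 0
    while time.monotonic() < deadline:
        try:
            size = path.stat().st_size
        except FileNotFoundError:
            size = -1
        if size == last_size and size >= 0:
            stable += 1
            if stable >= checks:
                return True
        else:
            stable = 0
            last_size = size
        time.sleep(interval)
    return False
```

You could call this from `on_created` in place of the sleep, skipping the file if it returns False.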

2) Parse with 0xPdf

Call the parse endpoint with your API key and a schema template.

import requests
import os

OXPDF_API_KEY = os.getenv("OXPDF_API_KEY")
OXPDF_BASE_URL = os.getenv("OXPDF_BASE_URL", "https://api.0xpdf.io/api/v1")

def parse_pdf(pdf_path: str, schema_template: str = "invoice") -> dict:
    url = f"{OXPDF_BASE_URL}/pdf/parse"
    headers = {"X-API-Key": OXPDF_API_KEY}
    params = {"schema_template": schema_template}

    with open(pdf_path, "rb") as f:
        files = {"file": (os.path.basename(pdf_path), f, "application/pdf")}
        response = requests.post(
            url,
            headers=headers,
            params=params,
            files=files,
            timeout=120
        )
        response.raise_for_status()
        return response.json()
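Before persisting, it's worth a quick sanity check that the parsed payload actually contains the fields your downstream systems rely on. The field names below are hypothetical placeholders -- swap in whatever keys the `invoice` schema template actually returns:

```python
def missing_fields(payload: dict,
                   required=("invoice_number", "total", "vendor")) -> list:
    """Return required keys that are absent or empty in the parsed payload."""
    return [f for f in required if payload.get(f) in (None, "")]
```

An empty return value means the payload is safe to store; anything else can be routed to the "failed" path with a descriptive error message.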

3) Store results in PostgreSQL

Persist both successful output and failure metadata for observability.

from sqlalchemy import create_engine, Column, Integer, String, JSON, DateTime, func
from sqlalchemy.orm import declarative_base, sessionmaker
import os

DATABASE_URL = os.getenv("DATABASE_URL")
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
Base = declarative_base()

class ParsedDocument(Base):
    __tablename__ = "parsed_documents"

    id = Column(Integer, primary_key=True)
    filename = Column(String, nullable=False)
    status = Column(String, nullable=False)  # success | failed
    schema_template = Column(String, nullable=False)
    payload = Column(JSON, nullable=True)
    error_message = Column(String, nullable=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

Base.metadata.create_all(bind=engine)

def save_result(filename: str, schema_template: str, status: str, payload=None, error_message=None):
    db = SessionLocal()
    try:
        row = ParsedDocument(
            filename=filename,
            schema_template=schema_template,
            status=status,
            payload=payload,
            error_message=error_message
        )
        db.add(row)
        db.commit()
    finally:
        db.close()
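One production upgrade worth doing early is idempotency: if the same file lands in the inbox twice, you probably don't want two rows and two Slack messages. Hashing the file contents gives a stable key you could store in a unique column (the model above doesn't have one yet; this is a sketch of the key, not the schema change):

```python
import hashlib

def file_sha256(path: str) -> str:
    """Content hash of a file, streamed in chunks so large PDFs don't load into memory."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            h.update(chunk)
    return h.hexdigest()
```

Check the hash against existing rows before parsing, and you skip both the duplicate API call and the duplicate notification.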

4) Send Slack notification

Slack helps your team see pipeline health in real time.

from slack_sdk import WebClient
import os

slack = WebClient(token=os.getenv("SLACK_BOT_TOKEN"))
SLACK_CHANNEL_ID = os.getenv("SLACK_CHANNEL_ID")

def notify_slack(message: str):
    slack.chat_postMessage(channel=SLACK_CHANNEL_ID, text=message)

5) Handle errors and retries

Retries protect you from transient failures (network timeout, temporary upstream issues, etc.).

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import requests

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    # note: HTTPError from raise_for_status() covers 4xx as well as 5xx, so
    # permanent client errors (e.g. 401/422) are retried too; narrow this if
    # that's undesirable for your workload
    retry=retry_if_exception_type((requests.Timeout, requests.ConnectionError, requests.HTTPError))
)
def parse_with_retry(pdf_path: str, schema_template: str = "invoice") -> dict:
    return parse_pdf(pdf_path, schema_template)
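If you'd rather not pull in tenacity, the same pattern is a short loop. This sketch shows roughly what the decorator above does -- capped exponential backoff between attempts, re-raising after the last one:

```python
import time

def retry_call(func, attempts=3, base=1.0, cap=10.0, retriable=(OSError,)):
    """Call func(), retrying retriable exceptions with capped exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except retriable:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # 1s, 2s, 4s, ... capped at `cap` seconds
            time.sleep(min(cap, base * (2 ** (attempt - 1))))
```

The tenacity version is preferable in practice (jitter, logging hooks, per-call overrides), but the loop makes the mechanics explicit.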

End-to-end processing function

Tie everything together in one worker function:

def process_pdf(path):
    filename = path.name
    schema_template = "invoice"

    try:
        result = parse_with_retry(str(path), schema_template=schema_template)
        save_result(filename, schema_template, "success", payload=result)
        notify_slack(f"Parsed `{filename}` successfully")
    except Exception as e:
        save_result(filename, schema_template, "failed", error_message=str(e))
        notify_slack(f"Failed parsing `{filename}`: {e}")

And run the watcher:

if __name__ == "__main__":
    observer = start_watcher(process_pdf)
    print("Watching ./inbox for new PDFs...")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
        observer.join()

Drop a PDF into ./inbox and it will be processed automatically.
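One detail the worker above leaves open: processed files stay in ./inbox, so a restart would re-trigger them (the watcher only fires on creation, but re-copying or re-seeding the folder would). A simple fix is moving each handled file to a `processed/` or `failed/` folder; a minimal sketch:

```python
import shutil
from pathlib import Path

def archive_file(path: Path, dest_dir: Path) -> Path:
    """Move a handled file out of the inbox so it can't be picked up again."""
    dest_dir.mkdir(parents=True, exist_ok=True)
    target = dest_dir / path.name
    shutil.move(str(path), str(target))
    return target
```

Calling this at the end of both branches of `process_pdf` keeps the inbox as a pure work queue.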


Scaling with async mode (large batches)

For larger workloads, move from direct synchronous processing to an async job pipeline:

  • Queue jobs with Celery/RQ/worker processes
  • Process many PDFs in parallel
  • Use async/polling workflow where appropriate
  • Batch writes to reduce DB overhead
  • Send summary notifications instead of one Slack message per file
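On the last point, a batch summary keeps Slack readable at higher volume. Given the per-file result rows the pipeline already stores (`filename` plus `status`), a summary message might be built like this:

```python
def summarize_batch(results) -> str:
    """Collapse per-file results into one Slack-friendly summary line."""
    ok = sum(1 for r in results if r["status"] == "success")
    failed = [r["filename"] for r in results if r["status"] == "failed"]
    msg = f"Processed {len(results)} PDFs: {ok} succeeded, {len(failed)} failed"
    if failed:
        msg += " (failed: " + ", ".join(failed) + ")"
    return msg
```

Send one of these per batch (or per time window) through `notify_slack` instead of one message per file.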

Recommended production upgrades:

  • Idempotency keys (avoid duplicate processing)
  • Dead-letter queue for repeated failures
  • Structured logging and alerting
  • Backpressure controls + rate limiting

Final thoughts

A strong document pipeline is not just OCR -- it's reliability, recoverability, and clean structured output.

0xPdf helps by turning PDFs into structured JSON directly, so you can focus on building business workflows instead of writing brittle extraction logic for every new layout.

If there's interest, I'll publish a full GitHub repo version of this exact pipeline (Docker Compose, migrations, and a ready-to-run starter).


Tags: python automation pdf api postgresql backend
