Ugur Aslim

Posted on • Originally published at uguraslim.com

Anthropic Batch API for Asynchronous Multi-Tenant AI Processing: Cutting Claude Costs by 50% Without Sacrificing Feature Responsiveness

I'm going to be blunt: if you're running a SaaS that uses Claude and you haven't implemented the Batch API yet, you're hemorrhaging money. Not metaphorically: batch requests are billed at 50% of the standard per-token price, and that applies to any request that can wait for its answer. Anthropic says most batches finish in under an hour, with a hard limit of 24 hours.

At CitizenApp, we process thousands of document summaries, compliance classifications, and policy analyses daily. Our first instinct was synchronous: user uploads document → Claude responds in real-time → dashboard updates. It felt responsive. It also felt like burning cash.

Then we realized: 80% of those requests don't actually need synchronous responses. A user uploads a document for summarization? They're fine waiting 5–30 minutes. A tenant wants to bulk-classify 500 policies? That's explicitly an async job. The Batch API is built for this, and it's absurdly underutilized.

Here's how I wired it into CitizenApp—and why you should too.

Why Batch API Matters (Beyond the 50% Discount)

The 50% cost reduction is the headline, but the real win is architectural. Batching forces you to separate concerns:

  1. Synchronous requests (fast, expensive): Real-time chat, document Q&A, immediate feedback loops.
  2. Asynchronous requests (slow, cheap): Bulk operations, scheduled analyses, background enrichment.

This separation is healthy. You stop cramming everything through Claude's sync API and actually think about what needs to be urgent.

In CitizenApp's case, we reduced our monthly Claude bill from ~$8k to ~$4k while actually increasing feature coverage. The key was offloading 65% of our workload to batches.
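
One thing worth checking before you promise savings to your finance team: the discount applies only to the share of token spend you actually move to batches. Here's a rough sketch of that arithmetic. The function name and the dollar figures are illustrative, not CitizenApp's numbers, and `batched_fraction` means share of token spend, not share of request count.

```python
def blended_cost(sync_only_cost: float, batched_fraction: float,
                 batch_discount: float = 0.5) -> float:
    """Estimate spend after moving part of the token spend to the Batch API.

    sync_only_cost   -- what the workload costs if everything runs synchronously
    batched_fraction -- share of token spend (0..1) moved to batches
    batch_discount   -- discount on the batched share (0.5 = 50% off)
    """
    if not 0.0 <= batched_fraction <= 1.0:
        raise ValueError("batched_fraction must be between 0 and 1")
    # Only the batched share gets cheaper; the rest is billed as before.
    return sync_only_cost * (1.0 - batched_fraction * batch_discount)


if __name__ == "__main__":
    for fraction in (0.25, 0.5, 0.8, 1.0):
        print(f"{fraction:.0%} batched -> ${blended_cost(1000.0, fraction):,.2f}")
```

The takeaway: the full 50% only shows up on the bill if everything is batched, so the more of your heavy, token-hungry jobs you can push async, the closer you get.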

Architecture: Queue → Batch → LISTEN/NOTIFY → React

Here's the flow:

User Request
    ↓
FastAPI Endpoint (Validate, Enqueue)
    ↓
PostgreSQL Queue Table
    ↓
Batch Processor (reads queue, submits to Anthropic)
    ↓
Anthropic Batch Job (runs in background)
    ↓
Webhook/Polling Handler (gets results)
    ↓
PostgreSQL LISTEN/NOTIFY
    ↓
WebSocket → React 19 Dashboard (real-time update)

Let's build it.

Step 1: Data Model

# models.py
from sqlalchemy import Column, String, Integer, Text, DateTime, Enum, ForeignKey
from sqlalchemy.orm import declarative_base
from datetime import datetime
import enum

Base = declarative_base()

class AIJobStatus(str, enum.Enum):
    QUEUED = "queued"
    SUBMITTED = "submitted"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

class AIJob(Base):
    __tablename__ = "ai_jobs"

    id = Column(String, primary_key=True)
    tenant_id = Column(String, ForeignKey("tenants.id"), nullable=False, index=True)
    user_id = Column(String, nullable=False)
    job_type = Column(String, nullable=False)  # "summarize", "classify", etc.

    status = Column(Enum(AIJobStatus), default=AIJobStatus.QUEUED, index=True)

    input_data = Column(Text, nullable=False)  # JSON stringified
    result = Column(Text, nullable=True)  # Result from Claude

    batch_id = Column(String, nullable=True, index=True)  # Anthropic batch ID
    request_id = Column(String, nullable=True)  # Within batch

    created_at = Column(DateTime, default=datetime.utcnow, index=True)
    completed_at = Column(DateTime, nullable=True)

    error_message = Column(Text, nullable=True)
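
The model defines five statuses but doesn't say which moves between them are legal. One way to pin that down is a small transition table, sketched below. The table itself is my assumption about a sensible lifecycle (the names `ALLOWED_TRANSITIONS` and `can_transition` are hypothetical), and the enum is repeated only so the snippet runs on its own.

```python
import enum


class AIJobStatus(str, enum.Enum):
    # Same values as models.py, repeated so this snippet is standalone.
    QUEUED = "queued"
    SUBMITTED = "submitted"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


# Assumed lifecycle: terminal states (COMPLETED, FAILED) allow no further moves.
ALLOWED_TRANSITIONS = {
    AIJobStatus.QUEUED: {AIJobStatus.SUBMITTED, AIJobStatus.FAILED},
    AIJobStatus.SUBMITTED: {
        AIJobStatus.PROCESSING,
        AIJobStatus.COMPLETED,
        AIJobStatus.FAILED,
    },
    AIJobStatus.PROCESSING: {AIJobStatus.COMPLETED, AIJobStatus.FAILED},
    AIJobStatus.COMPLETED: set(),
    AIJobStatus.FAILED: set(),
}


def can_transition(current: AIJobStatus, new: AIJobStatus) -> bool:
    """Return True if a job may move from `current` to `new`."""
    return new in ALLOWED_TRANSITIONS[current]
```

A guard like this keeps a late or duplicate poll from overwriting a job that has already reached a terminal state.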

Step 2: Queue Endpoint

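# api/ai.py
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
import uuid
import json
from sqlalchemy.orm import Session
from database import get_db
from models import AIJob, AIJobStatus
# Assumption: get_tenant_id / get_user_id are your app's own auth dependencies;
# the module name "auth" is a placeholder for wherever they live.
from auth import get_tenant_id, get_user_id

router = APIRouter()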

class SummarizeRequest(BaseModel):
    document_text: str
    max_length: int = 500

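# Plain `def`: FastAPI runs it in a threadpool, so the blocking
# SQLAlchemy calls below do not stall the event loop.
@router.post("/api/ai/summarize")
def queue_summarize(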
    request: SummarizeRequest,
    db: Session = Depends(get_db),
    tenant_id: str = Depends(get_tenant_id),
    user_id: str = Depends(get_user_id),
):
    """
    Queue a document for summarization (async).
    Returns immediately with job ID.
    User receives result via WebSocket when batch completes.
    """

    job_id = str(uuid.uuid4())

    job = AIJob(
        id=job_id,
        tenant_id=tenant_id,
        user_id=user_id,
        job_type="summarize",
        input_data=json.dumps({
            "document_text": request.document_text,
            "max_length": request.max_length,
        }),
        status=AIJobStatus.QUEUED,
    )

    db.add(job)
    db.commit()

    return {
        "job_id": job_id,
        "status": "queued",
        "message": "Your request is queued. You'll receive results in 5-30 minutes.",
    }
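
The endpoint hands back a `job_id`, so clients will eventually ask for that job's status. In a multi-tenant app that lookup has to be scoped to the caller's tenant. Below is a minimal sketch of the check, kept free of FastAPI and SQLAlchemy so it runs anywhere; `JobView` is a hypothetical stand-in for an `AIJob` row and `job_response_for_tenant` is a name I made up.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class JobView:
    """Hypothetical stand-in for the fields of an AIJob row."""
    id: str
    tenant_id: str
    status: str
    result: Optional[str] = None
    error_message: Optional[str] = None


def job_response_for_tenant(job: Optional[JobView], tenant_id: str) -> Optional[dict]:
    """Build the status payload, or return None if the caller may not see the job.

    A missing job and another tenant's job are treated identically, so a
    caller cannot probe for job IDs that belong to someone else.
    """
    if job is None or job.tenant_id != tenant_id:
        return None
    return {
        "job_id": job.id,
        "status": job.status,
        "result": job.result,
        "error": job.error_message,
    }
```

In a route handler, a `None` here would typically become a 404 rather than a 403, for the same reason.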

Step 3: Batch Processor (Background Task)

I prefer a separate worker process (via Celery or APScheduler) that runs periodically. I use APScheduler for simplicity; these are the two functions it calls:


# workers/batch_processor.py
from anthropic import Anthropic
from sqlalchemy.orm import Session
from database import SessionLocal
from models import AIJob, AIJobStatus
import json
from datetime import datetime

client = Anthropic()

def process_batch_jobs():
    """
    Run every 5 minutes (via APScheduler).
    Collects queued jobs and submits them to the Anthropic Message Batches API.
    """
    db = SessionLocal()
    try:
        # Take up to 100 queued jobs per run, oldest first. Anthropic's own limit
        # is far higher (100,000 requests or 256 MB per batch).
        queued_jobs = (
            db.query(AIJob)
            .filter(AIJob.status == AIJobStatus.QUEUED)
            .order_by(AIJob.created_at)
            .limit(100)
            .all()
        )

        # Build batch request
        requests = []
        submitted_jobs = []  # only the jobs that actually go into the batch

        for job in queued_jobs:
            input_data = json.loads(job.input_data)

            if job.job_type == "summarize":
                message = f"Summarize the following document in {input_data['max_length']} words:\n\n{input_data['document_text']}"
            elif job.job_type == "classify":
                message = f"Classify this text into one of: {input_data['categories']}\n\nText: {input_data['text']}"
            else:
                # Unknown job type: fail it now instead of marking it submitted,
                # otherwise it would wait forever for a result that never comes.
                job.status = AIJobStatus.FAILED
                job.error_message = f"Unsupported job type: {job.job_type}"
                continue

            requests.append({
                "custom_id": job.id,  # maps each result back to its ai_jobs row
                "params": {
                    "model": "claude-3-5-sonnet-20241022",
                    "max_tokens": 1024,
                    "messages": [{"role": "user", "content": message}],
                },
            })
            submitted_jobs.append(job)

        if requests:
            # Message Batches is generally available, so no beta header is needed.
            batch = client.messages.batches.create(requests=requests)

            # Mark only the included jobs as submitted
            for job in submitted_jobs:
                job.status = AIJobStatus.SUBMITTED
                job.batch_id = batch.id

            print(f"Submitted batch {batch.id} with {len(requests)} requests")

        db.commit()
    finally:
        db.close()


def poll_batch_results():
    """
    Run every 30 seconds.
    Checks submitted batches for completion and stores their results.
    """
    db = SessionLocal()
    try:
        # Get all submitted jobs
        submitted_jobs = db.query(AIJob).filter(
            AIJob.status == AIJobStatus.SUBMITTED
        ).all()

        batch_ids = {job.batch_id for job in submitted_jobs if job.batch_id}

        for batch_id in batch_ids:
            batch = client.messages.batches.retrieve(batch_id)

            # processing_status is "in_progress", "canceling", or "ended".
            # Results can only be fetched once the batch has ended.
            if batch.processing_status != "ended":
                continue

            # Batch ended: fetch results. Each result carries its own outcome type:
            # "succeeded", "errored", "canceled", or "expired".
            for entry in client.messages.batches.results(batch_id):
                job = db.query(AIJob).filter(AIJob.id == entry.custom_id).first()

                if not job:
                    continue

                outcome = entry.result
                if outcome.type == "succeeded":
                    job.status = AIJobStatus.COMPLETED
                    job.result = outcome.message.content[0].text
                elif outcome.type == "errored":
                    job.status = AIJobStatus.FAILED
                    # Store the whole error object as text
                    job.error_message = str(outcome.error)
                else:
                    # "canceled" or "expired": the request never produced a message
                    job.status = AIJobStatus.FAILED
                    job.error_message = f"Batch request {outcome.type}"

                job.completed_at = datetime.utcnow()

            db.commit()
            # Notify connected clients (next step)
            notify_clients_batch_complete(batch_id)
    finally:
        db.close()
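
The `notify_clients_batch_complete` helper isn't shown in this section. As a rough idea of what it could delegate to, here's a sketch that sends one PostgreSQL notification per tenant through `pg_notify`. Everything here is an assumption on my part: the channel name, the `notify_tenants` function, the payload shape, and a psycopg2-style DB-API cursor (`%s` placeholders). Payloads stay tiny on purpose, because NOTIFY payloads must stay under 8000 bytes in a default PostgreSQL configuration.

```python
import json
from collections import defaultdict

CHANNEL = "ai_jobs"  # hypothetical channel name the WebSocket layer LISTENs on


def build_tenant_payloads(batch_id, jobs):
    """Group finished jobs by tenant and return one small JSON payload per tenant.

    `jobs` is an iterable of (tenant_id, job_id) pairs.
    """
    by_tenant = defaultdict(list)
    for tenant_id, job_id in jobs:
        by_tenant[tenant_id].append(job_id)
    # Send counts, not results: clients re-fetch their jobs after the ping.
    return [
        json.dumps(
            {"batch_id": batch_id, "tenant_id": tenant_id, "job_count": len(ids)},
            sort_keys=True,
        )
        for tenant_id, ids in sorted(by_tenant.items())
    ]


def notify_tenants(cursor, batch_id, jobs):
    """Emit one NOTIFY per tenant using a DB-API cursor (psycopg2-style params)."""
    for payload in build_tenant_payloads(batch_id, jobs):
        cursor.execute("SELECT pg_notify(%s, %s)", (CHANNEL, payload))
```

Including `tenant_id` in the payload lets the listener route each message to that tenant's sockets only, which keeps one tenant's activity from leaking to another.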
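
The docstrings say "every 5 minutes" and "every 30 seconds", so something has to schedule those calls. A minimal APScheduler wiring might look like the sketch below. `register_jobs`, `main`, and the job IDs are names I chose, and the import path `workers.batch_processor` simply mirrors the file comment above.

```python
def register_jobs(scheduler, process_batch_jobs, poll_batch_results):
    """Register both worker functions on an APScheduler-style scheduler."""
    scheduler.add_job(
        process_batch_jobs,
        "interval",
        minutes=5,
        id="submit_batches",
        max_instances=1,  # never run two submitters at once
        coalesce=True,    # collapse missed runs into a single run
    )
    scheduler.add_job(
        poll_batch_results,
        "interval",
        seconds=30,
        id="poll_batches",
        max_instances=1,
        coalesce=True,
    )


def main():
    # Imported here so register_jobs can be exercised without these packages.
    from apscheduler.schedulers.blocking import BlockingScheduler
    from workers.batch_processor import process_batch_jobs, poll_batch_results

    scheduler = BlockingScheduler()
    register_jobs(scheduler, process_batch_jobs, poll_batch_results)
    scheduler.start()  # blocks; run this in its own worker process
```

Call `main()` from the worker's entry point. Keeping `max_instances=1` matters here: if a poll takes longer than 30 seconds, a second overlapping poll could write the same results twice.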
