DEV Community

Ugur Aslim

Originally published at uguraslim.com

FastAPI Background Tasks vs. Celery for AI Feature Processing: When to Queue and When to Fire-and-Forget


I've shipped AI feature processing in CitizenApp three different ways: BackgroundTasks, Celery + Redis, and naked async tasks. Each burned me in production. This post maps the exact failure modes that determine which pattern wins for your workload.

TL;DR: BackgroundTasks is perfect for sub-5-second tasks in low-concurrency environments. Celery is overkill until you hit 50+ concurrent AI inferences or need horizontal scaling. Async tasks are the dangerous middle ground: they feel right until your process crashes and loses work.

Why This Matters for Multi-Tenant AI

When you're processing Claude API calls across 200 tenants, a single failure mode cascades differently depending on your queue strategy:

  • BackgroundTasks dies with the process → tenant's AI feature blocks or silently fails
  • Async tasks die if you don't handle graceful shutdown → you lose inference requests mid-processing
  • Celery with Redis persists work → AI features eventually complete, even after deploys

I learned this when pushing CitizenApp to production. We had 12 concurrent AI document analyses running when I deployed a code change. BackgroundTasks lost 3 of them. The tenants got a generic error and no way to retry.

The Patterns

Pattern 1: FastAPI BackgroundTasks (Fire-and-Forget Lite)

This is what I use for synchronous side effects: logging, webhook retries, non-critical notifications.

from datetime import datetime
import logging

from fastapi import BackgroundTasks, Depends, FastAPI
from sqlalchemy.orm import Session

# FeatureUsage (ORM model), SessionLocal (SQLAlchemy sessionmaker) and get_db
# (request-scoped session dependency) are app-specific and assumed to exist.

logger = logging.getLogger(__name__)
app = FastAPI()

def log_ai_feature_usage(tenant_id: str, feature_name: str):
    """Non-blocking usage logging; runs after the response is sent."""
    # Open a dedicated session: depending on the FastAPI version, the
    # request's get_db session may already be closed when this runs.
    db = SessionLocal()
    try:
        usage = FeatureUsage(
            tenant_id=tenant_id,
            feature_name=feature_name,
            timestamp=datetime.utcnow(),
        )
        db.add(usage)
        db.commit()
    except Exception as e:
        db.rollback()
        # This fails silently in BackgroundTasks. Genuinely doesn't matter here.
        logger.warning(f"Failed to log usage: {e}")
    finally:
        db.close()

@app.post("/analyze")
async def analyze_document(
    tenant_id: str,
    document: str,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
):
    # Return immediately while logging happens in background
    background_tasks.add_task(log_ai_feature_usage, tenant_id, "document_analysis")

    return {"status": "queued", "document_id": "..."}

Why I use this: Synchronous, built-in, zero dependencies. No Redis. No Celery workers. Perfect for:

  • Analytics logging
  • Webhook retries (with exponential backoff in the task)
  • Cleanup jobs under 5 seconds
  • Non-critical email sends
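Webhook retries can live inside a single background task as long as the whole retry budget respects the under-5-seconds rule. A minimal sketch, assuming a hypothetical `send_webhook` callable; `retry_with_backoff` and its defaults are illustrative, not part of FastAPI. Because BackgroundTasks runs plain `def` functions in a thread pool, the `time.sleep` calls here do not block the event loop.

```python
import logging
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)


def retry_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[T]:
    """Call fn until it succeeds or attempts run out.

    Waits base_delay * 2**attempt between tries (0.5s, 1s, 2s with the
    defaults, so at most 3.5s of sleeping). Returns None after the final
    failure instead of raising, since an exception escaping a background
    task only ends up in the server logs anyway.
    """
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as exc:
            if attempt == max_attempts - 1:
                logger.error("giving up after %d attempts: %s", max_attempts, exc)
                return None
            delay = base_delay * 2 ** attempt
            logger.warning("attempt %d failed (%s); retrying in %.1fs", attempt + 1, exc, delay)
            sleep(delay)
    return None
```

Usage would look like `background_tasks.add_task(retry_with_backoff, lambda: send_webhook(url, payload))`. The injectable `sleep` parameter exists only so the backoff schedule can be tested without waiting.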

Where it breaks:

  • Task dies if the worker process crashes → no retry mechanism
  • No visibility into failures (unless you instrument logging)
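  • Can block deploys until all tasks complete: on shutdown the server waits for in-flight requests, including their background tasks, and whatever is still running when the orchestrator's grace period expires gets killed
  • No tenant isolation by default: every tenant's tasks share the same process and thread pool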

The visibility gap bit me. A tenant's usage logging task had a database constraint violation, and FastAPI's default behavior logged it to stdout but didn't surface it to the tenant. They lost audit data and blamed our system.
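One way to close that visibility gap is to wrap each background function so a failure is both logged and recorded somewhere a tenant-facing view or alert can read. A sketch under assumptions: `record_failure` and the in-memory `FAILED_TASKS` list are hypothetical stand-ins for a real failures table or metrics call, `tracked` is an illustrative name, and the wrapped function must let its exceptions propagate (a task that catches everything itself, like `log_ai_feature_usage` above, never lets the wrapper see the error).

```python
import functools
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)

# Hypothetical sink: stands in for a task_failures table or a metrics call.
FAILED_TASKS: List[Dict[str, str]] = []


def record_failure(tenant_id: str, task_name: str, error: str) -> None:
    FAILED_TASKS.append({"tenant_id": tenant_id, "task": task_name, "error": error})


def tracked(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap a background task so any exception is logged and recorded per tenant.

    Assumes the wrapped function takes tenant_id as its first argument.
    """

    @functools.wraps(fn)
    def wrapper(tenant_id: str, *args: Any, **kwargs: Any) -> Any:
        try:
            return fn(tenant_id, *args, **kwargs)
        except Exception as exc:
            logger.exception("background task %s failed for tenant %s", fn.__name__, tenant_id)
            record_failure(tenant_id, fn.__name__, repr(exc))
            return None

    return wrapper
```

Registration would then be `background_tasks.add_task(tracked(some_task), tenant_id, ...)`. This wrapper only handles plain `def` tasks; an `async def` task would need an async variant.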

Pattern 2: Async Tasks (The Trap)

Here's the seductive pattern that burned me hardest:

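import asyncio
import logging
from datetime import datetime

from anthropic import AsyncAnthropic

# Continues the app above; AIInference (ORM model) and SessionLocal are
# app-specific and assumed to exist.

logger = logging.getLogger(__name__)
anthropic_client = AsyncAnthropic()  # `await` needs the async client

# The event loop keeps only weak references to tasks; hold strong ones here
# so a fire-and-forget task isn't garbage-collected mid-flight.
inflight_tasks: set = set()

async def process_ai_inference(tenant_id: str, input_text: str):
    """Process Claude API call asynchronously."""
    db = SessionLocal()  # own session; the request's session may be closed
    try:
        # Call Claude
        response = await anthropic_client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{"role": "user", "content": input_text}],
        )

        # Store result (sync SQLAlchemy calls block the event loop while they run)
        inference = AIInference(
            tenant_id=tenant_id,
            input=input_text,
            output=response.content[0].text,
            status="completed",
            completed_at=datetime.utcnow(),
        )
        db.add(inference)
        db.commit()
    except Exception as e:
        logger.error(f"Inference failed for tenant {tenant_id}: {e}")
        db.rollback()
        # Try to mark as failed, but if DB is down, this silently fails
        inference = AIInference(
            tenant_id=tenant_id,
            status="failed",
            error=str(e),
        )
        db.add(inference)
        db.commit()
    finally:
        db.close()

@app.post("/infer")
async def infer(tenant_id: str, input_text: str):
    # Fire task without waiting
    task = asyncio.create_task(process_ai_inference(tenant_id, input_text))
    inflight_tasks.add(task)
    task.add_done_callback(inflight_tasks.discard)
    return {"status": "processing"}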

Why this feels right: One-liner to spawn work, no external dependencies, uses Python's native async/await.

Why it's a trap:

  • Tasks are lost on process crash—no persistence
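  • Graceful shutdown is tricky: you have to track every task yourself and await them (e.g. with asyncio.gather()) in a lifespan shutdown handler or the older app.on_event("shutdown") hook
  • No explicit concurrency limit: the only ceiling is what your event loop and the upstream API's rate limits can absorb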
  • Tenant isolation requires manual task tracking
  • Zero visibility into which tenants have stuck tasks
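For the missing concurrency limit and tenant isolation, one `asyncio.Semaphore` per tenant gives an explicit, process-local cap. A minimal sketch; `TenantLimiter` is a hypothetical helper, and the cap only holds inside one process, so it does nothing for persistence or for multiple workers.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, DefaultDict, TypeVar

T = TypeVar("T")


class TenantLimiter:
    """Cap how many coroutines each tenant may run at once in this process."""

    def __init__(self, max_per_tenant: int) -> None:
        # Semaphores are created lazily, the first time a tenant shows up.
        self._sems: DefaultDict[str, asyncio.Semaphore] = defaultdict(
            lambda: asyncio.Semaphore(max_per_tenant)
        )

    async def run(self, tenant_id: str, make_coro: Callable[[], Awaitable[T]]) -> T:
        # Waits here while this tenant already has max_per_tenant jobs in flight;
        # other tenants are unaffected because each has its own semaphore.
        async with self._sems[tenant_id]:
            return await make_coro()
```

A call site might spawn `limiter.run(tenant_id, lambda: process_ai_inference(tenant_id, input_text))`; passing a factory rather than a coroutine means the work is only created once a slot is free.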

This is what we used in CitizenApp v1. We'd deploy, and during the 30-second shutdown, any in-flight Claude API calls would get killed mid-response. We'd log them as failed, but the tenant's UI would time out waiting for a response that was already discarded.
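Draining in-flight tasks on shutdown doesn't make them persistent, but it does stop a deploy from cutting off every Claude call at once. A sketch, assuming every fire-and-forget task goes through a hypothetical `TaskRegistry` and that `drain()` is called from a FastAPI lifespan handler after its `yield`; the timeout has to be shorter than the orchestrator's grace period (30 seconds in the story above).

```python
import asyncio
from typing import Any, Coroutine, Set


class TaskRegistry:
    """Hold strong references to fire-and-forget tasks and drain them on shutdown."""

    def __init__(self) -> None:
        self._tasks: Set["asyncio.Task[Any]"] = set()

    def spawn(self, coro: Coroutine[Any, Any, Any]) -> "asyncio.Task[Any]":
        task = asyncio.create_task(coro)
        # The event loop only keeps a weak reference to tasks, so keep a
        # strong one here until the task finishes.
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def drain(self, timeout: float) -> int:
        """Wait up to `timeout` seconds, cancel whatever is still running,
        and return how many tasks had to be cancelled."""
        if not self._tasks:
            return 0
        _, still_running = await asyncio.wait(set(self._tasks), timeout=timeout)
        for task in still_running:
            task.cancel()
        # Let the cancellations settle; CancelledError is collected, not raised.
        await asyncio.gather(*still_running, return_exceptions=True)
        return len(still_running)


# Wiring (sketch, FastAPI lifespan):
#
#   registry = TaskRegistry()
#
#   @asynccontextmanager
#   async def lifespan(app: FastAPI):
#       yield
#       cancelled = await registry.drain(timeout=25)
#       logger.warning("cancelled %d in-flight inferences", cancelled)
#
#   app = FastAPI(lifespan=lifespan)
```

The return value tells you exactly how many inferences were still cut off, which is the number you'd want to mark as failed (or re-enqueue) rather than leave a tenant's UI waiting.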

Pattern 3: Celery + Redis (Industrial Strength)

This is what I use now for anything that:

  • Takes >5 seconds
  • Must survive a deploy
  • Needs per-tenant concurrency limits
  • Touches expensive APIs (Claude, Stripe)
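The checklist above, together with the 50-concurrent-inference threshold from the TL;DR, can be written down as a small decision helper. This only encodes this post's rules of thumb; `Workload` and `choose_backend` are hypothetical names, and the helper deliberately never returns naked async tasks, mirroring the verdict on Pattern 2.

```python
from dataclasses import dataclass


@dataclass
class Workload:
    expected_seconds: float      # typical runtime of one job
    peak_concurrency: int        # jobs in flight at the busiest moment
    must_survive_deploy: bool    # losing a job is visible to a tenant
    needs_tenant_limits: bool    # per-tenant concurrency caps required
    calls_paid_api: bool         # e.g. Claude or Stripe


def choose_backend(w: Workload) -> str:
    """Return "celery" if any checklist item applies, else "background_tasks".

    Thresholds (5 seconds, 50 concurrent jobs) are this post's rules of
    thumb, not universal constants.
    """
    if (
        w.expected_seconds > 5
        or w.must_survive_deploy
        or w.needs_tenant_limits
        or w.calls_paid_api
        or w.peak_concurrency >= 50
    ):
        return "celery"
    return "background_tasks"
```

Analytics logging (fast, low concurrency, losable) lands on BackgroundTasks; a Claude document analysis trips several checklist items at once and lands on Celery.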
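# tasks.py
import logging
from datetime import datetime

from anthropic import Anthropic
from celery import Celery, Task
from celery.exceptions import Retry
from kombu import Queue

# AIInference and TenantAIConfig (ORM models) and SessionLocal (a SQLAlchemy
# sessionmaker) are app-specific and assumed to be importable here.

logger = logging.getLogger(__name__)
anthropic_client = Anthropic()  # sync client; Celery tasks are plain functions

celery_app = Celery("citizenapp")
celery_app.conf.update(
    broker_url="redis://localhost:6379/0",
    result_backend="redis://localhost:6379/0",
    task_serializer="json",
    accept_content=["json"],
    result_expires=3600,
    # Acknowledge only after the task finishes, so a worker that dies
    # mid-inference leaves the message to be redelivered
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)

# Separate queues per workload type (not per tenant)
celery_app.conf.task_queues = (
    Queue("default", routing_key="default"),
    Queue("ai_inference", routing_key="ai.inference"),
)

class CallbackTask(Task):
    """Track task state in DB for multi-tenant visibility."""
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        tenant_id = kwargs.get("tenant_id")
        logger.error(f"Task {task_id} failed for tenant {tenant_id}: {exc}")
        # Update tenant's failure count, trigger alert

@celery_app.task(base=CallbackTask, bind=True, max_retries=3)
def process_ai_inference(
    self,
    tenant_id: str,
    inference_id: str,
    input_text: str,
):
    """
    Process Claude inference with automatic retries.
    With task_acks_late, the message stays in Redis until the task completes.
    """
    db = SessionLocal()
    try:
        inference = db.query(AIInference).filter_by(id=inference_id).first()

        if not inference:
            return {"error": "Inference not found"}

        # Check tenant rate limits (per-tenant concurrency). This only reads
        # the counter; something must also increment/decrement it around the
        # API call, or the check never trips.
        tenant_config = db.query(TenantAIConfig).filter_by(
            tenant_id=tenant_id
        ).first()

        if tenant_config and tenant_config.concurrent_inferences >= tenant_config.max_concurrent:
            # Retry after 10 seconds (counts against max_retries)
            raise self.retry(countdown=10)

        inference.status = "processing"
        db.commit()

        response = anthropic_client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{"role": "user", "content": input_text}],
        )

        inference.output = response.content[0].text
        inference.status = "completed"
        inference.completed_at = datetime.utcnow()
        db.commit()

        return {"inference_id": inference_id, "status": "completed"}

    except Retry:
        # self.retry() works by raising Retry; re-raise it untouched so the
        # 10-second rate-limit retry isn't turned into a backoff retry below
        raise
    except Exception as exc:
        db.rollback()
        # Exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)
    finally:
        db.close()

# fastapi_app.py
from fastapi import Depends, FastAPI
from sqlalchemy.orm import Session

from tasks import process_ai_inference
# AIInference and get_db come from the app's own modules (assumed)

app = FastAPI()

@app.post("/infer")
async def infer(
    tenant_id: str,
    input_text: str,
    db: Session = Depends(get_db),
):
    inference = AIInference(
        tenant_id=tenant_id,
        input=input_text,
        status="queued",
    )
    db.add(inference)
    db.commit()

    # Queue task, returns immediately
    task = process_ai_inference.apply_async(
        kwargs={
            "tenant_id": tenant_id,
            "inference_id": inference.id,
            "input_text": input_text,
        },
        queue="ai_inference",
        # Fixed priority here; per-tenant prioritization would derive it from
        # tenant config (the Redis broker only emulates priorities)
        priority=5,
    )

    inference.celery_task_id = task.id
    db.commit()

    return {"inference_id": inference.id, "task_id": task.id}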

Why I use Celery:

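  • Tasks persist in Redis and survive deploys and worker crashes (with late acknowledgement enabled); surviving power outages also requires Redis persistence (AOF or RDB snapshots)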
  • Built-in retry logic with exponential backoff
  • Per-queue concurrency control (we run a separate ai_inference queue served by 10 workers)
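  • Failed tasks stay inspectable: their state and traceback land in the result backend (until result_expires), and on_failure handles alerting, since Celery has no built-in dead letter queue on a Redis broker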
  • Monitoring integration (Flower, Datadog)
  • Per-tenant rate limiting (shown above)
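The rate-limit check in the task reads `concurrent_inferences`, but nothing shown increments or decrements it, and a plain check-then-act read lets two workers claim the last slot at the same moment. A sketch of atomic slot accounting, assuming a shared counter store: `InMemorySlots` is a hypothetical in-process stand-in for something like Redis `INCR`/`DECR`, and `tenant_slot` and `SlotsExhausted` are illustrative names.

```python
import threading
from contextlib import contextmanager
from typing import Dict, Iterator


class SlotsExhausted(Exception):
    """Raised when a tenant already has max_concurrent jobs running."""


class InMemorySlots:
    """Stand-in for a shared atomic counter store (e.g. Redis INCR/DECR)."""

    def __init__(self) -> None:
        self._counts: Dict[str, int] = {}
        self._lock = threading.Lock()

    def incr(self, key: str) -> int:
        with self._lock:
            self._counts[key] = self._counts.get(key, 0) + 1
            return self._counts[key]

    def decr(self, key: str) -> int:
        with self._lock:
            self._counts[key] = self._counts.get(key, 0) - 1
            return self._counts[key]


@contextmanager
def tenant_slot(store: InMemorySlots, tenant_id: str, max_concurrent: int) -> Iterator[None]:
    key = f"ai_slots:{tenant_id}"
    # Increment first, then compare: because the increment is atomic, two
    # workers can never both observe "one slot left" and both proceed.
    if store.incr(key) > max_concurrent:
        store.decr(key)
        raise SlotsExhausted(tenant_id)
    try:
        yield
    finally:
        store.decr(key)  # always release, even if the API call raised
```

Inside the Celery task, the Claude call would sit under `with tenant_slot(...)`, with `SlotsExhausted` translated into `self.retry(countdown=10)`. With a real Redis counter, plan for slots leaked by workers that die mid-task, for example by putting an expiry on the key.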

Cost: Running Redis + Celery workers adds ~$20
