Richard Quaicoe
Managing Background Tasks in FastAPI: From Basic to Production-Ready
When you build a web API, some work should not happen inside the request/response cycle. Sending a welcome email after signup, generating a PDF report, processing a webhook payload, resizing an uploaded image: none of these should make the user wait. They should be queued and handled after the response is already on the way.

FastAPI has a built-in answer for this. It works well for simple cases. But as your application grows, you will run into a set of limitations that the built-in system was never designed to solve. This article walks through both sides of that story.

Background tasks in FastAPI

FastAPI ships with a BackgroundTasks class from Starlette. You inject it into a route function and call add_task() with the function you want to run.

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def send_welcome_email(address: str) -> None:
    # Simulate sending an email
    print(f"Sending welcome email to {address}")

@app.post("/signup")
def signup(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_welcome_email, email)
    return {"message": "Signed up successfully"}

That is all there is to it. When POST /signup is called, FastAPI sends the response immediately and then runs send_welcome_email in the background. No extra processes, no broker, no configuration.

You can stack multiple tasks in the same request:

@app.post("/signup")
def signup(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_welcome_email, email)
    background_tasks.add_task(create_audit_log, email, action="signup")
    return {"message": "Signed up successfully"}


Both functions are called after the response, in the order they were added.

Async support

FastAPI handles async and sync task functions the same way. If your task is a coroutine function, it will be awaited:

import httpx

async def notify_slack(message: str) -> None:
    async with httpx.AsyncClient() as client:
        await client.post("https://hooks.slack.com/...", json={"text": message})

@app.post("/deploy")
async def deploy(background_tasks: BackgroundTasks):
    background_tasks.add_task(notify_slack, "Deployment started")
    return {"status": "deploying"}

Dependency injection

BackgroundTasks participates in FastAPI's dependency system. You can inject it anywhere in the dependency tree, which makes it easy to push background work into service layers without threading it through every function signature manually:

from fastapi import Depends

class NotificationService:
    def __init__(self, bg: BackgroundTasks):
        self.bg = bg

    def queue_welcome(self, email: str) -> None:
        self.bg.add_task(send_welcome_email, email)

def get_notification_service(bg: BackgroundTasks) -> NotificationService:
    return NotificationService(bg)

@app.post("/signup")
def signup(
    email: str,
    notifications: NotificationService = Depends(get_notification_service),
):
    notifications.queue_welcome(email)
    return {"message": "Signed up successfully"}

Where the built-in system stops

The BackgroundTasks model is intentionally simple. That simplicity is a feature when you are building something small. But it creates real problems in production.

No task visibility
Once add_task() is called, the task is just a callable in a list. There is no ID, no status, no record that it ever existed. If a user asks whether their email went out, you have no answer. If an operation fails silently, you will not know unless you happen to be watching the logs at that moment.

def send_welcome_email(address: str) -> None:
    raise Exception("SMTP server unavailable")

# This exception goes nowhere visible.
# The response already said "Signed up successfully".
# The user never gets their email and nobody knows.
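To get even minimal visibility with the built-in system, you have to hand-roll it. A rough sketch of the idea, using a hypothetical in-memory registry (the names `TASKS` and `tracked` are made up for illustration); note that it only records anything once the task actually starts, and everything is lost on restart:

```python
import traceback
import uuid

# Hypothetical in-memory registry: not shared across workers, gone on restart.
TASKS: dict[str, dict] = {}

def tracked(func):
    """Wrap a task function so each run records an ID, a status, and any error."""
    def wrapper(*args, **kwargs):
        task_id = str(uuid.uuid4())
        TASKS[task_id] = {"status": "running", "error": None}
        try:
            func(*args, **kwargs)
            TASKS[task_id]["status"] = "success"
        except Exception:
            TASKS[task_id]["status"] = "failed"
            TASKS[task_id]["error"] = traceback.format_exc()
        return task_id
    return wrapper
```

Even this much is boilerplate you now own, and it still gives the caller no ID at queue time, which is exactly the gap a real task library fills.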

No retries
If the task function raises an exception, it fails and that is the end of it. There is no retry mechanism, no backoff, no way to tell FastAPI to try again in 5 seconds. You have to build all of that yourself if you need it.

def send_welcome_email(address: str) -> None:
    # If this fails on attempt 1, there is no attempt 2.
    smtp_client.send(address, subject="Welcome", body="...")

For anything that touches an external service (email providers, payment APIs, notification webhooks), a single attempt is not enough. Networks fail. Third-party services go down. Without retries you are accepting silent data loss as a design choice.
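Retries are straightforward to sketch but tedious to own. A minimal hand-rolled version with exponential backoff (the decorator name `with_retries` is hypothetical, chosen to mirror the semantics this article describes later):

```python
import functools
import time

def with_retries(retries: int = 3, delay: float = 1.0, backoff: float = 2.0):
    """Retry a function up to `retries` extra times, multiplying the wait by `backoff`."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # out of attempts; in a background task this still vanishes
                    time.sleep(wait)
                    wait *= backoff
        return wrapper
    return decorator
```

This covers one task. Jitter, async support, per-task configuration, and recording which attempt failed are all still on you.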

No persistence
All task state lives in memory. If your application restarts for any reason (a redeploy, a crash, a container restart), every pending task that had not started yet is gone. There is no way to recover it.

For low-stakes tasks this might be acceptable. For tasks that represent work a user paid for, or that complete a critical business operation, it is not.

No priority
Every task is equal. A password reset email and a bulk analytics export sit in the same queue and execute in arrival order. There is no way to say "this task is urgent, run it first."

No scheduled tasks
The built-in BackgroundTasks only runs tasks in response to a request. There is no way to run a function every 10 minutes, or at midnight every day, without adding a separate scheduler library and wiring it up yourself.
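Wiring a scheduler yourself typically means running your own asyncio loop alongside the app. A naive sketch (no persistence, no overlap guard, no cron support; `every` is a made-up helper):

```python
import asyncio

async def every(seconds: float, func) -> None:
    """Call an async function on a fixed interval, forever."""
    while True:
        await asyncio.sleep(seconds)
        try:
            await func()
        except Exception:
            pass  # a real scheduler would log, alert, and maybe back off here

# You would start this in a FastAPI lifespan handler with asyncio.create_task()
# and remember to cancel it on shutdown, which is more lifecycle code to own.
```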

What production applications actually need

When you run background tasks at any meaningful scale, you need:

  • A unique ID for every task so you can query its status
  • Automatic retries with configurable delay and backoff
  • Persistence so tasks survive restarts
  • Priority so urgent work does not wait behind bulk work
  • Scheduled tasks on intervals or cron expressions
  • A dashboard to see what is running, what failed, and why
  • Concurrency limits so you do not overwhelm downstream services

Building all of this from scratch on top of BackgroundTasks is a significant amount of work, and most of it is not specific to your application.

fastapi-taskflow

fastapi-taskflow is a background task library built specifically for FastAPI that addresses each of these gaps without requiring a message broker or a separate worker process. Everything runs in your existing FastAPI process. The only optional dependency is a database for persistence.

Installation
pip install fastapi-taskflow

Setup

from fastapi import FastAPI
from fastapi_taskflow import TaskAdmin, TaskManager

task_manager = TaskManager(snapshot_db="tasks.db")
app = FastAPI()

TaskAdmin(app, task_manager)

TaskAdmin mounts the task API routes and dashboard, and wires up the startup and shutdown lifecycle handlers. snapshot_db tells it to persist completed tasks to a SQLite file so they survive restarts.

Defining tasks
Tasks are registered with a decorator that carries execution configuration:

@task_manager.task(retries=3, delay=1.0, backoff=2.0)
def send_welcome_email(address: str) -> None:
    smtp_client.send(address, subject="Welcome", body="...")

retries=3 means up to 3 additional attempts after the first failure. delay=1.0 is the wait in seconds before the first retry. backoff=2.0 doubles the wait on each subsequent retry: 1s, 2s, 4s. If all attempts fail the task is marked failed and stays in history.
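The wait before retry n is simply delay * backoff ** (n - 1). A throwaway helper (not part of the library) makes the arithmetic for the configuration above easy to check:

```python
def retry_delays(retries: int, delay: float, backoff: float) -> list[float]:
    """Wait (in seconds) before each retry attempt."""
    return [delay * backoff ** attempt for attempt in range(retries)]

print(retry_delays(3, 1.0, 2.0))  # [1.0, 2.0, 4.0]
```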

Async functions work exactly the same way:

@task_manager.task(retries=2, delay=0.5)
async def process_webhook(payload: dict) -> None:
    async with httpx.AsyncClient() as client:
        await client.post("/internal/webhook", json=payload)

Using tasks in routes
The route signature does not change from standard FastAPI. add_task() now returns a UUID you can use to track the task:

from fastapi import BackgroundTasks

@app.post("/signup")
def signup(email: str, background_tasks: BackgroundTasks):
    task_id = background_tasks.add_task(send_welcome_email, address=email)
    return {"task_id": task_id}

If you prefer explicit dependency injection:

from fastapi import Depends

@app.post("/signup")
def signup(email: str, tasks=Depends(task_manager.get_tasks)):
    task_id = tasks.add_task(send_welcome_email, address=email)
    return {"task_id": task_id}

Task visibility
Every task gets a record with a status that moves through pending, running, success, or failed. You can query it directly from the store:

from fastapi import HTTPException

@app.get("/tasks/{task_id}/status")
def task_status(task_id: str):
    record = task_manager.store.get(task_id)
    if record is None:
        raise HTTPException(status_code=404)
    return {"status": record.status.value, "error": record.error}

Or use the built-in API endpoint that TaskAdmin mounts automatically:

curl http://localhost:8000/tasks/abc-123

{
  "task_id": "abc-123",
  "func_name": "send_welcome_email",
  "status": "success",
  "created_at": "2024-01-15T10:30:00Z",
  "started_at": "2024-01-15T10:30:00.1Z",
  "ended_at": "2024-01-15T10:30:00.3Z",
  "duration": 0.2,
  "retries_used": 0
}

Priority
When a task needs to run before others in the queue, set a priority:

@task_manager.task(retries=2, priority=9)
def send_otp(phone: str) -> None:
    sms_client.send(phone, "Your OTP is 847291")

@task_manager.task(retries=1, priority=1)
def generate_monthly_report(month: int) -> None:
    ...

Priority 9 tasks run before priority 1 tasks regardless of arrival order. Equal-priority tasks are FIFO. The conventional range is 1 to 10 but any integer is accepted.
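These ordering semantics (highest priority first, FIFO among equals) are the same as a standard priority queue with an insertion counter as the tie-breaker. A generic illustration of the behavior, not the library's internals:

```python
import heapq
import itertools

_counter = itertools.count()  # insertion order breaks ties, preserving FIFO
_queue: list = []

def push(priority: int, name: str) -> None:
    # heapq is a min-heap, so negate the priority to pop the highest first
    heapq.heappush(_queue, (-priority, next(_counter), name))

def pop() -> str:
    return heapq.heappop(_queue)[2]

push(1, "generate_monthly_report")
push(9, "send_otp")
push(1, "generate_weekly_report")
print(pop())  # send_otp
```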

You can also set priority per call, overriding the decorator default:

task_id = tasks.add_task(generate_monthly_report, month=3, priority=8)

Scheduled tasks
Register a function to run on a fixed interval or cron expression:

@task_manager.schedule(every=300)  # every 5 minutes
async def sync_exchange_rates() -> None:
    rates = await fetch_rates()
    await db.update_rates(rates)

@task_manager.schedule(cron="0 9 * * 1-5", timezone="America/New_York")
def send_daily_digest() -> None:
    send_digest_email_to_all_subscribers()

Cron support requires pip install "fastapi-taskflow[scheduler]". Interval schedules have no extra dependencies.

In a multi-instance deployment, a distributed lock ensures only one instance fires each schedule even when multiple servers are running the same code.

Concurrency limits
Cap how many tasks run at the same time to protect downstream services:

task_manager = TaskManager(
    snapshot_db="tasks.db",
    max_concurrency=10,
)

Tasks beyond the limit wait in queue. They do not pile up on the event loop or overwhelm connection pools.

Persistence and requeue
By default, completed tasks are flushed to SQLite periodically and loaded back on startup so history survives restarts. If you also want pending tasks to survive a restart:

task_manager = TaskManager(
    snapshot_db="tasks.db",
    requeue_pending=True,
)

Tasks that were pending when the process shut down are re-dispatched on the next startup. For tasks that were mid-execution, use requeue_on_interrupt=True on the decorator, but only if the function is idempotent, i.e. safe to run again from scratch:

@task_manager.task(retries=1, requeue_on_interrupt=True)
def sync_user_profile(user_id: int) -> None:
    ...

The dashboard

TaskAdmin mounts a live dashboard at /tasks/dashboard. It shows every task, its status, duration, retry count, and error message. It updates over a server-sent events stream so you see running tasks transition to success or failed in real time without refreshing the page.

The Dead Letters tab shows all failed tasks. From there you can select individual tasks and replay them, or pick a time window (last hour, last 24 hours, last 7 days) and replay everything in that window at once. A confirmation modal shows exactly what will be dispatched before anything runs.

Dashboard View

To protect the dashboard with a login:

TaskAdmin(app, task_manager, auth=("admin", "secret"))

Comparing the two approaches

Standard FastAPI

@app.post("/signup")
def signup(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_welcome_email, email)
    return {"message": "ok"}

With fastapi-taskflow

@app.post("/signup")
def signup(email: str, background_tasks: BackgroundTasks):
    task_id = background_tasks.add_task(send_welcome_email, address=email)
    return {"task_id": task_id}

The route code is nearly identical. The difference is what happens after add_task() is called.

Comparison Table

When to use each

The built-in BackgroundTasks is the right choice when the task is low-stakes, you do not need to track it, and losing it on a restart is acceptable: logging a page view, firing off a non-critical analytics ping, clearing a short-lived cache entry.

fastapi-taskflow is the right choice when the task represents real work that has to complete: sending a transactional email, processing a payment webhook, generating a user-requested report, syncing data to an external system. Any time a user or a business operation is depending on the result, you need visibility, retries, and persistence.

The good news is that switching from one to the other requires almost no changes to your existing routes. The decorator is optional. A plain function passed to add_task() is still tracked with a UUID and a default config. You can adopt it incrementally, one task at a time.

Source
fastapi-taskflow is open source: github.com/Attakay78/fastapi-taskflow

pip install fastapi-taskflow
