Martin Palopoli

Real Multi-Tenancy with FastAPI and PostgreSQL: Plans, Quotas and Data Isolation

I implemented real multi-tenancy in a RAG SaaS engine: tenants with plans, quota enforcement with atomic counters (UPSERT), Redis rate limiting (sliding window), RBAC with 3 roles, and data isolation where every query is filtered by tenant_id. This article covers the full architecture, the pitfalls I avoided, and the key code.


Why "Adding a tenant_id" Isn't Multi-Tenancy

Most multi-tenancy tutorials stop at "add a tenant_id column and filter every query". That's 10% of the problem. In production you'll need:

  • Plans with real limits that can't be bypassed with concurrent requests
  • Atomic counters that don't lose increments under load
  • Rate limiting that survives server restarts
  • Roles that restrict actions, not just visibility
  • Real isolation where a bug in one service doesn't expose another tenant's data
  • Billing cycles that reset correctly each month

This article shows how I solved each one in a real system with async FastAPI and PostgreSQL.


The Stack

| Component | Technology |
|---|---|
| Backend | Python 3.12 + FastAPI (100% async) |
| Database | PostgreSQL 16 |
| ORM | SQLAlchemy async + Alembic |
| Cache / rate limiting | Redis 7 |
| Auth | JWT (30-min access, 7-day refresh with rotation) |
| Passwords | bcrypt (via passlib) |

Database Design

The 4 Fundamental Tables

┌──────────────┐     ┌──────────────┐
│    plans     │     │   tenants    │
├──────────────┤     ├──────────────┤
│ id (UUID)    │◄────│ plan_id (FK) │
│ name         │     │ id (UUID)    │
│ slug         │     │ name         │
│ max_users    │     │ slug (unique)│
│ max_kbs      │     │ is_active    │
│ max_documents│     │ plan_started │
│ max_storage  │     │ created_at   │
│ max_messages │     └──────┬───────┘
│ max_tokens   │            │
│ max_api_keys │     ┌──────┴───────┐
│ price_monthly│     │    users     │
│max_concurrent│     ├──────────────┤
└──────────────┘     │ id (UUID)    │
                     │ tenant_id(FK)│
                     │ email        │
                     │ role (enum)  │
                     │ is_active    │
                     └──────────────┘

┌───────────────────────┐
│ tenant_monthly_usage  │
├───────────────────────┤
│ id (UUID)             │
│ tenant_id (FK)        │
│ billing_period (str)  │
│ messages_count (int)  │
│ tokens_used (bigint)  │
│ UNIQUE(tenant_id,     │
│   billing_period)     │
└───────────────────────┘

Plan Model

Each plan defines all the limits the system will enforce:

class Plan(Base):
    __tablename__ = "plans"

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name: Mapped[str] = mapped_column(String(100), nullable=False)
    slug: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
    max_users: Mapped[int] = mapped_column(Integer, nullable=False, default=3)
    max_concurrent: Mapped[int] = mapped_column(Integer, nullable=False, default=2)
    max_kbs: Mapped[int] = mapped_column(Integer, nullable=False, default=3)
    max_documents: Mapped[int] = mapped_column(Integer, nullable=False, default=20)
    max_storage_mb: Mapped[int] = mapped_column(Integer, nullable=False, default=200)
    max_messages_month: Mapped[int] = mapped_column(Integer, nullable=False, default=500)
    max_tokens_month: Mapped[int] = mapped_column(BigInteger, nullable=False, default=200000)
    max_api_keys: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
    price_monthly: Mapped[float | None] = mapped_column(Numeric(10, 2), nullable=True)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)

Key decision: -1 means unlimited. Instead of an is_unlimited boolean per resource, I use -1 as a convention, which simplifies every comparison:

if plan.max_users == -1:
    return  # Unlimited, skip check

Tenant Model

class Tenant(Base):
    __tablename__ = "tenants"

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name: Mapped[str] = mapped_column(String(255), nullable=False)
    slug: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    plan_id: Mapped[uuid.UUID | None] = mapped_column(
        UUID(as_uuid=True), ForeignKey("plans.id"), nullable=True
    )
    plan_started_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True), nullable=True, default=None
    )
    notification_preferences: Mapped[dict | None] = mapped_column(JSONB, nullable=True)

plan_started_at is crucial: it defines the billing cycle anchor. When a tenant upgrades from Free to a paid plan, it resets to the payment timestamp. This determines when monthly counters reset.

User Model

class UserRole(str, PyEnum):
    SUPER_ADMIN = "super_admin"
    TENANT_ADMIN = "tenant_admin"
    MEMBER = "member"


class User(Base):
    __tablename__ = "users"

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
    hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
    full_name: Mapped[str] = mapped_column(String(255), default="")
    role: Mapped[UserRole] = mapped_column(
        Enum(UserRole, name="user_role", values_callable=lambda e: [x.value for x in e]),
        default=UserRole.MEMBER,
    )
    tenant_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("tenants.id", ondelete="CASCADE"), nullable=False
    )
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    email_verified: Mapped[bool] = mapped_column(Boolean, default=False)

A user belongs to exactly one tenant. No multi-tenant users. If someone needs access to two organizations, create two accounts. This enormously simplifies the permissions model.


RBAC: 3 Roles, Zero Ambiguity

The Roles

| Role | Scope | Can do |
|---|---|---|
| super_admin | Platform | View all tenants, change plans, view global metrics |
| tenant_admin | Their tenant | CRUD KBs, documents, FAQs, invite users, view analytics |
| member | Their tenant (restricted) | Chat, view assigned KBs, give feedback |

Dependency Injection for Authorization

FastAPI has a dependency system that's perfect for RBAC. I created a dependency factory:

from fastapi import Depends
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")


async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
):
    payload = decode_access_token(token)
    user_id = payload.get("sub")
    if not user_id:
        raise UnauthorizedException("Invalid token payload")

    user = await get_user_by_id(db, UUID(user_id))
    if not user:
        raise UnauthorizedException("User not found")
    if not user.is_active:
        raise UnauthorizedException("Account deactivated")

    # Verify tenant is active (super_admin bypasses)
    if user.role != UserRole.SUPER_ADMIN:
        tenant = await db.get(Tenant, user.tenant_id)
        if not tenant or not tenant.is_active:
            raise UnauthorizedException("Tenant deactivated")

    return user

Subtle detail: super_admin bypasses the active tenant check. If we deactivate a malicious tenant, the super_admin can still get in to investigate.

Role Restriction Factory

def require_role(*roles):
    """Dependency factory: restricts endpoint to specific roles."""
    async def _check(current_user=Depends(get_current_user)):
        if current_user.role not in roles:
            raise ForbiddenException("You don't have permission for this action")
        return current_user
    return _check


# Shortcuts
def require_super_admin():
    return require_role(UserRole.SUPER_ADMIN)

def require_tenant_admin_or_above():
    return require_role(UserRole.SUPER_ADMIN, UserRole.TENANT_ADMIN)

Usage in endpoints:

@router.get("/tenants", response_model=list[TenantResponse])
async def list_tenants(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    if current_user.role != UserRole.SUPER_ADMIN:
        raise ForbiddenException("Only super_admin can list all tenants")
    return await tenant_service.list_tenants(db)


@router.put("/{kb_id}/access")
async def update_kb_access(
    kb_id: UUID,
    data: KBAccessConfig,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_tenant_admin_or_above()),
):
    # Only tenant_admin or super_admin reach here
    ...

Data Isolation: tenant_id Everywhere

The Pattern

Every table containing user data has a tenant_id column (direct or transitive). No exceptions.

class KnowledgeBase(Base):
    __tablename__ = "knowledge_bases"
    # ...
    tenant_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("tenants.id", ondelete="CASCADE"), nullable=False
    )

class ApiKey(Base):
    __tablename__ = "api_keys"
    # ...
    tenant_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("tenants.id", ondelete="CASCADE"), nullable=False
    )

Filtering in Every Service

Every function that lists or searches data filters by the authenticated user's tenant_id:

async def list_knowledge_bases(db: AsyncSession, user: User) -> list[dict]:
    stmt = (
        select(KnowledgeBase, func.count(Document.id).label("document_count"))
        .outerjoin(Document, Document.knowledge_base_id == KnowledgeBase.id)
    )

    # Everyone, super_admin included, is scoped to their own tenant plus system KBs
    stmt = stmt.where(or_(
        KnowledgeBase.tenant_id == user.tenant_id,
        KnowledgeBase.is_system == True,
    ))

    if user.role != UserRole.SUPER_ADMIN:
        # For members, apply the granular access filter
        access_filter = await get_accessible_kb_filter(user)
        if access_filter is not None:
            stmt = stmt.where(or_(KnowledgeBase.is_system == True, access_filter))

    stmt = stmt.group_by(KnowledgeBase.id).order_by(KnowledgeBase.created_at.desc())
    result = await db.execute(stmt)
    # ...

Important lesson: the super_admin does NOT see other tenants' data. They're the platform operator. They see aggregate statistics (total tenants, revenue, etc.) but can't read other tenants' conversations or KBs. This was a deliberate privacy decision.

Granular KB Access

Within the same tenant, not all members see all KBs. There's an access control system:

async def _check_kb_access(db: AsyncSession, kb: KnowledgeBase, user: User) -> None:
    has_access = await user_can_access_kb(db, user, kb)
    if not has_access:
        raise ForbiddenException("You don't have access to this knowledge base")

KBs can be:

  • Public within the tenant: all members can see them
  • Private: only members assigned via groups can see them
  • System: system KBs, visible to all, only modifiable by super_admin
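Those rules collapse into a small predicate. The sketch below is a pure, illustrative model of them: the is_public flag, the group sets, and the dataclasses are assumptions for the demo, while the real check is the async user_can_access_kb running against the database.

```python
from dataclasses import dataclass
from enum import Enum

class UserRole(str, Enum):
    SUPER_ADMIN = "super_admin"
    TENANT_ADMIN = "tenant_admin"
    MEMBER = "member"

@dataclass
class KB:
    tenant_id: str
    is_system: bool = False
    is_public: bool = False                      # assumed flag: "public within the tenant"
    allowed_group_ids: frozenset = frozenset()   # assumed group-based assignment

@dataclass
class User:
    tenant_id: str
    role: UserRole
    group_ids: frozenset = frozenset()

def user_can_access_kb(user: User, kb: KB) -> bool:
    """Pure model of the KB visibility rules listed above."""
    if kb.is_system:
        return True                      # system KBs: visible to everyone
    if kb.tenant_id != user.tenant_id:
        return False                     # hard tenant isolation, no exceptions
    if user.role in (UserRole.SUPER_ADMIN, UserRole.TENANT_ADMIN):
        return True                      # admins see everything in their tenant
    if kb.is_public:
        return True                      # public within the tenant
    return bool(user.group_ids & kb.allowed_group_ids)  # private: via groups
```

Note the ordering: the tenant check comes before any role shortcut, so even an admin role can never reach across tenants.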

CASCADE: Clean Deletion

All foreign keys use ondelete="CASCADE". When a tenant is deleted, everything cascades: users, KBs, documents, chunks, conversations, API keys, usage logs. A single operation:

async def delete_tenant(db: AsyncSession, tenant_id: UUID, current_user: User) -> dict:
    tenant = await get_tenant(db, tenant_id)

    # Safety checks
    if current_user.tenant_id == tenant_id:
        raise BadRequestException("You cannot delete your own tenant")
    if tenant.slug == "system-tenant":
        raise ForbiddenException("Cannot delete the system tenant")

    # 1. Cancel active subscription
    try:
        await admin_cancel_subscription(db, tenant_id, immediate=True)
    except NotFoundException:
        pass

    # 2. Clean Redis cache
    redis = await get_redis()
    if redis:
        # KEYS is O(N) and blocks Redis; acceptable for a rare admin
        # operation like this, but use SCAN on any hot path
        keys = await redis.keys(f"*:{tenant_id}:*")
        if keys:
            await redis.delete(*keys)

    # 3. DELETE → CASCADE handles the rest
    await db.delete(tenant)
    await db.commit()

Plan Enforcement: Atomic Counters with UPSERT

The Problem

Imagine a Free-plan tenant has a limit of 50 messages/month. Two users send a message at the same time. If you do SELECT count → check → INSERT, you have a race condition: both read 49, both pass the check, both insert. Now there are 51.

The Solution: Atomic UPSERT

The tenant_monthly_usage table uses a UNIQUE constraint on (tenant_id, billing_period):

class TenantMonthlyUsage(Base):
    __tablename__ = "tenant_monthly_usage"

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    tenant_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("tenants.id", ondelete="CASCADE"), nullable=False
    )
    billing_period: Mapped[str] = mapped_column(String(10), nullable=False)
    messages_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    tokens_used: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0)

    __table_args__ = (
        UniqueConstraint("tenant_id", "billing_period", name="uq_tenant_billing_usage"),
    )

The increment uses INSERT ... ON CONFLICT DO UPDATE:

async def increment_message_count(db: AsyncSession, tenant_id: UUID) -> None:
    anchor = await _get_billing_anchor(db, tenant_id)
    bp = _get_billing_period_key(anchor)
    await db.execute(text("""
        INSERT INTO tenant_monthly_usage (id, tenant_id, billing_period, messages_count, tokens_used)
        VALUES (gen_random_uuid(), :tid, :bp, 1, 0)
        ON CONFLICT (tenant_id, billing_period)
        DO UPDATE SET messages_count = tenant_monthly_usage.messages_count + 1,
                      updated_at = now()
    """), {"tid": tenant_id, "bp": bp})
    await db.commit()

Why does this work? PostgreSQL executes INSERT ... ON CONFLICT DO UPDATE atomically. There's no race condition window. If two requests arrive at the same time, one will INSERT and the other will UPDATE, but the counter will always be correct.

Anti-fraud detail: counters only increment. There's no endpoint to decrement. No way for a user to manipulate their usage. The only way to "reset" is when the billing period changes (i.e., a month passes).
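The insert-or-increment semantics are easy to exercise locally. This sketch uses SQLite (version 3.24+ shares the ON CONFLICT syntax); it only demonstrates the one-row-per-(tenant, period) shape, not PostgreSQL's concurrency behavior, which the text above covers.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE tenant_monthly_usage (
        tenant_id TEXT NOT NULL,
        billing_period TEXT NOT NULL,
        messages_count INTEGER NOT NULL DEFAULT 0,
        UNIQUE (tenant_id, billing_period)
    )
""")

def increment(tenant_id: str, bp: str) -> None:
    # Same shape as the PostgreSQL statement above: first call inserts
    # the row with count 1, every later call updates it in place.
    conn.execute("""
        INSERT INTO tenant_monthly_usage (tenant_id, billing_period, messages_count)
        VALUES (?, ?, 1)
        ON CONFLICT (tenant_id, billing_period)
        DO UPDATE SET messages_count = messages_count + 1
    """, (tenant_id, bp))

for _ in range(51):
    increment("t1", "2025-03-15")

count = conn.execute(
    "SELECT messages_count FROM tenant_monthly_usage WHERE tenant_id = 't1'"
).fetchone()[0]
print(count)  # 51 — one row, every increment counted
```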

Billing Period Calculation

def _get_billing_period_key(anchor: datetime) -> str:
    now = datetime.now(timezone.utc)
    billing_day = anchor.day

    # Clamping: if anchor is day 31 and we're in February (28 days),
    # effective billing day is 28
    max_day = calendar.monthrange(now.year, now.month)[1]
    effective_day = min(billing_day, max_day)

    if now.day >= effective_day:
        return f"{now.year}-{now.month:02d}-{effective_day:02d}"
    else:
        # Current period started last month
        if now.month == 1:
            prev_year, prev_month = now.year - 1, 12
        else:
            prev_year, prev_month = now.year, now.month - 1
        max_day_prev = calendar.monthrange(prev_year, prev_month)[1]
        effective_day_prev = min(billing_day, max_day_prev)
        return f"{prev_year}-{prev_month:02d}-{effective_day_prev:02d}"

Why is this complex? Because months don't have the same number of days. If a tenant paid on January 31st, their next cycle doesn't start on February 31st (doesn't exist). The clamping handles this.
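To make the clamping concrete, here is a pure version of the same logic. The signature is hypothetical: it takes now as a parameter instead of calling datetime.now, so the edge cases for a January 31st anchor are easy to check.

```python
import calendar
from datetime import datetime, timezone

def billing_period_key(anchor: datetime, now: datetime) -> str:
    # Same logic as _get_billing_period_key, with "now" injected for testing.
    billing_day = anchor.day
    max_day = calendar.monthrange(now.year, now.month)[1]
    effective_day = min(billing_day, max_day)
    if now.day >= effective_day:
        return f"{now.year}-{now.month:02d}-{effective_day:02d}"
    # Current period started last month
    prev_year, prev_month = (now.year - 1, 12) if now.month == 1 else (now.year, now.month - 1)
    max_day_prev = calendar.monthrange(prev_year, prev_month)[1]
    effective_day_prev = min(billing_day, max_day_prev)
    return f"{prev_year}-{prev_month:02d}-{effective_day_prev:02d}"

anchor = datetime(2025, 1, 31, tzinfo=timezone.utc)
# February 2025 has 28 days, so the billing day clamps to 28:
assert billing_period_key(anchor, datetime(2025, 2, 28, tzinfo=timezone.utc)) == "2025-02-28"
# On February 27 the current period still started on January 31:
assert billing_period_key(anchor, datetime(2025, 2, 27, tzinfo=timezone.utc)) == "2025-01-31"
# In March the billing day snaps back to 31:
assert billing_period_key(anchor, datetime(2025, 3, 31, tzinfo=timezone.utc)) == "2025-03-31"
```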

Limit Verification

Before executing the RAG pipeline, I verify limits:

async def check_message_limit(db: AsyncSession, tenant_id: UUID) -> None:
    plan, tenant = await _get_plan_and_tenant(db, tenant_id)
    if not plan:
        return
    if plan.max_messages_month == -1:
        return  # Unlimited
    usage = await get_monthly_usage(db, tenant_id)
    if usage["messages_month"] >= plan.max_messages_month:
        raise PlanLimitException(
            f"Monthly message limit reached "
            f"({plan.max_messages_month} max for {plan.name} plan)"
        )

PlanLimitException returns HTTP 403, not 429. It's a plan restriction, not rate limiting. The frontend distinguishes between "upgrade your plan" (403) and "wait a moment" (429).

Limits on All Resources

Not just messages. Every resource has its check:

# Before creating a user
await check_user_limit(db, tenant_id)

# Before creating a KB
await check_kb_limit(db, tenant_id)

# Before uploading a document
await check_document_limit(db, tenant_id)
await check_storage_limit(db, tenant_id, new_file_size_bytes=file_size)

# Before creating an API key
await check_api_key_limit(db, tenant_id)

# Before sending a message to the LLM
await check_message_limit(db, tenant_id)
await check_concurrent_limit(db, tenant_id)

The pattern is always the same:

async def check_kb_limit(db: AsyncSession, tenant_id: UUID) -> None:
    tenant = await get_tenant(db, tenant_id)
    if not tenant.plan_id:
        return
    plan = await db.get(Plan, tenant.plan_id)
    if not plan or plan.max_kbs == -1:
        return
    stmt = select(func.count(KnowledgeBase.id)).where(KnowledgeBase.tenant_id == tenant_id)
    result = await db.execute(stmt)
    current_count = result.scalar() or 0
    if current_count >= plan.max_kbs:
        raise PlanLimitException(
            f"Knowledge base limit reached ({plan.max_kbs} max)"
        )

Rate Limiting with Redis: Sliding Window

Why Redis and Not PostgreSQL

Billing period counters are in PostgreSQL because they need durability. Rate limiting is in Redis because it needs speed and automatic TTL.

If the server restarts mid-stream, I don't want a "phantom" counter in PostgreSQL blocking the tenant. Redis with TTL self-cleans.

Sliding Window with Sorted Sets

import time
import redis.asyncio as redis

async def is_allowed(key: str, max_requests: int, window_seconds: int = 60) -> bool:
    client = await get_redis()
    if client is None:
        return True  # Fail-open

    redis_key = f"ratelimit:{key}"
    now = time.time()
    cutoff = now - window_seconds

    try:
        pipe = client.pipeline()
        pipe.zremrangebyscore(redis_key, 0, cutoff)   # Clean expired
        pipe.zcard(redis_key)                          # Count current
        pipe.zadd(redis_key, {str(now): now})          # Add this request
        pipe.expire(redis_key, window_seconds + 1)     # TTL safety net
        results = await pipe.execute()

        current_count = results[1]
        if current_count >= max_requests:
            await client.zrem(redis_key, str(now))     # Rollback
            return False
        return True
    except Exception:
        return True  # Fail-open

Why sorted sets and not a simple INCR? Because I need a sliding window, not a fixed window. With INCR, if a user sends 60 requests at second 59 of the minute, and 60 more at second 1 of the next minute, 120 requests pass in 2 seconds. With sorted sets, each request has its timestamp and the window slides continuously.
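That boundary-burst scenario can be replayed with a toy in-memory model of the sorted-set logic (single process, no Redis; purely illustrative):

```python
import bisect

class SlidingWindow:
    """Toy model of the sorted-set rate limiter above."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window = window_seconds
        self.hits: list[float] = []  # sorted timestamps ≈ the Redis sorted set

    def allow(self, now: float) -> bool:
        cutoff = now - self.window
        # drop entries older than the window (ZREMRANGEBYSCORE)
        self.hits = self.hits[bisect.bisect_right(self.hits, cutoff):]
        if len(self.hits) >= self.max_requests:  # ZCARD check
            return False
        self.hits.append(now)                    # ZADD
        return True

rl = SlidingWindow(max_requests=60, window_seconds=60)
assert all(rl.allow(59.0 + i * 1e-4) for i in range(60))  # burst of 60 at ~t=59
assert not rl.allow(61.0)   # t=61: those 60 are still inside the sliding window
assert rl.allow(120.0)      # t=120: the old hits have slid out
```

With a fixed-window INCR counter, the request at t=61 would have passed, because a new minute means a fresh counter.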

Why fail-open? If Redis goes down, I prefer to let requests through (graceful degradation) than to block all users. Billing limits in PostgreSQL are still active as a safety net.

Stream Concurrency

To limit simultaneous SSE connections per tenant, I use a different pattern: INCR/DECR with safety TTL.

_CONCURRENT_TTL = 300  # 5 minutes

async def increment_concurrent(tenant_id: str) -> int:
    client = await get_redis()
    if client is None:
        return 0

    redis_key = f"concurrent:{tenant_id}"
    try:
        pipe = client.pipeline()
        pipe.incr(redis_key)
        pipe.expire(redis_key, _CONCURRENT_TTL)
        results = await pipe.execute()
        return results[0]
    except Exception:
        return 0


async def decrement_concurrent(tenant_id: str) -> int:
    client = await get_redis()
    if client is None:
        return 0

    redis_key = f"concurrent:{tenant_id}"
    try:
        count = await client.decr(redis_key)
        if count <= 0:
            await client.delete(redis_key)
            return 0
        await client.expire(redis_key, _CONCURRENT_TTL)
        return count
    except Exception:
        return 0

Why 5-minute TTL? If the server crashes during a stream, the DECR never executes and the counter stays inflated. The TTL self-heals: after 5 minutes without activity, the counter deletes itself.

Usage in the chat endpoint:

@router.post("/conversations/{conv_id}/messages/stream")
async def stream_chat(conv_id: UUID, data: MessageCreate, ...):
    # Check LLM budget (soft check)
    faq_only_mode = await is_llm_budget_exhausted(db, current_user.tenant_id)
    if not faq_only_mode:
        await check_concurrent_limit(db, current_user.tenant_id)

    # ... prepare pipeline ...

    async def event_generator():
        await increment_concurrent(str(current_user.tenant_id))
        try:
            async for event in _event_generator_inner():
                yield event
        finally:
            # ALWAYS decrement, even on error
            await decrement_concurrent(str(current_user.tenant_id))

    return EventSourceResponse(event_generator())

The finally is critical. Without it, any exception during streaming would leave the counter inflated.


Registration: One Tenant Per User, Automatic

When someone registers, a tenant with the Free plan is automatically created:

async def register(db: AsyncSession, data: UserRegister) -> User:
    existing = await get_user_by_email(db, data.email)
    if existing:
        raise BadRequestException("Email already registered")

    # Create tenant automatically
    tenant = await create_tenant(db, data.tenant_name, plan_slug="free")

    user = User(
        email=data.email,
        hashed_password=hash_password(data.password),
        full_name=data.full_name,
        role=UserRole.TENANT_ADMIN,  # Creator is admin
        tenant_id=tenant.id,
        email_verified=False,
    )
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user

The tenant slug is auto-generated with deduplication:

def _generate_slug(name: str) -> str:
    slug = re.sub(r'[^a-z0-9]+', '-', name.lower()).strip('-')
    return slug[:80] if slug else 'tenant'


async def create_tenant(db: AsyncSession, name: str, plan_slug: str = "free") -> Tenant:
    # Find plan
    stmt = select(Plan).where(Plan.slug == plan_slug, Plan.is_active == True)
    result = await db.execute(stmt)
    plan = result.scalar_one_or_none()

    base_slug = _generate_slug(name)
    slug = base_slug

    # If exists, add random suffix
    existing = await db.execute(select(Tenant).where(Tenant.slug == slug))
    if existing.scalar_one_or_none():
        slug = f"{base_slug}-{uuid.uuid4().hex[:6]}"

    tenant = Tenant(name=name, slug=slug, plan_id=plan.id if plan else None)
    db.add(tenant)
    await db.flush()
    return tenant

JWT: tenant_id in the Token

The access token includes tenant_id and role directly in the payload:

def create_access_token(user_id: str, tenant_id: str, role: str,
                        expire_minutes: int | None = None) -> str:
    minutes = expire_minutes or settings.access_token_expire_minutes
    expire = datetime.now(timezone.utc) + timedelta(minutes=minutes)
    payload = {
        "sub": user_id,
        "tenant_id": tenant_id,
        "role": role,
        "exp": expire,
        "type": "access",
    }
    return jwt.encode(payload, settings.jwt_secret, algorithm=settings.jwt_algorithm)

Why include tenant_id in the JWT? To avoid a DB query on every request just to find out which tenant the user belongs to. In get_current_user I still verify against the DB (in case the user was deactivated), but the token's tenant_id serves as a quick hint.
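decode_access_token isn't shown above, but whatever implements it must verify three things before get_current_user trusts the tenant_id and role claims: the signature, the expiry, and the token type. A stdlib-only sketch of those checks (illustration only; in the real app this is a JWT library call):

```python
import base64
import hashlib
import hmac
import json
import time

def b64url_decode(s: str) -> bytes:
    # JWT segments are base64url without padding; restore it before decoding
    return base64.urlsafe_b64decode(s + "=" * (-len(s) % 4))

def verify_access_token(token: str, secret: str) -> dict:
    """Checks HS256 signature, expiry, and token type — the minimum
    before trusting tenant_id/role from the payload."""
    header_b64, payload_b64, sig_b64 = token.split(".")
    signing_input = f"{header_b64}.{payload_b64}".encode()
    expected = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    if not hmac.compare_digest(expected, b64url_decode(sig_b64)):
        raise ValueError("Invalid signature")
    payload = json.loads(b64url_decode(payload_b64))
    if payload.get("exp", 0) < time.time():
        raise ValueError("Token expired")
    if payload.get("type") != "access":
        raise ValueError("Not an access token")  # refresh tokens must not hit APIs
    return payload
```

The type check matters: without it, a long-lived refresh token could be presented wherever an access token is expected.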

Refresh Token Rotation

Every time a refresh token is used, it's revoked and a new one is issued:

async def refresh_access_token(db: AsyncSession, refresh_token_value: str):
    stmt = select(RefreshToken).where(
        RefreshToken.token == refresh_token_value,
        RefreshToken.revoked == False,
    )
    result = await db.execute(stmt)
    token_record = result.scalar_one_or_none()

    if not token_record:
        raise UnauthorizedException("Invalid refresh token")

    # Revoke the old one (rotation)
    token_record.revoked = True

    user = await get_user_by_id(db, token_record.user_id)
    if not user or not user.is_active:
        await db.commit()
        raise UnauthorizedException("User not found or deactivated")

    # Issue new tokens
    new_access = create_access_token(str(user.id), str(user.tenant_id), user.role.value)
    new_refresh_value = create_refresh_token_value()
    new_refresh = RefreshToken(
        user_id=user.id,
        token=new_refresh_value,
        expires_at=datetime.now(timezone.utc) + timedelta(days=settings.refresh_token_expire_days),
    )
    db.add(new_refresh)
    await db.commit()

    return new_access, new_refresh_value

If someone intercepts a refresh token and uses it, the original token gets revoked. When the legitimate user tries to refresh, it will fail and they'll know there was a compromise.
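The rotation guarantee can be modeled in a few lines. This is an in-memory toy with a single shared store; in the real implementation the store is the RefreshToken table above.

```python
import secrets

class RefreshStore:
    """Toy model of refresh rotation: using a token consumes it."""

    def __init__(self) -> None:
        self.valid: set[str] = set()

    def issue(self) -> str:
        token = secrets.token_urlsafe(32)
        self.valid.add(token)
        return token

    def rotate(self, token: str) -> str:
        if token not in self.valid:
            # Either an invalid token or a replay of an already-rotated one
            raise PermissionError("Invalid refresh token")
        self.valid.remove(token)  # revoke the presented token
        return self.issue()       # hand back a fresh one

store = RefreshStore()
old = store.issue()
new = store.rotate(old)           # normal refresh: old is now revoked
try:
    store.rotate(old)             # whoever replays the old token fails
except PermissionError:
    print("replay detected")      # prints "replay detected"
```

Whichever party uses the token second, attacker or legitimate user, gets the error, which is exactly the signal that surfaces the compromise.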


Threshold Warnings: Alert Before It Breaks

I don't wait until the tenant hits 100% of their limit. I implemented warnings at 80% and 95%:

async def get_usage_warnings(db: AsyncSession, tenant_id: UUID) -> list[dict]:
    plan, tenant = await _get_plan_and_tenant(db, tenant_id)
    if not plan:
        return []

    usage = await get_monthly_usage(db, tenant_id)
    warnings = []

    resources = [
        ("messages", usage["messages_month"], plan.max_messages_month),
        ("tokens", usage["tokens_month"], plan.max_tokens_month),
    ]

    for resource, used, max_val in resources:
        if max_val == -1 or max_val <= 0:
            continue
        pct = round((used / max_val) * 100, 1)
        if pct >= 95:
            warnings.append({
                "resource": resource, "used": used, "max": max_val,
                "pct": pct, "level": "critical"
            })
        elif pct >= 80:
            warnings.append({
                "resource": resource, "used": used, "max": max_val,
                "pct": pct, "level": "warning"
            })

    return warnings

Warnings are sent by email to tenant admins, with deduplication to avoid spamming:

# Only send if:
# 1. Not sent for this billing period, OR
# 2. Escalating from warning → critical
if last_sent == bp and last_level == "critical":
    return  # Already sent critical for this period

# Save dedup marker
updated_prefs["_last_warning_bp"] = bp
updated_prefs["_last_warning_level"] = most_severe["level"]

FAQ-Only Fallback: Graceful Degradation

When a tenant exhausts their LLM budget, I don't block everything. FAQs keep working:

async def is_llm_budget_exhausted(db: AsyncSession, tenant_id: UUID) -> bool:
    plan, tenant = await _get_plan_and_tenant(db, tenant_id)
    if not plan:
        return False

    usage = await get_monthly_usage(db, tenant_id)

    if plan.max_messages_month != -1 and usage["messages_month"] >= plan.max_messages_month:
        return True
    if plan.max_tokens_month != -1 and usage["tokens_month"] >= plan.max_tokens_month:
        return True
    return False

In the chat flow, if is_llm_budget_exhausted returns True, FAQ match is attempted first. If it matches, free response. If not, an upgrade_required SSE event is sent and the frontend shows an upgrade card.


Custom Exceptions: Semantic Errors

class PlanLimitException(HTTPException):
    def __init__(self, detail: str = "Plan limit reached"):
        super().__init__(status_code=status.HTTP_403_FORBIDDEN, detail=detail)

class TooManyRequestsException(HTTPException):
    def __init__(self, detail: str = "Too many requests"):
        super().__init__(status_code=429, detail=detail)

The frontend distinguishes:

  • 403 PlanLimit: "You've reached your plan limit. Upgrade available."
  • 429 RateLimit: "Too many requests. Try again in a few seconds."
  • 401 Unauthorized: "Session expired. Please log in again."

Numbers That Matter

| Resource | Free | Starter | Pro |
|---|---|---|---|
| Users | 3 | 10 | 50 |
| Knowledge bases | 3 | 10 | -1 (unlimited) |
| Documents | 20 | 100 | -1 |
| Storage | 200 MB | 1 GB | 10 GB |
| AI messages/month | 50 | 500 | 5000 |
| API keys | 1 | 5 | 20 |
| Concurrency | 2 | 5 | 20 |

Lessons Learned

1. UPSERT > SELECT + INSERT

Never do "read count → check → insert" for usage counters. The race condition is inevitable under load. An atomic UPSERT (or, equivalently, SELECT ... FOR UPDATE) is the simplest correct approach in PostgreSQL, and it saved me hours of debugging "why does the tenant have 51 messages when the limit is 50".

2. Fail-Open for Rate Limiting, Fail-Closed for Billing

If Redis goes down, rate limits relax (fail-open) but plan limits in PostgreSQL remain active (fail-closed). Never let a downed dependency block all your users.

3. The Super Admin Isn't an Omniscient God

My first design gave the super_admin full access to all tenants' data. Bad. The super_admin is the platform operator, not the data owner. They see aggregate metrics, not individual conversations. This simplifies compliance and builds trust.

4. -1 for Unlimited Is Better Than a Boolean

Having max_kbs: int with -1 = unlimited is cleaner than max_kbs: int + is_kbs_unlimited: bool. One column, one check. Applied it to all plan limits.

5. Billing Period Based on Payment Date, Not Calendar Month

If a tenant paid on March 15th, their cycle runs from 3/15 to 4/14, not 3/1 to 3/31. Seems like a minor detail until a user complains that "I paid yesterday and already used half my quota".

6. TTL as Safety Net for Redis Counters

Redis concurrency counters need TTL. Without it, a server crash leaves "zombie" counters that block the tenant. With a 5-minute TTL, the system self-heals.

7. CASCADE Simplifies Deletion But You Must Clean Redis Too

PostgreSQL's ON DELETE CASCADE is wonderful for deleting everything related to a tenant, but it doesn't touch Redis; you have to clean that explicitly. I found out when a deleted tenant's keys were still counting toward rate limits.


What's Next

  • Audit log: Record who did what, when, on what resource
  • Fine-grained permissions: Permissions per KB and per action (not just global role)
  • Multi-region: Tenants assigned to specific regions for compliance

Conclusion

Real multi-tenancy isn't an extra column in the database. It's an enforcement system spanning from the data model to rate limiting, through authorization and billing. The most critical points:

  1. Atomic counters (UPSERT) so limits are respected under concurrency
  2. Redis for the ephemeral (rate limits, concurrency) and PostgreSQL for the durable (billing, usage)
  3. Clear roles with dependency injection that makes authorization impossible to skip
  4. Fail-open where it hurts little, fail-closed where it hurts a lot
  5. Isolation by default — every query filters by tenant_id, no exceptions

If you're building a multi-tenant SaaS, invest in enforcement from day 1. Refactoring this later is orders of magnitude more painful than doing it right from the start.


If you're building something multi-tenant and hit a problem I didn't cover, drop a comment. And if this article saved you time, a like helps it reach more people.
