I implemented real multi-tenancy in a RAG SaaS engine: tenants with plans, quota enforcement with atomic counters (UPSERT), Redis rate limiting (sliding window), RBAC with 3 roles, and data isolation where every query is filtered by tenant_id. This article covers the full architecture, the pitfalls I avoided, and the key code.
Why "Adding a tenant_id" Isn't Multi-Tenancy
Most multi-tenancy tutorials stop at "add a tenant_id column and filter every query". That's 10% of the problem. In production you'll need:
- Plans with real limits that can't be bypassed with concurrent requests
- Atomic counters that don't lose increments under load
- Rate limiting that survives server restarts
- Roles that restrict actions, not just visibility
- Real isolation where a bug in one service doesn't expose another tenant's data
- Billing cycles that reset correctly each month
This article shows how I solved each one in a real system with async FastAPI and PostgreSQL.
The Stack
| Component | Technology |
|---|---|
| Backend | Python 3.12 + FastAPI (100% async) |
| Database | PostgreSQL 16 |
| ORM | SQLAlchemy async + Alembic |
| Cache/Rate limit | Redis 7 |
| Auth | JWT (access 30min, refresh 7d with rotation) |
| Passwords | bcrypt (via passlib) |
Database Design
The 4 Fundamental Tables
```
┌──────────────┐       ┌──────────────┐
│ plans        │       │ tenants      │
├──────────────┤       ├──────────────┤
│ id (UUID)    │◄──────│ plan_id (FK) │
│ name         │       │ id (UUID)    │
│ slug         │       │ name         │
│ max_users    │       │ slug (unique)│
│ max_kbs      │       │ is_active    │
│ max_documents│       │ plan_started │
│ max_storage  │       │ created_at   │
│ max_messages │       └──────┬───────┘
│ max_tokens   │              │
│ max_api_keys │       ┌──────┴───────┐
│ price_monthly│       │ users        │
│max_concurrent│       ├──────────────┤
└──────────────┘       │ id (UUID)    │
                       │ tenant_id(FK)│
                       │ email        │
                       │ role (enum)  │
                       │ is_active    │
                       └──────────────┘

┌───────────────────────┐
│ tenant_monthly_usage  │
├───────────────────────┤
│ id (UUID)             │
│ tenant_id (FK)        │
│ billing_period (str)  │
│ messages_count (int)  │
│ tokens_used (bigint)  │
│ UNIQUE(tenant_id,     │
│        billing_period)│
└───────────────────────┘
```
Plan Model
Each plan defines all the limits the system will enforce:
```python
class Plan(Base):
    __tablename__ = "plans"

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name: Mapped[str] = mapped_column(String(100), nullable=False)
    slug: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
    max_users: Mapped[int] = mapped_column(Integer, nullable=False, default=3)
    max_concurrent: Mapped[int] = mapped_column(Integer, nullable=False, default=2)
    max_kbs: Mapped[int] = mapped_column(Integer, nullable=False, default=3)
    max_documents: Mapped[int] = mapped_column(Integer, nullable=False, default=20)
    max_storage_mb: Mapped[int] = mapped_column(Integer, nullable=False, default=200)
    max_messages_month: Mapped[int] = mapped_column(Integer, nullable=False, default=500)
    max_tokens_month: Mapped[int] = mapped_column(BigInteger, nullable=False, default=200000)
    max_api_keys: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
    price_monthly: Mapped[float | None] = mapped_column(Numeric(10, 2), nullable=True)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
```
Key decision: -1 means unlimited. Instead of having an is_unlimited boolean per resource, I use -1 as a convention. Simplifies all comparisons:
```python
if plan.max_users == -1:
    return  # Unlimited, skip check
```
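The convention collapses every limit check into one predicate. A minimal sketch (the helper name `within_limit` is mine, not from the codebase):

```python
def within_limit(current: int, max_value: int) -> bool:
    """Convention: -1 means unlimited, any other value is a hard cap."""
    if max_value == -1:
        return True
    return current < max_value

print(within_limit(2, 3), within_limit(3, 3), within_limit(10**9, -1))  # True False True
```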
Tenant Model
```python
class Tenant(Base):
    __tablename__ = "tenants"

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name: Mapped[str] = mapped_column(String(255), nullable=False)
    slug: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    plan_id: Mapped[uuid.UUID | None] = mapped_column(
        UUID(as_uuid=True), ForeignKey("plans.id"), nullable=True
    )
    plan_started_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True), nullable=True, default=None
    )
    notification_preferences: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
```
plan_started_at is crucial: it defines the billing cycle anchor. When a tenant upgrades from Free to a paid plan, it resets to the payment timestamp. This determines when monthly counters reset.
User Model
```python
class UserRole(str, PyEnum):
    SUPER_ADMIN = "super_admin"
    TENANT_ADMIN = "tenant_admin"
    MEMBER = "member"

class User(Base):
    __tablename__ = "users"

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
    hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
    full_name: Mapped[str] = mapped_column(String(255), default="")
    role: Mapped[UserRole] = mapped_column(
        Enum(UserRole, name="user_role", values_callable=lambda e: [x.value for x in e]),
        default=UserRole.MEMBER,
    )
    tenant_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("tenants.id", ondelete="CASCADE"), nullable=False
    )
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    email_verified: Mapped[bool] = mapped_column(Boolean, default=False)
```
A user belongs to exactly one tenant. No multi-tenant users. If someone needs access to two organizations, create two accounts. This enormously simplifies the permissions model.
RBAC: 3 Roles, Zero Ambiguity
The Roles
| Role | Scope | Can Do |
|---|---|---|
| `super_admin` | Platform | View all tenants, change plans, view global metrics |
| `tenant_admin` | Their tenant | CRUD KBs, documents, FAQs, invite users, view analytics |
| `member` | Their tenant (restricted) | Chat, view assigned KBs, feedback |
Dependency Injection for Authorization
FastAPI has a dependency system that's perfect for RBAC. I created a dependency factory:
```python
from fastapi import Depends
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")

async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
):
    payload = decode_access_token(token)
    user_id = payload.get("sub")
    if not user_id:
        raise UnauthorizedException("Invalid token payload")
    user = await get_user_by_id(db, UUID(user_id))
    if not user:
        raise UnauthorizedException("User not found")
    if not user.is_active:
        raise UnauthorizedException("Account deactivated")
    # Verify tenant is active (super_admin bypasses)
    if user.role != UserRole.SUPER_ADMIN:
        tenant = await db.get(Tenant, user.tenant_id)
        if not tenant or not tenant.is_active:
            raise UnauthorizedException("Tenant deactivated")
    return user
```
Subtle detail: super_admin bypasses the active tenant check. If we deactivate a malicious tenant, the super_admin can still get in to investigate.
Role Restriction Factory
```python
def require_role(*roles):
    """Dependency factory: restricts endpoint to specific roles."""
    async def _check(current_user=Depends(get_current_user)):
        if current_user.role not in roles:
            raise ForbiddenException("You don't have permission for this action")
        return current_user
    return _check

# Shortcuts
def require_super_admin():
    return require_role(UserRole.SUPER_ADMIN)

def require_tenant_admin_or_above():
    return require_role(UserRole.SUPER_ADMIN, UserRole.TENANT_ADMIN)
```
Usage in endpoints:
```python
@router.get("/tenants", response_model=list[TenantResponse])
async def list_tenants(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    if current_user.role != UserRole.SUPER_ADMIN:
        raise ForbiddenException("Only super_admin can list all tenants")
    return await tenant_service.list_tenants(db)

@router.put("/{kb_id}/access")
async def update_kb_access(
    kb_id: UUID,
    data: KBAccessConfig,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_tenant_admin_or_above()),
):
    # Only tenant_admin or super_admin reach here
    ...
```
Data Isolation: tenant_id Everywhere
The Pattern
Every table containing user data has a tenant_id column (direct or transitive). No exceptions.
```python
class KnowledgeBase(Base):
    __tablename__ = "knowledge_bases"
    # ...
    tenant_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("tenants.id", ondelete="CASCADE"), nullable=False
    )

class ApiKey(Base):
    __tablename__ = "api_keys"
    # ...
    tenant_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("tenants.id", ondelete="CASCADE"), nullable=False
    )
```
Filtering in Every Service
Every function that lists or searches data filters by the authenticated user's tenant_id:
```python
async def list_knowledge_bases(db: AsyncSession, user: User) -> list[dict]:
    stmt = (
        select(KnowledgeBase, func.count(Document.id).label("document_count"))
        .outerjoin(Document, Document.knowledge_base_id == KnowledgeBase.id)
        # Everyone — super_admin included — sees only their own tenant's KBs plus system KBs
        .where(or_(
            KnowledgeBase.tenant_id == user.tenant_id,
            KnowledgeBase.is_system == True,
        ))
    )
    if user.role != UserRole.SUPER_ADMIN:
        # For members, apply the per-KB access filter on top
        access_filter = await get_accessible_kb_filter(user)
        if access_filter is not None:
            stmt = stmt.where(or_(KnowledgeBase.is_system == True, access_filter))
    stmt = stmt.group_by(KnowledgeBase.id).order_by(KnowledgeBase.created_at.desc())
    result = await db.execute(stmt)
    # ...
```
Important lesson: the super_admin does NOT see other tenants' data. They're the platform operator. They see aggregate statistics (total tenants, revenue, etc.) but can't read other tenants' conversations or KBs. This was a deliberate privacy decision.
Granular KB Access
Within the same tenant, not all members see all KBs. There's an access control system:
```python
async def _check_kb_access(db: AsyncSession, kb: KnowledgeBase, user: User) -> None:
    has_access = await user_can_access_kb(db, user, kb)
    if not has_access:
        raise ForbiddenException("You don't have access to this knowledge base")
```
KBs can be:
- Public within the tenant: all members can see them
- Private: only members assigned via groups can see them
- System: system KBs, visible to all, only modifiable by super_admin
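That decision reduces to a small pure function. A sketch of the logic with hypothetical names (`KBView`, `can_access_kb`) — the real `user_can_access_kb` also resolves group membership via SQL:

```python
from dataclasses import dataclass, field

@dataclass
class KBView:
    is_system: bool = False
    is_public: bool = True              # public within the tenant
    allowed_group_ids: set = field(default_factory=set)

def can_access_kb(kb: KBView, role: str, user_group_ids: set) -> bool:
    """Pure access decision; tenant_id filtering happens separately in SQL."""
    if kb.is_system:
        return True                     # system KBs are visible to everyone
    if role in ("super_admin", "tenant_admin"):
        return True                     # admins see every KB in their tenant
    if kb.is_public:
        return True                     # public-within-tenant
    return bool(kb.allowed_group_ids & user_group_ids)  # private: group overlap
```

Keeping the decision pure makes it trivially unit-testable, independent of the database.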
CASCADE: Clean Deletion
All foreign keys use ondelete="CASCADE". When a tenant is deleted, everything cascades: users, KBs, documents, chunks, conversations, API keys, usage logs. A single operation:
```python
async def delete_tenant(db: AsyncSession, tenant_id: UUID, current_user: User) -> dict:
    tenant = await get_tenant(db, tenant_id)

    # Safety checks
    if current_user.tenant_id == tenant_id:
        raise BadRequestException("You cannot delete your own tenant")
    if tenant.slug == "system-tenant":
        raise ForbiddenException("Cannot delete the system tenant")

    # 1. Cancel active subscription
    try:
        await admin_cancel_subscription(db, tenant_id, immediate=True)
    except NotFoundException:
        pass

    # 2. Clean Redis cache (note: KEYS is O(N) and blocks Redis;
    #    on a large keyspace, SCAN is the safer choice)
    redis = await get_redis()
    if redis:
        keys = await redis.keys(f"*:{tenant_id}:*")
        if keys:
            await redis.delete(*keys)

    # 3. DELETE → CASCADE handles the rest
    await db.delete(tenant)
    await db.commit()
```
Plan Enforcement: Atomic Counters with UPSERT
The Problem
Imagine a Free-plan tenant has a limit of 50 messages/month. Two users send a message at the same time. If you do SELECT count → check → INSERT, you have a race condition: both read 49, both pass the check, both insert. Now there are 51.
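The race is easy to reproduce in miniature. This toy asyncio simulation (no database, hypothetical names) models the gap between the read and the write with a single `await`:

```python
import asyncio

LIMIT = 50
messages = ["msg"] * 49  # 49 messages already sent this period

async def send_naive(text: str) -> None:
    current = len(messages)    # SELECT count(*)
    await asyncio.sleep(0)     # context switch: the other request runs here
    if current < LIMIT:        # both coroutines saw 49, so both pass
        messages.append(text)  # INSERT

async def main() -> int:
    await asyncio.gather(send_naive("a"), send_naive("b"))
    return len(messages)

final = asyncio.run(main())
print(final)  # 51 — one over the limit
```

Any `await` between the read and the write (a DB round-trip, in practice) is enough for a second request to slip through the check.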
The Solution: Atomic UPSERT
The tenant_monthly_usage table uses a UNIQUE constraint on (tenant_id, billing_period):
```python
class TenantMonthlyUsage(Base):
    __tablename__ = "tenant_monthly_usage"

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    tenant_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("tenants.id", ondelete="CASCADE"), nullable=False
    )
    billing_period: Mapped[str] = mapped_column(String(10), nullable=False)
    messages_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    tokens_used: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0)

    __table_args__ = (
        UniqueConstraint("tenant_id", "billing_period", name="uq_tenant_billing_usage"),
    )
```
The increment uses INSERT ... ON CONFLICT DO UPDATE:
```python
async def increment_message_count(db: AsyncSession, tenant_id: UUID) -> None:
    anchor = await _get_billing_anchor(db, tenant_id)
    bp = _get_billing_period_key(anchor)
    await db.execute(text("""
        INSERT INTO tenant_monthly_usage (id, tenant_id, billing_period, messages_count, tokens_used)
        VALUES (gen_random_uuid(), :tid, :bp, 1, 0)
        ON CONFLICT (tenant_id, billing_period)
        DO UPDATE SET messages_count = tenant_monthly_usage.messages_count + 1,
                      updated_at = now()
    """), {"tid": tenant_id, "bp": bp})
    await db.commit()
```
Why does this work? PostgreSQL executes INSERT ... ON CONFLICT DO UPDATE atomically. There's no race condition window. If two requests arrive at the same time, one will INSERT and the other will UPDATE, but the counter will always be correct.
Anti-fraud detail: counters only increment. There's no endpoint to decrement. No way for a user to manipulate their usage. The only way to "reset" is when the billing period changes (i.e., a month passes).
Billing Period Calculation
```python
def _get_billing_period_key(anchor: datetime) -> str:
    now = datetime.now(timezone.utc)
    billing_day = anchor.day

    # Clamping: if anchor is day 31 and we're in February (28 days),
    # effective billing day is 28
    max_day = calendar.monthrange(now.year, now.month)[1]
    effective_day = min(billing_day, max_day)

    if now.day >= effective_day:
        return f"{now.year}-{now.month:02d}-{effective_day:02d}"
    else:
        # Current period started last month
        if now.month == 1:
            prev_year, prev_month = now.year - 1, 12
        else:
            prev_year, prev_month = now.year, now.month - 1
        max_day_prev = calendar.monthrange(prev_year, prev_month)[1]
        effective_day_prev = min(billing_day, max_day_prev)
        return f"{prev_year}-{prev_month:02d}-{effective_day_prev:02d}"
```
Why is this complex? Because months don't have the same number of days. If a tenant paid on January 31st, their next cycle doesn't start on February 31st (doesn't exist). The clamping handles this.
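A worked example makes the clamping concrete. This is a copy of the same logic with `now` injected as a parameter (a refactor I'd suggest anyway, since it makes the function testable without freezing the clock):

```python
import calendar
from datetime import datetime, timezone

def billing_period_key(anchor: datetime, now: datetime) -> str:
    """Same logic as _get_billing_period_key, with `now` injected for testability."""
    billing_day = anchor.day
    max_day = calendar.monthrange(now.year, now.month)[1]
    effective_day = min(billing_day, max_day)
    if now.day >= effective_day:
        return f"{now.year}-{now.month:02d}-{effective_day:02d}"
    # Current period started last month
    if now.month == 1:
        prev_year, prev_month = now.year - 1, 12
    else:
        prev_year, prev_month = now.year, now.month - 1
    max_day_prev = calendar.monthrange(prev_year, prev_month)[1]
    effective_day_prev = min(billing_day, max_day_prev)
    return f"{prev_year}-{prev_month:02d}-{effective_day_prev:02d}"

anchor = datetime(2025, 1, 31, tzinfo=timezone.utc)  # tenant paid on Jan 31
# On Feb 10 we're still in the period that started Jan 31:
print(billing_period_key(anchor, datetime(2025, 2, 10, tzinfo=timezone.utc)))  # 2025-01-31
# On Feb 28 (clamped billing day for February) a new period starts:
print(billing_period_key(anchor, datetime(2025, 2, 28, tzinfo=timezone.utc)))  # 2025-02-28
```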
Limit Verification
Before executing the RAG pipeline, I verify limits:
```python
async def check_message_limit(db: AsyncSession, tenant_id: UUID) -> None:
    plan, tenant = await _get_plan_and_tenant(db, tenant_id)
    if not plan:
        return
    if plan.max_messages_month == -1:
        return  # Unlimited
    usage = await get_monthly_usage(db, tenant_id)
    if usage["messages_month"] >= plan.max_messages_month:
        raise PlanLimitException(
            f"Monthly message limit reached "
            f"({plan.max_messages_month} max for {plan.name} plan)"
        )
```
PlanLimitException returns HTTP 403, not 429. It's a plan restriction, not rate limiting. The frontend distinguishes between "upgrade your plan" (403) and "wait a moment" (429).
Limits on All Resources
Not just messages. Every resource has its check:
```python
# Before creating a user
await check_user_limit(db, tenant_id)

# Before creating a KB
await check_kb_limit(db, tenant_id)

# Before uploading a document
await check_document_limit(db, tenant_id)
await check_storage_limit(db, tenant_id, new_file_size_bytes=file_size)

# Before creating an API key
await check_api_key_limit(db, tenant_id)

# Before sending a message to the LLM
await check_message_limit(db, tenant_id)
await check_concurrent_limit(db, tenant_id)
```
The pattern is always the same:
```python
async def check_kb_limit(db: AsyncSession, tenant_id: UUID) -> None:
    tenant = await get_tenant(db, tenant_id)
    if not tenant.plan_id:
        return
    plan = await db.get(Plan, tenant.plan_id)
    if not plan or plan.max_kbs == -1:
        return
    stmt = select(func.count(KnowledgeBase.id)).where(KnowledgeBase.tenant_id == tenant_id)
    result = await db.execute(stmt)
    current_count = result.scalar() or 0
    if current_count >= plan.max_kbs:
        raise PlanLimitException(
            f"Knowledge base limit reached ({plan.max_kbs} max)"
        )
```
Rate Limiting with Redis: Sliding Window
Why Redis and Not PostgreSQL
Billing period counters are in PostgreSQL because they need durability. Rate limiting is in Redis because it needs speed and automatic TTL.
If the server restarts mid-stream, I don't want a "phantom" counter in PostgreSQL blocking the tenant. Redis with TTL self-cleans.
Sliding Window with Sorted Sets
```python
import time
import redis.asyncio as redis

async def is_allowed(key: str, max_requests: int, window_seconds: int = 60) -> bool:
    client = await get_redis()
    if client is None:
        return True  # Fail-open

    redis_key = f"ratelimit:{key}"
    now = time.time()
    cutoff = now - window_seconds
    try:
        pipe = client.pipeline()
        pipe.zremrangebyscore(redis_key, 0, cutoff)  # Clean expired
        pipe.zcard(redis_key)                        # Count current
        pipe.zadd(redis_key, {str(now): now})        # Add this request
        pipe.expire(redis_key, window_seconds + 1)   # TTL safety net
        results = await pipe.execute()
        current_count = results[1]
        if current_count >= max_requests:
            await client.zrem(redis_key, str(now))   # Rollback
            return False
        return True
    except Exception:
        return True  # Fail-open
```
Why sorted sets and not a simple INCR? Because I need a sliding window, not a fixed window. With INCR, if a user sends 60 requests at second 59 of the minute, and 60 more at second 1 of the next minute, 120 requests pass in 2 seconds. With sorted sets, each request has its timestamp and the window slides continuously.
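An in-memory sketch of the same sorted-set logic (names are mine; a plain sorted list stands in for Redis) shows why the burst at the minute boundary doesn't get through:

```python
import bisect

class SlidingWindow:
    """In-memory analogue of the Redis sorted-set rate limiter."""
    def __init__(self, max_requests: int, window: float = 60.0):
        self.max_requests = max_requests
        self.window = window
        self.hits: list[float] = []  # sorted request timestamps

    def allow(self, now: float) -> bool:
        cutoff = now - self.window
        # drop entries older than the window (ZREMRANGEBYSCORE)
        self.hits = self.hits[bisect.bisect_right(self.hits, cutoff):]
        if len(self.hits) >= self.max_requests:  # ZCARD check
            return False
        bisect.insort(self.hits, now)            # ZADD
        return True

rl = SlidingWindow(max_requests=60)
burst_1 = sum(rl.allow(59.0) for _ in range(60))  # 60 requests at t=59s
burst_2 = sum(rl.allow(61.0) for _ in range(60))  # 60 more at t=61s
print(burst_1, burst_2)  # 60 0 — the window still contains the first burst
```

With a fixed per-minute INCR counter, both bursts would have passed.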
Why fail-open? If Redis goes down, I prefer to let requests through (graceful degradation) than to block all users. Billing limits in PostgreSQL are still active as a safety net.
Stream Concurrency
To limit simultaneous SSE connections per tenant, I use a different pattern: INCR/DECR with safety TTL.
```python
_CONCURRENT_TTL = 300  # 5 minutes

async def increment_concurrent(tenant_id: str) -> int:
    client = await get_redis()
    if client is None:
        return 0
    redis_key = f"concurrent:{tenant_id}"
    try:
        pipe = client.pipeline()
        pipe.incr(redis_key)
        pipe.expire(redis_key, _CONCURRENT_TTL)
        results = await pipe.execute()
        return results[0]
    except Exception:
        return 0

async def decrement_concurrent(tenant_id: str) -> int:
    client = await get_redis()
    if client is None:
        return 0
    redis_key = f"concurrent:{tenant_id}"
    try:
        count = await client.decr(redis_key)
        if count <= 0:
            await client.delete(redis_key)
            return 0
        await client.expire(redis_key, _CONCURRENT_TTL)
        return count
    except Exception:
        return 0
```
Why 5-minute TTL? If the server crashes during a stream, the DECR never executes and the counter stays inflated. The TTL self-heals: after 5 minutes without activity, the counter deletes itself.
Usage in the chat endpoint:
```python
@router.post("/conversations/{conv_id}/messages/stream")
async def stream_chat(conv_id: UUID, data: MessageCreate, ...):
    # Check LLM budget (soft check)
    faq_only_mode = await is_llm_budget_exhausted(db, current_user.tenant_id)
    if not faq_only_mode:
        await check_concurrent_limit(db, current_user.tenant_id)

    # ... prepare pipeline ...

    async def event_generator():
        await increment_concurrent(str(current_user.tenant_id))
        try:
            async for event in _event_generator_inner():
                yield event
        finally:
            # ALWAYS decrement, even on error
            await decrement_concurrent(str(current_user.tenant_id))

    return EventSourceResponse(event_generator())
```
The finally is critical. Without it, any exception during streaming would leave the counter inflated.
Registration: One Tenant Per User, Automatic
When someone registers, a tenant with the Free plan is automatically created:
```python
async def register(db: AsyncSession, data: UserRegister) -> User:
    existing = await get_user_by_email(db, data.email)
    if existing:
        raise BadRequestException("Email already registered")

    # Create tenant automatically
    tenant = await create_tenant(db, data.tenant_name, plan_slug="free")

    user = User(
        email=data.email,
        hashed_password=hash_password(data.password),
        full_name=data.full_name,
        role=UserRole.TENANT_ADMIN,  # Creator is admin
        tenant_id=tenant.id,
        email_verified=False,
    )
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user
```
The tenant slug is auto-generated with deduplication:
```python
def _generate_slug(name: str) -> str:
    slug = re.sub(r'[^a-z0-9]+', '-', name.lower()).strip('-')
    return slug[:80] if slug else 'tenant'

async def create_tenant(db: AsyncSession, name: str, plan_slug: str = "free") -> Tenant:
    # Find plan
    stmt = select(Plan).where(Plan.slug == plan_slug, Plan.is_active == True)
    result = await db.execute(stmt)
    plan = result.scalar_one_or_none()

    base_slug = _generate_slug(name)
    slug = base_slug
    # If exists, add random suffix
    existing = await db.execute(select(Tenant).where(Tenant.slug == slug))
    if existing.scalar_one_or_none():
        slug = f"{base_slug}-{uuid.uuid4().hex[:6]}"

    tenant = Tenant(name=name, slug=slug, plan_id=plan.id if plan else None)
    db.add(tenant)
    await db.flush()
    return tenant
```
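A quick look at what the slug regex actually produces (standalone copy of `_generate_slug` for illustration):

```python
import re

def generate_slug(name: str) -> str:
    # Standalone copy of _generate_slug from above
    slug = re.sub(r'[^a-z0-9]+', '-', name.lower()).strip('-')
    return slug[:80] if slug else 'tenant'

print(generate_slug("Acme Corp, Inc."))  # acme-corp-inc
print(generate_slug("日本語"))            # tenant (nothing survives the ASCII filter)
```

Note that names with no ASCII alphanumerics fall back to the literal slug `tenant`, which is why the dedup suffix below matters.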
JWT: tenant_id in the Token
The access token includes tenant_id and role directly in the payload:
```python
def create_access_token(user_id: str, tenant_id: str, role: str,
                        expire_minutes: int | None = None) -> str:
    minutes = expire_minutes or settings.access_token_expire_minutes
    expire = datetime.now(timezone.utc) + timedelta(minutes=minutes)
    payload = {
        "sub": user_id,
        "tenant_id": tenant_id,
        "role": role,
        "exp": expire,
        "type": "access",
    }
    return jwt.encode(payload, settings.jwt_secret, algorithm=settings.jwt_algorithm)
```
Why include tenant_id in the JWT? To avoid a DB query on every request just to find out which tenant the user belongs to. In get_current_user I still verify against the DB (in case the user was deactivated), but the token's tenant_id serves as a quick hint.
Refresh Token Rotation
Every time a refresh token is used, it's revoked and a new one is issued:
```python
async def refresh_access_token(db: AsyncSession, refresh_token_value: str):
    stmt = select(RefreshToken).where(
        RefreshToken.token == refresh_token_value,
        RefreshToken.revoked == False,
    )
    result = await db.execute(stmt)
    token_record = result.scalar_one_or_none()
    if not token_record:
        raise UnauthorizedException("Invalid refresh token")

    # Revoke the old one (rotation)
    token_record.revoked = True

    user = await get_user_by_id(db, token_record.user_id)
    if not user or not user.is_active:
        await db.commit()
        raise UnauthorizedException("User not found or deactivated")

    # Issue new tokens
    new_access = create_access_token(str(user.id), str(user.tenant_id), user.role.value)
    new_refresh_value = create_refresh_token_value()
    new_refresh = RefreshToken(
        user_id=user.id,
        token=new_refresh_value,
        expires_at=datetime.now(timezone.utc) + timedelta(days=settings.refresh_token_expire_days),
    )
    db.add(new_refresh)
    await db.commit()
    return new_access, new_refresh_value
```
If someone intercepts a refresh token and uses it, the original token gets revoked. When the legitimate user tries to refresh, it will fail and they'll know there was a compromise.
Threshold Warnings: Alert Before It Breaks
I don't wait until the tenant hits 100% of their limit. I implemented warnings at 80% and 95%:
```python
async def get_usage_warnings(db: AsyncSession, tenant_id: UUID) -> list[dict]:
    plan, tenant = await _get_plan_and_tenant(db, tenant_id)
    if not plan:
        return []
    usage = await get_monthly_usage(db, tenant_id)
    warnings = []
    resources = [
        ("messages", usage["messages_month"], plan.max_messages_month),
        ("tokens", usage["tokens_month"], plan.max_tokens_month),
    ]
    for resource, used, max_val in resources:
        if max_val == -1 or max_val <= 0:
            continue
        pct = round((used / max_val) * 100, 1)
        if pct >= 95:
            warnings.append({
                "resource": resource, "used": used, "max": max_val,
                "pct": pct, "level": "critical"
            })
        elif pct >= 80:
            warnings.append({
                "resource": resource, "used": used, "max": max_val,
                "pct": pct, "level": "warning"
            })
    return warnings
```
Warnings are sent by email to tenant admins, with deduplication to avoid spamming:
```python
# Only send if:
# 1. Not sent for this billing period, OR
# 2. Escalating from warning → critical
if last_sent == bp and last_level == "critical":
    return  # Already sent critical for this period

# Save dedup marker
updated_prefs["_last_warning_bp"] = bp
updated_prefs["_last_warning_level"] = most_severe["level"]
```
FAQ-Only Fallback: Graceful Degradation
When a tenant exhausts their LLM budget, I don't block everything. FAQs keep working:
```python
async def is_llm_budget_exhausted(db: AsyncSession, tenant_id: UUID) -> bool:
    plan, tenant = await _get_plan_and_tenant(db, tenant_id)
    if not plan:
        return False
    usage = await get_monthly_usage(db, tenant_id)
    if plan.max_messages_month != -1 and usage["messages_month"] >= plan.max_messages_month:
        return True
    if plan.max_tokens_month != -1 and usage["tokens_month"] >= plan.max_tokens_month:
        return True
    return False
```
In the chat flow, if is_llm_budget_exhausted returns True, FAQ match is attempted first. If it matches, free response. If not, an upgrade_required SSE event is sent and the frontend shows an upgrade card.
Custom Exceptions: Semantic Errors
```python
class PlanLimitException(HTTPException):
    def __init__(self, detail: str = "Plan limit reached"):
        super().__init__(status_code=status.HTTP_403_FORBIDDEN, detail=detail)

class TooManyRequestsException(HTTPException):
    def __init__(self, detail: str = "Too many requests"):
        super().__init__(status_code=429, detail=detail)
```
The frontend distinguishes:
- 403 PlanLimit: "You've reached your plan limit. Upgrade available."
- 429 RateLimit: "Too many requests. Try again in a few seconds."
- 401 Unauthorized: "Session expired. Please log in again."
Numbers That Matter
| Resource | Free | Starter | Pro |
|---|---|---|---|
| Users | 3 | 10 | 50 |
| Knowledge Bases | 3 | 10 | -1 (unlimited) |
| Documents | 20 | 100 | -1 |
| Storage | 200 MB | 1 GB | 10 GB |
| AI Messages/month | 50 | 500 | 5000 |
| API Keys | 1 | 5 | 20 |
| Concurrency | 2 | 5 | 20 |
Lessons Learned
1. UPSERT > SELECT + INSERT
Never do "read count → check → insert" for usage counters. The race condition is inevitable under load. PostgreSQL's atomic UPSERT is the only correct approach. Saved me hours of debugging "why does the tenant have 51 messages when the limit is 50".
2. Fail-Open for Rate Limiting, Fail-Closed for Billing
If Redis goes down, rate limits relax (fail-open) but plan limits in PostgreSQL remain active (fail-closed). Never let a downed dependency block all your users.
3. The Super Admin Isn't an Omniscient God
My first design gave the super_admin full access to all tenants' data. Bad. The super_admin is the platform operator, not the data owner. They see aggregate metrics, not individual conversations. This simplifies compliance and builds trust.
4. -1 for Unlimited Is Better Than a Boolean
Having max_kbs: int with -1 = unlimited is cleaner than max_kbs: int + is_kbs_unlimited: bool. One column, one check. Applied it to all plan limits.
5. Billing Period Based on Payment Date, Not Calendar Month
If a tenant paid on March 15th, their cycle runs from 3/15 to 4/14, not 3/1 to 3/31. Seems like a minor detail until a user complains that "I paid yesterday and already used half my quota".
6. TTL as Safety Net for Redis Counters
Redis concurrency counters need TTL. Without it, a server crash leaves "zombie" counters that block the tenant. With a 5-minute TTL, the system self-heals.
7. CASCADE Simplifies Deletion But You Must Clean Redis Too
PostgreSQL's ON DELETE CASCADE is wonderful for deleting everything related to a tenant. But it doesn't clean Redis. You have to do it explicitly. Found out when a deleted tenant was still counting rate limits.
What's Next
- Audit log: Record who did what, when, on what resource
- Fine-grained permissions: Permissions per KB and per action (not just global role)
- Multi-region: Tenants assigned to specific regions for compliance
Conclusion
Real multi-tenancy isn't an extra column in the database. It's an enforcement system spanning from the data model to rate limiting, through authorization and billing. The most critical points:
- Atomic counters (UPSERT) so limits are respected under concurrency
- Redis for the ephemeral (rate limits, concurrency) and PostgreSQL for the durable (billing, usage)
- Clear roles with dependency injection that makes authorization impossible to skip
- Fail-open where it hurts little, fail-closed where it hurts a lot
- Isolation by default — every query filters by tenant_id, no exceptions
If you're building a multi-tenant SaaS, invest in enforcement from day 1. Refactoring this later is orders of magnitude more painful than doing it right from the start.
If you're building something multi-tenant and hit a problem I didn't cover, drop a comment. And if this article saved you time, a like helps it reach more people.