Part of my 100 Days of Code journey. Today we tackle one of the most important database patterns.
The Challenge
Implement a tagging system with many-to-many relationships and add bulk operations for efficiency.
The Problem: Tasks need flexible organization. Fixed categories don't work because:
- One task can belong to multiple categories (urgent AND work AND client-facing)
- Categories can have multiple tasks
- Users want custom labels, not predefined options
The Solution: Many-to-many relationships via junction tables, plus bulk operations for power users.
Why Many-to-Many Matters
One-to-Many relationships (what we've used so far):
User ──┬─→ Task 1
├─→ Task 2
└─→ Task 3
Each task has ONE user. Simple!
Many-to-Many relationships (what we need):
Task 1 ──┬─→ Tag: urgent
└─→ Tag: work
Task 2 ──┬─→ Tag: urgent
├─→ Tag: personal
└─→ Tag: home
Tag: urgent ──┬─→ Task 1
└─→ Task 2
Problem: Can't use a simple foreign key!
If we add tag_id to tasks table:
- ❌ Task can only have ONE tag
- ❌ Need multiple
tag_idcolumns (tag_id_1, tag_id_2...) - terrible design!
Solution: Association table (junction table)
Understanding Association Tables
An association table (also called junction table or link table) connects two tables in a many-to-many relationship.
Three tables:
tasks table:
| id | title | owner_id |
|----|-------|----------|
| 1 | Meeting | user-a |
| 2 | Report | user-a |
| 3 | Email | user-a |
tags table:
| id | name | color | owner_id |
|----|------|-------|----------|
| A | urgent | #FF0000 | user-a |
| B | work | #0000FF | user-a |
| C | personal | #00FF00 | user-a |
task_tags table (association):
| task_id | tag_id |
|---------|--------|
| 1 | A |
| 1 | B |
| 2 | A |
| 3 | C |
What this means:
- Task 1 (Meeting) has tags: urgent, work
- Task 2 (Report) has tags: urgent
- Task 3 (Email) has tags: personal
- Tag A (urgent) is on: Task 1, Task 2
- Tag B (work) is on: Task 1
Perfect flexibility!
Step-by-Step Implementation
Step 1: Creating the Association Table
from sqlalchemy import Table, Column, ForeignKey
from sqlalchemy.dialects.postgresql import UUID
task_tags = Table(
"task_tags",
Base.metadata,
Column("task_id", UUID(as_uuid=True), ForeignKey("tasks.id", ondelete="CASCADE"), primary_key=True),
Column("tag_id", UUID(as_uuid=True), ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True),
)
Breaking it down:
Why Table() instead of a class?
# Simple association (just links)
task_tags = Table(...) # Recommended!
# Complex association (with extra fields like created_at, priority)
class TaskTag(Base): # Use when needed
__tablename__ = "task_tags"
task_id = Column(...)
tag_id = Column(...)
priority = Column(Integer)
created_at = Column(DateTime)
For pure many-to-many (just linking), Table() is cleaner.
Composite primary key:
primary_key=True # On task_id
primary_key=True # On tag_id
Both together form the primary key! This means:
- ✅ Can't add same tag twice to same task
- ✅ (task_id=1, tag_id=A) can exist once
- ✅ (task_id=1, tag_id=B) is different, allowed
CASCADE deletes:
ForeignKey("tasks.id", ondelete="CASCADE")
What happens:
- Task deleted → All task_tags entries with that task_id deleted
- Tag deleted → All task_tags entries with that tag_id deleted
- No orphaned associations!
Without CASCADE:
Task 1 deleted
task_tags still has: (task_id=1, tag_id=A) # Orphan!
Next query: "Show tasks with tag A"
Result: Error! task_id=1 doesn't exist!
With CASCADE:
Task 1 deleted
Database automatically deletes: (task_id=1, tag_id=A), (task_id=1, tag_id=B)
Clean! No orphans!
Step 2: Creating the Tag Model
class Tag(Base):
__tablename__ = "tags"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String(50), nullable=False)
color = Column(String(7)) # Hex color: #RRGGBB
created_at = Column(DateTime, default=datetime.utcnow)
owner_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"))
# Relationships
owner = relationship("User", back_populates="tags")
tasks = relationship("Task", secondary=task_tags, back_populates="tags")
# Unique constraint: user can't have duplicate tag names
__table_args__ = (
Index("idx_owner_tag", "owner_id", "name", unique=True),
)
Understanding the relationship:
tasks = relationship("Task", secondary=task_tags, back_populates="tags")
-
"Task": Links to Task model -
secondary=task_tags: Use task_tags as junction table -
back_populates="tags": Links totagsfield in Task model
The magic:
tag = db.query(Tag).first()
print(tag.tasks) # [Task(...), Task(...)]
# SQLAlchemy auto-generates SQL:
# SELECT tasks.*
# FROM tasks
# JOIN task_tags ON tasks.id = task_tags.task_id
# WHERE task_tags.tag_id = 'tag-uuid'
We didn't write that JOIN! SQLAlchemy did!
Composite unique constraint:
Index("idx_owner_tag", "owner_id", "name", unique=True)
What it prevents:
User A creates tag "urgent" ✅
User A creates tag "urgent" again ❌ Error: duplicate
User B creates tag "urgent" ✅ Different user, allowed!
Step 3: Updating the Task Model
class Task(Base):
__tablename__ = "tasks"
# ... existing fields ...
# Relationships
owner = relationship("User", back_populates="tasks")
tags = relationship("Tag", secondary=task_tags, back_populates="tasks")
Bidirectional relationship:
From Task:
task = db.query(Task).first()
print(task.tags) # [Tag(name="urgent"), Tag(name="work")]
From Tag:
tag = db.query(Tag).filter(Tag.name == "urgent").first()
print(tag.tasks) # [Task(title="Meeting"), Task(title="Report")]
Both directions work!
Step 4: Creating Tags
@router.post("/tags", response_model=TagResponse)
def create_tag(tag: TagCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
# Check if tag already exists for this user
existing = db.query(Tag).filter(
Tag.owner_id == current_user.id,
Tag.name == tag.name
).first()
if existing:
raise HTTPException(400, "Tag already exists")
new_tag = Tag(
name=tag.name,
color=tag.color,
owner_id=current_user.id
)
db.add(new_tag)
db.commit()
db.refresh(new_tag)
return new_tag
Why check for existing tag?
Without check:
POST /tags {"name": "urgent", "color": "#FF0000"} ✅
POST /tags {"name": "urgent", "color": "#0000FF"} 💥 Database error!
Database throws error because of unique constraint, but:
- Error message is cryptic
- HTTP 500 instead of 400
- Bad user experience
With check:
POST /tags {"name": "urgent"} ✅
POST /tags {"name": "urgent"} ✅ 400 Bad Request "Tag already exists"
Clean error handling!
Step 5: Creating Tasks with Tags
@router.post("/tasks", response_model=TaskResponse)
def create_task(task: TaskCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
new_task = Task(
title=task.title,
description=task.description,
owner_id=current_user.id
)
# Add tags if provided
if task.tag_ids:
tags = db.query(Tag).filter(
Tag.id.in_(task.tag_ids),
Tag.owner_id == current_user.id
).all()
if len(tags) != len(task.tag_ids):
raise HTTPException(400, "One or more tags not found")
new_task.tags = tags
db.add(new_task)
db.commit()
db.refresh(new_task)
return new_task
Understanding id.in_():
Tag.id.in_(["uuid1", "uuid2", "uuid3"])
SQL generated:
SELECT * FROM tags
WHERE id IN ('uuid1', 'uuid2', 'uuid3')
AND owner_id = 'current-user-uuid'
One query gets all tags!
Setting tags:
new_task.tags = tags
What SQLAlchemy does:
- Creates task in tasks table
- For each tag, creates entry in task_tags table:
- INSERT INTO task_tags (task_id, tag_id) VALUES (new_task.id, tag.id)
All automatic!
Validation:
if len(tags) != len(task.tag_ids):
raise HTTPException(400, "One or more tags not found")
If user sends 3 tag IDs but only 2 exist → error!
Step 6: Filtering Tasks by Tags
@router.get("/tasks")
def get_tasks(
tag_id: Optional[UUID] = Query(None),
tag_name: Optional[str] = Query(None),
...
):
query = db.query(Task).filter(Task.owner_id == current_user.id)
# Filter by tag ID
if tag_id:
query = query.filter(Task.tags.any(Tag.id == tag_id))
# Filter by tag name
if tag_name:
query = query.filter(Task.tags.any(Tag.name == tag_name.lower()))
return query.all()
Understanding .any():
Task.tags.any(Tag.id == tag_id)
Translation: "Get tasks where ANY of their tags has this ID"
SQL generated:
SELECT tasks.*
FROM tasks
WHERE tasks.owner_id = 'user-uuid'
AND EXISTS (
SELECT 1
FROM task_tags
JOIN tags ON tags.id = task_tags.tag_id
WHERE task_tags.task_id = tasks.id
AND tags.id = 'tag-uuid'
)
Example:
# Get all tasks tagged "urgent"
GET /tasks?tag_name=urgent
# Get all urgent tasks that are incomplete
GET /tasks?tag_name=urgent&completed=false
# Get all work tasks, sorted by date
GET /tasks?tag_name=work&sort_by=created_at&order=desc
Filters stack beautifully!
Step 7: Adding/Removing Tags from Tasks
Add tags:
@router.post("/tasks/{task_id}/tags")
def add_tags_to_task(task_id: UUID, tag_data: BulkUpdateTags, ...):
task = db.query(Task).filter(
Task.id == task_id,
Task.owner_id == current_user.id
).first()
if not task:
raise HTTPException(404, "Task not found")
tags = db.query(Tag).filter(
Tag.id.in_(tag_data.tag_ids),
Tag.owner_id == current_user.id
).all()
for tag in tags:
if tag not in task.tags:
task.tags.append(tag)
db.commit()
db.refresh(task)
return task
Why check if tag not in task.tags?
Without check:
task.tags.append(tag) # Adds tag
task.tags.append(tag) # Adds again!
# Database error: duplicate primary key (task_id, tag_id)
With check:
if tag not in task.tags:
task.tags.append(tag) # Safe!
Remove tag:
@router.delete("/tasks/{task_id}/tags/{tag_id}")
def remove_tag_from_task(task_id: UUID, tag_id: UUID, ...):
task = ...
tag = ...
if tag in task.tags:
task.tags.remove(tag)
db.commit()
return task
What remove() does:
task.tags.remove(tag)
SQLAlchemy:
DELETE FROM task_tags
WHERE task_id = 'task-uuid'
AND tag_id = 'tag-uuid'
Association removed!
Step 8: Bulk Operations
Bulk complete:
@router.post("/tasks/bulk/complete")
def bulk_complete_tasks(bulk_data: BulkTaskIds, ...):
updated = db.query(Task).filter(
Task.id.in_(bulk_data.task_ids),
Task.owner_id == current_user.id
).update({"completed": True}, synchronize_session=False)
db.commit()
return {
"updated": updated,
"message": f"{updated} tasks marked complete"
}
Why bulk operations matter:
Without bulk (N requests):
// Frontend makes 50 API calls
taskIds.forEach(id => {
fetch(`/tasks/${id}`, {
method: 'PATCH',
body: JSON.stringify({completed: true})
})
})
// Result:
// - 50 HTTP requests
// - 50 SQL queries
// - ~5 seconds total
With bulk (1 request):
// Frontend makes 1 API call
fetch('/tasks/bulk/complete', {
method: 'POST',
body: JSON.stringify({task_ids: taskIds})
})
// Result:
// - 1 HTTP request
// - 1 SQL query
// - ~0.1 seconds total
50x faster!
The SQL:
UPDATE tasks
SET completed = true
WHERE id IN ('uuid1', 'uuid2', ..., 'uuid50')
AND owner_id = 'user-uuid'
One query updates 50 tasks!
synchronize_session=False:
.update({...}, synchronize_session=False)
What it means:
- We're doing bulk update
- Don't update ORM objects in memory
- Faster (we don't need those objects anyway)
Use when you don't need the updated objects returned.
Bulk delete:
@router.delete("/tasks/bulk")
def bulk_delete_tasks(bulk_data: BulkTaskIds, ...):
deleted = db.query(Task).filter(
Task.id.in_(bulk_data.task_ids),
Task.owner_id == current_user.id
).delete(synchronize_session=False)
db.commit()
return {
"deleted": deleted,
"message": f"{deleted} tasks deleted"
}
SQL:
DELETE FROM tasks
WHERE id IN ('uuid1', 'uuid2', 'uuid3')
AND owner_id = 'user-uuid'
CASCADE in action:
DELETE FROM tasks WHERE id = 'uuid1'
↓ CASCADE
DELETE FROM task_tags WHERE task_id = 'uuid1'
All associations cleaned up automatically!
Testing the Features
Test 1: Create Tags
curl -X POST "http://localhost:8000/tags" \
-H "Authorization: Bearer TOKEN" \
-H "Content-Type: application/json" \
-d '{"name": "urgent", "color": "#FF0000"}'
curl -X POST "http://localhost:8000/tags" \
-H "Authorization: Bearer TOKEN" \
-H "Content-Type: application/json" \
-d '{"name": "work", "color": "#0000FF"}'
Test 2: Create Task with Tags
curl -X POST "http://localhost:8000/tasks" \
-H "Authorization: Bearer TOKEN" \
-H "Content-Type: application/json" \
-d '{
"title": "Team Meeting",
"tag_ids": ["TAG_UUID_1", "TAG_UUID_2"]
}'
Test 3: Filter by Tags
# All urgent tasks
curl -H "Authorization: Bearer TOKEN" \
"http://localhost:8000/tasks?tag_name=urgent"
# Urgent AND incomplete
curl -H "Authorization: Bearer TOKEN" \
"http://localhost:8000/tasks?tag_name=urgent&completed=false"
Test 4: Bulk Complete
curl -X POST "http://localhost:8000/tasks/bulk/complete" \
-H "Authorization: Bearer TOKEN" \
-H "Content-Type: application/json" \
-d '{"task_ids": ["UUID1", "UUID2", "UUID3"]}'
Response:
{
"updated": 3,
"message": "3 tasks marked complete"
}
Real-World Impact
Before tagging:
- Tasks in flat list
- No organization
- Hard to find related tasks
After tagging:
- Flexible organization
- Filter by project, priority, context
- Visual coding with colors
- Power user features (bulk ops)
Example workflows:
# Show all urgent client tasks
GET /tasks?tag_name=urgent&tag_name=client
# Complete all daily standup tasks
POST /tasks/bulk/complete
Body: {task_ids: [all standup task IDs]}
# Find all bugs
GET /tasks?tag_name=bug&completed=false
Performance Considerations
Junction table indexes:
# Automatically indexed (part of primary key)
- (task_id, tag_id)
# Should also index individually for reverse lookups
Index("idx_task_tags_task", "task_id")
Index("idx_task_tags_tag", "tag_id")
Query performance:
Without indexes:
SELECT tasks.* FROM tasks
JOIN task_tags ON ...
WHERE tag_id = 'uuid'
-- Scans entire task_tags table (~10,000 rows: 500ms)
With indexes:
-- Same query, uses index
-- (~10,000 rows: 50ms)
10x faster with proper indexes!
What I Learned
Technical Skills
✅ Many-to-many relationship implementation
✅ Association tables (junction tables)
✅ Composite primary keys
✅ SQLAlchemy secondary parameter
✅ Relationship navigation (bidirectional)
✅ .any() for filtering relationships
✅ Bulk update/delete operations
✅ CASCADE behavior and cleanup
Database Design
✅ When to use junction tables
✅ Composite unique constraints
✅ Index optimization for associations
✅ Preventing orphaned records
API Design
✅ Bulk operation endpoints
✅ Tag-based filtering
✅ Flexible organization systems
✅ Power user features
Common Mistakes I Avoided
Mistake 1: Forgetting CASCADE
# Bad - orphaned associations
ForeignKey("tasks.id")
# Good - clean cascade
ForeignKey("tasks.id", ondelete="CASCADE")
Mistake 2: Not Validating Ownership
# Bad - user can add anyone's tags
tags = db.query(Tag).filter(Tag.id.in_(tag_ids)).all()
# Good - only user's tags
tags = db.query(Tag).filter(
Tag.id.in_(tag_ids),
Tag.owner_id == current_user.id
).all()
Mistake 3: Duplicate Tag Names
# Bad - no uniqueness check
Tag(name="urgent") # User creates
Tag(name="urgent") # User creates again - confusing!
# Good - unique constraint
Index("idx_owner_tag", "owner_id", "name", unique=True)
Mistake 4: N Loop Updates
# Bad - 50 queries
for task_id in task_ids:
task = db.query(Task).get(task_id)
task.completed = True
db.commit()
# Good - 1 query
db.query(Task).filter(Task.id.in_(task_ids)).update({...})
Tomorrow's Plans
Day 6 will add:
- Due dates and reminders
- Task priorities (high, medium, low)
- Recurring tasks
- Calendar view support
But today? Today I celebrate mastering many-to-many relationships - one of the most important database patterns! 🎉
Resources
Full Code
Complete code on GitHub:
100-days-of-python/days/005
Time Investment: 3 hours
Knowledge Gained: Advanced relationship patterns
Feeling: Like I understand how real databases work 🗄️
Day 5/100 complete! ✅
Tomorrow: Due dates, priorities, and scheduling
Following my journey?
📝 Blog | 🐦 Twitter | 💼 LinkedIn
Tags: #100DaysOfCode #Python #FastAPI #PostgreSQL #SQLAlchemy #ManyToMany #JunctionTables #BulkOperations #DatabaseDesign #LearningInPublic
Top comments (0)