archi-jain

Day 5/100: Many-to-Many Relationships - Tags, Associations, and Bulk Operations

Part of my 100 Days of Code journey. Today we tackle one of the most important database patterns.

The Challenge

Implement a tagging system with many-to-many relationships and add bulk operations for efficiency.

The Problem: Tasks need flexible organization. Fixed categories don't work because:

  • One task can belong to multiple categories (urgent AND work AND client-facing)
  • Categories can have multiple tasks
  • Users want custom labels, not predefined options

The Solution: Many-to-many relationships via junction tables, plus bulk operations for power users.

Why Many-to-Many Matters

One-to-Many relationships (what we've used so far):

User ──┬─→ Task 1
       ├─→ Task 2
       └─→ Task 3

Each task has ONE user. Simple!

Many-to-Many relationships (what we need):

Task 1 ──┬─→ Tag: urgent
         └─→ Tag: work

Task 2 ──┬─→ Tag: urgent
         ├─→ Tag: personal
         └─→ Tag: home

Tag: urgent ──┬─→ Task 1
              └─→ Task 2

Problem: Can't use a simple foreign key!

If we add tag_id to tasks table:

  • ❌ Task can only have ONE tag
  • ❌ Need multiple tag_id columns (tag_id_1, tag_id_2...) - terrible design!

Solution: Association table (junction table)

Understanding Association Tables

An association table (also called junction table or link table) connects two tables in a many-to-many relationship.

Three tables:

tasks table:
| id | title | owner_id |
|----|-------|----------|
| 1 | Meeting | user-a |
| 2 | Report | user-a |
| 3 | Email | user-a |

tags table:
| id | name | color | owner_id |
|----|------|-------|----------|
| A | urgent | #FF0000 | user-a |
| B | work | #0000FF | user-a |
| C | personal | #00FF00 | user-a |

task_tags table (association):
| task_id | tag_id |
|---------|--------|
| 1 | A |
| 1 | B |
| 2 | A |
| 3 | C |

What this means:

  • Task 1 (Meeting) has tags: urgent, work
  • Task 2 (Report) has tags: urgent
  • Task 3 (Email) has tags: personal
  • Tag A (urgent) is on: Task 1, Task 2
  • Tag B (work) is on: Task 1

Perfect flexibility!
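
Here's a minimal sketch of those three tables that you can run as-is. It assumes SQLite from Python's standard library in place of PostgreSQL and SQLAlchemy, and it uses integer and single-letter IDs where the real app uses UUIDs. The helper names `tags_for_task` and `tasks_for_tag` are made up for this illustration.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL (an assumption for portability).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT NOT NULL);
    CREATE TABLE tags  (id TEXT PRIMARY KEY, name TEXT NOT NULL);
    CREATE TABLE task_tags (
        task_id INTEGER NOT NULL REFERENCES tasks(id),
        tag_id  TEXT    NOT NULL REFERENCES tags(id),
        PRIMARY KEY (task_id, tag_id)
    );
    INSERT INTO tasks VALUES (1, 'Meeting'), (2, 'Report'), (3, 'Email');
    INSERT INTO tags  VALUES ('A', 'urgent'), ('B', 'work'), ('C', 'personal');
    INSERT INTO task_tags VALUES (1, 'A'), (1, 'B'), (2, 'A'), (3, 'C');
""")


def tags_for_task(task_id):
    """Walk the junction table from a task to its tag names."""
    rows = conn.execute(
        """SELECT tags.name FROM tags
           JOIN task_tags ON task_tags.tag_id = tags.id
           WHERE task_tags.task_id = ? ORDER BY tags.name""",
        (task_id,),
    )
    return [name for (name,) in rows]


def tasks_for_tag(tag_id):
    """Walk the same table in the other direction, from a tag to task titles."""
    rows = conn.execute(
        """SELECT tasks.title FROM tasks
           JOIN task_tags ON task_tags.task_id = tasks.id
           WHERE task_tags.tag_id = ? ORDER BY tasks.id""",
        (tag_id,),
    )
    return [title for (title,) in rows]


print(tags_for_task(1))    # ['urgent', 'work']
print(tasks_for_tag("A"))  # ['Meeting', 'Report']
```

The same junction rows answer both questions, which is the whole point of the pattern.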

Step-by-Step Implementation

Step 1: Creating the Association Table

from sqlalchemy import Table, Column, ForeignKey
from sqlalchemy.dialects.postgresql import UUID

task_tags = Table(
    "task_tags",
    Base.metadata,
    Column("task_id", UUID(as_uuid=True), ForeignKey("tasks.id", ondelete="CASCADE"), primary_key=True),
    Column("tag_id", UUID(as_uuid=True), ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True),
)

Breaking it down:

Why Table() instead of a class?

# Simple association (just links)
task_tags = Table(...)  # Recommended!

# Complex association (with extra fields like created_at, priority)
class TaskTag(Base):  # Use when needed
    __tablename__ = "task_tags"
    task_id = Column(...)
    tag_id = Column(...)
    priority = Column(Integer)
    created_at = Column(DateTime)

For pure many-to-many (just linking), Table() is cleaner.

Composite primary key:

primary_key=True  # On task_id
primary_key=True  # On tag_id

Both together form the primary key! This means:

  • ✅ Can't add same tag twice to same task
  • ✅ (task_id=1, tag_id=A) can exist once
  • ✅ (task_id=1, tag_id=B) is different, allowed
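
To watch the composite key reject a duplicate pair, here's a small sketch. It assumes SQLite from the standard library rather than PostgreSQL, and `try_link` is a hypothetical helper, not part of the project code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_tags ("
    " task_id INTEGER NOT NULL,"
    " tag_id TEXT NOT NULL,"
    " PRIMARY KEY (task_id, tag_id))"
)


def try_link(task_id, tag_id):
    """Insert one association; return False if the pair already exists."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute("INSERT INTO task_tags VALUES (?, ?)", (task_id, tag_id))
        return True
    except sqlite3.IntegrityError:
        return False


results = [
    try_link(1, "A"),  # first (1, A): accepted
    try_link(1, "B"),  # same task, different tag: accepted
    try_link(1, "A"),  # duplicate pair: rejected by the composite key
]
print(results)  # [True, True, False]
```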

CASCADE deletes:

ForeignKey("tasks.id", ondelete="CASCADE")

What happens:

  1. Task deleted → All task_tags entries with that task_id deleted
  2. Tag deleted → All task_tags entries with that tag_id deleted
  3. No orphaned associations!

Without CASCADE:

DELETE FROM tasks WHERE id = 1
task_tags still has: (task_id=1, tag_id=A)
Result: Error! The foreign key blocks the delete
until the task_tags rows are removed by hand.

With CASCADE:

Task 1 deleted
Database automatically deletes: (task_id=1, tag_id=A), (task_id=1, tag_id=B)
Clean! No orphans!
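
A minimal sketch of how a database reacts in each case, assuming SQLite from the standard library in place of PostgreSQL (SQLite only enforces foreign keys after `PRAGMA foreign_keys = ON`). With enforcement on, the version without CASCADE refuses the delete instead of leaving rows behind. The `remaining_links` helper is hypothetical.

```python
import sqlite3


def remaining_links(on_delete):
    """Delete task 1 and report what happens to its task_tags rows.

    `on_delete` is either "" (default behaviour) or "ON DELETE CASCADE".
    """
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite needs this opt-in
    conn.executescript(f"""
        CREATE TABLE tasks (id INTEGER PRIMARY KEY);
        CREATE TABLE task_tags (
            task_id INTEGER NOT NULL REFERENCES tasks(id) {on_delete},
            tag_id  TEXT    NOT NULL,
            PRIMARY KEY (task_id, tag_id)
        );
        INSERT INTO tasks VALUES (1);
        INSERT INTO task_tags VALUES (1, 'A'), (1, 'B');
    """)
    try:
        conn.execute("DELETE FROM tasks WHERE id = 1")
    except sqlite3.IntegrityError as exc:
        return f"delete rejected: {exc}"
    # The delete went through: count the association rows that are left.
    return conn.execute("SELECT COUNT(*) FROM task_tags").fetchone()[0]


print(remaining_links(""))                   # the delete is rejected
print(remaining_links("ON DELETE CASCADE"))  # 0
```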

Step 2: Creating the Tag Model

import uuid
from datetime import datetime

from sqlalchemy import Column, DateTime, ForeignKey, Index, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship

# Base and task_tags come from Step 1
class Tag(Base):
    __tablename__ = "tags"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(50), nullable=False)
    color = Column(String(7))  # Hex color: #RRGGBB
    created_at = Column(DateTime, default=datetime.utcnow)
    owner_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"))

    # Relationships
    owner = relationship("User", back_populates="tags")
    tasks = relationship("Task", secondary=task_tags, back_populates="tags")

    # Unique index: user can't have duplicate tag names
    __table_args__ = (
        Index("idx_owner_tag", "owner_id", "name", unique=True),
    )

Understanding the relationship:

tasks = relationship("Task", secondary=task_tags, back_populates="tags")
  • "Task": Links to Task model
  • secondary=task_tags: Use task_tags as junction table
  • back_populates="tags": Links to tags field in Task model

The magic:

tag = db.query(Tag).first()
print(tag.tasks)  # [Task(...), Task(...)]

# SQLAlchemy auto-generates SQL:
# SELECT tasks.* 
# FROM tasks 
# JOIN task_tags ON tasks.id = task_tags.task_id
# WHERE task_tags.tag_id = 'tag-uuid'

We didn't write that JOIN! SQLAlchemy did!

Composite unique constraint:

Index("idx_owner_tag", "owner_id", "name", unique=True)

What it prevents:

User A creates tag "urgent" ✅
User A creates tag "urgent" again ❌ Error: duplicate
User B creates tag "urgent" ✅ Different user, allowed!

Step 3: Updating the Task Model

class Task(Base):
    __tablename__ = "tasks"

    # ... existing fields ...

    # Relationships
    owner = relationship("User", back_populates="tasks")
    tags = relationship("Tag", secondary=task_tags, back_populates="tasks")

Bidirectional relationship:

From Task:

task = db.query(Task).first()
print(task.tags)  # [Tag(name="urgent"), Tag(name="work")]

From Tag:

tag = db.query(Tag).filter(Tag.name == "urgent").first()
print(tag.tasks)  # [Task(title="Meeting"), Task(title="Report")]

Both directions work!

Step 4: Creating Tags

@router.post("/tags", response_model=TagResponse)
def create_tag(tag: TagCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    # Check if tag already exists for this user
    existing = db.query(Tag).filter(
        Tag.owner_id == current_user.id,
        Tag.name == tag.name
    ).first()

    if existing:
        raise HTTPException(400, "Tag already exists")

    new_tag = Tag(
        name=tag.name,
        color=tag.color,
        owner_id=current_user.id
    )

    db.add(new_tag)
    db.commit()
    db.refresh(new_tag)

    return new_tag

Why check for existing tag?

Without check:

POST /tags {"name": "urgent", "color": "#FF0000"}  ✅
POST /tags {"name": "urgent", "color": "#0000FF"}  💥 Database error!

Database throws error because of unique constraint, but:

  • Error message is cryptic
  • HTTP 500 instead of 400
  • Bad user experience

With check:

POST /tags {"name": "urgent"}  ✅
POST /tags {"name": "urgent"}  ✅ 400 Bad Request "Tag already exists"

Clean error handling!
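
One caveat with check-then-insert: two requests can both pass the check before either inserts. A possible safety net is to also translate the unique-index error. Here's a sketch of that idea, assuming SQLite from the standard library instead of SQLAlchemy, plain tuples instead of HTTP responses, and a hypothetical `skip_precheck` flag that simulates the race.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE tags (
        id INTEGER PRIMARY KEY,
        owner_id TEXT NOT NULL,
        name TEXT NOT NULL
    );
    CREATE UNIQUE INDEX idx_owner_tag ON tags (owner_id, name);
""")


def create_tag(owner_id, name, skip_precheck=False):
    """Return an (HTTP-style status, message) pair for a create-tag request.

    `skip_precheck` is a hypothetical switch that simulates a concurrent
    request slipping in between the existence check and the INSERT.
    """
    if not skip_precheck:
        exists = conn.execute(
            "SELECT 1 FROM tags WHERE owner_id = ? AND name = ?", (owner_id, name)
        ).fetchone()
        if exists:
            return 400, "Tag already exists"
    try:
        with conn:  # commit on success, roll back on failure
            conn.execute(
                "INSERT INTO tags (owner_id, name) VALUES (?, ?)", (owner_id, name)
            )
    except sqlite3.IntegrityError:
        # The unique index is the real guarantee; translate its error as well.
        return 400, "Tag already exists"
    return 200, "created"


print(create_tag("user-a", "urgent"))                      # (200, 'created')
print(create_tag("user-a", "urgent"))                      # (400, 'Tag already exists')
print(create_tag("user-a", "urgent", skip_precheck=True))  # (400, 'Tag already exists')
print(create_tag("user-b", "urgent"))                      # (200, 'created')
```

In the FastAPI endpoint the equivalent would be catching the database's integrity error around the commit, but I haven't wired that in here.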

Step 5: Creating Tasks with Tags

@router.post("/tasks", response_model=TaskResponse)
def create_task(task: TaskCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    new_task = Task(
        title=task.title,
        description=task.description,
        owner_id=current_user.id
    )

    # Add tags if provided
    if task.tag_ids:
        tags = db.query(Tag).filter(
            Tag.id.in_(task.tag_ids),
            Tag.owner_id == current_user.id
        ).all()

        # Compare against unique IDs so a repeated ID isn't reported as missing
        if len(tags) != len(set(task.tag_ids)):
            raise HTTPException(400, "One or more tags not found")

        new_task.tags = tags

    db.add(new_task)
    db.commit()
    db.refresh(new_task)

    return new_task

Understanding id.in_():

Tag.id.in_(["uuid1", "uuid2", "uuid3"])

SQL generated:

SELECT * FROM tags 
WHERE id IN ('uuid1', 'uuid2', 'uuid3')
  AND owner_id = 'current-user-uuid'

One query gets all tags!

Setting tags:

new_task.tags = tags

What SQLAlchemy does:

  1. Creates task in tasks table
  2. For each tag, creates entry in task_tags table:
    • INSERT INTO task_tags (task_id, tag_id) VALUES (new_task.id, tag.id)

All automatic!
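
Written out by hand, those two steps look roughly like this. It's a sketch assuming SQLite from the standard library and integer task IDs; `create_task_with_tags` is a hypothetical helper, and the real ORM flush may order or batch its statements differently.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT NOT NULL);
    CREATE TABLE task_tags (
        task_id INTEGER NOT NULL REFERENCES tasks(id),
        tag_id  TEXT    NOT NULL,
        PRIMARY KEY (task_id, tag_id)
    );
""")


def create_task_with_tags(title, tag_ids):
    """Insert the task, then one junction row per tag, in one transaction."""
    with conn:  # both inserts commit together or not at all
        cur = conn.execute("INSERT INTO tasks (title) VALUES (?)", (title,))
        task_id = cur.lastrowid
        conn.executemany(
            "INSERT INTO task_tags (task_id, tag_id) VALUES (?, ?)",
            [(task_id, tag_id) for tag_id in tag_ids],
        )
    return task_id


new_id = create_task_with_tags("Team Meeting", ["A", "B"])
links = conn.execute(
    "SELECT tag_id FROM task_tags WHERE task_id = ? ORDER BY tag_id", (new_id,)
).fetchall()
print(links)  # [('A',), ('B',)]
```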

Validation:

if len(tags) != len(set(task.tag_ids)):
    raise HTTPException(400, "One or more tags not found")

If user sends 3 tag IDs but only 2 exist → error!
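
A count comparison tells you something is missing but not what. Here's a small sketch of a stricter check that names the missing IDs and tolerates repeated ones. `missing_tag_ids` is a hypothetical helper, not something from the project.

```python
from uuid import uuid4


def missing_tag_ids(requested, found):
    """Return requested IDs absent from the lookup result, ignoring repeats."""
    found_set = set(found)
    missing = []
    for tag_id in dict.fromkeys(requested):  # de-duplicate, keep order
        if tag_id not in found_set:
            missing.append(tag_id)
    return missing


a, b, c = uuid4(), uuid4(), uuid4()

# Three IDs requested, only two exist: the third is reported.
print(missing_tag_ids([a, b, c], [a, b]) == [c])  # True

# A repeated ID is not mistaken for a missing tag.
print(missing_tag_ids([a, a, b], [a, b]))         # []
```

In the endpoint, `found` would be the IDs of the rows returned by the `in_()` query, and a non-empty result would trigger the 400.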

Step 6: Filtering Tasks by Tags

@router.get("/tasks")
def get_tasks(
    tag_id: Optional[UUID] = Query(None),
    tag_name: Optional[str] = Query(None),
    ...
):
    query = db.query(Task).filter(Task.owner_id == current_user.id)

    # Filter by tag ID
    if tag_id:
        query = query.filter(Task.tags.any(Tag.id == tag_id))

    # Filter by tag name (assumes names are stored lowercase;
    # create_tag would need to normalize tag.name the same way)
    if tag_name:
        query = query.filter(Task.tags.any(Tag.name == tag_name.lower()))

    return query.all()

Understanding .any():

Task.tags.any(Tag.id == tag_id)

Translation: "Get tasks where ANY of their tags has this ID"

SQL generated:

SELECT tasks.* 
FROM tasks
WHERE tasks.owner_id = 'user-uuid'
  AND EXISTS (
    SELECT 1 
    FROM task_tags 
    JOIN tags ON tags.id = task_tags.tag_id
    WHERE task_tags.task_id = tasks.id
      AND tags.id = 'tag-uuid'
  )

Example:

# Get all tasks tagged "urgent"
GET /tasks?tag_name=urgent

# Get all urgent tasks that are incomplete
GET /tasks?tag_name=urgent&completed=false

# Get all work tasks, sorted by date
GET /tasks?tag_name=work&sort_by=created_at&order=desc

Filters stack beautifully!
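
To make the EXISTS pattern concrete, here's a sketch that runs it directly, assuming SQLite from the standard library and the sample data from the tables earlier. It adds one EXISTS per tag name, which is how I'd expect chained `.any()` filters to behave; `tasks_with_all_tags` is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT, owner_id TEXT);
    CREATE TABLE tags  (id TEXT PRIMARY KEY, name TEXT);
    CREATE TABLE task_tags (
        task_id INTEGER NOT NULL,
        tag_id  TEXT    NOT NULL,
        PRIMARY KEY (task_id, tag_id)
    );
    INSERT INTO tasks VALUES
        (1, 'Meeting', 'user-a'), (2, 'Report', 'user-a'), (3, 'Email', 'user-a');
    INSERT INTO tags VALUES ('A', 'urgent'), ('B', 'work'), ('C', 'personal');
    INSERT INTO task_tags VALUES (1, 'A'), (1, 'B'), (2, 'A'), (3, 'C');
""")


def tasks_with_all_tags(owner_id, tag_names):
    """Titles of the owner's tasks that carry every tag in `tag_names`."""
    sql = "SELECT title FROM tasks WHERE owner_id = ?"
    params = [owner_id]
    for name in tag_names:
        # One EXISTS per tag, mirroring a chained .filter(Task.tags.any(...)).
        sql += """
            AND EXISTS (
                SELECT 1 FROM task_tags
                JOIN tags ON tags.id = task_tags.tag_id
                WHERE task_tags.task_id = tasks.id AND tags.name = ?
            )"""
        params.append(name)
    sql += " ORDER BY tasks.id"
    return [title for (title,) in conn.execute(sql, params)]


print(tasks_with_all_tags("user-a", ["urgent"]))          # ['Meeting', 'Report']
print(tasks_with_all_tags("user-a", ["urgent", "work"]))  # ['Meeting']
print(tasks_with_all_tags("user-a", []))                  # ['Meeting', 'Report', 'Email']
```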

Step 7: Adding/Removing Tags from Tasks

Add tags:

@router.post("/tasks/{task_id}/tags")
def add_tags_to_task(task_id: UUID, tag_data: BulkUpdateTags, ...):
    task = db.query(Task).filter(
        Task.id == task_id,
        Task.owner_id == current_user.id
    ).first()

    if not task:
        raise HTTPException(404, "Task not found")

    tags = db.query(Tag).filter(
        Tag.id.in_(tag_data.tag_ids),
        Tag.owner_id == current_user.id
    ).all()

    for tag in tags:
        if tag not in task.tags:
            task.tags.append(tag)

    db.commit()
    db.refresh(task)

    return task

Why check if tag not in task.tags?

Without check:

task.tags.append(tag)  # Adds tag
task.tags.append(tag)  # Adds again!
# Database error: duplicate primary key (task_id, tag_id)

With check:

if tag not in task.tags:
    task.tags.append(tag)  # Safe!

Remove tag:

@router.delete("/tasks/{task_id}/tags/{tag_id}")
def remove_tag_from_task(task_id: UUID, tag_id: UUID, ...):
    task = ...
    tag = ...

    if tag in task.tags:
        task.tags.remove(tag)
        db.commit()

    return task

What remove() does:

task.tags.remove(tag)

SQLAlchemy:

DELETE FROM task_tags 
WHERE task_id = 'task-uuid' 
  AND tag_id = 'tag-uuid'

Association removed!
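
The add and remove operations are both idempotent: repeating them changes nothing. Here's a sketch of the same behaviour at the SQL level, assuming SQLite from the standard library. `INSERT OR IGNORE` is SQLite syntax (PostgreSQL spells it `ON CONFLICT DO NOTHING`), and the helper names are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_tags ("
    " task_id INTEGER NOT NULL,"
    " tag_id TEXT NOT NULL,"
    " PRIMARY KEY (task_id, tag_id))"
)


def add_tags(task_id, tag_ids):
    """Link tags to a task, skipping existing pairs; return new link count."""
    with conn:
        before = conn.total_changes
        conn.executemany(
            "INSERT OR IGNORE INTO task_tags VALUES (?, ?)",
            [(task_id, tag_id) for tag_id in tag_ids],
        )
        return conn.total_changes - before


def remove_tag(task_id, tag_id):
    """Unlink one tag; return True only if a link was actually removed."""
    with conn:
        cur = conn.execute(
            "DELETE FROM task_tags WHERE task_id = ? AND tag_id = ?",
            (task_id, tag_id),
        )
        return cur.rowcount == 1


first_add = add_tags(1, ["A", "B"])   # two new links
second_add = add_tags(1, ["B", "C"])  # B already linked, only C is new
first_remove = remove_tag(1, "A")     # link existed
second_remove = remove_tag(1, "A")    # nothing left to remove
print(first_add, second_add, first_remove, second_remove)  # 2 1 True False
```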

Step 8: Bulk Operations

Bulk complete:

@router.post("/tasks/bulk/complete")
def bulk_complete_tasks(bulk_data: BulkTaskIds, ...):
    updated = db.query(Task).filter(
        Task.id.in_(bulk_data.task_ids),
        Task.owner_id == current_user.id
    ).update({"completed": True}, synchronize_session=False)

    db.commit()

    return {
        "updated": updated,
        "message": f"{updated} tasks marked complete"
    }

Why bulk operations matter:

Without bulk (N requests):

// Frontend makes 50 API calls
taskIds.forEach(id => {
  fetch(`/tasks/${id}`, {
    method: 'PATCH',
    body: JSON.stringify({completed: true})
  })
})

// Result:
// - 50 HTTP requests
// - 50 SQL queries
// - ~5 seconds total

With bulk (1 request):

// Frontend makes 1 API call
fetch('/tasks/bulk/complete', {
  method: 'POST',
  body: JSON.stringify({task_ids: taskIds})
})

// Result:
// - 1 HTTP request
// - 1 SQL query
// - ~0.1 seconds total

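Roughly 50x faster in this illustration! The timings are rough estimates, not measurements, but the drop from 50 round trips to 1 is real.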

The SQL:

UPDATE tasks 
SET completed = true 
WHERE id IN ('uuid1', 'uuid2', ..., 'uuid50')
  AND owner_id = 'user-uuid'

One query updates 50 tasks!

synchronize_session=False:

.update({...}, synchronize_session=False)

What it means:

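  • We're doing a bulk update that runs as a single SQL statement
  • SQLAlchemy skips updating Task objects already loaded in the session
  • Faster, but any loaded objects are stale until refreshed (the commit expires them by default)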

Use when you don't need the updated objects returned.
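
Here's the same bulk update as a runnable sketch, assuming SQLite from the standard library and integer IDs in place of UUIDs. `bulk_complete` is a hypothetical helper, and the cursor's `rowcount` plays the role of the `updated` value returned by the endpoint.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE tasks (
        id INTEGER PRIMARY KEY,
        owner_id TEXT NOT NULL,
        completed INTEGER NOT NULL DEFAULT 0
    );
    INSERT INTO tasks (id, owner_id) VALUES
        (1, 'user-a'), (2, 'user-a'), (3, 'user-a'), (4, 'user-b');
""")


def bulk_complete(owner_id, task_ids):
    """Mark the owner's tasks complete with one UPDATE; return rows changed."""
    if not task_ids:
        return 0  # nothing to do, and an empty IN () is not portable SQL
    placeholders = ", ".join("?" for _ in task_ids)
    with conn:
        cur = conn.execute(
            f"UPDATE tasks SET completed = 1 "
            f"WHERE id IN ({placeholders}) AND owner_id = ?",
            [*task_ids, owner_id],
        )
    return cur.rowcount


# Task 4 belongs to user-b, so the owner filter silently skips it.
updated = bulk_complete("user-a", [1, 2, 4])
print(updated)  # 2
```

The owner filter doubles as the permission check: IDs that belong to someone else simply don't match.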

Bulk delete:

@router.delete("/tasks/bulk")
def bulk_delete_tasks(bulk_data: BulkTaskIds, ...):
    deleted = db.query(Task).filter(
        Task.id.in_(bulk_data.task_ids),
        Task.owner_id == current_user.id
    ).delete(synchronize_session=False)

    db.commit()

    return {
        "deleted": deleted,
        "message": f"{deleted} tasks deleted"
    }

SQL:

DELETE FROM tasks 
WHERE id IN ('uuid1', 'uuid2', 'uuid3')
  AND owner_id = 'user-uuid'

CASCADE in action:

DELETE FROM tasks WHERE id = 'uuid1'
  ↓ CASCADE
DELETE FROM task_tags WHERE task_id = 'uuid1'

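All associations are cleaned up automatically! This matters for bulk deletes: query.delete() skips the ORM's relationship handling, so the database-level ON DELETE CASCADE is what removes the task_tags rows.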

Testing the Features

Test 1: Create Tags

curl -X POST "http://localhost:8000/tags" \
  -H "Authorization: Bearer TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"name": "urgent", "color": "#FF0000"}'

curl -X POST "http://localhost:8000/tags" \
  -H "Authorization: Bearer TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"name": "work", "color": "#0000FF"}'

Test 2: Create Task with Tags

curl -X POST "http://localhost:8000/tasks" \
  -H "Authorization: Bearer TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "title": "Team Meeting",
    "tag_ids": ["TAG_UUID_1", "TAG_UUID_2"]
  }'

Test 3: Filter by Tags

# All urgent tasks
curl -H "Authorization: Bearer TOKEN" \
  "http://localhost:8000/tasks?tag_name=urgent"

# Urgent AND incomplete
curl -H "Authorization: Bearer TOKEN" \
  "http://localhost:8000/tasks?tag_name=urgent&completed=false"

Test 4: Bulk Complete

curl -X POST "http://localhost:8000/tasks/bulk/complete" \
  -H "Authorization: Bearer TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"task_ids": ["UUID1", "UUID2", "UUID3"]}'

Response:

{
  "updated": 3,
  "message": "3 tasks marked complete"
}

Real-World Impact

Before tagging:

  • Tasks in flat list
  • No organization
  • Hard to find related tasks

After tagging:

  • Flexible organization
  • Filter by project, priority, context
  • Visual coding with colors
  • Power user features (bulk ops)

Example workflows:

# Show all urgent client tasks
# (needs tag_name to accept a list; the Step 6 endpoint takes a single value)
GET /tasks?tag_name=urgent&tag_name=client

# Complete all daily standup tasks
POST /tasks/bulk/complete
Body: {task_ids: [all standup task IDs]}

# Find all bugs
GET /tasks?tag_name=bug&completed=false

Performance Considerations

Junction table indexes:

# Automatically indexed (part of primary key)
# This index also serves lookups by task_id, its leading column
- (task_id, tag_id)

# Add an index on tag_id for reverse lookups (tag → tasks)
Index("idx_task_tags_tag", task_tags.c.tag_id)

Query performance:

Without indexes:

SELECT tasks.* FROM tasks
JOIN task_tags ON ...
WHERE tag_id = 'uuid'
-- Scans entire task_tags table (~10,000 rows: 500ms)

With indexes:

-- Same query, uses index
-- (~10,000 rows: 50ms)

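Roughly 10x faster with proper indexes in this illustration! The timings are rough estimates, so run EXPLAIN ANALYZE on real data to confirm the index is actually used.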
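
You can ask the database which index it picks instead of guessing. Here's a sketch assuming SQLite from the standard library and its `EXPLAIN QUERY PLAN` statement (PostgreSQL's `EXPLAIN` output looks different). The `plan` helper is hypothetical, and the exact plan wording varies between SQLite versions, so I only print it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_tags ("
    " task_id INTEGER NOT NULL,"
    " tag_id TEXT NOT NULL,"
    " PRIMARY KEY (task_id, tag_id))"
)


def plan(sql):
    """Return SQLite's query plan for `sql` as one string."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql).fetchall()
    return " | ".join(row[-1] for row in rows)  # last column is the detail text


# Lookup by task_id: the composite primary key's index already covers this.
by_task = plan("SELECT tag_id FROM task_tags WHERE task_id = 1")

# Lookup by tag_id, before and after adding a dedicated index.
before = plan("SELECT task_id FROM task_tags WHERE tag_id = 'A'")
conn.execute("CREATE INDEX idx_task_tags_tag ON task_tags (tag_id)")
after = plan("SELECT task_id FROM task_tags WHERE tag_id = 'A'")

print(by_task)
print(before)
print(after)
```

On my reading of the plans, only the tag_id lookup changes once the extra index exists, which suggests a separate task_id index would be redundant next to the primary key.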

What I Learned

Technical Skills

✅ Many-to-many relationship implementation

✅ Association tables (junction tables)

✅ Composite primary keys

✅ SQLAlchemy secondary parameter

✅ Relationship navigation (bidirectional)

✅ .any() for filtering relationships

✅ Bulk update/delete operations

✅ CASCADE behavior and cleanup

Database Design

✅ When to use junction tables

✅ Composite unique constraints

✅ Index optimization for associations

✅ Preventing orphaned records

API Design

✅ Bulk operation endpoints

✅ Tag-based filtering

✅ Flexible organization systems

✅ Power user features

Common Mistakes I Avoided

Mistake 1: Forgetting CASCADE

# Bad - a bulk delete fails while task_tags rows still reference the task
ForeignKey("tasks.id")

# Good - clean cascade
ForeignKey("tasks.id", ondelete="CASCADE")

Mistake 2: Not Validating Ownership

# Bad - user can add anyone's tags
tags = db.query(Tag).filter(Tag.id.in_(tag_ids)).all()

# Good - only user's tags
tags = db.query(Tag).filter(
    Tag.id.in_(tag_ids),
    Tag.owner_id == current_user.id
).all()

Mistake 3: Duplicate Tag Names

# Bad - no uniqueness check
Tag(name="urgent")  # User creates
Tag(name="urgent")  # User creates again - confusing!

# Good - unique constraint
Index("idx_owner_tag", "owner_id", "name", unique=True)

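Mistake 4: Updating in a Loop

# Bad - one SELECT, UPDATE, and commit per task
for task_id in task_ids:
    task = db.query(Task).get(task_id)
    task.completed = True
    db.commit()

# Good - 1 query, scoped to the current user
db.query(Task).filter(
    Task.id.in_(task_ids),
    Task.owner_id == current_user.id
).update({"completed": True}, synchronize_session=False)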

Tomorrow's Plans

Day 6 will add:

  • Due dates and reminders
  • Task priorities (high, medium, low)
  • Recurring tasks
  • Calendar view support

But today? Today I celebrate mastering many-to-many relationships - one of the most important database patterns! 🎉

Resources

Full Code

Complete code on GitHub:
100-days-of-python/days/005


Time Investment: 3 hours

Knowledge Gained: Advanced relationship patterns

Feeling: Like I understand how real databases work 🗄️

Day 5/100 complete!

Tomorrow: Due dates, priorities, and scheduling


Following my journey?

📝 Blog | 🐦 Twitter | 💼 LinkedIn

Tags: #100DaysOfCode #Python #FastAPI #PostgreSQL #SQLAlchemy #ManyToMany #JunctionTables #BulkOperations #DatabaseDesign #LearningInPublic
