In this tutorial, we'll create a scalable task management API using Apache Cassandra as our NoSQL database and FastAPI for the web framework. Cassandra is an excellent choice for applications requiring high availability, horizontal scalability, and consistent performance with large datasets.
Why Apache Cassandra?
While MongoDB and Redis are popular NoSQL choices, Cassandra offers several distinctive benefits:
- Linear scalability with no single point of failure
- Tunable consistency levels
- Optimized for write-heavy workloads
- Excellent support for time-series data
- Built-in partitioning and replication
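Here is what "tunable consistency" means in numbers. With replication factor RF, a QUORUM read or write must be acknowledged by floor(RF / 2) + 1 replicas. A read is guaranteed to overlap the latest acknowledged write when the replicas read (R) plus the replicas written (W) exceed RF. The sketch below only does that arithmetic in plain Python. The function names are illustrative and are not part of cassandra-driver.

```python
def quorum(replication_factor: int) -> int:
    """Replicas that must respond for a QUORUM operation: floor(RF / 2) + 1."""
    if replication_factor < 1:
        raise ValueError("replication_factor must be >= 1")
    return replication_factor // 2 + 1


def reads_see_latest_write(read_replicas: int, write_replicas: int,
                           replication_factor: int) -> bool:
    """True when every read set must overlap every write set (R + W > RF)."""
    return read_replicas + write_replicas > replication_factor


if __name__ == "__main__":
    rf = 3
    q = quorum(rf)
    print(f"RF={rf}: QUORUM={q}")
    print("QUORUM writes + QUORUM reads:", reads_see_latest_write(q, q, rf))
    print("ONE write + ONE read:", reads_see_latest_write(1, 1, rf))
```

With RF=3, QUORUM reads and writes (2 + 2 > 3) give read-your-writes behavior at some latency cost. ONE/ONE (1 + 1 is not greater than 3) is faster but may return stale data.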
Prerequisites
- Python 3.8+
- Docker and Docker Compose
- Basic understanding of REST APIs
- Familiarity with async programming
Project Setup
First, let's create our project structure:
```
task-manager/
├── docker-compose.yml
├── requirements.txt
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── models.py
│   ├── database.py
│   └── routes/
│       ├── __init__.py
│       └── tasks.py
└── README.md
```
Let's set up our dependencies in requirements.txt:
```
fastapi==0.104.1
uvicorn==0.24.0
cassandra-driver==3.28.0
pydantic==2.4.2
python-dotenv==1.0.0
```
Create a docker-compose.yml for Cassandra:
```yaml
version: '3'
services:
  cassandra:
    image: cassandra:latest
    ports:
      - "9042:9042"
    environment:
      - CASSANDRA_CLUSTER_NAME=TaskManagerCluster
    volumes:
      - cassandra_data:/var/lib/cassandra
volumes:
  cassandra_data:
```
Database Setup
Let's create our database connection handler in app/database.py. Opening a cluster connection is expensive, so one Cluster and Session are created once and shared across all requests:
```python
from cassandra.cluster import Cluster


class CassandraConnection:
    """Holds a single Cluster/Session pair shared by every request."""

    def __init__(self):
        self.cluster = None
        self.session = None

    async def connect(self):
        # The driver calls below block, but this runs only once.
        self.cluster = Cluster(['localhost'])
        self.session = self.cluster.connect()

        # Create keyspace if not exists.
        # SimpleStrategy with replication_factor 1 only suits a single-node dev setup.
        self.session.execute("""
            CREATE KEYSPACE IF NOT EXISTS task_manager
            WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
        """)
        self.session.set_keyspace('task_manager')

        # Create tasks table; task_id is the partition key.
        self.session.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                task_id uuid,
                title text,
                description text,
                status text,
                due_date timestamp,
                created_at timestamp,
                updated_at timestamp,
                PRIMARY KEY (task_id)
            )
        """)
        return self.session

    async def disconnect(self):
        if self.cluster:
            self.cluster.shutdown()
            self.cluster = None
            self.session = None


db = CassandraConnection()


async def get_db():
    # FastAPI dependency (a plain async generator, not an asynccontextmanager):
    # connect lazily on first use, then hand every request the same session.
    if db.session is None:
        await db.connect()
    yield db.session
```
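The keyspace above uses SimpleStrategy with a replication factor of 1. That only makes sense for a single-node development container. For multi-node or multi-datacenter clusters, Cassandra's documentation recommends NetworkTopologyStrategy, which takes a replication factor per datacenter. Below is a minimal sketch of generating that statement. The helper name is hypothetical, and so are the datacenter names (`dc1`, `dc2`): they must match the names your cluster actually reports.

```python
def network_topology_keyspace_cql(keyspace: str, replication: dict) -> str:
    """Build a CREATE KEYSPACE statement using NetworkTopologyStrategy.

    `replication` maps datacenter name -> replication factor, e.g. {"dc1": 3}.
    """
    if not keyspace.isidentifier():
        raise ValueError(f"unsafe keyspace name: {keyspace!r}")
    if not replication or any(rf < 1 for rf in replication.values()):
        raise ValueError("need at least one datacenter with replication factor >= 1")
    if any("'" in dc for dc in replication):
        raise ValueError("datacenter names must not contain quotes")
    # Sort datacenters so the generated statement is deterministic.
    dcs = ", ".join(f"'{dc}': {rf}" for dc, rf in sorted(replication.items()))
    return (
        f"CREATE KEYSPACE IF NOT EXISTS {keyspace} "
        f"WITH replication = {{'class': 'NetworkTopologyStrategy', {dcs}}}"
    )


if __name__ == "__main__":
    # Hypothetical two-datacenter layout with three replicas in each.
    print(network_topology_keyspace_cql("task_manager", {"dc1": 3, "dc2": 3}))
```

You would pass the resulting string to `session.execute(...)` in place of the SimpleStrategy statement.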
Models
Define our data models in app/models.py:
```python
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID, uuid4

from pydantic import BaseModel, ConfigDict, Field


def utc_now() -> datetime:
    # Use timezone-aware UTC; the driver treats naive datetimes as UTC.
    return datetime.now(timezone.utc)


class TaskBase(BaseModel):
    title: str
    description: Optional[str] = None
    status: str = "pending"
    due_date: Optional[datetime] = None


class TaskCreate(TaskBase):
    pass


class Task(TaskBase):
    # Pydantic v2 configuration (replaces the v1-style `class Config`)
    model_config = ConfigDict(from_attributes=True)

    task_id: UUID = Field(default_factory=uuid4)
    created_at: datetime = Field(default_factory=utc_now)
    updated_at: datetime = Field(default_factory=utc_now)
```
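One subtlety with these timestamps: a CQL `timestamp` stores milliseconds since the Unix epoch, and cassandra-driver returns values as naive UTC datetimes. A `created_at` returned straight from the create handler can therefore carry microseconds that a later read will not. The sketch below uses a hypothetical, stdlib-only helper to normalize a datetime the same way before returning it.

```python
from datetime import datetime, timezone


def to_cassandra_precision(dt: datetime) -> datetime:
    """Mimic a CQL `timestamp` round-trip: UTC, naive, whole milliseconds."""
    if dt.tzinfo is not None:
        # Aware datetimes are converted to UTC, then stripped of tzinfo.
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    # Naive datetimes are assumed to already be UTC, as the driver assumes.
    return dt.replace(microsecond=dt.microsecond // 1000 * 1000)


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    print(now, "->", to_cassandra_precision(now))
```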
API Routes
Create our task routes in app/routes/tasks.py. The handlers are plain `def` functions because cassandra-driver's `execute` call blocks. FastAPI runs such handlers in a thread pool so they do not stall the event loop:
```python
from datetime import datetime, timezone
from typing import List
import uuid

from cassandra.cluster import Session
from fastapi import APIRouter, Depends, HTTPException

from ..database import get_db
from ..models import Task, TaskCreate

router = APIRouter()


def row_to_task(row) -> Task:
    # The driver's default row factory returns named tuples, not dicts.
    return Task(**row._asdict())


@router.post("/tasks/", response_model=Task)
def create_task(task: TaskCreate, db: Session = Depends(get_db)):
    task_id = uuid.uuid4()
    now = datetime.now(timezone.utc)
    query = """
        INSERT INTO tasks (task_id, title, description, status, due_date, created_at, updated_at)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
    """
    db.execute(query, (task_id, task.title, task.description, task.status,
                       task.due_date, now, now))
    return Task(task_id=task_id, created_at=now, updated_at=now, **task.model_dump())


@router.get("/tasks/{task_id}", response_model=Task)
def read_task(task_id: uuid.UUID, db: Session = Depends(get_db)):
    row = db.execute("SELECT * FROM tasks WHERE task_id = %s", (task_id,)).one()
    if row is None:
        raise HTTPException(status_code=404, detail="Task not found")
    return row_to_task(row)


@router.get("/tasks/", response_model=List[Task])
def list_tasks(db: Session = Depends(get_db)):
    # Full-table scan: fine for a demo, but avoid it on large tables.
    rows = db.execute("SELECT * FROM tasks")
    return [row_to_task(row) for row in rows]


@router.put("/tasks/{task_id}", response_model=Task)
def update_task(task_id: uuid.UUID, task: TaskCreate, db: Session = Depends(get_db)):
    # Check that the task exists: a Cassandra UPDATE is an upsert and would create it.
    existing = db.execute(
        "SELECT created_at FROM tasks WHERE task_id = %s", (task_id,)
    ).one()
    if existing is None:
        raise HTTPException(status_code=404, detail="Task not found")

    now = datetime.now(timezone.utc)
    update_query = """
        UPDATE tasks
        SET title = %s, description = %s, status = %s, due_date = %s, updated_at = %s
        WHERE task_id = %s
    """
    db.execute(update_query, (task.title, task.description, task.status,
                              task.due_date, now, task_id))
    return Task(task_id=task_id, created_at=existing.created_at, updated_at=now,
                **task.model_dump())


@router.delete("/tasks/{task_id}")
def delete_task(task_id: uuid.UUID, db: Session = Depends(get_db)):
    # Check if task exists
    existing = db.execute(
        "SELECT task_id FROM tasks WHERE task_id = %s", (task_id,)
    ).one()
    if existing is None:
        raise HTTPException(status_code=404, detail="Task not found")
    db.execute("DELETE FROM tasks WHERE task_id = %s", (task_id,))
    return {"message": "Task deleted successfully"}
```
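The PUT handler above replaces every field. You might later want PATCH-style partial updates, which are a hypothetical extension and not part of this tutorial's API. One approach builds the SET clause only from the fields the client sent. It still uses `%s` placeholders, so values are never interpolated into the CQL string. Below is a stdlib-only sketch; the function name and the column allow-list are assumptions.

```python
from typing import Any, Dict, Tuple

# Columns a client may change; task_id and created_at are deliberately excluded.
UPDATABLE_COLUMNS = ("title", "description", "status", "due_date")


def build_partial_update(task_id: Any, changes: Dict[str, Any],
                         updated_at: Any) -> Tuple[str, tuple]:
    """Return (cql, params) for an UPDATE touching only the given columns."""
    unknown = set(changes) - set(UPDATABLE_COLUMNS)
    if unknown:
        raise ValueError(f"cannot update columns: {sorted(unknown)}")
    if not changes:
        raise ValueError("no fields to update")
    # Keep a stable column order so the generated CQL is predictable.
    columns = [c for c in UPDATABLE_COLUMNS if c in changes]
    assignments = ", ".join(f"{c} = %s" for c in columns + ["updated_at"])
    cql = f"UPDATE tasks SET {assignments} WHERE task_id = %s"
    params = tuple(changes[c] for c in columns) + (updated_at, task_id)
    return cql, params
```

With a Pydantic v2 model, `changes` could come from `task.model_dump(exclude_unset=True)`, and you would run the result with `db.execute(cql, params)`. Because UPDATE is an upsert, you would still check existence first as the PUT handler does. Alternatively, append `IF EXISTS`, which makes the statement a lightweight transaction with extra round trips.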
Main Application
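Set up our FastAPI application in app/main.py. The lifespan handler opens the Cassandra session at startup and shuts the cluster down when the app stops:
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from .database import db
from .routes import tasks


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Connect once at startup; close the cluster on shutdown.
    await db.connect()
    yield
    await db.disconnect()


app = FastAPI(title="Task Manager API", lifespan=lifespan)
app.include_router(tasks.router, tags=["tasks"])


@app.get("/")
async def root():
    return {"message": "Welcome to Task Manager API"}
```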
Running the Application
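- Start Cassandra (the container can take a while to finish starting before it accepts connections on port 9042):
```bash
docker-compose up -d
```
- Install dependencies:
```bash
pip install -r requirements.txt
```
- Run the API from the task-manager/ directory:
```bash
uvicorn app.main:app --reload
```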
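`docker-compose up -d` returns before Cassandra has finished starting. Launching uvicorn immediately can therefore fail with a connection error from the driver. The stdlib-only sketch below polls until something accepts TCP connections on port 9042. An open port is a reasonable, though not perfect, signal that Cassandra's native protocol is ready. The timeout and polling interval are assumptions.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 120.0,
                  interval: float = 2.0) -> bool:
    """Poll until a TCP connection to host:port succeeds or `timeout` expires.

    Returns True once the port accepts a connection, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


if __name__ == "__main__":
    import sys

    ready = wait_for_port("localhost", 9042)
    print("Cassandra port is open" if ready else "Timed out waiting for Cassandra")
    sys.exit(0 if ready else 1)
```

Saved as a script (for example `wait_for_cassandra.py`, a hypothetical name), it can run between `docker-compose up -d` and `uvicorn`.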
Testing the API
Here are some example curl commands to test our API. Replace `{task_id}` with the `task_id` value returned when you create a task:
```bash
# Create a task
curl -X POST "http://localhost:8000/tasks/" \
  -H "Content-Type: application/json" \
  -d '{"title":"Complete project","description":"Finish the API implementation","status":"pending"}'

# List all tasks
curl "http://localhost:8000/tasks/"

# Get a specific task
curl "http://localhost:8000/tasks/{task_id}"

# Update a task
curl -X PUT "http://localhost:8000/tasks/{task_id}" \
  -H "Content-Type: application/json" \
  -d '{"title":"Complete project","description":"API implementation done","status":"completed"}'

# Delete a task
curl -X DELETE "http://localhost:8000/tasks/{task_id}"
```
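The same create-then-fetch round trip can be scripted with the standard library, which avoids copying `task_id` by hand between curl commands. Below is a minimal sketch using `urllib.request`. It assumes the API is running at `http://localhost:8000` and that responses have the shape returned by the handlers above. `create_and_fetch` is a hypothetical helper name.

```python
import json
import urllib.request
from typing import Optional

BASE_URL = "http://localhost:8000"  # assumed address of the running API


def _request(method: str, url: str, payload: Optional[dict] = None) -> dict:
    """Send a JSON request and decode the JSON response body."""
    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    req = urllib.request.Request(url, data=data, method=method,
                                 headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))


def create_and_fetch(base_url: str = BASE_URL) -> dict:
    """Create a task, read it back by id, and check that the responses agree."""
    created = _request("POST", f"{base_url}/tasks/",
                       {"title": "Complete project", "status": "pending"})
    task_id = created["task_id"]
    fetched = _request("GET", f"{base_url}/tasks/{task_id}")
    assert fetched["task_id"] == task_id
    assert fetched["title"] == created["title"]
    return fetched


if __name__ == "__main__":
    print(create_and_fetch())
```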
Performance Considerations
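When working with Cassandra, keep in mind:
- Data Modeling: Design tables around your query patterns. Cassandra has no joins, so it is common to denormalize data into one table per query.
- Partition Keys: Choose partition keys that spread data evenly across nodes and keep individual partitions bounded in size.
- Consistency Levels: Choose them per query (for example ONE, QUORUM, or LOCAL_QUORUM) to trade latency against consistency.
- Batch Operations: Use them sparingly, mainly to write related rows atomically (ideally within one partition). Multi-partition batches are not a bulk-loading speedup.
- Secondary Indexes: Use them carefully. A query on an indexed column without the partition key may have to contact many nodes.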
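Consider a hypothetical second table keyed by `status`, so that tasks can be listed per status. It shows why the partition key matters. With only a handful of distinct statuses, every task lands in one of a few partitions, and those partitions grow without bound. The stdlib-only sketch below counts rows per partition for two candidate keys and maps each partition onto a toy set of nodes. It uses MD5 and modulo purely for illustration. Cassandra's default partitioner is Murmur3Partitioner, and real placement uses token ranges, vnodes, and replication. The `in_progress` status is also an assumption.

```python
import hashlib
import uuid
from collections import Counter


def node_for(partition_key: str, num_nodes: int) -> int:
    """Toy placement: hash the partition key and pick a node by modulo."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_nodes


def partition_report(rows, key_fn, num_nodes=3):
    """Return (rows per partition, rows per node) for a candidate partition key."""
    per_partition = Counter(key_fn(row) for row in rows)
    per_node = Counter()
    for key, count in per_partition.items():
        per_node[node_for(key, num_nodes)] += count
    return per_partition, per_node


if __name__ == "__main__":
    statuses = ["pending", "in_progress", "completed"]
    rows = [{"task_id": str(uuid.uuid4()), "status": statuses[i % 3]}
            for i in range(9000)]

    by_status, nodes_status = partition_report(rows, lambda r: r["status"])
    by_id, nodes_id = partition_report(rows, lambda r: r["task_id"])

    print("status key :", len(by_status), "partitions, largest =",
          max(by_status.values()), "rows; per node:", dict(nodes_status))
    print("task_id key:", len(by_id), "partitions, largest =",
          max(by_id.values()), "rows; per node:", dict(nodes_id))
```

The `tasks` table in this tutorial is keyed by `task_id`, so it falls in the second, evenly spread case. A per-status lookup table would typically add a bucketing component (for example, a day) to its partition key.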
Conclusion
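This implementation demonstrates how to build a task management API on Apache Cassandra with FastAPI. Cassandra's horizontal scalability and FastAPI's performance make the stack a solid foundation for applications that need high availability. The single-node setup shown here (SimpleStrategy, replication factor 1, no authentication) is a development configuration, though. A production deployment would add more nodes, a higher replication factor, and consistency levels chosen for its workload.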
Future enhancements could include:
- Authentication and authorization
- More complex querying capabilities
- Batch operations
- Advanced task filtering
- Automated testing
The complete source code is available on GitHub: [Link to your repository]