DEV Community

Aris Georgatos
Aris Georgatos

Posted on

How to Build a Thread-Safe Rate Limiter with FastAPI and Atomic Redis

Ever been worried about bots scraping your data, attackers brute-forcing logins, or your platform getting hit with a sudden spike in expensive operations? Without proper protection, a simple DDoS attack or bot script can cost you time, resources, and even thousands in third-party service fees (like SMS). Let me show you how to implement a thread-safe, high-performance rate limiter using Python, FastAPI, and Redis.

The Concept

Rate Limiting: Allow only X requests per Y seconds per user.

For example: 100 requests per 60 seconds

Why Redis?

Fast: Stores data in memory, allowing for near-instantaneous read/write operations critical for low-latency APIs.

Automatic Windowing: The EXPIRE command lets us define a "time window" (e.g., 60 seconds) after which the counter is automatically cleared, saving manual cleanup code.

Atomicity (Thread-Safety): Redis allows us to perform the check and increment simultaneously using commands like INCR. This prevents race conditions in high-concurrency environments, ensuring your limit is never accidentally exceeded.

How It Works (The Atomic Solution)

Our implementation avoids the concurrency issues of a simple GET → CHECK → INCR pattern. Instead, we perform the increment and limit check atomically:

  1. Atomic Increment (r.incr): The request immediately increments the counter. We read the new value of the counter in a single, safe operation.

  2. Set Expiration (r.expire): Only if the counter's new value is 1 (meaning a new window just started), we set the 60-second expiration. This prevents the window from resetting on every subsequent request.

  3. Limit Check: We compare the new counter value against our RATE_LIMIT_COUNT (100).

  4. Block and Report: If the user is over the limit, we use r.ttl to tell the user exactly how many seconds they need to wait, which is a great UX practice.

from fastapi import FastAPI, HTTPException, Depends
import redis
from pydantic import BaseModel

app = FastAPI()

def get_redis():
    return redis.Redis(host='localhost', port=6379, decode_responses=True)

class DataResponse(BaseModel):
    message: str
    requests_left: int

RATE_LIMIT_COUNT = 100
RATE_LIMIT_WINDOW_SECONDS = 60

@app.get("/api/data", response_model=DataResponse)
def get_data(r: redis.Redis = Depends(get_redis)) -> DataResponse:
    user_id = "user_123"
    key = f"rate_limit:{user_id}"

    # individually increment the counter. r.incr() returns the new value
    try:
        current_count = r.incr(key)
    except redis.exceptions.ConnectionError:
        raise HTTPException(status_code=503, detail="Rate limiting service unavailable.")

    # set the key expiration aka the time window, only if it's the first request
    # this prevents resetting the window on every request
    if current_count == 1:
        r.expire(key, RATE_LIMIT_WINDOW_SECONDS)

    if current_count > RATE_LIMIT_COUNT:
        ttl = r.ttl(key)
        raise HTTPException(
            status_code=429, 
            detail=f"Too many requests! Wait {ttl} seconds.",
            headers={"Retry-After": str(ttl)}
        )

    requests_left = RATE_LIMIT_COUNT - current_count
    return DataResponse(message="Success!", requests_left=requests_left)

Enter fullscreen mode Exit fullscreen mode

Why This Pattern Works

Atomic operations: r.incr() is atomic, preventing race conditions

Memory efficient: Redis automatically cleans up expired keys

Scalable: Works across multiple app servers since Redis is centralized

Simple: No complex algorithms, just increment and check

Conclusion

This simple pattern provides a powerful, high-performance defense layer for your applications. By leveraging Redis's atomic INCR operation, we've built a rate limiter that is both fast and thread-safe-crucial for modern web services.


Have you implemented rate limiting differently? Drop your approach in the comments!

Top comments (0)