Ever been worried about bots scraping your data, attackers brute-forcing logins, or your platform getting hit with a sudden spike in expensive operations? Without proper protection, a simple DDoS attack or bot script can cost you time, resources, and even thousands in third-party service fees (like SMS). Let me show you how to implement a thread-safe, high-performance rate limiter using Python, FastAPI, and Redis.
The Concept
Rate Limiting: Allow only X requests per Y seconds per user.
For example: 100 requests per 60 seconds
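Before reaching for Redis, the core idea fits in a few lines of plain Python. This is an illustrative, single-process sketch only (no shared state across servers, no concurrency safety — exactly the gaps Redis fills below); the class name and injected `now` clock are my own for demonstration:

```python
import time
from typing import Optional

class FixedWindowLimiter:
    """Single-process sketch of fixed-window rate limiting (illustration only)."""

    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window = window_seconds
        self.count = 0
        self.window_start = 0.0

    def allow(self, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        if now - self.window_start >= self.window:
            # The window expired: start a fresh one and reset the counter.
            self.window_start = now
            self.count = 0
        self.count += 1
        return self.count <= self.limit

limiter = FixedWindowLimiter(limit=3, window_seconds=60)
print([limiter.allow(now=0) for _ in range(4)])  # [True, True, True, False]
print(limiter.allow(now=61))                     # True: a new window began
```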
Why Redis?
- Fast: Stores data in memory, allowing near-instantaneous read/write operations — critical for low-latency APIs.
- Automatic Windowing: The `EXPIRE` command lets us define a "time window" (e.g., 60 seconds) after which the counter is automatically cleared, saving manual cleanup code.
- Atomicity (Thread-Safety): Redis lets us perform the check and increment in a single step using commands like `INCR`. This prevents race conditions in high-concurrency environments, ensuring your limit is never accidentally exceeded.
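The atomicity point can be demonstrated without Redis at all. The sketch below simulates a non-atomic read-then-write counter with plain Python threads (a barrier forces the worst-case interleaving), and contrasts it with a serialized increment — here a `threading.Lock` stands in for Redis's atomic `INCR`:

```python
import threading

N_THREADS = 8
counters = {"naive": 0, "atomic": 0}
lock = threading.Lock()
barrier = threading.Barrier(N_THREADS)

def naive_incr():
    # Read and write as separate steps: every thread reads the old value
    # before any thread writes, so all updates but one are lost.
    value = counters["naive"]
    barrier.wait()  # force the worst-case interleaving deterministically
    counters["naive"] = value + 1

def atomic_incr():
    with lock:  # stand-in for Redis's atomic INCR
        counters["atomic"] += 1

for target in (naive_incr, atomic_incr):
    threads = [threading.Thread(target=target) for _ in range(N_THREADS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print(counters)  # {'naive': 1, 'atomic': 8} -- the naive version lost 7 updates
```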
How It Works (The Atomic Solution)
Our implementation avoids the concurrency issues of a simple `GET → CHECK → INCR` pattern. Instead, we perform the increment and limit check atomically:
1. Atomic Increment (`r.incr`): The request immediately increments the counter, and we read the new value in a single, safe operation.
2. Set Expiration (`r.expire`): Only if the counter's new value is `1` (meaning a new window just started) do we set the 60-second expiration. This prevents the window from resetting on every subsequent request.
3. Limit Check: We compare the new counter value against our `RATE_LIMIT_COUNT` (100).
4. Block and Report: If the user is over the limit, we use `r.ttl` to tell them exactly how many seconds they need to wait, which is a great UX practice.
```python
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
import redis

app = FastAPI()

def get_redis():
    return redis.Redis(host='localhost', port=6379, decode_responses=True)

class DataResponse(BaseModel):
    message: str
    requests_left: int

RATE_LIMIT_COUNT = 100
RATE_LIMIT_WINDOW_SECONDS = 60

@app.get("/api/data", response_model=DataResponse)
def get_data(r: redis.Redis = Depends(get_redis)) -> DataResponse:
    user_id = "user_123"  # in a real app, derive this from auth or the client
    key = f"rate_limit:{user_id}"

    # Atomically increment the counter; r.incr() returns the new value.
    try:
        current_count = r.incr(key)
    except redis.exceptions.ConnectionError:
        raise HTTPException(status_code=503, detail="Rate limiting service unavailable.")

    # Set the key expiration (the time window) only on the first request,
    # so the window is not reset on every subsequent request.
    if current_count == 1:
        r.expire(key, RATE_LIMIT_WINDOW_SECONDS)

    if current_count > RATE_LIMIT_COUNT:
        ttl = r.ttl(key)
        raise HTTPException(
            status_code=429,
            detail=f"Too many requests! Wait {ttl} seconds.",
            headers={"Retry-After": str(ttl)},
        )

    requests_left = RATE_LIMIT_COUNT - current_count
    return DataResponse(message="Success!", requests_left=requests_left)
```
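Once the limit is hit, the client receives a 429 along with the `Retry-After` header. An illustrative exchange (the TTL value of 37 is made up — it depends on when in the window the request arrives):

```
GET /api/data HTTP/1.1

HTTP/1.1 429 Too Many Requests
Retry-After: 37
Content-Type: application/json

{"detail": "Too many requests! Wait 37 seconds."}
```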
Why This Pattern Works
- Atomic operations: `r.incr()` is atomic, preventing race conditions.
- Memory efficient: Redis automatically cleans up expired keys.
- Scalable: Works across multiple app servers since Redis is centralized.
- Simple: No complex algorithms, just increment and check.
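One practical note: the endpoint above hardcodes `user_id = "user_123"` for brevity, but the "per user" part of the limit comes from the key. A hypothetical helper for deriving the key per client (the `X-API-Key` header name and the IP fallback are my assumptions, not part of the code above — adapt them to your auth scheme):

```python
from typing import Optional

def rate_limit_key(api_key: Optional[str], client_ip: str) -> str:
    """Build the Redis key per client: prefer an API key, fall back to the IP."""
    ident = api_key if api_key else client_ip
    return f"rate_limit:{ident}"

# In the FastAPI handler it could be wired up roughly as (hypothetical):
#   api_key = request.headers.get("X-API-Key")
#   key = rate_limit_key(api_key, request.client.host)
print(rate_limit_key(None, "203.0.113.7"))  # rate_limit:203.0.113.7
```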
Conclusion
This simple pattern provides a powerful, high-performance defense layer for your applications. By leveraging Redis's atomic `INCR` operation, we've built a rate limiter that is both fast and thread-safe, which is crucial for modern web services.
Have you implemented rate limiting differently? Drop your approach in the comments!