When we launched Seendr — a video chat application that connects strangers based on shared interests — we ran into a problem we never anticipated.
Not a bug. Not a crash. Slowness.
Every time a user triggered a match search, the server queried PostgreSQL: the user's profile, their interest list, users currently online, language filters, region filters — all in real time. For a single user, this took 800ms to 1.2 seconds. With 500 users simultaneously searching for a match, the server went down.
The solution? Redis. Two days of implementation. Result: matching time dropped to under 80ms, database load reduced by 90%, and the application now handles 5,000 simultaneous connections.
Our stack: Django for the REST API, Django Channels (via Daphne) for WebSockets, Celery for background tasks — and Redis sitting at the center of all three.
This article is what I wish I had read before starting.
## Why Redis Became the Core of Seendr
On Seendr, Redis plays three roles simultaneously:
- Application cache — profiles, search results, rankings
- Channel Layer for Django Channels — WebSocket synchronization across instances
- Celery Broker — message queue for async tasks (email sending, cleanup, notifications)
In our docker-compose.yml, Redis is configured like this:
```yaml
redis:
  image: redis:7-alpine
  command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru
  ports:
    - "6380:6379"
  healthcheck:
    test: ["CMD", "redis-cli", "ping"]
```
`allkeys-lru` automatically evicts the least recently used keys when Redis reaches 256MB — a sensible default eviction policy for an application cache.
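One caveat worth knowing: `maxmemory-policy` applies to the whole Redis instance, and here the same instance also backs the Celery broker — under memory pressure, `allkeys-lru` could in principle evict queued tasks. If that risk matters to you, the usual fix is a dedicated broker instance configured with `noeviction`. A sketch (service name and host port are illustrative, not Seendr's actual config):

```yaml
redis-broker:
  image: redis:7-alpine
  command: redis-server --appendonly yes --maxmemory-policy noeviction
  ports:
    - "6382:6379"
  healthcheck:
    test: ["CMD", "redis-cli", "ping"]
```

Point `CELERY_BROKER_URL` at this instance and keep `allkeys-lru` for the cache.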
## Why Caching Is Non-Negotiable in 2026
Teams are smaller. Seendr runs with 3 developers where an equivalent startup would have had 8 people in 2022. Fewer people to optimize queries, monitor performance, and scale infrastructure.
Caching is today the optimization with the best effort-to-impact ratio you can bring to a Django application. A well-placed Redis layer can:
- Reduce PostgreSQL load by 80 to 95%
- Cut your response times by 5 to 10x
- Significantly reduce your cloud bill — on pay-per-use compute, every millisecond saved is money
In Python/Django job postings in 2026, Redis appears in more than 30% of senior backend listings. It is no longer a "nice to have." It is a baseline skill for anyone building applications that hold up in production.
## Installing and Configuring Redis in Django

```bash
pip install django-redis redis channels channels-redis celery
```
### Configuration in `settings.py`

```python
# settings/base.py
REDIS_URL = env("REDIS_URL", default="redis://localhost:6379/0")

# ─── Cache ────────────────────────────────────────────────────────────────────
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": REDIS_URL,
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "CONNECTION_POOL_KWARGS": {"max_connections": 50},
            "COMPRESSOR": "django_redis.compressors.zlib.ZlibCompressor",
        },
        "KEY_PREFIX": "seendr",
        "TIMEOUT": 300,  # 5 minutes default
    }
}

# ─── Sessions via Redis ────────────────────────────────────────────────────────
SESSION_ENGINE = "django.contrib.sessions.backends.cache"
SESSION_CACHE_ALIAS = "default"

# ─── Django Channels (WebSockets) ─────────────────────────────────────────────
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [REDIS_URL],
            "capacity": 1500,
            "expiry": 10,
        },
    }
}

# ─── Celery (async tasks) ─────────────────────────────────────────────────────
CELERY_BROKER_URL = REDIS_URL
CELERY_RESULT_BACKEND = REDIS_URL
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
```
One settings file, and Django uses Redis for caching, sessions, WebSockets, and Celery task queuing.
## The Seendr Case: Real-Time Matching Without PostgreSQL
The original problem: every match attempt caused Django to query PostgreSQL to find available users. In real time. For every user. That is exactly what Redis is built to eliminate.
### Step 1 — Maintain the Matching Pool in Redis

Instead of running `SELECT * FROM users WHERE status = 'available'` on every match request, we keep a live list of available users directly in Redis, updated in real time.
```python
# matching/services.py
import json
import time

from django_redis import get_redis_connection

redis_client = get_redis_connection("default")


def add_user_to_pool(user):
    """Add a user to the matching pool."""
    key = f"matching:pool:{user.id}"
    payload = {
        "user_id": str(user.id),
        "interests": list(user.interests.values_list("slug", flat=True)),
        "language": user.language,
        "region": user.region,
        "joined_at": int(time.time()),
    }
    # Store the full profile as a Redis hash
    redis_client.hset(
        key,
        mapping={
            k: json.dumps(v) if isinstance(v, list) else v
            for k, v in payload.items()
        },
    )
    # TTL of 2 minutes — expires automatically if the user disconnects without notice
    redis_client.expire(key, 120)
    # Index by language for fast filtering
    redis_client.sadd(f"matching:lang:{user.language}", str(user.id))
    redis_client.expire(f"matching:lang:{user.language}", 300)


def remove_user_from_pool(user_id, language):
    """Remove a user from the matching pool."""
    redis_client.delete(f"matching:pool:{user_id}")
    redis_client.srem(f"matching:lang:{language}", str(user_id))


def find_match(seeker_id, interests, language):
    """Find the best match in Redis — without touching PostgreSQL."""
    candidates = redis_client.smembers(f"matching:lang:{language}")
    best_match = None
    best_score = -1
    for candidate_id in candidates:
        candidate_id = candidate_id.decode()
        if candidate_id == str(seeker_id):
            continue
        data = redis_client.hgetall(f"matching:pool:{candidate_id}")
        if not data:
            continue
        candidate_interests = json.loads(data.get(b"interests", b"[]"))
        score = len(set(interests) & set(candidate_interests))
        if score > best_score:
            best_score = score
            best_match = candidate_id
    return best_match
```
Result: across 500 available users, this search executes in under 10ms. Before, the same logic with PostgreSQL took 800ms.
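`find_match` issues one HGETALL per candidate, so network round-trips grow linearly with the pool. If that ever becomes the bottleneck, a Redis pipeline batches all the HGETALLs into a single round-trip. A sketch under the same key layout as above — `find_match_pipelined` and `best_by_overlap` are illustrative names, not Seendr's production code:

```python
import json


def best_by_overlap(candidates, interests):
    """Pick the candidate whose interests overlap most with `interests`.

    `candidates` maps candidate_id -> raw Redis hash (bytes keys/values),
    exactly what HGETALL returns for the matching:pool:* hashes.
    """
    best_match, best_score = None, -1
    for candidate_id, data in candidates.items():
        if not data:
            continue  # hash expired between SMEMBERS and HGETALL
        candidate_interests = json.loads(data.get(b"interests", b"[]"))
        score = len(set(interests) & set(candidate_interests))
        if score > best_score:
            best_score, best_match = score, candidate_id
    return best_match


def find_match_pipelined(redis_client, seeker_id, interests, language):
    """Same scoring as find_match, but one round-trip for all HGETALLs."""
    candidates = [
        c.decode()
        for c in redis_client.smembers(f"matching:lang:{language}")
        if c.decode() != str(seeker_id)
    ]
    # Queue every HGETALL, then execute the whole batch at once
    pipe = redis_client.pipeline()
    for candidate_id in candidates:
        pipe.hgetall(f"matching:pool:{candidate_id}")
    results = pipe.execute()
    return best_by_overlap(dict(zip(candidates, results)), interests)
```

The scoring is deliberately split out as a pure function, so it can be unit-tested without a Redis server.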
## Core Django Caching Patterns with django-redis

### Cache-Aside with `@cache_page` and `cache.get`/`cache.set`
Django provides several levels of caching. For full views:
```python
# views/profiles.py
from django.views.decorators.cache import cache_page
from django.utils.decorators import method_decorator
from rest_framework.viewsets import ReadOnlyModelViewSet


@method_decorator(cache_page(60 * 60), name="dispatch")  # 1 hour
class PublicProfileViewSet(ReadOnlyModelViewSet):
    """Public profiles rarely change — cache aggressively."""

    queryset = User.objects.select_related("profile").prefetch_related("interests")
    serializer_class = PublicProfileSerializer
```
For finer-grained control:
```python
# services/user_service.py
from django.core.cache import cache


def get_user_profile(user_id: str) -> dict | None:
    cache_key = f"profile:{user_id}"
    cached = cache.get(cache_key)
    if cached is not None:
        return cached
    try:
        user = (
            User.objects
            .select_related("profile")
            .prefetch_related("interests")
            .get(id=user_id)
        )
        data = UserProfileSerializer(user).data
        cache.set(cache_key, data, timeout=3600)  # 1 hour
        return data
    except User.DoesNotExist:
        return None


def invalidate_user_profile(user_id: str):
    """Invalidate the cache after a profile update."""
    cache.delete(f"profile:{user_id}")
```
### A Custom `@cache_result` Decorator
```python
# utils/cache.py
import functools
import hashlib
import json

from django.core.cache import cache


def cache_result(timeout=300, prefix=""):
    """Decorator to cache a function's return value."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key_data = json.dumps(
                {"args": args, "kwargs": kwargs}, default=str, sort_keys=True
            )
            cache_key = f"{prefix or func.__name__}:{hashlib.md5(key_data.encode()).hexdigest()}"
            result = cache.get(cache_key)
            if result is not None:
                return result
            result = func(*args, **kwargs)
            if result is not None:
                cache.set(cache_key, result, timeout=timeout)
            return result
        return wrapper
    return decorator


# Usage
from django.db.models import Count

@cache_result(timeout=300, prefix="trending_interests")
def get_trending_interests(limit=10):
    return list(
        InterestMatch.objects
        .values("interest__slug")
        .annotate(count=Count("id"))
        .order_by("-count")[:limit]
    )
```
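The key construction is the subtle part: `sort_keys=True` plus `default=str` keeps the hash stable regardless of keyword-argument order or non-JSON-serializable arguments. Extracted as a standalone helper (illustrative, not part of the Seendr codebase), the property is easy to verify without Django:

```python
import hashlib
import json


def make_cache_key(prefix, *args, **kwargs):
    """Same key-building scheme as cache_result, isolated for illustration."""
    key_data = json.dumps({"args": args, "kwargs": kwargs}, default=str, sort_keys=True)
    return f"{prefix}:{hashlib.md5(key_data.encode()).hexdigest()}"
```

The same call with reordered keyword arguments must produce the same key, and different arguments must not collide.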
### Cache Warming with Celery Beat
Seendr receives 80% of its traffic between 8pm and 11pm. A scheduled Celery task refreshes the matching pool every five minutes, so the cache is already warm well before the evening peak hits.
```python
# tasks/cache_tasks.py
from datetime import timedelta

from celery import shared_task
from django.utils import timezone

from matching.services import add_user_to_pool
from users.models import User  # adjust to wherever your user model lives


@shared_task(name="warm_matching_cache")
def warm_matching_cache():
    """Pre-warm the matching pool before peak traffic."""
    recently_active = User.objects.filter(
        last_seen__gte=timezone.now() - timedelta(minutes=15),
        is_available=True,
    ).prefetch_related("interests").select_related("profile")
    count = 0
    for user in recently_active:
        add_user_to_pool(user)
        count += 1
    return f"Cache warmed: {count} users"
```
In settings.py:
```python
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "warm-matching-cache": {
        # Must match the explicit name passed to @shared_task(name=...)
        "task": "warm_matching_cache",
        "schedule": crontab(minute="*/5"),
    },
}
```
## Django Channels + Redis: WebSockets at Scale
This is where Redis becomes truly indispensable. Django Channels uses Redis as a channel layer to synchronize WebSockets across multiple Daphne instances.
Without Redis, if user A is connected to instance 1 and user B to instance 2, they cannot communicate. With Redis, all instances share the same message bus.
```python
# consumers/matching_consumer.py
import json

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer

from matching.services import add_user_to_pool, find_match, remove_user_from_pool


class MatchingConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.user = self.scope["user"]
        if not self.user.is_authenticated:
            await self.close()
            return
        # Join personal group to receive notifications
        self.personal_group = f"user_{self.user.id}"
        await self.channel_layer.group_add(self.personal_group, self.channel_name)
        # Add to the matching pool in Redis
        await database_sync_to_async(add_user_to_pool)(self.user)
        await self.accept()

    async def disconnect(self, close_code):
        # connect() may have bailed out before personal_group was set
        if not hasattr(self, "personal_group"):
            return
        await self.channel_layer.group_discard(self.personal_group, self.channel_name)
        await database_sync_to_async(remove_user_from_pool)(
            self.user.id, self.user.language
        )

    async def receive(self, text_data):
        data = json.loads(text_data)
        if data.get("action") == "find_match":
            interests = data.get("interests", [])
            match_id = await database_sync_to_async(find_match)(
                self.user.id, interests, self.user.language
            )
            if match_id:
                room_id = (
                    f"room_{min(str(self.user.id), match_id)}"
                    f"_{max(str(self.user.id), match_id)}"
                )
                # Notify both users via the Redis Channel Layer
                for uid in [str(self.user.id), match_id]:
                    await self.channel_layer.group_send(
                        f"user_{uid}",
                        {
                            "type": "match_found",
                            "room_id": room_id,
                            "partner_id": (
                                match_id if uid == str(self.user.id)
                                else str(self.user.id)
                            ),
                        },
                    )
            else:
                await self.send(json.dumps({"type": "no_match_found"}))

    # Handler called when a "match_found" message arrives via Redis
    async def match_found(self, event):
        await self.send(json.dumps({
            "type": "match_found",
            "room_id": event["room_id"],
            "partner_id": event["partner_id"],
        }))
```
`group_send` publishes through the Redis Channel Layer. All Daphne instances receive the message and deliver it to their connected WebSockets. This is the native Django Channels replacement for manual Redis Pub/Sub.
## Rate Limiting with Redis
On Seendr, a user cannot trigger more than 10 matching attempts per minute. We implement this with a Django middleware and direct Redis access.
```python
# middleware/rate_limit.py
from django.http import JsonResponse
from django_redis import get_redis_connection


class RateLimitMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response
        self.redis = get_redis_connection("default")

    def __call__(self, request):
        if request.path.startswith("/api/matching/"):
            result = self._check_rate_limit(request)
            if result:
                return result
        return self.get_response(request)

    def _check_rate_limit(self, request, max_requests=10, window_seconds=60):
        user_id = (
            request.user.id
            if request.user.is_authenticated
            else request.META.get("REMOTE_ADDR")
        )
        key = f"ratelimit:{user_id}:{request.path}"
        current = self.redis.incr(key)
        if current == 1:
            self.redis.expire(key, window_seconds)
        if current > max_requests:
            return JsonResponse(
                {"error": "Too many requests", "retry_after": self.redis.ttl(key)},
                status=429,
                headers={"X-RateLimit-Remaining": "0"},
            )
        return None
```
Or with django-ratelimit, which integrates directly with the Redis cache backend:
```python
# views/matching.py
from django.utils.decorators import method_decorator
from django_ratelimit.decorators import ratelimit
from rest_framework.views import APIView


@method_decorator(
    ratelimit(key="user", rate="10/m", method="POST", block=True), name="post"
)
class MatchingView(APIView):
    def post(self, request):
        # ...
```
`incr` is atomic: even with 200 simultaneous requests, each one receives a distinct counter value, so no request slips past the limit through a racing read. The one gap worth knowing about is between `incr` and `expire` — if the process dies between the two calls, the key never expires; wrap them in a pipeline if that edge case worries you.
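A separate subtlety of the fixed-window counter: at a window boundary a user can burst up to twice the limit (10 requests at the end of one window, 10 more at the start of the next). A sorted-set sliding window closes that gap. A sketch — `check_sliding_window` is my own illustration, not Seendr's middleware; the pure helper isolates the window logic so it can be tested without Redis:

```python
import time
import uuid


def allowed_in_window(timestamps, now, max_requests=10, window_seconds=60):
    """Pure sliding-window check: count only timestamps inside the window."""
    recent = [t for t in timestamps if t > now - window_seconds]
    return len(recent) < max_requests


def check_sliding_window(redis_client, user_id, path, max_requests=10, window_seconds=60):
    """Sorted-set variant: one member per request, scored by its timestamp."""
    key = f"ratelimit:sw:{user_id}:{path}"
    now = time.time()
    pipe = redis_client.pipeline()
    pipe.zremrangebyscore(key, 0, now - window_seconds)  # drop entries outside the window
    pipe.zcard(key)                                      # count what remains
    _, current = pipe.execute()
    if current >= max_requests:
        return False
    pipe = redis_client.pipeline()
    pipe.zadd(key, {uuid.uuid4().hex: now})  # unique member, timestamp as score
    pipe.expire(key, window_seconds)
    pipe.execute()
    return True
```

The trade-off is memory: one sorted-set entry per request instead of one integer per user, which is why the simple counter is often good enough.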
## Redis Sorted Sets — Real-Time Trending
On Seendr, we display the most popular interests of the moment. Redis sorted sets maintain this ranking automatically, with no SQL query required.
```python
# services/trending.py
from celery import shared_task
from django_redis import get_redis_connection

redis_client = get_redis_connection("default")


def track_interest_match(interest_slug: str):
    """Increment an interest's score on every successful match."""
    redis_client.zincrby("trending:interests:daily", 1, interest_slug)


def get_trending_interests(limit: int = 10) -> list[dict]:
    """Get the top N interests with their scores."""
    results = redis_client.zrevrange(
        "trending:interests:daily", 0, limit - 1, withscores=True
    )
    return [{"slug": slug.decode(), "score": int(score)} for slug, score in results]


def get_interest_rank(interest_slug: str) -> int | None:
    """Get the rank of a given interest in the leaderboard."""
    rank = redis_client.zrevrank("trending:interests:daily", interest_slug)
    return rank + 1 if rank is not None else None


# Reset the daily leaderboard at midnight
@shared_task(name="reset_daily_trending")
def reset_daily_trending():
    redis_client = get_redis_connection("default")
    redis_client.delete("trending:interests:daily")
```
`zincrby` is atomic. 5,000 users can increment the same interest simultaneously without conflict.
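One trade-off with deleting the key at midnight: yesterday's data is gone. An alternative worth sketching is one sorted set per calendar day, merged on read with `ZUNIONSTORE`, so past days survive for analytics. Key names and the rolling-week retention below are my own suggestion, not Seendr's actual scheme:

```python
from datetime import date, timedelta


def daily_key(day=None):
    """One sorted set per calendar day, e.g. trending:interests:2026-05-01."""
    return f"trending:interests:{(day or date.today()).isoformat()}"


def track(redis_client, slug):
    """Increment today's bucket; keep each daily bucket for a rolling week."""
    key = daily_key()
    redis_client.zincrby(key, 1, slug)
    redis_client.expire(key, 60 * 60 * 24 * 7)


def trending_last_days(redis_client, days=3, limit=10):
    """Merge the last N daily sets into one ranking with ZUNIONSTORE."""
    keys = [daily_key(date.today() - timedelta(days=i)) for i in range(days)]
    dest = "trending:interests:merged"
    redis_client.zunionstore(dest, keys)
    redis_client.expire(dest, 300)  # cache the merged view briefly
    return redis_client.zrevrange(dest, 0, limit - 1, withscores=True)
```

With this layout, the midnight reset task disappears entirely — expiry does the cleanup.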
## Common Mistakes to Avoid

### 1. `keys('*')` in Production — a Real Danger

`KEYS` is a blocking command. On 100,000 keys, it freezes Redis for several seconds and impacts every active user. Always use `SCAN`.
```python
from django_redis import get_redis_connection

redis_client = get_redis_connection("default")

# DANGEROUS in production — KEYS blocks Redis for the full scan
keys = redis_client.keys("matching:pool:*")


# Correct approach — SCAN iterates in small, non-blocking batches
def scan_keys(pattern: str) -> list[str]:
    keys = []
    cursor = 0
    while True:
        cursor, found = redis_client.scan(cursor, match=pattern, count=100)
        keys.extend(k.decode() for k in found)
        if cursor == 0:
            break
    return keys
```
### 2. Cache Stampede — The Django Mutex Pattern
When a popular key expires, hundreds of requests hit the database simultaneously. The solution: a Redis-based mutex.
```python
# utils/cache.py
import time

from django.core.cache import cache


def get_or_set_with_lock(cache_key: str, fetch_fn, timeout=300, lock_timeout=10):
    """Mutex pattern to prevent cache stampede."""
    cached = cache.get(cache_key)
    if cached is not None:
        return cached
    lock_key = f"lock:{cache_key}"
    # cache.add = set only if the key does not exist (atomic)
    acquired = cache.add(lock_key, "1", lock_timeout)
    if acquired:
        try:
            result = fetch_fn()
            if result is not None:
                cache.set(cache_key, result, timeout)
            return result
        finally:
            cache.delete(lock_key)
    else:
        # Another worker holds the lock — wait briefly, then retry
        time.sleep(0.1)
        return get_or_set_with_lock(cache_key, fetch_fn, timeout, lock_timeout)
```
### 3. Automatic Cache Invalidation with Django Signals
```python
# signals.py
from django.core.cache import cache
from django.db.models.signals import post_save
from django.dispatch import receiver


@receiver(post_save, sender=User)
def invalidate_user_cache(sender, instance, **kwargs):
    cache.delete_many([
        f"profile:{instance.id}",
        f"user:{instance.id}:state",
    ])


@receiver(post_save, sender=Interest)
def invalidate_trending_cache(sender, **kwargs):
    cache.delete_pattern("trending:interests:*")  # django-redis only
```
### 4. Calibrate TTLs to Data Sensitivity
| Data type | Recommended TTL |
|---|---|
| Public user profile | 1 hour |
| Available users in matching pool | 2 minutes (auto-expires) |
| Online status | 30 seconds |
| Interest leaderboard | 5 minutes |
| Paginated search results | 3 minutes |
| Static content (FAQ, Terms) | 24 hours |
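To keep these numbers from quietly drifting apart across the codebase, it can help to centralize them as named constants — the module and names below are a suggestion, not Seendr's actual code:

```python
# utils/ttl.py — single source of truth for cache lifetimes (in seconds)
class TTL:
    PROFILE = 60 * 60             # public user profile: 1 hour
    MATCHING_POOL = 2 * 60        # auto-expires if the user vanishes
    ONLINE_STATUS = 30
    LEADERBOARD = 5 * 60
    SEARCH_RESULTS = 3 * 60
    STATIC_CONTENT = 24 * 60 * 60  # FAQ, Terms
```

Call sites then read `cache.set(key, data, timeout=TTL.PROFILE)`, and a TTL change is a one-line diff.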
## Monitoring Redis from Django
```python
# views/health.py
from django_redis import get_redis_connection
from rest_framework.response import Response
from rest_framework.views import APIView


class RedisHealthView(APIView):
    permission_classes = []

    def get(self, request):
        redis_client = get_redis_connection("default")
        stats = redis_client.info("stats")
        hits = stats.get("keyspace_hits", 0)
        misses = stats.get("keyspace_misses", 0)
        total = hits + misses
        hit_rate = round((hits / total) * 100, 1) if total > 0 else 0
        memory = redis_client.info("memory")
        return Response({
            "status": "healthy" if hit_rate > 80 else "warning",
            "hit_rate": f"{hit_rate}%",
            "memory_used": memory.get("used_memory_human"),
            "connected_clients": redis_client.info("clients").get("connected_clients"),
        })
```
A healthy cache hit rate is above 90%. Below 80%, your TTLs are probably too short or your cache keys too specific.
## What Redis Changed on Seendr — Real Numbers
| Metric | Before Redis | After Redis |
|---|---|---|
| Average matching time | 800ms | 68ms |
| Max simultaneous connections | ~400 | 5,000+ |
| PostgreSQL queries/min | 12,000 | 1,100 |
| Cache hit rate | — | 94% |
| Monthly infrastructure cost | €85 | €42 |
This is not just about speed. It is a 50% reduction in infrastructure costs, because the database is under far less pressure.
## Where to Start This Week
Day 1 — Add `django-redis` and configure `CACHES`. Decorate your most-called view with `@cache_page`. Measure before and after with Django Debug Toolbar.
Day 2 — Migrate your sessions to Redis (`SESSION_ENGINE = "django.contrib.sessions.backends.cache"`; or `cached_db` if you want a database fallback for session durability).
Week 1 — Add rate limiting to your sensitive endpoints. Configure the Channel Layer for Django Channels.
Week 2 — Move slow tasks (email sending, image processing) to Celery with Redis as the broker.
The best Redis implementations I have seen all evolved incrementally. Start small. Measure. Iterate.
## FAQ
### django-redis or Django's native Redis backend?

`django-redis` in 2026. It exposes the raw Redis client via `get_redis_connection()`, supports `delete_pattern()`, and provides far more configuration options. The native backend is too limited for anything beyond basic caching.
### Can Redis replace PostgreSQL?
No. Redis is for data that is acceptable to lose, or data that requires microsecond access. For transactions and data integrity, PostgreSQL remains non-negotiable. On Seendr, both coexist.
### How much does Redis cost in production?
Between €0 (Redis Cloud free tier, 30MB) and €15/month (AWS ElastiCache, minimum instance). For variable traffic, Upstash pay-per-request is often the most economical starting point.
### Do I need Redis if I already use Django Channels?

In production, effectively yes. Channels can run on its in-memory layer in a single process during development, but synchronizing WebSockets across multiple instances requires a shared channel layer — and Redis is the reference backend.
### My hit rate is at 65%. What should I do?
Increase your TTLs if your data changes infrequently. Make sure your cache keys are not too specific. Use Django Debug Toolbar to identify the views with the most cache misses and prioritize them.
Diderot Sielinou — Full-stack Python/Django developer, creator of Seendr.
If this article helped you, share it with a developer building a real-time Django application.