Performance is everything in modern web applications. Users expect fast responses, instant feedback, and seamless experiences even when your backend is running heavy workloads.
So what happens when your app needs to handle tasks that can't complete in milliseconds? Think of:
- Generating AI-powered summaries
- Processing large CSV files for bulk imports
- Sending thousands of emails or notifications
- Running complex image or video processing
If you try to handle these in the request/response cycle, your app slows down or, worse, times out.
This is where background processing with Django and Celery comes in. In this article, we'll break down how to build a production-ready async backend that can handle heavy lifting without blocking user requests.
Why Background Processing Matters
Imagine this scenario: your user uploads a 100MB CSV with 50,000 records, and you try to process it in the same request.
- The request takes minutes to complete
- Your server thread is blocked
- The user gets frustrated or abandons the app
Instead, with background processing:
- The request is accepted quickly
- The task is queued in the background
- The user gets an instant response (202 Accepted – Your job is being processed)
- The heavy lifting happens elsewhere (workers)
- The user can poll for results or receive notifications
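In Django terms, that acceptance step is just a view that enqueues a task and returns immediately. Here's a minimal sketch of the pattern; the `import_csv` task and `save_upload` helper are hypothetical names for illustration:

# views.py -- accept-and-enqueue sketch; `import_csv` and
# `save_upload` are hypothetical
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

from .tasks import import_csv


class BulkImportView(APIView):
    def post(self, request):
        upload = request.FILES['file']
        path = save_upload(upload)  # hypothetical helper that persists the file
        task = import_csv.delay(path)  # enqueue; returns immediately
        return Response(
            {'task_id': task.id, 'detail': 'Your job is being processed'},
            status=status.HTTP_202_ACCEPTED,
        )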
The Architecture
Here's a simplified flow:
Client → Django View → Celery Task Queue → Worker → Result Backend → Client (WebSocket)
- Client → Django View: User submits a request (e.g., upload a file, request AI summary)
- Django View → Task Queue: The request is validated and pushed to Celery via a broker like Redis
- Worker: Celery workers pick up tasks from the queue and process them asynchronously
- Result Backend: Task results and status are stored in Redis or your database
- Client: Receives real-time updates via WebSocket (no polling needed!)
Project Setup & Configuration
Installing Dependencies
pip install celery redis channels channels-redis djangorestframework
pip install google-generativeai PyPDF2 pdfplumber python-docx
Celery Configuration
The key to a robust Celery setup is proper configuration with rate limiting, time limits, and periodic tasks:
# core/celery.py
import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')

app = Celery('ai_document_processor')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

# Periodic tasks for maintenance
app.conf.beat_schedule = {
    'cleanup-failed-jobs': {
        'task': 'documents.tasks.cleanup_failed_jobs',
        'schedule': 3600.0,  # Every hour
    },
}

# Task execution limits and rate control
app.conf.task_annotations = {
    'documents.tasks.process_document': {
        'rate_limit': '10/m',      # 10 tasks per minute
        'time_limit': 1800,        # 30 minutes max
        'soft_time_limit': 1500,   # Warning at 25 minutes
    },
}
Key Configuration Highlights:
- Rate Limiting: Prevents overwhelming external APIs
- Time Limits: Prevents runaway tasks from consuming resources
- Periodic Tasks: Automated cleanup via Celery Beat
- Auto-discovery: Finds tasks across all Django apps
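One small piece glues this together: the Celery app must be imported when Django starts so that `@shared_task` decorators bind to it. This is the standard pattern from the Celery docs, assuming the project package is `core`:

# core/__init__.py
# Load the Celery app on Django startup so @shared_task binds to it
from .celery import app as celery_app

__all__ = ('celery_app',)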
Django Settings
# settings.py
from decouple import config  # assuming python-decouple for env-based settings

REDIS_URL = config('REDIS_URL', default='redis://localhost:6379/0')

# Celery
CELERY_BROKER_URL = REDIS_URL
CELERY_RESULT_BACKEND = REDIS_URL
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'

# Channels for WebSockets ('channels' must also be in INSTALLED_APPS,
# and ASGI_APPLICATION must point at your ASGI application)
ASGI_APPLICATION = 'core.asgi.application'
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {'hosts': [REDIS_URL]},
    },
}

# AI Configuration
GEMINI_API_KEY = config('GEMINI_API_KEY')
GEMINI_MODEL = 'gemini-pro'
Data Models
We use four key models to track the entire processing lifecycle:
- Document: Stores file metadata and processing status
- ProcessingJob: Tracks Celery task execution with progress updates
- DocumentAnalysis: Stores AI-generated results (summary, sentiment, topics, etc.)
- ProcessingLog: Detailed logs for debugging
Each model uses UUIDs for better security and includes timestamps, status fields, and relationships that ensure data integrity through OneToOne and ForeignKey constraints.
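To make that concrete, here's a trimmed sketch of what these models might look like. Field names are illustrative, not the exact schema from the repo, and ProcessingLog is omitted for brevity:

# documents/models.py -- illustrative sketch, not the exact schema
import uuid

from django.conf import settings
from django.db import models


class Document(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    file = models.FileField(upload_to='documents/')
    status = models.CharField(max_length=20, default='uploaded')
    created_at = models.DateTimeField(auto_now_add=True)


class ProcessingJob(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    document = models.OneToOneField(
        Document, on_delete=models.CASCADE, related_name='processing_job'
    )
    celery_task_id = models.CharField(max_length=255)
    status = models.CharField(max_length=20, default='pending')
    progress = models.PositiveSmallIntegerField(default=0)


class DocumentAnalysis(models.Model):
    document = models.OneToOneField(Document, on_delete=models.CASCADE)
    summary = models.TextField(blank=True)
    key_points = models.TextField(blank=True)
    sentiment = models.TextField(blank=True)
    topics = models.TextField(blank=True)
    # plus per-type *_completed flags, token counts, etc. (omitted here)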
The Processing Pipeline
Main Task Orchestration
Here's the heart of the system - the main Celery task that coordinates everything:
# documents/tasks.py
from celery import shared_task

from .models import Document, ProcessingJob


@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def process_document(self, document_id: str) -> dict:
    """Process a document through the complete pipeline."""
    try:
        document = Document.objects.get(id=document_id)

        # Create processing job
        processing_job = ProcessingJob.objects.create(
            document=document,
            celery_task_id=self.request.id,
            status='started'
        )

        # Step 1: Extract text (20% progress)
        self.update_state(state='PROGRESS', meta={'progress': 20})
        extracted_text = extract_text_from_document(document)
        send_websocket_update(document_id, 'processing_update', {
            'progress': 20,
            'message': 'Text extraction completed'
        })

        # Step 2: AI analysis (50% -> 90% progress)
        self.update_state(state='PROGRESS', meta={'progress': 50})
        analysis_results = analyze_document_with_gemini(document, extracted_text)
        send_websocket_update(document_id, 'processing_update', {
            'progress': 90,
            'message': 'AI analysis completed'
        })

        # Step 3: Finalize (100% progress)
        document.status = 'completed'
        document.save()
        processing_job.status = 'completed'
        processing_job.save()
        send_websocket_update(document_id, 'processing_complete', {
            'status': 'completed'
        })
        return {'status': 'success', 'document_id': document_id}

    except Exception as exc:
        # Retry transient failures with exponential backoff
        if self.request.retries < self.max_retries:
            raise self.retry(countdown=60 * (2 ** self.request.retries))
        raise exc
Key Features:
- Progress Tracking: Updates state at each step (20%, 50%, 100%)
- WebSocket Updates: Real-time notifications to connected clients
- Retry Logic: Exponential backoff for transient failures
- Error Handling: Comprehensive exception management
Sending Real-Time Updates
The critical link between Celery workers and WebSocket clients:
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.utils import timezone


def send_websocket_update(document_id: str, event_type: str, data: dict):
    """Send updates to WebSocket clients subscribed to this document."""
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        f'document_{document_id}',
        {
            'type': event_type,  # maps to a consumer handler method
            'document_id': document_id,
            'timestamp': timezone.now().isoformat(),
            **data
        }
    )
This bridges the gap between synchronous Celery tasks and asynchronous WebSocket connections, enabling real-time updates.
Text Extraction with Fallbacks
import pdfplumber


def extract_text_from_pdf(file_path: str) -> str:
    """Extract text with a fallback strategy."""
    try:
        # Primary: pdfplumber (more reliable)
        with pdfplumber.open(file_path) as pdf:
            # extract_text() can return None for image-only pages
            return '\n'.join(page.extract_text() or '' for page in pdf.pages)
    except Exception:
        # Fallback: PyPDF2
        return extract_with_pypdf2(file_path)
Multiple extraction methods ensure robustness - if one library fails, we automatically try another.
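The fallback itself isn't shown above; a plausible sketch using PyPDF2's `PdfReader` API (PyPDF2 3.x) might look like this:

from PyPDF2 import PdfReader


def extract_with_pypdf2(file_path: str) -> str:
    """Fallback extractor -- a sketch using PyPDF2's PdfReader API."""
    reader = PdfReader(file_path)
    # extract_text() may return None for pages without a text layer
    return '\n'.join(page.extract_text() or '' for page in reader.pages)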
AI Analysis
import time

import google.generativeai as genai
from django.conf import settings

from .models import DocumentAnalysis


def analyze_document_with_gemini(document, text_content: str) -> DocumentAnalysis:
    """Run multiple AI analyses and store the results."""
    genai.configure(api_key=settings.GEMINI_API_KEY)
    model = genai.GenerativeModel(settings.GEMINI_MODEL)
    analysis = DocumentAnalysis.objects.create(document=document)

    # Run different analysis types
    analysis_types = {
        'summary': 'Provide a 2-3 sentence summary',
        'key_points': 'Extract key points as JSON list',
        'sentiment': 'Analyze sentiment: positive/negative/neutral',
        'topics': 'Identify main topics as JSON list',
    }
    for analysis_type, prompt in analysis_types.items():
        response = model.generate_content(f"{prompt}\n\n{text_content[:4000]}")
        # Store the result and mark this analysis as done
        setattr(analysis, analysis_type, response.text)
        setattr(analysis, f'{analysis_type}_completed', True)
        time.sleep(1)  # Respect API rate limits

    analysis.save()
    return analysis
Best Practices:
- Rate limiting with sleep between calls
- Token tracking for cost management
- Flexible JSON parsing with fallbacks (sketched below)
- Error isolation - one failure doesn't break others
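That JSON-parsing point deserves a concrete shape: LLMs don't always return clean JSON, so a tolerant parser helps. This is a hedged sketch, not code from the repo:

import json
import re


def parse_json_response(raw: str, default=None):
    """Best-effort JSON extraction from an LLM response (sketch).

    Models often wrap JSON in markdown fences or prose, so we try
    progressively looser strategies before giving up.
    """
    # 1. Happy path: the whole response is valid JSON
    try:
        return json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        pass

    # 2. Look for a JSON array/object embedded in the text
    match = re.search(r'\[.*\]|\{.*\}', raw, re.DOTALL)
    if match:
        try:
            return json.loads(match.group(0))
        except json.JSONDecodeError:
            pass

    # 3. Give up and return the caller's fallback
    return default if default is not None else []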
Real-Time Updates with WebSockets
Server-Side Consumer
import json

from channels.generic.websocket import AsyncWebsocketConsumer


class DocumentProcessingConsumer(AsyncWebsocketConsumer):
    """Real-time processing updates."""

    async def connect(self):
        self.document_id = self.scope['url_route']['kwargs']['document_id']
        self.room_group_name = f'document_{self.document_id}'

        # Verify user has access before joining the group
        if await self.check_document_access():
            await self.channel_layer.group_add(
                self.room_group_name,
                self.channel_name
            )
            await self.accept()
            await self.send_document_status()
        else:
            await self.close()

    async def disconnect(self, close_code):
        # Leave the group so we stop receiving broadcasts
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    async def processing_update(self, event):
        """Forward processing updates to the client"""
        await self.send(text_data=json.dumps({
            'type': 'processing_update',
            'progress': event['progress'],
            'message': event['message']
        }))
WebSocket Features:
- Authentication checks before accepting connections
- Room-based broadcasting for multiple clients
- Automatic status updates on connection
- Type-specific event handlers
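For the consumer to receive connections at all, Channels needs URL routing wired into the ASGI application. A minimal sketch, assuming the consumer lives in the `documents` app and the URL pattern matches the client code below:

# documents/routing.py
from django.urls import re_path

from .consumers import DocumentProcessingConsumer

websocket_urlpatterns = [
    re_path(
        r'ws/documents/(?P<document_id>[0-9a-f-]+)/$',
        DocumentProcessingConsumer.as_asgi(),
    ),
]

# core/asgi.py
import os

from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')
django_asgi_app = get_asgi_application()  # initialize Django before importing consumers

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

from documents.routing import websocket_urlpatterns

application = ProtocolTypeRouter({
    'http': django_asgi_app,
    'websocket': AuthMiddlewareStack(
        URLRouter(websocket_urlpatterns)
    ),
})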
Client-Side Integration
// Use wss:// instead of ws:// when the site is served over HTTPS
const socket = new WebSocket(`ws://${location.host}/ws/documents/${docId}/`);

socket.onmessage = function(e) {
    const data = JSON.parse(e.data);
    if (data.type === 'processing_update') {
        updateProgressBar(data.progress);
        showMessage(data.message);
    } else if (data.type === 'processing_complete') {
        showSuccess('Processing completed!');
    }
};
REST API Design
Document Processing Endpoints
# documents/views.py
from celery import current_app
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from .models import Document
from .tasks import process_document


class DocumentViewSet(viewsets.ModelViewSet):
    # queryset and serializer_class omitted for brevity
    permission_classes = [IsAuthenticated]

    def perform_create(self, serializer):
        """Upload and automatically start processing"""
        document = serializer.save(user=self.request.user)
        process_document.delay(str(document.id))

    @action(detail=True, methods=['post'])
    def reprocess(self, request, pk=None):
        """Restart failed processing"""
        document = self.get_object()
        document.status = 'uploaded'
        document.save()
        process_document.delay(str(document.id))
        return Response({'message': 'Reprocessing started'})

    @action(detail=True, methods=['post'])
    def cancel_processing(self, request, pk=None):
        """Cancel active processing"""
        job = self.get_object().processing_job
        current_app.control.revoke(job.celery_task_id, terminate=True)
        return Response({'message': 'Processing cancelled'})
API Highlights:
- RESTful design with standard HTTP methods
- Automatic processing on upload
- Reprocessing for failed documents
- Task cancellation support
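Wiring the viewset up is standard DRF router fare; a minimal sketch, assuming the viewset lives in the `documents` app:

# core/urls.py
from django.urls import include, path
from rest_framework.routers import DefaultRouter

from documents.views import DocumentViewSet

router = DefaultRouter()
router.register(r'documents', DocumentViewSet, basename='document')

urlpatterns = [
    path('api/', include(router.urls)),
]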
Production Best Practices
1. Task Rate Limiting
app.conf.task_annotations = {
    'documents.tasks.process_document': {
        'rate_limit': '10/m',  # Prevent queue flooding
    },
}
2. Monitoring with Flower
pip install flower
celery -A core flower --port=5555
# Access dashboard at http://localhost:5555
Flower provides:
- Real-time task monitoring
- Worker statistics
- Task history and results
- Performance graphs
3. Horizontal Scaling
# Run multiple workers
celery -A core worker --concurrency=4 -n worker1@%h
celery -A core worker --concurrency=4 -n worker2@%h
# Separate queues by priority
celery -A core worker -Q high_priority,default -n worker1@%h
celery -A core worker -Q low_priority -n worker2@%h
4. Task Priority Queues
# Configure routing
app.conf.task_routes = {
    'documents.tasks.process_document': {'queue': 'high_priority'},
    'documents.tasks.cleanup_old_logs': {'queue': 'low_priority'},
}

# Send to a specific queue
process_document.apply_async(
    args=[document_id],
    queue='high_priority',
    priority=9
)
5. Error Tracking
import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn="your-sentry-dsn",
    integrations=[CeleryIntegration()],
)
Beyond AI: Other Real-Time Use Cases
The WebSocket + Celery architecture is perfect for:
- Live Data Processing: Real-time analytics, log processing, data transformations
- File Uploads: Show upload progress, virus scanning, format conversion
- Report Generation: PDF/Excel generation with progress updates
- Batch Operations: Bulk updates, mass emails, data migrations
- Integration Workflows: Multi-step API integrations with status updates
- Machine Learning: Model training progress, inference batching
- Image/Video Processing: Encoding, resizing, filtering with live previews
Wrapping Up
By combining Django, Celery, and Django Channels, you can:
- Keep your app responsive under heavy load
- Provide real-time feedback to users (no polling!)
- Handle long-running tasks safely in the background
- Scale horizontally by adding more workers
- Build modern, real-time user experiences
This architecture powers everything from AI-driven features to bulk data processing: anywhere you need to keep users informed while work happens asynchronously.
Quick Reference Commands
# Development
redis-server
celery -A core worker --loglevel=info
python manage.py runserver
# Production
celery -A core worker --loglevel=info --concurrency=4
celery -A core beat --loglevel=info # Scheduled tasks
celery -A core flower # Monitoring
daphne -b 0.0.0.0 -p 8000 core.asgi:application
# Debugging
celery -A core inspect active # View active tasks
celery -A core inspect stats # Worker statistics
celery -A core purge # Clear all tasks
Resources
- Celery Documentation
- Django Channels Documentation
- Channels Redis Layer
- Google Gemini API
- Flower Monitoring
Repository: The complete code is available at ai-document-processor