This documentation covers how to scale a Celery-based application for document extraction and comparison using FastAPI, Celery, and Redis. The guide includes steps for task splitting, configuring task dependencies, and scaling individual tasks.
Table of Contents
- Introduction
- Task Definitions
- Orchestrating Tasks with Parallel Processing
- FastAPI Integration
- Scaling Celery Workers
- Using Dedicated Queues for Each Task Type
- Autoscaling
- Distributed Task Execution
- Monitoring and Management
- Load Balancing and High Availability
- Summary
Introduction
This guide provides a detailed explanation of how to scale a Celery-based application that performs document extraction and comparison. It covers breaking down the tasks, orchestrating them for parallel processing, and scaling the application to handle increased loads in a production environment.
Task Definitions
Define the tasks for fetching, extracting, and comparing documents:
# tasks.py
from celery_config import celery_app
import logging
logger = logging.getLogger(__name__)
@celery_app.task
def fetch_documents_task(blob_path):
try:
documents = fetch_documents(blob_path) # Replace with your actual fetch logic
return documents # Assume this returns a list of document paths or contents
except Exception as e:
logger.error(f"Error fetching documents: {e}")
raise
@celery_app.task
def extract_data_task(document):
try:
extracted_data = extract_data(document) # Replace with your actual extraction logic
return extracted_data
except Exception as e:
logger.error(f"Error extracting data: {e}")
raise
@celery_app.task
def compare_data_task(extracted_data_list):
try:
comparison_results = compare_data(extracted_data_list) # Replace with your actual comparison logic
return comparison_results
except Exception as e:
logger.error(f"Error comparing data: {e}")
raise
Orchestrating Tasks with Parallel Processing
Use a combination of chains and groups to handle dependencies and parallel processing:
# main.py or workflow.py
from celery import chain, group
from tasks import fetch_documents_task, extract_data_task, compare_data_task
def process_documents(blob_path):
# Step 1: Fetch documents
fetch_task = fetch_documents_task.s(blob_path)
# Step 2: Extract data from each document in parallel
extract_tasks = fetch_task | group(extract_data_task.s(doc) for doc in fetch_task.get())
# Step 3: Compare the extracted data
compare_task = compare_data_task.s()
# Combine the workflow into a single chain
workflow = chain(fetch_task, extract_tasks, compare_task)
result = workflow.apply_async()
return result
FastAPI Integration
Integrate the workflow with a FastAPI endpoint:
# main.py
from fastapi import FastAPI
from workflow import process_documents # Import your workflow function
from celery_config import celery_app
app = FastAPI()
@app.post("/process/")
async def process_endpoint(blob_path: str):
result = process_documents(blob_path)
return {"task_id": result.id}
@app.get("/status/{task_id}")
async def get_status(task_id: str):
result = celery_app.AsyncResult(task_id)
if result.state == 'PENDING':
return {"status": "Pending..."}
elif result.state == 'SUCCESS':
return {"status": "Completed", "result": result.result}
elif result.state == 'FAILURE':
return {"status": "Failed", "result": str(result.result)}
else:
return {"status": result.state}
Scaling Celery Workers
Increasing the Number of Workers
Start multiple Celery worker processes:
celery -A celery_config worker --loglevel=info --concurrency=4
To scale further, start more workers:
celery -A celery_config worker --loglevel=info --concurrency=4
celery -A celery_config worker --loglevel=info --concurrency=4
Distributed Workers
Run workers on different machines by pointing them to the same message broker:
celery -A celery_config worker --loglevel=info --concurrency=4 -Q fetch_queue
celery -A celery_config worker --loglevel=info --concurrency=8 -Q extract_queue
celery -A celery_config worker --loglevel=info --concurrency=2 -Q compare_queue
Using Dedicated Queues for Each Task Type
Defining Queues
Configure Celery to define multiple queues:
# celery_config.py
from celery import Celery
celery_app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
celery_app.conf.task_queues = (
Queue('fetch_queue', routing_key='fetch.#'),
Queue('extract_queue', routing_key='extract.#'),
Queue('compare_queue', routing_key='compare.#'),
)
celery_app.conf.task_routes = {
'tasks.fetch_documents_task': {'queue': 'fetch_queue', 'routing_key': 'fetch.documents'},
'tasks.extract_data_task': {'queue': 'extract_queue', 'routing_key': 'extract.data'},
'tasks.compare_data_task': {'queue': 'compare_queue', 'routing_key': 'compare.data'},
}
Starting Workers for Specific Queues
celery -A celery_config worker --loglevel=info --concurrency=4 -Q fetch_queue
celery -A celery_config worker --loglevel=info --concurrency=8 -Q extract_queue
celery -A celery_config worker --loglevel=info --concurrency=2 -Q compare_queue
Autoscaling
Enable autoscaling to dynamically adjust the number of worker processes:
celery -A celery_config worker --loglevel=info --autoscale=10,3
-
--autoscale=10,3
: Scales between 3 and 10 worker processes based on load.
Distributed Task Execution
Distribute Celery workers across multiple machines:
Example Setup
-
Machine 1 (Message Broker and Backend):
- Run Redis as your broker and backend.
-
Machine 2 (Worker Node):
- Start Celery workers:
celery -A celery_config worker --loglevel=info --concurrency=4 -Q fetch_queue
-
Machine 3 (Worker Node):
- Start Celery workers:
celery -A celery_config worker --loglevel=info --concurrency=8 -Q extract_queue
-
Machine 4 (Worker Node):
- Start Celery workers:
celery -A celery_config worker --loglevel=info --concurrency=2 -Q compare_queue
Monitoring and Management
Use monitoring tools like Flower, Prometheus, and Grafana to monitor Celery tasks:
Flower
Start Flower to monitor Celery workers:
celery -A celery_config flower
Load Balancing and High Availability
Implement load balancing for high availability and fault tolerance:
Example Load Balancer Setup
Use HAProxy or another load balancer to distribute requests across multiple Redis instances.
Summary
- Scale Workers: Increase the number of Celery workers to handle more tasks concurrently.
- Dedicated Queues: Use different queues for different types of tasks and scale them independently.
- Autoscaling: Enable autoscaling to dynamically adjust the number of worker processes based on load.
- Distributed Execution: Distribute workers across multiple machines to improve scalability and fault tolerance.
- Monitoring: Use monitoring tools to keep track of the performance and health of your Celery workers.
- Load Balancing: Implement load balancing for high availability and fault tolerance.
By following these strategies, you can effectively scale your Celery-based application to handle increased loads and ensure reliable task execution in a production environment.
Top comments (1)
Is there any co-relation between concurrency and number of threads in a machine?