When Synchronous Processing Breaks Down
As DailyWatch grew past 50,000 videos across 8 regions, our cron pipeline started hitting the wall. Thumbnail validation, metadata enrichment, and broken link detection can't all run in a single sequential script without blowing past timeout limits. The solution: offload heavy work to Celery task queues.
Setting Up Celery with Redis
Install the dependencies:
pip install "celery[redis]" requests pillow
Configure Celery with sensible defaults:
# celery_app.py
from celery import Celery

app = Celery(
    "dailywatch",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    task_acks_late=True,           # Re-queue if worker crashes
    worker_prefetch_multiplier=4,  # Don't grab too many tasks at once
    task_soft_time_limit=120,      # Soft limit: 2 minutes
    task_time_limit=180,           # Hard kill: 3 minutes
    task_routes={
        "tasks.validate_thumbnail": {"queue": "thumbnails"},
        "tasks.enrich_metadata": {"queue": "metadata"},
        "tasks.check_broken_links": {"queue": "links"},
    },
)
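The two time limits work together: at the soft limit Celery raises SoftTimeLimitExceeded inside the task, and at the hard limit it terminates the worker child process outright. Here is a minimal sketch of how a task can catch the soft limit and return partial progress instead of being killed mid-write; this example task isn't part of DailyWatch, and the sleep is just a stand-in for real per-item work.

import time

from celery.exceptions import SoftTimeLimitExceeded

from celery_app import app


@app.task
def slow_example(n_items: int) -> dict:
    """Illustration only: how a task sees the soft limit before the hard kill."""
    done = 0
    try:
        for _ in range(n_items):
            time.sleep(1)  # stand-in for real per-item work
            done += 1
        return {"completed": True, "processed": done}
    except SoftTimeLimitExceeded:
        # Raised inside the task at 120s, leaving roughly 60s to wrap up
        # before the 180s hard limit kills the process.
        return {"completed": False, "processed": done}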
Task 1: Thumbnail Validation
Detect broken or placeholder thumbnails:
# tasks.py
import requests
from PIL import Image
from io import BytesIO

from celery_app import app


@app.task(bind=True, max_retries=3, default_retry_delay=60)
def validate_thumbnail(self, video_id: str, thumbnail_url: str) -> dict:
    """Check if a thumbnail is valid, not a placeholder, and accessible."""
    try:
        response = requests.get(thumbnail_url, timeout=10, stream=True)
        response.raise_for_status()
        img = Image.open(BytesIO(response.content))
        width, height = img.size

        # YouTube's "no thumbnail" placeholder is 120x90 gray
        is_placeholder = (width == 120 and height == 90)
        # Check for very small or broken images
        is_valid = width >= 320 and height >= 180 and not is_placeholder

        return {
            "video_id": video_id,
            "valid": is_valid,
            "width": width,
            "height": height,
            "is_placeholder": is_placeholder,
            "content_length": len(response.content),
        }
    except requests.RequestException as exc:
        # Retry on network errors
        raise self.retry(exc=exc)
    except Exception as e:
        return {
            "video_id": video_id,
            "valid": False,
            "error": str(e),
        }
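To smoke-test the task from a Python shell, something like the snippet below works, assuming Redis and a worker on the thumbnails queue are running; the video ID and thumbnail URL are just example values.

from tasks import validate_thumbnail

# task_routes in celery_app.py sends this to the "thumbnails" queue automatically
result = validate_thumbnail.delay(
    "dQw4w9WgXcQ",
    "https://i.ytimg.com/vi/dQw4w9WgXcQ/hqdefault.jpg",
)
print(result.get(timeout=30))  # blocks until a worker returns the result dict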
Task 2: Metadata Enrichment
Fetch additional data that our initial pipeline skips for speed:
@app.task(bind=True, max_retries=2)
def enrich_metadata(self, video_id: str, api_key: str) -> dict:
    """Fetch extended metadata: tags, default language, topic categories."""
    url = "https://www.googleapis.com/youtube/v3/videos"
    params = {
        "part": "snippet,topicDetails,localizations",
        "id": video_id,
        "key": api_key,
    }
    try:
        resp = requests.get(url, params=params, timeout=15)
        resp.raise_for_status()
        items = resp.json().get("items", [])
        if not items:
            return {"video_id": video_id, "found": False}

        item = items[0]
        snippet = item.get("snippet", {})
        topics = item.get("topicDetails", {})

        return {
            "video_id": video_id,
            "found": True,
            "tags": snippet.get("tags", [])[:20],
            "default_language": snippet.get("defaultLanguage", ""),
            "default_audio_language": snippet.get("defaultAudioLanguage", ""),
            "topic_categories": topics.get("topicCategories", []),
            "localized_titles": {
                lang: loc.get("title", "")
                for lang, loc in item.get("localizations", {}).items()
            },
        }
    except requests.RequestException as exc:
        raise self.retry(exc=exc, countdown=30)
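One possible quota optimization, not used in the pipeline yet: the videos.list endpoint accepts up to 50 comma-separated IDs per request, so a batched variant can enrich a whole group of videos for the same per-call quota cost as a single video. A sketch, meant to sit alongside the tasks above in tasks.py so it reuses the same requests and app imports:

@app.task(bind=True, max_retries=2)
def enrich_metadata_batch(self, video_ids: list[str], api_key: str) -> list[dict]:
    """Sketch: enrich up to 50 videos with a single videos.list call."""
    url = "https://www.googleapis.com/youtube/v3/videos"
    params = {
        "part": "snippet,topicDetails",
        "id": ",".join(video_ids[:50]),  # the API caps one request at 50 IDs
        "key": api_key,
    }
    try:
        resp = requests.get(url, params=params, timeout=15)
        resp.raise_for_status()
        return [
            {
                "video_id": item["id"],
                "tags": item.get("snippet", {}).get("tags", [])[:20],
                "topic_categories": item.get("topicDetails", {}).get("topicCategories", []),
            }
            for item in resp.json().get("items", [])
        ]
    except requests.RequestException as exc:
        raise self.retry(exc=exc, countdown=30)

If this variant were adopted, it would also need its own task_routes entry to land on the metadata queue.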
Task 3: Broken Link Checker
Detect videos that have been removed or made private:
@app.task(rate_limit="10/m")  # Don't hammer YouTube
def check_broken_links(video_ids: list[str]) -> dict:
    """Batch check which videos are still accessible."""
    # YouTube oEmbed is free and doesn't need an API key
    results = {"alive": [], "dead": [], "errors": []}
    for vid in video_ids:
        try:
            resp = requests.head(
                f"https://www.youtube.com/oembed?url=https://youtube.com/watch?v={vid}",
                timeout=10,
                allow_redirects=True,
            )
            if resp.status_code == 200:
                results["alive"].append(vid)
            else:
                results["dead"].append(vid)
        except requests.RequestException:
            results["errors"].append(vid)
    return results
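Because the task loops over the whole list with a 10-second timeout per request, a very large batch could run into the 180-second hard time limit set in celery_app.py. One way around that, sketched below with an arbitrarily chosen chunk size, is to split the stale IDs into smaller batches before dispatching:

from tasks import check_broken_links


def dispatch_link_checks(video_ids: list[str], chunk_size: int = 15) -> None:
    """Fan the stale-video list out as several small check_broken_links tasks."""
    for start in range(0, len(video_ids), chunk_size):
        check_broken_links.apply_async(
            args=[video_ids[start:start + chunk_size]],
            queue="links",
        )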
Orchestrating from the Cron Pipeline
After the main fetch completes, dispatch background work:
from celery import group, chord
from tasks import validate_thumbnail, enrich_metadata, check_broken_links


def post_fetch_pipeline(new_videos: list[dict], api_key: str):
    """Dispatch background tasks after cron fetch completes."""
    # Fan out thumbnail validation
    thumbnail_jobs = group(
        validate_thumbnail.s(v["video_id"], v["thumbnail_url"])
        for v in new_videos
    )
    thumbnail_jobs.apply_async(queue="thumbnails")

    # Fan out metadata enrichment (smaller batch, costs API quota)
    enrich_jobs = group(
        enrich_metadata.s(v["video_id"], api_key)
        for v in new_videos[:50]  # Limit to preserve quota
    )
    enrich_jobs.apply_async(queue="metadata")

    # Batch broken link check for older videos
    check_broken_links.apply_async(
        args=[get_stale_video_ids(older_than_days=7)],
        queue="links",
    )

    print(f"Dispatched {len(new_videos)} thumbnail + enrichment tasks")
Running the Workers
Start separate workers for each queue so you can scale them independently:
# Terminal 1: thumbnail workers (CPU-bound, fewer)
celery -A celery_app worker --queues=thumbnails --concurrency=4
# Terminal 2: metadata workers (IO-bound, more)
celery -A celery_app worker --queues=metadata --concurrency=8
# Terminal 3: link checker (rate-limited)
celery -A celery_app worker --queues=links --concurrency=2
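For local development, a single worker can consume all three queues instead:
celery -A celery_app worker --queues=thumbnails,metadata,links --concurrency=4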
Monitoring with Flower
pip install flower
celery -A celery_app flower --port=5555
This gives you a web dashboard at localhost:5555 showing task throughput, failure rates, and worker health.
Celery transformed the DailyWatch pipeline from a fragile monolith into a resilient distributed system. Tasks that used to block the cron cycle now run asynchronously, and failures get retried automatically.
This article is part of the Building DailyWatch series. Check out DailyWatch to see these techniques in action.