DEV Community

ahmet gedik
ahmet gedik

Posted on

Python Celery Task Queues for Video Metadata Processing

When Synchronous Processing Breaks Down

As DailyWatch grew past 50,000 videos across 8 regions, our cron pipeline started hitting the wall. Thumbnail validation, metadata enrichment, and broken link detection can't all run in a single sequential script without blowing past timeout limits. The solution: offload heavy work to Celery task queues.

Setting Up Celery with Redis

Install the dependencies:

pip install celery[redis] requests pillow
Enter fullscreen mode Exit fullscreen mode

Configure Celery with sensible defaults:

# celery_app.py
from celery import Celery

app = Celery(
    "dailywatch",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    task_acks_late=True,             # Re-queue if worker crashes
    worker_prefetch_multiplier=4,    # Don't grab too many tasks at once
    task_soft_time_limit=120,        # Soft limit: 2 minutes
    task_time_limit=180,             # Hard kill: 3 minutes
    task_routes={
        "tasks.validate_thumbnail": {"queue": "thumbnails"},
        "tasks.enrich_metadata": {"queue": "metadata"},
        "tasks.check_broken_links": {"queue": "links"},
    },
)
Enter fullscreen mode Exit fullscreen mode

Task 1: Thumbnail Validation

Detect broken or placeholder thumbnails:

# tasks.py
import requests
from PIL import Image
from io import BytesIO
from celery_app import app

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def validate_thumbnail(self, video_id: str, thumbnail_url: str) -> dict:
    """Check if a thumbnail is valid, not a placeholder, and accessible."""
    try:
        response = requests.get(thumbnail_url, timeout=10, stream=True)
        response.raise_for_status()

        img = Image.open(BytesIO(response.content))
        width, height = img.size

        # YouTube's "no thumbnail" placeholder is 120x90 gray
        is_placeholder = (width == 120 and height == 90)

        # Check for very small or broken images
        is_valid = width >= 320 and height >= 180 and not is_placeholder

        return {
            "video_id": video_id,
            "valid": is_valid,
            "width": width,
            "height": height,
            "is_placeholder": is_placeholder,
            "content_length": len(response.content),
        }

    except requests.RequestException as exc:
        # Retry on network errors
        raise self.retry(exc=exc)
    except Exception as e:
        return {
            "video_id": video_id,
            "valid": False,
            "error": str(e),
        }
Enter fullscreen mode Exit fullscreen mode

Task 2: Metadata Enrichment

Fetch additional data that our initial pipeline skips for speed:

@app.task(bind=True, max_retries=2)
def enrich_metadata(self, video_id: str, api_key: str) -> dict:
    """Fetch extended metadata: tags, default language, topic categories."""
    url = "https://www.googleapis.com/youtube/v3/videos"
    params = {
        "part": "snippet,topicDetails,localizations",
        "id": video_id,
        "key": api_key,
    }

    try:
        resp = requests.get(url, params=params, timeout=15)
        resp.raise_for_status()
        items = resp.json().get("items", [])

        if not items:
            return {"video_id": video_id, "found": False}

        item = items[0]
        snippet = item.get("snippet", {})
        topics = item.get("topicDetails", {})

        return {
            "video_id": video_id,
            "found": True,
            "tags": snippet.get("tags", [])[:20],
            "default_language": snippet.get("defaultLanguage", ""),
            "default_audio_language": snippet.get("defaultAudioLanguage", ""),
            "topic_categories": topics.get("topicCategories", []),
            "localized_titles": {
                lang: loc.get("title", "")
                for lang, loc in item.get("localizations", {}).items()
            },
        }

    except requests.RequestException as exc:
        raise self.retry(exc=exc, countdown=30)
Enter fullscreen mode Exit fullscreen mode

Task 3: Broken Link Checker

Detect videos that have been removed or made private:

@app.task(rate_limit="10/m")  # Don't hammer YouTube
def check_broken_links(video_ids: list[str]) -> dict:
    """Batch check which videos are still accessible."""
    # YouTube oEmbed is free and doesn't need an API key
    results = {"alive": [], "dead": [], "errors": []}

    for vid in video_ids:
        try:
            resp = requests.head(
                f"https://www.youtube.com/oembed?url=https://youtube.com/watch?v={vid}",
                timeout=10,
                allow_redirects=True,
            )
            if resp.status_code == 200:
                results["alive"].append(vid)
            else:
                results["dead"].append(vid)
        except requests.RequestException:
            results["errors"].append(vid)

    return results
Enter fullscreen mode Exit fullscreen mode

Orchestrating from the Cron Pipeline

After the main fetch completes, dispatch background work:

from celery import group, chord
from tasks import validate_thumbnail, enrich_metadata, check_broken_links

def post_fetch_pipeline(new_videos: list[dict], api_key: str):
    """Dispatch background tasks after cron fetch completes."""

    # Fan out thumbnail validation
    thumbnail_jobs = group(
        validate_thumbnail.s(v["video_id"], v["thumbnail_url"])
        for v in new_videos
    )
    thumbnail_jobs.apply_async(queue="thumbnails")

    # Fan out metadata enrichment (smaller batch, costs API quota)
    enrich_jobs = group(
        enrich_metadata.s(v["video_id"], api_key)
        for v in new_videos[:50]  # Limit to preserve quota
    )
    enrich_jobs.apply_async(queue="metadata")

    # Batch broken link check for older videos
    check_broken_links.apply_async(
        args=[get_stale_video_ids(older_than_days=7)],
        queue="links",
    )

    print(f"Dispatched {len(new_videos)} thumbnail + enrichment tasks")
Enter fullscreen mode Exit fullscreen mode

Running the Workers

Start separate workers for each queue so you can scale them independently:

# Terminal 1: thumbnail workers (CPU-bound, fewer)
celery -A celery_app worker --queues=thumbnails --concurrency=4

# Terminal 2: metadata workers (IO-bound, more)
celery -A celery_app worker --queues=metadata --concurrency=8

# Terminal 3: link checker (rate-limited)
celery -A celery_app worker --queues=links --concurrency=2
Enter fullscreen mode Exit fullscreen mode

Monitoring with Flower

pip install flower
celery -A celery_app flower --port=5555
Enter fullscreen mode Exit fullscreen mode

This gives you a web dashboard at localhost:5555 showing task throughput, failure rates, and worker health.

Celery transformed the DailyWatch pipeline from a fragile monolith into a resilient distributed system. Tasks that used to block the cron cycle now run asynchronously, and failures get retried automatically.


This article is part of the Building DailyWatch series. Check out DailyWatch to see these techniques in action.

Top comments (0)