ahmet gedik

Python Celery Task Queues for Video Metadata Processing

Why Background Jobs?

Fetching trending videos is fast. Validating that 5,000 thumbnails still load, enriching metadata with language detection, and purging broken embeds from the database are not: these tasks can take minutes. Running them synchronously inside the cron job that fetches videos blocks the fetcher and risks timeouts.

Celery with a Redis broker moves this work off the hot path. TopVideoHub uses a four-worker Celery setup to keep its 9-region video library clean without affecting fetch latency.

Setup

pip install celery redis requests langdetect

celery_app.py:

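from celery import Celery

app = Celery(
    'topvideohub',
    broker='redis://localhost:6379/0',   # queue of pending tasks
    backend='redis://localhost:6379/1',  # task results, kept in a separate Redis DB
    include=['tasks'],  # import tasks.py on worker start (assumes the tasks below live there)
)

app.conf.update(
    task_serializer='json',
    result_expires=3600,           # drop stored results after one hour
    worker_prefetch_multiplier=1,  # reserve one task at a time so slow jobs don't hold others back
    task_acks_late=True,           # ack after the task finishes, so a crashed worker's task is redelivered
    task_routes={
        'tasks.validate_thumbnail': {'queue': 'thumbnails'},
        'tasks.detect_language':    {'queue': 'enrichment'},
        'tasks.check_embed':        {'queue': 'health'},
        'tasks.cleanup_stale':      {'queue': 'maintenance'},
    },
)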

Task: Thumbnail Validation

CJK video thumbnails occasionally return 404 after the video is removed from YouTube but before our cron cleanup runs.

import requests
from celery_app import app
from database import get_db

@app.task(
    name='tasks.validate_thumbnail',
    bind=True,               # gives the task access to self.retry()
    max_retries=2,
    default_retry_delay=60,  # seconds between attempts
)
def validate_thumbnail(self, video_id: str, thumbnail_url: str) -> dict:
    try:
        # HEAD avoids downloading the image body
        resp = requests.head(thumbnail_url, timeout=5, allow_redirects=True)
    except requests.RequestException as exc:
        # Network errors are transient: retry rather than flag the thumbnail
        raise self.retry(exc=exc)

    if resp.status_code == 429 or resp.status_code >= 500:
        # Rate limiting and server errors say nothing about the thumbnail itself
        raise self.retry()

    content_type = resp.headers.get('Content-Type', '')
    ok = resp.status_code == 200 and 'image' in content_type
    if not ok:
        db = get_db()
        db.execute('UPDATE videos SET thumbnail_broken = 1 WHERE video_id = ?', (video_id,))
        db.commit()
    return {'video_id': video_id, 'ok': ok}
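The tasks import get_db from a database module that is not shown here. A minimal sketch of what it could look like, assuming SQLite (the `?` placeholders and `db.execute` calls suggest the standard sqlite3 module). The file path, the schema, and the timeout value are all assumptions made for illustration:

```python
# database.py: hypothetical sketch, not the module TopVideoHub actually uses
import sqlite3

DB_PATH = "videos.db"  # assumed location of the SQLite file

# Assumed schema: only the columns that the tasks in this article touch
SCHEMA = """
CREATE TABLE IF NOT EXISTS videos (
    video_id         TEXT PRIMARY KEY,
    thumbnail_url    TEXT,
    title_lang       TEXT,
    thumbnail_broken INTEGER NOT NULL DEFAULT 0,
    embed_broken     INTEGER NOT NULL DEFAULT 0,
    fetched_at       TEXT  -- ISO 8601 UTC timestamp
)
"""

def get_db(path: str = DB_PATH) -> sqlite3.Connection:
    """Open a fresh connection for the calling task."""
    # timeout: wait up to 30 s for a competing writer instead of failing at once
    conn = sqlite3.connect(path, timeout=30)
    conn.row_factory = sqlite3.Row  # rows support access by column name
    conn.execute(SCHEMA)
    return conn
```

Opening a connection per call keeps each worker process on its own handle, which matters once ten thumbnail workers write concurrently.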

Task: Language Detection for CJK Metadata

Knowing whether a title is Japanese, Korean, or Chinese allows smarter regional routing:

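from langdetect import DetectorFactory, LangDetectException, detect

from celery_app import app
from database import get_db

# langdetect is non-deterministic by default; a fixed seed makes results repeatable
DetectorFactory.seed = 0

@app.task(name='tasks.detect_language')
def detect_language(video_id: str, title: str) -> str:
    try:
        lang = detect(title)  # returns 'ja', 'ko', 'zh-cn', 'zh-tw', etc.
    except LangDetectException:
        # Raised when the title has no usable text (empty, digits or symbols only)
        lang = 'und'  # ISO 639 code for "undetermined"

    db = get_db()
    db.execute('UPDATE videos SET title_lang = ? WHERE video_id = ?', (lang, video_id))
    db.commit()
    return lang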
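How the stored language code feeds regional routing is not shown in this article. One possible shape, as a sketch only: the LANG_TO_REGION table, its region codes, and the region_for_language helper are hypothetical names and values chosen for illustration.

```python
from typing import Optional

# Hypothetical mapping from langdetect codes to region codes;
# the real TopVideoHub regions are not listed in the article.
LANG_TO_REGION = {
    "ja": "JP",
    "ko": "KR",
    "zh-cn": "CN",
    "zh-tw": "TW",
}

def region_for_language(lang: str, fallback: Optional[str] = None) -> Optional[str]:
    """Return the region a title should be routed to, or `fallback` if unknown."""
    return LANG_TO_REGION.get(lang.lower(), fallback)
```

Titles stored as 'und' fall through to the fallback, so an undetectable title never forces a region.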

Task: Broken Embed Detection

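When a video is taken down, for example after a copyright claim, its embed stops working. YouTube's oEmbed endpoint no longer returns 200 for that video, which makes for a cheap liveness check: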

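import requests
from celery_app import app
from database import get_db

@app.task(name='tasks.check_embed', bind=True, max_retries=1, default_retry_delay=300)
def check_embed(self, video_id: str) -> bool:
    try:
        # Pass the nested video URL via params so requests URL-encodes it
        resp = requests.get(
            'https://www.youtube.com/oembed',
            params={'url': f'https://youtu.be/{video_id}', 'format': 'json'},
            timeout=8,
        )
    except requests.RequestException as exc:
        raise self.retry(exc=exc)

    if resp.status_code == 429 or resp.status_code >= 500:
        # Rate limiting or a server error: retry instead of flagging a live video
        raise self.retry()

    live = resp.status_code == 200
    if not live:
        db = get_db()
        db.execute('UPDATE videos SET embed_broken = 1 WHERE video_id = ?', (video_id,))
        db.commit()
    return live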

Task: Stale Content Cleanup

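from datetime import datetime, timedelta, timezone

from celery_app import app
from database import get_db

@app.task(name='tasks.cleanup_stale')
def cleanup_stale(days: int = 30) -> int:
    # datetime.utcnow() is deprecated since Python 3.12; this yields the same naive UTC value
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    cutoff = (now - timedelta(days=days)).isoformat()
    db = get_db()
    # Only rows that are both older than the cutoff and flagged as broken are deleted
    cur = db.execute('DELETE FROM videos WHERE fetched_at < ? AND embed_broken = 1', (cutoff,))
    db.commit()
    return cur.rowcount  # number of rows deleted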
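The `fetched_at < ?` comparison is a plain string comparison, so it only behaves like a date comparison if fetched_at is stored as an ISO 8601 timestamp in the same format as the cutoff. A small self-contained check of that behavior against an in-memory SQLite database, where the delete_stale helper, the table layout, and the sample rows are made up for the demonstration:

```python
import sqlite3
from datetime import datetime, timedelta

def delete_stale(conn: sqlite3.Connection, now: datetime, days: int = 30) -> int:
    """Delete broken videos fetched more than `days` before `now`."""
    cutoff = (now - timedelta(days=days)).isoformat()
    cur = conn.execute(
        "DELETE FROM videos WHERE fetched_at < ? AND embed_broken = 1", (cutoff,)
    )
    conn.commit()
    return cur.rowcount

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE videos (video_id TEXT, fetched_at TEXT, embed_broken INTEGER)")
conn.executemany(
    "INSERT INTO videos VALUES (?, ?, ?)",
    [
        ("old-broken", "2024-01-01T00:00:00", 1),  # old and broken: deleted
        ("old-live",   "2024-01-01T00:00:00", 0),  # kept: embed still works
        ("new-broken", "2024-03-01T00:00:00", 1),  # kept: inside the 30-day window
    ],
)

deleted = delete_stale(conn, now=datetime(2024, 3, 10))
remaining = [row[0] for row in conn.execute("SELECT video_id FROM videos ORDER BY video_id")]
print(deleted, remaining)
```

Passing `now` in as an argument keeps the function deterministic, which makes the cutoff logic easy to test without waiting 30 days.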

Celery Beat Schedule

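from celery.schedules import crontab
from celery_app import app

app.conf.beat_schedule = {
    'validate-thumbnails-nightly': {
        # Must be a registered task that enqueues one validate_thumbnail job per video
        'task': 'tasks.validate_thumbnail_batch',
        'schedule': crontab(hour=3, minute=0),
    },
    'cleanup-stale-weekly': {
        'task': 'tasks.cleanup_stale',
        # minute defaults to '*', so omitting it would fire every minute from 02:00 to 02:59
        'schedule': crontab(day_of_week='sunday', hour=2, minute=0),
        'kwargs': {'days': 30},
    },
}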
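The nightly entry points at tasks.validate_thumbnail_batch, which is not defined in the snippets above. Since validate_thumbnail needs a video_id and a thumbnail_url, the batch task presumably reads them from the database and enqueues one job per video. A rough sketch of that fan-out, with the enqueue_thumbnail_checks helper and the query both being assumptions; the enqueue function is passed in so the logic can run without a broker:

```python
import sqlite3
from typing import Callable

def enqueue_thumbnail_checks(
    conn: sqlite3.Connection,
    enqueue: Callable[[str, str], object],
) -> int:
    """Call enqueue(video_id, thumbnail_url) for every video not yet flagged as broken."""
    rows = conn.execute(
        "SELECT video_id, thumbnail_url FROM videos WHERE thumbnail_broken = 0"
    ).fetchall()
    for video_id, thumbnail_url in rows:
        enqueue(video_id, thumbnail_url)
    return len(rows)  # number of jobs enqueued

# In tasks.py this could be wrapped roughly as follows (not executed here):
#
# @app.task(name='tasks.validate_thumbnail_batch')
# def validate_thumbnail_batch() -> int:
#     return enqueue_thumbnail_checks(get_db(), validate_thumbnail.delay)
```

Fanning out to individual jobs lets the ten thumbnail workers share the load and lets each URL retry on its own.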

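Start one worker per queue, plus the beat scheduler:

# -n gives each worker a unique node name, which Celery expects when several run on one host
celery -A celery_app worker -Q thumbnails -c 10 -n thumbnails@%h &
# the enrichment queue needs a consumer too; its concurrency is an assumed value, tune it to your load
celery -A celery_app worker -Q enrichment -c 2 -n enrichment@%h &
celery -A celery_app worker -Q health -c 5 -n health@%h &
celery -A celery_app worker -Q maintenance -c 1 -n maintenance@%h &
celery -A celery_app beat &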
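A task routed to a queue that no worker consumes simply waits in Redis. A small sanity check can catch that before deploying. This is a sketch: unconsumed_queues is a hypothetical helper, and the worker queue list in the example is typed by hand to show a forgotten worker rather than read from running processes.

```python
from typing import Dict, Iterable, Set

def unconsumed_queues(task_routes: Dict[str, dict], worker_queues: Iterable[str]) -> Set[str]:
    """Return queues that tasks are routed to but no worker listens on."""
    routed = {route["queue"] for route in task_routes.values()}
    return routed - set(worker_queues)

# Routes as configured in celery_app.py
task_routes = {
    "tasks.validate_thumbnail": {"queue": "thumbnails"},
    "tasks.detect_language":    {"queue": "enrichment"},
    "tasks.check_embed":        {"queue": "health"},
    "tasks.cleanup_stale":      {"queue": "maintenance"},
}

# Example: the -Q values of the workers that were started, with enrichment forgotten
missing = unconsumed_queues(task_routes, ["thumbnails", "health", "maintenance"])
print(missing)
```

An empty result means every routed queue has at least one consumer.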

Running this alongside TopVideoHub's PHP fetcher keeps the video library accurate. Thumbnail 404 rates dropped from 3% to under 0.1% within a week of deploying the validation queue.


This article is part of the Building TopVideoHub series. Check out TopVideoHub to see these techniques in action.
