DEV Community

ahmet gedik
ahmet gedik

Posted on

Python Celery Task Queues for Video Metadata Processing

Background Jobs for a Multilingual Video Library

TrendVidStream fetches videos from the UAE, Finland, the Czech Republic, Denmark, Belgium, the UK, Switzerland, and the US. Content validation — thumbnail health checks, language detection for Arabic vs. Finnish vs. Czech titles, and broken-embed cleanup — runs as Celery background tasks so the main fetch loop stays fast.

Project Setup

pip install celery redis requests langdetect
Enter fullscreen mode Exit fullscreen mode

celery_app.py:

from celery import Celery

# Redis database 0 carries the task messages; database 1 stores task results.
# NOTE(review): nothing visible here imports the module that defines the tasks
# (no `include=` / autodiscovery) — confirm the workers load it some other way.
app = Celery(
    'trendvidstream',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
)

app.conf.task_serializer = 'json'
app.conf.result_expires = 3600          # stored results are dropped after one hour
app.conf.task_acks_late = True          # acknowledge a message only after the task ran
app.conf.worker_prefetch_multiplier = 1  # reserve one message per worker process at a time

# Route every task to a dedicated queue so workers can be sized per workload.
app.conf.task_routes = {
    task_name: {'queue': queue_name}
    for task_name, queue_name in (
        ('tasks.validate_thumbnail', 'thumbnails'),
        ('tasks.detect_language', 'enrichment'),
        ('tasks.check_embed', 'health'),
        ('tasks.cleanup_stale', 'maintenance'),
    )
}
Enter fullscreen mode Exit fullscreen mode

Task: Thumbnail Validation

import requests
from celery_app import app
from database import get_db

@app.task(name='tasks.validate_thumbnail', bind=True, max_retries=2, default_retry_delay=60)
def validate_thumbnail(self, video_id: str, thumbnail_url: str, region: str) -> dict:
    """Probe a thumbnail URL and flag the video when it does not serve an image.

    A HEAD request (following redirects, 6 s timeout) is sent to
    ``thumbnail_url``. The thumbnail counts as healthy only when the final
    response is HTTP 200 and its ``Content-Type`` header contains ``image``.
    Otherwise ``thumbnail_broken`` is set to 1 on the matching ``videos`` row.

    Args:
        video_id: Identifier of the row in ``videos`` to update.
        thumbnail_url: URL of the thumbnail to probe.
        region: Region code; only echoed back in the result.

    Returns:
        ``{'video_id': ..., 'region': ..., 'ok': bool}``.

    Raises:
        Whatever ``self.retry`` raises when the request fails with a
        ``requests.RequestException`` (up to 2 retries, 60 s apart).
    """
    try:
        response = requests.head(thumbnail_url, timeout=6, allow_redirects=True)
        content_type = response.headers.get('Content-Type', '')
        is_image = response.status_code == 200 and 'image' in content_type
        if not is_image:
            conn = get_db()
            conn.execute('UPDATE videos SET thumbnail_broken=1 WHERE video_id=?', (video_id,))
            conn.commit()
    except requests.RequestException as exc:
        # Network-level failure: let Celery reschedule instead of flagging the video.
        raise self.retry(exc=exc)
    return {'video_id': video_id, 'region': region, 'ok': is_image}
Enter fullscreen mode Exit fullscreen mode

Task: Multilingual Language Detection

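TrendVidStream's regions span several languages, including Arabic (AE), Finnish (FI), Czech (CZ), Dutch and French (BE), and English (GB). Each region code maps to the set of title languages we expect to see: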

from langdetect import detect, LangDetectException

# Region code -> set of language codes considered normal for titles
# fetched from that region.
EXPECTED_LANGS = {
    'AE': {'ar'},
    'FI': {'fi'},
    'CZ': {'cs'},
    'DK': {'da'},
    'BE': {'nl', 'fr'},
    'CH': {'de', 'fr', 'it'},
    'GB': {'en'},
    'US': {'en'},
}

@app.task(name='tasks.detect_language')
def detect_language(video_id: str, title: str, region: str) -> dict:
    """Detect the language of a video title and record whether it fits the region.

    The detected code is compared with ``EXPECTED_LANGS[region]``; regions
    without an entry fall back to ``{'en'}``. The result is written to the
    ``title_lang`` and ``lang_mismatch`` columns of the matching ``videos`` row.

    Args:
        video_id: Identifier of the row in ``videos`` to update.
        title: Title text to run language detection on.
        region: Region code used to look up the expected languages.

    Returns:
        ``{'video_id': ..., 'lang': <detected code>, 'expected': bool}``.
    """
    try:
        lang = detect(title)
    except LangDetectException:
        # Detection failed; store the "undetermined" marker. Because 'und' is
        # in no expected set, such titles are recorded as mismatches.
        lang = 'und'

    expected = EXPECTED_LANGS.get(region, {'en'})
    is_expected = lang in expected

    db = get_db()
    db.execute('UPDATE videos SET title_lang=?, lang_mismatch=? WHERE video_id=?',
               (lang, 0 if is_expected else 1, video_id))
    db.commit()

    if not is_expected:
        # Separator added: the title used to be glued directly onto the closing
        # quote of the language code, making the log line hard to read.
        print(f'[LANG MISMATCH] {region}: expected {expected}, got "{lang}": {title[:50]}')
    return {'video_id': video_id, 'lang': lang, 'expected': is_expected}
Enter fullscreen mode Exit fullscreen mode

Task: Broken Embed Detection

@app.task(name='tasks.check_embed', bind=True, max_retries=1, default_retry_delay=300)
def check_embed(self, video_id: str) -> bool:
    """Check via YouTube's oEmbed endpoint whether a video can still be embedded.

    A 200 response means the embed is live. Any other definitive response sets
    ``embed_broken`` to 1 on the matching ``videos`` row. Rate limiting (429)
    and server errors (5xx) say nothing about the video itself, so they are
    retried like network failures instead of being recorded as broken.

    Args:
        video_id: YouTube video id, also the key of the row in ``videos``.

    Returns:
        ``True`` when the oEmbed lookup answered 200, ``False`` otherwise.

    Raises:
        Whatever ``self.retry`` raises when the request fails or the endpoint
        answers with a transient status (1 retry, 300 s later).
    """
    try:
        # Pass the query through `params` so the nested URL is percent-encoded.
        resp = requests.get(
            'https://www.youtube.com/oembed',
            params={'url': f'https://youtu.be/{video_id}', 'format': 'json'},
            timeout=8,
        )
        if resp.status_code == 429 or resp.status_code >= 500:
            # Raises requests.HTTPError (a RequestException) -> handled below.
            resp.raise_for_status()
        live = resp.status_code == 200
        if not live:
            db = get_db()
            db.execute('UPDATE videos SET embed_broken=1 WHERE video_id=?', (video_id,))
            db.commit()
        return live
    except requests.RequestException as exc:
        raise self.retry(exc=exc)
Enter fullscreen mode Exit fullscreen mode

Task: Stale Content Cleanup

from datetime import datetime, timedelta

@app.task(name='tasks.cleanup_stale')
def cleanup_stale(days: int = 30) -> int:
    """Delete videos with a broken embed that were fetched more than ``days`` ago.

    Args:
        days: Minimum age, in days, of a row before it may be deleted.

    Returns:
        The ``rowcount`` reported by the DELETE statement.
    """
    # Local import keeps this block self-contained; the snippet's module-level
    # import only brings in `datetime` and `timedelta`.
    from datetime import timezone

    # `datetime.utcnow()` is deprecated since Python 3.12. Take an aware UTC
    # timestamp and drop the tzinfo so the ISO string keeps the same naive
    # format (no "+00:00" suffix) as before.
    # NOTE(review): assumes `fetched_at` holds naive-UTC ISO-8601 text so the
    # `<` comparison orders chronologically — confirm against the schema.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    cutoff = (now_utc - timedelta(days=days)).isoformat()

    db = get_db()
    cur = db.execute('DELETE FROM videos WHERE fetched_at < ? AND embed_broken = 1', (cutoff,))
    db.commit()
    return cur.rowcount
Enter fullscreen mode Exit fullscreen mode

Celery Beat Scheduler

# Worker consuming the thumbnails and enrichment queues, concurrency 8.
celery --app celery_app worker --queues thumbnails,enrichment --concurrency 8 &
# Worker consuming the health queue, concurrency 4.
celery --app celery_app worker --queues health --concurrency 4 &
# Worker consuming the maintenance queue, concurrency 1.
celery --app celery_app worker --queues maintenance --concurrency 1 &
# Beat process for periodic task scheduling.
celery --app celery_app beat &
Enter fullscreen mode Exit fullscreen mode

Since deploying Celery for TrendVidStream, Arabic language mismatch detection alone flagged over 300 videos incorrectly assigned to European regions, improving content quality across the platform.


This article is part of the Building TrendVidStream series. Check out TrendVidStream to see these techniques in action.

Top comments (0)