Background Jobs for a Multilingual Video Library
TrendVidStream fetches videos from UAE, Finland, Czech Republic, Denmark, Belgium, UK, Switzerland, and the US. Content validation — thumbnail health, language detection for Arabic vs. Finnish vs. Czech titles, broken embed cleanup — runs as Celery background tasks so the main fetch loop stays fast.
Project Setup
pip install celery redis requests langdetect
celery_app.py:
from celery import Celery
# Celery application for TrendVidStream. The broker lives in Redis database 0
# and the result backend in Redis database 1 of the same local server.
# NOTE(review): no `include=` / `imports` setting is given, so the module that
# defines the `tasks.*` tasks must be loaded some other way before a worker
# can run them — confirm how that happens in the real project.
app = Celery(
    'trendvidstream',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
)

# Settings are assigned one by one; each line sets the same key to the same
# value that a single app.conf.update(...) call would.
app.conf.task_serializer = 'json'
app.conf.result_expires = 3600
app.conf.task_acks_late = True
app.conf.worker_prefetch_multiplier = 1

# Every task is routed to its own named queue so each kind of work can be
# served by a separately sized worker.
app.conf.task_routes = {
    'tasks.validate_thumbnail': {'queue': 'thumbnails'},
    'tasks.detect_language': {'queue': 'enrichment'},
    'tasks.check_embed': {'queue': 'health'},
    'tasks.cleanup_stale': {'queue': 'maintenance'},
}
Task: Thumbnail Validation
import requests
from celery_app import app
from database import get_db
@app.task(name='tasks.validate_thumbnail', bind=True, max_retries=2, default_retry_delay=60)
def validate_thumbnail(self, video_id: str, thumbnail_url: str, region: str) -> dict:
    """Check that a thumbnail URL serves an image and flag the video if not.

    Sends a HEAD request (following redirects) to ``thumbnail_url``. The
    thumbnail counts as healthy when the final response is 200 and its
    ``Content-Type`` mentions ``image``. Otherwise ``thumbnail_broken`` is
    set to 1 on the matching ``videos`` row.

    Args:
        video_id: Key of the row in ``videos`` to flag.
        thumbnail_url: URL to probe.
        region: Region code; only echoed back in the result.

    Returns:
        ``{'video_id': ..., 'region': ..., 'ok': bool}``.

    Raises:
        celery.exceptions.Retry: On a network error or a transient HTTP
            status (429 / 5xx) while retries remain. Once retries are
            exhausted the underlying ``requests`` exception propagates.
    """
    try:
        resp = requests.head(thumbnail_url, timeout=6, allow_redirects=True)
        # 429 and 5xx describe the server's momentary state, not the
        # thumbnail itself. Turn them into an HTTPError so they are retried
        # like any other network failure instead of being stored as broken.
        if resp.status_code == 429 or resp.status_code >= 500:
            resp.raise_for_status()
    except requests.RequestException as exc:
        raise self.retry(exc=exc)

    # Media types are case-insensitive, so normalise before matching.
    content_type = resp.headers.get('Content-Type', '').lower()
    ok = resp.status_code == 200 and 'image' in content_type
    if not ok:
        db = get_db()
        db.execute('UPDATE videos SET thumbnail_broken=1 WHERE video_id=?', (video_id,))
        db.commit()
    return {'video_id': video_id, 'region': region, 'ok': ok}
Task: Multilingual Language Detection
TrendVidStream's regions span several languages: Arabic (AE), Finnish (FI), Czech (CZ), Danish (DK), Dutch and French (BE), German, French and Italian (CH), and English (GB, US):
from langdetect import detect, LangDetectException
# Region code -> set of language codes considered normal for titles fetched
# from that region. Multilingual regions list every accepted language.
EXPECTED_LANGS = {
    'AE': {'ar'},
    'FI': {'fi'},
    'CZ': {'cs'},
    'DK': {'da'},
    'BE': {'nl', 'fr'},
    'CH': {'de', 'fr', 'it'},
    'GB': {'en'},
    'US': {'en'},
}
# langdetect's algorithm is randomised: without a fixed seed the same short
# title can be classified differently on different runs, which would make the
# stored lang_mismatch flag flip between executions. Pin the seed once, before
# the first detection.
from langdetect import DetectorFactory

DetectorFactory.seed = 0


@app.task(name='tasks.detect_language')
def detect_language(video_id: str, title: str, region: str) -> dict:
    """Detect a title's language and record whether it fits the region.

    The detected code is compared with ``EXPECTED_LANGS[region]``; regions
    missing from that table fall back to ``{'en'}``. When detection fails
    the language is recorded as ``'und'``, which never matches and is
    therefore stored as a mismatch.

    Args:
        video_id: Key of the row in ``videos`` to update.
        title: Text to classify.
        region: Region code used to look up the expected languages.

    Returns:
        ``{'video_id': ..., 'lang': <detected code>, 'expected': bool}``.
    """
    try:
        lang = detect(title)
    except LangDetectException:
        lang = 'und'

    expected = EXPECTED_LANGS.get(region, {'en'})
    is_expected = lang in expected

    db = get_db()
    db.execute('UPDATE videos SET title_lang=?, lang_mismatch=? WHERE video_id=?',
               (lang, 0 if is_expected else 1, video_id))
    db.commit()

    if not is_expected:
        print(f'[LANG MISMATCH] {region}: expected {expected}, got "{lang}" — {title[:50]}')
    return {'video_id': video_id, 'lang': lang, 'expected': is_expected}
Task: Broken Embed Detection
@app.task(name='tasks.check_embed', bind=True, max_retries=1, default_retry_delay=300)
def check_embed(self, video_id: str) -> bool:
    """Ask YouTube's oEmbed endpoint whether a video can still be embedded.

    A 200 response means the embed is live. Any other definitive response
    sets ``embed_broken`` to 1 on the matching ``videos`` row. That flag is
    what the stale-content cleanup uses to select rows for deletion, so
    transient failures must not set it.

    Args:
        video_id: YouTube video id, also the key of the ``videos`` row.

    Returns:
        True when the oEmbed lookup returned 200, False otherwise.

    Raises:
        celery.exceptions.Retry: On a network error or a transient HTTP
            status (429 / 5xx) while retries remain. Once retries are
            exhausted the underlying ``requests`` exception propagates.
    """
    try:
        # Pass the query through `params` so the nested youtu.be URL is
        # percent-encoded instead of being spliced raw into the query string.
        resp = requests.get(
            'https://www.youtube.com/oembed',
            params={'url': f'https://youtu.be/{video_id}', 'format': 'json'},
            timeout=8,
        )
        # Rate limiting and server errors say nothing about the video itself.
        # Turn them into an HTTPError so they are retried rather than being
        # recorded as a broken embed.
        if resp.status_code == 429 or resp.status_code >= 500:
            resp.raise_for_status()
    except requests.RequestException as exc:
        raise self.retry(exc=exc)

    live = resp.status_code == 200
    if not live:
        db = get_db()
        db.execute('UPDATE videos SET embed_broken=1 WHERE video_id=?', (video_id,))
        db.commit()
    return live
Task: Stale Content Cleanup
from datetime import datetime, timedelta
@app.task(name='tasks.cleanup_stale')
def cleanup_stale(days: int = 30) -> int:
    """Delete videos whose embed is broken and that were fetched long ago.

    Removes every ``videos`` row with ``embed_broken = 1`` whose
    ``fetched_at`` is earlier than ``days`` days before the current UTC time.

    Args:
        days: Minimum age in days for a row to be deleted. Must not be
            negative.

    Returns:
        Number of rows deleted, as reported by the cursor.

    Raises:
        ValueError: If ``days`` is negative. A negative value would put the
            cutoff in the future and delete every flagged row regardless of
            age.
    """
    # Imported locally because the file-level import only brings in
    # `datetime` and `timedelta`.
    from datetime import timezone

    if days < 0:
        raise ValueError(f'days must be >= 0, got {days}')

    # datetime.utcnow() is deprecated. Take an aware UTC "now" and drop the
    # tzinfo so the ISO string keeps the same naive format as before.
    # NOTE(review): this assumes fetched_at holds naive UTC ISO-8601 strings,
    # since the comparison in SQL is against this string — confirm.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    cutoff = (now_utc - timedelta(days=days)).isoformat()

    db = get_db()
    cur = db.execute('DELETE FROM videos WHERE fetched_at < ? AND embed_broken = 1', (cutoff,))
    db.commit()
    return cur.rowcount
Running the Workers and Celery Beat
# Start one worker per group of queues, each in the background (trailing &).
# Thumbnail and enrichment tasks share a worker with concurrency 8.
celery -A celery_app worker -Q thumbnails,enrichment -c 8 &
# Embed health checks get their own worker with concurrency 4.
celery -A celery_app worker -Q health -c 4 &
# Maintenance (stale-content cleanup) runs on a worker with concurrency 1.
celery -A celery_app worker -Q maintenance -c 1 &
# Start the beat scheduler process.
# NOTE(review): the app settings shown in this article define no
# beat_schedule, so beat has nothing to dispatch unless a schedule is
# configured elsewhere — confirm.
celery -A celery_app beat &
Since deploying Celery for TrendVidStream, Arabic language mismatch detection alone flagged over 300 videos incorrectly assigned to European regions, improving content quality across the platform.
This article is part of the Building TrendVidStream series. Check out TrendVidStream to see these techniques in action.
Top comments (0)