Async Video Pipeline with Python asyncio for Multi-Region Fetching
TrendVidStream fetches trending video data from 8 regions: US, GB, AE (UAE), FI (Finland), DK (Denmark), CZ (Czech Republic), BE (Belgium), and CH (Switzerland). Sequential fetching is slow; asyncio makes it fast.
Here's the async pipeline I built, including the quirks specific to a diverse multi-region set.
Core Fetch Function
import asyncio
import aiohttp
import logging
from dataclasses import dataclass, field
from typing import Optional
logger = logging.getLogger(__name__)
REGIONS = ["AE", "FI", "DK", "CZ", "BE", "CH", "US", "GB"]
YT_BASE = "https://www.googleapis.com/youtube/v3"
@dataclass
class VideoMetadata:
    """Normalized metadata for one trending video in one region.

    Built by ``_parse`` from a YouTube Data API ``videos.list`` item
    (``part=snippet,statistics``) and later bulk-upserted to PostgreSQL.
    """

    video_id: str        # YouTube video ID (item["id"])
    title: str           # snippet.title; cleaned later by clean_title()
    channel_title: str   # snippet.channelTitle
    region: str          # two-letter region code the video was trending in
    view_count: int      # statistics.viewCount, 0 when statistics are absent
    thumbnail_url: str   # best available thumbnail URL, "" if none
    published_at: str    # snippet.publishedAt, ISO-8601 string as returned
    category_id: int     # snippet.categoryId, 0 when missing
    language: Optional[str] = None # defaultAudioLanguage from snippet
async def fetch_trending(
    session: aiohttp.ClientSession,
    region: str,
    api_key: str,
    sem: asyncio.Semaphore,
) -> list[VideoMetadata]:
    """Fetch trending videos for one region with backpressure control.

    Args:
        session: shared aiohttp session (connection pooling / DNS cache).
        region: two-letter region code, e.g. "AE".
        api_key: YouTube Data API key.
        sem: semaphore bounding concurrent in-flight requests.

    Returns:
        Parsed VideoMetadata list, or [] on any error (quota, rate limit,
        timeout, transport failure) so one bad region never aborts the
        surrounding gather().
    """
    params = {
        "part": "snippet,statistics",
        "chart": "mostPopular",
        "regionCode": region,
        "maxResults": 50,
        "key": api_key,
    }
    async with sem:
        # Up to 2 attempts: the second only happens after a 429 backoff.
        # Previously we slept 5s and returned [] anyway, paying the wait
        # without ever retrying.
        for attempt in range(2):
            try:
                async with session.get(
                    f"{YT_BASE}/videos",
                    params=params,
                    timeout=aiohttp.ClientTimeout(total=30),
                ) as resp:
                    if resp.status == 403:
                        # Quota exhaustion / bad key — retrying won't help.
                        logger.error("[%s] 403 Forbidden — check API key quota", region)
                        return []
                    if resp.status == 429:
                        logger.warning("[%s] Rate limited — sleeping 5s", region)
                        await asyncio.sleep(5)
                        continue  # retry once after backing off
                    resp.raise_for_status()
                    data = await resp.json()
                    return [_parse(item, region) for item in data.get("items", [])]
            except asyncio.TimeoutError:
                logger.error("[%s] Timeout", region)
                return []
            except aiohttp.ClientError as exc:
                logger.error("[%s] HTTP error: %s", region, exc)
                return []
    # Both attempts were rate-limited; give up until the next cron cycle.
    return []
def _parse(item: dict, region: str) -> VideoMetadata:
    """Convert one raw ``videos.list`` API item into a VideoMetadata record.

    Missing statistics default to 0; the best available thumbnail is
    chosen from highest to lowest quality, falling back to "".
    """
    snippet = item["snippet"]
    statistics = item.get("statistics", {})
    thumbnails = snippet.get("thumbnails", {})

    # Prefer the highest-resolution thumbnail the API actually returned.
    best_url = ""
    for quality in ("maxres", "standard", "high", "medium", "default"):
        if quality in thumbnails:
            best_url = thumbnails[quality]["url"]
            break

    return VideoMetadata(
        video_id=item["id"],
        title=snippet.get("title", ""),
        channel_title=snippet.get("channelTitle", ""),
        region=region,
        view_count=int(statistics.get("viewCount", 0)),
        thumbnail_url=best_url,
        published_at=snippet.get("publishedAt", ""),
        category_id=int(snippet.get("categoryId", 0)),
        language=snippet.get("defaultAudioLanguage"),  # 'ar' for UAE, 'fi' for Finland, etc.
    )
Concurrent Fetch for All Regions
async def fetch_all(
    api_key: str,
    regions: list[str] | None = None,
    concurrency: int = 4,
) -> dict[str, list[VideoMetadata]]:
    """Fetch all regions concurrently with bounded concurrency.

    Args:
        api_key: YouTube Data API key.
        regions: region codes to fetch; defaults to REGIONS. (Default is
            None rather than the module list itself, so callers can never
            mutate the shared REGIONS constant through the default.)
        concurrency: max simultaneous requests (enforced via semaphore).

    Returns:
        Mapping of region code -> parsed videos; a region that failed
        unexpectedly maps to [] (logged, not silently dropped).
    """
    if regions is None:
        regions = REGIONS
    sem = asyncio.Semaphore(concurrency)
    connector = aiohttp.TCPConnector(limit=8, ttl_dns_cache=300)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [
            asyncio.create_task(fetch_trending(session, r, api_key, sem))
            for r in regions
        ]
        # return_exceptions=True: one region blowing up must not cancel
        # the others.
        results = await asyncio.gather(*tasks, return_exceptions=True)

    out: dict[str, list[VideoMetadata]] = {}
    for region, result in zip(regions, results):
        if isinstance(result, Exception):
            # Previously swallowed with no trace — log before degrading to [].
            logger.error("[%s] Unexpected failure: %s", region, result)
            out[region] = []
        else:
            out[region] = result
    return out
Handling Arabic Text from UAE
Arabic titles from the UAE feed come through as UTF-8 — Python handles this naturally. But there's a subtlety: Arabic-script text often carries invisible Unicode bidirectional control characters (directional marks, embeddings, and overrides) that can cause display and matching issues if stored raw:
import unicodedata
import re
# All Unicode BiDi control characters, per UAX #9:
#   marks      — LRM U+200E, RLM U+200F, ALM U+061C
#   embeddings — LRE U+202A, RLE U+202B, PDF U+202C, LRO U+202D, RLO U+202E
#   isolates   — LRI U+2066, RLI U+2067, FSI U+2068, PDI U+2069
# The original table stopped at the embedding/override set and let the
# isolate controls (emitted by modern BiDi-aware tooling) through.
_BIDI_CONTROLS = (
    '\u200e\u200f\u061c'
    '\u202a\u202b\u202c\u202d\u202e'
    '\u2066\u2067\u2068\u2069'
)
RTL_OVERRIDES = str.maketrans('', '', _BIDI_CONTROLS)


def clean_title(title: str) -> str:
    """Remove BiDi control characters, normalize to NFC, and strip.

    Args:
        title: raw video title, possibly containing invisible directional
            controls (common in Arabic-script titles from the AE feed).

    Returns:
        The title with all BiDi controls removed, in NFC form, with
        surrounding whitespace stripped.
    """
    title = title.translate(RTL_OVERRIDES)       # one C-level pass, no regex
    title = unicodedata.normalize('NFC', title)  # canonical composition
    return title.strip()
Processing Pipeline with Language Detection
import asyncpg
from langdetect import detect, LangDetectException # pip install langdetect
async def process_and_store(
    pool: asyncpg.Pool,
    all_videos: dict[str, list[VideoMetadata]]
) -> dict[str, int]:
    """Clean each region's videos and bulk-upsert them into PostgreSQL.

    Titles are stripped of BiDi controls and NFC-normalized; when the API
    did not supply defaultAudioLanguage, the language is detected from the
    title ('und' when detection fails). Returns region -> stored count.
    """
    counts: dict[str, int] = {}
    for region, videos in all_videos.items():
        # Normalize in place (especially important for Arabic RTL markers).
        for video in videos:
            video.title = clean_title(video.title)
            if not video.language and video.title:
                try:
                    video.language = detect(video.title)
                except LangDetectException:
                    video.language = 'und'  # undetermined

        rows = [
            (video.video_id, video.title, video.channel_title, video.region,
             video.view_count, video.thumbnail_url, video.published_at,
             video.category_id, video.language)
            for video in videos
        ]

        # Bulk upsert: refresh view_count/fetched_at for rows seen before.
        async with pool.acquire() as conn:
            await conn.executemany("""
                INSERT INTO videos
                (video_id, title, channel_title, region, view_count,
                 thumbnail_url, published_at, category_id, language)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                ON CONFLICT (video_id, region) DO UPDATE SET
                    view_count = EXCLUDED.view_count,
                    fetched_at = now()
            """, rows)

        counts[region] = len(rows)
        logger.info(f"[{region}] Stored {len(rows)} videos")
    return counts
async def main():
    """Entry point: fetch trending videos for all regions and persist them.

    Requires YOUTUBE_API_KEY and DATABASE_URL in the environment.
    """
    import os
    api_key = os.environ["YOUTUBE_API_KEY"]
    db_url = os.environ["DATABASE_URL"]
    pool = await asyncpg.create_pool(db_url, min_size=2, max_size=5)
    try:
        all_videos = await fetch_all(api_key)
        result = await process_and_store(pool, all_videos)
        total = sum(result.values())
        logger.info(
            "Pipeline complete: %d videos across %d regions", total, len(result)
        )
    finally:
        # Previously the pool was only closed on the happy path, leaking
        # connections whenever fetch/store raised.
        await pool.close()
For TrendVidStream's 8 regions, this async pipeline runs in ~3 seconds vs ~16 seconds sequential. The defaultAudioLanguage field from YouTube's API is invaluable for UAE content — it tells you whether a video is primarily Arabic, Hindi, English, or Urdu, which drives the language mix analytics.
This article is part of the Building TrendVidStream series. Check out TrendVidStream to see these techniques in action.
Top comments (0)