DEV Community

ahmet gedik
ahmet gedik

Posted on

Async Video Pipeline with Python asyncio for Multi-Region Fetching

Async Video Pipeline with Python asyncio for Multi-Region Fetching

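TrendVidStream fetches trending video data from 8 regions: US, GB, AE (UAE), FI (Finland), DK (Denmark), CZ (Czech Republic), BE (Belgium), and CH (Switzerland). Fetching the regions one after another is slow because each request waits for the previous one to finish; asyncio lets the requests overlap, which cuts the total run time.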

Here's the async pipeline I built, including the quirks specific to a diverse multi-region set.

Core Fetch Function

import asyncio
import aiohttp
import logging
from dataclasses import dataclass, field
from typing import Optional

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Region codes fetched by default (the default `regions` argument of fetch_all).
REGIONS = ["AE", "FI", "DK", "CZ", "BE", "CH", "US", "GB"]
# Base URL of the YouTube Data API v3; endpoint paths such as "/videos" are appended to it.
YT_BASE = "https://www.googleapis.com/youtube/v3"


@dataclass
class VideoMetadata:
    """One trending video as reported for a single region.

    Instances are built by ``_parse`` from one item of the API response.
    The class is a plain mutable dataclass; ``process_and_store`` rewrites
    ``title`` and may fill in ``language`` in place.
    """

    # The item's "id" field.
    video_id: str
    # Snippet "title"; empty string when the snippet has none.
    title: str
    # Snippet "channelTitle"; empty string when absent.
    channel_title: str
    # Region code the video was fetched for.
    region: str
    # Statistics "viewCount" converted to int; 0 when absent.
    view_count: int
    # URL of the first thumbnail found among maxres, standard, high, medium,
    # default; empty string when none of them is present.
    thumbnail_url: str
    # Snippet "publishedAt", kept as the string the API returned.
    published_at: str
    # Snippet "categoryId" converted to int; 0 when absent.
    category_id: int
    # Snippet "defaultAudioLanguage"; None when the API does not provide it.
    language: Optional[str] = None


async def fetch_trending(
    session: aiohttp.ClientSession,
    region: str,
    api_key: str,
    sem: asyncio.Semaphore,
) -> list[VideoMetadata]:
    """Return the trending videos of one region, or ``[]`` on any failure.

    The request is issued while holding ``sem``, so the number of requests
    in flight is bounded by the semaphore's size.

    Args:
        session: Shared aiohttp session used to issue the request.
        region: Region code sent as ``regionCode``.
        api_key: API key sent as ``key``.
        sem: Semaphore limiting how many fetches run at once.

    Returns:
        One ``VideoMetadata`` per item in the response. An empty list is
        returned on HTTP 403, HTTP 429, a timeout, or any other aiohttp
        client error (each case is logged).
    """
    url = f"{YT_BASE}/videos"
    query = {
        "part": "snippet,statistics",
        "chart": "mostPopular",
        "regionCode": region,
        "maxResults": 50,
        "key": api_key,
    }
    request_timeout = aiohttp.ClientTimeout(total=30)

    async with sem:
        try:
            async with session.get(
                url, params=query, timeout=request_timeout
            ) as response:
                status = response.status
                if status == 403:
                    logger.error(f"[{region}] 403 Forbidden — check API key quota")
                    return []
                if status == 429:
                    logger.warning(f"[{region}] Rate limited — sleeping 5s")
                    # The semaphore slot stays held during the sleep, so
                    # regions queued behind this one are delayed as well.
                    await asyncio.sleep(5)
                    # No retry here; the next scheduled run picks it up.
                    return []
                # Any remaining 4xx/5xx becomes a ClientError handled below.
                response.raise_for_status()
                payload = await response.json()
                videos = []
                for entry in payload.get("items", []):
                    videos.append(_parse(entry, region))
                return videos
        except asyncio.TimeoutError:
            logger.error(f"[{region}] Timeout")
            return []
        except aiohttp.ClientError as exc:
            logger.error(f"[{region}] HTTP error: {exc}")
            return []


def _parse(item: dict, region: str) -> VideoMetadata:
    """Convert one API response item into a ``VideoMetadata``.

    Args:
        item: A single entry of the response's ``items`` list. It must
            contain ``"snippet"`` and ``"id"``; ``"statistics"`` is optional.
        region: Region code the item was fetched for.

    Returns:
        The populated ``VideoMetadata``. Missing optional fields fall back
        to ``""``, ``0`` or ``None`` depending on the field.

    Raises:
        KeyError: If ``"snippet"`` or ``"id"`` is missing, or a selected
            thumbnail entry has no ``"url"``.
    """
    snippet = item["snippet"]
    statistics = item.get("statistics", {})
    thumbnails = snippet.get("thumbnails", {})

    # Take the first thumbnail size that is present, in this preference order.
    thumbnail_url = ""
    for quality in ("maxres", "standard", "high", "medium", "default"):
        if quality in thumbnails:
            thumbnail_url = thumbnails[quality]["url"]
            break

    return VideoMetadata(
        video_id=item["id"],
        title=snippet.get("title", ""),
        channel_title=snippet.get("channelTitle", ""),
        region=region,
        view_count=int(statistics.get("viewCount", 0)),
        thumbnail_url=thumbnail_url,
        published_at=snippet.get("publishedAt", ""),
        category_id=int(snippet.get("categoryId", 0)),
        # Language code as reported by the API, e.g. 'ar' or 'fi'.
        language=snippet.get("defaultAudioLanguage"),
    )
Enter fullscreen mode Exit fullscreen mode

Concurrent Fetch for All Regions

async def fetch_all(
    api_key: str,
    regions: list[str] = REGIONS,
    concurrency: int = 4,
) -> dict[str, list[VideoMetadata]]:
    """Fetch trending videos for every region with bounded concurrency.

    Args:
        api_key: API key passed to each per-region fetch.
        regions: Region codes to fetch. The argument is only read, never
            mutated.
        concurrency: Maximum number of per-region fetches running at once.
            The connector itself allows at most 8 simultaneous connections.

    Returns:
        A mapping of region code to its list of videos. A region whose
        fetch raised maps to an empty list, and the failure is logged.
    """
    # Materialise once so the same sequence drives both the task list and
    # the final zip, even if the caller passed a one-shot iterable.
    region_codes = list(regions)
    sem = asyncio.Semaphore(concurrency)
    connector = aiohttp.TCPConnector(limit=8, ttl_dns_cache=300)

    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [
            asyncio.create_task(fetch_trending(session, code, api_key, sem))
            for code in region_codes
        ]
        # return_exceptions=True keeps one failing region from cancelling
        # the others; failures come back as exception objects in `outcomes`.
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)

    videos_by_region = {}
    for code, outcome in zip(region_codes, outcomes):
        # BaseException also covers CancelledError, which is not an
        # Exception subclass and would otherwise be stored as the "list".
        if isinstance(outcome, BaseException):
            logger.error("[%s] Fetch failed: %r", code, outcome)
            videos_by_region[code] = []
        else:
            videos_by_region[code] = outcome
    return videos_by_region
Enter fullscreen mode Exit fullscreen mode

Handling Arabic Text from UAE

Arabic titles from the UAE feed arrive as UTF-8, which Python handles natively. There is one subtlety: such titles can contain invisible Unicode bidirectional (BiDi) control characters, which can cause display issues if stored raw:

import unicodedata
import re

# Translation table that deletes Unicode bidirectional formatting characters:
#   U+061C           ARABIC LETTER MARK
#   U+200E, U+200F   LEFT-TO-RIGHT MARK, RIGHT-TO-LEFT MARK
#   U+202A - U+202E  embeddings, overrides and POP DIRECTIONAL FORMATTING
#   U+2066 - U+2069  isolates and POP DIRECTIONAL ISOLATE
RTL_OVERRIDES = str.maketrans(
    '',
    '',
    '\u061c\u200e\u200f'
    '\u202a\u202b\u202c\u202d\u202e'
    '\u2066\u2067\u2068\u2069',
)


def clean_title(title: str) -> str:
    """Remove BiDi control characters and normalize the title to NFC.

    Args:
        title: Raw title text.

    Returns:
        The title with all bidirectional formatting characters deleted,
        canonically composed (NFC), and with surrounding whitespace
        stripped.
    """
    title = title.translate(RTL_OVERRIDES)  # Drop invisible direction controls
    title = unicodedata.normalize('NFC', title)  # Canonical composition
    return title.strip()
Enter fullscreen mode Exit fullscreen mode

Processing Pipeline with Language Detection

import asyncpg
from langdetect import detect, LangDetectException  # pip install langdetect

async def process_and_store(
    pool: asyncpg.Pool,
    all_videos: dict[str, list[VideoMetadata]]
) -> dict[str, int]:
    """Clean each region's videos and bulk-upsert them into PostgreSQL.

    Every video's title is cleaned in place. A video without a language but
    with a non-empty title gets a detected language, or ``'und'`` when
    detection fails. Each region is then written with one ``executemany``
    call; on a ``(video_id, region)`` conflict only ``view_count`` and
    ``fetched_at`` are updated.

    Args:
        pool: Connection pool; one connection is acquired per region.
        all_videos: Mapping of region code to that region's videos. The
            video objects are modified in place.

    Returns:
        Mapping of region code to the number of videos submitted for it.
    """
    processed = {}

    for region, videos in all_videos.items():
        clean_videos = list(videos)

        for video in clean_videos:
            # Strip direction markers first (matters most for Arabic titles).
            video.title = clean_title(video.title)

            # Detection is only needed when the API gave no language and
            # there is some text to inspect.
            if video.language or not video.title:
                continue
            try:
                video.language = detect(video.title)
            except LangDetectException:
                video.language = 'und'  # Undetermined

        # One parameter tuple per video, in the column order of the INSERT.
        rows = [
            (
                video.video_id,
                video.title,
                video.channel_title,
                video.region,
                video.view_count,
                video.thumbnail_url,
                video.published_at,
                video.category_id,
                video.language,
            )
            for video in clean_videos
        ]

        async with pool.acquire() as conn:
            await conn.executemany("""
                INSERT INTO videos
                    (video_id, title, channel_title, region, view_count,
                     thumbnail_url, published_at, category_id, language)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                ON CONFLICT (video_id, region) DO UPDATE SET
                    view_count = EXCLUDED.view_count,
                    fetched_at = now()
            """, rows)

        processed[region] = len(clean_videos)
        logger.info(f"[{region}] Stored {len(clean_videos)} videos")

    return processed


async def main():
    """Run one full cycle: fetch all regions, then store the results.

    Reads ``YOUTUBE_API_KEY`` and ``DATABASE_URL`` from the environment.
    The connection pool is closed on every exit path, including when
    fetching or storing raises.

    Raises:
        KeyError: If either environment variable is not set.
    """
    import os

    api_key = os.environ["YOUTUBE_API_KEY"]
    db_url = os.environ["DATABASE_URL"]

    pool = await asyncpg.create_pool(db_url, min_size=2, max_size=5)
    try:
        all_videos = await fetch_all(api_key)
        result = await process_and_store(pool, all_videos)

        total = sum(result.values())
        logger.info(f"Pipeline complete: {total} videos across {len(result)} regions")
    finally:
        # Release the pooled connections even if a step above failed.
        await pool.close()
Enter fullscreen mode Exit fullscreen mode

For TrendVidStream's 8 regions, this async pipeline runs in ~3 seconds vs ~16 seconds sequential. The defaultAudioLanguage field from YouTube's API is invaluable for UAE content — it tells you whether a video is primarily Arabic, Hindi, English, or Urdu, which drives the language mix analytics.


This article is part of the Building TrendVidStream series. Check out TrendVidStream to see these techniques in action.

Top comments (0)