DEV Community

ahmet gedik
ahmet gedik

Posted on

Async Video Metadata Pipeline with Python asyncio and aiohttp

Async Video Metadata Pipeline with Python asyncio and aiohttp

Fetching trending video metadata from 9 regions sequentially takes about 18 seconds. Switching to asyncio with aiohttp drops that to under 3 seconds. Here's the async pipeline I built for TopVideoHub.

Why asyncio for YouTube API Calls

YouTube Data API calls are I/O-bound — your code spends most of its time waiting for HTTP responses. That's the perfect use case for asyncio: while one coroutine waits for Japan's trending data, another fetches Korea's, another Taiwan's. No OS threads needed.

The Core Fetch Coroutine

import asyncio
import aiohttp
import logging
from dataclasses import dataclass
from typing import Optional

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Region codes fetched by default; each one is sent as the "regionCode" request parameter.
REGIONS = ["JP", "KR", "TW", "SG", "VN", "TH", "HK", "US", "GB"]
# Base URL of the YouTube Data API v3; endpoint paths such as "/videos" are appended to it.
YOUTUBE_BASE = "https://www.googleapis.com/youtube/v3"


@dataclass
class VideoMetadata:
    """One trending-video record, tagged with the region it was fetched for."""
    video_id: str  # the API item's "id"
    title: str  # snippet "title"
    channel_title: str  # snippet "channelTitle"; "" when the API omits it
    region: str  # region code the record was requested with
    view_count: int  # statistics "viewCount" converted to int; 0 when absent
    thumbnail_url: str  # URL of the best thumbnail found; "" when none is listed
    published_at: str  # snippet "publishedAt", kept as the raw string; "" when absent
    category_id: int  # snippet "categoryId" converted to int; 0 when absent


async def fetch_region(
    session: aiohttp.ClientSession,
    region: str,
    api_key: str,
    semaphore: asyncio.Semaphore,
    max_results: int = 50,
    max_retries: int = 2,
    backoff_seconds: float = 5.0,
) -> list[VideoMetadata]:
    """Fetch trending videos for a single region, respecting rate limits.

    Args:
        session: Shared aiohttp session used to issue the request.
        region: Region code sent as the ``regionCode`` parameter.
        api_key: API key sent as the ``key`` parameter.
        semaphore: Bounds how many requests are in flight at once.
        max_results: Value sent as the ``maxResults`` parameter.
        max_retries: How many times to retry after an HTTP 429 response.
            ``0`` disables retrying.
        backoff_seconds: Delay before the first retry; doubled on each
            subsequent retry.

    Returns:
        The parsed videos, or an empty list when the request fails, times
        out, or is still rate limited after the last retry.
    """
    url = f"{YOUTUBE_BASE}/videos"
    params = {
        "part": "snippet,statistics,contentDetails",
        "chart": "mostPopular",
        "regionCode": region,
        "maxResults": max_results,
        "key": api_key,
    }
    timeout = aiohttp.ClientTimeout(total=30)
    retries = max(0, max_retries)

    for attempt in range(retries + 1):
        try:
            # Hold a semaphore slot only while the request is in flight, so a
            # region that is backing off does not block the other regions.
            async with semaphore:
                async with session.get(url, params=params, timeout=timeout) as resp:
                    if resp.status != 429:
                        resp.raise_for_status()
                        data = await resp.json()
                        videos = [_parse_item(item, region) for item in data.get("items", [])]
                        logger.info(f"[{region}] Fetched {len(videos)} videos")
                        return videos
        except aiohttp.ClientResponseError as e:
            # str(e) embeds the request URL, whose query string carries the
            # API key, so log only the status and message.
            logger.error(f"[{region}] HTTP error: {e.status} {e.message}")
            return []
        except aiohttp.ClientError as e:
            logger.error(f"[{region}] HTTP error: {e}")
            return []
        except asyncio.TimeoutError:
            logger.error(f"[{region}] Request timed out")
            return []

        # Only a 429 response falls through to here.
        if attempt >= retries:
            logger.error(f"[{region}] Rate limited, giving up after {attempt + 1} attempts")
            return []
        logger.warning(f"[{region}] Rate limited, retrying after backoff")
        await asyncio.sleep(backoff_seconds * (2 ** attempt))

    return []


def _parse_item(item: dict, region: str) -> VideoMetadata:
    """Convert one raw API item into a VideoMetadata tagged with ``region``.

    The thumbnail URL is taken from the first size present, tried from the
    largest ("maxres") down to "default"; it is "" when none is listed.
    """
    snippet = item["snippet"]
    statistics = item.get("statistics", {})
    thumbnails = snippet.get("thumbnails", {})

    # Preference order: best quality first.
    preferred_sizes = ("maxres", "standard", "high", "medium", "default")
    best_thumbnail = next(
        (thumbnails[size]["url"] for size in preferred_sizes if size in thumbnails),
        "",
    )

    return VideoMetadata(
        video_id=item["id"],
        title=snippet["title"],
        channel_title=snippet.get("channelTitle", ""),
        region=region,
        view_count=int(statistics.get("viewCount", 0)),
        thumbnail_url=best_thumbnail,
        published_at=snippet.get("publishedAt", ""),
        category_id=int(snippet.get("categoryId", 0)),
    )
Enter fullscreen mode Exit fullscreen mode

Gathering All Regions

async def fetch_all_regions(
    api_key: str,
    regions: list[str] = REGIONS,
    max_concurrent: int = 5,
) -> dict[str, list[VideoMetadata]]:
    """Fetch all regions concurrently with controlled parallelism.

    Args:
        api_key: API key forwarded to every per-region fetch.
        regions: Region codes to fetch; defaults to the module-level REGIONS.
        max_concurrent: Upper bound on simultaneously running requests.

    Returns:
        A dict mapping each region code to its list of videos. A region
        whose fetch raised (or was cancelled) maps to an empty list.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    connector = aiohttp.TCPConnector(
        limit=10,           # Max total simultaneous connections
        ttl_dns_cache=300,  # Cache DNS lookups for 5 minutes
    )

    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [
            fetch_region(session, region, api_key, semaphore)
            for region in regions
        ]
        # return_exceptions=True puts failures into the result list instead
        # of raising, so one bad region cannot discard the others' results.
        results = await asyncio.gather(*tasks, return_exceptions=True)

    output: dict[str, list[VideoMetadata]] = {}
    for region, result in zip(regions, results):
        # BaseException rather than Exception: a cancelled child is reported
        # as CancelledError, which does not derive from Exception and would
        # otherwise be stored as if it were a list of videos.
        if isinstance(result, BaseException):
            logger.error(f"[{region}] Task exception: {result}")
            output[region] = []
        else:
            output[region] = result

    return output
Enter fullscreen mode Exit fullscreen mode

Handling CJK Titles

Japanese, Korean, and Chinese titles from the API come back as proper Unicode. Python 3 handles this natively, but normalize before storing to ensure consistent representation:

import unicodedata

def normalize_cjk_title(title: str) -> str:
    """Return ``title`` in NFC form with control and surrogate characters removed.

    NFC is the composed form (a base letter plus a combining mark becomes a
    single code point). Characters in Unicode categories "Cc" (control) and
    "Cs" (surrogate) are dropped; everything else, including all CJK
    scripts, is kept. Leading and trailing whitespace is stripped last.
    """
    unwanted_categories = ("Cc", "Cs")
    composed = unicodedata.normalize("NFC", title)

    kept_chars = []
    for char in composed:
        if unicodedata.category(char) in unwanted_categories:
            continue
        kept_chars.append(char)

    return "".join(kept_chars).strip()
Enter fullscreen mode Exit fullscreen mode

Async Database Writes

Pair async fetching with async database writes for maximum throughput:

import asyncpg

async def bulk_upsert(pool: asyncpg.Pool, videos: list[VideoMetadata]) -> int:
    """Write ``videos`` in one executemany batch and return how many were passed in.

    Rows that collide on (video_id, region) only have their view_count and
    fetched_at refreshed. Titles go through normalize_cjk_title first.
    """
    upsert_sql = """
            INSERT INTO videos
                (video_id, title, channel_title, region, view_count,
                 thumbnail_url, published_at, category_id)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            ON CONFLICT (video_id, region) DO UPDATE SET
                view_count = EXCLUDED.view_count,
                fetched_at = now()
        """
    async with pool.acquire() as connection:
        # One parameter tuple per video, in the column order of the INSERT.
        rows = []
        for video in videos:
            rows.append((
                video.video_id,
                normalize_cjk_title(video.title),
                video.channel_title,
                video.region,
                video.view_count,
                video.thumbnail_url,
                video.published_at,
                video.category_id,
            ))
        await connection.executemany(upsert_sql, rows)
    return len(videos)


async def main():
    """Fetch trending videos for every region and upsert them into the database.

    Reads the API key from YOUTUBE_API_KEY and the database URL from
    DATABASE_URL.

    Raises:
        KeyError: If either environment variable is missing.
    """
    import os
    api_key = os.environ["YOUTUBE_API_KEY"]
    db_url = os.environ["DATABASE_URL"]

    pool = await asyncpg.create_pool(db_url, min_size=2, max_size=5)
    try:
        all_videos = await fetch_all_regions(api_key)
        flat_videos = [v for videos in all_videos.values() for v in videos]

        inserted = await bulk_upsert(pool, flat_videos)
        logger.info(f"Upserted {inserted} videos from {len(all_videos)} regions")
    finally:
        # Close the pool even when fetching or writing fails, so its
        # connections are not left open.
        await pool.close()


# Script entry point: runs the pipeline only when executed directly, not on import.
if __name__ == "__main__":
    # INFO level so the per-region progress messages are shown.
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())
Enter fullscreen mode Exit fullscreen mode

Performance Results

For TopVideoHub's 9 regions (seven Asia-Pacific regions plus US and GB):

| Approach | Time | Notes |
| --- | --- | --- |
| Sequential requests | ~18s | Simple loop |
| asyncio, semaphore=3 | ~6s | 3 concurrent |
| asyncio, semaphore=9 | ~2.5s | All parallel |

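A semaphore value of 3–5 is the sweet spot — enough parallelism to be fast, yet few enough concurrent connections to avoid triggering YouTube's rate limiting. The return_exceptions=True argument to gather() is critical: without it, the first exception raised for any region would propagate out of gather() immediately, and the results from every other region would be lost. (gather() itself does not cancel the remaining requests, but leaving the session's async with block closes the session underneath them.)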


This article is part of the Building TopVideoHub series. Check out TopVideoHub to see these techniques in action.

Top comments (0)