Async Video Metadata Pipeline with Python asyncio and aiohttp
Fetching trending video metadata from 9 regions sequentially takes about 18 seconds. Switching to asyncio with aiohttp drops that to under 3 seconds. Here's the async pipeline I built for TopVideoHub.
Why asyncio for YouTube API Calls
YouTube Data API calls are I/O-bound — your code spends most of its time waiting for HTTP responses. That's the perfect use case for asyncio: while one coroutine waits for Japan's trending data, another fetches Korea's, another Taiwan's. No OS threads needed.
The Core Fetch Coroutine
import asyncio
import aiohttp
import logging
from dataclasses import dataclass
from typing import Optional

# Module-level logger, configured by the entry point via basicConfig.
logger = logging.getLogger(__name__)

# Region codes polled for trending videos (Asia-Pacific plus US/GB).
REGIONS = ["JP", "KR", "TW", "SG", "VN", "TH", "HK", "US", "GB"]
# Base URL for the YouTube Data API v3.
YOUTUBE_BASE = "https://www.googleapis.com/youtube/v3"
@dataclass
class VideoMetadata:
    """Normalized metadata for one trending video in one region."""
    video_id: str       # YouTube video ID (item["id"] from the API)
    title: str          # Raw title; NFC-normalized at DB-write time
    channel_title: str  # Uploading channel's display name ("" if absent)
    region: str         # ISO 3166-1 alpha-2 region this entry was fetched for
    view_count: int     # statistics.viewCount, 0 when missing
    thumbnail_url: str  # Best-available thumbnail URL ("" if none)
    published_at: str   # snippet.publishedAt ISO-8601 string ("" if missing)
    category_id: int    # snippet.categoryId, 0 when missing
async def fetch_region(
    session: aiohttp.ClientSession,
    region: str,
    api_key: str,
    semaphore: asyncio.Semaphore,
    max_results: int = 50,
    max_retries: int = 2,
) -> list[VideoMetadata]:
    """Fetch trending videos for a single region, respecting rate limits.

    Args:
        session: Shared aiohttp session (reuses the connection pool).
        region: ISO 3166-1 alpha-2 region code, e.g. "JP".
        api_key: YouTube Data API key.
        semaphore: Caps the number of concurrent in-flight requests.
        max_results: Page size requested from the API (API maximum is 50).
        max_retries: Additional attempts after a 429 rate-limit response.

    Returns:
        Parsed VideoMetadata list; empty list on HTTP/timeout errors or
        when rate-limit retries are exhausted.
    """
    url = f"{YOUTUBE_BASE}/videos"
    params = {
        "part": "snippet,statistics,contentDetails",
        "chart": "mostPopular",
        "regionCode": region,
        "maxResults": max_results,
        "key": api_key,
    }
    timeout = aiohttp.ClientTimeout(total=30)
    for attempt in range(max_retries + 1):
        try:
            async with semaphore:  # Only N concurrent requests allowed
                async with session.get(url, params=params, timeout=timeout) as resp:
                    if resp.status != 429:
                        resp.raise_for_status()
                        data = await resp.json()
                        videos = [_parse_item(item, region) for item in data.get("items", [])]
                        logger.info(f"[{region}] Fetched {len(videos)} videos")
                        return videos
            # Fall through to the backoff below on 429.
            logger.warning(f"[{region}] Rate limited, retrying after backoff")
        except aiohttp.ClientError as e:
            logger.error(f"[{region}] HTTP error: {e}")
            return []
        except asyncio.TimeoutError:
            logger.error(f"[{region}] Request timed out")
            return []
        # BUG FIX: the original logged "retrying" but returned [] without
        # retrying. Back off (outside the semaphore, so other regions'
        # slots are not held during the sleep) and try again.
        await asyncio.sleep(5 * (attempt + 1))
    logger.error(f"[{region}] Still rate limited after {max_retries + 1} attempts")
    return []
def _parse_item(item: dict, region: str) -> VideoMetadata:
    """Convert one raw API item dict into a VideoMetadata record."""
    snippet = item["snippet"]
    stats = item.get("statistics", {})
    thumbnails = snippet.get("thumbnails", {})
    # Pick the highest-resolution thumbnail the API actually returned,
    # falling back to "" when no thumbnails are present at all.
    preferred_order = ("maxres", "standard", "high", "medium", "default")
    thumb_url = next(
        (thumbnails[quality]["url"] for quality in preferred_order if quality in thumbnails),
        "",
    )
    return VideoMetadata(
        video_id=item["id"],
        title=snippet["title"],
        channel_title=snippet.get("channelTitle", ""),
        region=region,
        view_count=int(stats.get("viewCount", 0)),
        thumbnail_url=thumb_url,
        published_at=snippet.get("publishedAt", ""),
        category_id=int(snippet.get("categoryId", 0)),
    )
Gathering All Regions
async def fetch_all_regions(
    api_key: str,
    regions: list[str] | None = None,
    max_concurrent: int = 5,
) -> dict[str, list[VideoMetadata]]:
    """Fetch all regions concurrently with controlled parallelism.

    Args:
        api_key: YouTube Data API key.
        regions: Region codes to fetch; defaults to REGIONS. (Using None as
            the sentinel avoids sharing the mutable module-level list as a
            default argument.)
        max_concurrent: Semaphore size limiting simultaneous requests.

    Returns:
        Mapping of region code -> list of videos (empty list on failure).
    """
    if regions is None:
        regions = REGIONS
    semaphore = asyncio.Semaphore(max_concurrent)
    connector = aiohttp.TCPConnector(
        limit=10,          # Max total simultaneous connections
        ttl_dns_cache=300, # Cache DNS lookups for 5 minutes
    )
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [
            fetch_region(session, region, api_key, semaphore)
            for region in regions
        ]
        # gather() runs all tasks concurrently; return_exceptions=True keeps
        # one region's failure from aborting the whole batch.
        results = await asyncio.gather(*tasks, return_exceptions=True)
    output = {}
    for region, result in zip(regions, results):
        if isinstance(result, Exception):
            logger.error(f"[{region}] Task exception: {result}")
            output[region] = []
        else:
            output[region] = result
    return output
Handling CJK Titles
Japanese, Korean, and Chinese titles from the API come back as proper Unicode. Python 3 handles this natively, but normalize before storing to ensure consistent representation:
import unicodedata
def normalize_cjk_title(title: str) -> str:
    """Return *title* NFC-normalized with control characters removed.

    NFC (the composed form, e.g. U+00E9 instead of 'e' + combining acute)
    gives equivalent CJK/Latin sequences one canonical representation, so
    equality checks and unique constraints behave consistently in storage.
    """
    normalized = unicodedata.normalize("NFC", title)
    kept = []
    for ch in normalized:
        # Drop control (Cc) and surrogate (Cs) code points; every other
        # category — including all CJK scripts — is preserved.
        if unicodedata.category(ch) not in ("Cc", "Cs"):
            kept.append(ch)
    return "".join(kept).strip()
Async Database Writes
Pair async fetching with async database writes for maximum throughput:
import asyncpg
async def bulk_upsert(pool: asyncpg.Pool, videos: list[VideoMetadata]) -> int:
    """Bulk upsert video rows using executemany for efficient batch writes.

    Titles are NFC-normalized before storage (see normalize_cjk_title).

    Args:
        pool: asyncpg connection pool.
        videos: Records to write; duplicates on (video_id, region) update
            view_count and fetched_at instead of inserting.

    Returns:
        Number of rows submitted (len(videos)).
    """
    # Don't acquire a connection for a no-op batch.
    if not videos:
        return 0
    rows = [
        (v.video_id, normalize_cjk_title(v.title), v.channel_title,
         v.region, v.view_count, v.thumbnail_url, v.published_at, v.category_id)
        for v in videos
    ]
    async with pool.acquire() as conn:
        await conn.executemany("""
        INSERT INTO videos
        (video_id, title, channel_title, region, view_count,
        thumbnail_url, published_at, category_id)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        ON CONFLICT (video_id, region) DO UPDATE SET
        view_count = EXCLUDED.view_count,
        fetched_at = now()
        """, rows)
    return len(videos)
async def main():
    """Entry point: fetch trending metadata for all regions and upsert to DB.

    Requires YOUTUBE_API_KEY and DATABASE_URL in the environment; raises
    KeyError if either is missing.
    """
    import os
    api_key = os.environ["YOUTUBE_API_KEY"]
    db_url = os.environ["DATABASE_URL"]
    pool = await asyncpg.create_pool(db_url, min_size=2, max_size=5)
    try:
        all_videos = await fetch_all_regions(api_key)
        # Flatten {region: [videos]} into one list for a single bulk write.
        flat_videos = [v for videos in all_videos.values() for v in videos]
        inserted = await bulk_upsert(pool, flat_videos)
        logger.info(f"Upserted {inserted} videos from {len(all_videos)} regions")
    finally:
        # BUG FIX: close the pool even when fetching or upserting raises,
        # so connections aren't leaked on failure.
        await pool.close()
# Script entry point: configure INFO-level logging and run the pipeline.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())
Performance Results
For TopVideoHub's 9 Asia-Pacific regions:
| Approach | Time | Notes |
|---|---|---|
| Sequential requests | ~18s | Simple loop |
| asyncio semaphore=3 | ~6s | 3 concurrent |
| asyncio semaphore=9 | ~2.5s | All parallel |
The semaphore value of 3-5 is the sweet spot — enough parallelism to be fast, few enough concurrent connections to avoid triggering YouTube's rate limiting. The return_exceptions=True argument to gather() is critical: without it, the first region failure would propagate out of gather() immediately — the other tasks keep running, but their results would be discarded instead of collected.
This article is part of the Building TopVideoHub series. Check out TopVideoHub to see these techniques in action.
Top comments (0)