Processing Video Data from 7 European Regions
ViralVidVault fetches trending videos from Poland, Netherlands, Sweden, Norway, Austria, UK, and US every 7 hours. Each API call has 200-500ms of network latency. Run sequentially, that adds up to 1.4-3.5 seconds of waiting per cycle. With Python's asyncio, all 7 fetches run concurrently and the total time drops to roughly one round-trip.
The Fetcher with Regional Awareness
European APIs sometimes have quirks — rate limit differences, occasional regional outages, content encoding issues with non-ASCII characters. The fetcher accounts for these:
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone

import aiohttp
@dataclass
class RegionalVideo:
    """One trending video as fetched from a single regional chart."""

    video_id: str
    title: str
    channel: str
    views: int
    region: str           # region code the video was trending in, e.g. "PL"
    language: str = "en"  # looked up from REGION_LANGUAGES by the fetcher
    # Timezone-aware UTC timestamp of the fetch. datetime.utcnow() is
    # deprecated (Python 3.12+) and returns a naive datetime, which is easy
    # to mis-compare; datetime.now(timezone.utc) is the supported spelling.
    fetched_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
# Region code (ISO 3166-1 alpha-2) -> primary language code (ISO 639-1).
# Used by the fetcher to tag each RegionalVideo; regions absent from this
# map fall back to "en" via REGION_LANGUAGES.get(region, "en").
REGION_LANGUAGES = {
    "PL": "pl", "NL": "nl", "SE": "sv",
    "NO": "no", "AT": "de", "GB": "en", "US": "en",
}
class EuropeanFetcher:
    """Concurrently fetch trending-video charts for multiple regions.

    An asyncio.Semaphore caps the number of in-flight HTTP requests at
    ``max_concurrent`` so the fetcher stays within API rate limits.
    """

    def __init__(self, api_key: str, max_concurrent: int = 3):
        self.api_key = api_key
        # Acquired once per HTTP attempt in fetch_region (never held across
        # a backoff sleep — see fetch_region).
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # Aggregate counters for a single pipeline run.
        self.stats = {"fetched": 0, "errors": 0, "regions_ok": []}

    async def fetch_region(
        self,
        session: aiohttp.ClientSession,
        region: str,
        max_retries: int = 3,
    ) -> list[RegionalVideo]:
        """Fetch the mostPopular chart for one region.

        Retries up to ``max_retries`` times on HTTP 429, honouring the
        Retry-After header. Returns [] (and increments the error counter)
        on timeout, client error, or when retries are exhausted.
        """
        url = "https://www.googleapis.com/youtube/v3/videos"
        params = {
            "part": "snippet,statistics,contentDetails",
            "chart": "mostPopular",
            "regionCode": region,
            "maxResults": 25,
            "key": self.api_key,
        }
        for _attempt in range(max_retries + 1):
            wait = 0
            # BUGFIX: the previous version recursed on 429 while still
            # holding the semaphore, so a retry needed a *second* permit.
            # With Semaphore(3) and three rate-limited regions, every permit
            # is held by a task waiting for another -> deadlock. Here the
            # semaphore is held only for the duration of one attempt.
            async with self.semaphore:
                try:
                    async with session.get(
                        url,
                        params=params,
                        timeout=aiohttp.ClientTimeout(total=10),
                    ) as resp:
                        if resp.status == 429:
                            wait = int(resp.headers.get("Retry-After", 5))
                            print(f"[{region}] Rate limited, waiting {wait}s")
                        else:
                            resp.raise_for_status()
                            data = await resp.json()
                            lang = REGION_LANGUAGES.get(region, "en")
                            videos = [
                                RegionalVideo(
                                    video_id=item["id"],
                                    title=item["snippet"]["title"],
                                    channel=item["snippet"].get("channelTitle", ""),
                                    views=int(item["statistics"].get("viewCount", 0)),
                                    region=region,
                                    language=lang,
                                )
                                for item in data.get("items", [])
                            ]
                            self.stats["fetched"] += len(videos)
                            self.stats["regions_ok"].append(region)
                            return videos
                except asyncio.TimeoutError:
                    print(f"[{region}] Timeout")
                    self.stats["errors"] += 1
                    return []
                except aiohttp.ClientError as e:
                    print(f"[{region}] Error: {e}")
                    self.stats["errors"] += 1
                    return []
            # 429 path: back off with the semaphore released so other
            # regions can make progress while this one waits, then retry.
            await asyncio.sleep(wait)
        # Still rate limited after all retries — give up for this region.
        print(f"[{region}] Giving up after {max_retries} retries")
        self.stats["errors"] += 1
        return []

    async def fetch_all(self) -> dict[str, list[RegionalVideo]]:
        """Fetch every configured region concurrently.

        Returns a mapping of region code -> video list; a region that raised
        an unexpected exception maps to [].
        """
        regions = ["US", "GB", "PL", "NL", "SE", "NO", "AT"]
        async with aiohttp.ClientSession() as session:
            # BUGFIX: the previous version also built a dict of fetch_region
            # coroutines that was never awaited (dead code emitting
            # "coroutine was never awaited" warnings). One coroutine per
            # region is created exactly once, here.
            # return_exceptions=True keeps one failing region from
            # cancelling the others.
            gathered = await asyncio.gather(
                *(self.fetch_region(session, r) for r in regions),
                return_exceptions=True,
            )
        results: dict[str, list[RegionalVideo]] = {}
        for region, result in zip(regions, gathered):
            if isinstance(result, Exception):
                print(f"[{region}] Exception: {result}")
                results[region] = []
            else:
                results[region] = result
        return results
The REGION_LANGUAGES mapping is essential for downstream processing — it tells the search indexer which PostgreSQL dictionary to use and helps with content categorization.
Deduplication with Region Merging
A viral music video might trend in Poland, Netherlands, and Sweden simultaneously:
def deduplicate(region_results: dict[str, list[RegionalVideo]]) -> list[dict]:
    """Collapse per-region video lists into one list of unique videos.

    The first region that contributes a video fixes its title, channel and
    primary_language; every later sighting of the same video_id appends its
    region code and keeps the highest view count observed anywhere.
    """
    merged: dict[str, dict] = {}
    for region, videos in region_results.items():
        for vid in videos:
            entry = merged.get(vid.video_id)
            if entry is None:
                # First sighting: record the video under this region.
                merged[vid.video_id] = {
                    "video_id": vid.video_id,
                    "title": vid.title,
                    "channel": vid.channel,
                    "views": vid.views,
                    "regions": [region],
                    "primary_language": vid.language,
                }
            else:
                # Seen before: merge region list and take the peak view count.
                entry["regions"].append(region)
                entry["views"] = max(entry["views"], vid.views)
    return list(merged.values())
For ViralVidVault, cross-region overlap is less than 5% among European regions — much lower than you might expect. This means most videos appear in exactly one region's trending feed.
Running the Full Pipeline
async def run_pipeline(api_key: str):
    """Run the fetch → dedup → store pipeline and return summary counts."""
    fetcher = EuropeanFetcher(api_key, max_concurrent=3)

    # Stage 1: pull every region's chart concurrently.
    raw = await fetcher.fetch_all()
    total_raw = sum(map(len, raw.values()))
    print(f"Fetched {total_raw} videos from {len(fetcher.stats['regions_ok'])} regions")

    # Stage 2: collapse videos trending in more than one region.
    unique = deduplicate(raw)
    print(f"After dedup: {len(unique)} unique videos")

    # Stage 3: Store (using your DB layer)
    # await store_videos(unique)

    return {"raw": total_raw, "unique": len(unique), "stats": fetcher.stats}
if __name__ == "__main__":
    import os
    # Requires YOUTUBE_API_KEY in the environment; os.environ[...] raises
    # KeyError with a clear message if it is missing.
    result = asyncio.run(run_pipeline(os.environ["YOUTUBE_API_KEY"]))
    print(f"Pipeline complete: {result}")
Performance Numbers
On a typical 7-region fetch for ViralVidVault:
| Approach | Total Time |
|---|---|
| Sequential for loop | 2.8s |
| asyncio.gather (unlimited) | 0.45s |
| asyncio + Semaphore(3) | 0.7s |
The semaphore version is 4x faster than sequential while respecting rate limits. The full pipeline (fetch + dedup + store) completes in about 1.5 seconds.
This article is part of the Building ViralVidVault series. Check out ViralVidVault to see these techniques in action.
Top comments (0)