DEV Community

ahmet gedik


Async Video Processing Pipeline with Python for European Content

Processing Video Data from 7 Regions

ViralVidVault fetches trending videos from Poland, the Netherlands, Sweden, Norway, Austria, the UK, and the US every 7 hours. Each API call has 200-500 ms of network latency, so seven sequential calls spend 1.4-3.5 seconds waiting on the network. With Python's asyncio, all 7 fetches can run concurrently, and the total wait drops to roughly that of the slowest single round-trip.
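The difference between the two approaches can be sketched without touching the network. This is a minimal illustration, not the production fetcher: `fake_fetch` and the scaled-down `LATENCY` value are assumptions that stand in for a real API call.

```python
import asyncio
import time

REGIONS = ["US", "GB", "PL", "NL", "SE", "NO", "AT"]
LATENCY = 0.05  # assumed per-call latency in seconds (scaled down for the demo)

async def fake_fetch(region: str) -> str:
    # Hypothetical stand-in for one API call: just waits, then returns.
    await asyncio.sleep(LATENCY)
    return region

async def sequential() -> float:
    # Each call waits for the previous one to finish.
    start = time.perf_counter()
    for region in REGIONS:
        await fake_fetch(region)
    return time.perf_counter() - start

async def concurrent() -> float:
    # All calls are started together and awaited as a group.
    start = time.perf_counter()
    await asyncio.gather(*(fake_fetch(r) for r in REGIONS))
    return time.perf_counter() - start

async def compare() -> tuple[float, float]:
    return await sequential(), await concurrent()

if __name__ == "__main__":
    seq, conc = asyncio.run(compare())
    print(f"sequential: {seq:.2f}s, concurrent: {conc:.2f}s")
```

The sequential time grows with the number of regions, while the concurrent time stays close to a single call's latency.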

The Fetcher with Regional Awareness

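European APIs sometimes have quirks: rate limit differences, occasional regional outages, and content encoding issues with non-ASCII characters. The fetcher accounts for the first two by capping concurrency with a semaphore, backing off on HTTP 429, and treating a timeout or client error as an empty result for that region: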

import asyncio
import aiohttp
from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass
class RegionalVideo:
    video_id: str
    title: str
    channel: str
    views: int
    region: str
    language: str = "en"
    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
    fetched_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

REGION_LANGUAGES = {
    "PL": "pl", "NL": "nl", "SE": "sv",
    "NO": "no", "AT": "de", "GB": "en", "US": "en",
}

class EuropeanFetcher:
    def __init__(self, api_key: str, max_concurrent: int = 3):
        self.api_key = api_key
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.stats = {"fetched": 0, "errors": 0, "regions_ok": []}

    async def fetch_region(self, session: aiohttp.ClientSession, region: str,
                           max_retries: int = 3) -> list[RegionalVideo]:
        url = "https://www.googleapis.com/youtube/v3/videos"
        params = {
            "part": "snippet,statistics,contentDetails",
            "chart": "mostPopular",
            "regionCode": region,
            "maxResults": 25,
            "key": self.api_key,
        }
        for attempt in range(max_retries + 1):
            wait = 5  # default back-off in seconds when rate limited
            try:
                # Hold a semaphore slot only while the request is in flight.
                # Retrying while still holding the slot can deadlock when
                # several regions are rate limited at the same time.
                async with self.semaphore:
                    async with session.get(url, params=params,
                                           timeout=aiohttp.ClientTimeout(total=10)) as resp:
                        if resp.status != 429:
                            resp.raise_for_status()
                            data = await resp.json()
                            lang = REGION_LANGUAGES.get(region, "en")

                            videos = [
                                RegionalVideo(
                                    video_id=item["id"],
                                    title=item["snippet"]["title"],
                                    channel=item["snippet"].get("channelTitle", ""),
                                    views=int(item.get("statistics", {}).get("viewCount", 0)),
                                    region=region,
                                    language=lang,
                                )
                                for item in data.get("items", [])
                            ]
                            self.stats["fetched"] += len(videos)
                            self.stats["regions_ok"].append(region)
                            return videos

                        # Retry-After may also be an HTTP date; keep the default then
                        retry_after = resp.headers.get("Retry-After", "")
                        if retry_after.isdigit():
                            wait = int(retry_after)

            except asyncio.TimeoutError:
                print(f"[{region}] Timeout")
                self.stats["errors"] += 1
                return []
            except aiohttp.ClientError as e:
                print(f"[{region}] Error: {e}")
                self.stats["errors"] += 1
                return []

            # Rate limited: sleep outside the semaphore, then retry
            if attempt < max_retries:
                print(f"[{region}] Rate limited, waiting {wait}s")
                await asyncio.sleep(wait)

        print(f"[{region}] Still rate limited after {max_retries} retries")
        self.stats["errors"] += 1
        return []

    async def fetch_all(self) -> dict[str, list[RegionalVideo]]:
        regions = ["US", "GB", "PL", "NL", "SE", "NO", "AT"]
        async with aiohttp.ClientSession() as session:
            results = {}
            gathered = await asyncio.gather(
                *[self.fetch_region(session, r) for r in regions],
                return_exceptions=True,
            )
            for region, result in zip(regions, gathered):
                if isinstance(result, Exception):
                    print(f"[{region}] Exception: {result}")
                    results[region] = []
                else:
                    results[region] = result
            return results
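
The fetcher reads the Retry-After header when it is rate limited. HTTP allows that header to carry either a number of seconds or an HTTP date, so a small parser can cover both forms. The sketch below is one way to do it with the standard library; `parse_retry_after` and its 5-second default are hypothetical names and values, not part of the original fetcher.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional

def parse_retry_after(value: Optional[str], default: float = 5.0,
                      now: Optional[datetime] = None) -> float:
    """Return the number of seconds to wait for a Retry-After header value."""
    if not value:
        return default
    value = value.strip()
    if value.isdigit():
        # Delta-seconds form, e.g. "120"
        return float(value)
    try:
        # HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # Unparseable value: fall back to the default wait
        return default
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # A date in the past means no wait is needed
    return max(0.0, (when - now).total_seconds())
```

Passing `now` explicitly keeps the function easy to test; in the fetcher it would be called as `parse_retry_after(resp.headers.get("Retry-After"))`.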

The REGION_LANGUAGES mapping is essential for downstream processing: it tells the search indexer which PostgreSQL text search dictionary to use and helps with content categorization.
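
One way the language codes could be turned into PostgreSQL text search configuration names is sketched below. The `PG_TS_CONFIG` table and `ts_config_for` helper are assumptions for illustration, not ViralVidVault's actual indexer. The configuration names are the ones bundled with stock PostgreSQL; to my knowledge it ships no Polish configuration, so unknown languages fall back to `simple`.

```python
# ISO 639-1 language code -> PostgreSQL text search configuration name.
# Hypothetical mapping for illustration; adjust to the configurations
# actually installed in your database.
PG_TS_CONFIG = {
    "en": "english",
    "nl": "dutch",
    "sv": "swedish",
    "no": "norwegian",
    "de": "german",
}

def ts_config_for(language: str) -> str:
    """Pick a text search configuration, falling back to 'simple'."""
    return PG_TS_CONFIG.get(language, "simple")

if __name__ == "__main__":
    for lang in ("pl", "nl", "sv", "no", "de", "en"):
        print(lang, "->", ts_config_for(lang))
```

The returned name can then be passed as the configuration argument to `to_tsvector` when indexing a title.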

Deduplication with Region Merging

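A viral music video might trend in Poland, the Netherlands, and Sweden simultaneously. The deduplication step merges those entries into one record, collecting every region where the video trends and keeping the highest view count seen: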

def deduplicate(region_results: dict[str, list[RegionalVideo]]) -> list[dict]:
    seen: dict[str, dict] = {}

    for region, videos in region_results.items():
        for video in videos:
            if video.video_id in seen:
                seen[video.video_id]["regions"].append(region)
                seen[video.video_id]["views"] = max(
                    seen[video.video_id]["views"], video.views
                )
            else:
                seen[video.video_id] = {
                    "video_id": video.video_id,
                    "title": video.title,
                    "channel": video.channel,
                    "views": video.views,
                    "regions": [region],
                    "primary_language": video.language,
                }

    return list(seen.values())

For ViralVidVault, cross-region overlap is less than 5% among European regions — much lower than you might expect. This means most videos appear in exactly one region's trending feed.
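
The overlap figure can be measured from the deduplicated records. The sketch below uses one possible definition, the share of unique videos that trend in more than one region; the article does not say how the 5% was computed, so treat `overlap_ratio` and the sample data as assumptions.

```python
def overlap_ratio(unique: list[dict]) -> float:
    """Share of unique videos that appear in more than one region."""
    if not unique:
        return 0.0
    multi_region = sum(1 for video in unique if len(video["regions"]) > 1)
    return multi_region / len(unique)

if __name__ == "__main__":
    # Made-up records in the shape returned by a dedup step
    sample = [
        {"video_id": "a", "regions": ["PL"]},
        {"video_id": "b", "regions": ["NL", "SE"]},
        {"video_id": "c", "regions": ["AT"]},
        {"video_id": "d", "regions": ["NO"]},
    ]
    print(f"Cross-region overlap: {overlap_ratio(sample):.0%}")
```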

Running the Full Pipeline

async def run_pipeline(api_key: str):
    fetcher = EuropeanFetcher(api_key, max_concurrent=3)

    # Stage 1: Fetch all regions concurrently
    raw = await fetcher.fetch_all()
    total_raw = sum(len(v) for v in raw.values())
    print(f"Fetched {total_raw} videos from {len(fetcher.stats['regions_ok'])} regions")

    # Stage 2: Deduplicate
    unique = deduplicate(raw)
    print(f"After dedup: {len(unique)} unique videos")

    # Stage 3: Store (using your DB layer)
    # await store_videos(unique)

    return {"raw": total_raw, "unique": len(unique), "stats": fetcher.stats}

if __name__ == "__main__":
    import os
    result = asyncio.run(run_pipeline(os.environ["YOUTUBE_API_KEY"]))
    print(f"Pipeline complete: {result}")

Performance Numbers

On a typical 7-region fetch for ViralVidVault:

| Approach | Total Time |
| --- | --- |
| Sequential for loop | 2.8s |
| asyncio.gather (unlimited) | 0.45s |
| asyncio + Semaphore(3) | 0.7s |

The semaphore version is 4x faster than sequential (0.7s vs 2.8s) while keeping at most three requests in flight, which is gentler on rate limits than the unlimited version. The full pipeline (fetch + dedup + store) completes in about 1.5 seconds.
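
The semaphore version is slower than the unlimited one because seven requests sharing three slots cannot all start at once; with equal latencies they would run in three waves. The sketch below checks the concurrency cap itself by counting how many simulated requests are in flight at the same moment. `run_with_limit` and the simulated latency are assumptions for illustration.

```python
import asyncio

async def run_with_limit(n_tasks: int, limit: int, latency: float = 0.01) -> int:
    """Run n_tasks simulated requests and return the peak number in flight."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def worker() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(latency)  # stands in for the network call
            in_flight -= 1

    await asyncio.gather(*(worker() for _ in range(n_tasks)))
    return peak

if __name__ == "__main__":
    print("peak with Semaphore(3):", asyncio.run(run_with_limit(7, 3)))
    print("peak with Semaphore(7):", asyncio.run(run_with_limit(7, 7)))
```

Raising the limit trades a shorter total time for more simultaneous requests against the API.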


This article is part of the Building ViralVidVault series. Check out ViralVidVault to see these techniques in action.
