John T

How I Built an AI Pipeline That Transcribes YouTube With Whisper and Summarizes With Claude

Getting useful information out of long-form YouTube content is a time sink. You watch a 90-minute podcast, realize the three key insights could have been a paragraph, and wonder if there is a better way. There is. I built MurmurCast, an automated pipeline that monitors YouTube channels, transcribes new uploads using Whisper, summarizes them with Claude, and delivers the results as a daily email digest. This article is a full architecture walkthrough -- the APIs involved, the fallback strategies, the infrastructure choices, and every hard lesson learned along the way.

The Problem: YouTube Content Is Unstructured and Time-Consuming

The average knowledge worker follows dozens of YouTube channels, podcasts, and newsletters. The content is valuable but locked in formats that demand your full attention. You cannot skim a video the way you skim an article. Search inside a video is terrible. And if you follow 20 channels posting weekly, you are looking at 20+ hours of content to stay current.

I wanted a system that would do the following automatically:

  1. Detect when a channel publishes new content
  2. Extract the spoken words from the video
  3. Generate a structured summary with key insights
  4. Bundle everything into a daily email digest

The result is a pipeline with five distinct stages, each with its own challenges.

Architecture Overview: The Five-Stage Pipeline

The system runs on FastAPI (backend), Next.js (frontend), PostgreSQL (database), Celery + Redis (task queue), and is deployed on Railway. Here is how data flows through the system:

YouTube Data API / WebSub Notification
        |
        v
  [1. Discovery]  -- Poll channels or receive push notifications
        |
        v
  [2. Caption Extraction]  -- youtube_transcript_api (free, fast)
        |
        v  (if captions unavailable)
  [3. Audio Download + Whisper]  -- yt-dlp + Replicate Whisper large-v3
        |
        v
  [4. Summarization]  -- Claude (Anthropic) with structured JSON output
        |
        v
  [5. Daily Brief Generation]  -- Aggregate summaries, send email via Resend

Each stage is a Celery task that chains into the next. If transcription succeeds, it dispatches summarization. If summarization succeeds, the episode is marked complete and becomes eligible for the next daily brief.
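The chaining logic can be sketched independently of Celery itself. This is a minimal sketch with hypothetical task names; in the real pipeline each function is a Celery task and `dispatch` would be `task.delay(episode_id)`:

```python
# Sketch of the stage-chaining logic. "dispatch" stands in for Celery's
# task.delay() and just records what would be queued.
dispatched: list[str] = []

def dispatch(task_name: str, episode_id: str) -> None:
    dispatched.append(f"{task_name}:{episode_id}")

def transcribe_episode(episode_id: str, transcript_ok: bool) -> None:
    # Stages 2/3: on success, chain into summarization.
    if transcript_ok:
        dispatch("summarize_episode", episode_id)

def summarize_episode(episode_id: str, summary_ok: bool) -> None:
    # Stage 4: on success, the episode becomes eligible for the daily brief.
    if summary_ok:
        dispatch("mark_complete", episode_id)
```

The key property is that failures simply stop the chain: an episode that never completes transcription never reaches summarization, and the cleanup task (described later) picks it up.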

Stage 1: Discovering New Videos

There are two mechanisms for detecting new uploads, and the system uses both.

Polling via YouTube Data API v3

Every 30 minutes, a Celery Beat task polls all monitored channels using the YouTube Data API. The implementation resolves each channel's uploads playlist, then fetches recent items:

import httpx

YOUTUBE_API_BASE = "https://www.googleapis.com/youtube/v3"

async def get_channel_videos(
    api_key: str,
    channel_id: str,
    published_after: str | None = None,
    max_results: int = 50,
) -> list[dict]:
    async with httpx.AsyncClient() as client:
        # Get the uploads playlist ID
        channel_resp = await client.get(
            f"{YOUTUBE_API_BASE}/channels",
            params={
                "part": "contentDetails",
                "id": channel_id,
                "key": api_key,
            },
        )
        channel_resp.raise_for_status()
        channel_data = channel_resp.json()

        items = channel_data.get("items", [])
        if not items:
            return []

        uploads_playlist_id = items[0]["contentDetails"]["relatedPlaylists"]["uploads"]

        # Fetch playlist items
        params = {
            "part": "snippet,contentDetails",
            "playlistId": uploads_playlist_id,
            "maxResults": min(max_results, 50),
            "key": api_key,
        }

        playlist_resp = await client.get(
            f"{YOUTUBE_API_BASE}/playlistItems",
            params=params,
        )
        playlist_resp.raise_for_status()
        playlist_data = playlist_resp.json()

    # ... process items and fetch durations via videos.list

The key detail: initial polls are limited to 5 videos to avoid overwhelming new users with a backlog. Subsequent polls fetch up to 50 and filter by published_after.

WebSub Push Notifications

For channels with active WebSub subscriptions, the system receives real-time push notifications from Google's PubSubHubbub hub. When WebSub is active for a channel, the polling task skips it entirely, saving API quota. More on this in the WebSub section below.

Stage 2: Caption Extraction With youtube_transcript_api

The cheapest and fastest path to a transcript is YouTube's own captions. Most popular channels have auto-generated or manually uploaded captions. The youtube_transcript_api library extracts them without needing a YouTube Data API quota hit:

from youtube_transcript_api import YouTubeTranscriptApi

async def get_video_captions(video_id: str) -> str | None:
    try:
        ytt_api = YouTubeTranscriptApi()
        transcript = ytt_api.fetch(video_id)

        # Build transcript with periodic timestamp markers
        parts: list[str] = []
        last_marker = -30.0
        for snippet in transcript:
            start = snippet.start
            if start - last_marker >= 30.0:
                mins = int(start // 60)
                secs = int(start % 60)
                parts.append(f"[{mins}:{secs:02d}]")
                last_marker = start
            parts.append(snippet.text)

        return " ".join(parts)
    except Exception:
        return None

The timestamp markers every 30 seconds are critical. They flow through to the summarization stage, where Claude uses them to attribute insights to specific moments in the video. The final user-facing summary includes clickable timestamps.

This works for roughly 85-90% of videos. For the rest, we fall back to Whisper.

Stage 3: Audio Download and Whisper Transcription

When captions are unavailable, the pipeline downloads the audio with yt-dlp and sends it to Replicate's Whisper large-v3 model.

The yt-dlp Download

import os
import tempfile

async def download_youtube_audio(video_id: str, output_dir: str | None = None) -> str:
    if output_dir is None:
        output_dir = tempfile.mkdtemp(prefix="murmurcast_")

    output_template = os.path.join(output_dir, f"{video_id}.%(ext)s")
    output_path = os.path.join(output_dir, f"{video_id}.mp3")
    url = f"https://www.youtube.com/watch?v={video_id}"

    cmd = [
        "yt-dlp",
        "--extract-audio",
        "--audio-format", "mp3",
        "--audio-quality", "0",
        "--output", output_template,
        "--no-playlist",
        url,
    ]

    # Optional proxy for bypassing YouTube bot detection
    proxy_url = get_settings().yt_dlp_proxy
    env = None
    if proxy_url:
        env = {**os.environ, "HTTP_PROXY": proxy_url, "HTTPS_PROXY": proxy_url}

    process = await _run_subprocess(cmd, env=env)

    if process.returncode != 0:
        raise RuntimeError(
            f"yt-dlp failed (exit {process.returncode}): {process.stderr}"
        )

    return output_path

YouTube actively blocks downloads from cloud server IPs (Railway, AWS, GCP, etc.). The yt_dlp_proxy setting configures a residential proxy to work around this. This was one of the hardest problems to solve in production and deserves its own article.

Whisper via Replicate

For transcription, the system uses Replicate's hosted Whisper large-v3 model as the primary provider, with OpenAI's Whisper API as a fallback only when Replicate is not configured:

async def transcribe_with_whisper(audio_path: str) -> str:
    if settings.replicate_api_token:
        text = await _transcribe_replicate(audio_path)
        if text:
            return text
        raise ValueError("Replicate transcription returned empty result")

    if settings.openai_api_key:
        return await _transcribe_openai(audio_path)

    raise ValueError(
        "No Whisper provider configured (set REPLICATE_API_TOKEN or OPENAI_API_KEY)."
    )

Large audio files (over 10MB) are automatically chunked using ffmpeg before being sent to Whisper. Each chunk gets its own API call, and the results are concatenated:

async def _transcribe_replicate(audio_path: str) -> str:
    with open(audio_path, "rb") as f:
        audio_data = f.read()

    file_size = len(audio_data)
    if file_size > 10 * 1024 * 1024:
        chunks = await chunk_audio(audio_path)
        transcripts = []
        for chunk_path in chunks:
            text = await _replicate_single_call(chunk_path)
            transcripts.append(text)
        return " ".join(transcripts)

    return await _replicate_single_call(audio_path)

The Replicate SDK is synchronous, so each call runs in a thread executor with a 30-minute timeout per chunk.
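That wrapper can be sketched with asyncio.to_thread (available in Python 3.9+). This is a sketch of the pattern, not the exact production code:

```python
# Sketch: run a blocking SDK call off the event loop with a per-chunk
# timeout. CHUNK_TIMEOUT mirrors the 30-minute limit described above.
import asyncio

CHUNK_TIMEOUT = 30 * 60  # seconds

async def run_sync_with_timeout(fn, *args, timeout: float = CHUNK_TIMEOUT):
    # to_thread moves the blocking call into a thread executor; wait_for
    # raises TimeoutError if the chunk takes longer than the limit.
    return await asyncio.wait_for(asyncio.to_thread(fn, *args), timeout=timeout)
```

A TimeoutError here surfaces as a task failure, which the retry and cleanup machinery described later handles.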

Stage 4: Summarization With Claude

Once a transcript exists, the summarization task sends it to Claude with a structured prompt that requests JSON output:

MODEL = "claude-sonnet-4-20250514"

prompt = f"""Analyze the following transcript from "{title}" and provide a structured summary.

Respond with ONLY valid JSON in this exact format:
{{
    "short_summary": "A concise 2-3 sentence summary of the key points.",
    "detailed_summary": "A comprehensive multi-paragraph summary.",
    "key_topics": ["topic1", "topic2", "topic3"],
    "key_insights": [
        {{"text": "specific, actionable insight", "timestamp": "M:SS"}},
        ...up to N insights
    ]
}}
"""

Two details matter here:

Insight scaling by duration. A 5-minute video gets 3 insights. A 2-hour podcast gets 15. This prevents short content from being padded with filler and long content from losing important points:

def _insight_count(duration_seconds: int | None) -> int:
    if not duration_seconds:
        return 5
    minutes = duration_seconds / 60
    if minutes < 10:
        return 3
    if minutes < 30:
        return 5
    if minutes < 60:
        return 8
    if minutes < 120:
        return 12
    return 15

Timestamp-aware insights. When the transcript has embedded timestamps (from YouTube captions), the prompt instructs Claude to include the timestamp where each insight originates. The frontend converts these to clickable links that jump to that point in the video.
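The timestamp-to-link conversion is simple but worth showing. A sketch of what the frontend does, written in Python for consistency with the rest of the article:

```python
# Convert an insight timestamp like "1:30" or "1:02:03" into a YouTube
# deep link using the &t= query parameter (seconds).
def timestamp_to_url(video_id: str, timestamp: str) -> str:
    seconds = 0
    for part in timestamp.split(":"):
        seconds = seconds * 60 + int(part)
    return f"https://www.youtube.com/watch?v={video_id}&t={seconds}s"
```

Clicking the generated link opens the video at the exact moment the insight was spoken.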

Transcripts are capped at 400,000 characters (roughly 100K tokens) to stay within Claude's context window and avoid excessive costs.
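Putting the cap and the API call together, the summarization step looks roughly like this. A sketch assuming the official anthropic SDK; error handling, retries, and the full prompt are elided, and `summarize` is a hypothetical wrapper, not the pipeline's exact function:

```python
# Sketch of the summarization call. MODEL and the 400,000-character cap
# come from the article; everything else is illustrative.
MAX_TRANSCRIPT_CHARS = 400_000

def cap_transcript(transcript: str, limit: int = MAX_TRANSCRIPT_CHARS) -> str:
    # Simple head truncation keeps the opening of the episode intact.
    return transcript[:limit]

def summarize(transcript: str, prompt_template: str) -> str:
    # Lazy import so the pure helper above stays dependency-free.
    import anthropic

    client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the env
    message = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        messages=[{
            "role": "user",
            "content": prompt_template.format(transcript=cap_transcript(transcript)),
        }],
    )
    return message.content[0].text
```

The returned text then goes through the defensive JSON parser shown in the lessons-learned section.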

Stage 5: Daily Brief Generation and Email Delivery

The final stage aggregates all completed summaries for a user into a daily email digest. A Celery Beat task runs every hour and checks each user's preferred delivery time in their local timezone:

async def _generate_all_daily_briefs_async():
    now_utc = datetime.now(timezone.utc)

    async with get_task_session_factory()() as db:
        result = await db.execute(
            select(NotificationPreference)
            .options(joinedload(NotificationPreference.user))
            .where(NotificationPreference.daily_brief_email.is_(True))
        )
        prefs_list = result.unique().scalars().all()

        for prefs in prefs_list:
            user_tz = zoneinfo.ZoneInfo(prefs.timezone)
            user_now = now_utc.astimezone(user_tz)

            if user_now.hour != prefs.daily_brief_time.hour:
                continue

            # Generate and dispatch email...

The brief is rendered as HTML and sent via Resend's API.
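The delivery call itself is a single POST to Resend's REST API. A sketch with a hypothetical sender address and payload builder (the production code likely differs in details):

```python
# Sketch of the Resend delivery step. Resend accepts a JSON POST to
# /emails authenticated with a Bearer API key.
import os

def build_brief_email(to: str, subject: str, html: str) -> dict:
    return {
        "from": "MurmurCast <brief@murmurcast.com>",  # hypothetical sender
        "to": [to],
        "subject": subject,
        "html": html,
    }

def send_brief(payload: dict) -> None:
    import httpx  # lazy import keeps the payload builder dependency-free

    resp = httpx.post(
        "https://api.resend.com/emails",
        headers={"Authorization": f"Bearer {os.environ['RESEND_API_KEY']}"},
        json=payload,
        timeout=30,
    )
    resp.raise_for_status()
```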

Challenge 1: YouTube Blocking Cloud Server IPs

This was the single biggest production issue. YouTube aggressively detects and blocks downloads from data center IP ranges. When running yt-dlp on Railway, you get "Sign in to confirm you're not a bot" errors.

The solution is a residential proxy configured via environment variable. The YT_DLP_PROXY setting accepts a SOCKS5 or HTTP proxy URL, and the download function passes it through environment variables to yt-dlp. But the real fix was making captions the primary path and treating audio download as a fallback, since youtube_transcript_api works reliably from any IP.

Challenge 2: Async Event Loops in Celery Workers

Celery workers are synchronous. SQLAlchemy's async engine uses asyncpg, which binds its connection pool to the event loop that created it. If you reuse an engine across multiple asyncio.run() calls, asyncpg throws "connection pool is bound to a different event loop" errors.

The fix is a factory function that creates a fresh engine for each task invocation:

def get_task_session_factory() -> async_sessionmaker[AsyncSession]:
    """Create a fresh engine + session factory for Celery tasks."""
    task_engine = create_async_engine(
        settings.database_url,
        echo=False,
        pool_size=5,
        max_overflow=5,
        pool_pre_ping=True,
    )
    return async_sessionmaker(
        task_engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )

Combined with asyncio.run() in the task utility:

def run_async(coro):
    """Run an async coroutine from synchronous Celery task context."""
    return asyncio.run(coro)

This ensures every task gets its own event loop and its own connection pool.

Challenge 3: Task Reliability and Self-Healing

Deployments, worker crashes, and transient API failures all cause tasks to get stuck. The system handles this with a stale episode cleanup task that runs every 15 minutes:

  • Episodes stuck in transcribing or summarizing for 30+ minutes are reset to pending and re-dispatched
  • Episodes stuck in pending for 5+ minutes (task never dispatched) are dispatched
  • Newly added channels that have never been polled get their first poll

This makes the pipeline self-healing. After a deployment that restarts all workers, in-flight tasks are automatically retried within 15 minutes.
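The staleness rules above reduce to a small predicate over an episode's status and last-updated time. A sketch using the thresholds from the list (the status strings are assumed names):

```python
# Sketch of the staleness rules from the cleanup task.
from datetime import datetime, timedelta

STALE_AFTER = {
    "transcribing": timedelta(minutes=30),
    "summarizing": timedelta(minutes=30),
    "pending": timedelta(minutes=5),
}

def is_stale(status: str, updated_at: datetime, now: datetime) -> bool:
    # Completed and failed episodes have no threshold and are never stale.
    threshold = STALE_AFTER.get(status)
    return threshold is not None and now - updated_at >= threshold
```

The cleanup task runs this check over in-flight episodes and re-dispatches anything that matches.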

Lessons Learned

Caption-first saves money and time. YouTube captions are free and return in milliseconds. Whisper transcription costs money and takes minutes. Making captions the primary path reduced Whisper usage by roughly 85%.

Structured JSON from LLMs needs defensive parsing. Claude occasionally wraps JSON in markdown code blocks. The parser handles this:

import json

def _parse_summary_json(response_text: str) -> dict:
    try:
        return json.loads(response_text)
    except json.JSONDecodeError:
        # Claude sometimes wraps the JSON in a fenced markdown code block.
        if "```json" in response_text:
            json_str = response_text.split("```json")[1].split("```")[0].strip()
            return json.loads(json_str)
        elif "```" in response_text:
            json_str = response_text.split("```")[1].split("```")[0].strip()
            return json.loads(json_str)
        else:
            raise ValueError(f"Could not parse summary response: {response_text[:200]}")

Stagger task dispatch. When polling discovers 10 new episodes at once, dispatching all transcription tasks simultaneously overwhelms API rate limits. The system staggers them with a countdown multiplier: apply_async(args=[str(episode.id)], countdown=created_count * 30).
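The stagger pattern can be sketched as a pure function over the newly created episodes:

```python
# Sketch of the staggered dispatch: each new episode's task is scheduled
# 30 seconds later than the previous one.
STAGGER_SECONDS = 30

def stagger_countdowns(episode_ids: list[str]) -> list[tuple[str, int]]:
    # In production each (episode_id, countdown) pair feeds
    # apply_async(args=[episode_id], countdown=countdown).
    return [(eid, i * STAGGER_SECONDS) for i, eid in enumerate(episode_ids)]
```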

Pool sizing matters for Celery tasks. The web server engine uses pool_size=20, but Celery task engines use pool_size=5. Tasks are sequential within a worker, so a small pool avoids wasting database connections.

FAQ

What does this pipeline cost to run per month?

The infrastructure (Railway) costs roughly $10-20/month for the backend, database, and Redis. API costs depend on volume. YouTube Data API is free within its quota (10,000 units/day). Replicate Whisper charges per second of audio (roughly $0.003/second). Claude summarization costs roughly $0.01-0.05 per episode depending on transcript length. For a user following 20 channels, expect $15-30/month in API costs.

Why Replicate Whisper instead of OpenAI's Whisper API?

Replicate runs Whisper large-v3, which is more accurate than the model behind OpenAI's API. Replicate also has more generous rate limits. The system only falls back to OpenAI Whisper when Replicate is not configured.

How do you handle very long videos (3+ hours)?

The audio chunking system splits files into segments under 25MB using ffmpeg. Each chunk is transcribed separately and concatenated. Transcripts are capped at 400,000 characters before being sent to Claude to avoid excessive token costs. For a 3-hour podcast, this typically captures the full content.

Why FastAPI with async SQLAlchemy instead of Django?

The pipeline is I/O-bound -- making HTTP calls to YouTube, Replicate, Anthropic, and Resend. Async enables concurrent operations without threading complexity. FastAPI's native async support with SQLAlchemy 2.0's async engine (via asyncpg) provides excellent throughput for the web API while sharing the same models and service layer with Celery tasks.

How do you prevent duplicate episode processing?

Episodes are deduplicated by external_id (the YouTube video ID or podcast GUID). Before creating an episode record, the system checks for an existing record with the same external_id. This prevents duplicates from both polling and WebSub notifications running concurrently.

What happens when the Claude API is down?

The summarization task is configured with exponential backoff retries (up to 10 retries, max 1800 seconds between attempts). If Claude is down, the task keeps retrying. If all retries are exhausted, the episode is marked as failed, an admin notification email is sent, and the stale episode cleanup task will pick it up on the next cycle for another round of retries.
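The retry schedule implied by those settings can be sketched as a doubling sequence with a cap, which is how Celery's retry_backoff and retry_backoff_max behave (the base delay here is an assumption; Celery also adds jitter by default):

```python
# Sketch of the delay (in seconds) before each of the retries, assuming a
# 1-second base: doubles per attempt, capped at 1800 seconds.
def backoff_delays(retries: int = 10, base: int = 1, cap: int = 1800) -> list[int]:
    return [min(base * 2**n, cap) for n in range(retries)]
```

With these parameters the first retry fires almost immediately and the gap between later attempts never exceeds 30 minutes.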


If you want to try this yourself, MurmurCast is live at murmurcast.com. Add your YouTube channels and podcasts, and you will get AI-generated summaries delivered to your inbox every morning. The pipeline described here processes every episode automatically -- no manual intervention required.
