How I Built an AI Pipeline That Transcribes YouTube With Whisper and Summarizes With Claude
Getting useful information out of long-form YouTube content is a time sink. You watch a 90-minute podcast, realize the three key insights could have been a paragraph, and wonder if there is a better way. There is. I built MurmurCast, an automated pipeline that monitors YouTube channels, transcribes new uploads using Whisper, summarizes them with Claude, and delivers the results as a daily email digest. This article is a full architecture walkthrough -- the APIs involved, the fallback strategies, the infrastructure choices, and every hard lesson learned along the way.
The Problem: YouTube Content Is Unstructured and Time-Consuming
The average knowledge worker follows dozens of YouTube channels, podcasts, and newsletters. The content is valuable but locked in formats that demand your full attention. You cannot skim a video the way you skim an article. Search inside a video is terrible. And if you follow 20 channels posting weekly, you are looking at 20+ hours of content to stay current.
I wanted a system that would do the following automatically:
- Detect when a channel publishes new content
- Extract the spoken words from the video
- Generate a structured summary with key insights
- Bundle everything into a daily email digest
The result is a pipeline with five distinct stages, each with its own challenges.
Architecture Overview: The Five-Stage Pipeline
The system runs on FastAPI (backend), Next.js (frontend), PostgreSQL (database), Celery + Redis (task queue), and is deployed on Railway. Here is how data flows through the system:
YouTube Data API / WebSub Notification
|
v
[1. Discovery] -- Poll channels or receive push notifications
|
v
[2. Caption Extraction] -- youtube_transcript_api (free, fast)
|
v (if captions unavailable)
[3. Audio Download + Whisper] -- yt-dlp + Replicate Whisper large-v3
|
v
[4. Summarization] -- Claude (Anthropic) with structured JSON output
|
v
[5. Daily Brief Generation] -- Aggregate summaries, send email via Resend
Each stage is a Celery task that chains into the next. If transcription succeeds, it dispatches summarization. If summarization succeeds, the episode is marked complete and becomes eligible for the next daily brief.
Stage 1: Discovering New Videos
There are two mechanisms for detecting new uploads, and the system uses both.
Polling via YouTube Data API v3
Every 30 minutes, a Celery Beat task polls all monitored channels using the YouTube Data API. The implementation resolves each channel's uploads playlist, then fetches recent items:
async def get_channel_videos(
api_key: str,
channel_id: str,
published_after: str | None = None,
max_results: int = 50,
) -> list[dict]:
async with httpx.AsyncClient() as client:
# Get the uploads playlist ID
channel_resp = await client.get(
f"{YOUTUBE_API_BASE}/channels",
params={
"part": "contentDetails",
"id": channel_id,
"key": api_key,
},
)
channel_resp.raise_for_status()
channel_data = channel_resp.json()
items = channel_data.get("items", [])
if not items:
return []
uploads_playlist_id = items[0]["contentDetails"]["relatedPlaylists"]["uploads"]
# Fetch playlist items
params = {
"part": "snippet,contentDetails",
"playlistId": uploads_playlist_id,
"maxResults": min(max_results, 50),
"key": api_key,
}
playlist_resp = await client.get(
f"{YOUTUBE_API_BASE}/playlistItems",
params=params,
)
playlist_resp.raise_for_status()
playlist_data = playlist_resp.json()
# ... process items and fetch durations via videos.list
The key detail: initial polls are limited to 5 videos to avoid overwhelming new users with a backlog. Subsequent polls fetch up to 50 and filter by published_after.
WebSub Push Notifications
For channels with active WebSub subscriptions, the system receives real-time push notifications from Google's PubSubHubbub hub. When WebSub is active for a channel, the polling task skips it entirely, saving API quota. More on this in the WebSub section below.
Stage 2: Caption Extraction With youtube_transcript_api
The cheapest and fastest path to a transcript is YouTube's own captions. Most popular channels have auto-generated or manually uploaded captions. The youtube_transcript_api library extracts them without needing a YouTube Data API quota hit:
async def get_video_captions(video_id: str) -> str | None:
try:
ytt_api = YouTubeTranscriptApi()
transcript = ytt_api.fetch(video_id)
# Build transcript with periodic timestamp markers
parts: list[str] = []
last_marker = -30.0
for snippet in transcript:
start = snippet.start
if start - last_marker >= 30.0:
mins = int(start // 60)
secs = int(start % 60)
parts.append(f"[{mins}:{secs:02d}]")
last_marker = start
parts.append(snippet.text)
return " ".join(parts)
except Exception:
return None
The timestamp markers every 30 seconds are critical. They flow through to the summarization stage, where Claude uses them to attribute insights to specific moments in the video. The final user-facing summary includes clickable timestamps.
This works for roughly 85-90% of videos. For the rest, we fall back to Whisper.
Stage 3: Audio Download and Whisper Transcription
When captions are unavailable, the pipeline downloads the audio with yt-dlp and sends it to Replicate's Whisper large-v3 model.
The yt-dlp Download
async def download_youtube_audio(video_id: str, output_dir: str | None = None) -> str:
if output_dir is None:
output_dir = tempfile.mkdtemp(prefix="murmurcast_")
output_template = os.path.join(output_dir, f"{video_id}.%(ext)s")
url = f"https://www.youtube.com/watch?v={video_id}"
cmd = [
"yt-dlp",
"--extract-audio",
"--audio-format", "mp3",
"--audio-quality", "0",
"--output", output_template,
"--no-playlist",
url,
]
# Optional proxy for bypassing YouTube bot detection
proxy_url = get_settings().yt_dlp_proxy
env = None
if proxy_url:
env = {**os.environ, "HTTP_PROXY": proxy_url, "HTTPS_PROXY": proxy_url}
process = await _run_subprocess(cmd, env=env)
if process.returncode != 0:
raise RuntimeError(
f"yt-dlp failed (exit {process.returncode}): {process.stderr}"
)
return output_path
YouTube actively blocks downloads from cloud server IPs (Railway, AWS, GCP, etc.). The yt_dlp_proxy setting configures a residential proxy to work around this. This was one of the hardest problems to solve in production and deserves its own article.
Whisper via Replicate
For transcription, the system uses Replicate's hosted Whisper large-v3 model as the primary provider, with OpenAI's Whisper API as a fallback only when Replicate is not configured:
async def transcribe_with_whisper(audio_path: str) -> str:
if settings.replicate_api_token:
text = await _transcribe_replicate(audio_path)
if text:
return text
raise ValueError("Replicate transcription returned empty result")
if settings.openai_api_key:
return await _transcribe_openai(audio_path)
raise ValueError(
"No Whisper provider configured (set REPLICATE_API_TOKEN or OPENAI_API_KEY)."
)
Large audio files (over 10MB) are automatically chunked using ffmpeg before being sent to Whisper. Each chunk gets its own API call, and the results are concatenated:
async def _transcribe_replicate(audio_path: str) -> str:
with open(audio_path, "rb") as f:
audio_data = f.read()
file_size = len(audio_data)
if file_size > 10 * 1024 * 1024:
chunks = await chunk_audio(audio_path)
transcripts = []
for chunk_path in chunks:
text = await _replicate_single_call(chunk_path)
transcripts.append(text)
return " ".join(transcripts)
return await _replicate_single_call(audio_path)
The Replicate SDK is synchronous, so each call runs in a thread executor with a 30-minute timeout per chunk.
Stage 4: Summarization With Claude
Once a transcript exists, the summarization task sends it to Claude with a structured prompt that requests JSON output:
MODEL = "claude-sonnet-4-20250514"
prompt = f"""Analyze the following transcript from "{title}" and provide a structured summary.
Respond with ONLY valid JSON in this exact format:
{{
"short_summary": "A concise 2-3 sentence summary of the key points.",
"detailed_summary": "A comprehensive multi-paragraph summary.",
"key_topics": ["topic1", "topic2", "topic3"],
"key_insights": [
{{"text": "specific, actionable insight", "timestamp": "M:SS"}},
...up to N insights
]
}}
"""
Two details matter here:
Insight scaling by duration. A 5-minute video gets 3 insights. A 2-hour podcast gets 15. This prevents short content from being padded with filler and long content from losing important points:
def _insight_count(duration_seconds: int | None) -> int:
if not duration_seconds:
return 5
minutes = duration_seconds / 60
if minutes < 10:
return 3
if minutes < 30:
return 5
if minutes < 60:
return 8
if minutes < 120:
return 12
return 15
Timestamp-aware insights. When the transcript has embedded timestamps (from YouTube captions), the prompt instructs Claude to include the timestamp where each insight originates. The frontend converts these to clickable links that jump to that point in the video.
Transcripts are capped at 400,000 characters (roughly 100K tokens) to stay within Claude's context window and avoid excessive costs.
Stage 5: Daily Brief Generation and Email Delivery
The final stage aggregates all completed summaries for a user into a daily email digest. A Celery Beat task runs every hour and checks each user's preferred delivery time in their local timezone:
async def _generate_all_daily_briefs_async():
now_utc = datetime.now(timezone.utc)
async with get_task_session_factory()() as db:
result = await db.execute(
select(NotificationPreference)
.options(joinedload(NotificationPreference.user))
.where(NotificationPreference.daily_brief_email.is_(True))
)
prefs_list = result.unique().scalars().all()
for prefs in prefs_list:
user_tz = zoneinfo.ZoneInfo(prefs.timezone)
user_now = now_utc.astimezone(user_tz)
if user_now.hour != prefs.daily_brief_time.hour:
continue
# Generate and dispatch email...
The brief is rendered as HTML and sent via Resend's API.
Challenge 1: YouTube Blocking Cloud Server IPs
This was the single biggest production issue. YouTube aggressively detects and blocks downloads from data center IP ranges. When running yt-dlp on Railway, you get Sign in to confirm you're not a bot errors.
The solution is a residential proxy configured via environment variable. The YT_DLP_PROXY setting accepts a SOCKS5 or HTTP proxy URL, and the download function passes it through environment variables to yt-dlp. But the real fix was making captions the primary path and treating audio download as a fallback, since youtube_transcript_api works reliably from any IP.
Challenge 2: Async Event Loops in Celery Workers
Celery workers are synchronous. SQLAlchemy's async engine uses asyncpg, which binds its connection pool to the event loop that created it. If you reuse an engine across multiple asyncio.run() calls, asyncpg throws connection pool is bound to a different event loop errors.
The fix is a factory function that creates a fresh engine for each task invocation:
def get_task_session_factory() -> async_sessionmaker[AsyncSession]:
"""Create a fresh engine + session factory for Celery tasks."""
task_engine = create_async_engine(
settings.database_url,
echo=False,
pool_size=5,
max_overflow=5,
pool_pre_ping=True,
)
return async_sessionmaker(
task_engine,
class_=AsyncSession,
expire_on_commit=False,
)
Combined with asyncio.run() in the task utility:
def run_async(coro):
"""Run an async coroutine from synchronous Celery task context."""
return asyncio.run(coro)
This ensures every task gets its own event loop and its own connection pool.
Challenge 3: Task Reliability and Self-Healing
Deployments, worker crashes, and transient API failures all cause tasks to get stuck. The system handles this with a stale episode cleanup task that runs every 15 minutes:
- Episodes stuck in
transcribingorsummarizingfor 30+ minutes are reset topendingand re-dispatched - Episodes stuck in
pendingfor 5+ minutes (task never dispatched) are dispatched - Newly added channels that have never been polled get their first poll
This makes the pipeline self-healing. After a deployment that restarts all workers, in-flight tasks are automatically retried within 15 minutes.
Lessons Learned
Caption-first saves money and time. YouTube captions are free and return in milliseconds. Whisper transcription costs money and takes minutes. Making captions the primary path reduced Whisper usage by roughly 85%.
Structured JSON from LLMs needs defensive parsing. Claude occasionally wraps JSON in markdown code blocks. The parser handles this:
def _parse_summary_json(response_text: str) -> dict:
try:
return json.loads(response_text)
except json.JSONDecodeError:
if "```
json" in response_text:
json_str = response_text.split("
```json")[1].split("```
")[0].strip()
return json.loads(json_str)
elif "
```" in response_text:
json_str = response_text.split("```
")[1].split("
```")[0].strip()
return json.loads(json_str)
else:
raise ValueError(f"Could not parse summary response: {response_text[:200]}")
Stagger task dispatch. When polling discovers 10 new episodes at once, dispatching all transcription tasks simultaneously overwhelms API rate limits. The system staggers them with a countdown multiplier: apply_async(args=[str(episode.id)], countdown=created_count * 30).
Pool sizing matters for Celery tasks. The web server engine uses pool_size=20, but Celery task engines use pool_size=5. Tasks are sequential within a worker, so a small pool avoids wasting database connections.
FAQ
What does this pipeline cost to run per month?
The infrastructure (Railway) costs roughly $10-20/month for the backend, database, and Redis. API costs depend on volume. YouTube Data API is free within its quota (10,000 units/day). Replicate Whisper charges per second of audio (roughly $0.003/second). Claude summarization costs roughly $0.01-0.05 per episode depending on transcript length. For a user following 20 channels, expect $15-30/month in API costs.
Why Replicate Whisper instead of OpenAI's Whisper API?
Replicate runs Whisper large-v3, which is more accurate than the model behind OpenAI's API. Replicate also has more generous rate limits. The system only falls back to OpenAI Whisper when Replicate is not configured.
How do you handle very long videos (3+ hours)?
The audio chunking system splits files into segments under 25MB using ffmpeg. Each chunk is transcribed separately and concatenated. Transcripts are capped at 400,000 characters before being sent to Claude to avoid excessive token costs. For a 3-hour podcast, this typically captures the full content.
Why FastAPI with async SQLAlchemy instead of Django?
The pipeline is I/O-bound -- making HTTP calls to YouTube, Replicate, Anthropic, and Resend. Async enables concurrent operations without threading complexity. FastAPI's native async support with SQLAlchemy 2.0's async engine (via asyncpg) provides excellent throughput for the web API while sharing the same models and service layer with Celery tasks.
How do you prevent duplicate episode processing?
Episodes are deduplicated by external_id (the YouTube video ID or podcast GUID). Before creating an episode record, the system checks for an existing record with the same external_id. This prevents duplicates from both polling and WebSub notifications running concurrently.
What happens when the Claude API is down?
The summarization task is configured with exponential backoff retries (up to 10 retries, max 1800 seconds between attempts). If Claude is down, the task keeps retrying. If all retries are exhausted, the episode is marked as failed, an admin notification email is sent, and the stale episode cleanup task will pick it up on the next cycle for another round of retries.
If you want to try this yourself, MurmurCast is live at murmurcast.com. Add your YouTube channels and podcasts, and you will get AI-generated summaries delivered to your inbox every morning. The pipeline described here processes every episode automatically -- no manual intervention required.
Top comments (0)