YouTube Analytics Pipeline Using Delta Lake and PySpark
Complete data pipeline for extracting, processing, and analyzing data from multiple YouTube channels using Delta Lake and PySpark.
Overview
This pipeline extracts data from the YouTube Data API, processes it with PySpark, stores it in Delta Lake using a medallion architecture, and generates comprehensive analytics.
Data Flow
YouTube API → Extract → Process (PySpark) → Delta Lake (Bronze/Silver/Gold) → Analytics
Pipeline Stages
- Extract Channels - Channel statistics and metadata
- Extract Videos - Video details, views, likes, and comment counts
- Extract Comments - User comments and engagement metrics
- Process Data - Clean, transform, and enrich with PySpark
- Write to Delta Lake - Store in medallion architecture
- Generate Analytics - Create insights and reports
- Quality Checks - Validate data quality
YouTube API Setup
Get Your API Key
- Go to Google Cloud Console
- Create a new project or select an existing one
- Enable the YouTube Data API v3
- Go to Credentials → Create Credentials → API Key
- Copy your API key
Add to Environment
# Edit .env file
nano .env
# Add your API key
YOUTUBE_API_KEY=your_actual_api_key_here
API Quota Limits
- Free tier: 10,000 units/day
- Channel read: 1 unit
- Video list: 1 unit per 50 videos
- Comments: 1 unit per request
- Pipeline uses ~200-500 units per run (depending on configuration)
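Before triggering a run, it can help to sanity-check expected quota usage against the daily limit. Below is a minimal back-of-the-envelope sketch using the unit costs listed above; the channel count and per-channel limits are illustrative assumptions, not values read from the pipeline's configuration.
# Rough quota estimate for one pipeline run (illustrative numbers only)
NUM_CHANNELS = 5                # channels tracked by the pipeline
MAX_VIDEOS_PER_CHANNEL = 50     # see "Adjust API Limits" below
COMMENT_REQUESTS_PER_VIDEO = 1  # one comment-thread request per video

channel_units = NUM_CHANNELS * 1                                    # 1 unit per channel read
video_units = NUM_CHANNELS * max(1, MAX_VIDEOS_PER_CHANNEL // 50)   # 1 unit per 50 videos
comment_units = NUM_CHANNELS * MAX_VIDEOS_PER_CHANNEL * COMMENT_REQUESTS_PER_VIDEO

total_units = channel_units + video_units + comment_units
print(f"Estimated quota usage per run: ~{total_units} units (free tier: 10,000/day)")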
Configured Channels
The pipeline tracks these channels by default:
- Google Developers - Technology and development
- Fireship - Quick coding tutorials
- freeCodeCamp - Programming education
- The Net Ninja - Web development
- Web Dev Simplified - Modern web development
Add More Channels
Edit scripts/extract_youtube_channels.py:
YOUTUBE_CHANNELS = [
{
'channel_id': 'UCxxxxxxxxxxxxxx', # Get from channel URL
'channel_name': 'Channel Name'
},
# Add more...
]
To find a channel ID:
- Go to the channel page
- View the page source
- Search for "channelId":"
- Or use the URL: https://www.youtube.com/channel/CHANNEL_ID (the ID is the last path segment)
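The Data API can also resolve a channel ID directly. The snippet below is a minimal sketch that looks up a channel by its handle via the channels.list endpoint; the handle is a placeholder, and the script assumes YOUTUBE_API_KEY is set as described above.
# Look up a channel ID from a YouTube handle (the handle below is a placeholder)
import os
import requests

api_key = os.getenv('YOUTUBE_API_KEY')
response = requests.get(
    'https://www.googleapis.com/youtube/v3/channels',
    params={'part': 'id', 'forHandle': '@Fireship', 'key': api_key}
)
items = response.json().get('items', [])
print(items[0]['id'] if items else 'Channel not found')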
Running the Pipeline
Option 1: Via Airflow UI
- Navigate to http://localhost:8080
- Login (airflow/airflow)
- Find the youtube_analytics_pipeline DAG
- Toggle it on to enable it
- Click "Trigger DAG"
Option 2: Via Command Line
# Trigger the complete pipeline
docker compose exec airflow-webserver airflow dags trigger youtube_analytics_pipeline
# Check status
docker compose exec airflow-webserver airflow dags list-runs -d youtube_analytics_pipeline
# View task logs
docker compose exec airflow-webserver airflow tasks logs youtube_analytics_pipeline extract_youtube_channels latest
Option 3: Run Scripts Individually
# 1. Extract channels
docker compose exec spark-master python /opt/spark-scripts/extract_youtube_channels.py
# 2. Extract videos
docker compose exec spark-master python /opt/spark-scripts/extract_youtube_videos.py
# 3. Extract comments
docker compose exec spark-master python /opt/spark-scripts/extract_youtube_comments.py
# 4. Process data
docker compose exec spark-master spark-submit \
--master spark://spark-master:7077 \
--packages io.delta:delta-spark_2.12:3.0.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
/opt/spark-scripts/process_youtube_data.py
# 5. Write to Delta Lake
docker compose exec spark-master spark-submit \
--master spark://spark-master:7077 \
--packages io.delta:delta-spark_2.12:3.0.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
/opt/spark-scripts/write_youtube_delta.py
# 6. Generate analytics
docker compose exec spark-master spark-submit \
--master spark://spark-master:7077 \
--packages io.delta:delta-spark_2.12:3.0.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
/opt/spark-scripts/youtube_analytics.py
# 7. Run quality checks
docker compose exec spark-master python /opt/spark-scripts/youtube_quality_checks.py
Data Structure
Delta Lake Tables
Bronze Layer (Raw Data)
- bronze_youtube_channels - Channel metadata
- bronze_youtube_videos - Video details (partitioned by year/month)
- bronze_youtube_comments - User comments
Silver Layer (Cleaned Data)
- silver_youtube_channels - Validated channels
- silver_youtube_videos - Enriched video data
- silver_youtube_comments - Processed comments
Gold Layer (Analytics)
- gold_youtube_channel_performance - Channel metrics
- gold_youtube_duration_performance - Video length analysis
- gold_youtube_monthly_trends - Publishing patterns
- gold_youtube_comment_activity - Engagement metrics
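The bronze video table above is partitioned by year and month. As a rough sketch of how such a partitioned Delta write might look (the published_year/published_month column names are assumptions based on the layout described here, not necessarily the pipeline's exact code):
# Sketch: append raw video records to a Delta table partitioned by year/month
videos_df.write.format("delta") \
    .mode("append") \
    .partitionBy("published_year", "published_month") \
    .save("/opt/delta-lake/bronze_youtube_videos")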
Analytics Generated
1. Top Performing Channels
- Subscriber count
- Total views
- Average engagement rate
- Content velocity
2. Video Duration Analysis
- Optimal video length
- Performance by duration category
- Engagement patterns
3. Content Trends
- Publishing frequency
- Monthly performance
- Growth patterns
4. Engagement Insights
- Like-to-view ratio (see the sketch after this list)
- Comment activity
- Viral content identification
5. Comment Analysis
- Sentiment patterns
- Question detection
- User engagement levels
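The engagement_rate column queried throughout this document lives in the silver layer. A common way to derive such a metric is (likes + comments) / views; the sketch below uses that formula as an assumption and may differ from the pipeline's exact calculation (the comment_count column is also assumed).
# Sketch: derive an engagement rate as (likes + comments) / views, in percent
from pyspark.sql.functions import col, round as spark_round

videos_df = spark.read.format("delta").load("/opt/delta-lake/silver_youtube_videos")
videos_df = videos_df.withColumn(
    "engagement_rate_calc",
    spark_round((col("like_count") + col("comment_count")) * 100.0 / col("view_count"), 2)
)
videos_df.select("title", "view_count", "engagement_rate_calc").show(10)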
Viewing Results
Option 1: Check Reports
# View analytics report
cat data/reports/youtube_analytics_report.json | jq .
# View quality report
cat data/reports/youtube_quality_report.json | jq .
Option 2: Query Delta Lake (Jupyter)
Access Jupyter at http://localhost:8888
from pyspark.sql import SparkSession
from delta import *
spark = SparkSession.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Read channel performance
df = spark.read.format("delta").load("/opt/delta-lake/gold_youtube_channel_performance")
df.show()
# Top videos by engagement
videos = spark.read.format("delta").load("/opt/delta-lake/silver_youtube_videos")
videos.orderBy("engagement_rate", ascending=False).select("title", "view_count", "engagement_rate").show(10)
Option 3: SQL Queries
# Create temp views
spark.read.format("delta").load("/opt/delta-lake/silver_youtube_videos").createOrReplaceTempView("videos")
# Query with SQL
spark.sql("""
SELECT
channel_title,
COUNT(*) as video_count,
AVG(view_count) as avg_views,
AVG(engagement_rate) as avg_engagement
FROM videos
GROUP BY channel_title
ORDER BY avg_views DESC
""").show()
Monitoring
Check Pipeline Status
# View Airflow DAG runs
docker compose logs airflow-scheduler | grep youtube_analytics
# Check Spark job status
docker compose logs spark-master | tail -50
# View extraction logs
docker compose exec spark-master ls -lh /opt/spark-data/raw/youtube/
Data Quality Metrics
The quality checks validate:
- No null values in critical fields
- No duplicate records
- Valid data ranges (no negative counts)
- Referential integrity
- Data freshness
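As a minimal sketch of what one of these checks might look like in PySpark (the table path and column names follow the silver layer described above; the video_id column and pass/fail logic are illustrative, not the pipeline's exact implementation):
# Sketch: null, negative-value, and duplicate checks on the silver video table
from pyspark.sql.functions import col

videos_df = spark.read.format("delta").load("/opt/delta-lake/silver_youtube_videos")

null_titles = videos_df.filter(col("title").isNull()).count()
negative_views = videos_df.filter(col("view_count") < 0).count()
duplicate_ids = videos_df.count() - videos_df.dropDuplicates(["video_id"]).count()

for name, value in [("null titles", null_titles),
                    ("negative view counts", negative_views),
                    ("duplicate video IDs", duplicate_ids)]:
    print(f"{'PASS' if value == 0 else 'FAIL'}: {name} = {value}")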
Configuration
Adjust API Limits
Edit script files to control API usage:
# scripts/extract_youtube_videos.py
MAX_VIDEOS_PER_CHANNEL = 50 # Reduce to save quota
# scripts/extract_youtube_comments.py
MAX_COMMENTS_PER_VIDEO = 100 # Reduce to save quota
Schedule Changes
Edit dags/youtube_analytics_pipeline.py:
dag = DAG(
'youtube_analytics_pipeline',
schedule_interval='@daily', # Change to @weekly, @hourly, etc.
...
)
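To see how the extraction, processing, and analytics stages could be chained in that DAG, here is a minimal sketch using BashOperator tasks that call the scripts shown earlier. The task IDs, commands, and DAG id are illustrative assumptions; refer to dags/youtube_analytics_pipeline.py for the actual DAG.
# Illustrative sketch of chaining pipeline stages in Airflow (not the actual DAG)
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    'youtube_analytics_pipeline_sketch',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    extract_channels = BashOperator(
        task_id='extract_youtube_channels',
        bash_command='python /opt/spark-scripts/extract_youtube_channels.py',
    )
    extract_videos = BashOperator(
        task_id='extract_youtube_videos',
        bash_command='python /opt/spark-scripts/extract_youtube_videos.py',
    )
    process_data = BashOperator(
        task_id='process_youtube_data',
        bash_command='spark-submit /opt/spark-scripts/process_youtube_data.py',
    )

    # Run the stages in order: channels -> videos -> processing
    extract_channels >> extract_videos >> process_data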
Troubleshooting
API Key Issues
# Test API key
docker compose exec spark-master python3 << 'EOF'
import os
import requests
from dotenv import load_dotenv

load_dotenv()
api_key = os.getenv('YOUTUBE_API_KEY')

if not api_key:
    print("ERROR: YOUTUBE_API_KEY not set!")
else:
    # Test API call
    response = requests.get(
        'https://www.googleapis.com/youtube/v3/channels',
        params={'part': 'snippet', 'id': 'UC_x5XG1OV2P6uZZ5FSM9Ttw', 'key': api_key}
    )
    if response.status_code == 200:
        print("✅ API key is valid!")
    else:
        print(f"❌ API error: {response.status_code}")
        print(response.json())
EOF
Common API Errors:
- 403 - Invalid API key or quota exceeded
- 400 - Malformed request
- 404 - Channel/video not found
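Transient errors (429/5xx) are worth retrying with backoff in the extraction scripts, while 400/403/404 usually indicate a configuration or quota problem and should fail fast. A minimal sketch, assuming the requests-based calls used elsewhere in this document; retry counts and delays are illustrative:
# Sketch: retry a YouTube API request with exponential backoff on transient errors
import time
import requests

def get_with_retry(url, params, max_retries=3):
    for attempt in range(max_retries):
        response = requests.get(url, params=params)
        if response.status_code == 200:
            return response.json()
        if response.status_code in (429, 500, 503):   # transient: back off and retry
            wait = 2 ** attempt                        # 1s, 2s, 4s
            print(f"Got {response.status_code}, retrying in {wait}s...")
            time.sleep(wait)
            continue
        response.raise_for_status()                    # 400/403/404: fail immediately
    raise RuntimeError(f"Request failed after {max_retries} retries")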
No Data Extracted
# Check if files were created
ls -lh data/raw/youtube/
# View extraction logs
docker compose logs spark-master | grep "EXTRACTING"
# Run without API key (uses sample data)
unset YOUTUBE_API_KEY
docker compose exec spark-master python /opt/spark-scripts/extract_youtube_channels.py
Spark Job Failures
# Check Spark master status
docker compose ps spark-master
# View Spark logs
docker compose logs spark-master
# Restart Spark
docker compose restart spark-master spark-worker
# Check memory allocation
docker stats spark-master spark-worker
Delta Lake Errors
# Check Delta Lake directory
ls -lh delta-lake/
# Verify Delta tables
docker compose exec spark-master python3 << 'EOF'
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

try:
    delta_table = DeltaTable.forPath(spark, "/opt/delta-lake/bronze_youtube_channels")
    print("✅ Delta table exists")
    delta_table.history(1).show()
except Exception as e:
    print(f"❌ Error: {e}")
EOF
Example Use Cases
1. Content Strategy Analysis
# Find optimal publishing time
videos_df = spark.read.format("delta").load("/opt/delta-lake/silver_youtube_videos")
videos_df.groupBy("published_month") \
.agg({"view_count": "avg", "engagement_rate": "avg"}) \
.orderBy("published_month") \
.show()
2. Competitor Analysis
# Compare channel performance
channels_df = spark.read.format("delta").load("/opt/delta-lake/silver_youtube_channels")
channels_df.select(
"channel_name",
"subscriber_count",
"view_count",
"avg_views_per_video"
).orderBy("subscriber_count", ascending=False).show()
3. Engagement Pattern Discovery
# Find high-engagement videos
videos_df.filter("view_count > 10000") \
.orderBy("engagement_rate", ascending=False) \
.select("title", "channel_title", "view_count", "like_count", "engagement_rate") \
.show(20, truncate=50)
4. Content Gap Analysis
# Find popular topics with fewer videos
from pyspark.sql.functions import explode, col, count, avg
# Analyze tags
videos_with_tags = videos_df.filter(col("tags").isNotNull())
tags_exploded = videos_with_tags.select(explode("tags").alias("tag"), "view_count")
tag_performance = tags_exploded.groupBy("tag") \
.agg(
count("*").alias("video_count"),
avg("view_count").alias("avg_views")
) \
.filter("video_count < 5 AND avg_views > 50000") \
.orderBy("avg_views", ascending=False)
tag_performance.show(20, truncate=False)
Incremental Updates
For production use, implement incremental updates:
# In extract_youtube_videos.py
# Add logic to only fetch videos after last extraction
from datetime import datetime, timedelta
# Get the last extraction date (a helper you implement; see the sketch below)
last_extraction = get_last_extraction_date()
# Filter videos published after last extraction
params = {
'publishedAfter': last_extraction.isoformat() + 'Z'
}
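get_last_extraction_date is not defined above; a minimal way to implement it is to persist the timestamp of each successful run. The sketch below uses a small JSON state file; the path, filename, and 30-day first-run backfill are illustrative assumptions (an Airflow Variable or a Delta table would work just as well).
# Sketch: persist the last extraction timestamp in a small state file
import json
from datetime import datetime, timedelta
from pathlib import Path

STATE_FILE = Path("/opt/spark-data/state/last_extraction.json")

def get_last_extraction_date():
    if STATE_FILE.exists():
        return datetime.fromisoformat(json.loads(STATE_FILE.read_text())["last_run"])
    return datetime.utcnow() - timedelta(days=30)  # first run: backfill 30 days

def save_extraction_date(ts=None):
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    STATE_FILE.write_text(json.dumps({"last_run": (ts or datetime.utcnow()).isoformat()}))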
Dashboard Integration
Export to BI Tools
# Export to CSV for Tableau/PowerBI
spark.read.format("delta") \
.load("/opt/delta-lake/gold_youtube_channel_performance") \
.coalesce(1) \
.write.mode("overwrite") \
.option("header", "true") \
.csv("/opt/spark-data/exports/channel_performance")
Create Visualizations
import matplotlib.pyplot as plt
import pandas as pd
# Read data
df = spark.read.format("delta").load("/opt/delta-lake/gold_youtube_monthly_trends").toPandas()
# Plot monthly trends
plt.figure(figsize=(12, 6))
plt.plot(df['published_month'], df['total_views'], marker='o')
plt.title('Monthly View Trends')
plt.xlabel('Month')
plt.ylabel('Total Views')
plt.savefig('/opt/notebooks/monthly_trends.png')
Best Practices
1. Protect Your API Key
- Never commit .env to git
- Use environment variables
- Rotate keys periodically
- Monitor API usage in the Google Cloud Console
2. Optimize API Usage
- Cache results when possible
- Batch requests efficiently
- Use incremental updates
- Monitor quota consumption
3. Data Quality
- Run quality checks regularly
- Handle API errors gracefully
- Validate data before processing
- Keep audit logs
4. Performance
- Partition large tables by date
- Optimize Delta tables regularly (see the compaction sketch after this list)
- Use appropriate Spark memory settings
- Monitor job execution times
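For the "Optimize Delta tables regularly" item above, a minimal compaction sketch using the Delta Lake Python API (available in the delta-spark 3.0.0 package used by the spark-submit commands earlier; the table path is one of the tables defined above):
# Sketch: compact small files in a Delta table and clean up old file versions
from delta.tables import DeltaTable

videos_table = DeltaTable.forPath(spark, "/opt/delta-lake/silver_youtube_videos")
videos_table.optimize().executeCompaction()  # merge small files into larger ones
videos_table.vacuum(168)                     # drop unreferenced files older than 168 hours (7 days)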
Scaling Considerations
Handling More Channels
# Process channels in batches (process_batch is a helper in your extraction script)
import time

BATCH_SIZE = 10
for i in range(0, len(YOUTUBE_CHANNELS), BATCH_SIZE):
    batch = YOUTUBE_CHANNELS[i:i + BATCH_SIZE]
    process_batch(batch)
    time.sleep(1)  # Rate limiting between batches
Distributed Processing
# Use Spark's parallelism (extract_channel_data is a function that fetches one channel's data)
channel_ids_rdd = spark.sparkContext.parallelize(channel_ids, numSlices=4)
results = channel_ids_rdd.map(extract_channel_data).collect()
Data Retention
# Clean old data (run monthly)
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/opt/delta-lake/bronze_youtube_videos")
# Keep only the current and previous calendar year
delta_table.delete("published_year < year(current_date()) - 1")
# Vacuum old versions
delta_table.vacuum(168) # 7 days retention
Learning Resources
- YouTube API Documentation
- Delta Lake Resources
- PySpark Resources
Contributing
Want to enhance the pipeline? Consider adding:
- Sentiment analysis on comments
- Thumbnail analysis
- Video category classification
- Predictive analytics (view forecasting)
- Real-time streaming updates
- Additional data sources (Twitter, Instagram)
Notes
Sample Data Mode
If you don't have a YouTube API key, the pipeline will automatically generate sample data for testing. This allows you to:
- Test the complete pipeline flow
- Learn Delta Lake operations
- Understand PySpark transformations
- Validate analytics logic
Production Deployment
For production use:
- Set up proper error handling and retries
- Implement monitoring and alerting
- Use secrets management (AWS Secrets Manager, Vault; see the sketch after this list)
- Set up automated backups
- Configure proper logging
- Implement data governance policies
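For the secrets-management item above, a minimal sketch of reading the API key from AWS Secrets Manager instead of a .env file (the secret name and region are placeholders; assumes boto3 is installed and IAM credentials are configured):
# Sketch: fetch the YouTube API key from AWS Secrets Manager (names are placeholders)
import boto3

def get_youtube_api_key(secret_name="youtube/api_key", region="us-east-1"):
    client = boto3.client("secretsmanager", region_name=region)
    return client.get_secret_value(SecretId=secret_name)["SecretString"]

api_key = get_youtube_api_key()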
Support
Check Logs
# Airflow logs
docker compose logs -f airflow-scheduler
# Spark logs
docker compose logs -f spark-master
# All logs
docker compose logs -f
Common Issues
Issue: API Quota Exceeded
- Solution: Wait 24 hours or reduce MAX_VIDEOS_PER_CHANNEL
Issue: Out of Memory
# Increase Spark memory in docker-compose.yaml
environment:
- SPARK_WORKER_MEMORY=4G
- SPARK_DRIVER_MEMORY=4G
Issue: Slow Extraction
- Solution: Process fewer videos or add more workers
Sample Output
YOUTUBE ANALYTICS GENERATION: STARTING
============================================================
TOP PERFORMING CHANNELS
------------------------------------------------------------
Top 10 Channels by Total Views:
+-------------------+------------+-------------+------------+----------------+
|channel_title |video_count |total_views |total_likes |avg_engagement |
+-------------------+------------+-------------+------------+----------------+
|freeCodeCamp |500 |150000000 |3000000 |2.5 |
|Fireship |300 |120000000 |2800000 |3.1 |
|Google Developers |450 |100000000 |2000000 |2.2 |
+-------------------+------------+-------------+------------+----------------+
VIDEO DURATION ANALYSIS
------------------------------------------------------------
Performance by Duration Category:
+--------------------+------------+----------+----------+---------------+
|duration_category |video_count |avg_views |avg_likes |avg_engagement |
+--------------------+------------+----------+----------+---------------+
|Medium (3-10min) |850 |45000 |1200 |3.2 |
|Long (10-30min) |420 |52000 |1500 |3.0 |
|Short (<3min) |230 |35000 |950 |3.5 |
+--------------------+------------+----------+----------+---------------+
✅ Report saved to: /opt/spark-data/reports/youtube_analytics_report.json
Happy Analyzing!
