YouTube Channel Analytics Pipeline
This post breaks the pipeline into manageable components and walks through a complete implementation: data ingestion from the YouTube Data API, PySpark transformations, Airflow orchestration, and Grafana dashboards, all running in Docker.
Project Structure
youtube-analytics-pipeline/
├── docker-compose.yml
├── airflow/
│   ├── Dockerfile
│   ├── dags/
│   │   └── youtube_pipeline.py
│   └── requirements.txt
├── spark/
│   ├── Dockerfile
│   └── jobs/
│       └── youtube_etl.py
├── scripts/
│   └── youtube_api.py
├── grafana/
│   └── provisioning/
│       ├── dashboards/
│       │   └── dashboard.yml
│       └── datasources/
│           └── datasource.yml
└── data/
    ├── raw/
    └── processed/
1. Docker Configuration
docker-compose.yml
version: '3.8'

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  airflow:
    build: ./airflow
    depends_on:
      - postgres
    environment:
      # The official apache/airflow image is configured through AIRFLOW__* variables
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__CORE__LOAD_EXAMPLES=False
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
      # Pass the YouTube credentials from the host shell or a .env file into the container
      - YOUTUBE_API_KEY=${YOUTUBE_API_KEY}
      - YOUTUBE_CHANNEL_ID=${YOUTUBE_CHANNEL_ID}
    volumes:
      - ./airflow/dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts
      - ./spark/jobs:/opt/airflow/spark/jobs
      - ./data:/opt/airflow/data
      - ./data:/opt/spark/data  # lets the ETL job's /opt/spark/data paths resolve when the Spark driver runs here
    ports:
      - "8080:8080"
    command: >
      bash -c "airflow db init &&
      airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.org &&
      airflow webserver & airflow scheduler"

  spark:
    build: ./spark
    volumes:
      - ./spark/jobs:/opt/spark/jobs
      - ./data:/opt/spark/data
      - ./scripts:/opt/spark/scripts

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_USER=admin
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - ./grafana/provisioning:/etc/grafana/provisioning
    depends_on:
      - postgres

volumes:
  postgres_data:
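Docker Compose substitutes ${YOUTUBE_API_KEY} and ${YOUTUBE_CHANNEL_ID} from the host shell or from a .env file placed next to docker-compose.yml. A minimal sketch, with placeholder values:

# .env (keep this file out of version control)
YOUTUBE_API_KEY=your_api_key_here
YOUTUBE_CHANNEL_ID=UC_x5XG1OV2P6uZZ5FSM9Ttw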
Airflow Dockerfile
FROM apache/airflow:2.5.1
USER root
RUN apt-get update && \
apt-get install -y --no-install-recommends \
openjdk-11-jre-headless \
curl \
&& apt-get autoremove -yqq --purge \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
USER airflow
# pyspark is included so the SparkSubmitOperator can find the spark-submit binary on the PATH
RUN pip install --no-cache-dir apache-airflow-providers-apache-spark pyspark==3.3.0
Spark Dockerfile
FROM bitnami/spark:3.3.0
USER root
RUN apt-get update && apt-get install -y python3-pip
# Spark (and PySpark) already ship with the bitnami/spark image; only the extra libraries are needed
RUN pip install pandas requests
USER 1001
2. Data Ingestion Script
scripts/youtube_api.py
import requests
import json
import os
from datetime import datetime
from typing import List, Dict


class YouTubeAPI:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://www.googleapis.com/youtube/v3"

    def get_channel_stats(self, channel_id: str) -> Dict:
        """Fetch channel statistics"""
        url = f"{self.base_url}/channels"
        params = {
            'part': 'snippet,statistics',
            'id': channel_id,
            'key': self.api_key
        }
        response = requests.get(url, params=params)
        data = response.json()

        if 'items' in data and len(data['items']) > 0:
            channel = data['items'][0]
            return {
                'channel_id': channel_id,
                'title': channel['snippet']['title'],
                'description': channel['snippet']['description'],
                'published_at': channel['snippet']['publishedAt'],
                'subscribers': channel['statistics'].get('subscriberCount', 0),
                'total_views': channel['statistics'].get('viewCount', 0),
                'video_count': channel['statistics'].get('videoCount', 0),
                'timestamp': datetime.now().isoformat()
            }
        return {}

    def get_channel_videos(self, channel_id: str, max_results: int = 50) -> List[Dict]:
        """Fetch videos from a channel"""
        # First, get the channel's uploads playlist ID
        url = f"{self.base_url}/channels"
        params = {
            'part': 'contentDetails',
            'id': channel_id,
            'key': self.api_key
        }
        response = requests.get(url, params=params)
        data = response.json()

        if not data.get('items'):
            return []

        uploads_playlist_id = data['items'][0]['contentDetails']['relatedPlaylists']['uploads']

        # Page through the uploads playlist
        videos = []
        next_page_token = None

        while len(videos) < max_results:
            url = f"{self.base_url}/playlistItems"
            params = {
                'part': 'snippet,contentDetails',
                'playlistId': uploads_playlist_id,
                'maxResults': min(50, max_results - len(videos)),
                'key': self.api_key
            }
            if next_page_token:
                params['pageToken'] = next_page_token

            response = requests.get(url, params=params)
            data = response.json()

            for item in data.get('items', []):
                video_id = item['contentDetails']['videoId']
                video_details = self.get_video_details(video_id)
                if video_details:
                    videos.append(video_details)

            next_page_token = data.get('nextPageToken')
            if not next_page_token:
                break

        return videos

    def get_video_details(self, video_id: str) -> Dict:
        """Get detailed statistics for a specific video"""
        url = f"{self.base_url}/videos"
        params = {
            'part': 'snippet,statistics,contentDetails',
            'id': video_id,
            'key': self.api_key
        }
        response = requests.get(url, params=params)
        data = response.json()

        if 'items' in data and len(data['items']) > 0:
            video = data['items'][0]
            return {
                'video_id': video_id,
                'title': video['snippet']['title'],
                'description': video['snippet']['description'],
                'published_at': video['snippet']['publishedAt'],
                'channel_id': video['snippet']['channelId'],
                'duration': video['contentDetails']['duration'],
                'views': video['statistics'].get('viewCount', 0),
                'likes': video['statistics'].get('likeCount', 0),
                'comments': video['statistics'].get('commentCount', 0),
                'timestamp': datetime.now().isoformat()
            }
        return {}


def main():
    # Configuration
    API_KEY = os.getenv('YOUTUBE_API_KEY')
    CHANNEL_ID = os.getenv('YOUTUBE_CHANNEL_ID', 'UC_x5XG1OV2P6uZZ5FSM9Ttw')  # Google Developers channel as example

    if not API_KEY:
        raise ValueError("Please set the YOUTUBE_API_KEY environment variable")

    youtube = YouTubeAPI(API_KEY)

    # Fetch channel data
    print("Fetching channel statistics...")
    channel_stats = youtube.get_channel_stats(CHANNEL_ID)

    # Fetch videos data
    print("Fetching videos data...")
    videos_data = youtube.get_channel_videos(CHANNEL_ID, max_results=100)

    # Save raw data
    os.makedirs('/opt/airflow/data/raw', exist_ok=True)

    with open('/opt/airflow/data/raw/channel_stats.json', 'w') as f:
        json.dump(channel_stats, f, indent=2)

    with open('/opt/airflow/data/raw/videos_data.json', 'w') as f:
        json.dump(videos_data, f, indent=2)

    print(f"Data saved: {len(videos_data)} videos processed")


if __name__ == "__main__":
    main()
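Note that the script above calls response.json() without checking the HTTP status, so quota errors (HTTP 403) or transient failures surface later as confusing KeyErrors. A hedged sketch of a small retry helper the requests.get calls could be routed through; the name get_with_retry and the backoff values are illustrative, not part of the original code:

import time
import requests

def get_with_retry(url: str, params: dict, retries: int = 3, backoff: float = 2.0) -> dict:
    """Call the YouTube API, raising on HTTP errors and retrying transient failures."""
    for attempt in range(1, retries + 1):
        try:
            response = requests.get(url, params=params, timeout=30)
            response.raise_for_status()  # surfaces quota/permission errors instead of silent empty results
            return response.json()
        except requests.RequestException:
            if attempt == retries:
                raise
            time.sleep(backoff * attempt)  # simple linear backoff between attempts

Inside the class methods, the pair response = requests.get(url, params=params) and data = response.json() would then become data = get_with_retry(url, params).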
3. PySpark ETL Job
spark/jobs/youtube_etl.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import json


def create_spark_session():
    return SparkSession.builder \
        .appName("YouTubeAnalytics") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .getOrCreate()


def read_json_data(spark, file_path):
    """Read a JSON file with the standard library and convert it to a DataFrame"""
    with open(file_path, 'r') as f:
        data = json.load(f)

    # Handle both a single object and an array of objects
    if isinstance(data, list):
        return spark.createDataFrame(data)
    else:
        return spark.createDataFrame([data])


def process_channel_data(spark):
    """Process channel statistics"""
    df = read_json_data(spark, '/opt/spark/data/raw/channel_stats.json')

    processed_df = df.select(
        col('channel_id'),
        col('title').alias('channel_title'),
        col('description').alias('channel_description'),
        to_timestamp(col('published_at')).alias('channel_created_at'),
        col('subscribers').cast('long'),
        col('total_views').cast('long'),
        col('video_count').cast('int'),
        to_timestamp(col('timestamp')).alias('processed_at')
    )
    return processed_df


def process_videos_data(spark):
    """Process videos data and create features"""
    df = read_json_data(spark, '/opt/spark/data/raw/videos_data.json')

    # Calculate duration in minutes and engagement rate
    processed_df = df.select(
        col('video_id'),
        col('channel_id'),
        col('title').alias('video_title'),
        to_timestamp(col('published_at')).alias('published_at'),
        col('views').cast('long'),
        col('likes').cast('long'),
        col('comments').cast('long'),
        # Extract publish-time features from the parsed timestamp
        dayofweek(to_timestamp(col('published_at'))).alias('publish_day_of_week'),
        hour(to_timestamp(col('published_at'))).alias('publish_hour'),
        # Calculate engagement metrics
        when(col('views') > 0, (col('likes') + col('comments')) / col('views')).otherwise(0).alias('engagement_rate'),
        (col('likes') / col('views')).alias('like_rate'),
        (col('comments') / col('views')).alias('comment_rate'),
        # Duration parsing: this regex only matches PT<minutes>M... durations, so
        # hour-long videos (e.g. PT1H2M10S) come out null; see the fuller sketch below
        regexp_extract(col('duration'), r'PT(\d+)M', 1).cast('int').alias('duration_minutes'),
        to_timestamp(col('timestamp')).alias('processed_at')
    ).filter(col('views').isNotNull())  # Remove videos with no views data

    return processed_df


def calculate_trending_metrics(videos_df):
    """Calculate trending metrics and top videos"""
    trending_videos = videos_df.orderBy(col('views').desc()).limit(10)

    daily_metrics = videos_df.groupBy(
        date_trunc('day', col('published_at')).alias('publish_date')
    ).agg(
        count('*').alias('videos_published'),
        avg('views').alias('avg_views'),
        avg('engagement_rate').alias('avg_engagement_rate'),
        sum('views').alias('total_views')
    )

    return trending_videos, daily_metrics


def main():
    spark = create_spark_session()

    print("Processing channel data...")
    channel_df = process_channel_data(spark)

    print("Processing videos data...")
    videos_df = process_videos_data(spark)

    print("Calculating trending metrics...")
    trending_videos, daily_metrics = calculate_trending_metrics(videos_df)

    # Save processed data
    output_path = '/opt/spark/data/processed'

    channel_df.write.mode('overwrite').parquet(f'{output_path}/channel_stats')
    videos_df.write.mode('overwrite').parquet(f'{output_path}/videos_processed')
    trending_videos.write.mode('overwrite').parquet(f'{output_path}/trending_videos')
    daily_metrics.write.mode('overwrite').parquet(f'{output_path}/daily_metrics')

    # Show some results
    print("Top 10 videos by views:")
    trending_videos.select('video_title', 'views', 'engagement_rate').show(truncate=False)

    print("Channel statistics:")
    channel_df.show(truncate=False)

    spark.stop()


if __name__ == "__main__":
    main()
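One limitation worth flagging: the regexp_extract(col('duration'), r'PT(\d+)M', 1) expression in process_videos_data only matches durations of the form PT<minutes>M<seconds>S, so hour-long videos (PT1H2M10S) and short clips (PT45S) come out null. A hedged sketch of a fuller parser that converts the ISO 8601 duration to total seconds; the helper name and the duration_seconds column are assumptions, not part of the original schema:

from pyspark.sql.functions import coalesce, lit, regexp_extract

def duration_to_seconds(duration_col):
    """Convert an ISO 8601 duration column (e.g. PT1H2M10S) to total seconds."""
    # Missing components are extracted as empty strings, cast to null, and defaulted to 0
    hours = coalesce(regexp_extract(duration_col, r'(\d+)H', 1).cast('int'), lit(0))
    minutes = coalesce(regexp_extract(duration_col, r'(\d+)M', 1).cast('int'), lit(0))
    seconds = coalesce(regexp_extract(duration_col, r'(\d+)S', 1).cast('int'), lit(0))
    return hours * 3600 + minutes * 60 + seconds

It could be used inside the select() of process_videos_data as duration_to_seconds(col('duration')).alias('duration_seconds').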
4. Airflow DAG
airflow/dags/youtube_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'youtube_analytics',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def run_youtube_extraction():
    """Run the YouTube data extraction script"""
    import sys
    sys.path.append('/opt/airflow/scripts')  # mounted via docker-compose
    from youtube_api import main
    main()


dag = DAG(
    'youtube_analytics_pipeline',
    default_args=default_args,
    description='End-to-end YouTube analytics pipeline',
    schedule_interval=timedelta(days=1),
    catchup=False,
)

# Task 1: Extract data from the YouTube API
extract_task = PythonOperator(
    task_id='extract_youtube_data',
    python_callable=run_youtube_extraction,
    dag=dag,
)

# Task 2: Transform data with PySpark
transform_task = SparkSubmitOperator(
    task_id='transform_data_with_spark',
    application='/opt/airflow/spark/jobs/youtube_etl.py',  # mounted via docker-compose
    conn_id='spark_default',  # point this connection at the Spark master (e.g. spark://spark:7077)
    verbose=False,
    dag=dag,
)

# Task 3: Load data to PostgreSQL (placeholder; see the sketch below)
load_task = BashOperator(
    task_id='load_to_database',
    bash_command='echo "Loading processed data to database..."',
    dag=dag,
)

# Set task dependencies
extract_task >> transform_task >> load_task

# Optional: add a data quality check
data_quality_check = BashOperator(
    task_id='data_quality_check',
    bash_command='echo "Running data quality checks..."',
    dag=dag,
)

transform_task >> data_quality_check
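The load_to_database task above is only an echo placeholder. A sketch of one way to make it real, assuming pandas, pyarrow, SQLAlchemy, and psycopg2-binary are added to the Airflow image and that the processed Parquet files are visible in the Airflow container under /opt/airflow/data (as mounted in docker-compose.yml); the table names are chosen to match the sample Grafana queries further down:

import pandas as pd
from sqlalchemy import create_engine

def load_parquet_to_postgres():
    """Load the processed Parquet datasets into Postgres tables that Grafana can query."""
    engine = create_engine('postgresql://airflow:airflow@postgres:5432/airflow')
    datasets = {
        'videos_processed': '/opt/airflow/data/processed/videos_processed',
        'trending_videos': '/opt/airflow/data/processed/trending_videos',
        'daily_metrics': '/opt/airflow/data/processed/daily_metrics',
    }
    for table, path in datasets.items():
        df = pd.read_parquet(path)  # reads every part file in the directory (pyarrow engine)
        df.to_sql(table, engine, if_exists='replace', index=False)

The BashOperator could then be swapped for a PythonOperator with python_callable=load_parquet_to_postgres.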
5. Grafana Configuration
grafana/provisioning/datasources/datasource.yml
apiVersion: 1

datasources:
  - name: PostgreSQL
    type: postgres
    access: proxy
    url: postgres:5432
    user: airflow
    secureJsonData:
      password: airflow
    jsonData:
      database: airflow
      sslmode: disable
    isDefault: true
    version: 1
    editable: true
grafana/provisioning/dashboards/dashboard.yml
apiVersion: 1

providers:
  - name: 'default'
    orgId: 1
    folder: ''
    type: file
    disableDeletion: false
    updateIntervalSeconds: 10
    allowUiUpdates: true
    options:
      path: /etc/grafana/provisioning/dashboards
6. Setup and Deployment Instructions
Environment Setup
1. Get a YouTube API key:
   - Go to the Google Cloud Console
   - Enable the YouTube Data API v3
   - Create credentials (API key)
2. Set the environment variables (or put them in a .env file, as shown earlier):
export YOUTUBE_API_KEY="your_api_key_here"
export YOUTUBE_CHANNEL_ID="channel_id_you_want_to_analyze"
Running the Pipeline
1. Start all services:
docker-compose up -d
2. Access the services:
   - Airflow: http://localhost:8080 (admin/admin)
   - Grafana: http://localhost:3000 (admin/admin)
3. Trigger the pipeline:
   - Go to the Airflow UI
   - Enable the youtube_analytics_pipeline DAG
   - Trigger it manually or wait for the scheduled run (a CLI alternative is shown below)
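As an alternative to clicking through the UI, the DAG can be triggered from the command line; a quick sketch, assuming the Compose service is named airflow as in the file above:

docker-compose exec airflow airflow dags trigger youtube_analytics_pipeline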
Sample Queries for Grafana
Create these panels in your Grafana dashboard. The queries assume the processed datasets have been loaded into Postgres tables of the same names, for example by the load_to_database task sketched above:
- Top Videos by Views:
SELECT video_title, views
FROM trending_videos
ORDER BY views DESC
LIMIT 10
- Engagement Rate Over Time:
SELECT publish_date, avg_engagement_rate
FROM daily_metrics
ORDER BY publish_date
- Publishing Heatmap:
SELECT publish_hour, publish_day_of_week, COUNT(*) as video_count
FROM videos_processed
GROUP BY publish_hour, publish_day_of_week
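A possible fourth panel along the same lines, using the publish_hour and engagement_rate columns produced by the Spark job, would show when uploads earn the most engagement:

SELECT publish_hour, AVG(engagement_rate) AS avg_engagement_rate
FROM videos_processed
GROUP BY publish_hour
ORDER BY publish_hour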
Key Features Implemented
✅ End-to-end data pipeline from YouTube API to visualization
✅ Data processing with PySpark including feature engineering
✅ Workflow orchestration with Airflow
✅ Containerization with Docker
✅ Interactive dashboards with Grafana
✅ Basic error handling and a placeholder data quality check task
✅ A containerized architecture that can be extended to multiple channels
This pipeline provides a solid foundation for YouTube channel analytics and can be extended with additional features like sentiment analysis, competitor benchmarking, and predictive analytics.