Lagat Josiah

YouTube Channel Analytics Pipeline

This post breaks the pipeline down into manageable components and walks through the complete implementation: ingestion from the YouTube Data API, transformation with PySpark, orchestration with Airflow, and dashboards in Grafana.

Project Structure

youtube-analytics-pipeline/
├── docker-compose.yml
├── airflow/
│   ├── Dockerfile
│   ├── dags/
│   │   └── youtube_pipeline.py
│   └── requirements.txt
├── spark/
│   ├── Dockerfile
│   └── jobs/
│       └── youtube_etl.py
├── scripts/
│   └── youtube_api.py
├── grafana/
│   └── provisioning/
│       ├── dashboards/
│       │   └── dashboard.yml
│       └── datasources/
│           └── datasource.yml
└── data/
    ├── raw/
    └── processed/

1. Docker Configuration

docker-compose.yml

version: '3.8'

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  airflow:
    build: ./airflow
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
      # Forward the YouTube API credentials from the host so the extraction task can read them
      - YOUTUBE_API_KEY=${YOUTUBE_API_KEY}
      - YOUTUBE_CHANNEL_ID=${YOUTUBE_CHANNEL_ID}
    volumes:
      - ./airflow/dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts
      - ./spark/jobs:/opt/airflow/spark/jobs
      - ./data:/opt/airflow/data
    ports:
      - "8080:8080"
    command: >
      bash -c "airflow db init &&
               airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.org &&
               airflow webserver & airflow scheduler"

  spark:
    build: ./spark
    volumes:
      - ./spark/jobs:/opt/spark/jobs
      - ./data:/opt/spark/data
      - ./scripts:/opt/spark/scripts

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_USER=admin
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - ./grafana/provisioning:/etc/grafana/provisioning
    depends_on:
      - postgres

volumes:
  postgres_data:

Airflow Dockerfile

FROM apache/airflow:2.5.1
USER root
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
    openjdk-11-jre-headless \
    curl \
    && apt-get autoremove -yqq --purge \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*
USER airflow
RUN pip install --no-cache-dir apache-airflow-providers-apache-spark

Spark Dockerfile

FROM bitnami/spark:3.3.0
USER root
RUN apt-get update && apt-get install -y python3-pip
RUN pip install pyspark pandas requests
USER 1001

2. Data Ingestion Script

scripts/youtube_api.py

import requests
import pandas as pd
import json
import os
from datetime import datetime
from typing import List, Dict

class YouTubeAPI:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://www.googleapis.com/youtube/v3"

    def get_channel_stats(self, channel_id: str) -> Dict:
        """Fetch channel statistics"""
        url = f"{self.base_url}/channels"
        params = {
            'part': 'snippet,statistics',
            'id': channel_id,
            'key': self.api_key
        }

        response = requests.get(url, params=params)
        data = response.json()

        if 'items' in data and len(data['items']) > 0:
            channel = data['items'][0]
            return {
                'channel_id': channel_id,
                'title': channel['snippet']['title'],
                'description': channel['snippet']['description'],
                'published_at': channel['snippet']['publishedAt'],
                'subscribers': channel['statistics'].get('subscriberCount', 0),
                'total_views': channel['statistics'].get('viewCount', 0),
                'video_count': channel['statistics'].get('videoCount', 0),
                'timestamp': datetime.now().isoformat()
            }
        return {}

    def get_channel_videos(self, channel_id: str, max_results: int = 50) -> List[Dict]:
        """Fetch videos from a channel"""
        # First, get uploads playlist ID
        url = f"{self.base_url}/channels"
        params = {
            'part': 'contentDetails',
            'id': channel_id,
            'key': self.api_key
        }

        response = requests.get(url, params=params)
        data = response.json()

        if not data.get('items'):
            return []

        uploads_playlist_id = data['items'][0]['contentDetails']['relatedPlaylists']['uploads']

        # Get videos from uploads playlist
        videos = []
        next_page_token = None

        while len(videos) < max_results:
            url = f"{self.base_url}/playlistItems"
            params = {
                'part': 'snippet,contentDetails',
                'playlistId': uploads_playlist_id,
                'maxResults': min(50, max_results - len(videos)),
                'key': self.api_key
            }

            if next_page_token:
                params['pageToken'] = next_page_token

            response = requests.get(url, params=params)
            data = response.json()

            for item in data.get('items', []):
                video_id = item['contentDetails']['videoId']
                video_details = self.get_video_details(video_id)
                if video_details:
                    videos.append(video_details)

            next_page_token = data.get('nextPageToken')
            if not next_page_token:
                break

        return videos

    def get_video_details(self, video_id: str) -> Dict:
        """Get detailed statistics for a specific video"""
        url = f"{self.base_url}/videos"
        params = {
            'part': 'snippet,statistics,contentDetails',
            'id': video_id,
            'key': self.api_key
        }

        response = requests.get(url, params=params)
        data = response.json()

        if 'items' in data and len(data['items']) > 0:
            video = data['items'][0]
            return {
                'video_id': video_id,
                'title': video['snippet']['title'],
                'description': video['snippet']['description'],
                'published_at': video['snippet']['publishedAt'],
                'channel_id': video['snippet']['channelId'],
                'duration': video['contentDetails']['duration'],
                'views': video['statistics'].get('viewCount', 0),
                'likes': video['statistics'].get('likeCount', 0),
                'comments': video['statistics'].get('commentCount', 0),
                'timestamp': datetime.now().isoformat()
            }
        return {}

def main():
    # Configuration
    API_KEY = os.getenv('YOUTUBE_API_KEY')
    CHANNEL_ID = os.getenv('YOUTUBE_CHANNEL_ID', 'UC_x5XG1OV2P6uZZ5FSM9Ttw')  # Google Developers channel as example

    if not API_KEY:
        raise ValueError("Please set YOUTUBE_API_KEY environment variable")

    youtube = YouTubeAPI(API_KEY)

    # Fetch channel data
    print("Fetching channel statistics...")
    channel_stats = youtube.get_channel_stats(CHANNEL_ID)

    # Fetch videos data
    print("Fetching videos data...")
    videos_data = youtube.get_channel_videos(CHANNEL_ID, max_results=100)

    # Save raw data
    os.makedirs('/opt/airflow/data/raw', exist_ok=True)

    with open('/opt/airflow/data/raw/channel_stats.json', 'w') as f:
        json.dump(channel_stats, f, indent=2)

    with open('/opt/airflow/data/raw/videos_data.json', 'w') as f:
        json.dump(videos_data, f, indent=2)

    print(f"Data saved: {len(videos_data)} videos processed")

if __name__ == "__main__":
    main()
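
The methods above call the Data API with bare requests.get and never check the HTTP status, so quota or auth errors (HTTP 403) silently come back as empty results. Below is a minimal sketch of a retry-aware request helper the methods could delegate to; the name fetch_with_retry and the retry/backoff values are illustrative, not part of the original script.

import time
import requests

def fetch_with_retry(url: str, params: dict, retries: int = 3, backoff: float = 2.0) -> dict:
    """GET a YouTube Data API endpoint, surfacing quota errors and retrying transient failures."""
    for attempt in range(1, retries + 1):
        try:
            response = requests.get(url, params=params, timeout=30)
            response.raise_for_status()  # raise on 403 quota/auth errors instead of returning empty data
            return response.json()
        except requests.RequestException:
            if attempt == retries:
                raise
            time.sleep(backoff * attempt)  # simple linear backoff before the next attempt
    return {}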

3. PySpark ETL Job

spark/jobs/youtube_etl.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
from datetime import datetime

def create_spark_session():
    return SparkSession.builder \
        .appName("YouTubeAnalytics") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .getOrCreate()

def read_json_data(spark, file_path):
    """Read JSON data as text and parse manually"""
    with open(file_path, 'r') as f:
        data = json.load(f)

    # Handle both single object and array of objects
    if isinstance(data, list):
        return spark.createDataFrame(data)
    else:
        return spark.createDataFrame([data])

def process_channel_data(spark):
    """Process channel statistics"""
    df = read_json_data(spark, '/opt/spark/data/raw/channel_stats.json')

    processed_df = df.select(
        col('channel_id'),
        col('title').alias('channel_title'),
        col('description').alias('channel_description'),
        to_timestamp(col('published_at')).alias('channel_created_at'),
        col('subscribers').cast('long'),
        col('total_views').cast('long'),
        col('video_count').cast('int'),
        to_timestamp(col('timestamp')).alias('processed_at')
    )

    return processed_df

def process_videos_data(spark):
    """Process videos data and create features"""
    df = read_json_data(spark, '/opt/spark/data/raw/videos_data.json')

    # Calculate duration in minutes and engagement rate
    processed_df = df.select(
        col('video_id'),
        col('channel_id'),
        col('title').alias('video_title'),
        to_timestamp(col('published_at')).alias('published_at'),
        col('views').cast('long'),
        col('likes').cast('long'),
        col('comments').cast('long'),
        # Extract features
        dayofweek(col('published_at')).alias('publish_day_of_week'),
        hour(col('published_at')).alias('publish_hour'),
        # Calculate engagement metrics (guard against division by zero views)
        when(col('views') > 0, (col('likes') + col('comments')) / col('views')).otherwise(0).alias('engagement_rate'),
        when(col('views') > 0, col('likes') / col('views')).otherwise(0).alias('like_rate'),
        when(col('views') > 0, col('comments') / col('views')).otherwise(0).alias('comment_rate'),
        # Duration parsing (ISO 8601, e.g. PT1H2M30S) converted to total minutes
        (coalesce(regexp_extract(col('duration'), r'(\d+)H', 1).cast('int'), lit(0)) * 60
         + coalesce(regexp_extract(col('duration'), r'(\d+)M', 1).cast('int'), lit(0))).alias('duration_minutes'),
        to_timestamp(col('timestamp')).alias('processed_at')
    ).filter(col('views').isNotNull())  # Remove videos with no views data

    return processed_df

def calculate_trending_metrics(videos_df):
    """Calculate trending metrics and top videos"""
    trending_videos = videos_df.orderBy(col('views').desc()).limit(10)

    daily_metrics = videos_df.groupBy(
        date_trunc('day', col('published_at')).alias('publish_date')
    ).agg(
        count('*').alias('videos_published'),
        avg('views').alias('avg_views'),
        avg('engagement_rate').alias('avg_engagement_rate'),
        sum('views').alias('total_views')
    )

    return trending_videos, daily_metrics

def main():
    spark = create_spark_session()

    print("Processing channel data...")
    channel_df = process_channel_data(spark)

    print("Processing videos data...")
    videos_df = process_videos_data(spark)

    print("Calculating trending metrics...")
    trending_videos, daily_metrics = calculate_trending_metrics(videos_df)

    # Save processed data
    output_path = '/opt/spark/data/processed'

    channel_df.write.mode('overwrite').parquet(f'{output_path}/channel_stats')
    videos_df.write.mode('overwrite').parquet(f'{output_path}/videos_processed')
    trending_videos.write.mode('overwrite').parquet(f'{output_path}/trending_videos')
    daily_metrics.write.mode('overwrite').parquet(f'{output_path}/daily_metrics')

    # Show some results
    print("Top 10 videos by views:")
    trending_videos.select('video_title', 'views', 'engagement_rate').show(truncate=False)

    print("Channel statistics:")
    channel_df.show(truncate=False)

    spark.stop()

if __name__ == "__main__":
    main()
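
The DAG in the next section defines a data_quality_check placeholder task. A minimal sketch of what such a check could run against the Parquet written above is shown here; the file name (for example spark/jobs/check_quality.py), the specific assertions, and the assumption that it runs where the shared data volume is mounted at /opt/spark/data are all illustrative.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

def main():
    spark = SparkSession.builder.appName("YouTubeDataQuality").getOrCreate()
    videos = spark.read.parquet('/opt/spark/data/processed/videos_processed')

    total = videos.count()
    null_ids = videos.filter(col('video_id').isNull()).count()
    negative_views = videos.filter(col('views') < 0).count()

    # Fail the task if basic expectations do not hold
    assert total > 0, "no processed videos found"
    assert null_ids == 0, f"{null_ids} rows have a null video_id"
    assert negative_views == 0, f"{negative_views} rows have negative view counts"

    print(f"Quality check passed: {total} rows validated")
    spark.stop()

if __name__ == "__main__":
    main()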

4. Airflow DAG

airflow/dags/youtube_pipeline.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime, timedelta
import os

default_args = {
    'owner': 'youtube_analytics',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def run_youtube_extraction():
    """Run YouTube data extraction"""
    import sys
    sys.path.append('/opt/airflow')  # make the mounted scripts/ folder importable inside the task
    from scripts.youtube_api import main
    main()

dag = DAG(
    'youtube_analytics_pipeline',
    default_args=default_args,
    description='End-to-end YouTube analytics pipeline',
    schedule_interval=timedelta(days=1),
    catchup=False,
)

# Task 1: Extract data from YouTube API
extract_task = PythonOperator(
    task_id='extract_youtube_data',
    python_callable=run_youtube_extraction,
    dag=dag,
)

# Task 2: Transform data with PySpark
transform_task = SparkSubmitOperator(
    task_id='transform_data_with_spark',
    application='/opt/airflow/spark/jobs/youtube_etl.py',
    conn_id='spark_default',
    verbose=False,
    dag=dag,
)

# Task 3: Load data to PostgreSQL (placeholder; see the sketch after this DAG)
load_task = BashOperator(
    task_id='load_to_database',
    bash_command='echo "Loading processed data to database..."',
    dag=dag,
)

# Set task dependencies
extract_task >> transform_task >> load_task

# Optional: Add data quality check
data_quality_check = BashOperator(
    task_id='data_quality_check',
    bash_command='echo "Running data quality checks..."',
    dag=dag,
)

transform_task >> data_quality_check
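
The load_to_database task above is only an echo placeholder. One possible implementation, sketched below, reads the processed Parquet files and writes them into the same Postgres instance that Grafana queries in section 5, using table names that match the sample queries. The script name (for example scripts/load_to_postgres.py), the extra dependencies (pandas, pyarrow, sqlalchemy, psycopg2-binary in the Airflow image), and the reuse of the airflow database are assumptions you would likely adjust.

import pandas as pd
from sqlalchemy import create_engine

# Parquet outputs as seen from the Airflow container (./data is mounted at /opt/airflow/data)
TABLES = {
    'trending_videos': '/opt/airflow/data/processed/trending_videos',
    'daily_metrics': '/opt/airflow/data/processed/daily_metrics',
    'videos_processed': '/opt/airflow/data/processed/videos_processed',
}

def main():
    # Credentials match the postgres service in docker-compose.yml
    engine = create_engine('postgresql+psycopg2://airflow:airflow@postgres:5432/airflow')

    for table_name, path in TABLES.items():
        df = pd.read_parquet(path)  # reads every part file in the directory
        df.to_sql(table_name, engine, if_exists='replace', index=False)
        print(f"Loaded {len(df)} rows into {table_name}")

if __name__ == "__main__":
    main()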

5. Grafana Configuration

grafana/provisioning/datasources/datasource.yml

apiVersion: 1

datasources:
  - name: PostgreSQL
    type: postgres
    access: proxy
    url: postgres:5432
    user: airflow
    secureJsonData:
      password: airflow
    jsonData:
      database: airflow
      sslmode: disable
    isDefault: true
    version: 1
    editable: true

grafana/provisioning/dashboards/dashboard.yml

apiVersion: 1

providers:
  - name: 'default'
    orgId: 1
    folder: ''
    type: file
    disableDeletion: false
    updateIntervalSeconds: 10
    allowUiUpdates: true
    options:
      path: /etc/grafana/provisioning/dashboards

6. Setup and Deployment Instructions

Environment Setup

  1. Get a YouTube API key: create a project in the Google Cloud Console, enable the YouTube Data API v3, and generate an API key under APIs & Services → Credentials.

  2. Set environment variables (docker-compose forwards these into the Airflow container):

export YOUTUBE_API_KEY="your_api_key_here"
export YOUTUBE_CHANNEL_ID="channel_id_you_want_to_analyze"

Running the Pipeline

  1. Start all services:

docker-compose up -d

  2. Access the services: the Airflow UI at http://localhost:8080 (admin/admin) and Grafana at http://localhost:3000 (admin/admin).

  3. Trigger the pipeline:

    • Go to the Airflow UI
    • Enable the youtube_analytics_pipeline DAG
    • Trigger it manually or wait for the scheduled daily run

Sample Queries for Grafana

Create these panels in your Grafana dashboard:

  1. Top Videos by Views:

SELECT video_title, views
FROM trending_videos
ORDER BY views DESC
LIMIT 10

  2. Engagement Rate Over Time:

SELECT publish_date, avg_engagement_rate
FROM daily_metrics
ORDER BY publish_date

  3. Publishing Heatmap:

SELECT publish_hour, publish_day_of_week, COUNT(*) AS video_count
FROM videos_processed
GROUP BY publish_hour, publish_day_of_week

Key Features Implemented

  • End-to-end data pipeline from YouTube API to visualization
  • Data processing with PySpark including feature engineering
  • Workflow orchestration with Airflow
  • Containerization with Docker
  • Interactive dashboards with Grafana
  • Error handling and data quality checks
  • Scalable architecture for multiple channels

This pipeline provides a solid foundation for YouTube channel analytics and can be extended with additional features like sentiment analysis, competitor benchmarking, and predictive analytics.
