# YouTube Data Processing Pipeline
A comprehensive data pipeline that extracts, processes, and analyzes YouTube channel statistics using modern data engineering tools and practices.
## Overview
This pipeline automates the collection and analysis of YouTube channel data, providing insights into channel performance, subscriber growth, and content metrics. The system is built with scalability and reproducibility in mind, using industry-standard tools for data orchestration, storage, and analysis.
## Pipeline Architecture

```
YouTube API → Airflow (Orchestration) → PostgreSQL (Storage) → Jupyter (Analysis)
```
Components:
- Airflow: Workflow orchestration and scheduling
- PostgreSQL: Relational data storage
- Jupyter: Data analysis and visualization
- Docker: Containerization and service management
## Tech Stack

| Component | Technology | Purpose |
|---|---|---|
| Orchestration | Apache Airflow 2.8.1 | Workflow scheduling & monitoring |
| Database | PostgreSQL 13 | Data storage & persistence |
| Analysis | Jupyter Lab | Data exploration & visualization |
| Containerization | Docker & Docker Compose | Environment management |
| Language | Python 3.8+ | Data processing & API integration |
## Project Structure

```
youtube-pipeline/
├── docker-compose.yml             # Main service configuration
├── start-youtube-pipeline.sh      # Quick start script
├── airflow/
│   ├── dags/
│   │   └── youtube_pipeline.py    # Main data extraction DAG
│   ├── logs/                      # Airflow execution logs
│   └── plugins/                   # Custom Airflow plugins
├── jupyter/
│   └── notebooks/
│       └── youtube_analysis.ipynb # Data analysis notebook
├── postgres_data/                 # Database volume
└── backup/                        # Backup directory for configurations
```
## Quick Start

### Prerequisites

- Docker & Docker Compose
- YouTube Data API v3 key

### Installation & Setup

1. Clone and set up the project:

   ```bash
   git clone <repository-url>
   cd youtube-pipeline
   ```

2. Set up YouTube API credentials:

   ```bash
   # Set your YouTube API key in Airflow
   docker-compose exec airflow airflow variables set YOUTUBE_API_KEY "your_api_key_here"
   ```

3. Start the pipeline:

   ```bash
   # Use the quick start script
   ./start-youtube-pipeline.sh

   # Or manually
   docker-compose up -d
   ```

4. Access the services:

   - Airflow UI: http://localhost:8080 (admin/admin)
   - Jupyter Lab: http://localhost:8888 (token: password)
   - PostgreSQL: localhost:5433 (postgres/password)
## Pipeline Features

### Data Collection

- Automated Extraction: Scheduled every 6 hours
- Multiple Channels: Configurable channel monitoring
- Comprehensive Metrics:
  - Subscriber counts
  - Total views
  - Video counts
  - Channel metadata
  - Historical tracking
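As a sketch of what the extraction step collects, the snippet below flattens one item of a YouTube Data API v3 `channels.list` response (with the `snippet` and `statistics` parts) into the columns listed above. The payload here is a hand-written sample mirroring the API's response shape, not live API output:

```python
from datetime import datetime, timezone

def parse_channel_item(item: dict) -> dict:
    """Flatten one channels.list item into a channel_stats-shaped record."""
    snippet = item["snippet"]
    stats = item["statistics"]
    return {
        "channel_id": item["id"],
        "channel_title": snippet["title"],
        "channel_description": snippet.get("description", ""),
        "channel_created_at": snippet["publishedAt"],
        # The API returns counts as JSON strings; cast them for BIGINT columns.
        "total_views": int(stats["viewCount"]),
        "subscriber_count": int(stats["subscriberCount"]),
        "video_count": int(stats["videoCount"]),
        "processed_at": datetime.now(timezone.utc).isoformat(),
    }

# Hand-written sample payload (values are illustrative only)
sample_item = {
    "id": "UC_x5XG1OV2P6uZZ5FSM9Ttw",
    "snippet": {
        "title": "Google for Developers",
        "description": "Official developer channel",
        "publishedAt": "2007-08-23T00:34:43Z",
    },
    "statistics": {
        "viewCount": "123456789",
        "subscriberCount": "2300000",
        "videoCount": "5600",
    },
}

record = parse_channel_item(sample_item)
print(record["channel_title"], record["subscriber_count"])
```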
### Data Processing
- Incremental Updates: UPSERT operations to avoid duplicates
- Error Handling: Graceful failure recovery
- Data Validation: Type checking and constraint enforcement
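The incremental-update pattern can be sketched with `INSERT ... ON CONFLICT DO UPDATE`. The demo below uses SQLite, which shares this syntax with PostgreSQL, so it runs without a database server; the real DAG would issue an equivalent statement against PostgreSQL (table columns abbreviated here for brevity):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE channel_stats (
        channel_id TEXT PRIMARY KEY,
        channel_title TEXT,
        subscriber_count INTEGER,
        processed_at TEXT
    )
""")

upsert = """
    INSERT INTO channel_stats (channel_id, channel_title, subscriber_count, processed_at)
    VALUES (?, ?, ?, ?)
    ON CONFLICT (channel_id) DO UPDATE SET
        channel_title = excluded.channel_title,
        subscriber_count = excluded.subscriber_count,
        processed_at = excluded.processed_at
"""

# First run inserts; the second run updates the same row instead of duplicating it.
conn.execute(upsert, ("UC_x5XG1OV2P6uZZ5FSM9Ttw", "Google for Developers", 2300000, "2024-10-01"))
conn.execute(upsert, ("UC_x5XG1OV2P6uZZ5FSM9Ttw", "Google for Developers", 2310000, "2024-10-02"))

rows = conn.execute("SELECT channel_id, subscriber_count FROM channel_stats").fetchall()
print(rows)  # one row, carrying the latest subscriber count
```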
### Monitoring & Alerting
- Airflow Dashboard: Real-time pipeline monitoring
- Task Logs: Detailed execution logs
- Health Checks: Service status monitoring
## Configuration

### YouTube Channels

Edit `airflow/dags/youtube_pipeline.py` to modify the monitored channels:

```python
CHANNEL_IDS = [
    "UC_x5XG1OV2P6uZZ5FSM9Ttw",  # Google for Developers
    "UCq-Fj5jknLsU6-M0R3PiHcA",  # YouTube Creators
    "UCBJycsmduvYEL83R_U4JriQ",  # Marques Brownlee
    # Add more channel IDs here
]
```
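A quota-saving detail when the list grows: `channels.list` accepts up to 50 comma-separated channel IDs per request, so the whole list can be fetched in a handful of calls rather than one per channel. A small helper along these lines (`batch_ids` is a hypothetical name, not part of the DAG as shown):

```python
def batch_ids(channel_ids, batch_size=50):
    """Group channel IDs into comma-joined batches of at most
    batch_size, the per-request limit for channels.list."""
    for i in range(0, len(channel_ids), batch_size):
        yield ",".join(channel_ids[i:i + batch_size])

ids = [f"UC_placeholder_{n}" for n in range(120)]  # placeholder IDs
batches = list(batch_ids(ids))
print(len(batches))  # 3 API requests instead of 120
```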
### Scheduling

Modify the DAG schedule in `youtube_pipeline.py`:

```python
schedule_interval=timedelta(hours=6)  # Change as needed
```
## Data Schema

### channel_stats Table

| Column | Type | Description |
|---|---|---|
| channel_id | VARCHAR(255) PRIMARY KEY | YouTube channel ID |
| channel_title | TEXT | Channel name |
| channel_description | TEXT | Channel description |
| channel_created_at | TIMESTAMP | Channel creation date |
| total_views | BIGINT | Lifetime views |
| subscriber_count | BIGINT | Current subscribers |
| video_count | BIGINT | Total videos published |
| processed_at | TIMESTAMP | Data collection timestamp |
### video_stats Table

| Column | Type | Description |
|---|---|---|
| video_id | VARCHAR(255) PRIMARY KEY | YouTube video ID |
| video_title | TEXT | Video title |
| channel_id | VARCHAR(255) | Associated channel |
| view_count | BIGINT | Video views |
| like_count | BIGINT | Video likes |
| comment_count | BIGINT | Number of comments |
| published_at | TIMESTAMP | Video publish date |
| processed_at | TIMESTAMP | Data collection timestamp |
## Usage Examples

### Data Analysis in Jupyter

```python
import pandas as pd
import psycopg2

# Connect to the pipeline database ("postgres" is the service
# hostname inside the Docker network)
conn = psycopg2.connect(
    host="postgres",
    database="youtube_db",
    user="postgres",
    password="password",
    port=5432,
)

# Load channel data
df = pd.read_sql("SELECT * FROM channel_stats", conn)
print(f"Tracking {len(df)} YouTube channels")
```
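From there, derived metrics are straightforward in pandas. The sketch below computes views per subscriber on a hand-made frame standing in for a `channel_stats` query result (column names match the schema above; the numbers are illustrative):

```python
import pandas as pd

# Sample rows standing in for a SELECT * FROM channel_stats result
df = pd.DataFrame({
    "channel_title": ["Google for Developers", "YouTube Creators"],
    "subscriber_count": [2_300_000, 7_000_000],
    "total_views": [230_000_000, 350_000_000],
})

# Same metric as the SQL example below, computed client-side
df["views_per_subscriber"] = (df["total_views"] / df["subscriber_count"]).round(2)
print(df.sort_values("subscriber_count", ascending=False))
```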
### Common Queries

```sql
-- Channel growth analysis (NULLIF guards against division by zero
-- for channels with no subscribers)
SELECT channel_title, subscriber_count, total_views,
       ROUND(total_views::numeric / NULLIF(subscriber_count, 0), 2) AS views_per_subscriber
FROM channel_stats
ORDER BY subscriber_count DESC;

-- Historical trends (when multiple runs exist)
SELECT channel_title, subscriber_count, processed_at
FROM channel_stats
WHERE channel_id = 'UC_x5XG1OV2P6uZZ5FSM9Ttw'
ORDER BY processed_at DESC;
```
## Operations

### Starting Services

```bash
./start-youtube-pipeline.sh
```

### Stopping Services

```bash
docker-compose down
docker stop jupyter
```

### Monitoring

```bash
# Check service status
docker-compose ps

# View logs
docker-compose logs airflow
docker logs jupyter

# Check pipeline runs
docker-compose exec airflow airflow dags list-runs -d youtube_data_pipeline
```

### Maintenance

```bash
# Back up the database
docker-compose exec postgres pg_dump -U postgres youtube_db > backup.sql

# Update the channel list: edit airflow/dags/youtube_pipeline.py

# Check API quota usage in the Google Cloud Console
```
## Security Considerations
- API keys stored in Airflow Variables (encrypted)
- Database credentials in Docker environment variables
- Network isolation between services
- Regular dependency updates
## Scaling & Extensions

### Potential Enhancements
- Real-time Processing: Add Kafka for stream processing
- Data Warehouse: Integrate with BigQuery or Snowflake
- Monitoring: Add Prometheus/Grafana for metrics
- Alerting: Slack/email notifications for anomalies
- ML Integration: Predictive analytics for growth trends
### Performance Optimization
- Batch Processing: Group API calls to reduce quota usage
- Caching: Implement Redis for frequently accessed data
- Partitioning: Time-based partitioning of video stats
- Indexing: Optimize database query performance
## Troubleshooting

### Common Issues

- YouTube API Quota Exceeded
  - Solution: Monitor usage in Google Cloud Console
  - Alternative: Reduce update frequency or request a quota increase

- Airflow Database Connection Issues

  ```bash
  # Reset the database connection
  docker-compose restart airflow
  ```

- Jupyter Not Accessible

  ```bash
  # Restart the Jupyter container
  docker restart jupyter
  ```

- Missing YouTube Data
  - Check API key validity
  - Verify channel IDs are correct
  - Review Airflow task logs

### Logs & Debugging

```bash
# Airflow task logs
docker-compose exec airflow airflow tasks logs youtube_data_pipeline extract_youtube_data

# Service logs
docker-compose logs
docker logs jupyter
```
## Contributing

1. Fork the repository
2. Create a feature branch
3. Make changes with tests
4. Submit a pull request

### Development Setup

```bash
# Create a virtual environment
python -m venv venv
source venv/bin/activate

# Install dependencies
pip install -r requirements.txt

# Run tests
pytest tests/
```
## License
This project is licensed under the MIT License - see the LICENSE file for details.
## Acknowledgments
- YouTube Data API v3 for data access
- Apache Airflow community for orchestration tools
- Jupyter project for analysis environment
- Docker community for containerization
## Support
For issues and questions:
- Check troubleshooting section above
- Review service logs
- Open a GitHub issue
- Contact maintainers
Maintained by: Josiah Lagat
Last Updated: October 2024
Version: 1.0.0
⭐ Star this repo if you find it helpful!