Introduction
In this post, I’ll walk you through how I built a scalable ETL data pipeline to extract and analyze video content from the Premier League YouTube Channel, using the YouTube Data API, PySpark, Airflow, and PostgreSQL, and visualized the data on a Grafana dashboard.
Here is the GitHub repository.
1: Extract Data from YouTube API
Using googleapiclient, I connected to the YouTube Data API and extracted:
- The channel's uploads playlist ID
- All video IDs from that playlist (fetched in batches of 50)
- Metadata for each video: title, view count, like count, comment count, and published date
This data was written to a JSON file for further processing and transformation.
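Here is a minimal sketch of that extraction step. The environment variable for the API key, the placeholder channel ID, and the output filename are my own assumptions, not the exact code from the repo:

import json
import os

from googleapiclient.discovery import build

# Assumption: the API key comes from an environment variable and the
# channel ID is a placeholder -- substitute the real Premier League channel ID.
API_KEY = os.environ["YOUTUBE_API_KEY"]
CHANNEL_ID = "PREMIER_LEAGUE_CHANNEL_ID"

youtube = build("youtube", "v3", developerKey=API_KEY)

# 1. Find the channel's uploads playlist.
channel = youtube.channels().list(part="contentDetails", id=CHANNEL_ID).execute()
uploads_id = channel["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]

# 2. Walk the playlist and collect video IDs (max 50 per page).
video_ids, page_token = [], None
while True:
    page = youtube.playlistItems().list(
        part="contentDetails", playlistId=uploads_id,
        maxResults=50, pageToken=page_token,
    ).execute()
    video_ids += [item["contentDetails"]["videoId"] for item in page["items"]]
    page_token = page.get("nextPageToken")
    if not page_token:
        break

# 3. Fetch metadata in batches of 50 and flatten it for the JSON file.
records = []
for i in range(0, len(video_ids), 50):
    batch = youtube.videos().list(
        part="snippet,statistics", id=",".join(video_ids[i:i + 50])
    ).execute()
    for item in batch["items"]:
        records.append({
            "videoId": item["id"],
            "title": item["snippet"]["title"],
            "publishedAt": item["snippet"]["publishedAt"],
            "viewCount": item["statistics"].get("viewCount"),
            "likeCount": item["statistics"].get("likeCount"),
            "commentCount": item["statistics"].get("commentCount"),
        })

with open("videos.json", "w") as f:
    json.dump(records, f)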
2: Transform Data with PySpark and Load into Postgres
Next, I used PySpark to clean and process the data: I loaded the JSON into a Spark DataFrame, registered it as a temporary view, and transformed it with Spark SQL.
df = spark.read.option("multiLine", True).json(filepath)
df.createOrReplaceTempView("videos")

new_df = spark.sql("""
    SELECT
        title,
        videoId,
        CAST(viewCount AS INT) AS viewCount,
        CASE
            WHEN CAST(viewCount AS INT) >= 100000 THEN 'very viral'
            WHEN CAST(viewCount AS INT) BETWEEN 10000 AND 99999 THEN 'viral'
            ELSE 'normal viewing'
        END AS video_virality,
        CAST(commentCount AS INT) AS commentCount,
        CAST(likeCount AS INT) AS likeCount,
        CAST(publishedAt AS DATE) AS date
    FROM videos
""")
The transformations include:
- Casting viewCount, likeCount, and commentCount to integers for easier analysis.
- Labelling each video's virality tier based on its view count.
- Calculating an engagement rate, defined as (likes + comments) / views (see the sketch just below).
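The engagement-rate column isn't part of the query shown above; here's one way it could be expressed in Spark SQL against the same videos view (a sketch, with a guard against zero-view rows that is my own assumption):

engagement_df = spark.sql("""
    SELECT
        videoId,
        CAST(viewCount AS INT) AS viewCount,
        -- Guard against division by zero for videos with no views yet.
        CASE
            WHEN CAST(viewCount AS INT) > 0 THEN
                (CAST(likeCount AS INT) + CAST(commentCount AS INT))
                    / CAST(viewCount AS INT)
            ELSE 0.0
        END AS engagement_rate
    FROM videos
""")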
Loading into Postgres
To load the data into a Postgres database, I used PySpark's df.write.jdbc() method.
NOTE: Make sure the PostgreSQL JDBC driver is on Spark's classpath (for example via the spark.jars or spark.jars.packages config) to avoid driver-not-found errors while loading data. Also, connect using a JDBC URL; it doesn't include the user and password, since those are supplied through the properties dictionary.
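For example, the driver can be pulled in when building the SparkSession. This is a sketch; the Maven coordinate version and the URL placeholders are assumptions:

from pyspark.sql import SparkSession

# spark.jars.packages downloads the PostgreSQL JDBC driver from Maven at startup;
# the version here (42.7.3) is an assumption -- use whichever release you need.
spark = (
    SparkSession.builder
    .appName("pl_analytics")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3")
    .getOrCreate()
)

# JDBC URL format for Postgres; credentials go in the properties dict instead.
DB_URL = "jdbc:postgresql://<host>:<port>/<database>"

The connection properties and the write call then look like this: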
properties = {
    "user": "avnadmin",
    "password": DB_PWD,
    "driver": "org.postgresql.Driver"
}

try:
    new_df.write.jdbc(url=DB_URL, table='pl_analytics', mode='overwrite', properties=properties)
    print("Data loaded into Postgres DB successfully!")
except Exception as e:
    print(f"Error loading Spark DataFrame into the database: {e}")
3: Orchestrate with Airflow
To automate the workflow weekly, I created two Python modules:
- extract_data.py: extracts data from YouTube and writes it to JSON
- transform_and_load.py: reads the JSON, processes it with PySpark, and loads it into PostgreSQL
Then I defined a DAG in extract_transform.py and created the extract and transform-and-load tasks using the PythonOperator:
with DAG(
    dag_id='pl_analytics_dag',
    description='This DAG contains the pipeline for extracting, transforming and loading data from the Youtube API',
    default_args=default_args,
    start_date=datetime(2025, 6, 12),
    schedule='@weekly',
    catchup=False
) as dag:

    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_task
    )

    transform = PythonOperator(
        task_id='transform_and_load',
        python_callable=transform_task
    )

    extract >> transform
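The snippet above assumes the usual Airflow imports, the task callables from the two modules, and a default_args dictionary. A minimal sketch; the import paths, owner, and retry settings are my own assumptions:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Assumption: the callables live in the two modules described above and are
# importable from the DAG file; adjust the paths to your project layout.
from extract_data import extract_task
from transform_and_load import transform_task

# Assumption: these defaults are illustrative only.
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}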
4: Visualizing with Grafana
I connected the PostgreSQL database to Grafana Cloud and created a dashboard to draw insights from the collected data. Some of the insights include:
- Line chart of views over time
- Top 10 most viewed videos (bar chart)
- Histogram of engagement rates
- Scatter plot of video length vs. views/likes/comments
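Each panel is backed by a SQL query against the pl_analytics table. For example, the "Top 10 most viewed videos" bar chart could use something like this (a sketch; it assumes the column names produced by the Spark query above, which the JDBC writer creates as quoted mixed-case identifiers in Postgres):

-- Grafana panel query: top 10 most viewed videos.
SELECT title, "viewCount"
FROM pl_analytics
ORDER BY "viewCount" DESC
LIMIT 10;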
Conclusion
This project gave me hands-on experience with end-to-end ETL pipeline design: extracting data from a Google API with googleapiclient, transforming it with PySpark, loading it into PostgreSQL, and visualizing it in Grafana. It's a solid foundation for building more robust data pipelines.
For more blogs like this, please like, comment and follow me to stay updated in the data engineering world!