🎬 Exploring the Netflix TV Shows and Movies Dataset with Spark
The "Netflix TV Shows and Movies" dataset contains information about movies and TV shows β including title, country, rating, release year, duration, and more.
Goal: Use Apache Spark to read and analyze this dataset, generating some useful insights and metrics.
Source: Dataset on Kaggle
Environment: Databricks Free Edition Notebook
⚙️ 1. Install Kaggle and Restart Python
%pip install kaggle
%restart_python
📥 2. Download the Dataset into Volumes
import os
import json
kaggle_token = dbutils.secrets.get('kaggle', 'kaggle-token')
kaggle_creds = json.loads(kaggle_token)  # validate that the secret is well-formed JSON
kaggle_dir = os.path.expanduser('~/.config/kaggle')
os.makedirs(kaggle_dir, exist_ok=True)
kaggle_json_path = os.path.join(kaggle_dir, 'kaggle.json')
# Write the token to kaggle.json
with open(kaggle_json_path, 'w') as f:
    f.write(kaggle_token)
from kaggle.api.kaggle_api_extended import KaggleApi
api = KaggleApi()
api.authenticate()
dataset_identifier = "shivamb/netflix-shows"
volume_path = "/Volumes/dev/raw/netflix/"
api.dataset_download_files(dataset_identifier, path=volume_path, unzip=True)
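The Kaggle client warns when `kaggle.json` is readable by other users. A small, hedged addition after writing the file is to restrict it to owner-only permissions; the sketch below uses a temporary stand-in path rather than the `kaggle_json_path` above, and assumes a POSIX filesystem:

```python
import os
import stat
import tempfile

# Hypothetical stand-in for the kaggle_json_path used above
kaggle_json_path = os.path.join(tempfile.mkdtemp(), "kaggle.json")
with open(kaggle_json_path, "w") as f:
    f.write('{"username": "example", "key": "example"}')

# Restrict the credentials file to owner read/write only (0600),
# which silences the Kaggle client's file-permission warning
os.chmod(kaggle_json_path, 0o600)

mode = stat.S_IMODE(os.stat(kaggle_json_path).st_mode)
print(oct(mode))  # 0o600
```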
-- Prepare the catalog, schema, and volume (run this before the download step above)
CREATE CATALOG IF NOT EXISTS dev;
USE CATALOG dev;
CREATE SCHEMA IF NOT EXISTS dev.raw;
CREATE VOLUME IF NOT EXISTS dev.raw.netflix;
📄 3. Read the CSV File with an Explicit Schema
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql.functions import to_date, trim, col
schema = StructType() \
.add("show_id", StringType()) \
.add("type", StringType()) \
.add("title", StringType()) \
.add("director", StringType()) \
.add("cast", StringType()) \
.add("country", StringType()) \
.add("date_added", StringType()) \
.add("release_year", IntegerType()) \
.add("rating", StringType()) \
.add("duration", StringType()) \
.add("listed_in", StringType()) \
.add("description", StringType())
df = spark.read.format('csv') \
.options(header=True, quote='"', escape='"', multiline=True, mode="PERMISSIVE") \
.schema(schema) \
.option("ignoreLeadingWhiteSpace", True) \
.option("ignoreTrailingWhiteSpace", True) \
.load("/Volumes/dev/raw/netflix/netflix_titles.csv")
df = df.withColumn('date_added', to_date(trim(col('date_added')), 'MMMM d, yyyy'))
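Spark's `'MMMM d, yyyy'` pattern matches the dataset's `date_added` style, e.g. `September 25, 2021`. The equivalent strptime format in plain Python is `%B %d, %Y`; a quick sanity check of what the parser expects (outside Spark):

```python
from datetime import datetime, date

# Spark's 'MMMM d, yyyy' corresponds to strptime's '%B %d, %Y':
# full month name, day without zero-padding, four-digit year
parsed = datetime.strptime("September 25, 2021", "%B %d, %Y").date()
print(parsed)  # 2021-09-25

# Untrimmed input raises ValueError, which is why trim() is applied first
try:
    datetime.strptime(" September 25, 2021", "%B %d, %Y")
    needs_trim = False
except ValueError:
    needs_trim = True
print(needs_trim)  # True
```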
💡 Note:
Some options were added to prevent Spark from parsing the data incorrectly; in this dataset, embedded quotes and stray whitespace were shifting values into the wrong columns.
The `date_added` column was also converted to a proper date type.
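The effect of the `quote` and `escape` options can be illustrated with Python's `csv` module, whose default dialect also treats a doubled quote inside a quoted field as an escaped quote. This is a plain-Python sketch of the behavior, not the Spark parser itself:

```python
import csv
import io

# A row where the last field contains a comma, a newline,
# and an embedded quote escaped by doubling ("" inside the field)
raw = 's1,Movie,"A ""great"" film, they said.\nSecond line of text."\n'

reader = csv.reader(io.StringIO(raw))  # doublequote=True by default
row = next(reader)
print(row)
# ['s1', 'Movie', 'A "great" film, they said.\nSecond line of text.']
```

Without the quote/escape handling, the comma and newline inside the quoted field would split one record into several columns and rows.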
📊 4. View Statistics and Data Quality
from pyspark.sql.functions import col, when, count, countDistinct
# Count null values per column
df_nulls = df.select([
count(when(col(c).isNull() | (col(c) == "") | (col(c) == " "), c)).alias(c)
for c in df.columns
])
# Count distinct values
display(df.select([countDistinct(col(c)).alias(c) for c in df.columns]))
# Count duplicated records
print(f"Duplicate records: {(df.count() - df.dropDuplicates().count())}")
# General statistics
display(df.describe())
# Show null counts
display(df_nulls)
# View the applied schema
df.printSchema()
# Show a random sample of the dataset
display(df.sample(withReplacement=False, fraction=0.01, seed=42).limit(10))
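The null-count logic above flags a cell when it is null, empty, or a single space. The same rule in plain Python, applied to a small hypothetical sample:

```python
# Hypothetical rows mirroring a few dataset columns
rows = [
    {"title": "Movie A", "director": None,     "country": "US"},
    {"title": "Movie B", "director": " ",      "country": ""},
    {"title": "Movie C", "director": "J. Doe", "country": "UK"},
]

def is_missing(value):
    # Same condition as the Spark expression:
    # isNull() | (col == "") | (col == " ")
    return value is None or value == "" or value == " "

null_counts = {
    c: sum(is_missing(r[c]) for r in rows)
    for c in rows[0]
}
print(null_counts)  # {'title': 0, 'director': 2, 'country': 1}
```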
🔍 5. Analysis
a) How many movies and TV shows are in the catalog?
value = df.groupBy("type").count()
display(value)
b) How are the contents distributed by rating?
value = df.groupBy("rating").count()
display(value)
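`groupBy("rating").count()` is a plain frequency count per key; the same aggregation over a small hypothetical sample with `collections.Counter`:

```python
from collections import Counter

# Hypothetical sample of the rating column
ratings = ["TV-MA", "TV-14", "TV-MA", "PG-13", "TV-MA", "TV-14"]

# Equivalent of df.groupBy("rating").count() on this sample
rating_counts = Counter(ratings)
print(rating_counts.most_common())
# [('TV-MA', 3), ('TV-14', 2), ('PG-13', 1)]
```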
c) What is the average duration of movies?
from pyspark.sql.functions import col, regexp_extract, avg
(df.filter(col('type') == 'Movie')
.select(
regexp_extract(col('duration'), r'(\d+)', 1)
.cast('int')
.alias('duration')
)
.agg(
avg('duration')
.cast('decimal(10,2)')
.alias('average_duration')
)
.show())
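`regexp_extract` pulls the leading digits out of strings like `90 min`. The same extraction and average in plain Python, over hypothetical values, with `Decimal` quantization standing in for the `decimal(10,2)` cast:

```python
import re
from decimal import Decimal

# Hypothetical duration values as they appear for movies
durations = ["90 min", "125 min", "45 min"]

# Mirror regexp_extract(col('duration'), r'(\d+)', 1).cast('int')
minutes = [int(re.search(r"(\d+)", d).group(1)) for d in durations]

# Mirror avg(...).cast('decimal(10,2)')
average = (Decimal(sum(minutes)) / len(minutes)).quantize(Decimal("0.01"))
print(average)  # 86.67
```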