DEV Community

Cover image for Using PySpark on AWS Glue Studio to load data from RDS MySQL to Data Lake
Vuong Bach Doan
Vuong Bach Doan

Posted on • Edited on

Using PySpark on AWS Glue Studio to load data from RDS MySQL to Data Lake

This post explores the powerful combination of PySpark and AWS Glue for streamlining data ETL (Extract, Transform, Load) processes. We'll delve into:

  • PySpark: Harnessing Python's flexibility for large-scale data analysis and transformations within the familiar Python environment.

  • AWS Glue: Simplifying and scaling ETL workflows with a fully managed, serverless service on AWS.

The Challenge:

Efficiently transferring data from an RDS MySQL database to an S3 data lake.

The Solution:

  1. Defining the ETL Job: Moving data from stats, ability, and info tables in MySQL to S3.

  2. Setting Up Glue Studio: Selecting Author code with a script editor, establishing IAM roles, and downloading the MySQL JDBC driver to S3.

  3. Coding with PySpark: Utilizing the provided code template for a smooth workflow:

    • Creating a SparkSession.
    • Adding the JDBC driver.
    • Defining a function to extract data from tables.
    • Reading data from multiple tables.
    • Transforming the "capture_rate" in the "info" table.
    • Partitioning data into timestamp-based subfolders.
    • Writing data to S3 in Parquet format.

I have prepared a code template for you to easier start with it:

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, col
from datetime import datetime

spark = SparkSession.builder.getOrCreate()  # Create SparkSession
spark.sparkContext.addPyFile("s3://<you-bucket>/mysql-connector-j-8.3.0.jar")  # Add the MySQL JDBC driver to the classpath
jdbc_url = "jdbc:mysql://<your-host>:3306/<your-database>"
connection_properties = {"user": "admin", "password": "********"}

def extract_df_to_s3(spark_session, jdbc_url, connection_properties, table_name):
    df = spark_session.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
    return df

# Read data from multiple tables
df_info = extract_df_to_s3(spark, jdbc_url, connection_properties, "info")
df_ability = extract_df_to_s3(spark, jdbc_url, connection_properties, "ability")
df_stats = extract_df_to_s3(spark, jdbc_url, connection_properties, "stats")

# Transform capture_rate in the info table and cast to int
df_info = df_info.withColumn("capture_rate", regexp_extract("capture_rate", r"^\D*(\d+).*$", 1)) \
                 .withColumn("capture_rate", col("capture_rate").cast("int"))

# Generate timestamp subfolders for each DataFrame
timestamp_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
info_subfolder = f"pokemon_info_processed/{timestamp_str}"
ability_subfolder = f"pokemon_ability_processed/{timestamp_str}"
stats_subfolder = f"pokemon_stat_processed/{timestamp_str}"

# Write DataFrames to separate folders
df_info.write.parquet(f"s3://<your-bucket>-datalake/{info_subfolder}", mode="overwrite")
df_ability.write.parquet(f"s3://<your-bucket>-datalake/{ability_subfolder}", mode="overwrite")
df_stats.write.parquet(f"s3://<your-bucket>-datalake/{stats_subfolder}", mode="overwrite")

Enter fullscreen mode Exit fullscreen mode

Now you can run the job and check if the result appear in the S3 Data Lake bucket. ✨

Top comments (0)