Vuong Bach Doan

Using PySpark on AWS Glue Studio to load data from RDS MySQL to Data Lake

This post explores the powerful combination of PySpark and AWS Glue for streamlining data ETL (Extract, Transform, Load) processes. We'll delve into:

  • PySpark: Harnessing Python's flexibility to run large-scale data analysis and transformations in a familiar environment.

  • AWS Glue: Simplifying and scaling ETL workflows with a fully managed, serverless service on AWS.

The Challenge:

Efficiently transferring data from an RDS MySQL database to an S3 data lake.

The Solution:

  1. Defining the ETL Job: Moving data from the stats, ability, and info tables in MySQL to S3.

  2. Setting Up Glue Studio: Selecting "Author code with a script editor", establishing IAM roles, and downloading the MySQL JDBC driver and uploading it to S3 (a short upload sketch follows this list).

  3. Coding with PySpark: Utilizing the provided code template for a smooth workflow:

    • Creating a SparkSession.
    • Attaching the MySQL JDBC driver (via the job's Dependent JARs path).
    • Defining a function to extract data from tables.
    • Reading data from multiple tables.
    • Transforming the "capture_rate" column in the "info" table.
    • Partitioning data into timestamp-based subfolders.
    • Writing data to S3 in Parquet format.
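
If you want to script the driver upload from step 2, a minimal sketch with boto3 could look like the following; the bucket name, key, and local file path are placeholders for illustration, not values from the original setup.

import boto3

# Upload the MySQL JDBC driver JAR to S3 so the Glue job can reference it
# (bucket, key, and local path below are placeholders -- replace them with your own)
s3 = boto3.client("s3")
s3.upload_file(
    Filename="mysql-connector-j-8.3.0.jar",   # local path to the downloaded driver JAR
    Bucket="<your-bucket>",
    Key="mysql-connector-j-8.3.0.jar",        # this S3 path becomes the job's "Dependent JARs path"
)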

I have prepared a code template to help you get started more easily:

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, col
from datetime import datetime

spark = SparkSession.builder.getOrCreate()  # Create (or reuse) the job's SparkSession
# Attach the MySQL JDBC driver JAR you uploaded (e.g. s3://<your-bucket>/mysql-connector-j-8.3.0.jar)
# to the job via Glue Studio's "Dependent JARs path" (the --extra-jars job parameter)
jdbc_url = "jdbc:mysql://<your-host>:3306/<your-database>"
connection_properties = {"user": "admin", "password": "********", "driver": "com.mysql.cj.jdbc.Driver"}

def extract_table(spark_session, jdbc_url, connection_properties, table_name):
    # Read a single MySQL table over JDBC and return it as a Spark DataFrame
    df = spark_session.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
    return df

# Read data from multiple tables
df_info = extract_table(spark, jdbc_url, connection_properties, "info")
df_ability = extract_table(spark, jdbc_url, connection_properties, "ability")
df_stats = extract_table(spark, jdbc_url, connection_properties, "stats")

# Transform capture_rate in the info table and cast to int
df_info = df_info.withColumn("capture_rate", regexp_extract("capture_rate", r"^\D*(\d+).*$", 1)) \
                 .withColumn("capture_rate", col("capture_rate").cast("int"))

# Generate timestamp subfolders for each DataFrame
timestamp_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
info_subfolder = f"pokemon_info_processed/{timestamp_str}"
ability_subfolder = f"pokemon_ability_processed/{timestamp_str}"
stats_subfolder = f"pokemon_stat_processed/{timestamp_str}"

# Write DataFrames to separate folders
df_info.write.parquet(f"s3://<your-bucket>-datalake/{info_subfolder}", mode="overwrite")
df_ability.write.parquet(f"s3://<your-bucket>-datalake/{ability_subfolder}", mode="overwrite")
df_stats.write.parquet(f"s3://<your-bucket>-datalake/{stats_subfolder}", mode="overwrite")

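To see what the capture_rate transformation does, here is a small standalone sketch; the sample strings are made-up values for illustration only.

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, col

spark = SparkSession.builder.getOrCreate()

# Made-up sample values: strings that mix the numeric rate with extra text
sample = spark.createDataFrame([("45 (5.9%)",), ("255",)], ["capture_rate"])

cleaned = sample.withColumn("capture_rate", regexp_extract("capture_rate", r"^\D*(\d+).*$", 1)) \
                .withColumn("capture_rate", col("capture_rate").cast("int"))
cleaned.show()  # both rows now hold plain integers: 45 and 255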

Now you can run the job and check whether the results appear in the S3 data lake bucket. ✨
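
As a quick sanity check (assuming the same placeholder bucket and the timestamp_str the job just generated), you can read one of the Parquet folders back with PySpark and inspect it:

# Read one freshly written folder back to verify the load
df_check = spark.read.parquet(f"s3://<your-bucket>-datalake/pokemon_info_processed/{timestamp_str}")
df_check.printSchema()
print(df_check.count())  # number of rows landed in the data lake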
