Kyle Escosia for AWS Community ASEAN

SQL-based INSERTS, DELETES and UPSERTS in S3 using AWS Glue 3.0 and Delta Lake

AWS NOW SUPPORTS DELTA LAKE ON GLUE NATIVELY.
CHECK IT OUT HERE:

The purpose of this blog post is to demonstrate how you can use the Spark SQL engine to do UPSERTS, DELETES, and INSERTS. Basically, updates.

Earlier this month, I made a blog post about doing this via PySpark. Check it out below:

But what if we want to make it simpler and more familiar?

This month, AWS released Glue version 3.0! AWS Glue 3.0 introduces a performance-optimized Apache Spark 3.1 runtime for batch and stream processing. The new engine speeds up data ingestion, processing and integration allowing you to hydrate your data lake and extract insights from data quicker.

aws-glue-3.0-updates

aws-glue-3.0-performance-improvements

But, what's the big deal with this?

Well, aside from a lot of general performance improvements to the Spark engine, it can now also support the latest versions of Delta Lake. The most notable feature is the support for SQL INSERT, DELETE, UPDATE and MERGE.

If you don't know what Delta Lake is, you can check out my blog post that I referenced above to have a general idea of what it is.

Let's proceed with the demo!

Table of Contents

✅ Architecture Diagram

kyle-escosia-aws-glue-delta-lake-diagram

This is basically a simple process flow of what we'll be doing. We take a sample CSV file, load it into an S3 bucket, then process it using Glue. (OPTIONAL) Then you can connect it to your favorite BI tool (I'll leave it up to you) and start visualizing your updated data.

❗ Pre-requisites

But, before we get to that, we need to do some pre-work.

  • Download the Delta Lake package here - a bit hard to spot, but look for the Files in the table and click on the jar
  • An AWS Account - ❗ Glue ETL is not included in the free tier
  • Download the sample data here - you can use your own though, but I'll be using this one
  • The code can be found in my GitHub Repository

✅ Format to Delta Table

First things first, we need to convert each of our datasets into Delta format. Below is the code for doing this.


# Import the packages (DeltaTable is the only name this script needs from delta)
from delta.tables import DeltaTable
from pyspark.sql.session import SparkSession

# Initialize Spark Session along with configs for Delta Lake
spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()


# Read Source
inputDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-aws-glue-demo/raw/')

# Write data as a DELTA TABLE
inputDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-aws-glue-demo/current/")

# Read Source
updatesDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-aws-glue-demo/updates/')

# Write data as a DELTA TABLE
updatesDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-aws-glue-demo/updates_delta/")

# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")

### OPTIONAL, UNCOMMENT IF YOU WANT TO VIEW ALSO THE DATA FOR UPDATES IN ATHENA
###
# Generate MANIFEST file for Updates
# updatesDeltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/updates_delta/")
# updatesDeltaTable.generate("symlink_format_manifest")

This code converts our datasets into Delta format. This is done for both the source data and the updates.
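Every script in this post repeats the same two Delta Lake settings when it builds the Spark session. As a minimal sketch, those settings could live in one small helper. The names `DELTA_CONFIGS` and `build_delta_session` are made up for illustration and are not part of Delta Lake or Glue.

```python
# Hypothetical helper: the names below are illustrative, not a library API.
# The two settings are the same ones used by the scripts in this post.
DELTA_CONFIGS = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}


def build_delta_session(extra_configs=None):
    """Return a SparkSession configured for Delta Lake SQL support."""
    # Imported lazily so this module can be loaded where PySpark is absent.
    from pyspark.sql.session import SparkSession

    builder = SparkSession.builder
    # Apply the Delta settings first, then any caller-supplied overrides.
    for key, value in {**DELTA_CONFIGS, **(extra_configs or {})}.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

In a Glue job you would then call `spark = build_delta_session()` at the top of each script instead of repeating the builder chain.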

After generating the SYMLINK MANIFEST file, we can view the table via Athena. The SQL code is also included in the repository.

athena-sample-data

🔀 Upserts

Upsert is defined as an operation that inserts rows into a database table if they do not already exist, or updates them if they do.

In this example, we'll be updating the value for a couple of rows on ship_mode, customer_name, sales, and profit. I just did a random character spam and I didn't think it through 😅.


# Import as always
from delta.tables import DeltaTable
from pyspark.sql.session import SparkSession

# Initialize Spark Session along with configs for Delta Lake
spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()


updateDF = spark.sql("""

MERGE INTO delta.`s3a://delta-lake-aws-glue-demo/current/` as superstore
USING delta.`s3a://delta-lake-aws-glue-demo/updates_delta/` as updates
ON superstore.row_id = updates.row_id
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *
""")

# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")

### OPTIONAL
## SQL-BASED GENERATION OF SYMLINK

# spark.sql("""
# GENERATE symlink_format_manifest 
# FOR TABLE delta.`s3a://delta-lake-aws-glue-demo/current/`
# """)


The SQL code above matches rows in the current table against rows in the updates table using row_id, then applies the following rule:

If the row_id is matched, UPDATE all the columns of that row. If not, INSERT the whole row.
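To make the matched / not matched rule concrete, here is a small pure-Python sketch that mimics it on lists of dictionaries. It is only an illustration of the semantics, not how Delta Lake implements MERGE, and the function name `merge_upsert` and the sample rows are made up.

```python
def merge_upsert(current_rows, update_rows, key="row_id"):
    """Mimic MERGE ... UPDATE SET * / INSERT * on lists of dicts."""
    # Index the target rows by the merge key.
    merged = {row[key]: dict(row) for row in current_rows}
    for row in update_rows:
        # Matched key: every column is replaced. Unmatched key: row is inserted.
        merged[row[key]] = dict(row)
    return list(merged.values())


current = [
    {"row_id": "1", "ship_mode": "Second Class"},
    {"row_id": "2", "ship_mode": "Standard Class"},
]
updates = [
    {"row_id": "2", "ship_mode": "First Class"},  # matched -> updated
    {"row_id": "3", "ship_mode": "Same Day"},     # not matched -> inserted
]
result = merge_upsert(current, updates)
```

One difference worth keeping in mind: in this sketch the last update for a key silently wins, whereas a real MERGE can fail when several source rows match the same target row, so it helps to deduplicate the updates on row_id first.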

If you want to check out the full operation semantics of MERGE you can read through this

After that, we update the MANIFEST file again. Note that the manifest can also be set to update automatically by running the query below.

ALTER TABLE delta.`<path-to-delta-table>` 
SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)

More information can be found here

You should now see your updated table in Athena.

❌ Deletes

Deletes via Delta Lake are very straightforward.

from delta.tables import DeltaTable
from pyspark.sql.session import SparkSession


spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()


deleteDF = spark.sql("""
DELETE 
FROM delta.`s3a://delta-lake-aws-glue-demo/current/` as superstore 
WHERE CAST(superstore.row_id as integer) <= 20
""")

# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(
    spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")

### OPTIONAL
## SQL-BASED GENERATION OF SYMLINK MANIFEST

# spark.sql("""

# GENERATE symlink_format_manifest 
# FOR TABLE delta.`s3a://delta-lake-aws-glue-demo/current/`

# """)


This operation does a simple delete based on the row_id: it removes every row whose row_id is 20 or lower. You can check the result in Athena:

SELECT * 
FROM "default"."superstore" 
-- Need to CAST because row_id is currently a STRING
ORDER BY CAST(row_id as integer); 

aws-athena-delete
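Why the CAST in the ORDER BY matters: row_id was loaded as a STRING, and strings sort character by character rather than by numeric value. The sketch below shows the same effect in plain Python with a few made-up row_id values.

```python
# Made-up sample values standing in for the STRING row_id column.
row_ids = ["21", "100", "3", "22"]

# Lexicographic order, which is what sorting the raw strings gives you.
sorted_as_text = sorted(row_ids)

# Numeric order, the equivalent of ORDER BY CAST(row_id AS integer).
sorted_as_int = sorted(row_ids, key=int)
```

Here `sorted_as_text` is `["100", "21", "22", "3"]` while `sorted_as_int` is `["3", "21", "22", "100"]`. Declaring row_id as an integer when reading the CSV would avoid the CAST entirely, but that is a change to the pipeline shown in this post.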

⤴ Inserts

Like Deletes, Inserts are also very straightforward.


from delta.tables import DeltaTable
from pyspark.sql.session import SparkSession


spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()


insertDF = spark.sql("""
INSERT INTO delta.`s3a://delta-lake-aws-glue-demo/current/`
SELECT *
FROM delta.`s3a://delta-lake-aws-glue-demo/updates_delta/`
WHERE CAST(row_id as integer) <= 20
""")

# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(
    spark, "s3a://delta-lake-aws-glue-demo/current/")
deltaTable.generate("symlink_format_manifest")

### OPTIONAL
## SQL-BASED GENERATION OF SYMLINK MANIFEST

# spark.sql("""

# GENERATE symlink_format_manifest 
# FOR TABLE delta.`s3a://delta-lake-aws-glue-demo/current/`

# """)


❗ Partitioned Data

We've done Upsert, Delete, and Insert operations for a simple dataset. But that rarely happens in real life. So what if we spice things up and do it on partitioned data?

I went ahead and created a partitioned version of the dataset via Spark, using the order_date as the partition key (written out as the date_part partition column). The S3 structure looks like this:

s3-partitioned-data
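The partitioning code itself is not shown in this post, so the following is only a sketch of how such a write could look. It assumes the DataFrame already has a `date_part` column derived from order_date. The function name and the target path are placeholders.

```python
def write_partitioned_delta(df, target_path, partition_col="date_part"):
    """Write a Spark DataFrame as a Delta table partitioned by one column."""
    # Assumption: df already contains partition_col (here, date_part).
    (
        df.write.format("delta")
        .mode("overwrite")
        .partitionBy(partition_col)
        .save(target_path)
    )
```

Calling `write_partitioned_delta(inputDF, "<path-to-partitioned-table>")` would produce Hive-style prefixes such as `date_part=2014-08-27/` under the target path.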

❗ What do you think?

The answer is: YES! You can also do this on partitioned data.

The concept of Delta Lake is based on log history.

Delta Lake will generate delta logs for each committed transactions.

Delta logs will have delta files stored as JSON which has information about the operations occurred and details about the latest snapshot of the file and also it contains the information about the statistics of the data.

Delta files are sequentially increasing named JSON files and together make up the log of all changes that have occurred to a table.

-from Data Floq

We can see this in the example below:

raw date_part=2014-08-27/
raw-partitioned

current date_part=2014-08-27/ - DELETED ROWS
current-partitioned

If we open the parquet file:
updated-data

From the examples above, we can see that the delete wrote a new Parquet file that excludes the rows matched by our delete condition. The JSON log file then points to the newly generated Parquet file.
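The way the JSON log maps to the current Parquet files can be sketched in a few lines of Python. The idea: read the commit files in order, add every file named in an `add` action and drop every file named in a `remove` action. This is a simplified illustration that reads a local copy of the `_delta_log` folder and ignores checkpoint files. The function name `active_files` is made up.

```python
import json
from pathlib import Path


def active_files(log_dir):
    """Replay Delta commit files in order and return the live data files."""
    live = set()
    # Commit file names are zero-padded, so a name sort is also a version sort.
    for commit in sorted(Path(log_dir).glob("*.json")):
        for line in commit.read_text().splitlines():
            if not line.strip():
                continue
            # Each line is one JSON action (add, remove, commitInfo, ...).
            action = json.loads(line)
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live
```

After a delete, the old Parquet file shows up in a `remove` action and the rewritten one in an `add` action, so only the new file remains in the returned set. The old file stays in S3 until it is cleaned up, but readers of the table no longer see it.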

Additionally, in Athena, if your table is partitioned, you need to specify the partition column when you create the table:


CREATE EXTERNAL TABLE IF NOT EXISTS superstore ( 
    row_id STRING,
    order_id STRING,
    order_date STRING,
    ship_date STRING,
    ship_mode STRING,
    customer_id STRING,
    customer_name STRING,
    segment STRING,
    country STRING,
    city STRING,
    state STRING,
    postal_code STRING,
    region STRING,
    product_id STRING,
    category STRING,
    sub_category STRING,
    product_name STRING,
    sales STRING,
    quantity STRING,
    discount STRING,
    profit STRING
)
-- Add PARTITIONED BY option; the partition column is declared here only,
-- not in the column list above
PARTITIONED BY (date_part STRING)

ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' 
LOCATION 's3://delta-lake-aws-glue-demo/current/_symlink_format_manifest/'

Then run MSCK REPAIR TABLE <table> to add the partitions.

If you don't do these steps, you'll get an error.
partition-error

✅ Conclusion

That's it! It's a great time to be a SQL Developer! Thank you for reading through! Hope you learned something new in this post.

Have you tried Delta Lake? What tips, tricks and best practices can you share with the community? Would love to hear your thoughts in the comments below!

Happy coding!

Top comments (8)

Mark Lambert

Thank you for the article. We have the need to do fast UPSERTs in an ETL pipeline just like this article. I am using Glue 2.0 with Hudi in a PoC that seems to be giving us the performance we need. Delta was on my radar and when I saw the Glue 3.0 announcement making a lot of improvements for Delta but no mention of Hudi it makes me think we should have looked at Delta first. Do you have any experience with Hudi to compare with your Delta experience in this article?

Kyle Escosia

I actually want to try out Hudi because I'm still evaluating whether to use Delta Lake over it for our future workloads. I'm in the same boat as you. I was reluctant to try out Delta Lake since AWS Glue only supported Spark 2.4, but yeah, Glue 3.0 came, and with it, the support for the latest Delta Lake package.

Others think that Delta Lake is too "databricks-y", if that's a word lol, not sure what they meant by that (perhaps the runtime?). But so far, I haven't encountered any problems with it because AWS supports Delta Lake as much as it does with Hudi.

arn-lan

Thanks much for this nice article.
I was just wondering whether you could actually test the performance of such setup while querying from Athena.
Indeed a typical optimization technique for Athena is to have files which are big enough ( ~100 MB). So what would be the impact of having instead many small Parquet files within a given partition, each containing a wave of updates?

Kyle Escosia • Edited

Glad you liked it! Interesting. Haven't done an extensive test yet, but yeah I get your point, one impact would be your overhead cost of querying because you have a lot of partitions. Well, you aren't going to query all the partitions anyways if you wanted to update, the Glue Job will do that for you. So the one that you'll see in Athena will always be the latest ones. Having said that, you can always control the number of files that are being stored in a partition using coalesce() or repartition() in Spark.

Chatchai Komrangded (Bas) • Edited

Hi Kyle, thanks a lot for your article. It's very useful information that helps data engineers understand how to use Delta Lake with AWS Glue in scenarios like Upsert. I think your post is useful for the Thai developer community, and I have already translated your post into Thai. Just wanted to let you know, and all credit goes to you. :)

dev.to/chatchaikomrangded/sql-base...

Kyle Escosia

Cool! Thank you! Glad I could help! Thanks for letting me know.

nk7983

This is so awesome! Tried it for the first time on our own data and it looks very promising. May I know if you have written separate Glue job scripts for Update/Insert/Deletes, or is it just one Glue job that does all operations?

Current Situation:

We had 3~5 Business Units prior to 2019, and each business unit used to have its own warehouse tools and technologies. For example, one business unit built its warehouse entirely with SQL Server CDC, Stored Procedures, SSIS, SSRS, etc. This was done as very complex stored procedures with lots of surrogate keys generated, following a star schema. The jobs for this business unit use CDC and have an SLA of 5 minutes. Users still want more and more fresh data.
Another Business Unit used Snaplogic for ETL, with Redshift as the target data store.
Another Business Unit used custom python codes to merge the data and write to SQL Server.

Now in 2022, these Business Units got merged, I have been tasked with building a common data ingestion framework for all the business units using lake house architecture/concepts. I have come with a draft architecture following prescriptive methodology from AWS, below is the tool set selected as we are an AWS shop

Stream Ingestion: Kinesis Firehose
Batch Ingestion: AWS Glue
Jobs Orchestrator : MWAA ( Managed Airflow )
Lake House Data Store: S3
Target Analytics Store: Redshift
Presentation: QuickSight and Tableau

The jobs run on various cadence like 5 minutes to daily depending on each business unit requirement. I have proposed 3 AWS storage layers like raw/modified/processed. I'm so confused about how to partition these layers but to the best of my knowledge, i have proposed the below

raw --> raw-bucketname/source_system_name/tablename/extract_date=
Modified--> modified-bucketname/source_system_name/tablename ( if the table is large or have lot of data to query based on a date then choose date partition)
processed --> processed-bucketname/tablename/ ( partition should be based on analytical queries)

Questions that I have are:

  1. Is the above partitioning a good approach? Sometimes the business asks us to do a full refresh; in such cases there will be duplicate data in the raw layer for different extract dates. Is that good design? What if someone wants to query the RAW layer, won't they see a lot of duplicate data?
  2. Should I create crawlers for each of these layers separately? How do I organize Glue Catalog Database names, should I create a different database name for each sourcesystem and schema name? We have nearly 300+ schema's that we pull the data from, so in this case, I will have nearly 300*2 =600 (raw, modified layers) Glue Catalog database names.
  3. On what basis should I trigger the jobs and crawlers?
  4. Is Glue capable of completing execution within 5 minutes?
  5. Are there any auto generation tools available to generate glue scripts as its tough to develop each job independently? Any suggestions you have?
 
Kyle Escosia

Hi nk7983,

Yes, jobs are different for each process. You want to be as idempotent as possible. Just remember to tag your resources so you don't get lost in the jungle of jobs lol.

  1. Prefixes/Partitioning should be okay, but you might want to split the date further for throughput purposes (more prefix = more throughput). In case of a full refresh, you don't have a choice where you'll start with your earliest date and apply UPSERTS or changes as you go through the dates. What would be a scenario where you'll query the RAW layer? Usually DS accesses the Analytics/Curated/Processed layer, sometimes, staging layer. More info on storage layers here.

  2. Good thing that crawlers now support Delta files; when I was writing this article, they didn't support them yet. I suggest you create crawlers for each layer so the crawlers are not dependent on each other. Ideally, it should be 1 database per source system so you'll be able to distinguish them from each other. You can just put a _dev, _raw, _curated in the prefix if you want. Up to you.

  3. This should come from the business. If the trigger is everyday @9am, you can schedule that or if not, you can schedule it based on event. Crawlers can be run if there are additional partitions.

  4. Depends on how complex your processing is and how optimized your queries and code are. AutoScaling in Glue is also in preview, perhaps have a go at that one. Check out also the different worker types in Glue.

  5. Glue has Glue Studio, a drag-and-drop tool, if you have trouble writing your own code. If you're talking about automating the same set of Glue Scripts and creating a Glue Job, you can look at Infrastructure-as-Code (IaC) frameworks such as AWS CDK, CloudFormation or Terraform.

Hope this was helpful for you.