In this previous post, we showed how to run Delta Lake on Amazon EMR Serverless. Since then, a new release (6.7.0) came out with the --packages flag implemented. This makes getting things done with Spark a lot easier. Yet, the --packages flag requires some extra networking setup that most Data Scientists and Engineers are not familiar with. Our goal is to show, step by step, how to do it.
First, some concept explanations
When using Spark with Java dependencies, we have two options: (1) build the .jar files and place them on the cluster manually, or (2) pass the dependencies to the --packages flag so Spark can automatically download them from Maven. A dependency is identified by its Maven coordinates in the form groupId:artifactId:version; for Delta Lake built against Scala 2.12, that is io.delta:delta-core_2.12:2.0.0. Since release 6.7.0 of EMR Serverless, this flag is available for use.
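If you want to see what this looks like in practice before the full script below, here is a minimal sketch: the same coordinates passed to --packages can also be set programmatically through the spark.jars.packages option.

from pyspark.sql import SparkSession

# Equivalent to: spark-submit --packages io.delta:delta-core_2.12:2.0.0 ...
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")
    .getOrCreate()
)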
The problem is that the Spark cluster must be able to reach the internet to download packages from Maven. By default, an Amazon EMR Serverless application lives outside any VPC and therefore cannot reach the internet. To change that, you must create your EMR application inside a VPC. However, EMR Serverless applications can only be attached to private subnets, which (by the way...) cannot reach the internet and cannot reach S3 either... How do we fix this?
Step one: networking
The diagram below shows the whole network structure that is necessary:
This is easily created on AWS in the VPC interface. Click the Create VPC button and select VPC and more. AWS does the heavy lifting and provides a design for a VPC with 2 public subnets, 2 private subnets, an internet gateway, the necessary route tables, and an S3 gateway endpoint (so resources inside the VPC can reach S3 without going through the internet).
You can set the Number of availability zones to 1 if you want, but for high availability you should work with at least 2 AZs.
Next, make sure you select at least one NAT gateway, which is what lets the private subnets reach the internet. Below is the screen with the final setup:
Hit Create VPC and the VPC part is done.
The last thing is to create a Security Group that allows outbound traffic to the internet. Go back to VPC in AWS and click Security Groups in the left panel. Then, click Create security group. Name your security group and, in the VPC field, remove the preselected VPC and pick the one you have just created. By default, security groups don't allow any inbound traffic and allow all outbound traffic, and we can leave it that way. Create the security group et voilà!
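If you prefer to script this step, here is a minimal boto3 sketch of the security group creation (the group name and the <YOUR-VPC-ID> placeholder are our own; a newly created group already allows all outbound traffic, so no extra rule is needed):

import boto3

ec2 = boto3.client("ec2")

# Create the security group inside the VPC built above;
# the default egress rule (allow all outbound) is exactly what we need
sg = ec2.create_security_group(
    GroupName="emr-serverless-egress",
    Description="Egress-only security group for EMR Serverless",
    VpcId="<YOUR-VPC-ID>",
)
print(sg["GroupId"])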
Step two: IAM Roles and Policies
You need two roles: a Service Linked Role and another role that grants access to S3 and Glue. We have already discussed this in this previous post, in the Setup - Authentication section. Check it out. We will also need a dataset to work with. The famous Titanic dataset should do it. You can download it here.
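For those who like to automate, here is a minimal boto3 sketch of the job role creation. The role name matches the one used in the job submission below; the S3 and Glue permission policies from the previous post still have to be attached to it.

import json
import boto3

iam = boto3.client("iam")

# Trust policy that lets the EMR Serverless service assume the role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "emr-serverless.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

iam.create_role(
    RoleName="EMRServerlessJobRole",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)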
Step three: Create EMR Studio and an EMR Serverless Application
First, we must create an EMR Studio. If you don't have any studios created yet, this is very straightforward. After clicking Get started on the EMR Serverless home page, you can click to create a studio automatically.
Second, you have to create an EMR Serverless application. Set a name and (remember!) choose release 6.7.0. To set up networking, check Choose custom settings and scroll down to Network connections.
In Network connections choose the VPC you created, the two private subnets and the security group.
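The same application can be created with boto3 if you prefer code over the console. A sketch, with placeholders for the network pieces from step one (the application name is our own):

import boto3

emr = boto3.client("emr-serverless")

app = emr.create_application(
    name="delta-lake-packages",   # hypothetical name
    releaseLabel="emr-6.7.0",     # remember: 6.7.0 or later
    type="Spark",
    networkConfiguration={
        "subnetIds": ["<PRIVATE-SUBNET-1>", "<PRIVATE-SUBNET-2>"],
        "securityGroupIds": ["<YOUR-SECURITY-GROUP-ID>"],
    },
)
print(app["applicationId"])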
Step four: Spark code
Now we prepare a simple PySpark script to simulate some modifications to the dataset: we will insert two new passengers (Ney and Sarah) and update the information of two passengers who were presumed dead but found alive, Mr. Owen Braund and Mr. William Allen. Below is the code to do that.
from pyspark.sql import functions as f
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Import DeltaTable only after the session is created, so the delta-core
# package pulled in via --packages is already available.
from delta.tables import DeltaTable

print("Reading CSV file from S3...")
schema = (
    "PassengerId int, Survived int, Pclass int, Name string, Sex string, "
    "Age double, SibSp int, Parch int, Ticket string, Fare double, "
    "Cabin string, Embarked string"
)
df = spark.read.csv(
    "s3://<YOUR-BUCKET>/titanic",
    header=True, schema=schema, sep=";"
)

print("Writing titanic dataset as a delta table...")
df.write.format("delta").save("s3://<YOUR-BUCKET>/silver/titanic_delta")

print("Updating and inserting new rows...")
# Passengers 1 and 5 (Mr. Braund and Mr. Allen) were found alive: flip Survived to 1
new = df.where("PassengerId IN (1, 5)")
new = new.withColumn("Survived", f.lit(1))
# Two brand new passengers to insert
newrows = [
    (892, 1, 1, "Sarah Crepalde", "female", 23.0, 1, 0, None, None, None, None),
    (893, 0, 1, "Ney Crepalde", "male", 35.0, 1, 0, None, None, None, None),
]
newrowsdf = spark.createDataFrame(newrows, schema=schema)
new = new.union(newrowsdf)

print("Create a delta table object...")
old = DeltaTable.forPath(spark, "s3://<YOUR-BUCKET>/silver/titanic_delta")

print("UPSERT...")
# UPSERT: update rows that match on PassengerId, insert the ones that don't
(
    old.alias("old")
    .merge(new.alias("new"),
           "old.PassengerId = new.PassengerId"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

print("Checking if everything is ok")
print("New data...")
(
    spark.read.format("delta")
    .load("s3://<YOUR-BUCKET>/silver/titanic_delta")
    .where("PassengerId < 6 OR PassengerId > 888")
    .show()
)

print("Old data - with time travel")
(
    spark.read.format("delta")
    .option("versionAsOf", "0")
    .load("s3://<YOUR-BUCKET>/silver/titanic_delta")
    .where("PassengerId < 6 OR PassengerId > 888")
    .show()
)
This .py file should be uploaded to S3.
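A quick boto3 sketch of the upload, using the same key the job submission below expects (the local filename is our own):

import boto3

s3 = boto3.client("s3")
s3.upload_file(
    "emrserverless_delta_titanic.py",           # local file
    "<YOUR-BUCKET>",
    "pyspark/emrserverless_delta_titanic.py",   # key referenced by entryPoint
)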
Step five: GO!
Now, we submit a job for execution. We can do it with the AWS CLI:
aws emr-serverless start-job-run \
--name Delta-Upsert \
--application-id <YOUR-APPLICATION-ID> \
--execution-role-arn arn:aws:iam::<ACCOUNT-NUMBER>:role/EMRServerlessJobRole \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<YOUR-BUCKET>/pyspark/emrserverless_delta_titanic.py",
"sparkSubmitParameters": "--packages io.delta:delta-core_2.12:2.0.0"
}
}' \
--configuration-overrides '{
"monitoringConfiguration": {
"s3MonitoringConfiguration": {
"logUri": "s3://<YOUR-BUCKET>/emr-serverless-logs/"}
}
}'
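While it runs, you can poll the job state instead of refreshing the console. A small boto3 sketch, using the application ID and the job run ID returned by start-job-run:

import boto3

emr = boto3.client("emr-serverless")

run = emr.get_job_run(
    applicationId="<YOUR-APPLICATION-ID>",
    jobRunId="<YOUR-JOB-RUN-ID>",
)
print(run["jobRun"]["state"])  # RUNNING, then SUCCESS if all goes well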
That's it! When the job is done, go to your log folder and check the logs (look for your application ID, job ID and the SPARK_DRIVER logs). You should see something like this:
Reading CSV file from S3...
Writing titanic dataset as a delta table...
Updating and inserting new rows...
Create a delta table object...
UPSERT...
Checking if everything is ok
New data...
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass| Name| Sex| Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
| 1| 1| 3|Braund, Mr. Owen ...| male|22.0| 1| 0| A/5 21171| 7.25| null| S|
| 2| 1| 1|Cumings, Mrs. Joh...|female|38.0| 1| 0| PC 17599|71.2833| C85| C|
| 3| 1| 3|Heikkinen, Miss. ...|female|26.0| 0| 0|STON/O2. 3101282| 7.925| null| S|
| 4| 1| 1|Futrelle, Mrs. Ja...|female|35.0| 1| 0| 113803| 53.1| C123| S|
| 5| 1| 3|Allen, Mr. Willia...| male|35.0| 0| 0| 373450| 8.05| null| S|
| 889| 0| 3|"Johnston, Miss. ...|female|null| 1| 2| W./C. 6607| 23.45| null| S|
| 890| 1| 1|Behr, Mr. Karl Ho...| male|26.0| 0| 0| 111369| 30.0| C148| C|
| 891| 0| 3| Dooley, Mr. Patrick| male|32.0| 0| 0| 370376| 7.75| null| Q|
| 892| 1| 1| Sarah Crepalde|female|23.0| 1| 0| null| null| null| null|
| 893| 0| 1| Ney Crepalde| male|35.0| 1| 0| null| null| null| null|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
Old data - with time travel
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass| Name| Sex| Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
| 1| 0| 3|Braund, Mr. Owen ...| male|22.0| 1| 0| A/5 21171| 7.25| null| S|
| 2| 1| 1|Cumings, Mrs. Joh...|female|38.0| 1| 0| PC 17599|71.2833| C85| C|
| 3| 1| 3|Heikkinen, Miss. ...|female|26.0| 0| 0|STON/O2. 3101282| 7.925| null| S|
| 4| 1| 1|Futrelle, Mrs. Ja...|female|35.0| 1| 0| 113803| 53.1| C123| S|
| 5| 0| 3|Allen, Mr. Willia...| male|35.0| 0| 0| 373450| 8.05| null| S|
| 889| 0| 3|"Johnston, Miss. ...|female|null| 1| 2| W./C. 6607| 23.45| null| S|
| 890| 1| 1|Behr, Mr. Karl Ho...| male|26.0| 0| 0| 111369| 30.0| C148| C|
| 891| 0| 3| Dooley, Mr. Patrick| male|32.0| 0| 0| 370376| 7.75| null| Q|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
Happy coding and build on!