Have you wondered how to write Hudi tables in AWS Glue using Scala?
Look no further.
Prerequisites
- Create a Glue Database called `hudi_db` from **Databases** under the **Data Catalog** menu in the Glue Console.
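If you prefer to script this step rather than click through the console, a minimal sketch using the AWS SDK for Java v1 Glue client could look like the one below (assumptions: `aws-java-sdk-glue` is on your classpath and your credentials allow `glue:CreateDatabase`).
import com.amazonaws.services.glue.AWSGlueClientBuilder
import com.amazonaws.services.glue.model.{CreateDatabaseRequest, DatabaseInput}

object CreateHudiDb {
  def main(args: Array[String]): Unit = {
    // Assumes aws-java-sdk-glue on the classpath and glue:CreateDatabase permission.
    val glue = AWSGlueClientBuilder.defaultClient()
    glue.createDatabase(
      new CreateDatabaseRequest().withDatabaseInput(new DatabaseInput().withName("hudi_db")))
  }
}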
Let's pick the Apache Hudi Spark QuickStart guide to drive this example.
Configuring the job
- In the Glue console, choose **ETL Jobs**, then choose **Script Editor**.
- In the tabs above, choose **Job details** and, under **Language**, choose **Scala**.
- Feel free to make any infra changes as required.
- Click on **Advanced properties**, navigate to **Job parameters**, and add the parameters below one by one. Of course, change these values as you prefer.
  - `--S3_OUTPUT_PATH` as `s3://hudi-spark-quickstart/write-path/`
  - `--class` as `GlueApp`
  - `--conf` as `spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false`
  - `--datalake-formats` as `hudi`
- Note: In this example, I'm using the default Hudi version (0.12.0) that comes with Glue 4.0. If you want to use a different Hudi version, you might have to add the JAR to the classpath by adding one more parameter, `--extra-jars`, pointing to the S3 path of the Hudi JAR file.
On to the cool stuff now.
Scripting
Navigate to the **Script** tab and add the Scala code below.
Let's add the boilerplate imports:
// Glue job plumbing
import com.amazonaws.services.glue.{GlueContext, DynamicFrame}
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.log.GlueLogger
// Spark
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.types._
// Hudi write options and configs
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
Add the Glue-specific code, i.e. parse the job parameters and create a `GlueContext`:
object GlueApp {
def main(sysArgs: Array[String]) {
val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "S3_OUTPUT_PATH").toArray)
val spark: SparkSession = SparkSession.builder().appName("AWS Glue Hudi Job").getOrCreate()
val glueContext: GlueContext = new GlueContext(spark.sparkContext)
val logger = new GlueLogger()
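Optionally, before moving on, you can log the resolved job parameter and confirm that the serializer set via the `--conf` job parameter was picked up. This is purely a sanity check using the `args`, `spark` and `logger` values defined above.
// Optional sanity check: not required for the job to work.
logger.info(s"Resolved S3_OUTPUT_PATH: ${args("S3_OUTPUT_PATH")}")
logger.info(s"spark.serializer: ${spark.sparkContext.getConf.get("spark.serializer", "not set")}")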
Prepping the data.
import spark.implicits._
val tableName = "trips"
val recordKeyColumn = "uuid"
val precombineKeyColumn = "ts"
val partitionKeyColumn = "city"
val s3OutputPath = args("S3_OUTPUT_PATH")
val glueDbName = "hudi_db"
val writePath = s"$s3OutputPath/$tableName"
val columns = Seq("ts","uuid","rider","driver","fare","city")
val data = Seq(
(1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco"),
(1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-M", 27.70, "san_francisco"),
(1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "san_francisco"),
(1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-F", "driver-P", 34.15, "sao_paulo"),
(1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-J", "driver-T", 17.85, "chennai"))
Add the options required by Hudi to write the table and sync it with the Glue Database.
val hudiOptions = Map[String, String](
// Table identity: name, record key, precombine (dedup) key and partition column
"hoodie.table.name" -> tableName,
"hoodie.datasource.write.recordkey.field" -> recordKeyColumn,
"hoodie.datasource.write.precombine.field" -> precombineKeyColumn,
"hoodie.datasource.write.partitionpath.field" -> partitionKeyColumn,
// Write behaviour: Hive-style partitioning, Copy-on-Write table, upserts
"hoodie.datasource.write.hive_style_partitioning" -> "true",
"hoodie.datasource.write.storage.type" -> "COPY_ON_WRITE",
"hoodie.datasource.write.operation" -> "upsert",
// Sync the table to the Glue Data Catalog via the Hive Metastore (hms) mode
"hoodie.datasource.hive_sync.enable" -> "true",
"hoodie.datasource.hive_sync.database" -> glueDbName,
"hoodie.datasource.hive_sync.table" -> tableName,
"hoodie.datasource.hive_sync.partition_fields" -> partitionKeyColumn,
"hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc" -> "false",
"hoodie.datasource.hive_sync.mode" -> "hms",
// Base path of the Hudi table on S3
"path" -> writePath
)
Finally, create the DataFrame and write it to S3.
val inserts = spark.createDataFrame(data).toDF(columns: _*)
inserts.write
.format("hudi")
.options(hudiOptions)
.mode("overwrite")
.save()
logger.info("Data successfully written to S3 using Hudi")
}
}
Querying
Now that we have written the table to S3, we can query this table from Athena.
SELECT * FROM "hudi_db"."trips" limit 10;
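If you'd rather verify the table from Spark itself (for example, at the end of the same Glue job), a snapshot read works too. A minimal sketch, assuming the `spark` session and `writePath` value from the script above are in scope:
// Optional: snapshot-read the Hudi table we just wrote.
val trips = spark.read.format("hudi").load(writePath)
trips.select("uuid", "rider", "driver", "fare", "city").show(false)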