Data Tech Bridge

Glue Spark frequently used code snippets and configuration

How to enable DEBUG mode in Glue?

    from pyspark.context import SparkContext
    from awsglue.context import GlueContext

    sc = SparkContext()
    sc.setLogLevel("DEBUG")
    glueContext = GlueContext(sc)

How to log statements with the Glue logger?

logger = glueContext.get_logger()
logger.info('starting job')

How to access/get parameters in a Glue job?

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-get-resolved-options.html

import sys
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv,
                          ['JOB_NAME',
                           'day_partition_key',
                           'hour_partition_key',
                           'day_partition_value',
                           'hour_partition_value'])
print "The day-partition key is: ", args['day_partition_key']
print "and the day-partition value is: ", args['day_partition_value']
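These parameters are supplied as job arguments at run time. For reference, a run started from the CLI might look like this (the job name and values are hypothetical):

aws glue start-job-run --job-name my-glue-job \
    --arguments '{"--day_partition_key":"dt","--hour_partition_key":"hr","--day_partition_value":"2024-01-01","--hour_partition_value":"00"}'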

How to delete S3 files in Glue?

# retentionPeriod is in hours; 0 purges everything under the path
glueContext.purge_s3_path("s3://bucket/path/", {"retentionPeriod": 0})

How to add the source S3 filename to each record of a DynamicFrame?

Use the attachFilename and attachTimestamp format options:

S3bucket_node2 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    format="json",
    connection_options={
        "paths": [s3_path],
        "recurse": True,
    },
    format_options={
        "multiline": False,
        "attachFilename": "FileNameCol",    # adds the source file name to each record
        "attachTimestamp": "timestampCol",  # adds the file's last-modified timestamp
    },
    transformation_ctx="S3bucket_node2",
)

How to create a table from the CLI?

Pass the table definition as a JSON input file (a minimal sketch follows the command).

aws glue create-table --database-name default --cli-input-json file://create_table_input.json --region us-east-1 --debug
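A minimal create_table_input.json for a Parquet table might look like the following (a sketch; the table name, columns, and S3 location are assumptions):

{
    "TableInput": {
        "Name": "table_name",
        "TableType": "EXTERNAL_TABLE",
        "StorageDescriptor": {
            "Columns": [
                {"Name": "product_id", "Type": "string"},
                {"Name": "price", "Type": "double"}
            ],
            "Location": "s3://my-sample-bkt/temp/",
            "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
            }
        }
    }
}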

How to set Spark conf in Glue code?

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from awsglue.context import GlueContext

conf = SparkConf().setAppName("MyGlueApp").set("spark.driver.maxResultSize", "2g")
sparkContext = SparkContext(conf=conf)
# without any conf:
# sparkContext = SparkContext()
glueContext = GlueContext(sparkContext)
spark = glueContext.spark_session


How to enable/disable column name case sensitivity?

spark.sql("set spark.sql.caseSensitive=false")

How to merge the schemas of different Parquet files in a DataFrame?

glueContext.sql("set spark.sql.parquet.mergeSchema=true")
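Schema merging can also be enabled per read instead of globally (the path here is hypothetical):

# enable schema merging just for this read
df = spark.read.option("mergeSchema", "true").parquet("s3://my-sample-bkt/parquet/")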

How to set multiple Spark confs via Glue job parameters?

The job parameter below disables AQE (Adaptive Query Execution) and auto broadcast joins:

Key: --conf
Value: spark.sql.adaptive.enabled=false --conf spark.sql.autoBroadcastJoinThreshold=-1

How to create a view of a DataFrame to run SQL queries in Spark?

df.createOrReplaceTempView("temp_view")
spark.sql("select * from temp_view).show()

How to print the Spark conf from code?

for item in sorted(sc._conf.getAll()): print(item)

How to convert a DataFrame to a DynamicFrame?

from awsglue.dynamicframe import DynamicFrame
output_dyf = DynamicFrame.fromDF(df_products, glueContext, 'output_dyf') 
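The reverse conversion is just toDF():

df_back = output_dyf.toDF()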

How to use different DynamicFrame transformations?

Refer to this documentation for examples:
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-transforms.html
Transforms include (a Filter sketch follows the list):

  • ApplyMapping
  • DropFields
  • DropNullFields
  • ErrorsAsDynamicFrame
  • EvaluateDataQuality
  • FillMissingValues
  • Filter
  • FindIncrementalMatches
  • FindMatches
  • FlatMap
  • Join
  • Map
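
As a quick illustration, the Filter transform can be applied like this (a sketch; output_dyf is the DynamicFrame from the conversion snippet above, and the price field is an assumption):

from awsglue.transforms import Filter

# keep only records whose price exceeds 100 (field name assumed)
filtered_dyf = Filter.apply(frame=output_dyf, f=lambda row: row["price"] > 100)
filtered_dyf.toDF().show()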

How to create a Glue table and write to S3 with a DynamicFrame?

# save as parquet to s3
writes3_dyf = glueContext.getSink(
    path="s3://my-sample-bkt/temp/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    enableUpdateCatalog=True,
    transformation_ctx="writes3_dyf",
)
writes3_dyf.setCatalogInfo(
    catalogDatabase="default", catalogTableName="table_name"
)
writes3_dyf.setFormat("glueparquet", compression="snappy")
writes3_dyf.writeFrame(output_dyf)
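With enableUpdateCatalog=True and updateBehavior="UPDATE_IN_DATABASE", this write also creates or updates the table_name entry in the Glue Data Catalog while writing the Parquet files to S3.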

How to write to S3 and create a Glue table with a DataFrame?

output_dyf.toDF().write.saveAsTable(
    "default.table_name",
    format="parquet",
    mode="overwrite",
    path="s3://my-sample-bkt/temp/",
)

To just write Parquet without creating a catalog table:

df.write.parquet("s3://bucket/tmp/out/people.parquet")                    # fails if the path already exists
df.write.mode("overwrite").parquet("s3://bucket/tmp/out/people.parquet")  # replaces existing output
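A partitioned write is the same call plus partitionBy (the column name is hypothetical):

# writes one sub-directory per distinct value of the partition column
df.write.mode("overwrite").partitionBy("category").parquet("s3://bucket/tmp/out/people.parquet")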

How to find the number of partitions in a DataFrame?

df.rdd.getNumPartitions()

How to check whether a DataFrame is empty?

df.rdd.isEmpty()
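On newer Spark versions (3.3+, e.g. Glue 4.0) the RDD detour is unnecessary:

df.isEmpty()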

How to configure Glue job properties in a Glue Interactive Session?

You can set any AWS Glue job parameter using the %%configure magic; run it before the session starts:

Job parameter reference - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html#job-parameter-reference

%%configure
{
   "--user-jars-first": "true",
   "--enable-glue-datacatalog": "false"
}   

Useful mock data to work with:

from pyspark.sql import Row
import time

ut = time.time()

product = [
    {'product_id': '00001', 'product_name': 'Heater', 'price': 250.0, 'category': 'Electronics', 'updated_at': ut},
    {'product_id': '00002', 'product_name': 'Thermostat', 'price': 400.00, 'category': 'Electronics', 'updated_at': ut},
    {'product_id': '00003', 'product_name': 'Television', 'price': 600.00, 'category': 'Electronics', 'updated_at': ut},
    {'product_id': '00004', 'product_name': 'Blender', 'price': 100.00, 'category': 'Electronics', 'updated_at': ut},
    {'product_id': '00005', 'product_name': 'USB charger', 'price': 50.000, 'category': 'Electronics', 'updated_at': ut}
]

df_products = spark.createDataFrame(Row(**x) for x in product)
df_products.printSchema()

from awsglue.dynamicframe import DynamicFrame
output_dyf = DynamicFrame.fromDF(df_products, glueContext, 'output_dyf')

Is there a publicly accessible big data set?

Use the S3 bucket below for a large data set (NOAA GHCN weather data). It contains both CSV and Parquet copies under partitioned S3 paths.

s3://noaa-ghcn-pds/parquet
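A minimal read sketch (the sub-path below is an assumption; list the bucket first to confirm the layout):

# inspect the layout first, e.g.: aws s3 ls s3://noaa-ghcn-pds/parquet/
df = spark.read.parquet("s3://noaa-ghcn-pds/parquet/by_year/")  # sub-path assumed
df.printSchema()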

How to get the record count per Spark partition?

Use the code below to add a partition-id column to an existing DataFrame (df), then run a count query:

from pyspark.sql.functions import spark_partition_id

df_with_id = df.withColumn("partitionId", spark_partition_id())
df_with_id.createOrReplaceTempView("df_with_id")

spark.sql("""
    select partitionId, count(1) as num_records
    from df_with_id
    group by partitionId
    order by num_records asc
""").show()
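The same counts can be obtained with the DataFrame API:

df_with_id.groupBy("partitionId").count().orderBy("count").show()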
