How to enable DEBUG mode in Glue ?
from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext()
sc.setLogLevel("DEBUG")
glueContext = GlueContext(sc)
How to log statements using the logger in Glue ?
logger = glueContext.get_logger()
logger.info('starting job')
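The same logger also supports other levels such as warn and error (the messages below are illustrative) :
logger.warn('row count lower than expected')
logger.error('failed to process source path')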
How to access/get parameters in a Glue job ?
import sys
from awsglue.utils import getResolvedOptions
args = getResolvedOptions(sys.argv,
                          ['JOB_NAME',
                           'day_partition_key',
                           'hour_partition_key',
                           'day_partition_value',
                           'hour_partition_value'])
print("The day-partition key is: ", args['day_partition_key'])
print("and the day-partition value is: ", args['day_partition_value'])
How to delete S3 files in Glue ?
# retentionPeriod is in hours; 0 purges everything under the path regardless of age
glueContext.purge_s3_path("s3://bucket/path/", {"retentionPeriod": 0})
How to add the source S3 filename to each record of a DynamicFrame ?
Use the attachFilename and attachTimestamp format options :
S3bucket_node2 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    format="json",
    connection_options={
        "paths": [s3_path],
        "recurse": True,
    },
    format_options={
        "multiline": False,
        "attachFilename": "FileNameCol",
        "attachTimestamp": "timestampCol",
    },
    transformation_ctx="S3bucket_node2",
)
How to create a Table from CLI ?
Use the attached JSON.
aws glue create-table --database-name default --cli-input-json file://create_table_input.json --region us-east-1 --debug
How to set Spark Conf in Glue code ?
from pyspark.conf import SparkConf
conf = (SparkConf().setAppName("MyGlueApp").set('spark.driver.maxResultSize','2g'))
sparkContext = SparkContext(conf=conf)
# without any conf
#sparkContext = SparkContext()
glueContext = GlueContext(sparkContext)
spark = glueContext.spark_session
How to enable/disable column name case sensitivity ?
# false (the default) keeps column name matching case insensitive
spark.sql("set spark.sql.caseSensitive=false")
How to merge schemas of different Parquet files in a DataFrame ?
glueContext.sql("set spark.sql.parquet.mergeSchema=true")
How to set multiple Spark confs in a Glue job ?
The job parameter below disables AQE and automatic broadcast joins :
Key: --conf
Value: spark.sql.adaptive.enabled=false --conf spark.sql.autoBroadcastJoinThreshold=-1
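The same --conf value can also be set as a default argument when creating the job; a minimal boto3 sketch (job name, role, and script location are placeholders) :
import boto3

glue = boto3.client('glue')
glue.create_job(
    Name='my-glue-job',        # placeholder job name
    Role='MyGlueServiceRole',  # placeholder IAM role
    Command={'Name': 'glueetl', 'ScriptLocation': 's3://my-sample-bkt/scripts/job.py', 'PythonVersion': '3'},
    DefaultArguments={
        '--conf': 'spark.sql.adaptive.enabled=false --conf spark.sql.autoBroadcastJoinThreshold=-1'
    },
    GlueVersion='4.0',
)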
How to create a view of a DataFrame to run SQL queries in Spark ?
df.createOrReplaceTempView("temp_view")
spark.sql("select * from temp_view").show()
How to print the Spark conf from code ?
for item in sorted(sc._conf.getAll()): print(item)
How to convert Dataframe to DynamicFrame ?
from awsglue.dynamicframe import DynamicFrame
output_dyf = DynamicFrame.fromDF(df_products, glueContext, 'output_dyf')
How to use different DynamicFrame Transformations ?
Refer to this documentation for examples :
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-transforms.html
For example (a short sketch follows the list) :
- ApplyMapping
- DropFields
- DropNullFields
- ErrorsAsDynamicFrame
- EvaluateDataQuality
- FillMissingValues
- Filter
- FindIncrementalMatches
- FindMatches
- FlatMap
- Join
- Map
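A minimal sketch reusing output_dyf from the conversion above (the filter condition and dropped field are illustrative) :
from awsglue.transforms import Filter, DropFields

# keep only records above an illustrative price threshold
filtered_dyf = Filter.apply(frame=output_dyf, f=lambda rec: rec["price"] > 100.0)
# drop an illustrative field
trimmed_dyf = DropFields.apply(frame=filtered_dyf, paths=["updated_at"])
trimmed_dyf.toDF().show()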
How to create a Glue table and write to S3 with DynamicFrame ?
# save as parquet to s3
writes3_dyf = glueContext.getSink(
    path="s3://my-sample-bkt/temp/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    enableUpdateCatalog=True,
    transformation_ctx="writes3_dyf",
)
writes3_dyf.setCatalogInfo(
    catalogDatabase="default", catalogTableName="table_name"
)
writes3_dyf.setFormat("glueparquet", compression="snappy")
writes3_dyf.writeFrame(output_dyf)
How to write to S3 and create a Glue table with a DataFrame ?
output_dyf.toDF().write.format("parquet").saveAsTable('default.table_name', mode='overwrite', path='s3://my-sample-bkt/temp/')
To just write as parquet:
df.write.parquet("s3://bucket/tmp/out/people.parquet")
df.write.mode("overwrite").parquet("s3://bucket/tmp/out/people.parquet")
How to find number of partition in Dataframe ?
df.rdd.getNumPartitions()
How to check Dataframe is empty or not?
df.rdd.isEmpty()
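On Spark 3.3+ (Glue 4.0), the DataFrame API also offers a direct method :
df.isEmpty()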
How to configure different Glue job properties in a Glue Interactive Session ?
You can set AWS Glue job parameters using the %%configure magic :
Job parameter reference - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html#job-parameter-reference
%%configure
{
    "--user-jars-first": "true",
    "--enable-glue-datacatalog": "false"
}
Useful mock data to work with :
from pyspark.sql import Row
import time
import pyspark.sql.functions as F
ut = time.time()
product = [
{'product_id': '00001', 'product_name': 'Heater', 'price': 250.0, 'category': 'Electronics', 'updated_at': ut},
{'product_id': '00002', 'product_name': 'Thermostat', 'price': 400.00, 'category': 'Electronics', 'updated_at': ut},
{'product_id': '00003', 'product_name': 'Television', 'price': 600.00, 'category': 'Electronics', 'updated_at': ut},
{'product_id': '00004', 'product_name': 'Blender', 'price': 100.00, 'category': 'Electronics', 'updated_at': ut},
{'product_id': '00005', 'product_name': 'USB charger', 'price': 50.000, 'category': 'Electronics', 'updated_at': ut}
]
df_products = spark.createDataFrame(Row(**x) for x in product)
df_products.printSchema()
from awsglue.dynamicframe import DynamicFrame
output_dyf = DynamicFrame.fromDF(df_products, glueContext, 'output_dyf')
Do we have any publicly accessible big data set ?
Use the S3 bucket below for a large dataset. It has both CSV and Parquet data under partitioned S3 paths.
s3://noaa-ghcn-pds/parquet
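For example, to read it into a DataFrame (a sketch; adjust the prefix to the bucket's actual partition layout and make sure the job role can read the public bucket) :
# the exact sub-path under parquet/ may differ; point at the partition you need
df_ghcn = spark.read.parquet("s3://noaa-ghcn-pds/parquet/")
df_ghcn.printSchema()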
How to get record count per Spark partition ?
Use the code below to add a partition ID column to an existing DataFrame (df), then run a count query :
from pyspark.sql.functions import spark_partition_id

df_with_id = df.withColumn("partitionId", spark_partition_id())
df_with_id.createOrReplaceTempView("df_with_id")
spark.sql("""select partitionId, count(1) as num_records
             from df_with_id
             group by partitionId
             order by num_records asc""").show()