Step 1: Mount Azure Blob Storage
The first step is to mount the Azure Blob Storage container that holds the Avro files captured by Event Hubs. Azure Blob Storage is a scalable, secure object storage platform, and the Databricks File System (DBFS) lets you mount a blob container so it can be accessed like a local file system. Below is the mount command:
dbutils.fs.mount(
    source = "wasbs://{container}@{storage_name}.blob.core.windows.net",
    mount_point = "/mnt/iotdata",
    extra_configs = {"fs.azure.account.key.{storage_name}.blob.core.windows.net": "AccessKey"}
)
Replace {container}, {storage_name}, and AccessKey with your container name, storage account name, and access key respectively.
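If the notebook is re-run, calling dbutils.fs.mount against a mount point that already exists raises an error, so it can help to check the existing mounts first. Here is a minimal sketch of that guard, reusing the same placeholders as above:
# Mount only if /mnt/iotdata is not already mounted; re-mounting an existing
# mount point raises an error.
if not any(m.mountPoint == "/mnt/iotdata" for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source = "wasbs://{container}@{storage_name}.blob.core.windows.net",
        mount_point = "/mnt/iotdata",
        extra_configs = {"fs.azure.account.key.{storage_name}.blob.core.windows.net": "AccessKey"}
    )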
Step 2: Load All Avro Files
After mounting the storage, we can load the Avro files with spark.read.format('avro'). Avro is a data serialization system that allows big data to be exchanged among programs written in different languages.
The number of * wildcards in the path corresponds to the directory depth of the capture file naming convention you configured in Event Hubs; each * matches one path segment. For example, the default convention {Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second} produces eight directory levels plus the .avro file name, which is why the path below uses nine wildcards.
df = spark.read.format('avro').load("/mnt/iotdata/*/*/*/*/*/*/*/*/*.avro")
display(df)
This command reads all the Avro files that the Event Hubs Capture feature has written to the mounted blob storage and displays the resulting DataFrame.
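Before going further, it is worth inspecting the structure of the captured records. Capture writes metadata columns such as SequenceNumber and EnqueuedTimeUtc alongside the binary Body column that holds the actual message payload. A quick check:
# Inspect the columns written by Event Hubs Capture; the payload is in the binary Body column.
df.printSchema()

# Peek at a few records (the Body column will still show raw bytes at this point).
df.show(5, truncate=False)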
Step 3: Transform Binary Message Payload (Body) to String
The Body column of the captured records holds the message payload as binary data, which needs to be converted to a string for further processing. You can do this with the cast function in PySpark.
body_df = df.withColumn("Body", df.Body.cast("string")).select("Body")
display(body_df)
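To confirm the cast produced readable JSON text, you can print a few payloads directly; this is a quick sanity check and the field names you see here should match the schema defined in the next step:
# Print a handful of decoded payloads to confirm they are valid JSON strings.
for row in body_df.limit(5).collect():
    print(row["Body"])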
Step 4: Map String to JSON Schema
After converting the data to string format, we can map it onto a JSON schema using PySpark's StructType classes and the from_json function.
First, define the schema for the data:
from pyspark.sql.types import StructType, StructField, DoubleType, LongType, TimestampType

data_schema = StructType([
    StructField("a", DoubleType(), True),
    StructField("b", DoubleType(), True)
])

json_schema = StructType([
    StructField("factoryId", LongType(), True),
    StructField("timeStamp", TimestampType(), True),
    StructField("data", data_schema, True)
])
Then, convert the string data to JSON format according to the schema:
from pyspark.sql.functions import from_json, col
json_df = body_df.withColumn("Body", from_json(col("Body"), json_schema))
display(json_df)
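As a sanity check, you can run the schema against a hand-written payload before pointing it at real data. The sample JSON below is only an assumed shape that matches json_schema; adjust the field names and values to whatever your devices actually send:
from pyspark.sql.functions import from_json, col

# Hypothetical sample payload matching json_schema (field names and values are assumptions);
# json_schema comes from the cell above.
sample = '{"factoryId": 42, "timeStamp": "2024-01-15T08:30:00", "data": {"a": 1.5, "b": 2.5}}'

test_df = spark.createDataFrame([(sample,)], ["Body"])
test_df.withColumn("Body", from_json(col("Body"), json_schema)).show(truncate=False)
# Records that do not match the schema come back as null, which is a quick way
# to spot payload/schema mismatches.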
Step 5: Map JSON to Table Format
After converting the data to JSON format, we can flatten the data and convert it into a table format.
final_df = json_df.select(
    col("Body.data.a"),
    col("Body.data.b"),
    col("Body.factoryId"),
    col("Body.timeStamp")
)
Step 6: Save It to the Silver Layer Using Spark SQL
Finally, we save our processed data using Spark SQL. Here, we'll save it in a silver layer, which is a cleaned, processed representation of our raw data.
First, create a database for our processed data:
%sql
CREATE DATABASE iotdata
LOCATION "/mnt/iotdata"
Next, save the processed DataFrame as a table in the newly created database:
final_df.write.saveAsTable("iotdata.tablename")
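Note that saveAsTable fails if the table already exists, so on re-runs you may want to set an explicit write mode; on Databricks the table is stored in Delta format by default. A hedged variant with an append mode, followed by a quick verification query (iotdata.tablename is the same placeholder name as above):
# Append new batches to the silver table (use "overwrite" to replace it instead).
final_df.write.mode("append").saveAsTable("iotdata.tablename")

# Quick sanity check on the saved table.
spark.sql("SELECT COUNT(*) AS row_count FROM iotdata.tablename").show()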
And there you have it! You have successfully processed Avro files captured from Azure Event Hubs using Databricks: you transformed the binary payload into readable text, mapped it to a schema, flattened it into a tabular silver-layer format, and saved it with Spark SQL.