Damian Mazurek

Processing EventHub Captured Messages in Avro Files Using Databricks

Step 1: Mount Azure Blob Storage

The first step is to mount the Azure Blob Storage container that holds our Avro files. Azure Blob Storage is a scalable and secure object storage platform, and the Databricks File System (DBFS) lets you mount a Blob Storage container so that it can be accessed like a local file system. Below is the command to mount the storage:

dbutils.fs.mount(
  source = "wasbs://{container}@{storage_name}.blob.core.windows.net",
  mount_point = "/mnt/iotdata",
  extra_configs = {"fs.azure.account.key.{storage_name}.blob.core.windows.net":"AccessKey"})

Replace {container}, {storage_name}, and AccessKey with your container name, storage account name, and storage account access key, respectively.
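Hard-coding the access key works for a quick test, but a safer pattern is to read it from a Databricks secret scope and to skip the mount when it already exists. A minimal sketch, assuming a hypothetical secret scope named iot-secrets with a key named storage-access-key:

mount_point = "/mnt/iotdata"

# Mount only if the path is not already mounted (mounting twice raises an error)
if not any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="wasbs://{container}@{storage_name}.blob.core.windows.net",
        mount_point=mount_point,
        extra_configs={
            "fs.azure.account.key.{storage_name}.blob.core.windows.net":
                # Hypothetical secret scope and key names - adjust to your workspace
                dbutils.secrets.get(scope="iot-secrets", key="storage-access-key")
        })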

Step 2: Load all Avro Files

After mounting the storage, we can load the Avro files with spark.read.format('avro'). Avro is a data serialization system that allows big data to be exchanged between programs written in different languages.

The number of * wildcards in the path corresponds to the folder depth defined by the capture naming convention you configured in Event Hubs; each * stands for one directory level, and the last one for the file name. With the default convention, {Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}, that gives the nine wildcards used below.

df = spark.read.format('avro').load("/mnt/iotdata/*/*/*/*/*/*/*/*/*.avro")
display(df)

This command reads all the Avro files that Event Hubs Capture wrote to the mounted Blob Storage and displays the resulting DataFrame.
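It can help to inspect the schema before going further. Files written by Event Hubs Capture share a fixed layout: SequenceNumber, Offset, EnqueuedTimeUtc, SystemProperties, Properties, and a binary Body column that holds the original message payload. A quick check (the printed output below is illustrative):

df.printSchema()
# root
#  |-- SequenceNumber: long (nullable = true)
#  |-- Offset: string (nullable = true)
#  |-- EnqueuedTimeUtc: string (nullable = true)
#  |-- SystemProperties: map (nullable = true)
#  |-- Properties: map (nullable = true)
#  |-- Body: binary (nullable = true)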

Step 3: Transform Binary Message Payload (Body) to String

The Body column of the Avro files holds the raw message payload as binary data, which needs to be converted into a string for further processing. You can do this with the cast function in PySpark.

body_df = df.withColumn("Body", df.Body.cast("string")).select("Body")
display(body_df)

Step 4: Map String to JSON Schema

After converting the data to string format, we can map it to a JSON schema using PySpark's StructType and the from_json function.

First, define the schema for the data:

from pyspark.sql.types import *
data_schema = StructType(
    [
        StructField("a",DoubleType(),True),
        StructField("b", DoubleType(), True)
    ]
)
json_schema = StructType(
    [
        StructField("factoryId", LongType(), True),
        StructField("timeStamp", TimestampType(), True),
        StructField("data", data_schema, True)
    ]
)
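For reference, a message body that this schema would parse might look like the following (field values are made up for illustration):

# A hypothetical payload matching the schema above
sample_body = '{"factoryId": 42, "timeStamp": "2023-05-01T12:00:00Z", "data": {"a": 21.5, "b": 3.7}}'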

Then, parse the string column into structured columns according to the schema:

from pyspark.sql.functions import from_json, col
json_df = body_df.withColumn("Body", from_json(col("Body"), json_schema))
display(json_df)
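Note that from_json returns null for any row whose payload does not match the schema, so it can be worth checking for rows that failed to parse (a minimal sketch):

# Rows whose Body could not be parsed end up as null structs
valid_df = json_df.filter(col("Body").isNotNull())
bad_count = json_df.filter(col("Body").isNull()).count()
print(f"Rows that did not match the schema: {bad_count}")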

Step 5: Map JSON to Table Format

After converting the data to JSON format, we can flatten the data and convert it into a table format.

final_df = json_df.select(
    col("Body.data.a"),
    col("Body.data.b"),
    col("Body.factoryId"),
    col("Body.timeStamp"))

Step 6: Save it using Spark SQL in Silver Layer

Finally, we save our processed data using Spark SQL. Here, we'll save it in a silver layer, which is a cleaned, processed representation of our raw data.

First, create a database for our processed data:

%sql
CREATE DATABASE iotdata
LOCATION "/mnt/iotdata"
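If the notebook is meant to be re-run, note that CREATE DATABASE fails when the database already exists; a guarded variant (same name and location, written here through spark.sql) avoids that:

# Create the database only if it does not already exist (safe to re-run)
spark.sql("CREATE DATABASE IF NOT EXISTS iotdata LOCATION '/mnt/iotdata'")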

Next, save the processed DataFrame as a table in the newly created database:

final_df.write.saveAsTable("iotdata.tablename")
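saveAsTable raises an error if the table already exists, so for repeated runs you may want an explicit write mode; you can then query the silver table back with Spark SQL (tablename is just the placeholder from above):

# Overwrite the table on re-runs instead of failing
final_df.write.mode("overwrite").saveAsTable("iotdata.tablename")

# Read the silver table back to verify the result
display(spark.sql("SELECT * FROM iotdata.tablename LIMIT 10"))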

And there you have it! You have successfully processed Avro files captured from Azure Event Hubs using Databricks. You've transformed the binary payload into a readable format, mapped it to a JSON schema, flattened it into tabular form, and saved it to the silver layer using Spark SQL.
