Damian Mazurek

Processing EventHub Captured Messages in Avro Files Using Databricks

Step 1: Mount Azure Blob Storage

The first step is to mount the Azure Blob Storage container that holds our Avro files. Azure Blob Storage is a scalable and secure object storage platform, and the Databricks File System (DBFS) lets you mount a blob container so it can be accessed like a local file system. Below is the command to mount the container:

dbutils.fs.mount(
  source = "wasbs://{container}@{storage_name}.blob.core.windows.net",
  mount_point = "/mnt/iotdata",
  extra_configs = {"fs.azure.account.key.{storage_name}.blob.core.windows.net": "{access_key}"})

Replace {container}, {storage_name}, and {access_key} with your container name, storage account name, and access key, respectively.
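Note that dbutils.fs.mount raises an error if the mount point already exists, so re-running the notebook fails on this cell. A minimal guard, assuming the same /mnt/iotdata mount point as above, is to check dbutils.fs.mounts() first:

# Mount only if /mnt/iotdata is not already mounted, so the cell is safe to re-run
if not any(m.mountPoint == "/mnt/iotdata" for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source = "wasbs://{container}@{storage_name}.blob.core.windows.net",
        mount_point = "/mnt/iotdata",
        extra_configs = {"fs.azure.account.key.{storage_name}.blob.core.windows.net": "{access_key}"})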

Step 2: Load all Avro Files

After mounting the storage, we can load the Avro files with spark.read.format('avro'). Avro is a data serialization system that allows big data to be exchanged among programs written in different languages.

The number of * wildcards in the path must match the directory depth of the capture naming convention you configured in Event Hubs; each * matches one directory level. The path below, with eight directory wildcards plus *.avro for the file name, fits the default {Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second} convention.

df=spark.read.format('avro').load("/mnt/iotdata/*/*/*/*/*/*/*/*/*.avro")
display(df)

This command reads all the Avro files that the Event Hubs Capture feature wrote to the mounted blob storage and displays the resulting DataFrame.
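Before transforming anything, it is worth checking what was loaded. Files written by Event Hubs Capture follow a fixed schema, so alongside the payload you should see the capture metadata columns (a quick sanity check, not specific to this dataset):

# Inspect the loaded schema; capture files typically expose SequenceNumber,
# Offset, EnqueuedTimeUtc, SystemProperties, Properties, and the binary Body
df.printSchema()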

Step 3: Transform Binary Message Payload (Body) to String

In the captured Avro files, the Body column holds the message payload as binary data, which needs to be converted into a string for further processing. You can do this with the cast function in PySpark.

body_df = df.withColumn("Body", df.Body.cast("string")).select("Body")
display(body_df)
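The cast assumes the payload is UTF-8 text. If you prefer to state the character set explicitly, decode from pyspark.sql.functions does the same job (an equivalent sketch, not a change to the pipeline):

from pyspark.sql.functions import col, decode

# Same result as cast("string"), but with the charset made explicit
body_df = df.select(decode(col("Body"), "UTF-8").alias("Body"))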

Step 4: Map String to JSON Schema

After converting the data to string format, we can parse it against a JSON schema, using PySpark's StructType to define the structure and from_json to do the parsing.

First, define the schema for the data:

from pyspark.sql.types import *
data_schema = StructType(
    [
        StructField("a",DoubleType(),True),
        StructField("b", DoubleType(), True)
    ]
)
json_schema = StructType(
    [
        StructField("factoryId", LongType(), True),
        StructField("timeStamp", TimestampType(), True),
        StructField("data", data_schema, True)
    ]
)

Then, convert the string data to JSON format according to the schema:

from pyspark.sql.functions import from_json, col
json_df = body_df.withColumn("Body", from_json(col("Body"), json_schema))
display(json_df)
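If you would rather not hand-write the schema, schema_of_json can infer one from a sample payload (a convenience sketch; it assumes the first row is representative of all messages, so verify the inferred types before relying on them):

from pyspark.sql.functions import schema_of_json, lit

# Infer a DDL schema string from a single sample message
sample = body_df.first()["Body"]
print(spark.range(1).select(schema_of_json(lit(sample))).first()[0])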

Step 5: Map JSON to Table Format

After converting the data to JSON format, we can flatten the data and convert it into a table format.

final_df = json_df.select(
    col("Body.data.a"),
    col("Body.data.b"),
    col("Body.factoryId"),
    col("Body.timeStamp"))

Step 6: Save It to the Silver Layer Using Spark SQL

Finally, we save our processed data using Spark SQL. We'll store it in the silver layer, which holds the cleaned, processed representation of our raw data.

First, create a database for our processed data:

%sql
CREATE DATABASE iotdata
LOCATION "/mnt/iotdata"

Next, save the processed DataFrame as a table in the newly created database:

final_df.write.saveAsTable("iotdata.tablename")
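Note that saveAsTable fails if the table already exists, so for repeated batch runs you will usually want an explicit format and write mode (a sketch assuming Delta is available in your workspace and that appending new batches is the desired behavior):

# Append each processed batch to a managed Delta table instead of erroring on re-run
final_df.write.format("delta").mode("append").saveAsTable("iotdata.tablename")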

And there you have it! You have successfully processed Avro files captured from Azure Event Hubs using Databricks. You've transformed the binary payload into readable strings, mapped it to a JSON schema, flattened it into tabular form, and saved it to the silver layer using Spark SQL.
