DEV Community

Damian Mazurek


Processing EventHub Captured Messages in Avro Files Using Databricks

Step 1: Mount Azure Blob Storage

The first step is to mount the Azure Blob Storage, which contains our Avro files. Azure Blob Storage is a scalable and secure object storage platform. Databricks file system (DBFS) allows you to mount blob storage so that it can be accessed like a local file system. Below is the command to mount Blob Storage:

dbutils.fs.mount(
  source = "wasbs://{container}@{storage_name}.blob.core.windows.net",
  mount_point = "/mnt/iotdata",
  extra_configs = {"fs.azure.account.key.{storage_name}.blob.core.windows.net": "AccessKey"})

Replace {container}, {storage_name}, and AccessKey with your container name, storage account name, and access key respectively.
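
The source URL and the configuration key both follow a fixed pattern built from the storage account name, so it can help to assemble them in one place. The helper below is a minimal sketch: the function name `build_mount_args` and the sample values are hypothetical, and it assumes account-key authentication over the `wasbs` driver.

```python
def build_mount_args(container, storage_account, access_key, mount_point="/mnt/iotdata"):
    """Build keyword arguments for dbutils.fs.mount (wasbs driver, account-key auth)."""
    host = f"{storage_account}.blob.core.windows.net"
    return {
        "source": f"wasbs://{container}@{host}",
        "mount_point": mount_point,
        # The account key is registered under a config name derived from the host.
        "extra_configs": {f"fs.azure.account.key.{host}": access_key},
    }


# Hypothetical values, for illustration only.
args = build_mount_args("capture", "mystorageacct", "<access-key>")
print(args["source"])  # wasbs://capture@mystorageacct.blob.core.windows.net

# In a Databricks notebook, where dbutils is predefined:
# dbutils.fs.mount(**args)
```

Rather than pasting the access key into a notebook, you may prefer to read it from a secret scope with `dbutils.secrets.get(scope=..., key=...)` and pass the result as `access_key`.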

Step 2: Load all Avro Files

After mounting the storage, we can load the Avro files using spark.read.format('avro'). Avro is a data serialization format that stores its schema alongside the data, so records can be exchanged among programs written in different languages.

The number of * in the path matches the depth of subdirectories that you set in the EventHub capture naming convention. Each * represents one directory level.

df = spark.read.format('avro').load("/mnt/iotdata/*/*/*/*/*/*/*/*/*.avro")
display(df)

This command reads every Avro file that the EventHub capture feature wrote to the mounted blob storage and displays the resulting DataFrame.
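
Counting wildcards by hand is easy to get wrong, so the path can also be generated from the directory depth. This is a small sketch; `capture_glob` is a hypothetical helper, and the depth of 8 assumes the default capture naming convention ({Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}), which produces eight directory levels above each file.

```python
def capture_glob(mount_point, directory_levels, extension="avro"):
    """Return a glob with one '*' per directory level plus a file-name wildcard."""
    if directory_levels < 0:
        raise ValueError("directory_levels must be non-negative")
    return mount_point.rstrip("/") + "/*" * directory_levels + f"/*.{extension}"


# Eight directory levels is an assumption based on the default naming convention.
path = capture_glob("/mnt/iotdata", 8)
print(path)

# In a Databricks notebook:
# df = spark.read.format("avro").load(path)
```

If you changed the capture naming convention, pass the number of directory levels your format produces instead.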

Step 3: Transform Binary Message Payload (Body) to String

In the captured Avro files, the message payload is stored in the Body column as binary data, which needs to be converted into a string for further processing. You can do this with the cast function in PySpark.
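
A minimal sketch of this cast, assuming the loaded DataFrame has a binary column named Body as described above. The function name `body_to_string` is hypothetical; it takes any PySpark DataFrame and returns a new one with Body replaced by its string form.

```python
def body_to_string(df):
    """Return a DataFrame whose binary Body column is cast to a string."""
    return df.withColumn("Body", df["Body"].cast("string"))


# For a single record, the cast amounts to decoding the payload bytes as UTF-8.
raw_body = b'{"factoryId": 7}'  # hypothetical payload
decoded = raw_body.decode("utf-8")
print(decoded)  # {"factoryId": 7}

# In a Databricks notebook (df is assumed to be the DataFrame loaded in Step 2):
# body_df = body_to_string(df)
```

The result is named `body_df` here only to match the variable that Step 4 reads from.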


Step 4: Map String to JSON Schema

After converting the data to string format, we can map it to a JSON schema using PySpark's StructType class and from_json function.

First, define the schema for the data:

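from pyspark.sql.types import StructType, StructField, DoubleType, LongType, TimestampType

# Schema of the nested "data" object
data_schema = StructType([
    StructField("b", DoubleType(), True)
])

# Schema of the whole JSON message
json_schema = StructType([
    StructField("factoryId", LongType(), True),
    StructField("timeStamp", TimestampType(), True),
    StructField("data", data_schema, True)
])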

Then, parse the JSON string into a struct column according to the schema:

from pyspark.sql.functions import from_json, col
json_df = body_df.withColumn("Body", from_json(col("Body"), json_schema))
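
To see what this parsing does to one message without a cluster, the plain-Python sketch below mirrors the schema above for a single record. It is an illustration only: `parse_body` and the sample payload are hypothetical, and it does not reproduce Spark's type coercion rules. Like from_json, it returns a null value (None) for a string that cannot be parsed.

```python
import json
from datetime import datetime


def parse_body(body):
    """Parse one JSON string into the factoryId / timeStamp / data.b shape.

    Returns None when the input cannot be parsed into that shape.
    """
    try:
        raw = json.loads(body)
        data = raw.get("data") or {}
        ts = raw.get("timeStamp")
        return {
            "factoryId": raw.get("factoryId"),
            "timeStamp": datetime.fromisoformat(ts) if ts is not None else None,
            "data": {"b": data.get("b")},
        }
    except (AttributeError, TypeError, ValueError):
        return None


# Hypothetical payload, for illustration only.
sample = '{"factoryId": 7, "timeStamp": "2023-05-01T12:00:00", "data": {"b": 21.5}}'
parsed = parse_body(sample)
print(parsed["factoryId"], parsed["data"]["b"])  # 7 21.5
print(parse_body("not json"))  # None
```

Running a few captured payloads through a check like this can help confirm that the schema matches the messages before applying it to the whole DataFrame.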

Step 5: Map JSON to Table Format

After parsing the JSON, we can flatten the nested Body struct so that each field becomes its own column in a table format.
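
A minimal sketch of the flattening, assuming the Body struct has the fields defined in the Step 4 schema. The names `flatten_body` and `flatten_record` are hypothetical. The first function selects the nested fields by their dotted paths on a PySpark DataFrame; the second shows the same reshaping on one record in plain Python.

```python
def flatten_body(json_df):
    """Promote the nested Body fields to top-level columns: factoryId, timeStamp, b."""
    return json_df.select("Body.factoryId", "Body.timeStamp", "Body.data.b")


def flatten_record(body):
    """Plain-Python equivalent for one record: lift the fields of 'data' to the top level."""
    flat = {key: value for key, value in body.items() if key != "data"}
    flat.update(body.get("data") or {})
    return flat


# Hypothetical record, for illustration only.
row = flatten_record(
    {"factoryId": 7, "timeStamp": "2023-05-01T12:00:00", "data": {"b": 21.5}}
)
print(row)  # {'factoryId': 7, 'timeStamp': '2023-05-01T12:00:00', 'b': 21.5}

# In a Databricks notebook:
# flat_df = flatten_body(json_df)
```

Selecting a nested field by its dotted path names the output column after the last path segment, which is why the flattened columns are factoryId, timeStamp, and b.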

Step 6: Save it using Spark SQL in Silver Layer

Finally, we save our processed data using Spark SQL. Here, we'll save it in a silver layer, which is a cleaned, processed representation of our raw data.

First, create a database for our processed data, replacing {database_name} with a name of your choice:

CREATE DATABASE {database_name}
LOCATION "/mnt/iotdata"

Next, save the processed DataFrame as a table in the newly created database:
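
A minimal sketch of the save step, assuming the flattened DataFrame from Step 5 and a database that already exists. The function name `save_to_silver`, the database and table names in the usage comment, and the overwrite save mode are all assumptions made for illustration.

```python
def save_to_silver(df, database, table, mode="overwrite"):
    """Write df as a managed table named <database>.<table> and return that name."""
    full_name = f"{database}.{table}"
    # mode="overwrite" replaces an existing table; use "append" to add rows instead.
    df.write.mode(mode).saveAsTable(full_name)
    return full_name


# In a Databricks notebook (hypothetical database and table names):
# save_to_silver(flat_df, "iot", "telemetry")
```

Overwrite keeps reruns of the notebook idempotent, while append may suit a job that processes only newly captured files.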


And there you have it! You have successfully processed Avro files captured from Azure Event Hubs using Databricks. You've transformed binary data into a readable format, mapped it to a schema, converted it into a silver layer tabular format, and saved it using Spark SQL.
