Assume you are given the following JSON Schema specification:
// Saved in schema_def.json
{
  "title": "Customer",
  "description": "Schema for Customer data",
  "type": "object",
  "properties": {
    "name": {
      "type": "string",
      "description": "Customer name"
    },
    "age": {
      "type": "integer",
      "description": "Customer age"
    }
  },
  "required": [
    "name"
  ]
}
Naturally, when reading data that adheres to this definition, you'd like a programmatic way to obtain the corresponding StructType object.
Based on my research, this turns out to be a little tricky: there is no clean way to do it with a well-established package.
So, here is a small code snippet that you can use to achieve what you need:
import json

from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    DoubleType,
    BooleanType,
    ArrayType,
    LongType,
    NullType,
)


def json_schema_to_pyspark_struct(schema: dict) -> StructType:
    """
    Converts a JSON schema dictionary to a PySpark StructType.

    This function recursively parses the JSON schema and maps its types
    to the corresponding PySpark SQL data types.

    Args:
        schema: A dictionary representing the JSON schema.

    Returns:
        A PySpark StructType object representing the schema.

    Raises:
        ValueError: If an unsupported JSON schema type is encountered.
    """
    # Mapping from JSON schema types to PySpark data types
    type_mapping = {
        "string": StringType(),
        "number": DoubleType(),  # More general than FloatType
        "integer": LongType(),   # More general than IntegerType
        "boolean": BooleanType(),
        "null": NullType(),
    }

    fields = []

    # Get the list of required fields, if specified
    required_fields = schema.get("required", [])

    # Iterate over each property defined in the schema
    for property_name, property_details in schema.get("properties", {}).items():
        field_type_str = property_details.get("type")
        is_nullable = property_name not in required_fields
        field_type = None

        if field_type_str == "object":
            # Recursively call the function for nested objects
            field_type = json_schema_to_pyspark_struct(property_details)
        elif field_type_str == "array":
            # Handle arrays by getting the type of the items in the array
            item_schema = property_details.get("items", {})
            if not item_schema:
                # Default to StringType if items schema is not defined
                item_type = StringType()
            elif item_schema.get("type") == "object":
                # Recursively call for arrays of objects
                item_type = json_schema_to_pyspark_struct(item_schema)
            else:
                item_type = type_mapping.get(item_schema.get("type"))
                if not item_type:
                    raise ValueError(f"Unsupported array item type: {item_schema.get('type')}")
            field_type = ArrayType(item_type, True)  # Arrays can contain nulls
        else:
            # Map simple types using the dictionary
            field_type = type_mapping.get(field_type_str)
            if not field_type:
                raise ValueError(f"Unsupported type for property '{property_name}': {field_type_str}")

        # Add the constructed field to our list of fields
        fields.append(StructField(property_name, field_type, is_nullable))

    return StructType(fields)
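To sanity-check the recursive branches (nested objects and arrays), here is a minimal sketch using a hypothetical schema that is not part of the original example; the property names are made up purely for illustration:

# Hypothetical schema exercising the nested-object and array branches
nested_schema_def = {
    "type": "object",
    "properties": {
        "address": {
            "type": "object",
            "properties": {
                "city": {"type": "string"},
                "zip": {"type": "string"},
            },
            "required": ["city"],
        },
        "order_ids": {
            "type": "array",
            "items": {"type": "integer"},
        },
    },
    "required": ["address"],
}

print(json_schema_to_pyspark_struct(nested_schema_def))
# Roughly expected: 'address' becomes a non-nullable nested StructType
# (with 'city' required inside it), and 'order_ids' becomes a nullable
# ArrayType(LongType(), True).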
So, you can obtain the StructType as follows:
with open("schema_def.json") as f:
    schema_def = json.load(f)

schema = json_schema_to_pyspark_struct(schema_def)
This yields: StructType([StructField('name', StringType(), False), StructField('age', LongType(), True)]).
Nice! Using this schema, we can now read or create data that adheres to it.
For example:
spark.createDataFrame([["Alice", 10], ["Bob", 11]], schema=schema)
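And if the actual customer records are stored as JSON, the same StructType can be passed to the reader so Spark doesn't have to infer the schema. A small sketch, assuming a placeholder file path (replace it with wherever your data lives):

# Hypothetical input path; adjust to your environment
customers = (
    spark.read
    .schema(schema)                  # enforce the StructType derived from schema_def.json
    .json("data/customers.jsonl")
)
customers.printSchema()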
What do you think? How do you solve this use case?
Disclaimer: The code above is mostly a collaboration with Gemini. So be careful ;)