
Dror Atariah

JSON Schema to PySpark StructType

Assume that you get the following JSON schema specification:

// Saved in schema_def.json
{
  "title": "Customer",
  "description": "Schema for Customer data",
  "type": "object",
  "properties": {
    "name": {
      "type": "string",
      "description": "Customer name"
    },
    "age": {
      "type": "integer",
      "description": "Customer age"
    }
  },
  "required": [
    "name"
  ]
}

Naturally, when reading data that adheres to this definition, you'd like a programmatic way to obtain a corresponding StructType object.
Based on my research, this turns out to be a little tricky: there doesn't seem to be a nice way to do it, i.e., no well-accepted package that handles the conversion.

So, here is a little code snippet that you can use to achieve what you need:

import json
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    DoubleType,
    BooleanType,
    ArrayType,
    LongType,
    NullType,
)

def json_schema_to_pyspark_struct(schema: dict) -> StructType:
    """
    Converts a JSON schema dictionary to a PySpark StructType.

    This function recursively parses the JSON schema and maps its types
    to the corresponding PySpark SQL data types.

    Args:
        schema: A dictionary representing the JSON schema.

    Returns:
        A PySpark StructType object representing the schema.

    Raises:
        ValueError: If an unsupported JSON schema type is encountered.
    """

    # Mapping from JSON schema types to PySpark data types
    type_mapping = {
        "string": StringType(),
        "number": DoubleType(),  # More general than FloatType
        "integer": LongType(),    # More general than IntegerType
        "boolean": BooleanType(),
        "null": NullType(),
    }

    fields = []

    # Get the list of required fields, if specified
    required_fields = schema.get("required", [])

    # Iterate over each property defined in the schema
    for property_name, property_details in schema.get("properties", {}).items():
        field_type_str = property_details.get("type")
        is_nullable = property_name not in required_fields
        field_type = None

        if field_type_str == "object":
            # Recursively call the function for nested objects
            field_type = json_schema_to_pyspark_struct(property_details)
        elif field_type_str == "array":
            # Handle arrays by getting the type of the items in the array
            item_schema = property_details.get("items", {})
            if not item_schema:
                 # Default to StringType if items schema is not defined
                item_type = StringType()
            elif item_schema.get("type") == "object":
                # Recursively call for arrays of objects
                item_type = json_schema_to_pyspark_struct(item_schema)
            else:
                item_type = type_mapping.get(item_schema.get("type"))
                if not item_type:
                    raise ValueError(f"Unsupported array item type: {item_schema.get('type')}")

            field_type = ArrayType(item_type, True) # Arrays can contain nulls
        else:
            # Map simple types using the dictionary
            field_type = type_mapping.get(field_type_str)
            if not field_type:
                raise ValueError(f"Unsupported type for property '{property_name}': {field_type_str}")

        # Add the constructed field to our list of fields
        fields.append(StructField(property_name, field_type, is_nullable))

    return StructType(fields)
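
Since the function recurses into nested objects and arrays, it also handles more involved schemas. Here is a small sketch with a made-up nested definition (not part of the original example) to illustrate the recursive branches:

# Hypothetical schema with a nested array of objects
nested_def = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "orders": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "id": {"type": "integer"},
                    "total": {"type": "number"}
                },
                "required": ["id"]
            }
        }
    },
    "required": ["name"]
}

json_schema_to_pyspark_struct(nested_def)
# StructType([
#     StructField('name', StringType(), False),
#     StructField('orders', ArrayType(StructType([
#         StructField('id', LongType(), False),
#         StructField('total', DoubleType(), True)
#     ]), True), True)
# ])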

So, you can obtain the StructType as follows:

with open("schema_def.json") as f:
    schema_def = json.load(f)
schema = json_schema_to_pyspark_struct(schema_def)

This yields: StructType([StructField('name', StringType(), False), StructField('age', LongType(), True)]).
Note that name is listed under required in the JSON schema, which is why its field ends up non-nullable.
Nice, using this schema, we can now read or create data that adheres to it.
For example:

spark.createDataFrame([["Alice", 10], ["Bob", 11]], schema=schema)
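
If the data lives in JSON files instead, the same schema can be passed to the reader. A minimal sketch, with a placeholder file path:

# Read JSON files using the derived schema (the path is just a placeholder)
df = spark.read.schema(schema).json("path/to/customers.json")
df.printSchema()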

What do you think? How do you solve this use case?

Disclaimer: The code above is mostly a collaboration with Gemini. So be careful ;)
