
Konstantinos Blatsoukas

frictionless, first transform and then validate

In a recent task, I had to use the frictionless library to validate a CSV file after transforming it. The unusual part was the order: the transformation step comes first, and the validation then runs on the transformed data. Usually validation is performed first, followed by any required data transformation.
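To make the ordering concrete, here is a minimal plain-Python sketch of the idea. It does not use frictionless, and the column names, the `RENAMES` mapping, and the `REQUIRED` set are made up for illustration. The raw headers would fail a check against the target field names, so the rename has to run before the check.

```python
import csv
import io

# Hypothetical mapping from source column names to target schema names
RENAMES = {"id_no": "id", "full_name": "name"}
# Hypothetical set of field names that the target schema requires
REQUIRED = {"id", "name"}


def transform_rows(raw: bytes) -> list[dict[str, str]]:
    """Parse CSV bytes and rename the columns to the target names."""
    reader = csv.DictReader(io.StringIO(raw.decode("utf-8")))
    return [{RENAMES.get(key, key): value for key, value in row.items()} for row in reader]


def missing_fields(rows: list[dict[str, str]]) -> set[str]:
    """Return the required fields that are absent from the first row."""
    present = set(rows[0]) if rows else set()
    return REQUIRED - present


raw = b"id_no,full_name\n1,Ada\n2,Linus\n"
rows = transform_rows(raw)
print(missing_fields(rows))  # nothing is missing once the rename has run
```

Running `missing_fields` on the untransformed rows would report both `id` and `name` as missing, which is why validating first does not work in this situation.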

Here is how I did it:

from frictionless import Resource, Schema, transform, Pipeline, steps, validate

# this fragment runs inside a loop over the uploaded files (hence the `continue`)
# you can find the build_trx_pipeline implementation in the next code section
build_successfully, trx_steps, error_messages = build_trx_pipeline(read_file, schema.field_names)
if not build_successfully:
    error_file_name = file.filename
    error_report[error_file_name] = error_messages
    continue

# create a Pipeline object based on the list of steps, see details below
trx_pipeline = Pipeline(steps=trx_steps)

# create a resource, based on the original file (no schema was specified)
original_resource = Resource(data=read_file, format="csv")

# apply the transformation to the original file
transformed_resource = transform(original_resource, pipeline=trx_pipeline)

# explicitly specify the validation schema
transformed_resource.schema = Schema("/my_schema.schema.json")

# validate the transformed data against that schema
report = validate(transformed_resource)
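The `continue` in the snippet above implies a surrounding loop over the uploaded files, which the post does not show. A rough sketch of how that loop might collect per-file errors follows. It uses only plain Python, and `process_files`, `demo_build`, and the file names are hypothetical stand-ins, not part of the original code.

```python
from typing import Callable

# A builder returns (built_ok, steps, error_messages), like build_trx_pipeline
Builder = Callable[[bytes], tuple[bool, list[str], list[str]]]


def process_files(
    files: dict[str, bytes], build: Builder
) -> tuple[dict[str, list[str]], dict[str, list[str]]]:
    """Build steps for each file and collect failures per file name."""
    built: dict[str, list[str]] = {}
    error_report: dict[str, list[str]] = {}
    for filename, content in files.items():
        ok, built_steps, errors = build(content)
        if not ok:
            error_report[filename] = errors
            continue  # skip this file, keep processing the rest
        built[filename] = built_steps
    return built, error_report


def demo_build(content: bytes) -> tuple[bool, list[str], list[str]]:
    """Hypothetical builder: fails on empty input, else returns one placeholder step."""
    if not content.strip():
        return False, [], ["file is empty"]
    return True, ["rename"], []


built, error_report = process_files(
    {"good.csv": b"a,b\n1,2\n", "empty.csv": b""}, demo_build
)
```

Returning a success flag together with the error messages lets one bad file be reported without stopping the whole batch.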
  • transform the data to the desired schema (based on the JSON schema)

from typing import Any

# read the file; I am using the FastAPI framework
read_file = await file.read()

# build the pipeline transformation, in terms of steps
def build_trx_pipeline(read_file, schema_field_names) -> tuple[bool, list[Any], list[str]]:

    # the header names come from the first line of the uploaded CSV
    csv_headers = read_file.splitlines()[0].decode("utf-8").split(",")

    pipeline_steps = []

    # keep only the rows with foo == 'X' and bar == 'HELLO'
    pipeline_steps.append(steps.row_filter(formula="foo == 'X' and bar == 'HELLO'"))

    # remove the fields a and b
    pipeline_steps.append(steps.field_remove(names=["a", "b"]))

    # rename fields to match the lab dga SAI schema;
    # toa2sai_fields is a dict that maps original field names to schema field names
    renaming_steps = [
        steps.field_update(name=a_name, descriptor={"name": b_name})
        for a_name, b_name in toa2sai_fields.items()
        if a_name in csv_headers
    ]
    pipeline_steps.extend(renaming_steps)

    #    .....

    return True, pipeline_steps, []
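Splitting the first line on commas works for simple headers, but it would mis-split a quoted header that contains a comma, and it would keep a UTF-8 byte order mark if the file starts with one. A possible alternative, assuming the upload is a UTF-8 encoded CSV, is to let the standard `csv` module parse the header row. The helper name `read_csv_headers` is hypothetical.

```python
import csv
import io


def read_csv_headers(raw: bytes) -> list[str]:
    """Return the header names from the first row of CSV bytes."""
    # "utf-8-sig" drops a leading byte order mark if one is present
    text = io.StringIO(raw.decode("utf-8-sig"), newline="")
    reader = csv.reader(text)
    # an empty upload yields an empty header list instead of raising
    return next(reader, [])


print(read_csv_headers(b'id,"last, first",age\n1,"Doe, Jane",30\n'))
# ['id', 'last, first', 'age']
```

The result could replace the `csv_headers` list used when filtering the renaming steps.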
