In a recent task, I had to use the frictionless library to validate a CSV file after a transformation. The unusual part was that the transformation step comes first, and the validation then runs on the transformed data — usually validation is performed first, followed by the data transformation (if any is required).
Let's see how I did that:
from frictionless import Resource, Schema, transform, Pipeline, steps, validate
# you can find the pipeline implementation in the next code section
build_successfully, trx_pipeline, error_messages = build_trx_pipeline(read_file, schema.field_names)
if not build_successfully:
error_file_name = file.filename
error_report[error_file_name] = error_messages
continue
# create a Pipeline object based on a list of Steps, see details below
trx_pipeline = Pipeline(steps=build_trx_pipeline)
# create a resource, based on the original file (no schema was specified)
original_resource = Resource(data=read_file, format="csv")
# apply the trandormation to the original file
transformed_resource = transform(original_resource, pipeline=trx_pipeline)
# explicitly specify the validation schema
transformed_resource.schema = Schema("/my_schema.schema.json")
- Transform the data to the desired schema (based on the JSON schema).
# Read the uploaded file's raw bytes. `file` is a FastAPI UploadFile,
# so .read() is a coroutine — this line must live inside an async endpoint.
read_file = await file.read()
# build the transformation pipeline, expressed as a list of steps
def build_trx_pipeline(read_file, schema_field_names) -> tuple[bool, list[Any], list[str]]:
    """Build the transformation pipeline as a list of frictionless steps.

    Args:
        read_file: raw CSV content (bytes), as returned by ``await file.read()``.
        schema_field_names: field names of the target schema (not used in the
            visible portion of this function).

    Returns:
        A tuple of (success flag, list of Step objects, list of error messages).
    """
    # The first line of the CSV holds the headers; .decode() already returns
    # a str, so the original's extra str(...) wrapper was redundant.
    csv_headers = read_file.splitlines()[0].decode("utf-8").split(",")
    pipeline_steps = []
    # Keep only the rows matching the filter formula
    # (field names/values anonymized for the post).
    pipeline_steps.append(steps.row_filter(formula="foo == 'X' and bar == 'HELLO'"))
    # Drop helper columns. Use a plain list instead of list({...}): a set has
    # no guaranteed iteration order, so the original was nondeterministic.
    pipeline_steps.append(steps.field_remove(names=["a", "b"]))
    # Rename fields present in the CSV to match the target schema
    # (toa2sai_fields is a module-level source-name -> target-name mapping).
    renaming_steps = [
        steps.field_update(name=src_name, descriptor={"name": dst_name})
        for src_name, dst_name in toa2sai_fields.items()
        if src_name in csv_headers
    ]
    pipeline_steps.extend(renaming_steps)
    # .....
    return True, pipeline_steps, []
Top comments (0)