Design Patterns for Data Engineers: Cleaner ETL with the Builder Pattern.

In my job, I often end up writing long ETL pipelines in Python and PySpark. The usual story: you read a bunch of tables, join them together, and run several preprocessing steps where the output of one step becomes the input of another. Meanwhile, requirements keep changing: new rules, new table names, renamed columns, and so on.

Recently, I had to refactor a client function that was doing exactly this. It received several Spark DataFrames as input and was supposed to return one “clean” transactions DataFrame, ready to be written to the database. Inside, it was basically just calling a chain of helper functions in a fixed order. Each helper returned a DataFrame that either contributed to the final result or was used as input for some other step further down the pipeline.

The problem was that everything was tightly coupled. Each helper depended (sometimes implicitly) on what the previous helpers were doing. Changing a single line in one of them meant double-checking a lot of other functions to make sure nothing broke. It was fragile, and refactoring was painful.

In situations like this, the Builder design pattern turned out to be a lifesaver for me. It’s very handy when the object you want to build is the result of several processing steps, not just a few fields assigned inside an __init__. If you want a proper deep dive into the pattern itself, I recommend this page: https://refactoring.guru/design-patterns/builder. I first studied it in the book Mastering Python Design Patterns by Ayeva and Kasampalis, which I’d also recommend.

In the example below, we’ll define:

  • a Transactions class – with very little responsibility: it just defines the attributes that make up our final object (raw inputs, intermediate tables, final output);
  • a TransactionsBuilderType1 class – which receives the input DataFrames, creates a Transactions instance, and exposes the methods that progressively build each attribute;
  • a Director class – which knows the builder interface, has one main method that runs the builder steps in the right order, and exposes a method/property to return the final Transactions object once everything is done.

class Transactions:
    def __init__(self, input_table_1, input_table_2):
        self.raw_input_table_1 = input_table_1
        self.raw_input_table_2 = input_table_2

        self.preprocessed_table_1 = None
        self.preprocessed_table_2 = None

        # Final ETL output table (e.g. cleaned, enriched transactions)
        self.preprocessed_transactions = None


class TransactionsBuilderType1:
    def __init__(self, input_table_1, input_table_2):
        self.transactions = Transactions(
            input_table_1=input_table_1,
            input_table_2=input_table_2,
        )
        ... # other builder-related attributes (configs, parameters, etc.)

    def preprocess_table_1(self):
        # Business logic to preprocess input_table_1
        self.transactions.preprocessed_table_1 = ...

    def preprocess_table_2(self):
        # Business logic to preprocess input_table_2
        self.transactions.preprocessed_table_2 = ...

    def compute_final_table(self):
        # Use the preprocessed tables to compute the final transactions table,
        # e.g. self.transactions.preprocessed_table_1.join(self.transactions.preprocessed_table_2)
        self.transactions.preprocessed_transactions = ...


class Director:
    def __init__(self):
        self.builder = None

    def construct_transactions(self, builder):
        self.builder = builder
        steps = [
            self.builder.preprocess_table_1,
            self.builder.preprocess_table_2,
            self.builder.compute_final_table,
        ]

        for step in steps:
            step()

    @property
    def transactions(self):
        # Return the fully built Transactions object
        return self.builder.transactions


# Client code (assumes an existing SparkSession named spark):

input_table_1 = spark.table(...)
input_table_2 = spark.table(...)
builder = TransactionsBuilderType1(input_table_1, input_table_2)
director = Director()
director.construct_transactions(builder)

preprocessed_transactions = director.transactions.preprocessed_transactions
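
To make the steps less abstract, here’s what preprocess_table_1 might look like once filled in. This is a minimal, hypothetical sketch: the column names amount and tx_date are invented for illustration and are not part of the original pipeline.

from pyspark.sql import functions as F

# Inside TransactionsBuilderType1: hypothetical body for one step.
def preprocess_table_1(self):
    df = self.transactions.raw_input_table_1
    # Drop rows without an amount (assumed column name, for illustration)
    df = df.filter(F.col("amount").isNotNull())
    # Normalize an assumed column name
    df = df.withColumnRenamed("tx_date", "transaction_date")
    self.transactions.preprocessed_table_1 = df

Note that the method writes its result onto the Transactions instance instead of returning it, precisely so that the Director only triggers the steps and never has to handle intermediate DataFrames.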
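The real payoff shows up when requirements change. Suppose a second client needs different preprocessing rules: you can add another builder with the same step methods, and the Director doesn’t change at all. The class below is a hypothetical sketch (TransactionsBuilderType2 and its rules are invented for illustration):

class TransactionsBuilderType2:
    # Same step interface as TransactionsBuilderType1, different business rules.
    def __init__(self, input_table_1, input_table_2):
        self.transactions = Transactions(
            input_table_1=input_table_1,
            input_table_2=input_table_2,
        )

    def preprocess_table_1(self):
        # e.g. stricter filtering for this variant
        self.transactions.preprocessed_table_1 = ...

    def preprocess_table_2(self):
        self.transactions.preprocessed_table_2 = ...

    def compute_final_table(self):
        self.transactions.preprocessed_transactions = ...


# Swapping builders is a one-line change in the client code:
director.construct_transactions(
    TransactionsBuilderType2(input_table_1, input_table_2)
)

Because the Director only knows the step interface, each builder variant stays self-contained, and you stop double-checking unrelated functions every time a rule changes.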

Let me know your thoughts!
Thanks.

Cristian Bergamo
