In my job, I often end up writing long ETL pipelines in Python and PySpark. The usual story: you read a bunch of tables, join them together, and run several preprocessing steps where the output of one step becomes the input of another. Meanwhile, requirements keep changing: new rules, new table names, renamed columns, and so on.
Recently, I had to refactor a client function that was doing exactly this. It received several Spark DataFrames as input and was supposed to return one “clean” transactions DataFrame, ready to be written to the database. Inside, it was basically just calling a chain of helper functions in a fixed order. Each helper returned a DataFrame that either contributed to the final result or was used as input for some other step further down the pipeline.
The problem was that everything was tightly coupled. Each helper depended (sometimes implicitly) on what the previous helpers were doing. Changing a single line in one of them meant double-checking a lot of other functions to make sure nothing broke. It was fragile, and refactoring was painful.
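To make the starting point concrete, the original client function looked roughly like this (a simplified sketch; the helper names are invented for illustration, not the real ones):

```python
# Rough sketch of the original, tightly coupled function.
# Helper names are hypothetical, invented purely for illustration.
def build_transactions(input_table_1, input_table_2):
    table_1 = _clean_table_1(input_table_1)            # step 1
    table_2 = _clean_table_2(input_table_2, table_1)   # step 2 quietly depends on step 1's output
    joined = _join_tables(table_1, table_2)            # step 3
    return _apply_business_rules(joined)               # step 4: the final transactions DataFrame
```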
In situations like this, the Builder design pattern turned out to be a lifesaver for me. It’s very handy when the object you want to build is the result of several processing steps, not just a few fields assigned inside an `__init__`. If you want a proper deep dive into the pattern itself, I recommend this page: https://refactoring.guru/design-patterns/builder. I first studied it in the book Python Design Patterns by Ayeva and Kasampalis, which I’d also recommend.
In the example below, we’ll define:
- a `Transactions` class – with very little responsibility: it just defines the attributes that make up our final object (raw inputs, intermediate tables, final output);
- a `TransactionsBuilderType1` class – which receives the input DataFrames, creates a `Transactions` instance, and exposes the methods that progressively build each attribute;
- a `Director` class – which knows the builder interface, has one main method that runs the builder steps in the right order, and exposes a property that returns the final `Transactions` object once everything is done.
```python
class Transactions:
    def __init__(self, input_table_1, input_table_2):
        # Raw input DataFrames
        self.raw_input_table_1 = input_table_1
        self.raw_input_table_2 = input_table_2
        # Intermediate tables, filled in by the builder
        self.preprocessed_table_1 = None
        self.preprocessed_table_2 = None
        # Final ETL output table (e.g. cleaned, enriched transactions)
        self.preprocessed_transactions = None


class TransactionsBuilderType1:
    def __init__(self, input_table_1, input_table_2):
        self.transactions = Transactions(
            input_table_1=input_table_1,
            input_table_2=input_table_2,
        )
        ...  # other builder-related attributes (configs, parameters, etc.)

    def preprocess_table_1(self):
        # Business logic to preprocess input_table_1
        self.transactions.preprocessed_table_1 = ...

    def preprocess_table_2(self):
        # Business logic to preprocess input_table_2
        self.transactions.preprocessed_table_2 = ...

    def compute_final_table(self):
        # Use the preprocessed tables to compute the final transactions table
        # (e.g. self.transactions.preprocessed_table_1.join(self.transactions.preprocessed_table_2), etc.)
        self.transactions.preprocessed_transactions = ...


class Director:
    def __init__(self):
        self.builder = None

    def construct_transactions(self, builder):
        # Run the builder steps in the right order
        self.builder = builder
        steps = [
            self.builder.preprocess_table_1,
            self.builder.preprocess_table_2,
            self.builder.compute_final_table,
        ]
        for step in steps:
            step()

    @property
    def transactions(self):
        # Return the fully built Transactions object
        return self.builder.transactions


# Client code:
input_table_1 = spark.table(...)
input_table_2 = spark.table(...)

builder = TransactionsBuilderType1(input_table_1, input_table_2)
director = Director()
director.construct_transactions(builder)

preprocessed_transactions = director.transactions.preprocessed_transactions
```
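The nice part is that the `Director` only knows the builder's interface, so the construction order lives in one place. As a sketch of where this goes (a hypothetical `TransactionsBuilderType2`, not part of the code above), a second builder exposing the same step names could implement different business rules, and neither the `Director` nor the client code would need to change:

```python
# Hypothetical second builder: same step names, different business logic.
class TransactionsBuilderType2:
    def __init__(self, input_table_1, input_table_2):
        self.transactions = Transactions(input_table_1, input_table_2)

    def preprocess_table_1(self):
        # e.g. different filtering / renaming rules than Type1
        self.transactions.preprocessed_table_1 = ...

    def preprocess_table_2(self):
        self.transactions.preprocessed_table_2 = ...

    def compute_final_table(self):
        self.transactions.preprocessed_transactions = ...


# The Director drives it exactly the same way:
director.construct_transactions(
    TransactionsBuilderType2(input_table_1, input_table_2)
)
preprocessed_transactions = director.transactions.preprocessed_transactions
```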
Let me know your thoughts!
Thanks.
Cristian Bergamo