DEV Community

Fortune Adekogbe


Pipelines in ML: A guide to developing good workflows

When I got started with machine learning, I did not consider pipelines very important. They seemed like one of those things some people used but that weren't really essential. I was building reasonably accurate models in my notebooks and it felt alright.

Eventually, I began to focus more on the later part of a model's life (deployment and use in production), and the importance of a structured workflow became glaring.

In this article, I will walk us through a machine learning problem whose preprocessing is handled in a Pipeline. You may be familiar with doing this using Kubeflow and Docker on your cloud platform of choice, but this article will make use of the good ol' scikit-learn (sklearn).


  • We will focus only on implementing the various preprocessing and feature extraction steps as transformers. Little to no attention will be given to detailed data exploration, visualization and model accuracy. Check out this link to see a version of the notebook that focuses on those.
  • If any of the packages used are not installed in your environment, install them with: pip install [package_name].

The data used in this article comes from a Data Science Nigeria 2019 Challenge, and the aim is to build a predictive model that determines, from a building's characteristics, whether it will have an insurance claim during a certain period. The target variable, Claim, is:

  • 1 if the building has at least one claim over the insured period.
  • 0 if the building doesn't have a claim over the insured period.

Making Relevant Imports

import pandas as pd
import numpy as np

train_df = pd.read_csv('train_data.csv')

Claim = train_df.pop('Claim')
train_df.pop('Customer Id');

First, we import pandas and numpy to aid our data wrangling, then read in the training data with pandas' read_csv function. Next, we use the DataFrame pop method to remove the Customer Id column, since the IDs are unique and carry no useful information. The target column is also popped and stored in a variable named Claim for later use.

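from sklearn.model_selection import train_test_split

# stratify=Claim keeps the 0/1 proportions of the target in both splits
X_train, X_valid, y_train, y_valid = train_test_split(
    train_df, Claim, test_size=0.25,
    random_state=42,  # assumed seed: the value used originally is not shown
    stratify=Claim)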

The training data is split into training and validation sets using sklearn's train_test_split function. A test size of 25% is set alongside a random_state for reproducibility. The target is passed to the stratify argument so that the proportion of 0s and 1s in the original data is preserved in both splits.
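To see what stratify does, here is a minimal sketch on made-up data (a toy array with 20% positives, not the insurance data): with stratify=y, both splits keep the same share of the positive class.

```python
import numpy as np
from sklearn.model_selection import train_test_split

# Toy data: 100 rows, 20 of them in the positive class
X = np.arange(100).reshape(-1, 1)
y = np.array([1] * 20 + [0] * 80)

X_tr, X_va, y_tr, y_va = train_test_split(
    X, y, test_size=0.25, random_state=0, stratify=y
)
# Both splits keep the 20% share of positives
print(y_tr.mean(), y_va.mean())
```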


scikit-learn provides a library of transformers, which may clean, reduce, expand or generate feature representations. In this tutorial, however, we will build our own custom transformers.

Like other estimators, transformers are classes with a fit method, which learns parameters (e.g. the mean and standard deviation for normalization) from a training set, and a transform method, which applies what was learned to new data. The fit_transform method fits and transforms the training data in a single call, which can be more convenient and efficient, so we will use it.
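The fit/transform split is easiest to see with a built-in transformer. This sketch uses sklearn's StandardScaler on toy arrays (illustrative values only): fit learns the mean and scale from the training data, transform reuses them on unseen data, and fit_transform does both steps on the training data at once.

```python
import numpy as np
from sklearn.preprocessing import StandardScaler

X_train_toy = np.array([[1.0], [2.0], [3.0]])
X_new_toy = np.array([[4.0]])

scaler = StandardScaler()
scaler.fit(X_train_toy)                  # learns mean_ and scale_ from the training set only
scaled_new = scaler.transform(X_new_toy)  # applies the training statistics to unseen data

# fit_transform fits and transforms the training data in a single call
scaled_train = StandardScaler().fit_transform(X_train_toy)
```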

The Empty Transformer

I find it useful to have a base class to build on whenever I need a new transformer.

from sklearn.base import BaseEstimator, TransformerMixin

class Transformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass  # no parameters to store

    def fit(self, X, y=None):
        return self  # nothing to learn

    def transform(self, X):
        return X  # pass the input through unchanged

The class above inherits from two classes, namely BaseEstimator and TransformerMixin. BaseEstimator is the base class for all estimators in scikit-learn and provides the get_params and set_params methods. TransformerMixin defines and implements the fit_transform method, which fits to the data and then transforms it.

Its __init__ method does nothing, its fit method returns the instance itself, and its transform method returns its input unchanged.
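Even this empty class already behaves like a scikit-learn transformer. A minimal check, repeating the base class so the snippet runs on its own (the toy DataFrame is illustrative): fit_transform comes from TransformerMixin and get_params from BaseEstimator.

```python
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin

class Transformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X

df = pd.DataFrame({"a": [1, 2, 3]})
t = Transformer()
out = t.fit_transform(df)  # inherited from TransformerMixin: fit(X) then transform(X)
params = t.get_params()    # inherited from BaseEstimator; empty because __init__ takes no arguments
```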

To create a custom transformer, at least one of these methods must be modified. We will explore several transformers for the stated problem.

from random import choices

class NanFillerTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, column, normalize_ = False):
        self.column = column          # label of the column to fill
        self.normalize_ = normalize_  # divide by the column maximum after filling

    def fit(self, X, y=None):
        return self

    def replace_nan(self, X, feature_name, new_values, weights):
        assert len(new_values) == len(weights), 'New values do not correspond with weights'
        mask = X[feature_name].isna()
        length = int(mask.sum())
        filled = X[feature_name].copy()
        if length:
            # Draw one weighted replacement per missing entry and write it back
            filled[mask] = choices(list(new_values), weights=list(weights), k=length)
        return filled

    def transform(self, X):
        X = X.copy()  # leave the caller's DataFrame untouched
        # Unique non-missing values and their normalized value counts
        x = X[self.column].value_counts(normalize=True)
        X[self.column] = self.replace_nan(X, self.column, x.index, x.values)
        if self.normalize_:
            X[self.column] = X[self.column] / X[self.column].max()
        return X

This transformer replaces the missing values in a column with values drawn at random from a predefined set, where each value has an assigned weight. The "brain" of the transformer is the replace_nan method: transform calls it with the column's unique values as the predefined values and their normalized value counts as the weights, so each missing entry is filled in proportion to how often each value already occurs.
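The same frequency-weighted imputation can be sketched outside a transformer. This version swaps random.choices for NumPy's Generator.choice, which takes a seed so results are reproducible; the helper name fill_by_frequency and the toy Series are hypothetical.

```python
import numpy as np
import pandas as pd

def fill_by_frequency(s: pd.Series, seed: int = 0) -> pd.Series:
    """Hypothetical helper: replace NaNs with values drawn in proportion to their frequency."""
    freqs = s.value_counts(normalize=True)  # unique values -> relative frequency (NaN excluded)
    mask = s.isna()
    filled = s.copy()
    if mask.any():
        rng = np.random.default_rng(seed)   # seeded for reproducibility
        filled[mask] = rng.choice(freqs.index.to_numpy(), size=int(mask.sum()), p=freqs.to_numpy())
    return filled

s = pd.Series(["A", "A", "A", "B", None, None])
filled = fill_by_frequency(s)  # each NaN becomes "A" with p=0.75 or "B" with p=0.25
```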

The transformer takes two arguments:

  • column: the label of the column to be filled, as a string
  • normalize_: a boolean that determines whether the values in the column are normalized (divided by the column maximum) after the missing values are replaced.

Since this transformer does not learn anything from the training data, its fit method stays unchanged.

class EncoderTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, show_map = False):
        self.show_map = show_map  # not used by this transformer

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = X.copy()
        windows = X['NumberOfWindows']
        # '   .' is treated as "no windows"
        X['NoWindows'] = (windows == '   .').astype(int)
        # isin does exact matching on the string codes
        X['3-7Windows'] = windows.isin(['3', '4', '5', '6', '7']).astype(int)
        X['Other_Windows'] = windows.isin(['1', '2', '8', '9', '>=10']).astype(int)
        X['Sectioned_Insured_Period'] = (X['Insured_Period'] == 1).astype(int)
        X['Grouped_Date_of_Occupancy'] = (X['Date_of_Occupancy'] > 1900).astype(int)
        return X

class YearEncoderTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, show_map = False):
        self.show_map = show_map  # if True, also frequency-encode a column with map_counts

    def fit(self, X, y=None):
        return self

    def map_counts(self, X, feat):
        # Replace each value in column `feat` with its normalized value count
        mapp = X[feat].value_counts(normalize=True)
        X[feat] = X[feat].map(mapp)
        return X

    def transform(self, X):
        X = X.copy()
        year = X['YearOfObservation']
        X['2012-13YearOfObs'] = year.isin([2012, 2013]).astype(int)
        X['2014YearOfObs'] = (year == 2014).astype(int)
        X['2015-16YearOfObs'] = year.isin([2015, 2016]).astype(int)
        # Convert the observation year into years elapsed up to 2021
        X['YearOfObservation'] = 2021 - year
        if self.show_map:
            # Assumption: the column passed here was lost from the original code
            X = self.map_counts(X, 'YearOfObservation')
        return X

The EncoderTransformer and YearEncoderTransformer carry out operations that are specific to this problem. The operations in their transform methods were chosen by observing the distributions of the modified columns.
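The window-count grouping can be tried on its own. This sketch assumes, as the EncoderTransformer code implies, that NumberOfWindows holds strings with '   .' marking a missing value; the toy Series is made up. isin matches whole values, so multi-character codes such as '>=10' are handled safely.

```python
import pandas as pd

windows = pd.Series(["   .", "3", "5", ">=10", "1", "7"])

no_windows = (windows == "   .").astype(int)                          # 1 only for the missing marker
three_to_seven = windows.isin(["3", "4", "5", "6", "7"]).astype(int)  # 1 for 3 to 7 windows
other = windows.isin(["1", "2", "8", "9", ">=10"]).astype(int)       # 1 for every other count
```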

The show_map attribute of the YearEncoderTransformer determines whether the map_counts method is run. That method modifies the column labelled feat by replacing each value with its normalized value count.

class FeatureCombiningTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, columns, drop_any=()):
        self.columns = columns    # columns whose row values are combined
        self.drop_any = drop_any  # indexes (into columns) of the originals to drop

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = X.copy()
        # e.g. ['Garden', 'Building_Fenced', 'Settlement'] -> 'GBS'
        suffix = ''.join([i[0] for i in self.columns])
        # axis=1 combines the values across each row
        X[f'Combined_{suffix}'] = X[self.columns].sum(axis=1)
        to_drop = [self.columns[j] for j in self.drop_any]
        for name in to_drop:
            print(f">>> Removed {name} from dataframe")
        return X.drop(columns=to_drop)

The FeatureCombiningTransformer sums the values in each row across a subset of the DataFrame's columns. The class is instantiated with the columns whose values are to be combined. The drop_any attribute holds a list of indexes into columns, marking which of the original columns are dropped afterwards. The suffix variable, built from the first letter of each combined column, keeps the new column's name unique.
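Here is the combine-and-drop logic on a toy DataFrame, assuming 0/1 numeric values for illustration. Note the axis: sum(axis=1) gives one total per row, whereas sum(axis=0) would give one total per column, which does not line up with the DataFrame's rows.

```python
import pandas as pd

df = pd.DataFrame({
    "Garden": [1, 0, 1],
    "Building_Fenced": [1, 1, 0],
    "Settlement": [1, 0, 0],
})
columns = ["Garden", "Building_Fenced", "Settlement"]
drop_any = [0]  # drop columns[0], i.e. "Garden"

suffix = "".join(c[0] for c in columns)               # "GBS"
df[f"Combined_{suffix}"] = df[columns].sum(axis=1)    # row-wise totals
df = df.drop(columns=[columns[j] for j in drop_any])  # drop the originals listed by index
```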

class DummyFeaturesTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, column = None):
        self.column = column  # column to one-hot encode; falsy means all categorical columns

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        if self.column:
            X = pd.get_dummies(X, columns=[self.column])
        else:
            X = pd.get_dummies(X)
        return X

class NormalizedFrequencyTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, column, replace_original= True):
        self.column = column
        self.replace_original = replace_original  # False keeps the original column

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = X.copy()
        # Relative frequency of each value in the column
        mapper = X[self.column].value_counts(normalize=True)
        if self.replace_original:
            X[self.column] = X[self.column].map(mapper)
        else:
            X[f"Coded_{self.column}"] = X[self.column].map(mapper)
        return X

The DummyFeaturesTransformer one-hot encodes categorical data with pandas' get_dummies: only the specified column if one is given, otherwise every categorical column in the DataFrame.
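The two branches of DummyFeaturesTransformer correspond to two ways of calling pd.get_dummies. A small sketch on a made-up DataFrame: passing columns expands only that column, while omitting it expands every categorical (string) column and leaves numeric columns as they are.

```python
import pandas as pd

df = pd.DataFrame({
    "Settlement": ["U", "R", "U"],
    "Garden": ["V", "O", "V"],
    "Area": [10, 20, 30],
})

one_col = pd.get_dummies(df, columns=["Settlement"])  # only Settlement is expanded
all_cat = pd.get_dummies(df)                          # Settlement and Garden are expanded
```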

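The NormalizedFrequencyTransformer replaces each value in a feature with its normalized value count. With replace_original=False, the original column is kept and the encoded values go into a new Coded_<column> column.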
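Frequency encoding boils down to one value_counts call and one map. A minimal sketch on a toy Series: Series.map accepts another Series as a lookup table, and any value missing from it becomes NaN instead of raising a KeyError as a mapper[x] lookup would.

```python
import pandas as pd

geo = pd.Series(["a", "b", "a", "a", "c"])
mapper = geo.value_counts(normalize=True)  # a -> 0.6, b -> 0.2, c -> 0.2
encoded = geo.map(mapper)                  # each value replaced by its relative frequency
```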

The Pipeline

Things get a little more interesting here. We have created a number of transformers, but how do we use them together? How is the pipeline formed? Well, it starts with an import statement.

from sklearn.pipeline import Pipeline

# Steps run in order; each one receives the previous step's output
data_pipeline = Pipeline([
        ('yet', YearEncoderTransformer()),
        ('fct',FeatureCombiningTransformer(['Garden', 'Building_Fenced', 'Settlement'], [0])),
        ('nanft1', NanFillerTransformer('Building Dimension', normalize_ = True)),
        ('nanft2', NanFillerTransformer('Date_of_Occupancy')),
        ('normft1', NormalizedFrequencyTransformer("Date_of_Occupancy")),
        ('nanft3', NanFillerTransformer('Geo_Code')),
        ('normft2', NormalizedFrequencyTransformer("Geo_Code")),
        ('normft3', NormalizedFrequencyTransformer("Insured_Period", replace_original = False)),
        ('normft4', NormalizedFrequencyTransformer("YearOfObservation", replace_original = False)),
        ('dft', DummyFeaturesTransformer('')),  # '' -> one-hot encode all categorical columns
])

Here we create a Pipeline object with sklearn's Pipeline class. The same pipeline is used for the training and validation data. The order of the steps was decided beforehand, but if you follow each transformer and its parameters, the reasoning behind it should become clear.
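To make the chaining concrete, here is a minimal sketch with two built-in transformers (SimpleImputer and StandardScaler stand in for the custom ones; the arrays are made up). fit_transform fits each step on the previous step's output, and transform on new data reuses everything learned from the training data, exactly as chaining the steps by hand would.

```python
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X_tr = np.array([[1.0], [np.nan], [3.0]])
X_va = np.array([[np.nan], [5.0]])

pipe = Pipeline([
    ("impute", SimpleImputer(strategy="mean")),
    ("scale", StandardScaler()),
])
train_out = pipe.fit_transform(X_tr)  # each step is fitted on the output of the previous one
valid_out = pipe.transform(X_va)      # reuses the mean and scale learned from X_tr

# The same result, chaining the steps by hand
imp = SimpleImputer(strategy="mean").fit(X_tr)
sc = StandardScaler().fit(imp.transform(X_tr))
manual_valid = sc.transform(imp.transform(X_va))
```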

Note that each transformer is passed as a tuple whose first element is a string naming the step. The sklearn.pipeline.make_pipeline function generates these names automatically, but we will not use it here.
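For comparison, a short sketch of make_pipeline with two built-in transformers: the step names are derived from the lowercased class names.

```python
from sklearn.impute import SimpleImputer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

pipe = make_pipeline(SimpleImputer(), StandardScaler())
step_names = [name for name, _ in pipe.steps]  # names generated from the class names
```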

from sklearn.preprocessing import LabelEncoder

class ColumnSelectTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, column):
        self.column = column  # column label, or list of labels

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)
        return X[self.column]

class Encoder(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Note: the LabelEncoder is refitted on every call to transform
        le = LabelEncoder()
        # LabelEncoder expects 1-D input; reshape the codes back into one column
        return le.fit_transform(X.values.ravel()).reshape(-1,1)

Before we wrap things up, we create two final transformers. The first extracts one or more columns from the DataFrame, while the second encodes their contents with sklearn's LabelEncoder class.
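scikit-learn's documentation intends LabelEncoder for target labels; OrdinalEncoder is the equivalent meant for feature columns, and it learns its categories in fit so transform reuses the same codes. As a swapped-in alternative (not the article's method), here is a sketch of the select-then-encode pattern using FunctionTransformer with a hypothetical select_settlement function in place of ColumnSelectTransformer, on made-up data.

```python
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer, OrdinalEncoder

def select_settlement(df):
    # Hypothetical selector: returns a one-column DataFrame, like ColumnSelectTransformer(['Settlement'])
    return df[["Settlement"]]

encode_settlement = Pipeline([
    ("select", FunctionTransformer(select_settlement)),
    ("encode", OrdinalEncoder()),  # learns the sorted categories in fit, reuses them in transform
])

train_toy = pd.DataFrame({"Settlement": ["U", "R", "U"], "Garden": ["V", "O", "V"]})
valid_toy = pd.DataFrame({"Settlement": ["R", "U"], "Garden": ["O", "O"]})
train_codes = encode_settlement.fit_transform(train_toy)  # R -> 0, U -> 1
valid_codes = encode_settlement.transform(valid_toy)      # same mapping as the training data
```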

Feature Unions

The transformations above could have been added to the main pipeline like the others, but they give us a chance to introduce a new concept: transformers can be combined in series and in parallel. Pipeline implements the series combination, passing each step's output to the next, while FeatureUnion implements the parallel combination, applying each transformer to the same input and concatenating their outputs column-wise.
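A small sketch of the parallel combination, using two built-in scalers on a toy array: both branches receive the same input, and FeatureUnion places their outputs side by side.

```python
import numpy as np
from sklearn.pipeline import FeatureUnion
from sklearn.preprocessing import MinMaxScaler, StandardScaler

X = np.array([[1.0, 10.0], [2.0, 20.0], [3.0, 30.0], [4.0, 40.0]])

union = FeatureUnion([
    ("standard", StandardScaler()),  # both branches receive the same input X
    ("minmax", MinMaxScaler()),
])
combined = union.fit_transform(X)    # 2 columns from each branch, stacked side by side
```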

from sklearn.pipeline import FeatureUnion

# Each small pipeline selects one column and label-encodes it (steps in series);
# the step name 'enc' is illustrative
Settlement_onehot = Pipeline([
    ('cst', ColumnSelectTransformer(['Settlement'])),
    ('enc', Encoder()),
])

Building_Fenced_onehot = Pipeline([
    ('cst', ColumnSelectTransformer(['Building_Fenced'])),
    ('enc', Encoder()),
])

Building_Painted_onehot = Pipeline([
    ('cst', ColumnSelectTransformer(['Building_Painted'])),
    ('enc', Encoder()),
])

# The three pipelines run in parallel and their outputs are stacked column-wise
categorical_features = FeatureUnion([
    ("Settlement_onehot", Settlement_onehot),
    ("Building_Fenced_onehot", Building_Fenced_onehot),
    ("Building_Painted_onehot", Building_Painted_onehot),
])

The snippet above uses both combinations: each small Pipeline chains a column selector and an encoder in series, and the FeatureUnion named categorical_features runs the three pipelines in parallel.

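data_transformer = FeatureUnion([
    ('features', data_pipeline),
    ('categorical', categorical_features),
])

# Fit on the training data only, then apply the fitted transformer to the validation data
train = data_transformer.fit_transform(X_train)
validate = data_transformer.transform(X_valid)

Here we use FeatureUnion again, this time to combine the categorical_features union with the data_pipeline we created earlier.

We then fit data_transformer and transform the training data in one step with fit_transform, and transform the validation data with transform, so that nothing is fitted on it. Note that the custom transformers above learn nothing in fit and compute their statistics (value counts, dummy columns) from whatever data they receive, so the validation set is still encoded from its own distribution; moving that work into fit would close this gap.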


In this tutorial, the CatBoost (categorical boosting) classifier is used with a learning rate of 0.01 and 400 estimators. Log loss (sklearn's log_loss) is used as the metric for comparing models; lower values are better.
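For a binary target, log loss is the mean of -[y·log(p) + (1 - y)·log(1 - p)], where p is the predicted probability of class 1. A quick sketch on made-up predictions, checking the hand computation against sklearn's log_loss (which also accepts a 1-D array of positive-class probabilities).

```python
import math
from sklearn.metrics import log_loss

y_true = [0, 1, 1]
p_pos = [0.2, 0.9, 0.6]  # predicted probability of class 1

# Binary log loss computed by hand
manual = -sum(
    y * math.log(p) + (1 - y) * math.log(1 - p) for y, p in zip(y_true, p_pos)
) / len(y_true)
library = log_loss(y_true, p_pos)
```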

from catboost import CatBoostClassifier
from sklearn.metrics import log_loss

cbc1 = CatBoostClassifier(verbose=0, learning_rate=0.01, n_estimators=400)
cbc1.fit(train, y_train)
# predict_proba returns one probability column per class, which log_loss accepts
print(f'Train score: {log_loss(y_train, cbc1.predict_proba(train))}')
print(f'Validation score: {log_loss(y_valid, cbc1.predict_proba(validate))}')

Saving our Model

The model is then saved with the joblib library, as shown below.

import joblib

filename = 'Insurance_model.sav'
joblib.dump(cbc1, filename)  # reload later with joblib.load(filename)
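Since preprocessing is needed again at prediction time, one option is to save the fitted transformer alongside the model, or wrap both in a single Pipeline so one file holds everything. A minimal sketch of that round trip, with LogisticRegression standing in for CatBoost to keep it light and a hypothetical file name. When custom transformer classes are inside the saved object, they must be importable wherever the file is loaded, because pickling stores a reference to each class rather than its code.

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

X = np.array([[0.0], [1.0], [2.0], [3.0]])
y = np.array([0, 0, 1, 1])

# Preprocessing and model in one object
model_pipeline = make_pipeline(StandardScaler(), LogisticRegression()).fit(X, y)

path = os.path.join(tempfile.mkdtemp(), "pipeline.joblib")  # hypothetical file name
joblib.dump(model_pipeline, path)
restored = joblib.load(path)  # ready to predict on raw, unprocessed rows
```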

In practice, I would advise finalizing the specific preprocessing steps before creating custom transformers and a pipeline.

Thanks for reading! I hope you enjoyed this and learnt something from it. Check out this repository for the data and script(s) related to this article.


SKlearn Data Transforms
