Jagadish Arunagiri

How we evolved our data engineering workflow day by day

Data engineers are responsible for maintaining, cleaning, and manipulating data from the warehouse or the OLTP database. We work closely with software engineers and the data analytics team to produce the outputs that help our clients improve their business based on data.

In this article, we explain what we learned from our data engineering orchestration, its drawbacks, and how we improved it day by day.

We are a small data engineering team, Jagadish (me) and Ravi Consul, handling our clients' analytics requirements and building scalable pipelines to produce derived tables.

Initially, our data engineering architecture used an ETL process to move data from the source to the target warehouse. ETL stands for Extract, Transform, and Load: the complete transformation happens before data is pushed to the warehouse, so no unwanted columns end up in the final tables. It works well for large amounts of data and delivers cleaned tables to the warehouse, and it was quite beneficial at the beginning of our workflow. We used Airflow as our pipeline scheduler.

[Diagram: ETL flow from source to target warehouse]
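
To make the flow concrete, here is a minimal sketch of what an ETL-style task looks like, with the transformation happening in pandas before anything reaches the warehouse. The table, column, and connection names below are illustrative, not our actual code.

# Illustrative ETL-style task: extract from the source, transform in pandas,
# then load only the cleaned frame into the warehouse.
from airflow.hooks.postgres_hook import PostgresHook

def etl_orders_task(**kwargs):
    hook = PostgresHook(postgres_conn_id="source_postgres")  # hypothetical connection id
    df = hook.get_pandas_df("SELECT * FROM orders")

    # Transform before load: keep only the columns we need and fix types
    df = df[["id", "user_id", "amount", "updated_at"]]
    df["amount"] = df["amount"].astype(float)

    # Only the cleaned frame ends up in the warehouse
    df.to_gbq("analytics.orders", if_exists="replace")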

Here are some of the advantages of using ETL:

  • Good for bulk data transfers, which makes maintenance much easier.
  • Because transformation is completed at an early stage, only cleaned data ends up in the warehouse.

About a month in, several new features were introduced in our source database that caused trouble for the data engineering pipeline and forced us to rebuild the entire pipeline flow from scratch. Because we had so many dependency tables, the ETL flow became hard to maintain, so we moved to an ELT flow.

Here are some disadvantages of using ETL:

  • Even minor changes in the source tables could take a month to implement in the pipeline.
  • It is difficult to keep up with changing requirements.
  • It is hard to track changes in dependency tables.

We have now moved our platform to ELT.

Extract, Load, and Transform (ELT) is a data integration process that transfers raw data from the source database into a data warehouse and then transforms the information for downstream use.

[Diagram: ELT flow (Extract, Load, Transform)]

The first step is to Extract the data. Extraction is the process of identifying and reading data from one or more source systems, which may be databases or various other data sources.

The second step is to Load the extracted data. Loading is the process of adding the extracted data to a target database such as a warehouse.

The third step is to Transform the data. Transformation is the process of converting the data already in the warehouse into the format required for analysis.

Because transformation does not depend on extraction, ELT is more flexible than ETL when more extracted data needs to be added later. Now, most of our derived-table tasks are done through SQL.
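
To give a feel for the T step, most of our transformations boil down to derived-table queries that run entirely inside the warehouse. A hedged example of such a query (dataset, table, and column names are made up for illustration):

# Illustrative derived-table transform that runs entirely in the warehouse.
# Dataset, table, and column names are hypothetical.
DAILY_ACTIVE_USERS_SQL = """
CREATE OR REPLACE TABLE analytics.daily_active_users AS
SELECT
    DATE(updated_at) AS activity_date,
    COUNT(DISTINCT user_id) AS active_users
FROM analytics.events
GROUP BY activity_date
"""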

We use Airflow, a tool to schedule and monitor workflows, as our ELT processor; it extracts data from SQL and NoSQL databases and loads it into the warehouse. Our Airflow deployment is done through Docker (for more details, check out puckel/airflow). Currently, we are migrating our image to the official Docker images.

Deployment-wise, everything went well, but to support different environments such as production, staging, and development, we modified both our docker-compose and Airflow configuration methods.

We moved our deployment environment variables to a .env file and loaded them through env_file, which keeps the environment configuration separate from docker-compose:

LOAD_EX=None
FERNET_KEY=None
EXECUTOR=None
AIRFLOW__WEBSERVER__AUTHENTICATE=None
AIRFLOW__WEBSERVER__AUTH_BACKEND=None
AIRFLOW__CORE__SQL_ALCHEMY_POOL_ENABLED=None
AIRFLOW__CELERY__FLOWER_BASIC_AUTH=None

In Airflow, we used Variables to handle different environments by passing them to tasks through kwargs:

    kwargs = {
        "project_id": "{{ var.json.config.project_id }}",
        "table_dataset": "{{ var.json.config.table_dataset }}",
        "table_test_dataset": "{{ var.json.config.table_test_dataset }}",
    }
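
For completeness, here is a minimal sketch of reading the same config Variable directly in Python; the Variable is assumed to be a JSON blob whose values differ per environment:

# Minimal sketch: reading the per-environment "config" Variable directly.
# The Variable is assumed to hold JSON like
# {"project_id": "...", "table_dataset": "...", "table_test_dataset": "..."}.
from airflow.models import Variable

config = Variable.get("config", deserialize_json=True)
project_id = config["project_id"]
table_dataset = config["table_dataset"]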

Let's look into the Airflow folder structure and code improvements.

[Image: Airflow project folder structure]

Project-wise DAGs are located inside the dags folder; the rest of the structure is self-explanatory.

We built pipeline DAGs and tasks for SQL databases. This approach had some disadvantages: code was duplicated across source tables with similar structures, and there was no column validation or handling of deleted columns.

from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator


def table_task(**kwargs):
    # Time window for the incremental load, passed in through the task context
    start_date = kwargs["start_date"]
    end_date = kwargs["end_date"]

    # Connect to the source database and pull the changed rows
    postgres_conn_obj = PostgresHook(postgres_conn_id=connection_id)
    sql = "SELECT * FROM table WHERE updatedAt > '{}' AND updatedAt < '{}'"
    df = postgres_conn_obj.get_pandas_df(sql.format(start_date, end_date))

    # Load the extracted dataframe into the warehouse (BigQuery)
    df.to_gbq("dataset.table_name", credentials=cred, if_exists="append")

# dags
    table_task = PythonOperator(
        task_id="table_task",
        python_callable=table_task,
        provide_context=True,
        op_kwargs=kwargs,
    )

To overcome the duplication, handle deletions on the table, and standardize how we manage database connections and schemas, we implemented a class-based approach that solves these issues and is reusable for new tables with less effort.

class Table(PythonTaskBase):
    """
    Task for a table which is present in the source.
    """

    def __init__(
        self, postgres_connection_id, bigquery_connection_id, table_name
    ):
        super().__init__(
            postgres_connection_id=postgres_connection_id,
            bigquery_connection_id=bigquery_connection_id,
            table_name=table_name,
        )
        self.datetime_column = ["created_at", "updated_at"]
        self.table_columns = [
            "id",
            "user_id",
            "updated_at",
        ]
        self.schemas = source_schema

    def query(self):
        """SQL query for the table."""
        sql = """
        SELECT *
        FROM table
        WHERE updated_at >= '{0}'::TIMESTAMP
          AND '{1}'::TIMESTAMP > updated_at
        """
        return sql

    def execute(self, **kwargs):
        """
        The function which is callable from DAGs.

        :param kwargs: Passing all task context
        :type kwargs: kwargs
        :return: None
        :rtype: None
        """
        self.kwargs = kwargs
        self.source_to_warehouse()
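
As a usage sketch, a class like this can be wired into a DAG in roughly the following way, assuming PythonTaskBase implements source_to_warehouse(); the connection IDs and table name here are placeholders, and dag and kwargs come from the DAG file.

# Hypothetical wiring of the Table class into a DAG; connection IDs and the
# table name are placeholders.
from airflow.operators.python_operator import PythonOperator

users_table = Table(
    postgres_connection_id="source_postgres",
    bigquery_connection_id="warehouse_bigquery",
    table_name="users",
)

users_table_task = PythonOperator(
    task_id="users_table_to_warehouse",
    python_callable=users_table.execute,
    provide_context=True,
    op_kwargs=kwargs,
    dag=dag,
)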

With this approach, we made about 70% of our code cleaner and more robust. So far, we have covered the EL part of ELT. Let's move on to Transformation and how we implemented that flow.

All of our transformation tasks run directly as SQL queries that create and manipulate data. Big thanks to SpecialOperator, which does all the magic of creating derived tables for us.
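
We will not go into SpecialOperator's internals here, but as a rough stand-in sketch, a derived table like the daily_active_users query sketched earlier could be built in stock Airflow with the contrib BigQuery operator, pointing the SQL at a templated destination table. The operator choice and the dataset and table names below are assumptions, not our exact setup.

# Rough stand-in sketch using Airflow's contrib BigQuery operator; dataset and
# table names are hypothetical and templated from the "config" Variable.
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

build_daily_active_users = BigQueryOperator(
    task_id="build_daily_active_users",
    sql="""
        SELECT
            DATE(updated_at) AS activity_date,
            COUNT(DISTINCT user_id) AS active_users
        FROM `{{ var.json.config.table_dataset }}.events`
        GROUP BY activity_date
    """,
    destination_dataset_table="{{ var.json.config.table_dataset }}.daily_active_users",
    write_disposition="WRITE_TRUNCATE",
    use_legacy_sql=False,
    dag=dag,
)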

Once the derived tables land, we use popular visualization tools to explore the data and share reports according to requirements.
