DEV Community

Fred Munjogu


End-to-End Data Workflow: Kestra, Redshift, and dbt Integration

Imagine that at the end of every month, you are required to download data from a particular source, load it into storage, and transfer it to a data warehouse. After the first time, you will quickly realize how repetitive these tasks can be. Since you will be moving similar data each month, it would be much easier if there were a way to automate this. Lucky for us, there are several tools at our disposal. In this article, we will focus on the following tools:

  • Kestra
  • dbt
  • S3 Bucket
  • Amazon Redshift

We will be using NYC taxi trip data in this project (yellow and green taxis).

If you want to follow along, you can create an AWS account; first-time users get $200 in credit, which is more than enough for this project. Redshift also offers $300 in credit to first-time users, which comes in handy because queries can get expensive if you are not careful or are working with large amounts of data.

Kestra

Kestra is an orchestration tool that we will use to automate most of the repetitive tasks. It is easy to use because the flows we write for it are defined in YAML files.

We will have two flows. The first ingests data: it downloads the data we will use, uploads it to our S3 bucket, and then creates and loads the tables in our data warehouse (Redshift). The second runs our dbt transformations on that data.

Data Build Tool (dbt)

Once our data is in the data warehouse, we will use dbt to clean, aggregate, and create analytics-ready tables. These transformations are defined as dbt models.

AWS Resources

As mentioned earlier, we will be using some AWS resources, and we will set them up with Terraform.

Terraform allows us to write a configuration that then creates the resources with three simple commands (terraform init, terraform plan, and terraform apply). This article will not focus too much on this, but I will mention the steps needed to get our resources up and running.

Configuring Kestra

This is our first step, since Kestra holds the flows needed to run our pipeline. We will first start a Kestra instance with Docker. Once it is up, we will navigate to the Kestra UI and add our configuration values to the KV store, which is found in the namespace tab.

Inside the KV store, we will add the following keys:

  • S3_BUCKET_NAME: the name of our S3 bucket
  • AWS_REGION: our AWS region
  • JDBC_URL: the JDBC connection string for our Redshift database
  • DB_USER: the database username
  • DB_PASS: the database password
  • KESTRA_ROLE: the IAM role that allows Kestra to copy data from S3 to Redshift
  • REDSHIFT_USER: our Redshift user configured in Terraform
  • REDSHIFT_PASSWORD: our Redshift password
  • REDSHIFT_HOST: our workgroup host

With this, we are all set to run our first flow. Any value stored in the KV store can be pulled into a flow by referencing it with the "{{ kv('var_name') }}" expression.
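To make the substitution concrete, here is a toy Python sketch of what such a lookup amounts to. It is not Kestra's implementation (Kestra renders these expressions with its own templating engine); the `render` helper, the regular expression, and the placeholder values are all assumptions made for illustration.

```python
import re

# Placeholder values standing in for entries in the KV store (hypothetical).
KV_STORE = {
    "S3_BUCKET_NAME": "my-taxi-bucket",
    "DB_USER": "admin",
}

# Matches {{ kv('NAME') }} or {{kv("NAME")}}, with optional whitespace.
KV_CALL = re.compile(r"""\{\{\s*kv\(\s*['"]([^'"]+)['"]\s*\)\s*\}\}""")


def render(template, store):
    """Replace every kv('NAME') expression with the value stored under NAME."""

    def lookup(match):
        key = match.group(1)
        if key not in store:
            # Fail loudly rather than leaving an unresolved expression behind.
            raise KeyError(f"'{key}' is missing from the KV store")
        return store[key]

    return KV_CALL.sub(lookup, template)


print(render("s3://{{kv('S3_BUCKET_NAME')}}/data.csv", KV_STORE))
print(render('{{ kv("DB_USER") }}', KV_STORE))
```

The practical point is that credentials and resource names live in one place, and the flow files only carry the key names.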

Here is a snippet of our KV store:

Kestra's KV store

Running our first flow

The entire project can be found here. We will run the flow named redshift_taxi_scheduled.yaml, located in the kestra/flows directory. This flow:

  • Downloads Yellow/Green Taxi Data (depending on the taxi you choose)
  • Uploads to S3
  • Loads to Redshift

To run this flow, we will navigate to the triggers tab, where we will execute a backfill. The triggers tab looks like this:

Triggers tab

We will click on the backfill execution for the green schedule, which will download taxi data for the green cabs. We will be asked to choose the date range to backfill; for this article, we will use January 1, 2021, to June 30, 2021.

Backfill Execution

We will also add an execution label (backfill: true) so that we know this is a backfill when it runs. After doing this, we can execute our flow, and it will begin downloading and uploading the green taxi data.

Kestra has a rather neat feature: during an execution, you can view a Gantt chart showing the tasks in progress and their statuses (passed or failed).

Green taxi execution

Since the backfill covers six months, our flow will run six times. If we look at our executions tab at the end of the six runs, we will see that all six were successful, and next to them are the labels of the files downloaded.
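The count of six follows from one execution per month in the selected range. Here is a minimal sketch of that arithmetic, assuming the schedule fires once on the first day of each month (the actual cron expression lives in the flow file and is not shown here) and using a hypothetical file-name pattern:

```python
from datetime import date


def monthly_runs(start, end):
    """Return the first day of every month that falls inside [start, end]."""
    year, month = start.year, start.month
    if start.day > 1:
        # The first of this month is already past, so begin with next month.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)

    runs = []
    while date(year, month, 1) <= end:
        runs.append(date(year, month, 1))
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return runs


runs = monthly_runs(date(2021, 1, 1), date(2021, 6, 30))
print(len(runs))  # one execution per month in the backfill window
for run in runs:
    # Hypothetical naming pattern; the real one is defined in the flow.
    print(f"green_tripdata_{run:%Y-%m}.csv")
```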

Executions

We can see that our flows executed successfully. Now, we can check to see if our S3 bucket contains any data.

S3 bucket

Inside our S3 bucket, we can see that our files have been uploaded and named similarly to the labels we saw in Kestra.

We will do the same for the yellow taxi data. By the end of these executions, we will have twelve objects in our S3 bucket (6 for green taxis and the other 6 for the yellow taxis).

We can now confirm if our tables have been loaded into Redshift. Before we do this, let me explain a section of the flow. Inside the flow we have been running, there is a segment responsible for loading data into Redshift.

      - id: yellow_table_create
        type: io.kestra.plugin.jdbc.redshift.Query
        url: "{{kv('JDBC_URL')}}"
        username: "{{kv('DB_USER')}}"
        password: "{{kv('DB_PASS')}}"
        sql: |
          CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
              unique_row_id          text,
              filename               text,
              VendorID               text,
              tpep_pickup_datetime   timestamp,
              tpep_dropoff_datetime  timestamp,
              passenger_count        integer,
              trip_distance          double precision,
              RatecodeID             text,
              store_and_fwd_flag     text,
              PULocationID           text,
              DOLocationID           text,
              payment_type           integer,
              fare_amount            double precision,
              extra                  double precision,
              mta_tax                double precision,
              tip_amount             double precision,
              tolls_amount           double precision,
              improvement_surcharge  double precision,
              total_amount           double precision,
              congestion_surcharge   double precision
          );

This creates the main yellow taxi table in Redshift with the columns listed above. The next step is creating the staging table, which holds each month's data so it can be deduplicated before it is loaded into the main table.

      - id: yellow_create_staging_table
        type: io.kestra.plugin.jdbc.redshift.Query
        url: "{{kv('JDBC_URL')}}"
        username: "{{kv('DB_USER')}}"
        password: "{{kv('DB_PASS')}}"
        sql: |
          CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (LIKE {{render(vars.table)}});

The staging table has the same structure as the main table. The next step is to truncate it. On the first run this has no effect, since the table is empty. On subsequent runs, however, it matters. We will come back to this shortly.

After our staging table is created and truncated, we move to the next step, where we copy the contents of the S3 files we uploaded earlier into the staging table. This is done by this task:

      - id: yellow_copy_into_staging_table
        type: io.kestra.plugin.jdbc.redshift.Query
        url: "{{kv('JDBC_URL')}}"
        username: "{{kv('DB_USER')}}"
        password: "{{kv('DB_PASS')}}"
        sql: |
          COPY {{render(vars.staging_table)}} (
            VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count,
            trip_distance, RatecodeID, store_and_fwd_flag, PULocationID, DOLocationID,
            payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount,
            improvement_surcharge, total_amount, congestion_surcharge
          )
          FROM 's3://{{kv("S3_BUCKET_NAME")}}/{{render(vars.file)}}'
          IAM_ROLE '{{kv("KESTRA_ROLE")}}'
          FORMAT AS CSV
          IGNOREHEADER 1;

We specify the columns here since the tables we created have two additional columns not present in the CSV files we are copying data from. The purpose of those two additional columns is identification and uniqueness. The unique_row_id column will have a unique value, and it is what we will use to ensure there is no duplicate data in our main table.
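As a quick check of that claim, the sketch below compares the column list from the CREATE TABLE statement with the list passed to COPY. The two tuples are copied from the statements above; the variable names are only illustrative.

```python
# Columns of the main and staging tables, in CREATE TABLE order.
TABLE_COLUMNS = (
    "unique_row_id", "filename", "VendorID", "tpep_pickup_datetime",
    "tpep_dropoff_datetime", "passenger_count", "trip_distance", "RatecodeID",
    "store_and_fwd_flag", "PULocationID", "DOLocationID", "payment_type",
    "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount",
    "improvement_surcharge", "total_amount", "congestion_surcharge",
)

# Columns listed in the COPY statement, i.e. those present in the CSV files.
COPY_COLUMNS = (
    "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count",
    "trip_distance", "RatecodeID", "store_and_fwd_flag", "PULocationID", "DOLocationID",
    "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount",
    "improvement_surcharge", "total_amount", "congestion_surcharge",
)

# Columns that COPY does not fill; they stay NULL until a later step sets them.
not_in_csv = [column for column in TABLE_COLUMNS if column not in COPY_COLUMNS]
print(not_in_csv)  # ['unique_row_id', 'filename']
```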

We then populate the unique_row_id and filename columns. This is done using the following code:

      - id: yellow_add_unique_id_and_filename
        type: io.kestra.plugin.jdbc.redshift.Query
        url: "{{kv('JDBC_URL')}}"
        username: "{{kv('DB_USER')}}"
        password: "{{kv('DB_PASS')}}"
        sql: |
          UPDATE {{render(vars.staging_table)}}
          SET 
            unique_row_id = md5(
              COALESCE(VendorID, '') ||
              COALESCE(CAST(tpep_pickup_datetime AS varchar), '') ||
              COALESCE(CAST(tpep_dropoff_datetime AS varchar), '') ||
              COALESCE(PULocationID, '') ||
              COALESCE(DOLocationID, '') ||
              COALESCE(CAST(fare_amount AS varchar), '') ||
              COALESCE(CAST(trip_distance AS varchar), '')
            ),
            filename = '{{render(vars.file)}}';
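The same idea can be sketched in Python: concatenate the identifying fields, treating missing values as empty strings, and hash the result. This only illustrates the logic. Redshift formats timestamps and floating-point values in its own way when casting to varchar, so the digests below are not guaranteed to match those produced by the query. The `unique_row_id` function and the sample row are made up for illustration.

```python
import hashlib

# The fields the UPDATE statement feeds into md5, in the same order.
KEY_FIELDS = (
    "VendorID",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "fare_amount",
    "trip_distance",
)


def unique_row_id(row):
    """Hash the identifying fields of a trip, mirroring COALESCE(x, '') || ..."""
    parts = ("" if row.get(field) is None else str(row[field]) for field in KEY_FIELDS)
    return hashlib.md5("".join(parts).encode("utf-8")).hexdigest()


# Hypothetical sample trip; VendorID is missing to exercise the empty-string fallback.
trip = {
    "VendorID": None,
    "tpep_pickup_datetime": "2021-01-01 00:15:00",
    "tpep_dropoff_datetime": "2021-01-01 00:31:00",
    "PULocationID": "142",
    "DOLocationID": "43",
    "fare_amount": 8.0,
    "trip_distance": 2.1,
}
same_trip_again = dict(trip)
other_trip = {**trip, "fare_amount": 12.5}

print(unique_row_id(trip) == unique_row_id(same_trip_again))  # identical rows share an id
print(unique_row_id(trip) == unique_row_id(other_trip))       # a changed field changes the id
```

Because the id is derived from the row's content, loading the same file twice yields the same ids, which is what lets the merge step recognize rows it has already seen.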

We then merge the rows in the staging table into our main table.

      - id: yellow_merge_data
        type: io.kestra.plugin.jdbc.redshift.Query
        url: "{{kv('JDBC_URL')}}"
        username: "{{kv('DB_USER')}}"
        password: "{{kv('DB_PASS')}}"
        sql: |
          MERGE INTO {{render(vars.table)}}
          USING {{render(vars.staging_table)}} AS S
          ON {{render(vars.table)}}.unique_row_id = S.unique_row_id
          WHEN MATCHED THEN
            UPDATE SET
              unique_row_id = S.unique_row_id
          WHEN NOT MATCHED THEN
            INSERT (
              unique_row_id, filename, VendorID, tpep_pickup_datetime, tpep_dropoff_datetime,
              passenger_count, trip_distance, RatecodeID, store_and_fwd_flag, PULocationID,
              DOLocationID, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount,
              improvement_surcharge, total_amount, congestion_surcharge
            )
            VALUES (
              S.unique_row_id, S.filename, S.VendorID, S.tpep_pickup_datetime, S.tpep_dropoff_datetime,
              S.passenger_count, S.trip_distance, S.RatecodeID, S.store_and_fwd_flag, S.PULocationID,
              S.DOLocationID, S.payment_type, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount,
              S.improvement_surcharge, S.total_amount, S.congestion_surcharge
            );


If you look closely, you can see the WHEN NOT MATCHED THEN clause. The merge compares the unique_row_id column in the staging table with that of the main table. If a staging row's id has no match in the main table, the row is not a duplicate, so it is inserted. If it does match, the WHEN MATCHED clause simply sets unique_row_id to the value it already has, leaving the existing row unchanged.

I mentioned that truncating the staging table only matters on subsequent runs. After the first run, the main table and the staging table hold the same data, in this case January's. When the flow runs again, it starts by creating the main table, but this step does nothing since the table already exists (CREATE TABLE IF NOT EXISTS). The same applies to creating the staging table. Next comes the truncation: the staging table is emptied and then loaded with the records for the second month. At this point, the main table has data for January, while the staging table has data for February.

The merge then compares unique_row_id values. February rows whose ids do not appear in the main table are inserted; any row that is already present is not inserted again, so the main table ends up with no duplicates.
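The truncate, load, and merge cycle can be modeled with plain Python containers. This is a simplified sketch, not how Redshift executes the statements: the main table is a dict keyed by unique_row_id, the staging table is a list, and the `run_month` helper and sample rows are hypothetical. For brevity, the rows arrive with their ids already computed.

```python
def run_month(main, staging, new_rows):
    """Mimic one flow run and return how many rows were inserted into main."""
    staging.clear()            # TRUNCATE the staging table
    staging.extend(new_rows)   # COPY the month's file into staging

    inserted = 0
    for row in staging:        # MERGE staging into main
        if row["unique_row_id"] not in main:
            # WHEN NOT MATCHED THEN INSERT
            main[row["unique_row_id"]] = row
            inserted += 1
        # WHEN MATCHED: the row is already present, so nothing changes.
    return inserted


main_table, staging_table = {}, []
january = [
    {"unique_row_id": "a1", "filename": "jan.csv"},
    {"unique_row_id": "a2", "filename": "jan.csv"},
]
february = [{"unique_row_id": "b1", "filename": "feb.csv"}]

print(run_month(main_table, staging_table, january))   # 2
print(run_month(main_table, staging_table, february))  # 1
print(run_month(main_table, staging_table, january))   # 0, re-running January adds nothing
print(len(main_table))                                 # 3
```

The last call shows why the pipeline is safe to re-run: repeating a backfill for a month that is already loaded leaves the main table unchanged.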

Inside Redshift, we can confirm if our tables have been created. We are supposed to have two green taxi tables (main and staging) and two yellow tables (main and staging).

Redshift tables

Running our second flow

Now that we have our tables in Redshift, we can move to the transformations part using dbt. For this, we will use the flow dbt_redshift.yaml found in the kestra/flows directory.

What this flow does is clone our repository and then sync the dbt project, which is located in the taxi_dbt directory, into our namespace. This allows Kestra to find our project and, more specifically, the dbt_project.yml file.

There are a few things we need to specify before our flow can run successfully. We need to add a profiles section so that Kestra and dbt know where our data lives and where the tables our models create will be stored. If you have used dbt Core, you can find these credentials in the ~/.dbt/profiles.yml file.

Run:

cat ~/.dbt/profiles.yml

You will see the credentials needed by dbt. These configurations should be input here:

 profiles: |
      taxi_dbt:
        outputs:
          dev:
            type: redshift
            host: "{{ kv('REDSHIFT_HOST') }}"
            user: "{{ kv('REDSHIFT_USER') }}"
            password: "{{ kv('REDSHIFT_PASSWORD') }}"
            port: 5439
            dbname: dev
            schema: public
            autocommit: true
            threads: 3
            connect_timeout: 300
        target: dev

If you haven't installed dbt on your machine, these configurations can be found in Redshift under the workgroup tab.

With configurations out of the way, we can run our dbt flow. Inside Kestra, execute the flow.

DBT flow

We can see that the flow executed successfully and also passed all the tests I had written. These ensure that the models we create have consistent data, which prevents our pipeline from breaking down.

Tests confirmation

You can see the number of tests and whether they succeeded or not. This marks the end of this pipeline, and because of our triggers, it should run monthly. This automates the entire workflow, and what remains is monitoring the logs to ensure no abnormalities occur and trying to identify bottlenecks that could be solved by minor tweaks in our workflow.

Remember to destroy the resources (terraform destroy) once you are done with the project to avoid incurring additional costs. For more explanation on usage and setup, refer to the README at the root of this repo.

With our models, we can now pass the data in the tables created to any analytics or BI tool.

Conclusion

We have executed two flows that extract data, upload it to S3, copy the content from S3 to Redshift, and perform transformations using dbt.

I hope this article was helpful and informative. If you want to read more on Terraform or Infrastructure as Code (IaC), here is a link to an article about a project I did that goes into more detail about the configurations and Terraform itself.

Here is a link to this project's repo.
