<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Cícero Joasyo Mateus de Moura</title>
    <description>The latest articles on DEV Community by Cícero Joasyo Mateus de Moura (@cicerojmm).</description>
    <link>https://dev.to/cicerojmm</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1068068%2F632c787b-6532-453b-ab4c-3caa66b6cf61.jpeg</url>
      <title>DEV Community: Cícero Joasyo Mateus de Moura</title>
      <link>https://dev.to/cicerojmm</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/cicerojmm"/>
    <language>en</language>
    <item>
      <title>Simplifying Data Transformation in Redshift: An Approach with DBT and Airflow</title>
      <dc:creator>Cícero Joasyo Mateus de Moura</dc:creator>
      <pubDate>Tue, 07 Nov 2023 17:23:42 +0000</pubDate>
      <link>https://dev.to/aws-builders/simplifying-data-transformation-in-redshift-an-approach-with-dbt-and-airflow-18ko</link>
      <guid>https://dev.to/aws-builders/simplifying-data-transformation-in-redshift-an-approach-with-dbt-and-airflow-18ko</guid>
      <description>&lt;p&gt;&lt;strong&gt;Let's transform and model data stored in Redshift with a simple and effective approach using DBT and Airflow. Why make it complicated when we can simplify it?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;In this article, we'll work with the following scenario:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Amazon Redshift stores the input data from an e-commerce website.&lt;br&gt;
We'll provide insights into customer behavior.&lt;br&gt;
The DW is not yet structured for analytical applications; it contains a copy of the raw data.&lt;br&gt;
We'll transform the data and structure the DW to enable business analysis. &lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;The data used in this article is an open dataset of sales on the Amazon website, available on Kaggle &lt;a href="https://www.kaggle.com/datasets/karkavelrajaj/amazon-sales-dataset" rel="noopener noreferrer"&gt;at this link&lt;/a&gt;.&lt;br&gt;
The complete code for this project can also be found on &lt;a href="https://github.com/cicerojmm/transformacaoDadosDbtAirflow/tree/main" rel="noopener noreferrer"&gt;my GitHub&lt;/a&gt;.&lt;/p&gt;
&lt;h2&gt;
  
  
  About the solution
&lt;/h2&gt;

&lt;p&gt;To solve the presented problem, we have the following architecture design with the technologies and tools that will be used:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fuz838a09u12clc1mi4bo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fuz838a09u12clc1mi4bo.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;For those unfamiliar with the tools to be used, I'll provide a brief explanation of them below.&lt;br&gt;
 If you're already familiar, feel free to skip to the next section: Data Modeling.&lt;/p&gt;
&lt;h3&gt;
  
  
  About dbt
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://www.getdbt.com/" rel="noopener noreferrer"&gt;Data Build Tool or dbt&lt;/a&gt; is a modern tool for data transformation in the data warehouse or lakehouse scenario. It allows data engineers, data scientists, and data analysts to manipulate data using SQL.&lt;/p&gt;

&lt;p&gt;DBT has two versions:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;dbt-core:&lt;/strong&gt; It's an open-source version maintained by the community and can be freely used.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;dbt-cloud:&lt;/strong&gt; It's the paid version managed as a SaaS, which can be used in the cloud with a monthly subscription.&lt;/li&gt;
&lt;/ul&gt;
&lt;h3&gt;
  
  
  About Airflow and Cosmos
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://airflow.apache.org/" rel="noopener noreferrer"&gt;Airflow&lt;/a&gt; is the most widely used and well-known tool for orchestrating data workflows. It allows for efficient pipeline construction, scheduling, and monitoring.&lt;/p&gt;

&lt;p&gt;Since we're talking about Airflow, let's also discuss Cosmos.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://www.astronomer.io/cosmos/" rel="noopener noreferrer"&gt;Cosmos&lt;/a&gt; is a library for Airflow developed by Astronomer that aims to simplify the execution of DBT projects with Airflow.&lt;/p&gt;

&lt;p&gt;With Cosmos, you can execute a DBT project through a group of Airflow tasks, which are automatically recognized. Each DBT model becomes an Airflow task or group, performing transformations and tests.&lt;/p&gt;
&lt;h3&gt;
  
  
  Amazon Redshift
&lt;/h3&gt;

&lt;p&gt;The &lt;a href="https://aws.amazon.com/pt/redshift/" rel="noopener noreferrer"&gt;Redshift&lt;/a&gt; cloud-based data warehouse is intended for analyzing and querying massive amounts of data. &lt;br&gt;
Nowadays, Redshift has both a managed and a serverless version and has evolved into a robust data platform.&lt;/p&gt;
&lt;h2&gt;
  
  
  Data Modeling
&lt;/h2&gt;

&lt;p&gt;To better understand the solution we'll adopt in this article, the following diagram shows the data modeling for building the DW.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxw9g8gem8qtogbt32vvb.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxw9g8gem8qtogbt32vvb.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;As mentioned earlier, there is a table where the data is stored without transformation (raw data), which is the &lt;strong&gt;sales&lt;/strong&gt; table. &lt;/p&gt;

&lt;p&gt;The &lt;strong&gt;sales&lt;/strong&gt; table also serves as a staging area for data processing and the creation of other tables.&lt;/p&gt;

&lt;p&gt;In the data warehouse modeling, there are three dimension tables:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;dim_product&lt;/strong&gt;: This table contains data about the store's products.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;dim_user&lt;/strong&gt;: It contains data about users who are customers of the store.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;dim_rating&lt;/strong&gt;: It contains all the product ratings given by customers of the store.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;And two fact tables:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;fact_product_rating&lt;/strong&gt;: This table contains data for extracting product rating metrics by users, e.g., the top-rated products.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;fact_sales_category&lt;/strong&gt;: This table contains data for extracting sales metrics by product categories, e.g., the top categories with the most profit for the store.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Note: It's important to remember that the example in this article is hypothetical and may not represent the best data modeling for the given data; it's for educational purposes only.&lt;/p&gt;
&lt;h2&gt;
  
  
  Show me the Code
&lt;/h2&gt;

&lt;p&gt;Next, it's time to explore in practice how to build the dbt project, the DAG in Airflow, and the results of the data transformations in Redshift.&lt;/p&gt;
&lt;h4&gt;
  
  
  1. Dbt Project
&lt;/h4&gt;

&lt;p&gt;In this section, the focus is on the structure, configuration, and SQL code for the data transformations that are part of the project.&lt;br&gt;
To start a DBT project, you need to install the Python package and use the CLI. &lt;/p&gt;

&lt;p&gt;To install the DBT package:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ssh"&gt;&lt;code&gt;&lt;span class="err"&gt;$&lt;/span&gt; &lt;span class="k"&gt;pip&lt;/span&gt; install dbt-core==1.4.9
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To initialize a project:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ssh"&gt;&lt;code&gt;&lt;span class="err"&gt;$&lt;/span&gt; &lt;span class="k"&gt;dbt&lt;/span&gt; init &amp;lt;project_name&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;By default, a DBT project already has some folders and configurations.&lt;br&gt;
 Here's the structure with all the folders and files created for our scenario:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- dbt_project/
  |______ dbt_project.yml
  |______ analyses/
  |______ macros/
  |______ models/
  |   |   |______ dimensions/
  |   |   |      |______ dim_product.sql
  |   |   |      |______ dim_rating.sql
  |   |   |      |______ dim_user.sql
  |   |   |______ facts/
  |   |   |      |______ fact_product_rating.sql
  |   |   |      |______ fact_sales_category.sql
  |   |   |______ staging/
  |   |   |      |______ stg_sales_eph.sql
  |   |   |      |______ staging.yml
  |______ seeds/
  |______ snapshots/
  |______ tests/
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Most of the work is done within the &lt;strong&gt;models&lt;/strong&gt; folder.&lt;br&gt;
The next step is to analyze each SQL transformation code.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Staging Table&lt;/strong&gt;&lt;br&gt;
As mentioned earlier, the staging table is the &lt;strong&gt;sales&lt;/strong&gt; table itself, which already exists in the data warehouse. Essentially, it involves loading the entire table into memory.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="p"&gt;{{&lt;/span&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;materialized&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'ephemeral'&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}}&lt;/span&gt;

&lt;span class="k"&gt;SELECT&lt;/span&gt; 
    &lt;span class="o"&gt;*&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="p"&gt;{{&lt;/span&gt; &lt;span class="k"&gt;source&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'public'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'sales'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;}}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I'd like to highlight three points:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;DBT supports several materializations (such as table, view, incremental, and ephemeral); in this project we use two: &lt;strong&gt;ephemeral&lt;/strong&gt;, a virtual table that exists only in memory during processing and is not persisted in the database, and &lt;strong&gt;table&lt;/strong&gt;, which is persisted in the data warehouse.&lt;/li&gt;
&lt;li&gt;The second point concerns processing, which is entirely done in the data warehouse itself, in this case, Redshift. So when I say that the table is loaded into memory, it refers to Redshift's memory.&lt;/li&gt;
&lt;li&gt;Models, whether &lt;strong&gt;ephemeral&lt;/strong&gt; or &lt;strong&gt;table&lt;/strong&gt;, are created with the same name as their SQL file.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Dimension Tables&lt;/strong&gt;&lt;br&gt;
Now let's analyze the SQL code for the three dimension tables, which will be materialized as &lt;strong&gt;table&lt;/strong&gt;. &lt;/p&gt;

&lt;p&gt;The first one is &lt;strong&gt;dim_product&lt;/strong&gt;, containing data about the store's products based on the &lt;strong&gt;sales&lt;/strong&gt; table.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="p"&gt;{{&lt;/span&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;materialized&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'table'&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}}&lt;/span&gt;

&lt;span class="k"&gt;SELECT&lt;/span&gt;
    &lt;span class="k"&gt;DISTINCT&lt;/span&gt;
    &lt;span class="n"&gt;product_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;product_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;category&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;about_product&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;img_link&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;product_link&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="p"&gt;{{&lt;/span&gt; &lt;span class="k"&gt;ref&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'stg_sales_eph'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;}}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The second is &lt;strong&gt;dim_rating&lt;/strong&gt;, which includes product ratings by buyers.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="p"&gt;{{&lt;/span&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;materialized&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'table'&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}}&lt;/span&gt;

&lt;span class="k"&gt;SELECT&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;product_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;rating&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;rating_count&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="p"&gt;{{&lt;/span&gt; &lt;span class="k"&gt;ref&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'stg_sales_eph'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;}}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The third and final one is &lt;strong&gt;dim_user&lt;/strong&gt;, containing data about store customers.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="p"&gt;{{&lt;/span&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;materialized&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'table'&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}}&lt;/span&gt;

&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;DISTINCT&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;user_name&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="p"&gt;{{&lt;/span&gt; &lt;span class="k"&gt;ref&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'stg_sales_eph'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;}}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With the dimension tables code completed, it's time to look at the fact tables.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Fact Tables&lt;/strong&gt;&lt;br&gt;
The two fact tables are more complex as they include business metrics, aggregations, and relationships with dimensions.&lt;/p&gt;

&lt;p&gt;Here's the code for &lt;strong&gt;fact_product_rating&lt;/strong&gt;, which relates to the product and rating dimensions, calculating the average rating score for each product.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="p"&gt;{{&lt;/span&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;materialized&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'table'&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}}&lt;/span&gt;

&lt;span class="k"&gt;SELECT&lt;/span&gt;
    &lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;product_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;product_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="k"&gt;AVG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;CASE&lt;/span&gt;
        &lt;span class="k"&gt;WHEN&lt;/span&gt; &lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;rating&lt;/span&gt; &lt;span class="o"&gt;~&lt;/span&gt; &lt;span class="s1"&gt;'^[0-9.]+$'&lt;/span&gt; &lt;span class="k"&gt;THEN&lt;/span&gt; &lt;span class="k"&gt;CAST&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;rating&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="nb"&gt;numeric&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;ELSE&lt;/span&gt; &lt;span class="k"&gt;NULL&lt;/span&gt;
    &lt;span class="k"&gt;END&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;avg_rating&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt;
    &lt;span class="p"&gt;{{&lt;/span&gt; &lt;span class="k"&gt;ref&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'stg_sales_eph'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;}}&lt;/span&gt; &lt;span class="n"&gt;s&lt;/span&gt;
    &lt;span class="k"&gt;JOIN&lt;/span&gt; &lt;span class="p"&gt;{{&lt;/span&gt; &lt;span class="k"&gt;ref&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'dim_product'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;}}&lt;/span&gt; &lt;span class="n"&gt;p&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;product_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;product_id&lt;/span&gt;
    &lt;span class="k"&gt;JOIN&lt;/span&gt; &lt;span class="p"&gt;{{&lt;/span&gt; &lt;span class="k"&gt;ref&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'dim_rating'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;}}&lt;/span&gt; &lt;span class="n"&gt;r&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;product_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;product_id&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt;
    &lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;product_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;product_name&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The second fact table is &lt;strong&gt;fact_sales_category&lt;/strong&gt;, which groups sales by user and product category and calculates the total revenue. It relates to the user and product dimensions.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="p"&gt;{{&lt;/span&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;materialized&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'table'&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}}&lt;/span&gt;

&lt;span class="k"&gt;SELECT&lt;/span&gt;
    &lt;span class="n"&gt;u&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;category&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="k"&gt;SUM&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;CAST&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;REGEXP_REPLACE&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;actual_price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'[^0-9.]'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;''&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="nb"&gt;DECIMAL&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;)))&lt;/span&gt; &lt;span class="n"&gt;sales_amount&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt;
    &lt;span class="p"&gt;{{&lt;/span&gt; &lt;span class="k"&gt;ref&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'stg_sales_eph'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;}}&lt;/span&gt; &lt;span class="n"&gt;s&lt;/span&gt;
    &lt;span class="k"&gt;JOIN&lt;/span&gt; &lt;span class="p"&gt;{{&lt;/span&gt; &lt;span class="k"&gt;ref&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'dim_product'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;}}&lt;/span&gt; &lt;span class="n"&gt;p&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;product_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;product_id&lt;/span&gt;
    &lt;span class="k"&gt;JOIN&lt;/span&gt; &lt;span class="p"&gt;{{&lt;/span&gt; &lt;span class="k"&gt;ref&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'dim_user'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;}}&lt;/span&gt; &lt;span class="n"&gt;u&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;u&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt;
    &lt;span class="n"&gt;u&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;category&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With the data modeling ready, it's time to complete the project and make it all run with Airflow.&lt;/p&gt;

&lt;h4&gt;
  
  
  2. Construction of the DAG in Airflow
&lt;/h4&gt;

&lt;p&gt;As mentioned earlier, for this project we'll use the Cosmos library, which offers great integration between DBT and Airflow.&lt;br&gt;
To do this, we need to follow a few steps:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Install the necessary dependencies on the Airflow environment to run the project.&lt;/li&gt;
&lt;li&gt;Configure an Airflow connection for Redshift. This is an interesting feature of Cosmos; we don't need to configure the database connection in the DBT project. Instead, we can pass it as a parameter through Airflow.&lt;/li&gt;
&lt;li&gt;Finally, build the tasks with the "DbtTaskGroup" from Cosmos.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Following the mentioned steps, here's the "requirements.txt" file with the necessary dependencies to be installed on the Airflow server:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight ssh"&gt;&lt;code&gt;&lt;span class="k"&gt;dbt&lt;/span&gt;-core==1.4.9
&lt;span class="k"&gt;dbt&lt;/span&gt;-redshift==1.4.0
&lt;span class="k"&gt;astronomer&lt;/span&gt;-cosmos==1.0.5
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;The configuration of the connection in the Airflow UI for Redshift looks like this:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8xqgxi1089mwrdzwg1xy.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8xqgxi1089mwrdzwg1xy.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Now, here's the complete DAG code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.decorators&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;dag&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.operators.dummy_operator&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;DummyOperator&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;cosmos&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;DbtTaskGroup&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ProjectConfig&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ProfileConfig&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;RenderConfig&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;cosmos.profiles&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;RedshiftUserPasswordProfileMapping&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;cosmos.constants&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;TestBehavior&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pendulum&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;


&lt;span class="n"&gt;CONNECTION_ID&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;redshift_default&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="n"&gt;DB_NAME&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;amazon_sales&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="n"&gt;SCHEMA_NAME&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;public&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;

&lt;span class="n"&gt;ROOT_PATH&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;/opt/airflow/dags/dbt&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="n"&gt;DBT_PROJECT_PATH&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;ROOT_PATH&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;/sales_dw&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;

&lt;span class="n"&gt;profile_config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;ProfileConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;profile_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;sales_dw&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;target_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;dev&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;profile_mapping&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;RedshiftUserPasswordProfileMapping&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;conn_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;CONNECTION_ID&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;profile_args&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;schema&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;SCHEMA_NAME&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="nd"&gt;@dag&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;start_date&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nf"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2023&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;14&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;schedule&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;catchup&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;dag_dbt_sales_dw_cosmos&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;

    &lt;span class="n"&gt;start_process&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;DummyOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;start_process&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;transform_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;DbtTaskGroup&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;group_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;transform_data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;project_config&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;ProjectConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;DBT_PROJECT_PATH&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="n"&gt;profile_config&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;profile_config&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;default_args&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;retries&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;start_process&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;transform_data&lt;/span&gt;


&lt;span class="nf"&gt;dag_dbt_sales_dw_cosmos&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I'd like to highlight the following part of the code for further analysis:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;transform_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;DbtTaskGroup&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
       &lt;span class="n"&gt;group_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;transform_data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="n"&gt;project_config&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;ProjectConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;DBT_PROJECT_PATH&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
       &lt;span class="n"&gt;profile_config&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;profile_config&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="n"&gt;default_args&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;retries&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
   &lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The "DbtTaskGroup" reads the directory where the DBT project is located. In the previous code, the path is indicated by the "DBT_PROJECT_PATH" variable. &lt;br&gt;
It then constructs Airflow tasks based on the models created in DBT, which, in our case, include staging, dimensions, and fact tables.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;ROOT_PATH&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;/opt/airflow/dags/dbt&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="n"&gt;DBT_PROJECT_PATH&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;ROOT_PATH&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;/sales_dw&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Note&lt;/strong&gt;: It's important to emphasize that in this case, the DBT project needs to be on the same server as Airflow, as defined above.&lt;/p&gt;

&lt;p&gt;The &lt;strong&gt;ProfileConfig&lt;/strong&gt; is the object that configures the connection. &lt;br&gt;
Essentially, it's the Airflow connection and some parameters that can be passed, such as the database schema.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;CONNECTION_ID&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;redshift_default&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="n"&gt;DB_NAME&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;amazon_sales&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="n"&gt;SCHEMA_NAME&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;public&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;

&lt;span class="n"&gt;profile_config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;ProfileConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
   &lt;span class="n"&gt;profile_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;sales_dw&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
   &lt;span class="n"&gt;target_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;dev&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
   &lt;span class="n"&gt;profile_mapping&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;RedshiftUserPasswordProfileMapping&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
       &lt;span class="n"&gt;conn_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;CONNECTION_ID&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="n"&gt;profile_args&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;schema&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;SCHEMA_NAME&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
   &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
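

&lt;p&gt;One extra note: the DAG above also imports &lt;strong&gt;RenderConfig&lt;/strong&gt; and &lt;strong&gt;TestBehavior&lt;/strong&gt; from Cosmos, although it doesn't use them. As a hedged sketch (assuming Cosmos 1.x), they can be passed to the &lt;strong&gt;DbtTaskGroup&lt;/strong&gt; to control how models are rendered and when the dbt tests run, for example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;# Hypothetical variation of the task group above: run all dbt tests once,
# after every model has been built, instead of after each model.
transform_data = DbtTaskGroup(
    group_id="transform_data",
    project_config=ProjectConfig(DBT_PROJECT_PATH),  # same path defined in the DAG above
    profile_config=profile_config,                   # same profile_config defined above
    render_config=RenderConfig(test_behavior=TestBehavior.AFTER_ALL),
)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;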



&lt;p&gt;Constructing the DAG is that simple – it's straightforward and efficient, isn't it?&lt;/p&gt;

&lt;p&gt;Here's an image of how the tasks built automatically by Cosmos in the Airflow UI will look:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fkpxzogz0wt3xiysnn3xh.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fkpxzogz0wt3xiysnn3xh.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;You can see the dependencies for building &lt;strong&gt;fact_product_rating&lt;/strong&gt; in the image, as defined by the SQL in the models.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fbcqcbn4eqiusqknomlok.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fbcqcbn4eqiusqknomlok.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Also highlighted in the image are the dependencies for building &lt;strong&gt;fact_sales_category&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F08lcqhlo2pxopscufg04.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F08lcqhlo2pxopscufg04.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  3. Finally... Results in Redshift
&lt;/h4&gt;

&lt;p&gt;Upon running the DAG and building the models configured in DBT, we achieve the following result in Redshift:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fr4d0jqns7lp943uy6d2a.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fr4d0jqns7lp943uy6d2a.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;You can see the &lt;strong&gt;sales&lt;/strong&gt; table, which contains raw data used for staging, as well as the materialized dimension and fact tables.&lt;br&gt;
Performing a query on the &lt;strong&gt;fact_sales_category&lt;/strong&gt; table, we get the following result:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F00nlpd26u7pznivexkir.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F00nlpd26u7pznivexkir.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Conclusion
&lt;/h3&gt;

&lt;p&gt;DBT is an excellent tool for data processing and modeling, providing convenience and speed for data projects. One of its strengths is that it automatically infers the dependencies between models, so you only need to write your transformations in SQL.&lt;/p&gt;

&lt;p&gt;The Cosmos library from Astronomer is also a great help in orchestrating DBT with Airflow, simplifying and speeding up the process. It provides an overview of the DBT models and their execution. &lt;/p&gt;

&lt;p&gt;Airflow + DBT + Cosmos = The Perfect Combination ❤&lt;/p&gt;

&lt;p&gt;If you're looking for a simple, efficient, and cost-effective data stack, the solution presented in this article may be effective and ideal for your scenario.&lt;/p&gt;

&lt;p&gt;I'll be writing more articles about DBT in the future, so stay tuned!&lt;/p&gt;




&lt;blockquote&gt;
&lt;p&gt;Follow me here and on other social networks:&lt;br&gt;
LinkedIn: &lt;a href="https://www.linkedin.com/in/cicero-moura/" rel="noopener noreferrer"&gt;https://www.linkedin.com/in/cicero-moura/&lt;/a&gt;&lt;br&gt;
Github: &lt;a href="https://github.com/cicerojmm" rel="noopener noreferrer"&gt;https://github.com/cicerojmm&lt;/a&gt;&lt;/p&gt;
&lt;/blockquote&gt;

</description>
      <category>dbt</category>
      <category>airflow</category>
      <category>aws</category>
      <category>datamodeling</category>
    </item>
    <item>
      <title>Data-aware Scheduling in Airflow: A Practical Guide with DAG Factory</title>
      <dc:creator>Cícero Joasyo Mateus de Moura</dc:creator>
      <pubDate>Mon, 31 Jul 2023 20:37:03 +0000</pubDate>
      <link>https://dev.to/aws-builders/data-aware-scheduling-in-airflow-a-practical-guide-with-dag-factory-585k</link>
      <guid>https://dev.to/aws-builders/data-aware-scheduling-in-airflow-a-practical-guide-with-dag-factory-585k</guid>
      <description>&lt;p&gt;I recently contribute to an open-source project called &lt;a href="https://medium.com/r/?url=https%3A%2F%2Fgithub.com%2Fajbosco%2Fdag-factory" rel="noopener noreferrer"&gt;DAG Factory&lt;/a&gt;, library for building Airflow DAGs declaratively using YAML files, eliminating the need for Python coding.&lt;/p&gt;

&lt;p&gt;My contribution was to add support for the &lt;a href="https://medium.com/r/?url=https%3A%2F%2Fgithub.com%2Fajbosco%2Fdag-factory%2Fpull%2F171" rel="noopener noreferrer"&gt;Data-aware Scheduling (Datasets) functionality of Airflow&lt;/a&gt;, which was introduced starting from version 2.4 (at the time of writing this article, &lt;a href="https://medium.com/r/?url=https%3A%2F%2Fairflow.apache.org%2Fdocs%2Fapache-airflow%2Fstable%2Frelease_notes.html" rel="noopener noreferrer"&gt;Airflow is at version 2.6.3&lt;/a&gt;).&lt;/p&gt;

&lt;p&gt;The aim here is to talk about the Datasets functionality in Airflow, introduce the DAG Factory library, and create a practical example using both.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/cicerojmm/airflowDatasetsComDagFactory" rel="noopener noreferrer"&gt;You can access the repository with the code used in this article here&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  What are Datasets in Airflow?
&lt;/h2&gt;

&lt;p&gt;Data-aware Scheduling allows linking DAGs to files, whether local or remote, known as datasets, so that data processing is triggered when one or more of these files are created or modified.&lt;/p&gt;

&lt;p&gt;Datasets help resolve the problem of data dependency between DAGs, which occurs when one DAG needs to consume data from another for analysis or further processing. They enable a more intelligent and visible scheduling with an explicit dependency between DAGs.&lt;/p&gt;

&lt;p&gt;Basically, there are two fundamental concepts in Airflow's Datasets:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;DAG Producer: this DAG creates or updates one or more datasets; its tasks declare the datasets they produce through a parameter called &lt;strong&gt;outlets&lt;/strong&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;DAG Consumer: this DAG consumes one or more datasets and will be scheduled and triggered as soon as all of them are successfully created or updated by the DAG Producer. The scheduling is done by passing the datasets directly to the &lt;strong&gt;schedule&lt;/strong&gt; parameter in the DAG configuration.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Currently, there are two ways to schedule DAGs in Airflow: either by a recurrent schedule (cron, timedelta, timetable, etc.) or through one or multiple datasets. It's important to note that we cannot use both scheduling methods in a single DAG, only one in each DAG.&lt;/p&gt;
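
&lt;p&gt;To make the producer/consumer idea concrete before we move to YAML, here's a minimal sketch in plain Python (without DAG Factory), assuming Airflow 2.4 or later; the dataset URI and task bodies are illustrative only:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from pendulum import datetime

from airflow import Dataset
from airflow.decorators import dag, task

# Illustrative dataset: the URI is just an identifier, Airflow never reads the file itself
poke_items = Dataset("s3://my-bucket/poke_api/items/")


@dag(start_date=datetime(2023, 7, 1), schedule="0 5 * * *", catchup=False)
def producer_dag():
    @task(outlets=[poke_items])  # this task declares that it updates the dataset
    def save_items():
        ...  # extract from the API and write the result to S3

    save_items()


@dag(start_date=datetime(2023, 7, 1), schedule=[poke_items], catchup=False)
def consumer_dag():
    @task
    def process_items():
        ...  # runs only after the producer successfully updates the dataset

    process_items()


producer_dag()
consumer_dag()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;When the producer's &lt;strong&gt;save_items&lt;/strong&gt; task succeeds, Airflow marks the dataset as updated and schedules the consumer DAG automatically.&lt;/p&gt;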

&lt;h2&gt;
  
  
  DAG Factory: Building DAGs with YAML
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://medium.com/r/?url=https%3A%2F%2Fgithub.com%2Fajbosco%2Fdag-factory" rel="noopener noreferrer"&gt;DAG Factory&lt;/a&gt; is a community library that allows configuring Airflow to generate DAGs from one or multiple YAML files.&lt;/p&gt;

&lt;p&gt;The library aims to facilitate the creation and configuration of new DAGs by using declarative parameters in YAML. It allows default customizations and is open-source, making it easy to create and customize new functionalities.&lt;/p&gt;

&lt;p&gt;The community around this library is highly engaged, making it worth exploring =)&lt;/p&gt;
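
&lt;p&gt;For context on how the YAML turns into DAGs: DAG Factory relies on a small Python loader file placed in the Airflow dags folder, which reads the YAML and registers the DAGs. Here's a minimal sketch of that loader following the library's usual pattern (the YAML path below is illustrative, not necessarily the one used in this article's repository):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;# Minimal loader file in the Airflow dags/ folder so the scheduler picks up the YAML DAGs
from airflow import DAG  # kept in scope so Airflow's DAG file processor parses this file

import dagfactory

# Illustrative path to the YAML configuration file
config_file = "/opt/airflow/dags/dags_config/example_dags.yml"

dag_factory = dagfactory.DagFactory(config_file)
dag_factory.clean_dags(globals())     # remove DAGs that are no longer in the YAML
dag_factory.generate_dags(globals())  # register the YAML-defined DAGs with Airflow
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;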

&lt;h2&gt;
  
  
  Practical Use of Datasets
&lt;/h2&gt;

&lt;p&gt;In this article, we'll work with the following scenario:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;We need to build a pipeline that downloads data from an API and saves the results to Amazon S3. After successfully extracting and saving the data, we need to process it. Hence, we'll have another pipeline that will be triggered based on the creation or update of that data.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flnsisovo8pgon7s32ei5.gif" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flnsisovo8pgon7s32ei5.gif" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The infrastructure to run Airflow and reproduce the example in this article &lt;a href="https://medium.com/r/?url=https%3A%2F%2Fgithub.com%2Fcicerojmm%2FairflowDatasetsComDagFactory%2Ftree%2Fmain%2Fairflow-docker-compose" rel="noopener noreferrer"&gt;can be found here&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;The first step is to build the pipeline that extracts and saves the data to S3.&lt;/p&gt;

&lt;h3&gt;
  
  
  Producer DAG for Data
&lt;/h3&gt;

&lt;p&gt;This pipeline consists of two tasks to extract data from the public &lt;a href="https://pokeapi.co/" rel="noopener noreferrer"&gt;PokeAPI&lt;/a&gt; and another two tasks to save the data to S3.&lt;/p&gt;

&lt;p&gt;The tasks that extract data from the API use the &lt;strong&gt;SimpleHttpOperator&lt;/strong&gt;, and the tasks that save the data to S3 use the &lt;strong&gt;S3CreateObjectOperator&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;Since we'll be using YAML to build our DAGs, the following code constructs this first DAG with all its tasks.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;

&lt;span class="na"&gt;download_data_api_dataset_producer_dag&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;description&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Example&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;DAG&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;producer&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;custom&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;config&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;datasets"&lt;/span&gt;
  &lt;span class="na"&gt;schedule_interval&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;0&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;5&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;*&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;*&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;*"&lt;/span&gt;
  &lt;span class="na"&gt;task_groups&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;extract_data&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;tooltip&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;this&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;is&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;a&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;task&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;group"&lt;/span&gt;
    &lt;span class="na"&gt;save_data&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;tooltip&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;this&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;is&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;a&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;task&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;group"&lt;/span&gt;
  &lt;span class="na"&gt;tasks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;start_process&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;operator&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;airflow.operators.dummy.DummyOperator&lt;/span&gt;
    &lt;span class="na"&gt;get_items_data&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;operator&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;airflow.providers.http.operators.http.SimpleHttpOperator&lt;/span&gt;
      &lt;span class="na"&gt;method&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;GET"&lt;/span&gt;
      &lt;span class="na"&gt;http_conn_id&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;poke_api"&lt;/span&gt;
      &lt;span class="na"&gt;endpoint&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;item/1"&lt;/span&gt;
      &lt;span class="na"&gt;task_group_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;extract_data&lt;/span&gt;
      &lt;span class="na"&gt;dependencies&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;start_process&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
    &lt;span class="na"&gt;save_items_data&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;operator&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator&lt;/span&gt;
      &lt;span class="na"&gt;aws_conn_id&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;aws_default&lt;/span&gt;
      &lt;span class="na"&gt;s3_bucket&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;cjmm-datalake-raw&lt;/span&gt;
      &lt;span class="na"&gt;s3_key&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;poke_api/item/data_{{&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;ts&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;}}.json"&lt;/span&gt;
      &lt;span class="na"&gt;data&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;{{&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;ti.xcom_pull(task_ids='get_items_data')&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;}}"&lt;/span&gt;
      &lt;span class="na"&gt;dependencies&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;get_items_data&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
      &lt;span class="na"&gt;task_group_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;save_data&lt;/span&gt;
      &lt;span class="na"&gt;outlets&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;file&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;/opt/airflow/dags/dags_config/datasets_config.yml&lt;/span&gt;
        &lt;span class="na"&gt;datasets&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;dataset_poke_items'&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
    &lt;span class="na"&gt;get_items_attribute_data&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;operator&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;airflow.providers.http.operators.http.SimpleHttpOperator&lt;/span&gt;
      &lt;span class="na"&gt;method&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;GET"&lt;/span&gt;
      &lt;span class="na"&gt;http_conn_id&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;poke_api"&lt;/span&gt;
      &lt;span class="na"&gt;endpoint&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;item-attribute/1"&lt;/span&gt;
      &lt;span class="na"&gt;dependencies&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;start_process&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
      &lt;span class="na"&gt;task_group_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;extract_data&lt;/span&gt;
    &lt;span class="na"&gt;save_items_attribute_data&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;operator&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator&lt;/span&gt;
        &lt;span class="na"&gt;aws_conn_id&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;aws_default&lt;/span&gt;
        &lt;span class="na"&gt;s3_bucket&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;cjmm-datalake-raw&lt;/span&gt;
        &lt;span class="na"&gt;s3_key&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;poke_api/items_attribute/data_{{&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;ts&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;}}.json"&lt;/span&gt;
        &lt;span class="na"&gt;data&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;{{&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;ti.xcom_pull(task_ids='get_items_attribute_data')&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;}}"&lt;/span&gt;
        &lt;span class="na"&gt;dependencies&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;get_items_attribute_data&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
        &lt;span class="na"&gt;task_group_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;save_data&lt;/span&gt;
        &lt;span class="na"&gt;outlets&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
          &lt;span class="na"&gt;file&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;/opt/airflow/dags/dags_config/datasets_config.yml&lt;/span&gt;
          &lt;span class="na"&gt;datasets&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;dataset_poke_items_attribute'&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;A highlight is the configuration of the Datasets, done through the &lt;strong&gt;outlets&lt;/strong&gt; tag added to the tasks &lt;strong&gt;save_items_data&lt;/strong&gt; and &lt;strong&gt;save_items_attribute_data&lt;/strong&gt;.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;

&lt;span class="na"&gt;outlets&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
   &lt;span class="na"&gt;file&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;/opt/airflow/dags/dags_config/datasets_config.yml&lt;/span&gt;
   &lt;span class="na"&gt;datasets&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;dataset_poke_items_attribute'&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;In this configuration, we specify the path of the file where all Datasets are centrally declared for reuse, and the names of the datasets from that file that the task should update.&lt;/p&gt;
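
&lt;p&gt;For reference, here is a minimal sketch of what this &lt;strong&gt;outlets&lt;/strong&gt; entry corresponds to in plain Airflow code (assuming Airflow 2.4+ Datasets). In the article's setup the actual task creation from the YAML is handled by the customized DAG Factory code, so this block is only for comparison; the DAG id and start date below are illustrative placeholders, while the task parameters and URI mirror the YAML above and the &lt;em&gt;datasets_config.yml&lt;/em&gt; file shown next:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator

# Dataset URI taken from the datasets_config.yml file shown next
dataset_poke_items = Dataset("s3://cjmm-datalake-raw/poke_api/items/*.json")

# Illustrative producer DAG wrapper (dag_id and start_date are placeholders)
with DAG(dag_id="producer_dag_example", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    save_items_data = S3CreateObjectOperator(
        task_id="save_items_data",
        aws_conn_id="aws_default",
        s3_bucket="cjmm-datalake-raw",
        s3_key="poke_api/items/data_{{ ts }}.json",
        data="{{ ti.xcom_pull(task_ids='get_items_data') }}",
        outlets=[dataset_poke_items],  # marks the Dataset as updated when this task succeeds
    )
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;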

&lt;p&gt;Below is the &lt;strong&gt;&lt;em&gt;datasets_config.yml&lt;/em&gt;&lt;/strong&gt; file used in this example, containing each Dataset's name (used only within Airflow) and its URI, which is the path where the actual data is stored, in this case Amazon S3.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;

&lt;span class="na"&gt;datasets&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;dataset_poke_items_attribute&lt;/span&gt;
    &lt;span class="na"&gt;uri&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;s3://cjmm-datalake-raw/poke_api/items_attribute/*.json&lt;/span&gt;
  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;dataset_poke_items&lt;/span&gt;
    &lt;span class="na"&gt;uri&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;s3://cjmm-datalake-raw/poke_api/items/*.json&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The resulting DAG visualization in Airflow will look like this:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fin7dnxif88laqiix2tw5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fin7dnxif88laqiix2tw5.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Consumer DAG for Data
&lt;/h3&gt;

&lt;p&gt;Now, let's build the consumer DAG, which processes and handles the data exposed through the datasets.&lt;/p&gt;

&lt;p&gt;The DAG is scheduled based on datasets, not on an execution time, so it will only be triggered when all the datasets it depends on are updated.&lt;/p&gt;

&lt;p&gt;Currently, we cannot use two types of scheduling simultaneously; it's either through a schedule interval or datasets.&lt;/p&gt;

&lt;p&gt;In this example, we'll only build a DAG with &lt;strong&gt;PythonOperator&lt;/strong&gt;, simulating the consumption and processing of the produced data.&lt;/p&gt;

&lt;p&gt;Below is the configuration file for the consumer DAG:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;

&lt;span class="na"&gt;process_data_api_dataset_consumer_dag&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;description&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Example&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;DAG&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;consumer&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;custom&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;config&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;datasets"&lt;/span&gt;
  &lt;span class="na"&gt;schedule&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;file&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;/opt/airflow/dags/dags_config/datasets_config.yml&lt;/span&gt;
    &lt;span class="na"&gt;datasets&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;dataset_poke_items'&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;dataset_poke_items_attribute'&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
  &lt;span class="na"&gt;tasks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;start_process&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;operator&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;airflow.operators.dummy.DummyOperator&lt;/span&gt;
    &lt;span class="na"&gt;process_data&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;operator&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;airflow.operators.python_operator.PythonOperator&lt;/span&gt;
      &lt;span class="na"&gt;python_callable_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;process_data_function&lt;/span&gt;
      &lt;span class="na"&gt;python_callable_file&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;/opt/airflow/dags/process_data.py&lt;/span&gt;
      &lt;span class="na"&gt;task_group_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;etl_data&lt;/span&gt;
      &lt;span class="na"&gt;provide_context&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;
      &lt;span class="na"&gt;dependencies&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;start_process&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;A highlight is the configuration of the &lt;strong&gt;schedule&lt;/strong&gt; based on datasets, which is similar to the configuration of the &lt;strong&gt;outlets&lt;/strong&gt; in the producer DAG:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;

&lt;span class="na"&gt;schedule&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
   &lt;span class="na"&gt;file&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;/opt/airflow/dags/dags_config/datasets_config.yml&lt;/span&gt;
   &lt;span class="na"&gt;datasets&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;dataset_poke_items'&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;dataset_poke_items_attribute'&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
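
&lt;p&gt;Again for reference, a minimal sketch of what this dataset-based &lt;strong&gt;schedule&lt;/strong&gt; corresponds to in plain Airflow code (assuming Airflow 2.4+; the DAG id and URIs mirror the YAML configuration above, and the start date is illustrative):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset

dataset_poke_items = Dataset("s3://cjmm-datalake-raw/poke_api/items/*.json")
dataset_poke_items_attribute = Dataset("s3://cjmm-datalake-raw/poke_api/items_attribute/*.json")

# The DAG runs only after BOTH datasets have been updated by the producer DAG;
# there is no cron expression or schedule interval involved.
with DAG(
    dag_id="process_data_api_dataset_consumer_dag",
    start_date=datetime(2023, 1, 1),
    schedule=[dataset_poke_items, dataset_poke_items_attribute],
) as dag:
    ...
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;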

&lt;p&gt;The resulting DAG visualization in Airflow will be as follows:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7siy1n9890elnhz15jq2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7siy1n9890elnhz15jq2.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Overview of DAGs with Datasets
&lt;/h3&gt;

&lt;p&gt;When we have DAGs using Airflow's datasets, we can observe some interesting points:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The consumer DAG in the list of all DAGs is flagged to indicate scheduling based on datasets.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsw1wyg3291undieu8sdf.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsw1wyg3291undieu8sdf.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;There is a dedicated view in the Airflow menu called Datasets, where you can check the configured datasets, the dependencies between DAGs, and the log of dataset creation, update, and consumption.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8zq7wm16ydt546y9ahil.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8zq7wm16ydt546y9ahil.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The DAG Dependencies visualization shows the relationships between the DAGs, providing a helpful overview of the processing mesh and data dependencies.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fv1x9kha2o0won4kc6le7.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fv1x9kha2o0won4kc6le7.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Important Points about Datasets
&lt;/h2&gt;

&lt;p&gt;The functionality of Datasets in Airflow is still recent, and there are many improvements in the community backlog. However, I would like to highlight some points at this moment:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Currently, Airflow's dataset functionality does not directly inspect the physical file itself. Instead, it schedules the consumer pipeline directly through the database, almost like an implicit DAG Trigger.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Considering the previous point, it's better to use a Sensor if you genuinely need to "see and access" the data when triggering the DAG Consumer (see the sketch after this list).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The official documentation does not recommend using regular expressions in the URI of datasets. However, in my tests, I didn't encounter any issues with this, as the functionality doesn't yet look directly at the physical file.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Since the DAG Consumer doesn't have a specific schedule, it's challenging to measure if it was triggered at a planned time, making it difficult to define an SLA. A more refined monitoring approach is needed to avoid missing critical scheduling.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
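
&lt;p&gt;Regarding the second point, if the consumer DAG really needs to confirm that the files exist before processing them, a sensor can be added as its first task. Below is a minimal, hypothetical sketch using the Amazon provider's &lt;strong&gt;S3KeySensor&lt;/strong&gt;; the task name and timing values are illustrative only:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# Waits until at least one items file is actually present in the raw bucket
wait_for_items_file = S3KeySensor(
    task_id="wait_for_items_file",
    aws_conn_id="aws_default",
    bucket_name="cjmm-datalake-raw",
    bucket_key="poke_api/items/*.json",
    wildcard_match=True,   # allow the * wildcard in bucket_key
    poke_interval=60,      # check every minute
    timeout=60 * 30,       # give up after 30 minutes
)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;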

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;By using the DAG Factory library, we simplify the process of creating and configuring new DAGs, leveraging the extensibility provided by the library's open-source code.&lt;/p&gt;

&lt;p&gt;Airflow's Datasets enable more efficient scheduling by triggering DAGs only when necessary data is available, avoiding unnecessary and delayed executions.&lt;/p&gt;

&lt;p&gt;I hope this article has been useful in understanding Airflow's Datasets functionality and how to apply it to your projects. With this approach, you can build more robust and efficient pipelines, fully utilizing Airflow's potential.&lt;/p&gt;

&lt;p&gt;Follow me:&lt;br&gt;
LinkedIn: &lt;a href="https://www.linkedin.com/in/cicero-moura/" rel="noopener noreferrer"&gt;https://www.linkedin.com/in/cicero-moura/&lt;/a&gt;&lt;br&gt;
GitHub: &lt;a href="https://github.com/cicerojmm" rel="noopener noreferrer"&gt;https://github.com/cicerojmm&lt;/a&gt;&lt;/p&gt;

</description>
      <category>airflow</category>
      <category>dagfactory</category>
      <category>aws</category>
      <category>dataengineering</category>
    </item>
    <item>
      <title>Data Quality at Scale with Great Expectations, Spark, and Airflow on EMR</title>
      <dc:creator>Cícero Joasyo Mateus de Moura</dc:creator>
      <pubDate>Mon, 24 Apr 2023 13:56:19 +0000</pubDate>
      <link>https://dev.to/aws-builders/data-quality-at-scale-with-great-expectations-spark-and-airflow-on-emr-5bnm</link>
      <guid>https://dev.to/aws-builders/data-quality-at-scale-with-great-expectations-spark-and-airflow-on-emr-5bnm</guid>
      <description>&lt;p&gt;Data quality is one of the biggest challenges that companies face nowadays, as it's necessary to ensure that the data is accurate, reliable, and relevant so that the decisions made based on this data are successful.&lt;/p&gt;

&lt;p&gt;In this regard, we have seen several trends emerge, such as the Modern Data Stack, which treats data quality as one of its main practices.&lt;/p&gt;

&lt;p&gt;The &lt;a href="https://www.moderndatastack.xyz/" rel="noopener noreferrer"&gt;Modern Data Stack (MDS)&lt;/a&gt; is a set of tools and technologies that help companies store, manage, and learn from their data quickly and efficiently. Concepts such as Data Quality and Data Observability are highlights of the MDS.&lt;/p&gt;

&lt;p&gt;This article aims to explore Great Expectations, a data validation tool contained within the MDS, which can be used in conjunction with Spark to ensure data quality at scale.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;The code used in this article can be found in &lt;a href="https://github.com/cicerojmm/dataQualityGreatExpectationsSpark" rel="noopener noreferrer"&gt;this repository&lt;/a&gt;.&lt;br&gt;
The link to the data docs generated by Great Expectations can also be &lt;a href="http://datadocs-greatexpectations.cjmm.s3-website-us-east-1.amazonaws.com/" rel="noopener noreferrer"&gt;accessed here&lt;/a&gt;.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h3&gt;
  
  
  Great Expectations
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://greatexpectations.io/" rel="noopener noreferrer"&gt;Great Expectations&lt;/a&gt; (GE) is an &lt;a href="https://github.com/great-expectations/great_expectations" rel="noopener noreferrer"&gt;open-source&lt;/a&gt; data validation tool that helps ensure data quality.&lt;/p&gt;

&lt;p&gt;With Great Expectations, it's possible to define expectations about your data and check whether they meet them or not.&lt;/p&gt;

&lt;p&gt;Some of the existing functionalities include the ability to validate data schema, ensure referential integrity, check consistency, and detect anomalies.&lt;/p&gt;

&lt;p&gt;GE is very flexible and scalable, allowing integration into our data pipelines, whether to validate data, generate reports, or even prevent the pipeline from advancing and recording inconsistent data in the most "curated" layers of the Data Lake.&lt;/p&gt;

&lt;p&gt;Some points that we can highlight:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;it's possible to create tests for your data directly from a Spark or Pandas dataframe (see the quick sketch after this list);&lt;/li&gt;
&lt;li&gt;it's possible to create data documentation in HTML including expectation suites and validation reports;&lt;/li&gt;
&lt;li&gt;it's possible to save a set of tests (suite) to be used later (&lt;a href="https://docs.greatexpectations.io/docs/terms/checkpoint/" rel="noopener noreferrer"&gt;checkpoints&lt;/a&gt;);&lt;/li&gt;
&lt;li&gt;we can use a large number of ready-made expectations or easily create &lt;a href="https://docs.greatexpectations.io/docs/guides/expectations/creating_custom_expectations/overview" rel="noopener noreferrer"&gt;custom expectations&lt;/a&gt; that meet our test cases;&lt;/li&gt;
&lt;li&gt;it has a &lt;a href="https://docs.greatexpectations.io/docs/guides/miscellaneous/how_to_use_the_great_expectations_cli/" rel="noopener noreferrer"&gt;CLI&lt;/a&gt; that simplifies the creation of test cases, or we can generate tests by coding in Python;&lt;/li&gt;
&lt;li&gt;it's possible to connect directly to &lt;a href="https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/connect_to_data_overview" rel="noopener noreferrer"&gt;data source origins&lt;/a&gt;, consequently validating data more quickly.&lt;/li&gt;
&lt;/ul&gt;
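
&lt;p&gt;As a quick taste of the first point above, here is a minimal sketch using the classic Dataset API (the same family as the &lt;strong&gt;SparkDFDataset&lt;/strong&gt; used later in this article); the tiny dataframe is illustrative only:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;
import great_expectations as ge
import pandas as pd

# Illustrative data: one product_id is missing
df = pd.DataFrame({"product_id": ["P1", "P2", None], "rating": [4.5, 3.9, 5.0]})
df_ge = ge.from_pandas(df)

result = df_ge.expect_column_values_to_not_be_null("product_id")
print(result.success)  # False, because of the null product_id
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;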

&lt;h3&gt;
  
  
  Practical case with Great Expectations
&lt;/h3&gt;

&lt;p&gt;In this article, we present a scenario close to what we encounter in our daily work, so we will work with the following case:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;We have data stored in a Data Lake located on AWS S3 and need to verify data quality before the business makes critical decisions based on it.&lt;br&gt;
The dataset used is about product sales from an e-commerce website (Amazon) and tells us a lot about the behavior of that store's customers.&lt;br&gt;
The dataset used in this article is open and can be found on Kaggle at &lt;a href="https://www.kaggle.com/datasets/karkavelrajaj/amazon-sales-dataset" rel="noopener noreferrer"&gt;this link&lt;/a&gt;.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h3&gt;
  
  
  Using Great Expectations with Spark on EMR
&lt;/h3&gt;

&lt;p&gt;This article will use Great Expectations with Spark to execute the test cases.&lt;br&gt;
The Spark environment will run on EMR, and Airflow will orchestrate the jobs.&lt;/p&gt;

&lt;p&gt;To facilitate the understanding of the process, we will analyze the architecture design below:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Frrcbviv8tiiw6x82amtj.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Frrcbviv8tiiw6x82amtj.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We can highlight the following points:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;the Spark code containing all the logic to execute GE with Spark will be stored in S3;&lt;/li&gt;
&lt;li&gt;the data is also stored in S3 in CSV format;&lt;/li&gt;
&lt;li&gt;the generated data docs will also be stored in an S3 bucket configured for a &lt;a href="https://docs.aws.amazon.com/AmazonS3/latest/userguide/WebsiteHosting.html" rel="noopener noreferrer"&gt;static site&lt;/a&gt;;&lt;/li&gt;
&lt;li&gt;Airflow will orchestrate the EMR and control the lifecycle of the jobs.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  1. Creation of Spark script with Great Expectations
&lt;/h3&gt;

&lt;p&gt;To create the Spark script that contains the test cases, we will divide it into some steps, as follows:&lt;/p&gt;

&lt;h4&gt;
  
  
  Configuration of the context
&lt;/h4&gt;

&lt;p&gt;The GE context holds the main configurations used to run the tests.&lt;/p&gt;

&lt;p&gt;The following code configures the context through a YAML definition declared as a Python string.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="n"&gt;datasource_yaml&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    name: my_spark_datasource
    class_name: Datasource
    module_name: great_expectations.datasource
    execution_engine:
        module_name: great_expectations.execution_engine
        class_name: SparkDFExecutionEngine
    data_connectors:
        my_runtime_data_connector:
            class_name: RuntimeDataConnector
            batch_identifiers:
                - some_key_maybe_pipeline_stage
                - some_other_key_maybe_airflow_run_id
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;create_context_ge&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;output_path&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
  &lt;span class="n"&gt;context&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;ge&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get_context&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

  &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_expectation_suite&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="n"&gt;expectation_suite_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;suite_name&lt;/span&gt;
  &lt;span class="p"&gt;)&lt;/span&gt;

  &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_datasource&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;yaml&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;load&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;datasource_yaml&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
  &lt;span class="nf"&gt;config_data_docs_site&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;output_path&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;



&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;This context configuration essentially tells GE that Spark will execute the tests; in another scenario it could be a different engine, such as Pandas.&lt;/p&gt;

&lt;h4&gt;
  
  
  Configuring Data Docs
&lt;/h4&gt;

&lt;p&gt;An important point is setting where our &lt;strong&gt;data docs&lt;/strong&gt; will be saved. By default, the HTML documentation is generated on the local disk, but for this article, the data docs will be &lt;a href="https://docs.greatexpectations.io/docs/guides/setup/configuring_data_docs/how_to_host_and_share_data_docs_on_amazon_s3/" rel="noopener noreferrer"&gt;stored and hosted by S3&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;The destination bucket (&lt;strong&gt;output_path&lt;/strong&gt;) is a parameter in the following code, so the script becomes more dynamic and customizable.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;config_data_docs_site&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;output_path&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;data_context_config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;DataContextConfig&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="n"&gt;data_context_config&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data_docs_sites&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s3_site&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;class_name&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;SiteBuilder&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;store_backend&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;class_name&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;TupleS3StoreBackend&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;bucket&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;output_path&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;replace&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s3://&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;""&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="p"&gt;},&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;site_index_builder&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;class_name&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;DefaultSiteIndexBuilder&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_project_config&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data_docs_sites&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;data_context_config&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data_docs_sites&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;



&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h4&gt;
  
  
  Creation of a Validator
&lt;/h4&gt;

&lt;p&gt;Before adding the test cases, we must configure a &lt;strong&gt;Validator&lt;/strong&gt;, which receives the data to be tested through a &lt;strong&gt;Batch Request&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;The Validator comes with built-in data validation functions, as we will see later, which makes creating test cases much easier and more intuitive.&lt;/p&gt;

&lt;p&gt;The code below configures and creates the Validator using the context of our tests and the dataframe containing the data for validation.&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;create_validator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;suite&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;runtime_batch_request&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;RuntimeBatchRequest&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;datasource_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;my_spark_datasource&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;data_connector_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;my_runtime_data_connector&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;data_asset_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;insert_your_data_asset_name_here&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;runtime_parameters&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;batch_data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
        &lt;span class="n"&gt;batch_identifiers&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;some_key_maybe_pipeline_stage&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ingestion step 1&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;some_other_key_maybe_airflow_run_id&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;run 18&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;df_validator&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Validator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get_validator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;batch_request&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;runtime_batch_request&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;expectation_suite&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;suite&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;df_validator&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h4&gt;
  
  
  Creating Test Cases
&lt;/h4&gt;

&lt;p&gt;The most awaited moment has arrived: creating the test cases.&lt;/p&gt;

&lt;p&gt;At this stage, the objective is to work with two scenarios of test cases: the first is to run a data profile, and the second is to add custom test cases according to business needs.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;strong&gt;Data Profile&lt;/strong&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Data profiling is the process of examining, analyzing, reviewing, and summarizing datasets to obtain information about the quality of the data.&lt;/p&gt;

&lt;p&gt;GE allows you to create a data profile automatically and very simply.&lt;/p&gt;

&lt;p&gt;In this &lt;strong&gt;profile&lt;/strong&gt;, information will be generated for all data columns, including tests to check for null values, data types, and the most frequent pattern in each column.&lt;/p&gt;

&lt;p&gt;To create a data profile and add it to the test context, you just need to have the following code:&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;add_profile_suite&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;df_ge&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;profiler&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;BasicDatasetProfiler&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;expectation_suite&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;validation_result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;profiler&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;profile&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df_ge&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;save_expectation_suite&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;expectation_suite&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;suite_profile_name&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;An important point is that the profile is executed through a Spark object created by GE (&lt;strong&gt;df_ge&lt;/strong&gt;), which we will see later. This differs from the other test cases added next, which are based on the Validator object created in the previous step.&lt;/p&gt;

&lt;p&gt;Another point to highlight is that one suite name was used for the profile and another for the validator tests, so they are kept separate in the data docs, which helps keep the documentation organized.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;strong&gt;Test cases&lt;/strong&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Now just add the test cases as needed for data validation.&lt;/p&gt;

&lt;p&gt;The following code adds the following tests:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Validate if &lt;strong&gt;all desired columns&lt;/strong&gt; are in the dataset;&lt;/li&gt;
&lt;li&gt;Validate if the &lt;strong&gt;product_id&lt;/strong&gt; field has unique and non-null values;&lt;/li&gt;
&lt;li&gt;Validate if the &lt;strong&gt;discount_percentage&lt;/strong&gt; field contains only values between 0 and 100;&lt;/li&gt;
&lt;li&gt;Validate if the &lt;strong&gt;rating&lt;/strong&gt; field contains only values between 0 and 5;&lt;/li&gt;
&lt;li&gt;Validate if the &lt;strong&gt;product_link&lt;/strong&gt; field contains only data with a valid link format, using a regex to validate the pattern.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;After adding all desired test cases, save the test suite's expectations:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;add_tests_suite&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df_validator&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;columns_list&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;product_id&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;product_name&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;category&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;discounted_price&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;actual_price&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;discount_percentage&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;rating&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;rating_count&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;about_product&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;user_id&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;user_name&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;review_id&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;review_title&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;review_content&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;img_link&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;product_link&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;

    &lt;span class="n"&gt;df_validator&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;expect_table_columns_to_match_ordered_list&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;columns_list&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;df_validator&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;expect_column_values_to_be_unique&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;product_id&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;df_validator&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;expect_column_values_to_not_be_null&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;product_id&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;df_validator&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;expect_column_values_to_be_between&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;column&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;discount_percentage&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;min_value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;max_value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;df_validator&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;expect_column_values_to_be_between&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;column&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;rating&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;min_value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;max_value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;df_validator&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;expect_column_values_to_match_regex&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;column&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;product_link&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;regex&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sa"&gt;r&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;^https:\/\/www\.[a-zA-Z0-9\-\.]+\.[a-zA-Z]{2,}$&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;mostly&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.9&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;df_validator&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;save_expectation_suite&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;discard_failed_expectations&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;df_validator&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h4&gt;
  
  
  Running the tests
&lt;/h4&gt;

&lt;p&gt;Now it's time to connect all the dots.&lt;/p&gt;

&lt;p&gt;The code below is the main function that will be called by Spark. It reads the data we want and invokes the other functions we discussed earlier to set up and execute the test suites:&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;process_suite_ge&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;input_path&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;output_path&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;path_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;join&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input_path&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;sales&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;amazon.csv&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;csv&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;header&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;true&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;load&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;path_data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;df_ge&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;SparkDFDataset&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;context&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;create_context_ge&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;output_path&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;suite&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;ExpectationSuite&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get_expectation_suite&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;expectation_suite_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;suite_name&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="nf"&gt;add_profile_suite&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;df_ge&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;df_validator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;create_validator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;suite&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;df_validator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;add_tests_suite&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df_validator&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;results&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df_validator&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;validate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;expectation_suite&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;suite&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;build_data_docs&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;site_names&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;s3_site&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;results&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;success&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]:&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;The test suite run successfully: &lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
              &lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;results&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;success&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]))&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Validation action if necessary&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;



&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h3&gt;
  
  
  2. Creating the DAG in Airflow
&lt;/h3&gt;

&lt;p&gt;In this step, it's time to create a DAG in Airflow to run &lt;a href="https://github.com/cicerojmm/dataQualityGreatExpectationsSpark/tree/main/airflow/dags" rel="noopener noreferrer"&gt;the tests with GE inside the EMR with Spark&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;We will have the following tasks in our DAG:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;create_emr: task responsible for creating the EMR for job execution. Remember to configure the connection with AWS (aws_default) or IAM if you're running Airflow on AWS. &lt;a href="https://github.com/cicerojmm/dataQualityGreatExpectationsSpark/blob/main/airflow/dags/emr_config.json" rel="noopener noreferrer"&gt;EMR configurations can be found in the project repository&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;add_step: responsible for adding a job to the EMR (step). We will see the configuration of this job (&lt;strong&gt;spark-submit&lt;/strong&gt;) later on.&lt;/li&gt;
&lt;li&gt;watch_step: an Airflow &lt;a href="https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/sensors.html" rel="noopener noreferrer"&gt;sensor&lt;/a&gt; responsible for monitoring the status of the previous job until it is completed, either successfully or with failure.&lt;/li&gt;
&lt;li&gt;terminate_emr: after the job is finished, this task terminates the EMR instance allocated for running the tests.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Below is the code for the DAG:&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="n"&gt;create_emr&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;EmrCreateJobFlowOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;create_emr&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;aws_conn_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;aws_default&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;job_flow_overrides&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;JOB_FLOW_OVERRIDES&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dag&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;add_step&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;EmrAddStepsOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;add_step&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;job_flow_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;{{ task_instance.xcom_pull(task_ids=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;create_emr&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;, key=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;return_value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;) }}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;steps&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;STEPS_EMR&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dag&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="n"&gt;watch_step&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;EmrStepSensor&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;watch_step&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;job_flow_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;{{ task_instance.xcom_pull(task_ids=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;create_emr&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;, key=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;return_value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;) }}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;step_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;{{ task_instance.xcom_pull(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;add_step&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;, key=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;return_value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;)[0] }}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;aws_conn_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;aws_default&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;terminate_emr&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;EmrTerminateJobFlowOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;terminate_emr&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;job_flow_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;{{ task_instance.xcom_pull(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;create_emr&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;, key=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;return_value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;) }}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;aws_conn_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;aws_default&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;trigger_rule&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;TriggerRule&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ALL_DONE&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;create_emr&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;add_step&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;watch_step&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;terminate_emr&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Now I'll detail the configuration of the job that will be added to EMR to run the tests, which is basically a &lt;strong&gt;spark-submit&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;We can check all the settings in the code below, including the script parameters.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="n"&gt;args&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;job_name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;process_suite_ge&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;input_path&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s3://cjmm-datalake-raw&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
           &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;output_path&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s3://datadocs-greatexpectations.cjmm&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;

&lt;span class="n"&gt;STEPS_EMR&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[{&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Run Data Quality with Great Expectations&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;ActionOnFailure&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;CONTINUE&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;HadoopJarStep&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Jar&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;command-runner.jar&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Args&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;/usr/bin/spark-submit&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--deploy-mode&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;client&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--master&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;yarn&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--num-executors&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;2&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--executor-cores&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;2&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--py-files&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s3://cjmm-code-spark/data_quality/modules.zip&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s3://cjmm-code-spark/data_quality/main.py&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;args&lt;/span&gt;
            &lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}]&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;It's important to highlight that the code executed by Spark is stored in S3: both the &lt;a href="https://github.com/cicerojmm/dataQualityGreatExpectationsSpark/blob/main/script_pyspark_emr/main.py" rel="noopener noreferrer"&gt;main.py&lt;/a&gt; file, which calls the other functions, and the &lt;a href="https://github.com/cicerojmm/dataQualityGreatExpectationsSpark/tree/main/script_pyspark_emr/modules" rel="noopener noreferrer"&gt;modules.zip&lt;/a&gt; file, which contains all the logic for the tests.&lt;/p&gt;

&lt;p&gt;This code structure was adopted to keep the job scalable and easier to maintain, and it also lets us run Spark in either client or cluster mode.&lt;/p&gt;
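
&lt;p&gt;To make this pattern concrete, below is a minimal sketch of what an entry point like main.py could look like. It is not the project's actual code (that lives in the repository linked above): the modules import and the suites.run helper are hypothetical, but the argument handling matches the stringified dict passed by the EMR step.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;
import ast
import sys

from pyspark.sql import SparkSession

# Hypothetical module shipped inside modules.zip (distributed via --py-files)
from modules import suites


def main():
    # The EMR step passes the parameters as a single stringified dict, e.g.
    # "{'job_name': 'process_suite_ge', 'input_path': 's3://...', 'output_path': 's3://...'}"
    args = ast.literal_eval(sys.argv[1])

    spark = SparkSession.builder.appName(args["job_name"]).getOrCreate()

    # Assuming the raw layer is CSV; adjust the reader to the actual file format
    df = spark.read.csv(args["input_path"], header=True, inferSchema=True)

    # Run the Great Expectations suites implemented in modules.zip and
    # publish the resulting data docs to the output bucket
    suites.run(spark, df, output_path=args["output_path"])

    spark.stop()


if __name__ == "__main__":
    main()

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;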

&lt;h3&gt;
  
  
  3. Executing the Script on EMR
&lt;/h3&gt;

&lt;p&gt;With the script developed and the Airflow DAG created, we can now run the tests.&lt;br&gt;
Below is an example of the Airflow DAG that ran successfully:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fbbtrvpyr89v15gbr360u.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fbbtrvpyr89v15gbr360u.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The following image shows more details about the job that executed successfully on EMR:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flssza6849ua6qxdvgqyi.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flssza6849ua6qxdvgqyi.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  4. Results
&lt;/h3&gt;

&lt;p&gt;Now it's time to analyze the two results produced by the executed tests.&lt;br&gt;
The first is the data docs files saved in the S3 bucket, as shown in the following image:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ftht9bcpy2h5k1nwrwrw1.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ftht9bcpy2h5k1nwrwrw1.jpg" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The second is accessing the rendered data docs themselves, as shown below:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvhs5ncomed5ny2x0kgoe.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvhs5ncomed5ny2x0kgoe.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Remember that the data docs created in this article can be accessed at &lt;a href="http://datadocs-greatexpectations.cjmm.s3-website-us-east-1.amazonaws.com/" rel="noopener noreferrer"&gt;this link&lt;/a&gt;.&lt;/p&gt;
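
&lt;p&gt;These data docs end up in that bucket because the Great Expectations context points its stores at S3. As a reference, a minimal sketch of such a configuration, based on the Great Expectations guide for EMR Spark clusters (class names may differ between GE versions), could look like this:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    S3StoreBackendDefaults,
)

# Keep expectations, validation results and data docs in the S3 bucket
# used throughout this article (older, 0.15.x-style API shown here)
config = DataContextConfig(
    store_backend_defaults=S3StoreBackendDefaults(
        default_bucket_name="datadocs-greatexpectations.cjmm"
    )
)

context = BaseDataContext(project_config=config)

# After running the validations, regenerate the static HTML site
context.build_data_docs()

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;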

&lt;p&gt;When accessing the suite with the data &lt;strong&gt;profile&lt;/strong&gt;, we have the following result:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fzelrdu15dnfru73579yp.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fzelrdu15dnfru73579yp.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;And when accessing the suite with the created &lt;strong&gt;test cases&lt;/strong&gt;, we have the result below:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5wl8eo4dxs4pd75lcmit.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5wl8eo4dxs4pd75lcmit.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Conclusion
&lt;/h3&gt;

&lt;p&gt;Great Expectations is one of the fastest-growing open-source data quality tools, with a highly active community, frequent updates, and adoption by several large companies worldwide.&lt;br&gt;
With GE, we can easily create test cases for a variety of scenarios, adapt them to different datasets, and customize tests for our own use cases.&lt;/p&gt;

&lt;p&gt;In addition to producing statistical results for each test, which we can save and use as needed, it also generates ready-to-use HTML data docs full of helpful information about data quality.&lt;/p&gt;

&lt;p&gt;Great Expectations is an excellent tool that is easy to integrate and manage. It builds on concepts we already know from the Big Data world, so it is worth trying it out and using it day to day to mature your Data Governance and Data Quality monitoring.&lt;/p&gt;

&lt;p&gt;Remember:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;More than having data available for analysis, it is essential to ensure its quality.&lt;/p&gt;
&lt;/blockquote&gt;

</description>
      <category>aws</category>
      <category>programming</category>
      <category>greatexpectations</category>
      <category>tutorial</category>
    </item>
  </channel>
</rss>
