<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Joseph</title>
    <description>The latest articles on DEV Community by Joseph (@start_data_engineering).</description>
    <link>https://dev.to/start_data_engineering</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F383548%2F53158f43-d7a4-4bbc-a6c6-9579e3186a94.png</url>
      <title>DEV Community: Joseph</title>
      <link>https://dev.to/start_data_engineering</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/start_data_engineering"/>
    <language>en</language>
    <item>
      <title>A proven approach to land a Data Engineering job</title>
      <dc:creator>Joseph</dc:creator>
      <pubDate>Wed, 03 Jun 2020 03:58:43 +0000</pubDate>
      <link>https://dev.to/start_data_engineering/a-proven-approach-to-land-a-data-engineering-job-p64</link>
      <guid>https://dev.to/start_data_engineering/a-proven-approach-to-land-a-data-engineering-job-p64</guid>
      <description>&lt;p&gt;I have seen and been asked the following questions by students, backend engineers and analysts who want to get into the data engineering industry.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;What approach should I take to land a Data Engineering job?&lt;/p&gt;

&lt;p&gt;I really want to get into DE. What can I do to learn more about it?&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;In this article, I will try to provide a general approach that you as a beginner, student, backend engineer or analyst can use to land your first data engineering job. This is the approach I followed to get my first data engineering job.&lt;/p&gt;

&lt;h2&gt;
  
  
  What is a Data Engineer?
&lt;/h2&gt;

&lt;p&gt;Before we try to solve a problem, we should define it clearly. The job definition of a &lt;code&gt;data engineer&lt;/code&gt; varies widely depending on the company and team, and many backend engineers are effectively data engineers. For our purposes, we define a data engineer purely by skills, not by job title.&lt;/p&gt;

&lt;p&gt;Based on researching the popular job boards, understanding the current direction of the data engineering industry, and interviewing and hiring data engineers, we can define a beginner data engineer as someone with the skills shown below.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Knowledge of a scripting language such as &lt;code&gt;python&lt;/code&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;In-depth knowledge of OLTP data modeling and when to use it, such as &lt;code&gt;normalization&lt;/code&gt;, &lt;code&gt;indexes&lt;/code&gt;, etc.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;In-depth knowledge of OLAP data modeling and when to use it, such as &lt;code&gt;star schema&lt;/code&gt;, &lt;code&gt;distribution key&lt;/code&gt;, &lt;code&gt;partitioning&lt;/code&gt;, etc.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Understanding of Unix-based systems and commands.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Knowledge of a distributed data store such as &lt;code&gt;HDFS&lt;/code&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Knowledge of a distributed batch data processing framework such as &lt;code&gt;Apache Spark&lt;/code&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Knowledge of a data pipeline orchestration tool such as &lt;code&gt;Apache Airflow&lt;/code&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Basic knowledge of a queuing system such as &lt;code&gt;Kafka&lt;/code&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;There is a more detailed list &lt;a href="https://dev.to/post/10-key-skills-data-engineer/"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;As you can see, it is an extensive list. This is in addition to knowing CS fundamentals such as the basics of web development, frontend and backend constructs, APIs and databases.&lt;/p&gt;

&lt;h2&gt;
  
  
  Starting point
&lt;/h2&gt;

&lt;p&gt;Now that we have defined what a &lt;code&gt;data engineer&lt;/code&gt; is, we can form a plan to get there. Your starting point may be different given your individual circumstances, but generally people who want to move into data engineering fall into one of the following categories:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Beginner (no knowledge of computers)&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Student (CS degree either undergrad or grad school)&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Backend/ Fullstack/ Frontend Engineer (FE, BE or another discipline that involves creating software)&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Data Analyst&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fsyk3i8wwu6izossi9t1v.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fsyk3i8wwu6izossi9t1v.png" alt="Path to data engineering"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  1. Beginner
&lt;/h3&gt;

&lt;p&gt;Everyone starts here. The best and surest way for someone here to get into &lt;code&gt;data engineering&lt;/code&gt; is to first get a job as a &lt;code&gt;Backend Engineer&lt;/code&gt; or &lt;code&gt;Fullstack Engineer&lt;/code&gt;. These jobs will give you the basic skills you will need to land a DE role. So how do you get into &lt;code&gt;Backend/Fullstack Engineering&lt;/code&gt;? There are 3 main approaches:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;College, CS degree - very long but tried-and-true; expensive, with a good chance of getting an engineering job (depending on the college and individual preparation)&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Coding bootcamp - short, a bit wild west but mostly a good approach; expensive, with an ok-ish chance of getting an engineering job (this may be changing)&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Self-learning - can take very long; without a mentor or someone to guide you this can get tricky, and the chance of getting an engineering job is not great&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Which route you choose will depend on your individual circumstances. Irrespective of the route you take, &lt;strong&gt;&lt;a href="https://leetcode.com/" rel="noopener noreferrer"&gt;Leetcode&lt;/a&gt; is crucial.&lt;/strong&gt; Make sure you know the commonly asked interview questions for the company you are interviewing at.&lt;/p&gt;

&lt;h3&gt;
  
  
  2. Student (CS)
&lt;/h3&gt;

&lt;p&gt;This is a good place to be in. You have good knowledge of computers, a few programming languages, what an &lt;code&gt;API&lt;/code&gt; is, algorithms, data structures, machine learning, &lt;code&gt;distributed systems&lt;/code&gt; and &lt;code&gt;operating systems&lt;/code&gt;. If you are here, you might be able to land a &lt;code&gt;junior data engineer&lt;/code&gt; role, but these roles are very rare. In order to land a good &lt;code&gt;Backend/Fullstack engineer&lt;/code&gt; role, you will need to:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Build a few projects (ideally 3 or more, or one big one), probably a CRUD-based web app with complex logic and a database. Make sure it actually works and that potential employers can try it out easily online. Adding a &lt;code&gt;design diagram&lt;/code&gt;, a description, and the why and how you built it as a &lt;code&gt;README.md&lt;/code&gt; on &lt;code&gt;github&lt;/code&gt; shows understanding of product requirements and clear communication.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;&lt;a href="https://leetcode.com/" rel="noopener noreferrer"&gt;Leetcode&lt;/a&gt; is crucial.&lt;/strong&gt; For better or worse, companies heavily rely on algorithms and data structure type questions to recruit engineers. Make sure you know the commonly asked interview questions for the company you are interviewing at.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  3. Backend / Fullstack / Frontend Engineer
&lt;/h3&gt;

&lt;p&gt;You might already be doing some data engineering work. Luckily, you have most of the skills needed to learn the rest. Side projects are great, but work experience weighs a lot more in hiring decisions. Here are some actions you can take to increase your chances of getting an interview:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;Take initiative and build a data pipeline at your current job. This can become a point on your resume which leads to interviews. For example &lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;If you are working on a &lt;code&gt;webapp&lt;/code&gt; at your job, then in your free time, build a simple data processing pipeline using &lt;code&gt;python&lt;/code&gt; and &lt;code&gt;cron&lt;/code&gt; to analyze the logs and find the places in your code base where the most errors occur. This has the potential to reduce bugs.&lt;/li&gt;
&lt;li&gt;If this provides valuable information, present it to your boss. Even if no new project comes of it immediately, building trust, showing interest and gaining recognition will make sure that when a new project comes in, you are the go-to person.&lt;/li&gt;
&lt;li&gt;Most companies these days are aware of the benefits of data and analytics, so keep trying to come up with and implement new ideas at work. This will lead to more valuable experience than side projects alone.&lt;/li&gt;
&lt;/ol&gt;


&lt;/li&gt;

&lt;li&gt;&lt;p&gt;Another interesting project may be capturing the changes in your database using &lt;code&gt;debezium&lt;/code&gt; and &lt;code&gt;kafka&lt;/code&gt;, &lt;a href="https://dev.to/post/change-data-capture-using-debezium-kafka-and-pg/"&gt;as shown here&lt;/a&gt;.&lt;/p&gt;&lt;/li&gt;

&lt;li&gt;&lt;p&gt;Sometimes it is difficult to implement a new project at work for various reasons. In such cases, try building a side project (&lt;a href="https://dev.to/post/data-engineering-project-for-beginners-batch-edition/"&gt;example&lt;/a&gt;) and make sure to write out a detailed &lt;code&gt;README.md&lt;/code&gt; on your &lt;code&gt;github&lt;/code&gt; repo and note the skills you learnt on your &lt;code&gt;linkedin&lt;/code&gt; profile for keyword based search discovery by recruiters.&lt;/p&gt;&lt;/li&gt;

&lt;/ol&gt;
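
&lt;p&gt;As a rough illustration of the log-analysis idea above, here is a minimal sketch in plain &lt;code&gt;python&lt;/code&gt;. The log lines and module names are hypothetical; a real pipeline would read your application's actual log files.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from collections import Counter

# Hypothetical log lines; in practice, read these from your log files.
log_lines = [
    "2020-06-01 ERROR auth.login failed to validate token",
    "2020-06-01 INFO cart.add item added",
    "2020-06-01 ERROR auth.login failed to validate token",
    "2020-06-01 ERROR payment.charge gateway timeout",
]

def error_counts_by_module(lines):
    """Count ERROR entries per module (the third whitespace-separated field)."""
    errors = Counter()
    for line in lines:
        parts = line.split()
        if len(parts) &gt; 2 and parts[1] == "ERROR":
            errors[parts[2]] += 1
    return errors

print(error_counts_by_module(log_lines).most_common())
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Scheduling a script like this with &lt;code&gt;cron&lt;/code&gt; turns it into a simple daily batch pipeline.&lt;/p&gt;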

&lt;p&gt;As I mention in the previous sections, &lt;strong&gt;&lt;a href="https://leetcode.com/" rel="noopener noreferrer"&gt;Leetcode&lt;/a&gt; is crucial&lt;/strong&gt; for interviews.&lt;/p&gt;

&lt;h3&gt;
  
  
  4. Data Analyst
&lt;/h3&gt;

&lt;p&gt;You are already in a good position to transition, but you will probably need to put in some work on the engineering side. You might already be using SQL to pull data from a data warehouse. You can:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Automate one data pull using python.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Schedule that data pull to run at a certain time every day using &lt;code&gt;cron&lt;/code&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Automate more data pulls. For complex data pulls, set up &lt;code&gt;Airflow&lt;/code&gt; and use it (&lt;a href="https://dev.to/post/data-engineering-project-for-beginners-batch-edition/"&gt;sample project&lt;/a&gt;).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Understand your data warehouse infrastructure (e.g. size of the warehouse cluster, partitions, how data is loaded, etc.).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;If you can, do some NLP or big data processing with &lt;code&gt;Apache Spark&lt;/code&gt; on &lt;code&gt;AWS EMR&lt;/code&gt; or &lt;code&gt;GCP Dataproc&lt;/code&gt;. This would be a great addition.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
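
&lt;p&gt;A minimal sketch of the first step, using &lt;code&gt;sqlite3&lt;/code&gt; as a stand-in for your warehouse connection (the &lt;code&gt;sales&lt;/code&gt; table and the query are made up for illustration; swap in your warehouse's driver and credentials):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import csv
import sqlite3

def pull_daily_revenue(conn, out_path):
    """Run a (hypothetical) aggregate query and dump the result to a CSV file."""
    rows = conn.execute(
        "SELECT country, SUM(amount) FROM sales GROUP BY country ORDER BY country"
    ).fetchall()
    with open(out_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["country", "revenue"])
        writer.writerows(rows)
    return len(rows)

# Stand-in warehouse with some sample data
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (country TEXT, amount REAL)")
conn.executemany("INSERT INTO sales VALUES (?, ?)",
                 [("IN", 7.5), ("US", 10.0), ("US", 5.0)])
print(pull_daily_revenue(conn, "daily_revenue.csv"))  # number of rows written
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;For the second step, a crontab entry such as &lt;code&gt;0 6 * * * python /path/to/pull_daily_revenue.py&lt;/code&gt; (a hypothetical path) would run the pull every morning at 6 AM.&lt;/p&gt;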

&lt;p&gt;As I mention in the previous sections, &lt;strong&gt;&lt;a href="https://leetcode.com/" rel="noopener noreferrer"&gt;Leetcode&lt;/a&gt; is crucial&lt;/strong&gt; for interviews.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;h3&gt;
  
  
  &lt;code&gt;TL;DR&lt;/code&gt;
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Research the data engineering job postings.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Use the technical requirements as a base to build projects, at work or on your own, that actually help your company or you in some way.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Put those points as experience on your resume.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Practice interview questions on &lt;a href="https://leetcode.com/" rel="noopener noreferrer"&gt;Leetcode&lt;/a&gt; regularly.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Repeat the above 4 steps and you will be in a much better place in 3 months.&lt;/p&gt;

&lt;p&gt;Hope this article gives you some direction and helps you land your first data engineering job. Good luck.&lt;br&gt;
Let me know the approach you plan to take in the comments below or send me an email &lt;a href="https://www.startdataengineering.com/contact" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;NOTE: This is a cross post from &lt;a href="https://www.startdataengineering.com/" rel="noopener noreferrer"&gt;https://www.startdataengineering.com/&lt;/a&gt;&lt;/p&gt;

</description>
      <category>database</category>
      <category>beginners</category>
      <category>career</category>
      <category>dataengineering</category>
    </item>
    <item>
      <title>Data Engineering Project for Beginners - Batch edition</title>
      <dc:creator>Joseph</dc:creator>
      <pubDate>Thu, 28 May 2020 11:11:59 +0000</pubDate>
      <link>https://dev.to/start_data_engineering/data-engineering-project-for-beginners-batch-edition-11l8</link>
      <guid>https://dev.to/start_data_engineering/data-engineering-project-for-beginners-batch-edition-11l8</guid>
      <description>&lt;p&gt;Starting out in data engineering can be a little intimidating, especially because data engineering involves a lot of moving parts. I have seen and been asked questions like&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;I’m a data analyst who is striving to become a data engineer. What are some &lt;code&gt;aws&lt;/code&gt; project ideas that I could start working on and gain some DE experience?&lt;/p&gt;

&lt;p&gt;Are there any front-to-back tutorials for a basic DE project?&lt;/p&gt;

&lt;p&gt;What is a good project to get DE experience for job interviews?&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;The objective of this article is to help you:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Build and understand a data processing framework used for batch data loading by companies&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Set up and understand the cloud components involved (Redshift, EMR)&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Understand how to spot failure points in a data processing pipeline and how to build systems resistant to failures and errors&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Understand how to approach or build a data processing pipeline from the ground up&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Approach
&lt;/h2&gt;

&lt;p&gt;The best way to get a good understanding of any topic is to try it out and build something with it. Following this approach, we will build our own data processing pipeline to understand how one is put together. It is based on data from open data sources such as &lt;a href="https://archive.ics.uci.edu/ml/datasets.php" rel="noopener noreferrer"&gt;uci&lt;/a&gt; and &lt;a href="https://www.kaggle.com/" rel="noopener noreferrer"&gt;kaggle&lt;/a&gt;, modified slightly to enable joins.&lt;/p&gt;

&lt;h2&gt;
  
  
  Project overview
&lt;/h2&gt;

&lt;p&gt;For our project we will assume we work for a user behavior analytics company that collects user data from different data sources and joins them together to get a broader understanding of the customer. We will consider 2 sources:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;code&gt;Purchase data&lt;/code&gt; from an OLTP database&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;Movie review data&lt;/code&gt; from a 3rd party data vendor (we simulate this by using a file and assuming it's from a data vendor)&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The goal is to provide a joined dataset of the 2 datasets above, in our analytics (OLAP) database every day, to be used by analysts, dashboard software, etc.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fod37tdnas65a3mhmau9l.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fod37tdnas65a3mhmau9l.png" alt="Data Flow"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Engineering Design
&lt;/h2&gt;

&lt;p&gt;We use a standard load-stage-clean pattern for storing our data. We will also design our ETL with &lt;code&gt;idempotent&lt;/code&gt; functions, for cleaner reruns and backfills.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fygtln2qmf5y3jcl48h7a.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fygtln2qmf5y3jcl48h7a.png" alt="Engineering Design"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;NOTE&lt;/strong&gt;: we don't need Redshift or EMR in our data pipeline, since the data is very small in our example, but we use them to "simulate" a real big data scenario.&lt;/p&gt;
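
&lt;p&gt;To make the &lt;code&gt;idempotent&lt;/code&gt; idea concrete, here is a toy sketch of the overwrite-a-partition pattern in &lt;code&gt;python&lt;/code&gt; (the in-memory &lt;code&gt;store&lt;/code&gt; dict stands in for a real partitioned table):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;def load_partition(store, day, rows):
    """Idempotent load: overwrite the day's partition instead of appending,
    so reruns and backfills never create duplicate rows."""
    store[day] = list(rows)
    return len(store[day])

store = {}
load_partition(store, "2020-05-28", [("customer_1", 3), ("customer_2", 1)])
load_partition(store, "2020-05-28", [("customer_1", 3), ("customer_2", 1)])  # rerun
print(len(store["2020-05-28"]))  # still 2 rows, not 4
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;A naive append-based load would leave 4 rows after the rerun; overwriting the partition makes reruns safe.&lt;/p&gt;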

&lt;h3&gt;
  
  
  Airflow Primer:
&lt;/h3&gt;

&lt;p&gt;Airflow runs data pipelines using DAGs. Each DAG is made up of one or more tasks which can be strung together to get the required data flow. Airflow also enables templating, where text surrounded by &lt;code&gt;{{ }}&lt;/code&gt; is replaced by variables when the DAG is run. These variables can either be passed in by the user as params, or we can use inbuilt &lt;a href="https://airflow.apache.org/docs/stable/macros-ref" rel="noopener noreferrer"&gt;macros&lt;/a&gt; for commonly used variables. Airflow runs DAGs based on time ranges, so if you are running a DAG every day, today's run will have yesterday as its execution date, because Airflow processes the data that was created in the previous time chunk (in our case, yesterday).&lt;/p&gt;
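
&lt;p&gt;To see what templating does, here is a toy renderer that mimics the substitution step (Airflow itself uses Jinja, and &lt;code&gt;ds&lt;/code&gt; is its built-in macro for the execution date; the command being rendered here is hypothetical):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import re

def render(template, context):
    """Replace each {{ var }} placeholder with its value from context."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}",
                  lambda m: str(context[m.group(1)]),
                  template)

# A daily run would substitute yesterday's date for the ds macro.
command = "python unload_user_purchase.py --date {{ ds }}"
print(render(command, {"ds": "2020-05-27"}))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;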

&lt;h2&gt;
  
  
  Setup
&lt;/h2&gt;

&lt;p&gt;From the above engineering spec you can see that this is a fairly involved project. We will use the following tools:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;a href="https://docs.docker.com/get-docker/" rel="noopener noreferrer"&gt;docker&lt;/a&gt; (also make sure you have &lt;code&gt;docker-compose&lt;/code&gt;) we will use this to run Airflow locally&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://github.com/dbcli/pgcli" rel="noopener noreferrer"&gt;pgcli&lt;/a&gt; to connect to our databases(postgres and Redshift)&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://aws.amazon.com/" rel="noopener noreferrer"&gt;AWS account&lt;/a&gt; to set up required cloud services&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://www.dropbox.com/s/ql8wxqjjcv42065/aws-components-setup.pdf?dl=0" rel="noopener noreferrer"&gt;AWS Components&lt;/a&gt; to start the required services&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;By the end of the setup you should have (or know how to get):&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;code&gt;aws cli&lt;/code&gt; configured with keys and region&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;pem or ppk&lt;/code&gt; file saved locally with correct permissions&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;ARN&lt;/code&gt; from your &lt;code&gt;iam&lt;/code&gt; role for Redshift&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;S3&lt;/code&gt; bucket&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;EMR ID&lt;/code&gt; from the summary page&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;Redshift&lt;/code&gt; host, port, database, username, password and have the appropriate &lt;code&gt;iam&lt;/code&gt; role associated with it for running &lt;code&gt;Spectrum&lt;/code&gt; queries.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;NOTE&lt;/strong&gt; We try to keep the cost very low, and it will be low given that we are dealing with small data, but it will still cost some money. Please start the AWS services only while you are using them and don't forget to shut them down after use.&lt;/p&gt;

&lt;h2&gt;
  
  
  Code and explanation
&lt;/h2&gt;

&lt;p&gt;For ease of implementation and testing, we will build our data pipeline in stages. There are 3 stages, shown below.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fflw3ennwka4wq3wvr06y.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fflw3ennwka4wq3wvr06y.png" alt="Engineering Design Parts"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In order to get started, git clone this &lt;a href="https://github.com/josephmachado/beginner_de_project" rel="noopener noreferrer"&gt;git repo&lt;/a&gt; and work off the &lt;code&gt;starter&lt;/code&gt; branch (&lt;code&gt;git checkout starter&lt;/code&gt;). This is an Airflow docker implementation forked from the popular &lt;a href="https://github.com/puckel/docker-airflow" rel="noopener noreferrer"&gt;puckel airflow&lt;/a&gt; docker image, with some additional changes for our data pipeline.&lt;/p&gt;

&lt;p&gt;We will work on one stage(from the above diagram) at a time.&lt;/p&gt;

&lt;h3&gt;
  
  
  Stage 1. pg -&amp;gt; file -&amp;gt; s3
&lt;/h3&gt;

&lt;p&gt;cd to your airflow repository and start up the docker services using the compose file as shown below&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker-compose &lt;span class="nt"&gt;-f&lt;/span&gt; docker-compose-LocalExecutor.yml up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This command starts the &lt;code&gt;airflow webserver&lt;/code&gt; and &lt;code&gt;postgres&lt;/code&gt; database for metadata. You can verify that the docker containers have started using &lt;code&gt;docker ps&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Download the required data using the following &lt;br&gt;
&lt;a href="https://www.dropbox.com/sh/amdyc6z8744hrl5/AAC2Fnbzb_nLhdT2nGjL7-7ta?dl=0" rel="noopener noreferrer"&gt;link&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Place this in the &lt;code&gt;/beginner_de_project/setup/&lt;/code&gt; directory. &lt;br&gt;
Your project directory should look like this:&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fldh8f5xvrt06c53uqccl.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fldh8f5xvrt06c53uqccl.png" alt="directory"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Since we are not dealing with a lot of data, we can also use our &lt;code&gt;Airflow metadata database&lt;/code&gt; as our "fake" datastore. Open a connection to the &lt;code&gt;pg&lt;/code&gt; database using &lt;code&gt;pgcli&lt;/code&gt; as shown below&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;pgcli &lt;span class="nt"&gt;-h&lt;/span&gt; localhost &lt;span class="nt"&gt;-p&lt;/span&gt; 5432 &lt;span class="nt"&gt;-U&lt;/span&gt; airflow
&lt;span class="c"&gt;# the password is also airflow&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's set up our fake datastore. In your &lt;code&gt;pgcli&lt;/code&gt; session run the following script&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;SCHEMA&lt;/span&gt; &lt;span class="n"&gt;retail&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;retail&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_purchase&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;invoice_number&lt;/span&gt; &lt;span class="nb"&gt;varchar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;stock_code&lt;/span&gt; &lt;span class="nb"&gt;varchar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;detail&lt;/span&gt; &lt;span class="nb"&gt;varchar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;quantity&lt;/span&gt; &lt;span class="nb"&gt;int&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;invoice_date&lt;/span&gt; &lt;span class="nb"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;unit_price&lt;/span&gt; &lt;span class="nb"&gt;Numeric&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;8&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;customer_id&lt;/span&gt; &lt;span class="nb"&gt;int&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;country&lt;/span&gt; &lt;span class="nb"&gt;varchar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;COPY&lt;/span&gt; &lt;span class="n"&gt;retail&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_purchase&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;invoice_number&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;stock_code&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="n"&gt;detail&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="n"&gt;quantity&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;invoice_date&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="n"&gt;unit_price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="n"&gt;customer_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="n"&gt;country&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; 
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="s1"&gt;'/data/retail/OnlineRetail.csv'&lt;/span&gt; 
&lt;span class="k"&gt;DELIMITER&lt;/span&gt; &lt;span class="s1"&gt;','&lt;/span&gt;  &lt;span class="n"&gt;CSV&lt;/span&gt; &lt;span class="n"&gt;HEADER&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is also available in your repo at &lt;code&gt;setup/postgres/create_user_purchase.sql&lt;/code&gt;. This script creates the source table and loads in the data. Do a count(*) on the &lt;code&gt;user_purchase&lt;/code&gt; table; there should be &lt;code&gt;541908&lt;/code&gt; rows.&lt;/p&gt;

&lt;p&gt;Now we are ready to start writing our data pipeline. Let's create our first Airflow DAG in the &lt;code&gt;dags&lt;/code&gt; folder and call it &lt;code&gt;user_behaviour.py&lt;/code&gt;. In this script, let's create a simple &lt;code&gt;Airflow DAG&lt;/code&gt; as shown below&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;timedelta&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;DAG&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.operators.dummy_operator&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;DummyOperator&lt;/span&gt;

&lt;span class="n"&gt;default_args&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;owner&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;airflow&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;depends_on_past&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;wait_for_downstream&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;start_date&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nf"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2010&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;12&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="c1"&gt;# we start at this date to be consistent with the dataset we have and airflow will catchup
&lt;/span&gt;    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;email&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;airflow@airflow.com&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;email_on_failure&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;email_on_retry&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;retries&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;retry_delay&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nf"&gt;timedelta&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;minutes&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="n"&gt;dag&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;DAG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;user_behaviour&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;default_args&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;default_args&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="n"&gt;schedule_interval&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;0 0 * * *&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;max_active_runs&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;end_of_data_pipeline&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;DummyOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;end_of_data_pipeline&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;end_of_data_pipeline&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This &lt;code&gt;DAG&lt;/code&gt; does nothing except run a single dummy operator (which itself does nothing). If you go to &lt;a href="http://localhost:8080" rel="noopener noreferrer"&gt;http://localhost:8080&lt;/a&gt; you will see the Airflow UI up and running, similar to the screenshot below&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2F9td2j83zefxd29kww02z.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2F9td2j83zefxd29kww02z.png" alt="Airflow start"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Now let's build on what we already have and unload data from &lt;code&gt;pg&lt;/code&gt; into a local file. We want to unload only the rows for the specified execution day whose quantity is greater than 2. Create a script called &lt;code&gt;filter_unload_user_purchase.sql&lt;/code&gt; in the &lt;code&gt;/dags/scripts/sql/&lt;/code&gt; directory&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;COPY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="n"&gt;invoice_number&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
           &lt;span class="n"&gt;stock_code&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
           &lt;span class="n"&gt;detail&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
           &lt;span class="n"&gt;quantity&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
           &lt;span class="n"&gt;invoice_date&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
           &lt;span class="n"&gt;unit_price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
           &lt;span class="n"&gt;customer_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
           &lt;span class="n"&gt;country&lt;/span&gt;
      &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;retail&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_purchase&lt;/span&gt;
     &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;quantity&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;
       &lt;span class="k"&gt;and&lt;/span&gt; &lt;span class="k"&gt;cast&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;invoice_date&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="nb"&gt;date&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'{{ ds }}'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;TO&lt;/span&gt; &lt;span class="s1"&gt;'{{ params.temp_filtered_user_purchase }}'&lt;/span&gt; &lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;FORMAT&lt;/span&gt; &lt;span class="n"&gt;CSV&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;HEADER&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In this templated SQL script we use &lt;code&gt;{{ ds }}&lt;/code&gt;, one of Airflow's built-in macros, to get the execution date. &lt;code&gt;{{ params.temp_filtered_user_purchase }}&lt;/code&gt; is a parameter we set in the DAG. In the DAG we will use a &lt;code&gt;PostgresOperator&lt;/code&gt; to execute the &lt;code&gt;filter_unload_user_purchase.sql&lt;/code&gt; script. Add the following snippet to your DAG at &lt;code&gt;user_behaviour.py&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# existing imports
&lt;/span&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.operators.postgres_operator&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;PostgresOperator&lt;/span&gt;

&lt;span class="c1"&gt;# config
# local
&lt;/span&gt;&lt;span class="n"&gt;unload_user_purchase&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;./scripts/sql/filter_unload_user_purchase.sql&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="n"&gt;temp_filtered_user_purchase&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;/temp/temp_filtered_user_purchase.csv&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="c1"&gt;# existing code
&lt;/span&gt;
&lt;span class="n"&gt;pg_unload&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;PostgresOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;pg_unload&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;sql&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;unload_user_purchase&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;postgres_conn_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;postgres_default&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;params&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;temp_filtered_user_purchase&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;temp_filtered_user_purchase&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="n"&gt;depends_on_past&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;wait_for_downstream&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;pg_unload&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;end_of_data_pipeline&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Your diff should look similar to the one shown below&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight diff"&gt;&lt;code&gt;&lt;span class="p"&gt;from datetime import datetime, timedelta
&lt;/span&gt;&lt;span class="err"&gt;
&lt;/span&gt;&lt;span class="p"&gt;from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
&lt;/span&gt;&lt;span class="gi"&gt;++from airflow.operators.postgres_operator import PostgresOperator
&lt;/span&gt;&lt;span class="err"&gt;

&lt;/span&gt;&lt;span class="gi"&gt;++# config
++# local
++unload_user_purchase ='./scripts/sql/filter_unload_user_purchase.sql'
++temp_filtered_user_purchase = '/temp/temp_filtered_user_purchase.csv'
&lt;/span&gt;&lt;span class="err"&gt;
&lt;/span&gt;&lt;span class="p"&gt;default_args = {
&lt;/span&gt;    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2010, 12, 1),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5)
}
&lt;span class="err"&gt;
&lt;/span&gt;&lt;span class="p"&gt;dag = DAG("user_behaviour", default_args=default_args,
&lt;/span&gt;          schedule_interval="0 0 * * *")
&lt;span class="err"&gt;
&lt;/span&gt;&lt;span class="p"&gt;end_of_data_pipeline = DummyOperator(task_id='end_of_data_pipeline', dag=dag)
&lt;/span&gt;&lt;span class="err"&gt;
&lt;/span&gt;&lt;span class="gi"&gt;++pg_unload = PostgresOperator(
++    dag=dag,
++    task_id='pg_unload',
++    sql=unload_user_purchase,
++    postgres_conn_id='postgres_default',
++    params={'temp_filtered_user_purchase': temp_filtered_user_purchase},
++  depends_on_past=True,
++ wait_for_downstream=True
++)
&lt;/span&gt;&lt;span class="err"&gt;
&lt;/span&gt;&lt;span class="gd"&gt;-- end_of_data_pipeline
&lt;/span&gt;&lt;span class="gi"&gt;++pg_unload &amp;gt;&amp;gt; end_of_data_pipeline
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Ideally you would keep your configs in a separate file or set them as Docker environment variables, but since this is a simple example we keep them in the &lt;code&gt;DAG&lt;/code&gt; script.&lt;/p&gt;
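&lt;p&gt;A minimal sketch of what the environment-variable approach might look like; the variable names below are illustrative assumptions, not part of the project:&lt;/p&gt;

```python
import os

# Illustrative sketch: read pipeline config from environment variables,
# falling back to the values used in this post. The env var names
# (BUCKET_NAME, TEMP_FILTERED_USER_PURCHASE) are assumptions.
BUCKET_NAME = os.environ.get("BUCKET_NAME", "<your-bucket-name>")
temp_filtered_user_purchase = os.environ.get(
    "TEMP_FILTERED_USER_PURCHASE", "/temp/temp_filtered_user_purchase.csv"
)
print(BUCKET_NAME, temp_filtered_user_purchase)
```

&lt;p&gt;With this in place, the same DAG file can run against different buckets per environment by changing the docker-compose environment section instead of the code.&lt;/p&gt;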

&lt;p&gt;You can verify that your code is working by going to the Airflow UI at localhost:8080, clicking on the &lt;code&gt;dag&lt;/code&gt;, then a task, and viewing its rendered template as shown below&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fowyz33aj5g7mr2leyigy.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fowyz33aj5g7mr2leyigy.png" alt="Airflow SQL"&gt;&lt;/a&gt;&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fijhmt15tynmec0x3ko1j.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fijhmt15tynmec0x3ko1j.png" alt="Airflow SQL"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;You will see that the &lt;code&gt;{{ }}&lt;/code&gt; templates in your SQL script have been replaced by the values set in the DAG at &lt;code&gt;user_behaviour.py&lt;/code&gt;.&lt;/p&gt;
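&lt;p&gt;To see what this rendering does under the hood, here is a small standalone sketch using jinja2, the templating library Airflow itself uses; Airflow performs this rendering for you at run time, so this is only an illustration:&lt;/p&gt;

```python
from jinja2 import Template

# A stripped-down version of the templated parts of
# filter_unload_user_purchase.sql, rendered the way Airflow would render it.
sql = (
    "cast(invoice_date as date)='{{ ds }}' "
    "TO '{{ params.temp_filtered_user_purchase }}'"
)
rendered = Template(sql).render(
    ds="2010-12-01",
    params={"temp_filtered_user_purchase": "/temp/temp_filtered_user_purchase.csv"},
)
print(rendered)
```

&lt;p&gt;&lt;code&gt;ds&lt;/code&gt; is supplied by Airflow per run, while &lt;code&gt;params&lt;/code&gt; comes from the &lt;code&gt;params&lt;/code&gt; argument of the operator.&lt;/p&gt;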

&lt;p&gt;For the next task, let's upload this file to our S3 bucket.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# existing imports
&lt;/span&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.hooks.S3_hook&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;S3Hook&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.operators&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;PythonOperator&lt;/span&gt;

&lt;span class="c1"&gt;# existing config
# remote config
&lt;/span&gt;&lt;span class="n"&gt;BUCKET_NAME&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;&amp;lt;your-bucket-name&amp;gt;&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="n"&gt;temp_filtered_user_purchase_key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;user_purchase/stage/{{ ds }}/temp_filtered_user_purchase.csv&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;

&lt;span class="c1"&gt;# helper function(s)
&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;_local_to_s3&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;filename&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;bucket_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;BUCKET_NAME&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;s3&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;S3Hook&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;s3&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;load_file&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;filename&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;filename&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;bucket_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;bucket_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                 &lt;span class="n"&gt;replace&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# existing code
&lt;/span&gt;&lt;span class="n"&gt;user_purchase_to_s3_stage&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;PythonOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;user_purchase_to_s3_stage&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;python_callable&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;_local_to_s3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;op_kwargs&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;filename&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;temp_filtered_user_purchase&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;key&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;temp_filtered_user_purchase_key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;pg_unload&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;user_purchase_to_s3_stage&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;end_of_data_pipeline&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In the above snippet we introduced two new concepts: the &lt;code&gt;S3Hook&lt;/code&gt; and the &lt;code&gt;PythonOperator&lt;/code&gt;. A hook is the mechanism Airflow uses to establish connections to other systems (S3 in our case). We wrap the creation of an &lt;code&gt;S3Hook&lt;/code&gt; and the upload of a file from our local filesystem to S3 in a python function called &lt;code&gt;_local_to_s3&lt;/code&gt; and call it using the &lt;code&gt;PythonOperator&lt;/code&gt;.&lt;/p&gt;
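&lt;p&gt;Conceptually, when the task runs, the &lt;code&gt;PythonOperator&lt;/code&gt; calls the callable with &lt;code&gt;op_kwargs&lt;/code&gt; unpacked as keyword arguments. A plain-Python sketch of that contract, using a stub in place of the real S3 upload:&lt;/p&gt;

```python
# Stub standing in for _local_to_s3; the real function uploads the file to S3
# via an S3Hook. The bucket name placeholder matches the one in the post.
def _local_to_s3_stub(filename, key, bucket_name="<your-bucket-name>"):
    return f"upload {filename} to s3://{bucket_name}/{key}"

# What the PythonOperator effectively does with op_kwargs at execution time:
# python_callable(**op_kwargs)
op_kwargs = {
    "filename": "/temp/temp_filtered_user_purchase.csv",
    "key": "user_purchase/stage/2010-12-01/temp_filtered_user_purchase.csv",
}
result = _local_to_s3_stub(**op_kwargs)
print(result)
```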

&lt;p&gt;Once the data is uploaded to S3 we should remove it from our local filesystem to avoid wasting disk space on stale data. Let's add another task to our &lt;code&gt;DAG&lt;/code&gt; to remove this temp file.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;
&lt;span class="c1"&gt;# existing imports
&lt;/span&gt;
&lt;span class="c1"&gt;# existing helper function(s)
&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;remove_local_file&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;filelocation&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;isfile&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;filelocation&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;remove&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;filelocation&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;File &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;filelocation&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; not found&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# existing code
&lt;/span&gt;&lt;span class="n"&gt;remove_local_user_purchase_file&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;PythonOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;remove_local_user_purchase_file&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;python_callable&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;remove_local_file&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;op_kwargs&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;filelocation&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;temp_filtered_user_purchase&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;pg_unload&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;user_purchase_to_s3_stage&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;remove_local_user_purchase_file&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;end_of_data_pipeline&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now that we have completed the first stage (&lt;code&gt;Stage 1. pg -&amp;gt; file -&amp;gt; s3&lt;/code&gt;), let's do a test run. Go to the Airflow UI at &lt;code&gt;http://localhost:8080&lt;/code&gt; and switch on the DAG. It will start running and catching up on all the runs since the start date.&lt;/p&gt;
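&lt;p&gt;Since the &lt;code&gt;start_date&lt;/code&gt; is 2010-12-01 and the schedule is daily, switching the DAG on makes the scheduler create one catchup run per day between the start date and today. A quick sketch of how many runs that is (if you ever want to skip the backfill, Airflow's &lt;code&gt;DAG&lt;/code&gt; also accepts a &lt;code&gt;catchup=False&lt;/code&gt; argument):&lt;/p&gt;

```python
from datetime import date

# Number of daily schedule intervals between the DAG's start_date and today,
# which is roughly how many catchup runs Airflow will create.
start = date(2010, 12, 1)
missed_runs = (date.today() - start).days
print(missed_runs)
```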

&lt;p&gt;You can always reset the local Airflow instance by running the following commands&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker-compose &lt;span class="nt"&gt;-f&lt;/span&gt; docker-compose-LocalExecutor.yml down
docker-compose &lt;span class="nt"&gt;-f&lt;/span&gt; docker-compose-LocalExecutor.yml up &lt;span class="nt"&gt;-d&lt;/span&gt;
pgcli &lt;span class="nt"&gt;-h&lt;/span&gt; localhost &lt;span class="nt"&gt;-p&lt;/span&gt; 5432 &lt;span class="nt"&gt;-U&lt;/span&gt; airflow
&lt;span class="c"&gt;# and rerunning the query at setup/postgres/create_user_purchase.sql to reload data into pg&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Stage 2. file -&amp;gt; s3 -&amp;gt; EMR -&amp;gt; s3
&lt;/h3&gt;

&lt;p&gt;In this stage we assume we are getting a movie review data feed from a data vendor. Usually the vendor drops the data in S3 or on an SFTP server, but in our example let's assume the data is available at &lt;code&gt;setup/raw_input_data/movie_review/movie_review.csv&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Moving the &lt;code&gt;movie_review.csv&lt;/code&gt; file to S3 is similar to the tasks we did in stage 1&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;#config
# local
# existing local config
&lt;/span&gt;&lt;span class="n"&gt;movie_review_local&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;/data/movie_review/movie_review.csv&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt; &lt;span class="c1"&gt;# location of movie review withing docker see docker-compose volume mount
&lt;/span&gt;
&lt;span class="c1"&gt;# existing remote config
&lt;/span&gt;&lt;span class="n"&gt;movie_review_load&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;movie_review/load/movie.csv&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="c1"&gt;# existing code
&lt;/span&gt;

&lt;span class="n"&gt;movie_review_to_s3_stage&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;PythonOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;movie_review_to_s3_stage&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;python_callable&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;_local_to_s3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;op_kwargs&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;filename&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;movie_review_local&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;key&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;movie_review_load&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# existing data pipeline
&lt;/span&gt;&lt;span class="n"&gt;movie_review_to_s3_stage&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;It's similar to the previous task, but not directly dependent on any other task.&lt;/p&gt;

&lt;p&gt;In EMR we have a feature called steps, which can be used to run commands on the EMR cluster one at a time. We will use these steps to&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Pull data from &lt;code&gt;movie_review_load&lt;/code&gt; S3 location to EMR clusters HDFS location.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Perform text data cleaning and naive text classification using a pyspark script and write the output to HDFS in the EMR cluster.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Push data from HDFS to a staging S3 location.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;We can define the EMR steps as a JSON file. Create a file at &lt;code&gt;beginner_de_project/dags/scripts/emr/clean_movie_review.json&lt;/code&gt;. Its content should be as follows&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"Name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Move raw data from S3 to HDFS"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"ActionOnFailure"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"CANCEL_AND_WAIT"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"HadoopJarStep"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"Jar"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"command-runner.jar"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"Args"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="s2"&gt;"s3-dist-cp"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="s2"&gt;"--src=s3://{{ params.BUCKET_NAME }}/{{ params.movie_review_load }}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="s2"&gt;"--dest=/movie"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"Name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Classify movie reviews"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"ActionOnFailure"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"CANCEL_AND_WAIT"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"HadoopJarStep"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"Jar"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"command-runner.jar"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"Args"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="s2"&gt;"spark-submit"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="s2"&gt;"s3://{{ params.BUCKET_NAME  }}/scripts/random_text_classification.py"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"Name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Move raw data from S3 to HDFS"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"ActionOnFailure"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"CANCEL_AND_WAIT"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"HadoopJarStep"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"Jar"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"command-runner.jar"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"Args"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="s2"&gt;"s3-dist-cp"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="s2"&gt;"--src=/output"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="s2"&gt;"--dest=s3://{{ params.BUCKET_NAME }}/{{ params.movie_review_stage }}"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The first step uses &lt;a href="https://docs.aws.amazon.com/emr/latest/ReleaseGuide/UsingEMR_s3distcp.html" rel="noopener noreferrer"&gt;s3-dist-cp&lt;/a&gt;, a distributed copy tool, to copy data from S3 to the EMR cluster's HDFS. The second step runs a &lt;code&gt;pyspark&lt;/code&gt; script called &lt;code&gt;random_text_classification.py&lt;/code&gt; (we will see what it is and how it gets moved to that S3 location shortly), and the final step moves the &lt;code&gt;output&lt;/code&gt; to a staging location in S3. The templated values will be filled in with the values provided by the DAG at run time.&lt;/p&gt;
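&lt;p&gt;After Airflow fills in the &lt;code&gt;{{ params.* }}&lt;/code&gt; placeholders, the steps file is plain JSON that the EMR API accepts. A small sketch parsing a rendered version of the first step (&lt;code&gt;my-bucket&lt;/code&gt; is a stand-in for your bucket name):&lt;/p&gt;

```python
import json

# A hand-rendered version of the first step, with the templated values filled
# in; "my-bucket" stands in for the real bucket name.
rendered_steps = """
[
  {
    "Name": "Move raw data from S3 to HDFS",
    "ActionOnFailure": "CANCEL_AND_WAIT",
    "HadoopJarStep": {
      "Jar": "command-runner.jar",
      "Args": ["s3-dist-cp", "--src=s3://my-bucket/movie_review/load/movie.csv", "--dest=/movie"]
    }
  }
]
"""
steps = json.loads(rendered_steps)
print([step["Name"] for step in steps])
```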

&lt;p&gt;Create a python file at &lt;code&gt;beginner_de_project/dags/scripts/spark/random_text_classification.py&lt;/code&gt; with the following content&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# pyspark
&lt;/span&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;argparse&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pyspark.sql&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pyspark.ml.feature&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Tokenizer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;StopWordsRemover&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pyspark.sql.functions&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;array_contains&lt;/span&gt;


&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;random_text_classifier&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input_loc&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;output_loc&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    This is a dummy function to show how to use spark, It is supposed to mock
    the following steps
        1. clean input data
        2. use a pre-trained model to make prediction 
        3. write predictions to a HDFS output

    Since this is meant as an example, we are going to skip building a model,
    instead we are naively going to mark reviews having the text &lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;good&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt; as positive and
    the rest as negative 
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="c1"&gt;# read input
&lt;/span&gt;    &lt;span class="n"&gt;df_raw&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;header&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;csv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input_loc&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="c1"&gt;# perform text cleaning
&lt;/span&gt;
    &lt;span class="c1"&gt;# Tokenize text
&lt;/span&gt;    &lt;span class="n"&gt;tokenizer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Tokenizer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;inputCol&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;review_str&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;outputCol&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;review_token&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;df_tokens&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;tokenizer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;transform&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df_raw&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;select&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;cid&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;review_token&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Remove stop words
&lt;/span&gt;    &lt;span class="n"&gt;remover&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;StopWordsRemover&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;inputCol&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;review_token&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;outputCol&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;review_clean&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;df_clean&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;remover&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;transform&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;df_tokens&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;select&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;cid&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;review_clean&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# function to check presence of good and naively assume its a positive review
&lt;/span&gt;    &lt;span class="n"&gt;df_out&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df_clean&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;select&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;cid&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nf"&gt;array_contains&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;df_clean&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;review_clean&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;good&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;alias&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;positive_review&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

    &lt;span class="n"&gt;df_out&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;write&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;mode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;overwrite&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;parquet&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;output_loc&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;__name__&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;__main__&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;parser&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;argparse&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;ArgumentParser&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;parser&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_argument&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--input&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;type&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                        &lt;span class="n"&gt;help&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;HDFS input&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;default&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;/movie&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;parser&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_argument&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--output&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;type&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                        &lt;span class="n"&gt;help&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;HDFS output&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;default&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;/output&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;args&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;parser&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;parse_args&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;spark&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;appName&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Random Text Classifier&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;getOrCreate&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="nf"&gt;random_text_classifier&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input_loc&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;input&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;output_loc&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;output&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
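&lt;p&gt;The classification heuristic itself can be sanity-checked locally without a Spark cluster. Here is a plain-Python sketch of the same idea (using simple whitespace tokenization, whereas the real script uses Spark's &lt;code&gt;Tokenizer&lt;/code&gt; and &lt;code&gt;StopWordsRemover&lt;/code&gt;):&lt;/p&gt;

```python
def naive_positive_review(review_str):
    """Mark a review positive if it contains the token 'good',
    mirroring the array_contains check in the pyspark script."""
    tokens = review_str.lower().split()
    return "good" in tokens

print(naive_positive_review("This movie was really good"))  # True
print(naive_positive_review("terrible plot and acting"))    # False
```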



&lt;p&gt;It's a simple Spark script that cleans text data (tokenize and remove stop words) and uses a naive heuristic to classify whether a review is positive. Since this tutorial is about building a data pipeline, we don't want to spend a lot of time training and validating a model. Note that in the second EMR step we read the script from your S3 bucket, so we also have to move the &lt;code&gt;pyspark&lt;/code&gt; script to an S3 location.&lt;br&gt;
Add the following content to your &lt;code&gt;DAG&lt;/code&gt; at &lt;code&gt;user_behaviour.py&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.contrib.operators.emr_add_steps_operator&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;EmrAddStepsOperator&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.contrib.sensors.emr_step_sensor&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;EmrStepSensor&lt;/span&gt;

&lt;span class="n"&gt;movie_clean_emr_steps&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;./dags/scripts/emr/clean_movie_review.json&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="n"&gt;movie_text_classification_script&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;./dags/scripts/spark/random_text_classification.py&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;

&lt;span class="n"&gt;EMR_ID&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;&amp;lt;your-emr-id&amp;gt;&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="n"&gt;movie_review_load_folder&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;movie_review/load/&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="n"&gt;movie_review_stage&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;movie_review/stage/&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="n"&gt;text_classifier_script&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;scripts/random_text_classifier.py&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;

&lt;span class="n"&gt;move_emr_script_to_s3&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;PythonOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;move_emr_script_to_s3&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;python_callable&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;_local_to_s3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;op_kwargs&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;filename&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;movie_text_classification_script&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;key&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;scripts/random_text_classification.py&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nf"&gt;open&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;movie_clean_emr_steps&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;json_file&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;emr_steps&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;load&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;json_file&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# adding our EMR steps to an existing EMR cluster
&lt;/span&gt;&lt;span class="n"&gt;add_emr_steps&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;EmrAddStepsOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;add_emr_steps&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;job_flow_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;EMR_ID&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;aws_conn_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;aws_default&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;steps&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;emr_steps&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;params&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;BUCKET_NAME&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;BUCKET_NAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;movie_review_load&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;movie_review_load_folder&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;text_classifier_script&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;text_classifier_script&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;movie_review_stage&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;movie_review_stage&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="n"&gt;depends_on_past&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;last_step&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;emr_steps&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;

&lt;span class="c1"&gt;# sensing if the last step is complete
&lt;/span&gt;&lt;span class="n"&gt;clean_movie_review_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;EmrStepSensor&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;clean_movie_review_data&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;job_flow_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;EMR_ID&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;step_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;{{ task_instance.xcom_pull(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;add_emr_steps&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;, key=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;return_value&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;)[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;last_step&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;] }}&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;depends_on_past&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;movie_review_to_s3_stage&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;move_emr_script_to_s3&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;move_emr_script_to_s3&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;add_emr_steps&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;clean_movie_review_data&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's understand what is going on in the above snippet.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;We move the &lt;code&gt;pyspark&lt;/code&gt; script from our local filesystem to S3 using the &lt;code&gt;move_emr_script_to_s3&lt;/code&gt; task. We run it in parallel with the &lt;code&gt;movie_review_to_s3_stage&lt;/code&gt; task, since the two are independent and can be parallelized.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The next task, &lt;code&gt;add_emr_steps&lt;/code&gt;, adds the EMR steps from the &lt;code&gt;json&lt;/code&gt; file to our running EMR cluster. Once the steps are added to EMR, they automatically start executing.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The next task is an EMR step sensor, which checks whether a given step out of a list of steps is complete. We specify the last step by taking the last index of the steps array.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Notice we have also added the EMR ID from our EMR cluster page; this identifies the EMR cluster we are going to use.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Notice the parameterized task instance and &lt;code&gt;xcom&lt;/code&gt;. The task instance contains all the metadata for the specific &lt;code&gt;DAG&lt;/code&gt; run, and &lt;code&gt;XCOM&lt;/code&gt; is a way to pass small pieces of data, like a simple variable, between tasks. The &lt;code&gt;add_emr_steps&lt;/code&gt; task automatically adds the list of step IDs to the DAG's task instance, which the &lt;code&gt;clean_movie_review_data&lt;/code&gt; step sensor uses to identify and monitor the last step.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
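&lt;p&gt;The templated &lt;code&gt;step_id&lt;/code&gt; string can be confusing at first. The snippet below (plain Python, with a made-up steps list) shows the literal string that gets built; Airflow renders the Jinja part at run time using the step IDs pulled from &lt;code&gt;XCOM&lt;/code&gt;:&lt;/p&gt;

```python
# A made-up steps list standing in for the JSON file's contents
emr_steps = [
    {"Name": "move raw movie data to HDFS"},
    {"Name": "classify movie reviews"},
    {"Name": "move classified data to staging"},
]
last_step = len(emr_steps) - 1  # index of the final step

# The literal string passed as step_id; Airflow renders the Jinja template
# at run time, indexing into the step IDs that add_emr_steps pushed to XCOM
step_id = (
    '{{ task_instance.xcom_pull("add_emr_steps", key="return_value")['
    + str(last_step)
    + "] }}"
)
print(step_id)
```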

&lt;p&gt;Now that we have the second stage complete, we can test our DAG. Note that at this point you can comment out the tasks in stage 1 and run only stage 2 for testing. It should complete successfully.&lt;/p&gt;

&lt;p&gt;You can restart the Docker containers using&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker-compose &lt;span class="nt"&gt;-f&lt;/span&gt; docker-compose-LocalExecutor.yml down
docker-compose &lt;span class="nt"&gt;-f&lt;/span&gt; docker-compose-LocalExecutor.yml up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Do not forget to reload data into the local &lt;code&gt;pg&lt;/code&gt; instance after restarting, using &lt;code&gt;setup/postgres/create_user_purchase.sql&lt;/code&gt;.&lt;/p&gt;

&lt;h3&gt;
  
  
  Stage 3. movie_review_stage, user_purchase_stage -&amp;gt; Redshift table -&amp;gt; quality Check data
&lt;/h3&gt;

&lt;p&gt;This stage involves doing joins in your &lt;code&gt;Redshift Cluster&lt;/code&gt;. You should have your Redshift &lt;code&gt;host&lt;/code&gt;, &lt;code&gt;database&lt;/code&gt;, &lt;code&gt;username&lt;/code&gt; and &lt;code&gt;password&lt;/code&gt; from when you set up Redshift. In your Redshift cluster you need to set up the staging tables and our final table. You can do this using the SQL script at &lt;code&gt;/setup/redshift/create_external_schema.sql&lt;/code&gt; in your repo, replacing the &lt;code&gt;iam-ARN&lt;/code&gt; and &lt;code&gt;s3-bucket&lt;/code&gt; placeholders with your specific ARN and bucket name. You can run the script by connecting to your Redshift instance using&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;pgcli &lt;span class="nt"&gt;-h&lt;/span&gt; &amp;lt;your-redshift-cluster&amp;gt; &lt;span class="nt"&gt;-U&lt;/span&gt; &amp;lt;your-user&amp;gt; &lt;span class="nt"&gt;-p&lt;/span&gt; 5439 &lt;span class="nt"&gt;-d&lt;/span&gt; &amp;lt;your-database&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's understand the &lt;code&gt;create_external_schema.sql&lt;/code&gt; script.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- create_external_schema.sql&lt;/span&gt;
&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;external&lt;/span&gt; &lt;span class="k"&gt;schema&lt;/span&gt; &lt;span class="n"&gt;spectrum&lt;/span&gt; 
&lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="k"&gt;data&lt;/span&gt; &lt;span class="k"&gt;catalog&lt;/span&gt; 
&lt;span class="k"&gt;database&lt;/span&gt; &lt;span class="s1"&gt;'spectrumdb'&lt;/span&gt; 
&lt;span class="n"&gt;iam_role&lt;/span&gt; &lt;span class="s1"&gt;'&amp;lt;your-iam-role-ARN&amp;gt;'&lt;/span&gt;
&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;external&lt;/span&gt; &lt;span class="k"&gt;database&lt;/span&gt; &lt;span class="n"&gt;if&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;exists&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;-- user purchase staging table with an insert_date partition&lt;/span&gt;
&lt;span class="k"&gt;drop&lt;/span&gt; &lt;span class="k"&gt;table&lt;/span&gt; &lt;span class="n"&gt;if&lt;/span&gt; &lt;span class="k"&gt;exists&lt;/span&gt; &lt;span class="n"&gt;spectrum&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_purchase_staging&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;create&lt;/span&gt; &lt;span class="k"&gt;external&lt;/span&gt; &lt;span class="k"&gt;table&lt;/span&gt; &lt;span class="n"&gt;spectrum&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_purchase_staging&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;InvoiceNo&lt;/span&gt; &lt;span class="nb"&gt;varchar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;StockCode&lt;/span&gt; &lt;span class="nb"&gt;varchar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;detail&lt;/span&gt; &lt;span class="nb"&gt;varchar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;Quantity&lt;/span&gt; &lt;span class="nb"&gt;integer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;InvoiceDate&lt;/span&gt; &lt;span class="nb"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;UnitPrice&lt;/span&gt; &lt;span class="nb"&gt;decimal&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;8&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;customerid&lt;/span&gt; &lt;span class="nb"&gt;integer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;Country&lt;/span&gt; &lt;span class="nb"&gt;varchar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;partitioned&lt;/span&gt; &lt;span class="k"&gt;by&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;insert_date&lt;/span&gt; &lt;span class="nb"&gt;date&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;row&lt;/span&gt; &lt;span class="n"&gt;format&lt;/span&gt; &lt;span class="n"&gt;delimited&lt;/span&gt; &lt;span class="n"&gt;fields&lt;/span&gt; &lt;span class="n"&gt;terminated&lt;/span&gt; &lt;span class="k"&gt;by&lt;/span&gt; &lt;span class="s1"&gt;','&lt;/span&gt;
&lt;span class="n"&gt;stored&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;textfile&lt;/span&gt;
&lt;span class="k"&gt;location&lt;/span&gt; &lt;span class="s1"&gt;'s3://&amp;lt;your-s3-bucket&amp;gt;/user_purchase/stage/'&lt;/span&gt;
&lt;span class="k"&gt;table&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'skip.header.line.count'&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'1'&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="c1"&gt;-- movie review staging table&lt;/span&gt;
&lt;span class="k"&gt;drop&lt;/span&gt; &lt;span class="k"&gt;table&lt;/span&gt; &lt;span class="n"&gt;if&lt;/span&gt; &lt;span class="k"&gt;exists&lt;/span&gt; &lt;span class="n"&gt;spectrum&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;movie_review_clean_stage&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;EXTERNAL&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;spectrum&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;movie_review_clean_stage&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
   &lt;span class="n"&gt;cid&lt;/span&gt; &lt;span class="nb"&gt;varchar&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
   &lt;span class="n"&gt;positive_review&lt;/span&gt; &lt;span class="nb"&gt;boolean&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;STORED&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;PARQUET&lt;/span&gt;
&lt;span class="k"&gt;LOCATION&lt;/span&gt; &lt;span class="s1"&gt;'s3://&amp;lt;your-s3-bucket&amp;gt;/movie_review/stage/'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;-- user behaviour metric tabls&lt;/span&gt;
&lt;span class="k"&gt;DROP&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;IF&lt;/span&gt; &lt;span class="k"&gt;EXISTS&lt;/span&gt; &lt;span class="k"&gt;public&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_behavior_metric&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="k"&gt;public&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_behavior_metric&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;customerid&lt;/span&gt; &lt;span class="nb"&gt;integer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;amount_spent&lt;/span&gt; &lt;span class="nb"&gt;decimal&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;18&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;review_score&lt;/span&gt; &lt;span class="nb"&gt;integer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;review_count&lt;/span&gt; &lt;span class="nb"&gt;integer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;insert_date&lt;/span&gt; &lt;span class="nb"&gt;date&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In the above script there are four main steps:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Create your spectrum external schema, if you are unfamiliar with the &lt;code&gt;external&lt;/code&gt; part, it is basically a mechanism where the data is stored outside of the database(in our case in S3) and the data schema details are stored in something called a &lt;code&gt;data catalog&lt;/code&gt;(in our case AWS glue). When the query is run, the database executor talks to the data catalog to get information about the location and schema of the queried table and processes the data. The advantage here is separation of storage(cheaper than storing directly in database) and processing(we can scale as required) of data. This is called &lt;code&gt;Spectrum&lt;/code&gt; within &lt;code&gt;Redshift&lt;/code&gt;, we have to create an external database to enable this functionality.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Create an external &lt;code&gt;user_purchase_staging&lt;/code&gt; table. Note that we are partitioning by &lt;code&gt;insert_date&lt;/code&gt;, which means the data is stored at &lt;code&gt;s3://&amp;lt;your-s3-bucket&amp;gt;/user_purchase/stage/yyyy-mm-dd&lt;/code&gt;. Partitioning is a technique to reduce the amount of data the query engine has to scan to answer a query. The partition column(s) should depend on the query patterns the data will serve, but as a rule of thumb, date is generally a good partition column, especially since Airflow works off date ranges. Note that each time we add a partition, we need to alter &lt;code&gt;user_purchase_staging&lt;/code&gt; to make it aware of that partition.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Create an external &lt;code&gt;movie_review_clean_stage&lt;/code&gt; table to store the data that was cleaned by EMR. Note the clause &lt;code&gt;STORED AS PARQUET&lt;/code&gt;, which means the data is stored in the Parquet format. &lt;a href="https://parquet.apache.org/" rel="noopener noreferrer"&gt;Parquet&lt;/a&gt; is a columnar storage format that enables efficient compression; we wrote the data out as Parquet in our Spark script. Because the table is external, we can simply drop the data at &lt;code&gt;s3://&amp;lt;your-s3-bucket&amp;gt;/movie_review/stage/&lt;/code&gt; and it is automatically ready for queries.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Create the &lt;code&gt;user_behavior_metric&lt;/code&gt; table, which holds our final output.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
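&lt;p&gt;To make the partition layout in step 2 concrete, here is a small sketch (hypothetical helper functions, not part of the project code; the bucket and table names are placeholders) of how the per-date S3 location and the matching alter statement fit together:&lt;/p&gt;

```python
def partition_location(bucket, table, ds):
    """Build the S3 location for one date (yyyy-mm-dd) partition."""
    return f"s3://{bucket}/{table}/stage/{ds}"

def add_partition_sql(schema, table, ds, location):
    """Build the alter statement that registers the new partition."""
    return (
        f"alter table {schema}.{table} "
        f"add partition(insert_date='{ds}') location '{location}'"
    )

loc = partition_location("my-bucket", "user_purchase", "2010-12-01")
sql = add_partition_sql("spectrum", "user_purchase_staging", "2010-12-01", loc)
```

&lt;p&gt;We will build exactly this kind of alter statement later in the DAG.&lt;/p&gt;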

&lt;p&gt;We now have the movie review and user purchase data cleaned and ready in the staging S3 location. Next, we need to enable Airflow to connect to our Redshift database. To do this, go to &lt;code&gt;Airflow UI -&amp;gt; Admin -&amp;gt; Connections&lt;/code&gt; and click on the &lt;code&gt;Create&lt;/code&gt; tab.&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fef24chitwi6t4tkwcmzd.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fef24chitwi6t4tkwcmzd.png" alt="Airflow Connection"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Create a Postgres-type connection named &lt;code&gt;redshift&lt;/code&gt;, using your Redshift credentials. This defines how your Airflow instance connects to your Redshift cluster.&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fnyfqf0zcjj6ixxicg6b6.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fnyfqf0zcjj6ixxicg6b6.png" alt="Airflow Connection"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Once we have the connection established, we need to let the &lt;code&gt;user_purchase_staging&lt;/code&gt; table know that a new partition has been added. We can do that in our DAG as shown below.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# existing imports
&lt;/span&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.hooks.postgres_hook&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;PostgresHook&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;psycopg2&lt;/span&gt;

&lt;span class="c1"&gt;# existing helper function(s)
&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;run_redshift_external_query&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;qry&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;rs_hook&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;PostgresHook&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;postgres_conn_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;redshift&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;rs_conn&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;rs_hook&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get_conn&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;rs_conn&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;set_isolation_level&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;psycopg2&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;extensions&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ISOLATION_LEVEL_AUTOCOMMIT&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;rs_cursor&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;rs_conn&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;cursor&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;rs_cursor&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;qry&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;rs_cursor&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;rs_conn&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;commit&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# existing tasks
&lt;/span&gt;&lt;span class="n"&gt;user_purchase_to_rs_stage&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;PythonOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;user_purchase_to_rs_stage&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;python_callable&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;run_redshift_external_query&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;op_kwargs&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;qry&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;alter table spectrum.user_purchase_staging add partition(insert_date=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;{{ ds }}&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;) &lt;/span&gt;&lt;span class="se"&gt;\
&lt;/span&gt;&lt;span class="s"&gt;            location &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s3://&amp;lt;your-bucket&amp;gt;/user_purchase/stage/{{ ds }}&lt;/span&gt;&lt;span class="sh"&gt;'"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# existing task dependency def
&lt;/span&gt;&lt;span class="n"&gt;remove_local_user_purchase_file&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;user_purchase_to_rs_stage&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is a tricky one. With plain Redshift tables we could use the &lt;code&gt;PostgresOperator&lt;/code&gt; to execute queries, since Redshift is based on Postgres. But because we are working with &lt;a href="https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_EXTERNAL_TABLE.html" rel="noopener noreferrer"&gt;external tables&lt;/a&gt; we cannot use it; instead we open a connection using &lt;code&gt;PostgresHook&lt;/code&gt; and set the isolation level to autocommit, since external table DDL cannot run inside a transaction block. You could create your own operator to handle this, but since this is an example we are not creating a separate operator. The query passed as &lt;code&gt;qry&lt;/code&gt; to the Python function is an alter statement letting the &lt;code&gt;user_purchase_staging&lt;/code&gt; table know that a new partition has been added at &lt;code&gt;s3://&amp;lt;your-bucket&amp;gt;/user_purchase/stage/yyyy-mm-dd&lt;/code&gt;. The final task is to load data into our &lt;code&gt;user_behavior_metric&lt;/code&gt; table. Let's write a templated query to do this at &lt;code&gt;beginner_de_project/dags/scripts/sql/get_user_behavior_metrics.sql&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="k"&gt;public&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_behavior_metric&lt;/span&gt; 
&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;customerid&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
&lt;span class="n"&gt;amount_spent&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
&lt;span class="n"&gt;review_score&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
&lt;span class="n"&gt;review_count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
&lt;span class="n"&gt;insert_date&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; 

&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;ups&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;customerid&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
&lt;span class="k"&gt;cast&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;sum&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt; &lt;span class="n"&gt;ups&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Quantity&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;ups&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;UnitPrice&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="nb"&gt;decimal&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;18&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;amount_spent&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
&lt;span class="k"&gt;sum&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;mrcs&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;positive_review&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;review_score&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;count&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;mrcs&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;cid&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;review_count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="s1"&gt;'{{ ds }}'&lt;/span&gt; 
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;spectrum&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_purchase_staging&lt;/span&gt; &lt;span class="n"&gt;ups&lt;/span&gt;  
&lt;span class="k"&gt;JOIN&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="n"&gt;cid&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="k"&gt;when&lt;/span&gt; &lt;span class="n"&gt;positive_review&lt;/span&gt; &lt;span class="k"&gt;is&lt;/span&gt; &lt;span class="k"&gt;True&lt;/span&gt; &lt;span class="k"&gt;then&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt; &lt;span class="k"&gt;end&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;positive_review&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;spectrum&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;movie_review_clean_stage&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;mrcs&lt;/span&gt;  
&lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;ups&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;customerid&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;mrcs&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;cid&lt;/span&gt; 
&lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;ups&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;insert_date&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'{{ ds }}'&lt;/span&gt; 
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;ups&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;customerid&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
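&lt;p&gt;To sanity-check the aggregation logic, here is a hypothetical pure-Python equivalent of the query above, run against a few in-memory rows (illustration only; the real work happens in Redshift). Note that, as in the SQL, each purchase row joins with every matching review row:&lt;/p&gt;

```python
from collections import defaultdict

# Tiny in-memory stand-ins for the two staging tables.
purchases = [  # spectrum.user_purchase_staging
    {"customerid": 1, "quantity": 2, "unitprice": 10.0},
    {"customerid": 1, "quantity": 1, "unitprice": 5.0},
]
reviews = [  # spectrum.movie_review_clean_stage
    {"cid": 1, "positive_review": True},
    {"cid": 1, "positive_review": False},
]

# CASE WHEN positive_review IS True THEN 1 ELSE 0 END
scored = [{"cid": r["cid"], "score": 1 if r["positive_review"] else 0} for r in reviews]

agg = defaultdict(lambda: {"amount_spent": 0.0, "review_score": 0, "review_count": 0})
for p in purchases:          # JOIN ... ON ups.customerid = mrcs.cid
    for r in scored:
        if p["customerid"] == r["cid"]:
            m = agg[p["customerid"]]
            m["amount_spent"] += p["quantity"] * p["unitprice"]
            m["review_score"] += r["score"]
            m["review_count"] += 1
```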



&lt;p&gt;This query computes customer-level metrics and loads them into the &lt;code&gt;user_behavior_metric&lt;/code&gt; table. Add it as a task to our DAG as shown below&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="c1"&gt;# existing config
&lt;/span&gt;&lt;span class="n"&gt;get_user_behaviour&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;scripts/sql/get_user_behavior_metrics.sql&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;

&lt;span class="c1"&gt;# existing tasks
&lt;/span&gt;&lt;span class="n"&gt;get_user_behaviour&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;PostgresOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;get_user_behaviour&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;sql&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;get_user_behaviour&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;postgres_conn_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;redshift&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;pg_unload&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;user_purchase_to_s3_stage&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;remove_local_user_purchase_file&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;user_purchase_to_rs_stage&lt;/span&gt;
&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;movie_review_to_s3_stage&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;move_emr_script_to_s3&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;add_emr_steps&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;clean_movie_review_data&lt;/span&gt;
&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;user_purchase_to_rs_stage&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;clean_movie_review_data&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;get_user_behaviour&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;end_of_data_pipeline&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
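&lt;p&gt;Note that Airflow renders the &lt;code&gt;{{ ds }}&lt;/code&gt; placeholders in the SQL file with Jinja before the query is executed. As a minimal stand-in for that rendering, plain string replacement (a toy example, not Airflow's actual implementation):&lt;/p&gt;

```python
sql_template = (
    "SELECT * FROM spectrum.user_purchase_staging "
    "WHERE insert_date = '{{ ds }}'"
)

def render(template, ds):
    """Toy stand-in for Airflow's Jinja templating: substitute the execution date."""
    return template.replace("{{ ds }}", ds)

rendered = render(sql_template, "2010-12-01")
```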



&lt;p&gt;Your Airflow &lt;code&gt;DAG&lt;/code&gt; should look like this&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2F9urdo7umes7m3gvl0vei.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2F9urdo7umes7m3gvl0vei.png" alt="Data Pipeline"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Congratulations, you have your data pipeline set up. Verify that it completes successfully from the Airflow UI, then use &lt;code&gt;pgcli&lt;/code&gt; to connect to your Redshift instance and run the query&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt;  &lt;span class="k"&gt;public&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_behavior_metric&lt;/span&gt; &lt;span class="k"&gt;limit&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You should see rows with &lt;code&gt;insert_date&lt;/code&gt; &lt;code&gt;2010-12-01&lt;/code&gt; for the first run.&lt;/p&gt;

&lt;h2&gt;
  
  
  Monitoring ETL
&lt;/h2&gt;

&lt;p&gt;Before you start running the DAG, it is good to understand how to monitor it, since data pipelines are long-running processes that can consume a lot of memory. There are several places to look:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;The Airflow UI, where you can monitor DAG status, logs and task details (note that some of these are only visible after the DAG has started)&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fj9ipzvwmxz6apmurc6qf.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fj9ipzvwmxz6apmurc6qf.png" alt="Airflow UI"&gt;&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Since we are running a Spark application on an EMR cluster, we can monitor the Spark job status using the Spark UI, which you can find on your EMR page.&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fnuucufvi00tvho3ik7no.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fnuucufvi00tvho3ik7no.png" alt="EMR Monitor"&gt;&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The Spark UI provides Spark task-level details&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fs8junhz09j36kdlxrhxw.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fs8junhz09j36kdlxrhxw.png" alt="Spark UI"&gt;&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Redshift has its own query monitoring capabilities &lt;a href="https://docs.aws.amazon.com/redshift/latest/dg/cm-c-wlm-query-monitoring-rules.html" rel="noopener noreferrer"&gt;ref&lt;/a&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Design Review
&lt;/h2&gt;

&lt;p&gt;Our data pipeline has some obvious issues; let's review them.&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Flvibe3vnviioccggq0is.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Flvibe3vnviioccggq0is.png" alt="Design Review"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;We are pulling data from an OLTP database. This is usually a bad idea, since OLTP databases are meant for application-level transactions, and depending on its size this query can significantly slow down the other queries on the OLTP database. In cases like these we should use a change data capture approach such as &lt;a href="https://www.startdataengineering.com/post/change-data-capture-using-debezium-kafka-and-pg" rel="noopener noreferrer"&gt;Debezium&lt;/a&gt;. Also, we cannot run multiple DAGs in parallel, because the temp file location is not namespaced by date: we cannot simultaneously run &lt;code&gt;remove_local_user_purchase_file&lt;/code&gt; for one DAG run and &lt;code&gt;pg_unload&lt;/code&gt; for the next, since they might hit a race condition writing and reading the same named file. This can be prevented by saving the file in a location such as &lt;code&gt;../yyyy-mm-dd-hh-mm/temp_file.csv&lt;/code&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;We are overwriting the same data in the load and stage areas, which means our movie review stage table only contains the latest day's data. This is dangerous: if we need to backfill specific dates (for whatever business reason), it would be very difficult, since we would have to go back to the vendor to get that data. If we had stored the data in folders partitioned by time (similar to the way we store &lt;code&gt;user_purchase_staging&lt;/code&gt;), it would have been a simple backfill in Airflow.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
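&lt;p&gt;The race condition from point 1 can be avoided by namespacing the temp file by the run timestamp, as sketched below (hypothetical helper; the base directory is a placeholder):&lt;/p&gt;

```python
from datetime import datetime

def namespaced_temp_path(base_dir, run_ts, filename="temp_file.csv"):
    """Place the temp file under a yyyy-mm-dd-hh-mm folder so that
    concurrent DAG runs never read/write the same file."""
    return f"{base_dir}/{run_ts.strftime('%Y-%m-%d-%H-%M')}/{filename}"

p1 = namespaced_temp_path("/tmp/stage", datetime(2020, 6, 3, 3, 58))
p2 = namespaced_temp_path("/tmp/stage", datetime(2020, 6, 4, 3, 58))
```

&lt;p&gt;Two DAG runs now write to two different folders, so one run's cleanup can no longer delete the file another run is reading.&lt;/p&gt;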

&lt;h2&gt;
  
  
  Common Scenarios
&lt;/h2&gt;

&lt;p&gt;When running a batch data pipeline on Airflow, you will face some common scenarios:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Backfill: this is when the company wants to make a historical change to the way data is processed. For example, on May 24th 2020 they might decide that all data from July 1st 2019 should be filtered with &lt;code&gt;quantity &amp;gt; 5&lt;/code&gt; as opposed to the &lt;code&gt;&amp;gt; 2&lt;/code&gt; filter we had. In this scenario it is very beneficial to use a date-based, mature system like Airflow, because it has built-in capabilities for this exact scenario. &lt;a href="https://airflow.apache.org/docs/stable/cli-ref#backfill" rel="noopener noreferrer"&gt;ref&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The &lt;code&gt;DAG&lt;/code&gt; is not getting started: this is a common question from other engineers. It is usually due to the &lt;code&gt;parallelism&lt;/code&gt; or &lt;code&gt;dag_concurrency&lt;/code&gt; settings, or a wrongly set &lt;a href="https://airflow.apache.org/docs/stable/concepts.html#pools" rel="noopener noreferrer"&gt;pool size&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Not designing idempotent and independent tasks. This causes overwrites or deletion of crucial data, similar to the movie review issue we saw in the design review above.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Not reading &lt;a href="https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls" rel="noopener noreferrer"&gt;Common pitfalls&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
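&lt;p&gt;Idempotency (point 3) boils down to this: rerunning a task for the same date must leave the data in the same state. A toy sketch of an idempotent load that replaces a whole partition instead of appending to it (the dict stands in for a partitioned table):&lt;/p&gt;

```python
warehouse = {}  # partition key (date) maps to the rows in that partition

def load_partition(ds, rows):
    """Idempotent load: replace the whole partition instead of appending,
    so reruns and backfills for the same date never duplicate data."""
    warehouse[ds] = list(rows)

load_partition("2010-12-01", [{"customerid": 1}])
load_partition("2010-12-01", [{"customerid": 1}])  # rerun: same final state
```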

&lt;h2&gt;
  
  
  Next Steps
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Create a data quality check task after the &lt;code&gt;get_user_behaviour&lt;/code&gt; task to check for data presence using count.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Understand the &lt;code&gt;wait_for_downstream&lt;/code&gt; and &lt;code&gt;depends_on_past&lt;/code&gt; options we set.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Try to recreate this &lt;code&gt;DAG&lt;/code&gt;, but scheduled hourly instead of daily. What would need to change? What are the pros and cons?&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Understand that Airflow runs on UTC time, and what this means for how you filter the user purchase data. Most companies store data in UTC and translate to local time at the application layer.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Try to make the DAG fully parallel and run a backfill where DAG runs execute in parallel; research the &lt;code&gt;max_active_runs&lt;/code&gt; parameter in your code.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;If your data size increases by 10x, 100x, or 1000x, reason about if/how your data pipeline will handle the load: will it just take more time, or fail outright?&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Go over the &lt;code&gt;docker-compose-*.yml&lt;/code&gt; files in the repo to understand the components involved in the Airflow setup, and the volume mounts we have.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;If you have a new idea you would like to see, or want to report an issue, open a PR or an issue at &lt;a href="https://github.com/josephmachado/beginner_de_project" rel="noopener noreferrer"&gt;https://github.com/josephmachado/beginner_de_project&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
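&lt;p&gt;For the data quality check in step 1, a minimal sketch could look like the following (the &lt;code&gt;fetch_count&lt;/code&gt; argument is a hypothetical stand-in for a count query against &lt;code&gt;user_behavior_metric&lt;/code&gt;):&lt;/p&gt;

```python
def check_data_presence(ds, fetch_count):
    """Fail loudly when the load for this execution date produced no rows."""
    n = fetch_count(ds)
    if n == 0:
        raise ValueError(f"no rows loaded for insert_date={ds}")
    return n

# Toy stand-in for a "select count(*) ... where insert_date = ds" query.
rows_by_date = {"2010-12-01": 3}
count = check_data_presence("2010-12-01", lambda ds: rows_by_date.get(ds, 0))
```

&lt;p&gt;Wired into the DAG as a task after &lt;code&gt;get_user_behaviour&lt;/code&gt;, a raised exception marks the run as failed so empty loads never go unnoticed.&lt;/p&gt;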

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Initially the plan was to build a data pipeline with both batch and streaming pipelines. But that got too big for one blog post. So I decided to split them into 2 separate ones. The next post in this series will be a streaming data processing pipeline. Let me know in the comments section below if you would like to see anything specific.&lt;/p&gt;

&lt;p&gt;Hope this article gives you a good idea of the nature and complexity of batch data processing. Let me know if you have any questions or comments in the comment section below. Good Luck.&lt;/p&gt;

&lt;p&gt;This is a cross post from my blog at &lt;a href="https://www.startdataengineering.com/" rel="noopener noreferrer"&gt;https://www.startdataengineering.com/&lt;/a&gt;&lt;/p&gt;

</description>
      <category>dataengineering</category>
      <category>tutorial</category>
      <category>beginners</category>
      <category>aws</category>
    </item>
    <item>
      <title>10 Key skills, to help you become a data engineer</title>
      <dc:creator>Joseph</dc:creator>
      <pubDate>Mon, 11 May 2020 11:18:58 +0000</pubDate>
      <link>https://dev.to/start_data_engineering/10-key-skills-to-help-you-become-a-data-engineer-4ekf</link>
      <guid>https://dev.to/start_data_engineering/10-key-skills-to-help-you-become-a-data-engineer-4ekf</guid>
      <description>&lt;h1&gt;
  
  
  Overview
&lt;/h1&gt;

&lt;p&gt;This article gives you an overview of the 10 key skills you need to become a better data engineer. I have seen many engineers ask &lt;code&gt;how to become a data engineer&lt;/code&gt; and what skills they need. This post is an answer to that question.&lt;/p&gt;

&lt;p&gt;If you are overwhelmed by everything you need to learn to become a data engineer, start with the first topic and proceed through the list. &lt;/p&gt;

&lt;h2&gt;
  
  
  1. Linux
&lt;/h2&gt;

&lt;p&gt;        Most applications are built on Linux systems, so it is crucial to understand how to work with them. The key concepts to know are &lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;File system commands, such as ls, cd, pwd, mkdir, rmdir &lt;/li&gt;
&lt;li&gt;Commands to get metadata about your data, such as head, tail, wc, grep, ls -lh&lt;/li&gt;
&lt;li&gt;Data processing commands, such as awk, sed &lt;/li&gt;
&lt;li&gt;Bash scripting concepts, such as control flow, looping, passing input parameters&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  2. SQL
&lt;/h2&gt;

&lt;p&gt;        SQL is crucial to access your data, whether it be for running analysis or for use by your application. The key concepts to know are &lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Basic CRUD, such as select, where, join (all types of joins), group by, having, window functions&lt;/li&gt;
&lt;li&gt;SQL internals, such as &lt;a href="https://www.startdataengineering.com/post/what-does-it-mean-for-a-column-to-be-indexed"&gt;index&lt;/a&gt;: different types and how they work, transaction concepts such as locks and race conditions&lt;/li&gt;
&lt;li&gt;Data modeling, such as normalized OLTP schemas, and OLAP warehouse schemas like star and snowflake, denormalization, key value stores, and facts and dimensions.&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  3. Scripting
&lt;/h2&gt;

&lt;p&gt;        Knowledge of a scripting language such as bash or Python is very helpful for automating the multiple steps required to process data. The key concepts to know are&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Basic data structures and concepts, such as lists, dictionaries, map, filter, reduce&lt;/li&gt;
&lt;li&gt;Control flow and looping concepts, such as if, for loop, list comprehension&lt;/li&gt;
&lt;li&gt;Popular data processing abstraction library such as pandas or Dask in Python&lt;/li&gt;
&lt;/ol&gt;
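&lt;p&gt;As a quick taste of the map, filter, reduce and list comprehension concepts above, in Python:&lt;/p&gt;

```python
from functools import reduce

order_totals = [120, 35, 60, 15]

# map: transform every element (here, add 10% tax via a comprehension)
with_tax = [round(x * 1.1, 2) for x in order_totals]

# filter: keep only the elements that pass a predicate
big_orders = [x for x in order_totals if x > 50]

# reduce: fold the whole list down to a single value
total = reduce(lambda acc, x: acc + x, order_totals, 0)
```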

&lt;h2&gt;
  
  
  4. Distributed Data Storage
&lt;/h2&gt;

&lt;p&gt;        Knowledge of how distributed data stores such as HDFS or AWS S3 work, including concepts like data replication, serialization, partitioned data storage, and file chunking&lt;/p&gt;

&lt;h2&gt;
  
  
  5. Distributed Data processing
&lt;/h2&gt;

&lt;p&gt;        Knowledge of how data is processed in a distributed fashion. The key concepts to know are&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Distributed data processing concepts, such as MapReduce, and in-memory data processing engines such as Apache Spark&lt;/li&gt;
&lt;li&gt;Different types of joins across data sets, such as map side and reduce side joins&lt;/li&gt;
&lt;li&gt;Common techniques and patterns for data processing, such as partitioning, reducing data shuffles, and handling data skew when partitioning&lt;/li&gt;
&lt;li&gt;Optimizing data processing code to take advantage of all the cores and memory available in the cluster&lt;/li&gt;
&lt;/ol&gt;
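&lt;p&gt;The join strategies in point 2 can be sketched in plain Python: a map-side (broadcast) join ships the small table to every worker as a hash map, so the large table never needs to be shuffled. A toy illustration (not actual Spark code):&lt;/p&gt;

```python
# Small dimension table, broadcast to every mapper as a dict.
countries = {1: "US", 2: "DE"}

# Large fact table, processed partition by partition where it already lives.
purchases = [
    {"customerid": 1, "amount": 20.0},
    {"customerid": 2, "amount": 5.0},
    {"customerid": 1, "amount": 7.5},
]

# Map-side join: each row is enriched with a local dict lookup; no shuffle.
joined = [
    {**p, "country": countries.get(p["customerid"], "unknown")}
    for p in purchases
]
```

&lt;p&gt;A reduce-side join, by contrast, shuffles both tables by the join key so matching rows land on the same reducer, which is far more expensive.&lt;/p&gt;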

&lt;h2&gt;
  
  
  6. Building data pipelines
&lt;/h2&gt;

&lt;p&gt;       Knowledge of how to connect different data systems to build a data pipeline. The key concepts to know are&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;A data orchestration tool, such as airflow &lt;/li&gt;
&lt;li&gt;Common pitfalls and how to avoid them, such as data quality checks after processing&lt;/li&gt;
&lt;li&gt;Building idempotent data pipelines&lt;/li&gt;
&lt;li&gt;Common patterns for batch and &lt;a href="https://www.startdataengineering.com/post/review-building-a-real-time-data-warehouse"&gt;stream processing&lt;/a&gt;
&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  7. OLAP database
&lt;/h2&gt;

&lt;p&gt;        Knowledge of how OLAP databases operate and when to use them. The key concepts to know are&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;What a column store is and why it is better for most types of aggregation queries&lt;/li&gt;
&lt;li&gt;Data modeling concepts such as partitioning, facts and dimensions, and data skew&lt;/li&gt;
&lt;li&gt;Figuring out your clients' query patterns and designing your database accordingly&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  8. Queuing systems
&lt;/h2&gt;

&lt;p&gt;        Knowledge of queuing systems and when and how to use them. The key concepts to know are&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;What is a data producer and a consumer&lt;/li&gt;
&lt;li&gt;Architecture of the system (e.g Kafka)&lt;/li&gt;
&lt;li&gt;Knowledge of at-least-once delivery, offsets and log compaction&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  9. Stream processing
&lt;/h2&gt;

&lt;p&gt;        Knowledge of what stream processing is and how to use it. The key concepts to know are&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;What stream processing is and how it differs from batch processing&lt;/li&gt;
&lt;li&gt;Different types of stream processing, such as event-based processing and micro-batching&lt;/li&gt;
&lt;li&gt;Common patterns of stream processing, such as &lt;a href="https://www.startdataengineering.com/post/change-data-capture-using-debezium-kafka-and-pg"&gt;change data capture&lt;/a&gt;, etc.&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  10. JVM language
&lt;/h2&gt;

&lt;p&gt;        Knowledge of a JVM-based language such as Java or Scala is extremely useful, since most open source data processing tools (e.g. Apache Spark, Apache Flink) are written in JVM languages. These languages are type safe, are well suited to parallel processing, and have implementations of the Actor model, which is used for fast and scalable processing.&lt;/p&gt;

</description>
      <category>dataengineering</category>
      <category>beginners</category>
      <category>database</category>
      <category>etl</category>
    </item>
  </channel>
</rss>
