Mohamed Fayaz for AWS Community Builders

AWS Managed Airflow for your complex workflows

Airflow is a tool for orchestrating complex workflows that was created at Airbnb in 2014. It has gained significant traction across organizations in recent years because it makes building complex data pipelines straightforward. The Airflow user interface (UI) serves as an operational dashboard to schedule, monitor and control your scripts and applications.

Although Airflow has been adopted by many organizations, deploying and managing its infrastructure has always been challenging and introduces operational overhead. To address this, managed Airflow offerings have emerged, such as Amazon Managed Workflows for Apache Airflow (MWAA) from Amazon Web Services (AWS). AWS takes responsibility for keeping your infrastructure running with minimal downtime, applying up-to-date security patches, and making recent Airflow versions readily available to use.

The other key benefits of AWS MWAA are its elasticity, the ability to scale up and down based on the workload, and the ease of building and deploying production-grade, secure infrastructure with seamless integration with other AWS services.

Airflow Basics

Let's dive into some key concepts of Airflow :)

In Airflow, a workflow is defined in a Python file, which is referred to as a DAG. You can think of a DAG as a single job that can contain multiple tasks. Every DAG has three common parts:

  • DAG Initialization
  • Tasks
  • Tasks Dependencies

DAG (Directed Acyclic Graph)

DAGs are written in Python and are identified by their unique dag_id. During initialization, we specify the start date, the schedule interval and so forth. Here is a simple DAG:

from airflow.models import DAG
from airflow.utils.dates import days_ago

dag = DAG(
          dag_id="sample_dag",
          start_date=days_ago(2), 
          description="Sample DAG",
          schedule_interval='@daily')

Task

Tasks perform the actual work, from executing a piece of shell script to triggering EMR jobs. A DAG must exist before we create any task. Every task in a DAG is defined by an operator, and, similar to dag_id, the task_id must be unique within the DAG.

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

def function_a (**kwargs):
        name = kwargs['name']
        return f'hello {name} !!'

first_task = PythonOperator(
        task_id="first_task",
        python_callable= function_a,
        op_kwargs= {'name': 'Fayaz'},
        dag= dag)

second_task = DummyOperator(task_id="second_task", dag=dag)

Task Dependencies

The last part of the DAG is to create dependencies among the tasks. In this case, we want to trigger first_task first and then trigger second_task as soon as first_task completes. It looks like this:

first_task >> second_task

Now that we understand what Airflow is and how to create a simple DAG, let's spin up AWS MWAA to run it.

You will need your own AWS account to perform the next few steps, which may incur some charges.

Setting up the Managed Airflow Instance in AWS

Before we create a new MWAA environment, we need to create an S3 bucket with versioning enabled.
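
If you prefer to script this prerequisite, here is a minimal boto3 sketch (the bucket name and region below are placeholders you would swap for your own):

import boto3

s3 = boto3.client("s3", region_name="us-east-1")

# Bucket names are globally unique, so replace this placeholder with your own
bucket_name = "my-mwaa-dags-bucket"

# In us-east-1 no CreateBucketConfiguration is needed; in other regions pass
# CreateBucketConfiguration={"LocationConstraint": "<your-region>"}
s3.create_bucket(Bucket=bucket_name)

# MWAA requires versioning to be enabled on the DAGs bucket
s3.put_bucket_versioning(
    Bucket=bucket_name,
    VersioningConfiguration={"Status": "Enabled"},
)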

Step 1. Go to Managed Airflow Console and click Create Environment

Step 2. Enter a name and choose Airflow version 2.0.2 (the latest at the time of writing)

Step 3. Choose the S3 bucket you created earlier
Step 4. For the DAGs folder, enter s3://{your-bucket-name}/dags
Step 5. Click Next

Step 6. Click on Create MWAA VPC

Step 7. It will take you to a CloudFormation page pre-filled with VPC and subnet details. Click Create stack, which may take a few minutes to complete

Step 8. Choose the VPC you just created, then scroll down to choose the environment class and other configurations

Step 9. Choose Create a new role and click Next

Step 10. Verify all the details and click Create environment

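If you would rather script the environment instead of clicking through the console, the same steps can be expressed with boto3. This is only a rough sketch, assuming the S3 bucket, execution role, subnets and security group from the steps above; every name, ARN and ID below is a placeholder:

import boto3

mwaa = boto3.client("mwaa", region_name="us-east-1")

# All identifiers below are placeholders; substitute the resources created above
mwaa.create_environment(
    Name="my-mwaa-environment",
    AirflowVersion="2.0.2",
    SourceBucketArn="arn:aws:s3:::my-mwaa-dags-bucket",
    DagS3Path="dags",
    ExecutionRoleArn="arn:aws:iam::123456789012:role/my-mwaa-execution-role",
    EnvironmentClass="mw1.small",
    MaxWorkers=2,
    WebserverAccessMode="PUBLIC_ONLY",
    NetworkConfiguration={
        "SecurityGroupIds": ["sg-0123456789abcdef0"],
        # MWAA expects two private subnets in different Availability Zones
        "SubnetIds": ["subnet-0123456789abcdef0", "subnet-0123456789abcdef1"],
    },
)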

Generally, it takes 10-20 mins to spin up the Airflow infrastructure, so this is the time to get your coffee ☕ before we deploy our very first DAG 🤣😂

🎉🎉🎉 When you refresh after a few minutes, you will see the environment status as Available, so click Open Airflow UI

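If you prefer polling from a script over refreshing the console, a small sketch with the same boto3 client does the trick (the environment name is a placeholder):

import time

import boto3

mwaa = boto3.client("mwaa", region_name="us-east-1")

# Poll until the environment leaves the CREATING state
while True:
    status = mwaa.get_environment(Name="my-mwaa-environment")["Environment"]["Status"]
    print(f"Environment status: {status}")
    if status != "CREATING":
        break
    time.sleep(60)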

Yay!! Now we have our environment up and ready to go, so let's deploy our first DAG.

Deploying DAGs in the AWS MWAA

Step 1: To deploy the DAG, we need to copy the .py file to the s3://{your-bucket-name}/dags location. Copy the code below into a .py file and save it locally as demo_dag.py.

"""
Importing necessary modules
"""
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


"""
Initializing DAGs
"""
dag = DAG(
          dag_id="grepy_sample_dag",
          start_date=days_ago(2), 
          description="DAG which orchestrates a simple ML workflow",
          schedule_interval='@daily')

"""
Creating Tasks
"""
def function_a (**kwargs):
        name = kwargs['name']
        return f'hello {name} !!'

first_task = PythonOperator(
        task_id="first_task", 
        python_callable= function_a,
        op_kwargs= {'name': 'Fayaz'}, 
        dag= dag)

second_task = DummyOperator(task_id="second_task", dag=dag)

"""
Dependencies
"""
first_task >> second_task

Step 2: Upload the demo_dag.py file to the dags folder in your S3 bucket.

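Uploading through the console works fine, but for repeat deployments the same copy can be done with boto3 (the bucket name is a placeholder):

import boto3

s3 = boto3.client("s3")

# Copy the local DAG file into the dags/ prefix that MWAA watches
s3.upload_file("demo_dag.py", "my-mwaa-dags-bucket", "dags/demo_dag.py")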

Step 3: That's it!! It may take a few minutes the first time you deploy a DAG, but it will then show up like this

When you toggle the DAG on for the first time, it will be triggered automatically, so click on the DAG name to open the tree view, where you can see the run status and the task dependencies.

One of the best parts is that you can see all the logs from the Graph view. The same logs are also available in CloudWatch, so you can ingest them into Splunk or ELK for further analysis.

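Since the task logs also land in CloudWatch, you can pull them programmatically as well. A rough sketch, assuming task logging is enabled and using the airflow-<environment-name>-Task log group that MWAA creates by default (the environment name is a placeholder):

import boto3

logs = boto3.client("logs", region_name="us-east-1")

# MWAA writes task logs to a log group named airflow-<environment-name>-Task
response = logs.filter_log_events(
    logGroupName="airflow-my-mwaa-environment-Task",
    limit=50,
)

for event in response["events"]:
    print(event["message"])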

Conclusion

In this post, we took a high-level look at Airflow basics and dived into AWS Managed Airflow with a sample DAG deployment that runs a simple Python function. In the same way, you can orchestrate any kind of task using the various operators, which are available on any Airflow infrastructure.

For further reading

Thank you for your time and happy learning !! 😊
