DEV Community

Ricardo Sueiras for AWS

Posted on • Updated on • Originally published at blog.beachgeek.co.uk

Working with Amazon EKS and Amazon Managed Workflows for Apache Airflow v2.x

Introduction

The Apache Airflow slack channel is a vibrant community of open source builders that is a great source of feedback, knowledge and answers to problems and use cases you might have when trying to do stuff with Apache Airflow. This week I picked up on someone seeing errors with Amazon EKS, and so I thought what better time to try out the new Apache Airflow 2.x version that was recently launched in Amazon Managed Workflows for Apache Airflow (MWAA).

In this post, I will explore one of the code examples in the MWAA documentation, Using Amazon MWAA with Amazon EKS, and specifically look at getting it up and running with the recently announced Apache Airflow 2.x. The post assumes that you already have your MWAA 2.x environment up and running.

What you will need

  • An AWS account with the right level of privileges
  • An up-to-date AWS CLI - at least version 1.19.73 / 2.24
  • The eksctl and kubectl command line tools - you can find details on how to install those from the original documentation guide linked above
  • An MWAA environment up and running - may I suggest you check out some of my earlier blog posts, like this one if you are familiar with AWS CDK, or this one if you are not. All you will need to do is change the version number in the template/CDK app from the current "1.10.12" to "2.0.2"
  • An EC2 key pair for the region in which you have deployed your MWAA environment

I ran this on my Mac and on my Ubuntu desktop. I have not tried it on Windows, but I cannot see why it would not work there too.

Creating the Amazon EKS Cluster

As per the original documentation, you will need to create a public key from an EC2 keypair in your region. I already had one that I use, but if you do not, then you can either use the AWS cli or Console to create one. Ensure you create it in the SAME region as your MWAA environment.

We will also create a folder to keep everything that we are creating in one place.

mkdir mwaa-eks
cd mwaa-eks
ssh-keygen -y -f airflow-eu-2.pem > airflow-eu-2.pub

We now can create our Amazon EKS cluster. We are going to call this "mwaa-2-eks" and deploy this in the same region as our MWAA environment. We need to additionally add the EC2 keypair information and details of the Private and Public subnets of the MWAA environment.

When you created your MWAA environment, you will have created a VPC. If you used the AWS CDK or CloudFormation templates linked above, you can go to the CloudFormation console to view the resources created, and obtain the Subnet info for your MWAA environment.
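
If you would rather not click through the console, the subnet IDs can also be picked out of the stack's resource list. This is a minimal sketch, not part of the original walkthrough: it assumes the VPC was created by a CloudFormation stack, that the response has the shape returned by boto3's describe_stack_resources, and the sample response below is made up for illustration. It does not tell you which subnets are public and which are private, so you still need to check that against your template.

```python
def subnet_ids(response):
    """Return the physical IDs of every AWS::EC2::Subnet in a
    describe_stack_resources response."""
    return [
        resource["PhysicalResourceId"]
        for resource in response.get("StackResources", [])
        if resource.get("ResourceType") == "AWS::EC2::Subnet"
    ]


def fetch_subnet_ids(stack_name, region):
    """Query CloudFormation directly; needs boto3 and AWS credentials."""
    import boto3  # imported lazily so subnet_ids() works without boto3

    client = boto3.client("cloudformation", region_name=region)
    return subnet_ids(client.describe_stack_resources(StackName=stack_name))


# Hypothetical response, trimmed to the two fields the helper reads.
sample_response = {
    "StackResources": [
        {"ResourceType": "AWS::EC2::VPC",
         "PhysicalResourceId": "vpc-0123456789abcdef0"},
        {"ResourceType": "AWS::EC2::Subnet",
         "PhysicalResourceId": "subnet-0e15c476b96769dd0"},
        {"ResourceType": "AWS::EC2::Subnet",
         "PhysicalResourceId": "subnet-051d91dcd0103d0b3"},
    ]
}
print(subnet_ids(sample_response))
```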

This is what my command looks like; yours will look different.

eksctl create cluster \
--name mwaa-2-eks \
--region eu-central-1 \
--version 1.18 \
--nodegroup-name linux-nodes \
--nodes 3 \
--nodes-min 1 \
--nodes-max 4 \
--with-oidc \
--ssh-access \
--ssh-public-key airflow-eu-2 \
--managed \
--vpc-public-subnets "subnet-0e15c476b96769dd0, subnet-0a49eb679f8c55f5b" \
--vpc-private-subnets "subnet-051d91dcd0103d0b3, subnet-00ac6b944d5a0e902"
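
Since the cluster name, region, key pair and subnets are the only parts that change between environments, it can help to see the command as a template. The following is a sketch under my own assumptions (the function name is hypothetical); it only assembles the same flags used above into an argument list and prints it, and does not run eksctl.

```python
import shlex


def eksctl_create_cluster(name, region, key_name, public_subnets, private_subnets):
    """Build the eksctl argument list used in this post from your own values."""
    return [
        "eksctl", "create", "cluster",
        "--name", name,
        "--region", region,
        "--version", "1.18",
        "--nodegroup-name", "linux-nodes",
        "--nodes", "3",
        "--nodes-min", "1",
        "--nodes-max", "4",
        "--with-oidc",
        "--ssh-access",
        "--ssh-public-key", key_name,
        "--managed",
        # Join without spaces so each subnet list is one clean argument.
        "--vpc-public-subnets", ",".join(public_subnets),
        "--vpc-private-subnets", ",".join(private_subnets),
    ]


args = eksctl_create_cluster(
    name="mwaa-2-eks",
    region="eu-central-1",
    key_name="airflow-eu-2",
    public_subnets=["subnet-0e15c476b96769dd0", "subnet-0a49eb679f8c55f5b"],
    private_subnets=["subnet-051d91dcd0103d0b3", "subnet-00ac6b944d5a0e902"],
)
print(shlex.join(args))  # shlex.join needs Python 3.8 or later
```

The list can be passed to subprocess.run(args, check=True) if you want to launch eksctl from a script.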

When we run this, we can check the CloudFormation console for any issues. This takes a few minutes (about 5 mins when I ran this) to complete. You can confirm all is good by running the following command:

eksctl utils associate-iam-oidc-provider \
--region eu-central-1 \
--cluster mwaa-2-eks \
--approve

And you should get output like

[ℹ]  eksctl version 0.35.0
[ℹ]  using region eu-central-1
[ℹ]  IAM Open ID Connect provider is already associated with cluster "mwaa-2-eks" in "eu-central-1"

The next step is to create a new namespace in EKS, which we do by running this command

kubectl create namespace mwaa

which should return the following if successful.

namespace/mwaa created

Updating permissions for your MWAA environment

The next step is to create a new role. From the same terminal, run this command.

cat << EOF | kubectl apply -f - -n mwaa
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role
rules:
  - apiGroups:
      - ""
      - "apps"
      - "batch"
      - "extensions"
    resources:      
      - "jobs"
      - "pods"
      - "pods/attach"
      - "pods/exec"
      - "pods/log"
      - "pods/portforward"
      - "secrets"
      - "services"
    verbs:
      - "create"
      - "delete"
      - "describe"
      - "get"
      - "list"
      - "patch"
      - "update"
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role-binding
subjects:
- kind: User
  name: mwaa-service
roleRef:
  kind: Role
  name: mwaa-role
  apiGroup: rbac.authorization.k8s.io
EOF

which will generate the following if successful

role.rbac.authorization.k8s.io/mwaa-role created
rolebinding.rbac.authorization.k8s.io/mwaa-role-binding created

and you can check this by running the following command

kubectl get pods -n mwaa --as mwaa-service

which for the time being will return

No resources found in mwaa namespace.

We need to update/create a new IAM policy for the MWAA environment. I am not going to go into details here on how to do that, other than outline the steps I took:

  • create a new IAM policy (which I called mwaa-2-ekspolicy)
  • create a new IAM role (which I called mwaa-2-eks-role) which uses that policy created in the previous step
  • update the MWAA environment Execution policy to use this new role

This is the policy I created, based on the MWAA environment I set up using the AWS CDK deployment

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": "airflow:PublishMetrics",
            "Resource": "arn:aws:airflow:eu-central-1:xxxxxxxxxxxx:environment/cdk-ricsue-demo-delete",
            "Effect": "Allow"
        },
        {
            "Action": "s3:ListAllMyBuckets",
            "Resource": [
                "arn:aws:s3:::airflow-ricsue-cdk-demo/*",
                "arn:aws:s3:::airflow-ricsue-cdk-demo"
            ],
            "Effect": "Deny"
        },
        {
            "Action": "s3:*",
            "Resource": [
                "arn:aws:s3:::airflow-ricsue-cdk-demo/*",
                "arn:aws:s3:::airflow-ricsue-cdk-demo"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "logs:GetLogEvents",
                "logs:GetLogRecord",
                "logs:GetLogGroupFields",
                "logs:GetQueryResults"
            ],
            "Resource": "arn:aws:logs:eu-central-1:xxxxxxxxxxxx:log-group:airflow-cdk-ricsue-demo-delete-*",
            "Effect": "Allow"
        },
        {
            "Action": "logs:DescribeLogGroups",
            "Resource": "*",
            "Effect": "Allow"
        },
        {
            "Effect": "Allow",
            "Action": "cloudwatch:PutMetricData",
            "Resource": "*"
        },
        {
            "Action": [
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:ReceiveMessage",
                "sqs:SendMessage"
            ],
            "Resource": "arn:aws:sqs:eu-central-1:*:airflow-celery-*",
            "Effect": "Allow"
        },
        {
            "Condition": {
                "StringEquals": {
                    "kms:ViaService": [
                        "sqs.eu-central-1.amazonaws.com",
                        "s3.eu-central-1.amazonaws.com"
                    ]
                }
            },
            "Action": [
                "kms:Decrypt",
                "kms:DescribeKey",
                "kms:GenerateDataKey*",
                "kms:Encrypt"
            ],
            "Resource": "*",
            "Effect": "Allow"
        },
        {
            "Effect": "Allow",
            "Action": [
                "eks:DescribeCluster"
            ],
            "Resource": "arn:aws:eks:eu-central-1:xxxxxxxxxxxx:cluster/mwaa-2-eks"
        }
    ]
}
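
The only statement in that policy that relates to EKS is the last one, which allows eks:DescribeCluster on the new cluster. If you already have an execution policy you are happy with, a small script can add just that statement. This is a sketch under my own assumptions: the function name is hypothetical and the base policy below is a trimmed-down stand-in, not a complete MWAA execution policy.

```python
import copy
import json


def with_eks_access(policy, cluster_arn):
    """Return a copy of an IAM policy document that also allows
    eks:DescribeCluster on the given cluster."""
    updated = copy.deepcopy(policy)  # leave the caller's document untouched
    statement = {
        "Effect": "Allow",
        "Action": ["eks:DescribeCluster"],
        "Resource": cluster_arn,
    }
    # Skip the append if the same statement is already there.
    if statement not in updated["Statement"]:
        updated["Statement"].append(statement)
    return updated


# Stand-in for an existing execution policy (illustration only).
base_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": "cloudwatch:PutMetricData", "Resource": "*"},
    ],
}
cluster_arn = "arn:aws:eks:eu-central-1:xxxxxxxxxxxx:cluster/mwaa-2-eks"
new_policy = with_eks_access(base_policy, cluster_arn)
print(json.dumps(new_policy, indent=4))
```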

Make sure that the role you created also has the correct trust relationship for MWAA, which I configured as follows. This ensures that MWAA can use this role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "airflow-env.amazonaws.com",
          "airflow.amazonaws.com"
        ]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
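
A quick way to double check the trust relationship is to look for both service principals in the document. The helper below is a hypothetical sketch of that check; it works on the policy as a plain Python dict and ignores conditions, so treat it as a sanity check rather than a full policy evaluation.

```python
REQUIRED_PRINCIPALS = {"airflow-env.amazonaws.com", "airflow.amazonaws.com"}


def as_list(value):
    """IAM allows a single string or a list; normalise to a list."""
    return [value] if isinstance(value, str) else list(value)


def missing_mwaa_principals(trust_policy):
    """Return the MWAA service principals NOT allowed to assume the role."""
    allowed = set()
    for statement in trust_policy.get("Statement", []):
        if statement.get("Effect") != "Allow":
            continue
        if "sts:AssumeRole" not in as_list(statement.get("Action", [])):
            continue
        principal = statement.get("Principal", {})
        if isinstance(principal, dict):
            allowed.update(as_list(principal.get("Service", [])))
    return sorted(REQUIRED_PRINCIPALS - allowed)


trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": ["airflow-env.amazonaws.com", "airflow.amazonaws.com"]
            },
            "Action": "sts:AssumeRole",
        }
    ],
}
print(missing_mwaa_principals(trust_policy))
```

An empty list means both principals are present.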

Updating your MWAA environment

Before we update the MWAA environment for that IAM role update to take effect, we also need to create a requirements.txt file so that we can load up some dependencies that the KubernetesPodOperator will need.

One of the changes with MWAA when you create an Apache Airflow 2.x environment is that not all the providers are deployed, which means we need to install them. You can read more here

I created a txt file with the following

awscli
kubernetes
cryptography
watchtower==1.0.6
apache-airflow[cncf.kubernetes]==2.0.2
apache-airflow-providers-http==1.1.1
apache-airflow-providers-ssh==1.3.0
apache-airflow-providers-postgres==1.0.1

Once you have created this file, upload it to the Apache Airflow DAGs Amazon S3 bucket. This is the folder structure I have set up (created by the CDK app); I created a folder called "requirements" and then uploaded the requirements.txt into this folder.

airflow-ricsue-cdk-demo
├── dags
│   └── <empty>
└── requirements
    └── requirements.txt

I now update the MWAA environment, and make two updates:

  • update the MWAA Execution role to use the newly created one
  • add a "requirements.txt" configuration that points to this file.

This will cause your environment to update, so will take 10-15 mins to complete.
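
I made these two updates in the console, but they could also be made through the API. The sketch below is an assumption on my part rather than what I ran: it builds the arguments for the boto3 MWAA client's update_environment call, using the environment name from the policy ARN above and the requirements path inside the bucket. The helper names are hypothetical, and the actual call needs boto3 and AWS credentials.

```python
def update_environment_kwargs(env_name, execution_role_arn, requirements_key):
    """Arguments for the MWAA UpdateEnvironment call covering both changes."""
    return {
        "Name": env_name,
        "ExecutionRoleArn": execution_role_arn,
        # Path of requirements.txt relative to the environment's S3 bucket.
        "RequirementsS3Path": requirements_key,
    }


def apply_update(kwargs, region):
    """Send the update; the environment then goes into an updating state."""
    import boto3  # imported lazily so the helper above works without boto3

    return boto3.client("mwaa", region_name=region).update_environment(**kwargs)


kwargs = update_environment_kwargs(
    env_name="cdk-ricsue-demo-delete",
    execution_role_arn="arn:aws:iam::xxxxxxxxxxxx:role/mwaa-2-eks-role",
    requirements_key="requirements/requirements.txt",
)
print(kwargs)
# apply_update(kwargs, "eu-central-1")  # uncomment to run against your account
```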

Once the environment has finished updating (it will say Available), you can create an identity mapping for EKS using the IAM role we created above.

eksctl create iamidentitymapping \
--region eu-central-1 \
--cluster mwaa-2-eks \
--arn arn:aws:iam::xxxxxxxxxxxx:role/mwaa-2-eks-role \
--username mwaa-service

Which should output something like this:

[ℹ]  eksctl version 0.35.0
[ℹ]  using region eu-central-1
[ℹ]  adding identity "arn:aws:iam::xxxxxxxxxxxx:role/mwaa-2-eks-role" to auth ConfigMap

Creating the kube_config

We now need to create the kube_config.yaml by running this command

aws eks update-kubeconfig \
--region eu-central-1 \
--kubeconfig ./kube_config.yaml \
--name mwaa-2-eks \
--alias aws

which will output the following

Added new context aws to /Users/ricsue/Projects/CloudBuilders/Airflow/eks/kube_config.yaml

It also creates a file called kube_config.yaml in the directory where you ran the command, which we need to edit. Open this file and, at the end, replace

command: aws

with

command: /usr/local/airflow/.local/bin/aws
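
If you regenerate the kubeconfig often, the same edit can be scripted. This is a minimal sketch with a hypothetical helper name; it treats the file as plain text and rewrites only a line that reads exactly "command: aws", so it makes no attempt to parse the YAML.

```python
import re

MWAA_AWS_CLI = "/usr/local/airflow/.local/bin/aws"


def point_kubeconfig_at_mwaa_cli(text, cli_path=MWAA_AWS_CLI):
    """Replace 'command: aws' with the full path of the aws CLI on MWAA."""
    pattern = re.compile(r"^([ \t]*command:[ \t]*)aws[ \t]*$", flags=re.MULTILINE)
    # Keep the original indentation (group 1) and swap only the command.
    return pattern.sub(lambda match: match.group(1) + cli_path, text)


# Tail of a kubeconfig like the one generated above.
sample = (
    "      - --cluster-name\n"
    "      - mwaa-2-eks\n"
    "      command: aws\n"
)
patched = point_kubeconfig_at_mwaa_cli(sample)
print(patched)
```

Running it a second time leaves the text unchanged, because the patched line no longer matches.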

This was what my finished file looked like:

apiVersion: v1
clusters:
- cluster:
    certificate-authority-data: <redacted>
    server: https://<yourecrhost>.gr7.eu-central-1.eks.amazonaws.com
  name: arn:aws:eks:eu-central-1:xxxxxxxxxxxx:cluster/mwaa-2-eks
contexts:
- context:
    cluster: arn:aws:eks:eu-central-1:xxxxxxxxxxxx:cluster/mwaa-2-eks
    user: arn:aws:eks:eu-central-1:xxxxxxxxxxxx:cluster/mwaa-2-eks
  name: aws
current-context: aws
kind: Config
preferences: {}
users:
- name: arn:aws:eks:eu-central-1:xxxxxxxxxxxx:cluster/mwaa-2-eks
  user:
    exec:
      apiVersion: client.authentication.k8s.io/v1alpha1
      args:
      - --region
      - eu-central-1
      - eks
      - get-token
      - --cluster-name
      - mwaa-2-eks
      command: /usr/local/airflow/.local/bin/aws

Creating and running your DAG

We are nearly there now. We can now create a sample DAG, using the same example from the documentation. We do, however, need to change it to reflect differences between Airflow 1.x and 2.x. We change the original

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

to

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

The complete DAG looks like the following:

from airflow import DAG
from datetime import datetime
#from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
   'owner': 'aws',
   'depends_on_past': False,
   'start_date': datetime(2019, 2, 20),
   'provide_context': True
}

dag = DAG(
   'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

#use a kube_config stored in s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
                       namespace="mwaa",
                       image="ubuntu:18.04",
                       cmds=["bash"],
                       arguments=["-c", "ls"],
                       labels={"foo": "bar"},
                       name="mwaa-pod-test",
                       task_id="pod-task",
                       get_logs=True,
                       dag=dag,
                       is_delete_operator_pod=False,
                       config_file=kube_config_path,
                       in_cluster=False,
                       cluster_context='aws'
                       )

We now upload both the DAG and the kube_config.yaml file to the DAG folder of MWAA. This is what our directory tree now looks like:

airflow-ricsue-cdk-demo
├── dags
│   ├── example-eks.py
│   └── kube_config.yaml
└── requirements
    └── requirements.txt
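
The upload itself can be done from the console or the CLI. As an illustration only, here is a sketch of how the two files map to keys under the dags folder, plus an optional upload using the boto3 S3 client's upload_file call. The helper names are hypothetical and the upload needs boto3 and AWS credentials.

```python
import os
import posixpath


def dag_folder_keys(local_files, prefix="dags"):
    """Map each local file to the S3 key it should have in the DAG folder."""
    return {
        path: posixpath.join(prefix, os.path.basename(path))
        for path in local_files
    }


def upload_to_dag_folder(bucket, local_files):
    """Upload the files; S3 keys always use forward slashes."""
    import boto3  # imported lazily so dag_folder_keys() works without boto3

    s3 = boto3.client("s3")
    for path, key in dag_folder_keys(local_files).items():
        s3.upload_file(path, bucket, key)


plan = dag_folder_keys(["example-eks.py", "./kube_config.yaml"])
print(plan)
# upload_to_dag_folder("airflow-ricsue-cdk-demo", list(plan))  # your bucket here
```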

And after around 1-2 minutes, we can see the new DAG appear within the Apache Airflow 2.x UI.


When we run this DAG, we can see the following output in the task log

[2021-06-10 15:13:52,508] {{standard_task_runner.py:77}} INFO - Job 18: Subtask pod-task
[2021-06-10 15:13:52,670] {{logging_mixin.py:104}} INFO - [2021-06-10 15:13:52,670] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
[2021-06-10 15:13:52,727] {{logging_mixin.py:104}} INFO - [2021-06-10 15:13:52,727] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
[2021-06-10 15:13:52,785] {{logging_mixin.py:104}} INFO - [2021-06-10 15:13:52,784] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=eu-central-1
[2021-06-10 15:13:52,854] {{logging_mixin.py:104}} INFO - [2021-06-10 15:13:52,854] {{base_aws.py:157}} INFO - role_arn is None
[2021-06-10 15:13:52,963] {{logging_mixin.py:104}} INFO - Running <TaskInstance: kubernetes_pod_example.pod-task 2021-06-10T15:13:50.295205+00:00 [running]> on host ip-10-192-2-117.eu-central-1.compute.internal
[2021-06-10 15:13:53,133] {{taskinstance.py:1283}} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=aws
AIRFLOW_CTX_DAG_ID=kubernetes_pod_example
AIRFLOW_CTX_TASK_ID=pod-task
AIRFLOW_CTX_EXECUTION_DATE=2021-06-10T15:13:50.295205+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-06-10T15:13:50.295205+00:00
[2021-06-10 15:13:54,075] {{kubernetes_pod.py:367}} INFO - creating pod with labels {'dag_id': 'kubernetes_pod_example', 'task_id': 'pod-task', 'execution_date': '2021-06-10T151350.2952050000-061341893', 'try_number': '1'} and launcher <airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher object at 0x7f17f1bc7d10>
[2021-06-10 15:13:54,390] {{logging_mixin.py:104}} INFO - [2021-06-10 15:13:54,390] {{pod_launcher.py:189}} INFO - Event: mwaa-pod-test.946a6687934d4ed6a67bf11070b5b24f had an event of type Pending
[2021-06-10 15:13:54,487] {{logging_mixin.py:104}} INFO - [2021-06-10 15:13:54,486] {{pod_launcher.py:126}} WARNING - Pod not yet started: mwaa-pod-test.946a6687934d4ed6a67bf11070b5b24f
[2021-06-10 15:13:55,566] {{logging_mixin.py:104}} INFO - [2021-06-10 15:13:55,566] {{pod_launcher.py:189}} INFO - Event: mwaa-pod-test.946a6687934d4ed6a67bf11070b5b24f had an event of type Pending
[2021-06-10 15:13:59,516] {{logging_mixin.py:104}} INFO - [2021-06-10 15:13:59,515] {{pod_launcher.py:126}} WARNING - Pod not yet started: mwaa-pod-test.946a6687934d4ed6a67bf11070b5b24f
[2021-06-10 15:14:00,613] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:00,613] {{pod_launcher.py:189}} INFO - Event: mwaa-pod-test.946a6687934d4ed6a67bf11070b5b24f had an event of type Succeeded
[2021-06-10 15:14:00,694] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:00,694] {{pod_launcher.py:302}} INFO - Event with job id mwaa-pod-test.946a6687934d4ed6a67bf11070b5b24f Succeeded
[2021-06-10 15:14:00,775] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:00,775] {{pod_launcher.py:148}} INFO - bin
[2021-06-10 15:14:00,825] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:00,825] {{pod_launcher.py:148}} INFO - boot
[2021-06-10 15:14:00,879] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:00,879] {{pod_launcher.py:148}} INFO - dev
[2021-06-10 15:14:00,937] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:00,936] {{pod_launcher.py:148}} INFO - etc
[2021-06-10 15:14:00,999] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:00,999] {{pod_launcher.py:148}} INFO - home
[2021-06-10 15:14:01,088] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:01,088] {{pod_launcher.py:148}} INFO - lib
[2021-06-10 15:14:01,167] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:01,167] {{pod_launcher.py:148}} INFO - lib64
[2021-06-10 15:14:01,224] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:01,224] {{pod_launcher.py:148}} INFO - media
[2021-06-10 15:14:01,317] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:01,317] {{pod_launcher.py:148}} INFO - mnt
[2021-06-10 15:14:01,376] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:01,376] {{pod_launcher.py:148}} INFO - opt
[2021-06-10 15:14:01,453] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:01,453] {{pod_launcher.py:148}} INFO - proc
[2021-06-10 15:14:01,803] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:01,803] {{pod_launcher.py:148}} INFO - root
[2021-06-10 15:14:01,866] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:01,866] {{pod_launcher.py:148}} INFO - run
[2021-06-10 15:14:03,722] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:03,722] {{pod_launcher.py:148}} INFO - sbin
[2021-06-10 15:14:04,031] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:04,031] {{pod_launcher.py:148}} INFO - srv
[2021-06-10 15:14:04,085] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:04,085] {{pod_launcher.py:148}} INFO - sys
[2021-06-10 15:14:04,732] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:04,732] {{pod_launcher.py:148}} INFO - tmp
[2021-06-10 15:14:04,957] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:04,957] {{pod_launcher.py:148}} INFO - usr
[2021-06-10 15:14:05,021] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:05,021] {{pod_launcher.py:148}} INFO - var
[2021-06-10 15:14:06,113] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:06,113] {{pod_launcher.py:189}} INFO - Event: mwaa-pod-test.946a6687934d4ed6a67bf11070b5b24f had an event of type Succeeded
[2021-06-10 15:14:08,356] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:08,356] {{pod_launcher.py:302}} INFO - Event with job id mwaa-pod-test.946a6687934d4ed6a67bf11070b5b24f Succeeded
[2021-06-10 15:14:08,441] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:08,441] {{pod_launcher.py:189}} INFO - Event: mwaa-pod-test.946a6687934d4ed6a67bf11070b5b24f had an event of type Succeeded
[2021-06-10 15:14:08,500] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:08,500] {{pod_launcher.py:302}} INFO - Event with job id mwaa-pod-test.946a6687934d4ed6a67bf11070b5b24f Succeeded
[2021-06-10 15:14:08,565] {{taskinstance.py:1192}} INFO - Marking task as SUCCESS. dag_id=kubernetes_pod_example, task_id=pod-task, execution_date=20210610T151350, start_date=20210610T151351, end_date=20210610T151408
[2021-06-10 15:14:08,656] {{taskinstance.py:1246}} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-06-10 15:14:08,834] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:08,833] {{local_task_job.py:146}} INFO - Task exited with return code 0
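
The output of the ls command is interleaved with the launcher's own messages. If you want just what the pod printed, the lines can be filtered on the part of the log that names where they came from. This is a sketch that assumes the log format shown above; the source tag pod_launcher.py:148 is what appears in my log and may well differ in other provider versions, and the function name is hypothetical.

```python
import re


def pod_stdout(log_lines, source="pod_launcher.py:148"):
    """Return the messages logged from the given source file and line."""
    pattern = re.compile(r"\{\{" + re.escape(source) + r"\}\} INFO - (.*)$")
    output = []
    for line in log_lines:
        match = pattern.search(line)
        if match:
            output.append(match.group(1))
    return output


# Three lines copied from the task log above.
sample_log = [
    "[2021-06-10 15:14:00,694] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:00,694] {{pod_launcher.py:302}} INFO - Event with job id mwaa-pod-test.946a6687934d4ed6a67bf11070b5b24f Succeeded",
    "[2021-06-10 15:14:00,775] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:00,775] {{pod_launcher.py:148}} INFO - bin",
    "[2021-06-10 15:14:00,825] {{logging_mixin.py:104}} INFO - [2021-06-10 15:14:00,825] {{pod_launcher.py:148}} INFO - boot",
]
print(pod_stdout(sample_log))
```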

Conclusion

In this post we took a look at how to run one of the code examples in the MWAA documentation, showing you how you can run DAGs using the KubernetesPodOperator when you have configured Apache Airflow version 2.x.

Top comments (2)

Cleber J Santos

The post about MWAA and EKS is very good, however, I was unable to understand the advantage of using EKS to execute DAGs. Could you say that this approach has an impact on reducing costs in the MWAA environment, for example?

Ricardo Sueiras

So there are a number of reasons why folk do this, and I actually have a talk dedicated to just this. Some of the use cases I’ve seen include: orgs that have a mature container development lifecycle and want to use Airflow as a scheduler for those tasks; running non-Python tasks, which is again very common; running binaries; and running hybrid workflows, if you have a hybrid Kubernetes infra.