Carlos Zambrano for AWS Community Builders

Adding Custom Operators on Amazon Managed Workflows for Apache Airflow

Introduction to Apache Airflow on AWS (MWAA)

Amazon Managed Workflows for Apache Airflow (MWAA) is a fully managed service that allows us to orchestrate, manage and create Data and Machine Learning Pipelines in AWS based on Apache Airflow.

Currently, many customers run their pipelines using Apache Airflow on EKS, ECS, or EC2, where they have to spend a lot of time administering all of Airflow's components, such as the Scheduler, Executor, Web Server, Web UI, Workers, database, and broker. Additionally, all security, authentication, authorization, and logging options must be configured.

When using Amazon Managed Workflows for Apache Airflow (MWAA), AWS manages all the components related to instances, storage, software installation, integration with IAM SSO, logging (CloudWatch), and worker scaling, while still allowing the flexibility to add custom configurations and install operators, hooks, sensors, and plugins.

Adding our Slack plugin to MWAA

As our Data & Analytics and Machine Learning projects grow, flexibility becomes more important. This leads us to use custom plugins within MWAA to create different types of tasks that fit our needs. Below are the steps to create a plugin (operators, hooks, sensors), validate its installation, and monitor it.

To better understand this process, I will show you how to add a Slack notification plugin.

Note: All the code in this blog post is in My GitHub Repo.

1. Plugin Directory Structure

The directory structure must be in the following format. Note that this example does not use sensors.

2. Content

In this case, a plugin file should be created with the following content:

from airflow.plugins_manager import AirflowPlugin
from hooks.slack_webhook_hook import SlackWebhookHook
from operators.slack_webhook_operator import SlackWebhookOperator


# Thin subclasses that expose the custom operator and hook through the plugin.
class slack_webhook_operator(SlackWebhookOperator):
    pass


class slack_webhook_hook(SlackWebhookHook):
    pass


# Registers the hook and operator with Airflow under the plugin name.
class slack_plugin(AirflowPlugin):

    name = 'my_slack_plugin'
    hooks = [slack_webhook_hook]
    operators = [slack_webhook_operator]

This code shows that we have to create a plugin class that declares the plugin name and lists its hooks and operators (this example does not use sensors). Also, keep in mind that the import paths for custom operators are different from those of the operators that ship by default with Apache Airflow 1.10.12.

PythonOperator Path (comes with the default installation):

from airflow.operators.python_operator import PythonOperator

SlackWebhookOperator Path (custom operator):

from operators.slack_webhook_operator import SlackWebhookOperator
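The custom import path resolves because Airflow puts the plugins folder on Python's module search path, so the packages inside it can be imported by their top-level names. The sketch below reproduces that mechanism with the standard library only; it is not Airflow itself. The temporary directory stands in for the plugins folder, and the stub class and `__init__.py` file are assumptions made for illustration.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for the plugins folder.
plugins_dir = Path(tempfile.mkdtemp(prefix="plugins_"))

# Recreate only the part of the layout implied by the import statement.
operators_dir = plugins_dir / "operators"
operators_dir.mkdir()
(operators_dir / "__init__.py").write_text("")
(operators_dir / "slack_webhook_operator.py").write_text(
    "class SlackWebhookOperator:\n"
    "    '''Placeholder body; the real operator lives in the plugin.'''\n"
)

# Once the folder is on sys.path, its packages are importable by name.
sys.path.insert(0, str(plugins_dir))
importlib.invalidate_caches()

from operators.slack_webhook_operator import SlackWebhookOperator

print(SlackWebhookOperator.__module__)
```

This is why the custom operator is imported from `operators.…` rather than from `airflow.operators.…`.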

3. Install the Plugin

To install the plugin you must be in the plugins folder of your project:

MacBook-Pro-73:mwaa czam$  cd plugins
MacBook-Pro-73:mwaa czam$  chmod -R 755 .
MacBook-Pro-73:mwaa czam$  zip -r plugins.zip .
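The same packaging step can be sketched in Python when a script is more convenient than the shell. This is a minimal sketch using only the standard library; the function name `build_plugins_zip`, the file name `slack_plugin.py`, and the demonstration layout are hypothetical. The point it illustrates is that paths are stored relative to the plugins folder, so `hooks/` and `operators/` sit at the root of the archive.

```python
import tempfile
import zipfile
from pathlib import Path


def build_plugins_zip(plugins_dir: Path, archive_path: Path) -> list:
    """Zip every file under plugins_dir, storing paths relative to it."""
    names = []
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(plugins_dir.rglob("*")):
            # Skip directories and the archive itself if it lives in plugins_dir.
            if path.is_file() and path != archive_path:
                arcname = path.relative_to(plugins_dir).as_posix()
                archive.write(path, arcname)
                names.append(arcname)
    return names


# Hypothetical layout, created only for this demonstration.
root = Path(tempfile.mkdtemp())
plugins = root / "plugins"
for relative in (
    "slack_plugin.py",
    "hooks/slack_webhook_hook.py",
    "operators/slack_webhook_operator.py",
):
    target = plugins / relative
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text("# placeholder\n")

entries = build_plugins_zip(plugins, root / "plugins.zip")
print(entries)
```

Listing the archive entries before uploading is a quick way to confirm that no extra parent folder was included.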

Once you have compressed the files, upload the archive to the S3 bucket specified in the MWAA configuration:

S3 Configuration

Once the file is uploaded to the specified path, you must go to the MWAA environment and select the newly uploaded file. We click Next and then Save. By doing this the environment enters an Updating state and takes a few minutes to return to the Available state.
Updating MWAA

4. Install Requirements

The requirements.txt file should list the packages we need to install. In this case, the file should look like this:


Note that the same step (uploading the file to S3 and updating the environment) must be repeated for MWAA to recognize requirements.txt.

5. Test the Custom Plugin

To validate that the plugin was installed we must create a DAG and execute the SlackWebhookOperator task. The following code is for a task with SlackWebhookOperator:

task1 = SlackWebhookOperator(
    message='I am the task 1',

To finish this configuration, a connection containing the webhook created in Slack must be added under Connections.

Go to the connections option.
Connections Option

Now add an HTTP connection. It should look like this:
Add Connection

To create a webhook in Slack, follow these steps.
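To make the role of the webhook concrete, here is a minimal sketch of the kind of HTTP request a Slack incoming webhook accepts: a POST with a JSON body whose `text` field carries the message. It is not the plugin's code. The helper name `build_slack_request` and the URL are hypothetical, and the request is only built, never sent.

```python
import json
import urllib.request


def build_slack_request(webhook_url: str, message: str) -> urllib.request.Request:
    """Build (but do not send) a POST request for a Slack incoming webhook."""
    payload = json.dumps({"text": message}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder URL: the real webhook URL is issued by Slack and must be kept secret.
request = build_slack_request(
    "https://example.com/hypothetical-webhook", "I am the task 1"
)
print(request.get_method(), json.loads(request.data))

# Sending is left commented out to avoid a network call in this sketch:
# urllib.request.urlopen(request)
```

Storing the webhook in an Airflow connection, as described above, keeps this secret out of the DAG code.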

After making these configurations, we activate the DAG and verify in our Slack channel that the notifications begin to arrive.

6. Monitor your Plugin

To monitor our MWAA environment, we can review the log groups in CloudWatch, where we can see the status of the scheduler, the web server, the tasks, and each of the DAGs.
Cloudwatch MWAA
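When looking for these log groups, it helps to know what names to expect. The sketch below assumes the `airflow-<environment name>-<log type>` naming pattern that MWAA uses for its CloudWatch log groups; the helper name and the environment name `my-mwaa-env` are hypothetical, and the console remains the authoritative list for a given environment.

```python
# Log categories that MWAA can publish to CloudWatch Logs (assumed list).
MWAA_LOG_TYPES = ("DAGProcessing", "Scheduler", "Task", "WebServer", "Worker")


def mwaa_log_groups(environment_name: str) -> list:
    """Return the log group names expected for an MWAA environment."""
    return [
        f"airflow-{environment_name}-{log_type}" for log_type in MWAA_LOG_TYPES
    ]


# "my-mwaa-env" is a hypothetical environment name.
for group in mwaa_log_groups("my-mwaa-env"):
    print(group)
```

Only the log categories enabled on the environment will actually exist in CloudWatch.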

Additionally, within the Apache Airflow console, the Tree View shows the executions of each task and their respective status.

Dag Monitoring

Lessons Learned and Best Practices

  1. When creating the environment, it is recommended to enable logging at the INFO level for all components. This gives you greater visibility into the status of the entire environment.

  2. When you need to execute custom tasks, it is best to create custom operators and add them to a plugin.

  3. In the official Apache Airflow documentation you will find a large number of Operators, Hooks, and Sensors that you can use.

  4. There are different ways to interact with AWS services: you can use a PythonOperator with the boto3 library, or you can create a connection to AWS and interact with the services through custom operators.

  5. Pipelines can be created for Data and Analytics projects as well as Machine Learning projects, and you can use operators to connect with services such as SageMaker.

All the content in this post can be found here.

-- Carlos Zambrano
