Amazon Managed Workflows for Apache Airflow (MWAA) is a fully managed service that allows us to create, orchestrate, and manage data and machine learning pipelines in AWS, based on Apache Airflow.
Currently, many customers run their pipelines with Apache Airflow on EKS, ECS, or EC2, where they have to spend a lot of time administering all of Apache Airflow's components: the scheduler, executor, web server, web UI, workers, database, and broker. Additionally, all security, authentication, authorization, and logging options must be configured.
When using Amazon Managed Workflows for Apache Airflow (MWAA), AWS manages all the components related to instances, storage, software installation, IAM SSO integration, logging (CloudWatch), and worker scaling, while still allowing the flexibility to add custom configurations and install operators, hooks, sensors, and plugins without any inconvenience.
As our data & analytics and machine learning projects grow, flexibility becomes more important. This leads us to use custom plugins within MWAA so we can create different types of tasks that fit our needs. In this post we will walk through the steps to create a plugin (operators, hooks, sensors), and see how to validate its creation and monitor it.
To better understand this process, I will show you how to add a Slack notification plugin.
Note: All the code in this blog post is in my GitHub repo.
The directory structure must be in the following format; note that for this example I am not using sensors.
```
__init__.py
|-- slack_plugin.py
hooks/
|-- __init__.py
|-- slack_webhook_hook.py
operators/
|-- __init__.py
|-- slack_webhook_operator.py
```
In this case, a slack_plugin.py file should be created with the following content:
```python
from airflow.plugins_manager import AirflowPlugin
from hooks.slack_webhook_hook import SlackWebhookHook
from operators.slack_webhook_operator import SlackWebhookOperator


class slack_webhook_operator(SlackWebhookOperator):
    pass


class slack_webhook_hook(SlackWebhookHook):
    pass


class slack_plugin(AirflowPlugin):
    name = 'my_slack_plugin'
    hooks = [slack_webhook_hook]
    operators = [slack_webhook_operator]
```
This code shows that we have to create a class named after our plugin, plus classes for the hook and the operator (in this case we do not use sensors). Also, keep in mind that the import paths used to call custom operators are different from those of the operators that come by default with Apache Airflow 1.10.12.
PythonOperator Path (comes with the default installation):
```python
from airflow.operators.python_operator import PythonOperator
```
SlackWebhookOperator Path (custom operator):
```python
from operators.slack_webhook_operator import SlackWebhookOperator
```
To package the plugin for installation, you must be in the plugins folder of your project:
```shell
MacBook-Pro-73:mwaa czam$ cd plugins
MacBook-Pro-73:mwaa czam$ chmod -R 755 .
MacBook-Pro-73:mwaa czam$ zip -r plugins.zip .
```
Once you have created the plugins.zip file, you must upload it to the S3 bucket specified in the MWAA configuration:
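The upload can be done from the console or the AWS CLI. A minimal sketch with the CLI, assuming a hypothetical bucket name and prefix (replace them with the values from your MWAA environment configuration):

```
# Hypothetical bucket/key - use the path configured in your MWAA environment
aws s3 cp plugins.zip s3://your-mwaa-bucket/plugins.zip
```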
Once the plugins.zip file is uploaded to the specified path, go to the MWAA environment and select the newly uploaded file. Click Next and then Save. The environment then enters the Updating state and takes a few minutes to return to the Available state.
The requirements.txt file should list the packages we need to install; in this case, the file should look like this:
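The exact contents depend on what your hook implementation imports; the package and version below are only a plausible example, not taken from the repo:

```
# Example only - pin the packages your custom hook actually needs.
# If your hook uses the Slack client library, for instance:
slackclient==1.3.2
```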
Note that the same steps performed for plugins.zip must be done for requirements.txt so that MWAA recognizes it: upload the file to S3 and select the new version in the environment configuration.
To validate that the plugin was installed, we must create a DAG and execute a task that uses the SlackWebhookOperator. The following code defines such a task:
```python
task1 = SlackWebhookOperator(
    task_id='task_1',
    http_conn_id='slack_connection',
    message='I am the task 1',
    channel='#airflowchannel',
    dag=dag
)
```
To finish this configuration, a connection must be added under Admin → Connections in the Airflow UI, and it must contain the webhook created in Slack.
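As a sketch, assuming the hook follows the convention of Airflow's contrib SlackWebhookHook (which reads the webhook token from the connection's password field), the connection fields would look roughly like this; the token path shown is a placeholder:

```
Conn Id:    slack_connection
Conn Type:  HTTP
Host:       https://hooks.slack.com/services
Password:   <webhook token path, e.g. /T00000000/B00000000/XXXXXXXX>
```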
To create a webhook in Slack, follow these steps.
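Under the hood, an incoming-webhook notification is just an HTTP POST with a JSON body. The sketch below shows roughly what the custom hook does, using only the standard library; the function names and payload fields are illustrative, not taken from the repo:

```python
import json
from urllib import request


def build_slack_payload(message, channel):
    """Build the JSON body a Slack incoming webhook expects."""
    return json.dumps({"text": message, "channel": channel})


def send_slack_message(webhook_url, message, channel):
    """POST the payload to the webhook URL and return the HTTP status code."""
    req = request.Request(
        webhook_url,
        data=build_slack_payload(message, channel).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with request.urlopen(req) as resp:
        return resp.status
```

The SlackWebhookOperator saves you from writing this boilerplate and ties the webhook credentials to an Airflow connection instead of hard-coding the URL.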
After making these configurations, we activate the DAG and verify in our Slack channel that the notifications begin to arrive.
Additionally, within the Apache Airflow console, the Tree View shows the number of executions for each task and their respective statuses.
When creating the environment, it is recommended to enable logging at the INFO level for all components; this gives you greater visibility into the status of the entire environment.
When you need to execute custom tasks, it is best to create custom operators and ship them within a plugin.
In the official Apache Airflow documentation you will find a large number of Operators, Hooks, and Sensors that you can use.
You can interact with AWS services in different ways: you can use a PythonOperator together with the boto3 library, or you can create an AWS connection and interact with the services through custom operators.
Pipelines can be created for data and analytics projects as well as machine learning; for example, you can use operators to connect to services such as Amazon SageMaker.
All the content in this post can be found here.