mucio

AWS MWAA and AWS SES integration

Table of Contents

  1. Intro
  2. The standard SMTP configuration
  3. Can we skip the credentials?
  4. A possible solution

Intro

This post is about integrating MWAA with SES using an IAM role and not SMTP credentials. I will try to keep it short and focused.

The assumption here is that you already have an MWAA 2.0.2 environment with its execution role configured as per the AWS documentation, and that the role allows the ses:* actions on your SES resources.
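
As a rough illustration of the permission assumed above, the statement attached to the environment role could look like the sketch below. The build_ses_policy helper and the "*" resource are placeholders of mine, not something taken from the AWS documentation; in practice you would probably narrow both the actions and the resource.

```python
import json
from typing import Dict, List


def build_ses_policy(actions: List[str], resource: str = "*") -> Dict:
    """Build an IAM policy document allowing the given SES actions.

    Hypothetical helper for illustration; "*" is a placeholder resource.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": actions, "Resource": resource}
        ],
    }


# The post assumes ses:* is allowed on the MWAA execution role.
policy = build_ses_policy(["ses:*"])
print(json.dumps(policy, indent=2))
```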

The standard SMTP configuration

If you already have your MWAA environment configured and you are trying to send emails, you probably ended up on this documentation page: the default settings there should be good enough to give you an idea of what you need.

The setting we care about most is email.email_backend. As the documentation shows, its default value is airflow.utils.email.send_email_smtp.

This is the default Airflow SMTP integration, and it works pretty well if you have SMTP credentials (it also works with SES; see the link above to create the credentials).
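
To make the standard setup a bit more concrete, here is a sketch of the Airflow configuration options it involves, written as the string-to-string map that MWAA expects. The region in the host name and the sender address are made-up values, and the SMTP credentials themselves are deliberately left out.

```python
from typing import Dict

# Sketch of the SMTP-based configuration (the approach this post avoids).
# Assumptions: eu-west-1 as the SES region, airflow@example.com as a
# verified sender. Both are placeholders.
smtp_options: Dict[str, str] = {
    "email.email_backend": "airflow.utils.email.send_email_smtp",
    "smtp.smtp_host": "email-smtp.eu-west-1.amazonaws.com",
    "smtp.smtp_port": "587",
    "smtp.smtp_starttls": "True",
    "smtp.smtp_ssl": "False",
    "smtp.smtp_mail_from": "airflow@example.com",
}

for key, value in sorted(smtp_options.items()):
    print(f"{key} = {value}")
```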

Can we skip the credentials?

I do not like to create more credentials than I need. I wanted to be able to use AWS SES without them, just using the IAM Role assigned to my MWAA environment.

What we need is a different email backend, specifically airflow.providers.amazon.aws.utils.emailer.send_email. This function lives in a small utility module of the Amazon provider package for Airflow.

All fine and dandy until you try to send your first email.

Here is a quick DAG you can copy and paste to test your email setup:

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.email import EmailOperator

my_dag = DAG(dag_id="test_email_dag",
             start_date=datetime(2021, 10, 1),
             schedule_interval="0 0 * * *"
            )

start = DummyOperator(dag=my_dag,
                      task_id="start"
                     )

end = DummyOperator(dag=my_dag,
                    task_id="end"
                   )

run_this = EmailOperator(dag=my_dag,
                         task_id="send_email",
                         to='mucio@mucio.net',
                         subject="test",
                         html_content="this is a test, nothing to worry about"
                        )

start >> run_this >> end

If you run this DAG you will probably see the following error in the logs:

botocore.exceptions.ParamValidationError: Parameter validation failed:
Invalid type for parameter Source, value: None, type: <class 'NoneType'>, valid types: <class 'str'>

What is happening here? Remember the email backend provided by the Amazon provider package? For some reason, the mail_from parameter, which is passed to boto3 as Source, is None instead of a sender address.

This problem was already reported in a few Airflow issues and PRs. The fix didn't make the cut for Airflow 2.2 and will probably land in version 2.3, but because we are talking about MWAA (version 2.0.2), we don't really know when it will be available on AWS.

A possible solution

The solution I came up with was to rewrite the emailer.py utility, deploy it in the dags folder, and reference it in the MWAA configuration.

Here is the new emailer (I put mine in ses_email_fix/emailer.py, with an empty __init__.py):

"""Airflow module fix for email backend using AWS SES"""

from typing import List, Optional, Union

from airflow.configuration import conf
from airflow.providers.amazon.aws.hooks.ses import SESHook


def send_email(to: Union[List[str], str],
               subject: str,
               html_content: str,
               files: Optional[List] = None,
               cc: Optional[Union[List[str], str]] = None,
               bcc: Optional[Union[List[str], str]] = None,
               mime_subtype: str = 'mixed',
               mime_charset: str = 'utf-8',
               conn_id: str = 'aws_default',
               **kwargs,
              ) -> None:
    """Email backend for SES."""

    hook = SESHook(aws_conn_id=conn_id)

    hook.send_email(mail_from=conf.get('smtp', 'SMTP_MAIL_FROM'),
                    to=to,
                    subject=subject,
                    html_content=html_content,
                    files=files,
                    cc=cc,
                    bcc=bcc,
                    mime_subtype=mime_subtype,
                    mime_charset=mime_charset,
                   )

This takes mail_from from the smtp.smtp_mail_from MWAA environment setting. I also changed my email.email_backend setting to ses_email_fix.emailer.send_email.
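
The two settings just mentioned can be written down as a small options map. The sketch below also shows one way to push them with boto3; build_ses_backend_options and apply_options are hypothetical helper names, the sender address is a placeholder, and I am assuming you pass in a client created with boto3.client("mwaa"). Before running something like this against a real environment, check how the update call treats options you leave out of the map.

```python
from typing import Any, Dict


def build_ses_backend_options(mail_from: str) -> Dict[str, str]:
    """Options that point Airflow at the custom SES backend (hypothetical helper)."""
    return {
        "email.email_backend": "ses_email_fix.emailer.send_email",
        "smtp.smtp_mail_from": mail_from,
    }


def apply_options(mwaa_client: Any, environment_name: str,
                  options: Dict[str, str]) -> Any:
    """Send the options to MWAA.

    mwaa_client is assumed to be boto3.client("mwaa"); nothing is called
    until you invoke this function yourself.
    """
    return mwaa_client.update_environment(
        Name=environment_name,
        AirflowConfigurationOptions=options,
    )


# airflow@example.com is a placeholder for an address verified in SES.
options = build_ses_backend_options("airflow@example.com")
print(options)
```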

The last thing you don't want to forget is to add the ses_email_fix folder to your .airflowignore file (otherwise Airflow will try to parse it as a DAG).
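
In Airflow 2.0.x each line of .airflowignore is treated as a regular expression that is searched for in the file path. The snippet below is a simplified model of that matching, not Airflow's own code, and the paths are only examples; it shows that a single ses_email_fix line is enough to cover the whole folder.

```python
import re
from typing import List


def is_ignored(path: str, patterns: List[str]) -> bool:
    """Return True if any .airflowignore-style regex matches the path.

    Simplified stand-in for Airflow's DAG file discovery.
    """
    return any(re.search(pattern, path) for pattern in patterns)


# One pattern per line of the .airflowignore file.
patterns = ["ses_email_fix"]

print(is_ignored("/usr/local/airflow/dags/ses_email_fix/emailer.py", patterns))
print(is_ignored("/usr/local/airflow/dags/test_email_dag.py", patterns))
```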

Now, after updating your environment, your test DAG should be able to fire emails via SES without using credentials.
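
If the backend fails to import after the update, the folder layout is the first thing to check. The sketch below rebuilds the layout described above in a temporary directory that stands in for the dags folder (which Airflow puts on the Python path) and confirms that the dotted path resolves. The stub emailer body is a placeholder, not the real module.

```python
import importlib.util
import sys
import tempfile
from pathlib import Path


def make_package(dags_folder: Path) -> None:
    """Create ses_email_fix/__init__.py and a stub ses_email_fix/emailer.py."""
    package = dags_folder / "ses_email_fix"
    package.mkdir()
    (package / "__init__.py").touch()
    (package / "emailer.py").write_text(
        "def send_email(*args, **kwargs):\n    pass\n"
    )


with tempfile.TemporaryDirectory() as tmp:
    dags_folder = Path(tmp)
    make_package(dags_folder)
    # Mimic the dags folder being on sys.path.
    sys.path.insert(0, str(dags_folder))
    try:
        spec = importlib.util.find_spec("ses_email_fix.emailer")
        found = spec is not None
    finally:
        sys.path.remove(str(dags_folder))

print(found)
```

Note that the folder name has to be a valid Python identifier: a directory called ses-email-fix (with hyphens) cannot be imported as ses_email_fix.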

A final word of thanks goes to the Airflow community, which came up with the quick workaround for this problem that I implemented in my own MWAA environment.

Credits: Cover photo by Quoc Nguyen from Pexels

Top comments (6)

Alexey Novakov

Thank you! It works!

sophistafunk

FYI: this issue has been resolved in Airflow v2.4.3, which is now available for MWAA. If you can upgrade to that version, all you need to do is set the email backend config per the Airflow docs here, and it will work as long as you have given the MWAA instance proper SES permissions via the IAM role attached to it.

Bibek Paudyal

I got the following error. What could be the cause?
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/operators/email.py", line 88, in execute
    conn_id=self.conn_id,
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/email.py", line 66, in send_email
    **kwargs,
  File "/usr/local/airflow/dags/ses-email-fix/emailer.py", line 32, in send_email
    mime_charset=mime_charset,
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/amazon/aws/hooks/ses.py", line 98, in send_email
    Source=mail_from, Destinations=recipients, RawMessage={'Data': message.as_string()}
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 530, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 960, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the SendRawEmail operation:

shcsd-krigo

I am getting the following error when running the DAG:

ModuleNotFoundError: No module named 'ses_email_fix'

mucio

Where did you put your ses_email_fix module? And did you include the __init__.py file?

shcsd-krigo

[2022-10-06, 16:21:25 UTC] {{taskinstance.py:1703}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/configuration.py", line 482, in getimport
    return import_string(full_qualified_path)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/module_loading.py", line 32, in import_string
    module = import_module(module_path)
  File "/usr/lib64/python3.7/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
  File "<frozen importlib._bootstrap>", line 983, in _find_and_load
  File "<frozen importlib._bootstrap>", line 953, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
  File "<frozen importlib._bootstrap>", line 983, in _find_and_load
  File "<frozen importlib._bootstrap>", line 965, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'ses_email_fix'