Intro
This post is about integrating MWAA with SES using an IAM role and not SMTP credentials. I will try to keep it short and focused.
The assumption here is that you already have your MWAA 2.0.2 environment and its role configured as per the AWS documentation, with the ses:* actions allowed on your AWS SES resources.
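If ses:* is broader than you would like, a narrower policy may be enough. The sketch below builds a policy document that allows only ses:SendRawEmail, the operation that shows up in the traceback in the comments further down. The helper name is my own, and the region, account ID and identity in the ARN are placeholders. Treat it as a starting point under those assumptions, not a verified least-privilege policy.

```python
import json


def build_ses_send_policy(identity_arn: str) -> dict:
    """Return an IAM policy document allowing raw email sending for one SES identity."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["ses:SendRawEmail"],
                "Resource": [identity_arn],
            }
        ],
    }


# Placeholder ARN: replace region, account ID and domain with your own.
policy = build_ses_send_policy(
    "arn:aws:ses:eu-west-1:123456789012:identity/example.com"
)
print(json.dumps(policy, indent=2))
```

The printed JSON can be attached to the MWAA role as an inline policy. If sending fails with AccessDenied, the resource ARN and the region are the first things I would check.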
The standard SMTP configuration
If you already have your MWAA environment configured and you are trying to send emails, you probably ended up on this documentation page: the default settings there should be good enough to give you an idea of what you need.
The one we are interested in the most is email.email_backend. As you can see in the documentation, the default value is airflow.utils.email.send_email_smtp.
This is the default Airflow SMTP integration, and it works pretty well if you have SMTP credentials (it also works with SES; see the link above to create the credentials).
Can we skip the credentials?
I do not like to create more credentials than I need. I wanted to be able to use AWS SES without them, just using the IAM Role assigned to my MWAA environment.
What we need is a different email backend, specifically airflow.providers.amazon.aws.utils.emailer.send_email. This uses a small module that sits among the ones shipped in the Amazon provider package.
All good and dandy until you try to send the first email.
Here is a quick DAG to copy&paste to test your email:
from datetime import datetime

from airflow.models import DAG
# airflow.operators.dummy_operator is the deprecated Airflow 1.x path.
from airflow.operators.dummy import DummyOperator
from airflow.operators.email import EmailOperator

# Daily DAG whose only job is to send one test email.
my_dag = DAG(
    dag_id="test_email_dag",
    start_date=datetime(2021, 10, 1),
    schedule_interval="0 0 * * *",
)

start = DummyOperator(dag=my_dag, task_id="start")
end = DummyOperator(dag=my_dag, task_id="end")

# Replace the recipient with an address you control.
run_this = EmailOperator(
    dag=my_dag,
    task_id="sent_email",
    to="mucio@mucio.net",
    subject="test",
    html_content="this is a test, nothing to worry",
)

start >> run_this >> end
If you run this DAG you will probably see the following error in the logs:
botocore.exceptions.ParamValidationError: Parameter validation failed:
Invalid type for parameter Source, value: None, type: <class 'NoneType'>, valid types: <class 'str'>
What is happening here? Do you remember the email_backend provided by AWS? For some reason the mail_from parameter, which is passed as Source to boto3, is None instead of a sender address.
This problem was already reported in a few Airflow issues and PRs. The fix didn't make the cut for Airflow 2.2 and will probably land in version 2.3, but because we are talking about MWAA (version 2.0.2), we don't really know when it will be fixed on AWS.
A possible solution
The solution I came up with was to rewrite the emailer.py utility, deploy it in the dags folder and reference it in the MWAA configuration.
Here is the new emailer (I put mine in ses_email_fix/emailer.py, with an empty __init__.py):
"""Airflow module fix for email backend using AWS SES"""
from typing import List, Optional, Union

from airflow.configuration import conf
from airflow.providers.amazon.aws.hooks.ses import SESHook


def send_email(
    to: Union[List[str], str],
    subject: str,
    html_content: str,
    files: Optional[List] = None,
    cc: Optional[Union[List[str], str]] = None,
    bcc: Optional[Union[List[str], str]] = None,
    mime_subtype: str = 'mixed',
    mime_charset: str = 'utf-8',
    conn_id: str = 'aws_default',
    **kwargs,
) -> None:
    """Email backend for SES."""
    hook = SESHook(aws_conn_id=conn_id)
    hook.send_email(
        # Read the sender from the Airflow configuration so it is never None.
        mail_from=conf.get('smtp', 'SMTP_MAIL_FROM'),
        to=to,
        subject=subject,
        html_content=html_content,
        files=files,
        cc=cc,
        bcc=bcc,
        mime_subtype=mime_subtype,
        mime_charset=mime_charset,
    )
This takes the mail_from from the smtp.smtp_mail_from MWAA environment setting. I also changed my email.email_backend to ses_email_fix.emailer.send_email.
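For reference, the two settings can be written down as MWAA configuration options. The sketch below only builds the dictionary. The helper name, the sender address and the environment name are placeholders of mine, and the commented-out update_environment call assumes you manage the environment with boto3 rather than the console.

```python
def build_email_options(mail_from: str) -> dict:
    """Airflow configuration overrides that switch MWAA to the custom SES backend."""
    return {
        # Module path of the patched backend deployed in the dags folder.
        "email.email_backend": "ses_email_fix.emailer.send_email",
        # Sender address read by the patched backend; it must be verified in SES.
        "smtp.smtp_mail_from": mail_from,
    }


options = build_email_options("no-reply@example.com")  # placeholder address
for key, value in sorted(options.items()):
    print(f"{key} = {value}")

# Assumption: applying the options with boto3 (not run here).
# import boto3
# boto3.client("mwaa").update_environment(
#     Name="my-mwaa-environment",  # hypothetical environment name
#     AirflowConfigurationOptions=options,
# )
```

If you go the boto3 route, check how the call interacts with configuration options you have already set, so you don't drop them by accident.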
The last thing you don't want to forget is to add the ses_email_fix folder to your .airflowignore file (otherwise Airflow will try to parse it as a DAG).
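To see why a single line is enough: in Airflow 2.0.x each line of .airflowignore is treated as a regular expression that is searched against the file paths under the dags folder. The sketch below imitates that matching with plain re. The function name and the sample paths are made up for illustration, and this is not Airflow's actual implementation.

```python
import re
from typing import Iterable, List


def filter_ignored(paths: Iterable[str], ignore_lines: Iterable[str]) -> List[str]:
    """Keep only the paths that match none of the ignore patterns."""
    patterns = [re.compile(line.strip()) for line in ignore_lines if line.strip()]
    return [p for p in paths if not any(rx.search(p) for rx in patterns)]


# Hypothetical layout of a dags folder containing the emailer package.
candidates = [
    "/usr/local/airflow/dags/test_email_dag.py",
    "/usr/local/airflow/dags/ses_email_fix/__init__.py",
    "/usr/local/airflow/dags/ses_email_fix/emailer.py",
]
kept = filter_ignored(candidates, ["ses_email_fix"])
print(kept)
```

With the single pattern ses_email_fix, only the DAG file is left for parsing. The module stays importable because ignoring a path only affects DAG discovery.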
Now, after updating your environment, your test DAG should be able to fire emails via SES without using credentials.
Last words are for the Airflow community who came up with a quick workaround for this problem that I just implemented in my own MWAA environment.
Credits: Cover photo by Quoc Nguyen from Pexels
Top comments (6)
Thank you! It works!
FYI - This issue has been resolved in Airflow v2.4.3 which is now available for MWAA. Therefore, if you can upgrade to that version then all you need to do is set the email backend config per the Airflow docs here and it will work as long as you have given the MWAA instance proper SES permissions via the IAM role attached to it.
I got the following error. What could be the cause?
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
result = execute_callable(context=context)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/operators/email.py", line 88, in execute
conn_id=self.conn_id,
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/email.py", line 66, in send_email
**kwargs,
File "/usr/local/airflow/dags/ses-email-fix/emailer.py", line 32, in send_email
mime_charset=mime_charset,
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/amazon/aws/hooks/ses.py", line 98, in send_email
Source=mail_from, Destinations=recipients, RawMessage={'Data': message.as_string()}
File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 530, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 960, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the SendRawEmail operation:
I am getting the following error when I run the DAG:
ModuleNotFoundError: No module named 'ses_email_fix'
Where did you put your ses_email_fix module? And did you include the __init__.py file?
[2022-10-06, 16:21:25 UTC] {{taskinstance.py:1703}} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/configuration.py", line 482, in getimport
return import_string(full_qualified_path)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/module_loading.py", line 32, in import_string
module = import_module(module_path)
File "/usr/lib64/python3.7/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
File "<frozen importlib._bootstrap>", line 983, in _find_and_load
File "<frozen importlib._bootstrap>", line 953, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
File "<frozen importlib._bootstrap>", line 983, in _find_and_load
File "<frozen importlib._bootstrap>", line 965, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'ses_email_fix'
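A way to reason about this error: the backend string is imported like any other dotted path, so the folder name has to match the first part of the path (ses_email_fix) and its parent directory has to be on sys.path, which Airflow does for the dags folder. The sketch below recreates the layout from the post in a temporary directory, with a stand-in send_email, and imports it the same way. Everything in it is illustrative and none of it touches a real MWAA environment.

```python
import importlib
import sys
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as dags_folder:
    package = Path(dags_folder) / "ses_email_fix"
    package.mkdir()
    (package / "__init__.py").write_text("")
    # Stand-in for the real emailer module, just to have something to import.
    (package / "emailer.py").write_text(
        "def send_email(*args, **kwargs):\n    return 'sent'\n"
    )

    sys.path.insert(0, dags_folder)  # mimic the dags folder being on sys.path
    try:
        importlib.invalidate_caches()
        module = importlib.import_module("ses_email_fix.emailer")
        result = module.send_email()
    finally:
        sys.path.remove(dags_folder)

print(result)
```

If the same import fails on MWAA, I would first compare the folder name in the bucket with the module path in email.email_backend, and then check that the folder sits directly under dags.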