elliott cordo for AWS Heroes


Federated Airflow with SQS

Simple Queue Service (SQS) is one of the simplest and most effective services AWS has to offer. The thing I like most about it is its versatility: it performs equally well in high-volume pub-sub event processing and in more general low-volume orchestration. In this post we'll review how we can use SQS to create a non-monolithic Airflow architecture, a double-click into my previous post on the subject.

More Airflows = More Happy

Airflow is a great tool and at this point fairly ubiquitous in the data engineering community. However, the more complex the environment, the more difficult it is to develop and deploy, and ultimately the less stable it becomes.

I generally recommend the following principles when architecting with Airflow:

  • Small maintainable data product repos
  • Multiple purposeful Airflow environments
  • Airflow environments communicating through events

So this all sounds good, but what about dependencies 😬??? Organizations are going to have some dependencies that cross domain/product boundaries. We'd stand to lose that Airflow dependency goodness if the products are running in separate Airflow environments. A good example of this would be a "customer master", which might be used in several independently developed data products. That's where "communicating through events" comes in 😃

SQS to the rescue

Luckily this problem is VERY easily solved using SQS, and we've put together this little demo repository to help you get started.

Step 1: Setup

Assuming you already have two MWAA environments or self-hosted Airflow infrastructure, you will need to create an SNS topic, create an SQS queue, and subscribe the queue to the topic. We decided to be cute and package these steps as DAGs, but you can lift the code or create the resources from the console.
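If you would rather script the setup than use the DAGs or the console, the three steps above can be sketched with boto3 as below. This is a minimal sketch rather than the demo repo's code: the function names, topic name, and queue name are hypothetical, and the clients are passed in so the wiring can be exercised without AWS credentials.

```python
import json


def build_queue_policy(queue_arn: str, topic_arn: str) -> str:
    """Return a queue policy that lets one SNS topic send messages to one SQS queue."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowTopicToSend",
                "Effect": "Allow",
                "Principal": {"Service": "sns.amazonaws.com"},
                "Action": "sqs:SendMessage",
                "Resource": queue_arn,
                "Condition": {"ArnEquals": {"aws:SourceArn": topic_arn}},
            }
        ],
    }
    return json.dumps(policy)


def wire_topic_to_queue(sns, sqs, topic_name: str, queue_name: str) -> dict:
    """Create a topic and a queue, then subscribe the queue to the topic."""
    topic_arn = sns.create_topic(Name=topic_name)["TopicArn"]
    queue_url = sqs.create_queue(QueueName=queue_name)["QueueUrl"]
    queue_arn = sqs.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]

    # Without this policy SNS is not allowed to deliver into the queue.
    sqs.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes={"Policy": build_queue_policy(queue_arn, topic_arn)},
    )
    sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)
    return {"topic_arn": topic_arn, "queue_url": queue_url, "queue_arn": queue_arn}


# Usage with real clients (hypothetical resource names):
#   import boto3
#   wire_topic_to_queue(boto3.client("sns"), boto3.client("sqs"),
#                       "customer-master-events", "customer-profile-inbox")
```

The returned topic ARN and queue URL are the values the DAGs in the following steps read from Airflow variables.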

You will then need to attach a policy to the role used by your Airflow environment. Note that this policy is fairly permissive, as it enables the steps above to be run through Airflow.



{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowSNSActions",
            "Effect": "Allow",
            "Action": [
                "sns:CreateTopic",
                "sns:DeleteTopic",
                "sns:ListTopics",
                "sns:ListSubscriptionsByTopic",
                "sns:GetSubscriptionAttributes",
                "sns:Subscribe",
                "sns:SetSubscriptionAttributes",
                "sns:ConfirmSubscription",
                "sns:Publish"
            ],
            "Resource": "arn:aws:sns:*:{AccountID}:*"
        },
        {
            "Sid": "AllowSQSActions",
            "Effect": "Allow",
            "Action": [
                "sqs:CreateQueue",
                "sqs:DeleteQueue",
                "sqs:GetQueueUrl",
                "sqs:GetQueueAttributes",
                "sqs:SetQueueAttributes",
                "sqs:ListQueues",
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage"
            ],
            "Resource": "arn:aws:sqs:*:{AccountID}:*"
        }
    ]
}
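Once the resources exist, the create/delete permissions are no longer needed for day-to-day runs. As a hedged sketch of a narrower runtime policy: it assumes the publisher only calls `sns:Publish` and the sensor only needs `sqs:ReceiveMessage` and `sqs:DeleteMessage`, which I have not verified against every provider version. The function name is hypothetical, and in practice you would likely split the two statements across the two environments' roles.

```python
import json


def runtime_policy(topic_arn: str, queue_arn: str) -> str:
    """Build a narrower IAM policy for regular runs (no create/delete rights)."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Upstream environment: publish the completion event.
                "Sid": "AllowPublishOnly",
                "Effect": "Allow",
                "Action": ["sns:Publish"],
                "Resource": topic_arn,
            },
            {
                # Downstream environment: read and acknowledge messages.
                "Sid": "AllowReceiveAndDelete",
                "Effect": "Allow",
                "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage"],
                "Resource": queue_arn,
            },
        ],
    }
    return json.dumps(policy, indent=2)
```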




Step 2: Create SNS Publish DAG

In the following DAG we create a simulated upstream dependency (consider this your customer master build step). We use the SnsPublishOperator to notify downstream dependencies after our dummy step is complete.

⚠️ Note that if you did not build your SNS/SQS resources using the DAGs, you will need to manually set your Airflow variables with the appropriate ARNs.



from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1)
}

# Define a DAG that demonstrates a cross-DAG dependency by publishing to an SNS topic
with DAG(
    'sns_publish_dummy',
    default_args=default_args,
    description='A simple DAG to publish a message to an SNS topic',
    schedule_interval=None,
    catchup=False
) as dag:
    # Dummy task standing in for the upstream work (e.g. the customer master build)
    dummy_sleep_task = BashOperator(
        task_id='sleep_task',
        bash_command='sleep 10'
    )

    # Publish a message to the SNS topic once the upstream task has succeeded
    publish_to_sns = SnsPublishOperator(
        task_id='publish_to_sns',
        target_arn=Variable.get("sns_test_arn"),  # ARN of the SNS topic to publish to
        message='This is a test message from Airflow',
        subject='Test SNS Message'
    )

    dummy_sleep_task >> publish_to_sns

if __name__ == "__main__":
    dag.cli()
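If you created the SNS/SQS resources outside Airflow, the variables the demo DAGs read (`sns_test_arn` here, `sqs_queue_test_url` in the subscriber DAG) have to be set by hand. One way, sketched below, is to write them to a JSON file and load it with `airflow variables import` or the Variables page in the UI. The function name and the `variables.json` file name are hypothetical, and the values passed in are whatever your own topic ARN and queue URL are.

```python
import json
from pathlib import Path


def write_variables_file(sns_topic_arn: str, sqs_queue_url: str,
                         path: str = "variables.json") -> Path:
    """Write the two variables the demo DAGs read into an importable JSON file."""
    variables = {
        "sns_test_arn": sns_topic_arn,        # read by the publisher DAG
        "sqs_queue_test_url": sqs_queue_url,  # read by the subscriber DAG
    }
    target = Path(path)
    target.write_text(json.dumps(variables, indent=2))
    return target
```

Keeping the ARN and URL in variables rather than in the DAG source means the same DAG files can be deployed to environments that point at different topics and queues.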




Step 3: Create SQS Subscribe DAG

This DAG will simulate the downstream dependency, perhaps a customer profile job. Leveraging the SqsSensor, it simply waits for the upstream job to complete, and then runs its own dummy step. Setting mode='reschedule' releases the worker slot between polls rather than holding it for the whole wait, which suits a sensor that may sit idle for a long time.



from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1)
}

def print_sqs_message():
    print("Hello, SQS read and delete successful!! ")

# Define a DAG that demonstrates a cross-DAG dependency using the SQS sensor
with DAG(
    'sqs_sensor_example',
    default_args=default_args,
    description='A simple DAG to sense and print messages from SQS',
    schedule_interval=None
) as dag:
    # Wait for a message to arrive in the SQS queue subscribed to the SNS topic
    sense_sqs_queue = SqsSensor(
        task_id='sense_sqs_queue',
        sqs_queue=Variable.get("sqs_queue_test_url"),  # URL of the SQS queue, stored in an Airflow variable
        aws_conn_id='aws_default',
        max_messages=1,
        wait_time_seconds=20,
        visibility_timeout=30,
        mode='reschedule'  # release the worker slot between polls while waiting
    )

    print_message = PythonOperator(
        task_id='print_message',
        python_callable=print_sqs_message
    )

    sense_sqs_queue >> print_message

if __name__ == "__main__":
    dag.cli()
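The dummy step above ignores the message content. If the downstream job needs it, keep in mind that a message arriving via an SNS subscription without raw message delivery is wrapped in a JSON envelope. The helper below is a rough sketch of unwrapping it. The function name is hypothetical, and it assumes the sensor hands over the received messages as a list of dicts with a `Body` field (for example via XCom under the key `messages`, which you should confirm for your provider version).

```python
import json


def extract_sns_notifications(sqs_messages: list) -> list:
    """Unwrap SNS notifications from SQS messages; pass raw bodies through as-is."""
    notifications = []
    for message in sqs_messages:
        body = message["Body"]
        try:
            envelope = json.loads(body)
        except ValueError:
            envelope = None  # not JSON: treat the body as a raw message

        if isinstance(envelope, dict) and envelope.get("Type") == "Notification":
            notifications.append(
                {
                    "topic_arn": envelope.get("TopicArn"),
                    "subject": envelope.get("Subject"),
                    "message": envelope.get("Message"),
                }
            )
        else:
            # Raw message delivery or a non-SNS producer: no envelope to unwrap.
            notifications.append({"topic_arn": None, "subject": None, "message": body})
    return notifications
```

A downstream task could pull the sensor's messages, run them through this helper, and branch on the subject or message text.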




Testing, Testing

Once your environment is set up, simply start your SQS subscriber DAG. It will wait patiently, polling SQS for the completion message.

When you are ready, start your SNS publisher DAG. Once it completes, your subscriber will run its dummy step and finish.

Bringing it all together..

Big picture: by leveraging SQS you can enable a pragmatic, data-mesh-inspired infrastructure like the one illustrated below. There is no Airflow-based single point of failure, domain/product boundaries are respected, and teams keep their autonomy.

As a bonus, you have also enabled evolutionary architecture. If a team wants to transition from Airflow to Step Functions or Prefect, they are empowered to do so, so long as they continue interacting through SNS/SQS.
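To make that concrete, here is a rough sketch of what a non-Airflow consumer of the same queue might look like, using the standard boto3 SQS client calls `receive_message` and `delete_message`. The function name, the `handle` callback, and the `max_polls` parameter are hypothetical, and the client is passed in so the loop can be tried against a stub.

```python
def drain_queue(sqs, queue_url: str, handle, max_polls: int = 1) -> int:
    """Long-poll an SQS queue, pass each message body to `handle`, then delete it."""
    processed = 0
    for _ in range(max_polls):
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,  # long polling: wait up to 20s for a message
        )
        for message in response.get("Messages", []):
            handle(message["Body"])
            # Delete only after the handler returned, so a failure leaves the
            # message on the queue to be redelivered after the visibility timeout.
            sqs.delete_message(
                QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
            )
            processed += 1
    return processed


# Usage with a real client:
#   import boto3
#   drain_queue(boto3.client("sqs"), queue_url, print)
```

Because the contract between teams is just the topic and the queue, whatever replaces Airflow only needs an equivalent of this loop.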

Image description

I'd like to give special thanks to @deekshagunde for contributing to this article and preparing the demo repo.
