In this post I discuss how messages in an SQS queue can be split between multiple SQS queues while keeping their original payloads. The idea is to achieve this with a low-code solution using EventBridge Pipes, an Event Bus and Event Rules.
EventBridge Pipe
EventBridge Pipes was announced at re:Invent 2022. EB Pipes helps to create point-to-point integrations between event producers and consumers with little code.
This was a great addition to the Serverless services since it reduces the custom code that is otherwise required to connect two services together. It also includes capabilities to filter and enrich events as they pass through the Pipe. As of now, EB Pipes supports streaming sources such as Kinesis, DynamoDB Streams, self-managed Apache Kafka, Amazon MSK and Amazon MQ, as well as SQS.
About this project
Imagine a situation where you need to split the messages in an SQS queue into separate SQS queues based on their content. For example, an external producer sends messages to a single SQS queue and you have to process those messages using different consumers. Alternatively, those messages may need to be processed with different priorities. So, first, you need to separate those messages into different SQS queues.
This can be achieved with a single Lambda function that first reads the messages from the source queue and then sends them to the different SQS queues.
However, this Lambda function ends up holding all of the routing logic, along with the permissions needed to send each message to its target queue. It also has to scale with demand and be reliable enough not to become a single point of failure. Further, every time a new target is introduced, the function has to be extended to handle it, which is always a challenge.
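To make the drawback concrete, here is a minimal sketch of what such a routing function might look like. It is not code from the sample repository: the `ROUTES` table, the placeholder queue URLs and the injected `send` callable are all hypothetical, and in a real function `send` would wrap an SQS send call.

```python
import json

# Hypothetical routing table: message type -> target queue URL (placeholders).
# Every new target means editing this table and redeploying the function.
ROUTES = {
    "OrderCreated": "<order-created-queue-url>",
    "OrderUpdated": "<order-updated-queue-url>",
}


def route_records(records, send):
    """Forward each SQS record body to the queue chosen by its 'type' field.

    `send(queue_url, body)` is injected by the caller (an assumption made
    here so the sketch can run without AWS access).
    Returns the records that matched no route.
    """
    unrouted = []
    for record in records:
        body = record["body"]
        message_type = json.loads(body).get("type")
        queue_url = ROUTES.get(message_type)
        if queue_url is None:
            # No route for this type: the function must decide what to do.
            unrouted.append(record)
            continue
        send(queue_url, body)
    return unrouted
```

Even in this small form, the routing rules, the list of targets and the failure handling all live in one function, which is the coupling the rest of this post tries to avoid.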
Solution
High level architecture
How it works
EventBridge Pipe is configured to poll messages from the source SQS queue.
There is no filter set up, which means all the messages in the source queue will be processed through the Pipe.
When a message is received from the Pipe, it contains not only the original message in the body parameter, but also a lot of SQS-related metadata. Because of that, only the original message has to be extracted from the payload before it is sent to the targets. The enrichment Lambda function is used for this.
Then, this message is sent to the Event Bus.
There are event rules defined for the Event Bus with conditions that will match against this original message.
Each rule has an SQS queue defined as the target.
When the content of a message matches a rule, that message is sent to the rule's queue.
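The extraction step described above can be sketched as a small enrichment function. This is an illustration rather than the code in the sample repository. It assumes the Pipe invokes the function with a list of SQS records, that each record carries the original message as a JSON string under `body`, and that each item of the returned list is forwarded to the target as a separate event.

```python
import json


def handler(event, context):
    """Return only the original message bodies from a batch of SQS records.

    Assumption: `event` is a list of SQS records as delivered by the Pipe,
    each with the original message as a JSON string in its "body" field.
    """
    original_messages = []
    for record in event:
        # Drop the SQS metadata (messageId, receiptHandle, attributes, ...)
        # and keep only the parsed original payload.
        original_messages.append(json.loads(record["body"]))
    return original_messages
```

Calling `handler` with a single record whose body is `{"type": "OrderCreated"}` returns a list with just that parsed payload, so the metadata never reaches the Event Bus.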
Test it yourself
I have created a sample application to test this scenario. This is created using AWS CDK v2 with Python. So, you need CDK v2 and Python installed in your environment.
Set up
Clone the repository: https://github.com/pubudusj/sqs-to-multiple-sqs
Go into the cloned directory.
To manually create a virtualenv on macOS and Linux:
$ python3 -m venv .venv
After the init process completes and the virtualenv is created, you can use the following step to activate your virtualenv.
$ source .venv/bin/activate
If you are on a Windows platform, you would activate the virtualenv using:
% .venv\Scripts\activate.bat
Once the virtualenv is activated, you can install the required dependencies.
$ pip install -r requirements.txt
Then, deploy the application:
$ cdk deploy
Once the application is deployed, in the output you can see 4 values: SourceQueueUrl, TargetQueueOrderCreated, TargetQueueOrderUpdated and PipeArn. Copy the value of SourceQueueUrl to test the application.
Test
Here, I have configured 2 Event Bus rules.
The first rule matches a message whose type field has the value 'OrderCreated'. This rule has a target SQS queue named TargetQueueOrderCreated.
The second rule matches a message whose type field has the value 'OrderUpdated'. This rule has a target SQS queue named TargetQueueOrderUpdated.
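As a rough illustration of how these two rules pick a queue, the patterns can be written as Python dictionaries and checked with a tiny matcher. Both the pattern shape (matching on `detail.type`) and the `matches` helper are assumptions made for this sketch. The real matching is done by EventBridge, and this helper only handles nested keys and lists of exact values, not the full event pattern syntax.

```python
# Hypothetical event patterns for the two rules (assumed shape).
ORDER_CREATED_PATTERN = {"detail": {"type": ["OrderCreated"]}}
ORDER_UPDATED_PATTERN = {"detail": {"type": ["OrderUpdated"]}}


def matches(pattern, event):
    """Simplified matcher: dicts descend into the event, lists mean
    'the event value must equal one of these values'."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


# An event as it might appear on the bus, with the original message as detail.
event = {
    "detail": {
        "orderId": "125a2e1e-d420-482e-8008-5a606f4b2076",
        "customerId": "a48516db-66aa-4dbc-bb66-a7f058c5ec24",
        "type": "OrderCreated",
    }
}

print(matches(ORDER_CREATED_PATTERN, event))
print(matches(ORDER_UPDATED_PATTERN, event))
```

Only the first pattern matches this event, so under these assumptions the message would be delivered to TargetQueueOrderCreated and not to TargetQueueOrderUpdated.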
Send a message with type 'OrderCreated' into the source queue as follows, replacing SourceQueueUrl with the value copied from the deploy output:
aws sqs send-message \
  --queue-url=SourceQueueUrl \
  --message-body '{"orderId":"125a2e1e-d420-482e-8008-5a606f4b2076", "customerId": "a48516db-66aa-4dbc-bb66-a7f058c5ec24", "type": "OrderCreated"}'
If you check the TargetQueueOrderCreated queue, you will see that the message has arrived with the original payload.
Send a message with type 'OrderUpdated' into the source queue as follows:
aws sqs send-message \
  --queue-url=SourceQueueUrl \
  --message-body '{"orderId":"125a2e1e-d420-482e-8008-5a606f4b2076", "customerId": "a48516db-66aa-4dbc-bb66-a7f058c5ec24", "type": "OrderUpdated"}'
If you check the TargetQueueOrderUpdated queue, you will see that the message has arrived with the original payload.
Conclusion
With EventBridge Pipes, it is easy to connect sources, especially streaming sources, to targets with minimal configuration, where custom code was previously required. It scales well, is cost effective and needs very little maintenance.
Useful Links
EventBridge Pipes documentation:
https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html
CloudFormation API for Pipes:
https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-pipes-pipe.html
Pipes Documentation for CDK v2 Python:
https://docs.aws.amazon.com/cdk/api/v2/python/aws_cdk.aws_pipes/CfnPipe.html
EventBridge Pipes pricing:
https://aws.amazon.com/eventbridge/pricing/#Pipes
Feedback
Please feel free to deploy this solution to your own AWS environment and share your experience with me. You can connect with me on LinkedIn: https://www.linkedin.com/in/pubudusj and on Twitter: https://twitter.com/pubudusj
Keep building! Keep sharing!
Top comments (2)
Can't we just use EventBridge Pipes to enhance and filter the messages based on their content and send them directly to different queues by having rules for them here, thus removing the Lambda and Event Bus dependencies?
I believe if you did that, the enrichment Lambda would become the single point of failure, and whenever you have new consumers you'd have to modify this Lambda (not ideal). Instead, you can simply add whatever information you need for enrichment, then send to EventBridge. If you have more consumers in the future, you simply subscribe them to EventBridge rather than needing to modify the Lambda to send to the new SQS queue.