AWS has announced, “AWS Lambda now supports partial batch response for SQS as an event source”. Before looking at how it works, let’s review how SQS messages were handled previously, and then see what value the partial batch response feature adds.
1st scenario
Assumption: An SQS trigger is configured for the Lambda function. Exceptions raised while processing any message in a batch are not handled.
The Lambda function is triggered with a batch of 5 messages. If it fails to process any one of them and throws an error, all 5 messages stay on the queue and become available for reprocessing once the visibility timeout expires. In other words, the batch is all-or-nothing: either every message is processed and the whole batch is deleted from the SQS queue, or the invocation fails and the whole batch goes back for reprocessing.
import json
import os
import boto3


def lambda_handler(event, context):
    region_name = os.environ['AWS_REGION']
    if event:
        for record in event['Records']:
            body = record['body']
            # process message; an unhandled exception here fails the whole batch
    return {
        'statusCode': 200,
        'body': json.dumps('Message processed successfully!')
    }
Reprocessing messages that have already been processed is undesirable, so to avoid it to some extent, let’s delete each message in the batch as soon as it is processed.
2nd scenario
Assumption: An SQS trigger is configured for the Lambda function. Exceptions raised while processing a message are still not handled, but each message is now deleted from the queue right after it is processed successfully.
In this case, let’s say the first 2 messages in the batch are processed successfully and deleted. The 3rd message fails and Lambda returns an error. Because the 1st and 2nd messages have already been deleted from the queue, only the 3rd, 4th, and 5th messages are retried. Messages that were already processed are therefore not processed again.
import os
import json
import boto3


def lambda_handler(event, context):
    region_name = os.environ['AWS_REGION']
    if event:
        sqs = boto3.client('sqs', region_name=region_name)
        # Derive the queue name from the event source ARN, then look up its URL
        queue_name = event['Records'][0]['eventSourceARN'].split(':')[-1]
        queue_url = sqs.get_queue_url(
            QueueName=queue_name,
        )
        for record in event['Records']:
            body = record['body']
            print(body)
            # process message
            # Delete the message as soon as it has been processed successfully
            response = sqs.delete_message(
                QueueUrl=queue_url['QueueUrl'],
                ReceiptHandle=record['receiptHandle']
            )
    return {
        'statusCode': 200,
        'body': json.dumps('Message processed successfully!')
    }
Here the Lambda function failed on the 3rd message of the batch, and that failure stopped the remaining messages from being processed at all. But let’s say we want every message in the batch to be attempted even if one of them fails, with the successfully processed messages deleted and only the failed message retried.
3rd scenario
Assumption: An SQS trigger is configured for the Lambda function. Exception handling for failed messages is added on top of the delete functionality.
Here, we maintain a flag that records whether any message has failed. Let’s say the 3rd message fails again. Because error handling is now in place, the failure is caught and the rest of the messages in the batch are still processed and then deleted. Finally, an exception is raised manually based on the flag, which causes the failed message to be retried; the other messages are not retried because they have already been processed and deleted. In this scenario, we still cannot tell Lambda directly which messages to retry. If we want that control, the partial batch response feature is the answer.
import os
import json
import boto3


def lambda_handler(event, context):
    region_name = os.environ['AWS_REGION']
    if event:
        sqs = boto3.resource('sqs', region_name=region_name)
        queue_name = event['Records'][0]['eventSourceARN'].split(':')[-1]
        queue = sqs.get_queue_by_name(QueueName=queue_name)
        failed_flag = False
        messages_to_delete = []
        for record in event['Records']:
            try:
                body = record['body']
                # process message
                # Only messages that were processed successfully are deleted
                messages_to_delete.append({
                    'Id': record['messageId'],
                    'ReceiptHandle': record['receiptHandle']
                })
            except RuntimeError as e:
                failed_flag = True
        if messages_to_delete:
            delete_response = queue.delete_messages(
                Entries=messages_to_delete)
        if failed_flag:
            # Fail the invocation so the messages that were not deleted are retried
            raise RuntimeError('Failed to process messages')
    return {
        'statusCode': 200,
        'body': json.dumps('Messages processed successfully!')
    }
These 3 scenarios show how a Lambda function could handle messages, depending on the requirements, before the partial batch response feature was introduced.
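As a rough way to compare the three approaches, the sketch below models a batch of 5 messages in which the 3rd one fails, and reports which messages would return to the queue in each case. It is a plain-Python simulation, not AWS code; the function names and the message IDs `m1` to `m5` are assumptions made purely for illustration.

```python
def scenario_1(batch, failing):
    """No error handling, no deletes: one failure sends the whole batch back."""
    if any(msg in failing for msg in batch):
        return list(batch)
    return []


def scenario_2(batch, failing):
    """Delete each message after success; processing stops at the first failure."""
    for index, msg in enumerate(batch):
        if msg in failing:
            # The failing message and everything after it were never deleted.
            return list(batch[index:])
    return []


def scenario_3(batch, failing):
    """Catch errors, delete the successes, then raise: only failures return."""
    return [msg for msg in batch if msg in failing]


batch = ["m1", "m2", "m3", "m4", "m5"]
failing = {"m3"}

for scenario in (scenario_1, scenario_2, scenario_3):
    print(scenario.__name__, "->", scenario(batch, failing))
```

With the 3rd message failing, the first function returns all five messages, the second returns the last three, and the third returns only `m3`, which matches the behaviour described for each scenario.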
4th scenario
With the partial batch response feature, a Lambda function can identify the messages in a batch that failed and report them back, so that only those messages are returned to the queue and reprocessed. This makes SQS queue processing more efficient: it removes the need to transfer and reprocess the same messages repeatedly, which increases throughput and improves processing performance, and it comes at no additional cost beyond the standard price.
To use this feature, exception handling should be in place, and the Lambda function has to return the message IDs of the messages that require reprocessing in the particular format given below.
To enable and handle partial batch failure, check the Report batch item failures option under Additional settings while adding the SQS trigger.
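The same setting can also be applied programmatically. A minimal sketch, assuming the boto3 Lambda client and its `create_event_source_mapping` call with `FunctionResponseTypes=["ReportBatchItemFailures"]`; the helper name, the queue ARN, and the function name are placeholders for illustration, not values from this article.

```python
def enable_partial_batch_response(lambda_client, queue_arn, function_name, batch_size=5):
    """Create an SQS trigger that reports batch item failures.

    `lambda_client` is expected to be a boto3 Lambda client, for example
    boto3.client("lambda"); it is passed in so the helper is easy to test.
    """
    return lambda_client.create_event_source_mapping(
        EventSourceArn=queue_arn,
        FunctionName=function_name,
        BatchSize=batch_size,
        # Corresponds to the "Report batch item failures" option in the console.
        FunctionResponseTypes=["ReportBatchItemFailures"],
    )


# Example usage (placeholder ARN and function name; requires AWS credentials):
# import boto3
# enable_partial_batch_response(
#     boto3.client("lambda"),
#     "arn:aws:sqs:us-east-1:123456789012:my-queue",
#     "my-function",
# )
```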
Once the SQS event source is configured, the function’s response must follow the format given below for the partial batch response to work.
{
    "batchItemFailures": [
        {
            "itemIdentifier": "id2"
        },
        {
            "itemIdentifier": "id4"
        }
    ]
}
import os
import json
import boto3


def lambda_handler(event, context):
    if event:
        messages_to_reprocess = []
        batch_failure_response = {}
        for record in event["Records"]:
            try:
                body = record["body"]
                # process message
            except Exception as e:
                # Report only the failed message so that it alone is retried
                messages_to_reprocess.append({"itemIdentifier": record['messageId']})
        batch_failure_response["batchItemFailures"] = messages_to_reprocess
        return batch_failure_response
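To sanity-check the response format without deploying, the handler can be called locally with a hand-built event. This is a minimal sketch: `process_message` is a hypothetical stand-in for the real processing logic, and the sample event contains only the fields the handler reads, not a full SQS event.

```python
def process_message(body):
    """Hypothetical processing step: fails for bodies containing 'bad'."""
    if "bad" in body:
        raise ValueError(f"cannot process: {body}")


def lambda_handler(event, context):
    failures = []
    for record in event.get("Records", []):
        try:
            process_message(record["body"])
        except Exception:
            # Report only this message; the rest of the batch is unaffected.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


# Trimmed-down SQS event with only the fields the handler reads.
sample_event = {
    "Records": [
        {"messageId": "id1", "body": "ok"},
        {"messageId": "id2", "body": "bad payload"},
        {"messageId": "id3", "body": "ok"},
    ]
}

response = lambda_handler(sample_event, None)
print(response)
```

Here only `id2` is reported, so only that message would be retried. An empty `batchItemFailures` list tells Lambda that the whole batch succeeded.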
For a detailed walkthrough, please refer to the video below.
Recommendation
For most scenarios, adopting the implementation used in the 4th scenario is beneficial: it offers efficient, fast processing and reduces repeated data transfer, which in turn increases throughput.
If you have any questions, comments, or feedback then please leave them below. Subscribe to my channel for more.