Vikas Solegaonkar for AWS Community Builders

Originally published at builtin.com

Asynchronous API with DynamoDB Streams

Responsiveness is one of the most important parameters for the success of any web application, and asynchronous processing is a key way to achieve it. A server request from the browser should return immediately - without waiting for the processing to complete. The data flow should be designed so that it does not depend upon an immediate response from the server.

There are several architecture patterns to do this. But a major problem with asynchronous processing is error handling. How would the client know if the request failed? We should not lose data in the process. In fact, a fire-and-forget service cannot afford to fail. Even if the processing fails for some reason, the data has to reach the DB.

DynamoDB Streams provides a cool solution to this problem. Let's check it out.

What is DynamoDB Streams

Most traditional databases have a concept of triggers. These are events generated when some data changes in the DB. DynamoDB Streams are quite similar, with one difference - instead of firing a distinct trigger per data change, the table emits a stream of change events that flows into a target such as Lambda or Kinesis.
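
To make the event shape concrete, here is a rough sketch of what a single stream record looks like when it reaches Lambda, and how its typed attribute values can be flattened into a plain object. The sample record and the `flattenImage` helper are assumptions made for illustration. The helper handles only the `S`, `N` and `BOOL` types; for real use, the AWS SDK ships a complete converter (`AWS.DynamoDB.Converter.unmarshall` in SDK v2).

```javascript
// A hand-written sample of one DynamoDB Streams record (values are made up).
const sampleRecord = {
    eventName: "INSERT",               // INSERT | MODIFY | REMOVE
    dynamodb: {
        Keys: { id: { S: "req-123" } },
        NewImage: {
            id: { S: "req-123" },
            request: { S: "{\"dish\":\"pasta\"}" },
            attempts: { N: "0" },
            urgent: { BOOL: false }
        }
    }
};

// Hypothetical helper: flatten DynamoDB's typed attributes into plain values.
// Only S, N and BOOL are converted; any other type is passed through as-is.
const flattenImage = (image) => {
    const out = {};
    for (const [name, typed] of Object.entries(image)) {
        if ("S" in typed) out[name] = typed.S;
        else if ("N" in typed) out[name] = Number(typed.N);
        else if ("BOOL" in typed) out[name] = typed.BOOL;
        else out[name] = typed;
    }
    return out;
};

console.log(flattenImage(sampleRecord.dynamodb.NewImage));
```

Note that numbers arrive as strings inside the `N` wrapper, which is why the helper converts them explicitly.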

We can have a Lambda function triggered by such events - which can process this data. The incoming API call can directly dump the data into DynamoDB - using the API Gateway service integration. This ensures a very low response time in the API. The DynamoDB stream can then be configured to invoke a Lambda function that processes the data in the background.

We have an advantage here - the data is already available in our DB before we start processing it. Even if the downstream processing fails, the data is already available in the DB. And in the unlikely event of DB insert/update failing, the API itself will return an error and the client will be able to handle it.
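
From the client's side, this means a single check on the HTTP response is enough. Below is a minimal sketch, assuming a hypothetical endpoint URL and assuming the API is configured to map DynamoDB errors to non-2xx responses. The `submitRequest` helper takes a `fetch`-compatible function as a parameter so that the snippet can also run outside the browser.

```javascript
// Hypothetical client helper: submit a request and report whether the API
// accepted it. `fetchImpl` is any fetch-compatible function.
const submitRequest = async (url, payload, fetchImpl) => {
    const response = await fetchImpl(url, {
        method: "PUT",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify(payload)
    });
    // A 2xx status means the write was accepted; processing happens later.
    // Any other status means the write was rejected, so the client can
    // show an error or retry.
    return { accepted: response.ok, status: response.status };
};

// Stand-in for the real network call, used only to demonstrate the flow.
const fakeFetch = async (url, options) => ({ ok: true, status: 200 });

submitRequest("https://example.invalid/orders", { dish: "pasta" }, fakeFetch)
    .then(result => console.log(result));
```

The client never polls or retries on behalf of the downstream processing; it only needs to know whether the data was stored.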

Thus, we have the best of both worlds - low response time as well as error resilience. Let us try to implement this in our AWS account.

Lambda Function

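Start by creating a Lambda function. This is simple. Just go to the Lambda Console and create a new Lambda function. Call it StreamProcessor (or any other name you like). Add the code below to it. Note that it uses version 2 of the AWS SDK for JavaScript (aws-sdk), which is bundled only with the older Node.js runtimes (up to 16.x); on newer runtimes you would need to package it with the function or port the calls to SDK v3.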



const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient();

const TABLE_NAME = "Restaurant";
const TTL = 300;

exports.handler = async (event) => {
    // Only INSERT events carry new requests; everything else is skipped.
    const plist = [];
    event.Records.forEach(record => {
        if (record.eventName === "INSERT") {
            plist.push(processInsert(record.dynamodb.NewImage));
        }
    });

    // If any promise rejects, the invocation fails and Lambda retries the batch.
    await Promise.all(plist);
    const response = {
        statusCode: 200,
        body: JSON.stringify('Hello from Lambda!'),
    };
    return response;
};

const processInsert = async (newImage) => {
    // Business logic to process the input
    console.log("Processing: " + JSON.stringify(newImage));
    const error = await businesslogic(newImage);

    if (error) {
        // Processing failed: record the error details on the item.
        await ddb.update({
            TableName: TABLE_NAME,
            Key: { id: newImage.id.S },
            UpdateExpression: "set #error = :error",
            ExpressionAttributeValues: { ":error": error },
            ExpressionAttributeNames: { "#error": "error" }
        }).promise();
    }
    else {
        // Processing succeeded: set a TTL (epoch seconds) so the item expires later.
        await ddb.update({
            TableName: TABLE_NAME,
            Key: { id: newImage.id.S },
            UpdateExpression: "set #ttl = :ttl",
            ExpressionAttributeValues: { ":ttl": TTL + Math.floor(Date.now() / 1000) },
            ExpressionAttributeNames: { "#ttl": "ttl" }
        }).promise();
    }
};

const businesslogic = async (input) => {
    return;    // return "Error Details" in case of any error
};


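One detail in the handler is worth spelling out. The function writes back to the same table it listens to, so each of its own updates comes back through the stream as a `MODIFY` event, and items removed by TTL later show up as `REMOVE` events. Filtering on `INSERT` is what keeps the function from re-processing its own writes. Here is a small sketch with a made-up batch; the `selectInserts` name and the sample records are assumptions for illustration.

```javascript
// Hypothetical helper mirroring the INSERT filter used in the handler above.
const selectInserts = (event) =>
    event.Records
        .filter(record => record.eventName === "INSERT")
        .map(record => record.dynamodb.NewImage);

// Made-up batch: a new request, the function's own update, and an expired item.
const sampleEvent = {
    Records: [
        { eventName: "INSERT", dynamodb: { NewImage: { id: { S: "a1" } } } },
        { eventName: "MODIFY", dynamodb: { NewImage: { id: { S: "a1" }, ttl: { N: "1700000300" } } } },
        { eventName: "REMOVE", dynamodb: { Keys: { id: { S: "a0" } } } }
    ]
};

// Only the first record is selected for processing.
console.log(selectInserts(sampleEvent).map(image => image.id.S));
```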

IAM Role for Lambda

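This Lambda function needs permission to read from the DynamoDB stream, along with permission to update items in the table. To get this, create a new IAM Role.

Along with the usual Lambda permissions, include the policy below for the DB permissions. Replace the region, the account ID and TableName in the resource ARNs with your own values. The AWS-managed policy for DynamoDB stream triggers also includes dynamodb:ListStreams, so add that action if the trigger cannot be created with this policy.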



{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeStream",
                "dynamodb:UpdateItem",
                "dynamodb:GetShardIterator",
                "dynamodb:GetItem",
                "dynamodb:UpdateTable",
                "dynamodb:GetRecords"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:1234567890:table/TableName",
                "arn:aws:dynamodb:us-east-1:1234567890:table/TableName/index/*",
                "arn:aws:dynamodb:us-east-1:1234567890:table/TableName/stream/*"
            ]
        }
    ]
}



DynamoDB Table

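Next, we create the table in DynamoDB, with the same name as TABLE_NAME in the Lambda code. This table should have a partition key named id, of type String. On the Exports & Streams tab, click on Enable DynamoDB Streams (not the Kinesis streams). Choose a view type that includes the new image, since the Lambda function reads NewImage from each record.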


Along with that, add the Lambda trigger to the stream. Click on Create Trigger and choose the Lambda function we created above. That sets up the stream and the trigger.

Then, go to the Additional Settings tab. There, we can enable TTL, using ttl as the TTL attribute name so that it matches the attribute set by the Lambda function.

That will set up the DynamoDB table for us.
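
For readers who prefer scripting over the console, the same table setup can be expressed as request parameters for the DynamoDB `CreateTable` and `UpdateTimeToLive` operations. This is only a sketch. The table name `Restaurant` and the `ttl` attribute come from the Lambda code above, while on-demand billing and the `NEW_IMAGE` view type are assumptions made here. The objects below are plain data; passing them to the SDK is left out.

```javascript
// Parameters for CreateTable: string partition key "id", stream enabled.
const createTableParams = {
    TableName: "Restaurant",
    AttributeDefinitions: [{ AttributeName: "id", AttributeType: "S" }],
    KeySchema: [{ AttributeName: "id", KeyType: "HASH" }],
    BillingMode: "PAY_PER_REQUEST",          // assumption: on-demand capacity
    StreamSpecification: {
        StreamEnabled: true,
        StreamViewType: "NEW_IMAGE"          // the Lambda reads record.dynamodb.NewImage
    }
};

// Parameters for UpdateTimeToLive: expire items based on the "ttl" attribute,
// which must hold a Unix timestamp in seconds.
const timeToLiveParams = {
    TableName: "Restaurant",
    TimeToLiveSpecification: { Enabled: true, AttributeName: "ttl" }
};

console.log(JSON.stringify({ createTableParams, timeToLiveParams }, null, 2));
```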

API Gateway

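Finally, we come to the API Gateway - to create an API that a client can invoke. Here, we configure the API Gateway to add the request into the database table. This is achieved using the AWS Service integration of the API Gateway. Let's work on that.

Create a new REST API on the API Gateway and add a PUT method to it. Integrate the request with DynamoDB: set the integration type to AWS Service, choose DynamoDB as the service, use POST as the integration HTTP method and PutItem as the action, and provide an execution role that allows API Gateway to write to the table.

Then add the following to the JSON mapping template of the integration request, replacing TableName with the name of your table.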



{
    "TableName":"TableName",
    "Item": {
        "id": {"S": "$context.requestId"},
        "request": {"S": "$util.escapeJavaScript($input.body)"}
    }
}



For every API call, this will add a new item to the table - with the request ID as the key. This insert will trigger the Lambda function, which carries the processing forward.
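
Since the template stores the raw request body as a string, the Lambda function has to parse it before applying any business logic. Here is a minimal sketch, assuming the body is JSON. The `parseRequest` helper is hypothetical. Malformed input is reported as data rather than thrown, so that one bad request does not fail the whole batch.

```javascript
// Hypothetical helper: extract the id and the parsed body from a stream image.
const parseRequest = (newImage) => {
    const id = newImage.id.S;
    try {
        return { id, payload: JSON.parse(newImage.request.S) };
    } catch (err) {
        // Keep the failure as data so the caller can record it on the item.
        return { id, error: "Invalid JSON in request: " + err.message };
    }
};

console.log(parseRequest({ id: { S: "req-1" }, request: { S: "{\"dish\":\"pasta\"}" } }));
console.log(parseRequest({ id: { S: "req-2" }, request: { S: "not json" } }));
```

The error string returned here could be passed straight to the update that sets the error attribute in the Lambda code above.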

Benefits

One would ask - what is the big deal? Why not directly call the Lambda from API Gateway? There are two aspects to this. Foremost, the response time. Directly adding data from API Gateway to DynamoDB is a lot faster than invoking a Lambda. So the client application sees a very fast response.

Second, resilience and error handling get complex when we invoke Lambda directly from the API Gateway. What do we do if the Lambda fails halfway? Retry the same API? Invoke another API that will heal the damage? This is too much intelligence to be carried into the client. It is best that the server manages this for itself.

The client just dumps data into DynamoDB, and is assured that its data is accepted and will be processed by the server. It will not be lost because it is already in the DB. The processing and error handling are then localized on the server - leading to a cleaner architecture.

Top comments (4)

Rolf Streefkerk

you're missing the feedback loop here, probably adding a websocket would solve that problem.

the other option is to use an API Gateway & Lambda Async function with a websocket, from the client's perspective it's virtually the same experience.

Vikas Solegaonkar • Edited

Thanks Rolf for your inputs.

Websocket with API Gateway - Lambda is async anyway. But my main concern in this application was resilience - without the client worrying about retries. Not really worried about the feedback.

The original application had Lambda picking directly from the API Gateway. It had a complex processing logic - that would often fail halfway. Error handling was messy because of this. So I moved it to this architecture where the input request is baselined in the DB - leaving the client free of any error handling responsibility.

Additionally, as the Lambda processes the request, it updates the DB record with the status. This helps it resume the processing if it errors half way. I know we can do this partly in Step functions. But the functionality is pretty complex and interwoven, so I had no option but to do it in code.

Do suggest if there is a better way to solve this problem - Resilience in Web sockets - with no retries on the client.

Emil • Edited

I would say that depends on whether you need feedback at all. In most cases you can return HTTP 201 and/or refresh the page and reload your data from the backend. You can then check the error state. A websocket feedback does not make sense in my view, since it's way more complicated and makes it synchronous somehow.

The approach behind all that is a Saga.

Rolf Streefkerk

It does not make it synchronous, in some cases you want to inform the end-user about the status and such a websocket feedback loop makes sense.
