Tom Milner for AWS Community Builders

Select Count(*) from DynamoDB Group By with Streams

I have recently been playing around with DynamoDB Streams for a side project. I learned a few things while getting it set up, and I put together this article to share them.

This tutorial will show you how to use DynamoDB Streams to maintain an up-to-date count of different groupings of items in the same table as the items themselves. For each unique pk1 value in the table, I will maintain a count of sales per day, per month, per year and for all time. The sort key will be a date value. Every insert of a new sale will result in four updates to the table. If the count item does not exist for a date period, it will be created automatically. For example, the count item for March 2021 has the sort key 2021-03-00.

I could use a traditional relational database and calculate the count on query each time I need it. My query would look something like this:



SELECT COUNT(*)
FROM tablea a
WHERE a.key = 'PROD#0001'
AND a.orderdate BETWEEN '2021-03-01 00:00:00.000' AND '2021-03-31 23:59:59.999';


That will always return the number of relevant items in the table for March 2021. In most cases, provided you are using the primary key and/or an indexed field, this will return speedily enough. However, this does not scale and performance won't be consistent. As the data grows in the table, the query will take longer to run. Also, depending on the load on the database, the performance of this query could be impacted.

It is for this reason that I want to use DynamoDB. I know of no other database that gives you this level of consistency and fast performance with a pay per use model.

However, DynamoDB is not a traditional DBMS; it is a NoSQL database designed to scale out. Its underlying architecture is designed to allow a table to scale out to almost limitless size with consistent millisecond query run times. To do this, DynamoDB imposes certain limitations on the queries you can issue against the table. A SQL statement that recalculates the count on the fly each time is not possible in DynamoDB.

In theory, you could issue a wide query that reads every item in the range you want to count. However, this is inefficient and adds a lot of overhead to every call. Alex DeBrie does a far better job of explaining this than I ever could in his article on filter expressions, https://www.alexdebrie.com/posts/dynamodb-filter-expressions/.
Leaving the performance issue to one side, the obvious reason not to use it is that a single DynamoDB Query operation can read a maximum of 1 MB of data. If the combined size of all the items you need to count is greater than this, one query will not cover them and you will have to page through the results with further calls.
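
To make the 1 MB limit concrete, here is a minimal sketch of what counting on the fly would involve. It assumes the low-level boto3 client `query` call with `Select="COUNT"`. The function name `count_items` is my own, the key names pk1 and sk1 follow the table used in this tutorial, and the client is passed in as a parameter so the paging loop can be tried without AWS access.

```python
def count_items(client, table_name, pk_value, sk_from, sk_to):
    """Count items for one partition key between two sort key values.

    Each Query call reads at most 1 MB, so the loop keeps following
    LastEvaluatedKey until DynamoDB reports that there are no more pages.
    Returns the total count and the number of calls that were needed.
    """
    total = 0
    pages = 0
    kwargs = {
        "TableName": table_name,
        "Select": "COUNT",  # return only the count, not the items
        "KeyConditionExpression": "pk1 = :pk AND sk1 BETWEEN :lo AND :hi",
        "ExpressionAttributeValues": {
            ":pk": {"S": pk_value},
            ":lo": {"S": sk_from},
            ":hi": {"S": sk_to},
        },
    }
    while True:
        response = client.query(**kwargs)
        total += response["Count"]
        pages += 1
        last_key = response.get("LastEvaluatedKey")
        if not last_key:
            return total, pages
        # Resume the next call where the previous page stopped.
        kwargs["ExclusiveStartKey"] = last_key
```

With a real table you would pass `boto3.client("dynamodb")` as the client. Even with `Select="COUNT"`, every matching item still has to be read, so the cost and run time grow with the number of items being counted.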

You could issue a DescribeTable request for your table. This returns an ItemCount for the table and another for each GSI and LSI on the table. That gives you the number of items in the table and in each index, but you can't get a count at any finer grain than that.
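
As a rough sketch of the DescribeTable option, the helper below collects the ItemCount values from the response. The name `item_counts` is hypothetical and the client is passed in so the function can be tried against a stub. Note that DynamoDB only refreshes ItemCount periodically, so the figures are approximate.

```python
def item_counts(client, table_name):
    """Return the approximate item count for a table and each of its indexes."""
    table = client.describe_table(TableName=table_name)["Table"]
    counts = {"table": table["ItemCount"]}
    # Index descriptions are only present when the table has such indexes.
    indexes = table.get("GlobalSecondaryIndexes", []) + table.get("LocalSecondaryIndexes", [])
    for index in indexes:
        counts[index["IndexName"]] = index["ItemCount"]
    return counts
```

Called as `item_counts(boto3.client("dynamodb"), "streamsdemo")`, this returns one number per table or index and nothing per product or per month, which is why it doesn't help here.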

So if we can't count the relevant items by reading every single one of them on the fly, what are the options? DynamoDB has a feature called Streams that can be used to maintain a count of items written, deleted and updated on the table. The rest of this article shows how I implemented this to build a solution that I believe is ultimately more efficient than the traditional SELECT COUNT(*) approach.

Tutorial

I will share a GitHub link at the end of the article with all the code and an AWS SAM template that pulls all this together. For now, I will show you how to set this up by going through the console. I think it's the best way to illustrate the solution.

DynamoDB

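1) Log into the AWS console and go to the DynamoDB console.
2) Click Create table and enter the table details. The rest of this tutorial assumes a table called streamsdemo with a partition key pk1 (String) and a sort key sk1 (String).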

Leave all other settings as default and click Create.

3) Once the table is created, you will be brought to the Overview tab for your new table. Click the Manage DynamoDB stream button on this tab.

Choose the New and old images option. The Lambda function needs the item data to tell the aggregation items apart from the sale items, so that the counters themselves are not counted.

Your stream is now created for the table. Every item added to, updated in or removed from the table will result in a new entry in the stream. The entry contains the values of the new item and, for an update of an existing item, the old item as well.
Copy the DynamoDB table ARN. You'll need it when setting up your IAM policy.
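
To give a feel for what arrives in the stream, here is a trimmed sketch of a single record as Lambda would receive it for a new sale. The ARN, account number and sort key format are made up for illustration, and `is_aggregation_item` is a hypothetical helper showing how the images can be used to tell counters and sales apart.

```python
# A trimmed, hypothetical stream record for a newly inserted sale item.
sample_record = {
    "eventName": "INSERT",
    "eventSourceARN": "arn:aws:dynamodb:eu-west-1:123456789012:table/streamsdemo/stream/2021-03-01T00:00:00.000",
    "dynamodb": {
        "Keys": {"pk1": {"S": "PROD#0001"}, "sk1": {"S": "2021-03-15T10:30:00"}},
        "NewImage": {
            "pk1": {"S": "PROD#0001"},
            "sk1": {"S": "2021-03-15T10:30:00"},
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
    },
}


def is_aggregation_item(record):
    """True when the record describes one of the count items rather than a sale."""
    data = record["dynamodb"]
    # INSERT records carry only NewImage, REMOVE records only OldImage.
    image = data.get("NewImage") or data.get("OldImage") or {}
    return "sales_cnt" in image


print(is_aggregation_item(sample_record))  # False
```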

IAM

Before going to Lambda, you must create an IAM policy and associate it to a role.

4) Go to IAM console and select Roles from Access Management menu on the left hand side of screen. This will open a new screen showing all the custom roles created in your account. Click Create role.

5) Select Lambda under Choose a use case and click Next: permissions.

6) We want to restrict permissions to individual resources, so we need a custom policy. Click Create policy. This will open a new screen.

7) On the Create policy screen, click the JSON tab and paste the JSON from the next three sections between the square brackets after Statement, separating the sections with commas. Update the ARNs in the Resource fields in each.

7.1) Allow the role to log to CloudWatch

    {
        "Effect": "Allow",
        "Action": [
            "logs:CreateLogGroup",
            "logs:CreateLogStream",
            "logs:PutLogEvents"
        ],
        "Resource": [
            "arn:aws:logs:[region name]:[aws account number]:*"
        ]
    }

7.2) Allow role to read from stream

    {
        "Effect": "Allow",
        "Action": [
            "dynamodb:GetShardIterator",
            "dynamodb:DescribeStream",
            "dynamodb:GetRecords",
            "dynamodb:ListStreams"
        ],
        "Resource": [
            "[Put your DynamoDB table arn here]/stream/*"
        ]
    }

7.3) Allow the role to update items on the table

    {
        "Effect": "Allow",
        "Action": "dynamodb:UpdateItem",
        "Resource": "[Put your DynamoDB table arn here]"
    }
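
If it helps to see the three statements assembled into one document, here is a small sketch that builds the complete policy as a Python dictionary and prints it as JSON. The function name `build_policy` and the region, account number and table ARN passed to it are placeholders of mine; substitute your own values.

```python
import json


def build_policy(region, account_id, table_arn):
    """Assemble the statements from steps 7.1 to 7.3 into one policy document."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # 7.1: write logs to CloudWatch
                "Effect": "Allow",
                "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
                "Resource": [f"arn:aws:logs:{region}:{account_id}:*"],
            },
            {   # 7.2: read from the table's stream
                "Effect": "Allow",
                "Action": [
                    "dynamodb:GetShardIterator",
                    "dynamodb:DescribeStream",
                    "dynamodb:GetRecords",
                    "dynamodb:ListStreams",
                ],
                "Resource": [f"{table_arn}/stream/*"],
            },
            {   # 7.3: update the counter items on the table
                "Effect": "Allow",
                "Action": "dynamodb:UpdateItem",
                "Resource": table_arn,
            },
        ],
    }


# Hypothetical region, account number and table ARN, for illustration only.
policy = build_policy(
    "eu-west-1",
    "123456789012",
    "arn:aws:dynamodb:eu-west-1:123456789012:table/streamsdemo",
)
print(json.dumps(policy, indent=4))
```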

8) Click Next: Tags and then Next: Review.
9) Call the policy streamsdemo and click Create policy.
10) With the policy created, you can return to the Create role screen and attach the streamsdemo policy.

11) Click Next: Tags and then Next: Review.
12) On the Review screen, call the Role streamsdemo and click Create role.

Lambda

13) Go to Lambda console and click Create Function. Choose Author from scratch, use streamsdemo as Function name and choose Python 3.8 as your runtime. Expand the Change default execution role section of the screen. Pick Use an existing role and choose the streamsdemo role. Leave all other settings as they are. Click Create function.

14) From the Function overview tab, click + Add trigger. Choose DynamoDB. I think this should say DynamoDB Streams, as the trigger is attached to the stream and not to the table, but that might just be me being pedantic.

Fill out the details as follows. Pick the ARN of the table created earlier from the DynamoDB table dropdown. Set Batch size = 1 and Starting position = Trim horizon.
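
For anyone scripting this step instead of clicking through the console, the same settings could be expressed roughly as follows. This is a sketch that assumes the boto3 Lambda client call `create_event_source_mapping`; the helper names are mine. Unlike the console dropdown, the API expects the ARN of the stream rather than the table.

```python
def stream_trigger_settings(stream_arn, function_name="streamsdemo"):
    """Trigger settings matching the console choices in this step."""
    return {
        "EventSourceArn": stream_arn,        # ARN of the stream, not the table
        "FunctionName": function_name,
        "BatchSize": 1,                      # one stream record per invocation
        "StartingPosition": "TRIM_HORIZON",  # start from the oldest record in the stream
    }


def add_stream_trigger(lambda_client, stream_arn):
    """Create the trigger using a boto3 Lambda client passed in by the caller."""
    return lambda_client.create_event_source_mapping(**stream_trigger_settings(stream_arn))
```

You would call `add_stream_trigger(boto3.client("lambda"), stream_arn)` with the stream ARN of your own table.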

15) Now we can begin coding. On the Lambda Function overview screen, go to Code and click on lambda_function.py

15.1) Import the following libraries.



import json
import boto3
import traceback
from botocore.exceptions import ClientError



15.2) Every Lambda function has to have a handler function, named lambda_handler by default in the console. This is the entry point into the application. However, it is good practice to keep it as simple as possible and place your logic in other functions.



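def lambda_handler(event, context):
    # Entry point: delegate to _lambda_handler and log any failure.
    try:
        return _lambda_handler(event, context)
    except Exception:
        # The stack trace goes to CloudWatch. The exception is not re-raised,
        # so Lambda treats the batch as processed.
        print(traceback.format_exc())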



15.3) Use this separate function to loop through records and orchestrate logic.



def _lambda_handler(event, context):

    records = event['Records']

    record1 = records[0]
    tableName = parseStreamArn(record1['eventSourceARN'])

    for record in records:

        event_name = record['eventName'].upper()  # INSERT, MODIFY, REMOVE
        pkValue = record['dynamodb']['Keys']['pk1']['S']
        skValue = record['dynamodb']['Keys']['sk1']['S']
        #print(keyValue)

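        # Only count new sale items. Items that carry sales_cnt are the counters themselves.
        if (event_name == 'INSERT') and "sales_cnt" not in record['dynamodb']["NewImage"]:
            print(event_name)
            updateCounter(tableName,pkValue,skValue,1)

        # A REMOVE record has no NewImage, so a decrement would have to check OldImage instead.
        #if (event_name == 'REMOVE') and "sales_cnt" not in record['dynamodb']["OldImage"]:
        #    updateCounter(tableName,pkValue,skValue,-1)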

    return 'Successfully processed {} records.'.format(len(event['Records']))



15.4) I am trying to make this Lambda code as reusable as possible, so I don't want to hardcode the table name. This function parses the table name from the stream ARN on the record.



def parseStreamArn(streamARN):
    tableName = streamARN.split(':')[5].split('/')[1]
    return(tableName)


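
As a quick illustration of the parsing, here is the same logic applied to a made-up stream ARN. The region, account number and timestamp are placeholders. The timestamp at the end of the ARN contains colons of its own, which is why the sixth colon-separated part stops partway through it.

```python
def parse_stream_arn(stream_arn):
    # Same logic as parseStreamArn above.
    return stream_arn.split(":")[5].split("/")[1]


# Hypothetical stream ARN, for illustration only.
example_arn = "arn:aws:dynamodb:eu-west-1:123456789012:table/streamsdemo/stream/2021-03-01T00:00:00.000"

print(example_arn.split(":")[5])      # table/streamsdemo/stream/2021-03-01T00
print(parse_stream_arn(example_arn))  # streamsdemo
```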

15.5) To link the handler and the update function, I built this function. It builds a list of the sort key values for the aggregation items (day, month, year and all time). Combined with the partition key of the triggering item, these identify the items to be inserted or updated.



def updateCounter(tableName,pkValue,skValue,counter):
    #always increment 0000 entry
    counterKey = [skValue[0:10], skValue[0:8]+ "00",skValue[0:5]+ "00-00","0000-00-00"]
    #persist changes to table
    updateDDBTable(tableName,pkValue,counterKey,counter)


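
Here is a worked example of the slicing, using a hypothetical sale with a timestamp sort key. The only assumption is that the sort key starts with a date in YYYY-MM-DD form; the function name `counter_keys` is mine.

```python
def counter_keys(sk_value):
    """Same slicing as updateCounter above, returned instead of written to the table."""
    return [
        sk_value[0:10],           # day:      2021-03-15
        sk_value[0:8] + "00",     # month:    2021-03-00
        sk_value[0:5] + "00-00",  # year:     2021-00-00
        "0000-00-00",             # all time
    ]


print(counter_keys("2021-03-15T10:30:00"))
# ['2021-03-15', '2021-03-00', '2021-00-00', '0000-00-00']
```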

15.6) The update statement is the secret sauce here, especially the UpdateExpression parameter. The if_not_exists function lets the application insert and update records with one call, without an explicit existence check. If the record with that primary key does not exist, it will be created with the sales_cnt field set to the value of init (0 in this case) plus the value of num (1 in this case), all within the one call. This is essential, as it automatically adds a new aggregation record the first time a product is sold on a given day. Every other sale of the same product in the same day/month/year just updates the record created by the initial sale.



def updateDDBTable(tableName,pkValue,counterKey,counter):
    dynamodb = boto3.resource('dynamodb')

    #Get table name from stream. Updates will be written back to same table
    dynamodb_table = dynamodb.Table(tableName)

    #loop through collection
    for i in counterKey:
        dynamodb_table.update_item(
                Key={
                        'pk1': pkValue,
                        'sk1': i,
                    },
                UpdateExpression="set sales_cnt = ((if_not_exists(sales_cnt,:init)) + :num)", #if record doesn't exist, create it
                ExpressionAttributeValues={
                        ':init': 0, #new record will be created with 0 + num value
                        ':num': counter
                    },
                ReturnValues="NONE"
                )


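
To see what the expression does without touching a real table, the sketch below mimics it on a plain Python dictionary. This is only a local model of the behaviour described above, not DynamoDB itself, and `apply_counter_update` is a hypothetical name.

```python
def apply_counter_update(items, pk_value, sk_value, num, init=0):
    """Mimic SET sales_cnt = if_not_exists(sales_cnt, :init) + :num on a dict."""
    # Like UpdateItem, create the item if no item with this key exists yet.
    item = items.setdefault((pk_value, sk_value), {"pk1": pk_value, "sk1": sk_value})
    # if_not_exists: use the current value when present, otherwise init.
    item["sales_cnt"] = item.get("sales_cnt", init) + num
    return item


table = {}
apply_counter_update(table, "PROD#0001", "2021-03-00", 1)  # first sale creates the counter
apply_counter_update(table, "PROD#0001", "2021-03-00", 1)  # second sale increments it
print(table[("PROD#0001", "2021-03-00")]["sales_cnt"])     # 2
```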

15.7) This is the Lambda code in full.



import json
import boto3
import traceback
from botocore.exceptions import ClientError

def updateDDBTable(tableName,pkValue,counterKey,counter):
    dynamodb = boto3.resource('dynamodb')

    #Get table name from stream. Updates will be written back to same table
    dynamodb_table = dynamodb.Table(tableName)

    #loop through collection
    for i in counterKey:
        dynamodb_table.update_item(
                Key={
                        'pk1': pkValue,
                        'sk1': i,
                    },
                UpdateExpression="set sales_cnt = ((if_not_exists(sales_cnt,:init)) + :num)", #if record doesn't exist, create it
                ExpressionAttributeValues={
                        ':init': 0, #new record will be created with 0 + num value
                        ':num': counter
                    },
                ReturnValues="NONE"
                )

def updateCounter(tableName,pkValue,skValue,counter):
    #always increment 0000 entry
    counterKey = [skValue[0:10], skValue[0:8]+ "00",skValue[0:5]+ "00-00","0000-00-00"]
    #persist changes to table
    updateDDBTable(tableName,pkValue,counterKey,counter)

def parseStreamArn(streamARN):
    tableName = streamARN.split(':')[5].split('/')[1]
    return(tableName)

def _lambda_handler(event, context):

    records = event['Records']

    record1 = records[0]
    tableName = parseStreamArn(record1['eventSourceARN'])

    for record in records:

        event_name = record['eventName'].upper()  # INSERT, MODIFY, REMOVE
        pkValue = record['dynamodb']['Keys']['pk1']['S']
        skValue = record['dynamodb']['Keys']['sk1']['S']
        #print(keyValue)

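        # Only count new sale items. Items that carry sales_cnt are the counters themselves.
        if (event_name == 'INSERT') and "sales_cnt" not in record['dynamodb']["NewImage"]:
            print(event_name)
            updateCounter(tableName,pkValue,skValue,1)

        # A REMOVE record has no NewImage, so a decrement would have to check OldImage instead.
        #if (event_name == 'REMOVE') and "sales_cnt" not in record['dynamodb']["OldImage"]:
        #    updateCounter(tableName,pkValue,skValue,-1)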

    return 'Successfully processed {} records.'.format(len(event['Records']))

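def lambda_handler(event, context):
    # Entry point: delegate to _lambda_handler and log any failure.
    try:
        return _lambda_handler(event, context)
    except Exception:
        # The stack trace goes to CloudWatch. The exception is not re-raised,
        # so Lambda treats the batch as processed.
        print(traceback.format_exc())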



Testing

In my Git repo, I included a script (/tests/dataload.py) that loops through two iterators to generate 100 unique records for insert. You can run this multiple times to generate more records. Just update the tableName parameter to your own table if you used a different one.

You can use the queries below to query the table from the CLI.



aws dynamodb query --table-name streamsdemo --key-condition-expression "pk1 = :name" --expression-attribute-values  '{":name":{"S":"PROD#0001"}}'

aws dynamodb query --table-name streamsdemo --key-condition-expression "pk1 = :name and sk1 = :date" --expression-attribute-values  '{":name":{"S":"PROD#0001"},":date":{"S":"2021-03-00"}}'

aws dynamodb describe-table --table-name streamsdemo



I have also included Python scripts in the repo to run the same queries. See /tests/query1.py and /tests/query2.py.
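
Since each count lives in a single item, reading it back is a key lookup rather than a query over all the sales. The sketch below is not one of the repo scripts; it is a minimal illustration that assumes a boto3 Table resource is passed in, and `get_sales_count` is a name of my own.

```python
def get_sales_count(table, pk_value, period_key):
    """Read one pre-computed counter. Returns 0 if nothing has been counted yet."""
    response = table.get_item(Key={"pk1": pk_value, "sk1": period_key})
    item = response.get("Item")
    # boto3 returns numbers as Decimal, so convert to int for convenience.
    return int(item["sales_cnt"]) if item else 0
```

For example, `get_sales_count(boto3.resource("dynamodb").Table("streamsdemo"), "PROD#0001", "2021-03-00")` would return the number of sales counted for March 2021.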

If you haven't tried AWS CloudShell yet, you should. It is a great way to execute these scripts without having to set up any access keys or secret keys on your machine.

Lambda settings

Batch size vs timeout vs memory

When using the SAM template to deploy the application, I originally had the batch size set to 10. This means that Lambda will fetch up to 10 records from the stream each time it is invoked. If you have a large number of records coming through, this reduces the number of Lambda invocations from once per record to once per 10 records. As I process the records one at a time, the run time of a single invocation is O(n) in the batch size: processing 10 records takes 10 times as long as processing 1 record.
I didn't set an explicit timeout, so it defaulted to 3 seconds. Trying to process 10 records in 3 seconds resulted in timeouts and highlighted a bug in my setup. When a timeout happens with a batch size greater than 1, the whole batch is retried. This led to records being counted more than once and ultimately to incorrect counts. For example, if the timeout happened after the 5th record was counted, Lambda treats the batch as failed and delivers all 10 records again.
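
The double counting is easy to reproduce in plain Python. The sketch below is a simulation only, with no AWS involved: a batch of ten hypothetical sales "times out" after five have been counted, and the whole batch is then processed again.

```python
def process_batch(batch, counts, fail_after=None):
    """Count each record. Raise after fail_after records to mimic a timeout."""
    for position, record in enumerate(batch):
        if fail_after is not None and position == fail_after:
            raise TimeoutError("simulated Lambda timeout")
        counts[record] = counts.get(record, 0) + 1


def run_with_retry(batch, fail_after):
    """Process a batch that times out once and is then retried in full."""
    counts = {}
    try:
        process_batch(batch, counts, fail_after)  # first attempt times out
    except TimeoutError:
        process_batch(batch, counts)              # the whole batch is delivered again
    return counts


counts = run_with_retry([f"sale-{n}" for n in range(10)], fail_after=5)
print(sum(counts.values()))  # 15: the first five sales were counted twice
```

With a batch of one record, a retry can only ever replay that one record, which is the reasoning behind the smaller batch size discussed next.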

To fix this, I had three levers to try: reduce the batch size, increase the timeout or increase the memory. Increasing memory had no effect as this is a small function. Memory consumed in both cases settled around 75 MB, well below the minimum of 128 MB.
Increasing the timeout to 30 seconds with the batch size set to 10 solved the timeout problem in this case.
However, I ultimately went with reducing the batch size to 1. Timeouts may happen for other reasons and setting the batch size to 1 reduces the blast radius of any timeout to 1 record, minimizing the chances of counting records multiple times.

Cost

Lambda is charged per invocation and for duration multiplied by memory allocated. Setting the batch size to 1 record results in 10x more invocations than a batch size of 10 records. This does not lead to a 10x increase in cost, however, as the billed duration of each invocation is 10x lower. The total billed duration is the same in both cases, so the difference in cost comes from the higher number of invocations, and it is not massive. The difference of $1.80 per 10 million records is worth it to ensure more accurate counts.

| Batch size | Timeout | Memory | Billed duration | Cost / number of invocations |
| --- | --- | --- | --- | --- |
| 1 record | 3 secs | 128 MB | 365 ms | $9.29 / 10 million |
| 10 records | 30 secs | 128 MB | 3650 ms | $7.49 / 1 million |

All pricing figures come from the Lambda pricing calculator on this page.

https://aws.amazon.com/lambda/pricing/
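
The $1.80 difference can be traced with a little arithmetic. The sketch below assumes a request price of $0.20 per million invocations; check the pricing page above for the current figure in your region. It compares the compute consumed (GB-seconds) and the request charge for the two rows of the table, both of which cover 10 million records.

```python
REQUEST_PRICE_PER_MILLION = 0.20  # USD per million requests, an assumed figure


def request_cost(invocations):
    """Charge for the invocations themselves."""
    return invocations / 1_000_000 * REQUEST_PRICE_PER_MILLION


def gb_seconds(invocations, billed_ms, memory_mb):
    """Compute consumed, the unit the duration charge is based on."""
    return invocations * (billed_ms / 1000) * (memory_mb / 1024)


# Batch size 1: 10 million invocations of 365 ms each.
# Batch size 10: 1 million invocations of 3650 ms each.
compute_batch_1 = gb_seconds(10_000_000, 365, 128)
compute_batch_10 = gb_seconds(1_000_000, 3650, 128)

# The request charge is the only part that differs between the two setups.
extra = request_cost(10_000_000) - request_cost(1_000_000)
print(round(extra, 2))  # 1.8
```

Both setups consume the same number of GB-seconds, so under this assumption the extra cost of batch size 1 is just the 9 million additional requests.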

Use cases

This pattern will not suit every use case and there are other ways to fulfil the requirement. For the side project I am working on, the DynamoDB table will have a high read to write ratio, meaning that it will have to support far more reads than writes. With a low number of writes, DynamoDB Streams usage and Lambda invocations will be very low. Having the number of items pre-calculated and cached in a separate record will greatly reduce query times for the application.
Please comment below if you have any questions. I'm happy to help.

GitHub Repo

https://github.com/thomasmilner/ddbstreamsdemo