DEV Community

Rakesh Sanghvi

Originally published at awsmantra.com

Enrich DynamoDB stream (List/Map) using EventBridge Pipes and Step Functions

Use Case:-

I'm sure you all remember how much boilerplate code we used to write to convert DynamoDB stream records into business objects. Here is a small snippet from a GoLang program:

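    // recordImage is the item image of a stream record, e.g. record.Change.NewImage
    // (a map[string]events.DynamoDBAttributeValue from github.com/aws/aws-lambda-go/events)
    m := &EntitySearch{}

    // Copy each known attribute into the business object, one field at a time
    for name, value := range recordImage {
        switch name {
        case "ExternalId":
            m.ExternalId = value.String()
        case "Source":
            m.Source = value.String()
        case "EntityId":
            m.EntityId = value.String()
        case "EntityType":
            m.EntityType = value.String()
        }
    }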

At re:Invent 2022, AWS introduced EventBridge Pipes, which transformed how we turn DynamoDB stream records into business objects and has nearly eliminated the need to write Lambda code for the conversion. But what if your stream contains DynamoDB List or Map attributes? EventBridge Pipes doesn't support the full JSONPath syntax.

No Json Path

For example, what if your item contains a List or a Map like the following?

"Skills": [
  "AWS",
  "GoLang",
  "Java",
  "TypeScript"
]
{
  "orderItems": [
    {
      "itemId": "1234567890",
      "quantity": 1,
      "price": 10.00
    },
    {
      "itemId": "9876543210",
      "quantity": 2,
      "price": 5.00
    }
  ]
}
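In the stream record these values do not appear as plain JSON: DynamoDB wraps every attribute in a type descriptor (S for string, N for number, L for list, M for map), which is why the path to the skill names has to go through `.L[*].S`. The sketch below shows that mapping with a hypothetical helper, `toAttributeValue`, written only for illustration; in real code the AWS SDK for JavaScript v3 provides `marshall`/`unmarshall` in `@aws-sdk/util-dynamodb`.

```typescript
// Subset of the DynamoDB attribute-value format seen in stream images.
type AttributeValue =
  | { S: string }
  | { N: string }
  | { BOOL: boolean }
  | { NULL: true }
  | { L: AttributeValue[] }
  | { M: { [key: string]: AttributeValue } };

// Hypothetical helper: wrap a plain JSON value in DynamoDB type descriptors.
function toAttributeValue(value: unknown): AttributeValue {
  if (value === null) return { NULL: true };
  if (typeof value === "string") return { S: value };
  // DynamoDB transmits numbers as strings
  if (typeof value === "number") return { N: String(value) };
  if (typeof value === "boolean") return { BOOL: value };
  // Lists become L, with every element wrapped recursively
  if (Array.isArray(value)) return { L: value.map(toAttributeValue) };
  if (typeof value === "object") {
    // Objects become M, with every property wrapped recursively
    const m: { [key: string]: AttributeValue } = {};
    for (const [k, v] of Object.entries(value as Record<string, unknown>)) {
      m[k] = toAttributeValue(v);
    }
    return { M: m };
  }
  throw new Error(`Unsupported type: ${typeof value}`);
}

const skills = toAttributeValue(["AWS", "GoLang", "Java", "TypeScript"]);
console.log(JSON.stringify(skills));
// {"L":[{"S":"AWS"},{"S":"GoLang"},{"S":"Java"},{"S":"TypeScript"}]}

const order = toAttributeValue({ orderItems: [{ itemId: "1234567890", quantity: 1, price: 10.0 }] });
console.log(JSON.stringify(order));
```

So in `NewImage`, the `Skills` attribute is an `L` whose elements are `S` values, and each order item is an `M` nested inside an `L`.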

If you've played with EventBridge Pipes, you know you can do some transformation on the way to the target. However, if you try to access the Skills list with a path like "$.dynamodb.NewImage.Skills.L[*].S", you will get an error.

My solution is to use the pipe's enrichment step with a Step Functions state machine, because Step Functions supports full JSONPath, including the [*] wildcard.
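To make the `[*]` wildcard concrete, here is a minimal sketch of what that path selects. `select` is a hypothetical helper that understands only `$`, `.key`, and `[*]`; it is not a full JSONPath implementation and not how Step Functions evaluates paths internally. The sample record is made up and trimmed to the fields the path touches.

```typescript
// Hypothetical evaluator for a tiny JSONPath subset: "$", ".key" and "[*]".
// Returns every value the path selects (an empty array if nothing matches).
function select(doc: unknown, path: string): unknown[] {
  if (!path.startsWith("$")) throw new Error("path must start with $");
  // Split ".a.b[*].c" into ["a", "b", "[*]", "c"]
  const tokens = path.slice(1).match(/\[\*\]|[^.[\]]+/g) ?? [];
  let current: unknown[] = [doc];
  for (const token of tokens) {
    const next: unknown[] = [];
    for (const node of current) {
      if (token === "[*]") {
        // Wildcard: fan out over every element of an array
        if (Array.isArray(node)) next.push(...node);
      } else if (node !== null && typeof node === "object" && token in node) {
        // Plain key: step into the named property
        next.push((node as Record<string, unknown>)[token]);
      }
    }
    current = next;
  }
  return current;
}

// Made-up stream record, trimmed to the Skills attribute
const record = {
  eventName: "INSERT",
  dynamodb: {
    NewImage: {
      Skills: { L: [{ S: "AWS" }, { S: "GoLang" }, { S: "Java" }, { S: "TypeScript" }] },
    },
  },
};

console.log(JSON.stringify(select(record, "$.dynamodb.NewImage.Skills.L[*].S")));
// ["AWS","GoLang","Java","TypeScript"]
```

The wildcard is what turns the list of `{ "S": ... }` wrappers back into a plain array of strings, and that is exactly the step the pipe's own transformer cannot do.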

The Architecture:

Architecture

Step Functions:-

Step Functions

You can download the source code from here. It is implemented with both AWS CDK and SAM, so you can use whichever approach you prefer.

CDK Deploy:-

1) cd cdk
2) npm install
3) cdk deploy --all -a "npx ts-node bin/app.ts" --profile <your profile name>

SAM Deploy:-

1) cd sam
2) sam deploy --stack-name EventBridgePipeEnrich --capabilities CAPABILITY_NAMED_IAM --guided --profile <profile name>

Insert Data in DynamoDB:-

1) cd dynamodb
2) ./dynamodb.sh

You should then see the following record in the DynamoDB table.

DynamoDB

Here are the final CloudWatch logs after enrichment.

Cloudwatch

Understanding the State Machine:-

I am using a straightforward state machine with an inline Map state to iterate over the batch of DynamoDB stream records that the pipe passes in. Note the key 'Skills.$': the '.$' suffix tells Step Functions to treat the value as a JSONPath, here '$.dynamodb.NewImage.Skills.L[*].S', which selects the string value ('S') of every element in the 'Skills' list ('L') inside the record's 'NewImage'.

{
  "Comment": "A description of my state machine",
  "StartAt": "DynamoDB Map",
  "States": {
    "DynamoDB Map": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "INLINE"
        },
        "StartAt": "Transform",
        "States": {
          "Transform": {
            "Type": "Pass",
            "Parameters": {
              "details": {
                "meta-data": {
                  "correlationId.$": "$.eventID",
                  "eventName.$": "$.eventName"
                },
                "data": {
                  "PK.$": "$.dynamodb.Keys.PK.S",
                  "SK.$": "$.dynamodb.Keys.SK.S",
                  "Skills.$": "$.dynamodb.NewImage.Skills.L[*].S",
                  "Contact": {
                    "Home.$": "$.dynamodb.NewImage.Contact.M.Home.S",
                    "Phone.$": "$.dynamodb.NewImage.Contact.M.Phone.S"
                  }
                }
              }
            },
            "InputPath": "$",
            "End": true
          }
        }
      },
      "End": true
    }
  }
}
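For readers who think more easily in code, here is a rough plain-TypeScript equivalent of what the "Transform" Pass state produces for each record, and of the inline Map applying it to the whole batch. It is a sketch, not what runs inside Step Functions: the types cover only the fields the state machine reads, the sample record (keys, phone number, and so on) is made up, and it does not model Step Functions' error handling when a referenced path is missing.

```typescript
// Hypothetical types covering only the stream-record fields the state machine reads.
type AttrValue = { S?: string; L?: AttrValue[]; M?: Record<string, AttrValue> };
interface StreamRecord {
  eventID: string;
  eventName: string;
  dynamodb: {
    Keys: Record<string, AttrValue>;
    NewImage: Record<string, AttrValue>;
  };
}

// Mirrors the "Parameters" of the Transform Pass state for one record.
function transform(r: StreamRecord) {
  const img = r.dynamodb.NewImage;
  return {
    details: {
      "meta-data": { correlationId: r.eventID, eventName: r.eventName },
      data: {
        PK: r.dynamodb.Keys.PK.S,
        SK: r.dynamodb.Keys.SK.S,
        // "$.dynamodb.NewImage.Skills.L[*].S": the S value of every list element
        Skills: (img.Skills.L ?? []).map((item) => item.S),
        // "$.dynamodb.NewImage.Contact.M.Home.S" and ".Phone.S": fields of a map
        Contact: {
          Home: img.Contact.M?.Home.S,
          Phone: img.Contact.M?.Phone.S,
        },
      },
    },
  };
}

// The inline Map state applies the transform to every record in the batch.
const enrich = (batch: StreamRecord[]) => batch.map(transform);

// Made-up sample record for illustration
const sample: StreamRecord = {
  eventID: "e-1",
  eventName: "INSERT",
  dynamodb: {
    Keys: { PK: { S: "EMP#1" }, SK: { S: "PROFILE" } },
    NewImage: {
      PK: { S: "EMP#1" },
      SK: { S: "PROFILE" },
      Skills: { L: [{ S: "AWS" }, { S: "GoLang" }] },
      Contact: { M: { Home: { S: "Seattle" }, Phone: { S: "555-0100" } } },
    },
  },
};

console.log(JSON.stringify(enrich([sample]), null, 2));
```

Because the Map state returns one transformed object per input record, the enrichment output stays an array with the same length as the batch the pipe sent in.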

DynamoDB table and Stream:-

EventBridge Pipes reads from the table's stream, so we must enable DynamoDB Streams on the table. I use the NEW_AND_OLD_IMAGES view type so that the stream captures both the new and the old images of the item.

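import * as cdk from 'aws-cdk-lib';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';

// Inside the construct's constructor:
this._table = new dynamodb.Table(this, id, {
    tableName: 'Employee',
    billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
    removalPolicy: cdk.RemovalPolicy.DESTROY,
    partitionKey: { name: 'PK', type: dynamodb.AttributeType.STRING },
    sortKey: { name: 'SK', type: dynamodb.AttributeType.STRING },
    // Enable the stream with both the new and the old item images
    stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
});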

EventBridge Pipes:-

Here is how I set up the EventBridge pipe:

const pipe = new pipes.CfnPipe(this, 'pipe', {
    name: 'employee-app-pipe',
    roleArn: props.employeePipeRole.roleArn,
    source: props.table.tableStreamArn!,
    target: props.employeeEventBus.targetArn,
    sourceParameters: {
        dynamoDbStreamParameters: {
            startingPosition: 'LATEST',
            batchSize: 1,
            deadLetterConfig: {
                arn: props.employeeDLQueue.queueArn,
            },
            maximumRetryAttempts: 1,
        },
        filterCriteria: {
            filters: [{
                pattern: '{ "eventName": ["INSERT","MODIFY"] }',
            }],
        },
    },
    enrichment: props.employeeAppStateMachine.stateMachine.attrArn,
    targetParameters: {
        eventBridgeEventBusParameters: {
            detailType: 'EmployeeDetailsChanged',
            source: 'employee-app',
        },
    },
});
  • The pipe starts reading from the LATEST position in the DynamoDB stream.

  • The batch size is 1 because this is a sample.

  • Make sure you set the maximumRetryAttempts value. The default is -1, which means the pipe keeps retrying a failed record until it expires from the stream (24 hours). You can read more here.

  • For the enrichment step, the pipe invokes the Step Functions state machine referenced by props.employeeAppStateMachine.stateMachine.attrArn.
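The pipe's filterCriteria pattern, `{ "eventName": ["INSERT","MODIFY"] }`, runs before enrichment and keeps only inserts and updates. In an EventBridge pattern, a list of values matches when the field equals any one of them, and a field that is absent from the record does not match. The sketch below emulates just that rule; `matches` is a hypothetical helper, not part of any AWS library, and the records are made up. Dropping REMOVE records matters here because they carry only an OldImage, so the state machine's `$.dynamodb.NewImage...` paths would have nothing to read.

```typescript
// The same pattern the pipe uses, as a field -> allowed-values map.
const pattern: Record<string, string[]> = { eventName: ["INSERT", "MODIFY"] };

// Hypothetical emulation: every pattern field must equal one of its allowed values.
function matches(record: Record<string, unknown>): boolean {
  return Object.entries(pattern).every(([field, allowed]) => {
    const v = record[field];
    // A missing or non-string field never matches a list of strings
    return typeof v === "string" && allowed.includes(v);
  });
}

// Made-up records showing which images each event type carries
const records = [
  { eventName: "INSERT", dynamodb: { NewImage: {} } },
  { eventName: "MODIFY", dynamodb: { NewImage: {}, OldImage: {} } },
  // REMOVE records have only OldImage, so NewImage paths would not resolve
  { eventName: "REMOVE", dynamodb: { OldImage: {} } },
];

console.log(records.filter(matches).map((r) => r.eventName));
```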

Wrap-Up:-

This example demonstrates how to use the EventBridge Pipes enrichment step to process complex JSON structures, specifically DynamoDB List and Map attributes. The approach is needed because EventBridge Pipes lacks full support for JSONPath expressions, so it should prove useful whenever your stream records contain nested JSON.

I hope you enjoyed reading this post, and I sincerely hope it proves useful in your own implementation.
