Introduction
We will be looking at how to set up an OpenSearch index from a DynamoDB table. We will assume you have some knowledge of DynamoDB and Lambda, and that you are also familiar with using the CDK to deploy infrastructure into AWS.
DynamoDB
Firstly, let’s think about our DynamoDB table and how to set it up so that it is ready to be indexed. This is actually very straightforward and utilises a DynamoDB stream.
"A DynamoDB stream is an ordered flow of information about changes to items in a DynamoDB table." - AWS
Using CDK your table might look like:
const userTable = new dynamodb.Table(this, "UserTable", {
  tableName: "user-table",
  billingMode: BillingMode.PAY_PER_REQUEST,
  partitionKey: {name: "partitionKey", type: AttributeType.STRING},
  sortKey: {name: "sortKey", type: AttributeType.STRING},
  pointInTimeRecovery: true,
  stream: StreamViewType.NEW_IMAGE // This is the important line!
});
If you are using the console instead of CDK, see this.
There are different types of streams:
KEYS_ONLY - Only the key attributes of the modified item are written to the stream.
NEW_IMAGE - The entire item, as it appears after it was modified, is written to the stream.
OLD_IMAGE - The entire item, as it appeared before it was modified, is written to the stream.
NEW_AND_OLD_IMAGES - Both new and old item images of the item are written to the stream.
Here we have chosen NEW_IMAGE because we only need the new version of the item in order to index it.
This will create a table with a DynamoDB stream, which means any new, updated or deleted item events will be streamed into a place of your choosing; we have chosen a Lambda.
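To make the handler shown later easier to follow, here is a hypothetical example of the kind of record the stream delivers for an INSERT when the view type is NEW_IMAGE. The values are made up and the record is trimmed to the fields we actually use:
const exampleInsertRecord = {
  eventName: "INSERT",
  dynamodb: {
    Keys: {
      partitionKey: { S: "USER#123" },
      sortKey: { S: "PROFILE" }
    },
    NewImage: {
      partitionKey: { S: "USER#123" },
      sortKey: { S: "PROFILE" },
      email: { S: "jane@example.com" }
    },
    StreamViewType: "NEW_IMAGE"
  }
};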
Lambda
So, next up, we must think about the indexing Lambda. There is currently no direct way to index your data from a DynamoDB stream into an OpenSearch domain, so we must add a middle man to do the work. More on this can be found here.
The code for this Lambda and a few other helpful Lambdas can be found here on GitHub. This Lambda lives in the index-stream directory.
Here is a code snippet of this Lambda’s handler:
import { DynamoDBStreamEvent } from "aws-lambda";
import { DynamoDB } from "aws-sdk";

export const handler = async (event: DynamoDBStreamEvent): Promise<void> => {
  console.log("Received event from the user table");
  for (const record of event.Records) {
    if (!record.eventName || !record.dynamodb || !record.dynamodb.Keys) continue;
    const partitionKey = record.dynamodb.Keys.partitionKey.S;
    const sortKey = record.dynamodb.Keys.sortKey.S;
    // Note here that we are using a pk and sk,
    // but maybe you are using only an id; this would look like:
    // const id = record.dynamodb.Keys.id.S;
    try {
      if (record.eventName === "REMOVE") {
        // performing a DELETE request to your index
        await removeDocumentFromOpenSearch(partitionKey, sortKey);
      } else {
        // There are 2 types of events left to handle, INSERT and MODIFY,
        // which will both contain a NewImage
        if (!record.dynamodb.NewImage) continue;
        const userDocument = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage) as User;
        // performing a PUT request to your index
        await indexDocumentInOpenSearch(userDocument, partitionKey, sortKey);
      }
    } catch (error) {
      console.error("Error occurred updating OpenSearch domain", error);
      throw error;
    }
  }
};
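The handler calls two helpers, removeDocumentFromOpenSearch and indexDocumentInOpenSearch, whose real implementations live in the repo. As a rough sketch of what they might look like (this is an assumption, not the repo's code), using the @opensearch-project/opensearch client, the user-index index, a document id built from the table keys, and an OPENSEARCH_ENDPOINT environment variable, with request signing/auth setup omitted:
import { Client } from "@opensearch-project/opensearch";
import { User } from "./user"; // hypothetical path to the repo's User type

// Assumes the domain endpoint is passed in via an environment variable
// (see the addEnvironment example later on); auth/signing is not shown here.
const client = new Client({ node: `https://${process.env.OPENSEARCH_ENDPOINT}` });
const INDEX_NAME = "user-index";

export const indexDocumentInOpenSearch = async (
  user: User,
  partitionKey: string | undefined,
  sortKey: string | undefined
): Promise<void> => {
  // PUT the document, using the table keys as a deterministic document id
  await client.index({
    index: INDEX_NAME,
    id: `${partitionKey}#${sortKey}`,
    body: user
  });
};

export const removeDocumentFromOpenSearch = async (
  partitionKey: string | undefined,
  sortKey: string | undefined
): Promise<void> => {
  // DELETE the document with the same deterministic id
  await client.delete({
    index: INDEX_NAME,
    id: `${partitionKey}#${sortKey}`
  });
};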
In CDK you can create your Lambda as follows:
const userTableIndexingFunction = new Function(this, "UserTableIndexingFunction", {
  functionName: "UserTableIndexingFunction",
  code: Code.fromAsset("user-table-indexing-lambda-dist-folder"),
  runtime: Runtime.NODEJS_16_X,
  handler: "index.handler"
});
Then we can add the DynamoDB stream as an event source for this Lambda.
userTableIndexingFunction.addEventSource(new DynamoEventSource(userTable, {
  startingPosition: StartingPosition.TRIM_HORIZON,
  batchSize: 1, // Our Lambda could also handle a batch size greater than 1, because of the for loop
  retryAttempts: 3
}));
There are 2 types of starting positions:
TRIM_HORIZON - Start reading at the last untrimmed record in the shard, which is the oldest data record in the shard. In other words, the Lambda will see all the item events currently in the stream and deal with them in chronological order (oldest event to most recent event).
LATEST - Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard. In other words, the Lambda will only see item events that occur after it has been attached; anything already sitting in the stream is skipped.
For this example, we therefore use TRIM_HORIZON so that the index will reflect the data in its current state.
OpenSearch
Now, let’s look at the actual OpenSearch domain setup. AWS suggests some substantial power (and therefore money) for a production-ready domain; you can find the best practices here. For this example we will use a very small setup with no redundancy, but feel free to scale this up based on your needs (a scaled-up example is sketched a little further down):
const openSearchDomain = new Domain(this, "OpenSearchDomain", {
  version: EngineVersion.OPENSEARCH_1_0,
  capacity: {
    dataNodeInstanceType: "t3.small.search",
    dataNodes: 1,
    masterNodes: 0
  },
  ebs: {
    enabled: true,
    volumeSize: 50,
    volumeType: EbsDeviceVolumeType.GENERAL_PURPOSE_SSD
  }
});
This will deploy your OpenSearch domain; it can take some time, so be patient.
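If you do want to move towards the production guidance mentioned above, the same construct lets you add dedicated master nodes, more data nodes and zone awareness. The instance types and node counts below are illustrative assumptions, not a sizing recommendation:
const productionOpenSearchDomain = new Domain(this, "OpenSearchDomain", {
  version: EngineVersion.OPENSEARCH_1_0,
  capacity: {
    dataNodeInstanceType: "r6g.large.search",
    dataNodes: 2,
    masterNodeInstanceType: "m6g.large.search",
    masterNodes: 3
  },
  zoneAwareness: {
    enabled: true,
    availabilityZoneCount: 2
  },
  ebs: {
    enabled: true,
    volumeSize: 100,
    volumeType: EbsDeviceVolumeType.GENERAL_PURPOSE_SSD
  }
});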
One final thing to think about is granting your Lambda the rights to read and write to your domain. In the stack with your OpenSearch domain, add this:
openSearchDomain.grantIndexReadWrite("user-index", userTableIndexingFunction);
This will allow your Lambda to do its job.
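If your Lambda code needs to know the domain endpoint at runtime (as assumed in the helper sketch earlier), one option is to pass it in as an environment variable; the OPENSEARCH_ENDPOINT name here is just an assumption to match that sketch:
userTableIndexingFunction.addEnvironment("OPENSEARCH_ENDPOINT", openSearchDomain.domainEndpoint);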
That's it, you are all set up and ready to index any new data into your OpenSearch index.
For indexing existing data, you can find a helpful Lambda under the index-data directory here on GitHub.
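As a rough idea of what that back-fill could look like (a sketch under assumptions, not the repo's code: it reuses the hypothetical indexDocumentInOpenSearch helper from above and assumes the item's keys are stored as partitionKey and sortKey attributes):
import { DynamoDB } from "aws-sdk";
import { User } from "./user"; // hypothetical path to the repo's User type
import { indexDocumentInOpenSearch } from "./open-search-client"; // hypothetical module from the sketch above

const documentClient = new DynamoDB.DocumentClient();

export const handler = async (): Promise<void> => {
  let lastEvaluatedKey: DynamoDB.DocumentClient.Key | undefined;
  do {
    // Read the table one page at a time
    const page = await documentClient.scan({
      TableName: "user-table",
      ExclusiveStartKey: lastEvaluatedKey
    }).promise();
    for (const item of page.Items ?? []) {
      // Index each existing item, reusing the same helper as the stream Lambda
      await indexDocumentInOpenSearch(item as User, item.partitionKey, item.sortKey);
    }
    lastEvaluatedKey = page.LastEvaluatedKey;
  } while (lastEvaluatedKey);
};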