In this tutorial, we will implement a serverless message processing pipeline using Upstash Kafka and AWS Lambda. Upstash Kafka will act as the event source for an AWS Lambda function: each message produced to the topic triggers the function, which then processes it.
Because Upstash Kafka is a true serverless product, the whole pipeline will be serverless. You pay only when your pipeline is actively processing messages.
Create Upstash Kafka
First, create an Upstash Kafka cluster and a topic in the Upstash console. You will need the endpoint, username, and password in the following steps.
Create AWS Lambda Function
Now let’s create an AWS Lambda function. For the best performance, select the same region as your Upstash Kafka cluster. We will use the Node.js runtime.
:::note
You can use Serverless Framework or AWS SAM for this step.
:::
Lambda Function Code
Update your function's code as below:
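exports.handler = async (event) => {
  // Events from a Kafka trigger carry a `records` object; anything else is ignored.
  if (!event.records) {
    return { response: "no kafka event" };
  }
  // `records` maps each topic-partition to an array of messages.
  for (const messages of Object.values(event.records)) {
    for (const msg of messages) {
      // Message values arrive base64-encoded; decode as UTF-8 so non-ASCII text survives.
      const text = Buffer.from(msg.value, "base64").toString("utf8");
      // process the message
      console.log(text);
    }
  }
  return { response: "success" };
};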
The code above reads the Kafka messages from the event parameter. AWS delivers each message value base64-encoded, so we decode it and log the result to the console.
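To try the decoding logic without a live trigger, it can help to build a sample event by hand. The sketch below assumes only the parts of the event shape that the handler relies on: a `records` object whose values are arrays of messages with a base64 `value`. The helper names `makeKafkaEvent` and `decodeRecords`, and the `topic-partition` key format used for the sample, are illustrative and not part of any AWS or Upstash API.

```javascript
// Hypothetical helper: wraps plain strings in a Kafka-style Lambda event.
// Only the fields the handler reads are filled in; real events carry more.
function makeKafkaEvent(topic, partition, texts) {
  const key = `${topic}-${partition}`;
  return {
    records: {
      [key]: texts.map((text, i) => ({
        topic,
        partition,
        offset: i,
        value: Buffer.from(text, "utf8").toString("base64"),
      })),
    },
  };
}

// Hypothetical helper: follows the same loop as the handler and
// returns the decoded message values (as UTF-8) instead of logging them.
function decodeRecords(event) {
  if (!event.records) return [];
  const decoded = [];
  for (const messages of Object.values(event.records)) {
    for (const msg of messages) {
      decoded.push(Buffer.from(msg.value, "base64").toString("utf8"));
    }
  }
  return decoded;
}

const sampleEvent = makeKafkaEvent("newtopic", 0, ["hello", "second message"]);
console.log(decodeRecords(sampleEvent));
```

The same `sampleEvent` object can be pasted as a test event in the Lambda console after serializing it with `JSON.stringify`.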
Create AWS Secret
The AWS Lambda trigger needs the Kafka credentials to be bundled as a secret, so we will create one in AWS Secrets Manager. Select Other type of secret. Enter your Kafka cluster's username and password as key/value pairs, using the keys username and password.
On the next screen, give your secret a name.
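For reference, the secret described above ends up as a small JSON document. The sketch below builds such a payload, assuming the `username` and `password` key names that AWS documents for SASL/SCRAM secrets on self-managed Kafka triggers. `buildKafkaSecret` and the sample credentials are hypothetical placeholders.

```javascript
// Hypothetical helper: produces the JSON string stored as the secret value.
// Assumption: the trigger expects exactly the keys "username" and "password".
function buildKafkaSecret(username, password) {
  if (!username || !password) {
    throw new Error("username and password are both required");
  }
  return JSON.stringify({ username, password });
}

// Placeholder credentials; use the values from your own Upstash console.
console.log(buildKafkaSecret("my-upstash-username", "my-upstash-password"));
```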
Edit AWS Lambda Role
Now we need to configure the Lambda function’s role to access the secrets.
On the AWS Lambda function’s page, open the Configuration tab and select Permissions. Click the link just below the Role name label.
The IAM management console will open in a new tab. On the Permissions tab, click the link which starts with AWSLambdaBasicExecutionRole-....
Click the Edit Policy button and add this configuration in the JSON tab:
{
  "Effect": "Allow",
  "Action": [
    "secretsmanager:GetSecretValue"
  ],
  "Resource": [
    "REPLACE_THE_ARN_OF_THE_SECRET"
  ]
}
Replace REPLACE_THE_ARN_OF_THE_SECRET with the ARN of the secret that you created in the previous step.
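The statement above is one entry in the policy's `Statement` array. As a rough illustration, the sketch below generates a standalone policy document that contains only this statement; when editing the existing policy, the statement would be appended to the array that is already there. `buildSecretAccessPolicy` and the sample ARN are hypothetical, and `2012-10-17` is the standard IAM policy language version.

```javascript
// Hypothetical helper: builds an IAM policy document granting read access
// to a single Secrets Manager secret.
function buildSecretAccessPolicy(secretArn) {
  if (typeof secretArn !== "string" || !secretArn.startsWith("arn:")) {
    throw new Error("expected the full ARN of the secret");
  }
  return {
    Version: "2012-10-17",
    Statement: [
      {
        Effect: "Allow",
        Action: ["secretsmanager:GetSecretValue"],
        Resource: [secretArn],
      },
    ],
  };
}

// Placeholder ARN; copy the real one from the Secrets Manager console.
const examplePolicy = buildSecretAccessPolicy(
  "arn:aws:secretsmanager:us-east-1:123456789012:secret:example-kafka-secret"
);
console.log(JSON.stringify(examplePolicy, null, 2));
```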
Create the Trigger
Go back to your Lambda function’s page and click the Add trigger button. Select Apache Kafka from the menu and fill in the inputs.
Bootstrap servers: copy and paste the endpoint from the Upstash console.
Topic name: enter your topic’s name.
Click the Add button under Authentication. Select SASL_SCRAM_256_AUTH and select the secret that you created earlier.
Check the Enable trigger checkbox. You can leave the remaining inputs as they are.
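The console form above corresponds to the parameters of Lambda's CreateEventSourceMapping API. For readers who script their setup, the sketch below only assembles such a request object and does not send it. The function name, endpoint, and ARN are placeholders, `buildKafkaTriggerParams` is a hypothetical helper, and the `LATEST` starting position is an assumption, since the tutorial leaves that input at its default.

```javascript
// Hypothetical helper: maps the console inputs to CreateEventSourceMapping fields.
function buildKafkaTriggerParams({ functionName, bootstrapServer, topic, secretArn }) {
  return {
    FunctionName: functionName,
    // "Bootstrap servers" input
    SelfManagedEventSource: {
      Endpoints: { KAFKA_BOOTSTRAP_SERVERS: [bootstrapServer] },
    },
    // "Topic name" input
    Topics: [topic],
    // "Authentication" input: SASL/SCRAM-256 credentials read from the secret
    SourceAccessConfigurations: [{ Type: "SASL_SCRAM_256_AUTH", URI: secretArn }],
    // Assumption: start from new messages only
    StartingPosition: "LATEST",
    // "Enable trigger" checkbox
    Enabled: true,
  };
}

// Placeholder values throughout; substitute your own.
const triggerParams = buildKafkaTriggerParams({
  functionName: "my-kafka-consumer",
  bootstrapServer: "example-kafka.upstash.io:9092",
  topic: "newtopic",
  secretArn: "arn:aws:secretsmanager:us-east-1:123456789012:secret:example-kafka-secret",
});
console.log(JSON.stringify(triggerParams, null, 2));
```

An object of this shape could then be passed to an AWS SDK or CLI call that creates the event source mapping.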
Testing
Now let’s produce messages and see whether AWS Lambda processes them. Copy the curl command for producing a message from the Upstash console.
➜ curl https://full-mantis-14289-us1-rest-kafka.upstash.io/produce/newtopic/newmessage -u ZnVsbC1tYW50aXMtMTQyODkkimaEsuUsiT9TGk3OFdjveYHBV9Jjzow03SnUtRQ:4-R-fmtoalXnoeu9TjQBOOL4njfSKwEsE10YvHMiW63hFljqUrrq5_yAq4TPGd9c6JbqfQ==
{
  "topic" : "newtopic",
  "partition" : 0,
  "offset" : 48,
  "timestamp" : 1639522675505
}
Check CloudWatch (Lambda > Monitor > View logs in CloudWatch). You should see that the messages you produced are logged by the Lambda function.
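The produce request made with curl can also be issued from Node.js. The sketch below builds the same kind of request, following the `/produce/<topic>/<message>` path and basic authentication seen in the curl command. `buildProduceRequest`, the endpoint, and the credentials are hypothetical placeholders, and the actual network call is left as a comment.

```javascript
// Hypothetical helper: builds the URL and headers for a REST produce request.
function buildProduceRequest(restUrl, topic, message, username, password) {
  const base = restUrl.replace(/\/+$/, ""); // drop trailing slashes
  const url = `${base}/produce/${encodeURIComponent(topic)}/${encodeURIComponent(message)}`;
  // Equivalent of curl's -u flag: HTTP basic authentication
  const token = Buffer.from(`${username}:${password}`, "utf8").toString("base64");
  return { url, headers: { Authorization: `Basic ${token}` } };
}

// Placeholder endpoint and credentials; copy the real ones from the Upstash console.
const produceRequest = buildProduceRequest(
  "https://example-rest-kafka.upstash.io",
  "newtopic",
  "newmessage",
  "my-username",
  "my-password"
);
console.log(produceRequest.url);

// With Node.js 18+ the request could then be sent with the built-in fetch:
//   const res = await fetch(produceRequest.url, { headers: produceRequest.headers });
//   console.log(await res.json());
```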