Execute Lambda triggered by DB record updates

What architecture would you use if you wanted to synchronize data across multiple subsystems?

Importantly, since transactions are not available across subsystems, whoever issues the SQL must take care to keep the data consistent.

For example, if a Lambda function issues the SQL itself, it has to write to database A and database B separately, and the application logic quickly becomes complex once multiple records are being updated.

Therefore, consider the architecture described in the AWS blog post "Trigger an AWS Lambda function from Amazon RDS for MySQL or Amazon RDS for MariaDB using audit logs and Amazon CloudWatch".

With this method, Lambda is triggered by actual writes recorded in the audit logs, so the implementation stays simple and data synchronization can be retrofitted onto an existing system. The example above uses RDS for MySQL (or MariaDB), but as explained at the beginning of this article, it also works with Aurora MySQL.
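For reference, on Aurora MySQL the audit log is turned on via the cluster parameter group and exported to CloudWatch Logs. A minimal AWS CLI sketch (the cluster and parameter group names here are placeholders) looks like this:

# Enable the audit log in the cluster parameter group (events to capture are illustrative)
aws rds modify-db-cluster-parameter-group \
  --db-cluster-parameter-group-name my-cluster-params \
  --parameters "ParameterName=server_audit_logging,ParameterValue=1,ApplyMethod=immediate" \
               "ParameterName=server_audit_events,ParameterValue=QUERY_DML,ApplyMethod=immediate"

# Export the audit log to CloudWatch Logs
aws rds modify-db-cluster \
  --db-cluster-identifier my-cluster \
  --cloudwatch-logs-export-configuration '{"EnableLogTypes":["audit"]}'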

However, because this architecture invokes Lambda through a CloudWatch Logs subscription filter, the invocation can fail, for example when the maximum number of concurrent Lambda executions is reached. If you want retries when such execution errors occur, have the subscription filter write to Kinesis Data Streams instead of invoking Lambda directly, and consume the stream through an event source mapping.

By having the Lambda function process multiple records per batch, the number of concurrent executions is reduced, and when the function fails, the event source mapping retries automatically, so the data can be processed with a minimum number of invocations.
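As a rough sketch of the wiring (the stream name, ARNs, role, and batch settings are placeholders), the subscription filter and the event source mapping can be created with the AWS CLI like this:

# Send the audit log group to a Kinesis data stream instead of invoking Lambda directly
aws logs put-subscription-filter \
  --log-group-name /aws/rds/cluster/my-cluster/audit \
  --filter-name audit-to-kinesis \
  --filter-pattern "" \
  --destination-arn arn:aws:kinesis:ap-northeast-1:123456789012:stream/audit-stream \
  --role-arn arn:aws:iam::123456789012:role/cwl-to-kinesis-role

# Consume the stream in batches, with automatic retries on function errors
aws lambda create-event-source-mapping \
  --function-name SyncStatus \
  --event-source-arn arn:aws:kinesis:ap-northeast-1:123456789012:stream/audit-stream \
  --starting-position LATEST \
  --batch-size 100 \
  --maximum-retry-attempts 3 \
  --bisect-batch-on-function-error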

There are two points to note: the Aurora MySQL audit log is written in UTF-8, not in the character encoding specified in the parameter group (character_set_database), and when CloudWatch Logs writes data to Kinesis Data Streams, it compresses it in gzip format.

Therefore, the data displays correctly in CloudWatch Logs but appears garbled in the Kinesis Data Streams data viewer. Also, in the kinesis-get-records Lambda test template, the "Hello, this is a test 123." payload is only Base64-encoded, not gzip-compressed, so when testing you need to prepare the data in advance as follows.

echo "hoge" > ~/work/tmp/hoge
gzip ~/work/tmp/hoge
cat ~/work/tmp/hoge.gz | base64

Incidentally, you can decode the Base64 data back as follows.

echo -n "H4sICAgtzGYAA2hvZ2UAy8hPT+UCAJ2H4rkFAAAA" | base64 -d | zcat
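To feed this into a Lambda test event, paste the gzip+Base64 string into the data field of the kinesis-get-records template. Note that for the handler shown below, the plaintext should be a CloudWatch Logs payload (a JSON object with a logEvents array), not a plain string like hoge. A minimal event shape (the IDs and ARNs here are dummies) looks like this:

{
  "Records": [
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "test",
        "sequenceNumber": "495903382714902566085596925383615710959215759891365888",
        "data": "<gzip+Base64 string goes here>",
        "approximateArrivalTimestamp": 1545084650.987
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000000:495903382714902566085596925383615710959215759891365888",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "ap-northeast-1",
      "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/audit-stream"
    }
  ]
}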

While there is reference code for reading Kinesis Data Streams data in other languages, I could find very little in C#. Referring to the official AWS documentation, I arrived at the following implementation by trial and error.

using System.Text;
using System.Text.RegularExpressions;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using System.IO;
using System.IO.Compression;
using Newtonsoft.Json.Linq;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace SyncStatus;

public class Function
{
    // Split the audit log line on commas, but not on commas inside single-quoted strings.
    Regex reg = new Regex(",(?=(?:[^\']*\'[^\']*\')*[^\']*$)");

    public void FunctionHandler(KinesisEvent kinesisEvent, ILambdaContext context)
    {
        context.Logger.LogInformation($"Beginning to process {kinesisEvent.Records.Count} records...");

        foreach (var record in kinesisEvent.Records)
        {
            context.Logger.LogInformation($"Event ID: {record.EventId}");
            context.Logger.LogInformation($"Event Name: {record.EventName}");

            try
            {
                // Each Kinesis record carries a gzip-compressed CloudWatch Logs payload.
                string recordData = GetRecordContents(record.Kinesis);
                var messages = JObject.Parse(recordData).GetValue("logEvents");
                context.Logger.LogInformation("Record Data(UTF-8):");
                foreach (var item in messages)
                {
                    var log = item["message"].Value<string>();
                    // Field 9 (index 8) of the audit log line is the SQL statement, wrapped in
                    // single quotes with embedded quotes escaped as \'.
                    context.Logger.LogInformation((reg.Split(log))[8].Trim('\'').Replace("\\'", "'"));
                }
            }
            catch (Exception e)
            {
                context.Logger.LogError(e.Message);
            }
        }

        context.Logger.LogInformation("Stream processing complete.");
    }

    // Decompress the gzip data delivered by the CloudWatch Logs subscription filter
    // and return it as a UTF-8 string.
    private string GetRecordContents(KinesisEvent.Record streamRecord)
    {
        using (var gZipStream = new GZipStream(streamRecord.Data, CompressionMode.Decompress))
        using (var memoryStreamOutput = new MemoryStream())
        {
            gZipStream.CopyTo(memoryStreamOutput);
            var outputBytes = memoryStreamOutput.ToArray();

            string decompressed = Encoding.UTF8.GetString(outputBytes);
            return decompressed;
        }
    }
}
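For context, each logEvents message is an audit log line in the MariaDB audit plugin's comma-separated format, along these lines (this sample line is illustrative):

20240826 12:34:56,ip-10-0-0-1,appuser,10.0.0.2,123,456,QUERY,mydb,'UPDATE orders SET status=\'shipped\' WHERE id=1',0

The ninth field (index 8) is the SQL statement, wrapped in single quotes with embedded quotes escaped as \', which is why the handler takes element [8], trims the surrounding quotes, and unescapes them.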

We hope you enjoy building Rube Goldberg-style pipelines with Kinesis Data Streams!
