DEV Community

Yasuhiro Matsuda for AWS Community Builders

Posted on • Edited on

DBのレコード更新をトリガーにLambdaを実行する(パート1)

複数のサブシステムにデータの同期を行いたい場合に、どのようなアーキテクチャにしますか?

重要なこととして、トランザクションが利用できないためにSQLの発行元は、データの一貫性が保てるように配慮する必要があります。

例えば、SQLの実行をLambdaで行った場合、データベースAとデータベースBにそれぞれ書き込みを行う必要がありますが、複数のレコードを更新する場合にはアプリケーションロジックが複雑になりがちです。

そこで、Trigger an AWS Lambda function from Amazon RDS for MySQL or Amazon RDS for MariaDB using audit logs and Amazon CloudWatchのアーキテクチャを紹介します。

この方法であれば、監査ログを使って実際に書き込みされたことをトリガーにしてLambdaが実行されるので、実装がシンプルで、データの同期の実装を後付けで行うこともできます。上記の例では、RDS for MySQL(もしくはMariaDB)になっていますが、冒頭の説明にもあるとおりAurora MySQLでも可能です。

ただこのアーキテクチャの場合には、CloudWatch Logsのサブスクリプションフィルターを使ってLambdaを実行するため、Lambdaの実行ができない場合が考えられます。Lambda関数の同時実行数の上限に達するなどの問題により実行エラー発生時のリトライを考慮したい場合には、CloudWatch LogsのサブスクリプションフィルターからLambdaを起動するのではなく、Kinesis Data Streamsに書き込み、イベントソースマッピングにて対応することができます。

複数のレコードをバッチにまとめてLambda関数で処理することで同時実行数を抑えたり、Lambda関数実行エラー時も自動でリトライ可能となり、最小限の呼び出しで実行できるようになります。

ここで注意したいのが、Aurora MySQL の監査ログの場合、パラメータグループ(character_set_database)で指定された文字コードではなく、UTF-8形式となっていてCloudWatch LogsからKinesis DataStreamへデータが書き込まれる際に、gzip 形式で圧縮されていることです。

そのため、CloudWatch Logsでは正しく表示されますが、Kinesis Data Streamのデータビュアーでは文字化けして表示されます。また、Lambdaのテストテンプレートにあります kinesis-get-recordsは、「Hello, this is a test 123.」のデータがBase64でエンコードされ、gzip圧縮されていないため、テストする際には以下の方法で事前にデータを準備しておく必要があります。

echo "hoge" > ~/work/tmp/hoge
gzip ~/work/tmp/hoge
cat ~/work/tmp/hoge.gz | base64
Enter fullscreen mode Exit fullscreen mode

ちなみにBase64をデコードする方法は以下の通りです。

echo -n "H4sICAgtzGYAA2hvZ2UAy8hPT+UCAJ2H4rkFAAAA" | base64 -d | zcat
Enter fullscreen mode Exit fullscreen mode

Kinesis DataStream のデータを参照するためのコードは他言語では参考となるものがあるものの、C#のコードがほとんど見あたりませんでした。AWSの公式ドキュメントを参考にして、試行錯誤しながら以下の方法で実装してみました。

using System.Text;
using System.Text.RegularExpressions;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using System.IO;
using System.IO.Compression;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace SyncStatus;

public class Function
{
    Regex reg = new Regex(",(?=(?:[^\']*\'[^\']*\')*[^\']*$)");
    public void FunctionHandler(KinesisEvent kinesisEvent, ILambdaContext context)
    {
        context.Logger.LogInformation($"Beginning to process {kinesisEvent.Records.Count} records...");

        foreach (var record in kinesisEvent.Records)
        {
            context.Logger.LogInformation($"Event ID: {record.EventId}");
            context.Logger.LogInformation($"Event Name: {record.EventName}");

            try
            {
                string recordData = GetRecordContents(record.Kinesis);
                var messages = JObject.Parse(recordData).GetValue("logEvents");
                context.Logger.LogInformation($"Record Data(UTF-8):");
                foreach (var item in messages)
                {
                    var log = item["message"].Value<string>();
                    context.Logger.LogInformation((reg.Split(log))[8].Trim('\'').Replace("\\'", "'"));
                }
            }
            catch (Exception e)
            {
                context.Logger.LogError(e.Message);
            }
        }

        context.Logger.LogInformation("Stream processing complete.");
    }

    private string GetRecordContents(KinesisEvent.Record streamRecord)
    {
        using (var gZipStream = new GZipStream(streamRecord.Data, CompressionMode.Decompress))
        using (var memoryStreamOutput = new MemoryStream()) 
        {
            gZipStream.CopyTo(memoryStreamOutput);
            var outputBytes = memoryStreamOutput.ToArray();

            string decompressed = Encoding.UTF8.GetString(outputBytes);
            return decompressed;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

是非、Kinesis DataStreamを使って楽しいピタゴラスイッチライフを!

Top comments (0)