DEV Community

Yasuhiro Matsuda for AWS Community Builders

Posted on • Edited on

3

DBのレコード更新をトリガーにLambdaを実行する(パート1)

複数のサブシステムにデータの同期を行いたい場合に、どのようなアーキテクチャにしますか?

重要なこととして、トランザクションが利用できないためにSQLの発行元は、データの一貫性が保てるように配慮する必要があります。

例えば、SQLの実行をLambdaで行った場合、データベースAとデータベースBにそれぞれ書き込みを行う必要がありますが、複数のレコードを更新する場合にはアプリケーションロジックが複雑になりがちです。

そこで、Trigger an AWS Lambda function from Amazon RDS for MySQL or Amazon RDS for MariaDB using audit logs and Amazon CloudWatchのアーキテクチャを紹介します。

この方法であれば、監査ログを使って実際に書き込みされたことをトリガーにしてLambdaが実行されるので、実装がシンプルで、データの同期の実装を後付けで行うこともできます。上記の例では、RDS for MySQL(もしくはMariaDB)になっていますが、冒頭の説明にもあるとおりAurora MySQLでも可能です。

ただこのアーキテクチャの場合には、CloudWatch Logsのサブスクリプションフィルターを使ってLambdaを実行するため、Lambdaの実行ができない場合が考えられます。Lambda関数の同時実行数の上限に達するなどの問題により実行エラー発生時のリトライを考慮したい場合には、CloudWatch LogsのサブスクリプションフィルターからLambdaを起動するのではなく、Kinesis Data Streamsに書き込み、イベントソースマッピングにて対応することができます。

複数のレコードをバッチにまとめてLambda関数で処理することで同時実行数を抑えたり、Lambda関数実行エラー時も自動でリトライ可能となり、最小限の呼び出しで実行できるようになります。

ここで注意したいのが、Aurora MySQL の監査ログの場合、パラメータグループ(character_set_database)で指定された文字コードではなく、UTF-8形式となっていてCloudWatch LogsからKinesis DataStreamへデータが書き込まれる際に、gzip 形式で圧縮されていることです。

そのため、CloudWatch Logsでは正しく表示されますが、Kinesis Data Streamのデータビュアーでは文字化けして表示されます。また、Lambdaのテストテンプレートにあります kinesis-get-recordsは、「Hello, this is a test 123.」のデータがBase64でエンコードされ、gzip圧縮されていないため、テストする際には以下の方法で事前にデータを準備しておく必要があります。

echo "hoge" > ~/work/tmp/hoge
gzip ~/work/tmp/hoge
cat ~/work/tmp/hoge.gz | base64
Enter fullscreen mode Exit fullscreen mode

ちなみにBase64をデコードする方法は以下の通りです。

echo -n "H4sICAgtzGYAA2hvZ2UAy8hPT+UCAJ2H4rkFAAAA" | base64 -d | zcat
Enter fullscreen mode Exit fullscreen mode

Kinesis DataStream のデータを参照するためのコードは他言語では参考となるものがあるものの、C#のコードがほとんど見あたりませんでした。AWSの公式ドキュメントを参考にして、試行錯誤しながら以下の方法で実装してみました。

using System.Text;
using System.Text.RegularExpressions;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using System.IO;
using System.IO.Compression;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace SyncStatus;

public class Function
{
    Regex reg = new Regex(",(?=(?:[^\']*\'[^\']*\')*[^\']*$)");
    public void FunctionHandler(KinesisEvent kinesisEvent, ILambdaContext context)
    {
        context.Logger.LogInformation($"Beginning to process {kinesisEvent.Records.Count} records...");

        foreach (var record in kinesisEvent.Records)
        {
            context.Logger.LogInformation($"Event ID: {record.EventId}");
            context.Logger.LogInformation($"Event Name: {record.EventName}");

            try
            {
                string recordData = GetRecordContents(record.Kinesis);
                var messages = JObject.Parse(recordData).GetValue("logEvents");
                context.Logger.LogInformation($"Record Data(UTF-8):");
                foreach (var item in messages)
                {
                    var log = item["message"].Value<string>();
                    context.Logger.LogInformation((reg.Split(log))[8].Trim('\'').Replace("\\'", "'"));
                }
            }
            catch (Exception e)
            {
                context.Logger.LogError(e.Message);
            }
        }

        context.Logger.LogInformation("Stream processing complete.");
    }

    private string GetRecordContents(KinesisEvent.Record streamRecord)
    {
        using (var gZipStream = new GZipStream(streamRecord.Data, CompressionMode.Decompress))
        using (var memoryStreamOutput = new MemoryStream()) 
        {
            gZipStream.CopyTo(memoryStreamOutput);
            var outputBytes = memoryStreamOutput.ToArray();

            string decompressed = Encoding.UTF8.GetString(outputBytes);
            return decompressed;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

是非、Kinesis DataStreamを使って楽しいピタゴラスイッチライフを!

Heroku

Build apps, not infrastructure.

Dealing with servers, hardware, and infrastructure can take up your valuable time. Discover the benefits of Heroku, the PaaS of choice for developers since 2007.

Visit Site

Top comments (0)

Best Practices for Running  Container WordPress on AWS (ECS, EFS, RDS, ELB) using CDK cover image

Best Practices for Running Container WordPress on AWS (ECS, EFS, RDS, ELB) using CDK

This post discusses the process of migrating a growing WordPress eShop business to AWS using AWS CDK for an easily scalable, high availability architecture. The detailed structure encompasses several pillars: Compute, Storage, Database, Cache, CDN, DNS, Security, and Backup.

Read full post

👋 Kindness is contagious

Dive into an ocean of knowledge with this thought-provoking post, revered deeply within the supportive DEV Community. Developers of all levels are welcome to join and enhance our collective intelligence.

Saying a simple "thank you" can brighten someone's day. Share your gratitude in the comments below!

On DEV, sharing ideas eases our path and fortifies our community connections. Found this helpful? Sending a quick thanks to the author can be profoundly valued.

Okay