DEV Community

Yasuhiro Matsuda for AWS Community Builders

Posted on • Edited on

3

DBのレコード更新をトリガーにLambdaを実行する(パート1)

複数のサブシステムにデータの同期を行いたい場合に、どのようなアーキテクチャにしますか?

重要なこととして、トランザクションが利用できないためにSQLの発行元は、データの一貫性が保てるように配慮する必要があります。

例えば、SQLの実行をLambdaで行った場合、データベースAとデータベースBにそれぞれ書き込みを行う必要がありますが、複数のレコードを更新する場合にはアプリケーションロジックが複雑になりがちです。

そこで、Trigger an AWS Lambda function from Amazon RDS for MySQL or Amazon RDS for MariaDB using audit logs and Amazon CloudWatchのアーキテクチャを紹介します。

この方法であれば、監査ログを使って実際に書き込みされたことをトリガーにしてLambdaが実行されるので、実装がシンプルで、データの同期の実装を後付けで行うこともできます。上記の例では、RDS for MySQL(もしくはMariaDB)になっていますが、冒頭の説明にもあるとおりAurora MySQLでも可能です。

ただこのアーキテクチャの場合には、CloudWatch Logsのサブスクリプションフィルターを使ってLambdaを実行するため、Lambdaの実行ができない場合が考えられます。Lambda関数の同時実行数の上限に達するなどの問題により実行エラー発生時のリトライを考慮したい場合には、CloudWatch LogsのサブスクリプションフィルターからLambdaを起動するのではなく、Kinesis Data Streamsに書き込み、イベントソースマッピングにて対応することができます。

複数のレコードをバッチにまとめてLambda関数で処理することで同時実行数を抑えたり、Lambda関数実行エラー時も自動でリトライ可能となり、最小限の呼び出しで実行できるようになります。

ここで注意したいのが、Aurora MySQL の監査ログの場合、パラメータグループ(character_set_database)で指定された文字コードではなく、UTF-8形式となっていてCloudWatch LogsからKinesis DataStreamへデータが書き込まれる際に、gzip 形式で圧縮されていることです。

そのため、CloudWatch Logsでは正しく表示されますが、Kinesis Data Streamのデータビュアーでは文字化けして表示されます。また、Lambdaのテストテンプレートにあります kinesis-get-recordsは、「Hello, this is a test 123.」のデータがBase64でエンコードされ、gzip圧縮されていないため、テストする際には以下の方法で事前にデータを準備しておく必要があります。

echo "hoge" > ~/work/tmp/hoge
gzip ~/work/tmp/hoge
cat ~/work/tmp/hoge.gz | base64
Enter fullscreen mode Exit fullscreen mode

ちなみにBase64をデコードする方法は以下の通りです。

echo -n "H4sICAgtzGYAA2hvZ2UAy8hPT+UCAJ2H4rkFAAAA" | base64 -d | zcat
Enter fullscreen mode Exit fullscreen mode

Kinesis DataStream のデータを参照するためのコードは他言語では参考となるものがあるものの、C#のコードがほとんど見あたりませんでした。AWSの公式ドキュメントを参考にして、試行錯誤しながら以下の方法で実装してみました。

using System.Text;
using System.Text.RegularExpressions;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using System.IO;
using System.IO.Compression;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace SyncStatus;

public class Function
{
    Regex reg = new Regex(",(?=(?:[^\']*\'[^\']*\')*[^\']*$)");
    public void FunctionHandler(KinesisEvent kinesisEvent, ILambdaContext context)
    {
        context.Logger.LogInformation($"Beginning to process {kinesisEvent.Records.Count} records...");

        foreach (var record in kinesisEvent.Records)
        {
            context.Logger.LogInformation($"Event ID: {record.EventId}");
            context.Logger.LogInformation($"Event Name: {record.EventName}");

            try
            {
                string recordData = GetRecordContents(record.Kinesis);
                var messages = JObject.Parse(recordData).GetValue("logEvents");
                context.Logger.LogInformation($"Record Data(UTF-8):");
                foreach (var item in messages)
                {
                    var log = item["message"].Value<string>();
                    context.Logger.LogInformation((reg.Split(log))[8].Trim('\'').Replace("\\'", "'"));
                }
            }
            catch (Exception e)
            {
                context.Logger.LogError(e.Message);
            }
        }

        context.Logger.LogInformation("Stream processing complete.");
    }

    private string GetRecordContents(KinesisEvent.Record streamRecord)
    {
        using (var gZipStream = new GZipStream(streamRecord.Data, CompressionMode.Decompress))
        using (var memoryStreamOutput = new MemoryStream()) 
        {
            gZipStream.CopyTo(memoryStreamOutput);
            var outputBytes = memoryStreamOutput.ToArray();

            string decompressed = Encoding.UTF8.GetString(outputBytes);
            return decompressed;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

是非、Kinesis DataStreamを使って楽しいピタゴラスイッチライフを!

Reinvent your career. Join DEV.

It takes one minute and is worth it for your career.

Get started

Top comments (0)

Best Practices for Running  Container WordPress on AWS (ECS, EFS, RDS, ELB) using CDK cover image

Best Practices for Running Container WordPress on AWS (ECS, EFS, RDS, ELB) using CDK

This post discusses the process of migrating a growing WordPress eShop business to AWS using AWS CDK for an easily scalable, high availability architecture. The detailed structure encompasses several pillars: Compute, Storage, Database, Cache, CDN, DNS, Security, and Backup.

Read full post

👋 Kindness is contagious

Immerse yourself in a wealth of knowledge with this piece, supported by the inclusive DEV Community—every developer, no matter where they are in their journey, is invited to contribute to our collective wisdom.

A simple “thank you” goes a long way—express your gratitude below in the comments!

Gathering insights enriches our journey on DEV and fortifies our community ties. Did you find this article valuable? Taking a moment to thank the author can have a significant impact.

Okay