
Robert Slootjes

QLDB to EventBridge using Pipes

Last year I did a project with AWS Quantum Ledger Database, or simply QLDB. QLDB is a fully managed ledger database that keeps track of every change through a journal. This makes it possible to see the complete history of inserts, updates and deletes, but also of the queries fired at it. As a database it has its limitations, but it is a very nice product when a reliable audit trail is crucial.

For this project I was looking for a reliable way to act on changes in the database using EventBridge, without the application publishing the events itself. I prefer to do as much as possible asynchronously, because if the database record has already changed and the application then fails to publish the event, I lose valuable information. The good news is that QLDB supports Streams, which send their data to a Kinesis Data Stream, so you don't have to worry about losing any data. It sounds pretty easy, and it kind of is, but the bad news is that the records from QLDB are in the Amazon Ion format and, on top of that, Kinesis also aggregates the records.

An event from QLDB in a Kinesis Data Stream looks something like this:

[
  {
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000000:49636282114612585653977882531504014001922575972444405762",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456:role/qldbpipe-ledger-stream-pipe-role",
    "awsRegion": "eu-west-1",
    "eventSourceARN": "arn:aws:kinesis:eu-west-1:123456:stream/qldbpipe-dev-ledger",
    "kinesisSchemaVersion": "1.0",
    "partitionKey": "aa2f52e2b20be04ac9e38683dff18447fecf9e89a62bf5744428e6b650138f37",
    "sequenceNumber": "49636282114612585653977882531504014001922575972444405762",
    "data": "4RVR6l4S5ZXU3xYwy74S24ZosXIzL3IpQNWkHOAlzeAcP29pQWI5tXNYtXc5sX9yQZozsX9arFWbQYAct3FZt3IpPN5bJNJBt2MouNMlP2MFs4ZFtdWlt2WauXcmsbcbaf5zsX9arZIgsNMquXWktZczsX9arFyyt2zCQN5FtdcctFyyt2zFbOSpQOQgs3MqHdomP2kZPOEfaf9cseIprNMqJXWqrVogt3JFa3IpPN5qPNEFrN9lJN5ds4gquXWFyNZcseIqzOEFPOIcsNMluZcquXWpuWIgsNNFa3EFPOIcsNM9uVIgQ2MquE4UnPhFqdWpsagyu3D68NobPagcuJZ3QOEFCKV6FUbpDab3FKLpDUtoFeEFtdMysJ9osXIztXcnQJZbQOPmDYuQKZtnDKMmNOPqEWgkIMckJbgXDPlEHboGHFkwLZM3LWJNPqvRk2E3g2FagQWJ3uVuOW6rZQKJKAArOMWPaSLsYRnapZTW51FcbgZud9nrdHpJL9ADZQZEWEZrcANQqrIrfRG5fpLa5FxnpTJihThCZCzjxmxJjeayfGw8PIY/j+vzrPi9OIVBFr2LSFGE5FlfWdZUhLOZX34TgjOBGTyb7vGCYOOPeF/3Fck2ehCuuUucB6xd7uSYSN/v92EwhOF4pQZZNDOKmy3ZIlFEjlGLUpSdK2MmjNlfEdbg//sUl838Ny84FwbqcrgmIEolPaJ69vF7n+MiJ6ZfB6xuU2SGVGaxanFqZBQGHLRs9ZSxIVH7Ymd2ZzsEHqbwllN3lJOmlYv35zFiWEWKVMUMTRhZVQJKFFxrN5ds3AkPOIgs25wt2EfQNZyCeMqQOAwuXWzsX7qdNlRU+rDcZ+Ke8DUn5hlfC67Zfb4/laSZmVAdyY5sSZY+BuFxQFoFIBRh4f4Jmh3",
    "approximateArrivalTimestamp": 1671549572.187
  }
]

Great, now what? If this were published to EventBridge as-is, adding filter rules to this data wouldn't really be possible...
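To see why, try base64-decoding the data field of one of those records: what comes out is a KPL-aggregated blob wrapping binary Ion, not something a filter rule can match on. A minimal sketch, where record is a hypothetical variable holding one entry of the batch above:

import base64

# "record" is one entry of the Kinesis batch shown above (hypothetical name)
raw = base64.b64decode(record["data"])

# The result is a KPL aggregation header wrapping binary Ion documents,
# so there is nothing here for an EventBridge filter rule to match on.
print(raw[:20])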

What I did a year ago was have a Lambda function triggered by the Kinesis Data Stream that deaggregated the records, converted the Ion into JSON and then published the result to EventBridge. This worked perfectly fine, but when EventBridge Pipes launched I wanted to see if I could reduce the amount of code needed to achieve the same result.

EventBridge Pipes

In this scenario a Kinesis Data Stream is the source and an EventBridge bus is the target. The only glue required is a Lambda that takes care of the enrichment, which in my case means deaggregating the records and converting them from Ion to JSON.

As I strongly believe you shouldn't be clicking things in the console but should use CloudFormation instead, I created a fully working proof of concept using the Serverless Framework, which you can find on my GitHub. In this proof of concept I'm just logging the events to S3, but you can of course tweak it to suit your needs. Also, if anyone has a better way of converting Ion to JSON, please share it, as I went for quick and dirty (it is very effective, however :-).

Data flow
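To give an idea of the shape of that enrichment glue, here is a rough Python sketch of such a handler. It assumes the aws-kinesis-agg and amazon.ion libraries and the record shape shown earlier; the proof of concept in the repo is the reference, so treat this as an illustration rather than a drop-in:

import base64
import json
from datetime import datetime
from decimal import Decimal

from amazon.ion import simpleion
from aws_kinesis_agg.deaggregator import deaggregate_records


def _json_safe(value):
    # Quick-and-dirty fallback for Ion types json.dumps can't handle natively
    # (blobs, timestamps, decimals); adjust to taste.
    if isinstance(value, (bytes, bytearray)):
        return base64.b64encode(bytes(value)).decode("ascii")
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, Decimal):
        return float(value)
    return str(value)


def handler(event, context):
    # The Pipe invokes the enrichment with a batch of Kinesis records
    # (the flat shape shown earlier). aws-kinesis-agg expects Lambda-style
    # records, so wrap each one under a "kinesis" key first (assumption).
    wrapped = [{"kinesis": record} for record in event]

    enriched = []
    for record in deaggregate_records(wrapped):
        # Each deaggregated record still carries base64-encoded binary Ion.
        ion_value = simpleion.loads(base64.b64decode(record["kinesis"]["data"]))

        # Round-trip through json to strip the Ion-specific types; whatever
        # is returned here is what the Pipe forwards to the target.
        enriched.append(json.loads(json.dumps(ion_value, default=_json_safe)))

    return enriched

Each decoded record in the returned array then ends up as its own event on the bus, like the example below.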

With the proof of concept, a SELECT query on QLDB would look something like this when published to EventBridge:

{
  "version": "0",
  "id": "4dad5941-44b2-d3cc-4373-5763ebec04cb",
  "detail-type": "Event from aws:kinesis",
  "source": "Pipe qldbpipe-dev-ledger",
  "account": "123456",
  "time": "2023-03-04T09:41:35Z",
  "region": "eu-west-1",
  "resources": [],
  "detail": {
    "qldbStreamArn": "arn:aws:qldb:eu-west-1:123456:stream/qldbpipe-dev/0wYOW015oYv34ZmEYmJJF2",
    "recordType": "BLOCK_SUMMARY",
    "payload": {
      "blockAddress": {
        "strandId": "EKwDuqzkVSI2IiuEb0Tlp1",
        "sequenceNo": 653
      },
      "transactionId": "EFcCQMkOEPXDj5aVyogJbj",
      "blockTimestamp": "2023-03-04T09:41:35.292Z",
      "blockHash": "Z6gKNWpJ+kmeHLVmr2ZjT05ca4XhdbL6YU6Y5phGcEY=",
      "entriesHash": "AhDroVPQiFYltthOkVoGlwONAmG+tIBrvlzHVColmO7=",
      "previousBlockHash": "rGsA9+leZON3iZGRqW8PB2CaWVc0Lminy1DOSrbpKZ7=",
      "entriesHashList": ["AhCD3Slvf3F+wCazKP12zK0upXJ44/weIsfd7ceaFvI=", "", "a60i/33Xewkp2koI7+06NkuAt8v5qRGpKjCBVqdfe17="],
      "transactionInfo": {
        "statements": [{
          "statement": "SELECT * FROM tickets",
          "startTime": "2023-03-04T09:41:35.259Z",
          "statementDigest": "ux//sy9PGxtDcFw6VzZkzxssE74KCOA3vCULzGs/vCc="
        }]
      }
    }
  }
}

Of course this is still very verbose, but at least I can now add filter rules for it in EventBridge. Optionally you could also adjust the Lambda code to filter records or strip information you don't need; for instance, you might only be interested in inserts and updates.
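For example, to keep only document revisions (which is where inserts and updates show up) and drop the block summaries and control records, the enrichment handler from the sketch above could filter just before returning:

# Inside the enrichment handler, just before "return enriched":
# keep only document revisions, drop BLOCK_SUMMARY and CONTROL records.
enriched = [r for r in enriched if r.get("recordType") == "REVISION_DETAILS"]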

Overall I'm quite happy with the end result, as the only code I need to maintain now is a very simple transformer. What bothers me, though, is that it currently doesn't seem possible to override the source and detail-type; these are set by the Pipe itself:

"detail-type": "Event from aws:kinesis",
"source": "Pipe qldbpipe-dev-ledger",

I think it would be a very nice feature to be able to set the source to, for instance, "qldb" and the detail type to something like "select" or "insert". We could change the target of the pipe to an SQS queue and attach a Lambda that manually publishes the records to EventBridge with this configuration, but that would add more glue code, which is exactly what I tried to avoid with this proof of concept. At this moment I'm not yet sure what I will use in production in my next project, but this certainly gave me some good insights into the possibilities of EventBridge Pipes. Let me know in the comments what you would prefer.
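For reference, that extra glue Lambda would boil down to little more than a PutEvents call. A sketch, where the bus name and the recordType-to-detail-type mapping are assumptions:

import json

import boto3

events = boto3.client("events")


def handler(event, context):
    # "event" is the SQS batch; each message body is one decoded QLDB record.
    entries = []
    for message in event["Records"]:
        record = json.loads(message["body"])
        entries.append({
            "Source": "qldb",  # the source we actually want
            "DetailType": record.get("recordType", "UNKNOWN").lower(),
            "Detail": json.dumps(record),
            "EventBusName": "default",  # assumption: replace with your bus
        })

    # PutEvents accepts at most 10 entries per call, which matches the
    # default SQS batch size; chunk the list if you use larger batches.
    if entries:
        events.put_events(Entries=entries)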


To close off, I'm excited and proud to announce that I was recently selected as a Community Builder by AWS. Follow me so you don't miss any upcoming articles.

AWS Community Builder
