<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Tasio Guevara</title>
    <description>The latest articles on DEV Community by Tasio Guevara (@tasio).</description>
    <link>https://dev.to/tasio</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F645259%2Fab045c33-2629-49ba-880a-fed019991cdc.jpeg</url>
      <title>DEV Community: Tasio Guevara</title>
      <link>https://dev.to/tasio</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/tasio"/>
    <language>en</language>
    <item>
      <title>Unlock innovation using data from your on-premises databases (III) - Event-driven apps</title>
      <dc:creator>Tasio Guevara</dc:creator>
      <pubDate>Mon, 09 May 2022 16:09:46 +0000</pubDate>
      <link>https://dev.to/tasio/unlock-innovation-using-data-from-your-on-premises-databases-iii-event-driven-apps-46m3</link>
      <guid>https://dev.to/tasio/unlock-innovation-using-data-from-your-on-premises-databases-iii-event-driven-apps-46m3</guid>
      <description>&lt;p&gt;In &lt;a href="https://dev.to/tasio/unlock-innovation-using-data-from-your-on-premises-databases-ii-streaming-state-changes-2948"&gt;my previous blog post&lt;/a&gt;, I described how to capture data changes in a relational database and push them as records in a Kinesis data stream.&lt;/p&gt;

&lt;p&gt;In this new post, I will describe how to convert those database operations into business events of potential interest in near real-time, so other applications can react to them.&lt;/p&gt;

&lt;p&gt;With the solution deployed in the previous post, I have at my disposal a steady stream of all the changes committed to the tables of interest in the source database. Now it's a matter of consuming those changes and turning them into a more actionable construct: &lt;strong&gt;business events&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;For that, I am going to use two AWS Lambda functions and the Lambda &lt;a href="https://aws.amazon.com/blogs/compute/filtering-event-sources-for-aws-lambda-functions/" rel="noopener noreferrer"&gt;capability to filter event sources&lt;/a&gt; from &lt;a href="https://aws.amazon.com/kinesis/data-streams/" rel="noopener noreferrer"&gt;Amazon Kinesis Data Streams&lt;/a&gt;. This filtering will help me reduce the number of invocations, which lowers cost and potentially allows my application to scale more easily.&lt;/p&gt;

&lt;p&gt;When the Lambda functions receive new change data capture (CDC) records, they will, based on some business logic, publish events into the default Amazon EventBridge event bus. The idea behind these functions is to convert database operations into "business events" that carry more business context in their definition and data. Once those events are published in EventBridge, you can use the rich variety of supported EventBridge targets to build event-driven applications that react to them.&lt;/p&gt;
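
&lt;p&gt;For reference, a CDC record as delivered to the stream by the DMS task from the previous post looks roughly like the following (the values are illustrative and some fields are omitted; for updates, the record also carries a &lt;code&gt;before&lt;/code&gt; image with the previous column values, which the functions below rely on):&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "data": {
    "ProductReviewID": 5,
    "ProductID": 709,
    "Rating": 2
  },
  "before": {
    "Rating": 4
  },
  "metadata": {
    "timestamp": "2022-05-09T12:34:56.789Z",
    "record-type": "data",
    "operation": "update",
    "schema-name": "Production",
    "table-name": "ProductReview"
  }
}
&lt;/code&gt;&lt;/pre&gt;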

&lt;p&gt;For this example, I will publish two business events:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;NegativeProductReviewReceived&lt;/strong&gt;, issued when a product receives a bad review (a rating below 4 stars). A sample of this event, as delivered to EventBridge targets, is shown after this list.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;ProductStockOverbooked&lt;/strong&gt;, issued when a transaction is filed with a quantity of a product above the available quantity in stock. Note that this is just to illustrate the use case; depending on the real business logic, a system might not allow an order with quantities above the existing stock.&lt;/li&gt;
&lt;/ul&gt;
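
&lt;p&gt;To make that concrete, this is roughly what a &lt;strong&gt;NegativeProductReviewReceived&lt;/strong&gt; event could look like once EventBridge delivers it to a target (the values are illustrative; the &lt;code&gt;detail&lt;/code&gt; payload is built by the function code shown later in this post):&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "version": "0",
  "detail-type": "NegativeProductReviewReceived",
  "source": "com.example.cdc2event",
  "time": "2022-05-09T12:34:56Z",
  "detail": {
    "data": {
      "ProductReviewID": 5,
      "ProductID": 709,
      "Rating": 2
    },
    "metadata": {
      "operation": "update",
      "schema-name": "Production",
      "table-name": "ProductReview"
    }
  }
}
&lt;/code&gt;&lt;/pre&gt;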

&lt;p&gt;As mentioned above, I'll use two separate Lambda functions to process records from the two source tables involved. I could use the same function with two filters, but I think this split makes more sense architecturally, because it's likely that two separate teams would own those business areas.&lt;/p&gt;

&lt;p&gt;The next sections describe the logic for each function.&lt;/p&gt;

&lt;h2&gt;
  Lambda function that processes changes on the ProductReview table
&lt;/h2&gt;

&lt;p&gt;This function will publish the &lt;strong&gt;NegativeProductReviewReceived&lt;/strong&gt; event. It will be triggered every time there is an update or insert operation on the &lt;strong&gt;Production.ProductReview&lt;/strong&gt; table with a rating below 4. Although the event filter will be configured to check for those specific conditions, I will also include that logic in the function code. This way, if I want to use this function to generate other events, I can simply loosen the triggering conditions and add the new logic in the function code. Note that, since the logic already lives in the function, I could configure a more permissive filter right away; keeping the filter strict, however, avoids unnecessary function invocations and showcases how powerful the Lambda event filtering feature is.&lt;/p&gt;
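
&lt;p&gt;As a sketch, the filter criteria for this function's Kinesis event source mapping could look like the pattern below. Lambda applies the pattern to the Base64-decoded, JSON-parsed payload of each record, which is addressed through the top-level &lt;code&gt;data&lt;/code&gt; key; the nested field names assume the DMS record layout shown earlier.&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "data": {
    "metadata": {
      "table-name": ["ProductReview"],
      "operation": ["insert", "update"]
    },
    "data": {
      "Rating": [{ "numeric": ["&amp;lt;", 4] }]
    }
  }
}
&lt;/code&gt;&lt;/pre&gt;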

&lt;p&gt;Some additional logic in the function handles the special case of an update: the function checks that the previous rating was 4 or above, so it doesn't issue a second event for a review that was already negative.&lt;/p&gt;

&lt;h2&gt;
  Lambda function that processes changes on the TransactionHistory table
&lt;/h2&gt;

&lt;p&gt;This function will publish the &lt;strong&gt;ProductStockOverbooked&lt;/strong&gt; event. It will be triggered every time there is an update or insert operation on the &lt;strong&gt;Production.TransactionHistory&lt;/strong&gt; table. It will query the source database for the existing stock of the product in the transaction and, if the stock is lower than the quantity in the transaction, it will issue the &lt;strong&gt;ProductStockOverbooked&lt;/strong&gt; event.&lt;/p&gt;

&lt;h2&gt;
  The architecture
&lt;/h2&gt;

&lt;p&gt;This is the architecture for this solution. The most notable characteristic is that the Lambda function that processes the TransactionHistory table is configured to access resources in the same VPC where the AWS DMS replication instance sits. This provides connectivity to the source database so the function can query the data it needs for its business logic. Because the function sits in that VPC but still needs to reach EventBridge to publish events, I will also deploy a VPC endpoint for that service.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fnrxnoe9lzuv64oygt11m.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fnrxnoe9lzuv64oygt11m.png" alt="Solution architecture" width="800" height="328"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  Deploying the solution
&lt;/h2&gt;

&lt;p&gt;To make it easier to package and deploy the Lambda functions and all the necessary resources (e.g.: IAM roles for the functions), I will use a very convenient tool when working with serverless applications: the &lt;a href="https://aws.amazon.com/serverless/sam/" rel="noopener noreferrer"&gt;AWS Serverless Application Model (SAM)&lt;/a&gt;. SAM uses templates similar to those of AWS CloudFormation, but it takes care of downloading and packaging all your function's dependencies into a &lt;strong&gt;.zip&lt;/strong&gt; file or a &lt;strong&gt;Docker&lt;/strong&gt; container, depending on the package type you choose. Furthermore, if you have runtime-specific dependencies (e.g.: Python packages that use C/C++ extensions), you can easily install a Lambda-compatible package by using Docker locally and the &lt;code&gt;--use-container&lt;/code&gt; flag when building the Lambda package. Finally, it uploads the function package or container to Amazon S3 or Amazon ECR, respectively, so you can deploy them using CloudFormation.&lt;/p&gt;
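
&lt;p&gt;For reference, once the project files below are in place, building and deploying typically boils down to two commands (the &lt;code&gt;--use-container&lt;/code&gt; flag is the one that builds the dependencies inside a Lambda-compatible Docker container):&lt;/p&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sam build --use-container
sam deploy --guided
&lt;/code&gt;&lt;/pre&gt;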

&lt;p&gt;If you want to follow the next steps, you need to &lt;a href="https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html" rel="noopener noreferrer"&gt;install the AWS SAM CLI&lt;/a&gt;. SAM provides project templates to initialize different kinds of sample serverless applications. I will skip that part, but I encourage you to check the &lt;a href="https://github.com/aws/aws-sam-cli-app-templates" rel="noopener noreferrer"&gt;built-in project templates&lt;/a&gt; or create your own using &lt;a href="https://cookiecutter.readthedocs.io/en/latest/README.html" rel="noopener noreferrer"&gt;Cookiecutter&lt;/a&gt;.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;Set up the following project directory and file structure under a folder of your choice (e.g.: cdc2event)&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;cdc2event
├── reviews
│   ├── app.py
│   └── requirements.txt
├── transactions
│   ├── app.py
│   └── requirements.txt
└── template.yaml
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Edit the &lt;strong&gt;app.py&lt;/strong&gt; file under the &lt;strong&gt;reviews&lt;/strong&gt; directory and paste the following code. You can read details about what the code does in the comments.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;base64&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;dateutil.parser&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;parse&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;boto3&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;

&lt;span class="n"&gt;logger&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getLogger&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;cdc2eventLogger&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;setLevel&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;level&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DEBUG&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;event_bridge&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;boto3&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;client&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;events&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;lambda_handler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    This function processes CDC records from inserted and updated product reviews that have a rating below 4.
    For each of the records, it will publish an event in Amazon EventBridge.
    If the record has been updated, it will only publish an event if the previous value was above or equal to 4.
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;events&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt; &lt;span class="c1"&gt;# This list will containt the events to publish on the EventBridge event bus.
&lt;/span&gt;
    &lt;span class="c1"&gt;# An event can contain several Kinesis records.
&lt;/span&gt;    &lt;span class="c1"&gt;# Each Kinesis record contains data encoded in Base64, so it has to be decoded to a string.
&lt;/span&gt;    &lt;span class="c1"&gt;# The replication task was configured to deliver JSON objects, so these have
&lt;/span&gt;    &lt;span class="c1"&gt;# to be parsed into Python objects using the json package.
&lt;/span&gt;    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Records&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]:&lt;/span&gt;
        &lt;span class="n"&gt;payload&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;base64&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;b64decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;kinesis&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
        &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;loads&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Processing data: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="c1"&gt;# If the record comes from a database update operation,
&lt;/span&gt;        &lt;span class="c1"&gt;# check whether the rating value was above or equal to 4.
&lt;/span&gt;        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;metadata&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;operation&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;update&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt; &lt;span class="ow"&gt;and&lt;/span&gt; &lt;span class="nf"&gt;int&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;before&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Rating&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="mi"&gt;4&lt;/span&gt; &lt;span class="ow"&gt;or&lt;/span&gt; \
            &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;metadata&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;operation&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;insert&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt; &lt;span class="ow"&gt;and&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Rating&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="c1"&gt;# Create an EventBridge event and add it to the list
&lt;/span&gt;            &lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="nf"&gt;create_event&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;com.example.cdc2event&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Ignoring event: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Publish the events in the default EventBridge bus
&lt;/span&gt;    &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Publishing to EventBridge: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;event_bridge&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;put_events&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;Entries&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;events&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;create_event&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    This function returns an EventBridge event based on the information within a record
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="c1"&gt;# The detail of the event is a dictionary with any arbitrary structure.
&lt;/span&gt;    &lt;span class="c1"&gt;# In this case it contains all the information from the data and metadata items
&lt;/span&gt;    &lt;span class="c1"&gt;# provided by DMS, as it may be relevant for applications downstream.
&lt;/span&gt;    &lt;span class="c1"&gt;# Depending on the use case, removing certain fields might be needed for regulatory purposes.
&lt;/span&gt;    &lt;span class="n"&gt;event&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Time&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nf"&gt;parse&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;metadata&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;timestamp&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]),&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Source&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;DetailType&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;NegativeProductReviewReceived&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Detail&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;metadata&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;metadata&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
        &lt;span class="p"&gt;})&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;
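
&lt;p&gt;One caveat on the publishing step: the &lt;code&gt;PutEvents&lt;/code&gt; API accepts at most 10 entries per call. A single invocation rarely yields more events than that in this example, but a defensive version of the publish step could split the list into batches, as in this sketch:&lt;/p&gt;

&lt;pre class="highlight python"&gt;&lt;code&gt;# Hypothetical batching of the publish step; PutEvents takes up to 10 entries per call.
for i in range(0, len(events), 10):
    event_bridge.put_events(Entries=events[i:i + 10])
&lt;/code&gt;&lt;/pre&gt;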

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;That piece of code uses an external Python package, so you need to include it in the Lambda package. For that, edit the &lt;strong&gt;requirements.txt&lt;/strong&gt; file under the &lt;strong&gt;reviews&lt;/strong&gt; directory and paste the following text.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;python-dateutil
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Edit the &lt;strong&gt;app.py&lt;/strong&gt; file under the &lt;strong&gt;transactions&lt;/strong&gt; directory and paste the following code. Check the comments for further insights into it.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;base64&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;dateutil.parser&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;parse&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;boto3&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;pymssql&lt;/span&gt;

&lt;span class="n"&gt;logger&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getLogger&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;cdc2eventLogger&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;setLevel&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;level&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DEBUG&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;eventbridge&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;boto3&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;client&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;events&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;secretsmanager&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;boto3&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;client&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;secretsmanager&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;connection_details&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;loads&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;secretsmanager&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get_secret_value&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;SecretId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;environ&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;DB_SECRET_ARN&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;])[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;SecretString&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;

&lt;span class="n"&gt;database&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;environ&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;DB_NAME&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;

&lt;span class="n"&gt;MAX_DB_READ_ATTEMPTS&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;connect_to_database&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="k"&gt;global&lt;/span&gt; &lt;span class="n"&gt;db_connection&lt;/span&gt;
    &lt;span class="n"&gt;db_connection&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;

    &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;db_connection&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pymssql&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;host&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;connection_details&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;host&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="n"&gt;port&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;connection_details&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;port&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="n"&gt;user&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;connection_details&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;username&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="n"&gt;password&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;connection_details&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;password&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="n"&gt;database&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;database&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="n"&gt;pymssql&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;InterfaceError&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Error connecting to the database, please check the host name.&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="s"&gt; &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="n"&gt;pymssql&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DatabaseError&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Error connecting to the database, check the credentials. &lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="s"&gt; &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;db_connection&lt;/span&gt;

&lt;span class="c1"&gt;# Initialize the connection in the global scope.
# This way, the connection will be reused across function executions.
&lt;/span&gt;&lt;span class="n"&gt;db_connection&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;connect_to_database&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;lambda_handler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    This function processes CDC records from inserted and updated product reviews that have a rating below 4.
    For each of the records, it will publish an event in Amazon EventBridge.
    If the record has been updated, it will only publish an event if the previous value was above or equal to 4.
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;events&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt; &lt;span class="c1"&gt;# This list will containt the events to publish on the EventBridge event bus.
&lt;/span&gt;
    &lt;span class="c1"&gt;# An event can contain several Kinesis records.
&lt;/span&gt;    &lt;span class="c1"&gt;# Each Kinesis record contains data encoded in Base64, so it has to be decoded to a string.
&lt;/span&gt;    &lt;span class="c1"&gt;# The replication task was configured to deliver JSON objects, so these have
&lt;/span&gt;    &lt;span class="c1"&gt;# to be parsed into Python objects using the json package.
&lt;/span&gt;    &lt;span class="n"&gt;records&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;decode_kinesis_record&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Records&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;

    &lt;span class="c1"&gt;# Filter out the records where the quantity has not changed to avoid duplicate events.
&lt;/span&gt;    &lt;span class="n"&gt;records&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;list&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;filter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Quantity&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="nf"&gt;int&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;before&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;{}).&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Quantity&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;-1&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)),&lt;/span&gt;
        &lt;span class="n"&gt;records&lt;/span&gt;
    &lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Processing changed records: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;No records to process.&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt;
    &lt;span class="c1"&gt;# Extract all the product identifiers in all the records to be processed.
&lt;/span&gt;    &lt;span class="c1"&gt;# This way, there is only one round-trip to the database
&lt;/span&gt;    &lt;span class="n"&gt;product_ids&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ProductID&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="n"&gt;stock&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;get_stock&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;product_ids&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Check that the inventory read was successful
&lt;/span&gt;    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;stock&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="c1"&gt;# Create an event for every order that has a quantity above the aggregated stock in inventory
&lt;/span&gt;        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;product_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ProductID&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Quantity&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;stock&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;product_id&lt;/span&gt;&lt;span class="p"&gt;)]:&lt;/span&gt;
                &lt;span class="c1"&gt;# Add the stock quantity to the business event
&lt;/span&gt;                &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;StockQuantity&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;stock&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;product_id&lt;/span&gt;&lt;span class="p"&gt;)]&lt;/span&gt;
                &lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                    &lt;span class="nf"&gt;create_event&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;com.example.cdc2event&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Ignoring event: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Publish the events in the default EventBridge bus
&lt;/span&gt;    &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Publishing to EventBridge: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;eventbridge&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;put_events&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;Entries&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;events&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;decode_kinesis_record&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    Decodes a CDC record and returns a python object
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;payload&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;base64&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;b64decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;kinesis&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
    &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;loads&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_stock&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;product_ids&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;attempt&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    Reads the aggregated stock in inventory of the products provided in the input parameter.
    Returns a dictionary where each key is the product id and the value is the quantity in stock.
    If the connection to the database is not successful, it will retry up to MAX_DB_READ_ATTEMPTS times.
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="k"&gt;global&lt;/span&gt; &lt;span class="n"&gt;db_connection&lt;/span&gt;
    &lt;span class="n"&gt;stock&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;db_connection&lt;/span&gt; &lt;span class="ow"&gt;and&lt;/span&gt; &lt;span class="n"&gt;attempt&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;MAX_DB_READ_ATTEMPTS&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;cursor&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;db_connection&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;cursor&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;ids&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;,&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;join&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;product_ids&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

        &lt;span class="n"&gt;query&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;SELECT ProductId, SUM([Quantity])&lt;/span&gt;&lt;span class="se"&gt;\
&lt;/span&gt;&lt;span class="s"&gt;                    FROM [Production].[ProductInventory]&lt;/span&gt;&lt;span class="se"&gt;\
&lt;/span&gt;&lt;span class="s"&gt;                    WHERE [ProductId] IN (&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;ids&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;)&lt;/span&gt;&lt;span class="se"&gt;\
&lt;/span&gt;&lt;span class="s"&gt;                    GROUP BY [ProductId]&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Executing query: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;cursor&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;row&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;cursor&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;fetchone&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="n"&gt;row&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;stock&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;row&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;])]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;row&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ProductId = &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;row&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;, Inventory Quantity = &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;row&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;row&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;cursor&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;fetchone&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;elif&lt;/span&gt; &lt;span class="n"&gt;attempt&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;MAX_DB_READ_ATTEMPTS&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Attempting to reconnect to the database...&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;db_connection&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;connect_to_database&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="nf"&gt;get_stock&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;product_ids&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;product_ids&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;attempt&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;attempt&lt;/span&gt;&lt;span class="o"&gt;+&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;stock&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;create_event&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    This function returns an EventBridge event based on the information within a record
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="c1"&gt;# The detail of the event is a dictionary with any arbitrary structure.
&lt;/span&gt;    &lt;span class="c1"&gt;# In this case it contains all the information from the data and metadata items
&lt;/span&gt;    &lt;span class="c1"&gt;# provided by DMS, as it may be relevant for applications downstream.
&lt;/span&gt;    &lt;span class="c1"&gt;# Depending on the use case, removing certain fields might be needed for regulatory purposes.
&lt;/span&gt;    &lt;span class="n"&gt;event&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Time&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nf"&gt;parse&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;metadata&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;timestamp&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]),&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Source&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;source&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;DetailType&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;ProductStockOverbooked&lt;/span&gt;&lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Detail&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;metadata&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;metadata&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
        &lt;span class="p"&gt;})&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;
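
&lt;p&gt;A side note on the stock query: since the product identifiers come straight from the CDC payload, a more defensive version of &lt;code&gt;get_stock&lt;/code&gt; would let &lt;strong&gt;pymssql&lt;/strong&gt; substitute the values instead of interpolating them into an f-string. A minimal sketch of that variant:&lt;/p&gt;

&lt;pre class="highlight python"&gt;&lt;code&gt;# Sketch: the same query using pymssql parameter substitution.
placeholders = ",".join(["%s"] * len(product_ids))
query = ("SELECT ProductId, SUM([Quantity]) "
         "FROM [Production].[ProductInventory] "
         f"WHERE [ProductId] IN ({placeholders}) "
         "GROUP BY [ProductId]")
cursor.execute(query, tuple(product_ids))
&lt;/code&gt;&lt;/pre&gt;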

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;This piece of code also uses external packages, so similarly, edit the &lt;strong&gt;requirements.txt&lt;/strong&gt; file under the &lt;strong&gt;transactions&lt;/strong&gt; directory and paste the following text. Note that &lt;strong&gt;pymssql&lt;/strong&gt; uses native extensions, so the SAM CLI feature to build inside a Lambda-compatible container will come in handy.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;pymssql&amp;gt;=2.2.3
python-dateutil
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;At this point, fill in the &lt;strong&gt;template.yaml&lt;/strong&gt; file with the following SAM template. I'll highlight a couple of points that I think are of interest:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Each Lambda function uses a different architecture: arm64, which runs on AWS Graviton processors and offers very good price-performance, and x86_64. The reason for the latter is that, at the time of writing, there is no ARM-compatible build of the &lt;strong&gt;pymssql&lt;/strong&gt; package.&lt;/li&gt;
&lt;li&gt;Note that there are no explicit IAM roles defined in the template. SAM will create a role with basic permissions for Lambda functions if you don't specify an explicit one. The nice thing is that you can also add other policies to that implicit role: you can use SAM policy templates, AWS managed policies, customer managed policies, or define your own inline. In this case, I opted for two SAM policy templates, &lt;strong&gt;EventBridgePutEventsPolicy&lt;/strong&gt; and &lt;strong&gt;AWSSecretsManagerGetSecretValuePolicy&lt;/strong&gt;, which make it straightforward to grant those self-described permissions to the functions.
&lt;/li&gt;
&lt;/ul&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;AWSTemplateFormatVersion&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;2010-09-09&lt;/span&gt;
&lt;span class="na"&gt;Transform&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;AWS::Serverless-2016-10-31&lt;/span&gt;
&lt;span class="na"&gt;Description&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;SAM Template for resources in the III unlocking innovation blog post&lt;/span&gt;

&lt;span class="c1"&gt;# Sets the timeout for all the functions in the application&lt;/span&gt;
&lt;span class="na"&gt;Globals&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="na"&gt;Function&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;Timeout&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;10&lt;/span&gt;

&lt;span class="na"&gt;Parameters&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="na"&gt;SourceStreamName&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;String&lt;/span&gt;
    &lt;span class="na"&gt;Default&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;mssql-cdc&lt;/span&gt;
    &lt;span class="na"&gt;Description&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;ARN of the Kinesis data stream to use as source&lt;/span&gt;
&lt;span class="na"&gt;VpcId&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;AWS::EC2::VPC::Id&lt;/span&gt;
    &lt;span class="na"&gt;Description&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;Id of the VPC that has connectivity to the source database. A VPC Endpoint for EventBridge will be deployed there.&lt;/span&gt;
&lt;span class="na"&gt;SubnetIds&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;List&amp;lt;AWS::EC2::Subnet::Id&amp;gt;&lt;/span&gt;
    &lt;span class="na"&gt;Description&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;List of SubnetIds that have connectivity to the source database. A VPC Endpoint for EventBridge will deploy an ENI on each subnet.&lt;/span&gt;
&lt;span class="na"&gt;VpcEndpointSecurityGroupIds&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;List&amp;lt;AWS::EC2::SecurityGroup::Id&amp;gt;&lt;/span&gt;
    &lt;span class="na"&gt;Description&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;List of SecurityGroupIds for the VPC Endpoint for EventBridge.&lt;/span&gt;
&lt;span class="na"&gt;SecurityGroupIds&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;List&amp;lt;AWS::EC2::SecurityGroup::Id&amp;gt;&lt;/span&gt;
    &lt;span class="na"&gt;Description&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;List of SecurityGroupIds that allow connecting to the source database&lt;/span&gt;
&lt;span class="na"&gt;DatabaseConnectionSecretArn&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;String&lt;/span&gt;
    &lt;span class="na"&gt;Description&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;ARN of the AWS SecretsManager secret containing connection details for the source database&lt;/span&gt;
&lt;span class="na"&gt;DatabaseName&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;String&lt;/span&gt;
    &lt;span class="na"&gt;Default&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;AdventureWorks2019&lt;/span&gt;
    &lt;span class="na"&gt;Description&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;Name of the source database from where to read the current inventory&lt;/span&gt;

&lt;span class="na"&gt;Resources&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="c1"&gt;# This VPC endpoint is needed because the TransactionHistoryProcessingFunction will have ENIs deployed in private subnets&lt;/span&gt;
&lt;span class="na"&gt;EventBridgeVPCEndpoint&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;AWS::EC2::VPCEndpoint&lt;/span&gt;
    &lt;span class="na"&gt;Properties&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;PrivateDnsEnabled&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;
    &lt;span class="na"&gt;SecurityGroupIds&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;!Ref&lt;/span&gt; &lt;span class="s"&gt;VpcEndpointSecurityGroupIds&lt;/span&gt;
    &lt;span class="na"&gt;ServiceName&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;!Sub&lt;/span&gt; &lt;span class="s"&gt;com.amazonaws.${AWS::Region}.events&lt;/span&gt;
    &lt;span class="na"&gt;SubnetIds&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;!Ref&lt;/span&gt; &lt;span class="s"&gt;SubnetIds&lt;/span&gt;
    &lt;span class="na"&gt;VpcEndpointType&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;Interface&lt;/span&gt;
    &lt;span class="na"&gt;VpcId&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;!Ref&lt;/span&gt; &lt;span class="s"&gt;VpcId&lt;/span&gt;

&lt;span class="na"&gt;ProductReviewProcessingFunction&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;AWS::Serverless::Function&lt;/span&gt;
    &lt;span class="na"&gt;Properties&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;Description&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;This function processes records from the CDC-populated Kinesis data stream related to the ProductReview table that match certain conditions&lt;/span&gt;
    &lt;span class="na"&gt;CodeUri&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;reviews/&lt;/span&gt;
    &lt;span class="na"&gt;Handler&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;app.lambda_handler&lt;/span&gt;
    &lt;span class="na"&gt;Runtime&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;python3.9&lt;/span&gt;
    &lt;span class="na"&gt;Architectures&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;arm64&lt;/span&gt;
    &lt;span class="na"&gt;Events&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;KinesisSourceStream&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;Kinesis&lt;/span&gt;
        &lt;span class="na"&gt;Properties&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
            &lt;span class="na"&gt;Stream&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;!Sub&lt;/span&gt;
                &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;arn:${AWS::Partition}:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${streamName}"&lt;/span&gt;
                &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;streamName&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;!Ref&lt;/span&gt; &lt;span class="s"&gt;SourceStreamName&lt;/span&gt;
            &lt;span class="na"&gt;StartingPosition&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;TRIM_HORIZON&lt;/span&gt;
            &lt;span class="na"&gt;FilterCriteria&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
            &lt;span class="na"&gt;Filters&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
                &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;Pattern&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;{&lt;/span&gt;
                    &lt;span class="s"&gt;"data":&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;{&lt;/span&gt;
                        &lt;span class="s"&gt;"metadata":&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;{&lt;/span&gt; 
                        &lt;span class="s"&gt;"operation":&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;["update",&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;"insert"],&lt;/span&gt;
                        &lt;span class="s"&gt;"schema-name":&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;["Production"],&lt;/span&gt;
                        &lt;span class="s"&gt;"table-name":&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;["ProductReview"],&lt;/span&gt;
                        &lt;span class="s"&gt;"record-type":&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;["data"]&lt;/span&gt;
                        &lt;span class="s"&gt;},&lt;/span&gt;
                        &lt;span class="s"&gt;"data":&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;{&lt;/span&gt;
                        &lt;span class="s"&gt;"Rating":&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;[&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;{"numeric":&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;["&amp;lt;",&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;4]}]&lt;/span&gt;
                        &lt;span class="s"&gt;}&lt;/span&gt;
                    &lt;span class="s"&gt;}&lt;/span&gt;
                    &lt;span class="s"&gt;}'&lt;/span&gt;
    &lt;span class="na"&gt;Policies&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;EventBridgePutEventsPolicy&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
            &lt;span class="na"&gt;EventBusName&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;default&lt;/span&gt;

&lt;span class="na"&gt;TransactionHistoryProcessingFunction&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;AWS::Serverless::Function&lt;/span&gt;
    &lt;span class="na"&gt;Properties&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;Description&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;This function processes records from the CDC-populated Kinesis data stream related to the TransactionHistory table that match certain conditions&lt;/span&gt;
    &lt;span class="na"&gt;CodeUri&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;transactions/&lt;/span&gt;
    &lt;span class="na"&gt;Handler&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;app.lambda_handler&lt;/span&gt;
    &lt;span class="na"&gt;Runtime&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;python3.9&lt;/span&gt;
    &lt;span class="na"&gt;Architectures&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;x86_64&lt;/span&gt;
    &lt;span class="na"&gt;VpcConfig&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;SecurityGroupIds&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;!Ref&lt;/span&gt; &lt;span class="s"&gt;SecurityGroupIds&lt;/span&gt;
        &lt;span class="na"&gt;SubnetIds&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;!Ref&lt;/span&gt; &lt;span class="s"&gt;SubnetIds&lt;/span&gt;
    &lt;span class="na"&gt;Environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;Variables&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;DB_SECRET_ARN&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;!Ref&lt;/span&gt; &lt;span class="s"&gt;DatabaseConnectionSecretArn&lt;/span&gt;
        &lt;span class="na"&gt;DB_NAME&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;!Ref&lt;/span&gt; &lt;span class="s"&gt;DatabaseName&lt;/span&gt;
    &lt;span class="na"&gt;Events&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;KinesisSourceStream&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;Kinesis&lt;/span&gt;
        &lt;span class="na"&gt;Properties&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
            &lt;span class="na"&gt;Stream&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;!Sub&lt;/span&gt;
                &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;arn:${AWS::Partition}:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${streamName}"&lt;/span&gt;
                &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;streamName&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;!Ref&lt;/span&gt; &lt;span class="s"&gt;SourceStreamName&lt;/span&gt;
            &lt;span class="na"&gt;StartingPosition&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;TRIM_HORIZON&lt;/span&gt;
            &lt;span class="na"&gt;FilterCriteria&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
            &lt;span class="na"&gt;Filters&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
                &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;Pattern&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;{&lt;/span&gt;
                    &lt;span class="s"&gt;"data":&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;{&lt;/span&gt;
                        &lt;span class="s"&gt;"metadata":&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;{&lt;/span&gt; 
                        &lt;span class="s"&gt;"operation":&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;["update",&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;"insert"],&lt;/span&gt;
                        &lt;span class="s"&gt;"schema-name":&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;["Production"],&lt;/span&gt;
                        &lt;span class="s"&gt;"table-name":&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;["TransactionHistory"],&lt;/span&gt;
                        &lt;span class="s"&gt;"record-type":&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;["data"]&lt;/span&gt;
                        &lt;span class="s"&gt;},&lt;/span&gt;
                        &lt;span class="s"&gt;"data":&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;{&lt;/span&gt;
                        &lt;span class="s"&gt;"TransactionType":&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;["S"]&lt;/span&gt;
                        &lt;span class="s"&gt;}&lt;/span&gt;
                    &lt;span class="s"&gt;}&lt;/span&gt;
                    &lt;span class="s"&gt;}'&lt;/span&gt;
    &lt;span class="na"&gt;Policies&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;EventBridgePutEventsPolicy&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
            &lt;span class="na"&gt;EventBusName&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;default&lt;/span&gt;
        &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;AWSSecretsManagerGetSecretValuePolicy&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
            &lt;span class="na"&gt;SecretArn&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;!Ref&lt;/span&gt; &lt;span class="s"&gt;DatabaseConnectionSecretArn&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;
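
&lt;p&gt;Optionally, you can ask the SAM CLI for a quick sanity check of the template before building (this validates the template structure, not the filter patterns):&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;sam validate
&lt;/code&gt;&lt;/pre&gt;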

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Now it's time to build the template and deploy the solution. Make sure the AWS CLI is configured with an AWS principal that has the proper permissions and that you're using a profile for the AWS account you want to deploy to. Note that SAM does some bootstrapping upon first use (e.g., creating an S3 bucket for uploading the processed templates). For the deployment step, you'll need to provide some parameters, such as the name of the application and some CloudFormation behaviors, but the SAM CLI has the &lt;code&gt;--guided&lt;/code&gt; option that prompts you for those parameters and stores them locally for later reuse. Run the following commands in a terminal:&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;sam build &lt;span class="nt"&gt;--use-container&lt;/span&gt;
sam deploy &lt;span class="nt"&gt;--guided&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;On subsequent deployments, once the SAM CLI has installed the dependencies locally and you have saved the parameters, you can simply run &lt;code&gt;sam build &amp;amp;&amp;amp; sam deploy&lt;/code&gt; (unless you want to change either of them).&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Following this post, you now have a steady stream of business events on which you can build event-driven applications. In the next blog post, I'll explore how to use AWS DMS and the existing CDC stream to build a more advanced use case.&lt;/p&gt;

&lt;h2&gt;
  
  
  Clean up
&lt;/h2&gt;

&lt;p&gt;You can delete the resources you have deployed using SAM CLI with the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sam delete
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



</description>
      <category>aws</category>
      <category>eventdriven</category>
      <category>database</category>
      <category>innovation</category>
    </item>
    <item>
      <title>Unlock innovation using data from your on-premises databases (II) - Streaming state changes</title>
      <dc:creator>Tasio Guevara</dc:creator>
      <pubDate>Mon, 14 Feb 2022 18:05:19 +0000</pubDate>
      <link>https://dev.to/tasio/unlock-innovation-using-data-from-your-on-premises-databases-ii-streaming-state-changes-2948</link>
      <guid>https://dev.to/tasio/unlock-innovation-using-data-from-your-on-premises-databases-ii-streaming-state-changes-2948</guid>
      <description>&lt;p&gt;In my &lt;a href="https://dev.to/tasio/unlock-innovation-using-data-from-your-on-premises-databases-i-36na"&gt;last blog post&lt;/a&gt;, I explained the motivation to continuously extract data from relational databases on-premises and introduced Change Data Capture (CDC) and AWS Database Migration Service (DMS) as tools to do this while minimizing the impact on the database.&lt;/p&gt;

&lt;p&gt;In this blog post, I show how to implement a solution to get data changes into a data stream. Data streams are a powerful tool to build near real-time analytics and other use cases, such as building applications that react to state changes. For example, you may want to trigger a process when a customer updates their consent to receive marketing information or updates their email address.&lt;/p&gt;

&lt;p&gt;In this solution, I will use AWS DMS to continuously replicate data from a SQL Server database into an Amazon Kinesis data stream. In this data stream, each record will contain the row that was changed (updated, inserted, or deleted) in the source database with all the current and previous values for each field.&lt;/p&gt;

&lt;p&gt;Below, you can find diagrams that represent the architecture of the solution for an on-premises database and the architecture I used in my lab environment to prepare this blog post.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Because you need network connectivity between AWS DMS and your source database, I have, simply for illustrative purposes, placed a &lt;a href="https://docs.aws.amazon.com/vpn/latest/s2svpn/VPC_VPN.html" rel="noopener noreferrer"&gt;site-to-site VPN connection&lt;/a&gt; between the on-premises local network and a VPC in AWS. Note that there are &lt;a href="https://docs.aws.amazon.com/whitepapers/latest/aws-vpc-connectivity-options/network-to-amazon-vpc-connectivity-options.html" rel="noopener noreferrer"&gt;several alternatives&lt;/a&gt; to establish connectivity between on-premises networks and AWS. In my lab setup, I am running the source database in a VPC, so I just need to run my AWS DMS replication instance in the same VPC or, better yet, in another VPC with connectivity to the former one (using, for example, VPC peering).&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F99mth0r0tnvhg0ta3oj1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F99mth0r0tnvhg0ta3oj1.png" alt="Architecture for the solution with a database on-premises" width="800" height="316"&gt;&lt;/a&gt;  &lt;em&gt;Figure 1 - Architecture for the solution with a database on-premises&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvhbmo5r0az7afr2nt00i.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvhbmo5r0az7afr2nt00i.png" alt="Architecture for the lab environment" width="800" height="345"&gt;&lt;/a&gt;  &lt;em&gt;Figure 2 - Architecture for the lab environment&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Sample database
&lt;/h2&gt;

&lt;p&gt;For this exercise, as a sample database, I will use a usual suspect in the SQL Server world, the &lt;a href="https://docs.microsoft.com/en-us/sql/samples/adventureworks-install-configure" rel="noopener noreferrer"&gt;AdventureWorks sample database&lt;/a&gt;. More precisely, I’ll use the &lt;a href="https://github.com/Microsoft/sql-server-samples/releases/download/adventureworks/AdventureWorks2019.bak" rel="noopener noreferrer"&gt;2019 OLTP (Online Transaction Processing) version&lt;/a&gt;, as I want to simulate an OLTP workload and I will be using SQL Server 2019 running on Amazon RDS.&lt;/p&gt;

&lt;p&gt;I will not dive into the details about how to deploy such an environment, but if you want to replicate it, take into account that in order to restore a database from a native SQL Server backup (like the one you'd download from the &lt;a href="https://docs.microsoft.com/en-us/sql/samples/adventureworks-install-configure" rel="noopener noreferrer"&gt;Microsoft AdventureWorks web site&lt;/a&gt;), you need to perform the steps described in the &lt;a href="https://aws.amazon.com/premiumsupport/knowledge-center/native-backup-rds-sql-server/" rel="noopener noreferrer"&gt;How do I perform native backups of an Amazon RDS DB instance that's running SQL Server?&lt;/a&gt; article.&lt;/p&gt;
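
&lt;p&gt;For reference, once the prerequisites from that article are in place (an option group with the SQLSERVER_BACKUP_RESTORE option and the .bak file in an S3 bucket), the restore itself boils down to a stored procedure call along these lines, where the bucket name is a placeholder:&lt;/p&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;exec msdb.dbo.rds_restore_database
    @restore_db_name='AdventureWorks2019',
    @s3_arn_to_restore_from='arn:aws:s3:::&amp;lt;your-bucket&amp;gt;/AdventureWorks2019.bak'
&lt;/code&gt;&lt;/pre&gt;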

&lt;h2&gt;
  
  
  Preparing the database
&lt;/h2&gt;

&lt;p&gt;The sample database contains five schemas and a total of 68 tables (not counting administrative tables and the ‘dbo’ schema). For this exercise, I’m going to use three tables in the ‘Production’ schema: ‘ProductInventory’, ‘ProductReview’, and ‘TransactionHistory’.&lt;/p&gt;

&lt;p&gt;To enable continuous replication, you need to configure the database. You can find prerequisites and detailed instructions in &lt;a href="https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.SQLServer.html#CHAP_Source.SQLServer.CDC.Publication" rel="noopener noreferrer"&gt;Using a Microsoft SQL Server database as a source for AWS DMS&lt;/a&gt;. In my case, because I’m using Amazon RDS, I do the following:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;Execute this query in SQL Server to enable CDC on the database:&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;exec&lt;/span&gt; &lt;span class="n"&gt;msdb&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dbo&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;rds_cdc_enable_db&lt;/span&gt; &lt;span class="s1"&gt;'AdventureWorks2019'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;
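
&lt;p&gt;If you want to confirm that CDC is now enabled at the database level, you can check the system catalog:&lt;/p&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = 'AdventureWorks2019'
&lt;/code&gt;&lt;/pre&gt;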

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Execute these queries in SQL Server to enable CDC on each table:&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="n"&gt;USE&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;AdventureWorks2019&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;span class="k"&gt;exec&lt;/span&gt; &lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sp_cdc_enable_table&lt;/span&gt;
&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;source_schema&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;N&lt;/span&gt;&lt;span class="s1"&gt;'Production'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;source_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;N&lt;/span&gt;&lt;span class="s1"&gt;'ProductInventory'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;role_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;NULL&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;supports_net_changes&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;
&lt;span class="k"&gt;GO&lt;/span&gt;

&lt;span class="n"&gt;USE&lt;/span&gt; &lt;span class="n"&gt;AdventureWorks2019&lt;/span&gt;
&lt;span class="k"&gt;exec&lt;/span&gt; &lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sp_cdc_enable_table&lt;/span&gt;
&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;source_schema&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;N&lt;/span&gt;&lt;span class="s1"&gt;'Production'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;source_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;N&lt;/span&gt;&lt;span class="s1"&gt;'ProductReview'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;role_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;NULL&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;supports_net_changes&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;
&lt;span class="k"&gt;GO&lt;/span&gt;

&lt;span class="n"&gt;USE&lt;/span&gt; &lt;span class="n"&gt;AdventureWorks2019&lt;/span&gt;
&lt;span class="k"&gt;exec&lt;/span&gt; &lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sp_cdc_enable_table&lt;/span&gt;
&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;source_schema&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;N&lt;/span&gt;&lt;span class="s1"&gt;'Production'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;source_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;N&lt;/span&gt;&lt;span class="s1"&gt;'TransactionHistory'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;role_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;NULL&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;supports_net_changes&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;
&lt;span class="k"&gt;GO&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Run these queries to configure and enable the retention time (in seconds) for the changes in the transaction log. The documentation recommends 86399 seconds (one day), but you need to consider your needs and the implications of potentially having to store that much data on your local disks.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="n"&gt;USE&lt;/span&gt; &lt;span class="n"&gt;AdventureWorks2019&lt;/span&gt;
&lt;span class="k"&gt;EXEC&lt;/span&gt; &lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sp_cdc_change_job&lt;/span&gt; &lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;job_type&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'capture'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;pollinginterval&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;86399&lt;/span&gt;
&lt;span class="k"&gt;EXEC&lt;/span&gt; &lt;span class="n"&gt;sp_cdc_stop_job&lt;/span&gt; &lt;span class="s1"&gt;'capture'&lt;/span&gt;
&lt;span class="k"&gt;EXEC&lt;/span&gt; &lt;span class="n"&gt;sp_cdc_start_job&lt;/span&gt; &lt;span class="s1"&gt;'capture'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;If you are curious about the effect of these commands, you can browse the SQL Server system tables and check that there are now a few tables under a schema named ‘cdc’.&lt;/p&gt;
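
&lt;p&gt;For example, this query lists the user tables that are now tracked by CDC (the &lt;em&gt;is_tracked_by_cdc&lt;/em&gt; column is standard SQL Server metadata):&lt;/p&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;USE AdventureWorks2019
SELECT s.name + '.' + t.name AS tracked_table
FROM sys.tables t
JOIN sys.schemas s ON t.schema_id = s.schema_id
WHERE t.is_tracked_by_cdc = 1
&lt;/code&gt;&lt;/pre&gt;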

&lt;h2&gt;
  
  
  Prerequisites for AWS DMS
&lt;/h2&gt;

&lt;p&gt;There are a few things that you need in your environment to successfully run a migration task in AWS DMS.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;You can find more detailed instructions about how to set up some resources in the &lt;a href="https://docs.aws.amazon.com/streams/latest/dev/vpc.html" rel="noopener noreferrer"&gt;Prerequisites section of the AWS DMS Documentation&lt;/a&gt;.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;ul&gt;
&lt;li&gt;An IAM principal that &lt;a href="https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Security.html#CHAP_Security.IAMPermissions" rel="noopener noreferrer"&gt;has permissions to use AWS DMS&lt;/a&gt;, can create the IAM roles in the next point, and has full access to Kinesis data streams.&lt;/li&gt;
&lt;li&gt;Two IAM roles to use DMS from the AWS CLI. You can create them following the instructions in &lt;a href="https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Security.html#CHAP_Security.APIRole" rel="noopener noreferrer"&gt;the AWS DMS documentation&lt;/a&gt;.
&lt;/li&gt;
&lt;li&gt;A VPC with:

&lt;ul&gt;
&lt;li&gt;At least two subnets that have a route (and allow traffic) to your database server.&lt;/li&gt;
&lt;li&gt;If your subnet is private, you will need either a NAT Gateway or &lt;a href="https://docs.aws.amazon.com/streams/latest/dev/vpc.html" rel="noopener noreferrer"&gt;VPC endpoints for Amazon Kinesis&lt;/a&gt; and &lt;a href="https://docs.aws.amazon.com/secretsmanager/latest/userguide/vpc-endpoint-overview.html" rel="noopener noreferrer"&gt;AWS Secrets Manager&lt;/a&gt;, so the migration task can reach their corresponding endpoints.&lt;/li&gt;
&lt;li&gt;A Security Group that allows outbound traffic to your database server on the right port (1433 by default for SQL Server).&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;/ul&gt;

&lt;h2&gt;
  
  
  Setting up AWS DMS
&lt;/h2&gt;

&lt;p&gt;AWS DMS requires the following components to run a replication task:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Source and target endpoints, each with associated IAM roles to access the necessary AWS resources (if any).&lt;/li&gt;
&lt;li&gt;A replication instance running in a VPC that can connect to those endpoints.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Configuring the source endpoint
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;Storing connection details in AWS Secrets Manager&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;You need to configure the endpoint with the details needed to connect to your source database. You will typically need a user name and password for that, which is sensitive information. AWS DMS supports storing that sensitive data in AWS Secrets Manager. Since this is a best practice, first create a secret in AWS Secrets Manager containing the following key/value pairs:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"host"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"&amp;lt;ip-address-or-url-of-your-database-server&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"port"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1433&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"username"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"&amp;lt;your-username&amp;gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"password"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"&amp;lt;your-password&amp;gt;"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;Save the previous JSON object in a file named &lt;em&gt;sourcedb-creds.json&lt;/em&gt;, replacing the values between angle brackets (&amp;lt;&amp;gt;) with your own.
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Run the following command using the AWS CLI and take note of the ARN of the secret you just created.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws secretsmanager create-secret &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--name&lt;/span&gt; sourcedb-credentials &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--description&lt;/span&gt; &lt;span class="s2"&gt;"Credentials for the source SQL Server database"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--secret-string&lt;/span&gt; file://sourcedb-creds.json
&lt;/code&gt;&lt;/pre&gt;
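
&lt;p&gt;If you didn't capture the ARN from the command output, you can retrieve it at any time with this command (using the secret name from the previous step):&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws secretsmanager describe-secret \
    --secret-id sourcedb-credentials \
    --query ARN --output text
&lt;/code&gt;&lt;/pre&gt;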

&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Delete the previous file to avoid leaving your password lying around in plain text.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Note: if you don’t provide an explicit AWS Key Management Service (KMS) key, AWS Secrets Manager will use the default KMS key to encrypt this secret. This key’s alias is “aws/secretsmanager”, which is what you will later use to grant AWS DMS access to it.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Creating a role for AWS DMS to access the secret&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;To access that secret, AWS DMS needs an IAM role that grants access to it and to the KMS key that AWS Secrets Manager used to encrypt it. In this section you are going to create an IAM policy that grants minimal permissions to those resources and an IAM role that uses that policy.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;Using your favorite editor, create a text file and name it read-sourcedb-secret-policy.json; paste the following text, substituting the values between angle brackets (&amp;lt;&amp;gt;) with your own, and save it.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"Version"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"2012-10-17"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"Statement"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"Sid"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"AllowGetSecretValue"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"Effect"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Allow"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"Action"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"secretsmanager:GetSecretValue"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"Resource"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"&amp;lt;your-secret-arn&amp;gt;"&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"Effect"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Allow"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"Action"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
                    &lt;/span&gt;&lt;span class="s2"&gt;"kms:Decrypt"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                    &lt;/span&gt;&lt;span class="s2"&gt;"kms:DescribeKey"&lt;/span&gt;&lt;span class="w"&gt;
                    &lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"Resource"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"arn:aws:kms:&amp;lt;your-region&amp;gt;:&amp;lt;your-account-id&amp;gt;:alias/aws/secretsmanager"&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Create the IAM policy running this AWS CLI command.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws iam create-policy &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--policy-name&lt;/span&gt; read-sourcedb-secret &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--policy-document&lt;/span&gt; file://read-sourcedb-secret-policy.json &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--description&lt;/span&gt; &lt;span class="s1"&gt;'Policy that allows reading the sourcedb secret in AWS Secrets Manager'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Now you can create the IAM role. Every IAM role requires a trust policy that determines which principal(s) will be able to assume it. In this case, that principal will be the AWS DMS service. Create a new text file, paste the following content, and save it as &lt;em&gt;dms-trust-policy.json&lt;/em&gt;; you will create the role itself and attach the policy right after this list.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"Version"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"2012-10-17"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"Statement"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"Effect"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Allow"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"Principal"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Service"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"dms.&amp;lt;your-region&amp;gt;.amazonaws.com"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"Action"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"sts:AssumeRole"&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;/ol&gt;
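
&lt;p&gt;With the trust policy file saved, create the role and attach the policy you created in step 2. I'm naming the role &lt;em&gt;dms-secretsmanager-role&lt;/em&gt; so it matches the ARN used in the endpoint settings below; adjust the names if yours differ:&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws iam create-role \
    --role-name dms-secretsmanager-role \
    --assume-role-policy-document file://dms-trust-policy.json \
    --description 'Role for AWS DMS to read the source database secret'

aws iam attach-role-policy \
    --role-name dms-secretsmanager-role \
    --policy-arn arn:aws:iam::&amp;lt;your-account-id&amp;gt;:policy/read-sourcedb-secret
&lt;/code&gt;&lt;/pre&gt;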

&lt;p&gt;&lt;strong&gt;Creating the endpoint&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Now you can create the endpoint.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;Create a text file, paste the following content replacing the values between angle brackets (&amp;lt;&amp;gt;) with your own, and save it as &lt;em&gt;sql-server-settings.json&lt;/em&gt;.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"DatabaseName"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"AdventureWorks2019"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"SecretsManagerAccessRoleArn"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"arn:aws:iam::&amp;lt;your-account-id&amp;gt;:role/dms-secretsmanager-role"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"SecretsManagerSecretId"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"arn:aws:secretsmanager:&amp;lt;your-region&amp;gt;:&amp;lt;your-account-id&amp;gt;:secret:&amp;lt;your-secret-id&amp;gt;"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Run the following AWS CLI command:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Note: if you are using a VPC endpoint to access AWS Secrets Manager and don’t enable &lt;a href="https://docs.aws.amazon.com/vpc/latest/userguide/vpce-interface.html#vpce-private-dns" rel="noopener noreferrer"&gt;private DNS hostnames&lt;/a&gt; in it, you will need to add the parameter &lt;code&gt;--extra-connection-attributes secretsManagerEndpointOverride=vpce-09d9c3f89fxxxxxxx-abcd0ef1.secretsmanager.&amp;lt;your-region&amp;gt;.vpce.amazonaws.com&lt;/code&gt; to the command. Replace the value with the DNS name of your VPC endpoint, which you can find under the endpoint details in the VPC console.&lt;br&gt;
&lt;/p&gt;
&lt;/blockquote&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws dms create-endpoint &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--endpoint-identifier&lt;/span&gt; source-sqlserver &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--endpoint-type&lt;/span&gt; &lt;span class="nb"&gt;source&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--engine-name&lt;/span&gt; sqlserver &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--microsoft-sql-server-settings&lt;/span&gt; file://sql-server-settings.json
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Take note of the ARN of this endpoint; you will need it when creating the replication task.&lt;/p&gt;

&lt;h3&gt;
  
  
  Creating the AWS DMS replication instance
&lt;/h3&gt;

&lt;p&gt;To connect to the source database, the replication instance needs a subnet group. This group is a configuration that tells AWS DMS where to deploy Elastic Network Interfaces (ENI) so it can access resources in a VPC (e.g.: a VPC endpoint, an RDS instance, a VPN gateway, etc.). AWS DMS requires at least two subnets for a subnet group.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;Get the identifiers of two subnets within your VPC. You can find them using the AWS Management Console or the AWS CLI with this command:&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws ec2 describe-subnets &lt;span class="nt"&gt;--query&lt;/span&gt; &lt;span class="s2"&gt;"Subnets[?VpcId=='&amp;lt;your-vpc-id&amp;gt;'].SubnetId"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Create the subnet group using the following AWS CLI command:&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws dms create-replication-subnet-group &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;--replication-subnet-group-identifier&lt;/span&gt; mssql-replication-subnet-group &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;--replication-subnet-group-description&lt;/span&gt; &lt;span class="s2"&gt;"Subnet group for the SQL Server replication instance"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;--subnet-ids&lt;/span&gt; &lt;span class="s2"&gt;"&amp;lt;subnet-id-1&amp;gt;"&lt;/span&gt; &lt;span class="s2"&gt;"&amp;lt;subnet-id-2&amp;gt;"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Create the replication instance using the following AWS CLI command and take note of the instance ARN.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws dms create-replication-instance &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--replication-instance-identifier&lt;/span&gt; mssql-replication-instance &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--allocated-storage&lt;/span&gt; 50 &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--replication-instance-class&lt;/span&gt; dms.t3.small &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--replication-subnet-group-identifier&lt;/span&gt; mssql-replication-subnet-group &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--no-multi-az&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--no-publicly-accessible&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Note that I have set the storage to 50 GB, used a &lt;em&gt;dms.t3.small&lt;/em&gt; instance, and used the &lt;em&gt;--no-multi-az&lt;/em&gt; flag. Depending on your needs, you may want to change these settings or additional ones (see the &lt;a href="https://docs.aws.amazon.com/dms/latest/APIReference/API_CreateReplicationInstance.html" rel="noopener noreferrer"&gt;CreateReplicationInstance API documentation&lt;/a&gt;). You can find more guidance in &lt;a href="https://docs.aws.amazon.com/dms/latest/userguide/CHAP_ReplicationInstance.Types.html" rel="noopener noreferrer"&gt;Choosing the right AWS DMS replication instance for your migration&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Finally, I haven’t specified any security group, so the instance will use the &lt;em&gt;default&lt;/em&gt; security group in the VPC. If you haven’t changed that security group, it will contain an inbound rule that allows all traffic coming from the same security group, and all outbound traffic anywhere.&lt;/p&gt;

&lt;p&gt;Now, wait until the instance status is &lt;em&gt;Available&lt;/em&gt; (you can check this in the AWS DMS console) and test the connection.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fh1bjmo1hbyni6v59pa28.gif" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fh1bjmo1hbyni6v59pa28.gif" alt="Testing the endpoint" width="600" height="337"&gt;&lt;/a&gt;  &lt;em&gt;Figure 3: Testing the endpoint&lt;/em&gt;&lt;/p&gt;
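
&lt;p&gt;If you prefer the CLI over the console, the equivalent test looks like this, using the replication instance and endpoint ARNs you noted earlier:&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws dms test-connection \
    --replication-instance-arn &amp;lt;your-replication-instance-arn&amp;gt; \
    --endpoint-arn &amp;lt;your-endpoint-arn&amp;gt;

# The test runs asynchronously; check its result with
aws dms describe-connections \
    --filters Name=endpoint-arn,Values=&amp;lt;your-endpoint-arn&amp;gt;
&lt;/code&gt;&lt;/pre&gt;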

&lt;h3&gt;
  
  
  Configuring the target endpoint
&lt;/h3&gt;

&lt;p&gt;The target endpoint requires a Kinesis data stream and a role that allows DMS to access that Kinesis data stream.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Creating the Amazon Kinesis data stream&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;In this post, I am using a Kinesis data stream in provisioned mode with one shard, as I am not expecting any spiky load. Depending on your own scenario, you might want to use on-demand mode or more shards in provisioned mode.&lt;/p&gt;

&lt;p&gt;To create the data stream, run the following AWS CLI command.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws kinesis create-stream &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--stream-name&lt;/span&gt; mssql-cdc &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--shard-count&lt;/span&gt; 1 &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--stream-mode-details&lt;/span&gt; &lt;span class="nv"&gt;StreamMode&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;PROVISIONED    
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
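
&lt;p&gt;The stream takes a few seconds to become active. If you are scripting these steps, you can block until it is ready:&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;# Returns once the stream reaches the ACTIVE state
aws kinesis wait stream-exists --stream-name mssql-cdc
&lt;/code&gt;&lt;/pre&gt;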



&lt;p&gt;&lt;strong&gt;Granting access from AWS DMS to the Kinesis data stream&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;AWS DMS needs to be explicitly given access to the data stream you just created. For this, you need to create an IAM policy and attach it to a new role that can be assumed by the AWS DMS service.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;Create a new text file named &lt;em&gt;write-to-stream-policy.json&lt;/em&gt;, paste the following content, and save it.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"Version"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"2012-10-17"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"Statement"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"Sid"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"AllowWritingToStream"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"Effect"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Allow"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"Action"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="s2"&gt;"kinesis:DescribeStream"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="s2"&gt;"kinesis:PutRecord"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="s2"&gt;"kinesis:PutRecords"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"Resource"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"arn:aws:kinesis:&amp;lt;your-region&amp;gt;:&amp;lt;your-account-id&amp;gt;:stream/mssql-cdc"&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Create the IAM policy using this AWS CLI command.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws iam create-policy &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--policy-name&lt;/span&gt; write-to-cdc-stream &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--policy-document&lt;/span&gt; file://write-to-stream-policy.json &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--description&lt;/span&gt; &lt;span class="s1"&gt;'Policy that allows writing to the mssql-cdc Kinesis data stream'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Create the IAM role that AWS DMS will assume by running the following AWS CLI command. Note that this role uses the same trust policy as the IAM role used for the source endpoint.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws iam create-role &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--path&lt;/span&gt; / &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--role-name&lt;/span&gt; dms-kinesis-role &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--assume-role-policy-document&lt;/span&gt; file://dms-trust-policy.json &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--description&lt;/span&gt; &lt;span class="s1"&gt;'Role for DMS to access Amazon Kinesis'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Attach the policy to the newly created role by running this command.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws iam attach-role-policy &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--role-name&lt;/span&gt; dms-kinesis-role &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--policy-arn&lt;/span&gt; arn:aws:iam::&amp;lt;your-account-id&amp;gt;:policy/write-to-cdc-stream
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Creating the target endpoint&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Now, you have everything set up to create the target endpoint. For that, run this command.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws dms create-endpoint &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--endpoint-identifier&lt;/span&gt; target-kinesis &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--endpoint-type&lt;/span&gt; target &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--engine-name&lt;/span&gt; kinesis &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--kinesis-settings&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nv"&gt;StreamArn&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;arn:aws:kinesis:&amp;lt;your-region&amp;gt;:&amp;lt;your-account-id&amp;gt;:stream/mssql-cdc,&lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nv"&gt;ServiceAccessRoleArn&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;arn:aws:iam::&amp;lt;your-account-id&amp;gt;:role/dms-kinesis-role,&lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nv"&gt;MessageFormat&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;JSON
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As with the source endpoint, test the connectivity to make sure there are no issues. If there are, make sure that the subnets associated with the replication instance have access to Amazon Kinesis. Some hints if you are using private subnets and run into problems:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Check that there is a VPC endpoint for Kinesis with ENIs deployed in those subnets.&lt;/li&gt;
&lt;li&gt;Check that the VPC endpoint has &lt;a href="https://docs.aws.amazon.com/vpc/latest/userguide/vpce-interface.html#vpce-private-dns" rel="noopener noreferrer"&gt;private DNS hostnames&lt;/a&gt; enabled. If not, you will need to add the &lt;code&gt;--extra-connection-attributes&lt;/code&gt; parameter with the private DNS hostname of the VPC endpoint to the command that creates the endpoint.&lt;/li&gt;
&lt;li&gt;Alternatively, you can use a NAT gateway or another type of proxy to access the Internet.&lt;/li&gt;
&lt;/ul&gt;
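
&lt;p&gt;You can also run the connectivity test from the AWS CLI instead of the console. Here is a minimal sketch, assuming the replication instance and target endpoint ARNs from the previous steps:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;# Start the connection test
aws dms test-connection \
    --replication-instance-arn arn:aws:dms:&amp;lt;your-region&amp;gt;:&amp;lt;your-account-id&amp;gt;:rep:&amp;lt;your-replication-instance-id&amp;gt; \
    --endpoint-arn arn:aws:dms:&amp;lt;your-region&amp;gt;:&amp;lt;your-account-id&amp;gt;:endpoint:&amp;lt;your-target-endpoint-id&amp;gt;

# Poll until Status shows "successful"
aws dms describe-connections \
    --filters Name=endpoint-arn,Values=arn:aws:dms:&amp;lt;your-region&amp;gt;:&amp;lt;your-account-id&amp;gt;:endpoint:&amp;lt;your-target-endpoint-id&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;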

&lt;h2&gt;
  
  
  Setting up the replication task
&lt;/h2&gt;

&lt;p&gt;At this point, you have everything ready to create and launch a replication task.&lt;/p&gt;

&lt;p&gt;A replication task needs a table mapping, so it knows which tables and columns to read from the data source and how to write them to the target. You can also define a series of optional task settings to customize the task's behavior, configuring aspects such as &lt;a href="https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TaskSettings.Logging.html" rel="noopener noreferrer"&gt;logging&lt;/a&gt;, &lt;a href="https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Validating.html" rel="noopener noreferrer"&gt;data validation&lt;/a&gt;, &lt;a href="https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TaskSettings.TimeTravel.html" rel="noopener noreferrer"&gt;time travel&lt;/a&gt;, or &lt;a href="https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.Filters.html" rel="noopener noreferrer"&gt;data filtering&lt;/a&gt;. You can find a list of the available settings in the &lt;a href="https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TaskSettings.html" rel="noopener noreferrer"&gt;Specifying task settings for AWS Database Migration Service tasks documentation&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;In this example, you are going to configure a task that continuously replicates three source tables from the SQL Server database into the Kinesis data stream. In the output, each record will include the updated and previous values for each affected row (where relevant; this does not apply to inserts and deletes).&lt;/p&gt;

&lt;p&gt;Each record in the stream represents a changed row in the database as a JSON dictionary with up to three items, depending on the database operation: “data”, “before”, and “metadata”.&lt;/p&gt;

&lt;p&gt;The “data” and “before” items contain a key-value pair for each column in the table: “data” holds the values of the row after the operation, and “before” holds the values of the row prior to the operation.&lt;/p&gt;

&lt;p&gt;The “metadata” item will contain information about the operation, such as its timestamp, the operation type (update, delete, or insert), or the table and schema the operation was performed on. Note that when inserting or deleting a row, the “before” element will not be included in the dictionary.&lt;/p&gt;

&lt;p&gt;Here’s a sample record for an update operation:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"data"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"ProductReviewID"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"ProductID"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;937&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"ReviewerName"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Alice"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"ReviewDate"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"2013-11-15T00:00:00Z"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"EmailAddress"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"alice@example.com"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"Rating"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"Comments"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Maybe it's just because I'm new to mountain biking, but I had a terrible time getting use&lt;/span&gt;&lt;span class="se"&gt;\r\n&lt;/span&gt;&lt;span class="s2"&gt;to these pedals. In my first outing, I wiped out trying to release my foot. Any suggestions on&lt;/span&gt;&lt;span class="se"&gt;\r\n&lt;/span&gt;&lt;span class="s2"&gt;ways I can adjust the pedals, or is it just a learning curve thing?"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"ModifiedDate"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"2013-11-15T00:00:00Z"&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"before"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"ProductReviewID"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"3"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"ProductID"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"937"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"ReviewerName"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Alice"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"ReviewDate"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"2013-11-15T00:00:00Z"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"EmailAddress"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"alice@example.com"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"Rating"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"2"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"Comments"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Maybe it's just because I'm new to mountain biking, but I had a terrible time getting use&lt;/span&gt;&lt;span class="se"&gt;\r\n&lt;/span&gt;&lt;span class="s2"&gt;to these pedals. In my first outing, I wiped out trying to release my foot. Any suggestions on&lt;/span&gt;&lt;span class="se"&gt;\r\n&lt;/span&gt;&lt;span class="s2"&gt;ways I can adjust the pedals, or is it just a learning curve thing?"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"ModifiedDate"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"2013-11-15T00:00:00Z"&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"metadata"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"2022-02-02T11:52:59.913277Z"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"record-type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"data"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"operation"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"update"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"partition-key-type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"schema-table"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"schema-name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Production"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"table-name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"ProductReview"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"transaction-id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;128599&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Table Mapping&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Diving a bit deeper into table mapping, you need to select which tables from which schemas are going to be read from the data source. You can use ‘%’ as a wildcard for all or part of the table and schema names. You can also apply some transformations before the data is written to the target. You can do things like &lt;a href="https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.Filters.html" rel="noopener noreferrer"&gt;filtering values&lt;/a&gt;, adding/removing columns, renaming, adding suffixes, or changing data types (you can find a complete list of actions and capabilities in the &lt;a href="https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Transformations.html" rel="noopener noreferrer"&gt;Transformation rules and actions documentation&lt;/a&gt;). In this example, I’m not applying any transformation, only reading three tables from the Production schema.&lt;/p&gt;
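
&lt;p&gt;As an illustration only (this rule is hypothetical and not part of the mapping used in this example), a transformation rule that drops the EmailAddress column from the replicated ProductReview records would look something like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;{
    "rule-type": "transformation",
    "rule-id": "4",
    "rule-name": "drop-reviewer-email",
    "rule-target": "column",
    "object-locator": {
        "schema-name": "Production",
        "table-name": "ProductReview",
        "column-name": "EmailAddress"
    },
    "rule-action": "remove-column"
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;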

&lt;p&gt;You can define those mappings either in the AWS DMS console or by using a JSON file. In this guide, I’m using the latter.&lt;/p&gt;

&lt;p&gt;Create a text file and paste the following JSON object. Save the file as &lt;em&gt;table-mapping-rules.json&lt;/em&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"rules"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"rule-type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"selection"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"rule-id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"rule-name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"products"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"object-locator"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"schema-name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Production"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"table-name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Product"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"rule-action"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"include"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"filters"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"rule-type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"selection"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"rule-id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"2"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"rule-name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"transaction-history"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"object-locator"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"schema-name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Production"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"table-name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"TransactionHistory"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"rule-action"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"include"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"filters"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"rule-type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"selection"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"rule-id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"3"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"rule-name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"product-reviews"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"object-locator"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"schema-name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Production"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"table-name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"ProductReview"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"rule-action"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"include"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"filters"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Task settings&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;AWS DMS lets you define the behavior of a task in a very granular way. In this case, I’m only going to define rules for logging to Amazon CloudWatch Logs and enable the before image, so that records for updated rows include their previous values.&lt;/p&gt;

&lt;p&gt;Create a text file, paste the following JSON object, and save it as &lt;em&gt;task-settings.json&lt;/em&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"Logging"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"EnableLogging"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"LogComponents"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"TRANSFORMATION"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"SOURCE_UNLOAD"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"IO"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"TARGET_LOAD"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"PERFORMANCE"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"SOURCE_CAPTURE"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"SORTER"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"REST_SERVER"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"VALIDATOR_EXT"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"TARGET_APPLY"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"TASK_MANAGER"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"TABLES_MANAGER"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"METADATA_MANAGER"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"FILE_FACTORY"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"COMMON"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"ADDONS"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"DATA_STRUCTURE"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"COMMUNICATION"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"FILE_TRANSFER"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Severity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"LOGGER_SEVERITY_DEFAULT"&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"BeforeImageSettings"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"EnableBeforeImage"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"FieldName"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"before"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"ColumnFilter"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"all"&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Creating the replication task&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Run the following AWS CLI commands to create and start the task respectively.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws dms create-replication-task &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--replication-task-identifier&lt;/span&gt; mssql-to-kinesis-cdc
    &lt;span class="nt"&gt;--source-endpoint-arn&lt;/span&gt; arn:aws:dms:&amp;lt;your-region&amp;gt;:&amp;lt;your-account-id&amp;gt;:endpoint:&amp;lt;your-source-endpoint-id&amp;gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--target-endpoint-arn&lt;/span&gt; arn:aws:dms:&amp;lt;your-region&amp;gt;:&amp;lt;your-account-id&amp;gt;:endpoint:&amp;lt;your-target-endpoint-id&amp;gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--replication-instance-arn&lt;/span&gt; arn:aws:dms:&amp;lt;your-region&amp;gt;:&amp;lt;your-account-id&amp;gt;:rep:&amp;lt;your-replication-instance-id&amp;gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--migration-type&lt;/span&gt; cdc &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--table-mappings&lt;/span&gt; file://table-mapping-rules.json &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--replication-task-settings&lt;/span&gt; file://task-settings.json &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--resource-identifier&lt;/span&gt; mssql-to-kinesis-cdc
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws dms start-replication-task &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--replication-task-arn&lt;/span&gt; arn:aws:dms:&amp;lt;your-region&amp;gt;:&amp;lt;your-account-id&amp;gt;:task:&amp;lt;your-task-id&amp;gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--start-replication-task-type&lt;/span&gt; start-replication
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;blockquote&gt;
&lt;p&gt;Note that, because this is a continuous replication task, you can decide from which point in time you want to start replicating and when to stop. You can also stop and resume the task.&lt;/p&gt;
&lt;/blockquote&gt;
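
&lt;p&gt;For example, to start reading changes from a specific point in time rather than from the current position, you can pass a CDC start time when starting the task. A sketch, assuming the same task ARN as above:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws dms start-replication-task \
    --replication-task-arn arn:aws:dms:&amp;lt;your-region&amp;gt;:&amp;lt;your-account-id&amp;gt;:task:&amp;lt;your-task-id&amp;gt; \
    --start-replication-task-type start-replication \
    --cdc-start-time 2022-02-01T00:00:00
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;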

&lt;p&gt;After a few minutes, your task should be in the state “Replication ongoing”.&lt;/p&gt;
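
&lt;p&gt;If you prefer the CLI to the console, you can check the task status with a command like this (assuming the task identifier used above):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws dms describe-replication-tasks \
    --filters Name=replication-task-id,Values=mssql-to-kinesis-cdc \
    --query "ReplicationTasks[0].Status"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;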

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8t1ptzdkewcz16eb9z8k.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8t1ptzdkewcz16eb9z8k.png" alt="Task state" width="728" height="221"&gt;&lt;/a&gt;  &lt;em&gt;Figure 4: Task state&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Testing the solution
&lt;/h2&gt;

&lt;p&gt;To test the solution, perform an update, insert, or delete on one of the source tables. For example, run the following SQL statement against your source database:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;UPDATE&lt;/span&gt; &lt;span class="n"&gt;AdventureWorks2019&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Production&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ProductReview&lt;/span&gt;
&lt;span class="k"&gt;SET&lt;/span&gt; &lt;span class="n"&gt;Rating&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;5&lt;/span&gt; &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;ProductReviewID&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Wait for about a minute until the replication task has written into your Kinesis data stream and then read from the stream.&lt;/p&gt;

&lt;p&gt;If you are running this tutorial from a Unix-type command processor such as bash, you can automate the acquisition of the shard iterator using a nested command, like this (scroll horizontally to see the entire command):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;SHARD_ITERATOR&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="si"&gt;$(&lt;/span&gt;aws kinesis get-shard-iterator &lt;span class="nt"&gt;--shard-id&lt;/span&gt; shardId-000000000000 &lt;span class="nt"&gt;--shard-iterator-type&lt;/span&gt; TRIM_HORIZON &lt;span class="nt"&gt;--stream-name&lt;/span&gt; &amp;lt;your-stream-name&amp;gt; &lt;span class="nt"&gt;--query&lt;/span&gt; &lt;span class="s1"&gt;'ShardIterator'&lt;/span&gt;&lt;span class="si"&gt;)&lt;/span&gt;
aws kinesis get-records &lt;span class="nt"&gt;--shard-iterator&lt;/span&gt; &lt;span class="nv"&gt;$SHARD_ITERATOR&lt;/span&gt; &lt;span class="nt"&gt;--query&lt;/span&gt; &lt;span class="s2"&gt;"Records[-1].Data"&lt;/span&gt; | &lt;span class="nb"&gt;sed&lt;/span&gt; &lt;span class="s1"&gt;'s/"//'&lt;/span&gt; | &lt;span class="nb"&gt;base64&lt;/span&gt; &lt;span class="nt"&gt;--decode&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If you are running this tutorial from a system that supports PowerShell, you can automate acquisition of the shard iterator using a command such as this one. Note that the Data field in the response is base64-encoded, so you will still need to decode it to read the record contents (scroll horizontally to see the entire command):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;In this post, I have shown how to set up a replication task to stream changes from a relational database into a data stream. In the next post, I will showcase how to process data from the stream in near real time.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;&lt;a href="https://unsplash.com/@lazycreekimages?utm_source=unsplash&amp;amp;utm_medium=referral&amp;amp;utm_content=creditCopyText" rel="noopener noreferrer"&gt;Photo by Michael Dziedzic&lt;/a&gt; on &lt;a href="https://unsplash.com/s/photos/data-stream?utm_source=unsplash&amp;amp;utm_medium=referral&amp;amp;utm_content=creditCopyText" rel="noopener noreferrer"&gt;Unsplash&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

</description>
      <category>aws</category>
      <category>database</category>
      <category>innovation</category>
    </item>
    <item>
      <title>Unlock innovation using data from your on-premises databases (I)</title>
      <dc:creator>Tasio Guevara</dc:creator>
      <pubDate>Thu, 28 Oct 2021 06:47:30 +0000</pubDate>
      <link>https://dev.to/tasio/unlock-innovation-using-data-from-your-on-premises-databases-i-36na</link>
      <guid>https://dev.to/tasio/unlock-innovation-using-data-from-your-on-premises-databases-i-36na</guid>
      <description>&lt;p&gt;Data has arguably become one of the most important assets for any business. Data captures the past and present state of an organization and can help shaping and forecasting its future.&lt;/p&gt;

&lt;p&gt;Thanks to the advancements in analytics and artificial intelligence, and the doors that these technologies open for innovation, data has taken center stage in many organizations’ strategies.&lt;/p&gt;

&lt;p&gt;In this series of blog posts I am going to describe a few use cases that you can implement with minimal impact on your existing databases and with no need to migrate them to the cloud.&lt;/p&gt;

&lt;p&gt;This first post examines the main cause that limits data-driven innovation and describes an approach to solve it. The posts that follow will dive deeper into the technical implementation of solutions for different use cases.&lt;/p&gt;

&lt;h1&gt;
  
  
  Legacy database architectures
&lt;/h1&gt;

&lt;p&gt;Even today, many businesses still struggle to use their existing data for use cases other than those their databases were designed for. There are several reasons for this, but the main one is the legacy database architectures that were used when those systems were set up.&lt;/p&gt;

&lt;p&gt;A typical architecture consists of a transactional database, also known as OLTP (On-Line Transactional Processing) database, and an analytical one, or OLAP (On-Line Analytical Processing). Data is copied in batches from the OLTP to the OLAP database using Extract-Transform-Load (ETL) processes. These processes are executed on a schedule (e.g., once a day), and the tools are often provided by the same vendors as the database engines. To make this work, these databases are fine-tuned to meet the requirements in terms of performance, availability, etc. This entails the work of developers, database administrators (DBAs), and business analysts to, among other tasks, design the right tables, indexes, and partitions; optimize transactional queries; and schedule maintenance and ETL jobs to minimize the impact on business operations.&lt;/p&gt;

&lt;p&gt;To those challenges, you also need to add the inherent ones of scaling relational databases (TL;DR: it’s hard) and the constraints of using on-premises infrastructure, where you can’t just simply add more storage or spin up a more powerful server in a matter of seconds. The “you’re gonna need a bigger boat” scenario when operating on-premises usually means waiting for months until your database is running on an appropriately-sized server.&lt;/p&gt;

&lt;p&gt;As you can rightly guess, accommodating new use cases in that ecosystem requires plenty of planning. Furthermore, if those use cases go in the direction of near real-time analytics or machine learning, you may find that your legacy databases simply don't fit the bill (and understandably so, as they were not designed for it).&lt;/p&gt;

&lt;h1&gt;
  
  
  Migrate to the cloud you say?
&lt;/h1&gt;

&lt;p&gt;If you have stuck with me so far, you may be thinking: “right, so I need to migrate those databases to the cloud, but...”. Well, that is an &lt;strong&gt;option&lt;/strong&gt; with its own &lt;strong&gt;benefits&lt;/strong&gt;, but I am aware that not everyone is ready or willing to migrate their databases to AWS.&lt;/p&gt;

&lt;p&gt;As I explained in the introduction, the goal of this blog series is to show you how you can start leveraging the flexibility and scale of AWS to build your innovative use cases while keeping your databases where they are and minimizing the operational impact on them. One of the solutions for doing this is called Change Data Capture or CDC.&lt;/p&gt;

&lt;h1&gt;
  
  
  Change Data Capture
&lt;/h1&gt;

&lt;p&gt;To define Change Data Capture (CDC), I’ll simply quote &lt;a href="https://en.wikipedia.org/wiki/Change_data_capture"&gt;its article in Wikipedia&lt;/a&gt;:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;In databases, &lt;em&gt;change data capture&lt;/em&gt; is a set of software design patterns used to determine and track the data that has changed so that action can be taken using the changed data.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;In a nutshell, it allows you to know that "something" has happened shortly after it has happened.&lt;/p&gt;

&lt;p&gt;Although it depends on the source database engine, CDC typically uses the database transaction logs, so there is no need to query the database to extract the data. This has a much lower impact on the database server than continuously reading from the tables you are interested in.&lt;/p&gt;

&lt;p&gt;Implementing such design patterns is not an easy task, but don't fret, as there are offerings that provide out-of-the-box support for CDC.&lt;/p&gt;

&lt;p&gt;There are many options, including &lt;a href="https://partners.amazonaws.com/search/partners?keyword=change%20data%20capture"&gt;AWS partners that have solutions for CDC in the AWS Partner Network&lt;/a&gt; and AWS' own &lt;a href="https://aws.amazon.com/dms/"&gt;AWS Database Migration Service (DMS)&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  AWS Database Migration Service
&lt;/h2&gt;

&lt;p&gt;In this series, I will use &lt;a href="https://aws.amazon.com/dms/"&gt;AWS Database Migration Service (DMS)&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;AWS DMS is a cloud service that makes it easy to migrate relational databases, data warehouses, NoSQL databases, and other types of data stores. You can use AWS DMS to migrate your data into the AWS Cloud or between combinations of cloud and on-premises setups. &lt;/p&gt;

&lt;p&gt;But more importantly (I promised you don't need to move your database from where it is), AWS DMS supports CDC and can run continuous replication jobs, so you can continuously copy data from a source database to different target databases and AWS services.&lt;/p&gt;

&lt;p&gt;AWS DMS can use many of the most popular data engines as a source for data replication, so it's very likely that you can use it with your existing databases.&lt;/p&gt;

&lt;p&gt;The option to use heterogeneous targets is key to implementing different use cases flexibly. For example, you may want to use an Amazon Kinesis Data Stream for near real-time data processing, or Amazon S3 to hydrate a data lake that stores data for analytics or for training machine learning models.&lt;/p&gt;

&lt;h1&gt;
  
  
  Conclusion
&lt;/h1&gt;

&lt;p&gt;In this post I have outlined the issues that traditional database architectures pose when you need to implement new use cases and have described CDC as a way to overcome those limitations. Finally, I have provided a glimpse of some of the AWS DMS capabilities that are relevant for using CDC.&lt;/p&gt;

&lt;p&gt;In &lt;a href="https://dev.to/tasio/unlock-innovation-using-data-from-your-on-premises-databases-ii-streaming-state-changes-2948"&gt;the next blog post&lt;/a&gt; I will provide the technical implementation of a solution to enable CDC using AWS DMS and a Microsoft SQL Server source database.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;&lt;a href="https://unsplash.com/@lazycreekimages?utm_source=unsplash&amp;amp;utm_medium=referral&amp;amp;utm_content=creditCopyText"&gt;Photo by Michael Dziedzic&lt;/a&gt; on &lt;a href="https://unsplash.com/s/photos/data-stream?utm_source=unsplash&amp;amp;utm_medium=referral&amp;amp;utm_content=creditCopyText"&gt;Unsplash&lt;/a&gt;&lt;/em&gt;&lt;/p&gt;

</description>
      <category>aws</category>
      <category>database</category>
      <category>innovation</category>
    </item>
  </channel>
</rss>
