<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Vibhor Agarwal</title>
    <description>The latest articles on DEV Community by Vibhor Agarwal (@vibhor_agarwal).</description>
    <link>https://dev.to/vibhor_agarwal</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1942519%2F6e274193-5fe1-4bc2-bdd9-2e925b9dac37.png</url>
      <title>DEV Community: Vibhor Agarwal</title>
      <link>https://dev.to/vibhor_agarwal</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/vibhor_agarwal"/>
    <language>en</language>
    <item>
      <title>Custom API Analytics with AWS Serverless</title>
      <dc:creator>Vibhor Agarwal</dc:creator>
      <pubDate>Wed, 09 Oct 2024 12:50:05 +0000</pubDate>
      <link>https://dev.to/vibhor_agarwal/custom-api-analytics-with-aws-serverless-3f54</link>
      <guid>https://dev.to/vibhor_agarwal/custom-api-analytics-with-aws-serverless-3f54</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;This document describes the high-level workflow for capturing request data at various stages of a Lambda-backed API, so that it can be analysed later.&lt;/p&gt;

&lt;h2&gt;
  
  
  Objective
&lt;/h2&gt;

&lt;p&gt;Collect API trace data daily into an S3 bucket, in a custom format with the fields a developer might need for analysis.&lt;br&gt;
Data in S3 must be partitioned by year/month/day so that Athena queries can be run on it later (a sample query is sketched below).&lt;br&gt;
The data in S3 should also be downloadable via an API call.&lt;/p&gt;
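&lt;p&gt;For illustration, once the partitioned files are in S3 and an external table has been defined over them, a daily roll-up can be queried with Athena. The sketch below is only indicative; the database name, table name and results location are assumptions, not part of this setup.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import boto3

# Sketch only: 'analytics_db', 'api_trace' and the results location are hypothetical names.
athena = boto3.client('athena')

query = (
    "SELECT year, month, day, count(*) AS calls "
    "FROM api_trace "
    "WHERE year = '2024' AND month = '10' "
    "GROUP BY year, month, day"
)

response = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={'Database': 'analytics_db'},
    ResultConfiguration={'OutputLocation': 's3://my-bucket/athena-results/'}
)
print(response['QueryExecutionId'])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;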

&lt;ol&gt;
&lt;li&gt;A framework utility (indicative code below) can be used to collect data in a dictionary in your main lambda's memory (the utility itself is not described here). For each API call, the code to capture data in memory may look like this:
&lt;/li&gt;
&lt;/ol&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;lambda_handler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;any&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;any&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="c1"&gt;# first thing in lambda handler
&lt;/span&gt;    &lt;span class="n"&gt;analytics_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;MyInMemoryData&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get_if_exists&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;AnalyticsData&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="n"&gt;analytics_data&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;MyInMemoryData&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;register&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;AnalyticsData&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;{})&lt;/span&gt;

    &lt;span class="c1"&gt;# re-initialize this.
&lt;/span&gt;    &lt;span class="n"&gt;MyInMemoryData&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AnalyticsData&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;time&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;int&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;time&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="p"&gt;)),&lt;/span&gt;
                                    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;request_id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;aws_request_id&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;dummy&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
                                    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;ol start="2"&gt;
&lt;li&gt;Capture key outcomes of the API at various stages of the workflow in a dictionary (which lives in Python memory):
&lt;/li&gt;
&lt;/ol&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;    &lt;span class="c1"&gt;# populate data for your in-memory dictionary at various stages of your code flow
&lt;/span&gt;    &lt;span class="n"&gt;MyInMemoryData&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AnalyticsData&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;json_request_body&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;copy&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;deepcopy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;body&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="bp"&gt;...&lt;/span&gt;
    &lt;span class="n"&gt;MyInMemoryData&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AnalyticsData&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;col1&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;MyInMemoryData&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AnalyticsData&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;col2&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;some&lt;/span&gt; &lt;span class="nb"&gt;dict&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="bp"&gt;...&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;h2&gt;
  
  
  Infrastructure as Code for this feature
&lt;/h2&gt;

&lt;p&gt;The snippets below are for the 'serverless' framework with CloudFormation.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://www.serverless.com/framework/docs/providers/aws/guide/serverless.yml" rel="noopener noreferrer"&gt;Serverless Infra as Code&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The 'analytics' lambda is deployed with two triggers:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;messages arriving on an SQS queue&lt;/li&gt;
&lt;li&gt;an EventBridge rule (CRON) that runs daily&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;The 'analytics-download' lambda offloads preparation of the downloadable analytics data on S3&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;messages arriving on a different queue, to which a message is sent when a download API request is received
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;   &lt;span class="na"&gt;analytics&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;handler&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;lambda_analytics.lambda_handler&lt;/span&gt;
    &lt;span class="na"&gt;description&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;Lambda function to read request &amp;amp; response and log into timestream database, also to upload to s3 via CRON schedule&lt;/span&gt;
    &lt;span class="na"&gt;memorySize&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;128&lt;/span&gt;
    &lt;span class="na"&gt;module&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;analytics&lt;/span&gt;
    &lt;span class="na"&gt;events&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
       &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;sqs&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;arn:aws:sqs:${self:provider.region}:${aws:accountId}:${self:provider.environment.SQS_ANALYTICS_NAME}&lt;/span&gt; &lt;span class="c1"&gt;# create entry in timestream&lt;/span&gt;
       &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;schedule&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;cron(0 0 * * ? *)&lt;/span&gt; &lt;span class="c1"&gt;# run daily, as the date changes, and same lambda then queries time stream and uploads data to S3&lt;/span&gt;
   &lt;span class="na"&gt;analytics-download&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;handler&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;lambda_download_analytics.lambda_handler&lt;/span&gt; &lt;span class="c1"&gt;# on a SQS trigger (SQS sent when API all is received), zips and uploads to S3&lt;/span&gt;
    &lt;span class="na"&gt;description&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;Lambda function to process download analytics data&lt;/span&gt;
    &lt;span class="na"&gt;memorySize&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;128&lt;/span&gt;
    &lt;span class="na"&gt;module&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;analytics&lt;/span&gt;
    &lt;span class="na"&gt;events&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;sqs&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;arn:aws:sqs:${self:provider.region}:${aws:accountId}:${self:provider.environment.SQS_DOWNLOAD_ANALYTICS_NAME}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;We also need some other cloud resources:&lt;/p&gt;

&lt;p&gt;1. AnalyticsSQS: FIFO queue that receives the API data; the 'analytics' lambda above consumes it and persists the data into the Timestream database&lt;/p&gt;

&lt;p&gt;2. AnalyticsTimeStreamDB: Timestream database&lt;/p&gt;

&lt;p&gt;3. AnalyticsTimeStreamTable: Table under the database; note that we don't write to the magnetic store, only to the memory store. The memory store only accepts records with timestamps less than 1 hour old, which is fine because we write almost instantly. Data then moves to the magnetic store (cheaper), where it is retained for 7 days only&lt;/p&gt;

&lt;p&gt;4. AnalyticsAsyncLambdaSNSDestination, AnalyticsAsyncLambdaSNSEmailSubscription1: we also need an SNS topic and an email subscription to that topic, to alert developers via email in case this functionality breaks (since this feature is not exposed via the API, users would not report failures)&lt;/p&gt;

&lt;p&gt;5. AnalyticsS3Bucket: S3 bucket to finally persist the Timestream data. Data stays in the STANDARD tier for 2 months so that it can be downloaded during that period, after which&lt;br&gt;
   it is moved to the INFREQUENT ACCESS tier, where it stays for another 30 days, before being archived in GLACIER. The data expires 1 year after its creation.&lt;br&gt;
   There is another lifecycle rule for files under the tmp/ prefix in this bucket, which deletes them after just 1 day. This supports admin requests to download data&lt;br&gt;
   over a date range: the results are zipped and uploaded under the tmp/ prefix, and served to users via a pre-signed URL (a sketch of generating one follows the resources snippet below). Since this URL is&lt;br&gt;
   active only for a few minutes, we do not need long retention for tmp/ files; the contents of this prefix are cleaned up automatically after 1 day by the S3 lifecycle rule.&lt;/p&gt;

&lt;p&gt;6. AnalyticsDataDownloadSQS: Queue to offload download-data requests. The main lambda that receives API requests can offload the work to this queue, which is then processed by the 'analytics-download' lambda.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;
&lt;span class="na"&gt;Resources&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;AnalyticsSQS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;AWS::SQS::Queue&lt;/span&gt;
      &lt;span class="na"&gt;Properties&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;QueueName&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${self:provider.environment.SQS_ANALYTICS_NAME}&lt;/span&gt;
        &lt;span class="na"&gt;FifoQueue&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;
        &lt;span class="na"&gt;ReceiveMessageWaitTimeSeconds&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;20&lt;/span&gt; &lt;span class="c1"&gt;# long polling&lt;/span&gt;
        &lt;span class="na"&gt;MessageRetentionPeriod&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;600&lt;/span&gt; &lt;span class="c1"&gt;# 10 mins&lt;/span&gt;
        &lt;span class="na"&gt;VisibilityTimeout&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;10&lt;/span&gt; &lt;span class="c1"&gt;# for few seconds, not visible to other consumers once received by a consumer&lt;/span&gt;
    &lt;span class="na"&gt;AnalyticsTimeStreamDB&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;AWS::Timestream::Database&lt;/span&gt;
      &lt;span class="na"&gt;Properties&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;DatabaseName&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${self:provider.environment.TIME_STREAM_DB_NAME}&lt;/span&gt;
    &lt;span class="na"&gt;AnalyticsTimeStreamTable&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;AWS::Timestream::Table&lt;/span&gt;
      &lt;span class="na"&gt;Properties&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;DatabaseName&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${self:provider.environment.TIME_STREAM_DB_NAME}&lt;/span&gt;
        &lt;span class="na"&gt;TableName&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${self:provider.environment.TIME_STREAM_TABLE_NAME}&lt;/span&gt;
        &lt;span class="na"&gt;MagneticStoreWriteProperties&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
          &lt;span class="na"&gt;EnableMagneticStoreWrites&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="kc"&gt;false&lt;/span&gt;
        &lt;span class="na"&gt;RetentionProperties&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
          &lt;span class="na"&gt;MagneticStoreRetentionPeriodInDays&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;7&lt;/span&gt; &lt;span class="c1"&gt;# move to magnetic store after 1 hr&lt;/span&gt;
          &lt;span class="na"&gt;MemoryStoreRetentionPeriodInHours&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;  &lt;span class="c1"&gt;# retain only minimal period here&lt;/span&gt;
    &lt;span class="na"&gt;AnalyticsAsyncLambdaSNSDestination&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;AWS::SNS::Topic&lt;/span&gt;
      &lt;span class="na"&gt;Properties&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;DisplayName&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Analytics&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;Lambda&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;Failure&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;Notiffications'&lt;/span&gt;
        &lt;span class="na"&gt;TopicName&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;SNS_TOPIC_NAME&lt;/span&gt;  &lt;span class="c1"&gt;# implement your email notif on failures&lt;/span&gt;
    &lt;span class="na"&gt;AnalyticsAsyncLambdaSNSEmailSubscription1&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;AWS::SNS::Subscription&lt;/span&gt;
      &lt;span class="na"&gt;Properties&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;Endpoint&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${self:provider.environment.ADMIN_USERS}&lt;/span&gt;
        &lt;span class="na"&gt;Protocol&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;email"&lt;/span&gt;
        &lt;span class="na"&gt;TopicArn&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;{&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Ref"&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;AnalyticsAsyncLambdaSNSDestination"&lt;/span&gt; &lt;span class="pi"&gt;}&lt;/span&gt;
    &lt;span class="na"&gt;AnalyticsS3Bucket&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;AWS::S3::Bucket&lt;/span&gt;
      &lt;span class="na"&gt;Properties&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;BucketName&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${self:provider.environment.ANALYTICS_BUCKET}&lt;/span&gt;
        &lt;span class="na"&gt;AccessControl&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;Private&lt;/span&gt;
        &lt;span class="na"&gt;AccelerateConfiguration&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
          &lt;span class="na"&gt;AccelerationStatus&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;Enabled&lt;/span&gt;
        &lt;span class="na"&gt;LifecycleConfiguration&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
          &lt;span class="na"&gt;Rules&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
            &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;Id&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;TmpFilesDeleteRule&lt;/span&gt; &lt;span class="c1"&gt;# user requests keep tmp zip here, and serve them via pre-signed URLs, they are no longer needed when user downloads&lt;/span&gt;
              &lt;span class="na"&gt;Prefix&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;tmp/&lt;/span&gt;
              &lt;span class="na"&gt;Status&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;Enabled&lt;/span&gt;
              &lt;span class="na"&gt;ExpirationInDays&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt; &lt;span class="c1"&gt;# run deletion midnight UTC following creation&lt;/span&gt;
            &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;Id&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;ToIAToGlacierRule&lt;/span&gt; &lt;span class="c1"&gt;# analytics requests collected need to remain in standard tier for several days for users to download historic usage stats&lt;/span&gt;
              &lt;span class="na"&gt;Status&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;Enabled&lt;/span&gt;
              &lt;span class="na"&gt;ExpirationInDays&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;365&lt;/span&gt;
              &lt;span class="na"&gt;Transitions&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
                &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;TransitionInDays&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;60&lt;/span&gt;  &lt;span class="c1"&gt;# allow fetch of upton 2 months of data in standard and then push to IA. Amazon S3 does not transition objects smaller than 128 KB to the Standard-IA&lt;/span&gt;
                  &lt;span class="na"&gt;StorageClass&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;STANDARD_IA&lt;/span&gt;
                &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;TransitionInDays&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;90&lt;/span&gt;  &lt;span class="c1"&gt;# remains in IA for 30 days, then pushed to archive&lt;/span&gt;
                  &lt;span class="na"&gt;StorageClass&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;GLACIER&lt;/span&gt;
    &lt;span class="na"&gt;AnalyticsDataDownloadSQS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;Type&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;AWS::SQS::Queue&lt;/span&gt;
      &lt;span class="na"&gt;Properties&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;QueueName&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${self:provider.environment.SQS_DOWNLOAD_ANALYTICS_NAME}&lt;/span&gt;
        &lt;span class="na"&gt;ReceiveMessageWaitTimeSeconds&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;20&lt;/span&gt; &lt;span class="c1"&gt;# long polling&lt;/span&gt;
        &lt;span class="na"&gt;MessageRetentionPeriod&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;300&lt;/span&gt; &lt;span class="c1"&gt;# 5 mins&lt;/span&gt;
        &lt;span class="na"&gt;VisibilityTimeout&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;31&lt;/span&gt; &lt;span class="c1"&gt;# for few seconds, not visible to other consumers once received by a consumer&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
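&lt;p&gt;For the tmp/ objects mentioned under the S3 bucket above, the download API hands back a time-limited pre-signed URL rather than the file itself. A minimal sketch of generating one, assuming the zip has already been uploaded (bucket and key below are placeholders):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import boto3

s3 = boto3.client('s3')

# Sketch only: bucket and key are placeholders for the zip created by 'analytics-download'
url = s3.generate_presigned_url(
    'get_object',
    Params={'Bucket': 'my-bucket', 'Key': 'tmp/analytics-export.zip'},
    ExpiresIn=300  # active only for a few minutes, matching the short tmp/ retention
)
print(url)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;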



&lt;p&gt;Refer to the architecture diagram below for the numbered flows.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvjp5xcdmt95wzyn1z4kb.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvjp5xcdmt95wzyn1z4kb.png" alt="Analytics Architecture" width="800" height="495"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  FLOW 1: Analytics - collecting API trace in time stream DB
&lt;/h3&gt;

&lt;p&gt;1. Before sending the final API response to the user, the collected analytics data (the in-memory dictionary) is sent to a FIFO queue (why FIFO? to guarantee that whatever is sent first is processed first, though ordering is not critical here).&lt;/p&gt;
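&lt;p&gt;Sending the collected data to the FIFO queue could look like the sketch below. The queue URL environment variable and the message group id are assumptions; note that FIFO queues require a MessageGroupId, plus either content-based deduplication on the queue or an explicit MessageDeduplicationId.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import json
import os

import boto3

sqs = boto3.client('sqs')

# Sketch only: SQS_ANALYTICS_URL is an assumed env var holding the FIFO queue URL
sqs.send_message(
    QueueUrl=os.environ['SQS_ANALYTICS_URL'],
    MessageBody=json.dumps(MyInMemoryData.AnalyticsData),
    MessageGroupId='api-analytics',  # required for FIFO queues
    MessageDeduplicationId=MyInMemoryData.AnalyticsData['request_id']
)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;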

&lt;p&gt;2. A message on the queue triggers the lambda function (see lambda_analytics.lambda_handler). For this SQS-based trigger, create a Timestream record.&lt;br&gt;
Create a single entry in the AWS Timestream database (the various attributes in the dict become the various measures) - this is a single multi-measure record. In case of any errors, an email notification with the exception trace can be sent to the admins.&lt;br&gt;
Note that if the lambda throws an error, the message is retained in SQS and retried; you may want to handle the&lt;br&gt;
exception and be notified, rather than keep unprocessed messages in the queue.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aws.amazon.com/blogs/database/store-and-analyze-time-series-data-with-multi-measure-records-magnetic-storage-writes-and-scheduled-queries-in-amazon-timestream/" rel="noopener noreferrer"&gt;How to create multi measure record&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Implement your own logic to create a single record with its measures. Key points:&lt;/p&gt;

&lt;p&gt;a. Map the right data type to each measure; for example, you can use the measure name to indicate the type of data - string/numeric. 'json' values can be converted to a string and persisted in Timestream.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;prepare_measure&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;measure_name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;prepare measure with attrs name, value and type
        Convert all JSON to VARCHAR and numeric type as BIGINT
        Args:
            payload: user data as dict
            measure_name: name of measure

        Returns:
            dict of measure
        &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="n"&gt;_type&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;VARCHAR&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
        &lt;span class="k"&gt;elif&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;count&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;measure_name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;measure_name&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;measure_name&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="c1"&gt;# indicates not relevant for when this measure has no collected data in api
&lt;/span&gt;            &lt;span class="n"&gt;measure_value&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;measure_name&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
            &lt;span class="n"&gt;measure_type&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;BIGINT&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
        &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="c1"&gt;# measures such as request_id, user_id (varchar)
&lt;/span&gt;            &lt;span class="c1"&gt;# always expected in payload
&lt;/span&gt;            &lt;span class="n"&gt;measure_value&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;measure_name&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
            &lt;span class="n"&gt;measure_type&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;VARCHAR&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;

        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;measure_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;measure_value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Type&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;measure_type&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;b. Add measures to the row itself for year, month and day, so that you can query on them later.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;prepare_partition_measures&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;list&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;prepare measure to use as partition

        Args:
            timestamp: timestamp as string for this time stream entry

        Returns:
            list of dicts, each as a time stream measure containing Name, Value &amp;amp; Type
        &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="n"&gt;year&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;month&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;day&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;split_date_time&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="c1"&gt;# implement split
&lt;/span&gt;        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;[{&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;year&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;year&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Type&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;VARCHAR&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
        &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;month&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;month&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Type&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;VARCHAR&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
        &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;day&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;day&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Type&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;VARCHAR&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
        &lt;span class="p"&gt;}]&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;c. Prepare the common attributes.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;   &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;prepare_common_attributes&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
       &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;prepare common attributes for timestream such as dimensions, measure
       name and type as MULTI
       Args:
           payload: dict of user data

       Returns:
           dict
       &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
       &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
           &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Dimensions&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;
               &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;api&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Value&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;request_url&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]}&lt;/span&gt;
           &lt;span class="p"&gt;],&lt;/span&gt;
           &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;MeasureName&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;input_output&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
           &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;MeasureValueType&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;MULTI&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
       &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;d. Capture the exception when the write fails.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;    &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="c1"&gt;# Write records to Timestream
&lt;/span&gt;        &lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;write_records&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;DatabaseName&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;db&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;TableName&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;table&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;CommonAttributes&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nf"&gt;prepare_common_attributes&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="n"&gt;Records&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="c1"&gt;# pylint: disable=broad-except
&lt;/span&gt;    &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;exceptions&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;RejectedRecordsException&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;RejectedRecords: &lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;""&lt;/span&gt;
        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;RejectedRecords&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]:&lt;/span&gt;
            &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="o"&gt;+=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Rejected Index: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;RecordIndex&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; : &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Reason&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
        &lt;span class="k"&gt;raise&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
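&lt;p&gt;The email notification mentioned above relies on the SNS topic (AnalyticsAsyncLambdaSNSDestination) and its email subscription. How the lambda is wired to the topic is not shown in the snippets; one straightforward option is an explicit publish before re-raising. A minimal sketch, where the topic ARN environment variable name is an assumption:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import os

import boto3

def notify_admins(subject: str, message: str) -&amp;gt; None:
    """Publish a failure notification to the analytics SNS topic (sketch).

    ANALYTICS_SNS_TOPIC_ARN is an assumed env var holding the topic ARN;
    the email subscription on the topic delivers the message to the admins.
    """
    sns = boto3.client('sns')
    sns.publish(
        TopicArn=os.environ['ANALYTICS_SNS_TOPIC_ARN'],
        Subject=subject[:100],  # SNS subjects are limited to 100 characters
        Message=message
    )
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;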



&lt;p&gt;3. Data is retained in the Timestream memory store only for a minimal period (1 hour), after which it moves to the 'magnetic' store and is kept there for a week (the magnetic store can still be queried). Data is written only to the memory store, never to the magnetic store: 'EnableMagneticStoreWrites' is set to 'false'. We don't need magnetic-store writes because the record is written almost as soon as the API request comes in (API processing takes only a few seconds), so the record's timestamp is stale by only a few seconds.&lt;/p&gt;

&lt;p&gt;From AWS: "The memory store is optimized for high throughput data writes and fast point-in-time queries.&lt;br&gt;
The magnetic store is optimized for lower throughput late-arriving data writes, long term data storage, and fast analytical queries."&lt;/p&gt;
&lt;h3&gt;
  
  
  FLOW 2: Analytics - uploading time stream data to S3
&lt;/h3&gt;

&lt;p&gt;1. The lambda "lambda_analytics.lambda_handler" is also triggered daily at 00:00 GMT by an EventBridge rule (CRON job). Since this trigger is a scheduled event rather than an SQS message, the handler branches on the event source and invokes the 'load_timestream_data_to_s3' method (we use the same lambda both to create the Timestream entry and to upload data to S3, to keep the logic together and for traceability).&lt;br&gt;
&lt;/p&gt;
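&lt;p&gt;The branching itself might look like the sketch below; the event fields follow the standard SQS and EventBridge scheduled-event shapes, and write_timestream_record is a hypothetical helper standing in for the Timestream write shown earlier.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;def lambda_handler(event: dict, context) -&amp;gt; dict:
    # SQS triggers deliver a 'Records' list with eventSource 'aws:sqs';
    # the daily EventBridge (CRON) trigger arrives as a scheduled event instead.
    if 'Records' in event and event['Records'][0].get('eventSource') == 'aws:sqs':
        for record in event['Records']:
            write_timestream_record(record['body'])  # hypothetical helper: persist one multi-measure record
    elif event.get('source') == 'aws.events':
        load_timestream_data_to_s3()  # daily upload of yesterday's data to S3
    return {'statusCode': 200}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;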

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;load_timestream_data_to_s3&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;int&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;query time stream DB
     Returns:
         count of rows processed
     &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

    &lt;span class="c1"&gt;# Initialize the Timestream write client
&lt;/span&gt;    &lt;span class="n"&gt;client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;boto3&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;client&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;timestream-query&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Execute the query
&lt;/span&gt;    &lt;span class="n"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;query&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;QueryString&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nf"&gt;get_query&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;

    &lt;span class="c1"&gt;# Extract the column names and rows
&lt;/span&gt;    &lt;span class="n"&gt;columns&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;column&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;column&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;ColumnInfo&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]]&lt;/span&gt;

    &lt;span class="n"&gt;rows&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Rows&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="n"&gt;rows&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;nothing found in query to upload&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;

    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;rows&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; found for upload..&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;buffer_map&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_key&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;get key to use
        Include prefix, and partition if any
        Returns:
            get the S3 bucket key to use
        &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="c1"&gt;# day, month, year are at fixed positions at end
&lt;/span&gt;        &lt;span class="n"&gt;day&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;row_data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;row_data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
        &lt;span class="n"&gt;month&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;row_data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;row_data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
        &lt;span class="n"&gt;year&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;row_data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;row_data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
        &lt;span class="c1"&gt;# prepare the partition
&lt;/span&gt;        &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;my_analytics/year=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
        &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="o"&gt;+=&lt;/span&gt; &lt;span class="n"&gt;year&lt;/span&gt;
        &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="o"&gt;+=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/month=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
        &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="o"&gt;+=&lt;/span&gt; &lt;span class="n"&gt;month&lt;/span&gt;
        &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="o"&gt;+=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/day=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
        &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="o"&gt;+=&lt;/span&gt; &lt;span class="n"&gt;day&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;

    &lt;span class="c1"&gt;# assuming 128 MB would be sufficient to hold data in mem before dumping to s3
&lt;/span&gt;    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;create_buffer_write&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
        &lt;span class="c1"&gt;# Prepare the CSV data
&lt;/span&gt;        &lt;span class="n"&gt;csv_buffer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;io&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;StringIO&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;csv_writer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;csv&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;writer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;csv_buffer&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="c1"&gt;# Write the header
&lt;/span&gt;        &lt;span class="n"&gt;csv_writer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;writerow&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;columns&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;buffer&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;csv_buffer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;writer&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;csv_writer&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="c1"&gt;# Write the rows
&lt;/span&gt;    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;row&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;rows&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;row_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;ScalarValue&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;NA&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;row&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Data&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]]&lt;/span&gt;
        &lt;span class="n"&gt;obj_key&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;get_key&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;obj_key&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;buffer_map&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;buffer_map&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;obj_key&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;create_buffer_write&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;buffer_map&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;obj_key&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;writer&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nf"&gt;writerow&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;row_data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="nf"&gt;do_upload&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;buffer_map&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;rows&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_query&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;create and return query to use

    Returns:
        string query
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="c1"&gt;# query exactly entire yesterday's data
&lt;/span&gt;
    &lt;span class="c1"&gt;# Define the database and table names
&lt;/span&gt;    &lt;span class="n"&gt;db&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;environ&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;TIME_STREAM_DB_NAME&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;table&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;environ&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;TIME_STREAM_TABLE_NAME&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Define the Timestream query, 1 day ago's data
&lt;/span&gt;    &lt;span class="n"&gt;ago_range&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;1d&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;

    &lt;span class="c1"&gt;# these cols are expected based on how we inserted the data
&lt;/span&gt;    &lt;span class="n"&gt;query_string&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;SELECT col1, col2, coln, year, month, day FROM &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s"&gt;{db}&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s"&gt;.&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s"&gt;{table}&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
        &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt; WHERE time &amp;gt;= date_trunc(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;day&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;, ago(&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;ago_range&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;))&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt; AND time &amp;lt; date_trunc(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;day&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;, now()) ORDER BY time ASC&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;query_string&lt;/span&gt;


&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;do_upload&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;buffer_map&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Upload data to s3

    Args:
        buffer_map: with key as object key and value as dict of buffer and writer objects

    Returns:
        nothing
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;s3&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;boto3&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;client&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s3&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="c1"&gt;# Define the S3 bucket and object key
&lt;/span&gt;    &lt;span class="n"&gt;bucket_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;my-bucket&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;obj_key&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;buffer_map&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;s3_key&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;join&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;obj_key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;uuid&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;uuid4&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;.csv&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="c1"&gt;# Upload the CSV data to S3
&lt;/span&gt;        &lt;span class="nb"&gt;buffer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="nb"&gt;buffer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;buffer_map&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;obj_key&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;buffer&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
            &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;  uploading to bucket &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;bucket_name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; with key &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;s3_key&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;s3&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;put_object&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;Bucket&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;bucket_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;s3_key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Body&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nb"&gt;buffer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getvalue&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
        &lt;span class="k"&gt;finally&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
             &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="nb"&gt;buffer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="nb"&gt;buffer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;2. The code forms a Timestream query for the previous day's data, prepares data buffers in memory, and uses the day/month/year fields in each record to build the partition key under which a file with a unique name is finally uploaded (a sketch of this step follows after this list).&lt;/p&gt;

&lt;p&gt;3. In case of any errors, an email notification with the exception trace can be sent to the administrators; implement this as suits your setup.&lt;/p&gt;

&lt;p&gt;4. This partitioned data lake can now be used to run analytical queries with Athena (create the table and load partitions when needed), or the data can be downloaded for analysis via the console or via the APIs described below.&lt;/p&gt;

&lt;p&gt;5. Note that the timestamp captured here can be used to follow the complete trace in CloudWatch Logs, because it marks the moment the request reached the lambda function. This should be the first thing your lambda does: instantiate the analytics record with the current timestamp.&lt;/p&gt;
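
&lt;p&gt;To make step 2 above concrete, here is a minimal sketch of how the Timestream results could be paged through and grouped into the buffer_map that do_upload expects. The build_buffer_map helper, the column handling and the exact partition prefix format are illustrative assumptions, not the production code.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import csv
import io

import boto3


def build_buffer_map(query_string: str) -&amp;gt; dict:
    """Run the Timestream query and group rows into CSV buffers keyed by partition prefix."""
    ts_query = boto3.client('timestream-query')
    paginator = ts_query.get_paginator('query')

    buffer_map = {}
    for page in paginator.paginate(QueryString=query_string):
        columns = [col['Name'] for col in page['ColumnInfo']]
        for row in page['Rows']:
            values = [cell.get('ScalarValue', '') for cell in row['Data']]
            record = dict(zip(columns, values))

            # Partition prefix assumed to mirror the layout used by the download flow
            obj_key = (f"my_analytics/year={record['year']}"
                       f"/month={record['month']}/day={record['day']}")

            if obj_key not in buffer_map:
                buffer = io.StringIO()
                writer = csv.writer(buffer)
                writer.writerow(columns)  # header row
                buffer_map[obj_key] = {"buffer": buffer, "writer": writer}

            buffer_map[obj_key]["writer"].writerow(values)

    return buffer_map
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The scheduled lambda can then call do_upload(build_buffer_map(query_string)) to produce one uniquely named CSV per partition.&lt;/p&gt;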

&lt;h3&gt;
  
  
  FLOW 3: Analytics - download files for analytics
&lt;/h3&gt;

&lt;p&gt;Build an API to download analytics data, such as: /download/usage?start_date=yyyy-MM-dd&amp;amp;end_date=yyyy-MM-dd&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Approach/ Implementation for this API&lt;/em&gt;:&lt;/p&gt;

&lt;p&gt;The user provides a date range; the API prepares a pre-signed URL for the "expected" object (which need not exist yet) and sends the request to another lambda for processing via SQS.&lt;/p&gt;

&lt;p&gt;Sending the message to SQS produces an ID, referred to below as 'message_id'.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;    &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;start_date&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;start_date&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;end_date&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;end_date&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;message_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;AnalyticsDataDownloadSQS&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;publish_queue_data&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="c1"&gt;#implement your logic
&lt;/span&gt;
    &lt;span class="c1"&gt;# use the file name : tmp/message_id.zip
&lt;/span&gt;    &lt;span class="n"&gt;download_link&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;generate_presigned_download_url&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# generate the pre signed URL even when download file is not ready, with a message
&lt;/span&gt;    &lt;span class="c1"&gt;# this avoids sending an email later, when zip is done
&lt;/span&gt;    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;result&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ok&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;message&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Your file is being prepared, use the link to download file after couple of   minutes&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;download_link&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;download_link&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
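

&lt;p&gt;The generate_presigned_download_url helper is not shown above; a minimal sketch using the standard S3 client could look like the following. The bucket name and expiry are illustrative assumptions; the 'tmp/{message_id}.zip' key convention matches the comment in the snippet above.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import boto3


def generate_presigned_download_url(message_id: str,
                                    bucket_name: str = "my-bucket",
                                    expires_in: int = 3600) -&amp;gt; str:
    """Pre-sign a GET for the zip that the processing lambda will create later."""
    s3 = boto3.client('s3')
    # The object does not need to exist yet; the URL becomes usable once the zip is uploaded
    return s3.generate_presigned_url(
        'get_object',
        Params={'Bucket': bucket_name, 'Key': f"tmp/{message_id}.zip"},
        ExpiresIn=expires_in)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;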



&lt;p&gt;The processing lambda "lambda_download_analytics.lambda_handler" iterates over the date range, downloads data from the S3 analytics bucket into the '/tmp' location (the lambda's ephemeral storage), zips the files, and uploads the archive to a 'tmp' prefix in S3 under a pre-defined name that matches the download link already returned to the API caller.&lt;/p&gt;

&lt;p&gt;The name of the zip is '{message_id}.zip' - the same file name used when creating the pre-signed URL for the user to download.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;    &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;makedirs&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;/tmp&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;exist_ok&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="nf"&gt;delete_files_in_directory&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;/tmp&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="c1"&gt;# implement your logic
&lt;/span&gt;
    &lt;span class="c1"&gt;# Iterate over the date range
&lt;/span&gt;    &lt;span class="n"&gt;current_date&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;strptime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;start_date&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;%Y-%m-%d&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;date&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;end_date&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;strptime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;end_date&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;%Y-%m-%d&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;date&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="n"&gt;s3&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;boto3&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;client&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s3&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Define the S3 bucket and object key
&lt;/span&gt;    &lt;span class="n"&gt;bucket_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;ANALYTICS_BUCKET&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt; &lt;span class="c1"&gt;# implement your code
&lt;/span&gt;
    &lt;span class="c1"&gt;# download for both inclusive dates
&lt;/span&gt;    &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="n"&gt;current_date&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;=&lt;/span&gt; &lt;span class="n"&gt;end_date&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;prefix&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;my_analytics/year=&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;current_date&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;year&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;/month=&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;current_date&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;strftime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;%B&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;/day=&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;current_date&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;day&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
        &lt;span class="n"&gt;f_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;current_date&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;year&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;-&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;current_date&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;strftime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;%B&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;-&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;current_date&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;day&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
        &lt;span class="n"&gt;current_date&lt;/span&gt; &lt;span class="o"&gt;+=&lt;/span&gt; &lt;span class="nf"&gt;timedelta&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;days&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;listing contents under prefix &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;prefix&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;s3&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;list_objects_v2&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;Bucket&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;bucket_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Prefix&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;prefix&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Contents&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;obj&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Contents&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]:&lt;/span&gt;
                &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;obj&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Key&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
                &lt;span class="n"&gt;download_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;f_name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;-&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;basename&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
                &lt;span class="n"&gt;download_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;join&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;/tmp&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;download_name&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="n"&gt;s3&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;download_file&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bucket_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;download_name&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;    downloaded &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; as &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;download_nam&lt;/span&gt;
                &lt;span class="n"&gt;at_least_one_file_found&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Truee&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;   no files found&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# name should align  as the pre signed
&lt;/span&gt;    &lt;span class="c1"&gt;# URL was already generated for this name
&lt;/span&gt;    &lt;span class="c1"&gt;# and sent back to caller when request was submitted
&lt;/span&gt;    &lt;span class="n"&gt;s3_key&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;tmp/&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;message_id&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;.zip&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="c1"&gt;# Create an in-memory bytes buffer
&lt;/span&gt;    &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="n"&gt;io&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;BytesIO&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="nb"&gt;buffer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;

        &lt;span class="c1"&gt;# zip buffer may be empty if no files were found in the given date range
&lt;/span&gt;        &lt;span class="nf"&gt;zip_files&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;buffer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/tmp&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="n"&gt;at_least_one_file_found&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;no files were found in the given date range, empty zip file being created !!&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="n"&gt;s3&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;upload_fileobj&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;buffer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;bucket_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;s3_key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Uploaded zip to s3://&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;bucket_name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;/&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;s3_key&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="nf"&gt;delete_files_in_directory&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;/tmp&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
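

&lt;p&gt;The zip_files helper used above is not shown; a minimal sketch with the standard zipfile module, assuming it should add every file under the given directory to the in-memory buffer, might be:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import os
import zipfile


def zip_files(buffer, directory: str) -&amp;gt; None:
    """Write all files found under `directory` into `buffer` as a zip archive."""
    with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(directory):
            for name in files:
                full_path = os.path.join(root, name)
                # Store entries relative to the directory being zipped
                zf.write(full_path, arcname=os.path.relpath(full_path, directory))
    # Rewind so that upload_fileobj reads from the start of the buffer
    buffer.seek(0)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;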



</description>
      <category>timestream</category>
      <category>aws</category>
      <category>serverless</category>
      <category>sqs</category>
    </item>
    <item>
      <title>Customized Scaling of AWS ECS</title>
      <dc:creator>Vibhor Agarwal</dc:creator>
      <pubDate>Sat, 17 Aug 2024 16:44:16 +0000</pubDate>
      <link>https://dev.to/vibhor_agarwal/customized-scaling-of-aws-ecs-39ge</link>
      <guid>https://dev.to/vibhor_agarwal/customized-scaling-of-aws-ecs-39ge</guid>
      <description>&lt;p&gt;&lt;strong&gt;Summary&lt;/strong&gt;&lt;br&gt;
There are multiple use cases to containerize and host proprietary applications on AWS ECS which is “a fully managed container orchestration service that makes it easy for you to deploy, manage, and scale containerized applications”&lt;/p&gt;

&lt;p&gt;Scaling ECS then is one of the key needs of any application. This article describes challenges with ECS supported scaling and describes a custom solution to alleviate them. Thanks to &lt;a href="https://www.linkedin.com/in/edwin-essenius-1960951/?originalSubdomain=nl" rel="noopener noreferrer"&gt;Edwin Essenius&lt;/a&gt; for the mentorship.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Problem Statement&lt;/strong&gt;&lt;br&gt;
In this scenario, the application hosted on AWS ECS can process a wide range of requests, each with unpredictable compute usage and uncertain execution time.&lt;/p&gt;

&lt;p&gt;The core back-end architecture on AWS is based on asynchronous processing, where SQS receives the requests. The application on AWS ECS (Fargate) polls SQS continuously for incoming requests and processes them.&lt;/p&gt;

&lt;p&gt;One of our use cases is that users can split a single large request into multiple smaller requests and distribute them asynchronously to the cloud for parallel processing, reducing the overall processing time by 10-12 times (from hours to minutes). ECS needs to scale quickly &amp;amp; accurately to serve these spikes in demand. &lt;/p&gt;

&lt;p&gt;The ECS task needs to run the application for an uncertain amount of time, which could be seconds or days. Compute usage statistics are not a dependable way to tell whether a particular task is active, which means a scale-in action can kill active tasks and jobs - highly undesirable.&lt;br&gt;
Implementing scaling with the policies below to meet these requirements is a challenge.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Target Tracking Scaling Policies&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Link below describe automatic scaling support from ECS.&lt;br&gt;
&lt;a href="https://docs.aws.amazon.com/AmazonECS/latest/developerguide/service-auto-scaling.html" rel="noopener noreferrer"&gt;https://docs.aws.amazon.com/AmazonECS/latest/developerguide/service-auto-scaling.html&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The default automatic scaling policy support from ECS can increase or decrease the number of tasks that your service runs based on a target value for a specific metric. &lt;/p&gt;

&lt;ol&gt;
&lt;li&gt; The scale-out action is based on compute usage spikes, but compute usage can be unpredictable: some requests consume negligible compute yet still need to be processed in parallel with others&lt;/li&gt;
&lt;li&gt; Inability to configure the exact number of tasks needed with out-of-the-box basic scaling&lt;/li&gt;
&lt;li&gt; Default scale out takes minutes before application in the container picks requests for processing&lt;/li&gt;
&lt;li&gt; Default scale-in takes 5-15 minutes, wasting compute and adding unnecessary cost. More importantly, the scale-in policy cannot identify &amp;amp; stop the actually idle tasks based on compute usage alone, and may end up stopping active tasks!&lt;/li&gt;
&lt;li&gt; On a scale-in event, the termination signal sent by ECS gives tasks 30 seconds to complete their job, which may not be possible depending on the nature of the application.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;em&gt;Step Scaling Policies&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;The limitations of basic target-tracking policies drove the need to set up customized step auto-scaling policies based on CloudWatch metrics for the request count on SQS (possible with both the defaults from ECS and custom metrics).&lt;br&gt;
These policies use CloudWatch alarms and aggregate metric data points based on the statistic for the metric. On breach of the alarm, the appropriate scaling policy is invoked. &lt;br&gt;
Step scaling policies are complex and incapable of deciding the exact desired count needed. Additionally, CloudWatch alarms are costly &amp;amp; slow to respond. &lt;br&gt;
The core issue of slow scaling (out or in) stays unresolved, and the scaling policy cannot find the idle tasks to stop when scaling in, which is the key requirement. Even advanced customized auto-scaling policies can only “approximate” scaling needs.&lt;/p&gt;

&lt;p&gt;The link below is an interesting read on how cluster auto scaling works, the complexity and math applied in implementing scaling policy.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aws.amazon.com/blogs/containers/deep-dive-on-amazon-ecs-cluster-auto-scaling/" rel="noopener noreferrer"&gt;https://aws.amazon.com/blogs/containers/deep-dive-on-amazon-ecs-cluster-auto-scaling/&lt;/a&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Custom Solution
&lt;/h2&gt;

&lt;p&gt;&lt;em&gt;Summary&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Given the multiple limitations in what ECS supports by default, there is a need to build a custom scaling solution that updates the desired count on the AWS ECS Fargate service directly. In the example above, the desired count is known by querying SQS for the available message count.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Extend solution with ECS Capacity Providers&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;An EC2 auto-scaling group can provide capacity to ECS instead of the serverless Fargate option; preferred in certain scenarios such as:&lt;/p&gt;

&lt;p&gt;a.  ECS Fargate does not support exceptionally large compute (up to 16 vCPUs at present)&lt;br&gt;
b.  Container image caching is another valuable proposition when using ECS with EC2, especially when images are large; Fargate currently does not offer image caching. Caching allows a single EC2 instance to run multiple instances of the container while downloading the container image only once for that EC2.&lt;br&gt;
c.  An EC2 warm pool saves on instance provisioning time, unlike Fargate instances&lt;br&gt;
d.  Need for more control over infrastructure, for example, specific OS configuration.&lt;/p&gt;

&lt;p&gt;The solution is extended to support ECS with EC2 by simultaneously updating the auto-scaling group (ASG) configured as the “capacity provider” of the ECS service. This is more complex to design &amp;amp; configure.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Solution Components &amp;amp; Configuration&lt;/em&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt; AWS ECS runs application container on Fargate or uses auto-scaling group (ASG) as the “capacity provider”&lt;/li&gt;
&lt;li&gt; An EventBridge bus with a set of rules intercepts the scaling events and triggers a lambda (target) that runs the ECS scaling logic, referred to from now on as ecs-scaling-lambda (see the sketch after this list)&lt;/li&gt;
&lt;li&gt; ecs-scaling-lambda’s environment is prepared with required properties to talk to AWS such as queue name, ECS cluster &amp;amp; service details, min/max desired count. With ASG,  attributes such as per EC2 capacity, min/max ASG count.&lt;/li&gt;
&lt;li&gt; Configure ecs-scaling-lambda as the ASG custom termination policy (for ECS with EC2). Per the documentation, Amazon EC2 Auto Scaling uses termination policies to prioritize which instances to terminate first when decreasing the size of your Auto Scaling group (referred to as scaling in). However, this also works fine for stopping EC2 instances to return them to the warm pool.&lt;/li&gt;
&lt;li&gt; Design lambda to respond to scale up and scale down events. Additionally, configure lambda to respond to auto-scaling service scale-down event, with the list of EC2 to stop (needed when ASG is the “capacity provider”)&lt;/li&gt;
&lt;/ol&gt;
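
&lt;p&gt;For point 2 above, the EventBridge rule and its lambda target can be created with boto3 (or with the IaC tool of your choice). A minimal sketch follows; the bus name, rule name and lambda ARN are illustrative assumptions.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import json

import boto3

events = boto3.client('events')

# Route the custom scaling events to ecs-scaling-lambda (names/ARNs are assumptions)
events.put_rule(
    Name='ecs-scaling-events',
    EventBusName='ecs-scaling-bus',
    EventPattern=json.dumps({'detail-type': ['_scale_up', '_scale_down']}))

events.put_targets(
    Rule='ecs-scaling-events',
    EventBusName='ecs-scaling-bus',
    Targets=[{'Id': 'ecs-scaling-lambda',
              'Arn': 'arn:aws:lambda:us-east-1:123456789012:function:ecs-scaling-lambda'}])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The lambda additionally needs a resource-based permission allowing events.amazonaws.com to invoke it.&lt;/p&gt;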

&lt;p&gt;&lt;em&gt;Solution Architecture&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fbwl7bqggkds7z0i88lwq.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fbwl7bqggkds7z0i88lwq.png" alt="Solution Architecture" width="800" height="412"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Implementation&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;em&gt;ECS Scaling Lambda Handler&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;The lambda handler function responds to these event types:&lt;/p&gt;

&lt;p&gt;a.  Scale up for new requests&lt;br&gt;
b.  Scale down when task shuts down &lt;br&gt;
c.  Respond to ASG scale-down event with the idle instance-ids to stop.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def lambda_handler(event, context):
    # Read event type, parse it based on your application
    # ASG when scaling in sends an event with cause SCALE_IN
    event_type = get_event_type(event)

    if event_type == "_scale_up":
        return scale_up()

    if event_type == "_scale_down":
        return scale_down()

    if event_type == "asg_scale_down":
        # Response to auto-scaling service with idle EC2
     # Reset idle_ec2 in environment to empty
        return  {"InstanceIDs": [env.idle_ec2]}

    return None
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;Scale Up&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;On receiving a request, the application emits a scale-up event. For example, the application may receive a request via an API or via an async trigger such as a file upload.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;  # Application when receiving a request
  event_bridge.put_event(source,"_scale_up", event)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
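

&lt;p&gt;In boto3 terms, emitting such an event is a single put_events call; a minimal sketch follows. The event bus name and the detail payload are illustrative assumptions - the EventBridge rule simply has to match the source and detail-type that ecs-scaling-lambda parses.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import json

import boto3

events = boto3.client('events')


def emit_scale_up_event(source: str, detail: dict) -&amp;gt; None:
    # Publish a custom event that the EventBridge rule routes to ecs-scaling-lambda
    events.put_events(Entries=[{
        'EventBusName': 'ecs-scaling-bus',  # assumed bus name
        'Source': source,
        'DetailType': '_scale_up',
        'Detail': json.dumps(detail),
    }])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;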



&lt;p&gt;On receiving a scale-up event, ecs-scaling-lambda calculates &amp;amp; updates the new desired count on ECS based on its current running &amp;amp; pending task count, and on the pending requests in the queue.&lt;br&gt;
The lambda caps the maximum desired count as configured in its environment (same as on the ECS service).&lt;br&gt;
When using an ASG, EC2 instances are started on a scale-up event by calculating and updating the ASG’s desired count, again based on the ECS scaling status and the per-EC2 processing capability (e.g., a 32 vCPU EC2 can process four requests when one request uses at most 8 vCPUs). The ASG “capacity provider” itself is configured with the “binpack” placement strategy to maximize compute utilization &amp;amp; run with minimal instances.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def scale_up():
   queued = Queue.get_available_messages_count()
   if queued &amp;lt;= 0:
      return None

   desired, running = ECS.get_task_count()

   pending = desired - running
   to_add = queued - pending

   if to_add &amp;lt;= 0:
      update_asg_desired(desired)
      return desired

   # Limit to count as configured
   tasks_desired = desired + to_add
   tasks_desired = min(tasks_desired, env.max_tasks)

   # Need update of ASG for EC2 deployments and new tasks
   update_asg_desired(tasks_desired)
   ECS.update_ecs_desired(tasks_desired)


def update_asg_desired(tasks_desired):

    if not env.is_asg_providing_capacity:
        return None

    # Query ASG to fetch in service instances, and its desired count
    in_service_instances, desired = AutoScaling.describe()

    # Calculate the total capacity of the currently desired EC2 instances
    capacity = desired * env.ec2_capacity

    if capacity &amp;gt;= tasks_desired:
        return None

    # Calculate new ASG desired count
    new_desired = int(math.ceil((tasks_desired - capacity) / 
                                env.ec2_capacity)) + desired

    # But cap with maximum ASG size
    new_desired = min(new_desired, env.max_ec2)

    # Update ASG to start exact needed EC2
    AutoScaling.set_desired_count(new_desired, desired)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;Scale Down&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Container triggers scale down event&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The container application keeps processing requests and checks whether it has been idle for too long (for example, an idle time of 30 seconds). Only the container in the task decides when it is idle; when idle, it requests ECS for its own shutdown, stops accepting any more requests, and generates a scale-down event.&lt;br&gt;
The task itself queries ECS to fetch the running count before requesting shutdown, so that the minimum desired count is preserved.&lt;/p&gt;

&lt;p&gt;Stopping ‘self’ is the key to being able to request a graceful shutdown from the ECS service.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;  # ----- Code Inside Container ----

    # Container starting. Define an Exit Handler
    exit_handler = ExitHandler()
    signal.signal(signal.SIGTERM, exit_handler.shutdown)

    while not exit_handler.stop:
        if shutdown_mode:
            # Do not pick any requests.
            # Though the SIGTERM is immediate
            time.sleep(0.5)
            continue
        # Main app processing logic
        if is_request_available():
            process()
        else:
            # Check since how long the process has been idle 
         shutdown_mode = stop_if_idle(last_active_at, timeout)

    def stop_if_idle(last_active_at, timeout):
        # If the task had been idle for too long, stop itself
        if (time.time() - last_active_at) &amp;lt;= timeout:

            # Check ECS for if running more than minimum tasks
        can_i_shut_down()

           # Query ECS metadata service to get own (task) ARN
           ecs.stop_task(cluster=self.cluster,
                         task=self.task_arn,
                         reason="Custom scale in")
           # Scale down event for this task which wants to stop
           EventBridge.put_event(self.service_arn, "_scale_down", 
                                  {'timestamp': str((time.time()))},
                                  [self.task_arn])
           return True
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
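

&lt;p&gt;The "query ECS metadata service" step above relies on the task metadata endpoint that ECS injects into every container. A minimal sketch using only the standard library (field names as per the v4 task metadata format) could be:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import json
import os
import urllib.request


def get_own_task_identity():
    """Return (cluster, task_arn) for the task this container is running in."""
    # ECS sets this environment variable inside every container (v4 endpoint)
    metadata_uri = os.environ['ECS_CONTAINER_METADATA_URI_V4']
    with urllib.request.urlopen(f"{metadata_uri}/task") as resp:
        task_metadata = json.loads(resp.read())
    return task_metadata['Cluster'], task_metadata['TaskARN']
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;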



&lt;p&gt;The AWS ECS service, on the other hand, on receiving the stop request sends a termination signal (SIGTERM) to this task, which the container process reads before exiting completely. The task finally shuts down gracefully.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;ecs-scaling-lambda responds to scale down event&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;ecs-scaling-lambda intercepts the scale-down event and decrements the ECS desired task count. While the ECS service performs the action of stopping the task, the decremented desired count ensures that a replacement task is not spun up. “Stopping the task” and “decrementing the desired count” work in conjunction.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def scale_down():
      ECS.decrement_desired_tasks()
      if env.is_asg_providing_capacity:
         scale_down_asg()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
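

&lt;p&gt;The ECS.decrement_desired_tasks call above maps to a describe/update pair on the ECS service. A minimal sketch follows; the cluster and service environment variable names and the minimum-task handling are illustrative assumptions.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import os

import boto3

ecs = boto3.client('ecs')


def decrement_desired_tasks(min_tasks: int = 0) -&amp;gt; int:
    cluster = os.environ['ECS_CLUSTER']   # assumed environment variable
    service = os.environ['ECS_SERVICE']   # assumed environment variable
    # Read the current desired count and decrement it, never going below the minimum
    current = ecs.describe_services(cluster=cluster,
                                    services=[service])['services'][0]['desiredCount']
    new_desired = max(current - 1, min_tasks)
    if new_desired != current:
        ecs.update_service(cluster=cluster, service=service, desiredCount=new_desired)
    return new_desired
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;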



&lt;p&gt;As idle tasks shut down one by one, ECS eventually runs only the minimum desired count.&lt;/p&gt;

&lt;p&gt;When an ASG is used, scaling the ASG down to the needed EC2 count means finding the idle EC2 instances, asking the auto-scaling service to shut down only those idle instances, and decrementing the ASG’s desired count at the same time.&lt;/p&gt;

&lt;p&gt;In the code snippet below, the update of the desired count on the ASG results in the auto-scaling service invoking ecs-scaling-lambda again, asking for the list of EC2 instances to stop. Configure a custom termination policy on the ASG so that only the idle instances are stopped.&lt;br&gt;
The lambda responds with the list of idle EC2 instance IDs (see the lambda_handler definition) and the auto-scaling service either stops them to return them to the warm pool, or terminates them when no warm pool is used.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def scale_down_asg():

     # Query auto scaling service for in-service EC2, desired count
     in_service_instances, current_desired = AutoScaling.describe()

    # Query ECS to find EC2 that are in use by the tasks
    ecs_instances = ECS.get_instances_in_use()
    new_desired = len(ecs_instances) if ecs_instances else 0

    # Find idle EC2
    if not ecs_instances:
        idle = in_service_instances
    else:
        idle = in_service_instances - ecs_instances

    if not idle:
         return
    if not ecs_instances:
         # ECS is not using any EC2, ASG desired be 0
         new_asg_desired = 0
    else:
         new_asg_desired = len(ecs_instances)

    # Only decrement ASG desired count
    # This throws an event from AWS auto-scaling service that   
    # Lambda capture and returns actual instance IDS to 
    # Stop (with warm pool) or terminate
    AutoScaling.set_desired_count(new_asg_desired,            
                            current_desired)

    # Update Lambda environment with the set of idle EC2
    # Return them auto scaling service to stop these idle
    env.idle_ec2 = idle
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
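
&lt;p&gt;For reference, the handler that answers the auto-scaling service’s “which instances should I stop?” callback might look roughly like the sketch below. This is illustrative only: the event and response shape follow AWS’s custom termination policy contract for Lambda, and env.idle_ec2 is the set stored by scale_down_asg above.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def lambda_handler(event, context):
    # The auto-scaling service invokes the custom termination policy
    # lambda with the candidate instances it is allowed to remove.
    candidates = [i["InstanceId"] for i in event.get("Instances", [])]

    # Return only the instances previously identified as idle;
    # the auto-scaling service stops (warm pool) or terminates them.
    idle = set(env.idle_ec2 or [])
    return {"InstanceIds": [i for i in candidates if i in idle]}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;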



&lt;p&gt;&lt;strong&gt;Design Considerations&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;To accurately read the pending request count on SQS, use a “FIFO” queue and not a “STANDARD” queue; the FIFO queue “almost” guarantees accuracy in synchronizing the queue attributes, with slight delays of up to one second. The scaling lambda therefore waits about a second after the request is made before reading the queue attributes (a sketch of this read follows this list).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Run the scaling lambda with a reserved concurrency of one to avoid concurrent updates to the ECS service when multiple scaling events arrive at the same time. The ecs-scaling-lambda responds to events very quickly, so a fixed concurrency of one adds negligible overhead.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The design assumes the capacity is provided 100% by either FARGATE or EC2. Mixing “capacity provider” types would result in undesired behavior.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;If using an ASG as the “capacity provider”, use the binpack placement strategy. It leaves the least amount of unused CPU or memory and minimizes the number of container instances in use. Additionally, start with no placement constraints. Turn off “instance protection from scale-in” on the ASG for the custom scaling to work.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The capacity provider should still have “ECS managed scaling” turned on. Reason: if the scaling is “managed” by ECS, the ECS service waits for EC2 instances to come up and does not fail the tasks immediately due to a lack of available instances.&lt;br&gt;
Also, turn off managed termination protection for the capacity provider.&lt;br&gt;
Delete any lifecycle hooks on the ASG that may interfere with the custom scaling service and add overhead.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;If using an ASG, use an ASG warm pool (with “reuse on scale-in” turned on) to save time provisioning new instances.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;For improved performance, re-use cached AWS connections in the lambda and throughout your application.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
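
&lt;p&gt;As a concrete example for the first point, reading the pending request count might look like the sketch below. It assumes boto3; queue_url is a placeholder, and whether in-flight messages are counted depends on your definition of “pending”.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import time
import boto3

sqs = boto3.client("sqs")

def pending_request_count(queue_url):
    # Give the FIFO queue attributes a second to synchronize
    # before reading them, as noted in the first point above.
    time.sleep(1)
    attrs = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=["ApproximateNumberOfMessages",
                        "ApproximateNumberOfMessagesNotVisible"])
    visible = int(attrs["Attributes"]["ApproximateNumberOfMessages"])
    in_flight = int(attrs["Attributes"]["ApproximateNumberOfMessagesNotVisible"])
    return visible + in_flight
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;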

&lt;p&gt;&lt;strong&gt;More Ideas&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt; Before sending requests for processing, a trusted client can ask for capacity up front. Integrated with ecs-scaling-lambda, the ECS desired count is incremented for the “expected” demand requested by such a smart client.&lt;/li&gt;
&lt;li&gt; With automated deployments, a new deployment would replace tasks and may shut down “active” processes, which is undesired. Abort the deployment if ECS is busy processing, by querying the ECS desired/pending counts to check whether scaling is in progress and by querying CloudWatch log activity from the container.&lt;/li&gt;
&lt;li&gt; To capture scaling metrics, persist the ECS desired count update actions in a Timestream database. One use case is to see the ECS scaling status and analyze in real time how busy the system is.&lt;/li&gt;
&lt;li&gt; Asynchronous invocation of ecs-scaling-lambda means errors may go unnoticed. Configure destinations on the ecs-scaling-lambda to send out SNS email notifications on failed invocations (see the sketch after this list).&lt;/li&gt;
&lt;/ol&gt;
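
&lt;p&gt;For the last point, failure destinations can be set on the function’s asynchronous invocation configuration. A boto3 sketch follows; the function name and SNS topic ARN are placeholders.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import boto3

lambda_client = boto3.client("lambda")

def configure_failure_destination(function_name, sns_topic_arn):
    # Send a notification to SNS whenever an asynchronous
    # invocation of the scaling lambda ultimately fails.
    lambda_client.put_function_event_invoke_config(
        FunctionName=function_name,
        MaximumRetryAttempts=2,
        DestinationConfig={
            "OnFailure": {"Destination": sns_topic_arn}
        })
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;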

&lt;p&gt;&lt;strong&gt;Solution Benefits&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The solution is highly scalable, fast, re-usable, robust, and cost-effective for any AWS ECS deployment that needs to scale.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt; Lambda itself is a serverless, pay-as-you-go service and runs with minimal compute (128 MB), avoiding the cost and overhead of other components such as CloudWatch metrics and alarms or other resources.&lt;/li&gt;
&lt;li&gt; Control the exact desired count on the ECS service. Custom calculation logic allows maximum control over updates to the desired task count; for instance, start 10% or 4 more tasks than strictly needed to keep a few instances warmed up.&lt;/li&gt;
&lt;li&gt; EventBridge is fast: within a few hundred milliseconds of the scaling event, the lambda can process it and update ECS, compared with a delay of at least two minutes for the out-of-the-box scaling solution. This means an extremely fast response to scaling events. For scale-up events, the update to the ECS desired count is immediate; with FARGATE the tasks start provisioning at once, and with an EC2 “capacity provider” the EC2 instances also start provisioning at once.&lt;/li&gt;
&lt;li&gt; The container can reliably run for as long as needed, even days, and can save considerable cost by shutting itself down after just a few seconds of idle time (using the in-container logic described above). The solution gives the application complete control over how long it runs once it is idle.&lt;/li&gt;
&lt;li&gt; A simple solution that does not use ECS target tracking, step scaling, or other custom metric-based policies.&lt;/li&gt;
&lt;/ol&gt;

</description>
      <category>aws</category>
      <category>ecs</category>
      <category>autoscaling</category>
      <category>fargate</category>
    </item>
  </channel>
</rss>
