<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: ledux</title>
    <description>The latest articles on DEV Community by ledux (@ledux).</description>
    <link>https://dev.to/ledux</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F450503%2F65bd1351-afc1-479d-8036-269c89025f1a.png</url>
      <title>DEV Community: ledux</title>
      <link>https://dev.to/ledux</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/ledux"/>
    <language>en</language>
    <item>
      <title>AWS Stack in UPDATE_ROLLBACK_FAILED state</title>
      <dc:creator>ledux</dc:creator>
      <pubDate>Thu, 19 Oct 2023 12:27:59 +0000</pubDate>
      <link>https://dev.to/ledux/aws-stack-in-updaterollbackfailed-state-33c8</link>
      <guid>https://dev.to/ledux/aws-stack-in-updaterollbackfailed-state-33c8</guid>
      <description>&lt;p&gt;When your stack is in the UPDATE_ROLLBACK_FAILED state, you cannot update it anymore. You have two options:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Delete the whole stack and deploy it again&lt;/li&gt;
&lt;li&gt;Continue the rollback process&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;In this post I am going to outline the second option.&lt;/p&gt;

&lt;h2&gt;
  
  
  How to continue the update rollback
&lt;/h2&gt;

&lt;p&gt;When your stack is in the UPDATE_ROLLBACK_FAILED state, some resources could not be rolled back.&lt;br&gt;
To continue the rollback process, you have to skip these resources.&lt;br&gt;
We are going to use the &lt;code&gt;aws cloudformation continue-update-rollback&lt;/code&gt; command, which has a parameter &lt;code&gt;--resources-to-skip&lt;/code&gt;. &lt;br&gt;
This parameter takes one or more &lt;code&gt;LogicalResourceId&lt;/code&gt;s, separated by spaces.&lt;/p&gt;

&lt;p&gt;To get these IDs, this command can be used:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws cloudformation describe-stack-resources &lt;span class="nt"&gt;--stack-name&lt;/span&gt; &amp;lt;stackname&amp;gt; | jq &lt;span class="s1"&gt;'.StackResources[] | select(.ResourceStatus == "UPDATE_FAILED") | .LogicalResourceId'&lt;/span&gt; | &lt;span class="nb"&gt;tr&lt;/span&gt; &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'"'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can then use these values in the following command.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt; aws cloudformation &lt;span class="k"&gt;continue&lt;/span&gt;&lt;span class="nt"&gt;-update-rollback&lt;/span&gt; &lt;span class="nt"&gt;--stack-name&lt;/span&gt; &amp;lt;stackname&amp;gt; &lt;span class="nt"&gt;--resources-to-skip&lt;/span&gt; &amp;lt;ResourceOne&amp;gt; &amp;lt;ResourceTwo&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Skipping resources means that CloudFormation simply sets their status to &lt;code&gt;UPDATE_COMPLETE&lt;/code&gt; and continues the rollback. &lt;br&gt;
It does not mean that an actual rollback was performed on them.&lt;br&gt;
This can leave the skipped resources in a state that is inconsistent with the stack template.&lt;br&gt;
But the stack is ready for another update attempt.&lt;/p&gt;
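
&lt;p&gt;The two commands can be combined so that the IDs do not have to be copied by hand. The following is only a sketch: the sample JSON stands in for the output of &lt;code&gt;describe-stack-resources&lt;/code&gt;, the resource names and the stack name &lt;code&gt;my-stack&lt;/code&gt; are made up, and the final command is printed rather than executed so it can be reviewed first.&lt;/p&gt;

```shell
# Hypothetical stand-in for the output of describe-stack-resources.
sample='{"StackResources":[{"LogicalResourceId":"QueueOne","ResourceStatus":"UPDATE_FAILED"},{"LogicalResourceId":"BucketTwo","ResourceStatus":"UPDATE_COMPLETE"},{"LogicalResourceId":"TopicThree","ResourceStatus":"UPDATE_FAILED"}]}'

# jq -r prints raw strings, so no quotes need to be stripped afterwards.
# paste joins the IDs into one space-separated line.
to_skip=$(printf '%s' "$sample" | jq -r '.StackResources[] | select(.ResourceStatus == "UPDATE_FAILED") | .LogicalResourceId' | paste -sd ' ' -)

# Print the command instead of running it (my-stack is a placeholder).
echo "aws cloudformation continue-update-rollback --stack-name my-stack --resources-to-skip $to_skip"
```

&lt;p&gt;With a real stack, the &lt;code&gt;sample&lt;/code&gt; variable would be replaced by the output of the &lt;code&gt;describe-stack-resources&lt;/code&gt; call shown earlier.&lt;/p&gt;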

</description>
      <category>aws</category>
      <category>cloudformation</category>
      <category>awscli</category>
    </item>
    <item>
      <title>Search in Kinesis</title>
      <dc:creator>ledux</dc:creator>
      <pubDate>Fri, 24 Mar 2023 13:17:00 +0000</pubDate>
      <link>https://dev.to/ledux/search-in-kinesis-42c3</link>
      <guid>https://dev.to/ledux/search-in-kinesis-42c3</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;We once couldn't process data records from a Kinesis data stream with a certain value in a field. After we fixed the bug, we needed to reprocess these records. &lt;/p&gt;

&lt;p&gt;This blog post describes how I read the data records from the stream, filtered them by the value in question and prepared them for reprocessing.&lt;/p&gt;

&lt;p&gt;In this post I will cover the following topics:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;What is Kinesis&lt;/li&gt;
&lt;li&gt;How to read records from Kinesis, using the &lt;code&gt;aws&lt;/code&gt; cli&lt;/li&gt;
&lt;li&gt;How to process json using the &lt;code&gt;jq&lt;/code&gt; cli&lt;/li&gt;
&lt;li&gt;How to read only records which match certain criteria&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  What is Kinesis
&lt;/h2&gt;

&lt;p&gt;Kinesis Data Streams is an AWS service for data processing. Producers send data records to Kinesis, and consumers can then process them.&lt;/p&gt;

&lt;p&gt;A data stream consists of one or more &lt;code&gt;shard&lt;/code&gt;s. A shard is a sequence of data records; shards are used to scale the throughput. Ideally every shard has its own consumer.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;PartitionKey&lt;/code&gt; determines on which shard a data record is stored. The partition key is defined by the producer and can be any field in the payload. Kinesis hashes this value and assigns the record to a shard based on the result.&lt;/p&gt;

&lt;p&gt;Every data record gets a &lt;code&gt;SequenceNumber&lt;/code&gt;. This number identifies the data record inside the shard. The numbers increase over time, but not in even steps.&lt;/p&gt;

&lt;h3&gt;
  
  
  ShardIterator
&lt;/h3&gt;

&lt;p&gt;Kinesis is designed to read the ingested data in the order it came in. There is no mechanism to search or filter the data. A consumer can only read the data records forward in sequence. &lt;/p&gt;

&lt;p&gt;To read the data, the consumer needs a &lt;code&gt;ShardIterator&lt;/code&gt;. It represents the position from where the consumer will start reading.&lt;/p&gt;

&lt;p&gt;Depending on how the first record is defined, there are different iterator types:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;AT_SEQUENCE_NUMBER&lt;/strong&gt;: starts at the record with the exact number provided or the record with the next higher number&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;AFTER_SEQUENCE_NUMBER&lt;/strong&gt;: starts at the record with the next higher number than the provided one&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;AT_TIMESTAMP&lt;/strong&gt;: starts at the record with the exact timestamp provided or the next higher one&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;LATEST&lt;/strong&gt;: starts with the next record created&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;TRIM_HORIZON&lt;/strong&gt;: starts at the first available record in the shard&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  AWS CLI
&lt;/h2&gt;

&lt;p&gt;AWS provides a command line interface (CLI) to interact with their services, thus also with Kinesis Data Streams.&lt;/p&gt;

&lt;p&gt;To read from Kinesis a ShardIterator is needed. Here I create one of type &lt;code&gt;AT_TIMESTAMP&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;stream-name
&lt;span class="nv"&gt;timestamp&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="si"&gt;$(&lt;/span&gt;&lt;span class="nb"&gt;date&lt;/span&gt; &lt;span class="nt"&gt;--date&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'2023-03-13 12:00:00'&lt;/span&gt; +%s&lt;span class="si"&gt;)&lt;/span&gt;
&lt;span class="nv"&gt;sharditerator&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="si"&gt;$(&lt;/span&gt;aws kinesis get-shard-iterator &lt;span class="nt"&gt;--stream-name&lt;/span&gt; &lt;span class="nv"&gt;$stream&lt;/span&gt; &lt;span class="nt"&gt;--shard-id&lt;/span&gt; 0 &lt;span class="nt"&gt;--shard-iterator-type&lt;/span&gt; AT_TIMESTAMP &lt;span class="nt"&gt;--timestamp&lt;/span&gt; &lt;span class="nv"&gt;$timestamp&lt;/span&gt; | jq &lt;span class="nt"&gt;-r&lt;/span&gt; .ShardIterator&lt;span class="si"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This shard iterator allows us to read data records from Kinesis:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;aws kinesis get-records &lt;span class="nt"&gt;--shard-iterator&lt;/span&gt; &lt;span class="nv"&gt;$sharditerator&lt;/span&gt; &lt;span class="nt"&gt;--limit&lt;/span&gt; 10
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This gives an object with an array called &lt;code&gt;Records&lt;/code&gt;, where the Kinesis data records are listed. Additionally, there is the next shard iterator and how many milliseconds this batch is behind the latest data record.&lt;/p&gt;

&lt;p&gt;A data record contains the identification (&lt;code&gt;SequenceNumber&lt;/code&gt;), the timestamp when the record was sent to Kinesis, the partition key, and the actual data. The data is base64 encoded.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"Records"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"SequenceNumber"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"49633478129523018598145441545878647674003909456640868354"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"ApproximateArrivalTimestamp"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"2023-03-03T13:22:56.498000+01:00"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"Data"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"ewogICJmb3Jtc0RhdGEiOiB7CiAgICAiZmlyc3ROYW1lIjogIkVyaWMiLAogICAgImxhc3ROYW1lIjogIklkbGUiCiAgfSwKICAibWV0YWRhdGEiOiB7CiAgICAicGFyZW50SWQiOiAiNzgwN2IzNzMtNTRlNy00ZTAzLWE1ZGUtMGI0MDE2OGFmNTRiIgogIH0KfQo="&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"PartitionKey"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"0"&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"SequenceNumber"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"49633478129523018598145441601514622818488773411761815554"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"ApproximateArrivalTimestamp"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"2023-03-03T13:26:30.057000+01:00"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"Data"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"ewogICJmb3Jtc0RhdGEiOiB7CiAgICAiZmlyc3ROYW1lIjogIkdyYWhhbSIsCiAgICAibGFzdE5hbWUiOiAiQ2hhcG1hbiIKICB9LAogICJtZXRhZGF0YSI6IHsKICAgICJpbmZvbWVldGluZ0lkIjogImt3MzlrenNhZjMiCiAgfQp9Cg=="&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"PartitionKey"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"0"&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"NextShardIterator"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"AAAAAAAAAAHrZiW4CusNNQOgyPkpxttbi9hAPH35qT91FVx7RcmZKmSuzulLh0t16SAlG9jPUO+NJ0RPxfaWZaCjwusIjzxI3MBdGvKbJt/MX2bJHv2FTqiyArEDvuFBI0cvdNeX+T18wcnljCEZ3etm7tBkr9l84O0+1KakYygljotcBba49QLuvW3f90OXxXV9bam5HY3CmbxEr5fK5quRhoBgvhrvxBXUCvMoRCGzVn7krSr9EhZD79DwynYJ9qL3JY5/ZyVAMeh4a20ENkt6PR7MdUikElbjeyuvmeLBpOj+demEps/1NaHh2i5r1i/BRiemgj/5sii+bKcWGPqhkeEujl5+"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"MillisBehindLatest"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;953238000&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
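
&lt;p&gt;To make the encoding concrete, here is a small sketch that decodes the &lt;code&gt;Data&lt;/code&gt; field of a record. It does not call Kinesis: it builds a stand-in response locally, and the payload values are made up for illustration. It also uses &lt;code&gt;base64 --decode&lt;/code&gt;, the long form of &lt;code&gt;-d&lt;/code&gt;.&lt;/p&gt;

```shell
# Build a small stand-in for a get-records response (payload values are made up).
payload='{"formsData":{"firstName":"Eric","lastName":"Idle"},"metadata":{"parentId":"example-parent"}}'
encoded=$(printf '%s' "$payload" | base64 | tr -d '\n')
batch=$(jq -n --arg data "$encoded" '{Records: [{Data: $data, PartitionKey: "0"}]}')

# jq -r emits the raw base64 string without JSON quotes, so base64 can decode it.
first_name=$(printf '%s' "$batch" | jq -r '.Records[0].Data' | base64 --decode | jq -r '.formsData.firstName')
echo "$first_name"
```

&lt;p&gt;Without &lt;code&gt;-r&lt;/code&gt;, jq would print the value wrapped in double quotes, which are not valid base64 characters.&lt;/p&gt;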



&lt;p&gt;Each call returns only the data records from a certain time frame, often fewer than the number specified with the &lt;code&gt;--limit&lt;/code&gt; parameter. To get the next batch of records, the same request must be sent again, but with the &lt;code&gt;NextShardIterator&lt;/code&gt;. Also, to filter the data records on a field in the &lt;code&gt;Data&lt;/code&gt;, we need to act on the JSON. &lt;/p&gt;

&lt;p&gt;There is a nice little powerful tool called &lt;code&gt;jq&lt;/code&gt; which can do that.&lt;/p&gt;

&lt;h2&gt;
  
  
  Excursion &lt;code&gt;jq&lt;/code&gt;
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://github.com/stedolan/jq"&gt;jq&lt;/a&gt; is a lightweight and flexible command-line JSON processor.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://stedolan.github.io/jq/tutorial/"&gt;There is a nice tutorial&lt;/a&gt;, but I'll give you the basics here:&lt;/p&gt;

&lt;p&gt;When you just pass JSON to jq, it pretty-prints it, including syntax highlighting:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;json&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'[{"name":"John","age":30,"address":{"street":"123 Main St","city":"Anytown","country":"USA"}},{"name":"Jane","age":25},{"name":"Bob","age":40,"address":{"street":"456 Oak St","city":"Othertown","country":"USA"}}]'&lt;/span&gt;
&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="nv"&gt;$json&lt;/span&gt; | jq &lt;span class="nb"&gt;.&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To get an entry in the array, we can use the array notation.&lt;br&gt;
To get the entries from the end, use negative numbers (&lt;code&gt;-1&lt;/code&gt; for the last, &lt;code&gt;-2&lt;/code&gt; for the second to last, etc.)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="nv"&gt;$json&lt;/span&gt; | jq &lt;span class="s1"&gt;'.[0]'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;|&lt;/code&gt; operator in jq feeds the output of one filter into the input of another.&lt;br&gt;
We can use it to transform the data and, for example, get just the first name and the city:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="nv"&gt;$json&lt;/span&gt; | jq &lt;span class="s1"&gt;'.[0] | { firstname: .name, city: .address.city }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To act on all entries in the array, just omit the index:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="nv"&gt;$json&lt;/span&gt; | jq &lt;span class="s1"&gt;'.[] | { firstname: .name, city: .address.city }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
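
&lt;p&gt;One detail worth seeing in the output: an entry without an &lt;code&gt;address&lt;/code&gt; does not cause an error, its &lt;code&gt;city&lt;/code&gt; simply becomes &lt;code&gt;null&lt;/code&gt;. The sketch below repeats the sample data so it can be run on its own and adds &lt;code&gt;-c&lt;/code&gt; for compact, one-line-per-object output.&lt;/p&gt;

```shell
json='[{"name":"John","age":30,"address":{"street":"123 Main St","city":"Anytown","country":"USA"}},{"name":"Jane","age":25},{"name":"Bob","age":40,"address":{"street":"456 Oak St","city":"Othertown","country":"USA"}}]'

# -c prints each resulting object on a single line.
cities=$(printf '%s' "$json" | jq -c '.[] | { firstname: .name, city: .address.city }')
echo "$cities"
# Prints:
# {"firstname":"John","city":"Anytown"}
# {"firstname":"Jane","city":null}
# {"firstname":"Bob","city":"Othertown"}
```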



&lt;p&gt;To put the result into an array, instead of using independent objects, just wrap the whole query in brackets:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="nv"&gt;$json&lt;/span&gt; | jq &lt;span class="s1"&gt;'[.[] | { firstname: .name, city: .address.city }]'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Besides filtering and modifying json, there are &lt;a href="https://stedolan.github.io/jq/manual/#Builtinoperatorsandfunctions"&gt;numerous built-in functions and operators&lt;/a&gt; one can use. &lt;br&gt;
In our example we need to select data records which have a certain property.&lt;/p&gt;

&lt;p&gt;The first function we can use is &lt;code&gt;select(boolean_expression)&lt;/code&gt;, which can filter lists:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="nv"&gt;$json&lt;/span&gt; | jq &lt;span class="s1"&gt;'.[] | select(.age &amp;gt; 26)'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The second one is &lt;code&gt;has(key)&lt;/code&gt;, which checks for the presence of a property in a json object:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="nv"&gt;$json&lt;/span&gt; | jq &lt;span class="s1"&gt;'.[1] | has("address")'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To get all objects in the array that have an &lt;code&gt;address&lt;/code&gt;, we can combine those filters:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="nv"&gt;$json&lt;/span&gt; | jq &lt;span class="s1"&gt;'.[] | select(has("address"))'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Those are the basics of &lt;code&gt;jq&lt;/code&gt; we need to filter all data records which have a certain property.&lt;/p&gt;

&lt;h2&gt;
  
  
  Read and filter records from Kinesis
&lt;/h2&gt;

&lt;p&gt;To filter the Kinesis data records by the presence of a property in the payload, the payload must be decoded first.&lt;br&gt;
Then we can filter it with &lt;code&gt;jq&lt;/code&gt; as described above.&lt;br&gt;
Afterwards we need to use the next shard iterator to fetch the next batch. &lt;br&gt;
To avoid calling Kinesis twice, I cache the batch in a temporary file.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;kinesisBatch&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="si"&gt;$(&lt;/span&gt;mktemp&lt;span class="si"&gt;)&lt;/span&gt;
aws kinesis get-records &lt;span class="nt"&gt;--shard-iterator&lt;/span&gt; &lt;span class="nv"&gt;$sharditerator&lt;/span&gt; &lt;span class="nt"&gt;--limit&lt;/span&gt; 50 &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;$kinesisBatch&lt;/span&gt;
&lt;span class="nb"&gt;cat&lt;/span&gt; &lt;span class="nv"&gt;$kinesisBatch&lt;/span&gt; | jq &lt;span class="nt"&gt;-r&lt;/span&gt; &lt;span class="s1"&gt;'.Records[].Data'&lt;/span&gt; | &lt;span class="nb"&gt;base64&lt;/span&gt; &lt;span class="nt"&gt;-d&lt;/span&gt; | jq &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="s1"&gt;'.[] | select(.metadata | has("parentId"))'&lt;/span&gt;
&lt;span class="nv"&gt;sharditerator&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="si"&gt;$(&lt;/span&gt;&lt;span class="nb"&gt;cat&lt;/span&gt; &lt;span class="nv"&gt;$kinesisBatch&lt;/span&gt; | jq &lt;span class="nt"&gt;-r&lt;/span&gt; &lt;span class="s1"&gt;'.NextShardIterator'&lt;/span&gt;&lt;span class="si"&gt;)&lt;/span&gt;
aws kinesis get-records &lt;span class="nt"&gt;--shard-iterator&lt;/span&gt; &lt;span class="nv"&gt;$sharditerator&lt;/span&gt; &lt;span class="nt"&gt;--limit&lt;/span&gt; 50 &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;$kinesisBatch&lt;/span&gt;
&lt;span class="c"&gt;# etc.&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now we need to repeat this until a break condition is reached.&lt;br&gt;
This can be a timestamp or a SequenceNumber.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;stream-name
&lt;span class="nv"&gt;fromTime&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="si"&gt;$(&lt;/span&gt;&lt;span class="nb"&gt;date&lt;/span&gt; &lt;span class="nt"&gt;--date&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'2023-03-13 12:00:00'&lt;/span&gt; +%s&lt;span class="si"&gt;)&lt;/span&gt;
&lt;span class="nv"&gt;timeOfLastEntry&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="si"&gt;$(&lt;/span&gt;&lt;span class="nb"&gt;date&lt;/span&gt; &lt;span class="nt"&gt;--date&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'2023-03-13 12:00:00'&lt;/span&gt; +%s&lt;span class="si"&gt;)&lt;/span&gt;
&lt;span class="nv"&gt;toTime&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="si"&gt;$(&lt;/span&gt;&lt;span class="nb"&gt;date&lt;/span&gt; &lt;span class="nt"&gt;--date&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'2023-03-15 15:00:00'&lt;/span&gt; +%s&lt;span class="si"&gt;)&lt;/span&gt;
&lt;span class="nv"&gt;sharditerator&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="si"&gt;$(&lt;/span&gt;aws kinesis get-shard-iterator &lt;span class="nt"&gt;--stream-name&lt;/span&gt; &lt;span class="nv"&gt;$stream&lt;/span&gt; &lt;span class="nt"&gt;--shard-id&lt;/span&gt; 0 &lt;span class="nt"&gt;--shard-iterator-type&lt;/span&gt; AT_TIMESTAMP &lt;span class="nt"&gt;--timestamp&lt;/span&gt; &lt;span class="nv"&gt;$fromTime&lt;/span&gt; | jq &lt;span class="nt"&gt;-r&lt;/span&gt; .ShardIterator&lt;span class="si"&gt;)&lt;/span&gt;
&lt;span class="nv"&gt;kinesisBatch&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="si"&gt;$(&lt;/span&gt;mktemp&lt;span class="si"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="o"&gt;[&lt;/span&gt; &lt;span class="nv"&gt;$toTime&lt;/span&gt; &lt;span class="nt"&gt;-ge&lt;/span&gt; &lt;span class="nv"&gt;$timeOfLastEntry&lt;/span&gt; &lt;span class="o"&gt;]&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;do
    &lt;/span&gt;aws kinesis get-records &lt;span class="nt"&gt;--shard-iterator&lt;/span&gt; &lt;span class="nv"&gt;$sharditerator&lt;/span&gt; &lt;span class="nt"&gt;--limit&lt;/span&gt; 50 &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;$kinesisBatch&lt;/span&gt;
    jq &lt;span class="nt"&gt;-r&lt;/span&gt; &lt;span class="s1"&gt;'.Records[].Data'&lt;/span&gt; &lt;span class="nv"&gt;$kinesisBatch&lt;/span&gt; | &lt;span class="nb"&gt;base64&lt;/span&gt; &lt;span class="nt"&gt;-d&lt;/span&gt; | jq &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="s1"&gt;'.[] | select(.metadata | has("parentId"))'&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; foundEntries.json
    &lt;span class="nv"&gt;lastArrival&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="si"&gt;$(&lt;/span&gt;jq &lt;span class="s1"&gt;'.Records[-1].ApproximateArrivalTimestamp'&lt;/span&gt; &lt;span class="nv"&gt;$kinesisBatch&lt;/span&gt; | &lt;span class="nb"&gt;tr&lt;/span&gt; &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'"'&lt;/span&gt;&lt;span class="si"&gt;)&lt;/span&gt;
    &lt;span class="nv"&gt;timeOfLastEntry&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="si"&gt;$(&lt;/span&gt;&lt;span class="nb"&gt;date&lt;/span&gt; &lt;span class="nt"&gt;--date&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nv"&gt;$lastArrival&lt;/span&gt; +%s&lt;span class="si"&gt;)&lt;/span&gt;
    &lt;span class="nv"&gt;sharditerator&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="si"&gt;$(&lt;/span&gt;jq &lt;span class="nt"&gt;-r&lt;/span&gt; &lt;span class="s1"&gt;'.NextShardIterator'&lt;/span&gt; &lt;span class="nv"&gt;$kinesisBatch&lt;/span&gt;&lt;span class="si"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;done&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
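
&lt;p&gt;The loop above assumes that every batch contains at least one record. Since a batch can also come back empty, it may be worth guarding the timestamp lookup. The following is a minimal sketch of such a guard, run against two hypothetical batches instead of real Kinesis output; the helper name &lt;code&gt;last_arrival&lt;/code&gt; and the sample values are assumptions made for illustration.&lt;/p&gt;

```shell
# Two hypothetical batches: one empty, one with a single record.
empty_batch='{"Records":[],"NextShardIterator":"next-iterator-a","MillisBehindLatest":0}'
full_batch='{"Records":[{"ApproximateArrivalTimestamp":"2023-03-13T12:05:00+01:00"}],"NextShardIterator":"next-iterator-b"}'

# Prints the arrival time of the last record, or nothing when the batch is empty.
# "// empty" makes jq produce no output instead of the string "null".
last_arrival() {
    printf '%s' "$1" | jq -r '.Records[-1].ApproximateArrivalTimestamp // empty'
}

arrival_empty=$(last_arrival "$empty_batch")
arrival_full=$(last_arrival "$full_batch")

if [ -z "$arrival_empty" ]; then
    echo "empty batch: keep the previous timeOfLastEntry and continue with the next iterator"
fi
echo "last arrival: $arrival_full"
```

&lt;p&gt;Inside the loop, &lt;code&gt;timeOfLastEntry&lt;/code&gt; would then only be updated when the helper returns a value.&lt;/p&gt;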



&lt;h2&gt;
  
  
  Wrap up
&lt;/h2&gt;

&lt;p&gt;Even though Kinesis stores the data persistently, it is not a database. It stores all records in sequence and there is no way to query the data in Kinesis itself. If you want to act on only a subset of the records, you need to consume all records and filter them locally. In this blog post, I outlined how this can be done using the aws cli in conjunction with &lt;code&gt;jq&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;The above snippet can be put in a file and called from the console. The next step would be to add parameters so it can be used more interactively, for example:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;--stream-name stream-name&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;--starting-timestamp yyyy-mm-dd&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;--end-timestamp yyyy-mm-dd&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;--jq-filter-on-data 'select(.metadata | has("parentId"))'&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;
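
&lt;p&gt;One possible way to read such parameters is a plain &lt;code&gt;while&lt;/code&gt;/&lt;code&gt;case&lt;/code&gt; loop. This is only a sketch: the function name &lt;code&gt;parse_args&lt;/code&gt; and the variable &lt;code&gt;filter&lt;/code&gt; are hypothetical, and it does not check whether an option is missing its value.&lt;/p&gt;

```shell
# Reads the options listed above into shell variables.
# Assumes every option is followed by exactly one value.
parse_args() {
    while [ "$#" -gt 0 ]; do
        case "$1" in
            --stream-name) stream=$2; shift 2 ;;
            --starting-timestamp) fromTime=$2; shift 2 ;;
            --end-timestamp) toTime=$2; shift 2 ;;
            --jq-filter-on-data) filter=$2; shift 2 ;;
            *) echo "unknown option: $1"; return 1 ;;
        esac
    done
}

# In a script file this would be called as: parse_args "$@"
parse_args --stream-name stream-name --starting-timestamp 2023-03-13 --end-timestamp 2023-03-15 --jq-filter-on-data 'select(.metadata | has("parentId"))'
echo "$stream $fromTime $toTime"
```

&lt;p&gt;The date values would still need to be converted with &lt;code&gt;date --date=... +%s&lt;/code&gt;, as in the snippet above, before they are compared.&lt;/p&gt;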

</description>
      <category>aws</category>
      <category>cli</category>
    </item>
  </channel>
</rss>
