<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Hans-Peter Grahsl</title>
    <description>The latest articles on DEV Community by Hans-Peter Grahsl (@hpgrahsl).</description>
    <link>https://dev.to/hpgrahsl</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F291291%2F4d2f8bec-0f20-4119-a2ff-605a144a4d67.jpg</url>
      <title>DEV Community: Hans-Peter Grahsl</title>
      <link>https://dev.to/hpgrahsl</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/hpgrahsl"/>
    <language>en</language>
    <item>
      <title>Flink CDC from the Trenches: Handling JSON in Pipelines with UDFs</title>
      <dc:creator>Hans-Peter Grahsl</dc:creator>
      <pubDate>Tue, 16 Dec 2025 13:04:41 +0000</pubDate>
      <link>https://dev.to/hpgrahsl/flink-cdc-from-the-trenches-handling-json-in-pipelines-with-udfs-p4d</link>
      <guid>https://dev.to/hpgrahsl/flink-cdc-from-the-trenches-handling-json-in-pipelines-with-udfs-p4d</guid>
      <description>&lt;h1&gt;
  
  
  Introduction
&lt;/h1&gt;

&lt;p&gt;&lt;em&gt;Recently, I &lt;a href="https://www.linkedin.com/posts/hpgrahsl_fromabrjson-apacheflink-json-activity-7401724967916044289-7fcH" rel="noopener noreferrer"&gt;shared&lt;/a&gt; a custom function (&lt;a href="https://github.com/hpgrahsl/flink-json-udfs" rel="noopener noreferrer"&gt;FROM_JSON&lt;/a&gt;) for Apache Flink to ease the handling of complex JSON structures in situations where a deeper integration into Flink's type system is not only beneficial but required. While it's a helpful addition for data engineers, who are primarily building streaming workloads directly with Flink SQL or on top of Flink's Table API, it didn't take long before I received a few questions around the applicability of this very UDF as part of end-to-end data flows that are built with &lt;a href="https://nightlies.apache.org/flink/flink-cdc-docs-release-3.5/" rel="noopener noreferrer"&gt;Flink CDC&lt;/a&gt; &lt;a href="https://nightlies.apache.org/flink/flink-cdc-docs-release-3.5/docs/connectors/pipeline-connectors/overview/" rel="noopener noreferrer"&gt;pipeline connectors&lt;/a&gt;.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;In this article, I share selected learnings and condensed findings I discovered while exploring the most relevant parts of Flink CDC’s &lt;a href="https://github.com/apache/flink-cdc" rel="noopener noreferrer"&gt;codebase&lt;/a&gt; to figure out what it takes to implement a &lt;code&gt;FROM_JSON&lt;/code&gt; UDF equivalent for Flink CDC pipelines.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;My journey involved navigating some interesting aspects and challenges related to differences in the involved type systems, the lack of specific capabilities for user-defined functions in Flink CDC, and some subtleties during the handling of complex, nested data structures, as well as specific binary runtime representations for objects of certain data types.&lt;/p&gt;

&lt;h1&gt;
  
  
  Motivation
&lt;/h1&gt;

&lt;p&gt;The primary reason why I decided to spend a non-negligible amount of time with Flink CDC lately is that JSON data is currently not treated as a first-class citizen in Flink CDC pipelines. The following concrete example illustrates what I mean by that. Let's assume we have a Postgres table that contains a few columns, three of which store JSON/JSONB:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;postgres=# \d+ public.gov_fake_citizens
                                         Table "public.gov_fake_citizens"
     Column      |  Type   | Collation | Nullable | Default | Storage  | Compression | Stats target | Description
-----------------+---------+-----------+----------+---------+----------+-------------+--------------+-------------
 id              | uuid    |           | not null |         | plain    |             |              |
 personal        | json    |           | not null |         | extended |             |              |
 isactive        | boolean |           | not null | false   | plain    |             |              |
 registered      | text    |           | not null |         | extended |             |              |
 contact         | json    |           | not null |         | extended |             |              |
 knownresidences | json    |           | not null |         | extended |             |              |
Indexes:
    "gov_fake_citizens_pkey" PRIMARY KEY, btree (id)
Access method: heap
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here is how one row in this table might look:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;                  id                  |                                                     personal                                                     | isactive |          registered           |                             contact                             |                                        knownresidences                                         
--------------------------------------+------------------------------------------------------------------------------------------------------------------+----------+-------------------------------+-----------------------------------------------------------------+------------------------------------------------------------------------------------------------
 11e66db7-6834-1efe-498c-b7c9d661fb5b | {"firstname":"Emile","lastname":"Sanford","age":28,"eyecolor":"gray","gender":"female","height":163,"weight":51} | f        | 2025-10-17T07:30:49.067+00:00 | {"email":"morgan.shanahan@gmail.com","phone":"+1 305-699-4846"} | ["4361 Prohaska Common, Norikofort, NH 56814","7616 Krajcik Crest, Port Emilioside, ID 36016"]
(1 row)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
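
&lt;p&gt;For reference, shipping this table to Elasticsearch with a Flink CDC pipeline takes little more than a definition along the following lines. Take it as a rough sketch: hostnames and credentials are made up, and the exact option keys should be double-checked against the Postgres and Elasticsearch pipeline connector docs.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;source:
  type: postgres
  hostname: localhost
  port: 5432
  username: postgres
  password: postgres
  tables: postgres.public.gov_fake_citizens
  slot.name: flink_cdc_slot

sink:
  type: elasticsearch
  hosts: http://localhost:9200

pipeline:
  name: postgres-to-elasticsearch
  parallelism: 1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;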



&lt;p&gt;As this configuration sketch suggests, running such a pipeline is straightforward. The results, however, are most likely not what the typical friendly data engineer from next door would expect, let alone like to achieve. If we query the Elasticsearch index, the Postgres table row shown above comes back in this way:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"11e66db7-6834-1efe-498c-b7c9d661fb5b"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"personal"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"{&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;firstname&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;:&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;Emile&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;,&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;lastname&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;:&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;Sanford&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;,&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;age&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;:28,&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;eyecolor&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;:&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;gray&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;,&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;gender&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;:&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;female&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;,&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;height&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;:163,&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;weight&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;:51}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"contact"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"{&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;email&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;:&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;morgan.shanahan@gmail.com&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;,&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;phone&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;:&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;+1 305-699-4846&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"knownresidences"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"[&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;4361 Prohaska Common, Norikofort, NH 56814&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;,&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;7616 Krajcik Crest, Port Emilioside, ID 36016&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;]"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"isactive"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"registered"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"2025-10-17T07:30:49.067+00:00"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The reason behind this is that any form of JSON data gets stringified by pipeline source connectors like the Postgres or MySQL ones. From the moment this happens, pipelines just propagate flat strings all the way to the sink. If this is the only result you'd ever need from your Flink CDC pipelines, consider yourself lucky, stop reading, and keep leading an undistracted and happy life.&lt;/p&gt;

&lt;p&gt;Chances are, you'd want this to look different. If so, read on to learn what needs to happen to end up with documents in your search index that look more like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"11e66db7-6834-1efe-498c-b7c9d661fb5b"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"personal"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"firstname"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Emile"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"gender"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"female"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"weight"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;51&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"eyecolor"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"gray"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"age"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;28&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"lastname"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Sanford"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"height"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;163&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"contact"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"phone"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"+1 305-699-4846"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"email"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"morgan.shanahan@gmail.com"&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"knownresidences"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="s2"&gt;"4361 Prohaska Common, Norikofort, NH 56814"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="s2"&gt;"7616 Krajcik Crest, Port Emilioside, ID 36016"&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"isactive"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"registered"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"2025-10-17T07:30:49.067+00:00"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h1&gt;
  
  
  Same, Same But Different
&lt;/h1&gt;

&lt;p&gt;A fundamental aspect to understand when moving from Flink SQL/Table API to Flink CDC context is that while their underlying type systems appear similar on the surface, they are still different from one another. As a consequence, several nuances start to matter, especially when dealing with complex structures and nested data types at runtime, but more on that later.&lt;/p&gt;

&lt;h2&gt;
  
  
  Schema Definitions
&lt;/h2&gt;

&lt;p&gt;The good news first. Pure schema definitions expressed as string literals are basically interchangeable because both type systems heavily draw from SQL types. We can express the usual primitive suspects and complex types similarly. For instance, the following type definitions are valid in both type systems:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="nb"&gt;INT&lt;/span&gt;
&lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="nb"&gt;DECIMAL&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;hobbies&lt;/span&gt; &lt;span class="n"&gt;ARRAY&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;64&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;span class="n"&gt;scores&lt;/span&gt; &lt;span class="k"&gt;MAP&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="nb"&gt;INT&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;span class="k"&gt;user&lt;/span&gt; &lt;span class="k"&gt;ROW&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;age&lt;/span&gt; &lt;span class="nb"&gt;INT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;is_active&lt;/span&gt; &lt;span class="nb"&gt;BOOLEAN&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In other words, the high similarity between the two type systems is primarily conceptual and structural. On an implementation level, however, both have their separate Java type equivalents.&lt;/p&gt;

&lt;p&gt;Flink Table API defines its types based on classes from:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;org.apache.flink.table.types.*&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;org.apache.flink.table.data.*&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Flink CDC defines its types based on classes from:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;org.apache.flink.cdc.common.types.*&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;org.apache.flink.cdc.common.data.*&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;
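
&lt;p&gt;To make this concrete, here is how the same row type could be constructed programmatically in both worlds (a sketch; the exact factory method sets may differ across versions). The two factories mirror each other closely, yet the resulting objects belong to unrelated Java class hierarchies:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;// Flink Table API: org.apache.flink.table.types.DataType
org.apache.flink.table.types.DataType tableApiType =
    org.apache.flink.table.api.DataTypes.ROW(
        org.apache.flink.table.api.DataTypes.FIELD("name",
            org.apache.flink.table.api.DataTypes.STRING()),
        org.apache.flink.table.api.DataTypes.FIELD("age",
            org.apache.flink.table.api.DataTypes.INT()));

// Flink CDC: org.apache.flink.cdc.common.types.DataType
org.apache.flink.cdc.common.types.DataType cdcType =
    org.apache.flink.cdc.common.types.DataTypes.ROW(
        org.apache.flink.cdc.common.types.DataTypes.FIELD("name",
            org.apache.flink.cdc.common.types.DataTypes.STRING()),
        org.apache.flink.cdc.common.types.DataTypes.FIELD("age",
            org.apache.flink.cdc.common.types.DataTypes.INT()));
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;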

&lt;h2&gt;
  
  
  Schema Parsing
&lt;/h2&gt;

&lt;p&gt;If we were to parse schema string literals like the ones shown above into proper data types, the parsing logic would be largely the same. Yet the different Java type systems behind both worlds suggest separate implementations to either express an &lt;code&gt;org.apache.flink.table.types.DataType&lt;/code&gt; (Flink Table API) or an &lt;code&gt;org.apache.flink.cdc.common.types.DataType&lt;/code&gt; (Flink CDC). Of course, another option would be to parse schema definitions into just one of the two type systems and complement this with a uni- or bidirectional type converter, depending on our needs.&lt;/p&gt;
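
&lt;p&gt;For primitive types, such a converter is mostly mechanical. A hypothetical one-way sketch, covering only a fraction of the types, could look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;// hypothetical one-way converter: Flink CDC type to Flink Table API type
static org.apache.flink.table.types.DataType toTableApiType(
        org.apache.flink.cdc.common.types.DataType cdcType) {
    if (cdcType instanceof org.apache.flink.cdc.common.types.IntType) {
        return org.apache.flink.table.api.DataTypes.INT();
    }
    if (cdcType instanceof org.apache.flink.cdc.common.types.BooleanType) {
        return org.apache.flink.table.api.DataTypes.BOOLEAN();
    }
    if (cdcType instanceof org.apache.flink.cdc.common.types.VarCharType) {
        return org.apache.flink.table.api.DataTypes.STRING();
    }
    // ARRAY, MAP, and ROW would recurse over their element/field types
    throw new UnsupportedOperationException("type not covered in this sketch: " + cdcType);
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;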

&lt;h3&gt;
  
  
  Task 1
&lt;/h3&gt;

&lt;p&gt;Working towards my Flink CDC variant of the &lt;code&gt;FROM_JSON&lt;/code&gt; UDF, the initial task was to parse the schema string literal, this time, however, into a &lt;code&gt;DataType&lt;/code&gt; for Flink CDC rather than for the Flink Table API. Naturally, the result is almost identical to the minimum viable schema parser I already had, except that it has to respect the different Java types backing Flink CDC’s type system. The first line in the following test method, taken from one of the unit tests, hints at how this piece of code is going to be used:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nd"&gt;@Test&lt;/span&gt;
&lt;span class="nd"&gt;@DisplayName&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Parse ROW schema with nested ARRAY"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;testRowTypeSchemaWithNestedArray&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="nc"&gt;DataType&lt;/span&gt; &lt;span class="n"&gt;dataType&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;SchemaParserCdc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;parseType&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"ROW&amp;lt;id INT, name STRING, hobbies ARRAY&amp;lt;STRING&amp;gt;&amp;gt;"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="c1"&gt;// Verify it's a RowType&lt;/span&gt;
    &lt;span class="n"&gt;assertTrue&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dataType&lt;/span&gt; &lt;span class="k"&gt;instanceof&lt;/span&gt; &lt;span class="nc"&gt;RowType&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="nc"&gt;RowType&lt;/span&gt; &lt;span class="n"&gt;rowType&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;RowType&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="n"&gt;dataType&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="c1"&gt;// Verify field count&lt;/span&gt;
    &lt;span class="n"&gt;assertEquals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;rowType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getFieldCount&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;

    &lt;span class="c1"&gt;// Verify field names&lt;/span&gt;
    &lt;span class="n"&gt;assertEquals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"id"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;rowType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getFieldNames&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
    &lt;span class="n"&gt;assertEquals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"name"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;rowType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getFieldNames&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
    &lt;span class="n"&gt;assertEquals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"hobbies"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;rowType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getFieldNames&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

    &lt;span class="c1"&gt;// Verify field types&lt;/span&gt;
    &lt;span class="n"&gt;assertTrue&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;rowType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getFields&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;getType&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="k"&gt;instanceof&lt;/span&gt; &lt;span class="nc"&gt;IntType&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;assertTrue&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;rowType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getFields&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;getType&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="k"&gt;instanceof&lt;/span&gt; &lt;span class="nc"&gt;VarCharType&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;assertTrue&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;rowType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getFields&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;getType&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="k"&gt;instanceof&lt;/span&gt; &lt;span class="nc"&gt;ArrayType&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="c1"&gt;// Verify array element type&lt;/span&gt;
    &lt;span class="nc"&gt;ArrayType&lt;/span&gt; &lt;span class="n"&gt;hobbiesArray&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ArrayType&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="n"&gt;rowType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getFields&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;getType&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="n"&gt;assertTrue&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;hobbiesArray&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getElementType&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="k"&gt;instanceof&lt;/span&gt; &lt;span class="nc"&gt;VarCharType&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With the schema parsing out of the way, it’s time to shift our focus to the implementation of the user-defined function itself, which will make use of this schema parser to infer the Flink CDC data type according to any custom target schema.&lt;/p&gt;

&lt;h1&gt;
  
  
  UDFs that aren’t the UDFs You Know
&lt;/h1&gt;

&lt;p&gt;Having written several basic and advanced UDFs for Flink Table API/SQL as well as other data processing systems such as ksqlDB—yeah I know—and Apache Spark in the past, I initially expected Flink CDC UDFs to offer similar capabilities and the same flexibility. Turns out I was in for a little surprise.&lt;/p&gt;

&lt;h2&gt;
  
  
  Functions in Flink Table API/SQL
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;extensive library of &lt;a href="https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/" rel="noopener noreferrer"&gt;built-in&lt;/a&gt; system functions&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/" rel="noopener noreferrer"&gt;user-defined functions&lt;/a&gt; across multiple categories (scalar, table, aggregate, table aggregate, and process table function)&lt;/li&gt;
&lt;li&gt;async processing for scalar and table functions&lt;/li&gt;
&lt;li&gt;context access in the &lt;code&gt;open(...)&lt;/code&gt; lifecycle method&lt;/li&gt;
&lt;li&gt;automatic, annotation-based, and custom programmatic type inference, including call context accessibility&lt;/li&gt;
&lt;li&gt;comprehensive docs&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;In short: very mature, powerful, and feature-rich, supporting capabilities ranging from the most basic to the very advanced.&lt;/p&gt;
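
&lt;p&gt;As a reference point for later, the call-context-based type inference that makes a second schema argument work in the Table API can be sketched roughly like this (class name and body are illustrative, not the actual &lt;code&gt;FROM_JSON&lt;/code&gt; implementation):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;

public class FromJsonSketch extends ScalarFunction {

    public Object eval(String json, String schema) {
        // walk the JSON and convert it according to the schema (omitted here)
        return null;
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
            .outputTypeStrategy(callContext -&amp;gt; callContext
                // read the schema string literal passed as the 2nd argument ...
                .getArgumentValue(1, String.class)
                // ... and derive the function's return type directly from it
                .map(typeFactory::createDataType))
            .build();
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;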

&lt;h2&gt;
  
  
  Functions in Flink CDC
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;relatively small library of &lt;a href="https://nightlies.apache.org/flink/flink-cdc-docs-release-3.5/docs/core-concept/transform/#functions" rel="noopener noreferrer"&gt;built-in&lt;/a&gt; system functions&lt;/li&gt;
&lt;li&gt;focused on a single function category, namely scalar functions&lt;/li&gt;
&lt;li&gt;context access in the &lt;code&gt;open(...)&lt;/code&gt; lifecycle method, but with caveats—more on that follows :)&lt;/li&gt;
&lt;li&gt;interoperability with Table API/SQL scalar functions, with notable limitations such as no lifecycle methods and no type hint/type inference compatibility&lt;/li&gt;
&lt;li&gt;automatic or programmatic type inference, however, without call context accessibility or other similarly advanced aspects&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://nightlies.apache.org/flink/flink-cdc-docs-release-3.5/docs/core-concept/transform/#user-defined-functions" rel="noopener noreferrer"&gt;full UDF docs&lt;/a&gt; essentially fit on a single screen, which, by the way, is a great opportunity to contribute to this rather promising project!&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The bottom line here is that, as it stands today, the UDF support in Flink CDC is, well, lean and seems to primarily focus on simpler use cases than what I had in mind for this &lt;code&gt;FROM_JSON&lt;/code&gt; UDF. Any custom scalar function we write can be applied in projection and/or filter definitions as part of the pipeline’s respective transform block:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;transform&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;source-table&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;mydb.mytable&lt;/span&gt;
    &lt;span class="na"&gt;projection&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;*,&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;MY_UDF_A(some_col_1)"&lt;/span&gt;
    &lt;span class="na"&gt;filter&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;MY_UDF_B(some_col_2) = &lt;/span&gt;&lt;span class="m"&gt;123&lt;/span&gt;

&lt;span class="na"&gt;pipeline&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;user-defined-function&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;MY_UDF_A&lt;/span&gt;
      &lt;span class="na"&gt;classpath&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;com.github.hpgrahsl.flink.cdc.udfs.MyDemoUdfA&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;MY_UDF_B&lt;/span&gt;
      &lt;span class="na"&gt;classpath&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;com.github.hpgrahsl.flink.cdc.udfs.MyDemoUdfB&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
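
&lt;p&gt;For orientation, a minimal scalar UDF such as the hypothetical &lt;code&gt;MyDemoUdfA&lt;/code&gt; referenced above could be sketched like this; note that the &lt;code&gt;eval(...)&lt;/code&gt; method is picked up by convention rather than being declared in the interface:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;

public class MyDemoUdfA implements UserDefinedFunction {

    // invoked by the transform engine for each record
    public String eval(String someCol1) {
        return someCol1 == null ? null : someCol1.toUpperCase();
    }

    @Override
    public DataType getReturnType() {
        return DataTypes.STRING();
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;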



&lt;p&gt;The limitations mentioned above create an interesting challenge, though. How or where would we specify the schema definition for a &lt;code&gt;FROM_JSON&lt;/code&gt; function call, so that the Flink CDC runtime can properly infer the actual return type when applying this function?&lt;/p&gt;

&lt;p&gt;For Flink Table API/SQL functions, this is straightforward, as we can pass the schema string literal as a second call parameter and derive the necessary type information directly from it.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;FROM_JSON&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;my_json_col&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'ARRAY&amp;lt;STRING&amp;gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;parsed_json&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Unfortunately, it seems there is no equivalent in Flink CDC land that would allow type inference based on the UDF's call context, or at least I’m not aware of one. My next idea was to specify the required target schema via function configuration parameters, which can be accessed in the UDF’s &lt;code&gt;open(...)&lt;/code&gt; lifecycle method—far too easy, right? Trying to do so, I figured out that while some classes related to Flink CDC UDFs are prepared for configuration parameters, others are not. Therefore, I had to dig deeper to identify the gaps across a handful of classes in the current implementation. As a result, I opened &lt;a href="https://issues.apache.org/jira/browse/FLINK-38792" rel="noopener noreferrer"&gt;FLINK-38792&lt;/a&gt; based on these findings.&lt;/p&gt;

&lt;h3&gt;
  
  
  Task 2
&lt;/h3&gt;

&lt;p&gt;Since I needed to make progress, I decided to move forward by patching my own fork of Flink CDC &lt;code&gt;3.6-SNAPSHOT&lt;/code&gt; to get proper UDF configuration parameter support. With this in place, I can now do the following:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;pipeline&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;user-defined-function&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;MY_DEMO_UDF&lt;/span&gt;
      &lt;span class="na"&gt;classpath&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;com.github.hpgrahsl.flink.cdc.udfs.MyDemoUdf&lt;/span&gt;
      &lt;span class="na"&gt;some.config.param&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;foo&lt;/span&gt;
      &lt;span class="na"&gt;another.setting&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;123&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;and can define an arbitrary configuration for my custom functions as part of the pipeline definition. Call me biased, but I definitely feel this is an improvement and probably also the originally intended behaviour. Thus, I hope that the upstream project will eventually decide to pick up a PR I’m planning to contribute.&lt;/p&gt;

&lt;p&gt;Coming back to our original context, I can define the &lt;code&gt;FROM_JSON&lt;/code&gt; UDF like so:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;pipeline&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;user-defined-function&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;FROM_JSON&lt;/span&gt;
      &lt;span class="na"&gt;classpath&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;com.github.hpgrahsl.flink.cdc.udfs.json.FromJsonCdcUdf&lt;/span&gt;
      &lt;span class="na"&gt;from.json.schema&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;ARRAY&amp;lt;STRING&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In this particular example configuration, the UDF is instructed to parse JSON arrays containing string elements, which themselves originate from a database column captured by a Flink CDC pipeline connector, for instance, the one for Postgres. The configuration parameter &lt;code&gt;from.json.schema&lt;/code&gt; is read in the function’s &lt;code&gt;open(...)&lt;/code&gt; lifecycle method and stored in an attribute.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nd"&gt;@Override&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;open&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;UserDefinedFunctionContext&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;configuration&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;configuredSchema&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;configuration&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;SCHEMA_CONFIG&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;configuredSchema&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="n"&gt;configuredSchema&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;trim&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;isEmpty&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;IllegalArgumentException&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
            &lt;span class="s"&gt;"Schema configuration is required. Set '"&lt;/span&gt;
                &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="no"&gt;SCHEMA_CONFIG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;key&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
                &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;"' in the UDF configuration of your pipeline YAML."&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="c1"&gt;//...&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This way, the schema information becomes accessible in the &lt;code&gt;getReturnType()&lt;/code&gt; method, which uses the schema parser I previously wrote to create the corresponding &lt;code&gt;DataType&lt;/code&gt;. The Flink CDC runtime can thus infer the proper &lt;code&gt;DataType&lt;/code&gt; to expect whenever it needs to evaluate the UDF in question.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nd"&gt;@Override&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;DataType&lt;/span&gt; &lt;span class="nf"&gt;getReturnType&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;configuredSchema&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="n"&gt;configuredSchema&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;trim&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;isEmpty&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;SchemaParserCdc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;parseType&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;configuredSchema&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;IllegalArgumentException&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
            &lt;span class="s"&gt;"Schema configuration is required. Set '"&lt;/span&gt;
                &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="no"&gt;SCHEMA_CONFIG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;key&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
                &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;"' in the UDF configuration (e.g., in pipeline YAML under 'config')."&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Everything up to this point can be considered foundational preparation before we eventually get to the meat of it—converting the actual JSON data.&lt;/p&gt;

&lt;h1&gt;
  
  
  From JSON to Typed Values for Flink CDC
&lt;/h1&gt;

&lt;p&gt;Walking the JSON structure and converting the input data into properly typed values matching the Flink CDC type system along the way doesn’t sound too complicated, or does it? Well, based on my initial experiments, it turned out not to be a walk in the park, and here is why.&lt;/p&gt;

&lt;h2&gt;
  
  
  RecordData
&lt;/h2&gt;

&lt;p&gt;&lt;code&gt;RecordData&lt;/code&gt; is a vital interface, as it describes how table rows in Flink CDC, whose schema is expressed by the complex type &lt;code&gt;RowType&lt;/code&gt;, are represented at runtime. It also defines how each SQL data type is mapped to an internal data structure. These internal data structures are themselves based on either standard Java types (for primitives) or Flink CDC-specific Java types defined by additional custom interfaces or classes such as &lt;code&gt;StringData&lt;/code&gt;, &lt;code&gt;DecimalData&lt;/code&gt;, and others. The table below is taken directly from the Javadocs and contains the full picture:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt; * +--------------------------------+-----------------------------------------+
 * | SQL Data Types                 | Internal Data Structures                |
 * +--------------------------------+-----------------------------------------+
 * | BOOLEAN                        | boolean                                 |
 * +--------------------------------+-----------------------------------------+
 * | CHAR / VARCHAR / STRING        | {@link StringData}                      |
 * +--------------------------------+-----------------------------------------+
 * | BINARY / VARBINARY / BYTES     | byte[]                                  |
 * +--------------------------------+-----------------------------------------+
 * | DECIMAL                        | {@link DecimalData}                     |
 * +--------------------------------+-----------------------------------------+
 * | TINYINT                        | byte                                    |
 * +--------------------------------+-----------------------------------------+
 * | SMALLINT                       | short                                   |
 * +--------------------------------+-----------------------------------------+
 * | INT                            | int                                     |
 * +--------------------------------+-----------------------------------------+
 * | BIGINT                         | long                                    |
 * +--------------------------------+-----------------------------------------+
 * | FLOAT                          | float                                   |
 * +--------------------------------+-----------------------------------------+
 * | DOUBLE                         | double                                  |
 * +--------------------------------+-----------------------------------------+
 * | DATE                           | int (number of days since epoch)        |
 * +--------------------------------+-----------------------------------------+
 * | TIME                           | int (number of milliseconds of the day) |
 * +--------------------------------+-----------------------------------------+
 * | TIMESTAMP                      | {@link TimestampData}                   |
 * +--------------------------------+-----------------------------------------+
 * | TIMESTAMP WITH LOCAL TIME ZONE | {@link LocalZonedTimestampData}         |
 * +--------------------------------+-----------------------------------------+
 * | TIMESTAMP WITH TIME ZONE       | {@link ZonedTimestampData}              |
 * +--------------------------------+-----------------------------------------+
 * | ROW                            | {@link RecordData}                      |
 * +--------------------------------+-----------------------------------------+
 * | ARRAY                          | {@link ArrayData}                       |
 * +--------------------------------+-----------------------------------------+
 * | MAP                            | {@link MapData}                         |
 * +--------------------------------+-----------------------------------------+
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The main takeaway for successfully converting JSON with a UDF is as follows: for all data types that directly map to a standard Java type, we only need to read the JSON and can extract the corresponding &lt;code&gt;JsonNode&lt;/code&gt; values as is without any additional considerations or explicit conversions on top.&lt;/p&gt;
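
&lt;p&gt;For everything else, an explicit conversion step is required. In the spirit of what the UDF does internally, such a routine might look like this (a shortened sketch covering only a few representative types; in particular, creating strings via &lt;code&gt;BinaryStringData&lt;/code&gt; is an assumption worth verifying against your Flink CDC version):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;private Object convertJsonNodeToInternalType(JsonNode node, DataType type) {
    if (node == null || node.isNull()) {
        return null;
    }
    // data types that map to standard Java types can be extracted as is
    if (type instanceof BooleanType) {
        return node.asBoolean();
    }
    if (type instanceof IntType) {
        return node.asInt();
    }
    if (type instanceof BigIntType) {
        return node.asLong();
    }
    if (type instanceof DoubleType) {
        return node.asDouble();
    }
    // CHAR / VARCHAR / STRING must be wrapped into Flink CDC's StringData
    if (type instanceof CharType || type instanceof VarCharType) {
        return BinaryStringData.fromString(node.asText());
    }
    // constructed types (ARRAY / MAP / ROW) recurse via dedicated helpers,
    // e.g. the parseArray(...) method shown further below
    throw new IllegalArgumentException("type not covered in this sketch: " + type);
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;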

&lt;h2&gt;
  
  
  Constructed Data Type Caveats
&lt;/h2&gt;

&lt;p&gt;Based on my early-stage experiments implementing the &lt;code&gt;FROM_JSON&lt;/code&gt; UDF, it’s worth taking a closer look at the three constructed data types.&lt;/p&gt;

&lt;h3&gt;
  
  
  ArrayData
&lt;/h3&gt;

&lt;p&gt;The &lt;code&gt;ArrayData&lt;/code&gt; type is relevant whenever we hit any JSON array node, the corresponding schema literal for which is &lt;code&gt;ARRAY&amp;lt;element_type&amp;gt;&lt;/code&gt;. &lt;code&gt;GenericArrayData&lt;/code&gt; is an implementation that allows for the convenient construction of objects based on regular Java arrays.&lt;/p&gt;

&lt;h3&gt;
  
  
  MapData
&lt;/h3&gt;

&lt;p&gt;Any JSON object for which all field values are of the same type can be converted to the &lt;code&gt;MapData&lt;/code&gt; type. The schema literal to express this is &lt;code&gt;MAP&amp;lt;key_type,value_type&amp;gt;&lt;/code&gt;. &lt;code&gt;GenericMapData&lt;/code&gt; implements this interface and can be used for object construction based on regular Java maps. This implementation internally uses two &lt;code&gt;GenericArrayData&lt;/code&gt; objects to represent the map’s keys and values.&lt;/p&gt;

&lt;p&gt;For both &lt;code&gt;ArrayData&lt;/code&gt; and &lt;code&gt;MapData&lt;/code&gt;, it’s important to know that all array elements or map entries must themselves be valid internal data structures. What this means is that any array element or map entry that must be represented by means of a special Java type (see table above) has to be explicitly converted before constructing &lt;code&gt;GenericArrayData&lt;/code&gt; or &lt;code&gt;GenericMapData&lt;/code&gt; objects. This is because their constructors take the data as is, performing neither type checks nor (deep) auto-conversions. Hence, it’s our own responsibility to ensure the constructors only ever receive objects of internal data structure types, converting them upfront where necessary.&lt;/p&gt;

&lt;h4&gt;
  
  
  Task 3
&lt;/h4&gt;

&lt;p&gt;Consequently, the &lt;code&gt;FROM_JSON&lt;/code&gt; UDF must respect this at all times; otherwise, we’d face runtime errors, and Flink CDC will almost certainly complain with a &lt;code&gt;ClassCastException&lt;/code&gt; whenever this constraint is violated. To give one concrete example, if we were to mistakenly build a &lt;code&gt;GenericArrayData&lt;/code&gt; object based on a regular Java &lt;code&gt;String[]&lt;/code&gt; array rather than a &lt;code&gt;StringData[]&lt;/code&gt; array, the following would happen when the Flink CDC runtime tries to access and process its elements:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;…
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class org.apache.flink.cdc.common.data.StringData (java.lang.String is in module java.base of loader 'bootstrap'; org.apache.flink.cdc.common.data.StringData is in unnamed module of loader 'app')
        at org.apache.flink.cdc.common.data.GenericArrayData.getString(GenericArrayData.java:221)
        at org.apache.flink.cdc.common.data.ArrayData.lambda$createElementGetter$3e58363e$1(ArrayData.java:228)
…
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;That said, the code for parsing a JSON array node and creating the &lt;code&gt;GenericArrayData&lt;/code&gt; object should look more like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;ArrayData&lt;/span&gt; &lt;span class="nf"&gt;parseArray&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;JsonNode&lt;/span&gt; &lt;span class="n"&gt;jsonArray&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;schemaString&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;DataType&lt;/span&gt; &lt;span class="n"&gt;arrayDataType&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;getCachedSchema&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;schemaString&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(!(&lt;/span&gt;&lt;span class="n"&gt;arrayDataType&lt;/span&gt; &lt;span class="k"&gt;instanceof&lt;/span&gt; &lt;span class="nc"&gt;ArrayType&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;IllegalArgumentException&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
            &lt;span class="s"&gt;"specified schemaString must be of type ARRAY - got: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;schemaString&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nc"&gt;ArrayType&lt;/span&gt; &lt;span class="n"&gt;arrayType&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ArrayType&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="n"&gt;arrayDataType&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="nc"&gt;DataType&lt;/span&gt; &lt;span class="n"&gt;elementType&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;arrayType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getElementType&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;jsonArray&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;size&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;GenericArrayData&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Object&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;]);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nc"&gt;Object&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;elements&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Object&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="n"&gt;jsonArray&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;size&lt;/span&gt;&lt;span class="o"&gt;()];&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;jsonArray&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;size&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;++)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;elements&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;convertJsonNodeToInternalType&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;jsonArray&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt; &lt;span class="n"&gt;elementType&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;GenericArrayData&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;elements&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Notice the method call to &lt;code&gt;convertJsonNodeToInternalType(...)&lt;/code&gt; inside the loop, which guarantees that each JSON array element is converted into the proper Flink CDC internal data structure before the resulting &lt;code&gt;GenericArrayData&lt;/code&gt; is created and returned. The conversion of map entries to &lt;code&gt;GenericMapData&lt;/code&gt; is done in a similar fashion. Moreover, this approach ensures that nested arrays (e.g. &lt;code&gt;ARRAY&amp;lt;ARRAY&amp;lt;...&amp;gt;&amp;gt;&lt;/code&gt;), nested maps (e.g. &lt;code&gt;MAP&amp;lt;STRING,MAP&amp;lt;STRING,INT&amp;gt;&amp;gt;&lt;/code&gt;), or a mix thereof are implicitly handled as well by means of recursion.&lt;/p&gt;
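
&lt;p&gt;For completeness, a hypothetical &lt;code&gt;parseMap(...)&lt;/code&gt; counterpart could look roughly like this, with both keys and values passing through the same conversion before the &lt;code&gt;GenericMapData&lt;/code&gt; object is constructed:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;private MapData parseMap(JsonNode jsonObject, DataType keyType, DataType valueType) {
    // keys and values must already be internal data structures
    Map&amp;lt;Object, Object&amp;gt; entries = new LinkedHashMap&amp;lt;&amp;gt;();
    jsonObject.fields().forEachRemaining(field -&amp;gt; {
        Object key = convertJsonNodeToInternalType(TextNode.valueOf(field.getKey()), keyType);
        Object value = convertJsonNodeToInternalType(field.getValue(), valueType);
        entries.put(key, value);
    });
    return new GenericMapData(entries);
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;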

&lt;p&gt;With map and array support in place, let’s focus on the next aspect, namely, how to handle arbitrary JSON objects. Out of habit, I wrongly assumed there must be a similar type duo in place, something like &lt;code&gt;RowData&lt;/code&gt; and &lt;code&gt;GenericRowData&lt;/code&gt;; however, I was obviously mixing things up with Flink’s Table API again. Over here in Flink CDC land, we are instead revisiting &lt;code&gt;RecordData&lt;/code&gt;.&lt;/p&gt;

&lt;h3&gt;
  
  
  RecordData
&lt;/h3&gt;

&lt;p&gt;Whenever the &lt;code&gt;FROM_JSON&lt;/code&gt; UDF is about to parse arbitrary JSON objects, be it at the top level or somewhere nested, it’s supposed to build a &lt;code&gt;RecordData&lt;/code&gt; object matching the specified target schema, which is defined with a corresponding &lt;code&gt;ROW&lt;/code&gt; type, for instance, &lt;code&gt;ROW&amp;lt;id INT, name STRING, age INT, is_active BOOLEAN&amp;gt;&lt;/code&gt;. The relevant implementation of the &lt;code&gt;RecordData&lt;/code&gt; interface is &lt;code&gt;BinaryRecordData&lt;/code&gt;, which, for reasons of efficiency and performance, is backed by &lt;code&gt;MemorySegment&lt;/code&gt; under the hood.&lt;/p&gt;

&lt;h4&gt;
  
  
  Task 4
&lt;/h4&gt;

&lt;p&gt;Luckily, so that we don’t have to touch any of these lower-level building blocks ourselves, we can rely on the &lt;code&gt;BinaryRecordDataGenerator&lt;/code&gt;, a convenience class in Flink CDC’s runtime type utils package. As with &lt;code&gt;ArrayData&lt;/code&gt; and &lt;code&gt;MapData&lt;/code&gt;, it’s important not to forget that each field in a &lt;code&gt;RecordData&lt;/code&gt; object must be an internal data structure. With this in mind, the implementation to generate such &lt;code&gt;BinaryRecordData&lt;/code&gt; objects might look as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;RecordData&lt;/span&gt; &lt;span class="nf"&gt;parseObject&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;JsonNode&lt;/span&gt; &lt;span class="n"&gt;jsonObject&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;schemaString&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;DataType&lt;/span&gt; &lt;span class="n"&gt;rowDataType&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;getCachedSchema&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;schemaString&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="nc"&gt;RowType&lt;/span&gt; &lt;span class="n"&gt;rowType&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;RowType&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="n"&gt;rowDataType&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;nestedFieldNames&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;rowType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getFieldNames&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;arity&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;nestedFieldNames&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;size&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

    &lt;span class="nc"&gt;Object&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;nestedFields&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Object&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="n"&gt;arity&lt;/span&gt;&lt;span class="o"&gt;];&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;arity&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;++)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;fieldName&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;nestedFieldNames&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="nc"&gt;JsonNode&lt;/span&gt; &lt;span class="n"&gt;fieldValue&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;jsonObject&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;fieldName&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="nc"&gt;DataType&lt;/span&gt; &lt;span class="n"&gt;fieldType&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;rowType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getFields&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;getType&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="n"&gt;nestedFields&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;convertJsonNodeToInternalType&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;fieldValue&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;fieldType&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;BinaryRecordDataGenerator&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;rowType&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;generate&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;nestedFields&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This essentially concludes the most relevant implementation aspects of an equivalent &lt;code&gt;FROM_JSON&lt;/code&gt; UDF that can work directly within Flink CDC. What’s left are a few more minor tweaks.&lt;/p&gt;

&lt;h2&gt;
  
  
  Flink CDC’s built-in &lt;code&gt;DataTypeConverter&lt;/code&gt;
&lt;/h2&gt;

&lt;p&gt;A critically important class that Flink CDC ships to perform type conversions internally is the &lt;code&gt;DataTypeConverter&lt;/code&gt;, whose source code you can find right &lt;a href="https://github.com/apache/flink-cdc/blob/release-3.5/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Across several methods, we find a number of large switch blocks that handle the different data types. In a few of these places, it becomes apparent when reading the code that constructed types - especially the &lt;code&gt;ROW&lt;/code&gt; type - don’t receive the attention they’d need to enable use cases like the &lt;code&gt;FROM_JSON&lt;/code&gt; UDF. Here are two of the more pressing gaps:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;ROW&lt;/code&gt; isn’t supported &lt;a href="https://github.com/apache/flink-cdc/blob/release-3.5/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java#L410-L413" rel="noopener noreferrer"&gt;here&lt;/a&gt;:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;DataType&lt;/span&gt; &lt;span class="nf"&gt;convertCalciteRelDataTypeToDataType&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;RelDataType&lt;/span&gt; &lt;span class="n"&gt;relDataType&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;switch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;relDataType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getSqlTypeName&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="c1"&gt;//…&lt;/span&gt;
       &lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="nl"&gt;ROW:&lt;/span&gt;
        &lt;span class="k"&gt;default&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;UnsupportedOperationException&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
            &lt;span class="s"&gt;"Unsupported type: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;relDataType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getSqlTypeName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;ROW&lt;/code&gt; type is passed through as is &lt;a href="https://github.com/apache/flink-cdc/blob/release-3.5/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java#L454-L455" rel="noopener noreferrer"&gt;here&lt;/a&gt;, which may or may not work depending on how the user code created the Object value in the first place:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;Object&lt;/span&gt; &lt;span class="nf"&gt;convert&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Object&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;DataType&lt;/span&gt; &lt;span class="n"&gt;dataType&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;switch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dataType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getTypeRoot&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="c1"&gt;// …&lt;/span&gt;
       &lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="nl"&gt;ROW:&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="c1"&gt;// …&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In addition, it would be beneficial if the methods &lt;code&gt;convertToArray(...)&lt;/code&gt; and &lt;code&gt;convertToMap(...)&lt;/code&gt; detected any non-internal data structures used for array elements or map entries and automatically converted them whenever reasonably possible. This could ultimately free user code from having to explicitly fulfill such framework-internal type constraints.&lt;/p&gt;
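
&lt;p&gt;To make this idea more concrete, here’s a minimal, purely hypothetical sketch of such a defensive normalization helper. To be clear: this is not actual Flink CDC code, and it assumes that &lt;code&gt;DecimalData&lt;/code&gt; offers a &lt;code&gt;fromBigDecimal(...)&lt;/code&gt; factory analogous to the one known from Flink’s Table API:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;// hypothetical sketch only - NOT actual Flink CDC code: normalize common
// external Java types into internal data structures before they end up
// as array elements or map entries
private static Object toInternalIfNeeded(Object value) {
    if (value instanceof String) {
        // plain Java strings must become StringData internally
        return BinaryStringData.fromString((String) value);
    }
    if (value instanceof java.math.BigDecimal) {
        java.math.BigDecimal dec = (java.math.BigDecimal) value;
        // DecimalData carries precision and scale explicitly
        return DecimalData.fromBigDecimal(dec, dec.precision(), dec.scale());
    }
    // assumed to already be an internal structure (StringData, RecordData, ...)
    // or a primitive wrapper that needs no conversion
    return value;
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
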

&lt;h3&gt;
  
  
  Task 5
&lt;/h3&gt;

&lt;p&gt;Since I already patched my Flink CDC fork during the work on Task 2 (see further above), I did the same here and fixed these little limitations in the &lt;code&gt;DataTypeConverter&lt;/code&gt;. And with that, I reached a point where &lt;code&gt;FROM_JSON&lt;/code&gt; could be integration tested for the first time as part of a projection specification in the Flink CDC pipeline definition while processing actual JSON data in-flight.&lt;/p&gt;
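
&lt;p&gt;For illustration purposes, here’s a simplified sketch of what a &lt;code&gt;ROW&lt;/code&gt; branch in &lt;code&gt;convertCalciteRelDataTypeToDataType(...)&lt;/code&gt; could roughly look like. Again, this is a sketch under the assumption that field types can simply be converted recursively - it’s not the literal patch:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;// simplified sketch of a possible ROW branch - not the literal patch;
// assumes java.util imports plus Flink CDC's DataTypes/DataField utilities
case ROW: {
    List&amp;lt;DataField&amp;gt; fields = new ArrayList&amp;lt;&amp;gt;();
    for (RelDataTypeField field : relDataType.getFieldList()) {
        // recurse so that arbitrarily nested field types are covered, too
        fields.add(DataTypes.FIELD(
            field.getName(),
            convertCalciteRelDataTypeToDataType(field.getType())));
    }
    return DataTypes.ROW(fields.toArray(new DataField[0]));
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
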

&lt;h1&gt;
  
  
  Applying &lt;code&gt;FROM_JSON&lt;/code&gt; in Flink CDC Pipelines
&lt;/h1&gt;

&lt;p&gt;We’ve come a long way, so let’s look at an example Flink CDC pipeline definition in which we can now successfully apply &lt;code&gt;FROM_JSON&lt;/code&gt; in a projection as part of the transform block. Here, the &lt;code&gt;knownresidences&lt;/code&gt; field refers to a JSON column in the Postgres source database table, which contains JSON arrays with string elements.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;source&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
   &lt;span class="c1"&gt;# ...&lt;/span&gt;

&lt;span class="na"&gt;transform&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;source-table&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;public.fake_citizens&lt;/span&gt;
    &lt;span class="na"&gt;projection&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;id,registered,isactive,FROM_JSON(knownresidences) as knownresidences&lt;/span&gt;

&lt;span class="na"&gt;sink&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="c1"&gt;# ...&lt;/span&gt;

&lt;span class="na"&gt;pipeline&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;my-pipeline&lt;/span&gt;
  &lt;span class="na"&gt;parallelism&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
  &lt;span class="na"&gt;user-defined-function&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;FROM_JSON&lt;/span&gt;
      &lt;span class="na"&gt;classpath&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;com.github.hpgrahsl.flink.cdc.udfs.json.FromJsonCdcUdf&lt;/span&gt;
      &lt;span class="na"&gt;from.json.schema&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;ARRAY&amp;lt;STRING&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Bonus Surprise
&lt;/h2&gt;

&lt;p&gt;The curious and informed reader will almost certainly wonder how to apply this function multiple times within the same pipeline. And this is a great question indeed :)&lt;/p&gt;

&lt;p&gt;Unfortunately, we must assume that every use of &lt;code&gt;FROM_JSON&lt;/code&gt; will require its own schema string literal defined in the function configuration. For obvious reasons, this inherently rules out direct re-use of the same function definition. So, how about we introduce multiple "uniquely named references" to &lt;code&gt;FROM_JSON&lt;/code&gt; such that each function definition gets its individual schema string literal configured? The example below tries to do that with two named references:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;pipeline&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;my-pipeline&lt;/span&gt;
  &lt;span class="na"&gt;parallelism&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
  &lt;span class="na"&gt;user-defined-function&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;FROM_JSON_1&lt;/span&gt;
      &lt;span class="na"&gt;classpath&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;com.github.hpgrahsl.flink.cdc.udfs.json.FromJsonCdcUdf&lt;/span&gt;
      &lt;span class="na"&gt;from.json.schema&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;ARRAY&amp;lt;STRING&amp;gt;&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;FROM_JSON_2&lt;/span&gt;
      &lt;span class="na"&gt;classpath&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;com.github.hpgrahsl.flink.cdc.udfs.json.FromJsonCdcUdf&lt;/span&gt;
      &lt;span class="na"&gt;from.json.schema&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;MAP&amp;lt;VARCHAR,INT&amp;gt;&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Well, it turns out the wish was father to the thought as the Flink CDC runtime currently stumbles over this. More specifically, this clashes with the way the Janino compiler tries to process it under the hood and causes the following exception:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;// …
Caused by: org.codehaus.commons.compiler.CompileException: Line 1, Column 76: Redefinition of parameter "__instanceOfFromJsonCdcUdf"
        at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:13080)
        at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3791)
// …
        at org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.lambda$compileExpression$0(TransformExpressionCompiler.java:76)
        ... 37 more
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Pragmatic Workaround
&lt;/h2&gt;

&lt;p&gt;Rather than setting out to directly address the root cause, I opted for a pragmatic workaround that is more than good enough for my needs here. By pre-defining a certain number of static inner classes (e.g. N=16) inside the original UDF class, the function can be applied several (up to N) times in the same Flink CDC pipeline context, thereby elegantly circumventing the redefinition problem we faced above (see the condensed Java sketch after the pipeline definition below). A valid pipeline definition when doing so could look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;source&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
   &lt;span class="c1"&gt;# ...&lt;/span&gt;

&lt;span class="na"&gt;transform&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;source-table&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;public.fake_citizens&lt;/span&gt;
    &lt;span class="na"&gt;projection&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;id,registered,isactive,FROM_JSON_1(knownresidences) as knownresidences,FROM_JSON_2(scores) as scores&lt;/span&gt;

&lt;span class="na"&gt;sink&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="c1"&gt;# ...&lt;/span&gt;

&lt;span class="na"&gt;pipeline&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;my-pipeline&lt;/span&gt;
  &lt;span class="na"&gt;parallelism&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
  &lt;span class="na"&gt;user-defined-function&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;FROM_JSON_1&lt;/span&gt;
      &lt;span class="na"&gt;classpath&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;com.github.hpgrahsl.flink.cdc.udfs.json.FromJsonCdcUdf$_C1&lt;/span&gt;
      &lt;span class="na"&gt;from.json.schema&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;ARRAY&amp;lt;STRING&amp;gt;&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;FROM_JSON_2&lt;/span&gt;
      &lt;span class="na"&gt;classpath&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;com.github.hpgrahsl.flink.cdc.udfs.json.FromJsonCdcUdf$_C2&lt;/span&gt;
      &lt;span class="na"&gt;from.json.schema&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;MAP&amp;lt;VARCHAR,INT&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
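
&lt;p&gt;For clarity, here’s a condensed sketch of the static-inner-class trick itself. The names &lt;code&gt;_C1&lt;/code&gt; and &lt;code&gt;_C2&lt;/code&gt; mirror the classpath references in the YAML above, while the actual implementation in the repository may differ in its details:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;// condensed sketch: every pre-defined static inner class gives Janino a
// distinct type - and thus a distinct generated parameter name - so the
// same UDF logic can be registered several times under different names
public class FromJsonCdcUdf implements UserDefinedFunction {

    // ... the actual FROM_JSON parsing logic lives here ...

    public static class _C1 extends FromJsonCdcUdf {}
    public static class _C2 extends FromJsonCdcUdf {}
    // ... further pre-defined classes up to _CN, e.g. N=16
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;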



&lt;h1&gt;
  
  
  Summary
&lt;/h1&gt;

&lt;p&gt;Building an equivalent &lt;code&gt;FROM_JSON&lt;/code&gt; UDF for Flink CDC pipelines revealed certain implementation challenges stemming from differences between the Flink CDC and Flink Table API ecosystems. While both systems use similar SQL-based schema definitions, they rely on entirely separate Java type hierarchies, creating subtle but critical runtime considerations. Another aspect worth highlighting is that Flink CDC's UDF capabilities are currently less mature than the ones found in Flink's Table API/SQL, lacking features like call-context-based type inference. This necessitated an alternate approach using configuration parameters to specify target schemas, functionality that required patching Flink CDC itself (documented in &lt;code&gt;FLINK-38792&lt;/code&gt;). Probably the most critical implementation detail involves respecting Flink CDC's internal data structure requirements: constructed types like &lt;code&gt;GenericArrayData&lt;/code&gt; and &lt;code&gt;GenericMapData&lt;/code&gt; don't perform automatic type conversions, so developers must explicitly convert elements to internal types (&lt;code&gt;StringData&lt;/code&gt;, &lt;code&gt;DecimalData&lt;/code&gt;, etc.) before constructing the target objects. Failing to do so triggers a &lt;code&gt;ClassCastException&lt;/code&gt; at runtime. Additional challenges included gaps in the &lt;code&gt;DataTypeConverter&lt;/code&gt; class for &lt;code&gt;ROW&lt;/code&gt; type handling and issues when trying to reuse UDF definitions in Flink CDC's pipeline YAML. The solution for the latter employs static inner classes to enable multiple &lt;code&gt;FROM_JSON&lt;/code&gt; invocations with different schemas in a single pipeline, providing a practical path forward despite current framework limitations.&lt;/p&gt;

&lt;h1&gt;
  
  
  Takeaways
&lt;/h1&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Type system divergence:&lt;/strong&gt; Flink Table API and Flink CDC have conceptually similar but implementation-wise distinct type systems with separate Java class hierarchies&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;UDF limitations:&lt;/strong&gt; Flink CDC's UDF support is lean compared to Table API, lacking advanced features like call context accessibility for type inference&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Configuration gap:&lt;/strong&gt; UDF configuration parameters aren't fully supported in Flink CDC &lt;code&gt;3.6-SNAPSHOT&lt;/code&gt;, requiring custom patches&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Internal data structures matter:&lt;/strong&gt; &lt;code&gt;ArrayData&lt;/code&gt;, &lt;code&gt;MapData&lt;/code&gt;, and &lt;code&gt;RecordData&lt;/code&gt; all require elements to already be converted to internal types (&lt;code&gt;StringData&lt;/code&gt;, &lt;code&gt;DecimalData&lt;/code&gt;, etc.). No automatic conversion occurs, so using standard Java types instead is a &lt;code&gt;ClassCastException&lt;/code&gt; pitfall that causes runtime failures when Flink CDC processes the data&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;ROW type handling:&lt;/strong&gt; &lt;code&gt;DataTypeConverter&lt;/code&gt; has gaps in &lt;code&gt;ROW&lt;/code&gt; type support that need addressing to get complex nested structures working properly&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Multiple UDF instances of the same type:&lt;/strong&gt; Static inner classes seem to provide a pragmatic workaround for applying the same UDF multiple times with different configurations in one pipeline&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;I learned many things while reading through Flink CDC's codebase, implementing this custom UDF, debugging existing issues, fixing my own bugs, adding unit/integration tests, and writing this article. I hope it's useful for others out there who are trying to extend the processing capabilities of Flink CDC by means of UDFs. At the very least, reading this article should help you save a few precious hours in case you hit similar issues.&lt;/p&gt;

&lt;h3&gt;
  
  
  Happy Flink CDCing!
&lt;/h3&gt;

</description>
      <category>apacheflink</category>
      <category>flinkcdc</category>
      <category>udfs</category>
      <category>json</category>
    </item>
    <item>
      <title>A slightly closer look at MongoDB 5.0 time series collections - Part 1</title>
      <dc:creator>Hans-Peter Grahsl</dc:creator>
      <pubDate>Sun, 18 Jul 2021 13:18:11 +0000</pubDate>
      <link>https://dev.to/hpgrahsl/a-slightly-closer-look-at-mongodb-5-0-time-series-collections-part-1-32m6</link>
      <guid>https://dev.to/hpgrahsl/a-slightly-closer-look-at-mongodb-5-0-time-series-collections-part-1-32m6</guid>
      <description>&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fx5meoovgfe9x12ej07kz.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fx5meoovgfe9x12ej07kz.jpg" alt="Banner Image Charts" width="800" height="533"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Setting the Scene
&lt;/h3&gt;

&lt;p&gt;Recently, at &lt;a href="https://www.mongodb.com/live" rel="noopener noreferrer"&gt;MongoDB.live 2021&lt;/a&gt;, one of the bigger feature announcements was that MongoDB version 5.0 introduces so-called &lt;a href="https://www.youtube.com/watch?v=OQJHf8xdDRM&amp;amp;t=765s" rel="noopener noreferrer"&gt;time-series collections&lt;/a&gt;. The information about it was primarily high-level, and the current documentation doesn't give away some of the details either. This is why I decided to dig just a little bit deeper to improve my personal understanding of what is going on behind the scenes when storing time-series data with this new collection type in MongoDB.&lt;/p&gt;

&lt;h3&gt;
  
  
  The Past
&lt;/h3&gt;

&lt;p&gt;For several years already, people have been using MongoDB to store their time-series data. Some of them struggled initially and had to learn the hard way that one doesn’t simply store time-series data as is. The biggest mistake I’ve seen over and over again in the wild was that data wasn’t stored in an optimized way. What I mean by that is that people didn’t invest any further thought into proper schema design for their documents, but instead just inserted e.g. raw sensor measurements directly into collections. In almost all cases, doing so eventually led to a lot of storage and processing overhead, unnecessarily large index structures, and oftentimes poor performance overall. The way to properly tackle time-series data storage with MongoDB in the past was to apply a schema design trick called the &lt;a href="https://www.mongodb.com/blog/post/building-with-patterns-the-bucket-pattern" rel="noopener noreferrer"&gt;bucket pattern&lt;/a&gt;. The main idea behind this pattern is to store several measurements which logically belong together - e.g. data from one specific sensor over a certain period of time - in a single document which contains a bucket holding multiple of these measurements. Since it’s impractical to grow one document and its bucket indefinitely, the application layer sees to it that a new document is started based on certain thresholds and rules, which depend on the granularity of time and the ingestion frequency / interval of the sensor data. To give a concrete example, there could be a single document and its bucket which stores all measurements happening every second for one specific hour of the day. This single document would then contain up to 3600 measurements ingested at a 1 second interval during a particular hour of the day, before a new document would be created to store all the measurements of the same sensor for the next hour of the day.&lt;/p&gt;
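
&lt;p&gt;To make this more tangible, here’s a purely hypothetical sketch of what one such bucket document could look like - the field names and bucketing thresholds are illustrative only and not prescribed by MongoDB:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;// hypothetical bucket document: one sensor, one hour, up to 3600 readings
{
  sensorId: 31096,
  type: "windspeed",
  bucketStart: ISODate("2021-07-10T00:00:00Z"),
  count: 3,
  measurements: [
    { ts: ISODate("2021-07-10T00:00:03Z"), value: 32.53987084180961 },
    { ts: ISODate("2021-07-10T00:00:04Z"), value: 0.6909954039798452 },
    { ts: ISODate("2021-07-10T00:00:07Z"), value: 3.9089926192773534 }
  ]
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
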

&lt;p&gt;While this approach can work pretty well, it requires upfront thought regarding schema design and, in addition, it means a higher burden for developers. They have to implement, as well as tweak and tune, the bucketing logic for such time-series ingestion scenarios in the application layer. Certain types of queries also involve more effort when targeting collections that contain documents structured according to the bucket pattern. This is because for queries against such collections, the particular bucketing strategy has to be known and considered accordingly.&lt;/p&gt;

&lt;h3&gt;
  
  
  The Present
&lt;/h3&gt;

&lt;p&gt;Fast forward to MongoDB release 5.0, which now brings “native” support for time-series collections. The promise is that developers don’t need to agonize over schema design tricks such as the bucket pattern any longer. Instead, they can simply insert and query their time-series data directly, without any further considerations on the application layer. But how exactly does this work, and what does it look like behind the scenes from a document storage perspective?&lt;/p&gt;

&lt;p&gt;The following explorations are based on raw measurements. The data contains 3 fields and looks as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;03.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nx"&gt;metadata&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;sensorId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;31096&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;32.53987084180961&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;ts&lt;/strong&gt; represents the timestamp of sensor data&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;metadata&lt;/strong&gt; stores which sensor and type of data we are dealing with&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;value&lt;/strong&gt; holds the actual sensor reading, a windspeed value in this case&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Note that in general, you can have much more complex measurement documents containing more payload fields with varying data types and nested elements, too. It's kept simple here on purpose.&lt;/p&gt;

&lt;h4&gt;
  
  
  Step 1: Creating a time series collection
&lt;/h4&gt;

&lt;p&gt;The command to create this new time series collection type is as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="nx"&gt;db&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;createCollection&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;windsensors&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;timeseries&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;timeField&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;ts&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;metaField&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;metadata&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;granularity&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;seconds&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Besides the name, we specify time series related settings. The most important setting - and the only obligatory one - is the name of the field which holds the &lt;strong&gt;timestamp of measurements&lt;/strong&gt;, &lt;em&gt;"ts"&lt;/em&gt; in this case. The &lt;em&gt;"metaField"&lt;/em&gt; is a descriptive label for the sensor data, and the &lt;em&gt;"granularity"&lt;/em&gt; (hours, minutes, or seconds, the latter being the default) defines the expected ingestion interval for the sensor readings in question.&lt;/p&gt;

&lt;h4&gt;
  
  
  Step 2: Insert sample documents
&lt;/h4&gt;

&lt;p&gt;With our empty time series collection in place, let’s ingest the following 10 sample documents, originating from 4 different sensors:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="nx"&gt;db&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;windsensors&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;insertMany&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;
   &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;metadata&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;sensorId&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;52396&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;type&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;ts&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="nc"&gt;ISODate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;2021-07-10T00:00:02Z&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;value&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;18.263742590570686&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
   &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;metadata&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;sensorId&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;31096&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;type&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;ts&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="nc"&gt;ISODate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;2021-07-10T00:00:03Z&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;value&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;32.53987084180961&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
   &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;metadata&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;sensorId&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;52396&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;type&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;ts&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="nc"&gt;ISODate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;2021-07-10T00:00:03Z&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;value&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;18.106480571706808&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
   &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;metadata&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;sensorId&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;62088&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;type&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;ts&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="nc"&gt;ISODate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;2021-07-10T00:00:04Z&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;value&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;20.306831899199864&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
   &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;metadata&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;sensorId&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;31096&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;type&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;ts&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="nc"&gt;ISODate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;2021-07-10T00:00:04Z&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;value&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;0.6909954039798452&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
   &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;metadata&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;sensorId&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;62088&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;type&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;ts&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="nc"&gt;ISODate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;2021-07-10T00:00:06Z&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;value&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;0.031065898581725086&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
   &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;metadata&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;sensorId&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;27470&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;type&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;ts&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="nc"&gt;ISODate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;2021-07-10T00:00:07Z&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;value&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;6.878726412679837&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
   &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;metadata&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;sensorId&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;31096&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;type&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;ts&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="nc"&gt;ISODate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;2021-07-10T00:00:07Z&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;value&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;3.9089926192773534&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
   &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;metadata&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;sensorId&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;52396&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;type&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;ts&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="nc"&gt;ISODate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;2021-07-10T00:00:07Z&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;value&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;28.03679268099916&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
   &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;metadata&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:{&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;sensorId&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;52396&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;type&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;ts&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="nc"&gt;ISODate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;2021-07-10T00:00:07Z&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;value&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;1.0575968433736358&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;])&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Step 3: Run simple find query against time series collection
&lt;/h4&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="nx"&gt;db&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;windsensors&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;find&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The result set shows that all 10 documents are returned separately, which might be surprising at first sight, because this pretty much resembles what we would expect from a "normal" collection, i.e. one without any kind of time-series-optimized storage.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;02.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nx"&gt;metadata&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;sensorId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;52396&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="nx"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a19&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;18.263742590570686&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="na"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;03.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="na"&gt;metadata&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;sensorId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;52396&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="na"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a1b&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;18.106480571706808&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="na"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;07.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="na"&gt;metadata&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;sensorId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;52396&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="na"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a21&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;28.03679268099916&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="na"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;07.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="na"&gt;metadata&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;sensorId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;52396&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="na"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a22&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;1.0575968433736358&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="na"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;03.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="na"&gt;metadata&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;sensorId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;31096&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="na"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a1a&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;32.53987084180961&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="na"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;04.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="na"&gt;metadata&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;sensorId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;31096&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="na"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a1d&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;0.6909954039798452&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="na"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;07.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="na"&gt;metadata&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;sensorId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;31096&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="na"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a20&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;3.9089926192773534&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="na"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;04.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="na"&gt;metadata&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;sensorId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;62088&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="na"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a1c&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;20.306831899199864&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="na"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;06.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="na"&gt;metadata&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;sensorId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;62088&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="na"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a1e&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;0.031065898581725086&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="na"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;07.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="na"&gt;metadata&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;sensorId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;27470&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="na"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a1f&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;6.878726412679837&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In fact, when we refer to &lt;em&gt;windsensors&lt;/em&gt; in our query, we are working with a logical abstraction, officially described as a "writable, non-materialized view". We can verify this by inspecting the currently existing views. Running&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="nx"&gt;db&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getCollection&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;system.views&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;find&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;shows&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;mytsdemo.windsensors&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nx"&gt;viewOn&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;system.buckets.windsensors&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nx"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
   &lt;span class="p"&gt;[&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;$_internalUnpackBucket&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
        &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;timeField&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;ts&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="na"&gt;metaField&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;metadata&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="na"&gt;bucketMaxSpanSeconds&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;3600&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
          &lt;span class="na"&gt;exclude&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The view definition tells us that it is based on a collection called &lt;em&gt;system.buckets.windsensors&lt;/em&gt;. Unlike "normal", user-created views, the pipeline field of this special view shows a placeholder called &lt;em&gt;$_internalUnpackBucket&lt;/em&gt; together with the time series related config settings that were used during the creation of the respective collection. Worth noting is the &lt;em&gt;bucketMaxSpanSeconds&lt;/em&gt; field, which is &lt;strong&gt;3600&lt;/strong&gt; here. It is a value in seconds and depends on the granularity chosen at creation time. For this example it means that a bucket spans at most 3600 seconds, i.e. 1 hour. The takeaway is that the actual storage-optimized time series data can be found in a separate, "internal" collection specified in the &lt;em&gt;viewOn&lt;/em&gt; field of the logical view abstraction.&lt;/p&gt;
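
&lt;p&gt;As a side note, the same time series settings can also be retrieved without touching &lt;em&gt;system.views&lt;/em&gt; directly. The following is a minimal sketch using the &lt;em&gt;listCollections&lt;/em&gt; command, assuming the collection name from this article; its output should include the configured granularity and the derived &lt;em&gt;bucketMaxSpanSeconds&lt;/em&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;// a sketch: inspect the time series options of the logical collection
// (assumes the 'windsensors' collection used throughout this article)
db.runCommand({ listCollections: 1, filter: { name: 'windsensors' } })
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;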

&lt;h4&gt;
  
  
  Step 4: Run a simple find query against the underlying collection
&lt;/h4&gt;

&lt;p&gt;Even though there usually shouldn’t be a need to directly access the storage-optimized version of the time series data, let’s do it anyway to learn what happens behind the scenes. The following query retrieves just one document from this underlying collection:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="nx"&gt;db&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getCollection&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;system.buckets.windsensors&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;findOne&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We get back a single bucket document like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60e8e30043c83ccb1994f6d5&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="nx"&gt;control&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
   &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;version&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
     &lt;span class="nx"&gt;min&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;00.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;1.0575968433736358&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a19&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
     &lt;span class="nx"&gt;max&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;07.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;28.03679268099916&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a22&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="nx"&gt;meta&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;sensorId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;52396&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="nx"&gt;data&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
   &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a19&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;1&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a1b&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;2&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a21&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;3&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a22&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
     &lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;18.263742590570686&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;1&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;18.106480571706808&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;2&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;28.03679268099916&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;3&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;1.0575968433736358&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
     &lt;span class="nx"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;02.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;1&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;03.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;2&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;07.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;3&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;07.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let’s inspect the document structure by taking a closer look at a subset of the contained fields:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;control.min&lt;/strong&gt; holds the bucket’s lower bound timestamp (which depends on the chosen granularity), the lowest value measured in this bucket, and the ObjectId referring to the first entry stored in this document’s bucket.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;control.max&lt;/strong&gt; holds the most recent timestamp stored in this bucket, the highest value measured in this bucket, and the ObjectId referring to the last entry stored in this document’s bucket so far.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Obviously, the data for both &lt;em&gt;control.min&lt;/em&gt; and &lt;em&gt;control.max&lt;/em&gt; is updated on the fly as new sensor readings are ingested into this document’s bucket. In general, those two sub-documents store the &lt;strong&gt;min and max values&lt;/strong&gt; for each field contained in the original measurement’s payload. In our case that is only the &lt;em&gt;value&lt;/em&gt; field with a single windspeed measurement.&lt;/p&gt;
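
&lt;p&gt;To observe how these bounds evolve, one can project just the &lt;em&gt;control&lt;/em&gt; and &lt;em&gt;meta&lt;/em&gt; fields while new measurements are being ingested. A minimal sketch, assuming the bucket collection name from this article:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;// a sketch: watch the per-bucket min/max bounds without the bulky data field
db.getCollection('system.buckets.windsensors').find({}, { control: 1, meta: 1 })
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;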

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;data&lt;/strong&gt; is a complex object that holds all the information of every sensor data payload ingested so far. There are just 3 fields in this particular example: the document identifier &lt;strong&gt;(_id)&lt;/strong&gt;, the timestamp &lt;strong&gt;(ts)&lt;/strong&gt; and the sensor reading &lt;strong&gt;(value)&lt;/strong&gt;. If the original measurement documents contained more fields besides &lt;em&gt;value&lt;/em&gt;, they would all be stored here in the same fashion. The individual measurements are addressed by a field name given by their bucket index, i.e. 0 .. N. In general, the &lt;em&gt;data&lt;/em&gt; field holds one such sub-document for every payload field of the original measurement document.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Based on this single document it is possible to reconstruct every original measurement document that was ever ingested into this bucket, simply by combining the &lt;em&gt;meta&lt;/em&gt; field with every &lt;em&gt;3-tuple&lt;/em&gt; { _id.0, ts.0, value.0 } … { _id.N, ts.N, value.N } taken from the &lt;em&gt;data&lt;/em&gt; field. In general, this would be an N-tuple, since the &lt;em&gt;data&lt;/em&gt; field would hold sub-documents for all payload fields of the original measurement document. One concrete example for our sample data, resulting in the first original measurement document stored in this bucket, is shown next, followed by a small script sketch that automates this reconstruction:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;
&lt;span class="nl"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a19&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
 &lt;span class="nx"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;02.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;18.263742590570686&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="nx"&gt;meta&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;sensorId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;52396&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
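
&lt;p&gt;The same reconstruction can be scripted. The following is a minimal sketch, assuming the bucket document shape shown above; it zips the index-keyed sub-documents of &lt;em&gt;data&lt;/em&gt; back into flat measurement documents:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;// a sketch: rebuild the original measurement documents from one bucket document
// (assumes the system.buckets.windsensors shape discussed in this article)
const bucket = db.getCollection('system.buckets.windsensors').findOne();
const indexes = Object.keys(bucket.data._id);   // '0' .. 'N'
const measurements = indexes.map(i =&gt; ({
  _id: bucket.data._id[i],
  ts: bucket.data.ts[i],
  value: bucket.data.value[i],
  meta: bucket.meta
}));
printjson(measurements);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;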



&lt;p&gt;If we inspect the other 3 documents in the underlying storage-optimized collection, they all look very similar. The only structural difference is that each bucket currently holds a different number of entries, which is exactly as it should be: the 10 original documents originated from 4 different sensors, each with a varying number of readings ingested up to that point.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60e8e30043c83ccb1994f6d6&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="nx"&gt;control&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
   &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;version&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
     &lt;span class="nx"&gt;min&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;00.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;0.6909954039798452&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a1a&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
     &lt;span class="nx"&gt;max&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;07.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;32.53987084180961&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a20&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="nx"&gt;meta&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;sensorId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;31096&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="nx"&gt;data&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
   &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a1a&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;1&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a1d&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;2&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a20&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
     &lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;32.53987084180961&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;1&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;0.6909954039798452&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;2&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;3.9089926192773534&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
     &lt;span class="nx"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;03.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;1&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;04.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;2&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;07.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60e8e30043c83ccb1994f6d7&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="na"&gt;control&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
   &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;version&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
     &lt;span class="na"&gt;min&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="na"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;00.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;0.031065898581725086&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="na"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a1c&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
     &lt;span class="na"&gt;max&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="na"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;06.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;20.306831899199864&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="na"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a1e&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="na"&gt;meta&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;sensorId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;62088&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="na"&gt;data&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
   &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a1c&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;1&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a1e&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
     &lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;20.306831899199864&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;1&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;0.031065898581725086&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
     &lt;span class="na"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="na"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;04.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;1&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="na"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;06.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60e8e30043c83ccb1994f6d8&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="na"&gt;control&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
   &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;version&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
     &lt;span class="na"&gt;min&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="na"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;00.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;6.878726412679837&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="na"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a1f&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
     &lt;span class="na"&gt;max&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="na"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;07.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;6.878726412679837&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="na"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a1f&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="na"&gt;meta&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;sensorId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;27470&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="na"&gt;type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="na"&gt;data&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
   &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f3350afbb696c9ace09a1f&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
     &lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;6.878726412679837&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
     &lt;span class="na"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="na"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;07.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Step 5: Playing with bucket growth and bucket limits
&lt;/h4&gt;

&lt;p&gt;The bucket document for &lt;em&gt;meta: { sensorId: 52396, type: 'windspeed' }&lt;/em&gt; currently holds 4 sensor readings. The question is: how many more measurements can we ingest into this bucket? Obviously, buckets cannot grow indefinitely, so there has to be an upper bound.&lt;/p&gt;

&lt;p&gt;Earlier, we inspected the view definition of the logical abstraction and briefly mentioned the &lt;em&gt;bucketMaxSpanSeconds&lt;/em&gt; setting. When choosing a granularity of seconds during the creation of a time series collection, the value for &lt;em&gt;bucketMaxSpanSeconds&lt;/em&gt; is 3600. In other words, such a bucket can span 1 hour worth of data. If we were to ingest one measurement per second, we might assume that such a bucket can store up to 3600 sensor readings. However, when trying this we see a different behaviour: there seems to be a fixed upper bound of 1000 entries per bucket in a time series collection. The example document below shows a "full bucket" for sensorId 52396 with its first and last bucket entries, the rest of the data being omitted for brevity.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60e8e30043c83ccb19952b3f&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="nx"&gt;control&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
   &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;version&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
     &lt;span class="nx"&gt;min&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f2ef27f161e04419383dbb&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="nx"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;00.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;0.13698301036640723&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
     &lt;span class="nx"&gt;max&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f2ef3bf161e04419477ec3&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="nx"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;39&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;23.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="nx"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;37.4522108368789&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="nx"&gt;meta&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;sensorId&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;52396&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;windspeed&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
  &lt;span class="nx"&gt;data&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
   &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="nl"&gt;value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;18.106480571706808&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;

      &lt;span class="p"&gt;...&lt;/span&gt;

        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;999&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;3.6329149494110027&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
     &lt;span class="nx"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;03.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;

      &lt;span class="p"&gt;...&lt;/span&gt;

        &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;999&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2021&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;07&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="nx"&gt;T00&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;39&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;23.000&lt;/span&gt;&lt;span class="nx"&gt;Z&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt;
     &lt;span class="nx"&gt;_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; 
      &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;0&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f2ef36f161e0441944f2df&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;

      &lt;span class="p"&gt;...&lt;/span&gt;

      &lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="s1"&gt;999&lt;/span&gt;&lt;span class="dl"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;ObjectId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="s2"&gt;60f2ef2ff161e044193f73ff&lt;/span&gt;&lt;span class="dl"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; 
      &lt;span class="p"&gt;}&lt;/span&gt; 
   &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
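
&lt;p&gt;To reproduce this behaviour yourself, you can ingest one reading per second for a single sensor and then count the resulting bucket documents. A minimal sketch, assuming the collection and field names used in this article and a freshly created, empty collection:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;// a sketch: ingest one reading per second for a single sensor
const start = ISODate('2021-07-10T00:00:00Z');
for (let i = 0; i &lt; 3600; i++) {
  db.windsensors.insertOne({
    ts: new Date(start.getTime() + i * 1000),
    metadata: { sensorId: 52396, type: 'windspeed' },
    value: Math.random() * 40
  });
}
// with an upper bound of 1000 entries per bucket,
// this should yield 4 bucket documents for the sensor
db.getCollection('system.buckets.windsensors')
  .countDocuments({ 'meta.sensorId': 52396 })
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;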



&lt;p&gt;I haven’t found any indication in the current official documentation about this "magic constant" limiting buckets to 1000 entries. It cannot be related to the document size limit, for instance, because storing 1000 entries of this sample data doesn’t come anywhere close to the hard document size limit. Maybe the source code would reveal more about this, but so far I haven’t taken the time to study the implementation itself.&lt;/p&gt;

&lt;p&gt;We can also see from the &lt;em&gt;control.min&lt;/em&gt; and &lt;em&gt;control.max&lt;/em&gt; timestamps that this particular bucket spans "only" 2363 seconds (00:39:23 minus 00:00:00, i.e. 39 * 60 + 23), which is less than the maximum possible value of 3600. This is because the bucket hit its 1000 entries limit before the maximum span could be reached. In general, a bucket is closed and a new bucket document created if either its &lt;em&gt;bucketMaxSpanSeconds&lt;/em&gt; is reached or its maximum number of entries (currently 1000) is exceeded, whichever happens first.&lt;/p&gt;

&lt;p&gt;Another learning based on these observations explains the recommendation found in the official docs, namely that the &lt;a href="https://docs.mongodb.com/manual/core/timeseries/timeseries-granularity/#set-granularity-for-time-series-data" rel="noopener noreferrer"&gt;chosen granularity settings&lt;/a&gt; should match the actual data ingestion rate as closely as possible. In our example of a time series collection with "seconds" granularity, the bucket span is 1 hour (3600 sec). If, however, we only ingested 2 to 3 values per hour, we would end up with many new documents in the underlying time series collection, each holding a very small bucket of only 2 to 3 entries. Clearly, this would hurt performance and largely defeat the storage optimization mechanism of time series collections. So choose the granularity of your time series collections wisely.&lt;/p&gt;
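
&lt;p&gt;For reference, the granularity is configured once when the collection is created. The following is a minimal sketch of such a creation call, assuming the collection and field names used in this article:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight javascript"&gt;&lt;code&gt;// a sketch: create a time series collection whose granularity matches
// an ingestion rate of roughly one reading per second
db.createCollection('windsensors', {
  timeseries: {
    timeField: 'ts',
    metaField: 'metadata',
    granularity: 'seconds'   // alternatives: 'minutes' or 'hours'
  }
});
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;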

&lt;h3&gt;
  
  
  Conclusion and Outlook
&lt;/h3&gt;

&lt;p&gt;MongoDB 5.0 introduced a new, natively optimized collection type for storing time series data. It makes developers’ lives easier because working with time series collections is a whole lot more convenient than in the past, when it was necessary to implement the bucket pattern explicitly. I hope this article contributed a bit to your understanding of what exactly happens behind the scenes of time series collections from a document storage perspective, and of the corresponding schema which implicitly reflects the ideas behind the bucket pattern. The most important thing to keep in mind is to take my observations with a grain of salt, because this was my first quick exploration of this new MongoDB 5.0 feature.&lt;/p&gt;

&lt;p&gt;I plan to write more parts in this series. The 2nd article is supposed to discuss different kinds of aggregation queries over time series collections, focusing on the newly introduced window functions.&lt;/p&gt;

&lt;h4&gt;
  
  
  Stay tuned!
&lt;/h4&gt;

&lt;p&gt;Image Credits:&lt;br&gt;
(c) lukechesser @ Unsplash - &lt;a href="https://unsplash.com/photos/JKUTrJ4vK00" rel="noopener noreferrer"&gt;https://unsplash.com/photos/JKUTrJ4vK00&lt;/a&gt;&lt;/p&gt;

</description>
      <category>mongodb</category>
      <category>timeseries</category>
      <category>iot</category>
      <category>sensors</category>
    </item>
    <item>
      <title>My very own perspective on the DevRel scenery</title>
      <dc:creator>Hans-Peter Grahsl</dc:creator>
      <pubDate>Wed, 28 Oct 2020 20:46:30 +0000</pubDate>
      <link>https://dev.to/hpgrahsl/my-very-own-perspective-on-the-devrel-scenery-4l1e</link>
      <guid>https://dev.to/hpgrahsl/my-very-own-perspective-on-the-devrel-scenery-4l1e</guid>
      <description>&lt;p&gt;&lt;strong&gt;A while ago, I took the time to kind of sort some of my thoughts about how I have experienced the DevRel scenery in general and the "mission" of developer advocates in particular over the last 3-4 years.&lt;/strong&gt;&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Spoiler: it changed and made me personally struggle&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;But before pointing out why, let me take one step back and briefly reflect on my very own journey about engaging with developer communities.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Disclaimer: No reader is supposed to take anything of the following personally. Also keep in mind that as a non-native speaker I might not be able to express myself in the best possible way 🙃&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Over the last couple of years I tried to become ever more active in two vastly growing developer communities. 2015-2017, it began with various activities within MongoDB territories. A little later, starting in 2018, my focus shifted a bit towards the Apache Kafka ecosystem and its community. It basically stayed like this until today, and being an active member in both these communities gave me lots of opportunities to learn from others and thereby personally grow in various areas. While the public image might be a different one, my self-perception combined with anecdotal evidence is that, along the way, I managed to become not only a passionate but also a recognized community member. Adding to that, I would even go one step further and characterize myself as a "self-taught and independent developer advocate" for both these tech stacks today. Whether this makes some readers laugh or just smile is not really relevant at this point.&lt;/p&gt;

&lt;p&gt;Coming back to developer relations, it's clear that a large part of professional advocates' work is of course to engage with the developer community across various touch points. First and foremost, at on-site or virtual events, which might be differentiated as follows: &lt;/p&gt;

&lt;p&gt;1) There are big stage events run by vendors which are largely focused on their very own tech stack and ecosystem.&lt;/p&gt;

&lt;p&gt;2) Then there are big stage events with a rather broad technology mix, typically independent of specific vendors, apart from sponsorship of course.&lt;/p&gt;

&lt;p&gt;Both these types of conferences can be considered mostly revenue-driven, at least from the organizers' point of view, which is why I would call them "industry events".&lt;/p&gt;

&lt;p&gt;3) There are small(er) stage events which are also featuring a nice technology mix.&lt;/p&gt;

&lt;p&gt;4) Finally, there are little community events / meetups which are usually the only "free for participants" on-site events; these are at best "cost-covering" but definitely not profit-oriented.&lt;/p&gt;

&lt;p&gt;Your mileage may vary on this "categorization" and I admit that it could have been done in a more nuanced way. Still, it's not completely wrong or useless - at the very least, it's good enough to understand the point I'm trying to make.&lt;/p&gt;

&lt;p&gt;The thing is, developer advocates, technology evangelists, or however you want to call these folks, originally focused on the two big stage conference types mentioned above. In fact, this makes a lot of sense since, after all, this is where vendors are likely to generate the most impact business-wise, provided their people are doing a good job during said events. So besides sending out people from their very own DevRel teams to engage with the audience at vendor conferences, companies also expect their advocates to be seen at big stage multi-tech events. This is totally understandable and perfectly fine.&lt;/p&gt;

&lt;p&gt;Over time, however, I've experienced that more and more companies are trying to "flood" - sorry for this unintended negative connotation - smaller, community-driven events with their advocates, too. And herein lies the problem: this is a pain point for me and a source of frustration for my very own devrel activities. After all, it's very hard to "prevail over professional advocates" for individuals like myself, especially at smaller to medium-sized community events featuring mixed technology content. The reasons for this are clear: first, there are typically only one or at best two talks for any given technology at such events, and second, given two equally good abstracts, organizers would almost certainly opt for "official advocates" representing the company behind a specific tech rather than for "someone just doing stuff" in the community. It attracts more people and helps them sell tickets, which is again understandable from an economic perspective. At the same time, the probability of getting speaking engagements for folks like me is vastly decreasing, as ever more professional developer advocates are hired and sent out to talk at smaller events, all the way down to meetups.&lt;/p&gt;

&lt;p&gt;The thing is, I'm not even trying to change this situation because, as it stands today, I couldn't do much about it anyway. Basically all I can do is "fight and push harder" to still be able to speak here and there, despite the challenges mentioned above. I think you can well imagine that continuing to do so is very exhausting, to say the least. The other strategy is to stop doing it and invest the "saved" time &amp;amp; effort in other things. The latter would mean that I'm personally not affected any longer, but it would still be the same, unsatisfactory situation for other community enthusiasts like myself who are "suffering similar pain".&lt;/p&gt;

&lt;p&gt;I'm very much looking forward to receiving any input on that matter. If you think I'm crazy or an idiot, fair enough, but at least please bring something constructive to "my table".&lt;/p&gt;

&lt;p&gt;Yours truly.&lt;/p&gt;

</description>
      <category>devrel</category>
      <category>communities</category>
      <category>individuals</category>
    </item>
    <item>
      <title>How to build a streaming 🤩 emojis 😍 tracker app with 🚀 ksqlDB 🚀</title>
      <dc:creator>Hans-Peter Grahsl</dc:creator>
      <pubDate>Tue, 31 Mar 2020 18:21:24 +0000</pubDate>
      <link>https://dev.to/hpgrahsl/how-to-build-a-streaming-emojis-tracker-app-with-ksqldb-514a</link>
      <guid>https://dev.to/hpgrahsl/how-to-build-a-streaming-emojis-tracker-app-with-ksqldb-514a</guid>
<description>&lt;p&gt;I recently created and published a tiny but handy project which provides a few custom &lt;a href="https://github.com/hpgrahsl/ksqldb-emoji-functions" rel="noopener noreferrer"&gt;user-defined functions (UDFs) to work with emojis&lt;/a&gt; in &lt;a href="https://ksqldb.io/" rel="noopener noreferrer"&gt;ksqlDB&lt;/a&gt;. While there is a minimum viable amount of project documentation, it's often much easier to understand how to make use of these functions - and what you can build with them - based on a little application.&lt;/p&gt;

&lt;p&gt;The goal is to build a simple, yet fault-tolerant and scalable stream processing backend to do near real-time tracking of &lt;a href="https://en.wikipedia.org/wiki/Emoji" rel="noopener noreferrer"&gt;emojis&lt;/a&gt; based on public tweets. Such an application can be used to feed a live web dashboard which, for instance, might show a continuously updated ranking of the most popular emojis derived from extracting and aggregating emoji occurrences found in tweets. It could look like the illustration below:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fbjvp34crul55tydc412y.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fbjvp34crul55tydc412y.jpg" alt="Streaming Emoji Tracker Dashboard" width="800" height="418"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Thanks to ksqlDB, we can build all vital components for such an application in one coherent technology stack. Its unified streaming architecture:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;allows configuration-based data ingress and egress based on Apache Kafka Connect&lt;/li&gt;
&lt;li&gt;enables transformations, aggregations, and enrichments of data with a convenient SQL-like language&lt;/li&gt;
&lt;li&gt;and supports both streaming queries and point-in-time lookups against the managed state, based on materialized views&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Let's get this started step-by-step...&lt;/strong&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Step 1: Data Ingestion
&lt;/h3&gt;

&lt;p&gt;Directly from within ksqlDB, we can manage Apache Kafka Connect source and sink connectors. For this example, we leverage a turn-key ready &lt;a href="https://www.confluent.io/hub/jcustenborder/kafka-connect-twitter" rel="noopener noreferrer"&gt;Twitter source connector&lt;/a&gt; from the community. &lt;strong&gt;Before you can deploy this connector, you have to get your access credentials&lt;/strong&gt; for the &lt;a href="https://developer.twitter.com/en/docs/basics/authentication/oauth-1-0a" rel="noopener noreferrer"&gt;Twitter API&lt;/a&gt;, which need to be configured for the source connector. The following ksql snippet - source connector configuration in the &lt;code&gt;WITH&lt;/code&gt; clause - does the job and brings in public live tweets that match certain keywords:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE SOURCE CONNECTOR `my-twitter-src-01` WITH (
    'connector.class'='com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector',
    'value.converter'='org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable'=false,
    'key.converter'='org.apache.kafka.connect.json.JsonConverter',
    'key.converter.schemas.enable'=false,
    'twitter.oauth.accessToken'='...',
    'twitter.oauth.consumerSecret'='...',
    'twitter.oauth.consumerKey'='...',
    'twitter.oauth.accessTokenSecret'='...',
    'kafka.status.topic'='tweets',
    'process.deletes'=false,
    'filter.keywords'='coronavirus,2019nCoV,SARSCoV2,covid19,cov19'
);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;NOTE: You have to replace all 4 &lt;code&gt;twitter.oauth.*&lt;/code&gt; settings with your access credentials for this to work properly.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Shortly after the source connector starts, it will produce JSON records into the configured Apache Kafka topic named &lt;code&gt;tweets&lt;/code&gt;. To quickly inspect the data ingestion, we define our base stream, reduced to the essence of the original payload, like so:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- 'create stream from tweets topic only using a few relevant payload fields'&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;STREAM&lt;/span&gt; &lt;span class="n"&gt;tweets&lt;/span&gt;
    &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ID&lt;/span&gt; &lt;span class="nb"&gt;BIGINT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CREATEDAT&lt;/span&gt; &lt;span class="nb"&gt;BIGINT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;TEXT&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="n"&gt;LANG&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="k"&gt;USER&lt;/span&gt; &lt;span class="n"&gt;STRUCT&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;SCREENNAME&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;kafka_topic&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'tweets'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="n"&gt;value_format&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'JSON'&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To inspect the output of this stream for a single ingested tweet we &lt;code&gt;SELECT&lt;/code&gt; just one record:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;ksql&amp;gt; SELECT ID,TEXT FROM tweets EMIT CHANGES LIMIT 1;
+-------------------------------------+---------------------------------------------+
|ID                                   |TEXT                                         |
+-------------------------------------+---------------------------------------------+
|1244971755531354112                  |RT @Faytuks: #BREAKING🚨 - Sweden has confirm|
|                                     |ed 34 new #coronavirus deaths, raising the co|
|                                     |untry's death toll to 180 with 4 028 confirme|
|                                     |d cas…                                       |
Limit Reached
Query terminated
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 2: Data Processing
&lt;/h3&gt;

&lt;p&gt;Now, for the actual stream processing on top of the ingested data, we first create a derived stream &lt;code&gt;tweets_emojis&lt;/code&gt; by applying our custom UDF &lt;code&gt;EMOJIS_EXTRACT&lt;/code&gt;. You can find a short description of how to install these custom emoji handling functions into your ksql-server installation &lt;a href="https://github.com/hpgrahsl/ksqldb-emoji-functions#installation--deployment" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- 'create derived stream using the EMOJIS_EXTRACT function on the TEXT column'&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;STREAM&lt;/span&gt; &lt;span class="n"&gt;tweets_emojis&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; 
    &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;EMOJIS_EXTRACT&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;TEXT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="k"&gt;false&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;EMOJIS&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;tweets&lt;/span&gt; &lt;span class="n"&gt;EMIT&lt;/span&gt; &lt;span class="n"&gt;CHANGES&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This UDF extracts an array of the emojis potentially contained in the raw &lt;code&gt;TEXT&lt;/code&gt; column. The 2nd parameter of type boolean allows us to choose between list (&lt;code&gt;unique=false&lt;/code&gt;) or set semantics (&lt;code&gt;unique=true&lt;/code&gt;), which decides whether or not the extracted array may contain duplicate emoji entries.&lt;/p&gt;

&lt;p&gt;The next step is to flatten the emojis array which we can easily achieve by applying &lt;a href="https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/table-functions/" rel="noopener noreferrer"&gt;ksqlDB's built-in UDTF&lt;/a&gt; called &lt;code&gt;EXPLODE&lt;/code&gt;. Doing so will automatically skip any tweet which didn't contain emojis at all - those which are represented with an empty array in the above &lt;code&gt;tweets_emojis&lt;/code&gt; stream.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- 'create flattened stream to get each contained emoji of a tweet in a separate record'&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;STREAM&lt;/span&gt; &lt;span class="n"&gt;tweets_emojis_flattened&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; 
    &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;EXPLODE&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;EMOJIS&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;EMOJI&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;tweets_emojis&lt;/span&gt; &lt;span class="n"&gt;EMIT&lt;/span&gt; &lt;span class="n"&gt;CHANGES&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The resulting stream contains all occurring emojis as separate records:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;ksql&amp;gt; SELECT ROWKEY,EMOJI FROM tweets_emojis_flattened EMIT CHANGES LIMIT 10;
+--------------------------------+--------------------------------+
|ROWKEY                          |EMOJI                           |
+--------------------------------+--------------------------------+
|{"Id":1244971755531354112}      |🚨                              |
|{"Id":1244971755996880897}      |😢                              |
|{"Id":1244971756122550272}      |💡                              |
|{"Id":1244971757011902469}      |👇🏻                              |
|{"Id":1244971760203845632}      |📽                              |
|{"Id":1244971760203845632}      |☣                              |
|{"Id":1244971760203845632}      |🇷🇺                              |
|{"Id":1244971760203845632}      |🛑                              |
|{"Id":1244971760639827968}      |❤                               |
|{"Id":1244971760287731718}      |🤣                              |
Limit Reached
Query terminated
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This stream serves as input to create a table by doing a simple aggregation to group and count the emojis:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- 'create a table which continuously calculates the total count of each specific emoji'&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;tweets_emoji_counts&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; 
   &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;EMOJI&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="k"&gt;COUNT&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;TOTAL_COUNT&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;tweets_emojis_flattened&lt;/span&gt;
      &lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;EMOJI&lt;/span&gt; &lt;span class="n"&gt;EMIT&lt;/span&gt; &lt;span class="n"&gt;CHANGES&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 3: Queries
&lt;/h3&gt;

&lt;p&gt;ksqlDB provides two different types of queries against tables:&lt;/p&gt;

&lt;h5&gt;
  
  
  Pull Queries
&lt;/h5&gt;

&lt;p&gt;Pull queries can be used to do &lt;strong&gt;point-in-time lookups against the continuously updated state&lt;/strong&gt; which is represented by the table. We can retrieve the current total counter of any specific emoji by specifying it as the &lt;code&gt;ROWKEY&lt;/code&gt; in the &lt;code&gt;WHERE&lt;/code&gt; clause:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- 'pull query to retrieve single point-in-time emoji count'&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;EMOJI&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="n"&gt;TOTAL_COUNT&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;tweets_emoji_counts&lt;/span&gt; &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;ROWKEY&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'🙏'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;ksql&amp;gt; SELECT EMOJI,TOTAL_COUNT FROM
tweets_emoji_counts WHERE ROWKEY = '🙏';
+--------------------+--------------------+
|EMOJI               |TOTAL_COUNT         |
+--------------------+--------------------+
|🙏                  |20                  |
Query terminated
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h5&gt;
  
  
  Push Queries
&lt;/h5&gt;

&lt;p&gt;Push queries are meant to &lt;strong&gt;run indefinitely and emit results every time there are changes&lt;/strong&gt; in the underlying data stream or updates of the table state, respectively. We can subscribe to a continuously updated total counter stream of a single, of several specific, or of all emojis:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- 'PUSH query: get change stream for single emoji counter'&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;EMOJI&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="n"&gt;TOTAL_COUNT&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;tweets_emoji_counts&lt;/span&gt;
   &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;emoji&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'🙏'&lt;/span&gt; &lt;span class="n"&gt;EMIT&lt;/span&gt; &lt;span class="n"&gt;CHANGES&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;ksql&amp;gt; SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts
WHERE emoji = '🙏' EMIT CHANGES;
+---------------------+---------------------+
|EMOJI                |TOTAL_COUNT          |
+---------------------+---------------------+
|🙏                   |55                   |
|🙏                   |56                   |
|🙏                   |59                   |
|🙏                   |61                   |
^CQuery terminated
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- 'PUSH query: change stream for several emoji counters - there is no IN(...) yet :('&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;EMOJI&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="n"&gt;TOTAL_COUNT&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;tweets_emoji_counts&lt;/span&gt;
   &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;emoji&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'🙏'&lt;/span&gt; &lt;span class="k"&gt;OR&lt;/span&gt; &lt;span class="n"&gt;emoji&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'🚨'&lt;/span&gt; &lt;span class="k"&gt;OR&lt;/span&gt; &lt;span class="n"&gt;emoji&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'😭'&lt;/span&gt; &lt;span class="n"&gt;EMIT&lt;/span&gt; &lt;span class="n"&gt;CHANGES&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;ksql&amp;gt; SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts 
WHERE emoji = '🙏' OR emoji = '🚨' OR emoji = '😭' EMIT CHANGES;
+---------------------+---------------------+
|EMOJI                |TOTAL_COUNT          |
+---------------------+---------------------+
|😭                   |48                   |
|🙏                   |66                   |
|🚨                   |109                  |
|😭                   |49                   |
|🚨                   |111                  |
|😭                   |50                   |
|🚨                   |112                  |
|🙏                   |67                   |
^CQuery terminated
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 4: Serving Data to Clients
&lt;/h3&gt;

&lt;p&gt;Thanks to its &lt;a href="https://docs.ksqldb.io/en/latest/developer-guide/api/" rel="noopener noreferrer"&gt;HTTP / REST API&lt;/a&gt;, ksqlDB lets us directly &lt;strong&gt;expose both push and pull queries to consuming clients.&lt;/strong&gt; For instance, we can run the needed queries to build a live dashboard in which we visualize the data streams and offer end-users a simple form-based query mechanism. Below is a cURL snippet which, when run against the REST API, shows the output of a &lt;code&gt;PULL&lt;/code&gt; query to retrieve the current counter for the 🙏 emoji:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;curl -s -X "POST" "http://localhost:8088/query" \
  -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
  -d $'{
   "ksql": "SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts WHERE ROWKEY = \'🙏\';",
    "streamsProperties": {}
  }' | jq '.[1] | {emoji: .row.columns[0],count: .row.columns[1]}'
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;which for my particular data results in:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"emoji"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"🙏"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"count"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;2198&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
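
&lt;p&gt;The push variant works over the same endpoint: the client keeps the HTTP connection open and consumes the chunked response line by line until it disconnects. Below is a minimal sketch using Java 11's built-in HttpClient - the class name is purely illustrative:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class EmojiPushQueryClient {
    public static void main(String[] args) throws Exception {
        // push query: the server streams one JSON row per emitted change until we disconnect
        String payload = "{ \"ksql\": \"SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts"
                + " WHERE emoji = '🙏' EMIT CHANGES;\", \"streamsProperties\": {} }";
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8088/query"))
                .header("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();
        HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofLines())
                .body()
                .forEach(System.out::println);
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;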



&lt;h3&gt;
  
  
  Have fun handling Emojis in 🚀 ksqlDB 🚀
&lt;/h3&gt;

</description>
      <category>ksqldb</category>
      <category>streamprocessing</category>
      <category>apachekafka</category>
      <category>emojis</category>
    </item>
    <item>
      <title>Towards a MongoDB-backed Apache Kafka Streams State Store</title>
      <dc:creator>Hans-Peter Grahsl</dc:creator>
      <pubDate>Mon, 23 Dec 2019 16:03:50 +0000</pubDate>
      <link>https://dev.to/hpgrahsl/towards-a-mongodb-backed-apache-kafka-streams-state-store-20ik</link>
      <guid>https://dev.to/hpgrahsl/towards-a-mongodb-backed-apache-kafka-streams-state-store-20ik</guid>
      <description>&lt;h2&gt;
  
  
  Setting the Scene
&lt;/h2&gt;

&lt;p&gt;Nowadays, there is an increasing number of business-critical services across different industries that need to process data quicker than ever to be able to react to changing conditions with very low latencies. Irrespective of the concrete technology stack we use for near real-time stream processing, there are two broad categories of operations that we want to apply on never-ending streams of data, both of which are vastly different:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;First, some applications process single messages, one by one in a stateless fashion. Examples would be to filter records according to pre-defined criteria or to apply a set of transformations on every message that passes by. Such types of workloads are straightforward to reason about since their stateless nature lets us process a message independently from any other message that we have already acted upon or might act upon in the future.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Second, there are applications which are a completely different "beast" since they need to deal with state management along the way. Examples of stateful operations in the context of stream processing are aggregations, counting, joining streams of data, or keeping track of messages within certain timespans, a.k.a. windowing. The hardest parts to get right in this regard are related to fault-tolerance and scalability of said state.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;So yes, efficient and reliable state management is a rather tricky undertaking comprising aspects like state growing very large - beyond the available memory - and at the same time guaranteeing that the maintained state stays consistent and isn't lost in case of certain failure scenarios.&lt;br&gt;
Luckily, most of these challenges are typically taken care of to a very large degree by the stream processing frameworks and technologies that we use - so don't be scared away. Instead, let's take a brief look at how &lt;a href="https://kafka.apache.org/24/documentation/streams/developer-guide/" rel="noopener noreferrer"&gt;Apache Kafka Streams&lt;/a&gt; is dealing with state and how it helps to take most of the burden off our shoulders.&lt;/p&gt;

&lt;p&gt;In case you are already familiar with state management in Kafka Streams applications, feel free to skip the following section and dive right into the heart of this blog post.&lt;/p&gt;

&lt;h2&gt;
  
  
  State Management in Apache Kafka Streams
&lt;/h2&gt;

&lt;p&gt;Whenever our stream processing applications leverage stateful operations, Kafka Streams takes care to create and continuously maintain the underlying &lt;a href="https://kafka.apache.org/24/documentation/streams/architecture#streams_architecture_state" rel="noopener noreferrer"&gt;state stores&lt;/a&gt; to persist any state changes. In general, a single application may require one or more such state stores, which can be accessed by their respective names to selectively write to and read from the state store in question.&lt;/p&gt;

&lt;p&gt;The actual storage mechanism behind the scenes has been built with a 'pluggable architecture' in mind. Out of the box, Kafka Streams supports state stores running on top of &lt;a href="https://rocksdb.org/" rel="noopener noreferrer"&gt;RocksDB&lt;/a&gt;, which is "an embeddable persistent key-value store for fast storage". Embeddable means that there is no need to spin up and run external processes - as we would do for database servers - but instead, RocksDB is used as part of the Kafka Streams application and it typically persists to a locally attached disk of the corresponding processing instance. Besides leveraging RocksDB, we can opt for an in-memory only state store implementation, which essentially means that all state is kept and maintained by a hash map structure held inside the JVM's heap. The application then doesn't persist any state to its locally attached disk.&lt;/p&gt;

&lt;p&gt;Kafka Streams applications can be scaled out, such that several processing instances run in parallel to share the workload. Each of the instances then processes a subset of all available partitions of the involved Kafka topics. Likewise, each of the instances locally holds and maintains only a partial application state. In other words, the entire application state is distributed across multiple nodes. Since nodes can go down anytime, either unexpectedly or on purpose, the instance-local state can, of course, be temporarily lost. To cope with such situations and other failure scenarios, Kafka Streams not only performs state changes locally but writes all modifications into so-called changelog topics backed by Apache Kafka. This allows recovering from state loss because the missing state can be re-created by reprocessing the records from the corresponding Kafka topic partitions.&lt;/p&gt;

&lt;h3&gt;
  
  
  Handling State Store Access
&lt;/h3&gt;

&lt;p&gt;From a developer's perspective, the way we create state stores and interact with them very much depends on which of the two different Kafka Streams APIs (&lt;a href="https://kafka.apache.org/24/documentation/streams/developer-guide/dsl-api.html" rel="noopener noreferrer"&gt;Streams DSL&lt;/a&gt; vs. &lt;a href="https://kafka.apache.org/24/documentation/streams/developer-guide/processor-api.html" rel="noopener noreferrer"&gt;Processor API&lt;/a&gt;) we choose to write our stream processing applications. The DSL offers a very convenient way to define stream processors thanks to its declarative, functional and fluent API nature. Most users like DSL abstractions as well as higher-level building blocks and thus tend to primarily leverage the DSL. More importantly, when it comes to state management due to the use of stateful operators, the DSL takes care to auto-create the needed state stores. Additionally, the interactions with the state stores are handled in a fully transparent manner. In other words, all the complexities that come along with explicitly touching state are more or less hidden away.&lt;/p&gt;
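
&lt;p&gt;To make this concrete with a minimal sketch - topic and store names are just placeholders - a single stateful operator suffices for the DSL to create and manage a state store, including its changelog topic:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;

public class DslStateStoreExample {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        // count() is stateful: the DSL auto-creates the backing state store
        // (RocksDB per default) plus its changelog topic; naming it via
        // Materialized.as(...) makes it addressable, e.g. for interactive queries
        builder.stream("sensor-readings", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .count(Materialized.as("readings-per-station"));
        return builder.build();
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Swapping in &lt;code&gt;Materialized.as(Stores.inMemoryKeyValueStore("readings-per-station"))&lt;/code&gt; would select the in-memory variant described above instead of RocksDB.&lt;/p&gt;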

&lt;p&gt;On the other hand, there are use cases for which we need certain functionality that simply isn't provided by the DSL. Whenever we reach that point, we can "fall back to" and use the imperative-style Processor API (PAPI). While it offers us more flexibility and lower-level building blocks, at the same time, it means we need to write more code ourselves. Also when it comes to state management, it's up to us to create any required state stores in the first place, and explicitly write to or read from them as needed.&lt;/p&gt;
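
&lt;p&gt;Sketched against the Processor API of Kafka 2.4, this explicit handling could look as follows - again, topic, store and class names are placeholders:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class PapiStateStoreExample {

    public static Topology build() {
        // 1) explicitly create the store, 2) attach it to the processor using it
        StoreBuilder&amp;lt;KeyValueStore&amp;lt;String, Long&amp;gt;&amp;gt; storeBuilder =
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("readings-per-station"),
                        Serdes.String(), Serdes.Long());
        Topology topology = new Topology();
        topology.addSource("source", "sensor-readings");
        topology.addProcessor("counter", CountingProcessor::new, "source");
        topology.addStateStore(storeBuilder, "counter");
        return topology;
    }

    static class CountingProcessor extends AbstractProcessor&amp;lt;String, String&amp;gt; {

        private KeyValueStore&amp;lt;String, Long&amp;gt; store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            super.init(context);
            // 3) explicitly look up the store by its name ...
            store = (KeyValueStore&amp;lt;String, Long&amp;gt;) context.getStateStore("readings-per-station");
        }

        @Override
        public void process(String key, String value) {
            // ... and 4) explicitly read from and write to it
            Long current = store.get(key);
            store.put(key, current == null ? 1L : current + 1L);
        }
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;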

&lt;h3&gt;
  
  
  Exposing Managed State
&lt;/h3&gt;

&lt;p&gt;More often than not, the application state which is managed by Kafka Streams needs to be made available to the outside world, such that other services can directly perform state-based queries against it at any point in time. For that purpose, there is a mechanism called &lt;a href="https://kafka.apache.org/24/documentation/streams/developer-guide/interactive-queries.html" rel="noopener noreferrer"&gt;Interactive Queries&lt;/a&gt; which allows performing simple queries based on a single key or a range of keys. Additionally, Kafka Streams maintains and exposes the necessary meta-data which is required to derive which of potentially many processing instances is currently responsible for what parts of the distributed application state. This meta-data can be leveraged to route key-based lookup queries to the right target instance which locally holds and maintains the partial state containing the data for given keys.&lt;/p&gt;
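
&lt;p&gt;A minimal sketch of such a key-based lookup against the store named in the DSL example above, using the interactive query API as of Kafka 2.4 and assuming a running application instance:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class InteractiveQueryExample {

    // point-in-time lookup against the read-only view of a named state store;
    // streams.metadataForKey(...) could be consulted first to route the query
    // to the instance that actually hosts the partition for the given key
    public static Long currentCount(KafkaStreams streams, String stationName) {
        ReadOnlyKeyValueStore&amp;lt;String, Long&amp;gt; view =
                streams.store("readings-per-station", QueryableStoreTypes.keyValueStore());
        return view.get(stationName);
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;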

&lt;h2&gt;
  
  
  The Case for Custom State Stores
&lt;/h2&gt;

&lt;p&gt;While simple key-based state lookups might serve lots of use cases just fine, there are others for which such access patterns could be too restrictive. We might need to support richer queries against the application's state, which is potentially given by rather complex aggregate structures that are stored for different keys. Luckily, we learned above that there is a 'pluggable' state store architecture in place, so with exactly this in mind, let's briefly explore how we could allow for arbitrary queries against the persisted state using a custom, remote state store implementation.&lt;/p&gt;

&lt;p&gt;When we think about the implementation of custom state stores, the following high-level characteristics are rather important:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;the data store backing the Kafka Streams state store should be resilient &amp;amp; scalable enough and offer acceptable performance because Kafka Streams applications can cause a rather high read/write load&lt;/li&gt;
&lt;li&gt;since application state may consist of complex aggregate structures, support for rich querying and indexing capabilities would be beneficial to be able to go way beyond key-based lookups&lt;/li&gt;
&lt;li&gt;ideally, the custom state store implementation works as a drop-in replacement so that it can be used with minimal to no code and/or configuration changes for existing Kafka Streams applications&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  MongoDB-backed State Store Implementation
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://www.mongodb.com/" rel="noopener noreferrer"&gt;MongoDB&lt;/a&gt; can be considered one of the leading document-oriented databases, which are a &lt;strong&gt;very good fit to store, query and index complex aggregate structures.&lt;/strong&gt; Furthermore, MongoDB offers &lt;strong&gt;replication and sharding&lt;/strong&gt; out-of-the-box which are must-have features to cope with potentially high read/write loads as well as the proper level of resilience for backing Kafka Streams applications' state.&lt;/p&gt;

&lt;h3&gt;
  
  
  Challenge
&lt;/h3&gt;

&lt;p&gt;Ideally, we would just take any application-level data structures which are used behind arbitrary stateful operations and put them into MongoDB collections as documents. However, that is easier said than done since the Kafka Streams DSL strictly limits the possibilities in that regard. The way it works under the covers is that the DSL uses several "nested stores", wrapping one another, similar to a decorator approach.&lt;/p&gt;

&lt;p&gt;The outer-most state store "layer", for instance, is a &lt;code&gt;MeteredKeyValueStore&lt;/code&gt;. Then, depending on whether caching and/or change logging are enabled, there are e.g. &lt;code&gt;CachingKeyValueStore&lt;/code&gt; and &lt;code&gt;ChangeLoggingKeyValueStore&lt;/code&gt; in between, before we finally reach the actual store-specific implementation - the &lt;code&gt;RocksDBStore&lt;/code&gt; per default, or a custom one.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fj17rpxdlbdvq5j5cz17i.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fj17rpxdlbdvq5j5cz17i.png" alt="default RocksDB state store data flow" width="800" height="785"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The following stack trace, based on debugging an application using the default &lt;code&gt;RocksDBStore&lt;/code&gt;, also reflects how this looks if both caching and change logging are enabled:&lt;/p&gt;

&lt;h4&gt;
  
  
  For putting a new key-value pair into the state store:
&lt;/h4&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7qyd9nipc3s07llh7zgo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F7qyd9nipc3s07llh7zgo.png" alt="debug stack trace put" width="684" height="324"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  For getting the value for a key from the state store:
&lt;/h4&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhjuots1k9ktrxf47j4j0.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhjuots1k9ktrxf47j4j0.png" alt="debug stack trace get" width="682" height="184"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Now, the main challenge about this is the fact that very early in this series of "decorated" state stores - in fact right after the &lt;code&gt;MeteredKeyValueStore&lt;/code&gt; layer - the API contracts are already bound to keys and values being &lt;code&gt;bytes&lt;/code&gt;. &lt;strong&gt;As a consequence, we do not have direct access anymore to our original data structures (POJOs) at the inner-most "layer", namely the concrete and persisting key-value store implementation.&lt;/strong&gt; Instead, we have to deal with the bytes resulting from the serialization which took place in the &lt;code&gt;MeteredKeyValueStore&lt;/code&gt; based on the configured &lt;a href="https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/common/serialization/Serdes.html" rel="noopener noreferrer"&gt;&lt;code&gt;Serdes&lt;/code&gt;&lt;/a&gt;. Leaving aside the DSL, and sticking to state stores that can only be used together with the PAPI, would allow us to implement a fully customizable state store. Doing so, we could easily break free from being tightly bound to handling bits &amp;amp; bytes. But then again, it would no longer be feasible to use the custom state store for any existing code in the wild which relies on the DSL. Gone would then be the option to have a nicely working drop-in replacement.&lt;/p&gt;

&lt;p&gt;That being said, the proposed way to deal with this dilemma is to embrace the API constraints and come up with an alternate solution that can work based on the bytes contract. Obviously, the custom state store implementation needs to be aware of the configured Serde classes. It can then leverage the deserializers to restore the original key/value application-level data structures, convert these into MongoDB documents and write them into corresponding collections. Conversely, whatever needs to be read back from the database, is first mapped from documents into the application-level data structures, before being serialized and sent through the state store hierarchy as bytes. The illustration below depicts the handling of data on its way in and out of the database and through the state store hierarchy:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F90uxa3yk6spx75ru5b3j.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F90uxa3yk6spx75ru5b3j.png" alt="MongoDB custom state store data flow" width="800" height="1089"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  Resulting Sample State Documents in MongoDB
&lt;/h4&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Frmh5ot1g51y3ryw9qss9.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Frmh5ot1g51y3ryw9qss9.png" alt="Sample State Documents" width="800" height="602"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  Many more Possibilities
&lt;/h4&gt;

&lt;p&gt;Once we have our stream processing state persisted in MongoDB, we can leverage its full potential when it comes to exposing said state. Any of the available fields can be efficiently queried. Thanks to sophisticated secondary index support this can range from geospatial queries all the way through to full-text search or even complex &lt;a href="https://docs.mongodb.com/manual/core/aggregation-pipeline/" rel="noopener noreferrer"&gt;aggregation pipeline&lt;/a&gt; queries if needed. For the simple scenario shown above, we might want to define indexes on the fields &lt;code&gt;avg, min, max&lt;/code&gt; and &lt;code&gt;stationName&lt;/code&gt; to efficiently support exact matches, range queries and sorting on any of these fields.&lt;/p&gt;
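
&lt;p&gt;Sketched with the MongoDB Java driver, creating these indexes on the state collection could look like this - the collection handle is assumed to exist:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Indexes;
import org.bson.Document;

public class StateStoreIndexes {
    // secondary indexes to support exact matches, range queries and sorting
    public static void create(MongoCollection&amp;lt;Document&amp;gt; stateCollection) {
        stateCollection.createIndex(Indexes.ascending("stationName"));
        stateCollection.createIndex(Indexes.ascending("avg"));
        stateCollection.createIndex(Indexes.ascending("min"));
        stateCollection.createIndex(Indexes.ascending("max"));
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;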

&lt;p&gt;Furthermore, MongoDB's &lt;a href="https://docs.mongodb.com/manual/changeStreams/" rel="noopener noreferrer"&gt;change streams feature&lt;/a&gt; can be combined with the reactive database driver to directly stream any state changes to 3rd party clients as they happen. A commonly found use case for this would be to feed a live dashboard in a single page application with either all or a specific subset of the state changes that are happening in Kafka Streams applications.&lt;/p&gt;
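
&lt;p&gt;A minimal sketch of such a tailing client with the synchronous Java driver (4.x) - connection string and namespace are placeholders, and change streams require a replica set deployment:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class StateChangeTailer {
    public static void main(String[] args) {
        MongoCollection&amp;lt;Document&amp;gt; state = MongoClients.create("mongodb://localhost:27017")
                .getDatabase("streams-state").getCollection("readings-per-station");
        // blocks and prints every state change as it is written by the streams application
        state.watch().forEach(change -&amp;gt;
                System.out.println(change.getOperationType() + ": " + change.getFullDocument()));
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;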

&lt;h2&gt;
  
  
  Trade-Offs
&lt;/h2&gt;

&lt;h3&gt;
  
  
  De/Serialization Overhead
&lt;/h3&gt;

&lt;p&gt;Without a doubt, the suggested approach to implement a custom state store on top of MongoDB involves de/serialization overhead. On the plus side, it enables a drop-in replacement and can be leveraged together with any existing Kafka Streams DSL code that needs key-value state stores. The same approach could be used to plug in various other state store backends with relative ease.&lt;/p&gt;

&lt;h3&gt;
  
  
  Remote State
&lt;/h3&gt;

&lt;p&gt;Having a centralized, remote state store means that our stream processing application has to reach across network boundaries to read/write state, which may or may not be an issue depending on the provided network reliability as well as throughput needs. Furthermore, we shouldn't forget about the availability guarantees the centralized, remote state store can provide, since having the state available at more or less "all times" is vital. Network blips might be tolerable and mitigated by retry mechanisms, but longer network-induced downtimes of a critical subset of the database nodes would almost certainly cause us trouble.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;However, all of these trade-offs depend on the performance characteristics of the underlying infrastructure: in some systems, network delay may be lower than disk access latency, and network bandwidth may be comparable to disk bandwidth. There is no universally ideal trade-off for all situations, and the merits of local versus remote state may also shift as storage and networking technologies evolve. -- Martin Kleppmann&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Main Benefits
&lt;/h2&gt;

&lt;p&gt;Despite the main drawbacks described above, there are a number of beneficial aspects that come along with a custom, remote state store implementation which are worth mentioning:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;persistence of state can be scaled and replicated independently from the Kafka Streams application nodes&lt;/li&gt;
&lt;li&gt;there is no such thing as local state loss if any of the Kafka Streams application nodes goes down&lt;/li&gt;
&lt;li&gt;no need to run stand-by replicas to shorten otherwise long-lasting state store rebuilding phases&lt;/li&gt;
&lt;li&gt;managed state often reflects rich data structures (aggregates) that can be efficiently served to any other application based on arbitrary non-key queries backed by secondary indexes&lt;/li&gt;
&lt;li&gt;no additional processing load / network saturation caused by lots of interactive queries which have to be directly served by the Kafka Streams applications nodes themselves&lt;/li&gt;
&lt;li&gt;MongoDB can be configured to run on top of the &lt;a href="https://docs.mongodb.com/manual/core/inmemory/" rel="noopener noreferrer"&gt;in-memory storage engine&lt;/a&gt; to completely avoid disk I/O&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Summary &amp;amp; Outlook
&lt;/h2&gt;

&lt;p&gt;This blog post explained the importance of managing state in stream processing applications. While the default RocksDB-backed Apache Kafka Streams state store implementation serves various needs just fine, some use cases could benefit from a centralized, remote state store. In particular, one possible solution for such a customized implementation that uses MongoDB has been discussed. A major drawback of this approach is, of course, the fact that any state-related read and write access has to be done over the network. However, its main benefit is that complex state aggregates can be efficiently served to any 3rd party applications based on rich querying options and indexing support directly from the database, without the need to cause interactive-query-induced CPU/network load on the actual stream processing nodes.&lt;/p&gt;

&lt;p&gt;Currently, there is only proof-of-concept code for the MongoDB-backed state store implementation. The idea is to open source it after polishing some rough edges, adding at least basic documentation and further tests. In case you would find that useful or even want to contribute, please just let me know so that I can ping you once the repo is public. Stay tuned!&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Many thanks to &lt;a href="https://twitter.com/MatthiasJSax" rel="noopener noreferrer"&gt;Matthias J. Sax&lt;/a&gt; for providing me very helpful insights to better understand the constraints and trade-offs of remote state stores. Also I'm very thankful that &lt;a href="https://twitter.com/gunnarmorling" rel="noopener noreferrer"&gt;Gunnar Morling&lt;/a&gt; took the time to challenge different aspects of this blog post and to provide very useful hints and comments on this work.&lt;/em&gt;&lt;/p&gt;

</description>
      <category>apachekafka</category>
<category>streamprocessing</category>
      <category>statestores</category>
      <category>mongodb</category>
    </item>
  </channel>
</rss>
