<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Antonio Davide Calì</title>
    <description>The latest articles on DEV Community by Antonio Davide Calì (@calificio).</description>
    <link>https://dev.to/calificio</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F851970%2Fb762cd0c-43ec-46e3-b349-60bf5aeffdd0.jpg</url>
      <title>DEV Community: Antonio Davide Calì</title>
      <link>https://dev.to/calificio</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/calificio"/>
    <language>en</language>
    <item>
      <title>My (Goofy) attempt on building a Flink BigQuery Source Connector</title>
      <dc:creator>Antonio Davide Calì</dc:creator>
      <pubDate>Mon, 18 Nov 2024 15:46:46 +0000</pubDate>
      <link>https://dev.to/calificio/my-goofy-attempt-on-building-a-flink-bigquery-source-connector-336c</link>
      <guid>https://dev.to/calificio/my-goofy-attempt-on-building-a-flink-bigquery-source-connector-336c</guid>
      <description>&lt;p&gt;&lt;strong&gt;EDIT&lt;/strong&gt;: So, I got told that post is probably way too long, and I fully agree. What I really suggest you to focus on is the DataSource Picture at Start, and the very last Recap at the bottom. And in case you want to deep in more in a single component, just go to the correct section.&lt;/p&gt;




&lt;p&gt;So hello everyone!&lt;br&gt;
Here I am, taking you in this journey of my attempt on creating a Flink BigQuery Source Connector.&lt;/p&gt;

&lt;p&gt;For the sake of it, I will skip a lot of "What's Flink", "BigQuery: An introduction" etc.. etc.. and I will try to jump directly to the action.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;FOR THE SAKE OF IT&lt;/strong&gt;: this is a tentative to make things work. They work, badly, inneficiently, and in a very ugly way, but they work.&lt;/p&gt;

&lt;p&gt;But let's begin at least with one important image taken from Flink Official Documentation.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9i6e22eykb6klp43lvca.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9i6e22eykb6klp43lvca.jpg" alt="Official Documentation Flink DataSource" width="800" height="455"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;From what we can understand, writing a &lt;strong&gt;Source Connector&lt;/strong&gt; needs at least to implementing the &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/sources/" rel="noopener noreferrer"&gt;DataSource structure&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Let's start looking at my project tree, just to have an overview&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;lib/src
├── main
│   ├── java
│   │   └── com
│   │       └── antoniocali
│   │           ├── Library.java
│   │           ├── bq
│   │           │   ├── BigQueryClient.java
│   │           │   ├── BigQueryReadOptions.java
│   │           │   ├── BigQueryUtils.java
│   │           │   └── converters
│   │           │       ├── AvroToRowDataConverters.java
│   │           │       └── FieldValueListToRowDataConverters.java
│   │           └── sources
│   │               ├── BigQueryDataStreamSource.java
│   │               ├── emitter
│   │               │   └── BigQueryRecordEmitter.java
│   │               ├── reader
│   │               │   ├── BigQuerySourceReader.java
│   │               │   ├── BigQuerySplitEnumerator.java
│   │               │   └── BigQuerySplitReader.java
│   │               └── &lt;span class="nb"&gt;split&lt;/span&gt;
│   │                   ├── BigQuerySourceSplit.java
│   │                   ├── BigQuerySourceSplitSerializer.java
│   │                   ├── BigQuerySourceSplitState.java
│   │                   ├── BigQuerySplitEnumeratorState.java
│   │                   └── BigQuerySplitEnumeratorStateSerializer.java

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Wow a lot of files. Yup. &lt;br&gt;
I will problably go in each one &lt;strong&gt;LOL&lt;/strong&gt; SO BE READY!.&lt;/p&gt;

&lt;p&gt;But let's make a step back and describe the image above and each of those components.&lt;/p&gt;
&lt;h1&gt;
  
  
  Split
&lt;/h1&gt;

&lt;p&gt;So what's a &lt;strong&gt;Split&lt;/strong&gt;? (the little yellow box in the graph above)&lt;br&gt;
A split is a base unit of "data" operation in Flink. I would describe mostly like a metadata of information more than data itself, at least in my case, because it contains the information needed to retrieve the actual data from BigQuery.&lt;/p&gt;

&lt;p&gt;Let's take a look at the code&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="c1"&gt;// antoniocali/sources/split/BigQuerySourceSplit.java&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.flink.api.connector.source.SourceSplit&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.io.Serializable&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt; &lt;span class="kd"&gt;implements&lt;/span&gt; &lt;span class="nc"&gt;SourceSplit&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Serializable&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;splitName&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;minTimestamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;maxTimestamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;splitName&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;minTimestamp&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;maxTimestamp&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;splitName&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;splitName&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;maxTimestamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;maxTimestamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;minTimestamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;minTimestamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="nf"&gt;splitId&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;splitName&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="nf"&gt;getSplitName&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;splitName&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="nf"&gt;getMinTimestamp&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;minTimestamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="nf"&gt;getMaxTimestamp&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;maxTimestamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="nf"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="s"&gt;"BigQuerySourceSplit{"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;"splitName='"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;splitName&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="sc"&gt;'\''&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;", minTimestamp="&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;minTimestamp&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;", maxTimestamp="&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;maxTimestamp&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="sc"&gt;'}'&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In this case a &lt;em&gt;Split&lt;/em&gt; (that I called BigQuerySourceSplit) contains the information needed for then retrieve the data from BigQuery.&lt;/p&gt;

&lt;p&gt;I think it's worth mentioning that in this case a BigQuery Flink Source is designed to read a single BigQuery Table, and that's why the only informations needed, in terms of useful metadata, are &lt;code&gt;minTimestamp&lt;/code&gt; and &lt;code&gt;maxTimestamp&lt;/code&gt;.&lt;br&gt;
I do not need to have the information of the &lt;code&gt;project.dataset.table&lt;/code&gt; of BigQuery because they are "fix" metadata of the Source.&lt;/p&gt;

&lt;p&gt;To clarify, when we will cover later how to collect data from BigQuery using a Split, we could expect a query like&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;project&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dataset&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="k"&gt;table&lt;/span&gt;
&lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="o"&gt;???&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="n"&gt;minTimestamp&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="o"&gt;???&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;maxTimestamp&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;???&lt;/code&gt; will fulfilled at runtime, based on a Configuration option of my Source Connector, that contains a &lt;em&gt;Timestamp&lt;/em&gt; column of the table that can be used for "chunking" the table. (e.g. &lt;code&gt;created_at&lt;/code&gt;).&lt;/p&gt;

&lt;p&gt;Since we are talking about Split, let's also see all the class around it.&lt;/p&gt;

&lt;p&gt;A very first simple one is the &lt;strong&gt;&lt;em&gt;SerDe&lt;/em&gt;&lt;/strong&gt; class for the &lt;em&gt;Split&lt;/em&gt;. Since most of information describes run across the network, we need a simple SerDe for the Split.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="c1"&gt;// antoniocali/sources/split/BigQuerySourceSplitSerializer.java&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplitSerializer&lt;/span&gt; &lt;span class="kd"&gt;implements&lt;/span&gt; &lt;span class="nc"&gt;SimpleVersionedSerializer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplitSerializer&lt;/span&gt; &lt;span class="no"&gt;INSTANCE&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplitSerializer&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="no"&gt;VERSION&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="nf"&gt;getVersion&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;VERSION&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="nf"&gt;serialize&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt; &lt;span class="n"&gt;bigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;IOException&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ByteArrayOutputStream&lt;/span&gt; &lt;span class="n"&gt;baos&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ByteArrayOutputStream&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; &lt;span class="nc"&gt;DataOutputStream&lt;/span&gt; &lt;span class="n"&gt;out&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;DataOutputStream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                &lt;span class="n"&gt;baos&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;writeUTF&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getSplitName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
            &lt;span class="n"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;writeLong&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMinTimestamp&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
            &lt;span class="n"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;writeLong&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMaxTimestamp&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
            &lt;span class="n"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;flush&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;baos&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toByteArray&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt; &lt;span class="nf"&gt;deserialize&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;version&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;serialized&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;IOException&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;getVersion&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="n"&gt;version&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;IllegalArgumentException&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                    &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;format&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"The provided serializer version (%d) is not expected (expected : %s)."&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;version&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                            &lt;span class="no"&gt;VERSION&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ByteArrayInputStream&lt;/span&gt; &lt;span class="n"&gt;bais&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ByteArrayInputStream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;serialized&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; &lt;span class="nc"&gt;DataInputStream&lt;/span&gt; &lt;span class="n"&gt;in&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;DataInputStream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                &lt;span class="n"&gt;bais&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;switch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;version&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="nl"&gt;VERSION:&lt;/span&gt;
                    &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;splitName&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;in&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;readUTF&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
                    &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;minTimestamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;in&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;readLong&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
                    &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;maxTimestamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;in&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;readLong&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
                    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;splitName&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;minTimestamp&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;maxTimestamp&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
                &lt;span class="k"&gt;default&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;
                    &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;IOException&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Unknown version: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;version&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;A more interesting class is the &lt;strong&gt;State&lt;/strong&gt; of a Split.&lt;br&gt;
A State contains the "latest" snapshot of a Split.&lt;br&gt;
In my scenario the State contains the latest timestamp "processed".&lt;/p&gt;

&lt;p&gt;A State is important for when creating a &lt;code&gt;Checkpoint&lt;/code&gt;, needed for restoring a Split in case of errors.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="c1"&gt;// antoniocali/sources/split/BigQuerySourceSplitState.java&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.Getter&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.NonNull&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.Setter&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.io.Serializable&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Getter&lt;/span&gt;
&lt;span class="nd"&gt;@Setter&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplitState&lt;/span&gt; &lt;span class="kd"&gt;implements&lt;/span&gt; &lt;span class="nc"&gt;Serializable&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;splitId&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;minCurrentTimestamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;maxCurrentTimestamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;BigQuerySourceSplitState&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nd"&gt;@NonNull&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;splitId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nd"&gt;@NonNull&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;minCurrentTimestamp&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
            &lt;span class="nd"&gt;@NonNull&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;currentTimestamp&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;splitId&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;splitId&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;minCurrentTimestamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;minCurrentTimestamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;maxCurrentTimestamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;currentTimestamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt; &lt;span class="nf"&gt;toBigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;splitId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;minCurrentTimestamp&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;maxCurrentTimestamp&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Good, now we have our split and we understand how it works. What's next?&lt;/p&gt;

&lt;p&gt;Back to the graph!&lt;/p&gt;

&lt;h1&gt;
  
  
  SplitEnumerator
&lt;/h1&gt;

&lt;p&gt;Ok, we have a split, but now? We need a way to create Splits, and the &lt;code&gt;SplitEnumerator&lt;/code&gt; class comes in our help.&lt;/p&gt;

&lt;p&gt;⚠️ Pay also attention on where these classes live: the Master/Job Manager in Flink has the job to create Splits and assign them to each Task process. &lt;br&gt;
This compared to the actual process a reading Split, that it's a Task Manager job.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;SplitEnumerator&lt;/code&gt; is one of the most important class to understand, so be ready.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySplitEnumerator&lt;/span&gt; &lt;span class="kd"&gt;implements&lt;/span&gt; &lt;span class="nc"&gt;SplitEnumerator&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySplitEnumeratorState&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="no"&gt;DISCOVER_INTERVAL&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;60_000L&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="no"&gt;INITIAL_DELAY&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0L&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;Logger&lt;/span&gt; &lt;span class="no"&gt;LOG&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;LoggerFactory&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getLogger&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySplitEnumerator&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="kd"&gt;protected&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;maxCurrentTimetstamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;SplitEnumeratorContext&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;enumContext&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt; &lt;span class="n"&gt;currentSplit&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kt"&gt;boolean&lt;/span&gt; &lt;span class="n"&gt;isAssigned&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;BigQueryReadOptions&lt;/span&gt; &lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;TreeSet&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Integer&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;readersAwaitingSplit&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;BigQuerySplitEnumerator&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SplitEnumeratorContext&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;enumContext&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
            &lt;span class="nc"&gt;BigQueryReadOptions&lt;/span&gt; &lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySplitEnumeratorState&lt;/span&gt; &lt;span class="n"&gt;enumeratorState&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;enumContext&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;enumContext&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;maxCurrentTimetstamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;enumeratorState&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="mi"&gt;0L&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;enumeratorState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getCurrentMaxTimestamp&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;readOptions&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;currentSplit&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;enumeratorState&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"0L"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;0L&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                &lt;span class="nc"&gt;Long&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;MAX_VALUE&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;enumeratorState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getCurrentSplit&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;readersAwaitingSplit&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;TreeSet&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;();&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;notifyCheckpointAborted&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;checkpointId&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;SplitEnumerator&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;super&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;notifyCheckpointAborted&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;checkpointId&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;start&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="no"&gt;LOG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Starting BigQuery split enumerator"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;scheduleNextSplit&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;handleSplitRequest&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;subtaskId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nd"&gt;@Nullable&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;requesterHostname&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(!&lt;/span&gt;&lt;span class="n"&gt;enumContext&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;registeredReaders&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;containsKey&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;subtaskId&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="c1"&gt;// reader failed between sending the request and now. skip this request.&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="n"&gt;readersAwaitingSplit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;add&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;subtaskId&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;addSplitsBack&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;list&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;subTaskId&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(!&lt;/span&gt;&lt;span class="n"&gt;list&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;isEmpty&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;currentSplit&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;list&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;remove&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;isAssigned&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;addReader&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySplitEnumeratorState&lt;/span&gt; &lt;span class="nf"&gt;snapshotState&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;checkpointId&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;BigQuerySplitEnumeratorState&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;currentSplit&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;isAssigned&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;maxCurrentTimetstamp&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;close&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;IOException&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;notifyCheckpointComplete&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;checkpointId&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;SplitEnumerator&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;super&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;notifyCheckpointComplete&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;checkpointId&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;handleSourceEvent&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;subtaskId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;SourceEvent&lt;/span&gt; &lt;span class="n"&gt;sourceEvent&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;SplitEnumerator&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;super&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;handleSourceEvent&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;subtaskId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sourceEvent&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nc"&gt;Optional&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;discoverNewSplit&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

            &lt;span class="nc"&gt;BigQuery&lt;/span&gt; &lt;span class="n"&gt;bigquery&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;BigQueryClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;setReadOptions&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;getBigQuery&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
            &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;tableName&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getFullTableName&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;query&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"SELECT MAX("&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getColumnFetcher&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;") as max_timestamp FROM `"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;tableName&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;"`"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
            &lt;span class="no"&gt;LOG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Discovering new split - Query: {}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="nc"&gt;QueryJobConfiguration&lt;/span&gt; &lt;span class="n"&gt;queryConfig&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;QueryJobConfiguration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;newBuilder&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;setUseLegacySql&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
            &lt;span class="nc"&gt;TableResult&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;bigquery&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;query&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;queryConfig&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;maxColumnFetcher&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;iterateAll&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;iterator&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;next&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"max_timestamp"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;maxTimestamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;convertFieldValueToEpochTime&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;maxColumnFetcher&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;maxCurrentTimetstamp&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mi"&gt;0L&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="no"&gt;LOG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Initial Load"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
                &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;Optional&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;of&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"InitialLoad"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;0L&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;maxTimestamp&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
            &lt;span class="o"&gt;}&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;maxTimestamp&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;maxCurrentTimetstamp&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="no"&gt;LOG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Found a new split with new timestamp: {}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;maxTimestamp&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
                &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;Optional&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;of&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                        &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Long&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;maxTimestamp&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt; &lt;span class="n"&gt;maxCurrentTimetstamp&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;maxTimestamp&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
            &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="no"&gt;LOG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"No new split found"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
                &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;Optional&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;empty&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
            &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;RuntimeException&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;handleDiscoverNewSplitResult&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Optional&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;newSplit&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Throwable&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;RuntimeException&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;newSplit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;isEmpty&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="c1"&gt;// No new split&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;currentSplit&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;newSplit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;maxCurrentTimetstamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;currentSplit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMaxTimestamp&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;assignSplit&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;assignSplit&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;Iterator&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Integer&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;awaitingReader&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;readersAwaitingSplit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;iterator&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="no"&gt;LOG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Assigning split to readers"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;awaitingReader&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;hasNext&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;nextAwaiting&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;awaitingReader&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;next&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
            &lt;span class="c1"&gt;// if the reader that requested another split has failed in the meantime, remove&lt;/span&gt;
            &lt;span class="c1"&gt;// it from the list of waiting readers&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(!&lt;/span&gt;&lt;span class="n"&gt;enumContext&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;registeredReaders&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;containsKey&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;nextAwaiting&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="n"&gt;awaitingReader&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;remove&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
                &lt;span class="k"&gt;continue&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
            &lt;span class="o"&gt;}&lt;/span&gt;
            &lt;span class="nc"&gt;Optional&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;split&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Optional&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;of&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;currentSplit&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt; &lt;span class="n"&gt;bqSplit&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;split&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
            &lt;span class="n"&gt;enumContext&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;assignSplit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bqSplit&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;nextAwaiting&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;isAssigned&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
            &lt;span class="n"&gt;awaitingReader&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;remove&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;


    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;scheduleNextSplit&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="no"&gt;LOG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Scheduling Discovery next split"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;enumContext&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;callAsync&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="n"&gt;discoverNewSplit&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="n"&gt;handleDiscoverNewSplitResult&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;INITIAL_DELAY&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                &lt;span class="no"&gt;DISCOVER_INTERVAL&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="nf"&gt;convertFieldValueToEpochTime&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;FieldValue&lt;/span&gt; &lt;span class="n"&gt;fieldValue&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;fieldValue&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;isNull&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;IllegalArgumentException&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"FieldValue is null"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;

        &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;fieldValue&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getTimestampValue&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;NumberFormatException&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="nc"&gt;LocalDate&lt;/span&gt; &lt;span class="n"&gt;date&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;LocalDate&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;parse&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;fieldValue&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getStringValue&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;date&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;atStartOfDay&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ZoneOffset&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;UTC&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;toInstant&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;toEpochMilli&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;My &lt;code&gt;BigQuerySplitEnumerator&lt;/code&gt; implements the &lt;code&gt;SplitEnumerator&lt;/code&gt; interface, which needs two generics &lt;code&gt;&amp;lt;SplitT extends SourceSplit, CheckpointT&amp;gt;&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;In our case &lt;code&gt;SplitT&lt;/code&gt; is the Split we created before -  &lt;code&gt;BigQuerySourceSplit&lt;/code&gt; - and &lt;code&gt;CheckpointT&lt;/code&gt; - in my case &lt;code&gt;BigQuerySplitEnumeratorState&lt;/code&gt;- is the &lt;code&gt;State&lt;/code&gt; of the SplitEnumerator. &lt;/p&gt;

&lt;p&gt;On Constructor side I've decided to pass few things:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;A &lt;code&gt;SplitEnumeratorContext&lt;/code&gt; that:

&lt;ul&gt;
&lt;li&gt;Host information necessary for the SplitEnumerator to make split assignment decisions.&lt;/li&gt;
&lt;li&gt;Accept and track the split assignment from the enumerator.&lt;/li&gt;
&lt;li&gt;Provide a managed threading model so the split enumerators do not need to create their own internal threads.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;A &lt;code&gt;BigQueryOptions&lt;/code&gt; class that contains information to connect to BigQuery&lt;/li&gt;

&lt;li&gt;A &lt;code&gt;BigQuerySplitEnumeratorState&lt;/code&gt; to recover from a  Checkpoint the State.&lt;/li&gt;

&lt;/ul&gt;

&lt;p&gt;Worth to mention the &lt;code&gt;TreeSet&amp;lt;Integer&amp;gt; readersAwaitingSplit&lt;/code&gt;,  contains &lt;strong&gt;Readers&lt;/strong&gt; (i.e. Task Processors) that have request a Split and are waiting for that to be assigned.&lt;/p&gt;




&lt;h2&gt;
  
  
  BIG BREAK
&lt;/h2&gt;

&lt;p&gt;At this point I need to stop and mention how bad my implementation is.&lt;br&gt;
For how I've coded the project there is only &lt;strong&gt;ONE&lt;/strong&gt; Split available and alive at time. &lt;br&gt;
This whole project was a way for me to understand the behind-the-scene of Flink.&lt;br&gt;
The result of this decision is that &lt;code&gt;SplitEnumerator&lt;/code&gt; will just generate one split at time and that's why, for example, I have a boolean variable named &lt;code&gt;isAssigned&lt;/code&gt; that tracks this only Split.&lt;/p&gt;



&lt;p&gt;Let's move to the methods that are not self-explanatory.&lt;/p&gt;

&lt;p&gt;A &lt;code&gt;SplitEnumerator&lt;/code&gt; needs was a way to &lt;em&gt;discover&lt;/em&gt; new splits, in other words, to create them.&lt;/p&gt;

&lt;p&gt;To do so I've used the following three methods - called by &lt;code&gt;scheduleNextSplit&lt;/code&gt; method:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;code&gt;discoverNewSplit&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;handleDiscoverNewSplitResult&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;assignSplit&lt;/code&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The &lt;em&gt;overridden&lt;/em&gt; &lt;code&gt;start&lt;/code&gt; method calls &lt;code&gt;scheduleNextSplit&lt;/code&gt; that schedule &lt;code&gt;discoverNewSplit&lt;/code&gt; every minute.&lt;br&gt;
The return value of it will be passed to &lt;code&gt;handleDiscoverNewSplitResult&lt;/code&gt; that, in case of a new Split, will assign it via &lt;code&gt;assignSplit&lt;/code&gt; method..&lt;/p&gt;

&lt;p&gt;I know it's confusing, but the idea is to have a scheduled method that discover and assign Splits.&lt;/p&gt;

&lt;p&gt;The &lt;strong&gt;discovery&lt;/strong&gt; phase is based on a simple query&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;MAX&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;" + readOptions.getColumnFetcher() + "&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;max_timestamp&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="nv"&gt;`" + tableName + "`&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Every minute, it retrieves the latest timestamp available in the table. If this timestamp is greater than the most recent one stored internally, a new split is created.&lt;/p&gt;

&lt;p&gt;In case a new Split is created, the &lt;code&gt;SplitEnumerator&lt;/code&gt; would check if there are any Readers waiting for a Split, assign the split to the first one, and update the internal &lt;em&gt;State&lt;/em&gt;.&lt;/p&gt;




&lt;h3&gt;
  
  
  Let's take a break
&lt;/h3&gt;

&lt;p&gt;Ok, let's stop a seconds and try to recap what we have understood so far.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;SplitEnumerator&lt;/code&gt; creates splits and assign them to Readers that are waiting in idle.&lt;br&gt;
A Split contains important metadata needed to retrieve the actual data.&lt;br&gt;
A State class, in general, is used for creating a checkpoint.&lt;br&gt;
A SerDe class is needed for all those components that needs to be serialized and deserialized.&lt;/p&gt;

&lt;p&gt;Now that we know how a Split is created, we need to understand how to retrieve data based on the information stored on the Split.&lt;/p&gt;
&lt;h1&gt;
  
  
  SplitReader
&lt;/h1&gt;

&lt;p&gt;Here we are.&lt;br&gt;
We have an understanding of what a Split is.&lt;br&gt;
We understood how discover and create and assign them.&lt;br&gt;
Now we need a way to actually collect and read the data.&lt;br&gt;
The &lt;code&gt;SplitReader&lt;/code&gt; class is what we need.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Important&lt;/strong&gt; A single &lt;code&gt;SplitReader&lt;/code&gt; class could read from a multiple splits, but since we have only one Split available at time, the implementation of it would be easier.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="c1"&gt;// antoniocali/sources/reader/BigQuerySplitReader.java&lt;/span&gt;

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySplitReader&lt;/span&gt; &lt;span class="kd"&gt;implements&lt;/span&gt; &lt;span class="nc"&gt;SplitReader&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;RowData&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;Logger&lt;/span&gt; &lt;span class="no"&gt;LOG&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;LoggerFactory&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getLogger&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySplitReader&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;Queue&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;assignedSplits&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ArrayDeque&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;();&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;BigQueryReadOptions&lt;/span&gt; &lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;currentTimestamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0L&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;Boolean&lt;/span&gt; &lt;span class="n"&gt;closed&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;BigQuerySplitReader&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;BigQueryReadOptions&lt;/span&gt; &lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;readOptions&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;


    &lt;span class="c1"&gt;// To check where to use&lt;/span&gt;
    &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="nf"&gt;currentTimestampToFetch&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt; &lt;span class="n"&gt;split&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;split&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMaxTimestamp&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;currentTimestamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;split&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMaxTimestamp&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;currentTimestamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;


    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;Iterator&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;FieldValueList&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;retrieveSplitData&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt; &lt;span class="n"&gt;bqSplit&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;IOException&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;InterruptedException&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;BigQueryClient&lt;/span&gt; &lt;span class="n"&gt;bigQueryClient&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;BigQueryClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;setReadOptions&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;sqlQuery&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;BigQueryUtils&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getSqlFromSplit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bqSplit&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="no"&gt;LOG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Retrieving Data for - {}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;bqSplit&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="no"&gt;LOG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"SQL query: {}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sqlQuery&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

        &lt;span class="nc"&gt;BigQuery&lt;/span&gt; &lt;span class="n"&gt;bigQuery&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;bigQueryClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getBigQuery&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="nc"&gt;QueryJobConfiguration&lt;/span&gt; &lt;span class="n"&gt;queryConfig&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;QueryJobConfiguration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;newBuilder&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sqlQuery&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;setUseLegacySql&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="nc"&gt;TableResult&lt;/span&gt; &lt;span class="n"&gt;tableResult&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;bigQuery&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;query&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;queryConfig&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;tableResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;iterateAll&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;iterator&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;RecordsWithSplitIds&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;RowData&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;fetch&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;IOException&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;closed&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;IllegalStateException&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Can't fetch records from a closed split reader."&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="nc"&gt;RecordsBySplits&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Builder&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;RowData&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;respBuilder&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;RecordsBySplits&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Builder&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;();&lt;/span&gt;
        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;currentSplit&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Optional&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofNullable&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;assignedSplits&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;currentSplit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;isEmpty&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="no"&gt;LOG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"current split is empty"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;respBuilder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="nc"&gt;BigQueryClient&lt;/span&gt; &lt;span class="n"&gt;bigQueryClient&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;BigQueryClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;setReadOptions&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="nc"&gt;TableSchema&lt;/span&gt; &lt;span class="n"&gt;tableSchema&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;bigQueryClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getTableSchema&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="nc"&gt;Schema&lt;/span&gt; &lt;span class="n"&gt;avroSchema&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;SchemaTransform&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toGenericAvroSchema&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getFullTableName&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt;
                &lt;span class="n"&gt;tableSchema&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getFields&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="nc"&gt;RowType&lt;/span&gt; &lt;span class="n"&gt;rowType&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;RowType&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="nc"&gt;FlinkAvroUtils&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;AvroSchemaToRowType&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;avroSchema&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;()).&lt;/span&gt;&lt;span class="na"&gt;getTypeAt&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="nc"&gt;FieldValueListToRowDataConverters&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;FieldValueListToRowDataConverter&lt;/span&gt; &lt;span class="n"&gt;fieldValueListToRowDataConverter&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;FieldValueListToRowDataConverters&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;createRowConverter&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                &lt;span class="n"&gt;rowType&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;actualSplit&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;currentSplit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="no"&gt;LOG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"actual split is {}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;actualSplit&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;read&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0L&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;retrieveSplitData&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;actualSplit&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;hasNext&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;next&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
                &lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="o"&gt;++;&lt;/span&gt;
                &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;converted&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;fieldValueListToRowDataConverter&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;convert&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
                &lt;span class="n"&gt;respBuilder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;add&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;actualSplit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;splitId&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;converted&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="o"&gt;}&lt;/span&gt;
            &lt;span class="n"&gt;respBuilder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;addFinishedSplit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;actualSplit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;splitId&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
            &lt;span class="n"&gt;currentTimestamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;actualSplit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMaxTimestamp&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
            &lt;span class="no"&gt;LOG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Finish Reading {} - Total Records {}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;actualSplit&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;respBuilder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;InterruptedException&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;RuntimeException&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;handleSplitsChanges&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SplitsChange&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;splitsChanges&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="no"&gt;LOG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;debug&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Handle split changes {}."&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;splitsChanges&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="n"&gt;assignedSplits&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;addAll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;splitsChanges&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;splits&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;wakeUp&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;pauseOrResumeSplits&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collection&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;splitsToPause&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
            &lt;span class="nc"&gt;Collection&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;splitsToResume&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;SplitReader&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;super&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;pauseOrResumeSplits&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;splitsToPause&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;splitsToResume&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;


    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;close&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(!&lt;/span&gt;&lt;span class="n"&gt;closed&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;closed&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
            &lt;span class="n"&gt;currentTimestamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0L&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's focus on what's important - the &lt;code&gt;fetch&lt;/code&gt; method.&lt;/p&gt;

&lt;p&gt;I've used an internal class called &lt;code&gt;RecordsBySplits&lt;/code&gt;, that provides us a easy way to push data to a Split &lt;strong&gt;Blocking Queue&lt;/strong&gt;. &lt;/p&gt;

&lt;p&gt;We finally see the query against BigQuery used to retrieve data.&lt;br&gt;
It's perfomed by the following call &lt;code&gt;var records = retrieveSplitData(actualSplit);&lt;/code&gt; and it looks like this&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="s"&gt;"SELECT * "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; 
&lt;span class="s"&gt;" FROM "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;tableName&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; 
&lt;span class="s"&gt;" WHERE "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;columnNameConverted&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; 
&lt;span class="s"&gt;" BETWEEN "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;bigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMinTimestamp&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; 
&lt;span class="s"&gt;" AND "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;bigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMaxTimestamp&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;fetch&lt;/code&gt; method does also few extra steps.&lt;br&gt;
My idea was to be also &lt;code&gt;TableApi&lt;/code&gt; compatible.&lt;br&gt;
So I've created a Converter from FieldValueList to RowData - the abstract class used by TableApi.&lt;/p&gt;


&lt;h3&gt;
  
  
  New Break
&lt;/h3&gt;

&lt;p&gt;We are starting to connects all the dots.&lt;br&gt;
It's important to mention that the main job of a SplitReader is just to push Records in the Reader BlockingQueue, so Flink can pass it to the downstream.&lt;/p&gt;



&lt;p&gt;If you're familiar with Flink, you know that usually a pipeline starts as &lt;code&gt;env.fromSource(???).&amp;lt;Operators&amp;gt;.to()&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;So far we checked single components, but we are missing what is needed to fulfill those ??? - a &lt;code&gt;Source&lt;/code&gt;. And that's is our next step, where all the links are going to be connected.&lt;/p&gt;



&lt;p&gt;Let's take a look of what we need for creating a Source in Flink.&lt;br&gt;
We need to create two classes:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;a &lt;code&gt;SourceReader&lt;/code&gt; class: more likely &lt;code&gt;SourceReaderBase&lt;/code&gt;, an easier provided implementation of it, that provides some synchronization between the mail box main thread and the SourceReader internal threads.&lt;/li&gt;
&lt;li&gt;a &lt;code&gt;Source&lt;/code&gt; class: which acts like a factory class that connects in one spot the SplitEnumerator and SourceReader.&lt;/li&gt;
&lt;/ol&gt;
&lt;h1&gt;
  
  
  SourceReader
&lt;/h1&gt;

&lt;p&gt;Let's take a look at the &lt;code&gt;SourceReaderBase&lt;/code&gt; first, since the &lt;code&gt;Source&lt;/code&gt; is the final glue to everything we built so far.&lt;/p&gt;

&lt;p&gt;If we take a look at our very first image on the article we can understand what a &lt;code&gt;SourceReaderBase&lt;/code&gt; actually does.&lt;br&gt;
We need a mechanism for the Reader to requests Split to the Job processor and process it.&lt;/p&gt;

&lt;p&gt;In my follow example I use a provided implementation of the &lt;code&gt;SourceReaderBase&lt;/code&gt; called &lt;code&gt;SingleThreadMultiplexSourceReaderBase&lt;/code&gt; that use a SingleThread to perform the following steps:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Request a Split&lt;/li&gt;
&lt;li&gt;Read the data of the Split using a &lt;code&gt;SplitReader&lt;/code&gt;
&lt;/li&gt;
&lt;/ol&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceReader&lt;/span&gt;
        &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nc"&gt;SingleThreadMultiplexSourceReaderBase&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;RowData&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;RowData&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplitState&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;Logger&lt;/span&gt; &lt;span class="no"&gt;LOG&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;LoggerFactory&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getLogger&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceReader&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;BigQuerySourceReader&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;FutureCompletingBlockingQueue&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;RecordsWithSplitIds&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;RowData&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;elementsQueue&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
            &lt;span class="nc"&gt;Supplier&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;SplitReader&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;RowData&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;splitFetcherManager&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
            &lt;span class="nc"&gt;RecordEmitter&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;RowData&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;RowData&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplitState&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;recordEmitter&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Configuration&lt;/span&gt; &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
            &lt;span class="nc"&gt;SourceReaderContext&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="kd"&gt;super&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;elementsQueue&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;splitFetcherManager&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;recordEmitter&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;start&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;getNumberOfCurrentlyAssignedSplits&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;sendSplitRequest&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;protected&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;onSplitFinished&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplitState&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;finishedSplitIds&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplitState&lt;/span&gt; &lt;span class="n"&gt;splitState&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;finishedSplitIds&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;values&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt; &lt;span class="n"&gt;sourceSplit&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;splitState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toBigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
            &lt;span class="no"&gt;LOG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Read for split {} is completed."&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sourceSplit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;splitId&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;sendSplitRequest&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;protected&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplitState&lt;/span&gt; &lt;span class="nf"&gt;initializedState&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt; &lt;span class="n"&gt;split&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;BigQuerySourceSplitState&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;split&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;splitId&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;split&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMinTimestamp&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;split&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMaxTimestamp&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;protected&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt; &lt;span class="nf"&gt;toSplitType&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;splitId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplitState&lt;/span&gt; &lt;span class="n"&gt;sst&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;splitId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sst&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMinCurrentTimestamp&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;sst&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMaxCurrentTimestamp&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;When the Reader starts, it first checks whether it has an assigned split. If not, it sends a request to obtain one.&lt;/p&gt;

&lt;p&gt;It's important to mention that &lt;code&gt;SingleThreadMultiplexSourceReaderBase&lt;/code&gt; requires some extra configuration - look at our constructor.&lt;/p&gt;

&lt;p&gt;⚠️ It requires a &lt;code&gt;RecordEmitter&lt;/code&gt;: a class that takes a record from the &lt;em&gt;SplitReader&lt;/em&gt;, updates the state of the Split and finally emits the records to the downstream. You can see an implementation below&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;BigQueryRecordEmitter&lt;/span&gt; &lt;span class="kd"&gt;implements&lt;/span&gt; &lt;span class="nc"&gt;RecordEmitter&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;RowData&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;RowData&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplitState&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;,&lt;/span&gt; &lt;span class="nc"&gt;Serializable&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;emitRecord&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;RowData&lt;/span&gt; &lt;span class="n"&gt;rowData&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;SourceOutput&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;RowData&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;sourceOutput&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
            &lt;span class="nc"&gt;BigQuerySourceSplitState&lt;/span&gt; &lt;span class="n"&gt;bigQuerySourceSplitState&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

        &lt;span class="n"&gt;sourceOutput&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;collect&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;rowData&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;ALERT&lt;/strong&gt;: I've never implemented a way to update the State of the Split!&lt;/p&gt;

&lt;p&gt;⚠️ It requires a &lt;code&gt;FutureCompletingBlockingQueue&lt;/code&gt;: a custom implementation of blocking queue that is used in the hand-over of data from a producing thread to a consuming thread (the same Blocking Queue I've mentioned when I was talking about a &lt;code&gt;SplitReader&lt;/code&gt;)&lt;/p&gt;




&lt;h3&gt;
  
  
  RECAP before the grand finale.
&lt;/h3&gt;

&lt;p&gt;We saw few things so far:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;What a Split is (Metadata information)&lt;/li&gt;
&lt;li&gt;a &lt;code&gt;SplitEnumerator&lt;/code&gt; that discover Splits and assigned to Readers (on Job/Master Manager)&lt;/li&gt;
&lt;li&gt;a &lt;code&gt;SplitReader&lt;/code&gt; that collects the data based on Split's metadata information (on Task Manager)&lt;/li&gt;
&lt;li&gt;a &lt;code&gt;SourceReaderBase&lt;/code&gt; that requests for a Split on a Reader when it doesn't have any assigned to (on Task Manager)&lt;/li&gt;
&lt;/ol&gt;




&lt;h1&gt;
  
  
  Source
&lt;/h1&gt;

&lt;p&gt;And finally we can step to the final class -  the &lt;code&gt;Source&lt;/code&gt;.&lt;br&gt;
As mentioned above, it is the glue to all our Lego bricks we have built so far.&lt;/p&gt;

&lt;p&gt;Let's take a look.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;BigQueryDataStreamSource&lt;/span&gt; &lt;span class="kd"&gt;implements&lt;/span&gt; &lt;span class="nc"&gt;Source&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;RowData&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySplitEnumeratorState&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;Logger&lt;/span&gt; &lt;span class="no"&gt;LOG&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;LoggerFactory&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getLogger&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;BigQueryDataStreamSource&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;BigQueryReadOptions&lt;/span&gt; &lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;BigQueryDataStreamSource&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;BigQueryReadOptions&lt;/span&gt; &lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;readOptions&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;Boundedness&lt;/span&gt; &lt;span class="nf"&gt;getBoundedness&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;Boundedness&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;CONTINUOUS_UNBOUNDED&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;SplitEnumerator&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySplitEnumeratorState&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;restoreEnumerator&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
            &lt;span class="nc"&gt;SplitEnumeratorContext&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;splitEnumeratorContext&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
            &lt;span class="nc"&gt;BigQuerySplitEnumeratorState&lt;/span&gt; &lt;span class="n"&gt;bigQuerySplitEnumeratorState&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="no"&gt;LOG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Restoring Enumerator with following State: {}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;bigQuerySplitEnumeratorState&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;BigQuerySplitEnumerator&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;splitEnumeratorContext&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;bigQuerySplitEnumeratorState&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;SplitEnumerator&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySplitEnumeratorState&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;createEnumerator&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
            &lt;span class="nc"&gt;SplitEnumeratorContext&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;splitEnumeratorContext&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="no"&gt;LOG&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Creating new Enumerator"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;BigQuerySplitEnumerator&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;splitEnumeratorContext&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;


    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;SimpleVersionedSerializer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;getSplitSerializer&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplitSerializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;INSTANCE&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;SimpleVersionedSerializer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;BigQuerySplitEnumeratorState&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;getEnumeratorCheckpointSerializer&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySplitEnumeratorStateSerializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;INSTANCE&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;SourceReader&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;RowData&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;createReader&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SourceReaderContext&lt;/span&gt; &lt;span class="n"&gt;sourceReaderContext&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt;
            &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;FutureCompletingBlockingQueue&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;RecordsWithSplitIds&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;RowData&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;elementsQueue&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;FutureCompletingBlockingQueue&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;();&lt;/span&gt;
        &lt;span class="nc"&gt;BigQueryRecordEmitter&lt;/span&gt; &lt;span class="n"&gt;recordEmitter&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;BigQueryRecordEmitter&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="nc"&gt;Supplier&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;SplitReader&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;RowData&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySourceSplit&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;splitReaderSupplier&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;BigQuerySplitReader&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                &lt;span class="n"&gt;readOptions&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;BigQuerySourceReader&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;elementsQueue&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;splitReaderSupplier&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;recordEmitter&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Configuration&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt;
                &lt;span class="n"&gt;sourceReaderContext&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's deep dive in:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;code&gt;getBoundiness&lt;/code&gt; provides a description of the Source: if it is Unbounded (STREAMING) or Bounded (Batched)&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;createEnumerator&lt;/code&gt; glues a Source to a &lt;code&gt;SplitEnumerator&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;createReader&lt;/code&gt; glues a Source to a &lt;code&gt;SourceReader&lt;/code&gt;
&lt;/li&gt;
&lt;/ol&gt;




&lt;h1&gt;
  
  
  TL;DR - A Recap
&lt;/h1&gt;

&lt;p&gt;Here we are, at the end of our journey.&lt;br&gt;
I know I've done already few recaps but a very last one is needed, since I've also forgot to mention few things.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;A &lt;strong&gt;Split&lt;/strong&gt; serves as the fundamental unit of data processing in Flink. It primarily provides the information to retrieve the required data. Each Split is always associated with a SerDe and a State — used for checkpointing.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;A &lt;strong&gt;SplitEnumerator&lt;/strong&gt; is responsible for discovering and creating new &lt;em&gt;Splits&lt;/em&gt;, as well as assigning them to &lt;em&gt;Readers&lt;/em&gt;.&lt;br&gt;
It is also associated with a &lt;em&gt;State&lt;/em&gt;, as checkpointing is crucial to track which Splits have been created and read, as well as whether any Splits are currently assigned to Readers and to which ones.&lt;br&gt;
Given that the State of a &lt;em&gt;SplitEnumerator&lt;/em&gt; is often complex, a SerDe class is required to handle its serialization and deserialization.&lt;br&gt;
SplitEnumerator lives on Job/Master Process.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;A &lt;strong&gt;SplitReader&lt;/strong&gt;, responsible for reading the data from a Split and pushing it into the Blocking Queue of Records.&lt;br&gt;
It operates within the Job/Master Process.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;A &lt;strong&gt;SourceReader&lt;/strong&gt;, responsible for requesting a new Splits.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;[Optional] A &lt;strong&gt;RecordEmitter&lt;/strong&gt;, a straightforward implementation for retrieving data from a &lt;em&gt;SplitReader&lt;/em&gt;, updating the Split's State, and emitting records downstream.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;A &lt;strong&gt;Source&lt;/strong&gt;, the glue connecting all components. It brings together a &lt;code&gt;SplitEnumerator&amp;lt;Split&amp;gt;&lt;/code&gt; and a &lt;code&gt;SplitReader&amp;lt;Split&amp;gt;&lt;/code&gt;into a unified interface.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

</description>
      <category>flink</category>
      <category>bigquery</category>
      <category>streaming</category>
    </item>
  </channel>
</rss>
