<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Wai Yan</title>
    <description>The latest articles on DEV Community by Wai Yan (@waiyan1612).</description>
    <link>https://dev.to/waiyan1612</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F451439%2F9aee9ef5-a165-4d49-bc5b-c83220ddae4e.jpg</url>
      <title>DEV Community: Wai Yan</title>
      <link>https://dev.to/waiyan1612</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/waiyan1612"/>
    <language>en</language>
    <item>
      <title>Handling upstream data changes via Change Data Capture</title>
      <dc:creator>Wai Yan</dc:creator>
      <pubDate>Mon, 14 Sep 2020 16:49:36 +0000</pubDate>
      <link>https://dev.to/waiyan1612/handling-upstream-data-changes-via-change-data-capture-1aog</link>
      <guid>https://dev.to/waiyan1612/handling-upstream-data-changes-via-change-data-capture-1aog</guid>
      <description>&lt;p&gt;Anyone who has managed a data pipeline would be aware of how the upstream data can change and how it can impact the entire pipeline. Imagine we have the following customer data in our datalake.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+---+-----+
| id| name|
+---+-----+
|id1|Alice|
|id2|  Bob|
+---+-----+
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;One month later, we realize that the customer's name has changed from Alice to Carol and that we have been using incorrect data for the past month. Such inaccuracies can affect our data analysis and machine learning models. So, how can we detect such changes, and how can we apply them automatically?&lt;/p&gt;

&lt;p&gt;Fortunately, with enough information, it is easy to update the data without over-complicating our existing pipeline. We need two components:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Change Event Generator&lt;/strong&gt; - to generate the create / update / delete events for our data&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Change Event Resolver&lt;/strong&gt; - to apply these changes on our existing data&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The need for the Change Event Resolver is debatable if we can afford to live on the bleeding edge by using Apache Hudi or Delta Lake. Instead of resolving the changes and keeping a single copy of the data, Hudi / Delta Lake can persist each revision of our data (timeline in Hudi and time travel in Delta Lake). This is undoubtedly more powerful, but there are scenarios where these components simply will not fit into existing pipelines, especially because they require the data to be written as Hudi or Delta tables (Parquet files plus table-specific metadata) rather than as plain Parquet or Avro files. This means our data cataloging, data monitoring and data visualization services now need to understand Hudi or Delta, and that support may not be ready within a year, especially if these services are not developed in-house.&lt;/p&gt;

&lt;h2&gt;
  
  
  A model to capture the change events
&lt;/h2&gt;

&lt;p&gt;Let's try to model a class that we can use to capture the changes.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;ChangeEvent&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;changeType&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;           &lt;span class="c1"&gt;// type of change - INSERT, UPDATE or DELETE&lt;/span&gt;
  &lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;            &lt;span class="c1"&gt;// when this change happened &lt;/span&gt;
  &lt;span class="n"&gt;columnNames&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Seq&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;],&lt;/span&gt;     &lt;span class="c1"&gt;// names of columns (Mandatory for INSERT / UPDATE)&lt;/span&gt;
  &lt;span class="n"&gt;columnValues&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Seq&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;],&lt;/span&gt;    &lt;span class="c1"&gt;// values of columns (Mandatory for INSERT / UPDATE)&lt;/span&gt;
  &lt;span class="n"&gt;oldKeyNames&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Seq&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;],&lt;/span&gt;     &lt;span class="c1"&gt;// names of old key columns (Mandatory for UPDATE / DELETE)&lt;/span&gt;
  &lt;span class="n"&gt;oldKeyValues&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Seq&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt;     &lt;span class="c1"&gt;// values of old key columns (Mandatory for UPDATE / DELETE)&lt;/span&gt;
&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Most of the attributes are self-explanatory. &lt;code&gt;oldKeyNames&lt;/code&gt; and &lt;code&gt;oldKeyValues&lt;/code&gt; contain the primary key names and values of the old row, which are used to locate the row for &lt;code&gt;UPDATE&lt;/code&gt; / &lt;code&gt;DELETE&lt;/code&gt; events. We can also enrich the model with more attributes such as &lt;code&gt;columnTypes&lt;/code&gt; and &lt;code&gt;oldKeyTypes&lt;/code&gt; if we need to apply the changes differently based on the column data types.&lt;/p&gt;
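
&lt;p&gt;The "Mandatory" rules in the comments of the case class can be checked with a small helper before an event enters the pipeline. This is only a sketch: &lt;code&gt;validate&lt;/code&gt; is a hypothetical function that is not part of the original design, and it assumes that a missing field is represented as &lt;code&gt;null&lt;/code&gt; or an empty &lt;code&gt;Seq&lt;/code&gt; and that &lt;code&gt;changeType&lt;/code&gt; should be matched case-insensitively. The case class is repeated so that the snippet is self-contained.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight scala"&gt;&lt;code&gt;case class ChangeEvent(
  changeType: String,
  timestamp: String,
  columnNames: Seq[String],
  columnValues: Seq[String],
  oldKeyNames: Seq[String],
  oldKeyValues: Seq[String]
)

// Hypothetical helper (an assumption, not from the original post):
// Right(event) if the mandatory fields are present, Left(reason) otherwise.
def validate(e: ChangeEvent): Either[String, ChangeEvent] = {
  // A names / values pair counts as present only if both are non-null,
  // non-empty and of the same length
  def present(names: Seq[String], values: Seq[String]): Boolean =
    (Option(names), Option(values)) match {
      case (Some(n), Some(v)) if n.size == v.size =&amp;gt; n.nonEmpty
      case _ =&amp;gt; false
    }

  val kind = Option(e.changeType).map(_.toUpperCase).getOrElse("")
  val hasColumns = present(e.columnNames, e.columnValues)
  val hasOldKeys = present(e.oldKeyNames, e.oldKeyValues)

  (kind, hasColumns, hasOldKeys) match {
    case ("INSERT", true, _)    =&amp;gt; Right(e) // needs column names / values
    case ("UPDATE", true, true) =&amp;gt; Right(e) // needs columns and old keys
    case ("DELETE", _, true)    =&amp;gt; Right(e) // needs old keys
    case ("INSERT" | "UPDATE" | "DELETE", _, _) =&amp;gt;
      Left(s"$kind event is missing mandatory fields")
    case _ =&amp;gt; Left(s"Unknown change type: '$kind'")
  }
}

// A delete with old keys passes; an update without old keys is rejected
val ok  = validate(ChangeEvent("delete", "2", null, null, Seq("id"), Seq("id2")))
val bad = validate(ChangeEvent("update", "1", Seq("id", "name"), Seq("id1", "Angela"), null, null))
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;

&lt;p&gt;Returning an &lt;code&gt;Either&lt;/code&gt; instead of throwing lets the caller decide whether to drop, log or quarantine a malformed event.&lt;/p&gt;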

&lt;p&gt;This is commonly known as &lt;strong&gt;Change Data Capture (CDC)&lt;/strong&gt; and is supported by many databases, including PostgreSQL, MySQL and MongoDB. In fact, our &lt;code&gt;ChangeEvent&lt;/code&gt; class is a simplified version of the output of &lt;a href="https://github.com/eulerto/wal2json"&gt;wal2json&lt;/a&gt;, a PostgreSQL output plugin for logical decoding.&lt;/p&gt;

&lt;h3&gt;
  
  
  Pros &amp;amp; Cons
&lt;/h3&gt;

&lt;p&gt;There are two main advantages of using CDC:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Although the primary purpose of this feature is to create database replicas and to migrate data, it becomes very powerful in data pipelines since the change is available in the pipeline almost immediately, making a real-time pipeline possible.&lt;/li&gt;
&lt;li&gt;The upstream does not need to implement the notification logic in several places. To give an example, think of a movie review website. The pipeline needs to be informed when a user creates, updates or deletes a review, which means it has to be notified from three different REST API endpoints. All it takes is one developer forgetting to notify the pipeline, and we will be losing data without anyone realising it.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The main drawback is a slight performance degradation on the source database due to replication, although the impact differs from database to database.&lt;/p&gt;

&lt;h2&gt;
  
  
  Questions, questions, questions
&lt;/h2&gt;

&lt;p&gt;Now, after hearing this proposal, we might want to ask ourselves some questions.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;What if the data does not have the concept of a primary key (i.e. no &lt;code&gt;oldKeyNames&lt;/code&gt; or &lt;code&gt;oldKeyValues&lt;/code&gt;)?&lt;/li&gt;
&lt;li&gt;What if the primary key changes?&lt;/li&gt;
&lt;li&gt;What if I add a column or rename a column?&lt;/li&gt;
&lt;li&gt;What if I remove a column?&lt;/li&gt;
&lt;li&gt;What if a column changes its data type?&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Unfortunately, if our data does not have a primary key, CDC will not work at all (unless we are only interested in &lt;code&gt;INSERT&lt;/code&gt;s and do not care about &lt;code&gt;UPDATE&lt;/code&gt;s or &lt;code&gt;DELETE&lt;/code&gt;s). In this case, the best option is to add a UUID or a sequence column to serve as the primary key. Primary key changes rarely happen in properly designed data models, but when one is unavoidable, it is best to treat the post-change data as a new data model and take a fresh snapshot of the data.&lt;/p&gt;

&lt;p&gt;The rest of the questions can be addressed within our code when we apply the change events to our existing data but these will not be covered in this post.&lt;/p&gt;

&lt;p&gt;Now, let's see how it works with some examples. I am using &lt;code&gt;Apache Spark 2.4&lt;/code&gt; in this example but the same logic can be reimplemented in other frameworks as well.&lt;/p&gt;

&lt;h2&gt;
  
  
  Mocking the existing data
&lt;/h2&gt;

&lt;p&gt;First, we are going to mock some data that already exists in our datalake. Notice that the &lt;code&gt;timestamp&lt;/code&gt; column is present in the data previously written to the datalake, since we need it to apply the change events in chronological order.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.spark.sql.&lt;/span&gt;&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="nc"&gt;DataFrame&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Row&lt;/span&gt;&lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.spark.sql.types.&lt;/span&gt;&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="nc"&gt;StringType&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StructField&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StructType&lt;/span&gt;&lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.spark.sql.expressions.Window&lt;/span&gt;

&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;schema&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;StructType&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Seq&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"timestamp"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"id"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"name"&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;StructField&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;_&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringType&lt;/span&gt;&lt;span class="o"&gt;)))&lt;/span&gt;
&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;datalakeDf&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;spark&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;createDataFrame&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;spark&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;sparkContext&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;parallelize&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Seq&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
    &lt;span class="nc"&gt;Row&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"0"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"id1"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Alice"&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt;
    &lt;span class="nc"&gt;Row&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"0"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"id2"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Bob"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;)),&lt;/span&gt; &lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;





&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+---------+---+-----+
|timestamp| id| name|
+---------+---+-----+
|        0|id1|Alice|
|        0|id2|  Bob|
+---------+---+-----+
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h2&gt;
  
  
  Change Event Generator
&lt;/h2&gt;

&lt;p&gt;Next, we populate the &lt;strong&gt;change events&lt;/strong&gt; that our &lt;code&gt;Change Event Generator&lt;/code&gt; would produce and that we want to apply to this data. The implementation of this generator is source-specific, so we won't cover it in this post.&lt;/p&gt;

&lt;p&gt;Note that &lt;code&gt;oldKeyNames&lt;/code&gt; and &lt;code&gt;oldKeyValues&lt;/code&gt; are not present for &lt;code&gt;insert&lt;/code&gt;. Similarly, &lt;code&gt;columnNames&lt;/code&gt; and &lt;code&gt;columnValues&lt;/code&gt; are not required for &lt;code&gt;delete&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;COLUMN_NAMES&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Seq&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"id"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"name"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;  &lt;span class="c1"&gt;// column names of the data&lt;/span&gt;
&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;OLD_KEY_NAMES&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Seq&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"id"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;         &lt;span class="c1"&gt;// column names of the primary key&lt;/span&gt;

&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;changeEventDf&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;spark&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;createDataFrame&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Seq&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
    &lt;span class="nc"&gt;ChangeEvent&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"update"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"1"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;COLUMN_NAMES&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Seq&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"id1"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Angela"&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt; &lt;span class="nc"&gt;OLD_KEY_NAMES&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Seq&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"id1"&lt;/span&gt;&lt;span class="o"&gt;)),&lt;/span&gt;
    &lt;span class="nc"&gt;ChangeEvent&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"delete"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"2"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;OLD_KEY_NAMES&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Seq&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"id2"&lt;/span&gt;&lt;span class="o"&gt;)),&lt;/span&gt;
    &lt;span class="nc"&gt;ChangeEvent&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"insert"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"3"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;COLUMN_NAMES&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Seq&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"id2"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Carol"&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;





&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+----------+---------+-----------+-------------+-----------+------------+
|changeType|timestamp|columnNames| columnValues|oldKeyNames|oldKeyValues|
+----------+---------+-----------+-------------+-----------+------------+
|    update|        1| [id, name]|[id1, Angela]|       [id]|       [id1]|
|    delete|        2|       null|         null|       [id]|       [id2]|
|    insert|        3| [id, name]| [id2, Carol]|       null|        null|
+----------+---------+-----------+-------------+-----------+------------+
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Let's split the &lt;code&gt;columnNames&lt;/code&gt; and &lt;code&gt;columnValues&lt;/code&gt; columns so that we have a dataframe similar to our existing data, but keep the &lt;code&gt;oldKeyNames&lt;/code&gt; and &lt;code&gt;oldKeyValues&lt;/code&gt; columns since we still need them.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;splitChangeEventDf&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;COLUMN_NAMES&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;zipWithIndex&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;foldLeft&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;changeEventDf&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;column&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;df&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;withColumn&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;column&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;_1&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;$&lt;/span&gt;&lt;span class="s"&gt;"columnValues"&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;column&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;_2&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
&lt;span class="o"&gt;}.&lt;/span&gt;&lt;span class="py"&gt;drop&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"columnNames"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"columnValues"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;





&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+----------+---------+-----------+------------+----+------+
|changeType|timestamp|oldKeyNames|oldKeyValues|  id|  name|
+----------+---------+-----------+------------+----+------+
|    update|        1|       [id]|       [id1]| id1|Angela|
|    delete|        2|       [id]|       [id2]|null|  null|
|    insert|        3|       null|        null| id2| Carol|
+----------+---------+-----------+------------+----+------+
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;According to these change events, we should apply the changes in this order.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;update &lt;code&gt;name&lt;/code&gt; of &lt;code&gt;id1&lt;/code&gt; from &lt;code&gt;Alice&lt;/code&gt; to &lt;code&gt;Angela&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;delete &lt;code&gt;id2&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;insert &lt;code&gt;id2&lt;/code&gt; again with &lt;code&gt;name&lt;/code&gt; = &lt;code&gt;Carol&lt;/code&gt;
&lt;/li&gt;
&lt;/ol&gt;
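
&lt;p&gt;Before moving to Spark, the ordering above can be traced on a plain in-memory &lt;code&gt;Map&lt;/code&gt;. This is only an illustration under simplifying assumptions: &lt;code&gt;SimpleEvent&lt;/code&gt; is a hypothetical stand-in for &lt;code&gt;ChangeEvent&lt;/code&gt; with a single key column and a single value column, and the timestamps are assumed to be numeric strings. The events are deliberately listed out of order to show that sorting by timestamp restores the intended sequence.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight scala"&gt;&lt;code&gt;// Hypothetical, simplified event: one key column (id) and one value column (name)
case class SimpleEvent(changeType: String, timestamp: String, id: String, name: Option[String])

// Existing data, keyed by primary key
val existing = Map("id1" -&amp;gt; "Alice", "id2" -&amp;gt; "Bob")

// Same three changes as in the table above, listed out of order on purpose
val events = Seq(
  SimpleEvent("insert", "3", "id2", Some("Carol")),
  SimpleEvent("update", "1", "id1", Some("Angela")),
  SimpleEvent("delete", "2", "id2", None)
)

// Sort numerically: compared as strings, "10" would sort before "9"
val resolved = events.sortBy(_.timestamp.toLong).foldLeft(existing) {
  case (data, SimpleEvent("delete", _, id, _))   =&amp;gt; data - id            // remove the row
  case (data, SimpleEvent(_, _, id, Some(name))) =&amp;gt; data + (id -&amp;gt; name) // insert or overwrite
  case (data, _)                                 =&amp;gt; data                 // no value supplied: ignore
}
// resolved == Map("id1" -&amp;gt; "Angela", "id2" -&amp;gt; "Carol")
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;

&lt;p&gt;Applying the same events in a different order would give a different result (for example, running the delete last would remove &lt;code&gt;id2&lt;/code&gt; entirely), which is why the timestamp matters.&lt;/p&gt;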

&lt;h2&gt;
  
  
  Change Event Resolver
&lt;/h2&gt;

&lt;p&gt;Now it's time to create our second component that applies the change events generated by our &lt;code&gt;Change Event Generator&lt;/code&gt;. First, let's create a couple of helper functions:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;unionWithSchema&lt;/code&gt; - union dataframes while accounting for schema differences such as different column orders or mismatched columns, e.g. df1 has 2 columns: colA and colB but df2 only has colA.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight scala"&gt;&lt;code&gt;  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;unionWithSchema&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dataFrames&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;DataFrame&lt;/span&gt; &lt;span class="kt"&gt;*&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Option&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;DataFrame&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="nf"&gt;if&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;dataFrames&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;isEmpty&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;None&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;spark&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;dataFrames&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;head&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;sparkSession&lt;/span&gt;
    &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;distinctSchemas&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;dataFrames&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;_&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;schema&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;toList&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;distinct&lt;/span&gt;
    &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;unionDf&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;if&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;distinctSchemas&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;size&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="nv"&gt;dataFrames&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;tail&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;foldLeft&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;dataFrames&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;head&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df1&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;df2&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;df1&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;union&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df2&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
      &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;allSchemas&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;distinctSchemas&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;flatten&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;distinct&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;sortBy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;schema&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;schema&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;name&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
      &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;schemaWithAllColumns&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;StructType&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;allSchemas&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
      &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;emptyDataFrame&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;spark&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;createDataFrame&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;spark&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;sparkContext&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;emptyRDD&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;Row&lt;/span&gt;&lt;span class="o"&gt;],&lt;/span&gt; &lt;span class="n"&gt;schemaWithAllColumns&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
      &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;orderedColumns&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;emptyDataFrame&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;columns&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

      &lt;span class="nv"&gt;dataFrames&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;foldLeft&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;emptyDataFrame&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df1&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;df2&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
          &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;missingColumns&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="n"&gt;allSchemas&lt;/span&gt; &lt;span class="n"&gt;diff&lt;/span&gt; &lt;span class="nv"&gt;df2&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;schema&lt;/span&gt;
          &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;unionSafeDf&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;missingColumns&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;foldLeft&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df2&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;missingField&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;df&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;withColumn&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;missingField&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;name&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nf"&gt;lit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;cast&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;missingField&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;dataType&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
          &lt;span class="o"&gt;}&lt;/span&gt;
          &lt;span class="nv"&gt;df1&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;union&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;unionSafeDf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;select&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;orderedColumns&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="k"&gt;_&lt;/span&gt;&lt;span class="kt"&gt;*&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
      &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="nc"&gt;Some&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;unionDf&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;&lt;code&gt;applyChangeEventsByTimestamp&lt;/code&gt; - apply change events chronologically.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight scala"&gt;&lt;code&gt;  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;applyChangeEventsByTimestamp&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dataFrame&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;DataFrame&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;primaryKeyArr&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Array&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;])&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;DataFrame&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;partitionCols&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;if&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;primaryKeyArr&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;isEmpty&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="nv"&gt;dataFrame&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;columns&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;col&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="nv"&gt;primaryKeyArr&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;colName&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="s"&gt;"old_$colName"&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;window&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;Window&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;partitionBy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;partitionCols&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="k"&gt;_&lt;/span&gt;&lt;span class="kt"&gt;*&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;orderBy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;desc&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"timestamp"&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;

    &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="n"&gt;dfInitial&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="n"&gt;dataFrame&lt;/span&gt;
    &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="n"&gt;iteration&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
    &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="n"&gt;done&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;false&lt;/span&gt;

    &lt;span class="c1"&gt;// Potentially expensive: every iteration triggers two count() actions. Monitor logs and improve if required.&lt;/span&gt;
    &lt;span class="nf"&gt;while&lt;/span&gt; &lt;span class="o"&gt;(!&lt;/span&gt;&lt;span class="n"&gt;done&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;dfBefore&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;iteration&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;dfInitial&lt;/span&gt;
      &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nv"&gt;primaryKeyArr&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;foldLeft&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dfInitial&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
          &lt;span class="c1"&gt;// Update the old keys for UPDATE ops&lt;/span&gt;
          &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;colName&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;df&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;withColumn&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="s"&gt;"old_$colName"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
            &lt;span class="nf"&gt;when&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;$&lt;/span&gt;&lt;span class="s"&gt;"changeType"&lt;/span&gt; &lt;span class="o"&gt;===&lt;/span&gt; &lt;span class="s"&gt;"update"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;colName&lt;/span&gt;&lt;span class="o"&gt;)).&lt;/span&gt;&lt;span class="py"&gt;otherwise&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="s"&gt;"old_$colName"&lt;/span&gt;&lt;span class="o"&gt;)))&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
      &lt;span class="o"&gt;}&lt;/span&gt;

      &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;dfAfter&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;dfBefore&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;withColumn&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"rank"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nf"&gt;row_number&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="py"&gt;over&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;window&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;filter&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"rank"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;===&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;drop&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"rank"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

      &lt;span class="n"&gt;done&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;dfAfter&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;count&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="nv"&gt;dfBefore&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;count&lt;/span&gt;
      &lt;span class="n"&gt;dfInitial&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="n"&gt;dfAfter&lt;/span&gt;
      &lt;span class="n"&gt;iteration&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="n"&gt;iteration&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;dfInitial&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
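
&lt;p&gt;The loop matters when an &lt;code&gt;update&lt;/code&gt; changes the primary key itself, because the re-keyed row can then collide with later events on the new key. The following is a minimal sketch of that behaviour using plain Scala collections rather than Spark, so it can be tried in any Scala REPL. The &lt;code&gt;Event&lt;/code&gt; case class, the helper names, and the sample rows (including &lt;code&gt;id3&lt;/code&gt;) are hypothetical and only mirror the columns used above.&lt;/p&gt;

```scala
import scala.annotation.tailrec

// Hypothetical stand-in for one row of the unioned dataframe (single primary key "id").
final case class Event(
  changeType: Option[String], // None for rows that already exist in the datalake
  id: Option[String],
  name: Option[String],
  oldId: String,              // plays the role of the old_id column
  timestamp: Long
)

// Equivalent of row_number().over(window) === 1: keep the newest row per old key.
def latestPerOldKey(rows: Seq[Event]): Seq[Event] =
  rows.groupBy(_.oldId).values.map(_.maxBy(_.timestamp)).toSeq

// Repeat "re-key update rows, keep the newest row per key" until the row count stops shrinking.
def applyByTimestamp(initial: Seq[Event]): Seq[Event] = {
  @tailrec
  def loop(current: Seq[Event], iteration: Int): Seq[Event] = {
    val before =
      if (iteration == 0) current
      else current.map { e =>
        // After the first pass, an update row is tracked under its new key.
        if (e.changeType.contains("update")) e.copy(oldId = e.id.getOrElse(e.oldId)) else e
      }
    val after = latestPerOldKey(before)
    if (after.size == before.size) after else loop(after, iteration + 1)
  }
  loop(initial, 0)
}

// Hypothetical scenario: id1 is renamed to id3 at timestamp 1, then id3 is updated at timestamp 2.
val hypotheticalEvents: Seq[Event] = Seq(
  Event(None, Some("id1"), Some("Alice"), "id1", 0L),
  Event(Some("update"), Some("id3"), Some("Alice"), "id1", 1L),
  Event(Some("update"), Some("id3"), Some("Angela"), "id3", 2L)
)

val resolved: Seq[Event] = applyByTimestamp(hypotheticalEvents)
```

&lt;p&gt;In this hypothetical run, the first pass leaves two rows, the re-keyed second pass collapses them into one, and the third pass confirms that nothing changed, so &lt;code&gt;resolved&lt;/code&gt; holds only the &lt;code&gt;id3&lt;/code&gt; / &lt;code&gt;Angela&lt;/code&gt; row. A single pass would have kept both update rows.&lt;/p&gt;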



&lt;h2&gt;
  
  
  Resolving change events
&lt;/h2&gt;

&lt;p&gt;First, we will handle the most common scenario: new records being added. We do not need to use &lt;code&gt;timestamp&lt;/code&gt; at this point. Notice that there are two records for &lt;code&gt;id2&lt;/code&gt;, since the &lt;code&gt;delete&lt;/code&gt; change event has not been applied yet.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;insertDf&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;splitChangeEventDf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;filter&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;$&lt;/span&gt;&lt;span class="s"&gt;"changeType"&lt;/span&gt; &lt;span class="o"&gt;===&lt;/span&gt; &lt;span class="s"&gt;"insert"&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;drop&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"oldKeyNames"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"oldKeyValues"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;datalakeAndInsertDf&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;unionWithSchema&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;insertDf&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;datalakeDf&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;get&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;





&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+----------+---+-----+---------+
|changeType| id| name|timestamp|
+----------+---+-----+---------+
|    insert|id2|Carol|        3|
|      null|id1|Alice|        0|
|      null|id2|  Bob|        0|
+----------+---+-----+---------+
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Now, let's handle updated and deleted records. In our example, there is only one distinct primary key array (&lt;code&gt;id&lt;/code&gt;), so the body of the outer &lt;code&gt;foldLeft&lt;/code&gt; is executed only once.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;updateDeleteDf&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;splitChangeEventDf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;filter&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;$&lt;/span&gt;&lt;span class="s"&gt;"changeType"&lt;/span&gt; &lt;span class="o"&gt;===&lt;/span&gt; &lt;span class="s"&gt;"update"&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="n"&gt;$&lt;/span&gt;&lt;span class="s"&gt;"changeType"&lt;/span&gt; &lt;span class="o"&gt;===&lt;/span&gt; &lt;span class="s"&gt;"delete"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="c1"&gt;// One Array[String] of key names per distinct primary key combination&lt;/span&gt;
&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;distinctOldKeyNames&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;updateDeleteDf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;select&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"oldKeyNames"&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;distinct&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;collect&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;_&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;getSeq&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;toArray&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;// Ideally there should only be one but primary key can be changed in some cases&lt;/span&gt;
&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;updateDeleteExpandedDfs&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;distinctOldKeyNames&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;foldLeft&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;datalakeAndInsertDf&lt;/span&gt;&lt;span class="o"&gt;)(&lt;/span&gt;

    &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;datalakeAndInsertDf&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;primaryKeyArr&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

        &lt;span class="c1"&gt;// Step 1: For INSERT / Existing data, create new pkey columns (using existing values)&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;datalakeAndInsertCeDf&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;primaryKeyArr&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;foldLeft&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;datalakeAndInsertDf&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;colName&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;df&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;withColumn&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="s"&gt;"old_$colName"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;colName&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;

        &lt;span class="c1"&gt;// Step 2: For UPDATE / DELETE, split the old keys array column&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;updateDeleteCeDf&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;primaryKeyArr&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;zipWithIndex&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;foldLeft&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;updateDeleteDf&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pKey&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;df&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;withColumn&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="s"&gt;"old_${pKey._1}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;$&lt;/span&gt;&lt;span class="s"&gt;"oldKeyValues"&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;pKey&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;_2&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;filter&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;$&lt;/span&gt;&lt;span class="s"&gt;"oldKeyNames"&lt;/span&gt; &lt;span class="o"&gt;===&lt;/span&gt; &lt;span class="n"&gt;primaryKeyArr&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;drop&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"oldKeyNames"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"oldKeyValues"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

        &lt;span class="c1"&gt;// Step 3: Union all the data&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;initialDf&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;unionWithSchema&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;datalakeAndInsertCeDf&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;updateDeleteCeDf&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;get&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;cache&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;

        &lt;span class="c1"&gt;// Step 4: Resolve the change events chronologically&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;resolvedDf&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;applyChangeEventsByTimestamp&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;initialDf&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;primaryKeyArr&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

        &lt;span class="c1"&gt;// Step 5: Drop DELETE records and unnecessary columns&lt;/span&gt;
        &lt;span class="n"&gt;resolvedDf&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;filter&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;$&lt;/span&gt;&lt;span class="s"&gt;"changeType"&lt;/span&gt; &lt;span class="o"&gt;=!=&lt;/span&gt; &lt;span class="s"&gt;"delete"&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="n"&gt;$&lt;/span&gt;&lt;span class="s"&gt;"changeType"&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;isNull&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;drop&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;primaryKeyArr&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;colName&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="s"&gt;"old_$colName"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt;  &lt;span class="k"&gt;_&lt;/span&gt;&lt;span class="kt"&gt;*&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="c1"&gt;// Step 6: Remove unwanted columns and get rid of duplicate rows&lt;/span&gt;
&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nv"&gt;updateDeleteExpandedDfs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;drop&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"changeType"&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;distinct&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Let's go through each step and visualize the output.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;Enrich &lt;code&gt;datalakeAndInsertDf&lt;/code&gt; to include information about the existing primary keys. In this example, it would be &lt;code&gt;old_id&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;scala&amp;gt; datalakeAndInsertCeDf.show
+----------+---+-----+---------+------+
|changeType| id| name|timestamp|old_id|
+----------+---+-----+---------+------+
|    insert|id2|Carol|        3|   id2|
|      null|id1|Alice|        0|   id1|
|      null|id2|  Bob|        0|   id2|
+----------+---+-----+---------+------+
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Enrich &lt;code&gt;updateDeleteDf&lt;/code&gt; to include information about the existing primary keys (&lt;code&gt;old_id&lt;/code&gt; in this example), taken from the old key values carried by each change event.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;scala&amp;gt; updateDeleteCeDf.show
+----------+---------+----+------+------+
|changeType|timestamp|  id|  name|old_id|
+----------+---------+----+------+------+
|    update|        1| id1|Angela|   id1|
|    delete|        2|null|  null|   id2|
+----------+---------+----+------+------+
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Union the dataframes from #1 and #2.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;scala&amp;gt; initialDf.show
+----------+----+------+------+---------+
|changeType|  id|  name|old_id|timestamp|
+----------+----+------+------+---------+
|    insert| id2| Carol|   id2|        3|
|      null| id1| Alice|   id1|        0|
|      null| id2|   Bob|   id2|        0|
|    update| id1|Angela|   id1|        1|
|    delete|null|  null|   id2|        2|
+----------+----+------+------+---------+
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Apply the change events with our &lt;code&gt;applyChangeEventsByTimestamp()&lt;/code&gt; function, which keeps only the most recent row per primary key according to &lt;code&gt;timestamp&lt;/code&gt;. At this point, we should have exactly one record per primary key. Keys whose latest event is a &lt;code&gt;delete&lt;/code&gt; still appear here and are removed in the next step.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;scala&amp;gt; resolvedDf.show
+----------+---+------+------+---------+
|changeType| id|  name|old_id|timestamp|
+----------+---+------+------+---------+
|    update|id1|Angela|   id1|        1|
|    insert|id2| Carol|   id2|        3|
+----------+---+------+------+---------+
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Remove records whose change type is &lt;code&gt;delete&lt;/code&gt;, as they should not be persisted. In our case, the &lt;code&gt;delete&lt;/code&gt; at timestamp 2 has already been superseded by the &lt;code&gt;insert&lt;/code&gt; at timestamp 3, so there is nothing to remove. &lt;code&gt;old_id&lt;/code&gt; is also dropped since we no longer need it.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+----------+---+------+---------+
|changeType| id|  name|timestamp|
+----------+---+------+---------+
|    update|id1|Angela|        1|
|    insert|id2| Carol|        3|
+----------+---+------+---------+
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Steps 1 to 5 are repeated for every other combination of primary keys, but since we only have &lt;code&gt;id&lt;/code&gt; in this example, the resolution ends here. We just need to drop &lt;code&gt;changeType&lt;/code&gt; and remove any duplicate records.&lt;br&gt;
&lt;/p&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;+---+------+---------+
| id|  name|timestamp|
+---+------+---------+
|id2| Carol|        3|
|id1|Angela|        1|
+---+------+---------+
&lt;/code&gt;&lt;/pre&gt;

&lt;/li&gt;
&lt;/ol&gt;
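
&lt;p&gt;The six steps above can also be checked without Spark. The following is a rough sketch that models the same example with plain Scala collections; the &lt;code&gt;Record&lt;/code&gt; case class and the value names are hypothetical stand-ins for the dataframes, and step 4 is reduced to a single pass because no primary key changes in this example.&lt;/p&gt;

```scala
// Hypothetical stand-in for one row; oldId plays the role of the old_id column.
final case class Record(
  changeType: Option[String],
  id: Option[String],
  name: Option[String],
  timestamp: Long,
  oldId: String
)

// Steps 1-3: existing and inserted rows reuse their own id as old_id,
// update and delete rows carry the old key from the change event.
val initialRecords: Seq[Record] = Seq(
  Record(Some("insert"), Some("id2"), Some("Carol"), 3L, "id2"),
  Record(None, Some("id1"), Some("Alice"), 0L, "id1"),
  Record(None, Some("id2"), Some("Bob"), 0L, "id2"),
  Record(Some("update"), Some("id1"), Some("Angela"), 1L, "id1"),
  Record(Some("delete"), None, None, 2L, "id2")
)

// Step 4 (single pass): keep the most recent record per old key.
val resolvedRecords: Seq[Record] =
  initialRecords.groupBy(_.oldId).values.map(_.maxBy(_.timestamp)).toSeq

// Step 5: drop keys whose latest event is a delete.
val keptRecords: Seq[Record] =
  resolvedRecords.filterNot(_.changeType.contains("delete"))

// Step 6: project away changeType and old_id, then remove duplicates.
val finalRecords: Seq[(String, String, Long)] =
  keptRecords
    .flatMap(r => r.id.flatMap(i => r.name.map(n => (i, n, r.timestamp))))
    .distinct
    .sortBy(_._1)
```

&lt;p&gt;Here &lt;code&gt;finalRecords&lt;/code&gt; contains &lt;code&gt;(id1, Angela, 1)&lt;/code&gt; and &lt;code&gt;(id2, Carol, 3)&lt;/code&gt;, the same rows as the final table above.&lt;/p&gt;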

&lt;p&gt;And voilà! We have a dataframe with the changes applied, ready to be persisted to our datalake in any format we want.&lt;/p&gt;

</description>
      <category>dataengineering</category>
      <category>database</category>
      <category>datapipeline</category>
    </item>
  </channel>
</rss>
