
Handling upstream data changes via Change Data Capture

Wai Yan ・8 min read

Anyone who has managed a data pipeline knows that upstream data can change and that such changes can affect the entire pipeline. Imagine we have the following customer data in our datalake.

+---+-----+
| id| name|
+---+-----+
|id1|Alice|
|id2|  Bob|
+---+-----+

One month later, we realize that the customer's name changed from Alice to Carol and that we have been using incorrect data for the past month. Such inaccuracies can skew our data analysis and machine learning models. So, how can we detect such changes, and how can we automate applying them?

Fortunately, with enough information, it is easy to update the data without over-complicating our existing pipeline. We need two components:

  1. Change Event Generator - to generate the create / update / delete events of our data
  2. Change Event Resolver - to apply these changes on our existing data

The need for the Change Event Resolver is debatable if we can afford to live on the bleeding edge by using Apache Hudi or Delta Lake. Instead of resolving the changes and keeping a single copy of the data, Hudi and Delta Lake can persist each revision of our data (the timeline in Hudi and time travel in Delta Lake). This is undoubtedly more powerful, but there are scenarios where these components will simply not fit into existing pipelines, mainly because they require the data to be stored in their own table formats (Hudi or Delta) rather than as plain Parquet or Avro files. This means our data cataloging, data monitoring and data visualization tools now need to understand Hudi or Delta - support that may not be ready even in a year's time, especially if these services are not developed in-house.

A model to capture the change events

Let's try to model a class that we can use to capture the changes.

case class ChangeEvent(
  changeType: String,           // type of change - INSERT, UPDATE or DELETE
  timestamp: String,            // when this change happened 
  columnNames: Seq[String],     // names of columns (Mandatory for INSERT / UPDATE)
  columnValues: Seq[String],    // values of columns (Mandatory for INSERT / UPDATE)
  oldKeyNames: Seq[String],     // names of old key columns (Mandatory for UPDATE / DELETE)
  oldKeyValues: Seq[String]     // values of old key columns (Mandatory for UPDATE / DELETE)
)

Most of the attributes are self-explanatory. oldKeyNames and oldKeyValues contain the primary keys / values of the old data which will be used in case of UPDATE / DELETE queries. We can also enrich the model with more attributes such as columnTypes and oldKeyTypes if we need to apply the changes differently based on the column data types.

This is commonly known as change data capture (CDC) and is supported by many databases - including PostgreSQL, MySQL and MongoDB. In fact, our ChangeEvent class is a simplified version of the output of wal2json - a PostgreSQL output plugin for logical decoding.
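The "Mandatory for ..." comments in the model can be checked before an event reaches the resolver. The following is only a minimal sketch of such a check: ChangeEventValidator and its validate method are hypothetical names that are not part of the pipeline in this post, and the rules simply mirror the comments on the case class.

```scala
// Same fields as the ChangeEvent model above, repeated so the sketch stands alone.
final case class ChangeEvent(
  changeType: String,
  timestamp: String,
  columnNames: Seq[String],
  columnValues: Seq[String],
  oldKeyNames: Seq[String],
  oldKeyValues: Seq[String]
)

// Hypothetical helper, not part of the original pipeline.
object ChangeEventValidator {
  private val needsColumns = Set("insert", "update")
  private val needsOldKeys = Set("update", "delete")

  // True when both sequences are present, non-empty and of equal length.
  private def paired(names: Seq[String], values: Seq[String]): Boolean =
    names != null && values != null && names.nonEmpty && names.size == values.size

  // Returns a description of every problem found; an empty result means the event is usable.
  def validate(event: ChangeEvent): Seq[String] = {
    val changeType = Option(event.changeType).map(_.toLowerCase).getOrElse("")
    val typeErrors =
      if (needsColumns(changeType) || needsOldKeys(changeType)) Seq.empty[String]
      else Seq(s"unknown changeType: ${event.changeType}")
    val columnErrors =
      if (needsColumns(changeType) && !paired(event.columnNames, event.columnValues))
        Seq("columnNames and columnValues must be present and of the same length")
      else Seq.empty[String]
    val keyErrors =
      if (needsOldKeys(changeType) && !paired(event.oldKeyNames, event.oldKeyValues))
        Seq("oldKeyNames and oldKeyValues must be present and of the same length")
      else Seq.empty[String]
    typeErrors ++ columnErrors ++ keyErrors
  }
}
```

Returning a list of problems instead of throwing lets the caller decide whether to drop, log or quarantine a malformed event.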

Pros & Cons

There are 2 main advantages of using CDC:

  1. Although the primary purpose of this feature is to create database replicas and to migrate data, it becomes very powerful in data pipelines since the change is available in the pipeline almost immediately - making a real time pipeline possible.
  2. The upstream does not need to implement the notification logic in several places. To give an example, think of a movie review website. The pipeline needs to be informed when a user creates / updates / deletes their reviews - which means the pipeline needs to be notified from 3 different REST APIs. All it takes is one developer forgetting to notify the pipeline, and we will be losing data without anyone realising it.

The one drawback is a slight performance degradation on the source database due to replication, but the impact will differ based on the type of the database.

Questions, questions, questions

Now, after hearing this proposal, we might want to ask ourselves some questions.

  1. What if the data does not have the concept of a primary key (i.e. no oldKeyNames or oldKeyValues)?
  2. What if the primary key changes?
  3. What if I add a column or rename a column?
  4. What if I remove a column?
  5. What if a column changes its data type?

Unfortunately, if our data does not have a primary key, CDC will not work at all (unless we are only interested in INSERTs and do not care about UPDATEs or DELETEs). In this case, the best option is to add a UUID or a sequence column to be used as the primary key. Primary key changes rarely happen in properly designed data models, but when one is unavoidable, it is best to treat the post-change data as a new data model and take a fresh snapshot of the data.

The rest of the questions can be addressed within our code when we apply the change events to our existing data but these will not be covered in this post.

Now, let's see how it works with some examples. I am using Apache Spark 2.4 in this example but the same logic can be reimplemented in other frameworks as well.

Mocking the existing data

First, we are going to mock some data that already exists in our datalake. Notice that the timestamp column is present in the data previously written to the datalake, since we need to apply the change events based on this column.

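import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, desc, lit, row_number, when}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.expressions.Window
import spark.implicits._  // for the $"..." column syntax; spark-shell imports this automatically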

val schema = StructType(Seq("timestamp", "id", "name").map(StructField(_, StringType)))
val datalakeDf = spark.createDataFrame(spark.sparkContext.parallelize(Seq(
    Row("0", "id1", "Alice"),
    Row("0", "id2", "Bob")
)), schema)
+---------+---+-----+
|timestamp| id| name|
+---------+---+-----+
|        0|id1|Alice|
|        0|id2|  Bob|
+---------+---+-----+

Change Event Generator

Next, we populate the change events that our Change Event Generator would produce and that we want to apply to this data. The implementation of this generator is source specific, so we won't be covering it in this post.

Note that oldKeyNames and oldKeyValues are not present for insert. Similarly, columnNames and columnValues are not required for delete.

val COLUMN_NAMES = Seq("id", "name")  // column names of the data
val OLD_KEY_NAMES = Seq("id")         // column names of the primary key

val changeEventDf = spark.createDataFrame(Seq(
    ChangeEvent("update", "1", COLUMN_NAMES, Seq("id1", "Angela"), OLD_KEY_NAMES, Seq("id1")),
    ChangeEvent("delete", "2", null, null, OLD_KEY_NAMES, Seq("id2")),
    ChangeEvent("insert", "3", COLUMN_NAMES, Seq("id2", "Carol"), null, null)
))
+----------+---------+-----------+-------------+-----------+------------+
|changeType|timestamp|columnNames| columnValues|oldKeyNames|oldKeyValues|
+----------+---------+-----------+-------------+-----------+------------+
|    update|        1| [id, name]|[id1, Angela]|       [id]|       [id1]|
|    delete|        2|       null|         null|       [id]|       [id2]|
|    insert|        3| [id, name]| [id2, Carol]|       null|        null|
+----------+---------+-----------+-------------+-----------+------------+

Let's split the columnNames and columnValues columns so that we have a dataframe similar to our existing data. But we will keep the oldKeyNames and oldKeyValues since we still need to use them.

val splitChangeEventDf = COLUMN_NAMES.zipWithIndex.foldLeft(changeEventDf) {
    (df, column) => df.withColumn(column._1, $"columnValues"(column._2))
}.drop("columnNames", "columnValues")
+----------+---------+-----------+------------+----+------+
|changeType|timestamp|oldKeyNames|oldKeyValues|  id|  name|
+----------+---------+-----------+------------+----+------+
|    update|        1|       [id]|       [id1]| id1|Angela|
|    delete|        2|       [id]|       [id2]|null|  null|
|    insert|        3|       null|        null| id2| Carol|
+----------+---------+-----------+------------+----+------+

According to these change events, we should apply the changes in this order.

  1. update name of id1 from Alice to Angela
  2. delete id2
  3. insert id2 again with name = Carol
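To make the expected end state concrete before building the Spark version, here is a minimal plain-Scala sketch that replays the same three events against an in-memory map of id to name. The Change class and SequentialReplay object are hypothetical and exist only for this illustration; they assume a single-column key and numeric timestamps.

```scala
// Simplified, hypothetical event shape for a table with a single key column (id) and one value column (name).
final case class Change(
  changeType: String,
  timestamp: Long,
  oldId: Option[String],          // set for update / delete
  row: Option[(String, String)]   // (id, name), set for insert / update
)

object SequentialReplay {
  // Applies the events one by one, oldest first, to a map of id -> name.
  def replay(initial: Map[String, String], changes: Seq[Change]): Map[String, String] =
    changes.sortBy(_.timestamp).foldLeft(initial) { (state, change) =>
      (change.changeType, change.oldId, change.row) match {
        case ("insert", _, Some((id, name)))           => state + (id -> name)
        case ("update", Some(oldId), Some((id, name))) => (state - oldId) + (id -> name)
        case ("delete", Some(oldId), _)                => state - oldId
        case _                                         => state // malformed events are ignored in this sketch
      }
    }
}

val datalake = Map("id1" -> "Alice", "id2" -> "Bob")
val events = Seq(
  Change("update", 1L, Some("id1"), Some(("id1", "Angela"))),
  Change("delete", 2L, Some("id2"), None),
  Change("insert", 3L, None, Some(("id2", "Carol")))
)
val resolved = SequentialReplay.replay(datalake, events)
// resolved == Map("id1" -> "Angela", "id2" -> "Carol")
```

A sequential fold like this does not scale to a datalake, which is why the resolver below expresses the same outcome with unions and window functions instead.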

Change Event Resolver

Now it's time to create our second component that applies the change events generated by our Change Event Generator. First, let's create a couple of helper functions:

unionWithSchema - union dataframes while accounting for schema differences such as different column orders or mismatched columns, e.g. df1 has 2 columns: colA and colB but df2 only has colA.

def unionWithSchema(dataFrames: DataFrame*): Option[DataFrame] = {
  if (dataFrames.isEmpty) {
    return None
  }
  val spark = dataFrames.head.sparkSession
  val distinctSchemas = dataFrames.map(_.schema.toList).distinct
  val unionDf = if (distinctSchemas.size == 1) {
    // Identical schemas: a plain positional union is safe
    dataFrames.tail.foldLeft(dataFrames.head) {
      (df1, df2) => df1.union(df2)
    }
  } else {
    // Build a superset schema with the fields sorted by name for a stable column order
    val allSchemas = distinctSchemas.flatten.distinct.sortBy(schema => schema.name)
    val schemaWithAllColumns = StructType(allSchemas)
    val emptyDataFrame = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schemaWithAllColumns)
    val orderedColumns = emptyDataFrame.columns.map(col)

    dataFrames.foldLeft(emptyDataFrame) {
      (df1, df2) => {
        // Add every column that df2 lacks as a typed null, then union in the common column order
        val missingColumns = allSchemas diff df2.schema
        val unionSafeDf = missingColumns.foldLeft(df2) {
          (df, missingField) => df.withColumn(missingField.name, lit(null).cast(missingField.dataType))
        }
        df1.union(unionSafeDf.select(orderedColumns: _*))
      }
    }
  }
  Some(unionDf)
}
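The idea behind unionWithSchema can be shown without Spark. The sketch below is a simplified plain-Scala analogue, not the helper itself: SchemaAlignment, Record and unionWithColumns are hypothetical names, rows are modelled as maps of column name to value, and None stands in for the null that Spark would use for a missing column.

```scala
object SchemaAlignment {
  type Record = Map[String, String]

  // Returns the sorted superset of column names and every row laid out in that order.
  // Unlike Spark, the columns are derived from the rows, so an empty dataset contributes none.
  def unionWithColumns(datasets: Seq[Record]*): (Seq[String], Seq[Seq[Option[String]]]) = {
    val allRows = datasets.flatten
    val columns = allRows.flatMap(_.keys).distinct.sorted
    (columns, allRows.map(row => columns.map(row.get)))
  }
}

val inserts = Seq(Map("changeType" -> "insert", "timestamp" -> "3", "id" -> "id2", "name" -> "Carol"))
val existing = Seq(Map("timestamp" -> "0", "id" -> "id1", "name" -> "Alice"))
val (columns, rows) = SchemaAlignment.unionWithColumns(inserts, existing)
// columns == Seq("changeType", "id", "name", "timestamp")
// rows(1) == Seq(None, Some("id1"), Some("Alice"), Some("0"))
```

Sorting the column names is what makes the positional union safe: every input ends up with the same columns in the same order.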

applyChangeEventsByTimestamp - apply change events chronologically.

def applyChangeEventsByTimestamp(dataFrame: DataFrame, primaryKeyArr: Array[String]): DataFrame = {
  // Partition by the old primary key columns, or by every column if there is no key
  val partitionCols = if (primaryKeyArr.isEmpty) {
    dataFrame.columns.map(col)
  } else {
    primaryKeyArr.map(colName => col(s"old_$colName"))
  }
  // Newest record first within each partition
  val window = Window.partitionBy(partitionCols: _*).orderBy(desc("timestamp"))

  var dfInitial = dataFrame
  var iteration = 0
  var done = false

  // Potentially expensive. Monitor logs and improve if required.
  while (!done) {
    val dfBefore = if (iteration == 0) {
      dfInitial
    } else {
      primaryKeyArr.foldLeft(dfInitial) {
        // Update the old keys for UPDATE ops
        (df, colName) => df.withColumn(s"old_$colName",
          when(col("changeType") === "update", col(colName)).otherwise(col(s"old_$colName")))
      }
    }

    // Keep only the latest record of each partition
    val dfAfter = dfBefore.withColumn("rank", row_number().over(window))
      .filter(col("rank") === 1)
      .drop("rank")

    // Stop once a pass no longer removes any rows
    done = dfAfter.count == dfBefore.count
    dfInitial = dfAfter
    iteration = iteration + 1
  }
  dfInitial
}
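A single pass of the loop above boils down to "keep the newest record per old key". Here is a minimal plain-Scala sketch of that one pass, assuming a single key column; Versioned and LatestPerKey are hypothetical names and the re-keying of UPDATE records in later iterations is left out. One thing worth noting: the timestamp column in this post is a string, and strings sort lexicographically ("10" comes before "9"), so the sketch converts it to a number before comparing. That conversion is an assumption of the sketch, not something the Spark code does.

```scala
// Hypothetical row shape: one record per version of a key.
final case class Versioned(
  oldId: String,
  timestamp: String,
  changeType: Option[String],
  name: Option[String]
)

object LatestPerKey {
  // Keeps only the record with the highest numeric timestamp for each old key.
  def latestPerKey(rows: Seq[Versioned]): Seq[Versioned] =
    rows.groupBy(_.oldId).values.map(_.maxBy(_.timestamp.toLong)).toSeq.sortBy(_.oldId)
}

val versions = Seq(
  Versioned("id2", "3", Some("insert"), Some("Carol")),
  Versioned("id1", "0", None, Some("Alice")),
  Versioned("id2", "0", None, Some("Bob")),
  Versioned("id1", "1", Some("update"), Some("Angela")),
  Versioned("id2", "2", Some("delete"), None)
)
val latest = LatestPerKey.latestPerKey(versions)
// latest keeps the update for id1 (Angela) and the insert for id2 (Carol)
```

If the timestamps in a real pipeline are strings, zero-padding them or casting them to a numeric or timestamp type before ordering avoids the lexicographic pitfall.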

Resolving change events

First, we will handle the most common scenario - new records getting added. We do not need to use the timestamp at this point. Notice that there are 2 records for id2 since the delete change event has not been applied yet.

val insertDf = splitChangeEventDf.filter($"changeType" === "insert").drop("oldKeyNames", "oldKeyValues")
val datalakeAndInsertDf = unionWithSchema(insertDf, datalakeDf).get
+----------+---+-----+---------+
|changeType| id| name|timestamp|
+----------+---+-----+---------+
|    insert|id2|Carol|        3|
|      null|id1|Alice|        0|
|      null|id2|  Bob|        0|
+----------+---+-----+---------+

Now, let's handle updating and deleting records. In our example, there is only one unique primary key array, so the code inside the foldLeft will be executed only once.

val updateDeleteDf = splitChangeEventDf.filter($"changeType" === "update" || $"changeType" === "delete")
// One Array[String] per distinct primary key definition (map, not flatMap, so each key array stays intact)
val distinctOldKeyNames = updateDeleteDf.select("oldKeyNames").distinct.collect.map(_.getSeq[String](0).toArray)

// Ideally there should only be one but primary key can be changed in some cases
val updateDeleteExpandedDfs = distinctOldKeyNames.foldLeft(datalakeAndInsertDf) {

    (datalakeAndInsertDf, primaryKeyArr) => {

        // Step 1: For INSERT / Existing data, create new pkey columns (using existing values)
        val datalakeAndInsertCeDf = primaryKeyArr.foldLeft(datalakeAndInsertDf) {
            (df, colName) => df.withColumn(s"old_$colName", col(colName))
        }

        // Step 2: For UPDATE / DELETE, split the old keys array column
        val updateDeleteCeDf = primaryKeyArr.zipWithIndex.foldLeft(updateDeleteDf) {
            (df, pKey) => df.withColumn(s"old_${pKey._1}", $"oldKeyValues"(pKey._2))
        }
        .filter($"oldKeyNames" === primaryKeyArr)
        .drop("oldKeyNames", "oldKeyValues")

        // Step 3: Union all the data
        val initialDf = unionWithSchema(datalakeAndInsertCeDf, updateDeleteCeDf).get.cache()

        // Step 4: Resolve the change events chronologically
        val resolvedDf = applyChangeEventsByTimestamp(initialDf, primaryKeyArr)

        // Step 5: Drop DELETE records and unnecessary columns
        resolvedDf
            .filter($"changeType" =!= "delete" || $"changeType".isNull)
            .drop(primaryKeyArr.map(colName => s"old_$colName"): _*)
    }
}
// Step 6: Remove unwanted columns and get rid of duplicate rows
updateDeleteExpandedDfs.drop("changeType").distinct()

Let's go through each step and visualize the output.

  1. Enrich datalakeAndInsertDf to include information about the existing primary keys. In this example, it would be old_id.

    scala> datalakeAndInsertCeDf.show
    +----------+---+-----+---------+------+
    |changeType| id| name|timestamp|old_id|
    +----------+---+-----+---------+------+
    |    insert|id2|Carol|        3|   id2|
    |      null|id1|Alice|        0|   id1|
    |      null|id2|  Bob|        0|   id2|
    +----------+---+-----+---------+------+
    
  2. Enrich updateDeleteDf to include information about the existing primary keys (old_id in this example)

    scala> updateDeleteCeDf.show
    +----------+---------+----+------+------+
    |changeType|timestamp|  id|  name|old_id|
    +----------+---------+----+------+------+
    |    update|        1| id1|Angela|   id1|
    |    delete|        2|null|  null|   id2|
    +----------+---------+----+------+------+
    
  3. Union the dataframes from #1 and #2.

    scala> initialDf.show
    +----------+----+------+------+---------+
    |changeType|  id|  name|old_id|timestamp|
    +----------+----+------+------+---------+
    |    insert| id2| Carol|   id2|        3|
    |      null| id1| Alice|   id1|        0|
    |      null| id2|   Bob|   id2|        0|
    |    update| id1|Angela|   id1|        1|
    |    delete|null|  null|   id2|        2|
    +----------+----+------+------+---------+
    
  4. Apply change events chronologically as defined by our applyChangeEventsByTimestamp() function based on timestamp. At this point, we should only have one record per primary key (except for delete records which will be removed in the next step).

    scala> resolvedDf.show
    +----------+---+------+------+---------+
    |changeType| id|  name|old_id|timestamp|
    +----------+---+------+------+---------+
    |    update|id1|Angela|   id1|        1|
    |    insert|id2| Carol|   id2|        3|
    +----------+---+------+------+---------+
    
  5. Remove records with the delete change type as they are not to be persisted. In our case, the delete record has already been superseded by the later insert, so there is nothing to be removed. old_id is also dropped since we do not need this information anymore.

    +----------+---+------+---------+
    |changeType| id|  name|timestamp|
    +----------+---+------+---------+
    |    update|id1|Angela|        1|
    |    insert|id2| Carol|        3|
    +----------+---+------+---------+
    
  6. Steps 1-5 are repeated for every other combination of primary keys, but since we only have one (id) in this example, this is the end of the resolution. We just need to drop the changeType column and remove any duplicate records.

    +---+------+---------+
    | id|  name|timestamp|
    +---+------+---------+
    |id2| Carol|        3|
    |id1|Angela|        1|
    +---+------+---------+
    

And voilà! We have a dataframe with the changes applied, ready to be persisted to our datalake in any format we want.
